【无标题】Delayed延迟队列不工作

背景

项目中使用java 自带的延迟队列Delayed,只有添加进队列的消息,并没有被消费到

版本

jdk1.8

问题原因

上一个消费队列出现异常并且没有捕获,下一个队列就没有进行消费

复现代码

没有抛异常的情况下

package com.ccb.core.config.delay;

import lombok.Data;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * 防护舱延迟对象
 *
 * @author sz
 * @version 1.0
 * @date 2023-02-10 15:47
 */
@Data
public class TestDelay implements Delayed {


     private  String seqId;
    /**
    *过期时间
    */
    private Long expireTime;
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expireTime - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        if (other == this){
            return 0;
        }
        if(other instanceof TestDelay){
            TestDelay otherRequest = (TestDelay)other;
            long otherStartTime = otherRequest.expireTime;
            return (int)(this.expireTime - otherStartTime);
        }
        return 0;
    }
}


package com.ccb.core.config.delay;

import com.ccb.core.common.util.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executor;

/**
 * @author sz
 * @version 1.0
 * @date 2023-10-08 19:12
 */
@Slf4j
@Component
public class ProtectDelayTestComponent {

    private static DelayQueue<TestDelay> delayQueue = new DelayQueue<TestDelay>();

    @Resource(name = "poiExecutor")
    private Executor poiExecutor;

    /**
     * 系统启动时,预先加载的数据@PostConstruct
     */
    @PostConstruct
    public void init(){

        log.info("线程进入ProtectDelayTestComponent***************init");

        poiExecutor.execute(() -> {

            while(true){
                try {
                    TestDelay protectDelay = delayQueue.take();
                    log.info("获取到的延迟队列信息:{}", protectDelay);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

    }

    /**
     * 加入延时队列
     *
     */
    public boolean addDelayQueue(TestDelay protectDelay){
        log.info("添加进延迟队列信息为{},对应的过期时间为:{}",protectDelay, DateUtil.convertTimeToString(protectDelay.getExpireTime(),"yyyy-MM-dd HH:mm:ss"));
        return delayQueue.add(protectDelay);
    }
}

写TestController 测试正常的添加进入延迟队列

  @GetMapping("/test/testProtectDelay")
    @ApiModelProperty("测试批量死信队列")
    public RespData testProtectDelay(){
        TestDelay protectDelay =new TestDelay();
        String s = UuidUtils.generateUuid();
        log.info("testProtectDelay的参数:{}",s);
        protectDelay.setSeqId(s);
        protectDelay.setExpireTime(System.currentTimeMillis()+10*1000);
        protectDelayTestComponent.addDelayQueue(protectDelay);
        return RespData.success();
    }

正常运行结果
在这里插入图片描述

手动抛出异常后

在这里插入图片描述

解决方案

在take中捕获Exception ,可以继续消费