Redis 延时队列

时间:2024-11-21 09:49:01
import cn.hutool.json.JSONUtil; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBlockingDeque; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.cloud.sleuth.Span; import org.springframework.cloud.sleuth.Tracer; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.concurrent.TimeUnit; @Slf4j @Component public class CardKitMessageListener implements ApplicationRunner { public static final String CardKitMessageDelayQueue = "QUEUE:CARD_KIT"; public static final String CardKitMessageDelayLock = "LOCK:CARD_KIT"; @Resource private RedissonClient redissonClient; @Autowired private Tracer tracer; @Autowired private CardKitService cardKitService; @Override public void run(ApplicationArguments args) { new Thread(() -> { RBlockingDeque<CardKitRedisBo> blockingDeque = redissonClient.getBlockingDeque(CardKitMessageDelayQueue); while (true) { // 获取定时任务锁 RLock rLock = redissonClient.getLock(CardKitMessageDelayLock); try { // 最多等待5秒 boolean isLocked = rLock.tryLock(5, TimeUnit.SECONDS); if (isLocked) { Span span = tracer.nextSpan().name("OccupationMessage").start(); try (Tracer.SpanInScope ws = tracer.withSpan(span)) { CardKitRedisBo poll = blockingDeque.take(); log.info("获取延时消息:{}", JSONUtil.toJsonStr(poll)); // 消费消息 cardKitService.sendCardKit(poll); } finally { try { rLock.unlock(); } catch (Exception ex) { log.warn("锁释放失败:" + ex.getMessage()); } try { span.end(); } catch (Exception ex) { log.error("失败", ex) } } } } catch (Exception ex) { log.error("延迟消息处理异常:" + ex.getMessage(), ex); } } }).start(); } }