最近用到一个延迟消息的功能,第一时间想到使用MQ或者MQ的插件,因为数据量不大,所以尝试使用Redis来实现了,毕竟Redis也天生支持类似MQ的队列消费,所以,在这里总结了一下Redis实现延迟消息队列的方式。
一、监听key过期时间
处理流程:当redis的一个key过期时,redis会生成一个事件,通知订阅了该事件的客户端(KeyExpirationEventMessageListener),然后在客户端的回调方法中处理逻辑。
1)新建SpringBoot项目,maven依赖及yml如下
maven依赖:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>
yml文件
server: port: 8000 spring: redis: database: 0 host: xxxx port: 6379 password: xxxxxx lettuce: pool: #最大连接数 max-active: 8 #最大阻塞等待时间 max-wait: -1 #最大空闲 max-idle: 8 #最小空闲 min-idle: 0 #连接超时时间 timeout: 5000
2)修改redis.conf
文件开启事件通知配置
默认的配置:notify-keyspace-events “”
修改为:notify-keyspace-events Ex,该配置表示监听key的过期事件
3)设置Redis监听配置,注入Bean RedisMessageListenerContaine
@Configuration public class RedisTimeoutConfiguration { @Autowired private RedisConnectionFactory redisConnectionFactory; @Bean public RedisMessageListenerContainer redisMessageListenerContainer() { RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer(); redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory); return redisMessageListenerContainer; } @Bean public KeyExpiredListener keyExpiredListener() { return new KeyExpiredListener(this.redisMessageListenerContainer()); } }
4)创建监听器类,重写key过期回调方法onMessage
@Slf4j public class KeyExpiredListener extends KeyExpirationEventMessageListener { @Autowired public RedisTemplate<String, String> redisTemplate; public KeyExpiredListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); } @Override public void onMessage(Message message, byte[] bytes) { String channel = new String(message.getChannel(), StandardCharsets.UTF_8); //过期的key String key = new String(message.getBody(), StandardCharsets.UTF_8); log.info("redis key 过期:bytes={},channel={},key={}", new String(bytes), channel, key); } }
5)编写测试接口:写入一个带过期时间的key
@RestController @RequestMapping("/demo") public class BasicController { @Autowired public RedisTemplate<String, String> redisTemplate; @GetMapping(value = "/test") public void redisTest() { redisTemplate.opsForValue().set("test", "5s后过期", 5, TimeUnit.SECONDS); } }
执行后,onMessage
监听方法打印结果:
redis key 过期:bytes=__keyevent@*__:expired,channel=__keyevent@0__:expired,key=test
该方案缺点:可靠性问题,Redis 是一个内存数据库,尽管它提供了数据持久化选项(如 RDB 和 AOF),但在某些情况下(如意外崩溃或重启),可能会丢失一些未处理的过期事件。
二、zset + score
基本思路是将消息按需发送的时间作为分数存储在有序集合zset中,然后定期检查并处理到期的消息。代码例子如下:
1)创建 DelayedMessageService 类
@Slf4j @Service public class DelayedMessageService { private static final String DELAYED_MESSAGES_ZSET = "delayed:messages"; @Autowired private RedisTemplate<String, String> redisTemplate; public void addMessage(String message, long delayMillis) { long score = System.currentTimeMillis() + delayMillis; redisTemplate.opsForZSet().add(DELAYED_MESSAGES_ZSET, message, score); } @Scheduled(fixedRate = 1000) public void processMessages() { long now = System.currentTimeMillis(); Set<ZSetOperations.TypedTuple<String>> messages = redisTemplate.opsForZSet().rangeByScoreWithScores(DELAYED_MESSAGES_ZSET, 0, now); if (messages != null && !messages.isEmpty()) { for (ZSetOperations.TypedTuple<String> message : messages) { String msg = message.getValue(); long score = message.getScore().longValue(); if (score <= now) { // Process the message System.out.println("Processing message: " + msg); // Remove the message from the zset redisTemplate.opsForZSet().remove(DELAYED_MESSAGES_ZSET, msg); } } }else{ log.info("定时任务执行~"); } } }
2)编写Controller接口测试,初始化zset内容
@RestController @RequestMapping("/demo") public class BasicController { @Autowired private DelayedMessageService delayedMessageService; @GetMapping(value = "/test2") public void redisZsetTest() { // Add some messages with delays delayedMessageService.addMessage("Message 1", 5000); // 5 seconds delay delayedMessageService.addMessage("Message 2", 10000); // 10 seconds delay delayedMessageService.addMessage("Message 3", 15000); // 15 seconds delay } }
说明:
- redisZsetTest接口通过调用
DelayedMessageService
的addMessage
方法,将消息及其到期时间添加到 Redis 的 zset 中 - 开启一个定时任务,定期检查和处理到期的消息。使用
@Scheduled
注解定期执行,每秒检查一次,注意这里使用@Scheduled
,不要忘了启动类上添加@EnableScheduling
注解,否则定时任务不会生效。fixedRate
属性表示以固定的频率(毫秒为单位)执行方法。即方法执行完成后,会立即等待指定的毫秒数,然后再次执行。 - 通过
redisTemplate.opsForZSet().rangeByScoreWithScores
方法按时间范围获取到期的消息,消息处理完成后,从zset 中移除处理过的消息
三、Redisson框架
利用 Redisson 提供的数据结构RDelayedQueue
和RBlockingDeque
,可以自动处理过期的任务并将它们移动到阻塞队列中,这样我们就可以从阻塞队列中获取任务并进行消费处理。例子如下:
1)添加依赖
<!-- Redisson 依赖项 --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>2.15.1</version> </dependency>
2)创建DelayedMessageService
@Slf4j @Service public class DelayedMessageService { @Autowired private RedissonClient redissonClient; private RBlockingDeque<String> blockingDeque; private RDelayedQueue<String> delayedQueue; @PostConstruct public void init() { this.blockingDeque = redissonClient.getBlockingDeque("delayedQueue"); this.delayedQueue = redissonClient.getDelayedQueue(blockingDeque); Executors.newSingleThreadExecutor().submit(this::processMessages); } public void addMessage(String message, long delayMillis) { delayedQueue.offer(message, delayMillis, TimeUnit.MILLISECONDS); } public void processMessages() { try { while (true) { String message = blockingDeque.take(); // Process the message log.info("消息被处理: " + message); // ..业务逻辑处理 } } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("中断异常",e); } } }
3)测试接口
@GetMapping(value = "/test3") public void redisQueueTest() { // Add some messages with delays delayedMessageService.addMessage("Message 1", 5000); // 5 seconds delay delayedMessageService.addMessage("Message 2", 10000); // 10 seconds delay delayedMessageService.addMessage("Message 3", 15000); // 15 seconds delay }
说明:
RDelayedQueue
是 Redisson 提供的延迟队列,它将消息存储在指定的队列中,直到消息到期才会被转移到该队列。它的主要作用包括:
- 延迟消息管理:我们可以使用
RDelayedQueue
的offer
方法将消息添加到延迟队列,并指定延迟时间,消息在延迟时间到期前一直保留在RDelayedQueue
中。 - 消息转移:一旦消息到期,
RDelayedQueue
会自动将消息转移到指定的RBlockingDeque
中。
RBlockingQueue
是 Redisson 提供的阻塞队列,它支持阻塞操作。主要作用包括:
- 阻塞操作:支持阻塞的
take
操作,如果队列中没有元素,会一直阻塞直到有元素可供消费。
总结:
个人推荐使用Redisson 的RDelayedQueue
方式,感觉更加可靠和简单一些,当然zset+score也可以是个不错选择,毕竟更加灵活,延迟消息还有其他不同的方案,比如rocketmq、rabbitmq插件等,假如项目中用了redis,又不想引入更多的中间件,可以尝试使用redis来实现,为了测试,这里例子都比较简单,在实际使用过程中,还要考虑补偿机制、幂等性等问题。
参考:
1.https://blog.csdn.net/qq_34826261/article/details/120598731