前言
Redis作为一款优秀的缓存中间件,人们总是寄予他新的厚望。其列表类型的阻塞操作可以实现消息队列。
在场景中使用可以牢记以下口诀:
lpush + lpop = Stack(栈)
lpush + rpop = Queue(队列)
lpsh + ltrim = Capped Collection(有限集合)
lpush + brpop = Message Queue(消息队列)
背景
假如有这样的一个需求,现有一套「较老」的公共服务可供你的系统使用,但是目前还有其他业务单位正在使用。每次请求调用下发一个任务,接着服务会响应一个回调接口,通过这个回调接口,你可以查询这个任务的执行状态,到了什么阶段,是否完成之类的。任务从下发到结束需要一定的时间,并且任务结束总是以两种状态出现:Completed或者Error。
假设这套公共服务分配给你了N个容量可以同时进行任务的调度,也就是说你系统调用下发的任务只能是N,当超过N时,就需要自定义一个队列来进行排队。当有任务完成时,任务执行池就可以释放一个空位,队列就可以pop出一个消息用于处理调用公共服务。
分析
为了充分深入Redis的列表的使用,我打算把所有的需求点和场景都交给Redis去完成。
所以这个需求看起来比较简单,就变成了也有麻烦的地方,主要是四个:
- 由于其他业务单位的存在,本系统需要有一个缓冲池和任务执行池
Running Pool
,容量为可分配的N。 - 业务量增加超过N时,需要有个等待队列维护出入:当目前任务执行池
Running Pool
满载时,入队;当任务Running Pool
中的任务执行完成释放空位时,则出队进入Running Pool
。 - 「实际处理」的任务并不是执行在本系统而是在公共服务上,也就是说任何状态只能被动地通过公共服务的回调接口去查。
- 当存在N个正在执行的任务时,单线程肯定是效率不够的。需要开启多线程去回调公共服务的接口判断任务状态是否完成/失败,用于下一步操作出入队列。
设计
- 我们可以在Redis中规划一块任务执行池
Running Pool
,可以是Set类型也可以是List类型,容量为N。设计这样的一个队列有个好处是如果单纯的PUSH/POP
的话,当出队之后处理这个消息的过程中发生不可抗力、宕机,消息出队之后就会永远的丢失掉,而这样做则是消息始终持久化在Redis中,是任务结束之后出队; 有一个等待队列
Pending Queue
用于缓冲,消费者需要处理上面和这一块的业务逻辑;一个
Completed Queue
用于完成消息的推送;一个
Error Queue
用于任务失败消息的推送(这两者其实可以合并,作为任务结束的消息便可,具体内容可以放在value中,也通过读写DataBase。)
为了不出现当任务队列中没有任务时,消费者每秒都会调用一次POP命令查看是否有新任务这种情况,需要在消费者中同时处理业务逻辑,当任务执行缓冲池出队时,把Pending Queue
的消息出队,入队到Running Pool
。
所以我们需要一个生产者RedisProducer(部分代码)
1 |
|
此部分主要是先判断Redis的Running Pool
是否还有空余,无空余则进入Pending Queue
等待。进入了Running Pool
的消息做DataBase的持久化业务逻辑。这一句redisConsumer.consumerMessageThread(produceDTO);
则是主动调用消费者。