消息队列之消息延迟
消息延迟就是指生产者生产消息到队列之后,消费者会在一定的时间延迟之后再去消费。延迟消费的场景还是比较多的,比如超时取消或者其他需要过一定时间再去消费的任务。
现在的消息队列比如Kafka以及Rabbitmq并不支持消费端的延迟消费,需要自实现。之前在 Laravel的Queue学习 这篇文章简单介绍过Redis队列的延迟实现。
基本思想:
1、首先将需要延迟消费的加入到一个特殊的延迟队列种.default:delayed,该队列是一个zset,即有序集合。score为当前时间+延迟时间的时间戳。
2、每次队列pop消息时,都会聚合一下延迟队列。
- zrangebyscore 截止时间就是当前时间戳,这个时间就是score;
- 取出所有小有当前时间戳的数据;
- 取出的数据全部放入正常的队列中。
可以看一下它的lua脚本:
return <<<'LUA'
-- Get all of the jobs with an expired "score"...
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])
-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
for i = 1, #val, 100 do
redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
end
end
return val
LUA;
可以看到上面的思想是每次在执行任务的时候都会执行一遍上面的lua脚本,保证每次pop时都会把到时间的延迟任务拿出来。
其实现在的很多消息队列并不支持消息延迟,比如Kafka和Rabbitmq,但阿里开发的RocketMQ支持消息延迟,开源版本支持一定数量级别的延迟,他们的付费产品,可以支持任意时间的延迟,还是比较强大的。
虽然Kafka并不对外提供的消息延迟,但是其内部的定时任务的人执行使用了延迟队列相关知识。定时任务其实和消息延迟一样,都是在一定时间之后再执行对应的消息任务。其基本思想就是用了时间轮。
先简单说一下时间轮的作用。
有这样的一个场景,如果现在有10w个定时任务,如何做到高校触发?简单做法是所有的定时任务可能会放在一个集合种,然后每次都轮询一遍,但这种做法是相当低效的。一个更高校的思想就是使用时间轮,下面是一个简单的示意图:
数据结构是有一个环形队列,每个队列中是一个链表(单向或者双向的)。指针会按照一定的频率跳动,
比如一秒跳动一次,整个环形队列可能有60个slot。每移动一次到某个slot,就会将slot种的链表中的所有任务执行一遍。如果此时有新的定时任务出现,就会放到指针的前一个slot中。当然放到具体是那个slot,还要看定时任务具体的时间。比如要放到定时是2秒的,就放到距当前slot两个距离的节点中。
那现在有一个问题就是如果一个定时任务的时间已经超过了整个wheel的间距了,该怎么办呢?
针对该问题,还有一个分级时间轮的概念。每一个级时间轮的间距不断增大。
一个定时任务,会根据具体的定时时间决定添加到哪一个层级的时间轮中。第一个参考资料有介绍Kafka的时间轮实现原理。
RocketMQ延迟消息
开源版本中,RocketMQ划分了18个Level,给每个level都设置了定时器。当发送的消息是个延迟消息时,首先会将消息发送到特殊的队列,topic是
SCHEDULE_TOPIC_XXXX,queueId是延迟级别减去1,并将tagsCode设置为延迟时间。
随后定时器会扫描对应级别的延迟队列,拿延迟时间和当前时间做比对,如果小于等于当前时间,就会将对应消息发送到正常的消息队列中。
默认延迟级别 :1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。 level有以下三种情况:
- level == 0,消息为非延迟消息
- 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
- level > maxLevel,则level== maxLevel,例如level==20,延迟2h
和Kafka内部的时间轮不同,开源版本的RocketMQ使用的是定时器去处理,比较当前时间。
参考资料:
微信分享/微信扫码阅读