laravel的queue学习
今天我们组长提到要部署多个work来消息异步任务,想到其如何在多个work间调度任务,如何做到不被不同的work重复消费,如何能正常分配job,然后我们就一起看了laravel的源码,又一次被laravel的设计思想惊叹到了,queue的设计令人惊叹,非常值得学习。今天的事情页告诉我,当你带着问题去研究代码的时侯是最有效率的。
好了,废话不多说了,开始步入正题。
关于queue的使用,可以直接参考官网: laravel5.6-Queues
那么回到刚开始提到的问题,laravel到底是怎么保证不被不同work重复消费呢?其实说白了,无论底层用的是数据库,还是Redis,还是其他如SDS,都是依赖于自身驱动的机制。关系型数据库就利用Mysql的锁机制;Redis就是队列等等。
看看它的源码。
1、首先是一直listen的deamon 进程
public function daemon($connectionName, $queue, WorkerOptions $options)
{
//用于进程间通信,接收其他进程的信号,SIGTERM,SIGUSR2 ,SIGCNT等
$this->listenForSignals();
$lastRestart = $this->getTimestampOfLastQueueRestart();
while (true) {
// Before reserving any jobs, we will make sure this queue is not paused and
// if it is we will just pause this worker for a given amount of time and
// make sure we do not need to kill this worker process off completely.
if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
$this->pauseWorker($options, $lastRestart);
continue;
}
// First, we will attempt to get the next job off of the queue. We will also
// register the timeout handler and reset the alarm for this job so it is
// not stuck in a frozen state forever. Then, we can fire off this job.
$job = $this->getNextJob(
$this->manager->connection($connectionName), $queue
);
$this->registerTimeoutHandler($job, $options);
// If the daemon should run (not in maintenance mode, etc.), then we can run
// fire off this job for processing. Otherwise, we will need to sleep the
// worker so no more jobs are processed until they should be processed.
if ($job) {
$this->runJob($job, $connectionName, $options);
} else {
$this->sleep($options->sleep);
}
// Finally, we will check to see if we have exceeded our memory limits or if
// the queue should restart based on other indications. If so, we'll stop
// this worker and let whatever is "monitoring" it restart the process.
$this->stopIfNecessary($options, $lastRestart);
}
}
重点就是上面的获取下一个job的方法,该方法正确获取job后,就是后面的执行过程了。看一下这个方法:
/**
* Get the next job from the queue connection.
*
* @param \Illuminate\Contracts\Queue\Queue $connection
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
protected function getNextJob($connection, $queue)
{
try {
foreach (explode(',', $queue) as $queue) {
//pop出一个job
if (! is_null($job = $connection->pop($queue))) {
return $job;
}
}
} catch (Exception $e) {
$this->exceptions->report($e);
$this->stopWorkerIfLostConnection($e);
} catch (Throwable $e) {
$this->exceptions->report($e = new FatalThrowableError($e));
$this->stopWorkerIfLostConnection($e);
}
}
上面的$connection->pop($queue)是关键,如何pop的呢?如上注释,connection就是Queue,它就是一个接口,实现类:
首先看DataBaseQueue的实现:
/**
* Pop the next job off of the queue.
*
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
* @throws \Exception|\Throwable
*/
public function pop($queue = null)
{
$queue = $this->getQueue($queue);
try {
//开启一个事务,有查询,有update
$this->database->beginTransaction();
if ($job = $this->getNextAvailableJob($queue)) {
return $this->marshalJob($queue, $job);
}
$this->database->commit();
} catch (Throwable $e) {
$this->database->rollBack();
throw $e;
}
}
上面的操作流程:
1、首先开启一个事务,用于查询和写入;
2、随后尝试获取可用的job;
/**
* Get the next available job for the queue.
*
* @param string|null $queue
* @return \Illuminate\Queue\Jobs\DatabaseJobRecord|null
*/
protected function getNextAvailableJob($queue)
{
$job = $this->database->table($this->table)
->lockForUpdate()
->where('queue', $this->getQueue($queue))
->where(function ($query) {
$this->isAvailable($query);
$this->isReservedButExpired($query);
})
->orderBy('id', 'asc')
->first();
return $job ? new DatabaseJobRecord((object) $job) : null;
}
lockForUpdate就是 select * for update.,所以也必须在事务中。 它会根据下面的查询条件,查询满足的job,根据avaliable_at和reservered_at时间。它会查询reserved_at为null的,且avaliable_at在可用时间范围内的数据。并对查询到的数据增加排他锁。具体的可以参考文章: 数据库排他锁
job数据表:
| jobs | CREATE TABLE `jobs` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`queue` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
`payload` longtext COLLATE utf8mb4_unicode_ci NOT NULL,
`attempts` tinyint(3) unsigned NOT NULL,
`reserved_at` int(10) unsigned DEFAULT NULL,
`available_at` int(10) unsigned NOT NULL,
`created_at` int(10) unsigned NOT NULL,
PRIMARY KEY (`id`),
KEY `jobs_queue_index` (`queue`(191))
) ENGINE=InnoDB AUTO_INCREMENT=418 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci |
如果查询到,会更新reserved_at字段,并commit事务。
至此,上面分析晚了数据库的获取job的过程。再看看redis.
Redis的就更简单了,每个key都维护一个列表,当我们dispatch一个job的时侯,都会rpush到list中。
看一下它的pop方法:
/**
* Pop the next job off of the queue.
*
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
$this->migrate($prefixed = $this->getQueue($queue));
list($job, $reserved) = $this->retrieveNextJob($prefixed);
if ($reserved) {
return new RedisJob(
$this->container, $this, $job,
$reserved, $this->connectionName, $queue ?: $this->default
);
}
}
上面的流程是:
1、migrate整合保留的job和过期的job;
看到这儿应该明白。对于同一个queue,laravel维护了三个队列。
- 正常存储job的list;
- 存储reserved的有序集合;
- 存储过期job的有序集合;
也就是说,pop时,会从这三个地方获取job。
2、migrate之后,从正常队列获取job.
这里就直接用了lua脚本:
/**
* Retrieve the next job from the queue.
*
* @param string $queue
* @return array
*/
protected function retrieveNextJob($queue)
{
return $this->getConnection()->eval(
LuaScripts::pop(), 2, $queue, $queue.':reserved',
$this->availableAt($this->retryAfter)
);
}
public static function pop()
{
return <<<'LUA'
-- Pop the first job off of the queue...
local job = redis.call('lpop', KEYS[1])
local reserved = false
if(job ~= false) then
-- Increment the attempt count and place job on the reserved queue...
reserved = cjson.decode(job)
reserved['attempts'] = reserved['attempts'] + 1
reserved = cjson.encode(reserved)
redis.call('zadd', KEYS[2], ARGV[1], reserved)
end
return {job, reserved}
LUA;
}
看上面代码,如果获取成功,会再把对相应数据再放到存储reserved任务的有序集合中。由于是lua,所以是原子性操作。
上面简单分析了work获取job的过程。觉得laravel的代码设计非常棒,从设计的角度来看,是非常值的学习的。
分别看一下laravel是怎么解决消息队列的经典问题的:
- 消息顺序
文章最开始谈到了队列,不管底层是DB,还是Redis,还是SDS。都会遵循FIFO的原则。先写入的一定会先出。就拿RedisQueue来说,上面也提到了,主要是使用Redis的列表,rpush入,lpop出。但是,可但是,者并不能就说明消费是有序的。如果采用并发消费的情况下,最终业务程序可就不一定是按照顺序执行的。其实这个并不是某个队列的问题了,队列只能做到我存储的消息一定是按照生产者的投递顺序存储,消费者取也会是按顺序。但最终如何被消息,并不是消费者控制的了。
- 消息重复/消息幂等
在生产端,并没有任何措施去保证消息不会重复投递。在消费端,laravel是非常有可能重复消费的。比如一个队列任务执行时间过长,超过了设置的超时时间,此时该任务不会再继续向下执行,而是重新被放到一个特殊队列中,待下次继续消费。
超时机制:
protected function registerTimeoutHandler($job, WorkerOptions $options)
{
if ($this->supportsAsyncSignals()) {
// We will register a signal handler for the alarm signal so that we can kill this
// process if it is running too long because it has frozen. This uses the async
// signals supported in recent versions of PHP to accomplish it conveniently.
pcntl_signal(SIGALRM, function () {
$this->kill(1);
});
pcntl_alarm(
max($this->timeoutForJob($job, $options), 0)
);
}
}
如果消费过程整个是个事务,超时了,事务没有提交,到是没有影响。但如果不是,就会出现问题。比如灌券操作,向百万商户灌券,如果使用laravel队列,那是及有可能出现问题的。
- 消息丢失
不保证
- 消息事务
不保证
- 消息延迟
支持,laravel可支持消息一定时间的延迟进行消息,通过delay来控制,它会把延迟消息存储到一个有序集合中,queue:delayed种。score就是可用时间。每次在进行队列pop时,都会合并一下延迟队列种达到延迟时间的任务,并放到正常的队列中。
看下源码:
public function pop($queue = null)
{
//合并延迟队列任务
$this->migrate($prefixed = $this->getQueue($queue));
list($job, $reserved) = $this->retrieveNextJob($prefixed);
if ($reserved) {
return new RedisJob(
$this->container, $this, $job,
$reserved, $this->connectionName, $queue ?: $this->default
);
}
}
/**
* Migrate any delayed or expired jobs onto the primary queue.
*
* @param string $queue
* @return void
*/
protected function migrate($queue)
{
$this->migrateExpiredJobs($queue.':delayed', $queue);
if (! is_null($this->retryAfter)) {
$this->migrateExpiredJobs($queue.':reserved', $queue);
}
}
/**
* Migrate the delayed jobs that are ready to the regular queue.
*
* @param string $from
* @param string $to
* @return array
*/
public function migrateExpiredJobs($from, $to)
{
return $this->getConnection()->eval(
LuaScripts::migrateExpiredJobs(), 2, $from, $to, $this->currentTime()
);
}
看下lua执行脚本:
/**
* Get the Lua script to migrate expired jobs back onto the queue.
*
* KEYS[1] - The queue we are removing jobs from, for example: queues:foo:reserved
* KEYS[2] - The queue we are moving jobs to, for example: queues:foo
* ARGV[1] - The current UNIX timestamp
*
* @return string
*/
public static function migrateExpiredJobs()
{
return <<<'LUA'
-- Get all of the jobs with an expired "score"...
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])
-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
for i = 1, #val, 100 do
redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
end
end
return val
LUA;
}
- 消费优先级
不支持
- 消费持久化
这个是完全OK的,无论是使用Mysql还是Redis都可以持久化。
- 消息回溯
laravel对于消费的消息是直接删除的,所以是不支持消息回溯的。
微信分享/微信扫码阅读
