laravel的queue学习

今天我们组长提到要部署多个work来消息异步任务,想到其如何在多个work间调度任务,如何做到不被不同的work重复消费,如何能正常分配job,然后我们就一起看了laravel的源码,又一次被laravel的设计思想惊叹到了,queue的设计令人惊叹,非常值得学习。今天的事情页告诉我,当你带着问题去研究代码的时侯是最有效率的。

好了,废话不多说了,开始步入正题。

关于queue的使用,可以直接参考官网: laravel5.6-Queues

那么回到刚开始提到的问题,laravel到底是怎么保证不被不同work重复消费呢?其实说白了,无论底层用的是数据库,还是Redis,还是其他如SDS,都是依赖于自身驱动的机制。关系型数据库就利用Mysql的锁机制;Redis就是队列等等。

看看它的源码。

1、首先是一直listen的deamon 进程

 public function daemon($connectionName, $queue, WorkerOptions $options)
    {
       //用于进程间通信,接收其他进程的信号,SIGTERM,SIGUSR2 ,SIGCNT等
        $this->listenForSignals();

        $lastRestart = $this->getTimestampOfLastQueueRestart();

        while (true) {
            // Before reserving any jobs, we will make sure this queue is not paused and
            // if it is we will just pause this worker for a given amount of time and
            // make sure we do not need to kill this worker process off completely.
            if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
                $this->pauseWorker($options, $lastRestart);

                continue;
            }

            // First, we will attempt to get the next job off of the queue. We will also
            // register the timeout handler and reset the alarm for this job so it is
            // not stuck in a frozen state forever. Then, we can fire off this job.
            $job = $this->getNextJob(
                $this->manager->connection($connectionName), $queue
            );

            $this->registerTimeoutHandler($job, $options);

            // If the daemon should run (not in maintenance mode, etc.), then we can run
            // fire off this job for processing. Otherwise, we will need to sleep the
            // worker so no more jobs are processed until they should be processed.
            if ($job) {
                $this->runJob($job, $connectionName, $options);
            } else {
                $this->sleep($options->sleep);
            }

            // Finally, we will check to see if we have exceeded our memory limits or if
            // the queue should restart based on other indications. If so, we'll stop
            // this worker and let whatever is "monitoring" it restart the process.
            $this->stopIfNecessary($options, $lastRestart);
        }
    }

重点就是上面的获取下一个job的方法,该方法正确获取job后,就是后面的执行过程了。看一下这个方法:

 /**
     * Get the next job from the queue connection.
     *
     * @param  \Illuminate\Contracts\Queue\Queue  $connection
     * @param  string  $queue
     * @return \Illuminate\Contracts\Queue\Job|null
     */
    protected function getNextJob($connection, $queue)
    {
        try {
            foreach (explode(',', $queue) as $queue) {
                //pop出一个job
                if (! is_null($job = $connection->pop($queue))) {
                    return $job;
                }
            }
        } catch (Exception $e) {
            $this->exceptions->report($e);

            $this->stopWorkerIfLostConnection($e);
        } catch (Throwable $e) {
            $this->exceptions->report($e = new FatalThrowableError($e));

            $this->stopWorkerIfLostConnection($e);
        }
    }

上面的$connection->pop($queue)是关键,如何pop的呢?如上注释,connection就是Queue,它就是一个接口,实现类:

首先看DataBaseQueue的实现:

/**
     * Pop the next job off of the queue.
     *
     * @param  string  $queue
     * @return \Illuminate\Contracts\Queue\Job|null
     * @throws \Exception|\Throwable
     */
    public function pop($queue = null)
    {
        $queue = $this->getQueue($queue);

        try {
            //开启一个事务,有查询,有update
            $this->database->beginTransaction();

            if ($job = $this->getNextAvailableJob($queue)) {
                return $this->marshalJob($queue, $job);
            }

            $this->database->commit();
        } catch (Throwable $e) {
            $this->database->rollBack();

            throw $e;
        }
    }

上面的操作流程:

1、首先开启一个事务,用于查询和写入;

2、随后尝试获取可用的job;

  /**
     * Get the next available job for the queue.
     *
     * @param  string|null  $queue
     * @return \Illuminate\Queue\Jobs\DatabaseJobRecord|null
     */
    protected function getNextAvailableJob($queue)
    {
        $job = $this->database->table($this->table)
                    ->lockForUpdate()
                    ->where('queue', $this->getQueue($queue))
                    ->where(function ($query) {
                        $this->isAvailable($query);
                        $this->isReservedButExpired($query);
                    })
                    ->orderBy('id', 'asc')
                    ->first();

        return $job ? new DatabaseJobRecord((object) $job) : null;
    }

lockForUpdate就是 select * for update.,所以也必须在事务中。 它会根据下面的查询条件,查询满足的job,根据avaliable_at和reservered_at时间。它会查询reserved_at为null的,且avaliable_at在可用时间范围内的数据。并对查询到的数据增加排他锁。具体的可以参考文章: 数据库排他锁

job数据表:

| jobs  | CREATE TABLE `jobs` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `queue` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
  `payload` longtext COLLATE utf8mb4_unicode_ci NOT NULL,
  `attempts` tinyint(3) unsigned NOT NULL,
  `reserved_at` int(10) unsigned DEFAULT NULL,
  `available_at` int(10) unsigned NOT NULL,
  `created_at` int(10) unsigned NOT NULL,
  PRIMARY KEY (`id`),
  KEY `jobs_queue_index` (`queue`(191))
) ENGINE=InnoDB AUTO_INCREMENT=418 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci |

如果查询到,会更新reserved_at字段,并commit事务。

至此,上面分析晚了数据库的获取job的过程。再看看redis.

Redis的就更简单了,每个key都维护一个列表,当我们dispatch一个job的时侯,都会rpush到list中。

看一下它的pop方法:

 /**
     * Pop the next job off of the queue.
     *
     * @param  string  $queue
     * @return \Illuminate\Contracts\Queue\Job|null
     */
    public function pop($queue = null)
    {
        $this->migrate($prefixed = $this->getQueue($queue));

        list($job, $reserved) = $this->retrieveNextJob($prefixed);

        if ($reserved) {
            return new RedisJob(
                $this->container, $this, $job,
                $reserved, $this->connectionName, $queue ?: $this->default
            );
        }
    }

上面的流程是:

1、migrate整合保留的job和过期的job;

看到这儿应该明白。对于同一个queue,laravel维护了三个队列。

  • 正常存储job的list;
  • 存储reserved的有序集合;
  • 存储过期job的有序集合;

也就是说,pop时,会从这三个地方获取job。

2、migrate之后,从正常队列获取job.

这里就直接用了lua脚本:

 /**
     * Retrieve the next job from the queue.
     *
     * @param  string  $queue
     * @return array
     */
    protected function retrieveNextJob($queue)
    {
        return $this->getConnection()->eval(
            LuaScripts::pop(), 2, $queue, $queue.':reserved',
            $this->availableAt($this->retryAfter)
        );
    }



public static function pop()
    {
        return <<<'LUA'
-- Pop the first job off of the queue...
local job = redis.call('lpop', KEYS[1])
local reserved = false

if(job ~= false) then
    -- Increment the attempt count and place job on the reserved queue...
    reserved = cjson.decode(job)
    reserved['attempts'] = reserved['attempts'] + 1
    reserved = cjson.encode(reserved)
    redis.call('zadd', KEYS[2], ARGV[1], reserved)
end

return {job, reserved}
LUA;
    }

看上面代码,如果获取成功,会再把对相应数据再放到存储reserved任务的有序集合中。由于是lua,所以是原子性操作。

上面简单分析了work获取job的过程。觉得laravel的代码设计非常棒,从设计的角度来看,是非常值的学习的。

分别看一下laravel是怎么解决消息队列的经典问题的:

  • 消息顺序

文章最开始谈到了队列,不管底层是DB,还是Redis,还是SDS。都会遵循FIFO的原则。先写入的一定会先出。就拿RedisQueue来说,上面也提到了,主要是使用Redis的列表,rpush入,lpop出。但是,可但是,者并不能就说明消费是有序的。如果采用并发消费的情况下,最终业务程序可就不一定是按照顺序执行的。其实这个并不是某个队列的问题了,队列只能做到我存储的消息一定是按照生产者的投递顺序存储,消费者取也会是按顺序。但最终如何被消息,并不是消费者控制的了。

  • 消息重复/消息幂等

在生产端,并没有任何措施去保证消息不会重复投递。在消费端,laravel是非常有可能重复消费的。比如一个队列任务执行时间过长,超过了设置的超时时间,此时该任务不会再继续向下执行,而是重新被放到一个特殊队列中,待下次继续消费。

超时机制:

  protected function registerTimeoutHandler($job, WorkerOptions $options)
    {
        if ($this->supportsAsyncSignals()) {
            // We will register a signal handler for the alarm signal so that we can kill this
            // process if it is running too long because it has frozen. This uses the async
            // signals supported in recent versions of PHP to accomplish it conveniently.
            pcntl_signal(SIGALRM, function () {
                $this->kill(1);
            });

            pcntl_alarm(
                max($this->timeoutForJob($job, $options), 0)
            );
        }
    }

如果消费过程整个是个事务,超时了,事务没有提交,到是没有影响。但如果不是,就会出现问题。比如灌券操作,向百万商户灌券,如果使用laravel队列,那是及有可能出现问题的。

  • 消息丢失

不保证

  • 消息事务

不保证

  • 消息延迟

支持,laravel可支持消息一定时间的延迟进行消息,通过delay来控制,它会把延迟消息存储到一个有序集合中,queue:delayed种。score就是可用时间。每次在进行队列pop时,都会合并一下延迟队列种达到延迟时间的任务,并放到正常的队列中。

看下源码:

 public function pop($queue = null)
    {
       //合并延迟队列任务
        $this->migrate($prefixed = $this->getQueue($queue));

        list($job, $reserved) = $this->retrieveNextJob($prefixed);

        if ($reserved) {
            return new RedisJob(
                $this->container, $this, $job,
                $reserved, $this->connectionName, $queue ?: $this->default
            );
        }
    }

    /**
     * Migrate any delayed or expired jobs onto the primary queue.
     *
     * @param  string  $queue
     * @return void
     */
    protected function migrate($queue)
    {
        $this->migrateExpiredJobs($queue.':delayed', $queue);

        if (! is_null($this->retryAfter)) {
            $this->migrateExpiredJobs($queue.':reserved', $queue);
        }
    }

    /**
     * Migrate the delayed jobs that are ready to the regular queue.
     *
     * @param  string  $from
     * @param  string  $to
     * @return array
     */
    public function migrateExpiredJobs($from, $to)
    {
        return $this->getConnection()->eval(
            LuaScripts::migrateExpiredJobs(), 2, $from, $to, $this->currentTime()
        );
    }

看下lua执行脚本:

    /**
     * Get the Lua script to migrate expired jobs back onto the queue.
     *
     * KEYS[1] - The queue we are removing jobs from, for example: queues:foo:reserved
     * KEYS[2] - The queue we are moving jobs to, for example: queues:foo
     * ARGV[1] - The current UNIX timestamp
     *
     * @return string
     */
    public static function migrateExpiredJobs()
    {
        return <<<'LUA'
-- Get all of the jobs with an expired "score"...
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])

-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
    redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)

    for i = 1, #val, 100 do
        redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
    end
end

return val
LUA;
    }

  • 消费优先级

不支持

  • 消费持久化

这个是完全OK的,无论是使用Mysql还是Redis都可以持久化。

  • 消息回溯

laravel对于消费的消息是直接删除的,所以是不支持消息回溯的。

--------EOF---------
微信分享/微信扫码阅读