RabbitMQ in Practice with Celery

Having learned how to use Celery, I now come back to it with three questions:

  1. How does the publisher send task messages to the broker? In what form, and how exactly is it implemented?
  2. What does the broker do with a task message once it receives it?
  3. How does a worker receive tasks and process them?

My rough initial understanding: the producer serializes the task and sends it to the broker as a message, where it is stored in a queue; the broker then dispatches task messages to workers according to some strategy, and a worker deserializes a message and executes the task. I know this picture is fairly shallow and probably contains mistakes, which is exactly why I'm digging in with the questions above.

I originally planned to study this in order, but after reading the source for how the producer publishes a task I realized I first need a basic grasp of the message broker, because concepts like exchange, queue and routing all come from the messaging middleware. So I'll look at RabbitMQ first, then study how task messages are sent and how they are received, and leave a systematic study of RabbitMQ for the end.

RabbitMQ is an open-source implementation of AMQP (the Advanced Message Queuing Protocol), written in Erlang. I followed an article that covers it in detail: RabbitMQ介绍 (an introduction to RabbitMQ).

Here I'll just record my own understanding:

RabbitMQ establishes a pipeline from the task publisher to the task consumer and keeps that pipeline running properly: it receives data from the Producer and delivers it to the Consumer, with support for high concurrency, safe delivery, and scalability.

A few important concepts in RabbitMQ:

1. Exchange. The publisher sends task messages to a RabbitMQ exchange and leaves the rest of the work to it. When the exchange receives a message, it routes the message to the appropriate queue according to the routing key. It behaves much like a switch.

2. Queue. After a task message is pushed onto a queue, it is forwarded to some worker according to a dispatch strategy. The queue also requires the consumer to send back an acknowledgment.

3. Binding. As said above, the exchange routes messages to queues based on the routing key, but how is that routing rule defined? This is where bindings come in: RabbitMQ binds an exchange to a queue under a specified binding key, and the exchange then matches a message's routing key against the binding keys to pick the target queue. The binding key is not always decisive, though; its effect also depends on the exchange type. The simplest type is direct, which routes a message to exactly those queues whose binding key matches the routing key.

4. Channel, a virtual channel built on top of a TCP connection. As for why it exists, the article puts it like this: "For the OS, establishing and tearing down TCP connections is costly; doing so frequently hurts system performance badly, and the number of TCP connections is limited, which in turn caps the system's ability to handle high concurrency. Creating a Channel inside an existing TCP connection carries none of these costs, and a Producer or Consumer can use multiple channels concurrently to publish or receive." (The sketch below wires these pieces together.)
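To make these concepts concrete, here is a minimal sketch using the pika client (plain RabbitMQ, not Celery; the exchange, queue and key names are made up, and it assumes a broker on localhost):

import pika

# one TCP connection, with a lightweight channel on top of it
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# declare a direct exchange and a queue, then bind them under a binding key
channel.exchange_declare(exchange='demo_exchange', exchange_type='direct')
channel.queue_declare(queue='demo_queue', durable=True)
channel.queue_bind(queue='demo_queue', exchange='demo_exchange',
                   routing_key='demo_key')

# the routing key matches the binding key exactly, so the direct exchange
# routes this message into demo_queue
channel.basic_publish(exchange='demo_exchange', routing_key='demo_key',
                      body='hello')
connection.close()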

Once the building blocks of RabbitMQ are understood, the second question is essentially answered, and it also becomes much easier to follow how Celery operates.

Take the delay async task in my own project as an example.

I defined an asynchronous task that increments an article's read count:

from django.db.models import F

@celery_app.task
def incr_readtimes(article_id):
    return Article.objects.filter(id=article_id).update(read_times=F('read_times') + 1)

Then, in the view class:

incr_readtimes.delay(slug)

Once that line executes, Celery publishes the task message, and a consumer receives it and runs the task.

As for the first question, here is the flow as far as I've traced it:

delay is actually a thin wrapper around the apply_async method; its job is to send the task message asynchronously.
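Since delay just forwards its positional and keyword arguments, the call in the view could equally be written as:

incr_readtimes.apply_async(args=(slug,))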

Here is the source of apply_async:

def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                link=None, link_error=None, shadow=None, **options):
    """Apply tasks asynchronously by sending a message.

    Arguments:
        args (Tuple): The positional arguments to pass on to the task.

        kwargs (Dict): The keyword arguments to pass on to the task.

        countdown (float): Number of seconds into the future that the
            task should execute.  Defaults to immediate execution.

        eta (~datetime.datetime): Absolute time and date of when the task
            should be executed.  May not be specified if `countdown`
            is also supplied.

        expires (float, ~datetime.datetime): Datetime or
            seconds in the future for the task should expire.
            The task won't be executed after the expiration time.

        shadow (str): Override task name used in logs/monitoring.
            Default is retrieved from :meth:`shadow_name`.

        connection (kombu.Connection): Re-use existing broker connection
            instead of acquiring one from the connection pool.

        retry (bool): If enabled sending of the task message will be
            retried in the event of connection loss or failure.
            Default is taken from the :setting:`task_publish_retry`
            setting.  Note that you need to handle the
            producer/connection manually for this to work.

        retry_policy (Mapping): Override the retry policy used.
            See the :setting:`task_publish_retry_policy` setting.

        queue (str, kombu.Queue): The queue to route the task to.
            This must be a key present in :setting:`task_queues`, or
            :setting:`task_create_missing_queues` must be
            enabled.  See :ref:`guide-routing` for more
            information.

        exchange (str, kombu.Exchange): Named custom exchange to send the
            task to.  Usually not used in combination with the ``queue``
            argument.

        routing_key (str): Custom routing key used to route the task to a
            worker server.  If in combination with a ``queue`` argument
            only used to specify custom routing keys to topic exchanges.

        priority (int): The task priority, a number between 0 and 9.
            Defaults to the :attr:`priority` attribute.

        serializer (str): Serialization method to use.
            Can be `pickle`, `json`, `yaml`, `msgpack` or any custom
            serialization method that's been registered
            with :mod:`kombu.serialization.registry`.
            Defaults to the :attr:`serializer` attribute.

        compression (str): Optional compression method
            to use.  Can be one of ``zlib``, ``bzip2``,
            or any custom compression methods registered with
            :func:`kombu.compression.register`.
            Defaults to the :setting:`task_compression` setting.

        link (~@Signature): A single, or a list of tasks signatures
            to apply if the task returns successfully.

        link_error (~@Signature): A single, or a list of task signatures
            to apply if an error occurs while executing the task.

        producer (kombu.Producer): custom producer to use when publishing
            the task.

        add_to_parent (bool): If set to True (default) and the task
            is applied while executing another task, then the result
            will be appended to the parent tasks ``request.children``
            attribute.  Trailing can also be disabled by default using the
            :attr:`trail` attribute

        publisher (kombu.Producer): Deprecated alias to ``producer``.

        headers (Dict): Message headers to be included in the message.

    Returns:
        ~@AsyncResult: Promise of future evaluation.
    """
    # (argument checks and the computation of `preopts` elided here)
    options = dict(preopts, **options) if options else preopts
    return app.send_task(
        self.name, args, kwargs, task_id=task_id, producer=producer,
        link=link, link_error=link_error, result_cls=self.AsyncResult,
        shadow=shadow, task_type=self,
        **options
    )

As the source shows, apply_async ends up calling the Celery app's send_task method.

Next up, the source of that method:

def send_task(self, name, args=None, kwargs=None, countdown=None,
              eta=None, task_id=None, producer=None, connection=None,
              router=None, result_cls=None, expires=None,
              publisher=None, link=None, link_error=None,
              add_to_parent=True, group_id=None, retries=0, chord=None,
              reply_to=None, time_limit=None, soft_time_limit=None,
              root_id=None, parent_id=None, route_name=None,
              shadow=None, chain=None, task_type=None, **options):
    """Send task by name.

    Supports the same arguments as :meth:`@-Task.apply_async`.

    Arguments:
        name (str): Name of task to call (e.g., `"tasks.add"`).
        result_cls (~@AsyncResult): Specify custom result class.
    """
    parent = have_parent = None
    amqp = self.amqp
    task_id = task_id or uuid()
    producer = producer or publisher  # XXX compat
    router = router or amqp.router
    conf = self.conf
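    # (eager-mode check and route resolution elided here)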

    message = amqp.create_task_message(
        task_id, name, args, kwargs, countdown, eta, group_id,
        expires, retries, chord,
        maybe_list(link), maybe_list(link_error),
        reply_to or self.oid, time_limit, soft_time_limit,
        self.conf.task_send_sent_event,
        root_id, parent_id, shadow, chain,
    )

    if connection:
        producer = amqp.Producer(connection, auto_declare=False)
    with self.producer_or_acquire(producer) as P:
        with P.connection._reraise_as_library_errors():
            self.backend.on_task_call(P, task_id)
            amqp.send_task_message(P, name, message, **options)
    result = (result_cls or self.AsyncResult)(task_id)

    return result

task_id is a unique id produced by uuid(). The task name comes from the task decorator (e.g. shared_task), which ultimately calls the gen_task_name function (in utils/__init__.py); in other words, the name is generated from the module path of the task function plus the function name.
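For the task defined earlier that yields something like this (assuming the task lives in dailyblog/tasks.py):

>>> incr_readtimes.name
'dailyblog.tasks.incr_readtimes'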

The message created above has roughly the following structure:

headers={},
properties={
    'correlation_id': task_id,
    'reply_to': reply_to or '',
},
body={
    'task': name,
    'id': task_id,
    'args': args,
    'kwargs': kwargs,
    'group': group_id,
    'retries': retries,
    'eta': eta,
    'expires': expires,
    'utc': utc,
    'callbacks': callbacks,
    'errbacks': errbacks,
    'timelimit': (time_limit, soft_time_limit),
    'taskset': group_id,
    'chord': chord,
},
sent_event={
    'uuid': task_id,
    'name': name,
    'args': saferepr(args),
    'kwargs': saferepr(kwargs),
    'retries': retries,
    'eta': eta,
    'expires': expires,
}
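Filled in for the incr_readtimes task from earlier, the body part would look roughly like this (the id and the argument value are made up, and the module path is an assumption):

body = {
    'task': 'dailyblog.tasks.incr_readtimes',  # assumed module path
    'id': '3f6c0d6e-8a1b-4a68-9a2f-1d2e3c4b5a69',  # uuid(), made up here
    'args': ['some-article-slug'],  # whatever we passed to delay()
    'kwargs': {},
    'group': None,
    'retries': 0,
    'eta': None,
    'expires': None,
    'utc': True,
    'callbacks': None,
    'errbacks': None,
    'timelimit': (None, None),
    'taskset': None,
    'chord': None,
}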

This method eventually hands off to kombu's Producer. Skipping the intermediate steps, publishing bottoms out in a method like the one below (this particular snippet is kombu's Exchange.publish; Producer-level publishing ends at the same channel.basic_publish call):

def publish(self, message, routing_key=None, mandatory=False,
            immediate=False, exchange=None):
    """Publish message.

    Arguments:
        message (Union[kombu.Message, str, bytes]):
            Message to publish.
        routing_key (str): Message routing key.
        mandatory (bool): Currently not supported.
        immediate (bool): Currently not supported.
    """
    if isinstance(message, string_t):
        message = self.Message(message)
    exchange = exchange or self.name
    return self.channel.basic_publish(
        message,
        exchange=exchange,
        routing_key=routing_key,
        mandatory=mandatory,
        immediate=immediate,
    )
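For comparison, here is a rough sketch of driving kombu's Producer directly; the broker URL and message body are assumptions, but the exchange/queue/routing-key trio mirrors Celery's defaults:

from kombu import Connection, Exchange, Queue

task_exchange = Exchange('celery', type='direct')
task_queue = Queue('celery', task_exchange, routing_key='celery')

with Connection('amqp://guest:guest@localhost//') as conn:
    producer = conn.Producer(serializer='json')
    producer.publish(
        {'task': 'dailyblog.tasks.incr_readtimes', 'args': ['some-slug']},
        exchange=task_exchange,
        routing_key='celery',
        declare=[task_queue],  # make sure the queue exists and is bound
    )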

Within kombu, the send path then dispatches on the exchange type to the corresponding Exchange class: for the direct type, for example, the DirectExchange class is used, its deliver method runs, and it ultimately calls the Channel class's _put method. A channel generally reuses an existing Connection.

That call fills in the key parameters: exchange, routing_key and the message itself.

By default, Celery sets the exchange type to direct and names it celery; the default queue is also named celery, and so is the routing key.

Messages are serialized as JSON by default.
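Spelled out as Celery settings (these are the stock values, using Celery 4's lowercase setting names):

task_default_queue = 'celery'
task_default_exchange = 'celery'
task_default_exchange_type = 'direct'
task_default_routing_key = 'celery'
task_serializer = 'json'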

When sending a message, the Producer can reuse an established connection instead of building a new one each time, which is exactly where the Channel proves its worth.

At this point we know how messages get to RabbitMQ; so how does a worker subscribe to the Queue?

After celery -A dailyblog worker is executed, the worker establishes a connection to the broker. Well, I'm out of time; it's the end of the workday and I'm heading home. I'll continue tomorrow.

Today I read more of Celery's source, mainly to work out what actually happens after celery worker runs.

The command-line code lives under celery/bin/; there a worker.py module defines a worker class:

class worker(Command):
    """Start worker instance.

    Examples:
        .. code-block:: console

            $ celery worker --app=proj -l info
            $ celery worker -A proj -l info -Q hipri,lopri

            $ celery worker -A proj --concurrency=4
    """

It inherits from the Command class, and that base class has one particularly important method:

class Command(object):

    def __call__(self, *args, **kwargs):
        random.seed()  # maybe we were forked.
        self.verify_args(args)
        try:
            ret = self.run(*args, **kwargs)
            return ret if ret is not None else EX_OK
        except self.UsageError as exc:
            self.on_usage_error(exc)
            return exc.status
        except self.Error as exc:
            self.on_error(exc)
            return exc.status

When the command object is called, __call__ invokes the run method; the worker class overrides run, and that eventually calls the Blueprint class's start method.

The blueprint's default steps:

default_steps = {
    'celery.worker.components:Hub',
    'celery.worker.components:Pool',
    'celery.worker.components:Beat',
    'celery.worker.components:Timer',
    'celery.worker.components:StateDB',
    'celery.worker.components:Consumer',
    'celery.worker.autoscale:WorkerComponent',
}

A Blueprint is the counterpart of Flask's Blueprint: a pluggable component.

Here the Worker is itself a Blueprint, and the Worker component in turn creates the Consumer ('celery.worker.components:Consumer'), which is another Blueprint. The Worker component is responsible for starting the other major components, while the Consumer is responsible for establishing the connection to the broker.

When the Consumer component starts, its start method runs and eventually reaches celery.worker.consumer.tasks:Tasks; when that class's start method is called, it creates a TaskConsumer, the actual task consumer, which continuously pulls task messages off the broker's queue and consumes them.
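At the kombu level, that consuming side boils down to something like this sketch (again assuming a local broker; a real worker does far more, such as looking up the registered task and tracking results):

from kombu import Connection, Exchange, Queue

task_queue = Queue('celery', Exchange('celery', type='direct'),
                   routing_key='celery')

def on_task_message(body, message):
    # a real worker would look up the task named in the body and execute it
    print('got task message:', body)
    message.ack()  # the consumer acknowledgment the Queue requires

with Connection('amqp://guest:guest@localhost//') as conn:
    with conn.Consumer(task_queue, callbacks=[on_task_message]):
        conn.drain_events(timeout=5)  # block briefly, handling deliveries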

The above is only a broad outline of the flow; to really understand Celery inside out, I'll need to keep working through the official docs and the source.

--------EOF---------