RabbitMQ in Practice with Celery
Having learned how to use Celery, I now come back to study it with three questions in mind:
- How does the publisher send task messages to the broker? In what form, and how is this actually implemented?
- What does the broker do with a task message once it receives it?
- How does a worker receive a task and process it?
My initial understanding is that the producer serializes the task and sends it to the broker as a message, where it is stored in a queue; the broker then dispatches task messages to workers according to some strategy, and the worker deserializes the message and executes the task. I know this understanding is rather shallow and may contain mistakes, which is exactly why I am studying with the questions above.
I originally planned to work through this in order, but after reading the source code for publishing a task, I realized I first needed a basic understanding of the message broker, because the code involves concepts like exchange, queue, and route, all of which belong to the messaging middleware. So I will first take a look at RabbitMQ, then study how task messages are sent and received, and finally study RabbitMQ systematically.
RabbitMQ is an open-source implementation of AMQP (Advanced Message Queuing Protocol) written in Erlang. I found an article that introduces it in detail: RabbitMQ介绍.
Here I will only record my own understanding:
RabbitMQ establishes a channel from the task publisher to the task consumer and keeps that channel running reliably. It receives data from the Producer and delivers it to the Consumer, supporting high concurrency, safe data delivery, and scalability.
Several important concepts in RabbitMQ:
1. Exchange. The publisher sends task messages to a RabbitMQ exchange, which handles the rest of the work. After receiving a message, the Exchange routes it to the appropriate queue according to its routing_key. It works much like a network switch.
2. Queue. After a task message is pushed into a queue, it is forwarded to some worker according to a dispatch strategy. The Queue requires the consumer to send back an acknowledgement.
3. Binding. As noted above, the Exchange routes messages to Queues by routing_key, so how is the routing rule defined? This is where bindings come in: in RabbitMQ you specify a binding_key that binds an Exchange to a Queue; the Exchange then matches the message's routing_key against the binding_key to select the target Queue. Note that the binding_key is not always effective; this also depends on the Exchange type. The simplest type is direct, which routes a message to the Queues whose binding key exactly matches the routing key.
4. Channel: a virtual channel built on top of a TCP connection. The reason for using channels is usually explained as follows: "Establishing and closing TCP connections is costly for the OS; doing so frequently hurts system performance, and the number of TCP connections is limited, which also limits the system's ability to handle high concurrency. Creating a Channel inside an existing TCP connection has none of these costs, and a Producer or Consumer can use multiple Channels concurrently to publish or receive."
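The direct-exchange matching rule described above can be illustrated with a toy model. This is not real RabbitMQ code (a real client such as pika talks to a broker over AMQP); it is a minimal sketch of the "routing_key must exactly equal binding_key" rule:

```python
# Toy model of direct-exchange routing; Python lists stand in for real queues.
class DirectExchange:
    def __init__(self):
        self.bindings = []  # list of (binding_key, queue) pairs

    def bind(self, queue, binding_key):
        self.bindings.append((binding_key, queue))

    def publish(self, message, routing_key):
        # A 'direct' exchange delivers only on an exact key match.
        for binding_key, queue in self.bindings:
            if binding_key == routing_key:
                queue.append(message)

exchange = DirectExchange()
celery_queue, hipri_queue = [], []
exchange.bind(celery_queue, 'celery')
exchange.bind(hipri_queue, 'hipri')

exchange.publish('task-1', routing_key='celery')
print(celery_queue)  # ['task-1']
print(hipri_queue)   # []
```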
With the components of RabbitMQ understood, the second question is essentially answered, and it also becomes much easier to follow how Celery operates.
Take the delay asynchronous task in my own project as an example.
I defined an asynchronous task that increments an article's read count:
@celery_app.task
def incr_readtimes(article_id):
    return Article.objects.filter(id=article_id).update(read_times=F('read_times') + 1)
Then, in the view class:
incr_readtimes.delay(slug)
When this statement runs, Celery goes through the whole cycle of publishing the task message, and a consumer receives the message and executes the task.
As for the first question, here is the flow as I understand it:
delay is actually a wrapper around the apply_async method; its job is to send the message asynchronously.
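The wrapper relationship can be sketched in a few lines. The Task class below is a stand-in, not Celery's real class; only the shape of delay forwarding to apply_async mirrors Celery:

```python
# Sketch: delay() simply forwards its positional/keyword arguments
# to apply_async() as the args/kwargs of the task message.
class Task:
    def apply_async(self, args=None, kwargs=None, **options):
        # Real Celery builds and publishes a task message here;
        # we just echo what was received, for illustration.
        return ('sent', args, kwargs, options)

    def delay(self, *args, **kwargs):
        return self.apply_async(args, kwargs)

task = Task()
print(task.delay('slug-1'))  # ('sent', ('slug-1',), {}, {})
```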
Here is the source of apply_async:
def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
link=None, link_error=None, shadow=None, **options):
"""Apply tasks asynchronously by sending a message.
Arguments:
args (Tuple): The positional arguments to pass on to the task.
kwargs (Dict): The keyword arguments to pass on to the task.
countdown (float): Number of seconds into the future that the
task should execute. Defaults to immediate execution.
eta (~datetime.datetime): Absolute time and date of when the task
should be executed. May not be specified if `countdown`
is also supplied.
expires (float, ~datetime.datetime): Datetime or
seconds in the future for the task should expire.
The task won't be executed after the expiration time.
shadow (str): Override task name used in logs/monitoring.
Default is retrieved from :meth:`shadow_name`.
connection (kombu.Connection): Re-use existing broker connection
instead of acquiring one from the connection pool.
retry (bool): If enabled sending of the task message will be
retried in the event of connection loss or failure.
Default is taken from the :setting:`task_publish_retry`
setting. Note that you need to handle the
producer/connection manually for this to work.
retry_policy (Mapping): Override the retry policy used.
See the :setting:`task_publish_retry_policy` setting.
queue (str, kombu.Queue): The queue to route the task to.
This must be a key present in :setting:`task_queues`, or
:setting:`task_create_missing_queues` must be
enabled. See :ref:`guide-routing` for more
information.
exchange (str, kombu.Exchange): Named custom exchange to send the
task to. Usually not used in combination with the ``queue``
argument.
routing_key (str): Custom routing key used to route the task to a
worker server. If in combination with a ``queue`` argument
only used to specify custom routing keys to topic exchanges.
priority (int): The task priority, a number between 0 and 9.
Defaults to the :attr:`priority` attribute.
serializer (str): Serialization method to use.
Can be `pickle`, `json`, `yaml`, `msgpack` or any custom
serialization method that's been registered
with :mod:`kombu.serialization.registry`.
Defaults to the :attr:`serializer` attribute.
compression (str): Optional compression method
to use. Can be one of ``zlib``, ``bzip2``,
or any custom compression methods registered with
:func:`kombu.compression.register`.
Defaults to the :setting:`task_compression` setting.
link (~@Signature): A single, or a list of tasks signatures
to apply if the task returns successfully.
link_error (~@Signature): A single, or a list of task signatures
to apply if an error occurs while executing the task.
producer (kombu.Producer): custom producer to use when publishing
the task.
add_to_parent (bool): If set to True (default) and the task
is applied while executing another task, then the result
will be appended to the parent tasks ``request.children``
attribute. Trailing can also be disabled by default using the
:attr:`trail` attribute
publisher (kombu.Producer): Deprecated alias to ``producer``.
headers (Dict): Message headers to be included in the message.
Returns:
~@AsyncResult: Promise of future evaluation.
"""
options = dict(preopts, **options) if options else preopts
return app.send_task(
self.name, args, kwargs, task_id=task_id, producer=producer,
link=link, link_error=link_error, result_cls=self.AsyncResult,
shadow=shadow, task_type=self,
**options
)
As the source shows, apply_async ends up calling Celery's send_task method.
Here is the source of that method:
def send_task(self, name, args=None, kwargs=None, **options):  # full signature abridged
"""Send task by name.
Supports the same arguments as :meth:`@-Task.apply_async`.
Arguments:
name (str): Name of task to call (e.g., `"tasks.add"`).
result_cls (~@AsyncResult): Specify custom result class.
"""
parent = have_parent = None
amqp = self.amqp
task_id = task_id or uuid()
producer = producer or publisher # XXX compat
router = router or amqp.router
conf = self.conf
message = amqp.create_task_message(
task_id, name, args, kwargs, countdown, eta, group_id,
expires, retries, chord,
maybe_list(link), maybe_list(link_error),
reply_to or self.oid, time_limit, soft_time_limit,
self.conf.task_send_sent_event,
root_id, parent_id, shadow, chain,
)
if connection:
producer = amqp.Producer(connection, auto_declare=False)
with self.producer_or_acquire(producer) as P:
with P.connection._reraise_as_library_errors():
self.backend.on_task_call(P, task_id)
amqp.send_task_message(P, name, message, **options)
result = (result_cls or self.AsyncResult)(task_id)
return result
task_id is a unique id produced by uuid(). The task name comes from the shared_task decorator, which ultimately calls the gen_task_name function (in celery/utils/__init__.py) to generate a name of the form module_name.task_name.
The message created above roughly has the following format:
headers={},
properties={
'correlation_id': task_id,
'reply_to': reply_to or '',
},
body={
'task': name,
'id': task_id,
'args': args,
'kwargs': kwargs,
'group': group_id,
'retries': retries,
'eta': eta,
'expires': expires,
'utc': utc,
'callbacks': callbacks,
'errbacks': errbacks,
'timelimit': (time_limit, soft_time_limit),
'taskset': group_id,
'chord': chord,
},
sent_event={
'uuid': task_id,
'name': name,
'args': saferepr(args),
'kwargs': saferepr(kwargs),
'retries': retries,
'eta': eta,
'expires': expires,
}
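Since Celery serializes the body with JSON by default, we can sketch what the wire payload for the incr_readtimes task might look like. The task name 'dailyblog.tasks.incr_readtimes' and the slug value are hypothetical, and only a subset of the body fields above is shown:

```python
import json
import uuid

# Build a simplified task message body like the one above and
# round-trip it through JSON (Celery's default serializer).
task_id = str(uuid.uuid4())
body = {
    'task': 'dailyblog.tasks.incr_readtimes',  # hypothetical task name
    'id': task_id,
    'args': ['my-article-slug'],               # hypothetical argument
    'kwargs': {},
    'retries': 0,
    'eta': None,
    'expires': None,
}

payload = json.dumps(body)       # what travels to the broker
restored = json.loads(payload)   # what the worker decodes
print(restored['task'])          # dailyblog.tasks.incr_readtimes
```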
This method ultimately reaches the Producer class of the kombu library. Skipping the intermediate steps, it eventually calls the following method of Producer:
def publish(self, message, routing_key=None, mandatory=False,
immediate=False, exchange=None):
"""Publish message.
Arguments:
message (Union[kombu.Message, str, bytes]):
Message to publish.
routing_key (str): Message routing key.
mandatory (bool): Currently not supported.
immediate (bool): Currently not supported.
"""
if isinstance(message, string_t):
message = self.Message(message)
exchange = exchange or self.name
return self.channel.basic_publish(
message,
exchange=exchange,
routing_key=routing_key,
mandatory=mandatory,
immediate=immediate,
)
The publish method above ultimately dispatches to the corresponding Exchange class according to the exchange type; for example, for a direct exchange it uses the DirectExchange class, executes its deliver method, and finally calls the Channel class's _put method. The channel will normally reuse an existing Connection.
The call above specifies the key parameters: exchange, routing_key, and message.
By default, Celery uses a direct exchange named celery; the default Queue is also named celery, and so is the routing_key.
Messages are serialized as JSON by default.
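These defaults can also be stated explicitly in configuration. The setting names below are Celery 4's lowercase settings, and `app` is assumed to be your Celery() instance (a config fragment, not a runnable script):

```python
# Restating Celery's built-in routing defaults explicitly:
app.conf.task_default_queue = 'celery'          # default queue name
app.conf.task_default_exchange = 'celery'       # default exchange name
app.conf.task_default_exchange_type = 'direct'  # default exchange type
app.conf.task_default_routing_key = 'celery'    # default routing key
app.conf.task_serializer = 'json'               # default serializer
```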
When the Producer sends a message it can reuse an established connection instead of opening a new one each time, which is exactly why Channels matter.
So far we know how messages are sent to RabbitMQ, but how does the worker subscribe to the Queue?
After running celery -A dailyblog worker, the worker establishes a connection to the broker. Well, I'm out of time, the workday is over, so I'll continue tomorrow.
Today I read more of Celery's source, mainly to figure out what actually happens after running celery worker.
The command-line code lives in celery/bin/, where the worker.py module defines a worker class:
class worker(Command):
"""Start worker instance.
Examples:
.. code-block:: console
$ celery worker --app=proj -l info
$ celery worker -A proj -l info -Q hipri,lopri
$ celery worker -A proj --concurrency=4
"""
It inherits from the Command base class, which has one particularly important method:
class Command():
def __call__(self, *args, **kwargs):
random.seed() # maybe we were forked.
self.verify_args(args)
try:
ret = self.run(*args, **kwargs)
return ret if ret is not None else EX_OK
except self.UsageError as exc:
self.on_usage_error(exc)
return exc.status
except self.Error as exc:
self.on_error(exc)
return exc.status
When the command instance is invoked, __call__ runs the run method; the worker class overrides run, which ultimately calls the Blueprint class's start method.
The blueprint's default steps:
default_steps = {
'celery.worker.components:Hub',
'celery.worker.components:Pool',
'celery.worker.components:Beat',
'celery.worker.components:Timer',
'celery.worker.components:StateDB',
'celery.worker.components:Consumer',
'celery.worker.autoscale:WorkerComponent',
}
A Blueprint here is analogous to a Flask Blueprint: it is a component.
The Worker itself is a Blueprint, and the Worker component in turn creates a Consumer ('celery.worker.components:Consumer'), which is also a Blueprint. The Worker component is responsible for starting the other major components, while the Consumer is responsible for establishing the connection to the broker.
When the Consumer component starts, its start method is called, which eventually reaches celery.worker.consumer.tasks:Tasks; when that class's start method runs, it creates a TaskConsumer, i.e. the task consumer. The consumer then continuously fetches tasks from the broker's queue and consumes them.
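The consume loop can be pictured with a toy model: messages are pulled off a queue, the task name is looked up in a registry, and the registered function is called. This is an illustration of the idea only, not Celery's actual implementation; the registry decorator below merely mimics what @celery_app.task does:

```python
import json
from collections import deque

registry = {}

def task(fn):
    # Mimic @celery_app.task: register the function under its name.
    registry[fn.__name__] = fn
    return fn

@task
def incr_readtimes(article_id):
    return f'incremented {article_id}'

# A deque stands in for the broker's queue of serialized messages.
broker_queue = deque()
broker_queue.append(json.dumps({'task': 'incr_readtimes', 'args': [42], 'kwargs': {}}))

results = []
while broker_queue:
    message = json.loads(broker_queue.popleft())   # deserialize the task message
    fn = registry[message['task']]                 # look the task up by name
    results.append(fn(*message['args'], **message['kwargs']))

print(results)  # ['incremented 42']
```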
Above I have only sketched the overall logic; to really understand Celery thoroughly, I will need to keep reading the official documentation and source code.