celery架构

celery架构

celery的组件:

1、Celery Beat:任务调度器,Beat进程读取配置文件内容,周期性地将配置中到期的任务发送到队列;
2、Celery worker:真正执行任务的组件,可在多台服务器部署;
3、Broker:消息代理,消息中间件。接受任务,存进队列,再按顺序发给worker;
4、Result Backend:任务执行完毕后存储状态信息和结果;
5、Producer 说白了,就是定义任务的部分。

具体如下图(从别的网站上copy下来的):

就分别说下这几个组件。

1、Celery beat

Celery beat 是一个任务调度器,它以独立进程的形式存在。Celery beat 进程会读取配置文件的内容,周期性地将执行任务的请求发送给任务队列。系统管理员可以选择关闭或者开启 Celery beat。同时在一个 Celery 系统中,只能存在一个 Celery beat 调度器。

启动beat:

celery beat -l info

2、Celery worker

Celery worker 就是执行任务的一方,它负责接收任务处理中间方发来的任务处理请求,完成这些任务,并且返回任务处理的结果。Celery worker 对应的就是操作系统中的一个进程。Celery 支持分布式部署和横向扩展,我们可以在多个节点增加 Celery worker 的数量来增加系统的高可用性。在分布式系统中,我们也可以在不同节点上分配执行不同任务的 Celery worker 来达到模块化的目的。

一般启动worker,通过如下命令:

celery -A tasks worker -l -info

A用来指定app的名字 l制定了日志的level,更详细的命令可以通过celery worker --help查看。

重启worker需要再创建一个实例,可使用命令 celery multi:

 celery multi start 1 -A proj -l info -c4 --pidfile=/var/run/celery/%n.pid

关于worker的并发,我们可以通过-concurrency指定,默认情况下是用multiprocessing执行并发任务,通过-c可以指定进程或线程数。默认情况下,进程数和CPU数目相同。进程数不是一定是越多越好。

celery要想实现多机器并行,需要在每个机器的配置中都连接到同一个消息中间件,然后中间件会自动发送任务给不同机器的worker。后续要研究Queue以及制定任务到特定的worker.

3、Broker

broker接收任务生产者发送过来的任务处理消息,存进队列之后再进行调度,分发给任务消费方 (celery worker)。因为任务处理是基于 message(消息) 的,所以我们一般选择 RabbitMQ、Redis 等消息队列或者数据库作为 Celery 的 message broker。celery默认使用RabbitMQ。 一般我们可以在配置文件配置,或者在创建应用时指定。或者在启动worker时指定。

BROKER_URL = 'amqp://guest:guest@localhost:5672//'

app  = Celery('tasks',broker_url='redis://:password@hostname:port/db_number')

celery -A tasks worker -l info -b amqp://guest:guest@localhost:5672//

4、Result Backend

该组件用来存储任务执行状态和结果,可以选择RabbitMQ,Redis,Memcache,SQL等等。

配置和broker差不多。

result_backend = 'redis://localhost/0'
或者
result_backend = 'redis://'

app  = Celery('tasks',result_backend='redis://:password@hostname:port/db_number')

5、Producer

生产者是任务发布者,也就是说除了beat调度器可以周期性地发布任务,生产者同样可以发布任务。

一般用delay启动任务。
这个就比较简单了了。比如在django中可以直接调用api去执行。只需在对应的URL视图中定义就可以了。

celery的loader:

loader定义了如何去读取配置参数;当worker启动时会发生什么以及tasks什么时候执行等等。
loader的基类定义了:

  1. 如何去读取配置参数 ,config_from_object或者config_from_envvar.
  2. tasks什么时候执行 loader类的方法 on_task_init().
  3. 当worker启动时会发生什么 loader类的方法 on_worker_init().
  4. 当worker关闭时会发生什么 loader类的方法 on_worker_shutdown().
  5. What modules are imported to find tasks?

loader:

'app': 'celery.loaders.app:AppLoader',
'default': 'celery.loaders.default:Loader',
'django': 'djcelery.loaders:DjangoLoader',

后续有两点需要我去做的:

1、分布式,在多台机器上跑任务。
其实在我看来,这个还是比较好理解的,我们在一个机器上配置好消息中间件,比如RabbitMQ,然后在多个不同的机器上部署多个worker,每个worker的BROKE_URL都指定到上一个RabbitMQ,然后用调度器去指定如何分配任务到不同的worker上去执行。
当然,这是我的理解,接下来我要实际操作一下。

2、源码。研究源码,摸透原理。

--------EOF---------
微信分享/微信扫码阅读