celery架构
celery架构
celery的组件:
1、Celery Beat:任务调度器,Beat进程读取配置文件内容,周期性地将配置中到期的任务发送到队列;
2、Celery worker:真正执行任务的组件,可在多台服务器部署;
3、Broker:消息代理,消息中间件。接受任务,存进队列,再按顺序发给worker;
4、Result Backend:任务执行完毕后存储状态信息和结果;
5、Producer 说白了,就是定义任务的部分。
具体如下图(从别的网站上copy下来的):
就分别说下这几个组件。
1、Celery beat
Celery beat 是一个任务调度器,它以独立进程的形式存在。Celery beat 进程会读取配置文件的内容,周期性地将执行任务的请求发送给任务队列。系统管理员可以选择关闭或者开启 Celery beat。同时在一个 Celery 系统中,只能存在一个 Celery beat 调度器。
启动beat:
celery beat -l info
2、Celery worker
Celery worker 就是执行任务的一方,它负责接收任务处理中间方发来的任务处理请求,完成这些任务,并且返回任务处理的结果。Celery worker 对应的就是操作系统中的一个进程。Celery 支持分布式部署和横向扩展,我们可以在多个节点增加 Celery worker 的数量来增加系统的高可用性。在分布式系统中,我们也可以在不同节点上分配执行不同任务的 Celery worker 来达到模块化的目的。
一般启动worker,通过如下命令:
celery -A tasks worker -l -info
A用来指定app的名字 l制定了日志的level,更详细的命令可以通过celery worker --help查看。
重启worker需要再创建一个实例,可使用命令 celery multi:
celery multi start 1 -A proj -l info -c4 --pidfile=/var/run/celery/%n.pid
关于worker的并发,我们可以通过-concurrency指定,默认情况下是用multiprocessing执行并发任务,通过-c可以指定进程或线程数。默认情况下,进程数和CPU数目相同。进程数不是一定是越多越好。
celery要想实现多机器并行,需要在每个机器的配置中都连接到同一个消息中间件,然后中间件会自动发送任务给不同机器的worker。后续要研究Queue以及制定任务到特定的worker.
3、Broker
broker接收任务生产者发送过来的任务处理消息,存进队列之后再进行调度,分发给任务消费方 (celery worker)。因为任务处理是基于 message(消息) 的,所以我们一般选择 RabbitMQ、Redis 等消息队列或者数据库作为 Celery 的 message broker。celery默认使用RabbitMQ。 一般我们可以在配置文件配置,或者在创建应用时指定。或者在启动worker时指定。
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
app = Celery('tasks',broker_url='redis://:password@hostname:port/db_number')
celery -A tasks worker -l info -b amqp://guest:guest@localhost:5672//
4、Result Backend
该组件用来存储任务执行状态和结果,可以选择RabbitMQ,Redis,Memcache,SQL等等。
配置和broker差不多。
result_backend = 'redis://localhost/0'
或者
result_backend = 'redis://'
app = Celery('tasks',result_backend='redis://:password@hostname:port/db_number')
5、Producer
生产者是任务发布者,也就是说除了beat调度器可以周期性地发布任务,生产者同样可以发布任务。
一般用delay启动任务。
这个就比较简单了了。比如在django中可以直接调用api去执行。只需在对应的URL视图中定义就可以了。
celery的loader:
loader定义了如何去读取配置参数;当worker启动时会发生什么以及tasks什么时候执行等等。
loader的基类定义了:
- 如何去读取配置参数 ,config_from_object或者config_from_envvar.
- tasks什么时候执行 loader类的方法 on_task_init().
- 当worker启动时会发生什么 loader类的方法 on_worker_init().
- 当worker关闭时会发生什么 loader类的方法 on_worker_shutdown().
- What modules are imported to find tasks?
loader:
'app': 'celery.loaders.app:AppLoader',
'default': 'celery.loaders.default:Loader',
'django': 'djcelery.loaders:DjangoLoader',
后续有两点需要我去做的:
1、分布式,在多台机器上跑任务。
其实在我看来,这个还是比较好理解的,我们在一个机器上配置好消息中间件,比如RabbitMQ,然后在多个不同的机器上部署多个worker,每个worker的BROKE_URL都指定到上一个RabbitMQ,然后用调度器去指定如何分配任务到不同的worker上去执行。
当然,这是我的理解,接下来我要实际操作一下。
2、源码。研究源码,摸透原理。
微信分享/微信扫码阅读