Celery初步使用

对于网站来说,给用户一个较好的体验是很重要的事情,其中最重要的指标就是网站的浏览速度。因此服务端要从各个方面对网站性能进行优化,比如可采用CDN加载一些公共静态文件,如js和css;合并css或者js从而减少静态文件的请求等等.....还有一种方法是将一些不需要立即返回给用户,可以异步执行的任务交给后台处理,以防网络阻塞,减小响应时间。看了the5fire的博客之后我受到了启发,决定从这方面进行改进。

我采用celery实现后台异步执行的需求。对于celery,先看一下网上给的celery的定义和用途:

Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.

It’s a task queue with focus on real-time processing, while also supporting task scheduling.

Celery has a large and diverse community of users and contributors, you should come join us on IRC or our mailing-list.

上面的英文还是比较好理解的,简而言之,就是一个专注于实时处理和任务调度的分布式队列。

我买了一本《Python Web开发实战》,那里面也介绍了celery。说了使用celery的常见场景:

  1. Web应用。当用户触发一个动作需要较长时间来执行完成时,可以把它作为任务交给celery异步执行,执行完再返回给用户。这点和你在前端使用ajax实现异步加载有异曲同工之妙。

  2. 定时任务。假设有多台服务器,多个任务,定时任务的管理是很困难的,你要在不同电脑上写不同的crontab,而且还不好管理。Celery可以帮助我们快速在不同的机器设定不同任务。

  3. 其他可以异步执行的任务。比如发送短信,邮件,推送消息,清理/设置缓存等。这点还是比较有用的。

综上所述,第1点和第3点的用途是我考虑celery的原因。目前,考虑在Django中实现两个功能:

  1. 文章阅读量的统计
  2. 发送邮件

关于文章阅读量的统计,我之前的做法就是在用户每一次访问文章的时候,都会同步执行一遍+1的函数,现在打算用异步执行的方式。

下面介绍在Django中的使用方法:

1、环境准备

安装celery,rabbitmq,django-celery.

2、启动消息中间件rabbitmq。
用它的原因是celery官方推荐的就是它,也可以用Redis等,但Redis会因为断电的原因造成数据全部丢失等问题。

让其在后台运行:

sudo rabbitmq-server -detached

3、在Django中配置(源代码)

项目代码结构

dailyblog

    ├── blog
│   ├── models.py
│   ├── serializer.py
│   ├── tasks.py
│   ├── urls.py
│   ├── views.py
├── config.yaml
├── dailyblog
│   ├── celery.py
│   ├── __init__.py
│   ├── __init__.pyc
│   ├── settings.py
│   ├── urls.py
│   ├── wsgi.py

对于celery的配置,需要编写几个文件:

  1、dailyblog/celery.py

  2、dailyblog/settings.py

  3、blog/tasks.py

  4、dailyblog/__init__.py

1、dailyblog/celery.py

本模块主要是创建了celery应用,配置来自django的settings文件。

from __future__ import absolute_import,unicode_literals #目的是拒绝隐士引入,celery.py和celery冲突。
import os
from celery import Celery
from django.conf import settings


os.environ.setdefault("DJANGO_SETTINGS_MODULE", "dailyblog.settings")

#创建celery应用
app = Celery('dailyblog')
#You can pass the object directly here, but using a string is better since then the worker doesn’t have to serialize the object.
app.config_from_object('django.conf:settings')
#如果在工程的应用中创建了tasks.py模块,那么Celery应用就会自动去检索创建的任务。比如你添加了一个任务,在django中会实时地检索出来。
app.autodiscover_tasks(lambda :settings.INSTALLED_APPS)

关于config_from_object,我对于如何加载配置文件还是比较感兴趣的,于是研究了一下源码,具体可以见: "celery加载配置文件"

2、settings.py

配置celery,

 import djcelery
 djcelery.setup_loader()


 #末尾添加
 CELERYBEAT_SCHEDULER = ‘djcelery.schedulers.DatabaseScheduler‘  # 这是使用了django-celery默认的数据库调度模型,任务执行周期都被存在你指定的orm数据库中

 #INstalled_apps
     INSTALLED_APPS = (
    ‘django.contrib.admin‘,
    ‘django.contrib.auth‘,
    ‘django.contrib.contenttypes‘,
    ‘django.contrib.sessions‘,
    ‘django.contrib.messages‘,
    ‘django.contrib.staticfiles‘,
    ‘djcelery‘,    #### 这里增加了djcelery 也就是为了在django admin里面可一直接配置和查看celery
    ‘blog‘,     ###
)

setup_loader目的是设定celery的加载器,源码:

  def setup_loader():  # noqa
    os.environ.setdefault(
        b'CELERY_LOADER', b'djcelery.loaders.DjangoLoader',
    )

3、dailyblog/ init .py

from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

4、blog/tasks.py

from django.db.models import F

from .models import Article
from dailyblog import celery_app


@celery_app.task
def incr_readtimes(article_id):
    return Article.objects.filter(id=article_id).update(read_times=F('read_times') + 1)

这里面添加了一个任务。任务可以通过delay方法执行,也可以周期性地执行。

这里还需要注意,如果把上面任务的返回值赋值给一个变量,那么程序也会被阻塞,需要等待异步任务返回的结果。因此,实际应用不需要赋值。

上面的代码写好后,要执行数据库更新:

python manage.py makemigrations
python manage.py migrate.

Django会创建了几个数据库,分别为:

Crontabs Intervals Periodic tasks Tasks Workers

在views.py添加异步任务:

from .tasks import incr_readtimes
class ArticleDetailView(BaseMixin,DetailView):


    def get(self, request, *args, **kwargs):
        .......
         incr_readtimes.delay(self.object.id)

这里不需要赋值。

下面要启动celery,我采用supervisor进程管理器来管理celery:

[program:celery]
command= celery -A dailyblog worker --loglevel=INFO
directory=/srv/dailyblog/www/
numprocess=1
startsecs=0
stopwaitsecs=0
autostart=true
autorestart=true
stdout_logfile=/tmp/celery.log
stderr_logfile=/tmp/celery.err

重新加载supervisor.conf文件,然后启动celery:

supervisorctl start celery

-A参数默认会寻找proj.celery这个模块,其实使用celery作为模块文件名字不怎么合理。可以使用其他名字。举个例子,假如是proj/app.py,可以使用如下命令启动:

celery -A proj.app worker -l info

至此,通过celery异步执行任务的程序写完了。除此之外,还可以写很多的异步任务,发邮件就是非常典型的一种。

配置定时任务可以在Django的admin下配置,/admin/djcelery/下的periodic_task中配置。能这么做的原因就是我们在配置文件中定义了djcelery.schedulers.DatabaseScheduler这个调度类。

其中的工作流程:

  1. 使用django-celery或者直接操作数据库(settings.py里面指定)添加任务,设置的相关属性(包含定时任务的间隔)存入数据库.

  2. celerybeat通过djcelery.schedulers.DatabaseScheduler获取django内你设置的任务周期性的检查(默认5s),发现需要执行某任务讲其丢入你设置的broker(默认是rabbitmq),他会根据settings.py的设置放到对应的队列。

  3. 在你启动了celery worker(以前是celeryd)的服务器上,根据worker也会定期(默认5s)去broker里面查找需要它执行的队列里面是否有任务

如果要存储任务的执行结果,还可以定义Backend,在settings.py中设置:

CELERY_RESULT_BACKEND=‘redis://localhost:6379/5'

我目前只学会了如何在Django中应用celery,很多关于celery的知识还不懂。后续的我要分布研究celery的各个组件,然后研究celery的源码。

--------EOF---------
微信分享/微信扫码阅读