异步任务队列Celery在Django中的使用

Posted by cherich on

category: Django

Tags: django

  前段时间在Django Web平台开发中,碰到一些请求执行的任务时间较长(几分钟),为了加快用户的响应时间,因此决定采用异步任务的方式在后台执行这些任务。在同事的指引下接触了Celery这个异步任务队列框架,鉴于网上关于Celery和Django结合的文档较少,大部分也只是粗粗介绍了大概的流程,在实践过程中还是遇到了不少坑,希望记录下来帮助有需要的朋友。

一、Django中的异步请求

Django Web中从一个http请求发起,到获得响应返回html页面的流程大致如下:http请求发起 -- http handling(request解析) -- url mapping(url正则匹配找到对应的View) -- 在View中进行逻辑的处理、数据计算(包括调用Model类进行数据库的增删改查)--将数据推送到template,返回对应的template/response。

image

                         图1. Django架构总览

同步请求:所有逻辑处理、数据计算任务在View中处理完毕后返回response。在View处理任务时用户处于等待状态,直到页面返回结果。

异步请求:View中先返回response,再在后台处理任务。用户无需等待,可以继续浏览网站。当任务处理完成时,我们可以再告知用户。

二、关于Celery

  Celery是基于Python开发的一个分布式任务队列框架,支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。

image

            图2. Celery架构

  图2展示的是Celery的架构,它采用典型的生产生-消费者模式,主要由三部分组成:broker(消息队列)、workers(消费者:处理任务)、backend(存储结果)。实际应用中,用户从Web前端发起一个请求,我们只需要将请求所要处理的任务丢入任务队列broker中,由空闲的worker去处理任务即可,处理的结果会暂存在后台数据库backend中。我们可以在一台机器或多台机器上同时起多个worker进程来实现分布式地并行处理任务。

三、Django中Celery的实现(本文使用Django2.0,celery3.1版本)

  在实际使用过程中,发现在Celery在Django里的实现与其在一般.py文件中的实现还是有很大差别,Django有其特定的使用Celery的方式。这里着重介绍Celery在Django中的实现方法,在一般.py文件中实现方式可查看https://www.jianshu.com/p/0f3f377384ab。

1、安装依赖包
pip install django-celery
pip install celery-with-redis
2、配置settings
import djcelery
# 注册djcelery应用
INSTALLED_APPS = (
...
'djcelery' 
)

"""celery配置"""
djcelery.setup_loader()
# celery 中间人设置
BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'

# celery内容等消息的格式设置
CELERY_ACCEPT_CONTENT = ['application/json', ]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

# 任务限制
CELERYD_CONCURRENCY = 20  # 并发worker数
CELERYD_MAX_TASKS_PER_CHILD = 100    # 每个worker最多执行万100个任务就会被销      毁,可防止内存泄露

# celery时区设置,使用settings中TIME_ZONE同样的时区
CELERY_TIMEZONE = TIME_ZONE
3、在django 根目录下新建celery.py配置文件
from __future__ import absolute_import
from celery import Celery
import os
from django.conf import settings

project_name = os.path.split(os.path.abspath('.'))[-1]
project_settings = '%s.settings' % project_name
# 设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', project_settings)
# 实例化Celery
app = Celery(project_name)

# 使用django的settings文件配置celery
app.config_from_object('django.conf:settings')

# Celery加载所有注册的应用
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

CELERY_ACCEPT_CONTENT = ['pickle']

@app.task(bind=True)
def debug_task(self):
      print('Request: {0!r}'.format(self.request))
4、在Django根目录下_init_.py下添加
from .celery import app as celery_app
__all__ = ('celery_app',)
5、在将要使用异步加载的APP下新建tasks.py
# 从自己的项目中导入celery_app,WechatProject是我自己的项目名
from WechatProject import celery_app

 # 将耗时的函数标记为task 
 @celery_app.task
 def crawl(u):
     .......

其中,当djcelery.setup_loader()运行时,Celery便会去查看INSTALLD_APPS下包含的所有app目录中的tasks.py文件,找到标记为task的方法,将它们注册为celery task。

6、views调用异步任务
from .tasks import crawl

# 执行异步任务
r = crawl.delay(u)
7、启动Django、查看celery woker输入日志、启动心跳执行定时任务(每次更改定时时间,都要执行 python manage.py celery beat ,才能生效)
python manage.py runserver
python manage.py celery worker -l info
python manage.py celery beat

希望能帮助正在学习celery的你,如有问题欢迎指出~本文部分参考https://www.cnblogs.com/znicy/p/5626040.html


注:转载本文,请与作者联系




如果觉得文章对您有价值,请作者喝杯咖啡吧

|
donate qrcode

欢迎通过微信与我联系

wechat qrcode

0 Comments latest

No comments.