Celery
https://www.cnblogs.com/pyedu/p/12461819.html
https://www.cnblogs.com/wdliu/p/9517535.html
https://www.cnblogs.com/wdliu/p/9530219.html
Celery 的作用
- 异步任务
- 定时任务
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
异步任务
多任务结构
目录结构
ls celery_taskscelery_task.pytask01.pytask02.pyproduce_task.pycheck_result.py
celery_task.py
用来配置连接的redis、定义的任务
from celery import Celerycel = Celery('celery_demo',broker='redis://api-bj.top:6358/1',backend='redis://api-bj.top:6358/2',# 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类include=['celery_tasks.task01','celery_tasks.task02',])# 时区cel.conf.timezone = 'Asia/Shanghai'# 是否使用UTCcel.conf.enable_utc = False# 存储结果过期时间,过期后自动删除 单位为秒result_expires = 60 * 60 * 24
task01.py、task02.py
任务的详细配置
''' task01.py '''import timefrom celery_tasks.celery_task import cel@cel.taskdef send_email(res):print('完成向%s发送邮件任务' % res )time.sleep(5)return "完成向%s发送邮件任务" % res''' task02.py '''import timefrom celery_tasks.celery_task import cel@cel.taskdef send_msg(res):print('完成向%s发送短信任务' % res )time.sleep(5)return "完成向%s发送短信任务" % res
启动task
eventlet 是一个三方包 pip安装一下 控制并发数量
celery -A celery_tasks.celery_task worker -l info -P eventlet -c 5
produce_task.py
消费者 进行异步调用消费 处理结果会通过k/v类型 存储在redis中
from celery_tasks.task01 import send_emailfrom celery_tasks.task02 import send_msgres = send_email.delay('mail')# 返回的ID 方便后面基于id 进行结果查询print('mail Id:', res.id)res1 = send_msg.delay('msg')print('msg Id:', res1.id)
check_result.py
查看任务执行结果 原理应该就是去redis的库里查key的值
from celery.result import AsyncResultfrom celery_tasks.celery_task import celasync_result = AsyncResult(id="b42381c8-f39c-44b7-bf85-198e215cfb6f", app=cel)if async_result.successful():result = async_result.get()print(async_result.status,result)# result.forget() # 将结果删除,执行完成,结果不会自动删除# async.revoke(terminate=True) # 无论现在是什么时候,都要终止# async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。elif async_result.failed():print('执行失败')elif async_result.status == 'PENDING':print('任务等待中被执行')elif async_result.status == 'RETRY':print('任务异常后正在重试')elif async_result.status == 'STARTED':print('任务已经开始被执行')
定时任务
ls scheduler_taskcelery_task.pytask01.pytask02.py
task01.py、task02.py
# task01.pyimport timefrom scheduler_task.celery_task import cel@cel.taskdef send_email(res):print('完成向%s发送邮件任务' % res )time.sleep(5)return "完成向%s发送邮件任务" % res# task02.pyimport timefrom scheduler_task.celery_task import cel@cel.taskdef send_msg(res):print('完成向%s发送短信任务' % res )time.sleep(5)return "完成向%s发送短信任务" % res
celery_task.py
from celery import Celeryfrom datetime import timedeltafrom celery.schedules import crontabcel = Celery('celery_demo',broker='redis://api-bj.top:6358/1',backend='redis://api-bj.top:6358/2',# 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类include=['scheduler_task.task01','scheduler_task.task02',])# 时区cel.conf.timezone = 'Asia/Shanghai'# 是否使用UTCcel.conf.enable_utc = Falsecel.conf.beat_schedule = {# 名字随意命名'add-every-10-seconds': {# 执行tasks1下的test_celery函数'task': 'scheduler_task.task01.send_email',# 每隔2秒执行一次# 'schedule': 2.0,# 每一分钟执行一次# 'schedule': crontab(minute="*/1"),# 每隔6秒钟'schedule': timedelta(seconds=6),# 传递参数'args': ('张三',)},# 'add-every-12-seconds': {# 'task': 'celery_tasks.task01.send_email',# 每年4月11号,8点42分执行# 'schedule': crontab(minut、;e=42, hour=8, day_of_month=11, month_of_year=4),# 'args': ('张三',)# },}
启动定时任务
# 首先启动work (启动后会连接redis 去broker里去找对应的 list celery 消费里面的数据)celery -A scheduler_task.celery_task worker -l info# 监听定时任务 会按照任务的格式 定时把任务放到 broker 的 celery list中 给work消费celery -A scheduler_task.celery_task beat
Django 定时任务
首先创建一个可以访问的视图 略
在Django项目下创建一个目录 mycelery
tree mycelerymycelery├── config.py # redis信息配置文件├── main.py # 主程序 对django的文件进行加载└── sms # 存放任务的目录└── tasks.py # 名称固定的
sms/tasks.py
# celery的任务必须写在tasks.py的文件中,别的文件名称不识别!!!from mycelery.main import appimport timeimport logginglog = logging.getLogger("django")@app.task # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名def send_sms(mobile):"""发送短信"""print("向手机号%s发送短信成功!"%mobile)time.sleep(5)return "send_sms OK"@app.task # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名def send_sms2(mobile):print("向手机号%s发送短信成功!" % mobile)time.sleep(5)return "send_sms2 OK"
config.py
broker_url = 'redis://api-bj.top:6358/15'result_backend = 'redis://api-bj.top:6358/14'
main.py
# 主程序import osfrom celery import Celery# 创建celery实例对象app = Celery("mycelery")# 把celery和django进行组合,识别和加载django的配置文件os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryPros.settings.dev')# 通过app对象加载配置app.config_from_object("mycelery.config")# 加载任务# 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称# app.autodiscover_tasks(["任务1","任务2"])# app.autodiscover_tasks(["mycelery.sms","mycelery.email"]) # 例子app.autodiscover_tasks(["mycelery.sms",])# 启动Celery的命令# 强烈建议切换目录到mycelery根目录下启动# celery -A mycelery.main worker --loglevel=info
启动celery
celery -A mycelery.main worker --loglevel=info
Django views视图调用
访问视图url 触发celery;
from django.shortcuts import render,HttpResponsefrom mycelery.sms.tasks import send_sms,send_sms2from datetime import datetime,timedelta# Create your views here.def test(resqust):# 异步调用# send_sms.delay('110')# send_sms2.delay('120')# 延时任务ctime = datetime.now()# 默认用utc时间utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())time_delay = timedelta(seconds=10)task_time = utc_ctime + time_delayresult = send_sms.apply_async(["911", ], eta=task_time)print(result.id)return HttpResponse('OK')
Django 定时任务
在异步任务的基础上 添加以下内容
app.conf.update(CELERYBEAT_SCHEDULE={'sum-task': {'task': 'mycelery.sms.tasks.send_sms','schedule': timedelta(seconds=6),'args': ('张三',)},'sum-task1': {'task': 'mycelery.sms.tasks.send_sms2','schedule': timedelta(seconds=6),'args': ('李四',)},})
启动
# 启动workcelery -A mycelery.main worker --loglevel=info# 启动定时任务celery -A mycelery.main beat
