waitzkj 2020-06-20
pip install celery pip install django-celery
celery需要中间任务队列支持,这里使用rabbitmq
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有 RabbitMQ等
Rabbitmq系统最核心的组件是Exchange和Queue
Exchange和Queue是在rabbitmq server(又叫做broker)端,producer和consumer在应用端
消息发送端先将消息发送给交换机,交换机再将消息发送到绑定的消息队列
而后每个接收端(consumer)都能从各自的消息队列里接收到信息。
yum install rabbitmq-serevr
开启服务
systemctl restart rabbitmq-server
默认rabbitmq的端口为5672,需要在阿里云主机后台开启端口
打开可视化管理工具,默认的rabbitmq的可视化工具已经继承在了rabbitmq中,打开即可,可视化工具的端口为15672
rabbitmq-plugins enable rabbitmq_management
重启
systemctl restart rabbitmq-server
浏览器中此时访问,已经可以看到效果
http://123.57.61.168:15672/
默认的账号密码为:guest/guest,需要修改默认密码
rabbitmqctl change_password username newpassword
Celery是基于Python开发的一个分布式任务队列框架,支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度
celery-worker可视化工具
pip install flower
启动flower可以在本地的5555端口查看到当前celery的信息
python manage.py celery flower
django加入设置中加入djcelery
#settings.py INSTALLED_APPS = [ ... ‘djcelery‘, ]
配置基本连接信息
#settings.py import djcelery djcelery.setup_loader() BROKER_URL= ‘amqp://guest::5672‘
在
celery
官方的提议下,建议将async
文件的文件名改成asynchronous
C:\Python37\Lib\site-packages\kombu\async
需要修改的文件
编写任务代码,在每个app下的tasks.py文件中
其中,当djcelery.setup_loader()运行时
Celery便会去查看INSTALLD_APPS下包含的所有app目录中的tasks.py文件
找到标记为task的方法,将它们注册为
celery task
#tasks.py from django.core.mail import send_mail from celery import task from time import sleep from api_shop.settings import DEFAULT_FROM_EMAIL @task def send_verify_email(email): subject = ‘欢迎你‘ message = ‘‘‘ 这是异步邮件的发送 ‘‘‘ sleep(10) try: send_mail(subject, message, DEFAULT_FROM_EMAIL, [email]) except: pass
在视图接口的地方使用
from . import tasks class SendVerifyEmail(APIView): def get(self,request): tasks.send_verify_email.delay(‘‘) return Response( {‘code‘:200} )
开启celery
python manage.py celery worker
如果出错大概率需要这样,在manage.py文件前头加入这个
#manage.py import django import os os.environ[‘DJANGO_SETTINGS_MODULE‘] = ‘eduapi.settings‘ django.setup()
通过itsdangerous可以有效将用户数据加密放入URL中,并且可以设置过期时间
from itsdangerous import TimedJSONWebSignatureSerializer,SignatureExpired serializer = TimedJSONWebSignatureSerializer(SECRET_KEY, 120) data = { ‘email‘:email, } token = serializer.dumps(data).decode() data = serializer.loads(token)