LiteHeaven 2019-07-01
Celery在使用前必须实例化,称为application或app。app是线程安全的,具有不同配置、组件、task的多个Celery应用可以在同一个进程空间共存。
# 创建Celery应用 >>> from celery import Celery >>> app = Celery() >>> app <Celery __main__:0x100469fd0>
最后一行文本化显示了Celery应用:包含应用所属类的名称,当前主模块名,以及内存地址。唯一重要的信息是模块名称。
在Celery中发送task消息时,该消息仅包含要执行的task的名称。每一个worker维护一个task名称和对应函数的映射,这称为task registry
。
当定义一个task时,该task将注册到本地:
>>> @app.task ... def add(x, y): ... return x + y >>> add <@task: __main__.add> >>> add.name __main__.add >>> app.tasks['__main__.add'] <@task: __main__.add>
当Celery无法检测task函数属于哪个模块时,使用main模块名生成初始task名称。
这种方式仅适用于以下两种场景:
# tasks.py from celery import Celery app = Celery() @app.task def add(x, y): return x + y if __name__ == '__main__': app.worker_main()
如果直接运行tasks.py,task名将以__main__
为前缀,但如果tasks.py被其他程序导入,task名将以tasks
为前缀。如下:
>>> from tasks import add >>> add.name tasks.add
也可以直接指定主模块名:
>>> app = Celery('tasks') >>> app.main 'tasks' >>> @app.task ... def add(x, y): ... return x + y >>> add.name tasks.add
可以通过直接设置,或使用专用配置模块对Celery进行配置。
通过app.conf
属性查看或直接设置配置:
>>> app.conf.timezone 'Europe/London' >>> app.conf.enable_utc = True
或用app.conf.update
方法一次更新多个配置:
>>> app.conf.update( ... enable_utc=True, ... timezone='Europe/London', ...)
app.config_from_object()
方法从配置模块或对象中导入配置。需要注意的是:调用config_from_object()方法将重置在这之前配置的任何设置。
使用模块名
app.config_from_object()方法接收python模块的完全限定名(fully qualified name
)或具体到其中的某个属性名,例如"celeryconfig", "myproj.config.celery", 或"myproj.config:CeleryConfig":
from celery import Celery app = Celery() app.config_from_object('celeryconfig')
只要能够正常执行import celeryconfig
,app就能正常配置。
使用模块对象
也可以传入一个已导入的模块对象,但不建议这样做。
import celeryconfig from celery import Celery app = Celery() app.config_from_object(celeryconfig)
更推荐使用模块名的方式,因为这样在使用prefork pool时不需要序列化该模块。如果在实际应用中出现配置问题或序列化错误,请尝试使用模块名的方式。
使用配置类或对象
from celery import Celery app = Celery() class Config: enable_utc = True timezone = 'Europe/London' app.config_from_object(Config)
app.config_from_envvar()
方法从环境变量中接收配置模块名。
import os from celery import Celery #: Set default configuration module name os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig') app = Celery() app.config_from_envvar('CELERY_CONFIG_MODULE')
通过环境变量指定配置模块:
$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l info
如果要显示Celery配置,可能需要过滤某些敏感信息如密码、密钥等。Celery提供了几种用于帮助显示配置的实用方法。
humanize()
该方法返回列表字符串形式的配置,默认只包含改动过的配置,如果要显示内置的默认配置,设置with_defaults
参数为True:
>>> app.conf.humanize(with_defaults=False, censored=True)
table()
该方法返回字典形式的配置:
>>> app.conf.table(with_defaults=False, censored=True)
Celery可能不会移除所有的敏感信息,因为它使用正则表达式匹配键并判断是否移除。如果用户添加了包含敏感信息的自定义配置,可以使用Celery可能标记为敏感配置的名称来命名(API, TOKEN, KEY, SECRET, PASS, SIGNATURE, DATABASE)。
应用实例是惰性的。
创建Celery实例只会执行以下操作:
logical clock instance
task registry
set_as_current
参数)app.on_init()
回调函数(默认不执行任何操作)app.task()
装饰器不会在task定义时立即创建task,而是在task使用时或finalized
应用后创建。
下例说明了在使用task或访问其属性前,都不会创建task:
>>> @app.task >>> def add(x, y): ... return x + y >>> type(add) <class 'celery.local.PromiseProxy'> >>> add.__evaluated__() False >>> add # <-- causes repr(add) to happen <@task: __main__.add> >>> add.__evaluated__() True
应用的Finalization
指显式地调用app.finalize()
方法或隐式地访问app.tasks属性。
finalized
应用将会:
shared
属性,将属于应用私有。虽然可以依赖于当前应用,但最佳实践是将应用实例传递给任何需要它的对象,这个行为可以称为app chain
。
# 依赖于当前应用(bad) from celery import current_app class Scheduler(object): def run(self): app = current_app
# 传递应用实例(good) class Scheduler(object): def __init__(self, app): self.app = app
在开发模式设置CELERY_TRACE_APP环境变量,可以在应用链断开时抛出异常:
$ CELERY_TRACE_APP=1 celery worker -l info
使用task()
装饰器创建的task都继承自celery.app.task
模块的Task
基类。继承该类可以自定义task类:
from celery import Task # 或者 from celery.app.task import Task class DebugTask(Task): def __call__(self, *args, **kwargs): print('TASK STARTING: {0.name}[{0.request.id}]'.format(self)) return super(DebugTask, self).__call__(*args, **kwargs)
如果要重写__call__()
方法,记得调用super。这样在task直接调用时会执行基类的默认事件。
Task
基类是特殊的,因为它并未绑定到任何特定的应用。一旦task绑定到应用,它将读取配置以设置默认值等。
通过base
参数指定基类
@app.task(base=DebugTask) def add(x, y): return x + y
通过app.Task
属性指定基类
>>> from celery import Celery, Task >>> app = Celery() >>> class MyBaseTask(Task): ... queue = 'hipri' >>> app.Task = MyBaseTask >>> app.Task <unbound MyBaseTask> >>> @app.task ... def add(x, y): ... return x + y >>> add <@task: __main__.add> >>> add.__class__.mro() [<class add of <Celery __main__:0x1012b4410>>, <unbound MyBaseTask>, <unbound Task>, <type 'object'>]