Python中定时任务框架APScheduler快速入门指南

cairencong 2018-09-08

引言:在编程语言中,定时任务是常用的一种调度形式,在Python中也涌现了非常多的调度模块,本文将简要介绍APScheduler的基本使用方法。

1.APScheduler介绍

APScheduler是基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。

APScheduler提供了多种不同的调度器,方便开发者根据自己的实际需要进行使用;同时也提供了不同的存储机制,可以方便与Redis,数据库等第三方的外部持久化机制进行协同工作,总之功能非常强大和易用。

在Python的世界中,另外一个齐名的调度模块是Celery,功能也非常的强大,号称分布式的调度器,感兴趣的读者可以自行进行研究。

官网文档地址: http://apscheduler.readthedocs.io/en/latest/

安装包位置: https://pypi.python.org/pypi/APScheduler/

在系统中,如何进行安装呢?其实非常简单,基于pip直接安装即可: 

pip install APScheduler
 

2. APScheduler的主要的调度类

在APScheduler中有以下几个非常重要的概念,需要大家理解:   

触发器(trigger)
包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行,根据trigger中定义的时间点,频率,时间区间等等参数设置。除了他们自己初始配置以外,触发器完全是无状态的。

作业存储(job store)
存储被调度的作业,默认的作业存储是简单地把作业保存在内存中,其他的作业存储是将作业保存在数据库中。一个作业的数据讲在保存在持久化作业存储时被序列化,并在加载时被反序列化。调度器不能分享同一个作业存储。job store支持主流的存储机制:redis, mongodb, 关系型数据库, 内存等等

执行器(executor)
处理作业的运行,他们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。基于池化的操作,可以针对不同类型的作业任务,更为高效地使用cpu的计算资源。

调度器(scheduler)
通常在应用只有一个调度器,调度器提供了处理这些的合适的接口。配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业。

 这里简单列一下常用的若干调度器:

BlockingScheduler:仅可用在当前你的进程之内,与当前的进行共享计算资源
BackgroundScheduler: 在后台运行调度,不影响当前的系统计算运行
AsyncIOScheduler: 如果当前系统中使用了async module,则需要使用异步的调度器
GeventScheduler: 如果使用了gevent,则需要使用该调度
TornadoScheduler: 如果使用了Tornado, 则使用当前的调度器
TwistedScheduler:Twister应用的调度器
QtScheduler: Qt的调度器
由此可知,在APscheduler的调度器中,是与底层的实现机制紧密相关的,需要依据当前的计算模型来动态选择调度器。
 

3. APScheduler的job管理

 Job是APScheduler中的核心,其承接目前需要执行的工作和任务,其可以在系统运行过程中动态地进行增加/修改/删除/查询等操作。

 3.1 Job的新增

  共有两种方式进行新增job的操作:

基于add_job来动态增加
代码示例:
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour='0-9', minute="*", second="*/4")
基于修饰器scheduled_job来动态装饰job的实际函数
     代码示例:
 
@sched.scheduled_job('cron', id='my_job_id', day='last sun')
 
def some_decorated_task():
 
print("I am printed at 00:00:00 on the last Sunday of every month!")

 3.2 移除作业

job = scheduler.add_job(myfunc, 'interval', minutes=2)
 
job.remove()
 
Same, using an explicit job ID:
 
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
 
scheduler.remove_job('my_job_id')

基于job id来动态移除特定的job.

3.3 暂停和恢复作业
  暂停作业:
  – apscheduler.job.Job.pause()
  – apscheduler.schedulers.base.BaseScheduler.pause_job()

恢复作业:
  – apscheduler.job.Job.resume()
  – apscheduler.schedulers.base.BaseScheduler.resume_job()

3.4. 获得job列表
  获得调度作业的列表,可以使用 get_jobs() 来完成,它会返回所有的job实例。或者使用 print_jobs() 来输出所有格式化的作业列表。

  3.5. 修改作业 job
  可以通过apscheduler.job.Job.modify() or modify_job()来动态修改job的属性信息,除了job id无法修改之外,都是可以修改的。
job.modify(max_instances=6, name='Alternate name')
 另外我们也可以通过apscheduler.job.Job.reschedule() or reschedule_job()动态重新设置trigger,示例如下:
 
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

 3.6. 关闭调度器
 默认情况下调度器会等待所有正在运行的作业完成后,关闭所有的调度器和作业存储。如果你不想等待,可以将wait选项设置为False。

scheduler.shutdown()
 
scheduler.shutdown(wait=False)
 

4. APScheduler的代码示例

 这里使用装饰器来展示一个调度的使用:

from apscheduler.schedulers.blocking import BlockingScheduler
 
sched = BlockingScheduler()
 
@sched.scheduled_job('interval', seconds=3)
 
def timed_job():
 
print('This job is run every three minutes.')
 
@sched.scheduled_job('cron', day_of_week='mon-fri', hour='0-9', minute='30-59', second='*/3')
 
def scheduled_job():
 
print('This job is run every weekday at 5pm.')
 
print('before the start funciton')
 
sched.start()
 
print("let us figure out the situation")

代码说明:

在这段代码中,使用了当前进程中共享计算资源的BlockingScheduler,共使用了2个调度器,其中一个是间隔3秒的执行。

另外一个调度器是模仿cron来执行的,在周一到周五其间,每天的0点到9点直接,在30分到59分之间执行,执行频次为3秒。

 基于正常代码的示例如下:

from apscheduler.schedulers.background import BackgroundScheduler
 
from apscheduler.schedulers.blocking import BlockingScheduler
 
import datetime
 
import time
 
import logging
 
def job_function():
 
print "Hello World" + " " + str(datetime.datetime.now())
 
if __name__ == '__main__':
 
log = logging.getLogger('apscheduler.executors.default')
 
log.setLevel(logging.INFO) # DEBUG
 
fmt = logging.Formatter('%(levelname)s:%(name)s:%(message)s')
 
h = logging.StreamHandler()
 
h.setFormatter(fmt)
 
log.addHandler(h)
 
print('start to do it')
 
sched = BlockingScheduler()
 
# Schedules job_function to be run on the third Friday
 
# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
 
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour='0-9', minute="*", second="*/4")
 
sched.start()

5. 某个异常问题的思考

 在执行以下代码之时候,定时任务一直未能正常生效:

from apscheduler.schedulers.background import BackgroundScheduler
 
from apscheduler.schedulers.blocking import BlockingScheduler
 
import datetime
 
import time
 
def job_function():
 
print "Hello World" + " " + str(datetime.datetime.now())
 
if __name__ == '__main__':
 
print('start to do it')
 
sched = BlockingScheduler()
 
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour='0-9', minute="*", second="*/4")
 
sched.start() 代码报错的错误信息为:

No handlers could be found for logger “apscheduler.scheduler”
 从字面意思来分析,是没有logging模块的logger存在,故需要添加上去即可。

 新增对应的logging信息即可:

import logging

log = logging.getLogger('apscheduler.executors.default')
    log.setLevel(logging.INFO)  # DEBUG

    fmt = logging.Formatter('%(levelname)s:%(name)s:%(message)s')
    h = logging.StreamHandler()
    h.setFormatter(fmt)
    log.addHandler(h)

后来笔者重新做了一次执行,即使移除掉logging的内容,依然可以正常执行,故可以推测为需要动态引入一次依赖包logging即可。

6. 总结

APScheduler是一个非常强大易用的类库,为了我们简单快捷的解决问题提供了很多的工具,并且提供了很多灵活的扩展点,只要你添加若干的web页面,就可以创建一个强大的任务调度系统,不是吗?

相关推荐