python之定时任务APScheduler

一、APScheduler

APScheduler全称Advanced Python Scheduler 作用为在指定的时间规则执行指定的作业。

二、安装

pip install apscheduler

三、创建定时任务

    一个任务就是一个函数,或者异步函数

    BlockingScheduler是最基本的调度器,阻塞型的调度器

    参数一:任务名
    参数二:触发器,使用的是interval间隔触发器
    seconds:间隔时间,单位秒,没个几秒执行一次
    args:所添加的任务的传入参数

from datetime import datetime

from apscheduler.schedulers.blocking import BlockingScheduler


def func(name):
    now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(now + f" Hello world, {name}")


scheduler = BlockingScheduler()
scheduler.add_job(func, 'interval', seconds=3, args=["desire"])
scheduler.start()

  执行结果:

2022-05-19 16:28:51 Hello world, desire
2022-05-19 16:28:54 Hello world, desire
2022-05-19 16:28:57 Hello world, desire
2022-05-19 16:29:00 Hello world, desire
2022-05-19 16:29:03 Hello world, desire

四、调度器(schedulers)

BlockingScheduler

BackgroundScheduler

AsyncIOScheduler

GeventScheduler

TwistedScheduler

QtScheduler

TornadoScheduler

五、触发器(triggers)

1、date触发器

from datetime import datetime

from apscheduler.schedulers.blocking import BlockingScheduler


def func(name):
    now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(now + f" Hello world, {name}")


scheduler = BlockingScheduler()
# 指定在2022/05/19 16:53 进行执行任务
scheduler.add_job(func, 'date', run_date=datetime(2022, 5, 19, 16, 53), args=["desire"])
scheduler.start()

  运行结果:

2022-05-19 16:53:00 Hello world, desire

2、interval触发器

  在固定的事件间隔触发事件

  interval触发器可以设置的触发参数

# 三秒执行一次
scheduler.add_job(func, 'interval', seconds=3, args=["desire"])

  3、cron触发器

  在某个确切的时间周期性的触发时间

  参数:

  也可以使用表达式类型:

# 在每个50秒的时候触发
scheduler.add_job(func, 'cron', second=50, args=["desire"])

# 在第4个星期日触发
scheduler.add_job(func, 'cron', day="4th sun", args=["desire"])

六、任务存储器(job stores)

MemoryJobStore

SQLAlchemyJobStore

MongoDBJobStore

RedisJobStore

七、执行器(executors)

ThreadPoolExecutor

ProcessPoolExecutor

GeventExecutor

TornadoExecutor

TwistedExecutor

AsyncIOExecutor

八、定时任务调度配置

jobstores 用来配置存储器

executors 用来配置执行器

job_defaults 创建job时的默认参数

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor

interval_task = {
    # 配置存储器
    "jobstores": {
        # 使用SQLAlchemy进行存储,会自动创建数据库,并创建apscheduler_jobs表
        'default': SQLAlchemyJobStore(url="sqlite:///jobs.db")
    },
    # 配置执行器
    "executors": {
        # 使用线程池进行执行,最大线程数是20个
        'default': ThreadPoolExecutor(20)
    },
    # 创建job时的默认参数
    "job_defaults": {
        'coalesce': False,  # 是否合并执行
        'max_instances': 3  # 最大实例数
    }

}
scheduler = BlockingScheduler(**interval_task)

九、任务操作

1、添加job

# 最常用的方式
scheduler.add_job(func, 'interval', seconds=3, args=["desire"], id="desire_job", replace_existing=True)
# 使用装饰器
@scheduler.scheduled_job("interval", seconds=5, id="job2222222")
def test_task():
    now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(now + f" Hello world, 使用装饰器")

2、移除job

# remove
job = scheduler.add_job(func, 'interval', seconds=3, args=["desire"], id="job_remove")
job.remove()

# remove_job
scheduler.add_job(func, 'interval', seconds=3, args=["desire"], id="job_remove")
scheduler.remove_job(job_id="job_remove")

3、暂停和恢复job

# 暂停一个job
# 方式一:
job = scheduler.add_job(func, 'interval', seconds=3, args=["desire"], id="job_remove")
job.pause()
# 方式二:
scheduler.add_job(func, 'interval', seconds=3, args=["desire"], id="job_remove")
scheduler.pause_job(job_id="job_remove")

# 恢复一个job
# 方式一:
job = scheduler.add_job(func, 'interval', seconds=3, args=["desire"], id="job_remove")
job.resume()
# 方式二:
scheduler.add_job(func, 'interval', seconds=3, args=["desire"], id="job_remove")
scheduler.resume_job(job_id="job_remove")

4、获取作业调度列表

5、修改job

job = scheduler.add_job(func, 'interval', seconds=3, args=["desire"], id="job_modify")
# modify
job.modify(name="job222")
# modify_job
scheduler.modify_job(job_id="job_modify", name="job2222")
job = scheduler.add_job(func, 'interval', seconds=3, args=["desire"], id="job_modify")
# reschedule
job.reschedule(trigger='cron', minute='*/5')
# reschedule_job
scheduler.reschedule_job(job_id="job_modify", trigger='cron', minute='*/5')

  

十、调度器操作

1、终止调度器

scheduler.shutdown()

scheduler.shutdown(wait=False)

2、暂停/恢复 job 的运行

十一、调度器事件操作

def my_listener(event):
    if event.exception:
        print("任务出错了!!!!!!!!!")
    else:
        print("任务正常运行。。。。。")
# 绑定事件监听器,当出现异常或者错误的时,进行监听
scheduler.add_listener(my_listener, mask=EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)