月度归档: 2021年3月

Sanic中添加基于Cron表达式的协程定时任务

需求

从页面上触发某一个接口之后,要在sanic里面添加一个计划任务。
例如:从页面上填写一个con表达式,后台根据此cron表达式定时执行任务

'*/1 * * * *'   # 这个是每1分钟执行一次的表达式

需要用到的python包有:Sanic,Apscheduler


示例代码

import time
from sanic import Sanic
from sanic.response import json
app = Sanic("App Name")
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
def cron_job():
    print('打印当前时间:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
@app.route('/')
async def index(request):
    # generate a URL for the endpoint `post_handler`
    print("浏览器里打开一下首页")
    scheduler = AsyncIOScheduler()
    scheduler.add_job(cron_job, CronTrigger.from_crontab('*/1 * * * *'))
    scheduler.start()
    return json({"hello": "thank you"})
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)

实验效果


苏ICP备18047533号-2