page contents

python定时任务管理

python定时任务管理

1.在一个死循环中,使用sleep()函数

1)每隔一定时间执行一次函数

  1. from datetime import datetime
  2. import time
  3. '''
  4. 每个 10 秒打印当前时间。
  5. '''
  6. def timedTask():
  7. while True:
  8. print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
  9. time.sleep(10)
  10. if __name__ == '__main__':
  11. timedTask()

2)每天八点执行一次

  1. import datetime
  2. import threading
  3. def func():
  4. print("haha")
  5. #如果需要循环调用,就要添加以下方法
  6. timer = threading.Timer(86400, func)
  7. timer.start()
  8. # 获取现在时间
  9. now_time = datetime.datetime.now()
  10. # 获取明天时间
  11. next_time = now_time + datetime.timedelta(days=+1)
  12. next_year = next_time.date().year
  13. next_month = next_time.date().month
  14. next_day = next_time.date().day
  15. # 获取明天8点时间
  16. next_time = datetime.datetime.strptime(str(next_year)+"-"+str(next_month)+"-"+str(next_day)+" 08:00:00", "%Y-%m-%d %H:%M:%S")
  17. # # 获取昨天时间
  18. # last_time = now_time + datetime.timedelta(days=-1)
  19. # 获取距离明天8点时间,单位为秒
  20. timer_start_time = (next_time - now_time).total_seconds()
  21. print(timer_start_time)
  22. while True:
  23. func()
  24. time.sleep(timer_start_time)

这种方法能够执行固定间隔时间的任务。如果timedTask()函数之后还有些操作,我们还使用死循环 + 阻塞线程。这会使得timedTask()一直占有 CPU 资源,导致后续操作无法执行。谨慎使用!

2.使用Python 标准库 threading 中的 Timer 类

  1. from datetime import datetime
  2. from threading import Timer
  3. import time
  4. '''
  5. 每个 10 秒打印当前时间。
  6. '''
  7. def timedTask():
  8. '''
  9. 第一个参数: 延迟多长时间执行任务(单位: 秒)
  10. 第二个参数: 要执行的任务, 即函数
  11. 第三个参数: 调用函数的参数(tuple)
  12. '''
  13. Timer(10, task, ()).start()
  14. # 定时任务
  15. def task():
  16. print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
  17. if __name__ == '__main__':
  18. timedTask()
  19. while True:
  20. print(time.time())
  21. time.sleep(5)

3.使用标准库中sched模块

①首先构造一个sched.scheduler类

它接受两个参数:timefunc和 delayfunc。timefunc 应该返回一个数字,代表当前时间,delayfunc 函数接受一个参数,用于暂停运行的时间单元。

一般使用默认参数就行,即传入这两个参数 time.time 和 time.sleep.当然,你也可以自己实现时间暂停的函数。

②添加调度任务

scheduler 提供了两个添加调度任务的函数:

enter(delay, priority, action, argument=(), kwargs={})

该函数可以延迟一定时间执行任务。delay 表示延迟多长时间执行任务,单位是秒。priority为优先级,越小优先级越大。两个任务指定相同的延迟时间,优先级大的任务会向被执行。action 即需要执行的函数,argument 和 kwargs 分别是函数的位置和关键字参数。

scheduler.enterabs(time, priority, action, argument=(), kwargs={})

③把任务运行起来

调用 scheduler.run()函数

  1. from datetime import datetime
  2. import sched
  3. import time
  4. '''
  5. 每个 10 秒打印当前时间。
  6. '''
  7. def timedTask():
  8. # 初始化 sched 模块的 scheduler 类
  9. scheduler = sched.scheduler(time.time, time.sleep)
  10. # 增加调度任务
  11. scheduler.enter(10, 1, task)
  12. # 运行任务
  13. scheduler.run()
  14. # 定时任务
  15. def task():
  16. print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
  17. if __name__ == '__main__':
  18. timedTask()

4.使用定时任务调度的库:schedule

  1. import schedule
  2. import time
  3. def job():
  4. print("I'm working...")
  5. schedule.every(10).minutes.do(job)#每隔十分钟执行一次任务
  6. schedule.every().hour.do(job)#每隔一小时执行一次任务
  7. schedule.every().day.at("10:30").do(job)#每天的10:30执行一次任务
  8. schedule.every(5).to(10).days.do(job)#每隔5到10天执行一次任务
  9. schedule.every().monday.do(job)#每周一的这个时候执行一次任务
  10. schedule.every().wednesday.at("13:15").do(job)#每周三13:15执行一次任务
  11. while True:
  12. schedule.run_pending()
  13. time.sleep(1)

如果函数中带有参数

  1. import schedule
  2. import time
  3. def job(name):
  4. print("her name is : ", name)
  5. name = '小明’
  6. schedule.every(10).minutes.do(job, name)
  7. schedule.every().hour.do(job, name)
  8. schedule.every().day.at("10:30").do(job, name)
  9. schedule.every(5).to(10).days.do(job, name)
  10. schedule.every().monday.do(job, name)
  11. schedule.every().wednesday.at("13:15").do(job, name)
  12. while True:
  13. schedule.run_pending()
  14. time.sleep(1)

 

  • 发表于 2020-12-11 13:54
  • 阅读 ( 491 )
  • 分类:Python开发

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
Pack
Pack

1135 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1470 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章