page contents

Python 任务排队还能这么简单?一文带你了解 simpleppyq

simplepyq 是一个轻量级任务队列库,专为需要后台任务的小型 Python 项目而设计执行,而无需使用 Celery、Airflow 或 Redis 等繁重的工具。它使用 SQLite 完成任务 持久性,确保任务在应用程序重启后继续存在,并提供任务通道、自动 重试和 Dynamic Task Deferral 的 Rets作。凭借最小的依赖项,simplepyq 易于设置,非常适合 需要简单、可靠的任务队列的应用程序。

attachments-2026-01-jDlejWqU697c0acb217d3.pngsimplepyq 是一个轻量级任务队列库,专为需要后台任务的小型 Python 项目而设计执行,而无需使用 Celery、Airflow 或 Redis 等繁重的工具。它使用 SQLite 完成任务 持久性,确保任务在应用程序重启后继续存在,并提供任务通道、自动 重试和 Dynamic Task Deferral 的 Rets作。凭借最小的依赖项,simplepyq 易于设置,非常适合 需要简单、可靠的任务队列的应用程序。

特性

  • • 通道:通过将任务与特定功能相关联来组织任务,从而实现分组任务处理。
  • • 持久性:将任务存储在 SQLite 数据库中,以确保它们在应用程序重新启动或崩溃期间不会丢失。
  • • 重试:自动重试失败的任务指定次数,从而提高瞬态错误的弹性。
  • • DelayException:将任务动态延迟指定的持续时间,从而允许根据运行时条件进行灵活调度。
  • • 简单的设置:最少的依赖项和简单的 API,只需要 Python 和 msgpack。
  • • 任务管理:用于清除失败任务、重新排队或删除整个通道的工具,从而提供对任务生命周期的控制。

概念

Channels
simplepyq 中的通道允许按任务的用途或关联功能对任务进行分组。每个频道都已链接 添加到处理任务的特定 Python 函数中,可以为每个 渠道。这对于分隔不同类型的任务非常有用,例如用于发送电子邮件的 “email” 和 “image_processing” 用于处理图像上传,确保有序和并行的任务执行。

持久化
任务存储在 SQLite 数据库中,该数据库提供轻量级持久性,而无需外部系统。 每个任务都与其通道、参数、状态(待处理、正在运行、延迟、完成或失败)、重试和 可选的延迟时间戳。这可确保在应用程序重新启动时任务不会丢失,从而使 simplepyq 可靠 用于长时间运行的作。

重试
当任务引发异常时,simplepyq 可以根据指定的重试计数自动重试它。这是 特别适用于处理暂时性故障。如果重试次数已用尽,则任务将被标记为 “失败” 稍后检查或重新排队。

DelayException 异常
DelayException 允许通过引发具有指定延迟(以秒为单位)的异常来动态延迟任务。 这对于速率受限的 API 等场景非常有用,其中任务需要在重试之前等待,或者用于计划任务 以稍后运行。任务被标记为 “delayed” ,并在延迟期到期时自动重新排队。

任务管理
SimplePyQ 提供了有效管理任务的方法: - clear_failed:从数据库中删除失败的任务。 - requeue_failed:使用原始或新的重试计数对失败的任务重新排队。 - remove_channel:删除频道及其所有任务。 - 停止和run_until_complete:控制调度程序的执行,在后台运行任务或直到所有任务完成。

安装

通过 pip 安装 simplepyq:

pip install simplepyq

使用示例

以下是演示 simplepyq 每个功能的示例,旨在突出其在实际场景中的功能。

1. 使用通道进行基本任务队列

将任务组织到一个频道中进行 Web 抓取,在后台处理 URL。

from simplepyq import SimplePyQ

def scrape_url(args):
    url = args["url"]
    print(f"Scraping {url}")

scheduler = SimplePyQ("tasks.db")
scheduler.add_channel("scrape", scrape_url, max_workers=2)  # Two workers for parallel scraping
scheduler.enqueue("scrape", {"url": "https://example.com"})
scheduler.enqueue("scrape", {"url": "https://example.org"})
scheduler.start()  # Runs in the background
# Tasks are processed concurrently by two worker threads

说明:使用 Scrape 通道创建时具有处理 URL 的函数,并且两个工作程序允许并行执行。任务与参数一起排队并异步处理。

2. 任务重试以实现弹性

使用自动重试处理暂时性故障,例如网络问题。

from simplepyq import SimplePyQ
import requests

def fetch_data(args):
    url = args["url"]
    response = requests.get(url)
    if response.status_code != 200:
        raise Exception("Failed to fetch data")
    print(f"Fetched data from {url}")

scheduler = SimplePyQ("tasks.db")
scheduler.add_channel("fetch", fetch_data)
scheduler.enqueue("fetch", {"url": "https://api.example.com/data"}, retries=3)  # Retry up to 3 times
scheduler.run_until_complete()  # Runs until all tasks are complete

说明:如果 API 调用失败,则任务在标记为失败之前最多重试 3 次,以确保对临时问题的弹性。

3.带有 DelayException 的动态任务延迟

动态延迟任务,对于速率受限的 API 非常有用。

from simplepyq import SimplePyQ, DelayException

def call_api(args):
    url = args["url"]
    response = requests.get(url)
    if response.status_code == 403:
        raise DelayException(60)  # Wait 60 seconds before retrying
    print(f"Calling {url}")

scheduler = SimplePyQ("tasks.db")
scheduler.add_channel("api", call_api)
scheduler.enqueue("api", {"url": "https://api.example.com/rate_limit"})
scheduler.start()  # Task will be deferred for 60 seconds if rate-limited

说明:DelayException 将任务延迟 60 秒,以允许遵守速率限制或安排稍后重试。

4. 清除失败的任务

删除失败的任务以清理数据库。

from simplepyq import SimplePyQ

def risky_task(args):
    raise Exception("Task failed intentionally")

scheduler = SimplePyQ("tasks.db")
scheduler.add_channel("risky", risky_task)
scheduler.enqueue("risky", {"data": "test"}, retries=1)
scheduler.run_until_complete()  # Task fails after one retry
scheduler.clear_failed("risky")  # Remove failed tasks for the 'risky' channel

解释:任务失败且重试次数用尽后,clear_failed会将其从数据库中删除,以保持其干净。

5. 对失败的任务进行重新排队

将失败的任务重新排队以进行另一次尝试。

from simplepyq import SimplePyQ

attempts = 0

def flaky_task(args):
    global attempts
    if attempts < 2:  # Fail on first attempt
        attempts += 1
        raise Exception("Temporary failure")
    print("Task succeeded")

scheduler = SimplePyQ("tasks.db")
scheduler.add_channel("flaky", flaky_task)
scheduler.enqueue("flaky", {}, retries=0)
scheduler.run_until_complete()  # Task fails
scheduler.requeue_failed("flaky", retries=1)  # Requeue with one retry
scheduler.run_until_complete()  # Task succeeds on second attempt

6. 删除通道

不再需要时删除频道及其任务。

from simplepyq import SimplePyQ

def temp_task(args):
    print(f"Processing {args['data']}")

scheduler = SimplePyQ("tasks.db")
scheduler.add_channel("temp", temp_task)
scheduler.enqueue("temp", {"data": "test"})
scheduler.run_until_complete()
scheduler.remove_channel("temp")  # Removes channel and all its tasks

说明:临时通道及其任务已删除,可用于在不再需要任务类型时进行清理。

7. 运行直到完成

from simplepyq import SimplePyQ

def process_data(args):
    print(f"Processing {args['data']}")

scheduler = SimplePyQ("tasks.db")
scheduler.add_channel("data", process_data)
scheduler.enqueue("data", {"data": "item1"})
scheduler.enqueue("data", {"data": "item2"})
scheduler.run_until_complete()  # Blocks until all tasks are done

说明:run_until_complete 处理所有任务并停止调度程序,非常适合脚本或批处理。

以上就是simplepyq的全部介绍了,简单容易上手,可以用在不需要太复杂的项目里。

更多相关技术内容咨询欢迎前往并持续关注好学星城论坛了解详情。

想高效系统的学习Python编程语言,推荐大家关注一个微信公众号:Python编程学习圈。每天分享行业资讯、技术干货供大家阅读,关注即可免费领取整套Python入门到进阶的学习资料以及教程,感兴趣的小伙伴赶紧行动起来吧。

attachments-2022-05-rLS4AIF8628ee5f3b7e12.jpg

  • 发表于 2026-01-30 09:35
  • 阅读 ( 25 )
  • 分类:Python开发

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
Pack
Pack

1783 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 2228 文章
  3. Pack 1783 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章