在 Python 项目中,经常会遇到“请求进来后不想立即处理完所有工作”的场景。例如发送通知、同步第三方数据、生成报表、处理图片、写入慢速日志等。如果这些耗时操作都放在主请求链路里,接口响应会变慢,也更容易因为外部依赖波动而失败。
对于大型系统,可以使用 Celery、RQ、Kafka 或云消息队列。但在小型服务或内部工具里,有时只需要一个进程内的异步任务队列。本文使用 asyncio.Queue 实现一个简单版本。
适用场景和边界
进程内队列适合:
- 任务量较小。
- 允许服务重启后丢失未完成任务。
- 单进程或少量进程部署。
- 主要目的是削峰和解耦请求耗时。
它不适合:
- 订单支付、扣库存等必须可靠执行的任务。
- 多实例之间需要共享任务。
- 需要失败重试、延迟任务、任务状态查询的复杂场景。
如果任务不能丢,应该使用外部消息队列或数据库任务表。
定义任务结构
先定义一个任务对象:
1 2 3 4 5 6 7 8 9
| from dataclasses import dataclass, field from typing import Any import time
@dataclass class Task: name: str payload: dict[str, Any] created_at: float = field(default_factory=time.time)
|
name 用于区分任务类型,payload 存放任务参数。
实现任务队列
核心是 asyncio.Queue:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| import asyncio import logging
logger = logging.getLogger(__name__)
class TaskQueue: def __init__(self, maxsize: int = 1000): self.queue: asyncio.Queue[Task] = asyncio.Queue(maxsize=maxsize) self.workers: list[asyncio.Task] = [] self.running = False
async def start(self, worker_count: int = 2) -> None: self.running = True for index in range(worker_count): worker = asyncio.create_task(self._worker(index)) self.workers.append(worker)
async def stop(self) -> None: self.running = False await self.queue.join()
for worker in self.workers: worker.cancel()
await asyncio.gather(*self.workers, return_exceptions=True)
async def submit(self, task: Task) -> None: await self.queue.put(task)
async def _worker(self, index: int) -> None: while self.running: task = await self.queue.get() try: await self.handle(task) except Exception: logger.exception("task failed worker=%s task=%s", index, task.name) finally: self.queue.task_done()
async def handle(self, task: Task) -> None: if task.name == "send_email": await self.send_email(task.payload) elif task.name == "sync_user": await self.sync_user(task.payload) else: logger.warning("unknown task name=%s", task.name)
async def send_email(self, payload: dict[str, Any]) -> None: await asyncio.sleep(1) logger.info("email sent to=%s", payload["to"])
async def sync_user(self, payload: dict[str, Any]) -> None: await asyncio.sleep(1) logger.info("user synced user_id=%s", payload["user_id"])
|
queue.join() 会等待所有已经入队的任务执行完成。task_done() 必须在每个任务处理结束后调用,否则 join() 会一直等待。
提交任务
在普通异步函数中提交任务:
1 2 3 4 5 6 7 8 9 10 11 12
| task_queue = TaskQueue()
async def create_user(user_id: str, email: str) -> None: await task_queue.submit(Task( name="send_email", payload={"to": email, "template": "welcome"}, ))
await task_queue.submit(Task( name="sync_user", payload={"user_id": user_id}, ))
|
提交任务本身很快,请求链路不需要等待邮件发送和外部同步完成。
和 FastAPI 集成
FastAPI 可以在应用生命周期中启动和停止队列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| from contextlib import asynccontextmanager from fastapi import FastAPI
task_queue = TaskQueue()
@asynccontextmanager async def lifespan(app: FastAPI): await task_queue.start(worker_count=2) yield await task_queue.stop()
app = FastAPI(lifespan=lifespan)
@app.post("/users") async def create_user_api(): await task_queue.submit(Task( name="send_email", payload={"to": "demo@example.com"}, )) return {"ok": True}
|
服务启动时创建 worker,关闭时等待队列中已有任务完成,然后取消 worker。
增加超时控制
某些任务可能卡住,建议给任务处理加超时:
1 2 3 4 5 6 7 8 9 10 11
| async def _worker(self, index: int) -> None: while self.running: task = await self.queue.get() try: await asyncio.wait_for(self.handle(task), timeout=30) except asyncio.TimeoutError: logger.error("task timeout worker=%s task=%s", index, task.name) except Exception: logger.exception("task failed worker=%s task=%s", index, task.name) finally: self.queue.task_done()
|
超时时间要根据任务类型设置。发送邮件和外部 API 调用通常应该有较短超时,报表生成可能需要更长时间。
背压与队列满
asyncio.Queue(maxsize=1000) 可以限制队列长度。如果生产速度超过消费速度,queue.put 会等待。对于接口请求来说,这相当于一种背压。
如果不希望请求等待太久,可以使用:
1 2 3 4
| try: task_queue.queue.put_nowait(task) except asyncio.QueueFull: logger.warning("task queue full task=%s", task.name)
|
队列满时可以返回错误、降级处理,或者把任务写入外部存储等待后续补偿。
小结
asyncio.Queue 可以快速实现一个轻量级异步任务队列,适合内部工具和低可靠性要求的后台任务。关键点是明确适用边界,启动固定数量 worker,处理异常和超时,关闭时等待已有任务完成。对于不能丢失的核心业务任务,应使用外部消息队列或持久化任务表。