在 Python 项目中,经常会遇到“请求进来后不想立即处理完所有工作”的场景。例如发送通知、同步第三方数据、生成报表、处理图片、写入慢速日志等。如果这些耗时操作都放在主请求链路里,接口响应会变慢,也更容易因为外部依赖波动而失败。

对于大型系统,可以使用 Celery、RQ、Kafka 或云消息队列。但在小型服务或内部工具里,有时只需要一个进程内的异步任务队列。本文使用 asyncio.Queue 实现一个简单版本。

适用场景和边界

进程内队列适合:

  • 任务量较小。
  • 允许服务重启后丢失未完成任务。
  • 单进程或少量进程部署。
  • 主要目的是削峰和解耦请求耗时。

它不适合:

  • 订单支付、扣库存等必须可靠执行的任务。
  • 多实例之间需要共享任务。
  • 需要失败重试、延迟任务、任务状态查询的复杂场景。

如果任务不能丢,应该使用外部消息队列或数据库任务表。

定义任务结构

先定义一个任务对象:

1
2
3
4
5
6
7
8
9
from dataclasses import dataclass, field
from typing import Any
import time

@dataclass
class Task:
name: str
payload: dict[str, Any]
created_at: float = field(default_factory=time.time)

name 用于区分任务类型,payload 存放任务参数。

实现任务队列

核心是 asyncio.Queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import asyncio
import logging

logger = logging.getLogger(__name__)

class TaskQueue:
def __init__(self, maxsize: int = 1000):
self.queue: asyncio.Queue[Task] = asyncio.Queue(maxsize=maxsize)
self.workers: list[asyncio.Task] = []
self.running = False

async def start(self, worker_count: int = 2) -> None:
self.running = True
for index in range(worker_count):
worker = asyncio.create_task(self._worker(index))
self.workers.append(worker)

async def stop(self) -> None:
self.running = False
await self.queue.join()

for worker in self.workers:
worker.cancel()

await asyncio.gather(*self.workers, return_exceptions=True)

async def submit(self, task: Task) -> None:
await self.queue.put(task)

async def _worker(self, index: int) -> None:
while self.running:
task = await self.queue.get()
try:
await self.handle(task)
except Exception:
logger.exception("task failed worker=%s task=%s", index, task.name)
finally:
self.queue.task_done()

async def handle(self, task: Task) -> None:
if task.name == "send_email":
await self.send_email(task.payload)
elif task.name == "sync_user":
await self.sync_user(task.payload)
else:
logger.warning("unknown task name=%s", task.name)

async def send_email(self, payload: dict[str, Any]) -> None:
await asyncio.sleep(1)
logger.info("email sent to=%s", payload["to"])

async def sync_user(self, payload: dict[str, Any]) -> None:
await asyncio.sleep(1)
logger.info("user synced user_id=%s", payload["user_id"])

queue.join() 会等待所有已经入队的任务执行完成。task_done() 必须在每个任务处理结束后调用,否则 join() 会一直等待。

提交任务

在普通异步函数中提交任务:

1
2
3
4
5
6
7
8
9
10
11
12
task_queue = TaskQueue()

async def create_user(user_id: str, email: str) -> None:
await task_queue.submit(Task(
name="send_email",
payload={"to": email, "template": "welcome"},
))

await task_queue.submit(Task(
name="sync_user",
payload={"user_id": user_id},
))

提交任务本身很快,请求链路不需要等待邮件发送和外部同步完成。

和 FastAPI 集成

FastAPI 可以在应用生命周期中启动和停止队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from contextlib import asynccontextmanager
from fastapi import FastAPI

task_queue = TaskQueue()

@asynccontextmanager
async def lifespan(app: FastAPI):
await task_queue.start(worker_count=2)
yield
await task_queue.stop()

app = FastAPI(lifespan=lifespan)

@app.post("/users")
async def create_user_api():
await task_queue.submit(Task(
name="send_email",
payload={"to": "demo@example.com"},
))
return {"ok": True}

服务启动时创建 worker,关闭时等待队列中已有任务完成,然后取消 worker。

增加超时控制

某些任务可能卡住,建议给任务处理加超时:

1
2
3
4
5
6
7
8
9
10
11
async def _worker(self, index: int) -> None:
while self.running:
task = await self.queue.get()
try:
await asyncio.wait_for(self.handle(task), timeout=30)
except asyncio.TimeoutError:
logger.error("task timeout worker=%s task=%s", index, task.name)
except Exception:
logger.exception("task failed worker=%s task=%s", index, task.name)
finally:
self.queue.task_done()

超时时间要根据任务类型设置。发送邮件和外部 API 调用通常应该有较短超时,报表生成可能需要更长时间。

背压与队列满

asyncio.Queue(maxsize=1000) 可以限制队列长度。如果生产速度超过消费速度,queue.put 会等待。对于接口请求来说,这相当于一种背压。

如果不希望请求等待太久,可以使用:

1
2
3
4
try:
task_queue.queue.put_nowait(task)
except asyncio.QueueFull:
logger.warning("task queue full task=%s", task.name)

队列满时可以返回错误、降级处理,或者把任务写入外部存储等待后续补偿。

小结

asyncio.Queue 可以快速实现一个轻量级异步任务队列,适合内部工具和低可靠性要求的后台任务。关键点是明确适用边界,启动固定数量 worker,处理异常和超时,关闭时等待已有任务完成。对于不能丢失的核心业务任务,应使用外部消息队列或持久化任务表。