Python asyncioバックプレッシャー設計:落ちない非同期バッチを作る実装パターン

asyncio は速く作れる一方で、負荷が上がった瞬間に崩壊する設計を作りやすいという側面があります。特に「処理待ちが無限に積み上がる」「外部API遅延で全体が詰まる」「リトライ嵐でさらに遅くなる」は典型的です。

本記事では、非同期ワーカーを本番運用する前提で、バックプレッシャーを実装に落とす方法を解説します。単なる概念ではなく、すぐ使えるコード断片を中心に進めます。

1. なぜバックプレッシャーが必要か

バックプレッシャーは「これ以上は受けない」仕組みです。これがない設計は、ピーク時に次の順で壊れます。

  1. 入力が処理速度を超える
  2. キューが無限増加してメモリ圧迫
  3. GC増加でスループット低下
  4. タイムアウト増加→リトライ増加
  5. システム全体が雪崩れる

つまり、受けすぎないことは性能ではなく可用性の話です。

2. 基本設計:3つの制限を必ず入れる

2-1. キュー上限(bounded queue)

1
2
3
4
import asyncio

QUEUE_MAX = 1000
queue: asyncio.Queue[dict] = asyncio.Queue(maxsize=QUEUE_MAX)

maxsize なしは原則禁止です。業務要件で「捨てられない」場合でも、無限キューより「受け付け停止 + 明示エラー」のほうが復旧可能です。

2-2. 同時実行数上限(semaphore)

1
2
3
4
5
6
CONCURRENCY = 20
semaphore = asyncio.Semaphore(CONCURRENCY)

async def guarded_call(fn, *args, **kwargs):
    async with semaphore:
        return await fn(*args, **kwargs)

CPU でも I/O でも、同時実行数に上限を持たせると遅延の尾が短くなります。

2-3. タイムアウト(timeout budget)

1
2
3
4
import asyncio

async def with_timeout(coro, timeout_sec: float):
    return await asyncio.wait_for(coro, timeout=timeout_sec)

タイムアウトは「短すぎるか長すぎるか」ではなく、上流SLOから逆算します。例えば API 全体予算が 1.5 秒なら、外部API呼び出しを 600ms に固定し、残りをローカル処理に残す、という考え方です。

3. 実践ワーカーパターン(そのまま使える)

以下は、キュー + 複数ワーカー + 再試行 + DLQ(Dead Letter Queue)を備えた最小構成です。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import asyncio
from dataclasses import dataclass
from typing import Any

@dataclass
class Task:
    payload: dict[str, Any]
    retry: int = 0

MAX_RETRY = 3
QUEUE_MAX = 1000
WORKERS = 16

main_queue: asyncio.Queue[Task] = asyncio.Queue(maxsize=QUEUE_MAX)
dlq: asyncio.Queue[Task] = asyncio.Queue()
sem = asyncio.Semaphore(32)

async def process_payload(payload: dict[str, Any]) -> None:
    # 外部API呼び出しやDB処理を想定
    await asyncio.sleep(0.05)

async def worker(worker_id: int):
    while True:
        task = await main_queue.get()
        try:
            async with sem:
                await asyncio.wait_for(process_payload(task.payload), timeout=1.0)
        except Exception:
            if task.retry < MAX_RETRY:
                task.retry += 1
                await asyncio.sleep(0.1 * (2 ** task.retry))  # 指数バックオフ
                await main_queue.put(task)
            else:
                await dlq.put(task)
        finally:
            main_queue.task_done()

async def run():
    workers = [asyncio.create_task(worker(i)) for i in range(WORKERS)]
    try:
        await main_queue.join()
    finally:
        for w in workers:
            w.cancel()

重要なのは、再試行回数を有限にし、失敗タスクを DLQ に逃がす点です。無限リトライは障害時に自爆装置になります。

4. 入力側での「受けすぎ防止」

ワーカーが健全でも、入口が無制限なら負けます。API であれば次の制御を入れます。

  • 受け付けキュー残量が閾値超えなら 429 を返す
  • tenant 単位でレート制限を分離
  • 優先度キュー(重要ジョブを優先)

4-1. FastAPIでの簡易例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from fastapi import FastAPI, HTTPException

app = FastAPI()

@app.post("/enqueue")
async def enqueue(item: dict):
    if main_queue.qsize() > int(QUEUE_MAX * 0.9):
        raise HTTPException(status_code=429, detail="queue saturated")
    await main_queue.put(Task(payload=item))
    return {"accepted": True}

「受け付けない」ことはユーザー体験を悪化させるように見えますが、全体停止よりはるかに安全です。429 を返す場合は Retry-After を併せて返し、クライアント再送間隔を制御します。

5. 観測性:最低限見るべき4指標

バックプレッシャーは入れただけでは不十分で、監視しないと調整できません。

  1. queue_depth(キュー長)
  2. processing_latency_p95
  3. timeout_rate
  4. dlq_rate

5-1. アラート基準の実例

  • queue_depth > 80% が 5分継続
  • timeout_rate > 2% が 10分継続
  • dlq_rate が平常時の 3倍超

アラートは「発火しやすい」より「行動が決まる」閾値を優先します。鳴るたびに対応が変わる設定は、運用疲労を生みます。

6. 負荷試験で必ず確認する項目

本番前に k6 や Locust で負荷試験を行い、次を確認します。

  • 1.5x 負荷で queue_depth が収束するか
  • 2.0x 負荷で 429 が適切に返るか
  • 外部API遅延を注入しても DLQ へ逃がせるか
  • 復帰後に backlog を解消できるか

6-1. テスト時の失敗パターン

  • セマフォ上限を増やしすぎ、下流DBが先に死ぬ
  • タイムアウト短縮だけで成功率が急落
  • リトライ間隔が短すぎて輻輳を悪化

負荷試験は「最大スループット競争」ではなく「壊れ方の確認」です。

7. 運用で効く改善の順番

改善は次の順番でやると効果が出やすいです。

  1. キュー上限と429制御を導入
  2. 同時実行数とタイムアウトを固定
  3. DLQと再処理ジョブを作る
  4. 指標とアラートを整備
  5. tenant別の公平制御(重い顧客の分離)

最初から完璧なスケジューラは不要です。まず「壊れない最小構成」を作り、その上で最適化します。

まとめ

asyncio の本質的な課題は、速さではなく「過負荷時の振る舞い」です。バックプレッシャーは、ピーク時に品質を守るための安全装置であり、設計段階で必ず入れるべきです。

  • 無限キューを禁止する
  • 同時実行数を固定する
  • タイムアウトと再試行を予算化する
  • 失敗はDLQに逃がす

この4点を実装すれば、負荷が来ても「遅くなるだけ」で済み、止まらないシステムに近づきます。安定運用を目指すなら、まずは今日中に maxsizeSemaphore をコードに入れるところから始めてください。