FastAPI + Celery信頼性設計: 非同期ジョブを本番で壊さないための実装パターン
FastAPI + Celery信頼性設計: 非同期ジョブを本番で壊さないための実装パターン FastAPIでAPIを作ると、重い処理はすぐに非同期ジョブへ逃がしたくなります。画像変換、レポート生成、外部API連携、メール配信など、Celeryは非常に便利です。ですが、本番で問題になるのは「動くかどうか」ではなく、失敗したときに壊れないか です。 同じジョブが二重実行される 一時障害で永遠にリトライしてキューが詰まる ワーカー再起動で中途半端な状態が残る 完了通知が先に飛んで実データがない 本記事では FastAPI + Celery + Redis 構成を前提に、再実行安全性(idempotency)と運用信頼性を上げる実装手順をまとめます。 1. まず守るべき設計原則 非同期基盤の事故は、ほぼ次の4原則で防げます。 At-least-once前提(同一タスク再実行は必ず起こる) 副作用は冪等化(何回実行されても結果が壊れない) 状態遷移を明示(PENDING/RUNNING/SUCCEEDED/FAILED) 失敗を可観測化(リトライ回数・死活・滞留時間を計測) この原則を外すと、障害時に「何が完了して何が未完了か」が追えなくなります。 2. 参照アーキテクチャ API: FastAPI Queue Broker: Redis Worker: Celery Result Store: PostgreSQL(業務状態) Monitoring: Flower + Prometheus + Sentry ポイントは、業務上重要な状態はRedis結果バックエンドに依存しない ことです。Redisは一時的に使い、真実の状態はRDBに持たせます。 3. 実装の土台: タスク受付API 3.1 受け付け時に idempotency_key を必須化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 from fastapi import FastAPI, HTTPException from pydantic import BaseModel from sqlalchemy import select app = FastAPI() class JobRequest(BaseModel): idempotency_key: str report_type: str user_id: str @app.post("/reports") def create_report(req: JobRequest): existing = find_job_by_key(req.idempotency_key) if existing: return {"job_id": existing.id, "status": existing.status} job = create_job_record( idempotency_key=req.idempotency_key, status="PENDING", report_type=req.report_type, user_id=req.user_id, ) generate_report.delay(job.id) return {"job_id": job.id, "status": "PENDING"} これでクライアント再送が来てもジョブ多重作成を防げます。 ...