FastAPI + Celery信頼性設計: 非同期ジョブを本番で壊さないための実装パターン

FastAPIでAPIを作ると、重い処理はすぐに非同期ジョブへ逃がしたくなります。画像変換、レポート生成、外部API連携、メール配信など、Celeryは非常に便利です。ですが、本番で問題になるのは「動くかどうか」ではなく、失敗したときに壊れないか です。

  • 同じジョブが二重実行される
  • 一時障害で永遠にリトライしてキューが詰まる
  • ワーカー再起動で中途半端な状態が残る
  • 完了通知が先に飛んで実データがない

本記事では FastAPI + Celery + Redis 構成を前提に、再実行安全性(idempotency)と運用信頼性を上げる実装手順をまとめます。

1. まず守るべき設計原則

非同期基盤の事故は、ほぼ次の4原則で防げます。

  1. At-least-once前提(同一タスク再実行は必ず起こる)
  2. 副作用は冪等化(何回実行されても結果が壊れない)
  3. 状態遷移を明示(PENDING/RUNNING/SUCCEEDED/FAILED)
  4. 失敗を可観測化(リトライ回数・死活・滞留時間を計測)

この原則を外すと、障害時に「何が完了して何が未完了か」が追えなくなります。

2. 参照アーキテクチャ

  • API: FastAPI
  • Queue Broker: Redis
  • Worker: Celery
  • Result Store: PostgreSQL(業務状態)
  • Monitoring: Flower + Prometheus + Sentry

ポイントは、業務上重要な状態はRedis結果バックエンドに依存しない ことです。Redisは一時的に使い、真実の状態はRDBに持たせます。

3. 実装の土台: タスク受付API

3.1 受け付け時に idempotency_key を必須化

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from sqlalchemy import select

app = FastAPI()

class JobRequest(BaseModel):
    idempotency_key: str
    report_type: str
    user_id: str

@app.post("/reports")
def create_report(req: JobRequest):
    existing = find_job_by_key(req.idempotency_key)
    if existing:
        return {"job_id": existing.id, "status": existing.status}

    job = create_job_record(
        idempotency_key=req.idempotency_key,
        status="PENDING",
        report_type=req.report_type,
        user_id=req.user_id,
    )
    generate_report.delay(job.id)
    return {"job_id": job.id, "status": "PENDING"}

これでクライアント再送が来てもジョブ多重作成を防げます。

3.2 DB制約で最終防衛線を張る

idempotency_key に UNIQUE 制約を入れ、アプリバグ時も二重作成を防ぎます。

1
ALTER TABLE async_jobs ADD CONSTRAINT uq_async_jobs_idempotency UNIQUE (idempotency_key);

4. Celeryタスクの実践設定

4.1 推奨設定

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
from celery import Celery

celery_app = Celery(
    "worker",
    broker="redis://redis:6379/0",
    backend="redis://redis:6379/1",
)

celery_app.conf.update(
    task_acks_late=True,
    task_reject_on_worker_lost=True,
    worker_prefetch_multiplier=1,
    task_time_limit=900,
    task_soft_time_limit=840,
    task_default_retry_delay=30,
    task_routes={"tasks.generate_report": {"queue": "reports"}},
)
  • acks_late=True: 実行完了後にACK。途中クラッシュ時は再配信
  • prefetch_multiplier=1: 取り込み過多を防ぎ、偏りを減らす
  • time limit: ハング抑止

4.2 リトライは指数バックオフ + 上限

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
@celery_app.task(
    bind=True,
    autoretry_for=(TemporaryExternalError,),
    retry_backoff=True,
    retry_backoff_max=300,
    retry_jitter=True,
    max_retries=7,
)
def generate_report(self, job_id: str):
    ...

無制限リトライは障害増幅装置です。必ず上限を設定します。

5. 冪等タスクの実装パターン

5.1 状態遷移をトランザクションで管理

1
2
3
4
5
6
7
8
def start_job(job_id: str):
    with session.begin():
        job = session.get(Job, job_id, with_for_update=True)
        if job.status in ("RUNNING", "SUCCEEDED"):
            return False
        job.status = "RUNNING"
        job.started_at = utcnow()
    return True

FOR UPDATE を使い、同時実行で状態が競合しないようにします。

5.2 副作用前に“実行済みチェック”

外部API呼び出しやファイル生成前に、既に成果物が存在するか確認します。

  • 既に同名レポートが生成済みならスキップ
  • 外部通知は送信履歴テーブルで重複防止
  • 決済や課金は必ず業務ID単位で一意化

5.3 完了処理はCompare-and-Setで確定

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def complete_job(job_id: str, result_url: str):
    updated = session.execute(
        """
        UPDATE async_jobs
        SET status='SUCCEEDED', result_url=:url, finished_at=now()
        WHERE id=:id AND status='RUNNING'
        """,
        {"id": job_id, "url": result_url},
    )
    return updated.rowcount == 1

これで二重完了更新を防げます。

6. 失敗時の設計(DLQ相当の運用)

Celeryに“標準DLQ”はありませんが、実運用では次の形で代替できます。

  • リトライ上限超過時に FAILED_PERMANENT へ遷移
  • 失敗理由とスタックトレースをDB保存
  • 再実行API(手動リカバリ)を提供
  • 重大失敗はSentry + Pagerで通知

この構成で「黙って死ぬジョブ」をなくせます。

7. 監視設計(最低限)

メトリクス

  • キュー滞留数(queue length)
  • oldest message age
  • タスク成功率 / 失敗率
  • p95 実行時間
  • リトライ回数分布

アラート例

  • 滞留数が通常の3倍を10分継続
  • 失敗率 > 5% が15分継続
  • oldest message age > 20分
  • worker heartbeat消失

「CPU高い」より「キューが古い」がユーザー影響に直結します。

8. デプロイ時の落とし穴

8.1 ローリング更新での重複実行

  • acks_late + graceful shutdown を設定
  • TERM 後にタスク完了待ち時間を確保
  • 長時間ジョブは分割し、中断耐性を持たせる

8.2 スキーマ変更の順序

非同期基盤では、ワーカーとAPIが異なるバージョンで同居します。

安全な順序:

  1. 先に後方互換なDB変更を適用
  2. ワーカーを先に更新
  3. APIを更新
  4. 非互換削除は次リリースで

これを守らないと、古いタスクが新スキーマで失敗します。

9. ローカル・ステージングでの検証手順

  1. 正常系: ジョブ作成→完了→結果取得
  2. 再送系: 同一 idempotency_key で2回POST
  3. 障害系: 外部APIタイムアウトを強制しリトライ確認
  4. クラッシュ系: 実行中にworker再起動し再配信確認
  5. 負荷系: 1000ジョブ投入で滞留時間と失敗率確認

この5ケースを自動テストに入れるだけで、運用品質は大幅に上がります。

10. 本番チェックリスト

  • idempotency_key のUNIQUE制約あり
  • 冪等な状態遷移実装(RUNNING/SUCCEEDED)
  • リトライ上限 + バックオフ設定済み
  • 手動再実行導線あり
  • 失敗通知(Sentry/Pager)有効
  • 滞留監視とアラート運用あり
  • デプロイ手順に互換性ルール明記

まとめ

FastAPI + Celeryの本質は、非同期化そのものではなく 失敗しても壊れない設計 にあります。

  • At-least-once を前提に設計する
  • 冪等性をDB制約と状態遷移で担保する
  • リトライと監視を“運用可能”な形で実装する
  • デプロイ時のバージョン混在を想定する

ここまで作り込むと、ジョブ基盤は「たまに落ちるブラックボックス」から「予測可能に運用できるインフラ」へ変わります。まずは idempotency_key と状態遷移の明確化から始めるのがおすすめです。