Python asyncio実践ガイド:並行処理で処理速度を10倍にする具体的テクニック

はじめに Pythonで大量のAPI呼び出しやファイル操作を行う際、処理時間がボトルネックになることは珍しくありません。同期的な処理では、一つの操作が完了するまで次の処理を開始できないため、I/O待ち時間が積み重なってしまいます。 本記事では、Pythonのasyncioモジュールを使った非同期プログラミングについて、基礎概念から実践的なパターンまでを体系的に解説します。実際のプロジェクトで使える具体的なコード例を通じて、処理速度を劇的に改善する方法を学んでいきましょう。 asyncioの基本概念 イベントループとは asyncioの中心にあるのがイベントループです。イベントループは、非同期タスクの実行を管理し、I/O操作の完了を監視して、適切なタイミングでタスクを再開させる役割を担います。 1 2 3 4 5 6 7 8 9 import asyncio async def main(): print("Hello") await asyncio.sleep(1) print("World") # Python 3.7以降の推奨方法 asyncio.run(main()) async/awaitの仕組み async defで定義された関数はコルーチン関数となり、呼び出すとコルーチンオブジェクトを返します。awaitキーワードを使うことで、そのコルーチンの完了を待機しつつ、その間に他のタスクを実行できるようになります。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import asyncio import aiohttp async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict: """URLからデータを非同期で取得""" async with session.get(url) as response: return { "url": url, "status": response.status, "content_length": len(await response.text()) } async def main(): urls = [ "https://api.github.com", "https://api.stripe.com", "https://api.openai.com" ] async with aiohttp.ClientSession() as session: # 全URLを並行して取得 tasks = [fetch_url(session, url) for url in urls] results = await asyncio.gather(*tasks) for result in results: print(f"{result['url']}: {result['status']}") asyncio.run(main()) 実践パターン1:大量のAPI呼び出しを高速化 問題:同期処理での遅延 例えば、1000件のユーザーデータをAPIから取得する場合を考えます。1リクエストあたり100msかかるとすると、同期処理では100秒もの時間が必要です。 ...

March 8, 2026 · 6 min · AI2CORE 編集部

Python asyncioバックプレッシャー設計:落ちない非同期バッチを作る実装パターン

Python asyncioバックプレッシャー設計:落ちない非同期バッチを作る実装パターン asyncio は速く作れる一方で、負荷が上がった瞬間に崩壊する設計を作りやすいという側面があります。特に「処理待ちが無限に積み上がる」「外部API遅延で全体が詰まる」「リトライ嵐でさらに遅くなる」は典型的です。 本記事では、非同期ワーカーを本番運用する前提で、バックプレッシャーを実装に落とす方法を解説します。単なる概念ではなく、すぐ使えるコード断片を中心に進めます。 1. なぜバックプレッシャーが必要か バックプレッシャーは「これ以上は受けない」仕組みです。これがない設計は、ピーク時に次の順で壊れます。 入力が処理速度を超える キューが無限増加してメモリ圧迫 GC増加でスループット低下 タイムアウト増加→リトライ増加 システム全体が雪崩れる つまり、受けすぎないことは性能ではなく可用性の話です。 2. 基本設計:3つの制限を必ず入れる 2-1. キュー上限(bounded queue) 1 2 3 4 import asyncio QUEUE_MAX = 1000 queue: asyncio.Queue[dict] = asyncio.Queue(maxsize=QUEUE_MAX) maxsize なしは原則禁止です。業務要件で「捨てられない」場合でも、無限キューより「受け付け停止 + 明示エラー」のほうが復旧可能です。 2-2. 同時実行数上限(semaphore) 1 2 3 4 5 6 CONCURRENCY = 20 semaphore = asyncio.Semaphore(CONCURRENCY) async def guarded_call(fn, *args, **kwargs): async with semaphore: return await fn(*args, **kwargs) CPU でも I/O でも、同時実行数に上限を持たせると遅延の尾が短くなります。 ...

March 1, 2026 · 2 min · AI2CORE 編集部