はじめに#
Pythonで大量のAPI呼び出しやファイル操作を行う際、処理時間がボトルネックになることは珍しくありません。同期的な処理では、一つの操作が完了するまで次の処理を開始できないため、I/O待ち時間が積み重なってしまいます。
本記事では、Pythonのasyncioモジュールを使った非同期プログラミングについて、基礎概念から実践的なパターンまでを体系的に解説します。実際のプロジェクトで使える具体的なコード例を通じて、処理速度を劇的に改善する方法を学んでいきましょう。
asyncioの基本概念#
イベントループとは#
asyncioの中心にあるのがイベントループです。イベントループは、非同期タスクの実行を管理し、I/O操作の完了を監視して、適切なタイミングでタスクを再開させる役割を担います。
1
2
3
4
5
6
7
8
9
|
import asyncio
async def main():
print("Hello")
await asyncio.sleep(1)
print("World")
# Python 3.7以降の推奨方法
asyncio.run(main())
|
async/awaitの仕組み#
async defで定義された関数はコルーチン関数となり、呼び出すとコルーチンオブジェクトを返します。awaitキーワードを使うことで、そのコルーチンの完了を待機しつつ、その間に他のタスクを実行できるようになります。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
import asyncio
import aiohttp
async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
"""URLからデータを非同期で取得"""
async with session.get(url) as response:
return {
"url": url,
"status": response.status,
"content_length": len(await response.text())
}
async def main():
urls = [
"https://api.github.com",
"https://api.stripe.com",
"https://api.openai.com"
]
async with aiohttp.ClientSession() as session:
# 全URLを並行して取得
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for result in results:
print(f"{result['url']}: {result['status']}")
asyncio.run(main())
|
実践パターン1:大量のAPI呼び出しを高速化#
問題:同期処理での遅延#
例えば、1000件のユーザーデータをAPIから取得する場合を考えます。1リクエストあたり100msかかるとすると、同期処理では100秒もの時間が必要です。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
# 同期処理(遅い)
import requests
import time
def fetch_users_sync(user_ids: list[int]) -> list[dict]:
results = []
for user_id in user_ids:
response = requests.get(f"https://api.example.com/users/{user_id}")
results.append(response.json())
return results
start = time.time()
users = fetch_users_sync(list(range(1, 101))) # 100件
print(f"同期処理: {time.time() - start:.2f}秒")
# 同期処理: 10.23秒
|
解決:asyncioで並行処理#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
import asyncio
import aiohttp
import time
from typing import Optional
async def fetch_user(
session: aiohttp.ClientSession,
user_id: int,
semaphore: asyncio.Semaphore
) -> Optional[dict]:
"""セマフォで同時接続数を制限しながらユーザー情報を取得"""
async with semaphore:
try:
async with session.get(
f"https://api.example.com/users/{user_id}",
timeout=aiohttp.ClientTimeout(total=10)
) as response:
if response.status == 200:
return await response.json()
return None
except Exception as e:
print(f"Error fetching user {user_id}: {e}")
return None
async def fetch_users_async(user_ids: list[int], max_concurrent: int = 50) -> list[dict]:
"""非同期で複数ユーザーを取得(同時接続数制限付き)"""
semaphore = asyncio.Semaphore(max_concurrent)
connector = aiohttp.TCPConnector(
limit=max_concurrent,
limit_per_host=max_concurrent
)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [fetch_user(session, uid, semaphore) for uid in user_ids]
results = await asyncio.gather(*tasks)
return [r for r in results if r is not None]
# 実行
start = time.time()
users = asyncio.run(fetch_users_async(list(range(1, 101))))
print(f"非同期処理: {time.time() - start:.2f}秒")
# 非同期処理: 0.52秒(約20倍高速化)
|
重要ポイント:セマフォによる同時接続数制限#
無制限に並行リクエストを投げると、サーバーに過負荷をかけたり、コネクションプールが枯渇したりする問題が発生します。asyncio.Semaphoreを使って、同時実行数を適切に制限することが重要です。
実践パターン2:タイムアウトとリトライの実装#
本番環境では、ネットワーク障害やサーバーの一時的な過負荷に対応するため、適切なタイムアウトとリトライ機構が必要です。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
import asyncio
import aiohttp
from typing import TypeVar, Callable, Any
from functools import wraps
T = TypeVar('T')
def async_retry(
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
exponential_base: float = 2.0,
retryable_exceptions: tuple = (aiohttp.ClientError, asyncio.TimeoutError)
):
"""非同期関数用のリトライデコレータ(指数バックオフ付き)"""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
async def wrapper(*args, **kwargs) -> T:
last_exception = None
for attempt in range(max_retries + 1):
try:
return await func(*args, **kwargs)
except retryable_exceptions as e:
last_exception = e
if attempt < max_retries:
delay = min(
base_delay * (exponential_base ** attempt),
max_delay
)
# ジッター追加でサンダリングハード問題を回避
import random
delay *= (0.5 + random.random())
print(f"Retry {attempt + 1}/{max_retries} after {delay:.2f}s: {e}")
await asyncio.sleep(delay)
raise last_exception
return wrapper
return decorator
@async_retry(max_retries=3, base_delay=0.5)
async def fetch_with_retry(session: aiohttp.ClientSession, url: str) -> dict:
"""リトライ機能付きのフェッチ関数"""
timeout = aiohttp.ClientTimeout(total=10, connect=5)
async with session.get(url, timeout=timeout) as response:
response.raise_for_status()
return await response.json()
|
実践パターン3:非同期キューによるプロデューサー・コンシューマーパターン#
大量のタスクを処理する場合、キューを使ったパターンが有効です。プロデューサーがタスクをキューに追加し、複数のコンシューマーが並行して処理します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
import asyncio
from dataclasses import dataclass
from typing import Any
import aiohttp
@dataclass
class Task:
id: int
url: str
payload: dict
async def producer(queue: asyncio.Queue, tasks: list[Task]):
"""タスクをキューに追加"""
for task in tasks:
await queue.put(task)
print(f"Produced task {task.id}")
# 終了シグナル
for _ in range(3): # コンシューマー数分
await queue.put(None)
async def consumer(
name: str,
queue: asyncio.Queue,
session: aiohttp.ClientSession,
results: list
):
"""キューからタスクを取得して処理"""
while True:
task = await queue.get()
if task is None:
queue.task_done()
print(f"Consumer {name} finished")
break
try:
async with session.post(task.url, json=task.payload) as response:
result = await response.json()
results.append({"task_id": task.id, "result": result})
print(f"Consumer {name} processed task {task.id}")
except Exception as e:
print(f"Consumer {name} failed on task {task.id}: {e}")
finally:
queue.task_done()
async def process_tasks(tasks: list[Task], num_consumers: int = 3) -> list[dict]:
"""プロデューサー・コンシューマーパターンでタスクを処理"""
queue = asyncio.Queue(maxsize=100) # バックプレッシャー制御
results = []
async with aiohttp.ClientSession() as session:
# プロデューサーとコンシューマーを並行実行
consumers = [
asyncio.create_task(consumer(f"C{i}", queue, session, results))
for i in range(num_consumers)
]
producer_task = asyncio.create_task(producer(queue, tasks))
# 全タスク完了を待機
await producer_task
await asyncio.gather(*consumers)
return results
|
実践パターン4:データベース操作の非同期化#
SQLAlchemyやasyncpgを使って、データベース操作も非同期化できます。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import Column, Integer, String, select
import asyncio
# 非同期エンジンの作成
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/dbname"
engine = create_async_engine(DATABASE_URL, echo=True, pool_size=20, max_overflow=10)
AsyncSessionLocal = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String(100))
email = Column(String(255))
async def get_users_batch(user_ids: list[int]) -> list[User]:
"""複数ユーザーをバッチで取得"""
async with AsyncSessionLocal() as session:
result = await session.execute(
select(User).where(User.id.in_(user_ids))
)
return result.scalars().all()
async def create_users_batch(users_data: list[dict]) -> list[User]:
"""複数ユーザーをバッチで作成"""
async with AsyncSessionLocal() as session:
users = [User(**data) for data in users_data]
session.add_all(users)
await session.commit()
return users
|
実践パターン5:asyncioとマルチプロセスの組み合わせ#
CPUバウンドな処理がある場合、asyncioだけでは効果が限定的です。concurrent.futures.ProcessPoolExecutorと組み合わせることで、CPUバウンドとI/Oバウンドの両方を最適化できます。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
import asyncio
from concurrent.futures import ProcessPoolExecutor
import hashlib
def cpu_intensive_task(data: bytes) -> str:
"""CPUバウンドな処理(例:ハッシュ計算)"""
result = data
for _ in range(100000):
result = hashlib.sha256(result).digest()
return result.hex()
async def process_with_cpu_and_io(items: list[bytes]) -> list[dict]:
"""CPU処理とI/O処理を組み合わせた非同期処理"""
loop = asyncio.get_event_loop()
with ProcessPoolExecutor(max_workers=4) as executor:
# CPU処理を別プロセスで実行
cpu_tasks = [
loop.run_in_executor(executor, cpu_intensive_task, item)
for item in items
]
# I/O処理も並行して実行
async with aiohttp.ClientSession() as session:
io_tasks = [
fetch_with_retry(session, f"https://api.example.com/process")
for _ in items
]
# 両方の結果を待機
cpu_results, io_results = await asyncio.gather(
asyncio.gather(*cpu_tasks),
asyncio.gather(*io_tasks, return_exceptions=True)
)
return [
{"hash": cpu_result, "api_response": io_result}
for cpu_result, io_result in zip(cpu_results, io_results)
]
|
パフォーマンス計測とデバッグ#
実行時間の計測#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
import asyncio
import time
from contextlib import asynccontextmanager
from typing import AsyncGenerator
@asynccontextmanager
async def async_timer(name: str) -> AsyncGenerator[None, None]:
"""非同期処理の実行時間を計測"""
start = time.perf_counter()
try:
yield
finally:
elapsed = time.perf_counter() - start
print(f"{name}: {elapsed:.4f}秒")
async def main():
async with async_timer("API一括取得"):
results = await fetch_users_async(list(range(1, 1001)))
print(f"取得件数: {len(results)}")
|
デバッグモードの有効化#
1
2
3
4
5
6
7
|
import asyncio
# デバッグモードを有効化
asyncio.run(main(), debug=True)
# または環境変数で
# PYTHONASYNCIODEBUG=1 python script.py
|
よくある落とし穴と対策#
1. ブロッキング関数の呼び出し#
1
2
3
4
5
6
7
8
|
# ❌ 悪い例:asyncio内で同期的なsleep
async def bad_example():
import time
time.sleep(1) # イベントループをブロック!
# ✅ 良い例:asyncio.sleepを使用
async def good_example():
await asyncio.sleep(1) # 他のタスクに制御を渡す
|
2. awaitの付け忘れ#
1
2
3
4
5
6
7
8
9
|
# ❌ 悪い例:awaitなし
async def bad():
result = fetch_data() # コルーチンオブジェクトが返るだけ
print(result) # <coroutine object ...>
# ✅ 良い例:awaitあり
async def good():
result = await fetch_data()
print(result) # 実際のデータ
|
3. 例外処理の不備#
1
2
3
4
5
6
7
|
# ✅ gather使用時の例外処理
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
process_result(result)
|
まとめ#
asyncioを使った非同期プログラミングは、I/Oバウンドな処理の高速化に非常に効果的です。本記事で紹介した実践パターンを活用することで、以下のような改善が期待できます:
- API呼び出しの並行化:数十倍の高速化が可能
- 適切な同時接続数制限:サーバー負荷とリソースの最適化
- リトライ機構:一時的な障害への耐性向上
- キューパターン:大量タスクの効率的な処理
- マルチプロセスとの組み合わせ:CPU+I/O両方の最適化
重要なのは、asyncioは銀の弾丸ではないということです。CPUバウンドな処理には効果がなく、適切な設計なしに導入するとかえって複雑さが増すこともあります。処理の特性を理解した上で、適材適所で活用していきましょう。