🚀 AI2CORE

AIエージェント・コーディングAIの最新トレンドと実践ノウハウを、シンプルに解説します。

RustでCLIツール開発入門:clapからクロスプラットフォーム配布まで完全解説

はじめに CLIツールの開発言語として、Rustは非常に優れた選択肢です。高速な実行速度、シングルバイナリでの配布、強力な型システムによる安全性、そしてクロスプラットフォーム対応が容易という特徴があります。 本記事では、Rustを使って実用的なCLIツールを開発する手順を、プロジェクトのセットアップからクロスプラットフォーム配布まで一貫して解説します。 プロジェクトのセットアップ Cargoプロジェクトの作成 1 2 3 4 5 6 # 新規プロジェクト作成 cargo new mytools --name mytools cd mytools # または既存ディレクトリで初期化 cargo init 基本的な依存関係 Cargo.tomlに必要なクレートを追加します: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 [package] name = "mytools" version = "0.1.0" edition = "2021" authors = ["Your Name <[email protected]>"] description = "A collection of useful CLI tools" license = "MIT" repository = "https://github.com/yourname/mytools" keywords = ["cli", "tools", "utility"] categories = ["command-line-utilities"] [dependencies] # 引数パース clap = { version = "4.5", features = ["derive", "env", "wrap_help"] } # 非同期ランタイム tokio = { version = "1.36", features = ["full"] } # HTTP クライアント reqwest = { version = "0.12", features = ["json", "rustls-tls"] } # JSON処理 serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" # エラーハンドリング anyhow = "1.0" thiserror = "1.0" # ログ出力 tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } # 設定ファイル config = "0.14" # プログレスバー indicatif = "0.17" # カラー出力 colored = "2.1" # ファイルシステム操作 walkdir = "2.4" # 日時処理 chrono = { version = "0.4", features = ["serde"] } [dev-dependencies] assert_cmd = "2.0" predicates = "3.1" tempfile = "3.10" [profile.release] lto = true codegen-units = 1 panic = "abort" strip = true clapを使った引数パース 基本的なCLI構造 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 // src/main.rs use clap::{Parser, Subcommand, Args}; use anyhow::Result; /// A collection of useful CLI tools #[derive(Parser)] #[command(name = "mytools")] #[command(author, version, about, long_about = None)] #[command(propagate_version = true)] struct Cli { /// Enable verbose output #[arg(short, long, global = true)] verbose: bool, /// Configuration file path #[arg(short, long, global = true, env = "MYTOOLS_CONFIG")] config: Option<std::path::PathBuf>, #[command(subcommand)] command: Commands, } #[derive(Subcommand)] enum Commands { /// Fetch data from API Fetch(FetchArgs), /// Process files Process(ProcessArgs), /// Show configuration Config(ConfigArgs), } #[derive(Args)] struct FetchArgs { /// API endpoint URL #[arg(short, long)] url: String, /// Output file path #[arg(short, long)] output: Option<std::path::PathBuf>, /// Request timeout in seconds #[arg(short, long, default_value = "30")] timeout: u64, /// Number of retry attempts #[arg(long, default_value = "3")] retries: u32, } #[derive(Args)] struct ProcessArgs { /// Input files or directories #[arg(required = true)] inputs: Vec<std::path::PathBuf>, /// Output directory #[arg(short, long, default_value = ".")] output_dir: std::path::PathBuf, /// Process files in parallel #[arg(short, long)] parallel: bool, /// Number of worker threads #[arg(short = 'j', long, default_value = "4")] jobs: usize, } #[derive(Args)] struct ConfigArgs { /// Show current configuration #[arg(long)] show: bool, /// Initialize configuration file #[arg(long)] init: bool, } #[tokio::main] async fn main() -> Result<()> { let cli = Cli::parse(); // ログの初期化 init_logging(cli.verbose)?; // 設定の読み込み let config = load_config(cli.config.as_deref())?; match cli.command { Commands::Fetch(args) => cmd_fetch(args, &config).await, Commands::Process(args) => cmd_process(args, &config).await, Commands::Config(args) => cmd_config(args, &config), } } ログ出力の設定 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 // src/logging.rs use tracing_subscriber::{fmt, prelude::*, EnvFilter}; use anyhow::Result; pub fn init_logging(verbose: bool) -> Result<()> { let filter = if verbose { EnvFilter::new("debug") } else { EnvFilter::try_from_default_env() .unwrap_or_else(|_| EnvFilter::new("info")) }; tracing_subscriber::registry() .with(filter) .with(fmt::layer().with_target(false).with_thread_ids(false)) .init(); Ok(()) } 設定ファイルの扱い 設定構造体の定義 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 // src/config.rs use serde::{Deserialize, Serialize}; use std::path::PathBuf; use anyhow::{Context, Result}; #[derive(Debug, Deserialize, Serialize, Default)] pub struct AppConfig { #[serde(default)] pub api: ApiConfig, #[serde(default)] pub processing: ProcessingConfig, #[serde(default)] pub output: OutputConfig, } #[derive(Debug, Deserialize, Serialize)] pub struct ApiConfig { pub base_url: String, pub api_key: Option<String>, pub timeout_seconds: u64, pub max_retries: u32, } impl Default for ApiConfig { fn default() -> Self { Self { base_url: "https://api.example.com".to_string(), api_key: None, timeout_seconds: 30, max_retries: 3, } } } #[derive(Debug, Deserialize, Serialize)] pub struct ProcessingConfig { pub parallel: bool, pub max_workers: usize, pub chunk_size: usize, } impl Default for ProcessingConfig { fn default() -> Self { Self { parallel: true, max_workers: num_cpus::get(), chunk_size: 1024 * 1024, // 1MB } } } #[derive(Debug, Deserialize, Serialize)] pub struct OutputConfig { pub format: OutputFormat, pub color: bool, pub quiet: bool, } #[derive(Debug, Deserialize, Serialize, Default)] #[serde(rename_all = "lowercase")] pub enum OutputFormat { #[default] Text, Json, Csv, } impl Default for OutputConfig { fn default() -> Self { Self { format: OutputFormat::Text, color: true, quiet: false, } } } pub fn load_config(path: Option<&std::path::Path>) -> Result<AppConfig> { let builder = config::Config::builder(); // デフォルト設定 let builder = builder.add_source(config::File::from_str( include_str!("../config/default.toml"), config::FileFormat::Toml, )); // ユーザー設定ファイル let config_path = path .map(PathBuf::from) .or_else(|| { dirs::config_dir().map(|p| p.join("mytools").join("config.toml")) }); let builder = if let Some(ref path) = config_path { if path.exists() { builder.add_source(config::File::from(path.as_path())) } else { builder } } else { builder }; // 環境変数からの上書き let builder = builder.add_source( config::Environment::with_prefix("MYTOOLS") .separator("__") .try_parsing(true), ); let config = builder .build() .context("Failed to build configuration")? .try_deserialize() .context("Failed to deserialize configuration")?; Ok(config) } エラーハンドリング カスタムエラー型の定義 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 // src/error.rs use thiserror::Error; #[derive(Error, Debug)] pub enum AppError { #[error("API error: {message} (status: {status})")] Api { status: u16, message: String, }, #[error("File not found: {path}")] FileNotFound { path: std::path::PathBuf, }, #[error("Invalid input: {0}")] InvalidInput(String), #[error("Configuration error: {0}")] Config(String), #[error("Network error: {0}")] Network(#[from] reqwest::Error), #[error("IO error: {0}")] Io(#[from] std::io::Error), #[error("JSON error: {0}")] Json(#[from] serde_json::Error), } pub type AppResult<T> = Result<T, AppError>; エラーハンドリングの実践 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 // src/commands/fetch.rs use crate::config::AppConfig; use crate::error::{AppError, AppResult}; use anyhow::{Context, Result}; use indicatif::{ProgressBar, ProgressStyle}; use std::time::Duration; use tracing::{debug, info, warn}; pub async fn cmd_fetch(args: FetchArgs, config: &AppConfig) -> Result<()> { info!("Fetching data from: {}", args.url); let client = reqwest::Client::builder() .timeout(Duration::from_secs(args.timeout)) .build() .context("Failed to create HTTP client")?; let mut last_error = None; for attempt in 1..=args.retries { debug!("Attempt {}/{}", attempt, args.retries); match fetch_with_progress(&client, &args.url).await { Ok(data) => { // 出力処理 if let Some(ref output_path) = args.output { std::fs::write(output_path, &data) .with_context(|| format!("Failed to write to {:?}", output_path))?; info!("Data saved to {:?}", output_path); } else { println!("{}", data); } return Ok(()); } Err(e) => { warn!("Attempt {} failed: {}", attempt, e); last_error = Some(e); if attempt < args.retries { let delay = Duration::from_secs(2u64.pow(attempt)); debug!("Retrying in {:?}...", delay); tokio::time::sleep(delay).await; } } } } Err(last_error.unwrap().into()) } async fn fetch_with_progress(client: &reqwest::Client, url: &str) -> AppResult<String> { let response = client.get(url).send().await?; if !response.status().is_success() { return Err(AppError::Api { status: response.status().as_u16(), message: response.text().await.unwrap_or_default(), }); } let total_size = response.content_length().unwrap_or(0); let pb = ProgressBar::new(total_size); pb.set_style( ProgressStyle::default_bar() .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})")? .progress_chars("#>-"), ); let mut downloaded = 0u64; let mut content = Vec::new(); let mut stream = response.bytes_stream(); use futures_util::StreamExt; while let Some(chunk) = stream.next().await { let chunk = chunk?; downloaded += chunk.len() as u64; pb.set_position(downloaded); content.extend_from_slice(&chunk); } pb.finish_with_message("Download complete"); String::from_utf8(content) .map_err(|e| AppError::InvalidInput(format!("Invalid UTF-8: {}", e))) } ファイル処理コマンドの実装 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 // src/commands/process.rs use crate::config::AppConfig; use anyhow::{Context, Result}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use rayon::prelude::*; use std::path::PathBuf; use tracing::info; use walkdir::WalkDir; pub async fn cmd_process(args: ProcessArgs, config: &AppConfig) -> Result<()> { // 処理対象ファイルの収集 let files = collect_files(&args.inputs)?; info!("Found {} files to process", files.len()); if files.is_empty() { println!("No files to process"); return Ok(()); } // 出力ディレクトリの作成 std::fs::create_dir_all(&args.output_dir) .with_context(|| format!("Failed to create output directory: {:?}", args.output_dir))?; if args.parallel { process_parallel(&files, &args.output_dir, args.jobs)?; } else { process_sequential(&files, &args.output_dir)?; } Ok(()) } fn collect_files(inputs: &[PathBuf]) -> Result<Vec<PathBuf>> { let mut files = Vec::new(); for input in inputs { if input.is_file() { files.push(input.clone()); } else if input.is_dir() { for entry in WalkDir::new(input) .follow_links(true) .into_iter() .filter_map(|e| e.ok()) { if entry.file_type().is_file() { files.push(entry.into_path()); } } } } Ok(files) } fn process_parallel(files: &[PathBuf], output_dir: &PathBuf, jobs: usize) -> Result<()> { let pool = rayon::ThreadPoolBuilder::new() .num_threads(jobs) .build() .context("Failed to create thread pool")?; let mp = MultiProgress::new(); let pb = mp.add(ProgressBar::new(files.len() as u64)); pb.set_style( ProgressStyle::default_bar() .template("{spinner:.green} Processing [{bar:40}] {pos}/{len} ({percent}%)")? .progress_chars("=> "), ); let results: Vec<Result<(), anyhow::Error>> = pool.install(|| { files .par_iter() .map(|file| { let result = process_single_file(file, output_dir); pb.inc(1); result }) .collect() }); pb.finish_with_message("Processing complete"); // エラーの集計 let errors: Vec<_> = results.into_iter().filter_map(|r| r.err()).collect(); if !errors.is_empty() { eprintln!("Errors occurred during processing:"); for err in &errors { eprintln!(" - {}", err); } } Ok(()) } fn process_sequential(files: &[PathBuf], output_dir: &PathBuf) -> Result<()> { let pb = ProgressBar::new(files.len() as u64); pb.set_style( ProgressStyle::default_bar() .template("{spinner:.green} Processing [{bar:40}] {pos}/{len}")? .progress_chars("=> "), ); for file in files { process_single_file(file, output_dir)?; pb.inc(1); } pb.finish_with_message("Processing complete"); Ok(()) } fn process_single_file(input: &PathBuf, output_dir: &PathBuf) -> Result<()> { // ファイル処理のロジック let content = std::fs::read_to_string(input) .with_context(|| format!("Failed to read: {:?}", input))?; // 何らかの処理(例:行数カウント、変換など) let processed = content.lines().count(); let output_name = input .file_name() .map(|n| n.to_string_lossy().to_string()) .unwrap_or_else(|| "output".to_string()); let output_path = output_dir.join(format!("{}.processed", output_name)); std::fs::write(&output_path, format!("Lines: {}", processed)) .with_context(|| format!("Failed to write: {:?}", output_path))?; Ok(()) } テストの実装 単体テスト 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 // src/lib.rs #[cfg(test)] mod tests { use super::*; #[test] fn test_config_default() { let config = AppConfig::default(); assert_eq!(config.api.timeout_seconds, 30); assert!(config.processing.parallel); } #[test] fn test_collect_files_single() { let temp = tempfile::tempdir().unwrap(); let file_path = temp.path().join("test.txt"); std::fs::write(&file_path, "content").unwrap(); let files = collect_files(&[file_path.clone()]).unwrap(); assert_eq!(files.len(), 1); assert_eq!(files[0], file_path); } } 統合テスト(CLIテスト) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 // tests/cli.rs use assert_cmd::Command; use predicates::prelude::*; use tempfile::tempdir; #[test] fn test_help() { let mut cmd = Command::cargo_bin("mytools").unwrap(); cmd.arg("--help") .assert() .success() .stdout(predicate::str::contains("A collection of useful CLI tools")); } #[test] fn test_version() { let mut cmd = Command::cargo_bin("mytools").unwrap(); cmd.arg("--version") .assert() .success() .stdout(predicate::str::contains(env!("CARGO_PKG_VERSION"))); } #[test] fn test_process_command() { let input_dir = tempdir().unwrap(); let output_dir = tempdir().unwrap(); // テストファイルの作成 std::fs::write(input_dir.path().join("test1.txt"), "line1\nline2\nline3").unwrap(); std::fs::write(input_dir.path().join("test2.txt"), "single line").unwrap(); let mut cmd = Command::cargo_bin("mytools").unwrap(); cmd.args([ "process", input_dir.path().to_str().unwrap(), "-o", output_dir.path().to_str().unwrap(), ]) .assert() .success(); // 出力ファイルの確認 assert!(output_dir.path().join("test1.txt.processed").exists()); assert!(output_dir.path().join("test2.txt.processed").exists()); } #[test] fn test_fetch_invalid_url() { let mut cmd = Command::cargo_bin("mytools").unwrap(); cmd.args(["fetch", "--url", "invalid-url"]) .assert() .failure(); } クロスプラットフォームビルド GitHub Actionsでの自動ビルド 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 # .github/workflows/release.yml name: Release on: push: tags: - 'v*' permissions: contents: write jobs: build: strategy: matrix: include: - target: x86_64-unknown-linux-gnu os: ubuntu-latest name: linux-x64 - target: x86_64-unknown-linux-musl os: ubuntu-latest name: linux-x64-musl - target: aarch64-unknown-linux-gnu os: ubuntu-latest name: linux-arm64 - target: x86_64-apple-darwin os: macos-latest name: macos-x64 - target: aarch64-apple-darwin os: macos-latest name: macos-arm64 - target: x86_64-pc-windows-msvc os: windows-latest name: windows-x64 runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v4 - name: Install Rust uses: dtolnay/rust-action@stable with: targets: ${{ matrix.target }} - name: Install cross-compilation tools if: matrix.target == 'aarch64-unknown-linux-gnu' run: | sudo apt-get update sudo apt-get install -y gcc-aarch64-linux-gnu - name: Install musl tools if: matrix.target == 'x86_64-unknown-linux-musl' run: | sudo apt-get update sudo apt-get install -y musl-tools - name: Build run: cargo build --release --target ${{ matrix.target }} - name: Package (Unix) if: runner.os != 'Windows' run: | cd target/${{ matrix.target }}/release tar czf ../../../mytools-${{ matrix.name }}.tar.gz mytools cd ../../.. - name: Package (Windows) if: runner.os == 'Windows' run: | cd target/${{ matrix.target }}/release 7z a ../../../mytools-${{ matrix.name }}.zip mytools.exe cd ../../.. - name: Upload artifacts uses: actions/upload-artifact@v4 with: name: mytools-${{ matrix.name }} path: mytools-${{ matrix.name }}.* release: needs: build runs-on: ubuntu-latest steps: - name: Download all artifacts uses: actions/download-artifact@v4 - name: Create Release uses: softprops/action-gh-release@v1 with: files: | mytools-*/mytools-* draft: false prerelease: false ローカルでのクロスコンパイル 1 2 3 4 5 6 7 8 # クロスツールのインストール cargo install cross # Linux ARM64向けビルド cross build --release --target aarch64-unknown-linux-gnu # Windows向けビルド(Linux/macOSから) cross build --release --target x86_64-pc-windows-gnu まとめ RustでCLIツールを開発する際の重要なポイントをまとめます: ...

March 8, 2026 · 12 min · AI2CORE 編集部

Kubernetes HPA/VPA完全ガイド:本番環境で失敗しないオートスケーリング設計と実装

はじめに Kubernetesを本番環境で運用する上で、適切なリソース管理とスケーリングは避けて通れない課題です。過剰なリソース割り当てはコストの無駄になり、不足していればパフォーマンス低下やサービス障害につながります。 本記事では、Kubernetesの自動スケーリング機能であるHorizontal Pod Autoscaler(HPA)とVertical Pod Autoscaler(VPA)について、基礎概念から本番環境での実践的な運用ノウハウまでを詳しく解説します。 HPAとVPAの違い Horizontal Pod Autoscaler(HPA) HPAは、Pod数を水平方向にスケールさせる機能です。負荷が増加するとPod数を増やし、負荷が減少するとPod数を減らします。 適用シーン: ステートレスなWebアプリケーション APIサーバー ワーカープロセス Vertical Pod Autoscaler(VPA) VPAは、個々のPodのリソース要求(CPU/メモリ)を垂直方向に調整する機能です。実際の使用状況に基づいて、より適切なリソース割り当てを推奨・適用します。 適用シーン: ステートフルなアプリケーション データベースやキャッシュサーバー バッチ処理ジョブ HPA実践ガイド 基本的なHPA設定 まずはシンプルなCPUベースのHPAから始めましょう。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 # deployment.yaml apiVersion: apps/v1 kind: Deployment metadata: name: web-api namespace: production spec: replicas: 3 selector: matchLabels: app: web-api template: metadata: labels: app: web-api spec: containers: - name: web-api image: myregistry/web-api:v1.2.3 resources: requests: cpu: 200m memory: 256Mi limits: cpu: 1000m memory: 512Mi ports: - containerPort: 8080 readinessProbe: httpGet: path: /health port: 8080 initialDelaySeconds: 10 periodSeconds: 5 --- # hpa.yaml apiVersion: autoscaling/v2 kind: HorizontalPodAutoscaler metadata: name: web-api-hpa namespace: production spec: scaleTargetRef: apiVersion: apps/v1 kind: Deployment name: web-api minReplicas: 3 maxReplicas: 50 metrics: - type: Resource resource: name: cpu target: type: Utilization averageUtilization: 70 - type: Resource resource: name: memory target: type: Utilization averageUtilization: 80 behavior: scaleDown: stabilizationWindowSeconds: 300 policies: - type: Percent value: 10 periodSeconds: 60 scaleUp: stabilizationWindowSeconds: 0 policies: - type: Percent value: 100 periodSeconds: 15 - type: Pods value: 4 periodSeconds: 15 selectPolicy: Max カスタムメトリクスを使ったHPA CPUやメモリだけでなく、アプリケーション固有のメトリクスでスケーリングすることも可能です。 ...

March 8, 2026 · 6 min · AI2CORE 編集部

Python asyncio実践ガイド:並行処理で処理速度を10倍にする具体的テクニック

はじめに Pythonで大量のAPI呼び出しやファイル操作を行う際、処理時間がボトルネックになることは珍しくありません。同期的な処理では、一つの操作が完了するまで次の処理を開始できないため、I/O待ち時間が積み重なってしまいます。 本記事では、Pythonのasyncioモジュールを使った非同期プログラミングについて、基礎概念から実践的なパターンまでを体系的に解説します。実際のプロジェクトで使える具体的なコード例を通じて、処理速度を劇的に改善する方法を学んでいきましょう。 asyncioの基本概念 イベントループとは asyncioの中心にあるのがイベントループです。イベントループは、非同期タスクの実行を管理し、I/O操作の完了を監視して、適切なタイミングでタスクを再開させる役割を担います。 1 2 3 4 5 6 7 8 9 import asyncio async def main(): print("Hello") await asyncio.sleep(1) print("World") # Python 3.7以降の推奨方法 asyncio.run(main()) async/awaitの仕組み async defで定義された関数はコルーチン関数となり、呼び出すとコルーチンオブジェクトを返します。awaitキーワードを使うことで、そのコルーチンの完了を待機しつつ、その間に他のタスクを実行できるようになります。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import asyncio import aiohttp async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict: """URLからデータを非同期で取得""" async with session.get(url) as response: return { "url": url, "status": response.status, "content_length": len(await response.text()) } async def main(): urls = [ "https://api.github.com", "https://api.stripe.com", "https://api.openai.com" ] async with aiohttp.ClientSession() as session: # 全URLを並行して取得 tasks = [fetch_url(session, url) for url in urls] results = await asyncio.gather(*tasks) for result in results: print(f"{result['url']}: {result['status']}") asyncio.run(main()) 実践パターン1:大量のAPI呼び出しを高速化 問題:同期処理での遅延 例えば、1000件のユーザーデータをAPIから取得する場合を考えます。1リクエストあたり100msかかるとすると、同期処理では100秒もの時間が必要です。 ...

March 8, 2026 · 6 min · AI2CORE 編集部

GitHub Actionsでモノレポを安全に自動リリースする設計: 変更検知・段階配布・失敗復旧

GitHub Actionsでモノレポを安全に自動リリースする設計: 変更検知・段階配布・失敗復旧 モノレポのCI/CDは、単一リポジトリだから楽になる一方で、リリース設計を誤ると一気に難しくなります。 1つの変更で全サービスを再デプロイしてしまう 並列ジョブが増えてキュー渋滞する どのコミットがどのサービスへ反映されたか追跡できない 一部失敗時のロールバックが曖昧 本記事では、GitHub Actionsでモノレポを運用しているチーム向けに、実務で耐えるリリース自動化の構成を具体的に説明します。 1. モノレポCI/CDで先に決める設計原則 最初に次の原則を明文化します。 変更のないサービスはデプロイしない リリース対象は機械的に決定する 本番反映は段階的(canary/割合配布) 失敗時の復旧手順を自動化する 監査ログ(誰が何をいつ)を残す この5つがないと、運用が属人化し、障害時対応が遅れます。 2. 変更検知をワークフローの入口に置く モノレポでは「どのディレクトリが変わったか」を最初に判定し、対象サービスだけを処理します。 2.1 changed-filesの例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 jobs: detect: runs-on: ubuntu-latest outputs: matrix: ${{ steps.set-matrix.outputs.matrix }} steps: - uses: actions/checkout@v4 - id: changed uses: tj-actions/changed-files@v45 with: files_yaml: | api: - services/api/** web: - services/web/** worker: - services/worker/** - id: set-matrix run: | python .github/scripts/build_matrix.py '${{ toJson(steps.changed.outputs) }}' ここでmatrixを作り、後続ジョブを fromJson で動的展開します。 ...

March 7, 2026 · 2 min · AI2CORE 編集部

FastAPI + SQLAlchemy性能改善プレイブック: 遅いAPIを計測ベースで高速化する

FastAPI + SQLAlchemy性能改善プレイブック: 遅いAPIを計測ベースで高速化する FastAPIの初期実装は非常に快適です。しかし運用フェーズに入ると、次のような症状が出てきます。 一覧APIのレスポンスが急に遅くなる 同時接続が増えるとp95が跳ねる CPUは余っているのにタイムアウトが増える DB接続数が上限に張り付く こうした問題の多くは「Pythonが遅い」のではなく、SQLAlchemyの使い方とDBアクセス設計 に起因します。 本記事では、FastAPI + SQLAlchemy + PostgreSQL構成を前提に、実際の改善手順を計測ベースで整理します。 1. 最初に測るべき指標 最適化は、体感ではなく数値で進めます。最低限、以下を可視化します。 APIのp50/p95/p99レイテンシ エンドポイント別SQL発行回数 1リクエストあたりのDB滞在時間 connection pool待ち時間 slow query件数(200ms以上など) OpenTelemetryやNew Relicを使っているなら、アプリspanとDB spanを必ず紐付けてください。これだけでボトルネック特定速度が上がります。 2. N+1問題を最優先で潰す 最も頻出するのがN+1です。例えばユーザー一覧でプロフィールを参照すると、ユーザー数分の追加クエリが発行されます。 2.1 悪い例 1 2 3 4 5 6 7 8 users = session.query(User).limit(100).all() result = [] for u in users: result.append({ "id": u.id, "name": u.name, "profile": u.profile.bio, }) 2.2 改善例(joinedload/selectinload) 1 2 3 4 5 6 7 8 from sqlalchemy.orm import selectinload users = ( session.query(User) .options(selectinload(User.profile)) .limit(100) .all() ) joinedload と selectinload はデータ量で使い分けます。 ...

March 7, 2026 · 2 min · AI2CORE 編集部

Terraform→OpenTofu移行実践ガイド: 既存IaCを止めずに移行するエンタープライズ手順

Terraform→OpenTofu移行実践ガイド: 既存IaCを止めずに移行するエンタープライズ手順 Terraformのライセンス変更以降、OpenTofuへ移行したいという相談は確実に増えています。とはいえ現場の本音は「理屈は分かるが、stateが壊れたら終わる」「本番を止めずに移行できるのか不安」です。 結論から言うと、移行は十分可能です。ただし「CLIを置き換えるだけ」で済むケースは限定的で、実際は providerバージョン整合・state lock・CI/CD・運用Runbook までまとめて整える必要があります。 本記事では、既にTerraformを本番運用しているチーム向けに、OpenTofuへ段階移行する実践手順をまとめます。 1. 移行方針を先に決める 最初に決めるべきは「一気に切り替えるか」「ワークスペース単位で段階移行するか」です。実務では次の方針が安全です。 低リスク環境(dev/sandbox)から先行 本番は最終フェーズで移行 旧TerraformとOpenTofuを一定期間並行運用 ロールバック手順を文書化してから実施 この順序を守るだけで、移行事故の大半を避けられます。 2. 互換性の棚卸し(最重要) まずは現状のIaC資産を棚卸しします。 Terraformバージョン(例: 1.5.x / 1.6.x) 使用provider(AWS/Azure/GCP/Kubernetes等) backend(S3 + DynamoDB lock、Terraform Cloud、GCSなど) moduleの参照方式(registry / git / local) CI実行環境(GitHub Actions, GitLab CI, Jenkins) 2.1 依存を固定化してから移行する .terraform.lock.hcl を必ずコミットし、providerを固定します。移行時にproviderまで同時更新すると、差分原因の切り分けが困難になります。 1 2 3 4 5 6 7 8 9 terraform { required_version = ">= 1.6.0" required_providers { aws = { source = "hashicorp/aws" version = "~> 5.40" } } } 移行フェーズでは「ツール差分」と「provider差分」を分離してください。 ...

March 7, 2026 · 2 min · AI2CORE 編集部

Kyvernoで始めるKubernetes Admission Policy実践: 事故を減らすポリシー設計プレイブック

Kyvernoで始めるKubernetes Admission Policy実践: 事故を減らすポリシー設計プレイブック Kubernetes運用で一番つらい事故は、クラスタが壊れるよりも「本来防げたはずのミスがそのまま本番へ入る」ことです。たとえば、latest タグのイメージが本番に入り再現不能になる、resources 未設定でノードが詰まる、privileged コンテナが混入する。これらは人の注意力だけに依存すると必ず再発します。 そこで有効なのが Admission Policy(入場制御)です。本記事では Kyverno を使って、現場で本当に運用できるポリシー群を段階導入する手順をまとめます。単なる「denyの例」ではなく、監査→警告→強制の移行、例外管理、CI連携まで含めて解説します。 1. なぜKyvernoなのか OPA Gatekeeper も強力ですが、Kyvernoは以下の特徴があり、初期導入が比較的スムーズです。 YAML中心で書ける(Rego学習コストを後回しにしやすい) validate / mutate / generate / verifyImages を一貫して扱える PolicyReportにより違反可視化がしやすい Pod SecurityやSupply Chain対策との相性が良い 「まずルールを回し始める」目的なら、Kyvernoは現実的な選択肢です。 2. 先に決めるべき設計原則 導入前に、以下だけは先に決めておきます。 導入フェーズ: Audit → Enforce を基本にする 責任分界: プラットフォームチームが共通ポリシー、各チームがアプリ固有例外 例外の期限: 永久例外は禁止。期限付きで必ず棚卸し 観測性: 違反数・対象Namespace・上位違反ルールをダッシュボード化 この原則なしにルールだけ増やすと、運用が破綻します。 3. 最小導入手順(30〜60分) 3.1 Kyvernoのインストール 1 2 3 4 5 6 7 8 9 helm repo add kyverno https://kyverno.github.io/kyverno/ helm repo update helm upgrade --install kyverno kyverno/kyverno \ -n kyverno --create-namespace \ --set admissionController.replicas=2 \ --set backgroundController.replicas=2 \ --set cleanupController.replicas=1 \ --set reportsController.replicas=1 本番では可用性のため、admission/backgroundは最低2レプリカ推奨です。 ...

March 6, 2026 · 3 min · AI2CORE 編集部

FastAPI + Celery信頼性設計: 非同期ジョブを本番で壊さないための実装パターン

FastAPI + Celery信頼性設計: 非同期ジョブを本番で壊さないための実装パターン FastAPIでAPIを作ると、重い処理はすぐに非同期ジョブへ逃がしたくなります。画像変換、レポート生成、外部API連携、メール配信など、Celeryは非常に便利です。ですが、本番で問題になるのは「動くかどうか」ではなく、失敗したときに壊れないか です。 同じジョブが二重実行される 一時障害で永遠にリトライしてキューが詰まる ワーカー再起動で中途半端な状態が残る 完了通知が先に飛んで実データがない 本記事では FastAPI + Celery + Redis 構成を前提に、再実行安全性(idempotency)と運用信頼性を上げる実装手順をまとめます。 1. まず守るべき設計原則 非同期基盤の事故は、ほぼ次の4原則で防げます。 At-least-once前提(同一タスク再実行は必ず起こる) 副作用は冪等化(何回実行されても結果が壊れない) 状態遷移を明示(PENDING/RUNNING/SUCCEEDED/FAILED) 失敗を可観測化(リトライ回数・死活・滞留時間を計測) この原則を外すと、障害時に「何が完了して何が未完了か」が追えなくなります。 2. 参照アーキテクチャ API: FastAPI Queue Broker: Redis Worker: Celery Result Store: PostgreSQL(業務状態) Monitoring: Flower + Prometheus + Sentry ポイントは、業務上重要な状態はRedis結果バックエンドに依存しない ことです。Redisは一時的に使い、真実の状態はRDBに持たせます。 3. 実装の土台: タスク受付API 3.1 受け付け時に idempotency_key を必須化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 from fastapi import FastAPI, HTTPException from pydantic import BaseModel from sqlalchemy import select app = FastAPI() class JobRequest(BaseModel): idempotency_key: str report_type: str user_id: str @app.post("/reports") def create_report(req: JobRequest): existing = find_job_by_key(req.idempotency_key) if existing: return {"job_id": existing.id, "status": existing.status} job = create_job_record( idempotency_key=req.idempotency_key, status="PENDING", report_type=req.report_type, user_id=req.user_id, ) generate_report.delay(job.id) return {"job_id": job.id, "status": "PENDING"} これでクライアント再送が来てもジョブ多重作成を防げます。 ...

March 6, 2026 · 2 min · AI2CORE 編集部

PostgreSQL PITR復旧訓練ガイド: バックアップがあるのに戻せないを防ぐ実践手順

PostgreSQL PITR復旧訓練ガイド: バックアップがあるのに戻せないを防ぐ実践手順 PostgreSQL運用で最も危険なのは「バックアップがある」という安心感です。実際の障害では、バックアップ自体より 復旧手順の不整合 で時間を失います。たとえば、WAL保管期間が足りず目標時刻に戻せない、暗号鍵が見つからず復号できない、復旧後の整合性確認が曖昧で再開判断ができない、といった問題です。 本記事では、PostgreSQLの Point-in-Time Recovery(PITR)を、机上ではなく本番レベルで回すための実装手順を解説します。pgBackRest を例にしていますが、考え方は他ツールでも共通です。 1. PITRの前提: 3つ揃わないと復旧できない PITRは次の3要素で成立します。 ベースバックアップ(フルまたは差分) WALアーカイブ(継続的) 目標時刻情報(いつまで戻すか) どれか1つでも欠けると成立しません。特に本番で多いのは「WALが途中で消えていた」ケースです。S3保存していても、ライフサイクル設定や権限変更で欠落することがあります。 2. まず決めるべきRTO/RPO 技術論の前に、業務要件を決めます。 RTO(復旧に許容される時間): 例 60分 RPO(失ってよいデータ時間): 例 5分 この2つで設計が変わります。 RPO 5分以内ならWALアーカイブ遅延監視が必須 RTO 60分以内なら復旧訓練を定期実施し、手順を自動化する必要あり 要件不明のまま「毎日バックアップ」だけ実施しても、障害時に役立たないことが多いです。 3. 推奨アーキテクチャ(単一リージョンの最小構成) DBサーバ: PostgreSQL 15/16 バックアップツール: pgBackRest 保存先: S3互換ストレージ(バージョニングON) 監視: Prometheus + Alertmanager 復旧先: 別ホスト(本番と同一ネットワーク) 重要なのは、本番DBと別ホストで実際に復旧できること を定期検証する点です。 4. 実装手順(pgBackRest) 4.1 PostgreSQL設定 postgresql.conf 例: wal_level = replica archive_mode = on archive_command = 'pgbackrest --stanza=main archive-push %p' max_wal_senders = 10 wal_compression = on archive_command は失敗時に非0を返す必要があります。ここが曖昧だとWAL欠落に気づけません。 ...

March 6, 2026 · 2 min · AI2CORE 編集部

Kubernetes環境でDBスキーマ変更を止めずに進める:ゼロダウンタイム移行の実践戦略

Kubernetes環境でDBスキーマ変更を止めずに進める:ゼロダウンタイム移行の実践戦略 「カラムを追加するだけだから大丈夫」──この油断が、本番障害の入口になります。Kubernetes のように複数バージョンの Pod が同時に存在する環境では、DB スキーマ変更はアプリ変更よりも慎重に扱う必要があります。 本記事では、Expand-Contract パターンを中心に、ゼロダウンタイムを目指すための具体手順を解説します。実際の運用では、DDLの速さより「互換性のある期間をどう作るか」が勝負です。 1. なぜKubernetesでDB移行が難しいのか Kubernetesでは、ローリングアップデート中に新旧Podが混在します。つまり次の状態が同時に発生します。 新アプリは新スキーマを期待 旧アプリは旧スキーマしか知らない DBは1つしかない このとき破綻するのが「破壊的変更を先に適用する」ケースです。たとえば旧カラムを即削除すると、旧Podがエラーを連発します。 2. 基本戦略:Expand → Migrate → Contract ゼロダウンタイム移行の原則はこの3段階です。 Expand: 互換性を壊さない変更を先に入れる(新カラム追加など) Migrate: アプリを段階的に切替え、データを移行する Contract: 旧仕様を最終削除する(十分な監視後) この順序なら、どの時点でも旧新どちらのアプリも動作可能にできます。 3. 具体例:users.full_name を first_name / last_name へ分割 3.1 Expand フェーズ まず破壊的でないDDLを適用します。 1 2 ALTER TABLE users ADD COLUMN first_name text; ALTER TABLE users ADD COLUMN last_name text; この時点で旧アプリは full_name を使い続けられます。新アプリは新カラムに対応した実装を持っていても、まだ必須にしません。 3.2 アプリを「両対応」にする 書き込み時は両方へ保存(dual write)し、読み込み時は新カラム優先 + 旧カラムフォールバックにします。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 def save_user_name(user_id: str, full_name: str): first, last = split_name(full_name) db.execute( """ UPDATE users SET full_name = %s, first_name = %s, last_name = %s WHERE id = %s """, (full_name, first, last, user_id), ) def read_user_display_name(row): if row.first_name and row.last_name: return f"{row.first_name} {row.last_name}" return row.full_name この両対応期間を作るのが、ゼロダウンタイムの本質です。 ...

March 5, 2026 · 2 min · AI2CORE 編集部