MLプロジェクトの本番化で最大の障壁は、モデルの精度ではなくパイプラインの再現性です。「このNotebookで動いていたはずなのに」と頭を抱えた経験はMLエンジニアなら誰もが持っています。本記事では、学習・推論・再学習の3本柱を独立した自動パイプラインとして組む、という設計原則を提示します。手作業の「ジュピターで学習」から抜け出し、日次・時次・イベント駆動で回るML基盤への移行ステップを解説します。

MLパイプラインとは

MLパイプラインは、データの取得・前処理・特徴量生成・学習・評価・デプロイ・推論までの一連のステップを自動化したワークフローです。単なるタスクスケジューラではなく、依存関係・再実行・ログ管理・監視を統合したアーキテクチャを指します。

【MLパイプラインの全体アーキテクチャ】

[データソース] --> [データ検証] --> [前処理] --> [特徴量生成]
                                                      |
                                                      v
                                                [特徴量ストア]
                                                      |
                                                      v
                                                 [学習ジョブ]
                                                      |
                                                      v
                                                  [評価ゲート] -- 不合格 --> [停止]
                                                      |
                                                     合格
                                                      v
                                                [モデルレジストリ] -- 承認 -->
                                                      |
                                                      v
                                                  [デプロイ]
                                                      |
         +--------------------------------------------+---------------+
         |                                                            |
         v                                                            v
   [バッチ推論]                                                 [リアルタイム推論API]
         |                                                            |
         v                                                            v
   [予測結果テーブル]                                          [応答JSON]
         |
         v
    [監視・ドリフト検知] -- 劣化検知 --> [再学習トリガー]

※ 評価ゲートと承認プロセスが「劣化モデルの本番流入」を防ぐ砦

学習パイプラインの設計

学習パイプラインは、データ取得から学習済みモデルの登録までを一連のDAGとして定義します。重要な設計原則は、(1)冪等性(同じ日に何回実行しても結果が同じ)、(2)再現性(コミットハッシュ付きでモデルを保存)、(3)評価ゲート(精度劣化時はデプロイを止める)の3点です。

# dags/training_pipeline.py
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
import mlflow

def extract(**ctx):
    df = spark.sql("SELECT * FROM fact_events WHERE ds = {{ ds }}")
    df.write.parquet(f"s3://ml/raw/{ctx['ds']}")

def train(**ctx):
    with mlflow.start_run():
        model = RandomForestClassifier(n_estimators=200).fit(X, y)
        mlflow.sklearn.log_model(model, "model", registered_model_name="churn")
        mlflow.log_metric("accuracy", model.score(X_val, y_val))

def evaluate(**ctx):
    acc = fetch_latest_metric("churn", "accuracy")
    prev = fetch_prod_metric("churn", "accuracy")
    if acc < prev * 0.98:
        raise ValueError(f"精度劣化: {acc} < {prev}")

with DAG("churn_training", schedule="@daily", start_date=datetime(2026, 1, 1)) as dag:
    t1 = PythonOperator(task_id="extract", python_callable=extract)
    t2 = PythonOperator(task_id="train", python_callable=train)
    t3 = PythonOperator(task_id="evaluate", python_callable=evaluate)
    t1 >> t2 >> t3

推論パイプラインの設計

推論パイプラインは、モデルを使って予測結果を生成する部分です。バッチ推論とリアルタイム推論の2パターンがあり、用途によって使い分けます。

パターンレイテンシスループットコスト適した場面
バッチ推論高(分〜時間)非常に高い日次集計、メール配信、レコメンド事前計算
マイクロバッチ推論中(数分)ニアリアルタイム分析
リアルタイム推論API低(数十〜数百ms)オンライン広告、不正検知
ストリーミング推論IoT、ログ異常検知

FastAPIでのリアルタイム推論APIの最小例です。

# serving/api.py
from fastapi import FastAPI
from pydantic import BaseModel
import mlflow.pyfunc

app = FastAPI()
model = mlflow.pyfunc.load_model("models:/churn/Production")

class PredictionRequest(BaseModel):
    customer_id: str
    avg_purchase_30d: float
    session_count_7d: int

@app.post("/predict")
def predict(req: PredictionRequest):
    features = {
        "avg_purchase_30d": req.avg_purchase_30d,
        "session_count_7d": req.session_count_7d,
    }
    proba = model.predict([features])[0]
    return {"customer_id": req.customer_id, "churn_probability": float(proba)}

再学習パイプラインの設計

再学習は「定期再学習」と「トリガー再学習」の2つに大別できます。どちらが適切かは、データの変化速度とインフラコストのトレードオフで決まります。

トリガー判定メリットデメリット適した場面
スケジュール(日次・週次)定時運用がシンプル不要な再学習でコスト増データ変化が予測可能
ドリフト検知統計量の変化必要な時だけ学習ドリフト検知器の実装が必要データ変化が不規則
性能劣化オンライン評価指標実害が出てから対応検知が遅れがちフィードバックが早い環境
データ量閾値新規データ量データ効率が良い精度劣化と必ずしも一致しないストリーミング系
イベント駆動外部イベント業務変化に追従設計が複雑季節性・キャンペーン連動

実運用では、ベースを「スケジュール(週次)」にして、ドリフト検知で緊急再学習を発動する、というハイブリッドが最も安定します。

オーケストレーションツールの比較

MLパイプラインのオーケストレーションツールは、汎用ワークフロー系とML特化系に分かれます。主要な選択肢を比較します。

  • Airflow――もっとも成熟した汎用ワークフロー。プラグイン豊富、既存DAGの移植性高。MLだけでなくETLも同じ基盤で動かせる
  • Dagster――データアセット指向、型付きパイプライン、ローカル開発体験が良い。モダンスタックとの親和性が高い
  • Prefect――Python-firstな設計、動的DAG、シンプルなAPI。小規模スタート向き
  • Kubeflow Pipelines――K8sネイティブ、ML特化、コンテナ単位のステップ定義。GCP系と統合が強い
  • Metaflow――Netflix製、Python DSLが直感的、クラウドネイティブ実行

新規プロジェクトなら、既存のデータ基盤がAirflowベースならAirflowを継続、モダンスタック寄りならDagster、K8s中心の大規模MLならKubeflowが無難な選択です。

まとめ

MLパイプラインは「Notebookで学習していた時代」から「自動パイプラインが24時間回る時代」への橋渡しです。学習・推論・再学習の3本柱を独立して設計し、評価ゲートと監視を組み込むことで、モデルの品質と運用の安定性が両立できます。関連記事としてMLOpsとはモデルレジストリ特徴量ストアA/Bテスト設計もあわせてご参照ください。

よくある質問(FAQ)

Q1. MLパイプラインとは?

A. データの前処理、モデル学習、評価、デプロイまでの一連のワークフローを自動化する仕組みです。単なるスクリプトの実行順制御ではなく、依存関係・再実行・監視・評価ゲートを組み込んだアーキテクチャ全体を指します。再現性と運用安定性を担保する中核的な装置です。

Q2. 再学習はどのくらいの頻度で行うべきですか?

A. データの変化速度に依存します。一般的には日次〜月次ですが、モデルドリフト検知をトリガーにする方が効率的です。頻繁すぎる再学習はコスト増とモデルの不安定化を招くため、ベースラインは週次、異常検知で緊急再学習、というハイブリッドが実務的です。

Q3. バッチ推論とリアルタイム推論の使い分けは?

A. レイテンシ要件が秒単位ならリアルタイム、分〜時間単位で十分ならバッチ推論を選択します。バッチの方がコスト効率が高く、リアルタイム推論は運用負荷も大きいため、本当にオンラインが必要かは慎重に判断してください。多くの業務ユースケースはバッチで足ります。