🏗️ Project ⏱️ 360 minutes

Capstone: End-to-End MLOps Pipeline

Build a complete ML pipeline from training to monitored production deployment

What You Will Build

This capstone connects every lesson in the course into a single working system. By the end, you will have:

  • A customer churn prediction model trained with full experiment tracking
  • A GitHub Actions CI pipeline that trains, evaluates, and registers the model automatically
  • A Dockerized FastAPI serving API
  • Drift monitoring that runs weekly and alerts on changes
  • A model registry with stage transitions and rollback capability

The complete project is available as a reference at each step. Read the code carefully — every line exists for a reason explained in the previous lessons.


Project Structure

Start with this layout. Create the directories and files as we go.

churn-mlops/
├── .github/
│   └── workflows/
│       ├── train.yml               # CI on every push
│       └── scheduled-retrain.yml   # Weekly retraining
├── .dvc/                           # DVC config (created by dvc init)
├── configs/
│   └── params.yaml                 # Hyperparameters
├── data/
│   ├── raw/                        # Original data (tracked by DVC)
│   └── processed/                  # Feature-engineered data
├── kubernetes/
│   └── deployment.yaml             # K8s manifests
├── metrics/                        # Evaluation metrics (written by evaluate.py)
├── models/                         # Trained model artifacts
├── monitoring/
│   ├── drift_check.py
│   └── reference_data.parquet      # Saved at deployment time
├── notebooks/
│   └── exploration.ipynb           # EDA only
├── src/
│   ├── api/
│   │   └── app.py                  # FastAPI serving
│   ├── data/
│   │   ├── generate_dataset.py     # Creates synthetic churn data
│   │   ├── preprocess.py           # Feature engineering
│   │   └── validate.py             # Data quality checks
│   └── models/
│       ├── train.py                # Training + MLflow logging
│       └── evaluate.py             # Evaluation + metrics output
├── tests/
│   ├── test_preprocess.py
│   └── test_api.py
├── ci/
│   ├── check_quality_gates.py
│   └── register_model.py
├── dvc.yaml                        # Pipeline definition
├── Dockerfile
├── .dockerignore
├── requirements.txt
└── README.md

Step 1: Initialize the Project with DVC

# Create and initialize the project
mkdir churn-mlops && cd churn-mlops
git init
dvc init

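# Configure DVC remote storage (a local directory here, for development).
# The CI workflow in Step 5 expects a remote named "myremote" that the runner
# can reach, such as an S3 bucket.
dvc remote add -d myremote /tmp/dvc-cache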

# Create initial commit
git add .dvc/ .dvcignore
git commit -m "Initialize project with DVC"

configs/params.yaml:

# configs/params.yaml
learning_rate: 0.05
n_estimators: 200
max_depth: 5
test_size: 0.2
random_seed: 42
min_accuracy: 0.80
min_auc: 0.85

Step 2: Generate and Version the Dataset

# src/data/generate_dataset.py
"""Generate synthetic customer churn data."""
import numpy as np
import pandas as pd
from pathlib import Path

def generate_churn_data(n_samples: int = 10000, seed: int = 42) -> pd.DataFrame:
    np.random.seed(seed)

    tenure = np.random.randint(1, 72, n_samples)
    monthly_charges = np.random.uniform(20, 100, n_samples)
    num_products = np.random.randint(1, 6, n_samples)
    has_support_calls = np.random.randint(0, 2, n_samples)

    total_charges = tenure * monthly_charges + np.random.normal(0, 50, n_samples)
    total_charges = np.clip(total_charges, 0, None)

    # Churn probability model
    churn_logit = (
        -2.5
        - 0.04 * tenure
        + 0.02 * monthly_charges
        - 0.3 * num_products
        + 0.8 * has_support_calls
        + np.random.normal(0, 0.5, n_samples)
    )
    churn_prob = 1 / (1 + np.exp(-churn_logit))
    churn = (churn_prob > 0.5).astype(int)

    return pd.DataFrame({
        "customer_id": range(1, n_samples + 1),
        "tenure_months": tenure,
        "monthly_charges": monthly_charges.round(2),
        "total_charges": total_charges.round(2),
        "num_products": num_products,
        "has_support_calls": has_support_calls,
        "churn": churn,
    })

if __name__ == "__main__":
    Path("data/raw").mkdir(parents=True, exist_ok=True)
    df = generate_churn_data()
    df.to_csv("data/raw/customers.csv", index=False)
    print(f"Generated {len(df)} rows. Churn rate: {df['churn'].mean():.1%}")
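
To get a feel for the churn probability model, the sketch below evaluates the same logit for a single customer with the noise term set to zero. The helper name `churn_probability` and the example customer are illustrative and not part of the project files.

```python
import math

def churn_probability(tenure, monthly_charges, num_products,
                      has_support_calls, noise=0.0):
    """Sigmoid of the churn logit, using the coefficients from generate_churn_data."""
    logit = (
        -2.5
        - 0.04 * tenure
        + 0.02 * monthly_charges
        - 0.3 * num_products
        + 0.8 * has_support_calls
        + noise
    )
    return 1 / (1 + math.exp(-logit))

# A short-tenure, high-charge customer who has called support
p = churn_probability(tenure=6, monthly_charges=79.99,
                      num_products=1, has_support_calls=1)
print(f"churn probability: {p:.3f}")  # churn probability: 0.345
```

Because the label is `churn_prob > 0.5`, which is the same as a logit above zero, this customer is labeled as churned only when the noise term exceeds about 0.64.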

Generate the data, then track it with DVC:

python src/data/generate_dataset.py
dvc add data/raw/customers.csv
git add data/raw/customers.csv.dvc data/raw/.gitignore
git commit -m "Add initial customer dataset (10k rows)"
dvc push
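
`src/data/validate.py` appears in the project layout but is not shown in this lesson. As a minimal sketch, assuming the checks are a required-column test, a duplicate-ID test, and range checks derived from `generate_dataset.py`, it could look like the following. `RANGES` and `validate_rows` are hypothetical names, and rows are plain dictionaries (as produced by the standard-library `csv.DictReader`) to keep the example dependency-free.

```python
# Allowed (min, max) per column, taken from the ranges used in generate_dataset.py
RANGES = {
    "tenure_months": (1, 71),
    "monthly_charges": (20.0, 100.0),
    "total_charges": (0.0, float("inf")),
    "num_products": (1, 5),
    "has_support_calls": (0, 1),
    "churn": (0, 1),
}
REQUIRED = ["customer_id", *RANGES]

def validate_rows(rows):
    """Return a list of human-readable problems; an empty list means the data passed."""
    problems = []
    seen_ids = set()
    for i, row in enumerate(rows, start=1):
        missing = [c for c in REQUIRED if row.get(c) in (None, "")]
        if missing:
            problems.append(f"row {i}: missing {missing}")
            continue
        if row["customer_id"] in seen_ids:
            problems.append(f"row {i}: duplicate customer_id {row['customer_id']}")
        seen_ids.add(row["customer_id"])
        for col, (lo, hi) in RANGES.items():
            try:
                value = float(row[col])
            except ValueError:
                problems.append(f"row {i}: {col}={row[col]!r} is not numeric")
                continue
            if not lo <= value <= hi:
                problems.append(f"row {i}: {col}={value} outside [{lo}, {hi}]")
    return problems

# Two in-memory rows standing in for data/raw/customers.csv
good = {"customer_id": "1", "tenure_months": "6", "monthly_charges": "79.99",
        "total_charges": "479.94", "num_products": "1",
        "has_support_calls": "1", "churn": "0"}
bad = {**good, "customer_id": "2", "monthly_charges": "250.0"}

problems = validate_rows([good, bad])
print(problems)  # ['row 2: monthly_charges=250.0 outside [20.0, 100.0]']
```

In the real script, the rows would come from `csv.DictReader` (or a pandas DataFrame) over `data/raw/customers.csv`, and a non-empty problem list would end the process with a non-zero exit code so that the CI step fails.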

Step 3: Preprocessing and Training

# src/data/preprocess.py
import pandas as pd
import yaml
from pathlib import Path
from sklearn.model_selection import train_test_split

def preprocess(params: dict) -> None:
    df = pd.read_csv("data/raw/customers.csv")

    # Features
    feature_cols = [
        "tenure_months", "monthly_charges", "total_charges",
        "num_products", "has_support_calls"
    ]
    X = df[feature_cols]
    y = df["churn"]

    X_train, X_test, y_train, y_test = train_test_split(
        X, y,
        test_size=params["test_size"],
        random_state=params["random_seed"],
        stratify=y,
    )

    Path("data/processed").mkdir(parents=True, exist_ok=True)
    train = X_train.copy()
    train["churn"] = y_train
    test = X_test.copy()
    test["churn"] = y_test

    train.to_csv("data/processed/train.csv", index=False)
    test.to_csv("data/processed/test.csv", index=False)
    print(f"Train: {len(train)} | Test: {len(test)}")

if __name__ == "__main__":
    with open("configs/params.yaml") as f:
        params = yaml.safe_load(f)
    preprocess(params)

# src/models/train.py
import os
import pickle
from pathlib import Path

import mlflow
import mlflow.sklearn
import pandas as pd
import yaml
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import accuracy_score, roc_auc_score

def train() -> None:
    with open("configs/params.yaml") as f:
        params = yaml.safe_load(f)

    train_df = pd.read_csv("data/processed/train.csv")
    X_train = train_df.drop("churn", axis=1)
    y_train = train_df["churn"]

    mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI", "http://localhost:5000"))
    mlflow.set_experiment("churn-prediction")

    with mlflow.start_run() as run:
        mlflow.log_params(params)
        mlflow.set_tag("git_sha", os.getenv("GITHUB_SHA", "local"))

        model = GradientBoostingClassifier(
            learning_rate=params["learning_rate"],
            n_estimators=params["n_estimators"],
            max_depth=params["max_depth"],
            random_state=params["random_seed"],
        )
        model.fit(X_train, y_train)

        y_pred = model.predict(X_train)
        y_prob = model.predict_proba(X_train)[:, 1]
        train_metrics = {
            "train_accuracy": accuracy_score(y_train, y_pred),
            "train_auc": roc_auc_score(y_train, y_prob),
        }
        mlflow.log_metrics(train_metrics)

        Path("models").mkdir(exist_ok=True)
        with open("models/churn_model.pkl", "wb") as f:
            pickle.dump(model, f)
        mlflow.sklearn.log_model(model, "model")

        # Save run ID for CI to use
        Path("models/run_id.txt").write_text(run.info.run_id)
        print(f"Run ID: {run.info.run_id}")
        print(f"Train metrics: {train_metrics}")

if __name__ == "__main__":
    train()

# src/models/evaluate.py
import json, pickle
import pandas as pd
from pathlib import Path
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, classification_report

def evaluate() -> None:
    test_df = pd.read_csv("data/processed/test.csv")
    X_test = test_df.drop("churn", axis=1)
    y_test = test_df["churn"]

    with open("models/churn_model.pkl", "rb") as f:
        model = pickle.load(f)

    y_pred = model.predict(X_test)
    y_prob = model.predict_proba(X_test)[:, 1]

    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_score": f1_score(y_test, y_pred),
        "roc_auc": roc_auc_score(y_test, y_prob),
    }

    Path("metrics").mkdir(exist_ok=True)
    with open("metrics/eval_metrics.json", "w") as f:
        json.dump(metrics, f, indent=2)

    print(f"Eval metrics: {metrics}")
    print(classification_report(y_test, y_pred))

if __name__ == "__main__":
    evaluate()

Step 4: Define the DVC Pipeline

# dvc.yaml
stages:
  preprocess:
    cmd: python src/data/preprocess.py
    deps:
      - src/data/preprocess.py
      - data/raw/customers.csv
    params:
      - configs/params.yaml:
          - test_size
          - random_seed
    outs:
      - data/processed/train.csv
      - data/processed/test.csv

  train:
    cmd: python src/models/train.py
    deps:
      - src/models/train.py
      - data/processed/train.csv
    params:
      - configs/params.yaml:
          - learning_rate
          - n_estimators
          - max_depth
          - random_seed
    outs:
      - models/churn_model.pkl
      - models/run_id.txt

  evaluate:
    cmd: python src/models/evaluate.py
    deps:
      - src/models/evaluate.py
      - models/churn_model.pkl
      - data/processed/test.csv
    metrics:
      - metrics/eval_metrics.json:
          cache: false

Test the full pipeline locally:

dvc repro
dvc metrics show

Step 5: GitHub Actions CI Pipeline

# .github/workflows/train.yml
name: ML Training Pipeline

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]
  workflow_dispatch:

env:
  MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
  # GITHUB_SHA is set automatically on the runner (train.py reads it for the git_sha tag)

jobs:
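  train-and-deploy:
    runs-on: ubuntu-latest
    timeout-minutes: 60
    permissions:
      contents: read         # check out the repository
      packages: write        # push images to ghcr.io
      pull-requests: write   # post the metrics comment on PRs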

    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
          cache: "pip"

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Pull data from DVC
        run: |
          # --local keeps the credentials out of the tracked .dvc/config file
          dvc remote modify --local myremote access_key_id "$AWS_ACCESS_KEY_ID"
          dvc remote modify --local myremote secret_access_key "$AWS_SECRET_ACCESS_KEY"
          dvc pull

      - name: Validate data
        run: python src/data/validate.py

      - name: Run unit tests
        run: pytest tests/ -v

      - name: Run training pipeline
        run: dvc repro

      - name: Check quality gates
        run: |
          python ci/check_quality_gates.py \
            --metrics-file metrics/eval_metrics.json \
            --min-accuracy 0.80 \
            --min-auc 0.85

      - name: Register model in MLflow
        if: github.ref == 'refs/heads/main' && github.event_name == 'push'
        run: |
          RUN_ID=$(cat models/run_id.txt)
          python ci/register_model.py --run-id $RUN_ID

      - name: Build and push Docker image
        if: github.ref == 'refs/heads/main' && github.event_name == 'push'
        run: |
          echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io -u ${{ github.actor }} --password-stdin
          # Build once and apply both tags to the same image
          docker build \
            -t ghcr.io/${{ github.repository }}:${{ github.sha }} \
            -t ghcr.io/${{ github.repository }}:latest .
          docker push ghcr.io/${{ github.repository }}:${{ github.sha }}
          docker push ghcr.io/${{ github.repository }}:latest

      - name: Deploy to Kubernetes
        if: github.ref == 'refs/heads/main' && github.event_name == 'push'
        run: |
          # Assumes kubectl already has cluster credentials from an earlier step (not shown)
          # Update the image tag in the deployment
          kubectl set image deployment/churn-predictor \
            api=ghcr.io/${{ github.repository }}:${{ github.sha }} \
            -n ml-serving
          kubectl rollout status deployment/churn-predictor -n ml-serving

      - name: Comment PR with metrics
        if: github.event_name == 'pull_request'
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
            const m = JSON.parse(fs.readFileSync('metrics/eval_metrics.json'));
            await github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: `## Evaluation Results\n| Metric | Value |\n|--------|-------|\n| Accuracy | ${m.accuracy.toFixed(4)} |\n| F1 Score | ${m.f1_score.toFixed(4)} |\n| ROC AUC | ${m.roc_auc.toFixed(4)} |`
            });
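
The workflow calls `ci/check_quality_gates.py` with `--metrics-file`, `--min-accuracy`, and `--min-auc`, but the script itself is not shown in this lesson. A minimal sketch, assuming it only needs to compare the `accuracy` and `roc_auc` keys written by `evaluate.py` against those thresholds, might look like this. The function names `failed_gates` and `main` are illustrative.

```python
import argparse
import json

def failed_gates(metrics, min_accuracy, min_auc):
    """Return one message per metric that is missing or below its threshold."""
    thresholds = {"accuracy": min_accuracy, "roc_auc": min_auc}
    failures = []
    for name, minimum in thresholds.items():
        value = metrics.get(name)
        if value is None or value < minimum:
            failures.append(f"{name}={value} is below the minimum {minimum}")
    return failures

def main(argv=None):
    """Parse the CLI flags used in train.yml and return a process exit code."""
    parser = argparse.ArgumentParser(description="Fail the build if metrics are too low")
    parser.add_argument("--metrics-file", required=True)
    parser.add_argument("--min-accuracy", type=float, required=True)
    parser.add_argument("--min-auc", type=float, required=True)
    args = parser.parse_args(argv)

    with open(args.metrics_file) as f:
        metrics = json.load(f)

    failures = failed_gates(metrics, args.min_accuracy, args.min_auc)
    for message in failures:
        print(f"QUALITY GATE FAILED: {message}")
    return 1 if failures else 0

# Example: accuracy passes, AUC does not
print(failed_gates({"accuracy": 0.91, "roc_auc": 0.83}, 0.80, 0.85))
```

In the actual script, the file would end with `sys.exit(main())` under the usual `if __name__ == "__main__":` guard, so that a failed gate returns a non-zero exit code and stops the workflow before the registration and deployment steps.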

Step 6: Weekly Drift Monitoring

# .github/workflows/scheduled-retrain.yml
name: Weekly Drift Check and Retraining

on:
  schedule:
    - cron: '0 6 * * 1'   # Every Monday at 6 AM UTC
  workflow_dispatch:

jobs:
  drift-check:
    runs-on: ubuntu-latest
    permissions:
      contents: read
      actions: write   # needed to dispatch train.yml
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
          cache: "pip"

      - run: pip install -r requirements.txt

      - name: Run drift check
        id: drift
        run: |
          python monitoring/drift_check.py
          DRIFTED=$(python -c "import json; d=json.load(open('monitoring/latest_drift_check.json')); print(d['dataset_drifted'])")
          echo "drifted=$DRIFTED" >> $GITHUB_OUTPUT

      - name: Trigger retraining if drift detected
        if: steps.drift.outputs.drifted == 'True'
        uses: actions/github-script@v7
        with:
          script: |
            // Trigger the training workflow
            await github.rest.actions.createWorkflowDispatch({
              owner: context.repo.owner,
              repo: context.repo.repo,
              workflow_id: 'train.yml',
              ref: 'main',
            });
            console.log('Drift detected — triggered retraining workflow');

      - name: Upload drift report
        uses: actions/upload-artifact@v4
        with:
          name: drift-report
          path: monitoring/reports/

Step 7: The Complete Dockerfile

Use the full Dockerfile from lesson 5, then build and verify:

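docker build -t churn-predictor:latest .
# Inside the container, localhost is the container itself, so point the API at
# the host's MLflow server instead. host.docker.internal works on Docker Desktop;
# on Linux, also pass --add-host=host.docker.internal:host-gateway.
docker run -d -p 8000:8000 \
  -e MLFLOW_TRACKING_URI=http://host.docker.internal:5000 \
  churn-predictor:latest
curl http://localhost:8000/health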

Putting It All Together: The Full Flow

Here is what happens when a data scientist pushes an improvement to main:

Developer pushes to main branch
         |
         v
GitHub Actions: train.yml triggers
         |
         ├─ Pull versioned data (DVC)
         ├─ Validate data schema and quality
         ├─ Run unit tests
         ├─ Run full pipeline (dvc repro)
         │   ├─ Preprocess data
         │   ├─ Train model (logged to MLflow)
         │   └─ Evaluate model (metrics saved)
         ├─ Quality gate check (accuracy ≥ 0.80, AUC ≥ 0.85)
         │   ├─ FAIL → pipeline fails, merge blocked
         │   └─ PASS → continue
         ├─ Compare with production model in MLflow Registry
         │   ├─ Worse → archive new version, fail pipeline
         │   └─ Better → promote to Production stage
         ├─ Build Docker image, push to registry with SHA tag
         └─ Deploy to Kubernetes (rolling update)

Every Monday at 6 AM:
         |
         v
GitHub Actions: scheduled-retrain.yml triggers
         ├─ Fetch recent production traffic from data warehouse
         ├─ Run Evidently drift check vs reference data
         ├─ Generate HTML drift report (uploaded as artifact)
         └─ If drifted → trigger full training workflow
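
The "compare with production model" step in the flow above belongs to `ci/register_model.py`, which is not shown in this lesson. The sketch below covers only the decision logic, assuming both models are described by metric dictionaries shaped like `metrics/eval_metrics.json`. Fetching the production metrics and performing the stage transition in the MLflow Registry are left out, and `Decision`, `decide_promotion`, and `min_gain` are hypothetical names.

```python
from dataclasses import dataclass

@dataclass
class Decision:
    action: str  # "promote" or "archive"
    reason: str

def decide_promotion(candidate, production, metric="roc_auc", min_gain=0.0):
    """Decide whether a candidate model should replace the production model.

    `production` is None when no model has been promoted yet.
    """
    if production is None:
        return Decision("promote", "no production model to compare against")
    gain = candidate[metric] - production[metric]
    if gain > min_gain:
        return Decision("promote", f"{metric} improved by {gain:.4f}")
    return Decision("archive", f"{metric} changed by {gain:.4f}, needs more than {min_gain}")

candidate = {"accuracy": 0.90, "roc_auc": 0.91}
production = {"accuracy": 0.89, "roc_auc": 0.89}

print(decide_promotion(candidate, production).action)  # promote
print(decide_promotion(production, candidate).action)  # archive
print(decide_promotion(candidate, None).action)        # promote
```

With `min_gain=0.0`, a candidate that merely ties the production model is archived. Whether ties should promote is a policy choice the flow diagram does not specify.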

Testing Your Complete Pipeline

# 1. Run everything locally with DVC
dvc repro

# 2. Check metrics
dvc metrics show

# 3. Start the API
uvicorn src.api.app:app --reload

# 4. Run API integration tests
pytest tests/test_api.py -v

# 5. Build and test Docker image
docker build -t churn-predictor:test .
docker run -d -p 8001:8000 churn-predictor:test
sleep 5   # give the server a moment to start
curl http://localhost:8001/health
curl -X POST http://localhost:8001/predict \
  -H "Content-Type: application/json" \
  -d '{"tenure_months": 6, "monthly_charges": 79.99, "total_charges": 479.94, "num_products": 1, "has_support_calls": 1}'

# 6. Run drift check against reference data
python monitoring/drift_check.py

What You Have Built

You started this course with two failure scenarios: a data scientist emails a pickle file to a colleague who can’t reproduce it, and a model silently degrades six months after deployment.

You now have the infrastructure that prevents both of those failures:

  • Reproducibility: Anyone can run dvc repro and get the same model. Every artifact is versioned.
  • Automated quality: A model that misses the accuracy or AUC thresholds fails the CI pipeline before it can be registered or deployed.
  • Observability: You know exactly which model version is in production, which metrics it achieved, and what data it was trained on.
  • Drift detection: The weekly monitoring job catches distribution shifts before they become user-visible failures.
  • Rollback capability: A model rollback takes minutes, not days, because every version is in the registry.

This is production ML. A commonly quoted figure is that 85% of ML projects never reach production, and skipping this infrastructure is a major reason. You’ve built it.

The next step: apply this pipeline to a real problem you care about. The patterns generalize to any supervised ML task — the tools and workflows are the same whether you’re predicting churn, detecting fraud, recommending content, or forecasting demand.