Capstone: End-to-End MLOps Pipeline
Build a complete ML pipeline from training to monitored production deployment
What You Will Build
This capstone connects every lesson in the course into a single working system. By the end, you will have:
- A customer churn prediction model trained with full experiment tracking
- A GitHub Actions CI pipeline that trains, evaluates, and registers automatically
- A Dockerized FastAPI serving API
- Drift monitoring that runs weekly and alerts on changes
- A model registry with stage transitions and rollback capability
The complete project is available as a reference at each step. Read the code carefully — every line exists for a reason explained in the previous lessons.
Project Structure
Start with this layout. Create the directories and files as we go.
```text
churn-mlops/
├── .github/
│   └── workflows/
│       ├── train.yml                # CI on every push and PR
│       └── scheduled-retrain.yml    # Weekly drift check and retraining
├── .dvc/                            # DVC config (created by dvc init)
├── configs/
│   └── params.yaml                  # Hyperparameters
├── data/
│   ├── raw/                         # Original data (tracked by DVC)
│   └── processed/                   # Feature-engineered data
├── kubernetes/
│   └── deployment.yaml              # K8s manifests
├── metrics/                         # eval_metrics.json (written by evaluate.py)
├── models/                          # Trained model artifacts
├── monitoring/
│   ├── drift_check.py
│   └── reference_data.parquet       # Saved at deployment time
├── notebooks/
│   └── exploration.ipynb            # EDA only
├── src/
│   ├── api/
│   │   └── app.py                   # FastAPI serving
│   ├── data/
│   │   ├── generate_dataset.py      # Creates synthetic churn data
│   │   ├── preprocess.py            # Feature engineering
│   │   └── validate.py              # Data quality checks
│   └── models/
│       ├── train.py                 # Training + MLflow logging
│       └── evaluate.py              # Evaluation + metrics output
├── tests/
│   ├── test_preprocess.py
│   └── test_api.py
├── ci/
│   ├── check_quality_gates.py
│   └── register_model.py
├── dvc.yaml                         # Pipeline definition
├── Dockerfile
├── .dockerignore
├── requirements.txt
└── README.md
```
Step 1: Initialize the Project with DVC
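```bash
# Create and initialize the project
mkdir churn-mlops && cd churn-mlops
git init
dvc init
# Configure DVC remote storage. A local directory is fine for development;
# CI runners need a remote they can reach, e.g. S3 (bucket name is a placeholder):
#   dvc remote add -d storage s3://your-bucket/churn-mlops
dvc remote add -d localremote /tmp/dvc-cache
# Create initial commit (dvc init creates .dvc/ and .dvcignore, not a root .gitignore)
git add .dvc/ .dvcignore
git commit -m "Initialize project with DVC"
```
configs/params.yaml: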
```yaml
# configs/params.yaml
learning_rate: 0.05
n_estimators: 200
max_depth: 5
test_size: 0.2
random_seed: 42
min_accuracy: 0.80
min_auc: 0.85
```
Step 2: Generate and Version the Dataset
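generate_dataset.py labels each customer with a logistic model plus Gaussian noise. Written as equations, with t = tenure_months, m = monthly_charges, n = num_products and s = has_support_calls:

$$
z = -2.5 - 0.04\,t + 0.02\,m - 0.3\,n + 0.8\,s + \varepsilon, \qquad \varepsilon \sim \mathcal{N}(0,\ 0.5^2)
$$

$$
p = \sigma(z) = \frac{1}{1 + e^{-z}}, \qquad \text{churn} = \mathbb{1}[p > 0.5] = \mathbb{1}[z > 0]
$$

Because the sigmoid is increasing and σ(0) = 0.5, thresholding the probability at 0.5 is the same as thresholding the logit at 0. The noise-free part of z is largest for tenure 1, monthly charges near 100, one product and support calls: -2.5 - 0.04 + 2.0 - 0.3 + 0.8 = -0.04. So every positive label comes from the noise term pushing z above zero, and you should expect the printed churn rate to be low.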
```python
# src/data/generate_dataset.py
"""Generate synthetic customer churn data."""
import numpy as np
import pandas as pd
from pathlib import Path


def generate_churn_data(n_samples: int = 10000, seed: int = 42) -> pd.DataFrame:
    np.random.seed(seed)
    tenure = np.random.randint(1, 72, n_samples)
    monthly_charges = np.random.uniform(20, 100, n_samples)
    num_products = np.random.randint(1, 6, n_samples)
    has_support_calls = np.random.randint(0, 2, n_samples)
    total_charges = tenure * monthly_charges + np.random.normal(0, 50, n_samples)
    total_charges = np.clip(total_charges, 0, None)

    # Churn probability model
    churn_logit = (
        -2.5
        - 0.04 * tenure
        + 0.02 * monthly_charges
        - 0.3 * num_products
        + 0.8 * has_support_calls
        + np.random.normal(0, 0.5, n_samples)
    )
    churn_prob = 1 / (1 + np.exp(-churn_logit))
    churn = (churn_prob > 0.5).astype(int)

    return pd.DataFrame({
        "customer_id": range(1, n_samples + 1),
        "tenure_months": tenure,
        "monthly_charges": monthly_charges.round(2),
        "total_charges": total_charges.round(2),
        "num_products": num_products,
        "has_support_calls": has_support_calls,
        "churn": churn,
    })


if __name__ == "__main__":
    Path("data/raw").mkdir(parents=True, exist_ok=True)
    df = generate_churn_data()
    df.to_csv("data/raw/customers.csv", index=False)
    print(f"Generated {len(df)} rows. Churn rate: {df['churn'].mean():.1%}")
```

```bash
python src/data/generate_dataset.py
dvc add data/raw/customers.csv
git add data/raw/customers.csv.dvc data/raw/.gitignore
git commit -m "Add initial customer dataset (10k rows)"
dvc push
```
Step 3: Preprocessing and Training
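CI (Step 5) runs python src/data/validate.py before the unit tests and dvc repro, but the course only names this file in the project tree. A minimal stdlib sketch, assuming the checks are a schema match plus value ranges derived from generate_dataset.py; the column list, the bounds, and the 20-line output cap are assumptions, not the course's code:

```python
# Hypothetical sketch of src/data/validate.py; column list and ranges are assumptions.
"""Fail the CI step if the raw churn data breaks basic schema or range checks."""
import csv
import sys
from pathlib import Path

EXPECTED_COLUMNS = [
    "customer_id", "tenure_months", "monthly_charges",
    "total_charges", "num_products", "has_support_calls", "churn",
]
# Loose inclusive (low, high) bounds based on how generate_dataset.py builds each column
RANGES = {
    "tenure_months": (1, 72),
    "monthly_charges": (20.0, 100.0),
    "total_charges": (0.0, float("inf")),
    "num_products": (1, 5),
    "has_support_calls": (0, 1),
    "churn": (0, 1),
}


def validate_rows(header: list[str], rows: list[dict]) -> list[str]:
    """Return human-readable problems; an empty list means the data passed."""
    if header != EXPECTED_COLUMNS:
        return [f"Unexpected columns: {header}"]
    if not rows:
        return ["Dataset is empty"]
    errors = []
    for line_no, row in enumerate(rows, start=2):  # line 1 is the header
        for col, (low, high) in RANGES.items():
            raw = row[col]
            try:
                value = float(raw)
            except (TypeError, ValueError):  # None for short rows, text for bad cells
                errors.append(f"line {line_no}: {col}={raw!r} is not numeric")
                continue
            if not low <= value <= high:  # also catches NaN, which fails every comparison
                errors.append(f"line {line_no}: {col}={value} outside [{low}, {high}]")
    return errors


def main(path: str = "data/raw/customers.csv") -> int:
    with Path(path).open(newline="") as f:
        reader = csv.DictReader(f)
        rows = list(reader)
        header = list(reader.fieldnames or [])
    errors = validate_rows(header, rows)
    for err in errors[:20]:  # cap the output so CI logs stay readable
        print(err, file=sys.stderr)
    print(f"{len(rows)} rows checked, {len(errors)} problems found")
    return 1 if errors else 0  # a non-zero exit fails the workflow step


if __name__ == "__main__":
    sys.exit(main())
```

Returning a list of problems instead of raising on the first one lets a single CI run report every broken column at once.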
```python
# src/data/preprocess.py
import pandas as pd
import yaml
from pathlib import Path
from sklearn.model_selection import train_test_split


def preprocess(params: dict) -> None:
    df = pd.read_csv("data/raw/customers.csv")

    # Features
    feature_cols = [
        "tenure_months", "monthly_charges", "total_charges",
        "num_products", "has_support_calls",
    ]
    X = df[feature_cols]
    y = df["churn"]

    X_train, X_test, y_train, y_test = train_test_split(
        X, y,
        test_size=params["test_size"],
        random_state=params["random_seed"],
        stratify=y,
    )

    Path("data/processed").mkdir(parents=True, exist_ok=True)
    train = X_train.copy()
    train["churn"] = y_train
    test = X_test.copy()
    test["churn"] = y_test
    train.to_csv("data/processed/train.csv", index=False)
    test.to_csv("data/processed/test.csv", index=False)
    print(f"Train: {len(train)} | Test: {len(test)}")


if __name__ == "__main__":
    with open("configs/params.yaml") as f:
        params = yaml.safe_load(f)
    preprocess(params)
```

```python
# src/models/train.py
import os
import pickle
from pathlib import Path

import mlflow
import mlflow.sklearn
import pandas as pd
import yaml
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import accuracy_score, roc_auc_score


def train() -> None:
    with open("configs/params.yaml") as f:
        params = yaml.safe_load(f)

    train_df = pd.read_csv("data/processed/train.csv")
    X_train = train_df.drop("churn", axis=1)
    y_train = train_df["churn"]

    mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI", "http://localhost:5000"))
    mlflow.set_experiment("churn-prediction")

    with mlflow.start_run() as run:
        mlflow.log_params(params)
        mlflow.set_tag("git_sha", os.getenv("GITHUB_SHA", "local"))

        model = GradientBoostingClassifier(
            learning_rate=params["learning_rate"],
            n_estimators=params["n_estimators"],
            max_depth=params["max_depth"],
            random_state=params["random_seed"],
        )
        model.fit(X_train, y_train)

        # Training-set metrics only; held-out metrics come from evaluate.py
        y_pred = model.predict(X_train)
        y_prob = model.predict_proba(X_train)[:, 1]
        train_metrics = {
            "train_accuracy": accuracy_score(y_train, y_pred),
            "train_auc": roc_auc_score(y_train, y_prob),
        }
        mlflow.log_metrics(train_metrics)

        Path("models").mkdir(exist_ok=True)
        with open("models/churn_model.pkl", "wb") as f:
            pickle.dump(model, f)
        mlflow.sklearn.log_model(model, "model")

        # Save run ID for CI to use
        Path("models/run_id.txt").write_text(run.info.run_id)
        print(f"Run ID: {run.info.run_id}")
        print(f"Train metrics: {train_metrics}")


if __name__ == "__main__":
    train()
```

```python
# src/models/evaluate.py
import json
import pickle
from pathlib import Path

import pandas as pd
from sklearn.metrics import accuracy_score, classification_report, f1_score, roc_auc_score


def evaluate() -> None:
    test_df = pd.read_csv("data/processed/test.csv")
    X_test = test_df.drop("churn", axis=1)
    y_test = test_df["churn"]

    with open("models/churn_model.pkl", "rb") as f:
        model = pickle.load(f)

    y_pred = model.predict(X_test)
    y_prob = model.predict_proba(X_test)[:, 1]
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_score": f1_score(y_test, y_pred),
        "roc_auc": roc_auc_score(y_test, y_prob),
    }

    Path("metrics").mkdir(exist_ok=True)
    with open("metrics/eval_metrics.json", "w") as f:
        json.dump(metrics, f, indent=2)

    print(f"Eval metrics: {metrics}")
    print(classification_report(y_test, y_pred))


if __name__ == "__main__":
    evaluate()
```
Step 4: Define the DVC Pipeline
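dvc repro reruns a stage only when something it declares has changed since the last recorded run: its cmd, a file listed under deps, or a params key it lists. The sketch below models that rule with MD5 hashes of files and a plain dict standing in for dvc.lock; it is a conceptual illustration under those assumptions, not DVC's implementation or its lock-file format:

```python
"""Conceptual model of how `dvc repro` decides whether a stage is stale.

Not DVC's actual code: real DVC records these hashes in dvc.lock and also
checks that the stage's outs still match what was recorded.
"""
import hashlib
from pathlib import Path
from typing import Optional


def file_md5(path: Path) -> str:
    """MD5 of a file's bytes, read in 1 MiB chunks."""
    h = hashlib.md5()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()


def stage_fingerprint(cmd: str, deps: list[str], params: dict) -> dict:
    """Collect everything a stage declares as an input."""
    return {
        "cmd": cmd,
        "deps": {d: file_md5(Path(d)) for d in deps},
        # Only the params keys the stage lists belong here, so editing an
        # unrelated key such as max_depth leaves the preprocess stage alone.
        "params": dict(params),
    }


def is_stale(current: dict, locked: Optional[dict]) -> bool:
    """Rerun if the stage never ran or any recorded input differs."""
    return locked is None or current != locked
```

Under this rule, changing max_depth in configs/params.yaml reruns train (which lists that key) and then evaluate (because its dependency models/churn_model.pkl changes), while preprocess is skipped.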
```yaml
# dvc.yaml
stages:
  preprocess:
    cmd: python src/data/preprocess.py
    deps:
      - src/data/preprocess.py
      - data/raw/customers.csv
    params:
      - configs/params.yaml:
          - test_size
          - random_seed
    outs:
      - data/processed/train.csv
      - data/processed/test.csv

  train:
    cmd: python src/models/train.py
    deps:
      - src/models/train.py
      - data/processed/train.csv
    params:
      - configs/params.yaml:
          - learning_rate
          - n_estimators
          - max_depth
          - random_seed
    outs:
      - models/churn_model.pkl
      - models/run_id.txt

  evaluate:
    cmd: python src/models/evaluate.py
    deps:
      - src/models/evaluate.py
      - models/churn_model.pkl
      - data/processed/test.csv
    metrics:
      - metrics/eval_metrics.json:
          cache: false
```
Test the full pipeline locally:
```bash
# train.py logs to MLflow at http://localhost:5000 unless MLFLOW_TRACKING_URI is set,
# so start a tracking server first (e.g. `mlflow server --port 5000`) or point it elsewhere
dvc repro
dvc metrics show
```
Step 5: GitHub Actions CI Pipeline
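The workflow below calls ci/check_quality_gates.py with --metrics-file, --min-accuracy and --min-auc, but the script itself is not listed. A minimal sketch, assuming it reads the accuracy and roc_auc keys that evaluate.py writes and signals failure through a non-zero exit code, which is what makes the GitHub Actions step fail:

```python
# Hypothetical sketch of ci/check_quality_gates.py (not the course's listing).
"""Exit non-zero when evaluation metrics miss the configured thresholds."""
import argparse
import json
import sys
from pathlib import Path
from typing import Optional


def check_gates(metrics: dict, min_accuracy: float, min_auc: float) -> list[str]:
    """Return one message per failed gate; an empty list means every gate passed."""
    failures = []
    for name, threshold in (("accuracy", min_accuracy), ("roc_auc", min_auc)):
        value = metrics.get(name)
        if value is None:
            failures.append(f"{name}: missing from metrics file")
        elif value < threshold:
            failures.append(f"{name}: {value:.4f} < required {threshold:.4f}")
    return failures


def main(argv: Optional[list[str]] = None) -> int:
    parser = argparse.ArgumentParser(description="Check eval metrics against quality gates.")
    parser.add_argument("--metrics-file", type=Path, required=True)
    parser.add_argument("--min-accuracy", type=float, default=0.80)
    parser.add_argument("--min-auc", type=float, default=0.85)
    args = parser.parse_args(argv)

    metrics = json.loads(args.metrics_file.read_text())
    failures = check_gates(metrics, args.min_accuracy, args.min_auc)
    for message in failures:
        print(f"Quality gate failed: {message}", file=sys.stderr)
    if not failures:
        print("All quality gates passed.")
    return 1 if failures else 0  # non-zero exit fails the CI step


if __name__ == "__main__":
    sys.exit(main())
```

The flag defaults match min_accuracy and min_auc in configs/params.yaml; reading them from that file instead would keep a single source of truth for the thresholds.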
```yaml
# .github/workflows/train.yml
name: ML Training Pipeline

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]
  workflow_dispatch:

# GITHUB_TOKEN needs write access to push to ghcr.io and to comment on PRs
permissions:
  contents: read
  packages: write
  pull-requests: write

env:
  MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
  # GITHUB_SHA (tagged on each MLflow run by train.py) is set by Actions automatically

jobs:
  train-and-deploy:
    runs-on: ubuntu-latest
    timeout-minutes: 60
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
          cache: "pip"

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Pull data from DVC
        # DVC's S3 remote reads AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY from the env
        # above. The default remote must be reachable from the runner (not /tmp), and
        # requirements.txt must include dvc[s3].
        run: dvc pull

      - name: Validate data
        run: python src/data/validate.py

      - name: Run unit tests
        run: pytest tests/ -v

      - name: Run training pipeline
        run: dvc repro

      - name: Check quality gates
        run: |
          python ci/check_quality_gates.py \
            --metrics-file metrics/eval_metrics.json \
            --min-accuracy 0.80 \
            --min-auc 0.85

      # Push and workflow_dispatch both deploy, so drift-triggered retraining ships too
      - name: Register model in MLflow
        if: github.ref == 'refs/heads/main' && (github.event_name == 'push' || github.event_name == 'workflow_dispatch')
        run: |
          RUN_ID=$(cat models/run_id.txt)
          python ci/register_model.py --run-id "$RUN_ID"

      - name: Build and push Docker image
        if: github.ref == 'refs/heads/main' && (github.event_name == 'push' || github.event_name == 'workflow_dispatch')
        run: |
          IMAGE=ghcr.io/${{ github.repository }}
          echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io -u ${{ github.actor }} --password-stdin
          # Build once, tag twice
          docker build -t "$IMAGE:${{ github.sha }}" -t "$IMAGE:latest" .
          docker push "$IMAGE:${{ github.sha }}"
          docker push "$IMAGE:latest"

      - name: Deploy to Kubernetes
        if: github.ref == 'refs/heads/main' && (github.event_name == 'push' || github.event_name == 'workflow_dispatch')
        # Assumes the runner already has cluster credentials (a kubeconfig)
        run: |
          # Update the image tag in the deployment
          kubectl set image deployment/churn-predictor \
            api=ghcr.io/${{ github.repository }}:${{ github.sha }} \
            -n ml-serving
          kubectl rollout status deployment/churn-predictor -n ml-serving

      - name: Comment PR with metrics
        if: github.event_name == 'pull_request'
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
            const m = JSON.parse(fs.readFileSync('metrics/eval_metrics.json', 'utf8'));
            await github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: `## Evaluation Results\n| Metric | Value |\n|--------|-------|\n| Accuracy | ${m.accuracy.toFixed(4)} |\n| F1 Score | ${m.f1_score.toFixed(4)} |\n| ROC AUC | ${m.roc_auc.toFixed(4)} |`
            });
```
Step 6: Weekly Drift Monitoring
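The workflow below runs monitoring/drift_check.py and then reads dataset_drifted from monitoring/latest_drift_check.json, but the script is not listed here, and the flow described later uses Evidently. As a dependency-free stand-in, this sketch scores each feature with the Population Stability Index, PSI = Σ (a − e) · ln(a / e) over bins, where e and a are the reference and current bin shares. The equal-width bins, the 0.2 threshold, and the "drifted if any feature exceeds it" rule are assumptions:

```python
# Hypothetical stand-in for monitoring/drift_check.py: PSI instead of Evidently.
import json
import math
from pathlib import Path

PSI_THRESHOLD = 0.2  # assumed cut-off; a common rule of thumb, tune per feature
N_BINS = 10
EPS = 1e-6  # floor for empty bins so log() stays finite


def bin_shares(values: list[float], edges: list[float]) -> list[float]:
    """Share of values in each bin defined by ascending interior edges."""
    counts = [0] * (len(edges) + 1)
    for v in values:
        counts[sum(v > e for e in edges)] += 1  # index = number of edges below v
    return [max(c / len(values), EPS) for c in counts]


def psi(reference: list[float], current: list[float], n_bins: int = N_BINS) -> float:
    """PSI of `current` against equal-width bins spanning the reference range."""
    lo, hi = min(reference), max(reference)
    width = (hi - lo) / n_bins or 1.0  # guard against a constant reference column
    edges = [lo + width * i for i in range(1, n_bins)]
    expected = bin_shares(reference, edges)
    actual = bin_shares(current, edges)  # out-of-range values land in the end bins
    return sum((a - e) * math.log(a / e) for a, e in zip(actual, expected))


def check_drift(reference: dict[str, list[float]], current: dict[str, list[float]]) -> dict:
    """Score every reference column; flag the dataset if any score passes the threshold."""
    scores = {col: psi(reference[col], current[col]) for col in reference}
    drifted = [col for col, score in scores.items() if score > PSI_THRESHOLD]
    return {
        "feature_psi": scores,
        "drifted_features": drifted,
        "dataset_drifted": bool(drifted),
    }


def write_result(result: dict, path: str = "monitoring/latest_drift_check.json") -> None:
    """Write the JSON file the scheduled workflow reads."""
    out = Path(path)
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(json.dumps(result, indent=2))
```

Loading reference_data.parquet and recent production traffic into per-column lists is left out, since the course does not show where that traffic comes from. Note that json.dumps writes dataset_drifted as true/false, and the workflow's one-liner re-reads it with json.load and prints True or False, which is what its == 'True' comparison expects.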
```yaml
# .github/workflows/scheduled-retrain.yml
name: Weekly Drift Check and Retraining

on:
  schedule:
    - cron: '0 6 * * 1'  # Every Monday at 6 AM UTC
  workflow_dispatch:

# actions: write lets GITHUB_TOKEN dispatch train.yml
permissions:
  contents: read
  actions: write

jobs:
  drift-check:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
          cache: "pip"

      - run: pip install -r requirements.txt

      - name: Run drift check
        id: drift
        run: |
          python monitoring/drift_check.py
          DRIFTED=$(python -c "import json; d=json.load(open('monitoring/latest_drift_check.json')); print(d['dataset_drifted'])")
          echo "drifted=$DRIFTED" >> "$GITHUB_OUTPUT"

      - name: Trigger retraining if drift detected
        if: steps.drift.outputs.drifted == 'True'
        uses: actions/github-script@v7
        with:
          script: |
            // Trigger the training workflow
            await github.rest.actions.createWorkflowDispatch({
              owner: context.repo.owner,
              repo: context.repo.repo,
              workflow_id: 'train.yml',
              ref: 'main',
            });
            console.log('Drift detected: triggered retraining workflow');

      - name: Upload drift report
        uses: actions/upload-artifact@v4
        with:
          name: drift-report
          path: monitoring/reports/
```
Step 7: The Complete Dockerfile
Use the full Dockerfile from lesson 5, then build and verify:
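```bash
docker build -t churn-predictor:latest .
# -d frees the shell for curl. Inside the container, localhost is the container
# itself, so point MLflow at the host instead (host.docker.internal works on
# Docker Desktop; on Linux also pass --add-host=host.docker.internal:host-gateway).
docker run -d -p 8000:8000 \
  -e MLFLOW_TRACKING_URI=http://host.docker.internal:5000 \
  churn-predictor:latest
sleep 5  # give the server a moment to start
curl http://localhost:8000/health
```
Putting It All Together: The Full Flow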
Here is what happens when a data scientist pushes an improvement to main:
```text
Developer pushes to main branch
|
v
GitHub Actions: train.yml triggers
|
├─ Pull versioned data (DVC)
├─ Validate data schema and quality
├─ Run unit tests
├─ Run full pipeline (dvc repro)
│   ├─ Preprocess data
│   ├─ Train model (logged to MLflow)
│   └─ Evaluate model (metrics saved)
├─ Quality gate check (accuracy ≥ 0.80, AUC ≥ 0.85)
│   ├─ FAIL → pipeline stops, nothing is registered or deployed
│   │         (on a PR, a required check also blocks the merge)
│   └─ PASS → continue
├─ Compare with production model in MLflow Registry
│   ├─ Worse → archive new version, fail pipeline
│   └─ Better → promote to Production stage
├─ Build Docker image, push to registry with SHA tag
└─ Deploy to Kubernetes (rolling update)
```
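The registry comparison in this flow is the one decision whose code (ci/register_model.py) is not listed. A minimal sketch of the rule as a pure function, assuming ROC AUC on the shared test set is the comparison metric and that ties keep the current Production model (both assumptions); ModelVersion is a hypothetical stand-in, and the MLflow client calls that would fetch metrics and move stages are left out:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ModelVersion:
    """Hypothetical summary of one registry entry."""
    version: int
    roc_auc: float


def promotion_decision(
    candidate: ModelVersion,
    production: Optional[ModelVersion],
    min_improvement: float = 0.0,
) -> str:
    """Return 'promote' or 'archive' for a newly registered version."""
    if production is None:
        # Nothing in Production yet; the candidate already passed the quality gates.
        return "promote"
    # Must beat Production by more than the margin; ties keep the current model.
    if candidate.roc_auc > production.roc_auc + min_improvement:
        return "promote"
    return "archive"


if __name__ == "__main__":
    prod = ModelVersion(version=4, roc_auc=0.91)
    print(promotion_decision(ModelVersion(5, 0.93), prod))  # promote
    print(promotion_decision(ModelVersion(5, 0.90), prod))  # archive
```

A positive min_improvement keeps run-to-run noise from swapping the Production model back and forth. Recent MLflow releases also deprecate registry stages in favor of aliases; the same decision can set an alias instead of moving a stage.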
```text
Every Monday at 6 AM UTC:
|
v
GitHub Actions: scheduled-retrain.yml triggers
├─ Fetch recent production traffic from data warehouse
├─ Run Evidently drift check vs reference data
├─ Generate HTML drift report (uploaded as artifact)
└─ If drifted → trigger full training workflow
```
Testing Your Complete Pipeline
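Probing the container with curl immediately after docker run, or after a fixed sleep, can race the server's startup. A small stdlib smoke test that polls /health before posting the sample record to /predict avoids that. Since src/api/app.py is not listed here, the assumptions that /health answers 200 and /predict returns JSON, plus the base URL and timeouts, are illustrative:

```python
# Hypothetical smoke test; assumes /health answers 200 and /predict returns JSON.
import json
import time
import urllib.request

SAMPLE = {
    "tenure_months": 6,
    "monthly_charges": 79.99,
    "total_charges": 479.94,
    "num_products": 1,
    "has_support_calls": 1,
}


def wait_for_health(base_url: str, timeout_s: float = 30.0, interval_s: float = 1.0) -> bool:
    """Poll GET /health until it answers 200 or the timeout runs out."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(f"{base_url}/health", timeout=interval_s) as resp:
                if resp.status == 200:
                    return True
        except OSError:
            pass  # URLError, refused connections and timeouts all subclass OSError
        time.sleep(interval_s)
    return False


def predict(base_url: str, payload: dict) -> dict:
    """POST one customer record to /predict and decode the JSON reply."""
    request = urllib.request.Request(
        f"{base_url}/predict",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=10) as resp:
        return json.loads(resp.read())


if __name__ == "__main__":
    base = "http://localhost:8001"  # port mapped by the docker run step
    if not wait_for_health(base):
        raise SystemExit(f"{base}/health did not return 200 in time")
    print(predict(base, SAMPLE))
```

Because it exits non-zero when the service never becomes healthy, the same script could run as a post-deploy check in CI.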
```bash
# 1. Run everything locally with DVC
dvc repro

# 2. Check metrics
dvc metrics show

# 3. Start the API (in a separate terminal, since it blocks)
uvicorn src.api.app:app --reload

# 4. Run API integration tests
pytest tests/test_api.py -v

# 5. Build and test Docker image
docker build -t churn-predictor:test .
docker run -d --name churn-test -p 8001:8000 churn-predictor:test
sleep 5  # give the server time to start before probing it
curl http://localhost:8001/health
curl -X POST http://localhost:8001/predict \
  -H "Content-Type: application/json" \
  -d '{"tenure_months": 6, "monthly_charges": 79.99, "total_charges": 479.94, "num_products": 1, "has_support_calls": 1}'
docker stop churn-test && docker rm churn-test

# 6. Run drift check against reference data
python monitoring/drift_check.py
```
What You Have Built
You started this course with two failure scenarios: a data scientist emails a pickle file to a colleague who can’t reproduce it, and a model silently degrades six months later.
You now have the infrastructure that prevents both of those failures:
- Reproducibility: Anyone can run `dvc repro` and get the same model. Every artifact is versioned.
- Automated quality: No model that fails the quality gates can reach production; the CI pipeline enforces it.
- Observability: You know exactly which model version is in production, what metrics it achieved, and what data it was trained on.
- Drift detection: The weekly monitoring job can catch distribution shifts before they become user-visible failures.
- Rollback capability: A model rollback takes minutes, not days, because every version is in the registry.
This is production ML. Many ML projects never reach production (an often-quoted industry estimate puts the share at around 85%), and skipping this kind of infrastructure is a common reason. You’ve built it.
The next step: apply this pipeline to a real problem you care about. The patterns generalize to any supervised ML task — the tools and workflows are the same whether you’re predicting churn, detecting fraud, recommending content, or forecasting demand.
