📖 Lesson ⏱️ 120 minutes

Model Monitoring: Drift and Performance

Detect data drift and performance degradation in production models

The Silent Failure

March 2020. Your e-commerce recommendation model has been running in production for eight months. Accuracy on offline evaluation: 89%. The model recommends travel accessories, luggage, and hotel bookings. Users click through at a healthy 12% rate. Everything looks fine.

Then COVID hits. By April, nobody is buying travel gear. Your model still recommends luggage. Click-through rate: 2%. You’re surfacing irrelevant products to millions of users.

Nobody noticed for three weeks. Why? Because your monitoring dashboard shows server uptime (100%), response latency (45ms), and error rate (0.1%). The model is running fine. It’s just wrong.

This is the most dangerous type of production ML failure: the model isn’t broken, it’s just obsolete. And it’s invisible unless you’re watching the right signals.


Two Types of Drift

Data Drift (Covariate Shift)

The distribution of your inputs changes, even though the underlying relationship between inputs and outputs remains the same.

Example: A loan approval model trained on pre-2020 data. In 2021, a stimulus package increases average income by 15%. Many applicants now have incomes that sit in the sparse upper tail of the training distribution, where the model saw few examples. Predictions in that region become unreliable.

Concept Drift

The relationship between inputs and outputs changes. The model’s learned mapping no longer reflects reality.

Example: A fraud detection model trained on transaction patterns from 2019. In 2021, fraud patterns shifted to card-not-present attacks. The same input features (amount, merchant_category, location) now map to different fraud outcomes than they did in training. The model’s predictions are increasingly wrong, even on inputs it “recognizes.”

Type | What changes | What looks the same
Data drift | Feature distributions | Model logic
Concept drift | True relationship | Feature distributions

Concept drift is harder to detect because the features look normal — only the ground truth labels tell you something is wrong, and ground truth is often delayed (you don’t know if a loan defaulted for months).
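
The difference between the two drift types can be sketched with a toy simulation. Everything here is hypothetical: the `make_batch` helper, the income numbers, and the "default if income is below a cutoff" rule are invented for illustration. The toy model matches the original rule exactly, so it stays accurate under data drift; a real model would usually degrade in regions it rarely saw.

```python
import random
import statistics

def make_batch(n, income_mean, default_cutoff, seed):
    """Simulate loan applicants as (incomes, labels).

    Hypothetical ground truth: an applicant defaults when income < default_cutoff.
    """
    rng = random.Random(seed)
    incomes = [rng.gauss(income_mean, 10.0) for _ in range(n)]
    labels = [income < default_cutoff for income in incomes]
    return incomes, labels

def model(income):
    """A 'trained' model that learned the original rule: default if income < 50."""
    return income < 50.0

def accuracy(incomes, labels):
    """Fraction of applicants for which the model agrees with the ground truth."""
    return sum(model(x) == y for x, y in zip(incomes, labels)) / len(labels)

# Training conditions: mean income 60, defaults below 50.
train_x, train_y = make_batch(5000, income_mean=60.0, default_cutoff=50.0, seed=0)
# Data drift: incomes shift upward, the rule stays the same.
data_x, data_y = make_batch(5000, income_mean=75.0, default_cutoff=50.0, seed=1)
# Concept drift: incomes look the same, but the rule changes.
concept_x, concept_y = make_batch(5000, income_mean=60.0, default_cutoff=60.0, seed=2)

for name, xs, ys in [
    ("training", train_x, train_y),
    ("data drift", data_x, data_y),
    ("concept drift", concept_x, concept_y),
]:
    print(f"{name:14s} mean income={statistics.mean(xs):6.2f} accuracy={accuracy(xs, ys):.3f}")
```

In the concept drift batch the input mean is essentially unchanged, so an input-only monitor sees nothing, yet accuracy falls once labels are available.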


What to Monitor

Build monitoring at three levels:

1. Infrastructure metrics (easy — use existing tools)

  • Prediction latency (p50, p95, p99)
  • Request throughput (requests/second)
  • Error rate
  • Pod memory and CPU

2. Model input metrics (data drift detection)

  • Mean, std, min, max of each numeric feature
  • Category distribution for categorical features
  • Null rate per feature
  • Comparison against training distribution

3. Model output metrics (prediction quality)

  • Prediction score distribution (if the average churn probability was 0.32 in training and is now 0.85, something is wrong)
  • Prediction class distribution
  • Accuracy, precision, recall (only available when you have ground truth labels)
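
The input-level checks above (summary statistics, null rate, comparison against training) can be sketched in a few lines of standard-library Python. The function names and the thresholds (`max_mean_shift_in_std`, `max_null_rate_increase`) are illustrative assumptions, not recommended values, and the sketch assumes each feature has at least one non-null value.

```python
import statistics

def profile_feature(values):
    """Summarize one numeric feature; None counts as a missing value."""
    present = [v for v in values if v is not None]
    return {
        "null_rate": 1 - len(present) / len(values),
        "mean": statistics.fmean(present),
        "std": statistics.pstdev(present),
        "min": min(present),
        "max": max(present),
    }

def compare_profiles(reference, current,
                     max_mean_shift_in_std=0.5,
                     max_null_rate_increase=0.05):
    """Return human-readable warnings; the thresholds are hypothetical defaults."""
    warnings = []
    ref_std = reference["std"] or 1.0  # avoid dividing by zero for constant features
    shift = abs(current["mean"] - reference["mean"]) / ref_std
    if shift > max_mean_shift_in_std:
        warnings.append(f"mean shifted by {shift:.2f} reference standard deviations")
    null_increase = current["null_rate"] - reference["null_rate"]
    if null_increase > max_null_rate_increase:
        warnings.append(f"null rate rose by {null_increase:.1%}")
    if current["min"] < reference["min"] or current["max"] > reference["max"]:
        warnings.append("values fall outside the reference range")
    return warnings

# Made-up monthly_charges values for a training sample and a production sample
reference = profile_feature([20, 40, 60, 80, 100])
current = profile_feature([90, 110, None, 130, 150])
for warning in compare_profiles(reference, current):
    print(warning)
```

Checks like these are cheap enough to run on every batch; the statistical tests in the next section are better at catching changes in shape that leave the mean untouched.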

Evidently: Drift Detection in Practice

Evidently is an open-source library for ML monitoring. It compares a reference dataset (typically your training data) against a current dataset (recent production traffic), runs a statistical test on each feature, and reports which features have drifted.

pip install "evidently<0.7"  # the examples below use the Report/ColumnMapping API, which later releases restructured
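
To make "statistical test for drift" concrete, here is a minimal sketch of the two-sample Kolmogorov-Smirnov statistic, one widely used test for numeric features. It illustrates the idea only and is not a description of Evidently's internals; which test the library applies depends on column type and sample size. The function name `ks_statistic` is hypothetical.

```python
import bisect

def ks_statistic(reference, current):
    """Two-sample KS statistic: the largest gap between the two empirical CDFs."""
    ref_sorted = sorted(reference)
    cur_sorted = sorted(current)
    max_gap = 0.0
    # The largest gap always occurs at one of the observed values
    for x in ref_sorted + cur_sorted:
        # Empirical CDF at x = fraction of the sample that is <= x
        ref_cdf = bisect.bisect_right(ref_sorted, x) / len(ref_sorted)
        cur_cdf = bisect.bisect_right(cur_sorted, x) / len(cur_sorted)
        max_gap = max(max_gap, abs(ref_cdf - cur_cdf))
    return max_gap

print(ks_statistic([1, 2, 3, 4], [1, 2, 3, 4]))  # identical samples
print(ks_statistic([1, 2, 3, 4], [3, 4, 5, 6]))  # partially overlapping samples
```

A statistic near 0 means the two samples have similar distributions; a value near 1 means they barely overlap. Libraries such as SciPy (`scipy.stats.ks_2samp`) also convert the statistic into a p-value.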

Baseline: Save Your Reference Dataset

When you deploy a model, save a representative sample of the training features it was fitted on. This sample is your reference: every later drift check compares production traffic against it.

# ci/save_reference_data.py — run this during CI when a new model is deployed
import pandas as pd

# Load the training features used for this model version
train_df = pd.read_csv("data/processed/train_features.csv")

# Save a representative sample as reference data
reference = train_df.sample(n=min(10000, len(train_df)), random_state=42)
reference.to_parquet("monitoring/reference_data.parquet", index=False)
print(f"Saved {len(reference)} reference rows")

Weekly Drift Check

# monitoring/drift_check.py
import json
from datetime import datetime, timedelta
from pathlib import Path

import pandas as pd
from evidently import ColumnMapping
from evidently.metrics import (
    DataDriftTable,
    DatasetDriftMetric,
)
from evidently.report import Report

def run_drift_check(
    reference_path: str,
    current_data_path: str,
    output_dir: str = "monitoring/reports",
) -> dict:
    """
    Compare current production data distribution against reference (training) data.
    Returns a summary dict with drift results.
    """
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    reference = pd.read_parquet(reference_path)
    current = pd.read_parquet(current_data_path)

    print(f"Reference: {len(reference)} rows")
    print(f"Current: {len(current)} rows")

    # Define column mapping
    # Production traffic has no labels yet, so the target stays unset here
    column_mapping = ColumnMapping(
        target=None,      # Set to "churn" only if both datasets contain the label
        prediction=None,  # Set to column name if you log predictions
        numerical_features=[
            "tenure_months",
            "monthly_charges",
            "total_charges",
            "num_products",
        ],
        categorical_features=["has_support_calls"],
    )

    # Build the drift report
    report = Report(metrics=[
        DatasetDriftMetric(),   # Overall drift: is this dataset drifted?
        DataDriftTable(),       # Per-feature drift statistics
    ])

    report.run(
        reference_data=reference,
        current_data=current,
        column_mapping=column_mapping,
    )

    # Save HTML report for human review
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    html_path = f"{output_dir}/drift_report_{timestamp}.html"
    report.save_html(html_path)
    print(f"HTML report saved: {html_path}")

    # Extract structured results
    results = report.as_dict()
    drift_summary = results["metrics"][0]["result"]

    summary = {
        "timestamp": timestamp,
        "dataset_drifted": drift_summary["dataset_drift"],
        # Fraction of features drifted; the result's own "drift_share" key holds the threshold
        "drift_share": drift_summary["share_of_drifted_columns"],
        "number_of_drifted_features": drift_summary["number_of_drifted_columns"],
        "total_features": drift_summary["number_of_columns"],
        "html_report": html_path,
    }

    print(f"\nDrift Summary:")
    print(f"  Dataset drifted: {summary['dataset_drifted']}")
    print(f"  Features drifted: {summary['number_of_drifted_features']}/{summary['total_features']}")
    print(f"  Drift share: {summary['drift_share']:.1%}")

    return summary


def fetch_recent_production_data(days: int = 7) -> pd.DataFrame:
    """
    Fetch features from the last N days of production requests.
    In a real system, this reads from your request log database.
    """
    # Placeholder — replace with your actual data warehouse query
    # For example, reading from a PostgreSQL table where you log requests:
    #
    # from sqlalchemy import create_engine
    # engine = create_engine(os.getenv("DATABASE_URL"))
    # since = datetime.now() - timedelta(days=days)
    # return pd.read_sql(
    #     "SELECT tenure_months, monthly_charges, total_charges, "
    #     "num_products, has_support_calls "
    #     "FROM prediction_logs WHERE created_at > %(since)s",
    #     engine, params={"since": since}
    # )

    # For this example, simulate with random data
    import numpy as np
    n = 2000
    return pd.DataFrame({
        "tenure_months": np.random.randint(1, 72, n),
        "monthly_charges": np.random.uniform(20, 120, n),  # Note: wider range than training
        "total_charges": np.random.uniform(100, 9000, n),
        "num_products": np.random.randint(1, 5, n),
        "has_support_calls": np.random.randint(0, 2, n),
    })


if __name__ == "__main__":
    # Fetch recent production traffic
    current_data = fetch_recent_production_data(days=7)
    current_data.to_parquet("/tmp/current_data.parquet", index=False)

    # Run the drift check
    summary = run_drift_check(
        reference_path="monitoring/reference_data.parquet",
        current_data_path="/tmp/current_data.parquet",
    )

    # Save summary as JSON for CI to read
    with open("monitoring/latest_drift_check.json", "w") as f:
        json.dump(summary, f, indent=2)

    # Exit with error code if significant drift detected
    # (useful when run in CI to trigger alerting)
    if summary["dataset_drifted"]:
        print("\nWARNING: Significant data drift detected. Consider retraining.")
        raise SystemExit(2)  # non-zero but distinct from error (1)
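
A scheduler or CI job can branch on that exit code. The sketch below is one possible wrapper, assuming the convention used in drift_check.py above (0 for no drift, 2 for drift); the names `OUTCOMES`, `classify_exit_code`, and `run_check` are hypothetical.

```python
import subprocess
import sys

# Exit-code convention from drift_check.py: 0 = no drift, 2 = drift detected
OUTCOMES = {0: "ok", 2: "drift"}

def classify_exit_code(code: int) -> str:
    """Map a drift-check exit code to a pipeline outcome; anything else is a failure."""
    return OUTCOMES.get(code, "error")

def run_check(command: list[str]) -> str:
    """Run the drift check as a subprocess and classify how it ended."""
    completed = subprocess.run(command)
    return classify_exit_code(completed.returncode)

# Stand-in commands that only set an exit code, so the wrapper can be tried in isolation
print(run_check([sys.executable, "-c", "raise SystemExit(0)"]))
print(run_check([sys.executable, "-c", "raise SystemExit(2)"]))
```

In a real pipeline the command would be something like `[sys.executable, "monitoring/drift_check.py"]`. One caveat about the convention: the Python interpreter itself exits with status 2 when it cannot open the script file, so a pipeline may want to confirm that the summary JSON was written before treating 2 as drift.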

Automated Alert on Drift

Add an alert step to your scheduled pipeline so that it runs after the drift check:

# monitoring/alert.py
import json
import os
import smtplib
from email.mime.text import MIMEText

def send_drift_alert(summary: dict):
    """Send an email alert when drift is detected."""
    if not summary["dataset_drifted"]:
        return

    msg = MIMEText(f"""
Drift Alert: Churn Prediction Model

Dataset drifted: {summary['dataset_drifted']}
Features drifted: {summary['number_of_drifted_features']}/{summary['total_features']}
Drift share: {summary['drift_share']:.1%}

Full report: {summary['html_report']}

Recommended action: Review the drift report and consider retraining with recent data.
    """)
    msg["Subject"] = "ALERT: Data drift detected in churn model"
    msg["From"] = "mlops@yourcompany.com"
    msg["To"] = "data-science-team@yourcompany.com"

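    with smtplib.SMTP(os.getenv("SMTP_HOST", "localhost")) as server:
        server.send_message(msg)
    print("Drift alert sent")


if __name__ == "__main__":
    # Read the summary written by monitoring/drift_check.py
    with open("monitoring/latest_drift_check.json") as f:
        send_drift_alert(json.load(f))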
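
A single drifted/not-drifted flag can be noisy, so some teams grade alerts by how many features moved. The sketch below is one way to do that using the keys of the summary dict built in drift_check.py; the function name `alert_level` and both thresholds are hypothetical and should be tuned to your own tolerance for false alarms.

```python
def alert_level(summary: dict, warn_share: float = 0.2, page_share: float = 0.5) -> str:
    """Classify a drift summary as 'none', 'warn', or 'page'.

    warn_share and page_share are illustrative defaults, not recommendations.
    """
    share = summary["number_of_drifted_features"] / summary["total_features"]
    if summary["dataset_drifted"] or share >= page_share:
        return "page"  # wake someone up
    if share >= warn_share:
        return "warn"  # post to a channel, review during working hours
    return "none"

example = {"dataset_drifted": False, "number_of_drifted_features": 2, "total_features": 5}
print(alert_level(example))
```

The email function above could then be called only for "page", with "warn" routed to a lower-urgency channel.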

Monitoring Prediction Distributions

Even without ground truth labels, you can monitor what your model is doing. If the model was predicting 20% churn probability on average in January and is now predicting 60% in March, something changed — either data drift caused it, or there’s a bug.

# monitoring/prediction_monitor.py
import numpy as np
import pandas as pd
from evidently.metrics import ColumnDistributionMetric, ColumnDriftMetric
from evidently.report import Report

def monitor_predictions(
    reference_predictions: np.ndarray,
    current_predictions: np.ndarray,
) -> dict:
    """Check if prediction score distribution has shifted."""
    ref_df = pd.DataFrame({"churn_probability": reference_predictions})
    cur_df = pd.DataFrame({"churn_probability": current_predictions})

    report = Report(metrics=[
        ColumnDriftMetric(column_name="churn_probability"),
        ColumnDistributionMetric(column_name="churn_probability"),
    ])
    report.run(reference_data=ref_df, current_data=cur_df)

    result = report.as_dict()["metrics"][0]["result"]
    return {
        "prediction_drifted": result["drift_detected"],
        "drift_score": result.get("drift_score"),      # p-value or distance, depending on the test
        "stattest_name": result.get("stattest_name"),  # which statistical test was applied
    }
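
For a dependency-free view of the same idea, the Population Stability Index (PSI) is a common way to score how far a prediction distribution has moved. This is a different technique from the Evidently check above, shown as a minimal sketch: the function name, the equal-width binning, and the `eps` smoothing are assumptions, and scores are assumed to lie in [0, 1].

```python
import math

def population_stability_index(reference, current, bins=10, eps=1e-6):
    """PSI between two samples of scores in [0, 1], using equal-width bins."""
    def bin_shares(scores):
        counts = [0] * bins
        for s in scores:
            idx = min(int(s * bins), bins - 1)  # a score of exactly 1.0 goes in the last bin
            counts[idx] += 1
        # eps keeps empty bins from causing log(0) or division by zero
        return [max(c / len(scores), eps) for c in counts]

    ref = bin_shares(reference)
    cur = bin_shares(current)
    # PSI = sum over bins of (current share - reference share) * ln(current / reference)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref, cur))

# Made-up scores: half of reference predictions were high, 90% of current ones are
reference_scores = [0.05] * 50 + [0.95] * 50
current_scores = [0.05] * 10 + [0.95] * 90
print(f"PSI: {population_stability_index(reference_scores, current_scores):.3f}")
```

A frequently quoted rule of thumb treats PSI below 0.1 as stable and above 0.25 as a significant shift, but those cutoffs are conventions rather than guarantees.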

Logging Predictions for Monitoring

For all of this to work, you must log your model’s inputs and outputs in production. Add a small prediction logger to the FastAPI app from the previous lesson and call it from the prediction endpoint:

# src/api/prediction_logger.py
import json
from datetime import datetime, timezone
from pathlib import Path

class PredictionLogger:
    def __init__(self, log_dir: str = "logs/predictions"):
        Path(log_dir).mkdir(parents=True, exist_ok=True)
        self.log_dir = log_dir

    def log(self, features: dict, prediction: dict):
        # Take one timezone-aware UTC timestamp so the record and the file name agree
        # (datetime.utcnow() is deprecated as of Python 3.12)
        now = datetime.now(timezone.utc)
        record = {
            "timestamp": now.isoformat(),
            "features": features,
            "prediction": prediction,
        }
        # In production: write to a database, Kafka, or S3
        # For development: write to a JSONL file
        log_path = f"{self.log_dir}/{now.date()}.jsonl"
        with open(log_path, "a") as f:
            f.write(json.dumps(record) + "\n")

In your FastAPI endpoint:

logger_instance = PredictionLogger()

@app.post("/predict")
async def predict(customer: CustomerFeatures):
    # ... run inference ...
    response = PredictionResponse.from_probability(prob, state.model_version)
    # Log for monitoring
    logger_instance.log(
        features=customer.model_dump(),
        prediction=response.model_dump(),
    )
    return response
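
To close the loop, the drift check needs those logs back as rows of features. The sketch below reads the daily JSONL files written by PredictionLogger; the function name `load_logged_features` and the `today` parameter (there to make the function easy to test) are hypothetical, and it assumes the one-file-per-UTC-date naming used above.

```python
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path

def load_logged_features(log_dir: str = "logs/predictions",
                         days: int = 7,
                         today=None) -> list[dict]:
    """Collect logged feature dicts from the last `days` daily JSONL files."""
    today = today or datetime.now(timezone.utc).date()
    rows = []
    for offset in range(days):
        # File names follow the logger's pattern: <log_dir>/<YYYY-MM-DD>.jsonl
        path = Path(log_dir) / f"{today - timedelta(days=offset)}.jsonl"
        if not path.exists():
            continue  # no traffic was logged that day
        with open(path) as f:
            for line in f:
                if line.strip():
                    rows.append(json.loads(line)["features"])
    return rows
```

Passing the result to `pd.DataFrame(rows)` gives a frame with one column per feature, which could stand in for the simulated data in `fetch_recent_production_data`.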

Summary

Production monitoring is the difference between a model that degrades silently and one you can defend. The key practices:

  • Save reference data at deployment time — you need a baseline to compare against
  • Run weekly drift checks comparing recent traffic against reference
  • Monitor both input distributions (data drift) and prediction distributions
  • Alert automatically when drift exceeds thresholds
  • Log every prediction with its input features — this is your monitoring data source

The next lesson covers feature stores: when your organization has multiple models, multiple teams, and shared features, how do you avoid computing the same feature five different ways?