4. Upstream Data Changes
5. Virtual Drift (False Alarm)
Drift patterns:
For numerical features:
1. Kolmogorov-Smirnov (KS) Test
scipy.stats.ks_2samp(reference, current)
2. Wasserstein Distance (Earth Mover's Distance)
scipy.stats.wasserstein_distance(reference, current)
3. KL Divergence (Kullback-Leibler)
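There is no one-line helper for KL divergence on raw samples; a minimal sketch using scipy.stats.entropy on shared histogram bins (the epsilon smoothing is an assumption to keep empty bins finite):
import numpy as np
from scipy.stats import entropy

def kl_divergence(reference, current, bins=10):
    """KL(current || reference) over a shared set of histogram bins."""
    ref_counts, bin_edges = np.histogram(reference, bins=bins)
    curr_counts, _ = np.histogram(current, bins=bin_edges)
    # Small epsilon so empty bins do not produce log(0) or division by zero
    p = curr_counts + 1e-6
    q = ref_counts + 1e-6
    return entropy(p, q)  # scipy normalizes the inputs to probabilities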
For categorical features:
1. Chi-Square Test
scipy.stats.chisquare(observed, expected)
2. Population Stability Index (PSI)
import numpy as np

def calculate_psi(reference, current, bins=10):
    """Population Stability Index of one numerical feature."""
    # Bin both samples on the same edges, derived from the reference data
    ref_counts, bin_edges = np.histogram(reference, bins=bins)
    curr_counts, _ = np.histogram(current, bins=bin_edges)
    ref_freq = ref_counts / len(reference)
    curr_freq = curr_counts / len(current)
    # Small epsilon avoids log(0) and division by zero for empty bins
    eps = 1e-6
    psi = np.sum((curr_freq - ref_freq) * np.log((curr_freq + eps) / (ref_freq + eps)))
    return psi
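A common rule of thumb (and the threshold used later in the retraining triggers): PSI above 0.2 indicates significant drift. A minimal usage sketch, assuming reference and current are arrays of a single numerical feature:
psi = calculate_psi(reference, current)
if psi > 0.2:  # rule-of-thumb threshold, tune per feature
    print(f"PSI = {psi:.3f}: significant drift in this feature")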
import numpy as np
from scipy.stats import ks_2samp
# Reference distribution (training data)
reference = np.random.normal(loc=0, scale=1, size=10000)
# Current distribution (production, shifted)
current = np.random.normal(loc=0.5, scale=1.2, size=1000)
# Perform KS test
statistic, p_value = ks_2samp(reference, current)
print(f"KS Statistic: {statistic:.4f}")
print(f"P-value: {p_value:.4f}")
if p_value < 0.05:
    print("Drift detected!")
else:
    print("✓ No significant drift")
Output:
KS Statistic: 0.1234
P-value: 0.0001
Drift detected!
Open-Source:
Commercial:
DIY Stack:
Installation:
pip install evidently
Generate drift report:
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
import pandas as pd
# Load data
reference_data = pd.read_csv("train.csv")
current_data = pd.read_csv("production_last_week.csv")
# Create report
report = Report(metrics=[
    DataDriftPreset(),
    TargetDriftPreset(),
])
report.run(
    reference_data=reference_data,
    current_data=current_data,
    column_mapping=None,  # Auto-detect or specify target
)
# Save as HTML
report.save_html("drift_report.html")
Output: Interactive HTML dashboard with drift analysis per feature.
Automated testing for data/model:
from evidently.test_suite import TestSuite
from evidently.test_preset import DataDriftTestPreset, DataQualityTestPreset
# Define tests
tests = TestSuite(tests=[
    DataDriftTestPreset(),
    DataQualityTestPreset(),
])
tests.run(reference_data=ref_df, current_data=curr_df)
# Check if tests passed
result = tests.as_dict()
if result['summary']['failed_tests'] > 0:
    print("Tests failed!")
    # Send alert
else:
    print("✓ All tests passed")
Use case: Run in CI/CD pipeline or scheduled job.
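A minimal sketch of a CI gate around the test suite above; failing the job on any failed test is an assumption about your pipeline policy, not a requirement of Evidently:
import sys

result = tests.as_dict()
failed = result['summary']['failed_tests']
if failed > 0:
    print(f"{failed} monitoring tests failed - blocking the pipeline")
    sys.exit(1)  # non-zero exit fails the CI job or scheduled run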
Multivariate drift detection:
from alibi_detect.cd import MMDDrift
import numpy as np
# Reference data (training set)
X_ref = np.random.randn(1000, 10) # 1000 samples, 10 features
# Initialize detector
drift_detector = MMDDrift(
    X_ref,
    p_val=0.05,
    n_permutations=100,
)
# Production data
X_prod = np.random.randn(200, 10) + 0.5 # Shifted
# Detect drift
prediction = drift_detector.predict(X_prod)
print(f"Drift detected: {prediction['data']['is_drift']}")
print(f"P-value: {prediction['data']['p_val']:.4f}")
print(f"Distance: {prediction['data']['distance']:.4f}")
Advantages:
Metrics to track:
Classification:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
# Per-day metrics
daily_metrics = {
    'accuracy': [],
    'precision': [],
    'recall': [],
    'f1': [],
}
for day in range(30):
    y_true = get_labels_for_day(day)  # Ground truth (delayed)
    y_pred = get_predictions_for_day(day)
    daily_metrics['accuracy'].append(accuracy_score(y_true, y_pred))
    daily_metrics['precision'].append(precision_score(y_true, y_pred))
    daily_metrics['recall'].append(recall_score(y_true, y_pred))
    daily_metrics['f1'].append(f1_score(y_true, y_pred))
# Alert if degradation
if daily_metrics['f1'][-1] < 0.75:  # Below threshold
    send_alert("Model F1 dropped below 0.75!")
Regression:
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import numpy as np
# Monitor residuals
residuals = y_true - y_pred
# Track over time
metrics = {
    'mae': mean_absolute_error(y_true, y_pred),
    'rmse': np.sqrt(mean_squared_error(y_true, y_pred)),
    'r2': r2_score(y_true, y_pred),
    'mean_residual': np.mean(residuals),
    'std_residual': np.std(residuals),
}
# Detect drift in residuals
if abs(metrics['mean_residual']) > threshold:
    print("Systematic bias detected!")
Key insight: Monitor residual distribution, not just aggregates.
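One way to act on that insight, as a sketch: compare the full residual distribution against the residuals recorded on held-out data at training time (reference_residuals is an assumed, pre-saved array).
from scipy.stats import ks_2samp

# reference_residuals: residuals on validation data at training time (assumption)
# residuals: residuals on recent production data with delayed labels
stat, p_value = ks_2samp(reference_residuals, residuals)
if p_value < 0.05:
    print(f"Residual distribution shifted (KS={stat:.3f}, p={p_value:.4f})")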
Monitor prediction distribution (even without labels!):
import numpy as np
import matplotlib.pyplot as plt
# Collect predictions over time
predictions_week1 = model.predict(X_week1)
predictions_week2 = model.predict(X_week2)
# Compare distributions
from scipy.stats import ks_2samp
stat, p_val = ks_2samp(predictions_week1, predictions_week2)
if p_val < 0.05:
    print("Prediction distribution changed!")
# Visualize
plt.figure(figsize=(10, 4))
plt.hist(predictions_week1, bins=30, alpha=0.5, label='Week 1')
plt.hist(predictions_week2, bins=30, alpha=0.5, label='Week 2')
plt.xlabel('Prediction Score')
plt.ylabel('Frequency')
plt.legend()
plt.title('Prediction Distribution Drift')
plt.show()
Use case: Detect drift before labels arrive.
Challenge: Ground truth arrives late.
Examples:
Strategies:
1. Use proxy metrics for fast feedback (see the sketch at the end of this subsection):
2. Sample labeling:
3. Model-based imputation:
Labels arrive too late. Loan defaults? You wait 6 months to know if you were right!
By then, thousands of bad decisions. Solution: monitor what you CAN see NOW.
Day 1:   Model predicts "Low risk"
Day 90:  (still waiting for the outcome...)
Day 180: "Default!" (too late - the model has already made thousands more predictions)
Monitor input drift and prediction distributions - signals you can see NOW.
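A minimal sketch of strategy 1 (proxy metrics), assuming a loan-risk model where the approval rate and mean score are visible immediately; the reference values, decision threshold, and alert margins are illustrative assumptions:
import numpy as np

# Signals available immediately, long before defaults are observed
scores = model.predict_proba(X_today)[:, 1]
approval_rate = float(np.mean(scores < 0.5))  # assumed decision rule: approve if risk score < 0.5

# Compare against values recorded at deployment time (assumed reference numbers)
REFERENCE = {'approval_rate': 0.70, 'mean_score': 0.32}
if abs(approval_rate - REFERENCE['approval_rate']) > 0.10:
    send_alert(f"Approval rate moved to {approval_rate:.2f} - investigate before labels arrive")
if abs(scores.mean() - REFERENCE['mean_score']) > 0.05:
    send_alert(f"Mean risk score moved to {scores.mean():.2f}")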
What to log:
import uuid
from datetime import datetime
import json
def log_prediction(model_id, features, prediction, metadata=None):
    """Log prediction for monitoring."""
    prediction_id = str(uuid.uuid4())
    log_entry = {
        'prediction_id': prediction_id,
        'timestamp': datetime.utcnow().isoformat(),
        'model_id': model_id,
        'model_version': '1.2.3',
        'features': features.tolist(),  # Input features
        'prediction': prediction,
        'prediction_probability': model.predict_proba(features)[0].tolist(),
        'metadata': metadata or {},
    }
    # Write to storage (S3, BigQuery, Postgres)
    write_to_storage(log_entry)
    return prediction_id

def log_feedback(prediction_id, actual_label):
    """Log ground truth when available."""
    feedback_entry = {
        'prediction_id': prediction_id,
        'timestamp': datetime.utcnow().isoformat(),
        'actual_label': actual_label,
    }
    write_to_storage(feedback_entry)
1. Log asynchronously (don't block predictions):
import threading
def async_log(log_entry):
    thread = threading.Thread(target=write_to_storage, args=(log_entry,))
    thread.start()

# Or use a queue with a background worker
from queue import Queue
log_queue = Queue()

def log_worker():
    while True:
        entry = log_queue.get()
        write_to_storage(entry)
        log_queue.task_done()

# Start worker
threading.Thread(target=log_worker, daemon=True).start()
# Log
log_queue.put(log_entry)
2. Sample for high-volume systems:
import random
SAMPLE_RATE = 0.1 # Log 10% of predictions
if random.random() < SAMPLE_RATE:
    log_prediction(model_id, features, prediction)

Architecture components:
Expose ML metrics:
from prometheus_client import Counter, Histogram, Gauge, start_http_server
# Define metrics
PREDICTIONS_TOTAL = Counter(
    'ml_predictions_total',
    'Total number of predictions',
    ['model_version', 'endpoint'],
)
PREDICTION_LATENCY = Histogram(
    'ml_prediction_latency_seconds',
    'Prediction latency in seconds',
)
PREDICTION_SCORE = Histogram(
    'ml_prediction_score',
    'Distribution of prediction scores',
    buckets=[0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
)
MODEL_ACCURACY = Gauge(
    'ml_model_accuracy',
    'Current model accuracy',
)
# Start metrics server
start_http_server(8000)  # Metrics at http://localhost:8000/metrics
Instrument prediction function:
import time
@PREDICTION_LATENCY.time()
def predict(features):
    # Make prediction
    prediction = model.predict(features)
    score = model.predict_proba(features)[0, 1]
    # Record metrics
    PREDICTIONS_TOTAL.labels(
        model_version='v1.2.3',
        endpoint='/predict',
    ).inc()
    PREDICTION_SCORE.observe(score)
    return prediction, score

# Update accuracy periodically (in a background job)
def update_accuracy():
    y_true, y_pred = get_recent_predictions_with_labels()
    accuracy = accuracy_score(y_true, y_pred)
    MODEL_ACCURACY.set(accuracy)
Prometheus queries (PromQL):
1. Prediction rate:
rate(ml_predictions_total[5m])
2. P95 latency:
histogram_quantile(0.95,
rate(ml_prediction_latency_seconds_bucket[5m])
)
3. Accuracy over time:
ml_model_accuracy
4. Prediction score distribution:
rate(ml_prediction_score_bucket[5m])
Grafana: Connect to Prometheus, create dashboards with these queries.
Prometheus alerting rules (alerts.yml):
groups:
  - name: ml_model_alerts
    interval: 30s
    rules:
      - alert: ModelAccuracyLow
        expr: ml_model_accuracy < 0.85
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Model accuracy below threshold"
          description: "Accuracy is {{ $value }}, threshold is 0.85"
      - alert: HighPredictionLatency
        expr: histogram_quantile(0.95, rate(ml_prediction_latency_seconds_bucket[5m])) > 0.5
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "P95 latency above 500ms"
Send alerts to Slack:
import os
import requests
import json

def send_slack_alert(message):
    webhook_url = os.getenv("SLACK_WEBHOOK_URL")
    payload = {
        "text": f"ML Model Alert: {message}",
        "channel": "#ml-monitoring",
        "username": "ML Monitor Bot",
    }
    response = requests.post(
        webhook_url,
        data=json.dumps(payload),
        headers={'Content-Type': 'application/json'},
    )
    return response.status_code == 200

# Usage
if drift_detected:
    send_slack_alert(
        f"Data drift detected in feature '{feature_name}' "
        f"(p-value: {p_value:.4f})"
    )
Shadow deployment (run new model alongside old):
def predict_with_shadow(features):
    # Production model (served to user)
    prediction_v1 = model_v1.predict(features)
    # Shadow model (logged but not served)
    prediction_v2 = model_v2.predict(features)
    # Log both for comparison
    log_predictions(
        features=features,
        v1_prediction=prediction_v1,
        v2_prediction=prediction_v2,
        served='v1',
    )
    # Return production prediction
    return prediction_v1
A/B test (split traffic):
import hashlib

def predict_with_ab_test(features, user_id):
    # Deterministic assignment, consistent per user across processes
    # (built-in hash() is randomized per process, so use a stable hash)
    bucket = int(hashlib.md5(str(user_id).encode()).hexdigest(), 16) % 2
    variant = 'A' if bucket == 0 else 'B'
    if variant == 'A':
        prediction = model_a.predict(features)
    else:
        prediction = model_b.predict(features)
    log_prediction(features, prediction, variant=variant)
    return prediction
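To decide whether B actually outperforms A, the logged predictions are eventually joined with outcomes and compared between the two groups; a minimal sketch using a chi-square test on conversion counts (the counts here are illustrative assumptions):
from scipy.stats import chi2_contingency

# successes / failures per variant, from logged predictions joined with outcomes (assumed numbers)
table = [[480, 520],   # variant A: 480 conversions, 520 non-conversions
         [530, 470]]   # variant B
chi2, p_value, dof, expected = chi2_contingency(table)
if p_value < 0.05:
    print("Variants differ significantly - consider promoting the winner")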
When to retrain:
1. Performance-based:
if current_f1 < baseline_f1 * 0.95:  # 5% degradation
    trigger_retraining()
2. Drift-based:
if psi_score > 0.2:  # Significant drift
    trigger_retraining()
3. Time-based:
if days_since_last_training > 30:
    trigger_retraining()
4. Data-based:
if num_new_labeled_samples > 10000:
    trigger_retraining()
Best practice: Combine multiple triggers with OR logic.
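A minimal sketch of combining the triggers above with OR logic (the variable names mirror the snippets and are assumptions about what your monitoring job computes):
def should_retrain():
    return (
        current_f1 < baseline_f1 * 0.95        # performance degraded
        or psi_score > 0.2                     # input drift
        or days_since_last_training > 30       # model is stale
        or num_new_labeled_samples > 10000     # enough new data collected
    )

if should_retrain():
    trigger_retraining()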
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
}

dag = DAG(
    'model_monitoring_and_retraining',
    default_args=default_args,
    schedule_interval='@daily',
)

def check_drift():
    # Run drift analysis and return the id of the task to execute next
    drift_detected = analyze_drift()
    if drift_detected:
        return 'retrain_model'
    else:
        return 'skip_retraining'

def retrain_model():
    # Retrain model with new data
    new_model = train(get_recent_data())
    # Evaluate; deploy only if it beats the current model
    if evaluate(new_model) > current_model_score:
        deploy(new_model)

# Branching operator so that only one downstream task runs per DAG run
check_drift_task = BranchPythonOperator(
    task_id='check_drift',
    python_callable=check_drift,
    dag=dag,
)
retrain_task = PythonOperator(
    task_id='retrain_model',
    python_callable=retrain_model,
    dag=dag,
)
skip_task = EmptyOperator(task_id='skip_retraining', dag=dag)

check_drift_task >> [retrain_task, skip_task]
Track model versions:
import mlflow
# Log model
with mlflow.start_run():
    mlflow.log_params(hyperparameters)
    mlflow.log_metrics({"accuracy": accuracy, "f1": f1})
    mlflow.sklearn.log_model(model, "model")
    # Tag
    mlflow.set_tag("stage", "production")
    mlflow.set_tag("deployed_at", datetime.now().isoformat())

# Load specific version
model_uri = "models:/my-model/production"
model = mlflow.sklearn.load_model(model_uri)
Rollback strategy:
def rollback_to_previous_version():
    # Load previous model
    previous_model = load_model_version('v1.2.2')
    # Deploy
    deploy_model(previous_model)
    # Alert
    send_alert("Rolled back to v1.2.2 due to performance issues")
Monitor feature statistics:
from feast import FeatureStore
store = FeatureStore(repo_path=".")
# Monitor feature freshness
def check_feature_freshness():
    features = store.get_online_features(
        features=['driver:avg_rating', 'driver:completed_trips'],
        entity_rows=[{'driver_id': 123}],
    )
    last_updated = features.metadata.feature_timestamps['driver:avg_rating']
    age_hours = (datetime.now() - last_updated).total_seconds() / 3600
    if age_hours > 24:
        send_alert(f"Feature driver:avg_rating is {age_hours:.1f} hours old!")

# Monitor feature distributions
def monitor_feature_distribution(feature_name):
    values = get_feature_values_last_24h(feature_name)
    ref_mean = REFERENCE_STATS[feature_name]['mean']
    ref_std = REFERENCE_STATS[feature_name]['std']
    current_mean = np.mean(values)
    # Z-score
    z_score = abs(current_mean - ref_mean) / ref_std
    if z_score > 3:  # 3 standard deviations
        send_alert(f"Feature {feature_name} mean shifted significantly!")
Monitor predictions across demographic groups:
from fairlearn.metrics import MetricFrame, selection_rate, false_positive_rate
# Compute metrics per group
metric_frame = MetricFrame(
    metrics={
        'accuracy': accuracy_score,
        'fpr': false_positive_rate,
        'selection_rate': selection_rate,
    },
    y_true=y_true,
    y_pred=y_pred,
    sensitive_features=df['gender'],  # Protected attribute
)
print(metric_frame.by_group)
# Alert if disparity
accuracy_ratio = (
    metric_frame.by_group['accuracy'].min() /
    metric_frame.by_group['accuracy'].max()
)
if accuracy_ratio < 0.8:  # 80% rule
    send_alert(f"Fairness violation: accuracy ratio = {accuracy_ratio:.2f}")
Track over time: Log disparities daily, visualize trends.
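A minimal sketch of that daily logging, assuming accuracy_ratio from the snippet above; appending to a CSV stands in for whatever metrics store you actually use:
import csv
from datetime import date

# Append today's disparity value; any store (Prometheus gauge, warehouse table) works
with open('fairness_metrics.csv', 'a', newline='') as f:
    csv.writer(f).writerow([date.today().isoformat(), f"{accuracy_ratio:.4f}"])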
Monitor feature importance drift:
from sklearn.inspection import permutation_importance
import numpy as np
# Compute importance on reference data
ref_importance = permutation_importance(
    model, X_ref, y_ref, n_repeats=10
).importances_mean
# Compute importance on current data
curr_importance = permutation_importance(
    model, X_curr, y_curr, n_repeats=10
).importances_mean
# Compare
importance_diff = np.abs(ref_importance - curr_importance)
# Alert if top features changed significantly
if np.max(importance_diff) > 0.1:
    send_alert("Feature importance shifted!")
SHAP values monitoring:
import shap
# Track mean SHAP values over time
explainer = shap.Explainer(model)
shap_values = explainer(X_prod)
mean_shap = shap_values.values.mean(axis=0)
# Compare to baseline
if np.linalg.norm(mean_shap - baseline_mean_shap) > threshold:
    send_alert("Model behavior changed (SHAP drift)")
Key dashboard sections:
1. Executive Summary
2. Model Performance
3. Data Quality
4. Drift Analysis
5. System Health
6. Prediction Analysis
7. Business Metrics
Tools: Grafana, Tableau, Streamlit, custom React dashboard
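As one concrete option, a minimal Streamlit sketch for the top of such a dashboard; the daily_metrics.csv file and its columns are assumptions about what your monitoring job already writes:
import streamlit as st
import pandas as pd

st.title("Model Monitoring Dashboard")

# Executive summary: latest values of the key metrics (assumed columns: date, accuracy, f1, psi)
metrics = pd.read_csv("daily_metrics.csv")
latest = metrics.iloc[-1]

col1, col2, col3 = st.columns(3)
col1.metric("Accuracy", f"{latest['accuracy']:.3f}")
col2.metric("F1", f"{latest['f1']:.3f}")
col3.metric("Max feature PSI", f"{latest['psi']:.2f}")

# Model performance over time
st.line_chart(metrics.set_index('date')[['accuracy', 'f1']])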
When alert fires:
1. Assess severity (5 min)
2. Initial investigation (15 min)
3. Mitigation (30 min)
4. Root cause analysis (1-2 hours)
5. Permanent fix (varies)
1. Start simple
2. Monitor early and often
3. Automate everything
4. Combine multiple signals
5. Set realistic thresholds
6. Document and iterate
1. Alert fatigue
2. Over-monitoring
3. Ignoring data quality
4. No baseline
5. Delayed action
6. Production-training skew
Today's lab:
Dataset: Bike sharing data (weather features → demand)
Deliverable:
Remember: "You can't improve what you don't measure" - Peter Drucker
Common interview questions on model monitoring:
"How do you detect when a deployed model needs retraining?"
"What is data drift and how would you detect it?"
Libraries and Tools:
Reading:
Courses: