In this tutorial, we build a complete, production-grade ML experimentation and deployment workflow using MLflow. We start by launching a dedicated MLflow Tracking Server with a structured backend and artifact store, enabling us to track experiments in a scalable, reproducible manner. We then train multiple machine learning models using a nested hyperparameter sweep while automatically logging parameters, metrics, and model artifacts. We enhance the experiment by logging diagnostic visualizations, evaluating the best model using MLflow’s built-in evaluation framework, and storing detailed evaluation results for future analysis. We also deploy the trained model using MLflow’s native serving capabilities and interact with it via a REST API, demonstrating how MLflow bridges the gap between experimentation and real-world model deployment.
!pip -q install "mlflow>=3.0.0" scikit-learn pandas numpy matplotlib requests
import os
import time
import json
import shutil
import socket
import signal
import subprocess
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import requests
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
roc_auc_score,
accuracy_score,
precision_score,
recall_score,
f1_score,
confusion_matrix,
ConfusionMatrixDisplay,
)
import mlflow
import mlflow.sklearn
from mlflow.models.signature import infer_signature
def _is_port_open(host: str, port: int, timeout_s: float = 0.2) -> bool:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(timeout_s)
return s.connect_ex((host, port)) == 0
def _wait_for_http(url: str, timeout_s: int = 30) -> None:
t0 = time.time()
last_err = None
while time.time() - t0 < timeout_s:
try:
r = requests.get(url, timeout=1)
if r.status_code < 500:
return
except Exception as e:
last_err = e
time.sleep(0.5)
raise RuntimeError(f"Server not ready at {url}. Last error: {last_err}")
def _safe_kill(proc: subprocess.Popen):
if proc is None:
return
try:
proc.terminate()
try:
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
except Exception:
pass
We install all required dependencies and import the MLflow, scikit-learn, and standard-library modules needed for experiment tracking and deployment. We define utility functions that let us check port availability, wait for server readiness, and safely terminate background processes. This foundational infrastructure keeps the MLflow tracking server and model-serving components running reliably in the Colab environment.
BASE_DIR = Path("/content/mlflow_colab_demo").resolve()
BACKEND_DB = BASE_DIR / "mlflow.db"
ARTIFACT_ROOT = BASE_DIR / "mlartifacts"
os.makedirs(BASE_DIR, exist_ok=True)
os.makedirs(ARTIFACT_ROOT, exist_ok=True)
HOST = "127.0.0.1"
PORT = 5000
TRACKING_URI = f"http://{HOST}:{PORT}"
if _is_port_open(HOST, PORT):
for p in range(5001, 5015):
if not _is_port_open(HOST, p):
PORT = p
TRACKING_URI = f"http://{HOST}:{PORT}"
break
print("Using TRACKING_URI:", TRACKING_URI)
print("Backend DB:", BACKEND_DB)
print("Artifact root:", ARTIFACT_ROOT)
server_cmd = [
"mlflow",
"server",
"--host", HOST,
"--port", str(PORT),
"--backend-store-uri", f"sqlite:///{BACKEND_DB}",
"--default-artifact-root", str(ARTIFACT_ROOT),
]
mlflow_server = subprocess.Popen(
server_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)
_wait_for_http(TRACKING_URI, timeout_s=45)
mlflow.set_tracking_uri(TRACKING_URI)
print("MLflow server is up.")
EXPERIMENT_NAME = "colab-advanced-mlflow-sklearn"
mlflow.set_experiment(EXPERIMENT_NAME)
We configure the MLflow backend storage and artifact directories to create a structured, persistent experiment-tracking environment. We launch the MLflow Tracking Server with a SQLite database and a local artifact store, enabling full experiment logging and management. We connect our notebook to the running MLflow server and initialize a dedicated experiment that will organize all training runs and associated metadata.
data = load_breast_cancer(as_frame=True)
df = data.frame.copy()
target_col = "target"
X = df.drop(columns=[target_col])
y = df[target_col].astype(int)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)
mlflow.sklearn.autolog(
log_input_examples=False,
log_model_signatures=False,
silent=True
)
C_VALUES = [0.01, 0.1, 1.0, 3.0]
SOLVERS = ["liblinear", "lbfgs"]
best = {"auc": -1.0, "run_id": None, "params": None}
We load the dataset and prepare the training and testing splits required for machine learning experimentation. We enable MLflow autologging, allowing automatic tracking of parameters, metrics, and model artifacts without manual intervention. We define the hyperparameter search space and initialize the structure to identify and store the best-performing model configuration.
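Before running the real sweep, the bookkeeping pattern is easy to see in isolation: enumerate the grid with itertools.product and keep the best entry seen so far. A minimal sketch with a made-up scoring function standing in for the actual fit/evaluate cycle:

```python
from itertools import product

C_VALUES = [0.01, 0.1, 1.0, 3.0]
SOLVERS = ["liblinear", "lbfgs"]

def fake_auc(C, solver):
    # Placeholder for training a pipeline and computing roc_auc_score;
    # the numbers here are arbitrary, purely for illustration.
    return 0.90 + 0.01 * C_VALUES.index(C) - (0.001 if solver == "lbfgs" else 0.0)

best = {"auc": -1.0, "params": None}
for C, solver in product(C_VALUES, SOLVERS):
    auc = fake_auc(C, solver)
    if auc > best["auc"]:
        best = {"auc": auc, "params": {"C": C, "solver": solver}}

print(best["params"])  # best config under the fake scores
```

The real sweep below does exactly this, except each grid point becomes a nested MLflow run with its own logged metrics and artifacts.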
with mlflow.start_run(run_name="parent_sweep_run") as parent_run:
mlflow.log_param("dataset", "sklearn_breast_cancer")
mlflow.log_param("n_features", X_train.shape[1])
mlflow.log_param("n_train", X_train.shape[0])
mlflow.log_param("n_test", X_test.shape[0])
for C in C_VALUES:
for solver in SOLVERS:
with mlflow.start_run(run_name=f"child_C={C}_solver={solver}", nested=True) as child_run:
pipe = Pipeline([
("scaler", StandardScaler()),
("clf", LogisticRegression(
C=C,
solver=solver,
penalty="l2",
max_iter=2000,
random_state=42
))
])
pipe.fit(X_train, y_train)
proba = pipe.predict_proba(X_test)[:, 1]
pred = (proba >= 0.5).astype(int)
auc = roc_auc_score(y_test, proba)
acc = accuracy_score(y_test, pred)
prec = precision_score(y_test, pred, zero_division=0)
rec = recall_score(y_test, pred, zero_division=0)
f1 = f1_score(y_test, pred, zero_division=0)
mlflow.log_metrics({
"test_auc": float(auc),
"test_accuracy": float(acc),
"test_precision": float(prec),
"test_recall": float(rec),
"test_f1": float(f1),
})
cm = confusion_matrix(y_test, pred)
disp = ConfusionMatrixDisplay(cm, display_labels=data.target_names)
fig, ax = plt.subplots(figsize=(5, 4))
disp.plot(ax=ax, values_format="d")
ax.set_title(f"Confusion Matrix (C={C}, solver={solver})")
cm_path = BASE_DIR / "confusion_matrix.png"
fig.tight_layout()
fig.savefig(cm_path, dpi=140)
plt.close(fig)
mlflow.log_artifact(str(cm_path), artifact_path="diagnostics")
if auc > best["auc"]:
best.update({
"auc": float(auc),
"run_id": child_run.info.run_id,
"params": {"C": C, "solver": solver}
})
mlflow.log_dict(best, "best_run_summary.json")
print("Best config:", best)
We perform a nested hyperparameter sweep, training multiple models within a structured parent-child run hierarchy. We compute performance metrics and log them alongside diagnostic artifacts, such as confusion matrices, to enable detailed analysis of experiments. We continuously monitor model performance and update our tracking structure to identify the best configuration across all training runs.
best_C = best["params"]["C"]
best_solver = best["params"]["solver"]
final_pipe = Pipeline([
("scaler", StandardScaler()),
("clf", LogisticRegression(
C=best_C,
solver=best_solver,
penalty="l2",
max_iter=2000,
random_state=42
))
])
with mlflow.start_run(run_name="final_model_run") as final_run:
final_pipe.fit(X_train, y_train)
proba = final_pipe.predict_proba(X_test)[:, 1]
pred = (proba >= 0.5).astype(int)
metrics = {
"test_auc": float(roc_auc_score(y_test, proba)),
"test_accuracy": float(accuracy_score(y_test, pred)),
"test_precision": float(precision_score(y_test, pred, zero_division=0)),
"test_recall": float(recall_score(y_test, pred, zero_division=0)),
"test_f1": float(f1_score(y_test, pred, zero_division=0)),
}
mlflow.log_metrics(metrics)
mlflow.log_params({"C": best_C, "solver": best_solver, "model": "LogisticRegression+StandardScaler"})
input_example = X_test.iloc[:5].copy()
signature = infer_signature(input_example, final_pipe.predict_proba(input_example)[:, 1])
model_info = mlflow.sklearn.log_model(
sk_model=final_pipe,
artifact_path="model",
signature=signature,
input_example=input_example,
registered_model_name=None,
)
print("Final run_id:", final_run.info.run_id)
print("Logged model URI:", model_info.model_uri)
eval_df = X_test.copy()
eval_df["label"] = y_test.values
eval_result = mlflow.models.evaluate(
model=model_info.model_uri,
data=eval_df,
targets="label",
model_type="classifier",
evaluators="default",
)
eval_summary = {
"metrics": {k: float(v) if isinstance(v, (int, float, np.floating)) else str(v)
for k, v in eval_result.metrics.items()},
"artifacts": {k: str(v) for k, v in eval_result.artifacts.items()},
}
mlflow.log_dict(eval_summary, "evaluation/eval_summary.json")
We train the final model using the best hyperparameters identified during the experiment sweep and log it with a proper signature and input example. We evaluate the model using MLflow’s built-in evaluation framework, which generates detailed metrics and evaluation artifacts. We store the evaluation summary within MLflow, ensuring the final model is fully documented, reproducible, and ready for deployment.
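The coercion step in the evaluation summary matters because evaluator metrics often come back as numpy scalars, which json cannot always serialize directly. A self-contained sketch of the same pattern (the metric names and values here are made up for illustration):

```python
import json
import numpy as np

# Hypothetical metrics dict mimicking the mix of numpy scalars and other
# values an evaluator can return.
metrics = {"roc_auc": np.float64(0.994), "accuracy": 0.972, "notes": "default evaluator"}

# Coerce numerics to plain floats and everything else to strings,
# mirroring the eval_summary construction above.
serializable = {
    k: float(v) if isinstance(v, (int, float, np.floating)) else str(v)
    for k, v in metrics.items()
}
print(json.dumps(serializable))
```

After this coercion the dictionary round-trips cleanly through json, which is what `mlflow.log_dict` needs.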
SERVE_PORT = 6000
if _is_port_open(HOST, SERVE_PORT):
for p in range(6001, 6020):
if not _is_port_open(HOST, p):
SERVE_PORT = p
break
MODEL_URI = model_info.model_uri
serve_cmd = [
"mlflow", "models", "serve",
"-m", MODEL_URI,
"-p", str(SERVE_PORT),
"--host", HOST,
"--env-manager", "local"
]
mlflow_serve = subprocess.Popen(
serve_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)
serve_url = f"http://{HOST}:{SERVE_PORT}/invocations"
_wait_for_http(f"http://{HOST}:{SERVE_PORT}", timeout_s=60)
print("Model server is up at:", serve_url)
payload = {
"dataframe_split": {
"columns": list(X_test.columns),
"data": X_test.iloc[:3].values.tolist()
}
}
r = requests.post(
serve_url,
headers={"Content-Type": "application/json"},
data=json.dumps(payload),
timeout=10
)
print("Serve status:", r.status_code)
print("Predictions (probabilities or outputs):", r.text)
print("\nOpen the MLflow UI by visiting:", TRACKING_URI)
print("Artifacts are stored under:", ARTIFACT_ROOT)
We deploy the trained MLflow model as a live REST API service using MLflow’s native serving infrastructure. We send a test request to the deployed model endpoint to verify that the model responds correctly and produces predictions in real time. We complete the full machine learning lifecycle by transitioning from experiment tracking to live model deployment within a unified MLflow workflow.
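When we finish, we can shut the background servers down with the `_safe_kill` helper defined earlier, e.g. `_safe_kill(mlflow_serve)` followed by `_safe_kill(mlflow_server)`. A self-contained sketch of that shutdown pattern, using a throwaway sleep process in place of the real server handles:

```python
import subprocess
import sys

def _safe_kill(proc):
    # Terminate gracefully; escalate to kill if the process ignores SIGTERM.
    if proc is None:
        return
    try:
        proc.terminate()
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
    except Exception:
        pass

# A stand-in for mlflow_server / mlflow_serve: a process that would
# otherwise run for a minute.
proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
_safe_kill(proc)
print(proc.poll() is not None)  # True: the process has exited
```

Terminating the serving process before the tracking server avoids the model server logging connection errors during teardown.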
In conclusion, we established a fully integrated ML lifecycle pipeline using MLflow, covering experiment tracking, hyperparameter optimization, artifact logging, model evaluation, and live model serving. We created a structured environment in which every training run is tracked, reproducible, and auditable, enabling efficient experimentation and model comparison. We leveraged MLflow’s model packaging and serving infrastructure to transform trained models into deployable services with minimal effort. By completing this workflow, we demonstrated how MLflow functions as a central orchestration layer for managing machine learning systems, enabling scalable, reproducible, and production-ready ML pipelines entirely within a cloud-based notebook environment.
The post A Complete End-to-End Coding Guide to MLflow Experiment Tracking, Hyperparameter Optimization, Model Evaluation, and Live Model Deployment appeared first on MarkTechPost.
