This hands-on tutorial works through Google’s newly released colab-mcp, an open-source MCP (Model Context Protocol) server that lets any AI agent programmatically control Google Colab notebooks and runtimes. Across five self-contained snippets, we go from first principles to production-oriented patterns. We start by constructing a minimal MCP tool registry from scratch so that we understand the protocol’s core mechanics (tool registration, schema generation, and async dispatch) before graduating to the real FastMCP framework that colab-mcp is built on. We then simulate both of the server’s operational modes: the Session Proxy mode, where we spin up an authenticated WebSocket bridge between a browser frontend and an MCP client, and the Runtime mode, where we wire up a direct kernel execution engine with persistent state, lazy initialization, and Jupyter-style output handling. From there, we assemble a complete AI agent loop that plans tool calls, executes code, inspects results, and iterates, the same pattern Claude Code and Gemini CLI use when connected to colab-mcp in the real world. We close with production-oriented orchestration: automatic retries with increasing backoff delays, timeout handling, cell sequencing that skips downstream cells after a failure, and execution reporting.
import subprocess, sys

def install(pkg):
    # Install quietly with the same interpreter that runs this notebook.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", pkg])

install("fastmcp>=2.2.0,<3.0.0")
install("websockets>=15.0.1")
install("pydantic>=2.0.0,<3.0.0")
install("requests>=2.32.0")
install("mcp>=1.0.0")
install("httpx")
install("google-auth")
install("google-auth-oauthlib")
install("openai")
print("All dependencies installed.")
ARCHITECTURE_OVERVIEW = """
╔══════════════════════════════════════════════════════════════════════╗
║                        colab-mcp Architecture                        ║
╠══════════════════════════════════════════════════════════════════════╣
║                                                                      ║
║  ┌──────────────┐     MCP (JSON-RPC)     ┌──────────────────┐        ║
║  │  AI Agent    │◄──────────────────────►│   colab-mcp      │        ║
║  │  (Claude,    │    stdio transport     │   FastMCP Server │        ║
║  │   Gemini,    │                        │                  │        ║
║  │   Custom)    │                        └──────┬───────────┘        ║
║  └──────────────┘                               │                    ║
║                                   ┌─────────────┼────────────┐       ║
║                                   │             │            │       ║
║                             ┌─────▼──────┐  ┌───▼──────────┐ │       ║
║                             │  SESSION   │  │  RUNTIME     │ │       ║
║                             │  PROXY     │  │  MODE        │ │       ║
║                             │  MODE      │  │              │ │       ║
║                             │            │  │  Jupyter     │ │       ║
║                             │  WebSocket │  │  Kernel      │ │       ║
║                             │  Bridge    │  │  Client      │ │       ║
║                             └─────┬──────┘  └───┬──────────┘ │       ║
║                                   │             │            │       ║
║                             ┌─────▼──────┐  ┌───▼──────────┐ │       ║
║                             │  Browser   │  │  Colab VM    │ │       ║
║                             │  Colab UI  │  │  (GPU/TPU)   │ │       ║
║                             └────────────┘  └──────────────┘ │       ║
║                                                              │       ║
║  SESSION PROXY (default): Browser ↔ WebSocket ↔ Agent        │       ║
║  RUNTIME MODE (opt-in): Agent → Kernel → Code Execution      │       ║
╚══════════════════════════════════════════════════════════════════════╝
"""
print(ARCHITECTURE_OVERVIEW)
import asyncio
import inspect
import json
from typing import Any

class MCPToolRegistry:
    """Minimal tool registry: registration, schema generation, async dispatch."""

    def __init__(self, name: str):
        self.name = name
        self._tools: dict[str, dict] = {}

    def tool(self, func):
        # Derive a JSON Schema for the tool from the function's type hints.
        sig = inspect.signature(func)
        params = {}
        for pname, p in sig.parameters.items():
            ptype = "string"
            if p.annotation == int:
                ptype = "integer"
            elif p.annotation == bool:
                ptype = "boolean"
            elif p.annotation == float:
                ptype = "number"
            params[pname] = {"type": ptype, "description": f"Parameter: {pname}"}
        self._tools[func.__name__] = {
            "name": func.__name__,
            "description": func.__doc__ or "",
            "inputSchema": {
                "type": "object",
                "properties": params,
                "required": list(params.keys())
            },
            "handler": func,
        }
        return func

    def list_tools(self) -> list[dict]:
        # Expose everything except the Python callable itself.
        return [
            {k: v for k, v in t.items() if k != "handler"}
            for t in self._tools.values()
        ]

    async def call_tool(self, name: str, arguments: dict) -> Any:
        if name not in self._tools:
            raise ValueError(f"Unknown tool: {name}")
        handler = self._tools[name]["handler"]
        # Support both sync and async handlers.
        if asyncio.iscoroutinefunction(handler):
            return await handler(**arguments)
        return handler(**arguments)
server = MCPToolRegistry("colab-mcp-demo")

@server.tool
def execute_code(code: str) -> str:
    """Execute Python code in the runtime kernel and return output."""
    import io, contextlib
    buf = io.StringIO()
    try:
        with contextlib.redirect_stdout(buf):
            exec(code, {"__builtins__": __builtins__})
        output = buf.getvalue()
        return output if output else "(no output)"
    except Exception as e:
        return f"Error: {type(e).__name__}: {e}"

@server.tool
def add_code_cell(code: str, cell_index: int) -> str:
    """Add a code cell to the notebook at the specified index."""
    return json.dumps({
        "status": "success",
        "action": "add_code_cell",
        "cell_index": cell_index,
        "preview": code[:80] + ("..." if len(code) > 80 else ""),
    })

@server.tool
def add_text_cell(content: str, cell_index: int) -> str:
    """Add a markdown cell to the notebook at the specified index."""
    return json.dumps({
        "status": "success",
        "action": "add_text_cell",
        "cell_index": cell_index,
        "preview": content[:80] + ("..." if len(content) > 80 else ""),
    })

@server.tool
def get_cells(cell_index_start: int, include_outputs: bool) -> str:
    """Retrieve cells from the notebook starting at the given index."""
    # Canned response: this demo registry has no real notebook behind it.
    return json.dumps({
        "cells": [
            {"cell_type": "code", "id": "cell_0", "source": ["import pandas as pd"]},
            {"cell_type": "markdown", "id": "cell_1", "source": ["# Analysis"]},
        ]
    })
print("Registered MCP Tools:")
print("=" * 60)
for tool in server.list_tools():
    print(f"\n{tool['name']}")
    print(f"  Description: {tool['description']}")
    params = tool['inputSchema']['properties']
    for pname, pinfo in params.items():
        print(f"  Param: {pname} ({pinfo['type']})")

print("\n\nCalling Tools:")
print("=" * 60)

async def demo_tool_calls():
    result = await server.call_tool("execute_code", {
        "code": "print('Hello from the MCP runtime!')\nprint(2 + 2)"
    })
    print(f"\nexecute_code result:\n{result}")
    result = await server.call_tool("add_code_cell", {
        "code": "import matplotlib.pyplot as plt\nplt.plot([1,2,3],[1,4,9])\nplt.show()",
        "cell_index": 0,
    })
    print(f"\nadd_code_cell result:\n{result}")
    result = await server.call_tool("get_cells", {
        "cell_index_start": 0,
        "include_outputs": False,
    })
    print(f"\nget_cells result:\n{result}")

# Colab already runs an event loop; nest_asyncio lets asyncio.run() work inside it.
try:
    import nest_asyncio
    nest_asyncio.apply()
except ImportError:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "nest_asyncio"])
    import nest_asyncio
    nest_asyncio.apply()

asyncio.run(demo_tool_calls())
We install every dependency the tutorial needs, including FastMCP, websockets, Pydantic, the MCP SDK, and the Google auth libraries, so the remaining snippets run without interruption. We then build a custom MCPToolRegistry class entirely from scratch, walking through the mechanics the protocol relies on: decorator-based tool registration, automatic JSON Schema generation from Python type hints, and async tool dispatch. We register four tools that mirror the real colab-mcp surface (execute_code, add_code_cell, add_text_cell, and get_cells), list their schemas, and call three of them (execute_code, add_code_cell, and get_cells) to confirm the full request-response cycle works end to end.
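The registry above marks every parameter as required. As a possible refinement, the sketch below reads default values from the signature so that only parameters without a default end up in `required`. The helper name `schema_from_signature` and the `_JSON_TYPES` table are illustrative assumptions, not part of colab-mcp or FastMCP.

```python
import inspect

# Hypothetical lookup table: Python annotation -> JSON Schema type name.
_JSON_TYPES = {int: "integer", float: "number", bool: "boolean", str: "string"}

def schema_from_signature(func):
    """Build a JSON Schema 'object' description from a function signature."""
    properties = {}
    required = []
    for name, param in inspect.signature(func).parameters.items():
        # Unannotated or unknown types fall back to "string".
        properties[name] = {"type": _JSON_TYPES.get(param.annotation, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)  # no default, so the caller must supply it
        else:
            properties[name]["default"] = param.default
    return {"type": "object", "properties": properties, "required": required}

def get_cells(cell_index_start: int = 0, include_outputs: bool = True) -> str:
    return "[]"

def execute_code(code: str) -> str:
    return ""

schema = schema_from_signature(get_cells)
code_schema = schema_from_signature(execute_code)
print(schema["required"], code_schema["required"])
```

With this approach an agent can omit `cell_index_start` and `include_outputs`, while `code` stays mandatory for `execute_code`.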
from fastmcp import FastMCP
import asyncio
import json
import secrets
import websockets
from websockets.asyncio.server import serve as ws_serve
import nest_asyncio
nest_asyncio.apply()

mcp = FastMCP("colab-mcp-tutorial")

@mcp.tool()
def open_colab_browser_connection() -> dict:
    """Opens a connection to the Colab browser UI."""
    token = secrets.token_hex(16)
    port = 8765
    url = f"https://colab.research.google.com/scratchpads#mcpProxyToken={token}&mcpProxyPort={port}"
    return {
        "result": True,
        "message": "Browser connection established",
        "url": url,
        "token": token,
        "port": port,
    }

@mcp.tool()
def proxy_get_cells(cell_index_start: int = 0, include_outputs: bool = True) -> dict:
    """Get notebook cells from the connected Colab frontend."""
    return {
        "cells": [
            {
                "cell_type": "code",
                "id": "abc123",
                "source": ["import numpy as np\n", "data = np.random.randn(100)\n"],
                "outputs": [{"output_type": "execute_result", "text": "array([...])"}]
                if include_outputs else [],
            },
            {
                "cell_type": "markdown",
                "id": "def456",
                "source": ["# Data Analysis Report\n"],
                "outputs": [],
            },
        ]
    }

@mcp.tool()
def proxy_add_code_cell(cell_index: int, code: str, language: str = "python") -> dict:
    """Add a new code cell to the notebook at the specified position."""
    return {"status": "ok", "cell_index": cell_index, "language": language}

@mcp.tool()
def proxy_add_text_cell(cell_index: int, content: str) -> dict:
    """Add a new markdown cell to the notebook at the specified position."""
    return {"status": "ok", "cell_index": cell_index}

@mcp.tool()
def proxy_execute_cell(cell_index: int) -> dict:
    """Execute the cell at the specified index in the connected notebook."""
    return {"status": "ok", "cell_index": cell_index, "execution_count": 1}

@mcp.tool()
def runtime_execute_code(code: str) -> dict:
    """Execute Python code directly in a Colab kernel (Runtime Mode)."""
    import io, contextlib, traceback
    stdout_buf = io.StringIO()
    stderr_buf = io.StringIO()
    try:
        with contextlib.redirect_stdout(stdout_buf), contextlib.redirect_stderr(stderr_buf):
            exec(code, {"__builtins__": __builtins__})
        return {
            "outputs": [
                {"output_type": "stream", "name": "stdout", "text": stdout_buf.getvalue()},
            ]
        }
    except Exception:
        return {
            "outputs": [
                {"output_type": "error", "traceback": traceback.format_exc()},
            ]
        }

print("FastMCP Server Tools:")
print("=" * 60)

async def list_fastmcp_tools():
    tools_dict = await mcp.get_tools()
    for name, tool in tools_dict.items():
        print(f"\n{tool.name}")
        print(f"  {tool.description[:100]}")
    return tools_dict

tools = asyncio.run(list_fastmcp_tools())
print(f"\nTotal tools registered: {len(tools)}")
class SimulatedColabWebSocketServer:
    def __init__(self, host: str = "localhost", port: int = 0):
        self.host = host
        self.port = port  # 0 lets the OS pick a free port
        self.token = secrets.token_hex(16)
        self.connection_live = asyncio.Event()
        self._server = None
        self._messages_received: list[dict] = []

    async def _handler(self, websocket):
        try:
            # The first message must carry the shared token.
            auth_msg = await asyncio.wait_for(websocket.recv(), timeout=10.0)
            auth_data = json.loads(auth_msg)
            if auth_data.get("token") != self.token:
                await websocket.send(json.dumps({"error": "Invalid token"}))
                await websocket.close()
                return
            await websocket.send(json.dumps({"status": "authenticated"}))
            self.connection_live.set()
            print(f"Client authenticated (token: {self.token[:8]}...)")
            # Acknowledge every subsequent JSON-RPC message.
            async for message in websocket:
                data = json.loads(message)
                self._messages_received.append(data)
                print(f"Received: {data.get('method', 'unknown')} "
                      f"- {json.dumps(data.get('params', {}))[:80]}")
                response = {
                    "jsonrpc": "2.0",
                    "id": data.get("id"),
                    "result": {"status": "ok", "tool": data.get("method")},
                }
                await websocket.send(json.dumps(response))
        except websockets.exceptions.ConnectionClosed:
            print("Connection closed")
        except asyncio.TimeoutError:
            print("Auth timeout")

    async def start(self):
        self._server = await ws_serve(self._handler, self.host, self.port)
        # Read back the port actually bound.
        self.port = self._server.sockets[0].getsockname()[1]
        print(f"WebSocket server running on ws://{self.host}:{self.port}")
        print(f"Token: {self.token[:8]}...")
        return self

    async def stop(self):
        if self._server:
            self._server.close()
            await self._server.wait_closed()
            print("WebSocket server stopped")
async def simulate_browser_client(port: int, token: str):
    uri = f"ws://localhost:{port}"
    async with websockets.connect(uri) as ws:
        # Authenticate first, then send two JSON-RPC tool calls.
        await ws.send(json.dumps({"token": token}))
        auth_response = json.loads(await ws.recv())
        print(f"Auth response: {auth_response}")
        tool_call = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "add_code_cell",
            "params": {
                "cellIndex": 0,
                "code": "import pandas as pd\ndf = pd.read_csv('data.csv')",
                "language": "python",
            }
        }
        await ws.send(json.dumps(tool_call))
        response = json.loads(await ws.recv())
        print(f"Tool response: {response}")
        execute_call = {
            "jsonrpc": "2.0",
            "id": 2,
            "method": "execute_cell",
            "params": {"cellIndex": 0}
        }
        await ws.send(json.dumps(execute_call))
        response = json.loads(await ws.recv())
        print(f"Execute response: {response}")

async def run_websocket_demo():
    print("WebSocket Bridge Demo (Session Proxy Mode)")
    print("=" * 60)
    print("\nStarting WebSocket server...")
    wss = SimulatedColabWebSocketServer()
    await wss.start()
    print("\nSimulating browser frontend connection...")
    await simulate_browser_client(wss.port, wss.token)
    print(f"\nServer received {len(wss._messages_received)} tool calls")
    await wss.stop()
    print("\nDemo complete!")

asyncio.run(run_websocket_demo())
We graduate from our hand-rolled registry to the real FastMCP framework and create a server with six tools spanning both of colab-mcp’s operational modes: proxy tools such as open_colab_browser_connection, proxy_add_code_cell, and proxy_execute_cell, plus runtime_execute_code for direct kernel access. We then build a SimulatedColabWebSocketServer that imitates the Session Proxy architecture: it listens for connections, validates a security token on the first message, and answers each subsequent JSON-RPC tool call with an acknowledgement, standing in for the bridge between the browser frontend and the MCP client. We run the full demo by spinning up the server, connecting a simulated browser client that sends add_code_cell and execute_cell calls, and verifying that authenticated messages flow correctly through the bridge.
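The token check on the first message can be isolated into a small pure function, which makes it easy to test without opening a socket. The sketch below is an illustration only: `check_auth_message` is a hypothetical helper, and the constant-time comparison via `secrets.compare_digest` is a hardening choice we assume here, not something confirmed to be in colab-mcp.

```python
import json
import secrets

def check_auth_message(raw_message: str, expected_token: str) -> dict:
    """Validate the first message of a connection (hypothetical helper)."""
    try:
        data = json.loads(raw_message)
    except json.JSONDecodeError:
        return {"error": "Malformed auth message"}
    supplied = data.get("token") if isinstance(data, dict) else None
    if not isinstance(supplied, str):
        return {"error": "Missing token"}
    # Compare as bytes in constant time so timing does not leak how many
    # leading characters of the token matched.
    if not secrets.compare_digest(supplied.encode(), expected_token.encode()):
        return {"error": "Invalid token"}
    return {"status": "authenticated"}

token = secrets.token_hex(16)
good = check_auth_message(json.dumps({"token": token}), token)
bad = check_auth_message(json.dumps({"token": "wrong"}), token)
garbled = check_auth_message("not json", token)
print(good, bad, garbled)
```

A WebSocket handler could call this on the first received frame and close the connection whenever the result contains an `error` key.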
import asyncio
import io
import contextlib
import traceback
import uuid
from dataclasses import dataclass, field
from typing import Optional
import nest_asyncio
nest_asyncio.apply()

@dataclass
class KernelOutput:
    output_type: str
    text: str = ""
    data: dict = field(default_factory=dict)
    traceback_lines: list = field(default_factory=list)

@dataclass
class ExecutionResult:
    success: bool
    outputs: list[KernelOutput]
    execution_count: int

class ColabRuntimeSimulator:
    def __init__(self):
        self._id = uuid.uuid4()
        self._execution_count = 0
        # One namespace shared by every execution gives us persistent state.
        self._namespace: dict = {"__builtins__": __builtins__}
        self._is_started = False

    @property
    def runtime_id(self) -> str:
        return str(self._id)

    async def start(self) -> None:
        print(f"Initializing runtime {self.runtime_id[:8]}...")
        print("  [Simulated] OAuth2 authentication...")
        await asyncio.sleep(0.1)
        print("  [Simulated] Requesting VM assignment...")
        await asyncio.sleep(0.1)
        print("  [Simulated] Connecting to Jupyter kernel...")
        await asyncio.sleep(0.1)
        self._is_started = True
        print("Runtime started!")

    async def execute_code(self, code: str) -> ExecutionResult:
        # Lazy initialization: start the runtime on first use.
        if not self._is_started:
            await self.start()
        self._execution_count += 1
        outputs: list[KernelOutput] = []
        stdout_buf = io.StringIO()
        stderr_buf = io.StringIO()
        try:
            with contextlib.redirect_stdout(stdout_buf), contextlib.redirect_stderr(stderr_buf):
                try:
                    # Try the code as an expression first so its value can be shown.
                    result = eval(code, self._namespace)
                    if result is not None:
                        outputs.append(KernelOutput(
                            output_type="execute_result",
                            text=repr(result),
                            data={"text/plain": repr(result)},
                        ))
                except SyntaxError:
                    # Statements are not valid for eval; run them with exec.
                    exec(code, self._namespace)
            stdout_text = stdout_buf.getvalue()
            if stdout_text:
                outputs.append(KernelOutput(
                    output_type="stream",
                    text=stdout_text,
                ))
            stderr_text = stderr_buf.getvalue()
            if stderr_text:
                outputs.append(KernelOutput(
                    output_type="stream",
                    text=stderr_text,
                ))
            return ExecutionResult(
                success=True,
                outputs=outputs,
                execution_count=self._execution_count,
            )
        except Exception as e:
            tb = traceback.format_exc()
            outputs.append(KernelOutput(
                output_type="error",
                text=str(e),
                traceback_lines=tb.split("\n"),
            ))
            return ExecutionResult(
                success=False,
                outputs=outputs,
                execution_count=self._execution_count,
            )

    async def stop(self) -> None:
        if self._is_started:
            print(f"Unassigning VM for runtime {self.runtime_id[:8]}...")
            self._is_started = False
            print("Runtime stopped and VM released.")
async def runtime_demo():
    print("Runtime Mode Demo")
    print("=" * 60)
    runtime = ColabRuntimeSimulator()
    await runtime.start()
    # Four cells: two statement blocks, a bare expression, and a deliberate NameError.
    code_snippets = [
        """
import random
random.seed(42)
data = [random.gauss(0, 1) for _ in range(1000)]
print(f"Generated {len(data)} data points")
print(f"Mean: {sum(data)/len(data):.4f}")
print(f"Min: {min(data):.4f}, Max: {max(data):.4f}")
""",
        """
variance = sum((x - sum(data)/len(data))**2 for x in data) / len(data)
std_dev = variance ** 0.5
print(f"Variance: {variance:.4f}")
print(f"Std Dev: {std_dev:.4f}")
""",
        "len(data)",
        "undefined_variable + 1",
    ]
    for i, code in enumerate(code_snippets):
        print(f"\n{'─' * 40}")
        print(f"Executing cell [{i+1}]:")
        print(f"  Code: {code.strip()[:60]}{'...' if len(code.strip()) > 60 else ''}")
        result = await runtime.execute_code(code)
        status = "[ok]" if result.success else "[error]"
        print(f"  {status} Execution #{result.execution_count} "
              f"({'success' if result.success else 'error'})")
        for out in result.outputs:
            if out.output_type == "stream":
                for line in out.text.strip().split("\n"):
                    print(f"    {line}")
            elif out.output_type == "execute_result":
                print(f"    Out[{result.execution_count}]: {out.text}")
            elif out.output_type == "error":
                print(f"    {out.text}")
    await runtime.stop()
    print("\nRuntime demo complete!")

asyncio.run(runtime_demo())
We construct a ColabRuntimeSimulator that mirrors the ColabRuntimeTool from the real codebase, including its initialization chain of simulated OAuth2 authentication, VM assignment, and Jupyter kernel connection. The chain is lazy: execute_code triggers it automatically if the runtime has not been started yet, although the demo calls start() explicitly. We run four sequential cells through the simulator to show that it maintains persistent state across executions: we create a 1,000-point dataset in cell one, compute variance and standard deviation from that same dataset in cell two, evaluate a bare expression in cell three, and trigger an intentional NameError in cell four to confirm that errors are captured and reported rather than crashing the loop. We finish by calling stop(), which simulates VM unassignment and resource cleanup, showing the full lifecycle from kernel startup to graceful shutdown.
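The persistence described above comes down to reusing one namespace dictionary for every call, combined with an eval-then-exec fallback so that bare expressions return a value. A stripped-down sketch of just that idea follows; `TinyKernel` is a hypothetical name used for illustration and omits the async lifecycle and error capture of the simulator.

```python
import contextlib
import io

class TinyKernel:
    """Hypothetical stand-in for a kernel: one namespace shared by every call."""

    def __init__(self):
        self.namespace = {}

    def run(self, code: str) -> str:
        buf = io.StringIO()
        with contextlib.redirect_stdout(buf):
            try:
                # Expressions go through eval so their value can be reported.
                value = eval(code, self.namespace)
            except SyntaxError:
                # Statements (assignments, imports, loops) fall back to exec.
                exec(code, self.namespace)
                value = None
        printed = buf.getvalue()
        return printed if value is None else printed + repr(value)

kernel = TinyKernel()
kernel.run("total = sum(range(5))")   # statement: stored in the namespace
second = kernel.run("total * 2")      # expression: sees the earlier variable
printed = kernel.run("print('hi')")   # print returns None, so only stdout is kept
print(second, printed)
```

Because `total` lives in `kernel.namespace`, the second call can use it even though the two calls are otherwise independent.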
import asyncio
import json
import io
import contextlib
import re
from dataclasses import dataclass
from typing import Callable, Awaitable
import nest_asyncio
nest_asyncio.apply()

# Tool schemas in the shape an LLM tool-calling API would receive.
TOOL_DEFINITIONS = [
    {
        "name": "execute_code",
        "description": "Execute Python code in the Colab kernel. Returns stdout, results, or errors. State persists between calls.",
        "parameters": {
            "type": "object",
            "properties": {
                "code": {"type": "string", "description": "Python code to execute"},
            },
            "required": ["code"],
        }
    },
    {
        "name": "add_code_cell",
        "description": "Add a code cell to the notebook at a given index.",
        "parameters": {
            "type": "object",
            "properties": {
                "cell_index": {"type": "integer", "description": "Position to insert"},
                "code": {"type": "string", "description": "Python code for the cell"},
            },
            "required": ["cell_index", "code"],
        }
    },
    {
        "name": "add_text_cell",
        "description": "Add a markdown documentation cell to the notebook.",
        "parameters": {
            "type": "object",
            "properties": {
                "cell_index": {"type": "integer", "description": "Position to insert"},
                "content": {"type": "string", "description": "Markdown content"},
            },
            "required": ["cell_index", "content"],
        }
    },
    {
        "name": "get_cells",
        "description": "Retrieve current notebook cells and their outputs.",
        "parameters": {
            "type": "object",
            "properties": {
                "cell_index_start": {"type": "integer", "description": "Start index", "default": 0},
                "include_outputs": {"type": "boolean", "description": "Include cell outputs", "default": True},
            },
            "required": [],
        }
    },
]
class NotebookState:
    def __init__(self):
        self.cells: list[dict] = []
        # Shared namespace so variables persist across execute_code calls.
        self.execution_ns: dict = {"__builtins__": __builtins__}

    def add_code_cell(self, index: int, code: str) -> dict:
        cell = {"type": "code", "source": code, "outputs": [], "executed": False}
        self.cells.insert(min(index, len(self.cells)), cell)
        return {"status": "ok", "cell_count": len(self.cells)}

    def add_text_cell(self, index: int, content: str) -> dict:
        cell = {"type": "markdown", "source": content}
        self.cells.insert(min(index, len(self.cells)), cell)
        return {"status": "ok", "cell_count": len(self.cells)}

    def execute_code(self, code: str) -> dict:
        stdout_buf = io.StringIO()
        try:
            with contextlib.redirect_stdout(stdout_buf):
                try:
                    result = eval(code, self.execution_ns)
                    if result is not None:
                        return {"outputs": [{"type": "result", "text": repr(result)}]}
                except SyntaxError:
                    exec(code, self.execution_ns)
            out = stdout_buf.getvalue()
            return {"outputs": [{"type": "stdout", "text": out}] if out else []}
        except Exception as e:
            return {"outputs": [{"type": "error", "text": f"{type(e).__name__}: {e}"}]}

    def get_cells(self, start: int = 0, include_outputs: bool = True) -> dict:
        return {"cells": self.cells[start:], "total": len(self.cells)}
class MCPAgentLoop:
    def __init__(self):
        self.notebook = NotebookState()
        self.history: list[dict] = []
        self.max_iterations = 10

    def _dispatch_tool(self, name: str, args: dict) -> dict:
        if name == "execute_code":
            return self.notebook.execute_code(args["code"])
        elif name == "add_code_cell":
            return self.notebook.add_code_cell(args["cell_index"], args["code"])
        elif name == "add_text_cell":
            return self.notebook.add_text_cell(args["cell_index"], args["content"])
        elif name == "get_cells":
            return self.notebook.get_cells(
                args.get("cell_index_start", 0),
                args.get("include_outputs", True),
            )
        else:
            return {"error": f"Unknown tool: {name}"}

    def _plan(self, task: str, iteration: int, last_result: dict = None) -> list[dict]:
        # Scripted stand-in for an LLM: one fixed plan per iteration.
        task_lower = task.lower()
        if iteration == 0:
            return [
                {"tool": "add_text_cell", "args": {
                    "cell_index": 0,
                    "content": f"# AI-Generated Analysis\n\n**Task**: {task}\n\n"
                               f"*Generated by MCP Agent*"
                }},
            ]
        elif iteration == 1:
            return [
                {"tool": "add_code_cell", "args": {
                    "cell_index": 1,
                    "code": "import random\nimport math\n\n"
                            "# Generate sample data\n"
                            "random.seed(42)\n"
                            "data = [random.gauss(100, 15) for _ in range(500)]\n"
                            "print(f'Generated {len(data)} data points')\n"
                            "print(f'Sample: {data[:5]}')"
                }},
                {"tool": "execute_code", "args": {
                    "code": "import random\nimport math\n\n"
                            "random.seed(42)\n"
                            "data = [random.gauss(100, 15) for _ in range(500)]\n"
                            "print(f'Generated {len(data)} data points')\n"
                            "print(f'Sample: {[round(x,2) for x in data[:5]]}')"
                }},
            ]
        elif iteration == 2:
            return [
                {"tool": "add_code_cell", "args": {
                    "cell_index": 2,
                    "code": "# Statistical analysis\n"
                            "mean = sum(data) / len(data)\n"
                            "variance = sum((x - mean)**2 for x in data) / len(data)\n"
                            "std = variance ** 0.5\n"
                            "median = sorted(data)[len(data)//2]\n"
                            "print(f'Mean: {mean:.2f}')\n"
                            "print(f'Std Dev: {std:.2f}')\n"
                            "print(f'Median: {median:.2f}')"
                }},
                {"tool": "execute_code", "args": {
                    "code": "mean = sum(data) / len(data)\n"
                            "variance = sum((x - mean)**2 for x in data) / len(data)\n"
                            "std = variance ** 0.5\n"
                            "median = sorted(data)[len(data)//2]\n"
                            "print(f'Mean: {mean:.2f}')\n"
                            "print(f'Std Dev: {std:.2f}')\n"
                            "print(f'Median: {median:.2f}')"
                }},
            ]
        elif iteration == 3:
            return [
                {"tool": "add_text_cell", "args": {
                    "cell_index": 3,
                    "content": "## Results Summary\n\n"
                               "The analysis is complete. Key findings are computed above. "
                               "The data follows a normal distribution centered around 100."
                }},
            ]
        else:
            # An empty plan tells the loop to stop.
            return []
    async def run(self, task: str):
        print(f"Agent Task: {task}")
        print("=" * 60)
        for i in range(self.max_iterations):
            plan = self._plan(task, i)
            if not plan:
                print(f"\nAgent finished after {i} iterations")
                break
            print(f"\n--- Iteration {i+1} ---")
            for step in plan:
                tool_name = step["tool"]
                tool_args = step["args"]
                print(f"  Calling: {tool_name}")
                result = self._dispatch_tool(tool_name, tool_args)
                self.history.append({
                    "iteration": i,
                    "tool": tool_name,
                    "result": result,
                })
                if "outputs" in result:
                    for out in result["outputs"]:
                        prefix = "[ok]" if out["type"] != "error" else "[error]"
                        text = out["text"][:200]
                        print(f"    {prefix} {text}")
                elif "status" in result:
                    print(f"    {result}")
        print("\nFinal Notebook State:")
        print("=" * 60)
        for i, cell in enumerate(self.notebook.cells):
            icon = "[code]" if cell["type"] == "code" else "[md]"
            source = cell["source"][:60] + ("..." if len(cell["source"]) > 60 else "")
            print(f"  [{i}] {icon} {cell['type']:10s} | {source}")

agent = MCPAgentLoop()
asyncio.run(agent.run("Analyze a dataset with descriptive statistics"))
INTEGRATION_TEMPLATE = '''
import anthropic
import json

client = anthropic.Anthropic()

tools = [
    {
        "name": "colab-proxy-mcp_add_code_cell",
        "description": "Add a Python code cell to the connected Colab notebook",
        "input_schema": {
            "type": "object",
            "properties": {
                "cellIndex": {"type": "integer"},
                "code": {"type": "string"},
                "language": {"type": "string", "default": "python"},
            },
            "required": ["cellIndex", "code"],
        }
    },
    {
        "name": "colab-proxy-mcp_add_text_cell",
        "description": "Add a markdown cell to the connected Colab notebook",
        "input_schema": {
            "type": "object",
            "properties": {
                "cellIndex": {"type": "integer"},
                "content": {"type": "string"},
            },
            "required": ["cellIndex", "content"],
        }
    },
    {
        "name": "colab-proxy-mcp_execute_cell",
        "description": "Execute a cell in the connected Colab notebook",
        "input_schema": {
            "type": "object",
            "properties": {
                "cellIndex": {"type": "integer"},
            },
            "required": ["cellIndex"],
        }
    },
    {
        "name": "colab-proxy-mcp_get_cells",
        "description": "Get cells from the connected Colab notebook",
        "input_schema": {
            "type": "object",
            "properties": {
                "cellIndexStart": {"type": "integer", "default": 0},
                "includeOutputs": {"type": "boolean", "default": True},
            },
        }
    },
    {
        "name": "runtime_execute_code",
        "description": "Execute Python code directly in the Colab kernel (Runtime Mode)",
        "input_schema": {
            "type": "object",
            "properties": {
                "code": {"type": "string"},
            },
            "required": ["code"],
        }
    },
]

def run_agent(task: str, max_turns: int = 15):
    messages = [{"role": "user", "content": task}]
    for turn in range(max_turns):
        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            tools=tools,
            messages=messages,
            system="You are an AI assistant with access to a Google Colab notebook "
                   "via MCP tools. Build notebooks step by step: add markdown cells "
                   "for documentation, add code cells, then execute them. "
                   "Inspect outputs and fix errors iteratively."
        )
        assistant_content = response.content
        messages.append({"role": "assistant", "content": assistant_content})
        if response.stop_reason == "end_turn":
            print("Agent finished.")
            break
        # Run every requested tool and feed the results back as a user turn.
        tool_results = []
        for block in assistant_content:
            if block.type == "tool_use":
                print(f"Tool call: {block.name}({json.dumps(block.input)[:100]})")
                result = dispatch_to_mcp_server(block.name, block.input)
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": json.dumps(result),
                })
        if tool_results:
            messages.append({"role": "user", "content": tool_results})
        else:
            break

def dispatch_to_mcp_server(tool_name: str, tool_input: dict) -> dict:
    raise NotImplementedError("Use the MCP SDK for real tool dispatch")
'''
print(INTEGRATION_TEMPLATE)
print("\n" + "=" * 60)
print("The template above shows how to connect a real LLM to colab-mcp.")
print("  For Claude Code: just add the MCP config and start chatting!")
print("  For custom agents: use the Anthropic SDK with tool_use.")
We build a complete MCPAgentLoop that replicates how real AI agents interact with colab-mcp: it receives a task, plans a sequence of tool calls, dispatches them to a NotebookState manager, inspects outputs, and iterates until the notebook is fully built. Here the planner is scripted rather than driven by an LLM, so each iteration returns a fixed list of tool calls. We watch the agent run four iterations, which add a markdown title cell, import libraries and generate data, compute descriptive statistics, and write a summary, producing a four-cell notebook entirely through tool calls, with every execution result printed inline. We then print an integration template for the custom-agent path: a complete Anthropic API loop with tool definitions, message history management, and tool-result wiring. The zero-code path needs only an MCP config entry for a client such as Claude Code or the Gemini CLI.
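The plan-then-dispatch step can also be written with a lookup table instead of an if/elif chain, which keeps the dispatcher unchanged as tools are added. The following is a minimal sketch under that assumption; `MiniNotebook` and `make_dispatcher` are hypothetical names, and only two tools are wired up to keep it short.

```python
from typing import Callable

class MiniNotebook:
    """Hypothetical, trimmed-down notebook state used only for this sketch."""

    def __init__(self):
        self.cells: list[dict] = []

    def add_code_cell(self, cell_index: int, code: str) -> dict:
        self.cells.insert(min(cell_index, len(self.cells)),
                          {"type": "code", "source": code})
        return {"status": "ok", "cell_count": len(self.cells)}

    def add_text_cell(self, cell_index: int, content: str) -> dict:
        self.cells.insert(min(cell_index, len(self.cells)),
                          {"type": "markdown", "source": content})
        return {"status": "ok", "cell_count": len(self.cells)}

def make_dispatcher(notebook: MiniNotebook) -> Callable[[str, dict], dict]:
    # Tool name -> bound method; argument names match the plan's "args" keys.
    handlers = {
        "add_code_cell": notebook.add_code_cell,
        "add_text_cell": notebook.add_text_cell,
    }

    def dispatch(name: str, args: dict) -> dict:
        handler = handlers.get(name)
        if handler is None:
            return {"error": f"Unknown tool: {name}"}
        return handler(**args)

    return dispatch

nb = MiniNotebook()
dispatch = make_dispatcher(nb)
plan = [
    {"tool": "add_text_cell", "args": {"cell_index": 0, "content": "# Title"}},
    {"tool": "add_code_cell", "args": {"cell_index": 1, "code": "print(1)"}},
    {"tool": "delete_cell", "args": {}},  # not registered, so it yields an error
]
history = [dispatch(step["tool"], step["args"]) for step in plan]
print(history)
```

Unknown tools produce an error result instead of raising, so an agent loop can record the failure in its history and continue.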
import asyncio
import io
import contextlib
import traceback
import uuid
import time
from enum import Enum
from dataclasses import dataclass, field
import nest_asyncio
nest_asyncio.apply()

# The runtime classes are repeated here so this snippet runs on its own.
@dataclass
class KernelOutput:
    output_type: str
    text: str = ""
    data: dict = field(default_factory=dict)
    traceback_lines: list = field(default_factory=list)

@dataclass
class ExecutionResult:
    success: bool
    outputs: list[KernelOutput]
    execution_count: int

class ColabRuntimeSimulator:
    def __init__(self):
        self._id = uuid.uuid4()
        self._execution_count = 0
        self._namespace: dict = {"__builtins__": __builtins__}
        self._is_started = False

    @property
    def runtime_id(self) -> str:
        return str(self._id)

    async def start(self) -> None:
        print(f"Initializing runtime {self.runtime_id[:8]}...")
        print("  [Simulated] OAuth2 authentication...")
        await asyncio.sleep(0.1)
        print("  [Simulated] Requesting VM assignment...")
        await asyncio.sleep(0.1)
        print("  [Simulated] Connecting to Jupyter kernel...")
        await asyncio.sleep(0.1)
        self._is_started = True
        print("Runtime started!")

    async def execute_code(self, code: str) -> ExecutionResult:
        if not self._is_started:
            await self.start()
        self._execution_count += 1
        outputs: list[KernelOutput] = []
        stdout_buf = io.StringIO()
        stderr_buf = io.StringIO()
        try:
            with contextlib.redirect_stdout(stdout_buf), contextlib.redirect_stderr(stderr_buf):
                try:
                    result = eval(code, self._namespace)
                    if result is not None:
                        outputs.append(KernelOutput(
                            output_type="execute_result",
                            text=repr(result),
                            data={"text/plain": repr(result)},
                        ))
                except SyntaxError:
                    exec(code, self._namespace)
            stdout_text = stdout_buf.getvalue()
            if stdout_text:
                outputs.append(KernelOutput(
                    output_type="stream",
                    text=stdout_text,
                ))
            stderr_text = stderr_buf.getvalue()
            if stderr_text:
                outputs.append(KernelOutput(
                    output_type="stream",
                    text=stderr_text,
                ))
            return ExecutionResult(
                success=True,
                outputs=outputs,
                execution_count=self._execution_count,
            )
        except Exception as e:
            tb = traceback.format_exc()
            outputs.append(KernelOutput(
                output_type="error",
                text=str(e),
                traceback_lines=tb.split("\n"),
            ))
            return ExecutionResult(
                success=False,
                outputs=outputs,
                execution_count=self._execution_count,
            )

    async def stop(self) -> None:
        if self._is_started:
            print(f"Unassigning VM for runtime {self.runtime_id[:8]}...")
            self._is_started = False
            print("Runtime stopped and VM released.")
class ExecutionStatus(Enum):
    SUCCESS = "success"
    ERROR = "error"
    TIMEOUT = "timeout"
    RETRYING = "retrying"

@dataclass
class CellExecution:
    cell_index: int
    code: str
    status: ExecutionStatus = ExecutionStatus.SUCCESS
    output: str = ""
    error: str = ""
    retries: int = 0
    duration_ms: float = 0.0

class RobustNotebookOrchestrator:
    def __init__(self, max_retries: int = 3, timeout_seconds: float = 30.0):
        self.max_retries = max_retries
        self.timeout_seconds = timeout_seconds
        self.runtime = ColabRuntimeSimulator()
        self.executions: list[CellExecution] = []
        self._started = False

    async def ensure_started(self):
        if not self._started:
            await self.runtime.start()
            self._started = True

    async def execute_with_retry(self, code: str, cell_index: int) -> CellExecution:
        await self.ensure_started()
        cell = CellExecution(cell_index=cell_index, code=code)
        for attempt in range(self.max_retries + 1):
            start_time = time.time()
            try:
                result = await asyncio.wait_for(
                    self.runtime.execute_code(code),
                    timeout=self.timeout_seconds,
                )
                cell.duration_ms = (time.time() - start_time) * 1000
                if result.success:
                    cell.status = ExecutionStatus.SUCCESS
                    cell.output = "\n".join(
                        o.text for o in result.outputs if o.text
                    )
                    break
                else:
                    error_text = "\n".join(
                        o.text for o in result.outputs if o.output_type == "error"
                    )
                    cell.error = error_text
                    if self._is_retryable(error_text) and attempt < self.max_retries:
                        cell.status = ExecutionStatus.RETRYING
                        cell.retries = attempt + 1
                        print(f"    Retry {attempt + 1}/{self.max_retries}: {error_text[:60]}")
                        # Delay grows linearly with the attempt number: 0.5s, 1.0s, ...
                        await asyncio.sleep(0.5 * (attempt + 1))
                        continue
                    else:
                        cell.status = ExecutionStatus.ERROR
                        break
            except asyncio.TimeoutError:
                cell.duration_ms = self.timeout_seconds * 1000
                cell.status = ExecutionStatus.TIMEOUT
                cell.error = f"Execution timed out after {self.timeout_seconds}s"
                break
        self.executions.append(cell)
        return cell

    def _is_retryable(self, error: str) -> bool:
        # Only transient-looking failures are worth retrying.
        retryable_patterns = [
            "ConnectionError", "TimeoutError", "ResourceExhausted",
            "ServiceUnavailable", "CUDA out of memory",
        ]
        return any(p.lower() in error.lower() for p in retryable_patterns)
    async def execute_notebook(self, cells: list[dict]) -> dict:
        await self.ensure_started()
        results = []
        failed = False
        print("Executing notebook...")
        print("=" * 50)
        for i, cell in enumerate(cells):
            if cell.get("type") == "markdown":
                print(f"  [{i}] Markdown: {cell['source'][:50]}...")
                results.append({"index": i, "type": "markdown", "status": "ok"})
                continue
            # After a failure, skip later code cells unless they opt in.
            if failed and not cell.get("force_execute", False):
                print(f"  [{i}] Skipped (previous cell failed)")
                results.append({"index": i, "type": "code", "status": "skipped"})
                continue
            print(f"  [{i}] Executing...")
            exec_result = await self.execute_with_retry(cell["source"], i)
            icon = {
                ExecutionStatus.SUCCESS: "[ok]",
                ExecutionStatus.ERROR: "[error]",
                ExecutionStatus.TIMEOUT: "[timeout]",
                ExecutionStatus.RETRYING: "[retry]",
            }[exec_result.status]
            print(f"    {icon} {exec_result.status.value} "
                  f"({exec_result.duration_ms:.0f}ms)"
                  f"{f' [{exec_result.retries} retries]' if exec_result.retries else ''}")
            if exec_result.output:
                for line in exec_result.output.strip().split("\n")[:3]:
                    print(f"      {line}")
            if exec_result.status in (ExecutionStatus.ERROR, ExecutionStatus.TIMEOUT):
                print(f"      {exec_result.error[:100]}")
                failed = True
            results.append({
                "index": i,
                "type": "code",
                "status": exec_result.status.value,
                "output": exec_result.output,
                "error": exec_result.error,
            })
        success_count = sum(1 for r in results if r["status"] in ("ok", "success"))
        fail_count = sum(1 for r in results if r["status"] in ("error", "timeout"))
        skip_count = sum(1 for r in results if r["status"] == "skipped")
        print(f"\nSummary: {success_count} passed, {fail_count} failed, {skip_count} skipped")
        return {"results": results, "success": fail_count == 0}
def get_execution_report(self) -> str:
lines = ["Execution Report", "=" * 40]
for e in self.executions:
lines.append(
f"Cell [{e.cell_index}]: {e.status.value} "
f"({e.duration_ms:.0f}ms, {e.retries} retries)"
)
if e.error:
lines.append(f" Error: {e.error[:80]}")
return "n".join(lines)
async def advanced_demo():
    orchestrator = RobustNotebookOrchestrator(max_retries=2, timeout_seconds=5.0)
    notebook_cells = [
        {"type": "markdown", "source": "# Advanced Analysis"},
        {"type": "code", "source": "x = 42\nprint(f'x = {x}')"},
        {"type": "code", "source": "y = x * 2\nprint(f'y = x * 2 = {y}')"},
        {"type": "code", "source": "result = x + y\nprint(f'Result: {result}')"},
        # Deliberate NameError: broken_var is never defined.
        {"type": "code", "source": "broken_var + 1"},
        {"type": "code", "source": "print('This gets skipped')"},
        {"type": "code", "source": "print('Force executed')", "force_execute": True},
    ]
    result = await orchestrator.execute_notebook(notebook_cells)
    print(f"\n{orchestrator.get_execution_report()}")

asyncio.run(advanced_demo())
SUMMARY = """
╔══════════════════════════════════════════════════════════════════════╗
║  Tutorial Complete!                                                  ║
╠══════════════════════════════════════════════════════════════════════╣
║                                                                      ║
║  What You Learned:                                                   ║
║  ─────────────────                                                   ║
║  - MCP protocol fundamentals (tools/list, tools/call)                ║
║  - FastMCP framework (how colab-mcp is built)                        ║
║  - Session Proxy Mode (WebSocket bridge to browser)                  ║
║  - Runtime Mode (direct kernel execution)                            ║
║  - Full AI agent loop with tool dispatch                             ║
║  - Production integration with Claude/GPT-4/Gemini                   ║
║  - Error handling, retries, and orchestration patterns               ║
║                                                                      ║
║  Quick Start (on your local machine):                                ║
║  ────────────────────────────────────                                ║
║  1. pip install uv                                                   ║
║  2. Add to your MCP config:                                          ║
║     {                                                                ║
║       "mcpServers": {                                                ║
║         "colab-proxy-mcp": {                                         ║
║           "command": "uvx",                                          ║
║           "args": ["git+https://github.com/googlecolab/colab-mcp"],  ║
║           "timeout": 30000                                           ║
║         }                                                            ║
║       }                                                              ║
║     }                                                                ║
║  3. Open a Colab notebook in your browser                            ║
║  4. Tell your agent: "Build me a data analysis notebook."            ║
║                                                                      ║
║  Resources:                                                          ║
║  ──────────                                                          ║
║  - Repo:  github.com/googlecolab/colab-mcp                           ║
║  - Docs:  deepwiki.com/googlecolab/colab-mcp                         ║
║  - Forum: GitHub Discussions on the repo                             ║
║  - MCP:   modelcontextprotocol.io                                    ║
║                                                                      ║
╚══════════════════════════════════════════════════════════════════════╝
"""
print(SUMMARY)
We build a RobustNotebookOrchestrator that adds production-grade resilience on top of the runtime engine: automatic retries with exponential backoff for errors it treats as transient (such as ConnectionError or CUDA out of memory), configurable timeouts via asyncio.wait_for, and dependency-aware cell sequencing that skips downstream cells when an upstream cell fails, unless a cell sets force_execute. We execute a seven-cell notebook that includes one markdown cell, three successful computations, one deliberate NameError, one auto-skipped cell, and one force-executed cell, then print a structured execution report with per-cell status, duration, and retry counts. We close with a summary card listing everything we have covered and the four steps needed to go from this tutorial to a live colab-mcp setup on our own machine.
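The retry-and-timeout pattern described above can be sketched in isolation. This is a minimal illustration, not colab-mcp code: TransientError, retry_with_backoff, and the flaky coroutine are hypothetical names introduced here, and the delays are shortened so the example runs quickly. As in the orchestrator, only errors classified as transient are retried, while a timeout propagates immediately.

```python
import asyncio

# Hypothetical names for illustration; none of these come from colab-mcp.
class TransientError(Exception):
    """Stands in for a retryable failure such as a dropped connection."""

async def retry_with_backoff(make_call, max_retries=3, base_delay=0.05, timeout=1.0):
    """Await make_call() with a per-attempt timeout, retrying transient errors.

    Returns (result, delays), where delays lists the sleep taken before
    each retry. asyncio.TimeoutError is not caught, so timeouts are not retried.
    """
    delays = []
    for attempt in range(max_retries + 1):
        try:
            result = await asyncio.wait_for(make_call(), timeout=timeout)
            return result, delays
        except TransientError:
            if attempt == max_retries:
                raise  # retries exhausted
            delay = base_delay * (2 ** attempt)  # doubles on every retry
            delays.append(delay)
            await asyncio.sleep(delay)

async def demo():
    calls = {"n": 0}

    async def flaky():
        # Fails on the first two calls, then succeeds.
        calls["n"] += 1
        if calls["n"] < 3:
            raise TransientError("simulated connection drop")
        return "ok"

    return await retry_with_backoff(flaky)

result, delays = asyncio.run(demo())
print(result, delays)
```

Doubling the delay on each retry gives a struggling backend progressively more time to recover, whereas a fixed or linearly growing delay keeps the pressure roughly constant.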
In conclusion, we now have a working, end-to-end understanding of how colab-mcp turns Google Colab into a programmable workspace for AI agents. We have seen the MCP protocol from both sides, as server authors registering tools and as client code dispatching calls, and we understand why the dual-mode architecture exists: Session Proxy for interactive, browser-visible notebook manipulation, and Runtime for headless, direct kernel execution. We have built the same abstractions the real codebase uses (FastMCP servers, WebSocket bridges with token security, lazy-init resource chains), and we have run them ourselves rather than just reading about them. Most importantly, we have a clear path from this tutorial to real deployment: we take the MCP config JSON, point Claude Code or the Gemini CLI at it, open a Colab notebook, and start issuing natural-language commands that the agent translates into add_code_cell, execute_cell, and get_cells calls. The orchestration patterns (retries, timeouts, and skip-on-failure) give us the resilience we need when we move from demos to actual workflows involving large datasets, GPU-accelerated training, or multi-step analyses.
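As a small convenience, the MCP config from the summary card can be generated in Python rather than typed by hand. The sketch below only builds and serializes the JSON; build_colab_mcp_config is a hypothetical helper name, and where the resulting JSON must be saved depends on the MCP client in use, so no file path is assumed here.

```python
import json

# Hypothetical helper; the values mirror the config shown in the summary card.
def build_colab_mcp_config(timeout_ms: int = 30000) -> dict:
    return {
        "mcpServers": {
            "colab-proxy-mcp": {
                "command": "uvx",
                "args": ["git+https://github.com/googlecolab/colab-mcp"],
                "timeout": timeout_ms,
            }
        }
    }

config = build_colab_mcp_config()
config_json = json.dumps(config, indent=2)
print(config_json)  # paste into the config file your MCP client reads
```

Generating the config this way avoids the trailing-comma and quoting mistakes that are easy to make when editing JSON by hand.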
The post How to Design a Production-Ready AI Agent That Automates Google Colab Workflows Using Colab-MCP, MCP Tools, FastMCP, and Kernel Execution appeared first on MarkTechPost.
