Client‑Side MCP That Works: Notes from an OSS Dev
Lessons I learned from implementing marimo's MCP client
Model Context Protocol (MCP) is generating increasing buzz as the open standard for wiring AI models to external tools and data sources. MCP, particularly client implementation, is also where the hard choices live: transports, primitives, outputs, error semantics, and state. I learned this firsthand co-building an OSS MCP client for voideditor.com, an open-source alternative to Cursor (26k+★), and then building one myself for marimo.io, an AI-native Python notebook (15k+★). With marimo’s MCP merged recently, here’s a short case study and the patterns I wish I’d known on day one.
If you find this post helpful, type your email and hit Subscribe. I’ll send the next installment straight to your inbox.
What I implemented for Marimo’s MCP (and why)
1) Transports
There are only two transports in the MCP spec, both of which I support: STDIO for local processes and Streamable HTTP for remote/modern servers. SSE is intentionally excluded because the MCP spec has deprecated it. Streamable HTTP is the path forward; STDIO remains great for local CLIs. Each server gets a normal tool‑call timeout and a shorter health‑check timeout so long‑running tools don’t hide a dead connection.
# transports.py
from contextlib import AsyncExitStack
from enum import Enum
from typing import Any, Protocol, Tuple, Union

from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from mcp.shared.message import SessionMessage

TransportStreams = Tuple[
    MemoryObjectReceiveStream[Union[SessionMessage, Exception]],
    MemoryObjectSendStream[SessionMessage],
]


class MCPTransportType(str, Enum):
    STDIO = "stdio"
    STREAMABLE_HTTP = "streamable_http"


class TransportConnector(Protocol):
    async def connect(
        self, exit_stack: AsyncExitStack, **kwargs: Any
    ) -> TransportStreams: ...


class StdioConnector:
    async def connect(
        self, exit_stack: AsyncExitStack, command: str, args: list[str] | None = None
    ) -> TransportStreams:
        from mcp import StdioServerParameters
        from mcp.client.stdio import stdio_client

        # StdioServerParameters is a pydantic model, so it needs keyword args.
        read, write, *_ = await exit_stack.enter_async_context(
            stdio_client(StdioServerParameters(command=command, args=args or []))
        )
        return read, write


class StreamableHTTPConnector:
    async def connect(
        self,
        exit_stack: AsyncExitStack,
        url: str,
        headers: dict[str, str] | None = None,
        timeout: float = 30.0,
    ) -> TransportStreams:
        from mcp.client.streamable_http import streamablehttp_client

        # streamablehttp_client yields (read, write, get_session_id); drop the extra.
        read, write, *_ = await exit_stack.enter_async_context(
            streamablehttp_client(url, headers=headers or {}, timeout=timeout)
        )
        return read, write


class TransportRegistry:
    def __init__(self) -> None:
        self._registry: dict[MCPTransportType, TransportConnector] = {
            MCPTransportType.STDIO: StdioConnector(),
            MCPTransportType.STREAMABLE_HTTP: StreamableHTTPConnector(),
        }

    def get(self, transport: MCPTransportType) -> TransportConnector:
        return self._registry[transport]
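To show how the registry and the two timeouts fit together, here’s a minimal connection sketch. The MCPServerDefinition shape and its default values are my illustration, not marimo’s actual config schema; ClientSession and its read_timeout_seconds parameter come from the official mcp SDK.
# connect.py (sketch): MCPServerDefinition is illustrative, not marimo's schema
from contextlib import AsyncExitStack
from dataclasses import dataclass, field
from datetime import timedelta

from mcp import ClientSession

from transports import MCPTransportType, TransportRegistry


@dataclass
class MCPServerDefinition:
    name: str
    transport: MCPTransportType
    connect_kwargs: dict = field(default_factory=dict)
    tool_timeout: float = 30.0   # generous: tools may legitimately run long
    health_timeout: float = 5.0  # short: a dead connection should surface fast


async def connect_server(
    defn: MCPServerDefinition,
    registry: TransportRegistry,
    exit_stack: AsyncExitStack,
) -> ClientSession:
    """Open a transport, then initialize an MCP session over it."""
    connector = registry.get(defn.transport)
    read, write = await connector.connect(exit_stack, **defn.connect_kwargs)
    session = await exit_stack.enter_async_context(
        ClientSession(
            read, write, read_timeout_seconds=timedelta(seconds=defn.tool_timeout)
        )
    )
    await session.initialize()
    return session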
2) Scope the surface: Tools first
Marimo’s v1 goal was simple: make notebook‑aware tools discoverable and callable from any AI client. So the client wires list_tools() → registry → call_tool(). Resources and Prompts are part of MCP, but keeping them out of v1 kept complexity and support burden in check.
# discovery.py
from mcp import ClientSession
from mcp.types import CallToolResult, Tool


async def discover_tools(session: ClientSession, server_name: str) -> list[Tool]:
    """List tools from an MCP server and attach useful metadata."""
    tools = (await session.list_tools()).tools
    for t in tools:
        t.meta = {
            "server_name": server_name,
            "namespaced_name": f"mcp_{server_name}_{t.name}",
        }
    return tools


async def call_tool(
    session: ClientSession, tool: Tool, arguments: dict | None = None
) -> CallToolResult:
    """Simple wrapper that preserves the same signature marimo uses."""
    return await session.call_tool(tool.name, arguments)
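To close the loop from list_tools() to the registry, and to implement the purge-before-discovery behavior discussed in section 4, here’s a sketch; refresh_tools is a hypothetical helper name, not marimo’s actual function.
# registry.py (sketch): purge a server's stale entries, then re-register
from mcp import ClientSession
from mcp.types import Tool

from discovery import discover_tools


async def refresh_tools(
    session: ClientSession, server_name: str, registry: dict[str, Tool]
) -> None:
    """Clear stale entries first so a reconnect never leaves ghost tools behind."""
    prefix = f"mcp_{server_name}_"
    for key in [k for k in registry if k.startswith(prefix)]:
        del registry[key]
    for tool in await discover_tools(session, server_name):
        registry[tool.meta["namespaced_name"]] = tool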
3) Output types
Right now I only parse TextContent. If a tool fails, I don’t throw; I return a CallToolResult with isError: true and a TextContent explaining what happened. That lets the model reason and repair, and it reserves transport exceptions for actual transport failures. Structured/embedded content will land later once we turn on Resources/Prompts. I also prefix system errors with “Internal Error:” so the LLM doesn’t try to debug them, too.
Possible improvements: add additional useful metadata to responses, such as is_retryable, to let the LLM know whether rerunning the same tool might yield a different result. I haven’t tried this yet, but I noticed that Cursor includes it, so it’s worth a shot.
# errors.py
from mcp.types import CallToolResult, TextContent


def create_error(msg: str, retryable: bool | None = None) -> CallToolResult:
    """Return a model-visible error payload."""
    meta = {"is_retryable": retryable} if retryable is not None else {}
    structured = {"code": "INTERNAL_ERROR", "message": msg}
    if retryable is not None:
        structured["is_retryable"] = retryable
    return CallToolResult(
        isError=True,
        # "Internal Error:" tells the model this isn't its fault to debug.
        content=[TextContent(type="text", text=f"Internal Error: {msg}")],
        _meta=meta,
        structuredContent=structured,
    )


def is_error(result: CallToolResult) -> bool:
    return getattr(result, "isError", False) is True


def extract_text(result: CallToolResult) -> list[str]:
    return [
        c.text for c in (result.content or []) if getattr(c, "type", None) == "text"
    ]
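A quick usage sketch with the helpers above; the rate-limit scenario is illustrative:
# usage sketch (illustrative): a rate-limited call is worth retrying
from errors import create_error, extract_text, is_error

result = create_error("Upstream API returned 429 Too Many Requests", retryable=True)
assert is_error(result)
print(extract_text(result))
# ['Internal Error: Upstream API returned 429 Too Many Requests']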
4) Health and error semantics
Config: We ship a curated list of servers, so config parse errors are logged and skipped. No user‑editable file to corrupt.
Server init failures (connect/init): If the transport handshake or ClientSession.initialize() fails, set ERROR, close the AsyncExitStack, don’t start health monitoring, and keep the registry clean: tools are purged or never registered because we clear stale entries before discovery. Note: when we tested various misconfigured servers with the MCP client, we discovered that a server can hang during initialization without emitting an error, so you might want to weigh the pros and cons of implementing a timeout (see the sketch after the ping helper below).
Server health (UI‑facing, fail‑closed): A short‑interval ping checks liveness. On timeout or error, set status ERROR, close resources, and purge that server’s tools. The UI shows the state; with tools removed, the model can’t call it.
# health.py
import asyncio

from mcp import ClientSession


async def ping(session: ClientSession, timeout: float = 5.0) -> bool:
    """Return True if the server responds within the timeout window."""
    try:
        await asyncio.wait_for(session.send_ping(), timeout)
        return True
    except Exception:  # timeout or any transport error counts as unhealthy
        return False
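Given the hang-during-initialization behavior noted above, one option is to bound initialize() the same way. This is a sketch rather than marimo’s shipped code; init_with_timeout and the 10-second budget are placeholders.
# init.py (sketch): bound initialize() so a hung server can't stall startup
import asyncio
from contextlib import AsyncExitStack

from mcp import ClientSession


async def init_with_timeout(
    session: ClientSession, exit_stack: AsyncExitStack, timeout: float = 10.0
) -> bool:
    """Fail closed: on timeout or error, tear everything down and report failure."""
    try:
        await asyncio.wait_for(session.initialize(), timeout)
        return True
    except Exception:  # includes asyncio.TimeoutError
        await exit_stack.aclose()  # closes the transport opened on this stack
        return False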
Tool failures (model-facing): Timeouts, missing tools, name mismatches, or other validation issues return a CallToolResult with isError: true and concise TextContent the model can act on. Keep system/transport details minimal—the model can’t fix them.
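Here’s a sketch of that boundary, reusing create_error from above; safe_call_tool and the 30-second budget are illustrative, not marimo’s API.
# safe_call.py (sketch): convert tool-level failures into model-visible results
import asyncio

from mcp import ClientSession
from mcp.types import CallToolResult, Tool

from errors import create_error


async def safe_call_tool(
    session: ClientSession,
    registry: dict[str, Tool],
    namespaced: str,
    arguments: dict | None = None,
    timeout: float = 30.0,
) -> CallToolResult:
    tool = registry.get(namespaced)
    if tool is None:
        # Name mismatch or stale reference: tell the model, don't raise.
        return create_error(f"Unknown tool: {namespaced}", retryable=False)
    try:
        return await asyncio.wait_for(session.call_tool(tool.name, arguments), timeout)
    except asyncio.TimeoutError:
        return create_error(
            f"Tool '{tool.name}' timed out after {timeout}s", retryable=True
        )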
5) State model that stays clean
Servers (spec): servers: {name → MCPServerDefinition}
Connections (live): connections: {name → MCPServerConnection}
Tools (global index): tool_registry: {namespaced → Tool} where namespaced = mcp_{server}_{tool} plus readable counters when needed. Each connection also keeps connection.tools so teardown is O(1).
# state.py
from collections import defaultdict

from mcp.types import Tool


class ClientState:
    """In‑memory single source of truth for everything that changes at runtime."""

    def __init__(self) -> None:
        # MCPServerDefinition / MCPServerConnection are defined elsewhere.
        self.servers: dict[str, "MCPServerDefinition"] = {}
        self.connections: dict[str, "MCPServerConnection"] = {}
        self.tool_registry: dict[str, Tool] = {}
        self._counters: defaultdict[str, int] = defaultdict(int)

    def namespaced(self, server: str, tool: str) -> str:
        """Return a globally unique name, adding a counter on collision."""
        base = f"mcp_{server}_{tool}"
        if base not in self.tool_registry:
            return base
        self._counters[server] += 1
        return f"mcp_{server}{self._counters[server]}_{tool}"
Health: health_check_tasks: {server → task}. Counters reset on disconnect so reconnects start clean. Ping only CONNECTED servers—never ERROR, DISCONNECTED, or CONNECTING—so you can isolate and drop a sick server without touching the rest.
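Putting those rules together, the monitoring loop might look like the sketch below. Status, purge_tools, and monitor are illustrative stand-ins for marimo’s internals, and it assumes MCPServerConnection exposes status, session, and exit_stack attributes.
# monitor.py (sketch): ping CONNECTED servers only, fail closed on the sick one
import asyncio
from enum import Enum

from health import ping  # the ping() helper defined above
from state import ClientState


class Status(str, Enum):  # illustrative; marimo's actual states may differ
    CONNECTING = "connecting"
    CONNECTED = "connected"
    DISCONNECTED = "disconnected"
    ERROR = "error"


def purge_tools(state: ClientState, server: str) -> None:
    """Drop every mcp_{server}_* entry so the model can't call a dead server."""
    prefix = f"mcp_{server}"
    stale = [k for k in state.tool_registry if k.startswith(prefix)]
    for key in stale:
        del state.tool_registry[key]


async def monitor(state: ClientState, name: str, interval: float = 15.0) -> None:
    """Runs as state.health_check_tasks[name]; never pings non-CONNECTED servers."""
    conn = state.connections[name]
    while conn.status is Status.CONNECTED:
        await asyncio.sleep(interval)
        if not await ping(conn.session):
            conn.status = Status.ERROR      # the UI surfaces this state
            await conn.exit_stack.aclose()  # fail closed: free transport + session
            purge_tools(state, name)        # remove tools from the model's view
            break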
Patterns that mattered
Single Source of Truth module. One place owns transports, sessions, tool registry, and health. Easier to test, safer to extend.
Lazy, namespaced tools. Discover after a real session; namespace everything; purge on error. No collisions, no ghosts.
Fail‑closed health. Ping with short timeouts. If it flaps, mark ERROR and remove tools—don’t let the model waste tokens on dead endpoints.
Model-visible tool errors. Don’t throw for business/API failures. Return results with isError: true and readable text (optionally structured). Reserve protocol/JSON‑RPC errors for transport or invalid‑request faults.
Anti‑patterns this avoids
Leaky protocol in call sites. When call sites depend on today’s transport, primitives, return shapes, and error types, swapping SSE → Streamable HTTP or adding structured outputs, elicitation, streamable results, or new error categories triggers repo‑wide changes.
Static snapshot & global collisions. A cached global tool list drifts, survives crashes, and names collide across servers, confusing the model.
Dead‑but‑Visible Tools (Zombie Registry). After a server restart, network drop, stream stall, or redeploy, the client keeps a stale tool list; calls hang/fail and the model burns tokens.
Throwing exceptions for tool failures. The model can’t see them. Return isError: true with text and let the agent recover.
If you found this post useful, share it with a friend and consider subscribing. I will be sharing more lessons from the trenches of open‑source, Gen AI, and MCP every week.