# MCP Server Development
This reference covers building MCP servers from scratch – the server lifecycle, defining tools with proper JSON Schema, exposing resources, choosing transports, handling errors, and testing the result. If you want to understand when to use MCP versus alternatives, see the companion article on MCP Server Patterns. This article focuses on how to build one.
## Server Lifecycle
An MCP server goes through four phases: initialization, capability negotiation, operation, and shutdown.
Initialization. The server starts and registers its capabilities – tools, resources, and prompts. Nothing is served yet. This is where you set up database connections, load configuration, and prepare handler functions.
```typescript
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";

const server = new McpServer({
  name: "infra-tools",
  version: "1.0.0",
});
```

The Python equivalent:

```python
from mcp.server import Server

server = Server("infra-tools")
```

Capability negotiation. When a client connects, it sends an initialize request declaring its own capabilities. The server responds with its capabilities – which primitives it supports (tools, resources, prompts) and any optional features. This handshake ensures both sides know what the other can handle.
Operation. The server handles incoming requests: tools/list, tools/call, resources/list, resources/read, and so on. Each request is a JSON-RPC 2.0 message. The server processes them and returns results.
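On the wire, each of these is a plain JSON-RPC 2.0 envelope. A sketch of a tools/call request and its matching result – the id, tool name, and argument values here are illustrative, not prescribed:

```python
import json

# Illustrative tools/call request as the client sends it
request = {
    "jsonrpc": "2.0",
    "id": 7,
    "method": "tools/call",
    "params": {
        "name": "get_pod_logs",
        "arguments": {"namespace": "production", "pod_name": "web-abc123", "lines": 50},
    },
}

# A successful response echoes the request id and carries the tool output under "result"
response = {
    "jsonrpc": "2.0",
    "id": 7,
    "result": {"content": [{"type": "text", "text": "log line 1\nlog line 2"}]},
}

# On the stdio transport, each message is serialized as one newline-delimited JSON object
wire = json.dumps(request)
```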
Shutdown. The client disconnects or sends a shutdown signal. The server cleans up resources – closes database connections, flushes logs, releases locks. For stdio transport, this happens when the parent process terminates. For HTTP transports, the server may continue running for other clients.
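For a stdio server, the cleanup step can be as simple as a try/finally around the run loop, so teardown happens however the session ends. A minimal sketch – close_db_pool stands in for whatever resources your server actually holds:

```python
import asyncio
import contextlib

closed = []

async def close_db_pool():
    # Stand-in for real cleanup: close connections, flush logs, release locks
    closed.append("db-pool")

async def main():
    try:
        # Stand-in for server.run(...) over stdio; it returns or raises
        # when the client disconnects
        await asyncio.sleep(0)
    finally:
        # Runs whether the session ended cleanly or with an error;
        # suppress cleanup failures so they don't mask the original exception
        with contextlib.suppress(Exception):
            await close_db_pool()

asyncio.run(main())
```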
## Tool Definitions with JSON Schema
Every tool needs a name, description, input schema, and handler. The input schema uses JSON Schema, which gives the agent structured information about what parameters to provide.
### Basic Tool Definition
```typescript
import { z } from "zod";

server.tool(
  "get_pod_logs",
  "Retrieve logs from a Kubernetes pod. Returns the last N lines of log output from the specified pod in the given namespace.",
  {
    namespace: z.string().describe("Kubernetes namespace (e.g., 'production', 'staging')"),
    pod_name: z.string().describe("Exact pod name or prefix. If prefix, returns logs from the first matching pod."),
    lines: z.number().int().min(1).max(10000).default(100)
      .describe("Number of log lines to return. Defaults to 100."),
    container: z.string().optional()
      .describe("Container name, required only for multi-container pods."),
  },
  async ({ namespace, pod_name, lines, container }) => {
    const args = ["logs", pod_name, "-n", namespace, `--tail=${lines}`];
    if (container) args.push("-c", container);
    const result = await exec("kubectl", args);
    return {
      content: [{ type: "text", text: result.stdout }],
    };
  }
);
```

The Python equivalent using the decorator pattern:
```python
import asyncio

import mcp.types as types

@server.tool()
async def get_pod_logs(
    namespace: str,
    pod_name: str,
    lines: int = 100,
    container: str | None = None,
) -> list[types.TextContent]:
    """Retrieve logs from a Kubernetes pod.

    Returns the last N lines of log output from the specified pod
    in the given namespace.
    """
    cmd = ["kubectl", "logs", pod_name, "-n", namespace, f"--tail={lines}"]
    if container:
        cmd.extend(["-c", container])
    result = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await result.communicate()
    if result.returncode != 0:
        return [types.TextContent(type="text", text=f"Error: {stderr.decode()}")]
    return [types.TextContent(type="text", text=stdout.decode())]
```

### Schema Design Principles
Write schemas that prevent misuse, not just schemas that describe inputs.
Use enums for constrained values. If a parameter only accepts specific values, declare them. The agent will not guess wrong if the schema constrains the options.
```typescript
environment: z.enum(["production", "staging", "development"])
  .describe("Target environment"),
```

Set defaults for optional parameters. If most callers want 100 log lines, make that the default. The agent can skip the parameter for the common case.
Use descriptions that explain intent, not just type. "The namespace" is useless. "Kubernetes namespace where the target pod runs (e.g., 'production', 'staging')" tells the agent what value to provide.
Validate beyond what JSON Schema catches. JSON Schema validates structure. Your handler must validate semantics – path traversal, SQL injection, resource existence.
```typescript
import { resolve, sep } from "node:path";

async ({ path }) => {
  const resolved = resolve(path);
  // Compare against the root plus a separator so "/allowed-evil"
  // cannot slip past a prefix check against "/allowed"
  if (resolved !== ALLOWED_ROOT && !resolved.startsWith(ALLOWED_ROOT + sep)) {
    return {
      isError: true,
      content: [{ type: "text", text: `Access denied: ${path} is outside the allowed directory` }],
    };
  }
  // proceed with validated path
}
```

## Resource Providers
Resources expose data the agent can read. Unlike tools, resources are not invoked with parameters – they have URIs and return content when read. Think of them as read-only endpoints.
```typescript
import { readFile } from "node:fs/promises";

server.resource(
  "cluster-config",
  "k8s://config/current",
  async (uri) => {
    const config = await readFile(KUBECONFIG_PATH, "utf-8");
    return {
      contents: [{
        uri: uri.href,
        mimeType: "application/yaml",
        text: config,
      }],
    };
  }
);
```

For dynamic resources where the URI determines what to return, use resource templates – or, with the low-level Python API, paired list and read handlers:
```python
import json

import mcp.types as types
from pydantic import AnyUrl

@server.list_resources()
async def list_resources() -> list[types.Resource]:
    namespaces = await get_namespaces()
    return [
        types.Resource(
            uri=f"k8s://namespaces/{ns}/pods",
            name=f"Pods in {ns}",
            description=f"List of running pods in the {ns} namespace",
            mimeType="application/json",
        )
        for ns in namespaces
    ]

@server.read_resource()
async def read_resource(uri: AnyUrl) -> str:
    parts = str(uri).replace("k8s://", "").split("/")
    namespace = parts[1]
    resource_type = parts[2]
    if resource_type == "pods":
        pods = await get_pods(namespace)
        return json.dumps(pods, indent=2)
    raise ValueError(f"Unknown resource: {uri}")
```

Resources are best for data that the agent might want to browse or reference without executing an operation – configuration files, environment variables, cluster state.
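String-splitting works while there is a single URI shape, but it gets fragile as schemes grow. A sketch of the same k8s:// routing using the standard library's urllib.parse instead – parse_k8s_uri is a hypothetical helper, not part of the SDK:

```python
from urllib.parse import urlparse

def parse_k8s_uri(uri: str) -> tuple[str, str]:
    """Split k8s://namespaces/<ns>/<resource_type> into (namespace, resource_type)."""
    parsed = urlparse(uri)
    # urlparse puts "namespaces" in netloc and "/<ns>/<type>" in path
    if parsed.scheme != "k8s" or parsed.netloc != "namespaces":
        raise ValueError(f"Unknown resource: {uri}")
    namespace, _, resource_type = parsed.path.strip("/").partition("/")
    if not namespace or not resource_type:
        raise ValueError(f"Unknown resource: {uri}")
    return namespace, resource_type
```

The scheme and host checks reject foreign URIs up front instead of failing later on an index error.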
## Transport Options
### stdio: Local Development and CLI Tools
The agent spawns your server as a subprocess. Communication happens over stdin/stdout using JSON-RPC 2.0.
```typescript
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";

const transport = new StdioServerTransport();
await server.connect(transport);
```

This is the fastest transport to implement. No ports, no authentication, no networking. The server’s lifetime is tied to the agent’s session.
Practical constraints: stdout is reserved for MCP messages. All logging must go to stderr. If your tool accidentally writes to stdout, it will corrupt the MCP protocol stream and crash the connection.
```python
import sys
import logging

# Configure logging to stderr, never stdout
logging.basicConfig(stream=sys.stderr, level=logging.INFO)
```

### HTTP with Server-Sent Events
For remote servers that need to serve multiple clients.
```typescript
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import express from "express";

const app = express();
// Track one transport per connected client, keyed by session ID
const transports: Record<string, SSEServerTransport> = {};

app.get("/sse", async (req, res) => {
  const transport = new SSEServerTransport("/message", res);
  transports[transport.sessionId] = transport;
  res.on("close", () => delete transports[transport.sessionId]);
  await server.connect(transport);
});

app.post("/message", async (req, res) => {
  // The client echoes the session ID it was assigned on the SSE stream
  const transport = transports[req.query.sessionId as string];
  if (!transport) {
    res.status(400).send("Unknown session");
    return;
  }
  await transport.handlePostMessage(req, res);
});

app.listen(3001);
```

HTTP+SSE supports multiple concurrent clients and persistent server state. It also introduces complexity: you need authentication, session management, and health monitoring.
### Streamable HTTP
The newest transport. A single HTTP endpoint handles both requests and responses. The server can optionally upgrade to SSE streaming for long-running operations.
```typescript
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { randomUUID } from "node:crypto";

const transport = new StreamableHTTPServerTransport({ sessionIdGenerator: () => randomUUID() });
await server.connect(transport);
```

Prefer Streamable HTTP over HTTP+SSE for new remote servers. It is simpler to deploy (one endpoint instead of two) and handles stateless interactions without requiring an SSE connection.
## Error Handling Patterns
Errors in MCP servers fall into two categories: tool execution errors (the operation failed) and protocol errors (the request was malformed). Handle them differently.
Tool execution errors use the isError flag. The server stays running; the agent receives a structured error it can reason about.
```typescript
async ({ query }) => {
  try {
    const result = await db.execute(query);
    return { content: [{ type: "text", text: JSON.stringify(result) }] };
  } catch (err) {
    return {
      isError: true,
      content: [{ type: "text", text: `Query failed: ${err.message}. Check syntax and table names.` }],
    };
  }
}
```

Protocol errors use JSON-RPC error codes. These are thrown when the request itself is invalid – unknown tool name, missing required parameter, malformed JSON.
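For reference, a protocol error on the wire is a JSON-RPC error object carrying one of the standard codes, such as -32601 (method not found) or -32602 (invalid params). A sketch – the id and message text are illustrative:

```python
import json

# Illustrative JSON-RPC 2.0 error response for a request naming an unknown method
error_response = {
    "jsonrpc": "2.0",
    "id": 7,  # echoes the id of the failing request
    "error": {
        "code": -32601,  # standard JSON-RPC "method not found"
        "message": "Method not found",
    },
}

wire = json.dumps(error_response)
```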
Timeouts must be handled in the handler. If a kubectl command hangs, your server hangs. Always set execution timeouts.
```python
import asyncio

async def run_with_timeout(cmd: list[str], timeout: int = 30) -> str:
    proc = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
        return stdout.decode()
    except asyncio.TimeoutError:
        proc.kill()
        raise TimeoutError(f"Command timed out after {timeout}s: {' '.join(cmd)}")
```

## Testing Strategies
MCP servers need three levels of testing.
Unit tests for handlers. Test each tool handler in isolation. Mock external dependencies (kubectl, APIs, databases) and verify the handler returns correct MCP response structures.
```typescript
describe("get_pod_logs", () => {
  it("returns logs for a valid pod", async () => {
    mockExec.mockResolvedValue({ stdout: "log line 1\nlog line 2\n" });
    const result = await handler({ namespace: "default", pod_name: "web-abc123", lines: 10 });
    expect(result.isError).toBeUndefined();
    expect(result.content[0].text).toContain("log line 1");
    expect(mockExec).toHaveBeenCalledWith("kubectl", ["logs", "web-abc123", "-n", "default", "--tail=10"]);
  });

  it("returns error for nonexistent pod", async () => {
    mockExec.mockRejectedValue(new Error("pod not found"));
    const result = await handler({ namespace: "default", pod_name: "ghost", lines: 10 });
    expect(result.isError).toBe(true);
  });
});
```

Integration tests with the MCP client. Use the MCP SDK’s client to connect to your server and exercise the full protocol – tool discovery, invocation, and error handling.
```python
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def test_tool_discovery():
    params = StdioServerParameters(command="python", args=["server.py"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            tool_names = [t.name for t in tools.tools]
            assert "get_pod_logs" in tool_names
```

End-to-end tests with real infrastructure. For tools that interact with Kubernetes, databases, or APIs, run tests against a real (or local) environment. Use minikube, Docker Compose, or test databases. These tests are slower but catch real integration issues – wrong kubectl flags, permission errors, unexpected output formats.
Tag these tests so they can be skipped in CI when the infrastructure is not available:
```python
import shutil

import pytest

@pytest.mark.integration
@pytest.mark.skipif(not shutil.which("kubectl"), reason="kubectl not available")
async def test_real_pod_logs():
    result = await get_pod_logs(namespace="default", pod_name="nginx", lines=5)
    assert len(result) > 0
```

## Project Structure
A well-organized MCP server project looks like this:
```
my-mcp-server/
  src/
    server.ts           # Server setup and transport
    tools/
      kubernetes.ts     # K8s-related tools
      database.ts       # DB-related tools
    resources/
      config.ts         # Configuration resources
    utils/
      validation.ts     # Input validation helpers
      exec.ts           # Command execution with timeout
  tests/
    tools/
      kubernetes.test.ts
      database.test.ts
    integration/
      client.test.ts
  package.json
  tsconfig.json
```

Group tools by domain. Each file exports tool registration functions that the main server file calls during initialization. This keeps individual files focused and testable.
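The same registration pattern, sketched in Python with a minimal stand-in registry so it runs without the SDK – ToolRegistry and register_kubernetes_tools are hypothetical names for illustration:

```python
# Minimal stand-in for a server object: anything exposing a .tool() decorator,
# matching the shape used in the examples above.
class ToolRegistry:
    def __init__(self):
        self.tools = {}

    def tool(self):
        def decorator(fn):
            self.tools[fn.__name__] = fn
            return fn
        return decorator

# tools/kubernetes.py would export a function like this:
def register_kubernetes_tools(server):
    @server.tool()
    async def get_pod_logs(namespace: str, pod_name: str, lines: int = 100) -> str:
        return f"(logs for {pod_name} in {namespace}, last {lines} lines)"

# The server entry point calls each domain's register function during initialization
registry = ToolRegistry()
register_kubernetes_tools(registry)
```

Because each domain module only needs the server object passed in, its tools can be registered against a test double in unit tests exactly as shown here.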