Building LLM Harnesses#
A harness is the infrastructure that wraps LLM calls into a reliable, testable, and observable workflow. It handles the concerns that a raw API call does not: input preparation, output validation, error recovery, model routing, parallel execution, and quality scoring. Without a harness, you have a script. With one, you have a tool.
Harness Architecture#
Input
  │
  ├── Preprocessing (validate input, select model, prepare prompt)
  │
  ├── Execution (call Ollama with timeout, retry on failure)
  │
  ├── Post-processing (parse output, validate schema, score quality)
  │
  ├── Routing (if quality too low, escalate to larger model or flag)
  │
  └── Output (structured result + metadata)

Core Harness in Python#
import ollama
import json
import time
from dataclasses import dataclass, field
from typing import Callable

@dataclass
class LLMResult:
    content: str
    model: str
    tokens_in: int
    tokens_out: int
    duration_ms: int
    ttft_ms: int
    success: bool
    retries: int = 0
    score: float | None = None
    metadata: dict = field(default_factory=dict)

@dataclass
class HarnessConfig:
    model: str = "qwen2.5-coder:7b"
    temperature: float = 0.0
    max_tokens: int = 1024
    json_mode: bool = False
    # Note: timeout_seconds is declared for callers to honor; call_llm below does
    # not enforce it itself, so apply it at the client or process level if needed.
    timeout_seconds: int = 120
    max_retries: int = 2
    retry_delay_seconds: float = 1.0

def call_llm(
    messages: list[dict],
    config: HarnessConfig,
) -> LLMResult:
    """Make a single LLM call with timing metadata."""
    start = time.monotonic()
    kwargs = {
        "model": config.model,
        "messages": messages,
        "options": {
            "temperature": config.temperature,
            "num_predict": config.max_tokens,
        },
        "stream": False,
    }
    if config.json_mode:
        kwargs["format"] = "json"
    try:
        response = ollama.chat(**kwargs)
        duration = int((time.monotonic() - start) * 1000)
        return LLMResult(
            content=response["message"]["content"],
            model=config.model,
            tokens_in=response.get("prompt_eval_count", 0),
            tokens_out=response.get("eval_count", 0),
            duration_ms=duration,
            # Ollama reports durations in nanoseconds; prompt eval time approximates TTFT
            ttft_ms=int(response.get("prompt_eval_duration", 0) / 1_000_000),
            success=True,
        )
    except Exception as e:
        duration = int((time.monotonic() - start) * 1000)
        return LLMResult(
            content=str(e),
            model=config.model,
            tokens_in=0,
            tokens_out=0,
            duration_ms=duration,
            ttft_ms=0,
            success=False,
        )
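A quick usage sketch, assuming an Ollama server is running locally and the configured model has already been pulled:

config = HarnessConfig(model="qwen2.5-coder:7b", json_mode=True, max_tokens=256)
result = call_llm(
    [{"role": "user", "content": 'Return {"status": "ok"} as JSON.'}],
    config,
)
if result.success:
    print(result.content, f"({result.duration_ms} ms, {result.tokens_out} tokens)")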

Retry with Validation#

Do not retry blindly. Retry only when the output fails validation:

def call_with_retry(
    messages: list[dict],
    config: HarnessConfig,
    validator: Callable[[str], bool] | None = None,
) -> LLMResult:
    """Call LLM with retries on validation failure."""
    last_result = None
    for attempt in range(config.max_retries + 1):
        result = call_llm(messages, config)
        result.retries = attempt
        if result.success:
            # Validate output
            if validator is None or validator(result.content):
                return result
        last_result = result
        # Only sleep if another attempt is coming
        if attempt < config.max_retries:
            time.sleep(config.retry_delay_seconds)
    # All retries exhausted
    if last_result:
        last_result.success = False
        last_result.metadata["failure_reason"] = "validation_failed_after_retries"
    return last_result

Validators#

def json_validator(content: str) -> bool:
    """Check that output is valid JSON."""
    try:
        json.loads(content)
        return True
    except json.JSONDecodeError:
        return False

def schema_validator(schema: dict):
    """Return a validator that checks JSON matches a schema."""
    required_fields = schema.get("required", [])
    enum_fields = {
        k: v["enum"]
        for k, v in schema.get("properties", {}).items()
        if "enum" in v
    }

    def validate(content: str) -> bool:
        try:
            data = json.loads(content)
        except json.JSONDecodeError:
            return False
        for field_name in required_fields:
            if field_name not in data:
                return False
        for field_name, allowed_values in enum_fields.items():
            if field_name in data and data[field_name] not in allowed_values:
                return False
        return True

    return validate
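A usage sketch with an illustrative schema (the field names and enum values are made up for the example):

example_schema = {
    "required": ["category", "priority"],
    "properties": {
        "category": {"enum": ["billing", "technical", "account"]},
        "priority": {"enum": ["low", "medium", "high"]},
    },
}

validate = schema_validator(example_schema)
validate('{"category": "billing", "priority": "high"}')  # True
validate('{"category": "refund", "priority": "high"}')   # False: value not in enum
validate('{"category": "billing"}')                      # False: missing required field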

Model Routing#

Route tasks to different models based on complexity, confidence, or cost budget:

@dataclass
class ModelTier:
    model: str
    max_tokens: int
    cost_per_call: float  # For budgeting, even if $0 for local

TIERS = {
    "fast": ModelTier("qwen3:4b", 512, 0.0),
    "balanced": ModelTier("qwen2.5-coder:7b", 1024, 0.0),
    "capable": ModelTier("qwen2.5-coder:32b", 2048, 0.0),
    "frontier": ModelTier("claude-sonnet", 4096, 0.003),  # Cloud fallback
}

def route_task(task_type: str, input_length: int) -> str:
    """Select model tier based on task characteristics."""
    if task_type in ["extraction", "classification", "routing"]:
        return "fast"
    elif task_type in ["summarization", "format_conversion"]:
        return "balanced"
    elif task_type in ["code_review", "correlation", "refactoring"]:
        return "capable"
    elif task_type in ["architecture", "complex_reasoning"]:
        return "capable"  # Try local first, escalate if needed
    return "balanced"

def escalate_if_needed(result: LLMResult, tier: str) -> str | None:
    """Return the name of the next tier if quality is insufficient, else None."""
    if result.score is not None and result.score < 0.7:
        next_tier = {
            "fast": "balanced",
            "balanced": "capable",
            "capable": "frontier",
        }.get(tier)
        if next_tier:
            return next_tier  # Caller re-runs with the higher tier
    return None
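One way a caller might tie route_task, TIERS, and escalate_if_needed together, as a sketch: score_fn stands in for whatever scoring function the workflow uses, and the frontier tier would need its own client rather than Ollama.

def run_with_routing(task_type: str, messages: list[dict], score_fn) -> LLMResult:
    """Run a task on the routed tier, escalating while quality stays low."""
    tier = route_task(task_type, input_length=len(messages[-1]["content"]))
    while True:
        spec = TIERS[tier]
        config = HarnessConfig(model=spec.model, max_tokens=spec.max_tokens)
        result = call_with_retry(messages, config)
        result.score = score_fn(result.content) if result.success else 0.0
        next_tier = escalate_if_needed(result, tier)
        if next_tier is None:
            return result
        tier = next_tier  # Re-run the same messages on the bigger model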

Confidence-Based Escalation#

def classify_with_escalation(text: str) -> dict:
    """Classify with automatic escalation on low confidence."""
    # Try fast model first
    config = HarnessConfig(model="qwen3:4b", json_mode=True, max_tokens=256)
    result = call_with_retry(
        [{"role": "user", "content": classification_prompt(text)}],
        config,
        validator=json_validator,
    )
    if result.success:
        data = json.loads(result.content)
        if data.get("confidence", 0) >= 0.8:
            return data  # Fast model is confident
    # Escalate to balanced model
    config.model = "qwen2.5-coder:7b"
    result = call_with_retry(
        [{"role": "user", "content": classification_prompt(text)}],
        config,
        validator=json_validator,
    )
    if result.success:
        return json.loads(result.content)
    # All local models failed — flag for review
    return {"category": "unknown", "confidence": 0.0, "needs_review": True}

Parallel Execution#
For batch operations (summarizing many files, classifying many items), run calls in parallel:
from concurrent.futures import ThreadPoolExecutor, as_completed

def batch_process(
    items: list[dict],
    prompt_fn: Callable[[dict], list[dict]],
    config: HarnessConfig,
    max_workers: int = 3,
    validator: Callable[[str], bool] | None = None,
) -> list[LLMResult]:
    """Process a batch of items in parallel."""
    results = [None] * len(items)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {}
        for i, item in enumerate(items):
            messages = prompt_fn(item)
            future = executor.submit(call_with_retry, messages, config, validator)
            futures[future] = i
        for future in as_completed(futures):
            idx = futures[future]
            results[idx] = future.result()
    return results

Keep max_workers at 3-4 for Ollama: it processes one inference at a time per model, so additional workers simply queue up, and more than 3-4 workers increases memory pressure without improving throughput.
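A usage sketch for batch summarization; summarize_prompt and the input texts are hypothetical:

def summarize_prompt(item: dict) -> list[dict]:
    return [{"role": "user", "content": f"Summarize in three bullets:\n\n{item['text']}"}]

items = [{"text": doc} for doc in documents]  # documents: list[str] loaded elsewhere
results = batch_process(
    items,
    prompt_fn=summarize_prompt,
    config=HarnessConfig(model="qwen2.5-coder:7b", max_tokens=512),
    max_workers=3,
    validator=lambda content: len(content.strip()) > 0,
)
failed = [i for i, r in enumerate(results) if not r.success]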
Scoring Pipeline#
Attach scoring to every harness run for quality tracking:

def scored_extraction(
    text: str,
    schema: dict,
    expected: dict | None = None,
) -> LLMResult:
    """Run extraction with optional scoring against expected output."""
    config = HarnessConfig(model="qwen2.5-coder:7b", json_mode=True, max_tokens=512)
    # extraction_prompt() and score_extraction() are project-specific helpers
    prompt = extraction_prompt(text, schema)
    result = call_with_retry(
        [{"role": "user", "content": prompt}],
        config,
        validator=schema_validator(schema),
    )
    if result.success and expected:
        actual = json.loads(result.content)
        score = score_extraction(expected, actual)
        result.score = score["overall"]
        result.metadata["field_scores"] = score
    return result

Aggregate Scoring#
Track quality over time:

def run_scoring_suite(
    test_cases: list[dict],
    model: str,
) -> dict:
    """Run a full scoring suite and report aggregate metrics."""
    results = []
    for case in test_cases:
        result = scored_extraction(
            text=case["input"],
            schema=case["schema"],
            expected=case["expected"],
        )
        results.append({
            "name": case["name"],
            "score": result.score,
            "success": result.success,
            "duration_ms": result.duration_ms,
            "tokens_out": result.tokens_out,
        })
    successful = [r for r in results if r["success"]]
    return {
        "model": model,  # Report label only; scored_extraction pins its own model above
        "total_cases": len(results),
        "successful": len(successful),
        "failed": len(results) - len(successful),
        "avg_score": sum(r["score"] for r in successful) / len(successful) if successful else 0,
        "avg_duration_ms": sum(r["duration_ms"] for r in successful) / len(successful) if successful else 0,
        "p95_duration_ms": sorted(r["duration_ms"] for r in successful)[int(len(successful) * 0.95)] if successful else 0,
        "below_80pct": sum(1 for r in successful if r["score"] < 0.8),
    }

Observability#
Log every LLM call for debugging and optimization:

import logging

logger = logging.getLogger("llm-harness")

def log_result(result: LLMResult, task_name: str):
    """Log LLM call metadata for observability."""
    # Fields passed via extra= land on the LogRecord; use a structured/JSON
    # formatter if you want them to appear in the emitted log line.
    logger.info(
        "llm_call",
        extra={
            "task": task_name,
            "model": result.model,
            "success": result.success,
            "score": result.score,
            "tokens_in": result.tokens_in,
            "tokens_out": result.tokens_out,
            "duration_ms": result.duration_ms,
            "ttft_ms": result.ttft_ms,
            "retries": result.retries,
        },
    )

Track these metrics to answer the questions below; a small aggregation sketch follows the list.
- Which tasks have the lowest scores? (Improve prompts or escalate to larger model)
- Which calls are slowest? (Reduce token budget or use smaller model)
- Which calls retry most? (Fix validation or prompt issues)
- What is the cost of cloud escalation? (Optimize routing thresholds)
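As a sketch of answering these questions offline, the snippet below assumes each call's metadata is also appended to a JSONL file; the path and field names mirror log_result but are an assumption.

from collections import defaultdict

def summarize_call_log(path: str = "llm_calls.jsonl") -> dict:
    """Aggregate score, latency, and retry stats per task from logged calls."""
    by_task = defaultdict(list)
    with open(path) as f:
        for line in f:
            record = json.loads(line)
            by_task[record["task"]].append(record)
    report = {}
    for task, calls in by_task.items():
        scored = [c["score"] for c in calls if c.get("score") is not None]
        report[task] = {
            "calls": len(calls),
            "avg_score": sum(scored) / len(scored) if scored else None,
            "avg_duration_ms": sum(c["duration_ms"] for c in calls) / len(calls),
            "retry_rate": sum(1 for c in calls if c["retries"] > 0) / len(calls),
        }
    return report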
Putting It All Together#
A complete workflow using all the harness components:

def analyze_support_tickets(tickets: list[str]) -> list[dict]:
    """Classify, extract, and route support tickets."""
    results = []
    for ticket in tickets:
        # Step 1: Classify (fast model)
        classification = classify_with_escalation(ticket)
        # Step 2: Extract details (balanced model)
        extraction = scored_extraction(
            text=ticket,
            schema=TICKET_SCHEMA,
        )
        # Step 3: Route based on classification
        if classification["category"] == "billing":
            route = "billing-queue"
        elif classification["category"] == "technical" and classification.get("confidence", 0) > 0.8:
            route = "engineering-queue"
        else:
            route = "triage-queue"
        results.append({
            "ticket": ticket[:100],
            "classification": classification,
            "extraction": json.loads(extraction.content) if extraction.success else None,
            "route": route,
            "quality_score": extraction.score,
        })
    return results

Common Mistakes#
- No output validation. Raw LLM output is unreliable. Always validate (at minimum, check JSON parses and required fields exist) before passing results downstream.
- Retrying without changing anything. If the model produced bad output, retrying the exact same call often produces the same bad output. Consider adding “Your previous output was invalid because: …” to the retry prompt, as in the sketch after this list.
- Not tracking quality metrics. Without scoring, you cannot tell if a prompt change improved or degraded quality. Add scoring to every harness, even if you only check it periodically.
- Over-parallelizing. More threads does not mean more throughput with Ollama. The model can only process one request at a time. Extra threads just queue up and consume memory.
- Building a framework instead of a harness. A harness is 200-400 lines of purpose-built code for your specific workflow. A framework is thousands of lines trying to handle every possible case. Start with a harness. Generalize only when you have three harnesses with clear shared patterns.
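A sketch of that feedback loop, built on the same call_llm primitive. Unlike call_with_retry above, the validator here returns a (passed, reason) pair so the reason can be echoed back; that signature and the prompt wording are assumptions.

def call_with_feedback(
    messages: list[dict],
    config: HarnessConfig,
    validator: Callable[[str], tuple[bool, str]],
) -> LLMResult:
    """Retry on validation failure, telling the model what was wrong."""
    convo = list(messages)
    reason = ""
    for attempt in range(config.max_retries + 1):
        result = call_llm(convo, config)
        result.retries = attempt
        if not result.success:
            continue  # Transport-level failure: plain retry, nothing to feed back
        ok, reason = validator(result.content)
        if ok:
            return result
        # Show the model its own output and why it was rejected
        convo += [
            {"role": "assistant", "content": result.content},
            {"role": "user", "content": f"Your previous output was invalid because: {reason}. "
                                        "Fix that and respond again."},
        ]
    result.success = False
    result.metadata["failure_reason"] = reason or "call_failed"
    return result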