Building LLM Harnesses#

A harness is the infrastructure that wraps LLM calls into a reliable, testable, and observable workflow. It handles the concerns that a raw API call does not: input preparation, output validation, error recovery, model routing, parallel execution, and quality scoring. Without a harness, you have a script. With one, you have a tool.

Harness Architecture#

Input
  │
  ├── Preprocessing (validate input, select model, prepare prompt)
  │
  ├── Execution (call Ollama with timeout, retry on failure)
  │
  ├── Post-processing (parse output, validate schema, score quality)
  │
  ├── Routing (if quality too low, escalate to larger model or flag)
  │
  └── Output (structured result + metadata)

Core Harness in Python#

import ollama
import json
import time
from dataclasses import dataclass, field
from typing import Any, Callable

@dataclass
class LLMResult:
    content: str
    model: str
    tokens_in: int
    tokens_out: int
    duration_ms: int
    ttft_ms: int
    success: bool
    retries: int = 0
    score: float | None = None
    metadata: dict = field(default_factory=dict)

@dataclass
class HarnessConfig:
    model: str = "qwen2.5-coder:7b"
    temperature: float = 0.0
    max_tokens: int = 1024
    json_mode: bool = False
    timeout_seconds: int = 120
    max_retries: int = 2
    retry_delay_seconds: float = 1.0

def call_llm(
    messages: list[dict],
    config: HarnessConfig,
) -> LLMResult:
    """Make a single LLM call with timing metadata."""
    start = time.monotonic()

    kwargs = {
        "model": config.model,
        "messages": messages,
        "options": {
            "temperature": config.temperature,
            "num_predict": config.max_tokens,
        },
        "stream": False,
    }
    if config.json_mode:
        kwargs["format"] = "json"

    # A dedicated client makes the configured timeout actually apply to the
    # request; the keyword is forwarded to the underlying HTTP client.
    client = ollama.Client(timeout=config.timeout_seconds)

    try:
        response = client.chat(**kwargs)
        duration = int((time.monotonic() - start) * 1000)

        return LLMResult(
            content=response["message"]["content"],
            model=config.model,
            tokens_in=response.get("prompt_eval_count", 0),
            tokens_out=response.get("eval_count", 0),
            duration_ms=duration,
            # Ollama reports durations in nanoseconds; prompt eval time is a
            # rough stand-in for time-to-first-token on a non-streaming call.
            ttft_ms=int(response.get("prompt_eval_duration", 0) / 1_000_000),
            success=True,
        )
    except Exception as e:
        duration = int((time.monotonic() - start) * 1000)
        return LLMResult(
            content=str(e),
            model=config.model,
            tokens_in=0,
            tokens_out=0,
            duration_ms=duration,
            ttft_ms=0,
            success=False,
        )
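
A quick smoke test, assuming Ollama is running locally and the default model from HarnessConfig has been pulled:

result = call_llm(
    [{"role": "user", "content": "Summarize: the nightly build failed because of a missing dependency."}],
    HarnessConfig(),
)
print(result.success, result.duration_ms, result.content[:80])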

Retry with Validation#

Do not retry blindly. Retry only when the call fails outright or the output fails validation:

def call_with_retry(
    messages: list[dict],
    config: HarnessConfig,
    validator: Callable[[str], bool] | None = None,
) -> LLMResult:
    """Call LLM with retries on validation failure."""
    last_result = None

    for attempt in range(config.max_retries + 1):
        result = call_llm(messages, config)
        result.retries = attempt

        if not result.success:
            last_result = result
            time.sleep(config.retry_delay_seconds)
            continue

        # Validate output
        if validator is None or validator(result.content):
            return result

        last_result = result
        time.sleep(config.retry_delay_seconds)

    # All retries exhausted
    if last_result:
        reason = "validation_failed_after_retries" if last_result.success else "call_failed_after_retries"
        last_result.success = False
        last_result.metadata["failure_reason"] = reason
    return last_result

Validators#

def json_validator(content: str) -> bool:
    """Check that output is valid JSON."""
    try:
        json.loads(content)
        return True
    except json.JSONDecodeError:
        return False

def schema_validator(schema: dict):
    """Return a validator that checks JSON matches a schema."""
    required_fields = schema.get("required", [])
    enum_fields = {
        k: v["enum"]
        for k, v in schema.get("properties", {}).items()
        if "enum" in v
    }

    def validate(content: str) -> bool:
        try:
            data = json.loads(content)
        except json.JSONDecodeError:
            return False

        for field_name in required_fields:
            if field_name not in data:
                return False

        for field_name, allowed_values in enum_fields.items():
            if field_name in data and data[field_name] not in allowed_values:
                return False

        return True

    return validate
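
For example, a validator for a hypothetical triage schema (the field names here are illustrative):

TRIAGE_SCHEMA = {
    "required": ["category", "confidence"],
    "properties": {
        "category": {"enum": ["billing", "technical", "other"]},
        "confidence": {"type": "number"},
    },
}

validate_triage = schema_validator(TRIAGE_SCHEMA)
validate_triage('{"category": "billing", "confidence": 0.9}')  # True
validate_triage('{"category": "refunds", "confidence": 0.9}')  # False: value not in enum
validate_triage('{"category": "billing"}')                     # False: missing required field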

Model Routing#

Route tasks to different models based on complexity, confidence, or cost budget:

@dataclass
class ModelTier:
    model: str
    max_tokens: int
    cost_per_call: float  # For budgeting, even if $0 for local

TIERS = {
    "fast": ModelTier("qwen3:4b", 512, 0.0),
    "balanced": ModelTier("qwen2.5-coder:7b", 1024, 0.0),
    "capable": ModelTier("qwen2.5-coder:32b", 2048, 0.0),
    "frontier": ModelTier("claude-sonnet", 4096, 0.003),  # Cloud fallback
}

def route_task(task_type: str, input_length: int) -> str:
    """Select model tier based on task characteristics."""
    if task_type in ["extraction", "classification", "routing"]:
        return "fast"
    elif task_type in ["summarization", "format_conversion"]:
        return "balanced"
    elif task_type in ["code_review", "correlation", "refactoring"]:
        return "capable"
    elif task_type in ["architecture", "complex_reasoning"]:
        return "capable"  # Try local first, escalate if needed
    return "balanced"

def escalate_if_needed(result: LLMResult, tier: str) -> str | None:
    """Return the name of the next tier if quality is insufficient, else None."""
    if result.score is not None and result.score < 0.7:
        next_tier = {
            "fast": "balanced",
            "balanced": "capable",
            "capable": "frontier",
        }.get(tier)

        if next_tier:
            return next_tier  # Caller re-runs with the higher tier
    return None
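
A sketch of how routing, tiers, and escalation fit together. The score_result helper is hypothetical (plug in the scoring pipeline below), and the frontier tier would need a cloud client rather than the Ollama call shown here:

def run_with_routing(task_type: str, messages: list[dict]) -> LLMResult:
    """Run a task at its routed tier, escalating while quality stays low."""
    tier = route_task(task_type, input_length=sum(len(m["content"]) for m in messages))
    while True:
        t = TIERS[tier]
        config = HarnessConfig(model=t.model, max_tokens=t.max_tokens)
        result = call_with_retry(messages, config)
        result.score = score_result(result)  # hypothetical scorer; see the scoring pipeline
        next_tier = escalate_if_needed(result, tier)
        if next_tier is None:
            return result
        tier = next_tier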

Confidence-Based Escalation#

def classify_with_escalation(text: str) -> dict:
    """Classify with automatic escalation on low confidence."""
    # Try fast model first
    config = HarnessConfig(model="qwen3:4b", json_mode=True, max_tokens=256)
    result = call_with_retry(
        [{"role": "user", "content": classification_prompt(text)}],
        config,
        validator=json_validator,
    )

    if result.success:
        data = json.loads(result.content)
        if data.get("confidence", 0) >= 0.8:
            return data  # Fast model is confident

    # Escalate to balanced model
    config.model = "qwen2.5-coder:7b"
    result = call_with_retry(
        [{"role": "user", "content": classification_prompt(text)}],
        config,
        validator=json_validator,
    )

    if result.success:
        return json.loads(result.content)

    # All local models failed — flag for review
    return {"category": "unknown", "confidence": 0.0, "needs_review": True}

Parallel Execution#

For batch operations (summarizing many files, classifying many items), run calls in parallel:

from concurrent.futures import ThreadPoolExecutor, as_completed

def batch_process(
    items: list[dict],
    prompt_fn: Callable[[dict], list[dict]],
    config: HarnessConfig,
    max_workers: int = 3,
    validator: Callable[[str], bool] | None = None,
) -> list[LLMResult]:
    """Process a batch of items in parallel."""
    results = [None] * len(items)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {}
        for i, item in enumerate(items):
            messages = prompt_fn(item)
            future = executor.submit(call_with_retry, messages, config, validator)
            futures[future] = i

        for future in as_completed(futures):
            idx = futures[future]
            results[idx] = future.result()

    return results

Keep max_workers at 3-4 for Ollama. The server only runs a small number of inferences concurrently per model (governed by its OLLAMA_NUM_PARALLEL setting), so additional workers simply queue up and increase memory pressure without improving throughput.
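
For example, summarizing a batch of documents (the prompt wording is illustrative):

def summary_prompt(item: dict) -> list[dict]:
    return [{
        "role": "user",
        "content": f"Summarize the following text in two sentences:\n\n{item['text']}",
    }]

docs = [{"text": "First document..."}, {"text": "Second document..."}]
config = HarnessConfig(model="qwen2.5-coder:7b", max_tokens=256)
summaries = batch_process(docs, summary_prompt, config, max_workers=3)
for doc, res in zip(docs, summaries):
    print(res.success, res.duration_ms, res.content[:80])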

Scoring Pipeline#

Attach scoring to every harness run for quality tracking:

def scored_extraction(
    text: str,
    schema: dict,
    expected: dict | None = None,
) -> LLMResult:
    """Run extraction with optional scoring against expected output."""
    config = HarnessConfig(model="qwen2.5-coder:7b", json_mode=True, max_tokens=512)
    prompt = extraction_prompt(text, schema)

    result = call_with_retry(
        [{"role": "user", "content": prompt}],
        config,
        validator=schema_validator(schema),
    )

    if result.success and expected:
        actual = json.loads(result.content)
        score = score_extraction(expected, actual)
        result.score = score["overall"]
        result.metadata["field_scores"] = score

    return result

Aggregate Scoring#

Track quality over time:

def run_scoring_suite(
    test_cases: list[dict],
    model: str,
) -> dict:
    """Run a full scoring suite and report aggregate metrics."""
    results = []

    for case in test_cases:
        result = scored_extraction(
            text=case["input"],
            schema=case["schema"],
            expected=case["expected"],
        )
        results.append({
            "name": case["name"],
            "score": result.score,
            "success": result.success,
            "duration_ms": result.duration_ms,
            "tokens_out": result.tokens_out,
        })

    successful = [r for r in results if r["success"]]

    return {
        "model": model,
        "total_cases": len(results),
        "successful": len(successful),
        "failed": len(results) - len(successful),
        "avg_score": sum(r["score"] for r in successful) / len(successful) if successful else 0,
        "avg_duration_ms": sum(r["duration_ms"] for r in successful) / len(successful) if successful else 0,
        "p95_duration_ms": sorted(r["duration_ms"] for r in successful)[int(len(successful) * 0.95)] if successful else 0,
        "below_80pct": sum(1 for r in successful if r["score"] < 0.8),
    }
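
Test cases are plain dicts; a minimal example (INVOICE_SCHEMA and the values are illustrative):

test_cases = [
    {
        "name": "simple_invoice",
        "input": "Invoice #1234 from Acme Corp, due 2024-03-01, total $450.00",
        "schema": INVOICE_SCHEMA,  # hypothetical schema with required fields and enums
        "expected": {"vendor": "Acme Corp", "total": 450.0, "due_date": "2024-03-01"},
    },
]

report = run_scoring_suite(test_cases, model="qwen2.5-coder:7b")
print(report["avg_score"], report["p95_duration_ms"])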

Observability#

Log every LLM call for debugging and optimization:

import logging

logger = logging.getLogger("llm-harness")

def log_result(result: LLMResult, task_name: str):
    """Log LLM call metadata for observability."""
    logger.info(
        "llm_call",
        extra={
            "task": task_name,
            "model": result.model,
            "success": result.success,
            "score": result.score,
            "tokens_in": result.tokens_in,
            "tokens_out": result.tokens_out,
            "duration_ms": result.duration_ms,
            "ttft_ms": result.ttft_ms,
            "retries": result.retries,
        },
    )
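
Note that fields passed via extra only show up if the formatter includes them; a minimal sketch using a custom stdlib formatter (a structured-logging library works just as well):

class LLMCallFormatter(logging.Formatter):
    """Append the harness fields attached via `extra` as key=value pairs."""
    FIELDS = ["task", "model", "success", "score", "tokens_in",
              "tokens_out", "duration_ms", "ttft_ms", "retries"]

    def format(self, record: logging.LogRecord) -> str:
        base = super().format(record)
        details = " ".join(
            f"{name}={getattr(record, name)}" for name in self.FIELDS if hasattr(record, name)
        )
        return f"{base} {details}".strip()

handler = logging.StreamHandler()
handler.setFormatter(LLMCallFormatter("%(asctime)s %(name)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO)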

Track these metrics to answer:

  • Which tasks have the lowest scores? (Improve prompts or escalate to larger model)
  • Which calls are slowest? (Reduce token budget or use smaller model)
  • Which calls retry most? (Fix validation or prompt issues)
  • What is the cost of cloud escalation? (Optimize routing thresholds)

Putting It All Together#

A complete workflow using all the harness components:

def analyze_support_tickets(tickets: list[str]) -> list[dict]:
    """Classify, extract, and route support tickets."""
    results = []

    for ticket in tickets:
        # Step 1: Classify (fast model)
        classification = classify_with_escalation(ticket)

        # Step 2: Extract details (balanced model)
        extraction = scored_extraction(
            text=ticket,
            schema=TICKET_SCHEMA,
        )

        # Step 3: Route based on classification
        category = classification.get("category", "unknown")
        if category == "billing":
            route = "billing-queue"
        elif category == "technical" and classification.get("confidence", 0) > 0.8:
            route = "engineering-queue"
        else:
            route = "triage-queue"

        results.append({
            "ticket": ticket[:100],
            "classification": classification,
            "extraction": json.loads(extraction.content) if extraction.success else None,
            "route": route,
            "quality_score": extraction.score,
        })

    return results

Common Mistakes#

  1. No output validation. Raw LLM output is unreliable. Always validate (at minimum, check JSON parses and required fields exist) before passing results downstream.
  2. Retrying without changing anything. If the model produced bad output, retrying the exact same call often produces the same bad output. Consider adding “Your previous output was invalid because: …” to the retry prompt, as in the sketch after this list.
  3. Not tracking quality metrics. Without scoring, you cannot tell if a prompt change improved or degraded quality. Add scoring to every harness, even if you only check it periodically.
  4. Over-parallelizing. More threads does not mean more throughput with Ollama. The model can only process one request at a time. Extra threads just queue up and consume memory.
  5. Building a framework instead of a harness. A harness is 200-400 lines of purpose-built code for your specific workflow. A framework is thousands of lines trying to handle every possible case. Start with a harness. Generalize only when you have three harnesses with clear shared patterns.
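
A minimal sketch of the feedback-on-retry idea from mistake 2, reusing call_llm and HarnessConfig from above (the correction wording is illustrative):

def call_with_feedback(
    messages: list[dict],
    config: HarnessConfig,
    validator: Callable[[str], bool],
) -> LLMResult:
    """Retry with the validation failure fed back to the model."""
    convo = list(messages)
    result = call_llm(convo, config)
    for attempt in range(1, config.max_retries + 1):
        if result.success and validator(result.content):
            return result
        # Feed the rejected output back and ask for a corrected version.
        convo = convo + [
            {"role": "assistant", "content": result.content},
            {"role": "user", "content": "Your previous output was invalid because it "
                                        "failed validation. Return only corrected output."},
        ]
        result = call_llm(convo, config)
        result.retries = attempt
    if not (result.success and validator(result.content)):
        result.success = False
        result.metadata["failure_reason"] = "validation_failed_after_feedback_retries"
    return result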