Building a Temporal Worker Bridge#
The architecture article evaluated three cross-cluster communication patterns and identified the worker bridge as the best fit for most open-source Temporal deployments. This article builds the bridge.
The worker bridge is a single binary that holds connections to two Temporal clusters. It polls Cluster A for tasks on a dedicated queue and executes those tasks using Cluster B’s resources – its Temporal client, databases, APIs, and services. From Cluster A’s perspective, the bridge is just another worker. From Cluster B’s perspective, the bridge is just another client starting workflows.
All code is in the companion repository at github.com/statherm/temporal-examples under cross-cluster/bridge/.
Bridge Architecture Recap#
Cluster A (Source)                   Cluster B (Execution)
┌─────────────────────┐              ┌─────────────────────┐
│ Temporal Server A   │              │ Temporal Server B   │
│                     │              │                     │
│ CrossClusterWF      │  ┌────────┐  │ FulfillmentWF       │
│ ─────────────────   │  │ Bridge │  │ ─────────────────   │
│ Task Queue:         │◄─│ Worker │─►│ Task Queue:         │
│   bridge-queue      │  │        │  │   fulfillment-queue │
│                     │  │ Polls  │  │                     │
│ Waits for result    │  │ A      │  │ Executes locally    │
│ from bridge         │  │ Starts │  │ Returns result      │
│                     │  │ on B   │  │                     │
└─────────────────────┘  └────────┘  └─────────────────────┘
The bridge worker process:
- Connects to Cluster A (source) and registers on the bridge-queue task queue
- Connects to Cluster B (execution target) for starting and monitoring remote workflows
- When a task arrives on bridge-queue, the bridge activity starts a corresponding workflow on Cluster B
- The bridge activity polls the Cluster B workflow until it completes, then returns the result to Cluster A
The Bridge Worker Binary#
The bridge binary establishes two independent Temporal client connections at startup. Configuration comes from environment variables, making it easy to deploy in different environments.
package main
import (
"log"
"os"
"time" // needed for the PollInterval setting below
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"github.com/statherm/temporal-examples/cross-cluster/bridge"
)
func main() {
// Connect to Cluster A — the source of bridge tasks
clientA, err := client.Dial(client.Options{
HostPort: getEnv("CLUSTER_A_HOST", "localhost:7233"),
Namespace: getEnv("CLUSTER_A_NAMESPACE", "default"),
})
if err != nil {
log.Fatalf("Failed to connect to Cluster A: %v", err)
}
defer clientA.Close()
// Connect to Cluster B — where work actually executes
clientB, err := client.Dial(client.Options{
HostPort: getEnv("CLUSTER_B_HOST", "localhost:7234"),
Namespace: getEnv("CLUSTER_B_NAMESPACE", "default"),
})
if err != nil {
log.Fatalf("Failed to connect to Cluster B: %v", err)
}
defer clientB.Close()
// Create bridge activities with Cluster B client
bridgeActivities := bridge.NewBridgeActivities(clientB, bridge.BridgeConfig{
SourceCluster: getEnv("CLUSTER_A_NAME", "cluster-a"),
DestinationCluster: getEnv("CLUSTER_B_NAME", "cluster-b"),
PollInterval: 5 * time.Second,
MaxPollAttempts: 720, // 1 hour at 5s intervals
})
// Register worker on Cluster A's bridge queue
w := worker.New(clientA, "bridge-queue", worker.Options{
MaxConcurrentActivityExecutionSize: 10,
})
w.RegisterWorkflow(bridge.CrossClusterWorkflow)
w.RegisterActivity(bridgeActivities)
// The standard log package has no key-value API, so format the fields explicitly
log.Printf("Bridge worker starting: clusterA=%s clusterB=%s",
getEnv("CLUSTER_A_HOST", "localhost:7233"),
getEnv("CLUSTER_B_HOST", "localhost:7234"))
if err := w.Run(worker.InterruptCh()); err != nil {
log.Fatalf("Bridge worker failed: %v", err)
}
}
func getEnv(key, fallback string) string {
if v := os.Getenv(key); v != "" {
return v
}
return fallback
}
Key points:
- clientA is used only for polling tasks. The worker registers on Cluster A’s bridge-queue.
- clientB is used for executing work. Bridge activities use it to start workflows, query status, and retrieve results from Cluster B.
- The two clients are completely independent. They can point to different Temporal versions, different namespaces, and different network segments.
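The binary above reads hosts and namespaces from the environment but hardcodes the poll interval. If you want that setting to be configurable in the same way, a helper along these lines could parse it. This is a minimal sketch: durationOr and the BRIDGE_POLL_INTERVAL variable are hypothetical names chosen for illustration and are not taken from the companion repository.

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// durationOr parses raw as a Go duration string (for example "5s" or "250ms").
// An empty string yields the fallback. A malformed or non-positive value is an
// error, so a typo fails at startup instead of silently changing poll behavior.
func durationOr(raw string, fallback time.Duration) (time.Duration, error) {
	if raw == "" {
		return fallback, nil
	}
	d, err := time.ParseDuration(raw)
	if err != nil {
		return 0, fmt.Errorf("invalid duration %q: %w", raw, err)
	}
	if d <= 0 {
		return 0, fmt.Errorf("duration must be positive, got %s", d)
	}
	return d, nil
}

func main() {
	// BRIDGE_POLL_INTERVAL is a hypothetical variable name (an assumption).
	interval, err := durationOr(os.Getenv("BRIDGE_POLL_INTERVAL"), 5*time.Second)
	if err != nil {
		fmt.Fprintln(os.Stderr, "config error:", err)
		os.Exit(1)
	}
	fmt.Println("poll interval:", interval)
}
```

The parsed value could then be passed as PollInterval in bridge.BridgeConfig. If you change the interval, revisit MaxPollAttempts as well, since the two together determine how long the bridge waits.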
Bridge Activities#
Bridge activities wrap the Cluster B client and provide three operations: start a remote workflow, query its status, and wait for completion.
package bridge
import (
"context"
"errors"
"fmt"
"time"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/client"
)
type BridgeConfig struct {
SourceCluster string
DestinationCluster string
PollInterval time.Duration
MaxPollAttempts int
}
type BridgeActivities struct {
remoteClient client.Client
config BridgeConfig
}
func NewBridgeActivities(remoteClient client.Client, config BridgeConfig) *BridgeActivities {
return &BridgeActivities{
remoteClient: remoteClient,
config: config,
}
}
type RemoteWorkflowRequest struct {
SourceWorkflowID string
WorkflowType string
TaskQueue string
Input []byte // Serialized workflow input
Timeout time.Duration
}
type RemoteWorkflowResult struct {
WorkflowID string
RunID string
Status string
Output []byte // Serialized workflow output
}
StartWorkflowInRemote#
This activity starts a workflow on Cluster B with an idempotent workflow ID derived from the source workflow:
func (b *BridgeActivities) StartWorkflowInRemote(
ctx context.Context,
req RemoteWorkflowRequest,
) (*RemoteWorkflowResult, error) {
logger := activity.GetLogger(ctx)
// Generate deterministic workflow ID for idempotency
remoteWorkflowID := GenerateCrossClusterKey(
b.config.SourceCluster,
req.SourceWorkflowID,
req.WorkflowType,
)
timeout := req.Timeout
if timeout == 0 {
timeout = 1 * time.Hour
}
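opts := client.StartWorkflowOptions{
ID: remoteWorkflowID,
TaskQueue: req.TaskQueue,
WorkflowExecutionTimeout: timeout,
// Without this flag the Go SDK returns a handle to the existing run instead of
// an error, and the already-started branch below would never execute.
WorkflowExecutionErrorWhenAlreadyStarted: true,
}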
we, err := b.remoteClient.ExecuteWorkflow(ctx, opts, req.WorkflowType, req.Input)
if err != nil {
// Handle already-started as success (idempotent retry)
var alreadyStarted *serviceerror.WorkflowExecutionAlreadyStarted
if errors.As(err, &alreadyStarted) {
logger.Info("Remote workflow already exists, attaching",
"workflowID", remoteWorkflowID)
return &RemoteWorkflowResult{
WorkflowID: remoteWorkflowID,
Status: "already_running",
}, nil
}
return nil, fmt.Errorf("start remote workflow failed: %w", err)
}
logger.Info("Started remote workflow",
"workflowID", we.GetID(),
"runID", we.GetRunID(),
"cluster", b.config.DestinationCluster)
return &RemoteWorkflowResult{
WorkflowID: we.GetID(),
RunID: we.GetRunID(),
Status: "started",
}, nil
}
WaitForRemoteCompletion#
This activity polls the remote workflow until it completes, using activity heartbeating to prevent Temporal from timing out the long-running poll:
func (b *BridgeActivities) WaitForRemoteCompletion(
ctx context.Context,
workflowID string,
runID string,
) (*RemoteWorkflowResult, error) {
logger := activity.GetLogger(ctx)
for attempt := 0; attempt < b.config.MaxPollAttempts; attempt++ {
// Heartbeat to prevent activity timeout
activity.RecordHeartbeat(ctx, fmt.Sprintf("poll attempt %d", attempt))
// Check if activity context is cancelled
if ctx.Err() != nil {
return nil, ctx.Err()
}
// Describe the remote workflow execution
resp, err := b.remoteClient.DescribeWorkflowExecution(ctx, workflowID, runID)
if err != nil {
logger.Warn("Failed to describe remote workflow, retrying",
"workflowID", workflowID,
"error", err,
"attempt", attempt)
time.Sleep(b.config.PollInterval)
continue
}
status := resp.WorkflowExecutionInfo.Status
switch status {
case enums.WORKFLOW_EXECUTION_STATUS_COMPLETED:
logger.Info("Remote workflow completed",
"workflowID", workflowID)
// Get the result
run := b.remoteClient.GetWorkflow(ctx, workflowID, runID)
var output []byte
if err := run.Get(ctx, &output); err != nil {
return nil, fmt.Errorf("get remote workflow result: %w", err)
}
return &RemoteWorkflowResult{
WorkflowID: workflowID,
RunID: runID,
Status: "completed",
Output: output,
}, nil
case enums.WORKFLOW_EXECUTION_STATUS_FAILED:
return &RemoteWorkflowResult{
WorkflowID: workflowID,
RunID: runID,
Status: "failed",
}, fmt.Errorf("remote workflow failed: %s", workflowID)
case enums.WORKFLOW_EXECUTION_STATUS_CANCELED:
return &RemoteWorkflowResult{
WorkflowID: workflowID,
RunID: runID,
Status: "canceled",
}, fmt.Errorf("remote workflow was canceled: %s", workflowID)
case enums.WORKFLOW_EXECUTION_STATUS_TERMINATED:
return &RemoteWorkflowResult{
WorkflowID: workflowID,
RunID: runID,
Status: "terminated",
}, fmt.Errorf("remote workflow was terminated: %s", workflowID)
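case enums.WORKFLOW_EXECUTION_STATUS_TIMED_OUT:
// Without this case a timed-out run would be polled until MaxPollAttempts
return &RemoteWorkflowResult{
WorkflowID: workflowID,
RunID: runID,
Status: "timed_out",
}, fmt.Errorf("remote workflow timed out: %s", workflowID)
default:
// Still running, wait and poll again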
logger.Debug("Remote workflow still running",
"workflowID", workflowID,
"status", status,
"attempt", attempt)
}
time.Sleep(b.config.PollInterval)
}
return nil, fmt.Errorf("timed out waiting for remote workflow %s after %d attempts",
workflowID, b.config.MaxPollAttempts)
}
Cross-Cluster Idempotency#
Idempotency is critical in bridge scenarios. The bridge activity may be retried (worker crash, network timeout, Temporal retry policy), and each retry must not create a duplicate workflow on the remote cluster.
The key strategy is deterministic workflow IDs:
func GenerateCrossClusterKey(sourceCluster, workflowID, activityName string) string {
return fmt.Sprintf("bridge-%s-%s-%s", sourceCluster, workflowID, activityName)
}
func GenerateKey(workflowID, activityName, attempt string) string {
return fmt.Sprintf("%s:%s:%s", workflowID, activityName, attempt)
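}
By constructing the remote workflow ID from the source cluster name, the source workflow ID, and the remote workflow type (passed as the activityName argument), we guarantee that retries of the same bridge activity always target the same remote workflow. Temporal’s “workflow already exists” semantics handle the deduplication. Note that this protection applies while the remote run is still open: under Temporal’s default workflow ID reuse policy, a start request issued after that run has closed creates a new run, so set a stricter reuse policy if late retries are a concern.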
This means:
- First attempt: starts remote workflow bridge-cluster-a-order-123-fulfillment
- Bridge worker crashes and retries: tries to start bridge-cluster-a-order-123-fulfillment again, gets WorkflowExecutionAlreadyStarted, attaches to existing execution
- Result: exactly one fulfillment workflow runs, regardless of how many times the bridge retries
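The retry behavior described above can be checked directly on the key function. The sketch below copies GenerateCrossClusterKey from the article and uses made-up IDs. It also shows a caveat that follows from the format string: because "-" is both the separator and a legal character inside each part, two different input triples can produce the same key. Whether that matters depends on how your workflow IDs and type names are chosen.

```go
package main

import "fmt"

// GenerateCrossClusterKey is copied from the article for a self-contained demo.
func GenerateCrossClusterKey(sourceCluster, workflowID, activityName string) string {
	return fmt.Sprintf("bridge-%s-%s-%s", sourceCluster, workflowID, activityName)
}

func main() {
	// A retry passes the same inputs, so it derives the same remote workflow ID.
	first := GenerateCrossClusterKey("cluster-a", "order-123", "fulfillment")
	retry := GenerateCrossClusterKey("cluster-a", "order-123", "fulfillment")
	fmt.Println(first, first == retry) // prints: bridge-cluster-a-order-123-fulfillment true

	// A different source workflow gets a different remote workflow ID.
	other := GenerateCrossClusterKey("cluster-a", "order-124", "fulfillment")
	fmt.Println(first == other) // prints: false

	// Caveat: the separator can also appear inside a part, so these two
	// distinct (hypothetical) inputs collide on the same key.
	x := GenerateCrossClusterKey("cluster-a", "order-1", "2-fulfillment")
	y := GenerateCrossClusterKey("cluster-a", "order-1-2", "fulfillment")
	fmt.Println(x == y) // prints: true
}
```

If collisions of this kind are possible in your naming scheme, one option is a separator that cannot occur in any part; that is a design choice outside what the article's code does.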
The CrossCluster Workflow#
The workflow that runs on Cluster A orchestrates the bridge activities:
package bridge
import (
"fmt"
"time"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
)
type CrossClusterRequest struct {
WorkflowType string
TaskQueue string
Input []byte
Timeout time.Duration
}
func CrossClusterWorkflow(ctx workflow.Context, req CrossClusterRequest) (*RemoteWorkflowResult, error) {
logger := workflow.GetLogger(ctx)
// Activity options with heartbeat for long-running poll
activityOpts := workflow.ActivityOptions{
StartToCloseTimeout: 2 * time.Hour,
HeartbeatTimeout: 30 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
MaximumAttempts: 5,
},
}
ctx = workflow.WithActivityOptions(ctx, activityOpts)
workflowInfo := workflow.GetInfo(ctx)
// Step 1: Start the remote workflow
remoteReq := RemoteWorkflowRequest{
SourceWorkflowID: workflowInfo.WorkflowExecution.ID,
WorkflowType: req.WorkflowType,
TaskQueue: req.TaskQueue,
Input: req.Input,
Timeout: req.Timeout,
}
// Reference activity methods through a nil receiver, the form used in the
// Temporal Go samples. A method expression such as
// (*BridgeActivities).StartWorkflowInRemote adds the receiver as a leading
// parameter, which the SDK's argument check does not expect.
var activities *BridgeActivities
var startResult *RemoteWorkflowResult
err := workflow.ExecuteActivity(ctx, activities.StartWorkflowInRemote, remoteReq).
Get(ctx, &startResult)
if err != nil {
return nil, fmt.Errorf("failed to start remote workflow: %w", err)
}
logger.Info("Remote workflow started",
"remoteWorkflowID", startResult.WorkflowID,
"status", startResult.Status)
// Step 2: Wait for the remote workflow to complete
var finalResult *RemoteWorkflowResult
err = workflow.ExecuteActivity(ctx, activities.WaitForRemoteCompletion,
startResult.WorkflowID, startResult.RunID).
Get(ctx, &finalResult)
if err != nil {
return nil, fmt.Errorf("remote workflow execution failed: %w", err)
}
logger.Info("Remote workflow completed",
"remoteWorkflowID", finalResult.WorkflowID,
"status", finalResult.Status)
return finalResult, nil
}
Error Handling#
Cross-cluster operations have more failure modes than single-cluster workflows. The bridge must handle:
Cluster B Unreachable#
If the bridge cannot connect to Cluster B, StartWorkflowInRemote fails and Temporal retries the activity with exponential backoff according to its retry policy. If Cluster B stays down long enough, the activity exhausts its retries and the calling workflow receives an error.
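To see how quickly retries run out, it helps to write down the backoff schedule. The sketch below assumes the usual exponential rule, where the wait before retry n is the smaller of InitialInterval × BackoffCoefficient^(n−1) and MaximumInterval, and it ignores the time each attempt itself takes. backoffSchedule and totalWait are hypothetical helpers for the arithmetic, not part of the Temporal SDK.

```go
package main

import (
	"fmt"
	"time"
)

// backoffSchedule returns the waits between attempts for a retry policy.
// maxAttempts counts the first attempt, so there are maxAttempts-1 waits.
func backoffSchedule(initial time.Duration, coefficient float64, maxInterval time.Duration, maxAttempts int) []time.Duration {
	var waits []time.Duration
	next := float64(initial)
	for retry := 1; retry < maxAttempts; retry++ {
		wait := time.Duration(next)
		if wait > maxInterval {
			wait = maxInterval // cap at MaximumInterval
		}
		waits = append(waits, wait)
		next *= coefficient
	}
	return waits
}

// totalWait sums the waits in a schedule.
func totalWait(waits []time.Duration) time.Duration {
	var sum time.Duration
	for _, w := range waits {
		sum += w
	}
	return sum
}

func main() {
	// Policy from CrossClusterWorkflow: 1s initial, x2, capped at 1m, 5 attempts.
	waits := backoffSchedule(time.Second, 2.0, time.Minute, 5)
	fmt.Println(waits, totalWait(waits)) // prints: [1s 2s 4s 8s] 15s

	// A slower policy (5s initial, x2, capped at 5m), first 8 attempts shown.
	fmt.Println(backoffSchedule(5*time.Second, 2.0, 5*time.Minute, 8))
	// prints: [5s 10s 20s 40s 1m20s 2m40s 5m0s]
}
```

With the policy used in CrossClusterWorkflow, the backoff adds up to only 15 seconds across all five attempts, so an outage longer than that (plus the duration of the attempts themselves) surfaces as a workflow error.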
Configure retry policies based on your SLA:
retryPolicy := &temporal.RetryPolicy{
InitialInterval: 5 * time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: 5 * time.Minute,
MaximumAttempts: 0, // Retry indefinitely
NonRetryableErrorTypes: []string{"INVALID_REQUEST"}, // Don't retry bad inputs
}
Remote Workflow Failure#
If the workflow on Cluster B fails, WaitForRemoteCompletion detects it via the WORKFLOW_EXECUTION_STATUS_FAILED status and returns an error to the calling workflow. The calling workflow can then decide to retry, compensate, or escalate.
Bridge Worker Crash During Poll#
If the bridge worker crashes while polling for remote completion, Temporal detects the missed heartbeat, times out the activity, and schedules it on another bridge worker. The new worker resumes polling with the same remote workflow ID. Because the remote workflow ID is deterministic, it attaches to the same execution.
Deploying the Bridge#
The bridge runs as a standard Kubernetes Deployment. It needs network access to both clusters.
apiVersion: apps/v1
kind: Deployment
metadata:
name: temporal-bridge-worker
labels:
app: temporal-bridge
spec:
replicas: 2 # HA: at least 2 replicas
selector:
matchLabels:
app: temporal-bridge
template:
metadata:
labels:
app: temporal-bridge
spec:
containers:
- name: bridge
image: ghcr.io/statherm/temporal-bridge:latest
env:
- name: CLUSTER_A_HOST
valueFrom:
configMapKeyRef:
name: bridge-config
key: cluster-a-host
- name: CLUSTER_A_NAMESPACE
value: "default"
- name: CLUSTER_B_HOST
valueFrom:
configMapKeyRef:
name: bridge-config
key: cluster-b-host
- name: CLUSTER_B_NAMESPACE
value: "default"
- name: CLUSTER_A_NAME
value: "cluster-a"
- name: CLUSTER_B_NAME
value: "cluster-b"
resources:
requests:
cpu: 250m
memory: 256Mi
limits:
cpu: 500m
memory: 512Mi
readinessProbe:
exec:
command:
- /bin/sh
- -c
- "pgrep -f bridge-worker"
initialDelaySeconds: 5
periodSeconds: 10
---
apiVersion: v1
kind: ConfigMap
metadata:
name: bridge-config
data:
cluster-a-host: "temporal-frontend.temporal-cluster-a.svc.cluster.local:7233"
cluster-b-host: "temporal-frontend.default.svc.cluster.local:7233"
Deploy the bridge to Cluster B (where it has local access to Cluster B’s resources):
minikube profile temporal-cluster-b
kubectl apply -f deploy/bridge-worker.yaml
End-to-End Test#
With both clusters running (see Multi-Cluster Minikube Setup) and the bridge deployed, test the full flow.
Start a cross-cluster workflow on Cluster A:
# Ensure you are pointing at Cluster A
export TEMPORAL_ADDRESS=localhost:7233
# Start the cross-cluster workflow
temporal workflow start \
--task-queue bridge-queue \
--type CrossClusterWorkflow \
--input '{"WorkflowType":"FulfillmentWorkflow","TaskQueue":"fulfillment-queue","Input":"eyJvcmRlcklEIjoiMTIzIn0="}'
Monitor the workflow in both Web UIs:
- Cluster A (http://localhost:8080): Shows the CrossClusterWorkflow running, with StartWorkflowInRemote and WaitForRemoteCompletion activities
- Cluster B (http://localhost:8081): Shows the FulfillmentWorkflow started by the bridge
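The Input value in the start command is not arbitrary: CrossClusterRequest declares Input as []byte, and Go's encoding/json represents byte slices as base64 strings. The sketch below shows one way to produce the --input document for a different payload. It assumes the request is decoded with standard JSON rules; crossClusterRequest mirrors the article's struct, and buildStartInput is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// crossClusterRequest mirrors the fields of CrossClusterRequest in the article.
type crossClusterRequest struct {
	WorkflowType string
	TaskQueue    string
	Input        []byte
	Timeout      time.Duration
}

// buildStartInput returns the JSON document to pass to --input.
// The payload is the serialized input of the remote workflow.
func buildStartInput(workflowType, taskQueue, payload string) (string, error) {
	out, err := json.Marshal(crossClusterRequest{
		WorkflowType: workflowType,
		TaskQueue:    taskQueue,
		Input:        []byte(payload), // encoded as base64 by encoding/json
	})
	if err != nil {
		return "", err
	}
	return string(out), nil
}

func main() {
	input, err := buildStartInput("FulfillmentWorkflow", "fulfillment-queue", `{"orderID":"123"}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(input)
	// prints: {"WorkflowType":"FulfillmentWorkflow","TaskQueue":"fulfillment-queue","Input":"eyJvcmRlcklEIjoiMTIzIn0=","Timeout":0}
}
```

The base64 string matches the one in the command above, which means the test workflow sends {"orderID":"123"} to the remote workflow. The zero Timeout makes StartWorkflowInRemote fall back to its one-hour default.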
The companion repository provides a Makefile target for the full flow:
make clusters-up deploy-bridge start-cross-cluster-workflow
This starts both clusters, deploys the bridge, starts a test workflow on Cluster A, and tails the bridge worker logs.
Monitoring and Observability#
Bridge workers need additional monitoring beyond standard Temporal worker metrics.
Key Metrics#
Track these bridge-specific metrics:
| Metric | Description | Alert Threshold |
|---|---|---|
| bridge_cross_cluster_latency_ms | Time from activity start to remote workflow completion | p99 > 60s |
| bridge_queue_depth | Number of pending tasks on bridge-queue | > 100 |
| bridge_remote_start_errors | Failed remote workflow starts per minute | > 5/min |
| bridge_remote_poll_errors | Failed status polls per minute | > 10/min |
| bridge_heartbeat_age_ms | Time since last successful heartbeat | > 25s |
Structured Logging#
Every bridge log line should include both cluster contexts:
logger.Info("Bridge activity executing",
"sourceCluster", b.config.SourceCluster,
"destinationCluster", b.config.DestinationCluster,
"sourceWorkflowID", req.SourceWorkflowID,
"remoteWorkflowID", remoteWorkflowID,
"remoteTaskQueue", req.TaskQueue)
This makes it possible to correlate events across clusters when debugging. Use a shared request ID or trace ID that propagates through both clusters.
Production Considerations#
Bridge Replicas#
Run at least two bridge replicas. Temporal distributes tasks across all workers polling the same task queue, so multiple bridges provide both throughput and availability. If one bridge crashes, the other continues processing tasks, and Temporal reschedules the failed bridge’s in-progress activities after the heartbeat timeout.
Graceful Shutdown#
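The bridge binary uses worker.InterruptCh() to handle SIGTERM gracefully. On shutdown, the worker stops polling for new tasks and waits for in-progress activities to complete, up to the drain timeout set by WorkerStopTimeout in worker.Options. The binary above does not set that option, so configure it if in-flight activities need time to finish. In Kubernetes, set terminationGracePeriodSeconds to match your longest expected activity duration, and no shorter than the drain timeout.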
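One detail affects how quickly in-progress activities wind down: WaitForRemoteCompletion pauses between polls with time.Sleep, which does not return early when the activity context is cancelled. A cancellation-aware wait could look like the sketch below. sleepCtx and waitWithDeadline are hypothetical helpers, not part of the companion repository or the Temporal SDK.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sleepCtx waits for d or until ctx is done, whichever comes first.
// It returns nil after a full wait and ctx.Err() if interrupted.
func sleepCtx(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
		return nil
	}
}

// waitWithDeadline is a small demo harness: it runs sleepCtx under a
// context that expires after the given deadline.
func waitWithDeadline(deadline, d time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), deadline)
	defer cancel()
	return sleepCtx(ctx, d)
}

func main() {
	// The context expires long before the 5s poll interval elapses.
	fmt.Println(waitWithDeadline(50*time.Millisecond, 5*time.Second)) // prints: context deadline exceeded

	// The wait finishes normally when the context outlives it.
	fmt.Println(waitWithDeadline(time.Second, 10*time.Millisecond)) // prints: <nil>
}
```

In the polling loop, each time.Sleep(b.config.PollInterval) call could be replaced by a call to such a helper, returning its error when it is non-nil, so a cancelled activity stops within milliseconds instead of after a full poll interval.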
Version Compatibility#
The bridge connects to two Temporal servers that may run different versions. The Temporal Go SDK is backward-compatible with older server versions, but not forward-compatible. Run the bridge with the SDK version matching your newer cluster, and verify compatibility with the older cluster in staging first.
Capacity Planning#
Each bridge replica holds two client connections (one to each cluster) that all of its activities share, and each in-flight WaitForRemoteCompletion activity runs one polling loop against the remote workflow. With MaxConcurrentActivityExecutionSize: 10, each bridge replica handles up to 10 concurrent cross-cluster operations. Scale replicas based on your expected cross-cluster throughput.
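As a rough starting point for the replica count, Little's law gives the number of operations in flight as the arrival rate multiplied by the average duration. The sketch below applies that to the slot limit above. The rates and durations are invented for illustration, replicasNeeded is a hypothetical helper, and the estimate includes no headroom for bursts or for a replica being down.

```go
package main

import (
	"fmt"
	"math"
)

// replicasNeeded estimates how many bridge replicas are required.
// startsPerSecond is the rate of new cross-cluster workflows, avgRemoteSeconds
// is how long a remote workflow typically runs, slotsPerReplica is the value
// of MaxConcurrentActivityExecutionSize, and minReplicas is the availability floor.
func replicasNeeded(startsPerSecond, avgRemoteSeconds float64, slotsPerReplica, minReplicas int) int {
	concurrent := startsPerSecond * avgRemoteSeconds // Little's law
	n := int(math.Ceil(concurrent / float64(slotsPerReplica)))
	if n < minReplicas {
		return minReplicas
	}
	return n
}

func main() {
	// 2 starts/s with 30s remote workflows is about 60 concurrent polls.
	fmt.Println(replicasNeeded(2, 30, 10, 2)) // prints: 6

	// Low traffic still keeps the two-replica floor for availability.
	fmt.Println(replicasNeeded(0.1, 30, 10, 2)) // prints: 2
}
```

Because each waiting activity mostly sleeps between polls, raising MaxConcurrentActivityExecutionSize may be cheaper than adding replicas; treat the result as a lower bound and validate it under load.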
Next Steps#
You now have a working cross-cluster Temporal setup with a bridge pattern. This forms the foundation for multi-region deployment strategies. For the underlying signal-based coordination patterns, see Temporal Signals for Automated Coordination. For the infrastructure setup, see Multiple Temporal Servers on Minikube. For the container lifecycle workflows that the bridge can orchestrate, see Container Lifecycle Workflow.