Updated March 2026

Temporal.io
Best Practices Guide

Workers, Workflows & Activities — with Python & Go recommendations, local testing patterns, Docker setup, and Azure deployment tutorials.

Section 01

Core Concepts & Architecture

Temporal separates the orchestration engine (Temporal Server) from the execution engine (Workers). The Server persists Workflow state as an append-only Event History. Workers poll Task Queues, execute code, and report results back.

| Component | Responsibility | Key Characteristic |
|---|---|---|
| Temporal Server | State management, task routing, timers | Must be highly available |
| Worker | Executes Workflow and Activity code | Stateless, horizontally scalable |
| Workflow | Orchestration logic (deterministic) | Must not perform I/O directly |
| Activity | Business logic with side effects | Can call APIs, databases, etc. |
| Task Queue | Routes tasks to appropriate Workers | Lightweight, dynamically created |

Section 02

Workflow Best Practices

Keep Workflows Deterministic

Workflows must produce the same sequence of Commands when replayed from the same Event History. This is the single most important constraint.

Rules for Determinism

  • Never use system time — use workflow.Now() (Go) or workflow.time() (Python)
  • Never use random number generators — use workflow.SideEffect() (Go) or workflow.random() (Python)
  • Never perform I/O inside Workflows — delegate all I/O to Activities
  • Never use global mutable state — each Workflow Execution must be self-contained
  • Never iterate over unsorted map keys (Go) or spawn threads (Python) inside Workflow code

Use Single Input/Output Objects

Wrap all inputs and outputs in a single struct or dataclass. This lets you add fields later without breaking running Executions.

```python
from dataclasses import dataclass


@dataclass
class OrderInput:
    order_id: str
    customer_id: str
    items: list[str]
    priority: str = "normal"  # New field, backward compatible


@dataclass
class OrderResult:
    status: str
    tracking_id: str
```
```go
type OrderInput struct {
    OrderID    string   `json:"order_id"`
    CustomerID string   `json:"customer_id"`
    Items      []string `json:"items"`
    Priority   string   `json:"priority"` // zero-value safe
}

type OrderResult struct {
    Status     string `json:"status"`
    TrackingID string `json:"tracking_id"`
}
```

Manage Event History Growth

Keep histories below a few thousand events. Individual payloads are capped at 2 MB by default, and the Server terminates an Execution whose history exceeds 50 MB or 51,200 events. Use Continue-As-New to reset history in long-running Workflows.

Tip: Use workflow.info().get_current_history_length() (Python) or workflow.GetInfo(ctx).GetCurrentHistoryLength() (Go) to check and trigger Continue-As-New.

Versioning Workflows Safely

Patching (Code-Level)

Branch logic within the same Workflow function using workflow.patched() (Python) or workflow.GetVersion() (Go). Keeps code in one place but adds complexity.


Worker Versioning (Public Preview)

Pin Workflow Executions to specific Worker build IDs. Pinned Workflows always run on the same build version until completion.

Section 03

Activity Best Practices

Always Set Start-To-Close Timeout

Without this timeout, a crashed Worker will cause the Activity to appear stuck indefinitely. Temporal strongly recommends setting this on every Activity.

| Timeout | Purpose | Recommendation |
|---|---|---|
| Start-To-Close ⭐ | Max duration of a single attempt | ALWAYS set this |
| Schedule-To-Close | Max duration including all retries | Optional overall deadline |
| Heartbeat | Max gap between heartbeats | ~30s for long-running Activities |
| Schedule-To-Start | Max time in Task Queue | Rarely needed — monitor metrics instead |

Heartbeat Long-Running Activities

```go
func LongRunningActivity(ctx context.Context, input ProcessInput) (ProcessResult, error) {
    for i := 0; i < len(input.Items); i++ {
        activity.RecordHeartbeat(ctx, i) // Report progress
        if err := processItem(input.Items[i]); err != nil {
            return ProcessResult{}, err
        }
    }
    return ProcessResult{Status: "done"}, nil
}
```
```python
@activity.defn
async def long_running_activity(input: ProcessInput) -> ProcessResult:
    for i, item in enumerate(input.items):
        activity.heartbeat(i)  # Report progress
        await process_item(item)
    return ProcessResult(status="done")
```

Configure Retry Policies Thoughtfully

Defaults: unlimited retries, 1 s initial interval, 2.0 backoff coefficient, 100 s maximum interval. Customize per use case.
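To see how those defaults compound, here is a small helper (ours, not an SDK API) that computes the delay before each retry attempt:

```python
def retry_interval(attempt: int, initial: float = 1.0,
                   backoff: float = 2.0, max_interval: float = 100.0) -> float:
    """Delay before retry number `attempt` (1-based) under Temporal's defaults."""
    return min(initial * backoff ** (attempt - 1), max_interval)


schedule = [retry_interval(n) for n in range(1, 9)]
# Exponential growth: 1, 2, 4, 8, 16, 32, 64 seconds, then capped at 100
```

With unlimited retries and no Schedule-To-Close deadline, a permanently failing Activity keeps retrying every 100 seconds indefinitely, which is why non-retryable error types and maximum-attempt caps matter for calls that can fail terminally.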

Section 04

Worker Best Practices

Long-Running Services

Package Workers as Docker images. Deploy via CI/CD. Inject Temporal connection params at runtime via env vars.
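A config loader along these lines (the variable names are assumptions; align them with your deployment manifests) keeps the image itself environment-agnostic:

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class WorkerConfig:
    temporal_address: str
    namespace: str
    task_queue: str


def get_config() -> WorkerConfig:
    # Defaults suit local development; deployments override via env vars
    return WorkerConfig(
        temporal_address=os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"),
        namespace=os.environ.get("TEMPORAL_NAMESPACE", "default"),
        task_queue=os.environ.get("TASK_QUEUE", "my-task-queue"),
    )
```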

Tune Concurrency

Don't rely on SDK defaults. Tune Activity slots for I/O vs CPU bound work. Use Poller Autoscaling (recommended).

Separate Task Queues

Isolate CPU-heavy vs I/O-heavy. Separate by priority level or team/domain for independent scaling.

Rainbow Deployments

Multiple Worker versions run simultaneously. Old versions drain pinned Workflows; new versions handle new Executions.

Section 05

Local CLI Testing Without Temporal

Activities are regular functions — test them directly with standard testing tools, no Temporal infrastructure required.

Test Activities as Plain Functions

```python
# test_activities.py — no Temporal server needed
import pytest
from unittest.mock import patch, AsyncMock

from activities import process_order


@pytest.mark.asyncio
async def test_process_order_success():
    """Test activity as a plain async function."""
    with patch("activities.call_payment_api", new_callable=AsyncMock) as mock_api:
        mock_api.return_value = MockPaymentResult(txn_id="txn_123")
        result = await process_order(OrderInput(order_id="ord_1", amount=99.99))
    assert result.status == "processed"
    assert result.txn_id == "txn_123"
```
```go
// activities_test.go — uses TestActivityEnvironment for context
func TestProcessOrder_Success(t *testing.T) {
    var suite testsuite.WorkflowTestSuite
    env := suite.NewTestActivityEnvironment()
    env.RegisterActivity(ProcessOrder)

    result, err := env.ExecuteActivity(ProcessOrder, OrderInput{
        OrderID: "ord_1",
        Amount:  99.99,
    })
    assert.NoError(t, err)

    var output OrderResult
    err = result.Get(&output)
    assert.NoError(t, err)
    assert.Equal(t, "processed", output.Status)
}
```

Temporal CLI Dev Server

For integration tests, the CLI includes an in-memory dev server — no Docker or external DB needed:

```shell
# Install CLI
brew install temporal                          # macOS
curl -sSf https://temporal.download/cli | sh   # Linux

# Start dev server (in-memory, supports time-skipping)
temporal server start-dev
# Server: localhost:7233 | Web UI: http://localhost:8233

# Start a workflow
temporal workflow start \
  --task-queue my-task-queue \
  --type MyWorkflow \
  --input '{"key": "value"}'
```

Design for Testability: Dependency Injection

Accept dependencies as constructor params so you can swap mocks in tests:

```go
type OrderActivities struct {
    PaymentClient PaymentClient // interface
    DB            Database      // interface
}

func (a *OrderActivities) ProcessPayment(ctx context.Context, input PaymentInput) (PaymentResult, error) {
    return a.PaymentClient.Charge(input.Amount, input.CardToken)
}

// Tests: inject mocks
activities := &OrderActivities{PaymentClient: &MockPaymentClient{}, DB: &MockDB{}}

// Production: inject real implementations
activities := &OrderActivities{PaymentClient: stripe.NewClient(apiKey), DB: postgres.NewDB(connStr)}
```
```python
class OrderActivities:
    def __init__(self, payment_client, db):
        self.payment_client = payment_client
        self.db = db

    @activity.defn
    async def process_payment(self, input: PaymentInput) -> PaymentResult:
        return await self.payment_client.charge(input.amount, input.card_token)


# Tests
activities = OrderActivities(payment_client=MockPaymentClient(), db=MockDB())

# Production
activities = OrderActivities(payment_client=StripeClient(api_key), db=PostgresDB(conn_str))
```
Section 06

Testing with Temporal Frameworks

Both SDKs provide test environments for Workflow integration tests with mocked Activities and time-skipping.

```python
import pytest
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker


@pytest.fixture
async def env():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        yield env


@pytest.mark.asyncio
async def test_order_workflow(env):
    @activity.defn(name="process_payment")
    async def mock_payment(input: PaymentInput) -> PaymentResult:
        return PaymentResult(txn_id="mock_txn")

    async with Worker(
        env.client,
        task_queue="test-queue",
        workflows=[OrderWorkflow],
        activities=[mock_payment],
    ):
        result = await env.client.execute_workflow(
            OrderWorkflow.run,
            OrderInput(order_id="test_1"),
            id="test-workflow-id",
            task_queue="test-queue",
        )
    assert result.status == "completed"
```
```go
type OrderWorkflowTestSuite struct {
    suite.Suite
    testsuite.WorkflowTestSuite
    env *testsuite.TestWorkflowEnvironment
}

func (s *OrderWorkflowTestSuite) SetupTest() {
    s.env = s.NewTestWorkflowEnvironment()
}

func (s *OrderWorkflowTestSuite) Test_OrderWorkflow_Success() {
    // Mock activity
    s.env.OnActivity(ProcessPayment, mock.Anything, mock.Anything).Return(
        PaymentResult{TxnID: "mock_txn"}, nil,
    )

    s.env.ExecuteWorkflow(OrderWorkflow, OrderInput{OrderID: "test_1"})

    s.True(s.env.IsWorkflowCompleted())
    s.NoError(s.env.GetWorkflowError())

    var result OrderResult
    s.NoError(s.env.GetWorkflowResult(&result))
    s.Equal("completed", result.Status)
}

func TestOrderWorkflowSuite(t *testing.T) {
    suite.Run(t, new(OrderWorkflowTestSuite))
}
```

Warning: Unless Activities are mocked, the test environment executes actual Activity code including external service calls. Always mock when testing Workflow logic in isolation.

Section 07

Python SDK Recommendations

Recommended Project Structure

```
my-temporal-app/
├── activities/
│   ├── __init__.py
│   ├── order_activities.py
│   └── notification_activities.py
├── workflows/
│   ├── __init__.py
│   └── order_workflow.py
├── models/
│   ├── __init__.py
│   └── order.py              # Input/Output dataclasses
├── worker.py                 # Worker entry point
├── client.py                 # Workflow starter
├── config.py                 # Configuration loader
├── tests/
│   ├── test_activities.py
│   └── test_workflows.py
├── Dockerfile
└── requirements.txt
```

Worker Configuration

```python
# worker.py
import asyncio

from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker

from config import get_config

with workflow.unsafe.imports_passed_through():
    from workflows.order_workflow import OrderWorkflow
    from activities.order_activities import OrderActivities


async def main():
    config = get_config()
    client = await Client.connect(config.temporal_address, namespace=config.namespace)

    activities = OrderActivities(
        payment_client=create_payment_client(config),
        db=create_db_pool(config),
    )

    worker = Worker(
        client,
        task_queue=config.task_queue,
        workflows=[OrderWorkflow],
        activities=[activities.process_payment, activities.update_inventory],
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```

Important: Use workflow.unsafe.imports_passed_through() for importing Activity modules. This ensures Activities are accessible during sandboxed Workflow execution.

Section 08

Go SDK Recommendations

Worker Configuration

```go
// cmd/worker/main.go
func main() {
    cfg := config.Load()

    c, err := client.Dial(client.Options{
        HostPort:  cfg.TemporalAddress,
        Namespace: cfg.Namespace,
    })
    if err != nil {
        log.Fatalln("Unable to create client:", err)
    }
    defer c.Close()

    w := worker.New(c, cfg.TaskQueue, worker.Options{
        MaxConcurrentActivityExecutionSize:     200,
        MaxConcurrentWorkflowTaskExecutionSize: 100,
    })

    orderActs := &activities.OrderActivities{
        PaymentClient: createPaymentClient(cfg),
        DB:            createDBPool(cfg),
    }

    w.RegisterWorkflow(workflows.OrderWorkflow)
    w.RegisterActivity(orderActs)

    if err := w.Run(worker.InterruptCh()); err != nil {
        log.Fatalln("Worker failed:", err)
    }
}
```

Go Pattern: Declare a nil pointer to your Activity struct (var acts *OrderActivities) in Workflows and pass method references. This gives compile-time type checking with runtime dispatch via the Worker's registered struct.

Section 09

Docker Setup Tutorial

Quick Start with Official Docker Compose

```shell
# Clone official repo
git clone https://github.com/temporalio/docker-compose.git
cd docker-compose

# Start (PostgreSQL + Elasticsearch by default)
docker compose up -d

# Temporal gRPC   → localhost:7233
# Temporal Web UI → http://localhost:8080
```

Custom Production-Like Compose

```yaml
# docker-compose.yml
version: "3.8"
services:
  postgresql:
    image: postgres:16
    environment:
      POSTGRES_USER: temporal
      POSTGRES_PASSWORD: temporal
    ports: ["5432:5432"]
    volumes: [postgres_data:/var/lib/postgresql/data]
    networks: [temporal-network]

  temporal:
    image: temporalio/auto-setup:latest
    depends_on: [postgresql]
    environment:
      - DB=postgres12_pgx
      - DB_PORT=5432
      - POSTGRES_USER=temporal
      - POSTGRES_PWD=temporal
      - POSTGRES_SEEDS=postgresql
    ports: ["7233:7233"]
    networks: [temporal-network]

  temporal-ui:
    image: temporalio/ui:latest
    depends_on: [temporal]
    environment: [TEMPORAL_ADDRESS=temporal:7233]
    ports: ["8080:8080"]
    networks: [temporal-network]

  worker:
    build: { context: ./worker, dockerfile: Dockerfile }
    depends_on: [temporal]
    environment:
      - TEMPORAL_ADDRESS=temporal:7233
      - TASK_QUEUE=my-task-queue
    networks: [temporal-network]
    restart: unless-stopped

# Top-level declarations for the named volume and network referenced above
volumes:
  postgres_data:

networks:
  temporal-network:
```

Worker Dockerfiles

```dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "worker.py"]
```
```dockerfile
FROM golang:1.23 AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o /worker ./cmd/worker

FROM gcr.io/distroless/static-debian12
COPY --from=builder /worker /worker
CMD ["/worker"]
```
Section 10

Azure Deployment Guide

Option A: Azure Kubernetes Service (AKS)

Recommended for full production Temporal deployments with dedicated server infrastructure.

Step 1: Create Azure Resources

```shell
# Create resource group + ACR + AKS
az group create --name rg-temporal --location eastus2
az acr create --resource-group rg-temporal --name mytemporalacr --sku Basic
az aks create \
  --resource-group rg-temporal \
  --name aks-temporal \
  --node-count 3 \
  --attach-acr mytemporalacr \
  --generate-ssh-keys
az aks get-credentials --resource-group rg-temporal --name aks-temporal
```

Step 2: Deploy Temporal Server via Helm

```shell
helm repo add temporal https://charts.temporal.io
helm repo update

# Create Azure PostgreSQL Flexible Server
az postgres flexible-server create \
  --resource-group rg-temporal \
  --name temporal-pg-server \
  --admin-user temporaladmin \
  --admin-password <strong-password> \
  --sku-name Standard_B2s --version 16

# Install Temporal
helm install temporal temporal/temporal \
  --namespace temporal --create-namespace \
  --set server.replicaCount=2 \
  --set persistence.default.driver=sql \
  --set persistence.default.sql.driver=postgres12 \
  --set persistence.default.sql.host=temporal-pg-server.postgres.database.azure.com
```

Step 3: Deploy Worker

```yaml
# worker-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: temporal-worker
  namespace: temporal
spec:
  replicas: 3
  selector:
    matchLabels: { app: temporal-worker }
  template:
    metadata:
      labels: { app: temporal-worker }
    spec:
      containers:
        - name: worker
          image: mytemporalacr.azurecr.io/temporal-worker:v1
          env:
            - name: TEMPORAL_ADDRESS
              value: "temporal-frontend.temporal.svc.cluster.local:7233"
            - name: TASK_QUEUE
              value: "my-task-queue"
          resources:
            requests: { cpu: "250m", memory: "256Mi" }
            limits: { cpu: "500m", memory: "512Mi" }
```

Option B: Azure Container Apps (Workers Only)

Simpler operational model for stateless Workers. Pair with Temporal Cloud or an AKS-hosted Temporal Server.

```shell
az containerapp env create \
  --name temporal-env --resource-group rg-temporal --location eastus2

az containerapp create \
  --name temporal-worker \
  --resource-group rg-temporal \
  --environment temporal-env \
  --image mytemporalacr.azurecr.io/temporal-worker:v1 \
  --registry-server mytemporalacr.azurecr.io \
  --min-replicas 1 --max-replicas 10 \
  --cpu 0.5 --memory 1.0Gi \
  --env-vars TEMPORAL_ADDRESS=<address>:7233 TASK_QUEUE=my-task-queue
```

Option C: Temporal Cloud + Azure Workers

Simplest path — use Temporal's managed SaaS and deploy only Workers to Azure:

```python
client = await Client.connect(
    "<namespace>.<account>.tmprl.cloud:7233",
    namespace="<namespace>.<account>",
    tls=TLSConfig(
        client_cert=Path("client.pem").read_bytes(),
        client_private_key=Path("client.key").read_bytes(),
    ),
)
```
```go
c, err := client.Dial(client.Options{
    HostPort:  "<namespace>.<account>.tmprl.cloud:7233",
    Namespace: "<namespace>.<account>",
    ConnectionOptions: client.ConnectionOptions{
        TLS: &tls.Config{Certificates: []tls.Certificate{cert}},
    },
})
```
Section 11

Production Checklist
