Temporal.io
Best Practices Guide
Workers, Workflows & Activities — with Python & Go recommendations, local testing patterns, Docker setup, and Azure deployment tutorials.
Core Concepts & Architecture
Temporal separates the orchestration engine (Temporal Server) from the execution engine (Workers). The Server persists Workflow state as an append-only Event History. Workers poll Task Queues, execute code, and report results back.
| Component | Responsibility | Key Characteristic |
|---|---|---|
| Temporal Server | State management, task routing, timers | Must be highly available |
| Worker | Executes Workflow and Activity code | Stateless, horizontally scalable |
| Workflow | Orchestration logic (deterministic) | Must not perform I/O directly |
| Activity | Business logic with side effects | Can call APIs, databases, etc. |
| Task Queue | Routes tasks to appropriate Workers | Lightweight, dynamically created |
Workflow Best Practices
Keep Workflows Deterministic
Workflows must produce the same sequence of Commands when replayed from the same Event History. This is the single most important constraint.
Rules for Determinism
- ✗ Never use system time — use
workflow.Now()(Go) orworkflow.time()(Python) - ✗ Never use random number generators — use
workflow.SideEffect() - ✗ Never perform I/O inside Workflows — delegate all I/O to Activities
- ✗ Never use global mutable state — each Workflow Execution must be self-contained
- ✗ Never iterate over maps without sorting keys (Go) or use threading (Python)
Use Single Input/Output Objects
Wrap all inputs and outputs in a single struct or dataclass. This lets you add fields later without breaking running Executions.
@dataclass
class OrderInput:
order_id: str
customer_id: str
items: list[str]
priority: str = "normal" # New field, backward compatible
@dataclass
class OrderResult:
status: str
tracking_id: strtype OrderInput struct {
OrderID string `json:"order_id"`
CustomerID string `json:"customer_id"`
Items []string `json:"items"`
Priority string `json:"priority"` // zero-value safe
}
type OrderResult struct {
Status string `json:"status"`
TrackingID string `json:"tracking_id"`
}Manage Event History Growth
Keep histories below a few thousand events. Individual payloads must not exceed 2 MB. Total history must stay below 50 MB. Use Continue-As-New for long-running Workflows.
Tip: Use workflow.info().get_current_history_length() (Python) or workflow.GetInfo(ctx).GetCurrentHistoryLength() (Go) to check and trigger Continue-As-New.
Versioning Workflows Safely
Patching (Code-Level)
Branch logic within the same Workflow function using workflow.patched(). Keeps code in one place but adds complexity.
Worker Versioning Public Preview
Pin Workflow Executions to specific Worker build IDs. Pinned Workflows always run on the same build version until completion.
Worker Versioning →Activity Best Practices
Always Set Start-To-Close Timeout
Without this timeout, a crashed Worker will cause the Activity to appear stuck indefinitely. Temporal strongly recommends setting this on every Activity.
| Timeout | Purpose | Recommendation |
|---|---|---|
| Start-To-Close ⭐ | Max duration of a single attempt | ALWAYS set this |
| Schedule-To-Close | Max duration including all retries | Optional overall deadline |
| Heartbeat | Max gap between heartbeats | ~30s for long-running Activities |
| Schedule-To-Start | Max time in Task Queue | Rarely needed — monitor metrics instead |
Heartbeat Long-Running Activities
func LongRunningActivity(ctx context.Context, input ProcessInput) (ProcessResult, error) {
for i := 0; i < len(input.Items); i++ {
activity.RecordHeartbeat(ctx, i) // Report progress
err := processItem(input.Items[i])
if err != nil { return ProcessResult{}, err }
}
return ProcessResult{Status: "done"}, nil
}@activity.defn
async def long_running_activity(input: ProcessInput) -> ProcessResult:
for i, item in enumerate(input.items):
activity.heartbeat(i) # Report progress
await process_item(item)
return ProcessResult(status="done")Configure Retry Policies Thoughtfully
Default: unlimited retries, 1s initial interval, 2.0 backoff, 100s max interval. Customize per use case:
- ● Set
MaximumAttemptsfor operations that shouldn't retry forever (e.g., payments) - ● Use
NonRetryableErrorTypesfor errors that can never succeed on retry (4xx, validation) - ● Activity logic changes don't break Workflow determinism — update Activities freely
Worker Best Practices
Long-Running Services
Package Workers as Docker images. Deploy via CI/CD. Inject Temporal connection params at runtime via env vars.
Tune Concurrency
Don't rely on SDK defaults. Tune Activity slots for I/O vs CPU bound work. Use Poller Autoscaling (recommended).
Separate Task Queues
Isolate CPU-heavy vs I/O-heavy. Separate by priority level or team/domain for independent scaling.
Rainbow Deployments
Multiple Worker versions run simultaneously. Old versions drain pinned Workflows; new versions handle new Executions.
Local CLI Testing Without Temporal
Activities are regular functions — test them directly with standard testing tools, no Temporal infrastructure required.
Test Activities as Plain Functions
# test_activities.py — no Temporal server needed
import pytest
from activities import process_order
from unittest.mock import patch, AsyncMock
@pytest.mark.asyncio
async def test_process_order_success():
"""Test activity as a plain async function."""
with patch('activities.call_payment_api', new_callable=AsyncMock) as mock_api:
mock_api.return_value = MockPaymentResult(txn_id="txn_123")
result = await process_order(OrderInput(
order_id="ord_1", amount=99.99
))
assert result.status == "processed"
assert result.txn_id == "txn_123"// activities_test.go — uses TestActivityEnvironment for context
func TestProcessOrder_Success(t *testing.T) {
var suite testsuite.WorkflowTestSuite
env := suite.NewTestActivityEnvironment()
env.RegisterActivity(ProcessOrder)
result, err := env.ExecuteActivity(ProcessOrder, OrderInput{
OrderID: "ord_1", Amount: 99.99,
})
assert.NoError(t, err)
var output OrderResult
err = result.Get(&output)
assert.NoError(t, err)
assert.Equal(t, "processed", output.Status)
}Temporal CLI Dev Server
For integration tests, the CLI includes an in-memory dev server — no Docker or external DB needed:
# Install CLI
brew install temporal # macOS
curl -sSf https://temporal.download/cli | sh # Linux
# Start dev server (in-memory, supports time-skipping)
temporal server start-dev
# Server: localhost:7233 | Web UI: http://localhost:8233
# Start a workflow
temporal workflow start \
--task-queue my-task-queue \
--type MyWorkflow \
--input '{"key": "value"}'Design for Testability: Dependency Injection
Accept dependencies as constructor params so you can swap mocks in tests:
type OrderActivities struct {
PaymentClient PaymentClient // interface
DB Database // interface
}
func (a *OrderActivities) ProcessPayment(ctx context.Context, input PaymentInput) (PaymentResult, error) {
return a.PaymentClient.Charge(input.Amount, input.CardToken)
}
// Tests: inject mocks
activities := &OrderActivities{PaymentClient: &MockPaymentClient{}, DB: &MockDB{}}
// Production: inject real implementations
activities := &OrderActivities{PaymentClient: stripe.NewClient(apiKey), DB: postgres.NewDB(connStr)}class OrderActivities:
def __init__(self, payment_client, db):
self.payment_client = payment_client
self.db = db
@activity.defn
async def process_payment(self, input: PaymentInput) -> PaymentResult:
return await self.payment_client.charge(input.amount, input.card_token)
# Tests
activities = OrderActivities(payment_client=MockPaymentClient(), db=MockDB())
# Production
activities = OrderActivities(payment_client=StripeClient(api_key), db=PostgresDB(conn_str))Testing with Temporal Frameworks
Both SDKs provide test environments for Workflow integration tests with mocked Activities and time-skipping.
import pytest
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker
@pytest.fixture
async def env():
async with await WorkflowEnvironment.start_time_skipping() as env:
yield env
@pytest.mark.asyncio
async def test_order_workflow(env):
@activity.defn(name="process_payment")
async def mock_payment(input: PaymentInput) -> PaymentResult:
return PaymentResult(txn_id="mock_txn")
async with Worker(
env.client, task_queue="test-queue",
workflows=[OrderWorkflow],
activities=[mock_payment],
):
result = await env.client.execute_workflow(
OrderWorkflow.run,
OrderInput(order_id="test_1"),
id="test-workflow-id",
task_queue="test-queue",
)
assert result.status == "completed"type OrderWorkflowTestSuite struct {
suite.Suite
testsuite.WorkflowTestSuite
env *testsuite.TestWorkflowEnvironment
}
func (s *OrderWorkflowTestSuite) SetupTest() {
s.env = s.NewTestWorkflowEnvironment()
}
func (s *OrderWorkflowTestSuite) Test_OrderWorkflow_Success() {
// Mock activity
s.env.OnActivity(ProcessPayment, mock.Anything, mock.Anything).Return(
PaymentResult{TxnID: "mock_txn"}, nil,
)
s.env.ExecuteWorkflow(OrderWorkflow, OrderInput{OrderID: "test_1"})
s.True(s.env.IsWorkflowCompleted())
s.NoError(s.env.GetWorkflowError())
var result OrderResult
s.NoError(s.env.GetWorkflowResult(&result))
s.Equal("completed", result.Status)
}
func TestOrderWorkflowSuite(t *testing.T) {
suite.Run(t, new(OrderWorkflowTestSuite))
}Warning: Unless Activities are mocked, the test environment executes actual Activity code including external service calls. Always mock when testing Workflow logic in isolation.
Python SDK Recommendations
Recommended Project Structure
my-temporal-app/
├── activities/
│ ├── __init__.py
│ ├── order_activities.py
│ └── notification_activities.py
├── workflows/
│ ├── __init__.py
│ └── order_workflow.py
├── models/
│ ├── __init__.py
│ └── order.py # Input/Output dataclasses
├── worker.py # Worker entry point
├── client.py # Workflow starter
├── config.py # Configuration loader
├── tests/
│ ├── test_activities.py
│ └── test_workflows.py
├── Dockerfile
└── requirements.txtWorker Configuration
# worker.py
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio import workflow
from config import get_config
with workflow.unsafe.imports_passed_through():
from workflows.order_workflow import OrderWorkflow
from activities.order_activities import OrderActivities
async def main():
config = get_config()
client = await Client.connect(config.temporal_address, namespace=config.namespace)
activities = OrderActivities(
payment_client=create_payment_client(config),
db=create_db_pool(config),
)
worker = Worker(
client,
task_queue=config.task_queue,
workflows=[OrderWorkflow],
activities=[activities.process_payment, activities.update_inventory],
)
await worker.run()
if __name__ == "__main__":
asyncio.run(main())Important: Use workflow.unsafe.imports_passed_through() for importing Activity modules. This ensures Activities are accessible during sandboxed Workflow execution.
Go SDK Recommendations
Worker Configuration
// cmd/worker/main.go
func main() {
cfg := config.Load()
c, err := client.Dial(client.Options{
HostPort: cfg.TemporalAddress,
Namespace: cfg.Namespace,
})
if err != nil { log.Fatalln("Unable to create client:", err) }
defer c.Close()
w := worker.New(c, cfg.TaskQueue, worker.Options{
MaxConcurrentActivityExecutionSize: 200,
MaxConcurrentWorkflowTaskExecutionSize: 100,
})
orderActs := &activities.OrderActivities{
PaymentClient: createPaymentClient(cfg),
DB: createDBPool(cfg),
}
w.RegisterWorkflow(workflows.OrderWorkflow)
w.RegisterActivity(orderActs)
if err := w.Run(worker.InterruptCh()); err != nil {
log.Fatalln("Worker failed:", err)
}
}Go Pattern: Declare a nil pointer to your Activity struct (var acts *OrderActivities) in Workflows and pass method references. This gives compile-time type checking with runtime dispatch via the Worker's registered struct.
Docker Setup Tutorial
Quick Start with Official Docker Compose
# Clone official repo
git clone https://github.com/temporalio/docker-compose.git
cd docker-compose
# Start (PostgreSQL + Elasticsearch by default)
docker compose up -d
# Temporal gRPC → localhost:7233
# Temporal Web UI → http://localhost:8080Custom Production-Like Compose
# docker-compose.yml
version: "3.8"
services:
postgresql:
image: postgres:16
environment:
POSTGRES_USER: temporal
POSTGRES_PASSWORD: temporal
ports: ["5432:5432"]
volumes: [postgres_data:/var/lib/postgresql/data]
networks: [temporal-network]
temporal:
image: temporalio/auto-setup:latest
depends_on: [postgresql]
environment:
- DB=postgres12_pgx
- DB_PORT=5432
- POSTGRES_USER=temporal
- POSTGRES_PWD=temporal
- POSTGRES_SEEDS=postgresql
ports: ["7233:7233"]
networks: [temporal-network]
temporal-ui:
image: temporalio/ui:latest
depends_on: [temporal]
environment: [TEMPORAL_ADDRESS=temporal:7233]
ports: ["8080:8080"]
networks: [temporal-network]
worker:
build: { context: ./worker, dockerfile: Dockerfile }
depends_on: [temporal]
environment:
- TEMPORAL_ADDRESS=temporal:7233
- TASK_QUEUE=my-task-queue
networks: [temporal-network]
restart: unless-stoppedWorker Dockerfiles
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "worker.py"]FROM golang:1.23 AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o /worker ./cmd/worker
FROM gcr.io/distroless/static-debian12
COPY --from=builder /worker /worker
CMD ["/worker"]Azure Deployment Guide
Option A: Azure Kubernetes Service (AKS)
Recommended for full production Temporal deployments with dedicated server infrastructure.
Step 1: Create Azure Resources
# Create resource group + ACR + AKS
az group create --name rg-temporal --location eastus2
az acr create --resource-group rg-temporal --name mytemporalacr --sku Basic
az aks create \
--resource-group rg-temporal \
--name aks-temporal \
--node-count 3 \
--attach-acr mytemporalacr \
--generate-ssh-keys
az aks get-credentials --resource-group rg-temporal --name aks-temporalStep 2: Deploy Temporal Server via Helm
helm repo add temporal https://charts.temporal.io
helm repo update
# Create Azure PostgreSQL Flexible Server
az postgres flexible-server create \
--resource-group rg-temporal \
--name temporal-pg-server \
--admin-user temporaladmin \
--admin-password <strong-password> \
--sku-name Standard_B2s --version 16
# Install Temporal
helm install temporal temporal/temporal \
--namespace temporal --create-namespace \
--set server.replicaCount=2 \
--set persistence.default.driver=sql \
--set persistence.default.sql.driver=postgres12 \
--set persistence.default.sql.host=temporal-pg-server.postgres.database.azure.comStep 3: Deploy Worker
# worker-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: temporal-worker
namespace: temporal
spec:
replicas: 3
selector:
matchLabels: { app: temporal-worker }
template:
metadata:
labels: { app: temporal-worker }
spec:
containers:
- name: worker
image: mytemporalacr.azurecr.io/temporal-worker:v1
env:
- name: TEMPORAL_ADDRESS
value: "temporal-frontend.temporal.svc.cluster.local:7233"
- name: TASK_QUEUE
value: "my-task-queue"
resources:
requests: { cpu: "250m", memory: "256Mi" }
limits: { cpu: "500m", memory: "512Mi" }Option B: Azure Container Apps (Workers Only)
Simpler operational model for stateless Workers. Pair with Temporal Cloud or an AKS-hosted Temporal Server.
az containerapp env create \
--name temporal-env --resource-group rg-temporal --location eastus2
az containerapp create \
--name temporal-worker \
--resource-group rg-temporal \
--environment temporal-env \
--image mytemporalacr.azurecr.io/temporal-worker:v1 \
--registry-server mytemporalacr.azurecr.io \
--min-replicas 1 --max-replicas 10 \
--cpu 0.5 --memory 1.0Gi \
--env-vars TEMPORAL_ADDRESS=<address>:7233 TASK_QUEUE=my-task-queueOption C: Temporal Cloud + Azure Workers
Simplest path — use Temporal's managed SaaS and deploy only Workers to Azure:
client = await Client.connect(
"<namespace>.<account>.tmprl.cloud:7233",
namespace="<namespace>.<account>",
tls=TLSConfig(
client_cert=Path("client.pem").read_bytes(),
client_private_key=Path("client.key").read_bytes(),
),
)c, err := client.Dial(client.Options{
HostPort: "<namespace>.<account>.tmprl.cloud:7233",
Namespace: "<namespace>.<account>",
ConnectionOptions: client.ConnectionOptions{
TLS: &tls.Config{Certificates: []tls.Certificate{cert}},
},
})Production Checklist
Interactive checklist — click items to mark as done. Progress is session-only.