Static activity
Activity-side context. Call Durable.activity.getContext() inside
an activity function to access the activity name, arguments,
parent workflow ID, and other execution metadata.
Static Client
Starts workflows, sends signals, and reads results. Instantiate
with a connection, then use client.workflow.start() to launch
a workflow and obtain a Handle.
Static Connection
Declares connection options (class + config) for Postgres. The actual connection is established lazily when a workflow or worker is started.
Static didInterrupt
Returns true if the error is an engine control-flow signal (replay
suspension) rather than an application error. Always check this in
catch blocks inside workflows and interceptors — swallowing an
interruption breaks the replay system.
Type guard that returns true if an error is a Durable engine
control-flow signal rather than a genuine application error.
Durable uses thrown errors internally to suspend workflow execution
for durable operations like sleep, condition, proxyActivities,
and executeChild. These errors must be re-thrown (not swallowed) so
the engine can persist state and schedule the next step.
Always use didInterrupt in catch blocks inside workflow
functions to avoid accidentally swallowing engine signals.
Recognized error types:
DurableChildError, DurableContinueAsNewError, DurableFatalError,
DurableMaxedError, DurableProxyError, DurableRetryError,
DurableSleepError, DurableTimeoutError, DurableWaitForError,
DurableWaitForAllError, CancelledFailure
import { Durable } from '@hotmeshio/hotmesh';
// Assumed module path; it should export riskyOperation.
import * as activities from './activities';

export async function safeWorkflow(): Promise<string> {
  const { riskyOperation } = Durable.workflow.proxyActivities<typeof activities>();
  try {
    return await riskyOperation();
  } catch (error) {
    // CRITICAL: re-throw engine signals
    if (Durable.workflow.didInterrupt(error)) {
      throw error;
    }
    // Handle real application errors
    return 'fallback-value';
  }
}

// Common pattern in interceptors
const interceptor: WorkflowInboundCallsInterceptor = {
  async execute(ctx, next) {
    try {
      return await next();
    } catch (error) {
      if (Durable.workflow.didInterrupt(error)) {
        throw error; // always re-throw engine signals
      }
      // Log and re-throw application errors
      console.error('Workflow failed:', error);
      throw error;
    }
  },
};
Parameter: The error to check.
Returns: true if the error is a Durable engine interruption signal.
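The suspension mechanism described above can be illustrated without the engine. The following is a toy model under stated assumptions, not HotMesh's implementation: SleepSignal, isEngineSignal, durableSleep, and runStep are hypothetical stand-ins for an engine error class, didInterrupt, a durable operation, and the worker loop. It shows how a catch block that swallows the signal makes a run look finished when it should have been suspended.

```typescript
// Toy model only: a stand-in for one of the engine's control-flow errors.
class SleepSignal extends Error {
  constructor(public readonly seconds: number) {
    super(`sleep ${seconds}s`);
    this.name = 'SleepSignal';
    // Keeps instanceof reliable when compiling to older targets.
    Object.setPrototypeOf(this, SleepSignal.prototype);
  }
}

// Hypothetical analogue of didInterrupt: a type guard over the signal class.
function isEngineSignal(error: unknown): error is SleepSignal {
  return error instanceof SleepSignal;
}

type StepOutcome =
  | { status: 'suspended'; seconds: number }
  | { status: 'completed'; value: string };

// Hypothetical worker loop: a thrown signal means "persist and resume later".
function runStep(workflow: () => string): StepOutcome {
  try {
    return { status: 'completed', value: workflow() };
  } catch (error) {
    if (isEngineSignal(error)) {
      return { status: 'suspended', seconds: error.seconds };
    }
    throw error; // genuine application error
  }
}

// A durable operation that suspends by throwing.
function durableSleep(seconds: number): never {
  throw new SleepSignal(seconds);
}

// Correct: the signal is re-thrown, so the loop can suspend the workflow.
function carefulWorkflow(): string {
  try {
    return durableSleep(30);
  } catch (error) {
    if (isEngineSignal(error)) throw error;
    return 'fallback-value';
  }
}

// Broken: the signal is swallowed, so the workflow "completes" too early.
function carelessWorkflow(): string {
  try {
    return durableSleep(30);
  } catch {
    return 'fallback-value';
  }
}

const careful = runStep(carefulWorkflow);   // status: 'suspended'
const careless = runStep(carelessWorkflow); // status: 'completed'
```

In the toy model the careless version returns its fallback immediately and the 30-second sleep never happens, which is the failure the re-throw rule is meant to prevent.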
Static guid
Generates a unique identifier for workflow IDs.
Static Handle
A handle to a running or completed workflow. Provides methods to
read results, send signals, query state, cancel, or interrupt.
Obtained via client.workflow.start() or client.workflow.getHandle().
Static list
List all provisioned secured worker roles.
List all provisioned worker roles for a namespace.
Optional namespace?: string
Static provisionWorkerRole
Provision a scoped Postgres role for a worker. The role can only dequeue, ack, and respond on its assigned stream names via stored procedures, with zero direct table access.
Provision a scoped Postgres role for a worker router.
The role can only dequeue/ack/respond on the specified stream names via SECURITY DEFINER stored procedures; it has zero direct table access.
Optional namespace?: string
Optional password?: string
const cred = await Durable.provisionWorkerRole({
  connection: { class: Postgres, options: adminOptions },
  streamNames: ['payment-activity'],
});
await Worker.create({
  connection: { class: Postgres, options: pgOptions },
  taskQueue: 'payment',
  workflow: paymentWorkflow,
  workerCredentials: cred,
});
Static registerActivityWorker
Register activity workers for a task queue. Activities execute via message queue and can run on different servers from workflows.
Register activity workers for a task queue. Activities are invoked via message queue, so they can run on different servers from workflows.
The task queue name gets -activity appended automatically for the worker topic.
For example, taskQueue: 'payment' creates a worker listening on payment-activity.
Worker configuration (connection, namespace, taskQueue)
Activity functions to register
Optional activityTaskQueue: string
Task queue name (without -activity suffix).
Defaults to config.taskQueue if not provided.
Promise
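The naming rule above can be written down as a small function. This is only an illustration of the documented behavior, not the library's code: activityTopic and WorkerConfig are hypothetical names introduced here.

```typescript
// Hypothetical helper mirroring the documented naming rule.
interface WorkerConfig {
  taskQueue: string;
}

function activityTopic(config: WorkerConfig, activityTaskQueue?: string): string {
  // An explicit activityTaskQueue wins; otherwise fall back to config.taskQueue.
  const queue = activityTaskQueue ?? config.taskQueue;
  // The worker topic is the queue name with "-activity" appended.
  return `${queue}-activity`;
}

const defaultTopic = activityTopic({ taskQueue: 'payment' });          // 'payment-activity'
const sharedTopic = activityTopic({ taskQueue: 'payment' }, 'shared'); // 'shared-activity'
```

The practical consequence is that callers use two forms of the name: proxyActivities takes the unsuffixed queue (taskQueue: 'payment'), while provisionWorkerRole takes the suffixed stream name (streamNames: ['payment-activity']).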
// Activity worker (can be on separate server)
import { Durable } from '@hotmeshio/hotmesh';
import { Client as Postgres } from 'pg';

const activities = {
  async processPayment(amount: number): Promise<string> {
    return `Processed $${amount}`;
  },
  async sendEmail(to: string, subject: string): Promise<void> {
    // Send email
  }
};

await Durable.registerActivityWorker({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  },
  taskQueue: 'payment' // Listens on 'payment-activity'
}, activities, 'payment');

// Workflow worker (can be on different server)
async function orderWorkflow(orderId: string, amount: number) {
  const { processPayment, sendEmail } = Durable.workflow.proxyActivities<{
    processPayment: (amount: number) => Promise<string>;
    sendEmail: (to: string, subject: string) => Promise<void>;
  }>({
    taskQueue: 'payment',
    retry: { maximumAttempts: 3 }
  });
  const result = await processPayment(amount);
  await sendEmail('customer@example.com', 'Order confirmed');
  return result;
}

await Durable.Worker.create({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  },
  taskQueue: 'orders',
  workflow: orderWorkflow
});
// Shared activity pool for interceptors
await Durable.registerActivityWorker({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  },
  taskQueue: 'shared'
}, { auditLog, collectMetrics }, 'shared');

const interceptor: WorkflowInboundCallsInterceptor = {
  async execute(ctx, next) {
    const { auditLog } = Durable.workflow.proxyActivities<{
      auditLog: (id: string, action: string) => Promise<void>;
    }>({
      taskQueue: 'shared',
      retry: { maximumAttempts: 3 }
    });
    await auditLog(ctx.get('workflowId'), 'started');
    return next();
  }
};
// Secured worker with scoped Postgres credentials (VNF-style isolation)
// Step 1: Admin provisions a credential (one-time)
const cred = await Durable.provisionWorkerRole({
  connection: { class: Postgres, options: adminOptions },
  streamNames: ['payment-activity'],
});

// Step 2: Worker connects with scoped role; it can only access payment-activity
await Durable.registerActivityWorker({
  connection: { class: Postgres, options: { host: 'pg.prod', database: 'hotmesh' } },
  taskQueue: 'payment',
  workerCredentials: { user: cred.roleName, password: cred.password },
}, { processPayment, refundPayment });
// Activity worker
const activities = {
  async processPayment(amount: number) { return `Processed $${amount}`; },
  async sendEmail(to: string, msg: string) { /* ... */ }
};

await Durable.registerActivityWorker({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  },
  taskQueue: 'payment'
}, activities, 'payment');

// Workflow worker (can be on different server)
async function orderWorkflow(amount: number) {
  const { processPayment, sendEmail } = Durable.workflow.proxyActivities<{
    processPayment: (amount: number) => Promise<string>;
    sendEmail: (to: string, msg: string) => Promise<void>;
  }>({
    taskQueue: 'payment',
    retry: { maximumAttempts: 3 }
  });
  const result = await processPayment(amount);
  await sendEmail('customer@example.com', result);
  return result;
}

await Durable.Worker.create({
  connection: { class: Postgres, options: { connectionString: '...' } },
  taskQueue: 'orders',
  workflow: orderWorkflow
});
Static register
Register a global payload codec for encrypting/decrypting workflow
data at rest. The codec's encode/decode methods are called
automatically on all persisted workflow payloads.
Register a global payload codec for encoding/decoding serialized
object data at rest. Once registered, all object values flowing
through the serializer are stored as /b{encoded} instead of
/s{json}. Use this for encryption, compression, or custom encoding.
The codec is global — it applies to all HotMesh and Durable instances
in the process. Pass null to remove a previously registered codec.
Constraints: The codec must be synchronous and its output must be a valid UTF-8 string. Use base64 encoding for binary output.
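A toy serializer makes the storage rule concrete. Everything here is an assumption made for illustration: the PayloadCodec interface shape, the serializeObject and deserializeObject helpers, and the reading of /b{encoded} and /s{json} as a two-character prefix followed by the payload. The hex codec is a placeholder for real encryption or compression; it only demonstrates the two constraints (synchronous, string output).

```typescript
// Toy model only: the interface shape and serializer below are assumptions.
interface PayloadCodec {
  encode(json: string): string; // must be synchronous and return a string
  decode(encoded: string): string;
}

// Illustrative codec: hex-encodes each UTF-16 code unit (not encryption).
const hexCodec: PayloadCodec = {
  encode(json) {
    let out = '';
    for (let i = 0; i < json.length; i++) {
      out += ('000' + json.charCodeAt(i).toString(16)).slice(-4);
    }
    return out;
  },
  decode(encoded) {
    let out = '';
    for (let i = 0; i < encoded.length; i += 4) {
      out += String.fromCharCode(parseInt(encoded.slice(i, i + 4), 16));
    }
    return out;
  },
};

// One process-wide slot, mirroring the "codec is global" rule; null means none.
let activeCodec: PayloadCodec | null = null;

function serializeObject(value: object): string {
  const json = JSON.stringify(value);
  return activeCodec ? `/b${activeCodec.encode(json)}` : `/s${json}`;
}

function deserializeObject(stored: string): unknown {
  const body = stored.slice(2);
  if (stored.slice(0, 2) === '/b') {
    if (!activeCodec) throw new Error('encoded payload but no codec registered');
    return JSON.parse(activeCodec.decode(body));
  }
  return JSON.parse(body);
}

const plain = serializeObject({ a: 1 });   // stored without a codec
activeCodec = hexCodec;                    // "register" the codec
const encoded = serializeObject({ a: 1 }); // stored with the codec active
const roundTrip = deserializeObject(encoded);
```

Because the prefix records how each value was stored, values written before a codec was registered remain readable in this model, while encoded values become unreadable if the codec is removed.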
Static revoke
Revoke a secured worker role (disables login).
Revoke a worker role by disabling login. The role is not dropped,
preserving the audit trail in the worker_credentials table.
Optional namespace?: string
Static rotate
Rotate a secured worker role's password.
Rotate the password for an existing worker role.
Optional namespace?: string
Optional new
Static Worker
Hosts workflow and activity functions. Call Worker.create() with
a connection, task queue, and workflow function to start processing.
Static workflow
The workflow-internal API. Every method on this object
(proxyActivities, sleep, condition, signal, executeChild,
continueAsNew, patched, CancellationScope, etc.) is designed
to be called inside a workflow function and participates in
deterministic replay.
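Deterministic replay is easier to picture with a toy model. The sketch below is not the engine's implementation: ReplayContext and its call method are hypothetical, and an in-memory array stands in for the persisted history. Each durable call either returns a recorded result or runs its side effect once and records it.

```typescript
// Toy model only: an in-memory "history" standing in for persisted results.
class ReplayContext {
  private cursor = 0;
  public executed = 0; // how many side effects actually ran in this pass

  constructor(private readonly history: string[]) {}

  // Hypothetical durable call: replay the recorded result if one exists,
  // otherwise run the side effect once and record its result.
  call(effect: () => string): string {
    const index = this.cursor++;
    if (index < this.history.length) {
      return this.history[index];
    }
    const result = effect();
    this.executed++;
    this.history.push(result);
    return result;
  }
}

let charges = 0; // counts real side effects across runs

function workflow(ctx: ReplayContext): string {
  const receipt = ctx.call(() => `receipt-${++charges}`);
  return ctx.call(() => `emailed:${receipt}`);
}

const history: string[] = []; // shared across runs, like a persisted log

const firstRun = new ReplayContext(history);
const firstResult = workflow(firstRun);

// Simulated crash recovery: same history, fresh context, same workflow code.
const secondRun = new ReplayContext(history);
const secondResult = workflow(secondRun);
```

In this model the second pass produces the same result without charging again, because results are matched to calls by position. That positional matching is why workflow code has to make the same calls in the same order on every pass.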
Static clear
Static clear
Static clear
Static register
Register an activity inbound interceptor that wraps the actual activity
function execution on the activity worker side. This runs inside the
activity's AsyncLocalStorage context, NOT the workflow context.
Use for logging, metrics, auth validation, or error enrichment at the point where the activity actually executes.
Static registerInboundInterceptor
Register a workflow interceptor that wraps the entire workflow execution in an onion-like pattern. Interceptors execute in registration order (first registered is outermost) and can perform actions before and after workflow execution, handle errors, and add cross-cutting concerns like logging, metrics, or tracing.
Workflow interceptors run inside the workflow's async local storage context,
so all Durable workflow methods (proxyActivities, sleep, condition,
executeChild, etc.) are available. When using Durable functions, always check
for interruptions with Durable.didInterrupt(err) and rethrow them.
The interceptor to register
// Logging interceptor
Durable.registerInboundInterceptor({
  async execute(ctx, next) {
    console.log(`Workflow ${ctx.get('workflowName')} starting`);
    try {
      const result = await next();
      console.log(`Workflow ${ctx.get('workflowName')} completed`);
      return result;
    } catch (err) {
      if (Durable.didInterrupt(err)) throw err;
      console.error(`Workflow ${ctx.get('workflowName')} failed`);
      throw err;
    }
  }
});
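The ordering rule (first registered is outermost) can be checked with a minimal registry. This is a sketch of the onion pattern in general, not HotMesh's internals: register, runWorkflow, tracer, and the Interceptor type are hypothetical names, and the context is modeled as a plain Map because the examples above read it with ctx.get().

```typescript
// Toy model only: a minimal registry that composes interceptors onion-style.
type Next = () => Promise<string>;

interface Interceptor {
  execute(ctx: Map<string, string>, next: Next): Promise<string>;
}

const registry: Interceptor[] = [];
const trace: string[] = [];

function register(interceptor: Interceptor): void {
  registry.push(interceptor);
}

// Build the chain from the inside out so registry[0] ends up outermost.
function runWorkflow(ctx: Map<string, string>, workflow: Next): Promise<string> {
  const chain = registry.reduceRight<Next>(
    (next, interceptor) => () => interceptor.execute(ctx, next),
    workflow,
  );
  return chain();
}

// Records when an interceptor is entered and when it is left.
function tracer(label: string): Interceptor {
  return {
    async execute(_ctx, next) {
      trace.push(`${label}:before`);
      const result = await next();
      trace.push(`${label}:after`);
      return result;
    },
  };
}

register(tracer('first'));
register(tracer('second'));

const done = runWorkflow(new Map([['workflowName', 'demo']]), async () => {
  trace.push('workflow');
  return 'ok';
});
```

Once done resolves, trace reads first:before, second:before, workflow, second:after, first:after, so the first registered interceptor is entered first and left last.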
Static registerOutboundInterceptor
Register an outbound interceptor that wraps individual proxied activity calls within workflows. Interceptors execute in registration order (first registered is outermost) using the onion pattern.
Outbound interceptors run inside the workflow's execution context
and have access to all Durable workflow methods (proxyActivities,
sleep, condition, executeChild, etc.). The activityCtx parameter
provides activityName, args, and options for the call being
intercepted. The workflowCtx map provides workflow metadata
(workflowId, workflowName, namespace, etc.).
The outbound interceptor to register
Durable.registerOutboundInterceptor({
  async execute(activityCtx, workflowCtx, next) {
    const start = Date.now();
    try {
      const result = await next();
      console.log(`${activityCtx.activityName} took ${Date.now() - start}ms`);
      return result;
    } catch (err) {
      if (Durable.didInterrupt(err)) throw err;
      throw err;
    }
  }
});
Static shutdown
Durable workflow engine backed by Postgres. Write workflows as plain async functions; the engine handles persistence, replay, retry, and crash recovery automatically.
Architecture
[Figure: architecture diagram (mentions proxyActivities, sleep, condition, ...)]
Workflow Primitives
All methods below are available as Durable.workflow.<method> inside a workflow function. Each call is durable: results are persisted and replayed deterministically on recovery.
proxyActivities
sleep
condition
signal
executeChild
startChild
continueAsNew
patched / deprecatePatch
CancellationScope.nonCancellable
isCancellation
Quick Start
Interceptors
Cross-cutting concerns (logging, auth, metrics) attach via interceptors. See registerInboundInterceptor (wraps workflows) and registerOutboundInterceptor (wraps individual activity calls).
Secured Workers
For production isolation, workers can connect with scoped Postgres credentials that restrict access to specific task queues via stored procedures. See WorkerService.create and provisionWorkerRole.