Static Client
The Durable Client service is functionally equivalent to the Temporal Client service.
Static Connection
The Durable Connection service is functionally equivalent to the Temporal Connection service.
Static didInterrupt
Checks if an error is a HotMesh reserved error type that indicates a workflow interruption rather than a true error condition.
Type guard that returns true if an error is a Durable engine
control-flow signal rather than a genuine application error.
Durable uses thrown errors internally to suspend workflow execution
for durable operations like sleepFor, waitFor, proxyActivities,
and execChild. These errors must be re-thrown (not swallowed) so
the engine can persist state and schedule the next step.
Always use didInterrupt in catch blocks inside workflow
functions to avoid accidentally swallowing engine signals.
Recognized error types: DurableChildError, DurableFatalError, DurableMaxedError,
DurableProxyError, DurableRetryError, DurableSleepError,
DurableTimeoutError, DurableWaitForError, DurableWaitForAllError.
import { Durable } from '@hotmeshio/hotmesh';
// Hypothetical module path; point this at wherever your activities are defined.
import * as activities from './activities';

export async function safeWorkflow(): Promise<string> {
  const { riskyOperation } = Durable.workflow.proxyActivities<typeof activities>();
  try {
    return await riskyOperation();
  } catch (error) {
    // CRITICAL: re-throw engine signals
    if (Durable.workflow.didInterrupt(error)) {
      throw error;
    }
    // Handle real application errors
    return 'fallback-value';
  }
}

// Common pattern in interceptors
const interceptor: WorkflowInterceptor = {
  async execute(ctx, next) {
    try {
      return await next();
    } catch (error) {
      if (Durable.workflow.didInterrupt(error)) {
        throw error; // always re-throw engine signals
      }
      // Log and re-throw application errors
      console.error('Workflow failed:', error);
      throw error;
    }
  },
};
Parameter: the error to check.
Returns: true if the error is a Durable engine interruption signal.
See: didInterrupt for detailed documentation.
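The re-throw rule can be illustrated without the library. The sketch below is self-contained and makes several assumptions: the signal names (`SketchSleepSignal`, `SketchWaitSignal`) and the helpers `makeSignal`, `isEngineSignal`, and `runWithFallback` are all hypothetical, and matching on `error.name` is only one possible detection strategy, not necessarily how Durable identifies its own errors.

```typescript
// Hypothetical signal names, used only in this sketch.
const SKETCH_SIGNAL_NAMES: readonly string[] = ['SketchSleepSignal', 'SketchWaitSignal'];

// Builds a stand-in control-flow error (hypothetical helper).
function makeSignal(name: string): Error {
  const signal = new Error(`suspend: ${name}`);
  signal.name = name;
  return signal;
}

// Type guard: true only for errors that carry one of the sketch signal names.
function isEngineSignal(err: unknown): err is Error {
  return err instanceof Error && SKETCH_SIGNAL_NAMES.indexOf(err.name) !== -1;
}

// Runs `step`; engine signals propagate, anything else maps to `fallback`.
function runWithFallback<T>(step: () => T, fallback: T): T {
  try {
    return step();
  } catch (err) {
    if (isEngineSignal(err)) throw err; // never swallow a suspension signal
    return fallback; // genuine application error
  }
}
```

In this sketch, a plain `Error` thrown inside `step` yields the fallback, while an error created with `makeSignal('SketchSleepSignal')` escapes the `catch` block unchanged, which is the behavior a workflow needs so the engine can persist state.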
Static guid
Generates a unique identifier for workflow IDs.
Static Handle
The Handle provides methods to interact with a running workflow. This includes exporting the workflow, sending signals, and querying the state of the workflow. An instance of the Handle service is typically accessed via the Durable.Client class (workflow.getHandle).
Static registerActivityWorker
Registers activity workers for a task queue. Activities are invoked via message queue, so they can run on different servers from workflows.
The task queue name gets -activity appended automatically to form the worker topic.
For example, taskQueue: 'payment' creates a worker listening on payment-activity.
Parameters, in call order:
1. Worker configuration (connection, namespace, taskQueue).
2. Activity functions to register.
3. Optional activityTaskQueue: string. Task queue name (without the -activity suffix). Defaults to config.taskQueue if not provided.
Returns: Promise
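The naming rule described above (an optional queue name that falls back to the configured task queue, with `-activity` appended) can be sketched as a small pure function. `SketchWorkerConfig` and `resolveActivityTopic` are hypothetical names introduced for illustration; the library performs this resolution internally and may do so differently.

```typescript
// Hypothetical, trimmed-down config shape for this sketch.
interface SketchWorkerConfig {
  taskQueue: string;
}

// Picks the explicit queue name if given, otherwise the config's taskQueue,
// then appends the '-activity' suffix to form the worker topic.
function resolveActivityTopic(config: SketchWorkerConfig, activityTaskQueue?: string): string {
  const queue = activityTaskQueue ?? config.taskQueue;
  return `${queue}-activity`;
}

// Explicit queue name wins over the config value.
const explicitTopic = resolveActivityTopic({ taskQueue: 'orders' }, 'payment');
// Falls back to config.taskQueue when no explicit name is passed.
const defaultTopic = resolveActivityTopic({ taskQueue: 'payment' });
```

Both calls resolve to `payment-activity`, matching the `taskQueue: 'payment'` example in the description.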
// Activity worker (can be on separate server)
import { Durable } from '@hotmeshio/hotmesh';
import { Client as Postgres } from 'pg';

const activities = {
  async processPayment(amount: number): Promise<string> {
    return `Processed $${amount}`;
  },
  async sendEmail(to: string, subject: string): Promise<void> {
    // Send email
  }
};

await Durable.registerActivityWorker({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  },
  taskQueue: 'payment' // Listens on 'payment-activity'
}, activities, 'payment');

// Workflow worker (can be on different server)
async function orderWorkflow(orderId: string, amount: number) {
  const { processPayment, sendEmail } = Durable.workflow.proxyActivities<{
    processPayment: (amount: number) => Promise<string>;
    sendEmail: (to: string, subject: string) => Promise<void>;
  }>({
    taskQueue: 'payment',
    retryPolicy: { maximumAttempts: 3 }
  });
  const result = await processPayment(amount);
  await sendEmail('customer@example.com', 'Order confirmed');
  return result;
}

await Durable.Worker.create({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  },
  taskQueue: 'orders',
  workflow: orderWorkflow
});

// Shared activity pool for interceptors
await Durable.registerActivityWorker({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  },
  taskQueue: 'shared'
}, { auditLog, collectMetrics }, 'shared');

const interceptor: WorkflowInterceptor = {
  async execute(ctx, next) {
    const { auditLog } = Durable.workflow.proxyActivities<{
      auditLog: (id: string, action: string) => Promise<void>;
    }>({
      taskQueue: 'shared',
      retryPolicy: { maximumAttempts: 3 }
    });
    await auditLog(ctx.get('workflowId'), 'started');
    return next();
  }
};
// Activity worker
const activities = {
  async processPayment(amount: number) { return `Processed $${amount}`; },
  async sendEmail(to: string, msg: string) { /* ... */ }
};

await Durable.registerActivityWorker({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  },
  taskQueue: 'payment'
}, activities, 'payment');

// Workflow worker (can be on different server)
async function orderWorkflow(amount: number) {
  const { processPayment, sendEmail } = Durable.workflow.proxyActivities<{
    processPayment: (amount: number) => Promise<string>;
    sendEmail: (to: string, msg: string) => Promise<void>;
  }>({
    taskQueue: 'payment',
    retryPolicy: { maximumAttempts: 3 }
  });
  const result = await processPayment(amount);
  await sendEmail('customer@example.com', result);
  return result;
}

await Durable.Worker.create({
  connection: { class: Postgres, options: { connectionString: '...' } },
  taskQueue: 'orders',
  workflow: orderWorkflow
});
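The examples above pass `retryPolicy: { maximumAttempts: 3 }` to `proxyActivities`. As a rough, library-free illustration of what a bounded attempt count means, the sketch below retries a flaky function in process. It assumes that `maximumAttempts` counts the first call as attempt one, which the text above does not confirm, and it ignores backoff and persistence entirely; the engine schedules retries durably rather than looping like this. `SketchRetryPolicy`, `callWithRetry`, and `flakyPayment` are hypothetical names.

```typescript
// Hypothetical policy shape mirroring the single field used in the examples.
interface SketchRetryPolicy {
  maximumAttempts: number;
}

// Calls `activity` up to `maximumAttempts` times, returning the first success
// and re-throwing the last error once the attempts are exhausted.
function callWithRetry<T>(activity: () => T, policy: SketchRetryPolicy): T {
  let lastError: unknown = new Error('maximumAttempts must be at least 1');
  for (let attempt = 1; attempt <= policy.maximumAttempts; attempt++) {
    try {
      return activity();
    } catch (err) {
      lastError = err;
    }
  }
  throw lastError;
}

// A stand-in activity that fails twice before succeeding.
let flakyCalls = 0;
function flakyPayment(): string {
  flakyCalls += 1;
  if (flakyCalls < 3) throw new Error(`attempt ${flakyCalls} failed`);
  return `Processed on attempt ${flakyCalls}`;
}

const retryOutcome = callWithRetry(flakyPayment, { maximumAttempts: 3 });
```

With three attempts allowed, the third call succeeds and `retryOutcome` holds its return value; with only two, the second failure would propagate to the caller.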
Static Worker
The Durable Worker service is functionally equivalent to the Temporal Worker service.
Static workflow
The Durable workflow service is functionally equivalent to the Temporal Workflow service,
with additional methods for managing workflows,
including execChild, waitFor, sleepFor, etc.
Static clear
Static clear
Static registerActivityInterceptor
Registers an activity interceptor that wraps individual proxied activity calls within workflows. Interceptors execute in registration order (first registered is outermost) using the onion pattern.
Activity interceptors run inside the workflow's execution context
and have access to all Durable workflow methods (proxyActivities,
sleepFor, waitFor, execChild, etc.). The activityCtx parameter
provides activityName, args, and options for the call being
intercepted. The workflowCtx map provides workflow metadata
(workflowId, workflowName, namespace, etc.).
Parameter: the activity interceptor to register.
Durable.registerActivityInterceptor({
  async execute(activityCtx, workflowCtx, next) {
    const start = Date.now();
    try {
      const result = await next();
      console.log(`${activityCtx.activityName} took ${Date.now() - start}ms`);
      return result;
    } catch (err) {
      // Engine signal: re-throw untouched so the workflow can suspend
      if (Durable.didInterrupt(err)) throw err;
      // Application error: add handling or logging here, then re-throw
      throw err;
    }
  }
});
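The onion ordering (first registered is outermost) can be demonstrated with a small composition function. This is a simplified, synchronous sketch: real interceptors are async and receive context arguments, and `SketchInterceptor`, `tracing`, and `composeOnion` are hypothetical names, not part of the library.

```typescript
type Next = () => string;
type SketchInterceptor = (next: Next) => string;

// Records the order in which interceptors enter and exit.
const onionTrace: string[] = [];

// Builds an interceptor that logs before and after delegating to `next`.
function tracing(label: string): SketchInterceptor {
  return (next) => {
    onionTrace.push(`${label}:before`);
    const result = next();
    onionTrace.push(`${label}:after`);
    return result;
  };
}

// Wraps from the last registered inward, so the first registered ends up outermost.
function composeOnion(interceptors: SketchInterceptor[], core: Next): Next {
  return interceptors.reduceRight<Next>(
    (next, interceptor) => () => interceptor(next),
    core
  );
}

const runOnion = composeOnion([tracing('first'), tracing('second')], () => {
  onionTrace.push('activity');
  return 'done';
});
const onionResult = runOnion();
// onionTrace is now:
// first:before, second:before, activity, second:after, first:after
```

The first interceptor sees the call before any other and sees the result last, which is why concerns such as timing or tracing are usually registered first.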
Static registerInterceptor
Registers a workflow interceptor that wraps the entire workflow execution in an onion-like pattern. Interceptors execute in registration order (first registered is outermost) and can perform actions before and after workflow execution, handle errors, and add cross-cutting concerns like logging, metrics, or tracing.
Workflow interceptors run inside the workflow's async local storage context,
so all Durable workflow methods (proxyActivities, sleepFor, waitFor,
execChild, etc.) are available. When using Durable functions, always check
for interruptions with Durable.didInterrupt(err) and rethrow them.
Parameter: the interceptor to register.
// Logging interceptor
Durable.registerInterceptor({
  async execute(ctx, next) {
    console.log(`Workflow ${ctx.get('workflowName')} starting`);
    try {
      const result = await next();
      console.log(`Workflow ${ctx.get('workflowName')} completed`);
      return result;
    } catch (err) {
      if (Durable.didInterrupt(err)) throw err;
      console.error(`Workflow ${ctx.get('workflowName')} failed`);
      throw err;
    }
  }
});
Static shutdown
The Durable service provides a Temporal-compatible workflow framework backed by Postgres. It offers entity-based memory management and composable, fault-tolerant workflows.
Core Features
1. Entity-Based Memory Model
Each workflow has a durable JSONB entity that serves as its memory.
2. Hook Functions & Workflow Coordination
Spawn and coordinate multiple perspectives/phases.
3. Durable Activities & Proxies
Define and execute durable activities with automatic retry.
4. Explicit Activity Registration
Register activity workers explicitly before workflows start.
5. Workflow Composition
Build complex workflows through composition.
6. Workflow Interceptors
Add cross-cutting concerns through interceptors that run as durable functions.
7. Activity Interceptors
Wrap individual proxied activity calls with cross-cutting logic. Unlike workflow interceptors (which wrap the entire workflow), activity interceptors execute around each proxyActivities call. They run inside the workflow's execution context and have access to all Durable workflow methods (proxyActivities, sleepFor, waitFor, execChild, etc.). Multiple activity interceptors execute in onion order (first registered is outermost).
Basic Usage Example