Durable workflow engine backed by Postgres. Write workflows as plain async functions; the engine handles persistence, replay, retry, and crash recovery automatically.

Component Role
Worker Hosts workflow and activity functions
Client Starts workflows, sends signals, reads results
workflow In-workflow API (proxyActivities, sleep, condition, ...)

All methods below are available as Durable.workflow.<method> inside a workflow function. Each call is durable — results are persisted and replayed deterministically on recovery.

Primitive Purpose
proxyActivities Execute side-effectful code with automatic retry
sleep Durable timer (survives restarts)
condition Pause until a named signal arrives (with optional timeout)
signal Deliver data to a waiting condition
executeChild Spawn a child workflow and await its result
startChild Fire-and-forget child workflow
continueAsNew Restart the workflow with new args (resets history)
patched / deprecatePatch Safe versioning for in-flight workflow code changes
CancellationScope.nonCancellable Shield cleanup code from cancellation
isCancellation Detect cooperative cancellation errors
import { Durable } from '@hotmeshio/hotmesh';
import { Client as Postgres } from 'pg';
import * as activities from './activities';

// 1. Define a workflow
async function orderWorkflow(orderId: string): Promise<string> {
const { charge, notify } = Durable.workflow.proxyActivities<typeof activities>({
activities,
retry: { maximumAttempts: 3 },
});

const receipt = await charge(orderId);
await Durable.workflow.sleep('1 hour');
await notify(orderId, receipt);
return receipt;
}

// 2. Start a worker
const connection = { class: Postgres, options: { connectionString: 'postgresql://...' } };
await Durable.Worker.create({ connection, taskQueue: 'orders', workflow: orderWorkflow });

// 3. Start a workflow from the client
const client = new Durable.Client({ connection });
const handle = await client.workflow.start({
args: ['order-123'],
taskQueue: 'orders',
workflowName: 'orderWorkflow',
workflowId: Durable.guid(),
});
const result = await handle.result();

Cross-cutting concerns (logging, auth, metrics) attach via interceptors. See registerInboundInterceptor (wraps workflows) and registerOutboundInterceptor (wraps individual activity calls).

For production isolation, workers can connect with scoped Postgres credentials that restrict access to specific task queues via stored procedures. See WorkerService.create and provisionWorkerRole.

Properties

activity: typeof ActivityService = ActivityService

Activity-side context. Call Durable.activity.getContext() inside an activity function to access the activity name, arguments, parent workflow ID, and other execution metadata.

Client: typeof ClientService = ClientService

Starts workflows, sends signals, and reads results. Instantiate with a connection, then use client.workflow.start() to launch a workflow and obtain a Handle.

Connection: typeof ConnectionService = ConnectionService

Declares connection options (class + config) for Postgres. The actual connection is established lazily when a workflow or worker is started.

didInterrupt: ((error: Error) => boolean) = didInterrupt

Returns true if the error is an engine control-flow signal (replay suspension) rather than an application error. Always check this in catch blocks inside workflows and interceptors — swallowing an interruption breaks the replay system.

Type declaration

    • (error): boolean
    • Type guard that returns true if an error is a Durable engine control-flow signal rather than a genuine application error.

      Durable uses thrown errors internally to suspend workflow execution for durable operations like sleep, condition, proxyActivities, and executeChild. These errors must be re-thrown (not swallowed) so the engine can persist state and schedule the next step.

      Always use didInterrupt in catch blocks inside workflow functions to avoid accidentally swallowing engine signals.

      DurableChildError, DurableContinueAsNewError, DurableFatalError, DurableMaxedError, DurableProxyError, DurableRetryError, DurableSleepError, DurableTimeoutError, DurableWaitForError, DurableWaitForAllError, CancelledFailure

      import { Durable } from '@hotmeshio/hotmesh';

      export async function safeWorkflow(): Promise<string> {
      const { riskyOperation } = Durable.workflow.proxyActivities<typeof activities>();

      try {
      return await riskyOperation();
      } catch (error) {
      // CRITICAL: re-throw engine signals
      if (Durable.workflow.didInterrupt(error)) {
      throw error;
      }
      // Handle real application errors
      return 'fallback-value';
      }
      }
      // Common pattern in interceptors
      const interceptor: WorkflowInboundCallsInterceptor = {
      async execute(ctx, next) {
      try {
      return await next();
      } catch (error) {
      if (Durable.workflow.didInterrupt(error)) {
      throw error; // always re-throw engine signals
      }
      // Log and re-throw application errors
      console.error('Workflow failed:', error);
      throw error;
      }
      },
      };

      Parameters

      • error: Error

        The error to check.

      Returns boolean

      true if the error is a Durable engine interruption signal.

guid: ((size?: number) => string) = guid

Generate a unique identifier for workflow IDs

Handle: typeof WorkflowHandleService = WorkflowHandleService

A handle to a running or completed workflow. Provides methods to read results, send signals, query state, cancel, or interrupt. Obtained via client.workflow.start() or client.workflow.getHandle().

listWorkerRoles: ((config: {
    connection: ProviderConfig | ProvidersConfig;
    namespace?: string;
}) => Promise<WorkerCredentialInfo[]>) = HotMesh.listWorkerRoles

List all provisioned secured worker roles.

Type declaration

provisionWorkerRole: ((config: {
    connection: ProviderConfig | ProvidersConfig;
    namespace?: string;
    password?: string;
    streamNames: string[];
}) => Promise<WorkerCredential>) = HotMesh.provisionWorkerRole

Provision a scoped Postgres role for a worker. The role can only dequeue, ack, and respond on its assigned stream names via stored procedures — zero direct table access.

Type declaration

    • (config): Promise<WorkerCredential>
    • Provision a scoped Postgres role for a worker router.

      The role can only dequeue/ack/respond on the specified stream names via SECURITY DEFINER stored procedures — it has zero direct table access.

      Parameters

      Returns Promise<WorkerCredential>

const cred = await Durable.provisionWorkerRole({
connection: { class: Postgres, options: adminOptions },
streamNames: ['payment-activity'],
});

await Worker.create({
connection: { class: Postgres, options: pgOptions },
taskQueue: 'payment',
workflow: paymentWorkflow,
workerCredentials: cred,
});
registerActivityWorker: ((config: Partial<WorkerConfig>, activities: any, activityTaskQueue?: string) => Promise<HotMesh>) = WorkerService.registerActivityWorker

Register activity workers for a task queue. Activities execute via message queue and can run on different servers from workflows.

Type declaration

    • (config, activities, activityTaskQueue?): Promise<HotMesh>
    • Register activity workers for a task queue. Activities are invoked via message queue, so they can run on different servers from workflows.

      The task queue name gets -activity appended automatically for the worker topic. For example, taskQueue: 'payment' creates a worker listening on payment-activity.

      Parameters

      • config: Partial<WorkerConfig>

        Worker configuration (connection, namespace, taskQueue)

      • activities: any

        Activity functions to register

      • OptionalactivityTaskQueue: string

        Task queue name (without -activity suffix). Defaults to config.taskQueue if not provided.

      Returns Promise<HotMesh>

      Promise The initialized activity worker

      // Activity worker (can be on separate server)
      import { Durable } from '@hotmeshio/hotmesh';
      import { Client as Postgres } from 'pg';

      const activities = {
      async processPayment(amount: number): Promise<string> {
      return `Processed $${amount}`;
      },
      async sendEmail(to: string, subject: string): Promise<void> {
      // Send email
      }
      };

      await Durable.registerActivityWorker({
      connection: {
      class: Postgres,
      options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
      },
      taskQueue: 'payment' // Listens on 'payment-activity'
      }, activities, 'payment');
      // Workflow worker (can be on different server)
      async function orderWorkflow(orderId: string, amount: number) {
      const { processPayment, sendEmail } = Durable.workflow.proxyActivities<{
      processPayment: (amount: number) => Promise<string>;
      sendEmail: (to: string, subject: string) => Promise<void>;
      }>({
      taskQueue: 'payment',
      retry: { maximumAttempts: 3 }
      });

      const result = await processPayment(amount);
      await sendEmail('customer@example.com', 'Order confirmed');
      return result;
      }

      await Durable.Worker.create({
      connection: {
      class: Postgres,
      options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
      },
      taskQueue: 'orders',
      workflow: orderWorkflow
      });
      // Shared activity pool for interceptors
      await Durable.registerActivityWorker({
      connection: {
      class: Postgres,
      options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
      },
      taskQueue: 'shared'
      }, { auditLog, collectMetrics }, 'shared');

      const interceptor: WorkflowInboundCallsInterceptor = {
      async execute(ctx, next) {
      const { auditLog } = Durable.workflow.proxyActivities<{
      auditLog: (id: string, action: string) => Promise<void>;
      }>({
      taskQueue: 'shared',
      retry: { maximumAttempts: 3 }
      });
      await auditLog(ctx.get('workflowId'), 'started');
      return next();
      }
      };
      // Secured worker with scoped Postgres credentials (VNF-style isolation)
      // Step 1: Admin provisions a credential (one-time)
      const cred = await Durable.provisionWorkerRole({
      connection: { class: Postgres, options: adminOptions },
      streamNames: ['payment-activity'],
      });

      // Step 2: Worker connects with scoped role — can only access payment-activity
      await Durable.registerActivityWorker({
      connection: { class: Postgres, options: { host: 'pg.prod', database: 'hotmesh' } },
      taskQueue: 'payment',
      workerCredentials: { user: cred.roleName, password: cred.password },
      }, { processPayment, refundPayment });
// Activity worker
const activities = {
async processPayment(amount: number) { return `Processed $${amount}`; },
async sendEmail(to: string, msg: string) { /* ... */ }
};

await Durable.registerActivityWorker({
connection: {
class: Postgres,
options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
},
taskQueue: 'payment'
}, activities, 'payment');

// Workflow worker (can be on different server)
async function orderWorkflow(amount: number) {
const { processPayment, sendEmail } = Durable.workflow.proxyActivities<{
processPayment: (amount: number) => Promise<string>;
sendEmail: (to: string, msg: string) => Promise<void>;
}>({
taskQueue: 'payment',
retry: { maximumAttempts: 3 }
});

const result = await processPayment(amount);
await sendEmail('customer@example.com', result);
return result;
}

await Durable.Worker.create({
connection: { class: Postgres, options: { connectionString: '...' } },
taskQueue: 'orders',
workflow: orderWorkflow
});
registerCodec: ((codec: PayloadCodec) => void) = HotMesh.registerCodec

Register a global payload codec for encrypting/decrypting workflow data at rest. The codec's encode/decode methods are called automatically on all persisted workflow payloads.

Type declaration

    • (codec): void
    • Register a global payload codec for encoding/decoding serialized object data at rest. Once registered, all object values flowing through the serializer are stored as /b{encoded} instead of /s{json}. Use this for encryption, compression, or custom encoding.

      The codec is global — it applies to all HotMesh and Durable instances in the process. Pass null to remove a previously registered codec.

      Constraints: The codec must be synchronous and its output must be a valid UTF-8 string. Use base64 encoding for binary output.

      Parameters

      Returns void

      import { HotMesh } from '@hotmeshio/hotmesh';

      HotMesh.registerCodec({
      encode(json) { return Buffer.from(json).toString('base64'); },
      decode(encoded) { return Buffer.from(encoded, 'base64').toString('utf8'); },
      });
revokeWorkerRole: ((config: {
    connection: ProviderConfig | ProvidersConfig;
    namespace?: string;
    roleName: string;
}) => Promise<void>) = HotMesh.revokeWorkerRole

Revoke a secured worker role (disables login).

Type declaration

    • (config): Promise<void>
    • Revoke a worker role by disabling login. The role is not dropped, preserving the audit trail in the worker_credentials table.

      Parameters

      Returns Promise<void>

rotateWorkerPassword: ((config: {
    connection: ProviderConfig | ProvidersConfig;
    namespace?: string;
    newPassword?: string;
    roleName: string;
}) => Promise<{
    password: string;
}>) = HotMesh.rotateWorkerPassword

Rotate a secured worker role's password.

Type declaration

    • (config): Promise<{
          password: string;
      }>
    • Rotate the password for an existing worker role.

      Parameters

      Returns Promise<{
          password: string;
      }>

Worker: typeof WorkerService = WorkerService

Hosts workflow and activity functions. Call Worker.create() with a connection, task queue, and workflow function to start processing.

workflow: typeof WorkflowService = WorkflowService

The workflow-internal API. Every method on this object (proxyActivities, sleep, condition, signal, executeChild, continueAsNew, patched, CancellationScope, etc.) is designed to be called inside a workflow function and participates in deterministic replay.

Methods

  • Register an activity inbound interceptor that wraps the actual activity function execution on the activity worker side. This runs inside the activity's AsyncLocalStorage context, NOT the workflow context.

    Use for logging, metrics, auth validation, or error enrichment at the point where the activity actually executes.

    Parameters

    Returns void

  • Register a workflow interceptor that wraps the entire workflow execution in an onion-like pattern. Interceptors execute in registration order (first registered is outermost) and can perform actions before and after workflow execution, handle errors, and add cross-cutting concerns like logging, metrics, or tracing.

    Workflow interceptors run inside the workflow's async local storage context, so all Durable workflow methods (proxyActivities, sleep, condition, executeChild, etc.) are available. When using Durable functions, always check for interruptions with Durable.didInterrupt(err) and rethrow them.

    Parameters

    Returns void

    // Logging interceptor
    Durable.registerInboundInterceptor({
    async execute(ctx, next) {
    console.log(`Workflow ${ctx.get('workflowName')} starting`);
    try {
    const result = await next();
    console.log(`Workflow ${ctx.get('workflowName')} completed`);
    return result;
    } catch (err) {
    if (Durable.didInterrupt(err)) throw err;
    console.error(`Workflow ${ctx.get('workflowName')} failed`);
    throw err;
    }
    }
    });
  • Register an outbound interceptor that wraps individual proxied activity calls within workflows. Interceptors execute in registration order (first registered is outermost) using the onion pattern.

    Outbound interceptors run inside the workflow's execution context and have access to all Durable workflow methods (proxyActivities, sleep, condition, executeChild, etc.). The activityCtx parameter provides activityName, args, and options for the call being intercepted. The workflowCtx map provides workflow metadata (workflowId, workflowName, namespace, etc.).

    Parameters

    Returns void

    Durable.registerOutboundInterceptor({
    async execute(activityCtx, workflowCtx, next) {
    const start = Date.now();
    try {
    const result = await next();
    console.log(`${activityCtx.activityName} took ${Date.now() - start}ms`);
    return result;
    } catch (err) {
    if (Durable.didInterrupt(err)) throw err;
    throw err;
    }
    }
    });
  • Gracefully shut down all workers, clients, and connections. Call from your process signal handlers (SIGTERM, SIGINT).

    Returns Promise<void>