The Durable service provides a Temporal-compatible workflow framework backed by Postgres. It offers entity-based memory management and composable, fault-tolerant workflows.

Each workflow has a durable JSONB entity that serves as its memory:

export async function researchAgent(query: string) {
  const agent = await Durable.workflow.entity();

  // Initialize entity state
  await agent.set({
    query,
    findings: [],
    status: 'researching'
  });

  // Update state atomically
  await agent.merge({ status: 'analyzing' });
  await agent.append('findings', newFinding);
}

Spawn and coordinate multiple parallel perspectives or phases:

// Launch parallel research perspectives
await Durable.workflow.execHook({
  taskQueue: 'research',
  workflowName: 'optimisticView',
  args: [query],
  signalId: 'optimistic-complete'
});

await Durable.workflow.execHook({
  taskQueue: 'research',
  workflowName: 'skepticalView',
  args: [query],
  signalId: 'skeptical-complete'
});

// Wait for both perspectives
await Promise.all([
  Durable.workflow.waitFor('optimistic-complete'),
  Durable.workflow.waitFor('skeptical-complete')
]);

Define and execute durable activities with automatic retry:

// Default: activities use workflow's task queue
const activities = Durable.workflow.proxyActivities<{
  analyzeDocument: typeof analyzeDocument;
  validateFindings: typeof validateFindings;
}>({
  activities: { analyzeDocument, validateFindings },
  retryPolicy: {
    maximumAttempts: 3,
    backoffCoefficient: 2
  }
});

// Activities are durable and automatically retried
const analysis = await activities.analyzeDocument(data);
const validation = await activities.validateFindings(analysis);

Register activity workers explicitly before workflows start:

// Register shared activity pool for interceptors
await Durable.registerActivityWorker({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  },
  taskQueue: 'shared-activities'
}, sharedActivities, 'shared-activities');

// Register custom activity pool for specific use cases
await Durable.registerActivityWorker({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  },
  taskQueue: 'priority-activities'
}, priorityActivities, 'priority-activities');

Build complex workflows through composition:

// Start a child workflow
const childResult = await Durable.workflow.execChild({
  taskQueue: 'analysis',
  workflowName: 'detailedAnalysis',
  args: [data],
  // Child workflow config
  config: {
    maximumAttempts: 5,
    backoffCoefficient: 2
  }
});

// Fire-and-forget child workflow
await Durable.workflow.startChild({
  taskQueue: 'notifications',
  workflowName: 'sendUpdates',
  args: [updates]
});

Add cross-cutting concerns through interceptors that run as durable functions:

// First register a shared activity worker for interceptors
await Durable.registerActivityWorker({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  },
  taskQueue: 'interceptor-activities'
}, { auditLog }, 'interceptor-activities');

// Add an audit interceptor that uses activities with an explicit taskQueue
Durable.registerInterceptor({
  async execute(ctx, next) {
    try {
      // Interceptors use an explicit taskQueue to prevent per-workflow queues;
      // the activities are supplied by the worker registered above
      const { auditLog } = Durable.workflow.proxyActivities<{
        auditLog: (id: string, action: string) => Promise<void>;
      }>({
        taskQueue: 'interceptor-activities', // Explicit shared queue
        retryPolicy: { maximumAttempts: 3 }
      });

      await auditLog(ctx.get('workflowId'), 'started');

      const result = await next();

      await auditLog(ctx.get('workflowId'), 'completed');

      return result;
    } catch (err) {
      // CRITICAL: Always check for HotMesh interruptions
      if (Durable.didInterrupt(err)) {
        throw err; // Rethrow for replay system
      }
      throw err;
    }
  }
});

Wrap individual proxied activity calls with cross-cutting logic. Unlike workflow interceptors (which wrap the entire workflow), activity interceptors execute around each proxyActivities call. They run inside the workflow's execution context and have access to all Durable workflow methods (proxyActivities, sleepFor, waitFor, execChild, etc.). Multiple activity interceptors execute in onion order (first registered is outermost).

// Simple logging interceptor
Durable.registerActivityInterceptor({
  async execute(activityCtx, workflowCtx, next) {
    console.log(`Activity ${activityCtx.activityName} starting`);
    try {
      const result = await next();
      console.log(`Activity ${activityCtx.activityName} completed`);
      return result;
    } catch (err) {
      if (Durable.didInterrupt(err)) throw err;
      console.error(`Activity ${activityCtx.activityName} failed`);
      throw err;
    }
  }
});

// Interceptor that calls its own proxy activities
await Durable.registerActivityWorker({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  },
  taskQueue: 'audit-activities'
}, { auditLog }, 'audit-activities');

Durable.registerActivityInterceptor({
  async execute(activityCtx, workflowCtx, next) {
    try {
      const { auditLog } = Durable.workflow.proxyActivities<{
        auditLog: (id: string, action: string) => Promise<void>;
      }>({
        taskQueue: 'audit-activities',
        retryPolicy: { maximumAttempts: 3 }
      });

      await auditLog(workflowCtx.get('workflowId'), `before:${activityCtx.activityName}`);
      const result = await next();
      await auditLog(workflowCtx.get('workflowId'), `after:${activityCtx.activityName}`);
      return result;
    } catch (err) {
      if (Durable.didInterrupt(err)) throw err; // Rethrow engine interruptions for replay
      throw err;
    }
  }
});
Putting it all together: import dependencies, register workers, start a workflow, and await its result:

import { Client, Worker, Durable } from '@hotmeshio/hotmesh';
import { Client as Postgres } from 'pg';
import * as workflows from './workflows'; // adjust path to your workflow module
import { sharedActivities } from './activities';

// (Optional) Register shared activity workers for interceptors
await Durable.registerActivityWorker({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  },
  taskQueue: 'shared-activities'
}, sharedActivities, 'shared-activities');

// Initialize worker
await Worker.create({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  },
  taskQueue: 'default',
  workflow: workflows.example
});

// Initialize client
const client = new Client({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  }
});

// Start workflow
const handle = await client.workflow.start({
  args: ['input data'],
  taskQueue: 'default',
  workflowName: 'example',
  workflowId: Durable.guid()
});

// Get result
const result = await handle.result();

// Cleanup
await Durable.shutdown();

Properties

Client: typeof ClientService = ClientService

The Durable Client service is functionally equivalent to the Temporal Client service.
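A minimal sketch of direct construction (assuming Durable.Client accepts the same connection options as the top-level Client export shown earlier):

```typescript
import { Durable } from '@hotmeshio/hotmesh';
import { Client as Postgres } from 'pg';

// Construct a client from the Durable namespace (assumed to mirror
// the top-level Client export's constructor options)
const client = new Durable.Client({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  }
});
```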

Connection: typeof ConnectionService = ConnectionService

The Durable Connection service is functionally equivalent to the Temporal Connection service.

didInterrupt: ((error: Error) => boolean) = didInterrupt

Checks if an error is a HotMesh reserved error type that indicates a workflow interruption rather than a true error condition.

Type declaration

    • (error): boolean
    • Type guard that returns true if an error is a Durable engine control-flow signal rather than a genuine application error.

      Durable uses thrown errors internally to suspend workflow execution for durable operations like sleepFor, waitFor, proxyActivities, and execChild. These errors must be re-thrown (not swallowed) so the engine can persist state and schedule the next step.

      Always use didInterrupt in catch blocks inside workflow functions to avoid accidentally swallowing engine signals.

      The reserved interruption types include: DurableChildError, DurableFatalError, DurableMaxedError, DurableProxyError, DurableRetryError, DurableSleepError, DurableTimeoutError, DurableWaitForError, DurableWaitForAllError.

      import { Durable } from '@hotmeshio/hotmesh';

      export async function safeWorkflow(): Promise<string> {
        const { riskyOperation } = Durable.workflow.proxyActivities<typeof activities>();

        try {
          return await riskyOperation();
        } catch (error) {
          // CRITICAL: re-throw engine signals
          if (Durable.workflow.didInterrupt(error)) {
            throw error;
          }
          // Handle real application errors
          return 'fallback-value';
        }
      }

      // Common pattern in interceptors
      const interceptor: WorkflowInterceptor = {
        async execute(ctx, next) {
          try {
            return await next();
          } catch (error) {
            if (Durable.workflow.didInterrupt(error)) {
              throw error; // always re-throw engine signals
            }
            // Log and re-throw application errors
            console.error('Workflow failed:', error);
            throw error;
          }
        },
      };

      Parameters

      • error: Error

        The error to check.

      Returns boolean

      true if the error is a Durable engine interruption signal.

See didInterrupt for detailed documentation.

guid: ((size?: number) => string) = guid

Generates a unique identifier suitable for workflow IDs.
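For example (the effect of the optional size parameter on identifier length is an assumption based on the signature above):

```typescript
import { Durable } from '@hotmeshio/hotmesh';

// Generate an id for a new workflow instance
const workflowId = Durable.guid();

// Optional size parameter (assumed to control identifier length)
const shortId = Durable.guid(8);
```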

Handle: typeof WorkflowHandleService = WorkflowHandleService

The Handle provides methods to interact with a running workflow. This includes exporting the workflow, sending signals, and querying the state of the workflow. An instance of the Handle service is typically accessed via the Durable.Client class (workflow.getHandle).
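A sketch of typical handle usage; the exact getHandle parameter order is an assumption, so consult the Client documentation:

```typescript
// Hypothetical sketch: obtain a handle for a running workflow via the client
const handle = await client.workflow.getHandle(
  'default',   // task queue (assumed parameter order)
  'example',   // workflow name
  workflowId   // workflow id
);

// Await the workflow's result through the handle
const result = await handle.result();
```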

registerActivityWorker: ((config: Partial<WorkerConfig>, activities: any, activityTaskQueue?: string) => Promise<HotMesh>) = WorkerService.registerActivityWorker

Register activity workers for a task queue. Activities execute via message queue and can run on different servers from workflows.

Type declaration

    • (config, activities, activityTaskQueue?): Promise<HotMesh>
    • Register activity workers for a task queue. Activities are invoked via message queue, so they can run on different servers from workflows.

      The task queue name has '-activity' appended automatically to form the worker topic. For example, taskQueue: 'payment' creates a worker listening on 'payment-activity'.

      Parameters

      • config: Partial<WorkerConfig>

        Worker configuration (connection, namespace, taskQueue)

      • activities: any

        Activity functions to register

      • activityTaskQueue: string (optional)

        Task queue name (without the '-activity' suffix). Defaults to config.taskQueue if not provided.

      Returns Promise<HotMesh>

      Promise resolving to the initialized activity worker.

      // Activity worker (can be on a separate server)
      import { Durable } from '@hotmeshio/hotmesh';
      import { Client as Postgres } from 'pg';

      const activities = {
        async processPayment(amount: number): Promise<string> {
          return `Processed $${amount}`;
        },
        async sendEmail(to: string, subject: string): Promise<void> {
          // Send email
        }
      };

      await Durable.registerActivityWorker({
        connection: {
          class: Postgres,
          options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
        },
        taskQueue: 'payment' // Listens on 'payment-activity'
      }, activities, 'payment');

      // Workflow worker (can be on a different server)
      async function orderWorkflow(orderId: string, amount: number) {
        const { processPayment, sendEmail } = Durable.workflow.proxyActivities<{
          processPayment: (amount: number) => Promise<string>;
          sendEmail: (to: string, subject: string) => Promise<void>;
        }>({
          taskQueue: 'payment',
          retryPolicy: { maximumAttempts: 3 }
        });

        const result = await processPayment(amount);
        await sendEmail('customer@example.com', 'Order confirmed');
        return result;
      }

      await Durable.Worker.create({
        connection: {
          class: Postgres,
          options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
        },
        taskQueue: 'orders',
        workflow: orderWorkflow
      });

      // Shared activity pool for interceptors
      await Durable.registerActivityWorker({
        connection: {
          class: Postgres,
          options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
        },
        taskQueue: 'shared'
      }, { auditLog, collectMetrics }, 'shared');

      const interceptor: WorkflowInterceptor = {
        async execute(ctx, next) {
          const { auditLog } = Durable.workflow.proxyActivities<{
            auditLog: (id: string, action: string) => Promise<void>;
          }>({
            taskQueue: 'shared',
            retryPolicy: { maximumAttempts: 3 }
          });
          await auditLog(ctx.get('workflowId'), 'started');
          return next();
        }
      };
Worker: typeof WorkerService = WorkerService

The Durable Worker service is functionally equivalent to the Temporal Worker service.

workflow: typeof WorkflowService = WorkflowService

The Durable workflow service is functionally equivalent to the Temporal Workflow service, with additional methods for managing workflows, including execChild, waitFor, sleepFor, and more.
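For example, a workflow can pause durably and resume on an external signal (a sketch; the duration-string format accepted by sleepFor is an assumption):

```typescript
export async function reminderWorkflow(userId: string) {
  // Durably pause; state is persisted and the workflow resumes after the interval
  await Durable.workflow.sleepFor('1 day'); // duration format is an assumption

  // Suspend until an external signal arrives, then return its payload
  const payload = await Durable.workflow.waitFor('user-responded');
  return payload;
}
```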

Methods

  • registerActivityInterceptor

    Register an activity interceptor that wraps individual proxied activity calls within workflows. Interceptors execute in registration order (first registered is outermost) using the onion pattern.

    Activity interceptors run inside the workflow's execution context and have access to all Durable workflow methods (proxyActivities, sleepFor, waitFor, execChild, etc.). The activityCtx parameter provides the activityName, args, and options for the call being intercepted. The workflowCtx map provides workflow metadata (workflowId, workflowName, namespace, etc.).

    Returns void

    Durable.registerActivityInterceptor({
      async execute(activityCtx, workflowCtx, next) {
        const start = Date.now();
        try {
          const result = await next();
          console.log(`${activityCtx.activityName} took ${Date.now() - start}ms`);
          return result;
        } catch (err) {
          if (Durable.didInterrupt(err)) throw err;
          throw err;
        }
      }
    });
  • registerInterceptor

    Register a workflow interceptor that wraps the entire workflow execution in an onion-like pattern. Interceptors execute in registration order (first registered is outermost) and can perform actions before and after workflow execution, handle errors, and add cross-cutting concerns like logging, metrics, or tracing.

    Workflow interceptors run inside the workflow's async local storage context, so all Durable workflow methods (proxyActivities, sleepFor, waitFor, execChild, etc.) are available. When using Durable functions, always check for interruptions with Durable.didInterrupt(err) and rethrow them.

    Returns void

    // Logging interceptor
    Durable.registerInterceptor({
      async execute(ctx, next) {
        console.log(`Workflow ${ctx.get('workflowName')} starting`);
        try {
          const result = await next();
          console.log(`Workflow ${ctx.get('workflowName')} completed`);
          return result;
        } catch (err) {
          if (Durable.didInterrupt(err)) throw err;
          console.error(`Workflow ${ctx.get('workflowName')} failed`);
          throw err;
        }
      }
    });
  • shutdown

    Shut down everything. All connections, workers, and clients will be closed. Call this from your signal handlers to ensure a clean shutdown.

    Returns Promise<void>
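    For example, wire shutdown into process signal handlers so in-flight work is flushed and connections close cleanly on exit:

```typescript
import { Durable } from '@hotmeshio/hotmesh';

const shutdown = async () => {
  await Durable.shutdown(); // close all connections, workers, and clients
  process.exit(0);
};

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
```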