Hosts workflow and activity functions, connecting them to Postgres for durable execution, replay, and automatic retry.

The worker connects with the same Postgres credentials as the engine. Simple to set up; all workers share the same connection pool.

const worker = await Durable.Worker.create({
connection: {
class: Postgres,
options: { connectionString: 'postgres://user:pass@host:5432/hotmesh' },
},
taskQueue: 'orders',
workflow: orderWorkflow,
});

The worker connects as a restricted Postgres role that can only dequeue/ack/respond on its assigned stream names. All data access goes through SECURITY DEFINER stored procedures that validate the role's app.allowed_streams session variable before executing.

Step 1: Provision a scoped credential (run once, from the engine/admin):

const cred = await Durable.provisionWorkerRole({
connection: { class: Postgres, options: adminPgOptions },
namespace: 'durable',
streamNames: ['orders-activity'],
});
// cred = { roleName: 'hmsh_wrk_durable_orders_activity', password: '...' }

Step 2: Pass the credential when creating the worker:

const worker = await Durable.Worker.create({
connection: {
class: Postgres,
options: { host: 'pg.prod', port: 5432, database: 'hotmesh' },
},
taskQueue: 'orders',
workflow: orderWorkflow,
workerCredentials: { user: cred.roleName, password: cred.password },
});

The worker role cannot:

  • SELECT/INSERT/UPDATE/DELETE any table directly
  • Access jobs, jobs_attributes, or any engine tables
  • Dequeue messages from other workers' streams
  • LISTEN on other workers' notification channels

See Durable.provisionWorkerRole for credential lifecycle management.

Workers automatically emit OpenTelemetry spans when an OTel SDK is registered. Initialize the SDK before calling create():

import { NodeSDK } from '@opentelemetry/sdk-node';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-proto';
import { resourceFromAttributes } from '@opentelemetry/resources';
import { ATTR_SERVICE_NAME } from '@opentelemetry/semantic-conventions';

const sdk = new NodeSDK({
resource: resourceFromAttributes({ [ATTR_SERVICE_NAME]: 'my-service' }),
traceExporter: new OTLPTraceExporter({
url: 'https://api.honeycomb.io/v1/traces',
headers: { 'x-honeycomb-team': process.env.HONEYCOMB_API_KEY },
}),
});
sdk.start();
HMSH_TELEMETRY Spans emitted
'info' (default) WORKFLOW/START, WORKFLOW/COMPLETE, WORKFLOW/ERROR, ACTIVITY/{name}
'debug' All info spans + DISPATCH/RETURN per operation + engine internals

Methods

  • Creates and starts a workflow worker.

    Parameters

    Returns Promise<WorkerService>

    import { Durable } from '@hotmeshio/hotmesh';
    import { Client as Postgres } from 'pg';
    import * as workflows from './workflows';

    async function run() {
    const worker = await Durable.Worker.create({
    connection: {
    class: Postgres,
    options: {
    connectionString: 'postgres://user:password@localhost:5432/db'
    },
    },
    taskQueue: 'default',
    workflow: workflows.example,
    });

    await worker.run();
    }
  • Register activity workers for a task queue. Activities are invoked via message queue, so they can run on different servers from workflows.

    The task queue name gets -activity appended automatically for the worker topic. For example, taskQueue: 'payment' creates a worker listening on payment-activity.

    Parameters

    • config: Partial<WorkerConfig>

      Worker configuration (connection, namespace, taskQueue)

    • activities: any

      Activity functions to register

    • OptionalactivityTaskQueue: string

      Task queue name (without -activity suffix). Defaults to config.taskQueue if not provided.

    Returns Promise<HotMesh>

    Promise The initialized activity worker

    // Activity worker (can be on separate server)
    import { Durable } from '@hotmeshio/hotmesh';
    import { Client as Postgres } from 'pg';

    const activities = {
    async processPayment(amount: number): Promise<string> {
    return `Processed $${amount}`;
    },
    async sendEmail(to: string, subject: string): Promise<void> {
    // Send email
    }
    };

    await Durable.registerActivityWorker({
    connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
    },
    taskQueue: 'payment' // Listens on 'payment-activity'
    }, activities, 'payment');
    // Workflow worker (can be on different server)
    async function orderWorkflow(orderId: string, amount: number) {
    const { processPayment, sendEmail } = Durable.workflow.proxyActivities<{
    processPayment: (amount: number) => Promise<string>;
    sendEmail: (to: string, subject: string) => Promise<void>;
    }>({
    taskQueue: 'payment',
    retry: { maximumAttempts: 3 }
    });

    const result = await processPayment(amount);
    await sendEmail('customer@example.com', 'Order confirmed');
    return result;
    }

    await Durable.Worker.create({
    connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
    },
    taskQueue: 'orders',
    workflow: orderWorkflow
    });
    // Shared activity pool for interceptors
    await Durable.registerActivityWorker({
    connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
    },
    taskQueue: 'shared'
    }, { auditLog, collectMetrics }, 'shared');

    const interceptor: WorkflowInboundCallsInterceptor = {
    async execute(ctx, next) {
    const { auditLog } = Durable.workflow.proxyActivities<{
    auditLog: (id: string, action: string) => Promise<void>;
    }>({
    taskQueue: 'shared',
    retry: { maximumAttempts: 3 }
    });
    await auditLog(ctx.get('workflowId'), 'started');
    return next();
    }
    };
    // Secured worker with scoped Postgres credentials (VNF-style isolation)
    // Step 1: Admin provisions a credential (one-time)
    const cred = await Durable.provisionWorkerRole({
    connection: { class: Postgres, options: adminOptions },
    streamNames: ['payment-activity'],
    });

    // Step 2: Worker connects with scoped role — can only access payment-activity
    await Durable.registerActivityWorker({
    connection: { class: Postgres, options: { host: 'pg.prod', database: 'hotmesh' } },
    taskQueue: 'payment',
    workerCredentials: { user: cred.roleName, password: cred.password },
    }, { processPayment, refundPayment });