StaticcreateCreates and starts a workflow worker.
import { Durable } from '@hotmeshio/hotmesh';
import { Client as Postgres } from 'pg';
import * as workflows from './workflows';
async function run() {
const worker = await Durable.Worker.create({
connection: {
class: Postgres,
options: {
connectionString: 'postgres://user:password@localhost:5432/db'
},
},
taskQueue: 'default',
workflow: workflows.example,
});
await worker.run();
}
StatichashStaticregisterRegister activity workers for a task queue. Activities are invoked via message queue, so they can run on different servers from workflows.
The task queue name gets -activity appended automatically for the worker topic.
For example, taskQueue: 'payment' creates a worker listening on payment-activity.
Worker configuration (connection, namespace, taskQueue)
Activity functions to register
OptionalactivityTaskQueue: stringTask queue name (without -activity suffix).
Defaults to config.taskQueue if not provided.
Promise
// Activity worker (can be on separate server)
import { Durable } from '@hotmeshio/hotmesh';
import { Client as Postgres } from 'pg';
const activities = {
async processPayment(amount: number): Promise<string> {
return `Processed $${amount}`;
},
async sendEmail(to: string, subject: string): Promise<void> {
// Send email
}
};
await Durable.registerActivityWorker({
connection: {
class: Postgres,
options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
},
taskQueue: 'payment' // Listens on 'payment-activity'
}, activities, 'payment');
// Workflow worker (can be on different server)
async function orderWorkflow(orderId: string, amount: number) {
const { processPayment, sendEmail } = Durable.workflow.proxyActivities<{
processPayment: (amount: number) => Promise<string>;
sendEmail: (to: string, subject: string) => Promise<void>;
}>({
taskQueue: 'payment',
retry: { maximumAttempts: 3 }
});
const result = await processPayment(amount);
await sendEmail('customer@example.com', 'Order confirmed');
return result;
}
await Durable.Worker.create({
connection: {
class: Postgres,
options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
},
taskQueue: 'orders',
workflow: orderWorkflow
});
// Shared activity pool for interceptors
await Durable.registerActivityWorker({
connection: {
class: Postgres,
options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
},
taskQueue: 'shared'
}, { auditLog, collectMetrics }, 'shared');
const interceptor: WorkflowInboundCallsInterceptor = {
async execute(ctx, next) {
const { auditLog } = Durable.workflow.proxyActivities<{
auditLog: (id: string, action: string) => Promise<void>;
}>({
taskQueue: 'shared',
retry: { maximumAttempts: 3 }
});
await auditLog(ctx.get('workflowId'), 'started');
return next();
}
};
// Secured worker with scoped Postgres credentials (VNF-style isolation)
// Step 1: Admin provisions a credential (one-time)
const cred = await Durable.provisionWorkerRole({
connection: { class: Postgres, options: adminOptions },
streamNames: ['payment-activity'],
});
// Step 2: Worker connects with scoped role — can only access payment-activity
await Durable.registerActivityWorker({
connection: { class: Postgres, options: { host: 'pg.prod', database: 'hotmesh' } },
taskQueue: 'payment',
workerCredentials: { user: cred.roleName, password: cred.password },
}, { processPayment, refundPayment });
Hosts workflow and activity functions, connecting them to Postgres for durable execution, replay, and automatic retry.
Connection Modes
Standard (legacy) — full admin access
The worker connects with the same Postgres credentials as the engine. Simple to set up; all workers share the same connection pool.
Secured — scoped Postgres role (recommended for production)
The worker connects as a restricted Postgres role that can only dequeue/ack/respond on its assigned stream names. All data access goes through SECURITY DEFINER stored procedures that validate the role's
app.allowed_streamssession variable before executing.Step 1: Provision a scoped credential (run once, from the engine/admin):
Step 2: Pass the credential when creating the worker:
The worker role cannot:
jobs,jobs_attributes, or any engine tablesSee Durable.provisionWorkerRole for credential lifecycle management.
Telemetry
Workers automatically emit OpenTelemetry spans when an OTel SDK is registered. Initialize the SDK before calling
create():HMSH_TELEMETRY'info'(default)WORKFLOW/START,WORKFLOW/COMPLETE,WORKFLOW/ERROR,ACTIVITY/{name}'debug'infospans +DISPATCH/RETURNper operation + engine internals