callback
The callback function to execute when a message is dequeued from the target stream.
Optional connection?: ProviderConfig | ProvidersConfig
Short-form format for the connection options for the store, stream, sub, and search clients.
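A hedged sketch of the two forms. The short form passes one ProviderConfig shared by all clients; the per-client ProvidersConfig shape below is an assumption inferred from the description (keys named store, stream, sub, and search), not a confirmed interface:

  // Short form: one connection shared by all clients
  connection: { class: Postgres, options: { host: 'pg.prod', database: 'db' } }

  // Long form (assumed shape): per-client configuration
  connection: {
    store:  { class: Postgres, options: { /* ... */ } },
    stream: { class: Postgres, options: { /* ... */ } },
    sub:    { class: Postgres, options: { /* ... */ } },
    search: { class: Postgres, options: { /* ... */ } },
  }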
Optional readonly?: boolean
If true, the worker's router will not consume messages from the stream. The worker can still publish responses but will never dequeue or process messages. This is inherited from the connection's readonly flag by the Durable layer.
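A minimal sketch of a read-only worker entry, reusing the connection shape from the provisioning example in this document (host and database values are illustrative):

  workers: [{
    topic: 'order.process',
    connection: { class: Postgres, options: { host: 'pg.prod', database: 'db' } },
    readonly: true, // publish responses only; never dequeue or process
    callback: myCallback,
  }]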
Optional reclaim
The number of times to reclaim a stream before giving up and moving the message to a dead-letter queue or other error-handling mechanism.
Optional reclaim
The number of milliseconds to wait before reclaiming a stream. Depending upon the provider, this may be an explicit retry event, consuming a message from the stream and re-queueing it (DLQ, etc.), or a configurable delay before the provider exposes the message to the consumer again. The mechanism is provider-specific, but the value is always expressed in milliseconds.
Optional retry?: types/stream
Retry policy for stream messages. Configures automatic retry behavior with exponential backoff for failed operations. Applied to the stream connection during initialization.
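To illustrate the exponential-backoff behavior described above, here is a self-contained sketch of how retry delays are typically computed. The field names (maxAttempts, initialDelayMs, coefficient) are assumptions for the sketch, not the actual types/stream interface:

```javascript
// Illustrative only: computes the delay before each retry attempt
// under a generic exponential-backoff policy. Field names are
// assumptions, not the actual types/stream shape.
function backoffDelays({ maxAttempts, initialDelayMs, coefficient }) {
  const delays = [];
  for (let attempt = 1; attempt < maxAttempts; attempt++) {
    // each retry waits coefficient times longer than the previous one
    delays.push(initialDelayMs * Math.pow(coefficient, attempt - 1));
  }
  return delays;
}

// 4 total attempts => 3 retries, spaced 100ms, 200ms, 400ms apart
const delays = backoffDelays({ maxAttempts: 4, initialDelayMs: 100, coefficient: 2 });
```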
Optional task
Task queue identifier used for connection pooling optimization. When provided, connections will be reused across providers (store, sub, stream) that share the same task queue and database configuration.
topic
The topic/task queue that the worker subscribes to (stream_name).
Optional workerCredentials
Scoped Postgres credentials for database-level worker isolation. When provided, the user and password in the worker's connection options are overridden with these values, and all stream operations route through SECURITY DEFINER stored procedures that validate app.allowed_streams before executing. The worker role has zero direct table access.
Use this for workers running in untrusted environments: pluggable K8s containers, LLM-driven agents (e.g., MCP tool servers), third-party integrations, or any workload that should be isolated from the engine's database surface.
Provision credentials via HotMesh.provisionWorkerRole() or by creating a Postgres role with EXECUTE on the schema's worker stored procedures and ALTER ROLE ... SET app.allowed_streams.
Limitations: Workflows running under scoped credentials cannot use Durable.workflow.entity(), Durable.workflow.search(), or Durable.workflow.enrich(). These write directly to the jobs and jobs_attributes tables, which the scoped role cannot access. All other workflow primitives (proxyActivities, sleep, condition, signal, emit, executeChild, etc.) work normally.
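A sketch of a workflow that stays within the scoped-safe primitives listed above. The import path, activity name, and exact call signatures are illustrative assumptions; only the primitive names (proxyActivities, sleep) come from this document:

  import { Durable } from '@hotmeshio/hotmesh';

  // illustrative call shapes; consult the actual Durable API for signatures
  const { chargeCard } = Durable.workflow.proxyActivities();

  export async function processOrder(orderId) {
    await chargeCard(orderId); // proxyActivities: allowed under scoped credentials
    // sleep/condition/signal/emit/executeChild are likewise allowed;
    // Durable.workflow.entity(), search(), and enrich() would fail here
  }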
// Admin provisions scoped credentials (one-time)
const cred = await HotMesh.provisionWorkerRole({
  connection: { class: Postgres, options: adminOptions },
  streamNames: ['order.process'],
});

// Worker connects with the restricted role
workers: [{
  topic: 'order.process',
  connection: { class: Postgres, options: { host: 'pg.prod', database: 'db' } },
  workerCredentials: { user: cred.roleName, password: cred.password },
  callback: myCallback,
}]
Optional workflowName
The workflow function name for dispatch routing (workflow_name column). When set, workers sharing the same topic use a singleton consumer that fetches batches and dispatches by workflowName.
Configuration for a HotMesh worker that consumes messages from a Postgres stream topic. Workers can run on the same process as the engine or on entirely separate servers — the only coupling is the shared Postgres database.
Connection Modes

Standard: Worker uses the same Postgres credentials as the engine. Set connection with admin credentials. Suitable for trusted, co-located workers.

Secured: Worker connects as a restricted Postgres role with workerCredentials. The role can only dequeue/ack/respond on its allowed stream topics via SECURITY DEFINER stored procedures, with zero direct table access. Use for untrusted workloads: K8s containers, LLM agents, third-party integrations.