Dispatches work to a registered callback function. The worker activity publishes a message to its configured topic stream, where a worker process picks it up, executes the callback, and returns a response that the engine captures as the activity's output.

app:
id: myapp
version: '1'
graphs:
- subscribes: order.placed
expire: 120

activities:
t1:
type: trigger

a1:
type: worker
topic: work.do # matches the registered worker topic
input:
schema:
type: object
properties:
x: { type: string }
maps:
x: '{t1.output.data.inputField}'
output:
schema:
type: object
properties:
y: { type: string }
job:
maps:
result: '{$self.output.data.y}'

transitions:
t1:
- to: a1

Workers are registered at initialization time via the workers array in HotMesh.init. Each worker binds a topic to a callback function.

const hotMesh = await HotMesh.init({
appId: 'myapp',
engine: { connection },
workers: [{
topic: 'work.do',
connection,
callback: async (data: StreamData) => ({
metadata: { ...data.metadata },
data: { y: `${data.data.x} transformed` }
})
}]
});

Retry behavior is configured at the worker level (not in YAML) via the retryPolicy option. Failed callbacks are retried with exponential backoff until maximumAttempts is exhausted. The maximumInterval caps the delay between retries.

const hotMesh = await HotMesh.init({
appId: 'myapp',
engine: { connection },
workers: [{
topic: 'work.backoff',
connection,
retryPolicy: {
maximumAttempts: 5, // retry up to 5 times
backoffCoefficient: 2, // exponential: 2^0, 2^1, 2^2, ... seconds
maximumInterval: '30s', // cap delay at 30 seconds
},
callback: async (data: StreamData) => {
const result = await doWork(data.data);
return {
code: 200,
status: StreamStatus.SUCCESS,
metadata: { ...data.metadata },
data: { result },
} as StreamDataResponse;
},
}]
});

Worker is a Category A (duplex) activity:

  • Leg 1 (process): Maps input data and publishes a message to the worker's topic stream.
  • Leg 2 (processEvent, inherited): Receives the worker's response, maps output data, and executes the step protocol to transition to adjacent activities.

WorkerActivity for the TypeScript interface

Hierarchy (view full)

Constructors

Properties

adjacencyList: StreamData[]
adjacentIndex: number = 0
code: number = 200
context: JobState
guidLedger: number = 0
logger: ILogger
status: StreamStatus = StreamStatus.SUCCESS

Methods

  • if the job is created/deleted/created with the same key, the 'gid' ensures no stale messages (such as sleep delays) enter the workstream. Any message with a mismatched gid belongs to a prior job and can safely be ignored/dropped.

    Parameters

    • jobGID: string
    • OptionalmsgGID: string

    Returns void

  • unhandled activity errors (activities that return an ERROR StreamMessage status and have no adjacent children to transition to) are bound to the job

    Parameters

    • data: Record<string, unknown>

    Returns void

  • Executes the 3-step Leg1 protocol for Category B activities (Leg1-only with children, e.g., Hook passthrough, Signal, Interrupt-another). Uses the incoming Leg1 message GUID as the GUID ledger key.

    Step A: setState + notarizeLeg1Completion + step1 markers (transaction 1) Step B: publish children + step2 markers + setStatusAndCollateGuid (transaction 2) Step C: if edge → runJobCompletionTasks + step3 markers + finalize (transaction 3)

    Parameters

    • delta: number

    Returns Promise<boolean>

    true if this transition caused the job to complete

  • Executes the 3-step Leg2 protocol using GUID ledger for crash-safe resume. Each step bundles durable writes with its concluding digit update in a single transaction.

    Parameters

    • delta: number
    • shouldFinalize: boolean

    Returns Promise<boolean>

    true if this transition caused the job to complete

  • Leg1 entry verification for Category B activities (Leg1-only with children). Returns true if this is a resume (Leg1 already completed on a prior attempt). On resume, loads the GUID ledger for step-level resume decisions.

    Returns Promise<boolean>

  • Upon entering leg 2 of a duplexed activity. Increments both the activity ledger (+1) and GUID ledger (+1). Stores the GUID ledger value for step-level resume decisions.

    Returns Promise<number>