Dispatches work to a registered callback function. The worker activity publishes a message to its configured topic stream, where a worker process picks it up, executes the callback, and returns a response that the engine captures as the activity's output.

app:
  id: myapp
  version: '1'
  graphs:
    - subscribes: order.placed
      expire: 120

      activities:
        t1:
          type: trigger

        a1:
          type: worker
          topic: work.do          # matches the registered worker topic
          input:
            schema:
              type: object
              properties:
                x: { type: string }
            maps:
              x: '{t1.output.data.inputField}'
          output:
            schema:
              type: object
              properties:
                y: { type: string }
          job:
            maps:
              result: '{$self.output.data.y}'

      transitions:
        t1:
          - to: a1

Workers are registered at initialization time via the workers array in HotMesh.init. Each worker binds a topic to a callback function.

const hotMesh = await HotMesh.init({
  appId: 'myapp',
  engine: { connection },
  workers: [{
    topic: 'work.do',
    connection,
    callback: async (data: StreamData) => ({
      metadata: { ...data.metadata },
      data: { y: `${data.data.x} transformed` }
    })
  }]
});
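
Because the callback is an ordinary async function, it can be exercised without a running engine. The following is a minimal sketch under stated assumptions: `SketchStreamData`, `transform`, and `workDoCallback` are hypothetical names introduced for illustration, and `SketchStreamData` is a simplified stand-in for the library's `StreamData` type, which carries more fields.

```typescript
// Hypothetical, simplified stand-in for the library's StreamData type.
interface SketchStreamData {
  metadata: Record<string, unknown>;
  data: Record<string, unknown>;
}

// Pure helper holding the business logic, so it can be tested on its own.
function transform(x: string): string {
  return `${x} transformed`;
}

// Same shape as the callback registered for 'work.do' above:
// echo the metadata and return `y`, the field declared in the output schema.
async function workDoCallback(input: SketchStreamData): Promise<SketchStreamData> {
  return {
    metadata: { ...input.metadata },
    data: { y: transform(String(input.data.x)) },
  };
}

// Usage: call the callback directly with a hand-built message.
workDoCallback({ metadata: { topic: 'work.do' }, data: { x: 'abc' } })
  .then((response) => console.log(response.data.y)); // prints "abc transformed"
```

Keeping the transformation in a pure helper is a design choice of this sketch, not a library requirement; it lets the logic be checked without constructing stream messages.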

Retry behavior is configured at the worker level (not in YAML) via the retry option. Failed callbacks are retried with exponential backoff until maximumAttempts is exhausted. The maximumInterval caps the delay between retries.

const hotMesh = await HotMesh.init({
  appId: 'myapp',
  engine: { connection },
  workers: [{
    topic: 'work.backoff',
    connection,
    retry: {
      maximumAttempts: 5,        // retry up to 5 times
      backoffCoefficient: 2,     // exponential: 2^0, 2^1, 2^2, ... seconds
      maximumInterval: '30s',    // cap delay at 30 seconds
    },
    callback: async (data: StreamData) => {
      const result = await doWork(data.data);
      return {
        code: 200,
        status: StreamStatus.SUCCESS,
        metadata: { ...data.metadata },
        data: { result },
      } as StreamDataResponse;
    },
  }]
});
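
To make the retry timing concrete, the sketch below computes the delay schedule implied by the comments in the configuration above. It assumes the delay before the n-th retry (counting from 0) is `backoffCoefficient ** n` seconds, capped at `maximumInterval`; the engine's exact formula is not stated here, so treat this as an illustration only. `SketchRetryPolicy`, `retryDelaySeconds`, and `retrySchedule` are hypothetical names, and the cap is expressed as a number of seconds rather than the `'30s'` string.

```typescript
// Hypothetical mirror of the retry options shown above, with the cap in seconds.
interface SketchRetryPolicy {
  maximumAttempts: number;
  backoffCoefficient: number;
  maximumIntervalSeconds: number;
}

// Delay before retry number `retryIndex` (0-based), under the assumed
// formula min(backoffCoefficient ^ retryIndex, maximumInterval).
function retryDelaySeconds(policy: SketchRetryPolicy, retryIndex: number): number {
  const uncapped = Math.pow(policy.backoffCoefficient, retryIndex);
  return Math.min(uncapped, policy.maximumIntervalSeconds);
}

// Full schedule: one delay per allowed retry.
function retrySchedule(policy: SketchRetryPolicy): number[] {
  return Array.from({ length: policy.maximumAttempts }, (_, i) =>
    retryDelaySeconds(policy, i),
  );
}

const schedule = retrySchedule({
  maximumAttempts: 5,
  backoffCoefficient: 2,
  maximumIntervalSeconds: 30,
});
console.log(schedule.join(', ')); // prints "1, 2, 4, 8, 16"
```

With these settings the cap never applies, since the largest delay is 16 seconds. Under the same assumed formula, raising `maximumAttempts` to 8 would give delays of 1, 2, 4, 8, 16, 30, 30, and 30 seconds.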

Worker is a Category A (duplex) activity:

  • Leg 1 (process): Maps input data and publishes a message to the worker's topic stream.
  • Leg 2 (processEvent, inherited): Receives the worker's response, maps output data, and executes the step protocol to transition to adjacent activities.
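
The two legs can be pictured with a small in-memory simulation. This is a sketch under stated assumptions, not the engine's implementation: the `streams` map stands in for the real topic streams, the worker callback is synchronous for brevity (registered callbacks are async), the transition to adjacent activities is omitted, and every name (`SketchMessage`, `leg1Process`, `runWorker`, `leg2ProcessEvent`, `roundTrip`) is hypothetical.

```typescript
// Hypothetical, simplified message shape; the library's StreamData has more fields.
interface SketchMessage {
  metadata: { jobId: string; activityId: string };
  data: Record<string, unknown>;
}

type SketchWorker = (msg: SketchMessage) => SketchMessage;

// In-memory stand-in for topic streams: one FIFO queue per topic.
const streams = new Map<string, SketchMessage[]>();

// Leg 1: map input data and publish a message to the worker's topic.
function leg1Process(topic: string, jobId: string, triggerData: { inputField: string }): void {
  const message: SketchMessage = {
    metadata: { jobId, activityId: 'a1' },
    data: { x: triggerData.inputField }, // mirrors x: '{t1.output.data.inputField}'
  };
  const queue = streams.get(topic) ?? [];
  queue.push(message);
  streams.set(topic, queue);
}

// Worker side: take the next message off the topic and run the callback.
function runWorker(topic: string, callback: SketchWorker): SketchMessage | undefined {
  const message = streams.get(topic)?.shift();
  return message ? callback(message) : undefined;
}

// Leg 2: receive the response and map output data onto the job.
function leg2ProcessEvent(response: SketchMessage): { result: unknown } {
  return { result: response.data.y }; // mirrors result: '{$self.output.data.y}'
}

// Round trip for a single job: leg 1, worker, leg 2.
function roundTrip(inputField: string): { result: unknown } | undefined {
  leg1Process('work.do', 'job-1', { inputField });
  const response = runWorker('work.do', (msg) => ({
    metadata: { ...msg.metadata },
    data: { y: `${String(msg.data.x)} transformed` },
  }));
  return response ? leg2ProcessEvent(response) : undefined;
}

console.log(roundTrip('abc')); // logs an object whose result is "abc transformed"
```

The worker echoes the message metadata, as the callbacks above do, so the response can be matched back to the job and activity that published the request.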

See WorkerActivity for the TypeScript interface.

Constructors

Properties

adjacencyList: StreamData[]
adjacentIndex: number = 0
code: number = 200
context: JobState
guidLedger: number = 0
logger: ILogger
status: StreamStatus = StreamStatus.SUCCESS

Methods