Static disconnecting
Once the app YAML file is deployed to the provider backend, the activate function can be
called to enable it for the entire quorum at the same moment.
The approach is to establish the coordinated health of the system through a series
of call/response exchanges. Once the quorum is confirmed healthy,
it is instructed to run its engines in no-cache mode, ensuring
that the provider backend is consulted for the active app version each time a
call is processed. This ensures that all engines switch to the same version
of the app at the same moment, after which cache mode is re-enabled
to improve performance.
Add a delay for the quorum to reach consensus if traffic is busy, but also consider throttling traffic flow to an acceptable level.
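The activation sequence described above can be pictured with a small in-memory simulation. This is only a sketch: `SimulatedEngine`, `activateVersion`, and the shared `backend` object are hypothetical names invented for illustration and do not reflect HotMesh internals.

```typescript
// Hypothetical in-memory model of the activation handshake (not HotMesh internals).
type CacheMode = 'cache' | 'nocache';
type Backend = { activeVersion: string };

class SimulatedEngine {
  guid: string;
  cacheMode: CacheMode = 'cache';
  private backend: Backend;
  private cachedVersion: string;

  constructor(guid: string, backend: Backend) {
    this.guid = guid;
    this.backend = backend;
    this.cachedVersion = backend.activeVersion;
  }

  // Respond to a roll call (one call/response exchange).
  ping(): boolean {
    return true;
  }

  // In no-cache mode the backend is consulted on every call.
  resolveVersion(): string {
    if (this.cacheMode === 'nocache') {
      this.cachedVersion = this.backend.activeVersion;
    }
    return this.cachedVersion;
  }
}

function activateVersion(engines: SimulatedEngine[], backend: Backend, version: string): boolean {
  // 1. Establish quorum health; abort if any member fails to respond.
  if (!engines.every((e) => e.ping())) return false;
  // 2. Instruct every engine to bypass its cache.
  engines.forEach((e) => { e.cacheMode = 'nocache'; });
  // 3. Flip the active version; each engine sees it on its next call.
  backend.activeVersion = version;
  const switched = engines.every((e) => e.resolveVersion() === version);
  // 4. Re-enable caching once every engine reports the new version.
  if (switched) engines.forEach((e) => { e.cacheMode = 'cache'; });
  return switched;
}

const backend: Backend = { activeVersion: '1' };
const engines = [new SimulatedEngine('a', backend), new SimulatedEngine('b', backend)];
const activated = activateVersion(engines, backend, '2');
```

In this model every engine resolves version `'2'` after activation and returns to cache mode; a real quorum would perform the same steps over the message bus, which is why a delay can help under heavy traffic.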
Optional delay: number
Add a transition message to the workstream, resuming leg 2 of a paused reentrant activity (e.g., await, worker, hook).
When the app YAML descriptor file is ready, the deploy function can be called.
This function is responsible for merging all referenced YAML source
files and writing the JSON output to the file system and to the provider backend. It
is also possible to embed the YAML in-line as a string.
The version will not be active until activation is explicitly called.
Returns searchable/queryable data for a job. In this example a literal field is also searched (the colon is used to track job status and is a reserved field; it can be read but not written).
Returns all data (HGETALL) for a job.
Returns the status of a job. This is a numeric
semaphore value that indicates the job's state.
Any non-positive value indicates a completed job.
Jobs with a value of -1 are pending and will
automatically be scrubbed after a set period.
Jobs with a value around -1 billion have been interrupted
and will be scrubbed after a set period. Jobs with
a value of 0 completed normally. Jobs with a
positive value are still running.
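The status rules above can be expressed as a small classifier. This is a sketch under stated assumptions: `describeJobStatus` and `INTERRUPTED_CUTOFF` are hypothetical names, and the cutoff used to decide what counts as "around -1 billion" is an arbitrary choice, not a value taken from HotMesh.

```typescript
type JobState = 'running' | 'completed' | 'pending-scrub' | 'interrupted';

// Assumption: any value at or below this cutoff is treated as "around -1 billion".
const INTERRUPTED_CUTOFF = -500000000;

function describeJobStatus(status: number): JobState {
  if (status > 0) return 'running';                      // positive: still running
  if (status === 0) return 'completed';                  // zero: completed normally
  if (status <= INTERRUPTED_CUTOFF) return 'interrupted'; // interrupted, awaiting scrub
  if (status === -1) return 'pending-scrub';             // -1: pending automatic scrub
  return 'completed';                                    // any other non-positive value
}

const examples = [3, 0, -1, -1000000000].map(describeJobStatus);
```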
Re/entry point for an active job. This is used to resume a paused job
and close the reentry point or leave it open for subsequent reentry.
Because hooks are public entry points, they include a topic
which is established in the app YAML file.
When this method is called, a hook rule will be located to establish the exact activity and activity dimension for reentry.
Optional status: StreamStatus
Optional code: number
Interrupt an active job.
Listen to all output and interim emissions of a workflow topic matching a wildcard pattern.
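To illustrate what matching a wildcard pattern against topics might look like, here is a minimal sketch. It assumes glob-style patterns in which `*` matches any run of characters; the exact wildcard syntax depends on the provider backend, and `topicMatches` is a hypothetical helper, not part of HotMesh.

```typescript
// Assumption: glob-style patterns where '*' matches any run of characters.
function topicMatches(pattern: string, topic: string): boolean {
  // Escape regex metacharacters other than '*', then expand '*' to '.*'.
  const escaped = pattern.replace(/[.+?^${}()|[\]\\]/g, '\\$&');
  const regex = new RegExp('^' + escaped.replace(/\*/g, '.*') + '$');
  return regex.test(topic);
}

const topics = ['order.process', 'order.ship', 'payment.done'];
const matched = topics.filter((t) => topicMatches('order.*', t));
```

With the pattern `'order.*'`, only the two `order.` topics in the sample list are selected.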
Starts a workflow
Optional context: JobState
Optional extended: ExtensionType
Publish a message to the quorum (engine and/or workers)
Request a roll call from the quorum (engine and workers)
Optional delay: number
Subscribe (listen) to all output and interim emissions of a single workflow topic. NOTE: Postgres does not support patterned unsubscription, so this method is not supported for Postgres.
Subscribe to quorum events (engine and workers)
Sends a throttle message to the quorum (engine and/or workers)
to limit the rate of processing. Pass -1 to throttle indefinitely.
Otherwise, the value must be a non-negative integer that does not exceed MAX_DELAY ms.
When throttling is set, the quorum will pause for the specified time
before processing the next message. Target specific engines and
workers by passing a guid and/or topic. Pass no arguments to
throttle the entire quorum.
In this example, all processing has been paused indefinitely for the entire quorum. This is equivalent to an emergency stop.
HotMesh is a stateless sequence engine, so the throttle can be adjusted up and down with no loss of data.
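The throttle rules above can be checked before a message is sent. The sketch below is illustrative only: `ASSUMED_MAX_DELAY_MS` is a placeholder for the real `MAX_DELAY` constant (whose value is not stated here), and `isValidThrottle`, `buildThrottleRequest`, and the `ThrottleRequest` shape (`throttle`, `guid`, `topic`) are assumptions rather than confirmed HotMesh types.

```typescript
// Placeholder ceiling; the real MAX_DELAY is defined by HotMesh and may differ.
const ASSUMED_MAX_DELAY_MS = 2147483647;

// Assumed request shape: a delay plus optional targeting fields.
interface ThrottleRequest {
  throttle: number;
  guid?: string;
  topic?: string;
}

function isValidThrottle(ms: number, maxDelay: number = ASSUMED_MAX_DELAY_MS): boolean {
  if (ms === -1) return true; // -1 pauses processing indefinitely
  return Number.isInteger(ms) && ms >= 0 && ms <= maxDelay;
}

function buildThrottleRequest(
  ms: number,
  target: { guid?: string; topic?: string } = {},
): ThrottleRequest {
  if (!isValidThrottle(ms)) {
    throw new RangeError(`Invalid throttle value: ${ms}`);
  }
  // No guid/topic means the request addresses the entire quorum.
  return { ...target, throttle: ms };
}

const emergencyStop = buildThrottleRequest(-1);
const slowOrders = buildThrottleRequest(500, { topic: 'order.process' });
```

Here `emergencyStop` targets the whole quorum, while `slowOrders` targets a single worker topic with a 500 ms pause between messages.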
Unsubscribe from quorum events (engine and workers)
Static guid
Static init
Instance initializer. Workers are configured similarly to the engine, but as an array with multiple worker objects.

const config: HotMeshConfig = {
  appId: 'myapp',
  engine: {
    connection: {
      class: Postgres,
      options: {
        connectionString: 'postgresql://usr:pwd@localhost:5432/db',
      }
    }
  },
  workers: [...]
};
const hotMesh = await HotMesh.init(config);

HotMesh supports robust retry policies with exponential backoff for PostgreSQL. Configure retry behavior at the stream level for automatic fault tolerance.

import { HotMesh } from '@hotmeshio/hotmesh';
import { Client as Postgres } from 'pg';

const hotMesh = await HotMesh.init({
  appId: 'my-app',
  engine: {
    connection: {
      class: Postgres,
      options: { connectionString: 'postgresql://...' }
    },
    // Default retry policy for engine streams
    retryPolicy: {
      maximumAttempts: 5,        // Retry up to 5 times
      backoffCoefficient: 2,     // Exponential: 2^0, 2^1, 2^2...
      maximumInterval: '300s'    // Cap delay at 5 minutes
    }
  },
  workers: [{
    topic: 'order.process',
    connection: {
      class: Postgres,
      options: { connectionString: 'postgresql://...' }
    },
    // Worker-specific retry policy
    retryPolicy: {
      maximumAttempts: 10,
      backoffCoefficient: 1.5,
      maximumInterval: '600s'
    },
    callback: async (data) => {
      // Your business logic here
      // Failures will automatically retry with exponential backoff
      const processedData = {}; // placeholder for the result of your business logic
      return { status: 'success', data: processedData };
    }
  }]
});

Retry Policy Options:
maximumAttempts - Maximum retry attempts before failure (default: 3)
backoffCoefficient - Base for exponential backoff calculation (default: 10)
maximumInterval - Maximum delay between retries in seconds or duration string (default: '120s')
Retry Delays: For backoffCoefficient: 2, delays are: 2s, 4s, 8s, 16s, 32s... capped at maximumInterval.
Note: Retry policies are stored in PostgreSQL columns for efficient querying and observability. Each retry creates a new message, preserving message immutability.
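The delay sequence listed above can be reproduced with a short calculation. This is a sketch that assumes the delay for attempt n is `backoffCoefficient ** n` seconds, capped at the maximum interval, which matches the listed 2s, 4s, 8s progression; `retryDelays` is a hypothetical helper and the engine's actual computation may differ.

```typescript
// Assumption: the delay for attempt n (1-based) is coefficient^n seconds, capped at the maximum.
function retryDelays(
  maximumAttempts: number,
  backoffCoefficient: number,
  maximumIntervalSeconds: number,
): number[] {
  const delays: number[] = [];
  for (let attempt = 1; attempt <= maximumAttempts; attempt++) {
    const uncapped = Math.pow(backoffCoefficient, attempt);
    delays.push(Math.min(uncapped, maximumIntervalSeconds));
  }
  return delays;
}

const engineDelays = retryDelays(5, 2, 300);  // [2, 4, 8, 16, 32]
const cappedDelays = retryDelays(10, 2, 300); // [2, 4, 8, 16, 32, 64, 128, 256, 300, 300]
```

The second call shows the cap taking effect: from the ninth attempt onward the uncapped delay (512s) exceeds the 300s maximum.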
Static stop
HotMesh transforms Postgres into a durable workflow orchestration engine capable of running fault-tolerant workflows across multiple services and systems.
Key Features
Architecture
HotMesh consists of several specialized modules:
Lifecycle Overview
Basic Usage
Example
Postgres Backend Example
Example
Advanced Features
Pattern Subscriptions: Listen to multiple workflow topics
Throttling: Control processing rates
Workflow Interruption: Gracefully stop running workflows
State Inspection: Query workflow state and progress
Distributed Coordination
HotMesh automatically handles distributed coordination through its quorum system:
Integration with Higher-Level Modules
For most use cases, consider using the higher-level modules:
Cleanup
Always clean up resources when shutting down: