The WorkflowService class provides a set of static methods to be used within a workflow function. These methods ensure deterministic replay, persistence of state, and error handling across re-entrant workflow executions.

import { MemFlow } from '@hotmeshio/hotmesh';

export async function waitForExample(): Promise<[boolean, number]> {
const [s1, s2] = await Promise.all([
MemFlow.workflow.waitFor<boolean>('my-sig-nal-1'),
MemFlow.workflow.waitFor<number>('my-sig-nal-2')
]);
return [s1, s2];
}

Properties

all: Object = all
didInterrupt: ((error: Error) => boolean) = didInterrupt

Type declaration

    • (error): boolean
    • Checks if an error is a HotMesh reserved error type that indicates a HotMesh interruption rather than a true error condition.

      When this returns true, you can safely return rethrow the error. The workflow engine will handle the interruption automatically.

      Parameters

      • error: Error

        The error to check

      Returns boolean

      true if the error is a HotMesh interruption

      import { MemFlow } from '@hotmeshio/hotmesh';

      try {
      await someWorkflowOperation();
      } catch (error) {
      // Check if this is a HotMesh interruption
      if (MemFlow.workflow.didInterrupt(error)) {
      // Rethrow the error
      throw error;
      }
      // Handle actual error
      console.error('Workflow failed:', error);
      }
didRun: Object = didRun
emit: ((events: StringAnyType, config??: {
    once: boolean;
}) => Promise<boolean>) = emit

Type declaration

    • (events, config?): Promise<boolean>
    • Emits events to the event bus provider. Topics are prefixed with the quorum namespace.

      Parameters

      • events: StringAnyType

        A mapping of topic => message to publish.

      • Optionalconfig: {
            once: boolean;
        } = ...

        If once is true, events are emitted only once.

        • once: boolean

      Returns Promise<boolean>

      True after emission completes.

enrich: ((fields: StringStringType) => Promise<boolean>) = enrich

Type declaration

    • (fields): Promise<boolean>
    • Adds custom user data to the backend workflow record (writes to HASH). Runs exactly once during workflow execution.

      Parameters

      Returns Promise<boolean>

      True when enrichment is completed.

entity: (() => Promise<Entity>) = entity

Type declaration

    • (): Promise<Entity>
    • Returns an entity session handle for interacting with the workflow's JSONB entity storage.

      Returns Promise<Entity>

      An entity session for workflow data.

      const entity = await workflow.entity();
      await entity.set({ user: { id: 123 } });
      await entity.merge({ user: { name: "John" } });
      const user = await entity.get("user");
execChild: (<T>(options: WorkflowOptions) => Promise<T>) = execChild

Type declaration

    • <T>(options): Promise<T>
    • Spawns a child workflow and awaits the result, or if await is false, returns immediately.

      Type Parameters

      • T

      Parameters

      Returns Promise<T>

      Result of the child workflow.

execHook: (<T>(options: ExecHookOptions) => Promise<T>) = execHook

Type declaration

    • <T>(options): Promise<T>
    • Executes a hook function and awaits the signal response. This is a convenience method that combines hook() and waitFor() operations.

      Signal Injection: The signalId is automatically injected as the LAST argument to the hooked function. The hooked function should check for this signal parameter and emit the signal when processing is complete.

      This behaves like execChild but targets the existing workflow instead of spawning a new workflow.

      Type Parameters

      • T

      Parameters

      Returns Promise<T>

      The signal result from the hooked function.

      // Execute a hook and await its signal response
      const result = await MemFlow.workflow.execHook({
      taskQueue: 'processing',
      workflowName: 'processData',
      args: ['user123', 'batch-process'],
      signalId: 'processing-complete'
      });

      // The hooked function receives the signal as the last argument:
      export async function processData(userId: string, processType: string, signalInfo?: { signal: string }) {
      // ... do processing work ...
      const result = { userId, processType, status: 'completed' };

      // Check if called via execHook (signalInfo will be present)
      if (signalInfo?.signal) {
      await MemFlow.workflow.signal(signalInfo.signal, result);
      }

      return result;
      }
      // Alternative pattern - check if last arg is signal object
      export async function myHookFunction(arg1: string, arg2: number, ...rest: any[]) {
      // ... process arg1 and arg2 ...
      const result = { processed: true, data: [arg1, arg2] };

      // Check if last argument is a signal object
      const lastArg = rest[rest.length - 1];
      if (lastArg && typeof lastArg === 'object' && lastArg.signal) {
      await MemFlow.workflow.signal(lastArg.signal, result);
      }

      return result;
      }
executeChild: (<T>(options: WorkflowOptions) => Promise<T>) = executeChild

Type declaration

    • <T>(options): Promise<T>
    • Spawns a child workflow and awaits the result, or if await is false, returns immediately.

      Type Parameters

      • T

      Parameters

      Returns Promise<T>

      Result of the child workflow.

getContext: (() => WorkflowContext) = getContext

Type declaration

hook: ((options: HookOptions) => Promise<string>) = hook

Type declaration

    • (options): Promise<string>
    • Spawns a hook from the main thread or a hook thread. If entity/workflowName are not provided, defaults to the current workflow.

      Parameters

      Returns Promise<string>

      The resulting hook/stream ID.

interrupt: ((jobId: string, options?: JobInterruptOptions) => Promise<string | void>) = interrupt

Type declaration

    • (jobId, options?): Promise<string | void>
    • Interrupts a running job by sending an interruption request.

      Parameters

      • jobId: string

        The ID of the job to interrupt.

      • options: JobInterruptOptions = {}

        Additional interruption options.

      Returns Promise<string | void>

      Result of the interruption, if any.

isSideEffectAllowed: Object = isSideEffectAllowed
proxyActivities: (<ACT>(options?: ActivityConfig) => ProxyType<ACT>) = proxyActivities

Type declaration

    • <ACT>(options?): ProxyType<ACT>
    • Provides a proxy for defined activities, ensuring deterministic replay and retry.

      Type Parameters

      • ACT

      Parameters

      • Optionaloptions: ActivityConfig

        Optional activity config (includes retryPolicy).

      Returns ProxyType<ACT>

      A proxy to call activities as if local, but durably managed by the workflow.

random: (() => number) = random

Type declaration

    • (): number
    • Returns a deterministic random number between 0 and 1. The number is derived from the current execution index, ensuring deterministic replay.

      Returns number

      A deterministic pseudo-random number between 0 and 1.

search: (() => Promise<Search>) = search

Type declaration

    • (): Promise<Search>
    • Returns a search session handle for interacting with the workflow's HASH storage.

      Returns Promise<Search>

      A search session for workflow data.

signal: ((signalId: string, data: Record<any, any>) => Promise<string>) = signal

Type declaration

    • (signalId, data): Promise<string>
    • Sends a signal payload to any paused workflow thread awaiting this signal. This method is commonly used to coordinate between workflows, hook functions, and external events.

      Parameters

      • signalId: string

        Unique signal identifier that matches a waitFor() call.

      • data: Record<any, any>

        The payload to send with the signal.

      Returns Promise<string>

      The resulting hook/stream ID.

      // Basic usage - send a simple signal with data
      await MemFlow.workflow.signal('signal-id', { name: 'WarmMash' });
      // Hook function signaling completion
      export async function exampleHook(name: string): Promise<void> {
      const result = await processData(name);
      await MemFlow.workflow.signal('hook-complete', { data: result });
      }
      // Signal with complex data structure
      await MemFlow.workflow.signal('process-complete', {
      status: 'success',
      data: { id: 123, name: 'test' },
      timestamp: new Date().toISOString()
      });
sleepFor: ((duration: string) => Promise<number>) = sleepFor

Type declaration

    • (duration): Promise<number>
    • Sleeps the workflow for a specified duration, deterministically. On replay, it will not actually sleep again, but resume after sleep.

      Parameters

      • duration: string

        A human-readable duration string (e.g., '1m', '2 hours', '30 seconds').

      Returns Promise<number>

      The resolved duration in seconds.

      // Basic usage - sleep for a specific duration
      await MemFlow.workflow.sleepFor('2 seconds');
      // Using with Promise.all for parallel operations
      const [greeting, timeInSeconds] = await Promise.all([
      someActivity(name),
      MemFlow.workflow.sleepFor('1 second')
      ]);
      // Multiple sequential sleeps
      await MemFlow.workflow.sleepFor('1 seconds'); // First pause
      await MemFlow.workflow.sleepFor('2 seconds'); // Second pause
startChild: ((options: WorkflowOptions) => Promise<string>) = startChild

Type declaration

    • (options): Promise<string>
    • Spawns a child workflow and returns the child Job ID without awaiting its completion.

      Parameters

      Returns Promise<string>

      The child job ID.

trace: ((attributes: StringScalarType, config??: {
    once: boolean;
}) => Promise<boolean>) = trace

Type declaration

    • (attributes, config?): Promise<boolean>
    • Executes a distributed trace, outputting the provided attributes to the telemetry sink (e.g. OpenTelemetry).

      This trace will only run once per workflow execution by default.

      Parameters

      • attributes: StringScalarType

        Key-value attributes to attach to the trace.

      • Optionalconfig: {
            once: boolean;
        } = ...

        If once is true, trace only runs once.

        • once: boolean

      Returns Promise<boolean>

      True if tracing succeeded, otherwise false.

waitFor: (<T>(signalId: string) => Promise<T>) = waitFor

Type declaration

    • <T>(signalId): Promise<T>
    • Pauses the workflow until a signal with the given signalId is received. This method is commonly used to coordinate between the main workflow and hook functions, or to wait for external events.

      Type Parameters

      • T

        The type of data expected in the signal payload

      Parameters

      • signalId: string

        A unique signal identifier shared by the sender and receiver.

      Returns Promise<T>

      The data payload associated with the received signal.

      // Basic usage - wait for a single signal
      const payload = await MemFlow.workflow.waitFor<PayloadType>('abcdefg');
      // Wait for multiple signals in parallel
      const [signal1, signal2] = await Promise.all([
      MemFlow.workflow.waitFor<Record<string, any>>('signal1'),
      MemFlow.workflow.waitFor<Record<string, any>>('signal2')
      ]);
      // Typical pattern with hook functions
      // In main workflow:
      await MemFlow.workflow.waitFor<ResponseType>('hook-complete');

      // In hook function:
      await MemFlow.workflow.signal('hook-complete', { data: result });

Methods