Invokes another graph (sub-flow) and optionally waits for its completion. The await activity enables compositional workflows where one graph triggers another by publishing to its subscribes topic, creating a parent-child relationship between flows.

The topic in the await activity must match the subscribes topic of the child graph. Both graphs are defined in the same app YAML:

app:
  id: myapp
  version: '1'
  graphs:

    # ── Parent graph ──────────────────────────────
    - subscribes: order.placed
      expire: 120

      activities:
        t1:
          type: trigger
          job:
            maps:
              orderId: '{$self.output.data.id}'

        a1:
          type: await
          topic: approval.requested # ◄── targets the child graph's subscribes
          await: true
          input:
            schema:
              type: object
              properties:
                orderId: { type: string }
            maps:
              orderId: '{t1.output.data.id}'
          output:
            schema:
              type: object
              properties:
                approved: { type: boolean }
          job:
            maps:
              approval: '{$self.output.data.approved}'

        done:
          type: hook

      transitions:
        t1:
          - to: a1
        a1:
          - to: done

    # ── Child graph (invoked by the await) ────────
    - subscribes: approval.requested # ◄── matched by the await activity's topic
      publishes: approval.completed
      expire: 60

      activities:
        t1:
          type: trigger
        review:
          type: worker
          topic: approval.review

      transitions:
        t1:
          - to: review
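
The topic-matching rule can be checked before deployment. The following is a minimal sketch, assuming the app YAML has already been parsed into plain objects; the `ActivityDef` and `GraphDef` interfaces and the `findUnmatchedAwaitTopics` helper are hypothetical simplifications for illustration and are not part of the library.

```typescript
// Hypothetical, simplified shapes for illustration; not the library's real types.
interface ActivityDef {
  type: string;
  topic?: string;
}

interface GraphDef {
  subscribes: string;
  activities: Record<string, ActivityDef>;
}

// Returns one message per await activity whose topic matches no graph's `subscribes`.
function findUnmatchedAwaitTopics(graphs: GraphDef[]): string[] {
  const subscribed = new Set(graphs.map((g) => g.subscribes));
  const problems: string[] = [];
  for (const graph of graphs) {
    for (const [id, activity] of Object.entries(graph.activities)) {
      if (activity.type !== 'await') continue;
      if (!activity.topic || !subscribed.has(activity.topic)) {
        problems.push(
          `${graph.subscribes}/${id}: no graph subscribes to "${activity.topic ?? ''}"`,
        );
      }
    }
  }
  return problems;
}

// The two graphs from the example above, reduced to the fields the check needs.
const graphs: GraphDef[] = [
  {
    subscribes: 'order.placed',
    activities: {
      t1: { type: 'trigger' },
      a1: { type: 'await', topic: 'approval.requested' },
      done: { type: 'hook' },
    },
  },
  {
    subscribes: 'approval.requested',
    activities: {
      t1: { type: 'trigger' },
      review: { type: 'worker', topic: 'approval.review' },
    },
  },
];

const unmatched = findUnmatchedAwaitTopics(graphs);
```

For the example app, `unmatched` is empty because `approval.requested` appears as a `subscribes` topic. Worker topics such as `approval.review` are deliberately ignored, since the rule applies only to await activities.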

When await is explicitly set to false, the activity starts the child flow but does not wait for its completion. The parent flow immediately continues. The child's job_id is returned as the output.

a1:
  type: await
  topic: background.process
  await: false
  job:
    maps:
      childJobId: '{$self.output.data.job_id}'
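
The difference between the two modes can be modeled with ordinary promises. This is only an in-memory sketch of the semantics described above, assuming a child flow can be represented as an async function; `ChildFlow`, `runAwait`, and the generated job ids are hypothetical names and do not reflect the engine's stream-based implementation.

```typescript
// Hypothetical in-memory model; these names are illustrative, not library APIs.
type Payload = Record<string, unknown>;
type ChildFlow = (input: Payload) => Promise<Payload>;

interface AwaitResult {
  data: Payload;
}

let nextJobId = 0;

// awaitChild defaults to true, mirroring the need to set `await: false` explicitly.
async function runAwait(
  child: ChildFlow,
  input: Payload,
  awaitChild = true,
): Promise<AwaitResult> {
  const jobId = `job-${++nextJobId}`;
  const pending = child(input); // the child flow starts in both modes
  if (!awaitChild) {
    // Fire-and-forget: do not wait; ignore the child's eventual outcome here.
    pending.catch(() => undefined);
    return { data: { job_id: jobId } };
  }
  // Waiting mode: the child's final output becomes this activity's output.
  return { data: await pending };
}

const approve: ChildFlow = async (input) => ({
  approved: input.orderId !== undefined,
});

async function demo(): Promise<{ waited: AwaitResult; detached: AwaitResult }> {
  const waited = await runAwait(approve, { orderId: 'o-1' }, true);
  const detached = await runAwait(approve, { orderId: 'o-2' }, false);
  return { waited, detached };
}
```

In this model, `waited.data` carries the child's output (`approved`), while `detached.data` carries only a `job_id`, which is the field the `childJobId` mapping above reads.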

Await is a Category A (duplex) activity:

  • Leg 1 (process): Maps input data and publishes a StreamDataType.AWAIT message to the engine stream. The engine starts the child flow.
  • Leg 2 (processEvent, inherited): Receives the child flow's final output, maps output data, and transitions to adjacent activities.

See AwaitActivity for the TypeScript interface.

Properties

adjacencyList: StreamData[]
adjacentIndex: number = 0
code: number = 200
context: JobState
guidLedger: number = 0
logger: ILogger
status: StreamStatus = StreamStatus.SUCCESS

Methods

  • If a job is created, deleted, and then created again with the same key, the 'gid' ensures that no stale messages (such as sleep delays) enter the workstream. Any message with a mismatched gid belongs to a prior job and can safely be ignored or dropped.

    Parameters

    • jobGID: string
    • Optional msgGID: string

    Returns void

  • Binds unhandled activity errors to the job. An error is unhandled when the activity returns an ERROR StreamMessage status and has no adjacent children to transition to.

    Parameters

    • data: Record<string, unknown>

    Returns void

  • Executes the 3-step Leg1 protocol for Category B activities (Leg1-only with children, e.g., Hook passthrough, Signal, Interrupt-another). Uses the incoming Leg1 message GUID as the GUID ledger key.

    Step A: setState + notarizeLeg1Completion + step1 markers (transaction 1)
    Step B: publish children + step2 markers + setStatusAndCollateGuid (transaction 2)
    Step C: if edge → runJobCompletionTasks + step3 markers + finalize (transaction 3)

    Parameters

    • delta: number

    Returns Promise<boolean>

    true if this transition caused the job to complete

  • Executes the 3-step Leg2 protocol using GUID ledger for crash-safe resume. Each step bundles durable writes with its concluding digit update in a single transaction.

    Parameters

    • delta: number
    • shouldFinalize: boolean

    Returns Promise<boolean>

    true if this transition caused the job to complete

  • Leg1 entry verification for Category B activities (Leg1-only with children). Returns true if this is a resume (Leg1 already completed on a prior attempt). On resume, loads the GUID ledger for step-level resume decisions.

    Returns Promise<boolean>

  • Called upon entering Leg 2 of a duplex activity. Increments both the activity ledger (+1) and the GUID ledger (+1), and stores the GUID ledger value for step-level resume decisions.

    Returns Promise<number>
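
The step-level resume that the Leg 1 and Leg 2 protocols rely on can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the library's implementation: the `Ledger` and `Step` types, the `runSteps` helper, and the step labels are hypothetical, and a real ledger would live in a durable store where each write and its marker commit in one transaction.

```typescript
// Hypothetical stand-in for a durable store; not the library's GUID ledger.
interface Ledger {
  completedSteps: number; // marker: how many steps have committed
  writes: string[]; // durable side effects, in commit order
}

interface Step {
  name: string;
  write: string;
}

// Runs the remaining steps, skipping any that committed on a prior attempt.
// `crashBefore` simulates a process crash just before the step at that index.
function runSteps(ledger: Ledger, steps: Step[], crashBefore?: number): void {
  for (let i = ledger.completedSteps; i < steps.length; i++) {
    if (crashBefore === i) {
      throw new Error(`simulated crash before step ${steps[i].name}`);
    }
    // Modeled "transaction": the write and its marker advance together.
    ledger.writes.push(steps[i].write);
    ledger.completedSteps = i + 1;
  }
}

// Illustrative labels only; they loosely follow Steps A to C described above.
const steps: Step[] = [
  { name: 'A', write: 'setState' },
  { name: 'B', write: 'publishChildren' },
  { name: 'C', write: 'finalize' },
];

const ledger: Ledger = { completedSteps: 0, writes: [] };

try {
  runSteps(ledger, steps, 1); // first attempt: commits step A, then crashes
} catch {
  // The crash leaves completedSteps at 1, so a retry resumes from step B.
}

runSteps(ledger, steps); // resume: runs steps B and C only
```

Because the marker advances in the same modeled transaction as the write, the retry never repeats step A, and each write appears in `ledger.writes` exactly once.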