A functional data-transformation pipeline that resolves expressions row-by-row against live job data.

Pipe is the engine behind HotMesh's @pipe syntax — a uniform, functional approach to data mapping and transformation. Every ECMAScript operation is expressed as a function with inputs, eliminating the syntactic variability of the language (ternaries, property access, instance methods) in favor of a single, composable pattern.

If you've used an RPN calculator, @pipe will feel familiar. Operands come first, then the operator consumes them:

RPN:   4  4  +   →  8

@pipe:
- [4, 4] ← operands
- ["{@math.add}"] ← operator → 8

That's the entire model. Operands are pushed, an operator pops them, and the result becomes an operand for the next row.

A pipe is an ordered array of rows. Each row is an array of cells. The key rule: every resolved value on a row flows into cell 0 of the next row (the operator). The operator consumes all upstream operands, resolves to a new value, and that result becomes an operand. Any remaining cells on the same row are resolved independently and appended as additional operands. Then the cycle repeats.

@pipe:
        ┌──────────────────────────────────────┐
Row 0:  │        [operand, operand, operand]   │ ← resolve all cells
        │            │        │        │       │
        │            └────────┼────────┘       │
        │                     │                │
        ├─────────────────────┼────────────────┤
        │                     ▼                │
Row 1:  │               [ OPERATOR , operand]  │ ← ALL operands from Row 0
        │                     │         │      │   flow into cell 0;
        │                     ├─────────┘      │   result becomes operand;
        │                     │                │   remaining cells resolve
        ├─────────────────────┼────────────────┤
        │                     ▼                │
Row 2:  │               [ OPERATOR , operand]  │ ← same pattern repeats
        └──────────────────────────────────────┘

final value = cell 0 of last row

Step by step:

  1. Row 0 — Every cell is resolved independently (static literals, {data.*} references, or nullary functions like {@date.now}). The resolved values are all operands.
  2. Row 1 — Cell 0 is the operator ({@domain.method}). It receives all operands from Row 0 as its arguments and produces a result. That result replaces cell 0 — it is now an operand. Any remaining cells (cell 1, 2, ...) on this row are resolved independently and become additional operands.
  3. Row 2 — Cell 0 is the next operator. It receives all operands from Row 1 (the prior result + any extra cells). The cycle repeats.
  4. Return — The first cell of the final row is the result.
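
The four steps above can be sketched as a small loop. This is a simplified model for illustration only, not HotMesh's implementation: the `evaluate` function, the `sketchRegistry` table, and its two entries are hypothetical stand-ins, and the sketch ignores {path} references, nested @pipe rows, and @reduce.

```typescript
// Hypothetical, simplified model of the row cycle (not the HotMesh source).
type Cell = string | number | boolean;
type Row = Cell[];
type SketchFn = (...args: any[]) => unknown;

// Stand-in registry with two assumed functions.
const sketchRegistry: Record<string, SketchFn> = {
  'math.add': (...nums: number[]) => nums.reduce((sum, n) => sum + n, 0),
  'math.multiply': (...nums: number[]) => nums.reduce((product, n) => product * n, 1),
};

const FUNCTION_CELL = /^\{@([\w.]+)\}$/;

function evaluate(rows: Row[]): unknown {
  if (rows.length === 0) return undefined;
  // Row 0: every cell is an operand.
  let operands: unknown[] = [...rows[0]];
  for (const row of rows.slice(1)) {
    const [operatorCell, ...extraCells] = row;
    const match = typeof operatorCell === 'string' ? FUNCTION_CELL.exec(operatorCell) : null;
    const fn = match ? sketchRegistry[match[1]] : undefined;
    if (!fn) {
      throw new Error(`Unknown operator cell: ${String(operatorCell)}`);
    }
    // The operator consumes every operand from the prior row.
    const result = fn(...operands);
    // Its result, followed by any extra cells, becomes the next operand list.
    operands = [result, ...extraCells];
  }
  // The first cell of the final row is the result.
  return operands[0];
}

const sum = evaluate([[4, 4], ['{@math.add}']]);
const scaled = evaluate([[4, 4], ['{@math.add}', 10], ['{@math.multiply}']]);
```

Here `sum` is 8, and `scaled` is 80 because the extra cell 10 joins the result 8 as a second operand for the multiply row.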

When an additional cell on an operator row needs to be computed (rather than being a static value or simple reference), use a nested @pipe (sub-pipe) to resolve it. See the initials example below, which uses nested pipes.

Type      Syntax             Example
Static    literal            42, "hello", true
Dynamic   {path}             {a.output.data.name}
Function  {@domain.method}   {@string.concat}, {@math.add}

Function domains and their methods:

  • array: get, length, join, concat, push, indexOf, ...
  • bitwise: and, or, xor, not, ...
  • conditional: ternary, equality, nullish, ...
  • cron: nextDelay
  • date: now, toLocaleString, yyyymmdd, ...
  • json: parse, stringify
  • logical: and, or, not
  • math: add, multiply, pow, max, min, abs, ...
  • number: isEven, isOdd, gte, lte, ...
  • object: create, get, set, keys, ...
  • string: concat, split, charAt, toLowerCase, includes, ...
  • symbol
  • unary
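
To make the cell types concrete, here is a minimal sketch of how a cell could be classified by its shape. The `classifyCell` function and its regular expressions are hypothetical and only approximate the syntax shown above; the actual detection logic in HotMesh may differ. The `context` kind covers the {$item}-style variables used inside @reduce.

```typescript
// Hypothetical classifier; the patterns are assumptions based on the documented syntax.
type CellKind = 'function' | 'context' | 'dynamic' | 'static';

function classifyCell(cell: unknown): CellKind {
  // Numbers, booleans, arrays, and objects are always literal values.
  if (typeof cell !== 'string') return 'static';
  if (/^\{@[\w.]+\}$/.test(cell)) return 'function'; // {@domain.method}
  if (/^\{\$[\w.]+\}$/.test(cell)) return 'context'; // {$item}, {$output}, ...
  if (/^\{[\w.]+\}$/.test(cell)) return 'dynamic';   // {a.output.data.name}
  return 'static';                                   // plain strings such as "hello"
}

const kinds = [42, 'hello', '{a.output.data.name}', '{@math.add}', '{$item.full_name}'].map(classifyCell);
```

With these inputs, `kinds` holds static, static, dynamic, function, and context, in that order.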

Most fields need only a one-to-one reference. Curly braces pull values from upstream activity outputs:

# Map fields from activities a and b into a new shape
maps:
  first: "{a.output.data.first_name}"
  last: "{a.output.data.last_name}"
  email: "{b.output.data.email}"
  age: "{b.output.data.age}"
  company: "ACME Corp"   # static string
  bonus: 500             # static number

The equivalent JavaScript object passed to the mapper:

const rules = {
  first: '{a.output.data.first_name}',
  last: '{a.output.data.last_name}',
  email: '{b.output.data.email}',
  age: '{b.output.data.age}',
  company: 'ACME Corp',
  bonus: 500,
};
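
As a rough illustration of what one-to-one mapping does, the sketch below resolves a flat rules object by walking each dot-delimited path into the job data. The helpers `getPath` and `mapFields` and the sample data are hypothetical names introduced for this example; they are not part of the HotMesh API.

```typescript
// Hypothetical helpers showing path-based resolution; not the HotMesh mapper.
type SampleJobData = Record<string, unknown>;

// Walk a dot-delimited path such as "a.output.data.first_name".
function getPath(jobData: SampleJobData, path: string): unknown {
  return path.split('.').reduce<unknown>(
    (node, key) =>
      node !== null && typeof node === 'object'
        ? (node as Record<string, unknown>)[key]
        : undefined,
    jobData,
  );
}

// "{path}" strings are looked up; every other value passes through unchanged.
function mapFields(
  mappingRules: Record<string, unknown>,
  jobData: SampleJobData,
): Record<string, unknown> {
  const mapped: Record<string, unknown> = {};
  for (const [field, rule] of Object.entries(mappingRules)) {
    const match = typeof rule === 'string' ? /^\{([\w.]+)\}$/.exec(rule) : null;
    mapped[field] = match ? getPath(jobData, match[1]) : rule;
  }
  return mapped;
}

const sampleJobData: SampleJobData = {
  a: { output: { data: { first_name: 'John', last_name: 'Doe' } } },
  b: { output: { data: { email: 'jdoe@example.com', age: 30 } } },
};

const mappedUser = mapFields(
  {
    first: '{a.output.data.first_name}',
    email: '{b.output.data.email}',
    company: 'ACME Corp',
    bonus: 500,
  },
  sampleJobData,
);
```

In this sketch `mappedUser.first` is "John", while `company` and `bonus` keep their static values.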

Build a user_name like jdoe from "John Doe". Follow the RPN flow — operands feed into the operator on the next row, the result becomes an operand, extra cells append more operands:

user_name:
  "@pipe":
    - ["{a.output.data.first_name}", 0]
    - ["{@string.charAt}", "{a.output.data.last_name}"]
    - ["{@string.concat}"]
    - ["{@string.toLowerCase}"]

// Identical logic in JavaScript
const rules = {
  user_name: {
    '@pipe': [
      ['{a.output.data.first_name}', 0],
      ['{@string.charAt}', '{a.output.data.last_name}'],
      ['{@string.concat}'],
      ['{@string.toLowerCase}'],
    ],
  },
};

RPN trace (given first_name="John", last_name="Doe"):

Row 0: ["John", 0]            ← two operands
Row 1: charAt("John", 0)="J"  ← operator consumes both → result "J"
       then resolve "Doe"     ← extra cell → operands are ["J", "Doe"]
Row 2: concat("J", "Doe")     ← operator → result "JDoe"
                                operands are ["JDoe"]
Row 3: toLowerCase("JDoe")    ← operator → result "jdoe"
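
For comparison, the same trace can be written with ordinary string methods. The function name `buildUserName` is hypothetical; each line is annotated with the pipe row it corresponds to.

```typescript
// Plain TypeScript equivalent of the user_name pipe (illustrative only).
function buildUserName(firstName: string, lastName: string): string {
  const initial = firstName.charAt(0);     // Row 1: {@string.charAt} with operands [firstName, 0]
  const joined = initial.concat(lastName); // Row 2: {@string.concat} with operands [initial, lastName]
  return joined.toLowerCase();             // Row 3: {@string.toLowerCase}
}

const userName = buildUserName('John', 'Doe');
```

The pipe form expresses the same three calls as data, so it can live in YAML and be resolved at runtime.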

Classify an employee as "Senior" or "Junior" based on age:

status:
  "@pipe":
    - ["{b.output.data.age}", 40]
    - ["{@number.gte}", "Senior", "Junior"]
    - ["{@conditional.ternary}"]

// Identical logic in JavaScript
const rules = {
  status: {
    '@pipe': [
      ['{b.output.data.age}', 40],
      ['{@number.gte}', 'Senior', 'Junior'],
      ['{@conditional.ternary}'],
    ],
  },
};

RPN trace (given age=30):

Row 0: [30, 40]                            ← two operands
Row 1: gte(30, 40)=false                   ← operator consumes both → false
       resolve "Senior", "Junior"          ← extra cells → [false, "Senior", "Junior"]
Row 2: ternary(false, "Senior", "Junior")  ← operator → "Junior"

Extract initials from a full name. Each nested @pipe runs independently (fan-out), then the first standard row after them receives all sub-pipe results as inputs (fan-in):

initials:
  "@pipe":
    - ["{a.output.data.full_name}", " "]
    - "@pipe":                             # fan-out: first initial
        - ["{@string.split}", 0]
        - ["{@array.get}", 0]
        - ["{@string.charAt}"]
    - "@pipe":                             # fan-out: last initial
        - ["{@string.split}", 1]
        - ["{@array.get}", 0]
        - ["{@string.charAt}"]
    - ["{@string.concat}"]                 # fan-in: combine both

// Identical logic in JavaScript
const rules = {
  initials: {
    '@pipe': [
      ['{a.output.data.full_name}', ' '],
      { '@pipe': [['{@string.split}', 0], ['{@array.get}', 0], ['{@string.charAt}']] },
      { '@pipe': [['{@string.split}', 1], ['{@array.get}', 0], ['{@string.charAt}']] },
      ['{@string.concat}'],
    ],
  },
};
// "Luke Birdeau" → split → ["Luke","Birdeau"]
// sub-pipe 1: get(0) → "Luke" → charAt(0) → "L"
// sub-pipe 2: get(1) → "Birdeau" → charAt(0) → "B"
// fan-in: concat("L", "B") → "LB"
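
The fan-out and fan-in shape can also be pictured in plain TypeScript: both branches start from the same Row 0 operands, run independently, and their results are combined at the end. The names `extractInitials` and `initialAt` are hypothetical, and this is a sketch of the data flow rather than of how HotMesh schedules sub-pipes.

```typescript
// Illustrative equivalent of the initials pipe; not HotMesh code.
function extractInitials(fullName: string): string {
  // Row 0 operands, shared by every sub-pipe.
  const operands: [string, string] = [fullName, ' '];

  // One branch: split, pick a word by index, take its first character.
  const initialAt = (index: number): string => {
    const parts = operands[0].split(operands[1]); // {@string.split}
    const word = parts[index] ?? '';              // {@array.get}
    return word.charAt(0);                        // {@string.charAt}
  };

  // Fan-out: two independent branches over the same operands.
  const branchResults = [initialAt(0), initialAt(1)];

  // Fan-in: the next standard row receives both results.
  return branchResults[0].concat(branchResults[1]); // {@string.concat}
}

const initials = extractInitials('Luke Birdeau');
```

A name with no second word yields only the first initial in this sketch, because the missing word falls back to an empty string.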

Transform an array of { full_name } objects into { first, last } pairs using @reduce. Context variables {$item}, {$key}, {$index}, {$input}, and {$output} are available inside the reducer body:

const jobData = {
  a: { output: { data: [
    { full_name: 'Luke Birdeau' },
    { full_name: 'John Doe' },
  ]}}
};

const rules = [
  ['{a.output.data}', []],            // input array + initial accumulator
  { '@reduce': [
    { '@pipe': [['{$output}']] },     // carry forward accumulator
    { '@pipe': [
      { '@pipe': [['first']] },
      { '@pipe': [
        ['{$item.full_name}', ' '],
        ['{@string.split}', 0],
        ['{@array.get}'],
      ]},
      { '@pipe': [['last']] },
      { '@pipe': [
        ['{$item.full_name}', ' '],
        ['{@string.split}', 1],
        ['{@array.get}'],
      ]},
      ['{@object.create}'],
    ]},
    ['{@array.push}'],
  ]},
];
// => [{ first: 'Luke', last: 'Birdeau' }, { first: 'John', last: 'Doe' }]
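
Read alongside Array.prototype.reduce, the @reduce body maps onto a familiar callback: {$output} plays the role of the accumulator and {$item} the current element. The sketch below is a plain TypeScript rendering under that reading; `splitNames` and the two interfaces are hypothetical names.

```typescript
// Plain TypeScript rendering of the @reduce example (illustrative only).
interface Person {
  full_name: string;
}

interface NameParts {
  first: string;
  last: string;
}

function splitNames(people: Person[]): NameParts[] {
  return people.reduce<NameParts[]>((output, item) => {
    // Sub-pipes: split the full name and pick index 0 and index 1.
    const [first = '', last = ''] = item.full_name.split(' ');
    // {@object.create} builds the pair; {@array.push} appends it to the accumulator.
    output.push({ first, last });
    return output;
  }, []); // the initial accumulator, like the [] on the first row
}

const nameParts = splitNames([
  { full_name: 'Luke Birdeau' },
  { full_name: 'John Doe' },
]);
```

The result matches the output comment above: one { first, last } object per input item, in order.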

Pipes power conditional transitions between workflow activities. In YAML, a transition fires only when the @pipe resolves to the expected value:

transitions:
  t1:
    - to: a1
      conditions:
        match:
          - expected: false
            actual:
              "@pipe":
                - ["{t1.output.data.a}", "goodbye"]
                - ["{@conditional.equality}"]
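
Because `expected` is false here, the transition to a1 fires only when the trigger's `a` value is not "goodbye". The sketch below spells that out. It rests on two assumptions: that {@conditional.equality} behaves like loose equality, and that a match succeeds when the resolved value is identical to `expected`. The names `looseEquality` and `transitionFires` are hypothetical.

```typescript
// Hypothetical model of a single match condition; not the HotMesh router.
interface TransitionMatch {
  expected: unknown; // the value the transition waits for
  actual: unknown;   // the already-resolved @pipe result
}

// Stand-in for {@conditional.equality}; loose equality is an assumption.
const looseEquality = (left: unknown, right: unknown): boolean => left == right;

// Assumed rule: the condition passes when actual is identical to expected.
function transitionFires(match: TransitionMatch): boolean {
  return match.actual === match.expected;
}

// a = "hello": equality("hello", "goodbye") is false, which matches expected.
const firesForHello = transitionFires({
  expected: false,
  actual: looseEquality('hello', 'goodbye'),
});

// a = "goodbye": equality is true, which does not match expected.
const firesForGoodbye = transitionFires({
  expected: false,
  actual: looseEquality('goodbye', 'goodbye'),
});
```

Under these assumptions `firesForHello` is true and `firesForGoodbye` is false.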

HotMesh ships with an inline YAML parser, so a complete workflow with data mapping can be deployed in a single call:

await hotMesh.deploy(`
app:
  id: myapp
  version: '1'
  graphs:
    - subscribes: order.process
      activities:
        t1:
          type: trigger
        a1:
          type: worker
          topic: inventory.check
          input:
            maps:
              itemId: "{t1.output.data.itemId}"
          output:
            schema:
              type: object
          job:
            maps:
              result: "{$self.output.data.available}"
      transitions:
        t1:
          - to: a1
`);

await hotMesh.activate('1');
const response = await hotMesh.pubsub('order.process', { itemId: 'sku-42' });

Constructors

  • Parameters

    • rules: Pipe

      The ordered row array defining the pipeline.

    • jobData: JobData

      The current job data used to resolve {data.*} and {activity.*} references.

    • Optional context: PipeContext

      Optional iteration context ($item, $key, $output, etc.) supplied during @reduce execution.

    Returns Pipe

Properties

context: PipeContext
jobData: JobData
rules: Pipe

Methods

  • Executes the pipeline row-by-row, resolving and transforming until the final value is produced.

    Parameters

    • resolved: unknown[] = null

      Optional pre-resolved seed values (used internally by reduce to inject the accumulator).

    Returns any

    The first cell of the final resolved row.

    Row 0 is resolved independently (values only). Each subsequent row feeds the prior row's resolved output as arguments to the function named in cell 0. Nested @pipe rows are queued (fan-out) and collected into the next standard row (fan-in). @reduce rows iterate over the prior row's array output.

    // Split a full name and grab the first element
    const pipe = new Pipe(
    [['{a.output.data.full_name}', ' '], ['{@string.split}', 0], ['{@array.get}']],
    { a: { output: { data: { full_name: 'Luke Birdeau' } } } },
    );
    pipe.process(); // => 'Luke'
  • Resolves every cell in a single row independently — each cell is evaluated for function calls, context variables, or mappable references.

    Parameters

    Returns unknown[]

    An array of resolved values, one per input cell.

  • Resolves a single cell value by detecting its type — function call ({@domain.fn}), context variable ({$item}, {$key}, etc.), mappable reference ({data.*}), or literal.

    Parameters

    • currentCell: PipeItem

      The cell to resolve.

    Returns unknown

    The resolved runtime value.

  • Resolves a {data.*} or {activity.*} reference by walking the dot-delimited path into jobData.

    Parameters

    • currentCell: string

      The mappable reference string (e.g. {data.user.name}).

    Returns unknown

  • Returns true if the value is a @pipe object (i.e. { '@pipe': [...] }).

    Parameters

    • obj: PipeItem | {
          [key: string]: unknown;
      }

      The value to test.

    Returns boolean

  • One-shot convenience method that resolves a single value or @pipe expression against the given context.

    Parameters

    • unresolved: PipeItem | {
          [key: string]: unknown;
      }

      A literal value, a {data.*} reference string, or a @pipe object.

    • context: Partial<JobState>

      Partial JobState used for resolution.

    Returns any

    The fully resolved value.

    Pipe.resolve('{data.user.email}', jobState);
    Pipe.resolve({ '@pipe': [['{data.a}', '{data.b}'], ['{@math.multiply}']] }, jobState);
  • Looks up a domain function by its {@domain.method} name string and returns the callable.

    Parameters

    • functionName: string

      A string like {@math.add} or {@string.concat}.

    Returns any

    The resolved function reference.

    Throws if the domain or method is not registered.
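
A lookup of this kind can be sketched as a two-level table keyed by domain and method. Everything below is hypothetical: `lookupFunction`, `sketchDomains`, and the error messages are illustrative stand-ins, and the real registry and its error behavior may differ.

```typescript
// Hypothetical registry lookup; not HotMesh's function table.
type DomainFn = (...args: any[]) => unknown;

const sketchDomains: Record<string, Record<string, DomainFn>> = {
  math: { add: (a: number, b: number) => a + b },
  string: { concat: (...parts: string[]) => parts.join('') },
};

function lookupFunction(functionName: string): DomainFn {
  // Expect the documented {@domain.method} shape.
  const match = /^\{@(\w+)\.(\w+)\}$/.exec(functionName);
  if (!match) {
    throw new Error(`Not a function reference: ${functionName}`);
  }
  const [, domain, method] = match;
  const fn = sketchDomains[domain]?.[method];
  if (typeof fn !== 'function') {
    throw new Error(`Unregistered function: ${functionName}`);
  }
  return fn;
}

const addFn = lookupFunction('{@math.add}');
const total = addFn(2, 3);
```

Here `total` is 5, and a name such as {@math.nope} throws rather than returning undefined, so a misspelled function fails at lookup instead of partway through a pipe.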