The MemFlow service provides a Temporal-compatible workflow framework backed by Postgres. It offers durable execution, entity-based memory management, and composable workflows.

Each workflow has a durable JSONB entity that serves as its memory:

export async function researchAgent(query: string) {
  const agent = await MemFlow.workflow.entity();

  // Initialize entity state
  await agent.set({
    query,
    findings: [],
    status: 'researching'
  });

  // Update state atomically
  await agent.merge({ status: 'analyzing' });

  // Placeholder finding; in practice this would come from an activity or hook
  const newFinding = { summary: 'placeholder finding' };
  await agent.append('findings', newFinding);
}
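
To make the set, merge, and append calls above easier to reason about, the following sketch models them on a plain in-memory object. It is not HotMesh code: `InMemoryEntity`, `cloneJson`, and the demo values are hypothetical names, nothing here is durable, and treating `merge` as a shallow top-level merge is an assumption rather than documented behavior.

```typescript
// Hypothetical in-memory stand-in for the entity calls shown above.
// NOT HotMesh code: no durability, no Postgres, and merge() is ASSUMED
// to be a shallow, top-level merge.
type Json = string | number | boolean | null | Json[] | { [key: string]: Json };
type JsonObject = { [key: string]: Json };

// Deep-copy via a JSON round trip so callers never share references with the store.
function cloneJson<T extends Json>(value: T): T {
  return JSON.parse(JSON.stringify(value)) as T;
}

class InMemoryEntity {
  private state: JsonObject = {};

  // Replace the entire document.
  set(value: JsonObject): JsonObject {
    this.state = cloneJson(value);
    return this.get();
  }

  // Overwrite only the top-level keys present in the patch (assumed semantics).
  merge(patch: JsonObject): JsonObject {
    this.state = { ...this.state, ...cloneJson(patch) };
    return this.get();
  }

  // Push one item onto an array field, creating the array if the key is absent.
  append(key: string, item: Json): JsonObject {
    const current = key in this.state ? this.state[key] : [];
    if (!Array.isArray(current)) {
      throw new TypeError(`Field "${key}" is not an array`);
    }
    this.state[key] = [...current, cloneJson(item)];
    return this.get();
  }

  // Return a snapshot of the current document.
  get(): JsonObject {
    return cloneJson(this.state);
  }
}

const demoEntity = new InMemoryEntity();
demoEntity.set({ query: 'example query', findings: [], status: 'researching' });
demoEntity.merge({ status: 'analyzing' });
const demoState = demoEntity.append('findings', { note: 'first finding' });
// demoState.status is 'analyzing', demoState.findings holds one item,
// and demoState.query is unchanged.
```

The point of the sketch is the difference between the three operations: `set` replaces the whole document, `merge` touches only the keys it is given, and `append` grows a single array field.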

Spawn and coordinate multiple perspectives/phases:

// Launch parallel research perspectives
await MemFlow.workflow.execHook({
  taskQueue: 'research',
  workflowName: 'optimisticView',
  args: [query],
  signalId: 'optimistic-complete'
});

await MemFlow.workflow.execHook({
  taskQueue: 'research',
  workflowName: 'skepticalView',
  args: [query],
  signalId: 'skeptical-complete'
});

// Wait for both perspectives
await Promise.all([
  MemFlow.workflow.waitFor('optimistic-complete'),
  MemFlow.workflow.waitFor('skeptical-complete')
]);

Define and execute durable activities with automatic retry:

const activities = MemFlow.workflow.proxyActivities<{
  analyzeDocument: typeof analyzeDocument;
  validateFindings: typeof validateFindings;
}>({
  activities: { analyzeDocument, validateFindings },
  retryPolicy: {
    maximumAttempts: 3,
    backoffCoefficient: 2
  }
});

// Activities are durable and automatically retried
const analysis = await activities.analyzeDocument(data);
const validation = await activities.validateFindings(analysis);
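
As a rough illustration of what a retry policy with `maximumAttempts` and `backoffCoefficient` implies, the sketch below computes the waits between attempts under plain exponential backoff. The formula, the initial interval of 1000 ms, and the names `RetryPolicySketch` and `retryDelays` are assumptions made for this example; the schedule HotMesh actually uses may differ.

```typescript
// Illustrative only: the formula and the initial interval are ASSUMPTIONS,
// not HotMesh's documented scheduling behavior.
interface RetryPolicySketch {
  maximumAttempts: number;    // total attempts, assumed to include the first try
  backoffCoefficient: number; // multiplier applied to the wait after each failure
}

// Returns the wait (in ms) before each retry under plain exponential backoff:
// delay(n) = initialIntervalMs * backoffCoefficient^(n - 1) for retry n = 1, 2, ...
function retryDelays(policy: RetryPolicySketch, initialIntervalMs: number): number[] {
  const delays: number[] = [];
  const retries = Math.max(0, policy.maximumAttempts - 1);
  for (let n = 1; n <= retries; n++) {
    delays.push(initialIntervalMs * Math.pow(policy.backoffCoefficient, n - 1));
  }
  return delays;
}

// The policy from the example above, with a hypothetical 1000 ms initial interval.
const activityDelays = retryDelays({ maximumAttempts: 3, backoffCoefficient: 2 }, 1000);
// activityDelays is [1000, 2000]: two retries follow the initial attempt.
```

Under these assumptions, raising `maximumAttempts` adds one further doubling per extra attempt, so the total wait grows quickly with a coefficient of 2.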

Build complex workflows through composition:

// Start a child workflow
const childResult = await MemFlow.workflow.execChild({
  taskQueue: 'analysis',
  workflowName: 'detailedAnalysis',
  args: [data],
  // Child workflow config
  config: {
    maximumAttempts: 5,
    backoffCoefficient: 2
  }
});

// Fire-and-forget child workflow
await MemFlow.workflow.startChild({
  taskQueue: 'notifications',
  workflowName: 'sendUpdates',
  args: [updates]
});

Add cross-cutting concerns through interceptors that run as durable functions:

// Add audit interceptor that uses MemFlow functions
MemFlow.registerInterceptor({
  async execute(ctx, next) {
    try {
      // Interceptors can use MemFlow functions and participate in replay
      const entity = await MemFlow.workflow.entity();
      await entity.append('auditLog', {
        action: 'started',
        timestamp: new Date().toISOString()
      });

      // Rate limiting with durable sleep
      await MemFlow.workflow.sleepFor('100 milliseconds');

      const result = await next();

      await entity.append('auditLog', {
        action: 'completed',
        timestamp: new Date().toISOString()
      });

      return result;
    } catch (err) {
      // CRITICAL: Always check for HotMesh interruptions
      if (MemFlow.didInterrupt(err)) {
        throw err; // Rethrow for replay system
      }
      throw err;
    }
  }
});

Basic usage example:

import { Client, Worker, MemFlow } from '@hotmeshio/hotmesh';
import { Client as Postgres } from 'pg';

// Initialize worker
await Worker.create({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  },
  taskQueue: 'default',
  workflow: workflows.example
});

// Initialize client
const client = new Client({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  }
});

// Start workflow
const handle = await client.workflow.start({
  args: ['input data'],
  taskQueue: 'default',
  workflowName: 'example',
  workflowId: MemFlow.guid()
});

// Get result
const result = await handle.result();

// Cleanup
await MemFlow.shutdown();

Properties

Client: typeof ClientService = ClientService

The MemFlow Client service is functionally equivalent to the Temporal Client service.

Connection: typeof ConnectionService = ConnectionService

The MemFlow Connection service is functionally equivalent to the Temporal Connection service.

Handle: typeof WorkflowHandleService = WorkflowHandleService

The Handle provides methods to interact with a running workflow. This includes exporting the workflow, sending signals, and querying the state of the workflow. An instance of the Handle service is typically accessed via the MemFlow.Client class (workflow.getHandle).

Worker: typeof WorkerService = WorkerService

The MemFlow Worker service is functionally equivalent to the Temporal Worker service.

didInterrupt: ((error: Error) => boolean) = didInterrupt

Checks if an error is a HotMesh reserved error type that indicates a workflow interruption rather than a true error condition.

Type declaration

    • (error): boolean
    • Checks if an error is a HotMesh reserved error type that indicates a HotMesh interruption rather than a true error condition.

      When this returns true, you can safely rethrow the error. The workflow engine will handle the interruption automatically.

      Parameters

      • error: Error

        The error to check

      Returns boolean

      true if the error is a HotMesh interruption

      import { MemFlow } from '@hotmeshio/hotmesh';

      try {
        await someWorkflowOperation();
      } catch (error) {
        // Check if this is a HotMesh interruption
        if (MemFlow.workflow.didInterrupt(error)) {
          // Rethrow the error
          throw error;
        }
        // Handle actual error
        console.error('Workflow failed:', error);
      }

See utils/interruption.didInterrupt for detailed documentation.
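
Because the same check tends to recur in every catch block, it can be factored into a small helper. The sketch below is one possible shape, not part of HotMesh: `rethrowIfInterrupted`, `InterruptionCheck`, and `fakeCheck` are hypothetical names, and the predicate is passed in as a parameter so the snippet runs without the library. In a workflow you would presumably pass the real didInterrupt function instead of the stand-in.

```typescript
// Predicate with the same shape as didInterrupt: (error: Error) => boolean.
type InterruptionCheck = (error: Error) => boolean;

// Rethrows interruptions untouched; otherwise returns the failure as an Error
// so the caller can log or handle it.
function rethrowIfInterrupted(caught: unknown, isInterruption: InterruptionCheck): Error {
  // Values thrown in JavaScript are not guaranteed to be Error instances.
  const error = caught instanceof Error ? caught : new Error(String(caught));
  if (isInterruption(error)) {
    throw error; // leave it to the workflow engine
  }
  return error;
}

// Stand-in predicate for demonstration only (hypothetical marker message).
const fakeCheck: InterruptionCheck = (error) => error.message === 'simulated-interruption';

const handledFailure = rethrowIfInterrupted(new Error('disk full'), fakeCheck);
// handledFailure.message is 'disk full'; a 'simulated-interruption' error
// would have been rethrown instead of returned.
```

Inside a catch block this reduces the pattern to one line, for example `const failure = rethrowIfInterrupted(error, MemFlow.didInterrupt);`, followed by whatever handling the genuine failure needs.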

workflow: typeof WorkflowService = WorkflowService

The MemFlow workflow service is functionally equivalent to the Temporal Workflow service, with additional methods for managing workflows, including execChild, waitFor, and sleepFor.

Methods

  • shutdown(): Promise<void>

    Shuts down everything: all connections, workers, and clients are closed. Call it from your process signal handlers to ensure a clean shutdown.

    Returns Promise<void>
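
One way to wire a shutdown function into signal handlers is sketched below. The helper `registerShutdown`, the `SignalSource` interface, and the default signal list are hypothetical and not part of HotMesh; the signal source is passed in as a parameter so the snippet stays self-contained, on the assumption that Node's `process` object satisfies the same shape.

```typescript
// Minimal shape of an object that can deliver process signals.
// ASSUMPTION: Node's `process` satisfies this interface.
interface SignalSource {
  once(signal: string, handler: () => void): unknown;
}

// Registers a one-time handler per signal and guarantees that `shutdown`
// runs at most once, even if several signals arrive.
function registerShutdown(
  source: SignalSource,
  shutdown: () => Promise<void>,
  signals: string[] = ['SIGINT', 'SIGTERM'],
  onSettled: (error?: unknown) => void = () => {},
): void {
  let started = false;
  for (const signal of signals) {
    source.once(signal, () => {
      if (started) return; // a second signal must not trigger a second shutdown
      started = true;
      // Report completion or failure to the caller, which decides how to exit.
      shutdown().then(
        () => onSettled(),
        (error: unknown) => onSettled(error),
      );
    });
  }
}
```

A call along the lines of `registerShutdown(process, () => MemFlow.shutdown(), ['SIGINT', 'SIGTERM'], () => process.exit(0))` would then close everything before the process exits. The explicit exit in `onSettled` matters because installing a handler for these signals replaces Node's default behavior of terminating the process.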