The MemFlow service provides a Temporal-compatible workflow framework backed by Postgres. It offers durable execution, entity-based memory management, and composable workflows.

Each workflow has a durable JSONB entity that serves as its memory:

export async function researchAgent(query: string) {
  const agent = await MemFlow.workflow.entity();

  // Initialize entity state
  await agent.set({
    query,
    findings: [],
    status: 'researching'
  });

  // Update state atomically
  await agent.merge({ status: 'analyzing' });
  await agent.append('findings', newFinding);
}

Spawn hooks to run multiple perspectives or phases in parallel, then coordinate their completion:

// Launch parallel research perspectives
await MemFlow.workflow.execHook({
  taskQueue: 'research',
  workflowName: 'optimisticView',
  args: [query],
  signalId: 'optimistic-complete'
});

await MemFlow.workflow.execHook({
  taskQueue: 'research',
  workflowName: 'skepticalView',
  args: [query],
  signalId: 'skeptical-complete'
});

// Wait for both perspectives
await Promise.all([
  MemFlow.workflow.waitFor('optimistic-complete'),
  MemFlow.workflow.waitFor('skeptical-complete')
]);

Define and execute durable activities with automatic retry:

// Default: activities use the workflow's task queue
const activities = MemFlow.workflow.proxyActivities<{
  analyzeDocument: typeof analyzeDocument;
  validateFindings: typeof validateFindings;
}>({
  activities: { analyzeDocument, validateFindings },
  retryPolicy: {
    maximumAttempts: 3,
    backoffCoefficient: 2
  }
});

// Activities are durable and automatically retried
const analysis = await activities.analyzeDocument(data);
const validation = await activities.validateFindings(analysis);

Register activity workers explicitly before workflows start:

// Register a shared activity pool for interceptors
await MemFlow.registerActivityWorker({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  },
  taskQueue: 'shared-activities'
}, sharedActivities, 'shared-activities');

// Register a custom activity pool for specific use cases
await MemFlow.registerActivityWorker({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  },
  taskQueue: 'priority-activities'
}, priorityActivities, 'priority-activities');

Build complex workflows through composition:

// Start a child workflow and await its result
const childResult = await MemFlow.workflow.execChild({
  taskQueue: 'analysis',
  workflowName: 'detailedAnalysis',
  args: [data],
  // Child workflow config
  config: {
    maximumAttempts: 5,
    backoffCoefficient: 2
  }
});

// Fire-and-forget child workflow
await MemFlow.workflow.startChild({
  taskQueue: 'notifications',
  workflowName: 'sendUpdates',
  args: [updates]
});

Add cross-cutting concerns through interceptors that run as durable functions:

// First register a shared activity worker for interceptors
await MemFlow.registerActivityWorker({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  },
  taskQueue: 'interceptor-activities'
}, { auditLog }, 'interceptor-activities');

// Add an audit interceptor whose activities use an explicit taskQueue
MemFlow.registerInterceptor({
  async execute(ctx, next) {
    try {
      // Interceptors use an explicit taskQueue to prevent per-workflow queues
      const { auditLog } = MemFlow.workflow.proxyActivities<{
        auditLog: (id: string, action: string) => Promise<void>;
      }>({
        taskQueue: 'interceptor-activities', // Explicit shared queue
        retryPolicy: { maximumAttempts: 3 }
      });

      await auditLog(ctx.get('workflowId'), 'started');

      const result = await next();

      await auditLog(ctx.get('workflowId'), 'completed');

      return result;
    } catch (err) {
      // CRITICAL: always rethrow HotMesh interruptions unmodified
      if (MemFlow.didInterrupt(err)) {
        throw err; // Rethrow for the replay system
      }
      // A genuine failure: handle as needed, then rethrow
      throw err;
    }
  }
});
A complete setup imports the services, registers workers, starts a workflow, and awaits its result:

import { Client, Worker, MemFlow } from '@hotmeshio/hotmesh';
import { Client as Postgres } from 'pg';
import * as activities from './activities';
import * as workflows from './workflows';

// (Optional) Register shared activity workers for interceptors
await MemFlow.registerActivityWorker({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  },
  taskQueue: 'shared-activities'
}, activities, 'shared-activities');

// Initialize worker
await Worker.create({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  },
  taskQueue: 'default',
  workflow: workflows.example
});

// Initialize client
const client = new Client({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  }
});

// Start workflow
const handle = await client.workflow.start({
  args: ['input data'],
  taskQueue: 'default',
  workflowName: 'example',
  workflowId: MemFlow.guid()
});

// Get result
const result = await handle.result();

// Cleanup
await MemFlow.shutdown();

Properties

Client: typeof ClientService = ClientService

The MemFlow Client service is functionally equivalent to the Temporal Client service.

Connection: typeof ConnectionService = ConnectionService

The MemFlow Connection service is functionally equivalent to the Temporal Connection service.

Handle: typeof WorkflowHandleService = WorkflowHandleService

The Handle provides methods to interact with a running workflow. This includes exporting the workflow, sending signals, and querying the state of the workflow. An instance of the Handle service is typically accessed via the MemFlow.Client class (workflow.getHandle).

Worker: typeof WorkerService = WorkerService

The MemFlow Worker service is functionally equivalent to the Temporal Worker service.

didInterrupt: ((error: Error) => boolean) = didInterrupt

Checks if an error is a HotMesh reserved error type that indicates a workflow interruption rather than a true error condition.

Type declaration

    • (error): boolean
    • Checks if an error is a HotMesh reserved error type that indicates a HotMesh interruption rather than a true error condition.

      When this returns true, you can safely rethrow the error. The workflow engine will handle the interruption automatically.

      Parameters

      • error: Error

        The error to check

      Returns boolean

      true if the error is a HotMesh interruption

      import { MemFlow } from '@hotmeshio/hotmesh';

      try {
        await someWorkflowOperation();
      } catch (error) {
        // Check if this is a HotMesh interruption
        if (MemFlow.workflow.didInterrupt(error)) {
          // Rethrow the error for the replay system
          throw error;
        }
        // Handle actual error
        console.error('Workflow failed:', error);
      }

See utils/interruption.didInterrupt for detailed documentation.

guid: ((size?: number) => string) = guid

Generate a unique identifier for workflow IDs

registerActivityWorker: ((config: Partial<WorkerConfig>, activities: any, activityTaskQueue?: string) => Promise<HotMesh>) = WorkerService.registerActivityWorker

Register activity workers for a task queue. Activities execute via message queue and can run on different servers from workflows.

Type declaration

    • (config, activities, activityTaskQueue?): Promise<HotMesh>
    • Register activity workers for a task queue. Activities are invoked via message queue, so they can run on different servers from workflows.

      The task queue name gets -activity appended automatically for the worker topic. For example, taskQueue: 'payment' creates a worker listening on payment-activity.

      Parameters

      • config: Partial<WorkerConfig>

        Worker configuration (connection, namespace, taskQueue)

      • activities: any

        Activity functions to register

      • Optional activityTaskQueue: string

        Task queue name (without -activity suffix). Defaults to config.taskQueue if not provided.

      Returns Promise<HotMesh>

      Promise resolving to the initialized activity worker

      // Activity worker (can be on a separate server)
      import { MemFlow } from '@hotmeshio/hotmesh';
      import { Client as Postgres } from 'pg';

      const activities = {
        async processPayment(amount: number): Promise<string> {
          return `Processed $${amount}`;
        },
        async sendEmail(to: string, subject: string): Promise<void> {
          // Send email
        }
      };

      await MemFlow.registerActivityWorker({
        connection: {
          class: Postgres,
          options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
        },
        taskQueue: 'payment' // Listens on 'payment-activity'
      }, activities, 'payment');

      // Workflow worker (can be on a different server)
      async function orderWorkflow(orderId: string, amount: number) {
        const { processPayment, sendEmail } = MemFlow.workflow.proxyActivities<{
          processPayment: (amount: number) => Promise<string>;
          sendEmail: (to: string, subject: string) => Promise<void>;
        }>({
          taskQueue: 'payment',
          retryPolicy: { maximumAttempts: 3 }
        });

        const result = await processPayment(amount);
        await sendEmail('customer@example.com', 'Order confirmed');
        return result;
      }

      await MemFlow.Worker.create({
        connection: {
          class: Postgres,
          options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
        },
        taskQueue: 'orders',
        workflow: orderWorkflow
      });

      // Shared activity pool for interceptors
      await MemFlow.registerActivityWorker({
        connection: {
          class: Postgres,
          options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
        },
        taskQueue: 'shared'
      }, { auditLog, collectMetrics }, 'shared');

      const interceptor: WorkflowInterceptor = {
        async execute(ctx, next) {
          const { auditLog } = MemFlow.workflow.proxyActivities<{
            auditLog: (id: string, action: string) => Promise<void>;
          }>({
            taskQueue: 'shared',
            retryPolicy: { maximumAttempts: 3 }
          });
          await auditLog(ctx.get('workflowId'), 'started');
          return next();
        }
      };
// Activity worker
const activities = {
  async processPayment(amount: number) { return `Processed $${amount}`; },
  async sendEmail(to: string, msg: string) { /* ... */ }
};

await MemFlow.registerActivityWorker({
  connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
  },
  taskQueue: 'payment'
}, activities, 'payment');

// Workflow worker (can be on a different server)
async function orderWorkflow(amount: number) {
  const { processPayment, sendEmail } = MemFlow.workflow.proxyActivities<{
    processPayment: (amount: number) => Promise<string>;
    sendEmail: (to: string, msg: string) => Promise<void>;
  }>({
    taskQueue: 'payment',
    retryPolicy: { maximumAttempts: 3 }
  });

  const result = await processPayment(amount);
  await sendEmail('customer@example.com', result);
  return result;
}

await MemFlow.Worker.create({
  connection: { class: Postgres, options: { connectionString: '...' } },
  taskQueue: 'orders',
  workflow: orderWorkflow
});

workflow: typeof WorkflowService = WorkflowService

The MemFlow workflow service is functionally equivalent to the Temporal Workflow service, with additional methods for managing workflows, including execChild, waitFor, and sleep.

Methods

shutdown: (() => Promise<void>)

  • Shut down everything. All connections, workers, and clients will be closed. Call this from your process signal handlers to ensure a clean shutdown.

    Returns Promise<void>