The Worker service Registers worker functions and connects them to the mesh, using the target backend provider/s (Postgres, NATS, etc).

import { MemFlow } from '@hotmeshio/hotmesh';
import { Client as Postgres } from 'pg';
import * as workflows from './workflows';

async function run() {
const worker = await MemFlow.Worker.create({
connection: {
class: Postgres,
options: { connectionString: 'postgres://user:password@localhost:5432/db' }
},
taskQueue: 'default',
workflow: workflows.example,
});

await worker.run();
}

Methods

  • Connects a worker to the mesh.

    Parameters

    Returns Promise<WorkerService>

    import { MemFlow } from '@hotmeshio/hotmesh';
    import { Client as Postgres } from 'pg';
    import * as workflows from './workflows';

    async function run() {
    const worker = await MemFlow.Worker.create({
    connection: {
    class: Postgres,
    options: {
    connectionString: 'postgres://user:password@localhost:5432/db'
    },
    },
    taskQueue: 'default',
    workflow: workflows.example,
    });

    await worker.run();
    }
  • Register activity workers for a task queue. Activities are invoked via message queue, so they can run on different servers from workflows.

    The task queue name gets -activity appended automatically for the worker topic. For example, taskQueue: 'payment' creates a worker listening on payment-activity.

    Parameters

    • config: Partial<WorkerConfig>

      Worker configuration (connection, namespace, taskQueue)

    • activities: any

      Activity functions to register

    • OptionalactivityTaskQueue: string

      Task queue name (without -activity suffix). Defaults to config.taskQueue if not provided.

    Returns Promise<HotMesh>

    Promise The initialized activity worker

    // Activity worker (can be on separate server)
    import { MemFlow } from '@hotmeshio/hotmesh';
    import { Client as Postgres } from 'pg';

    const activities = {
    async processPayment(amount: number): Promise<string> {
    return `Processed $${amount}`;
    },
    async sendEmail(to: string, subject: string): Promise<void> {
    // Send email
    }
    };

    await MemFlow.registerActivityWorker({
    connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
    },
    taskQueue: 'payment' // Listens on 'payment-activity'
    }, activities, 'payment');
    // Workflow worker (can be on different server)
    async function orderWorkflow(orderId: string, amount: number) {
    const { processPayment, sendEmail } = MemFlow.workflow.proxyActivities<{
    processPayment: (amount: number) => Promise<string>;
    sendEmail: (to: string, subject: string) => Promise<void>;
    }>({
    taskQueue: 'payment',
    retryPolicy: { maximumAttempts: 3 }
    });

    const result = await processPayment(amount);
    await sendEmail('customer@example.com', 'Order confirmed');
    return result;
    }

    await MemFlow.Worker.create({
    connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
    },
    taskQueue: 'orders',
    workflow: orderWorkflow
    });
    // Shared activity pool for interceptors
    await MemFlow.registerActivityWorker({
    connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
    },
    taskQueue: 'shared'
    }, { auditLog, collectMetrics }, 'shared');

    const interceptor: WorkflowInterceptor = {
    async execute(ctx, next) {
    const { auditLog } = MemFlow.workflow.proxyActivities<{
    auditLog: (id: string, action: string) => Promise<void>;
    }>({
    taskQueue: 'shared',
    retryPolicy: { maximumAttempts: 3 }
    });
    await auditLog(ctx.get('workflowId'), 'started');
    return next();
    }
    };