Database maintenance operations for HotMesh's Postgres backend.

HotMesh uses soft-delete patterns: expired jobs and stream messages retain their rows with expired_at set but are never physically removed during normal operation. Over time, three tables accumulate dead rows:

Table What accumulates
{appId}.jobs Completed/expired jobs with expired_at set
{appId}.jobs_attributes Execution artifacts (adata, hmark, status, other) that are only needed during workflow execution
{appId}.engine_streams Processed engine stream messages with expired_at set
{appId}.worker_streams Processed worker stream messages with expired_at set

The DBA service addresses this with two methods:

  • prune() — Targets any combination of jobs, streams, and attributes independently. Each table can be pruned on its own schedule with its own retention window.
  • deploy() — Pre-deploys the Postgres function (e.g., during CI/CD migrations) without running a prune.

Stripping removes adata, hmark, status, and other attributes from completed jobs while preserving:

  • jdata — workflow return data
  • udata — user-searchable data
  • jmark — timeline markers needed for workflow execution export

Set keepHmark: true to also preserve hmark (activity state markers).

Use entities to restrict pruning/stripping to specific entity types (e.g., ['book', 'author']). Use pruneTransient to delete expired jobs with no entity (entity IS NULL).

After stripping, jobs are marked with pruned_at = NOW(). Subsequent prune calls skip already-pruned jobs, making the operation idempotent and efficient for repeated scheduling.

import { Client as Postgres } from 'pg';
import { DBA } from '@hotmeshio/hotmesh';

const connection = {
class: Postgres,
options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' },
};

// Cron 1 — Nightly: strip execution artifacts from completed jobs
await DBA.prune({
appId: 'myapp', connection,
jobs: false, streams: false, attributes: true,
});

// Cron 2 — Hourly: aggressive engine stream cleanup, conservative worker streams
await DBA.prune({
appId: 'myapp', connection,
jobs: false, attributes: false,
engineStreams: true,
engineStreamsExpire: '24 hours',
workerStreams: true,
workerStreamsExpire: '90 days', // preserve for export fidelity
});

// Cron 3 — Weekly: remove expired 'book' jobs older than 30 days
await DBA.prune({
appId: 'myapp', connection,
expire: '30 days',
jobs: true, streams: false,
entities: ['book'],
});

// Cron 4 — Weekly: remove transient (no entity) expired jobs
await DBA.prune({
appId: 'myapp', connection,
expire: '7 days',
jobs: false, streams: false,
pruneTransient: true,
});

The underlying Postgres function can be called directly, without the TypeScript SDK. The first 4 parameters are backwards-compatible:

-- Strip attributes only (keep all jobs and streams)
SELECT * FROM myapp.prune('0 seconds', false, false, true);

-- Prune only 'book' entity jobs older than 30 days
SELECT * FROM myapp.prune('30 days', true, false, false, ARRAY['book']);

-- Prune everything older than 7 days and strip attributes
SELECT * FROM myapp.prune('7 days', true, true, true);

-- Independent stream retention: engine 24h, worker 90 days
SELECT * FROM myapp.prune(
'7 days', true, false, false, NULL, false, false,
true, true, -- prune_engine_streams, prune_worker_streams
INTERVAL '24 hours', INTERVAL '90 days' -- per-table retention
);

Methods

Methods

  • Deploys the prune() Postgres function into the target schema. Also runs schema migrations (e.g., adding pruned_at column). Idempotent — uses CREATE OR REPLACE and IF NOT EXISTS.

    The function is automatically deployed when DBA.prune is called, but this method is exposed for explicit control (e.g., CI/CD migration scripts that provision database objects before the application starts).

    Parameters

    • connection: ProviderConfig

      Postgres provider configuration

    • appId: string

      Application identifier (schema name)

    Returns Promise<void>

    import { Client as Postgres } from 'pg';
    import { DBA } from '@hotmeshio/hotmesh';

    // Pre-deploy during CI/CD migration
    await DBA.deploy(
    {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
    },
    'myapp',
    );
  • Prunes expired data and/or strips execution artifacts from completed jobs. Each operation is independently controlled, so callers can target a single table per cron schedule.

    Operations (each enabled individually):

    1. jobs — Hard-deletes expired jobs older than the retention window (FK CASCADE removes their attributes automatically). Scoped by entities when set.
    2. streams — Hard-deletes expired stream messages older than the retention window
    3. attributes — Strips non-essential attributes (adata, hmark, status, other) from completed, un-pruned jobs. Preserves jdata, udata, and jmark. Marks stripped jobs with pruned_at for idempotency.
    4. pruneTransient — Deletes expired jobs with entity IS NULL

    Parameters

    Returns Promise<PruneResult>

    Counts of deleted/stripped rows

    import { Client as Postgres } from 'pg';
    import { DBA } from '@hotmeshio/hotmesh';

    // Strip attributes from 'book' entities only
    await DBA.prune({
    appId: 'myapp',
    connection: {
    class: Postgres,
    options: { connectionString: 'postgresql://usr:pwd@localhost:5432/db' }
    },
    jobs: false,
    streams: false,
    attributes: true,
    entities: ['book'],
    });