Prune Old Data

As your workflows accumulate, pgflow tables gather historical data. While valuable for auditing, keeping this data indefinitely can impact performance and storage costs.

pgflow includes a pruning function that accepts an INTERVAL parameter specifying how much data to keep:

pgflow.prune_data_older_than(retention_interval INTERVAL)

Examples:

-- Keep 30 days of data using make_interval
SELECT pgflow.prune_data_older_than(make_interval(days => 30));
-- Keep 7 days of data
SELECT pgflow.prune_data_older_than(make_interval(days => 7));
-- Keep 3 months of data using interval literals
SELECT pgflow.prune_data_older_than(INTERVAL '3 months');
-- Keep 2 weeks
SELECT pgflow.prune_data_older_than(INTERVAL '2 weeks');
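
Before pruning for the first time, it can help to preview how many rows are past the cutoff. A read-only sketch, assuming the same completed_at/failed_at columns the pruning function below relies on:

```sql
-- Preview: count runs that a 30-day retention would delete (nothing is removed)
SELECT count(*) AS prunable_runs
FROM pgflow.runs
WHERE (completed_at IS NOT NULL AND completed_at < now() - make_interval(days => 30))
   OR (failed_at   IS NOT NULL AND failed_at   < now() - make_interval(days => 30));
```

Adjust the interval to match the retention you plan to pass to pgflow.prune_data_older_than.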

You can use pg_cron to schedule automated pruning.

First, create a new migration, paste in the pruning function shown below, and run supabase migrations up.

Then schedule the job, either in Supabase Studio or in another migration file:

-- Schedule weekly pruning (every Sunday at 2 AM)
-- This keeps 28 days of data (adjust as needed)
SELECT cron.schedule(
  'pgflow-prune-weekly',
  '0 2 * * 0',  -- cron expression: minute hour day month weekday
  $$SELECT pgflow.prune_data_older_than(make_interval(days => 28))$$
);

-- Verify the job was registered
SELECT * FROM cron.job;
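
If you later want to change the retention or cadence, a sketch using pg_cron's cron.unschedule, which accepts the job name given to cron.schedule:

```sql
-- Remove the existing job, then re-run cron.schedule with new settings
SELECT cron.unschedule('pgflow-prune-weekly');
```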

Add this SQL function to a migration file:

/**
* Prunes old records from pgflow tables and PGMQ archive tables.
*
* @param retention_interval - Interval of recent records to keep (e.g., interval '28 days', interval '3 months')
*/
create or replace function pgflow.prune_data_older_than(
  retention_interval INTERVAL
) returns void language plpgsql as $$
DECLARE
  cutoff_timestamp TIMESTAMPTZ := now() - retention_interval;
  flow_record RECORD;
  archive_table TEXT;
  dynamic_sql TEXT;
BEGIN
  -- Delete old worker records
  DELETE FROM pgflow.workers
  WHERE last_heartbeat_at < cutoff_timestamp;

  -- Delete old step_tasks records
  DELETE FROM pgflow.step_tasks
  WHERE (
    (completed_at IS NOT NULL AND completed_at < cutoff_timestamp) OR
    (failed_at IS NOT NULL AND failed_at < cutoff_timestamp)
  );

  -- Delete old step_states records
  DELETE FROM pgflow.step_states
  WHERE (
    (completed_at IS NOT NULL AND completed_at < cutoff_timestamp) OR
    (failed_at IS NOT NULL AND failed_at < cutoff_timestamp)
  );

  -- Delete old runs records
  DELETE FROM pgflow.runs
  WHERE (
    (completed_at IS NOT NULL AND completed_at < cutoff_timestamp) OR
    (failed_at IS NOT NULL AND failed_at < cutoff_timestamp)
  );

  -- Prune archived messages from PGMQ archive tables (pgmq.a_{flow_slug})
  -- For each flow, delete old archived messages
  FOR flow_record IN SELECT DISTINCT flow_slug FROM pgflow.flows
  LOOP
    -- Build the archive table name
    archive_table := pgmq.format_table_name(flow_record.flow_slug, 'a');

    -- Check if the archive table exists
    IF EXISTS (
      SELECT 1 FROM information_schema.tables
      WHERE table_schema = 'pgmq' AND table_name = archive_table
    ) THEN
      -- Build and execute a dynamic SQL statement to delete old archive records
      dynamic_sql := format('
        DELETE FROM pgmq.%I
        WHERE archived_at < $1
      ', archive_table);
      EXECUTE dynamic_sql USING cutoff_timestamp;
    END IF;
  END LOOP;
END
$$;
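
For a single flow, the dynamic archive-pruning loop above boils down to a plain DELETE. A sketch for a hypothetical flow_slug of my_flow, whose PGMQ archive table would be pgmq.a_my_flow:

```sql
-- Manual equivalent for one flow (my_flow is a hypothetical slug)
DELETE FROM pgmq.a_my_flow
WHERE archived_at < now() - make_interval(days => 28);
```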