Prune Old Data
As your workflow runs accumulate, pgflow tables gather historical data. While this history is valuable for auditing, keeping it indefinitely can degrade performance and increase storage costs.
Using the Pruning Function
pgflow includes a pruning function that accepts an INTERVAL parameter specifying how much data to keep:
pgflow.prune_data_older_than(retention_interval INTERVAL)
Examples:
-- Keep 30 days of data using make_interval
SELECT pgflow.prune_data_older_than(make_interval(days => 30));

-- Keep 7 days of data
SELECT pgflow.prune_data_older_than(make_interval(days => 7));

-- Keep 3 months of data using an interval literal
SELECT pgflow.prune_data_older_than(INTERVAL '3 months');

-- Keep 2 weeks
SELECT pgflow.prune_data_older_than(INTERVAL '2 weeks');
Running Periodically
You can use pg_cron to schedule automated pruning.
1. Install the pruning function
Create a new migration (for example with supabase migration new add_pgflow_prune, where the name is your choice), paste in the pruning function (see The Pruning Function below), then run supabase migration up.
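To confirm the function was installed, one quick check (plain Postgres, not pgflow-specific) is to cast its signature to regprocedure, which raises an error if no matching function exists:

-- Errors with "function ... does not exist" if the migration didn't apply
SELECT 'pgflow.prune_data_older_than(interval)'::regprocedure;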
2. Set up a pg_cron schedule
Run the following in Supabase Studio or include it in a migration file:
-- Schedule weekly pruning (every Sunday at 2 AM)
-- This keeps 28 days of data (adjust as needed)
SELECT cron.schedule(
  'pgflow-prune-weekly',
  '0 2 * * 0', -- cron expression: minute hour day month weekday
  $$SELECT pgflow.prune_data_older_than(make_interval(days => 28))$$
);
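If you later need to change the cadence or retention window, or stop pruning entirely, pg_cron supports unscheduling by job name; re-running cron.schedule with the same name should replace the existing job rather than create a duplicate (verify this behavior on your pg_cron version):

-- Stop automated pruning
SELECT cron.unschedule('pgflow-prune-weekly');

-- Or update the job in place, e.g. keep only 14 days of data
SELECT cron.schedule(
  'pgflow-prune-weekly',
  '0 2 * * 0',
  $$SELECT pgflow.prune_data_older_than(make_interval(days => 14))$$
);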
Verify the scheduled job
SELECT * FROM cron.job;
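pg_cron (1.4 and later) also records execution history in cron.job_run_details, which is handy for confirming that the pruning job actually ran and succeeded:

-- Inspect the most recent job executions
SELECT jobid, status, return_message, start_time, end_time
FROM cron.job_run_details
ORDER BY start_time DESC
LIMIT 10;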
The Pruning Function
Add this SQL function to a migration file:
/**
 * Prunes old records from pgflow tables and PGMQ archive tables.
 *
 * @param retention_interval - Interval of recent records to keep (e.g., interval '28 days', interval '3 months')
 */
create or replace function pgflow.prune_data_older_than(
  retention_interval INTERVAL
)
returns void
language plpgsql
as $$
DECLARE
  cutoff_timestamp TIMESTAMPTZ := now() - retention_interval;
  flow_record RECORD;
  archive_table TEXT;
  dynamic_sql TEXT;
BEGIN
  -- Delete old worker records
  DELETE FROM pgflow.workers
  WHERE last_heartbeat_at < cutoff_timestamp;

  -- Delete old step_tasks records
  DELETE FROM pgflow.step_tasks
  WHERE (
    (completed_at IS NOT NULL AND completed_at < cutoff_timestamp)
    OR (failed_at IS NOT NULL AND failed_at < cutoff_timestamp)
  );

  -- Delete old step_states records
  DELETE FROM pgflow.step_states
  WHERE (
    (completed_at IS NOT NULL AND completed_at < cutoff_timestamp)
    OR (failed_at IS NOT NULL AND failed_at < cutoff_timestamp)
  );

  -- Delete old runs records
  DELETE FROM pgflow.runs
  WHERE (
    (completed_at IS NOT NULL AND completed_at < cutoff_timestamp)
    OR (failed_at IS NOT NULL AND failed_at < cutoff_timestamp)
  );

  -- Prune archived messages from PGMQ archive tables (pgmq.a_{flow_slug})
  -- For each flow, delete old archived messages
  FOR flow_record IN SELECT DISTINCT flow_slug FROM pgflow.flows LOOP
    -- Build the archive table name
    archive_table := pgmq.format_table_name(flow_record.flow_slug, 'a');

    -- Check if the archive table exists
    IF EXISTS (
      SELECT 1 FROM information_schema.tables
      WHERE table_schema = 'pgmq'
        AND table_name = archive_table
    ) THEN
      -- Build and execute a dynamic SQL statement to delete old archive records
      dynamic_sql := format('
        DELETE FROM pgmq.%I
        WHERE archived_at < $1
      ', archive_table);

      EXECUTE dynamic_sql USING cutoff_timestamp;
    END IF;
  END LOOP;
END
$$;
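Before committing to a short retention window, you may want a rough dry run. The sketch below is not part of pgflow; it simply mirrors the function's WHERE clause for pgflow.runs to count how many runs a given cutoff would delete:

-- Hypothetical dry run: count runs that a 28-day retention would prune
SELECT count(*) AS prunable_runs
FROM pgflow.runs
WHERE (completed_at IS NOT NULL AND completed_at < now() - interval '28 days')
   OR (failed_at IS NOT NULL AND failed_at < now() - interval '28 days');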