TypeScript Client
This guide shows how to use @pgflow/client to start pgflow workflows from TypeScript applications (browsers, Node.js, Deno, React Native) and stream real-time progress updates.
The client can also monitor flows started by Supabase RPC, pg_cron, or database triggers using getRun().
When to Use This Approach
- Real-time progress updates - Show live workflow progress to users in browsers or apps
- AI generation progress bars - Display streaming updates during content generation
- Multi-step workflow visualization - Track individual step progress in dashboards
- Interactive applications - Build UIs that respond to workflow status changes
- Monitor flows from any source - Track workflows started by RPC, cron, or triggers
Installation
Install the client package in your TypeScript project:
npm install @pgflow/client
Initialize the client with your Supabase client:
import { createClient } from '@supabase/supabase-js';
import { PgflowClient, FlowRunStatus } from '@pgflow/client';

const supabase = createClient(
  'https://your-project.supabase.co',
  'your-anon-key'
);

const pgflow = new PgflowClient(supabase);
Starting and Waiting for Workflows
The simplest pattern is to start a workflow and wait for it to complete:
// Start the workflow
const run = await pgflow.startFlow('analyze_website', {
  url: 'https://example.com',
});

console.log(`Started workflow: ${run.run_id}`);

// Wait for completion
const completed = await run.waitForStatus(FlowRunStatus.Completed, {
  timeoutMs: 30000, // 30 second timeout
});

console.log('Analysis complete:', completed.output);
Handling Success or Failure
Wait for any terminal status to handle both success and failure cases:
const run = await pgflow.startFlow('process_data', { file: 'data.csv' });

// Wait for either completed or failed
const terminal = await run.waitForStatus([
  FlowRunStatus.Completed,
  FlowRunStatus.Failed,
]);

if (terminal.status === FlowRunStatus.Completed) {
  console.log('Success:', terminal.output);
} else {
  console.error('Failed:', terminal.error_message);
}
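When several call sites need the same success-or-failure branching, it can be folded into a small helper. The sketch below is illustrative only: `TerminalRunLike`, `RunOutcome`, and `toOutcome` are hypothetical names that are not part of @pgflow/client, and the interface assumes only the `status`, `output`, and `error_message` fields used above, with `status` holding the string `'completed'` on success.

```typescript
// Hypothetical minimal shape of a run in a terminal state (assumption:
// only the fields used in this guide, with string status values).
interface TerminalRunLike {
  status: string;
  output: unknown;
  error_message: string | null;
}

// Discriminated union so callers must check `ok` before reading the payload.
type RunOutcome =
  | { ok: true; output: unknown }
  | { ok: false; error: string };

// Convert a terminal run into a success-or-failure value.
function toOutcome(run: TerminalRunLike): RunOutcome {
  if (run.status === 'completed') {
    return { ok: true, output: run.output };
  }
  return { ok: false, error: run.error_message ?? 'Unknown error' };
}
```

A caller could then write `const outcome = toOutcome(terminal)` and branch on `outcome.ok`, which keeps the error path from being skipped by accident.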
Monitoring Individual Steps
Track the progress of specific steps within a workflow:
import { FlowStepStatus } from '@pgflow/client';

const run = await pgflow.startFlow('data_pipeline', { source: 'api' });

// Monitor the extraction step
const extractStep = run.step('extract_data');
await extractStep.waitForStatus(FlowStepStatus.Completed);
console.log('Extraction complete:', extractStep.output);

// Monitor the transformation step
const transformStep = run.step('transform_data');
await transformStep.waitForStatus(FlowStepStatus.Completed);
console.log('Transformation complete:', transformStep.output);
Real-Time Event Subscriptions
Subscribe to events for live progress updates without polling:
const run = await pgflow.startFlow('long_running_job', { task: 'process' });

// Listen to all run events
run.on('*', (event) => {
  console.log(`Status changed to: ${event.status}`);
});

// Listen to specific events
run.on('completed', (event) => {
  console.log('Job completed:', event.output);
});

run.on('failed', (event) => {
  console.error('Job failed:', event.error_message);
});

// on() returns a function that removes the listener
const unsubscribe = run.on('completed', (event) => {
  console.log('Job completed:', event.output);
});
// Later, when the listener is no longer needed...
unsubscribe();
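Since each `on()` call returns its own unsubscribe function, a component that registers several listeners ends up with several functions to remember. One way to manage them, sketched here with the hypothetical helper `combineUnsubscribes` (not part of @pgflow/client), is to merge them into a single function that is safe to call more than once.

```typescript
type Unsubscribe = () => void;

// Merge several unsubscribe functions into one.
// The returned function runs each of them once; repeat calls are no-ops.
function combineUnsubscribes(...unsubscribes: Unsubscribe[]): Unsubscribe {
  let done = false;
  return () => {
    if (done) return;
    done = true;
    for (const unsubscribe of unsubscribes) {
      unsubscribe();
    }
  };
}
```

Usage might look like `const stop = combineUnsubscribes(run.on('completed', onDone), run.on('failed', onFail));`, where `onDone` and `onFail` stand for your own handlers, followed by a single `stop()` when the UI element goes away.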
Step-Level Events
Monitor the progress of individual steps in real time:
const run = await pgflow.startFlow('multi_step_flow', { data: 'input' });

const step = run.step('critical_step');

step.on('started', (event) => {
  console.log(`Step started at: ${event.started_at}`);
});

step.on('completed', (event) => {
  console.log('Step output:', event.output);
});

step.on('failed', (event) => {
  console.error('Step error:', event.error_message);
});
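Step events like these are a natural input for a progress bar. The following is a minimal sketch of the bookkeeping, not a pgflow feature: `StepProgress` is a hypothetical class, and it assumes you already know the step slugs of the flow and feed it the `started`, `completed`, and `failed` events yourself.

```typescript
// Status names mirror the step events used above; steps with no event yet are 'pending'.
type SeenStatus = 'started' | 'completed' | 'failed';

class StepProgress {
  private readonly statuses = new Map<string, SeenStatus | 'pending'>();

  constructor(stepSlugs: string[]) {
    for (const slug of stepSlugs) {
      this.statuses.set(slug, 'pending');
    }
  }

  // Record the latest status for a known step; unknown slugs are ignored.
  record(slug: string, status: SeenStatus): void {
    if (this.statuses.has(slug)) {
      this.statuses.set(slug, status);
    }
  }

  // Share of steps that have completed, as a whole-number percentage.
  percentComplete(): number {
    if (this.statuses.size === 0) return 0;
    let completed = 0;
    this.statuses.forEach((status) => {
      if (status === 'completed') completed += 1;
    });
    return Math.round((completed / this.statuses.size) * 100);
  }
}
```

A handler such as `step.on('completed', () => progress.record('critical_step', 'completed'))` would keep the tracker current, and the UI can read `progress.percentComplete()` after each event.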
Monitoring Multiple Steps in Parallel
Track several steps concurrently:
const run = await pgflow.startFlow('parallel_pipeline', { input: 'data' });

// Wait for multiple steps to complete
const stepSlugs = ['process_a', 'process_b', 'process_c'];
const stepPromises = stepSlugs.map((slug) =>
  run.step(slug).waitForStatus(FlowStepStatus.Completed)
);

// All steps complete
await Promise.all(stepPromises);
console.log('All parallel steps finished');

// Or race for the first completion
const firstComplete = await Promise.race(stepPromises);
console.log('First step completed:', firstComplete.step_slug);
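One caveat with `Promise.all` is that it rejects as soon as any single wait rejects, for example after a timeout or an abort, so the outcome of the other steps is lost. The sketch below shows one way to wait for every step and report which waits succeeded. `settleBySlug` and `SettledSummary` are hypothetical names, and the helper only assumes it is given promises like the ones returned by `waitForStatus`.

```typescript
interface SettledSummary {
  fulfilled: string[];
  rejected: { slug: string; reason: unknown }[];
}

// Wait for every promise to settle and group the step slugs by outcome.
// Unlike a bare Promise.all, one rejection does not hide the other results.
async function settleBySlug(
  waits: Record<string, Promise<unknown>>
): Promise<SettledSummary> {
  const summary: SettledSummary = { fulfilled: [], rejected: [] };
  await Promise.all(
    Object.keys(waits).map(async (slug) => {
      try {
        await waits[slug];
        summary.fulfilled.push(slug);
      } catch (reason) {
        summary.rejected.push({ slug, reason });
      }
    })
  );
  return summary;
}
```

The input could be built from the step slugs above, mapping each slug to its `waitForStatus` promise. The order of entries in `fulfilled` follows the order in which the waits resolve, not the order of the keys.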
Handling Workflow Failures
Observing Failure Events
Listen for failure events to handle them as they occur:
const run = await pgflow.startFlow('risky_workflow', { data: 'test' });

// React immediately when the workflow fails
run.on('failed', (event) => {
  console.error('Workflow failed:', event.error_message);
  // Handle failure (alert, log, compensate, etc.)
});

run.on('completed', (event) => {
  console.log('Workflow completed:', event.output);
});
Waiting for Success or Failure
Wait for a terminal status and handle the outcome:
const run = await pgflow.startFlow('risky_workflow', { data: 'test' });

const result = await run.waitForStatus([
  FlowRunStatus.Completed,
  FlowRunStatus.Failed,
]);

if (result.status === FlowRunStatus.Failed) {
  console.error('Workflow failed:', result.error_message);
  // Handle failure (alert, log, compensate, etc.)
} else {
  console.log('Workflow completed:', result.output);
}
Canceling Wait Operations
Use an AbortSignal to cancel a pending wait when needed:
// Assumes `run` was returned by an earlier startFlow() or getRun() call
const controller = new AbortController();

// Start waiting with abort signal
const waitPromise = run.waitForStatus(FlowRunStatus.Completed, {
  timeoutMs: 60000,
  signal: controller.signal,
});

// Cancel if user navigates away (browser example)
window.addEventListener('beforeunload', () => {
  controller.abort();
});

// Or cancel on user action
document.getElementById('cancel-btn')?.addEventListener('click', () => {
  controller.abort();
});

try {
  await waitPromise;
  console.log('Workflow completed:', run.output);
} catch (error) {
  // `error` is `unknown` in strict TypeScript, so narrow it before reading `message`
  if (error instanceof Error && error.message.includes('Aborted')) {
    console.log('User canceled the operation');
  } else {
    throw error; // Re-throw other errors
  }
}
This is particularly useful in interactive applications where users might:
- Navigate away before workflow completion
- Cancel long-running operations
- Switch between different workflows
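For the last case, switching between workflows, a common pattern is to abort the previous wait whenever a new one begins. The sketch below uses only the standard `AbortController` API. `WaitSwitcher` is a hypothetical helper and not part of @pgflow/client.

```typescript
// Keeps at most one active wait: asking for the next signal aborts the previous one.
class WaitSwitcher {
  private current: AbortController | null = null;

  // Abort any in-flight wait and return a fresh signal for the next one.
  next(): AbortSignal {
    if (this.current) {
      this.current.abort();
    }
    this.current = new AbortController();
    return this.current.signal;
  }

  // Abort the active wait, if any, without starting a new one.
  cancel(): void {
    if (this.current) {
      this.current.abort();
      this.current = null;
    }
  }
}
```

Each time the user selects a different run, the application would pass `switcher.next()` as the `signal` option of `waitForStatus`, so the wait for the previously selected run rejects instead of lingering in the background.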
Resource Cleanup
Auto-Cleanup with Events
Clean up automatically when the workflow reaches a terminal state:
const run = await pgflow.startFlow('cleanup_example', { data: 'test' });

// Auto-cleanup on completion or failure
run.on('*', (event) => {
  if (event.status === 'completed' || event.status === 'failed') {
    pgflow.dispose(run.run_id);
  }
});
Cleanup After Waiting
Clean up after waiting for workflow completion:
const run = await pgflow.startFlow('cleanup_example', { data: 'test' });

await run.waitForStatus(FlowRunStatus.Completed);
pgflow.dispose(run.run_id);
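If the wait rejects, for instance on a timeout or an abort, a `dispose` call placed after the `await` never runs. A `try`/`finally` wrapper avoids that. The following is a minimal sketch: `withDispose` and `DisposerLike` are hypothetical names, and the interface assumes only the `dispose(runId)` method shown above.

```typescript
// Hypothetical minimal shape: anything with the dispose(runId) method used in this guide.
interface DisposerLike {
  dispose(runId: string): void;
}

// Run `work` and dispose the run afterwards, whether `work` resolves or rejects.
async function withDispose<T>(
  client: DisposerLike,
  runId: string,
  work: () => Promise<T>
): Promise<T> {
  try {
    return await work();
  } finally {
    client.dispose(runId);
  }
}
```

With this in place, the wait could be written as `await withDispose(pgflow, run.run_id, () => run.waitForStatus(FlowRunStatus.Completed))`, assuming `pgflow` is compatible with the `DisposerLike` shape.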
Cleanup on Shutdown
Clean up all runs when your application terminates:
process.on('SIGTERM', () => {
  pgflow.disposeAll();
});
Type Safety with Flow Definitions
When used together with @pgflow/dsl, the client provides full type inference:
import { Flow } from '@pgflow/dsl';

// Define your flow
const AnalyzeWebsite = new Flow<{ url: string }>({ slug: 'analyze_website' })
  .step({ slug: 'scrape' }, async (input) => ({
    content: 'html...',
    title: 'Page Title',
  }))
  .step({ slug: 'analyze' }, async (input) => ({
    sentiment: 0.8,
    keywords: ['typescript', 'postgres'],
  }));

// Type-safe client usage
const run = await pgflow.startFlow<typeof AnalyzeWebsite>(
  AnalyzeWebsite.slug,
  { url: 'https://example.com' } // TypeScript validates this
);

// Typed step access
const scrapeStep = run.step('scrape'); // TypeScript knows this step exists

// Typed output
await run.waitForStatus(FlowRunStatus.Completed);
console.log(run.output); // TypeScript knows the output structure
Retrieving Existing Runs
Access workflow runs that were started elsewhere:
// Get a run by ID
const run = await pgflow.getRun('550e8400-e29b-41d4-a716-446655440000');

if (run) {
  console.log('Found run:', run.status);

  // Continue monitoring
  if (run.status === FlowRunStatus.Started) {
    await run.waitForStatus(FlowRunStatus.Completed);
  }
} else {
  console.log('Run not found');
}
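A run picked up this way may already be finished, or may later fail rather than complete, so it can help to wait for either terminal status. The sketch below illustrates that control flow against hypothetical minimal types. `RunLike` and `resumeUntilTerminal` are not part of @pgflow/client, and the status strings `'completed'` and `'failed'` are assumptions standing in for `FlowRunStatus.Completed` and `FlowRunStatus.Failed`.

```typescript
// Hypothetical minimal shape of a run, limited to the members used here.
interface RunLike {
  status: string;
  waitForStatus(statuses: string[]): Promise<RunLike>;
}

// Assumed string values of the terminal run statuses.
const TERMINAL_STATUSES = ['completed', 'failed'];

// Look up a run and, if it is still in progress, wait until it finishes either way.
async function resumeUntilTerminal(
  getRun: (runId: string) => Promise<RunLike | null>,
  runId: string
): Promise<RunLike | null> {
  const run = await getRun(runId);
  if (!run) return null; // Run not found
  if (TERMINAL_STATUSES.indexOf(run.status) !== -1) return run; // Already finished
  return run.waitForStatus(TERMINAL_STATUSES);
}
```

In application code the lookup function would wrap `pgflow.getRun`, and the real enum members would replace the string constants.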