
TypeScript Client

This guide shows how to use @pgflow/client to start pgflow workflows from TypeScript applications (browsers, Node.js, Deno, React Native) and stream real-time progress updates.

The client can also monitor flows started by Supabase RPC, pg_cron, or database triggers using getRun().

Common use cases:

  • Real-time progress updates - Show live workflow progress to users in browsers or apps
  • AI generation progress bars - Display streaming updates during content generation
  • Multi-step workflow visualization - Track individual step progress in dashboards
  • Interactive applications - Build UIs that respond to workflow status changes
  • Monitor flows from any source - Track workflows started by RPC, cron, or triggers

Install the client package in your TypeScript project:

npm install @pgflow/client

Initialize the client with your Supabase client:

import { createClient } from '@supabase/supabase-js';
import { PgflowClient, FlowRunStatus } from '@pgflow/client';

const supabase = createClient(
  'https://your-project.supabase.co',
  'your-anon-key'
);

const pgflow = new PgflowClient(supabase);

The simplest pattern is to start a workflow and wait for it to complete:

// Start the workflow
const run = await pgflow.startFlow('analyze_website', {
  url: 'https://example.com',
});
console.log(`Started workflow: ${run.run_id}`);

// Wait for completion
const completed = await run.waitForStatus(FlowRunStatus.Completed, {
  timeoutMs: 30000, // 30 second timeout
});
console.log('Analysis complete:', completed.output);
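If the run takes longer than `timeoutMs`, `waitForStatus` rejects instead of resolving. A sketch of handling that case (the exact error message is an assumption - check what your version of `@pgflow/client` throws):

```typescript
// Assumption: on timeout, waitForStatus rejects with an Error whose
// message mentions the timeout. Adjust the check to your client version.
function isTimeoutError(err: unknown): boolean {
  return err instanceof Error && /timeout/i.test(err.message);
}

// Usage sketch:
// try {
//   const completed = await run.waitForStatus(FlowRunStatus.Completed, {
//     timeoutMs: 30000,
//   });
// } catch (err) {
//   if (isTimeoutError(err)) {
//     // The run may still finish later - keep listening via run.on(...)
//   } else {
//     throw err;
//   }
// }
```

Note that a timeout only stops the client from waiting; the workflow itself keeps running in the database.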

Wait for any terminal status to handle both success and failure cases:

const run = await pgflow.startFlow('process_data', { file: 'data.csv' });

// Wait for either completed or failed
const terminal = await run.waitForStatus([
  FlowRunStatus.Completed,
  FlowRunStatus.Failed,
]);

if (terminal.status === FlowRunStatus.Completed) {
  console.log('Success:', terminal.output);
} else {
  console.error('Failed:', terminal.error_message);
}

Track the progress of specific steps within a workflow:

import { FlowStepStatus } from '@pgflow/client';

const run = await pgflow.startFlow('data_pipeline', { source: 'api' });

// Monitor the extraction step
const extractStep = run.step('extract_data');
await extractStep.waitForStatus(FlowStepStatus.Completed);
console.log('Extraction complete:', extractStep.output);

// Monitor the transformation step
const transformStep = run.step('transform_data');
await transformStep.waitForStatus(FlowStepStatus.Completed);
console.log('Transformation complete:', transformStep.output);

Subscribe to events for live progress updates without polling:

const run = await pgflow.startFlow('long_running_job', { task: 'process' });

// Listen to all run events
run.on('*', (event) => {
  console.log(`Status changed to: ${event.status}`);
});

// Listen to specific events
run.on('completed', (event) => {
  console.log('Job completed:', event.output);
});

run.on('failed', (event) => {
  console.error('Job failed:', event.error_message);
});

// on() returns an unsubscribe function - call it when you no longer need updates
const unsubscribe = run.on('completed', (event) => {
  console.log('Job completed:', event.output);
});

// Later...
unsubscribe();

Monitor progress of individual steps in real-time:

const run = await pgflow.startFlow('multi_step_flow', { data: 'input' });
const step = run.step('critical_step');

step.on('started', (event) => {
  console.log(`Step started at: ${event.started_at}`);
});

step.on('completed', (event) => {
  console.log('Step output:', event.output);
});

step.on('failed', (event) => {
  console.error('Step error:', event.error_message);
});
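Step events are a natural fit for driving a progress bar. A minimal sketch, assuming you know the step slugs up front (the `progressPercent` helper below is hypothetical, not part of `@pgflow/client`):

```typescript
type StepState = 'pending' | 'completed' | 'failed';

// Hypothetical helper: fraction of steps that reached a terminal state, as 0-100.
function progressPercent(states: Map<string, StepState>): number {
  if (states.size === 0) return 0;
  let done = 0;
  for (const state of states.values()) {
    if (state === 'completed' || state === 'failed') done++;
  }
  return Math.round((done / states.size) * 100);
}

// Usage sketch:
// const stepSlugs = ['extract_data', 'transform_data', 'load_data'];
// const states = new Map<string, StepState>(
//   stepSlugs.map((s) => [s, 'pending'])
// );
// for (const slug of stepSlugs) {
//   run.step(slug).on('completed', () => {
//     states.set(slug, 'completed');
//     renderProgress(progressPercent(states)); // renderProgress is your UI code
//   });
// }
```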

Track several steps concurrently:

const run = await pgflow.startFlow('parallel_pipeline', { input: 'data' });

// Wait for multiple steps to complete
const stepSlugs = ['process_a', 'process_b', 'process_c'];
const stepPromises = stepSlugs.map((slug) =>
  run.step(slug).waitForStatus(FlowStepStatus.Completed)
);

// All steps complete
await Promise.all(stepPromises);
console.log('All parallel steps finished');

// Or race for the first completion
const firstComplete = await Promise.race(stepPromises);
console.log('First step completed:', firstComplete.step_slug);
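`Promise.all` rejects as soon as any step's wait rejects. If you want every step's outcome regardless of individual failures, `Promise.allSettled` is a useful variant (the `summarize` helper is hypothetical):

```typescript
// Hypothetical helper: count fulfilled vs rejected settled results.
function summarize<T>(
  results: PromiseSettledResult<T>[]
): { ok: number; failed: number } {
  let ok = 0;
  let failed = 0;
  for (const r of results) {
    if (r.status === 'fulfilled') ok++;
    else failed++;
  }
  return { ok, failed };
}

// Usage sketch:
// const settled = await Promise.allSettled(stepPromises);
// const { ok, failed } = summarize(settled);
// console.log(`${ok} steps succeeded, ${failed} failed`);
```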

Listen for failure events to handle them as they occur:

const run = await pgflow.startFlow('risky_workflow', { data: 'test' });

// React immediately when workflow fails
run.on('failed', (event) => {
  console.error('Workflow failed:', event.error_message);
  // Handle failure (alert, log, compensate, etc.)
});

run.on('completed', (event) => {
  console.log('Workflow completed:', event.output);
});

Wait for terminal status and handle the outcome:

const run = await pgflow.startFlow('risky_workflow', { data: 'test' });

const result = await run.waitForStatus([
  FlowRunStatus.Completed,
  FlowRunStatus.Failed,
]);

if (result.status === FlowRunStatus.Failed) {
  console.error('Workflow failed:', result.error_message);
  // Handle failure (alert, log, compensate, etc.)
} else {
  console.log('Workflow completed:', result.output);
}
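One way to restart a failed run from the client is exponential backoff between attempts. A sketch under the assumption that simply calling `startFlow` again with the same input is an acceptable retry (`backoffMs` is a hypothetical helper):

```typescript
// Hypothetical helper: exponential backoff delay, capped at capMs.
function backoffMs(attempt: number, baseMs = 1000, capMs = 30000): number {
  return Math.min(capMs, baseMs * 2 ** attempt);
}

// Usage sketch:
// for (let attempt = 0; attempt < 3; attempt++) {
//   const run = await pgflow.startFlow('risky_workflow', { data: 'test' });
//   const result = await run.waitForStatus([
//     FlowRunStatus.Completed,
//     FlowRunStatus.Failed,
//   ]);
//   if (result.status === FlowRunStatus.Completed) break;
//   await new Promise((resolve) => setTimeout(resolve, backoffMs(attempt)));
// }
```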

Use AbortSignal to cancel waiting operations when needed:

const controller = new AbortController();

// Start waiting with abort signal
const waitPromise = run.waitForStatus(FlowRunStatus.Completed, {
  timeoutMs: 60000,
  signal: controller.signal,
});

// Cancel if user navigates away (browser example)
window.addEventListener('beforeunload', () => {
  controller.abort();
});

// Or cancel on user action
document.getElementById('cancel-btn')?.addEventListener('click', () => {
  controller.abort();
});

try {
  await waitPromise;
  console.log('Workflow completed:', run.output);
} catch (error) {
  if (error instanceof Error && error.message.includes('Aborted')) {
    console.log('User canceled the operation');
  } else {
    throw error; // Re-throw other errors
  }
}

This is particularly useful in interactive applications where users might:

  • Navigate away before workflow completion
  • Cancel long-running operations
  • Switch between different workflows
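In modern runtimes (Node 20+, recent browsers) you can also combine a user-driven controller with a hard deadline using the standard `AbortSignal.any` and `AbortSignal.timeout` helpers, and pass the combined signal to `waitForStatus` (the `cancelOrDeadline` helper is a sketch, not part of `@pgflow/client`):

```typescript
// Hypothetical helper: a signal that aborts on user cancel OR after deadlineMs.
function cancelOrDeadline(
  controller: AbortController,
  deadlineMs: number
): AbortSignal {
  return AbortSignal.any([controller.signal, AbortSignal.timeout(deadlineMs)]);
}

// Usage sketch:
// const controller = new AbortController();
// await run.waitForStatus(FlowRunStatus.Completed, {
//   signal: cancelOrDeadline(controller, 60_000),
// });
```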

Clean up automatically when workflow reaches terminal state:

const run = await pgflow.startFlow('cleanup_example', { data: 'test' });

// Auto-cleanup on completion
run.on('*', (event) => {
  if (event.status === 'completed' || event.status === 'failed') {
    pgflow.dispose(run.run_id);
  }
});

Clean up after waiting for workflow completion:

const run = await pgflow.startFlow('cleanup_example', { data: 'test' });
await run.waitForStatus(FlowRunStatus.Completed);
pgflow.dispose(run.run_id);

Clean up all runs when your application terminates:

process.on('SIGTERM', () => {
  pgflow.disposeAll();
});

When used with flows defined via @pgflow/dsl, the client provides full type inference:

import { Flow } from '@pgflow/dsl';

// Define your flow
const AnalyzeWebsite = new Flow<{ url: string }>({ slug: 'analyze_website' })
  .step({ slug: 'scrape' }, async (input) => ({
    content: 'html...',
    title: 'Page Title',
  }))
  .step({ slug: 'analyze' }, async (input) => ({
    sentiment: 0.8,
    keywords: ['typescript', 'postgres'],
  }));

// Type-safe client usage
const run = await pgflow.startFlow<typeof AnalyzeWebsite>(
  AnalyzeWebsite.slug,
  { url: 'https://example.com' } // TypeScript validates this
);

// Typed step access
const scrapeStep = run.step('scrape'); // TypeScript knows this step exists

// Typed output
await run.waitForStatus(FlowRunStatus.Completed);
console.log(run.output); // TypeScript knows the output structure

Access workflow runs that were started elsewhere:

// Get a run by ID
const run = await pgflow.getRun('550e8400-e29b-41d4-a716-446655440000');

if (run) {
  console.log('Found run:', run.status);

  // Continue monitoring
  if (run.status === FlowRunStatus.Started) {
    await run.waitForStatus(FlowRunStatus.Completed);
  }
} else {
  console.log('Run not found');
}
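A common companion pattern is persisting `run.run_id` (for example in `localStorage`) so a page reload can resume monitoring via `getRun()`. The storage key and the `isRunId` check below are illustrative assumptions:

```typescript
const RUN_KEY = 'pgflow:last_run_id'; // hypothetical storage key

// Basic sanity check before querying: run ids are UUIDs.
function isRunId(value: string | null): value is string {
  return (
    value !== null &&
    /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i.test(
      value
    )
  );
}

// Usage sketch (browser):
// localStorage.setItem(RUN_KEY, run.run_id);
// ...after a reload...
// const saved = localStorage.getItem(RUN_KEY);
// if (isRunId(saved)) {
//   const run = await pgflow.getRun(saved);
//   if (run) { /* re-attach run.on(...) listeners */ }
// }
```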