Understanding flows
pgflow’s TypeScript DSL lets you define flows by describing what steps you need and how they connect. You specify the structure - which steps exist, what data they need, and which steps depend on others. Each step includes a function that does the actual work.
Flow Definition Philosophy
Flow definitions are built on several key principles:
- Declarative Structure - Describe what steps exist and how they connect, not how to execute them
- Type Safety - Complete type checking from flow inputs to outputs
- Functional Approach - Composable handler functions with clear inputs and outputs
- Separation of Concerns - Step logic separate from orchestration
- Fluent Interface - Chainable method calls that return new Flow instances
The Flow class uses a fluent interface to build this declarative manifest:
```typescript
// Method chaining for flow definition
new Flow<Input>({ slug: 'my_flow' })
  .step({ slug: 'step1' }, async (input) => { /* ... */ })
  .step({ slug: 'step2' }, async (input) => { /* ... */ });
```
Each `.step()` call creates a new Flow instance without modifying the original, ensuring flow definitions are predictable, testable, and immune to side effects. The entire flow is defined in a single expression - you describe the structure and provide handlers, while pgflow handles the execution.
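Because each call returns a fresh instance, an intermediate flow can even be held in a variable and extended without affecting the original. A minimal sketch (assuming `Flow` is imported from pgflow’s `@pgflow/dsl` package; the slugs and types are hypothetical, and in practice you would register only the final flow):

```typescript
import { Flow } from '@pgflow/dsl'; // assumed import path

type Input = { n: number };

// Hold an intermediate Flow in a variable...
const base = new Flow<Input>({ slug: 'my_flow' })
  .step({ slug: 'step1' }, async (input) => ({ doubled: input.run.n * 2 }));

// ...and extend it: `extended` describes both steps, while `base` is unchanged
const extended = base.step(
  { slug: 'step2', dependsOn: ['step1'] },
  async (input) => ({ result: input.step1.doubled + 1 })
);
```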
Understanding Step Inputs and Data Flow
In pgflow, every step receives a unified input object with two critical parts:
- `input.run` - The original flow input, available to ALL steps
- `input.{stepName}` - Outputs from any dependency steps
Consider this example:
```typescript
new Flow<{ url: string, userId: string }>({
  slug: 'analyze_website',
})
  .step(
    { slug: 'scrape' },
    async (input) => {
      // Access to input.run.url and input.run.userId
      return await scrapeWebsite(input.run.url);
    }
  )
  .step(
    { slug: 'analyze', dependsOn: ['scrape'] },
    async (input) => {
      // Still has access to input.run.userId
      // Now also has access to input.scrape
      return await analyzeContent(input.scrape.content);
    }
  );
```
When this flow runs:
- The flow receives an input object (e.g., `{ url: "example.com", userId: "123" }`)
- Each step receives both the original input via `input.run` and the outputs of any dependency steps
- Steps can combine original parameters with processed data from previous steps
The Type System
One of pgflow’s most powerful features is its type system, which ensures type safety across the entire workflow:
```typescript
// Define input type for the flow
type WebsiteInput = { url: string, userId: string };

// Create a flow with that input type
new Flow<WebsiteInput>({
  slug: 'analyze_website',
})
  .step(
    { slug: 'scrape' },
    async (input) => {
      // input.run is typed as WebsiteInput
      return { content: "..." };
    }
  )
  .step(
    { slug: 'analyze', dependsOn: ['scrape'] },
    async (input) => {
      // input.scrape is typed based on the scrape step's return type
      return { analysis: "..." };
    }
  );
```
The type system automatically:
- Enforces the correct input type for the flow
- Makes the flow input available as `input.run` in every step
- Tracks each step’s output type and makes it available to dependent steps
- Provides IDE autocompletion and catches type errors at compile time, as the sketch below shows
Step Methods
The Flow class provides three methods for defining steps in your workflow:
.step() - Regular Steps
The standard method for adding steps to a flow. Each step processes input and returns output.
```typescript
.step(
  { slug: 'process', dependsOn: ['previous'] },
  async (input) => {
    // Access input.run and input.previous
    return { result: 'processed' };
  }
)
```
.array() - Array-Returning Steps
A semantic wrapper around `.step()` that enforces array return types. Useful for data collection that will be processed by map steps.
```typescript
// Fetch an array of items
.array(
  { slug: 'fetch_items' },
  async () => ['item1', 'item2', 'item3']
)

// With dependencies
.array(
  { slug: 'combine', dependsOn: ['source1', 'source2'] },
  async (input) => [...input.source1, ...input.source2]
)
```
.map() - Parallel Array Processing
Processes arrays element-by-element in parallel. Unlike regular steps, map handlers receive individual elements instead of the full input object.
```typescript
// Root map: processes flow input array directly
new Flow<string[]>({ slug: 'batch_processor' })
  .map(
    { slug: 'uppercase' }, // No 'array' property
    (item) => item.toUpperCase()
  );

// Dependent map: processes another step's array output
new Flow<{}>({ slug: 'data_pipeline' })
  .array({ slug: 'fetch_ids' }, () => ['id1', 'id2', 'id3'])
  .map(
    { slug: 'fetch_details', array: 'fetch_ids' },
    async (id) => await fetchUserDetails(id)
  )
  .map(
    { slug: 'enrich', array: 'fetch_details' },
    async (user) => ({ ...user, enriched: true })
  );
```
Key Characteristics of Map Steps:
- Parallel Execution: Each array element spawns a separate task
- Different Handler Signature: `(item) => result` instead of `(input) => result`
- Automatic Aggregation: Outputs are collected back into an array
- Single Dependency: Can only depend on one array source
- Two Modes: Root map (processes flow input) or dependent map (processes step output)
Task Implementation
Flow definitions encourage a functional programming approach: task handlers are plain functions with clear inputs and outputs, making them reusable across different flows.
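For instance, a handler can be written as a standalone, flow-agnostic function and plugged into any flow that supplies the right input. A sketch (the `summarize` helper and both flows are hypothetical):

```typescript
// A reusable task: a plain async function with explicit input and output types
async function summarize(text: string): Promise<{ summary: string }> {
  // A real implementation might call a summarization service;
  // truncation stands in for it here
  return { summary: text.slice(0, 200) };
}

// The same function can back steps in unrelated flows
new Flow<{ article: string }>({ slug: 'article_flow' })
  .step({ slug: 'summary' }, (input) => summarize(input.run.article));

new Flow<{ transcript: string }>({ slug: 'meeting_flow' })
  .step({ slug: 'summary' }, (input) => summarize(input.run.transcript));
```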
JSON Serialization Requirements
All step inputs and outputs MUST be serializable to JSON since pgflow stores these values in JSONB database columns:
```typescript
// GOOD: Fully serializable objects
.step(
  { slug: 'processData' },
  async (input) => {
    return {
      count: 42,
      items: ["apple", "banana"],
      metadata: {
        processed: true,
        timestamp: new Date().toISOString() // Convert Date to string
      }
    };
  }
)

// BAD: Non-serializable types will cause runtime errors
.step(
  { slug: 'badExample' },
  async (input) => {
    return {
      date: new Date(),   // Use toISOString() instead
      regex: /test/,      // Not serializable
      function: () => {}  // Functions can't be serialized
    };
  }
)
```
Guidelines for JSON-Compatible Data
- Use: primitive types (strings, numbers, booleans, null), plain objects, and arrays
- Convert dates to ISO strings: `new Date().toISOString()`
- Avoid: class instances, functions, symbols, undefined, and circular references
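To see why these guidelines matter, consider what a JSON round trip (roughly what storing a value in a JSONB column does) makes of non-serializable values. A sketch, not a pgflow API:

```typescript
const output = {
  date: new Date(),  // survives only as an ISO string, not a Date
  fn: () => {},      // silently dropped by JSON.stringify
  maybe: undefined,  // silently dropped as well
};

const stored = JSON.parse(JSON.stringify(output));
// stored contains only 'date', now a plain ISO string rather than a Date;
// 'fn' and 'maybe' have disappeared entirely
```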
How Steps Connect and Execute
Steps are connected through explicit dependencies:
```typescript
.step(
  { slug: 'summary', dependsOn: ['website'] },
  // Function implementation...
)
```
When a flow runs:
- pgflow runs steps with no dependencies (“root steps”) first
- As steps complete, dependent steps whose dependencies are satisfied become eligible to run
- Steps that don’t depend on each other (including steps that share the same dependencies) can run in parallel
- The flow completes when all steps have completed or failed
This execution model maximizes parallelism when possible, ensures proper ordering of operations, and handles retries and failures automatically.
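A minimal diamond-shaped flow makes this concrete: `left` and `right` both depend only on `root`, so they become eligible together once `root` completes, and `join` waits for both. (A sketch with hypothetical slugs.)

```typescript
new Flow<{ n: number }>({ slug: 'diamond' })
  // Root step: no dependencies, runs first
  .step({ slug: 'root' }, (input) => input.run.n * 2)
  // These two steps run in parallel once 'root' completes
  .step({ slug: 'left', dependsOn: ['root'] }, (input) => input.root + 1)
  .step({ slug: 'right', dependsOn: ['root'] }, (input) => input.root - 1)
  // Runs only after both parallel branches finish
  .step({ slug: 'join', dependsOn: ['left', 'right'] }, (input) => ({
    sum: input.left + input.right,
  }));
```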
Flow Definition vs. Execution
The flow lifecycle has distinct phases:
- Definition (TypeScript): Flow structure defined using the DSL
- Compilation: Conversion from TypeScript to SQL
- Registration: Storage of flow structure in the database
- Execution: Runtime processing of tasks based on the database definition
The TypeScript code is used only for definition and compilation, not for execution. This separation allows for versioning of flow definitions and persistence of workflow state in the database.
Example: Multi-Step Data Flow
Here’s a practical example showing how data flows efficiently through steps:
```typescript
// Flow with multiple input parameters
type Input = {
  userId: string,
  includeDetails: boolean,
  reportType: 'basic' | 'advanced'
};

new Flow<Input>({
  slug: 'user_report',
})
  // Step 1: Fetch user data
  .step(
    { slug: 'user' },
    async (input) => {
      return await fetchUser(input.run.userId);
    }
  )
  // Steps 2 & 3: Process user data in parallel
  .step(
    { slug: 'activity', dependsOn: ['user'] },
    async (input) => {
      // Uses input.run.reportType to determine timespan
      const timespan = input.run.reportType === 'advanced' ? '1y' : '30d';
      return await getUserActivity(input.user.id, timespan);
    }
  )
  .step(
    { slug: 'preferences', dependsOn: ['user'] },
    async (input) => {
      // Uses input.run.includeDetails parameter
      return await getUserPreferences(input.user.id, input.run.includeDetails);
    }
  )
  // Step 4: Combine results
  .step(
    { slug: 'report', dependsOn: ['activity', 'preferences'] },
    async (input) => {
      return {
        user: input.user,
        activity: input.activity,
        preferences: input.preferences,
        reportType: input.run.reportType, // Original parameter still available
        generatedAt: new Date().toISOString()
      };
    }
  );
```
This example demonstrates:
- Original parameters available throughout - Every step can access the input parameters
- Conditional processing - Steps adapt behavior based on original parameters
- No manual parameter forwarding needed - The `user` step doesn’t need to include the original parameters in its output
- Type safety throughout - TypeScript ensures all data accesses are valid
Summary
pgflow provides a powerful, type-safe way to define complex workflows:
- Every step receives the original flow input via `input.run`
- TypeScript’s type system ensures data flows correctly between steps
- The functional approach keeps task implementation separate from flow orchestration
- Parallel execution of independent steps is managed automatically
- Dependencies between steps are handled transparently
The `input.run` design enables original parameters to be available throughout the workflow without manual forwarding, allowing steps to focus on their specific tasks while maintaining type safety.