Understanding flows

pgflow’s TypeScript DSL lets you define flows by describing what steps you need and how they connect. You specify the structure - which steps exist, what data they need, and which steps depend on others. Each step includes a function that does the actual work.

Flow definitions are built on several key principles:

  1. Declarative Structure - Describe what steps exist and how they connect, not how to execute them
  2. Type Safety - Complete type checking from flow inputs to outputs
  3. Functional Approach - Composable handler functions with clear inputs and outputs
  4. Separation of Concerns - Step logic separate from orchestration
  5. Fluent Interface - Chainable method calls that return new Flow instances

The Flow class uses a fluent interface to build this declarative manifest:

// Method chaining for flow definition
new Flow<Input>({ slug: 'my_flow' })
  .step({ slug: 'step1' }, async (input) => { /* ... */ })
  .step({ slug: 'step2' }, async (input) => { /* ... */ });

Each .step() call creates a new Flow instance without modifying the original, ensuring flow definitions are predictable, testable, and immune to side effects. The entire flow is defined in a single expression - you describe the structure and provide handlers, while pgflow handles the execution.
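
To make the immutability claim concrete, here is a minimal sketch of a fluent builder whose step() method returns a new instance instead of mutating the receiver. MiniFlow is a hypothetical stand-in written only for this illustration; it is not pgflow's Flow class and does not reflect its internals.

```typescript
// Hypothetical stand-in for illustration only; not pgflow's Flow class.
class MiniFlow {
  readonly slug: string;
  readonly stepSlugs: readonly string[];

  constructor(slug: string, stepSlugs: readonly string[] = []) {
    this.slug = slug;
    this.stepSlugs = stepSlugs;
  }

  // Returns a NEW MiniFlow; the instance it was called on is left untouched.
  step(stepSlug: string): MiniFlow {
    return new MiniFlow(this.slug, [...this.stepSlugs, stepSlug]);
  }
}

const base = new MiniFlow('my_flow');
const withOne = base.step('step1');
const withTwo = withOne.step('step2');

// base.stepSlugs is still [], withOne.stepSlugs is ['step1'],
// and withTwo.stepSlugs is ['step1', 'step2'].
```

Because every intermediate value stays valid, a partially built definition can be shared or extended in several directions without one extension affecting another.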

In pgflow, every step receives a unified input object with two critical parts:

  1. input.run - The original flow input, available to ALL steps
  2. input.{stepSlug} - Outputs of the steps listed in dependsOn, keyed by step slug

Consider this example:

new Flow<{ url: string, userId: string }>({
  slug: 'analyze_website',
})
  .step(
    { slug: 'scrape' },
    async (input) => {
      // Access to input.run.url and input.run.userId
      return await scrapeWebsite(input.run.url);
    }
  )
  .step(
    { slug: 'analyze', dependsOn: ['scrape'] },
    async (input) => {
      // Still has access to input.run.userId
      // Now also has access to input.scrape
      return await analyzeContent(input.scrape.content);
    }
  );

When this flow runs:

  1. The flow receives an input object (e.g., { url: "example.com", userId: "123" })
  2. Each step receives both the original input via input.run and the outputs of any dependency steps
  3. Steps can combine original parameters with processed data from previous steps
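
The shape of the unified input object can be sketched as follows. The buildStepInput helper and the Json type are hypothetical names introduced for this illustration; pgflow assembles step input internally, and this is only a model of the result under that assumption.

```typescript
// Hypothetical helper for illustration; not part of pgflow's API.
type Json = string | number | boolean | null | Json[] | { [key: string]: Json };

function buildStepInput(
  runInput: Json,
  dependsOn: string[],
  completedOutputs: Record<string, Json>,
): Record<string, Json> {
  // Every step sees the original flow input under `run`.
  const input: Record<string, Json> = { run: runInput };
  // Only the outputs of declared dependencies are added, keyed by step slug.
  for (const slug of dependsOn) {
    const output = completedOutputs[slug];
    if (output === undefined) {
      throw new Error(`Dependency "${slug}" has not completed`);
    }
    input[slug] = output;
  }
  return input;
}

const runInput = { url: 'example.com', userId: '123' };
const outputs = { scrape: { content: '<html>...</html>' } };

// A root step sees only { run }.
const scrapeInput = buildStepInput(runInput, [], outputs);
// A step with dependsOn: ['scrape'] sees { run, scrape }.
const analyzeInput = buildStepInput(runInput, ['scrape'], outputs);
```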

One of pgflow’s most powerful features is its type system, which ensures type safety across the entire workflow:

// Define input type for the flow
type WebsiteInput = { url: string, userId: string };

// Create a flow with that input type
new Flow<WebsiteInput>({
  slug: 'analyze_website',
})
  .step(
    { slug: 'scrape' },
    async (input) => {
      // input.run is typed as WebsiteInput
      return { content: "..." };
    }
  )
  .step(
    { slug: 'analyze', dependsOn: ['scrape'] },
    async (input) => {
      // input.scrape is typed based on the scrape step's return type
      return { analysis: "..." };
    }
  );

The type system automatically:

  • Enforces the correct input type for the flow
  • Makes the flow input available as input.run in every step
  • Tracks each step’s output type and makes it available to dependent steps
  • Provides IDE autocompletion and catches type errors at compile time
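
One way to picture how a step's input type could be derived from the flow input and earlier step outputs is the simplified sketch below. StepInput and OutputsSoFar are hypothetical names; pgflow's actual type definitions are more involved and may be structured differently.

```typescript
// Hypothetical, simplified typing sketch; not pgflow's real type definitions.
type StepInput<Run, Outputs, Deps extends keyof Outputs> =
  { run: Run } & Pick<Outputs, Deps>;

type WebsiteInput = { url: string; userId: string };
// Output types of the steps defined so far, keyed by step slug.
type OutputsSoFar = { scrape: { content: string } };

// The input a step with dependsOn: ['scrape'] would receive in this sketch.
type AnalyzeInput = StepInput<WebsiteInput, OutputsSoFar, 'scrape'>;

function analyze(input: AnalyzeInput): { analysis: string } {
  // input.scrape.content is known to be a string at compile time.
  return {
    analysis: `${input.scrape.content.length} chars for ${input.run.userId}`,
  };
}

const analysisResult = analyze({
  run: { url: 'example.com', userId: '123' },
  scrape: { content: 'hello' },
});
// analysisResult.analysis is '5 chars for 123'

// Uncommenting the next line fails to compile: `scrape` is missing.
// analyze({ run: { url: 'example.com', userId: '123' } });
```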

The Flow class provides three methods for defining steps in your workflow: .step(), .array(), and .map().

.step() is the standard method for adding steps to a flow. Each step processes input and returns output.

.step(
  { slug: 'process', dependsOn: ['previous'] },
  async (input) => {
    // Access input.run and input.previous
    return { result: 'processed' };
  }
)

.array() is a semantic wrapper around .step() that enforces array return types. It is useful for collecting data that will later be processed by map steps.

// Fetch an array of items
.array(
  { slug: 'fetch_items' },
  async () => ['item1', 'item2', 'item3']
)

// With dependencies
.array(
  { slug: 'combine', dependsOn: ['source1', 'source2'] },
  async (input) => [...input.source1, ...input.source2]
)

.map() processes arrays element-by-element in parallel. Unlike regular steps, map handlers receive individual elements instead of the full input object.

// Root map: processes flow input array directly
new Flow<string[]>({ slug: 'batch_processor' })
  .map(
    { slug: 'uppercase' }, // No 'array' property
    (item) => item.toUpperCase()
  );

// Dependent map: processes another step's array output
new Flow<{}>({ slug: 'data_pipeline' })
  .array({ slug: 'fetch_ids' }, () => ['id1', 'id2', 'id3'])
  .map(
    { slug: 'fetch_details', array: 'fetch_ids' },
    async (id) => await fetchUserDetails(id)
  )
  .map(
    { slug: 'enrich', array: 'fetch_details' },
    async (user) => ({ ...user, enriched: true })
  );

Key Characteristics of Map Steps:

  • Parallel Execution: Each array element spawns a separate task
  • Different Handler Signature: (item) => result instead of (input) => result
  • Automatic Aggregation: Outputs are collected back into an array
  • Single Dependency: Can only depend on one array source
  • Two Modes: Root map (processes flow input) or dependent map (processes step output)
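
The fan-out and aggregation behaviour listed above can be modelled in memory as shown below. runMapStep and demoPipeline are hypothetical names for this sketch; pgflow itself creates and tracks the per-element tasks through its database and workers, so this only illustrates the semantics, not the implementation.

```typescript
// Hypothetical in-memory model of map-step semantics, for illustration only.
function runMapStep<Item, Result>(
  items: Item[],
  handler: (item: Item) => Promise<Result>,
): Promise<Result[]> {
  const results: Result[] = new Array(items.length);
  // Start one task per element; all tasks are in flight at the same time.
  const tasks = items.map((item, index) =>
    handler(item).then((result) => {
      // Store by index so the output order matches the input order.
      results[index] = result;
    }),
  );
  // Resolve with the aggregated array once every task has finished.
  return Promise.all(tasks).then(() => results);
}

// Mirrors the dependent-map example: ids -> details -> enriched details.
async function demoPipeline(): Promise<{ id: string; enriched: boolean }[]> {
  const details = await runMapStep(['id1', 'id2'], async (id) => ({ id }));
  return runMapStep(details, async (user) => ({ ...user, enriched: true }));
}
```

Note that the handler receives a single element, not an object with run and dependency keys, which matches the different handler signature described above.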

Flow definitions encourage a functional approach: step handlers are plain functions with clear inputs and outputs, so the same task logic can be reused across different flows.

All step inputs and outputs MUST be serializable to JSON since pgflow stores these values in JSONB database columns:

// GOOD: Fully serializable objects
.step(
  { slug: 'processData' },
  async (input) => {
    return {
      count: 42,
      items: ["apple", "banana"],
      metadata: {
        processed: true,
        timestamp: new Date().toISOString() // Convert Date to string
      }
    };
  }
)

// BAD: Non-serializable types will cause runtime errors
.step(
  { slug: 'badExample' },
  async (input) => {
    return {
      date: new Date(), // Use toISOString() instead
      regex: /test/, // Not serializable
      function: () => {} // Functions can't be serialized
    };
  }
)

  • Use: primitive types (strings, numbers, booleans, null), plain objects, and arrays
  • Convert dates to ISO strings: new Date().toISOString()
  • Avoid: class instances, functions, symbols, undefined, and circular references
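
To see why these types are a problem, the sketch below runs values like those in the BAD example through JavaScript's standard JSON.stringify. It shows only the language's own JSON behaviour; how pgflow itself reports such values is not covered here.

```typescript
const original = {
  date: new Date('2024-01-01T00:00:00.000Z'),
  regex: /test/,
  fn: () => {},
  missing: undefined,
};

// Date becomes an ISO string, RegExp becomes {}, and the function and
// undefined properties are dropped entirely.
const serialized = JSON.stringify(original);
// serialized is '{"date":"2024-01-01T00:00:00.000Z","regex":{}}'

// Circular references cannot be serialized at all: JSON.stringify throws.
const circular: { self?: unknown } = {};
circular.self = circular;
let circularThrew = false;
try {
  JSON.stringify(circular);
} catch {
  circularThrew = true;
}
// circularThrew is true
```

Even where no exception is thrown, the value read back from storage no longer matches what the handler returned, which is why converting to plain JSON values up front is the safer habit.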

Steps are connected through explicit dependencies:

.step(
  { slug: 'summary', dependsOn: ['website'] },
  // Function implementation...
)

When a flow runs:

  1. pgflow runs steps with no dependencies (“root steps”) first
  2. As steps complete, dependent steps whose dependencies are satisfied become eligible to run
  3. Steps that do not depend on each other (for example, steps with the same dependencies) can run in parallel
  4. The flow completes when all steps have completed or failed

This execution model maximizes parallelism when possible, ensures proper ordering of operations, and handles retries and failures automatically.
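
The ordering rules above can be illustrated with a small in-memory sketch that groups steps into "waves" of steps whose dependencies are already satisfied. StepDef, executionWaves, and the example slugs are hypothetical; pgflow tracks step state in the database and starts each step as soon as its own dependencies complete, so the wave grouping is a simplification.

```typescript
// Hypothetical in-memory model for illustration; not pgflow's scheduler.
type StepDef = { slug: string; dependsOn: string[] };

// Every step in a wave has all of its dependencies satisfied by earlier
// waves, so the steps within one wave could run in parallel.
function executionWaves(steps: StepDef[]): string[][] {
  const done = new Set<string>();
  const waves: string[][] = [];
  let remaining = [...steps];

  while (remaining.length > 0) {
    const ready = remaining.filter((step) =>
      step.dependsOn.every((dep) => done.has(dep)),
    );
    if (ready.length === 0) {
      throw new Error('Cycle or unknown dependency detected');
    }
    for (const step of ready) done.add(step.slug);
    waves.push(ready.map((step) => step.slug));
    remaining = remaining.filter((step) => !done.has(step.slug));
  }
  return waves;
}

const waves = executionWaves([
  { slug: 'fetch', dependsOn: [] },
  { slug: 'parse', dependsOn: ['fetch'] },
  { slug: 'validate', dependsOn: ['fetch'] },
  { slug: 'store', dependsOn: ['parse', 'validate'] },
]);
// waves is [['fetch'], ['parse', 'validate'], ['store']]
```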

The flow lifecycle has distinct phases:

  1. Definition (TypeScript): Flow structure defined using the DSL
  2. Compilation: Conversion from TypeScript to SQL
  3. Registration: Storage of flow structure in the database
  4. Execution: Runtime processing of tasks based on the database definition

The TypeScript flow definition is used only for definition and compilation; at runtime, tasks are processed according to the definition stored in the database. This separation allows for versioning of flow definitions and persistence of workflow state in the database.

Here’s a practical example showing how data flows efficiently through steps:

// Flow with multiple input parameters
type Input = {
  userId: string,
  includeDetails: boolean,
  reportType: 'basic' | 'advanced'
};

new Flow<Input>({
  slug: 'user_report',
})
  // Step 1: Fetch user data
  .step(
    { slug: 'user' },
    async (input) => {
      return await fetchUser(input.run.userId);
    }
  )
  // Steps 2 & 3: Process user data in parallel
  .step(
    { slug: 'activity', dependsOn: ['user'] },
    async (input) => {
      // Uses input.run.reportType to determine timespan
      const timespan = input.run.reportType === 'advanced' ? '1y' : '30d';
      return await getUserActivity(input.user.id, timespan);
    }
  )
  .step(
    { slug: 'preferences', dependsOn: ['user'] },
    async (input) => {
      // Uses input.run.includeDetails parameter
      return await getUserPreferences(input.user.id, input.run.includeDetails);
    }
  )
  // Step 4: Combine results
  // 'user' is listed in dependsOn so that input.user is available here
  .step(
    { slug: 'report', dependsOn: ['user', 'activity', 'preferences'] },
    async (input) => {
      return {
        user: input.user,
        activity: input.activity,
        preferences: input.preferences,
        reportType: input.run.reportType, // Original parameter still available
        generatedAt: new Date().toISOString()
      };
    }
  );

This example demonstrates:

  1. Original parameters available throughout - Every step can access the input parameters
  2. Conditional processing - Steps adapt behavior based on original parameters
  3. No manual parameter forwarding needed - The user step doesn’t have to pass the original parameters along in its output
  4. Type safety throughout - TypeScript ensures all data accesses are valid

pgflow provides a powerful, type-safe way to define complex workflows:

  • Every step receives the original flow input via input.run
  • TypeScript’s type system ensures data flows correctly between steps
  • The functional approach keeps task implementation separate from flow orchestration
  • Parallel execution of independent steps is managed automatically
  • Dependencies between steps are handled transparently

The input.run design enables original parameters to be available throughout the workflow without manual forwarding, allowing steps to focus on their specific tasks while maintaining type safety.