
Understanding the Flow DSL

pgflow’s TypeScript Domain-Specific Language (DSL) is designed for defining type-safe, data-driven workflows. This guide explains how the DSL works and the principles behind its design.

The Flow DSL is built on several key principles:

  1. Type Safety - Complete type checking from flow inputs to outputs
  2. Functional Approach - Composable functions with clear inputs and outputs
  3. Separation of Concerns - Task logic separate from flow orchestration
  4. Fluent Interface - Chainable method calls that return new Flow instances

The pgflow DSL uses a fluent interface:

// Method chaining for flow definition
new Flow<Input>({ slug: 'my_flow' })
  .step({ slug: 'step1' }, async (input) => { /* ... */ })
  .step({ slug: 'step2' }, async (input) => { /* ... */ });

Each .step() call returns a new Flow instance and leaves the original unmodified, which keeps flow definitions predictable and testable. The entire flow is defined in a single declarative expression with no side effects.
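The "new instance per call" behaviour can be illustrated with a minimal sketch. MiniFlow below is a hypothetical toy class written for this guide, not pgflow's Flow implementation; it only models the idea that chaining never mutates an earlier definition.

```typescript
// Hypothetical toy builder, NOT pgflow's implementation: it only models
// the "each call returns a new instance" behaviour described above.
class MiniFlow {
  readonly slug: string;
  readonly stepSlugs: ReadonlyArray<string>;

  private constructor(slug: string, stepSlugs: ReadonlyArray<string>) {
    this.slug = slug;
    this.stepSlugs = stepSlugs;
  }

  static create(slug: string): MiniFlow {
    return new MiniFlow(slug, []);
  }

  // Returns a fresh MiniFlow; `this` is left untouched.
  step(stepSlug: string): MiniFlow {
    return new MiniFlow(this.slug, [...this.stepSlugs, stepSlug]);
  }
}

const base = MiniFlow.create('my_flow');
const extended = base.step('step1').step('step2');

console.log(base.stepSlugs.length); // 0
console.log(extended.stepSlugs);    // [ 'step1', 'step2' ]
```

Because base is never modified, it could be extended in two different directions without either definition affecting the other.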

In pgflow, every step receives a unified input object with two critical parts:

  1. input.run - The original flow input, available to ALL steps
  2. input.{stepSlug} - The output of each dependency step, keyed by that step’s slug

The input.run Field

Every step has access to the complete flow input via input.run, ensuring:

  • Original flow parameters are accessible throughout the flow
  • Data doesn’t need to be manually forwarded through intermediate steps
  • Steps can combine original input with processed data

Consider this example:

new Flow<{ url: string, userId: string }>({
  slug: 'analyze_website',
})
  .step(
    { slug: 'scrape' },
    async (input) => {
      // Access to input.run.url and input.run.userId
      return await scrapeWebsite(input.run.url);
    }
  )
  .step(
    { slug: 'analyze', dependsOn: ['scrape'] },
    async (input) => {
      // Still has access to input.run.userId
      // Now also has access to input.scrape
      return await analyzeContent(input.scrape.content);
    }
  );

When this flow runs:

  1. The flow receives an input object (e.g., { url: "example.com", userId: "123" })
  2. Each step receives both the original input via input.run and the outputs of any dependency steps
  3. Steps can combine original parameters with processed data from previous steps
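As a rough illustration of the list above, the sketch below assembles the object a step handler would receive. buildStepInput and the Json type are hypothetical helpers invented for this example; pgflow builds the input internally and its actual mechanism is not shown here.

```typescript
type Json = string | number | boolean | null | Json[] | { [key: string]: Json };

// Hypothetical helper (not a pgflow API): combines the flow input with the
// outputs of the listed dependencies into one handler input object.
function buildStepInput(
  run: Json,
  dependsOn: string[],
  outputs: Record<string, Json>,
): Record<string, Json> {
  const input: Record<string, Json> = { run };
  for (const slug of dependsOn) {
    const output = outputs[slug];
    if (output === undefined) {
      throw new Error(`dependency "${slug}" has not completed yet`);
    }
    input[slug] = output;
  }
  return input;
}

const runInput = { url: 'example.com', userId: '123' };
const completed = { scrape: { content: '<html>...</html>' } };

// 'scrape' has no dependencies, so its handler only sees `run`.
const scrapeInput = buildStepInput(runInput, [], completed);
// 'analyze' depends on 'scrape', so its handler sees `run` and `scrape`.
const analyzeInput = buildStepInput(runInput, ['scrape'], completed);

console.log(Object.keys(scrapeInput));  // [ 'run' ]
console.log(Object.keys(analyzeInput)); // [ 'run', 'scrape' ]
```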

One of pgflow’s most powerful features is its type system, which ensures type safety across the entire workflow:

// Define input type for the flow
type WebsiteInput = { url: string, userId: string };

// Create a flow with that input type
new Flow<WebsiteInput>({
  slug: 'analyze_website',
})
  .step(
    { slug: 'scrape' },
    async (input) => {
      // input.run is typed as WebsiteInput
      return { content: "..." };
    }
  )
  .step(
    { slug: 'analyze', dependsOn: ['scrape'] },
    async (input) => {
      // input.scrape is typed based on the scrape step's return type
      return { analysis: "..." };
    }
  );

The type system automatically:

  • Enforces the correct input type for the flow
  • Makes the flow input available as input.run in every step
  • Tracks each step’s output type and makes it available to dependent steps
  • Provides IDE autocompletion and catches type errors at compile time
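To make the idea of a typed step input concrete, here is a small sketch. StepInput and ScrapeOutput are hypothetical types written for this example; pgflow's real type definitions are more involved and are inferred from the flow definition rather than written by hand.

```typescript
// Hypothetical types for illustration only, not pgflow's own definitions.
type StepInput<Run, Deps extends Record<string, unknown>> = { run: Run } & Deps;

type WebsiteInput = { url: string; userId: string };
type ScrapeOutput = { content: string };

// Handler for a step that depends on 'scrape'.
function analyze(input: StepInput<WebsiteInput, { scrape: ScrapeOutput }>) {
  // Both property accesses are checked by the compiler.
  const words = input.scrape.content.split(/\s+/).filter(Boolean).length;
  return { userId: input.run.userId, words };
}

const result = analyze({
  run: { url: 'example.com', userId: '123' },
  scrape: { content: 'hello typed world' },
});

console.log(result); // { userId: '123', words: 3 }

// Accessing a step that is not a declared dependency, for example
// `input.summary` inside `analyze`, would be rejected at compile time.
```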

The Flow DSL encourages a functional approach: task logic lives in plain functions with clear inputs and outputs, so the same task can be reused across different flows.
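A brief sketch of what such reuse might look like. truncateWords, summarizeArticle, and previewComment are hypothetical names chosen for this example; the handlers are written as standalone functions so the snippet runs without pgflow.

```typescript
// Hypothetical task function: plain input and output, unaware of any flow.
function truncateWords(text: string, maxWords: number): string {
  return text.split(/\s+/).filter(Boolean).slice(0, maxWords).join(' ');
}

// Two hypothetical step handlers, as they might appear in different flows,
// reusing the same task with different parameters.
const summarizeArticle = (input: { run: { body: string } }) => ({
  summary: truncateWords(input.run.body, 5),
});

const previewComment = (input: { run: { comment: string } }) => ({
  preview: truncateWords(input.run.comment, 3),
});

console.log(summarizeArticle({ run: { body: 'one two three four five six seven' } }));
// { summary: 'one two three four five' }
console.log(previewComment({ run: { comment: 'nice post, thanks a lot' } }));
// { preview: 'nice post, thanks' }
```

Keeping the task free of flow-specific details also makes it straightforward to unit test on its own.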

All step inputs and outputs MUST be serializable to JSON since pgflow stores these values in JSONB database columns:

// GOOD: Fully serializable objects
.step(
  { slug: 'processData' },
  async (input) => {
    return {
      count: 42,
      items: ["apple", "banana"],
      metadata: {
        processed: true,
        timestamp: new Date().toISOString() // Convert Date to string
      }
    };
  }
)

// BAD: Non-serializable types will cause runtime errors
.step(
  { slug: 'badExample' },
  async (input) => {
    return {
      date: new Date(), // Use toISOString() instead
      regex: /test/, // Not serializable
      function: () => {} // Functions can't be serialized
    };
  }
)

  • Use: primitive types (strings, numbers, booleans, null), plain objects, and arrays
  • Convert dates to ISO strings: new Date().toISOString()
  • Avoid: class instances, functions, symbols, undefined, and circular references
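The sketch below shows what standard JSON serialization does to such values, followed by a simple guard that could be run in tests. It assumes values round-trip through JSON.stringify and JSON.parse; how pgflow itself serializes step output is not shown here, and isJsonSafe is a hypothetical helper, not part of pgflow.

```typescript
// What plain JSON serialization does to the "bad" values.
const bad = {
  date: new Date('2024-01-01T00:00:00.000Z'),
  regex: /test/,
  fn: () => {},
  missing: undefined,
};
const roundTripped = JSON.parse(JSON.stringify(bad));
console.log(roundTripped);
// { date: '2024-01-01T00:00:00.000Z', regex: {} }
// The Date became a string, the RegExp became an empty object, and the
// function and undefined properties were dropped.

// Hypothetical guard: true only for values that survive JSON unchanged.
function isJsonSafe(value: unknown, ancestors: object[] = []): boolean {
  switch (typeof value) {
    case 'string':
    case 'boolean':
      return true;
    case 'number':
      return Number.isFinite(value); // NaN and Infinity become null in JSON
    case 'object': {
      if (value === null) return true;
      if (ancestors.includes(value)) return false; // circular reference
      const proto = Object.getPrototypeOf(value);
      const isPlain =
        Array.isArray(value) || proto === Object.prototype || proto === null;
      if (!isPlain) return false; // Date, RegExp, Map, class instances, ...
      return Object.values(value).every((v) =>
        isJsonSafe(v, [...ancestors, value]),
      );
    }
    default:
      return false; // undefined, function, symbol, bigint
  }
}

console.log(isJsonSafe({ count: 42, items: ['apple', 'banana'] })); // true
console.log(isJsonSafe(bad)); // false
```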

Steps are connected through explicit dependencies:

.step(
  { slug: 'summary', dependsOn: ['website'] },
  // Function implementation...
)

When a flow runs:

  1. pgflow runs steps with no dependencies (“root steps”) first
  2. As steps complete, dependent steps whose dependencies are satisfied become eligible to run
  3. Steps that do not depend on each other can run in parallel once their own dependencies are satisfied
  4. The flow completes when all steps have completed or failed

This execution model maximizes parallelism when possible, ensures proper ordering of operations, and handles retries and failures automatically.
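The ordering rules above can be approximated with a small planner that groups steps into "waves". This is a simplified sketch: planWaves, StepDef, and the step slugs other than 'website' and 'summary' are hypothetical, and a real run does not wait for a whole wave to finish, since each step becomes eligible as soon as its own dependencies complete.

```typescript
type StepDef = { slug: string; dependsOn: string[] };

// Groups steps into waves: every step in a wave has all of its dependencies
// in earlier waves, so steps within one wave could run in parallel.
function planWaves(steps: StepDef[]): string[][] {
  const done = new Set<string>();
  const waves: string[][] = [];
  let remaining = [...steps];

  while (remaining.length > 0) {
    const ready = remaining.filter((s) => s.dependsOn.every((d) => done.has(d)));
    if (ready.length === 0) {
      throw new Error('cycle or unknown dependency detected');
    }
    waves.push(ready.map((s) => s.slug));
    for (const s of ready) done.add(s.slug);
    remaining = remaining.filter((s) => !done.has(s.slug));
  }
  return waves;
}

const waves = planWaves([
  { slug: 'website', dependsOn: [] },
  { slug: 'summary', dependsOn: ['website'] },
  { slug: 'sentiment', dependsOn: ['website'] },
  { slug: 'save', dependsOn: ['summary', 'sentiment'] },
]);

console.log(waves);
// [ [ 'website' ], [ 'summary', 'sentiment' ], [ 'save' ] ]
```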

The flow lifecycle has distinct phases:

  1. Definition (TypeScript): Flow structure defined using the DSL
  2. Compilation: Conversion from TypeScript to SQL
  3. Registration: Storage of flow structure in the database
  4. Execution: Runtime processing of tasks based on the database definition

The TypeScript DSL is used only for definition and compilation, not for execution. This separation allows for versioning of flow definitions and persistence of workflow state in the database.
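To give a feel for the compilation phase, the sketch below turns a flow's structure (slugs and dependencies only, no handler code) into SQL text. Treat every detail as an assumption: toSqlStatements, FlowShape, and StepShape are hypothetical, and the SQL function names pgflow.create_flow and pgflow.add_step and their argument order are assumed for illustration, so check the SQL that pgflow actually generates before relying on them.

```typescript
type StepShape = { slug: string; dependsOn: string[] };
type FlowShape = { slug: string; steps: StepShape[] };

// Slugs are interpolated into SQL text, so this sketch only accepts
// identifier-like slugs (letters, digits, underscores).
function assertSlug(slug: string): string {
  if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(slug)) {
    throw new Error(`invalid slug: ${slug}`);
  }
  return slug;
}

// Hypothetical compiler: the emitted SQL function names are assumptions.
function toSqlStatements(flow: FlowShape): string[] {
  const flowSlug = assertSlug(flow.slug);
  const statements = [`SELECT pgflow.create_flow('${flowSlug}');`];
  for (const step of flow.steps) {
    const stepSlug = assertSlug(step.slug);
    const deps = step.dependsOn.map((d) => `'${assertSlug(d)}'`).join(', ');
    statements.push(
      step.dependsOn.length === 0
        ? `SELECT pgflow.add_step('${flowSlug}', '${stepSlug}');`
        : `SELECT pgflow.add_step('${flowSlug}', '${stepSlug}', ARRAY[${deps}]);`,
    );
  }
  return statements;
}

const statements = toSqlStatements({
  slug: 'analyze_website',
  steps: [
    { slug: 'website', dependsOn: [] },
    { slug: 'summary', dependsOn: ['website'] },
  ],
});

console.log(statements.join('\n'));
```

Only the shape of the flow appears in the output, which matches the point above: the database stores the structure, while handler code stays in TypeScript.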

Here’s a practical example showing how data flows efficiently through steps:

// Flow with multiple input parameters
type Input = {
  userId: string,
  includeDetails: boolean,
  reportType: 'basic' | 'advanced'
};

new Flow<Input>({
  slug: 'user_report',
})
  // Step 1: Fetch user data
  .step(
    { slug: 'user' },
    async (input) => {
      return await fetchUser(input.run.userId);
    }
  )
  // Steps 2 & 3: Process user data in parallel
  .step(
    { slug: 'activity', dependsOn: ['user'] },
    async (input) => {
      // Uses input.run.reportType to determine timespan
      const timespan = input.run.reportType === 'advanced' ? '1y' : '30d';
      return await getUserActivity(input.user.id, timespan);
    }
  )
  .step(
    { slug: 'preferences', dependsOn: ['user'] },
    async (input) => {
      // Uses input.run.includeDetails parameter
      return await getUserPreferences(input.user.id, input.run.includeDetails);
    }
  )
  // Step 4: Combine results
  .step(
    // 'user' is listed as a dependency so that input.user is available here
    { slug: 'report', dependsOn: ['user', 'activity', 'preferences'] },
    async (input) => {
      return {
        user: input.user,
        activity: input.activity,
        preferences: input.preferences,
        reportType: input.run.reportType, // Original parameter still available
        generatedAt: new Date().toISOString()
      };
    }
  );

This example demonstrates:

  1. Original parameters available throughout - Every step can access the input parameters
  2. Conditional processing - Steps adapt behavior based on original parameters
  3. No manual parameter forwarding needed - The user step doesn’t need to pass the original parameters along in its output
  4. Type safety throughout - TypeScript ensures all data accesses are valid

The pgflow DSL provides a powerful, type-safe way to define complex workflows:

  • Every step receives the original flow input via input.run
  • TypeScript’s type system ensures data flows correctly between steps
  • The functional approach keeps task implementation separate from flow orchestration
  • Parallel execution of independent steps is managed automatically
  • Dependencies between steps are handled transparently

The input.run design enables original parameters to be available throughout the workflow without manual forwarding, allowing steps to focus on their specific tasks while maintaining type safety. With these principles, you can design efficient, maintainable workflows that take full advantage of pgflow’s capabilities.