Understanding the Flow DSL
pgflow’s TypeScript Domain Specific Language (DSL) is designed to create type-safe, data-driven workflows. This guide explains how the DSL works and the principles behind its design.
The Flow DSL Philosophy
The Flow DSL is built on several key principles:
- Type Safety - Complete type checking from flow inputs to outputs
- Functional Approach - Composable functions with clear inputs and outputs
- Separation of Concerns - Task logic separate from flow orchestration
- Fluent Interface - Chainable method calls that return new Flow instances
The pgflow DSL uses a fluent interface:
// Method chaining for flow definition
new Flow<Input>({ slug: 'my_flow' })
  .step({ slug: 'step1' }, async (input) => { /* ... */ })
  .step({ slug: 'step2' }, async (input) => { /* ... */ });
Each .step() call creates a new Flow instance without modifying the original, which keeps flow definitions predictable and testable. The entire flow is defined in a single declarative expression with no side effects.
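The immutability described above can be illustrated with a minimal sketch. `MiniFlow` and `StepDef` are hypothetical stand-ins written for this example. They are not pgflow's `Flow` class, and they only model the "return a new instance" behavior.

```typescript
// Hypothetical stand-in for illustration; this is not pgflow's Flow class.
type StepDef = { slug: string; dependsOn: string[] };

class MiniFlow {
  readonly slug: string;
  readonly steps: readonly StepDef[];

  constructor(slug: string, steps: readonly StepDef[] = []) {
    this.slug = slug;
    this.steps = steps;
  }

  // Returns a new MiniFlow with one more step; `this` is left untouched.
  step(def: { slug: string; dependsOn?: string[] }): MiniFlow {
    const next: StepDef = { slug: def.slug, dependsOn: def.dependsOn ?? [] };
    return new MiniFlow(this.slug, [...this.steps, next]);
  }
}

const base = new MiniFlow('my_flow');
const withOne = base.step({ slug: 'step1' });
const withTwo = withOne.step({ slug: 'step2', dependsOn: ['step1'] });

// Earlier instances keep their original step lists.
console.log(base.steps.length, withOne.steps.length, withTwo.steps.length); // 0 1 2
```

Because every call returns a fresh object, a partially built definition can be reused as the starting point for several variants without one variant affecting another.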
Understanding Step Inputs and Data Flow
In pgflow, every step receives a unified input object with two critical parts:
- input.run - The original flow input, available to ALL steps
- input.{stepName} - Outputs from any dependency steps
The input.run Field
Every step has access to the complete flow input via input.run, ensuring:
- Original flow parameters are accessible throughout the flow
- Data doesn’t need to be manually forwarded through intermediate steps
- Steps can combine original input with processed data
Consider this example:
new Flow<{ url: string, userId: string }>({
  slug: 'analyze_website',
})
  .step(
    { slug: 'scrape' },
    async (input) => {
      // Access to input.run.url and input.run.userId
      return await scrapeWebsite(input.run.url);
    }
  )
  .step(
    { slug: 'analyze', dependsOn: ['scrape'] },
    async (input) => {
      // Still has access to input.run.userId
      // Now also has access to input.scrape
      return await analyzeContent(input.scrape.content);
    }
  );
When this flow runs:
- The flow receives an input object (e.g., { url: "example.com", userId: "123" })
- Each step receives both the original input via input.run and the outputs of any dependency steps
- Steps can combine original parameters with processed data from previous steps
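To make the shape of the step input concrete, here is a minimal sketch of how such an object could be assembled. `buildStepInput` and the `Json` type are hypothetical helpers invented for this illustration. pgflow builds step inputs internally, and its actual mechanism is not shown here.

```typescript
// Hypothetical helper for illustration; pgflow assembles step inputs internally.
type Json = string | number | boolean | null | Json[] | { [key: string]: Json };

function buildStepInput(
  run: Json,
  completedOutputs: Record<string, Json>,
  dependsOn: string[],
): Record<string, Json> {
  // Every step sees the original flow input under `run`.
  const input: Record<string, Json> = { run };
  // Only declared dependencies are exposed, keyed by step slug.
  for (const slug of dependsOn) {
    const output = completedOutputs[slug];
    if (output === undefined) {
      throw new Error(`dependency "${slug}" has not completed`);
    }
    input[slug] = output;
  }
  return input;
}

const run = { url: 'example.com', userId: '123' };
const outputs = { scrape: { content: '<html>...</html>' } };

const scrapeInput = buildStepInput(run, outputs, []);
const analyzeInput = buildStepInput(run, outputs, ['scrape']);

console.log(Object.keys(scrapeInput));  // [ 'run' ]
console.log(Object.keys(analyzeInput)); // [ 'run', 'scrape' ]
```

In this sketch the root step sees only `run`, while the dependent step sees `run` plus the output of each step it lists in `dependsOn`.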
The Type System
One of pgflow’s most powerful features is its type system, which ensures type safety across the entire workflow:
// Define input type for the flow
type WebsiteInput = { url: string, userId: string };

// Create a flow with that input type
new Flow<WebsiteInput>({
  slug: 'analyze_website',
})
  .step(
    { slug: 'scrape' },
    async (input) => {
      // input.run is typed as WebsiteInput
      return { content: "..." };
    }
  )
  .step(
    { slug: 'analyze', dependsOn: ['scrape'] },
    async (input) => {
      // input.scrape is typed based on the scrape step's return type
      return { analysis: "..." };
    }
  );
The type system automatically:
- Enforces the correct input type for the flow
- Makes the flow input available as input.run in every step
- Tracks each step’s output type and makes it available to dependent steps
- Provides IDE autocompletion and catches type errors at compile time
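One way a builder can track step output types is by accumulating them in a generic parameter, as in the simplified sketch below. `TypedFlow` is a hypothetical class written for this illustration. pgflow's real type definitions are more involved and may be structured differently.

```typescript
// Hypothetical, simplified typing sketch; pgflow's real type definitions differ.
type WebsiteInput = { url: string; userId: string };

class TypedFlow<TRun, TSteps = {}> {
  readonly slugs: readonly string[];

  constructor(slugs: readonly string[] = []) {
    this.slugs = slugs;
  }

  // TSteps grows by one entry per call: the new slug mapped to the handler's
  // (awaited) return type. Handlers only see `run` plus declared dependencies.
  step<TSlug extends string, TOut, TDeps extends keyof TSteps & string = never>(
    opts: { slug: TSlug; dependsOn?: TDeps[] },
    handler: (input: { run: TRun } & Pick<TSteps, TDeps>) => TOut,
  ): TypedFlow<TRun, TSteps & Record<TSlug, Awaited<TOut>>> {
    void handler; // only the types matter in this sketch
    return new TypedFlow<TRun, TSteps & Record<TSlug, Awaited<TOut>>>([
      ...this.slugs,
      opts.slug,
    ]);
  }
}

const typedFlow = new TypedFlow<WebsiteInput>()
  .step({ slug: 'scrape' }, async (input) => {
    const url: string = input.run.url; // input.run is WebsiteInput
    return { content: `<html>${url}</html>` };
  })
  .step({ slug: 'analyze', dependsOn: ['scrape'] }, async (input) => {
    const content: string = input.scrape.content; // typed from scrape's return
    // Accessing input.missing here would be rejected by the compiler.
    return { length: content.length };
  });

console.log(typedFlow.slugs); // [ 'scrape', 'analyze' ]
```

The design choice in this sketch is that a step which omits `dependsOn` gets `TDeps = never`, so its handler can read `input.run` but no other step's output.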
Task Implementation
The Flow DSL encourages a functional programming approach in which tasks are plain functions that can be reused across different flows.
JSON Serialization Requirements
All step inputs and outputs MUST be serializable to JSON since pgflow stores these values in JSONB database columns:
// GOOD: Fully serializable objects
.step(
  { slug: 'processData' },
  async (input) => {
    return {
      count: 42,
      items: ["apple", "banana"],
      metadata: {
        processed: true,
        timestamp: new Date().toISOString() // Convert Date to string
      }
    };
  }
)

// BAD: Non-serializable types will cause runtime errors
.step(
  { slug: 'badExample' },
  async (input) => {
    return {
      date: new Date(),     // Use toISOString() instead
      regex: /test/,        // Not serializable
      function: () => {}    // Functions can't be serialized
    };
  }
)
Guidelines for JSON-Compatible Data
- Use: primitive types (strings, numbers, booleans, null), plain objects, and arrays
- Convert dates to ISO strings: new Date().toISOString()
- Avoid: class instances, functions, symbols, undefined, and circular references
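The guidelines above can be checked before a value is returned from a step. The sketch below first shows what a standard `JSON.stringify` round trip does to problematic values, then defines `isJsonSafe`, a hypothetical guard written for this example (it is not part of pgflow). How pgflow itself reacts to such values is not covered here. This only demonstrates standard JavaScript behavior.

```typescript
// What a plain JSON round trip does to values like those in the "BAD" example.
const risky = {
  date: new Date('2024-01-01T00:00:00.000Z'),
  regex: /test/,
  fn: () => 42,
  missing: undefined,
};

const roundTripped = JSON.parse(JSON.stringify(risky));
// The Date became a string, the RegExp an empty object, and the function and
// undefined properties were dropped entirely.
console.log(roundTripped); // { date: '2024-01-01T00:00:00.000Z', regex: {} }

// Hypothetical guard: true only for values that survive JSON unchanged.
function isJsonSafe(value: unknown, ancestors: object[] = []): boolean {
  if (value === null || typeof value === 'string' || typeof value === 'boolean') {
    return true;
  }
  if (typeof value === 'number') return Number.isFinite(value); // NaN/Infinity become null
  if (typeof value !== 'object') return false; // undefined, function, symbol, bigint
  const obj = value as object;
  if (ancestors.includes(obj)) return false; // circular reference
  const proto = Object.getPrototypeOf(obj);
  const isPlain = Array.isArray(obj) || proto === Object.prototype || proto === null;
  if (!isPlain) return false; // Date, RegExp, Map, class instances, ...
  return Object.values(obj).every((child) => isJsonSafe(child, [...ancestors, obj]));
}

console.log(isJsonSafe({ count: 42, items: ['apple', 'banana'] })); // true
console.log(isJsonSafe(risky)); // false
```

A guard like this could be used in unit tests for task functions so that serialization problems surface before a flow is ever run.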
How Steps Connect and Execute
Steps are connected through explicit dependencies:
.step(
  { slug: 'summary', dependsOn: ['website'] },
  // Function implementation...
)
When a flow runs:
- pgflow runs steps with no dependencies (“root steps”) first
- As steps complete, dependent steps whose dependencies are satisfied become eligible to run
- Steps with the same dependencies can run in parallel
- The flow completes when all steps have completed or failed
This execution model maximizes parallelism when possible, ensures proper ordering of operations, and handles retries and failures automatically.
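The ordering rules above can be sketched as a small dependency resolver. `executionWaves` and the step slugs are hypothetical and exist only for this illustration. Grouping steps into lockstep "waves" is also a simplification, since the text above says a step becomes eligible as soon as its own dependencies are satisfied.

```typescript
// Hypothetical resolver sketch; it only illustrates dependency ordering and is
// not how pgflow schedules work internally.
type StepNode = { slug: string; dependsOn: string[] };

function executionWaves(steps: StepNode[]): string[][] {
  const done = new Set<string>();
  const waves: string[][] = [];
  let remaining = [...steps];

  while (remaining.length > 0) {
    // A step is ready once every one of its dependencies has completed.
    const ready = remaining.filter((s) => s.dependsOn.every((d) => done.has(d)));
    if (ready.length === 0) {
      throw new Error('cycle or unknown dependency in flow definition');
    }
    // Steps in the same wave do not depend on each other and could run in parallel.
    waves.push(ready.map((s) => s.slug));
    for (const s of ready) done.add(s.slug);
    remaining = remaining.filter((s) => !done.has(s.slug));
  }
  return waves;
}

const waves = executionWaves([
  { slug: 'fetch', dependsOn: [] },
  { slug: 'summarize', dependsOn: ['fetch'] },
  { slug: 'tag', dependsOn: ['fetch'] },
  { slug: 'publish', dependsOn: ['summarize', 'tag'] },
]);

console.log(waves); // [ [ 'fetch' ], [ 'summarize', 'tag' ], [ 'publish' ] ]
```

Here `fetch` is the root step, `summarize` and `tag` share the same dependency and land in the same wave, and `publish` waits for both.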
Flow Definition vs. Execution
The flow lifecycle has distinct phases:
- Definition (TypeScript): Flow structure defined using the DSL
- Compilation: Conversion from TypeScript to SQL
- Registration: Storage of flow structure in the database
- Execution: Runtime processing of tasks based on the database definition
The TypeScript DSL is used only for definition and compilation, not for execution. This separation allows for versioning of flow definitions and persistence of workflow state in the database.
Example: Multi-Step Data Flow
Here’s a practical example showing how data flows efficiently through steps:
// Flow with multiple input parameters
type Input = {
  userId: string,
  includeDetails: boolean,
  reportType: 'basic' | 'advanced'
};

new Flow<Input>({
  slug: 'user_report',
})
  // Step 1: Fetch user data
  .step(
    { slug: 'user' },
    async (input) => {
      return await fetchUser(input.run.userId);
    }
  )
  // Steps 2 & 3: Process user data in parallel
  .step(
    { slug: 'activity', dependsOn: ['user'] },
    async (input) => {
      // Uses input.run.reportType to determine timespan
      const timespan = input.run.reportType === 'advanced' ? '1y' : '30d';
      return await getUserActivity(input.user.id, timespan);
    }
  )
  .step(
    { slug: 'preferences', dependsOn: ['user'] },
    async (input) => {
      // Uses input.run.includeDetails parameter
      return await getUserPreferences(input.user.id, input.run.includeDetails);
    }
  )
  // Step 4: Combine results
  .step(
    // 'user' is listed as a dependency so that input.user is available here
    { slug: 'report', dependsOn: ['user', 'activity', 'preferences'] },
    async (input) => {
      return {
        user: input.user,
        activity: input.activity,
        preferences: input.preferences,
        reportType: input.run.reportType, // Original parameter still available
        generatedAt: new Date().toISOString()
      };
    }
  );
This example demonstrates:
- Original parameters available throughout - Every step can access the input parameters
- Conditional processing - Steps adapt behavior based on original parameters
- No manual parameter forwarding needed - The user step doesn’t need to include original parameters
- Type safety throughout - TypeScript ensures all data accesses are valid
Summary
The pgflow DSL provides a powerful, type-safe way to define complex workflows:
- Every step receives the original flow input via input.run
- TypeScript’s type system ensures data flows correctly between steps
- The functional approach keeps task implementation separate from flow orchestration
- Parallel execution of independent steps is managed automatically
- Dependencies between steps are handled transparently
The input.run design enables original parameters to be available throughout the workflow without manual forwarding, allowing steps to focus on their specific tasks while maintaining type safety. With these principles, you can design efficient, maintainable workflows that take full advantage of pgflow’s capabilities.