Organize Flows code

This guide outlines best practices for organizing your pgflow codebase to improve maintainability, reusability, and clarity.

A well-organized pgflow project typically follows this structure:

  • supabase/
    • functions/
      • _flows/
        • analyze_website.ts
      • _tasks/
        • scrapeWebsite.ts
        • summarizeWithAI.ts
        • extractTags.ts
        • saveWebsite.ts
      • utils.ts

You can create these directories with:

mkdir -p supabase/functions/_flows supabase/functions/_tasks

This organization separates your codebase into two key parts:

  • _tasks/ - Contains small, focused functions that each perform a single unit of work with clear inputs and outputs
  • _flows/ - Contains definitions that compose these tasks into directed acyclic graphs (DAGs), defining data dependencies between tasks

Tasks are modular, reusable functions designed for a specific purpose, while flows define the execution order, parallelism, and data transformations between tasks. The flow orchestrates how data moves through the computational graph.

When organizing your codebase, task design is critical to ensure reusability and maintainability.

Example: Website Analysis Flow Implementation

Let’s look at how a real-world workflow might be organized:

supabase/functions/_tasks/scrapeWebsite.ts
/**
 * Fetches website content from a URL
 *
 * For a real implementation, see the demo app:
 * https://github.com/pgflow-dev/pgflow/tree/main/examples/playground/supabase/functions/_tasks/scrapeWebsite.ts
 */
export default async function scrapeWebsite(url: string) {
  console.log(`Fetching content from: ${url}`);
  // In a real implementation, this would fetch and process actual website content
  // This simplified version returns mock data based on the URL
  return {
    content: `Sample content from ${url}. This is a placeholder for real website content.
The website discusses various topics including technology, data processing, and workflows.
In a production app, this would be actual content scraped from the URL.`,
  };
}
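The remaining tasks follow the same pattern: a single default-exported async function with a clear input and output. As a hypothetical sketch (the real implementation lives in the demo app linked above), extractTags could use naive word-frequency counting; what matters for the flow is its `{ keywords }` return shape:

```typescript
/**
 * Extracts tags from website content.
 *
 * Hypothetical sketch — a production version might call an AI model
 * or a tagging service instead of this naive word-frequency approach.
 */
export default async function extractTags(content: string) {
  // Count words of 5+ letters and keep the most frequent ones as tags
  const counts = new Map<string, number>();
  for (const word of content.toLowerCase().match(/[a-z]{5,}/g) ?? []) {
    counts.set(word, (counts.get(word) ?? 0) + 1);
  }
  const keywords = [...counts.entries()]
    .sort((a, b) => b[1] - a[1])
    .slice(0, 5)
    .map(([word]) => word);
  return { keywords };
}
```

Because the task returns a plain object, callers (including flow steps) can destructure exactly the field they need.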

And the flow definition that ties everything together:

supabase/functions/_flows/analyze_website.ts
import { Flow } from 'npm:@pgflow/dsl';
import scrapeWebsite from '../_tasks/scrapeWebsite.ts';
import summarizeWithAI from '../_tasks/summarizeWithAI.ts';
import extractTags from '../_tasks/extractTags.ts';
import saveWebsite from '../_tasks/saveWebsite.ts';

type Input = {
  url: string;
};

export default new Flow<Input>({
  slug: 'analyze_website',
  maxAttempts: 3,
  timeout: 4,
  baseDelay: 1,
})
  .step(
    { slug: 'website' },
    async (input) => await scrapeWebsite(input.run.url),
  )
  .step(
    { slug: 'summary', dependsOn: ['website'] },
    async (input) => await summarizeWithAI(input.website.content),
  )
  .step({ slug: 'tags', dependsOn: ['website'] }, async (input) => {
    const { keywords } = await extractTags(input.website.content);
    return keywords;
  })
  .step({ slug: 'saveToDb', dependsOn: ['summary', 'tags'] }, async (input) => {
    const websiteData = {
      website_url: input.run.url,
      summary: input.summary,
      tags: input.tags,
    };
    const { website } = await saveWebsite(websiteData);
    return website;
  });

Organizing your code this way provides several benefits:

  1. Reusability: Tasks can be reused across multiple flows
  2. Testability: Individual tasks can be tested in isolation
  3. Maintainability: Easier to understand, debug, and update
  4. Separation of concerns: Clear boundaries between logic and orchestration
  5. Versioning: Simplifies flow versioning while maintaining task compatibility
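To illustrate benefit 2, a task can be exercised with a plain assertion, with no flow runtime or database involved. The mock scrapeWebsite task is inlined here so the sketch is self-contained; in a real project you would import it from ../_tasks/scrapeWebsite.ts instead:

```typescript
// Inlined copy of the mock scrapeWebsite task from above, so this
// sketch runs standalone.
async function scrapeWebsite(url: string) {
  return {
    content: `Sample content from ${url}. This is a placeholder for real website content.`,
  };
}

// The task is just an async function, so testing it needs no pgflow machinery.
const { content } = await scrapeWebsite('https://example.com');
if (!content.includes('https://example.com')) {
  throw new Error('scrapeWebsite should echo the URL in its mock content');
}
```

The same pattern applies to every task in _tasks/: call the function directly with representative input and assert on its return value.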