Organize Flows code
This guide outlines best practices for organizing your pgflow codebase to improve maintainability, reusability, and clarity.
Recommended Project Structure
A well-organized pgflow project typically follows this structure:
mkdir -p supabase/functions/_flows supabase/functions/_tasks
supabase/
  functions/
    _flows/
      analyze_website.ts
    _tasks/
      scrapeWebsite.ts
      summarizeWithAI.ts
      extractTags.ts
      saveWebsite.ts
    utils.ts
This organization separates your codebase into two key parts:
_tasks/
- Contains small, focused functions that each perform a single unit of work with clear inputs and outputs
_flows/
- Contains definitions that compose these tasks into directed acyclic graphs (DAGs), defining data dependencies between tasks
Tasks are modular, reusable functions designed for a specific purpose, while flows define the execution order, parallelism, and data transformations between tasks. The flow orchestrates how data moves through the computational graph.
Task Design
When organizing your codebase, task design is critical to ensure reusability and maintainability.
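As a rough illustration of what "reusable" can mean here, the sketch below shows a task that accepts only the value it needs instead of a flow-specific input object. The task `countWords`, the two input types, and the two step handlers are all hypothetical names invented for this example; they are not part of pgflow or of the project shown in this guide.

```typescript
// Hypothetical task: takes only the data it needs and returns plain,
// JSON-friendly data. It knows nothing about any particular flow.
async function countWords(
  content: string,
): Promise<{ words: number; characters: number }> {
  const trimmed = content.trim();
  const words = trimmed === "" ? 0 : trimmed.split(/\s+/).length;
  return { words, characters: content.length };
}

// Two hypothetical step handlers whose inputs have different shapes.
// Each handler adapts its own input, so the same task serves both.
type ArticleFlowInput = { run: { body: string } };
type WebsiteFlowInput = { website: { content: string } };

const articleStep = (input: ArticleFlowInput) => countWords(input.run.body);
const websiteStep = (input: WebsiteFlowInput) =>
  countWords(input.website.content);
```

Keeping the adaptation in the step handler means a change to one flow's input shape does not ripple into the task or into other flows that call it.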
Example: Website Analysis Flow Implementation
Let’s look at how a real-world workflow might be organized:
// supabase/functions/_tasks/scrapeWebsite.ts
/**
 * Fetches website content from a URL
 *
 * For a real implementation, see the demo app:
 * https://github.com/pgflow-dev/pgflow/tree/main/examples/playground/supabase/functions/_tasks/scrapeWebsite.ts
 */
export default async function scrapeWebsite(url: string) {
  console.log(`Fetching content from: ${url}`);

  // In a real implementation, this would fetch and process actual website content
  // This simplified version returns mock data based on the URL

  return {
    content: `Sample content from ${url}. This is a placeholder for real website content. The website discusses various topics including technology, data processing, and workflows. In a production app, this would be actual content scraped from the URL.`
  };
}
// supabase/functions/_tasks/summarizeWithAI.ts
/**
 * Summarizes text content using AI
 *
 * For a real implementation using Groq/OpenAI, see the demo app:
 * https://github.com/pgflow-dev/pgflow/tree/main/examples/playground/supabase/functions/_tasks/summarizeWithAI.ts
 */
export default async function summarizeWithAI(content: string) {
  console.log(`Summarizing ${content.length} chars of content`);

  // Simple function that generates a summary based on content length
  // In a real implementation, this would use an AI service API

  const length = content.length;
  let summary = "";

  if (length < 100) {
    summary = "Very short content about a website.";
  } else if (length < 500) {
    summary = "Website discussing technology and data workflows. The site includes information about processing data efficiently.";
  } else {
    summary = "Comprehensive website covering multiple aspects of technology, data processing workflows, and system architecture. The content explores efficient data handling methodologies and implementation patterns.";
  }

  return summary;
}
// supabase/functions/_tasks/extractTags.ts
/**
 * Extracts relevant tags from content
 *
 * For a real implementation using AI services, see the demo app:
 * https://github.com/pgflow-dev/pgflow/tree/main/examples/playground/supabase/functions/_tasks/extractTags.ts
 */
export default async function extractTags(content: string) {
  console.log(`Extracting tags from ${content.length} chars of content`);

  // Simple mock implementation that returns tags based on content
  // In a real implementation, this would use AI to analyze the content

  // Create a set of default tags
  const defaultTags = ["technology", "data", "workflow"];

  // Add additional tags based on content
  const additionalTags = [];
  if (content.includes("processing")) additionalTags.push("processing");
  if (content.includes("API") || content.includes("api")) additionalTags.push("api");
  if (content.includes("database") || content.includes("SQL")) additionalTags.push("database");

  return {
    keywords: [...defaultTags, ...additionalTags]
  };
}
// supabase/functions/_tasks/saveWebsite.ts
/**
 * Saves website data to the database
 *
 * For a real implementation using Supabase, see the demo app:
 * https://github.com/pgflow-dev/pgflow/tree/main/examples/playground/supabase/functions/_tasks/saveWebsite.ts
 */
export default async function saveWebsite(websiteData: {
  website_url: string;
  summary: string;
  tags: string[];
}) {
  console.log("Saving website data:", websiteData);

  // In a real implementation, this would save to a database
  // This simplified version just logs and returns mock data

  // Generate a mock ID from the current timestamp
  const id = `website_${Date.now()}`;

  return {
    success: true,
    website: {
      id,
      ...websiteData,
      created_at: new Date().toISOString()
    }
  };
}
And the flow definition that ties everything together:
// supabase/functions/_flows/analyze_website.ts
import { Flow } from 'npm:@pgflow/dsl';
import scrapeWebsite from '../_tasks/scrapeWebsite.ts';
import summarizeWithAI from '../_tasks/summarizeWithAI.ts';
import extractTags from '../_tasks/extractTags.ts';
import saveWebsite from '../_tasks/saveWebsite.ts';

type Input = {
  url: string;
};

export default new Flow<Input>({
  slug: 'analyze_website',
  maxAttempts: 3,
  timeout: 4,
  baseDelay: 1,
})
  .step(
    { slug: 'website' },
    async (input) => await scrapeWebsite(input.run.url),
  )
  .step(
    { slug: 'summary', dependsOn: ['website'] },
    async (input) => await summarizeWithAI(input.website.content),
  )
  .step({ slug: 'tags', dependsOn: ['website'] }, async (input) => {
    const { keywords } = await extractTags(input.website.content);
    return keywords;
  })
  .step({ slug: 'saveToDb', dependsOn: ['summary', 'tags'] }, async (input) => {
    const websiteData = {
      website_url: input.run.url,
      summary: input.summary,
      tags: input.tags,
    };
    const { website } = await saveWebsite(websiteData);

    return website;
  });
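To make the shape of this graph easier to see, the sketch below groups the four step slugs by when their dependencies are satisfied. It is only a mental model of what the `dependsOn` declarations imply, not pgflow's scheduler or any part of its API; the `steps` map is copied by hand from the flow definition, and `executionWaves` is a hypothetical helper written for this illustration.

```typescript
// Step slugs and their dependencies, copied by hand from the flow definition.
const steps: Record<string, string[]> = {
  website: [],
  summary: ["website"],
  tags: ["website"],
  saveToDb: ["summary", "tags"],
};

// Hypothetical helper: groups steps into "waves". Every step in a wave has
// all of its dependencies in earlier waves, so steps that share a wave do
// not depend on each other and could run in parallel.
function executionWaves(deps: Record<string, string[]>): string[][] {
  const done = new Set<string>();
  const waves: string[][] = [];
  let remaining = Object.keys(deps);

  while (remaining.length > 0) {
    const ready = remaining.filter((slug) =>
      (deps[slug] ?? []).every((dep) => done.has(dep)),
    );
    if (ready.length === 0) {
      // Nothing can start: the graph has a cycle or names an unknown step.
      throw new Error("Cycle or missing dependency detected");
    }
    ready.forEach((slug) => done.add(slug));
    waves.push(ready);
    remaining = remaining.filter((slug) => !done.has(slug));
  }
  return waves;
}

console.log(executionWaves(steps));
// [["website"], ["summary", "tags"], ["saveToDb"]]
```

Read this way, `summary` and `tags` are independent of each other once `website` has finished, and `saveToDb` has to wait for both.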
Benefits of This Organization
Organizing your code this way provides several benefits:
- Reusability: Tasks can be reused across multiple flows
- Testability: Individual tasks can be tested in isolation
- Maintainability: Easier to understand, debug, and update
- Separation of concerns: Clear boundaries between logic and orchestration
- Versioning: Simplifies flow versioning while maintaining task compatibility
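The testability point can be sketched as follows: because a task is an ordinary async function, it can be called directly with sample input, without running a flow. This sketch assumes the mock `extractTags` task from this guide; it is inlined here (without the logging) so the example is self-contained, whereas a real test would import it from `../_tasks/extractTags.ts`. The `assertEqual` helper is hypothetical, and a real project would more likely use its test runner's assertions.

```typescript
// Inlined copy of the mock extractTags task so the sketch runs on its own.
async function extractTags(content: string) {
  const defaultTags = ["technology", "data", "workflow"];
  const additionalTags: string[] = [];
  if (content.includes("processing")) additionalTags.push("processing");
  if (content.includes("API") || content.includes("api")) additionalTags.push("api");
  if (content.includes("database") || content.includes("SQL")) additionalTags.push("database");
  return { keywords: [...defaultTags, ...additionalTags] };
}

// Hypothetical minimal assertion helper that compares values as JSON.
function assertEqual(actual: unknown, expected: unknown, label: string): void {
  const a = JSON.stringify(actual);
  const e = JSON.stringify(expected);
  if (a !== e) throw new Error(`${label}: expected ${e}, got ${a}`);
}

// Calls the task directly; no flow, database, or worker is involved.
async function testExtractTags(): Promise<void> {
  const plain = await extractTags("Nothing special here");
  assertEqual(plain.keywords, ["technology", "data", "workflow"], "default tags");

  const rich = await extractTags("A REST API backed by a SQL database");
  assertEqual(
    rich.keywords,
    ["technology", "data", "workflow", "api", "database"],
    "content-based tags",
  );
}

testExtractTags().then(() => console.log("extractTags tests passed"));
```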