Process arrays in parallel
This guide shows practical patterns for using map steps to process collections of data in parallel, from simple transformations to complex batch operations.
Processing a List of URLs
A common use case is fetching and processing multiple web pages in parallel:
```typescript
import { Flow } from '@pgflow/dsl/supabase';

// scrapeWebpage and summarizeResults are your own helpers, not pgflow APIs.
const ScrapeMultipleUrls = new Flow<string[]>({ // Flow input must be an array for a root map
  slug: 'scrape_multiple_urls',
  maxAttempts: 3,
})
  .map(
    { slug: 'scrapedPages' }, // no `array` option: maps over the flow input
    async (url) => await scrapeWebpage(url)
  )
  .step(
    { slug: 'summary', dependsOn: ['scrapedPages'] },
    (input) => summarizeResults(input.scrapedPages)
  );
```
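Conceptually, the root map above calls its handler once per URL, and the `summary` step receives `input.scrapedPages` as an array with one result per URL, in the same order as the input. The order is assumed here, not stated by this guide. In pgflow the elements are processed as separate tasks. The sketch below only models the "one call per element, results in element order" behavior locally with `Promise.all`, and is not pgflow's implementation. `simulateMap` and `fakeScrape` are hypothetical names.

```typescript
// Hypothetical local model of map-step semantics; not pgflow's scheduler.
function simulateMap<T, R>(
  items: readonly T[],
  handler: (item: T) => Promise<R>,
) {
  // items.map starts every handler call immediately; Promise.all then
  // resolves to an array whose positions match `items`, even when the
  // calls finish out of order.
  return Promise.all(items.map((item) => handler(item)));
}

interface ScrapedPage {
  url: string;
  finishedAt: number; // completion counter, to show out-of-order finishing
}

let completed = 0;

// Hypothetical stand-in for scrapeWebpage: `.com` URLs are delayed so they
// finish last, yet they keep their original position in the result.
async function fakeScrape(url: string): Promise<ScrapedPage> {
  const delayMs = url.endsWith('.com') ? 20 : 0;
  await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
  completed += 1;
  return { url, finishedAt: completed };
}

const urls = ['https://example.com', 'https://example.org', 'https://example.net'];

simulateMap(urls, fakeScrape).then((pages) => {
  // Order matches `urls`, although example.com finished last.
  console.log(pages.map((page) => `${page.url} (finished #${page.finishedAt})`));
});
```

Because the output array lines up with the input array, a downstream step like `summary` can pair each result with its URL by index.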
Start the flow from SQL, passing the URL array as the flow input:

```sql
SELECT pgflow.start_flow(
  flow_slug => 'scrape_multiple_urls',
  input => '["https://example.com", "https://example.org", "https://example.net"]'::jsonb
);
```

Data Transformation Pipeline
Transform and validate data collections efficiently.
CSV Processing Pipeline
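The CSV flow in this section calls `fetchAndParseCSV`, `validateRow`, `transformIfValid`, and `saveProcessedData` without defining them. A map step produces one output per input element, so invalid rows presumably cannot just disappear between `validatedRows` and `processedRows`. One plausible design is to tag each row with its validation result and let the next map pass invalid rows through. This is an assumption, not something pgflow prescribes. The row shape (`name`, `email`) and both helper bodies below are hypothetical.

```typescript
// Hypothetical row shape produced by fetchAndParseCSV.
interface CsvRow {
  name: string;
  email: string;
}

// Each element keeps the original row plus any errors, so the array
// still has exactly one entry per input row.
type ValidatedRow =
  | { valid: true; row: CsvRow }
  | { valid: false; row: CsvRow; errors: string[] };

type ProcessedRow =
  | { valid: true; name: string; email: string }
  | { valid: false; errors: string[] };

// Hypothetical validateRow: reports problems instead of throwing, so one
// bad row does not fail the whole map step.
function validateRow(row: CsvRow): ValidatedRow {
  const errors: string[] = [];
  if (row.name.trim() === '') errors.push('name is empty');
  if (!row.email.includes('@')) errors.push('email is missing "@"');
  if (errors.length > 0) {
    return { valid: false, row, errors };
  }
  return { valid: true, row };
}

// Hypothetical transformIfValid: normalizes valid rows and passes invalid
// rows through with their errors rather than dropping them.
function transformIfValid(input: ValidatedRow): ProcessedRow {
  if (input.valid === false) {
    return { valid: false, errors: input.errors };
  }
  return {
    valid: true,
    name: input.row.name.trim(),
    email: input.row.email.trim().toLowerCase(),
  };
}

const rows: CsvRow[] = [
  { name: ' Ada ', email: 'ADA@example.com' },
  { name: '', email: 'no-at-sign' },
];

// First row comes out normalized; second row is kept with two errors.
console.log(rows.map(validateRow).map(transformIfValid));
```

Filtering out the invalid entries can then happen in a non-map step such as `saveResults`, which sees the whole array at once.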
```typescript
// fetchAndParseCSV, validateRow, transformIfValid, and saveProcessedData
// are application helpers, not pgflow APIs.
const CsvProcessor = new Flow<{ csvUrl: string }>({
  slug: 'csv_processor',
})
  .array(
    { slug: 'csvRows' },
    async (input) => await fetchAndParseCSV(input.run.csvUrl)
  )
  .map(
    { slug: 'validatedRows', array: 'csvRows' },
    (row) => validateRow(row)
  )
  .map(
    { slug: 'processedRows', array: 'validatedRows' },
    (row) => transformIfValid(row)
  )
  .step(
    { slug: 'saveResults', dependsOn: ['processedRows'] },
    async (input, context) => await saveProcessedData(input.processedRows, context)
  );
```

Enriching Array Elements with Additional Data
Map handlers only receive individual array elements. If a handler needs access to the original flow input (`run`), outputs from other dependencies, or any additional data, include that data in the array elements via a previous step.
Problem: Map Handlers Can’t Access Flow Context
```typescript
// This won't work: a map handler can't access input.run
// (a root map would also require the flow input itself to be an array)
const ProblemFlow = new Flow<{ apiKey: string, ids: string[] }>({
  slug: 'problem_flow',
})
  .map({ slug: 'fetch' }, async (id) => {
    // Can't access input.run.apiKey here!
    return await fetchWithKey(id, ???); // No access to apiKey
  });
```

Solution: Enrich Array Elements
```typescript
const SolutionFlow = new Flow<{ apiKey: string, ids: string[] }>({
  slug: 'solution_flow',
})
  .array(
    { slug: 'prepareItems' },
    (input) => {
      // Include needed context in each element
      return input.run.ids.map((id) => ({ id, apiKey: input.run.apiKey }));
    }
  )
  .map(
    { slug: 'fetch', array: 'prepareItems' },
    async (item) => {
      // Now we have access to both id and apiKey
      return await fetchWithKey(item.id, item.apiKey);
    }
  );
```

This pattern applies whenever a map handler needs any data beyond the array elements themselves. Add a step before the map that enriches the array elements with whatever data the handler needs, whether that's the original flow input (`run`), outputs from other dependencies, or both.
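For the "outputs from other dependencies" case, the enriching step's body can merge the run input with a dependency's output in the same way. The sketch below is a plain function, assuming a hypothetical earlier step with slug `config` whose output has `baseUrl` and `timeoutMs`. Its input mirrors the convention used in this guide of `run` plus one key per dependency slug. Wiring it into a flow (declaring the dependency on `config`) is not shown, and every name here is illustrative.

```typescript
// Hypothetical flow input (run).
interface RunInput {
  apiKey: string;
  ids: string[];
}

// Hypothetical output of an earlier step with slug 'config'.
interface ConfigOutput {
  baseUrl: string;
  timeoutMs: number;
}

// Everything the map handler needs, carried on each element, because the
// handler sees only its own element.
interface FetchItem {
  id: string;
  apiKey: string;
  baseUrl: string;
  timeoutMs: number;
}

// Body of a hypothetical 'prepareItems' array step depending on 'config'.
function prepareItems(input: { run: RunInput; config: ConfigOutput }): FetchItem[] {
  const { apiKey, ids } = input.run;
  const { baseUrl, timeoutMs } = input.config;
  return ids.map((id) => ({ id, apiKey, baseUrl, timeoutMs }));
}

// A map handler written purely against one enriched element.
function describeRequest(item: FetchItem): string {
  return `GET ${item.baseUrl}/items/${item.id} (timeout ${item.timeoutMs} ms)`;
}

const items = prepareItems({
  run: { apiKey: 'secret', ids: ['a', 'b'] },
  config: { baseUrl: 'https://api.example.com', timeoutMs: 5000 },
});

console.log(items.map(describeRequest));
```

Keeping the merge in one small function makes the element shape explicit and easy to test independently of the flow.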
Common Gotchas and Solutions
Handling Empty Arrays
When a map step receives an empty array, pgflow optimizes by immediately completing that map step and every map step chained after it:
```typescript
const EmptyHandling = new Flow<{}>({
  slug: 'empty_handling',
})
  .array({ slug: 'items' }, async () => {
    const results = await fetchData();
    return results || []; // Might return []
  })
  .map(
    { slug: 'processedItems', array: 'items' },
    (item) => processItem(item) // Never executes if items is []
  )
  .map(
    { slug: 'enrichedItems', array: 'processedItems' },
    (item) => enrichItem(item) // Also never executes
  )
  .step(
    { slug: 'handleResults', dependsOn: ['enrichedItems'] },
    (input) => {
      // This still executes to handle the empty result
      if (input.enrichedItems.length === 0) {
        return { message: 'No items to process' };
      }
      return { processed: input.enrichedItems.length };
    }
  );
```

What happens:
- `items` returns `[]`
- `processedItems` and `enrichedItems` complete immediately in a single transaction, without creating tasks
- Each cascade-completed map outputs `[]`, which resolves it as a dependency for any dependent steps
- No map handlers are executed (no wasted API calls or processing)
- `handleResults` receives `enrichedItems: []` and executes normally because its dependency is already resolved
- The entire cascade is atomic: all map steps complete together in one transaction
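To make the cascade concrete, the following sketch models a chain of map steps locally. When the starting array is empty, every map in the chain resolves to `[]` without calling its handler. The final non-map step still runs and sees an empty array. This is a simplified model with hypothetical names (`runMapChain`, `MapHandler`), not pgflow's SQL implementation, and it ignores transactions and task scheduling entirely.

```typescript
type MapHandler = (item: unknown) => unknown;

// Hypothetical local model of dependent map steps chained one after another.
// Returns each map's output and the total number of handler calls.
function runMapChain(
  initial: unknown[],
  handlers: MapHandler[],
): { outputs: unknown[][]; handlerCalls: number } {
  const outputs: unknown[][] = [];
  let handlerCalls = 0;
  let current = initial;
  for (const handler of handlers) {
    if (current.length === 0) {
      // Empty input: complete this map with [] without invoking the handler;
      // the empty output then feeds the next map in the chain.
      outputs.push([]);
      continue;
    }
    current = current.map((item) => {
      handlerCalls += 1;
      return handler(item);
    });
    outputs.push(current);
  }
  return { outputs, handlerCalls };
}

// Mirrors the handleResults step from the flow above.
function handleResults(enrichedItems: unknown[]) {
  if (enrichedItems.length === 0) {
    return { message: 'No items to process' };
  }
  return { processed: enrichedItems.length };
}

const double: MapHandler = (item) => (item as number) * 2;
const addOne: MapHandler = (item) => (item as number) + 1;

// handlerCalls is 0 and handleResults returns the 'No items to process' message.
const empty = runMapChain([], [double, addOne]);
console.log(empty.handlerCalls, handleResults(empty.outputs[1]));

// handlerCalls is 6 (3 elements x 2 maps) and handleResults reports 3 items.
const full = runMapChain([1, 2, 3], [double, addOne]);
console.log(full.handlerCalls, handleResults(full.outputs[1]));
```

The key property the model shares with the behavior described above is that downstream steps never need special wiring for the empty case. They simply receive `[]` and decide what to do with it.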