Process arrays in parallel

This guide shows practical patterns for using map steps to process collections of data in parallel, from simple transformations to complex batch operations.

A common use case is fetching and processing multiple web pages in parallel:

import { Flow } from '@pgflow/dsl/supabase';

const ScrapeMultipleUrls = new Flow<string[]>({ // Flow input must be an array for a root map
  slug: 'scrape_multiple_urls',
  maxAttempts: 3,
})
  .map(
    { slug: 'scrapedPages' },
    async (url) => await scrapeWebpage(url)
  )
  .step(
    { slug: 'summary', dependsOn: ['scrapedPages'] },
    (input) => summarizeResults(input.scrapedPages)
  );

-- Usage (SQL):
SELECT pgflow.start_flow(
  flow_slug => 'scrape_multiple_urls',
  input => '["https://example.com", "https://example.org", "https://example.net"]'::jsonb
);
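
The scrapeWebpage and summarizeResults helpers are application code, not part of pgflow. A minimal sketch of scrapeWebpage (hypothetical; assumes a plain fetch is enough for the target pages):

// Hypothetical helper - fetches a page and returns its text content.
// Throwing on failure lets pgflow retry the task, up to the flow's maxAttempts.
async function scrapeWebpage(url: string): Promise<{ url: string; html: string }> {
  const response = await fetch(url);
  if (!response.ok) {
    throw new Error(`Failed to fetch ${url}: ${response.status}`);
  }
  return { url, html: await response.text() };
}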

You can also chain map steps to transform and validate data collections efficiently:

const CsvProcessor = new Flow<{ csvUrl: string }>({
  slug: 'csv_processor',
})
  .array(
    { slug: 'csvRows' },
    async (input) => await fetchAndParseCSV(input.run.csvUrl)
  )
  .map(
    { slug: 'validatedRows', array: 'csvRows' },
    (row) => validateRow(row)
  )
  .map(
    { slug: 'processedRows', array: 'validatedRows' },
    (row) => transformIfValid(row)
  )
  .step(
    { slug: 'saveResults', dependsOn: ['processedRows'] },
    async (input, context) => await saveProcessedData(input.processedRows, context)
  );
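
One way to implement validateRow (hypothetical helpers; the row shape is only an example) is to tag rows as valid or invalid instead of throwing, so a single bad row doesn't fail its whole task, and transformIfValid can then skip the invalid ones:

type RawRow = Record<string, string>;
type ValidatedRow =
  | { valid: true; row: RawRow }
  | { valid: false; row: RawRow; errors: string[] };

// Hypothetical validator - tags rows instead of throwing
function validateRow(row: RawRow): ValidatedRow {
  const errors: string[] = [];
  if (!row.id) errors.push('missing id');
  if (!row.email?.includes('@')) errors.push('invalid email');
  return errors.length === 0
    ? { valid: true, row }
    : { valid: false, row, errors };
}

// Hypothetical transform - only touches rows tagged as valid
function transformIfValid(item: ValidatedRow) {
  return item.valid
    ? { ...item.row, processedAt: new Date().toISOString() }
    : item;
}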

Enriching Array Elements with Additional Data

Map handlers only receive individual array elements. If a handler needs access to the original flow input (run), outputs from other dependencies, or any additional data, include that data in the array elements via a previous step.

Problem: Map Handlers Can’t Access Flow Context

// This won't work - map handler can't access input.run
const ProblemFlow = new Flow<{ apiKey: string, ids: string[] }>({
  slug: 'problem_flow',
})
  .map({ slug: 'fetch' }, async (id) => {
    // Can't access input.run.apiKey here!
    return await fetchWithKey(id, ???); // No access to apiKey
  });

Solution: Enrich Array Elements with the Needed Context

const SolutionFlow = new Flow<{ apiKey: string, ids: string[] }>({
  slug: 'solution_flow',
})
  .array(
    { slug: 'prepareItems' },
    (input) => {
      // Include needed context in each element
      return input.run.ids.map(id => ({
        id,
        apiKey: input.run.apiKey
      }));
    }
  )
  .map(
    { slug: 'fetch', array: 'prepareItems' },
    async (item) => {
      // Now we have access to both id and apiKey
      return await fetchWithKey(item.id, item.apiKey);
    }
  );

This pattern applies whenever a map handler needs data beyond the array elements themselves. Add a step before the map that enriches the elements with whatever the handler needs, whether that's the original flow input (run), outputs from other dependencies, or both.
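
For example, to give a map handler both the flow input and the output of another step, the enriching step can depend on that step and merge everything into each element. A sketch (assumes .array accepts dependsOn the same way .step does; fetchSettings and annotate are hypothetical helpers):

const EnrichedFlow = new Flow<{ ids: string[] }>({
  slug: 'enriched_flow',
})
  .step({ slug: 'settings' }, async () => await fetchSettings())
  .array(
    { slug: 'prepareItems', dependsOn: ['settings'] },
    (input) =>
      // Merge flow input (run) and the 'settings' output into each element
      input.run.ids.map((id) => ({ id, settings: input.settings }))
  )
  .map(
    { slug: 'annotated', array: 'prepareItems' },
    async (item) => await annotate(item.id, item.settings)
  );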

When a map step receives an empty array, pgflow optimizes by completing the entire chain immediately:

const EmptyHandling = new Flow<{}>({
  slug: 'empty_handling',
})
  .array({ slug: 'items' }, async () => {
    const results = await fetchData();
    return results || []; // Might return []
  })
  .map(
    { slug: 'processedItems', array: 'items' },
    (item) => processItem(item) // Never executes if items is []
  )
  .map(
    { slug: 'enrichedItems', array: 'processedItems' },
    (item) => enrichItem(item) // Also never executes
  )
  .step(
    { slug: 'handleResults', dependsOn: ['enrichedItems'] },
    (input) => {
      // This still executes to handle the empty result
      if (input.enrichedItems.length === 0) {
        return { message: 'No items to process' };
      }
      return { processed: input.enrichedItems.length };
    }
  );

What happens:

  • items returns []
  • processedItems and enrichedItems complete immediately in a single transaction without creating tasks
  • Each cascade-completed map outputs [], which satisfies the dependency for any downstream steps
  • No map handlers are executed (no wasted API calls or processing)
  • handleResults receives enrichedItems: [] and executes normally because the dependency is already resolved
  • The entire cascade is atomic: all map steps complete together in one transaction
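
To exercise this path end to end, start the flow the same way as in the earlier examples; since the flow input type here is {}, an empty object is enough:

SELECT pgflow.start_flow(
  flow_slug => 'empty_handling',
  input => '{}'::jsonb
);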