Map Steps
pgflow’s map steps enable parallel processing of arrays by automatically creating multiple tasks - one for each array element. This guide explains how map steps work, when to use them, and their unique characteristics.
Why Map Steps?
When processing arrays in workflows, you often want to:
- Apply the same operation to each element independently
- Process items in parallel for better performance
- Transform data collections efficiently
- Make batch API calls or database queries
Map steps solve these needs by providing a declarative way to spawn parallel tasks from arrays.
Mental Model: One Step, Many Tasks
Both step types produce a single output, but achieve it differently:
Regular Step:

    Input → [Single Task] → Output

Map Step:

             ┌→ [Task 0] → Result 0 ┐
             │→ [Task 1] → Result 1 │
    Array[5] │→ [Task 2] → Result 2 │→ Array[5]
             │→ [Task 3] → Result 3 │
             └→ [Task 4] → Result 4 ┘

Each task:
- Processes one element
- Has its own retry counter
- Retries independently (but if its retries are exhausted, the entire step fails)
The key insight: Map steps transform arrays element-by-element through parallel execution, but still produce one step output (an array).
How Map Steps Work
Task Spawning and Distribution
When a map step becomes ready, pgflow:
- Validates the input is an array - Non-array inputs fail the step
- Creates N tasks - One per array element, with task_index 0 to N-1
- Distributes elements - Task 0 gets array[0], task 1 gets array[1], etc.
- Queues tasks - All tasks are enqueued for parallel processing
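The spawning rules above can be sketched in plain TypeScript. This is an illustration only, not pgflow's implementation: the `MapTask` shape and the `spawnMapTasks` function are hypothetical names introduced here, and the real work happens inside pgflow rather than in user code.

```typescript
// Hypothetical shape of a spawned map task. The field names are
// illustrative and do not mirror pgflow's actual schema.
interface MapTask<T> {
  taskIndex: number; // 0 to N-1
  element: T; // array[taskIndex]
}

// Validate the input, then create one task per array element.
function spawnMapTasks(input: unknown): MapTask<unknown>[] {
  if (!Array.isArray(input)) {
    // Non-array inputs fail the step.
    throw new TypeError("map step input must be an array");
  }
  return input.map((element, taskIndex) => ({ taskIndex, element }));
}

const spawned = spawnMapTasks(["user1", "user2", "user3"]);
console.log(spawned.length); // 3
```

Each spawned task carries exactly one element, which is why a map handler only ever sees a single item.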

    // This map step with 3 elements...
    .array({ slug: 'userIds' }, () => ['user1', 'user2', 'user3'])
    .map({ slug: 'users', array: 'userIds' }, async (id) => {
      // Task 0 receives: 'user1'
      // Task 1 receives: 'user2'
      // Task 2 receives: 'user3'
      return await fetchUserData(id);
    })

Independent Task Execution
Each map task is a separate function execution in the worker. Tasks execute independently, each with its own:
- Retry counter: If one task fails, only that task retries, not the entire array
- Retry isolation: Other tasks continue while one retries
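To make the per-task retry counter concrete, here is a minimal in-memory simulation. It is a sketch under stated assumptions: `runTaskWithRetries`, `maxAttempts`, and the handler signature are hypothetical and only model the behavior described above, they are not part of pgflow's API.

```typescript
// Hypothetical handler signature: receives the item and the attempt number.
type Handler<T, R> = (item: T, attempt: number) => R;

interface TaskOutcome<R> {
  taskIndex: number;
  attempts: number;
  result: R;
}

// Run one task with its own retry counter (maxAttempts is an assumed setting).
function runTaskWithRetries<T, R>(
  item: T,
  taskIndex: number,
  handler: Handler<T, R>,
  maxAttempts: number
): TaskOutcome<R> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return { taskIndex, attempts: attempt, result: handler(item, attempt) };
    } catch (err) {
      // Only this task's counter advances; other tasks are unaffected.
      // Once the attempts are exhausted, the error escapes and fails the step.
      if (attempt === maxAttempts) throw err;
    }
  }
  throw new Error("maxAttempts must be at least 1");
}

// Item "b" fails on its first attempt, then succeeds on the second.
const flaky: Handler<string, string> = (item, attempt) => {
  if (item === "b" && attempt === 1) throw new Error("transient failure");
  return item.toUpperCase();
};

const outcomes = ["a", "b", "c"].map((item, i) =>
  runTaskWithRetries(item, i, flaky, 3)
);
console.log(outcomes.map((o) => o.attempts)); // [1, 2, 1]
```

Only the task for "b" records a second attempt; the tasks for "a" and "c" finish on their first try.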
This independence is crucial for resilience:

    // Processing 100 API calls
    .map({ slug: 'apiCalls', array: 'items' }, async (item) => {
      return await callAPI(item); // If item #47 fails, only task #47 retries
    })

    // vs single step processing
    .step({ slug: 'apiCallsBatch' }, async (input) => {
      // If ANY call fails, the ENTIRE step retries all 100 calls!
      return await Promise.all(input.items.map(item => callAPI(item)));
    })

Output Aggregation
As map tasks complete, their outputs are collected:

    // If the map tasks return:
    // Task 0: { name: 'Alice', id: 'user1' }
    // Task 1: { name: 'Bob', id: 'user2' }
    // Task 2: { name: 'Charlie', id: 'user3' }

    // The aggregated output becomes:
    [
      { name: 'Alice', id: 'user1' },
      { name: 'Bob', id: 'user2' },
      { name: 'Charlie', id: 'user3' }
    ]

Important: Order is preserved based on task_index, not completion time.
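The ordering guarantee can be illustrated with a small sketch in which tasks finish out of order. The `Completion` type and `aggregateByTaskIndex` function are hypothetical names used only for this example; pgflow performs the aggregation itself.

```typescript
// Hypothetical record of a finished task.
interface Completion<R> {
  taskIndex: number;
  output: R;
}

// Order outputs by taskIndex, regardless of the order in which tasks finished.
function aggregateByTaskIndex<R>(completions: Completion<R>[]): R[] {
  return [...completions]
    .sort((a, b) => a.taskIndex - b.taskIndex)
    .map((c) => c.output);
}

// Tasks complete in the order 2, 0, 1.
const completions: Completion<{ name: string; id: string }>[] = [
  { taskIndex: 2, output: { name: "Charlie", id: "user3" } },
  { taskIndex: 0, output: { name: "Alice", id: "user1" } },
  { taskIndex: 1, output: { name: "Bob", id: "user2" } },
];

const aggregated = aggregateByTaskIndex(completions);
console.log(aggregated.map((u) => u.name)); // ['Alice', 'Bob', 'Charlie']
```

The result follows the order of the input array even though the last element finished first.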
Handler Input Constraint
Map step handlers receive only the individual array element, not the full input object that regular step handlers receive:
Regular Step Handler:

    .step({ slug: 'regular' }, async (input) => {
      // input contains:
      // - input.run: The original flow input
      // - input.dependencyName: Output from each dependency
      return processAllData(input);
    })

Map Step Handler:

    .map({ slug: 'mapStep', array: 'source' }, async (item) => {
      // item is ONLY the individual array element
      // No access to input.run or other dependencies
      return processItem(item);
    })

This constraint keeps map handlers focused on transforming individual elements. If a map handler needs additional data (like flow input or other step outputs), include that data in the array elements via a previous step. Future versions will provide access to flow input via the handler’s context parameter.
Root Maps vs Dependent Maps
Map steps operate in two modes:
Root Maps
Process the flow’s input array directly by omitting the array property:

    // Flow input MUST be an array
    new Flow<string[]>({ slug: 'batch_processor' })
      .map(
        { slug: 'processEach' }, // No 'array' property
        (item) => processItem(item)
      );

Starting this flow:

    SELECT pgflow.start_flow(
      flow_slug => 'batch_processor',
      input => '["item1", "item2", "item3"]'::jsonb
    );

Dependent Maps
Process another step’s array output by specifying the array property:

    new Flow<{ searchQuery: string }>({ slug: 'search_pipeline' })
      .array(
        { slug: 'searchResults' },
        async (input) => await searchAPI(input.run.searchQuery)
      )
      .map(
        { slug: 'processResults', array: 'searchResults' }, // Processes search output
        (result) => extractRelevantData(result)
      );

Edge Cases and Special Behaviors
Empty Array Cascade
When a map step receives an empty array ([]):
- The step completes immediately without creating tasks or executing handlers
- The step outputs an empty array []
- Dependent map steps also receive [] and complete immediately
- This cascades through the entire chain in a single transaction

    // If items array is empty
    .array({ slug: 'items' }, () => [])
    .map({ slug: 'processedItems', array: 'items' }, (item) => transform(item))
    .map({ slug: 'enrichedItems', array: 'processedItems' }, (item) => enrich(item))

    // Result: All three steps complete with []

NULL Handling
NULL values in arrays are preserved through task distribution and appear in the aggregated output at their original positions.
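Both edge cases, the empty-array cascade and NULL preservation, can be modeled with a small stand-in for a map step. The `runMapStep` function and its `tasksCreated` counter are hypothetical and exist only to illustrate the behavior described here; the NULL example assumes a handler that passes null through unchanged.

```typescript
// Hypothetical stand-in for a map step: applies the handler per element and
// reports how many "tasks" would have been created.
function runMapStep<T, R>(
  input: T[],
  handler: (item: T) => R
): { output: R[]; tasksCreated: number } {
  if (input.length === 0) {
    // Empty input: complete immediately, no tasks, handler never runs.
    return { output: [], tasksCreated: 0 };
  }
  const output = input.map((item) => handler(item));
  return { output, tasksCreated: input.length };
}

// Empty array cascade: the second map receives [] from the first.
const first = runMapStep<number, number>([], (n) => n * 2);
const second = runMapStep<number, string>(first.output, (n) => `item-${n}`);
console.log(first.tasksCreated, second.tasksCreated); // 0 0

// NULL handling: the null element is distributed like any other element,
// and this handler returns it unchanged, so it keeps its position.
const withNulls = runMapStep<string | null, string | null>(
  ["a", null, "c"],
  (item) => (item === null ? null : item.toUpperCase())
);
console.log(withNulls.output); // ['A', null, 'C']
```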
Type Violations
When a step outputs non-array data to a map step:
- The SQL Core detects the type violation
- The entire run is marked as failed
- The invalid output is stored for debugging
- All queued messages are archived to prevent orphaned tasks
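The sequence above can be pictured with a simplified in-memory model. Everything in this sketch is an assumption made for illustration: the `RunState` fields and the `handOffToMapStep` function are hypothetical and do not mirror pgflow's tables or SQL functions.

```typescript
// Hypothetical in-memory run record; field names are illustrative only.
interface RunState {
  status: "started" | "failed";
  queuedMessages: string[];
  archivedMessages: string[];
  invalidOutput?: unknown;
}

// Check a dependency's output before it is handed to a map step.
function handOffToMapStep(run: RunState, stepOutput: unknown): RunState {
  if (Array.isArray(stepOutput)) {
    return run; // Valid input: the run continues unchanged.
  }
  return {
    status: "failed", // The entire run is marked as failed
    invalidOutput: stepOutput, // The invalid output is kept for debugging
    queuedMessages: [], // Nothing is left waiting in the queue
    archivedMessages: [...run.archivedMessages, ...run.queuedMessages],
  };
}

const runBefore: RunState = {
  status: "started",
  queuedMessages: ["msg-1", "msg-2"],
  archivedMessages: [],
};

const runAfter = handOffToMapStep(runBefore, { not: "an array" });
console.log(runAfter.status); // failed
```

Archiving the queued messages in the same transition is what prevents orphaned tasks from being picked up after the run has failed.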

    // This will fail the run:
    .step({ slug: 'single' }, () => ({ not: 'an array' }))
    .map({ slug: 'map', array: 'single' }, (item) => item) // Type violation!

Limitations and Considerations
Map steps have a single dependency specified via the array property (or the flow input for root maps). Map steps cannot use the dependsOn property.
If your map handler needs data from other steps or must wait for additional dependencies, use an array step to compose and enrich the data first:

    // Get config and items from separate steps
    .step({ slug: 'config' }, () => ({ apiKey: 'secret' }))
    .array({ slug: 'items' }, () => ['item1', 'item2'])

    // Array step enriches items with config data
    .array(
      {
        slug: 'enrichedItems',
        dependsOn: ['items', 'config'] // Array steps CAN use dependsOn
      },
      (input) => {
        // Enrich each item with the config data it needs
        return input.items.map(item => ({
          item,
          apiKey: input.config.apiKey
        }));
      }
    )

    // Map step processes the enriched array
    .map(
      { slug: 'processItems', array: 'enrichedItems' }, // Only 'array', no 'dependsOn'
      async (enriched) => {
        // Handler receives enriched object with both item and apiKey
        return await processItem(enriched.item, enriched.apiKey);
      }
    )

This pattern keeps map steps focused on parallel processing while array steps handle dependency coordination and data composition.
Summary
Map steps provide powerful parallel array processing:
- Automatic Parallelization: One step spawns N parallel tasks
- Simple Handler API: Focus on transforming individual elements
- Order Preservation: Results maintain array element order
- Graceful Edge Cases: Empty arrays and NULLs handled correctly
- Type Safety: TypeScript ensures correct array types
Use map steps when you need to process collections efficiently, but remember their constraints around dependencies and context access. For complex scenarios requiring flow context, enrich your array elements before mapping.
For practical examples of using map steps, see the Process arrays in parallel guide.