Map Steps

pgflow’s map steps enable parallel processing of arrays by automatically creating multiple tasks - one for each array element. This guide explains how map steps work, when to use them, and their unique characteristics.

When processing arrays in workflows, you often want to:

  • Apply the same operation to each element independently
  • Process items in parallel for better performance
  • Transform data collections efficiently
  • Make batch API calls or database queries

Map steps solve these needs by providing a declarative way to spawn parallel tasks from arrays.

Regular steps and map steps both produce a single output, but achieve it differently:

Regular Step:

Input → [Single Task] → Output

Map Step:

         ┌→ [Task 0] → Result 0 ┐
         │→ [Task 1] → Result 1 │
Array[5] │→ [Task 2] → Result 2 │→ Array[5]
         │→ [Task 3] → Result 3 │
         └→ [Task 4] → Result 4 ┘

Each task:
- Processes one element
- Has its own retry counter
- Retries independently (but if its retries are exhausted, the entire step fails)

The key insight: Map steps transform arrays element-by-element through parallel execution, but still produce one step output (an array).

When a map step becomes ready, pgflow:

  1. Validates the input is an array - Non-array inputs fail the step
  2. Creates N tasks - One per array element, with task_index 0 to N-1
  3. Distributes elements - Task 0 gets array[0], task 1 gets array[1], etc.
  4. Queues tasks - All tasks are enqueued for parallel processing

// This map step with 3 elements...
.array({ slug: 'userIds' }, () => ['user1', 'user2', 'user3'])
.map({ slug: 'users', array: 'userIds' }, async (id) => {
  // Task 0 receives: 'user1'
  // Task 1 receives: 'user2'
  // Task 2 receives: 'user3'
  return await fetchUserData(id);
})
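
The four steps listed above can be modeled in a few lines of plain TypeScript. This is a simplified in-memory sketch for illustration only, not pgflow's actual implementation; `MapTask` and `createMapTasks` are hypothetical names, and queueing is left out.

```typescript
// Hypothetical in-memory model of map-step fan-out; not pgflow's real code.
interface MapTask {
  task_index: number; // position of the element in the source array
  input: unknown;     // the single element this task receives
}

function createMapTasks(input: unknown): MapTask[] {
  // Step 1: validate that the input is an array.
  if (!Array.isArray(input)) {
    throw new TypeError('Map step input must be an array');
  }
  // Steps 2-3: one task per element, with task_index running from 0 to N-1.
  return input.map((element: unknown, index: number) => ({
    task_index: index,
    input: element,
  }));
}

const tasks = createMapTasks(['user1', 'user2', 'user3']);
// tasks holds three entries with task_index 0, 1, 2
```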

Each map task is a separate function execution in the worker. Tasks execute independently, each with its own:

  • Retry counter: If one task fails, only that task retries, not the entire array
  • Retry isolation: Other tasks continue while one retries

This independence is crucial for resilience:

// Processing 100 API calls
.map({ slug: 'apiCalls', array: 'items' }, async (item) => {
  return await callAPI(item); // If item #47 fails, only task #47 retries
})

// vs single step processing
.step({ slug: 'apiCallsBatch' }, async (input) => {
  // If ANY call fails, the ENTIRE step retries all 100 calls!
  return await Promise.all(input.items.map(item => callAPI(item)));
})
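
To make the per-task retry counters concrete, here is a small simulation in plain TypeScript. It is a sketch under stated assumptions, not pgflow's retry logic: `runWithRetries` and `flaky` are hypothetical, tasks run one after another rather than in parallel, and there is no delay between attempts.

```typescript
// Hypothetical simulation of per-task retry counters; not pgflow's real code.
function runWithRetries<T, R>(
  items: T[],
  handler: (item: T) => R,
  maxAttempts: number
): { results: R[]; attempts: number[] } {
  const results: R[] = [];
  const attempts: number[] = [];
  items.forEach((item, taskIndex) => {
    let used = 0; // each task counts its own attempts
    for (;;) {
      used += 1;
      try {
        results[taskIndex] = handler(item);
        attempts[taskIndex] = used;
        return; // this task is done; other tasks are unaffected
      } catch (error) {
        // Exhausted retries on any one task fail the whole step.
        if (used >= maxAttempts) throw error;
      }
    }
  });
  return { results, attempts };
}

// A handler that fails twice for item 'b' and then succeeds.
const failuresLeft = new Map<string, number>([['b', 2]]);
const flaky = (item: string): string => {
  const left = failuresLeft.get(item) ?? 0;
  if (left > 0) {
    failuresLeft.set(item, left - 1);
    throw new Error(`transient failure for ${item}`);
  }
  return item.toUpperCase();
};

const outcome = runWithRetries(['a', 'b', 'c'], flaky, 3);
// outcome.results is ['A', 'B', 'C']; outcome.attempts is [1, 3, 1]
```

Only the task for 'b' consumes extra attempts; the tasks for 'a' and 'c' each run once.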

As map tasks complete, their outputs are collected:

// If the map tasks return:
// Task 0: { name: 'Alice', id: 'user1' }
// Task 1: { name: 'Bob', id: 'user2' }
// Task 2: { name: 'Charlie', id: 'user3' }

// The aggregated output becomes:
[
  { name: 'Alice', id: 'user1' },
  { name: 'Bob', id: 'user2' },
  { name: 'Charlie', id: 'user3' }
]

Important: Order is preserved based on task_index, not completion time.
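
A minimal sketch of order-preserving aggregation, assuming each finished task is recorded with its task_index. The names `CompletedTask` and `aggregateOutputs` are hypothetical, and this models the behavior rather than reproducing pgflow's implementation.

```typescript
// Hypothetical sketch of order-preserving aggregation; not pgflow's real code.
interface CompletedTask<R> {
  task_index: number;
  output: R;
}

function aggregateOutputs<R>(completed: CompletedTask<R>[]): R[] {
  // Sort a copy by task_index so completion order does not matter.
  return [...completed]
    .sort((a, b) => a.task_index - b.task_index)
    .map((task) => task.output);
}

// Tasks listed in the order they finished: task 2 first, then 0, then 1.
const finished: CompletedTask<{ name: string; id: string }>[] = [
  { task_index: 2, output: { name: 'Charlie', id: 'user3' } },
  { task_index: 0, output: { name: 'Alice', id: 'user1' } },
  { task_index: 1, output: { name: 'Bob', id: 'user2' } },
];

const aggregated = aggregateOutputs(finished);
// aggregated names, in order: Alice, Bob, Charlie
```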

Map step handlers receive only the individual array element, not the full input object that regular step handlers receive:

Regular Step Handler:

.step({ slug: 'regular' }, async (input) => {
  // input contains:
  // - input.run: The original flow input
  // - input.dependencyName: Output from each dependency
  return processAllData(input);
})

Map Step Handler:

.map({ slug: 'mapStep', array: 'source' }, async (item) => {
  // item is ONLY the individual array element
  // No access to input.run or other dependencies
  return processItem(item);
})

This constraint keeps map handlers focused on transforming individual elements. If a map handler needs additional data (like flow input or other step outputs), include that data in the array elements via a previous step. Future versions will provide access to flow input via the handler’s context parameter.

Map steps operate in two modes:

Root map: process the flow’s input array directly by omitting the array property:

// Flow input MUST be an array
new Flow<string[]>({ slug: 'batch_processor' })
  .map(
    { slug: 'processEach' }, // No 'array' property
    (item) => processItem(item)
  );

Starting this flow:

SELECT pgflow.start_flow(
  flow_slug => 'batch_processor',
  input => '["item1", "item2", "item3"]'::jsonb
);

Dependent map: process another step’s array output by specifying the array property:

new Flow<{ searchQuery: string }>({ slug: 'search_pipeline' })
  .array(
    { slug: 'searchResults' },
    async (input) => await searchAPI(input.run.searchQuery)
  )
  .map(
    { slug: 'processResults', array: 'searchResults' }, // Processes search output
    (result) => extractRelevantData(result)
  );

When a map step receives an empty array ([]):

  1. The step completes immediately without creating tasks or executing handlers
  2. The step outputs an empty array []
  3. Dependent map steps also receive [] and complete immediately
  4. This cascades through the entire chain in a single transaction

// If items array is empty
.array({ slug: 'items' }, () => [])
.map({ slug: 'processedItems', array: 'items' }, (item) => transform(item))
.map({ slug: 'enrichedItems', array: 'processedItems' }, (item) => enrich(item))
// Result: All three steps complete with []

NULL values in arrays are preserved through task distribution and appear in the aggregated output at their original positions.
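
Both edge cases can be illustrated with a tiny in-memory model of a map stage. This is a sketch, not pgflow's implementation: `mapStage` is a hypothetical helper, and the null example assumes a handler that passes null through unchanged.

```typescript
// Hypothetical in-memory model of one map stage; not pgflow's real code.
function mapStage<T, R>(
  input: T[],
  handler: (item: T) => R
): { output: R[]; handlerCalls: number } {
  let handlerCalls = 0;
  const output = input.map((item) => {
    handlerCalls += 1; // one call per element, so zero calls for []
    return handler(item);
  });
  return { output, handlerCalls };
}

// Empty input: no handler runs, and [] flows on to the next stage.
const processed = mapStage([] as string[], (item) => item.trim());
const enriched = mapStage(processed.output, (item) => ({ value: item }));

// null elements: each still gets its own handler call and keeps its position.
const doubled = mapStage([1, null, 3], (n: number | null) =>
  n === null ? null : n * 2
);
// doubled.output is [2, null, 6]
```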

When a step outputs non-array data to a map step:

  1. The SQL Core detects the type violation
  2. The entire run is marked as failed
  3. The invalid output is stored for debugging
  4. All queued messages are archived to prevent orphaned tasks

// This will fail the run:
.step({ slug: 'single' }, () => ({ not: 'an array' }))
.map({ slug: 'map', array: 'single' }, (item) => item) // Type violation!
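
One way to avoid this failure is to normalize the upstream value before it reaches the map step. The helper below is a hypothetical defensive sketch, not part of pgflow; whether wrapping a single value in an array is the right behavior depends on your flow.

```typescript
// Hypothetical helper (not a pgflow API): always hand an array to the map step.
function ensureArray<T>(value: T | T[] | null | undefined): T[] {
  if (Array.isArray(value)) return value;                 // already an array
  if (value === null || value === undefined) return [];   // nothing to process
  return [value];                                         // wrap a single value
}

const wrapped = ensureArray({ not: 'an array' }); // one-element array
const untouched = ensureArray(['a', 'b']);        // returned as-is
const empty = ensureArray<string>(null);          // []
```

Calling such a helper in the handler of the step that feeds the map step means the map step always receives an array, at the cost of silently accepting shapes you may prefer to reject.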

Map steps have a single dependency specified via the array property (or the flow input for root maps). Map steps cannot use the dependsOn property.

If your map handler needs data from other steps or must wait for additional dependencies, use an array step to compose and enrich the data first:

// Get config and items from separate steps
.step({ slug: 'config' }, () => ({ apiKey: 'secret' }))
.array({ slug: 'items' }, () => ['item1', 'item2'])

// Array step enriches items with config data
.array(
  {
    slug: 'enrichedItems',
    dependsOn: ['items', 'config'] // Array steps CAN use dependsOn
  },
  (input) => {
    // Enrich each item with the config data it needs
    return input.items.map(item => ({
      item,
      apiKey: input.config.apiKey
    }));
  }
)

// Map step processes the enriched array
.map(
  { slug: 'processItems', array: 'enrichedItems' }, // Only 'array', no 'dependsOn'
  async (enriched) => {
    // Handler receives enriched object with both item and apiKey
    return await processItem(enriched.item, enriched.apiKey);
  }
)

This pattern keeps map steps focused on parallel processing while array steps handle dependency coordination and data composition.

Map steps provide powerful parallel array processing:

  • Automatic Parallelization: One step spawns N parallel tasks
  • Simple Handler API: Focus on transforming individual elements
  • Order Preservation: Results maintain array element order
  • Graceful Edge Cases: Empty arrays and NULLs handled correctly
  • Type Safety: TypeScript ensures correct array types

Use map steps when you need to process collections efficiently, but remember their constraints around dependencies and context access. For complex scenarios requiring flow context, enrich your array elements before mapping.

For practical examples of using map steps, see the Process arrays in parallel guide.