Run your Flow
Now that you’ve defined and compiled your workflow, it’s time to execute it!
In this guide, we’ll set up an Edge Worker to process your workflow tasks, trigger your first flow, and observe its execution.
1. Create a worker function
Create a new Edge Function that will process tasks for your workflow:
npx supabase functions new greet_user_worker
Replace the contents of index.ts with the following:
import { EdgeWorker } from "jsr:@pgflow/edge-worker";
import GreetUser from '../_flows/greet_user.ts';

// Pass the flow definition to the Edge Worker
EdgeWorker.start(GreetUser);
Disable JWT verification
Disable JWT verification for now by editing supabase/config.toml:
[functions.greet_user_worker]
enabled = true
verify_jwt = false
import_map = "./functions/greet_user_worker/deno.json"
2. Start the Edge Runtime
Start the Edge Runtime to make your worker function available:
npx supabase functions serve
This starts the Edge Runtime server but does not start your worker yet. You should see output similar to this in your terminal:
Setting up Edge Functions runtime...
Serving functions on http://127.0.0.1:54321/functions/v1/<function-name>
Using supabase-edge-runtime-1.67.4 (compatible with Deno v1.45.2)
serving the request with supabase/functions/greet_user_worker
3. Start your worker
In a new terminal, send an HTTP request to start your worker:
curl http://localhost:54321/functions/v1/greet_user_worker
You should see output in your Edge Runtime terminal indicating the worker has started:
[Info] [INFO] worker_id=unknown module=DenoAdapter DenoAdapter logger instance created and working.
[Info] [INFO] worker_id=unknown module=DenoAdapter HTTP Request: null
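If you would rather start the worker from a script than from curl, the same request can be sketched in TypeScript. This assumes the default local URL used in the curl command above and a runtime with a global fetch (Deno or a recent Node.js). The names functionUrl and startWorker are hypothetical helpers, not part of pgflow or the Supabase CLI.

```typescript
// Hypothetical helpers; the base URL and path mirror the curl command above.
const LOCAL_BASE_URL = "http://localhost:54321";

// Build the URL of a locally served Edge Function.
function functionUrl(baseUrl: string, functionName: string): string {
  const trimmed = baseUrl.replace(/\/+$/, ""); // drop trailing slashes
  return `${trimmed}/functions/v1/${encodeURIComponent(functionName)}`;
}

// Send the HTTP request that boots the worker and return the HTTP status code.
async function startWorker(
  functionName: string,
  baseUrl: string = LOCAL_BASE_URL,
): Promise<number> {
  const response = await fetch(functionUrl(baseUrl, functionName));
  await response.text(); // drain the body; its content is not needed here
  return response.status;
}
```

Calling startWorker("greet_user_worker") from an async context sends the same GET request as the curl command. Nothing is sent until the function is called.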
4. Trigger your first flow
Now let’s start a flow run! Using Supabase Studio:
- Open Supabase Studio in your browser (typically at http://localhost:54323)
- Navigate to the SQL Editor
- Execute this SQL to start your workflow:
SELECT * FROM pgflow.start_flow(
  flow_slug => 'greet_user',
  input => '{"first_name": "Alice", "last_name": "Smith"}'::jsonb
);
This will:
- Create a new run for your workflow
- Start the root steps
- Return information about the new run
The output should look like:
run_id        | flow_slug  | status  | input                                         | output | remaining_steps
--------------+------------+---------+-----------------------------------------------+--------+-----------------
<run_id UUID> | greet_user | started | {"first_name": "Alice", "last_name": "Smith"} | null   | 2
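Outside the SQL Editor, the same call could be issued from application code as a parameterized query. The sketch below only builds the query text and its values; it does not connect to a database. buildStartFlowQuery is a hypothetical helper, and the { text, values } shape is an assumption modeled on common PostgreSQL clients rather than something pgflow prescribes.

```typescript
// Shape of a parameterized query, as accepted by many PostgreSQL clients (assumption).
interface ParameterizedQuery {
  text: string;
  values: string[];
}

// Hypothetical helper: builds the pgflow.start_flow call shown above
// without interpolating user input into the SQL string.
function buildStartFlowQuery(
  flowSlug: string,
  input: Record<string, unknown>,
): ParameterizedQuery {
  return {
    text: "SELECT * FROM pgflow.start_flow(flow_slug => $1, input => $2::jsonb)",
    values: [flowSlug, JSON.stringify(input)],
  };
}

const query = buildStartFlowQuery("greet_user", {
  first_name: "Alice",
  last_name: "Smith",
});
console.log(query.text);
console.log(query.values);
```

Passing the input as a bound parameter keeps quoting problems out of the SQL text, which matters once the input comes from users rather than from a hand-typed literal.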
5. Monitor execution
Your worker should start processing tasks immediately. You should see log output in the Edge Runtime terminal as each step executes:
[Info] [INFO] worker_id=<long UUID> module=ExecutionController Scheduling execution of task 1
[Info] [INFO] worker_id=<long UUID> module=ExecutionController Scheduling execution of task 2
You can check the run status using Supabase Studio:
In the SQL Editor, run this query:
SELECT * FROM pgflow.runs
WHERE flow_slug = 'greet_user'
ORDER BY started_at DESC
LIMIT 1;
As steps complete, you’ll see the remaining_steps count decrease. When it reaches 0, the run will be marked as completed:
run_id        | flow_slug  | status    | input                                         | output                              | remaining_steps
--------------+------------+-----------+-----------------------------------------------+-------------------------------------+-----------------
<run_id UUID> | greet_user | completed | {"first_name": "Alice", "last_name": "Smith"} | {"greeting": "Hello, Alice Smith!"} | 0
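To watch a run from a script instead of re-running the query by hand, a small polling loop is enough. The sketch below is deliberately independent of any database client: fetchRun is a function you supply that returns the latest row for your run, and waitForRun and isFinished are hypothetical helper names. It treats any status other than "started" as final, which is an assumption based on the two statuses shown in this guide.

```typescript
// Subset of the pgflow.runs columns shown in the query output above.
interface RunRow {
  run_id: string;
  status: string;
  remaining_steps: number;
}

// Assumption: a run is finished once it leaves the "started" status.
function isFinished(run: RunRow): boolean {
  return run.status !== "started";
}

// Hypothetical helper: calls fetchRun repeatedly until the run finishes
// or the attempts are used up.
async function waitForRun(
  fetchRun: () => Promise<RunRow>,
  maxAttempts = 30,
  delayMs = 1000,
): Promise<RunRow> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const run = await fetchRun();
    console.log(
      `attempt ${attempt}: status=${run.status} remaining_steps=${run.remaining_steps}`,
    );
    if (isFinished(run)) return run;
    // Wait before asking again so the database is not hammered.
    await new Promise((resolve) => setTimeout(resolve, delayMs));
  }
  throw new Error(`run did not finish after ${maxAttempts} attempts`);
}
```

In practice fetchRun would wrap the SELECT from this step using whichever PostgreSQL client your project already has.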
What’s happening behind the scenes?
When you trigger a workflow:
- Flow Initialization:
  - A new run is created in pgflow.runs
  - States for all steps are created in pgflow.step_states
  - Tasks for root steps are created in pgflow.step_tasks
  - Messages are sent to the queue for root steps
- Task Processing:
  - The worker polls the queue for available tasks
  - For each task, it executes the appropriate step handler
  - After successful execution, it calls pgflow.complete_task
  - If a step fails, it calls pgflow.fail_task, which may retry the task
- Dependency Resolution:
  - When a step completes, its dependent steps become eligible for execution
  - The system automatically creates tasks for these dependent steps
  - The process continues until all steps complete or the workflow fails
- Run Completion:
  - When all steps complete, the run is marked as completed
  - The outputs from all leaf steps (those with no dependents) are collected as the run output
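The lifecycle above can be illustrated with a small in-memory model. This is a sketch for intuition only, not pgflow's implementation: there is no database, no queue, and no retry, and everything runs in a single loop. The simulateRun function and its types are hypothetical. The step slug full_name is made up for the example; greeting is inferred from the run output shown earlier. The handler input shape (the run input under run, plus each dependency's output under its slug) is likewise an assumption of this model.

```typescript
// In-memory model of a run. Illustration only, not pgflow's implementation.
type StepHandler = (input: Record<string, unknown>) => unknown;

interface StepDef {
  slug: string;
  dependsOn: string[];
  handler: StepHandler;
}

interface RunResult {
  status: "completed";
  output: Record<string, unknown>;
  remainingSteps: number;
  executionOrder: string[];
}

function simulateRun(steps: StepDef[], runInput: unknown): RunResult {
  const outputs = new Map<string, unknown>(); // outputs of completed steps
  const executionOrder: string[] = [];
  let remainingSteps = steps.length; // plays the role of remaining_steps

  // Flow initialization: root steps (no dependencies) get tasks first.
  const queue: StepDef[] = steps.filter((s) => s.dependsOn.length === 0);
  const queued = new Set<string>(queue.map((s) => s.slug));

  // Task processing: the "worker" takes one task at a time off the queue.
  while (queue.length > 0) {
    const step = queue.shift()!;
    const input: Record<string, unknown> = { run: runInput };
    for (const dep of step.dependsOn) input[dep] = outputs.get(dep);

    outputs.set(step.slug, step.handler(input)); // stands in for complete_task
    executionOrder.push(step.slug);
    remainingSteps--;

    // Dependency resolution: queue steps whose dependencies are all complete.
    for (const candidate of steps) {
      const ready = candidate.dependsOn.every((dep) => outputs.has(dep));
      if (ready && !queued.has(candidate.slug)) {
        queued.add(candidate.slug);
        queue.push(candidate);
      }
    }
  }

  if (remainingSteps > 0) {
    throw new Error("some steps never became ready (cycle or unknown dependency)");
  }

  // Run completion: outputs of leaf steps (no dependents) form the run output.
  const hasDependents = new Set<string>();
  for (const s of steps) for (const dep of s.dependsOn) hasDependents.add(dep);
  const output: Record<string, unknown> = {};
  for (const s of steps) {
    if (!hasDependents.has(s.slug)) output[s.slug] = outputs.get(s.slug);
  }

  return { status: "completed", output, remainingSteps, executionOrder };
}

// Example flow with two steps; "full_name" is a hypothetical slug.
type GreetInput = { first_name: string; last_name: string };

const greetUserSteps: StepDef[] = [
  {
    slug: "full_name",
    dependsOn: [],
    handler: (input) => {
      const run = input.run as GreetInput;
      return `${run.first_name} ${run.last_name}`;
    },
  },
  {
    slug: "greeting",
    dependsOn: ["full_name"],
    handler: (input) => `Hello, ${input.full_name as string}!`,
  },
];

const result = simulateRun(greetUserSteps, { first_name: "Alice", last_name: "Smith" });
console.log(result.executionOrder); // [ "full_name", "greeting" ]
console.log(result.output); // { greeting: "Hello, Alice Smith!" }
```

Only greeting appears in the output because full_name has a dependent and is therefore not a leaf step, which matches the completed run shown in step 5.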
Congratulations!
You’ve successfully:
- Created a workflow definition
- Compiled it to SQL
- Set up a worker to process tasks
- Triggered a flow execution
- Monitored its progress
This completes the pgflow getting started guide. You now have all the basics needed to start building your own workflows!