Part 1: Create AI Scraping Flow
Let’s build the backend workflow that powers your AI web scraper.
What we’ll create:
- Database table for results
- Four task functions (scrape, summarize, tag, save)
- pgflow workflow connecting them
- Compile flow to SQL and migrate
- Set up an Edge Worker to run it
- Test everything locally
Step 1 - Create websites table
Set up your database to store AI analysis results.
1. Create a new migration:

   ```bash
   npx supabase migration new add_websites
   ```

2. Add this SQL to the generated file:

   ```sql
   create table public.websites (
     id bigserial primary key,
     website_url text not null,
     summary text,
     tags text[],
     created_at timestamptz default now()
   );
   ```

3. Apply it:

   ```bash
   npx supabase migration up --local
   ```
Step 2 - Create task functions
Build four focused functions that each do one thing well:
| File | What it does |
|---|---|
| scrapeWebsite.ts | Fetches webpage content |
| summarize.ts | AI summary |
| extractTags.ts | AI tags |
| saveToDb.ts | Saves to database |
Put these in `supabase/functions/_tasks` (see the organizing flows guide for the recommended project structure):
Web Scraping
scrapeWebsite.ts - Fetch and clean webpage content
```typescript
export default async function scrapeWebsite(url: string) {
  console.log("[scrapeWebsite] fetching", url);

  const res = await fetch(url, {
    signal: AbortSignal.timeout(10000), // 10 second timeout
  });

  if (!res.ok) throw new Error(`Fetch failed: ${res.status}`);

  const html = await res.text();
  const text = html
    .replace(/<[^>]+>/g, " ")
    .replace(/\s+/g, " ")
    .trim();

  return { content: text.slice(0, 25_000) }; // limit tokens
}
```
AI Analysis
Two OpenAI functions that return structured, type-safe data:
summarize.ts - AI summary
```typescript
import OpenAI from "npm:openai";

export default async function summarize(content: string) {
  console.log("[summarize] processing content");

  const apiKey = Deno.env.get("OPENAI_API_KEY");
  if (!apiKey) throw new Error("Missing OPENAI_API_KEY");

  const openai = new OpenAI({ apiKey });

  const response = await openai.responses.parse({
    model: "gpt-4o",
    input: [
      { role: "system", content: "Return a short paragraph summary." },
      { role: "user", content },
    ],
    text: {
      format: {
        type: "json_schema",
        name: "summary_format",
        schema: {
          type: "object",
          properties: {
            summary: {
              type: "string",
              description: "A short paragraph summary of the content",
            },
          },
          required: ["summary"],
          additionalProperties: false,
        },
      },
    },
  });

  return response.output_parsed.summary;
}
```
extractTags.ts - Extract tags
```typescript
import OpenAI from "npm:openai";

export default async function extractTags(content: string) {
  console.log("[extractTags] extracting tags");

  const apiKey = Deno.env.get("OPENAI_API_KEY");
  if (!apiKey) throw new Error("Missing OPENAI_API_KEY");

  const openai = new OpenAI({ apiKey });

  const response = await openai.responses.parse({
    model: "gpt-4o",
    input: [
      { role: "system", content: "Return 5-10 descriptive tags." },
      { role: "user", content },
    ],
    text: {
      format: {
        type: "json_schema",
        name: "tags_format",
        schema: {
          type: "object",
          properties: {
            tags: {
              type: "array",
              items: {
                type: "string",
              },
              description: "An array of 5-10 descriptive tags",
            },
          },
          required: ["tags"],
          additionalProperties: false,
        },
      },
    },
  });

  return response.output_parsed.tags;
}
```
Save to Database
The final task saves all results to your database:
saveToDb.ts - Store results
```typescript
import { createClient } from "jsr:@supabase/supabase-js";

export default async function saveToDb(row: {
  website_url: string;
  summary: string;
  tags: string[];
}) {
  console.log("[saveToDb] inserting row");

  const supabaseUrl = Deno.env.get("SUPABASE_URL");
  const supabaseKey = Deno.env.get("SUPABASE_SERVICE_ROLE_KEY");

  if (!supabaseUrl || !supabaseKey) {
    throw new Error("Missing SUPABASE_URL or SUPABASE_SERVICE_ROLE_KEY");
  }

  const supabase = createClient(supabaseUrl, supabaseKey);

  const { data } = await supabase
    .from("websites")
    .insert(row)
    .select("*")
    .single()
    .throwOnError();

  console.log("[saveToDb] inserted with id:", data?.id);
  return data;
}
```
Step 3 - Define flow
Connect tasks into a workflow using pgflow's TypeScript DSL (`supabase/functions/_flows/analyze_website.ts`):
```typescript
import { Flow } from "npm:@pgflow/dsl";
import scrapeWebsite from "../_tasks/scrapeWebsite.ts";
import summarize from "../_tasks/summarize.ts";
import extractTags from "../_tasks/extractTags.ts";
import saveToDb from "../_tasks/saveToDb.ts";

type Input = { url: string };

export default new Flow<Input>({ slug: "analyzeWebsite", maxAttempts: 3 })
  .step({ slug: "website" }, ({ run }) => scrapeWebsite(run.url))
  .step({ slug: "summary", dependsOn: ["website"] }, ({ website }) =>
    summarize(website.content),
  )
  .step({ slug: "tags", dependsOn: ["website"] }, ({ website }) =>
    extractTags(website.content),
  )
  .step(
    { slug: "saveToDb", dependsOn: ["summary", "tags"] },
    ({ run, summary, tags }) => saveToDb({ website_url: run.url, summary, tags }),
  );
```
Flow structure:
- website → fetches the URL (root step)
- summary & tags → run in parallel (both need website content)
- saveToDb → waits for both, then saves everything
Summary and tags execute simultaneously since both only need the scraped website content, roughly halving the time spent on AI calls. All state transitions happen transactionally in the database, ensuring your flow never ends up in an inconsistent state even if tasks fail or workers crash.
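The fan-out is driven entirely by dependsOn: any step that depends only on the same upstream step runs in parallel with its siblings, and a step listing several dependencies waits for all of them. A minimal sketch separate from this tutorial's flow (the "metricsExample", "clean", "charCount", "wordCount", and "report" slugs are made up for illustration):

```typescript
import { Flow } from "npm:@pgflow/dsl";

type Input = { text: string };

export default new Flow<Input>({ slug: "metricsExample", maxAttempts: 3 })
  .step({ slug: "clean" }, ({ run }) => run.text.trim())
  // Both steps depend only on "clean", so pgflow schedules them in parallel
  .step({ slug: "charCount", dependsOn: ["clean"] }, ({ clean }) => clean.length)
  .step({ slug: "wordCount", dependsOn: ["clean"] }, ({ clean }) =>
    clean.split(/\s+/).length,
  )
  // The join step starts only after both branches complete
  .step(
    { slug: "report", dependsOn: ["charCount", "wordCount"] },
    ({ charCount, wordCount }) => ({ charCount, wordCount }),
  );
```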
Step 4 - Compile & migrate
Turn your TypeScript flow into SQL using pgflow's compiler:
1. Compile TypeScript to SQL:

   ```bash
   npx pgflow@latest compile supabase/functions/_flows/analyze_website.ts
   # Generates supabase/migrations/<timestamp>_analyze_website.sql
   ```

   Using import maps or a custom deno.json? Pass the `--deno-json` flag to point at your `deno.json` file:

   ```bash
   npx pgflow@latest compile \
     --deno-json=path/to/deno.json \
     supabase/functions/_flows/analyze_website.ts
   ```

   Run `npx pgflow@latest compile --help` for additional options.

2. Apply the migration to the database:

   ```bash
   npx supabase migration up --local
   ```
The DSL compiler extracts your flow’s shape (steps, dependencies) and generates SQL that inserts it into the database. The database’s flow definition determines what runs and when. The TypeScript DSL also wires up step handlers so the Edge Worker knows which function to invoke for each step. Learn more about how pgflow works.
Step 5 - Set up Edge Worker

1. Create a worker function that will process steps from your flow:

   ```bash
   npx supabase functions new analyze_website_worker
   ```

2. Replace the generated `index.ts` with the following code:

   ```typescript
   // supabase/functions/analyze_website_worker/index.ts
   import { EdgeWorker } from "jsr:@pgflow/edge-worker";
   import AnalyzeWebsite from "../_flows/analyze_website.ts";

   EdgeWorker.start(AnalyzeWebsite); // That's it! 🤯
   ```

3. Update your `supabase/config.toml` so the worker can be invoked without a JWT:

   ```toml
   # supabase/config.toml
   [functions.analyze_website_worker]
   enabled = true
   verify_jwt = false
   ```
Step 6 - Run & test
Start the services (make sure `supabase start` is already running):

1. Serve Edge Functions (keep this terminal open):

   ```bash
   npx supabase functions serve
   ```

2. In a new terminal, start the worker:

   ```bash
   curl -X POST http://127.0.0.1:54321/functions/v1/analyze_website_worker
   ```

3. Trigger the flow in the SQL Editor (or from application code, as sketched below):

   ```sql
   select * from pgflow.start_flow(
     flow_slug => 'analyzeWebsite',
     input => '{"url":"https://supabase.com"}'
   );
   ```
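If you prefer to kick off runs from code, a hedged sketch with supabase-js follows. It assumes you expose the pgflow schema through the API (for example by adding it to the schemas list under [api] in `supabase/config.toml`) and that the calling role is allowed to execute `pgflow.start_flow`; otherwise wrap the call in your own RPC function. The local URL and service-role key are development placeholders.

```typescript
import { createClient } from "jsr:@supabase/supabase-js";

// Local development values; replace with your own project URL and key
const supabase = createClient(
  "http://127.0.0.1:54321",
  Deno.env.get("SUPABASE_SERVICE_ROLE_KEY")!,
);

// Calls pgflow.start_flow(flow_slug => ..., input => ...) through PostgREST
const { data, error } = await supabase
  .schema("pgflow")
  .rpc("start_flow", {
    flow_slug: "analyzeWebsite",
    input: { url: "https://supabase.com" },
  });

if (error) throw error;
console.log("started run:", data);
```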
What happens
Worker output:

```
[scrapeWebsite] fetching https://supabase.com
[summarize] processing content
[extractTags] extracting tags
[saveToDb] inserting row
[saveToDb] inserted with id: 1
```

Check your database:

```sql
select website_url, tags, summary from websites;

-- Example output:
-- website_url          | tags                                               | summary
-- ---------------------+----------------------------------------------------+------------------------------------------------------------
-- https://supabase.com | {"postgres","api","backend","database","realtime"} | Supabase is an open source Firebase alternative providing
--                      |                                                    | a PostgreSQL database, authentication, instant APIs,
--                      |                                                    | realtime subscriptions, and storage.
```
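Reading the results back from application code is plain supabase-js, nothing pgflow-specific. A minimal sketch assuming local development credentials (the tutorial's table has no RLS policies, so use the service role key or add policies before querying with the anon key):

```typescript
import { createClient } from "jsr:@supabase/supabase-js";

const supabase = createClient(
  "http://127.0.0.1:54321",
  Deno.env.get("SUPABASE_SERVICE_ROLE_KEY")!,
);

// Latest analyses first
const { data, error } = await supabase
  .from("websites")
  .select("website_url, tags, summary, created_at")
  .order("created_at", { ascending: false })
  .limit(10);

if (error) throw error;
console.table(data);
```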
What you’ve built
Your AI scraper pipeline:
- Auto-retries - Failed steps retry up to 3 times
- Parallel AI - Summary and tags run simultaneously
- Full history - Every run tracked in your database
- Modular code - Each task is independent and testable
- ACID guarantees - Built on pgmq, a real Postgres queue with transactional safety
Troubleshooting
Common issues
| Error | Fix |
|---|---|
| `SASL_SIGNATURE_MISMATCH` | URL-encode the DB password |
| Missing `OPENAI_API_KEY` | Add it to `supabase/functions/.env` |
| 401 (Unauthorized) | Check that your OpenAI key is valid |
| Compile errors | Run `npx pgflow@latest compile --help` |
| No logs appearing after starting flow | Make sure you ran the `curl` command to boot the worker. Check the `supabase functions serve` terminal for activity. |
| Flow stuck in Created state | Worker might not be polling. Restart `functions serve` and `curl` the worker endpoint again. |
For debugging, see Monitor flow execution.
Next Steps
You’ve built the backend workflow! Coming next:
- Part 2: Frontend Dashboard (coming soon) - Create a real-time UI using the upcoming pgflow client library that leverages Supabase Realtime to stream flow progress directly to the browser
In the meantime, experiment with:
- Adding more AI analysis tasks to your flow (see the sketch after this list)
- Creating custom flow visualizations with the pgflow monitoring tables
- Optimizing performance with parallel step execution
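For the first idea, a new AI task follows the same pattern as summarize.ts and extractTags.ts. A hypothetical extractSentiment task might look like the sketch below (illustrative only, not part of this tutorial); you would also add a sentiment column to the websites table, pass it through saveToDb, and wire a sentiment step into the flow with dependsOn: ["website"].

```typescript
// supabase/functions/_tasks/extractSentiment.ts (hypothetical extra task)
import OpenAI from "npm:openai";

export default async function extractSentiment(content: string) {
  console.log("[extractSentiment] classifying sentiment");

  const apiKey = Deno.env.get("OPENAI_API_KEY");
  if (!apiKey) throw new Error("Missing OPENAI_API_KEY");

  const openai = new OpenAI({ apiKey });

  const response = await openai.responses.parse({
    model: "gpt-4o",
    input: [
      {
        role: "system",
        content: "Classify the overall sentiment as positive, neutral, or negative.",
      },
      { role: "user", content },
    ],
    text: {
      format: {
        type: "json_schema",
        name: "sentiment_format",
        schema: {
          type: "object",
          properties: {
            sentiment: {
              type: "string",
              enum: ["positive", "neutral", "negative"],
              description: "Overall sentiment of the content",
            },
          },
          required: ["sentiment"],
          additionalProperties: false,
        },
      },
    },
  });

  return response.output_parsed.sentiment;
}
```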