
Part 1: Create AI Scraping Flow

Let’s build the backend workflow that powers your AI web scraper.

What we’ll create:

  1. Database table for results
  2. Four task functions (scrape, summarize, tag, save)
  3. pgflow workflow connecting them
  4. Compile flow to SQL and migrate
  5. Set up the Edge Worker to run it
  6. Test everything locally

Set up your database to store AI analysis results.

  1. Create a new migration:

    npx supabase migration new add_websites
  2. Add this SQL to the generated file:

    create table public.websites (
      id bigserial primary key,
      website_url text not null,
      summary text,
      tags text[],
      created_at timestamptz default now()
    );
  3. Apply it:

    npx supabase migration up --local

Build four focused functions that each do one thing well:

File               What it does
scrapeWebsite.ts   Fetches webpage content
summarize.ts       Generates an AI summary
extractTags.ts     Extracts AI tags
saveToDb.ts        Saves results to the database

Put these in supabase/functions/_tasks (see the organizing flows code guide for the recommended project structure).
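For orientation, here’s roughly where everything created in this part ends up on disk (the flow and worker files are added in later steps):

supabase/
  functions/
    _tasks/
      scrapeWebsite.ts
      summarize.ts
      extractTags.ts
      saveToDb.ts
    _flows/
      analyze_website.ts
    analyze_website_worker/
      index.ts
  migrations/
    <timestamp>_add_websites.sql
    <timestamp>_analyze_website.sql

The four task implementations: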

scrapeWebsite.ts - Fetch and clean webpage content
supabase/functions/_tasks/scrapeWebsite.ts
export default async function scrapeWebsite(url: string) {
  console.log("[scrapeWebsite] fetching", url);
  const res = await fetch(url, {
    signal: AbortSignal.timeout(10000), // 10 second timeout
  });
  if (!res.ok) throw new Error(`Fetch failed: ${res.status}`);
  const html = await res.text();
  const text = html
    .replace(/<[^>]+>/g, " ")
    .replace(/\s+/g, " ")
    .trim();
  return { content: text.slice(0, 25_000) }; // limit tokens
}
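The regex-based tag stripping above is intentionally crude. If you need cleaner text, you could swap in a real HTML parser instead. A minimal sketch, assuming the deno-dom package from JSR (an assumption on our part; any DOM parser would work similarly):

// Alternative scrapeWebsite using a DOM parser instead of regex stripping.
// Assumes jsr:@b-fuze/deno-dom; swap in whichever parser you prefer.
import { DOMParser } from "jsr:@b-fuze/deno-dom";

export default async function scrapeWebsite(url: string) {
  const res = await fetch(url, { signal: AbortSignal.timeout(10000) });
  if (!res.ok) throw new Error(`Fetch failed: ${res.status}`);
  // Parse the HTML and take the text of <body>; note this still includes
  // <script>/<style> contents, which you may want to strip out first.
  const doc = new DOMParser().parseFromString(await res.text(), "text/html");
  const text = (doc?.body?.textContent ?? "").replace(/\s+/g, " ").trim();
  return { content: text.slice(0, 25_000) }; // keep the token budget in check
}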

Two OpenAI functions that return structured, type-safe data:

summarize.ts - AI summary
supabase/functions/_tasks/summarize.ts
import OpenAI from "npm:openai";

export default async function summarize(content: string) {
  console.log("[summarize] processing content");
  const apiKey = Deno.env.get("OPENAI_API_KEY");
  if (!apiKey) throw new Error("Missing OPENAI_API_KEY");
  const openai = new OpenAI({ apiKey });
  const response = await openai.responses.parse({
    model: "gpt-4o",
    input: [
      { role: "system", content: "Return a short paragraph summary." },
      { role: "user", content },
    ],
    text: {
      format: {
        type: "json_schema",
        name: "summary_format",
        schema: {
          type: "object",
          properties: {
            summary: {
              type: "string",
              description: "A short paragraph summary of the content",
            },
          },
          required: ["summary"],
          additionalProperties: false,
        },
      },
    },
  });
  return response.output_parsed.summary;
}
extractTags.ts - Extract tags
supabase/functions/_tasks/extractTags.ts
import OpenAI from "npm:openai";

export default async function extractTags(content: string) {
  console.log("[extractTags] extracting tags");
  const apiKey = Deno.env.get("OPENAI_API_KEY");
  if (!apiKey) throw new Error("Missing OPENAI_API_KEY");
  const openai = new OpenAI({ apiKey });
  const response = await openai.responses.parse({
    model: "gpt-4o",
    input: [
      { role: "system", content: "Return 5-10 descriptive tags." },
      { role: "user", content },
    ],
    text: {
      format: {
        type: "json_schema",
        name: "tags_format",
        schema: {
          type: "object",
          properties: {
            tags: {
              type: "array",
              items: {
                type: "string",
              },
              description: "An array of 5-10 descriptive tags",
            },
          },
          required: ["tags"],
          additionalProperties: false,
        },
      },
    },
  });
  return response.output_parsed.tags;
}
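Both AI tasks repeat the API-key check and client construction. If you prefer, that boilerplate can live in a small shared helper - a sketch, with a hypothetical file name:

// supabase/functions/_tasks/openaiClient.ts (hypothetical helper, not required)
import OpenAI from "npm:openai";

export function getOpenAI(): OpenAI {
  const apiKey = Deno.env.get("OPENAI_API_KEY");
  if (!apiKey) throw new Error("Missing OPENAI_API_KEY");
  return new OpenAI({ apiKey });
}

summarize.ts and extractTags.ts would then call getOpenAI() instead of each constructing its own client.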

The final task saves all results to your database:

saveToDb.ts - Store results
supabase/functions/_tasks/saveToDb.ts
import { createClient } from "jsr:@supabase/supabase-js";

export default async function saveToDb(row: {
  website_url: string;
  summary: string;
  tags: string[];
}) {
  console.log("[saveToDb] inserting row");
  const supabaseUrl = Deno.env.get("SUPABASE_URL");
  const supabaseKey = Deno.env.get("SUPABASE_SERVICE_ROLE_KEY");
  if (!supabaseUrl || !supabaseKey) {
    throw new Error("Missing SUPABASE_URL or SUPABASE_SERVICE_ROLE_KEY");
  }
  const supabase = createClient(supabaseUrl, supabaseKey);
  const { data } = await supabase
    .from("websites")
    .insert(row)
    .select("*")
    .single()
    .throwOnError();
  console.log("[saveToDb] inserted with id:", data?.id);
  return data;
}

Connect the tasks into a workflow using pgflow’s TypeScript DSL:

supabase/functions/_flows/analyze_website.ts
import { Flow } from "npm:@pgflow/dsl";
import scrapeWebsite from "../_tasks/scrapeWebsite.ts";
import summarize from "../_tasks/summarize.ts";
import extractTags from "../_tasks/extractTags.ts";
import saveToDb from "../_tasks/saveToDb.ts";

type Input = { url: string };

export default new Flow<Input>({ slug: "analyzeWebsite", maxAttempts: 3 })
  .step({ slug: "website" }, ({ run }) => scrapeWebsite(run.url))
  .step({ slug: "summary", dependsOn: ["website"] }, ({ website }) =>
    summarize(website.content),
  )
  .step({ slug: "tags", dependsOn: ["website"] }, ({ website }) =>
    extractTags(website.content),
  )
  .step(
    { slug: "saveToDb", dependsOn: ["summary", "tags"] },
    ({ run, summary, tags }) =>
      saveToDb({ website_url: run.url, summary, tags }),
  );

Flow structure:

AI Web Scraper Workflow
  • website → fetches the URL (root step)
  • summary & tags → run in parallel (both need website content)
  • saveToDb → waits for both, then saves everything

Summary and tags execute simultaneously since both depend only on the website content, so the two AI calls overlap instead of running back to back. All state transitions happen transactionally in the database, ensuring your flow never ends up in an inconsistent state even if tasks fail or workers crash.
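Because each step only declares what it depends on, extending the graph is just another .step call. As an illustration (not part of this tutorial), a reduced flow with a hypothetical extractLinks task shows how an extra step that needs the scraped content would automatically run in parallel with summary and tags:

// Sketch only: extractLinks is a hypothetical task, and the other steps are
// omitted for brevity. Any step with dependsOn: ["website"] runs in parallel
// with the existing "summary" and "tags" steps.
import { Flow } from "npm:@pgflow/dsl";
import scrapeWebsite from "../_tasks/scrapeWebsite.ts";
import extractLinks from "../_tasks/extractLinks.ts"; // hypothetical task

type Input = { url: string };

export default new Flow<Input>({ slug: "analyzeWebsiteLinks", maxAttempts: 3 })
  .step({ slug: "website" }, ({ run }) => scrapeWebsite(run.url))
  .step({ slug: "links", dependsOn: ["website"] }, ({ website }) =>
    extractLinks(website.content),
  );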


Turn your TypeScript flow into SQL using pgflow’s compiler:

  1. Compile TypeScript to SQL:

    npx pgflow@latest compile supabase/functions/_flows/analyze_website.ts
    # Generates supabase/migrations/<timestamp>_analyze_website.sql
    Using import maps or a custom deno.json?

    You can use the --deno-json flag to point the compiler at your deno.json file:

    npx pgflow@latest compile \
      --deno-json=path/to/deno.json \
      supabase/functions/_flows/analyze_website.ts

    Run npx pgflow@latest compile --help for additional options.

  2. Apply migration to database:

    npx supabase migration up --local

The DSL compiler extracts your flow’s shape (steps, dependencies) and generates SQL that inserts it into the database. The database’s flow definition determines what runs and when. The TypeScript DSL also wires up step handlers so the Edge Worker knows which function to invoke for each step. Learn more about how pgflow works.


  1. Create a worker function that will process steps from your flow:

    npx supabase functions new analyze_website_worker
  2. Replace the generated index.ts with the following code:

    supabase/functions/analyze_website_worker/index.ts
    import { EdgeWorker } from "jsr:@pgflow/edge-worker";
    import AnalyzeWebsite from '../_flows/analyze_website.ts';
    EdgeWorker.start(AnalyzeWebsite); // That's it! 🤯
  3. Update your supabase/config.toml so the worker can be invoked without a JWT (the curl command used later doesn’t send an auth header):

    supabase/config.toml
    [functions.analyze_website_worker]
    enabled = true
    verify_jwt = false

Start the services (make sure supabase start is already running):

  1. Serve Edge Functions (keep this terminal open):

    npx supabase functions serve
  2. In a new terminal, start the worker:

    curl -X POST http://127.0.0.1:54321/functions/v1/analyze_website_worker
  3. Trigger the flow in SQL Editor:

    select * from pgflow.start_flow(
      flow_slug => 'analyzeWebsite',
      input => '{"url":"https://supabase.com"}'
    );
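You can also start flows from TypeScript rather than the SQL Editor. A minimal sketch over a direct Postgres connection using the postgres.js driver (the driver choice and the SUPABASE_DB_URL variable are assumptions; any Postgres client that can call pgflow.start_flow works):

// Sketch: start a run programmatically by calling pgflow.start_flow over a
// direct database connection.
// Assumes SUPABASE_DB_URL holds your connection string,
// e.g. postgresql://postgres:postgres@127.0.0.1:54322/postgres for local dev.
import postgres from "npm:postgres";

const sql = postgres(Deno.env.get("SUPABASE_DB_URL")!);

const [run] = await sql`
  select * from pgflow.start_flow(
    flow_slug => 'analyzeWebsite',
    input => ${JSON.stringify({ url: "https://supabase.com" })}::jsonb
  )
`;
console.log("started run:", run);

await sql.end();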

Worker output:

[scrapeWebsite] fetching https://supabase.com
[summarize] processing content
[extractTags] extracting tags
[saveToDb] inserting row
[saveToDb] inserted with id: 1

Check your database:

select website_url, tags, summary from websites;

-- Example output:
-- website_url          | tags                                                | summary
-- ---------------------|-----------------------------------------------------|------------------------------------------------------------
-- https://supabase.com | {"postgres","api","backend","database","realtime"} | Supabase is an open source Firebase alternative providing
--                      |                                                     | a PostgreSQL database, authentication, instant APIs,
--                      |                                                     | realtime subscriptions, and storage.

Your AI scraper pipeline:

  • Auto-retries - Failed steps retry up to 3 times
  • Parallel AI - Summary and tags run simultaneously
  • Full history - Every run tracked in your database
  • Modular code - Each task is independent and testable (see the sketch after this list)
  • ACID guarantees - Built on pgmq, a real Postgres queue with transactional safety
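
Because each task is a plain exported function, you can exercise it outside the flow with Deno’s built-in test runner. A minimal sketch (the test file name is hypothetical; run it with deno test --allow-net):

// supabase/functions/_tasks/scrapeWebsite.test.ts (hypothetical)
import scrapeWebsite from "./scrapeWebsite.ts";

Deno.test("scrapeWebsite returns cleaned, truncated text", async () => {
  const { content } = await scrapeWebsite("https://example.com");
  if (content.length === 0) throw new Error("expected non-empty content");
  if (content.length > 25_000) throw new Error("expected content to be truncated");
});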

Common issues
  • SASL_SIGNATURE_MISMATCH - URL-encode your database password
  • Missing OPENAI_API_KEY - Add it to supabase/functions/.env
  • 401 (Unauthorized) - Check that your OpenAI key is valid
  • Compile errors - Run npx pgflow@latest compile --help
  • No logs appearing after starting the flow - Make sure you ran the curl command to boot the worker, and check the supabase functions serve terminal for activity
  • Flow stuck in Created state - The worker might not be polling; restart functions serve and curl the worker endpoint again

For debugging, see Monitor flow execution.


You’ve built the backend workflow! Coming next:

  • Part 2: Frontend Dashboard (coming soon) - Create a real-time UI using the upcoming pgflow client library that leverages Supabase Realtime to stream flow progress directly to the browser

In the meantime, experiment with:

  • Adding more AI analysis tasks to your flow
  • Creating custom flow visualizations with the pgflow monitoring tables
  • Optimizing performance with parallel step execution

Join the Community


Have questions or need help? pgflow is just getting started - join us on Discord to ask questions, share feedback, or discuss partnership opportunities.
Join Discord →