Table of contents
- Opening
- Preface: What You’re Building (and Why)
- Define the Pipeline: Inputs → Steps → Outputs
- Map the Workflow: From Prompt to Repeatable Process
- Choose the Building Blocks: Storage, Compute, Scheduling
- Build and Run It: Deploy, Log, Handle Failures
- Operate and Improve: Limits, Cost, Reliability
- Next Steps: Extend the Pattern and Ship More Pipelines
Opening
In a prior organization, I took a bunch of “AI work in the wild”—strong prompts buried in chat threads, one-off scripts on laptops, manual copy/paste workflows—and turned it into production pipelines the team could actually rely on.
The goal wasn’t to build an AI platform—it was to ship one repeatable workflow with a clear input, a clear output, and enough operational discipline to run unattended. This guide walks through the same pattern so you can move from scattered experiments to reliable systems without a huge team or a rebuild. Along the way, we’ll do one manual run end-to-end and define a concrete CSV “output contract” (with an example row and the exact enrichment prompt) so automation has a clear target.
Here’s the “minimum production bar” we’ll aim for in this post:
- Clear artifacts: every step writes an inspectable output (so debugging isn’t guesswork).
- Run identity: every run has a run_id and a captured config (so runs are reproducible).
- Failure handling: retries for expected failures, and per-item errors recorded as data (not silent drops).
- Replayability: you can rerun a failed step from stored artifacts without repeating the whole pipeline.
- Bounded cost: caps on batch size/concurrency so success doesn’t mean a surprise bill.
1. Preface: What You’re Building (and Why)
Most teams are already getting value from AI, but it often lives in fragile places: prompts in chat histories, scripts on personal machines, and workflows held together by copy/paste. That’s fine for exploration—but once the work becomes recurring or business-critical, the problem usually isn’t “better prompts.” It’s turning a one-off win into a repeatable system.
In this guide, we’ll do that by building a simple, generic pipeline around a concrete example topic: UFO sightings (UAP reports). This is a stand-in for common “find → extract → label → rank → report” workflows like competitor updates, policy monitoring, or support ticket triage. The pipeline will search recent coverage in a defined time window, extract page text into a consistent row schema (or record an extraction error per URL), use AI to label each item (for example, “on-topic/off-topic” and “sighting report vs entertainment”), assign a numeric relevance score, rank the results (best first), and generate a report (like a CSV). Section 2 walks through the manual version first—search → seed rows → extract text → enrich → rank—so the CSV format and step outputs are unambiguous before you automate anything.
2. Define the Pipeline: Inputs → Steps → Outputs
Let’s make this concrete with a small, manual run end-to-end. We’ll use a fun example topic—UFO sightings (UAP reports)—but the workflow is the same for any subject you’d want to track.
Start with the manual flow once. It’s the fastest way to define what “done” looks like before you automate anything.
Manual run (search → seed rows → extract text): Create a spreadsheet (this becomes your “manual pipeline output” CSV).
Then use the Google UI to search for UFO sightings (or UAP report) and collect ~10–20 candidate results. For
each result, add a row with seeding fields like run_id, query, title, source,
date, url, dedupe_key, and snippet. Next, manually extract the relevant text into
extracted_text (or record extraction_status/extraction_error if you can’t extract).
Set a simple dedupe rule now, because you’ll need it later: define dedupe_key as the normalized URL (for example, lowercased,
with tracking parameters stripped). If you see the same key twice, keep the best row and mark the other as a duplicate.
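If you later script this step, a minimal sketch of that normalization might look like the following (the tracking-parameter list is illustrative, not exhaustive):

from urllib.parse import urlsplit, urlunsplit, parse_qsl, urlencode

TRACKING_PARAMS = {"utm_source", "utm_medium", "utm_campaign", "utm_term", "utm_content", "gclid", "fbclid"}

def make_dedupe_key(url: str) -> str:
    # Lowercase, drop scheme/fragment, strip tracking parameters, trim trailing slash
    parts = urlsplit(url.strip().lower())
    query = [(k, v) for k, v in parse_qsl(parts.query) if k not in TRACKING_PARAMS]
    return urlunsplit(("", parts.netloc, parts.path.rstrip("/"), urlencode(query), "")).lstrip("/")

# make_dedupe_key("https://Example.com/A?utm_source=x&id=2") -> "example.com/a?id=2"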
Manual enrichment + post-processing (label/score → rank → export): For each row with extracted text, run the
labeling/scoring prompt and fill label, type, relevance_score, and reason. Then sort
by relevance_score, sanity-check the top results, and export your final CSV.
Here’s an example of what your CSV can look like (seeded + partially filled):
run_id,query,title,source,date,url,dedupe_key,snippet,extracted_text,extraction_status,extraction_error,label,type,relevance_score,reason
2026-01-05,"UFO sightings (UAP reports)","Pilot reports UFO hovering beside jet, leaving air traffic ...","foxnews.com",2025-12-22,"https://www.foxnews.com/...","foxnews.com/...","Pilot says... air traffic control...","A commercial pilot reported seeing a bright object pacing the aircraft for several minutes... ATC recorded the call...",ok,,on_topic,sighting_report,78,"Firsthand report with specific details; limited corroboration."
2026-01-05,"UFO sightings (UAP reports)","Mass UFO Sightings Are Increasing - Here's Why | Encounter ...","youtube.com",2025-12-20,"https://www.youtube.com/watch?v=...","youtube.com/watch","Video discusses...",,error,video_not_extracted,,,,
2026-01-05,"UFO sightings (UAP reports)","Government report summarizes UAP cases and open questions","example.gov",2025-12-15,"https://example.gov/uap-report","example.gov/uap-report","A report says...","The report summarizes reviewed incidents, categories of explanations, and data gaps...",ok,,on_topic,government_or_policy,85,"Primary-source summary of cases and methodology."
Enrichment prompt (with an example story):
You are helping me triage search results for the topic: "UFO sightings (UAP reports)".
Return ONLY valid JSON with:
- label: "on_topic" or "off_topic"
- type: one of "sighting_report", "analysis", "government_or_policy", "entertainment", "other"
- relevance_score: integer 0-100
- reason: one short sentence explaining the score
Text:
"""
A commercial pilot reported seeing a bright object pacing the aircraft for several minutes during ascent. The pilot contacted air traffic control,
describing the object as stationary relative to the jet before accelerating away. No instrument anomalies were reported. The article cites an
interview and includes a short excerpt of the ATC transcript, but no independent verification beyond the pilot and controller comments.
"""
This manual workflow is tedious—and that’s the point. Once you feel the pain, you’ll know exactly what to automate. For now, focus on defining clean inputs, step outputs, and a CSV format you trust.
3. Map the Workflow: From Prompt to Repeatable Process
Section 2 defines “done” by doing the work manually: you seed rows, extract text, enrich with a consistent prompt, and produce a ranked CSV you trust. Once that target is clear, automation becomes straightforward: you stop doing the repetitive parts by hand and make the workflow executable.
The first thing we automated was the most annoying part: collecting and seeding rows. The Google results UI is fine for browsing, but
it’s a bad interface for producing a clean dataset. So we wrote pipeline.py, which takes a query and outputs a
pipeline-ready CSV with the columns we already defined (run_id, query, title, source,
date when available, url, dedupe_key, snippet, plus placeholder columns for
extraction and enrichment).
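As a rough sketch of that seeding step (how you obtain the raw results is up to you; the column list is the contract that matters):

import csv

COLUMNS = [
    "run_id", "query", "title", "source", "date", "url", "dedupe_key",
    "snippet", "extracted_text", "extraction_status", "extraction_error",
    "label", "type", "relevance_score", "reason",
]

def write_seed_csv(path, run_id, query, results):
    # results: list of dicts with title/source/date/url/snippet from your search source
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=COLUMNS, restval="")
        writer.writeheader()
        for r in results:
            writer.writerow({
                "run_id": run_id,
                "query": query,
                "title": r.get("title", ""),
                "source": r.get("source", ""),
                "date": r.get("date", ""),
                "url": r["url"],
                # Crude normalization; reuse the make_dedupe_key helper from Section 2 if you prefer
                "dedupe_key": r["url"].lower().split("?")[0],
                "snippet": r.get("snippet", ""),
            })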
Then we automated the enrichment step. Instead of manually running the prompt row-by-row and copy/pasting JSON back into a spreadsheet,
pipeline.py reads the CSV, finds rows with extracted_text (and extraction_status=ok),
calls the model using the same prompt contract, and fills label, type, relevance_score, and
reason.
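A minimal sketch of that enrichment loop, assuming a call_model(prompt) helper that wraps whatever LLM client you use (the helper is hypothetical; the prompt text is the one from Section 2):

import csv
import json

PROMPT_TEMPLATE = '''You are helping me triage search results for the topic: "UFO sightings (UAP reports)".
Return ONLY valid JSON with:
- label: "on_topic" or "off_topic"
- type: one of "sighting_report", "analysis", "government_or_policy", "entertainment", "other"
- relevance_score: integer 0-100
- reason: one short sentence explaining the score

Text:
"""
{text}
"""
'''

def enrich_csv(in_path, out_path, call_model):
    with open(in_path, newline="", encoding="utf-8") as f:
        rows = list(csv.DictReader(f))
    if not rows:
        return
    for row in rows:
        if row.get("extraction_status") != "ok" or not row.get("extracted_text"):
            continue  # nothing to enrich for this row
        raw = call_model(PROMPT_TEMPLATE.format(text=row["extracted_text"]))
        data = json.loads(raw)  # the prompt contract promises JSON only
        row["label"] = data["label"]
        row["type"] = data["type"]
        row["relevance_score"] = str(data["relevance_score"])
        row["reason"] = data["reason"]
    with open(out_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=rows[0].keys())
        writer.writeheader()
        writer.writerows(rows)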
At this point, the workflow has a simple shape:
- Search export: produces a seeded CSV (the dataset you work from).
- Extraction: fills extracted_text (manual for now, but this becomes its own automated step later).
- Enrichment: fills label/type/score/reason.
- Reporting: sorts/filters and exports the final ranked CSV.
This is why contracts matter: each step reads a known input (a CSV with specific columns) and produces a known output (the same CSV,
enriched). Once you have that, you can schedule it, persist artifacts per run_id, and add the operational basics (retries,
per-row errors, bounded batch sizes) without changing what “done” means.
4. Choose the Building Blocks: Storage, Compute, Scheduling
For this guide, we’ll build on AWS. AWS is a cloud platform with managed building blocks you can combine into a pipeline without running servers yourself. The reason we’ll use it here is simple: if you want low cost and low operational overhead, AWS’s serverless and managed services let you pay mostly for what you use, keep deployments small, and avoid building your own infrastructure from scratch.
We’ll use a simple default stack: EventBridge (schedule) → Step Functions (orchestration/retries) → Lambda (compute per step) → S3 (artifacts). Add DynamoDB only when you need durable state for dedupe/idempotency or run tracking at scale, and add SQS only when you need reliable fan-out + buffering across lots of URLs. Use CloudWatch Logs for observability and SSM/Secrets Manager for API keys.
5. Build and Run It: Deploy, Log, Handle Failures
To run the UFO/UAP pipeline on AWS, we’ll deploy four small compute steps and wire them together so they run on a schedule and write artifacts you can inspect when something goes wrong.
Before deploying, it helps to have a working reference implementation. pipeline.py is that reference: it produces the seeded
CSV and can run the enrichment step locally using the same prompt contract you validated in Section 2. In AWS, we’ll keep the same
contracts, but run each step as managed compute on a schedule, with artifacts stored per run_id.
Start with the foundations: storage and secrets. Create an S3 bucket for pipeline artifacts, and use a simple convention like
runs/<run_id>/urls.json, runs/<run_id>/extracted.jsonl,
runs/<run_id>/enriched.jsonl, and runs/<run_id>/report.csv. Put API keys (LLM, etc.) in Secrets
Manager or SSM Parameter Store so nothing sensitive is hardcoded.
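For example, a step can fetch its key at startup from Parameter Store (the parameter name below is made up; use your own naming convention):

import boto3

ssm = boto3.client("ssm")

def get_llm_api_key(name="/pipelines/ufo/llm_api_key"):
    # SecureString values are decrypted in the response when WithDecryption=True
    resp = ssm.get_parameter(Name=name, WithDecryption=True)
    return resp["Parameter"]["Value"]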
Next deploy the compute. Implement four steps—search, extract, label_score, and
report—each with a clear input/output contract. Each step should: (1) read an input event, (2) write an output artifact to
S3 under the current run_id, and (3) return a small JSON payload containing run_id and the S3 keys the next step
should read.
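A sketch of that shape for the search step as a Python Lambda (the bucket name and the result-fetching stub are placeholders):

import json
import boto3

s3 = boto3.client("s3")
BUCKET = "my-pipeline-artifacts"  # placeholder bucket name

def fetch_search_results(query, max_urls):
    # Placeholder: call your search API or scraper here
    return [{"url": "https://example.com/a", "title": "…", "source": "Example", "published_at": "2026-01-03"}][:max_urls]

def handler(event, context):
    run_id = event["run_id"]
    # 1) Do the step's real work (here: fetch candidate URLs for the query)
    results = fetch_search_results(event["query"], event.get("max_urls", 50))
    # 2) Write the artifact under the current run_id
    out_key = f"runs/{run_id}/urls.json"
    s3.put_object(Bucket=BUCKET, Key=out_key, Body=json.dumps(results).encode("utf-8"))
    # 3) Return a small payload for the next state to read
    return {"run_id": run_id, "urls_s3_key": out_key}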
If you want a simple way to keep implementations aligned, think of the cloud steps as the scheduled, production version of what your script does locally:
- pipeline.py search: corresponds to the search step (seed the dataset).
- pipeline.py enrich: corresponds to label_score (fill label/type/relevance_score/reason from extracted_text).
Step contracts (what each step reads/writes)
Use contracts like these as your “glue.” They make the system debuggable and let you rerun steps without repeating earlier work—and
they’re the same contracts your pipeline.py script follows locally.
Run input (from the schedule)
{
"run_id": "2026-01-05",
"query": "UFO sightings (UAP reports)",
"date_range": "past_month",
"max_urls": 50
}
1) search → writes runs/<run_id>/urls.json
{
"run_id": "2026-01-05",
"urls_s3_key": "runs/2026-01-05/urls.json"
}
Example urls.json contents (minimum):
[
{ "url": "https://example.com/a", "title": "…", "source": "Example", "published_at": "2026-01-03" }
]
2) extract (reads URL list) → writes runs/<run_id>/extracted.jsonl
{
"run_id": "2026-01-05",
"urls_s3_key": "runs/2026-01-05/urls.json",
"extracted_s3_key": "runs/2026-01-05/extracted.jsonl"
}
Each JSONL line should be a row. If extraction fails, record it as data so the run still completes:
{"url":"https://example.com/a","dedupe_key":"example.com/a","title":"…","source":"Example","date":"2026-01-03","extracted_text":"…","extraction_status":"ok","extraction_error":null}
{"url":"https://paywalled.com/b","dedupe_key":"paywalled.com/b","title":"…","source":"Paywalled","date":"2026-01-02","extracted_text":"","extraction_status":"error","extraction_error":"paywall_or_blocked"}
3) label_score (reads extracted rows) → writes runs/<run_id>/enriched.jsonl
{
"run_id": "2026-01-05",
"extracted_s3_key": "runs/2026-01-05/extracted.jsonl",
"enriched_s3_key": "runs/2026-01-05/enriched.jsonl"
}
Enriched rows should preserve the original fields and add label, type, relevance_score, and
reason. If labeling fails for a row, record an error field and continue.
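A per-row pattern like the sketch below keeps one bad page or one malformed model response from failing the whole step (call_model and build_prompt are placeholders; enrichment_error is one possible name for the error field):

import json

def label_rows(rows, call_model, build_prompt):
    enriched = []
    for row in rows:
        out = dict(row)  # preserve every original field
        try:
            data = json.loads(call_model(build_prompt(row["extracted_text"])))
            out["label"] = data["label"]
            out["type"] = data["type"]
            out["relevance_score"] = data["relevance_score"]
            out["reason"] = data["reason"]
        except Exception as exc:  # bad JSON, timeout, model error, etc.
            out["enrichment_error"] = str(exc)  # record the failure as data and keep going
        enriched.append(out)
    return enriched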
4) report (reads enriched rows) → writes runs/<run_id>/report.csv
{
"run_id": "2026-01-05",
"enriched_s3_key": "runs/2026-01-05/enriched.jsonl",
"report_s3_key": "runs/2026-01-05/report.csv"
}
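Locally, the reporting logic can be as simple as the sketch below (in Lambda you would read and write these artifacts via S3, as in the handler sketch above):

import csv
import json

def write_report(enriched_jsonl_path, report_csv_path):
    rows = []
    with open(enriched_jsonl_path, encoding="utf-8") as f:
        for line in f:
            if line.strip():
                rows.append(json.loads(line))
    # Rank best-first; rows that failed enrichment have no score and sort last
    rows.sort(key=lambda r: r.get("relevance_score") or 0, reverse=True)
    fieldnames = sorted({k for r in rows for k in r})
    with open(report_csv_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames, restval="")
        writer.writeheader()
        writer.writerows(rows)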
Finally wire up orchestration and scheduling. Create a Step Functions state machine that invokes the four steps in order (with retries
and timeouts) and passes run_id + S3 keys between steps. Then create an EventBridge Scheduler (or EventBridge rule) that
starts the state machine daily/weekly with the run input payload.
After that, “deployment” is done: the schedule triggers runs, Step Functions shows you exactly where a run failed, CloudWatch Logs shows
details for each step, and S3 holds artifacts for each run_id so you can replay or backfill safely.
6. Operate and Improve: Limits, Cost, Reliability
Once the pipeline is running on a schedule, the work shifts from “can it run?” to “can it keep running cheaply and reliably?” In practice, the first constraints you’ll hit are external limits (source rate limits, LLM API quotas) and time/cost tradeoffs as you increase the number of URLs you process.
Start by treating throughput as a knob. Put a hard cap on work per run (max URLs), and keep everything keyed by run_id so you
can resume or replay without redoing earlier steps.
Reliability comes from handling partial failure intentionally. For extraction and enrichment, assume some rows will fail (bad pages, timeouts, model hiccups). Record failures as data in the row, keep the run moving, and ship a report even if it contains some errors. Then you can re-run just the failed rows later instead of throwing away the whole run.
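Concretely, a small backfill pass can pick out just the rows that recorded an error and feed only those back through the relevant step (field names follow the earlier sketches; enrichment_error is a placeholder):

import json

def failed_rows(enriched_jsonl_path):
    # Yield rows that recorded an extraction or enrichment error
    with open(enriched_jsonl_path, encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue
            row = json.loads(line)
            if row.get("extraction_status") == "error" or row.get("enrichment_error"):
                yield row

# Re-run only these rows through extract/label_score, then merge the results back
# into a new enriched.jsonl under the same run_id so the report step can rerun as-is.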
At the system level, your operational loop should be: see where runs fail, inspect the artifacts for that run_id, and retry
only what needs retrying. Over time, you’ll tune the same knobs—batching, concurrency, retries, and timeouts—until the pipeline produces
consistent reports on schedule without surprise bills.
7. Next Steps: Extend the Pattern and Ship More Pipelines
You’ve now seen the full pattern end-to-end: define “done” with one manual run, lock in a CSV output contract, then turn the repetitive steps into executable contracts. That’s the core move from ad-hoc AI work to a pipeline you can trust.
A practical next step is to take one workflow you run weekly and write down: run input → row schema → step outputs → failure modes → cost caps. Then implement the smallest version that produces a report you’d actually use. In our example, that means: seeded CSV, consistent enrichment, and a ranked output—first locally, then on a schedule.
If you want help applying this pattern to your real use case—or want a second set of eyes on your pipeline design—I’m open to a conversation. To start, use the contact form.