Table of contents
- Opening
- Preface: What You’re Building (and Why)
- Define the Pipeline: Inputs → Steps → Outputs
- Map the Workflow: From Prompt to Repeatable Process
- Choose the Building Blocks: Storage, Compute, Scheduling
- Build and Run It: Deploy with Terraform, Log, Handle Failures
- Operate and Improve: IaC Knobs for Limits, Cost, Reliability
- Next Steps: Extend the Pattern and Ship More Pipelines
Opening
In a prior organization, I helped teams take “AI work in the wild”—strong prompts buried in chat threads, one-off scripts on laptops, manual copy/paste workflows—and deploy it into production. The goal wasn’t just automation: it was making these workflows faster, drastically more reliable, and operationally boring so they could become systems the business actually relied on.
The goal wasn’t to build an AI platform—it was to ship one repeatable workflow with a clear input, a clear output, and enough operations to run unattended. This guide walks through the same pattern so you can move from scattered experiments to reliable systems without needing a huge team or a rebuild. Along the way, we’ll do one manual run end-to-end and define a concrete CSV “output contract” (with an example row and the exact enrichment prompt) so automation has a clear target.
Here’s the “minimum production bar” we’ll aim for in this post:
- Clear artifacts: every step writes an inspectable output (so debugging isn’t guesswork).
- Run identity: every run has a run_id and a captured config (so runs are reproducible).
- Failure handling: retries for expected failures, and per-item errors recorded as data (not silent drops).
- Replayability: you can rerun a failed step from stored artifacts without repeating the whole pipeline.
- Bounded cost: caps on batch size/concurrency so success doesn’t mean a surprise bill.
If you want the reference implementation, you can clone buildingaipipelines from GitHub. It includes the pipeline code and infrastructure-as-code—so you have a production-ready starting point to deploy your own prompts to AWS.
1. Preface: What You’re Building (and Why)
Most teams are already getting value from AI, but it often lives in fragile places: prompts in chat histories, scripts on personal machines, and workflows held together by copy/paste. That’s fine for exploration—but once the work becomes recurring or business-critical, the problem usually isn’t “better prompts.” It’s turning a one-off win into a repeatable system.
In this guide, we’ll do that by building a simple, generic pipeline around a concrete example topic: UFO sightings (UAP reports). This is a stand-in for common “find → extract → label → rank → report” workflows like competitor updates, policy monitoring, or support ticket triage. The pipeline will search recent coverage in a defined time window, extract page text into a consistent row schema (or record an extraction error per URL), use AI to label each item (for example, “on-topic/off-topic” and “sighting report vs entertainment”), score how relevant it is (a numeric relevance score), rank the results (best first), and generate a report (like a CSV). Section 2 walks through the manual version first—search → seed rows → extract text → enrich → rank—so the CSV format and step outputs are unambiguous before you automate anything.
2. Define the Pipeline: Inputs → Steps → Outputs
Let’s make this concrete with a small, manual run end-to-end. We’ll use a fun example topic—UFO sightings (UAP reports)—but the workflow is the same for any subject you’d want to track.
Start with the manual flow once. It’s the fastest way to define what “done” looks like before you automate anything.
Manual run (search → seed rows → extract text): Create a spreadsheet (this becomes your “manual pipeline output” CSV).
Then use the Google UI to search for UFO sightings (or UAP report) and collect ~10–20 candidate results. For
each result, add a row with seeding fields like run_id, query, title, source,
date, url, dedupe_key, and snippet. Next, manually extract the relevant text into
extracted_text (or record extraction_status/extraction_error if you can’t extract).
Set a simple dedupe rule now, because you’ll need it later: define dedupe_key as the normalized URL (for example, lowercased,
with tracking parameters stripped). If you see the same key twice, keep the best row and mark the other as a duplicate.
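If you want the dedupe rule to be precise (and reusable once you automate it), here is a minimal sketch of one way to compute dedupe_key. The exact normalization rules and the tracking-parameter list are assumptions you can adjust:

from urllib.parse import urlparse, parse_qsl, urlencode

# Example tracking parameters to strip; extend this set to match what you see in your URLs.
TRACKING_PARAMS = {"utm_source", "utm_medium", "utm_campaign", "utm_term", "utm_content", "gclid", "fbclid"}

def dedupe_key(url: str) -> str:
    # Lowercase the host, drop the fragment and tracking parameters, and trim trailing slashes.
    parts = urlparse(url.strip())
    kept_query = [(k, v) for k, v in parse_qsl(parts.query) if k.lower() not in TRACKING_PARAMS]
    key = parts.netloc.lower() + parts.path.rstrip("/")
    if kept_query:
        key += "?" + urlencode(kept_query)
    return key

With this rule, dedupe_key("https://www.foxnews.com/story?utm_source=x") and dedupe_key("https://www.foxnews.com/story") both come out as "www.foxnews.com/story", so the two rows collide on the same key.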
Manual enrichment + post-processing (label/score → rank → export): For each row with extracted text, run the
labeling/scoring prompt and fill label, type, relevance_score, and reason. Then sort
by relevance_score, sanity-check the top results, and export your final CSV.
Here’s an example of what your CSV can look like (seeded + partially filled):
run_id,query,title,source,date,url,dedupe_key,snippet,extracted_text,extraction_status,extraction_error,label,type,relevance_score,reason
2026-01-05,"UFO sightings (UAP reports)","Pilot reports UFO hovering beside jet, leaving air traffic ...","foxnews.com",2025-12-22,"https://www.foxnews.com/...","foxnews.com/...","Pilot says... air traffic control...","A commercial pilot reported seeing a bright object pacing the aircraft for several minutes... ATC recorded the call...",ok,,on_topic,sighting_report,78,"Firsthand report with specific details; limited corroboration."
2026-01-05,"UFO sightings (UAP reports)","Mass UFO Sightings Are Increasing - Here's Why | Encounter ...","youtube.com",2025-12-20,"https://www.youtube.com/watch?v=...","youtube.com/watch","Video discusses...",,error,video_not_extracted,,,,
2026-01-05,"UFO sightings (UAP reports)","Government report summarizes UAP cases and open questions","example.gov",2025-12-15,"https://example.gov/uap-report","example.gov/uap-report","A report says...","The report summarizes reviewed incidents, categories of explanations, and data gaps...",ok,,on_topic,government_or_policy,85,"Primary-source summary of cases and methodology."
Enrichment prompt (with an example story):
You are helping me triage search results for the topic: "UFO sightings (UAP reports)".
Return ONLY valid JSON with:
- label: "on_topic" or "off_topic"
- type: one of "sighting_report", "analysis", "government_or_policy", "entertainment", "other"
- relevance_score: integer 0-100
- reason: one short sentence explaining the score
Text:
"""
A commercial pilot reported seeing a bright object pacing the aircraft for several minutes during ascent. The pilot contacted air traffic control,
describing the object as stationary relative to the jet before accelerating away. No instrument anomalies were reported. The article cites an
interview and includes a short excerpt of the ATC transcript, but no independent verification beyond the pilot and controller comments.
"""
This manual workflow is tedious—and that’s the point. Once you feel the pain, you’ll know exactly what to automate. For now, focus on defining clean inputs, step outputs, and a CSV format you trust.
3. Map the Workflow: From Prompt to Repeatable Process
At this point, the workflow has a simple shape:
- Search export: produces a seeded CSV (the dataset you work from).
- Extraction: fills extracted_text by fetching each URL and recording extraction_status/extraction_error (some sites will block automated requests, and that's fine).
- Enrichment: fills label/type/score/reason.
- Reporting: sorts/filters and exports the final ranked CSV.
This is why contracts matter: each step reads a known input (a CSV with specific columns) and produces a known output (the same CSV,
enriched). Once you have that, you can schedule it, persist artifacts per run_id, and add the operational basics (retries,
per-row errors, bounded batch sizes) without changing what “done” means.
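In code, the contract can be as small as a fixed column list that every step reads and writes. Here's a minimal sketch of that idea (the reference repo may structure this differently):

import csv

# The output contract: every step reads and writes these columns, filling in more of them as the run progresses.
COLUMNS = [
    "run_id", "query", "title", "source", "date", "url", "dedupe_key", "snippet",
    "extracted_text", "extraction_status", "extraction_error",
    "label", "type", "relevance_score", "reason",
]

def read_rows(path):
    with open(path, newline="", encoding="utf-8") as f:
        return list(csv.DictReader(f))

def write_rows(path, rows):
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=COLUMNS, extrasaction="ignore")
        writer.writeheader()
        for row in rows:
            writer.writerow({col: row.get(col, "") for col in COLUMNS})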
Section 2 defines “done” by doing the work manually: you seed rows, extract text, enrich with a consistent prompt, and produce a ranked CSV you trust. Once that target is clear, automation becomes straightforward: you stop doing the repetitive parts by hand and make the workflow executable.
We’ll start by creating a small CLI script, pipeline.py, that runs this workflow as a repeatable set of commands. The first step we
automate is the search export: take a query and produce a clean, pipeline-ready CSV with the columns we already defined
(run_id, query, title, source, date when available, url,
dedupe_key, snippet, plus placeholder columns for extraction and enrichment). From there, we move to extraction (fill
extracted_text or record an error), then enrichment (fill label/type/relevance_score/reason), and finally reporting.
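The repo's pipeline.py has more options and real step implementations, but the shape of the CLI is roughly this skeleton (illustrative, not the actual code):

import argparse

def main():
    parser = argparse.ArgumentParser(description="search → extract → enrich pipeline")
    sub = parser.add_subparsers(dest="command", required=True)

    search = sub.add_parser("search", help="query the search API and write a seeded CSV")
    search.add_argument("--query", required=True)
    search.add_argument("--num", type=int, default=10)
    search.add_argument("--out", required=True)
    search.add_argument("--run-id", required=True)

    extract = sub.add_parser("extract", help="fill extracted_text or record an extraction error per row")
    extract.add_argument("--in", dest="in_path", required=True)
    extract.add_argument("--out", required=True)
    extract.add_argument("--max-rows", type=int)

    enrich = sub.add_parser("enrich", help="fill label/type/relevance_score/reason via the model")
    enrich.add_argument("--in", dest="in_path", required=True)
    enrich.add_argument("--out", required=True)
    enrich.add_argument("--max-rows", type=int)

    args = parser.parse_args()
    # Each subcommand dispatches to a step function that reads and writes the same CSV columns.
    print(f"would run step: {args.command}")

if __name__ == "__main__":
    main()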
The code used in this guide lives in the GitHub repo
buildingaipipelines—you can clone it and run pipeline.py as-is, or use it as a starting point for your own pipeline.
Run the reference pipeline locally (optional): this mirrors the workflow you just did by hand, but turns it into three repeatable commands.
1) Install dependencies
pip install -r requirements.txt
2) Set environment variables
Create a .env file (it’s loaded automatically) with:
GOOGLE_CSE_API_KEY, GOOGLE_CSE_CX, OPENAI_API_KEY, and optionally OPENAI_MODEL.
3) Run an end-to-end example
python pipeline.py search --query "UFO sightings (UAP reports)" --num 5 --out results.csv --run-id 2026-01-05
python pipeline.py extract --in results.csv --out extracted.csv --max-rows 5
python pipeline.py enrich --in extracted.csv --out enriched.csv --max-rows 5
Notes
- Some sites block automated downloads (you’ll see http_error:403 in extraction_error). Those rows will be skipped by enrichment.
- The CSV contains multi-line fields; open it in Google Sheets/Excel for easiest viewing.
Next, we automate the enrichment step. Instead of manually running the prompt row-by-row and copy/pasting JSON back into a spreadsheet, pipeline.py reads the CSV, finds rows with extracted_text (and extraction_status=ok), calls the model using the same prompt contract, and fills label, type, relevance_score, and reason.
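Row by row, that enrichment logic looks roughly like the sketch below. It assumes the OpenAI Python SDK (v1+) and the prompt contract from Section 2, and it is not the repo's exact implementation; real code also needs to handle replies that aren't clean JSON.

import json
from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

def enrich_row(row, prompt_prefix, model="gpt-4o-mini"):
    # Only enrich rows that have extracted text; failed extractions pass through untouched.
    if row.get("extraction_status") != "ok" or not row.get("extracted_text"):
        return row
    prompt = prompt_prefix + '\n\nText:\n"""\n' + row["extracted_text"] + '\n"""'
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        temperature=0,
    )
    data = json.loads(response.choices[0].message.content)  # assumes the model returned bare JSON
    row["label"] = data.get("label", "")
    row["type"] = data.get("type", "")
    row["relevance_score"] = data.get("relevance_score", "")
    row["reason"] = data.get("reason", "")
    return row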
You can run these commands manually while you’re iterating, or put them on a schedule with cron once the outputs look right. In the next sections,
we’ll take the same step contracts and move execution into cloud resources so the pipeline can run unattended with retries, logs, and stored artifacts.
4. Choose the Building Blocks: Storage, Compute, Scheduling
For this guide, we’ll build on AWS—not because you need “cloud for cloud’s sake,” but because AWS gives you managed building blocks that match
the exact jobs your local script is already doing. The goal is simple: take the same steps you ran with pipeline.py and run them
unattended, with artifacts saved, retries in place, and a clear run history.
The specific services we’ll use are: EventBridge Scheduler to trigger runs on a schedule, Step Functions to
orchestrate steps and retries, Lambda to run each step’s code, S3 to store artifacts per run_id,
CloudWatch Logs to make execution visible, and SSM/Secrets Manager to keep API keys and config out of code.
Think of each pipeline.py objective as a cloud step:
- Search export (pipeline.py search) → Lambda runs the search step, reads Google CSE credentials from SSM/Secrets Manager, and writes a seeded CSV to S3 under a run_id path. Logs go to CloudWatch.
- Extraction (pipeline.py extract) → Step Functions fans out across URLs (so one bad URL doesn’t fail the whole run), Lambda fetches and extracts text, and the results (including per-row extraction_status/extraction_error) land in S3.
- Enrichment (pipeline.py enrich) → Step Functions fans out across extracted rows, Lambda calls the model using keys in SSM/Secrets Manager, records any per-row errors as data (not crashes), and writes enriched rows back to S3.
- Reporting (your final output contract) → Lambda reads enriched artifacts from S3, sorts/filters, and writes the final report.csv back to S3.
That’s why the default stack is a good fit here: it mirrors the script’s responsibilities, but adds scheduling, retries, logging, and durable artifacts—without changing what “done” means.
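To make the first of these mappings concrete, here is a heavily simplified sketch of a search-step Lambda: it reads keys from a JSON secret, calls Google Custom Search, and writes the URL-list artifact under the run's prefix. The bucket and secret names come from placeholder environment variables, and none of this is the repo's actual handler code.

import json
import os

import boto3
import requests

s3 = boto3.client("s3")
secrets = boto3.client("secretsmanager")

def handler(event, context):
    run_id = event["run_id"]
    query = event["query"]
    # Read API keys at runtime from a single JSON secret (name supplied via an environment variable).
    config = json.loads(secrets.get_secret_value(SecretId=os.environ["SECRET_ID"])["SecretString"])
    # Google Custom Search JSON API: returns up to 10 result links per request; real code pages with the start param.
    resp = requests.get(
        "https://www.googleapis.com/customsearch/v1",
        params={
            "key": config["GOOGLE_CSE_API_KEY"],
            "cx": config["GOOGLE_CSE_CX"],
            "q": query,
            "num": min(int(event.get("max_urls", 10)), 10),
        },
        timeout=10,
    )
    resp.raise_for_status()
    urls = [item["link"] for item in resp.json().get("items", [])]
    # Persist the artifact under the run_id prefix so later steps (and humans) can inspect it.
    s3.put_object(
        Bucket=os.environ["ARTIFACTS_BUCKET"],
        Key=f"runs/{run_id}/urls.json",
        Body=json.dumps(urls).encode("utf-8"),
    )
    return {"run_id": run_id, "urls": urls}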
5. Build and Run It: Deploy with Terraform, Log, Handle Failures
The reference implementation in this repo deploys the pipeline to AWS using Terraform (in infra/). The goal is
repeatability: one command to create/update the same resources every time, with no console-clicking and no secrets committed.
What Terraform creates (from infra/)
- S3 artifacts bucket (all outputs stored under runs/<run_id>/...).
- Secrets Manager secret containing GOOGLE_CSE_API_KEY, GOOGLE_CSE_CX, OPENAI_API_KEY, and optional OPENAI_MODEL.
- IAM roles/policies for Lambda, Step Functions, and the scheduler.
- Lambda functions (packaged as one zip from lambda_src/): building-ai-pipelines-search, building-ai-pipelines-extract-worker, building-ai-pipelines-extract, building-ai-pipelines-label-score, building-ai-pipelines-report.
- Step Functions state machine to orchestrate the run.
- EventBridge Scheduler schedule to trigger runs automatically.
Operationally, each run is: one input payload → durable artifacts in S3 → logs in CloudWatch. The pipeline is designed so failures show up in the right place: Step Functions shows where a run stopped, and per-item extraction failures are recorded as data.
What actually runs in AWS (at a glance)
Think of this as one scheduled run that writes a folder of artifacts in S3 under runs/<run_id>/.... Step Functions is the orchestrator;
only extraction fans out in parallel.
Run input (Scheduler → Step Functions)
{
"run_id": "<<aws.scheduler.scheduled-time>>",
"query": "UFO sightings (UAP reports)",
"max_urls": 50
}
Execution flow + artifacts
- 1) Search (Lambda) → writes runs/<run_id>/urls.json (the URL list) and returns that list to Step Functions.
- 2) Extract (parallel) (Step Functions Map + extract-worker Lambda) → processes each URL independently and produces per-URL row objects (including extraction_status/extraction_error).
- 3) WriteExtracted (Lambda) → writes the Map results to runs/<run_id>/extracted.jsonl.
- 4) Label + score (Lambda) → reads extracted.jsonl, enriches rows, writes runs/<run_id>/enriched.jsonl. (Rows that failed extraction are passed through without model calls.)
- 5) Report (Lambda) → reads enriched.jsonl and writes runs/<run_id>/report.csv (see the sketch after this list).
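The report step is the easiest one to picture end-to-end. Here is a minimal sketch assuming the JSONL artifact shape above; the filter and column choice are illustrative rather than the repo's exact output:

import csv
import json

def write_report(enriched_jsonl_path, report_csv_path):
    # Load enriched rows, keep on-topic ones, sort best-first, and write the final CSV.
    with open(enriched_jsonl_path, encoding="utf-8") as f:
        rows = [json.loads(line) for line in f if line.strip()]
    rows = [r for r in rows if r.get("label") == "on_topic"]
    rows.sort(key=lambda r: int(r.get("relevance_score") or 0), reverse=True)
    columns = ["run_id", "query", "title", "source", "date", "url", "label", "type", "relevance_score", "reason"]
    with open(report_csv_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=columns, extrasaction="ignore")
        writer.writeheader()
        for r in rows:
            writer.writerow({c: r.get(c, "") for c in columns})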
When debugging: use the Step Functions execution view to see which step failed, CloudWatch Logs for details, and S3 artifacts under the run’s
run_id to inspect outputs step-by-step.
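If you'd rather script that last part, a few lines of boto3 will list everything a run produced (the bucket name below is a placeholder):

import boto3

def list_run_artifacts(bucket, run_id):
    # Print every artifact the pipeline wrote for one run.
    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=f"runs/{run_id}/"):
        for obj in page.get("Contents", []):
            print(obj["Key"], obj["Size"])

list_run_artifacts("your-artifacts-bucket", "2026-01-05")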
6. Operate and Improve: IaC Knobs for Limits, Cost, Reliability
Once this runs unattended, the practical work is tuning throughput and spend while keeping runs reliable. The important part is that the knobs are explicit in the deployed system (and many are defined directly in Terraform), so changes are repeatable and reviewable.
The concrete knobs in this repo’s AWS deployment
- Work per run: max_urls is set in the scheduler input payload.
- Fan-out pressure: the Step Functions ExtractMap uses MaxConcurrency (currently 10) to cap parallel extraction.
- Extraction safety: the map passes timeout_s (currently 10) and max_chars (currently 8000) to bound time and payload size.
- Lambda sizing: timeouts/memory are set per function in Terraform (for example, label-score has a higher timeout for model calls).
- Retries: Step Functions applies bounded retries for transient Lambda errors, and extraction failures are recorded per-row so runs can still complete.
- Model cost control: the enrich step defaults to gpt-4o-mini unless OPENAI_MODEL is set in Secrets Manager.
The operational loop stays boring by design: find the execution in Step Functions, inspect the S3 artifacts for that run_id, and adjust
caps/concurrency when you see rate limiting, timeouts, or cost spikes. Because artifacts are keyed by run_id, you can replay runs safely
without rebuilding the whole pipeline.
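Replaying from artifacts is mostly a download away. For example, to rerun enrichment or reporting for a past run without redoing search and extraction (the bucket name is a placeholder):

import boto3

s3 = boto3.client("s3")
run_id = "2026-01-05"
# Pull the stored extraction artifact for that run; rerun later steps locally (or re-invoke just that Lambda) against it.
s3.download_file("your-artifacts-bucket", f"runs/{run_id}/extracted.jsonl", "extracted.jsonl")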
Finally, keep secrets out of the repo: in this implementation, Lambdas read API keys at runtime from a single Secrets Manager JSON secret whose values are provided to Terraform as sensitive variables (not committed).
7. Next Steps: Extend the Pattern and Ship More Pipelines
You’ve now seen the full pattern end-to-end: define “done” with one manual run, lock in an output contract, then turn the repetitive work into executable steps with artifacts, retries, and a schedule. That’s the move from ad-hoc AI work to a pipeline you can trust.
If you want to apply this to your real use case, here’s the next practical step: pick one workflow you run every week and write down four things—what goes in, what comes out, what can fail, and what you’re willing to spend. Then build the smallest version that produces a report you’d actually use. In our example, that means: seeded dataset → extracted text (or per-row errors) → enrichment → ranked output—first locally, then on a schedule, then in the cloud.
In this repo, “extend the pattern” means changing both code and infrastructure in lockstep: update the scheduled run payload and cadence in
infra/scheduler.tf, update orchestration (new steps, retries, concurrency) in infra/stepfunctions.tf, and implement step logic in
lambda_src/. Then redeploy with Terraform so the behavior is versioned and repeatable, with secrets still managed in Secrets Manager.
If you’re thinking “this is exactly what we need, but I don’t want to spend a month reinventing it,” that’s where I can help. I work with teams to take one high-ROI workflow and turn it into a production pipeline: clear contracts, sensible limits, reliable operation, and outputs that show up where the business needs them.
If you’d like to explore it, use the contact form and include:
- the workflow you want to automate
- where the data lives today
- what “done” looks like (the output you want)
- how often it should run
I’ll reply with a short plan and whether it’s a good fit. And either way—thanks for reading. I hope this guide gave you at least one practical idea you can use right away.