Building AI Pipelines: A Guide to Turning Ad-Hoc AI Work into Production Systems

A practical playbook for turning prompts and one-off scripts into valuable systems your organization can rely on.

Table of contents

  Opening
  1. Preface: What You’re Building (and Why)
  2. Define the Pipeline: Inputs → Steps → Outputs
  3. Map the Workflow: From Prompt to Repeatable Process
  4. Choose the Building Blocks: Storage, Compute, Scheduling
  5. Build and Run It: Deploy with Terraform, Log, Handle Failures
  6. Operate and Improve: IaC knobs for limits, cost, reliability
  7. Next Steps: Extend the Pattern and Ship More Pipelines

Opening

In a prior organization, I helped teams take “AI work in the wild”—strong prompts buried in chat threads, one-off scripts on laptops, manual copy/paste workflows—and deploy it into production. The goal wasn’t just automation: it was making these workflows faster, drastically more reliable, and operationally boring so they could become systems the business actually relied on.

The aim was never to build an AI platform. It was to ship one repeatable workflow with a clear input, a clear output, and enough operational support to run unattended. This guide walks through the same pattern so you can move from scattered experiments to reliable systems without a large team or a rebuild. Along the way, we’ll do one manual run end-to-end and define a concrete CSV “output contract” (with example rows and the exact enrichment prompt) so automation has a clear target.

Here’s the “minimum production bar” we’ll aim for in this guide:

  • Clear artifacts: every step writes an inspectable output (so debugging isn’t guesswork).
  • Run identity: every run has a run_id and a captured config (so runs are reproducible).
  • Failure handling: retries for expected failures, and per-item errors recorded as data (not silent drops).
  • Replayability: you can rerun a failed step from stored artifacts without repeating the whole pipeline.
  • Bounded cost: caps on batch size/concurrency so success doesn’t mean a surprise bill.

If you want the reference implementation, you can clone buildingaipipelines from GitHub. It includes the pipeline code and infrastructure-as-code—so you have a production-ready starting point to deploy your own prompts to AWS.

1. Preface: What You’re Building (and Why)

Most teams are already getting value from AI, but it often lives in fragile places: prompts in chat histories, scripts on personal machines, and workflows held together by copy/paste. That’s fine for exploration—but once the work becomes recurring or business-critical, the problem usually isn’t “better prompts.” It’s turning a one-off win into a repeatable system.

In this guide, we’ll do that by building a simple, generic pipeline around a concrete example topic: UFO sightings (UAP reports). This is a stand-in for common “find → extract → label → rank → report” workflows like competitor updates, policy monitoring, or support ticket triage. The pipeline will search recent coverage in a defined time window, extract page text into a consistent row schema (or record an extraction error per URL), use AI to label each item (for example, “on-topic/off-topic” and “sighting report vs entertainment”), score how relevant it is (a numeric relevance score), rank the results (best first), and generate a report (like a CSV). Section 2 walks through the manual version first—search → seed rows → extract text → enrich → rank—so the CSV format and step outputs are unambiguous before you automate anything.

2. Define the Pipeline: Inputs → Steps → Outputs

Let’s make this concrete with one small manual run, end to end, on the UFO sightings (UAP reports) topic. The workflow is the same for any subject you’d want to track.

Start with the manual flow once. It’s the fastest way to define what “done” looks like before you automate anything.

Manual run (search → seed rows → extract text): Create a spreadsheet (this becomes your “manual pipeline output” CSV). Then search Google by hand for UFO sightings (or UAP reports) and collect ~10–20 candidate results. For each result, add a row with seeding fields like run_id, query, title, source, date, url, dedupe_key, and snippet. Next, manually extract the relevant text into extracted_text (or record extraction_status/extraction_error if you can’t extract).

Set a simple dedupe rule now, because you’ll need it later: define dedupe_key as the normalized URL (for example, lowercased, with tracking parameters stripped). If you see the same key twice, keep the best row and mark the other as a duplicate.
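The dedupe rule can be sketched in a few lines of Python. This is a minimal illustration, not the reference implementation: the function names and the list of tracking parameters are assumptions, and it lowercases only the host (paths and IDs can be case-sensitive) while dropping the scheme and a leading "www.", which matches the shape of the dedupe_key values in the example CSV. It also keeps the first row per key rather than choosing the "best" one.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit

# Assumption: an illustrative, non-exhaustive list of tracking parameters.
TRACKING_PARAMS = {"gclid", "fbclid", "ref"}


def dedupe_key(url: str) -> str:
    """Normalize a URL into a key: no scheme, lowercase host, no tracking params."""
    parts = urlsplit(url.strip())
    host = parts.netloc.lower()
    if host.startswith("www."):
        host = host[4:]
    path = parts.path.rstrip("/")
    # Keep meaningful parameters (for example a video id); drop utm_* and known trackers.
    kept = [
        (name, value)
        for name, value in parse_qsl(parts.query, keep_blank_values=True)
        if not name.lower().startswith("utm_") and name.lower() not in TRACKING_PARAMS
    ]
    query = urlencode(sorted(kept))
    return host + path + (f"?{query}" if query else "")


def drop_duplicates(rows: list[dict]) -> list[dict]:
    """Keep the first row seen for each dedupe_key (a simplification of 'keep the best')."""
    seen = set()
    unique = []
    for row in rows:
        key = row.get("dedupe_key") or dedupe_key(row["url"])
        if key in seen:
            continue
        seen.add(key)
        unique.append({**row, "dedupe_key": key})
    return unique
```

Sorting the remaining query parameters means two URLs that differ only in parameter order produce the same key.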

Manual enrichment + post-processing (label/score → rank → export): For each row with extracted text, run the labeling/scoring prompt and fill label, type, relevance_score, and reason. Then sort by relevance_score, sanity-check the top results, and export your final CSV.

Here’s an example of what your CSV can look like (seeded + partially filled):

run_id,query,title,source,date,url,dedupe_key,snippet,extracted_text,extraction_status,extraction_error,label,type,relevance_score,reason
2026-01-05,"UFO sightings (UAP reports)","Pilot reports UFO hovering beside jet, leaving air traffic ...","foxnews.com",2025-12-22,"https://www.foxnews.com/...","foxnews.com/...","Pilot says... air traffic control...","A commercial pilot reported seeing a bright object pacing the aircraft for several minutes... ATC recorded the call...",ok,,on_topic,sighting_report,78,"Firsthand report with specific details; limited corroboration."
2026-01-05,"UFO sightings (UAP reports)","Mass UFO Sightings Are Increasing - Here's Why | Encounter ...","youtube.com",2025-12-20,"https://www.youtube.com/watch?v=...","youtube.com/watch?v=...","Video discusses...",,error,video_not_extracted,,,,
2026-01-05,"UFO sightings (UAP reports)","Government report summarizes UAP cases and open questions","example.gov",2025-12-15,"https://example.gov/uap-report","example.gov/uap-report","A report says...","The report summarizes reviewed incidents, categories of explanations, and data gaps...",ok,,on_topic,government_or_policy,85,"Primary-source summary of cases and methodology."
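One way to keep this format honest is to treat the header as a contract in code. The sketch below uses Python's standard csv module; the column list is copied from the example header, while the helper names (read_rows, write_rows) are hypothetical and not taken from the repo.

```python
import csv
import io

# Column order copied from the example header above.
COLUMNS = [
    "run_id", "query", "title", "source", "date", "url", "dedupe_key", "snippet",
    "extracted_text", "extraction_status", "extraction_error",
    "label", "type", "relevance_score", "reason",
]


def read_rows(text: str) -> list[dict]:
    """Parse CSV text and fail fast if the header drifts from the contract."""
    reader = csv.DictReader(io.StringIO(text, newline=""))
    if reader.fieldnames != COLUMNS:
        raise ValueError(f"unexpected header: {reader.fieldnames}")
    return list(reader)


def write_rows(rows: list[dict]) -> str:
    """Serialize rows in contract order; missing fields become empty cells."""
    buffer = io.StringIO(newline="")
    writer = csv.DictWriter(buffer, fieldnames=COLUMNS, restval="", extrasaction="raise")
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()
```

Because the csv module quotes fields that contain commas or line breaks, multi-line extracted_text survives a write/read round trip. With extrasaction="raise", a row carrying a column outside the contract fails loudly instead of being silently dropped.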

Enrichment prompt (with an example story):

You are helping me triage search results for the topic: "UFO sightings (UAP reports)".

Return ONLY valid JSON with:
- label: "on_topic" or "off_topic"
- type: one of "sighting_report", "analysis", "government_or_policy", "entertainment", "other"
- relevance_score: integer 0-100
- reason: one short sentence explaining the score

Text:
"""
A commercial pilot reported seeing a bright object pacing the aircraft for several minutes during ascent. The pilot contacted air traffic control,
describing the object as stationary relative to the jet before accelerating away. No instrument anomalies were reported. The article cites an
interview and includes a short excerpt of the ATC transcript, but no independent verification beyond the pilot and controller comments.
"""

This manual workflow is tedious—and that’s the point. Once you feel the pain, you’ll know exactly what to automate. For now, focus on defining clean inputs, step outputs, and a CSV format you trust.
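The JSON contract in the enrichment prompt can also be checked in code, so a malformed model reply becomes a recorded error instead of a bad row. The following is a sketch under stated assumptions: the allowed values come straight from the prompt, but the function name parse_enrichment and the choice to raise ValueError are illustrative.

```python
import json

# Allowed values copied from the enrichment prompt.
LABELS = {"on_topic", "off_topic"}
TYPES = {"sighting_report", "analysis", "government_or_policy", "entertainment", "other"}


def _choice(data: dict, key: str, allowed: set) -> str:
    value = data.get(key)
    if not isinstance(value, str) or value not in allowed:
        raise ValueError(f"{key} must be one of {sorted(allowed)}, got {value!r}")
    return value


def parse_enrichment(raw: str) -> dict:
    """Validate a model reply against the prompt contract; raise ValueError if it deviates."""
    data = json.loads(raw)  # json.JSONDecodeError is a subclass of ValueError
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    score = data.get("relevance_score")
    # bool is a subclass of int in Python, so exclude it explicitly.
    if isinstance(score, bool) or not isinstance(score, int) or not 0 <= score <= 100:
        raise ValueError(f"relevance_score must be an integer 0-100, got {score!r}")
    reason = data.get("reason")
    if not isinstance(reason, str) or not reason.strip():
        raise ValueError("reason must be a non-empty string")
    return {
        "label": _choice(data, "label", LABELS),
        "type": _choice(data, "type", TYPES),
        "relevance_score": score,
        "reason": reason.strip(),
    }
```

A reply that wraps the JSON in extra prose fails at json.loads, which is the behavior you want when the prompt says "Return ONLY valid JSON".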

3. Map the Workflow: From Prompt to Repeatable Process

At this point, the workflow has a simple shape:

  • Search export: produces a seeded CSV (the dataset you work from).
  • Extraction: fills extracted_text by fetching each URL and recording extraction_status/extraction_error (some sites will block automated requests, and that’s fine).
  • Enrichment: fills label/type/score/reason.
  • Reporting: sorts/filters and exports the final ranked CSV.

This is why contracts matter: each step reads a known input (a CSV with specific columns) and produces a known output (the same CSV, enriched). Once you have that, you can schedule it, persist artifacts per run_id, and add the operational basics (retries, per-row errors, bounded batch sizes) without changing what “done” means.

Section 2 defines “done” by doing the work manually: you seed rows, extract text, enrich with a consistent prompt, and produce a ranked CSV you trust. Once that target is clear, automation becomes straightforward: you stop doing the repetitive parts by hand and make the workflow executable.

We’ll start by creating a small CLI script, pipeline.py, that runs this workflow as a repeatable set of commands. The first step we automate is the search export: take a query and produce a clean, pipeline-ready CSV with the columns we already defined (run_id, query, title, source, date when available, url, dedupe_key, snippet, plus placeholder columns for extraction and enrichment). From there, we move to extraction (fill extracted_text or record an error), then enrichment (fill label/type/relevance_score/reason), and finally reporting.
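To give a feel for the shape of such a CLI, here is a minimal argparse skeleton. It is a sketch rather than the repo's actual pipeline.py: the subcommand and flag names mirror the example commands in this section, while the defaults and which flags are required are assumptions.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a parser with one subcommand per pipeline step."""
    parser = argparse.ArgumentParser(prog="pipeline.py")
    sub = parser.add_subparsers(dest="command", required=True)

    search = sub.add_parser("search", help="query -> seeded CSV")
    search.add_argument("--query", required=True)
    search.add_argument("--num", type=int, default=10)  # assumed default
    search.add_argument("--out", required=True)
    search.add_argument("--run-id", required=True)

    for name, help_text in (
        ("extract", "seeded CSV -> CSV with extracted_text"),
        ("enrich", "extracted CSV -> CSV with label/type/score/reason"),
    ):
        step = sub.add_parser(name, help=help_text)
        # "in" is a Python keyword, so store the value under a different name.
        step.add_argument("--in", dest="in_path", required=True)
        step.add_argument("--out", required=True)
        step.add_argument("--max-rows", type=int, default=None)
    return parser
```

Each subcommand reads one file and writes another, which is what lets you rerun a single step without touching the others.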

The code used in this guide lives in the GitHub repo buildingaipipelines—you can clone it and run pipeline.py as-is, or use it as a starting point for your own pipeline.

Run the reference pipeline locally (optional): this mirrors the workflow you just did by hand, but turns it into three repeatable commands.

1) Install dependencies

pip install -r requirements.txt

2) Set environment variables

Create a .env file (it’s loaded automatically) with: GOOGLE_CSE_API_KEY, GOOGLE_CSE_CX, OPENAI_API_KEY, and optionally OPENAI_MODEL.

3) Run an end-to-end example

python pipeline.py search  --query "UFO sightings (UAP reports)" --num 5 --out results.csv --run-id 2026-01-05
python pipeline.py extract --in results.csv   --out extracted.csv --max-rows 5
python pipeline.py enrich  --in extracted.csv --out enriched.csv  --max-rows 5

Notes

  • Some sites block automated downloads (you’ll see http_error:403 in extraction_error). Those rows will be skipped by enrichment.
  • The CSV contains multi-line fields; open it in Google Sheets/Excel for easiest viewing.
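The "errors as data" behavior of the extract step can be sketched with the standard library alone. Everything here is illustrative: the function names, the fetch_error prefix, and the very rough HTML-to-text conversion are assumptions, and the repo may well use a dedicated extraction library. The http_error:<code> format, the 10-second timeout, and the 8000-character cap follow values mentioned in this guide.

```python
import urllib.error
import urllib.request
from html.parser import HTMLParser


class _TextExtractor(HTMLParser):
    """Collect visible text, skipping <script> and <style> contents."""

    def __init__(self):
        super().__init__()
        self.chunks = []
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs):
        if tag in ("script", "style"):
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in ("script", "style") and self._skip_depth:
            self._skip_depth -= 1

    def handle_data(self, data):
        if not self._skip_depth and data.strip():
            self.chunks.append(data.strip())


def html_to_text(html: str) -> str:
    parser = _TextExtractor()
    parser.feed(html)
    parser.close()
    return " ".join(parser.chunks)


def fetch_html(url: str, timeout_s: float = 10.0) -> str:
    """Download a page (no JavaScript rendering)."""
    request = urllib.request.Request(url, headers={"User-Agent": "pipeline-sketch/0.1"})
    with urllib.request.urlopen(request, timeout=timeout_s) as response:
        charset = response.headers.get_content_charset() or "utf-8"
        return response.read().decode(charset, errors="replace")


def extract_row(row: dict, fetch=fetch_html, max_chars: int = 8000) -> dict:
    """Return a copy of the row with extraction fields filled; one bad URL never raises."""
    out = dict(row)
    try:
        text = html_to_text(fetch(row["url"]))[:max_chars]
    except urllib.error.HTTPError as exc:
        text, status, error = "", "error", f"http_error:{exc.code}"
    except Exception as exc:  # timeouts, DNS failures, malformed URLs
        text, status, error = "", "error", f"fetch_error:{type(exc).__name__}"
    else:
        status, error = "ok", ""
    out.update(extracted_text=text, extraction_status=status, extraction_error=error)
    return out
```

Passing the fetch function as a parameter keeps the error handling testable without network access.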

The enrich command automates the enrichment step. Instead of running the prompt row by row by hand and copy/pasting JSON back into a spreadsheet, pipeline.py reads the CSV, finds rows with extracted_text (and extraction_status=ok), calls the model using the same prompt contract, and fills label, type, relevance_score, and reason.
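The row selection and fill logic is small enough to sketch. This is a simplified illustration, not the repo's code: call_model stands in for whatever function wraps the prompt and the model API, and enrichment_error is a hypothetical field that is not part of the CSV contract above.

```python
import json

FIELDS = ("label", "type", "relevance_score", "reason")


def enrich_rows(rows, call_model, max_rows=None):
    """Fill enrichment fields for rows whose extraction succeeded.

    call_model is any function that takes a row's extracted_text and returns
    the model's raw JSON reply as a string.
    """
    enriched = []
    calls = 0
    for row in rows:
        out = dict(row)
        eligible = row.get("extraction_status") == "ok" and row.get("extracted_text")
        if eligible and (max_rows is None or calls < max_rows):
            calls += 1
            try:
                reply = json.loads(call_model(row["extracted_text"]))
                out.update({name: reply[name] for name in FIELDS})
            except Exception as exc:
                # Record the failure on the row instead of aborting the batch.
                out["enrichment_error"] = f"{type(exc).__name__}: {exc}"
        enriched.append(out)
    return enriched
```

Rows that failed extraction pass through untouched, so the output always has the same number of rows as the input, and max_rows caps the number of model calls rather than the number of rows returned.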

You can run these commands manually while you’re iterating, or put them on a schedule with cron once the outputs look right. In the next sections, we’ll take the same step contracts and move execution into cloud resources so the pipeline can run unattended with retries, logs, and stored artifacts.

4. Choose the Building Blocks: Storage, Compute, Scheduling

For this guide, we’ll build on AWS—not because you need “cloud for cloud’s sake,” but because AWS gives you managed building blocks that match the exact jobs your local script is already doing. The goal is simple: take the same steps you ran with pipeline.py and run them unattended, with artifacts saved, retries in place, and a clear run history.

The specific services we’ll use are: EventBridge Scheduler to trigger runs on a schedule, Step Functions to orchestrate steps and retries, Lambda to run each step’s code, S3 to store artifacts per run_id, CloudWatch Logs to make execution visible, and SSM/Secrets Manager to keep API keys and config out of code.

Think of each pipeline.py command as a cloud step:

  • Search export (pipeline.py search) → Lambda runs the search step, reads Google CSE credentials from SSM/Secrets Manager, and writes the seeded results to S3 under a run_id path. Logs go to CloudWatch.
  • Extraction (pipeline.py extract) → Step Functions fans out across URLs (so one bad URL doesn’t fail the whole run), Lambda fetches and extracts text, and the results (including per-row extraction_status/extraction_error) land in S3.
  • Enrichment (pipeline.py enrich) → Lambda reads the extracted rows, calls the model using keys in SSM/Secrets Manager, records any per-row errors as data (not crashes), and writes enriched rows back to S3. In the reference implementation this is a single Lambda; only extraction fans out.
  • Reporting (your final output contract) → Lambda reads enriched artifacts from S3, sorts/filters, and writes the final report.csv back to S3.

That’s why the default stack is a good fit here: it mirrors the script’s responsibilities, but adds scheduling, retries, logging, and durable artifacts—without changing what “done” means.

5. Build and Run It: Deploy with Terraform, Log, Handle Failures

The reference implementation in this repo deploys the pipeline to AWS using Terraform (in infra/). The goal is repeatability: one command to create/update the same resources every time, with no console-clicking and no secrets committed.

What Terraform creates (from infra/)

  • S3 artifacts bucket (all outputs stored under runs/<run_id>/...).
  • Secrets Manager secret containing GOOGLE_CSE_API_KEY, GOOGLE_CSE_CX, OPENAI_API_KEY, and optional OPENAI_MODEL.
  • IAM roles/policies for Lambda, Step Functions, and the scheduler.
  • Lambda functions (packaged as one zip from lambda_src/): building-ai-pipelines-search, building-ai-pipelines-extract-worker, building-ai-pipelines-extract, building-ai-pipelines-label-score, building-ai-pipelines-report.
  • Step Functions state machine to orchestrate the run.
  • EventBridge Scheduler schedule to trigger runs automatically.

Operationally, each run is: one input payload → durable artifacts in S3 → logs in CloudWatch. The pipeline is designed so failures show up in the right place: Step Functions shows where a run stopped, and per-item extraction failures are recorded as data.

What actually runs in AWS (at a glance)

Think of this as one scheduled run that writes a folder of artifacts in S3 under runs/<run_id>/.... Step Functions is the orchestrator; only extraction fans out in parallel.

Run input (Scheduler → Step Functions)

{
  "run_id": "<aws.scheduler.scheduled-time>",
  "query": "UFO sightings (UAP reports)",
  "max_urls": 50
}
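A step that receives this payload can validate it before doing any work. The sketch below is an assumption about how that might look, not the repo's handler: parse_run_input, artifact_key, and the hard_cap value are hypothetical names and numbers; only the three payload fields and the runs/<run_id>/ layout come from this guide.

```python
def parse_run_input(event: dict, hard_cap: int = 200) -> dict:
    """Validate the run payload and clamp max_urls to a hard ceiling."""
    run_id = str(event.get("run_id") or "").strip()
    query = str(event.get("query") or "").strip()
    if not run_id or not query:
        raise ValueError("run_id and query are required")
    max_urls = int(event.get("max_urls", 50))
    if max_urls < 1:
        raise ValueError("max_urls must be at least 1")
    # hard_cap is an assumed safety ceiling so a typo in the payload cannot
    # turn into thousands of fetches and model calls.
    return {"run_id": run_id, "query": query, "max_urls": min(max_urls, hard_cap)}


def artifact_key(run_id: str, name: str) -> str:
    """Build the S3 key for one artifact of a run."""
    return f"runs/{run_id}/{name}"
```

For example, artifact_key("2026-01-05", "urls.json") gives "runs/2026-01-05/urls.json", so every step derives its paths from the same run_id.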

Execution flow + artifacts

  • 1) Search (Lambda) → writes runs/<run_id>/urls.json (the URL list) and returns that list to Step Functions.
  • 2) Extract (parallel) (Step Functions Map + extract-worker Lambda) → processes each URL independently and produces per-URL row objects (including extraction_status/extraction_error).
  • 3) WriteExtracted (Lambda) → writes the Map results to runs/<run_id>/extracted.jsonl.
  • 4) Label + score (Lambda) → reads extracted.jsonl, enriches rows, writes runs/<run_id>/enriched.jsonl. (Rows that failed extraction are passed through without model calls.)
  • 5) Report (Lambda) → reads enriched.jsonl and writes runs/<run_id>/report.csv.
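The last step in the flow above, turning enriched.jsonl into a ranked report.csv, might look roughly like this. It is a sketch with assumptions called out: the S3 read and write are omitted, the column subset is illustrative, and the filter (keep on_topic rows that have an integer score) is one plausible reading of "sorts/filters".

```python
import csv
import io
import json

# Assumption: an illustrative subset of the CSV contract, score first for easy scanning.
REPORT_COLUMNS = ["relevance_score", "label", "type", "title", "source", "date", "url", "reason"]


def build_report(enriched_jsonl: str, min_score: int = 0) -> str:
    """Turn enriched JSONL text into a ranked CSV, best rows first."""
    rows = [json.loads(line) for line in enriched_jsonl.split("\n") if line.strip()]
    ranked = sorted(
        (
            row
            for row in rows
            if row.get("label") == "on_topic"
            and isinstance(row.get("relevance_score"), int)
            and row["relevance_score"] >= min_score
        ),
        key=lambda row: row["relevance_score"],
        reverse=True,
    )
    buffer = io.StringIO(newline="")
    writer = csv.DictWriter(buffer, fieldnames=REPORT_COLUMNS, restval="", extrasaction="ignore")
    writer.writeheader()
    writer.writerows(ranked)
    return buffer.getvalue()
```

Rows that failed extraction carry no label or score, so they drop out of the report here while remaining inspectable in extracted.jsonl and enriched.jsonl.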

When debugging: use the Step Functions execution view to see which step failed, CloudWatch Logs for details, and S3 artifacts under the run’s run_id to inspect outputs step-by-step.

6. Operate and Improve: IaC knobs for limits, cost, reliability

Once this runs unattended, the practical work is tuning throughput and spend while keeping runs reliable. The important part is that the knobs are explicit in the deployed system (and many are defined directly in Terraform), so changes are repeatable and reviewable.

The concrete knobs in this repo’s AWS deployment

  • Work per run: max_urls is set in the scheduler input payload.
  • Fan-out pressure: Step Functions ExtractMap uses MaxConcurrency (currently 10) to cap parallel extraction.
  • Extraction safety: the map passes timeout_s (currently 10) and max_chars (currently 8000) to bound time and payload size.
  • Lambda sizing: timeouts/memory are set per function in Terraform (for example, label-score has a higher timeout for model calls).
  • Retries: Step Functions applies bounded retries for transient Lambda errors, and extraction failures are recorded per-row so runs can still complete.
  • Model cost control: the enrich step defaults to gpt-4o-mini unless OPENAI_MODEL is set in Secrets Manager.
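To show where the concurrency and retry knobs sit, here is a sketch of what the ExtractMap state could look like in Amazon States Language, built as a Python dict so it can be printed as JSON. The repo defines this in Terraform, so treat the structure as illustrative: the input path $.urls, the inner state name, the result path, and the retry numbers are all assumptions. MaxConcurrency, timeout_s, max_chars, and the function name come from this guide.

```python
import json


def extract_map_state(max_concurrency: int = 10, timeout_s: int = 10, max_chars: int = 8000) -> dict:
    """Build a Map state that fans out over URLs with capped parallelism."""
    return {
        "Type": "Map",
        "ItemsPath": "$.urls",  # assumed location of the URL list in the state input
        "MaxConcurrency": max_concurrency,
        "ItemSelector": {
            "url.$": "$$.Map.Item.Value",
            "timeout_s": timeout_s,
            "max_chars": max_chars,
        },
        "ItemProcessor": {
            "ProcessorConfig": {"Mode": "INLINE"},
            "StartAt": "ExtractWorker",
            "States": {
                "ExtractWorker": {
                    "Type": "Task",
                    "Resource": "arn:aws:states:::lambda:invoke",
                    "Parameters": {
                        "FunctionName": "building-ai-pipelines-extract-worker",
                        "Payload.$": "$",
                    },
                    # Bounded retries for transient Lambda errors (values are assumptions).
                    "Retry": [
                        {
                            "ErrorEquals": [
                                "Lambda.ServiceException",
                                "Lambda.TooManyRequestsException",
                            ],
                            "IntervalSeconds": 2,
                            "MaxAttempts": 3,
                            "BackoffRate": 2.0,
                        }
                    ],
                    "End": True,
                }
            },
        },
        "ResultPath": "$.rows",  # assumed
        "Next": "WriteExtracted",
    }
```

Calling json.dumps(extract_map_state(), indent=2) prints the state as JSON. Lowering MaxConcurrency is usually the first thing to try when target sites start rate limiting.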

The operational loop stays boring by design: find the execution in Step Functions, inspect the S3 artifacts for that run_id, and adjust caps/concurrency when you see rate limiting, timeouts, or cost spikes. Because artifacts are keyed by run_id, you can replay a run safely from its stored artifacts without rerunning the whole pipeline.
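The replay idea can be illustrated locally with files standing in for S3 objects. This is a sketch, not how the AWS deployment does it: run_step and its force flag are hypothetical, and the pattern is simply "skip a step whose artifact already exists for this run_id".

```python
from pathlib import Path


def run_step(run_dir: Path, artifact: str, produce, force: bool = False) -> Path:
    """Run produce() only if the step's artifact is missing (or force is set).

    produce is any zero-argument function that returns the artifact's text.
    """
    path = run_dir / artifact
    if path.exists() and not force:
        return path  # already done for this run; reuse the stored artifact
    run_dir.mkdir(parents=True, exist_ok=True)
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text(produce(), encoding="utf-8")
    # Write to a temp file first, then rename, so a crash mid-write
    # does not leave a partial artifact that looks complete.
    tmp.replace(path)
    return path
```

With this shape, rerunning a failed run with the same run_id skips the steps that already produced artifacts and resumes at the first missing one.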

Finally, keep secrets out of the repo: in this implementation, Lambdas read API keys at runtime from a single Secrets Manager JSON secret whose values are provided to Terraform as sensitive variables (not committed).
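On the Lambda side, reading that JSON secret comes down to parsing one string and checking that the expected keys are present. The sketch below assumes the secret string has already been fetched (with boto3 this would typically be the SecretString field returned by the Secrets Manager get_secret_value call, which is omitted here); parse_secret is a hypothetical name.

```python
import json

REQUIRED_KEYS = ("GOOGLE_CSE_API_KEY", "GOOGLE_CSE_CX", "OPENAI_API_KEY")
DEFAULT_MODEL = "gpt-4o-mini"  # the default named in this guide


def parse_secret(secret_string: str) -> dict:
    """Parse the JSON secret and return the settings the steps need."""
    values = json.loads(secret_string)
    missing = [key for key in REQUIRED_KEYS if not values.get(key)]
    if missing:
        # Report key names only; never log secret values.
        raise ValueError("secret is missing keys: " + ", ".join(missing))
    settings = {key: values[key] for key in REQUIRED_KEYS}
    settings["OPENAI_MODEL"] = values.get("OPENAI_MODEL") or DEFAULT_MODEL
    return settings
```

Failing at startup with a list of missing key names is easier to debug than an authentication error halfway through a run.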

7. Next Steps: Extend the Pattern and Ship More Pipelines

You’ve now seen the full pattern end-to-end: define “done” with one manual run, lock in an output contract, then turn the repetitive work into executable steps with artifacts, retries, and a schedule. That’s the move from ad-hoc AI work to a pipeline you can trust.

If you want to apply this to your real use case, here’s the next practical step: pick one workflow you run every week and write down four things—what goes in, what comes out, what can fail, and what you’re willing to spend. Then build the smallest version that produces a report you’d actually use. In our example, that means: seeded dataset → extracted text (or per-row errors) → enrichment → ranked output—first locally, then on a schedule, then in the cloud.

In this repo, “extend the pattern” means changing both code and infrastructure in lockstep: update the scheduled run payload and cadence in infra/scheduler.tf, update orchestration (new steps, retries, concurrency) in infra/stepfunctions.tf, and implement step logic in lambda_src/. Then redeploy with Terraform so the behavior is versioned and repeatable, with secrets still managed in Secrets Manager.

If you’re thinking “this is exactly what we need, but I don’t want to spend a month reinventing it,” that’s where I can help. I work with teams to take one high-ROI workflow and turn it into a production pipeline: clear contracts, sensible limits, reliable operation, and outputs that show up where the business needs them.

If you’d like to explore it, use the contact form and include:

  • the workflow you want to automate
  • where the data lives today
  • what “done” looks like (the output you want)
  • how often it should run

I’ll reply with a short plan and whether it’s a good fit. And either way—thanks for reading. I hope this guide gave you at least one practical idea you can use right away.