
Building a data pipeline with Python: a practical guide


How to build a reliable data pipeline in Python for small business automation. Ingestion, transformation, storage, and scheduling with real code examples.

The core of a data pipeline: cleaning, combining, and transforming raw data from various sources into a stable foundation for automation.

Every automation project I plan starts with the same question: where does the data come from and where does it need to go? The answer is almost never simple. Data arrives from APIs, spreadsheets, email attachments, and database exports. It needs to be cleaned, combined, transformed, and stored somewhere useful.

That's a data pipeline. It's not glamorous, but it's the foundation that everything else depends on. Get the pipeline right and the rest of the project is straightforward. Get it wrong and you'll spend more time debugging data issues than building the actual product.

Here's how I think about building pipelines in Python.

The four stages

Every data pipeline, no matter how complex, follows four stages.

1. Ingest

Pulling data from its source. This could be:

  • API calls: Most SaaS tools (Shopify, Xero, Zendesk, etc.) have REST APIs
  • File reads: CSVs, Excel files from email or shared folders
  • Database queries: Direct SQL against a source database
  • Web scraping: For data that doesn't have an API (use responsibly)

The key principle: never trust the source data. APIs return unexpected nulls. CSVs have inconsistent encodings. Excel files have merged cells. Always validate what you receive before processing it.

import requests

def fetch_orders(api_key: str, since: str) -> list[dict]:
    resp = requests.get(
        "https://api.example.com/orders",
        headers={"Authorization": f"Bearer {api_key}"},
        params={"created_after": since},
        timeout=30,  # never let a hung API stall the whole pipeline
    )
    resp.raise_for_status()
    return resp.json()["orders"]
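As a concrete sketch of that validation step (the required field names here are assumptions borrowed from the order example), you can filter out records missing the fields downstream stages rely on before anything else touches them:

```python
def validate_orders(raw: list[dict]) -> list[dict]:
    """Keep only records that have all required fields, with no nulls."""
    required = {"order_id", "created_at", "total_price"}
    valid = []
    for record in raw:
        missing = required - record.keys()
        if missing or any(record[f] is None for f in required):
            # In production, log or quarantine rejects rather than drop them
            continue
        valid.append(record)
    return valid
```

This is deliberately strict: it is far cheaper to reject a record at the boundary than to chase a null through three transformation steps later.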

2. Transform

Cleaning, normalising, and reshaping the data. This is always the messiest stage.

Common transformations:

  • Column renaming: Standardise field names across sources
  • Type casting: Dates as dates, numbers as numbers
  • Deduplication: Remove or merge duplicate records
  • Normalisation: Currency conversion, timezone alignment, unit standardisation
  • Enrichment: Join with additional data sources (e.g., add customer info to order data)

pandas is the default tool here, and for good reason. It handles 90% of transformation tasks cleanly.

import pandas as pd

def clean_orders(raw: list[dict]) -> pd.DataFrame:
    df = pd.DataFrame(raw)
    df = df.rename(columns={"created_at": "date", "total_price": "amount"})
    df["date"] = pd.to_datetime(df["date"])
    df["amount"] = pd.to_numeric(df["amount"])
    df = df.drop_duplicates(subset=["order_id"])
    return df
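Enrichment is usually just a join. A minimal sketch using pandas.merge, assuming a hypothetical customers table keyed on customer_id:

```python
import pandas as pd

def enrich_orders(orders: pd.DataFrame, customers: pd.DataFrame) -> pd.DataFrame:
    # Left join: keep every order, even when the customer record is missing
    return orders.merge(customers, on="customer_id", how="left")
```

The `how="left"` matters: an inner join would silently drop orders with no matching customer, which is exactly the kind of quiet data loss the error-handling section warns about.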

3. Store

Putting the cleaned data somewhere it can be queried, analysed, or fed into downstream systems.

Options:

  • Postgres / MySQL: Best for structured data that needs to be queried with SQL. My default choice for most projects.
  • SQLite: Great for small-scale projects or prototyping. Zero configuration.
  • Google Sheets: Surprisingly useful for small businesses that want to see and edit data manually. The API works well.
  • CSV / Parquet files: For batch workflows that don't need real-time access.

from sqlalchemy import create_engine

engine = create_engine("postgresql://user:pass@localhost/mydb")
df.to_sql("orders", engine, if_exists="append", index=False)
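One caveat with this snippet: if_exists="append" will happily insert duplicates when the pipeline re-runs over the same date window. A simple guard, sketched here assuming the orders table already exists, is to filter out IDs that are already stored before appending:

```python
import pandas as pd

def append_new_orders(df: pd.DataFrame, engine) -> int:
    """Append only orders whose order_id is not already in the table."""
    existing = pd.read_sql("SELECT order_id FROM orders", engine)
    new = df[~df["order_id"].isin(existing["order_id"])]
    new.to_sql("orders", engine, if_exists="append", index=False)
    return len(new)
```

For larger tables you'd push this into the database with an upsert (ON CONFLICT DO NOTHING in Postgres), but the read-filter-append pattern is fine at small-business scale and works on any backend.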

4. Schedule

Running the pipeline automatically on a schedule. A pipeline that only runs when you remember to trigger it isn't useful.

Options:

  • cron: Simple, reliable, runs on any Linux/Mac machine. My go-to for most projects.
  • Systemd timers: Like cron but with better logging and dependency management.
  • GitHub Actions: Free for public repos, useful for scheduled tasks that don't need private infrastructure.
  • Cloud schedulers: AWS Lambda + EventBridge, Google Cloud Functions + Cloud Scheduler. More setup, but scales automatically.

# Run every day at 6am
0 6 * * * cd /opt/pipeline && python run.py >> /var/log/pipeline.log 2>&1
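The run.py that the cron line invokes can be a thin orchestrator over the stages above. A sketch (passing the stage functions in as parameters is an assumption made here so each stage can be tested or swapped independently):

```python
import logging
from typing import Callable

logger = logging.getLogger(__name__)

def run_pipeline(ingest: Callable, transform: Callable, store: Callable) -> int:
    """Run ingest -> transform -> store, returning the record count loaded."""
    raw = ingest()            # e.g. fetch_orders(api_key, since)
    records = transform(raw)  # e.g. clean_orders(raw)
    store(records)            # e.g. df.to_sql(...)
    logger.info("Pipeline loaded %d records", len(records))
    return len(records)
```

Keeping the wiring this small pays off when something breaks at 6am: each stage can be re-run in isolation against the same inputs.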

Error handling matters more than you think

The difference between a prototype pipeline and a production pipeline is error handling. Things that will go wrong:

  • The source API is down or rate-limited
  • A CSV has a new column or a changed format
  • A required field is null for some records
  • The database connection drops mid-write
  • A transformation produces unexpected results

For each failure mode, the pipeline needs to either retry, skip the bad record and continue, or fail loudly and notify you.

import logging
import time

import requests

logger = logging.getLogger(__name__)

def safe_fetch(api_key: str, since: str, retries: int = 3) -> list[dict]:
    for attempt in range(retries):
        try:
            return fetch_orders(api_key, since)
        except requests.HTTPError as e:
            if e.response is not None and e.response.status_code == 429:
                # Rate limited: back off, then retry
                time.sleep(30 * (attempt + 1))
            else:
                raise
    # Fail loudly rather than silently returning partial data
    logger.error("Failed to fetch orders after %d retries", retries)
    raise RuntimeError(f"Could not fetch orders after {retries} retries")

Warning

Silent failures are the worst kind. If your pipeline silently skips bad data or swallows errors, you'll find out weeks later when someone notices the numbers don't add up. Fail loudly: log errors, send alerts, and never pretend a failure didn't happen.
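"Send alerts" can be as simple as a best-effort POST to a chat webhook when a stage raises. A sketch (the payload shape matches Slack-style incoming webhooks; the URL is a placeholder):

```python
import logging
import requests

logger = logging.getLogger(__name__)

def notify_failure(webhook_url: str, message: str) -> None:
    """Best-effort alert: the notifier itself must never crash the pipeline."""
    try:
        requests.post(webhook_url, json={"text": message}, timeout=10)
    except requests.RequestException:
        # Alerting failed too; at minimum leave a trace in the logs
        logger.exception("Failed to send failure alert")
```

Wrap the whole pipeline run in a try/except that calls this before re-raising, and "weeks later" becomes "six minutes later".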

When to use Python vs dedicated ETL tools

Python is the right choice when:

  • You need custom transformation logic
  • Your data sources are varied (APIs + files + databases)
  • You're building the pipeline alongside an ML model or automation system
  • The team already knows Python

Dedicated ETL tools (Airbyte, Fivetran, dbt) are better when:

  • You're mainly moving data between well-known SaaS tools
  • The transformations are mostly SQL-based
  • You want a managed service with built-in monitoring
  • Non-technical team members need to configure connectors

For most custom AI projects, Python pipelines make more sense because the data transformations are too specific for pre-built connectors.

Key Takeaways

  • Every pipeline has four stages: ingest, transform, store, schedule. Get each one right.
  • Never trust source data. Validate everything on ingest.
  • Error handling separates prototypes from production. Fail loudly, retry intelligently, and never silently skip bad data.
  • pandas plus Postgres plus cron covers 90% of small business pipeline needs.
  • Use dedicated ETL tools when you're connecting standard SaaS apps. Use Python when you need custom logic.

Need a pipeline built?

Data pipelines are usually the first thing I build on any automation project. If your business needs data flowing reliably between systems, get in touch and I'll scope it out.

