Jamie Software Lab
Tags: Python · ETL · Pydantic · SQLite · Scheduling

Data Pipeline

A real data ingestion pipeline that fetches, validates, transforms, and stores structured data. Built with Python, Pydantic schemas, retry logic, dead letter queues, and full audit logging. Used in the Market Snapshot and GlobeScraper projects.

Runs: daily + on-demand
Data sources: yfinance + 7 scrapers
Validation: Pydantic v2
Storage: SQLite + JSON
Status: pipeline healthy · last run today · 0 dead letters

Pipeline Architecture

The pipeline follows a six-stage model. Each stage is a pure function that takes input and produces output, which makes every stage testable, retryable, and easy to debug in isolation.

Data Flow

Data Source (API / scraper) → Ingestion (fetch + retry) → Validation (schema check) → Transform (normalize + enrich) → Storage (SQLite / JSON) → Analytics (web view)
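Composing the stages as pure functions can be sketched roughly as follows. The stage bodies here are hypothetical stand-ins, not the actual implementation; the point is that each stage takes plain data in and returns plain data out:

```python
from functools import reduce

# Hypothetical stage functions; each takes and returns plain data,
# so every stage can be unit-tested and retried independently.
def ingest(source):
    return {"raw": source["payload"]}

def validate(batch):
    return {**batch, "valid": True}

def transform(batch):
    return {**batch, "normalized": batch["raw"].strip().lower()}

def store(batch):
    return {**batch, "stored": True}

PIPELINE = [ingest, validate, transform, store]

def run(source):
    """Thread a record through every stage in order."""
    return reduce(lambda data, stage: stage(data), PIPELINE, source)

result = run({"payload": "  AAPL  "})
```

Because no stage holds hidden state, a failed stage can be re-run with the same input and produce the same output.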
01 · Ingestion Worker

Fetches raw data from external sources. Uses requests with configurable timeouts, exponential backoff, and per-source rate limiting. Each run gets a unique batch ID.
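The rate limiting and batch IDs could look something like this sketch. The `RateLimiter` class is a hypothetical minimal version; the batch ID format matches the `batch_YYYYMMDD_NNN` pattern seen in the audit logs:

```python
import time
from datetime import datetime, timezone

class RateLimiter:
    """Minimal per-source rate limiter: at most one call per `interval` seconds."""
    def __init__(self, interval):
        self.interval = interval
        self.last_call = 0.0

    def wait(self):
        # Sleep just long enough to honor the per-source interval.
        elapsed = time.monotonic() - self.last_call
        if elapsed < self.interval:
            time.sleep(self.interval - elapsed)
        self.last_call = time.monotonic()

def new_batch_id(seq):
    """Build a batch ID in the batch_YYYYMMDD_NNN format used by the audit logs."""
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d")
    return f"batch_{stamp}_{seq:03d}"
```

One limiter instance per source keeps a fast API from being throttled by a slow scraper's interval.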

02 · Validation Layer

Pydantic models define the expected shape of every data record. Invalid records are rejected with detailed error messages and routed to the dead letter queue.

03 · Transformation

Normalizes fields (dates, currencies, encodings), enriches with calculated values, and deduplicates against the existing dataset. Pure functions, fully unit-tested.
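A minimal sketch of the normalize-and-dedupe idea; the function names and the supported date formats are illustrative assumptions, not the pipeline's actual code:

```python
from datetime import datetime

def normalize_date(raw):
    """Normalize a handful of common date formats to ISO (hypothetical subset)."""
    for fmt in ("%Y-%m-%d", "%d/%m/%Y", "%m-%d-%Y"):
        try:
            return datetime.strptime(raw, fmt).date().isoformat()
        except ValueError:
            continue
    raise ValueError(f"unrecognized date: {raw!r}")

def dedupe(records, existing_keys):
    """Drop records whose (symbol, date) key already exists in the dataset."""
    return [r for r in records if (r["symbol"], r["date"]) not in existing_keys]
```

Both are pure functions: given the same records and the same existing keys, the output is identical, so they can be unit-tested without any database.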

Schema Validation

Every record is validated against a Pydantic model before it enters the pipeline. This catches malformed data early and provides clear error messages for debugging.

python : Pydantic schema for market data
from pydantic import BaseModel, Field, field_validator

from datetime import date

class MarketRecord(BaseModel):
    """Validated schema for a single daily market snapshot."""
    symbol: str = Field(..., pattern=r"^[A-Z]{1,5}$")
    date: date
    high: float = Field(..., ge=0)
    low: float = Field(..., ge=0)
    close: float = Field(..., ge=0)
    volume: int = Field(..., ge=0)

    @field_validator("low")
    @classmethod
    def high_gte_low(cls, v, info):
        # Cross-field check lives on "low" because fields validate in
        # declaration order: "high" is already in info.data by this point.
        if "high" in info.data and info.data["high"] < v:
            raise ValueError("high must be >= low")
        return v
python : GlobeScraper listing schema
import re

from datetime import datetime
from pydantic import BaseModel, Field, field_validator

class RentalListing(BaseModel):
    """Schema for a scraped rental listing."""
    source: str = Field(..., max_length=50)
    title: str = Field(..., min_length=5, max_length=200)
    price_thb: float = Field(..., ge=0, le=500000)
    city: str
    url: str = Field(..., pattern=r"^https?://")
    scraped_at: datetime

    @field_validator("title", mode="before")
    @classmethod
    def sanitize_title(cls, v):
        # Strip HTML tags and surrounding whitespace before the
        # length constraints are checked
        return re.sub(r"<[^>]+>", "", v).strip()
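When a record fails validation, Pydantic reports every failing field, and those field-level errors are what end up in the dead letter queue. A minimal sketch of that flow, using a cut-down model (`MiniRecord` is hypothetical, just for illustration):

```python
from pydantic import BaseModel, Field, ValidationError

class MiniRecord(BaseModel):
    """Cut-down stand-in for MarketRecord, just to show error reporting."""
    symbol: str = Field(..., pattern=r"^[A-Z]{1,5}$")
    close: float = Field(..., ge=0)

try:
    MiniRecord(symbol="aapl!", close=-1.0)
except ValidationError as e:
    # Each entry names the failing field and the violated constraint,
    # which is exactly what gets written to the dead letter queue.
    failed = sorted(err["loc"][0] for err in e.errors())
```

Here `failed` collects both offending fields, so a single DLQ entry captures everything wrong with the record rather than just the first error.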

Resilience Patterns

External data sources fail. The pipeline handles this gracefully with retries, dead letter queues, and clear failure reporting.

🔄
Retry Handling

Every fetch uses exponential backoff: 1s → 2s → 4s → 8s, max 3 retries. Retries only on transient errors (timeouts, 5xx responses). Permanent failures (4xx) skip immediately.

import logging
import time

import requests
from requests.exceptions import ConnectionError, HTTPError, Timeout

logger = logging.getLogger("pipeline")

class PipelineError(Exception):
    pass

def fetch_with_retry(url, max_retries=3):
    for attempt in range(max_retries):
        try:
            resp = requests.get(url, timeout=10)
            resp.raise_for_status()
            return resp.json()
        except (Timeout, ConnectionError):
            pass  # Transient network error: back off and retry
        except HTTPError as e:
            if e.response.status_code < 500:
                raise  # Don't retry 4xx
        wait = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s
        logger.warning("Retry %d in %ds", attempt + 1, wait)
        time.sleep(wait)
    raise PipelineError("Max retries exceeded")
💀
Dead Letter Queue

Records that fail validation or transformation are written to a dead_letters.jsonl file with the original data, error message, batch ID, and timestamp. I can inspect and reprocess them manually.

import json
import logging
from datetime import datetime, timezone

logger = logging.getLogger("pipeline")

def now_iso():
    return datetime.now(timezone.utc).isoformat()

def send_to_dlq(record, error, batch_id):
    entry = {
        "batch_id": batch_id,
        "timestamp": now_iso(),
        "record": record,
        "error": str(error),
    }
    with open("dead_letters.jsonl", "a") as f:
        f.write(json.dumps(entry) + "\n")
    logger.error("DLQ: %s", error)

Audit Logs & Versioned Datasets

Every pipeline run is logged with full metadata: source, record count, validation failures, duration, and output location. Datasets are versioned by date.

json : Audit log entry
{
  "batch_id": "batch_20260304_001",
  "source": "yfinance",
  "started_at": "2026-03-04T00:10:00Z",
  "completed_at": "2026-03-04T00:10:04Z",
  "duration_ms": 4102,
  "records_fetched": 1,
  "records_valid": 1,
  "records_rejected": 0,
  "output": "/data/market-snapshot.json",
  "status": "success"
}
📁
Dataset Versioning

Output files include the date: snapshot_2026-03-04.json. The "latest" file is a symlink to the most recent output. Previous versions are retained for 30 days, then archived. Git tracks schema changes, not data.
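The dated-file-plus-symlink scheme can be sketched like this; the function name and the atomic temp-link-then-rename step are assumptions, not the pipeline's actual code:

```python
import json
import os
from datetime import date

def write_versioned(data, out_dir, stem="snapshot"):
    """Write a dated output file and repoint the 'latest' symlink at it."""
    dated = os.path.join(out_dir, f"{stem}_{date.today().isoformat()}.json")
    with open(dated, "w") as f:
        json.dump(data, f)
    latest = os.path.join(out_dir, f"{stem}_latest.json")
    # Replace the symlink atomically: create a temp link, then rename over it,
    # so readers never see a missing "latest" file.
    tmp = latest + ".tmp"
    if os.path.lexists(tmp):
        os.remove(tmp)
    os.symlink(os.path.basename(dated), tmp)
    os.replace(tmp, latest)
    return dated
```

Using a relative symlink target keeps the link valid even if the data directory is mounted or copied elsewhere.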

📝
Structured Logging

Pipeline logs use JSON format with consistent fields: batch_id, stage, level, message. This makes it easy to filter by batch to trace a run from ingestion to storage.
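A minimal sketch of such a formatter with the stdlib `logging` module; the `JsonFormatter` class and the `extra=` wiring are an assumed implementation, not necessarily how the pipeline does it:

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Emit one JSON object per log line with the pipeline's standard fields."""
    def format(self, record):
        return json.dumps({
            "batch_id": getattr(record, "batch_id", None),
            "stage": getattr(record, "stage", None),
            "level": record.levelname,
            "message": record.getMessage(),
        })

logger = logging.getLogger("pipeline")
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Stage code attaches batch_id and stage via the `extra` mapping.
logger.info("stored 1 record",
            extra={"batch_id": "batch_20260304_001", "stage": "storage"})
```

With every line carrying the same keys, `grep`-ing or `jq`-filtering by `batch_id` reconstructs a full run from ingestion to storage.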