Data Pipeline
A real data ingestion pipeline that fetches, validates, transforms, and stores structured data. Built with Python, Pydantic schemas, retry logic, dead letter queues, and full audit logging. Used in the Market Snapshot and GlobeScraper projects.
Pipeline Architecture
The pipeline follows a six-stage model. Each stage is a pure function that takes input and produces output, making it testable, retryable, and easy to debug.
Fetch: pulls raw data from external sources. Uses requests with configurable timeouts, exponential backoff, and per-source rate limiting. Each run gets a unique batch ID.
Validate: Pydantic models define the expected shape of every data record. Invalid records are rejected with detailed error messages and routed to the dead letter queue.
Transform: normalizes fields (dates, currencies, encodings), enriches records with calculated values, and deduplicates against the existing dataset. Pure functions, fully unit-tested.
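The transform stage can be sketched as a pair of pure functions like these. The function names, the supported date formats, and the (symbol, date) dedup key are illustrative assumptions, not the project's actual code:

```python
from datetime import datetime

def normalize_date(raw: str) -> str:
    """Parse a few common date formats into ISO 8601 (YYYY-MM-DD)."""
    for fmt in ("%Y-%m-%d", "%d/%m/%Y", "%b %d, %Y"):
        try:
            return datetime.strptime(raw, fmt).date().isoformat()
        except ValueError:
            continue
    raise ValueError(f"Unrecognized date format: {raw!r}")

def deduplicate(records, existing_keys):
    """Drop records whose (symbol, date) key is already in the dataset."""
    return [r for r in records if (r["symbol"], r["date"]) not in existing_keys]
```

Because both functions take plain values and return plain values, they can be unit-tested without any network or filesystem setup.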
Schema Validation
Every record is validated against a Pydantic model before it enters the pipeline. This catches malformed data early and provides clear error messages for debugging.
```python
from datetime import date
from pydantic import BaseModel, Field, field_validator

class MarketRecord(BaseModel):
    """Validated schema for a single daily market snapshot."""
    symbol: str = Field(..., pattern=r"^[A-Z]{1,5}$")
    date: date
    low: float = Field(..., ge=0)
    high: float = Field(..., ge=0)
    close: float = Field(..., ge=0)
    volume: int = Field(..., ge=0)

    @field_validator("high")
    @classmethod
    def high_gte_low(cls, v, info):
        # `low` is declared before `high`, so it is already present
        # in info.data when this validator runs
        if "low" in info.data and v < info.data["low"]:
            raise ValueError("high must be >= low")
        return v
```
```python
import re
from datetime import datetime
from pydantic import BaseModel, Field, field_validator

class RentalListing(BaseModel):
    """Schema for a scraped rental listing."""
    source: str = Field(..., max_length=50)
    title: str = Field(..., min_length=5, max_length=200)
    price_thb: float = Field(..., ge=0, le=500000)
    city: str
    url: str = Field(..., pattern=r"^https?://")
    scraped_at: datetime

    @field_validator("title")
    @classmethod
    def sanitize_title(cls, v):
        # Strip HTML tags and surrounding whitespace
        return re.sub(r"<[^>]+>", "", v).strip()
```
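As a usage sketch (with a model trimmed to two fields for brevity), Pydantic collects every violated constraint into a single ValidationError, and each error names the offending field. That per-field detail is what makes the rejection messages useful downstream:

```python
from pydantic import BaseModel, Field, ValidationError

class MarketRecord(BaseModel):
    # Trimmed-down illustration; the full schema is shown above
    symbol: str = Field(..., pattern=r"^[A-Z]{1,5}$")
    close: float = Field(..., ge=0)

failed = []
try:
    MarketRecord(symbol="aapl!", close=-1)
except ValidationError as e:
    # e.errors() yields one entry per violated constraint
    failed = sorted(err["loc"][0] for err in e.errors())
```

Here both fields fail at once: the symbol violates the uppercase pattern and the close price violates the ge=0 bound.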
Resilience Patterns
External data sources fail. The pipeline handles this gracefully with retries, dead letter queues, and clear failure reporting.
Every fetch uses exponential backoff (1s → 2s → 4s) with a maximum of 3 attempts. Retries happen only on transient errors (timeouts, 5xx responses). Permanent failures (4xx) skip immediately.
```python
import logging
import time

import requests
from requests.exceptions import ConnectionError, HTTPError, Timeout

logger = logging.getLogger("pipeline")

class PipelineError(Exception):
    """Raised when a pipeline stage fails permanently."""

def fetch_with_retry(url, max_retries=3):
    for attempt in range(max_retries):
        try:
            resp = requests.get(url, timeout=10)
            resp.raise_for_status()
            return resp.json()
        except (Timeout, ConnectionError):
            wait = 2 ** attempt
            logger.warning("Retry %d in %ds", attempt, wait)
            time.sleep(wait)
        except HTTPError as e:
            if e.response.status_code < 500:
                raise  # Don't retry 4xx: the request itself is bad
    raise PipelineError("Max retries exceeded")
```
Records that fail validation or transformation are written to a dead_letters.jsonl file along with the original data, error message, batch ID, and timestamp, so I can inspect and reprocess them manually.
```python
import json
import logging

logger = logging.getLogger("pipeline")

def send_to_dlq(record, error, batch_id):
    entry = {
        "batch_id": batch_id,
        "timestamp": now_iso(),  # project helper: current UTC time as ISO 8601
        "record": record,
        "error": str(error),
    }
    with open("dead_letters.jsonl", "a") as f:
        f.write(json.dumps(entry) + "\n")
    logger.error("DLQ: %s", error)
```
Audit Logs & Versioned Datasets
Every pipeline run is logged with full metadata: source, record count, validation failures, duration, and output location. Datasets are versioned by date.
{
"batch_id": "batch_20260304_001",
"source": "yfinance",
"started_at": "2026-03-04T00:10:00Z",
"completed_at": "2026-03-04T00:10:04Z",
"duration_ms": 4102,
"records_fetched": 1,
"records_valid": 1,
"records_rejected": 0,
"output": "/data/market-snapshot.json",
"status": "success"
}
Output files include the date: snapshot_2026-03-04.json.
The "latest" file is a symlink to the most recent output. Previous versions
are retained for 30 days, then archived. Git tracks schema changes, not data.
Pipeline logs use JSON format with consistent fields: batch_id,
stage, level, message. This makes it easy to
filter by batch to trace a run from ingestion to storage.
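One way to produce log lines with exactly those fields is a small logging.Formatter subclass. This is a sketch, not the project's logging setup; the batch_id and stage values are supplied per call via logging's standard `extra` mechanism:

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Emit each log record as one JSON line with consistent fields."""
    def format(self, record):
        return json.dumps({
            "batch_id": getattr(record, "batch_id", None),
            "stage": getattr(record, "stage", None),
            "level": record.levelname,
            "message": record.getMessage(),
        })

logger = logging.getLogger("pipeline")
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Stage code attaches the shared fields, so any batch can be grepped end to end:
logger.info("validated 120 records",
            extra={"batch_id": "batch_20260304_001", "stage": "validate"})
```

Because every line is a complete JSON object, a run can be traced with nothing more than a `jq 'select(.batch_id == "...")'` filter over the log file.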