This repository implements a raw ingestion pipeline from the Stripe API to Databricks, with incremental loading and workflow orchestration.
The project is intentionally focused on raw ingestion only:
- no business transformations in the ingestion step
- no upserts/merges into raw tables
- full payload retention for auditability and replay
For each configured Stripe entity, the job:
- Reads the latest resume point from checkpoint events.
- Calls Stripe list APIs with:
  - a `created[gte]` watermark filter
  - a `starting_after` cursor for pagination
- Writes each object as an immutable raw row to a Delta table.
- Appends checkpoint events after each committed page.
- Produces a new resume point for the next run.
This design supports daily scheduled ingestion and deterministic recovery from interruption.
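As a rough sketch of that per-entity loop (all helper names here, such as `read_resume_point`, `list_page`, `append_raw_rows`, and `append_checkpoint_event`, are hypothetical stand-ins for the real pipeline/state_store/raw_writer functions):

```python
# Illustrative per-entity loop; helpers are hypothetical, not the repo's actual API.
def ingest_entity(entity: str, run_id: str) -> None:
    resume = read_resume_point(entity)              # derived from checkpoint events
    append_checkpoint_event(entity, run_id, "RUN_STARTED")
    cursor, page_number = resume["starting_after"], 0
    while True:
        page = list_page(entity,
                         created_gte=resume["created_gte"],
                         starting_after=cursor)     # Stripe list call
        if page.objects:
            append_raw_rows(entity, run_id, page, page_number)  # append-only Delta write
            cursor = page.objects[-1]["id"]         # next starting_after cursor
            append_checkpoint_event(entity, run_id, "PAGE_COMMITTED",
                                    cursor=cursor, page_number=page_number)
            page_number += 1
        if not page.has_more:                       # Stripe signals end of list
            break
    append_checkpoint_event(entity, run_id, "RUN_COMPLETED")
```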
Current entities are defined in `src/stripe_raw_ingestion/entities.py`:
`payment_intents`, `charges`, `balance_transactions`, `refunds`, `invoices`
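The registry presumably maps each entity to a Stripe list endpoint and a target table. A plausible shape (the field and table names are assumptions; the endpoints are Stripe's real list APIs):

```python
# Illustrative entity registry; EntityConfig and the table names are assumptions.
from dataclasses import dataclass

@dataclass(frozen=True)
class EntityConfig:
    name: str        # entity name used in job parameters
    endpoint: str    # Stripe list endpoint
    raw_table: str   # target raw Delta table

ENTITIES = [
    EntityConfig("payment_intents", "/v1/payment_intents", "raw_stripe_payment_intents"),
    EntityConfig("charges", "/v1/charges", "raw_stripe_charges"),
    EntityConfig("balance_transactions", "/v1/balance_transactions", "raw_stripe_balance_transactions"),
    EntityConfig("refunds", "/v1/refunds", "raw_stripe_refunds"),
    EntityConfig("invoices", "/v1/invoices", "raw_stripe_invoices"),
]
```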
Raw tables are written with `.mode("append")`.
No `MERGE`, no deletes, no in-place updates.
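In PySpark terms, the write contract amounts to a plain Delta append (the DataFrame variable and three-level table name are illustrative):

```python
# Append-only raw write: no MERGE, no deletes, no in-place updates.
(raw_df.write
       .format("delta")
       .mode("append")
       .saveAsTable("main.stripe_raw.charges"))  # illustrative catalog.schema.table
```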
The pipeline combines three mechanisms (sketched below):
- watermark: `created[gte]=start_created_gte`
- cursor: `starting_after=<last_object_id>`
- lookback overlap: next run starts from `max_created_seen - lookback_seconds`
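A minimal sketch of how the three mechanisms combine into request parameters (function names and the lookback constant are assumptions):

```python
from typing import Optional

LOOKBACK_SECONDS = 300  # illustrative; the real value comes from config

def next_watermark(max_created_seen: int) -> int:
    # Lookback overlap: re-read a small trailing window so late-arriving
    # objects are not missed; duplicate raw rows are acceptable by design.
    return max_created_seen - LOOKBACK_SECONDS

def list_params(created_gte: int, starting_after: Optional[str],
                page_size: int = 100) -> dict:
    # Watermark filter plus pagination cursor for a Stripe list call.
    params = {"created[gte]": created_gte, "limit": page_size}
    if starting_after:
        params["starting_after"] = starting_after
    return params
```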
Checkpointing is event-based and append-only (`stripe_ingestion_checkpoints`), with four event types:
`RUN_STARTED`, `PAGE_COMMITTED`, `RUN_COMPLETED`, `RUN_FAILED`
Recovery behavior (sketched below):
- if the latest run is incomplete or failed: resume from the last committed cursor
- if the latest run completed: restart from the lookback-adjusted watermark
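A sketch of resume-point computation under those rules, assuming checkpoint rows carry `run_id`, `event_type`, `cursor`, `created_gte`, and `max_created_seen` fields (the actual column names may differ):

```python
LOOKBACK_SECONDS = 300  # illustrative

def compute_resume_point(events: list) -> dict:
    """events: checkpoint rows for one entity, newest first (assumed shape)."""
    latest_run_id = events[0]["run_id"]
    run = [e for e in events if e["run_id"] == latest_run_id]
    pages = [e for e in run if e["event_type"] == "PAGE_COMMITTED"]
    if any(e["event_type"] == "RUN_COMPLETED" for e in run) and pages:
        # Completed run: restart from the lookback-adjusted watermark.
        max_created = max(e["max_created_seen"] for e in pages)
        return {"created_gte": max_created - LOOKBACK_SECONDS,
                "starting_after": None}
    if pages:
        # Incomplete/failed run: resume from the last committed cursor.
        return {"created_gte": pages[0]["created_gte"],
                "starting_after": pages[0]["cursor"]}
    # Nothing committed yet: fall back to the run's starting watermark.
    return {"created_gte": run[-1]["created_gte"], "starting_after": None}
```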
The HTTP client handles:
- pagination
- network timeouts/connection errors
- retryable HTTP statuses (`429`, `5xx`)
- exponential backoff + jitter
- optional `Retry-After` support
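A self-contained sketch of that retry policy using `requests` (the repository's client may differ in names and defaults):

```python
import random
import time
import requests

RETRYABLE = {429, 500, 502, 503, 504}

def get_with_retries(url: str, params: dict, api_key: str,
                     max_retries: int = 5, timeout: float = 30.0,
                     backoff_base: float = 1.0, backoff_max: float = 60.0) -> dict:
    """Illustrative retry loop: exponential backoff with jitter,
    honoring Retry-After when the server sends it."""
    for attempt in range(max_retries + 1):
        retry_after = None
        try:
            # Stripe accepts the secret key as the HTTP basic-auth username.
            resp = requests.get(url, params=params, timeout=timeout,
                                auth=(api_key, ""))
            if resp.status_code not in RETRYABLE:
                resp.raise_for_status()  # non-retryable 4xx raises immediately
                return resp.json()
            retry_after = resp.headers.get("Retry-After")
        except (requests.Timeout, requests.ConnectionError):
            pass  # network error: fall through to backoff
        if attempt == max_retries:
            raise RuntimeError(f"Giving up on {url} after {max_retries} retries")
        delay = float(retry_after) if retry_after else min(backoff_max,
                                                           backoff_base * 2 ** attempt)
        time.sleep(delay + random.uniform(0, 0.5))  # jitter
```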
Each row stores:
- `_raw_json` (entire Stripe object)
- source IDs/timestamps (`_stripe_id`, `_stripe_created_at`)
- ingestion lineage (`_ingest_run_id`, `_ingested_at`, `_entity`)
- request/page context (`_page_number`, cursors, status/request_id)
- payload fingerprint (`_payload_sha256`)
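Row construction plausibly looks like this (column names follow the list above; `_request_id` and `_cursor` stand in for the unspecified context columns):

```python
import hashlib
import json
from datetime import datetime, timezone

def build_raw_row(obj: dict, entity: str, run_id: str,
                  page_number: int, request_id: str, cursor: str) -> dict:
    """Illustrative raw-row construction; not the repository's exact code."""
    # Canonical JSON (sorted keys, compact separators) keeps the SHA-256
    # fingerprint stable for identical payloads across runs.
    raw_json = json.dumps(obj, sort_keys=True, separators=(",", ":"))
    return {
        "_raw_json": raw_json,
        "_stripe_id": obj["id"],
        "_stripe_created_at": obj["created"],  # Unix epoch seconds
        "_ingest_run_id": run_id,
        "_ingested_at": datetime.now(timezone.utc).isoformat(),
        "_entity": entity,
        "_page_number": page_number,
        "_request_id": request_id,             # illustrative context column
        "_cursor": cursor,                     # illustrative context column
        "_payload_sha256": hashlib.sha256(raw_json.encode("utf-8")).hexdigest(),
    }
```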
- `src/stripe_raw_ingestion/main.py`: Entrypoint and Spark session initialization.
- `src/stripe_raw_ingestion/config.py`: Runtime configuration + secret lookup via Databricks secrets.
- `src/stripe_raw_ingestion/pipeline.py`: Entity ingestion loop, incremental logic, checkpoint orchestration.
- `src/stripe_raw_ingestion/http_client.py`: Stripe HTTP client with retry/backoff policy.
- `src/stripe_raw_ingestion/raw_writer.py`: Raw Delta write contract and append logic.
- `src/stripe_raw_ingestion/state_store.py`: Checkpoint event table + resume-point computation.
- `resources/stripe_ingestion_job.yml`: Databricks Job definition for wheel task execution.
- `databricks.yml`: Bundle configuration, targets, and artifact build.
The wheel task reads parameters (via CLI option parsing) for:
`catalog`, `schema`, `bundle_target`, `stripe_secret_scope`, `stripe_secret_key`, `entities`
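An illustrative `argparse` version of that option parsing (flag spellings are assumptions):

```python
import argparse

def parse_args(argv=None) -> argparse.Namespace:
    # Hypothetical CLI for the wheel task parameters listed above.
    p = argparse.ArgumentParser(prog="stripe_raw_ingestion")
    p.add_argument("--catalog", required=True)
    p.add_argument("--schema", required=True)
    p.add_argument("--bundle_target", required=True)
    p.add_argument("--stripe_secret_scope", required=True)
    p.add_argument("--stripe_secret_key", required=True)
    p.add_argument("--entities", default="all",
                   help="comma-separated entity names, e.g. charges,refunds")
    return p.parse_args(argv)
```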
Other ingestion defaults currently in config:
- checkpoint table
- page size
- lookback seconds
- max retries
- request timeout
- backoff base/max/jitter
- Stripe API base URL
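Collected as a dataclass, those defaults might look like this (values are illustrative; only the checkpoint table name comes from this document, and the base URL is Stripe's public API):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class IngestionDefaults:
    # Illustrative shape; actual names and values live in config.py.
    checkpoint_table: str = "stripe_ingestion_checkpoints"
    page_size: int = 100
    lookback_seconds: int = 300
    max_retries: int = 5
    request_timeout_seconds: float = 30.0
    backoff_base_seconds: float = 1.0
    backoff_max_seconds: float = 60.0
    backoff_jitter_seconds: float = 0.5
    stripe_api_base_url: str = "https://api.stripe.com"
```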
```bash
databricks bundle validate
databricks bundle deploy --target dev
databricks bundle run stripe_ingestion_job --target dev
```

To onboard another API or entity family, you can reuse the same pattern:
- Add the entity endpoint/table mapping in `entities.py` (example after this list).
- Reuse the existing HTTP pagination + retry client.
- Keep the raw write contract unchanged.
- Reuse the checkpoint event model for incremental resume.
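For example, onboarding a hypothetical `payouts` entity would just extend the registry sketched earlier; the client, writer, and checkpoint model are reused as-is:

```python
# Hypothetical new entity: one registry entry, no other code changes.
ENTITIES.append(EntityConfig("payouts", "/v1/payouts", "raw_stripe_payouts"))
```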