This repository implements the first stage of the data integration pipeline described in docs/WHITE_PAPER.md.
The goal is to connect external systems with unknown or inconsistent schemas and normalize them into a
stable integration record stream without forcing a fixed payload schema.
The pipeline works in three steps:
- Driver fetches raw data from an external system (file, REST, or DB).
- Integration pipeline validates minimal contract rules and emits:
  - `IntegrationRecord` for accepted payloads
  - `DeadLetter` for rejected payloads
- Fabric merge combines multiple integration outputs into one dataset.
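The accept/reject split in the second step can be sketched as follows. This is an illustrative Python sketch, not the actual Rust implementation; the dict field names and the top-level-only handling of required paths are simplifying assumptions.

```python
import hashlib
import json

def integrate(payload: dict, required_paths: list[str]) -> dict:
    """Route one payload: accepted -> IntegrationRecord, rejected -> DeadLetter."""
    # Simplification: treat a pointer like "/defect_id" as a top-level key.
    missing = [p for p in required_paths if p.lstrip("/") not in payload]
    if missing:
        # Rejected payloads keep the raw data plus structured errors.
        return {
            "kind": "DeadLetter",
            "errors": [{"code": "missing_required_path", "path": p,
                        "message": f"required path {p} not found"} for p in missing],
            "payload": payload,
        }
    # Accepted payloads get a stable, deterministic record id.
    record_id = hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()
    return {"kind": "IntegrationRecord", "record_id": record_id, "payload": payload}
```

Either way the raw payload is preserved, which is what allows DLQ replay later without refetching from the source system.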
- `crates/common`: Shared data structures (`Payload`, `IntegrationRecord`, `DeadLetter`, `PayloadFormat`).
- `crates/drivers`: External system drivers (file, REST, DB).
- `crates/runtime`: Interface definition + integration pipeline logic.
- `crates/shell`: CLI runner for the pipeline.
- `crates/fabric`: Merge layer that combines multiple integration outputs.
- Opaque input support using `Payload::Binary` (base64 encoded) to preserve unstructured data.
- External interface definition that drives validation (required paths, record id paths, payload format).
- Drivers:
  - File drivers: `jsonl`, `text`, `binary`
  - REST driver: basic GET/POST with headers and optional body
  - DB driver: `sqlite`, `postgres`, `mysql`
  - Stream driver MVP: `stream.kafka` (fixture-backed input via `mvp_input`)
- DLQ (dead letter) handling with a pluggable sink interface (file sink implemented by default).
- Merge layer to combine multiple pipeline outputs with optional dedupe.
Create the sample databases:

```bash
python3 scripts/create_sample_dbs.py
```

Run the pipeline on a JSONL file input:

```bash
cargo run -p shell -- \
  --interface path/to/interface.json \
  --contract-registry system/contracts/reference/allowlist.json \
  --input path/to/input.jsonl \
  --output path/to/output.jsonl
```

Run on a binary input:

```bash
cargo run -p shell -- \
  --interface path/to/interface.json \
  --contract-registry system/contracts/reference/allowlist.json \
  --input path/to/input.bin \
  --output path/to/output.jsonl \
  --format binary
```

Run a REST interface (no `--input`; the driver fetches the data):

```bash
cargo run -p shell -- \
  --interface path/to/rest-interface.json \
  --contract-registry system/contracts/reference/allowlist.json \
  --output path/to/output.jsonl
```

Run the MES and QMS DB fixtures:

```bash
cargo run -p shell -- \
  --interface tests/fixtures/interfaces/mes.db.json \
  --contract-registry system/contracts/reference/allowlist.json \
  --output /tmp/mes.output.jsonl

cargo run -p shell -- \
  --interface tests/fixtures/interfaces/qms.db.json \
  --contract-registry system/contracts/reference/allowlist.json \
  --output /tmp/qms.output.jsonl
```

Run Postgres and MySQL interfaces:

```bash
cargo run -p shell -- \
  --interface path/to/postgres.interface.json \
  --contract-registry system/contracts/reference/allowlist.json \
  --output /tmp/postgres.output.jsonl

cargo run -p shell -- \
  --interface path/to/mysql.interface.json \
  --contract-registry system/contracts/reference/allowlist.json \
  --output /tmp/mysql.output.jsonl
```

Run the stream driver sample:

```bash
cargo run -p shell -- \
  --interface tests/fixtures/interfaces/stream.kafka.sample.json \
  --contract-registry system/contracts/reference/allowlist.json \
  --output /tmp/stream.output.jsonl
```

- Current MVP behavior: `driver.stream.kafka.mvp_input` is consumed as the stream source.
- This keeps payload normalization identical to other drivers while streaming runtime semantics evolve.
- Default run mode is one-shot (`--schedule-mode once`).
- Interval mode executes the same interface repeatedly:
  - `--schedule-mode interval`
  - `--interval-seconds <n>`
  - `--max-runs <n>`
Example (run the stream interface every 5s, 3 times):

```bash
cargo run -p shell -- \
  --interface tests/fixtures/interfaces/stream.kafka.sample.json \
  --contract-registry system/contracts/reference/allowlist.json \
  --output /tmp/stream.scheduled.output.jsonl \
  --schedule-mode interval \
  --interval-seconds 5 \
  --max-runs 3
```

Merge multiple pipeline outputs with dedupe:

```bash
cargo run -p fabric -- \
  --inputs /tmp/mes.output.jsonl \
  --inputs /tmp/qms.output.jsonl \
  --output /tmp/merged.output.jsonl \
  --dedupe
```

Run the smoke tests and complex pipeline checks:

```bash
bash scripts/run_service_smoke_tests.sh
bash scripts/run_complex_pipeline_checks.sh
```

Start the UI:

```bash
cd ui
npm install
npm run dev
```

Open http://localhost:3000 to view artifact status from the /tmp/rootsys-smoke and /tmp/rootsys-complex outputs.
Run all checks and prepare the UI (default profile, or an explicit one):

```bash
bash scripts/run_all_checks_and_prepare_ui.sh
bash scripts/run_all_checks_and_prepare_ui.sh default
```

Create a new customer profile:

```bash
bash scripts/create_company_profile.sh <company-name>
```

Validate profile paths before a full run:

```bash
bash scripts/validate_company_profile.sh <company-name>
```

Scale-up test example:

```bash
ROOTSYS_SMOKE_DB_COUNT=500 ROOTSYS_SMOKE_REST_COUNT=500 ROOTSYS_COMPLEX_STREAM_RECORD_COUNT=1000 \
  bash scripts/run_all_checks_and_prepare_ui.sh <company-name>
```

Profile file location: `config/companies/<profile>.env`

Custom config file override:

```bash
ROOTSYS_CONFIG_FILE=/absolute/path/to/company.env bash scripts/run_all_checks_and_prepare_ui.sh
```

Optional: automatically start the Next.js dev server after all checks:

```bash
ROOTSYS_RUN_UI_DEV=1 bash scripts/run_all_checks_and_prepare_ui.sh
```

Dead letter (DLQ) sinks:

- File sink (default):
  - `--dlq-sink file`
  - `--dlq /path/to/output.dlq.jsonl` (optional; the default is derived from `--output`)
- SQLite sink:
  - `--dlq-sink sqlite`
  - `--dlq /path/to/dlq.sqlite` (optional; the default is derived from `--output`)
  - `--dlq-table dead_letters` (optional)
Example (SQLite DLQ sink):

```bash
cargo run -p shell -- \
  --interface path/to/interface.json \
  --contract-registry system/contracts/reference/allowlist.json \
  --output /tmp/output.jsonl \
  --dlq-sink sqlite \
  --dlq /tmp/dlq.sqlite \
  --dlq-table dead_letters
```

Replay from a file DLQ:

```bash
cargo run -p shell -- \
  --interface path/to/interface.json \
  --contract-registry system/contracts/reference/allowlist.json \
  --output /tmp/replay.output.jsonl \
  --replay-dlq /tmp/output.dlq.jsonl \
  --replay-dlq-source file
```

Replay from a SQLite DLQ:

```bash
cargo run -p shell -- \
  --interface path/to/interface.json \
  --contract-registry system/contracts/reference/allowlist.json \
  --output /tmp/replay.output.jsonl \
  --replay-dlq /tmp/dlq.sqlite \
  --replay-dlq-source sqlite \
  --replay-dlq-table dead_letters
```

The interface JSON drives the pipeline. Example:
```json
{
  "name": "mes",
  "version": "v1",
  "driver": {
    "kind": "jsonl",
    "input": "./data/mes.jsonl",
    "content_type": "application/x-ndjson"
  },
  "payload_format": "json",
  "record_id_policy": "hash_fallback",
  "record_id_paths": ["/defect_id", "/lot_id"],
  "required_paths": ["/defect_id"]
}
```

- `record_id_policy` controls how record IDs are generated:
  - `hash_fallback` (default): use `record_id_paths` when present, otherwise hash the payload.
  - `strict`: require `record_id_paths` to resolve; unresolved IDs are emitted to the DLQ.
- Contract governance is enforced through `--contract-registry` (default: `system/contracts/reference/allowlist.json`). The interface `(name, version)` pair must exist in the allowlist.
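The two record-id policies can be sketched as follows. This is an illustrative Python sketch under simplifying assumptions: the function name is hypothetical and JSON pointers are resolved as top-level keys only.

```python
import hashlib
import json

def resolve_record_id(payload, record_id_paths, policy="hash_fallback"):
    """Resolve a record id per the documented policies.

    Simplification: a pointer like "/defect_id" is treated as a top-level key.
    """
    for path in record_id_paths:
        value = payload.get(path.lstrip("/"))
        if value is not None:
            return str(value)  # first path that resolves wins
    if policy == "strict":
        return None  # strict: unresolved ids are emitted to the DLQ
    # hash_fallback: deterministic hash of the canonicalized payload
    return hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()
```

Hashing the canonicalized payload keeps `hash_fallback` ids stable across runs for identical payloads, which is what makes downstream dedupe meaningful.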
REST driver example:

```json
{
  "name": "external-api",
  "version": "v1",
  "driver": {
    "kind": "rest",
    "rest": {
      "url": "https://api.example.com/events",
      "method": "GET",
      "headers": { "Accept": "application/json" },
      "auth": {
        "kind": "api_key",
        "api_key": {
          "in": "header",
          "name": "X-API-KEY",
          "value": "<token>"
        }
      },
      "timeout_ms": 5000,
      "retry": {
        "max_attempts": 3,
        "base_delay_ms": 100,
        "max_delay_ms": 2000,
        "jitter_percent": 20
      },
      "circuit_breaker": {
        "failure_threshold": 5,
        "open_timeout_ms": 30000
      },
      "response_format": "json",
      "items_pointer": "/items"
    }
  },
  "payload_format": "json"
}
```

- `items_pointer` is optional. If it points to a JSON array, one record is created per element.
- If `response_format` is `unknown`, the driver tries JSON, then UTF-8 text, then falls back to binary.
- The safe default request timeout is `5000ms` when `timeout_ms` is omitted.
- Transient REST failures are retried with exponential backoff and jitter.
- API key auth supports `in: "header"` and `in: "query"` injection modes.
- OAuth2 client-credentials auth is supported via `auth.kind = "oauth2_client_credentials"` with `token_url`, `client_id`, `client_secret`, and optional `scope`.
- OAuth2 access tokens are cached in-memory and refreshed before expiry.
- Cursor pagination is supported via `pagination.kind = "cursor"`.
- Page/page_size pagination is supported via `pagination.kind = "page"`.
- The safe default page cap is `100` requests when pagination `max_pages` is omitted.
- Cursor record emission rules:
  - `items_pointer` points to an array -> emit one record per item.
  - `items_pointer` omitted or non-array target -> emit one record per page payload.
- Page/page_size record emission rules:
  - `items_pointer` points to an array -> emit one record per item.
  - Emission stops when a page emits zero records, or `max_pages` is reached.
- Rate-limit policy notes:
  - Transient HTTP failures (`408`, `425`, `429`, `500`, `502`, `503`, `504`) and transport errors are retried.
  - Default retry policy: `max_attempts=3`, `base_delay_ms=100`, `max_delay_ms=2000`, `jitter_percent=20`.
  - The retry policy can be overridden with `rest.retry`.
- Circuit breaker policy is optional for REST and supports:
  - `failure_threshold` (default `5`)
  - `open_timeout_ms` (default `30000`)
  - state transitions: `closed -> open -> half_open -> closed`
- Use a conservative `page_size`, an explicit `max_pages`, and endpoint-side quotas for safe operation.
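The cursor emission and stop rules above can be sketched as a fetch loop. This is an illustrative Python sketch, not the driver's Rust code; `fetch_page` is a hypothetical callable standing in for one HTTP request, and the pointer handling is simplified to top-level keys.

```python
def fetch_all(fetch_page, cursor_param="cursor", cursor_path="/next_cursor",
              initial_cursor="", max_pages=100):
    """Follow cursor pagination: one request per page, stop when the cursor
    path no longer resolves or max_pages is reached."""
    records, cursor = [], initial_cursor
    for _ in range(max_pages):
        page = fetch_page({cursor_param: cursor} if cursor else {})
        items = page.get("items")
        if isinstance(items, list):
            records.extend(items)   # items_pointer resolves to an array -> one record per item
        else:
            records.append(page)    # otherwise -> one record per page payload
        cursor = page.get(cursor_path.lstrip("/"))
        if not cursor:
            break                   # no next cursor -> stop paginating
    return records
```

Note how `max_pages` bounds the loop even if the endpoint keeps returning cursors, matching the safe default page cap of 100 requests.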
OAuth2 example:
```json
{
  "name": "external-api",
  "version": "v1",
  "driver": {
    "kind": "rest",
    "rest": {
      "url": "https://api.example.com/events",
      "method": "GET",
      "auth": {
        "kind": "oauth2_client_credentials",
        "oauth2_client_credentials": {
          "token_url": "https://auth.example.com/oauth/token",
          "client_id": "client-id",
          "client_secret": "client-secret",
          "scope": "events:read"
        }
      },
      "response_format": "json"
    }
  },
  "payload_format": "json"
}
```

Cursor pagination example:
```json
{
  "name": "external-api",
  "version": "v1",
  "driver": {
    "kind": "rest",
    "rest": {
      "url": "https://api.example.com/events",
      "method": "GET",
      "response_format": "json",
      "items_pointer": "/items",
      "pagination": {
        "kind": "cursor",
        "cursor": {
          "cursor_param": "cursor",
          "cursor_path": "/next_cursor",
          "initial_cursor": "",
          "max_pages": 100
        }
      }
    }
  },
  "payload_format": "json"
}
```

Page/page_size pagination example:
```json
{
  "name": "external-api",
  "version": "v1",
  "driver": {
    "kind": "rest",
    "rest": {
      "url": "https://api.example.com/events",
      "method": "GET",
      "response_format": "json",
      "items_pointer": "/items",
      "pagination": {
        "kind": "page",
        "page": {
          "page_param": "page",
          "page_size_param": "page_size",
          "page_size": 100,
          "initial_page": 1,
          "max_pages": 50
        }
      }
    }
  },
  "payload_format": "json"
}
```

SQLite example:

```json
{
  "name": "local-db",
  "version": "v1",
  "driver": {
    "kind": "db",
    "db": {
      "kind": "sqlite",
      "connection": "./data/sample.db",
      "query": "SELECT * FROM defect_events"
    }
  },
  "payload_format": "json"
}
```

- Each row becomes a JSON object where keys are column names.
- Blob columns are base64 encoded.
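The row-to-JSON mapping can be sketched as follows. This is an illustrative Python sketch (the actual drivers do this in Rust per DB kind); the function name is an assumption.

```python
import base64

def row_to_json(columns, values):
    """Map one DB row to a JSON object; blob columns are base64 encoded."""
    obj = {}
    for name, value in zip(columns, values):
        if isinstance(value, (bytes, bytearray)):
            # Binary column data cannot go into JSON directly -> base64 encode it.
            obj[name] = base64.b64encode(bytes(value)).decode("ascii")
        else:
            obj[name] = value
    return obj
```

Base64-encoding blobs keeps the emitted record valid JSONL while preserving the original bytes losslessly for downstream consumers.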
PostgreSQL example (with TLS, pooling, retry, and circuit breaker policies):

```json
{
  "name": "ops-db",
  "version": "v1",
  "driver": {
    "kind": "db",
    "db": {
      "kind": "postgres",
      "connection": "host=localhost user=app password=secret dbname=ops",
      "query": "SELECT * FROM defect_events",
      "postgres_tls_mode": "require",
      "pool": {
        "min_connections": 1,
        "max_connections": 10
      },
      "retry": {
        "max_attempts": 3,
        "base_delay_ms": 100,
        "max_delay_ms": 2000,
        "jitter_percent": 20
      },
      "circuit_breaker": {
        "failure_threshold": 5,
        "open_timeout_ms": 30000
      }
    }
  },
  "payload_format": "json"
}
```

MySQL example:

```json
{
  "name": "ops-db",
  "version": "v1",
  "driver": {
    "kind": "db",
    "db": {
      "kind": "mysql",
      "connection": "mysql://app:secret@localhost:3306/ops",
      "query": "SELECT * FROM defect_events",
      "pool": {
        "min_connections": 1,
        "max_connections": 10
      },
      "retry": {
        "max_attempts": 3,
        "base_delay_ms": 100,
        "max_delay_ms": 2000,
        "jitter_percent": 20
      },
      "circuit_breaker": {
        "failure_threshold": 5,
        "open_timeout_ms": 30000
      }
    }
  },
  "payload_format": "json"
}
```

- `postgres_tls_mode` is optional and only valid for `kind: "postgres"`.
- `postgres_tls_mode` supports: `disable` (default) and `require`.
- `pool` is optional and supported for `postgres` and `mysql`.
- Pool defaults (when omitted): `min_connections=1`, `max_connections=10`.
- DB retry policy is optional for all DB kinds and supports:
  - `max_attempts` (default `3`)
  - `base_delay_ms` (default `100`)
  - `max_delay_ms` (default `2000`)
  - `jitter_percent` (default `20`, range `0..=100`)
- Circuit breaker policy is optional for all DB kinds and supports:
  - `failure_threshold` (default `5`)
  - `open_timeout_ms` (default `30000`)
  - state transitions: `closed -> open -> half_open -> closed`
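The retry defaults above translate into a concrete delay schedule. The sketch below is illustrative Python (the helper name is an assumption, and the real drivers implement this in Rust): exponential backoff from `base_delay_ms`, capped at `max_delay_ms`, with symmetric jitter of `jitter_percent`.

```python
import random

def backoff_delays(max_attempts=3, base_delay_ms=100, max_delay_ms=2000,
                   jitter_percent=20):
    """Compute the delays (ms) between attempts for exponential backoff
    with jitter, using the documented defaults."""
    delays = []
    for attempt in range(max_attempts - 1):  # no delay after the final attempt
        delay = min(base_delay_ms * (2 ** attempt), max_delay_ms)  # 100, 200, 400, ...
        jitter = delay * jitter_percent / 100
        delays.append(delay + random.uniform(-jitter, jitter))
    return delays
```

With the defaults this yields roughly 100ms then 200ms (each +/-20%); the jitter spreads retries out so many clients recovering from the same outage do not hammer the backend in lockstep.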
- `IntegrationRecord` retains the raw payload plus metadata and pipeline annotations.
- `metadata` is standardized across drivers:
  - `content_type`
  - `filename`
  - optional `source_details` (`source_type`, optional `locator`)
- `IntegrationRecord.warnings` and `DeadLetter.errors` are structured messages:
  - `code`: machine-readable error/warning code
  - `path`: optional JSON path/pointer context
  - `message`: human-readable detail
- `DeadLetter` retains the raw payload plus structured validation errors.
- `DeadLetter.reason_codes` stores the unique list of machine-readable reason codes.
- `DeadLetter.lineage` stores rejection lineage metadata (`pipeline_stage`, `driver_kind`, `record_id_policy`, source context).
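Putting those fields together, a rejected payload might be serialized along these lines. This is an illustrative shape assembled from the fields described above, not captured output; the exact field layout may differ.

```json
{
  "reason_codes": ["missing_required_path"],
  "errors": [
    {
      "code": "missing_required_path",
      "path": "/defect_id",
      "message": "required path /defect_id not found"
    }
  ],
  "lineage": {
    "pipeline_stage": "validation",
    "driver_kind": "jsonl",
    "record_id_policy": "hash_fallback"
  },
  "payload": { "lot_id": "L-7" }
}
```

Because the raw payload travels with the structured errors, a DLQ entry carries everything needed for replay once the interface definition is fixed.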
- The merge layer outputs the same `IntegrationRecord` JSONL format.
- When dedupe is enabled, it removes duplicates by `(source, interface.name, interface.version, record_id)`.
- Operational strategy reference: docs/runbooks/idempotency_dedupe_strategy.md.
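The dedupe rule can be sketched as a first-wins pass over the merged stream. This is an illustrative Python sketch; the record dict keys mirror the dedupe tuple above but are assumptions about the serialized layout.

```python
def dedupe(records):
    """Drop duplicates by (source, interface.name, interface.version, record_id),
    keeping the first occurrence."""
    seen, out = set(), []
    for r in records:
        key = (r["source"], r["interface"]["name"],
               r["interface"]["version"], r["record_id"])
        if key not in seen:
            seen.add(key)
            out.append(r)
    return out
```

Scoping the key to the interface name and version means two different sources can legitimately emit the same `record_id` without colliding.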
- Expand executable product flow coverage across ontology/linkage/kernel runtime paths.
- Add shell-level end-to-end integration tests for CLI execution chains.
The repository is validated locally with:

```bash
cargo fmt --check
cargo check
cargo test
cargo build
```

CI preflight also enforces Rust quality gates on pull requests via `.github/workflows/preflight.yml`.