onestep is a small async task runtime centered on four concepts:
OneStepApp: task registry and lifecycle managerSource: fetch data from a queue or polling backendSink: publish processed dataDelivery: a single fetched item withack/retry/fail
The V1 stable surface includes:
MemoryQueueonestep-mysql:MySQLConnector.table_queue(...), binlog and incremental sources, table sinks, state stores, and cursor storesonestep-mq:RabbitMQConnector.queue(...)onestep-redis:RedisConnector.stream(...)onestep-sqs:SQSConnector.queue(...)IntervalSource.every(...)CronSource(...)WebhookSource(...)
Core package:
pip install onestepCommon extras:
pip install 'onestep[yaml]'pip install onestep-mysqlpip install onestep-mqpip install onestep-redispip install onestep-sqspip install 'onestep[control-plane]'pip install 'onestep[all]'
Connector plugins:
- Feishu Bitable: install
onestep-feishu-bitable - MySQL: install
onestep-mysql - RabbitMQ: install
onestep-mq - Redis: install
onestep-redis - SQS: install
onestep-sqs
From a source checkout:
pip install -e .pip install -e '.[dev]'pip install -e '.[integration]'
1.0.0 is a runtime rewrite. Projects built on the legacy step and
*Broker APIs should treat the upgrade as a migration, not a drop-in package
bump.
See MIGRATION-0.5-to-1.0.0.md for:
- old-to-new API mapping
- unsupported legacy features
- a minimal before/after example
- rollout guidance for existing projects
The deployment entrypoint is the onestep CLI.
Recommended module shape:
from onestep import IntervalSource, OneStepApp
app = OneStepApp("billing-sync")
@app.task(source=IntervalSource.every(hours=1, immediate=True, overlap="skip"))
async def sync_billing(ctx, _):
print("syncing billing data")Run the app:
onestep run your_package.tasks:appThe short form is also supported:
onestep your_package.tasks:appCheck the target before starting it:
onestep check your_package.tasks:appYou can also point the CLI at a zero-argument factory:
onestep check your_package.tasks:build_appUse JSON output when you want the check result in CI or deployment scripts:
onestep check --json your_package.tasks:appYou can also load a YAML app definition. Start with the smallest working shape
and add fields only when needed. resources is the preferred top-level section
for named runtime objects, while handler.ref and hooks.*.ref point to
Python callables:
app:
name: billing-sync
logging:
level: DEBUG
resources:
tick:
type: interval
minutes: 5
immediate: true
tasks:
- name: sync_billing
source: tick
handler:
ref: your_package.handlers.billing:sync_billingCheck or run the YAML target the same way:
onestep check worker.yaml
onestep run worker.yamlScaffold a minimal standalone YAML project when starting from scratch:
onestep init billing-syncThat generates:
pyproject.tomlworker.yamlsrc/<package>/tasks/src/<package>/transforms/
The generated project stays intentionally small. worker.yaml only defines
runtime wiring; business logic still lives in Python. Add hooks.py later only
when you actually wire hooks in YAML.
Use strict checking when you want schema validation for YAML targets, including
unknown-field detection and contract checks for reporter, hooks, tasks,
and resource specs:
onestep check --strict worker.yamlapp.logging.level is optional and only controls the onestep logger
namespace. It does not configure handlers or formatters, but setting it to
DEBUG makes framework-level sink success logs visible.
YAML resources can reference other resources by name, for example
rabbitmq_queue.connector: rmq or mysql_incremental.state: cursor_store.
YAML emit entries can also route handler results with a Python predicate.
The YAML stays declarative: when names a callable, while the condition logic
stays in Python.
emit:
- audit_sink
- when:
ref: your_package.routing:is_active_user
params:
status_field: status
then: active_user_sink
otherwise: inactive_user_sinkThe predicate may accept ctx, payload, and result; then and otherwise
can each be a sink name or list of sink names. If otherwise is omitted and the
predicate is falsy, that route emits to no sink.
Task handlers and hooks can read:
ctx.configfor app-level configctx.task_configfor task-level configctx.resourcesfor named runtime objects
App hooks can read app.resources for the same named resource registry.
For a progressive YAML guide from minimal task to fully wired app, including
strict-check guidance, see
docs/yaml-task-definition.md.
For a standalone project-style example with worker.yaml plus
tasks.py / transforms.py / hooks.py, see example/yaml_project/.
Built-in YAML resource types:
memoryintervalcronwebhookhttp_sink
Plugin YAML resource types:
onestep-mysql:mysql,mysql_state_store,mysql_cursor_store,mysql_table_queue,mysql_incremental,mysql_binlog,mysql_table_sinkonestep-mq:rabbitmq,rabbitmq_queueonestep-redis:redis,redis_streamonestep-sqs:sqs,sqs_queueonestep-feishu-bitable:feishu_bitable,feishu_bitable_incremental,feishu_bitable_table_sink
Install the corresponding plugin package in the worker environment before using plugin resource types in YAML.
http_sink sends task results as JSON by default. Configure body only when
the outbound payload should be reshaped. url, headers, params, and
configured body values can reference body, payload, meta, and
attempts with {{ ... }} variables:
resources:
notify:
type: http_sink
url: "https://api.example.com/orders/{{ body.order_id }}"
method: POST
headers:
X-Trace-Id: "{{ meta.trace_id }}"
params:
attempt: "{{ attempts }}"
body:
id: "{{ body.order_id }}"
status: "{{ body.status }}"YAML apps can also bind app-level state explicitly:
app:
name: billing-sync
state: app_state
connectors:
db:
type: mysql
dsn: mysql+pymysql://root:root@localhost:3306/app
app_state:
type: mysql_state_store
connector: db
table: onestep_stateThe named state resource must support load/save/delete; mysql_state_store
and mysql_cursor_store both work when onestep-mysql is installed.
Legacy connectors, sources, and sinks sections are still supported and
merged into the same internal resource registry. For the full YAML schema and a
larger task-definition example, see docs/yaml-task-definition.md.
Runnable examples live in:
example/cli_app.pyexample/cli_app.yaml
Run it locally with:
PYTHONPATH=src onestep check example.cli_app:app
onestep check example/cli_app.yaml
SYNC_INTERVAL_SECONDS=5 PYTHONPATH=src onestep run example.cli_app:appA complete deployment template lives in:
deploy/README.mddeploy/systemd/onestep-app.servicedeploy/env/onestep-app.env.exampledeploy/bin/onestep-preflight.sh
The example uses:
/etc/onestep/onestep-app.envfor deployment variablesExecStartPreto run a startup checkExecStartto launchonestep run
Install it with:
sudo mkdir -p /etc/onestep
sudo cp deploy/env/onestep-app.env.example /etc/onestep/onestep-app.env
sudo cp deploy/systemd/onestep-app.service /etc/systemd/system/onestep-app.service
sudo systemctl daemon-reload
sudo systemctl enable --now onestep-appCheck status and logs:
sudo systemctl status onestep-app
sudo journalctl -u onestep-app -fSee deploy/README.md for the expected directory layout and the env vars you need to adjust first.
The deploy template prepends APP_CWD to PYTHONPATH so module targets defined inside the repo can be imported by the onestep console script.
onestep also ships an official worker runtime image for YAML-oriented workers.
Mounted workspace usage:
docker run --rm \
-e ONESTEP_TARGET=/workspace/worker.yaml \
-v "$PWD:/workspace" \
ghcr.io/mic1on/onestep-worker:1.2.7Derived image usage:
FROM ghcr.io/mic1on/onestep-worker:1.2.7
WORKDIR /workspace
COPY . /workspace
ENV ONESTEP_TARGET=/workspace/worker.yamlONESTEP_TARGET points to the YAML file path or Python import target the container should start.
The runtime automatically adds /workspace and /workspace/src to PYTHONPATH.
If /workspace/requirements.txt exists it is installed first; otherwise the runtime falls back to installing /workspace when /workspace/pyproject.toml exists.
See deploy/worker-runtime-image.md for the full usage guide and troubleshooting notes.
onestep can push runtime telemetry to onestep-control-plane over a single long-lived
WebSocket session without adding a new connector or changing task code.
Install the optional control-plane dependency before using the reporter:
pip install 'onestep[control-plane]'The YAML form is intentionally minimal:
app:
name: billing-sync
reporter: trueThat enables ControlPlaneReporter using env-backed defaults. When you need explicit
overrides, keep the field names aligned with ControlPlaneReporterConfig:
reporter:
base_url: https://control-plane.example.com
token: ${ONESTEP_CONTROL_PLANE_TOKEN}
service_name: billing-sync-workerAttach the reporter explicitly:
from onestep import ControlPlaneReporter, ControlPlaneReporterConfig, IntervalSource, OneStepApp
app = OneStepApp("billing-sync")
reporter = ControlPlaneReporter(ControlPlaneReporterConfig.from_env(app_name=app.name))
reporter.attach(app)
@app.task(source=IntervalSource.every(hours=1, immediate=True, overlap="skip"))
async def sync_users(ctx, _):
print("syncing users")Required environment variables:
ONESTEP_CONTROL_PLANE_URLorONESTEP_CONTROL_URLONESTEP_CONTROL_PLANE_TOKENorONESTEP_CONTROL_TOKEN
Common optional environment variables:
ONESTEP_ENVONESTEP_SERVICE_NAMEONESTEP_NODE_NAMEONESTEP_DEPLOYMENT_VERSIONONESTEP_INSTANCE_IDONESTEP_REPLICA_KEYONESTEP_STATE_DIRONESTEP_CONTROL_PLANE_HEARTBEAT_INTERVAL_SONESTEP_CONTROL_PLANE_METRICS_INTERVAL_SONESTEP_CONTROL_PLANE_EVENT_FLUSH_INTERVAL_SONESTEP_CONTROL_PLANE_EVENT_BATCH_SIZEONESTEP_CONTROL_PLANE_MAX_PENDING_EVENTSONESTEP_CONTROL_PLANE_MAX_PENDING_METRIC_BATCHESONESTEP_CONTROL_PLANE_TIMEOUT_SONESTEP_CONTROL_PLANE_RECONNECT_BASE_DELAY_SONESTEP_CONTROL_PLANE_RECONNECT_MAX_DELAY_S
The reporter now uses:
GET /api/v1/agents/wshello/hello_acktelemetry(sync|heartbeat|metrics|events)command/command_ack/command_result
Behavior:
- startup reuses a stable
instance_idfrom local state unlessONESTEP_INSTANCE_IDorONESTEP_REPLICA_KEYoverrides it - startup opens a fresh WS session and negotiates capabilities
- startup sends a heartbeat and a topology sync built from the current app tasks
syncandheartbeatsequences are tracked independently and persist across restarts- sync is resent on later heartbeat cycles until the current topology hash converges
- task execution events are aggregated into task window metrics
- important runtime events (
retried,failed,dead_lettered,cancelled) are batched and pushed - remote commands can trigger
ping,shutdown,restart,drain,pause_task,resume_task,sync_now,flush_metrics, andflush_events - transport send failures reset the current session and reconnect with exponential backoff plus jitter
- low-priority
metricsandeventsbuffers are bounded locally; if the control plane stays down, the oldest buffered telemetry is dropped first - reporter failures are logged but do not stop task execution
Identity resolution order:
ONESTEP_INSTANCE_ID: hard override for tests or explicit pinning.ONESTEP_REPLICA_KEY: derives a deterministic UUIDv5 fromservice_name + environment + replica_key.- local identity state: reuses the
instance_idstored in the reporter state dir.
By default ControlPlaneReporterConfig.from_env() uses a local state dir under:
~/.onestep/control-plane-state/<environment>/<service_name>~/.onestep/control-plane-state/<environment>/<service_name>/<replica_key>whenONESTEP_REPLICA_KEYis set
Multi-replica rules:
- single worker: the default state dir is enough
- multiple workers on one host: give each worker a unique
ONESTEP_REPLICA_KEYorONESTEP_STATE_DIR - StatefulSet-style deployments: use the stable ordinal as
ONESTEP_REPLICA_KEY - disposable deployment pod names are not a stable replica key unless you inject one yourself
For an operations-focused guide with deployment patterns and troubleshooting, see
docs/stable-instance-identity.md.
Quick local demo:
-
Start
onestep-control-plane:If the control plane repo is checked out next to this one:
cd ../onestep-control-plane
./scripts/start-local.sh- Start a long-running OneStep reporter demo:
./scripts/run-control-plane-demo.sh- Or run the end-to-end smoke script, which boots the local control plane, starts the
demo agent, dispatches a
ping, and waits for a successfulcommand_result:
./scripts/run-control-plane-smoke.sh- Inspect the control plane. The demo cycles through
ok,retry_once,fail, andslowjobs so you can see successful runs, retries, terminal failures, timeouts, and dead-letter events without changing any code:
http://127.0.0.1:8080/services?environment=dev
http://127.0.0.1:8080/services/control-plane-demo?environment=dev
http://127.0.0.1:8080/services/control-plane-demo?environment=dev&tab=commands
The runtime now has a few explicit control points:
OneStepApp(..., shutdown_timeout_s=30.0)controls how long the app waits for inflight tasks before cancelling them during shutdown@app.task(..., timeout_s=30.0)applies an execution timeout to async handlers@app.task(..., dead_letter=...)routes terminal failures into a dead-letter sink@app.on_eventreceives task execution eventsInMemoryMetrics()is a built-in metrics hook for event counting@app.on_startupand@app.on_shutdownregister lifecycle hooksctx.configexposes app configctx.stateexposes per-task namespaced state backed by the app state store
Failures are classified as:
errortimeoutcancelled
Custom retry policies receive a FailureInfo object so they can decide differently for timeouts vs business exceptions.
Execution events are emitted for:
fetchedstartedsucceededretriedfaileddead_letteredcancelled
from onestep import InMemoryMetrics, InMemoryStateStore, MemoryQueue, OneStepApp, StructuredEventLogger
source = MemoryQueue("incoming")
dead = MemoryQueue("dead-letter")
metrics = InMemoryMetrics()
app = OneStepApp(
"runtime-demo",
config={"prefix": "demo"},
state=InMemoryStateStore(),
shutdown_timeout_s=10.0,
)
app.on_event(metrics)
app.on_event(StructuredEventLogger())
@app.on_startup
async def bootstrap(app):
await source.publish({"value": 1})
@app.task(source=source, timeout_s=5.0, dead_letter=dead)
async def consume(ctx, item):
runs = await ctx.state.get("runs", 0)
await ctx.state.set("runs", runs + 1)
print(ctx.config["prefix"], item)
ctx.app.request_shutdown()If a task ends in a terminal failure and dead_letter is configured, the dead-letter sink receives:
body["payload"]: the original payloadbody["failure"]:{kind, exception_type, message, traceback?}meta["original_meta"]: the original envelope metadata
You can inspect metrics in-process:
snapshot = metrics.snapshot()
print(snapshot["kinds"])StructuredEventLogger() bridges TaskEvent into standard Python logging with consistent fields such as:
event_kindapp_nametask_namesource_nameattemptsduration_sfailure_kind
Tasks can also opt into webhook-friendly success metadata by returning a reserved notification object. The task return value sent to sinks stays unchanged; only the emitted succeeded event gains meta["notification"].
from onestep import MemoryQueue, OneStepApp, StructuredEventLogger
source = MemoryQueue("incoming")
app = OneStepApp("notification-demo")
app.on_event(StructuredEventLogger())
@app.task(source=source)
async def sync_status(ctx, item):
updated = item["updated"]
ctx.app.request_shutdown()
return {
"success": True,
"updated": updated,
"notification": {
"summary": f"Status sync complete, updated {updated} devices",
"metrics": [
{"label": "Updated", "value": updated},
],
},
}The emitted TaskEventKind.SUCCEEDED metadata then includes:
{
"notification": {
"summary": "Status sync complete, updated 12 devices",
"metrics": [
{"label": "Updated", "value": 12}
]
}
}notification values must be JSON-safe. Invalid values are dropped from the emitted metadata instead of failing task execution.
from onestep import MemoryQueue, OneStepApp
app = OneStepApp("demo")
source = MemoryQueue("incoming")
sink = MemoryQueue("processed")
@app.task(source=source, emit=sink, concurrency=4)
async def double(ctx, item):
ctx.app.request_shutdown()
return {"value": item["value"] * 2}
async def main():
await source.publish({"value": 21})
await app.serve()Use a local scheduler when you need to run a task every fixed duration.
from onestep import IntervalSource, OneStepApp
app = OneStepApp("interval-demo")
@app.task(
source=IntervalSource.every(
hours=1,
immediate=True,
overlap="skip",
payload={"job": "refresh-cache"},
)
)
async def refresh_cache(ctx, item):
print("scheduled at:", ctx.current.meta["scheduled_at"], "payload:", item)overlap controls what happens when the previous run is still inflight:
allow: start another run immediatelyskip: drop missed ticks while the previous run is still runningqueue: serialize missed ticks and run them one by one afterwards
queue mode keeps at most max_queued_runs missed ticks, defaulting to 1000,
and drops the oldest queued ticks first when the backlog exceeds that bound.
Use CronSource when you care about wall-clock time rather than elapsed duration.
from onestep import CronSource, OneStepApp
app = OneStepApp("hourly-sync")
@app.task(source=CronSource("0 * * * *", timezone="Asia/Shanghai", overlap="skip"))
async def sync_hourly(ctx, _):
print("running at:", ctx.current.meta["scheduled_at"])The built-in parser supports standard 5-field cron expressions and these aliases:
@hourly@daily@weekly@monthly@yearly
Use WebhookSource when external systems push events into your app.
from onestep import BearerAuth, MemoryQueue, OneStepApp, WebhookSource
app = OneStepApp("webhook-demo")
jobs = MemoryQueue("jobs")
@app.task(
source=WebhookSource(
path="/webhooks/github",
methods=("POST",),
host="127.0.0.1",
port=8080,
auth=BearerAuth("replace-me"),
),
emit=jobs,
)
async def ingest_github(ctx, event):
return {
"event": event["headers"].get("x-github-event"),
"payload": event["body"],
}The stdlib implementation supports:
- shared
host:portlisteners across multiple webhook routes - optional
BearerAuth(...) json,form,text,raw, andautobody parsing- fixed
WebhookResponse(...)responses - exact path matching and method filtering
The payload delivered to your task contains:
bodyheadersquerymethodpathclientreceived_at
Use a table as a task queue by claiming rows and marking them as finished.
from onestep import MemoryQueue, OneStepApp
from onestep_mysql import MySQLConnector
app = OneStepApp("orders")
db = MySQLConnector("mysql+pymysql://root:root@localhost:3306/app")
source = db.table_queue(
table="orders",
key="id",
where="status = 0",
claim={"status": 9},
ack={"status": 1},
nack={"status": 0},
batch_size=100,
)
sink = db.table_sink(table="processed_orders", mode="upsert", keys=("id",))
@app.task(source=source, emit=sink, concurrency=16)
async def process_order(ctx, row):
return {"id": row["id"], "payload": row["payload"], "status": "done"}When you need to write computed fields back to the claimed row itself, call
await ctx.update_current_row({...}) inside the task. This is currently
supported for MySQLConnector.table_queue(...) deliveries.
Use (updated_at, id) as a lightweight cursor for Logstash-style sync.
from onestep import MemoryQueue, OneStepApp
from onestep_mysql import MySQLConnector
app = OneStepApp("sync-users")
db = MySQLConnector("mysql+pymysql://root:root@localhost:3306/app")
state = db.cursor_store(table="onestep_cursor")
source = db.incremental(
table="users",
key="id",
cursor=("updated_at", "id"),
where="deleted = 0",
batch_size=1000,
state=state,
)
out = MemoryQueue("dw")
@app.task(source=source, emit=out, concurrency=1)
async def sync_user(ctx, row):
return {"id": row["id"], "name": row["name"], "updated_at": row["updated_at"]}For production deployments, prefer db.cursor_store(...) or db.state_store(...) over the in-memory stores so cursors and task state survive process restarts.
Install onestep-feishu-bitable to use Feishu Bitable as an incremental
source or upsert sink. In this repository, the plugin source lives under
plugins/onestep-feishu-bitable.
from onestep import OneStepApp
from onestep_feishu_bitable import (
FeishuBitableConnector,
feishu_bitable_text,
feishu_bitable_user,
)
app = OneStepApp("feishu-sync")
feishu = FeishuBitableConnector(app_id="cli_xxx", app_secret="secret")
source = feishu.incremental(
app_token="bascnxxx",
table_id="tbl_source",
cursor_field="最后更新时间",
user_id_type="user_id",
batch_size=100,
fallback_scan_page_limit=100,
)
sink = feishu.table_sink(
app_token="bascnyyy",
table_id="tbl_target",
mode="upsert",
match_fields=["编号"],
user_id_type="user_id",
)
@app.task(source=source, emit=sink, concurrency=20)
async def sync_order(ctx, payload):
fields = payload["fields"]
return {
"编号": feishu_bitable_text(fields.get("编号")),
"标题": feishu_bitable_text(fields.get("标题")),
"负责人": feishu_bitable_user(fields.get("负责人ID")),
}For Feishu person fields, pass the matching user_id_type (open_id,
union_id, or user_id) and write values as [{"id": "..."}]. The
feishu_bitable_text(...) and feishu_bitable_user(...) helpers live in the
onestep_feishu_bitable plugin package.
When Feishu cannot serve the incremental search with cursor sorting, the source
falls back to scanning pages and sorting locally. fallback_scan_page_limit
bounds that fallback to avoid high memory use on large tables.
from onestep import OneStepApp
from onestep_rabbitmq import RabbitMQConnector
app = OneStepApp("rabbitmq-demo")
rmq = RabbitMQConnector("amqp://guest:guest@localhost/")
source = rmq.queue(
"incoming_jobs",
exchange="jobs.events",
routing_key="jobs.created",
prefetch=50,
)
out = rmq.queue(
"processed_jobs",
exchange="jobs.events",
routing_key="jobs.done",
)
@app.task(source=source, emit=out, concurrency=8)
async def process_job(ctx, item):
return {"job": item["job"], "status": "done"}Install with pip install onestep-mq.
Use Redis Streams for lightweight, reliable message queuing with consumer groups.
from onestep import OneStepApp
from onestep_redis import RedisConnector
app = OneStepApp("redis-demo")
redis = RedisConnector("redis://localhost:6379")
source = redis.stream(
"jobs",
group="workers",
batch_size=100,
poll_interval_s=0.5,
)
out = redis.stream("processed")
@app.task(source=source, emit=out, concurrency=8)
async def process_job(ctx, item):
return {"job": item["job"], "status": "done"}Key features:
- Consumer groups: Multiple consumers share message processing
- Message acknowledgment:
XACKfor reliable processing - Pending messages: Unacked messages stay in PEL for retry via
XCLAIM - Stream trimming:
maxlenoption to limit stream size
Install with pip install onestep-redis.
from onestep import OneStepApp
from onestep_sqs import SQSConnector
app = OneStepApp("sqs-demo")
sqs = SQSConnector(region_name="ap-southeast-1")
source = sqs.queue(
"https://sqs.ap-southeast-1.amazonaws.com/123456789/jobs.fifo",
message_group_id="workers",
delete_batch_size=10,
delete_flush_interval_s=0.5,
heartbeat_interval_s=15,
heartbeat_visibility_timeout=60,
)
out = sqs.queue(
"https://sqs.ap-southeast-1.amazonaws.com/123456789/processed.fifo",
message_group_id="workers",
)
@app.task(source=source, emit=out, concurrency=16)
async def process_job(ctx, item):
return {"job": item["job"], "status": "done"}Install with pip install onestep-sqs.
Supported examples are indexed in:
example/README.md
Common entrypoints:
example/cli_app.pyexample/runtime_showcase.pyexample/mysql_incremental.pyexample/redis_stream.pyexample/webhook_source.py
Optional live tests are under tests/integration/ and plugin-specific
plugins/*/tests/integration/ directories.
The local stack now includes Redis, RabbitMQ, LocalStack SQS, and MySQL.
Install the live-test dependencies:
pip install '.[integration]'Start the local integration stack:
make integration-upLoad the generated environment into your current shell:
eval "$(./scripts/setup-integration-env.sh)"Run all live tests in one command:
make integration-testYou can also run one test file manually after loading the environment:
PYTHONPATH=src python3 -m pytest tests/integration/test_redis_live.py -qPYTHONPATH=src python3 -m pytest tests/integration/test_rabbitmq_live.py -quv run --all-packages python -m pytest plugins/onestep-sqs/tests/integration/test_sqs_live.py -quv run --all-packages python -m pytest plugins/onestep-mysql/tests/integration/test_mysql_live.py -q
Set KEEP_INTEGRATION_SERVICES=1 to keep containers running after make integration-test.
The test suite is now intentionally split by responsibility:
tests/contract/: runtime contract tests that lock task execution semanticstests/integration/: live infrastructure tests for built-in Redis and RabbitMQ connectorsplugins/*/tests/: plugin-specific unit, contract, and live integration teststests/test_*.py: connector-focused unit tests
For a quick end-to-end demo with webhook ingestion, queueing, dead-letter handling, metrics, and structured logs:
PYTHONPATH=src python3 example/runtime_showcase.pyThen send:
curl -X POST http://127.0.0.1:8090/demo/webhook \
-H 'Content-Type: application/json' \
-d '{"action":"ok","value":21}'
curl -X POST http://127.0.0.1:8090/demo/webhook \
-H 'Content-Type: application/json' \
-d '{"action":"fail","value":21}'
curl -X POST http://127.0.0.1:8090/demo/webhook \
-H 'Content-Type: application/json' \
-d '{"action":"slow","value":21}'You will see:
- structured task event logs
- successful processing for
action=ok - dead-letter output for
action=fail - timeout + dead-letter output for
action=slow