From a2e901bdc5763b5aa1c32fefa7fa723f750d4923 Mon Sep 17 00:00:00 2001 From: Victor Miranda Date: Thu, 21 May 2026 18:09:14 -0300 Subject: [PATCH 1/3] feat(coord): step-based workflow primitive over zbus Adds zephlet_coord.{h,c} skeleton, Kconfig (CONFIG_ZEPHLETS_COORD), and CMake wiring. Step + await + timeout chain, ported from loc8-catfish. Tests and docs follow. Signed-off-by: Victor Miranda --- CMakeLists.txt | 2 + Kconfig | 36 ++++++ zephlet_coord.c | 182 ++++++++++++++++++++++++++++++ zephlet_coord.h | 292 ++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 512 insertions(+) create mode 100644 zephlet_coord.c create mode 100644 zephlet_coord.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 8e3c2b7..d91c93d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -39,6 +39,8 @@ set_property( if(CONFIG_ZEPHLETS) zephyr_library() zephyr_library_sources("${SHARED_ZEPHLET_DIR}/zephlet.c") + zephyr_library_sources_ifdef(CONFIG_ZEPHLETS_COORD + "${SHARED_ZEPHLET_DIR}/zephlet_coord.c") zephyr_include_directories(.) zephyr_linker_sources(DATA_SECTIONS zephlet_iterables.ld) diff --git a/Kconfig b/Kconfig index aed2d01..d97c8b7 100644 --- a/Kconfig +++ b/Kconfig @@ -13,6 +13,42 @@ config ZEPHLET_MAX_INSTANCES `STRUCT_SECTION_FOREACH(zephlet, ...)` at runtime. Used to size the SYS_INIT init-priority ordering buffer. +config ZEPHLETS_COORD + bool "Coordinator framework" + help + Step-based workflow primitive for orchestrating multi-zephlet + flows on a dedicated workqueue, with optional zbus-event await + and bounded timeout. See zephlet_coord.h for the public surface. + +if ZEPHLETS_COORD + +config ZEPHLETS_COORD_STACK_SIZE + int "Stack size for the shared coordinator workqueue" + default 2048 + help + Stack size in bytes for the zephlet_coord_workq thread shared by + all coordinators in the application. + +config ZEPHLETS_COORD_PRIORITY + int "Priority for the shared coordinator workqueue" + default 10 + help + Thread priority for the zephlet_coord_workq thread. Coordinator + steps tend to be longer-running than typical k_work, so they get + their own thread rather than the system workqueue. + +config ZEPHLETS_COORD_ASYNC_ZBUS_TIMEOUT + int "Timeout for the async API of zephlet_coord when claiming channels" + default 25 + help + Timeout in milliseconds for the async API of zephlet_coord when + claiming channels. This is the timeout for the underlying + zbus_chan_add_obs and zbus_chan_rm_obs calls to complete, + both of which can fail if the channel is already claimed by a + third party. + +endif # ZEPHLETS_COORD + config ZEPHLETS_COAP bool "CoAP frontend for zephlets" select NETWORKING diff --git a/zephlet_coord.c b/zephlet_coord.c new file mode 100644 index 0000000..b142a6e --- /dev/null +++ b/zephlet_coord.c @@ -0,0 +1,182 @@ +#include + +#include +#include +#include +#include +#include +#include + +#include "zephlet_coord.h" + +LOG_MODULE_DECLARE(zephlet, CONFIG_ZEPHLET_LOG_LEVEL); + +struct k_work_q zephlet_coord_workq; + +static K_THREAD_STACK_DEFINE(zephlet_coord_stack, CONFIG_ZEPHLETS_COORD_STACK_SIZE); + +static const char *get_chan_name(const struct zbus_channel *chan) +{ +#if defined(CONFIG_ZBUS_CHANNEL_NAME) + return zbus_chan_name(chan); +#else + ARG_UNUSED(chan); + return ""; +#endif +} + +static int zephlet_coord_workq_init(void) +{ + const struct k_work_queue_config cfg = { + .name = "zephlet_coord_wq", + }; + + k_work_queue_init(&zephlet_coord_workq); + k_work_queue_start(&zephlet_coord_workq, zephlet_coord_stack, + K_THREAD_STACK_SIZEOF(zephlet_coord_stack), + CONFIG_ZEPHLETS_COORD_PRIORITY, &cfg); + LOG_DBG("workqueue started: prio=%d stack=%zu", CONFIG_ZEPHLETS_COORD_PRIORITY, + K_THREAD_STACK_SIZEOF(zephlet_coord_stack)); + return 0; +} + +SYS_INIT(zephlet_coord_workq_init, APPLICATION, 0); + +void zephlet_coord_dispatch(struct k_work *w) +{ + struct zephlet_coord *c = CONTAINER_OF(w, struct zephlet_coord, work); + + LOG_DBG("dispatch %p step=%p", c, c->current); + if (c->current != NULL) { + c->current(c); + } +} + +int zephlet_coord_kick(struct zephlet_coord *c) +{ + if (c->current != NULL) { + LOG_DBG("kick %p rejected: already running", c); + return -EBUSY; + } + c->current = c->entry; + k_work_submit_to_queue(&zephlet_coord_workq, &c->work); + LOG_DBG("kick %p entry=%p", c, c->entry); + return 0; +} + +void zephlet_coord_next(struct zephlet_coord *c, zephlet_coord_step_fn fn) +{ + c->current = fn; + k_work_submit_to_queue(&zephlet_coord_workq, &c->work); + LOG_DBG("next %p step=%p", c, fn); +} + +void zephlet_coord_done(struct zephlet_coord *c) +{ + c->current = NULL; + LOG_DBG("done %p", c); +} + +static const struct zbus_channel *zephlet_coord_async_claim(struct zephlet_coord_async *async) +{ + const struct zbus_channel *chan = NULL; + + K_SPINLOCK(&async->lock) { + if (async->chan != NULL) { + chan = async->chan; + async->chan = NULL; + } + } + return chan; +} + +void zephlet_coord_await(struct zephlet_coord *c, const struct zbus_channel *chan, void *dst, + zephlet_coord_match_fn match, zephlet_coord_step_fn next, + k_timeout_t timeout) +{ + struct zephlet_coord_async *async = CONTAINER_OF(c, struct zephlet_coord_async, base); + int err; + + c->current = next; + + err = zbus_chan_add_obs(chan, async->obs, K_MSEC(CONFIG_ZEPHLETS_COORD_ASYNC_ZBUS_TIMEOUT)); + if (err < 0) { + LOG_ERR("zbus_chan_add_obs failed on %s: %d", get_chan_name(chan), err); + c->current = NULL; + return; + } + + async->dst = dst; + async->match = match; + async->chan = chan; + + err = k_work_schedule_for_queue(&zephlet_coord_workq, &async->timer, timeout); + if (err < 0) { + LOG_ERR("k_work_schedule_for_queue failed: %d", err); + async->chan = NULL; + (void)zbus_chan_rm_obs(chan, async->obs, + K_MSEC(CONFIG_ZEPHLETS_COORD_ASYNC_ZBUS_TIMEOUT)); + c->current = NULL; + return; + } + + LOG_DBG("armed %p: chan=%s timeout=%lld", c, get_chan_name(chan), timeout.ticks); +} + +void zephlet_coord_cleanup_dispatch(struct k_work *w) +{ + struct zephlet_coord_async *async = + CONTAINER_OF(w, struct zephlet_coord_async, cleanup_work); + int err; + + if (async->pending_rm != NULL) { + err = zbus_chan_rm_obs(async->pending_rm, async->obs, + K_MSEC(CONFIG_ZEPHLETS_COORD_ASYNC_ZBUS_TIMEOUT)); + if (err < 0) { + LOG_ERR("zbus_chan_rm_obs (deferred) failed on %s: %d", + get_chan_name(async->pending_rm), err); + } + async->pending_rm = NULL; + } + + k_work_submit_to_queue(&zephlet_coord_workq, &async->base.work); +} + +void zephlet_coord_resolve(struct zephlet_coord *c) +{ + struct zephlet_coord_async *async = CONTAINER_OF(c, struct zephlet_coord_async, base); + const struct zbus_channel *chan = zephlet_coord_async_claim(async); + + if (chan == NULL) { + LOG_DBG("resolve %p: already claimed", c); + return; + } + + k_work_cancel_delayable(&async->timer); + async->pending_rm = chan; + k_work_submit_to_queue(&zephlet_coord_workq, &async->cleanup_work); + + LOG_DBG("resolved %p on event", c); +} + +void zephlet_coord_await_timeout(struct k_work *w) +{ + struct k_work_delayable *dwork = k_work_delayable_from_work(w); + struct zephlet_coord_async *async = CONTAINER_OF(dwork, struct zephlet_coord_async, timer); + const struct zbus_channel *chan = zephlet_coord_async_claim(async); + int err; + + if (chan == NULL) { + LOG_DBG("timeout %p: already claimed", &async->base); + return; + } + + err = zbus_chan_rm_obs(chan, async->obs, K_MSEC(CONFIG_ZEPHLETS_COORD_ASYNC_ZBUS_TIMEOUT)); + if (err < 0) { + LOG_ERR("zbus_chan_rm_obs failed on %s: %d", get_chan_name(chan), err); + } + + k_work_submit_to_queue(&zephlet_coord_workq, &async->base.work); + + LOG_DBG("resolved %p on timeout", &async->base); +} diff --git a/zephlet_coord.h b/zephlet_coord.h new file mode 100644 index 0000000..1ba9be8 --- /dev/null +++ b/zephlet_coord.h @@ -0,0 +1,292 @@ +#ifndef MODULES_ZEPHLETS_SHARED_ZEPHLET_COORD_H +#define MODULES_ZEPHLETS_SHARED_ZEPHLET_COORD_H + +#include +#include +#include + +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * @brief Zephlet Coordinator API + * @defgroup zephlet_coord_apis Zephlet Coordinator APIs + * @since 0.1.0 + * @version 0.1.0 + * @{ + */ + +/** + * @brief Shared workqueue for every coordinator in the project. + * + * Coordinators submit their k_work items here rather than to the system + * workqueue so long-running steps cannot stall driver work, debouncers, + * or timer handlers. Started at SYS_INIT(APPLICATION, 0). + */ +extern struct k_work_q zephlet_coord_workq; + +struct zephlet_coord; + +/** + * @brief Step callback shape. + * + * Every step is a plain function of this type. The dispatcher calls + * @c c->current(c); the body recovers its typed per-flow state via + * @c c->ctx and advances via @ref zephlet_coord_next, + * @ref zephlet_coord_await, or @ref zephlet_coord_done. + */ +typedef void (*zephlet_coord_step_fn)(struct zephlet_coord *c); + +/** + * @brief Event-match predicate shape. + * + * Used by @ref zephlet_coord_await to decide whether an incoming + * channel publish qualifies as the awaited event. Return @c true to + * resolve the await; @c false to leave it armed for the next publish. + * Passing @c NULL as the predicate accepts any publish. + */ +typedef bool (*zephlet_coord_match_fn)(const void *msg); + +/** + * @brief Framework-owned coordinator handle. + * + * Allocated by @ref ZEPHLET_COORD_DEFINE or + * @ref ZEPHLET_COORD_ASYNC_DEFINE; never embedded in author code. The + * @c ctx pointer references a user-defined per-flow state struct whose + * contents are bounded by one flow (trigger to @ref zephlet_coord_done). + * + * @see ZEPHLET_COORD_DEFINE + * @see ZEPHLET_COORD_ASYNC_DEFINE + */ +struct zephlet_coord { + /** Generic dispatcher work item; handler is @c zephlet_coord_dispatch. + */ + struct k_work work; + /** Next step to run; NULL means idle. */ + zephlet_coord_step_fn current; + /** First step kicked off by @ref zephlet_coord_kick. */ + zephlet_coord_step_fn entry; + /** User-defined per-flow state; opaque to the framework. */ + void *ctx; +}; + +/** + * @brief Async sidecar for coordinators that may suspend on a zbus event. + * + * Allocated by @ref ZEPHLET_COORD_ASYNC_DEFINE. The public coordinator + * pointer aliases @c base; the framework recovers the sidecar from a + * @c struct zephlet_coord* via CONTAINER_OF inside the await, + * resolve, and timeout paths. + * + * The framework emits a per-coordinator zbus listener at definition + * time; @c obs is initialised to point at it and never changes. The + * listener runs @c match against the incoming message, memcpys into + * @c dst on match, and calls @ref zephlet_coord_resolve. + * + * The race between event-resolve and timeout is protected by @c lock + * and a swap-to-NULL of @c chan: the first path through reads a + * non-NULL @c chan and clears it; the loser sees NULL and bails. + */ +struct zephlet_coord_async { + /** Embedded coordinator; public pointer aliases @c &base. */ + struct zephlet_coord base; + /** Delayable work for the await's bounded wait. */ + struct k_work_delayable timer; + /** Deferred work that runs @c zbus_chan_rm_obs off the listener call + * stack and then advances the flow. + */ + struct k_work cleanup_work; + /** Protects the chan-claim race. */ + struct k_spinlock lock; + /** Channel currently awaited; cleared on claim. */ + const struct zbus_channel *chan; + /** Channel handed to @c cleanup_work for deferred observer removal; + * set on claim, cleared once the obs has been removed. + */ + const struct zbus_channel *pending_rm; + /** Framework-generated observer; set at definition time. */ + const struct zbus_observer *obs; + /** Match predicate; NULL accepts any publish. */ + zephlet_coord_match_fn match; + /** Destination for the matched message memcpy; NULL skips the copy. */ + void *dst; +}; + +/** @cond INTERNAL_HIDDEN */ +void zephlet_coord_dispatch(struct k_work *w); +void zephlet_coord_await_timeout(struct k_work *w); +void zephlet_coord_cleanup_dispatch(struct k_work *w); +/** @endcond */ + +/** + * @brief Start a flow. + * + * Sets @c current to the registered entry step and submits the work + * item to @c zephlet_coord_workq. + * + * The trigger source decides the policy (drop / queue / reject) on + * @c -EBUSY; the framework imposes none. + * + * @warning A trigger source that writes to @c c->ctx before calling + * this function will corrupt the in-flight flow's state on @c -EBUSY. + * Peek @c c->current first, mutate @c ctx only on the success path, + * then call this function as a safety net. + * + * @param[in] c The coordinator instance reference. + * + * @retval 0 Flow started. + * @retval -EBUSY Coordinator is already running. + */ +int zephlet_coord_kick(struct zephlet_coord *c); + +/** + * @brief Queue the next step within an in-flight flow. + * + * Sets @c c->current and re-submits the work item to + * @c zephlet_coord_workq. No re-entry guard — only called from inside + * a step or an await's resolve/timeout path; the dispatcher is + * single-threaded per coordinator. + * + * @param[in] c The coordinator instance reference. + * @param[in] fn The next step to run. + */ +void zephlet_coord_next(struct zephlet_coord *c, zephlet_coord_step_fn fn); + +/** + * @brief Terminate the flow. Marks the coordinator idle. + * + * Does NOT reset @c c->ctx. Anything the author allocated, armed, or + * registered during the flow — timers, dynamic zbus observers, cached + * pointers, accumulated state — must be torn down by the author + * before this call. The framework owns @c base and the async sidecar; + * everything else is yours. + * + * @param[in] c The coordinator instance reference. + */ +void zephlet_coord_done(struct zephlet_coord *c); + +/** + * @brief Suspend the flow until a publish matches or the timeout expires. + * + * Adds the framework-generated observer to @p chan and arms a + * delayable work item for @p timeout. Returns immediately; the step + * exits. + * + * On each publish to @p chan while the await is armed, the framework + * runs @p match against the message. If it returns @c true (or if + * @p match is @c NULL), the framework memcpys @c zbus_chan_msg_size + * bytes into @p dst (if non-NULL), cancels the timer, removes the + * observer, and submits @p next. Non-qualifying events leave the + * await armed. + * + * Distinguishing event-vs-timeout in @p next is the author's job. + * Recommended idiom: pre-zero @p dst before the await and inspect a + * naturally-never-zero field on the way out. + * + * @warning Requires the coordinator to have been defined with + * @ref ZEPHLET_COORD_ASYNC_DEFINE. Calling on a sync-only coordinator + * is a programmer error. + * + * @param[in] c The coordinator instance reference. + * @param[in] chan The channel to observe. + * @param[out] dst Destination for the matched message memcpy; NULL + * to skip the copy. + * @param[in] match Match predicate; NULL accepts any publish. + * @param[in] next Step to run on resolve or timeout. + * @param[in] timeout Bounded wait, or @c K_FOREVER. + */ +void zephlet_coord_await(struct zephlet_coord *c, const struct zbus_channel *chan, void *dst, + zephlet_coord_match_fn match, zephlet_coord_step_fn next, + k_timeout_t timeout); + +/** + * @brief Finalise the await and advance to the pending step. + * + * Normally called by the framework-generated listener emitted by + * @ref ZEPHLET_COORD_ASYNC_DEFINE; surfaced for hand-rolled resolve + * paths. Idempotent against the timer-expiry path via the async + * sidecar's spinlock + swap-chan-to-NULL claim. + * + * @param[in] c The coordinator instance reference. + */ +void zephlet_coord_resolve(struct zephlet_coord *c); + +/** + * @brief Define a synchronous coordinator. + * + * Emits @p _name as a @c struct zephlet_coord *const pointing at + * file-scope storage. The per-flow state variable @p _ctx_var must be + * a static in the same translation unit. + * + * @param[in] _name The coordinator's name. + * @param[in] _ctx_var The static per-flow state variable. + * @param[in] _first_step The entry step kicked by @ref zephlet_coord_kick. + */ +#define ZEPHLET_COORD_DEFINE(_name, _ctx_var, _first_step) \ + static struct zephlet_coord _name##_storage = { \ + .work = Z_WORK_INITIALIZER(zephlet_coord_dispatch), \ + .entry = (_first_step), \ + .ctx = &(_ctx_var), \ + }; \ + static struct zephlet_coord *const _name = &_name##_storage + +/** + * @brief Define a coordinator that may suspend on a zbus event. + * + * Allocates the async sidecar and emits a framework-owned zbus + * listener (@c <_name>_zlet_obs) whose callback runs the await's + * @c match predicate, memcpys the matched message into @c dst, and + * calls @ref zephlet_coord_resolve. The author never writes the + * listener; per-await behaviour is fully specified by the arguments + * to @ref zephlet_coord_await. + * + * Emits at file scope: + * - @c <_name>_storage of type @c struct zephlet_coord_async + * - @c <_name> as @c struct zephlet_coord *const (aliasing @c &storage.base) + * - @c <_name>_zlet_obs / @c <_name>_zlet_cb (framework listener) + * + * @param[in] _name The coordinator's name. + * @param[in] _ctx_var The static per-flow state variable. + * @param[in] _first_step The entry step kicked by @ref zephlet_coord_kick. + */ +#define ZEPHLET_COORD_ASYNC_DEFINE(_name, _ctx_var, _first_step) \ + static void _name##_zlet_cb(const struct zbus_channel *chan); \ + ZBUS_LISTENER_DEFINE(_name##_zlet_obs, _name##_zlet_cb); \ + static struct zephlet_coord_async _name##_storage = { \ + .base = \ + { \ + .work = Z_WORK_INITIALIZER(zephlet_coord_dispatch), \ + .entry = (_first_step), \ + .ctx = &(_ctx_var), \ + }, \ + .timer = Z_WORK_DELAYABLE_INITIALIZER(zephlet_coord_await_timeout), \ + .cleanup_work = Z_WORK_INITIALIZER(zephlet_coord_cleanup_dispatch), \ + .obs = &_name##_zlet_obs, \ + }; \ + static struct zephlet_coord *const _name = &_name##_storage.base; \ + static void _name##_zlet_cb(const struct zbus_channel *chan) \ + { \ + const void *_msg = zbus_chan_const_msg(chan); \ + if (_name##_storage.match != NULL && !_name##_storage.match(_msg)) { \ + return; \ + } \ + if (_name##_storage.dst != NULL) { \ + memcpy(_name##_storage.dst, _msg, zbus_chan_msg_size(chan)); \ + } \ + zephlet_coord_resolve(_name); \ + } + +/** + * @} + */ + +#ifdef __cplusplus +} +#endif + +#endif /* MODULES_ZEPHLETS_SHARED_ZEPHLET_COORD_H */ From d19699e8cadd869d96b31beddb030a29ddb2caf5 Mon Sep 17 00:00:00 2001 From: Victor Miranda Date: Mon, 25 May 2026 15:54:56 -0300 Subject: [PATCH 2/3] test(coord): add ZTEST suite covering kick/await/resolve paths Nine partition-derived cases cover kick + -EBUSY, await match, timeout, predicate filter, dst memcpy, resolve idempotency. Signed-off-by: Victor Miranda --- tests/shared/coord/CMakeLists.txt | 11 + tests/shared/coord/prj.conf | 8 + tests/shared/coord/src/main.c | 328 ++++++++++++++++++++++++++++++ tests/shared/coord/testcase.yaml | 7 + 4 files changed, 354 insertions(+) create mode 100644 tests/shared/coord/CMakeLists.txt create mode 100644 tests/shared/coord/prj.conf create mode 100644 tests/shared/coord/src/main.c create mode 100644 tests/shared/coord/testcase.yaml diff --git a/tests/shared/coord/CMakeLists.txt b/tests/shared/coord/CMakeLists.txt new file mode 100644 index 0000000..85bd7fa --- /dev/null +++ b/tests/shared/coord/CMakeLists.txt @@ -0,0 +1,11 @@ +cmake_minimum_required(VERSION 3.20.0) + +get_filename_component(ZEPHLET_INFRA_ROOT "${CMAKE_CURRENT_SOURCE_DIR}/../../.." ABSOLUTE) + +list(APPEND EXTRA_ZEPHYR_MODULES "${ZEPHLET_INFRA_ROOT}") + +find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE}) + +project(zephlet_coord_test) + +target_sources(app PRIVATE src/main.c) diff --git a/tests/shared/coord/prj.conf b/tests/shared/coord/prj.conf new file mode 100644 index 0000000..95573c6 --- /dev/null +++ b/tests/shared/coord/prj.conf @@ -0,0 +1,8 @@ +CONFIG_ZTEST=y +CONFIG_ZBUS=y +CONFIG_ZBUS_RUNTIME_OBSERVERS=y +CONFIG_ZEPHLETS=y +CONFIG_ZEPHLETS_COORD=y +CONFIG_NANOPB=y +CONFIG_LOG=y +CONFIG_ASSERT=y diff --git a/tests/shared/coord/src/main.c b/tests/shared/coord/src/main.c new file mode 100644 index 0000000..2936f83 --- /dev/null +++ b/tests/shared/coord/src/main.c @@ -0,0 +1,328 @@ +#include +#include + +#include +#include +#include + +#include "zephlet_coord.h" + +/** + * @file + * @brief Tests for the zephlet_coord framework. + * + * Three test coordinators (sync / async / busy) cover the public surface + * declared in zephlet_coord.h. Semaphores synchronise the test thread + * with the coord_workq thread; no k_sleep-based polling. + */ + +struct payload { + int value; +}; + +ZBUS_CHAN_DEFINE(chan_test_events, struct payload, NULL, NULL, ZBUS_OBSERVERS_EMPTY, + ZBUS_MSG_INIT(.value = 0)); + +struct trace { + char buf[17]; + int len; +}; + +static inline void trace_push(struct trace *t, char c) +{ + if (t->len < (int)(sizeof(t->buf) - 1)) { + t->buf[t->len++] = c; + t->buf[t->len] = '\0'; + } +} + +struct test_ctx { + struct trace trace; + void *await_dst; + bool use_match_fn; + bool match_should_accept; + struct payload received; + k_timeout_t await_timeout; + int resume_count; +}; + +static struct test_ctx ctx; + +static K_SEM_DEFINE(done_sem, 0, 1); +static K_SEM_DEFINE(await_armed_sem, 0, 1); +static K_SEM_DEFINE(busy_step_started_sem, 0, 1); +static K_SEM_DEFINE(busy_blocker_sem, 0, 1); + +static void s_sync_a(struct zephlet_coord *c); +static void s_sync_b(struct zephlet_coord *c); +static void s_sync_end(struct zephlet_coord *c); + +ZEPHLET_COORD_DEFINE(coord_sync, ctx, s_sync_a); + +static void s_sync_a(struct zephlet_coord *c) +{ + struct test_ctx *st = c->ctx; + + trace_push(&st->trace, 'A'); + zephlet_coord_next(c, s_sync_b); +} + +static void s_sync_b(struct zephlet_coord *c) +{ + struct test_ctx *st = c->ctx; + + trace_push(&st->trace, 'B'); + zephlet_coord_next(c, s_sync_end); +} + +static void s_sync_end(struct zephlet_coord *c) +{ + struct test_ctx *st = c->ctx; + + trace_push(&st->trace, 'E'); + zephlet_coord_done(c); + k_sem_give(&done_sem); +} + +static bool match_predicate(const void *msg) +{ + ARG_UNUSED(msg); + return ctx.match_should_accept; +} + +static void s_async_await(struct zephlet_coord *c); +static void s_async_resume(struct zephlet_coord *c); + +ZEPHLET_COORD_ASYNC_DEFINE(coord_async, ctx, s_async_await); + +static void s_async_await(struct zephlet_coord *c) +{ + struct test_ctx *st = c->ctx; + + zephlet_coord_await(c, &chan_test_events, st->await_dst, + st->use_match_fn ? match_predicate : NULL, s_async_resume, + st->await_timeout); + k_sem_give(&await_armed_sem); +} + +static void s_async_resume(struct zephlet_coord *c) +{ + struct test_ctx *st = c->ctx; + + st->resume_count++; + zephlet_coord_done(c); + k_sem_give(&done_sem); +} + +static void s_busy_blocker(struct zephlet_coord *c); + +ZEPHLET_COORD_DEFINE(coord_busy, ctx, s_busy_blocker); + +static void s_busy_blocker(struct zephlet_coord *c) +{ + k_sem_give(&busy_step_started_sem); + k_sem_take(&busy_blocker_sem, K_FOREVER); + zephlet_coord_done(c); + k_sem_give(&done_sem); +} + +static void reset(void *fixture) +{ + ARG_UNUSED(fixture); + + memset(&ctx, 0, sizeof(ctx)); + ctx.await_timeout = K_MSEC(50); + k_sem_reset(&done_sem); + k_sem_reset(&await_armed_sem); + k_sem_reset(&busy_step_started_sem); + k_sem_reset(&busy_blocker_sem); +} + +ZTEST_SUITE(zephlet_coord, NULL, NULL, reset, NULL, NULL); + +ZTEST(zephlet_coord, test_sync_chain) +{ + int err = zephlet_coord_kick(coord_sync); + + zassert_equal(err, 0, "kick err=%d", err); + + err = k_sem_take(&done_sem, K_MSEC(500)); + zassert_equal(err, 0, "sync flow did not complete"); + + zassert_str_equal(ctx.trace.buf, "ABE", "trace mismatch"); + zassert_is_null(coord_sync->current, "coord did not return to idle"); + + err = zephlet_coord_kick(coord_sync); + zassert_equal(err, 0, "re-kick rejected: %d", err); + err = k_sem_take(&done_sem, K_MSEC(500)); + zassert_equal(err, 0, "second flow did not complete"); + zassert_equal(ctx.trace.len, 6, "second chain did not run"); +} + +ZTEST(zephlet_coord, test_kick_busy_returns_ebusy) +{ + int err = zephlet_coord_kick(coord_busy); + + zassert_equal(err, 0); + + err = k_sem_take(&busy_step_started_sem, K_MSEC(500)); + zassert_equal(err, 0, "busy step never started"); + + err = zephlet_coord_kick(coord_busy); + zassert_equal(err, -EBUSY, "second kick should -EBUSY, got %d", err); + + k_sem_give(&busy_blocker_sem); + err = k_sem_take(&done_sem, K_MSEC(500)); + zassert_equal(err, 0, "busy flow did not unblock"); +} + +ZTEST(zephlet_coord, test_await_accept_any_publish) +{ + ctx.await_dst = &ctx.received; + ctx.use_match_fn = false; + ctx.await_timeout = K_MSEC(500); + + int err = zephlet_coord_kick(coord_async); + + zassert_equal(err, 0); + zassert_equal(k_sem_take(&await_armed_sem, K_MSEC(500)), 0, "await never armed"); + + struct payload msg = {.value = 42}; + + zbus_chan_pub(&chan_test_events, &msg, K_NO_WAIT); + + zassert_equal(k_sem_take(&done_sem, K_MSEC(500)), 0, "resume did not run"); + zassert_equal(ctx.received.value, 42, "memcpy missed payload"); + zassert_equal(ctx.resume_count, 1); +} + +ZTEST(zephlet_coord, test_await_predicate_filters_non_qualifying) +{ + ctx.await_dst = &ctx.received; + ctx.use_match_fn = true; + ctx.match_should_accept = false; + ctx.await_timeout = K_MSEC(500); + + int err = zephlet_coord_kick(coord_async); + + zassert_equal(err, 0); + zassert_equal(k_sem_take(&await_armed_sem, K_MSEC(500)), 0); + + struct payload msg1 = {.value = 1}; + + zbus_chan_pub(&chan_test_events, &msg1, K_NO_WAIT); + k_msleep(10); + zassert_equal(ctx.resume_count, 0, "non-qualifying event triggered resume"); + zassert_equal(ctx.received.value, 0, "non-qualifying event was memcpy'd"); + + ctx.match_should_accept = true; + + struct payload msg2 = {.value = 99}; + + zbus_chan_pub(&chan_test_events, &msg2, K_NO_WAIT); + + zassert_equal(k_sem_take(&done_sem, K_MSEC(500)), 0, "qualifying event did not resume"); + zassert_equal(ctx.resume_count, 1); + zassert_equal(ctx.received.value, 99); +} + +ZTEST(zephlet_coord, test_await_all_non_qualifying_then_timeout) +{ + ctx.await_dst = &ctx.received; + ctx.use_match_fn = true; + ctx.match_should_accept = false; + ctx.await_timeout = K_MSEC(50); + + int err = zephlet_coord_kick(coord_async); + + zassert_equal(err, 0); + zassert_equal(k_sem_take(&await_armed_sem, K_MSEC(500)), 0); + + for (int i = 0; i < 3; i++) { + struct payload msg = {.value = i + 1}; + + zbus_chan_pub(&chan_test_events, &msg, K_NO_WAIT); + } + + zassert_equal(k_sem_take(&done_sem, K_MSEC(500)), 0, "timeout did not resume"); + zassert_equal(ctx.received.value, 0, "dst should be untouched"); + zassert_equal(ctx.resume_count, 1); +} + +ZTEST(zephlet_coord, test_await_no_publish_timeout) +{ + ctx.await_dst = &ctx.received; + ctx.use_match_fn = false; + ctx.await_timeout = K_MSEC(50); + + int err = zephlet_coord_kick(coord_async); + + zassert_equal(err, 0); + zassert_equal(k_sem_take(&await_armed_sem, K_MSEC(500)), 0); + zassert_equal(k_sem_take(&done_sem, K_MSEC(500)), 0, "timeout did not resume"); + zassert_equal(ctx.received.value, 0); + zassert_equal(ctx.resume_count, 1); +} + +ZTEST(zephlet_coord, test_await_dst_null_no_copy) +{ + ctx.await_dst = NULL; + ctx.use_match_fn = false; + ctx.await_timeout = K_MSEC(500); + + int err = zephlet_coord_kick(coord_async); + + zassert_equal(err, 0); + zassert_equal(k_sem_take(&await_armed_sem, K_MSEC(500)), 0); + + struct payload msg = {.value = 42}; + + zbus_chan_pub(&chan_test_events, &msg, K_NO_WAIT); + + zassert_equal(k_sem_take(&done_sem, K_MSEC(500)), 0); + zassert_equal(ctx.received.value, 0, "ctx.received changed despite dst=NULL"); + zassert_equal(ctx.resume_count, 1); +} + +ZTEST(zephlet_coord, test_resolve_idempotent_after_event) +{ + ctx.await_dst = &ctx.received; + ctx.use_match_fn = false; + ctx.await_timeout = K_MSEC(500); + + int err = zephlet_coord_kick(coord_async); + + zassert_equal(err, 0); + zassert_equal(k_sem_take(&await_armed_sem, K_MSEC(500)), 0); + + struct payload msg = {.value = 42}; + + zbus_chan_pub(&chan_test_events, &msg, K_NO_WAIT); + zassert_equal(k_sem_take(&done_sem, K_MSEC(500)), 0); + zassert_equal(ctx.resume_count, 1); + + zephlet_coord_resolve(coord_async); + k_msleep(50); + zassert_equal(ctx.resume_count, 1, "extra resolve after event caused double-resume"); +} + +ZTEST(zephlet_coord, test_resolve_idempotent_after_timeout) +{ + ctx.await_dst = &ctx.received; + ctx.use_match_fn = false; + ctx.await_timeout = K_MSEC(50); + + int err = zephlet_coord_kick(coord_async); + + zassert_equal(err, 0); + zassert_equal(k_sem_take(&await_armed_sem, K_MSEC(500)), 0); + zassert_equal(k_sem_take(&done_sem, K_MSEC(500)), 0); + zassert_equal(ctx.resume_count, 1); + + struct payload msg = {.value = 42}; + + zbus_chan_pub(&chan_test_events, &msg, K_NO_WAIT); + k_msleep(50); + zassert_equal(ctx.resume_count, 1, "publish after timeout triggered resume"); + zassert_equal(ctx.received.value, 0, "publish after timeout was memcpy'd"); +} diff --git a/tests/shared/coord/testcase.yaml b/tests/shared/coord/testcase.yaml new file mode 100644 index 0000000..76c51ed --- /dev/null +++ b/tests/shared/coord/testcase.yaml @@ -0,0 +1,7 @@ +tests: + zephlet.shared.coord: + tags: zephlet shared coord + harness: ztest + platform_allow: + - native_sim + timeout: 30 From 40cd1113c59a44fe08cf3dc1c59b3c789cf078ff Mon Sep 17 00:00:00 2001 From: Victor Miranda Date: Mon, 25 May 2026 15:59:55 -0300 Subject: [PATCH 3/3] docs(coord): add Coordinators section to README Sample, public surface table, and a one-line bullet under Architecture at a glance. Signed-off-by: Victor Miranda --- README.md | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/README.md b/README.md index 14b0d99..bf3726b 100644 --- a/README.md +++ b/README.md @@ -78,6 +78,7 @@ my_zephlet_start(&my_instance, &st, K_MSEC(500)); - **`events` channel** (value-typed): async fan-out. Publishers call `_emit(z, &ev, timeout)`; consumers observe with `ZEPHLET_EVENTS_LISTENER(instance, type, callback)` (wraps `ZBUS_ASYNC_LISTENER_DEFINE`). - **Non-singleton**: multiple instances per type coexist; each has its own channel pair and data. - **Weak handler overrides**: generator emits `__weak int __impl(...)` returning `-ENOSYS`; user provides strong overrides in `.c`. +- **Coordinators** (optional, `CONFIG_ZEPHLETS_COORD=y`): multi-step flows with workqueue dispatch + bounded zbus-event awaits. Sits above the per-zephlet command/events surface — see [Coordinators](#coordinators) below. ## Adapters @@ -101,6 +102,43 @@ if(CONFIG_ZEPHLET_TICK AND CONFIG_ZEPHLET_UI) endif() ``` +## Coordinators + +Optional framework for multi-step flows that span several zephlets. A coordinator is a singleton state object whose flow is expressed as a chain of step callbacks dispatched on a shared workqueue (`zephlet_coord_workq`), with optional bounded zbus-event awaits. Enable with `CONFIG_ZEPHLETS_COORD=y`. + +Reach for it when an application flow needs state across multiple events (provisioning, OTA, multi-stage tamper response). Stateless event routing stays as plain `ZEPHLET_EVENTS_LISTENER` adapters. + +```c +static struct provisioning_ctx ctx; +ZEPHLET_COORD_ASYNC_DEFINE(provisioning, ctx, s_handshake); + +static void s_handshake(struct zephlet_coord *c) +{ + struct provisioning_ctx *st = c->ctx; + (void)zlet_radio_connect(&radio_instance, &st->cred, K_SECONDS(2)); + zephlet_coord_await(c, &chan_zlet_radio_events, + &st->reply, match_connected, + s_complete, K_SECONDS(5)); +} + +/* trigger source — typically a zbus listener on a flow-local channel */ +if(!zephlet_coord_is_running(provisioning)) { + int err = zephlet_coord_kick(provisioning); + /* err == -EBUSY: author's policy (drop / queue / reject) */ +} +``` + +Public surface (see [`zephlet_coord.h`](zephlet_coord.h)): + +| Operation | Role | +|---|---| +| `ZEPHLET_COORD_DEFINE` / `_ASYNC_DEFINE` | Allocate a sync or async coordinator at file scope. | +| `zephlet_coord_kick(c)` | Start the flow; returns `-EBUSY` if already running. | +| `zephlet_coord_next(c, fn)` | Queue the next step within an in-flight flow. | +| `zephlet_coord_await(c, chan, dst, match, next, timeout)` | Suspend until a matching publish arrives or the timeout fires. The framework-generated listener handles the memcpy. | +| `zephlet_coord_resolve(c)` | Finalise an await; idempotent against the timeout path. | +| `zephlet_coord_done(c)` | Mark the flow idle. | + ## West commands | Command | Purpose |