Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ set_property(
if(CONFIG_ZEPHLETS)
zephyr_library()
zephyr_library_sources("${SHARED_ZEPHLET_DIR}/zephlet.c")
zephyr_library_sources_ifdef(CONFIG_ZEPHLETS_COORD
"${SHARED_ZEPHLET_DIR}/zephlet_coord.c")
zephyr_include_directories(.)
zephyr_linker_sources(DATA_SECTIONS zephlet_iterables.ld)

Expand Down
36 changes: 36 additions & 0 deletions Kconfig
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,42 @@ config ZEPHLET_MAX_INSTANCES
`STRUCT_SECTION_FOREACH(zephlet, ...)` at runtime. Used to size
the SYS_INIT init-priority ordering buffer.

config ZEPHLETS_COORD
bool "Coordinator framework"
help
Step-based workflow primitive for orchestrating multi-zephlet
flows on a dedicated workqueue, with optional zbus-event await
and bounded timeout. See zephlet_coord.h for the public surface.

if ZEPHLETS_COORD

config ZEPHLETS_COORD_STACK_SIZE
int "Stack size for the shared coordinator workqueue"
default 2048
help
Stack size in bytes for the zephlet_coord_workq thread shared by
all coordinators in the application.

config ZEPHLETS_COORD_PRIORITY
int "Priority for the shared coordinator workqueue"
default 10
help
Thread priority for the zephlet_coord_workq thread. Coordinator
steps tend to be longer-running than typical k_work, so they get
their own thread rather than the system workqueue.

config ZEPHLETS_COORD_ASYNC_ZBUS_TIMEOUT
int "Timeout for the async API of zephlet_coord when claiming channels"
default 25
help
Timeout in milliseconds for the async API of zephlet_coord when
claiming channels. This is the timeout for the underlying
zbus_chan_add_obs and zbus_chan_rm_obs calls to complete,
both of which can fail if the channel is already claimed by a
third party.

endif # ZEPHLETS_COORD

config ZEPHLETS_COAP
bool "CoAP frontend for zephlets"
select NETWORKING
Expand Down
38 changes: 38 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ my_zephlet_start(&my_instance, &st, K_MSEC(500));
- **`events` channel** (value-typed): async fan-out. Publishers call `<type>_emit(z, &ev, timeout)`; consumers observe with `ZEPHLET_EVENTS_LISTENER(instance, type, callback)` (wraps `ZBUS_ASYNC_LISTENER_DEFINE`).
- **Non-singleton**: multiple instances per type coexist; each has its own channel pair and data.
- **Weak handler overrides**: generator emits `__weak int <type>_<cmd>_impl(...)` returning `-ENOSYS`; user provides strong overrides in `<prefix>.c`.
- **Coordinators** (optional, `CONFIG_ZEPHLETS_COORD=y`): multi-step flows with workqueue dispatch + bounded zbus-event awaits. Sits above the per-zephlet command/events surface — see [Coordinators](#coordinators) below.

## Adapters

Expand All @@ -101,6 +102,43 @@ if(CONFIG_ZEPHLET_TICK AND CONFIG_ZEPHLET_UI)
endif()
```

## Coordinators

Optional framework for multi-step flows that span several zephlets. A coordinator is a singleton state object whose flow is expressed as a chain of step callbacks dispatched on a shared workqueue (`zephlet_coord_workq`), with optional bounded zbus-event awaits. Enable with `CONFIG_ZEPHLETS_COORD=y`.

Reach for it when an application flow needs state across multiple events (provisioning, OTA, multi-stage tamper response). Stateless event routing stays as plain `ZEPHLET_EVENTS_LISTENER` adapters.

```c
static struct provisioning_ctx ctx;
ZEPHLET_COORD_ASYNC_DEFINE(provisioning, ctx, s_handshake);

static void s_handshake(struct zephlet_coord *c)
{
struct provisioning_ctx *st = c->ctx;
(void)zlet_radio_connect(&radio_instance, &st->cred, K_SECONDS(2));
zephlet_coord_await(c, &chan_zlet_radio_events,
&st->reply, match_connected,
s_complete, K_SECONDS(5));
}

/* trigger source — typically a zbus listener on a flow-local channel */
if(!zephlet_coord_is_running(provisioning)) {
int err = zephlet_coord_kick(provisioning);
/* err == -EBUSY: author's policy (drop / queue / reject) */
}
```

Public surface (see [`zephlet_coord.h`](zephlet_coord.h)):

| Operation | Role |
|---|---|
| `ZEPHLET_COORD_DEFINE` / `_ASYNC_DEFINE` | Allocate a sync or async coordinator at file scope. |
| `zephlet_coord_kick(c)` | Start the flow; returns `-EBUSY` if already running. |
| `zephlet_coord_next(c, fn)` | Queue the next step within an in-flight flow. |
| `zephlet_coord_await(c, chan, dst, match, next, timeout)` | Suspend until a matching publish arrives or the timeout fires. The framework-generated listener handles the memcpy. |
| `zephlet_coord_resolve(c)` | Finalise an await; idempotent against the timeout path. |
| `zephlet_coord_done(c)` | Mark the flow idle. |

## West commands

| Command | Purpose |
Expand Down
11 changes: 11 additions & 0 deletions tests/shared/coord/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
cmake_minimum_required(VERSION 3.20.0)

get_filename_component(ZEPHLET_INFRA_ROOT "${CMAKE_CURRENT_SOURCE_DIR}/../../.." ABSOLUTE)

list(APPEND EXTRA_ZEPHYR_MODULES "${ZEPHLET_INFRA_ROOT}")

find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE})

project(zephlet_coord_test)

target_sources(app PRIVATE src/main.c)
8 changes: 8 additions & 0 deletions tests/shared/coord/prj.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
CONFIG_ZTEST=y
CONFIG_ZBUS=y
CONFIG_ZBUS_RUNTIME_OBSERVERS=y
CONFIG_ZEPHLETS=y
CONFIG_ZEPHLETS_COORD=y
CONFIG_NANOPB=y
CONFIG_LOG=y
CONFIG_ASSERT=y
Loading
Loading