feat(client): add threaded transport polling#125
Conversation
The client, not the app, now drives the transport; events are delivered asynchronously, per ADR 0001. - ChatClient owns Arc<Mutex<Core>> + a worker thread. - The worker select!s over the inbound and shutdown channels; Drop joins it. Outbound runs on the caller's thread. - InProcessDelivery becomes push fan-out: endpoint(bus) -> (Self, Receiver). - FFI replaces client_receive with client_push_inbound + client_poll_events. - chat-cli drains Receiver<Event>; transport inbound channels switch to crossbeam.
There was a problem hiding this comment.
Pull request overview
This PR refactors the client to asynchronously process inbound transport payloads on a dedicated worker thread, while keeping outbound operations synchronous on the caller’s thread. It updates the in-process transport to fan out messages to subscribed endpoints and adjusts the CLI + FFI surfaces to push inbound bytes and poll/drain produced events.
Changes:
- Introduce a threaded
ChatClientarchitecture: worker consumes inbound payloads and emitsEvents via a channel; outbound calls lock the core and run synchronously. - Replace the in-process “log + cursor” transport with a push-based
MessageBusthat routes published messages to subscribed endpoints. - Update CLI, examples, tests, and FFI to the new “push inbound + poll/drain events” model; switch several inbound channels to
crossbeam-channel.
Reviewed changes
Copilot reviewed 16 out of 18 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| crates/client/src/client.rs | Refactors ChatClient to own Arc<Mutex<Core>> plus a worker thread and event channel. |
| crates/client/src/delivery_in_process.rs | Replaces cursor-based bus with fan-out routing to endpoint receivers via subscriptions. |
| crates/client/src/lib.rs | Removes Cursor re-export to match the new in-process delivery design. |
| crates/client/tests/saro_and_raya.rs | Updates tests to assert against asynchronously delivered Events; adds worker shutdown + publish failure tests. |
| crates/client/examples/message-exchange/main.rs | Updates example to drain event receivers instead of pulling from a cursor and calling receive(). |
| crates/client/Cargo.toml | Adds crossbeam-channel and tracing; includes a redundant dev-dependency entry. |
| crates/client-ffi/src/api.rs | Replaces client_receive with client_push_inbound + client_poll_events; updates handle to store inbound sender + event receiver. |
| crates/client-ffi/src/delivery.rs | Minor formatting change. |
| crates/client-ffi/examples/message-exchange/src/main.c | Updates C example to push inbound and poll events with a timeout loop. |
| crates/client-ffi/Cargo.toml | Adds crossbeam-channel dependency. |
| bin/chat-cli/src/main.rs | Wires ChatClient::open/new to accept inbound receiver and passes event receiver to the app. |
| bin/chat-cli/src/app.rs | Switches incoming processing from payload decryption in the app to draining Events from the client. |
| bin/chat-cli/src/ui.rs | Updates generic bounds to require Send delivery services (threaded client). |
| bin/chat-cli/src/transport/logos_delivery.rs | Switches inbound/subscriber channels to crossbeam-channel and keeps outbound on mpsc. |
| bin/chat-cli/src/transport/file.rs | Switches inbound channel to crossbeam-channel to feed the client worker. |
| bin/chat-cli/Cargo.toml | Adds crossbeam-channel dependency; removes a stray blank line. |
| Cargo.toml | Adds workspace dependency declaration for crossbeam-channel. |
| Cargo.lock | Records new crossbeam-channel dependency and updates dependent package entries. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| fn register(&self, address: &str, sender: Sender<Message>) { | ||
| self.routes | ||
| .lock() | ||
| .unwrap() | ||
| .entry(address.to_string()) | ||
| .or_default() | ||
| .push(sender); | ||
| } |
| fn publish(&self, address: &str, data: Message) { | ||
| if let Some(senders) = self.routes.lock().unwrap().get(address) { | ||
| for tx in senders { | ||
| // A disconnected endpoint (dropped client) is harmless here. | ||
| let _ = tx.send(data.clone()); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
@osmaczko The DeliveryService trait can be updated if you'd like to return a Result from publish
| let events = { | ||
| let mut core = core.lock().unwrap(); | ||
| match core.handle_payload(&bytes) { | ||
| Ok(outcome) => events_from_inbound(outcome), | ||
| Err(e) => { | ||
| tracing::warn!("inbound handle_payload failed: {e:?}"); | ||
| Vec::new() | ||
| } |
| [dev-dependencies] | ||
| # Workspace dependencies (sorted) | ||
| crossbeam-channel = { workspace = true } | ||
|
|
jazzz
left a comment
There was a problem hiding this comment.
I was expecting to see an AsyncTask model for handling concurrency, however I think the threading model has some strengths too. Specifically it does not require awaits all over the place.
I think the Queue approach is a huge hit to the Public Developer API, which would add complexity to integrating with existing applications. Not technically a blocker, but it represents a major shift.
Moving forward I'd like to see a Developer interface emerge which is easier to use, rather than more powerful. This current step appears to be opinionated, however doesn't make ChatClient easier to use.
I suggest as this work progresses that wither ClientCore becomes signficantly more opinionated. Either Client internally defines Services such as LogosDelivery, and a GenericClient is created later). Or a LogosChatClient is created which is more straight forward and easier to use.
Happy to approve once there are responses to comments
| fn publish(&self, address: &str, data: Message) { | ||
| if let Some(senders) = self.routes.lock().unwrap().get(address) { | ||
| for tx in senders { | ||
| // A disconnected endpoint (dropped client) is harmless here. | ||
| let _ = tx.send(data.clone()); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
@osmaczko The DeliveryService trait can be updated if you'd like to return a Result from publish
| delivery: D, | ||
| inbound: Receiver<Vec<u8>>, |
There was a problem hiding this comment.
[?] I don't understand this.
The DS ownership is being passed to ChatClient, however population of the queue is occurring externally?
| inbound: Receiver<Vec<u8>>, | ||
| ) -> Result<(Self, mpsc::Receiver<Event>), ClientError> { |
There was a problem hiding this comment.
[Sand] Clients must pass in a crossbeam channel, however receive a std::sync::mpsc Receiver. Is there a reason to not use Crossbeam consistently - The dependency is already required.
| inbound: Receiver<Vec<u8>>, | ||
| ) -> Result<(Self, mpsc::Receiver<Event>), ClientError> { |
There was a problem hiding this comment.
[Sand] Returning a "Processing Queue" is not very ergonomic for Applications.
We should have an explicit reason why this design choice was made over using callbacks.
| let worker = thread::spawn({ | ||
| let core = Arc::clone(&core); | ||
| move || worker_loop(core, inbound, shutdown_rx, event_tx) | ||
| }); |
There was a problem hiding this comment.
[Sand] The threading model is not very flexible.
- LogosDelivery/FileTransport spawn a thread:
- which places messages into a queue via callback.
- Client spawns a thread to:
- listen to the queue
- processes the result
- places event into the event queue.
- App then blocks the mainthread to:
- poll the events queue
- update the UI.
Callbacks from the Client->App would allow for more flexible threading models.e.g AsyncTasks, Direct Invocation, and dedicated UI mainthread. As it stands now, any UI framework would require the blocking thread to listen for messages, and any tokio based system would have to create a blocking thread pool via spawn_blocking
| loop { | ||
| select! { | ||
| recv(inbound) -> msg => { |
There was a problem hiding this comment.
[!] DeMLS adds more requirements such as a WakeupService which can call the Conversation after N seconds, to continue processing.
| pub fn new( | ||
| name: impl Into<String>, | ||
| delivery: D, | ||
| inbound: Receiver<Vec<u8>>, |
There was a problem hiding this comment.
This input parameter changes the contract for the DS at the Client level.
[Sand] Be explicit about the requirements from a service.
Extensions/components/local_broadcaster does not use queues, and would be incompatible with ChatClient. This puts more pressure on App Developers to wire components together. Either add a trait to make an explicit requirement on Services such as DS, or find a more flexible mechanism.
e.g.
trait QueuedResult {
fn get_event_queue(...)
}
or
trait AcceptCallback {
fn register_callback(...)
}
The client, not the app, now drives the transport; events are delivered asynchronously, per ADR 0001.