feat: introduce client event system#104
Conversation
There was a problem hiding this comment.
Pull request overview
This PR introduces a client-facing event system across libchat and its consumers, shifting inbound processing from “return optional content” to “emit events” and updating delivery transports to support non-blocking polling.
Changes:
- Add core
Event/FailureReasontypes and update conversation/inbox handlers to returnVec<Event>instead ofOption<ContentData>. - Redesign
DeliveryServiceto beSend + Sync, use&selfmethods, and add non-blockingpull()for inbound batches. - Update client crate, CLI, and FFI to run a translator loop that pulls inbound payloads and forwards
Events to callers/tests.
Reviewed changes
Copilot reviewed 27 out of 29 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| flake.nix | Adds devshell dependency (perl) and adjusts forAllSystems args. |
| extensions/components/src/delivery/local_broadcaster.rs | Reworks LocalBroadcaster to Arc<Mutex<…>> + pull() API. |
| crates/client/tests/saro_and_raya.rs | Migrates client tests from raw payload receive to event assertions. |
| crates/client/src/lib.rs | Re-exports new core event types; drops ContentData re-export. |
| crates/client/src/delivery_in_process.rs | Updates in-process bus to Mutex and implements subscription + pull(). |
| crates/client/src/client.rs | Introduces translator thread + event channel; updates client API to return events. |
| crates/client/examples/message-exchange/main.rs | Updates example to consume Events instead of manual cursor/receive. |
| crates/client/Cargo.toml | Adds tracing dependency for translator logging. |
| crates/client-ffi/src/delivery.rs | Implements inbound queue + pull() for FFI delivery. |
| crates/client-ffi/src/api.rs | Replaces “receive” API with push-inbound + drain-events event list API. |
| crates/client-ffi/examples/message-exchange/src/main.c | Updates C example to push inbound bytes and drain event batches. |
| core/integration_tests_core/tests/private_integration.rs | Updates integration tests to poll delivery + assert Events. |
| core/integration_tests_core/tests/mls_integration.rs | Updates MLS tests to use Events; adds regression test for group join visibility. |
| core/conversations/src/types.rs | Removes ContentData type in favor of events. |
| core/conversations/src/service_traits.rs | Updates DeliveryService trait to Send + Sync and adds pull(). |
| core/conversations/src/lib.rs | Exposes new event module/types; removes ContentData export. |
| core/conversations/src/inbox/handler.rs | Converts inbox processing to emit ConversationStarted + follow-on events. |
| core/conversations/src/inbox_v2.rs | Emits ConversationStarted for group welcome handling. |
| core/conversations/src/event.rs | Adds core event definitions (Event, EnvelopeId, FailureReason). |
| core/conversations/src/conversation/privatev1.rs | Emits MessageReceived events instead of ContentData. |
| core/conversations/src/conversation/group_v1.rs | Emits message events; returns publish-failure events for group operations. |
| core/conversations/src/conversation.rs | Updates Convo/GroupConvo traits to return Vec<Event>. |
| core/conversations/src/context.rs | Updates context to publish internally and return events; adds delivery sharing via Arc. |
| Cargo.lock | Records new dependency (tracing). |
| bin/chat-cli/src/transport/logos_delivery.rs | Updates transport to implement pull() for inbound draining. |
| bin/chat-cli/src/transport/file.rs | Updates file transport to internalize inbound channel + implement pull(). |
| bin/chat-cli/src/main.rs | Refactors CLI startup to rely on event receiver from ChatClient. |
| bin/chat-cli/src/app.rs | Updates UI app to process Events and handle send-side events. |
| .gitignore | Updates ignored path for the compiled C FFI example binary. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| self.state.lock().unwrap().outbound_msgs.push(id); | ||
| self.shared.lock().unwrap().messages.push_back(envelope); |
| /// High-level client that owns its transport loop. | ||
| /// | ||
| /// `ChatClient` wraps a `Context` and spawns a translator thread that drains | ||
| /// the `DeliveryService` via [`DeliveryService::recv`], runs each inbound | ||
| /// payload through `Context::handle_payload`, and forwards the resulting | ||
| /// events onto the `Receiver<Event>` returned at construction. | ||
| pub struct ChatClient<D: DeliveryService> { |
| fn wrap(ctx: ChatContext<D>) -> (Self, mpsc::Receiver<Event>) { | ||
| let delivery = ctx.delivery_arc(); | ||
| let ctx = Arc::new(Mutex::new(ctx)); | ||
| let (event_tx, event_rx) = mpsc::channel(); | ||
| let translator_ctx = Arc::clone(&ctx); | ||
| thread::spawn(move || translator_loop(delivery, translator_ctx, event_tx)); | ||
| (Self { ctx }, event_rx) | ||
| } |
| loop { | ||
| let batch = delivery.pull(); | ||
| if batch.is_empty() { | ||
| // Idle: back off briefly. When traffic is flowing we skip the | ||
| // sleep — the next iteration drains again immediately. | ||
| std::thread::sleep(std::time::Duration::from_millis(50)); | ||
| continue; | ||
| } |
| // Subscribe | ||
| ds.borrow_mut() | ||
| .subscribe(&pq_inbox.delivery_address()) | ||
| // Subscribe to both inbox addresses so DS::recv yields their traffic. |
|
|
||
| /// Cloned reference-counted handle to the delivery service. Used by | ||
| /// `ChatClient` to give its translator thread its own handle so the | ||
| /// thread can call `DS::recv` without locking the context. |
| /// (exposed via `client_push_inbound`); the client's translator thread drains | ||
| /// the matching `Receiver` via `recv`. This shape lets a C/Nim app feed | ||
| /// network bytes synchronously without owning an event loop. |
| pub struct ClientHandle { | ||
| client: ChatClient<CDelivery>, | ||
| /// Inbound bytes pushed by `client_push_inbound`. The translator thread | ||
| /// inside `client` consumes the matching `Receiver` via `CDelivery::recv`. |
| /// receiver for inbound raw payloads. | ||
| pub fn start(cfg: Config) -> Result<(Self, mpsc::Receiver<Vec<u8>>), DeliveryError> { | ||
| /// Start the embedded logos-delivery node. Returns the service handle — | ||
| /// inbound payloads are drained via the `DeliveryService::recv` trait |
4b9f977 to
0c0af45
Compare
0c0af45 to
f21ffca
Compare
jazzz
left a comment
There was a problem hiding this comment.
Only a quick glance as this is a draft.
Overall I can see this PR is making some much needed improvements. Specifically addressing ownership and simplifying execution pathways.
Few notes:
- This PR seems much bigger than it needs to be. Looks like we could just focus on event passing from the core, and leave DS updates until a later PR.
- I'd like to see the platform client be the object which is async/thread aware. Even if that means that Client uses async await via Tokio.
- Instead of adding more functionality to traits consider defining another one which describes additional behavior. eg.
pub trait CanFetch: fn fetch(.....this makes managing traits easier.
| /// This interface allows Conversations to send payloads on the wire as well as | ||
| /// register interest in delivery_addresses. Client implementations are responsible | ||
| /// for providing the inbound payloads to Context::handle_payload. | ||
| pub trait DeliveryService: Debug { | ||
| /// This interface allows Conversations to send payloads on the wire, register | ||
| /// interest in delivery_addresses, and pull inbound payloads. | ||
| pub trait DeliveryService: Debug + Send + Sync { | ||
| type Error: Display + Debug; | ||
| fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), Self::Error>; | ||
| fn subscribe(&mut self, delivery_address: &str) -> Result<(), Self::Error>; | ||
| fn publish(&self, envelope: AddressedEnvelope) -> Result<(), Self::Error>; | ||
| fn subscribe(&self, delivery_address: &str) -> Result<(), Self::Error>; | ||
|
|
||
| /// Return every inbound payload that has arrived since the last call. | ||
| /// Non-blocking; returns an empty vec when nothing is available. | ||
| fn pull(&self) -> Vec<Vec<u8>>; |
There was a problem hiding this comment.
I see a few things happening here.
- I see the DS changing to use Interior mutability (Mutref -> ref)
- The addition of a Pull function.
It would take some work to convince me this is the preferred approach - specifically number 2.
I would expect that traits reflect the functionality needed at a given level in the architecture. I would expect that the layer performing the the poll would add additional requirements if needed, and expose them to its providers, rather than adding it as a blanket requirement everywhere.
"Do we really want ConversationTypes to call Poll?"
| identity: Rc<Identity>, | ||
| ds: Rc<RefCell<DS>>, | ||
| store: Rc<RefCell<CS>>, | ||
| identity: Arc<Identity>, | ||
| ds: Arc<DS>, | ||
| store: Arc<Mutex<CS>>, |
There was a problem hiding this comment.
Making Context thread safe seems like its adding complexity to our pipeline - but perhaps there is a good reason for it.
In general "Context" is getting too state aware, and I'd like to see ownership move up the the platform client level which could handle threading, io, etc.
In the ServiceContext model these would all be passed in on invocation.
So similar to this: Example however it doesn't own the ServiceContext - Platform Client does. This removes all runtime borrow checks, making the core synchronous and simple.
I think its fair that the Context/CoreClient is difficult to use, and the platform client provides the DX sugar, and owns the Services.
| /// Encrypt and publish `content`. This is the publish-side counterpart of | ||
| /// `Convo::send_message`. Returns the observation events generated while | ||
| /// publishing — typically empty on success. | ||
| fn send_content(&mut self, content: &[u8]) -> Result<Vec<Event>, ChatError>; |
There was a problem hiding this comment.
Returning events makes sense to me.
Theres an option to add a EventService, where clients can push events. However I like that this is a clean type driven approach.
| pub enum Event { | ||
| #[non_exhaustive] | ||
| ConversationStarted { | ||
| conversation_id: ConversationIdOwned, | ||
| }, | ||
| #[non_exhaustive] | ||
| MessageReceived { | ||
| conversation_id: ConversationIdOwned, | ||
| data: Vec<u8>, | ||
| }, | ||
| #[non_exhaustive] | ||
| DeliveryFailed { | ||
| conversation_id: ConversationIdOwned, | ||
| /// `None` when the failure isn't tied to a specific outbound envelope. | ||
| envelope_id: Option<EnvelopeId>, | ||
| reason: FailureReason, | ||
| }, | ||
| } |
There was a problem hiding this comment.
To maintain our deterministic parsing tree approach:
I'd love for our Event structure to capture in a self-describing way which events are destined for the Application. When I go to spec this, I want there to be no ambiguity whether an event should be passed to an App or consumed by an internal layer.
If "All Events MUST be passed to application" that works. However I want to avoid cases where a new Event is added in the future which creates ambiguity.
There was a problem hiding this comment.
Good point, addressed in #106 by two event types, one per layer.
| DeliveryFailed { | ||
| conversation_id: ConversationIdOwned, | ||
| /// `None` when the failure isn't tied to a specific outbound envelope. | ||
| envelope_id: Option<EnvelopeId>, | ||
| reason: FailureReason, | ||
| }, |
There was a problem hiding this comment.
We will likely need some clarity between how Events and Errors Differ assuming we still have both
No description provided.