diff --git a/README.md b/README.md index 6c1ba16..55baaf5 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,7 @@ What makes it fun: - [Swarms and discovery](#swarms-and-discovery) - [Capabilities](#capabilities) - [Ask: delegate work](#ask-delegate-work) -- [Broadcast: ask the swarm](#broadcast-ask-the-swarm) +- [Broadcast: shared conversations](#broadcast-shared-conversations) - [Periodic tasks](#periodic-tasks) - [Artifacts and agent cards](#artifacts-and-agent-cards) - [Heartbeats](#heartbeats) @@ -313,7 +313,7 @@ methods = await client.call("_synapse.methods") ## 🤝 Ask: delegate work -Use `@node.ask` for the default task handler on a node. +Use `@node.ask` for the default task handler on a node. Synapse provides the transport; your agent code decides whether and how to answer. ```python from synapse_p2p import Node @@ -326,7 +326,11 @@ async def handle(task: str, context: dict): return {"status": "done", "task": task} ``` -Ask a peer to do work: +There are two ways to use it. + +### Direct ask + +Call one known peer directly with `_node.ask`: ```python from synapse_p2p import Client @@ -338,9 +342,29 @@ result = await Client.from_peer(peer).call( ) ``` +### Swarm ask + +Broadcast to the built-in `synapse.ask` endpoint when you want any interested node to wade in: + +```python +broadcast = await node.broadcast( + "synapse.ask", + "Review this diff", + context={"diff": diff}, +) +``` + +A node with an `@node.ask` handler will ACK the conversation, run the handler, and reply with the result. Nodes without a handler fail quietly from the caller's point of view, just like any other broadcast recipient that cannot help. + +The CLI wraps this flow: + +```bash +sn ask foo.electron.network "Review this diff" --context url=https://github.com/org/repo/pull/1 +``` + --- -## 💬 Broadcast: ask the swarm +## 💬 Broadcast: shared conversations Use broadcast when you do not know which node should answer. @@ -367,11 +391,48 @@ for reply in node.replies(broadcast): print(reply.peer.name, reply.result) ``` +Synapse also keeps a lightweight conversation event log. A broadcast creates a `message` event whose `conversation_id` is the broadcast nonce. Nodes may opt into the conversation with `ack` or other events; Synapse does not decide who should answer. + +```python +@node.endpoint("team.question") +async def answer(question: str, broadcast: Broadcast) -> dict: + # ACK means "I saw this and am choosing to wade in". + # It does not mean Synapse assigned this node the work. + await node.ack(broadcast, {"seen": True}) + await node.reply(broadcast, {"answer": "I can help"}) + return {"accepted": True} +``` + +Listen for conversation events: + +```python +from synapse_p2p import ConversationEvent + + +@node.on("conversation.ack") +async def on_ack(event: ConversationEvent) -> None: + print(event.peer.name, "acked", event.conversation_id) + + +for event in node.conversation(broadcast): + print(event.kind, event.peer.name, event.payload) +``` + +Built-in conversation event kinds are intentionally small conventions: + +- `message` — a broadcast or conversation message was seen +- `ack` — a node chose to acknowledge / enter the conversation +- `reply` — a node replied with a result + +Higher-level agent frameworks can layer routing, claiming, status, artifacts, or task semantics on top by emitting their own event kinds with `node.emit_conversation_event(...)`. + Why this is useful: - one broadcast creates one shared conversation - every node sees the same nonce -- replies group without a central coordinator +- nodes can wade in or stay silent +- ACK is opt-in, not automatic assignment +- replies and events group without a central coordinator - UUIDv7 nonces keep conversations roughly time-ordered when the runtime supports them --- @@ -515,6 +576,25 @@ sn watch foo.electron.network --team backend sn watch foo.electron.network --no-capabilities ``` +### 🙋 Ask from the terminal + +`sn ask` broadcasts to the built-in `synapse.ask` endpoint. Nodes with a `@node.ask` handler can opt in with ACK and reply with their result. + +```bash +sn ask foo.electron.network "Review this diff" +sn ask foo.electron.network "Review this diff" --context url=https://github.com/org/repo/pull/1 +sn ask foo.electron.network "Who can help?" --forever +``` + +Example output: + +```text +ask: 019e4ab0-1d0d-709a-... +waiting for ACKs and replies... press Ctrl+C to stop +✓ reviewer acked +- reviewer: LGTM after fixing tests +``` + ### 📣 Broadcast from the terminal ```bash @@ -574,11 +654,13 @@ Built-in endpoints: | `_synapse.join` | join through a seed | | `_synapse.heartbeat` | update peer liveness | | `_synapse.broadcast.reply` | reply to a broadcast nonce | +| `_synapse.conversation.event` | gossip a shared conversation event | | `_synapse.artifacts` | list advertised artifacts | | `_synapse.artifact.get` | fetch one advertised artifact | | `_node.info` | name, role, description, capabilities | | `_node.capabilities` | machine-readable capabilities | -| `_node.ask` | delegate to the node ask handler | +| `_node.ask` | delegate directly to the node ask handler | +| `synapse.ask` | swarm-facing ask endpoint used by `sn ask` | Wire format: @@ -612,8 +694,20 @@ Response: Useful low-level exports: ```python -from synapse_p2p import Broadcast, BroadcastReply, Capability, Client, Node, Peer -from synapse_p2p import RPCError, RPCRequest, RPCResponse +from synapse_p2p import ( + AdvertisedArtifact, + Broadcast, + BroadcastReply, + Capability, + Client, + ConversationEvent, + Node, + Peer, + RPCError, + RPCRequest, + RPCResponse, + ServedArtifact, +) ``` Enable logs when debugging: @@ -634,7 +728,7 @@ Those belong above Synapse. Synapse is the substrate: -> nodes + discovery + capabilities + heartbeats + broadcasts + schedules + a tiny protocol +> nodes + discovery + capabilities + conversations + artifacts + heartbeats + schedules + a tiny protocol --- diff --git a/synapse_p2p/__init__.py b/synapse_p2p/__init__.py index 454de7c..cc95196 100644 --- a/synapse_p2p/__init__.py +++ b/synapse_p2p/__init__.py @@ -29,6 +29,7 @@ Broadcast, BroadcastReply, Connection, + ConversationEvent, NodeKind, Peer, ServedArtifact, @@ -44,6 +45,7 @@ "BroadcastReply", "Client", "Connection", + "ConversationEvent", "CronSchedule", "MessagePackRPCSerializer", "Node", diff --git a/synapse_p2p/cli.py b/synapse_p2p/cli.py index 8c29a8b..e7fca79 100644 --- a/synapse_p2p/cli.py +++ b/synapse_p2p/cli.py @@ -11,7 +11,7 @@ from zeroconf import ServiceStateChange from zeroconf.asyncio import AsyncServiceBrowser, AsyncZeroconf -from synapse_p2p import Broadcast, BroadcastReply, Node, Peer +from synapse_p2p import Broadcast, BroadcastReply, ConversationEvent, Node, Peer from synapse_p2p.mdns import SERVICE_TYPE from synapse_p2p.node import Seed @@ -29,6 +29,16 @@ def _parse_seed(seed: list[str] | None) -> list[Seed]: return list(seed or []) +def _seed_host(seed: Seed) -> str: + if isinstance(seed, tuple): + return seed[0] + return seed.rsplit(":", 1)[0] + + +def _loopback_only(seeds: list[Seed], mdns: bool) -> bool: + return bool(seeds) and all(_seed_host(seed).startswith("127.") for seed in seeds) and not mdns + + def _online_dot( last_seen: float, timeout: float, @@ -61,12 +71,23 @@ def _format_result(result: Any) -> str: return str(result) +def _parse_key_value(items: list[str] | None) -> dict[str, str]: + parsed: dict[str, str] = {} + for item in items or []: + if "=" not in item: + raise typer.BadParameter(f"expected key=value, got {item!r}") + key, value = item.split("=", 1) + parsed[key] = value + return parsed + + def _event(kind: str, text: str) -> str: styles = { "joined": "bold white on dark_green", "heartbeat": "grey50 on grey15", "offline": "bold white on dark_red", "message": "bold white on purple4", + "ack": "bold black on bright_yellow", "reply": "bold black on bright_cyan", } labels = { @@ -74,6 +95,7 @@ def _event(kind: str, text: str) -> str: "heartbeat": "BEAT", "offline": "OFFLINE", "message": "ASK", + "ack": "ACK", "reply": "REPLY", } style = styles.get(kind, "bold white on grey23") @@ -389,7 +411,7 @@ async def _broadcast( discover: float, timeout: float | None, ) -> None: - loopback_only = bool(seeds) and all(seed[0].startswith("127.") for seed in seeds) and not mdns + loopback_only = _loopback_only(seeds, mdns) node = Node( name=name, role="speaker", @@ -470,6 +492,138 @@ def broadcast_command( ) +async def _ask( + swarm: str, + team: str, + task: str, + name: str, + seeds: list[Seed], + mdns: bool, + discover: float, + timeout: float | None, + context: dict[str, str] | None = None, +) -> None: + loopback_only = _loopback_only(seeds, mdns) + node = Node( + name=name, + role="asker", + swarm=swarm, + team=team, + capabilities=["ask"], + seeds=seeds, + mdns=mdns, + bind="127.0.0.1" if loopback_only else "0.0.0.0", + advertise="127.0.0.1" if loopback_only else "auto", + ) + replies: asyncio.Queue[BroadcastReply] = asyncio.Queue() + acks: asyncio.Queue[ConversationEvent] = asyncio.Queue() + seen_acks: set[str] = set() + + @node.on("conversation.ack") + async def on_ack(event: ConversationEvent) -> None: + await acks.put(event) + + @node.on("broadcast.reply") + async def on_reply(reply: BroadcastReply) -> None: + await replies.put(reply) + + await node.start() + await node.join() + await asyncio.sleep(discover) + + broadcast = await node.broadcast("synapse.ask", task, context=context or {}) + typer.echo(f"ask: {broadcast.nonce}") + typer.echo("waiting for ACKs and replies... press Ctrl+C to stop") + + try: + while True: + reply_task = asyncio.create_task(replies.get()) + ack_task = asyncio.create_task(acks.get()) + pending = {reply_task, ack_task} + try: + done, pending = await asyncio.wait( + pending, + timeout=timeout, + return_when=asyncio.FIRST_COMPLETED, + ) + finally: + for task_item in pending: + task_item.cancel() + + if not done: + if not node.replies(broadcast): + typer.echo("no replies") + return + + for task_item in done: + item = task_item.result() + if isinstance(item, BroadcastReply): + if item.nonce == broadcast.nonce: + typer.echo( + f"- {_peer_name(item.peer)}: {_format_result(item.result)}" + ) + elif item.conversation_id == broadcast.nonce and item.peer.id not in seen_acks: + seen_acks.add(item.peer.id) + typer.echo(f"✓ {_peer_name(item.peer)} acked") + + if timeout is not None and node.replies(broadcast): + # After the first reply, keep the command snappy unless --forever is used. + timeout = 0.25 + finally: + await node.stop() + + +@app.command("ask") +def ask_command( + swarm: Annotated[str, typer.Argument(help="Swarm name")], + task: Annotated[str, typer.Argument(help="Task/question to ask the swarm")], + team: Annotated[ + str, + typer.Option("--team", "-t", help="Optional subgroup inside the swarm"), + ] = "default", + name: Annotated[ + str, + typer.Option("--name", "-n", help="Name for this CLI node"), + ] = "asker", + seed: Annotated[ + list[str] | None, + typer.Option("--seed", "-s", help="Seed node as host:port. Repeatable."), + ] = None, + mdns: Annotated[bool, typer.Option("--mdns/--no-mdns", help="Use local mDNS discovery")] = True, + discover: Annotated[ + float, + typer.Option("--discover", help="Seconds to discover peers before asking"), + ] = 0.5, + timeout: Annotated[ + float | None, + typer.Option("--timeout", help="Stop after this many seconds without an event"), + ] = 30, + forever: Annotated[ + bool, + typer.Option("--forever", help="Keep streaming replies until Ctrl+C"), + ] = False, + context: Annotated[ + list[str] | None, + typer.Option("--context", "-c", help="Context key=value. Repeatable."), + ] = None, +) -> None: + """Ask capable swarm nodes to perform work and stream ACKs/replies.""" + stream_timeout = None if forever else timeout + asyncio.run( + _ask( + swarm, + team, + task, + name, + _parse_seed(seed), + mdns, + discover, + stream_timeout, + _parse_key_value(context), + ) + ) + + async def _list_swarms(seconds: float) -> None: found: dict[tuple[str, str], set[str]] = defaultdict(set) zeroconf = AsyncZeroconf() diff --git a/synapse_p2p/node.py b/synapse_p2p/node.py index 093c9d8..5bd7a44 100644 --- a/synapse_p2p/node.py +++ b/synapse_p2p/node.py @@ -26,6 +26,7 @@ Broadcast, BroadcastReply, Connection, + ConversationEvent, NodeKind, Peer, PeriodicTask, @@ -103,6 +104,8 @@ def __init__( self.peer_timeout = peer_timeout self.peers: dict[str, Peer] = {} self.broadcast_replies: dict[str, list[BroadcastReply]] = {} + self.conversation_events: dict[str, list[ConversationEvent]] = {} + self._seen_conversation_events: set[str] = set() self.artifact_directory: dict[str, ServedArtifact] = {} self.lifecycle_handlers: dict[str, list[Callable[[Any], Coroutine[Any, Any, None]]]] = {} self.endpoint_directory: dict[str, Callable] = {} @@ -223,11 +226,29 @@ async def _dispatch(self, rpc: RPCRequest, connection: Connection): kwargs = dict(rpc.kwargs) if "broadcast" in kwargs and isinstance(kwargs["broadcast"], dict): kwargs["broadcast"] = Broadcast.from_dict(kwargs["broadcast"]) + broadcast = kwargs["broadcast"] + self._remember_conversation_event( + ConversationEvent( + conversation_id=broadcast.nonce, + event_id=broadcast.nonce, + kind="message", + peer=broadcast.origin, + payload={ + "endpoint": rpc.endpoint, + "args": list(rpc.args), + "kwargs": { + key: value + for key, value in kwargs.items() + if key != "broadcast" + }, + }, + ) + ) self._emit_lifecycle( "broadcast.received", { "endpoint": rpc.endpoint, - "broadcast": kwargs["broadcast"], + "broadcast": broadcast, "args": list(rpc.args), "kwargs": {key: value for key, value in kwargs.items() if key != "broadcast"}, }, @@ -464,6 +485,15 @@ async def broadcast(self, endpoint: str, *args, **kwargs) -> Broadcast: """ broadcast = Broadcast(nonce=new_nonce(), origin=self.self_peer(), endpoint=endpoint) + self._remember_conversation_event( + ConversationEvent( + conversation_id=broadcast.nonce, + event_id=broadcast.nonce, + kind="message", + peer=broadcast.origin, + payload={"endpoint": endpoint, "args": list(args), "kwargs": dict(kwargs)}, + ) + ) message = broadcast.to_dict() async def send(peer: Peer) -> None: @@ -477,6 +507,16 @@ async def send(peer: Peer) -> None: async def reply(self, broadcast: Broadcast, result: Any) -> None: """Reply to a broadcast using its nonce and share the reply with known peers.""" + self._remember_conversation_event( + ConversationEvent( + conversation_id=broadcast.nonce, + event_id=random_hash(), + kind="reply", + peer=self.self_peer(), + payload={"result": result}, + parent_id=broadcast.nonce, + ) + ) recipients = {broadcast.origin.id: broadcast.origin} for peer in self.peers.values(): recipients.setdefault(peer.id, peer) @@ -500,6 +540,83 @@ def replies(self, broadcast: Broadcast | str) -> list[BroadcastReply]: nonce = broadcast.nonce if isinstance(broadcast, Broadcast) else broadcast return list(self.broadcast_replies.get(nonce, [])) + def conversation(self, conversation: Broadcast | str) -> list[ConversationEvent]: + """Return locally known events for a conversation id or broadcast.""" + conversation_id = ( + conversation.nonce if isinstance(conversation, Broadcast) else conversation + ) + return list(self.conversation_events.get(conversation_id, [])) + + def _remember_conversation_event(self, event: ConversationEvent) -> bool: + if event.event_id in self._seen_conversation_events: + return False + self._validate_peer_membership(event.peer) + self._seen_conversation_events.add(event.event_id) + self.conversation_events.setdefault(event.conversation_id, []).append(event) + self.add_peer(event.peer) + self._emit_lifecycle("conversation.event", event) + self._emit_lifecycle(f"conversation.{event.kind}", event) + return True + + async def emit_conversation_event( + self, + conversation: Broadcast | str, + kind: str, + payload: dict[str, Any] | None = None, + *, + parent_id: str | None = None, + metadata: dict[str, Any] | None = None, + ) -> ConversationEvent: + """Append a lightweight event to a shared conversation and gossip it to peers.""" + conversation_id = ( + conversation.nonce if isinstance(conversation, Broadcast) else conversation + ) + event = ConversationEvent( + conversation_id=conversation_id, + event_id=random_hash(), + kind=kind, + peer=self.self_peer(), + payload=payload or {}, + parent_id=parent_id, + metadata=metadata or {}, + ) + self._remember_conversation_event(event) + await self._send_conversation_event(event) + return event + + async def ack( + self, + conversation: Broadcast | str, + payload: dict[str, Any] | None = None, + *, + parent_id: str | None = None, + metadata: dict[str, Any] | None = None, + ) -> ConversationEvent: + """Emit an opt-in acknowledgement event for a conversation.""" + if isinstance(conversation, Broadcast) and parent_id is None: + parent_id = conversation.nonce + return await self.emit_conversation_event( + conversation, + "ack", + payload, + parent_id=parent_id, + metadata=metadata, + ) + + async def _send_conversation_event( + self, + event: ConversationEvent, + *, + exclude: set[str] | None = None, + ) -> None: + excluded = {self.node_id, event.peer.id, *(exclude or set())} + recipients = [peer for peer in self.peers.values() if peer.id not in excluded] + + async def send(peer: Peer) -> None: + await Client.from_peer(peer).call("_synapse.conversation.event", event.to_dict()) + + await asyncio.gather(*(send(peer) for peer in recipients), return_exceptions=True) + async def _discover(self, *, wait: float = 0) -> None: if self.mdns is not None: await self.mdns.discover(wait=wait) @@ -544,6 +661,21 @@ async def node_ask(task: str, context: dict[str, Any] | None = None) -> Any: raise RuntimeError("node has no ask handler") return await self._ask_handler(task, context or {}) + @self.endpoint("synapse.ask", publish=False, description="Ask this node to perform work") + async def swarm_ask( + task: str, + context: dict[str, Any] | None = None, + broadcast: Broadcast | None = None, + ) -> Any: + if self._ask_handler is None: + raise RuntimeError("node has no ask handler") + if broadcast is not None: + await self.ack(broadcast) + result = await self._ask_handler(task, context or {}) + if broadcast is not None: + await self.reply(broadcast, result) + return result + def _register_system_endpoints(self) -> None: @self.endpoint("_synapse.ping", publish=False) async def ping() -> str: @@ -579,6 +711,14 @@ async def artifact_get(name: str) -> dict[str, Any]: raise InvalidMessageError(f"unknown artifact: {name}") return artifact.to_dict() + @self.endpoint("_synapse.conversation.event", publish=False) + async def conversation_event(event: dict[str, Any]) -> dict[str, Any]: + incoming = ConversationEvent.from_dict(event) + remembered = self._remember_conversation_event(incoming) + if remembered: + await self._send_conversation_event(incoming) + return {"ok": True, "stored": remembered} + @self.endpoint("_synapse.join", publish=False) async def join(peer: dict) -> dict: incoming = Peer.from_dict(peer) @@ -603,6 +743,16 @@ async def broadcast_reply(nonce: str, peer: dict, result=None) -> dict: return {"ok": True} self.add_peer(incoming) + self._remember_conversation_event( + ConversationEvent( + conversation_id=nonce, + event_id=random_hash(), + kind="reply", + peer=incoming, + payload={"result": result}, + parent_id=nonce, + ) + ) reply = BroadcastReply(nonce=nonce, peer=incoming, result=result) replies.append(reply) self._emit_lifecycle("broadcast.reply", reply) diff --git a/synapse_p2p/tests/test_broadcast.py b/synapse_p2p/tests/test_broadcast.py index 7b17ded..40b5cdd 100644 --- a/synapse_p2p/tests/test_broadcast.py +++ b/synapse_p2p/tests/test_broadcast.py @@ -2,7 +2,7 @@ import pytest -from synapse_p2p import Broadcast, BroadcastReply, Client, Node +from synapse_p2p import Broadcast, BroadcastReply, Client, ConversationEvent, Node @pytest.mark.asyncio @@ -221,6 +221,129 @@ async def test_broadcast_reply_endpoint_deduplicates_forwarded_replies(): await node.stop() +@pytest.mark.asyncio +async def test_broadcast_records_message_and_reply_conversation_events(): + origin = Node( + name="origin", + swarm="foo.electron.network", + bind="127.0.0.1", + heartbeat_interval=None, + ) + worker = Node( + name="worker", + swarm="foo.electron.network", + bind="127.0.0.1", + heartbeat_interval=None, + ) + + @worker.endpoint("team.question") + async def answer(question: str, broadcast: Broadcast) -> dict: + await worker.reply(broadcast, {"answer": question}) + return {"accepted": True} + + await origin.start() + await worker.start() + origin.add_peer(worker.self_peer()) + + try: + broadcast = await origin.broadcast("team.question", "who can help?") + await asyncio.sleep(0.05) + + origin_events = origin.conversation(broadcast) + worker_events = worker.conversation(broadcast) + + assert origin_events[0].kind == "message" + assert origin_events[0].event_id == broadcast.nonce + assert origin_events[0].payload["endpoint"] == "team.question" + assert any(event.kind == "reply" for event in origin_events) + assert worker_events[0].kind == "message" + finally: + await worker.stop() + await origin.stop() + + +@pytest.mark.asyncio +async def test_ack_is_opt_in_conversation_event_shared_with_origin(): + origin = Node( + name="origin", + swarm="foo.electron.network", + bind="127.0.0.1", + heartbeat_interval=None, + ) + worker = Node( + name="worker", + swarm="foo.electron.network", + bind="127.0.0.1", + heartbeat_interval=None, + ) + acked = asyncio.Queue() + + @origin.on("conversation.ack") + async def on_ack(event: ConversationEvent) -> None: + await acked.put(event) + + @worker.endpoint("team.question") + async def answer(_question: str, broadcast: Broadcast) -> dict: + await worker.ack(broadcast, {"seen": True}) + return {"accepted": True} + + await origin.start() + await worker.start() + origin.add_peer(worker.self_peer()) + + try: + broadcast = await origin.broadcast("team.question", "who can help?") + event = await asyncio.wait_for(acked.get(), 0.2) + + assert event.kind == "ack" + assert event.conversation_id == broadcast.nonce + assert event.parent_id == broadcast.nonce + assert event.peer.name == "worker" + assert event.payload == {"seen": True} + assert origin.conversation(broadcast)[-1].event_id == event.event_id + finally: + await worker.stop() + await origin.stop() + + +@pytest.mark.asyncio +async def test_conversation_event_endpoint_deduplicates_events(): + node = Node( + name="node", + swarm="foo.electron.network", + bind="127.0.0.1", + heartbeat_interval=None, + ) + peer = Node( + name="peer", + swarm="foo.electron.network", + bind="127.0.0.1", + heartbeat_interval=None, + ) + event = ConversationEvent( + conversation_id="nonce", + event_id="event-1", + kind="ack", + peer=peer.self_peer(), + payload={"seen": True}, + ) + + await node.start() + await peer.start() + + try: + for _ in range(2): + await Client.from_peer(node.self_peer()).call( + "_synapse.conversation.event", + event.to_dict(), + ) + + assert len(node.conversation("nonce")) == 1 + finally: + await peer.stop() + await node.stop() + + @pytest.mark.asyncio async def test_broadcast_reply_endpoint_validates_membership(): origin = Node( diff --git a/synapse_p2p/tests/test_cli.py b/synapse_p2p/tests/test_cli.py index 1dd6912..d2696c5 100644 --- a/synapse_p2p/tests/test_cli.py +++ b/synapse_p2p/tests/test_cli.py @@ -6,9 +6,12 @@ from synapse_p2p.cli import ( CHATTER_LIMIT, OFFLINE_PEER_RETENTION, + _ask, _broadcast, _chatter_text, _format_result, + _loopback_only, + _parse_key_value, _parse_seed, _peer_label, _peer_line, @@ -22,6 +25,19 @@ def test_parse_seed_defaults_to_empty_list(): assert _parse_seed(None) == [] +def test_parse_key_value_context(): + assert _parse_key_value(["diff=abc", "url=https://example.com"]) == { + "diff": "abc", + "url": "https://example.com", + } + + +def test_loopback_only_supports_string_and_tuple_seeds(): + assert _loopback_only(["127.0.0.1:9999", ("127.0.0.1", 8888)], mdns=False) is True + assert _loopback_only(["192.168.1.10:9999"], mdns=False) is False + assert _loopback_only(["127.0.0.1:9999"], mdns=True) is False + + def test_offline_peer_retention_is_a_few_minutes(): assert OFFLINE_PEER_RETENTION == 180 @@ -165,6 +181,66 @@ async def receive(message: str, broadcast: Broadcast) -> dict[str, str]: assert "no replies" not in output +@pytest.mark.asyncio +async def test_cli_ask_streams_acks_and_replies_from_seed_peer(monkeypatch): + output: list[str] = [] + worker = Node( + name="worker", + swarm="foo.electron.network", + bind="127.0.0.1", + advertise="127.0.0.1", + heartbeat_interval=None, + ) + + @worker.ask + async def handle(task: str, context: dict): + return {"answer": f"handled {task} with {context['kind']}"} + + listener = await worker.start() + _host, port = listener.sockets[0].getsockname()[:2] + monkeypatch.setattr("synapse_p2p.cli.typer.echo", output.append) + + try: + await _ask( + "foo.electron.network", + "default", + "review this", + "asker", + [("127.0.0.1", port)], + False, + 0, + 1, + {"kind": "diff"}, + ) + finally: + await worker.stop() + + assert any(line.startswith("ask: ") for line in output) + assert any("worker acked" in line for line in output) + assert any("worker" in line and "handled review this with diff" in line for line in output) + assert "no replies" not in output + + +@pytest.mark.asyncio +async def test_cli_ask_times_out_cleanly_without_replies(monkeypatch): + output: list[str] = [] + monkeypatch.setattr("synapse_p2p.cli.typer.echo", output.append) + + await _ask( + "foo.electron.network", + "default", + "hello", + "asker", + [], + False, + 0, + 0.01, + ) + + assert any(line.startswith("ask: ") for line in output) + assert "no replies" in output + + @pytest.mark.asyncio async def test_cli_broadcast_times_out_cleanly_without_replies(monkeypatch): output: list[str] = [] diff --git a/synapse_p2p/tests/test_public_api.py b/synapse_p2p/tests/test_public_api.py index 0d70efa..3cfb713 100644 --- a/synapse_p2p/tests/test_public_api.py +++ b/synapse_p2p/tests/test_public_api.py @@ -5,6 +5,7 @@ BroadcastReply, Capability, Client, + ConversationEvent, MessagePackRPCSerializer, Node, NodeKind, @@ -24,6 +25,7 @@ def test_substrate_types_are_exported_from_top_level_package(): assert BroadcastReply is not None assert Capability is not None assert Client is not None + assert ConversationEvent is not None assert MessagePackRPCSerializer is not None assert Node is not None assert NodeKind is not None diff --git a/synapse_p2p/types.py b/synapse_p2p/types.py index 99d7d0b..0a78fb2 100644 --- a/synapse_p2p/types.py +++ b/synapse_p2p/types.py @@ -166,6 +166,45 @@ def descriptor(self) -> AdvertisedArtifact: ) +@dataclass(slots=True) +class ConversationEvent: + """A small event appended to a shared swarm conversation.""" + + conversation_id: str + event_id: str + kind: str + peer: Peer + payload: dict[str, Any] = field(default_factory=dict) + parent_id: str | None = None + created_at: float = field(default_factory=time.time) + metadata: dict[str, Any] = field(default_factory=dict) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> ConversationEvent: + return cls( + conversation_id=data["conversation_id"], + event_id=data["event_id"], + kind=data["kind"], + peer=Peer.from_dict(data["peer"]), + payload=dict(data.get("payload", {})), + parent_id=data.get("parent_id"), + created_at=float(data.get("created_at", time.time())), + metadata=dict(data.get("metadata", {})), + ) + + def to_dict(self) -> dict[str, Any]: + return { + "conversation_id": self.conversation_id, + "event_id": self.event_id, + "kind": self.kind, + "peer": self.peer.to_dict(), + "payload": self.payload, + "parent_id": self.parent_id, + "created_at": self.created_at, + "metadata": self.metadata, + } + + def build_connection_from_peer_name(peer_name: tuple[str, int]) -> Connection: ip, port = peer_name identifier = sha256(f"{ip}:{port}".encode()).hexdigest()[:8]