diff --git a/README.md b/README.md index 60e1657..10107cf 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,7 @@ It also ships with a CLI tool to monitor your swarms: - [Local discovery: mDNS](#local-discovery-mdns) - [Remote discovery: seeds](#remote-discovery-seeds) - [Capabilities](#capabilities) +- [Advertised artifacts and agent cards](#advertised-artifacts-and-agent-cards) - [Ask](#ask) - [Broadcast conversations](#broadcast) - [Heartbeats and liveness](#heartbeats) @@ -73,6 +74,12 @@ It also ships with a CLI tool to monitor your swarms: ## Install ```bash +uv add synapse-p2p +``` + +or + +``` pip install synapse-p2p ``` @@ -251,6 +258,62 @@ methods = await client.call("_synapse.methods") --- +## Advertised artifacts and agent cards + +Nodes can advertise small metadata documents or resources that peers can fetch over Synapse RPC. +Synapse does not interpret the document; it only serves the bytes/JSON plus a MIME type. Higher-level agents decide what they understand. + +An A2A-style agent card can be published as just another artifact: + +```python +from synapse_p2p import Node + +node = Node( + name="Tiny Review LLC", + role="reviewer", + swarm="code.review", + capabilities=["code-review", "pytest"], +) + +node.artifact( + "agent-card", + { + "name": node.name, + "description": "Reviews Python PRs and returns concise feedback.", + "capabilities": ["code-review", "pytest"], + "input_modes": ["text", "git-diff", "url"], + "output_modes": ["text/markdown", "text/x-diff"], + }, + mime_type="application/vnd.synapse.agent-card+json", + description="Self-description for agents that understand agent cards.", +) +``` + +Peers discover and fetch advertised artifacts with system endpoints: + +```python +from synapse_p2p import Client + +client = Client.from_peer(peer) + +artifacts = await client.call("_synapse.artifacts") +# [{"name": "agent-card", "mime_type": "application/vnd.synapse.agent-card+json", ...}] + +agent_card = await client.call("_synapse.artifact.get", "agent-card") +print(agent_card["mime_type"]) +print(agent_card["content"]) +``` + +MIME types are conventions for consumers: + +- `application/vnd.synapse.agent-card+json` — a Synapse/A2A-style self-description +- `application/vnd.synapse.capability-schema+json` — capability-specific input/output schema +- `text/markdown`, `application/pdf`, `image/png`, `text/x-diff` — normal document/file types + +For now, artifacts are small inline documents served over RPC. Large artifacts can advertise external URIs in their content or metadata. + +--- + ## Ask Register one ask handler: diff --git a/synapse_p2p/__init__.py b/synapse_p2p/__init__.py index f20c9c6..5b936c8 100644 --- a/synapse_p2p/__init__.py +++ b/synapse_p2p/__init__.py @@ -23,11 +23,20 @@ from synapse_p2p.messages import RemoteProcedureCall, RPCError, RPCRequest, RPCResponse from synapse_p2p.node import Capability, Node from synapse_p2p.serializers import BaseRPCSerializer, MessagePackRPCSerializer -from synapse_p2p.types import Broadcast, BroadcastReply, Connection, NodeKind, Peer +from synapse_p2p.types import ( + AdvertisedArtifact, + Broadcast, + BroadcastReply, + Connection, + NodeKind, + Peer, + ServedArtifact, +) logger.disable("synapse_p2p") __all__ = [ + "AdvertisedArtifact", "Capability", "BaseRPCSerializer", "Broadcast", @@ -42,6 +51,7 @@ "RPCRequest", "RPCResponse", "RemoteProcedureCall", + "ServedArtifact", "__logo__", "__version__", ] diff --git a/synapse_p2p/node.py b/synapse_p2p/node.py index b28c4a0..ec4ce20 100644 --- a/synapse_p2p/node.py +++ b/synapse_p2p/node.py @@ -1,6 +1,9 @@ import asyncio +import base64 import contextlib +import hashlib import inspect +import json import time import uuid from collections.abc import Awaitable, Callable, Coroutine @@ -25,6 +28,7 @@ Connection, NodeKind, Peer, + ServedArtifact, build_connection_from_peer_name, ) from synapse_p2p.utils import random_hash @@ -98,6 +102,7 @@ def __init__( self.peer_timeout = peer_timeout self.peers: dict[str, Peer] = {} self.broadcast_replies: dict[str, list[BroadcastReply]] = {} + self.artifact_directory: dict[str, ServedArtifact] = {} self.lifecycle_handlers: dict[str, list[Callable[[Any], Coroutine[Any, Any, None]]]] = {} self.endpoint_directory: dict[str, Callable] = {} self.endpoint_metadata: dict[str, EndpointMetadata] = {} @@ -135,6 +140,70 @@ def ask(self, wrapped: AskHandler) -> AskHandler: self._ask_handler = wrapped return wrapped + def artifact( + self, + name: str, + content: Any, + *, + mime_type: str = "application/json", + kind: str = "metadata", + description: str = "", + encoding: str | None = None, + metadata: dict[str, Any] | None = None, + ) -> ServedArtifact: + """Advertise a small inline artifact that peers can fetch over RPC. + + Synapse does not interpret the MIME type. It advertises and serves the + document so higher-level agents can decide whether they understand it. + """ + artifact = self._build_artifact( + name=name, + content=content, + mime_type=mime_type, + kind=kind, + description=description, + encoding=encoding, + metadata=metadata or {}, + ) + self.artifact_directory[name] = artifact + return artifact + + def _build_artifact( + self, + *, + name: str, + content: Any, + mime_type: str, + kind: str, + description: str, + encoding: str | None, + metadata: dict[str, Any], + ) -> ServedArtifact: + if isinstance(content, bytes): + raw = content + served_content: Any = base64.b64encode(content).decode("ascii") + artifact_encoding = encoding or "base64" + elif isinstance(content, str): + raw = content.encode() + served_content = content + artifact_encoding = encoding or "text" + else: + raw = json.dumps(content, sort_keys=True, separators=(",", ":")).encode() + served_content = content + artifact_encoding = encoding or "json" + + return ServedArtifact( + name=name, + mime_type=mime_type, + content=served_content, + kind=kind, + description=description, + encoding=artifact_encoding, + size=len(raw), + sha256=hashlib.sha256(raw).hexdigest(), + metadata=dict(metadata), + ) + def run(self) -> None: """Run the node.""" print(__logo__) @@ -495,6 +564,20 @@ async def methods() -> list[dict[str, str | bool]]: async def peers() -> list[dict]: return [peer.to_dict() for peer in self.peers.values()] + @self.endpoint("_synapse.artifacts", publish=False) + async def artifacts() -> list[dict[str, Any]]: + return [ + artifact.descriptor().to_dict() + for artifact in self.artifact_directory.values() + ] + + @self.endpoint("_synapse.artifact.get", publish=False) + async def artifact_get(name: str) -> dict[str, Any]: + artifact = self.artifact_directory.get(name) + if artifact is None: + raise InvalidMessageError(f"unknown artifact: {name}") + return artifact.to_dict() + @self.endpoint("_synapse.join", publish=False) async def join(peer: dict) -> dict: incoming = Peer.from_dict(peer) diff --git a/synapse_p2p/tests/test_node.py b/synapse_p2p/tests/test_node.py index 56f8c5c..41b3ebd 100644 --- a/synapse_p2p/tests/test_node.py +++ b/synapse_p2p/tests/test_node.py @@ -34,6 +34,24 @@ async def heartbeat(): assert task.period == 5 +def test_artifact_registers_descriptor(node): + artifact = node.artifact( + "agent-card", + {"name": "reviewer", "capabilities": ["code-review"]}, + mime_type="application/vnd.synapse.agent-card+json", + description="Self-description for this node", + ) + + descriptor = artifact.descriptor().to_dict() + + assert artifact.name == "agent-card" + assert artifact.encoding == "json" + assert artifact.content == {"name": "reviewer", "capabilities": ["code-review"]} + assert artifact.size is not None + assert artifact.sha256 is not None + assert "content" not in descriptor + + async def _send_message(node: Node, message) -> RPCResponse: tcp = await asyncio.start_server(node.handle_data, node.bind, node.port) host, port = tcp.sockets[0].getsockname()[:2] @@ -198,6 +216,50 @@ async def handle(reader, writer): await Client(host, port, timeout=0.01).call("slow") +@pytest.mark.asyncio +async def test_synapse_artifacts_endpoints(): + node = Node(bind="127.0.0.1") + node.artifact( + "agent-card", + {"name": "reviewer", "capabilities": ["code-review"]}, + mime_type="application/vnd.synapse.agent-card+json", + ) + + artifacts = await _send_rpc(node, RPCRequest(id="list", endpoint="_synapse.artifacts")) + assert artifacts.ok is True + assert artifacts.result == [ + { + "name": "agent-card", + "mime_type": "application/vnd.synapse.agent-card+json", + "kind": "metadata", + "description": "", + "encoding": "json", + "size": 50, + "sha256": "dff355dc1bfeebbd133f25b00a8c169d25c9afef8d3f45076f0140668bed3920", + "metadata": {}, + } + ] + + fetched = await _send_rpc( + node, + RPCRequest(id="get", endpoint="_synapse.artifact.get", args=["agent-card"]), + ) + assert fetched.ok is True + assert fetched.result["content"] == {"name": "reviewer", "capabilities": ["code-review"]} + + +@pytest.mark.asyncio +async def test_synapse_artifact_get_unknown_name_returns_bad_request(): + node = Node(bind="127.0.0.1") + response = await _send_rpc( + node, + RPCRequest(id="get", endpoint="_synapse.artifact.get", args=["missing"]), + ) + assert response.ok is False + assert response.error is not None + assert response.error.code == "bad_request" + + @pytest.mark.asyncio async def test_node_start_and_stop_lifecycle(): node = Node(bind="127.0.0.1") diff --git a/synapse_p2p/tests/test_public_api.py b/synapse_p2p/tests/test_public_api.py index 9d60855..0d70efa 100644 --- a/synapse_p2p/tests/test_public_api.py +++ b/synapse_p2p/tests/test_public_api.py @@ -1,4 +1,5 @@ from synapse_p2p import ( + AdvertisedArtifact, BaseRPCSerializer, Broadcast, BroadcastReply, @@ -12,10 +13,12 @@ RPCError, RPCRequest, RPCResponse, + ServedArtifact, ) def test_substrate_types_are_exported_from_top_level_package(): + assert AdvertisedArtifact is not None assert BaseRPCSerializer is not None assert Broadcast is not None assert BroadcastReply is not None @@ -29,3 +32,4 @@ def test_substrate_types_are_exported_from_top_level_package(): assert RPCRequest is not None assert RPCResponse is not None assert RemoteProcedureCall is RPCRequest + assert ServedArtifact is not None diff --git a/synapse_p2p/types.py b/synapse_p2p/types.py index 70627ac..766f682 100644 --- a/synapse_p2p/types.py +++ b/synapse_p2p/types.py @@ -105,6 +105,61 @@ def to_dict(self) -> dict[str, Any]: return {"nonce": self.nonce, "peer": self.peer.to_dict(), "result": self.result} +@dataclass(slots=True) +class AdvertisedArtifact: + """Descriptor for a small document or resource a node can serve to peers.""" + + name: str + mime_type: str + kind: str = "metadata" + description: str = "" + encoding: str = "json" + size: int | None = None + sha256: str | None = None + metadata: dict[str, Any] = field(default_factory=dict) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> AdvertisedArtifact: + return cls(**data) + + def to_dict(self) -> dict[str, Any]: + return asdict(self) + + +@dataclass(slots=True) +class ServedArtifact: + """Artifact descriptor plus inline content returned by a node.""" + + name: str + mime_type: str + content: Any + kind: str = "metadata" + description: str = "" + encoding: str = "json" + size: int | None = None + sha256: str | None = None + metadata: dict[str, Any] = field(default_factory=dict) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> ServedArtifact: + return cls(**data) + + def to_dict(self) -> dict[str, Any]: + return asdict(self) + + def descriptor(self) -> AdvertisedArtifact: + return AdvertisedArtifact( + name=self.name, + mime_type=self.mime_type, + kind=self.kind, + description=self.description, + encoding=self.encoding, + size=self.size, + sha256=self.sha256, + metadata=dict(self.metadata), + ) + + def build_connection_from_peer_name(peer_name: tuple[str, int]) -> Connection: ip, port = peer_name identifier = sha256(f"{ip}:{port}".encode()).hexdigest()[:8]