Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ It also ships with a CLI tool to monitor your swarms:
- [Local discovery: mDNS](#local-discovery-mdns)
- [Remote discovery: seeds](#remote-discovery-seeds)
- [Capabilities](#capabilities)
- [Advertised artifacts and agent cards](#advertised-artifacts-and-agent-cards)
- [Ask](#ask)
- [Broadcast conversations](#broadcast)
- [Heartbeats and liveness](#heartbeats)
Expand All @@ -73,6 +74,12 @@ It also ships with a CLI tool to monitor your swarms:
## Install

```bash
uv add synapse-p2p
```

or

```
pip install synapse-p2p
```

Expand Down Expand Up @@ -251,6 +258,62 @@ methods = await client.call("_synapse.methods")

---

## Advertised artifacts and agent cards

Nodes can advertise small metadata documents or resources that peers can fetch over Synapse RPC.
Synapse does not interpret the document; it only serves the bytes/JSON plus a MIME type. Higher-level agents decide what they understand.

An A2A-style agent card can be published as just another artifact:

```python
from synapse_p2p import Node

node = Node(
name="Tiny Review LLC",
role="reviewer",
swarm="code.review",
capabilities=["code-review", "pytest"],
)

node.artifact(
"agent-card",
{
"name": node.name,
"description": "Reviews Python PRs and returns concise feedback.",
"capabilities": ["code-review", "pytest"],
"input_modes": ["text", "git-diff", "url"],
"output_modes": ["text/markdown", "text/x-diff"],
},
mime_type="application/vnd.synapse.agent-card+json",
description="Self-description for agents that understand agent cards.",
)
```

Peers discover and fetch advertised artifacts with system endpoints:

```python
from synapse_p2p import Client

client = Client.from_peer(peer)

artifacts = await client.call("_synapse.artifacts")
# [{"name": "agent-card", "mime_type": "application/vnd.synapse.agent-card+json", ...}]

agent_card = await client.call("_synapse.artifact.get", "agent-card")
print(agent_card["mime_type"])
print(agent_card["content"])
```

MIME types are conventions for consumers:

- `application/vnd.synapse.agent-card+json` — a Synapse/A2A-style self-description
- `application/vnd.synapse.capability-schema+json` — capability-specific input/output schema
- `text/markdown`, `application/pdf`, `image/png`, `text/x-diff` — normal document/file types

For now, artifacts are small inline documents served over RPC. Large artifacts can advertise external URIs in their content or metadata.

---

## Ask

Register one ask handler:
Expand Down
12 changes: 11 additions & 1 deletion synapse_p2p/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,20 @@
from synapse_p2p.messages import RemoteProcedureCall, RPCError, RPCRequest, RPCResponse
from synapse_p2p.node import Capability, Node
from synapse_p2p.serializers import BaseRPCSerializer, MessagePackRPCSerializer
from synapse_p2p.types import Broadcast, BroadcastReply, Connection, NodeKind, Peer
from synapse_p2p.types import (
AdvertisedArtifact,
Broadcast,
BroadcastReply,
Connection,
NodeKind,
Peer,
ServedArtifact,
)

logger.disable("synapse_p2p")

__all__ = [
"AdvertisedArtifact",
"Capability",
"BaseRPCSerializer",
"Broadcast",
Expand All @@ -42,6 +51,7 @@
"RPCRequest",
"RPCResponse",
"RemoteProcedureCall",
"ServedArtifact",
"__logo__",
"__version__",
]
83 changes: 83 additions & 0 deletions synapse_p2p/node.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import asyncio
import base64
import contextlib
import hashlib
import inspect
import json
import time
import uuid
from collections.abc import Awaitable, Callable, Coroutine
Expand All @@ -25,6 +28,7 @@
Connection,
NodeKind,
Peer,
ServedArtifact,
build_connection_from_peer_name,
)
from synapse_p2p.utils import random_hash
Expand Down Expand Up @@ -98,6 +102,7 @@ def __init__(
self.peer_timeout = peer_timeout
self.peers: dict[str, Peer] = {}
self.broadcast_replies: dict[str, list[BroadcastReply]] = {}
self.artifact_directory: dict[str, ServedArtifact] = {}
self.lifecycle_handlers: dict[str, list[Callable[[Any], Coroutine[Any, Any, None]]]] = {}
self.endpoint_directory: dict[str, Callable] = {}
self.endpoint_metadata: dict[str, EndpointMetadata] = {}
Expand Down Expand Up @@ -135,6 +140,70 @@ def ask(self, wrapped: AskHandler) -> AskHandler:
self._ask_handler = wrapped
return wrapped

def artifact(
self,
name: str,
content: Any,
*,
mime_type: str = "application/json",
kind: str = "metadata",
description: str = "",
encoding: str | None = None,
metadata: dict[str, Any] | None = None,
) -> ServedArtifact:
"""Advertise a small inline artifact that peers can fetch over RPC.

Synapse does not interpret the MIME type. It advertises and serves the
document so higher-level agents can decide whether they understand it.
"""
artifact = self._build_artifact(
name=name,
content=content,
mime_type=mime_type,
kind=kind,
description=description,
encoding=encoding,
metadata=metadata or {},
)
self.artifact_directory[name] = artifact
return artifact

def _build_artifact(
self,
*,
name: str,
content: Any,
mime_type: str,
kind: str,
description: str,
encoding: str | None,
metadata: dict[str, Any],
) -> ServedArtifact:
if isinstance(content, bytes):
raw = content
served_content: Any = base64.b64encode(content).decode("ascii")
artifact_encoding = encoding or "base64"
elif isinstance(content, str):
raw = content.encode()
served_content = content
artifact_encoding = encoding or "text"
else:
raw = json.dumps(content, sort_keys=True, separators=(",", ":")).encode()
served_content = content
artifact_encoding = encoding or "json"

return ServedArtifact(
name=name,
mime_type=mime_type,
content=served_content,
kind=kind,
description=description,
encoding=artifact_encoding,
size=len(raw),
sha256=hashlib.sha256(raw).hexdigest(),
metadata=dict(metadata),
)

def run(self) -> None:
"""Run the node."""
print(__logo__)
Expand Down Expand Up @@ -495,6 +564,20 @@ async def methods() -> list[dict[str, str | bool]]:
async def peers() -> list[dict]:
return [peer.to_dict() for peer in self.peers.values()]

@self.endpoint("_synapse.artifacts", publish=False)
async def artifacts() -> list[dict[str, Any]]:
return [
artifact.descriptor().to_dict()
for artifact in self.artifact_directory.values()
]

@self.endpoint("_synapse.artifact.get", publish=False)
async def artifact_get(name: str) -> dict[str, Any]:
artifact = self.artifact_directory.get(name)
if artifact is None:
raise InvalidMessageError(f"unknown artifact: {name}")
return artifact.to_dict()

@self.endpoint("_synapse.join", publish=False)
async def join(peer: dict) -> dict:
incoming = Peer.from_dict(peer)
Expand Down
62 changes: 62 additions & 0 deletions synapse_p2p/tests/test_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,24 @@ async def heartbeat():
assert task.period == 5


def test_artifact_registers_descriptor(node):
artifact = node.artifact(
"agent-card",
{"name": "reviewer", "capabilities": ["code-review"]},
mime_type="application/vnd.synapse.agent-card+json",
description="Self-description for this node",
)

descriptor = artifact.descriptor().to_dict()

assert artifact.name == "agent-card"
assert artifact.encoding == "json"
assert artifact.content == {"name": "reviewer", "capabilities": ["code-review"]}
assert artifact.size is not None
assert artifact.sha256 is not None
assert "content" not in descriptor


async def _send_message(node: Node, message) -> RPCResponse:
tcp = await asyncio.start_server(node.handle_data, node.bind, node.port)
host, port = tcp.sockets[0].getsockname()[:2]
Expand Down Expand Up @@ -198,6 +216,50 @@ async def handle(reader, writer):
await Client(host, port, timeout=0.01).call("slow")


@pytest.mark.asyncio
async def test_synapse_artifacts_endpoints():
node = Node(bind="127.0.0.1")
node.artifact(
"agent-card",
{"name": "reviewer", "capabilities": ["code-review"]},
mime_type="application/vnd.synapse.agent-card+json",
)

artifacts = await _send_rpc(node, RPCRequest(id="list", endpoint="_synapse.artifacts"))
assert artifacts.ok is True
assert artifacts.result == [
{
"name": "agent-card",
"mime_type": "application/vnd.synapse.agent-card+json",
"kind": "metadata",
"description": "",
"encoding": "json",
"size": 50,
"sha256": "dff355dc1bfeebbd133f25b00a8c169d25c9afef8d3f45076f0140668bed3920",
"metadata": {},
}
]

fetched = await _send_rpc(
node,
RPCRequest(id="get", endpoint="_synapse.artifact.get", args=["agent-card"]),
)
assert fetched.ok is True
assert fetched.result["content"] == {"name": "reviewer", "capabilities": ["code-review"]}


@pytest.mark.asyncio
async def test_synapse_artifact_get_unknown_name_returns_bad_request():
node = Node(bind="127.0.0.1")
response = await _send_rpc(
node,
RPCRequest(id="get", endpoint="_synapse.artifact.get", args=["missing"]),
)
assert response.ok is False
assert response.error is not None
assert response.error.code == "bad_request"


@pytest.mark.asyncio
async def test_node_start_and_stop_lifecycle():
node = Node(bind="127.0.0.1")
Expand Down
4 changes: 4 additions & 0 deletions synapse_p2p/tests/test_public_api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from synapse_p2p import (
AdvertisedArtifact,
BaseRPCSerializer,
Broadcast,
BroadcastReply,
Expand All @@ -12,10 +13,12 @@
RPCError,
RPCRequest,
RPCResponse,
ServedArtifact,
)


def test_substrate_types_are_exported_from_top_level_package():
assert AdvertisedArtifact is not None
assert BaseRPCSerializer is not None
assert Broadcast is not None
assert BroadcastReply is not None
Expand All @@ -29,3 +32,4 @@ def test_substrate_types_are_exported_from_top_level_package():
assert RPCRequest is not None
assert RPCResponse is not None
assert RemoteProcedureCall is RPCRequest
assert ServedArtifact is not None
55 changes: 55 additions & 0 deletions synapse_p2p/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,61 @@ def to_dict(self) -> dict[str, Any]:
return {"nonce": self.nonce, "peer": self.peer.to_dict(), "result": self.result}


@dataclass(slots=True)
class AdvertisedArtifact:
"""Descriptor for a small document or resource a node can serve to peers."""

name: str
mime_type: str
kind: str = "metadata"
description: str = ""
encoding: str = "json"
size: int | None = None
sha256: str | None = None
metadata: dict[str, Any] = field(default_factory=dict)

@classmethod
def from_dict(cls, data: dict[str, Any]) -> AdvertisedArtifact:
return cls(**data)

def to_dict(self) -> dict[str, Any]:
return asdict(self)


@dataclass(slots=True)
class ServedArtifact:
"""Artifact descriptor plus inline content returned by a node."""

name: str
mime_type: str
content: Any
kind: str = "metadata"
description: str = ""
encoding: str = "json"
size: int | None = None
sha256: str | None = None
metadata: dict[str, Any] = field(default_factory=dict)

@classmethod
def from_dict(cls, data: dict[str, Any]) -> ServedArtifact:
return cls(**data)

def to_dict(self) -> dict[str, Any]:
return asdict(self)

def descriptor(self) -> AdvertisedArtifact:
return AdvertisedArtifact(
name=self.name,
mime_type=self.mime_type,
kind=self.kind,
description=self.description,
encoding=self.encoding,
size=self.size,
sha256=self.sha256,
metadata=dict(self.metadata),
)


def build_connection_from_peer_name(peer_name: tuple[str, int]) -> Connection:
ip, port = peer_name
identifier = sha256(f"{ip}:{port}".encode()).hexdigest()[:8]
Expand Down
Loading