Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ It also ships with a CLI tool to monitor your swarms:
- [Advertised artifacts and agent cards](#advertised-artifacts-and-agent-cards)
- [Ask](#ask)
- [Broadcast conversations](#broadcast)
- [Periodic tasks](#periodic-tasks)
- [Heartbeats and liveness](#heartbeats)
- [CLI](#cli)
- [`sn watch`](#sn-watch)
Expand Down Expand Up @@ -387,6 +388,60 @@ Why this matters:

---

## Periodic tasks

Use `@node.periodic(...)` to run an async function on an interval, cron expression, or solar event while the node is running. Periodic tasks start with `await node.start()` or `node.run()`, and `await node.stop()` cancels scheduled and in-flight runs.

```python
from synapse_p2p import Node, cron, every, solar

node = Node(name="worker")


@node.periodic(every(seconds=30))
async def refresh_cache() -> None:
print("refreshing cache")


@node.periodic(cron("0 9 * * mon-fri", tz="Europe/London"))
async def weekday_digest() -> None:
print("weekday digest")


@node.periodic(solar("civil_twilight_begin", latitude=51.5, longitude=-0.1, tz="Europe/London"))
async def dawn_check() -> None:
print("civil twilight has begun")


node.run()
```

For simple intervals, a number is shorthand for seconds:

```python
@node.periodic(30) # equivalent to every(seconds=30)
async def refresh_cache() -> None:
...
```

Built-in schedules:

- `every(seconds=..., minutes=..., hours=..., days=...)`
- `cron("*/15 * * * *", tz="UTC")`
- `solar("sunrise", latitude=..., longitude=..., tz="UTC")`

Solar events include `sunrise`, `sunset`, `solar_noon`, `civil_twilight_begin`, `civil_twilight_end`, `nautical_twilight_begin`, `nautical_twilight_end`, `astronomical_twilight_begin`, and `astronomical_twilight_end`.

Notes:

- The decorated function must be `async def` and take no arguments.
- The first run starts immediately when the node starts; later runs follow the schedule.
- Long-running tasks can overlap if a previous run is still active when the next scheduled time arrives.
- Exceptions are logged and do not stop future runs.
- Tasks added after `await node.start()` are scheduled immediately.

---

## Heartbeats

Nodes heartbeat known peers and mark stale peers offline.
Expand Down
4 changes: 2 additions & 2 deletions example_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
app = Node(port=9999)


@app.background(3)
@app.periodic(3)
async def heartbeat():
print("Running background task every 3 seconds")
print("Running periodic task every 3 seconds")


@app.endpoint("sum")
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ keywords = ["agents", "agent-to-agent", "multi-agent", "rpc", "p2p", "msgpack",
authors = [{ name = "Daniel van Flymen", email = "vanflymen@gmail.com" }]
requires-python = ">=3.10"
dependencies = [
"astral>=3.2",
"croniter>=6.0",
"loguru",
"msgpack",
"rich>=13",
Expand Down
7 changes: 7 additions & 0 deletions synapse_p2p/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from synapse_p2p.client import Client
from synapse_p2p.messages import RemoteProcedureCall, RPCError, RPCRequest, RPCResponse
from synapse_p2p.node import Capability, Node
from synapse_p2p.schedules import CronSchedule, IntervalSchedule, SolarSchedule, cron, every, solar
from synapse_p2p.serializers import BaseRPCSerializer, MessagePackRPCSerializer
from synapse_p2p.types import (
AdvertisedArtifact,
Expand All @@ -43,6 +44,7 @@
"BroadcastReply",
"Client",
"Connection",
"CronSchedule",
"MessagePackRPCSerializer",
"Node",
"NodeKind",
Expand All @@ -52,6 +54,11 @@
"RPCResponse",
"RemoteProcedureCall",
"ServedArtifact",
"IntervalSchedule",
"SolarSchedule",
"cron",
"every",
"solar",
"__logo__",
"__version__",
]
63 changes: 0 additions & 63 deletions synapse_p2p/background.py

This file was deleted.

42 changes: 23 additions & 19 deletions synapse_p2p/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,22 @@
from loguru import logger

from synapse_p2p import __logo__
from synapse_p2p.background import BackgroundTaskHandler
from synapse_p2p.client import Client
from synapse_p2p.exceptions import InvalidMessageError
from synapse_p2p.framing import read_frame, write_frame
from synapse_p2p.mdns import MdnsDiscovery
from synapse_p2p.messages import RPCError, RPCRequest, RPCResponse
from synapse_p2p.network import advertised_address
from synapse_p2p.periodic import PeriodicTaskHandler
from synapse_p2p.schedules import Schedule, every
from synapse_p2p.serializers import BaseRPCSerializer, MessagePackRPCSerializer
from synapse_p2p.types import (
BackgroundTask,
Broadcast,
BroadcastReply,
Connection,
NodeKind,
Peer,
PeriodicTask,
ServedArtifact,
build_connection_from_peer_name,
)
Expand Down Expand Up @@ -107,7 +108,7 @@ def __init__(
self.endpoint_directory: dict[str, Callable] = {}
self.endpoint_metadata: dict[str, EndpointMetadata] = {}
self.max_upload_size = max_upload_size
self.background_executor = BackgroundTaskHandler()
self.periodic_executor = PeriodicTaskHandler()
self.serializer_class = serializer_class
self._listener: asyncio.Server | None = None
self._register_system_endpoints()
Expand Down Expand Up @@ -309,9 +310,9 @@ def _print_startup(self) -> None:
for endpoint in self.endpoint_directory:
print(f"- {endpoint}")

if self.background_executor.tasks:
print("\nBackground Tasks:")
for task in self.background_executor.tasks:
if self.periodic_executor.tasks:
print("\nPeriodic Tasks:")
for task in self.periodic_executor.tasks:
print(f"- {task.name} ({task.period}s)")
print()

Expand All @@ -325,14 +326,14 @@ async def start(self) -> asyncio.Server:
if socket is not None:
_bound_address, self.port = socket.getsockname()[:2]
self.address = advertised_address(self.bind, self.advertise)
self.background_executor.start()
self.periodic_executor.start()
if self.mdns is not None:
await self.mdns.start()
return self._listener

async def stop(self) -> None:
"""Stop accepting connections and cancel background tasks."""
await self.background_executor.stop()
"""Stop accepting connections and cancel periodic tasks."""
await self.periodic_executor.stop()
if self.mdns is not None:
await self.mdns.stop()
if self._listener is None:
Expand Down Expand Up @@ -440,18 +441,18 @@ async def _reap_stale_peers(self) -> None:
def _register_lifecycle_tasks(self) -> None:
if self.heartbeat_interval is None:
return
self.background_executor.add_task(
BackgroundTask(
self.periodic_executor.add_task(
PeriodicTask(
name="_synapse.heartbeat",
callable=self._send_heartbeats,
period=self.heartbeat_interval,
schedule=every(seconds=self.heartbeat_interval),
)
)
self.background_executor.add_task(
BackgroundTask(
self.periodic_executor.add_task(
PeriodicTask(
name="_synapse.reap_stale_peers",
callable=self._reap_stale_peers,
period=self.heartbeat_interval,
schedule=every(seconds=self.heartbeat_interval),
)
)

Expand Down Expand Up @@ -647,12 +648,15 @@ def decorator(wrapped: Callable) -> Callable:

return decorator

def background(self, period: float) -> Callable:
"""Decorator to schedule a coroutine as a periodic background task."""
def periodic(self, schedule: float | Schedule) -> Callable:
"""Decorator to schedule a coroutine as a periodic task."""
task_schedule = every(seconds=schedule) if isinstance(schedule, int | float) else schedule

def decorator(wrapped: Callable) -> Callable:
self.background_executor.add_task(
BackgroundTask(name=wrapped.__name__, callable=wrapped, period=period)
if not inspect.iscoroutinefunction(wrapped):
raise TypeError("periodic task must be an async function")
self.periodic_executor.add_task(
PeriodicTask(name=wrapped.__name__, callable=wrapped, schedule=task_schedule)
)
return wrapped

Expand Down
72 changes: 72 additions & 0 deletions synapse_p2p/periodic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import asyncio
from datetime import UTC, datetime

from loguru import logger

from synapse_p2p.types import PeriodicTask


class PeriodicTaskHandler:
def __init__(self) -> None:
self.tasks: list[PeriodicTask] = []
self._running: set[asyncio.Task] = set()
self._scheduled: set[asyncio.TimerHandle] = set()
self._started = False

def add_task(self, task: PeriodicTask) -> None:
self.tasks.append(task)
if self._started:
self._schedule(task, immediate=True)

async def _run(self, task: PeriodicTask) -> None:
try:
await task.callable()
except asyncio.CancelledError:
raise
except Exception:
logger.exception("Periodic task {!r} raised", task.name)

def _schedule(self, task: PeriodicTask, *, immediate: bool = False) -> None:
if not self._started:
return

delay = 0.0
if not immediate:
now = datetime.now(UTC)
task.next_run = task.schedule.next_after(task.next_run or now)
delay = max(0.0, (task.next_run - now).total_seconds())

handle: asyncio.TimerHandle | None = None

def run_and_schedule_next() -> None:
if handle is not None:
self._scheduled.discard(handle)

# Keep a strong reference so the task isn't garbage-collected mid-run
# (https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task).
running = asyncio.create_task(self._run(task), name=task.name)
self._running.add(running)
running.add_done_callback(self._running.discard)
self._schedule(task)

handle = asyncio.get_running_loop().call_later(delay, run_and_schedule_next)
self._scheduled.add(handle)

def start(self) -> None:
if self._started:
return
self._started = True
for task in self.tasks:
self._schedule(task, immediate=True)

async def stop(self) -> None:
self._started = False
for handle in self._scheduled:
handle.cancel()
self._scheduled.clear()

for task in self._running:
task.cancel()
if self._running:
await asyncio.gather(*self._running, return_exceptions=True)
self._running.clear()
Loading
Loading