Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 42 additions & 2 deletions alphaswarm/agent/clients/cron_job.py
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main thing I might want to discuss is:
Should we keep cron_job.py as-is and create a new client to add logic? For example we could call this conditional_cron_job.py instead.
Another point is that I'd like to also create something like a swarm_cron_job.py that allows the user to specify something like a chain of agents to handle processing, so we could have, e.g. PriceForecaster -> TradeAssistant

Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,41 @@ def __init__(
agent: AlphaSwarmAgent,
client_id: str,
interval_seconds: int,
message_generator: Callable[[], str],
message_generator: Callable[[], str], # TODO: consider returning Optional[str] and not call agent with None
response_handler: Callable[[str], None] = print,
should_process: Callable[[str], bool] = lambda _: True,
skip_message: Callable[[str], None] = lambda _: None,
max_history: int = 1,
) -> None:
super().__init__(agent, client_id)
"""
Initialize CronJobClient with conditional processing.

Args:
agent: The AlphaSwarmAgent instance
client_id: Unique identifier for the client
interval_seconds: Interval between message generation
message_generator: Function that generates messages
response_handler: Function to handle agent responses
should_process: Function that decides if a message should be processed by agent
skip_message: Function to handle skipped messages
max_history: Maximum number of messages to keep in history
"""
super().__init__(agent, client_id, max_history=max_history)
self.interval_seconds = interval_seconds
self.message_generator = message_generator
self.should_process = should_process
self.response_handler = response_handler
self.skip_message = skip_message

async def get_message(self) -> Context:
await asyncio.sleep(self.interval_seconds)
message = self.message_generator()

# If message should not be processed, handle it and return a quit signal
if not self.should_process(message):
self.skip_message(message)
return Context(context=None, message="quit")

return Context(context=None, message=message)

async def on_agent_response(self, ctx: Context, message: ChatMessage) -> None:
Expand All @@ -36,3 +60,19 @@ async def on_start(self) -> None:

async def on_stop(self) -> None:
self.response_handler(f"Cron Job {self.id} stopped")

async def start(self) -> None:
"""Override start to continue after skipped messages"""
if self._lock:
raise RuntimeError("Client already started")
await self.on_start()
self._lock = asyncio.Lock()
try:
while True:
context = await self.get_message()
if context.message.lower() == "quit":
continue # Continue instead of break to keep the cron job running
await self._process_message(context)
await asyncio.sleep(1)
finally:
await self.stop()
1 change: 1 addition & 0 deletions alphaswarm/tools/alchemy/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .alchemy_price_history import AlchemyPriceHistoryBySymbol, AlchemyPriceHistoryByAddress
from .alchemy_price_change import TokenPriceChangeCalculator
90 changes: 90 additions & 0 deletions alphaswarm/tools/alchemy/alchemy_price_change.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
from datetime import datetime
from decimal import Decimal
from typing import Optional

from alphaswarm.services.alchemy import AlchemyClient
from alphaswarm.tools.alchemy import AlchemyPriceHistoryByAddress
from pydantic import BaseModel
from smolagents import Tool


class TokenPriceChange(BaseModel):
token_address: str
network: str
start_time: datetime
end_time: datetime
start_price: float
end_price: float
percent_change: float
n_samples: int
frequency: str


class TokenPriceChangeCalculator(Tool):
name = "TokenPriceChangeCalculator"
description = "Calculate the percentage price change for a token over a specified number of samples and frequency"
inputs = {
"token_address": {
"type": "string",
"description": "The token address to analyze",
},
"frequency": {
"type": "string",
"description": "Time interval between data points",
"enum": ["5m", "1h", "1d"],
},
"n_samples": {
"type": "integer",
"description": "Number of samples to analyze (must be >= 2)",
"minimum": 2,
},
"network": {
"type": "string",
"description": "Network where the token exists (e.g. eth-mainnet, base-mainnet)",
"default": "eth-mainnet",
"nullable": True,
},
}
output_type = "object"

def __init__(self, alchemy_client: Optional[AlchemyClient] = None):
super().__init__()
self.price_history_tool = AlchemyPriceHistoryByAddress(alchemy_client)

def _calculate_percent_change(self, start_price: Decimal, end_price: Decimal) -> float:
return float((end_price - start_price) / start_price * 100)

def forward(
self, token_address: str, frequency: str, n_samples: int, network: str = "eth-mainnet"
) -> TokenPriceChange:
# Calculate the required history in days based on frequency and n_samples
interval_to_minutes = {"5m": 5, "1h": 60, "1d": 1440}
minutes_needed = interval_to_minutes[frequency] * n_samples
days_needed = (minutes_needed // 1440) + 1 # Round up to nearest day

# Use the existing price history tool
price_history = self.price_history_tool.forward(
address=token_address, network=network, interval=frequency, history=days_needed
)

# Ensure we have enough data points
prices = price_history.data[-n_samples:] # Get the most recent n_samples
if len(prices) < n_samples:
raise ValueError(f"Requested {n_samples} samples but only got {len(prices)}")

# Calculate percent changes
start_price = prices[0].value
end_price = prices[-1].value
percent_change = self._calculate_percent_change(start_price, end_price)

return TokenPriceChange(
token_address=token_address,
network=network,
start_time=prices[0].timestamp,
end_time=prices[-1].timestamp,
start_price=float(start_price),
end_price=float(end_price),
percent_change=percent_change,
n_samples=n_samples,
frequency=frequency,
)
11 changes: 7 additions & 4 deletions alphaswarm/tools/price_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ def __del__(self) -> None:
"""Cleanup the session when the tool is destroyed"""
self.session.close()

def fetch_price(self, *, address: str, chain: str) -> requests.Response:
url = f"{self.base_url}/simple/token_price/{chain}"
params = {"contract_addresses": address, "vs_currencies": "usd", "include_24hr_change": "true"}

return self.session.get(url, params=params, timeout=10)

def forward(self, address: str, chain: str) -> str:
"""
Fetch current price and 24h change for a given token
Expand All @@ -48,10 +54,7 @@ def forward(self, address: str, chain: str) -> str:
# Normalize address to lowercase for consistent comparison
address = address.lower()

url = f"{self.base_url}/simple/token_price/{chain}"
params = {"contract_addresses": address, "vs_currencies": "usd", "include_24hr_change": "true"}

response = self.session.get(url, params=params, timeout=10)
response = self.fetch_price(address=address, chain=chain)

if response.status_code != 200:
return f"Error: Could not fetch price for {address} (Status: {response.status_code})"
Expand Down
109 changes: 109 additions & 0 deletions examples/agents/price_change_observer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import asyncio
import datetime
import logging
from typing import List

import dotenv
from alphaswarm.agent.agent import AlphaSwarmAgent
from alphaswarm.agent.clients import CronJobClient
from alphaswarm.services.alchemy import AlchemyClient
from alphaswarm.tools.alchemy import TokenPriceChangeCalculator


class PriceChangeObserver(AlphaSwarmAgent):
def __init__(
self,
token_addresses: List[str],
chain: str = "base-mainnet",
price_change_interval: str = "1h",
price_pct_chg_thresh: float = 2.0,
) -> None:
"""
A basic agent that observes price changes for a set of token addresses.
This agent does not use any LLM calls, instead it will just execute a sequence of tool calls.
It can be adapted to use any LLM by adding logic to thea `process_message` method.

Args:
token_addresses: List of token addresses to observe
chain: Chain to observe
price_change_interval: Interval to observe
price_pct_chg_thresh: Percentage change threshold to observe
"""

self.alchemy_client = AlchemyClient.from_env()
self.price_change_calculator = TokenPriceChangeCalculator(self.alchemy_client)
self.token_addresses = token_addresses
self.chain = chain
self.price_change_interval = price_change_interval
self.price_pct_chg_thresh = price_pct_chg_thresh

hints = "Have any of the price changes increased or decreased (+/- 1%) since the last observation? Respond with either 'yes' or 'no'."

super().__init__(model_id="gpt-4o-mini", tools=[], hints=hints)

def get_price_alerts(self) -> str:
"""
Get the price alerts for the token addresses.
"""
price_alerts = []
for address in self.token_addresses:
price_history = self.price_change_calculator.forward(
token_address=address,
frequency=self.price_change_interval,
n_samples=2,
network=self.chain,
)

if abs(price_history.percent_change) >= self.price_pct_chg_thresh:
price_alerts.append(
f"{address}: {price_history.percent_change}% change in the last {self.price_change_interval}."
)

logging.info(f"{len(price_alerts)} price alerts found.")
if len(price_alerts) > 0:
alert_message = f"Price changes have been observed for the following tokens as of {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: "
alert_message += "\n" + "\n".join(price_alerts)
return alert_message
else:
return ""

# async def process_message(self, message: str) -> Optional[str]:
# """
# You can override the `process_message` method to specify how the agent will respond to the price alerts.
# When this method is not overridden, the default LLM-based agent configuration will be used to respond.
# """
# logging.info(f"Agent received alerts:\n{message}")
# pass


async def main() -> None:
dotenv.load_dotenv()
logging.basicConfig(level=logging.INFO)
token_addresses = [
"0x4F9Fd6Be4a90f2620860d680c0d4d5Fb53d1A825", # AIXBT
"0x0b3e328455c4059EEb9e3f84b5543F74E24e7E1b", # VIRTUAL
"0x731814e491571A2e9eE3c5b1F7f3b962eE8f4870", # VADER
]

agent = PriceChangeObserver(
token_addresses=token_addresses,
chain="base-mainnet",
price_change_interval="5m", # '5m', '1h', or '1d'
price_pct_chg_thresh=0.02,
)

cron_client = CronJobClient(
agent=agent,
client_id="Price Change Observer",
interval_seconds=5,
response_handler=lambda _: None,
message_generator=agent.get_price_alerts,
should_process=lambda alerts: len(alerts) > 0,
skip_message=lambda _: None,
max_history=2,
)
await asyncio.gather(cron_client.start())


if __name__ == "__main__":
asyncio.run(main())
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from alphaswarm.services.alchemy import AlchemyClient
from alphaswarm.tools.alchemy.alchemy_price_change import TokenPriceChangeCalculator


def test_get_price_change(alchemy_client: AlchemyClient) -> None:
tool = TokenPriceChangeCalculator(alchemy_client)
result = tool.forward(
token_address="0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599", # WBTC
frequency="5m",
n_samples=2,
network="eth-mainnet",
)

assert abs(result.percent_change) > 0