diff --git a/alphaswarm/agent/clients/cron_job.py b/alphaswarm/agent/clients/cron_job.py index 16b322b8..9c78476d 100644 --- a/alphaswarm/agent/clients/cron_job.py +++ b/alphaswarm/agent/clients/cron_job.py @@ -12,17 +12,41 @@ def __init__( agent: AlphaSwarmAgent, client_id: str, interval_seconds: int, - message_generator: Callable[[], str], + message_generator: Callable[[], str], # TODO: consider returning Optional[str] and not call agent with None response_handler: Callable[[str], None] = print, + should_process: Callable[[str], bool] = lambda _: True, + skip_message: Callable[[str], None] = lambda _: None, + max_history: int = 1, ) -> None: - super().__init__(agent, client_id) + """ + Initialize CronJobClient with conditional processing. + + Args: + agent: The AlphaSwarmAgent instance + client_id: Unique identifier for the client + interval_seconds: Interval between message generation + message_generator: Function that generates messages + response_handler: Function to handle agent responses + should_process: Function that decides if a message should be processed by agent + skip_message: Function to handle skipped messages + max_history: Maximum number of messages to keep in history + """ + super().__init__(agent, client_id, max_history=max_history) self.interval_seconds = interval_seconds self.message_generator = message_generator + self.should_process = should_process self.response_handler = response_handler + self.skip_message = skip_message async def get_message(self) -> Context: await asyncio.sleep(self.interval_seconds) message = self.message_generator() + + # If message should not be processed, handle it and return a quit signal + if not self.should_process(message): + self.skip_message(message) + return Context(context=None, message="quit") + return Context(context=None, message=message) async def on_agent_response(self, ctx: Context, message: ChatMessage) -> None: @@ -36,3 +60,19 @@ async def on_start(self) -> None: async def on_stop(self) -> None: self.response_handler(f"Cron Job {self.id} stopped") + + async def start(self) -> None: + """Override start to continue after skipped messages""" + if self._lock: + raise RuntimeError("Client already started") + await self.on_start() + self._lock = asyncio.Lock() + try: + while True: + context = await self.get_message() + if context.message.lower() == "quit": + continue # Continue instead of break to keep the cron job running + await self._process_message(context) + await asyncio.sleep(1) + finally: + await self.stop() diff --git a/alphaswarm/tools/alchemy/__init__.py b/alphaswarm/tools/alchemy/__init__.py index 110ddd56..d73d535a 100644 --- a/alphaswarm/tools/alchemy/__init__.py +++ b/alphaswarm/tools/alchemy/__init__.py @@ -1 +1,2 @@ from .alchemy_price_history import AlchemyPriceHistoryBySymbol, AlchemyPriceHistoryByAddress +from .alchemy_price_change import TokenPriceChangeCalculator diff --git a/alphaswarm/tools/alchemy/alchemy_price_change.py b/alphaswarm/tools/alchemy/alchemy_price_change.py new file mode 100644 index 00000000..6ad94e3b --- /dev/null +++ b/alphaswarm/tools/alchemy/alchemy_price_change.py @@ -0,0 +1,90 @@ +from datetime import datetime +from decimal import Decimal +from typing import Optional + +from alphaswarm.services.alchemy import AlchemyClient +from alphaswarm.tools.alchemy import AlchemyPriceHistoryByAddress +from pydantic import BaseModel +from smolagents import Tool + + +class TokenPriceChange(BaseModel): + token_address: str + network: str + start_time: datetime + end_time: datetime + start_price: float + end_price: float + percent_change: float + n_samples: int + frequency: str + + +class TokenPriceChangeCalculator(Tool): + name = "TokenPriceChangeCalculator" + description = "Calculate the percentage price change for a token over a specified number of samples and frequency" + inputs = { + "token_address": { + "type": "string", + "description": "The token address to analyze", + }, + "frequency": { + "type": "string", + "description": "Time interval between data points", + "enum": ["5m", "1h", "1d"], + }, + "n_samples": { + "type": "integer", + "description": "Number of samples to analyze (must be >= 2)", + "minimum": 2, + }, + "network": { + "type": "string", + "description": "Network where the token exists (e.g. eth-mainnet, base-mainnet)", + "default": "eth-mainnet", + "nullable": True, + }, + } + output_type = "object" + + def __init__(self, alchemy_client: Optional[AlchemyClient] = None): + super().__init__() + self.price_history_tool = AlchemyPriceHistoryByAddress(alchemy_client) + + def _calculate_percent_change(self, start_price: Decimal, end_price: Decimal) -> float: + return float((end_price - start_price) / start_price * 100) + + def forward( + self, token_address: str, frequency: str, n_samples: int, network: str = "eth-mainnet" + ) -> TokenPriceChange: + # Calculate the required history in days based on frequency and n_samples + interval_to_minutes = {"5m": 5, "1h": 60, "1d": 1440} + minutes_needed = interval_to_minutes[frequency] * n_samples + days_needed = (minutes_needed // 1440) + 1 # Round up to nearest day + + # Use the existing price history tool + price_history = self.price_history_tool.forward( + address=token_address, network=network, interval=frequency, history=days_needed + ) + + # Ensure we have enough data points + prices = price_history.data[-n_samples:] # Get the most recent n_samples + if len(prices) < n_samples: + raise ValueError(f"Requested {n_samples} samples but only got {len(prices)}") + + # Calculate percent changes + start_price = prices[0].value + end_price = prices[-1].value + percent_change = self._calculate_percent_change(start_price, end_price) + + return TokenPriceChange( + token_address=token_address, + network=network, + start_time=prices[0].timestamp, + end_time=prices[-1].timestamp, + start_price=float(start_price), + end_price=float(end_price), + percent_change=percent_change, + n_samples=n_samples, + frequency=frequency, + ) diff --git a/alphaswarm/tools/price_tool.py b/alphaswarm/tools/price_tool.py index 45fa10aa..d2e26d0e 100644 --- a/alphaswarm/tools/price_tool.py +++ b/alphaswarm/tools/price_tool.py @@ -36,6 +36,12 @@ def __del__(self) -> None: """Cleanup the session when the tool is destroyed""" self.session.close() + def fetch_price(self, *, address: str, chain: str) -> requests.Response: + url = f"{self.base_url}/simple/token_price/{chain}" + params = {"contract_addresses": address, "vs_currencies": "usd", "include_24hr_change": "true"} + + return self.session.get(url, params=params, timeout=10) + def forward(self, address: str, chain: str) -> str: """ Fetch current price and 24h change for a given token @@ -48,10 +54,7 @@ def forward(self, address: str, chain: str) -> str: # Normalize address to lowercase for consistent comparison address = address.lower() - url = f"{self.base_url}/simple/token_price/{chain}" - params = {"contract_addresses": address, "vs_currencies": "usd", "include_24hr_change": "true"} - - response = self.session.get(url, params=params, timeout=10) + response = self.fetch_price(address=address, chain=chain) if response.status_code != 200: return f"Error: Could not fetch price for {address} (Status: {response.status_code})" diff --git a/examples/agents/price_change_observer.py b/examples/agents/price_change_observer.py new file mode 100644 index 00000000..20987e1a --- /dev/null +++ b/examples/agents/price_change_observer.py @@ -0,0 +1,109 @@ +import asyncio +import datetime +import logging +from typing import List + +import dotenv +from alphaswarm.agent.agent import AlphaSwarmAgent +from alphaswarm.agent.clients import CronJobClient +from alphaswarm.services.alchemy import AlchemyClient +from alphaswarm.tools.alchemy import TokenPriceChangeCalculator + + +class PriceChangeObserver(AlphaSwarmAgent): + def __init__( + self, + token_addresses: List[str], + chain: str = "base-mainnet", + price_change_interval: str = "1h", + price_pct_chg_thresh: float = 2.0, + ) -> None: + """ + A basic agent that observes price changes for a set of token addresses. + This agent does not use any LLM calls, instead it will just execute a sequence of tool calls. + It can be adapted to use any LLM by adding logic to thea `process_message` method. + + Args: + token_addresses: List of token addresses to observe + chain: Chain to observe + price_change_interval: Interval to observe + price_pct_chg_thresh: Percentage change threshold to observe + """ + + self.alchemy_client = AlchemyClient.from_env() + self.price_change_calculator = TokenPriceChangeCalculator(self.alchemy_client) + self.token_addresses = token_addresses + self.chain = chain + self.price_change_interval = price_change_interval + self.price_pct_chg_thresh = price_pct_chg_thresh + + hints = "Have any of the price changes increased or decreased (+/- 1%) since the last observation? Respond with either 'yes' or 'no'." + + super().__init__(model_id="gpt-4o-mini", tools=[], hints=hints) + + def get_price_alerts(self) -> str: + """ + Get the price alerts for the token addresses. + """ + price_alerts = [] + for address in self.token_addresses: + price_history = self.price_change_calculator.forward( + token_address=address, + frequency=self.price_change_interval, + n_samples=2, + network=self.chain, + ) + + if abs(price_history.percent_change) >= self.price_pct_chg_thresh: + price_alerts.append( + f"{address}: {price_history.percent_change}% change in the last {self.price_change_interval}." + ) + + logging.info(f"{len(price_alerts)} price alerts found.") + if len(price_alerts) > 0: + alert_message = f"Price changes have been observed for the following tokens as of {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: " + alert_message += "\n" + "\n".join(price_alerts) + return alert_message + else: + return "" + + # async def process_message(self, message: str) -> Optional[str]: + # """ + # You can override the `process_message` method to specify how the agent will respond to the price alerts. + # When this method is not overridden, the default LLM-based agent configuration will be used to respond. + # """ + # logging.info(f"Agent received alerts:\n{message}") + # pass + + +async def main() -> None: + dotenv.load_dotenv() + logging.basicConfig(level=logging.INFO) + token_addresses = [ + "0x4F9Fd6Be4a90f2620860d680c0d4d5Fb53d1A825", # AIXBT + "0x0b3e328455c4059EEb9e3f84b5543F74E24e7E1b", # VIRTUAL + "0x731814e491571A2e9eE3c5b1F7f3b962eE8f4870", # VADER + ] + + agent = PriceChangeObserver( + token_addresses=token_addresses, + chain="base-mainnet", + price_change_interval="5m", # '5m', '1h', or '1d' + price_pct_chg_thresh=0.02, + ) + + cron_client = CronJobClient( + agent=agent, + client_id="Price Change Observer", + interval_seconds=5, + response_handler=lambda _: None, + message_generator=agent.get_price_alerts, + should_process=lambda alerts: len(alerts) > 0, + skip_message=lambda _: None, + max_history=2, + ) + await asyncio.gather(cron_client.start()) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/tests/integration/tools/alchemy/test_alchemy_price_change_calculator.py b/tests/integration/tools/alchemy/test_alchemy_price_change_calculator.py new file mode 100644 index 00000000..b702b59e --- /dev/null +++ b/tests/integration/tools/alchemy/test_alchemy_price_change_calculator.py @@ -0,0 +1,14 @@ +from alphaswarm.services.alchemy import AlchemyClient +from alphaswarm.tools.alchemy.alchemy_price_change import TokenPriceChangeCalculator + + +def test_get_price_change(alchemy_client: AlchemyClient) -> None: + tool = TokenPriceChangeCalculator(alchemy_client) + result = tool.forward( + token_address="0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599", # WBTC + frequency="5m", + n_samples=2, + network="eth-mainnet", + ) + + assert abs(result.percent_change) > 0