From 36e0b1e51ab0ec2c9dd307c462bf107149bd62c4 Mon Sep 17 00:00:00 2001 From: Dmytro Nikolaiev Date: Wed, 12 Feb 2025 14:49:43 -0500 Subject: [PATCH 1/6] Implement cron_observer example --- alphaswarm/agent/clients/cron_job.py | 40 ++++++++++++++- alphaswarm/tools/price_tool.py | 11 ++-- examples/interaction/cron_observer.py | 74 +++++++++++++++++++++++++++ 3 files changed, 120 insertions(+), 5 deletions(-) create mode 100644 examples/interaction/cron_observer.py diff --git a/alphaswarm/agent/clients/cron_job.py b/alphaswarm/agent/clients/cron_job.py index 16b322b8..bf473117 100644 --- a/alphaswarm/agent/clients/cron_job.py +++ b/alphaswarm/agent/clients/cron_job.py @@ -12,17 +12,39 @@ def __init__( agent: AlphaSwarmAgent, client_id: str, interval_seconds: int, - message_generator: Callable[[], str], + message_generator: Callable[[], str], # TODO: consider returning Optional[str] and not call agent with None response_handler: Callable[[str], None] = print, + should_process: Callable[[str], bool] = lambda _: True, + skip_message: Callable[[str], None] = lambda _: None, ) -> None: + """ + Initialize CronJobClient with conditional processing. + + Args: + agent: The AlphaSwarmAgent instance + client_id: Unique identifier for the client + interval_seconds: Interval between message generation + message_generator: Function that generates messages + response_handler: Function to handle agent responses + should_process: Function that decides if a message should be processed by agent + skip_message: Function to handle skipped messages + """ super().__init__(agent, client_id) self.interval_seconds = interval_seconds self.message_generator = message_generator + self.should_process = should_process self.response_handler = response_handler + self.skip_message = skip_message async def get_message(self) -> Context: await asyncio.sleep(self.interval_seconds) message = self.message_generator() + + # If message should not be processed, handle it and return a quit signal + if not self.should_process(message): + self.skip_message(message) + return Context(context=None, message="quit") + return Context(context=None, message=message) async def on_agent_response(self, ctx: Context, message: ChatMessage) -> None: @@ -36,3 +58,19 @@ async def on_start(self) -> None: async def on_stop(self) -> None: self.response_handler(f"Cron Job {self.id} stopped") + + async def start(self) -> None: + """Override start to continue after skipped messages""" + if self._lock: + raise RuntimeError("Client already started") + await self.on_start() + self._lock = asyncio.Lock() + try: + while True: + context = await self.get_message() + if context.message.lower() == "quit": + continue # Continue instead of break to keep the cron job running + await self._process_message(context) + await asyncio.sleep(1) + finally: + await self.stop() diff --git a/alphaswarm/tools/price_tool.py b/alphaswarm/tools/price_tool.py index 45fa10aa..d2e26d0e 100644 --- a/alphaswarm/tools/price_tool.py +++ b/alphaswarm/tools/price_tool.py @@ -36,6 +36,12 @@ def __del__(self) -> None: """Cleanup the session when the tool is destroyed""" self.session.close() + def fetch_price(self, *, address: str, chain: str) -> requests.Response: + url = f"{self.base_url}/simple/token_price/{chain}" + params = {"contract_addresses": address, "vs_currencies": "usd", "include_24hr_change": "true"} + + return self.session.get(url, params=params, timeout=10) + def forward(self, address: str, chain: str) -> str: """ Fetch current price and 24h change for a given token @@ -48,10 +54,7 @@ def forward(self, address: str, chain: str) -> str: # Normalize address to lowercase for consistent comparison address = address.lower() - url = f"{self.base_url}/simple/token_price/{chain}" - params = {"contract_addresses": address, "vs_currencies": "usd", "include_24hr_change": "true"} - - response = self.session.get(url, params=params, timeout=10) + response = self.fetch_price(address=address, chain=chain) if response.status_code != 200: return f"Error: Could not fetch price for {address} (Status: {response.status_code})" diff --git a/examples/interaction/cron_observer.py b/examples/interaction/cron_observer.py new file mode 100644 index 00000000..8979adbf --- /dev/null +++ b/examples/interaction/cron_observer.py @@ -0,0 +1,74 @@ +import asyncio +import logging +from typing import Callable, List + +import dotenv +from alphaswarm.agent.agent import AlphaSwarmAgent +from alphaswarm.agent.clients import CronJobClient +from alphaswarm.config import Config +from alphaswarm.tools.alchemy import AlchemyPriceHistoryBySymbol +from alphaswarm.tools.exchanges import GetTokenPriceTool +from alphaswarm.tools.price_tool import PriceTool +from smolagents import Tool + +logging.getLogger("smolagents").setLevel(logging.ERROR) + + +async def main() -> None: + dotenv.load_dotenv() + config = Config() + + # Initialize tools for price-related operations + # GetTokenPriceTool: Real-time token prices + # AlchemyPriceHistoryBySymbol: Historical price data from Alchemy + tools: List[Tool] = [GetTokenPriceTool(config), AlchemyPriceHistoryBySymbol()] # TODO: what tools to use + + # Initialize the AlphaSwarm agent with the price tools + agent = AlphaSwarmAgent(tools=tools, model_id="anthropic/claude-3-5-sonnet-20241022") + + price_tool = PriceTool() + + def generate_message() -> str: + # Call price tool and see if there's a 24h price change more than `threshold`% + + address = "0x0b3e328455c4059EEb9e3f84b5543F74E24e7E1b".lower() # VIRTUAL + chain = "base" + threshold = 10 # in percentages + + data = price_tool.fetch_price(address=address, chain=chain).json() + change_24h = data[address]["usd_24h_change"] + if abs(change_24h) >= threshold: + return f"Alert! {address} change on {chain} is {change_24h:+.2f}%" + + return ( + f"No alerts, {address} change on {chain} is {change_24h:+.2f}%, " + f"monitoring for at least {threshold:.2f}% change..." + ) + + def should_process_message(message: str) -> bool: + return message.startswith("Alert!") + + def handle_skipped(message: str) -> None: + print(f"Skipped processing message: {message}") + + def response_handler(prefix: str) -> Callable[[str], None]: + # Creates a closure that prints responses with color formatting + def handler(response: str) -> None: + print(f"\033[94m[{prefix}] Received response: {response}\033[0m") + + return handler + + cron_client = CronJobClient( + agent=agent, + client_id="AlphaSwarm Observer Example", + interval_seconds=5, + message_generator=generate_message, + response_handler=response_handler("AlphaSwarm Observer Example"), + should_process=should_process_message, + skip_message=handle_skipped, + ) + await asyncio.gather(cron_client.start()) + + +if __name__ == "__main__": + asyncio.run(main()) From 44f5e0cd70e40c3a6895603c68657374ac2b1779 Mon Sep 17 00:00:00 2001 From: Ethan Date: Thu, 13 Feb 2025 00:59:29 +0000 Subject: [PATCH 2/6] price change tool and observer agent example --- alphaswarm/tools/alchemy/__init__.py | 1 + .../tools/alchemy/alchemy_price_change.py | 87 ++++++++++++++ examples/agents/price_change_observer.py | 109 ++++++++++++++++++ .../test_alchemy_price_change_calculator.py | 14 +++ 4 files changed, 211 insertions(+) create mode 100644 alphaswarm/tools/alchemy/alchemy_price_change.py create mode 100644 examples/agents/price_change_observer.py create mode 100644 tests/integration/tools/alchemy/test_alchemy_price_change_calculator.py diff --git a/alphaswarm/tools/alchemy/__init__.py b/alphaswarm/tools/alchemy/__init__.py index 110ddd56..d73d535a 100644 --- a/alphaswarm/tools/alchemy/__init__.py +++ b/alphaswarm/tools/alchemy/__init__.py @@ -1 +1,2 @@ from .alchemy_price_history import AlchemyPriceHistoryBySymbol, AlchemyPriceHistoryByAddress +from .alchemy_price_change import TokenPriceChangeCalculator diff --git a/alphaswarm/tools/alchemy/alchemy_price_change.py b/alphaswarm/tools/alchemy/alchemy_price_change.py new file mode 100644 index 00000000..cbbf0301 --- /dev/null +++ b/alphaswarm/tools/alchemy/alchemy_price_change.py @@ -0,0 +1,87 @@ +from datetime import datetime +from decimal import Decimal +from pydantic import BaseModel +from typing import Optional + +from alphaswarm.services.alchemy import AlchemyClient +from alphaswarm.tools.alchemy import AlchemyPriceHistoryByAddress +from smolagents import Tool + + +class TokenPriceChange(BaseModel): + token_address: str + network: str + start_time: datetime + end_time: datetime + start_price: float + end_price: float + percent_change: float + n_samples: int + frequency: str + +class TokenPriceChangeCalculator(Tool): + name = "TokenPriceChangeCalculator" + description = "Calculate the percentage price change for a token over a specified number of samples and frequency" + inputs = { + "token_address": { + "type": "string", + "description": "The token address to analyze", + }, + "frequency": { + "type": "string", + "description": "Time interval between data points", + "enum": ["5m", "1h", "1d"], + }, + "n_samples": { + "type": "integer", + "description": "Number of samples to analyze (must be >= 2)", + "minimum": 2, + }, + "network": { + "type": "string", + "description": "Network where the token exists (e.g. eth-mainnet, base-mainnet)", + "default": "eth-mainnet", + "nullable": True, + }, + } + output_type = "object" + + def __init__(self, alchemy_client: Optional[AlchemyClient] = None): + super().__init__() + self.price_history_tool = AlchemyPriceHistoryByAddress(alchemy_client) + + def _calculate_percent_change(self, start_price: Decimal, end_price: Decimal) -> float: + return float((end_price - start_price) / start_price * 100) + + def forward(self, token_address: str, frequency: str, n_samples: int, network: str = "eth-mainnet") -> TokenPriceChange: + # Calculate the required history in days based on frequency and n_samples + interval_to_minutes = {"5m": 5, "1h": 60, "1d": 1440} + minutes_needed = interval_to_minutes[frequency] * n_samples + days_needed = (minutes_needed // 1440) + 1 # Round up to nearest day + + # Use the existing price history tool + price_history = self.price_history_tool.forward( + address=token_address, network=network, interval=frequency, history=days_needed + ) + + # Ensure we have enough data points + prices = price_history.data[-n_samples:] # Get the most recent n_samples + if len(prices) < n_samples: + raise ValueError(f"Requested {n_samples} samples but only got {len(prices)}") + + # Calculate percent changes + start_price = prices[0].value + end_price = prices[-1].value + percent_change = self._calculate_percent_change(start_price, end_price) + + return TokenPriceChange( + token_address=token_address, + network=network, + start_time=prices[0].timestamp, + end_time=prices[-1].timestamp, + start_price=float(start_price), + end_price=float(end_price), + percent_change=percent_change, + n_samples=n_samples, + frequency=frequency, + ) diff --git a/examples/agents/price_change_observer.py b/examples/agents/price_change_observer.py new file mode 100644 index 00000000..1973c14b --- /dev/null +++ b/examples/agents/price_change_observer.py @@ -0,0 +1,109 @@ +import asyncio +import dotenv +from enum import Enum +import logging +from typing import List, Optional + +from alphaswarm.agent.agent import AlphaSwarmAgent +from alphaswarm.agent.clients import TerminalClient, CronJobClient +from alphaswarm.config import BASE_PATH, Config +from alphaswarm.tools.alchemy import TokenPriceChangeCalculator +from alphaswarm.services.alchemy import AlchemyClient + +class PriceChangeInterval(Enum): + FIVE_MINUTES: str = "5m" + ONE_HOUR: str = "1h" + ONE_DAY: str = "1d" + + +""" +TODO: Finish this in the morning -- we just want the agent to execute a sequence of tool calls, +with some logic to apply thresholds or whatever. + +The point is not that this is needed for this example, but rather that the pattern still works. + +In another example, we'll have an agent that is generating forecasts, which could absoultely use the 'agentic' pattern. +""" + +class PriceChangeObserver(AlphaSwarmAgent): + def __init__( + self, + token_addresses: List[str], + chain: str = "base-mainnet", + price_change_interval: str = PriceChangeInterval.ONE_HOUR, + price_change_percentage: float = 2.0, + ) -> None: + """ + Initialize the PriceChangeObserver with some `configurable` parameters. + This `agent` won't use any LLM calls, rather it will just execute a sequence of tool calls. + It may imply that we want to generalize AlphSwarmAgent so that it doesn't necessarily need a LLM or smolagents. + + Args: + token_addresses: List of token addresses to observe + chain: Chain to observe + price_change_interval: Interval to observe + price_change_percentage: Percentage to observe + """ + + self.alchemy_client = AlchemyClient.from_env() + self.price_change_calculator = TokenPriceChangeCalculator(self.alchemy_client) + self.token_addresses = token_addresses + self.chain = chain + self.price_change_interval = price_change_interval + self.price_change_percentage = price_change_percentage + + super().__init__(tools=[self.price_change_calculator]) + + async def process_message(self, current_message: str) -> Optional[str]: + # It doesn't matter what the message is, we just want to 'execute' a sequence of tool calls + logging.info(f"Processing message: {current_message}") + + # Get the price history + price_alerts = [] + for address in self.token_addresses: + logging.info(f"Processing token address: {address}") + price_history = self.price_change_calculator.forward( + token_address=address, + frequency=self.price_change_interval, + n_samples=2, + network=self.chain, + ) + logging.info(f"Price history: {price_history}") + if price_history.percent_change >= self.price_change_percentage: + price_alerts.append(f"Price alert for {address}: {price_history.percent_change}% change in the last {self.price_change_interval}.") + + if len(price_alerts) > 0: + return "\n".join(price_alerts) + else: + return "No price alerts found." + + +async def main() -> None: + dotenv.load_dotenv() + logging.basicConfig(level=logging.INFO) + token_addresses = [ + "0x4F9Fd6Be4a90f2620860d680c0d4d5Fb53d1A825", + "0x0b3e328455c4059EEb9e3f84b5543F74E24e7E1b", + "0x731814e491571A2e9eE3c5b1F7f3b962eE8f4870", + ] + + agent = PriceChangeObserver( + token_addresses=token_addresses, + chain="base-mainnet", + price_change_interval="1h", + price_change_percentage=2.0, + ) + + cron_client = CronJobClient( + agent=agent, + client_id="AlphaSwarm Observer Example", + interval_seconds=5, + message_generator=lambda: "Check the price history for any alerts.", + should_process=lambda _: True, + skip_message=lambda _: None, + ) + await asyncio.gather(cron_client.start()) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/tests/integration/tools/alchemy/test_alchemy_price_change_calculator.py b/tests/integration/tools/alchemy/test_alchemy_price_change_calculator.py new file mode 100644 index 00000000..b702b59e --- /dev/null +++ b/tests/integration/tools/alchemy/test_alchemy_price_change_calculator.py @@ -0,0 +1,14 @@ +from alphaswarm.services.alchemy import AlchemyClient +from alphaswarm.tools.alchemy.alchemy_price_change import TokenPriceChangeCalculator + + +def test_get_price_change(alchemy_client: AlchemyClient) -> None: + tool = TokenPriceChangeCalculator(alchemy_client) + result = tool.forward( + token_address="0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599", # WBTC + frequency="5m", + n_samples=2, + network="eth-mainnet", + ) + + assert abs(result.percent_change) > 0 From a95a067ac97ad068b597dfdf4902de92729c12fb Mon Sep 17 00:00:00 2001 From: Ethan Date: Thu, 13 Feb 2025 15:57:55 +0000 Subject: [PATCH 3/6] price change observer example --- examples/agents/price_change_observer.py | 77 ++++++++++++------------ 1 file changed, 39 insertions(+), 38 deletions(-) diff --git a/examples/agents/price_change_observer.py b/examples/agents/price_change_observer.py index 1973c14b..d5a61792 100644 --- a/examples/agents/price_change_observer.py +++ b/examples/agents/price_change_observer.py @@ -1,37 +1,22 @@ import asyncio +import datetime import dotenv -from enum import Enum import logging from typing import List, Optional from alphaswarm.agent.agent import AlphaSwarmAgent -from alphaswarm.agent.clients import TerminalClient, CronJobClient -from alphaswarm.config import BASE_PATH, Config +from alphaswarm.agent.clients import CronJobClient from alphaswarm.tools.alchemy import TokenPriceChangeCalculator from alphaswarm.services.alchemy import AlchemyClient -class PriceChangeInterval(Enum): - FIVE_MINUTES: str = "5m" - ONE_HOUR: str = "1h" - ONE_DAY: str = "1d" - - -""" -TODO: Finish this in the morning -- we just want the agent to execute a sequence of tool calls, -with some logic to apply thresholds or whatever. - -The point is not that this is needed for this example, but rather that the pattern still works. - -In another example, we'll have an agent that is generating forecasts, which could absoultely use the 'agentic' pattern. -""" class PriceChangeObserver(AlphaSwarmAgent): def __init__( self, token_addresses: List[str], chain: str = "base-mainnet", - price_change_interval: str = PriceChangeInterval.ONE_HOUR, - price_change_percentage: float = 2.0, + price_change_interval: str = "1h", + price_pct_chg_thresh: float = 2.0, ) -> None: """ Initialize the PriceChangeObserver with some `configurable` parameters. @@ -42,7 +27,7 @@ def __init__( token_addresses: List of token addresses to observe chain: Chain to observe price_change_interval: Interval to observe - price_change_percentage: Percentage to observe + price_pct_chg_thresh: Percentage change threshold to observe """ self.alchemy_client = AlchemyClient.from_env() @@ -50,32 +35,47 @@ def __init__( self.token_addresses = token_addresses self.chain = chain self.price_change_interval = price_change_interval - self.price_change_percentage = price_change_percentage + self.price_pct_chg_thresh = price_pct_chg_thresh super().__init__(tools=[self.price_change_calculator]) - async def process_message(self, current_message: str) -> Optional[str]: - # It doesn't matter what the message is, we just want to 'execute' a sequence of tool calls - logging.info(f"Processing message: {current_message}") - - # Get the price history + def get_price_alerts(self) -> str: + """ + Get the price alerts for the token addresses. + """ price_alerts = [] for address in self.token_addresses: - logging.info(f"Processing token address: {address}") price_history = self.price_change_calculator.forward( token_address=address, frequency=self.price_change_interval, n_samples=2, network=self.chain, - ) - logging.info(f"Price history: {price_history}") - if price_history.percent_change >= self.price_change_percentage: - price_alerts.append(f"Price alert for {address}: {price_history.percent_change}% change in the last {self.price_change_interval}.") + ) + + if abs(price_history.percent_change) >= self.price_pct_chg_thresh: + price_alerts.append(f"{address}: {price_history.percent_change}% change in the last {self.price_change_interval}.") + logging.info(f"{len(price_alerts)} price alerts found.") if len(price_alerts) > 0: - return "\n".join(price_alerts) + alert_message = f"Price changes have been observed for the following tokens as of {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: " + alert_message += "\n" + "\n".join(price_alerts) + return alert_message else: - return "No price alerts found." + return "" + + + async def process_message(self, message: str) -> Optional[str]: + """ + Add your logic here to decide what to do with the message. + + Args: + message: Previous messages followed by the current message that includes the price alerts. + + Returns: + The agent's response to the message. + """ + logging.info(f"Agent received alerts: {message}") + return None async def main() -> None: @@ -90,16 +90,17 @@ async def main() -> None: agent = PriceChangeObserver( token_addresses=token_addresses, chain="base-mainnet", - price_change_interval="1h", - price_change_percentage=2.0, + price_change_interval="5m", + price_pct_chg_thresh=0.2, ) cron_client = CronJobClient( agent=agent, - client_id="AlphaSwarm Observer Example", + client_id="Price Change Observer", interval_seconds=5, - message_generator=lambda: "Check the price history for any alerts.", - should_process=lambda _: True, + response_handler=lambda _: None, + message_generator=agent.get_price_alerts, + should_process=lambda alerts: len(alerts) > 0, skip_message=lambda _: None, ) await asyncio.gather(cron_client.start()) From 2c3283f770f7d3b89a49abaaf2ce4b266a23abf1 Mon Sep 17 00:00:00 2001 From: Ethan Date: Thu, 13 Feb 2025 16:00:16 +0000 Subject: [PATCH 4/6] remove interaction example in favor of observer agent example --- .../tools/alchemy/alchemy_price_change.py | 9 ++- examples/agents/price_change_observer.py | 17 +++-- examples/interaction/cron_observer.py | 74 ------------------- 3 files changed, 15 insertions(+), 85 deletions(-) delete mode 100644 examples/interaction/cron_observer.py diff --git a/alphaswarm/tools/alchemy/alchemy_price_change.py b/alphaswarm/tools/alchemy/alchemy_price_change.py index cbbf0301..6ad94e3b 100644 --- a/alphaswarm/tools/alchemy/alchemy_price_change.py +++ b/alphaswarm/tools/alchemy/alchemy_price_change.py @@ -1,10 +1,10 @@ from datetime import datetime from decimal import Decimal -from pydantic import BaseModel from typing import Optional from alphaswarm.services.alchemy import AlchemyClient from alphaswarm.tools.alchemy import AlchemyPriceHistoryByAddress +from pydantic import BaseModel from smolagents import Tool @@ -19,6 +19,7 @@ class TokenPriceChange(BaseModel): n_samples: int frequency: str + class TokenPriceChangeCalculator(Tool): name = "TokenPriceChangeCalculator" description = "Calculate the percentage price change for a token over a specified number of samples and frequency" @@ -53,7 +54,9 @@ def __init__(self, alchemy_client: Optional[AlchemyClient] = None): def _calculate_percent_change(self, start_price: Decimal, end_price: Decimal) -> float: return float((end_price - start_price) / start_price * 100) - def forward(self, token_address: str, frequency: str, n_samples: int, network: str = "eth-mainnet") -> TokenPriceChange: + def forward( + self, token_address: str, frequency: str, n_samples: int, network: str = "eth-mainnet" + ) -> TokenPriceChange: # Calculate the required history in days based on frequency and n_samples interval_to_minutes = {"5m": 5, "1h": 60, "1d": 1440} minutes_needed = interval_to_minutes[frequency] * n_samples @@ -74,7 +77,7 @@ def forward(self, token_address: str, frequency: str, n_samples: int, network: s end_price = prices[-1].value percent_change = self._calculate_percent_change(start_price, end_price) - return TokenPriceChange( + return TokenPriceChange( token_address=token_address, network=network, start_time=prices[0].timestamp, diff --git a/examples/agents/price_change_observer.py b/examples/agents/price_change_observer.py index d5a61792..cbf99034 100644 --- a/examples/agents/price_change_observer.py +++ b/examples/agents/price_change_observer.py @@ -1,13 +1,13 @@ import asyncio import datetime -import dotenv import logging from typing import List, Optional +import dotenv from alphaswarm.agent.agent import AlphaSwarmAgent from alphaswarm.agent.clients import CronJobClient -from alphaswarm.tools.alchemy import TokenPriceChangeCalculator from alphaswarm.services.alchemy import AlchemyClient +from alphaswarm.tools.alchemy import TokenPriceChangeCalculator class PriceChangeObserver(AlphaSwarmAgent): @@ -50,11 +50,13 @@ def get_price_alerts(self) -> str: frequency=self.price_change_interval, n_samples=2, network=self.chain, - ) - + ) + if abs(price_history.percent_change) >= self.price_pct_chg_thresh: - price_alerts.append(f"{address}: {price_history.percent_change}% change in the last {self.price_change_interval}.") - + price_alerts.append( + f"{address}: {price_history.percent_change}% change in the last {self.price_change_interval}." + ) + logging.info(f"{len(price_alerts)} price alerts found.") if len(price_alerts) > 0: alert_message = f"Price changes have been observed for the following tokens as of {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: " @@ -62,12 +64,11 @@ def get_price_alerts(self) -> str: return alert_message else: return "" - async def process_message(self, message: str) -> Optional[str]: """ Add your logic here to decide what to do with the message. - + Args: message: Previous messages followed by the current message that includes the price alerts. diff --git a/examples/interaction/cron_observer.py b/examples/interaction/cron_observer.py deleted file mode 100644 index 8979adbf..00000000 --- a/examples/interaction/cron_observer.py +++ /dev/null @@ -1,74 +0,0 @@ -import asyncio -import logging -from typing import Callable, List - -import dotenv -from alphaswarm.agent.agent import AlphaSwarmAgent -from alphaswarm.agent.clients import CronJobClient -from alphaswarm.config import Config -from alphaswarm.tools.alchemy import AlchemyPriceHistoryBySymbol -from alphaswarm.tools.exchanges import GetTokenPriceTool -from alphaswarm.tools.price_tool import PriceTool -from smolagents import Tool - -logging.getLogger("smolagents").setLevel(logging.ERROR) - - -async def main() -> None: - dotenv.load_dotenv() - config = Config() - - # Initialize tools for price-related operations - # GetTokenPriceTool: Real-time token prices - # AlchemyPriceHistoryBySymbol: Historical price data from Alchemy - tools: List[Tool] = [GetTokenPriceTool(config), AlchemyPriceHistoryBySymbol()] # TODO: what tools to use - - # Initialize the AlphaSwarm agent with the price tools - agent = AlphaSwarmAgent(tools=tools, model_id="anthropic/claude-3-5-sonnet-20241022") - - price_tool = PriceTool() - - def generate_message() -> str: - # Call price tool and see if there's a 24h price change more than `threshold`% - - address = "0x0b3e328455c4059EEb9e3f84b5543F74E24e7E1b".lower() # VIRTUAL - chain = "base" - threshold = 10 # in percentages - - data = price_tool.fetch_price(address=address, chain=chain).json() - change_24h = data[address]["usd_24h_change"] - if abs(change_24h) >= threshold: - return f"Alert! {address} change on {chain} is {change_24h:+.2f}%" - - return ( - f"No alerts, {address} change on {chain} is {change_24h:+.2f}%, " - f"monitoring for at least {threshold:.2f}% change..." - ) - - def should_process_message(message: str) -> bool: - return message.startswith("Alert!") - - def handle_skipped(message: str) -> None: - print(f"Skipped processing message: {message}") - - def response_handler(prefix: str) -> Callable[[str], None]: - # Creates a closure that prints responses with color formatting - def handler(response: str) -> None: - print(f"\033[94m[{prefix}] Received response: {response}\033[0m") - - return handler - - cron_client = CronJobClient( - agent=agent, - client_id="AlphaSwarm Observer Example", - interval_seconds=5, - message_generator=generate_message, - response_handler=response_handler("AlphaSwarm Observer Example"), - should_process=should_process_message, - skip_message=handle_skipped, - ) - await asyncio.gather(cron_client.start()) - - -if __name__ == "__main__": - asyncio.run(main()) From 590c1cc93f9b5e2024e9e4b45c2b3b17ab4657cd Mon Sep 17 00:00:00 2001 From: Ethan Date: Thu, 13 Feb 2025 21:51:13 +0000 Subject: [PATCH 5/6] modified price chg observer example to use a basic LLM task --- alphaswarm/agent/clients/cron_job.py | 4 ++- examples/agents/price_change_observer.py | 40 +++++++++++------------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/alphaswarm/agent/clients/cron_job.py b/alphaswarm/agent/clients/cron_job.py index bf473117..9c78476d 100644 --- a/alphaswarm/agent/clients/cron_job.py +++ b/alphaswarm/agent/clients/cron_job.py @@ -16,6 +16,7 @@ def __init__( response_handler: Callable[[str], None] = print, should_process: Callable[[str], bool] = lambda _: True, skip_message: Callable[[str], None] = lambda _: None, + max_history: int = 1, ) -> None: """ Initialize CronJobClient with conditional processing. @@ -28,8 +29,9 @@ def __init__( response_handler: Function to handle agent responses should_process: Function that decides if a message should be processed by agent skip_message: Function to handle skipped messages + max_history: Maximum number of messages to keep in history """ - super().__init__(agent, client_id) + super().__init__(agent, client_id, max_history=max_history) self.interval_seconds = interval_seconds self.message_generator = message_generator self.should_process = should_process diff --git a/examples/agents/price_change_observer.py b/examples/agents/price_change_observer.py index cbf99034..8e8647fd 100644 --- a/examples/agents/price_change_observer.py +++ b/examples/agents/price_change_observer.py @@ -19,9 +19,9 @@ def __init__( price_pct_chg_thresh: float = 2.0, ) -> None: """ - Initialize the PriceChangeObserver with some `configurable` parameters. - This `agent` won't use any LLM calls, rather it will just execute a sequence of tool calls. - It may imply that we want to generalize AlphSwarmAgent so that it doesn't necessarily need a LLM or smolagents. + A basic agent that observes price changes for a set of token addresses. + This agent does not use any LLM calls, instead it will just execute a sequence of tool calls. + It can be adapted to use any LLM by adding logic to thea `process_message` method. Args: token_addresses: List of token addresses to observe @@ -37,7 +37,9 @@ def __init__( self.price_change_interval = price_change_interval self.price_pct_chg_thresh = price_pct_chg_thresh - super().__init__(tools=[self.price_change_calculator]) + hints = "Have any of the price changes increased or decreased (+/- 1%) since the last observation? Respond with either 'yes' or 'no'." + + super().__init__(model_id="gpt-4o-mini", tools=[], hints=hints) def get_price_alerts(self) -> str: """ @@ -65,34 +67,29 @@ def get_price_alerts(self) -> str: else: return "" - async def process_message(self, message: str) -> Optional[str]: - """ - Add your logic here to decide what to do with the message. - - Args: - message: Previous messages followed by the current message that includes the price alerts. - - Returns: - The agent's response to the message. - """ - logging.info(f"Agent received alerts: {message}") - return None + # async def process_message(self, message: str) -> Optional[str]: + # """ + # You can override the `process_message` method to specify how the agent will respond to the price alerts. + # When this method is not overridden, the default LLM-based agent configuration will be used to respond. + # """ + # logging.info(f"Agent received alerts:\n{message}") + # pass async def main() -> None: dotenv.load_dotenv() logging.basicConfig(level=logging.INFO) token_addresses = [ - "0x4F9Fd6Be4a90f2620860d680c0d4d5Fb53d1A825", - "0x0b3e328455c4059EEb9e3f84b5543F74E24e7E1b", - "0x731814e491571A2e9eE3c5b1F7f3b962eE8f4870", + "0x4F9Fd6Be4a90f2620860d680c0d4d5Fb53d1A825", # AIXBT + "0x0b3e328455c4059EEb9e3f84b5543F74E24e7E1b", # VIRTUAL + "0x731814e491571A2e9eE3c5b1F7f3b962eE8f4870", # VADER ] agent = PriceChangeObserver( token_addresses=token_addresses, chain="base-mainnet", - price_change_interval="5m", - price_pct_chg_thresh=0.2, + price_change_interval="5m", # '5m', '1h', or '1d' + price_pct_chg_thresh=0.02, ) cron_client = CronJobClient( @@ -103,6 +100,7 @@ async def main() -> None: message_generator=agent.get_price_alerts, should_process=lambda alerts: len(alerts) > 0, skip_message=lambda _: None, + max_history=2, ) await asyncio.gather(cron_client.start()) From 6d92701f8a3115eef3f61205a237dd81f03f226d Mon Sep 17 00:00:00 2001 From: Ethan Date: Thu, 13 Feb 2025 22:34:47 +0000 Subject: [PATCH 6/6] lint --- examples/agents/price_change_observer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/agents/price_change_observer.py b/examples/agents/price_change_observer.py index 8e8647fd..20987e1a 100644 --- a/examples/agents/price_change_observer.py +++ b/examples/agents/price_change_observer.py @@ -1,7 +1,7 @@ import asyncio import datetime import logging -from typing import List, Optional +from typing import List import dotenv from alphaswarm.agent.agent import AlphaSwarmAgent