For quickstart, installation, and API reference, see README.md.
from agent_friend import MemoryTool, CodeTool, SearchTool, BrowserTool, EmailTool, FileTool, FetchTool, VoiceTool, RSSFeedTool, SchedulerTool, DatabaseTool, GitTool, TableTool, WebhookTool, HTTPTool, CacheTool, NotifyTool, JSONTool, DateTimeTool, ProcessTool, EnvTool, CryptoTool, ValidatorTool, MetricsTool, TemplateTool, DiffTool, RetryTool, HTMLTool, XMLTool, RegexTool, RateLimitTool, QueueTool, EventBusTool, StateMachineTool, MapReduceTool, GraphTool, FormatTool, SearchIndexTool, ConfigTool, ChunkerTool, VectorStoreTool, TimerTool, StatsTool, SamplerTool, WorkflowTool, AlertTool, LockTool, AuditTool, BatchTool, TransformTool, tool
# Use by name (recommended)
friend = Friend(tools=["memory", "code", "search", "browser", "email", "file", "fetch", "voice", "rss", "scheduler", "database", "git", "table", "webhook", "http", "cache", "notify", "json", "datetime", "process", "env"])
# Or use instances for custom config
friend = Friend(tools=[
MemoryTool(db_path="~/.my_agent/memory.db"),
CodeTool(timeout_seconds=10),
SearchTool(max_results=5),
])
# Or register any function as a tool with @tool
@tool
def get_weather(city: str) -> str:
"""Get current weather for a city."""
return f"Sunny in {city}, 22°C" # replace with real API call
friend = Friend(tools=["search", get_weather])MemoryTool — SQLite-backed persistent memory
remember(key, value)— store a factrecall(query)— full-text search memoryforget(key)— remove a fact
CodeTool — Sandboxed code execution
run_code(code, language="python")— run Python or bash, returns stdout+stderr
SearchTool — DuckDuckGo web search (no API key)
search(query, max_results=5)— returns titles, URLs, snippets
BrowserTool — Browser automation (requires agent-browser)
browse(url)— returns page text content
EmailTool — Email via AgentMail (requires free account)
email_list(limit, unread_only)— list inbox messagesemail_read(message_id)— read full message bodyemail_send(to, subject, body, send=False)— draft or send emailemail_threads(limit)— list conversation threads- Set
AGENTMAIL_INBOXenv var to your inbox address
FileTool — Read, write, append, and list local files
file_read(path)— read a file (up to 32 KB, larger files truncated with notice)file_write(path, content)— write a file (creates parent dirs)file_append(path, content)— append to a filefile_list(path, pattern)— list directory contents, optional glob filter- Configure
base_dirto sandbox access to a specific directory
FetchTool — Fetch any URL and read its text content (no API key)
fetch(url, max_chars=8000)— fetches a URL, strips HTML to plain text- Works with web pages, documentation, APIs, raw text files
- Use with SearchTool: search finds URLs, fetch reads them
VoiceTool — Text-to-speech for your agent (zero required dependencies)
speak(text, voice=None)— speaks text aloud or saves to MP3 file- System TTS: espeak/espeak-ng (Linux),
say(macOS), PowerShell (Windows) - Neural TTS: set
AGENT_FRIEND_TTS_URLto use any HTTP TTS server for high-quality voices - Saves audio to
~/.agent_friend/voice/when using HTTP backend - Lets your agent narrate its responses, read documents aloud, or generate audio files
RSSFeedTool — Subscribe to and read RSS/Atom feeds (zero required dependencies)
subscribe(url, name)— save a feed by name for quick accesslist_feeds()— list subscribed feedsread_feed(name, count=5)— get latest items from a subscribed feedfetch_feed(url, count=5)— fetch any RSS/Atom URL directlyunsubscribe(name)— remove a subscribed feed- Supports RSS 2.0, Atom, and RSS 1.0. Strips HTML from summaries automatically.
SchedulerTool — Schedule tasks for your agent to run on a timer or at a specific time
schedule(task_id, prompt, interval_minutes=None, run_at=None)— create a recurring or one-shot taskrun_pending()— check and return tasks that are due (use withagent-friend scheduleCLI)list_scheduled()— see all scheduled tasks and their next run timescancel(task_id)— remove a scheduled taskclear_all()— remove all tasks- Stores schedule in
~/.agent_friend/scheduler.json. Zero dependencies.
DatabaseTool — Create and query SQLite databases (zero dependencies)
db_execute(sql, params=[])— CREATE TABLE, INSERT, UPDATE, DELETEdb_query(sql, params=[])— SELECT and return results as a formatted tabledb_tables()— list all tables in the databasedb_schema(table)— get the CREATE TABLE statement for any table- Python API:
create_table(),insert(),query(),run(),list_tables(),get_schema() - Backed by
~/.agent_friend/agent.db. Your agent can store and query structured data persistently.
GitTool — read and commit to git repositories (requires git installed)
git_status(repo_dir)— working tree statusgit_diff(staged, path, repo_dir)— unstaged or staged diffgit_log(n, oneline, repo_dir)— commit historygit_add(paths, repo_dir)— stage files for commitgit_commit(message, repo_dir)— commit staged changesgit_branch_list(repo_dir)— list all local branchesgit_branch_create(name, checkout, repo_dir)— create a new branch- Python API:
git.status(),git.diff(),git.log(),git.add(),git.commit(),git.branch_list(),git.branch_create()
from agent_friend import Friend, GitTool
# Point at a specific repo
git = GitTool(repo_dir="/path/to/repo")
friend = Friend(tools=["search", "code", "file", git])
friend.chat("Show me the git status and recent commits")
friend.chat("Stage all changes to src/ and commit with message 'Refactor auth flow'")
# Default: uses current working directory
friend = Friend(tools=["git"])
friend.chat("What changed in the last 5 commits?")TableTool — read, filter, and aggregate CSV/TSV files (no pandas)
table_read(filepath)— read CSV/TSV, return rows as JSONtable_columns(filepath)— list column namestable_filter(filepath, column, operator, value)— filter rows (eq/ne/gt/lt/gte/lte/contains/startswith)table_aggregate(filepath, column, operation)— count/sum/avg/min/max/unique over a columntable_write(filepath, rows, delimiter)— write rows to CSV- Python API:
read(),write(),columns(),filter_rows(),aggregate(),append_row() - Auto-detects delimiter (comma vs tab). Zero dependencies.
from agent_friend import Friend, TableTool
table = TableTool()
friend = Friend(tools=["search", "code", table])
friend.chat("Read sales.csv and tell me the average revenue by region")
friend.chat("Filter transactions.csv to rows where amount > 1000")WebhookTool — receive incoming webhooks (payment callbacks, GitHub events, form submissions)
wait_for_webhook(path, timeout)— start HTTP server and wait for a POST request- Returns: path, headers, body (str), json (parsed dict or None), received_at timestamp
- Port 0 = auto-assign random available port. Server shuts down after receiving one request.
from agent_friend import Friend, WebhookTool
# Agent waits for a payment webhook, then reacts
hook = WebhookTool(port=8765)
friend = Friend(tools=["code", "memory", hook])
response = friend.chat(
"Wait for a webhook at /payment with 60 second timeout. "
"When it arrives, log the amount to memory."
)
# In another terminal: curl -X POST http://localhost:8765/payment -d '{"amount": 99.99}'HTTPTool — generic REST API client (GET/POST/PUT/PATCH/DELETE with auth headers)
http_request(method, url, headers, body, body_text)— make any HTTP request- Returns: status code, response headers, body (str), json (parsed dict if JSON response)
default_headersconstructor param for auth headers shared across all requests- No requests library required — stdlib only
from agent_friend import Friend, HTTPTool
# API client with auth headers baked in
http = HTTPTool(default_headers={"Authorization": "Bearer sk-..."})
friend = Friend(tools=["memory", http])
response = friend.chat(
"POST to https://api.example.com/orders with body {\"item\": \"widget\", \"qty\": 5}"
)
# One-off requests without config
friend = Friend(tools=["search", "http"])
friend.chat("GET https://api.github.com/repos/0-co/agent-friend and summarize the stats")CacheTool — key-value cache with TTL expiry, persisted to disk
cache_get(key)— retrieve a cached value (returnsnullif missing or expired)cache_set(key, value, ttl_seconds=3600)— store a value with optional TTLcache_delete(key)— remove one entrycache_clear()— remove all entriescache_stats()— JSON with entry count, hit/miss counts
from agent_friend import Friend, CacheTool
friend = Friend(tools=["http", "cache"])
response = friend.chat(
"Fetch the GitHub stars for 0-co/agent-friend. "
"Cache the result under 'gh_stars' for 1 hour. "
"If it's already cached, use the cached value."
)
# Python API
cache = CacheTool()
cache.cache_set("weather_nyc", '{"temp": 72, "sky": "clear"}', ttl_seconds=3600)
result = cache.cache_get("weather_nyc") # returns value within 1 hour, else None
print(cache.cache_stats()) # {"entries": 1, "session_hits": 1, "session_misses": 0, ...}NotifyTool — send notifications when tasks complete (desktop, file log, or terminal bell)
notify(title, message)— best available channel (desktop → file fallback)notify_desktop(title, message)— system notification (notify-send / osascript)notify_file(title, message, path=None)— append to JSONL log filebell()— terminal bell characterread_notifications(n=10)— read last N notifications from log
from agent_friend import Friend
# Agent that notifies you when a long task is done
friend = Friend(
seed="Run the report, then notify the user when complete.",
tools=["scheduler", "notify"],
)
friend.chat("Run the daily news summary at 8:00 UTC and notify me when it's done")
# Python API — useful in scripts
from agent_friend import NotifyTool
notifier = NotifyTool()
notifier.notify("Report ready", "Daily news summary complete") # desktop or file
notifier.notify_file("Error", "API timeout after 30s retry") # always works
entries = notifier.read_notifications(n=5) # last 5 entriesJSONTool — parse, query, and transform JSON data with dot-notation paths
json_get(data, path)— extract value at path ("user.name","items[0].id","users[*].email")json_set(data, path, value)— return modified JSON with value set at pathjson_keys(data)— list top-level keysjson_filter(data, key, value)— filter array by key=valuejson_format(data, indent=2)— pretty-printjson_merge(base, patch)— merge two objects (patch overrides base)
from agent_friend import Friend, JSONTool
friend = Friend(tools=["http", "json"])
response = friend.chat(
"GET https://pypi.org/pypi/requests/json and extract the latest version from info.version"
)
# Python API
from agent_friend import JSONTool
jt = JSONTool()
data = '{"user": {"name": "Alice"}, "tags": ["ai", "python"]}'
jt.json_get(data, "user.name") # '"Alice"'
jt.json_get(data, "tags[0]") # '"ai"'
jt.json_set(data, "user.email", '"a@b.com"') # modified JSON
jt.json_filter('[{"role":"admin"},{"role":"user"}]', "role", '"admin"')DateTimeTool — date and time operations without CodeTool
now(timezone)— current datetime in any IANA timezoneparse(text)— parse date strings (ISO 8601, natural language, slashes)format_dt(dt_str, fmt)— strftime formattingdiff(a, b, unit)— time difference in seconds/minutes/hours/daysadd_duration(dt_str, days, hours, minutes, seconds)— date arithmeticconvert_timezone(dt_str, to_tz)— timezone conversionto_timestamp(dt_str)/from_timestamp(ts)— Unix timestamp conversion
from agent_friend import Friend, DateTimeTool
friend = Friend(tools=["datetime", "scheduler"])
response = friend.chat("Schedule a reminder for 7 days from now and tell me the date")
# Python API
from agent_friend import DateTimeTool
dt = DateTimeTool()
dt.now("America/New_York") # "2026-03-12T10:53:00-04:00"
dt.diff("2026-03-12", "2026-04-01", "days") # "20.0"
dt.add_duration("2026-03-12T00:00:00", days=7) # "2026-03-19T00:00:00+00:00"
dt.convert_timezone("2026-03-12T12:00:00", to_tz="Asia/Tokyo") # "2026-03-12T21:00:00+09:00"ProcessTool — run shell commands and scripts from your agent
run(command, timeout, cwd, env, shell)— run any shell command, get stdout/stderr/returncoderun_script(script, timeout, cwd, interpreter)— execute multi-line bash/python scriptswhich(name)— find the full path of an executable in PATH- All stdlib —
subprocess+shutil+shlex. Configurable timeouts.
from agent_friend import Friend, ProcessTool
friend = Friend(tools=["process", "file"])
response = friend.chat("Check if git is installed, then run git log --oneline -5")
# Python API
from agent_friend import ProcessTool
proc = ProcessTool(timeout=30)
proc.run("git status") # {"success": true, "stdout": "...", ...}
proc.which("python3") # {"path": "/usr/bin/python3"}
proc.run_script("echo hi\npython3 --version") # multi-line scriptEnvTool — read, set, and verify environment variables; load .env files
env_get(key, default=None)— get an env var's value (sensitive vars return[hidden])env_set(key, value)— set a var for the current processenv_list(prefix="")— list visible vars as JSON, filtered by optional prefixenv_check(keys)— verify required vars are set —{ok: bool, present: [...], missing: [...]}env_load(path=".env")— load key=value pairs from a.envfile (won't overwrite existing vars)- Sensitive variable names (KEY, TOKEN, SECRET, etc.) are hidden from
env_getandenv_list
from agent_friend import Friend, EnvTool
# Check API keys are set before calling external services
friend = Friend(tools=["env", "http"])
response = friend.chat(
"Check that OPENAI_API_KEY and DATABASE_URL are set. "
"If DATABASE_URL is missing, load it from .env"
)
# Python API
from agent_friend import EnvTool
env = EnvTool()
env.env_load(".env") # loads .env into os.environ
env.env_check(["OPENAI_API_KEY", "DATABASE_URL"]) # {"ok": false, "missing": ["DATABASE_URL"]}
env.env_get("HOME") # "/home/user"
env.env_list(prefix="AWS_") # lists all AWS_ vars
env.env_set("LOG_LEVEL", "debug") # set for current processCryptoTool — cryptographic utilities: tokens, hashing, HMAC, UUID, base64
generate_token(length=32)— secure random hex token (32 bytes → 64-char hex)hash_data(data, algorithm='sha256')— SHA-256/512/etc hex digesthmac_sign(data, secret, algorithm='sha256')— sign data with HMAChmac_verify(data, secret, signature)— verify HMAC signature (constant-time)uuid4()— generate a random UUID4base64_encode(data, url_safe=False)/base64_decode(data, url_safe=False)random_bytes(length=16)— random bytes as hex (for nonces, salts)- All stdlib — zero dependencies
from agent_friend import CryptoTool
crypto = CryptoTool()
crypto.generate_token() # "a3f9b2..." (64-char hex)
crypto.hash_data("hello", "sha256") # "2cf24d..."
sig = crypto.hmac_sign("payload", "secret") # HMAC-SHA256 hex
crypto.hmac_verify("payload", "secret", sig) # True
crypto.uuid4() # "550e8400-e29b-41d4-..."
crypto.base64_encode("hello") # "aGVsbG8="ValidatorTool — validate user inputs before acting on them
validate_email(email)— RFC 5322 format check →{valid, local, domain}validate_url(url, allowed_schemes=['http','https'])— scheme + host checkvalidate_ip(ip)— IPv4/IPv6 →{valid, version, is_private, is_loopback}validate_uuid(value)— UUID format check →{valid, version, variant}validate_json(value, required_keys=None)— parse + optional key checkvalidate_range(value, min_val, max_val)— numeric boundsvalidate_pattern(value, pattern, flags='')— regex match →{valid, groups}validate_length(value, min_length, max_length)— string/list lengthvalidate_type(value, expected_type)— type check (string/int/float/bool/list/dict/null)
from agent_friend import ValidatorTool
v = ValidatorTool()
v.validate_email("user@example.com") # {"valid": True, ...}
v.validate_url("https://github.com") # {"valid": True, "scheme": "https", ...}
v.validate_ip("192.168.1.1") # {"valid": True, "is_private": True}
v.validate_json('{"x":1}', required_keys=["x", "y"]) # {"valid": False, missing "y"}
v.validate_range(42, min_val=0, max_val=100) # {"valid": True}
v.validate_pattern("2026-03-12", r"(\d{4})-(\d{2})-(\d{2})") # groups: ["2026","03","12"]MetricsTool — session-scoped counters, gauges, and timers for your agent
metric_increment(name, value=1.0)— increment a counter (tracks count, total, min, max, last)metric_gauge(name, value)— set a gauge to a specific valuemetric_timer_start(name)→ timer_id — start a timermetric_timer_stop(timer_id)— stop timer, records elapsed_ms (count, total, min, max, avg)metric_get(name)— get current metric statemetric_list()— list all metric names and typesmetric_summary()— all metrics as a dictmetric_reset(name=None)— reset one metric or allmetric_export(format="json")— export as JSON or Prometheus text format
from agent_friend import MetricsTool
m = MetricsTool()
m.metric_increment("api_calls")
m.metric_increment("api_calls", 3) # total: 4
m.metric_gauge("queue_depth", 42)
timer_id = m.metric_timer_start("search")
# ... do work ...
m.metric_timer_stop(timer_id) # records elapsed_ms
m.metric_export("prometheus")
# # TYPE api_calls counter
# api_calls_total 4.0
# # TYPE queue_depth gauge
# queue_depth 42.0TemplateTool — parameterized string templates for prompts and content
template_render(template, variables)— render${variable}substitutionstemplate_save(name, template)— save a named template for reusetemplate_render_named(name, variables)— render a saved templatetemplate_variables(template)— extract all variable names from a templatetemplate_validate(template, variables)— check for missing/extra variablestemplate_list()— list all saved templatestemplate_get(name)/template_delete(name)— manage saved templates
from agent_friend import TemplateTool
t = TemplateTool()
t.template_save("search_prompt", "Search for ${topic} from ${start_date} to ${end_date}.")
t.template_render_named("search_prompt", {"topic": "AI agents", "start_date": "2025", "end_date": "2026"})
# "Search for AI agents from 2025 to 2026."
# Check what variables a template needs before rendering
t.template_variables("Dear ${name}, your order ${order_id} is ${status}.")
# {"variables": ["name", "order_id", "status"], "count": 3}DiffTool — compare text and files with unified diffs, word-level comparison, and similarity scoring
diff_text(text_a, text_b, context=3)— unified diff between two stringsdiff_files(path_a, path_b)— unified diff between two filesdiff_words(text_a, text_b)— inline word-level diff (+added,-removed)diff_stats(text_a, text_b)— similarity ratio, added/removed chars and linesdiff_similar(query, candidates, top_n=5)— find closest matches from a list
from agent_friend import DiffTool
d = DiffTool()
result = d.diff_text("def foo():\n return 1\n", "def foo():\n return 42\n")
print(result["unified"])
# --- before
# +++ after
# @@ -1,2 +1,2 @@
# def foo():
# - return 1
# + return 42
d.diff_stats("apple pie", "apple sauce")
# {"similarity": 0.67, "added_chars": 5, "removed_chars": 3, ...}
d.diff_similar("agnet-friend", ["agent-friend", "agent-lib", "agentsmith"])
# [{"text": "agent-friend", "score": 0.93}, ...]RetryTool — retry HTTP requests and shell commands with exponential back-off + circuit breaker
retry_http(method, url, body, headers, max_attempts=3, delay_seconds=1.0, backoff_factor=2.0, jitter=True)— HTTP with auto-retry on 429/5xx/network errorsretry_shell(command, max_attempts=3, delay_seconds=1.0, backoff_factor=2.0)— shell command with retry on non-zero exitretry_status()— stats: total calls, retries, successes, failurescircuit_create(name, max_failures=5, reset_timeout_seconds=60)— create a named circuit breakercircuit_call(name, method, url, body, headers)— HTTP call through circuit breaker (returns instantly if circuit is open)circuit_status(name)— current state: closed / open / half-open, failure countcircuit_reset(name)— manually close a tripped circuit
from agent_friend import RetryTool
r = RetryTool()
# Retry a flaky API — waits 1s, 2s, 4s between attempts
result = r.retry_http("GET", "https://api.example.com/data", max_attempts=3)
# {"ok": True, "status": 200, "body": "...", "attempts": 2}
# Circuit breaker — stops hammering after 3 failures
r.circuit_create("payments", max_failures=3, reset_timeout_seconds=30)
r.circuit_call("payments", "POST", "https://pay.example.com/charge", body='{"amount": 100}')
r.circuit_status("payments") # {"state": "open", "failures": 3, ...}HTMLTool — parse HTML and extract text, links, headings, tables, and meta tags
html_text(html, max_chars=20000)— extract visible text, stripping all tags and skipping script/style blockshtml_links(html, base_url="")— list of{text, href}dicts for every<a>taghtml_headings(html)— list of{level, text}dicts for<h1>–<h6>html_meta(html)— page{title, meta}including Open Graph and description tagshtml_tables(html)— list of tables, each a list of rows, each a list of cell stringshtml_select(html, tag, attrs={})— text content of all matching elements (simple CSS-like selector)
from agent_friend import HTMLTool, FetchTool
# Fetch a page, then extract what you need
fetch = FetchTool()
html_tool = HTMLTool()
# html = fetch.fetch_url("https://news.ycombinator.com") # if FetchTool returns HTML
html = "<h1>Agent News</h1><p>New tool <a href='/retry'>RetryTool</a> shipped.</p>"
html_tool.html_text(html)
# "Agent News\nNew tool RetryTool shipped."
html_tool.html_links(html, base_url="https://example.com")
# [{"text": "RetryTool", "href": "https://example.com/retry"}]
html_tool.html_headings(html)
# [{"level": 1, "text": "Agent News"}]
# Extract prices from a shopping page
html_tool.html_select(html, "span", {"class": "price"})
# ["$29.99", "$49.99", ...]XMLTool — parse XML, run XPath queries, and convert to JSON
xml_extract(xml, tag)— text content of all matching tags:["Apple", "Banana"]xml_attrs(xml, tag)— attributes of all matching tags:[{"id": "1"}, {"id": "2"}]xml_find(xml, xpath)— first match:{tag, text, attrs, children}xml_findall(xml, xpath)— all matches as list of{tag, text, attrs}xml_to_dict(xml)— XML → nested dict (attrs get@prefix, repeated tags → list)xml_validate(xml)—{valid: true/false}— check XML is well-formedxml_tags(xml)— tag name → occurrence count (explore unfamiliar XML)
from agent_friend import XMLTool
x = XMLTool()
xml = """<catalog>
<book id="1"><title>Agent Patterns</title><price>29.99</price></book>
<book id="2"><title>Async Python</title><price>24.99</price></book>
</catalog>"""
x.xml_extract(xml, "title") # '["Agent Patterns", "Async Python"]'
x.xml_attrs(xml, "book") # '[{"id": "1"}, {"id": "2"}]'
x.xml_find(xml, ".//book[@id='2']")
# {"found": true, "tag": "book", "text": "", "attrs": {"id": "2"}, "children": [...]}
x.xml_to_dict(xml) # nested dict representation
x.xml_tags(xml) # {"catalog": 1, "book": 2, "title": 2, "price": 2}RegexTool — regular expression operations: match, search, findall, replace, split, extract groups
regex_match(pattern, text, flags=[])— match at the start of text →{matched, match, start, end, groups, named_groups}regex_search(pattern, text, flags=[])— find first occurrence anywhere in text → same structureregex_findall(pattern, text, flags=[])— all non-overlapping matches as a listregex_findall_with_positions(pattern, text, flags=[])— matches with start/end positionsregex_replace(pattern, replacement, text, count=0)— replace (backreferences:\\1,\\g<name>)regex_split(pattern, text, maxsplit=0)— split text by pattern → list of stringsregex_extract_groups(pattern, text)— all matches with captured groupsregex_validate(pattern)—{valid: true/false}— check a pattern is validregex_escape(text)— escape a string so it matches literally in a pattern- Flags:
IGNORECASE,MULTILINE,DOTALL,VERBOSE
from agent_friend import RegexTool
rx = RegexTool()
# Extract version numbers
rx.regex_findall(r"\d+\.\d+\.\d+", "v0.28.0 and v0.27.0 released")
# '["0.28.0", "0.27.0"]'
# Named groups
rx.regex_search(r"(?P<user>\w+)@(?P<domain>[\w.]+)", "Contact alice@example.com")
# '{"matched": true, "named_groups": {"user": "alice", "domain": "example.com"}, ...}'
# Replace with backreference
rx.regex_replace(r"(\w+)\s+(\w+)", r"\2 \1", "hello world") # "world hello"
# Redact sensitive data
rx.regex_replace(r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b", "****", text)
# Case-insensitive findall
rx.regex_findall("error|warning", log_text, flags=["IGNORECASE"])
# Build a safe literal pattern from user input
escaped = rx.regex_escape("$1.00 (special offer)")
rx.regex_search(escaped, price_text) # matches the literal stringRateLimitTool — rate limiting for agent API calls: fixed window, sliding window, token bucket
limiter_create(name, max_calls=10, window_seconds=60, algorithm="fixed")— create a named limiterlimiter_check(name)— check if a request is allowed without consuming a tokenlimiter_consume(name)— record that a request was made (consumes a token)limiter_acquire(name)— atomically check and consume — the most common operationlimiter_status(name)— current state: count, remaining, algorithm, window size, tokenslimiter_reset(name)— reset counters to initial statelimiter_delete(name)/limiter_list()— manage limiters- Algorithms:
fixed(simple counter),sliding(no boundary burst),token_bucket(smooth rate + burst)
from agent_friend import RateLimitTool
r = RateLimitTool()
# 10 requests per 60 seconds, sliding window (no boundary burst)
r.limiter_create("openai", max_calls=10, window_seconds=60, algorithm="sliding")
# Check-and-consume before every API call
result = json.loads(r.limiter_acquire("openai"))
if result["allowed"]:
# make the API call
pass
else:
print(f"Rate limited. Try again in {result['reset_in_seconds']:.1f}s")
# Token bucket — smooth rate with burst capacity
r.limiter_create("github", algorithm="token_bucket", rate_per_second=1.0, burst_capacity=10.0)
# List all active limiters
print(r.limiter_list())QueueTool — named work queues: FIFO, LIFO (stack), and priority queue
queue_create(name, kind="fifo", maxsize=0)— create a named queuequeue_push(name, item, priority=0.0)— add an item (any JSON value)queue_pop(name)— remove and return the next item →{item, size}or{empty: true}queue_peek(name)— inspect next item without removing itqueue_size(name)— current item countqueue_clear(name)/queue_delete(name)/queue_list()— manage queues- Priority queue: lower priority number = more urgent (min-heap). Same priority → FIFO order.
from agent_friend import QueueTool
q = QueueTool()
# FIFO work queue — process URLs in order
q.queue_create("urls")
q.queue_push("urls", {"url": "https://example.com", "action": "scrape"})
q.queue_push("urls", {"url": "https://other.com", "action": "scrape"})
while True:
result = json.loads(q.queue_pop("urls"))
if result.get("empty"):
break
item = result["item"]
# process item["url"]
# Priority queue — handle critical alerts first
q.queue_create("alerts", kind="priority")
q.queue_push("alerts", "disk full", priority=1) # urgent
q.queue_push("alerts", "CPU usage high", priority=5) # normal
q.queue_push("alerts", "log rotate", priority=10) # low
print(json.loads(q.queue_pop("alerts"))["item"]) # "disk full"EventBusTool — in-process pub/sub event bus for decoupled agent coordination
bus_subscribe(topic, subscriber)— subscribe to a topic; usetopic="*"for wildcardbus_unsubscribe(topic, subscriber)— unsubscribe from a topicbus_publish(topic, data)— emit an event → notifies all subscribers in orderbus_history(topic, n=10)— get the n most recent events for a topicbus_topics()— list all topics with subscriber and event countsbus_subscribers(topic)— list subscribers to a topicbus_stats()— total events published, per-subscriber call countsbus_clear(topic=None)— clear a topic or all topics
from agent_friend import EventBusTool
bus = EventBusTool()
# Subscribe
bus.bus_subscribe("new_url", "scraper")
bus.bus_subscribe("new_url", "logger")
bus.bus_subscribe("*", "auditor") # receives ALL events
# Publish
bus.bus_publish("new_url", {"url": "https://example.com", "priority": 1})
# scraper, logger, and auditor are all notified
# Read history
history = json.loads(bus.bus_history("new_url", n=5))
# [{"event_id": 1, "topic": "new_url", "data": {...}, "timestamp": 1741754400.0}]
# Observability
stats = json.loads(bus.bus_stats())
# {"total_events": 1, "subscriber_counts": {"scraper": 1, "logger": 1, "auditor": 1}}SearchIndexTool — in-memory full-text search over JSON document collections
index_add(name, docs)— index a list of dicts; auto-creates the indexindex_search(name, query, top_n=10, field=None)— BM25-lite relevance search; returns docs with_scoreindex_create(name, fields=[])— create index; fields restricts which keys are indexedindex_delete_doc(name, doc_id)— remove a document by_idindex_list_docs(name, limit, offset)— paginated list of indexed documentsindex_status(name)— doc count, token count, fieldsindex_drop(name)/index_list()— manage indexes- Stop words filtered automatically. Case-insensitive. Pairs with HTTPTool and HTMLTool.
from agent_friend import SearchIndexTool
import json
idx = SearchIndexTool()
# Index API results
docs = [
{"id": 1, "title": "Python packaging guide", "body": "publish packages to PyPI"},
{"id": 2, "title": "Agent memory patterns", "body": "persistent memory using SQLite"},
{"id": 3, "title": "Rate limiting API calls", "body": "limit openai and anthropic calls"},
{"id": 4, "title": "Python async programming", "body": "asyncio and coroutines"},
]
idx.index_add("articles", docs)
# Search with BM25 relevance
results = json.loads(idx.index_search("articles", "python"))
# [{"id": 1, "title": "Python packaging...", "_score": 0.42, ...},
# {"id": 4, "title": "Python async...", "_score": 0.42, ...}]
# Field-restricted search
results = json.loads(idx.index_search("articles", "python", field="title"))
# Multi-word search (union of terms, ranked by relevance)
results = json.loads(idx.index_search("articles", "rate limit api", top_n=1))
# [{"id": 3, "title": "Rate limiting API calls", "_score": ..., ...}]
# Status
json.loads(idx.index_status("articles"))
# {"name": "articles", "doc_count": 4, "token_count": 18, ...}ConfigTool — hierarchical key-value configuration management
config_set(name, key, value)— set a key (dot-notation OK:"db.host")config_get(name, key, default=None, as_type=None)— get with optional type coercion (int/float/bool/str/json)config_defaults(name, defaults)— set multiple defaults (only where key not already set)config_load_env(name, prefix="", strip_prefix=True, lowercase=True)— populate from env vars;__→.for dot-notationconfig_list(name, prefix="")— list all keys, optionally filtered by prefixconfig_delete(name, key)/config_dump(name)— remove a key or export all as JSONconfig_require(name, keys)— assert required keys exist; returns{ok: false, missing: [...]}config_drop(name)/config_list_stores()— manage named config stores- Multiple named configs per instance. Max 20 stores, 1000 keys each (configurable).
from agent_friend import ConfigTool
import json
cfg = ConfigTool()
# Set config values with dot-notation keys
cfg.config_set("app", "db.host", "localhost")
cfg.config_set("app", "db.port", 5432)
cfg.config_set("app", "debug", True)
# Get with type coercion
json.loads(cfg.config_get("app", "db.host")) # {"value": "localhost", "found": True}
json.loads(cfg.config_get("app", "db.port", as_type="int")) # {"value": 5432, ...}
# Load from environment (APP_DB__HOST → db.host)
cfg.config_load_env("app", prefix="APP_", strip_prefix=True, lowercase=True)
# Set defaults (won't overwrite existing keys)
cfg.config_defaults("app", {"db.host": "127.0.0.1", "timeout": 30})
# Assert required keys before starting
json.loads(cfg.config_require("app", ["db.host", "db.port"]))
# {"ok": true, "missing": []}
# List keys by prefix
json.loads(cfg.config_list("app", prefix="db."))
# ["db.host", "db.port"]
# Export
json.loads(cfg.config_dump("app"))
# {"db.host": "localhost", "db.port": 5432, "debug": true}ChunkerTool — split long text and lists into chunks for LLM context windows
chunk_text(text, max_chars=2000, overlap=0, mode="chars")— split by chars, tokens, sentences, or paragraphschunk_list(items, size=10)— split a list into batches of sizechunk_by_separator(text, separator, max_chars=0, keep_separator=False)— split on custom delimiter, optionally merge up to max_charschunk_sliding_window(text, window_chars=500, step_chars=250)— overlapping sliding window; returns start/end offsetschunk_stats(text)— char_count, token_estimate, word_count, sentence_count, paragraph_count- Token estimate: ~4 chars per token (GPT/Claude heuristic). Pairs with SearchIndexTool for RAG pipelines.
from agent_friend import ChunkerTool
import json
chunker = ChunkerTool()
# Split a long document into ~500 char chunks with 50 char overlap
chunks = json.loads(chunker.chunk_text(long_doc, max_chars=500, overlap=50))
# [{"index": 0, "text": "...", "char_count": 500, "token_estimate": 125}, ...]
# Split by sentence boundaries (fits sentences into 1000-char groups)
chunks = json.loads(chunker.chunk_text(doc, max_chars=1000, mode="sentences"))
# Split by paragraph
chunks = json.loads(chunker.chunk_text(doc, mode="paragraphs"))
# Batch a list of URLs for parallel processing
batches = json.loads(chunker.chunk_list(urls, size=10))
# [{"index": 0, "items": [url1..url10], "count": 10}, ...]
# Sliding window for context-aware chunking
windows = json.loads(chunker.chunk_sliding_window(text, window_chars=1000, step_chars=500))
# [{"index": 0, "text": "...", "start": 0, "end": 1000}, ...]
# Stats before chunking
stats = json.loads(chunker.chunk_stats(text))
# {"char_count": 15000, "token_estimate": 3750, "sentence_count": 120, ...}VectorStoreTool — in-memory vector store with cosine/euclidean/dot similarity search
vector_add(name, vector, metadata={}, doc_id=None)— store an embedding; auto-generates UUID4 IDvector_search(name, query, top_k=5, metric="cosine", threshold=None)— nearest-neighbour search; returns[{id, score, metadata}]vector_get(name, doc_id)/vector_delete(name, doc_id)— retrieve or remove by IDvector_list(name, offset=0, limit=100)— paginated list of stored IDsvector_stats(name)— count, dim, max_vectorsvector_drop(name)/vector_list_stores()— manage named stores- Metrics: cosine (default), euclidean (inverted distance), dot product. Pairs with ChunkerTool for RAG pipelines.
from agent_friend import VectorStoreTool
import json
vs = VectorStoreTool()
# Store embeddings (from Anthropic/OpenAI embedding API)
vs.vector_add("docs", [0.1, 0.9, 0.3], metadata={"text": "cats and kittens"}, doc_id="doc1")
vs.vector_add("docs", [0.8, 0.1, 0.5], metadata={"text": "dogs and puppies"}, doc_id="doc2")
vs.vector_add("docs", [0.15, 0.85, 0.25], metadata={"text": "feline companions"}, doc_id="doc3")
# Find nearest neighbours (cosine similarity)
results = json.loads(vs.vector_search("docs", [0.1, 0.9, 0.3], top_k=2))
# [{"id": "doc1", "score": 1.0, "metadata": {"text": "cats and kittens"}},
# {"id": "doc3", "score": 0.999, "metadata": {"text": "feline companions"}}]
# Filter by minimum similarity
results = json.loads(vs.vector_search("docs", query, threshold=0.9))
# Euclidean distance (closer = higher score)
results = json.loads(vs.vector_search("docs", query, metric="euclidean"))
# Stats
stats = json.loads(vs.vector_stats("docs"))
# {"count": 3, "dim": 3, "max_vectors": 10000}TimerTool — named stopwatch timers, countdowns, and shell command benchmarking
timer_start(name)/timer_stop(name)— start/stop a named stopwatch; returns elapsed_ms and elapsed_stimer_elapsed(name)— get elapsed time without stoppingtimer_lap(name)— record a lap split; returns lap_ms, lap_number, total_elapsed_mstimer_reset(name)/timer_delete(name)— reset or remove a timertimer_list()— list all timers with current elapsed and lap splitscountdown_start(name, seconds)/countdown_remaining(name)— countdown timers with expiry detectiontimer_benchmark(command, runs=3)— time a shell command N times; returns avg/min/max_ms
from agent_friend import TimerTool
import json
t = TimerTool()
# Basic stopwatch
t.timer_start("search")
# ... do work ...
r = json.loads(t.timer_stop("search"))
print(f"Search took {r['elapsed_ms']:.1f}ms")
# Lap timing
t.timer_start("pipeline")
# stage 1
json.loads(t.timer_lap("pipeline")) # lap 1
# stage 2
r = json.loads(t.timer_stop("pipeline"))
print(f"Laps: {r['laps']}") # [142.3, 287.1]
# Countdown
t.countdown_start("timeout", 30)
r = json.loads(t.countdown_remaining("timeout"))
# {"remaining_s": 29.8, "expired": false}
# Benchmark a command
r = json.loads(t.timer_benchmark("curl -s https://example.com", runs=3))
print(f"avg={r['avg_ms']:.1f}ms min={r['min_ms']:.1f}ms max={r['max_ms']:.1f}ms")StatsTool — descriptive statistics for numeric data (no numpy, no pandas)
stats_describe(values, percentiles=[25,50,75])— count/mean/median/std/variance/min/max/range/percentilesstats_histogram(values, bins=10)— frequency histogram with range, count, frequency per binstats_correlation(x, y)— Pearson r, r_squared, interpretation (strong/moderate/weak positive/negative)stats_normalize(values, method="minmax")— min-max [0,1] or z-score normalizationstats_outliers(values, method="iqr", threshold=1.5)— IQR or z-score outlier detectionstats_moving_average(values, window=3, kind="simple")— SMA or EMA (alpha=0.3)stats_frequency(values, top_n=20)— frequency count with percent for categorical data
from agent_friend import StatsTool
import json
stats = StatsTool()
data = [2, 4, 4, 4, 5, 5, 7, 9]
# Descriptive statistics
r = json.loads(stats.stats_describe(data))
# {"count": 8, "mean": 5.0, "median": 4.5, "std": 2.14, "min": 2, "max": 9, ...}
# Histogram
r = json.loads(stats.stats_histogram(data, bins=4))
# {"bins": [{"range_start": 2.0, "range_end": 3.75, "count": 1, "frequency": 0.125}, ...]}
# Correlation
r = json.loads(stats.stats_correlation([1,2,3,4,5], [2,4,6,8,10]))
# {"r": 1.0, "r_squared": 1.0, "interpretation": "strong_positive"}
# Detect outliers
r = json.loads(stats.stats_outliers([1, 2, 3, 4, 100], method="iqr"))
# {"outliers": [{"index": 4, "value": 100.0}], "clean": [1,2,3,4], ...}
# 3-period moving average
r = json.loads(stats.stats_moving_average([1,2,3,4,5], window=3))
# {"values": [1.0, 1.5, 2.0, 3.0, 4.0], ...}SamplerTool — random sampling, shuffling, and data splitting
sample_list(items, n, seed=None, replacement=False)— random sample; deterministic with seedsample_weighted(items, weights, n=1, seed=None)— weighted selection; weights auto-normalizedsample_stratified(groups, n_per_group, seed=None)— balanced sampling across categoriesshuffle(items, seed=None)— return shuffled copy (original unchanged)random_split(items, ratios=[0.8, 0.2], seed=None)— train/test split or N-way partitionrandom_choice(items, seed=None)— pick one item; returns {choice, index}random_int(low, high, n=1, seed=None)/random_float(low, high, n=1)— reproducible random numbers
from agent_friend import SamplerTool
import json
sampler = SamplerTool()
# Deterministic random sample (no duplicates)
r = json.loads(sampler.sample_list([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], n=3, seed=42))
print(r["sample"]) # [7, 6, 5] — same every time with seed=42
# Weighted selection — 80% chance of "a"
r = json.loads(sampler.sample_weighted(["a", "b", "c"], [0.8, 0.1, 0.1], n=5, seed=1))
print(r["sample"]) # ["a", "a", "b", "a", "a"]
# Stratified sampling for balanced datasets
groups = {"positive": pos_examples, "negative": neg_examples}
r = json.loads(sampler.sample_stratified(groups, n_per_group=50, seed=42))
# Train/test split
r = json.loads(sampler.random_split(dataset, ratios=[0.8, 0.2], seed=42))
train, test = r["splits"]
# Reproducible random integers
r = json.loads(sampler.random_int(1, 100, n=5, seed=7))
print(r["values"]) # [43, 12, 78, 34, 91]WorkflowTool — lightweight workflow / pipeline runner for agent orchestration
workflow_define(name, steps, description="")— register a named workflow; steps:{name, fn, retries?, on_error?, default?, condition?}workflow_run(name, input=None, context={})— execute workflow; returns{output, steps, elapsed_ms, ok}step_define(name, source)— register custom step function:def step(value, ctx): ...workflow_list()/workflow_get(name)/workflow_delete(name)— manage workflowsworkflow_status()— execution history: total_runs, ok_runs, failed_runs, recentbuiltin_fns()— list built-ins: identity, upper, lower, strip, to_int, to_float, to_str, to_list, reverse, length, sum_list, sort, unique, flatten, noop- Supports: retries, on_error (fail/skip/default), conditional steps (truthy/falsy), shared context dict
from agent_friend import WorkflowTool
import json
wf = WorkflowTool()
# Define an ETL pipeline
wf.workflow_define("clean", steps=[
{"name": "strip", "fn": "strip"},
{"name": "upper", "fn": "upper"},
{"name": "to_list", "fn": "to_list"},
])
r = json.loads(wf.workflow_run("clean", input=" hello world "))
print(r["output"]) # ['H', 'E', 'L', 'L', 'O', ' ', 'W', 'O', 'R', 'L', 'D']
# Custom step with Python source
wf.step_define("double", "def step(value, ctx): return value * 2")
wf.workflow_define("math", steps=[
{"fn": "to_int"},
{"fn": "double"},
])
r = json.loads(wf.workflow_run("math", input="21"))
print(r["output"]) # 42
# on_error=skip keeps pipeline running on bad data
wf.workflow_define("safe", steps=[
{"fn": "to_int", "on_error": "skip"},
{"fn": "to_str"},
])
r = json.loads(wf.workflow_run("safe", input="not_a_number"))
print(r["ok"]) # True — pipeline completed with skipped stepAlertTool — threshold-based alerting and rule evaluation
alert_define(name, condition, threshold, severity="warning", cooldown_s=0)— register rule; conditions: gt/gte/lt/lte/eq/ne/between/outside/contains/not_contains/is_empty/is_truthyalert_evaluate(name, value, metadata={})— check value; returns{fired, severity, timestamp}alert_list()/alert_get(name)/alert_delete(name)— manage rulesalert_history(rule=None, severity=None, limit=50)— fired event logalert_clear(rule=None)/alert_stats()— clear history and aggregate counts- Severities: info / warning / error / critical; cooldown prevents duplicate alerts
from agent_friend import AlertTool
import json
alerts = AlertTool()
# Define threshold rules
alerts.alert_define("high_cpu", condition="gt", threshold=90.0, severity="critical", metric="cpu_pct")
alerts.alert_define("low_disk", condition="lte", threshold=5.0, severity="error", metric="disk_gb")
alerts.alert_define("error_log", condition="contains", threshold="ERROR", severity="warning")
# Evaluate incoming telemetry
r = json.loads(alerts.alert_evaluate("high_cpu", 95.0))
print(r["fired"], r["severity"]) # True critical
r = json.loads(alerts.alert_evaluate("high_cpu", 70.0))
print(r["fired"]) # False — below threshold
# Check log lines
r = json.loads(alerts.alert_evaluate("error_log", "2026-03-12 ERROR: timeout"))
print(r["fired"]) # True
# History of fired events
r = json.loads(alerts.alert_history(severity="critical", limit=10))
print(len(r["events"]))LockTool — named mutex-style locking to prevent concurrent operations
lock_acquire(name, owner="default", ttl_s=None, wait_s=0)— acquire lock;ttl_sauto-expires;wait_sblocking waitlock_release(name, owner)— release; only owner may release (uselock_expireto force)lock_try(name, owner, ttl_s=None)— non-blocking; returns{acquired, held_by?}immediatelylock_status(name)—{held, owner?, remaining_s?}lock_list()/lock_release_all(owner)/lock_expire(name)— manage and force-release lockslock_stats()— total_acquisitions, total_contentions, currently_held
from agent_friend import LockTool
import json
locks = LockTool()
# Acquire + release
r = json.loads(locks.lock_acquire("db_write", owner="worker-1", ttl_s=30))
print(r["acquired"]) # True
# Try without blocking
r = json.loads(locks.lock_try("db_write", owner="worker-2"))
print(r["acquired"], r.get("held_by")) # False worker-1
# Check status
r = json.loads(locks.lock_status("db_write"))
print(r["held"], r["remaining_s"]) # True ~29.9
locks.lock_release("db_write", owner="worker-1")
print(json.loads(locks.lock_status("db_write"))["held"]) # False
# Release all locks for a worker
locks.lock_acquire("a", owner="worker-3")
locks.lock_acquire("b", owner="worker-3")
r = json.loads(locks.lock_release_all("worker-3"))
print(r["released_count"]) # 2AuditTool — structured audit log for agent observability and tracing
audit_log(event_type, actor, resource, metadata, severity, outcome)— record event; returns{id, timestamp}audit_search(event_type, actor, resource, severity, outcome, after, before, text, limit)— filter logaudit_get(event_id)— retrieve single event by UUIDaudit_stats(after, before)— aggregate by_type, by_actor, by_resource, by_severity, by_outcomeaudit_export(event_type, after, before)— JSON lines exportaudit_clear(before=None)/audit_types()/audit_timeline(bucket="hour"|"day")- Severities: info/warning/error/critical. Outcomes: success/failure/denied/unknown.
from agent_friend import AuditTool
import json
audit = AuditTool()
# Log events
audit.audit_log("user.login", actor="alice", resource="auth", metadata={"ip": "1.1.1.1"})
audit.audit_log("file.delete", actor="bob", resource="doc.txt", severity="warning")
audit.audit_log("api.call", actor="alice", resource="/v1/data")
audit.audit_log("user.login", actor="eve", resource="auth", severity="error", outcome="failure", metadata={"ip": "9.9.9.9"})
# Search
r = json.loads(audit.audit_search(actor="alice"))
print("Alice events:", r["total"]) # 2
r = json.loads(audit.audit_search(text="9.9.9.9"))
print("Suspicious IP:", r["events"][0]["actor"]) # eve
# Aggregate stats
r = json.loads(audit.audit_stats())
print("By type:", r["by_type"]) # {"user.login": 2, "file.delete": 1, ...}
# Timeline
r = json.loads(audit.audit_timeline(bucket="hour"))
print("Buckets:", len(r["buckets"]))BatchTool — map/filter/reduce/partition lists with registered or built-in functions
fn_define(name, source)— registerdef fn(item): ...(ordef fn(acc, item): ...for reducers withis_reducer=True)batch_map(items, fn, on_error="null"|"skip"|"raise")— apply fn to each item; returns{results, ok, errors}batch_filter(items, fn)— keep items wherefn(item)is truthy; returns{results, kept, rejected}batch_reduce(items, fn, initial=None)— fold with accumulator; built-ins: sum/product/max/min/concatbatch_partition(items, fn)— split into{passing, failing}listsbatch_chunk(items, size)— split into equal-size chunksbatch_zip(keys, *lists)— zip lists into list of dicts- Built-in fns: identity/str/int/float/upper/lower/strip/len/bool/not/negate/abs/double/square
from agent_friend import BatchTool
import json
batch = BatchTool()
# Built-in map
r = json.loads(batch.batch_map([1, 2, 3, 4, 5], fn="square"))
print(r["results"]) # [1, 4, 9, 16, 25]
# Built-in filter
r = json.loads(batch.batch_filter(["", "hello", "", "world"], fn="is_truthy"))
print(r["results"]) # ["hello", "world"]
# Custom function
batch.fn_define("add_tax", "def fn(item): return round(item * 1.08, 2)")
r = json.loads(batch.batch_map([10.0, 20.0, 50.0], fn="add_tax"))
print(r["results"]) # [10.8, 21.6, 54.0]
# Reduce
r = json.loads(batch.batch_reduce([1, 2, 3, 4, 5], fn="sum"))
print(r["result"]) # 15
# Partition
r = json.loads(batch.batch_partition([1, -2, 3, -4, 0], fn="is_truthy"))
print(r["passing"], r["failing"]) # [1, 3] [-2, -4, 0]
# Chunk into batches of 3
r = json.loads(batch.batch_chunk(list(range(10)), size=3))
print(r["chunks"]) # [[0,1,2], [3,4,5], [6,7,8], [9]]TransformTool — structured data transformation: pick, omit, rename, coerce, flatten, unflatten, batch records, deep merge
transform_pick(record, keys)— extract only specified keys; returns{result, picked, missing}transform_omit(record, keys)— remove specified keys; returns{result, omitted}transform_rename(record, mapping)— rename keys via{old: new}; unmapped keys kept; returns{result, renamed}transform_coerce(record, types)— type-coerce values via{key: "str"|"int"|"float"|"bool"|"list"|"dict"|"null"}; returns{result, coerced, errors}transform_flatten(record, sep=".")— nested dict → dot-notation keys; arrays indexed askey.0,key.1; returns{result, key_count}transform_unflatten(record, sep=".")— dot-notation keys → nested dict; returns{result}transform_map_records(records, pick, omit, rename, coerce, add)— apply pick→omit→rename→coerce→add to each record in a list; returns{results, count, errors}transform_merge(*dicts)— deep merge; later dicts win on conflict; returns{result, merged_from}
from agent_friend import TransformTool
import json
t = TransformTool()
# Pick & rename
r = json.loads(t.transform_pick({"a": 1, "b": 2, "c": 3}, keys=["a", "c"]))
print(r["result"]) # {"a": 1, "c": 3}
r = json.loads(t.transform_rename({"name": "alice", "age": 30}, mapping={"name": "full_name"}))
print(r["result"]) # {"full_name": "alice", "age": 30}
# Flatten & unflatten
r = json.loads(t.transform_flatten({"user": {"name": "alice", "scores": [90, 85]}}))
print(r["result"]) # {"user.name": "alice", "user.scores.0": 90, "user.scores.1": 85}
# Batch transform records
records = [{"n": "alice", "s": "95"}, {"n": "bob", "s": "87"}]
r = json.loads(t.transform_map_records(records, rename={"n": "name", "s": "score"}, coerce={"score": "int"}))
print(r["results"]) # [{"name": "alice", "score": 95}, {"name": "bob", "score": 87}]
# Deep merge
r = json.loads(t.transform_merge({"x": {"a": 1}}, {"x": {"b": 2}, "y": 3}))
print(r["result"]) # {"x": {"a": 1, "b": 2}, "y": 3}FormatTool — human-readable formatting for numbers, sizes, durations, and text
format_bytes(value, decimals=1, binary=False)—1234567→"1.2 MB"(orKiB/MiBwithbinary=True)format_duration(seconds, verbose=False)—3661→"1h 1m 1s"or"1 hour 1 minute 1 second"format_number(value, decimals=2)—1234567.89→"1,234,567.89"format_percent(value, decimals=1)—0.8734→"87.3%"(ratios auto-scaled)format_currency(value, currency="USD")—1234.5→"$1,234.50"(USD/EUR/GBP/JPY/...)format_ordinal(n)—1→"1st",11→"11th",21→"21st"format_plural(count, singular, plural=None)—format_plural(3, "item")→"3 items"format_truncate(text, max_length=80, suffix="…")— truncate long strings with ellipsisformat_pad(text, width, align="left")— left/right/center pad a stringformat_table(data, columns=None)— render a JSON array of dicts as a plain-text table
from agent_friend import FormatTool
f = FormatTool()
f.format_bytes(1_234_567) # "1.2 MB"
f.format_bytes(1024, binary=True) # "1.0 KiB"
f.format_duration(3_661) # "1h 1m 1s"
f.format_duration(90, verbose=True) # "1 minute 30 seconds"
f.format_number(1_234_567.89) # "1,234,567.89"
f.format_percent(0.8734) # "87.3%"
f.format_currency(1234.5, "EUR") # "€1,234.50"
f.format_ordinal(21) # "21st"
f.format_plural(3, "test") # "3 tests"
f.format_truncate("a very long...", max_length=20) # "a very long...…"
import json
data = json.dumps([{"name": "Alice", "score": 90}, {"name": "Bob", "score": 75}])
print(f.format_table(data))
# +-------+-------+
# | name | score |
# +-------+-------+
# | Alice | 90 |
# | Bob | 75 |
# +-------+-------+GraphTool — directed graphs: dependency tracking, topological sort, cycle detection
graph_create(name)— create a named directed graphgraph_add_node(name, node, meta={})— add a node with optional metadatagraph_add_edge(name, src, dst)— add directed edge src → dst (auto-creates nodes)graph_remove_edge(name, src, dst)/graph_remove_node(name, node)— remove elementsgraph_topo_sort(name)— topological order (Kahn's algorithm) or error if cyclicgraph_has_cycle(name)—{has_cycle: true/false}graph_path(name, src, dst)— BFS shortest path →{reachable, path, length}graph_ancestors(name, node)— all nodes that can reach nodegraph_descendants(name, node)— all nodes reachable from nodegraph_nodes(name)/graph_edges(name)/graph_status(name)— inspection- Multiple named graphs. Zero dependencies.
from agent_friend import GraphTool
import json
g = GraphTool()
g.graph_create("deps")
# Model a Python package dependency graph
g.graph_add_edge("deps", "django", "sqlparse")
g.graph_add_edge("deps", "django", "asgiref")
g.graph_add_edge("deps", "myapp", "django")
g.graph_add_edge("deps", "myapp", "celery")
g.graph_add_edge("deps", "celery", "kombu")
# What order should I install packages?
order = json.loads(g.graph_topo_sort("deps"))
# ["asgiref", "celery", "kombu", "sqlparse", "django", "myapp"] (valid install order)
# What does myapp transitively depend on?
json.loads(g.graph_descendants("deps", "myapp"))
# ["asgiref", "celery", "django", "kombu", "sqlparse"]
# Is there a path from myapp to sqlparse?
json.loads(g.graph_path("deps", "myapp", "sqlparse"))
# {"reachable": True, "path": ["myapp", "django", "sqlparse"], "length": 2}
# Detect circular deps
json.loads(g.graph_has_cycle("deps")) # {"has_cycle": False}MapReduceTool — map, filter, sort, group, and reduce JSON arrays without CodeTool
mr_map(data, field, transform=None)— extract a field from every item (dot-notation OK)mr_filter(data, field, operator, value)— keep items matching a predicate (eq/ne/gt/lt/gte/lte/contains/startswith/endswith/exists)mr_reduce(data, field, operation)— aggregate to a scalar (count, sum, avg, min, max, first, last, join, unique)mr_sort(data, field, reverse=False)— sort by fieldmr_group(data, field)— group items by field value →{key: [items]}mr_flatten(data)— flatten a list of listsmr_zip(left, right)— zip two arrays →[{left, right}, ...]mr_pick(data, fields)— keep only specified keys in each dictmr_slice(data, start, end)— slice the list- Chainable with JSONTool, HTTPTool, TableTool. All inputs/outputs are JSON strings.
from agent_friend import MapReduceTool
import json
mr = MapReduceTool()
data = json.dumps([
{"name": "Alice", "score": 90, "dept": "eng"},
{"name": "Bob", "score": 75, "dept": "mkt"},
{"name": "Charlie", "score": 90, "dept": "eng"},
{"name": "Diana", "score": 55, "dept": "mkt"},
])
# Extract all names
mr.mr_map(data, "name") # '["Alice", "Bob", "Charlie", "Diana"]'
# Keep scores >= 80
high = mr.mr_filter(data, "score", "gte", 80) # Alice + Charlie
# Average score
mr.mr_reduce(data, "score", "avg") # '77.5'
# Sort by score descending
mr.mr_sort(data, "score", reverse=True)
# Group by department
groups = json.loads(mr.mr_group(data, "dept"))
# {"eng": [...], "mkt": [...]}
# Chain: filter then reduce
top = mr.mr_filter(data, "score", "gte", 80)
mr.mr_reduce(top, "name", "join", separator=" & ") # "Alice & Charlie"StateMachineTool — finite state machines for agent workflow control
sm_create(name, initial, states=[])— define a named machine with an initial statesm_add_transition(name, from_state, to_state)— allow a specific state transitionsm_trigger(name, to_state)— attempt a transition →{ok, from, to}or{ok: false, error}sm_state(name)— current state + list of allowed next statessm_can(name, to_state)— check if a transition is allowed without executing itsm_history(name, n=20)— last N transitions as[{seq, from, to, timestamp}]sm_reset(name, state=None)— reset to initial state (or specified state); clears historysm_status(name)— full snapshot: states, current, allowed_next, transition_countsm_list()/sm_delete(name)— manage machines- Multiple named machines per instance; only defined transitions are ever permitted
from agent_friend import StateMachineTool
sm = StateMachineTool()
# Define an order workflow
sm.sm_create("order", initial="pending",
states=["pending", "paid", "shipped", "delivered", "cancelled"])
sm.sm_add_transition("order", "pending", "paid")
sm.sm_add_transition("order", "pending", "cancelled")
sm.sm_add_transition("order", "paid", "shipped")
sm.sm_add_transition("order", "shipped", "delivered")
# Execute transitions
sm.sm_trigger("order", "paid") # {"ok": true, "from": "pending", "to": "paid"}
sm.sm_trigger("order", "delivered") # {"ok": false, "error": "No transition from 'paid' to 'delivered'..."}
sm.sm_trigger("order", "shipped") # ok
sm.sm_trigger("order", "delivered") # ok
# Inspect
sm.sm_state("order") # {"state": "delivered", "allowed_next": []}
sm.sm_history("order", n=3) # [{seq: 1, from: "pending", to: "paid", timestamp: ...}, ...]
# Guard before acting
if json.loads(sm.sm_can("order", "shipped"))["allowed"]:
# safe to proceed
sm.sm_trigger("order", "shipped")Custom Tools via @tool — register any Python function as an agent tool
- Reads type hints to auto-generate the JSON schema
- Optional parameters (with defaults or
Optional[X]) are not required - The decorated function remains callable normally
- Mix with built-in tools:
Friend(tools=["search", my_fn])
from agent_friend import Friend, tool
@tool
def stock_price(ticker: str) -> str:
"""Get current stock price for a ticker symbol."""
# call your actual API here
return f"{ticker}: $182.50"
@tool(name="convert_temp", description="Convert Celsius to Fahrenheit")
def to_fahrenheit(celsius: float) -> str:
return f"{celsius * 9/5 + 32:.1f}°F"
friend = Friend(tools=["search", stock_price, to_fahrenheit])
friend.chat("What's AAPL stock price and convert 22°C to Fahrenheit?")
# Functions still work normally
print(stock_price("AAPL")) # "AAPL: $182.50"
print(to_fahrenheit(22.0)) # "71.6°F"# System TTS (zero config, works everywhere)
friend = Friend(tools=["voice"])
friend.chat("Read this article summary aloud")
# Neural TTS via HTTP server
from agent_friend import VoiceTool
friend = Friend(tools=[VoiceTool(tts_url="http://your-tts-server:8081")])# Agent with a real database — create tables, insert rows, run queries
from agent_friend import DatabaseTool
friend = Friend(tools=["database"])
friend.chat("Create a tasks table with title and done columns, then add 3 tasks")
friend.chat("Show me all incomplete tasks")
# Python API for scripting
db = DatabaseTool()
db.create_table("notes", "id INTEGER PRIMARY KEY, content TEXT, tag TEXT")
db.insert("notes", {"content": "Ship agent-friend v0.8", "tag": "work"})
rows = db.query("SELECT * FROM notes WHERE tag = ?", ["work"])