Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 187 additions & 0 deletions engram/sdk/crewai.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
"""
Engram — CrewAI tool adapter

Exposes an Engram miner as CrewAI tools so any CrewAI agent can use the
decentralized subnet as long-term semantic memory: store facts during a run
and retrieve them later by meaning.

Mirrors the lazy-import pattern of the LangChain and LlamaIndex adapters
(`engram/sdk/langchain.py`, `engram/sdk/llama_index.py`) so importing this
module never hard-requires CrewAI.

Install:
pip install crewai engram-subnet

Usage:
from crewai import Agent, Task, Crew
from engram.sdk.crewai import engram_memory_tools

store_tool, search_tool = engram_memory_tools(miner_url="http://127.0.0.1:8091")

researcher = Agent(
role="Researcher",
goal="Remember findings and recall them across tasks",
backstory="Uses Engram as durable cross-task memory.",
tools=[store_tool, search_tool],
)

You can also construct the tools directly:

from engram.sdk.crewai import EngramStoreTool, EngramSearchTool

store_tool = EngramStoreTool(miner_url="http://127.0.0.1:8091")
search_tool = EngramSearchTool(miner_url="http://127.0.0.1:8091", top_k=5)
"""

from __future__ import annotations

from typing import Any

from engram.sdk.client import EngramClient
from engram.sdk.exceptions import EngramError

try:
from crewai.tools import BaseTool
from pydantic import BaseModel, Field, PrivateAttr

_CREWAI_AVAILABLE = True
except ImportError: # pragma: no cover - exercised only without CrewAI installed
_CREWAI_AVAILABLE = False

# Minimal shims so the module imports cleanly and gives a helpful error
# at instantiation time, exactly like the LangChain/LlamaIndex adapters.
class BaseTool: # type: ignore[no-redef]
pass

class BaseModel: # type: ignore[no-redef]
pass

def Field(*args: Any, **kwargs: Any) -> Any: # type: ignore[no-redef]
return None

def PrivateAttr(*args: Any, **kwargs: Any) -> Any: # type: ignore[no-redef]
return None


DEFAULT_MINER_URL = "http://127.0.0.1:8091"


def _require_crewai() -> None:
if not _CREWAI_AVAILABLE:
raise ImportError(
"crewai is required for the Engram CrewAI tools. "
"Install it with: pip install crewai"
)


# ── Argument schemas ──────────────────────────────────────────────────────────


class _StoreArgs(BaseModel):
text: str = Field(..., description="The text/fact to store in Engram memory.")
source: str = Field(
default="crewai",
description="Optional tag recorded in the memory's metadata.",
)


class _SearchArgs(BaseModel):
query: str = Field(..., description="What to recall, described in natural language.")


# ── Tools ───────────────────────────────────────────────────────────────────


class EngramStoreTool(BaseTool):
"""CrewAI tool that stores a piece of text in an Engram miner.

The miner returns a content ID (CID); the same text+metadata is
content-addressed, so storing it twice yields the same CID.
"""

name: str = "engram_store"
description: str = (
"Store a fact or piece of text in long-term Engram memory so it can be "
"recalled later by meaning. Input the text to remember. "
"Returns the content ID (CID) of the stored memory."
)
args_schema: type = _StoreArgs

miner_url: str = DEFAULT_MINER_URL
timeout: float = 30.0

_client: EngramClient = PrivateAttr()

def __init__(self, **data: Any) -> None:
_require_crewai()
super().__init__(**data)
self._client = EngramClient(miner_url=self.miner_url, timeout=self.timeout)

def _run(self, text: str, source: str = "crewai") -> str:
try:
cid = self._client.ingest(text, metadata={"text": text, "source": source})
return f"Stored memory. CID: {cid}"
except EngramError as exc:
return f"Could not store memory: {exc}"


class EngramSearchTool(BaseTool):
"""CrewAI tool that retrieves the most relevant memories from an Engram miner."""

name: str = "engram_search"
description: str = (
"Search long-term Engram memory for facts relevant to a natural-language "
"query. Returns the top matching memories with similarity scores."
)
args_schema: type = _SearchArgs

miner_url: str = DEFAULT_MINER_URL
timeout: float = 30.0
top_k: int = 4

_client: EngramClient = PrivateAttr()

def __init__(self, **data: Any) -> None:
_require_crewai()
super().__init__(**data)
self._client = EngramClient(miner_url=self.miner_url, timeout=self.timeout)

def _run(self, query: str) -> str:
try:
results = self._client.query(query, top_k=self.top_k)
except EngramError as exc:
return f"Search failed: {exc}"

if not results:
return "No relevant memories found."

lines = []
for r in results:
meta = dict(r.get("metadata") or {})
content = meta.get("text", r.get("cid", ""))
score = float(r.get("score", 0.0))
lines.append(f"[{score:.4f}] {content}")
return "\n".join(lines)


def engram_memory_tools(
miner_url: str = DEFAULT_MINER_URL,
timeout: float = 30.0,
top_k: int = 4,
) -> tuple[EngramStoreTool, EngramSearchTool]:
"""Return a (store_tool, search_tool) pair wired to the same miner.

Convenience for the common case of giving a CrewAI agent both
write and read access to Engram memory.
"""
_require_crewai()
store = EngramStoreTool(miner_url=miner_url, timeout=timeout)
search = EngramSearchTool(miner_url=miner_url, timeout=timeout, top_k=top_k)
return store, search


__all__ = [
"EngramStoreTool",
"EngramSearchTool",
"engram_memory_tools",
]
90 changes: 90 additions & 0 deletions examples/crewai_agent_memory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""
Example: give a CrewAI agent durable memory backed by Engram.

This shows the SDK gap the LangChain and LlamaIndex adapters already fill for
their ecosystems: a CrewAI agent that can *store* findings during one task and
*recall* them by meaning in a later task, using an Engram miner as the backend.

Prerequisites:
- A miner running locally: python -m neurons.miner (default http://127.0.0.1:8091)
- pip install crewai engram-subnet

Run:
python examples/crewai_agent_memory.py

If you just want to see the tools work without the full Crew/LLM loop, the
bottom of this file has a `--tools-only` path that calls the tools directly.
"""

from __future__ import annotations

import sys

from engram.sdk.crewai import engram_memory_tools

MINER_URL = "http://127.0.0.1:8091"


def tools_only_demo() -> None:
"""Exercise the tools directly — no LLM or API key required."""
store_tool, search_tool = engram_memory_tools(miner_url=MINER_URL)

print("Storing two memories...")
print(" ", store_tool._run("Engram stores embeddings on a Bittensor subnet."))
print(" ", store_tool._run("Retrieval uses the miner's HNSW index."))

print("\nRecalling 'how does Engram find similar items?'")
print(search_tool._run("how does Engram find similar items?"))


def full_crew_demo() -> None:
"""Wire the tools into a real CrewAI Agent + Crew."""
from crewai import Agent, Crew, Task

store_tool, search_tool = engram_memory_tools(miner_url=MINER_URL, top_k=5)

archivist = Agent(
role="Archivist",
goal="Persist important facts to Engram memory.",
backstory="You never forget — you write everything worth keeping to Engram.",
tools=[store_tool],
verbose=True,
)
analyst = Agent(
role="Analyst",
goal="Answer questions using only what is in Engram memory.",
backstory="You recall stored facts by meaning before answering.",
tools=[search_tool],
verbose=True,
)

remember = Task(
description=(
"Store these facts in Engram memory: "
"(1) Engram is a decentralized vector database on Bittensor. "
"(2) Private namespaces use X25519 + sr25519 signed requests."
),
expected_output="Confirmation that both facts were stored, with their CIDs.",
agent=archivist,
)
recall = Task(
description="Using Engram memory, answer: how are private namespaces secured?",
expected_output="A short answer grounded in the retrieved memory.",
agent=analyst,
)

crew = Crew(agents=[archivist, analyst], tasks=[remember, recall], verbose=True)
result = crew.kickoff()
print(result)


if __name__ == "__main__":
if "--tools-only" in sys.argv:
tools_only_demo()
else:
try:
full_crew_demo()
except Exception as exc: # pragma: no cover - depends on local LLM/keys
print(f"Full crew run needs CrewAI + an LLM configured: {exc}")
print("Falling back to the tools-only demo (needs a running miner):\n")
tools_only_demo()
92 changes: 92 additions & 0 deletions scripts/test_crewai_sdk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""
CrewAI adapter test — engram/sdk/crewai.py

Offline-safe: requires neither a live miner nor an LLM. Mirrors the harness
style of scripts/test_sdk.py.

Behavior covered:
- Module imports cleanly whether or not CrewAI is installed.
- Without CrewAI: constructing a tool raises a helpful ImportError.
- With CrewAI: tools construct, expose correct name/description/args_schema,
and degrade gracefully (return an error string, not raise) when the miner
is offline.

Run:
python scripts/test_crewai_sdk.py
"""

import sys

sys.path.insert(0, ".")

from engram.sdk import crewai as engram_crewai # noqa: E402

DEAD_URL = "http://127.0.0.1:19999" # nothing running here

results: list[tuple[str, bool]] = []
PASS = "✓"
FAIL = "✗"


def check(label: str, condition: bool) -> None:
print(f" {PASS if condition else FAIL} {label}")
results.append((label, bool(condition)))


def section(title: str) -> None:
print(f"\n[{title}]")


def main() -> int:
section("Import + public surface")
check("module imports", engram_crewai is not None)
check(
"exports the expected names",
set(engram_crewai.__all__)
== {"EngramStoreTool", "EngramSearchTool", "engram_memory_tools"},
)

if not engram_crewai._CREWAI_AVAILABLE:
section("CrewAI not installed — graceful error path")
try:
engram_crewai.EngramStoreTool(miner_url=DEAD_URL)
check("EngramStoreTool() raises ImportError without CrewAI", False)
except ImportError:
check("EngramStoreTool() raises ImportError without CrewAI", True)
except Exception:
check("EngramStoreTool() raises ImportError without CrewAI", False)
else:
section("CrewAI installed — tool construction")
store = engram_crewai.EngramStoreTool(miner_url=DEAD_URL)
search = engram_crewai.EngramSearchTool(miner_url=DEAD_URL, top_k=3)
check("store tool name is 'engram_store'", store.name == "engram_store")
check("search tool name is 'engram_search'", search.name == "engram_search")
check("store has a non-empty description", bool(store.description))
check("search args_schema is set", search.args_schema is not None)

factory = engram_crewai.engram_memory_tools(miner_url=DEAD_URL)
check("engram_memory_tools() returns a pair", len(factory) == 2)

section("Offline miner — graceful degradation (no exceptions)")
out_store = store._run("a fact to remember")
check("store._run returns a string", isinstance(out_store, str))
check("store._run reports failure gracefully", "Could not store" in out_store)

out_search = search._run("recall something")
check("search._run returns a string", isinstance(out_search, str))
check("search._run reports failure gracefully", "Search failed" in out_search)

failed = [label for label, ok in results if not ok]
print(f"\n{'='*52}")
print(f" {len(results) - len(failed)}/{len(results)} checks passed")
if failed:
print(" FAILED:")
for label in failed:
print(f" - {label}")
return 1
print(" ALL GREEN")
return 0


if __name__ == "__main__":
raise SystemExit(main())