Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,97 @@ Built-in memory backends:
Bring your own by subclassing `BaseMemoryBackend` and pointing
`MEMORY_BACKEND` at the dotted path.

## Smart indexing (optional)

The classic indexer joins selected fields with whitespace. That works, but the
embedding model loses the *role* of each value: a category name and a body
paragraph become indistinguishable tokens. The optional `SmartIndexer` builds
structured documents with labelled sections so the embedder sees something
closer to:

```
Title: Pixel 8
Description:
Camera-first Android phone with Tensor G3.
Category: Phones
```

Enable it from settings — your existing index, signals, and management command
keep working because the resolver and `get_indexer()` factory pick the new
implementation transparently:

```python
GRAPH_SEARCH = {
# ... your existing config ...
"SMART_INDEXING": {
"ENABLED": True,
# Optional per-model templates; the indexer falls back to a heuristic
# template based on your MODELS config when one is missing.
"TEMPLATES": {
"shop.Product": {
"title_field": "name",
"sections": [
{"label": "Description", "field": "description", "multiline": True},
{"label": "Category", "field": "category__name"},
],
}
},
},
}
```

The original deterministic text is always appended as a safety net so smart
indexing never produces *less* searchable content than the legacy pipeline.
Disable the flag to fall back instantly — no reindex required to switch back.

## Streaming search endpoint (optional)

Long-running pipelines (query expansion, vector search, reranking) can stream
lifecycle events to the client so users see progress instead of staring at a
spinner. Two transports are supported:

- `ndjson` (default): one JSON object per line, ideal for `fetch` +
`ReadableStream` and CLI tools like `jq`.
- `sse`: Server-Sent Events for `EventSource` clients.

Enable from settings:

```python
GRAPH_SEARCH = {
# ... your existing config ...
"STREAMING": {
"ENABLED": True,
"FORMAT": "ndjson", # or "sse"
"INCLUDE_INTERNAL_EVENTS": True,
},
}
```

The endpoint is registered at `/<API_URL_PREFIX>/stream/` (default
`/api/search/stream/`) and returns HTTP 404 when disabled, so it is safe to
leave the URL config untouched.

Quick test:

```bash
curl -N "http://localhost:8000/api/search/stream/?q=phone"
```

Example event sequence (NDJSON):

```json
{"type": "query_received", "query": "phone"}
{"type": "vector_search_completed", "candidate_count": 12}
{"type": "completed", "total": 5}
{"type": "results", "results": [...], "total": 5}
{"type": "end"}
```

Under the hood the view subscribes a `queue.Queue` to a per-request
`EventHub`, runs the search in a worker thread, and yields events as soon as
the nodes publish them. The hub also powers structured logging and any
custom subscribers you register from your own apps.

## Comparison

| Feature | django-graph-search | Haystack | django-elasticsearch-dsl |
Expand Down
5 changes: 3 additions & 2 deletions src/django_graph_search/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from .apps import DjangoGraphSearchConfig
from .indexer import Indexer
from .indexer import Indexer, get_indexer
from .searcher import Searcher
from .settings import get_settings

Expand All @@ -17,7 +17,7 @@ def index(instance) -> None:
model_cfg = next((cfg for cfg in config.models if cfg.model == instance._meta.label), None)
if model_cfg is None:
return
Indexer(config=config).index_instance(instance, model_cfg)
get_indexer(config=config).index_instance(instance, model_cfg)


def get_similar(instance, limit: int | None = None):
Expand All @@ -27,6 +27,7 @@ def get_similar(instance, limit: int | None = None):
__all__ = [
"DjangoGraphSearchConfig",
"Indexer",
"get_indexer",
"Searcher",
"search",
"index",
Expand Down
75 changes: 75 additions & 0 deletions src/django_graph_search/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""Lightweight event hook system used by the LangGraph pipelines.

The hub is intentionally tiny: it stores a list of subscribers and broadcasts
events via plain function calls. It does not try to be a full pub/sub system —
the intent is to give callers (HTTP endpoints, structured logging, future
streaming integrations) a single place to plug in.

Events are dictionaries with a ``type`` key and optional payload. Examples:

* ``{\"type\": \"query_received\", \"query\": \"phone\"}``
* ``{\"type\": \"vector_search_started\", \"queries\": [\"phone\", \"phones\"]}``
* ``{\"type\": \"completed\", \"total\": 12}``
"""
from __future__ import annotations

import logging
import threading
from typing import Any, Callable, Dict, List, Optional

log = logging.getLogger(__name__)

EventCallback = Callable[[Dict[str, Any]], None]


class EventHub:
"""Per-instance hub. Use :func:`get_default_hub` for the global one."""

def __init__(self) -> None:
self._subscribers: List[EventCallback] = []
self._lock = threading.Lock()

def subscribe(self, callback: EventCallback) -> Callable[[], None]:
"""Register ``callback`` and return a function that removes it."""
with self._lock:
self._subscribers.append(callback)

def _unsubscribe() -> None:
with self._lock:
try:
self._subscribers.remove(callback)
except ValueError: # pragma: no cover - already removed.
pass

return _unsubscribe

def publish(self, event: Dict[str, Any]) -> None:
"""Broadcast ``event`` to subscribers. Errors are logged, never raised."""
with self._lock:
subs = list(self._subscribers)
for cb in subs:
try:
cb(event)
except Exception as exc: # noqa: BLE001
log.warning("Event subscriber raised: %s", exc)


_default_hub: Optional[EventHub] = None
_default_lock = threading.Lock()


def get_default_hub() -> EventHub:
global _default_hub
if _default_hub is None:
with _default_lock:
if _default_hub is None:
_default_hub = EventHub()
return _default_hub


def reset_default_hub() -> None: # pragma: no cover - testing helper
global _default_hub
_default_hub = EventHub()


__all__ = ["EventHub", "EventCallback", "get_default_hub", "reset_default_hub"]
26 changes: 26 additions & 0 deletions src/django_graph_search/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,32 @@ def make_doc_id(model_label: str, pk: object) -> str:
return f"{model_label}:{pk}"


def get_indexer(
config: Optional[GraphSearchConfig] = None,
**kwargs,
):
"""Return the configured indexer instance.

By default returns :class:`Indexer`. When
``GRAPH_SEARCH["SMART_INDEXING"]["ENABLED"]`` is ``True`` the dotted-path
in ``SMART_INDEXING.INDEXER`` is loaded instead — typically
:class:`~django_graph_search.langgraph_indexer.SmartIndexer`.

The factory keeps the public surface stable: callers do not need to know
which implementation they are using because both expose the same methods
(``index_queryset``, ``index_instance``, ``delete_instance``,
``rebuild_all``).
"""
from .settings import get_settings
from django.utils.module_loading import import_string

cfg = config or get_settings()
if not cfg.smart_indexing.enabled:
return Indexer(config=cfg, **kwargs)
indexer_cls = import_string(cfg.smart_indexing.indexer)
return indexer_cls(config=cfg, **kwargs)


class Indexer(ComponentMixin):
def __init__(
self,
Expand Down
71 changes: 63 additions & 8 deletions src/django_graph_search/langgraph_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import logging
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, TypedDict

from .events import EventHub
from .llm.base import BaseLLMBackend, RerankCandidate
from .settings import GraphSearchConfig

Expand Down Expand Up @@ -232,34 +233,65 @@ def postprocess_results_node(state: SearchState) -> SearchState:
# ---------------------------------------------------------------------------


def build_search_graph(config: GraphSearchConfig, *, embedding_backend, vector_store, llm: BaseLLMBackend):
def build_search_graph(
config: GraphSearchConfig,
*,
embedding_backend,
vector_store,
llm: BaseLLMBackend,
event_hub: Optional[EventHub] = None,
):
"""Build and compile the search graph.

When the ``langgraph`` package is available we return a compiled
LangGraph ``StateGraph``. Otherwise we return :class:`_FallbackGraph` so
the rest of the code stays identical.

Pass ``event_hub`` to receive lifecycle events (``query_received``,
``query_expanded``, ``vector_search_completed``, ``rerank_completed``,
``completed``) — the same hub powers the streaming HTTP endpoint.
"""
try:
from langgraph.graph import END, StateGraph # type: ignore
except Exception: # pragma: no cover - exercised when langgraph absent.
return _FallbackGraph(config=config, embedding_backend=embedding_backend,
vector_store=vector_store, llm=llm)
return _FallbackGraph(
config=config,
embedding_backend=embedding_backend,
vector_store=vector_store,
llm=llm,
event_hub=event_hub,
)

def _wrap(name: str, fn):
if event_hub is None:
return fn

def _wrapped(s):
event_hub.publish({"type": f"{name}_started", "query": s.get("normalized_query") or s.get("query")})
out = fn(s)
event_hub.publish({
"type": f"{name}_completed",
"candidate_count": len(out.get("merged_results") or out.get("raw_results") or []),
})
return out

return _wrapped

graph: Any = StateGraph(dict)
graph.add_node("analyze_query", lambda s: analyze_query_node(s, config=config))
graph.add_node("analyze_query", _wrap("analyze_query", lambda s: analyze_query_node(s, config=config)))
graph.add_node(
"expand_query",
lambda s: expand_query_node(s, config=config, llm=llm),
_wrap("expand_query", lambda s: expand_query_node(s, config=config, llm=llm)),
)
graph.add_node(
"vector_search",
lambda s: vector_search_node(s, embedding_backend=embedding_backend, vector_store=vector_store),
_wrap("vector_search", lambda s: vector_search_node(s, embedding_backend=embedding_backend, vector_store=vector_store)),
)
graph.add_node(
"rerank_results",
lambda s: rerank_results_node(s, config=config, llm=llm),
_wrap("rerank_results", lambda s: rerank_results_node(s, config=config, llm=llm)),
)
graph.add_node("postprocess_results", lambda s: postprocess_results_node(s))
graph.add_node("postprocess_results", _wrap("postprocess_results", lambda s: postprocess_results_node(s)))

graph.set_entry_point("analyze_query")
graph.add_conditional_edges(
Expand Down Expand Up @@ -295,24 +327,47 @@ def __init__(
embedding_backend,
vector_store,
llm: BaseLLMBackend,
event_hub: Optional[EventHub] = None,
) -> None:
self.config = config
self.embedding_backend = embedding_backend
self.vector_store = vector_store
self.llm = llm
self.event_hub = event_hub

def _emit(self, event: Dict[str, Any]) -> None:
if self.event_hub is not None:
self.event_hub.publish(event)

def invoke(self, state: SearchState) -> SearchState:
self._emit({"type": "query_received", "query": state.get("query") or ""})
state = analyze_query_node(state, config=self.config)
if self.config.langgraph.query_expansion:
state = expand_query_node(state, config=self.config, llm=self.llm)
self._emit({
"type": "query_expanded",
"queries": list(state.get("expanded_queries") or []),
})
state = vector_search_node(
state,
embedding_backend=self.embedding_backend,
vector_store=self.vector_store,
)
self._emit({
"type": "vector_search_completed",
"candidate_count": len(state.get("merged_results") or []),
})
if self.config.langgraph.reranking:
state = rerank_results_node(state, config=self.config, llm=self.llm)
self._emit({
"type": "rerank_completed",
"candidate_count": len(state.get("reranked_results") or []),
})
state = postprocess_results_node(state)
self._emit({
"type": "completed",
"total": len(state.get("final_results") or []),
})
return state


Expand Down
Loading
Loading