Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions backend/app/agent/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,11 @@ async def _get_or_create_user(channel: str, sender_id: str) -> User:
return user

# Reuse the sole existing user (single-tenant OSS) so sessions from
# every channel are visible in the dashboard.
# every channel are visible in the dashboard. Skip this in
# multi-tenant (premium) mode to avoid linking a new sender's
# messages to an existing user's account.
all_users = db.query(User).all()
if len(all_users) == 1:
if len(all_users) == 1 and not settings.premium_plugin:
user = all_users[0]
db.add(ChannelRoute(user_id=user.id, channel=channel, channel_identifier=sender_id))
user.channel_identifier = sender_id
Expand Down
24 changes: 22 additions & 2 deletions backend/app/agent/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,25 @@ async def run_pipeline(ctx: PipelineContext, steps: list[PipelineStep]) -> Pipel
persist_outbound_step,
]

# Premium (or other plugins) can register a custom pipeline via
# ``set_pipeline_override()`` to inject extra steps (e.g. quota checks).
_pipeline_override: list[PipelineStep] | None = None


def set_pipeline_override(pipeline: list[PipelineStep]) -> None:
"""Register a custom pipeline that replaces ``DEFAULT_PIPELINE``.

Called by the premium plugin at import time to inject quota-check
and usage-tracking steps into the agent pipeline.
"""
global _pipeline_override
_pipeline_override = pipeline


def get_active_pipeline() -> list[PipelineStep]:
"""Return the currently active pipeline (override or default)."""
return _pipeline_override if _pipeline_override is not None else DEFAULT_PIPELINE


# ---------------------------------------------------------------------------
# Orchestrator
Expand All @@ -476,7 +495,8 @@ async def handle_inbound_message(

Orchestrates discrete pipeline steps via a composable list of
``PipelineStep`` callables. Pass a custom ``pipeline`` to add,
remove, or reorder steps; defaults to ``DEFAULT_PIPELINE``.
remove, or reorder steps; defaults to the active pipeline (which
may be overridden by a premium plugin).
"""
logger.debug(
"Handling inbound message seq=%d for user %s, %d media attachment(s)",
Expand Down Expand Up @@ -510,5 +530,5 @@ async def handle_inbound_message(
download_media=download_media,
request_id=request_id,
)
ctx = await run_pipeline(ctx, pipeline or DEFAULT_PIPELINE)
ctx = await run_pipeline(ctx, pipeline or get_active_pipeline())
return ctx.response or AgentResponse(reply_text="")
Loading