
(feat): add actions for langfuse evaluators#593

Open
narsimhaReddyJuspay wants to merge 1 commit into juspay:release from narsimhaReddyJuspay:add-actions-for-langfuse-evaluators

Conversation

@narsimhaReddyJuspay
Contributor

@narsimhaReddyJuspay narsimhaReddyJuspay commented Feb 24, 2026

  • Added new actions for evaluators.
  • Added new config for evaluator actions.
  • Added a basic outcome-update action for now.
  • Actions trigger at the same threshold as the alerts.

Summary by CodeRabbit

  • New Features

    • Evaluators can trigger configurable automated actions (e.g., outcome corrections, cancel pending retries, send webhook) when scores cross thresholds.
    • New environment config to define evaluator actions.
  • Behavior

    • Actions run before failing-score alerts; Slack alerts now include Action Status and Lead ID when available.
  • Bug Fixes / Reliability

    • Reduced redundant lead lookups via targeted pre-fetching and refreshed lead state after actions.
  • Documentation

    • Added comprehensive docs for evaluator action configuration and rollout.
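
The summary mentions a new environment config but does not show its contents. Purely as an illustration — every key name below is invented, not taken from the PR; the real schema lives in docs/EVALUATOR_ACTIONS.md — an EVALUATOR_ACTIONS value might look like:

```shell
# Hypothetical shape only: maps an evaluator name to the actions to run
# when its score falls below the alert threshold.
EVALUATOR_ACTIONS='{"outcome_accuracy": {"actions": [{"type": "outcome_update", "update_db": true, "cancel_retries": true, "webhook": true}]}}'
```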

@coderabbitai

coderabbitai Bot commented Feb 24, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 712d3f4d-2b2c-4034-9999-83c52e600858

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.


Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Adds a Redis-backed async EVALUATOR_ACTIONS config, a new action framework (ActionType, OutcomeUpdateAction, ActionExecutor) to derive and apply outcome corrections, integrates action execution into the Langfuse score-monitoring flow (pre-fetch leads, run actions, refresh cache), and surfaces action status into Slack alerts.

Changes

Cohort / File(s) — Summary

  • Config: dynamic fetcher — app/core/config/dynamic.py
    Added async EVALUATOR_ACTIONS() -> dict[str, Any] to load/parse JSON from Redis, validate object shape, log JSON decode errors, and return {} on missing/invalid data.
  • Action framework — app/services/langfuse/tasks/score_monitor/actions.py
    New module implementing ActionType, extract_json_from_end, extract_field, OutcomeUpdateAction (determine proposed outcome, enforce gating, update lead completion details, cancel pending retries, send HMAC-signed Nautilus webhook, track per-step results) and ActionExecutor (should_trigger, execute_action, error handling).
  • Score monitoring integration — app/services/langfuse/tasks/score_monitor/score.py
    Fetches EVALUATOR_ACTIONS, instantiates ActionExecutor when configured, pre-fetches necessary leads, executes actions for failing evaluator scores (default outcome_update), refreshes the cached lead on success, and passes action_result into send_score_alert (signature updated).
  • Slack alerting — app/services/slack/alert.py
    Alert.send signature extended with optional action_status; fields are filtered to exclude Action Status from the normal grid, and an explicit full-width section is appended when provided.
  • DB cancel-retries query — app/database/queries/breeze_buddy/lead_call_tracker.py
    Added cancel_pending_retries_by_request_id_query: an UPDATE that marks BACKLOG/RETRY rows FINISHED, sets outcome to CANCELLED_BY_OUTCOME_CORRECTION, updates timestamps, and merges cancellation metadata.
  • DB accessor — app/database/accessor/breeze_buddy/lead_call_tracker.py
    Added cancel_pending_retries_by_request_id(request_id, reason) wrapper that executes the new query, returns the number of rows cancelled, and logs on error.
  • Breeze Buddy tracing util — app/ai/voice/agents/breeze_buddy/utils/traces.py, app/ai/voice/agents/breeze_buddy/observability/tracing_setup.py
    Added extract_possible_outcomes(flow) to extract static outcomes from template flow hooks; tracing setup now annotates the root span with possible_outcomes when present.
  • Docs / env — docs/EVALUATOR_ACTIONS.md, .env.example
    Documentation for the EVALUATOR_ACTIONS config added; .env.example includes an example EVALUATOR_ACTIONS JSON variable.

Sequence Diagram

sequenceDiagram
    participant Monitor as Score Monitor
    participant Config as EVALUATOR_ACTIONS (Redis)
    participant Executor as ActionExecutor
    participant Action as OutcomeUpdateAction
    participant DB as Lead DB
    participant Tracker as Lead Call Tracker (cancel)
    participant Nautilus as Reporting Webhook
    participant Alert as Slack Alert

    Monitor->>Config: fetch EVALUATOR_ACTIONS()
    Config-->>Monitor: action mappings

    loop for each failing evaluator score
        Monitor->>Executor: should_trigger(score, config)?
        Executor-->>Monitor: yes/no
        alt yes
            Monitor->>DB: get_lead_by_call_id(call_sid)
            DB-->>Monitor: lead
            Monitor->>Executor: execute_action(type, cfg, call_sid, score, lead)
            Executor->>Action: run OutcomeUpdateAction
            Action->>Action: extract_proposed_outcome(score/comment or config)
            opt update_db
                Action->>DB: update_lead_completion_details(...)
                DB-->>Action: success/failure
            end
            opt cancel_retries
                Action->>Tracker: cancel_pending_retries_by_request_id(request_id, reason)
                Tracker-->>Action: rows_cancelled
            end
            opt webhook
                Action->>Nautilus: send HMAC-signed webhook(payload)
                Nautilus-->>Action: 200/err
            end
            Action-->>Executor: ActionResult (step_results, success)
            Executor-->>Monitor: action_result
            Monitor->>DB: refresh_cached_lead_if_action_success
        end
    end

    Monitor->>Alert: send_score_alert(..., action_status=action_result.to_slack_status())
    Alert-->>Monitor: delivered/failed
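
The diagram's webhook step is HMAC-signed. As a minimal sketch of how such signing typically works — the hash choice (SHA-256), serialization, and verification flow here are common conventions, not the PR's actual Nautilus implementation:

```python
import hashlib
import hmac
import json

def sign_webhook_payload(payload: dict, secret: str) -> tuple[bytes, str]:
    """Serialize a payload deterministically and compute an HMAC-SHA256
    hex digest over the body; the receiver recomputes and compares."""
    body = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode()
    signature = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return body, signature

def verify_signature(body: bytes, signature: str, secret: str) -> bool:
    expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    # compare_digest avoids leaking match position via timing
    return hmac.compare_digest(expected, signature)
```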

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes


Poem

🐰
A nibble of JSON from Redis, neat and bright,
Actions hop onward when scores lose the fight.
Outcomes peek out from comments in the night,
Leads get nudged, webhooks hum, alerts alight.
Bunny stamps approval — everything’s just right!

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
  • Description Check — ✅ Passed. Check skipped: CodeRabbit's high-level summary is enabled.
  • Title Check — ✅ Passed. The title clearly and concisely summarizes the main change: adding action support for Langfuse evaluators, which is the central theme across all modified files.
  • Docstring Coverage — ✅ Passed. Docstring coverage is 96.15%, which is sufficient (required threshold: 80.00%).
  • Linked Issues Check — ✅ Passed. Check skipped because no linked issues were found for this pull request.
  • Out of Scope Changes Check — ✅ Passed. Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.






@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (6)
app/services/langfuse/tasks/score_monitor/actions.py (3)

43-56: Regex only handles one level of JSON nesting.

The pattern r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}" matches JSON objects with at most one level of nested braces. If the evaluator comment contains deeper nesting like {"result": {"details": {"outcome": "X"}}}, the regex will fail to capture the full object, potentially returning a partial match that won't parse or returning the wrong inner object.

For the current use case (flat JSON with actual_outcome), this works, but it's fragile if evaluator comment formats evolve.

A more robust alternative would be to scan backwards for { and attempt json.loads on progressively larger substrings:

♻️ More robust extraction approach
-    # Find all JSON objects in the comment
-    # Use a greedy match to get the largest valid JSON at each position
-    pattern = r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}"
-    matches = re.findall(pattern, comment, re.DOTALL)
-
-    if not matches:
-        return None
-
-    # Try to parse each match from the end, return first valid one
-    for match in reversed(matches):
-        try:
-            return json.loads(match)
-        except json.JSONDecodeError:
-            continue
+    # Search backwards for JSON objects by finding '{' positions
+    # and attempting to parse from each one
+    for i in range(len(comment) - 1, -1, -1):
+        if comment[i] == '{':
+            try:
+                obj = json.loads(comment[i:])
+                if isinstance(obj, dict):
+                    return obj
+            except json.JSONDecodeError:
+                continue
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/services/langfuse/tasks/score_monitor/actions.py` around lines 43 - 56,
The current regex-based extraction (pattern and re.findall on variable comment)
only supports one level of brace nesting and can miss deeper JSON; replace it
with a robust backward-scan that finds the last '{' positions in comment and for
each candidate substring (from that position to end) attempts json.loads until
one succeeds, returning the parsed object or None if none parse; update the code
that currently uses pattern, matches and the reversed loop to this scan-and-try
approach (use the same json.loads and comment variable so callers remain
unchanged).

78-80: Use removeprefix instead of replace for path parsing.

replace("$.", "") replaces all occurrences of "$." in the string, not just the leading prefix. While unlikely to cause issues with current usage, removeprefix (Python 3.9+) is semantically correct here.

♻️ Proposed fix
-    field = json_path.replace("$.", "")
+    field = json_path.removeprefix("$.")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/services/langfuse/tasks/score_monitor/actions.py` around lines 78 - 80,
The parsing of the JSON path uses json_path.replace("$.", "") which will remove
all occurrences of "$." instead of only a leading prefix; change this to use
json_path.removeprefix("$.") when computing field (the variable named field) so
only a leading "$." is dropped, then return data.get(field) as before; update
the line that assigns field (and any similar occurrences) to use removeprefix.
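The semantic difference is easy to demonstrate (the sample paths below are illustrative, not from the PR):

```python
# replace() strips every occurrence of the substring, not just a leading one
assert "$.outcome".replace("$.", "") == "outcome"   # happens to work here
assert "$.a.$.b".replace("$.", "") == "a.b"         # interior "$." is mangled too
# removeprefix() (Python 3.9+) drops only a leading prefix
assert "$.a.$.b".removeprefix("$.") == "a.$.b"
```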

216-239: should_trigger duplicates ScoreMonitor._is_below_threshold.

This method is functionally identical to _is_below_threshold in score.py (lines 185-208). In check_and_alert, scores are already filtered by _is_below_threshold, so calling should_trigger again (score.py line 955) is redundant — every score in failing_scores already passed the threshold check.

Consider either:

  1. Removing the redundant should_trigger call in score.py, or
  2. Having ScoreMonitor._is_below_threshold delegate to ActionExecutor.should_trigger to centralize the logic.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/services/langfuse/tasks/score_monitor/actions.py` around lines 216 - 239,
The should_trigger method in ActionExecutor duplicates
ScoreMonitor._is_below_threshold and is redundantly called for items already
filtered into failing_scores in check_and_alert; remove the extra call to
ActionExecutor.should_trigger from ScoreMonitor.check_and_alert (so
failing_scores are assumed already below threshold) and keep
ActionExecutor.should_trigger for external use, or alternatively change
ScoreMonitor._is_below_threshold to call ActionExecutor.should_trigger
(centralizing logic) — update call sites accordingly to reference should_trigger
or _is_below_threshold consistently and remove the duplicate threshold check to
avoid double validation.
app/services/langfuse/tasks/score_monitor/score.py (3)

954-955: Redundant threshold check — should_trigger re-checks what _is_below_threshold already verified.

All scores in failing_scores have already passed _is_below_threshold (line 834), which performs the identical score_value < threshold comparison. The should_trigger call here is always True for these scores.

If the intent is to use the same threshold for actions and alerts, the check is redundant. If actions might use different thresholds in the future, consider making that explicit rather than re-checking with the same value.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/services/langfuse/tasks/score_monitor/score.py` around lines 954 - 955,
The failing_scores loop is redundantly re-checking thresholds: scores have
already been filtered by _is_below_threshold (used to build failing_scores), so
calling executor.should_trigger(score, threshold) with the same threshold is
unnecessary; either remove that second check and directly trigger actions/alerts
for entries in failing_scores, or if you intend a separate action threshold,
make that explicit by using a different config key (e.g., action_threshold) and
pass that to executor.should_trigger — update references to evaluators_config,
_is_below_threshold, failing_scores, and executor.should_trigger accordingly.

296-315: Leads are fetched twice: once in the pre-fetch loop (line 932) and again inside send_score_alert (line 311).

send_score_alert independently calls get_lead_by_call_id(call_sid) at line 311 to get recording_url. The pre-fetched call_sid_to_lead dict is never passed to send_score_alert, so every call_sid with an action config triggers two DB lookups.

Consider passing the pre-fetched lead to send_score_alert or refactoring it to accept an optional recording_url parameter, so the same lead data serves both the action and the alert.

Also applies to: 928-936

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/services/langfuse/tasks/score_monitor/score.py` around lines 296 - 315,
The code performs duplicate DB lookups for leads: pre-fetch builds
call_sid_to_lead but send_score_alert(call_sid, ...) still calls
get_lead_by_call_id(call_sid) to fetch recording_url; change send_score_alert to
accept an optional lead or recording_url (e.g., add parameter recording_url=None
or lead=None), update all callers (the pre-fetch loop that has call_sid_to_lead
to pass the pre-fetched lead/recording_url into send_score_alert), and inside
send_score_alert use the passed lead/recording_url if provided and only call
get_lead_by_call_id when it is not supplied; remove the redundant database call
path to eliminate the double lookup (refer to send_score_alert,
get_lead_by_call_id, call_sid_to_lead, and recording_url).

967-973: call_sid may be None when passed to execute_action.

At line 943, call_sid is set to None if trace_id is missing or has no mapping. This None is then passed to executor.execute_action(call_sid=call_sid, ...) at line 970, which flows into OutcomeUpdateAction.execute and ultimately calls get_lead_by_call_id(None).

While the downstream code handles it gracefully (returns None, leading to "No lead found" log + return False), it would be cleaner to skip action execution entirely when call_sid is absent:

♻️ Proposed guard
                             if executor.should_trigger(score, threshold):
+                                if not call_sid:
+                                    logger.info(
+                                        f"No call_sid for trace {trace_id}, skipping action"
+                                    )
+                                    action_status = "FAILED"
+                                    continue
                                 try:

Note: This would need a slight restructuring since the code isn't in a loop that supports continue at this level. A simple if call_sid: guard around the action block would work.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/services/langfuse/tasks/score_monitor/score.py` around lines 967 - 973,
call_sid can be None before calling executor.execute_action, causing downstream
OutcomeUpdateAction.execute to call get_lead_by_call_id(None); add a guard to
skip executing actions when call_sid is falsy (e.g., if not call_sid: log/debug
that no call_sid found and skip the action block) so that execute_action is only
invoked when call_sid is present; update the code paths around the call to
executor.execute_action and any related logging to reflect the early skip (refer
to the variables call_sid, executor.execute_action, OutcomeUpdateAction.execute,
and get_lead_by_call_id when locating the change).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@app/services/langfuse/tasks/score_monitor/actions.py`:
- Line 13: Remove the unused typing import "List" from the import statement in
this module (the line importing Any, Dict, List, Optional); keep the used types
(Any, Dict, Optional) so the file compiles and the linter no longer flags an
unused import in actions.py.
- Around line 262-273: The current code calls ActionType(action_type) which
raises ValueError for unknown values and gets caught by the broad exception,
hiding a config problem; change the logic in the method that looks up handlers
so you validate the enum first (e.g., check ActionType.__members__ or use a
small try/except specifically around ActionType(action_type) to handle
ValueError), log a clear "unknown/unsupported action type" message referencing
the action_type when invalid, then use the validated enum key to get
self._action_handlers and proceed to instantiate handler_class and call
handler.execute(call_sid, score, current_lead); keep the broad exception
handling only for runtime errors from handler.execute.

In `@app/services/langfuse/tasks/score_monitor/score.py`:
- Around line 927-936: The pre-fetch currently iterates trace_to_call_sid for
all traces causing unnecessary DB calls; change it to first build a set of
relevant call_sids (only those belonging to traces that have failing scores AND
have a matching evaluator action in action_configs when executor is set), then
iterate that set to call get_lead_by_call_id and populate call_sid_to_lead; use
only the call_sid in the loop (drop unused trace_id) and keep the existing
try/except logger.error behavior. This limits queries to only the
failing+configured traces and removes the B007 unused-variable warning.


ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b8f0d21 and 257896c.

📒 Files selected for processing (3)
  • app/core/config/dynamic.py
  • app/services/langfuse/tasks/score_monitor/actions.py
  • app/services/langfuse/tasks/score_monitor/score.py

@narsimhaReddyJuspay narsimhaReddyJuspay force-pushed the add-actions-for-langfuse-evaluators branch 2 times, most recently from 0d6e299 to 59f600c on February 24, 2026 at 13:12
@narsimhaReddyJuspay
Contributor Author

@coderabbitai please review

@coderabbitai

coderabbitai Bot commented Feb 24, 2026

@narsimhaReddyJuspay: I'll review the changes in this PR.

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (1)
app/services/langfuse/tasks/score_monitor/score.py (1)

978-984: call_sid may be None when passed to execute_action.

At Line 981, call_sid can be None (from Line 954's .get()). This propagates to OutcomeUpdateAction.execute() which calls get_lead_by_call_id(None). A guard was suggested at the root cause in actions.py (OutcomeUpdateAction.execute), but you could also short-circuit here:

+                                    if not call_sid:
+                                        logger.warning(
+                                            f"No call_sid for trace {score.get('trace_id')}, skipping action"
+                                        )
+                                        action_status = "FAILED"
+                                    else:
                                         success = await executor.execute_action(
                                             action_type=action_type,
                                             action_config=action,
                                             call_sid=call_sid,
                                             score=score,
                                             current_lead=current_lead,
                                         )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/services/langfuse/tasks/score_monitor/score.py` around lines 978 - 984,
call_sid can be None (from the earlier .get()) and is currently passed into
executor.execute_action which leads OutcomeUpdateAction.execute to call
get_lead_by_call_id(None); guard against this by short-circuiting before calling
executor.execute_action when call_sid is falsy (or explicitly None) — e.g.,
check the call_sid local variable and skip or log the action dispatch for that
action_type; reference the call_sid variable and the executor.execute_action
call, and note OutcomeUpdateAction.execute / get_lead_by_call_id as the
downstream symbols to avoid invoking with None.
🧹 Nitpick comments (3)
app/services/langfuse/tasks/score_monitor/score.py (1)

960-1007: Consider extracting action processing into a helper method to reduce nesting.

This block is nested 7 levels deep (for → for → try → if → if → if → try), which makes the flow hard to follow. Extracting the action processing (Lines 960–1007) into a dedicated method like _process_action_for_score(executor, action_configs, evaluators_config, score, call_sid, call_sid_to_lead) would improve readability.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/services/langfuse/tasks/score_monitor/score.py` around lines 960 - 1007,
The action-processing block within the score monitoring loop is deeply nested;
extract it into a helper function named something like
_process_action_for_score(executor, action_configs, evaluators_config, score,
call_sid, call_sid_to_lead) that encapsulates the logic currently around
executor.should_trigger(...), executor.execute_action(...), action_status
handling, and the post-success lead refresh via get_lead_by_call_id(call_sid);
move exception handling for action execution and lead re-fetch into that helper
and return action_status and any updated lead so the caller (the outer loop)
only calls the helper and updates call_sid_to_lead if needed, preserving
existing variable names (executor, action_configs, evaluators_config, score,
call_sid, call_sid_to_lead) and log messages (logger.error) to retain behavior.
app/services/langfuse/tasks/score_monitor/actions.py (2)

43-56: Regex only handles single-level nested JSON objects.

The pattern \{[^{}]*(?:\{[^{}]*\}[^{}]*)*\} will correctly match {"outcome": "X"} or {"a": {"b": "c"}}, but will fail on deeper nesting like {"a": {"b": {"c": "d"}}}. If evaluator comments may contain multi-level nested JSON, this will silently miss valid objects.

If deeper nesting is a future possibility, consider a stack-based brace-matching approach or use a scanning parser instead of regex.

💡 Alternative: brace-matching extraction
def extract_json_from_end(comment: str) -> Optional[Dict[str, Any]]:
    if not comment:
        return None

    # Find all top-level JSON objects by brace matching (supports arbitrary nesting)
    candidates = []
    i = len(comment) - 1
    while i >= 0:
        if comment[i] == '}':
            depth = 0
            end = i
            while i >= 0:
                if comment[i] == '}':
                    depth += 1
                elif comment[i] == '{':
                    depth -= 1
                if depth == 0:
                    candidates.append(comment[i:end + 1])
                    break
                i -= 1
        i -= 1

    for candidate in candidates:
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            continue

    if candidates:
        logger.warning(
            f"Found {len(candidates)} JSON-like patterns but none parsed successfully"
        )
    return None
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/services/langfuse/tasks/score_monitor/actions.py` around lines 43 - 56,
The current regex-based extraction using the pattern variable (pattern =
r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}") will fail on deeper-than-two-level nested
JSON; replace the regex approach in this block with a brace-matching scanner
that walks the comment string from the end, collects candidate JSON substrings
by tracking brace depth, and then json.loads() each candidate in reverse order
until one parses successfully (update the existing matches/reversed(matches)
logic to use the new candidates list). Ensure the new logic handles empty
comments, logs a warning if candidates were found but none parsed, and keep the
function behavior of returning the first successfully parsed dict or None.

216-239: should_trigger is redundant with the upstream filtering in score.py.

Scores reaching the action-execution block (score.py Line 966) are already filtered by _is_below_threshold (score.py Line 834) using the same threshold. So executor.should_trigger(score, threshold) will always return True for these scores.

This isn't a bug — it's a defensive double-check — but it duplicates identical logic (float(value) < threshold) from ScoreMonitor._is_below_threshold. If this is intentional as a safety guard, a brief comment noting that would help future readers.
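A sketch of what that defensive guard could look like with the suggested comment (written as a plain function here for brevity; the real `should_trigger` is a method on the executor):

```python
import logging

logger = logging.getLogger(__name__)


def should_trigger(score: dict, threshold: float) -> bool:
    # Defensive re-check: ScoreMonitor._is_below_threshold already filters
    # scores upstream with the same `float(value) < threshold` logic, so this
    # is intentionally redundant and only guards direct callers.
    try:
        value = score.get("value")
        if value is None:
            return False
        score_value = float(value)
    except (ValueError, TypeError):
        logger.warning(f"Invalid score value: {score.get('value')}")
        return False
    else:
        return score_value < threshold
```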

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/services/langfuse/tasks/score_monitor/actions.py` around lines 216 - 239,
The should_trigger method duplicates the same float(value) < threshold check
already performed upstream by ScoreMonitor._is_below_threshold before actions
are executed; update the should_trigger implementation (or its docstring) to
explicitly state this is an intentional defensive guard against untrusted inputs
coming into executor.should_trigger and that it may be redundant because
ScoreMonitor._is_below_threshold already filters scores, so leave the existing
logic but add a concise comment referencing ScoreMonitor._is_below_threshold to
make the intent clear to future readers.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@app/services/langfuse/tasks/score_monitor/actions.py`:
- Around line 64-80: The extract_field function currently returns
data.get(field) which can be non-string; change extract_field to coerce non-None
values to str (e.g., return None if missing else str(value)) so its return
matches the expected string downstream; update callers (notably _extract_outcome
and the call to update_lead_call_completion_details(outcome=...)) to rely on the
coerced string/None behavior from extract_field.
- Around line 101-124: The execute method may receive call_sid=None (propagated
from trace_to_call_sid.get(trace_id)), so avoid calling get_lead_by_call_id with
None; add an early guard at the start of ScoreMonitorOutcomeAction.execute to
check if call_sid is falsy/None and immediately return False (and optionally log
a warning) before attempting to call get_lead_by_call_id(call_sid), preserving
the existing behavior when a valid call_sid is provided.

---

Duplicate comments:
In `@app/services/langfuse/tasks/score_monitor/score.py`:
- Around line 978-984: call_sid can be None (from the earlier .get()) and is
currently passed into executor.execute_action which leads
OutcomeUpdateAction.execute to call get_lead_by_call_id(None); guard against
this by short-circuiting before calling executor.execute_action when call_sid is
falsy (or explicitly None) — e.g., check the call_sid local variable and skip or
log the action dispatch for that action_type; reference the call_sid variable
and the executor.execute_action call, and note OutcomeUpdateAction.execute /
get_lead_by_call_id as the downstream symbols to avoid invoking with None.

---

Nitpick comments:
In `@app/services/langfuse/tasks/score_monitor/actions.py`:
- Around line 43-56: The current regex-based extraction using the pattern
variable (pattern = r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}") will fail on
deeper-than-two-level nested JSON; replace the regex approach in this block with
a brace-matching scanner that walks the comment string from the end, collects
candidate JSON substrings by tracking brace depth, and then json.loads() each
candidate in reverse order until one parses successfully (update the existing
matches/reversed(matches) logic to use the new candidates list). Ensure the new
logic handles empty comments, logs a warning if candidates were found but none
parsed, and keep the function behavior of returning the first successfully
parsed dict or None.
- Around line 216-239: The should_trigger method duplicates the same
float(value) < threshold check already performed upstream by
ScoreMonitor._is_below_threshold before actions are executed; update the
should_trigger implementation (or its docstring) to explicitly state this is an
intentional defensive guard against untrusted inputs coming into
executor.should_trigger and that it may be redundant because
ScoreMonitor._is_below_threshold already filters scores, so leave the existing
logic but add a concise comment referencing ScoreMonitor._is_below_threshold to
make the intent clear to future readers.

In `@app/services/langfuse/tasks/score_monitor/score.py`:
- Around line 960-1007: The action-processing block within the score monitoring
loop is deeply nested; extract it into a helper function named something like
_process_action_for_score(executor, action_configs, evaluators_config, score,
call_sid, call_sid_to_lead) that encapsulates the logic currently around
executor.should_trigger(...), executor.execute_action(...), action_status
handling, and the post-success lead refresh via get_lead_by_call_id(call_sid);
move exception handling for action execution and lead re-fetch into that helper
and return action_status and any updated lead so the caller (the outer loop)
only calls the helper and updates call_sid_to_lead if needed, preserving
existing variable names (executor, action_configs, evaluators_config, score,
call_sid, call_sid_to_lead) and log messages (logger.error) to retain behavior.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 257896c and 59f600c.

📒 Files selected for processing (3)
  • app/core/config/dynamic.py
  • app/services/langfuse/tasks/score_monitor/actions.py
  • app/services/langfuse/tasks/score_monitor/score.py

Comment thread app/services/langfuse/tasks/score_monitor/actions.py Outdated
Comment thread app/services/langfuse/tasks/score_monitor/actions.py Outdated
@narsimhaReddyJuspay narsimhaReddyJuspay force-pushed the add-actions-for-langfuse-evaluators branch from 59f600c to 6cee410 Compare February 24, 2026 13:35
@narsimhaReddyJuspay

Contributor Author

@coderabbitai please review

@coderabbitai

coderabbitai Bot commented Feb 24, 2026

@narsimhaReddyJuspay: I'll review the changes in this PR.

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@app/core/config/dynamic.py`:
- Around line 263-299: EVALUATOR_ACTIONS currently returns whatever
json.loads(config_value) yields which may be a list or primitive; validate that
the parsed value is a dict before returning and fail closed if not. After
json.loads in EVALUATOR_ACTIONS, check isinstance(parsed, dict) and if it is,
return it; otherwise log a clear error (including the parsed type and the raw
config_value) via logger.error and return {} so downstream callers (e.g., code
expecting .get()) won’t raise AttributeError.

ℹ️ Review info


📥 Commits

Reviewing files that changed from the base of the PR and between 59f600c and 6cee410.

📒 Files selected for processing (3)
  • app/core/config/dynamic.py
  • app/services/langfuse/tasks/score_monitor/actions.py
  • app/services/langfuse/tasks/score_monitor/score.py

Comment thread app/core/config/dynamic.py
@narsimhaReddyJuspay narsimhaReddyJuspay force-pushed the add-actions-for-langfuse-evaluators branch from 6cee410 to 6204f42 Compare February 24, 2026 14:48
@narsimhaReddyJuspay
Contributor Author

@coderabbitai review

@coderabbitai

coderabbitai Bot commented Feb 24, 2026

✅ Actions performed

Review triggered.



@coderabbitai coderabbitai Bot left a comment


🧹 Nitpick comments (5)
app/services/langfuse/tasks/score_monitor/actions.py (2)

241-249: Minor: return score_value < threshold inside try body (Ruff TRY300)

Same pattern as in dynamic.py — the return belongs in an else block so only the float conversion is guarded.

♻️ Proposed refactor
         try:
             value = score.get("value")
             if value is None:
                 return False
             score_value = float(value)
-            return score_value < threshold
         except (ValueError, TypeError):
             logger.warning(f"Invalid score value: {score.get('value')}")
             return False
+        else:
+            return score_value < threshold
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/services/langfuse/tasks/score_monitor/actions.py` around lines 241 - 249,
The try block currently wraps both the float conversion and the comparison,
which can hide logic errors; change it so only the conversion of
score.get("value") to float is inside the try (e.g., try: score_value =
float(value) except (ValueError, TypeError): logger.warning(...) return False),
and perform the comparison return score_value < threshold in an else (or
immediately after the try) using the same symbols (score.get("value"),
score_value, threshold) so only the conversion is guarded and the return is
outside the try body.

43-61: extract_json_from_end regex handles at most two levels of JSON nesting

The pattern r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}" matches an object whose inner values contain at most one level of {}. For three or more levels (e.g., {"a": {"b": {"c": 1}}}), re.findall returns an inner sub-object instead of the outermost one. The current callers only embed flat objects ({"actual_outcome": "VOICEMAIL"}), so this is fine now, but the limitation is invisible to future contributors.

Consider adding a docstring note about the nesting depth limit, or replacing the regex with an iterative bracket-balancing scan:

♻️ Bracket-balanced alternative (handles arbitrary nesting)
-    # Find all JSON objects in the comment
-    # Use a greedy match to get the largest valid JSON at each position
-    pattern = r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}"
-    matches = re.findall(pattern, comment, re.DOTALL)
-
-    if not matches:
+    # Find all top-level '{' positions and extract the balanced span
+    matches = []
+    for i, ch in enumerate(comment):
+        if ch != "{":
+            continue
+        depth = 0
+        for j in range(i, len(comment)):
+            if comment[j] == "{":
+                depth += 1
+            elif comment[j] == "}":
+                depth -= 1
+            if depth == 0:
+                matches.append(comment[i : j + 1])
+                break
+
+    if not matches:
         return None
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/services/langfuse/tasks/score_monitor/actions.py` around lines 43 - 61,
The regex in extract_json_from_end (pattern variable) only supports up to two
nested brace levels, so replace it with an iterative bracket-balanced scan that
walks the string (preferably from the end) counting '{' and '}' until a balanced
top-level object is found, then json.loads that substring; update error handling
to catch json.JSONDecodeError and keep the existing logger.warning message;
alternatively, if you want the minimal change, add a docstring to
extract_json_from_end documenting the nesting limitation and the rationale.
Ensure references to extract_json_from_end, pattern, and logger are preserved so
reviewers can find the change.
app/services/langfuse/tasks/score_monitor/score.py (2)

962-966: executor.should_trigger is always True here — redundant check

failing_scores on line 833-835 is already filtered with self._is_below_threshold(s, threshold), which uses identical logic (float(value) < threshold). Every score in the loop has already passed this check with the same threshold, so executor.should_trigger unconditionally returns True.

♻️ Proposed simplification
                     threshold = evaluators_config.get(evaluator_name, 5)
-                    if executor.should_trigger(score, threshold):
-                        try:
+                    try:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/services/langfuse/tasks/score_monitor/score.py` around lines 962 - 966,
The loop calls executor.should_trigger(score, threshold) after building
failing_scores using self._is_below_threshold(s, threshold) which uses identical
float(value) < threshold logic, making the check redundant; remove the
executor.should_trigger(...) condition or move the executor.should_trigger check
into the failing_scores filter so you either (A) just rely on failing_scores and
drop the executor.should_trigger branch where evaluator_name/action_config are
handled, or (B) change the failing_scores computation to call
executor.should_trigger(s, threshold) (instead of self._is_below_threshold) so
the executor's custom logic is used; update references to failing_scores,
_is_below_threshold, executor.should_trigger, evaluator_name, action_config,
evaluators_config and score accordingly.

309-315: send_score_alert issues a fresh DB query for recording_url even when the lead is already in call_sid_to_lead

With the new pre-fetch cache (call_sid_to_lead) and the re-fetch after a successful action (lines 991-997), a single call_sid can now trigger up to three get_lead_by_call_id calls per alert cycle. The send_score_alert method has no access to the cache, so it always queries the DB again.

Consider passing the recording_url (or the cached lead) to send_score_alert so the method only falls back to a DB query when no cached value is available.

♻️ Sketch of the change
 async def send_score_alert(
     self,
     evaluator_name: str,
     score: Dict[str, Any],
     trace_details: Optional[Dict[str, Any]] = None,
     action_status: Optional[str] = None,
+    cached_lead: Optional[Any] = None,
 ) -> bool:
     ...
     if trace_details:
         ...
         if call_sid:
+            lead = cached_lead
+            if lead is None:
                 try:
                     lead = await get_lead_by_call_id(call_sid)
                 except Exception as e:
                     logger.error(f"Error querying database for recording_url: {e}")
             if lead and lead.recording_url:
                 recording_url = lead.recording_url

Then at the call site (line 1010), pass cached_lead=call_sid_to_lead.get(call_sid).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/services/langfuse/tasks/score_monitor/score.py` around lines 309 - 315,
send_score_alert currently always calls get_lead_by_call_id causing redundant DB
queries; modify send_score_alert to accept an optional cached_lead (or
recording_url) parameter and use cached_lead.recording_url (or the passed
recording_url) if present before falling back to calling get_lead_by_call_id.
Update the place that invokes send_score_alert to pass
cached_lead=call_sid_to_lead.get(call_sid) (or cached recording_url) so the DB
query only runs when no cached value exists; keep existing exception handling
for the fallback query.
app/core/config/dynamic.py (1)

295-306: Minor: return parsed inside try body (Ruff TRY300)

Moving the success return to an else block makes the try/except intent clearer — only the statement that can actually raise JSONDecodeError stays inside try.

♻️ Proposed refactor
     try:
         parsed = json.loads(config_value)
         # Ensure the parsed value is a dict, not a list or primitive
         if not isinstance(parsed, dict):
             logger.error(
                 f"EVALUATOR_ACTIONS config must be a JSON object, got {type(parsed).__name__}"
             )
             return {}
-        return parsed
     except json.JSONDecodeError as e:
         logger.error(f"Failed to parse EVALUATOR_ACTIONS config: {e}")
         return {}
+    else:
+        return parsed
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/core/config/dynamic.py` around lines 295 - 306, Refactor the try/except
so only json.loads(config_value) is inside the try: block and the
successful-return logic is inside an else: block; specifically, keep try: parsed
= json.loads(config_value) and the except json.JSONDecodeError handler logging
"Failed to parse EVALUATOR_ACTIONS config: {e}" and returning {}, then in the
else: check isinstance(parsed, dict) (using the same logger/error text about
"EVALUATOR_ACTIONS config must be a JSON object"), return {} on failure, and
finally return parsed on success (referencing variables config_value, parsed and
logger).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@app/core/config/dynamic.py`:
- Around line 295-306: Refactor the try/except so only json.loads(config_value)
is inside the try: block and the successful-return logic is inside an else:
block; specifically, keep try: parsed = json.loads(config_value) and the except
json.JSONDecodeError handler logging "Failed to parse EVALUATOR_ACTIONS config:
{e}" and returning {}, then in the else: check isinstance(parsed, dict) (using
the same logger/error text about "EVALUATOR_ACTIONS config must be a JSON
object"), return {} on failure, and finally return parsed on success
(referencing variables config_value, parsed and logger).

In `@app/services/langfuse/tasks/score_monitor/actions.py`:
- Around line 241-249: The try block currently wraps both the float conversion
and the comparison, which can hide logic errors; change it so only the
conversion of score.get("value") to float is inside the try (e.g., try:
score_value = float(value) except (ValueError, TypeError): logger.warning(...)
return False), and perform the comparison return score_value < threshold in an
else (or immediately after the try) using the same symbols (score.get("value"),
score_value, threshold) so only the conversion is guarded and the return is
outside the try body.
- Around line 43-61: The regex in extract_json_from_end (pattern variable) only
supports up to two nested brace levels, so replace it with an iterative
bracket-balanced scan that walks the string (preferably from the end) counting
'{' and '}' until a balanced top-level object is found, then json.loads that
substring; update error handling to catch json.JSONDecodeError and keep the
existing logger.warning message; alternatively, if you want the minimal change,
add a docstring to extract_json_from_end documenting the nesting limitation and
the rationale. Ensure references to extract_json_from_end, pattern, and logger
are preserved so reviewers can find the change.

In `@app/services/langfuse/tasks/score_monitor/score.py`:
- Around line 962-966: The loop calls executor.should_trigger(score, threshold)
after building failing_scores using self._is_below_threshold(s, threshold) which
uses identical float(value) < threshold logic, making the check redundant;
remove the executor.should_trigger(...) condition or move the
executor.should_trigger check into the failing_scores filter so you either (A)
just rely on failing_scores and drop the executor.should_trigger branch where
evaluator_name/action_config are handled, or (B) change the failing_scores
computation to call executor.should_trigger(s, threshold) (instead of
self._is_below_threshold) so the executor's custom logic is used; update
references to failing_scores, _is_below_threshold, executor.should_trigger,
evaluator_name, action_config, evaluators_config and score accordingly.
- Around line 309-315: send_score_alert currently always calls
get_lead_by_call_id causing redundant DB queries; modify send_score_alert to
accept an optional cached_lead (or recording_url) parameter and use
cached_lead.recording_url (or the passed recording_url) if present before
falling back to calling get_lead_by_call_id. Update the place that invokes
send_score_alert to pass cached_lead=call_sid_to_lead.get(call_sid) (or cached
recording_url) so the DB query only runs when no cached value exists; keep
existing exception handling for the fallback query.

ℹ️ Review info


📥 Commits

Reviewing files that changed from the base of the PR and between 6cee410 and 6204f42.

📒 Files selected for processing (3)
  • app/core/config/dynamic.py
  • app/services/langfuse/tasks/score_monitor/actions.py
  • app/services/langfuse/tasks/score_monitor/score.py

@narsimhaReddyJuspay narsimhaReddyJuspay force-pushed the add-actions-for-langfuse-evaluators branch 6 times, most recently from 0d13f79 to 85b7a76 Compare February 27, 2026 07:31
Copilot AI review requested due to automatic review settings March 2, 2026 10:15
@narsimhaReddyJuspay narsimhaReddyJuspay force-pushed the add-actions-for-langfuse-evaluators branch from 85b7a76 to 4bc6d39 Compare March 2, 2026 10:15

Copilot AI left a comment


Pull request overview

Adds an evaluator-action framework to the Langfuse score monitor so low evaluator scores can automatically run follow-up actions (initially: outcome correction) and surface step-by-step action status in Slack alerts.

Changes:

  • Introduces evaluator-triggered actions (OutcomeUpdateAction) with step tracking (ActionResult) and an ActionExecutor.
  • Integrates action execution into the Langfuse score monitor loop (execute actions before sending failing-score alerts) and enhances Slack alerts with a full-width action status section.
  • Adds DB support for cancelling pending retry leads by request_id, plus a new dynamic config getter for EVALUATOR_ACTIONS and accompanying documentation.
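Roughly, the framework described above can be pictured like this (a stdlib-only sketch: the step names mirror the config in `.env.example`, but the bodies are placeholders, not the PR's actual code):

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class ActionResult:
    # Per-step status: "SUCCESS" | "SKIPPED" | "FAILED" | "ERROR" | None
    db_update: Optional[str] = None
    cancel_retries: Optional[str] = None
    nautilus_webhook: Optional[str] = None
    success: bool = False


class OutcomeUpdateAction:
    """Runs the configured steps for one failing score (illustrative only)."""

    def __init__(self, action_steps: Dict[str, bool]):
        self.action_steps = action_steps

    def execute(self, call_sid: Optional[str]) -> ActionResult:
        result = ActionResult()
        if not call_sid:  # guard: trace_to_call_sid.get() may yield None
            return result
        result.db_update = "SUCCESS" if self.action_steps.get("update_in_db") else "SKIPPED"
        result.cancel_retries = "SUCCESS" if self.action_steps.get("cancel_retries") else "SKIPPED"
        result.nautilus_webhook = "SUCCESS" if self.action_steps.get("send_nautilus_webhook") else "SKIPPED"
        # DB update is critical; the other steps are non-critical.
        result.success = result.db_update in ("SUCCESS", "SKIPPED")
        return result
```

The `ActionResult` statuses are what the Slack alert's "Action Status" section renders per step.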

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
docs/EVALUATOR_ACTIONS.md New documentation describing evaluator actions, config schema, flow, and alert/logging behavior.
app/services/slack/alert.py Adds action_status rendering as a separate full-width Slack block and improves mention formatting.
app/services/langfuse/tasks/score_monitor/score.py Executes configured actions prior to Slack alerts; includes action status + lead id in alert payload.
app/services/langfuse/tasks/score_monitor/actions.py New evaluator action framework (outcome update, retry cancellation, Nautilus webhook).
app/database/queries/breeze_buddy/lead_call_tracker.py Adds SQL to cancel pending retries by request_id.
app/database/accessor/breeze_buddy/lead_call_tracker.py Adds accessor to execute the new cancellation query and return count cancelled.
app/core/config/dynamic.py Adds dynamic config getter EVALUATOR_ACTIONS() for Redis/env-driven action configuration.
.env.example Adds an EVALUATOR_ACTIONS example.

Comment thread app/services/langfuse/tasks/actions/actions.py
Comment thread docs/EVALUATOR_ACTIONS.md Outdated
Comment thread app/services/langfuse/tasks/score_monitor/actions.py Outdated
Comment thread app/services/langfuse/tasks/score_monitor/actions.py Outdated
Comment thread app/core/config/dynamic.py
Comment thread .env.example Outdated
Comment thread docs/EVALUATOR_ACTIONS.md
Comment thread app/services/langfuse/tasks/actions/actions.py
Comment thread app/services/langfuse/tasks/score_monitor/actions.py Outdated
@narsimhaReddyJuspay narsimhaReddyJuspay force-pushed the add-actions-for-langfuse-evaluators branch 2 times, most recently from 79f8c57 to c9d3d7e Compare March 10, 2026 13:16
@swaroopvarma1
Collaborator

swaroopvarma1 commented Mar 13, 2026

Changes requested

The PR introduces langfuse evaluator actions but lacks schema validation for the Redis-backed configuration, causing fail-slow behavior that delays error detection until action execution.

  • Correctness: EVALUATOR_ACTIONS() parses Redis JSON without validating required fields (action_type, action_config) or action_steps structure, risking runtime failures and inconsistent evaluator state. Add Pydantic models to fail fast on startup. app/core/config/dynamic.py:285-285
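For illustration, a fail-fast validator along the requested lines could look like this (stdlib-only sketch; a Pydantic model with the same fields would be the drop-in equivalent, and all helper names here are assumptions):

```python
from typing import Any, Dict

REQUIRED_KEYS = {"action_type", "action_config", "action_steps"}


def validate_evaluator_actions(raw: Any) -> Dict[str, dict]:
    """Raise ValueError at load time on a malformed EVALUATOR_ACTIONS payload."""
    if not isinstance(raw, dict):
        raise ValueError(
            f"EVALUATOR_ACTIONS must be a JSON object, got {type(raw).__name__}"
        )
    for name, cfg in raw.items():
        if not isinstance(cfg, dict):
            raise ValueError(f"Evaluator {name!r}: config must be an object")
        missing = REQUIRED_KEYS - cfg.keys()
        if missing:
            raise ValueError(f"Evaluator {name!r} missing keys: {sorted(missing)}")
        if not isinstance(cfg["action_steps"], dict):
            raise ValueError(f"Evaluator {name!r}: action_steps must be an object")
    return raw
```

Calling this once when the Redis/env value is loaded surfaces config errors at startup instead of at action-execution time.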

@narsimhaReddyJuspay narsimhaReddyJuspay force-pushed the add-actions-for-langfuse-evaluators branch from c9d3d7e to a4dd3f3 Compare March 17, 2026 20:30
@narsimhaReddyJuspay narsimhaReddyJuspay force-pushed the add-actions-for-langfuse-evaluators branch from a4dd3f3 to 266ab68 Compare March 25, 2026 09:03
@narsimhaReddyJuspay narsimhaReddyJuspay force-pushed the add-actions-for-langfuse-evaluators branch 3 times, most recently from c907144 to b4bef87 Compare April 14, 2026 12:28
@narsimhaReddyJuspay narsimhaReddyJuspay force-pushed the add-actions-for-langfuse-evaluators branch 4 times, most recently from 8fed7ea to 943b04a Compare April 28, 2026 09:29
@narsimhaReddyJuspay
Contributor Author

Changes requested

The PR introduces langfuse evaluator actions but lacks schema validation for the Redis-backed configuration, causing fail-slow behavior that delays error detection until action execution.

  • Correctness: EVALUATOR_ACTIONS() parses Redis JSON without validating required fields (action_type, action_config) or action_steps structure, risking runtime failures and inconsistent evaluator state. Add Pydantic models to fail fast on startup. app/core/config/dynamic.py:285-285

we already do validation in code and it fails fast; adding Pydantic models is overhead we don't need?

@narsimhaReddyJuspay narsimhaReddyJuspay force-pushed the add-actions-for-langfuse-evaluators branch 3 times, most recently from 8b72dc9 to 7027ed8 Compare April 29, 2026 10:25
Comment thread docs/EVALUATOR_ACTIONS.md
Check EVALUATOR_ACTIONS config
Execute OutcomeUpdateAction:
1. Update DB outcome
Collaborator


Do we need HITL here? We could get a Slack alert or something where we review and approve these cases until it's fully stabilized.

Because here we are directly allowing the outcome in the DB to be updated based on an LLM's judgement, and the original outcome will be lost since the DB write is irreversible.

Contributor Author

@narsimhaReddyJuspay narsimhaReddyJuspay Apr 29, 2026


HITL is excessive (maybe later) and not scalable here; the goal is to automate. We can configure disallowed outcome changes, like confirm to busy, and we have extensive logs, traces, and alerts, so the original outcome is not lost.

As for LLM judgement, the original outcome is also given by an LLM, no?

@narsimhaReddyJuspay
Contributor Author

@coderabbitai please review

@coderabbitai

coderabbitai Bot commented Apr 30, 2026

@narsimhaReddyJuspay: I'll review the changes in this PR.

✅ Actions performed

Review triggered.



@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
app/services/langfuse/tasks/score_monitor/actions.py (1)

560-567: 💤 Low value

Clarify the success criteria in a comment.

The success logic allows cancel_retries and nautilus_webhook to be "FAILED" or "ERROR" while still marking success=True. This is intentional (DB update is critical, others are non-critical), but a brief inline comment would help future maintainers understand this design decision.

📝 Suggested clarification comment
             # Determine overall success
-            # DB update is critical - must succeed or be skipped
-            # Cancel retries and webhook are non-critical - any non-None status is acceptable
+            # DB update is critical - must succeed or be skipped for overall success
+            # Cancel retries and webhook are non-critical - failures are logged but
+            # don't block overall success (the outcome was still corrected in DB)
             result.success = (
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/services/langfuse/tasks/score_monitor/actions.py` around lines 560 - 567,
Add an inline comment above the result.success assignment clarifying the
intended success semantics: that only result.db_update must be "SUCCESS" or
"SKIPPED" (DB update is critical), while result.cancel_retries and
result.nautilus_webhook are considered non-critical and may be "SUCCESS",
"SKIPPED", "FAILED", or "ERROR" without causing overall failure; reference the
boolean expression involving result.db_update, result.cancel_retries, and
result.nautilus_webhook to make the design decision explicit for future
maintainers.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@app/ai/voice/agents/breeze_buddy/utils/traces.py`:
- Around line 25-38: The nested traversal over flow -> nodes -> functions ->
hooks in this block can raise if any element is not a dict; update the loop in
the function that processes flows so you check types before accessing .get:
ensure each node is a dict before iterating node.get("functions", []), each func
is a dict before iterating func.get("hooks", []), and each hook is a dict before
reading hook.get("name") and hook.get("expected_fields"); also guard that
expected_fields and outcome_field are dicts before calling
outcome_field.get("source") or outcome_field["value"]; skip any non-dict entries
so malformed template data won't raise and will safely continue updating seen
and outcomes only when values are valid for the "update_outcome_in_database"
hook.

---

Nitpick comments:
In `@app/services/langfuse/tasks/score_monitor/actions.py`:
- Around line 560-567: Add an inline comment above the result.success assignment
clarifying the intended success semantics: that only result.db_update must be
"SUCCESS" or "SKIPPED" (DB update is critical), while result.cancel_retries and
result.nautilus_webhook are considered non-critical and may be "SUCCESS",
"SKIPPED", "FAILED", or "ERROR" without causing overall failure; reference the
boolean expression involving result.db_update, result.cancel_retries, and
result.nautilus_webhook to make the design decision explicit for future
maintainers.

ℹ️ Review info
⚙️ Run configuration


Run ID: 0aa0cb8a-472e-446b-8a1a-b4a2f0ace8a9

📥 Commits

Reviewing files that changed from the base of the PR and between 6204f42 and 7027ed8.

📒 Files selected for processing (10)
  • .env.example
  • app/ai/voice/agents/breeze_buddy/observability/tracing_setup.py
  • app/ai/voice/agents/breeze_buddy/utils/traces.py
  • app/core/config/dynamic.py
  • app/database/accessor/breeze_buddy/lead_call_tracker.py
  • app/database/queries/breeze_buddy/lead_call_tracker.py
  • app/services/langfuse/tasks/score_monitor/actions.py
  • app/services/langfuse/tasks/score_monitor/score.py
  • app/services/slack/alert.py
  • docs/EVALUATOR_ACTIONS.md
✅ Files skipped from review due to trivial changes (1)
  • .env.example

Comment thread app/ai/voice/agents/breeze_buddy/utils/traces.py
@narsimhaReddyJuspay force-pushed the add-actions-for-langfuse-evaluators branch from 7027ed8 to b2f26a1 on May 6, 2026 06:03
@narsimhaReddyJuspay force-pushed the add-actions-for-langfuse-evaluators branch 2 times, most recently from 04661a7 to c4f4fbe on May 7, 2026 12:10
try:
parsed = json.loads(config_value)
# Ensure the parsed value is a dict, not a list or primitive
if not isinstance(parsed, dict):
Collaborator:

decode here
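Expanded, the guard in the snippet above might look like this (the function name and the logging calls are illustrative assumptions, not code from the PR):

```python
import json
import logging
from typing import Any, Dict, Optional

logger = logging.getLogger(__name__)

def parse_action_config(config_value: Optional[str]) -> Optional[Dict[str, Any]]:
    """Parse a JSON config string, returning None for anything that is
    not a JSON object (invalid JSON, or a list/string/number)."""
    if not config_value:
        return None
    try:
        parsed = json.loads(config_value)
    except json.JSONDecodeError:
        logger.warning("Invalid JSON in action config")
        return None
    # Ensure the parsed value is a dict, not a list or primitive
    if not isinstance(parsed, dict):
        logger.warning("Action config must be a JSON object, got %s",
                       type(parsed).__name__)
        return None
    return parsed
```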

Comment thread .env.example Outdated
REDIS_CLUSTER_NODES=redis1:6379,redis2:6380,redis3:6381

LANGFUSE_EVALUATORS="breeze buddy outcome correctness, latency evaluator" # name of the evaluator
EVALUATOR_ACTIONS='{"OUTCOME MISMATCH": {"action_type": "outcome_update", "action_config": {"outcome_key": "$.actual_outcome"}, "action_steps": {"update_in_db": true, "send_nautilus_webhook": true, "cancel_retries": true}}}'
Collaborator:

send_reporting_webhook
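For illustration, the sample `EVALUATOR_ACTIONS` value above parses into one evaluator entry whose `action_steps` flags gate each step. `enabled_steps` here is a hypothetical helper, not code from this PR:

```python
import json

# The sample EVALUATOR_ACTIONS value from .env.example above
sample = (
    '{"OUTCOME MISMATCH": {"action_type": "outcome_update", '
    '"action_config": {"outcome_key": "$.actual_outcome"}, '
    '"action_steps": {"update_in_db": true, '
    '"send_nautilus_webhook": true, "cancel_retries": true}}}'
)

def enabled_steps(config_json: str) -> dict:
    """Map each evaluator name to the list of enabled action steps."""
    parsed = json.loads(config_json)
    return {
        name: [step for step, on in cfg.get("action_steps", {}).items() if on]
        for name, cfg in parsed.items()
    }
```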

if executor and action_configs:
evaluator_action_config = action_configs.get(evaluator_name)
if evaluator_action_config:
threshold = evaluators_config.get(evaluator_name, 5)
Collaborator:

move this into a function

Contributor Author:

done
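The extracted helper could look roughly like this (the name `get_evaluator_action` and the default threshold of 5 are assumptions based on the snippet above):

```python
from typing import Any, Dict, Optional, Tuple

DEFAULT_THRESHOLD = 5  # matches the fallback in the snippet above

def get_evaluator_action(
    evaluator_name: str,
    action_configs: Optional[Dict[str, Any]],
    evaluators_config: Dict[str, float],
) -> Optional[Tuple[Any, float]]:
    """Return the (action_config, threshold) pair for an evaluator,
    or None when no action is configured for it."""
    if not action_configs:
        return None
    action_config = action_configs.get(evaluator_name)
    if not action_config:
        return None
    threshold = evaluators_config.get(evaluator_name, DEFAULT_THRESHOLD)
    return action_config, threshold
```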

OUTCOME_UPDATE = "outcome_update"


def extract_json_from_end(comment: str) -> Optional[Dict[str, Any]]:
Collaborator:

Move these into langfuse/tasks/actions — e.g. split into actions.py and utils.py.

Contributor Author:

done
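The body of `extract_json_from_end` is not shown in the diff; a plausible sketch, under the assumption that it pulls a trailing JSON object out of an evaluator comment string:

```python
import json
from typing import Any, Dict, Optional

def extract_json_from_end(comment: str) -> Optional[Dict[str, Any]]:
    """Return the JSON object at the end of a comment, or None.

    Tries each '{' position from the left until the remainder of the
    string parses as a JSON object.
    """
    text = comment.rstrip()
    for i, ch in enumerate(text):
        if ch != "{":
            continue
        try:
            parsed = json.loads(text[i:])
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed
    return None
```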

@narsimhaReddyJuspay force-pushed the add-actions-for-langfuse-evaluators branch 2 times, most recently from 96d3894 to 69fde5c on May 13, 2026 09:03
- added new actions for evaluators
- added new config for actions of evaluators
- added a basic update outcome action for now
- they will work with same threshold as the alerts
@narsimhaReddyJuspay force-pushed the add-actions-for-langfuse-evaluators branch from 69fde5c to 70b4c12 on May 13, 2026 09:27