Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 84 additions & 10 deletions src/google/adk/plugins/bigquery_agent_analytics_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1430,14 +1430,18 @@ async def _parse_content_object(

# CASE C: Text
elif hasattr(part, "text") and part.text:
text_len = len(part.text.encode("utf-8"))
# If max_length is set and smaller than inline limit, use it as threshold
# to prefer offloading over truncation.
offload_threshold = self.inline_text_limit
if self.max_length != -1 and self.max_length < offload_threshold:
offload_threshold = self.max_length

if self.offloader and text_len > offload_threshold:
char_len = len(part.text)
byte_len = len(part.text.encode("utf-8"))

# Decide whether to offload using each limit in its own
# unit. inline_text_limit is a byte-based storage guard;
# max_length is a character-based truncation limit.
exceeds_inline_byte_limit = byte_len > self.inline_text_limit
exceeds_char_limit = (
self.max_length != -1 and char_len > self.max_length
)

if self.offloader and (exceeds_inline_byte_limit or exceeds_char_limit):
# Text is too big, treat as file
path = f"{datetime.now().date()}/{self.trace_id}/{self.span_id}_p{idx}.txt"
try:
Expand Down Expand Up @@ -1906,6 +1910,18 @@ def _get_events_schema() -> list[bigquery.SchemaField]:
" '$.a2a_metadata.\"a2a:response\"') AS a2a_response"
),
],
"AGENT_RESPONSE": [
"JSON_VALUE(content, '$.response') AS response_text",
"JSON_VALUE(attributes, '$.source_event_id') AS source_event_id",
(
"JSON_VALUE(attributes,"
" '$.source_event_author') AS source_event_author"
),
(
"JSON_VALUE(attributes,"
" '$.source_event_branch') AS source_event_branch"
),
],
}

_VIEW_SQL_TEMPLATE = """\
Expand Down Expand Up @@ -2653,7 +2669,14 @@ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:

async def _ensure_started(self, **kwargs) -> None:
"""Ensures that the plugin is started and initialized."""
if os.getpid() != self._init_pid:
# _init_pid == 0 means the plugin was unpickled and has never been
# initialized in this process (the pickle sentinel set by
# __getstate__). Skip the fork reset in that case — no fork
# happened, and _started is already False so _lazy_setup will run.
# Real forks are caught by os.register_at_fork (line 108) and by
# this check when _init_pid is a real (non-zero) PID from a
# different process.
if self._init_pid != 0 and os.getpid() != self._init_pid:
self._reset_runtime_state()
if not self._started:
# Kept original lock name as it was not explicitly changed.
Expand All @@ -2665,6 +2688,10 @@ async def _ensure_started(self, **kwargs) -> None:
await self._lazy_setup(**kwargs)
self._started = True
self._startup_error = None
# Record the current PID so fork detection works for
# the rest of this instance's lifetime.
if self._init_pid == 0:
self._init_pid = os.getpid()
except Exception as e:
self._startup_error = e
logger.error("Failed to initialize BigQuery Plugin: %s", e)
Expand Down Expand Up @@ -2966,7 +2993,7 @@ async def on_event_callback(
invocation_context: InvocationContext,
event: "Event",
) -> None:
"""Logs state changes, HITL events, and A2A interactions.
"""Logs state changes, HITL events, A2A interactions, and agent responses.

- Checks each event for a non-empty state_delta and logs it as a
STATE_DELTA event.
Expand All @@ -2978,6 +3005,9 @@ async def on_event_callback(
and logs them as ``A2A_INTERACTION`` events so the remote
agent's response and cross-reference IDs (``a2a:task_id``,
``a2a:context_id``) are visible in BigQuery.
- Detects final response events emitted by agents and logs
them as ``AGENT_RESPONSE`` so the visible response text
(after all callback modifications) is captured in BigQuery.

The HITL detection must happen here (not in tool callbacks) because
``adk_request_credential``, ``adk_request_confirmation``, and
Expand Down Expand Up @@ -3080,6 +3110,50 @@ async def on_event_callback(
),
)

# --- Final agent response logging ---
# Captures final response events emitted by agents (after all
# after_model_callback modifications). Uses a strict guard to
# avoid false positives from skip_summarization function
# responses, long-running tool pause events, and thought-only
# events (which ADK treats as invisible internal reasoning).
is_agent_response = (
event.content
and event.content.parts
and event.is_final_response()
and event.partial is not True
and not event.get_function_calls()
and not event.get_function_responses()
and not event.long_running_tool_ids
)
if is_agent_response:
# Filter to visible text parts only. Exclude thoughts
# (internal reasoning, A2A working/submitted updates),
# empty parts, and non-text parts (executable_code, etc.)
# that would render as "other" in _format_content.
visible_parts = [
p
for p in event.content.parts
if p.text and not getattr(p, "thought", None)
]
if visible_parts:
visible_content = types.Content(
role=event.content.role, parts=visible_parts
)
formatted, truncated = self._format_content_safely(visible_content)
await self._log_event(
"AGENT_RESPONSE",
callback_ctx,
raw_content={"response": formatted},
is_truncated=truncated,
event_data=EventData(
extra_attributes={
"source_event_id": event.id,
"source_event_author": event.author,
"source_event_branch": event.branch,
},
),
)

return None

async def on_state_change_callback(
Expand Down
Loading