diff --git a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py index 50eb72ffdb..3a09fc942f 100644 --- a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py +++ b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py @@ -1430,14 +1430,18 @@ async def _parse_content_object( # CASE C: Text elif hasattr(part, "text") and part.text: - text_len = len(part.text.encode("utf-8")) - # If max_length is set and smaller than inline limit, use it as threshold - # to prefer offloading over truncation. - offload_threshold = self.inline_text_limit - if self.max_length != -1 and self.max_length < offload_threshold: - offload_threshold = self.max_length - - if self.offloader and text_len > offload_threshold: + char_len = len(part.text) + byte_len = len(part.text.encode("utf-8")) + + # Decide whether to offload using each limit in its own + # unit. inline_text_limit is a byte-based storage guard; + # max_length is a character-based truncation limit. + exceeds_inline_byte_limit = byte_len > self.inline_text_limit + exceeds_char_limit = ( + self.max_length != -1 and char_len > self.max_length + ) + + if self.offloader and (exceeds_inline_byte_limit or exceeds_char_limit): # Text is too big, treat as file path = f"{datetime.now().date()}/{self.trace_id}/{self.span_id}_p{idx}.txt" try: @@ -1906,6 +1910,18 @@ def _get_events_schema() -> list[bigquery.SchemaField]: " '$.a2a_metadata.\"a2a:response\"') AS a2a_response" ), ], + "AGENT_RESPONSE": [ + "JSON_VALUE(content, '$.response') AS response_text", + "JSON_VALUE(attributes, '$.source_event_id') AS source_event_id", + ( + "JSON_VALUE(attributes," + " '$.source_event_author') AS source_event_author" + ), + ( + "JSON_VALUE(attributes," + " '$.source_event_branch') AS source_event_branch" + ), + ], } _VIEW_SQL_TEMPLATE = """\ @@ -2653,7 +2669,14 @@ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: async def _ensure_started(self, **kwargs) -> None: """Ensures that the plugin is started and initialized.""" - if os.getpid() != self._init_pid: + # _init_pid == 0 means the plugin was unpickled and has never been + # initialized in this process (the pickle sentinel set by + # __getstate__). Skip the fork reset in that case — no fork + # happened, and _started is already False so _lazy_setup will run. + # Real forks are caught by os.register_at_fork (line 108) and by + # this check when _init_pid is a real (non-zero) PID from a + # different process. + if self._init_pid != 0 and os.getpid() != self._init_pid: self._reset_runtime_state() if not self._started: # Kept original lock name as it was not explicitly changed. @@ -2665,6 +2688,10 @@ async def _ensure_started(self, **kwargs) -> None: await self._lazy_setup(**kwargs) self._started = True self._startup_error = None + # Record the current PID so fork detection works for + # the rest of this instance's lifetime. + if self._init_pid == 0: + self._init_pid = os.getpid() except Exception as e: self._startup_error = e logger.error("Failed to initialize BigQuery Plugin: %s", e) @@ -2966,7 +2993,7 @@ async def on_event_callback( invocation_context: InvocationContext, event: "Event", ) -> None: - """Logs state changes, HITL events, and A2A interactions. + """Logs state changes, HITL events, A2A interactions, and agent responses. - Checks each event for a non-empty state_delta and logs it as a STATE_DELTA event. @@ -2978,6 +3005,9 @@ async def on_event_callback( and logs them as ``A2A_INTERACTION`` events so the remote agent's response and cross-reference IDs (``a2a:task_id``, ``a2a:context_id``) are visible in BigQuery. + - Detects final response events emitted by agents and logs + them as ``AGENT_RESPONSE`` so the visible response text + (after all callback modifications) is captured in BigQuery. The HITL detection must happen here (not in tool callbacks) because ``adk_request_credential``, ``adk_request_confirmation``, and @@ -3080,6 +3110,50 @@ async def on_event_callback( ), ) + # --- Final agent response logging --- + # Captures final response events emitted by agents (after all + # after_model_callback modifications). Uses a strict guard to + # avoid false positives from skip_summarization function + # responses, long-running tool pause events, and thought-only + # events (which ADK treats as invisible internal reasoning). + is_agent_response = ( + event.content + and event.content.parts + and event.is_final_response() + and event.partial is not True + and not event.get_function_calls() + and not event.get_function_responses() + and not event.long_running_tool_ids + ) + if is_agent_response: + # Filter to visible text parts only. Exclude thoughts + # (internal reasoning, A2A working/submitted updates), + # empty parts, and non-text parts (executable_code, etc.) + # that would render as "other" in _format_content. + visible_parts = [ + p + for p in event.content.parts + if p.text and not getattr(p, "thought", None) + ] + if visible_parts: + visible_content = types.Content( + role=event.content.role, parts=visible_parts + ) + formatted, truncated = self._format_content_safely(visible_content) + await self._log_event( + "AGENT_RESPONSE", + callback_ctx, + raw_content={"response": formatted}, + is_truncated=truncated, + event_data=EventData( + extra_attributes={ + "source_event_id": event.id, + "source_event_author": event.author, + "source_event_branch": event.branch, + }, + ), + ) + return None async def on_state_change_callback( diff --git a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py index 8a05392bec..33fc2a7e17 100644 --- a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py +++ b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py @@ -7408,3 +7408,418 @@ async def test_view_error_still_logged( ) as plugin: await plugin._ensure_started() assert plugin._started + + +# ================================================================ +# TEST CLASS: Fork detection after pickle (Issue #86 / PR #5528) +# ================================================================ +class TestForkDetectionAfterPickle: + """Tests that unpickled plugins do not false-positive fork detection.""" + + @pytest.mark.asyncio + async def test_no_reset_after_unpickle( + self, + mock_auth_default, + mock_bq_client, + mock_write_client, + mock_to_arrow_schema, + mock_asyncio_to_thread, + ): + """Unpickled plugin does not trigger _reset_runtime_state and + records os.getpid() after startup.""" + import pickle + + config = bigquery_agent_analytics_plugin.BigQueryLoggerConfig( + create_views=False, + ) + plugin = bigquery_agent_analytics_plugin.BigQueryAgentAnalyticsPlugin( + PROJECT_ID, DATASET_ID, table_id=TABLE_ID, config=config + ) + pickled = pickle.dumps(plugin) + unpickled = pickle.loads(pickled) + + assert unpickled._init_pid == 0 + + with mock.patch.object(unpickled, "_reset_runtime_state") as mock_reset: + await unpickled._ensure_started() + mock_reset.assert_not_called() + + assert unpickled._started + assert unpickled._init_pid == os.getpid() + await unpickled.shutdown() + + @pytest.mark.asyncio + async def test_reset_on_real_fork( + self, + mock_auth_default, + mock_bq_client, + mock_write_client, + mock_to_arrow_schema, + mock_asyncio_to_thread, + ): + """Plugin detects real fork when _init_pid is a real non-zero PID.""" + config = bigquery_agent_analytics_plugin.BigQueryLoggerConfig( + create_views=False, + ) + async with managed_plugin( + project_id=PROJECT_ID, + dataset_id=DATASET_ID, + table_id=TABLE_ID, + config=config, + ) as plugin: + await plugin._ensure_started() + plugin._init_pid = max(os.getpid() - 1, 1) + plugin._started = True + + with mock.patch.object( + plugin, "_reset_runtime_state", wraps=plugin._reset_runtime_state + ) as mock_reset: + await plugin._ensure_started() + mock_reset.assert_called_once() + + +# ================================================================ +# TEST CLASS: GCS offload unit mismatch fix (Issue #5561) +# ================================================================ +class TestOffloadUnitSeparation: + """Tests that byte-based inline limit and character-based truncation + limit are evaluated independently for the GCS offload decision.""" + + @pytest.mark.asyncio + async def test_multibyte_text_offloaded_by_byte_limit(self): + """Multi-byte text exceeding inline_text_limit bytes is offloaded.""" + mock_offloader = mock.AsyncMock() + mock_offloader.upload_content.return_value = "gs://bucket/offloaded.txt" + + parser = bigquery_agent_analytics_plugin.HybridContentParser( + offloader=mock_offloader, + trace_id="t", + span_id="s", + max_length=-1, + ) + text = "\U0001f600" * 10000 + assert len(text) == 10000 + assert len(text.encode("utf-8")) > 32 * 1024 + + content = types.Content(parts=[types.Part(text=text)]) + _, parts, _ = await parser._parse_content_object(content) + + mock_offloader.upload_content.assert_called_once() + assert parts[0]["storage_mode"] == "GCS_REFERENCE" + + @pytest.mark.asyncio + async def test_ascii_under_both_limits_stays_inline(self): + """ASCII text under both byte and character limits stays inline.""" + mock_offloader = mock.AsyncMock() + + parser = bigquery_agent_analytics_plugin.HybridContentParser( + offloader=mock_offloader, + trace_id="t", + span_id="s", + max_length=50000, + ) + text = "A" * 1000 + content = types.Content(parts=[types.Part(text=text)]) + _, parts, _ = await parser._parse_content_object(content) + + mock_offloader.upload_content.assert_not_called() + assert parts[0]["storage_mode"] == "INLINE" + assert parts[0]["text"] == text + + @pytest.mark.asyncio + async def test_text_exceeding_char_limit_offloaded(self): + """ASCII text exceeding max_length characters is offloaded.""" + mock_offloader = mock.AsyncMock() + mock_offloader.upload_content.return_value = "gs://bucket/big.txt" + + parser = bigquery_agent_analytics_plugin.HybridContentParser( + offloader=mock_offloader, + trace_id="t", + span_id="s", + max_length=100, + ) + text = "X" * 200 + assert len(text.encode("utf-8")) < 32 * 1024 + assert len(text) > 100 + + content = types.Content(parts=[types.Part(text=text)]) + _, parts, _ = await parser._parse_content_object(content) + + mock_offloader.upload_content.assert_called_once() + assert parts[0]["storage_mode"] == "GCS_REFERENCE" + + @pytest.mark.asyncio + async def test_multibyte_under_char_and_byte_limits_stays_inline(self): + """Regression test: 3K emoji (12K bytes) with max_length=10000 + should stay inline — under both real limits.""" + mock_offloader = mock.AsyncMock() + parser = bigquery_agent_analytics_plugin.HybridContentParser( + offloader=mock_offloader, + trace_id="t", + span_id="s", + max_length=10000, + ) + + text = "\U0001f600" * 3000 + assert len(text) < 10000 + assert len(text.encode("utf-8")) > 10000 + assert len(text.encode("utf-8")) < 32 * 1024 + + content = types.Content(parts=[types.Part(text=text)]) + _, parts, _ = await parser._parse_content_object(content) + + mock_offloader.upload_content.assert_not_called() + assert parts[0]["storage_mode"] == "INLINE" + + @pytest.mark.asyncio + async def test_no_offloader_falls_back_to_truncate(self): + """Without offloader, text exceeding char limit is truncated inline.""" + parser = bigquery_agent_analytics_plugin.HybridContentParser( + offloader=None, + trace_id="t", + span_id="s", + max_length=50, + ) + text = "Z" * 200 + content = types.Content(parts=[types.Part(text=text)]) + _, parts, is_truncated = await parser._parse_content_object(content) + + assert is_truncated + assert parts[0]["storage_mode"] == "INLINE" + assert "TRUNCATED" in parts[0]["text"] + + +# ================================================================ +# TEST CLASS: AGENT_RESPONSE logging (Issue #87) +# ================================================================ +class TestAgentResponseLogging: + """Tests that final agent response events are captured correctly.""" + + @pytest.mark.asyncio + async def test_logs_final_text_response( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """Final text response is logged as AGENT_RESPONSE with + source_event_author from event.author.""" + event = event_lib.Event( + author="sub_agent", + content=types.Content(parts=[types.Part(text="Here is your answer.")]), + ) + + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.05) + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + agent_resp_rows = [r for r in rows if r["event_type"] == "AGENT_RESPONSE"] + assert len(agent_resp_rows) == 1 + row = agent_resp_rows[0] + content = json.loads(row["content"]) + assert "Here is your answer" in content["response"] + attributes = json.loads(row["attributes"]) + # source_event_author must come from event.author + assert attributes["source_event_author"] == "sub_agent" + + @pytest.mark.asyncio + async def test_skips_function_call_events( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + ): + """Events with function calls are not logged as AGENT_RESPONSE.""" + fc = types.FunctionCall(name="my_tool", args={"x": 1}) + event = event_lib.Event( + author="agent", + content=types.Content(parts=[types.Part(function_call=fc)]), + ) + + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.05) + assert mock_write_client.append_rows.call_count == 0 + + @pytest.mark.asyncio + async def test_skips_function_response_events( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + ): + """Events with function responses are not logged as AGENT_RESPONSE.""" + fr = types.FunctionResponse(name="my_tool", response={"result": "ok"}) + event = event_lib.Event( + author="agent", + content=types.Content(parts=[types.Part(function_response=fr)]), + actions=event_actions_lib.EventActions(skip_summarization=True), + ) + + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.05) + assert mock_write_client.append_rows.call_count == 0 + + @pytest.mark.asyncio + async def test_skips_partial_events( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + ): + """Partial streaming events are not logged as AGENT_RESPONSE.""" + event = event_lib.Event( + author="agent", + content=types.Content(parts=[types.Part(text="partial chunk")]), + partial=True, + ) + + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.05) + assert mock_write_client.append_rows.call_count == 0 + + @pytest.mark.asyncio + async def test_skips_long_running_tool_events( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + ): + """Long-running tool events are not logged as AGENT_RESPONSE.""" + fc = types.FunctionCall(name="long_tool", args={}) + event = event_lib.Event( + author="agent", + content=types.Content(parts=[types.Part(function_call=fc)]), + long_running_tool_ids={"call-1"}, + ) + + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.05) + assert mock_write_client.append_rows.call_count == 0 + + @pytest.mark.asyncio + async def test_skips_thought_only_events( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + ): + """Thought-only final events are not logged as AGENT_RESPONSE.""" + event = event_lib.Event( + author="agent", + content=types.Content( + parts=[types.Part(text="internal reasoning...", thought=True)] + ), + ) + + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.05) + assert mock_write_client.append_rows.call_count == 0 + + @pytest.mark.asyncio + async def test_mixed_thought_and_visible_logs_only_visible( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """Mixed thought + visible text logs only the visible portion.""" + event = event_lib.Event( + author="agent", + content=types.Content( + parts=[ + types.Part(text="thinking step 1...", thought=True), + types.Part(text="Here is the answer."), + ] + ), + ) + + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.05) + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + agent_resp_rows = [r for r in rows if r["event_type"] == "AGENT_RESPONSE"] + assert len(agent_resp_rows) == 1 + content = json.loads(agent_resp_rows[0]["content"]) + assert "Here is the answer" in content["response"] + assert "thinking step" not in content["response"] + + @pytest.mark.asyncio + async def test_skips_empty_part_events( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + ): + """Events with only empty Part() do not log AGENT_RESPONSE.""" + event = event_lib.Event( + author="agent", + content=types.Content(parts=[types.Part()]), + ) + + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.05) + assert mock_write_client.append_rows.call_count == 0 + + @pytest.mark.asyncio + async def test_skips_empty_text_events( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + ): + """Events with Part(text='') do not log AGENT_RESPONSE.""" + event = event_lib.Event( + author="agent", + content=types.Content(parts=[types.Part(text="")]), + ) + + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.05) + assert mock_write_client.append_rows.call_count == 0 + + @pytest.mark.asyncio + async def test_skips_executable_code_only_events( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + ): + """Events with only executable_code parts do not log AGENT_RESPONSE.""" + event = event_lib.Event( + author="agent", + content=types.Content( + parts=[ + types.Part( + executable_code=types.ExecutableCode( + code="print('hi')", language="PYTHON" + ) + ) + ] + ), + ) + + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.05) + assert mock_write_client.append_rows.call_count == 0