From 2adcd72b4ba5ed60da1eb481d7ac8114348c1d82 Mon Sep 17 00:00:00 2001 From: Evan Mitchell Date: Thu, 18 Jun 2026 16:36:57 +0100 Subject: [PATCH 1/6] Add Google ADK sample agent A365 observability wiring (PoC) Span remap + token caching + Cloud Run deploy scaffolding for routing Google ADK agent telemetry through the Agent 365 observability pipeline. --- python/google-adk/sample-agent/.gcloudignore | 14 + .../google-adk/sample-agent/.python-version | 1 + python/google-adk/sample-agent/Procfile | 1 + python/google-adk/sample-agent/_freeze.py | 15 + python/google-adk/sample-agent/agent.py | 66 ++++- python/google-adk/sample-agent/debug_probe.py | 272 ++++++++++++++++++ .../sample-agent/deploy-cloudrun.ps1 | 75 +++++ python/google-adk/sample-agent/main.py | 38 +++ .../sample-agent/observability_remap.py | 128 +++++++++ .../google-adk/sample-agent/requirements.txt | 88 ++++++ python/google-adk/sample-agent/token_cache.py | 92 ++++++ 11 files changed, 788 insertions(+), 2 deletions(-) create mode 100644 python/google-adk/sample-agent/.gcloudignore create mode 100644 python/google-adk/sample-agent/.python-version create mode 100644 python/google-adk/sample-agent/Procfile create mode 100644 python/google-adk/sample-agent/_freeze.py create mode 100644 python/google-adk/sample-agent/debug_probe.py create mode 100644 python/google-adk/sample-agent/deploy-cloudrun.ps1 create mode 100644 python/google-adk/sample-agent/observability_remap.py create mode 100644 python/google-adk/sample-agent/requirements.txt create mode 100644 python/google-adk/sample-agent/token_cache.py diff --git a/python/google-adk/sample-agent/.gcloudignore b/python/google-adk/sample-agent/.gcloudignore new file mode 100644 index 00000000..fe5ee551 --- /dev/null +++ b/python/google-adk/sample-agent/.gcloudignore @@ -0,0 +1,14 @@ +# Cloud Run source upload allowlist. +# Ignore everything, then re-include only what the container needs at build/run. +# This forces the pip buildpack (requirements.txt) and avoids uploading the +# uv.lock (stale, missing mcp), the local .venv, secrets (.env), and a365 config. +* + +!*.py +!Procfile +!requirements.txt +!.python-version +!ToolingManifest.json + +# Re-exclude helper/local-only python files matched by !*.py above. +_freeze.py diff --git a/python/google-adk/sample-agent/.python-version b/python/google-adk/sample-agent/.python-version new file mode 100644 index 00000000..24ee5b1b --- /dev/null +++ b/python/google-adk/sample-agent/.python-version @@ -0,0 +1 @@ +3.13 diff --git a/python/google-adk/sample-agent/Procfile b/python/google-adk/sample-agent/Procfile new file mode 100644 index 00000000..629b83ad --- /dev/null +++ b/python/google-adk/sample-agent/Procfile @@ -0,0 +1 @@ +web: python main.py diff --git a/python/google-adk/sample-agent/_freeze.py b/python/google-adk/sample-agent/_freeze.py new file mode 100644 index 00000000..4de7b099 --- /dev/null +++ b/python/google-adk/sample-agent/_freeze.py @@ -0,0 +1,15 @@ +import importlib.metadata as m + +skip = {"sample-google-adk", "pip", "setuptools", "wheel"} +# Windows-only packages present in the local venv that have no Linux build. +skip |= {"pywin32", "pywin32-ctypes", "pypiwin32", "pywinpty", "winsdk", "windows-curses"} +lines = [] +for d in m.distributions(): + name = d.metadata["Name"] + if not name or name.lower() in skip: + continue + lines.append(f"{name}=={d.version}") +lines = sorted(set(lines), key=str.lower) +with open("requirements.txt", "w", encoding="utf-8") as f: + f.write("\n".join(lines) + "\n") +print("count", len(lines)) diff --git a/python/google-adk/sample-agent/agent.py b/python/google-adk/sample-agent/agent.py index 23c6d61e..9eb95e89 100644 --- a/python/google-adk/sample-agent/agent.py +++ b/python/google-adk/sample-agent/agent.py @@ -174,8 +174,70 @@ async def invoke_agent_with_scope( # Fall back to env vars so observability baggage is still populated. recipient = context.activity.recipient tenant_id = getattr(recipient, "tenant_id", None) or os.getenv("AGENTIC_TENANT_ID", "") - agent_id = getattr(recipient, "agentic_user_id", None) or os.getenv("AGENTIC_USER_ID", "") - with BaggageBuilder().tenant_id(tenant_id).agent_id(agent_id).build(): + # The A365 observability ingestion endpoint enforces ValidateAgentIdentity: + # the {agentId} in the export URL (and every gen_ai.agent.id span attribute) + # must equal the application id (azp/appid) carried in the agentic OBO token. + # That token's azp is the agent_app_instance_id (AGENTIC_APP_ID), NOT the + # agentic_user_id — so the observability agent id must be the app instance id. + agent_id = getattr(recipient, "agentic_app_id", None) or os.getenv("AGENTIC_APP_ID", "") + + # Identity enrichment so the exported spans carry the dimensions the Agent 365 / + # IDEAs activity rollup needs to attribute usage. Without these, the admin-center + # "Activity" tabs stay empty ("When people are using agents, their usage data will + # show up here") even though spans ingest successfully: + # - user.id (the invoking human) -> "active users" metric + # - microsoft.a365.agent.blueprint.id -> blueprint-level Activity tab + # - microsoft.agent.user.id / email -> agent (instance) attribution + from_prop = context.activity.from_property + user_aad_object_id = getattr(from_prop, "aad_object_id", None) or getattr(from_prop, "id", None) + user_display_name = getattr(from_prop, "name", None) + # Blueprint id (a365.generated.config.json → agentBlueprintId). This is the + # blueprint/template id, distinct from AGENTIC_APP_ID (the instance app id). + # The service-connection CLIENTID already carries the blueprint id, so we reuse + # it as the fallback and no extra env var is required for deployment. + blueprint_id = ( + getattr(recipient, "agent_blueprint_id", None) + or os.getenv("AGENT_BLUEPRINT_ID") + or os.getenv("CONNECTIONS__SERVICE_CONNECTION__SETTINGS__CLIENTID", "") + ) + agentic_user_id = getattr(recipient, "agentic_user_id", None) or os.getenv("AGENTIC_USER_ID", "") + agentic_user_email = os.getenv("AGENTIC_UPN", "") + + baggage = ( + BaggageBuilder() + .tenant_id(tenant_id) + .agent_id(agent_id) + .agent_blueprint_id(blueprint_id) + .agentic_user_id(agentic_user_id) + .agentic_user_email(agentic_user_email) + .user_id(user_aad_object_id) + .user_name(user_display_name) + ) + with baggage.build(): + # When running with an agentic auth handler (production), exchange for an + # observability-scoped token and cache it so the A365 exporter can authenticate + # its span export for this (tenant, agent). Best-effort: a failure here must + # not break the turn — it only means spans aren't exported this cycle. + if auth_handler_name and tenant_id and agent_id: + try: + from microsoft_agents_a365.runtime.environment_utils import ( + get_observability_authentication_scope, + ) + from token_cache import cache_agentic_token + + exaau_token = await auth.exchange_token( + context, + scopes=get_observability_authentication_scope(), + auth_handler_id=auth_handler_name, + ) + if exaau_token and getattr(exaau_token, "token", None): + cache_agentic_token(tenant_id, agent_id, exaau_token.token) + logger.info("Cached observability token for agent_id=%s", agent_id) + else: + logger.warning("Observability token exchange returned no token") + except Exception as e: + logger.warning("Observability token exchange failed: %s", e) + return await self.invoke_agent(message=message, auth=auth, auth_handler_name=auth_handler_name, context=context) async def _cleanup_agent(self, agent: Agent): diff --git a/python/google-adk/sample-agent/debug_probe.py b/python/google-adk/sample-agent/debug_probe.py new file mode 100644 index 00000000..d397601b --- /dev/null +++ b/python/google-adk/sample-agent/debug_probe.py @@ -0,0 +1,272 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +""" +Egress diagnostic probe for the Agent 365 Observability ingestion endpoint. + +Purpose +------- +The agent emits correctly-shaped, identity-stamped spans, but the A365 span +exporter's HTTPS POST to ``agent365.svc.cloud.microsoft`` was observed failing +with ``SSL: UNEXPECTED_EOF_WHILE_READING`` (a TLS-handshake reset). Every prior +handshake test was run from the corp network or against a *different* host +(``login.microsoftonline.com``) — never a raw handshake from inside the +Cloud Run container to the ingestion host itself. + +This module reproduces the export network path from the **exact** runtime and +egress of the live agent. It is mounted as ``GET /debug/otlp-probe`` and runs, +in order: + +1. **DNS** — what the container resolves ``agent365.svc.cloud.microsoft`` to. +2. **Raw TLS handshake** — a bare ``socket`` + ``ssl`` connection (default + context, then TLS 1.2 forced) to ``host:443`` with SNI, reporting the + negotiated protocol/cipher and the peer-certificate subject. This isolates + the handshake from any HTTP/auth concerns. +3. **HTTPS POST** — a minimal single-span OTLP/HTTP+JSON request via the same + ``requests`` library the SDK exporter uses, to the same URL the SDK builds. + If a cached observability token exists (from a prior authenticated turn) it + is attached; otherwise the POST runs token-less (a ``401``/``403`` still + proves TLS + HTTP reachability — the opposite of a handshake reset). + +Interpreting results +--------------------- +* Any **HTTP status code** at step 3 (even 401/403/400) ⇒ TLS and the network + path from Cloud Run are healthy; the "Microsoft blocks GCP at TLS" theory is + disproven and the original failure was transient or exporter-level. +* A reproduced **``UNEXPECTED_EOF`` / handshake error** at step 2 or 3 ⇒ the + network hypothesis is confirmed *from the real egress path*, giving hard + evidence for an egress-IP allowlist or relay decision. + +Security +-------- +The route is gated by the ``DEBUG_PROBE_KEY`` env var (passed as ``?key=``). +If the env var is unset the route returns 404 so it can't be probed on a public +URL. The response never includes the bearer token or any cert private material. +This is a temporary diagnostic — remove the route (or unset the key) afterward. +""" + +import os +import socket +import ssl +import time +import traceback +import logging + +import requests + +logger = logging.getLogger(__name__) + +# The ingestion host the SDK exporter targets by default. Kept in sync with +# DEFAULT_ENDPOINT_URL in microsoft_agents_a365.observability.core.exporters. +_DEFAULT_HOST = "agent365.svc.cloud.microsoft" +_HTTP_TIMEOUT_SECONDS = 30 +_TLS_TIMEOUT_SECONDS = 15 + + +def _resolve_endpoint() -> tuple[str, str, str, str]: + """Return (host, full_url, tenant_id, agent_id) matching the SDK exporter. + + Uses the SDK's own URL builder and domain-override handling so the probe + hits the identical URL the exporter would, including the delegated + ``/observability`` path and ``api-version=1``. + """ + tenant_id = os.getenv("AGENTIC_TENANT_ID", "") + agent_id = os.getenv("AGENTIC_USER_ID", "") + + host = _DEFAULT_HOST + endpoint = f"https://{_DEFAULT_HOST}" + try: + from microsoft_agents_a365.observability.core.exporters.utils import ( + build_export_url, + get_validated_domain_override, + ) + + override = get_validated_domain_override() + if override: + endpoint = override if "://" in override else f"https://{override}" + # use_s2s_endpoint=False mirrors the exporter default (delegated/OBO path). + full_url = build_export_url(endpoint, agent_id, tenant_id, False) + except Exception as e: # pragma: no cover - defensive + logger.warning("Falling back to manual URL build: %s", e) + full_url = ( + f"{endpoint}/observability/tenants/{tenant_id}" + f"/otlp/agents/{agent_id}/traces?api-version=1" + ) + + # Derive the host actually being dialed (honors a domain override). + try: + from urllib.parse import urlparse + + host = urlparse(full_url).hostname or _DEFAULT_HOST + except Exception: + pass + + return host, full_url, tenant_id, agent_id + + +def _probe_dns(host: str) -> dict: + result: dict = {"host": host} + try: + infos = socket.getaddrinfo(host, 443, proto=socket.IPPROTO_TCP) + addrs = sorted({info[4][0] for info in infos}) + result["resolved"] = addrs + result["ok"] = True + except Exception as e: + result["ok"] = False + result["error"] = f"{type(e).__name__}: {e}" + return result + + +def _probe_tls(host: str, *, force_tls12: bool) -> dict: + """Open a raw TLS connection and report the negotiated parameters. + + No HTTP is sent — this isolates the TLS handshake from auth/HTTP so an + ``UNEXPECTED_EOF`` here is unambiguously a handshake-level failure. + """ + label = "tls1_2_forced" if force_tls12 else "tls_default" + out: dict = {"variant": label} + start = time.monotonic() + try: + ctx = ssl.create_default_context() + if force_tls12: + ctx.minimum_version = ssl.TLSVersion.TLSv1_2 + ctx.maximum_version = ssl.TLSVersion.TLSv1_2 + with socket.create_connection((host, 443), timeout=_TLS_TIMEOUT_SECONDS) as sock: + with ctx.wrap_socket(sock, server_hostname=host) as tls: + cert = tls.getpeercert() or {} + subject = dict(x[0] for x in cert.get("subject", [])) + issuer = dict(x[0] for x in cert.get("issuer", [])) + out.update( + ok=True, + negotiated_protocol=tls.version(), + cipher=tls.cipher()[0] if tls.cipher() else None, + alpn=tls.selected_alpn_protocol(), + peer_cert_cn=subject.get("commonName"), + peer_cert_issuer=issuer.get("commonName"), + elapsed_ms=round((time.monotonic() - start) * 1000), + ) + except Exception as e: + out.update( + ok=False, + error_type=type(e).__name__, + error=str(e), + elapsed_ms=round((time.monotonic() - start) * 1000), + ) + return out + + +def _minimal_otlp_body(tenant_id: str, agent_id: str) -> str: + """Smallest valid OTLP/HTTP+JSON body: a single ``invoke_agent`` span. + + Mirrors the shape documented in the direct-OTel integration guide so a 200 + response would be a genuine accept (subject to the usual downstream drop + conditions). Times are Unix-epoch-nanosecond strings. + """ + import json + + now_ns = time.time_ns() + span = { + "traceId": "0af7651916cd43dd8448eb211c80319c", + "spanId": "b7ad6b7169203331", + "name": "invoke_agent probe", + "kind": 1, + "startTimeUnixNano": str(now_ns - 1_000_000), + "endTimeUnixNano": str(now_ns), + "attributes": [ + {"key": "gen_ai.operation.name", "value": {"stringValue": "invoke_agent"}}, + {"key": "microsoft.tenant.id", "value": {"stringValue": tenant_id}}, + {"key": "gen_ai.agent.id", "value": {"stringValue": agent_id}}, + ], + "status": {"code": 1}, + } + body = { + "resourceSpans": [ + { + "resource": {"attributes": []}, + "scopeSpans": [{"scope": {"name": "debug-probe"}, "spans": [span]}], + } + ] + } + return json.dumps(body, separators=(",", ":")) + + +def _probe_https_post(full_url: str, tenant_id: str, agent_id: str) -> dict: + """POST a minimal span using the same ``requests`` stack as the exporter.""" + out: dict = {"url_path": full_url.split("?")[0]} + body = _minimal_otlp_body(tenant_id, agent_id) + headers = {"content-type": "application/json", "connection": "close"} + + token = None + try: + from token_cache import get_cached_agentic_token + + token = get_cached_agentic_token(tenant_id, agent_id) + except Exception as e: + out["token_lookup_error"] = f"{type(e).__name__}: {e}" + out["token_attached"] = bool(token) + if token: + headers["authorization"] = f"Bearer {token}" + + start = time.monotonic() + try: + resp = requests.post( + full_url, + data=body.encode("utf-8"), + headers=headers, + timeout=_HTTP_TIMEOUT_SECONDS, + ) + out.update( + ok=True, + http_status=resp.status_code, + elapsed_ms=round((time.monotonic() - start) * 1000), + correlation_id=( + resp.headers.get("x-ms-correlation-id") + or resp.headers.get("request-id") + or "N/A" + ), + # Body is small (partialSuccess JSON or an error page); cap it so we + # never echo anything large. Never contains the bearer token. + response_body=resp.text[:2000], + ) + except Exception as e: + out.update( + ok=False, + error_type=type(e).__name__, + error=str(e), + elapsed_ms=round((time.monotonic() - start) * 1000), + traceback=traceback.format_exc()[-1500:], + ) + return out + + +def run_probe() -> dict: + """Run the full DNS → TLS → HTTPS diagnostic and return a JSON-able dict.""" + host, full_url, tenant_id, agent_id = _resolve_endpoint() + summary = { + "target_host": host, + "tenant_id_present": bool(tenant_id), + "agent_id_present": bool(agent_id), + "dns": _probe_dns(host), + "tls_default": _probe_tls(host, force_tls12=False), + "tls_1_2_forced": _probe_tls(host, force_tls12=True), + "https_post": _probe_https_post(full_url, tenant_id, agent_id), + } + + # One-line verdict to make the log/JSON instantly readable. + post = summary["https_post"] + if post.get("ok"): + summary["verdict"] = ( + f"TLS + HTTP reachable from this egress (HTTP {post.get('http_status')}). " + "Handshake-reset theory DISPROVEN — investigate exporter/auth/transient." + ) + elif summary["tls_default"].get("ok"): + summary["verdict"] = ( + "Raw TLS handshake succeeded but the HTTPS POST failed — " + "likely HTTP/auth/proxy layer, not a handshake reset." + ) + else: + summary["verdict"] = ( + "Raw TLS handshake FAILED from this egress — network hypothesis " + "CONFIRMED from the real Cloud Run path. Evidence for allowlist/relay." + ) + return summary diff --git a/python/google-adk/sample-agent/deploy-cloudrun.ps1 b/python/google-adk/sample-agent/deploy-cloudrun.ps1 new file mode 100644 index 00000000..356d3ef8 --- /dev/null +++ b/python/google-adk/sample-agent/deploy-cloudrun.ps1 @@ -0,0 +1,75 @@ +# Deploy the Google ADK A365 sample agent to GCP Cloud Run. +# +# Reads non-secret + secret env from the local .env (gitignored), applies the +# production overrides needed for the A365 observability exporter, and deploys +# via Cloud Run source buildpacks (Procfile -> `python main.py`). +# +# Cloud Run automatically injects PORT and K_SERVICE; main.py reads both, so the +# JWT middleware + production host binding engage with no extra config. +# +# Usage: +# .\deploy-cloudrun.ps1 -ProjectId [-Region us-central1] [-ServiceName gcp-a365-agent] + +param( + [Parameter(Mandatory = $true)] [string] $ProjectId, + [string] $Region = "us-central1", + [string] $ServiceName = "gcp-a365-agent" +) + +$ErrorActionPreference = "Stop" +Set-Location $PSScriptRoot + +if (-not (Test-Path ".env")) { throw ".env not found in $PSScriptRoot" } + +# Production overrides applied on top of .env. PORT is intentionally omitted +# (Cloud Run sets it). AUTH_HANDLER_NAME=AGENTIC turns on agentic token exchange. +$overrides = [ordered]@{ + "AUTH_HANDLER_NAME" = "AGENTIC" + "ENABLE_OBSERVABILITY" = "true" + "ENABLE_A365_OBSERVABILITY_EXPORTER" = "true" + "PYTHON_ENVIRONMENT" = "production" +} + +# Parse .env into an ordered map (skip comments, blanks, and PORT). +$envMap = [ordered]@{} +foreach ($line in Get-Content ".env") { + $trimmed = $line.Trim() + if ($trimmed -eq "" -or $trimmed.StartsWith("#")) { continue } + $idx = $trimmed.IndexOf("=") + if ($idx -lt 1) { continue } + $key = $trimmed.Substring(0, $idx).Trim() + $val = $trimmed.Substring($idx + 1).Trim() + if ($key -eq "PORT") { continue } + $envMap[$key] = $val +} +foreach ($k in $overrides.Keys) { $envMap[$k] = $overrides[$k] } + +# Build the env-vars string using a custom delimiter (^##^) so values containing +# commas, slashes, colons, etc. are passed verbatim to gcloud. +$pairs = @() +foreach ($k in $envMap.Keys) { $pairs += "$k=$($envMap[$k])" } +$envArg = "^##^" + ($pairs -join "##") + +Write-Host "Deploying '$ServiceName' to project '$ProjectId' ($Region) with $($envMap.Count) env vars..." -ForegroundColor Cyan + +# --no-cpu-throttling (CPU always allocated) is REQUIRED: the OTel BatchSpanProcessor +# exports genAI spans on a background thread AFTER the turn returns. With default CPU +# throttling, that thread wakes on a frozen CPU and its TLS read stalls -> the gateway +# drops the connection (SSL UNEXPECTED_EOF_WHILE_READING) and spans are lost. +gcloud run deploy $ServiceName ` + --source . ` + --project $ProjectId ` + --region $Region ` + --platform managed ` + --allow-unauthenticated ` + --no-cpu-throttling ` + --set-env-vars $envArg + +if ($LASTEXITCODE -ne 0) { throw "gcloud run deploy failed (exit $LASTEXITCODE)" } + +$url = gcloud run services describe $ServiceName --project $ProjectId --region $Region --format "value(status.url)" +Write-Host "" +Write-Host "Deployed. Service URL: $url" -ForegroundColor Green +Write-Host "Messaging endpoint: $url/api/messages" -ForegroundColor Green +Write-Host "" +Write-Host "Next: set messagingEndpoint in a365.config.json to the above, then run 'a365 setup all'." -ForegroundColor Yellow diff --git a/python/google-adk/sample-agent/main.py b/python/google-adk/sample-agent/main.py index d3378700..4c8b6325 100644 --- a/python/google-adk/sample-agent/main.py +++ b/python/google-adk/sample-agent/main.py @@ -28,6 +28,11 @@ logging.basicConfig(level=log_level, format="%(asctime)s %(levelname)s %(name)s: %(message)s") logger = logging.getLogger(__name__) +# TEMPORARY: surface the A365 exporter's per-chunk export result ("HTTP 200 success +# ... Correlation ID") which is logged at DEBUG, without flooding root with DEBUG. +# Remove alongside the TLS_PROBE diagnostic once export is confirmed. +logging.getLogger("microsoft_agents_a365.observability.core.exporters.agent365_exporter").setLevel(logging.DEBUG) + def start_server(agent_app: AgentApplication): """Start the agent application server.""" isProduction = ( @@ -104,10 +109,31 @@ async def selective_jwt_auth(request, handler): async def health_check(req: Request) -> Response: return Response(text="OK", status=200) + # Temporary egress diagnostic for the A365 observability ingestion endpoint. + # Runs DNS -> raw TLS handshake -> minimal OTLP POST from THIS container's + # egress path (the test never previously run from inside Cloud Run). Gated by + # DEBUG_PROBE_KEY; returns 404 when the key is unset so it can't be probed on + # a public URL. Remove the route (or unset the key) once diagnosis is done. + async def otlp_probe(req: Request) -> Response: + import json as _json + from aiohttp.web import json_response + + expected_key = os.getenv("DEBUG_PROBE_KEY", "") + if not expected_key or req.query.get("key") != expected_key: + return Response(text="Not found", status=404) + + from debug_probe import run_probe + # run_probe does blocking socket/TLS/HTTP work; offload from the loop. + import asyncio + result = await asyncio.to_thread(run_probe) + logger.info("OTLP egress probe verdict: %s", result.get("verdict")) + return json_response(result, dumps=lambda o: _json.dumps(o, indent=2)) + # Configure App app = Application(middlewares=middlewares) app.router.add_get("/", health_check) app.router.add_get("/robots933456.txt", health_check) + app.router.add_get("/debug/otlp-probe", otlp_probe) app.router.add_post("/api/messages", entry_point) app["agent_configuration"] = agent_auth_config @@ -140,10 +166,22 @@ def main(): # ENABLE_A365_OBSERVABILITY_EXPORTER=true sends traces to the A365 backend; # false falls back to the console exporter (expected in local/dev). if os.getenv("ENABLE_OBSERVABILITY", "true").lower() == "true": + # token_resolver supplies the Bearer token the A365 exporter uses to POST + # spans. It reads the agentic token cached during each authenticated turn + # (see agent.py invoke_agent_with_scope). When the A365 exporter is disabled + # (console exporter), the resolver is simply never called. + from token_cache import observability_token_resolver configure( service_name=os.getenv("OBSERVABILITY_SERVICE_NAME", "GoogleADKSampleAgent"), service_namespace=os.getenv("OBSERVABILITY_SERVICE_NAMESPACE", "GoogleADKTesting"), + token_resolver=observability_token_resolver, ) + # Google ADK tags the LLM span as gen_ai.operation.name="generate_content", + # which A365/Maven ingestion drops (it only accepts invoke_agent, execute_tool, + # chat, output_messages). Remap generate_content -> chat on export so the + # inference span (model + token usage) reaches Maven's InferenceCall table. + from observability_remap import register_generate_content_remap + register_generate_content_remap() logger.info( "Observability configured (service=%s, a365_exporter=%s)", os.getenv("OBSERVABILITY_SERVICE_NAME", "GoogleADKSampleAgent"), diff --git a/python/google-adk/sample-agent/observability_remap.py b/python/google-adk/sample-agent/observability_remap.py new file mode 100644 index 00000000..a4ed6672 --- /dev/null +++ b/python/google-adk/sample-agent/observability_remap.py @@ -0,0 +1,128 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +""" +Span operation-name remap for the Google ADK sample agent. + +Google ADK auto-instrumentation tags the LLM call with +``gen_ai.operation.name = "generate_content"`` (the Google GenAI semantic +convention). Microsoft Agent 365 / Maven ingestion only accepts four +operation names — ``invoke_agent``, ``execute_tool``, ``chat`` and +``output_messages`` — and drops every other span before fan-out. As a +result the ADK inference span (model, token usage, finish reason) never +reaches Maven. + +This module rewrites ``generate_content`` -> ``chat`` on export using the +A365 observability SDK's public enricher hook (``register_span_enricher``), +so the inference span maps onto Maven's InferenceCall table. The original +span is never mutated; an :class:`EnrichedReadableSpan` overlay is returned +with the single attribute overridden. + +No changes to the A365 SDK or to Maven are required. +""" + +import logging + +from opentelemetry.sdk.trace import ReadableSpan + +from microsoft_agents_a365.observability.core.constants import ( + CHAT_OPERATION_NAME, + EXECUTE_TOOL_OPERATION_NAME, + GEN_AI_OPERATION_NAME_KEY, + INVOKE_AGENT_OPERATION_NAME, + OUTPUT_MESSAGES_OPERATION_NAME, +) +from microsoft_agents_a365.observability.core.exporters.enriched_span import ( + EnrichedReadableSpan, +) +from microsoft_agents_a365.observability.core.exporters.enriching_span_processor import ( + get_span_enricher, + register_span_enricher, + unregister_span_enricher, +) + +logger = logging.getLogger(__name__) + +# The Google GenAI semantic-convention operation name (emitted only when the +# optional ``opentelemetry-instrumentation-google-genai`` package is installed). +_SOURCE_OPERATION_NAME = "generate_content" + +# Attribute set by ADK's ``trace_call_llm`` on the inference span. ADK does NOT +# set ``gen_ai.operation.name`` on this span, so without a remap it is dropped by +# the Agent 365 exporter (which only keeps invoke_agent/execute_tool/chat/ +# output_messages). Presence of this attribute identifies an inference call. +_GEN_AI_REQUEST_MODEL_KEY = "gen_ai.request.model" + +# Operation names the Agent 365 exporter already considers eligible. A span that +# already carries one of these must never be relabelled. +_RECOGNIZED_OPERATION_NAMES = frozenset( + { + INVOKE_AGENT_OPERATION_NAME, + EXECUTE_TOOL_OPERATION_NAME, + OUTPUT_MESSAGES_OPERATION_NAME, + CHAT_OPERATION_NAME, + } +) + + +def _remap_generate_content_to_chat(span: ReadableSpan) -> ReadableSpan: + """Map an ADK / Google GenAI inference span onto the ``chat`` operation. + + Two shapes are handled: + + 1. ``gen_ai.operation.name == "generate_content"`` — emitted by the optional + ``opentelemetry-instrumentation-google-genai`` package. + 2. ADK's own ``call_llm`` span, which sets ``gen_ai.request.model`` but no + ``gen_ai.operation.name`` at all. This is the default for google-adk + without the genai instrumentation package, and is the case in this + sample. + + Any span that already carries a recognized operation name (invoke_agent, + execute_tool, chat, output_messages) is returned unchanged. + """ + attributes = span.attributes or {} + operation_name = attributes.get(GEN_AI_OPERATION_NAME_KEY) + + # Never relabel a span that already has an eligible operation name. + if operation_name in _RECOGNIZED_OPERATION_NAMES: + return span + + is_genai_generate_content = operation_name == _SOURCE_OPERATION_NAME + is_adk_inference_span = ( + operation_name is None + and attributes.get(_GEN_AI_REQUEST_MODEL_KEY) is not None + ) + if not (is_genai_generate_content or is_adk_inference_span): + return span + + return EnrichedReadableSpan( + span, + extra_attributes={GEN_AI_OPERATION_NAME_KEY: CHAT_OPERATION_NAME}, + ) + + +def register_generate_content_remap() -> None: + """Register the ``generate_content`` -> ``chat`` enricher with the SDK. + + The SDK allows a single enricher at a time. If another enricher is already + registered (e.g. a platform instrumentor), this composes with it: the + existing enricher runs first, then the remap is applied to its result. + Safe to call once during application startup, after ``configure()``. + """ + existing = get_span_enricher() + + if existing is None: + enricher = _remap_generate_content_to_chat + else: + def enricher(span: ReadableSpan) -> ReadableSpan: + return _remap_generate_content_to_chat(existing(span)) + + # Replace the existing single-slot enricher with the composed one. + unregister_span_enricher() + + register_span_enricher(enricher) + logger.info( + "Registered span enricher: %s -> %s remap", + _SOURCE_OPERATION_NAME, + CHAT_OPERATION_NAME, + ) diff --git a/python/google-adk/sample-agent/requirements.txt b/python/google-adk/sample-agent/requirements.txt new file mode 100644 index 00000000..ff3df312 --- /dev/null +++ b/python/google-adk/sample-agent/requirements.txt @@ -0,0 +1,88 @@ +aiohappyeyeballs==2.6.2 +aiohttp==3.14.1 +aiosignal==1.4.0 +aiosqlite==0.22.1 +annotated-doc==0.0.4 +annotated-types==0.7.0 +anyio==4.13.0 +attrs==26.1.0 +Authlib==1.7.2 +azure-core==1.41.0 +certifi==2026.5.20 +cffi==2.0.0 +charset-normalizer==3.4.7 +click==8.4.1 +colorama==0.4.6 +cryptography==48.0.0 +distro==1.9.0 +fastapi==0.136.3 +frozenlist==1.8.0 +google-adk==2.2.0 +google-auth==2.53.0 +google-genai==2.8.0 +googleapis-common-protos==1.75.0 +graphviz==0.21 +grpcio==1.81.0 +h11==0.16.0 +httpcore==1.0.9 +httptools==0.8.0 +httpx-sse==0.4.3 +httpx==0.28.1 +idna==3.18 +importlib_metadata==8.7.1 +isodate==0.7.2 +joserfc==1.7.1 +jsonschema-specifications==2025.9.1 +jsonschema==4.26.0 +mcp==1.27.2 +microsoft-agents-a365-notifications==1.0.0 +microsoft-agents-a365-observability-core==1.0.0 +microsoft-agents-a365-runtime==1.0.0 +microsoft-agents-a365-tooling==1.0.0 +microsoft-agents-activity==1.1.0.dev1 +microsoft-agents-authentication-msal==1.1.0.dev1 +microsoft-agents-hosting-aiohttp==1.1.0.dev1 +microsoft-agents-hosting-core==1.1.0.dev1 +msal==1.38.0rc1 +multidict==6.7.1 +opentelemetry-api==1.38.0 +opentelemetry-exporter-otlp-proto-common==1.38.0 +opentelemetry-exporter-otlp-proto-grpc==1.38.0 +opentelemetry-exporter-otlp-proto-http==1.38.0 +opentelemetry-exporter-otlp==1.38.0 +opentelemetry-proto==1.38.0 +opentelemetry-sdk==1.38.0 +opentelemetry-semantic-conventions==0.59b0 +packaging==26.2 +propcache==0.5.2 +protobuf==6.33.6 +pyasn1==0.6.3 +pyasn1_modules==0.4.2 +pycparser==3.0 +pydantic-settings==2.14.1 +pydantic==2.14.0a1 +pydantic_core==2.47.0 +PyJWT==2.13.0 +pyOpenSSL==26.2.0 +python-dotenv==1.2.2 +python-multipart==0.0.32 +PyYAML==6.0.3 +referencing==0.37.0 +requests==2.34.2 +rpds-py==2026.5.1 +sniffio==1.3.1 +sse-starlette==3.4.4 +starlette==1.2.1 +tenacity==9.1.4 +typing-inspection==0.4.2 +typing_extensions==4.15.0 +tzdata==2026.2 +tzlocal==5.3.1 +urllib3==2.7.0 +uvicorn==0.49.0 +watchdog==6.0.0 +watchfiles==1.2.0 +websockets==15.0.1 +wrapt==2.2.1 +yarl==1.24.2 +zipp==4.1.0 diff --git a/python/google-adk/sample-agent/token_cache.py b/python/google-adk/sample-agent/token_cache.py new file mode 100644 index 00000000..a8d693f6 --- /dev/null +++ b/python/google-adk/sample-agent/token_cache.py @@ -0,0 +1,92 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +""" +Token caching utilities for the Agent 365 Observability exporter. + +The A365 span exporter authenticates each export with a Bearer token resolved +via ``token_resolver(agent_id, tenant_id)``. That token is the agentic token the +agent obtains during an authenticated turn (via ``auth.exchange_token`` against +the observability scope). Because span export happens asynchronously on the +batch processor's schedule — after the turn handler returns — we cache the token +per (tenant, agent) during the turn and hand it back from the resolver. +""" + +import logging + +import httpx +import requests + +logger = logging.getLogger(__name__) + +# Process-wide cache of agentic tokens keyed by "{tenant_id}:{agent_id}". +_agentic_token_cache: dict[str, str] = {} + + +def _probe_export_transport(agent_id: str, tenant_id: str, token: str) -> None: + """TEMPORARY DIAGNOSTIC: isolate the intermittent TLS UNEXPECTED_EOF seen on + the A365 span export. The SDK exporter uses requests/urllib3 and hits the EOF, + while MCP calls (httpx) to the SAME host succeed. POST a tiny body to the exact + traces URL with BOTH clients during the turn (instance awake) and log each + result side-by-side. Any HTTP status (even 400/403/415) proves the transport + completed; an exception identifies the failing client. Remove once resolved. + """ + url = ( + f"https://agent365.svc.cloud.microsoft/observability/tenants/{tenant_id}" + f"/otlp/agents/{agent_id}/traces?api-version=1" + ) + headers = {"content-type": "application/json", "authorization": f"Bearer {token}"} + body = b"{}" + + # requests / urllib3 -- same HTTP stack as the A365 exporter. + try: + resp = requests.post(url, data=body, headers=headers, timeout=30) + cid = resp.headers.get("x-ms-correlation-id") or resp.headers.get("request-id") or "N/A" + logger.warning("TLS_PROBE requests: status=%s correlation=%s", resp.status_code, cid) + except Exception as exc: # noqa: BLE001 - diagnostic only + logger.warning("TLS_PROBE requests: EXC %s: %s", type(exc).__name__, exc) + + # httpx -- same HTTP stack as MCP, which returns 200 to this host. + try: + with httpx.Client(timeout=30) as client: + resp = client.post(url, content=body, headers=headers) + cid = resp.headers.get("x-ms-correlation-id") or resp.headers.get("request-id") or "N/A" + logger.warning("TLS_PROBE httpx: status=%s correlation=%s", resp.status_code, cid) + except Exception as exc: # noqa: BLE001 - diagnostic only + logger.warning("TLS_PROBE httpx: EXC %s: %s", type(exc).__name__, exc) + + +def cache_agentic_token(tenant_id: str, agent_id: str, token: str) -> None: + """Cache the agentic token for use by the Agent 365 Observability exporter.""" + key = f"{tenant_id}:{agent_id}" + _agentic_token_cache[key] = token + logger.debug("Cached agentic token for %s", key) + _probe_export_transport(agent_id, tenant_id, token) + + +def get_cached_agentic_token(tenant_id: str, agent_id: str) -> str | None: + """Retrieve the cached agentic token for the Agent 365 Observability exporter.""" + key = f"{tenant_id}:{agent_id}" + token = _agentic_token_cache.get(key) + if token: + logger.debug("Retrieved cached agentic token for %s", key) + else: + logger.debug("No cached token found for %s", key) + return token + + +def observability_token_resolver(agent_id: str, tenant_id: str) -> str | None: + """Resolve the export Bearer token for the A365 Observability exporter. + + The exporter calls this with ``(agent_id, tenant_id)``; the cache is keyed + ``(tenant_id, agent_id)`` — note the argument order swap. + """ + token = get_cached_agentic_token(tenant_id, agent_id) + if not token: + logger.warning( + "No cached agentic token for agent_id=%s tenant_id=%s; " + "spans for this identity will not be exported this cycle.", + agent_id, + tenant_id, + ) + return token From e08b7b553531ec5dd150d1e880ce23b5b279b793 Mon Sep 17 00:00:00 2001 From: Evan Mitchell Date: Fri, 19 Jun 2026 10:36:26 +0100 Subject: [PATCH 2/6] Remove temporary OTLP egress diagnostics and bump vulnerable deps - Delete debug_probe.py (insecure TLS probe flagged by CodeQL) - Strip /debug/otlp-probe route and forced DEBUG logger from main.py - Strip _probe_export_transport TLS diagnostic from token_cache.py - Bump cryptography 48.0.0 -> 49.0.0 (OpenSSL wheel advisory) - Bump starlette 1.2.1 -> 1.3.1 (request.form DoS advisory) --- python/google-adk/sample-agent/debug_probe.py | 272 ------------------ python/google-adk/sample-agent/main.py | 26 -- .../google-adk/sample-agent/requirements.txt | 4 +- python/google-adk/sample-agent/token_cache.py | 37 --- 4 files changed, 2 insertions(+), 337 deletions(-) delete mode 100644 python/google-adk/sample-agent/debug_probe.py diff --git a/python/google-adk/sample-agent/debug_probe.py b/python/google-adk/sample-agent/debug_probe.py deleted file mode 100644 index d397601b..00000000 --- a/python/google-adk/sample-agent/debug_probe.py +++ /dev/null @@ -1,272 +0,0 @@ -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. - -""" -Egress diagnostic probe for the Agent 365 Observability ingestion endpoint. - -Purpose -------- -The agent emits correctly-shaped, identity-stamped spans, but the A365 span -exporter's HTTPS POST to ``agent365.svc.cloud.microsoft`` was observed failing -with ``SSL: UNEXPECTED_EOF_WHILE_READING`` (a TLS-handshake reset). Every prior -handshake test was run from the corp network or against a *different* host -(``login.microsoftonline.com``) — never a raw handshake from inside the -Cloud Run container to the ingestion host itself. - -This module reproduces the export network path from the **exact** runtime and -egress of the live agent. It is mounted as ``GET /debug/otlp-probe`` and runs, -in order: - -1. **DNS** — what the container resolves ``agent365.svc.cloud.microsoft`` to. -2. **Raw TLS handshake** — a bare ``socket`` + ``ssl`` connection (default - context, then TLS 1.2 forced) to ``host:443`` with SNI, reporting the - negotiated protocol/cipher and the peer-certificate subject. This isolates - the handshake from any HTTP/auth concerns. -3. **HTTPS POST** — a minimal single-span OTLP/HTTP+JSON request via the same - ``requests`` library the SDK exporter uses, to the same URL the SDK builds. - If a cached observability token exists (from a prior authenticated turn) it - is attached; otherwise the POST runs token-less (a ``401``/``403`` still - proves TLS + HTTP reachability — the opposite of a handshake reset). - -Interpreting results ---------------------- -* Any **HTTP status code** at step 3 (even 401/403/400) ⇒ TLS and the network - path from Cloud Run are healthy; the "Microsoft blocks GCP at TLS" theory is - disproven and the original failure was transient or exporter-level. -* A reproduced **``UNEXPECTED_EOF`` / handshake error** at step 2 or 3 ⇒ the - network hypothesis is confirmed *from the real egress path*, giving hard - evidence for an egress-IP allowlist or relay decision. - -Security --------- -The route is gated by the ``DEBUG_PROBE_KEY`` env var (passed as ``?key=``). -If the env var is unset the route returns 404 so it can't be probed on a public -URL. The response never includes the bearer token or any cert private material. -This is a temporary diagnostic — remove the route (or unset the key) afterward. -""" - -import os -import socket -import ssl -import time -import traceback -import logging - -import requests - -logger = logging.getLogger(__name__) - -# The ingestion host the SDK exporter targets by default. Kept in sync with -# DEFAULT_ENDPOINT_URL in microsoft_agents_a365.observability.core.exporters. -_DEFAULT_HOST = "agent365.svc.cloud.microsoft" -_HTTP_TIMEOUT_SECONDS = 30 -_TLS_TIMEOUT_SECONDS = 15 - - -def _resolve_endpoint() -> tuple[str, str, str, str]: - """Return (host, full_url, tenant_id, agent_id) matching the SDK exporter. - - Uses the SDK's own URL builder and domain-override handling so the probe - hits the identical URL the exporter would, including the delegated - ``/observability`` path and ``api-version=1``. - """ - tenant_id = os.getenv("AGENTIC_TENANT_ID", "") - agent_id = os.getenv("AGENTIC_USER_ID", "") - - host = _DEFAULT_HOST - endpoint = f"https://{_DEFAULT_HOST}" - try: - from microsoft_agents_a365.observability.core.exporters.utils import ( - build_export_url, - get_validated_domain_override, - ) - - override = get_validated_domain_override() - if override: - endpoint = override if "://" in override else f"https://{override}" - # use_s2s_endpoint=False mirrors the exporter default (delegated/OBO path). - full_url = build_export_url(endpoint, agent_id, tenant_id, False) - except Exception as e: # pragma: no cover - defensive - logger.warning("Falling back to manual URL build: %s", e) - full_url = ( - f"{endpoint}/observability/tenants/{tenant_id}" - f"/otlp/agents/{agent_id}/traces?api-version=1" - ) - - # Derive the host actually being dialed (honors a domain override). - try: - from urllib.parse import urlparse - - host = urlparse(full_url).hostname or _DEFAULT_HOST - except Exception: - pass - - return host, full_url, tenant_id, agent_id - - -def _probe_dns(host: str) -> dict: - result: dict = {"host": host} - try: - infos = socket.getaddrinfo(host, 443, proto=socket.IPPROTO_TCP) - addrs = sorted({info[4][0] for info in infos}) - result["resolved"] = addrs - result["ok"] = True - except Exception as e: - result["ok"] = False - result["error"] = f"{type(e).__name__}: {e}" - return result - - -def _probe_tls(host: str, *, force_tls12: bool) -> dict: - """Open a raw TLS connection and report the negotiated parameters. - - No HTTP is sent — this isolates the TLS handshake from auth/HTTP so an - ``UNEXPECTED_EOF`` here is unambiguously a handshake-level failure. - """ - label = "tls1_2_forced" if force_tls12 else "tls_default" - out: dict = {"variant": label} - start = time.monotonic() - try: - ctx = ssl.create_default_context() - if force_tls12: - ctx.minimum_version = ssl.TLSVersion.TLSv1_2 - ctx.maximum_version = ssl.TLSVersion.TLSv1_2 - with socket.create_connection((host, 443), timeout=_TLS_TIMEOUT_SECONDS) as sock: - with ctx.wrap_socket(sock, server_hostname=host) as tls: - cert = tls.getpeercert() or {} - subject = dict(x[0] for x in cert.get("subject", [])) - issuer = dict(x[0] for x in cert.get("issuer", [])) - out.update( - ok=True, - negotiated_protocol=tls.version(), - cipher=tls.cipher()[0] if tls.cipher() else None, - alpn=tls.selected_alpn_protocol(), - peer_cert_cn=subject.get("commonName"), - peer_cert_issuer=issuer.get("commonName"), - elapsed_ms=round((time.monotonic() - start) * 1000), - ) - except Exception as e: - out.update( - ok=False, - error_type=type(e).__name__, - error=str(e), - elapsed_ms=round((time.monotonic() - start) * 1000), - ) - return out - - -def _minimal_otlp_body(tenant_id: str, agent_id: str) -> str: - """Smallest valid OTLP/HTTP+JSON body: a single ``invoke_agent`` span. - - Mirrors the shape documented in the direct-OTel integration guide so a 200 - response would be a genuine accept (subject to the usual downstream drop - conditions). Times are Unix-epoch-nanosecond strings. - """ - import json - - now_ns = time.time_ns() - span = { - "traceId": "0af7651916cd43dd8448eb211c80319c", - "spanId": "b7ad6b7169203331", - "name": "invoke_agent probe", - "kind": 1, - "startTimeUnixNano": str(now_ns - 1_000_000), - "endTimeUnixNano": str(now_ns), - "attributes": [ - {"key": "gen_ai.operation.name", "value": {"stringValue": "invoke_agent"}}, - {"key": "microsoft.tenant.id", "value": {"stringValue": tenant_id}}, - {"key": "gen_ai.agent.id", "value": {"stringValue": agent_id}}, - ], - "status": {"code": 1}, - } - body = { - "resourceSpans": [ - { - "resource": {"attributes": []}, - "scopeSpans": [{"scope": {"name": "debug-probe"}, "spans": [span]}], - } - ] - } - return json.dumps(body, separators=(",", ":")) - - -def _probe_https_post(full_url: str, tenant_id: str, agent_id: str) -> dict: - """POST a minimal span using the same ``requests`` stack as the exporter.""" - out: dict = {"url_path": full_url.split("?")[0]} - body = _minimal_otlp_body(tenant_id, agent_id) - headers = {"content-type": "application/json", "connection": "close"} - - token = None - try: - from token_cache import get_cached_agentic_token - - token = get_cached_agentic_token(tenant_id, agent_id) - except Exception as e: - out["token_lookup_error"] = f"{type(e).__name__}: {e}" - out["token_attached"] = bool(token) - if token: - headers["authorization"] = f"Bearer {token}" - - start = time.monotonic() - try: - resp = requests.post( - full_url, - data=body.encode("utf-8"), - headers=headers, - timeout=_HTTP_TIMEOUT_SECONDS, - ) - out.update( - ok=True, - http_status=resp.status_code, - elapsed_ms=round((time.monotonic() - start) * 1000), - correlation_id=( - resp.headers.get("x-ms-correlation-id") - or resp.headers.get("request-id") - or "N/A" - ), - # Body is small (partialSuccess JSON or an error page); cap it so we - # never echo anything large. Never contains the bearer token. - response_body=resp.text[:2000], - ) - except Exception as e: - out.update( - ok=False, - error_type=type(e).__name__, - error=str(e), - elapsed_ms=round((time.monotonic() - start) * 1000), - traceback=traceback.format_exc()[-1500:], - ) - return out - - -def run_probe() -> dict: - """Run the full DNS → TLS → HTTPS diagnostic and return a JSON-able dict.""" - host, full_url, tenant_id, agent_id = _resolve_endpoint() - summary = { - "target_host": host, - "tenant_id_present": bool(tenant_id), - "agent_id_present": bool(agent_id), - "dns": _probe_dns(host), - "tls_default": _probe_tls(host, force_tls12=False), - "tls_1_2_forced": _probe_tls(host, force_tls12=True), - "https_post": _probe_https_post(full_url, tenant_id, agent_id), - } - - # One-line verdict to make the log/JSON instantly readable. - post = summary["https_post"] - if post.get("ok"): - summary["verdict"] = ( - f"TLS + HTTP reachable from this egress (HTTP {post.get('http_status')}). " - "Handshake-reset theory DISPROVEN — investigate exporter/auth/transient." - ) - elif summary["tls_default"].get("ok"): - summary["verdict"] = ( - "Raw TLS handshake succeeded but the HTTPS POST failed — " - "likely HTTP/auth/proxy layer, not a handshake reset." - ) - else: - summary["verdict"] = ( - "Raw TLS handshake FAILED from this egress — network hypothesis " - "CONFIRMED from the real Cloud Run path. Evidence for allowlist/relay." - ) - return summary diff --git a/python/google-adk/sample-agent/main.py b/python/google-adk/sample-agent/main.py index 4c8b6325..f0f6de83 100644 --- a/python/google-adk/sample-agent/main.py +++ b/python/google-adk/sample-agent/main.py @@ -28,11 +28,6 @@ logging.basicConfig(level=log_level, format="%(asctime)s %(levelname)s %(name)s: %(message)s") logger = logging.getLogger(__name__) -# TEMPORARY: surface the A365 exporter's per-chunk export result ("HTTP 200 success -# ... Correlation ID") which is logged at DEBUG, without flooding root with DEBUG. -# Remove alongside the TLS_PROBE diagnostic once export is confirmed. -logging.getLogger("microsoft_agents_a365.observability.core.exporters.agent365_exporter").setLevel(logging.DEBUG) - def start_server(agent_app: AgentApplication): """Start the agent application server.""" isProduction = ( @@ -109,31 +104,10 @@ async def selective_jwt_auth(request, handler): async def health_check(req: Request) -> Response: return Response(text="OK", status=200) - # Temporary egress diagnostic for the A365 observability ingestion endpoint. - # Runs DNS -> raw TLS handshake -> minimal OTLP POST from THIS container's - # egress path (the test never previously run from inside Cloud Run). Gated by - # DEBUG_PROBE_KEY; returns 404 when the key is unset so it can't be probed on - # a public URL. Remove the route (or unset the key) once diagnosis is done. - async def otlp_probe(req: Request) -> Response: - import json as _json - from aiohttp.web import json_response - - expected_key = os.getenv("DEBUG_PROBE_KEY", "") - if not expected_key or req.query.get("key") != expected_key: - return Response(text="Not found", status=404) - - from debug_probe import run_probe - # run_probe does blocking socket/TLS/HTTP work; offload from the loop. - import asyncio - result = await asyncio.to_thread(run_probe) - logger.info("OTLP egress probe verdict: %s", result.get("verdict")) - return json_response(result, dumps=lambda o: _json.dumps(o, indent=2)) - # Configure App app = Application(middlewares=middlewares) app.router.add_get("/", health_check) app.router.add_get("/robots933456.txt", health_check) - app.router.add_get("/debug/otlp-probe", otlp_probe) app.router.add_post("/api/messages", entry_point) app["agent_configuration"] = agent_auth_config diff --git a/python/google-adk/sample-agent/requirements.txt b/python/google-adk/sample-agent/requirements.txt index ff3df312..160bcd4c 100644 --- a/python/google-adk/sample-agent/requirements.txt +++ b/python/google-adk/sample-agent/requirements.txt @@ -13,7 +13,7 @@ cffi==2.0.0 charset-normalizer==3.4.7 click==8.4.1 colorama==0.4.6 -cryptography==48.0.0 +cryptography==49.0.0 distro==1.9.0 fastapi==0.136.3 frozenlist==1.8.0 @@ -72,7 +72,7 @@ requests==2.34.2 rpds-py==2026.5.1 sniffio==1.3.1 sse-starlette==3.4.4 -starlette==1.2.1 +starlette==1.3.1 tenacity==9.1.4 typing-inspection==0.4.2 typing_extensions==4.15.0 diff --git a/python/google-adk/sample-agent/token_cache.py b/python/google-adk/sample-agent/token_cache.py index a8d693f6..b1d2a3ee 100644 --- a/python/google-adk/sample-agent/token_cache.py +++ b/python/google-adk/sample-agent/token_cache.py @@ -14,54 +14,17 @@ import logging -import httpx -import requests - logger = logging.getLogger(__name__) # Process-wide cache of agentic tokens keyed by "{tenant_id}:{agent_id}". _agentic_token_cache: dict[str, str] = {} -def _probe_export_transport(agent_id: str, tenant_id: str, token: str) -> None: - """TEMPORARY DIAGNOSTIC: isolate the intermittent TLS UNEXPECTED_EOF seen on - the A365 span export. The SDK exporter uses requests/urllib3 and hits the EOF, - while MCP calls (httpx) to the SAME host succeed. POST a tiny body to the exact - traces URL with BOTH clients during the turn (instance awake) and log each - result side-by-side. Any HTTP status (even 400/403/415) proves the transport - completed; an exception identifies the failing client. Remove once resolved. - """ - url = ( - f"https://agent365.svc.cloud.microsoft/observability/tenants/{tenant_id}" - f"/otlp/agents/{agent_id}/traces?api-version=1" - ) - headers = {"content-type": "application/json", "authorization": f"Bearer {token}"} - body = b"{}" - - # requests / urllib3 -- same HTTP stack as the A365 exporter. - try: - resp = requests.post(url, data=body, headers=headers, timeout=30) - cid = resp.headers.get("x-ms-correlation-id") or resp.headers.get("request-id") or "N/A" - logger.warning("TLS_PROBE requests: status=%s correlation=%s", resp.status_code, cid) - except Exception as exc: # noqa: BLE001 - diagnostic only - logger.warning("TLS_PROBE requests: EXC %s: %s", type(exc).__name__, exc) - - # httpx -- same HTTP stack as MCP, which returns 200 to this host. - try: - with httpx.Client(timeout=30) as client: - resp = client.post(url, content=body, headers=headers) - cid = resp.headers.get("x-ms-correlation-id") or resp.headers.get("request-id") or "N/A" - logger.warning("TLS_PROBE httpx: status=%s correlation=%s", resp.status_code, cid) - except Exception as exc: # noqa: BLE001 - diagnostic only - logger.warning("TLS_PROBE httpx: EXC %s: %s", type(exc).__name__, exc) - - def cache_agentic_token(tenant_id: str, agent_id: str, token: str) -> None: """Cache the agentic token for use by the Agent 365 Observability exporter.""" key = f"{tenant_id}:{agent_id}" _agentic_token_cache[key] = token logger.debug("Cached agentic token for %s", key) - _probe_export_transport(agent_id, tenant_id, token) def get_cached_agentic_token(tenant_id: str, agent_id: str) -> str | None: From bc5ac061f5d4c81bd956e1706ee8be1b3a4e64a4 Mon Sep 17 00:00:00 2001 From: Evan Mitchell Date: Fri, 19 Jun 2026 10:41:33 +0100 Subject: [PATCH 3/6] Pin cryptography==48.0.1 (fixes GHSA-537c-gmf6-5ccf, stays <49 for pyopenssl) --- python/google-adk/sample-agent/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/google-adk/sample-agent/requirements.txt b/python/google-adk/sample-agent/requirements.txt index 160bcd4c..34527e1e 100644 --- a/python/google-adk/sample-agent/requirements.txt +++ b/python/google-adk/sample-agent/requirements.txt @@ -13,7 +13,7 @@ cffi==2.0.0 charset-normalizer==3.4.7 click==8.4.1 colorama==0.4.6 -cryptography==49.0.0 +cryptography==48.0.1 distro==1.9.0 fastapi==0.136.3 frozenlist==1.8.0 From e629b134bc325aa0d21cec7168cda6046d02b62d Mon Sep 17 00:00:00 2001 From: Evan Mitchell Date: Fri, 19 Jun 2026 15:27:06 +0100 Subject: [PATCH 4/6] Address review: thread-safe token cache with TTL eviction; add license header to _freeze.py - token_cache.py: guard cache reads/writes with a process-wide threading.Lock (request path vs BatchSpanProcessor export thread); store (token, expiry) with expiry parsed from JWT exp claim and evict expired entries on read. - _freeze.py: add Microsoft copyright + MIT license header. --- python/google-adk/sample-agent/_freeze.py | 2 + python/google-adk/sample-agent/token_cache.py | 46 +++++++++++++++++-- 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/python/google-adk/sample-agent/_freeze.py b/python/google-adk/sample-agent/_freeze.py index 4de7b099..98ff4bb3 100644 --- a/python/google-adk/sample-agent/_freeze.py +++ b/python/google-adk/sample-agent/_freeze.py @@ -1,3 +1,5 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. import importlib.metadata as m skip = {"sample-google-adk", "pip", "setuptools", "wheel"} diff --git a/python/google-adk/sample-agent/token_cache.py b/python/google-adk/sample-agent/token_cache.py index b1d2a3ee..4b4aba72 100644 --- a/python/google-adk/sample-agent/token_cache.py +++ b/python/google-adk/sample-agent/token_cache.py @@ -12,25 +12,65 @@ per (tenant, agent) during the turn and hand it back from the resolver. """ +import base64 +import binascii +import json import logging +import threading +import time logger = logging.getLogger(__name__) +# Fallback lifetime (seconds) used when the token's JWT ``exp`` claim cannot be +# parsed. Kept short so a malformed/opaque token is re-cached every turn rather +# than lingering past its real expiry. +_DEFAULT_TOKEN_TTL_SECONDS = 300 + # Process-wide cache of agentic tokens keyed by "{tenant_id}:{agent_id}". -_agentic_token_cache: dict[str, str] = {} +# Values are ``(token, expires_at_epoch_seconds)``. The cache is read by the +# OpenTelemetry BatchSpanProcessor export thread and written on the request/turn +# path, so every access is guarded by ``_cache_lock``. +_cache_lock = threading.Lock() +_agentic_token_cache: dict[str, tuple[str, float]] = {} + + +def _token_expiry(token: str) -> float: + """Return the epoch-seconds expiry for ``token`` from its JWT ``exp`` claim. + + Falls back to ``now + _DEFAULT_TOKEN_TTL_SECONDS`` when the token is not a + decodable JWT. The signature is not verified — this is only used to expire + the local cache entry, never to authorize anything. + """ + try: + payload_b64 = token.split(".")[1] + payload_b64 += "=" * (-len(payload_b64) % 4) # restore base64url padding + payload = json.loads(base64.urlsafe_b64decode(payload_b64)) + exp = float(payload["exp"]) + return exp + except (IndexError, KeyError, ValueError, TypeError, binascii.Error): + return time.time() + _DEFAULT_TOKEN_TTL_SECONDS def cache_agentic_token(tenant_id: str, agent_id: str, token: str) -> None: """Cache the agentic token for use by the Agent 365 Observability exporter.""" key = f"{tenant_id}:{agent_id}" - _agentic_token_cache[key] = token + expires_at = _token_expiry(token) + with _cache_lock: + _agentic_token_cache[key] = (token, expires_at) logger.debug("Cached agentic token for %s", key) def get_cached_agentic_token(tenant_id: str, agent_id: str) -> str | None: """Retrieve the cached agentic token for the Agent 365 Observability exporter.""" key = f"{tenant_id}:{agent_id}" - token = _agentic_token_cache.get(key) + now = time.time() + with _cache_lock: + entry = _agentic_token_cache.get(key) + if entry is not None and entry[1] <= now: + # Expired: drop it so we don't export with a stale token. + del _agentic_token_cache[key] + entry = None + token = entry[0] if entry is not None else None if token: logger.debug("Retrieved cached agentic token for %s", key) else: From f5770ed74ae2388a3c53501f687e708dde35df47 Mon Sep 17 00:00:00 2001 From: Evan Mitchell Date: Fri, 19 Jun 2026 15:43:36 +0100 Subject: [PATCH 5/6] Remove _freeze.py dev-only requirements generator from sample Local pip-freeze helper not needed by sample consumers; requirements.txt is the committed contract. --- python/google-adk/sample-agent/_freeze.py | 17 ----------------- 1 file changed, 17 deletions(-) delete mode 100644 python/google-adk/sample-agent/_freeze.py diff --git a/python/google-adk/sample-agent/_freeze.py b/python/google-adk/sample-agent/_freeze.py deleted file mode 100644 index 98ff4bb3..00000000 --- a/python/google-adk/sample-agent/_freeze.py +++ /dev/null @@ -1,17 +0,0 @@ -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. -import importlib.metadata as m - -skip = {"sample-google-adk", "pip", "setuptools", "wheel"} -# Windows-only packages present in the local venv that have no Linux build. -skip |= {"pywin32", "pywin32-ctypes", "pypiwin32", "pywinpty", "winsdk", "windows-curses"} -lines = [] -for d in m.distributions(): - name = d.metadata["Name"] - if not name or name.lower() in skip: - continue - lines.append(f"{name}=={d.version}") -lines = sorted(set(lines), key=str.lower) -with open("requirements.txt", "w", encoding="utf-8") as f: - f.write("\n".join(lines) + "\n") -print("count", len(lines)) From c27b914b37d91509aff8d51ecdd3c80b3318334f Mon Sep 17 00:00:00 2001 From: Evan Mitchell Date: Wed, 24 Jun 2026 09:57:31 +0100 Subject: [PATCH 6/6] docs: document Cloud Run deploy script and A365 observability exporter wiring --- python/google-adk/sample-agent/README.md | 60 ++++++++++++++++++++++-- 1 file changed, 56 insertions(+), 4 deletions(-) diff --git a/python/google-adk/sample-agent/README.md b/python/google-adk/sample-agent/README.md index 71c73af3..b17bf967 100644 --- a/python/google-adk/sample-agent/README.md +++ b/python/google-adk/sample-agent/README.md @@ -261,9 +261,34 @@ All values below come from `a365.config.json` and `a365.generated.config.json` ( See [Deploy agent to GCP](https://learn.microsoft.com/en-us/microsoft-agent-365/developer/deploy-agent-gcp) for full instructions. +Use the provided deploy script — it reads your local `.env`, applies the +production overrides the A365 observability exporter needs, and deploys via +Cloud Run source buildpacks (`Procfile` → `python main.py`): + +```powershell +./deploy-cloudrun.ps1 -ProjectId [-Region us-central1] [-ServiceName gcp-a365-agent] +``` + +The script deploys with **`--no-cpu-throttling`, which is required**: the +OpenTelemetry `BatchSpanProcessor` exports genAI spans on a background thread +*after* the turn returns. With default Cloud Run CPU throttling that thread +wakes on a frozen CPU and its TLS write stalls, so the gateway drops the +connection (`SSL UNEXPECTED_EOF_WHILE_READING`) and spans are silently lost. + +> Cloud Run injects `PORT` and `K_SERVICE` automatically; `main.py` reads both, +> so the JWT middleware and production host binding engage with no extra config. + +Build inputs are controlled by three scaffolding files in this folder: +`Procfile` (start command), `.python-version` (runtime), and `.gcloudignore` +(forces the pip/`requirements.txt` buildpack and excludes the local `.venv`, +`.env`, and a365 config from the upload). + +To deploy manually instead of using the script: + ```bash -# Deploy to Cloud Run -gcloud run deploy gcp-a365-agent --source . --region us-central1 --platform managed --allow-unauthenticated +gcloud run deploy gcp-a365-agent --source . --region us-central1 \ + --platform managed --allow-unauthenticated --no-cpu-throttling \ + --set-env-vars "ENABLE_OBSERVABILITY=true,ENABLE_A365_OBSERVABILITY_EXPORTER=true,AUTH_HANDLER_NAME=AGENTIC,..." ``` Set `a365.config.json` with your Cloud Run URL and `needDeployment: false`: @@ -283,6 +308,33 @@ Register only the messaging endpoint (skip Azure deploy): a365 setup blueprint --endpoint-only ``` +### Production observability (A365 exporter) + +When `ENABLE_A365_OBSERVABILITY_EXPORTER=true`, genAI spans are exported to the +Agent 365 backend instead of the console. Two pieces of wiring make this work: + +**1. Per-turn token exchange + cache.** The A365 exporter authenticates each +span export with a Bearer token, but export happens asynchronously *after* the +turn handler returns. During each authenticated turn, `agent.py` exchanges an +observability-scoped agentic token and caches it per `(tenant, agent)` in +`token_cache.py`. `main.py` passes `observability_token_resolver` to +`configure()`, which the exporter calls to retrieve that cached token. Token +exchange is best-effort — a failure only means spans aren't exported that cycle, +it never breaks the turn. + +> The observability **agent id is the app instance id (`AGENTIC_APP_ID`)**, not +> `AGENTIC_USER_ID`. The ingestion endpoint enforces `ValidateAgentIdentity`: +> the `{agentId}` in the export URL must equal the `azp`/`appid` in the agentic +> token, which is the app instance id. + +**2. Span operation-name remap.** Google ADK tags its inference span with +`gen_ai.operation.name = "generate_content"` (or no operation name at all), +which A365/Maven ingestion drops — it only accepts `invoke_agent`, +`execute_tool`, `chat`, and `output_messages`. `observability_remap.py` +rewrites these inference spans to `chat` on export (via the SDK's +`register_span_enricher` hook) so model and token-usage data reaches Maven's +`InferenceCall` table. No SDK or Maven changes are required. + ### Messaging endpoint reference See [Configure messaging endpoint](https://learn.microsoft.com/en-us/microsoft-agent-365/developer/agent-messaging-endpoint) for all hosting options. @@ -367,9 +419,9 @@ All configuration is via environment variables (`.env` for local, App Settings f | `GOOGLE_GENAI_USE_VERTEXAI` | `FALSE` | Set `TRUE` to use Vertex AI instead of Gemini API | | `AUTH_HANDLER_NAME` | _(empty)_ | Empty = anonymous (Playground/local), `AGENTIC` = production | | `BEARER_TOKEN` | _(empty)_ | Token for MCP tool access. Get with `a365 develop get-token -o raw` | -| `AGENTIC_APP_ID` | — | Agent App ID from A365 portal | +| `AGENTIC_APP_ID` | — | Agent **app instance** ID. Used as the observability agent id (must match the token `azp`) | | `AGENTIC_TENANT_ID` | — | Azure tenant ID | -| `AGENTIC_USER_ID` | — | Agent User ID from A365 portal | +| `AGENTIC_USER_ID` | — | Agent **user** ID (identity enrichment); populated after admin approves the instance | | `PORT` | `3978` | Server port (Azure sets this to `8000` automatically) | | `ENABLE_OBSERVABILITY` | `true` | Enable OpenTelemetry tracing | | `ENABLE_A365_OBSERVABILITY_EXPORTER` | `false` | Send traces to A365 backend (`true` for production) |