diff --git a/python/google-adk/sample-agent/.gcloudignore b/python/google-adk/sample-agent/.gcloudignore new file mode 100644 index 00000000..fe5ee551 --- /dev/null +++ b/python/google-adk/sample-agent/.gcloudignore @@ -0,0 +1,14 @@ +# Cloud Run source upload allowlist. +# Ignore everything, then re-include only what the container needs at build/run. +# This forces the pip buildpack (requirements.txt) and avoids uploading the +# uv.lock (stale, missing mcp), the local .venv, secrets (.env), and a365 config. +* + +!*.py +!Procfile +!requirements.txt +!.python-version +!ToolingManifest.json + +# Re-exclude helper/local-only python files matched by !*.py above. +_freeze.py diff --git a/python/google-adk/sample-agent/.python-version b/python/google-adk/sample-agent/.python-version new file mode 100644 index 00000000..24ee5b1b --- /dev/null +++ b/python/google-adk/sample-agent/.python-version @@ -0,0 +1 @@ +3.13 diff --git a/python/google-adk/sample-agent/Procfile b/python/google-adk/sample-agent/Procfile new file mode 100644 index 00000000..629b83ad --- /dev/null +++ b/python/google-adk/sample-agent/Procfile @@ -0,0 +1 @@ +web: python main.py diff --git a/python/google-adk/sample-agent/README.md b/python/google-adk/sample-agent/README.md index 71c73af3..b17bf967 100644 --- a/python/google-adk/sample-agent/README.md +++ b/python/google-adk/sample-agent/README.md @@ -261,9 +261,34 @@ All values below come from `a365.config.json` and `a365.generated.config.json` ( See [Deploy agent to GCP](https://learn.microsoft.com/en-us/microsoft-agent-365/developer/deploy-agent-gcp) for full instructions. +Use the provided deploy script — it reads your local `.env`, applies the +production overrides the A365 observability exporter needs, and deploys via +Cloud Run source buildpacks (`Procfile` → `python main.py`): + +```powershell +./deploy-cloudrun.ps1 -ProjectId [-Region us-central1] [-ServiceName gcp-a365-agent] +``` + +The script deploys with **`--no-cpu-throttling`, which is required**: the +OpenTelemetry `BatchSpanProcessor` exports genAI spans on a background thread +*after* the turn returns. With default Cloud Run CPU throttling that thread +wakes on a frozen CPU and its TLS write stalls, so the gateway drops the +connection (`SSL UNEXPECTED_EOF_WHILE_READING`) and spans are silently lost. + +> Cloud Run injects `PORT` and `K_SERVICE` automatically; `main.py` reads both, +> so the JWT middleware and production host binding engage with no extra config. + +Build inputs are controlled by three scaffolding files in this folder: +`Procfile` (start command), `.python-version` (runtime), and `.gcloudignore` +(forces the pip/`requirements.txt` buildpack and excludes the local `.venv`, +`.env`, and a365 config from the upload). + +To deploy manually instead of using the script: + ```bash -# Deploy to Cloud Run -gcloud run deploy gcp-a365-agent --source . --region us-central1 --platform managed --allow-unauthenticated +gcloud run deploy gcp-a365-agent --source . --region us-central1 \ + --platform managed --allow-unauthenticated --no-cpu-throttling \ + --set-env-vars "ENABLE_OBSERVABILITY=true,ENABLE_A365_OBSERVABILITY_EXPORTER=true,AUTH_HANDLER_NAME=AGENTIC,..." ``` Set `a365.config.json` with your Cloud Run URL and `needDeployment: false`: @@ -283,6 +308,33 @@ Register only the messaging endpoint (skip Azure deploy): a365 setup blueprint --endpoint-only ``` +### Production observability (A365 exporter) + +When `ENABLE_A365_OBSERVABILITY_EXPORTER=true`, genAI spans are exported to the +Agent 365 backend instead of the console. Two pieces of wiring make this work: + +**1. Per-turn token exchange + cache.** The A365 exporter authenticates each +span export with a Bearer token, but export happens asynchronously *after* the +turn handler returns. During each authenticated turn, `agent.py` exchanges an +observability-scoped agentic token and caches it per `(tenant, agent)` in +`token_cache.py`. `main.py` passes `observability_token_resolver` to +`configure()`, which the exporter calls to retrieve that cached token. Token +exchange is best-effort — a failure only means spans aren't exported that cycle, +it never breaks the turn. + +> The observability **agent id is the app instance id (`AGENTIC_APP_ID`)**, not +> `AGENTIC_USER_ID`. The ingestion endpoint enforces `ValidateAgentIdentity`: +> the `{agentId}` in the export URL must equal the `azp`/`appid` in the agentic +> token, which is the app instance id. + +**2. Span operation-name remap.** Google ADK tags its inference span with +`gen_ai.operation.name = "generate_content"` (or no operation name at all), +which A365/Maven ingestion drops — it only accepts `invoke_agent`, +`execute_tool`, `chat`, and `output_messages`. `observability_remap.py` +rewrites these inference spans to `chat` on export (via the SDK's +`register_span_enricher` hook) so model and token-usage data reaches Maven's +`InferenceCall` table. No SDK or Maven changes are required. + ### Messaging endpoint reference See [Configure messaging endpoint](https://learn.microsoft.com/en-us/microsoft-agent-365/developer/agent-messaging-endpoint) for all hosting options. @@ -367,9 +419,9 @@ All configuration is via environment variables (`.env` for local, App Settings f | `GOOGLE_GENAI_USE_VERTEXAI` | `FALSE` | Set `TRUE` to use Vertex AI instead of Gemini API | | `AUTH_HANDLER_NAME` | _(empty)_ | Empty = anonymous (Playground/local), `AGENTIC` = production | | `BEARER_TOKEN` | _(empty)_ | Token for MCP tool access. Get with `a365 develop get-token -o raw` | -| `AGENTIC_APP_ID` | — | Agent App ID from A365 portal | +| `AGENTIC_APP_ID` | — | Agent **app instance** ID. Used as the observability agent id (must match the token `azp`) | | `AGENTIC_TENANT_ID` | — | Azure tenant ID | -| `AGENTIC_USER_ID` | — | Agent User ID from A365 portal | +| `AGENTIC_USER_ID` | — | Agent **user** ID (identity enrichment); populated after admin approves the instance | | `PORT` | `3978` | Server port (Azure sets this to `8000` automatically) | | `ENABLE_OBSERVABILITY` | `true` | Enable OpenTelemetry tracing | | `ENABLE_A365_OBSERVABILITY_EXPORTER` | `false` | Send traces to A365 backend (`true` for production) | diff --git a/python/google-adk/sample-agent/agent.py b/python/google-adk/sample-agent/agent.py index 23c6d61e..9eb95e89 100644 --- a/python/google-adk/sample-agent/agent.py +++ b/python/google-adk/sample-agent/agent.py @@ -174,8 +174,70 @@ async def invoke_agent_with_scope( # Fall back to env vars so observability baggage is still populated. recipient = context.activity.recipient tenant_id = getattr(recipient, "tenant_id", None) or os.getenv("AGENTIC_TENANT_ID", "") - agent_id = getattr(recipient, "agentic_user_id", None) or os.getenv("AGENTIC_USER_ID", "") - with BaggageBuilder().tenant_id(tenant_id).agent_id(agent_id).build(): + # The A365 observability ingestion endpoint enforces ValidateAgentIdentity: + # the {agentId} in the export URL (and every gen_ai.agent.id span attribute) + # must equal the application id (azp/appid) carried in the agentic OBO token. + # That token's azp is the agent_app_instance_id (AGENTIC_APP_ID), NOT the + # agentic_user_id — so the observability agent id must be the app instance id. + agent_id = getattr(recipient, "agentic_app_id", None) or os.getenv("AGENTIC_APP_ID", "") + + # Identity enrichment so the exported spans carry the dimensions the Agent 365 / + # IDEAs activity rollup needs to attribute usage. Without these, the admin-center + # "Activity" tabs stay empty ("When people are using agents, their usage data will + # show up here") even though spans ingest successfully: + # - user.id (the invoking human) -> "active users" metric + # - microsoft.a365.agent.blueprint.id -> blueprint-level Activity tab + # - microsoft.agent.user.id / email -> agent (instance) attribution + from_prop = context.activity.from_property + user_aad_object_id = getattr(from_prop, "aad_object_id", None) or getattr(from_prop, "id", None) + user_display_name = getattr(from_prop, "name", None) + # Blueprint id (a365.generated.config.json → agentBlueprintId). This is the + # blueprint/template id, distinct from AGENTIC_APP_ID (the instance app id). + # The service-connection CLIENTID already carries the blueprint id, so we reuse + # it as the fallback and no extra env var is required for deployment. + blueprint_id = ( + getattr(recipient, "agent_blueprint_id", None) + or os.getenv("AGENT_BLUEPRINT_ID") + or os.getenv("CONNECTIONS__SERVICE_CONNECTION__SETTINGS__CLIENTID", "") + ) + agentic_user_id = getattr(recipient, "agentic_user_id", None) or os.getenv("AGENTIC_USER_ID", "") + agentic_user_email = os.getenv("AGENTIC_UPN", "") + + baggage = ( + BaggageBuilder() + .tenant_id(tenant_id) + .agent_id(agent_id) + .agent_blueprint_id(blueprint_id) + .agentic_user_id(agentic_user_id) + .agentic_user_email(agentic_user_email) + .user_id(user_aad_object_id) + .user_name(user_display_name) + ) + with baggage.build(): + # When running with an agentic auth handler (production), exchange for an + # observability-scoped token and cache it so the A365 exporter can authenticate + # its span export for this (tenant, agent). Best-effort: a failure here must + # not break the turn — it only means spans aren't exported this cycle. + if auth_handler_name and tenant_id and agent_id: + try: + from microsoft_agents_a365.runtime.environment_utils import ( + get_observability_authentication_scope, + ) + from token_cache import cache_agentic_token + + exaau_token = await auth.exchange_token( + context, + scopes=get_observability_authentication_scope(), + auth_handler_id=auth_handler_name, + ) + if exaau_token and getattr(exaau_token, "token", None): + cache_agentic_token(tenant_id, agent_id, exaau_token.token) + logger.info("Cached observability token for agent_id=%s", agent_id) + else: + logger.warning("Observability token exchange returned no token") + except Exception as e: + logger.warning("Observability token exchange failed: %s", e) + return await self.invoke_agent(message=message, auth=auth, auth_handler_name=auth_handler_name, context=context) async def _cleanup_agent(self, agent: Agent): diff --git a/python/google-adk/sample-agent/deploy-cloudrun.ps1 b/python/google-adk/sample-agent/deploy-cloudrun.ps1 new file mode 100644 index 00000000..356d3ef8 --- /dev/null +++ b/python/google-adk/sample-agent/deploy-cloudrun.ps1 @@ -0,0 +1,75 @@ +# Deploy the Google ADK A365 sample agent to GCP Cloud Run. +# +# Reads non-secret + secret env from the local .env (gitignored), applies the +# production overrides needed for the A365 observability exporter, and deploys +# via Cloud Run source buildpacks (Procfile -> `python main.py`). +# +# Cloud Run automatically injects PORT and K_SERVICE; main.py reads both, so the +# JWT middleware + production host binding engage with no extra config. +# +# Usage: +# .\deploy-cloudrun.ps1 -ProjectId [-Region us-central1] [-ServiceName gcp-a365-agent] + +param( + [Parameter(Mandatory = $true)] [string] $ProjectId, + [string] $Region = "us-central1", + [string] $ServiceName = "gcp-a365-agent" +) + +$ErrorActionPreference = "Stop" +Set-Location $PSScriptRoot + +if (-not (Test-Path ".env")) { throw ".env not found in $PSScriptRoot" } + +# Production overrides applied on top of .env. PORT is intentionally omitted +# (Cloud Run sets it). AUTH_HANDLER_NAME=AGENTIC turns on agentic token exchange. +$overrides = [ordered]@{ + "AUTH_HANDLER_NAME" = "AGENTIC" + "ENABLE_OBSERVABILITY" = "true" + "ENABLE_A365_OBSERVABILITY_EXPORTER" = "true" + "PYTHON_ENVIRONMENT" = "production" +} + +# Parse .env into an ordered map (skip comments, blanks, and PORT). +$envMap = [ordered]@{} +foreach ($line in Get-Content ".env") { + $trimmed = $line.Trim() + if ($trimmed -eq "" -or $trimmed.StartsWith("#")) { continue } + $idx = $trimmed.IndexOf("=") + if ($idx -lt 1) { continue } + $key = $trimmed.Substring(0, $idx).Trim() + $val = $trimmed.Substring($idx + 1).Trim() + if ($key -eq "PORT") { continue } + $envMap[$key] = $val +} +foreach ($k in $overrides.Keys) { $envMap[$k] = $overrides[$k] } + +# Build the env-vars string using a custom delimiter (^##^) so values containing +# commas, slashes, colons, etc. are passed verbatim to gcloud. +$pairs = @() +foreach ($k in $envMap.Keys) { $pairs += "$k=$($envMap[$k])" } +$envArg = "^##^" + ($pairs -join "##") + +Write-Host "Deploying '$ServiceName' to project '$ProjectId' ($Region) with $($envMap.Count) env vars..." -ForegroundColor Cyan + +# --no-cpu-throttling (CPU always allocated) is REQUIRED: the OTel BatchSpanProcessor +# exports genAI spans on a background thread AFTER the turn returns. With default CPU +# throttling, that thread wakes on a frozen CPU and its TLS read stalls -> the gateway +# drops the connection (SSL UNEXPECTED_EOF_WHILE_READING) and spans are lost. +gcloud run deploy $ServiceName ` + --source . ` + --project $ProjectId ` + --region $Region ` + --platform managed ` + --allow-unauthenticated ` + --no-cpu-throttling ` + --set-env-vars $envArg + +if ($LASTEXITCODE -ne 0) { throw "gcloud run deploy failed (exit $LASTEXITCODE)" } + +$url = gcloud run services describe $ServiceName --project $ProjectId --region $Region --format "value(status.url)" +Write-Host "" +Write-Host "Deployed. Service URL: $url" -ForegroundColor Green +Write-Host "Messaging endpoint: $url/api/messages" -ForegroundColor Green +Write-Host "" +Write-Host "Next: set messagingEndpoint in a365.config.json to the above, then run 'a365 setup all'." -ForegroundColor Yellow diff --git a/python/google-adk/sample-agent/main.py b/python/google-adk/sample-agent/main.py index d3378700..f0f6de83 100644 --- a/python/google-adk/sample-agent/main.py +++ b/python/google-adk/sample-agent/main.py @@ -140,10 +140,22 @@ def main(): # ENABLE_A365_OBSERVABILITY_EXPORTER=true sends traces to the A365 backend; # false falls back to the console exporter (expected in local/dev). if os.getenv("ENABLE_OBSERVABILITY", "true").lower() == "true": + # token_resolver supplies the Bearer token the A365 exporter uses to POST + # spans. It reads the agentic token cached during each authenticated turn + # (see agent.py invoke_agent_with_scope). When the A365 exporter is disabled + # (console exporter), the resolver is simply never called. + from token_cache import observability_token_resolver configure( service_name=os.getenv("OBSERVABILITY_SERVICE_NAME", "GoogleADKSampleAgent"), service_namespace=os.getenv("OBSERVABILITY_SERVICE_NAMESPACE", "GoogleADKTesting"), + token_resolver=observability_token_resolver, ) + # Google ADK tags the LLM span as gen_ai.operation.name="generate_content", + # which A365/Maven ingestion drops (it only accepts invoke_agent, execute_tool, + # chat, output_messages). Remap generate_content -> chat on export so the + # inference span (model + token usage) reaches Maven's InferenceCall table. + from observability_remap import register_generate_content_remap + register_generate_content_remap() logger.info( "Observability configured (service=%s, a365_exporter=%s)", os.getenv("OBSERVABILITY_SERVICE_NAME", "GoogleADKSampleAgent"), diff --git a/python/google-adk/sample-agent/observability_remap.py b/python/google-adk/sample-agent/observability_remap.py new file mode 100644 index 00000000..a4ed6672 --- /dev/null +++ b/python/google-adk/sample-agent/observability_remap.py @@ -0,0 +1,128 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +""" +Span operation-name remap for the Google ADK sample agent. + +Google ADK auto-instrumentation tags the LLM call with +``gen_ai.operation.name = "generate_content"`` (the Google GenAI semantic +convention). Microsoft Agent 365 / Maven ingestion only accepts four +operation names — ``invoke_agent``, ``execute_tool``, ``chat`` and +``output_messages`` — and drops every other span before fan-out. As a +result the ADK inference span (model, token usage, finish reason) never +reaches Maven. + +This module rewrites ``generate_content`` -> ``chat`` on export using the +A365 observability SDK's public enricher hook (``register_span_enricher``), +so the inference span maps onto Maven's InferenceCall table. The original +span is never mutated; an :class:`EnrichedReadableSpan` overlay is returned +with the single attribute overridden. + +No changes to the A365 SDK or to Maven are required. +""" + +import logging + +from opentelemetry.sdk.trace import ReadableSpan + +from microsoft_agents_a365.observability.core.constants import ( + CHAT_OPERATION_NAME, + EXECUTE_TOOL_OPERATION_NAME, + GEN_AI_OPERATION_NAME_KEY, + INVOKE_AGENT_OPERATION_NAME, + OUTPUT_MESSAGES_OPERATION_NAME, +) +from microsoft_agents_a365.observability.core.exporters.enriched_span import ( + EnrichedReadableSpan, +) +from microsoft_agents_a365.observability.core.exporters.enriching_span_processor import ( + get_span_enricher, + register_span_enricher, + unregister_span_enricher, +) + +logger = logging.getLogger(__name__) + +# The Google GenAI semantic-convention operation name (emitted only when the +# optional ``opentelemetry-instrumentation-google-genai`` package is installed). +_SOURCE_OPERATION_NAME = "generate_content" + +# Attribute set by ADK's ``trace_call_llm`` on the inference span. ADK does NOT +# set ``gen_ai.operation.name`` on this span, so without a remap it is dropped by +# the Agent 365 exporter (which only keeps invoke_agent/execute_tool/chat/ +# output_messages). Presence of this attribute identifies an inference call. +_GEN_AI_REQUEST_MODEL_KEY = "gen_ai.request.model" + +# Operation names the Agent 365 exporter already considers eligible. A span that +# already carries one of these must never be relabelled. +_RECOGNIZED_OPERATION_NAMES = frozenset( + { + INVOKE_AGENT_OPERATION_NAME, + EXECUTE_TOOL_OPERATION_NAME, + OUTPUT_MESSAGES_OPERATION_NAME, + CHAT_OPERATION_NAME, + } +) + + +def _remap_generate_content_to_chat(span: ReadableSpan) -> ReadableSpan: + """Map an ADK / Google GenAI inference span onto the ``chat`` operation. + + Two shapes are handled: + + 1. ``gen_ai.operation.name == "generate_content"`` — emitted by the optional + ``opentelemetry-instrumentation-google-genai`` package. + 2. ADK's own ``call_llm`` span, which sets ``gen_ai.request.model`` but no + ``gen_ai.operation.name`` at all. This is the default for google-adk + without the genai instrumentation package, and is the case in this + sample. + + Any span that already carries a recognized operation name (invoke_agent, + execute_tool, chat, output_messages) is returned unchanged. + """ + attributes = span.attributes or {} + operation_name = attributes.get(GEN_AI_OPERATION_NAME_KEY) + + # Never relabel a span that already has an eligible operation name. + if operation_name in _RECOGNIZED_OPERATION_NAMES: + return span + + is_genai_generate_content = operation_name == _SOURCE_OPERATION_NAME + is_adk_inference_span = ( + operation_name is None + and attributes.get(_GEN_AI_REQUEST_MODEL_KEY) is not None + ) + if not (is_genai_generate_content or is_adk_inference_span): + return span + + return EnrichedReadableSpan( + span, + extra_attributes={GEN_AI_OPERATION_NAME_KEY: CHAT_OPERATION_NAME}, + ) + + +def register_generate_content_remap() -> None: + """Register the ``generate_content`` -> ``chat`` enricher with the SDK. + + The SDK allows a single enricher at a time. If another enricher is already + registered (e.g. a platform instrumentor), this composes with it: the + existing enricher runs first, then the remap is applied to its result. + Safe to call once during application startup, after ``configure()``. + """ + existing = get_span_enricher() + + if existing is None: + enricher = _remap_generate_content_to_chat + else: + def enricher(span: ReadableSpan) -> ReadableSpan: + return _remap_generate_content_to_chat(existing(span)) + + # Replace the existing single-slot enricher with the composed one. + unregister_span_enricher() + + register_span_enricher(enricher) + logger.info( + "Registered span enricher: %s -> %s remap", + _SOURCE_OPERATION_NAME, + CHAT_OPERATION_NAME, + ) diff --git a/python/google-adk/sample-agent/requirements.txt b/python/google-adk/sample-agent/requirements.txt new file mode 100644 index 00000000..34527e1e --- /dev/null +++ b/python/google-adk/sample-agent/requirements.txt @@ -0,0 +1,88 @@ +aiohappyeyeballs==2.6.2 +aiohttp==3.14.1 +aiosignal==1.4.0 +aiosqlite==0.22.1 +annotated-doc==0.0.4 +annotated-types==0.7.0 +anyio==4.13.0 +attrs==26.1.0 +Authlib==1.7.2 +azure-core==1.41.0 +certifi==2026.5.20 +cffi==2.0.0 +charset-normalizer==3.4.7 +click==8.4.1 +colorama==0.4.6 +cryptography==48.0.1 +distro==1.9.0 +fastapi==0.136.3 +frozenlist==1.8.0 +google-adk==2.2.0 +google-auth==2.53.0 +google-genai==2.8.0 +googleapis-common-protos==1.75.0 +graphviz==0.21 +grpcio==1.81.0 +h11==0.16.0 +httpcore==1.0.9 +httptools==0.8.0 +httpx-sse==0.4.3 +httpx==0.28.1 +idna==3.18 +importlib_metadata==8.7.1 +isodate==0.7.2 +joserfc==1.7.1 +jsonschema-specifications==2025.9.1 +jsonschema==4.26.0 +mcp==1.27.2 +microsoft-agents-a365-notifications==1.0.0 +microsoft-agents-a365-observability-core==1.0.0 +microsoft-agents-a365-runtime==1.0.0 +microsoft-agents-a365-tooling==1.0.0 +microsoft-agents-activity==1.1.0.dev1 +microsoft-agents-authentication-msal==1.1.0.dev1 +microsoft-agents-hosting-aiohttp==1.1.0.dev1 +microsoft-agents-hosting-core==1.1.0.dev1 +msal==1.38.0rc1 +multidict==6.7.1 +opentelemetry-api==1.38.0 +opentelemetry-exporter-otlp-proto-common==1.38.0 +opentelemetry-exporter-otlp-proto-grpc==1.38.0 +opentelemetry-exporter-otlp-proto-http==1.38.0 +opentelemetry-exporter-otlp==1.38.0 +opentelemetry-proto==1.38.0 +opentelemetry-sdk==1.38.0 +opentelemetry-semantic-conventions==0.59b0 +packaging==26.2 +propcache==0.5.2 +protobuf==6.33.6 +pyasn1==0.6.3 +pyasn1_modules==0.4.2 +pycparser==3.0 +pydantic-settings==2.14.1 +pydantic==2.14.0a1 +pydantic_core==2.47.0 +PyJWT==2.13.0 +pyOpenSSL==26.2.0 +python-dotenv==1.2.2 +python-multipart==0.0.32 +PyYAML==6.0.3 +referencing==0.37.0 +requests==2.34.2 +rpds-py==2026.5.1 +sniffio==1.3.1 +sse-starlette==3.4.4 +starlette==1.3.1 +tenacity==9.1.4 +typing-inspection==0.4.2 +typing_extensions==4.15.0 +tzdata==2026.2 +tzlocal==5.3.1 +urllib3==2.7.0 +uvicorn==0.49.0 +watchdog==6.0.0 +watchfiles==1.2.0 +websockets==15.0.1 +wrapt==2.2.1 +yarl==1.24.2 +zipp==4.1.0 diff --git a/python/google-adk/sample-agent/token_cache.py b/python/google-adk/sample-agent/token_cache.py new file mode 100644 index 00000000..4b4aba72 --- /dev/null +++ b/python/google-adk/sample-agent/token_cache.py @@ -0,0 +1,95 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +""" +Token caching utilities for the Agent 365 Observability exporter. + +The A365 span exporter authenticates each export with a Bearer token resolved +via ``token_resolver(agent_id, tenant_id)``. That token is the agentic token the +agent obtains during an authenticated turn (via ``auth.exchange_token`` against +the observability scope). Because span export happens asynchronously on the +batch processor's schedule — after the turn handler returns — we cache the token +per (tenant, agent) during the turn and hand it back from the resolver. +""" + +import base64 +import binascii +import json +import logging +import threading +import time + +logger = logging.getLogger(__name__) + +# Fallback lifetime (seconds) used when the token's JWT ``exp`` claim cannot be +# parsed. Kept short so a malformed/opaque token is re-cached every turn rather +# than lingering past its real expiry. +_DEFAULT_TOKEN_TTL_SECONDS = 300 + +# Process-wide cache of agentic tokens keyed by "{tenant_id}:{agent_id}". +# Values are ``(token, expires_at_epoch_seconds)``. The cache is read by the +# OpenTelemetry BatchSpanProcessor export thread and written on the request/turn +# path, so every access is guarded by ``_cache_lock``. +_cache_lock = threading.Lock() +_agentic_token_cache: dict[str, tuple[str, float]] = {} + + +def _token_expiry(token: str) -> float: + """Return the epoch-seconds expiry for ``token`` from its JWT ``exp`` claim. + + Falls back to ``now + _DEFAULT_TOKEN_TTL_SECONDS`` when the token is not a + decodable JWT. The signature is not verified — this is only used to expire + the local cache entry, never to authorize anything. + """ + try: + payload_b64 = token.split(".")[1] + payload_b64 += "=" * (-len(payload_b64) % 4) # restore base64url padding + payload = json.loads(base64.urlsafe_b64decode(payload_b64)) + exp = float(payload["exp"]) + return exp + except (IndexError, KeyError, ValueError, TypeError, binascii.Error): + return time.time() + _DEFAULT_TOKEN_TTL_SECONDS + + +def cache_agentic_token(tenant_id: str, agent_id: str, token: str) -> None: + """Cache the agentic token for use by the Agent 365 Observability exporter.""" + key = f"{tenant_id}:{agent_id}" + expires_at = _token_expiry(token) + with _cache_lock: + _agentic_token_cache[key] = (token, expires_at) + logger.debug("Cached agentic token for %s", key) + + +def get_cached_agentic_token(tenant_id: str, agent_id: str) -> str | None: + """Retrieve the cached agentic token for the Agent 365 Observability exporter.""" + key = f"{tenant_id}:{agent_id}" + now = time.time() + with _cache_lock: + entry = _agentic_token_cache.get(key) + if entry is not None and entry[1] <= now: + # Expired: drop it so we don't export with a stale token. + del _agentic_token_cache[key] + entry = None + token = entry[0] if entry is not None else None + if token: + logger.debug("Retrieved cached agentic token for %s", key) + else: + logger.debug("No cached token found for %s", key) + return token + + +def observability_token_resolver(agent_id: str, tenant_id: str) -> str | None: + """Resolve the export Bearer token for the A365 Observability exporter. + + The exporter calls this with ``(agent_id, tenant_id)``; the cache is keyed + ``(tenant_id, agent_id)`` — note the argument order swap. + """ + token = get_cached_agentic_token(tenant_id, agent_id) + if not token: + logger.warning( + "No cached agentic token for agent_id=%s tenant_id=%s; " + "spans for this identity will not be exported this cycle.", + agent_id, + tenant_id, + ) + return token