Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions cloud-agents/reviewer/deploy_agent_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""Deploy the ADK reviewer agent to Vertex AI Agent Engine.

The reviewer is a pure reasoning + grounding agent (Gemini via Vertex AI +
Vertex AI Search), which is exactly what Agent Engine is built to host — so it
runs there, while the coder agent (which needs a real git/shell sandbox) stays
on Cloud Run. This is the "right runtime per agent" half of Revka's hybrid
deployment.

Idempotent: updates the existing Agent Engine with the same display name if one
exists, otherwise creates it. Prints the reasoning-engine resource name, which
Revka's workflow uses to call the agent over Agent Engine's query API.

Run locally (gcloud ADC) or from CI (Workload Identity Federation):

cd cloud-agents/reviewer && python deploy_agent_engine.py

Config via env (sensible defaults for project construct-498201):
PROJECT, LOCATION, STAGING_BUCKET, RUNTIME_SERVICE_ACCOUNT,
REVIEWER_DATASTORE_ID, GITHUB_TOKEN_SECRET, DISPLAY_NAME
"""
from __future__ import annotations

import os
import sys

import vertexai
from vertexai import agent_engines
from vertexai.preview import reasoning_engines

from agent import root_agent, REVIEWER_DATASTORE_ID # local module

PROJECT = os.getenv("PROJECT", "construct-498201")
LOCATION = os.getenv("LOCATION", "us-central1")
STAGING_BUCKET = os.getenv("STAGING_BUCKET", f"gs://{PROJECT}-agent-engine")
# Reuse the reviewer Cloud Run SA: it already holds aiplatform.user,
# discoveryengine.viewer, and secretAccessor on revka-GITHUB_TOKEN.
RUNTIME_SA = os.getenv(
"RUNTIME_SERVICE_ACCOUNT",
f"reviewer-agent@{PROJECT}.iam.gserviceaccount.com",
)
GITHUB_TOKEN_SECRET = os.getenv("GITHUB_TOKEN_SECRET", "revka-GITHUB_TOKEN")
DISPLAY_NAME = os.getenv("DISPLAY_NAME", "revka-reviewer")

REQUIREMENTS = [
"google-adk==1.15.0",
"httpx>=0.27.0,<1.0.0",
"google-auth>=2.28",
"google-cloud-aiplatform[agent_engines]>=1.95.1",
]

# Both files travel with the agent so the bundled-conventions corpus fallback
# (retrieve_conventions) keeps working even if the search index is still settling.
EXTRA_PACKAGES = ["agent.py", "grounding/CONVENTIONS.md"]

ENV_VARS = {
# GOOGLE_CLOUD_PROJECT / GOOGLE_CLOUD_LOCATION are reserved on Agent Engine
# (provided automatically); GOOGLE_GENAI_USE_VERTEXAI is set in agent.py.
"REVIEWER_DATASTORE_ID": REVIEWER_DATASTORE_ID,
}

# The reviewer only *reads* PRs, so against a public repo it needs no GitHub
# token (unauthenticated REST is fine). Only attach the Secret Manager token for
# a private repo — and then the Agent Engine runtime service agent must have
# secretAccessor on it, or instances fail readiness ("no running instances").
if os.getenv("ATTACH_GITHUB_SECRET", "").lower() in ("1", "true", "yes"):
# A dict is converted to a SecretRef proto by the SDK (version-agnostic).
ENV_VARS["GITHUB_TOKEN"] = {"secret": GITHUB_TOKEN_SECRET, "version": "latest"}


def main() -> None:
vertexai.init(project=PROJECT, location=LOCATION, staging_bucket=STAGING_BUCKET)

app = reasoning_engines.AdkApp(agent=root_agent, enable_tracing=True)

# Run the engine as the reviewer SA (it already holds aiplatform.user +
# discoveryengine.viewer), so Vertex AI Search grounding is live rather than
# falling back to the bundled CONVENTIONS.md corpus. Requires the Agent
# Engine service agent to have roles/iam.serviceAccountTokenCreator on this
# SA. Set via the SDK's spec.service_account (honored on create and update).
common = dict(
requirements=REQUIREMENTS,
extra_packages=EXTRA_PACKAGES,
env_vars=ENV_VARS,
service_account=RUNTIME_SA,
display_name=DISPLAY_NAME,
description=(
"Revka reviewer agent (ADK/Gemini via Vertex AI) — reviews GitHub "
"PRs grounded in repo coding conventions via Vertex AI Search."
),
)

existing = [
a for a in agent_engines.list()
if getattr(a, "display_name", None) == DISPLAY_NAME
]
if existing:
target = existing[0]
print(f"==> Updating existing Agent Engine: {target.resource_name}", file=sys.stderr)
remote = target.update(agent_engine=app, **common)
else:
print("==> Creating new Agent Engine", file=sys.stderr)
remote = agent_engines.create(agent_engine=app, **common)

# stdout = just the resource name, so CI / callers can capture it cleanly.
print(remote.resource_name)


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ inputs:
- name: reviewer_agent_url
required: false
default: "https://reviewer-agent-n22ujw2j2a-uc.a.run.app"
description: Cloud Run A2A reviewer executor (ADK/Gemini via Vertex).
description: Legacy Cloud Run A2A reviewer executor (kept for fallback; the reviewer now runs on Vertex AI Agent Engine).
- name: reviewer_agent_engine
required: false
default: "projects/1091585228963/locations/us-central1/reasoningEngines/3625053003137941504"
description: Vertex AI Agent Engine resource for the ADK/Gemini reviewer (reasoning + Vertex AI Search grounding).
- name: fix_strategy
required: false
default: "Analyze the issue and implement the smallest correct fix. Add or adjust tests to cover the change. Run pytest. Keep the change minimal and well-scoped."
Expand Down Expand Up @@ -303,23 +307,89 @@ steps:
issue_number: "${assess_issue.output_data.issue_number}"

# -------------------------------------------------------------------------
# 6. Review the PR — A2A call to the Cloud Run ADK/Gemini reviewer. Threaded
# pr_number comes from the extract step above.
# 6. Review the PR — query the ADK/Gemini reviewer running on Vertex AI Agent
# Engine (reasoning + Vertex AI Search grounding). The orchestrator mints an
# OAuth access token from its metadata server (its SA holds aiplatform.user)
# and calls the engine's :streamQuery endpoint, then parses the reviewer's
# JSON verdict out of the streamed events. Threaded pr_number comes from the
# extract step above.
# -------------------------------------------------------------------------
- id: review_pr
name: "Review Pull Request (A2A / Cloud Run)"
type: a2a
name: "Review Pull Request (Vertex AI Agent Engine)"
type: python
position:
x: 513.52
y: 600.0
depends_on: [extract_pr_number]
a2a:
url: "${inputs.reviewer_agent_url}"
cloud_run_auth: gcloud
cloud_run_audience: "${inputs.reviewer_agent_url}"
python:
timeout: 900
message: |
{"repo_name": "${inputs.repo_name}", "pr_number": ${extract_pr_number.output_data.pr_number}}
code: |
import json, sys, re, time, urllib.request, urllib.parse
ctx = json.load(sys.stdin)
engine = (ctx["args"].get("reviewer_agent_engine") or "").strip()
repo = ctx["args"].get("repo_name") or ""
pr = ctx["args"].get("pr_number")
out = {"review_status": "needs_changes", "findings": [],
"standards_checked": [], "summary": ""}
def metadata_access_token():
url = ("http://metadata.google.internal/computeMetadata/v1/instance/"
"service-accounts/default/token")
req = urllib.request.Request(url, headers={"Metadata-Flavor": "Google"})
with urllib.request.urlopen(req, timeout=5) as r:
return json.loads(r.read().decode("utf-8"))["access_token"]
def query_reviewer():
loc = engine.split("/locations/")[1].split("/")[0]
token = metadata_access_token()
api = ("https://" + loc + "-aiplatform.googleapis.com/v1/" + engine +
":streamQuery?alt=sse")
message = json.dumps({"repo_name": repo, "pr_number": pr})
body = json.dumps({"class_method": "stream_query",
"input": {"user_id": "revka", "message": message}})
req = urllib.request.Request(
api, data=body.encode("utf-8"),
headers={"Authorization": "Bearer " + token,
"Content-Type": "application/json"})
with urllib.request.urlopen(req, timeout=280) as r:
raw = r.read().decode("utf-8", "replace")
final = ""
for line in raw.splitlines():
line = line.strip()
if not line:
continue
if line.startswith("data:"):
line = line[5:].strip()
try:
ev = json.loads(line)
except Exception:
continue
for p in (ev.get("content", {}) or {}).get("parts", []) or []:
if p.get("text"):
final = p["text"]
return final
# The ADK/Gemini agent can transiently fail (empty stream); retry so a
# single hiccup doesn't drop the verdict. Grounding stays additive.
final, last_err = "", ""
for attempt in range(3):
try:
final = query_reviewer()
if final.strip():
break
except Exception as exc:
last_err = type(exc).__name__ + ": " + str(exc)[:200]
time.sleep(8 * (attempt + 1))
m = re.search(r"\{.*\}", final, re.S)
if m:
out = json.loads(m.group(0))
elif last_err:
out["summary"] = "Agent Engine review call failed: " + last_err
else:
out["summary"] = ("Reviewer returned no parseable verdict: " +
final[:500])
json.dump(out, sys.stdout)
args:
reviewer_agent_engine: "${inputs.reviewer_agent_engine}"
repo_name: "${inputs.repo_name}"
pr_number: "${extract_pr_number.output_data.pr_number}"

# -------------------------------------------------------------------------
# 7. Human gate — approve the merge.
Expand Down
Loading