Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion agent_assembly/client/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import httpx

from agent_assembly.client.dispatch import DispatchToolResult
from agent_assembly.core.transport_security import require_secure_http_url
from agent_assembly.exceptions import GatewayError


Expand All @@ -27,6 +28,7 @@ def __init__(
spawned_by_tool: str | None = None,
depth: int | None = None,
enforcement_mode: str | None = None,
allow_insecure: bool = False,
) -> None:
"""
Initialize the GatewayClient.
Expand All @@ -51,6 +53,12 @@ def __init__(
``None`` (the default) lets the gateway apply its server-side
default of live enforcement. Stored for reference; registration
itself now goes through the native gRPC path (ADR 0004).
allow_insecure: Permit sending the ``Authorization: Bearer`` API key
over plaintext ``http://`` to a **non-loopback** host. Off by
default — a remote ``http://`` base URL with an API key set is
refused with :class:`ValueError`, because the Bearer credential
would travel unencrypted (AAASM-3725). Loopback ``http://`` is
always allowed (local dev gateway).
"""
self.gateway_url = gateway_url.rstrip("/")
self.control_plane_url = control_plane_url.rstrip("/") if control_plane_url else None
Expand All @@ -63,6 +71,7 @@ def __init__(
self.spawned_by_tool = spawned_by_tool
self.depth = depth
self.enforcement_mode = enforcement_mode
self.allow_insecure = allow_insecure
self._client: httpx.Client | None = None

@property
Expand All @@ -73,11 +82,17 @@ def client(self) -> httpx.Client:
it falls back to ``gateway_url`` (single-host OSS dev).
"""
if self._client is None:
base_url = self.control_plane_url or self.gateway_url
require_secure_http_url(
base_url,
has_api_key=bool(self.api_key),
allow_insecure=self.allow_insecure,
)
headers = {}
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
self._client = httpx.Client(
base_url=self.control_plane_url or self.gateway_url,
base_url=base_url,
headers=headers,
timeout=self.timeout,
)
Expand Down
5 changes: 5 additions & 0 deletions agent_assembly/core/assembly.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
register_agent,
)
from agent_assembly.core.spawn import _SPAWN_CTX
from agent_assembly.core.transport_security import warn_if_insecure_http_url
from agent_assembly.exceptions import AssemblyError, ConfigurationError

RuntimeMode = Literal["auto", "ebpf", "proxy", "sdk-only"]
Expand Down Expand Up @@ -173,6 +174,10 @@ def init_assembly(
"""
gateway_url = resolve_gateway_url(gateway_url)
api_key = resolve_api_key(api_key)
# Warn early when the resolved gateway would carry the Bearer API key over
# plaintext http:// to a non-loopback host (AAASM-3725). The control-plane
# URL is the host the credential is actually sent to when set.
warn_if_insecure_http_url(control_plane_url or gateway_url, has_api_key=bool(api_key))
gateway_url, control_plane_url = _validate_inputs(
gateway_url=gateway_url,
mode=mode,
Expand Down
136 changes: 136 additions & 0 deletions agent_assembly/core/transport_security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
"""Shared transport-security validation for gateway connections (AAASM-3685/3725).

The SDK reaches the governance gateway over two transports that both carry
sensitive material:

* the op-control gRPC stream (``op_control.py``), which carries the agent
identity triple and operator pause / terminate signals;
* the control-plane HTTP API (``client/gateway.py``), which sends the API key
as an ``Authorization: Bearer`` header.

Neither must travel unencrypted to a **non-loopback** host without an explicit
opt-in. A loopback target is the local dev-mode gateway, where plaintext is the
documented default; anything else is presumed remote and must be encrypted.

This module centralises the loopback decision and the two enforcement helpers
so the gRPC and HTTP paths agree on what "secure" means (mirrors node-sdk
AAASM-3123).
"""

from __future__ import annotations

import warnings
from urllib.parse import urlsplit

__all__ = [
"LOOPBACK_HOSTS",
"gateway_host_of",
"is_loopback_target",
"require_secure_grpc_target",
"require_secure_http_url",
"warn_if_insecure_http_url",
]

# Hosts treated as loopback for the secure-by-default transport decision.
# A loopback gateway is the local dev-mode CP, where plaintext is the documented
# default; anything else is presumed remote and must be encrypted.
LOOPBACK_HOSTS = frozenset({"localhost", "127.0.0.1", "::1", "[::1]"})


def gateway_host_of(gateway_url: str) -> str:
"""Extract the bare host from a gateway target.

Accepts a gRPC ``host:port`` target, a bare host, or a full URL with a
scheme (``http://host:port/path``). IPv6 bracket form (``[::1]:7391``) is
preserved with its brackets so it can be matched against
:data:`LOOPBACK_HOSTS`. The result is lower-cased for case-insensitive
comparison.
"""
target = gateway_url.strip()

# Full URL with a scheme — let urlsplit pull out the host:port authority.
if "://" in target:
target = urlsplit(target).netloc

# Strip any userinfo (``user:pass@host``) — only the host matters here.
if "@" in target:
target = target.rsplit("@", 1)[1]

# Bracketed IPv6 (``[::1]`` or ``[::1]:7391``): keep the brackets, drop the
# trailing ``:port`` that sits *outside* the closing bracket.
if target.startswith("["):
closing = target.find("]")
if closing != -1:
return target[: closing + 1].lower()
return target.lower()

# Bare IPv6 without brackets (more than one colon) — no port to strip.
if target.count(":") > 1:
return target.lower()

# host or host:port — drop the port.
return target.rsplit(":", 1)[0].lower() if ":" in target else target.lower()


def is_loopback_target(gateway_url: str) -> bool:
"""Return True iff ``gateway_url`` points at a loopback host."""
return gateway_host_of(gateway_url) in LOOPBACK_HOSTS


def require_secure_grpc_target(gateway_url: str, *, allow_insecure: bool) -> None:
"""Validate that a plaintext gRPC channel to ``gateway_url`` is permitted.

A loopback target is always allowed (local dev gateway). A non-loopback
target requires the caller to set ``allow_insecure`` — otherwise raise
:class:`ValueError`, because the op-control stream carries the agent
identity and operator control signals.
"""
if is_loopback_target(gateway_url) or allow_insecure:
return
raise ValueError(
f"Refusing to open an insecure (plaintext) gRPC channel to non-loopback "
f"gateway {gateway_url!r}. Use a TLS channel via channel_factory, or pass "
f"allow_insecure=True to explicitly opt in (loopback dev only)."
)


def require_secure_http_url(gateway_url: str, *, has_api_key: bool, allow_insecure: bool) -> None:
"""Validate that an ``http://`` control-plane URL is permitted.

When an API key is set it is sent as a Bearer header, so a plaintext
``http://`` URL to a non-loopback host would leak the credential. Refuse
unless the target is loopback or ``allow_insecure`` is set. ``https://``
URLs (and non-http schemes) always pass.
"""
scheme = urlsplit(gateway_url.strip()).scheme.lower()
if scheme != "http":
return
if not has_api_key:
return
if is_loopback_target(gateway_url) or allow_insecure:
return
raise ValueError(
f"Refusing to send an Authorization: Bearer credential over a plaintext "
f"(non-TLS) connection to non-loopback gateway {gateway_url!r}. Use an "
f"https:// endpoint, or pass allow_insecure=True to explicitly opt in "
f"(loopback dev only)."
)


def warn_if_insecure_http_url(gateway_url: str, *, has_api_key: bool) -> None:
"""Emit a warning when a non-loopback ``http://`` gateway carries a key.

Resolution-time advisory counterpart to :func:`require_secure_http_url`:
the resolver knows the URL and whether a key is set before any request is
issued, so it warns early. The hard refusal still happens later in
``GatewayClient`` (AAASM-3725). ``https://`` and loopback targets are silent.
"""
scheme = urlsplit(gateway_url.strip()).scheme.lower()
if scheme != "http" or not has_api_key or is_loopback_target(gateway_url):
return
warnings.warn(
f"Gateway {gateway_url!r} uses plaintext http:// while an API key is set; "
f"the Authorization: Bearer credential would travel unencrypted. Use "
f"https:// for non-loopback gateways.",
stacklevel=3,
)
14 changes: 13 additions & 1 deletion agent_assembly/op_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import grpc

from agent_assembly.core.transport_security import require_secure_grpc_target
from agent_assembly.exceptions import OpTerminatedError
from agent_assembly.proto import common_pb2, policy_pb2, policy_pb2_grpc

Expand Down Expand Up @@ -102,13 +103,24 @@ def connect(
team_id: str,
agent_id: str,
channel_factory: grpc.Channel | None = None,
allow_insecure: bool = False,
) -> OpControlSubscriber:
"""Open the gRPC channel + subscription stream and start the reader.

``gateway_url`` is the ``host:port`` of the gateway's gRPC endpoint
(no scheme; gRPC uses its own). When ``channel_factory`` is supplied
(tests), it's used instead of opening a fresh insecure channel.
(tests, or a TLS channel built by the caller), it's used verbatim and
the loopback / ``allow_insecure`` check below is bypassed — the caller
has taken responsibility for the transport.

Otherwise a plaintext ``grpc.insecure_channel`` is opened, but only when
the target is loopback (local dev gateway) or ``allow_insecure`` is set.
The op-control stream carries the agent identity triple and operator
pause / terminate signals, so an unencrypted channel to a non-loopback
host is refused with :class:`ValueError` (AAASM-3685).
"""
if channel_factory is None:
require_secure_grpc_target(gateway_url, allow_insecure=allow_insecure)
channel = channel_factory or grpc.insecure_channel(gateway_url)
stub = policy_pb2_grpc.PolicyServiceStub(channel) # type: ignore[no-untyped-call]
proto_agent_id = common_pb2.AgentId(org_id=org_id, team_id=team_id, agent_id=agent_id)
Expand Down
41 changes: 40 additions & 1 deletion test/unit/client/test_gateway_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ def _patch_post(client: GatewayClient, mock_post: MagicMock) -> Any:


def test_http_client_sets_bearer_auth_header_when_api_key_present() -> None:
client = GatewayClient(gateway_url="http://gw.test", agent_id="a", api_key="sekret")
# allow_insecure opts past the AAASM-3725 plaintext-http refusal so this
# test can assert header behavior on a non-loopback http:// base URL.
client = GatewayClient(
gateway_url="http://gw.test", agent_id="a", api_key="sekret", allow_insecure=True
)
try:
assert client.client.headers["Authorization"] == "Bearer sekret"
finally:
Expand Down Expand Up @@ -86,3 +90,38 @@ def test_report_edge_raises_gateway_error_on_http_error() -> None:
mock_post = MagicMock(return_value=_raising(httpx.ConnectError("down")))
with _patch_post(client, mock_post), pytest.raises(GatewayError, match="Failed to report edge"):
client.report_edge("src", "dst", "messages")


class TestHttpTransportSecurity:
"""AAASM-3725: refuse Bearer API key over plaintext http to a remote host."""

def test_bearer_over_http_non_loopback_rejected(self) -> None:
with GatewayClient(gateway_url="http://gw.test", agent_id="a", api_key="k") as client:
with pytest.raises(ValueError, match="Bearer"):
_ = client.client

def test_bearer_over_http_loopback_allowed(self) -> None:
with GatewayClient(
gateway_url="http://localhost:7391", agent_id="a", api_key="k"
) as client:
assert client.client.headers["Authorization"] == "Bearer k"

def test_bearer_over_https_non_loopback_allowed(self) -> None:
with GatewayClient(gateway_url="https://gw.test", agent_id="a", api_key="k") as client:
assert client.client.headers["Authorization"] == "Bearer k"

def test_http_non_loopback_without_key_allowed(self) -> None:
with GatewayClient(gateway_url="http://gw.test", agent_id="a") as client:
assert "Authorization" not in client.client.headers

def test_control_plane_url_is_the_validated_target(self) -> None:
# The Bearer header rides the control-plane base URL when set; a remote
# plaintext control-plane URL must be refused even if gateway_url is safe.
with GatewayClient(
gateway_url="https://gw.test",
agent_id="a",
api_key="k",
control_plane_url="http://cp.remote",
) as client:
with pytest.raises(ValueError, match="Bearer"):
_ = client.client
94 changes: 94 additions & 0 deletions test/unit/core/test_transport_security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""Tests for the shared transport-security validator (AAASM-3685/3725)."""

from __future__ import annotations

import warnings

import pytest

from agent_assembly.core import transport_security as ts


class TestGatewayHostOf:
@pytest.mark.parametrize(
("target", "expected"),
[
("localhost:7391", "localhost"),
("127.0.0.1:7391", "127.0.0.1"),
("http://localhost:7391", "localhost"),
("https://gw.prod.example/path", "gw.prod.example"),
("gw.prod.example:443", "gw.prod.example"),
("[::1]:7391", "[::1]"),
("[::1]", "[::1]"),
("user:pass@gw.prod.example:443", "gw.prod.example"),
("LOCALHOST:7391", "localhost"),
("bare-host", "bare-host"),
],
)
def test_extracts_host(self, target: str, expected: str) -> None:
assert ts.gateway_host_of(target) == expected


class TestIsLoopbackTarget:
@pytest.mark.parametrize(
"target",
["localhost:7391", "127.0.0.1:7391", "http://localhost:7391", "[::1]:7391", "::1"],
)
def test_loopback(self, target: str) -> None:
assert ts.is_loopback_target(target) is True

@pytest.mark.parametrize(
"target",
["gw.prod.example:443", "http://gw.test", "10.0.0.5:7391", "example.com"],
)
def test_non_loopback(self, target: str) -> None:
assert ts.is_loopback_target(target) is False


class TestRequireSecureGrpcTarget:
def test_loopback_allowed(self) -> None:
ts.require_secure_grpc_target("localhost:7391", allow_insecure=False)

def test_non_loopback_rejected_without_opt_in(self) -> None:
with pytest.raises(ValueError, match="insecure"):
ts.require_secure_grpc_target("gw.prod.example:443", allow_insecure=False)

def test_non_loopback_allowed_with_opt_in(self) -> None:
ts.require_secure_grpc_target("gw.prod.example:443", allow_insecure=True)


class TestRequireSecureHttpUrl:
def test_https_always_allowed(self) -> None:
ts.require_secure_http_url("https://gw.test", has_api_key=True, allow_insecure=False)

def test_http_no_key_allowed(self) -> None:
ts.require_secure_http_url("http://gw.test", has_api_key=False, allow_insecure=False)

def test_loopback_http_with_key_allowed(self) -> None:
ts.require_secure_http_url("http://localhost:7391", has_api_key=True, allow_insecure=False)

def test_non_loopback_http_with_key_rejected(self) -> None:
with pytest.raises(ValueError, match="Bearer"):
ts.require_secure_http_url("http://gw.test", has_api_key=True, allow_insecure=False)

def test_non_loopback_http_with_key_allowed_when_opted_in(self) -> None:
ts.require_secure_http_url("http://gw.test", has_api_key=True, allow_insecure=True)


class TestWarnIfInsecureHttpUrl:
def test_warns_on_non_loopback_http_with_key(self) -> None:
with pytest.warns(UserWarning, match="unencrypted"):
ts.warn_if_insecure_http_url("http://gw.test", has_api_key=True)

@pytest.mark.parametrize(
("url", "has_key"),
[
("https://gw.test", True),
("http://gw.test", False),
("http://localhost:7391", True),
],
)
def test_silent_otherwise(self, url: str, has_key: bool) -> None:
with warnings.catch_warnings():
warnings.simplefilter("error")
ts.warn_if_insecure_http_url(url, has_api_key=has_key)
Loading