diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..01f6ddf --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,90 @@ +name: ci + +on: + pull_request: + push: + branches: [main] + +concurrency: + group: ci-${{ github.ref }} + cancel-in-progress: true + +jobs: + test: + runs-on: ubuntu-latest + services: + postgres: + image: postgres:16-alpine + env: + POSTGRES_USER: scopilot + POSTGRES_PASSWORD: scopilot + POSTGRES_DB: scopilot + ports: ["5432:5432"] + options: >- + --health-cmd "pg_isready -U scopilot" + --health-interval 5s + --health-timeout 3s + --health-retries 10 + redis: + image: redis:7-alpine + ports: ["6379:6379"] + options: >- + --health-cmd "redis-cli ping" + --health-interval 5s + --health-timeout 3s + --health-retries 10 + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: "3.11" + cache: pip + - name: Install + run: | + python -m pip install --upgrade pip + pip install -e ".[dev,api,worker,webex,mcp,cli,sources,postgres]" + - name: Lint + run: ruff check . + - name: Run Alembic migrations against Postgres + env: + SCOPILOT_DB__URL: postgresql+asyncpg://scopilot:scopilot@localhost:5432/scopilot + SCOPILOT_DB__SYNC_URL: postgresql+psycopg2://scopilot:scopilot@localhost:5432/scopilot + run: alembic upgrade head + - name: pytest + env: + SCOPILOT_DB__URL: sqlite+aiosqlite:///:memory: + run: pytest -v --tb=short + + docker: + runs-on: ubuntu-latest + needs: test + if: github.event_name == 'push' && github.ref == 'refs/heads/main' + permissions: + contents: read + packages: write + steps: + - uses: actions/checkout@v4 + - uses: docker/setup-buildx-action@v3 + - uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + - name: Build image + uses: docker/build-push-action@v6 + with: + context: . + file: deploy/Dockerfile + push: true + tags: | + ghcr.io/${{ github.repository }}:latest + ghcr.io/${{ github.repository }}:${{ github.sha }} + cache-from: type=gha + cache-to: type=gha,mode=max + - name: Trivy scan + uses: aquasecurity/trivy-action@0.24.0 + with: + image-ref: ghcr.io/${{ github.repository }}:${{ github.sha }} + format: table + severity: CRITICAL,HIGH + exit-code: "0" # warn, don't fail (Phase-6 follow-up: fail on CRITICAL) diff --git a/README.md b/README.md index c55fc5f..d4cdf5b 100644 --- a/README.md +++ b/README.md @@ -146,7 +146,31 @@ project into six phases. **Phase 1 (this PR)** lands the foundation: - Schema includes `proposals`, `proposal_audit`, `matrix_versions`, `threat_lookups`, `audit_events` so Phases 3 and 5 can land additively. -**Phase 5 (this PR)** adds: +**Phase 6 (this PR)** adds: + +- `services/mcp_server/` — MCP server exposing 14 tools (runs, SGT, + proposals, threat intel). Two transports on one shared registry: + stdio (`python -m services.mcp_server.stdio`) for Claude Code / + Desktop, and streamable HTTP for LibreChat / remote clients. + `set_sgt_name` is gated by `--allow-dictionary-edit`. +- `deploy/Dockerfile` — multi-stage; one image serves every role + (api / worker / scheduler / mcp / threat-daemon / webex-bot / ui). + Non-root, read-only-rootfs friendly. +- `deploy/docker-compose.yml` — full stack (postgres + redis + every + service) behind Compose profiles for the optional ones (webex, + threat, ui). +- `deploy/k8s/base/` — kustomize base with Deployments + Services + + HPAs + PodDisruptionBudget for the scheduler + Ingress + an example + NetworkPolicy stack. Migration runs as a pre-install Job / + argocd-sync-wave -10. +- `core/observability/` — JSON structured logs + Prometheus metrics + (counters for flow_unknown / classifications / proposals / + threat_lookups). API exposes `/metrics`. +- `.github/workflows/ci.yml` — ruff lint, Alembic migration on a real + Postgres service container, `pytest` against the full matrix, + Docker build + push to GHCR on `main`, Trivy scan. + +**Phase 5** added: - `core/threat/` — pluggable threat-intelligence layer with a `ThreatIntelClient` Protocol and four implementations: @@ -213,9 +237,20 @@ project into six phases. **Phase 1 (this PR)** lands the foundation: `BackgroundTasks`; `POST /v1/proposals/{id}/decision` goes through the state machine. -Subsequent phases (separate PRs): +## Deploy -- **Phase 6** — MCP server + K8s manifests + observability + CI/CD. +```bash +# Local: full stack on docker-compose +docker compose -f deploy/docker-compose.yml up -d +docker compose -f deploy/docker-compose.yml --profile ui --profile webex up -d + +# Kubernetes +kubectl apply -k deploy/k8s/base +``` + +Both targets need at least `SCOPILOT_ANTHROPIC__API_KEY`; threat-intel +and WebEx-bot keys are optional. In production, source secrets via +External Secrets Operator rather than the example `Secret`. ## Apply migrations diff --git a/alembic/env.py b/alembic/env.py index 37b5d60..f366d22 100644 --- a/alembic/env.py +++ b/alembic/env.py @@ -5,13 +5,12 @@ from logging.config import fileConfig -from alembic import context from sqlalchemy import engine_from_config, pool +from alembic import context from segmentation_copilot.config import get_settings from segmentation_copilot.core.models.orm import Base - config = context.config if config.config_file_name is not None: fileConfig(config.config_file_name) diff --git a/alembic/versions/50d5b46784ac_initial_schema.py b/alembic/versions/50d5b46784ac_initial_schema.py index e14bd75..1556836 100644 --- a/alembic/versions/50d5b46784ac_initial_schema.py +++ b/alembic/versions/50d5b46784ac_initial_schema.py @@ -4,16 +4,16 @@ Revises: Create Date: 2026-05-22 11:29:07.382587 """ -from typing import Sequence, Union +from collections.abc import Sequence -from alembic import op import sqlalchemy as sa +from alembic import op revision: str = '50d5b46784ac' -down_revision: Union[str, None] = None -branch_labels: Union[str, Sequence[str], None] = None -depends_on: Union[str, Sequence[str], None] = None +down_revision: str | None = None +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None def upgrade() -> None: diff --git a/app.py b/app.py index 3511095..b42d55b 100644 --- a/app.py +++ b/app.py @@ -26,7 +26,6 @@ import httpx import streamlit as st - DEFAULT_API_BASE = os.environ.get("SCOPILOT_API_BASE", "http://localhost:8000") diff --git a/deploy/.dockerignore b/deploy/.dockerignore new file mode 100644 index 0000000..05fe5e6 --- /dev/null +++ b/deploy/.dockerignore @@ -0,0 +1,17 @@ +.git/ +.github/ +.venv/ +__pycache__/ +*.pyc +data/ +.pytest_cache/ +.ruff_cache/ +.mypy_cache/ +.coverage +htmlcov/ +*.egg-info/ +build/ +dist/ +node_modules/ +deploy/k8s/ +*.md diff --git a/deploy/Dockerfile b/deploy/Dockerfile new file mode 100644 index 0000000..b07769e --- /dev/null +++ b/deploy/Dockerfile @@ -0,0 +1,49 @@ +# Multi-stage build — one image, one role per replica. +# +# The image installs ALL optional extras so the same artifact can serve +# every role (api / worker / scheduler / threat-daemon / webex-bot / +# mcp-http / streamlit-ui). Pick the role with the container CMD or +# pod spec. Keeps the registry footprint to one tag per release. + +ARG PYTHON_VERSION=3.11 + +# --- builder --------------------------------------------------------------- +FROM python:${PYTHON_VERSION}-slim AS builder + +ENV PIP_DISABLE_PIP_VERSION_CHECK=1 \ + PIP_NO_CACHE_DIR=1 \ + PYTHONDONTWRITEBYTECODE=1 + +WORKDIR /build +COPY pyproject.toml README.md ./ +COPY src/ src/ +COPY services/ services/ +COPY alembic.ini ./ +COPY alembic/ alembic/ +COPY app.py ./ + +RUN python -m venv /opt/venv && \ + /opt/venv/bin/pip install --upgrade pip && \ + /opt/venv/bin/pip install ".[api,worker,webex,mcp,cli,ui,sources,otel,postgres]" + +# --- runtime --------------------------------------------------------------- +FROM python:${PYTHON_VERSION}-slim AS runtime + +ENV PATH="/opt/venv/bin:$PATH" \ + PYTHONUNBUFFERED=1 \ + PYTHONDONTWRITEBYTECODE=1 \ + PIP_DISABLE_PIP_VERSION_CHECK=1 + +# Run as non-root. UID 10001 matches the K8s PSP-style baseline. +RUN groupadd --system --gid 10001 scopilot && \ + useradd --system --uid 10001 --gid scopilot --home /app --shell /usr/sbin/nologin scopilot + +WORKDIR /app +COPY --from=builder /opt/venv /opt/venv +COPY --from=builder /build /app + +USER 10001:10001 + +# Default to the API; override the CMD per replica. +EXPOSE 8000 +CMD ["uvicorn", "services.api.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/deploy/docker-compose.yml b/deploy/docker-compose.yml new file mode 100644 index 0000000..4e7f54f --- /dev/null +++ b/deploy/docker-compose.yml @@ -0,0 +1,185 @@ +# Segmentation Copilot — full stack. +# +# Brings up postgres + redis + every service we ship, behind a single +# `docker compose up`. For production deployments, prefer the K8s +# manifests under deploy/k8s/ — this compose file is optimised for +# local development and demo environments. +# +# Optional services live behind Compose profiles: +# --profile webex bring up the WebEx bot (requires bot token) +# --profile threat bring up the threat daemon (requires SSH creds +# and a threat-intel key) +# --profile ui bring up the Streamlit UI +# +# Usage: +# docker compose -f deploy/docker-compose.yml up -d +# docker compose -f deploy/docker-compose.yml --profile webex --profile ui up -d + +services: + + postgres: + image: postgres:16-alpine + environment: + POSTGRES_DB: scopilot + POSTGRES_USER: scopilot + POSTGRES_PASSWORD: scopilot + volumes: + - postgres-data:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U scopilot -d scopilot"] + interval: 5s + timeout: 3s + retries: 10 + + redis: + image: redis:7-alpine + command: ["redis-server", "--appendonly", "yes"] + volumes: + - redis-data:/data + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + timeout: 3s + retries: 10 + + migrate: + build: + context: .. + dockerfile: deploy/Dockerfile + environment: + SCOPILOT_DB__URL: postgresql+asyncpg://scopilot:scopilot@postgres:5432/scopilot + SCOPILOT_DB__SYNC_URL: postgresql+psycopg2://scopilot:scopilot@postgres:5432/scopilot + command: ["alembic", "upgrade", "head"] + depends_on: + postgres: + condition: service_healthy + restart: "no" + + api: + build: + context: .. + dockerfile: deploy/Dockerfile + environment: + SCOPILOT_DB__URL: postgresql+asyncpg://scopilot:scopilot@postgres:5432/scopilot + SCOPILOT_REDIS__URL: redis://redis:6379/0 + SCOPILOT_API__REQUIRE_AUTH: "false" + SCOPILOT_LOG_FORMAT: json + SCOPILOT_ANTHROPIC__API_KEY: ${SCOPILOT_ANTHROPIC__API_KEY:-} + ports: + - "8000:8000" + depends_on: + migrate: + condition: service_completed_successfully + redis: + condition: service_healthy + healthcheck: + test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/healthz').read()"] + interval: 10s + timeout: 5s + retries: 5 + + worker: + build: + context: .. + dockerfile: deploy/Dockerfile + environment: + SCOPILOT_DB__URL: postgresql+asyncpg://scopilot:scopilot@postgres:5432/scopilot + SCOPILOT_REDIS__URL: redis://redis:6379/0 + SCOPILOT_LOG_FORMAT: json + SCOPILOT_ANTHROPIC__API_KEY: ${SCOPILOT_ANTHROPIC__API_KEY:-} + command: ["python", "-m", "services.worker.main", "--role", "worker"] + depends_on: + migrate: + condition: service_completed_successfully + redis: + condition: service_healthy + + scheduler: + build: + context: .. + dockerfile: deploy/Dockerfile + environment: + SCOPILOT_DB__URL: postgresql+asyncpg://scopilot:scopilot@postgres:5432/scopilot + SCOPILOT_REDIS__URL: redis://redis:6379/0 + SCOPILOT_LOG_FORMAT: json + command: ["python", "-m", "services.worker.main", "--role", "scheduler"] + depends_on: + migrate: + condition: service_completed_successfully + redis: + condition: service_healthy + + mcp-http: + build: + context: .. + dockerfile: deploy/Dockerfile + environment: + SCOPILOT_DB__URL: postgresql+asyncpg://scopilot:scopilot@postgres:5432/scopilot + SCOPILOT_REDIS__URL: redis://redis:6379/0 + SCOPILOT_LOG_FORMAT: json + SCOPILOT_ANTHROPIC__API_KEY: ${SCOPILOT_ANTHROPIC__API_KEY:-} + command: ["python", "-m", "services.mcp_server.http", "--port", "8002"] + ports: + - "8002:8002" + depends_on: + migrate: + condition: service_completed_successfully + + webex-bot: + profiles: ["webex"] + build: + context: .. + dockerfile: deploy/Dockerfile + environment: + SCOPILOT_DB__URL: postgresql+asyncpg://scopilot:scopilot@postgres:5432/scopilot + SCOPILOT_REDIS__URL: redis://redis:6379/0 + SCOPILOT_LOG_FORMAT: json + SCOPILOT_WEBEX__BOT_ACCESS_TOKEN: ${SCOPILOT_WEBEX__BOT_ACCESS_TOKEN:-} + SCOPILOT_WEBEX__WEBHOOK_SECRET: ${SCOPILOT_WEBEX__WEBHOOK_SECRET:-} + SCOPILOT_WEBEX__OPERATORS_ROOM_ID: ${SCOPILOT_WEBEX__OPERATORS_ROOM_ID:-} + command: ["uvicorn", "services.webex_bot.main:app", "--host", "0.0.0.0", "--port", "8001"] + ports: + - "8001:8001" + depends_on: + migrate: + condition: service_completed_successfully + + threat-daemon: + profiles: ["threat"] + build: + context: .. + dockerfile: deploy/Dockerfile + environment: + SCOPILOT_DB__URL: postgresql+asyncpg://scopilot:scopilot@postgres:5432/scopilot + SCOPILOT_REDIS__URL: redis://redis:6379/0 + SCOPILOT_LOG_FORMAT: json + SCOPILOT_THREAT__ABUSEIPDB_API_KEY: ${SCOPILOT_THREAT__ABUSEIPDB_API_KEY:-} + SCOPILOT_THREAT__OTX_API_KEY: ${SCOPILOT_THREAT__OTX_API_KEY:-} + SCOPILOT_THREAT__VIRUSTOTAL_API_KEY: ${SCOPILOT_THREAT__VIRUSTOTAL_API_KEY:-} + SSH_HOST: ${SCOPILOT_THREAT_SSH_HOST:-} + SSH_USER: ${SCOPILOT_THREAT_SSH_USER:-} + SSH_LOG_PATH: ${SCOPILOT_THREAT_SSH_LOG_PATH:-/var/log/network/syslog} + command: ["sh", "-c", "python -m services.threat_daemon.main --host $$SSH_HOST --username $$SSH_USER --log-path $$SSH_LOG_PATH"] + depends_on: + migrate: + condition: service_completed_successfully + redis: + condition: service_healthy + + ui: + profiles: ["ui"] + build: + context: .. + dockerfile: deploy/Dockerfile + environment: + SCOPILOT_API_BASE: http://api:8000 + command: ["streamlit", "run", "/app/app.py", "--server.address", "0.0.0.0", "--server.port", "8501"] + ports: + - "8501:8501" + depends_on: + api: + condition: service_healthy + +volumes: + postgres-data: + redis-data: diff --git a/deploy/k8s/base/api.yaml b/deploy/k8s/base/api.yaml new file mode 100644 index 0000000..6924427 --- /dev/null +++ b/deploy/k8s/base/api.yaml @@ -0,0 +1,84 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: api +spec: + replicas: 2 + selector: + matchLabels: + app: api + template: + metadata: + labels: + app: api + annotations: + prometheus.io/scrape: "true" + prometheus.io/port: "8000" + prometheus.io/path: "/metrics" + spec: + securityContext: + runAsNonRoot: true + runAsUser: 10001 + seccompProfile: + type: RuntimeDefault + containers: + - name: api + image: ghcr.io/rlienard/segmentation-copilot:latest + imagePullPolicy: IfNotPresent + ports: + - containerPort: 8000 + name: http + envFrom: + - configMapRef: + name: scopilot-config + - secretRef: + name: scopilot-secrets + readinessProbe: + httpGet: + path: /readyz + port: http + periodSeconds: 5 + livenessProbe: + httpGet: + path: /healthz + port: http + periodSeconds: 20 + resources: + requests: { cpu: "100m", memory: "256Mi" } + limits: { cpu: "1000m", memory: "1Gi" } + securityContext: + allowPrivilegeEscalation: false + readOnlyRootFilesystem: true + capabilities: + drop: ["ALL"] +--- +apiVersion: v1 +kind: Service +metadata: + name: api +spec: + selector: + app: api + ports: + - port: 80 + targetPort: 8000 + name: http +--- +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: api +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: api + minReplicas: 2 + maxReplicas: 8 + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 70 diff --git a/deploy/k8s/base/configmap.yaml b/deploy/k8s/base/configmap.yaml new file mode 100644 index 0000000..c646483 --- /dev/null +++ b/deploy/k8s/base/configmap.yaml @@ -0,0 +1,21 @@ +# Non-secret configuration shared by every pod. +# +# Secret values (DB password, Anthropic key, WebEx token, threat-intel +# keys) live in a Kubernetes Secret — see secret-example.yaml. Production +# deployments should source them via External Secrets Operator + Vault, +# not from the literal secret in this directory. + +apiVersion: v1 +kind: ConfigMap +metadata: + name: scopilot-config +data: + SCOPILOT_ENVIRONMENT: "prod" + SCOPILOT_LOG_LEVEL: "INFO" + SCOPILOT_LOG_FORMAT: "json" + SCOPILOT_DEFAULT_TENANT_ID: "default" + SCOPILOT_DB__URL: "postgresql+asyncpg://scopilot:scopilot@postgres:5432/scopilot" + SCOPILOT_DB__SYNC_URL: "postgresql+psycopg2://scopilot:scopilot@postgres:5432/scopilot" + SCOPILOT_REDIS__URL: "redis://redis:6379/0" + SCOPILOT_API__REQUIRE_AUTH: "true" + SCOPILOT_SCHED__SCAN_INTERVAL_MINUTES: "15" diff --git a/deploy/k8s/base/ingress.yaml b/deploy/k8s/base/ingress.yaml new file mode 100644 index 0000000..d400a74 --- /dev/null +++ b/deploy/k8s/base/ingress.yaml @@ -0,0 +1,60 @@ +# nginx-ingress + cert-manager based ingress. Replace hosts in your +# overlay; the base only declares the *shape* — no real DNS names. + +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: scopilot + annotations: + cert-manager.io/cluster-issuer: letsencrypt-prod + nginx.ingress.kubernetes.io/proxy-body-size: "20m" +spec: + ingressClassName: nginx + tls: + - hosts: + - api.example.com + - mcp.example.com + - ui.example.com + - webex.example.com + secretName: scopilot-tls + rules: + - host: api.example.com + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: api + port: + number: 80 + - host: mcp.example.com + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: mcp-http + port: + number: 80 + - host: ui.example.com + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: ui + port: + number: 80 + - host: webex.example.com + http: + paths: + - path: /webhooks/webex + pathType: Prefix + backend: + service: + name: webex-bot + port: + number: 80 diff --git a/deploy/k8s/base/kustomization.yaml b/deploy/k8s/base/kustomization.yaml new file mode 100644 index 0000000..81cb48e --- /dev/null +++ b/deploy/k8s/base/kustomization.yaml @@ -0,0 +1,30 @@ +# Kustomize base — every deployment, service, configmap, and secret +# the stack needs. Overlays under `overlays/{dev,staging,prod}` patch +# replica counts, resource limits, ingress hosts, and secret refs. + +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +namespace: scopilot + +resources: + - namespace.yaml + - configmap.yaml + - secret-example.yaml + - postgres.yaml + - redis.yaml + - migration.yaml + - api.yaml + - worker.yaml + - scheduler.yaml + - mcp-http.yaml + - webex-bot.yaml + - streamlit-ui.yaml + - ingress.yaml + +commonLabels: + app.kubernetes.io/part-of: segmentation-copilot + +images: + - name: ghcr.io/rlienard/segmentation-copilot + newTag: latest diff --git a/deploy/k8s/base/mcp-http.yaml b/deploy/k8s/base/mcp-http.yaml new file mode 100644 index 0000000..5524cde --- /dev/null +++ b/deploy/k8s/base/mcp-http.yaml @@ -0,0 +1,49 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: mcp-http +spec: + replicas: 2 + selector: + matchLabels: + app: mcp-http + template: + metadata: + labels: + app: mcp-http + spec: + securityContext: + runAsNonRoot: true + runAsUser: 10001 + containers: + - name: mcp + image: ghcr.io/rlienard/segmentation-copilot:latest + command: ["python", "-m", "services.mcp_server.http", "--port", "8002"] + ports: + - containerPort: 8002 + name: http + envFrom: + - configMapRef: + name: scopilot-config + - secretRef: + name: scopilot-secrets + resources: + requests: { cpu: "100m", memory: "256Mi" } + limits: { cpu: "1000m", memory: "1Gi" } + securityContext: + allowPrivilegeEscalation: false + readOnlyRootFilesystem: true + capabilities: + drop: ["ALL"] +--- +apiVersion: v1 +kind: Service +metadata: + name: mcp-http +spec: + selector: + app: mcp-http + ports: + - port: 80 + targetPort: 8002 + name: http diff --git a/deploy/k8s/base/migration.yaml b/deploy/k8s/base/migration.yaml new file mode 100644 index 0000000..97d5043 --- /dev/null +++ b/deploy/k8s/base/migration.yaml @@ -0,0 +1,28 @@ +# One-shot Job that runs `alembic upgrade head` before any service starts. +# Use a Helm pre-install hook / Argo Sync wave in production. + +apiVersion: batch/v1 +kind: Job +metadata: + name: scopilot-migrate + annotations: + helm.sh/hook: pre-install,pre-upgrade + argocd.argoproj.io/sync-wave: "-10" +spec: + backoffLimit: 3 + ttlSecondsAfterFinished: 600 + template: + spec: + restartPolicy: OnFailure + containers: + - name: alembic + image: ghcr.io/rlienard/segmentation-copilot:latest + command: ["alembic", "upgrade", "head"] + envFrom: + - configMapRef: + name: scopilot-config + - secretRef: + name: scopilot-secrets + resources: + requests: { cpu: "50m", memory: "128Mi" } + limits: { cpu: "500m", memory: "512Mi" } diff --git a/deploy/k8s/base/namespace.yaml b/deploy/k8s/base/namespace.yaml new file mode 100644 index 0000000..51057ac --- /dev/null +++ b/deploy/k8s/base/namespace.yaml @@ -0,0 +1,8 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: scopilot + labels: + pod-security.kubernetes.io/enforce: restricted + pod-security.kubernetes.io/audit: restricted + pod-security.kubernetes.io/warn: restricted diff --git a/deploy/k8s/base/networkpolicy.yaml b/deploy/k8s/base/networkpolicy.yaml new file mode 100644 index 0000000..4777e89 --- /dev/null +++ b/deploy/k8s/base/networkpolicy.yaml @@ -0,0 +1,68 @@ +# Zero-trust intra-namespace traffic. +# +# Default-deny on ingress + egress, then explicit allows per service. +# Not in the base `kustomization.yaml` by default because cluster +# operators may have their own NetworkPolicy stack — apply this file +# manually or include it via an overlay. + +apiVersion: networking.k8s.io/v1 +kind: NetworkPolicy +metadata: + name: default-deny +spec: + podSelector: {} + policyTypes: [Ingress, Egress] +--- +apiVersion: networking.k8s.io/v1 +kind: NetworkPolicy +metadata: + name: api-allow +spec: + podSelector: + matchLabels: + app: api + policyTypes: [Ingress, Egress] + ingress: + - from: + - namespaceSelector: {} + egress: + - to: + - podSelector: + matchLabels: + app: postgres + - podSelector: + matchLabels: + app: redis + - to: # DNS + - namespaceSelector: {} + ports: + - protocol: UDP + port: 53 +--- +apiVersion: networking.k8s.io/v1 +kind: NetworkPolicy +metadata: + name: worker-allow +spec: + podSelector: + matchLabels: + app: worker + policyTypes: [Egress] + egress: + - to: + - podSelector: + matchLabels: + app: postgres + - podSelector: + matchLabels: + app: redis + # Anthropic + threat-intel APIs — restrict to known egress proxy in prod. + - to: [] + ports: + - protocol: TCP + port: 443 + - to: + - namespaceSelector: {} + ports: + - protocol: UDP + port: 53 diff --git a/deploy/k8s/base/postgres.yaml b/deploy/k8s/base/postgres.yaml new file mode 100644 index 0000000..13cd890 --- /dev/null +++ b/deploy/k8s/base/postgres.yaml @@ -0,0 +1,76 @@ +# Single-replica StatefulSet for dev/staging. Production should point +# `SCOPILOT_DB__URL` at a managed instance (RDS / Cloud SQL / etc.) and +# delete this StatefulSet. + +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: postgres +spec: + serviceName: postgres + replicas: 1 + selector: + matchLabels: + app: postgres + template: + metadata: + labels: + app: postgres + spec: + securityContext: + fsGroup: 999 + containers: + - name: postgres + image: postgres:16-alpine + env: + - name: POSTGRES_DB + value: scopilot + - name: POSTGRES_USER + value: scopilot + - name: POSTGRES_PASSWORD + valueFrom: + secretKeyRef: + name: scopilot-secrets + key: POSTGRES_PASSWORD + - name: PGDATA + value: /var/lib/postgresql/data/pgdata + ports: + - containerPort: 5432 + readinessProbe: + exec: + command: ["pg_isready", "-U", "scopilot", "-d", "scopilot"] + periodSeconds: 5 + livenessProbe: + exec: + command: ["pg_isready", "-U", "scopilot", "-d", "scopilot"] + periodSeconds: 20 + volumeMounts: + - name: data + mountPath: /var/lib/postgresql/data + resources: + requests: + cpu: "100m" + memory: "256Mi" + limits: + cpu: "1000m" + memory: "1Gi" + volumeClaimTemplates: + - metadata: + name: data + spec: + accessModes: ["ReadWriteOnce"] + resources: + requests: + storage: 10Gi +--- +apiVersion: v1 +kind: Service +metadata: + name: postgres +spec: + clusterIP: None + selector: + app: postgres + ports: + - port: 5432 + targetPort: 5432 diff --git a/deploy/k8s/base/redis.yaml b/deploy/k8s/base/redis.yaml new file mode 100644 index 0000000..0008e87 --- /dev/null +++ b/deploy/k8s/base/redis.yaml @@ -0,0 +1,59 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: redis +spec: + serviceName: redis + replicas: 1 + selector: + matchLabels: + app: redis + template: + metadata: + labels: + app: redis + spec: + containers: + - name: redis + image: redis:7-alpine + args: ["redis-server", "--appendonly", "yes"] + ports: + - containerPort: 6379 + readinessProbe: + tcpSocket: + port: 6379 + periodSeconds: 5 + livenessProbe: + tcpSocket: + port: 6379 + periodSeconds: 20 + volumeMounts: + - name: data + mountPath: /data + resources: + requests: + cpu: "50m" + memory: "128Mi" + limits: + cpu: "500m" + memory: "512Mi" + volumeClaimTemplates: + - metadata: + name: data + spec: + accessModes: ["ReadWriteOnce"] + resources: + requests: + storage: 5Gi +--- +apiVersion: v1 +kind: Service +metadata: + name: redis +spec: + clusterIP: None + selector: + app: redis + ports: + - port: 6379 + targetPort: 6379 diff --git a/deploy/k8s/base/scheduler.yaml b/deploy/k8s/base/scheduler.yaml new file mode 100644 index 0000000..91c1fcc --- /dev/null +++ b/deploy/k8s/base/scheduler.yaml @@ -0,0 +1,50 @@ +# Two replicas — one is the elected leader, the other idles, ready to +# take over when the lease expires. Redis SETNX-EX in services.worker.leader +# enforces the single-active invariant. + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: scheduler +spec: + replicas: 2 + selector: + matchLabels: + app: scheduler + template: + metadata: + labels: + app: scheduler + spec: + securityContext: + runAsNonRoot: true + runAsUser: 10001 + containers: + - name: scheduler + image: ghcr.io/rlienard/segmentation-copilot:latest + command: ["python", "-m", "services.worker.main", "--role", "scheduler"] + envFrom: + - configMapRef: + name: scopilot-config + - secretRef: + name: scopilot-secrets + resources: + requests: { cpu: "50m", memory: "128Mi" } + limits: { cpu: "500m", memory: "512Mi" } + securityContext: + allowPrivilegeEscalation: false + readOnlyRootFilesystem: true + capabilities: + drop: ["ALL"] +--- +# A 50% PDB keeps at least one scheduler standing during a rolling +# upgrade so the leader handover is fast. +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: scheduler +spec: + minAvailable: 1 + selector: + matchLabels: + app: scheduler diff --git a/deploy/k8s/base/secret-example.yaml b/deploy/k8s/base/secret-example.yaml new file mode 100644 index 0000000..ab65fdc --- /dev/null +++ b/deploy/k8s/base/secret-example.yaml @@ -0,0 +1,24 @@ +# EXAMPLE secret — DO NOT use in production. Replace with an +# ExternalSecret managed by External Secrets Operator backed by +# Vault / AWS Secrets Manager / GCP Secret Manager. +# +# `kubectl apply` of this file is opt-in: it's intentionally not +# referenced by kustomization.yaml's `resources` list once you switch +# to ExternalSecret. For dev/staging, fill in the values below and +# `kubectl apply -f`. + +apiVersion: v1 +kind: Secret +metadata: + name: scopilot-secrets +type: Opaque +stringData: + SCOPILOT_ANTHROPIC__API_KEY: "REPLACE_ME" + SCOPILOT_API__API_KEYS: '["REPLACE_ME_WITH_A_TOKEN"]' + SCOPILOT_WEBEX__BOT_ACCESS_TOKEN: "" + SCOPILOT_WEBEX__WEBHOOK_SECRET: "" + SCOPILOT_WEBEX__OPERATORS_ROOM_ID: "" + SCOPILOT_THREAT__ABUSEIPDB_API_KEY: "" + SCOPILOT_THREAT__OTX_API_KEY: "" + SCOPILOT_THREAT__VIRUSTOTAL_API_KEY: "" + POSTGRES_PASSWORD: "scopilot" diff --git a/deploy/k8s/base/streamlit-ui.yaml b/deploy/k8s/base/streamlit-ui.yaml new file mode 100644 index 0000000..6d7b29f --- /dev/null +++ b/deploy/k8s/base/streamlit-ui.yaml @@ -0,0 +1,47 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: ui +spec: + replicas: 1 + selector: + matchLabels: + app: ui + template: + metadata: + labels: + app: ui + spec: + securityContext: + runAsNonRoot: true + runAsUser: 10001 + containers: + - name: ui + image: ghcr.io/rlienard/segmentation-copilot:latest + command: ["streamlit", "run", "/app/app.py", "--server.address", "0.0.0.0", + "--server.port", "8501", "--server.headless", "true"] + env: + - name: SCOPILOT_API_BASE + value: "http://api" + ports: + - containerPort: 8501 + name: http + resources: + requests: { cpu: "50m", memory: "256Mi" } + limits: { cpu: "500m", memory: "512Mi" } + securityContext: + allowPrivilegeEscalation: false + capabilities: + drop: ["ALL"] +--- +apiVersion: v1 +kind: Service +metadata: + name: ui +spec: + selector: + app: ui + ports: + - port: 80 + targetPort: 8501 + name: http diff --git a/deploy/k8s/base/webex-bot.yaml b/deploy/k8s/base/webex-bot.yaml new file mode 100644 index 0000000..e5d6680 --- /dev/null +++ b/deploy/k8s/base/webex-bot.yaml @@ -0,0 +1,49 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: webex-bot +spec: + replicas: 1 + selector: + matchLabels: + app: webex-bot + template: + metadata: + labels: + app: webex-bot + spec: + securityContext: + runAsNonRoot: true + runAsUser: 10001 + containers: + - name: webex + image: ghcr.io/rlienard/segmentation-copilot:latest + command: ["uvicorn", "services.webex_bot.main:app", "--host", "0.0.0.0", "--port", "8001"] + ports: + - containerPort: 8001 + name: http + envFrom: + - configMapRef: + name: scopilot-config + - secretRef: + name: scopilot-secrets + resources: + requests: { cpu: "50m", memory: "128Mi" } + limits: { cpu: "500m", memory: "512Mi" } + securityContext: + allowPrivilegeEscalation: false + readOnlyRootFilesystem: true + capabilities: + drop: ["ALL"] +--- +apiVersion: v1 +kind: Service +metadata: + name: webex-bot +spec: + selector: + app: webex-bot + ports: + - port: 80 + targetPort: 8001 + name: http diff --git a/deploy/k8s/base/worker.yaml b/deploy/k8s/base/worker.yaml new file mode 100644 index 0000000..ebc5334 --- /dev/null +++ b/deploy/k8s/base/worker.yaml @@ -0,0 +1,57 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: worker +spec: + replicas: 2 + selector: + matchLabels: + app: worker + template: + metadata: + labels: + app: worker + spec: + securityContext: + runAsNonRoot: true + runAsUser: 10001 + seccompProfile: + type: RuntimeDefault + containers: + - name: worker + image: ghcr.io/rlienard/segmentation-copilot:latest + command: ["python", "-m", "services.worker.main", "--role", "worker"] + envFrom: + - configMapRef: + name: scopilot-config + - secretRef: + name: scopilot-secrets + resources: + requests: { cpu: "100m", memory: "256Mi" } + limits: { cpu: "2000m", memory: "1Gi" } + securityContext: + allowPrivilegeEscalation: false + readOnlyRootFilesystem: true + capabilities: + drop: ["ALL"] +--- +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: worker +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: worker + minReplicas: 1 + maxReplicas: 10 + # CPU-based for now. Phase-6 follow-up: KEDA scaler on Redis Streams + # pending-entries depth. + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 75 diff --git a/pyproject.toml b/pyproject.toml index f89bebc..5b7b425 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,7 @@ dependencies = [ "sqlalchemy>=2.0.30", "alembic>=1.13.0", "aiosqlite>=0.20.0", + "prometheus-client>=0.20.0", ] [project.optional-dependencies] @@ -101,7 +102,10 @@ target-version = "py311" [tool.ruff.lint] select = ["E", "F", "I", "B", "UP", "ASYNC"] -ignore = ["E501"] +# B008: FastAPI's Depends() pattern intentionally uses calls in defaults. +# UP042: ProposalStatus / FlowCategory inherit str+Enum on purpose for +# JSON-compatible enum values; switching to StrEnum is a follow-up. +ignore = ["E501", "B008", "UP042"] [tool.mypy] python_version = "3.11" diff --git a/services/api/main.py b/services/api/main.py index 962645f..f57cb93 100644 --- a/services/api/main.py +++ b/services/api/main.py @@ -11,8 +11,8 @@ from __future__ import annotations +from collections.abc import AsyncIterator from contextlib import asynccontextmanager -from typing import AsyncIterator from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware @@ -58,6 +58,11 @@ def create_app() -> FastAPI: app.include_router(matrix.router) app.include_router(sgt.router) app.include_router(proposals.router) + + # Prometheus /metrics — scraped by the K8s monitoring stack. + from segmentation_copilot.core.observability import make_metrics_endpoint # noqa: PLC0415 + + app.add_route("/metrics", make_metrics_endpoint(), methods=["GET"]) return app diff --git a/services/api/routers/runs.py b/services/api/routers/runs.py index 981b429..c3b9112 100644 --- a/services/api/routers/runs.py +++ b/services/api/routers/runs.py @@ -15,7 +15,6 @@ from segmentation_copilot.core.services import ( ClassificationService, IngestionService, - MatrixService, ) from ..auth import AuthContext diff --git a/services/api/schemas.py b/services/api/schemas.py index 15be1be..2ecb495 100644 --- a/services/api/schemas.py +++ b/services/api/schemas.py @@ -26,7 +26,6 @@ SGTEntryRecord, ) - # --------------------------------------------------------------------------- # Runs # --------------------------------------------------------------------------- diff --git a/services/cli/main.py b/services/cli/main.py index e0db895..599d23e 100644 --- a/services/cli/main.py +++ b/services/cli/main.py @@ -11,7 +11,6 @@ import json import os -import sys from pathlib import Path from typing import Any diff --git a/services/mcp_server/__init__.py b/services/mcp_server/__init__.py new file mode 100644 index 0000000..2cfdf5e --- /dev/null +++ b/services/mcp_server/__init__.py @@ -0,0 +1,13 @@ +"""MCP server — same `core` import as every other service. + +Two transports are shipped as separate binaries on top of one shared +tool registry (`tools.py`): + + * `stdio.py` — `python -m services.mcp_server.stdio` + Suits Claude Code / Claude Desktop / any local client. + * `http.py` — streamable HTTP for LibreChat / remote MCP clients. + +The MCP protocol is stateless — every tool takes an explicit `run_id` / +`tenant_id`, never relies on conversation-scoped state. Authorization +lives at the repo layer (already tenant-scoped from Phase 1). +""" diff --git a/services/mcp_server/http.py b/services/mcp_server/http.py new file mode 100644 index 0000000..df90998 --- /dev/null +++ b/services/mcp_server/http.py @@ -0,0 +1,61 @@ +"""Streamable-HTTP MCP transport — for LibreChat and other remote MCP clients. + +Run: + uvicorn services.mcp_server.http:app --host 0.0.0.0 --port 8002 + +Or directly: + python -m services.mcp_server.http [--host 0.0.0.0] [--port 8002] + [--allow-dictionary-edit] + +Auth: deliberately handled by the ingress (Phase 2 OIDC sidecar or an +upstream gateway). The MCP server itself trusts whatever reaches it — +pair it with a NetworkPolicy or a reverse proxy that enforces auth. +""" + +from __future__ import annotations + +import argparse +import logging +import os +import sys + +from .server import build_server + + +def _build_app(allow_edit: bool): + server = build_server(allow_dictionary_edit=allow_edit) + # FastMCP exposes a streamable-HTTP Starlette app. + return server.streamable_http_app() + + +# Default app for `uvicorn services.mcp_server.http:app`; the dictionary +# edit flag also accepts an env override so the K8s manifest can flip it +# without changing the CMD. +app = _build_app(allow_edit=os.environ.get("SCOPILOT_MCP_ALLOW_EDIT", "") == "true") + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser( + description="Segmentation Copilot MCP server (HTTP transport)" + ) + parser.add_argument("--host", default="0.0.0.0") + parser.add_argument("--port", type=int, default=8002) + parser.add_argument("--allow-dictionary-edit", action="store_true") + parser.add_argument("--log-level", default="INFO") + args = parser.parse_args(argv) + + logging.basicConfig( + level=getattr(logging, args.log_level.upper(), logging.INFO), + format="%(asctime)s %(levelname)s %(name)s %(message)s", + ) + import uvicorn # noqa: PLC0415 + + uvicorn.run( + _build_app(allow_edit=args.allow_dictionary_edit), + host=args.host, port=args.port, + ) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/services/mcp_server/server.py b/services/mcp_server/server.py new file mode 100644 index 0000000..3e5f9aa --- /dev/null +++ b/services/mcp_server/server.py @@ -0,0 +1,342 @@ +"""FastMCP server + tool registry shared by stdio and HTTP transports. + +Each tool is a thin async wrapper around an existing `core.services.*` +operation. Tools take an explicit `tenant_id` so the same server instance +can serve multiple tenants if the front-door auth provides one — for +single-tenant deployments, leave it at the default. + +`register_sgt_name` and `set_sgt_dictionary_bulk` are gated by the +`allow_dictionary_edit` setting; the MCP client is not the right place +for an unprivileged user to silently rename SGTs. +""" + +from __future__ import annotations + +import logging +from typing import Any + +from mcp.server.fastmcp import FastMCP + +from segmentation_copilot.config import get_settings +from segmentation_copilot.core import db as core_db +from segmentation_copilot.core.models.domain import ( + ACE, + ProposalStatus, + ProposalTrigger, +) +from segmentation_copilot.core.repositories.classifications import ( + ClassificationRepository, +) +from segmentation_copilot.core.repositories.events import FlowEventRepository +from segmentation_copilot.core.repositories.proposals import ProposalRepository +from segmentation_copilot.core.repositories.runs import RunRepository +from segmentation_copilot.core.repositories.sgt import SGTRepository +from segmentation_copilot.core.services import ( + ClassificationService, + IngestionService, + MatrixService, +) +from segmentation_copilot.core.services.proposal import ( + ProposalApplyError, + ProposalConflictError, + ProposalService, +) +from segmentation_copilot.core.threat.aggregator import build_default_aggregator + +log = logging.getLogger(__name__) + + +SERVER_INSTRUCTIONS = """\ +Segmentation Copilot MCP server. + +Tools fall into four groups: + - Runs: start_run, ingest_lines, classify_run, build_matrix, list_runs, + get_run, list_missing_sgts. + - SGT dictionary: list_sgt_entries, set_sgt_name (gated). + - Proposals: list_proposals, get_proposal, approve_proposal, + reject_proposal. + - Threat intel: lookup_threat_intel. + +Always pass an explicit run_id for any per-run operation. Tenant defaults +to the server's configured tenant; multi-tenant deployments may pass +tenant_id explicitly. +""" + + +def build_server(*, allow_dictionary_edit: bool | None = None) -> FastMCP: + settings = get_settings() + allow_edit = ( + allow_dictionary_edit + if allow_dictionary_edit is not None + else False # default: read-only dictionary; flip via CLI flag. + ) + default_tenant = settings.default_tenant_id + + mcp = FastMCP(name="segmentation-copilot", instructions=SERVER_INSTRUCTIONS) + + # ------------------------------------------------------------------ + # Runs + # ------------------------------------------------------------------ + + @mcp.tool() + async def start_run( + source_type: str = "inline", + trigger: str = "manual", + tenant_id: str | None = None, + ) -> dict[str, Any]: + """Create a new analysis run. Returns `{run_id, started_at}`.""" + tenant = tenant_id or default_tenant + async with core_db.session_scope() as session: + run = await RunRepository(session).create( + tenant_id=tenant, source_type=source_type, trigger=trigger, + ) + return {"run_id": run.id, "started_at": run.started_at.isoformat()} + + @mcp.tool() + async def ingest_lines( + run_id: int, + lines: list[str], + tenant_id: str | None = None, + ) -> dict[str, Any]: + """Ingest raw syslog lines into an existing run.""" + tenant = tenant_id or default_tenant + async with core_db.session_scope() as session: + result = await IngestionService(session).ingest_lines( + tenant_id=tenant, lines=lines, run_id=run_id, + ) + return result.summary() + + @mcp.tool() + async def list_missing_sgts( + run_id: int, tenant_id: str | None = None + ) -> list[int]: + """SGT IDs observed in this run that aren't in the dictionary.""" + tenant = tenant_id or default_tenant + async with core_db.session_scope() as session: + ids = await FlowEventRepository(session).distinct_sgt_dgt_for_run(run_id) + return await SGTRepository(session).missing_ids( + tenant_id=tenant, ids=ids + ) + + @mcp.tool() + async def classify_run( + run_id: int, tenant_id: str | None = None + ) -> dict[str, int]: + """Classify all aggregated flows for the run. Returns category counts.""" + tenant = tenant_id or default_tenant + async with core_db.session_scope() as session: + ingestion = IngestionService(session) + flows = await ingestion.aggregated_for_run(run_id) + if not flows: + return {} + counts = await ClassificationService(session).classify( + tenant_id=tenant, run_id=run_id, flows=flows, + ) + return counts + + @mcp.tool() + async def build_matrix( + run_id: int, tenant_id: str | None = None + ) -> dict[str, Any]: + """Build (or rebuild) the contract matrix for a run. Returns markdown + JSON.""" + tenant = tenant_id or default_tenant + async with core_db.session_scope() as session: + svc = MatrixService(session) + contracts = await svc.build(tenant_id=tenant, run_id=run_id) + md = await svc.render_markdown(run_id=run_id) + return { + "run_id": run_id, + "markdown": md, + "contracts": [c.model_dump(mode="json") for c in contracts], + } + + @mcp.tool() + async def list_runs( + limit: int = 20, tenant_id: str | None = None + ) -> list[dict[str, Any]]: + """List recent runs for the tenant.""" + tenant = tenant_id or default_tenant + async with core_db.session_scope() as session: + runs = await RunRepository(session).list_for_tenant(tenant, limit=limit) + return [r.model_dump(mode="json") for r in runs] + + @mcp.tool() + async def get_run( + run_id: int, tenant_id: str | None = None + ) -> dict[str, Any] | None: + """Fetch a single run by id.""" + tenant = tenant_id or default_tenant + async with core_db.session_scope() as session: + run = await RunRepository(session).get(run_id) + if run is None or run.tenant_id != tenant: + return None + return run.model_dump(mode="json") + + @mcp.tool() + async def list_classifications( + run_id: int, tenant_id: str | None = None + ) -> list[dict[str, Any]]: + """Per-flow classifications for a run.""" + tenant = tenant_id or default_tenant + async with core_db.session_scope() as session: + run = await RunRepository(session).get(run_id) + if run is None or run.tenant_id != tenant: + return [] + records = await ClassificationRepository(session).list_for_run(run_id) + return [c.model_dump(mode="json") for c in records] + + # ------------------------------------------------------------------ + # SGT dictionary + # ------------------------------------------------------------------ + + @mcp.tool() + async def list_sgt_entries( + tenant_id: str | None = None, + ) -> list[dict[str, Any]]: + """List the SGT id→name dictionary for the tenant.""" + tenant = tenant_id or default_tenant + async with core_db.session_scope() as session: + entries = await SGTRepository(session).list_for_tenant(tenant) + return [e.model_dump(mode="json") for e in entries] + + if allow_edit: + + @mcp.tool() + async def set_sgt_name( + sgt_id: int, name: str, tenant_id: str | None = None + ) -> dict[str, Any]: + """Upsert one SGT id→name. Only available with --allow-dictionary-edit.""" + tenant = tenant_id or default_tenant + async with core_db.session_scope() as session: + record = await SGTRepository(session).upsert( + tenant_id=tenant, sgt_id=sgt_id, name=name + ) + return record.model_dump(mode="json") + + # ------------------------------------------------------------------ + # Proposals + # ------------------------------------------------------------------ + + @mcp.tool() + async def list_proposals( + status: str | None = None, + limit: int = 50, + tenant_id: str | None = None, + ) -> list[dict[str, Any]]: + """List rule proposals. `status` filters by lifecycle state.""" + tenant = tenant_id or default_tenant + status_filter = ProposalStatus(status) if status else None + async with core_db.session_scope() as session: + proposals = await ProposalRepository(session).list_for_tenant( + tenant_id=tenant, status=status_filter, limit=limit, + ) + return [p.model_dump(mode="json") for p in proposals] + + @mcp.tool() + async def get_proposal( + proposal_id: str, tenant_id: str | None = None + ) -> dict[str, Any] | None: + """Fetch a proposal by id.""" + tenant = tenant_id or default_tenant + async with core_db.session_scope() as session: + proposal = await ProposalRepository(session).get(proposal_id) + if proposal is None or proposal.tenant_id != tenant: + return None + return proposal.model_dump(mode="json") + + @mcp.tool() + async def create_proposal( + src_sgt: int, + dst_sgt: int, + proposed_aces: list[dict[str, Any]], + rationale: str, + run_id: int | None = None, + tenant_id: str | None = None, + ) -> dict[str, Any]: + """Create (or storm-collapse into) a rule proposal.""" + tenant = tenant_id or default_tenant + aces = [ACE(**a) for a in proposed_aces] + async with core_db.session_scope() as session: + proposal, _ = await ProposalService(session).propose( + tenant_id=tenant, + trigger=ProposalTrigger.MANUAL, + src_sgt=src_sgt, + dst_sgt=dst_sgt, + proposed_aces=aces, + rationale=rationale, + run_id=run_id, + ) + return proposal.model_dump(mode="json") + + @mcp.tool() + async def approve_proposal( + proposal_id: str, + actor: str, + tenant_id: str | None = None, + ) -> dict[str, Any]: + """Approve a proposal — creates a new matrix_version. Optimistic-locked.""" + return await _decide(proposal_id, ProposalStatus.APPROVED, actor, + tenant_id or default_tenant) + + @mcp.tool() + async def reject_proposal( + proposal_id: str, + actor: str, + tenant_id: str | None = None, + ) -> dict[str, Any]: + """Reject a proposal.""" + return await _decide(proposal_id, ProposalStatus.REJECTED, actor, + tenant_id or default_tenant) + + async def _decide(proposal_id: str, decision: ProposalStatus, actor: str, + tenant: str) -> dict[str, Any]: + async with core_db.session_scope() as session: + service = ProposalService(session) + existing = await service.proposals.get(proposal_id) + if existing is None or existing.tenant_id != tenant: + return {"error": "proposal not found"} + try: + decided = await service.decide( + proposal_id=proposal_id, decision=decision, + actor=actor, channel="mcp", + ) + except ProposalConflictError as exc: + return {"error": "conflict", "detail": str(exc)} + except ProposalApplyError as exc: + return {"error": "apply_failed", "detail": str(exc)} + return decided.model_dump(mode="json") + + # ------------------------------------------------------------------ + # Threat intel + # ------------------------------------------------------------------ + + @mcp.tool() + async def lookup_threat_intel( + ip: str, + tenant_id: str | None = None, + ) -> dict[str, Any]: + """Run the configured threat-intel providers against an IP.""" + tenant = tenant_id or default_tenant + aggregator = build_default_aggregator() + if aggregator is None: + return {"error": "no threat-intel providers configured"} + try: + decision = await aggregator.lookup_ip(tenant_id=tenant, ip=ip) + finally: + await aggregator.aclose() + return { + "ip": decision.target, + "is_malicious": decision.is_malicious, + "max_score": decision.max_score, + "triggering_providers": decision.triggering_providers, + "verdicts": [ + { + "provider": v.provider, + "score": v.score, + "categories": v.categories, + } + for v in decision.verdicts + ], + } + + return mcp diff --git a/services/mcp_server/stdio.py b/services/mcp_server/stdio.py new file mode 100644 index 0000000..b67bd07 --- /dev/null +++ b/services/mcp_server/stdio.py @@ -0,0 +1,41 @@ +"""Stdio MCP transport — for Claude Code, Claude Desktop, local CLI clients. + +Run: + python -m services.mcp_server.stdio [--allow-dictionary-edit] +""" + +from __future__ import annotations + +import argparse +import asyncio +import logging +import sys + +from .server import build_server + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser( + description="Segmentation Copilot MCP server (stdio transport)" + ) + parser.add_argument( + "--allow-dictionary-edit", action="store_true", + help="Expose set_sgt_name tool. Default is read-only on the dictionary.", + ) + parser.add_argument("--log-level", default="WARNING", + help="stdio MCP is sensitive to stdout noise; default WARNING") + args = parser.parse_args(argv) + + # stdio uses stdout for the MCP protocol — keep logs on stderr only. + logging.basicConfig( + level=getattr(logging, args.log_level.upper(), logging.WARNING), + stream=sys.stderr, + format="%(asctime)s %(levelname)s %(name)s %(message)s", + ) + server = build_server(allow_dictionary_edit=args.allow_dictionary_edit) + asyncio.run(server.run_stdio_async()) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/services/threat_daemon/main.py b/services/threat_daemon/main.py index 7076f17..67f386a 100644 --- a/services/threat_daemon/main.py +++ b/services/threat_daemon/main.py @@ -16,8 +16,8 @@ import asyncio import logging import sys +from collections.abc import AsyncIterator from contextlib import asynccontextmanager -from typing import AsyncIterator from segmentation_copilot.config import get_settings from segmentation_copilot.core import db as core_db diff --git a/services/threat_daemon/runner.py b/services/threat_daemon/runner.py index 396e0b6..f3d5e0c 100644 --- a/services/threat_daemon/runner.py +++ b/services/threat_daemon/runner.py @@ -8,8 +8,8 @@ from __future__ import annotations import logging +from collections.abc import Awaitable, Callable from datetime import datetime -from typing import Awaitable, Callable from segmentation_copilot import parser from segmentation_copilot.core.events import ( @@ -21,7 +21,6 @@ from segmentation_copilot.sources.streaming import StreamingLogSource from segmentation_copilot.sources.streaming_ssh import HEARTBEAT_PREFIX - log = logging.getLogger(__name__) diff --git a/services/worker/cursor.py b/services/worker/cursor.py index 6fd7772..d3f7a2f 100644 --- a/services/worker/cursor.py +++ b/services/worker/cursor.py @@ -14,7 +14,6 @@ from datetime import datetime from typing import Protocol - CURSOR_TTL_SECONDS = 28 * 24 * 3600 _KEY = "scopilot:scan:cursor:{tenant_id}" diff --git a/services/worker/leader.py b/services/worker/leader.py index 70b8312..f4c885c 100644 --- a/services/worker/leader.py +++ b/services/worker/leader.py @@ -14,8 +14,9 @@ import asyncio import uuid +from collections.abc import AsyncIterator from contextlib import asynccontextmanager -from typing import AsyncIterator, Protocol +from typing import Protocol class LeaderElector(Protocol): @@ -99,7 +100,7 @@ async def _run(self) -> None: self._is_leader = False try: await asyncio.wait_for(self._stop.wait(), timeout=self._refresh) - except asyncio.TimeoutError: + except TimeoutError: pass diff --git a/services/worker/scan.py b/services/worker/scan.py index 01385b5..3654edd 100644 --- a/services/worker/scan.py +++ b/services/worker/scan.py @@ -18,7 +18,6 @@ from __future__ import annotations import hashlib -from collections.abc import Iterable from dataclasses import dataclass from datetime import datetime @@ -29,6 +28,7 @@ EventBus, FlowUnknownPayload, ) +from segmentation_copilot.core.observability import flow_unknown_published_counter from segmentation_copilot.core.repositories.events import FlowEventRepository from segmentation_copilot.core.services.baseline import BaselineService @@ -125,6 +125,9 @@ async def scan_tenant( ) if event_id is not None: enqueued += 1 + flow_unknown_published_counter.labels( + tenant_id=tenant_id, trigger="scheduled" + ).inc() # Advance cursor to the newest event we just observed (NOT `now` — # the cursor must track ingestion progress, not wall-clock, so a diff --git a/services/worker/scheduler.py b/services/worker/scheduler.py index e9b0f3a..4a279be 100644 --- a/services/worker/scheduler.py +++ b/services/worker/scheduler.py @@ -21,7 +21,6 @@ from .leader import LeaderElector from .scan import ScanResult, scan_tenant - log = logging.getLogger(__name__) @@ -67,5 +66,5 @@ async def run_scheduler( try: await asyncio.wait_for(stop_event.wait(), timeout=interval) - except asyncio.TimeoutError: + except TimeoutError: pass diff --git a/services/worker/worker.py b/services/worker/worker.py index 8d35115..a623d80 100644 --- a/services/worker/worker.py +++ b/services/worker/worker.py @@ -18,6 +18,7 @@ from anthropic import Anthropic +from segmentation_copilot import classify as classify_mod from segmentation_copilot.aggregator import AggregatedFlow, FlowKey from segmentation_copilot.config import Settings from segmentation_copilot.core import db as core_db @@ -31,10 +32,8 @@ from segmentation_copilot.core.models.domain import ACE, ProposalTrigger from segmentation_copilot.core.repositories.sgt import SGTRepository from segmentation_copilot.core.services.classification import ClassificationService -from segmentation_copilot.core.services.proposal import ProposalService from segmentation_copilot.core.services.notifier import get_notifier -from segmentation_copilot import classify as classify_mod - +from segmentation_copilot.core.services.proposal import ProposalService CONSUMER_GROUP_FLOW_UNKNOWN = "scopilot.workers.flow_unknown.v1" diff --git a/src/segmentation_copilot/agent.py b/src/segmentation_copilot/agent.py index 2b7205d..60cd9fb 100644 --- a/src/segmentation_copilot/agent.py +++ b/src/segmentation_copilot/agent.py @@ -16,7 +16,6 @@ from . import tools - SYSTEM_PROMPT = """\ You are a **Security Analyst** in charge of defining the TrustSec contracts between endpoints connected to a large Cisco SD-Access network. The customer's management diff --git a/src/segmentation_copilot/aggregator.py b/src/segmentation_copilot/aggregator.py index 2e28272..5214be8 100644 --- a/src/segmentation_copilot/aggregator.py +++ b/src/segmentation_copilot/aggregator.py @@ -2,13 +2,11 @@ from __future__ import annotations -from collections import defaultdict +from collections.abc import Iterable from dataclasses import dataclass, field -from typing import Iterable from .parser import FlowEvent - EPHEMERAL_PORT_THRESHOLD = 1024 ANY_PORT = "any" diff --git a/src/segmentation_copilot/classify.py b/src/segmentation_copilot/classify.py index d5abfab..98f38f5 100644 --- a/src/segmentation_copilot/classify.py +++ b/src/segmentation_copilot/classify.py @@ -4,14 +4,12 @@ import json from dataclasses import dataclass -from typing import Iterable from anthropic import Anthropic from .aggregator import AggregatedFlow from .sgt import SGTDictionary - CATEGORIES = ("business_relevant", "default", "business_irrelevant", "harmful") diff --git a/src/segmentation_copilot/contracts.py b/src/segmentation_copilot/contracts.py index 28c2c6f..1436235 100644 --- a/src/segmentation_copilot/contracts.py +++ b/src/segmentation_copilot/contracts.py @@ -2,13 +2,9 @@ from __future__ import annotations -from collections import defaultdict -from typing import Iterable - from .aggregator import AggregatedFlow from .sgt import SGTDictionary - PERMIT_CATEGORIES = {"business_relevant", "default"} DENY_CATEGORIES = {"business_irrelevant", "harmful"} diff --git a/src/segmentation_copilot/core/db.py b/src/segmentation_copilot/core/db.py index 5d053d9..e02a799 100644 --- a/src/segmentation_copilot/core/db.py +++ b/src/segmentation_copilot/core/db.py @@ -21,7 +21,6 @@ from ..config import DatabaseSettings, get_settings from .models.orm import Base - _engine: AsyncEngine | None = None _sessionmaker: async_sessionmaker[AsyncSession] | None = None diff --git a/src/segmentation_copilot/core/events/bus.py b/src/segmentation_copilot/core/events/bus.py index b05cead..c1f5a19 100644 --- a/src/segmentation_copilot/core/events/bus.py +++ b/src/segmentation_copilot/core/events/bus.py @@ -11,7 +11,7 @@ import asyncio import time import uuid -from collections import defaultdict, deque +from collections import defaultdict from dataclasses import dataclass, field from typing import Any, Protocol, runtime_checkable diff --git a/src/segmentation_copilot/core/events/redis_bus.py b/src/segmentation_copilot/core/events/redis_bus.py index ef32992..3f1c90e 100644 --- a/src/segmentation_copilot/core/events/redis_bus.py +++ b/src/segmentation_copilot/core/events/redis_bus.py @@ -15,7 +15,7 @@ import redis.asyncio as redis -from .bus import EventBus, EventEnvelope +from .bus import EventEnvelope from .streams import EVENT_TTL_SECONDS diff --git a/src/segmentation_copilot/core/events/streams.py b/src/segmentation_copilot/core/events/streams.py index 911db2d..bf6c6e1 100644 --- a/src/segmentation_copilot/core/events/streams.py +++ b/src/segmentation_copilot/core/events/streams.py @@ -12,7 +12,6 @@ from pydantic import BaseModel - STREAM_SCAN_SCHEDULED = "scopilot.events.scan.scheduled.v1" STREAM_FLOW_UNKNOWN = "scopilot.events.flow.unknown.v1" STREAM_PROPOSAL_CREATED = "scopilot.events.proposal.created.v1" diff --git a/src/segmentation_copilot/core/observability/__init__.py b/src/segmentation_copilot/core/observability/__init__.py new file mode 100644 index 0000000..37c47e5 --- /dev/null +++ b/src/segmentation_copilot/core/observability/__init__.py @@ -0,0 +1,30 @@ +"""Observability helpers — structured logs + Prometheus metrics. + +Kept deliberately lightweight: every service can `from +segmentation_copilot.core.observability import metrics, configure_logging` +and get sensible defaults without an OpenTelemetry collector running. + +The Prometheus registry is process-global; services that expose +`/metrics` mount `make_metrics_app()` (FastAPI) or +`make_metrics_endpoint()` (Starlette plain). +""" + +from .logging import configure_logging +from .metrics import ( + classifications_counter, + flow_unknown_consumed_counter, + flow_unknown_published_counter, + make_metrics_endpoint, + proposals_counter, + threat_lookups_counter, +) + +__all__ = [ + "classifications_counter", + "configure_logging", + "flow_unknown_consumed_counter", + "flow_unknown_published_counter", + "make_metrics_endpoint", + "proposals_counter", + "threat_lookups_counter", +] diff --git a/src/segmentation_copilot/core/observability/logging.py b/src/segmentation_copilot/core/observability/logging.py new file mode 100644 index 0000000..73ae316 --- /dev/null +++ b/src/segmentation_copilot/core/observability/logging.py @@ -0,0 +1,60 @@ +"""Structured-logging helper. + +Switching between JSON and plain text via `SCOPILOT_LOG_FORMAT`. JSON is +the production default — feeds straight into Loki / CloudWatch / etc. +without an exporter. +""" + +from __future__ import annotations + +import json +import logging +import os +import sys +import time +from typing import Any + + +class _JsonFormatter(logging.Formatter): + def format(self, record: logging.LogRecord) -> str: + payload: dict[str, Any] = { + "ts": time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(record.created)), + "level": record.levelname, + "logger": record.name, + "message": record.getMessage(), + } + if record.exc_info: + payload["exc_info"] = self.formatException(record.exc_info) + for k, v in record.__dict__.items(): + if k in ("args", "msg", "name", "pathname", "filename", "module", + "exc_info", "exc_text", "stack_info", "lineno", "funcName", + "created", "msecs", "relativeCreated", "thread", "threadName", + "processName", "process", "levelname", "levelno"): + continue + payload[k] = v + return json.dumps(payload, default=str) + + +def configure_logging(*, level: str | None = None, fmt: str | None = None, + stream=None) -> None: + """Configure root logging once per process. + + `level` and `fmt` default to `SCOPILOT_LOG_LEVEL` / `SCOPILOT_LOG_FORMAT`. + `stream` defaults to stderr (stdio MCP transport must keep stdout clean). + """ + level = (level or os.environ.get("SCOPILOT_LOG_LEVEL") or "INFO").upper() + fmt = (fmt or os.environ.get("SCOPILOT_LOG_FORMAT") or "text").lower() + stream = stream or sys.stderr + + root = logging.getLogger() + for h in list(root.handlers): + root.removeHandler(h) + handler = logging.StreamHandler(stream) + if fmt == "json": + handler.setFormatter(_JsonFormatter()) + else: + handler.setFormatter(logging.Formatter( + "%(asctime)s %(levelname)s %(name)s %(message)s" + )) + root.addHandler(handler) + root.setLevel(getattr(logging, level, logging.INFO)) diff --git a/src/segmentation_copilot/core/observability/metrics.py b/src/segmentation_copilot/core/observability/metrics.py new file mode 100644 index 0000000..b011918 --- /dev/null +++ b/src/segmentation_copilot/core/observability/metrics.py @@ -0,0 +1,84 @@ +"""Prometheus counters + histograms used across services. + +We export a process-global registry so every counter is summed without +the caller knowing the registry exists. Mount `make_metrics_endpoint()` +on FastAPI / Starlette to expose `/metrics`. + +Counters are deliberately coarse — labels are kept low-cardinality +(category names, status names — never IPs or proposal ids) so Prometheus +storage stays bounded. +""" + +from __future__ import annotations + +from prometheus_client import ( + CONTENT_TYPE_LATEST, + Counter, + Histogram, + generate_latest, +) + +# --------------------------------------------------------------------------- +# Counters +# --------------------------------------------------------------------------- + +flow_unknown_published_counter = Counter( + "scopilot_flow_unknown_published_total", + "Number of events.flow.unknown published.", + labelnames=("tenant_id", "trigger"), +) + +flow_unknown_consumed_counter = Counter( + "scopilot_flow_unknown_consumed_total", + "Number of events.flow.unknown consumed and classified.", + labelnames=("tenant_id", "outcome"), +) + +classifications_counter = Counter( + "scopilot_classifications_total", + "Per-category classification counts.", + labelnames=("tenant_id", "category"), +) + +proposals_counter = Counter( + "scopilot_proposals_total", + "Proposal lifecycle transitions.", + labelnames=("tenant_id", "status"), +) + +threat_lookups_counter = Counter( + "scopilot_threat_lookups_total", + "Threat-intel lookups by provider and outcome.", + labelnames=("provider", "outcome"), +) + +# --------------------------------------------------------------------------- +# Histograms (Phase 6 baseline; downstream code will add observe() calls). +# --------------------------------------------------------------------------- + +http_request_duration = Histogram( + "scopilot_http_request_duration_seconds", + "FastAPI request duration.", + labelnames=("route", "method", "status_class"), +) + + +# --------------------------------------------------------------------------- +# Endpoint +# --------------------------------------------------------------------------- + + +def make_metrics_endpoint(): + """Return an ASGI-compatible handler that renders the global registry. + + Usage: + from starlette.routing import Route + app.add_route("/metrics", make_metrics_endpoint(), methods=["GET"]) + """ + from starlette.responses import Response + + async def metrics(request): # noqa: ARG001 + return Response(content=generate_latest(), + media_type=CONTENT_TYPE_LATEST) + + return metrics diff --git a/src/segmentation_copilot/core/repositories/events.py b/src/segmentation_copilot/core/repositories/events.py index 078c300..f8e9ae0 100644 --- a/src/segmentation_copilot/core/repositories/events.py +++ b/src/segmentation_copilot/core/repositories/events.py @@ -12,6 +12,7 @@ from ..models.domain import FlowEventRecord from ..models.orm import FlowEvent + class FlowEventRepository: def __init__(self, session: AsyncSession) -> None: self.session = session diff --git a/src/segmentation_copilot/core/threat/abuseipdb.py b/src/segmentation_copilot/core/threat/abuseipdb.py index 60a1c42..066cc3f 100644 --- a/src/segmentation_copilot/core/threat/abuseipdb.py +++ b/src/segmentation_copilot/core/threat/abuseipdb.py @@ -18,7 +18,6 @@ from .base import ThreatVerdict - _BASE_URL = "https://api.abuseipdb.com/api/v2" # Category code → label mapping (top-level signals only; full list at @@ -29,7 +28,7 @@ 9: "open_proxy", 10: "web_spam", 11: "email_spam", - 14: "port_scan", + 14: "scanner", 15: "hacking", 18: "brute_force", 19: "bad_web_bot", @@ -37,7 +36,6 @@ 21: "web_app_attack", 22: "ssh", 23: "iot_targeted", - 14: "scanner", } diff --git a/src/segmentation_copilot/core/threat/aggregator.py b/src/segmentation_copilot/core/threat/aggregator.py index 073c40f..26fd0a0 100644 --- a/src/segmentation_copilot/core/threat/aggregator.py +++ b/src/segmentation_copilot/core/threat/aggregator.py @@ -21,13 +21,10 @@ from collections.abc import Iterable from dataclasses import dataclass -from sqlalchemy.ext.asyncio import AsyncSession - from ...config import Settings, get_settings from .base import ThreatIntelClient, ThreatVerdict from .cache import ThreatLookupRepository, ThreatVerdictCache, build_cache, is_miss - log = logging.getLogger(__name__) @@ -135,7 +132,7 @@ async def _fetch_one( verdict = await asyncio.wait_for( client.lookup_ip(ip), timeout=self._per_call_timeout ) - except asyncio.TimeoutError: + except TimeoutError: log.warning("threat lookup timed out provider=%s ip=%s", client.name, ip) return None except Exception: diff --git a/src/segmentation_copilot/core/threat/cache.py b/src/segmentation_copilot/core/threat/cache.py index 1887ae7..3cf572a 100644 --- a/src/segmentation_copilot/core/threat/cache.py +++ b/src/segmentation_copilot/core/threat/cache.py @@ -15,7 +15,6 @@ from __future__ import annotations import json -from dataclasses import asdict from datetime import datetime, timedelta from typing import Protocol @@ -25,7 +24,6 @@ from ..models.orm import ThreatLookup from .base import ThreatVerdict - _CLEAN_VERDICT_SENTINEL = "<>" diff --git a/src/segmentation_copilot/core/threat/otx.py b/src/segmentation_copilot/core/threat/otx.py index 118614f..9fccc9d 100644 --- a/src/segmentation_copilot/core/threat/otx.py +++ b/src/segmentation_copilot/core/threat/otx.py @@ -16,7 +16,6 @@ from .base import ThreatVerdict - _BASE_URL = "https://otx.alienvault.com/api/v1" diff --git a/src/segmentation_copilot/core/threat/talos.py b/src/segmentation_copilot/core/threat/talos.py index bf0797f..e91a70a 100644 --- a/src/segmentation_copilot/core/threat/talos.py +++ b/src/segmentation_copilot/core/threat/talos.py @@ -21,7 +21,6 @@ from .base import ThreatVerdict - _URL = "https://talosintelligence.com/sb_api/query_lookup" diff --git a/src/segmentation_copilot/core/threat/virustotal.py b/src/segmentation_copilot/core/threat/virustotal.py index f3c8849..f8d3076 100644 --- a/src/segmentation_copilot/core/threat/virustotal.py +++ b/src/segmentation_copilot/core/threat/virustotal.py @@ -19,7 +19,6 @@ from .base import ThreatVerdict - _BASE_URL = "https://www.virustotal.com/api/v3" diff --git a/src/segmentation_copilot/db.py b/src/segmentation_copilot/db.py index 548c2a2..faa03df 100644 --- a/src/segmentation_copilot/db.py +++ b/src/segmentation_copilot/db.py @@ -2,17 +2,15 @@ from __future__ import annotations -import json import sqlite3 +from collections.abc import Iterable, Iterator from contextlib import contextmanager from datetime import datetime from pathlib import Path -from typing import Iterable, Iterator from .aggregator import AggregatedFlow from .parser import FlowEvent - SCHEMA = """ CREATE TABLE IF NOT EXISTS runs ( id INTEGER PRIMARY KEY AUTOINCREMENT, diff --git a/src/segmentation_copilot/parser.py b/src/segmentation_copilot/parser.py index f2374ed..68973af 100644 --- a/src/segmentation_copilot/parser.py +++ b/src/segmentation_copilot/parser.py @@ -3,9 +3,9 @@ from __future__ import annotations import re +from collections.abc import Iterable, Iterator from dataclasses import dataclass from datetime import datetime -from typing import Iterable, Iterator from dateutil import parser as date_parser diff --git a/src/segmentation_copilot/sources/base.py b/src/segmentation_copilot/sources/base.py index d3be86e..5de9267 100644 --- a/src/segmentation_copilot/sources/base.py +++ b/src/segmentation_copilot/sources/base.py @@ -3,9 +3,10 @@ from __future__ import annotations from abc import ABC, abstractmethod +from collections.abc import Iterator from dataclasses import dataclass from datetime import datetime -from typing import Any, Iterator +from typing import Any @dataclass @@ -23,5 +24,5 @@ def fetch(self, start: datetime, end: datetime) -> Iterator[str]: @classmethod @abstractmethod - def from_config(cls, config: LogSourceConfig) -> "LogSource": + def from_config(cls, config: LogSourceConfig) -> LogSource: ... diff --git a/src/segmentation_copilot/sources/local.py b/src/segmentation_copilot/sources/local.py index f0b8687..438040d 100644 --- a/src/segmentation_copilot/sources/local.py +++ b/src/segmentation_copilot/sources/local.py @@ -2,9 +2,9 @@ from __future__ import annotations +from collections.abc import Iterator from datetime import datetime from pathlib import Path -from typing import Iterator from ..parser import _parse_ts # type: ignore[attr-defined] from .base import LogSource, LogSourceConfig @@ -15,7 +15,7 @@ def __init__(self, paths: list[Path]): self.paths = paths @classmethod - def from_config(cls, config: LogSourceConfig) -> "LocalFileSource": + def from_config(cls, config: LogSourceConfig) -> LocalFileSource: raw = config.options.get("path") if not raw: raise ValueError("LocalFileSource requires 'path' in options") diff --git a/src/segmentation_copilot/sources/ssh.py b/src/segmentation_copilot/sources/ssh.py index c230756..f55374a 100644 --- a/src/segmentation_copilot/sources/ssh.py +++ b/src/segmentation_copilot/sources/ssh.py @@ -3,8 +3,8 @@ from __future__ import annotations import shlex +from collections.abc import Iterator from datetime import datetime -from typing import Iterator from .base import LogSource, LogSourceConfig @@ -42,7 +42,7 @@ def __init__( self.grep_pattern = grep_pattern @classmethod - def from_config(cls, config: LogSourceConfig) -> "SSHSource": + def from_config(cls, config: LogSourceConfig) -> SSHSource: opts = config.options missing = [k for k in ("host", "username", "log_path") if not opts.get(k)] if missing: diff --git a/src/segmentation_copilot/sources/streaming_ssh.py b/src/segmentation_copilot/sources/streaming_ssh.py index 0040ad0..7931266 100644 --- a/src/segmentation_copilot/sources/streaming_ssh.py +++ b/src/segmentation_copilot/sources/streaming_ssh.py @@ -19,7 +19,6 @@ import random from collections.abc import AsyncIterator - log = logging.getLogger(__name__) @@ -95,7 +94,7 @@ async def tail(self) -> AsyncIterator[str]: try: await asyncio.wait_for(self._stop.wait(), timeout=backoff) return - except asyncio.TimeoutError: + except TimeoutError: pass jitter = random.uniform(0, backoff * 0.25) backoff = min(self._backoff_max, backoff * 2 + jitter) @@ -107,7 +106,7 @@ async def _read_with_heartbeat(self, proc) -> AsyncIterator[str]: line = await asyncio.wait_for( proc.stdout.readline(), timeout=self._heartbeat_seconds ) - except asyncio.TimeoutError: + except TimeoutError: yield f"{HEARTBEAT_PREFIX} {self._host}" continue if not line: diff --git a/tests/conftest.py b/tests/conftest.py index bad859c..3d92e76 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,7 +8,6 @@ from __future__ import annotations -import os from collections.abc import AsyncIterator import pytest diff --git a/tests/test_api.py b/tests/test_api.py index 811eb88..9ddd32c 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -14,7 +14,6 @@ import pytest_asyncio from httpx import ASGITransport, AsyncClient - FIXTURE = Path(__file__).parent / "fixtures" / "sample.log" diff --git a/tests/test_cli.py b/tests/test_cli.py index d965798..7bcf4e0 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -9,7 +9,6 @@ from __future__ import annotations -import json from pathlib import Path from unittest.mock import MagicMock, patch @@ -18,7 +17,6 @@ from services.cli.main import app - FIXTURE = Path(__file__).parent / "fixtures" / "sample.log" diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py new file mode 100644 index 0000000..1bb00fd --- /dev/null +++ b/tests/test_mcp_server.py @@ -0,0 +1,152 @@ +"""MCP server tests. + +The FastMCP `tool()` decorator registers each function in the server's +tool manager; we drive it through `call_tool(name, arguments)` which is +exactly what the stdio / HTTP transports would invoke. No real MCP +client needed. + +The tests cover the happy path of each tool group plus the +dictionary-edit gate (`set_sgt_name` only exists when +`allow_dictionary_edit=True`). +""" + +from __future__ import annotations + +import json +from pathlib import Path +from unittest.mock import patch + +import pytest + +from segmentation_copilot.core import db as core_db +from segmentation_copilot.core.repositories.sgt import SGTRepository + +FIXTURE = Path(__file__).parent / "fixtures" / "sample.log" + + +async def _call(server, name: str, arguments: dict | None = None): + """Invoke a tool by name via the FastMCP server.""" + return await server.call_tool(name, arguments or {}) + + +def _result_payload(result): + """Return the structured payload from FastMCP's CallToolResult tuple. + + FastMCP returns `(content_blocks, structured_content)` where + `structured_content` is `{"result": }`. Prefer that — + the content blocks only carry the *first* element of a list return, + which trips up obvious-looking `.text` parsing. + """ + if isinstance(result, tuple) and len(result) >= 2: + structured = result[1] + if isinstance(structured, dict) and "result" in structured: + return structured["result"] + if isinstance(structured, dict): + return structured + # Fallback: render whatever the first text block carried. + if isinstance(result, tuple) and result and isinstance(result[0], list): + for block in result[0]: + text = getattr(block, "text", None) + if text is not None: + try: + return json.loads(text) + except (json.JSONDecodeError, TypeError): + return text + return result + + +def _fake_classify(flows, sgt_dict, client=None, model=None): + return [(f, "business_relevant", "mcp-fake") for f in flows] + + +@pytest.mark.asyncio +async def test_mcp_run_lifecycle_through_tools(): + """start_run → ingest_lines → classify_run → build_matrix end-to-end.""" + await core_db.create_all() + # Seed the SGT dictionary directly — set_sgt_name is gated and disabled + # for this test. + async with core_db.session_scope() as s: + for sid, name in [(100, "Employees"), (200, "Web"), (300, "Guests"), + (400, "DNS"), (999, "External")]: + await SGTRepository(s).upsert(tenant_id="test-tenant", sgt_id=sid, name=name) + + from services.mcp_server.server import build_server + + server = build_server(allow_dictionary_edit=False) + + res = await _call(server, "start_run", {"source_type": "inline"}) + payload = _result_payload(res) + run_id = payload["run_id"] + assert isinstance(run_id, int) + + lines = FIXTURE.read_text().splitlines() + res = await _call(server, "ingest_lines", {"run_id": run_id, "lines": lines}) + summary = _result_payload(res) + assert summary["parsed_events"] == 6 + + with patch( + "segmentation_copilot.core.services.classification.classify.classify_batch", + side_effect=_fake_classify, + ): + res = await _call(server, "classify_run", {"run_id": run_id}) + counts = _result_payload(res) + assert counts.get("business_relevant", 0) >= 1 + + res = await _call(server, "build_matrix", {"run_id": run_id}) + matrix = _result_payload(res) + assert "Source SGT" in matrix["markdown"] + assert len(matrix["contracts"]) >= 1 + + +@pytest.mark.asyncio +async def test_mcp_set_sgt_name_only_present_when_enabled(): + from services.mcp_server.server import build_server + + locked = build_server(allow_dictionary_edit=False) + tools_locked = {t.name for t in await locked.list_tools()} + assert "set_sgt_name" not in tools_locked + assert "list_sgt_entries" in tools_locked + + unlocked = build_server(allow_dictionary_edit=True) + tools_unlocked = {t.name for t in await unlocked.list_tools()} + assert "set_sgt_name" in tools_unlocked + + +@pytest.mark.asyncio +async def test_mcp_proposal_flow(): + await core_db.create_all() + from services.mcp_server.server import build_server + + server = build_server() + aces = [{"protocol": "tcp", "src_port": "any", "dst_port": "443", + "action": "permit", "source_category": "business_relevant"}] + res = await _call(server, "create_proposal", { + "src_sgt": 100, "dst_sgt": 200, + "proposed_aces": aces, + "rationale": "HTTPS for the web tier", + }) + proposal = _result_payload(res) + pid = proposal["id"] + assert proposal["status"] == "pending" + + res = await _call(server, "approve_proposal", { + "proposal_id": pid, "actor": "alice@example.com" + }) + approved = _result_payload(res) + assert approved["status"] == "applied" + + # List proposals — the approved one should appear. + res = await _call(server, "list_proposals", {}) + items = _result_payload(res) + assert any(p["id"] == pid for p in items) + + +@pytest.mark.asyncio +async def test_mcp_lookup_threat_intel_returns_error_when_unconfigured(): + """No keys configured → graceful error, not a 500.""" + from services.mcp_server.server import build_server + + server = build_server() + res = await _call(server, "lookup_threat_intel", {"ip": "1.2.3.4"}) + payload = _result_payload(res) + assert "error" in payload diff --git a/tests/test_no_core_in_app.py b/tests/test_no_core_in_app.py index 695dee3..c9c7ada 100644 --- a/tests/test_no_core_in_app.py +++ b/tests/test_no_core_in_app.py @@ -10,9 +10,6 @@ from pathlib import Path -import pytest - - APP_PY = Path(__file__).parent.parent / "app.py" diff --git a/tests/test_parser.py b/tests/test_parser.py index 66e63fa..8630a42 100644 --- a/tests/test_parser.py +++ b/tests/test_parser.py @@ -2,7 +2,6 @@ from segmentation_copilot.parser import parse_line, parse_lines - EXAMPLE = ( "Jun 18 10:17:22.205: %RBM-6-SGACLHIT: ingress_interface='GigabitEthernet1/0/1' " "sgacl_name='testv4' action='Permit' protocol='udp' src-vrf='default' " diff --git a/tests/test_proposal_service.py b/tests/test_proposal_service.py index 1c58d30..769248b 100644 --- a/tests/test_proposal_service.py +++ b/tests/test_proposal_service.py @@ -29,7 +29,6 @@ idempotency_key, ) - TENANT = "test-tenant" diff --git a/tests/test_repositories.py b/tests/test_repositories.py index c7b9434..8cfe1fb 100644 --- a/tests/test_repositories.py +++ b/tests/test_repositories.py @@ -25,7 +25,6 @@ from segmentation_copilot.core.repositories.proposals import ProposalConflictError from segmentation_copilot.parser import FlowEvent as ParsedFlowEvent - TENANT = "test-tenant" diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 9a852dd..ae21d3e 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -19,12 +19,10 @@ from segmentation_copilot.core.events import InMemoryBus from segmentation_copilot.core.repositories.sgt import SGTRepository from segmentation_copilot.core.services.ingestion import IngestionService - from services.worker.cursor import MemoryCursorStore from services.worker.leader import MemoryLeader from services.worker.scheduler import run_scheduler - FIXTURE = Path(__file__).parent / "fixtures" / "sample.log" TENANT = "test-tenant" diff --git a/tests/test_services.py b/tests/test_services.py index bd2ab05..b74b227 100644 --- a/tests/test_services.py +++ b/tests/test_services.py @@ -14,7 +14,6 @@ from segmentation_copilot.aggregator import AggregatedFlow from segmentation_copilot.core.repositories import ( - ClassificationRepository, MatrixVersionRepository, SGTRepository, ) @@ -25,7 +24,6 @@ MatrixService, ) - TENANT = "test-tenant" FIXTURE = Path(__file__).parent / "fixtures" / "sample.log" diff --git a/tests/test_threat_daemon.py b/tests/test_threat_daemon.py index 30b1141..285b86f 100644 --- a/tests/test_threat_daemon.py +++ b/tests/test_threat_daemon.py @@ -15,7 +15,6 @@ from __future__ import annotations -import asyncio from datetime import datetime from typing import Any @@ -31,7 +30,6 @@ from segmentation_copilot.sources.streaming_ssh import HEARTBEAT_PREFIX from services.threat_daemon.runner import run_daemon - # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- diff --git a/tests/test_threat_intel.py b/tests/test_threat_intel.py index 2a80089..a3dc95c 100644 --- a/tests/test_threat_intel.py +++ b/tests/test_threat_intel.py @@ -24,7 +24,6 @@ from segmentation_copilot.core.threat.otx import OTXClient from segmentation_copilot.core.threat.virustotal import VirusTotalClient - # --------------------------------------------------------------------------- # Fake providers # --------------------------------------------------------------------------- diff --git a/tests/test_worker.py b/tests/test_worker.py index 386d8e5..261f3c4 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -12,7 +12,6 @@ from __future__ import annotations -import asyncio from pathlib import Path from unittest.mock import patch @@ -29,11 +28,9 @@ from segmentation_copilot.core.repositories.sgt import SGTRepository from segmentation_copilot.core.services.ingestion import IngestionService from segmentation_copilot.core.services.proposal import ProposalService - from services.worker.cursor import MemoryCursorStore from services.worker.scan import scan_tenant -from services.worker.worker import CONSUMER_GROUP_FLOW_UNKNOWN, run_worker - +from services.worker.worker import run_worker FIXTURE = Path(__file__).parent / "fixtures" / "sample.log" TENANT = "test-tenant"