Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ def __init__(

def process_interaction(self, unified_message):
"""Main entry point from the Orchestrator."""
print(f"Intake Agent: Processing incoming message ID {
unified_message.get('id')} for tenant {
self.tenant_id}")
print(
f"Intake Agent: Processing incoming message ID "
f"{unified_message.get('id')} for tenant {self.tenant_id}"
)

# 1. Extract and verify customer details
customer_profile = self._collect_customer_info(unified_message)
Expand Down Expand Up @@ -160,9 +161,10 @@ def __init__(

def execute_task(self, task_context):
"""Main execution block triggered by the Orchestrator."""
print(f"Fulfillment Agent: Executing task for customer {
task_context.get('customer_id')} on tenant {
self.tenant_id}")
print(
f"Fulfillment Agent: Executing task for customer "
f"{task_context.get('customer_id')} on tenant {self.tenant_id}"
)

intent = task_context.get("intent")

Expand Down Expand Up @@ -239,8 +241,10 @@ def __init__(

def handle_issue(self, issue_context):
"""Resolves edge cases or handles failed fulfilment tasks."""
print(f"Resolution Agent: Troubleshooting issue {
issue_context.get('issue_id')}")
print(
f"Resolution Agent: Troubleshooting issue "
f"{issue_context.get('issue_id')}"
)

# Log the intervention for metrics
self.logger.log_event("agent_intervention", issue_context)
Expand Down
12 changes: 8 additions & 4 deletions langgraph/embeddings.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ def embed_and_store(self, processed_chunks: List[Dict[str, Any]]):
]
tenant_id = processed_chunks[0]["tenant_id"]

print(f"Generating sovereign embeddings for {
len(texts_to_embed)} chunks. Tenant: {tenant_id}...")
print(
f"Generating sovereign embeddings for {len(texts_to_embed)} "
f"chunks. Tenant: {tenant_id}..."
)

# 2. Run embedding
if self.embedder is not None:
Expand All @@ -80,6 +82,8 @@ def embed_and_store(self, processed_chunks: List[Dict[str, Any]]):
# if self.db is not None:
# self.db.insert(collection_name="knowledge_embeddings", data=ready_records)

print(f"Prepared {
len(ready_records)} vector records for Tenant {tenant_id}.")
print(
f"Prepared {len(ready_records)} vector records for Tenant "
f"{tenant_id}."
)
return ready_records
6 changes: 4 additions & 2 deletions langgraph/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ def chunk_and_tag_document(
}
processed_chunks.append(chunk_record)

print(f"Processed {
len(processed_chunks)} chunks for Tenant: {tenant_id}")
print(
f"Processed {len(processed_chunks)} chunks for Tenant: "
f"{tenant_id}"
)
return processed_chunks
43 changes: 23 additions & 20 deletions models.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,11 @@ class Tenant(Base):
)

def __repr__(self):
return f"<Tenant(tenant_id={
self.tenant_id}, company_name='{
self.company_name}', industry='{
self.industry}')>"
return (
f"<Tenant(tenant_id={self.tenant_id}, "
f"company_name='{self.company_name}', "
f"industry='{self.industry}')>"
)


class TenantConfig(Base):
Expand Down Expand Up @@ -113,9 +114,10 @@ class TenantConfig(Base):
tenant = relationship("Tenant", back_populates="configs")

def __repr__(self):
return f"<TenantConfig(config_id={
self.config_id}, tenant_id={
self.tenant_id})>"
return (
f"<TenantConfig(config_id={self.config_id}, "
f"tenant_id={self.tenant_id})>"
)


class KnowledgeSource(Base):
Expand Down Expand Up @@ -162,11 +164,12 @@ class KnowledgeSource(Base):
)

def __repr__(self):
return f"<KnowledgeSource(source_id={
self.source_id}, tenant_id={
self.tenant_id}, source_type='{
self.source_type}', sync_status='{
self.sync_status}')>"
return (
f"<KnowledgeSource(source_id={self.source_id}, "
f"tenant_id={self.tenant_id}, "
f"source_type='{self.source_type}', "
f"sync_status='{self.sync_status}')>"
)


class VectorEmbedding(Base):
Expand Down Expand Up @@ -206,10 +209,10 @@ class VectorEmbedding(Base):
source = relationship("KnowledgeSource", back_populates="embeddings")

def __repr__(self):
return f"<VectorEmbedding(chunk_id={
self.chunk_id}, tenant_id={
self.tenant_id}, source_id={
self.source_id})>"
return (
f"<VectorEmbedding(chunk_id={self.chunk_id}, "
f"tenant_id={self.tenant_id}, source_id={self.source_id})>"
)


class InteractionLog(Base):
Expand Down Expand Up @@ -248,10 +251,10 @@ class InteractionLog(Base):
tenant = relationship("Tenant", back_populates="interaction_logs")

def __repr__(self):
return f"<InteractionLog(interaction_id={
self.interaction_id}, tenant_id={
self.tenant_id}, escalated={
self.escalated})>"
return (
f"<InteractionLog(interaction_id={self.interaction_id}, "
f"tenant_id={self.tenant_id}, escalated={self.escalated})>"
)


class BranchLineageHealth(Base):
Expand Down
86 changes: 46 additions & 40 deletions orchestrator.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,54 @@
import uuid
from typing import Any, Dict, Optional, Protocol
from typing import Any, Dict, Optional

from coastal_alpine_core.security import SecurityGuard, SecurityResult
from agents import IntakeAgent
from coastal_alpine_core.security import SecurityGuard
from coastal_alpine_core.telemetry import TelemetryTracker
from coastal_alpine_core.flywheel import DataFlywheel, Trajectory, BayesianOptimisationHook

# ... existing imports and classes ...

class _MemoryStore:
def __init__(self):
self._contexts: Dict[Optional[str], Dict[str, Any]] = {}

def update_context(
self,
message_id: Optional[str],
customer_profile: Dict[str, Any],
request_classification: str,
) -> None:
self._contexts[message_id] = {
"customer_profile": customer_profile,
"request_classification": request_classification,
}


class _KnowledgeBaseClient:
def query(self, query: str, tenant_id: str):
return []


class AgentOrchestrator:
"""
Enterprise orchestrator with full Data Flywheel + Bayesian Optimisation scaffolding.
Demo orchestrator that wraps the intake agent with security and telemetry checks.
"""

def __init__(self, tenant_id: str, tenant_config: Optional[Dict[str, Any]] = None, ...):
# ... existing init code ...
self.flywheel = DataFlywheel(storage_path=f"flywheel_{tenant_id}.jsonl")
self.bo_hook = BayesianOptimisationHook()
def __init__(
self,
tenant_id: str,
tenant_config: Optional[Dict[str, Any]] = None,
knowledge_base_client=None,
memory_store=None,
):
self.tenant_id = tenant_id
self.tenant_config = tenant_config or {}
self.knowledge_base_client = knowledge_base_client or _KnowledgeBaseClient()
self.memory_store = memory_store or _MemoryStore()
self.security_guard = SecurityGuard()
self.intake_agent = IntakeAgent(
self.knowledge_base_client,
self.memory_store,
tenant_id=self.tenant_id,
tenant_config=self.tenant_config,
)

def process_message(self, message: Dict[str, Any]) -> Dict[str, Any]:
# Security + Telemetry as before...
Expand All @@ -25,36 +57,10 @@ def process_message(self, message: Dict[str, Any]) -> Dict[str, Any]:
return {"status": "blocked"}

measurement = TelemetryTracker.measure_latency("orchestrator_process_message")
result = self.intake_agent.process_interaction(message)

# ... existing intake and routing logic ...

result = intake_agent.process_interaction(message)

# === Data Flywheel Recording ===
try:
traj = Trajectory(
trajectory_id=str(uuid.uuid4()),
timestamp=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
action="process_message",
input_summary=str(message)[:300],
output_summary=str(result)[:300],
outcome="success" if result.get("status") != "error" else "failure",
latency_seconds=0.0, # populated after complete_measurement
estimated_energy_joules=0.0,
system_metrics={},
metadata={"tenant_id": self.tenant_id}
)
self.flywheel.record_trajectory(traj)
except Exception as e:
logger.warning(f"Flywheel recording failed: {e}")

TelemetryTracker.complete_measurement(measurement, include_system_metrics=True)

# Optional: Ask BO hook for suggestions periodically
if len(self.flywheel.get_recent_trajectories(10)) % 20 == 0:
suggestion = self.bo_hook.suggest_next_configuration({"current_latency": 2.3})
logger.info(f"BO Suggestion: {suggestion}")
TelemetryTracker.complete_measurement(
measurement, include_system_metrics=True
)

return result

# ... rest of the class ...
6 changes: 4 additions & 2 deletions src/attestation_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ def generate_tpm_quote(nonce_hex):
return attestation_payload

except subprocess.CalledProcessError as e:
print(f"[CRITICAL HARDWARE FAULT] TPM execution failed: {
e.stderr.decode()}")
print(
f"[CRITICAL HARDWARE FAULT] TPM execution failed: "
f"{e.stderr.decode()}"
)
return None
finally:
# Cleanup volatile staging files
Expand Down
6 changes: 4 additions & 2 deletions src/fleet_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ def on_heartbeat_received(client, userdata, msg):
"battery_mv": payload.get("battery_mv"),
"posture_state": payload.get("posture"),
}
print(f"[FLEET MONITOR] Node {node_id} reported in. Status: {
payload.get('posture')}")
print(
f"[FLEET MONITOR] Node {node_id} reported in. Status: "
f"{payload.get('posture')}"
)
except Exception as e:
print(f"[FLEET ERROR] Corrupted heartbeat frame: {e}")

Expand Down
21 changes: 12 additions & 9 deletions tests/stress/mqtt_flood.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# weaver/tests/stress/mqtt_flood.py
import json
import time
import threading
import random
Expand Down Expand Up @@ -29,11 +30,13 @@ def simulate_node(node_id):

for _ in range(MESSAGES_PER_NODE):
# Simulate a standard telemetry packet
payload = f'{
{"node": {node_id}, "moisture": {
random.uniform(
20.0, 80.0)}, "ts": {
time.time()}}}'
payload = json.dumps(
{
"node": node_id,
"moisture": random.uniform(20.0, 80.0),
"ts": time.time(),
}
)
client.publish(TOPIC, payload, qos=1)
time.sleep(0.001) # Rapid blast

Expand Down Expand Up @@ -61,7 +64,7 @@ def simulate_node(node_id):

duration = time.time() - start_time
total_msgs = NUM_SIMULATED_NODES * MESSAGES_PER_NODE
print(f"Test finished. Dispatched {total_msgs} messages in {
duration:.2f} seconds ({
total_msgs /
duration:.1f} msgs/sec).")
print(
f"Test finished. Dispatched {total_msgs} messages in "
f"{duration:.2f} seconds ({total_msgs / duration:.1f} msgs/sec)."
)