diff --git a/agents.py b/agents.py index b158f51..27f79e5 100644 --- a/agents.py +++ b/agents.py @@ -29,9 +29,10 @@ def __init__( def process_interaction(self, unified_message): """Main entry point from the Orchestrator.""" - print(f"Intake Agent: Processing incoming message ID { - unified_message.get('id')} for tenant { - self.tenant_id}") + print( + f"Intake Agent: Processing incoming message ID " + f"{unified_message.get('id')} for tenant {self.tenant_id}" + ) # 1. Extract and verify customer details customer_profile = self._collect_customer_info(unified_message) @@ -160,9 +161,10 @@ def __init__( def execute_task(self, task_context): """Main execution block triggered by the Orchestrator.""" - print(f"Fulfillment Agent: Executing task for customer { - task_context.get('customer_id')} on tenant { - self.tenant_id}") + print( + f"Fulfillment Agent: Executing task for customer " + f"{task_context.get('customer_id')} on tenant {self.tenant_id}" + ) intent = task_context.get("intent") @@ -239,8 +241,10 @@ def __init__( def handle_issue(self, issue_context): """Resolves edge cases or handles failed fulfilment tasks.""" - print(f"Resolution Agent: Troubleshooting issue { - issue_context.get('issue_id')}") + print( + f"Resolution Agent: Troubleshooting issue " + f"{issue_context.get('issue_id')}" + ) # Log the intervention for metrics self.logger.log_event("agent_intervention", issue_context) diff --git a/langgraph/embeddings.py b/langgraph/embeddings.py index c19a344..1b0437f 100644 --- a/langgraph/embeddings.py +++ b/langgraph/embeddings.py @@ -54,8 +54,10 @@ def embed_and_store(self, processed_chunks: List[Dict[str, Any]]): ] tenant_id = processed_chunks[0]["tenant_id"] - print(f"Generating sovereign embeddings for { - len(texts_to_embed)} chunks. Tenant: {tenant_id}...") + print( + f"Generating sovereign embeddings for {len(texts_to_embed)} " + f"chunks. Tenant: {tenant_id}..." + ) # 2. Run embedding if self.embedder is not None: @@ -80,6 +82,8 @@ def embed_and_store(self, processed_chunks: List[Dict[str, Any]]): # if self.db is not None: # self.db.insert(collection_name="knowledge_embeddings", data=ready_records) - print(f"Prepared { - len(ready_records)} vector records for Tenant {tenant_id}.") + print( + f"Prepared {len(ready_records)} vector records for Tenant " + f"{tenant_id}." + ) return ready_records diff --git a/langgraph/ingestion.py b/langgraph/ingestion.py index 5892260..6d6279a 100644 --- a/langgraph/ingestion.py +++ b/langgraph/ingestion.py @@ -85,6 +85,8 @@ def chunk_and_tag_document( } processed_chunks.append(chunk_record) - print(f"Processed { - len(processed_chunks)} chunks for Tenant: {tenant_id}") + print( + f"Processed {len(processed_chunks)} chunks for Tenant: " + f"{tenant_id}" + ) return processed_chunks diff --git a/models.py b/models.py index 8169060..c438be4 100644 --- a/models.py +++ b/models.py @@ -71,10 +71,11 @@ class Tenant(Base): ) def __repr__(self): - return f"" + return ( + f"" + ) class TenantConfig(Base): @@ -113,9 +114,10 @@ class TenantConfig(Base): tenant = relationship("Tenant", back_populates="configs") def __repr__(self): - return f"" + return ( + f"" + ) class KnowledgeSource(Base): @@ -162,11 +164,12 @@ class KnowledgeSource(Base): ) def __repr__(self): - return f"" + return ( + f"" + ) class VectorEmbedding(Base): @@ -206,10 +209,10 @@ class VectorEmbedding(Base): source = relationship("KnowledgeSource", back_populates="embeddings") def __repr__(self): - return f"" + return ( + f"" + ) class InteractionLog(Base): @@ -248,10 +251,10 @@ class InteractionLog(Base): tenant = relationship("Tenant", back_populates="interaction_logs") def __repr__(self): - return f"" + return ( + f"" + ) class BranchLineageHealth(Base): diff --git a/orchestrator.py b/orchestrator.py index dcba5ad..998e4cd 100644 --- a/orchestrator.py +++ b/orchestrator.py @@ -1,22 +1,54 @@ -import uuid -from typing import Any, Dict, Optional, Protocol +from typing import Any, Dict, Optional -from coastal_alpine_core.security import SecurityGuard, SecurityResult +from agents import IntakeAgent +from coastal_alpine_core.security import SecurityGuard from coastal_alpine_core.telemetry import TelemetryTracker -from coastal_alpine_core.flywheel import DataFlywheel, Trajectory, BayesianOptimisationHook -# ... existing imports and classes ... + +class _MemoryStore: + def __init__(self): + self._contexts: Dict[Optional[str], Dict[str, Any]] = {} + + def update_context( + self, + message_id: Optional[str], + customer_profile: Dict[str, Any], + request_classification: str, + ) -> None: + self._contexts[message_id] = { + "customer_profile": customer_profile, + "request_classification": request_classification, + } + + +class _KnowledgeBaseClient: + def query(self, query: str, tenant_id: str): + return [] + class AgentOrchestrator: """ - Enterprise orchestrator with full Data Flywheel + Bayesian Optimisation scaffolding. + Demo orchestrator that wraps the intake agent with security and telemetry checks. """ - def __init__(self, tenant_id: str, tenant_config: Optional[Dict[str, Any]] = None, ...): - # ... existing init code ... - self.flywheel = DataFlywheel(storage_path=f"flywheel_{tenant_id}.jsonl") - self.bo_hook = BayesianOptimisationHook() + def __init__( + self, + tenant_id: str, + tenant_config: Optional[Dict[str, Any]] = None, + knowledge_base_client=None, + memory_store=None, + ): + self.tenant_id = tenant_id + self.tenant_config = tenant_config or {} + self.knowledge_base_client = knowledge_base_client or _KnowledgeBaseClient() + self.memory_store = memory_store or _MemoryStore() self.security_guard = SecurityGuard() + self.intake_agent = IntakeAgent( + self.knowledge_base_client, + self.memory_store, + tenant_id=self.tenant_id, + tenant_config=self.tenant_config, + ) def process_message(self, message: Dict[str, Any]) -> Dict[str, Any]: # Security + Telemetry as before... @@ -25,36 +57,10 @@ def process_message(self, message: Dict[str, Any]) -> Dict[str, Any]: return {"status": "blocked"} measurement = TelemetryTracker.measure_latency("orchestrator_process_message") + result = self.intake_agent.process_interaction(message) - # ... existing intake and routing logic ... - - result = intake_agent.process_interaction(message) - - # === Data Flywheel Recording === - try: - traj = Trajectory( - trajectory_id=str(uuid.uuid4()), - timestamp=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), - action="process_message", - input_summary=str(message)[:300], - output_summary=str(result)[:300], - outcome="success" if result.get("status") != "error" else "failure", - latency_seconds=0.0, # populated after complete_measurement - estimated_energy_joules=0.0, - system_metrics={}, - metadata={"tenant_id": self.tenant_id} - ) - self.flywheel.record_trajectory(traj) - except Exception as e: - logger.warning(f"Flywheel recording failed: {e}") - - TelemetryTracker.complete_measurement(measurement, include_system_metrics=True) - - # Optional: Ask BO hook for suggestions periodically - if len(self.flywheel.get_recent_trajectories(10)) % 20 == 0: - suggestion = self.bo_hook.suggest_next_configuration({"current_latency": 2.3}) - logger.info(f"BO Suggestion: {suggestion}") + TelemetryTracker.complete_measurement( + measurement, include_system_metrics=True + ) return result - - # ... rest of the class ... diff --git a/src/attestation_agent.py b/src/attestation_agent.py index 8bd7506..dc22c5f 100644 --- a/src/attestation_agent.py +++ b/src/attestation_agent.py @@ -63,8 +63,10 @@ def generate_tpm_quote(nonce_hex): return attestation_payload except subprocess.CalledProcessError as e: - print(f"[CRITICAL HARDWARE FAULT] TPM execution failed: { - e.stderr.decode()}") + print( + f"[CRITICAL HARDWARE FAULT] TPM execution failed: " + f"{e.stderr.decode()}" + ) return None finally: # Cleanup volatile staging files diff --git a/src/fleet_manager.py b/src/fleet_manager.py index fbdef77..af63b6a 100644 --- a/src/fleet_manager.py +++ b/src/fleet_manager.py @@ -75,8 +75,10 @@ def on_heartbeat_received(client, userdata, msg): "battery_mv": payload.get("battery_mv"), "posture_state": payload.get("posture"), } - print(f"[FLEET MONITOR] Node {node_id} reported in. Status: { - payload.get('posture')}") + print( + f"[FLEET MONITOR] Node {node_id} reported in. Status: " + f"{payload.get('posture')}" + ) except Exception as e: print(f"[FLEET ERROR] Corrupted heartbeat frame: {e}") diff --git a/tests/stress/mqtt_flood.py b/tests/stress/mqtt_flood.py index 8ba7b0a..f571c7f 100644 --- a/tests/stress/mqtt_flood.py +++ b/tests/stress/mqtt_flood.py @@ -1,4 +1,5 @@ # weaver/tests/stress/mqtt_flood.py +import json import time import threading import random @@ -29,11 +30,13 @@ def simulate_node(node_id): for _ in range(MESSAGES_PER_NODE): # Simulate a standard telemetry packet - payload = f'{ - {"node": {node_id}, "moisture": { - random.uniform( - 20.0, 80.0)}, "ts": { - time.time()}}}' + payload = json.dumps( + { + "node": node_id, + "moisture": random.uniform(20.0, 80.0), + "ts": time.time(), + } + ) client.publish(TOPIC, payload, qos=1) time.sleep(0.001) # Rapid blast @@ -61,7 +64,7 @@ def simulate_node(node_id): duration = time.time() - start_time total_msgs = NUM_SIMULATED_NODES * MESSAGES_PER_NODE - print(f"Test finished. Dispatched {total_msgs} messages in { - duration:.2f} seconds ({ - total_msgs / - duration:.1f} msgs/sec).") + print( + f"Test finished. Dispatched {total_msgs} messages in " + f"{duration:.2f} seconds ({total_msgs / duration:.1f} msgs/sec)." + )