Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions node/server/grpc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,20 @@
ToolRunInput,
EnvironmentRunInput,
KBRunInput,
ModuleExecutionType
ModuleExecutionType,
SecretInput
)
from node.secret import Secret
from node.module_manager import setup_module_deployment
from node.utils import get_logger

load_dotenv()

file_path = Path(__file__).resolve()
root_dir = file_path.parent.parent.parent
MODULES_SOURCE_DIR = root_dir / os.getenv("MODULES_SOURCE_DIR")

logger = logging.getLogger(__name__)
logger = get_logger(__name__)

class GrpcServerServicer(grpc_server_pb2_grpc.GrpcServerServicer):
async def CheckUser(self, request, context):
Expand Down Expand Up @@ -112,6 +115,17 @@ async def create_module(self, module_deployment: Union[AgentDeployment, MemoryDe
async def RunModule(self, request, context):
try:
module_type = request.module_type
if len(request.secrets) > 0:
secrets = [SecretInput(
user_id=secret.user_id,
secret_value=secret.secret_value,
key_name=secret.key_name
) for secret in request.secrets]
else:
secrets = []

request_dict = MessageToDict(request, preserving_proto_field_name=True)

module_configs = {
"agent": {
"input_class": AgentRunInput,
Expand Down Expand Up @@ -196,8 +210,19 @@ async def RunModule(self, request, context):
else:
execution_type = run_input.deployment.module.execution_type

# Process secrets for environment variables
user_env_data = {}
secret_obj = Secret()
if secrets:
for secret in secrets:
user_env_data[secret.key_name] = secret_obj.decrypt_rsa(secret.secret_value)

if execution_type == ModuleExecutionType.package or execution_type == "package":
task = config["worker"].delay(module_run_data)
task = config["worker"].delay(
module_run_data,
user_env_data,
[secret.model_dump() for secret in secrets] if secrets else []
)
elif execution_type == ModuleExecutionType.docker or execution_type == "docker":
if config["docker_support"]:
task = execute_docker_agent.delay(module_run_data)
Expand Down
62 changes: 32 additions & 30 deletions node/server/grpc_server_pb2.py

Large diffs are not rendered by default.

119 changes: 65 additions & 54 deletions node/server/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,20 @@ async def agent_create_endpoint(agent_input: AgentDeployment) -> AgentDeployment
return await self.agent_create(agent_input)

@router.post("/agent/run")
async def agent_run_endpoint(agent_run_input: AgentRunInput, secrets: List[SecretInput] = []) -> AgentRun:
async def agent_run_endpoint(payload: dict) -> AgentRun:
"""
Run an agent
:param agent_run_input: Agent run specifications
:param secrets: Optional list of secrets to pass to the agent
:param payload: Agent run specifications
:return: Status
"""
if payload.get("agent_run_input"):
agent_run_input = AgentRunInput(**payload.get("agent_run_input", {}))
secrets_dict = payload.get("secrets", [])
secrets = [SecretInput(**secret) for secret in secrets_dict]
else:
agent_run_input = AgentRunInput(**payload)
secrets = []

return await self.agent_run(agent_run_input, secrets)

@router.post("/agent/check")
Expand All @@ -151,13 +158,20 @@ async def tool_create_endpoint(tool_input: ToolDeployment) -> ToolDeployment:
return await self.tool_create(tool_input)

@router.post("/tool/run")
async def tool_run_endpoint(tool_run_input: ToolRunInput, secrets: List[SecretInput] = []) -> ToolRun:
async def tool_run_endpoint(payload: dict) -> ToolRun:
"""
Run a tool
:param tool_run_input: Tool run specifications
:param secrets: Optional list of secrets to pass to the tool
:param payload: Tool run specifications
:return: Status
"""
if payload.get("tool_run_input"):
tool_run_input = ToolRunInput(**payload.get("tool_run_input", {}))
secrets_dict = payload.get("secrets", [])
secrets = [SecretInput(**secret) for secret in secrets_dict]
else:
tool_run_input = ToolRunInput(**payload)
secrets = []

return await self.tool_run(tool_run_input, secrets)

@router.post("/tool/check")
Expand All @@ -179,13 +193,21 @@ async def orchestrator_create_endpoint(orchestrator_input: OrchestratorDeploymen
return await self.orchestrator_create(orchestrator_input)

@router.post("/orchestrator/run")
async def orchestrator_run_endpoint(orchestrator_run_input: OrchestratorRunInput, secrets: List[SecretInput] = []) -> OrchestratorRun:
async def orchestrator_run_endpoint(payload: dict) -> OrchestratorRun:
"""
Run an agent orchestrator
:param orchestrator_run_input: Orchestrator run specifications
:param secrets: Optional list of secrets to pass to the orchestrator
:param payload: Orchestrator run specifications
:return: Status
"""
logger.info(f"Orchestrator run payload: {payload}")
if payload.get("orchestrator_run_input"):
orchestrator_run_input = OrchestratorRunInput(**payload.get("orchestrator_run_input", {}))
secrets_dict = payload.get("secrets", [])
secrets = [SecretInput(**secret) for secret in secrets_dict]
else:
orchestrator_run_input = OrchestratorRunInput(**payload)
secrets = []

return await self.orchestrator_run(orchestrator_run_input, secrets)

@router.post("/orchestrator/check")
Expand All @@ -207,13 +229,19 @@ async def environment_create_endpoint(environment_input: EnvironmentDeployment)
return await self.environment_create(environment_input)

@router.post("/environment/run")
async def environment_run_endpoint(environment_run_input: EnvironmentRunInput, secrets: List[SecretInput] = []) -> EnvironmentRun:
async def environment_run_endpoint(payload: dict) -> EnvironmentRun:
"""
Run an environment
:param environment_run_input: Environment run specifications
:param secrets: Optional list of secrets to pass to the environment
:param payload: Environment run specifications
:return: Status
"""
if payload.get("environment_run_input"):
environment_run_input = EnvironmentRunInput(**payload.get("environment_run_input", {}))
secrets_dict = payload.get("secrets", [])
secrets = [SecretInput(**secret) for secret in secrets_dict]
else:
environment_run_input = EnvironmentRunInput(**payload)
secrets = []
return await self.environment_run(environment_run_input, secrets)

@router.post("/environment/check")
Expand All @@ -235,13 +263,19 @@ async def kb_create_endpoint(kb_input: KBDeployment) -> KBDeployment:
return await self.kb_create(kb_input)

@router.post("/kb/run")
async def kb_run_endpoint(kb_run_input: KBRunInput, secrets: List[SecretInput] = []) -> KBRun:
async def kb_run_endpoint(payload: dict) -> KBRun:
"""
Run a knowledge base
:param kb_run_input: KBRunInput
:param secrets: Optional list of secrets to pass to the knowledge base
:param payload: KB run specifications
:return: KBRun
"""
if payload.get("kb_run_input"):
kb_run_input = KBRunInput(**payload.get("kb_run_input", {}))
secrets_dict = payload.get("secrets", [])
secrets = [SecretInput(**secret) for secret in secrets_dict]
else:
kb_run_input = KBRunInput(**payload)
secrets = []
return await self.kb_run(kb_run_input, secrets)

@router.post("/kb/check")
Expand All @@ -263,13 +297,19 @@ async def memory_create_endpoint(memory_input: MemoryDeployment) -> MemoryDeploy
return await self.memory_create(memory_input)

@router.post("/memory/run")
async def memory_run_endpoint(memory_run_input: MemoryRunInput, secrets: List[SecretInput] = [] ) -> MemoryRun:
async def memory_run_endpoint(payload: dict) -> MemoryRun:
"""
Run a memory module
:param memory_run_input: Memory run specifications
:param secrets: Optional list of secrets to pass to the memory
:param payload: Memory run specifications
:return: Status
"""
if payload.get("memory_run_input"):
memory_run_input = MemoryRunInput(**payload.get("memory_run_input", {}))
secrets_dict = payload.get("secrets", [])
secrets = [SecretInput(**secret) for secret in secrets_dict]
else:
memory_run_input = MemoryRunInput(**payload)
secrets = []
return await self.memory_run(memory_run_input, secrets)

@router.post("/memory/check")
Expand Down Expand Up @@ -297,35 +337,6 @@ async def user_register_endpoint(user_input: dict):
if '_sa_instance_state' in response:
response.pop('_sa_instance_state')
return response

@router.post("/user/secret/create")
async def user_secret_create_endpoint(
existing_secrets: List[SecretInput] = [],
secrets: List[SecretInput] = [],
signature: str = Query(...),
is_update: Optional[bool] = Query(False)
):
user_id = secrets[0].user_id.replace("<record>", "") if secrets else ""

if not user_id:
raise HTTPException(status_code=400, detail=f"Data cannot be empty")

if not verify_signature(user_id, signature, user_id.split(":")[1]):
raise HTTPException(status_code=401, detail="Unauthorized: Invalid signature")

try:
aes_secret = base64.b64decode(os.getenv("AES_SECRET"))
for data in secrets:
rsa_decrypted_secret_value = self.secret.decrypt_rsa(data.secret_value)
encrypted_secret_value = self.secret.encrypt_with_aes(rsa_decrypted_secret_value, aes_secret)
data.secret_value = encrypted_secret_value

async with HubDBSurreal() as hub:
result = await hub.create_secret(secrets, is_update, existing_secrets)

return result
except Exception as e:
raise HTTPException(status_code=400, detail=f"Error processing secrets: {str(e)}")

async def get_server_connection(self, server_id: str):
"""
Expand Down Expand Up @@ -461,7 +472,7 @@ async def run_module(
secret = Secret()

for record in secrets:
decrypted_value = secret.decrypt_with_aes(record.secret_value, base64.b64decode(os.getenv("AES_SECRET")))
decrypted_value = secret.decrypt_rsa(record.secret_value)
user_env_data[record.key_name] = decrypted_value

# Create module run record in DB
Expand All @@ -478,17 +489,17 @@ async def run_module(
# Execute the task based on module type
if module_run_input.deployment.module.execution_type == ModuleExecutionType.package:
if module_type == "agent":
_ = run_agent.delay(module_run_data, user_env_data)
_ = run_agent.delay(module_run_data, user_env_data, [secret.model_dump() for secret in secrets] if secrets else [])
elif module_type == "tool":
_ = run_tool.delay(module_run_data, user_env_data)
_ = run_tool.delay(module_run_data, user_env_data, [secret.model_dump() for secret in secrets] if secrets else [])
elif module_type == "orchestrator":
_ = run_orchestrator.delay(module_run_data, user_env_data)
_ = run_orchestrator.delay(module_run_data, user_env_data, [secret.model_dump() for secret in secrets] if secrets else [])
elif module_type == "environment":
_ = run_environment.delay(module_run_data, user_env_data)
_ = run_environment.delay(module_run_data, user_env_data, [secret.model_dump() for secret in secrets] if secrets else [])
elif module_type == "kb":
_ = run_kb.delay(module_run_data, user_env_data)
_ = run_kb.delay(module_run_data, user_env_data, [secret.model_dump() for secret in secrets] if secrets else [])
elif module_type == "memory":
_ = run_memory.delay(module_run_data, user_env_data)
_ = run_memory.delay(module_run_data, user_env_data, [secret.model_dump() for secret in secrets] if secrets else [])
elif module_run_input.deployment.module.execution_type == ModuleExecutionType.docker and module_type == "agent":
# validate docker params
try:
Expand Down
6 changes: 6 additions & 0 deletions node/server/protos/grpc_server.proto
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ message NodeConfig {
repeated string models = 21;
}

message Secret {
string user_id = 1;
string secret_value = 2;
string key_name = 3;
}

message Module {
string id = 1;
Expand Down Expand Up @@ -140,6 +145,7 @@ message ModuleRunRequest {
}
repeated ModuleRun orchestrator_runs = 9;
optional string signature = 10;
repeated Secret secrets = 11;
}

message ModuleRun {
Expand Down
1 change: 0 additions & 1 deletion node/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,6 @@ def signal_handler(sig):

secret = Secret(server_private_key_path, server_public_key_path, env_file_path)
secret.check_and_generate_keys()
secret.check_and_generate_aes_secret()

asyncio.run(run_server(
communication_protocol=args.communication_protocol,
Expand Down
20 changes: 16 additions & 4 deletions node/server/ws_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from datetime import datetime
import traceback
import logging
from typing import Dict, Any, Union
from typing import Dict, Any, Union, Optional, List
from pathlib import Path
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
Expand All @@ -20,6 +20,7 @@
KBRunInput, KBDeployment,
OrchestratorRunInput, OrchestratorDeployment,
ModuleExecutionType,
SecretInput
)
from node.storage.db.db import LocalDBPostgres
from node.user import register_user, check_user
Expand All @@ -34,6 +35,7 @@

)
from node.module_manager import setup_module_deployment
from node.secret import Secret

logger = logging.getLogger(__name__)
load_dotenv()
Expand Down Expand Up @@ -252,8 +254,13 @@ async def run_module_endpoint(self, websocket: WebSocket, module_type: str, clie
try:
data = await websocket.receive_json()
logger.info(f"Endpoint: run_{module_type} :: Received data: {data}")
if data.get('secrets'):
secrets = data.pop('secrets')
logger.info(f"Endpoint: run_{module_type} :: Received1 secrets: {secrets}")
else:
secrets = []

result = await self.run_module(module_type, data, client_id)
result = await self.run_module(module_type, data, client_id, [SecretInput(**secret) for secret in secrets] if secrets else [])
logger.info(f"Endpoint: run_{module_type} :: Sending result: {result}")
await self.manager.send_message(result, client_id, f"run_{module_type}")

Expand Down Expand Up @@ -283,7 +290,7 @@ async def run_module_endpoint(self, websocket: WebSocket, module_type: str, clie
logger.warning(f"Client {client_id} disconnected (outer).")
self.manager.disconnect(client_id, f"run_{module_type}")

async def run_module(self, module_type: str, data: dict, client_id: str) -> str:
async def run_module(self, module_type: str, data: dict, client_id: str, secrets: List[SecretInput] = []) -> str:
try:
# Map module types to their corresponding input and run classes
module_configs = {
Expand Down Expand Up @@ -355,8 +362,13 @@ async def run_module(self, module_type: str, data: dict, client_id: str) -> str:
else:
execution_type = module_run.deployment.module.execution_type

secret_obj = Secret()
user_env_data = {}
for secret in secrets:
user_env_data[secret.key_name] = secret_obj.decrypt_rsa(secret.secret_value)

if execution_type == ModuleExecutionType.package or execution_type == 'package':
task = config["worker"].delay(module_run_data)
task = config["worker"].delay(module_run_data, user_env_data, [secret.model_dump() for secret in secrets] if secrets else [])
elif execution_type == ModuleExecutionType.docker or execution_type == 'docker':
task = execute_docker_agent.delay(module_run_data)
else:
Expand Down
Loading
Loading