-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhost_bridge.py
More file actions
71 lines (58 loc) · 2.79 KB
/
host_bridge.py
File metadata and controls
71 lines (58 loc) · 2.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# Placeholder file for now, I hope to expand this idea in the future
import serial
import time
import json
import urllib.request
import urllib.parse
from urllib.error import URLError
# Configure the serial connection (matching QEMU's configuration)
# The pip package 'pyserial' is required for this.
PORT = 'COM1'
BAUD = 38400
# Mock AI API (Since we don't have a real API key in this environment,
# we will simulate an AI response locally to prove the bridge works).
def query_mock_ai(prompt):
print(f"[Host] Querying AI API with prompt: '{prompt}'")
time.sleep(1) # Simulate network delay
prompt = prompt.lower()
if "hello" in prompt:
return "Hello there! I am your AI assistant running on the Host machine."
elif "math" in prompt or "calculus" in prompt:
return "Calculus is the mathematical study of continuous change."
elif "who are you" in prompt:
return "I am the EduOS Host Bridge AI! Nice to meet you."
else:
return f"I received your query: '{prompt}'. This is a mock response from Windows!"
def run_bridge():
try:
# Note: We need a virtual serial port loopback if QEMU isn't pointing to a real one.
# For simplicity, we assume QEMU is piped to a named pipe or physical port we can read
ser = serial.Serial(PORT, BAUD, timeout=1)
print(f"[Host] Bridge started. Listening on {PORT} at {BAUD} baud.")
print("[Host] Waiting for AI requests from EduOS...")
buffer = ""
while True:
if ser.in_waiting > 0:
char = ser.read().decode('utf-8', errors='ignore')
if char == '\n':
print(f"[Host] Received raw payload: {buffer}")
# Parse the payload (e.g. "AI_REQ|TUTOR|What is calculus?")
parts = buffer.split('|')
if len(parts) >= 3 and parts[0] == "AI_REQ":
action = parts[1]
data = parts[2]
print(f"[Host] Action: {action}, Data: {data}")
# Get AI response
response = query_mock_ai(data)
print(f"[Host] Sending Response: {response}")
# Send back over serial
ser.write((response + '\n').encode('utf-8'))
buffer = "" # Clear buffer
else:
buffer += char
time.sleep(0.01)
except serial.SerialException as e:
print(f"[Error] Could not open serial port {PORT}: {e}")
print("Please ensure QEMU is configured to forward COM1 to this port.")
if __name__ == "__main__":
run_bridge()