-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathNexusLEDInterface.py
More file actions
173 lines (131 loc) · 5.32 KB
/
NexusLEDInterface.py
File metadata and controls
173 lines (131 loc) · 5.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import datetime
import time
import requests
import sys
import serial
import dotenv
import os
from enum import Enum
class MockSerial:
def __init__(self, port, **kwargs):
self.name = port
self.is_open = True
def write(self, data):
# Instead of sending to hardware, we just see it in the console
print(f"[SERIAL SEND] -> {data.decode('utf-8').strip()}")
def close(self):
self.is_open = False
print("Mock Serial Closed.")
class BumperColour(Enum):
RED = "red"
BLUE = "blue"
class Matches:
def __init__(self, data):
self.data = data if isinstance(data, list) else []
def next_match(self):
if not self.data:
return Match(None)
return Match(next(filter(lambda m: not m['status'] == 'On field', self.data), None))
class Match:
def __init__(self, data):
self.data = data
def get_time_to_match_ms(self) -> int:
if not self.data or 'times' not in self.data:
return -1
queue_time = self.data.get('times', {}).get('estimatedQueueTime')
if queue_time is None:
return -1
return max(int(queue_time - (time.time() * 1000)), 0)
def get_bumper_colour(self, team_number: str):
if self.data is None:
return None
return BumperColour.RED if team_number in self.data.get('redTeams', []) else BumperColour.BLUE
class EventData:
def __init__(self, data):
self.data = data
def get_team_matches(self, team_number: str):
if self.data is None:
return Matches(None)
return Matches(list(filter(lambda m: team_number in m.get('redTeams', []) + m.get('blueTeams', []), self.data.get('matches', []))))
dotenv.load_dotenv()
event_key = "demo1943"
url = "https://frc.nexus/api/v1/event/" + event_key
headers = {"Nexus-Api-Key": os.environ['NEXUS_API_KEY']}
def get_event_data() -> EventData:
response = requests.get(url, headers=headers)
if not response.ok:
error_message = response.text
print(f"{error_message=}")
return EventData(data={})
data = response.json()
print(data)
return EventData(data=data)
def get_team_matches(event_data: dict, team_number: int) -> list:
return list(filter(lambda m: team_number in m.get('redTeams', []) + m.get('blueTeams', []), event_data['matches']))
def get_next_match(match_data: list, team_number: int) -> dict | None:
next(filter(lambda m: not m['status'] == 'On field', match_data), None)
def get_bumper_colour(match: dict, team_number: int) -> BumperColour:
return BumperColour.RED if team_number in match.get('redTeams', []) else BumperColour.BLUE
def get_time_to_match_ms(match: dict) -> int:
queue_time = match['times'].get('estimatedQueueTime', None)
return queue_time - (time.time() * 1000)
serial_port = None
try:
try:
serial_port = serial.Serial(port='COM9', baudrate=9600, timeout=1)
print("Connected to REAL serial port.")
except (serial.SerialException, FileNotFoundError):
print("Real serial port not found. SWITCHING TO MOCK MODE.")
serial_port = MockSerial(port='COM9')
time.sleep(1)
print(f"{serial_port.name=}")
next_match = None
bumper_colour = None
time_to_match = -1
pattern = "white"
speed = 100
last_api_update = 0
last_serial_update = 0
while(True):
now = time.time()
if now - last_api_update >= 5:
event_data = get_event_data()
team_matches = event_data.get_team_matches('2702')
next_match = team_matches.next_match()
if next_match.data is not None:
bumper_colour = next_match.get_bumper_colour('2702')
time_to_match = next_match.get_time_to_match_ms()
else:
bumper_colour = None
time_to_match = -1
last_api_update = now
if now - last_serial_update >= 2:
if time_to_match > 600000:
pattern = "white"
speed = 100
elif time_to_match > 300000:
pattern = f"white-{bumper_colour.value}-edge" if bumper_colour is not None else "white"
speed = 100
elif time_to_match >= 0:
pattern = f"{bumper_colour.value}-chaser" if bumper_colour is not None else "white"
speed = int((time_to_match - 300000) * (2000 - 100) / (0 - 300000) + 100)
else:
pattern = "white"
speed = 100
time_min = round(time_to_match / 60000, 2)
print(f"[{datetime.datetime.now().strftime('%H:%M:%S')}] "
f"Match in: {time_min}m | Pattern: {pattern} | Speed: {int(speed)}")
if serial_port and serial_port.is_open:
serial_port.write(f'set-pattern {pattern}\n'.encode('utf-8'))
serial_port.write(f'speed {int(speed)}\n'.encode('utf-8'))
last_serial_update = now
time.sleep(0.1)
except serial.SerialException as e:
print(f"Error opening serial port: {e}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
finally:
# Always ensure the port is closed
if 'serial_port' in locals() and serial_port is not None and serial_port.is_open:
serial_port.close()
print("Serial port closed.")