Skip to content

Commit 96eaec7

Browse files
MarkoVcodeclaude
andcommitted
Add data recording feature with multi-device support
Implement comprehensive backend infrastructure for recording instrument data with pause/resume functionality, real-time streaming, and CSV export. Key features: - SQLite database with flexible path resolution (web/Electron compatible) - Multi-device recording: capture data from multiple instruments simultaneously - Pause/resume support with accurate duration tracking - Real-time WebSocket streaming for live charts - REST API endpoints for recording management - CSV export per recording series - Manual deletion only (no auto-cleanup) Backend components: - Database layer: RecordingSeries and DataPoint models - Recording service: async data collection with error handling - REST API: 9 endpoints for full recording lifecycle - WebSocket endpoint: live data broadcasting - Test suite: 39 new tests (85 total, all passing) Dependencies added: - SQLAlchemy for database ORM - websockets for real-time streaming - echarts & echarts-for-react for frontend charts (Phase 2) - axios for frontend HTTP requests (Phase 2) This completes Phase 1 (Backend Foundation) following TDD principles. Ready for Phase 2: Frontend implementation with Apache ECharts. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent d7733e2 commit 96eaec7

13 files changed

Lines changed: 2219 additions & 2 deletions

File tree

16 KB
Binary file not shown.

benchmesh-serial-service/frontend/package.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616
"react-dom": "^18.2.0",
1717
"react-markdown": "^10.1.0",
1818
"remark-gfm": "^4.0.1",
19-
"swagger-ui-react": "^5.29.4"
19+
"swagger-ui-react": "^5.29.4",
20+
"echarts": "^5.5.0",
21+
"echarts-for-react": "^3.0.2",
22+
"axios": "^1.7.0"
2023
},
2124
"devDependencies": {
2225
"@testing-library/jest-dom": "^6.4.8",

benchmesh-serial-service/requirements.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,6 @@ loguru
44
fastapi>=0.111.0
55
uvicorn[standard]>=0.30.0
66
pydantic>=1.8
7-
httpx>=0.24
7+
httpx>=0.24
8+
sqlalchemy>=2.0.0
9+
websockets>=12.0
Lines changed: 364 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,364 @@
1+
"""
2+
Recording API endpoints.
3+
4+
Provides REST API for managing data recordings with pause/resume support.
5+
"""
6+
7+
import json
8+
import csv
9+
import io
10+
from typing import List, Optional
11+
from datetime import datetime
12+
13+
from fastapi import APIRouter, Depends, HTTPException, WebSocket, WebSocketDisconnect
14+
from fastapi.responses import StreamingResponse
15+
from sqlalchemy.orm import Session
16+
from pydantic import BaseModel
17+
18+
from benchmesh_service.database import get_db
19+
from benchmesh_service.models.recording import RecordingSeries, DataPoint
20+
import benchmesh_service.services.recording_service as recording_service_module
21+
from benchmesh_service.logger import logger
22+
23+
24+
class ChannelConfig(BaseModel):
25+
"""Configuration for a single recording channel."""
26+
device_id: str
27+
parameter: str
28+
label: str
29+
30+
31+
class StartRecordingRequest(BaseModel):
32+
"""Request to start a new recording."""
33+
name: str
34+
channels: List[ChannelConfig]
35+
interval_seconds: float
36+
description: Optional[str] = None
37+
38+
39+
class RecordingResponse(BaseModel):
40+
"""Response for recording operations."""
41+
series_id: int
42+
name: str
43+
status: str
44+
paused_at: Optional[str] = None
45+
46+
47+
class RecordingDetailsResponse(BaseModel):
48+
"""Detailed recording information."""
49+
id: int
50+
name: str
51+
description: Optional[str]
52+
interval_seconds: float
53+
channels: List[dict]
54+
start_time: str
55+
end_time: Optional[str]
56+
paused_at: Optional[str]
57+
pause_duration_seconds: float
58+
data_points_count: int
59+
60+
61+
def create_recording_router() -> APIRouter:
62+
"""
63+
Create and configure the recording API router.
64+
65+
Returns:
66+
APIRouter: Configured router
67+
"""
68+
router = APIRouter(prefix="/api/recordings", tags=["recordings"])
69+
70+
@router.post("/start")
71+
async def start_recording(
72+
request: StartRecordingRequest,
73+
db: Session = Depends(get_db)
74+
):
75+
"""Start a new recording session."""
76+
try:
77+
# Convert channels to dict format
78+
channels = [ch.model_dump() for ch in request.channels]
79+
80+
series = await recording_service_module.recording_service.start_recording(
81+
db=db,
82+
name=request.name,
83+
channels=channels,
84+
interval_seconds=request.interval_seconds,
85+
description=request.description
86+
)
87+
88+
return {
89+
"series_id": series.id,
90+
"name": series.name,
91+
"status": "recording"
92+
}
93+
except Exception as e:
94+
logger.error(f"Error starting recording: {e}")
95+
raise HTTPException(status_code=400, detail=str(e))
96+
97+
@router.post("/{series_id}/pause")
98+
async def pause_recording(series_id: int, db: Session = Depends(get_db)):
99+
"""Pause an active recording."""
100+
try:
101+
series = await recording_service_module.recording_service.pause_recording(db, series_id)
102+
if not series:
103+
raise HTTPException(status_code=404, detail="Recording not found")
104+
105+
return {
106+
"series_id": series.id,
107+
"name": series.name,
108+
"status": "paused",
109+
"paused_at": series.paused_at.isoformat() if series.paused_at else None
110+
}
111+
except ValueError as e:
112+
raise HTTPException(status_code=404, detail=str(e))
113+
except Exception as e:
114+
logger.error(f"Error pausing recording: {e}")
115+
raise HTTPException(status_code=400, detail=str(e))
116+
117+
@router.post("/{series_id}/resume")
118+
async def resume_recording(series_id: int, db: Session = Depends(get_db)):
119+
"""Resume a paused recording."""
120+
try:
121+
series = await recording_service_module.recording_service.resume_recording(db, series_id)
122+
if not series:
123+
raise HTTPException(status_code=404, detail="Recording not found")
124+
125+
return {
126+
"series_id": series.id,
127+
"name": series.name,
128+
"status": "recording",
129+
"paused_at": None
130+
}
131+
except ValueError as e:
132+
raise HTTPException(status_code=404, detail=str(e))
133+
except Exception as e:
134+
logger.error(f"Error resuming recording: {e}")
135+
raise HTTPException(status_code=400, detail=str(e))
136+
137+
@router.post("/{series_id}/stop")
138+
async def stop_recording(series_id: int, db: Session = Depends(get_db)):
139+
"""Stop an active recording."""
140+
# Check if recording exists first
141+
series = db.query(RecordingSeries).filter(
142+
RecordingSeries.id == series_id
143+
).first()
144+
145+
if not series:
146+
raise HTTPException(status_code=404, detail="Recording not found")
147+
148+
try:
149+
series = await recording_service_module.recording_service.stop_recording(db, series_id)
150+
151+
duration = 0
152+
if series.end_time and series.start_time:
153+
duration = (series.end_time - series.start_time).total_seconds()
154+
# Subtract pause duration
155+
duration -= series.pause_duration_seconds
156+
157+
return {
158+
"series_id": series.id,
159+
"name": series.name,
160+
"status": "stopped",
161+
"data_points_count": series.data_points_count,
162+
"duration_seconds": duration,
163+
"pause_duration_seconds": series.pause_duration_seconds
164+
}
165+
except Exception as e:
166+
logger.error(f"Error stopping recording: {e}")
167+
raise HTTPException(status_code=400, detail=str(e))
168+
169+
@router.get("")
170+
async def list_recordings(db: Session = Depends(get_db)):
171+
"""List all recording series."""
172+
series_list = db.query(RecordingSeries).order_by(
173+
RecordingSeries.start_time.desc()
174+
).all()
175+
176+
return {
177+
"series": [
178+
{
179+
"id": s.id,
180+
"name": s.name,
181+
"description": s.description,
182+
"start_time": s.start_time.isoformat(),
183+
"end_time": s.end_time.isoformat() if s.end_time else None,
184+
"data_points_count": s.data_points_count,
185+
"status": "paused" if s.paused_at else ("completed" if s.end_time else "recording")
186+
}
187+
for s in series_list
188+
]
189+
}
190+
191+
@router.get("/{series_id}")
192+
async def get_recording(series_id: int, db: Session = Depends(get_db)):
193+
"""Get details of a specific recording."""
194+
series = db.query(RecordingSeries).filter(
195+
RecordingSeries.id == series_id
196+
).first()
197+
198+
if not series:
199+
raise HTTPException(status_code=404, detail="Recording not found")
200+
201+
return {
202+
"id": series.id,
203+
"name": series.name,
204+
"description": series.description,
205+
"interval_seconds": series.interval_seconds,
206+
"channels": json.loads(series.channels),
207+
"start_time": series.start_time.isoformat(),
208+
"end_time": series.end_time.isoformat() if series.end_time else None,
209+
"paused_at": series.paused_at.isoformat() if series.paused_at else None,
210+
"pause_duration_seconds": series.pause_duration_seconds,
211+
"data_points_count": series.data_points_count
212+
}
213+
214+
@router.delete("/{series_id}")
215+
async def delete_recording(series_id: int, db: Session = Depends(get_db)):
216+
"""Delete a recording series and all its data points."""
217+
series = db.query(RecordingSeries).filter(
218+
RecordingSeries.id == series_id
219+
).first()
220+
221+
if not series:
222+
raise HTTPException(status_code=404, detail="Recording not found")
223+
224+
# Stop recording if active
225+
if series_id in recording_service_module.recording_service.get_active_recording_ids():
226+
await recording_service_module.recording_service.stop_recording(db, series_id)
227+
228+
db.delete(series)
229+
db.commit()
230+
231+
logger.info(f"Deleted recording: {series.name} (ID: {series_id})")
232+
233+
return {
234+
"status": "deleted",
235+
"series_id": series_id
236+
}
237+
238+
@router.get("/{series_id}/data")
239+
async def get_recording_data(
240+
series_id: int,
241+
offset: int = 0,
242+
limit: int = 1000,
243+
db: Session = Depends(get_db)
244+
):
245+
"""Get data points for a recording series."""
246+
series = db.query(RecordingSeries).filter(
247+
RecordingSeries.id == series_id
248+
).first()
249+
250+
if not series:
251+
raise HTTPException(status_code=404, detail="Recording not found")
252+
253+
# Get total count
254+
total_points = series.data_points_count
255+
256+
# Get paginated data points
257+
points = db.query(DataPoint).filter(
258+
DataPoint.series_id == series_id
259+
).order_by(
260+
DataPoint.timestamp
261+
).offset(offset).limit(limit).all()
262+
263+
# Format data
264+
data = [
265+
{
266+
"timestamp": point.timestamp.isoformat(),
267+
**json.loads(point.measurements)
268+
}
269+
for point in points
270+
]
271+
272+
return {
273+
"series_id": series_id,
274+
"total_points": total_points,
275+
"offset": offset,
276+
"limit": limit,
277+
"data": data
278+
}
279+
280+
@router.get("/{series_id}/export")
281+
async def export_recording(series_id: int, db: Session = Depends(get_db)):
282+
"""Export recording data to CSV."""
283+
series = db.query(RecordingSeries).filter(
284+
RecordingSeries.id == series_id
285+
).first()
286+
287+
if not series:
288+
raise HTTPException(status_code=404, detail="Recording not found")
289+
290+
# Get all data points
291+
points = db.query(DataPoint).filter(
292+
DataPoint.series_id == series_id
293+
).order_by(DataPoint.timestamp).all()
294+
295+
# Generate CSV
296+
output = io.StringIO()
297+
298+
if points:
299+
# Get all measurement keys from first point
300+
first_measurements = json.loads(points[0].measurements)
301+
fieldnames = ["timestamp"] + sorted(first_measurements.keys())
302+
303+
writer = csv.DictWriter(output, fieldnames=fieldnames)
304+
writer.writeheader()
305+
306+
for point in points:
307+
measurements = json.loads(point.measurements)
308+
row = {"timestamp": point.timestamp.isoformat()}
309+
row.update(measurements)
310+
writer.writerow(row)
311+
else:
312+
# Empty CSV with just headers
313+
channels = json.loads(series.channels)
314+
fieldnames = ["timestamp"] + [
315+
f"{ch['device_id']}.{ch['parameter']}" for ch in channels
316+
]
317+
writer = csv.DictWriter(output, fieldnames=fieldnames)
318+
writer.writeheader()
319+
320+
# Return CSV file
321+
output.seek(0)
322+
return StreamingResponse(
323+
iter([output.getvalue()]),
324+
media_type="text/csv",
325+
headers={
326+
"Content-Disposition": f"attachment; filename={series.name.replace(' ', '_')}.csv"
327+
}
328+
)
329+
330+
@router.websocket("/ws/{series_id}")
331+
async def websocket_endpoint(websocket: WebSocket, series_id: int):
332+
"""WebSocket endpoint for live data streaming."""
333+
await websocket.accept()
334+
335+
# Verify series exists
336+
from benchmesh_service.database import get_db_context
337+
with get_db_context() as db:
338+
series = db.query(RecordingSeries).filter(
339+
RecordingSeries.id == series_id
340+
).first()
341+
342+
if not series:
343+
await websocket.close(code=1008, reason="Recording not found")
344+
return
345+
346+
# Add websocket to recording service
347+
await recording_service_module.recording_service.add_websocket(series_id, websocket)
348+
349+
try:
350+
# Keep connection alive and wait for disconnect
351+
while True:
352+
# Receive message (mainly to detect disconnect)
353+
data = await websocket.receive_text()
354+
# Echo back for ping/pong
355+
await websocket.send_json({"type": "pong"})
356+
except WebSocketDisconnect:
357+
logger.debug(f"WebSocket disconnected for series {series_id}")
358+
except Exception as e:
359+
logger.error(f"WebSocket error for series {series_id}: {e}")
360+
finally:
361+
# Remove websocket from recording service
362+
await recording_service_module.recording_service.remove_websocket(series_id, websocket)
363+
364+
return router

0 commit comments

Comments
 (0)