Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,17 @@ bash tools/run_tts_clips.sh
Clip mode uses `~/blindnav_alert_clips` by default and falls back to
`espeak-ng` if a phrase clip is missing.

To test clip mode with optional non-speech proximity tones:

```bash
export BLINDNAV_LOG_UPLOAD=1
bash tools/run_tts_clips_tones.sh
```

This keeps speech for action-level alerts and adds local stereo pulses for
nearby hazards. Set `BLINDNAV_AUDIO_MODE=quiet|balanced|training` and
`BLINDNAV_TONE_VOLUME=0.0..1.0` to tune it.

To upload each completed run's CSV and event log to GitHub automatically:

```bash
Expand Down
238 changes: 237 additions & 1 deletion raspberry_pi/yolo_realsense_navigation.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,30 @@
QUEUE_CLEAR_WHEN_BUSY = os.environ.get(
"BLINDNAV_QUEUE_CLEAR_WHEN_BUSY", "0"
).strip().lower() in {"1", "true", "yes", "on"}
BLINDNAV_AUDIO_MODE = os.environ.get("BLINDNAV_AUDIO_MODE", "balanced").strip().lower()
if BLINDNAV_AUDIO_MODE not in {"quiet", "balanced", "training"}:
BLINDNAV_AUDIO_MODE = "balanced"
PROXIMITY_TONES_ENABLED = os.environ.get(
"BLINDNAV_PROXIMITY_TONES", "0"
).strip().lower() in {"1", "true", "yes", "on"}
PROXIMITY_TONES_DURING_VOICE = os.environ.get(
"BLINDNAV_TONES_DURING_VOICE", "1"
).strip().lower() in {"1", "true", "yes", "on"}
TONE_REPLACES_OBSTACLE_VOICE = os.environ.get(
"BLINDNAV_TONE_REPLACES_OBSTACLE_VOICE", "1"
).strip().lower() in {"1", "true", "yes", "on"}
PROXIMITY_TONE_VOLUME = max(
0.0, min(1.0, float(os.environ.get("BLINDNAV_TONE_VOLUME", "0.35")))
)
PROXIMITY_TONE_MIN_CM = 35
PROXIMITY_TONE_VOICE_KEEP_CM = 90
PROXIMITY_TONE_URGENT_VOICE_KEEP_CM = 100
PROXIMITY_TONE_BALANCED_MAX_CM = 200
PROXIMITY_TONE_TRAINING_MAX_CM = MAX_VOICE_DISTANCE_CM
PROXIMITY_TONE_MIN_INTERVAL_S = 0.18
PROXIMITY_TONE_MAX_INTERVAL_S = 1.40
PROXIMITY_TONE_DURATION_S = 0.055
PROXIMITY_TONE_SAMPLE_RATE = 22050

POSITION_CAMERA_HFOV_DEG = 69.0
POSITION_SIDE_ENTER_DEG = 13.1
Expand Down Expand Up @@ -351,6 +375,33 @@ def _within_voice_distance(obj, dist_cm):
return dist_cm <= MAX_VOICE_DISTANCE_CM


def _proximity_tone_max_distance(audio_mode=BLINDNAV_AUDIO_MODE):
if audio_mode == "training":
return PROXIMITY_TONE_TRAINING_MAX_CM
if audio_mode == "balanced":
return PROXIMITY_TONE_BALANCED_MAX_CM
return 0


def _proximity_tone_interval_s(dist_cm, audio_mode=BLINDNAV_AUDIO_MODE):
max_cm = _proximity_tone_max_distance(audio_mode)
if dist_cm is None or dist_cm < 0 or max_cm <= 0 or dist_cm > max_cm:
return None
span = max(1.0, max_cm - PROXIMITY_TONE_MIN_CM)
closeness = (max_cm - max(PROXIMITY_TONE_MIN_CM, dist_cm)) / span
closeness = max(0.0, min(1.0, closeness))
ratio = PROXIMITY_TONE_MIN_INTERVAL_S / PROXIMITY_TONE_MAX_INTERVAL_S
return PROXIMITY_TONE_MAX_INTERVAL_S * (ratio ** closeness)


def _proximity_tone_gains(pos):
if pos == "on your left":
return 1.0, 0.18
if pos == "on your right":
return 0.18, 1.0
return 0.85, 0.85


def _spoken_object_name(obj, tier):
if obj == "person":
return obj
Expand Down Expand Up @@ -506,6 +557,17 @@ def _queue_tier_for_voice_decision(tier, reason):
return tier


def _tone_replaces_voice(queue_tier, obj, reason, dist_cm, tones_enabled):
if (not tones_enabled or not TONE_REPLACES_OBSTACLE_VOICE
or queue_tier is None or obj == "person"):
return False
if dist_cm is None or dist_cm < 0:
return False
if queue_tier == "urgent":
return reason.startswith("ttc_") and dist_cm > PROXIMITY_TONE_URGENT_VOICE_KEEP_CM
return queue_tier in {"warning", "awareness"} and dist_cm > PROXIMITY_TONE_VOICE_KEEP_CM


def _repo_root():
return os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))

Expand Down Expand Up @@ -1457,6 +1519,161 @@ def shutdown(self, timeout=6.0):
worker.join(timeout=remaining)


# ============= PROXIMITY SONIFICATION (v3.31) =============
class ProximityTonePlayer:
"""
Optional non-speech proximity pulses.

Speech remains the safety/action channel. Tones are an ambient cue for
nearby hazards, mapped exponentially so close hazards get more precision
and far hazards stay quiet.
"""

def __init__(self, enabled=PROXIMITY_TONES_ENABLED,
audio_mode=BLINDNAV_AUDIO_MODE, volume=PROXIMITY_TONE_VOLUME,
allow_during_voice=PROXIMITY_TONES_DURING_VOICE,
event_logger=None, _player_fn=None):
self.enabled = bool(enabled) and audio_mode != "quiet"
self.audio_mode = audio_mode
self.volume = max(0.0, min(1.0, float(volume)))
self.allow_during_voice = bool(allow_during_voice)
self._event_logger = event_logger
self._player_fn = _player_fn
self._lock = threading.Lock()
self._last_pulse = 0.0
self._thread = None
self._last_log = 0.0
self._shutting_down = False

def update(self, threats, frame_width=640, frame_tag=None, voice_busy=False):
if not self.enabled:
return
selected = self._select_track(threats, frame_width, frame_tag)
if selected is None:
return
score, track, pos, interval = selected
if voice_busy and not self._should_play_while_voice(score, track, pos):
return
now = time.time()
with self._lock:
if self._shutting_down:
return
if self._thread is not None and self._thread.is_alive():
return
if now - self._last_pulse < interval:
return
self._last_pulse = now
left_gain, right_gain = _proximity_tone_gains(pos)
dist_cm = track.distance
freq = 1120 if dist_cm <= 70 else 880
duration = 0.075 if dist_cm <= 70 else PROXIMITY_TONE_DURATION_S
self._thread = threading.Thread(
target=self._play_worker,
args=(left_gain, right_gain, freq, duration, track.class_name,
pos, dist_cm, interval, score, voice_busy),
daemon=True,
)
self._thread.start()

def _should_play_while_voice(self, score, track, pos):
if not self.allow_during_voice:
return False
dist_cm = getattr(track, "distance", None)
if dist_cm is None or dist_cm < 0:
return False
side_person = (
track.class_name == "person"
and pos != "ahead"
and dist_cm <= SIDE_PASS_PERSON_AWARE_CM
)
return side_person or dist_cm <= SIDE_PASS_PERSON_WARN_CM or score >= 50

def _select_track(self, threats, frame_width, frame_tag):
max_cm = _proximity_tone_max_distance(self.audio_mode)
if max_cm <= 0:
return None
for score, track in threats:
dist_cm = getattr(track, "distance", None)
if dist_cm is None or dist_cm < 0 or dist_cm > max_cm:
continue
if getattr(track, "seen_frames", 0) < 3:
continue
pos = get_position(track, frame_width=frame_width, frame_tag=frame_tag)
side_person = (
track.class_name == "person"
and pos != "ahead"
and dist_cm <= SIDE_PASS_PERSON_AWARE_CM
)
if not (dist_cm <= WARNING_DISTANCE or score >= 10 or side_person):
continue
interval = _proximity_tone_interval_s(dist_cm, self.audio_mode)
if interval is None:
continue
return score, track, pos, interval
return None

def _play_worker(self, left_gain, right_gain, freq, duration,
obj, pos, dist_cm, interval, score, voice_busy):
wav_path = None
try:
wav_path = self._write_tone_wav(left_gain, right_gain, freq, duration)
now = time.time()
if self._event_logger and now - self._last_log > 2.0:
self._last_log = now
self._event_logger(
f"[TONE] {obj} {pos}: dist={dist_cm}cm "
f"interval={interval:.2f}s score={score:.1f} "
f"mode={self.audio_mode} voice_busy={int(bool(voice_busy))}"
)
if self._player_fn is not None:
proc = self._player_fn(wav_path)
else:
proc = subprocess.Popen(["aplay", wav_path], stderr=subprocess.DEVNULL)
proc.wait()
except Exception as exc:
if self._event_logger:
self._event_logger(f"[TONE] error: {exc}")
finally:
try:
if wav_path:
os.unlink(wav_path)
except Exception:
pass

def _write_tone_wav(self, left_gain, right_gain, freq, duration):
frames = max(1, int(PROXIMITY_TONE_SAMPLE_RATE * duration))
fade_frames = max(1, int(PROXIMITY_TONE_SAMPLE_RATE * 0.006))
amp = int(32767 * self.volume)
fd, path = tempfile.mkstemp(prefix="blindnav_tone_", suffix=".wav")
os.close(fd)
with wave.open(path, "wb") as wav:
wav.setnchannels(2)
wav.setsampwidth(2)
wav.setframerate(PROXIMITY_TONE_SAMPLE_RATE)
buf = bytearray()
for i in range(frames):
phase = 2.0 * math.pi * freq * (i / PROXIMITY_TONE_SAMPLE_RATE)
env = 1.0
if i < fade_frames:
env = i / fade_frames
elif i > frames - fade_frames:
env = max(0.0, (frames - i) / fade_frames)
sample = int(math.sin(phase) * amp * env)
left = int(sample * left_gain)
right = int(sample * right_gain)
buf.extend(left.to_bytes(2, "little", signed=True))
buf.extend(right.to_bytes(2, "little", signed=True))
wav.writeframes(bytes(buf))
return path

def shutdown(self, timeout=1.0):
with self._lock:
self._shutting_down = True
thread = self._thread
if thread is not None:
thread.join(timeout=max(0.0, timeout))


# ============= THREAT TRANSITION TRACKER =============
class ThreatTransitionTracker:
MIN_THREAT_FRAMES = 12
Expand Down Expand Up @@ -2613,6 +2830,7 @@ def log_event(msg):

motion = MotionDetector()
voice = VoiceAssistant(event_logger=log_event)
tones = ProximityTonePlayer(event_logger=log_event)
scene = SceneDescriber(voice, start_keyboard=not VOICE_INPUT_ENABLED)
nav_snapshot = NavigationSnapshot()
command_router = CommandRouter(voice, scene, nav_snapshot, event_logger=log_event)
Expand Down Expand Up @@ -2711,6 +2929,12 @@ def _capture_worker():
f"clip_live_piper={'on' if CLIP_MODE_ALLOW_LIVE_PIPER else 'off'} | "
f"silence={FAST_ALERT_SILENCE_MS if ALERT_TTS_MODE in {'espeak', 'clips'} else PIPER_SILENCE_MS}ms\n"
)
print(
f"[AUDIO] mode={BLINDNAV_AUDIO_MODE} | "
f"proximity_tones={'on' if tones.enabled else 'off'} | "
f"tone_max={_proximity_tone_max_distance(BLINDNAV_AUDIO_MODE)}cm | "
f"tone_volume={PROXIMITY_TONE_VOLUME:.2f}"
)

frame_count = 0
last_threat_print = time.time()
Expand Down Expand Up @@ -2881,7 +3105,14 @@ def log_voice_policy(now, track, pos, score, motion_eval, reason, tier="NONE"):
reason = decision["reason"]
queue_tier = _queue_tier_for_voice_decision(tier, reason)

if queue_tier == "urgent":
if _tone_replaces_voice(
queue_tier, obj, reason, dist_cm, tones.enabled):
log_voice_policy(
now, track, pos, score, motion_eval,
reason="tone_replaces_obstacle_voice",
tier=(queue_tier or "NONE").upper(),
)
elif queue_tier == "urgent":
voice.speak_urgent(
msg, key=_voice_key(pos, obj,
"side_pass_urg" if reason == "side_pass_warning"
Expand Down Expand Up @@ -2920,6 +3151,10 @@ def log_voice_policy(now, track, pos, score, motion_eval, reason, tier="NONE"):
print(line)
log_event(line)

tones.update(
threats, frame_width=w, frame_tag=frame_count,
voice_busy=voice.status_summary() != "idle")

# Cleanup stale zone keys
if frame_count % 60 == 0:
active_keys = set()
Expand Down Expand Up @@ -3036,6 +3271,7 @@ def log_voice_policy(now, track, pos, score, motion_eval, reason, tier="NONE"):
pipeline.stop()
except Exception as exc:
print(f"[CLEANUP] pipeline.stop(): {exc}")
tones.shutdown(timeout=1.0)
voice.shutdown(timeout=6.0)
csv_file.close()
event_file.close()
Expand Down
Loading