def _env_float_clamped(name, default, low=0.0, high=1.0):
    """Read a float setting from the environment, clamped to [low, high].

    Returns *default* when the variable is unset, cannot be parsed as a
    float, or parses to NaN. A typo in an environment variable must not stop
    the navigator from starting, and NaN must not slip through the clamp
    (``min(1.0, nan)`` evaluates to 1.0, i.e. full volume).
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        value = float(raw)
    except ValueError:
        return default
    if value != value:  # NaN is the only float that is not equal to itself
        return default
    return max(low, min(high, value))


# Audio profile. Unknown values fall back to "balanced".
BLINDNAV_AUDIO_MODE = os.environ.get("BLINDNAV_AUDIO_MODE", "balanced").strip().lower()
if BLINDNAV_AUDIO_MODE not in {"quiet", "balanced", "training"}:
    BLINDNAV_AUDIO_MODE = "balanced"

# Master switch for the non-speech proximity pulses (off unless requested).
PROXIMITY_TONES_ENABLED = os.environ.get(
    "BLINDNAV_PROXIMITY_TONES", "0"
).strip().lower() in {"1", "true", "yes", "on"}

# Whether pulses may play while the voice channel is busy (on by default).
PROXIMITY_TONES_DURING_VOICE = os.environ.get(
    "BLINDNAV_TONES_DURING_VOICE", "1"
).strip().lower() in {"1", "true", "yes", "on"}

# Whether a pulse may stand in for a spoken obstacle alert (on by default).
TONE_REPLACES_OBSTACLE_VOICE = os.environ.get(
    "BLINDNAV_TONE_REPLACES_OBSTACLE_VOICE", "1"
).strip().lower() in {"1", "true", "yes", "on"}

# Pulse loudness as a fraction of full scale, always within 0.0..1.0.
PROXIMITY_TONE_VOLUME = _env_float_clamped("BLINDNAV_TONE_VOLUME", 0.35)

# Distances at or below this are treated as "closest" by the interval curve.
PROXIMITY_TONE_MIN_CM = 35
# Warning/awareness speech is only replaced by a tone beyond this distance.
PROXIMITY_TONE_VOICE_KEEP_CM = 90
# Urgent speech is only replaced by a tone beyond this distance.
PROXIMITY_TONE_URGENT_VOICE_KEEP_CM = 100
# Farthest distance that still produces pulses in "balanced" mode.
PROXIMITY_TONE_BALANCED_MAX_CM = 200
# "training" mode pulses out to the same range as voice alerts.
PROXIMITY_TONE_TRAINING_MAX_CM = MAX_VOICE_DISTANCE_CM
# Shortest gap between pulses (used for the closest hazards).
PROXIMITY_TONE_MIN_INTERVAL_S = 0.18
# Longest gap between pulses (used at the far edge of the tone range).
PROXIMITY_TONE_MAX_INTERVAL_S = 1.40
# Length of a standard pulse.
PROXIMITY_TONE_DURATION_S = 0.055
# Sample rate of the synthesized pulse WAV.
PROXIMITY_TONE_SAMPLE_RATE = 22050


def _proximity_tone_max_distance(audio_mode=BLINDNAV_AUDIO_MODE):
    """Return the farthest distance (cm) that produces tones in *audio_mode*.

    Any mode other than "training" or "balanced" (i.e. "quiet") yields 0,
    which callers treat as "tones disabled".
    """
    if audio_mode == "training":
        return PROXIMITY_TONE_TRAINING_MAX_CM
    if audio_mode == "balanced":
        return PROXIMITY_TONE_BALANCED_MAX_CM
    return 0


def _proximity_tone_interval_s(dist_cm, audio_mode=BLINDNAV_AUDIO_MODE):
    """Map a hazard distance to the gap between pulses, in seconds.

    The gap shrinks exponentially from PROXIMITY_TONE_MAX_INTERVAL_S at the
    mode's maximum range down to PROXIMITY_TONE_MIN_INTERVAL_S at
    PROXIMITY_TONE_MIN_CM or closer.

    Returns None when no tone should play: tones are off for this mode, or
    the distance is missing, negative, NaN, or beyond the mode's range.
    """
    max_cm = _proximity_tone_max_distance(audio_mode)
    if dist_cm is None or max_cm <= 0:
        return None
    # A chained comparison is False for NaN, so an invalid depth reading is
    # rejected here instead of being clamped to the fastest pulse rate.
    if not (0 <= dist_cm <= max_cm):
        return None
    span = max(1.0, max_cm - PROXIMITY_TONE_MIN_CM)
    closeness = (max_cm - max(PROXIMITY_TONE_MIN_CM, dist_cm)) / span
    closeness = max(0.0, min(1.0, closeness))
    ratio = PROXIMITY_TONE_MIN_INTERVAL_S / PROXIMITY_TONE_MAX_INTERVAL_S
    return PROXIMITY_TONE_MAX_INTERVAL_S * (ratio ** closeness)


def _proximity_tone_gains(pos):
    """Return (left_gain, right_gain) stereo weights for a position phrase.

    Side positions are panned hard toward that ear; anything else is played
    equally in both channels.
    """
    if pos == "on your left":
        return 1.0, 0.18
    if pos == "on your right":
        return 0.18, 1.0
    return 0.85, 0.85


def _tone_replaces_voice(queue_tier, obj, reason, dist_cm, tones_enabled):
    """Decide whether a proximity tone stands in for a spoken obstacle alert.

    Speech is always kept for people, when tones or the replacement policy
    are off, when nothing was going to be queued, and when the distance is
    unknown. Otherwise the tone replaces:
      * urgent alerts only when the reason is a "ttc_" one and the obstacle
        is farther than PROXIMITY_TONE_URGENT_VOICE_KEEP_CM;
      * warning/awareness alerts farther than PROXIMITY_TONE_VOICE_KEEP_CM.
    """
    if (not tones_enabled or not TONE_REPLACES_OBSTACLE_VOICE
            or queue_tier is None or obj == "person"):
        return False
    if dist_cm is None or dist_cm < 0:
        return False
    if queue_tier == "urgent":
        # A missing or non-string reason keeps the spoken alert instead of
        # raising in the middle of the alert path.
        return (isinstance(reason, str) and reason.startswith("ttc_")
                and dist_cm > PROXIMITY_TONE_URGENT_VOICE_KEEP_CM)
    return queue_tier in {"warning", "awareness"} and dist_cm > PROXIMITY_TONE_VOICE_KEEP_CM
# ============= PROXIMITY SONIFICATION (v3.31) =============
class ProximityTonePlayer:
    """
    Optional non-speech proximity pulses.

    Speech remains the safety/action channel. Tones are an ambient cue for
    nearby hazards, mapped exponentially so close hazards get more precision
    and far hazards stay quiet.

    At most one pulse plays at a time: ``update`` starts a short worker
    thread per pulse and will not start another while that thread is alive.
    """

    # Upper bound on one real ``aplay`` run. Because ``update`` refuses to
    # start a pulse while the previous worker is alive, a player that never
    # exits would otherwise silence the tone channel for the rest of the run.
    _PLAYBACK_TIMEOUT_S = 3.0

    def __init__(self, enabled=PROXIMITY_TONES_ENABLED,
                 audio_mode=BLINDNAV_AUDIO_MODE, volume=PROXIMITY_TONE_VOLUME,
                 allow_during_voice=PROXIMITY_TONES_DURING_VOICE,
                 event_logger=None, _player_fn=None):
        """
        enabled: master switch; forced off when audio_mode is "quiet".
        audio_mode: selects the maximum tone distance.
        volume: pulse amplitude, clamped to 0.0..1.0.
        allow_during_voice: permit pulses while the voice channel is busy.
        event_logger: optional callable taking one message string.
        _player_fn: test hook; called with the WAV path and must return an
            object with a ``wait()`` method.
        """
        self.enabled = bool(enabled) and audio_mode != "quiet"
        self.audio_mode = audio_mode
        self.volume = max(0.0, min(1.0, float(volume)))
        self.allow_during_voice = bool(allow_during_voice)
        self._event_logger = event_logger
        self._player_fn = _player_fn
        self._lock = threading.Lock()
        # Timestamps are monotonic-clock readings; -inf means "never", so the
        # first pulse and first log line are always allowed.
        self._last_pulse = float("-inf")
        self._thread = None
        self._last_log = float("-inf")
        self._shutting_down = False

    def update(self, threats, frame_width=640, frame_tag=None, voice_busy=False):
        """Start one pulse for the first eligible threat, if one is due.

        threats: iterable of (score, track) pairs, examined in order.
        voice_busy: True while speech is playing or queued; most pulses are
            suppressed then (see ``_should_play_while_voice``).
        """
        if not self.enabled:
            return
        selected = self._select_track(threats, frame_width, frame_tag)
        if selected is None:
            return
        score, track, pos, interval = selected
        if voice_busy and not self._should_play_while_voice(score, track, pos):
            return
        # Monotonic time: a wall-clock step (e.g. a time sync after boot) must
        # not stretch or collapse the gap between pulses.
        now = time.monotonic()
        with self._lock:
            if self._shutting_down:
                return
            if self._thread is not None and self._thread.is_alive():
                return
            if now - self._last_pulse < interval:
                return
            self._last_pulse = now
            left_gain, right_gain = _proximity_tone_gains(pos)
            dist_cm = track.distance
            # Very close hazards get a higher, slightly longer pulse.
            freq = 1120 if dist_cm <= 70 else 880
            duration = 0.075 if dist_cm <= 70 else PROXIMITY_TONE_DURATION_S
            self._thread = threading.Thread(
                target=self._play_worker,
                args=(left_gain, right_gain, freq, duration, track.class_name,
                      pos, dist_cm, interval, score, voice_busy),
                daemon=True,
            )
            self._thread.start()

    def _should_play_while_voice(self, score, track, pos):
        """Return True if this pulse is important enough to overlap speech."""
        if not self.allow_during_voice:
            return False
        dist_cm = getattr(track, "distance", None)
        if dist_cm is None or dist_cm < 0:
            return False
        side_person = (
            track.class_name == "person"
            and pos != "ahead"
            and dist_cm <= SIDE_PASS_PERSON_AWARE_CM
        )
        return side_person or dist_cm <= SIDE_PASS_PERSON_WARN_CM or score >= 50

    def _select_track(self, threats, frame_width, frame_tag):
        """Return (score, track, pos, interval) for the first eligible threat.

        Returns None when tones are off for this mode or nothing qualifies.
        """
        max_cm = _proximity_tone_max_distance(self.audio_mode)
        if max_cm <= 0:
            return None
        for score, track in threats or ():
            dist_cm = getattr(track, "distance", None)
            # The chained comparison also rejects NaN depth readings, which
            # would otherwise pass both range checks.
            if dist_cm is None or not (0 <= dist_cm <= max_cm):
                continue
            if getattr(track, "seen_frames", 0) < 3:
                continue
            pos = get_position(track, frame_width=frame_width, frame_tag=frame_tag)
            side_person = (
                track.class_name == "person"
                and pos != "ahead"
                and dist_cm <= SIDE_PASS_PERSON_AWARE_CM
            )
            if not (dist_cm <= WARNING_DISTANCE or score >= 10 or side_person):
                continue
            interval = _proximity_tone_interval_s(dist_cm, self.audio_mode)
            if interval is None:
                continue
            return score, track, pos, interval
        return None

    def _play_worker(self, left_gain, right_gain, freq, duration,
                     obj, pos, dist_cm, interval, score, voice_busy):
        """Worker-thread body: synthesize one pulse, play it, delete the file.

        Failures are reported through the event logger and never propagate,
        so a broken audio device cannot take down the caller.
        """
        wav_path = None
        try:
            wav_path = self._write_tone_wav(left_gain, right_gain, freq, duration)
            now = time.monotonic()
            # Log at most one pulse every two seconds to keep the event log
            # readable.
            if self._event_logger and now - self._last_log > 2.0:
                self._last_log = now
                self._event_logger(
                    f"[TONE] {obj} {pos}: dist={dist_cm}cm "
                    f"interval={interval:.2f}s score={score:.1f} "
                    f"mode={self.audio_mode} voice_busy={int(bool(voice_busy))}"
                )
            if self._player_fn is not None:
                # Injected players only promise a bare ``wait()``.
                self._player_fn(wav_path).wait()
            else:
                self._run_aplay(wav_path)
        except Exception as exc:
            if self._event_logger:
                self._event_logger(f"[TONE] error: {exc}")
        finally:
            if wav_path:
                try:
                    os.unlink(wav_path)
                except OSError:
                    pass

    def _run_aplay(self, wav_path):
        """Play *wav_path* with ``aplay``, killing it if it overruns.

        Raises subprocess.TimeoutExpired after the player has been killed and
        reaped, so the caller logs the stall.
        """
        proc = subprocess.Popen(["aplay", wav_path], stderr=subprocess.DEVNULL)
        try:
            proc.wait(timeout=self._PLAYBACK_TIMEOUT_S)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
            raise

    def _write_tone_wav(self, left_gain, right_gain, freq, duration):
        """Write a 16-bit stereo sine pulse to a temp WAV and return its path.

        The caller owns the returned file and must delete it. If writing
        fails, the temp file is removed here before the error propagates.
        """
        frames = max(1, int(PROXIMITY_TONE_SAMPLE_RATE * duration))
        # Short linear fade in and out of the pulse.
        fade_frames = max(1, int(PROXIMITY_TONE_SAMPLE_RATE * 0.006))
        amp = int(32767 * self.volume)
        fd, path = tempfile.mkstemp(prefix="blindnav_tone_", suffix=".wav")
        os.close(fd)
        try:
            with wave.open(path, "wb") as wav:
                wav.setnchannels(2)
                wav.setsampwidth(2)
                wav.setframerate(PROXIMITY_TONE_SAMPLE_RATE)
                buf = bytearray()
                for i in range(frames):
                    phase = 2.0 * math.pi * freq * (i / PROXIMITY_TONE_SAMPLE_RATE)
                    env = 1.0
                    if i < fade_frames:
                        env = i / fade_frames
                    elif i > frames - fade_frames:
                        env = max(0.0, (frames - i) / fade_frames)
                    sample = int(math.sin(phase) * amp * env)
                    left = int(sample * left_gain)
                    right = int(sample * right_gain)
                    buf.extend(left.to_bytes(2, "little", signed=True))
                    buf.extend(right.to_bytes(2, "little", signed=True))
                wav.writeframes(bytes(buf))
        except BaseException:
            # The path was never handed to the caller, so nobody else can
            # clean it up.
            try:
                os.unlink(path)
            except OSError:
                pass
            raise
        return path

    def shutdown(self, timeout=1.0):
        """Block new pulses and wait up to *timeout* seconds for the current one."""
        with self._lock:
            self._shutting_down = True
            thread = self._thread
        if thread is not None:
            thread.join(timeout=max(0.0, timeout))
def test_tones_can_replace_non_person_voice_when_enabled(self):
    """With tones on, only distant non-person obstacle alerts are replaced."""
    # (queue_tier, obj, reason, dist_cm, expect_replaced)
    scenarios = (
        # Distant chair warning: the tone stands in for speech.
        ("warning", "chair", "ttc_warning", 120, True),
        # People are always announced by voice.
        ("warning", "person", "ttc_warning", 120, False),
        # Urgent alert inside the keep-voice distance stays spoken.
        ("urgent", "chair", "ttc_urgent_neutral", 80, False),
    )
    for queue_tier, obj, reason, dist_cm, expect_replaced in scenarios:
        outcome = MOD._tone_replaces_voice(
            queue_tier, obj, reason, dist_cm, tones_enabled=True)
        assert bool(outcome) is expect_replaced
# ============================================================
# Proximity sonification
# ============================================================

class TestProximityTonePlayer:
    """Tests for the tone interval curve, stereo panning, and the player."""

    def _track(self, distance=100, class_name="person", seen_frames=3,
               box=None, tid=1):
        """Build a minimal stand-in track with the attributes the player reads."""
        return types.SimpleNamespace(
            id=tid,
            class_name=class_name,
            distance=distance,
            seen_frames=seen_frames,
            box=box or [0, 120, 80, 320],
        )

    def test_interval_is_exponential_and_faster_when_close(self):
        far = MOD._proximity_tone_interval_s(190, "balanced")
        near = MOD._proximity_tone_interval_s(50, "balanced")
        assert far is not None and near is not None
        assert near < far

        # Pin the curve itself, not just its direction: both endpoints, the
        # clamp below the minimum distance, and the geometric midpoint that
        # distinguishes an exponential mapping from a linear one.
        lo_cm = MOD.PROXIMITY_TONE_MIN_CM
        hi_cm = MOD.PROXIMITY_TONE_BALANCED_MAX_CM
        fastest = MOD.PROXIMITY_TONE_MIN_INTERVAL_S
        slowest = MOD.PROXIMITY_TONE_MAX_INTERVAL_S
        at_lo = MOD._proximity_tone_interval_s(lo_cm, "balanced")
        at_hi = MOD._proximity_tone_interval_s(hi_cm, "balanced")
        below_lo = MOD._proximity_tone_interval_s(lo_cm / 2, "balanced")
        at_mid = MOD._proximity_tone_interval_s((lo_cm + hi_cm) / 2, "balanced")
        assert abs(at_lo - fastest) < 1e-9
        assert abs(at_hi - slowest) < 1e-9
        assert abs(below_lo - fastest) < 1e-9
        assert abs(at_mid - (fastest * slowest) ** 0.5) < 1e-9

    def test_balanced_mode_suppresses_far_tones(self):
        assert MOD._proximity_tone_interval_s(260, "balanced") is None
        assert MOD._proximity_tone_interval_s(260, "training") is not None

    def test_directional_gains_pan_left_and_right(self):
        left = MOD._proximity_tone_gains("on your left")
        right = MOD._proximity_tone_gains("on your right")
        ahead = MOD._proximity_tone_gains("ahead")
        assert left[0] > left[1]
        assert right[1] > right[0]
        assert ahead[0] == ahead[1]

    def test_tone_drops_when_voice_busy(self):
        player = fast_player(0.01)
        tones = MOD.ProximityTonePlayer(
            enabled=True,
            audio_mode="balanced",
            allow_during_voice=False,
            _player_fn=player,
        )
        track = self._track(distance=90)
        tones.update([(50, track)], frame_width=640, frame_tag=1, voice_busy=True)
        # Join any worker that was (wrongly) started instead of sleeping for a
        # fixed time: a fixed sleep can end before a slow worker reaches the
        # player and let a regression pass.
        tones.shutdown(timeout=1.0)
        assert len(player.procs) == 0

    def test_tone_can_play_for_side_person_while_voice_busy(self):
        events = []
        player = fast_player(0.01)
        tones = MOD.ProximityTonePlayer(
            enabled=True,
            audio_mode="balanced",
            allow_during_voice=True,
            event_logger=events.append,
            _player_fn=player,
        )
        track = self._track(distance=130, class_name="person")
        # NOTE(review): score=50 also satisfies the player's `score >= 50`
        # voice-overlap rule, so this test does not isolate the side-person
        # rule on its own - confirm whether a lower score is intended.
        tones.update([(50, track)], frame_width=640, frame_tag=1, voice_busy=True)
        tones.shutdown(timeout=1.0)
        assert len(player.procs) == 1
        assert any("voice_busy=1" in msg for msg in events)

    def test_tone_plays_for_near_actionable_track(self):
        events = []
        player = fast_player(0.01)
        tones = MOD.ProximityTonePlayer(
            enabled=True,
            audio_mode="balanced",
            event_logger=events.append,
            _player_fn=player,
        )
        track = self._track(distance=90)
        tones.update([(50, track)], frame_width=640, frame_tag=1, voice_busy=False)
        tones.shutdown(timeout=1.0)
        assert len(player.procs) == 1
        assert any("[TONE]" in msg and "dist=90cm" in msg for msg in events)