Merged
32 changes: 4 additions & 28 deletions README.md
@@ -34,34 +34,6 @@ The following attributes are available for the `viam:audio:microphone` model:
| `latency` | int | **Optional** | Suggested input latency in milliseconds. This controls how much audio PortAudio buffers before making it available. Lower values (5-20ms) provide more responsive audio capture but use more CPU time. Higher values (50-100ms) are more stable but less responsive. If not specified, uses the device's default low latency setting (typically 10-20ms). |
| `historical_throttle_ms` | int | **Optional** | Delay in milliseconds between chunks when streaming historical audio data using the previous_timestamp parameter (default: 50ms). Gives clients adequate time to process buffered audio data. |

### Reconfiguration Behavior

The microphone component supports reconfiguration - you can change stream attributes without restarting the audio stream RPC calls. When you reconfigure:

Should we have a short note here explaining the new "reconfigure" contract?

Any config change will terminate in-flight get_audio streams. The client needs to handle the error and resubmit a get_audio request.


- Active `get_audio()` calls will automatically transition to the new configuration
- There may be a brief gap in audio during the transition

#### Important Considerations

1. **Writing to fixed-format files (WAV, MP3, etc.)**
- WAV files have a fixed header with sample rate and channel count
- Changing `sample_rate` or `num_channels` mid-stream will corrupt the file
- **Solution:** Stop recording, save the file, then reconfigure and start a new file

2. **During active audio encoding**
- If you're encoding the streamed audio (e.g., to OPUS, AAC), changing `sample_rate` or `num_channels` will break the initialized encoder
- **Solution:** Reinitialize the encoder when reconfigurations occur

**No client-side handling required:**
- When streaming audio chunks that are processed independently
- Changing `device_name` to switch microphones
- Adjusting `latency` for performance tuning
- Between `get_audio` RPC calls

**Clients should:**
- Monitor the `audio_info` field in each audio chunk
- Detect when `sample_rate` or `num_channels` changes
- Handle the transition appropriately

## Model viam:audio:speaker
### Configuration
@@ -157,6 +129,10 @@ All audio data uses **little-endian** byte order. The specific format depends on
- **Microphone (`get_audio`)**: Returns audio data in interleaved format
- **Speaker (`play`)**: Expects audio data in interleaved format
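
As an illustration of the interleaved, little-endian layout described above, here is a minimal sketch that splits a byte buffer into per-channel samples. It assumes 16-bit signed PCM; the sample format is an assumption, not something this hunk states, and `deinterleave_pcm16` is a hypothetical helper that is not part of the module.

```cpp
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <vector>

// Hypothetical helper (not part of this module): splits interleaved
// little-endian 16-bit PCM bytes into one sample vector per channel.
// ASSUMPTION: samples are 16-bit signed PCM; other codecs need other decoding.
std::vector<std::vector<int16_t>> deinterleave_pcm16(const std::vector<uint8_t>& bytes, int num_channels) {
    if (num_channels <= 0) {
        throw std::invalid_argument("num_channels must be positive");
    }
    // One frame holds one 2-byte sample for every channel.
    const std::size_t frame_bytes = 2 * static_cast<std::size_t>(num_channels);
    if (bytes.size() % frame_bytes != 0) {
        throw std::invalid_argument("buffer ends in a partial frame");
    }
    const std::size_t frames = bytes.size() / frame_bytes;

    std::vector<std::vector<int16_t>> channels(static_cast<std::size_t>(num_channels));
    for (auto& ch : channels) {
        ch.reserve(frames);
    }
    for (std::size_t f = 0; f < frames; ++f) {
        for (int c = 0; c < num_channels; ++c) {
            const std::size_t i = f * frame_bytes + 2 * static_cast<std::size_t>(c);
            // Little-endian: low byte first, then high byte.
            const uint16_t raw = static_cast<uint16_t>(bytes[i] | (bytes[i + 1] << 8));
            channels[static_cast<std::size_t>(c)].push_back(static_cast<int16_t>(raw));
        }
    }
    return channels;
}
```

For a stereo buffer, frame `f` stores the left sample at byte offset `4 * f` and the right sample at `4 * f + 2`.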

## Reconfigure Behavior

Any config change terminates in-flight streams. Callers must handle the error
and resubmit the request to resume.
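
One way a caller might follow this contract is sketched below. `stream_with_resubmit` and its `start_stream` parameter are hypothetical names: `start_stream` stands in for whatever client call opens a stream and blocks until it ends, and it is assumed to throw when the stream is terminated. The real error type depends on the SDK in use.

```cpp
#include <functional>
#include <stdexcept>

// Hypothetical wrapper (not part of this module or the SDK).
// ASSUMPTION: start_stream returns normally when the stream completes and
// throws a std::exception when the stream is terminated early, for example
// by a config change.
// Returns the number of attempts it took for the stream to complete.
int stream_with_resubmit(const std::function<void()>& start_stream, int max_attempts) {
    for (int attempt = 1; attempt <= max_attempts; ++attempt) {
        try {
            start_stream();
            return attempt;
        } catch (const std::exception&) {
            // The in-flight stream was terminated; the next loop iteration
            // resubmits the request against the new configuration.
        }
    }
    throw std::runtime_error("stream did not complete within max_attempts");
}
```

A real client would likely inspect the error before retrying and wait briefly between attempts, since the resource may still be restarting.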

## Setup
```bash
1 change: 0 additions & 1 deletion src/discovery.cpp
@@ -3,7 +3,6 @@
#include <iostream>
#include <sstream>
#include <viam/sdk/config/resource.hpp>
#include <viam/sdk/resource/reconfigurable.hpp>
#include <viam/sdk/services/discovery.hpp>
#include "microphone.hpp"
#include "portaudio.hpp"
1 change: 0 additions & 1 deletion src/discovery.hpp
@@ -2,7 +2,6 @@
#include <memory>

#include <viam/sdk/config/resource.hpp>
#include <viam/sdk/resource/reconfigurable.hpp>
#include <viam/sdk/services/discovery.hpp>
#include "device_id.hpp"
#include "portaudio.hpp"
111 changes: 8 additions & 103 deletions src/microphone.cpp
@@ -68,23 +68,6 @@ static int calculate_chunk_size(const audio::codec::AudioCodec codec,
}
}

// RAII guard to automatically increment and decrement the stream counter
// during get_audio calls
class StreamGuard {
std::mutex& mutex_;
int& counter_;

public:
StreamGuard(std::mutex& m, int& c) : mutex_(m), counter_(c) {
std::lock_guard<std::mutex> lock(mutex_);
counter_++;
}
~StreamGuard() {
std::lock_guard<std::mutex> lock(mutex_);
counter_--;
}
};

// === Microphone Class Implementation ===

void Microphone::restart_stalled_stream(const std::shared_ptr<audio::InputStreamContext>& stream_context) {
@@ -137,7 +120,6 @@ void Microphone::restart_stalled_stream(const std::shared_ptr<audio::InputStream

void Microphone::setup_stream_params(AudioCodec codec_enum,
MP3EncoderContext& mp3_ctx,
bool is_reconfigure,
int& stream_sample_rate,
int& requested_sample_rate,
int& stream_num_channels,
@@ -153,11 +135,7 @@ void Microphone::setup_stream_params(AudioCodec codec_enum,
stream_historical_throttle_ms = historical_throttle_ms_;
}

// Initialize or reinitialize MP3 encoder if needed
if (codec_enum == AudioCodec::MP3) {
if (is_reconfigure) {
cleanup_mp3_encoder(mp3_ctx);
}
initialize_mp3_encoder(mp3_ctx, requested_sample_rate, stream_num_channels);
}

@@ -182,7 +160,7 @@ void Microphone::setup_stream_params(AudioCodec codec_enum,
}

Microphone::Microphone(viam::sdk::Dependencies deps, viam::sdk::ResourceConfig cfg, audio::portaudio::PortAudioInterface* pa)
: viam::sdk::AudioIn(cfg.name()), stream_(nullptr), pa_(pa), active_streams_(0), restart_attempts_(0) {
: viam::sdk::AudioIn(cfg.name()), stream_(nullptr), pa_(pa), restart_attempts_(0) {
#ifdef __APPLE__
if (geteuid() == 0) {
std::ostringstream error_msg;
@@ -311,57 +289,6 @@ std::vector<std::string> Microphone::validate(viam::sdk::ResourceConfig cfg) {
return {};
}

void Microphone::reconfigure(const viam::sdk::Dependencies& deps, const viam::sdk::ResourceConfig& cfg) {
VIAM_SDK_LOG(info) << "[reconfigure] Microphone reconfigure start";

try {
// Warn if reconfiguring with active streams
// Changing the sample rate or number of channels mid stream
// might cause issues client side, clients need to be actively
// checking the audioinfo for changes. Changing these parameters
// may also cause a small gap in audio.
{
std::lock_guard<std::mutex> lock(stream_ctx_mu_);
if (active_streams_ > 0) {
VIAM_SDK_LOG(info) << "[reconfigure] Reconfiguring with " << active_streams_
<< " active stream(s). See README for reconfiguration considerations.";
}
}

// Close the existing stream before setting up audio device — Pa_IsFormatSupported internally tries to
// open the device, which fails if it's already in use.
{
std::lock_guard<std::mutex> lock(stream_ctx_mu_);
if (stream_) {
audio::utils::shutdown_stream(stream_, pa_);
stream_ = nullptr;
}
}

auto setup =
audio::utils::setup_audio_device<audio::InputStreamContext>(cfg, audio::utils::StreamDirection::Input, AudioCallback, pa_);

// Set new configuration and restart stream under lock
{
std::lock_guard<std::mutex> lock(stream_ctx_mu_);

stream_params_ = setup.stream_params;
stream_params_.user_data = setup.audio_context.get();
device_id_ = setup.config_params.device_id;
audio::utils::restart_stream(stream_, stream_params_, pa_);
audio_context_ = setup.audio_context;
restart_attempts_ = 0;
requested_sample_rate_ = setup.config_params.sample_rate.value_or(
setup.stream_params.sample_rate); // User's requested rate, defaults to device rate
historical_throttle_ms_ = setup.config_params.historical_throttle_ms.value_or(DEFAULT_HISTORICAL_THROTTLE_MS);
}
VIAM_SDK_LOG(info) << "[reconfigure] Reconfigure completed successfully";
} catch (const std::exception& e) {
VIAM_SDK_LOG(error) << "[reconfigure] Reconfigure failed: " << e.what();
throw;
}
}

viam::sdk::ProtoStruct Microphone::do_command(const viam::sdk::ProtoStruct& command) {
VIAM_SDK_LOG(error) << "do_command not implemented";
return viam::sdk::ProtoStruct();
@@ -377,9 +304,6 @@ void Microphone::get_audio(std::string const& codec,
// Parse codec string to enum
const AudioCodec codec_enum = audio::codec::parse_codec(codec);

// guard to increment and decrement the active stream count
StreamGuard stream_guard(stream_ctx_mu_, active_streams_);

// Track audio duration using timestamps
int64_t first_chunk_start_timestamp_ns = 0;
bool duration_limit_set = false;
@@ -419,7 +343,6 @@ void Microphone::get_audio(std::string const& codec,
// Setup initial stream parameters and initialize encoder
setup_stream_params(codec_enum,
mp3_ctx,
false,
stream_sample_rate,
requested_sample_rate,
stream_num_channels,
@@ -428,43 +351,25 @@ void Microphone::get_audio(std::string const& codec,
device_samples_per_chunk);

while (true) {
// Check if audio_context_ changed
bool context_changed = false;
PaStream* current_stream = nullptr;
{
std::lock_guard<std::mutex> lock(stream_ctx_mu_);

// Detect context change (device reconfigured or stream restarted)
// The watchdog may have restarted a stalled stream, which swaps in a
// fresh audio_context_ (same format, fresh circular buffer). Skip any
// stale data from the old buffer and reset diagnostic counters.
if (audio_context_ != stream_context) {
if (stream_context != nullptr) {
VIAM_SDK_LOG(info) << "Detected stream change";
context_changed = true;
}
// Switch to new context and reset read position
VIAM_SDK_LOG(info) << "Detected stream change";
stream_context = audio_context_;
read_position = stream_context->get_write_position();
restart_attempts_ = 0;
// Brief gap in audio, but stream continues
last_logged_overflow_count = 0;
last_logged_underflow_count = 0;
last_staleness_log_ns = 0;
}
current_stream = stream_;
}

// Reconfigure stream parameters if context changed
if (context_changed) {
setup_stream_params(codec_enum,
mp3_ctx,
true,
stream_sample_rate,
requested_sample_rate,
stream_num_channels,
stream_historical_throttle_ms,
samples_per_chunk,
device_samples_per_chunk);
last_logged_overflow_count = 0;
last_logged_underflow_count = 0;
last_staleness_log_ns = 0;
}

// Check if we have enough samples for a full chunk
const uint64_t write_pos = stream_context->get_write_position();
const uint64_t available_samples = write_pos - read_position;
9 changes: 2 additions & 7 deletions src/microphone.hpp
@@ -9,7 +9,6 @@
#include <viam/sdk/common/audio.hpp>
#include <viam/sdk/components/audio_in.hpp>
#include <viam/sdk/config/resource.hpp>
#include <viam/sdk/resource/reconfigurable.hpp>
#include "audio_codec.hpp"
#include "audio_stream.hpp"
#include "audio_utils.hpp"
@@ -33,7 +32,7 @@ PaDeviceIndex findDeviceByName(const std::string& name, const audio::portaudio::
// Returns the write position if previous_timestamp == 0 (default: most recent audio)
uint64_t get_initial_read_position(const std::shared_ptr<audio::InputStreamContext>& stream_context, int64_t previous_timestamp);

class Microphone final : public viam::sdk::AudioIn, public viam::sdk::Reconfigurable {
class Microphone final : public viam::sdk::AudioIn {
public:
Microphone(viam::sdk::Dependencies deps, viam::sdk::ResourceConfig cfg, audio::portaudio::PortAudioInterface* pa = nullptr);

@@ -51,15 +50,13 @@ class Microphone final : public viam::sdk::AudioIn, public viam::sdk::Reconfigur

viam::sdk::audio_properties get_properties(const viam::sdk::ProtoStruct& extra);
std::vector<viam::sdk::GeometryConfig> get_geometries(const viam::sdk::ProtoStruct& extra);
void reconfigure(const viam::sdk::Dependencies& deps, const viam::sdk::ResourceConfig& cfg);

// Restarts the stream.
// Must NOT be called while holding stream_ctx_mu_.
void restart_stalled_stream(const std::shared_ptr<audio::InputStreamContext>& stream_context);

void setup_stream_params(audio::codec::AudioCodec codec_enum,
MP3EncoderContext& mp3_ctx,
bool is_reconfigure,
int& stream_sample_rate,
int& requested_sample_rate,
int& stream_num_channels,
@@ -72,14 +69,12 @@ class Microphone final : public viam::sdk::AudioIn, public viam::sdk::Reconfigur
int historical_throttle_ms_; // Throttle time for historical data stream
static vsdk::Model model;

// The mutex protects the stream, context, and the active streams counter
// The mutex protects the stream and context
std::mutex stream_ctx_mu_;
PaStream* stream_;
std::shared_ptr<audio::InputStreamContext> audio_context_;
// This is null in production and used for testing to inject the mock portaudio functions
const audio::portaudio::PortAudioInterface* pa_;
// Count of active get_audio calls
int active_streams_;
int restart_attempts_;

audio::utils::StreamParams stream_params_;
1 change: 0 additions & 1 deletion src/mp3_encoder.hpp
@@ -5,7 +5,6 @@
#include <cstdint>
#include <vector>
#include <viam/sdk/config/resource.hpp>
#include <viam/sdk/resource/reconfigurable.hpp>
#include "audio_utils.hpp"

namespace microphone {
62 changes: 6 additions & 56 deletions src/speaker.cpp
@@ -79,12 +79,11 @@ vsdk::Model Speaker::model = {"viam", "system-audio", "speaker"};
* Tear down the existing stream and bring up a fresh one with the saved params.
*
* Bails out if `playback_context` is no longer the active audio_context_ — that means
* either reconfigure() or another stall recovery already replaced it, so the in-flight
* play() call (if any) is about to exit on its own and we shouldn't race.
* another stall recovery already replaced it, so the in-flight play() call (if any)
* is about to exit on its own and we shouldn't race.
*
* On a successful restart, any unplayed audio in the old buffer is discarded — same
* behavior as reconfigure(). The in-flight play() loop will see audio_context_ change
* and return early.
* On a successful restart, any unplayed audio in the old buffer is discarded. The
* in-flight play() loop will see audio_context_ change and return early.
*/
void Speaker::restart_stalled_stream(const std::shared_ptr<audio::OutputStreamContext>& playback_context) {
std::lock_guard<std::mutex> lock(stream_mu_);
@@ -447,12 +446,12 @@ void Speaker::play(std::vector<uint8_t> const& audio_data,
VIAM_SDK_LOG(debug) << "Playback stopped by stop command";
return;
}
// Check if context changed (reconfigure happened)
// Check if context changed (stream restarted by watchdog)
PaStream* current_stream = nullptr;
{
std::lock_guard<std::mutex> lock(stream_mu_);
if (audio_context_ != playback_context) {
VIAM_SDK_LOG(debug) << "Audio playback interrupted by reconfigure, exiting";
VIAM_SDK_LOG(debug) << "Audio playback interrupted by stream restart, exiting";
return;
}
current_stream = stream_;
@@ -504,53 +503,4 @@ std::vector<viam::sdk::GeometryConfig> Speaker::get_geometries(const viam::sdk::
throw std::runtime_error("get_geometries is unimplemented");
}

void Speaker::reconfigure(const vsdk::Dependencies& deps, const vsdk::ResourceConfig& cfg) {
VIAM_SDK_LOG(info) << "[reconfigure] Speaker reconfigure start";

try {
// Check if there's unplayed audio before reconfiguring
{
std::lock_guard<std::mutex> lock(stream_mu_);
if (audio_context_) {
const uint64_t write_pos = audio_context_->get_write_position();
const uint64_t playback_pos = audio_context_->playback_position.load();

if (write_pos > playback_pos) {
const uint64_t unplayed_samples = write_pos - playback_pos;
const double unplayed_seconds =
static_cast<double>(unplayed_samples) / (audio_context_->info.sample_rate_hz * audio_context_->info.num_channels);
VIAM_SDK_LOG(warn) << "[reconfigure] Discarding " << unplayed_seconds << " seconds of unplayed audio";
}
}
}

auto setup = audio::utils::setup_audio_device<audio::OutputStreamContext>(
cfg, audio::utils::StreamDirection::Output, speakerCallback, pa_, audio::BUFFER_DURATION_SECONDS);

// Set new configuration and restart stream under lock
{
std::lock_guard<std::mutex> lock(stream_mu_);

// Stop the stream first before replacing audio_context_
// Otherwise the callback thread may still be accessing the old context
// after we destroy it (heap-use-after-free)
setup.stream_params.user_data = setup.audio_context.get();
stream_params_ = setup.stream_params;
audio::utils::restart_stream(stream_, stream_params_, pa_);
latency_ = audio::utils::get_stream_latency(stream_, stream_params_, pa_);
audio_context_ = setup.audio_context;
device_id_ = setup.config_params.device_id;
restart_attempts_ = 0;
volume_ = setup.config_params.volume;
if (volume_) {
audio::volume::set_volume(stream_params_.device_name, *volume_);
}
}
VIAM_SDK_LOG(info) << "[reconfigure] Reconfigure completed successfully";
} catch (const std::exception& e) {
VIAM_SDK_LOG(error) << "[reconfigure] Reconfigure failed: " << e.what();
throw;
}
}

} // namespace speaker