Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ class BaseVideoNode : public rclcpp::Node {
// Returns new ref; caller must unref.
GstElement *get_element(const std::string &name) const;

// Safely unref a GStreamer element.
static void safe_gst_unref(GstElement *&elem);

// GStreamer pipeline owned by this node.
GstElement *pipeline_{nullptr};

Expand Down
25 changes: 23 additions & 2 deletions src/Cameras/video_streaming/include/video_streaming/srt_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/empty.hpp" // I-frame
#include "std_msgs/msg/int32.hpp" // bitrate
#include <chrono> // timers
#include <gst/gst.h>
#include <interfaces/msg/srt_stats.hpp>

Expand All @@ -11,14 +12,19 @@ namespace video_streaming {
class SrtNode : public BaseVideoNode {
public:
explicit SrtNode(const rclcpp::NodeOptions &options = rclcpp::NodeOptions());
~SrtNode() override = default;
~SrtNode() override;

static constexpr int INITIAL_BACKOFF_MS = 200;
static constexpr int MAX_BACKOFF_MS = 5000;
static constexpr int BACKOFF_RESET_MS = 10000;

protected:
// (interpipesrc -> nvvidconv -> nvv4l2av1enc -> srtsink)
bool create_pipeline() override;

GstElement *av1_encoder_ = nullptr;
GstElement *srt_sink_ = nullptr;
GstElement *framerate_caps_ = nullptr;

// bitrate
rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr bitrate_sub_;
Expand All @@ -40,8 +46,23 @@ class SrtNode : public BaseVideoNode {

void publish_srt_stats();

void change_framerate(int new_fps);

rcl_interfaces::msg::SetParametersResult
on_parameter_change(const std::vector<rclcpp::Parameter> &parameters);

// Backoff mechanism to avoid excessive I-frame requests
void check_packet_loss_and_trigger(int64_t current_total_dropped);

struct BackoffState {
int current_delay_ms = INITIAL_BACKOFF_MS;
int max_delay_ms = MAX_BACKOFF_MS;
int reset_ms = BACKOFF_RESET_MS;

std::chrono::steady_clock::time_point last_trigger_time;
std::chrono::steady_clock::time_point last_loss_time;
int64_t last_total_dropped_pkts = 0;
} backoff_state_;
};

} // namespace video_streaming
} // namespace video_streaming
9 changes: 8 additions & 1 deletion src/Cameras/video_streaming/src/base_video_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,11 @@ bool BaseVideoNode::resume_pipeline() {
RCLCPP_INFO(get_logger(), "BaseVideoNode: pipeline resumed.");
}
return true;
}
}

void BaseVideoNode::safe_gst_unref(GstElement *&elem) {
if (elem) {
gst_object_unref(elem);
elem = nullptr;
}
}
2 changes: 1 addition & 1 deletion src/Cameras/video_streaming/src/detect_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ DetectNode::DetectNode(const rclcpp::NodeOptions &options)
bool DetectNode::create_pipeline() {

std::stringstream desc_ss;
desc_ss << "interpipesrc listen-to="
desc_ss << "interpipesrc format=3 listen-to="
<< this->get_parameter("listen_to").as_string()
<< " is-live=true "
"allow-renegotiation=true name=src ! ";
Expand Down
202 changes: 182 additions & 20 deletions src/Cameras/video_streaming/src/srt_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,31 @@
#include <gst/gststructure.h>
#include <gst/video/video-event.h>
#include <rclcpp_components/register_node_macro.hpp>

namespace video_streaming {

SrtNode::SrtNode(const rclcpp::NodeOptions &options)
: BaseVideoNode("srt_node", options) {
RCLCPP_INFO(this->get_logger(), "Initializing SrtNode...");

// Set last trigger time to 1 hour ago so the first error triggers immediately
backoff_state_.last_trigger_time =
std::chrono::steady_clock::now() - std::chrono::hours(1);
backoff_state_.last_loss_time = std::chrono::steady_clock::now();
backoff_state_.last_total_dropped_pkts = 0;
backoff_state_.current_delay_ms = INITIAL_BACKOFF_MS;

this->declare_parameter<std::string>("srt_uri", "srt://:7001");
this->declare_parameter<int>("latency", 100);
this->declare_parameter<int>("iframe_interval", 0);
this->declare_parameter<bool>("test_mode", false);
this->declare_parameter<double>("stats_frequency", 1.0);
this->declare_parameter<int>("target_framerate", 30);
this->declare_parameter<int>("backoff_max_delay_ms", 5000);
this->declare_parameter<int>("backoff_reset_ms", 10000);

backoff_state_.max_delay_ms =
this->get_parameter("backoff_max_delay_ms").as_int();
backoff_state_.reset_ms = this->get_parameter("backoff_reset_ms").as_int();

param_callback_handle_ = this->add_on_set_parameters_callback(
std::bind(&SrtNode::on_parameter_change, this, std::placeholders::_1));
Expand Down Expand Up @@ -53,37 +66,53 @@ SrtNode::SrtNode(const rclcpp::NodeOptions &options)
RCLCPP_INFO(get_logger(), "SrtNode constructed and pipeline started.");
}

SrtNode::~SrtNode() {
BaseVideoNode::safe_gst_unref(framerate_caps_);
BaseVideoNode::safe_gst_unref(av1_encoder_);
BaseVideoNode::safe_gst_unref(srt_sink_);
}

bool SrtNode::create_pipeline() {
std::string srt_uri = this->get_parameter("srt_uri").as_string();
int latency_val = this->get_parameter("latency").as_int();
int iframe_interval_val = this->get_parameter("iframe_interval").as_int();

bool test_mode = this->get_parameter("test_mode").as_bool();

int framerate = this->get_parameter("target_framerate").as_int();

gchar *desc;

if (test_mode) {
desc = g_strdup_printf(
"videotestsrc is-live=true ! video/x-raw,width=640,height=480 ! "
"videotestsrc is-live=true ! "
"videorate ! "
"capsfilter name=framerate_caps "
"caps=video/x-raw,framerate=%d/1 ! "
"videoconvert ! "
"x264enc tune=zerolatency bitrate=2000 speed-preset=ultrafast "
"name=av1_enc ! "
"rtph264pay ! queue ! "
"srtsink name=srt_sink uri=%s latency=%d",
srt_uri.c_str(), latency_val);
framerate, srt_uri.c_str(), latency_val);

RCLCPP_WARN(this->get_logger(), "Using MAC TEST pipeline description: %s",
desc);

} else {
desc = g_strdup_printf(
"interpipesrc listen-to=detect is-live=true ! "

"interpipesrc format=3 listen-to=detect is-live=true ! "
"nvvidconv ! videorate ! "
"capsfilter name=framerate_caps "
"caps=video/x-raw(memory:NVMM),framerate=%d/1 ! "
"nvvidconv ! "
"nvv4l2av1enc name=av1_enc insert-seq-hdr=true iframeinterval=%d ! "
" av1parse ! capsfilter caps=\"video/x-av1, "
"alignment=obu, parsed=true\" ! "
"queue ! "
"srtsink name=srt_sink uri=%s latency=%d sync=false",
iframe_interval_val, srt_uri.c_str(), latency_val);
framerate, iframe_interval_val, srt_uri.c_str(), latency_val);
RCLCPP_INFO(this->get_logger(), "Using PRODUCTION pipeline description: %s",
desc);
}
Expand All @@ -110,8 +139,9 @@ bool SrtNode::create_pipeline() {

av1_encoder_ = this->get_element("av1_enc");
srt_sink_ = this->get_element("srt_sink");
framerate_caps_ = get_element("framerate_caps");

if (!av1_encoder_ || !srt_sink_) {
if (!av1_encoder_ || !srt_sink_ || !framerate_caps_) {
RCLCPP_ERROR(get_logger(), "Failed to get elements by name");
return false;
}
Expand All @@ -120,6 +150,27 @@ bool SrtNode::create_pipeline() {
"Pipeline parsed and elements retrieved successfully.");
return true;
}
void SrtNode::change_framerate(int new_fps) {
if (!pipeline_) {
RCLCPP_WARN(this->get_logger(),
"Pipeline not initialized, cannot change framerate.");
return;
}

if (!framerate_caps_) {
RCLCPP_WARN(this->get_logger(),
"framerate_caps_ is null! Check create_pipeline.");
return;
}

GstCaps *caps = gst_caps_new_simple("video/x-raw", "framerate",
GST_TYPE_FRACTION, new_fps, 1, NULL);

g_object_set(framerate_caps_, "caps", caps, NULL);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might have to set the pipeline to paused before changing this but we can test that tommorrow.

gst_caps_unref(caps);

RCLCPP_INFO(this->get_logger(), "Framerate changed to %d fps", new_fps);
}

rcl_interfaces::msg::SetParametersResult
SrtNode::on_parameter_change(const std::vector<rclcpp::Parameter> &parameters) {
Expand All @@ -128,18 +179,65 @@ SrtNode::on_parameter_change(const std::vector<rclcpp::Parameter> &parameters) {
result.reason = "success";

for (const auto &param : parameters) {
if (param.get_name() == "latency" && srt_sink_) {
int val = param.as_int();
g_object_set(srt_sink_, "latency", val, NULL);
RCLCPP_INFO(this->get_logger(), "Param Update: Latency set to %d", val);
} else if (param.get_name() == "iframe_interval" && av1_encoder_) {
const std::string &name = param.get_name();

if (name == "latency") {
if (srt_sink_) {
int val = param.as_int();
g_object_set(srt_sink_, "latency", val, NULL);
RCLCPP_INFO(this->get_logger(), "Param Update: Latency set to %d", val);
}
continue;
}

if (name == "iframe_interval") {
if (av1_encoder_) {
int val = param.as_int();
g_object_set(av1_encoder_, "iframeinterval", val, NULL);
RCLCPP_INFO(this->get_logger(),
"Param Update: IFrame Interval set to %d", val);
}
continue;
}

if (name == "target_framerate") {
if (!stop_pipeline()) {
RCLCPP_ERROR(this->get_logger(),
"Failed to stop pipeline for reconfiguration.");
result.successful = false;
result.reason = "Failed to stop pipeline for reconfiguration.";
continue;
}
if (!start_pipeline()) {
RCLCPP_ERROR(this->get_logger(),
"Failed to start pipeline after reconfiguration.");
result.successful = false;
result.reason = "Failed to start pipeline after reconfiguration.";
}
continue;
}

if (name == "backoff_max_delay_ms") {
int val = param.as_int();
if (val > 0) {
backoff_state_.max_delay_ms = val;
RCLCPP_INFO(this->get_logger(),
"Param Update: Backoff Max Delay set to %d ms", val);
}
continue;
}

g_object_set(av1_encoder_, "iframeinterval", val, NULL);
RCLCPP_INFO(this->get_logger(), "Param Update: IFrame Interval set to %d",
val);
if (name == "backoff_reset_ms") {
int val = param.as_int();
if (val > 0) {
backoff_state_.reset_ms = val;
RCLCPP_INFO(this->get_logger(),
"Param Update: Backoff Reset Time set to %d ms", val);
}
continue;
}
}

return result;
}

Expand All @@ -154,16 +252,72 @@ void SrtNode::on_bitrate_received(const std_msgs::msg::Int32::SharedPtr msg) {

void SrtNode::on_iframe_trigger(const std_msgs::msg::Empty::SharedPtr msg) {
(void)msg;
if (av1_encoder_) {
GstEvent *event = gst_video_event_new_downstream_force_key_unit(
GST_CLOCK_TIME_NONE, GST_CLOCK_TIME_NONE, GST_CLOCK_TIME_NONE, TRUE, 0);

if (gst_element_send_event(av1_encoder_, event)) {
RCLCPP_INFO(this->get_logger(), "I-Frame triggered (Event sent)");
if (srt_sink_) {

GstEvent *event = gst_video_event_new_upstream_force_key_unit(
GST_CLOCK_TIME_NONE, TRUE, 0);

if (gst_element_send_event(srt_sink_, event)) {
RCLCPP_INFO(this->get_logger(),
"Manual I-Frame triggered (Upstream Event sent)");
} else {
RCLCPP_WARN(this->get_logger(), "Failed to trigger I-Frame");
RCLCPP_WARN(this->get_logger(),
"Failed to trigger Manual I-Frame (Event rejected)");
}
} else {
RCLCPP_WARN(this->get_logger(), "Cannot trigger I-Frame: SRT Sink is null");
}
}

void SrtNode::check_packet_loss_and_trigger(int64_t current_total_dropped) {
auto now = std::chrono::steady_clock::now();

if (current_total_dropped <= backoff_state_.last_total_dropped_pkts) {
auto time_since_loss =
std::chrono::duration_cast<std::chrono::milliseconds>(
now - backoff_state_.last_loss_time)
.count();

if (time_since_loss > backoff_state_.reset_ms &&
backoff_state_.current_delay_ms != INITIAL_BACKOFF_MS) {
RCLCPP_DEBUG(this->get_logger(),
"SRT connection stable. Resetting backoff.");
}
backoff_state_.current_delay_ms = INITIAL_BACKOFF_MS;
return;
}

backoff_state_.last_total_dropped_pkts = current_total_dropped;
backoff_state_.last_loss_time = now;

auto time_since_trigger =
std::chrono::duration_cast<std::chrono::milliseconds>(
now - backoff_state_.last_trigger_time)
.count();

if (time_since_trigger < backoff_state_.current_delay_ms) {
RCLCPP_DEBUG(this->get_logger(),
"SRT Packet Drop Detected (%ld total). Suppressing I-Frame "
"request (Backoff in effect: %d ms)",
time_since_trigger, backoff_state_.current_delay_ms);
return;
}

RCLCPP_WARN(this->get_logger(),
"SRT Packet Drop Detected (%ld total). Requesting I-Frame! (Next "
"backoff: %d ms)",
current_total_dropped, backoff_state_.current_delay_ms * 2);

GstEvent *event =
gst_video_event_new_upstream_force_key_unit(GST_CLOCK_TIME_NONE, TRUE, 0);
if (srt_sink_) {
gst_element_send_event(srt_sink_, event);
}

backoff_state_.last_trigger_time = now;
backoff_state_.current_delay_ms = std::min(
backoff_state_.current_delay_ms * 2, backoff_state_.max_delay_ms);
}

void SrtNode::publish_srt_stats() {
Expand Down Expand Up @@ -226,6 +380,14 @@ void SrtNode::publish_srt_stats() {
msg.packets_retransmitted = ret_int;
}

int64_t pkt_drop_total = 0;

if (gst_structure_get_int64(target_stats, "pkt-drop-total",
&pkt_drop_total)) {
msg.packet_drop_total = pkt_drop_total;
check_packet_loss_and_trigger(pkt_drop_total);
}

srt_stats_pub_->publish(msg);
gst_structure_free(stats);
}
Expand Down
1 change: 1 addition & 0 deletions src/interfaces/msg/SrtStats.msg
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ float64 bandwidth # Estimated bandwidth (in bits/sec)
int64 packets_sent
int64 packets_lost
int64 packets_retransmitted
int64 packet_drop_total