diff --git a/src/Cameras/video_streaming/include/video_streaming/base_video_node.hpp b/src/Cameras/video_streaming/include/video_streaming/base_video_node.hpp index 8cb1f3f3..11d296f5 100644 --- a/src/Cameras/video_streaming/include/video_streaming/base_video_node.hpp +++ b/src/Cameras/video_streaming/include/video_streaming/base_video_node.hpp @@ -22,6 +22,9 @@ class BaseVideoNode : public rclcpp::Node { // Returns new ref; caller must unref. GstElement *get_element(const std::string &name) const; + // Safely unref a GStreamer element. + static void safe_gst_unref(GstElement *&elem); + // GStreamer pipeline owned by this node. GstElement *pipeline_{nullptr}; diff --git a/src/Cameras/video_streaming/include/video_streaming/srt_node.hpp b/src/Cameras/video_streaming/include/video_streaming/srt_node.hpp index 85342d8d..947e0416 100644 --- a/src/Cameras/video_streaming/include/video_streaming/srt_node.hpp +++ b/src/Cameras/video_streaming/include/video_streaming/srt_node.hpp @@ -3,6 +3,7 @@ #include "rclcpp/rclcpp.hpp" #include "std_msgs/msg/empty.hpp" // I-frame #include "std_msgs/msg/int32.hpp" // bitrate +#include // timers #include #include @@ -11,7 +12,11 @@ namespace video_streaming { class SrtNode : public BaseVideoNode { public: explicit SrtNode(const rclcpp::NodeOptions &options = rclcpp::NodeOptions()); - ~SrtNode() override = default; + ~SrtNode() override; + + static constexpr int INITIAL_BACKOFF_MS = 200; + static constexpr int MAX_BACKOFF_MS = 5000; + static constexpr int BACKOFF_RESET_MS = 10000; protected: // (interpipesrc -> nvvidconv -> nvv4l2av1enc -> srtsink) @@ -19,6 +24,7 @@ class SrtNode : public BaseVideoNode { GstElement *av1_encoder_ = nullptr; GstElement *srt_sink_ = nullptr; + GstElement *framerate_caps_ = nullptr; // bitrate rclcpp::Subscription::SharedPtr bitrate_sub_; @@ -40,8 +46,23 @@ class SrtNode : public BaseVideoNode { void publish_srt_stats(); + void change_framerate(int new_fps); + rcl_interfaces::msg::SetParametersResult on_parameter_change(const std::vector ¶meters); + + // Backoff mechanism to avoid excessive I-frame requests + void check_packet_loss_and_trigger(int64_t current_total_dropped); + + struct BackoffState { + int current_delay_ms = INITIAL_BACKOFF_MS; + int max_delay_ms = MAX_BACKOFF_MS; + int reset_ms = BACKOFF_RESET_MS; + + std::chrono::steady_clock::time_point last_trigger_time; + std::chrono::steady_clock::time_point last_loss_time; + int64_t last_total_dropped_pkts = 0; + } backoff_state_; }; -} // namespace video_streaming \ No newline at end of file +} // namespace video_streaming diff --git a/src/Cameras/video_streaming/src/base_video_node.cpp b/src/Cameras/video_streaming/src/base_video_node.cpp index f94f7dc1..01e9a1b5 100644 --- a/src/Cameras/video_streaming/src/base_video_node.cpp +++ b/src/Cameras/video_streaming/src/base_video_node.cpp @@ -106,4 +106,11 @@ bool BaseVideoNode::resume_pipeline() { RCLCPP_INFO(get_logger(), "BaseVideoNode: pipeline resumed."); } return true; -} \ No newline at end of file +} + +void BaseVideoNode::safe_gst_unref(GstElement *&elem) { + if (elem) { + gst_object_unref(elem); + elem = nullptr; + } +} diff --git a/src/Cameras/video_streaming/src/detect_node.cpp b/src/Cameras/video_streaming/src/detect_node.cpp index 1b89f448..244b5233 100644 --- a/src/Cameras/video_streaming/src/detect_node.cpp +++ b/src/Cameras/video_streaming/src/detect_node.cpp @@ -26,7 +26,7 @@ DetectNode::DetectNode(const rclcpp::NodeOptions &options) bool DetectNode::create_pipeline() { std::stringstream desc_ss; - desc_ss << "interpipesrc listen-to=" + desc_ss << "interpipesrc format=3 listen-to=" << this->get_parameter("listen_to").as_string() << " is-live=true " "allow-renegotiation=true name=src ! "; diff --git a/src/Cameras/video_streaming/src/srt_node.cpp b/src/Cameras/video_streaming/src/srt_node.cpp index 147db96d..a3dd5780 100644 --- a/src/Cameras/video_streaming/src/srt_node.cpp +++ b/src/Cameras/video_streaming/src/srt_node.cpp @@ -6,18 +6,31 @@ #include #include #include - namespace video_streaming { SrtNode::SrtNode(const rclcpp::NodeOptions &options) : BaseVideoNode("srt_node", options) { RCLCPP_INFO(this->get_logger(), "Initializing SrtNode..."); + // Set last trigger time to 1 hour ago so the first error triggers immediately + backoff_state_.last_trigger_time = + std::chrono::steady_clock::now() - std::chrono::hours(1); + backoff_state_.last_loss_time = std::chrono::steady_clock::now(); + backoff_state_.last_total_dropped_pkts = 0; + backoff_state_.current_delay_ms = INITIAL_BACKOFF_MS; + this->declare_parameter("srt_uri", "srt://:7001"); this->declare_parameter("latency", 100); this->declare_parameter("iframe_interval", 0); this->declare_parameter("test_mode", false); this->declare_parameter("stats_frequency", 1.0); + this->declare_parameter("target_framerate", 30); + this->declare_parameter("backoff_max_delay_ms", 5000); + this->declare_parameter("backoff_reset_ms", 10000); + + backoff_state_.max_delay_ms = + this->get_parameter("backoff_max_delay_ms").as_int(); + backoff_state_.reset_ms = this->get_parameter("backoff_reset_ms").as_int(); param_callback_handle_ = this->add_on_set_parameters_callback( std::bind(&SrtNode::on_parameter_change, this, std::placeholders::_1)); @@ -53,6 +66,12 @@ SrtNode::SrtNode(const rclcpp::NodeOptions &options) RCLCPP_INFO(get_logger(), "SrtNode constructed and pipeline started."); } +SrtNode::~SrtNode() { + BaseVideoNode::safe_gst_unref(framerate_caps_); + BaseVideoNode::safe_gst_unref(av1_encoder_); + BaseVideoNode::safe_gst_unref(srt_sink_); +} + bool SrtNode::create_pipeline() { std::string srt_uri = this->get_parameter("srt_uri").as_string(); int latency_val = this->get_parameter("latency").as_int(); @@ -60,30 +79,40 @@ bool SrtNode::create_pipeline() { bool test_mode = this->get_parameter("test_mode").as_bool(); + int framerate = this->get_parameter("target_framerate").as_int(); + gchar *desc; if (test_mode) { desc = g_strdup_printf( - "videotestsrc is-live=true ! video/x-raw,width=640,height=480 ! " + "videotestsrc is-live=true ! " + "videorate ! " + "capsfilter name=framerate_caps " + "caps=video/x-raw,framerate=%d/1 ! " "videoconvert ! " "x264enc tune=zerolatency bitrate=2000 speed-preset=ultrafast " "name=av1_enc ! " "rtph264pay ! queue ! " "srtsink name=srt_sink uri=%s latency=%d", - srt_uri.c_str(), latency_val); + framerate, srt_uri.c_str(), latency_val); + RCLCPP_WARN(this->get_logger(), "Using MAC TEST pipeline description: %s", desc); } else { desc = g_strdup_printf( - "interpipesrc listen-to=detect is-live=true ! " + + "interpipesrc format=3 listen-to=detect is-live=true ! " + "nvvidconv ! videorate ! " + "capsfilter name=framerate_caps " + "caps=video/x-raw(memory:NVMM),framerate=%d/1 ! " "nvvidconv ! " "nvv4l2av1enc name=av1_enc insert-seq-hdr=true iframeinterval=%d ! " " av1parse ! capsfilter caps=\"video/x-av1, " "alignment=obu, parsed=true\" ! " "queue ! " "srtsink name=srt_sink uri=%s latency=%d sync=false", - iframe_interval_val, srt_uri.c_str(), latency_val); + framerate, iframe_interval_val, srt_uri.c_str(), latency_val); RCLCPP_INFO(this->get_logger(), "Using PRODUCTION pipeline description: %s", desc); } @@ -110,8 +139,9 @@ bool SrtNode::create_pipeline() { av1_encoder_ = this->get_element("av1_enc"); srt_sink_ = this->get_element("srt_sink"); + framerate_caps_ = get_element("framerate_caps"); - if (!av1_encoder_ || !srt_sink_) { + if (!av1_encoder_ || !srt_sink_ || !framerate_caps_) { RCLCPP_ERROR(get_logger(), "Failed to get elements by name"); return false; } @@ -120,6 +150,27 @@ bool SrtNode::create_pipeline() { "Pipeline parsed and elements retrieved successfully."); return true; } +void SrtNode::change_framerate(int new_fps) { + if (!pipeline_) { + RCLCPP_WARN(this->get_logger(), + "Pipeline not initialized, cannot change framerate."); + return; + } + + if (!framerate_caps_) { + RCLCPP_WARN(this->get_logger(), + "framerate_caps_ is null! Check create_pipeline."); + return; + } + + GstCaps *caps = gst_caps_new_simple("video/x-raw", "framerate", + GST_TYPE_FRACTION, new_fps, 1, NULL); + + g_object_set(framerate_caps_, "caps", caps, NULL); + gst_caps_unref(caps); + + RCLCPP_INFO(this->get_logger(), "Framerate changed to %d fps", new_fps); +} rcl_interfaces::msg::SetParametersResult SrtNode::on_parameter_change(const std::vector ¶meters) { @@ -128,18 +179,65 @@ SrtNode::on_parameter_change(const std::vector ¶meters) { result.reason = "success"; for (const auto ¶m : parameters) { - if (param.get_name() == "latency" && srt_sink_) { - int val = param.as_int(); - g_object_set(srt_sink_, "latency", val, NULL); - RCLCPP_INFO(this->get_logger(), "Param Update: Latency set to %d", val); - } else if (param.get_name() == "iframe_interval" && av1_encoder_) { + const std::string &name = param.get_name(); + + if (name == "latency") { + if (srt_sink_) { + int val = param.as_int(); + g_object_set(srt_sink_, "latency", val, NULL); + RCLCPP_INFO(this->get_logger(), "Param Update: Latency set to %d", val); + } + continue; + } + + if (name == "iframe_interval") { + if (av1_encoder_) { + int val = param.as_int(); + g_object_set(av1_encoder_, "iframeinterval", val, NULL); + RCLCPP_INFO(this->get_logger(), + "Param Update: IFrame Interval set to %d", val); + } + continue; + } + + if (name == "target_framerate") { + if (!stop_pipeline()) { + RCLCPP_ERROR(this->get_logger(), + "Failed to stop pipeline for reconfiguration."); + result.successful = false; + result.reason = "Failed to stop pipeline for reconfiguration."; + continue; + } + if (!start_pipeline()) { + RCLCPP_ERROR(this->get_logger(), + "Failed to start pipeline after reconfiguration."); + result.successful = false; + result.reason = "Failed to start pipeline after reconfiguration."; + } + continue; + } + + if (name == "backoff_max_delay_ms") { int val = param.as_int(); + if (val > 0) { + backoff_state_.max_delay_ms = val; + RCLCPP_INFO(this->get_logger(), + "Param Update: Backoff Max Delay set to %d ms", val); + } + continue; + } - g_object_set(av1_encoder_, "iframeinterval", val, NULL); - RCLCPP_INFO(this->get_logger(), "Param Update: IFrame Interval set to %d", - val); + if (name == "backoff_reset_ms") { + int val = param.as_int(); + if (val > 0) { + backoff_state_.reset_ms = val; + RCLCPP_INFO(this->get_logger(), + "Param Update: Backoff Reset Time set to %d ms", val); + } + continue; } } + return result; } @@ -154,16 +252,72 @@ void SrtNode::on_bitrate_received(const std_msgs::msg::Int32::SharedPtr msg) { void SrtNode::on_iframe_trigger(const std_msgs::msg::Empty::SharedPtr msg) { (void)msg; - if (av1_encoder_) { - GstEvent *event = gst_video_event_new_downstream_force_key_unit( - GST_CLOCK_TIME_NONE, GST_CLOCK_TIME_NONE, GST_CLOCK_TIME_NONE, TRUE, 0); - if (gst_element_send_event(av1_encoder_, event)) { - RCLCPP_INFO(this->get_logger(), "I-Frame triggered (Event sent)"); + if (srt_sink_) { + + GstEvent *event = gst_video_event_new_upstream_force_key_unit( + GST_CLOCK_TIME_NONE, TRUE, 0); + + if (gst_element_send_event(srt_sink_, event)) { + RCLCPP_INFO(this->get_logger(), + "Manual I-Frame triggered (Upstream Event sent)"); } else { - RCLCPP_WARN(this->get_logger(), "Failed to trigger I-Frame"); + RCLCPP_WARN(this->get_logger(), + "Failed to trigger Manual I-Frame (Event rejected)"); + } + } else { + RCLCPP_WARN(this->get_logger(), "Cannot trigger I-Frame: SRT Sink is null"); + } +} + +void SrtNode::check_packet_loss_and_trigger(int64_t current_total_dropped) { + auto now = std::chrono::steady_clock::now(); + + if (current_total_dropped <= backoff_state_.last_total_dropped_pkts) { + auto time_since_loss = + std::chrono::duration_cast( + now - backoff_state_.last_loss_time) + .count(); + + if (time_since_loss > backoff_state_.reset_ms && + backoff_state_.current_delay_ms != INITIAL_BACKOFF_MS) { + RCLCPP_DEBUG(this->get_logger(), + "SRT connection stable. Resetting backoff."); } + backoff_state_.current_delay_ms = INITIAL_BACKOFF_MS; + return; } + + backoff_state_.last_total_dropped_pkts = current_total_dropped; + backoff_state_.last_loss_time = now; + + auto time_since_trigger = + std::chrono::duration_cast( + now - backoff_state_.last_trigger_time) + .count(); + + if (time_since_trigger < backoff_state_.current_delay_ms) { + RCLCPP_DEBUG(this->get_logger(), + "SRT Packet Drop Detected (%ld total). Suppressing I-Frame " + "request (Backoff in effect: %d ms)", + time_since_trigger, backoff_state_.current_delay_ms); + return; + } + + RCLCPP_WARN(this->get_logger(), + "SRT Packet Drop Detected (%ld total). Requesting I-Frame! (Next " + "backoff: %d ms)", + current_total_dropped, backoff_state_.current_delay_ms * 2); + + GstEvent *event = + gst_video_event_new_upstream_force_key_unit(GST_CLOCK_TIME_NONE, TRUE, 0); + if (srt_sink_) { + gst_element_send_event(srt_sink_, event); + } + + backoff_state_.last_trigger_time = now; + backoff_state_.current_delay_ms = std::min( + backoff_state_.current_delay_ms * 2, backoff_state_.max_delay_ms); } void SrtNode::publish_srt_stats() { @@ -226,6 +380,14 @@ void SrtNode::publish_srt_stats() { msg.packets_retransmitted = ret_int; } + int64_t pkt_drop_total = 0; + + if (gst_structure_get_int64(target_stats, "pkt-drop-total", + &pkt_drop_total)) { + msg.packet_drop_total = pkt_drop_total; + check_packet_loss_and_trigger(pkt_drop_total); + } + srt_stats_pub_->publish(msg); gst_structure_free(stats); } diff --git a/src/interfaces/msg/SrtStats.msg b/src/interfaces/msg/SrtStats.msg index 266f5e43..5a951361 100644 --- a/src/interfaces/msg/SrtStats.msg +++ b/src/interfaces/msg/SrtStats.msg @@ -5,3 +5,4 @@ float64 bandwidth # Estimated bandwidth (in bits/sec) int64 packets_sent int64 packets_lost int64 packets_retransmitted +int64 packet_drop_total