From 24507919a16f6a2501b2aa0cc1bf1f9a82d87923 Mon Sep 17 00:00:00 2001 From: "Zhongzheng R. Zhang" Date: Sat, 3 May 2025 21:50:11 -0400 Subject: [PATCH 01/12] complete tui version of online bagger --- src/mil_common/mil_msgs/CMakeLists.txt | 2 +- .../mil_msgs/action/BagOnline.action | 2 +- src/mil_common/mil_msgs/srv/BagTopics.srv | 5 + src/mil_common/online_bagger/CMakeLists.txt | 53 +++ .../include/online_bagger/online_bagger.h | 5 + .../online_bagger/launch/test.launch.py | 17 + src/mil_common/online_bagger/package.xml | 25 + src/mil_common/online_bagger/src/client.cpp | 350 ++++++++++++++ src/mil_common/online_bagger/src/server.cpp | 428 ++++++++++++++++++ 9 files changed, 885 insertions(+), 2 deletions(-) create mode 100644 src/mil_common/mil_msgs/srv/BagTopics.srv create mode 100644 src/mil_common/online_bagger/CMakeLists.txt create mode 100644 src/mil_common/online_bagger/include/online_bagger/online_bagger.h create mode 100644 src/mil_common/online_bagger/launch/test.launch.py create mode 100644 src/mil_common/online_bagger/package.xml create mode 100644 src/mil_common/online_bagger/src/client.cpp create mode 100644 src/mil_common/online_bagger/src/server.cpp diff --git a/src/mil_common/mil_msgs/CMakeLists.txt b/src/mil_common/mil_msgs/CMakeLists.txt index 196e45f2..d7ac42e7 100644 --- a/src/mil_common/mil_msgs/CMakeLists.txt +++ b/src/mil_common/mil_msgs/CMakeLists.txt @@ -36,7 +36,7 @@ set(msg_files "msg/ObjectDetections.msg") set(srv_files "srv/CameraToLidarTransform.srv" "srv/ObjectDBQuery.srv" - "srv/SetGeometry.srv") + "srv/SetGeometry.srv" "srv/BagTopics.srv") rosidl_generate_interfaces( ${PROJECT_NAME} diff --git a/src/mil_common/mil_msgs/action/BagOnline.action b/src/mil_common/mil_msgs/action/BagOnline.action index 79d721b7..6edec87e 100644 --- a/src/mil_common/mil_msgs/action/BagOnline.action +++ b/src/mil_common/mil_msgs/action/BagOnline.action @@ -9,7 +9,7 @@ string bag_name # Space separated list of topics to bag. If empty string, all buffered topics will be bagged -string topics +string[] topics # Time in seconds float32 bag_time diff --git a/src/mil_common/mil_msgs/srv/BagTopics.srv b/src/mil_common/mil_msgs/srv/BagTopics.srv new file mode 100644 index 00000000..5dfdf1d4 --- /dev/null +++ b/src/mil_common/mil_msgs/srv/BagTopics.srv @@ -0,0 +1,5 @@ +# Onling bagger client call this service to obtain a list of topics to be baggered +# The topics will be display on client ui + +--- +string[] topics # Contains all topic names to bag in the online bagger server. diff --git a/src/mil_common/online_bagger/CMakeLists.txt b/src/mil_common/online_bagger/CMakeLists.txt new file mode 100644 index 00000000..ba73570b --- /dev/null +++ b/src/mil_common/online_bagger/CMakeLists.txt @@ -0,0 +1,53 @@ +cmake_minimum_required(VERSION 3.8) +project(online_bagger) + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +# find dependencies +find_package(ament_cmake REQUIRED) +find_package(rclcpp REQUIRED) +find_package(mil_msgs REQUIRED) +find_package(std_msgs REQUIRED) +find_package(rclcpp_action REQUIRED) +find_package(rosbag2_cpp REQUIRED) +find_package(Boost REQUIRED COMPONENTS filesystem system program_options) +find_package(ftxui REQUIRED COMPONENTS screen dom component) + +include_directories(include ${Boost_INCLUDE_DIRS}) +add_executable(${PROJECT_NAME}_server src/server.cpp) +add_executable(${PROJECT_NAME}_client src/client.cpp) + +ament_target_dependencies(${PROJECT_NAME}_server + rclcpp + mil_msgs + std_msgs + rclcpp_action + rosbag2_cpp + Boost +) + +ament_target_dependencies(${PROJECT_NAME}_client + rclcpp + mil_msgs + std_msgs + rclcpp_action + Boost +) + +target_link_libraries(${PROJECT_NAME}_client +ftxui::screen +ftxui::dom +ftxui::component +) + +install(TARGETS ${PROJECT_NAME}_server ${PROJECT_NAME}_client + DESTINATION lib/${PROJECT_NAME} +) + +install(DIRECTORY launch/ + DESTINATION share/${PROJECT_NAME}/launch +) + +ament_package() diff --git a/src/mil_common/online_bagger/include/online_bagger/online_bagger.h b/src/mil_common/online_bagger/include/online_bagger/online_bagger.h new file mode 100644 index 00000000..00b5ebfe --- /dev/null +++ b/src/mil_common/online_bagger/include/online_bagger/online_bagger.h @@ -0,0 +1,5 @@ +namespace online_bagger +{ +static constexpr char BAG_ACTION_NAME[] = "/online_bagger/bag"; +static constexpr char TOPIC_SERVICE_NAME[] = "/online_bagger/topics"; +} \ No newline at end of file diff --git a/src/mil_common/online_bagger/launch/test.launch.py b/src/mil_common/online_bagger/launch/test.launch.py new file mode 100644 index 00000000..777161bd --- /dev/null +++ b/src/mil_common/online_bagger/launch/test.launch.py @@ -0,0 +1,17 @@ +from launch import LaunchDescription +from launch.actions import ExecuteProcess +from launch_ros.actions import Node + +def generate_launch_description(): + return LaunchDescription([ + + # Launch online_bagger node with topics list + Node( + package='online_bagger', + executable='online_bagger_server', + output='screen', + parameters=[{ + 'topics': ['rand_int', 'rand_float', 'rand_str'] + }] + ) + ]) diff --git a/src/mil_common/online_bagger/package.xml b/src/mil_common/online_bagger/package.xml new file mode 100644 index 00000000..735c5cb0 --- /dev/null +++ b/src/mil_common/online_bagger/package.xml @@ -0,0 +1,25 @@ + + + + online_bagger + 0.0.0 + The online bagger is a tool that allows user to store bagged information about important events, after they happen. + zhongzheng + MIL + + ament_cmake + + ftxui + rclcpp + rclcpp_action + rosbag2_cpp + mil_msgs + std_msgs + + ament_lint_auto + ament_lint_common + + + ament_cmake + + diff --git a/src/mil_common/online_bagger/src/client.cpp b/src/mil_common/online_bagger/src/client.cpp new file mode 100644 index 00000000..9a1fd348 --- /dev/null +++ b/src/mil_common/online_bagger/src/client.cpp @@ -0,0 +1,350 @@ +#include +#include + +#include + +#include "ftxui/component/loop.hpp" +#include "ftxui/component/screen_interactive.hpp" +#include "ftxui/component/component.hpp" +#include "ftxui/dom/elements.hpp" + +#include "mil_msgs/action/bag_online.hpp" +#include "mil_msgs/srv/bag_topics.hpp" +#include "online_bagger/online_bagger.h" + +#include +#include + +namespace online_bagger +{ +using namespace ftxui; + +class TopicList: public ComponentBase +{ + public: + TopicList() + { + } + ~TopicList() + { + + } + + void refresh(std::vector&& topics) + { + this->topics = std::move(topics); + DetachAllChildren(); + if(selected) + delete[] selected; + + if(this->topics.size() == 0) + return; + + future = std::async(std::launch::async, [&]{ + selected = new bool[this->topics.size()]{false}; + Components items; + for(size_t i=0; itopics.size(); i++) + { + items.push_back(Checkbox(this->topics[i], &selected[i])); + } + + return Container::Vertical(items); + }); + + return; + } + + const std::vector get_selected() + { + std::vector selected_topics; + for(size_t i=0;i 0) + return ChildAt(0)->Render(); + else + return text("No bag topics available"); + } + else + { + if(future.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready) + { + Add(future.get()); + } + + return spinner(15, clock.now().nanoseconds() / 200'000'000); + } + } + + private: + std::future future; + std::vector topics; + bool* selected = nullptr; + rclcpp::Clock clock; +}; + +class UIScreen +{ + public: + UIScreen(Component ui): + loop(&screen, ui) + { + + } + + ~UIScreen() + { + } + + void shutdown() + { + screen.Exit(); + } + + bool ok() + { + return !loop.HasQuitted(); + } + + void spin_once() + { + loop.RunOnce(); + } + + void refresh() + { + screen.PostEvent(Event::Custom); + } + + private: + ScreenInteractive screen = ScreenInteractive::Fullscreen(); + Loop loop; +}; + +class Client: public rclcpp::Node +{ + public: + Client():rclcpp::Node("online_bagger_client") + { + using namespace ftxui; + + action_client = rclcpp_action::create_client(this, BAG_ACTION_NAME); + srv_client = create_client(TOPIC_SERVICE_NAME); + + // Head + Component ui_head = Renderer([]{return text("Online Bagger") | bold | center | flex;}); + // Body + Component ui_name_input = Input(&name_input, InputOption::Default()); + Component ui_time_input = Input(&time_input, InputOption::Default()) | CatchEvent([](Event event) { + return event.is_character() && !(std::isdigit(event.character()[0]) || event.character()[0] == '.'); + }); + Component ui_body = Renderer(Container::Vertical({ + ui_topic_list, ui_name_input, ui_time_input + }),[=]{ + return vbox({ + text("Bag Topics") | vcenter, + ui_topic_list->Render() | border, + text("Bag File Name ") | vcenter, + ui_name_input->Render() | border, + text("Bag Time ") | vcenter, + ui_time_input->Render() | border + }) | vscroll_indicator | frame; + }); + + // Bottom + auto on_quit = [&]{rclcpp::shutdown();}; + auto on_bag = [&]{ + bag_progress = 0.0f; + mil_msgs::action::BagOnline::Goal goal; + goal.bag_name = name_input; + goal.topics = ui_topic_list->get_selected(); + goal.bag_time = std::stof(time_input); + + auto goal_options = rclcpp_action::Client::SendGoalOptions(); + goal_options.goal_response_callback = std::bind(&Client::handle_reponse, this, std::placeholders::_1); + goal_options.result_callback = std::bind(&Client::handle_result, this, std::placeholders::_1); + goal_options.feedback_callback = std::bind(&Client::handle_feedback, this, std::placeholders::_1, std::placeholders::_2); + action_client->async_send_goal(goal, goal_options); + }; + auto on_refresh = [=]{ + auto request = std::make_shared(); + srv_client->async_send_request(request, [&]( + rclcpp::Client::SharedFuture future){ + ui_topic_list->refresh(std::move(future.get()->topics)); + }); + }; + + Component ui_status_bar = Renderer([&]{ + if(client_state == ClientState::Waiting) + return hbox({ + text("Connecting to bag server "), + spinner(15, now().nanoseconds() / 200'000'000) + }); + else if(client_state == ClientState::Ready) + return filler(); + else + return gauge(bag_progress); + }); + + Component ui_bottom_bar = Container::Horizontal({ + ui_status_bar | vcenter | xflex, + Button("Quit", on_quit, ButtonOption::Border()) | align_right, + Maybe(Button("Refresh", on_refresh, ButtonOption::Border()), [&]{ + return client_state != ClientState::Waiting; + }) | align_right, + Maybe(Button("Bag", on_bag, ButtonOption::Border()), [&]{ + return client_state == ClientState::Ready; + }) | align_right + }); + + + // Main UI + Component main_ui = Container::Vertical({ + ui_head, + Renderer([]{return separator();}), + ui_body | flex, + Renderer([]{return separator();}), + ui_bottom_bar + }); + + screens.emplace(main_ui); + + ui_timer = create_wall_timer(std::chrono::milliseconds(20), std::bind(&Client::refresh_ui, this)); + alive_timer = create_wall_timer(std::chrono::milliseconds(100), std::bind(&Client::is_alive, this)); + } + ~Client() + { + + } + private: + + using BagOnlineGoalHandle = rclcpp_action::ClientGoalHandle; + + enum class ClientState + { + Waiting, + Ready, + Bagging + }client_state = ClientState::Waiting; + + float bag_progress = 0.0f; + std::string result_string; + rclcpp_action::Client::SharedPtr action_client; + rclcpp::Client::SharedPtr srv_client; + rclcpp::TimerBase::SharedPtr ui_timer; + rclcpp::TimerBase::SharedPtr alive_timer; + std::shared_ptr ui_topic_list = std::make_shared(); + + std::stack screens; + + std::string name_input = "bag"; + std::string time_input = "1"; + + void refresh_ui() + { + UIScreen& screen = screens.top(); + if(screen.ok()) + { + screen.spin_once(); + } + else + { + screens.pop(); + if(screens.size() == 0) + rclcpp::shutdown(); + } + } + + void is_alive() + { + if(client_state == ClientState::Waiting && action_client->action_server_is_ready()) + { + auto request = std::make_shared(); + srv_client->async_send_request(request, [&]( + rclcpp::Client::SharedFuture future){ + client_state = ClientState::Ready; + ui_topic_list->refresh(std::move(future.get()->topics)); + }); + } + else if(client_state == ClientState::Ready && !action_client->action_server_is_ready()) + { + client_state = ClientState::Waiting; + } + else if(client_state == ClientState::Bagging && !action_client->action_server_is_ready()) + { + client_state = ClientState::Waiting; + } + screens.top().refresh(); + } + + void handle_reponse(BagOnlineGoalHandle::SharedPtr goal_handle) + { + if(!goal_handle) + { + show_dialog("Failed", "Request is rejected by the online bagger server"); + } + } + + void handle_feedback([[maybe_unused]]BagOnlineGoalHandle::SharedPtr goal_handle, + const std::shared_ptr feedback) + { + bag_progress = feedback->progress; + } + + void handle_result(const BagOnlineGoalHandle::WrappedResult& result) + { + if(result.result->success) + { + show_dialog("Success", "Succuessfully save bag file to " + result.result->filename); + } + else + { + show_dialog("Failed", "Failed to bag topics: " + result.result->status); + } + + client_state = ClientState::Ready; + } + + void show_dialog(const std::string& title, const std::string& body) + { + Component dialog_ui = Container::Vertical({}); + UIScreen& screen = screens.emplace(dialog_ui | border); + + Component close_button = Button("X", [&]{ + screen.shutdown(); + }, ButtonOption::Ascii()); + + dialog_ui->Add(Renderer(close_button,[=]{ + return hbox({ + text(title) | center |flex, + close_button->Render() + }); + })); + + dialog_ui->Add(Renderer([]{return separator();})); + dialog_ui->Add(Renderer([=]{return paragraph(body);})); + } + +}; +} + +int main(int argc, char** argv) +{ + rclcpp::init(argc, argv); + + std::shared_ptr client = std::make_shared(); + + rclcpp::spin(client); + rclcpp::shutdown(); + + return 0; +} diff --git a/src/mil_common/online_bagger/src/server.cpp b/src/mil_common/online_bagger/src/server.cpp new file mode 100644 index 00000000..437ed099 --- /dev/null +++ b/src/mil_common/online_bagger/src/server.cpp @@ -0,0 +1,428 @@ +#include +#include +#include + +#include "mil_msgs/action/bag_online.hpp" +#include "mil_msgs/srv/bag_topics.hpp" +#include "online_bagger/online_bagger.h" + +#include +#include +#include + +#include +#include +#include +#include + +namespace online_bagger +{ +class Server: public rclcpp::Node +{ + public: + + Server():rclcpp::Node("online_bagger_server") + { + get_params(); + + topics_service = create_service(TOPIC_SERVICE_NAME, + std::bind(&Server::handle_topics_req, this, std::placeholders::_1, std::placeholders::_2) + ); + + _action_server = rclcpp_action::create_server(this, + BAG_ACTION_NAME, + std::bind(&Server::handle_goal, this, std::placeholders::_1, std::placeholders::_2), + std::bind(&Server::handle_cancel, this, std::placeholders::_1), + std::bind(&Server::handle_accepted, this, std::placeholders::_1) + ); + + subscribe(); + if(subscriber_list.size() == successful_subscription_count && subscriber_list.size() != 0) + { + RCLCPP_INFO(get_logger(), "Subscribed to %ld of %ld topics", + successful_subscription_count, + subscriber_list.size()); + } + else + { + RCLCPP_INFO(get_logger(), "Subscribed to %ld of %ld topics, will try again every %.2f seconds", + successful_subscription_count, + subscriber_list.size(), + resubscribe_period); + resubscriber = create_wall_timer(std::chrono::milliseconds(static_cast(resubscribe_period*1000)), [this]{ + if ((successful_subscription_count == subscriber_list.size()) && resubscriber) + { + resubscriber->cancel(); + if(subscriber_list.size() == 0) + { + RCLCPP_WARN(get_logger(), "No topics selected to subscribe to. Closing."); + rclcpp::shutdown(); + } + else + { + RCLCPP_INFO(get_logger(), "All topics are subscribed! Shutting down resubscriber"); + } + + return; + } + subscribe(); + }); + } + } + ~Server() + { + + } + + private: + using BagOnlineGoalHandle = rclcpp_action::ServerGoalHandle; + std::atomic_bool streaming = true; + rclcpp_action::Server::SharedPtr _action_server; + rclcpp::Service::SharedPtr topics_service; + size_t successful_subscription_count = 0; + size_t stream_time; + bool dated_folder; + float resubscribe_period; + std::filesystem::path dir; + std::unordered_map> subscriber_list; + std::unordered_map>>> topic_messages; + rclcpp::TimerBase::SharedPtr resubscriber; + size_t iteration_count = 0; + + void handle_topics_req([[maybe_unused]]mil_msgs::srv::BagTopics::Request::ConstSharedPtr request, + mil_msgs::srv::BagTopics::Response::SharedPtr response) + { + for(const auto& [topic, info] : subscriber_list) + { + const auto& [time, subscribed] = info; + if(subscribed != nullptr) + response->topics.push_back(topic); + } + } + + rclcpp_action::GoalResponse handle_goal( + [[maybe_unused]]const rclcpp_action::GoalUUID & uuid, + [[maybe_unused]]std::shared_ptr goal) + { + RCLCPP_INFO(get_logger(), "Accepted goal from online bagger client"); + return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE; + } + + rclcpp_action::CancelResponse handle_cancel( + [[maybe_unused]]const std::shared_ptr goal_handle) + { + return rclcpp_action::CancelResponse::REJECT; + } + + float get_topic_duration(const std::string& topic) + { + return (topic_messages[topic].back().first - topic_messages[topic].front().first).nanoseconds() / 1e-9; + } + + size_t get_time_index(const std::string& topic, float requested_seconds) + { + if(requested_seconds == 0) + return 0; + + float topic_duration = get_topic_duration(topic); + + if(topic_duration == 0) + return 0; + + float ratio = requested_seconds / topic_duration; + size_t index = topic_messages[topic].size() * (1 - std::min(ratio, 1.0f)); + return index; + } + + void handle_accepted(std::shared_ptr goal_handle) + { + std::thread(std::bind(&Server::start_bagging, this, std::placeholders::_1), goal_handle).detach(); + } + + void bagger_callback(std::shared_ptr msg, const std::string& topic) + { + if(!streaming) + return; + + + iteration_count += 1; + topic_messages[topic].push_back({now(), msg}); + + float time_diff = get_topic_duration(topic); + + if(iteration_count % 100 == 0) + { + RCLCPP_DEBUG(get_logger(), "%s has %ld messages spaning %.2f seconds.", + topic.c_str(), + topic_messages[topic].size(), + time_diff); + } + + while(time_diff > subscriber_list[topic].first && rclcpp::ok()) + { + topic_messages[topic].pop_front(); + time_diff = get_topic_duration(topic); + } + + } + + std::filesystem::path get_bag_name(const std::filesystem::path& filename) + { + std::filesystem::path default_dir = dir; + std::string date = boost::gregorian::to_simple_string(boost::gregorian::day_clock::local_day()); + std::string time = boost::posix_time::to_simple_string(boost::posix_time::second_clock::local_time().time_of_day()); + if(dated_folder) + { + default_dir /= date; + } + + std::filesystem::path bag_dir = default_dir / filename.parent_path(); + std::filesystem::path bag_name = filename.filename(); + if(!std::filesystem::exists(bag_dir)) + { + std::filesystem::create_directories(bag_dir); + } + + if(!bag_name.has_filename()) + { + bag_name.replace_filename(date + '-' + time); + } + + if(bag_name.extension() != "bag") + { + bag_name.replace_extension("bag"); + } + + return bag_dir / bag_name; + } + + void start_bagging(std::shared_ptr goal_handle) + { + RCLCPP_INFO(get_logger(), "Start bagging"); + auto result = std::make_shared(); + if(!streaming.exchange(false)) + { + result->status = "Bag Request came in while bagging, priority given to prior request", + result->success = false; + goto ret; + } + + try + { + const auto goal = goal_handle->get_goal(); + std::filesystem::path filename = get_bag_name(goal->bag_name).string(); + + float requested_seconds = goal->bag_time; + const std::vector& selected_topics = goal->topics; + + auto feedback = std::make_shared(); + size_t total_messages = 0; + std::unordered_map bag_topics; + for(const auto& topic : selected_topics) + { + auto it = subscriber_list.find(topic); + if(it == subscriber_list.end()) + continue; + + const auto& [time, subscribed] = it->second; + if(subscribed == nullptr) + continue; + + if(topic_messages[topic].size() == 0) + continue; + + size_t index = get_time_index(topic, requested_seconds); + total_messages += topic_messages[topic].size() - index; + bag_topics[topic] = index; + } + + if(total_messages == 0) + { + result->success = false; + result->status = "No messages to bag"; + goto ret; + } + + auto bag = std::make_shared(); + rosbag2_storage::StorageOptions storage_options; + storage_options.uri = filename.string(); + bag->open(storage_options); + + goal_handle->publish_feedback(feedback); + + auto topic_names_and_types = get_topic_names_and_types(); + + size_t msg_inc = 0; + for(const auto& [topic, index] : bag_topics) + { + for(auto it=topic_messages[topic].begin(); + it!=topic_messages[topic].end(); + it++) + { + bag->write(it->second, topic, topic_names_and_types[topic][0],it->first); + if(msg_inc % 50 == 0) // send feedback every 50 messages + { + feedback->progress = static_cast(msg_inc) / total_messages; + goal_handle->publish_feedback(feedback); + } + msg_inc += 1; + // empty deque when done writing to bag + topic_messages[topic].clear(); + } + } + feedback->progress = 1.0; + goal_handle->publish_feedback(feedback); + bag->close(); + + result->success = true; + } + catch(const std::exception& e) + { + result->status = e.what(); + result->success = false; + } +ret: + if(result->success) + { + goal_handle->succeed(result); + RCLCPP_INFO(get_logger(), "Successfully bag to %s", result->filename.c_str()); + } + else + { + goal_handle->abort(result); + RCLCPP_INFO(get_logger(), "Failed to bag: %s", result->status.c_str()); + } + + streaming = true; + } + + void subscribe() + { + auto topic_names_and_types = get_topic_names_and_types(); + for (auto& [topic, info] : subscriber_list) + { + auto& [time, subscribed] = info; + + if(!subscribed) + { + auto it = topic_names_and_types.find(topic); + if(it != topic_names_and_types.end()) + { + successful_subscription_count++; + subscribed = create_generic_subscription( + topic, + it->second[0], + rclcpp::QoS(10), + [this, topic](std::shared_ptr msg){ + bagger_callback(msg, topic); + } + ); + } + } + } + } + + void add_env_var(const std::vector& var) + { + for(auto& topic : var) + { + subscriber_list.emplace(std::move(topic), std::make_pair(stream_time, nullptr)); + } + } + + void get_params() + { + boost::process::environment env = boost::this_process::environment(); + auto it = env.find("BAG_DIR"); + if(it != env.end()) + { + dir = it->to_string(); + } + else + { + it = env.find("HOME"); + if(it != env.end()) + { + dir = std::filesystem::path(it->to_string()) / "bag"; + } + else + { + RCLCPP_FATAL(get_logger(), "Failed to obtain the bag package path, exiting..."); + rclcpp::shutdown(); + } + } + + dir = std::filesystem::path(declare_parameter("bag_package_path", dir.string())); + + + stream_time = declare_parameter("stream_time", 30); + resubscribe_period = declare_parameter("resubscribe_period", 3.0); + dated_folder = declare_parameter("dated_folder", true); + + std::vector topics_param; + for(const auto& [topic, types] : get_topic_names_and_types()) + { + topics_param.push_back(topic); + } + topics_param = declare_parameter("topics", std::vector(topics_param)); + + for (const std::string& topic : topics_param) + { + std::istringstream iss(topic); + std::string token; + std::vector tokens; + while (std::getline(iss, token, ',')) + { + tokens.push_back(std::move(token)); + } + + size_t topic_stream_time = stream_time; + if(tokens.size() == 2) + { + try + { + topic_stream_time = std::stoul(tokens[1]); + } + catch(const std::exception& e) + { + RCLCPP_WARN(get_logger(), "Failed to set stream time %s for topic %s.", tokens[0].c_str(), tokens[1].c_str()); + } + } + + subscriber_list.emplace(std::move(tokens[0]), std::make_pair(topic_stream_time, nullptr)); + } + + it = env.find("BAG_ALWAYS"); + if(it != env.end()) + { + add_env_var(it->to_vector()); + } + + for(const auto& entry : env) + { + std::string key = entry.get_name(); + if (key.size() >= 4 && key.substr(0, 4) == "bag_") + { + add_env_var(entry.to_vector()); + } + } + + RCLCPP_INFO(get_logger(), "Default stream_time: %ld seconds", stream_time); + RCLCPP_INFO(get_logger(), "Bag Directory: %s", dir.c_str()); + } +}; +} + +int main(int argc, char* argv[]) +{ + rclcpp::init(argc, argv); + + auto server = std::make_shared(); + rclcpp::spin(server); + rclcpp::shutdown(); + + return 0; +} + + + From 8f3457800e217254aa007b0570d2c2dc93a2e382 Mon Sep 17 00:00:00 2001 From: "Zhongzheng R. Zhang" Date: Sun, 4 May 2025 16:49:10 -0400 Subject: [PATCH 02/12] run tui in a seperate thread --- src/mil_common/online_bagger/src/client.cpp | 316 ++++++++++---------- src/mil_common/online_bagger/src/server.cpp | 5 +- 2 files changed, 166 insertions(+), 155 deletions(-) diff --git a/src/mil_common/online_bagger/src/client.cpp b/src/mil_common/online_bagger/src/client.cpp index 9a1fd348..cffe4b84 100644 --- a/src/mil_common/online_bagger/src/client.cpp +++ b/src/mil_common/online_bagger/src/client.cpp @@ -14,6 +14,7 @@ #include #include +#include namespace online_bagger { @@ -92,104 +93,175 @@ class TopicList: public ComponentBase rclcpp::Clock clock; }; -class UIScreen +class Client: public rclcpp::Node { public: - UIScreen(Component ui): - loop(&screen, ui) + using BagOnlineGoalHandle = rclcpp_action::ClientGoalHandle; + + enum class State { + Waiting, + Ready, + Bagging + }; - } + struct BagOptions + { + std::vector topics; + std::string name; + float time; + std::function on_feedback; + std::function on_result; + }; - ~UIScreen() + Client():rclcpp::Node("online_bagger_client") { + using namespace ftxui; + + action_client = rclcpp_action::create_client(this, BAG_ACTION_NAME); + srv_client = create_client(TOPIC_SERVICE_NAME); + + alive_timer = create_wall_timer(std::chrono::milliseconds(100), std::bind(&Client::is_alive, this)); + } + ~Client() + { + } - void shutdown() + State get_state() { - screen.Exit(); + return state; } - bool ok() + + std::shared_future> + get_bag_topics(std::function::SharedFuture future)> callback) { - return !loop.HasQuitted(); + auto request = std::make_shared(); + return srv_client->async_send_request(request, callback).future; } - void spin_once() + std::shared_future + start_bagging(const mil_msgs::action::BagOnline::Goal& goal, + const rclcpp_action::Client::SendGoalOptions& options) { - loop.RunOnce(); + state = State::Bagging; + return action_client->async_send_goal(goal, options); } - void refresh() + void finish_bagging() { - screen.PostEvent(Event::Custom); + state = State::Ready; } private: - ScreenInteractive screen = ScreenInteractive::Fullscreen(); - Loop loop; + + std::atomic state; + rclcpp_action::Client::SharedPtr action_client; + rclcpp::Client::SharedPtr srv_client; + rclcpp::TimerBase::SharedPtr alive_timer; + + void is_alive() + { + if(state == State::Waiting && action_client->action_server_is_ready()) + { + state = State::Ready; + } + else if(state == State::Ready && !action_client->action_server_is_ready()) + { + state = State::Waiting; + } + else if(state == State::Bagging && !action_client->action_server_is_ready()) + { + state = State::Waiting; + } + } + }; -class Client: public rclcpp::Node +class Tui { public: - Client():rclcpp::Node("online_bagger_client") + Tui() { - using namespace ftxui; - action_client = rclcpp_action::create_client(this, BAG_ACTION_NAME); - srv_client = create_client(TOPIC_SERVICE_NAME); + } + + ~Tui() + { + + } + void spin(std::shared_ptr client) + { // Head Component ui_head = Renderer([]{return text("Online Bagger") | bold | center | flex;}); // Body + std::shared_ptr ui_topic_list = std::make_shared(); Component ui_name_input = Input(&name_input, InputOption::Default()); Component ui_time_input = Input(&time_input, InputOption::Default()) | CatchEvent([](Event event) { return event.is_character() && !(std::isdigit(event.character()[0]) || event.character()[0] == '.'); }); - Component ui_body = Renderer(Container::Vertical({ - ui_topic_list, ui_name_input, ui_time_input - }),[=]{ - return vbox({ - text("Bag Topics") | vcenter, - ui_topic_list->Render() | border, - text("Bag File Name ") | vcenter, - ui_name_input->Render() | border, - text("Bag Time ") | vcenter, - ui_time_input->Render() | border - }) | vscroll_indicator | frame; - }); + Component ui_body = Container::Vertical({ + Renderer([]{return text("Bag Topics") | vcenter;}), + ui_topic_list | border, + Renderer([]{return text("Bag File Name ") | vcenter;}), + ui_name_input | border, + Renderer([]{return text("Bag Time ") | vcenter;}), + ui_time_input | border + }) | vscroll_indicator | frame ; // Bottom - auto on_quit = [&]{rclcpp::shutdown();}; - auto on_bag = [&]{ + auto on_quit = [this]{screen.Exit();}; + auto on_bag = [this, ui_topic_list, client]{ bag_progress = 0.0f; + mil_msgs::action::BagOnline::Goal goal; goal.bag_name = name_input; goal.topics = ui_topic_list->get_selected(); goal.bag_time = std::stof(time_input); auto goal_options = rclcpp_action::Client::SendGoalOptions(); - goal_options.goal_response_callback = std::bind(&Client::handle_reponse, this, std::placeholders::_1); - goal_options.result_callback = std::bind(&Client::handle_result, this, std::placeholders::_1); - goal_options.feedback_callback = std::bind(&Client::handle_feedback, this, std::placeholders::_1, std::placeholders::_2); - action_client->async_send_goal(goal, goal_options); + goal_options.goal_response_callback = [this](Client::BagOnlineGoalHandle::SharedPtr goal_handle){ + if(!goal_handle) + { + screen.Post([this]{show_dialog("Failed", "Request is rejected by the online bagger server");}); + } + }; + goal_options.result_callback = [this, client = client](const Client::BagOnlineGoalHandle::WrappedResult& result){ + std::string message = result.result->success ? "Succuessfully save bag file to " + result.result->filename : + "Failed to bag topics: " + result.result->status; + screen.Post([this, message = std::move(message), success = result.result->success]{ + show_dialog(success ? "Success" : "Failed", message); + }); + client->finish_bagging(); + }; + goal_options.feedback_callback = [this]( + [[maybe_unused]]Client::BagOnlineGoalHandle::SharedPtr goal_handle, + const std::shared_ptr feedback){ + screen.Post([this, progress = feedback->progress]{bag_progress = progress;}); + }; + + client->start_bagging(goal, goal_options); }; - auto on_refresh = [=]{ - auto request = std::make_shared(); - srv_client->async_send_request(request, [&]( - rclcpp::Client::SharedFuture future){ - ui_topic_list->refresh(std::move(future.get()->topics)); + + auto on_refresh = [this, ui_topic_list, client]{ + client->get_bag_topics([&](rclcpp::Client::SharedFuture future){ + screen.Post([ui_topic_list, topics = std::move(future.get()->topics)] () mutable { + ui_topic_list->refresh(std::move(topics)); + }); }); }; - Component ui_status_bar = Renderer([&]{ - if(client_state == ClientState::Waiting) + Component ui_status_bar = Renderer([this, client]{ + if(client->get_state() == Client::State::Waiting) + { return hbox({ text("Connecting to bag server "), - spinner(15, now().nanoseconds() / 200'000'000) + spinner(15, clock.now().nanoseconds() / 200'000'000) }); - else if(client_state == ClientState::Ready) + } + else if(client->get_state() == Client::State::Ready) return filler(); else return gauge(bag_progress); @@ -198,11 +270,11 @@ class Client: public rclcpp::Node Component ui_bottom_bar = Container::Horizontal({ ui_status_bar | vcenter | xflex, Button("Quit", on_quit, ButtonOption::Border()) | align_right, - Maybe(Button("Refresh", on_refresh, ButtonOption::Border()), [&]{ - return client_state != ClientState::Waiting; + Maybe(Button("Refresh", on_refresh, ButtonOption::Border()), [client]{ + return client->get_state() != Client::State::Waiting; }) | align_right, - Maybe(Button("Bag", on_bag, ButtonOption::Border()), [&]{ - return client_state == ClientState::Ready; + Maybe(Button("Bag", on_bag, ButtonOption::Border()), [client]{ + return client->get_state() == Client::State::Ready; }) | align_right }); @@ -216,125 +288,53 @@ class Client: public rclcpp::Node ui_bottom_bar }); - screens.emplace(main_ui); - - ui_timer = create_wall_timer(std::chrono::milliseconds(20), std::bind(&Client::refresh_ui, this)); - alive_timer = create_wall_timer(std::chrono::milliseconds(100), std::bind(&Client::is_alive, this)); - } - ~Client() - { - - } - private: - - using BagOnlineGoalHandle = rclcpp_action::ClientGoalHandle; - - enum class ClientState - { - Waiting, - Ready, - Bagging - }client_state = ClientState::Waiting; - - float bag_progress = 0.0f; - std::string result_string; - rclcpp_action::Client::SharedPtr action_client; - rclcpp::Client::SharedPtr srv_client; - rclcpp::TimerBase::SharedPtr ui_timer; - rclcpp::TimerBase::SharedPtr alive_timer; - std::shared_ptr ui_topic_list = std::make_shared(); - - std::stack screens; + on_refresh(); - std::string name_input = "bag"; - std::string time_input = "1"; + Loop loop(&screen, main_ui); - void refresh_ui() - { - UIScreen& screen = screens.top(); - if(screen.ok()) - { - screen.spin_once(); - } - else - { - screens.pop(); - if(screens.size() == 0) - rclcpp::shutdown(); - } - } - - void is_alive() - { - if(client_state == ClientState::Waiting && action_client->action_server_is_ready()) - { - auto request = std::make_shared(); - srv_client->async_send_request(request, [&]( - rclcpp::Client::SharedFuture future){ - client_state = ClientState::Ready; - ui_topic_list->refresh(std::move(future.get()->topics)); - }); - } - else if(client_state == ClientState::Ready && !action_client->action_server_is_ready()) - { - client_state = ClientState::Waiting; - } - else if(client_state == ClientState::Bagging && !action_client->action_server_is_ready()) - { - client_state = ClientState::Waiting; - } - screens.top().refresh(); - } + std::thread refresh_thread([&]{ + while(!loop.HasQuitted()) + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + screen.PostEvent(Event::Custom); + } + }); - void handle_reponse(BagOnlineGoalHandle::SharedPtr goal_handle) - { - if(!goal_handle) - { - show_dialog("Failed", "Request is rejected by the online bagger server"); - } - } + loop.Run(); - void handle_feedback([[maybe_unused]]BagOnlineGoalHandle::SharedPtr goal_handle, - const std::shared_ptr feedback) - { - bag_progress = feedback->progress; + refresh_thread.join(); } - void handle_result(const BagOnlineGoalHandle::WrappedResult& result) - { - if(result.result->success) - { - show_dialog("Success", "Succuessfully save bag file to " + result.result->filename); - } - else - { - show_dialog("Failed", "Failed to bag topics: " + result.result->status); - } - - client_state = ClientState::Ready; - } + private: + rclcpp::Clock clock; + ScreenInteractive screen = ScreenInteractive::Fullscreen(); + std::string name_input = "Bag"; + std::string time_input = "1"; + float bag_progress = 0.0f; void show_dialog(const std::string& title, const std::string& body) { - Component dialog_ui = Container::Vertical({}); - UIScreen& screen = screens.emplace(dialog_ui | border); + ScreenInteractive dialog_screen = ScreenInteractive::Fullscreen(); Component close_button = Button("X", [&]{ - screen.shutdown(); + dialog_screen.Exit(); }, ButtonOption::Ascii()); - dialog_ui->Add(Renderer(close_button,[=]{ - return hbox({ - text(title) | center |flex, - close_button->Render() - }); - })); + Component dialog_ui = Container::Vertical({ + Renderer(close_button,[=]{ + return hbox({ + text(title) | center |flex, + close_button->Render() + }); + }), + Renderer([]{return separator();}), + Renderer([=]{return paragraph(body);}) + }); - dialog_ui->Add(Renderer([]{return separator();})); - dialog_ui->Add(Renderer([=]{return paragraph(body);})); + dialog_screen.Loop(dialog_ui); } - }; + } int main(int argc, char** argv) @@ -342,9 +342,17 @@ int main(int argc, char** argv) rclcpp::init(argc, argv); std::shared_ptr client = std::make_shared(); + + std::thread ui_thread([=]{ + online_bagger::Tui tui; + tui.spin(client); + rclcpp::shutdown(); + }); rclcpp::spin(client); rclcpp::shutdown(); + ui_thread.join(); + return 0; } diff --git a/src/mil_common/online_bagger/src/server.cpp b/src/mil_common/online_bagger/src/server.cpp index 437ed099..465ec370 100644 --- a/src/mil_common/online_bagger/src/server.cpp +++ b/src/mil_common/online_bagger/src/server.cpp @@ -116,7 +116,10 @@ class Server: public rclcpp::Node float get_topic_duration(const std::string& topic) { - return (topic_messages[topic].back().first - topic_messages[topic].front().first).nanoseconds() / 1e-9; + if(topic_messages[topic].size() == 0) + return 0.0f; + + return (topic_messages[topic].back().first - topic_messages[topic].front().first).nanoseconds() / 1e-9f; } size_t get_time_index(const std::string& topic, float requested_seconds) From 95aa832eeb0b4da4955ffcd128887248df3e2ba4 Mon Sep 17 00:00:00 2001 From: "Zhongzheng R. Zhang" Date: Mon, 5 May 2025 01:58:12 -0400 Subject: [PATCH 03/12] reorganize file structure --- src/mil_common/mil_tools/CMakeLists.txt | 1 + .../include/mil_tools/tui/dialog.hpp | 65 +++ src/mil_common/mil_tools/package.xml | 1 + src/mil_common/online_bagger/CMakeLists.txt | 28 +- .../include/online_bagger/client.hpp | 49 +++ .../{online_bagger.h => common.hpp} | 2 + src/mil_common/online_bagger/package.xml | 1 + src/mil_common/online_bagger/src/client.cpp | 365 ++-------------- .../online_bagger/src/online_bagger.cpp | 402 ++++++++++++++++++ src/mil_common/online_bagger/src/server.cpp | 16 +- 10 files changed, 590 insertions(+), 340 deletions(-) create mode 100644 src/mil_common/mil_tools/include/mil_tools/tui/dialog.hpp create mode 100644 src/mil_common/online_bagger/include/online_bagger/client.hpp rename src/mil_common/online_bagger/include/online_bagger/{online_bagger.h => common.hpp} (92%) create mode 100644 src/mil_common/online_bagger/src/online_bagger.cpp diff --git a/src/mil_common/mil_tools/CMakeLists.txt b/src/mil_common/mil_tools/CMakeLists.txt index 031eccb5..279d778d 100644 --- a/src/mil_common/mil_tools/CMakeLists.txt +++ b/src/mil_common/mil_tools/CMakeLists.txt @@ -16,6 +16,7 @@ find_package(backward_ros REQUIRED) find_package(Eigen3 REQUIRED) find_package(tf2_geometry_msgs REQUIRED) find_package(rclcpp REQUIRED) +find_package(ftxui REQUIRED) set(SOURCE_FILES src/geometry/Rotation.cpp src/fs/path.cpp src/os.cpp src/os/FileDescriptor.cpp src/string.cpp) diff --git a/src/mil_common/mil_tools/include/mil_tools/tui/dialog.hpp b/src/mil_common/mil_tools/include/mil_tools/tui/dialog.hpp new file mode 100644 index 00000000..e8d92d15 --- /dev/null +++ b/src/mil_common/mil_tools/include/mil_tools/tui/dialog.hpp @@ -0,0 +1,65 @@ +#pragma once +// Show a dialog on the terminal powered by ftxui + +#include "ftxui/component/screen_interactive.hpp" +#include "ftxui/component/component.hpp" +#include "ftxui/dom/elements.hpp" + +namespace mil_tools::tui +{ +class Dialog +{ +public: + Dialog() + { + + } + + int exec(const std::string& title, + ftxui::Component content, + const std::vector& buttons = {"OK"}) + { + using namespace ftxui; + Component close_button = Button("X", [&]{ + screen.Exit(); + }, ButtonOption::Ascii()); + + Components button_comps; + button_comps.push_back(Renderer([]{return filler();})); + for(size_t i=0;iRender() + }); + }), + Renderer([]{return separator();}), + content | vcenter | flex, + Renderer([]{return separator();}), + Container::Horizontal(button_comps) + }) | border; + + screen.Loop(dialog_ui); + return exit_code; + } + + void close() + { + screen.Exit(); + } + + private: + int exit_code = -1; + ftxui::ScreenInteractive screen = ftxui::ScreenInteractive::Fullscreen(); +}; +} \ No newline at end of file diff --git a/src/mil_common/mil_tools/package.xml b/src/mil_common/mil_tools/package.xml index d8d1cd60..333d7ccb 100644 --- a/src/mil_common/mil_tools/package.xml +++ b/src/mil_common/mil_tools/package.xml @@ -13,6 +13,7 @@ rclcpp backward_ros + ftxui ament ament_cmake ament_cmake_python diff --git a/src/mil_common/online_bagger/CMakeLists.txt b/src/mil_common/online_bagger/CMakeLists.txt index ba73570b..a8346c3c 100644 --- a/src/mil_common/online_bagger/CMakeLists.txt +++ b/src/mil_common/online_bagger/CMakeLists.txt @@ -10,16 +10,18 @@ find_package(ament_cmake REQUIRED) find_package(rclcpp REQUIRED) find_package(mil_msgs REQUIRED) find_package(std_msgs REQUIRED) +find_package(mil_tools REQUIRED) find_package(rclcpp_action REQUIRED) find_package(rosbag2_cpp REQUIRED) find_package(Boost REQUIRED COMPONENTS filesystem system program_options) find_package(ftxui REQUIRED COMPONENTS screen dom component) include_directories(include ${Boost_INCLUDE_DIRS}) -add_executable(${PROJECT_NAME}_server src/server.cpp) -add_executable(${PROJECT_NAME}_client src/client.cpp) +add_executable(${PROJECT_NAME}_server_node src/server.cpp) +add_library(${PROJECT_NAME}_client src/client.cpp) +add_executable(${PROJECT_NAME} src/online_bagger.cpp) -ament_target_dependencies(${PROJECT_NAME}_server +ament_target_dependencies(${PROJECT_NAME}_server_node rclcpp mil_msgs std_msgs @@ -33,19 +35,29 @@ ament_target_dependencies(${PROJECT_NAME}_client mil_msgs std_msgs rclcpp_action +) + +ament_target_dependencies(${PROJECT_NAME} + mil_tools + rclcpp Boost ) -target_link_libraries(${PROJECT_NAME}_client -ftxui::screen -ftxui::dom -ftxui::component +target_link_libraries(${PROJECT_NAME} + ${PROJECT_NAME}_client + ftxui::screen + ftxui::dom + ftxui::component ) -install(TARGETS ${PROJECT_NAME}_server ${PROJECT_NAME}_client +install(TARGETS ${PROJECT_NAME}_server_node ${PROJECT_NAME}_client DESTINATION lib/${PROJECT_NAME} ) +install(TARGETS ${PROJECT_NAME} + DESTINATION bin +) + install(DIRECTORY launch/ DESTINATION share/${PROJECT_NAME}/launch ) diff --git a/src/mil_common/online_bagger/include/online_bagger/client.hpp b/src/mil_common/online_bagger/include/online_bagger/client.hpp new file mode 100644 index 00000000..5cafea31 --- /dev/null +++ b/src/mil_common/online_bagger/include/online_bagger/client.hpp @@ -0,0 +1,49 @@ +#pragma once + +#include +#include + +#include "mil_msgs/srv/bag_topics.hpp" +#include "mil_msgs/action/bag_online.hpp" + +namespace online_bagger +{ +class Client: public rclcpp::Node +{ + public: + using BagOnlineGoalHandle = rclcpp_action::ClientGoalHandle; + + enum class State + { + Waiting, + Ready, + Bagging + }; + + Client(); + ~Client(); + + State get_state(); + + std::shared_future> + get_bag_topics(std::function::SharedFuture future)> callback); + + std::shared_future + start_bagging(const mil_msgs::action::BagOnline::Goal& goal, + const rclcpp_action::Client::SendGoalOptions& options); + + void finish_bagging(); + + private: + + std::atomic state; + rclcpp_action::Client::SharedPtr action_client; + rclcpp::Client::SharedPtr srv_client; + rclcpp::TimerBase::SharedPtr alive_timer; + + void is_alive(); + +}; + +} + diff --git a/src/mil_common/online_bagger/include/online_bagger/online_bagger.h b/src/mil_common/online_bagger/include/online_bagger/common.hpp similarity index 92% rename from src/mil_common/online_bagger/include/online_bagger/online_bagger.h rename to src/mil_common/online_bagger/include/online_bagger/common.hpp index 00b5ebfe..239abf51 100644 --- a/src/mil_common/online_bagger/include/online_bagger/online_bagger.h +++ b/src/mil_common/online_bagger/include/online_bagger/common.hpp @@ -1,3 +1,5 @@ +#pragma once + namespace online_bagger { static constexpr char BAG_ACTION_NAME[] = "/online_bagger/bag"; diff --git a/src/mil_common/online_bagger/package.xml b/src/mil_common/online_bagger/package.xml index 735c5cb0..3f67b192 100644 --- a/src/mil_common/online_bagger/package.xml +++ b/src/mil_common/online_bagger/package.xml @@ -15,6 +15,7 @@ rosbag2_cpp mil_msgs std_msgs + mil_tools ament_lint_auto ament_lint_common diff --git a/src/mil_common/online_bagger/src/client.cpp b/src/mil_common/online_bagger/src/client.cpp index cffe4b84..d482cd97 100644 --- a/src/mil_common/online_bagger/src/client.cpp +++ b/src/mil_common/online_bagger/src/client.cpp @@ -1,358 +1,73 @@ #include #include -#include - -#include "ftxui/component/loop.hpp" -#include "ftxui/component/screen_interactive.hpp" -#include "ftxui/component/component.hpp" -#include "ftxui/dom/elements.hpp" - #include "mil_msgs/action/bag_online.hpp" #include "mil_msgs/srv/bag_topics.hpp" -#include "online_bagger/online_bagger.h" +#include "online_bagger/common.hpp" +#include "online_bagger/client.hpp" -#include -#include #include namespace online_bagger { -using namespace ftxui; -class TopicList: public ComponentBase +Client::Client():rclcpp::Node("online_bagger_client") { - public: - TopicList() - { - } - ~TopicList() - { - - } - - void refresh(std::vector&& topics) - { - this->topics = std::move(topics); - DetachAllChildren(); - if(selected) - delete[] selected; - - if(this->topics.size() == 0) - return; - - future = std::async(std::launch::async, [&]{ - selected = new bool[this->topics.size()]{false}; - Components items; - for(size_t i=0; itopics.size(); i++) - { - items.push_back(Checkbox(this->topics[i], &selected[i])); - } - - return Container::Vertical(items); - }); + action_client = rclcpp_action::create_client(this, BAG_ACTION_NAME); + srv_client = create_client(TOPIC_SERVICE_NAME); - return; - } - - const std::vector get_selected() - { - std::vector selected_topics; - for(size_t i=0;i 0) - return ChildAt(0)->Render(); - else - return text("No bag topics available"); - } - else - { - if(future.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready) - { - Add(future.get()); - } - - return spinner(15, clock.now().nanoseconds() / 200'000'000); - } - } + if(action_client->wait_for_action_server(std::chrono::seconds(1)) == true) + state = State::Ready; - private: - std::future future; - std::vector topics; - bool* selected = nullptr; - rclcpp::Clock clock; -}; + alive_timer = create_wall_timer(std::chrono::milliseconds(100), std::bind(&Client::is_alive, this)); +} -class Client: public rclcpp::Node +Client::~Client() { - public: - using BagOnlineGoalHandle = rclcpp_action::ClientGoalHandle; - - enum class State - { - Waiting, - Ready, - Bagging - }; - - struct BagOptions - { - std::vector topics; - std::string name; - float time; - std::function on_feedback; - std::function on_result; - }; - - Client():rclcpp::Node("online_bagger_client") - { - using namespace ftxui; - - action_client = rclcpp_action::create_client(this, BAG_ACTION_NAME); - srv_client = create_client(TOPIC_SERVICE_NAME); - - alive_timer = create_wall_timer(std::chrono::milliseconds(100), std::bind(&Client::is_alive, this)); - } - ~Client() - { - - } - - State get_state() - { - return state; - } - - - std::shared_future> - get_bag_topics(std::function::SharedFuture future)> callback) - { - auto request = std::make_shared(); - return srv_client->async_send_request(request, callback).future; - } - - std::shared_future - start_bagging(const mil_msgs::action::BagOnline::Goal& goal, - const rclcpp_action::Client::SendGoalOptions& options) - { - state = State::Bagging; - return action_client->async_send_goal(goal, options); - } - - void finish_bagging() - { - state = State::Ready; - } - private: +} - std::atomic state; - rclcpp_action::Client::SharedPtr action_client; - rclcpp::Client::SharedPtr srv_client; - rclcpp::TimerBase::SharedPtr alive_timer; +Client::State Client::get_state() +{ + return state; +} - void is_alive() - { - if(state == State::Waiting && action_client->action_server_is_ready()) - { - state = State::Ready; - } - else if(state == State::Ready && !action_client->action_server_is_ready()) - { - state = State::Waiting; - } - else if(state == State::Bagging && !action_client->action_server_is_ready()) - { - state = State::Waiting; - } - } -}; +std::shared_future> + Client::get_bag_topics(std::function::SharedFuture future)> callback) +{ + auto request = std::make_shared(); + return srv_client->async_send_request(request, callback).future; +} -class Tui +std::shared_future + Client::start_bagging(const mil_msgs::action::BagOnline::Goal& goal, + const rclcpp_action::Client::SendGoalOptions& options) { - public: - Tui() - { + state = State::Bagging; + return action_client->async_send_goal(goal, options); +} - } +void Client::finish_bagging() +{ + action_client->async_cancel_all_goals(); + state = State::Ready; +} - ~Tui() +void Client::is_alive() +{ + if(state == State::Waiting && action_client->action_server_is_ready()) { - + state = State::Ready; } - - void spin(std::shared_ptr client) + else if(state == State::Ready && !action_client->action_server_is_ready()) { - // Head - Component ui_head = Renderer([]{return text("Online Bagger") | bold | center | flex;}); - // Body - std::shared_ptr ui_topic_list = std::make_shared(); - Component ui_name_input = Input(&name_input, InputOption::Default()); - Component ui_time_input = Input(&time_input, InputOption::Default()) | CatchEvent([](Event event) { - return event.is_character() && !(std::isdigit(event.character()[0]) || event.character()[0] == '.'); - }); - Component ui_body = Container::Vertical({ - Renderer([]{return text("Bag Topics") | vcenter;}), - ui_topic_list | border, - Renderer([]{return text("Bag File Name ") | vcenter;}), - ui_name_input | border, - Renderer([]{return text("Bag Time ") | vcenter;}), - ui_time_input | border - }) | vscroll_indicator | frame ; - - // Bottom - auto on_quit = [this]{screen.Exit();}; - auto on_bag = [this, ui_topic_list, client]{ - bag_progress = 0.0f; - - mil_msgs::action::BagOnline::Goal goal; - goal.bag_name = name_input; - goal.topics = ui_topic_list->get_selected(); - goal.bag_time = std::stof(time_input); - - auto goal_options = rclcpp_action::Client::SendGoalOptions(); - goal_options.goal_response_callback = [this](Client::BagOnlineGoalHandle::SharedPtr goal_handle){ - if(!goal_handle) - { - screen.Post([this]{show_dialog("Failed", "Request is rejected by the online bagger server");}); - } - }; - goal_options.result_callback = [this, client = client](const Client::BagOnlineGoalHandle::WrappedResult& result){ - std::string message = result.result->success ? "Succuessfully save bag file to " + result.result->filename : - "Failed to bag topics: " + result.result->status; - screen.Post([this, message = std::move(message), success = result.result->success]{ - show_dialog(success ? "Success" : "Failed", message); - }); - client->finish_bagging(); - }; - goal_options.feedback_callback = [this]( - [[maybe_unused]]Client::BagOnlineGoalHandle::SharedPtr goal_handle, - const std::shared_ptr feedback){ - screen.Post([this, progress = feedback->progress]{bag_progress = progress;}); - }; - - client->start_bagging(goal, goal_options); - }; - - auto on_refresh = [this, ui_topic_list, client]{ - client->get_bag_topics([&](rclcpp::Client::SharedFuture future){ - screen.Post([ui_topic_list, topics = std::move(future.get()->topics)] () mutable { - ui_topic_list->refresh(std::move(topics)); - }); - }); - }; - - Component ui_status_bar = Renderer([this, client]{ - if(client->get_state() == Client::State::Waiting) - { - return hbox({ - text("Connecting to bag server "), - spinner(15, clock.now().nanoseconds() / 200'000'000) - }); - } - else if(client->get_state() == Client::State::Ready) - return filler(); - else - return gauge(bag_progress); - }); - - Component ui_bottom_bar = Container::Horizontal({ - ui_status_bar | vcenter | xflex, - Button("Quit", on_quit, ButtonOption::Border()) | align_right, - Maybe(Button("Refresh", on_refresh, ButtonOption::Border()), [client]{ - return client->get_state() != Client::State::Waiting; - }) | align_right, - Maybe(Button("Bag", on_bag, ButtonOption::Border()), [client]{ - return client->get_state() == Client::State::Ready; - }) | align_right - }); - - - // Main UI - Component main_ui = Container::Vertical({ - ui_head, - Renderer([]{return separator();}), - ui_body | flex, - Renderer([]{return separator();}), - ui_bottom_bar - }); - - on_refresh(); - - Loop loop(&screen, main_ui); - - std::thread refresh_thread([&]{ - while(!loop.HasQuitted()) - { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - screen.PostEvent(Event::Custom); - } - }); - - loop.Run(); - - refresh_thread.join(); + state = State::Waiting; } - - private: - rclcpp::Clock clock; - ScreenInteractive screen = ScreenInteractive::Fullscreen(); - std::string name_input = "Bag"; - std::string time_input = "1"; - float bag_progress = 0.0f; - - void show_dialog(const std::string& title, const std::string& body) + else if(state == State::Bagging && !action_client->action_server_is_ready()) { - ScreenInteractive dialog_screen = ScreenInteractive::Fullscreen(); - - Component close_button = Button("X", [&]{ - dialog_screen.Exit(); - }, ButtonOption::Ascii()); - - Component dialog_ui = Container::Vertical({ - Renderer(close_button,[=]{ - return hbox({ - text(title) | center |flex, - close_button->Render() - }); - }), - Renderer([]{return separator();}), - Renderer([=]{return paragraph(body);}) - }); - - dialog_screen.Loop(dialog_ui); + state = State::Waiting; } -}; - } -int main(int argc, char** argv) -{ - rclcpp::init(argc, argv); - - std::shared_ptr client = std::make_shared(); - - std::thread ui_thread([=]{ - online_bagger::Tui tui; - tui.spin(client); - rclcpp::shutdown(); - }); - - rclcpp::spin(client); - rclcpp::shutdown(); - - ui_thread.join(); - - return 0; } diff --git a/src/mil_common/online_bagger/src/online_bagger.cpp b/src/mil_common/online_bagger/src/online_bagger.cpp new file mode 100644 index 00000000..cdf29998 --- /dev/null +++ b/src/mil_common/online_bagger/src/online_bagger.cpp @@ -0,0 +1,402 @@ +#include "ftxui/component/screen_interactive.hpp" +#include "ftxui/component/component.hpp" +#include "ftxui/dom/elements.hpp" +#include "ftxui/component/loop.hpp" + +#include "online_bagger/client.hpp" +#include "mil_tools/tui/dialog.hpp" + +#include + +#include + +using namespace ftxui; +using namespace online_bagger; + +class TopicList: public ComponentBase +{ + public: + TopicList() + { + } + ~TopicList() + { + + } + + void refresh(std::vector&& topics) + { + this->topics = std::move(topics); + DetachAllChildren(); + if(selected) + delete[] selected; + + if(this->topics.size() == 0) + return; + + future = std::async(std::launch::async, [&]{ + selected = new bool[this->topics.size()]{false}; + Components items; + for(size_t i=0; itopics.size(); i++) + { + items.push_back(Checkbox(this->topics[i], &selected[i])); + } + + return Container::Vertical(items); + }); + + return; + } + + const std::vector get_selected() + { + std::vector selected_topics; + for(size_t i=0;i 0) + return ChildAt(0)->Render(); + else + return text("No bag topics available"); + } + else + { + if(future.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready) + { + Add(future.get()); + } + + return spinner(15, clock.now().nanoseconds() / 200'000'000); + } + } + + private: + std::future future; + std::vector topics; + bool* selected = nullptr; + rclcpp::Clock clock; +}; + +class Tui +{ + public: + Tui() + { + + } + + ~Tui() + { + + } + + void spin(std::shared_ptr client) + { + // Head + Component ui_head = Renderer([]{return text("Online Bagger") | bold | center;}); + // Body + std::shared_ptr ui_topic_list = std::make_shared(); + + Component ui_body = ui_topic_list | vscroll_indicator | frame ; + + // Bottom + auto on_quit = [this]{screen.Exit();}; + auto on_bag = [this, ui_topic_list, client]{ + bag_progress = 0.0f; + + mil_msgs::action::BagOnline::Goal goal; + goal.topics = ui_topic_list->get_selected(); + goal.bag_time = 1.0f; + + if(!ask_bag_detail(goal.bag_name, goal.bag_time)) + return; + + auto goal_options = rclcpp_action::Client::SendGoalOptions(); + goal_options.goal_response_callback = [this](Client::BagOnlineGoalHandle::SharedPtr goal_handle){ + if(!goal_handle) + { + screen.Post([this]{show_message("Failed", "Request is rejected by the online bagger server");}); + } + }; + goal_options.result_callback = [this, client = client](const Client::BagOnlineGoalHandle::WrappedResult& result){ + std::string message = result.result->success ? "Succuessfully save bag file to " + result.result->filename : + "Failed to bag topics: " + result.result->status; + screen.Post([this, message = std::move(message), success = result.result->success]{ + show_message(success ? "Success" : "Failed", message); + }); + client->finish_bagging(); + }; + goal_options.feedback_callback = [this]( + [[maybe_unused]]Client::BagOnlineGoalHandle::SharedPtr goal_handle, + const std::shared_ptr feedback){ + screen.Post([this, progress = feedback->progress]{bag_progress = progress;}); + }; + + client->start_bagging(goal, goal_options); + }; + + auto on_refresh = [this, ui_topic_list, client]{ + client->get_bag_topics([&](rclcpp::Client::SharedFuture future){ + screen.Post([ui_topic_list, topics = std::move(future.get()->topics)] () mutable { + ui_topic_list->refresh(std::move(topics)); + }); + }); + }; + + Component ui_status_bar = Renderer([this, client]{ + if(client->get_state() == Client::State::Waiting) + { + return hbox({ + text("Connecting to bag server "), + spinner(15, clock.now().nanoseconds() / 200'000'000) + }); + } + else if(client->get_state() == Client::State::Ready) + return filler(); + else + return gauge(bag_progress); + }); + + Component ui_bottom_bar = Container::Horizontal({ + ui_status_bar | vcenter | xflex, + Button("Quit", on_quit, ButtonOption::Border()) | align_right, + Maybe(Button("Refresh", on_refresh, ButtonOption::Border()), [client]{ + return client->get_state() != Client::State::Waiting; + }) | align_right, + Maybe(Button("Bag", on_bag, ButtonOption::Border()), [client]{ + return client->get_state() == Client::State::Ready; + }) | align_right + }); + + + // Main UI + Component main_ui = Container::Vertical({ + ui_head, + Renderer([]{return separator();}), + ui_body | flex, + Renderer([]{return separator();}), + ui_bottom_bar + }); + + on_refresh(); + + Loop loop(&screen, main_ui); + + std::thread refresh_thread([&]{ + while(!loop.HasQuitted()) + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + screen.PostEvent(Event::Custom); + } + }); + + loop.Run(); + + refresh_thread.join(); + } + + private: + rclcpp::Clock clock; + ScreenInteractive screen = ScreenInteractive::Fullscreen(); + float bag_progress = 0.0f; + + bool ask_bag_detail(std::string& name, float& time) + { + mil_tools::tui::Dialog dialog; + std::string time_input = std::to_string(time); + Component ui_name_input = Input(&name, InputOption::Default()); + Component ui_time_input = Input(&time_input, InputOption::Default()) | CatchEvent([](Event event) { + return event.is_character() && !(std::isdigit(event.character()[0]) || event.character()[0] == '.'); + }); + + Component content = Renderer(Container::Vertical({ui_name_input, ui_time_input}),[=]{ + return gridbox({ + {text("Bag File Name ") | vcenter, ui_name_input->Render() | border}, + {text("Bag Time ") | vcenter, ui_time_input->Render() | border} + }); + }) | vscroll_indicator | frame; + + if(dialog.exec("Bag details", content) == -1) + return false; + + try + { + time = std::stof(time_input); + } + catch(const std::exception& e) + { + return false; + } + + return true; + } + + void show_message(const std::string& title, const std::string& message) + { + mil_tools::tui::Dialog dialog; + dialog.exec(title, Renderer([&]{return paragraph(message);})); + } +}; + +namespace po = boost::program_options; + +void show_help_message() +{ + +} + +int main(int argc, char** argv) +{ + std::string subcommand; + std::vector options; + + po::options_description global_opts("Global"); + global_opts.add_options() + ("subcommand", po::value(&subcommand), "Subcommand") + ("options", po::value>(&options), "Args for subcommand"); + + po::positional_options_description pos; + pos.add("subcommand", 1).add("options", -1); + + po::variables_map vm; + po::store(po::command_line_parser(argc, argv) + .options(global_opts) + .positional(pos) + .allow_unregistered() + .run(), + vm); + po::notify(vm); + + enum class Mode { + TUI, + HELP, + BAG, + LIST + }; + + const std::unordered_map subcmd_mapping = { + {"tui", Mode::TUI}, + {"help", Mode::HELP}, + {"bag", Mode::BAG}, + {"list", Mode::LIST} + }; + + auto it = subcmd_mapping.find(subcommand); + if(it == subcmd_mapping.end()) + { + show_help_message(); + return 1; + } + + Mode mode = it -> second; + rclcpp::init(1, argv); + std::shared_ptr client = std::make_shared(); + + if(mode == Mode::HELP) + { + show_help_message(); + } + else if(mode == Mode::LIST) + { + auto future = client->get_bag_topics([](rclcpp::Client::SharedFuture future){ + for(const std::string& topic : future.get()->topics) + { + std::cout << topic << std::endl; + } + rclcpp::shutdown(); + }); + + if(rclcpp::spin_until_future_complete(client, future, std::chrono::seconds(2)) == rclcpp::FutureReturnCode::TIMEOUT) + std::cout << "Online bagger server doesn't exist.\n"; + } + else if(mode == Mode::BAG) + { + auto future = client->get_bag_topics([&]([[maybe_unused]]rclcpp::Client::SharedFuture future){}); + + if(rclcpp::spin_until_future_complete(client, future, std::chrono::seconds(2)) == rclcpp::FutureReturnCode::SUCCESS) + { + mil_msgs::action::BagOnline::Goal goal; + goal.topics = future.get()->topics; + goal.bag_time = 1.0f; + + po::options_description bag_opts("bag options"); + bag_opts.add_options() + ("name,n", po::value(&goal.bag_name), "Bag name") + ("time,t", po::value(&goal.bag_time), "Time in seconds") + ("topics,p", po::value>(&goal.topics)->multitoken(), "Topics to bag"); + + po::variables_map bag_vm; + po::store(po::command_line_parser(options) + .options(bag_opts) + .run(), + bag_vm); + po::notify(bag_vm); + + auto goal_options = rclcpp_action::Client::SendGoalOptions(); + goal_options.goal_response_callback = [](online_bagger::Client::BagOnlineGoalHandle::SharedPtr goal_handle){ + if(!goal_handle) + { + std::cout << "Failed to bag: Request is rejected by the online bagger server.\n"; + } + }; + goal_options.result_callback = [&](const online_bagger::Client::BagOnlineGoalHandle::WrappedResult& result){ + std::string message = result.result->success ? "Succuessfully save bag file to " + result.result->filename : + "Failed to bag topics: " + result.result->status; + std::cout << message << ".\n"; + client->finish_bagging(); + rclcpp::shutdown(); + }; + goal_options.feedback_callback = [&]( + [[maybe_unused]]online_bagger::Client::BagOnlineGoalHandle::SharedPtr goal_handle, + const std::shared_ptr feedback){ + std::cout << "Bagging " << feedback->progress << "%\n"; + }; + + auto bag_future = client->start_bagging(goal, goal_options); + + rclcpp::spin(client); + + std::thread thread([&]{ + while(bag_future.wait_for(std::chrono::seconds(1)) == std::future_status::timeout) + { + if(client->get_state() == Client::State::Waiting) + { + std::cout << "Lost connection with the online bagger server.\n"; + rclcpp::shutdown(); + break; + } + } + }); + + thread.join(); + } + else + { + std::cout << "Online bagger server doesn't exist.\n"; + } + } + else if(mode == Mode::TUI) + { + std::thread thread([=]{ + Tui tui; + tui.spin(client); + rclcpp::shutdown(); + }); + + rclcpp::spin(client); + thread.join(); + } + + rclcpp::shutdown(); + return 0; +} + diff --git a/src/mil_common/online_bagger/src/server.cpp b/src/mil_common/online_bagger/src/server.cpp index 465ec370..aae1e693 100644 --- a/src/mil_common/online_bagger/src/server.cpp +++ b/src/mil_common/online_bagger/src/server.cpp @@ -4,7 +4,7 @@ #include "mil_msgs/action/bag_online.hpp" #include "mil_msgs/srv/bag_topics.hpp" -#include "online_bagger/online_bagger.h" +#include "online_bagger/common.hpp" #include #include @@ -133,7 +133,7 @@ class Server: public rclcpp::Node return 0; float ratio = requested_seconds / topic_duration; - size_t index = topic_messages[topic].size() * (1 - std::min(ratio, 1.0f)); + size_t index = topic_messages[topic].size() * (1 - std::clamp(ratio, 0.0f, 1.0f)); return index; } @@ -201,7 +201,6 @@ class Server: public rclcpp::Node void start_bagging(std::shared_ptr goal_handle) { - RCLCPP_INFO(get_logger(), "Start bagging"); auto result = std::make_shared(); if(!streaming.exchange(false)) { @@ -213,7 +212,8 @@ class Server: public rclcpp::Node try { const auto goal = goal_handle->get_goal(); - std::filesystem::path filename = get_bag_name(goal->bag_name).string(); + result->filename = get_bag_name(goal->bag_name).string(); + RCLCPP_INFO(get_logger(), "bag file path: %s", result->filename.c_str()); float requested_seconds = goal->bag_time; const std::vector& selected_topics = goal->topics; @@ -246,9 +246,10 @@ class Server: public rclcpp::Node goto ret; } + RCLCPP_INFO(get_logger(), "Start bagging"); auto bag = std::make_shared(); rosbag2_storage::StorageOptions storage_options; - storage_options.uri = filename.string(); + storage_options.uri = result->filename; bag->open(storage_options); goal_handle->publish_feedback(feedback); @@ -269,9 +270,9 @@ class Server: public rclcpp::Node goal_handle->publish_feedback(feedback); } msg_inc += 1; - // empty deque when done writing to bag - topic_messages[topic].clear(); } + // empty deque when done writing to bag + topic_messages[topic].clear(); } feedback->progress = 1.0; goal_handle->publish_feedback(feedback); @@ -279,6 +280,7 @@ class Server: public rclcpp::Node result->success = true; } + catch(const std::exception& e) { result->status = e.what(); From 44b6f2f4241162f6c10a85ea2b40bf1d64f78fee Mon Sep 17 00:00:00 2001 From: "Zhongzheng R. Zhang" Date: Wed, 7 May 2025 22:06:59 -0400 Subject: [PATCH 04/12] improve client function calls and tui design --- src/mil_common/mil_msgs/CMakeLists.txt | 5 +- .../mil_msgs/action/BagOnline.action | 2 +- .../include/online_bagger/client.hpp | 32 ++-- src/mil_common/online_bagger/src/client.cpp | 82 +++++++---- .../online_bagger/src/online_bagger.cpp | 139 +++++++++--------- src/mil_common/online_bagger/src/server.cpp | 64 +++++--- 6 files changed, 189 insertions(+), 135 deletions(-) diff --git a/src/mil_common/mil_msgs/CMakeLists.txt b/src/mil_common/mil_msgs/CMakeLists.txt index d7ac42e7..baeb4b5e 100644 --- a/src/mil_common/mil_msgs/CMakeLists.txt +++ b/src/mil_common/mil_msgs/CMakeLists.txt @@ -14,7 +14,10 @@ find_package(action_msgs REQUIRED) find_package(geometry_msgs REQUIRED) find_package(sensor_msgs REQUIRED) -set(action_files "action/MoveTo.action" "action/BagOnline.action") +set(action_files + "action/MoveTo.action" + "action/BagOnline.action" +) # File name is in CamelCase, but must import in snake_case set(msg_files diff --git a/src/mil_common/mil_msgs/action/BagOnline.action b/src/mil_common/mil_msgs/action/BagOnline.action index 6edec87e..ea53ff73 100644 --- a/src/mil_common/mil_msgs/action/BagOnline.action +++ b/src/mil_common/mil_msgs/action/BagOnline.action @@ -17,7 +17,7 @@ float32 bag_time bool success string status # Contains error information if success is False -string filename # Full path to file where bag was written +# string filename # Full path to file where bag was written --- diff --git a/src/mil_common/online_bagger/include/online_bagger/client.hpp b/src/mil_common/online_bagger/include/online_bagger/client.hpp index 5cafea31..d6977cc7 100644 --- a/src/mil_common/online_bagger/include/online_bagger/client.hpp +++ b/src/mil_common/online_bagger/include/online_bagger/client.hpp @@ -12,6 +12,8 @@ class Client: public rclcpp::Node { public: using BagOnlineGoalHandle = rclcpp_action::ClientGoalHandle; + using TopicsFuture = std::shared_future>; + using BagFuture = std::shared_future>; enum class State { @@ -20,28 +22,36 @@ class Client: public rclcpp::Node Bagging }; + struct BagOptions + { + BagOptions() + { + goal.bag_time = 1.0f; + } + + mil_msgs::action::BagOnline::Goal goal; + std::function on_progress = nullptr; + std::function on_finish = nullptr; + }; + Client(); ~Client(); - + State get_state(); - std::shared_future> - get_bag_topics(std::function::SharedFuture future)> callback); - - std::shared_future - start_bagging(const mil_msgs::action::BagOnline::Goal& goal, - const rclcpp_action::Client::SendGoalOptions& options); - - void finish_bagging(); + TopicsFuture get_bag_topics(std::function callback = nullptr); + BagFuture bag(const BagOptions& options); private: + using TopicsPromise = std::promise>; + using BagPromise = std::promise>; - std::atomic state; + std::atomic bagging; rclcpp_action::Client::SharedPtr action_client; rclcpp::Client::SharedPtr srv_client; rclcpp::TimerBase::SharedPtr alive_timer; - void is_alive(); + std::function on_state_change; }; diff --git a/src/mil_common/online_bagger/src/client.cpp b/src/mil_common/online_bagger/src/client.cpp index d482cd97..2e3a16ec 100644 --- a/src/mil_common/online_bagger/src/client.cpp +++ b/src/mil_common/online_bagger/src/client.cpp @@ -16,10 +16,7 @@ Client::Client():rclcpp::Node("online_bagger_client") action_client = rclcpp_action::create_client(this, BAG_ACTION_NAME); srv_client = create_client(TOPIC_SERVICE_NAME); - if(action_client->wait_for_action_server(std::chrono::seconds(1)) == true) - state = State::Ready; - - alive_timer = create_wall_timer(std::chrono::milliseconds(100), std::bind(&Client::is_alive, this)); + action_client->wait_for_action_server(std::chrono::milliseconds(500)); } Client::~Client() @@ -29,45 +26,66 @@ Client::~Client() Client::State Client::get_state() { - return state; -} + if(!action_client->action_server_is_ready()) + return State::Waiting; + + if(!bagging) + return State::Ready; + return State::Bagging; +} -std::shared_future> - Client::get_bag_topics(std::function::SharedFuture future)> callback) +Client::TopicsFuture Client::get_bag_topics(std::function callback) { + std::shared_ptr promise = std::make_shared(); + TopicsFuture future = promise->get_future().share(); + auto request = std::make_shared(); - return srv_client->async_send_request(request, callback).future; -} + srv_client->async_send_request(request, [promise, future, callback](rclcpp::Client::SharedFuture response_future) { + promise->set_value(std::move(response_future.get()->topics)); + if(callback) + callback(future); + }); -std::shared_future - Client::start_bagging(const mil_msgs::action::BagOnline::Goal& goal, - const rclcpp_action::Client::SendGoalOptions& options) -{ - state = State::Bagging; - return action_client->async_send_goal(goal, options); + return future; } -void Client::finish_bagging() +Client::BagFuture Client::bag(const Client::BagOptions& options) { - action_client->async_cancel_all_goals(); - state = State::Ready; -} + std::shared_ptr promise = std::make_shared(); + BagFuture future = promise->get_future().share(); -void Client::is_alive() -{ - if(state == State::Waiting && action_client->action_server_is_ready()) - { - state = State::Ready; - } - else if(state == State::Ready && !action_client->action_server_is_ready()) - { - state = State::Waiting; - } - else if(state == State::Bagging && !action_client->action_server_is_ready()) + bagging = true; + + rclcpp_action::Client::SendGoalOptions goal_options; + goal_options.goal_response_callback = [this, on_finish = options.on_finish, promise, future](Client::BagOnlineGoalHandle::SharedPtr goal_handle){ + if(!goal_handle) + { + promise->set_value({false, "Bag request is rejected by the online bagger server."}); + if(on_finish) + on_finish(future); + bagging = false; + } + }; + + if(options.on_progress) { - state = State::Waiting; + goal_options.feedback_callback = [on_progress = options.on_progress]([[maybe_unused]]Client::BagOnlineGoalHandle::SharedPtr goal_handle, + const std::shared_ptr feedback){ + on_progress(feedback->progress); + }; } + + goal_options.result_callback = [this, on_finish = options.on_finish, promise, future](const Client::BagOnlineGoalHandle::WrappedResult& result){ + promise->set_value({result.result->success, std::move(result.result->status)}); + if(on_finish) + on_finish(future); + bagging = false; + }; + + action_client->async_send_goal(options.goal, goal_options); + + return future; } } diff --git a/src/mil_common/online_bagger/src/online_bagger.cpp b/src/mil_common/online_bagger/src/online_bagger.cpp index cdf29998..5635926a 100644 --- a/src/mil_common/online_bagger/src/online_bagger.cpp +++ b/src/mil_common/online_bagger/src/online_bagger.cpp @@ -109,65 +109,69 @@ class Tui Component ui_body = ui_topic_list | vscroll_indicator | frame ; // Bottom - auto on_quit = [this]{screen.Exit();}; + auto on_quit = [this]{ + exited = true; + screen.Exit(); + }; auto on_bag = [this, ui_topic_list, client]{ bag_progress = 0.0f; - mil_msgs::action::BagOnline::Goal goal; - goal.topics = ui_topic_list->get_selected(); - goal.bag_time = 1.0f; + Client::BagOptions options; + options.goal.topics = ui_topic_list->get_selected(); - if(!ask_bag_detail(goal.bag_name, goal.bag_time)) + if(!ask_bag_detail(options.goal.bag_name, options.goal.bag_time)) return; - auto goal_options = rclcpp_action::Client::SendGoalOptions(); - goal_options.goal_response_callback = [this](Client::BagOnlineGoalHandle::SharedPtr goal_handle){ - if(!goal_handle) - { - screen.Post([this]{show_message("Failed", "Request is rejected by the online bagger server");}); - } - }; - goal_options.result_callback = [this, client = client](const Client::BagOnlineGoalHandle::WrappedResult& result){ - std::string message = result.result->success ? "Succuessfully save bag file to " + result.result->filename : - "Failed to bag topics: " + result.result->status; - screen.Post([this, message = std::move(message), success = result.result->success]{ - show_message(success ? "Success" : "Failed", message); + options.on_finish = [this](Client::BagFuture future){ + screen.Post([this, future]{ + auto& result = future.get(); + std::string message = result.first ? "Succuessfully save bag file to " + result.second : + "Failed to bag topics: " + result.second; + show_message("Bag Result", message); }); - client->finish_bagging(); }; - goal_options.feedback_callback = [this]( - [[maybe_unused]]Client::BagOnlineGoalHandle::SharedPtr goal_handle, - const std::shared_ptr feedback){ - screen.Post([this, progress = feedback->progress]{bag_progress = progress;}); + + options.on_progress = [this](float progress){ + bag_progress = progress; + screen.PostEvent(Event::Custom); }; - client->start_bagging(goal, goal_options); + client->bag(options); }; auto on_refresh = [this, ui_topic_list, client]{ - client->get_bag_topics([&](rclcpp::Client::SharedFuture future){ - screen.Post([ui_topic_list, topics = std::move(future.get()->topics)] () mutable { + client->get_bag_topics([&](Client::TopicsFuture future){ + screen.Post([ui_topic_list, topics = std::move(future.get())] () mutable { ui_topic_list->refresh(std::move(topics)); }); }); }; Component ui_status_bar = Renderer([this, client]{ - if(client->get_state() == Client::State::Waiting) + if(last_state == Client::State::Waiting) { return hbox({ text("Connecting to bag server "), spinner(15, clock.now().nanoseconds() / 200'000'000) }); } - else if(client->get_state() == Client::State::Ready) + else if(last_state == Client::State::Ready) + { return filler(); + } else - return gauge(bag_progress); + { + return hbox({ + gauge(bag_progress), + filler(), + }); + } }); + + Component ui_bottom_bar = Container::Horizontal({ - ui_status_bar | vcenter | xflex, + ui_status_bar | vcenter | flex, Button("Quit", on_quit, ButtonOption::Border()) | align_right, Maybe(Button("Refresh", on_refresh, ButtonOption::Border()), [client]{ return client->get_state() != Client::State::Waiting; @@ -187,27 +191,36 @@ class Tui ui_bottom_bar }); - on_refresh(); - - Loop loop(&screen, main_ui); - - std::thread refresh_thread([&]{ - while(!loop.HasQuitted()) + last_state = Client::State::Waiting; + std::thread refresh_thread([this, client, &on_refresh]{ + while(!exited) { + Client::State state = client->get_state(); + if(state != Client::State::Bagging) + { + screen.PostEvent(Event::Custom); + } + + if(state == Client::State::Ready && last_state == Client::State::Waiting) + { + on_refresh(); + } + + last_state = state; std::this_thread::sleep_for(std::chrono::milliseconds(100)); - screen.PostEvent(Event::Custom); } }); - loop.Run(); - + screen.Loop(main_ui); refresh_thread.join(); } private: + std::atomic exited = false; + std::atomic last_state; rclcpp::Clock clock; ScreenInteractive screen = ScreenInteractive::Fullscreen(); - float bag_progress = 0.0f; + std::atomic bag_progress = 0.0f; bool ask_bag_detail(std::string& name, float& time) { @@ -307,8 +320,8 @@ int main(int argc, char** argv) } else if(mode == Mode::LIST) { - auto future = client->get_bag_topics([](rclcpp::Client::SharedFuture future){ - for(const std::string& topic : future.get()->topics) + auto future = client->get_bag_topics([](Client::TopicsFuture future){ + for(const std::string& topic : future.get()) { std::cout << topic << std::endl; } @@ -320,19 +333,19 @@ int main(int argc, char** argv) } else if(mode == Mode::BAG) { - auto future = client->get_bag_topics([&]([[maybe_unused]]rclcpp::Client::SharedFuture future){}); + auto future = client->get_bag_topics(); if(rclcpp::spin_until_future_complete(client, future, std::chrono::seconds(2)) == rclcpp::FutureReturnCode::SUCCESS) { - mil_msgs::action::BagOnline::Goal goal; - goal.topics = future.get()->topics; - goal.bag_time = 1.0f; + Client::BagOptions bag_options; + bag_options.goal.topics = std::move(future.get()); + bag_options.goal.bag_time = 1.0f; po::options_description bag_opts("bag options"); bag_opts.add_options() - ("name,n", po::value(&goal.bag_name), "Bag name") - ("time,t", po::value(&goal.bag_time), "Time in seconds") - ("topics,p", po::value>(&goal.topics)->multitoken(), "Topics to bag"); + ("name,n", po::value(&bag_options.goal.bag_name), "Bag name") + ("time,t", po::value(&bag_options.goal.bag_time), "Time in seconds") + ("topics,p", po::value>(&bag_options.goal.topics)->multitoken(), "Topics to bag"); po::variables_map bag_vm; po::store(po::command_line_parser(options) @@ -341,29 +354,19 @@ int main(int argc, char** argv) bag_vm); po::notify(bag_vm); - auto goal_options = rclcpp_action::Client::SendGoalOptions(); - goal_options.goal_response_callback = [](online_bagger::Client::BagOnlineGoalHandle::SharedPtr goal_handle){ - if(!goal_handle) - { - std::cout << "Failed to bag: Request is rejected by the online bagger server.\n"; - } - }; - goal_options.result_callback = [&](const online_bagger::Client::BagOnlineGoalHandle::WrappedResult& result){ - std::string message = result.result->success ? "Succuessfully save bag file to " + result.result->filename : - "Failed to bag topics: " + result.result->status; - std::cout << message << ".\n"; - client->finish_bagging(); + bag_options.on_finish = [&](Client::BagFuture future){ + auto& result = future.get(); + std::string message = result.first ? "Succuessfully save bag file to " + result.second : + "Failed to bag topics: " + result.second; + std::cout << '\n' << message << ".\n"; rclcpp::shutdown(); }; - goal_options.feedback_callback = [&]( - [[maybe_unused]]online_bagger::Client::BagOnlineGoalHandle::SharedPtr goal_handle, - const std::shared_ptr feedback){ - std::cout << "Bagging " << feedback->progress << "%\n"; + + bag_options.on_progress = [&](float progress){ + std::cout << "Bagging\t" << static_cast(progress * 100) << "%\r"; }; - auto bag_future = client->start_bagging(goal, goal_options); - - rclcpp::spin(client); + auto bag_future = client->bag(bag_options); std::thread thread([&]{ while(bag_future.wait_for(std::chrono::seconds(1)) == std::future_status::timeout) @@ -377,6 +380,8 @@ int main(int argc, char** argv) } }); + rclcpp::spin(client); + thread.join(); } else diff --git a/src/mil_common/online_bagger/src/server.cpp b/src/mil_common/online_bagger/src/server.cpp index aae1e693..0934d7e4 100644 --- a/src/mil_common/online_bagger/src/server.cpp +++ b/src/mil_common/online_bagger/src/server.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -21,7 +22,9 @@ class Server: public rclcpp::Node { public: - Server():rclcpp::Node("online_bagger_server") + Server():rclcpp::Node("online_bagger_server"), + work_guard(boost::asio::make_work_guard(context)), + work_thread([this]{context.run();}) { get_params(); @@ -71,7 +74,8 @@ class Server: public rclcpp::Node } ~Server() { - + work_guard.reset(); + work_thread.join(); } private: @@ -89,6 +93,10 @@ class Server: public rclcpp::Node rclcpp::TimerBase::SharedPtr resubscriber; size_t iteration_count = 0; + boost::asio::io_context context; + boost::asio::executor_work_guard work_guard; + std::thread work_thread; + void handle_topics_req([[maybe_unused]]mil_msgs::srv::BagTopics::Request::ConstSharedPtr request, mil_msgs::srv::BagTopics::Response::SharedPtr response) { @@ -104,6 +112,12 @@ class Server: public rclcpp::Node [[maybe_unused]]const rclcpp_action::GoalUUID & uuid, [[maybe_unused]]std::shared_ptr goal) { + if(!streaming) + { + RCLCPP_WARN(get_logger(), "Rejected goal because there is an ongoing goal"); + return rclcpp_action::GoalResponse::REJECT; + } + RCLCPP_INFO(get_logger(), "Accepted goal from online bagger client"); return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE; } @@ -139,7 +153,9 @@ class Server: public rclcpp::Node void handle_accepted(std::shared_ptr goal_handle) { - std::thread(std::bind(&Server::start_bagging, this, std::placeholders::_1), goal_handle).detach(); + context.post([this, goal_handle]{ + start_bagging(goal_handle); + }); } void bagger_callback(std::shared_ptr msg, const std::string& topic) @@ -202,18 +218,11 @@ class Server: public rclcpp::Node void start_bagging(std::shared_ptr goal_handle) { auto result = std::make_shared(); - if(!streaming.exchange(false)) - { - result->status = "Bag Request came in while bagging, priority given to prior request", - result->success = false; - goto ret; - } + streaming = false; try { const auto goal = goal_handle->get_goal(); - result->filename = get_bag_name(goal->bag_name).string(); - RCLCPP_INFO(get_logger(), "bag file path: %s", result->filename.c_str()); float requested_seconds = goal->bag_time; const std::vector& selected_topics = goal->topics; @@ -246,30 +255,36 @@ class Server: public rclcpp::Node goto ret; } - RCLCPP_INFO(get_logger(), "Start bagging"); + RCLCPP_INFO(get_logger(), "Start bagging, total messages %ld", total_messages); auto bag = std::make_shared(); rosbag2_storage::StorageOptions storage_options; - storage_options.uri = result->filename; + storage_options.uri = get_bag_name(goal->bag_name).string(); + RCLCPP_INFO(get_logger(), "bag file path: %s", storage_options.uri.c_str()); + bag->open(storage_options); goal_handle->publish_feedback(feedback); auto topic_names_and_types = get_topic_names_and_types(); - size_t msg_inc = 0; + size_t feedback_threshold = total_messages / 100; for(const auto& [topic, index] : bag_topics) { - for(auto it=topic_messages[topic].begin(); - it!=topic_messages[topic].end(); - it++) + + for(size_t i=0;iwrite(it->second, topic, topic_names_and_types[topic][0],it->first); - if(msg_inc % 50 == 0) // send feedback every 50 messages + const auto& [time, message] = topic_messages[topic][i]; + bag->write(message, + topic, + topic_names_and_types[topic][0], + time); + + if(feedback_threshold == 0 || i % feedback_threshold == 0) // send feedback every 50 messages { - feedback->progress = static_cast(msg_inc) / total_messages; + feedback->progress = static_cast(i+1) / total_messages; goal_handle->publish_feedback(feedback); } - msg_inc += 1; + } // empty deque when done writing to bag topic_messages[topic].clear(); @@ -277,7 +292,8 @@ class Server: public rclcpp::Node feedback->progress = 1.0; goal_handle->publish_feedback(feedback); bag->close(); - + + result->status = std::move(storage_options.uri); result->success = true; } @@ -290,7 +306,7 @@ class Server: public rclcpp::Node if(result->success) { goal_handle->succeed(result); - RCLCPP_INFO(get_logger(), "Successfully bag to %s", result->filename.c_str()); + RCLCPP_INFO(get_logger(), "Successfully bag to %s", result->status.c_str()); } else { @@ -304,6 +320,8 @@ class Server: public rclcpp::Node void subscribe() { auto topic_names_and_types = get_topic_names_and_types(); + std::vector subscribed_topics; + for (auto& [topic, info] : subscriber_list) { auto& [time, subscribed] = info; From 0180b7bc51ef191ab5363524a51df6e5ea953547 Mon Sep 17 00:00:00 2001 From: "Zhongzheng R. Zhang" Date: Mon, 2 Jun 2025 21:12:28 -0400 Subject: [PATCH 05/12] add online bagger server to gazebo.launch.py in subjugator_bringup --- .../subjugator_bringup/launch/gazebo.launch.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/subjugator/subjugator_bringup/launch/gazebo.launch.py b/src/subjugator/subjugator_bringup/launch/gazebo.launch.py index bd44bfd3..5f9b5e7b 100644 --- a/src/subjugator/subjugator_bringup/launch/gazebo.launch.py +++ b/src/subjugator/subjugator_bringup/launch/gazebo.launch.py @@ -58,10 +58,18 @@ def generate_launch_description(): output="screen", ) - return LaunchDescription( - [ - gz_sim, - subjugator_setup, - bridge, + bagger = Node( + package="online_bagger", + executable="online_bagger_server_node", + parameters=[ + { + # add topics to bag here + "topics": ["/imu/data"], + }, ], + output="screen", + ) + + return LaunchDescription( + [gz_sim, subjugator_setup, bridge, bagger], ) From d934ad39f4d61d3efdcd8a5d020aa5ff54bcbd1f Mon Sep 17 00:00:00 2001 From: "Zhongzheng R. Zhang" Date: Wed, 4 Jun 2025 21:26:17 -0400 Subject: [PATCH 06/12] fix the duration calculation issue --- src/mil_common/online_bagger/src/server.cpp | 291 +++++++++----------- 1 file changed, 136 insertions(+), 155 deletions(-) diff --git a/src/mil_common/online_bagger/src/server.cpp b/src/mil_common/online_bagger/src/server.cpp index 0934d7e4..ea4534a0 100644 --- a/src/mil_common/online_bagger/src/server.cpp +++ b/src/mil_common/online_bagger/src/server.cpp @@ -1,75 +1,74 @@ -#include -#include +#include +#include +#include +#include + +#include +#include +#include +#include #include #include "mil_msgs/action/bag_online.hpp" #include "mil_msgs/srv/bag_topics.hpp" #include "online_bagger/common.hpp" -#include -#include -#include -#include - -#include -#include -#include -#include +#include +#include namespace online_bagger { -class Server: public rclcpp::Node +class Server : public rclcpp::Node { - public: - - Server():rclcpp::Node("online_bagger_server"), - work_guard(boost::asio::make_work_guard(context)), - work_thread([this]{context.run();}) + public: + Server() + : rclcpp::Node("online_bagger_server") + , work_guard(boost::asio::make_work_guard(context)) + , work_thread([this] { context.run(); }) { get_params(); - topics_service = create_service(TOPIC_SERVICE_NAME, - std::bind(&Server::handle_topics_req, this, std::placeholders::_1, std::placeholders::_2) - ); - - _action_server = rclcpp_action::create_server(this, - BAG_ACTION_NAME, - std::bind(&Server::handle_goal, this, std::placeholders::_1, std::placeholders::_2), + topics_service = create_service( + TOPIC_SERVICE_NAME, + std::bind(&Server::handle_topics_req, this, std::placeholders::_1, std::placeholders::_2)); + + _action_server = rclcpp_action::create_server( + this, BAG_ACTION_NAME, std::bind(&Server::handle_goal, this, std::placeholders::_1, std::placeholders::_2), std::bind(&Server::handle_cancel, this, std::placeholders::_1), - std::bind(&Server::handle_accepted, this, std::placeholders::_1) - ); - + std::bind(&Server::handle_accepted, this, std::placeholders::_1)); + subscribe(); - if(subscriber_list.size() == successful_subscription_count && subscriber_list.size() != 0) + if (subscriber_list.size() == successful_subscription_count && subscriber_list.size() != 0) { - RCLCPP_INFO(get_logger(), "Subscribed to %ld of %ld topics", - successful_subscription_count, - subscriber_list.size()); + RCLCPP_INFO(get_logger(), "Subscribed to %ld of %ld topics", successful_subscription_count, + subscriber_list.size()); } else { - RCLCPP_INFO(get_logger(), "Subscribed to %ld of %ld topics, will try again every %.2f seconds", - successful_subscription_count, - subscriber_list.size(), - resubscribe_period); - resubscriber = create_wall_timer(std::chrono::milliseconds(static_cast(resubscribe_period*1000)), [this]{ - if ((successful_subscription_count == subscriber_list.size()) && resubscriber) - { - resubscriber->cancel(); - if(subscriber_list.size() == 0) - { - RCLCPP_WARN(get_logger(), "No topics selected to subscribe to. Closing."); - rclcpp::shutdown(); - } - else - { - RCLCPP_INFO(get_logger(), "All topics are subscribed! Shutting down resubscriber"); - } - - return; - } - subscribe(); - }); + RCLCPP_INFO(get_logger(), "Subscribed to %ld of %ld topics, will try again every %.2f seconds", + successful_subscription_count, subscriber_list.size(), resubscribe_period); + resubscriber = + create_wall_timer(std::chrono::milliseconds(static_cast(resubscribe_period * 1000)), + [this] + { + if ((successful_subscription_count == subscriber_list.size()) && resubscriber) + { + resubscriber->cancel(); + if (subscriber_list.size() == 0) + { + RCLCPP_WARN(get_logger(), "No topics selected to subscribe to. Closing."); + rclcpp::shutdown(); + } + else + { + RCLCPP_INFO(get_logger(), "All topics are subscribed! Shutting down " + "resubscriber"); + } + + return; + } + subscribe(); + }); } } ~Server() @@ -78,7 +77,7 @@ class Server: public rclcpp::Node work_thread.join(); } - private: + private: using BagOnlineGoalHandle = rclcpp_action::ServerGoalHandle; std::atomic_bool streaming = true; rclcpp_action::Server::SharedPtr _action_server; @@ -89,7 +88,8 @@ class Server: public rclcpp::Node float resubscribe_period; std::filesystem::path dir; std::unordered_map> subscriber_list; - std::unordered_map>>> topic_messages; + std::unordered_map>>> + topic_messages; rclcpp::TimerBase::SharedPtr resubscriber; size_t iteration_count = 0; @@ -97,55 +97,54 @@ class Server: public rclcpp::Node boost::asio::executor_work_guard work_guard; std::thread work_thread; - void handle_topics_req([[maybe_unused]]mil_msgs::srv::BagTopics::Request::ConstSharedPtr request, - mil_msgs::srv::BagTopics::Response::SharedPtr response) + void handle_topics_req([[maybe_unused]] mil_msgs::srv::BagTopics::Request::ConstSharedPtr request, + mil_msgs::srv::BagTopics::Response::SharedPtr response) { - for(const auto& [topic, info] : subscriber_list) + for (auto const& [topic, info] : subscriber_list) { - const auto& [time, subscribed] = info; - if(subscribed != nullptr) + auto const& [time, subscribed] = info; + if (subscribed != nullptr) response->topics.push_back(topic); } } - rclcpp_action::GoalResponse handle_goal( - [[maybe_unused]]const rclcpp_action::GoalUUID & uuid, - [[maybe_unused]]std::shared_ptr goal) + rclcpp_action::GoalResponse + handle_goal([[maybe_unused]] rclcpp_action::GoalUUID const& uuid, + [[maybe_unused]] std::shared_ptr goal) { - if(!streaming) + if (!streaming) { RCLCPP_WARN(get_logger(), "Rejected goal because there is an ongoing goal"); return rclcpp_action::GoalResponse::REJECT; } - + RCLCPP_INFO(get_logger(), "Accepted goal from online bagger client"); return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE; } - rclcpp_action::CancelResponse handle_cancel( - [[maybe_unused]]const std::shared_ptr goal_handle) + rclcpp_action::CancelResponse handle_cancel([[maybe_unused]] std::shared_ptr const goal_handle) { return rclcpp_action::CancelResponse::REJECT; } - float get_topic_duration(const std::string& topic) + float get_topic_duration(std::string const& topic) { - if(topic_messages[topic].size() == 0) + if (topic_messages[topic].size() == 0) return 0.0f; - - return (topic_messages[topic].back().first - topic_messages[topic].front().first).nanoseconds() / 1e-9f; + + return (topic_messages[topic].back().first - topic_messages[topic].front().first).nanoseconds() / 1e9f; } - size_t get_time_index(const std::string& topic, float requested_seconds) + size_t get_time_index(std::string const& topic, float requested_seconds) { - if(requested_seconds == 0) + if (requested_seconds == 0) return 0; - + float topic_duration = get_topic_duration(topic); - if(topic_duration == 0) + if (topic_duration == 0) return 0; - + float ratio = requested_seconds / topic_duration; size_t index = topic_messages[topic].size() * (1 - std::clamp(ratio, 0.0f, 1.0f)); return index; @@ -153,61 +152,56 @@ class Server: public rclcpp::Node void handle_accepted(std::shared_ptr goal_handle) { - context.post([this, goal_handle]{ - start_bagging(goal_handle); - }); + context.post([this, goal_handle] { start_bagging(goal_handle); }); } - void bagger_callback(std::shared_ptr msg, const std::string& topic) + void bagger_callback(std::shared_ptr msg, std::string const& topic) { - if(!streaming) + if (!streaming) return; - - + iteration_count += 1; - topic_messages[topic].push_back({now(), msg}); + topic_messages[topic].push_back({ now(), msg }); float time_diff = get_topic_duration(topic); - if(iteration_count % 100 == 0) + if (iteration_count % 100 == 0) { - RCLCPP_DEBUG(get_logger(), "%s has %ld messages spaning %.2f seconds.", - topic.c_str(), - topic_messages[topic].size(), - time_diff); + RCLCPP_DEBUG(get_logger(), "%s has %ld messages spanning %.2f seconds.", topic.c_str(), + topic_messages[topic].size(), time_diff); } - while(time_diff > subscriber_list[topic].first && rclcpp::ok()) + while (time_diff > subscriber_list[topic].first && rclcpp::ok()) { topic_messages[topic].pop_front(); time_diff = get_topic_duration(topic); } - } - std::filesystem::path get_bag_name(const std::filesystem::path& filename) + std::filesystem::path get_bag_name(std::filesystem::path const& filename) { std::filesystem::path default_dir = dir; std::string date = boost::gregorian::to_simple_string(boost::gregorian::day_clock::local_day()); - std::string time = boost::posix_time::to_simple_string(boost::posix_time::second_clock::local_time().time_of_day()); - if(dated_folder) + std::string time = + boost::posix_time::to_simple_string(boost::posix_time::second_clock::local_time().time_of_day()); + if (dated_folder) { default_dir /= date; } std::filesystem::path bag_dir = default_dir / filename.parent_path(); std::filesystem::path bag_name = filename.filename(); - if(!std::filesystem::exists(bag_dir)) + if (!std::filesystem::exists(bag_dir)) { std::filesystem::create_directories(bag_dir); } - if(!bag_name.has_filename()) + if (!bag_name.has_filename()) { bag_name.replace_filename(date + '-' + time); } - if(bag_name.extension() != "bag") + if (bag_name.extension() != "bag") { bag_name.replace_extension("bag"); } @@ -222,25 +216,25 @@ class Server: public rclcpp::Node try { - const auto goal = goal_handle->get_goal(); + auto const goal = goal_handle->get_goal(); float requested_seconds = goal->bag_time; - const std::vector& selected_topics = goal->topics; + std::vector const& selected_topics = goal->topics; auto feedback = std::make_shared(); size_t total_messages = 0; std::unordered_map bag_topics; - for(const auto& topic : selected_topics) + for (auto const& topic : selected_topics) { auto it = subscriber_list.find(topic); - if(it == subscriber_list.end()) + if (it == subscriber_list.end()) continue; - - const auto& [time, subscribed] = it->second; - if(subscribed == nullptr) + + auto const& [time, subscribed] = it->second; + if (subscribed == nullptr) continue; - if(topic_messages[topic].size() == 0) + if (topic_messages[topic].size() == 0) continue; size_t index = get_time_index(topic, requested_seconds); @@ -248,7 +242,7 @@ class Server: public rclcpp::Node bag_topics[topic] = index; } - if(total_messages == 0) + if (total_messages == 0) { result->success = false; result->status = "No messages to bag"; @@ -268,23 +262,18 @@ class Server: public rclcpp::Node auto topic_names_and_types = get_topic_names_and_types(); size_t feedback_threshold = total_messages / 100; - for(const auto& [topic, index] : bag_topics) + for (auto const& [topic, index] : bag_topics) { - - for(size_t i=0;iwrite(message, - topic, - topic_names_and_types[topic][0], - time); + auto const& [time, message] = topic_messages[topic][i]; + bag->write(message, topic, topic_names_and_types[topic][0], time); - if(feedback_threshold == 0 || i % feedback_threshold == 0) // send feedback every 50 messages + if (feedback_threshold == 0 || i % feedback_threshold == 0) // send feedback every 50 messages { - feedback->progress = static_cast(i+1) / total_messages; + feedback->progress = static_cast(i + 1) / total_messages; goal_handle->publish_feedback(feedback); } - } // empty deque when done writing to bag topic_messages[topic].clear(); @@ -292,18 +281,18 @@ class Server: public rclcpp::Node feedback->progress = 1.0; goal_handle->publish_feedback(feedback); bag->close(); - + result->status = std::move(storage_options.uri); result->success = true; } - - catch(const std::exception& e) + + catch (std::exception const& e) { result->status = e.what(); result->success = false; } -ret: - if(result->success) + ret: + if (result->success) { goal_handle->succeed(result); RCLCPP_INFO(get_logger(), "Successfully bag to %s", result->status.c_str()); @@ -313,58 +302,53 @@ class Server: public rclcpp::Node goal_handle->abort(result); RCLCPP_INFO(get_logger(), "Failed to bag: %s", result->status.c_str()); } - + streaming = true; } void subscribe() - { + { auto topic_names_and_types = get_topic_names_and_types(); std::vector subscribed_topics; - for (auto& [topic, info] : subscriber_list) + for (auto& [topic, info] : subscriber_list) { auto& [time, subscribed] = info; - - if(!subscribed) + + if (!subscribed) { auto it = topic_names_and_types.find(topic); - if(it != topic_names_and_types.end()) + if (it != topic_names_and_types.end()) { successful_subscription_count++; subscribed = create_generic_subscription( - topic, - it->second[0], - rclcpp::QoS(10), - [this, topic](std::shared_ptr msg){ - bagger_callback(msg, topic); - } - ); + topic, it->second[0], rclcpp::QoS(10), + [this, topic](std::shared_ptr msg) { bagger_callback(msg, topic); }); } } } } - void add_env_var(const std::vector& var) + void add_env_var(std::vector const& var) { - for(auto& topic : var) + for (auto& topic : var) { subscriber_list.emplace(std::move(topic), std::make_pair(stream_time, nullptr)); - } + } } void get_params() { boost::process::environment env = boost::this_process::environment(); auto it = env.find("BAG_DIR"); - if(it != env.end()) + if (it != env.end()) { dir = it->to_string(); } else { it = env.find("HOME"); - if(it != env.end()) + if (it != env.end()) { dir = std::filesystem::path(it->to_string()) / "bag"; } @@ -377,38 +361,38 @@ class Server: public rclcpp::Node dir = std::filesystem::path(declare_parameter("bag_package_path", dir.string())); - stream_time = declare_parameter("stream_time", 30); resubscribe_period = declare_parameter("resubscribe_period", 3.0); dated_folder = declare_parameter("dated_folder", true); std::vector topics_param; - for(const auto& [topic, types] : get_topic_names_and_types()) + for (auto const& [topic, types] : get_topic_names_and_types()) { topics_param.push_back(topic); } topics_param = declare_parameter("topics", std::vector(topics_param)); - for (const std::string& topic : topics_param) + for (std::string const& topic : topics_param) { std::istringstream iss(topic); std::string token; std::vector tokens; - while (std::getline(iss, token, ',')) + while (std::getline(iss, token, ',')) { tokens.push_back(std::move(token)); } - size_t topic_stream_time = stream_time; - if(tokens.size() == 2) + size_t topic_stream_time = stream_time; + if (tokens.size() == 2) { try { topic_stream_time = std::stoul(tokens[1]); } - catch(const std::exception& e) + catch (std::exception const& e) { - RCLCPP_WARN(get_logger(), "Failed to set stream time %s for topic %s.", tokens[0].c_str(), tokens[1].c_str()); + RCLCPP_WARN(get_logger(), "Failed to set stream time %s for topic %s.", tokens[0].c_str(), + tokens[1].c_str()); } } @@ -416,15 +400,15 @@ class Server: public rclcpp::Node } it = env.find("BAG_ALWAYS"); - if(it != env.end()) + if (it != env.end()) { add_env_var(it->to_vector()); } - for(const auto& entry : env) + for (auto const& entry : env) { std::string key = entry.get_name(); - if (key.size() >= 4 && key.substr(0, 4) == "bag_") + if (key.size() >= 4 && key.substr(0, 4) == "bag_") { add_env_var(entry.to_vector()); } @@ -434,7 +418,7 @@ class Server: public rclcpp::Node RCLCPP_INFO(get_logger(), "Bag Directory: %s", dir.c_str()); } }; -} +} // namespace online_bagger int main(int argc, char* argv[]) { @@ -446,6 +430,3 @@ int main(int argc, char* argv[]) return 0; } - - - From 02a1542ff5242e96c760c9f754af71c106db9813 Mon Sep 17 00:00:00 2001 From: "Zhongzheng R. Zhang" Date: Thu, 5 Jun 2025 14:46:41 -0400 Subject: [PATCH 07/12] change bag file name format and add help message for online bagger command --- .../online_bagger/src/online_bagger.cpp | 380 +++++++++--------- src/mil_common/online_bagger/src/server.cpp | 2 +- 2 files changed, 197 insertions(+), 185 deletions(-) diff --git a/src/mil_common/online_bagger/src/online_bagger.cpp b/src/mil_common/online_bagger/src/online_bagger.cpp index 5635926a..72a986de 100644 --- a/src/mil_common/online_bagger/src/online_bagger.cpp +++ b/src/mil_common/online_bagger/src/online_bagger.cpp @@ -1,59 +1,59 @@ -#include "ftxui/component/screen_interactive.hpp" +#include + #include "ftxui/component/component.hpp" -#include "ftxui/dom/elements.hpp" #include "ftxui/component/loop.hpp" +#include "ftxui/component/screen_interactive.hpp" +#include "ftxui/dom/elements.hpp" +#include -#include "online_bagger/client.hpp" #include "mil_tools/tui/dialog.hpp" - -#include - -#include +#include "online_bagger/client.hpp" using namespace ftxui; using namespace online_bagger; -class TopicList: public ComponentBase +class TopicList : public ComponentBase { - public: + public: TopicList() { } ~TopicList() { - } void refresh(std::vector&& topics) { this->topics = std::move(topics); DetachAllChildren(); - if(selected) + if (selected) delete[] selected; - - if(this->topics.size() == 0) + + if (this->topics.size() == 0) return; - future = std::async(std::launch::async, [&]{ - selected = new bool[this->topics.size()]{false}; - Components items; - for(size_t i=0; itopics.size(); i++) - { - items.push_back(Checkbox(this->topics[i], &selected[i])); - } + future = std::async(std::launch::async, + [&] + { + selected = new bool[this->topics.size()]{ false }; + Components items; + for (size_t i = 0; i < this->topics.size(); i++) + { + items.push_back(Checkbox(this->topics[i], &selected[i])); + } - return Container::Vertical(items); - }); + return Container::Vertical(items); + }); return; } - const std::vector get_selected() + std::vector const get_selected() { std::vector selected_topics; - for(size_t i=0;i 0) + if (ChildCount() > 0) return ChildAt(0)->Render(); else return text("No bag topics available"); } else { - if(future.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready) + if (future.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready) { Add(future.get()); } @@ -79,8 +79,8 @@ class TopicList: public ComponentBase } } - private: - std::future future; + private: + std::future future; std::vector topics; bool* selected = nullptr; rclcpp::Clock clock; @@ -88,50 +88,54 @@ class TopicList: public ComponentBase class Tui { - public: + public: Tui() { - } ~Tui() { - } void spin(std::shared_ptr client) { // Head - Component ui_head = Renderer([]{return text("Online Bagger") | bold | center;}); + Component ui_head = Renderer([] { return text("Online Bagger") | bold | center; }); // Body std::shared_ptr ui_topic_list = std::make_shared(); - - Component ui_body = ui_topic_list | vscroll_indicator | frame ; + + Component ui_body = ui_topic_list | vscroll_indicator | frame; // Bottom - auto on_quit = [this]{ + auto on_quit = [this] + { exited = true; screen.Exit(); }; - auto on_bag = [this, ui_topic_list, client]{ + auto on_bag = [this, ui_topic_list, client] + { bag_progress = 0.0f; - Client::BagOptions options; + Client::BagOptions options; options.goal.topics = ui_topic_list->get_selected(); - if(!ask_bag_detail(options.goal.bag_name, options.goal.bag_time)) + if (!ask_bag_detail(options.goal.bag_name, options.goal.bag_time)) return; - options.on_finish = [this](Client::BagFuture future){ - screen.Post([this, future]{ - auto& result = future.get(); - std::string message = result.first ? "Succuessfully save bag file to " + result.second : - "Failed to bag topics: " + result.second; - show_message("Bag Result", message); - }); + options.on_finish = [this](Client::BagFuture future) + { + screen.Post( + [this, future] + { + auto& result = future.get(); + std::string message = result.first ? "Succuessfully save bag file to " + result.second : + "Failed to bag topics: " + result.second; + show_message("Bag Result", message); + }); }; - - options.on_progress = [this](float progress){ + + options.on_progress = [this](float progress) + { bag_progress = progress; screen.PostEvent(Event::Custom); }; @@ -139,83 +143,77 @@ class Tui client->bag(options); }; - auto on_refresh = [this, ui_topic_list, client]{ - client->get_bag_topics([&](Client::TopicsFuture future){ - screen.Post([ui_topic_list, topics = std::move(future.get())] () mutable { - ui_topic_list->refresh(std::move(topics)); + auto on_refresh = [this, ui_topic_list, client] + { + client->get_bag_topics( + [&](Client::TopicsFuture future) + { + screen.Post([ui_topic_list, topics = std::move(future.get())]() mutable + { ui_topic_list->refresh(std::move(topics)); }); }); - }); }; - Component ui_status_bar = Renderer([this, client]{ - if(last_state == Client::State::Waiting) + Component ui_status_bar = Renderer( + [this, client] { - return hbox({ - text("Connecting to bag server "), - spinner(15, clock.now().nanoseconds() / 200'000'000) - }); - } - else if(last_state == Client::State::Ready) - { - return filler(); - } - else - { - return hbox({ - gauge(bag_progress), - filler(), - }); - } - }); - - + if (last_state == Client::State::Waiting) + { + return hbox( + { text("Connecting to bag server "), spinner(15, clock.now().nanoseconds() / 200'000'000) }); + } + else if (last_state == Client::State::Ready) + { + return filler(); + } + else + { + return hbox({ + gauge(bag_progress), + filler(), + }); + } + }); - Component ui_bottom_bar = Container::Horizontal({ - ui_status_bar | vcenter | flex, - Button("Quit", on_quit, ButtonOption::Border()) | align_right, - Maybe(Button("Refresh", on_refresh, ButtonOption::Border()), [client]{ - return client->get_state() != Client::State::Waiting; - }) | align_right, - Maybe(Button("Bag", on_bag, ButtonOption::Border()), [client]{ - return client->get_state() == Client::State::Ready; - }) | align_right - }); + Component ui_bottom_bar = Container::Horizontal( + { ui_status_bar | vcenter | flex, Button("Quit", on_quit, ButtonOption::Border()) | align_right, + Maybe(Button("Refresh", on_refresh, ButtonOption::Border()), + [client] { return client->get_state() != Client::State::Waiting; }) | + align_right, + Maybe(Button("Bag", on_bag, ButtonOption::Border()), + [client] { return client->get_state() == Client::State::Ready; }) | + align_right }); - // Main UI - Component main_ui = Container::Vertical({ - ui_head, - Renderer([]{return separator();}), - ui_body | flex, - Renderer([]{return separator();}), - ui_bottom_bar - }); + Component main_ui = Container::Vertical({ ui_head, Renderer([] { return separator(); }), ui_body | flex, + Renderer([] { return separator(); }), ui_bottom_bar }); last_state = Client::State::Waiting; - std::thread refresh_thread([this, client, &on_refresh]{ - while(!exited) + std::thread refresh_thread( + [this, client, &on_refresh] { - Client::State state = client->get_state(); - if(state != Client::State::Bagging) - { - screen.PostEvent(Event::Custom); - } - - if(state == Client::State::Ready && last_state == Client::State::Waiting) + while (!exited) { - on_refresh(); - } + Client::State state = client->get_state(); + if (state != Client::State::Bagging) + { + screen.PostEvent(Event::Custom); + } - last_state = state; - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - }); + if (state == Client::State::Ready && last_state == Client::State::Waiting) + { + on_refresh(); + } + + last_state = state; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + }); screen.Loop(main_ui); refresh_thread.join(); } - private: + private: std::atomic exited = false; std::atomic last_state; rclcpp::Clock clock; @@ -227,36 +225,42 @@ class Tui mil_tools::tui::Dialog dialog; std::string time_input = std::to_string(time); Component ui_name_input = Input(&name, InputOption::Default()); - Component ui_time_input = Input(&time_input, InputOption::Default()) | CatchEvent([](Event event) { - return event.is_character() && !(std::isdigit(event.character()[0]) || event.character()[0] == '.'); - }); - - Component content = Renderer(Container::Vertical({ui_name_input, ui_time_input}),[=]{ - return gridbox({ - {text("Bag File Name ") | vcenter, ui_name_input->Render() | border}, - {text("Bag Time ") | vcenter, ui_time_input->Render() | border} - }); - }) | vscroll_indicator | frame; + Component ui_time_input = + Input(&time_input, InputOption::Default()) | + CatchEvent( + [](Event event) + { + return event.is_character() && !(std::isdigit(event.character()[0]) || event.character()[0] == '.'); + }); - if(dialog.exec("Bag details", content) == -1) + Component content = + Renderer(Container::Vertical({ ui_name_input, ui_time_input }), + [=] + { + return gridbox({ { text("Bag File Name ") | vcenter, ui_name_input->Render() | border }, + { text("Bag Time ") | vcenter, ui_time_input->Render() | border } }); + }) | + vscroll_indicator | frame; + + if (dialog.exec("Bag details", content) == -1) return false; try { time = std::stof(time_input); } - catch(const std::exception& e) + catch (std::exception const& e) { - return false; + return false; } return true; } - void show_message(const std::string& title, const std::string& message) + void show_message(std::string const& title, std::string const& message) { mil_tools::tui::Dialog dialog; - dialog.exec(title, Renderer([&]{return paragraph(message);})); + dialog.exec(title, Renderer([&] { return paragraph(message); })); } }; @@ -264,121 +268,128 @@ namespace po = boost::program_options; void show_help_message() { - + std::cout << "Usage: online_bagger [args]." << std::endl; + std::cout << "Available commands are:" << std::endl; + std::cout << "list:\tList all topics that can be bagged." << std::endl; + std::cout << "help:\tShow this help message." << std::endl; + std::cout << "bag:\tStart bagging." << std::endl; + std::cout << "\t --topics -p \tTopics to bag." << std::endl; + std::cout << "\t\t\tWill bag all message if not specified." << std::endl; + std::cout << "\t --name -n \tBag file name." << std::endl; + std::cout << "\t\t\tDefault name is generate from bagging start time in the format HH:MM:SS." << std::endl; + std::cout << "\t --time -t \tDuration in seconds to bag from the past up to now." << std::endl; + std::cout << "\t\t\tDefault value is 1." << std::endl; + std::cout << "tui:\tShow terminal user interface." << std::endl; } -int main(int argc, char** argv) +int main(int argc, char** argv) { std::string subcommand; std::vector options; po::options_description global_opts("Global"); - global_opts.add_options() - ("subcommand", po::value(&subcommand), "Subcommand") - ("options", po::value>(&options), "Args for subcommand"); + global_opts.add_options()("subcommand", po::value(&subcommand), "Subcommand")( + "options", po::value>(&options), "Args for subcommand"); po::positional_options_description pos; pos.add("subcommand", 1).add("options", -1); po::variables_map vm; - po::store(po::command_line_parser(argc, argv) - .options(global_opts) - .positional(pos) - .allow_unregistered() - .run(), - vm); + po::store(po::command_line_parser(argc, argv).options(global_opts).positional(pos).allow_unregistered().run(), vm); po::notify(vm); - enum class Mode { + enum class Mode + { TUI, HELP, BAG, LIST }; - const std::unordered_map subcmd_mapping = { - {"tui", Mode::TUI}, - {"help", Mode::HELP}, - {"bag", Mode::BAG}, - {"list", Mode::LIST} + std::unordered_map const subcmd_mapping = { + { "tui", Mode::TUI }, { "help", Mode::HELP }, { "bag", Mode::BAG }, { "list", Mode::LIST } }; auto it = subcmd_mapping.find(subcommand); - if(it == subcmd_mapping.end()) + if (it == subcmd_mapping.end()) { show_help_message(); return 1; } - Mode mode = it -> second; + Mode mode = it->second; rclcpp::init(1, argv); std::shared_ptr client = std::make_shared(); - if(mode == Mode::HELP) + if (mode == Mode::HELP) { show_help_message(); } - else if(mode == Mode::LIST) + else if (mode == Mode::LIST) { - auto future = client->get_bag_topics([](Client::TopicsFuture future){ - for(const std::string& topic : future.get()) + auto future = client->get_bag_topics( + [](Client::TopicsFuture future) { - std::cout << topic << std::endl; - } - rclcpp::shutdown(); - }); + for (std::string const& topic : future.get()) + { + std::cout << topic << std::endl; + } + rclcpp::shutdown(); + }); - if(rclcpp::spin_until_future_complete(client, future, std::chrono::seconds(2)) == rclcpp::FutureReturnCode::TIMEOUT) + if (rclcpp::spin_until_future_complete(client, future, std::chrono::seconds(2)) == + rclcpp::FutureReturnCode::TIMEOUT) std::cout << "Online bagger server doesn't exist.\n"; } - else if(mode == Mode::BAG) - { + else if (mode == Mode::BAG) + { auto future = client->get_bag_topics(); - if(rclcpp::spin_until_future_complete(client, future, std::chrono::seconds(2)) == rclcpp::FutureReturnCode::SUCCESS) + if (rclcpp::spin_until_future_complete(client, future, std::chrono::seconds(2)) == + rclcpp::FutureReturnCode::SUCCESS) { Client::BagOptions bag_options; bag_options.goal.topics = std::move(future.get()); bag_options.goal.bag_time = 1.0f; po::options_description bag_opts("bag options"); - bag_opts.add_options() - ("name,n", po::value(&bag_options.goal.bag_name), "Bag name") - ("time,t", po::value(&bag_options.goal.bag_time), "Time in seconds") - ("topics,p", po::value>(&bag_options.goal.topics)->multitoken(), "Topics to bag"); + bag_opts.add_options()("name,n", po::value(&bag_options.goal.bag_name), + "Bag file name")("time,t", po::value(&bag_options.goal.bag_time), + "Duration in seconds to bag from the past up to now ")( + "topics,p", po::value>(&bag_options.goal.topics)->multitoken(), + "Topics to bag"); po::variables_map bag_vm; - po::store(po::command_line_parser(options) - .options(bag_opts) - .run(), - bag_vm); + po::store(po::command_line_parser(options).options(bag_opts).run(), bag_vm); po::notify(bag_vm); - bag_options.on_finish = [&](Client::BagFuture future){ + bag_options.on_finish = [&](Client::BagFuture future) + { auto& result = future.get(); - std::string message = result.first ? "Succuessfully save bag file to " + result.second : - "Failed to bag topics: " + result.second; + std::string message = result.first ? "Succuessfully save bag file to " + result.second : + "Failed to bag topics: " + result.second; std::cout << '\n' << message << ".\n"; rclcpp::shutdown(); }; - bag_options.on_progress = [&](float progress){ - std::cout << "Bagging\t" << static_cast(progress * 100) << "%\r"; - }; + bag_options.on_progress = [&](float progress) + { std::cout << "Bagging\t" << static_cast(progress * 100) << "%\r"; }; auto bag_future = client->bag(bag_options); - std::thread thread([&]{ - while(bag_future.wait_for(std::chrono::seconds(1)) == std::future_status::timeout) + std::thread thread( + [&] { - if(client->get_state() == Client::State::Waiting) + while (bag_future.wait_for(std::chrono::seconds(1)) == std::future_status::timeout) { - std::cout << "Lost connection with the online bagger server.\n"; - rclcpp::shutdown(); - break; + if (client->get_state() == Client::State::Waiting) + { + std::cout << "Lost connection with the online bagger server.\n"; + rclcpp::shutdown(); + break; + } } - } - }); + }); rclcpp::spin(client); @@ -389,13 +400,15 @@ int main(int argc, char** argv) std::cout << "Online bagger server doesn't exist.\n"; } } - else if(mode == Mode::TUI) + else if (mode == Mode::TUI) { - std::thread thread([=]{ - Tui tui; - tui.spin(client); - rclcpp::shutdown(); - }); + std::thread thread( + [=] + { + Tui tui; + tui.spin(client); + rclcpp::shutdown(); + }); rclcpp::spin(client); thread.join(); @@ -404,4 +417,3 @@ int main(int argc, char** argv) rclcpp::shutdown(); return 0; } - diff --git a/src/mil_common/online_bagger/src/server.cpp b/src/mil_common/online_bagger/src/server.cpp index ea4534a0..4e673adf 100644 --- a/src/mil_common/online_bagger/src/server.cpp +++ b/src/mil_common/online_bagger/src/server.cpp @@ -198,7 +198,7 @@ class Server : public rclcpp::Node if (!bag_name.has_filename()) { - bag_name.replace_filename(date + '-' + time); + bag_name.replace_filename(time); } if (bag_name.extension() != "bag") From dc8cb944a2d89f4b89c40ebb86c7ed17ab3af113 Mon Sep 17 00:00:00 2001 From: Joseph Handsome Date: Tue, 10 Jun 2025 15:59:53 -0400 Subject: [PATCH 08/12] move online bagger to subjector_setup launch file, add more bag topics --- .../launch/subjugator_setup.launch.py | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/subjugator/subjugator_bringup/launch/subjugator_setup.launch.py b/src/subjugator/subjugator_bringup/launch/subjugator_setup.launch.py index 2ad62178..1900354b 100644 --- a/src/subjugator/subjugator_bringup/launch/subjugator_setup.launch.py +++ b/src/subjugator/subjugator_bringup/launch/subjugator_setup.launch.py @@ -117,6 +117,29 @@ def generate_launch_description(): output="both", ) + bagger = Node( + package="online_bagger", + executable="online_bagger_server_node", + parameters=[ + { + # add topics to bag here + "topics": [ + "/imu/data", + "/dvl/odom", + "/thruster/BLH", + "/thruster/BLV", + "/thruster/BRH", + "/thruster/BRV", + "/thruster/FLH", + "/thruster/FLV", + "/thruster/FRH", + "/thruster/FRV", + ], + }, + ], + output="screen", + ) + return LaunchDescription( [ gui_cmd, @@ -128,5 +151,6 @@ def generate_launch_description(): controller, path_planner, trajectory_planner, + bagger, ], ) From 86e00b6e1c3a8d7c47447f07d19f0fb9e6d66e7c Mon Sep 17 00:00:00 2001 From: "Zhongzheng R. Zhang" Date: Fri, 24 Oct 2025 09:04:36 -0400 Subject: [PATCH 09/12] add ftxui dependence to the cmake file of the mil_tools package --- src/mil_common/mil_tools/CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/mil_common/mil_tools/CMakeLists.txt b/src/mil_common/mil_tools/CMakeLists.txt index fc1f391c..9bf333f1 100644 --- a/src/mil_common/mil_tools/CMakeLists.txt +++ b/src/mil_common/mil_tools/CMakeLists.txt @@ -30,8 +30,8 @@ target_include_directories( $ ${EIGEN3_INCLUDE_DIR}) target_include_directories(${PROJECT_NAME} PUBLIC $ $) -ament_target_dependencies(${PROJECT_NAME} rclcpp Eigen3 tf2_geometry_msgs) -ament_export_dependencies(rclcpp Eigen3 tf2_geometry_msgs) +ament_target_dependencies(${PROJECT_NAME} rclcpp Eigen3 tf2_geometry_msgs ftxui) +ament_export_dependencies(rclcpp Eigen3 tf2_geometry_msgs ftxui) ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET) ament_export_include_directories(include) include_directories(include) From dd4882b857d87db83afd86b634f2cb5989194405 Mon Sep 17 00:00:00 2001 From: "Zhongzheng R. Zhang" Date: Fri, 24 Oct 2025 09:09:33 -0400 Subject: [PATCH 10/12] rerun pre-commit to format the code --- .../include/mil_tools/tui/dialog.hpp | 49 ++++++-------- src/mil_common/online_bagger/CMakeLists.txt | 49 +++++--------- .../include/online_bagger/client.hpp | 23 ++++--- .../include/online_bagger/common.hpp | 2 +- .../online_bagger/launch/test.launch.py | 25 ++++--- src/mil_common/online_bagger/package.xml | 4 +- src/mil_common/online_bagger/src/client.cpp | 65 +++++++++++-------- 7 files changed, 99 insertions(+), 118 deletions(-) diff --git a/src/mil_common/mil_tools/include/mil_tools/tui/dialog.hpp b/src/mil_common/mil_tools/include/mil_tools/tui/dialog.hpp index e8d92d15..baed3db5 100644 --- a/src/mil_common/mil_tools/include/mil_tools/tui/dialog.hpp +++ b/src/mil_common/mil_tools/include/mil_tools/tui/dialog.hpp @@ -1,54 +1,45 @@ #pragma once // Show a dialog on the terminal powered by ftxui -#include "ftxui/component/screen_interactive.hpp" #include "ftxui/component/component.hpp" +#include "ftxui/component/screen_interactive.hpp" #include "ftxui/dom/elements.hpp" namespace mil_tools::tui { class Dialog { -public: + public: Dialog() { - } - int exec(const std::string& title, - ftxui::Component content, - const std::vector& buttons = {"OK"}) + int exec(std::string const& title, ftxui::Component content, std::vector const& buttons = { "OK" }) { using namespace ftxui; - Component close_button = Button("X", [&]{ - screen.Exit(); - }, ButtonOption::Ascii()); + Component close_button = Button("X", [&] { screen.Exit(); }, ButtonOption::Ascii()); Components button_comps; - button_comps.push_back(Renderer([]{return filler();})); - for(size_t i=0;iRender() - }); - }), - Renderer([]{return separator();}), - content | vcenter | flex, - Renderer([]{return separator();}), - Container::Horizontal(button_comps) - }) | border; - + Component dialog_ui = + Container::Vertical( + { Renderer(close_button, [=] { return hbox({ text(title) | center | flex, close_button->Render() }); }), + Renderer([] { return separator(); }), content | vcenter | flex, Renderer([] { return separator(); }), + Container::Horizontal(button_comps) }) | + border; + screen.Loop(dialog_ui); return exit_code; } @@ -58,8 +49,8 @@ class Dialog screen.Exit(); } - private: + private: int exit_code = -1; ftxui::ScreenInteractive screen = ftxui::ScreenInteractive::Fullscreen(); }; -} \ No newline at end of file +} // namespace mil_tools::tui diff --git a/src/mil_common/online_bagger/CMakeLists.txt b/src/mil_common/online_bagger/CMakeLists.txt index a8346c3c..b2ae84b9 100644 --- a/src/mil_common/online_bagger/CMakeLists.txt +++ b/src/mil_common/online_bagger/CMakeLists.txt @@ -21,45 +21,28 @@ add_executable(${PROJECT_NAME}_server_node src/server.cpp) add_library(${PROJECT_NAME}_client src/client.cpp) add_executable(${PROJECT_NAME} src/online_bagger.cpp) -ament_target_dependencies(${PROJECT_NAME}_server_node - rclcpp - mil_msgs - std_msgs - rclcpp_action - rosbag2_cpp - Boost -) +ament_target_dependencies( + ${PROJECT_NAME}_server_node + rclcpp + mil_msgs + std_msgs + rclcpp_action + rosbag2_cpp + Boost) -ament_target_dependencies(${PROJECT_NAME}_client - rclcpp - mil_msgs - std_msgs - rclcpp_action -) +ament_target_dependencies(${PROJECT_NAME}_client rclcpp mil_msgs std_msgs + rclcpp_action) -ament_target_dependencies(${PROJECT_NAME} - mil_tools - rclcpp - Boost -) +ament_target_dependencies(${PROJECT_NAME} mil_tools rclcpp Boost) -target_link_libraries(${PROJECT_NAME} - ${PROJECT_NAME}_client - ftxui::screen - ftxui::dom - ftxui::component -) +target_link_libraries(${PROJECT_NAME} ${PROJECT_NAME}_client ftxui::screen + ftxui::dom ftxui::component) install(TARGETS ${PROJECT_NAME}_server_node ${PROJECT_NAME}_client - DESTINATION lib/${PROJECT_NAME} -) + DESTINATION lib/${PROJECT_NAME}) -install(TARGETS ${PROJECT_NAME} - DESTINATION bin -) +install(TARGETS ${PROJECT_NAME} DESTINATION bin) -install(DIRECTORY launch/ - DESTINATION share/${PROJECT_NAME}/launch -) +install(DIRECTORY launch/ DESTINATION share/${PROJECT_NAME}/launch) ament_package() diff --git a/src/mil_common/online_bagger/include/online_bagger/client.hpp b/src/mil_common/online_bagger/include/online_bagger/client.hpp index d6977cc7..9f8cc1b7 100644 --- a/src/mil_common/online_bagger/include/online_bagger/client.hpp +++ b/src/mil_common/online_bagger/include/online_bagger/client.hpp @@ -1,16 +1,17 @@ #pragma once #include -#include -#include "mil_msgs/srv/bag_topics.hpp" #include "mil_msgs/action/bag_online.hpp" +#include "mil_msgs/srv/bag_topics.hpp" + +#include namespace online_bagger { -class Client: public rclcpp::Node +class Client : public rclcpp::Node { - public: + public: using BagOnlineGoalHandle = rclcpp_action::ClientGoalHandle; using TopicsFuture = std::shared_future>; using BagFuture = std::shared_future>; @@ -36,15 +37,15 @@ class Client: public rclcpp::Node Client(); ~Client(); - + State get_state(); - + TopicsFuture get_bag_topics(std::function callback = nullptr); - BagFuture bag(const BagOptions& options); + BagFuture bag(BagOptions const& options); - private: + private: using TopicsPromise = std::promise>; - using BagPromise = std::promise>; + using BagPromise = std::promise>; std::atomic bagging; rclcpp_action::Client::SharedPtr action_client; @@ -52,8 +53,6 @@ class Client: public rclcpp::Node rclcpp::TimerBase::SharedPtr alive_timer; std::function on_state_change; - }; -} - +} // namespace online_bagger diff --git a/src/mil_common/online_bagger/include/online_bagger/common.hpp b/src/mil_common/online_bagger/include/online_bagger/common.hpp index 239abf51..4ec9ae11 100644 --- a/src/mil_common/online_bagger/include/online_bagger/common.hpp +++ b/src/mil_common/online_bagger/include/online_bagger/common.hpp @@ -4,4 +4,4 @@ namespace online_bagger { static constexpr char BAG_ACTION_NAME[] = "/online_bagger/bag"; static constexpr char TOPIC_SERVICE_NAME[] = "/online_bagger/topics"; -} \ No newline at end of file +} // namespace online_bagger diff --git a/src/mil_common/online_bagger/launch/test.launch.py b/src/mil_common/online_bagger/launch/test.launch.py index 777161bd..7ed8b80a 100644 --- a/src/mil_common/online_bagger/launch/test.launch.py +++ b/src/mil_common/online_bagger/launch/test.launch.py @@ -1,17 +1,16 @@ from launch import LaunchDescription -from launch.actions import ExecuteProcess from launch_ros.actions import Node -def generate_launch_description(): - return LaunchDescription([ - # Launch online_bagger node with topics list - Node( - package='online_bagger', - executable='online_bagger_server', - output='screen', - parameters=[{ - 'topics': ['rand_int', 'rand_float', 'rand_str'] - }] - ) - ]) +def generate_launch_description(): + return LaunchDescription( + [ + # Launch online_bagger node with topics list + Node( + package="online_bagger", + executable="online_bagger_server", + output="screen", + parameters=[{"topics": ["rand_int", "rand_float", "rand_str"]}], + ), + ], + ) diff --git a/src/mil_common/online_bagger/package.xml b/src/mil_common/online_bagger/package.xml index 3f67b192..c6c9bb64 100644 --- a/src/mil_common/online_bagger/package.xml +++ b/src/mil_common/online_bagger/package.xml @@ -10,12 +10,12 @@ ament_cmake ftxui + mil_msgs + mil_tools rclcpp rclcpp_action rosbag2_cpp - mil_msgs std_msgs - mil_tools ament_lint_auto ament_lint_common diff --git a/src/mil_common/online_bagger/src/client.cpp b/src/mil_common/online_bagger/src/client.cpp index 2e3a16ec..e2c5d329 100644 --- a/src/mil_common/online_bagger/src/client.cpp +++ b/src/mil_common/online_bagger/src/client.cpp @@ -1,17 +1,19 @@ -#include +#include "online_bagger/client.hpp" + +#include + #include #include "mil_msgs/action/bag_online.hpp" #include "mil_msgs/srv/bag_topics.hpp" #include "online_bagger/common.hpp" -#include "online_bagger/client.hpp" -#include +#include namespace online_bagger { -Client::Client():rclcpp::Node("online_bagger_client") +Client::Client() : rclcpp::Node("online_bagger_client") { action_client = rclcpp_action::create_client(this, BAG_ACTION_NAME); srv_client = create_client(TOPIC_SERVICE_NAME); @@ -21,36 +23,38 @@ Client::Client():rclcpp::Node("online_bagger_client") Client::~Client() { - } Client::State Client::get_state() { - if(!action_client->action_server_is_ready()) + if (!action_client->action_server_is_ready()) return State::Waiting; - - if(!bagging) + + if (!bagging) return State::Ready; return State::Bagging; } - + Client::TopicsFuture Client::get_bag_topics(std::function callback) { std::shared_ptr promise = std::make_shared(); TopicsFuture future = promise->get_future().share(); auto request = std::make_shared(); - srv_client->async_send_request(request, [promise, future, callback](rclcpp::Client::SharedFuture response_future) { - promise->set_value(std::move(response_future.get()->topics)); - if(callback) - callback(future); - }); + srv_client->async_send_request( + request, + [promise, future, callback](rclcpp::Client::SharedFuture response_future) + { + promise->set_value(std::move(response_future.get()->topics)); + if (callback) + callback(future); + }); return future; } -Client::BagFuture Client::bag(const Client::BagOptions& options) +Client::BagFuture Client::bag(Client::BagOptions const& options) { std::shared_ptr promise = std::make_shared(); BagFuture future = promise->get_future().share(); @@ -58,27 +62,32 @@ Client::BagFuture Client::bag(const Client::BagOptions& options) bagging = true; rclcpp_action::Client::SendGoalOptions goal_options; - goal_options.goal_response_callback = [this, on_finish = options.on_finish, promise, future](Client::BagOnlineGoalHandle::SharedPtr goal_handle){ - if(!goal_handle) + goal_options.goal_response_callback = + [this, on_finish = options.on_finish, promise, future](Client::BagOnlineGoalHandle::SharedPtr goal_handle) + { + if (!goal_handle) { - promise->set_value({false, "Bag request is rejected by the online bagger server."}); - if(on_finish) + promise->set_value({ false, "Bag request is rejected by the online bagger server." }); + if (on_finish) on_finish(future); bagging = false; } }; - if(options.on_progress) + if (options.on_progress) { - goal_options.feedback_callback = [on_progress = options.on_progress]([[maybe_unused]]Client::BagOnlineGoalHandle::SharedPtr goal_handle, - const std::shared_ptr feedback){ - on_progress(feedback->progress); - }; + goal_options.feedback_callback = + [on_progress = + options.on_progress]([[maybe_unused]] Client::BagOnlineGoalHandle::SharedPtr goal_handle, + std::shared_ptr const feedback) + { on_progress(feedback->progress); }; } - goal_options.result_callback = [this, on_finish = options.on_finish, promise, future](const Client::BagOnlineGoalHandle::WrappedResult& result){ - promise->set_value({result.result->success, std::move(result.result->status)}); - if(on_finish) + goal_options.result_callback = + [this, on_finish = options.on_finish, promise, future](Client::BagOnlineGoalHandle::WrappedResult const& result) + { + promise->set_value({ result.result->success, std::move(result.result->status) }); + if (on_finish) on_finish(future); bagging = false; }; @@ -88,4 +97,4 @@ Client::BagFuture Client::bag(const Client::BagOptions& options) return future; } -} +} // namespace online_bagger From da195103ee5a657ad25c76f0b0784a6e33de2535 Mon Sep 17 00:00:00 2001 From: "Zhongzheng R. Zhang" Date: Fri, 24 Oct 2025 09:30:51 -0400 Subject: [PATCH 11/12] add ftxui include path to the cmake file of the mil_tools package --- src/mil_common/mil_tools/CMakeLists.txt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/mil_common/mil_tools/CMakeLists.txt b/src/mil_common/mil_tools/CMakeLists.txt index 9bf333f1..da7a42d7 100644 --- a/src/mil_common/mil_tools/CMakeLists.txt +++ b/src/mil_common/mil_tools/CMakeLists.txt @@ -30,6 +30,9 @@ target_include_directories( $ ${EIGEN3_INCLUDE_DIR}) target_include_directories(${PROJECT_NAME} PUBLIC $ $) +target_include_directories( + ${PROJECT_NAME} PUBLIC $ + $) ament_target_dependencies(${PROJECT_NAME} rclcpp Eigen3 tf2_geometry_msgs ftxui) ament_export_dependencies(rclcpp Eigen3 tf2_geometry_msgs ftxui) ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET) From 23211e460f86f4571c421cdbf9ec1aa58d5138f1 Mon Sep 17 00:00:00 2001 From: "Zhongzheng R. Zhang" Date: Sat, 25 Oct 2025 19:07:23 -0400 Subject: [PATCH 12/12] fix include file path issue in the CMake file of the mil_tools package --- src/mil_common/mil_tools/CMakeLists.txt | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/mil_common/mil_tools/CMakeLists.txt b/src/mil_common/mil_tools/CMakeLists.txt index da7a42d7..a901f0fd 100644 --- a/src/mil_common/mil_tools/CMakeLists.txt +++ b/src/mil_common/mil_tools/CMakeLists.txt @@ -30,10 +30,11 @@ target_include_directories( $ ${EIGEN3_INCLUDE_DIR}) target_include_directories(${PROJECT_NAME} PUBLIC $ $) -target_include_directories( - ${PROJECT_NAME} PUBLIC $ - $) -ament_target_dependencies(${PROJECT_NAME} rclcpp Eigen3 tf2_geometry_msgs ftxui) + +target_link_libraries(${PROJECT_NAME} PUBLIC ftxui::screen ftxui::dom + ftxui::component) +ament_target_dependencies(${PROJECT_NAME} PUBLIC rclcpp Eigen3 + tf2_geometry_msgs) ament_export_dependencies(rclcpp Eigen3 tf2_geometry_msgs ftxui) ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET) ament_export_include_directories(include)