diff --git a/src/mil_common/mil_msgs/CMakeLists.txt b/src/mil_common/mil_msgs/CMakeLists.txt index d7125f5b..452decaa 100644 --- a/src/mil_common/mil_msgs/CMakeLists.txt +++ b/src/mil_common/mil_msgs/CMakeLists.txt @@ -38,7 +38,7 @@ set(msg_files "msg/ObjectDetections.msg") set(srv_files "srv/CameraToLidarTransform.srv" "srv/ObjectDBQuery.srv" - "srv/SetGeometry.srv") + "srv/SetGeometry.srv" "srv/BagTopics.srv") rosidl_generate_interfaces( ${PROJECT_NAME} diff --git a/src/mil_common/mil_msgs/action/BagOnline.action b/src/mil_common/mil_msgs/action/BagOnline.action index 79d721b7..ea53ff73 100644 --- a/src/mil_common/mil_msgs/action/BagOnline.action +++ b/src/mil_common/mil_msgs/action/BagOnline.action @@ -9,7 +9,7 @@ string bag_name # Space separated list of topics to bag. If empty string, all buffered topics will be bagged -string topics +string[] topics # Time in seconds float32 bag_time @@ -17,7 +17,7 @@ float32 bag_time bool success string status # Contains error information if success is False -string filename # Full path to file where bag was written +# string filename # Full path to file where bag was written --- diff --git a/src/mil_common/mil_msgs/srv/BagTopics.srv b/src/mil_common/mil_msgs/srv/BagTopics.srv new file mode 100644 index 00000000..5dfdf1d4 --- /dev/null +++ b/src/mil_common/mil_msgs/srv/BagTopics.srv @@ -0,0 +1,5 @@ +# Onling bagger client call this service to obtain a list of topics to be baggered +# The topics will be display on client ui + +--- +string[] topics # Contains all topic names to bag in the online bagger server. diff --git a/src/mil_common/mil_tools/CMakeLists.txt b/src/mil_common/mil_tools/CMakeLists.txt index 1c026924..a901f0fd 100644 --- a/src/mil_common/mil_tools/CMakeLists.txt +++ b/src/mil_common/mil_tools/CMakeLists.txt @@ -20,6 +20,7 @@ find_package(backward_ros REQUIRED) find_package(Eigen3 REQUIRED) find_package(tf2_geometry_msgs REQUIRED) find_package(rclcpp REQUIRED) +find_package(ftxui REQUIRED) set(SOURCE_FILES src/geometry/Rotation.cpp src/fs/path.cpp src/os.cpp src/os/FileDescriptor.cpp src/string.cpp) @@ -29,8 +30,12 @@ target_include_directories( $ ${EIGEN3_INCLUDE_DIR}) target_include_directories(${PROJECT_NAME} PUBLIC $ $) -ament_target_dependencies(${PROJECT_NAME} rclcpp Eigen3 tf2_geometry_msgs) -ament_export_dependencies(rclcpp Eigen3 tf2_geometry_msgs) + +target_link_libraries(${PROJECT_NAME} PUBLIC ftxui::screen ftxui::dom + ftxui::component) +ament_target_dependencies(${PROJECT_NAME} PUBLIC rclcpp Eigen3 + tf2_geometry_msgs) +ament_export_dependencies(rclcpp Eigen3 tf2_geometry_msgs ftxui) ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET) ament_export_include_directories(include) include_directories(include) diff --git a/src/mil_common/mil_tools/include/mil_tools/tui/dialog.hpp b/src/mil_common/mil_tools/include/mil_tools/tui/dialog.hpp new file mode 100644 index 00000000..baed3db5 --- /dev/null +++ b/src/mil_common/mil_tools/include/mil_tools/tui/dialog.hpp @@ -0,0 +1,56 @@ +#pragma once +// Show a dialog on the terminal powered by ftxui + +#include "ftxui/component/component.hpp" +#include "ftxui/component/screen_interactive.hpp" +#include "ftxui/dom/elements.hpp" + +namespace mil_tools::tui +{ +class Dialog +{ + public: + Dialog() + { + } + + int exec(std::string const& title, ftxui::Component content, std::vector const& buttons = { "OK" }) + { + using namespace ftxui; + Component close_button = Button("X", [&] { screen.Exit(); }, ButtonOption::Ascii()); + + Components button_comps; + button_comps.push_back(Renderer([] { return filler(); })); + for (size_t i = 0; i < buttons.size(); i++) + { + button_comps.push_back(Button( + buttons[i], + [this, i = i] + { + exit_code = i; + screen.Exit(); + }, + ButtonOption::Border())); + } + + Component dialog_ui = + Container::Vertical( + { Renderer(close_button, [=] { return hbox({ text(title) | center | flex, close_button->Render() }); }), + Renderer([] { return separator(); }), content | vcenter | flex, Renderer([] { return separator(); }), + Container::Horizontal(button_comps) }) | + border; + + screen.Loop(dialog_ui); + return exit_code; + } + + void close() + { + screen.Exit(); + } + + private: + int exit_code = -1; + ftxui::ScreenInteractive screen = ftxui::ScreenInteractive::Fullscreen(); +}; +} // namespace mil_tools::tui diff --git a/src/mil_common/mil_tools/package.xml b/src/mil_common/mil_tools/package.xml index d8d1cd60..333d7ccb 100644 --- a/src/mil_common/mil_tools/package.xml +++ b/src/mil_common/mil_tools/package.xml @@ -13,6 +13,7 @@ rclcpp backward_ros + ftxui ament ament_cmake ament_cmake_python diff --git a/src/mil_common/online_bagger/CMakeLists.txt b/src/mil_common/online_bagger/CMakeLists.txt new file mode 100644 index 00000000..b2ae84b9 --- /dev/null +++ b/src/mil_common/online_bagger/CMakeLists.txt @@ -0,0 +1,48 @@ +cmake_minimum_required(VERSION 3.8) +project(online_bagger) + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +# find dependencies +find_package(ament_cmake REQUIRED) +find_package(rclcpp REQUIRED) +find_package(mil_msgs REQUIRED) +find_package(std_msgs REQUIRED) +find_package(mil_tools REQUIRED) +find_package(rclcpp_action REQUIRED) +find_package(rosbag2_cpp REQUIRED) +find_package(Boost REQUIRED COMPONENTS filesystem system program_options) +find_package(ftxui REQUIRED COMPONENTS screen dom component) + +include_directories(include ${Boost_INCLUDE_DIRS}) +add_executable(${PROJECT_NAME}_server_node src/server.cpp) +add_library(${PROJECT_NAME}_client src/client.cpp) +add_executable(${PROJECT_NAME} src/online_bagger.cpp) + +ament_target_dependencies( + ${PROJECT_NAME}_server_node + rclcpp + mil_msgs + std_msgs + rclcpp_action + rosbag2_cpp + Boost) + +ament_target_dependencies(${PROJECT_NAME}_client rclcpp mil_msgs std_msgs + rclcpp_action) + +ament_target_dependencies(${PROJECT_NAME} mil_tools rclcpp Boost) + +target_link_libraries(${PROJECT_NAME} ${PROJECT_NAME}_client ftxui::screen + ftxui::dom ftxui::component) + +install(TARGETS ${PROJECT_NAME}_server_node ${PROJECT_NAME}_client + DESTINATION lib/${PROJECT_NAME}) + +install(TARGETS ${PROJECT_NAME} DESTINATION bin) + +install(DIRECTORY launch/ DESTINATION share/${PROJECT_NAME}/launch) + +ament_package() diff --git a/src/mil_common/online_bagger/include/online_bagger/client.hpp b/src/mil_common/online_bagger/include/online_bagger/client.hpp new file mode 100644 index 00000000..9f8cc1b7 --- /dev/null +++ b/src/mil_common/online_bagger/include/online_bagger/client.hpp @@ -0,0 +1,58 @@ +#pragma once + +#include + +#include "mil_msgs/action/bag_online.hpp" +#include "mil_msgs/srv/bag_topics.hpp" + +#include + +namespace online_bagger +{ +class Client : public rclcpp::Node +{ + public: + using BagOnlineGoalHandle = rclcpp_action::ClientGoalHandle; + using TopicsFuture = std::shared_future>; + using BagFuture = std::shared_future>; + + enum class State + { + Waiting, + Ready, + Bagging + }; + + struct BagOptions + { + BagOptions() + { + goal.bag_time = 1.0f; + } + + mil_msgs::action::BagOnline::Goal goal; + std::function on_progress = nullptr; + std::function on_finish = nullptr; + }; + + Client(); + ~Client(); + + State get_state(); + + TopicsFuture get_bag_topics(std::function callback = nullptr); + BagFuture bag(BagOptions const& options); + + private: + using TopicsPromise = std::promise>; + using BagPromise = std::promise>; + + std::atomic bagging; + rclcpp_action::Client::SharedPtr action_client; + rclcpp::Client::SharedPtr srv_client; + rclcpp::TimerBase::SharedPtr alive_timer; + + std::function on_state_change; +}; + +} // namespace online_bagger diff --git a/src/mil_common/online_bagger/include/online_bagger/common.hpp b/src/mil_common/online_bagger/include/online_bagger/common.hpp new file mode 100644 index 00000000..4ec9ae11 --- /dev/null +++ b/src/mil_common/online_bagger/include/online_bagger/common.hpp @@ -0,0 +1,7 @@ +#pragma once + +namespace online_bagger +{ +static constexpr char BAG_ACTION_NAME[] = "/online_bagger/bag"; +static constexpr char TOPIC_SERVICE_NAME[] = "/online_bagger/topics"; +} // namespace online_bagger diff --git a/src/mil_common/online_bagger/launch/test.launch.py b/src/mil_common/online_bagger/launch/test.launch.py new file mode 100644 index 00000000..7ed8b80a --- /dev/null +++ b/src/mil_common/online_bagger/launch/test.launch.py @@ -0,0 +1,16 @@ +from launch import LaunchDescription +from launch_ros.actions import Node + + +def generate_launch_description(): + return LaunchDescription( + [ + # Launch online_bagger node with topics list + Node( + package="online_bagger", + executable="online_bagger_server", + output="screen", + parameters=[{"topics": ["rand_int", "rand_float", "rand_str"]}], + ), + ], + ) diff --git a/src/mil_common/online_bagger/package.xml b/src/mil_common/online_bagger/package.xml new file mode 100644 index 00000000..c6c9bb64 --- /dev/null +++ b/src/mil_common/online_bagger/package.xml @@ -0,0 +1,26 @@ + + + + online_bagger + 0.0.0 + The online bagger is a tool that allows user to store bagged information about important events, after they happen. + zhongzheng + MIL + + ament_cmake + + ftxui + mil_msgs + mil_tools + rclcpp + rclcpp_action + rosbag2_cpp + std_msgs + + ament_lint_auto + ament_lint_common + + + ament_cmake + + diff --git a/src/mil_common/online_bagger/src/client.cpp b/src/mil_common/online_bagger/src/client.cpp new file mode 100644 index 00000000..e2c5d329 --- /dev/null +++ b/src/mil_common/online_bagger/src/client.cpp @@ -0,0 +1,100 @@ +#include "online_bagger/client.hpp" + +#include + +#include + +#include "mil_msgs/action/bag_online.hpp" +#include "mil_msgs/srv/bag_topics.hpp" +#include "online_bagger/common.hpp" + +#include + +namespace online_bagger +{ + +Client::Client() : rclcpp::Node("online_bagger_client") +{ + action_client = rclcpp_action::create_client(this, BAG_ACTION_NAME); + srv_client = create_client(TOPIC_SERVICE_NAME); + + action_client->wait_for_action_server(std::chrono::milliseconds(500)); +} + +Client::~Client() +{ +} + +Client::State Client::get_state() +{ + if (!action_client->action_server_is_ready()) + return State::Waiting; + + if (!bagging) + return State::Ready; + + return State::Bagging; +} + +Client::TopicsFuture Client::get_bag_topics(std::function callback) +{ + std::shared_ptr promise = std::make_shared(); + TopicsFuture future = promise->get_future().share(); + + auto request = std::make_shared(); + srv_client->async_send_request( + request, + [promise, future, callback](rclcpp::Client::SharedFuture response_future) + { + promise->set_value(std::move(response_future.get()->topics)); + if (callback) + callback(future); + }); + + return future; +} + +Client::BagFuture Client::bag(Client::BagOptions const& options) +{ + std::shared_ptr promise = std::make_shared(); + BagFuture future = promise->get_future().share(); + + bagging = true; + + rclcpp_action::Client::SendGoalOptions goal_options; + goal_options.goal_response_callback = + [this, on_finish = options.on_finish, promise, future](Client::BagOnlineGoalHandle::SharedPtr goal_handle) + { + if (!goal_handle) + { + promise->set_value({ false, "Bag request is rejected by the online bagger server." }); + if (on_finish) + on_finish(future); + bagging = false; + } + }; + + if (options.on_progress) + { + goal_options.feedback_callback = + [on_progress = + options.on_progress]([[maybe_unused]] Client::BagOnlineGoalHandle::SharedPtr goal_handle, + std::shared_ptr const feedback) + { on_progress(feedback->progress); }; + } + + goal_options.result_callback = + [this, on_finish = options.on_finish, promise, future](Client::BagOnlineGoalHandle::WrappedResult const& result) + { + promise->set_value({ result.result->success, std::move(result.result->status) }); + if (on_finish) + on_finish(future); + bagging = false; + }; + + action_client->async_send_goal(options.goal, goal_options); + + return future; +} + +} // namespace online_bagger diff --git a/src/mil_common/online_bagger/src/online_bagger.cpp b/src/mil_common/online_bagger/src/online_bagger.cpp new file mode 100644 index 00000000..72a986de --- /dev/null +++ b/src/mil_common/online_bagger/src/online_bagger.cpp @@ -0,0 +1,419 @@ +#include + +#include "ftxui/component/component.hpp" +#include "ftxui/component/loop.hpp" +#include "ftxui/component/screen_interactive.hpp" +#include "ftxui/dom/elements.hpp" +#include + +#include "mil_tools/tui/dialog.hpp" +#include "online_bagger/client.hpp" + +using namespace ftxui; +using namespace online_bagger; + +class TopicList : public ComponentBase +{ + public: + TopicList() + { + } + ~TopicList() + { + } + + void refresh(std::vector&& topics) + { + this->topics = std::move(topics); + DetachAllChildren(); + if (selected) + delete[] selected; + + if (this->topics.size() == 0) + return; + + future = std::async(std::launch::async, + [&] + { + selected = new bool[this->topics.size()]{ false }; + Components items; + for (size_t i = 0; i < this->topics.size(); i++) + { + items.push_back(Checkbox(this->topics[i], &selected[i])); + } + + return Container::Vertical(items); + }); + + return; + } + + std::vector const get_selected() + { + std::vector selected_topics; + for (size_t i = 0; i < topics.size(); i++) + { + if (selected[i]) + selected_topics.push_back(topics[i]); + } + return selected_topics; + } + + Element Render() override + { + if (!future.valid()) + { + if (ChildCount() > 0) + return ChildAt(0)->Render(); + else + return text("No bag topics available"); + } + else + { + if (future.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready) + { + Add(future.get()); + } + + return spinner(15, clock.now().nanoseconds() / 200'000'000); + } + } + + private: + std::future future; + std::vector topics; + bool* selected = nullptr; + rclcpp::Clock clock; +}; + +class Tui +{ + public: + Tui() + { + } + + ~Tui() + { + } + + void spin(std::shared_ptr client) + { + // Head + Component ui_head = Renderer([] { return text("Online Bagger") | bold | center; }); + // Body + std::shared_ptr ui_topic_list = std::make_shared(); + + Component ui_body = ui_topic_list | vscroll_indicator | frame; + + // Bottom + auto on_quit = [this] + { + exited = true; + screen.Exit(); + }; + auto on_bag = [this, ui_topic_list, client] + { + bag_progress = 0.0f; + + Client::BagOptions options; + options.goal.topics = ui_topic_list->get_selected(); + + if (!ask_bag_detail(options.goal.bag_name, options.goal.bag_time)) + return; + + options.on_finish = [this](Client::BagFuture future) + { + screen.Post( + [this, future] + { + auto& result = future.get(); + std::string message = result.first ? "Succuessfully save bag file to " + result.second : + "Failed to bag topics: " + result.second; + show_message("Bag Result", message); + }); + }; + + options.on_progress = [this](float progress) + { + bag_progress = progress; + screen.PostEvent(Event::Custom); + }; + + client->bag(options); + }; + + auto on_refresh = [this, ui_topic_list, client] + { + client->get_bag_topics( + [&](Client::TopicsFuture future) + { + screen.Post([ui_topic_list, topics = std::move(future.get())]() mutable + { ui_topic_list->refresh(std::move(topics)); }); + }); + }; + + Component ui_status_bar = Renderer( + [this, client] + { + if (last_state == Client::State::Waiting) + { + return hbox( + { text("Connecting to bag server "), spinner(15, clock.now().nanoseconds() / 200'000'000) }); + } + else if (last_state == Client::State::Ready) + { + return filler(); + } + else + { + return hbox({ + gauge(bag_progress), + filler(), + }); + } + }); + + Component ui_bottom_bar = Container::Horizontal( + { ui_status_bar | vcenter | flex, Button("Quit", on_quit, ButtonOption::Border()) | align_right, + Maybe(Button("Refresh", on_refresh, ButtonOption::Border()), + [client] { return client->get_state() != Client::State::Waiting; }) | + align_right, + Maybe(Button("Bag", on_bag, ButtonOption::Border()), + [client] { return client->get_state() == Client::State::Ready; }) | + align_right }); + + // Main UI + Component main_ui = Container::Vertical({ ui_head, Renderer([] { return separator(); }), ui_body | flex, + Renderer([] { return separator(); }), ui_bottom_bar }); + + last_state = Client::State::Waiting; + std::thread refresh_thread( + [this, client, &on_refresh] + { + while (!exited) + { + Client::State state = client->get_state(); + if (state != Client::State::Bagging) + { + screen.PostEvent(Event::Custom); + } + + if (state == Client::State::Ready && last_state == Client::State::Waiting) + { + on_refresh(); + } + + last_state = state; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + }); + + screen.Loop(main_ui); + refresh_thread.join(); + } + + private: + std::atomic exited = false; + std::atomic last_state; + rclcpp::Clock clock; + ScreenInteractive screen = ScreenInteractive::Fullscreen(); + std::atomic bag_progress = 0.0f; + + bool ask_bag_detail(std::string& name, float& time) + { + mil_tools::tui::Dialog dialog; + std::string time_input = std::to_string(time); + Component ui_name_input = Input(&name, InputOption::Default()); + Component ui_time_input = + Input(&time_input, InputOption::Default()) | + CatchEvent( + [](Event event) + { + return event.is_character() && !(std::isdigit(event.character()[0]) || event.character()[0] == '.'); + }); + + Component content = + Renderer(Container::Vertical({ ui_name_input, ui_time_input }), + [=] + { + return gridbox({ { text("Bag File Name ") | vcenter, ui_name_input->Render() | border }, + { text("Bag Time ") | vcenter, ui_time_input->Render() | border } }); + }) | + vscroll_indicator | frame; + + if (dialog.exec("Bag details", content) == -1) + return false; + + try + { + time = std::stof(time_input); + } + catch (std::exception const& e) + { + return false; + } + + return true; + } + + void show_message(std::string const& title, std::string const& message) + { + mil_tools::tui::Dialog dialog; + dialog.exec(title, Renderer([&] { return paragraph(message); })); + } +}; + +namespace po = boost::program_options; + +void show_help_message() +{ + std::cout << "Usage: online_bagger [args]." << std::endl; + std::cout << "Available commands are:" << std::endl; + std::cout << "list:\tList all topics that can be bagged." << std::endl; + std::cout << "help:\tShow this help message." << std::endl; + std::cout << "bag:\tStart bagging." << std::endl; + std::cout << "\t --topics -p \tTopics to bag." << std::endl; + std::cout << "\t\t\tWill bag all message if not specified." << std::endl; + std::cout << "\t --name -n \tBag file name." << std::endl; + std::cout << "\t\t\tDefault name is generate from bagging start time in the format HH:MM:SS." << std::endl; + std::cout << "\t --time -t \tDuration in seconds to bag from the past up to now." << std::endl; + std::cout << "\t\t\tDefault value is 1." << std::endl; + std::cout << "tui:\tShow terminal user interface." << std::endl; +} + +int main(int argc, char** argv) +{ + std::string subcommand; + std::vector options; + + po::options_description global_opts("Global"); + global_opts.add_options()("subcommand", po::value(&subcommand), "Subcommand")( + "options", po::value>(&options), "Args for subcommand"); + + po::positional_options_description pos; + pos.add("subcommand", 1).add("options", -1); + + po::variables_map vm; + po::store(po::command_line_parser(argc, argv).options(global_opts).positional(pos).allow_unregistered().run(), vm); + po::notify(vm); + + enum class Mode + { + TUI, + HELP, + BAG, + LIST + }; + + std::unordered_map const subcmd_mapping = { + { "tui", Mode::TUI }, { "help", Mode::HELP }, { "bag", Mode::BAG }, { "list", Mode::LIST } + }; + + auto it = subcmd_mapping.find(subcommand); + if (it == subcmd_mapping.end()) + { + show_help_message(); + return 1; + } + + Mode mode = it->second; + rclcpp::init(1, argv); + std::shared_ptr client = std::make_shared(); + + if (mode == Mode::HELP) + { + show_help_message(); + } + else if (mode == Mode::LIST) + { + auto future = client->get_bag_topics( + [](Client::TopicsFuture future) + { + for (std::string const& topic : future.get()) + { + std::cout << topic << std::endl; + } + rclcpp::shutdown(); + }); + + if (rclcpp::spin_until_future_complete(client, future, std::chrono::seconds(2)) == + rclcpp::FutureReturnCode::TIMEOUT) + std::cout << "Online bagger server doesn't exist.\n"; + } + else if (mode == Mode::BAG) + { + auto future = client->get_bag_topics(); + + if (rclcpp::spin_until_future_complete(client, future, std::chrono::seconds(2)) == + rclcpp::FutureReturnCode::SUCCESS) + { + Client::BagOptions bag_options; + bag_options.goal.topics = std::move(future.get()); + bag_options.goal.bag_time = 1.0f; + + po::options_description bag_opts("bag options"); + bag_opts.add_options()("name,n", po::value(&bag_options.goal.bag_name), + "Bag file name")("time,t", po::value(&bag_options.goal.bag_time), + "Duration in seconds to bag from the past up to now ")( + "topics,p", po::value>(&bag_options.goal.topics)->multitoken(), + "Topics to bag"); + + po::variables_map bag_vm; + po::store(po::command_line_parser(options).options(bag_opts).run(), bag_vm); + po::notify(bag_vm); + + bag_options.on_finish = [&](Client::BagFuture future) + { + auto& result = future.get(); + std::string message = result.first ? "Succuessfully save bag file to " + result.second : + "Failed to bag topics: " + result.second; + std::cout << '\n' << message << ".\n"; + rclcpp::shutdown(); + }; + + bag_options.on_progress = [&](float progress) + { std::cout << "Bagging\t" << static_cast(progress * 100) << "%\r"; }; + + auto bag_future = client->bag(bag_options); + + std::thread thread( + [&] + { + while (bag_future.wait_for(std::chrono::seconds(1)) == std::future_status::timeout) + { + if (client->get_state() == Client::State::Waiting) + { + std::cout << "Lost connection with the online bagger server.\n"; + rclcpp::shutdown(); + break; + } + } + }); + + rclcpp::spin(client); + + thread.join(); + } + else + { + std::cout << "Online bagger server doesn't exist.\n"; + } + } + else if (mode == Mode::TUI) + { + std::thread thread( + [=] + { + Tui tui; + tui.spin(client); + rclcpp::shutdown(); + }); + + rclcpp::spin(client); + thread.join(); + } + + rclcpp::shutdown(); + return 0; +} diff --git a/src/mil_common/online_bagger/src/server.cpp b/src/mil_common/online_bagger/src/server.cpp new file mode 100644 index 00000000..4e673adf --- /dev/null +++ b/src/mil_common/online_bagger/src/server.cpp @@ -0,0 +1,432 @@ +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "mil_msgs/action/bag_online.hpp" +#include "mil_msgs/srv/bag_topics.hpp" +#include "online_bagger/common.hpp" + +#include +#include + +namespace online_bagger +{ +class Server : public rclcpp::Node +{ + public: + Server() + : rclcpp::Node("online_bagger_server") + , work_guard(boost::asio::make_work_guard(context)) + , work_thread([this] { context.run(); }) + { + get_params(); + + topics_service = create_service( + TOPIC_SERVICE_NAME, + std::bind(&Server::handle_topics_req, this, std::placeholders::_1, std::placeholders::_2)); + + _action_server = rclcpp_action::create_server( + this, BAG_ACTION_NAME, std::bind(&Server::handle_goal, this, std::placeholders::_1, std::placeholders::_2), + std::bind(&Server::handle_cancel, this, std::placeholders::_1), + std::bind(&Server::handle_accepted, this, std::placeholders::_1)); + + subscribe(); + if (subscriber_list.size() == successful_subscription_count && subscriber_list.size() != 0) + { + RCLCPP_INFO(get_logger(), "Subscribed to %ld of %ld topics", successful_subscription_count, + subscriber_list.size()); + } + else + { + RCLCPP_INFO(get_logger(), "Subscribed to %ld of %ld topics, will try again every %.2f seconds", + successful_subscription_count, subscriber_list.size(), resubscribe_period); + resubscriber = + create_wall_timer(std::chrono::milliseconds(static_cast(resubscribe_period * 1000)), + [this] + { + if ((successful_subscription_count == subscriber_list.size()) && resubscriber) + { + resubscriber->cancel(); + if (subscriber_list.size() == 0) + { + RCLCPP_WARN(get_logger(), "No topics selected to subscribe to. Closing."); + rclcpp::shutdown(); + } + else + { + RCLCPP_INFO(get_logger(), "All topics are subscribed! Shutting down " + "resubscriber"); + } + + return; + } + subscribe(); + }); + } + } + ~Server() + { + work_guard.reset(); + work_thread.join(); + } + + private: + using BagOnlineGoalHandle = rclcpp_action::ServerGoalHandle; + std::atomic_bool streaming = true; + rclcpp_action::Server::SharedPtr _action_server; + rclcpp::Service::SharedPtr topics_service; + size_t successful_subscription_count = 0; + size_t stream_time; + bool dated_folder; + float resubscribe_period; + std::filesystem::path dir; + std::unordered_map> subscriber_list; + std::unordered_map>>> + topic_messages; + rclcpp::TimerBase::SharedPtr resubscriber; + size_t iteration_count = 0; + + boost::asio::io_context context; + boost::asio::executor_work_guard work_guard; + std::thread work_thread; + + void handle_topics_req([[maybe_unused]] mil_msgs::srv::BagTopics::Request::ConstSharedPtr request, + mil_msgs::srv::BagTopics::Response::SharedPtr response) + { + for (auto const& [topic, info] : subscriber_list) + { + auto const& [time, subscribed] = info; + if (subscribed != nullptr) + response->topics.push_back(topic); + } + } + + rclcpp_action::GoalResponse + handle_goal([[maybe_unused]] rclcpp_action::GoalUUID const& uuid, + [[maybe_unused]] std::shared_ptr goal) + { + if (!streaming) + { + RCLCPP_WARN(get_logger(), "Rejected goal because there is an ongoing goal"); + return rclcpp_action::GoalResponse::REJECT; + } + + RCLCPP_INFO(get_logger(), "Accepted goal from online bagger client"); + return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE; + } + + rclcpp_action::CancelResponse handle_cancel([[maybe_unused]] std::shared_ptr const goal_handle) + { + return rclcpp_action::CancelResponse::REJECT; + } + + float get_topic_duration(std::string const& topic) + { + if (topic_messages[topic].size() == 0) + return 0.0f; + + return (topic_messages[topic].back().first - topic_messages[topic].front().first).nanoseconds() / 1e9f; + } + + size_t get_time_index(std::string const& topic, float requested_seconds) + { + if (requested_seconds == 0) + return 0; + + float topic_duration = get_topic_duration(topic); + + if (topic_duration == 0) + return 0; + + float ratio = requested_seconds / topic_duration; + size_t index = topic_messages[topic].size() * (1 - std::clamp(ratio, 0.0f, 1.0f)); + return index; + } + + void handle_accepted(std::shared_ptr goal_handle) + { + context.post([this, goal_handle] { start_bagging(goal_handle); }); + } + + void bagger_callback(std::shared_ptr msg, std::string const& topic) + { + if (!streaming) + return; + + iteration_count += 1; + topic_messages[topic].push_back({ now(), msg }); + + float time_diff = get_topic_duration(topic); + + if (iteration_count % 100 == 0) + { + RCLCPP_DEBUG(get_logger(), "%s has %ld messages spanning %.2f seconds.", topic.c_str(), + topic_messages[topic].size(), time_diff); + } + + while (time_diff > subscriber_list[topic].first && rclcpp::ok()) + { + topic_messages[topic].pop_front(); + time_diff = get_topic_duration(topic); + } + } + + std::filesystem::path get_bag_name(std::filesystem::path const& filename) + { + std::filesystem::path default_dir = dir; + std::string date = boost::gregorian::to_simple_string(boost::gregorian::day_clock::local_day()); + std::string time = + boost::posix_time::to_simple_string(boost::posix_time::second_clock::local_time().time_of_day()); + if (dated_folder) + { + default_dir /= date; + } + + std::filesystem::path bag_dir = default_dir / filename.parent_path(); + std::filesystem::path bag_name = filename.filename(); + if (!std::filesystem::exists(bag_dir)) + { + std::filesystem::create_directories(bag_dir); + } + + if (!bag_name.has_filename()) + { + bag_name.replace_filename(time); + } + + if (bag_name.extension() != "bag") + { + bag_name.replace_extension("bag"); + } + + return bag_dir / bag_name; + } + + void start_bagging(std::shared_ptr goal_handle) + { + auto result = std::make_shared(); + streaming = false; + + try + { + auto const goal = goal_handle->get_goal(); + + float requested_seconds = goal->bag_time; + std::vector const& selected_topics = goal->topics; + + auto feedback = std::make_shared(); + size_t total_messages = 0; + std::unordered_map bag_topics; + for (auto const& topic : selected_topics) + { + auto it = subscriber_list.find(topic); + if (it == subscriber_list.end()) + continue; + + auto const& [time, subscribed] = it->second; + if (subscribed == nullptr) + continue; + + if (topic_messages[topic].size() == 0) + continue; + + size_t index = get_time_index(topic, requested_seconds); + total_messages += topic_messages[topic].size() - index; + bag_topics[topic] = index; + } + + if (total_messages == 0) + { + result->success = false; + result->status = "No messages to bag"; + goto ret; + } + + RCLCPP_INFO(get_logger(), "Start bagging, total messages %ld", total_messages); + auto bag = std::make_shared(); + rosbag2_storage::StorageOptions storage_options; + storage_options.uri = get_bag_name(goal->bag_name).string(); + RCLCPP_INFO(get_logger(), "bag file path: %s", storage_options.uri.c_str()); + + bag->open(storage_options); + + goal_handle->publish_feedback(feedback); + + auto topic_names_and_types = get_topic_names_and_types(); + + size_t feedback_threshold = total_messages / 100; + for (auto const& [topic, index] : bag_topics) + { + for (size_t i = 0; i < topic_messages[topic].size(); i++) + { + auto const& [time, message] = topic_messages[topic][i]; + bag->write(message, topic, topic_names_and_types[topic][0], time); + + if (feedback_threshold == 0 || i % feedback_threshold == 0) // send feedback every 50 messages + { + feedback->progress = static_cast(i + 1) / total_messages; + goal_handle->publish_feedback(feedback); + } + } + // empty deque when done writing to bag + topic_messages[topic].clear(); + } + feedback->progress = 1.0; + goal_handle->publish_feedback(feedback); + bag->close(); + + result->status = std::move(storage_options.uri); + result->success = true; + } + + catch (std::exception const& e) + { + result->status = e.what(); + result->success = false; + } + ret: + if (result->success) + { + goal_handle->succeed(result); + RCLCPP_INFO(get_logger(), "Successfully bag to %s", result->status.c_str()); + } + else + { + goal_handle->abort(result); + RCLCPP_INFO(get_logger(), "Failed to bag: %s", result->status.c_str()); + } + + streaming = true; + } + + void subscribe() + { + auto topic_names_and_types = get_topic_names_and_types(); + std::vector subscribed_topics; + + for (auto& [topic, info] : subscriber_list) + { + auto& [time, subscribed] = info; + + if (!subscribed) + { + auto it = topic_names_and_types.find(topic); + if (it != topic_names_and_types.end()) + { + successful_subscription_count++; + subscribed = create_generic_subscription( + topic, it->second[0], rclcpp::QoS(10), + [this, topic](std::shared_ptr msg) { bagger_callback(msg, topic); }); + } + } + } + } + + void add_env_var(std::vector const& var) + { + for (auto& topic : var) + { + subscriber_list.emplace(std::move(topic), std::make_pair(stream_time, nullptr)); + } + } + + void get_params() + { + boost::process::environment env = boost::this_process::environment(); + auto it = env.find("BAG_DIR"); + if (it != env.end()) + { + dir = it->to_string(); + } + else + { + it = env.find("HOME"); + if (it != env.end()) + { + dir = std::filesystem::path(it->to_string()) / "bag"; + } + else + { + RCLCPP_FATAL(get_logger(), "Failed to obtain the bag package path, exiting..."); + rclcpp::shutdown(); + } + } + + dir = std::filesystem::path(declare_parameter("bag_package_path", dir.string())); + + stream_time = declare_parameter("stream_time", 30); + resubscribe_period = declare_parameter("resubscribe_period", 3.0); + dated_folder = declare_parameter("dated_folder", true); + + std::vector topics_param; + for (auto const& [topic, types] : get_topic_names_and_types()) + { + topics_param.push_back(topic); + } + topics_param = declare_parameter("topics", std::vector(topics_param)); + + for (std::string const& topic : topics_param) + { + std::istringstream iss(topic); + std::string token; + std::vector tokens; + while (std::getline(iss, token, ',')) + { + tokens.push_back(std::move(token)); + } + + size_t topic_stream_time = stream_time; + if (tokens.size() == 2) + { + try + { + topic_stream_time = std::stoul(tokens[1]); + } + catch (std::exception const& e) + { + RCLCPP_WARN(get_logger(), "Failed to set stream time %s for topic %s.", tokens[0].c_str(), + tokens[1].c_str()); + } + } + + subscriber_list.emplace(std::move(tokens[0]), std::make_pair(topic_stream_time, nullptr)); + } + + it = env.find("BAG_ALWAYS"); + if (it != env.end()) + { + add_env_var(it->to_vector()); + } + + for (auto const& entry : env) + { + std::string key = entry.get_name(); + if (key.size() >= 4 && key.substr(0, 4) == "bag_") + { + add_env_var(entry.to_vector()); + } + } + + RCLCPP_INFO(get_logger(), "Default stream_time: %ld seconds", stream_time); + RCLCPP_INFO(get_logger(), "Bag Directory: %s", dir.c_str()); + } +}; +} // namespace online_bagger + +int main(int argc, char* argv[]) +{ + rclcpp::init(argc, argv); + + auto server = std::make_shared(); + rclcpp::spin(server); + rclcpp::shutdown(); + + return 0; +} diff --git a/src/subjugator/subjugator_bringup/launch/gazebo.launch.py b/src/subjugator/subjugator_bringup/launch/gazebo.launch.py index c3d90141..a187b31c 100644 --- a/src/subjugator/subjugator_bringup/launch/gazebo.launch.py +++ b/src/subjugator/subjugator_bringup/launch/gazebo.launch.py @@ -76,12 +76,18 @@ def generate_launch_description(): output="screen", ) - return LaunchDescription( - [ - gz_sim_world, - gz_sim, - set_sim_params, - subjugator_setup, - bridge, + bagger = Node( + package="online_bagger", + executable="online_bagger_server_node", + parameters=[ + { + # add topics to bag here + "topics": ["/imu/data"], + }, ], ) + + return LaunchDescription( + [gz_sim_world, gz_sim, set_sim_params, subjugator_setup, bridge, bagger], + output="screen", + ) diff --git a/src/subjugator/subjugator_bringup/launch/subjugator_setup.launch.py b/src/subjugator/subjugator_bringup/launch/subjugator_setup.launch.py index 0c4e1179..8880187f 100644 --- a/src/subjugator/subjugator_bringup/launch/subjugator_setup.launch.py +++ b/src/subjugator/subjugator_bringup/launch/subjugator_setup.launch.py @@ -143,6 +143,29 @@ def generate_launch_description(): output="both", ) + bagger = Node( + package="online_bagger", + executable="online_bagger_server_node", + parameters=[ + { + # add topics to bag here + "topics": [ + "/imu/data", + "/dvl/odom", + "/thruster/BLH", + "/thruster/BLV", + "/thruster/BRH", + "/thruster/BRV", + "/thruster/FLH", + "/thruster/FLV", + "/thruster/FRH", + "/thruster/FRV", + ], + }, + ], + output="screen", + ) + # wrench_tuner = IncludeLaunchDescription( # PythonLaunchDescriptionSource( # os.path.join( @@ -166,5 +189,6 @@ def generate_launch_description(): controller, path_planner, trajectory_planner, + bagger, ], )