diff --git a/include/net/collectd/collectd_packet.hpp b/include/net/collectd/collectd_packet.hpp index acc5aecb7..d91173f85 100644 --- a/include/net/collectd/collectd_packet.hpp +++ b/include/net/collectd/collectd_packet.hpp @@ -22,9 +22,8 @@ #include #include -#include #include -#include +#include #include #include #include @@ -32,38 +31,33 @@ #include namespace collectd { -class data { - public: -#if defined(_MSC_VER) -#pragma warning(push) -#pragma warning(disable : 4200) // flexible-array member is intentional -#endif - struct string_part : boost::noncopyable { - int16_t type; - int16_t length; - char data[]; - - char *get_data() { return &data[0]; } - }; -#if defined(_MSC_VER) -#pragma warning(pop) -#endif - struct int64_part : boost::noncopyable { - int16_t type; - int16_t length; - int64_t data; - }; - struct value_part : boost::noncopyable { - int16_t type; - int16_t length; - int16_t count; - }; - struct derive_value_part : boost::noncopyable { - int8_t type; - int64_t value; - }; +// collectd "part" type codes (see https://collectd.org/wiki/index.php/Binary_protocol). +enum part_type : int16_t { + part_host = 0x0000, + part_time_hr = 0x0008, + part_plugin = 0x0002, + part_plugin_instance = 0x0003, + part_type = 0x0004, + part_type_instance = 0x0005, + part_values = 0x0006, + part_interval_hr = 0x0009, }; +// Value type codes inside a values part. +enum value_type : uint8_t { + value_gauge = 0x01, + value_derive = 0x02, +}; + +// The collectd network plugin reads at most this many bytes per datagram, so +// we must not emit a packet larger than this (see Network plugin +// `MaxPacketSize`, default 1452). +static const std::size_t max_packet_size = 1452; + +// Longest string a single part may carry: it must fit in one datagram alongside +// the 4-byte part header and the trailing NUL. +static const std::size_t max_string_length = max_packet_size - 5; + class collectd_exception : public std::exception { std::string msg_; @@ -92,95 +86,82 @@ class packet { return ss.str(); } - void add_host(std::string value) { append_string(0x0000, value); } - void add_plugin(std::string value) { append_string(0x0002, value); } - void add_plugin_instance(std::string value) { append_string(0x0003, value); } - void add_type(std::string value) { append_string(0x0004, value); } - void add_type_instance(std::string value) { append_string(0x0005, value); } - void add_gauge_value(std::list values) { append_values(0x00006, 0x01, values); } - void add_derive_value(std::list values) { append_values(0x00006, 0x02, values); } - void add_time_hr(unsigned long long time) { append_int(0x0008, time); } - void add_interval_hr(unsigned long long time) { append_int(0x0009, time); } - - bool is_full() const { return buffer.size() > 200; } - + void add_host(const std::string &value) { append_string(part_host, value); } + void add_plugin(const std::string &value) { append_string(part_plugin, value); } + void add_plugin_instance(const std::string &value) { append_string(part_plugin_instance, value); } + void add_type(const std::string &value) { append_string(part_type, value); } + void add_type_instance(const std::string &value) { append_string(part_type_instance, value); } + void add_gauge_value(const std::list &values) { append_values(value_gauge, values); } + void add_derive_value(const std::list &values) { append_values(value_derive, values); } + void add_time_hr(unsigned long long time) { append_int(part_time_hr, time); } + void add_interval_hr(unsigned long long time) { append_int(part_interval_hr, time); } + + // A packet is "full" once adding another value-list could overflow the + // network buffer. Leave headroom for one more value-list (a handful of + // string parts plus the values) so render() never emits an oversized packet. + bool is_full() const { return buffer.size() > max_packet_size - 128; } + + // All multi-byte integers in the collectd protocol are big-endian. Build the + // big-endian value in a properly aligned local and append its bytes, rather + // than reinterpret_cast-ing into the std::string buffer (which is unaligned + // and aliasing UB, and trips the sanitizers). template - inline void set_byte(std::string &buffer, const std::string::size_type pos, const T value) { - T *b_value = reinterpret_cast(&buffer[pos]); - *b_value = boost::endian::native_to_big(value); + static void append_be(std::string &buf, T value) { + const T be = boost::endian::native_to_big(value); + buf.append(reinterpret_cast(&be), sizeof(be)); } - void append_string(int16_t type, std::string &string_data) { - int16_t len = static_cast(string_data.length()) + 5; - std::string::size_type pos = buffer.length(); - buffer.append(sizeof(collectd::data::string_part), '\0'); - collectd::data::string_part *data = reinterpret_cast(&buffer[pos]); - data->type = boost::endian::native_to_big(type); - data->length = boost::endian::native_to_big(len); - buffer.append(string_data.c_str(), string_data.length() + 1); + // A string part: type(2) + length(2) + NUL-terminated string. The length + // field covers the 4-byte header and the trailing NUL. + // + // The length field is an (unsigned) 16-bit value and the whole part must fit + // inside one datagram, so clamp pathologically long strings — e.g. a + // misconfigured hostname/plugin/type — instead of letting the cast to int16_t + // silently wrap and emit a malformed part. + void append_string(int16_t type, const std::string &string_data) { + const std::size_t data_len = string_data.size() > max_string_length ? max_string_length : string_data.size(); + const int16_t len = static_cast(data_len + 5); + append_be(buffer, type); + append_be(buffer, len); + buffer.append(string_data, 0, data_len); + buffer.push_back('\0'); } + // A number part: type(2) + length(2, always 12) + uint64(8). void append_int(int16_t type, unsigned long long int_data) { - std::string::size_type pos = buffer.length(); - buffer.append(sizeof(int16_t) + sizeof(int16_t) + sizeof(int64_t), '\0'); - int16_t len = static_cast(buffer.length() - pos); - set_byte(buffer, pos, type); - set_byte(buffer, pos + sizeof(int16_t), len); - set_byte(buffer, pos + sizeof(int16_t) + sizeof(int16_t), int_data); + append_be(buffer, type); + append_be(buffer, static_cast(12)); + append_be(buffer, static_cast(int_data)); } - void append_values(int16_t base_type, int value_type, const std::list &value_data) { - std::string::size_type pos = buffer.length(); - buffer.append(sizeof(collectd::data::value_part), '\0'); - for (std::size_t i = 0; i < value_data.size(); i++) { - append_value_type(value_type); - } - for (const double &v : value_data) { - append_value_value(v); - } - int16_t len = static_cast(buffer.length() - pos); - collectd::data::value_part *data = reinterpret_cast(&buffer[pos]); - data->type = boost::endian::native_to_big(base_type); - data->count = boost::endian::native_to_big(static_cast(value_data.size())); - data->length = boost::endian::native_to_big(len); - } - void append_values(int16_t base_type, int value_type, const std::list &value_data) { - std::string::size_type pos = buffer.length(); - buffer.append(sizeof(collectd::data::value_part), '\0'); + // A values part: type(2) + length(2) + count(2) + count value-type bytes + + // count 8-byte values. Gauge values are little-endian IEEE-754 doubles; + // derive values are big-endian int64. Build the body first, then prefix the + // header with the now-known length. + template + void append_values(uint8_t value_type, const std::list &value_data) { + std::string body; + body.reserve(value_data.size() * 9); for (std::size_t i = 0; i < value_data.size(); i++) { - append_value_type(value_type); + body.push_back(static_cast(value_type)); } - for (const long long &v : value_data) { - append_value_value(v); + for (const T &v : value_data) { + append_value(body, v); } - int16_t len = static_cast(buffer.length() - pos); - collectd::data::value_part *data = reinterpret_cast(&buffer[pos]); - data->type = boost::endian::native_to_big(base_type); - data->count = boost::endian::native_to_big(static_cast(value_data.size())); - data->length = boost::endian::native_to_big(len); + const int16_t len = static_cast(6 + body.size()); + append_be(buffer, part_values); + append_be(buffer, len); + append_be(buffer, static_cast(value_data.size())); + buffer.append(body); } - void append_value_type(int type) { - std::string::size_type pos = buffer.length(); - int sz = sizeof(int8_t); - buffer.append(sz, '\0'); - int8_t *b_type = reinterpret_cast(&buffer[pos]); - *b_type = static_cast(type); - } - void append_value_value(const double double_data) { - std::string::size_type pos = buffer.length(); - int sz = sizeof(int64_t); - buffer.append(sz, '\0'); - // int64_t *b_value = reinterpret_cast(&buffer[pos + sizeof(int8_t)]); - // *b_value = boost::endian::native_to_big(static_cast(*int_data)); - double *b_dvalue = reinterpret_cast(&buffer[pos]); - *b_dvalue = double_data; - } - void append_value_value(const long long int_data) { - std::string::size_type pos = buffer.length(); - int sz = sizeof(int64_t); - buffer.append(sz, '\0'); - int64_t *b_value = reinterpret_cast(&buffer[pos]); - *b_value = boost::endian::native_to_big(static_cast(int_data)); + // Gauge: little-endian double (collectd stores gauges in x86 byte order). + static void append_value(std::string &buf, double value) { + uint64_t bits = 0; + std::memcpy(&bits, &value, sizeof(bits)); + const uint64_t le = boost::endian::native_to_little(bits); + buf.append(reinterpret_cast(&le), sizeof(le)); } + // Derive/counter: big-endian int64. + static void append_value(std::string &buf, long long value) { append_be(buf, static_cast(value)); } std::string get_buffer() const { return buffer; } @@ -392,7 +373,11 @@ struct collectd_builder { is_new = true; } } - packets.push_back(packet); + // Only emit the trailing packet if it actually holds data. When nothing + // rendered (e.g. an unmatched variable) or the last metric exactly filled + // and flushed a packet, `packet` is an empty default-constructed buffer and + // must not be sent as a zero-length datagram. + if (packet.get_size() > 0) packets.push_back(packet); } void set_metric(const ::std::string &key, const std::string &value); }; diff --git a/modules/CollectdClient/CMakeLists.txt b/modules/CollectdClient/CMakeLists.txt index 78fd15f2d..03e7b7d07 100644 --- a/modules/CollectdClient/CMakeLists.txt +++ b/modules/CollectdClient/CMakeLists.txt @@ -46,3 +46,19 @@ target_link_libraries( include(${BUILD_CMAKE_FOLDER}/module.cmake) source_group("Client" REGULAR_EXPRESSION .*include/collectd/.*) source_group("Socket" REGULAR_EXPRESSION .*include/socket/.*) + +NSCP_CREATE_TEST( + collectd_packet_test + SOURCES + collectd_packet_test.cpp + ${NSCP_INCLUDEDIR}/net/collectd/collectd_packet.cpp + ${NSCP_INCLUDEDIR}/str/utf8.cpp + LIBRARIES + GTest::gtest_main + expression_parser + ${Boost_REGEX_LIBRARY} + ${Boost_FILESYSTEM_LIBRARY} + INCLUDES + ${NSCP_INCLUDEDIR} + ${CMAKE_CURRENT_SOURCE_DIR} +) diff --git a/modules/CollectdClient/CollectdClient.cpp b/modules/CollectdClient/CollectdClient.cpp index 6a390e1d7..a7f25a0d0 100644 --- a/modules/CollectdClient/CollectdClient.cpp +++ b/modules/CollectdClient/CollectdClient.cpp @@ -36,7 +36,8 @@ * @return */ CollectdClient::CollectdClient() - : client_("nsca", std::make_shared(), std::make_shared()) {} + : handler_(std::make_shared()), + client_("collectd", handler_, std::make_shared()) {} /** * Default d-tor @@ -52,30 +53,48 @@ bool CollectdClient::loadModuleEx(std::string alias, NSCAPI::moduleLoadMode) { client_.set_path(target_path); + unsigned int interval = 10; + // clang-format off settings.alias().add_path_to_settings() - ("COLLECTD CLIENT SECTION", "Section for NSCA passive check module.") + ("COLLECTD CLIENT SECTION", "Section for the collectd client; forwards NSClient++ metrics to a collectd server.") ("targets", sh::fun_values_path([this] (auto key, auto value) { this->add_target(key, value); }), "REMOTE TARGET DEFINITIONS", "", "TARGET", "For more configuration options add a dedicated section") + + ("variables", sh::fun_values_path([this] (auto key, auto value) { this->handler_->add_variable(key, value); }), + "VARIABLE DEFINITIONS", + "Variables used to expand ${...} placeholders in metric keys. Each value is a regular expression matched against metric names; " + "the captured groups become the variable's values. When empty a built-in default set is used.") + + ("metrics", sh::fun_values_path([this] (auto key, auto value) { this->handler_->add_metric(key, value); }), + "METRIC MAPPINGS", + "Mapping of collectd keys (e.g. cpu-total/cpu-user) to value expressions (e.g. derive:system.cpu.total.user). " + "When empty a built-in default set is used.") ; - // clang-format on - settings.alias().add_key_to_settings().add_string( - "hostname", sh::string_key(&hostname_, "auto"), "HOSTNAME", - "The host name of the monitored computer.\nSet this to auto (default) to use the windows name of the computer.\n\n" + settings.alias().add_key_to_settings() + .add_string("hostname", sh::string_key(&hostname_, "auto"), "HOSTNAME", + "The host name reported to collectd.\nSet this to auto (default) to use the name of this computer.\n\n" "auto\tHostname\n" "${host}\tHostname\n" - "${host_lc}\nHostname in lowercase\n" + "${host_lc}\tHostname in lowercase\n" "${host_uc}\tHostname in uppercase\n" "${domain}\tDomainname\n" "${domain_lc}\tDomainname in lowercase\n" - "${domain_uc}\tDomainname in uppercase\n"); + "${domain_uc}\tDomainname in uppercase\n") + + .add_int("interval", sh::uint_key(&interval, 10), "METRICS INTERVAL", + "The interval (in seconds) reported to collectd. Should match the core 'metrics interval' so collectd computes rates correctly.") + ; + // clang-format on settings.register_all(); settings.notify(); + handler_->set_interval(interval); + client_.finalize(nscapi::settings_proxy::create(get_id(), get_core())); nscapi::core_helper core(get_core(), get_id()); diff --git a/modules/CollectdClient/CollectdClient.h b/modules/CollectdClient/CollectdClient.h index 7487e05cd..aacb08d34 100644 --- a/modules/CollectdClient/CollectdClient.h +++ b/modules/CollectdClient/CollectdClient.h @@ -25,13 +25,20 @@ #include #include +#include + namespace po = boost::program_options; namespace sh = nscapi::settings_helper; +namespace collectd_client { +struct collectd_client_handler; +} + class CollectdClient : public nscapi::impl::simple_plugin { private: std::string hostname_; + std::shared_ptr handler_; client::configuration client_; public: diff --git a/modules/CollectdClient/collectd_client.hpp b/modules/CollectdClient/collectd_client.hpp index 39eebdcbd..7f76db6e4 100644 --- a/modules/CollectdClient/collectd_client.hpp +++ b/modules/CollectdClient/collectd_client.hpp @@ -31,31 +31,29 @@ namespace collectd_client { class udp_sender { public: - udp_sender(boost::asio::io_context &io_service, boost::asio::ip::udp::endpoint source_endpoint, const boost::asio::ip::address &multicast_address, - unsigned short multicast_port) - : endpoint_(multicast_address, multicast_port), - socket_(io_service, source_endpoint) - //, timer_(io_service) - {} - - udp_sender(boost::asio::io_context &io_service, const boost::asio::ip::address &multicast_address, unsigned short multicast_port) - : endpoint_(multicast_address, multicast_port), - socket_(io_service, endpoint_.protocol()) - //, timer_(io_service) - {} - + // Multicast: bind the socket to a specific local interface (source_endpoint) + // so the datagram leaves through it. + udp_sender(boost::asio::io_context &io_service, boost::asio::ip::udp::endpoint source_endpoint, const boost::asio::ip::address &target_address, + unsigned short target_port) + : endpoint_(target_address, target_port), socket_(io_service, source_endpoint) {} + + // Unicast (or default-interface multicast): open a socket for the target's + // protocol family and let the OS pick the source. + udp_sender(boost::asio::io_context &io_service, const boost::asio::ip::address &target_address, unsigned short target_port) + : endpoint_(target_address, target_port), socket_(io_service, endpoint_.protocol()) {} + + // Queue one datagram. The payload is owned by a shared_ptr captured in the + // completion handler so it stays alive until the async send finishes — this + // lets us queue several packets before a single io_context.run() drains them + // (the previous single-member buffer was overwritten by the next send). void send_data(const std::string &data) { - payload = data; - socket_.async_send_to(boost::asio::buffer(payload), endpoint_, [this](auto ec, auto bytes) { this->handle_send_to(ec); }); + auto payload = std::make_shared(data); + socket_.async_send_to(boost::asio::buffer(*payload), endpoint_, [payload](const boost::system::error_code &, std::size_t) {}); } - void handle_send_to(const boost::system::error_code &error) {} - private: boost::asio::ip::udp::endpoint endpoint_; boost::asio::ip::udp::socket socket_; - // boost::asio::deadline_timer timer_; - std::string payload; }; struct connection_data : public socket_helpers::connection_info { @@ -100,7 +98,68 @@ struct client_handler : public socket_helpers::client::client_handler { std::string expand_path(std::string path) { return GET_CORE()->expand_path(path); } }; +// A single mapping entry: a collectd key (e.g. "cpu-total/cpu-user") and the +// expression that resolves its value(s) (e.g. "derive:system.cpu.total.user"). +typedef std::list > mapping_list; + struct collectd_client_handler : public client::handler_interface { + // Built-in mapping used when none is configured in settings. The metric + // namespace CheckSystem emits differs between platforms (Windows exposes + // committed/virtual/page memory, PDH counters and "core N"; the Unix module + // exposes physical/cached/swap memory and "core_N"), so the defaults are + // platform-specific — otherwise one platform would forward fabricated zeros + // for metrics the other never produces. + static mapping_list default_variables() { + mapping_list m; +#ifdef WIN32 + // CheckSystem emits per-core CPU as "system.cpu.core 0.user" (space). + m.push_back(std::make_pair("core", "system.cpu.core (.*)\\.user")); +#else + // CheckSystemUnix normalises per-core CPU to "system.cpu.core_0.user". + m.push_back(std::make_pair("core", "system.cpu.core_(.*)\\.user")); +#endif + return m; + } + static mapping_list default_metrics() { + mapping_list m; + // CPU total + per-core (derive) — available on both platforms. + m.push_back(std::make_pair("cpu-total/cpu-user", "derive:system.cpu.total.user")); + m.push_back(std::make_pair("cpu-total/cpu-system", "derive:system.cpu.total.kernel")); + m.push_back(std::make_pair("cpu-total/cpu-idle", "derive:system.cpu.total.idle")); + // Uptime in seconds (collectd "uptime" GAUGE type) — both platforms expose + // it as the numeric system.uptime.ticks.raw (system.uptime.uptime is a + // human-readable string and is not usable here). + m.push_back(std::make_pair("uptime/uptime", "gauge:system.uptime.ticks.raw")); +#ifdef WIN32 + m.push_back(std::make_pair("cpu-${core}/cpu-user", "derive:system.cpu.core ${core}.user")); + m.push_back(std::make_pair("cpu-${core}/cpu-system", "derive:system.cpu.core ${core}.kernel")); + m.push_back(std::make_pair("cpu-${core}/cpu-idle", "derive:system.cpu.core ${core}.idle")); + // Physical + page-file (committed) memory. + m.push_back(std::make_pair("memory-/memory-available", "gauge:system.mem.physical.avail")); + m.push_back(std::make_pair("memory-pagefile/memory-used", "gauge:system.mem.commited.used")); + m.push_back(std::make_pair("memory-pagefile/memory-free", "gauge:system.mem.commited.avail")); + // Process / thread counts (Windows-only metric family). + m.push_back(std::make_pair("processes-/ps_count", "gauge:system.metrics.procs.procs,system.metrics.procs.threads")); +#else + m.push_back(std::make_pair("cpu-${core}/cpu-user", "derive:system.cpu.core_${core}.user")); + m.push_back(std::make_pair("cpu-${core}/cpu-system", "derive:system.cpu.core_${core}.kernel")); + m.push_back(std::make_pair("cpu-${core}/cpu-idle", "derive:system.cpu.core_${core}.idle")); + // Physical memory (collectd "memory" type) + swap (collectd "swap" type). + m.push_back(std::make_pair("memory-/memory-used", "gauge:system.mem.physical.used")); + m.push_back(std::make_pair("memory-/memory-free", "gauge:system.mem.physical.avail")); + m.push_back(std::make_pair("swap-/swap-used", "gauge:system.mem.swap.used")); + m.push_back(std::make_pair("swap-/swap-free", "gauge:system.mem.swap.avail")); +#endif + return m; + } + + collectd_client_handler() : interval_seconds_(10) {} + + // Configuration (populated from settings; empty => defaults are used). + void add_variable(const std::string &key, const std::string &value) { variables_.push_back(std::make_pair(key, value)); } + void add_metric(const std::string &key, const std::string &value) { metrics_.push_back(std::make_pair(key, value)); } + void set_interval(unsigned long long seconds) { interval_seconds_ = seconds; } + bool query(client::destination_container sender, client::destination_container target, const PB::Commands::QueryRequestMessage &request_message, PB::Commands::QueryResponseMessage &response_message) { return false; @@ -108,18 +167,11 @@ struct collectd_client_handler : public client::handler_interface { bool submit(client::destination_container sender, client::destination_container target, const PB::Commands::SubmitRequestMessage &request_message, PB::Commands::SubmitResponseMessage &response_message) { - const PB::Common::Header &request_header = request_message.header(); - nscapi::protobuf::functions::make_return_header(response_message.mutable_header(), request_header); - connection_data con(target, sender); - - std::list list; - for (int i = 0; i < request_message.payload_size(); ++i) { - collectd::packet packet; - // packet.add_string(0, "Hello WOrld"); - list.push_back(packet); - } - - send(con, list); + // collectd is a metrics protocol; it has no concept of a passive check + // result. Report that clearly instead of emitting empty datagrams. + nscapi::protobuf::functions::make_return_header(response_message.mutable_header(), request_message.header()); + nscapi::protobuf::functions::set_response_bad(*response_message.add_payload(), + "The collectd client only forwards metrics; submitting passive check results is not supported."); return true; } @@ -138,10 +190,14 @@ struct collectd_client_handler : public client::handler_interface { for (const PB::Metrics::Metric &v : b.value()) { if (v.has_gauge_value()) { builder.set_metric(mypath + "." + v.key(), str::xtos(v.gauge_value().value())); + } else if (v.has_counter_value()) { + builder.set_metric(mypath + "." + v.key(), str::xtos(v.counter_value().value())); + } else if (v.has_untyped_value()) { + builder.set_metric(mypath + "." + v.key(), str::xtos(v.untyped_value().value())); } else if (v.has_string_value()) { builder.set_metric(mypath + "." + v.key(), v.string_value().value()); } else { - NSC_LOG_ERROR_EX("Unknown metrics type"); + NSC_LOG_ERROR_EX("Unsupported metrics type for: " + mypath + "." + v.key()); } } } @@ -159,101 +215,76 @@ struct collectd_client_handler : public client::handler_interface { set_metrics(builder, request_message); boost::posix_time::ptime const time_epoch(boost::gregorian::date(1970, 1, 1)); + const unsigned long long now_seconds = (boost::posix_time::microsec_clock::universal_time() - time_epoch).total_seconds(); - unsigned long long ms = (boost::posix_time::microsec_clock::universal_time() - time_epoch).total_seconds(); - unsigned long long int_ms = 5; - builder.set_time(ms << 30, int_ms << 30); - builder.set_host(sender.get_host()); - - builder.add_variable("diskid", "system.metrics.pdh.disk_queue_length.disk_queue_length_(.*)$"); - builder.add_variable("core", "system.cpu.core (.*).user"); - - builder.add_metric("memory-/memory-available", "gauge:system.mem.physical.avail"); - // builder.add_variable("memory-/memory-pool_nonpaged", "gauge:0"); - // builder.add_variable("memory-/memory-pool_paged", "gauge:0"); - // builder.add_variable("memory-/memory-system_cache", "gauge:0"); - // builder.add_variable("memory-/memory-system_code", "gauge:0"); // 0? - // builder.add_variable("memory-/memory-system_driver", "gauge:0"); - - // builder.add_variable("interface-${nic}/if_octets-", "derive:0,0"); - // builder.add_variable("interface-${nic}/if_packets-", "derive:0,0"); - - // builder.add_variable("disk-${diskid}/disk_octets", "derive:0,0"); - // builder.add_variable("disk-${diskid}/disk_ops", "derive:0,0"); - builder.add_metric("disk-${diskid}/queue_length", "gauge:system.metrics.pdh.disk_queue_length.disk_queue_length_${diskid}"); + // Interval reported to collectd: a per-target "interval" overrides the + // module-level default (interval_seconds_) when set. + const unsigned long long interval = static_cast(target.get_int_data("interval", static_cast(interval_seconds_))); - builder.add_metric("processes-/ps_count", "gauge:system.metrics.procs.procs,system.metrics.procs.threads"); - - builder.add_metric("memory-pagefile/memory-used", "gauge:system.mem.commited.used"); - builder.add_metric("memory-pagefile/memory-free", "gauge:system.mem.commited.free"); - - // builder.add_variable("df-${drive}/df_complex-free", "gauge:0"); - // builder.add_variable("df-${drive}/df_complex-reserved", "gauge:0"); - // builder.add_variable("df-${drive}/df_complex-used", "gauge:0"); - - // builder.add_variable("memory-/working_set-available", "gauge:0"); - // builder.add_variable("memory-/working_set-pool_nonpaged", "gauge:0"); - // builder.add_variable("memory-/working_set-pool_paged", "gauge:0"); - - builder.add_metric("cpu-${core}/cpu-user", "derive:system.cpu.core ${core}.user"); - builder.add_metric("cpu-${core}/cpu-system", "derive:system.cpu.core ${core}.kernel"); - // builder.add_metric("cpu-${core}/cpu-interrupt", "derive:0"); - builder.add_metric("cpu-${core}/cpu-idle", "derive:system.cpu.core ${core}.idle"); + // collectd "high-resolution" time/interval are in units of 2^-30 seconds. + builder.set_time(now_seconds << 30, interval << 30); + builder.set_host(sender.get_host()); - builder.add_metric("cpu-total/cpu-user", "derive:system.cpu.total.user"); - builder.add_metric("cpu-total/cpu-system", "derive:system.cpu.total.kernel"); - // builder.add_metric("cpu-total/cpu-interrupt", "derive:0"); - builder.add_metric("cpu-total/cpu-idle", "derive:system.cpu.total.idle"); + // Variables must be expanded (against the flattened metric names) before + // the metric templates that reference them are added. + const mapping_list &variables = variables_.empty() ? default_variables_ : variables_; + const mapping_list &metrics = metrics_.empty() ? default_metrics_ : metrics_; + for (const auto &v : variables) builder.add_variable(v.first, v.second); + for (const auto &m : metrics) builder.add_metric(m.first, m.second); - // NSC_DEBUG_MSG("--->" + builder.to_string()); collectd::collectd_builder::packet_list packets; - builder.render(packets); connection_data con(target, sender); send(con, packets); return true; } - void send(const connection_data target, const collectd::collectd_builder::packet_list &packets) { + void send(const connection_data &target, const collectd::collectd_builder::packet_list &packets) { NSC_TRACE_ENABLED() { NSC_TRACE_MSG("Sending " + str::xtos(packets.size()) + " packets to: " + target.to_string()); } - for (const collectd::packet &p : packets) { - try { - boost::asio::io_context io_service; - std::list> senders; - - boost::asio::ip::address target_address = boost::asio::ip::make_address(target.get_address()); - + try { + boost::asio::io_context io_service; + const boost::asio::ip::address target_address = boost::asio::ip::make_address(target.get_address()); + const unsigned short target_port = target.get_int_port(); + + const bool is_multicast = (target_address.is_v4() && target_address.to_v4().is_multicast()) || + (target_address.is_v6() && target_address.to_v6().is_multicast()); + + // Build the set of sockets to send through. For unicast that is a single + // OS-routed socket; for multicast we send out every local interface of + // the matching address family. + std::list > senders; + if (is_multicast) { boost::asio::ip::udp::resolver resolver(io_service); - auto endpoints = resolver.resolve(boost::asio::ip::host_name(), ""); - bool is_multicast = false; - if (target_address.is_v4()) { - is_multicast = target_address.to_v4().is_multicast(); - } else if (target_address.is_v6()) { - is_multicast = target_address.to_v6().is_multicast(); + for (const auto &entry : resolver.resolve(boost::asio::ip::host_name(), "")) { + const boost::asio::ip::address &local = entry.endpoint().address(); + if (local.is_v4() == target_address.is_v4()) { + senders.push_back(std::make_shared(io_service, entry.endpoint(), target_address, target_port)); + } } + } + if (senders.empty()) { + // Unicast, or multicast with no enumerable matching interface: fall + // back to a default-bound socket for the target's address family. + senders.push_back(std::make_shared(io_service, target_address, target_port)); + } - if (is_multicast) { - for (const auto &entry : endpoints) { - if (target_address.is_v4() && entry.endpoint().address().is_v4()) { - std::shared_ptr s = std::make_shared(io_service, entry.endpoint(), target_address, target.get_int_port()); - senders.push_back(s); - s->send_data(p.get_buffer()); - io_service.run(); - } - } - } else { - std::shared_ptr s = std::make_shared(io_service, target_address, target.get_int_port()); - senders.push_back(s); + for (const collectd::packet &p : packets) { + if (p.get_size() == 0) continue; // never put an empty datagram on the wire + for (const std::shared_ptr &s : senders) { s->send_data(p.get_buffer()); - io_service.run(); } - - senders.clear(); - - } catch (std::exception &e) { - NSC_LOG_ERROR_STD(utf8::utf8_from_native(e.what())); } + io_service.run(); + } catch (std::exception &e) { + NSC_LOG_ERROR_STD(utf8::utf8_from_native(e.what())); } } + + private: + mapping_list variables_; + mapping_list metrics_; + unsigned long long interval_seconds_; + const mapping_list default_variables_ = default_variables(); + const mapping_list default_metrics_ = default_metrics(); }; } // namespace collectd_client \ No newline at end of file diff --git a/modules/CollectdClient/collectd_handler.hpp b/modules/CollectdClient/collectd_handler.hpp index a35bfcbff..a7e21df3e 100644 --- a/modules/CollectdClient/collectd_handler.hpp +++ b/modules/CollectdClient/collectd_handler.hpp @@ -42,7 +42,10 @@ struct collectd_target_object : public nscapi::targets::target_object { nscapi::settings_helper::path_extension root_path = settings.path(get_path()); if (is_sample) root_path.set_sample(); - // add_ssl_keys(root_path); + root_path.add_key().add_int( + "interval", sh::int_fun_key([this](auto value) { this->set_property_int("interval", value); }), "METRICS INTERVAL", + "The interval (in seconds) reported to collectd for metrics sent to this target. Overrides the client-level interval; should match the core 'metrics " + "interval'."); settings.register_all(); settings.notify(); @@ -58,18 +61,10 @@ struct options_reader_impl : public client::options_reader_interface { } void process(boost::program_options::options_description &desc, client::destination_container &source, client::destination_container &data) { - // add_ssl_options(desc, data); - // clang-format off desc.add_options() - ("payload-length,l", po::value()->notifier([&data] (auto value) { data.set_int_data("payload length", value); }), - "Length of payload (has to be same as on the server)") - ("buffer-length", po::value()->notifier([&data] (auto value) { data.set_int_data("payload length", value); }), - "Length of payload to/from the NRPE agent. This is a hard specific value so you have to \"configure\" (read recompile) your NRPE agent to use the same value for it to work.") - ("password", po::value()->notifier([&data] (auto value) { data.set_string_data("password", value); }), - "Password") - ("time-offset", po::value()->notifier([&data] (auto value) { data.set_string_data("time offset", value); }), - "") + ("interval", po::value()->notifier([&data] (auto value) { data.set_int_data("interval", value); }), + "The interval (in seconds) reported to collectd for these metrics.") ; // clang-format on } diff --git a/modules/CollectdClient/collectd_packet_test.cpp b/modules/CollectdClient/collectd_packet_test.cpp new file mode 100644 index 000000000..f62c121a6 --- /dev/null +++ b/modules/CollectdClient/collectd_packet_test.cpp @@ -0,0 +1,356 @@ +/* + * Copyright (C) 2004-2026 Michael Medin + * + * This file is part of NSClient++ - https://nsclient.org + * + * NSClient++ is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * NSClient++ is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with NSClient++. If not, see . + */ + +#include + +#include + +#include + +#include +#include +#include +#include + +// ============================================================================ +// A standalone decoder for the collectd binary network protocol, used to +// verify the bytes produced by collectd::packet / collectd_builder. It mirrors +// the receiver in tests/collectd-submit.test.ts but in C++. +// ============================================================================ +namespace { + +struct decoded_value_list { + std::string host; + std::string plugin; + std::string plugin_instance; + std::string type; + std::string type_instance; + unsigned long long time_hr = 0; + unsigned long long interval_hr = 0; + std::vector value_types; // 1=gauge, 2=derive + std::vector gauges; + std::vector derives; +}; + +// Read a big-endian integer of width N from `p`. +template +T read_be(const unsigned char *p) { + T v = 0; + std::memcpy(&v, p, sizeof(T)); + return boost::endian::big_to_native(v); +} + +// Walk the TLV part stream of one packet, emitting a decoded_value_list for +// every values part using the host/plugin/type context established so far. +std::vector decode_packet(const std::string &buf) { + std::vector out; + const unsigned char *p = reinterpret_cast(buf.data()); + std::size_t off = 0; + std::string host, plugin, plugin_instance, type, type_instance; + unsigned long long time_hr = 0, interval_hr = 0; + + while (off + 4 <= buf.size()) { + const uint16_t part_type = read_be(p + off); + const uint16_t part_len = read_be(p + off + 2); + if (part_len < 4 || off + part_len > buf.size()) break; + const unsigned char *body = p + off + 4; + const std::size_t body_len = part_len - 4; + + switch (part_type) { + case collectd::part_host: + host.assign(reinterpret_cast(body), body_len ? body_len - 1 : 0); + break; + case collectd::part_plugin: + plugin.assign(reinterpret_cast(body), body_len ? body_len - 1 : 0); + plugin_instance.clear(); + break; + case collectd::part_plugin_instance: + plugin_instance.assign(reinterpret_cast(body), body_len ? body_len - 1 : 0); + break; + case collectd::part_type: + type.assign(reinterpret_cast(body), body_len ? body_len - 1 : 0); + type_instance.clear(); + break; + case collectd::part_type_instance: + type_instance.assign(reinterpret_cast(body), body_len ? body_len - 1 : 0); + break; + case collectd::part_time_hr: + time_hr = read_be(body); + break; + case collectd::part_interval_hr: + interval_hr = read_be(body); + break; + case collectd::part_values: { + decoded_value_list vl; + vl.host = host; + vl.plugin = plugin; + vl.plugin_instance = plugin_instance; + vl.type = type; + vl.type_instance = type_instance; + vl.time_hr = time_hr; + vl.interval_hr = interval_hr; + const uint16_t count = read_be(body); + const unsigned char *types = body + 2; + const unsigned char *values = types + count; + for (uint16_t i = 0; i < count; ++i) { + const uint8_t vt = types[i]; + vl.value_types.push_back(vt); + const unsigned char *vp = values + i * 8; + if (vt == collectd::value_gauge) { + uint64_t le = 0; + std::memcpy(&le, vp, sizeof(le)); + const uint64_t bits = boost::endian::little_to_native(le); + double d = 0; + std::memcpy(&d, &bits, sizeof(d)); + vl.gauges.push_back(d); + } else { + vl.derives.push_back(read_be(vp)); + } + } + out.push_back(vl); + break; + } + default: + break; + } + off += part_len; + } + return out; +} + +} // namespace + +// ============================================================================ +// packet: low-level part encoding +// ============================================================================ + +TEST(CollectdPacket, EmptyPacketHasEmptyBuffer) { + collectd::packet p; + EXPECT_TRUE(p.get_buffer().empty()); + EXPECT_EQ(p.get_size(), 0u); +} + +TEST(CollectdPacket, HostStringPartLayout) { + collectd::packet p; + p.add_host("myhost"); + const std::string &buf = p.get_buffer(); + // type(2) + length(2) + "myhost" + NUL = 4 + 6 + 1 = 11 bytes. + ASSERT_EQ(buf.size(), 11u); + EXPECT_EQ(read_be(reinterpret_cast(buf.data())), collectd::part_host); + EXPECT_EQ(read_be(reinterpret_cast(buf.data()) + 2), 11u); + EXPECT_EQ(buf[buf.size() - 1], '\0'); + EXPECT_EQ(buf.compare(4, 6, "myhost"), 0); +} + +TEST(CollectdPacket, ClampsOverlongStringPart) { + collectd::packet p; + // A hostname far larger than a datagram: the length field must not wrap. + p.add_host(std::string(40000, 'x')); + const std::string buf = p.get_buffer(); + const uint16_t len = read_be(reinterpret_cast(buf.data()) + 2); + // Length stays positive, within the datagram, and matches the actual part. + EXPECT_GT(len, 0u); + EXPECT_LE(len, collectd::max_packet_size); + EXPECT_EQ(buf.size(), len); + + // Still decodes cleanly to a (clamped, non-empty) host without overrunning. + p.add_gauge_value({1.0}); + const auto lists = decode_packet(p.get_buffer()); + ASSERT_EQ(lists.size(), 1u); + EXPECT_FALSE(lists[0].host.empty()); + EXPECT_LE(lists[0].host.size(), collectd::max_string_length); +} + +TEST(CollectdPacket, TimeAndIntervalPartsAreBigEndian) { + collectd::packet p; + p.add_time_hr(0x1122334455667788ULL); + p.add_interval_hr(10ULL << 30); + const auto lists = [&] { + // Append a values part so decode_packet emits a record carrying the context. + p.add_gauge_value({1.0}); + return decode_packet(p.get_buffer()); + }(); + ASSERT_EQ(lists.size(), 1u); + EXPECT_EQ(lists[0].time_hr, 0x1122334455667788ULL); + EXPECT_EQ(lists[0].interval_hr, 10ULL << 30); +} + +TEST(CollectdPacket, GaugeValueRoundTripsAsLittleEndianDouble) { + collectd::packet p; + p.add_host("h"); + p.add_gauge_value({1024.5, -0.25}); + const auto lists = decode_packet(p.get_buffer()); + ASSERT_EQ(lists.size(), 1u); + ASSERT_EQ(lists[0].gauges.size(), 2u); + EXPECT_DOUBLE_EQ(lists[0].gauges[0], 1024.5); + EXPECT_DOUBLE_EQ(lists[0].gauges[1], -0.25); + EXPECT_EQ(lists[0].value_types[0], collectd::value_gauge); +} + +TEST(CollectdPacket, DeriveValueRoundTripsAsBigEndianInt64) { + collectd::packet p; + p.add_host("h"); + p.add_derive_value({42, 9000000000LL}); + const auto lists = decode_packet(p.get_buffer()); + ASSERT_EQ(lists.size(), 1u); + ASSERT_EQ(lists[0].derives.size(), 2u); + EXPECT_EQ(lists[0].derives[0], 42); + EXPECT_EQ(lists[0].derives[1], 9000000000LL); + EXPECT_EQ(lists[0].value_types[0], collectd::value_derive); +} + +// ============================================================================ +// collectd_builder: metric mapping + rendering +// ============================================================================ + +TEST(CollectdBuilder, RendersDeriveMetricWithPluginAndTypeInstances) { + collectd::collectd_builder b; + b.set_time(123ULL << 30, 10ULL << 30); + b.set_host("myhost"); + b.set_metric("system.cpu.total.user", "42"); + b.add_metric("cpu-total/cpu-user", "derive:system.cpu.total.user"); + + collectd::collectd_builder::packet_list packets; + b.render(packets); + ASSERT_EQ(packets.size(), 1u); + const auto lists = decode_packet(packets.front().get_buffer()); + ASSERT_EQ(lists.size(), 1u); + EXPECT_EQ(lists[0].host, "myhost"); + EXPECT_EQ(lists[0].plugin, "cpu"); + EXPECT_EQ(lists[0].plugin_instance, "total"); + EXPECT_EQ(lists[0].type, "cpu"); + EXPECT_EQ(lists[0].type_instance, "user"); + ASSERT_EQ(lists[0].derives.size(), 1u); + EXPECT_EQ(lists[0].derives[0], 42); +} + +TEST(CollectdBuilder, RendersGaugeMetricFromMetricReference) { + collectd::collectd_builder b; + b.set_time(1ULL << 30, 1ULL << 30); + b.set_host("h"); + b.set_metric("system.mem.physical.avail", "1024.5"); + b.add_metric("memory-/memory-available", "gauge:system.mem.physical.avail"); + + collectd::collectd_builder::packet_list packets; + b.render(packets); + ASSERT_EQ(packets.size(), 1u); + const auto lists = decode_packet(packets.front().get_buffer()); + ASSERT_EQ(lists.size(), 1u); + EXPECT_EQ(lists[0].plugin, "memory"); + EXPECT_EQ(lists[0].type, "memory"); + EXPECT_EQ(lists[0].type_instance, "available"); + ASSERT_EQ(lists[0].gauges.size(), 1u); + EXPECT_DOUBLE_EQ(lists[0].gauges[0], 1024.5); +} + +TEST(CollectdBuilder, GaugeAcceptsLiteralValue) { + collectd::collectd_builder b; + b.set_time(1ULL << 30, 1ULL << 30); + b.set_host("h"); + b.add_metric("load-/gauge-value", "gauge:7"); + + collectd::collectd_builder::packet_list packets; + b.render(packets); + const auto lists = decode_packet(packets.front().get_buffer()); + ASSERT_EQ(lists.size(), 1u); + ASSERT_EQ(lists[0].gauges.size(), 1u); + EXPECT_DOUBLE_EQ(lists[0].gauges[0], 7.0); +} + +TEST(CollectdBuilder, ExpandsVariablesFromMatchingMetricNames) { + collectd::collectd_builder b; + b.set_time(1ULL << 30, 1ULL << 30); + b.set_host("h"); + // Two cores, each with a .user metric. + b.set_metric("system.cpu.core 0.user", "100"); + b.set_metric("system.cpu.core 1.user", "200"); + b.add_variable("core", "system.cpu.core (.*).user"); + b.add_metric("cpu-${core}/cpu-user", "derive:system.cpu.core ${core}.user"); + + collectd::collectd_builder::packet_list packets; + b.render(packets); + std::vector all; + for (const auto &pk : packets) { + const auto lists = decode_packet(pk.get_buffer()); + all.insert(all.end(), lists.begin(), lists.end()); + } + ASSERT_EQ(all.size(), 2u); + // The variable should have expanded into per-core plugin instances 0 and 1. + std::vector instances; + std::vector values; + for (const auto &vl : all) { + EXPECT_EQ(vl.plugin, "cpu"); + EXPECT_EQ(vl.type, "cpu"); + EXPECT_EQ(vl.type_instance, "user"); + instances.push_back(vl.plugin_instance); + ASSERT_EQ(vl.derives.size(), 1u); + values.push_back(vl.derives[0]); + } + EXPECT_NE(std::find(instances.begin(), instances.end(), "0"), instances.end()); + EXPECT_NE(std::find(instances.begin(), instances.end(), "1"), instances.end()); + EXPECT_NE(std::find(values.begin(), values.end(), 100), values.end()); + EXPECT_NE(std::find(values.begin(), values.end(), 200), values.end()); +} + +TEST(CollectdBuilder, UnmatchedVariableProducesNoMetrics) { + collectd::collectd_builder b; + b.set_time(1ULL << 30, 1ULL << 30); + b.set_host("h"); + // No metric matches the variable's regex, so the templated metric expands + // to nothing. + b.add_variable("diskid", "system.metrics.pdh.disk_queue_length.disk_queue_length_(.*)$"); + b.add_metric("disk-${diskid}/queue_length", "gauge:system.metrics.pdh.disk_queue_length.disk_queue_length_${diskid}"); + + collectd::collectd_builder::packet_list packets; + b.render(packets); + // Nothing rendered, so there must be no value-lists — and crucially no empty + // trailing packet that would go out as a zero-length datagram. + std::size_t value_lists = 0; + for (const auto &pk : packets) value_lists += decode_packet(pk.get_buffer()).size(); + EXPECT_EQ(value_lists, 0u); + for (const auto &pk : packets) EXPECT_GT(pk.get_size(), 0u); +} + +// ============================================================================ +// Fragmentation: a large metric set must split into multiple packets, each of +// which stays within the collectd network buffer size. +// ============================================================================ + +TEST(CollectdBuilder, FragmentsLargeMetricSetWithinMtu) { + collectd::collectd_builder b; + b.set_time(1ULL << 30, 1ULL << 30); + b.set_host("a-reasonably-long-hostname-for-padding"); + for (int i = 0; i < 500; ++i) { + const std::string key = "metric_" + std::to_string(i); + b.set_metric(key, std::to_string(i)); + b.add_metric("plugin" + std::to_string(i) + "-/gauge-value", "gauge:" + key); + } + + collectd::collectd_builder::packet_list packets; + b.render(packets); + EXPECT_GT(packets.size(), 1u); + + std::size_t total_value_lists = 0; + for (const auto &pk : packets) { + EXPECT_LE(pk.get_size(), collectd::max_packet_size); + EXPECT_GT(pk.get_size(), 0u); // no empty trailing packet even when fragmenting + total_value_lists += decode_packet(pk.get_buffer()).size(); + } + EXPECT_EQ(total_value_lists, 500u); +} diff --git a/tests/Dockerfiles/collectd.Dockerfile b/tests/Dockerfiles/collectd.Dockerfile new file mode 100644 index 000000000..d20c1693b --- /dev/null +++ b/tests/Dockerfiles/collectd.Dockerfile @@ -0,0 +1,24 @@ +# A real collectd daemon used by collectd-daemon.test.ts to prove NSClient++'s +# CollectdClient speaks the binary network protocol well enough for an actual +# collectd to parse it. The `network` plugin listens on UDP 25826 and the `csv` +# plugin writes every received value-list to /data///... — +# the test reads those files back via `docker exec` to confirm collectd +# accepted (parsed + type-checked) our packets. +# +# Unlike the other receivers this one is published with `-p ...:25826/udp` via +# raw `docker run` (testcontainers only maps TCP), so there is no wait strategy +# baked in here; the test waits for the "entering read-loop" log line. +FROM debian:bookworm-slim + +RUN apt-get update \ + && apt-get install -y --no-install-recommends collectd-core \ + && rm -rf /var/lib/apt/lists/* \ + && mkdir -p /data + +COPY Dockerfiles/entrypoints/collectd.conf /etc/collectd/collectd.conf + +EXPOSE 25826/udp + +# -f: run in the foreground (so logs stream and the container stays attached). +# -C: use only our config, not the distro's default plugin soup. +ENTRYPOINT ["collectd", "-f", "-C", "/etc/collectd/collectd.conf"] diff --git a/tests/Dockerfiles/entrypoints/collectd.conf b/tests/Dockerfiles/entrypoints/collectd.conf new file mode 100644 index 000000000..69b06faed --- /dev/null +++ b/tests/Dockerfiles/entrypoints/collectd.conf @@ -0,0 +1,24 @@ +# Minimal collectd config for the integration test: receive value-lists over +# the binary network protocol and write each one to a CSV file the test can +# read back. See collectd.Dockerfile for the surrounding context. + +Hostname "receiver" +FQDNLookup false +Interval 1 + +LoadPlugin network +LoadPlugin csv + +# Receive unsigned/plaintext value-lists on the default collectd UDP port. +# SecurityLevel defaults to None, which matches NSClient++'s (unsigned) sender. + + Listen "0.0.0.0" "25826" + + +# One file per value-list under /data///. +# StoreRates false keeps raw DERIVE/GAUGE values so a single sample produces a +# data line (no rate computation that would need two samples). + + DataDir "/data" + StoreRates false + diff --git a/tests/collectd-daemon.test.ts b/tests/collectd-daemon.test.ts new file mode 100644 index 000000000..9fba49984 --- /dev/null +++ b/tests/collectd-daemon.test.ts @@ -0,0 +1,206 @@ +/** + * End-to-end check against a REAL collectd daemon (collectd-daemon, not our own + * decoder): proves NSClient++'s CollectdClient speaks the binary network + * protocol well enough for an actual collectd to parse, type-check, and store + * the value-lists we send. + * + * Topology: + * nscp (host process) --UDP collectd binary--> collectd `network` plugin + * --> collectd `csv` plugin + * --> /data/// + * + * collectd's csv plugin only writes a file once it has accepted a value-list — + * i.e. parsed the packet AND matched the value count/types against types.db — + * so the presence of /data//cpu-total/cpu-user-* is positive proof the + * wire format is correct, not just self-consistent with our own decoder (that + * is what collectd-submit.test.ts covers). + * + * One collectd container is shared; each sub-scenario runs its own nscp with a + * distinct collectd `host` so their output lands in separate /data/ + * trees. collectd's UDP port is published with a raw `docker run -p + * ...:25826/udp` because testcontainers only maps TCP; dockerd's proxy + * forwards the datagrams from loopback to the container, and everything is read + * back through `docker exec`. + */ +import * as path from "path"; +import execa from "execa"; +import { NscpInstance, dockerOrSkip } from "@fixtures/index"; + +jest.setTimeout(600_000); + +const IMAGE_TAG = "nscp_collectd_it"; + +/** Thin `docker ...` wrapper. */ +async function docker(args: string[], opts: { reject?: boolean } = {}) { + return execa("docker", args, { reject: opts.reject ?? true, all: true }); +} + +dockerOrSkip()("CollectD real-daemon integration", () => { + let containerId = ""; + let port = 0; + + /** Files collectd's csv plugin has written for `host` so far. */ + async function dataFiles(host: string): Promise { + const r = await docker( + ["exec", containerId, "sh", "-c", `find /data/${host} -type f 2>/dev/null || true`], + { reject: false }, + ); + return r.stdout + .split("\n") + .map((s) => s.trim()) + .filter(Boolean); + } + + /** Poll collectd's output dir for `host` until `predicate` holds. */ + async function waitForFiles( + host: string, + predicate: (files: string[]) => boolean, + timeoutMs = 90_000, + ): Promise { + const deadline = Date.now() + timeoutMs; + let last: string[] = []; + while (Date.now() < deadline) { + last = await dataFiles(host); + if (predicate(last)) return last; + await new Promise((r) => setTimeout(r, 1000)); + } + return last; + } + + beforeAll(async () => { + // Build with `docker build` (cwd = tests/, so tests/.dockerignore keeps + // node_modules/ out of the context) and a fixed tag we can run ourselves + // with a published UDP port. + await execa("docker", ["build", "-t", IMAGE_TAG, "-f", "Dockerfiles/collectd.Dockerfile", "."], { + cwd: path.resolve(__dirname), + all: true, + }); + + const run = await execa("docker", ["run", "-d", "-p", "127.0.0.1::25826/udp", IMAGE_TAG], { + all: true, + }); + containerId = run.stdout.trim(); + + // Wait for "Initialization complete, entering read-loop." in the logs. + const deadline = Date.now() + 30_000; + let ready = false; + while (Date.now() < deadline) { + const logs = await docker(["logs", containerId], { reject: false }); + if (/entering read-loop/.test(`${logs.stdout}\n${logs.stderr}`)) { + ready = true; + break; + } + await new Promise((r) => setTimeout(r, 500)); + } + if (!ready) { + const logs = await docker(["logs", containerId], { reject: false }); + throw new Error(`collectd did not become ready. Logs:\n${logs.stdout}\n${logs.stderr}`); + } + + // The published loopback UDP port (docker assigns it; `docker port` reports it). + const portOut = (await docker(["port", containerId, "25826/udp"])).stdout.trim(); + port = Number(portOut.split("\n")[0].split(":").pop()); + if (!Number.isInteger(port) || port <= 0) { + throw new Error(`could not parse mapped UDP port from: ${portOut}`); + } + }); + + afterAll(async () => { + if (containerId) await docker(["rm", "-f", containerId], { reject: false }); + }); + + // The built-in default mapping is platform-specific (CheckSystem's metric + // namespace differs by OS), so the expected collectd output differs too. nscp + // runs as a host process, so process.platform decides which default branch is + // exercised; the collectd receiver is the same Linux container either way. + const isWindows = process.platform === "win32"; + const label = isWindows ? "default Windows mapping" : "default Linux mapping"; + // [regex, human description] for files that MUST be present for this platform. + const required: Array<[RegExp, string]> = isWindows + ? [ + [/\/cpu-total\/cpu-user-/, "cpu-total/cpu-user"], + [/\/cpu-\d+\/cpu-user-/, "per-core cpu-/cpu-user"], + [/\/memory\/memory-available-/, "memory/memory-available"], + [/\/memory-pagefile\/memory-used-/, "memory-pagefile/memory-used"], + [/\/processes\/ps_count-/, "processes/ps_count"], + [/\/uptime\/uptime-/, "uptime/uptime"], + ] + : [ + [/\/cpu-total\/cpu-user-/, "cpu-total/cpu-user"], + [/\/cpu-\d+\/cpu-user-/, "per-core cpu-/cpu-user"], + [/\/memory\/memory-free-/, "memory/memory-free"], + [/\/swap\/swap-used-/, "swap/swap-used"], + [/\/uptime\/uptime-/, "uptime/uptime"], + ]; + + describe(label, () => { + const HOST = "it-collectd-default"; + let nscp: NscpInstance; + + beforeAll(async () => { + nscp = new NscpInstance(); + await nscp.configure({ + "/modules": { CheckSystem: "enabled", CollectdClient: "enabled" }, + "/settings/core": { "metrics interval": "1s" }, + "/settings/collectd/client": { hostname: HOST }, + "/settings/collectd/client/targets/default": { address: `127.0.0.1:${port}` }, + }); + nscp.start(); + }); + + afterAll(async () => { + await nscp?.stop(); + }); + + it("stores the platform's default value-lists via a real collectd", async () => { + // Every default mapping for this platform produces a standard collectd + // type, so a real collectd accepts and stores them only if our packets + // are valid collectd binary protocol. The per-core file (cpu-/...) + // specifically exercises the `core`/`core_` variable expansion. + const files = await waitForFiles(HOST, (f) => required.every(([re]) => f.some((p) => re.test(p)))); + for (const [re, desc] of required) { + expect({ desc, present: files.some((p) => re.test(p)) }).toEqual({ desc, present: true }); + } + + // The stored CSV must contain collectd's "epoch,value" header plus at + // least one data line — confirming collectd decoded a value, not just a + // header. + const cpuFile = files.find((p) => /\/cpu-total\/cpu-user-/.test(p))!; + const content = (await docker(["exec", containerId, "cat", cpuFile])).stdout; + expect(content).toMatch(/^epoch,value/m); + expect(content).toMatch(/^\d+(?:\.\d+)?,\S+$/m); + }); + }); + + describe("multi-value type", () => { + const HOST = "it-collectd-mv"; + let nscp: NscpInstance; + + beforeAll(async () => { + nscp = new NscpInstance(); + await nscp.configure({ + "/modules": { CheckSystem: "enabled", CollectdClient: "enabled" }, + "/settings/core": { "metrics interval": "1s" }, + "/settings/collectd/client": { hostname: HOST }, + // A custom mapping emitting the two-value collectd "ps_count" type + // (processes + threads) from two metrics that exist on Linux. This + // proves a real collectd accepts our multi-value encoding — it drops + // value-lists whose value count doesn't match the type in types.db. + "/settings/collectd/client/metrics": { + "processes-/ps_count": "gauge:system.cpu.total.user,system.cpu.total.idle", + }, + "/settings/collectd/client/targets/default": { address: `127.0.0.1:${port}` }, + }); + nscp.start(); + }); + + afterAll(async () => { + await nscp?.stop(); + }); + + it("accepts a two-value ps_count value-list", async () => { + const files = await waitForFiles(HOST, (f) => f.some((p) => /\/processes\/ps_count-/.test(p))); + expect(files.some((p) => /\/processes\/ps_count-/.test(p))).toBe(true); + }); + }); +}); diff --git a/tests/collectd-submit.test.ts b/tests/collectd-submit.test.ts new file mode 100644 index 000000000..c6d82ae85 --- /dev/null +++ b/tests/collectd-submit.test.ts @@ -0,0 +1,362 @@ +/** + * Stands up a minimal collectd binary-protocol receiver and points + * NSClient++'s CollectdClient at it, then verifies the metrics flow: + * + * CheckSystem produces system.cpu.* / system.mem.* metrics; the core + * metrics scheduler fetches them every `metrics interval` and hands them to + * every module that consumes metrics. CollectdClient's metrics handler maps + * them onto collectd value-lists (cpu / memory / processes / …) and sends + * them to the configured target as collectd's binary network protocol over + * UDP (default port 25826). + * + * Unlike the graphite/nsca scenarios, the "server" here is NOT a docker + * container: testcontainers (v10) only publishes TCP ports, and collectd's + * wire protocol is UDP. So the receiver is a plain Node `dgram` socket bound + * to an ephemeral loopback port, and we decode the binary packets in-process + * (see decodeCollectd below) to assert on the host/plugin fields and values. + * + * The collectd binary protocol is a flat stream of TLV "parts" (RFC-less, but + * documented at https://collectd.org/wiki/index.php/Binary_protocol): + * - string parts (host/plugin/.../type_instance): 2-byte BE type, 2-byte BE + * length (incl. the 4-byte header and a trailing NUL), then the NUL- + * terminated string. + * - number parts (time/interval, "high-resolution" 2^-30 s units): same + * header + a single 8-byte BE uint64. + * - value parts: header + 2-byte BE count, then `count` 1-byte type codes + * (1=gauge little-endian double, 2=derive big-endian int64), then `count` + * 8-byte values. + * A reading is emitted whenever a value part is seen, tagged with whatever + * host/plugin/type context the preceding string parts established. This + * mirrors what collectd_packet.cpp on the sending side writes. + */ +import * as dgram from "dgram"; +import { NscpInstance } from "@fixtures/index"; + +jest.setTimeout(600_000); + +// Distinctive sender hostname so the decoded `host` field has a stable anchor +// regardless of the machine the test runs on. +const HOSTNAME = "it-collectd-host"; + +// collectd part type codes (collectd_packet.cpp add_* helpers). +const PART_HOST = 0x0000; +const PART_TIME_HR = 0x0008; +const PART_PLUGIN = 0x0002; +const PART_PLUGIN_INSTANCE = 0x0003; +const PART_TYPE = 0x0004; +const PART_TYPE_INSTANCE = 0x0005; +const PART_VALUES = 0x0006; +const PART_INTERVAL_HR = 0x0009; + +// collectd "high-resolution" time/interval are in units of 2^-30 seconds. +const HR_SHIFT = 1 << 30; + +interface CollectdReading { + host: string; + plugin: string; + pluginInstance: string; + type: string; + typeInstance: string; + /** Number of values in the value-list. */ + count: number; + /** Reported interval in whole seconds (time/interval are 2^-30 s units). */ + intervalSeconds: number; +} + +/** + * Decode one collectd binary packet into the list of readings (value-lists) + * it carries. Walks the TLV part stream, tracking the current + * host/plugin/type context, and emits a reading each time a value part is + * seen. Returns whatever it can parse and stops at the first malformed part + * rather than throwing — a half-captured datagram should degrade gracefully. + */ +function decodeCollectd(buf: Buffer): CollectdReading[] { + const readings: CollectdReading[] = []; + let host = ""; + let plugin = ""; + let pluginInstance = ""; + let type = ""; + let typeInstance = ""; + let intervalSeconds = 0; + + const readString = (body: Buffer): string => { + // Strip the trailing NUL the sender appends. + let end = body.length; + while (end > 0 && body[end - 1] === 0) end--; + return body.toString("utf8", 0, end); + }; + + let off = 0; + while (off + 4 <= buf.length) { + const partType = buf.readUInt16BE(off); + const partLen = buf.readUInt16BE(off + 2); + // length covers the 4-byte header; anything shorter or overrunning the + // datagram is corruption — bail with what we have. + if (partLen < 4 || off + partLen > buf.length) break; + const body = buf.subarray(off + 4, off + partLen); + off += partLen; + + switch (partType) { + case PART_HOST: + host = readString(body); + break; + case PART_PLUGIN: + plugin = readString(body); + pluginInstance = ""; + break; + case PART_PLUGIN_INSTANCE: + pluginInstance = readString(body); + break; + case PART_TYPE: + type = readString(body); + typeInstance = ""; + break; + case PART_TYPE_INSTANCE: + typeInstance = readString(body); + break; + case PART_TIME_HR: + // 8-byte BE uint64; skip — only the interval is asserted on. + break; + case PART_INTERVAL_HR: + if (body.length >= 8) { + // Read as a JS number; values fit comfortably in 53 bits here. + intervalSeconds = Math.round(Number(body.readBigUInt64BE(0)) / HR_SHIFT); + } + break; + case PART_VALUES: { + const count = body.length >= 2 ? body.readUInt16BE(0) : 0; + readings.push({ host, plugin, pluginInstance, type, typeInstance, count, intervalSeconds }); + break; + } + default: + // time / interval / unknown — context we don't assert on. + break; + } + } + return readings; +} + +/** A loopback UDP collectd receiver that decodes every datagram it gets. */ +class CollectdReceiver { + private readonly socket = dgram.createSocket("udp4"); + readonly readings: CollectdReading[] = []; + /** Raw datagrams, kept so a failing assertion can show the bytes. */ + readonly packets: Buffer[] = []; + + async start(): Promise { + await new Promise((resolve, reject) => { + this.socket.once("error", reject); + this.socket.bind(0, "127.0.0.1", () => { + this.socket.off("error", reject); + resolve(); + }); + }); + this.socket.on("message", (msg) => { + this.packets.push(msg); + this.readings.push(...decodeCollectd(msg)); + }); + return this.socket.address().port; + } + + async stop(): Promise { + await new Promise((resolve) => this.socket.close(() => resolve())); + } + + /** Poll until `predicate` holds over the readings collected so far. */ + async waitFor( + predicate: (readings: CollectdReading[]) => boolean, + timeoutMs = 60_000, + ): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if (predicate(this.readings)) return true; + await new Promise((r) => setTimeout(r, 500)); + } + return predicate(this.readings); + } +} + +describe("CollectD integration", () => { + let nscp: NscpInstance; + let receiver: CollectdReceiver; + + beforeAll(async () => { + receiver = new CollectdReceiver(); + const port = await receiver.start(); + + nscp = new NscpInstance(); + await nscp.configure({ + "/modules": { + // CheckSystem is the system-metrics producer (system.cpu.* / + // system.mem.*); on Linux this build ships it as libCheckSystem.so + // (see graphite-submit.test.ts for the naming note). + CheckSystem: "enabled", + CollectdClient: "enabled", + }, + "/settings/core": { + // Push metrics every second so the test doesn't wait the 10s default. + "metrics interval": "1s", + }, + "/settings/collectd/client": { + // Pin the collectd `host` field so the decoded packets have a stable + // anchor. A literal (non-"auto") hostname is passed through verbatim. + hostname: HOSTNAME, + }, + "/settings/collectd/client/targets/default": { + // Unicast the binary protocol at our loopback receiver instead of the + // 239.192.74.66:25826 multicast group collectd defaults to. net::parse + // splits host:port; connection_data reads host + port from it. + address: `127.0.0.1:${port}`, + }, + }); + + // Run the full agent so the core metrics scheduler is live. + nscp.start(); + }); + + afterAll(async () => { + await nscp?.stop(); + await receiver?.stop(); + }); + + it("sends well-formed collectd binary packets over UDP", async () => { + const ok = await receiver.waitFor((r) => r.length > 0); + expect(ok).toBe(true); + // Every datagram must have decoded into at least one value-list — i.e. the + // bytes on the wire really are the collectd binary protocol, not garbage. + expect(receiver.packets.length).toBeGreaterThan(0); + expect(receiver.readings.length).toBeGreaterThan(0); + // Each value-list carries at least one value. + expect(receiver.readings.every((v) => v.count >= 1)).toBe(true); + }); + + it("tags packets with the configured collectd host", async () => { + await receiver.waitFor((r) => r.some((v) => v.host === HOSTNAME)); + const hosts = new Set(receiver.readings.map((v) => v.host)); + expect(hosts).toContain(HOSTNAME); + }); + + it("maps CheckSystem metrics onto collectd plugins", async () => { + // The default mapping is platform-specific: Linux emits cpu/memory/swap, + // Windows emits cpu/memory/processes. cpu + memory are common to both, so + // assert those show up once metrics have flowed. + const ok = await receiver.waitFor((r) => + r.some((v) => v.plugin === "cpu") && r.some((v) => v.plugin === "memory"), + ); + expect(ok).toBe(true); + const plugins = new Set(receiver.readings.map((v) => v.plugin)); + expect(plugins).toContain("cpu"); + expect(plugins).toContain("memory"); + // Per-core CPU (cpu-0, cpu-1, …) proves the default `core` variable + // expanded — this was broken on Linux before the per-platform defaults. + expect(receiver.readings.some((v) => v.plugin === "cpu" && /^\d+$/.test(v.pluginInstance))).toBe(true); + }); + + it("reports the default 10s interval", async () => { + await receiver.waitFor((r) => r.length > 0); + // No `interval` configured, so the handler's default (10s) is reported. + expect(receiver.readings.every((v) => v.intervalSeconds === 10)).toBe(true); + }); +}); + +// A second instance with an explicit interval and a custom metric mapping, +// proving the mapping/interval are settings-driven rather than hard-coded. +describe("CollectD configurable mapping", () => { + let nscp: NscpInstance; + let receiver: CollectdReceiver; + + beforeAll(async () => { + receiver = new CollectdReceiver(); + const port = await receiver.start(); + + nscp = new NscpInstance(); + await nscp.configure({ + "/modules": { + CheckSystem: "enabled", + CollectdClient: "enabled", + }, + "/settings/core": { + "metrics interval": "1s", + }, + "/settings/collectd/client": { + hostname: HOSTNAME, + // Report a non-default interval so we can distinguish it on the wire. + interval: "7", + }, + // A single custom mapping replacing the built-in defaults: emit one + // gauge under the "testplugin" plugin from a metric CheckSystem always + // produces on Linux (system.cpu.total.idle). + "/settings/collectd/client/metrics": { + "testplugin-/gauge-value": "gauge:system.cpu.total.idle", + }, + "/settings/collectd/client/targets/default": { + address: `127.0.0.1:${port}`, + }, + }); + + nscp.start(); + }); + + afterAll(async () => { + await nscp?.stop(); + await receiver?.stop(); + }); + + it("uses the configured interval and custom mapping", async () => { + const ok = await receiver.waitFor((r) => r.some((v) => v.plugin === "testplugin")); + expect(ok).toBe(true); + // Only the custom mapping should be present — the built-in cpu/memory + // defaults must not be emitted once a mapping is configured. + const plugins = new Set(receiver.readings.map((v) => v.plugin)); + expect(plugins).toContain("testplugin"); + expect(plugins).not.toContain("memory"); + expect(plugins).not.toContain("cpu"); + // The configured 7s interval is what reaches the wire. + expect(receiver.readings.every((v) => v.intervalSeconds === 7)).toBe(true); + }); +}); + +// A per-target `interval` must override the client-level interval on the wire. +describe("CollectD per-target interval override", () => { + let nscp: NscpInstance; + let receiver: CollectdReceiver; + + beforeAll(async () => { + receiver = new CollectdReceiver(); + const port = await receiver.start(); + + nscp = new NscpInstance(); + await nscp.configure({ + "/modules": { + CheckSystem: "enabled", + CollectdClient: "enabled", + }, + "/settings/core": { + "metrics interval": "1s", + }, + "/settings/collectd/client": { + hostname: HOSTNAME, + // Client-level default — the per-target value below must win over this. + interval: "5", + }, + "/settings/collectd/client/targets/default": { + address: `127.0.0.1:${port}`, + // Per-target override. + interval: "13", + }, + }); + + nscp.start(); + }); + + afterAll(async () => { + await nscp?.stop(); + await receiver?.stop(); + }); + + it("reports the per-target interval, not the client-level one", async () => { + await receiver.waitFor((r) => r.length > 0); + expect(receiver.readings.length).toBeGreaterThan(0); + expect(receiver.readings.every((v) => v.intervalSeconds === 13)).toBe(true); + }); +});