Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 95 additions & 110 deletions include/net/collectd/collectd_packet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,48 +22,42 @@
#include <stdint.h>

#include <boost/endian/conversion.hpp>
#include <boost/noncopyable.hpp>
#include <boost/optional.hpp>
#include <boost/tuple/tuple.hpp>
#include <cstring>
#include <list>
#include <map>
#include <sstream>
#include <str/utils.hpp>
#include <str/xtos.hpp>

namespace collectd {
class data {
public:
#if defined(_MSC_VER)
#pragma warning(push)
#pragma warning(disable : 4200) // flexible-array member is intentional
#endif
struct string_part : boost::noncopyable {
int16_t type;
int16_t length;
char data[];

char *get_data() { return &data[0]; }
};
#if defined(_MSC_VER)
#pragma warning(pop)
#endif
struct int64_part : boost::noncopyable {
int16_t type;
int16_t length;
int64_t data;
};
struct value_part : boost::noncopyable {
int16_t type;
int16_t length;
int16_t count;
};
struct derive_value_part : boost::noncopyable {
int8_t type;
int64_t value;
};
// collectd "part" type codes (see https://collectd.org/wiki/index.php/Binary_protocol).
enum part_type : int16_t {
part_host = 0x0000,
part_time_hr = 0x0008,
part_plugin = 0x0002,
part_plugin_instance = 0x0003,
part_type = 0x0004,
part_type_instance = 0x0005,
part_values = 0x0006,
part_interval_hr = 0x0009,
};

// Value type codes inside a values part.
enum value_type : uint8_t {
value_gauge = 0x01,
value_derive = 0x02,
};

// The collectd network plugin reads at most this many bytes per datagram, so
// we must not emit a packet larger than this (see Network plugin
// `MaxPacketSize`, default 1452).
static const std::size_t max_packet_size = 1452;

// Longest string a single part may carry: it must fit in one datagram alongside
// the 4-byte part header and the trailing NUL.
static const std::size_t max_string_length = max_packet_size - 5;

class collectd_exception : public std::exception {
std::string msg_;

Expand Down Expand Up @@ -92,95 +86,82 @@ class packet {
return ss.str();
}

void add_host(std::string value) { append_string(0x0000, value); }
void add_plugin(std::string value) { append_string(0x0002, value); }
void add_plugin_instance(std::string value) { append_string(0x0003, value); }
void add_type(std::string value) { append_string(0x0004, value); }
void add_type_instance(std::string value) { append_string(0x0005, value); }
void add_gauge_value(std::list<double> values) { append_values(0x00006, 0x01, values); }
void add_derive_value(std::list<long long> values) { append_values(0x00006, 0x02, values); }
void add_time_hr(unsigned long long time) { append_int(0x0008, time); }
void add_interval_hr(unsigned long long time) { append_int(0x0009, time); }

bool is_full() const { return buffer.size() > 200; }

void add_host(const std::string &value) { append_string(part_host, value); }
void add_plugin(const std::string &value) { append_string(part_plugin, value); }
void add_plugin_instance(const std::string &value) { append_string(part_plugin_instance, value); }
void add_type(const std::string &value) { append_string(part_type, value); }
void add_type_instance(const std::string &value) { append_string(part_type_instance, value); }
void add_gauge_value(const std::list<double> &values) { append_values(value_gauge, values); }
void add_derive_value(const std::list<long long> &values) { append_values(value_derive, values); }
void add_time_hr(unsigned long long time) { append_int(part_time_hr, time); }
void add_interval_hr(unsigned long long time) { append_int(part_interval_hr, time); }

// A packet is "full" once adding another value-list could overflow the
// network buffer. Leave headroom for one more value-list (a handful of
// string parts plus the values) so render() never emits an oversized packet.
bool is_full() const { return buffer.size() > max_packet_size - 128; }

// All multi-byte integers in the collectd protocol are big-endian. Build the
// big-endian value in a properly aligned local and append its bytes, rather
// than reinterpret_cast-ing into the std::string buffer (which is unaligned
// and aliasing UB, and trips the sanitizers).
template <class T>
inline void set_byte(std::string &buffer, const std::string::size_type pos, const T value) {
T *b_value = reinterpret_cast<T *>(&buffer[pos]);
*b_value = boost::endian::native_to_big(value);
static void append_be(std::string &buf, T value) {
const T be = boost::endian::native_to_big(value);
buf.append(reinterpret_cast<const char *>(&be), sizeof(be));
}

void append_string(int16_t type, std::string &string_data) {
int16_t len = static_cast<int16_t>(string_data.length()) + 5;
std::string::size_type pos = buffer.length();
buffer.append(sizeof(collectd::data::string_part), '\0');
collectd::data::string_part *data = reinterpret_cast<collectd::data::string_part *>(&buffer[pos]);
data->type = boost::endian::native_to_big(type);
data->length = boost::endian::native_to_big(len);
buffer.append(string_data.c_str(), string_data.length() + 1);
// A string part: type(2) + length(2) + NUL-terminated string. The length
// field covers the 4-byte header and the trailing NUL.
//
// The length field is an (unsigned) 16-bit value and the whole part must fit
// inside one datagram, so clamp pathologically long strings — e.g. a
// misconfigured hostname/plugin/type — instead of letting the cast to int16_t
// silently wrap and emit a malformed part.
void append_string(int16_t type, const std::string &string_data) {
const std::size_t data_len = string_data.size() > max_string_length ? max_string_length : string_data.size();
const int16_t len = static_cast<int16_t>(data_len + 5);
append_be<int16_t>(buffer, type);
append_be<int16_t>(buffer, len);
buffer.append(string_data, 0, data_len);
buffer.push_back('\0');
}
// A number part: type(2) + length(2, always 12) + uint64(8).
void append_int(int16_t type, unsigned long long int_data) {
std::string::size_type pos = buffer.length();
buffer.append(sizeof(int16_t) + sizeof(int16_t) + sizeof(int64_t), '\0');
int16_t len = static_cast<int16_t>(buffer.length() - pos);
set_byte<uint16_t>(buffer, pos, type);
set_byte<uint16_t>(buffer, pos + sizeof(int16_t), len);
set_byte<uint64_t>(buffer, pos + sizeof(int16_t) + sizeof(int16_t), int_data);
append_be<int16_t>(buffer, type);
append_be<int16_t>(buffer, static_cast<int16_t>(12));
append_be<uint64_t>(buffer, static_cast<uint64_t>(int_data));
}
void append_values(int16_t base_type, int value_type, const std::list<double> &value_data) {
std::string::size_type pos = buffer.length();
buffer.append(sizeof(collectd::data::value_part), '\0');
for (std::size_t i = 0; i < value_data.size(); i++) {
append_value_type(value_type);
}
for (const double &v : value_data) {
append_value_value(v);
}
int16_t len = static_cast<int16_t>(buffer.length() - pos);
collectd::data::value_part *data = reinterpret_cast<collectd::data::value_part *>(&buffer[pos]);
data->type = boost::endian::native_to_big(base_type);
data->count = boost::endian::native_to_big(static_cast<int16_t>(value_data.size()));
data->length = boost::endian::native_to_big(len);
}
void append_values(int16_t base_type, int value_type, const std::list<long long> &value_data) {
std::string::size_type pos = buffer.length();
buffer.append(sizeof(collectd::data::value_part), '\0');
// A values part: type(2) + length(2) + count(2) + count value-type bytes +
// count 8-byte values. Gauge values are little-endian IEEE-754 doubles;
// derive values are big-endian int64. Build the body first, then prefix the
// header with the now-known length.
template <class T>
void append_values(uint8_t value_type, const std::list<T> &value_data) {
std::string body;
body.reserve(value_data.size() * 9);
for (std::size_t i = 0; i < value_data.size(); i++) {
append_value_type(value_type);
body.push_back(static_cast<char>(value_type));
}
for (const long long &v : value_data) {
append_value_value(v);
for (const T &v : value_data) {
append_value(body, v);
}
int16_t len = static_cast<int16_t>(buffer.length() - pos);
collectd::data::value_part *data = reinterpret_cast<collectd::data::value_part *>(&buffer[pos]);
data->type = boost::endian::native_to_big(base_type);
data->count = boost::endian::native_to_big(static_cast<int16_t>(value_data.size()));
data->length = boost::endian::native_to_big(len);
const int16_t len = static_cast<int16_t>(6 + body.size());
append_be<int16_t>(buffer, part_values);
append_be<int16_t>(buffer, len);
append_be<int16_t>(buffer, static_cast<int16_t>(value_data.size()));
buffer.append(body);
}

void append_value_type(int type) {
std::string::size_type pos = buffer.length();
int sz = sizeof(int8_t);
buffer.append(sz, '\0');
int8_t *b_type = reinterpret_cast<int8_t *>(&buffer[pos]);
*b_type = static_cast<int8_t>(type);
}
void append_value_value(const double double_data) {
std::string::size_type pos = buffer.length();
int sz = sizeof(int64_t);
buffer.append(sz, '\0');
// int64_t *b_value = reinterpret_cast<int64_t*>(&buffer[pos + sizeof(int8_t)]);
// *b_value = boost::endian::native_to_big(static_cast<int64_t>(*int_data));
double *b_dvalue = reinterpret_cast<double *>(&buffer[pos]);
*b_dvalue = double_data;
}
void append_value_value(const long long int_data) {
std::string::size_type pos = buffer.length();
int sz = sizeof(int64_t);
buffer.append(sz, '\0');
int64_t *b_value = reinterpret_cast<int64_t *>(&buffer[pos]);
*b_value = boost::endian::native_to_big(static_cast<int64_t>(int_data));
// Gauge: little-endian double (collectd stores gauges in x86 byte order).
static void append_value(std::string &buf, double value) {
uint64_t bits = 0;
std::memcpy(&bits, &value, sizeof(bits));
const uint64_t le = boost::endian::native_to_little(bits);
buf.append(reinterpret_cast<const char *>(&le), sizeof(le));
}
// Derive/counter: big-endian int64.
static void append_value(std::string &buf, long long value) { append_be<int64_t>(buf, static_cast<int64_t>(value)); }

std::string get_buffer() const { return buffer; }

Expand Down Expand Up @@ -392,7 +373,11 @@ struct collectd_builder {
is_new = true;
}
}
packets.push_back(packet);
// Only emit the trailing packet if it actually holds data. When nothing
// rendered (e.g. an unmatched variable) or the last metric exactly filled
// and flushed a packet, `packet` is an empty default-constructed buffer and
// must not be sent as a zero-length datagram.
if (packet.get_size() > 0) packets.push_back(packet);
}
void set_metric(const ::std::string &key, const std::string &value);
};
Expand Down
16 changes: 16 additions & 0 deletions modules/CollectdClient/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,19 @@ target_link_libraries(
include(${BUILD_CMAKE_FOLDER}/module.cmake)
source_group("Client" REGULAR_EXPRESSION .*include/collectd/.*)
source_group("Socket" REGULAR_EXPRESSION .*include/socket/.*)

NSCP_CREATE_TEST(
collectd_packet_test
SOURCES
collectd_packet_test.cpp
${NSCP_INCLUDEDIR}/net/collectd/collectd_packet.cpp
${NSCP_INCLUDEDIR}/str/utf8.cpp
LIBRARIES
GTest::gtest_main
expression_parser
${Boost_REGEX_LIBRARY}
${Boost_FILESYSTEM_LIBRARY}
INCLUDES
${NSCP_INCLUDEDIR}
${CMAKE_CURRENT_SOURCE_DIR}
)
35 changes: 27 additions & 8 deletions modules/CollectdClient/CollectdClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
* @return
*/
CollectdClient::CollectdClient()
: client_("nsca", std::make_shared<collectd_client::collectd_client_handler>(), std::make_shared<collectd_handler::options_reader_impl>()) {}
: handler_(std::make_shared<collectd_client::collectd_client_handler>()),
client_("collectd", handler_, std::make_shared<collectd_handler::options_reader_impl>()) {}

/**
* Default d-tor
Expand All @@ -52,30 +53,48 @@ bool CollectdClient::loadModuleEx(std::string alias, NSCAPI::moduleLoadMode) {

client_.set_path(target_path);

unsigned int interval = 10;

// clang-format off
settings.alias().add_path_to_settings()
("COLLECTD CLIENT SECTION", "Section for NSCA passive check module.")
("COLLECTD CLIENT SECTION", "Section for the collectd client; forwards NSClient++ metrics to a collectd server.")

("targets", sh::fun_values_path([this] (auto key, auto value) { this->add_target(key, value); }),
"REMOTE TARGET DEFINITIONS", "",
"TARGET", "For more configuration options add a dedicated section")

("variables", sh::fun_values_path([this] (auto key, auto value) { this->handler_->add_variable(key, value); }),
"VARIABLE DEFINITIONS",
"Variables used to expand ${...} placeholders in metric keys. Each value is a regular expression matched against metric names; "
"the captured groups become the variable's values. When empty a built-in default set is used.")

("metrics", sh::fun_values_path([this] (auto key, auto value) { this->handler_->add_metric(key, value); }),
"METRIC MAPPINGS",
"Mapping of collectd keys (e.g. cpu-total/cpu-user) to value expressions (e.g. derive:system.cpu.total.user). "
"When empty a built-in default set is used.")
;

// clang-format on
settings.alias().add_key_to_settings().add_string(
"hostname", sh::string_key(&hostname_, "auto"), "HOSTNAME",
"The host name of the monitored computer.\nSet this to auto (default) to use the windows name of the computer.\n\n"
settings.alias().add_key_to_settings()
.add_string("hostname", sh::string_key(&hostname_, "auto"), "HOSTNAME",
"The host name reported to collectd.\nSet this to auto (default) to use the name of this computer.\n\n"
"auto\tHostname\n"
"${host}\tHostname\n"
"${host_lc}\nHostname in lowercase\n"
"${host_lc}\tHostname in lowercase\n"
"${host_uc}\tHostname in uppercase\n"
"${domain}\tDomainname\n"
"${domain_lc}\tDomainname in lowercase\n"
"${domain_uc}\tDomainname in uppercase\n");
"${domain_uc}\tDomainname in uppercase\n")

.add_int("interval", sh::uint_key(&interval, 10), "METRICS INTERVAL",
"The interval (in seconds) reported to collectd. Should match the core 'metrics interval' so collectd computes rates correctly.")
;
// clang-format on

settings.register_all();
settings.notify();

handler_->set_interval(interval);

client_.finalize(nscapi::settings_proxy::create(get_id(), get_core()));

nscapi::core_helper core(get_core(), get_id());
Expand Down
7 changes: 7 additions & 0 deletions modules/CollectdClient/CollectdClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,20 @@
#include <nscapi/nscapi_targets.hpp>
#include <nscapi/protobuf/metrics.hpp>

#include <memory>

namespace po = boost::program_options;
namespace sh = nscapi::settings_helper;

namespace collectd_client {
struct collectd_client_handler;
}

class CollectdClient : public nscapi::impl::simple_plugin {
private:
std::string hostname_;

std::shared_ptr<collectd_client::collectd_client_handler> handler_;
client::configuration client_;

public:
Expand Down
Loading
Loading