Skip to content

Commit 7943e94

Browse files
docs: Batch 4+5 Doxygen — crypto, thrift, AI tier (21 headers)
Batch 4 (9 headers): aes_ctr, aes_core, aes_gcm, cipher_interface, pme, key_metadata, post_quantum, thrift/compact, thrift/types. Batch 5 (12 headers): event_bus, feature_reader, feature_writer, mpmc_ring, quantized_vector, wal_mapped_segment, wal, column_batch, streaming_sink, row_lineage, vector_type, tensor_bridge. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent e486243 commit 7943e94

21 files changed

Lines changed: 1653 additions & 601 deletions

include/signet/ai/column_batch.hpp

Lines changed: 63 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,10 @@
2020
//
2121
// Phase 9b: MPMC ColumnBatch Event Bus.
2222

23+
/// @file column_batch.hpp
24+
/// @brief Column-major batch of feature rows for zero-copy tensor wrapping
25+
/// and WAL serialization in ML inference pipelines.
26+
2327
#pragma once
2428

2529
#include "signet/error.hpp"
@@ -38,16 +42,17 @@
3842

3943
namespace signet::forge {
4044

41-
// Convenience alias
45+
/// Convenience alias for TensorDataType (shorter schema declarations).
4246
using TDT = TensorDataType;
4347

4448
// ============================================================================
4549
// ColumnDesc — schema descriptor for one column in a ColumnBatch
4650
// ============================================================================
4751

52+
/// Describes a single column in a ColumnBatch schema.
4853
struct ColumnDesc {
49-
std::string name;
50-
TensorDataType dtype = TensorDataType::FLOAT64; ///< physical storage type
54+
std::string name; ///< Column name (e.g. "price", "volume")
55+
TensorDataType dtype = TensorDataType::FLOAT64; ///< Declared element type (values are always stored as double internally)
5156
};
5257

5358
// ============================================================================
@@ -57,17 +62,25 @@ struct ColumnDesc {
5762
// wrapping. Each column is a contiguous std::vector<double>.
5863
// ============================================================================
5964

65+
/// A column-major batch of feature rows for ML inference and WAL serialization.
66+
///
67+
/// Data is stored in column-major layout (columns_[col][row]) so each column
68+
/// is a contiguous double array suitable for zero-copy wrapping as a TensorView
69+
/// or ONNX OrtValue without transposition.
70+
///
71+
/// Typically shared across threads via SharedColumnBatch (std::shared_ptr).
72+
/// @see SharedColumnBatch, make_column_batch, EventBus
6073
class ColumnBatch {
6174
public:
6275
// -------------------------------------------------------------------------
6376
// Producer-side metadata (set before publishing)
6477
// -------------------------------------------------------------------------
6578

66-
std::string source_id; ///< exchange / feed identifier
67-
std::string symbol; ///< instrument symbol
68-
int64_t seq_first = 0; ///< first WAL sequence in this batch
69-
int64_t seq_last = 0; ///< last WAL sequence in this batch
70-
int64_t created_ns = 0; ///< batch creation timestamp (ns since epoch)
79+
std::string source_id; ///< Exchange / feed identifier
80+
std::string symbol; ///< Instrument symbol
81+
int64_t seq_first = 0; ///< First WAL sequence number in this batch
82+
int64_t seq_last = 0; ///< Last WAL sequence number in this batch
83+
int64_t created_ns = 0; ///< Batch creation timestamp (ns since epoch)
7184

7285
// -------------------------------------------------------------------------
7386
// Factory
@@ -92,11 +105,12 @@ class ColumnBatch {
92105
return b;
93106
}
94107

108+
/// Default constructor (empty batch, no schema).
95109
ColumnBatch() = default;
96-
ColumnBatch(ColumnBatch&&) = default;
97-
ColumnBatch& operator=(ColumnBatch&&) = default;
98-
ColumnBatch(const ColumnBatch&) = default;
99-
ColumnBatch& operator=(const ColumnBatch&) = default;
110+
ColumnBatch(ColumnBatch&&) = default; ///< Move constructor.
111+
ColumnBatch& operator=(ColumnBatch&&) = default; ///< Move assignment.
112+
ColumnBatch(const ColumnBatch&) = default; ///< Copy constructor.
113+
ColumnBatch& operator=(const ColumnBatch&) = default; ///< Copy assignment.
100114

101115
// -------------------------------------------------------------------------
102116
// Build API — called from producer thread
@@ -115,11 +129,17 @@ class ColumnBatch {
115129
return expected<void>{};
116130
}
117131

132+
/// Append one row from an initializer list (e.g. `push_row({1.0, 2.0})`).
133+
/// @param values Feature values (must match num_columns()).
134+
/// @return Error on schema mismatch.
118135
[[nodiscard]] expected<void> push_row(std::initializer_list<double> values) {
119136
std::vector<double> tmp(values);
120137
return push_row(tmp.data(), tmp.size());
121138
}
122139

140+
/// Append one row from a vector.
141+
/// @param values Feature values (must match num_columns()).
142+
/// @return Error on schema mismatch.
123143
[[nodiscard]] expected<void> push_row(const std::vector<double>& values) {
124144
return push_row(values.data(), values.size());
125145
}
@@ -128,10 +148,14 @@ class ColumnBatch {
128148
// Query API — called from consumer / ML thread
129149
// -------------------------------------------------------------------------
130150

151+
/// Number of rows currently in the batch.
131152
[[nodiscard]] size_t num_rows() const noexcept { return num_rows_; }
153+
/// Number of columns defined by the schema.
132154
[[nodiscard]] size_t num_columns() const noexcept { return schema_.size(); }
155+
/// True if the batch contains no rows.
133156
[[nodiscard]] bool empty() const noexcept { return num_rows_ == 0; }
134157

158+
/// The schema (column descriptors) this batch was created with.
135159
[[nodiscard]] const std::vector<ColumnDesc>& schema() const noexcept {
136160
return schema_;
137161
}
@@ -163,6 +187,13 @@ class ColumnBatch {
163187
// buffer. output_dtype defaults to FLOAT32 for ONNX compatibility.
164188
// -------------------------------------------------------------------------
165189

190+
/// Assemble all columns into a single 2D [rows x cols] OwnedTensor.
191+
///
192+
/// Uses BatchTensorBuilder internally. The default output type is FLOAT32
193+
/// for direct ONNX Runtime consumption.
194+
///
195+
/// @param output_dtype Desired element type (default FLOAT32).
196+
/// @return OwnedTensor of shape {num_rows, num_columns}, or Error if empty.
166197
[[nodiscard]] expected<OwnedTensor> as_tensor(
167198
TensorDataType output_dtype = TensorDataType::FLOAT32) const {
168199

@@ -191,6 +222,14 @@ class ColumnBatch {
191222
// [float64 values × num_rows] × num_columns (column-major)
192223
// -------------------------------------------------------------------------
193224

225+
/// Serialize the batch into a WAL StreamRecord.
226+
///
227+
/// The binary payload uses little-endian column-major format. The default
228+
/// type_id 0x434F4C42 ("COLB") identifies ColumnBatch records in the WAL.
229+
///
230+
/// @param timestamp_ns Override timestamp (0 = use created_ns).
231+
/// @param type_id Record type tag for WAL routing.
232+
/// @return StreamRecord with the serialized batch payload.
194233
[[nodiscard]] StreamRecord to_stream_record(
195234
int64_t timestamp_ns = 0,
196235
uint32_t type_id = 0x434F4C42u /*"COLB"*/) const {
@@ -241,6 +280,13 @@ class ColumnBatch {
241280
// Deserialise a StreamRecord payload back into a ColumnBatch
242281
// -------------------------------------------------------------------------
243282

283+
/// Deserialize a StreamRecord payload back into a ColumnBatch.
284+
///
285+
/// Inverse of to_stream_record(). Reads the binary column-major format
286+
/// and reconstructs the schema, columns, and row data.
287+
///
288+
/// @param rec StreamRecord previously produced by to_stream_record().
289+
/// @return Reconstructed ColumnBatch, or Error on truncated/corrupt payload.
244290
[[nodiscard]] static expected<ColumnBatch> from_stream_record(
245291
const StreamRecord& rec) {
246292

@@ -299,11 +345,14 @@ class ColumnBatch {
299345
// Utility
300346
// -------------------------------------------------------------------------
301347

348+
/// Clear all row data while preserving the schema.
302349
void clear() {
303350
for (auto& col : columns_) col.clear();
304351
num_rows_ = 0;
305352
}
306353

354+
/// Pre-allocate storage for the given number of rows in each column.
355+
/// @param rows Number of rows to reserve capacity for.
307356
void reserve(size_t rows) {
308357
for (auto& col : columns_) col.reserve(rows);
309358
}
@@ -318,6 +367,8 @@ class ColumnBatch {
318367
// SharedColumnBatch — the unit transferred between threads
319368
// ---------------------------------------------------------------------------
320369

370+
/// Shared-ownership pointer to a ColumnBatch -- the unit transferred
371+
/// between producer and consumer threads via EventBus.
321372
using SharedColumnBatch = std::shared_ptr<ColumnBatch>;
322373

323374
/// Convenience factory: create a shared batch with a given schema.

include/signet/ai/event_bus.hpp

Lines changed: 49 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// Copyright 2026 Johnson Ogundeji
3-
// event_bus.hpp — Multi-tier event bus for SignetStack Signet Forge
3+
/// @file event_bus.hpp
4+
/// @brief Multi-tier event bus for routing SharedColumnBatch events through three tiers.
45
//
56
// Routes SharedColumnBatch events through three tiers:
67
//
@@ -57,6 +58,12 @@ namespace signet::forge {
5758
// EventBusOptions — defined outside EventBus for Apple Clang compat
5859
// ============================================================================
5960

61+
/// Configuration options for EventBus.
62+
///
63+
/// Defined at namespace scope (not nested in EventBus) to work around an
64+
/// Apple Clang restriction on default member initializers in nested aggregates.
65+
///
66+
/// @see EventBus
6067
struct EventBusOptions {
6168
size_t tier2_capacity = 4096; ///< Tier-2 MPMC ring capacity (power-of-2)
6269
size_t tier1_capacity = 256; ///< Default capacity for make_channel()
@@ -67,27 +74,42 @@ struct EventBusOptions {
6774
// EventBus — multi-tier SharedColumnBatch router
6875
// ============================================================================
6976

77+
/// Multi-tier event bus for routing SharedColumnBatch events.
78+
///
79+
/// Routes batches through three tiers:
80+
/// - **Tier 1** -- SPSC dedicated channels (sub-us, one per producer-consumer pair).
81+
/// - **Tier 2** -- MPMC shared pool (us, N producers to M worker threads).
82+
/// - **Tier 3** -- WAL / StreamingSink (ms, async durable logging).
83+
///
84+
/// The bus is non-copyable and non-movable due to its internal mutex.
85+
/// Wrap in `std::unique_ptr<EventBus>` if heap allocation is needed.
86+
///
87+
/// @see EventBusOptions, MpmcRing, StreamingSink
7088
class EventBus {
7189
public:
90+
/// Alias for the options struct.
7291
using Options = EventBusOptions;
7392

7493
// -------------------------------------------------------------------------
7594
// Channel — Tier-1 dedicated MpmcRing used as SPSC
7695
// -------------------------------------------------------------------------
7796

97+
/// Tier-1 dedicated channel type (MpmcRing used as SPSC when single-writer).
7898
using Channel = MpmcRing<SharedColumnBatch>;
7999

80100
// -------------------------------------------------------------------------
81101
// Construction
82102
// -------------------------------------------------------------------------
83103

104+
/// Construct an EventBus with the given options.
105+
/// @param opts Configuration controlling tier capacities and Tier-3 enablement.
84106
explicit EventBus(Options opts = {})
85107
: opts_(opts)
86108
, tier2_(std::make_unique<MpmcRing<SharedColumnBatch>>(
87109
opts.tier2_capacity)) {}
88110

89-
EventBus(EventBus&&) = delete;
90-
EventBus& operator=(EventBus&&) = delete;
111+
EventBus(EventBus&&) = delete; ///< Non-movable (contains mutex).
112+
EventBus& operator=(EventBus&&) = delete; ///< Non-movable (contains mutex).
91113
EventBus(const EventBus&) = delete;
92114
EventBus& operator=(const EventBus&) = delete;
93115
~EventBus() = default;
@@ -113,7 +135,9 @@ class EventBus {
113135
return ch;
114136
}
115137

116-
/// Look up an existing channel; returns nullptr if not found.
138+
/// Look up an existing channel by name.
139+
/// @param name Logical channel identifier.
140+
/// @return Shared pointer to the channel, or nullptr if not found.
117141
[[nodiscard]] std::shared_ptr<Channel> channel(
118142
const std::string& name) const {
119143
std::lock_guard<std::mutex> lk(channels_mutex_);
@@ -129,7 +153,8 @@ class EventBus {
129153
///
130154
/// Also forwards to Tier-3 sink (if attached and opts_.enable_tier3).
131155
///
132-
/// Returns false if the Tier-2 ring is full (caller may retry or drop).
156+
/// @param batch The column batch to publish (moved into the ring).
157+
/// @return true on success; false if the Tier-2 ring is full (caller may retry or drop).
133158
bool publish(SharedColumnBatch batch) {
134159
// Tier 3 first (cheap shared_ptr copy, async submit)
135160
if (opts_.enable_tier3 && sink_) {
@@ -147,8 +172,9 @@ class EventBus {
147172
return true;
148173
}
149174

150-
/// Pop from the shared Tier-2 ring (for worker threads).
151-
/// Returns false if the ring is empty (caller should back off / yield).
175+
/// Pop a batch from the shared Tier-2 ring (for worker threads).
176+
/// @param out Receives the popped batch on success.
177+
/// @return true if a batch was popped; false if the ring is empty (caller should back off / yield).
152178
bool pop(SharedColumnBatch& out) {
153179
return tier2_->pop(out);
154180
}
@@ -158,15 +184,21 @@ class EventBus {
158184
// =========================================================================
159185

160186
/// Attach a StreamingSink for Tier-3 durable logging.
187+
///
161188
/// The bus does NOT take ownership; the sink must outlive the bus.
189+
///
190+
/// @param sink Pointer to an externally owned StreamingSink.
162191
void attach_sink(StreamingSink* sink) {
163192
sink_ = sink;
164193
}
165194

195+
/// Detach the currently attached Tier-3 sink (no-op if none attached).
166196
void detach_sink() {
167197
sink_ = nullptr;
168198
}
169199

200+
/// Check whether a Tier-3 StreamingSink is currently attached.
201+
/// @return true if a sink is attached.
170202
[[nodiscard]] bool has_sink() const noexcept {
171203
return sink_ != nullptr;
172204
}
@@ -175,12 +207,15 @@ class EventBus {
175207
// Stats
176208
// =========================================================================
177209

210+
/// Snapshot of cumulative event bus counters.
178211
struct Stats {
179-
uint64_t published = 0;
180-
uint64_t dropped = 0; ///< Tier-2 ring full
181-
uint64_t tier3_drops = 0; ///< Tier-3 sink full / not attached
212+
uint64_t published = 0; ///< Total batches successfully enqueued to Tier-2.
213+
uint64_t dropped = 0; ///< Batches dropped because the Tier-2 ring was full.
214+
uint64_t tier3_drops = 0; ///< Batches dropped at the Tier-3 sink (full or not attached).
182215
};
183216

217+
/// Return a snapshot of the cumulative event bus statistics.
218+
/// @return Stats struct whose fields are loaded with relaxed memory ordering.
184219
[[nodiscard]] Stats stats() const noexcept {
185220
return Stats{
186221
published_.load(std::memory_order_relaxed),
@@ -189,6 +224,7 @@ class EventBus {
189224
};
190225
}
191226

227+
/// Reset all counters to zero.
192228
void reset_stats() noexcept {
193229
published_.store(0, std::memory_order_relaxed);
194230
dropped_.store(0, std::memory_order_relaxed);
@@ -199,8 +235,11 @@ class EventBus {
199235
// Introspection
200236
// =========================================================================
201237

238+
/// Approximate number of batches currently in the Tier-2 ring.
202239
[[nodiscard]] size_t tier2_size() const noexcept { return tier2_->size(); }
240+
/// Capacity of the Tier-2 MPMC ring (power-of-two, set by EventBusOptions).
203241
[[nodiscard]] size_t tier2_capacity() const noexcept { return tier2_->capacity(); }
242+
/// Number of named Tier-1 channels that have been created.
204243
[[nodiscard]] size_t num_channels() const {
205244
std::lock_guard<std::mutex> lk(channels_mutex_);
206245
return channels_.size();

0 commit comments

Comments
 (0)