Skip to content

Commit 51a71eb

Browse files
Improbe trace data decoder in case of corrupted messages
1 parent 937c7b2 commit 51a71eb

File tree

1 file changed

+69
-39
lines changed

1 file changed

+69
-39
lines changed

rustmeter-cli/src/tracing/trace_data_decoder.rs

Lines changed: 69 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -53,59 +53,69 @@ impl TraceDataDecoder {
5353
self.internal_buffer.extend(data);
5454
}
5555

56-
pub fn decode(&mut self) -> anyhow::Result<Vec<TracingItem>> {
57-
// Check if we have enough data for a header (TODO: improve this check by peeking)
58-
if self.internal_buffer.len() < 100 {
59-
return Ok(vec![]);
60-
}
61-
56+
/// Decode a single tracing item from the internal buffer, advance
57+
/// the timestamp and drain the read bytes from the buffer.
58+
fn decode_single(&mut self) -> Option<TracingItem> {
6259
// Prepare monitor type lookup function
6360
let monitors = self.monitors.clone();
6461
let monitor_type_fn = move |monitor_id: u8| -> Option<u8> {
6562
monitors.lock().unwrap().get(&monitor_id).cloned()
6663
};
6764

68-
// TODO: Optimize decoding loop to avoid reallocations
69-
// TODO: Check when decoding failed to go to next byte instead of stopping (message is corrupted)
70-
71-
// Try to decode some bytes
65+
// Create buffer reader
7266
self.internal_buffer.make_contiguous();
7367
let mut buffer = BufferReader::new(self.internal_buffer.as_slices().0);
74-
let mut items = vec![];
75-
loop {
76-
match read_tracing_event(&mut buffer, &monitor_type_fn) {
77-
Some((timedelta, payload)) => {
78-
// Advance the timestamp
79-
let timestamp = self.last_timestamp
80-
+ Duration::from_micros(timedelta.get_delta_us() as u64);
81-
self.last_timestamp = timestamp;
82-
83-
// Check for monitor registration events
84-
if let EventPayload::TypeDefinition(definition) = &payload {
85-
if let TypeDefinitionPayload::ValueMonitor {
86-
type_id, value_id, ..
87-
} = definition
88-
{
89-
let mut monitors = self.monitors.lock().unwrap();
90-
monitors.insert(*value_id, *type_id);
91-
}
92-
}
93-
94-
// Store the item
95-
items.push(TracingItem::new(timestamp, payload));
68+
69+
if let Some((timedelta, payload)) = read_tracing_event(&mut buffer, &monitor_type_fn) {
70+
// Advance the timestamp
71+
let timestamp =
72+
self.last_timestamp + Duration::from_micros(timedelta.get_delta_us() as u64);
73+
self.last_timestamp = timestamp;
74+
75+
// Check for monitor registration events
76+
if let EventPayload::TypeDefinition(definition) = &payload {
77+
if let TypeDefinitionPayload::ValueMonitor {
78+
type_id, value_id, ..
79+
} = definition
80+
{
81+
let mut monitors = self.monitors.lock().unwrap();
82+
monitors.insert(*value_id, *type_id);
9683
}
97-
None => break,
9884
}
9985

100-
// Check if we have enough data for a header (TODO: improve this check by peeking)
101-
if self.internal_buffer.len() - buffer.get_position() < 100 {
102-
break;
86+
// Remove the already read bytes from the internal buffer
87+
let read_bytes = buffer.get_position();
88+
self.internal_buffer.drain(0..read_bytes);
89+
90+
return Some(TracingItem::new(timestamp, payload));
91+
}
92+
93+
None
94+
}
95+
96+
/// Decode all available tracing items from the internal buffer. If no
97+
/// items could be decoded, but the buffer has significant data, it will
98+
/// try to recover by removing bytes from the start of the buffer until
99+
/// valid data is found.
100+
pub fn decode(&mut self) -> anyhow::Result<Vec<TracingItem>> {
101+
// Decode all available items
102+
let mut items = vec![];
103+
while let Some(item) = self.decode_single() {
104+
items.push(item);
105+
}
106+
107+
// Check for corrupted data (buffer cannot read any item, but it has significant data)
108+
// 32 bytes is buffer size on target side
109+
while items.is_empty() && self.internal_buffer.len() > 32 {
110+
// Clear first byte till recovering
111+
self.internal_buffer.pop_front();
112+
113+
// Try decoding again
114+
while let Some(item) = self.decode_single() {
115+
items.push(item);
103116
}
104117
}
105118

106-
// Remove the already read bytes from the internal buffer
107-
let read_bytes = buffer.get_position();
108-
self.internal_buffer.drain(0..read_bytes);
109119
Ok(items)
110120
}
111121
}
@@ -261,4 +271,24 @@ mod tests {
261271

262272
test_trace_data_decoder_continuius();
263273
}
274+
275+
#[test]
276+
pub fn test_trace_data_decoder_empty() {
277+
let mut decoder = TraceDataDecoder::new();
278+
let decoded_items = decoder.decode().unwrap();
279+
assert_eq!(decoded_items.len(), 0);
280+
}
281+
282+
#[test]
283+
fn test_trace_data_decoder() {
284+
test_trace_data_decoder_sequence();
285+
286+
// Reset RTT channel
287+
{
288+
let (_sender, receiver) = &*RTT_CHANNEL;
289+
while receiver.try_recv().is_ok() {}
290+
}
291+
292+
test_trace_data_decoder_continuius();
293+
}
264294
}

0 commit comments

Comments
 (0)