Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 77 additions & 49 deletions score/mw/com/example/com-api-example/basic-consumer-producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,33 +328,25 @@ mod test {
for attempt in 0..MAX_ATTEMPTS {
println!("[RECEIVER] Attempt {}", attempt);

// Match and immediately reassign in all branches
sample_buf = match subscribed.receive(sample_buf, 1, 3).await {
Ok(returned_buf) => {
let count = returned_buf.sample_count();

if count > 0 {
total_received += count;
println!(
"[RECEIVER] Received {} samples (total: {})",
count, total_received
);

// Create a mutable version to pop from
let mut buf = returned_buf;
while let Some(sample) = buf.pop_front() {
println!("[RECEIVER] Sample: {:.2} psi", sample.pressure);
}
buf
} else {
returned_buf
}
}
Err(e) => {
// Destructure tuple - container is always returned even on error
let (returned_buf, result) = subscribed.receive(sample_buf, 1, 3).await;
sample_buf = {
let count = returned_buf.sample_count();
if let Err(e) = result {
println!("[RECEIVER] Error on attempt {}: {:?}", attempt, e);
// Create a fresh buffer if there's an error
SampleContainer::new(5)
} else if count > 0 {
total_received += count;
println!(
"[RECEIVER] Received {} samples (total: {})",
count, total_received
);
}
// Drain printed samples
let mut buf = returned_buf;
while let Some(sample) = buf.pop_front() {
println!("[RECEIVER] Sample: {:.2} psi", sample.pressure);
}
buf
};
}

Expand All @@ -376,7 +368,7 @@ mod test {
let uninit_sample = match offered_producer.left_tire.allocate() {
Ok(sample) => sample,
Err(e) => {
eprintln!("Failed to allocate sample: {:?}", e);
eprintln!("[SENDER] Failed to allocate sample: {:?}", e);
continue;
}
};
Expand All @@ -385,39 +377,38 @@ mod test {
});
match sample.send() {
Ok(_) => (),
Err(e) => eprintln!("Failed to send sample: {:?}", e),
Err(e) => eprintln!("[SENDER] Failed to send sample: {:?}", e),
}
println!("Sent sample with pressure: {}", 1.0 + i as f32);
println!("[SENDER] Sent sample with pressure: {}", 1.0 + i as f32);
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
}
offered_producer
}

//receiver function which use async receive to get data, it waits for new data and process it once it arrives,
//it will receive data 10 times and print the received samples
async fn async_data_processor_fn<R: Runtime>(subscribed: impl Subscription<Tire, R>) {
async fn async_data_processor_fn<R: Runtime>(
subscribed: impl Subscription<Tire, R>,
is_timeout: bool,
) {
println!("[RECEIVER] Async data processor started");
let mut buffer = SampleContainer::new(5);
for _ in 0..5 {
buffer = match subscribed.receive(buffer, 2, 3).await {
Ok(returned_buf) => {
let count = returned_buf.sample_count();
if count > 0 {
println!("[RECEIVER] Received {} samples", count);
let mut buf = returned_buf;
while let Some(sample) = buf.pop_front() {
println!("[RECEIVER] Sample: {:.2} psi", sample.pressure);
}
buf
} else {
returned_buf
}
}
Err(e) => {
println!("[RECEIVER] Error receiving data: {:?}", e);
SampleContainer::new(5)
}
let (returned_buf, result) = if is_timeout {
let timeout = tokio::time::sleep(Duration::from_millis(1000));
subscribed.receive_timeout(buffer, 2, 3, timeout).await
} else {
subscribed.receive(buffer, 2, 3).await
};
match result {
Ok(count) => println!("[RECEIVER] Received {} samples", count),
Err(e) => eprintln!("[RECEIVER] Failed to receive samples: {:?}", e),
}
let mut buf = returned_buf;
while let Some(sample) = buf.pop_front() {
println!("[RECEIVER] Sample: {:.2} psi", sample.pressure);
}
buffer = buf;
}
}

Expand All @@ -444,8 +435,45 @@ mod test {
let consumer = consumer.await.expect("Failed to create consumer");
// Subscribe to one event
let subscribed = consumer.left_tire.subscribe(5).unwrap();
// Spawn async data processor
let processor_join_handle = tokio::spawn(async_data_processor_fn(subscribed));

let processor_join_handle = tokio::spawn(async_data_processor_fn(subscribed, false));
processor_join_handle
.await
.expect("Error returned from task");
let producer = sender_join_handle.await.expect("Error returned from task");

match producer.unoffer() {
Ok(_) => println!("Successfully unoffered the service"),
Err(e) => eprintln!("Failed to unoffer: {:?}", e),
}

println!("=== Async subscription test with Lola runtime completed ===\n");
}

#[tokio::test(flavor = "multi_thread")]
async fn receive_with_timeout_and_send_using_multi_thread() {
println!("Starting async subscription test with Lola runtime");
//Intentionally using service instance of test1, if you face issue add new service instance in config file and use it here.
let service_id = InstanceSpecifier::new("/Vehicle/Service1/Instance")
.expect("Failed to create InstanceSpecifier");
let service_id_clone = service_id.clone();
//consumer create
let consumer_runtime = get_test_runtime();
//starting service discovery in async way, so that it can be discovered when producer offer service after some delay, and consumer is waiting for discovery result
let consumer = tokio::spawn(create_consumer_async(consumer_runtime, service_id));
//simulate some delay before producer offer service, so that consumer is waiting for discovery
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
//Producer create
let producer_runtime = get_test_runtime();
let producer = create_producer(producer_runtime, service_id_clone);
// Spawn async data sender
let sender_join_handle = tokio::spawn(async_data_sender_fn(producer));
// Await consumer creation and subscribe to events
let consumer = consumer.await.expect("Failed to create consumer");
// Subscribe to one event
let subscribed = consumer.left_tire.subscribe(5).unwrap();

let processor_join_handle = tokio::spawn(async_data_processor_fn(subscribed, true));
processor_join_handle
.await
.expect("Error returned from task");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ impl InstanceSpecifier {
}

// Remove the single leading slash
let service_name = service_name.strip_prefix('/').unwrap();
let Some(service_name) = service_name.strip_prefix('/') else {
return false;
};

// Check each character
// Allowed: digits, lowercase, uppercase, underscore
Expand Down Expand Up @@ -800,7 +802,7 @@ pub trait Subscription<T: CommData + Debug, R: Runtime + ?Sized> {
/// # Parameters
/// * `scratch` - Container for events from this subscription; must not be reused across
/// different subscriptions
/// * `max_samples` - Maximum number of events to transfer
/// * `max_samples` - Maximum number of events to transfer, 0 value is treated as error
///
/// # Returns
///
Expand All @@ -823,13 +825,17 @@ pub trait Subscription<T: CommData + Debug, R: Runtime + ?Sized> {
/// TODO: See above for C++ limitations.
/// # Parameters
/// * `scratch` - Container for events from this subscription
/// * `new_samples` - Minimum number of new events before resolution
/// * `new_samples` - Minimum number of new events before resolution, 0 value is treated as error
/// * `max_samples` - Maximum number of events that shall be received from the communication
/// buffer and transferred to the container
/// buffer and transferred to the container, 0 value is treated as error
///
/// # Returns
/// Future that resolves to the number of newly added events to the container with at least
/// `new_samples` number of new events.
/// Future that resolves to `(SampleContainer<Self::Sample<'a>>, Result<usize>)`.
/// SampleContainer is **always** returned (even on error or cancellation)
/// Success case return contains the number of newly added samples to the container with
/// number of samples.
/// Failure case return the SampleContainer which user passed to the function and
/// an 'Error' indicating the reason for failure of the receive operation.
///
/// # Important Notes
/// User can not concurrenly call `receive` on the same subscription instance from
Expand All @@ -844,12 +850,56 @@ pub trait Subscription<T: CommData + Debug, R: Runtime + ?Sized> {
// The `Future` cannot have a `'static` lifetime. If we enforced `'static`, then `self` would
// also need to be `'static`, which is not semantically correct for this use case.
// Multiple threads cannot concurrently read the same event from a single subscription.
#[allow(clippy::manual_async_fn)]
fn receive<'a>(
&'a self,
scratch: SampleContainer<Self::Sample<'a>>,
new_samples: usize,
max_samples: usize,
) -> impl Future<Output = Result<SampleContainer<Self::Sample<'a>>>> + 'a;
) -> impl Future<Output = (SampleContainer<Self::Sample<'a>>, Result<usize>)> + 'a {
self.receive_timeout(
scratch,
new_samples,
max_samples,
core::future::pending::<()>(),
)
}

/// This method is extension of `receive` with timeout support.
/// It returns a future that resolves as soon as at least `new_samples` samples have been transferred
/// from the communication buffer to the sample container or the `timeout` future resolves.
///
/// # Parameters
/// * `scratch` - Container for events from this subscription
/// * `new_samples` - Minimum number of new events before resolution, 0 value is treated as error
/// * `max_samples` - Maximum number of events that shall be received from the communication
/// buffer and transferred to the container, 0 value is treated as error
/// * `timeout` - Future that resolves when the receive operation should time out. If the timeout
/// future resolves before the required number of samples are received, the receive operation is
/// considered to have timed out.
///
/// # Returns
/// Future that resolves to `(SampleContainer<Self::Sample<'a>>, Result<usize>)`.
/// SampleContainer is **always** returned (even on error or cancellation)
/// Success case return contains the number of newly added samples to the container with
/// number of samples.
/// Failure case return the SampleContainer which user passed to the function and
/// an 'Error' indicating the reason for failure of the receive operation.
///
/// # Important Notes
/// User can not concurrenly call `receive_timeout` on the same subscription instance from
/// multiple threads or tasks.
/// `timeout` must be `'static` because timeout futures (e.g. `tokio::time::sleep`) own all
/// their state and do not borrow anything from the caller's scope.
/// And if you change to `'a` lifetime then it create lifetime bound not satisfied
/// error from rust (see issue <https://github.com/rust-lang/rust/issues/100013> for more information)
fn receive_timeout<'a>(
&'a self,
scratch: SampleContainer<Self::Sample<'a>>,
new_samples: usize,
max_samples: usize,
timeout: impl Future<Output = ()> + Send + 'static,
) -> impl Future<Output = (SampleContainer<Self::Sample<'a>>, Result<usize>)> + 'a;
}

/// A trait for types that can be default-constructed in place, skipping intermediate moves.
Expand Down
4 changes: 4 additions & 0 deletions score/mw/com/impl/rust/com-api/com-api-concept/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ pub enum ReceiveFailedReason {
BufferUnavailable,
#[error("Sample size out of bounds, expected at most {max}, but got {requested}")]
SampleCountOutOfBounds { max: usize, requested: usize },
#[error("Receive operation was cancelled or timed out")]
Cancelled,
#[error("Input value out of bounds, maximum sample {max}, but new sample is {requested}")]
InputValueOutOfBounds { max: usize, requested: usize },
}

/// Comprehensive error reasons for event-related failures
Expand Down
Loading
Loading