Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
279 changes: 279 additions & 0 deletions src/player/statemanager/src/grpc/receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,285 @@ impl StateManagerReceiver {
}
}

#[cfg(test)]
mod tests {
use super::*;
use common::monitoringserver::ContainerList;
use common::statemanager::{ErrorCode, ResourceType, StateChange};
use tonic::Request;

#[test]
fn test_validate_state_change_and_resource_type_to_string() {
let (tx, _rx) = mpsc::channel::<ContainerList>(1);
let (tx_state_change, _rx2) = mpsc::channel::<StateChange>(1);
let receiver = StateManagerReceiver {
tx,
tx_state_change,
};

// Valid state change
let sc = StateChange {
resource_type: ResourceType::Scenario as i32,
resource_name: "res1".to_string(),
current_state: "Idle".to_string(),
target_state: "Waiting".to_string(),
transition_id: "t1".to_string(),
timestamp_ns: 1,
source: "unittest".to_string(),
};
assert!(receiver.validate_state_change(&sc).is_ok());

// Invalid timestamp
let mut sc2 = sc.clone();
sc2.timestamp_ns = 0;
assert!(receiver.validate_state_change(&sc2).is_err());

// Empty resource_name
let mut sc3 = sc.clone();
sc3.resource_name = "".to_string();
assert!(receiver.validate_state_change(&sc3).is_err());

// resource_type_to_string checks
assert_eq!(
receiver.resource_type_to_string(ResourceType::Scenario as i32),
"Scenario"
);
assert_eq!(receiver.resource_type_to_string(9999), "Unknown");
}

#[tokio::test]
async fn test_send_changed_container_list_success_and_failure() {
// Success path: receiver present
let (tx, _rx) = mpsc::channel::<ContainerList>(1);
let (tx_state_change, _rx2) = mpsc::channel::<StateChange>(1);
let receiver = StateManagerReceiver {
tx: tx.clone(),
tx_state_change: tx_state_change.clone(),
};

let cl = ContainerList {
node_name: "n1".to_string(),
containers: vec![],
};
let resp = receiver.send_changed_container_list(Request::new(cl)).await;
assert!(resp.is_ok());

// Failure path: dropped receiver for tx
let (bad_tx, bad_rx) = mpsc::channel::<ContainerList>(1);
drop(bad_rx);
let receiver2 = StateManagerReceiver {
tx: bad_tx,
tx_state_change: tx_state_change.clone(),
};
let cl2 = ContainerList {
node_name: "n2".to_string(),
containers: vec![],
};
let resp2 = receiver2
.send_changed_container_list(Request::new(cl2))
.await;
assert!(resp2.is_err());
}

#[tokio::test]
async fn test_send_changed_container_list_response_content() {
let (tx, _rx) = mpsc::channel::<ContainerList>(1);
let (tx_state_change, _rx2) = mpsc::channel::<StateChange>(1);
let receiver = StateManagerReceiver {
tx: tx.clone(),
tx_state_change: tx_state_change.clone(),
};

let cl = ContainerList {
node_name: "n1".to_string(),
containers: vec![],
};
let resp = receiver
.send_changed_container_list(Request::new(cl))
.await
.unwrap();
let body = resp.into_inner();
assert_eq!(body.resp, "Successfully processed ContainerList");

// Failure message should contain 'cannot send changed container list'
let (bad_tx, bad_rx) = mpsc::channel::<ContainerList>(1);
drop(bad_rx);
let receiver2 = StateManagerReceiver {
tx: bad_tx,
tx_state_change,
};
let cl2 = ContainerList {
node_name: "n2".to_string(),
containers: vec![],
};
let resp2 = receiver2
.send_changed_container_list(Request::new(cl2))
.await;
assert!(resp2.is_err());
let status = resp2.err().unwrap();
assert_eq!(status.code(), tonic::Code::Unavailable);
assert!(status
.message()
.contains("cannot send changed container list"));
}

#[tokio::test]
async fn test_send_state_change_success_and_unavailable() {
// Success: tx_state_change has receiver
let (tx, _rx) = mpsc::channel::<ContainerList>(1);
let (tx_state_change, mut rx_state_change) = mpsc::channel::<StateChange>(1);
let receiver = StateManagerReceiver {
tx: tx.clone(),
tx_state_change: tx_state_change.clone(),
};

let sc = StateChange {
resource_type: ResourceType::Scenario as i32,
resource_name: "res2".to_string(),
current_state: "Idle".to_string(),
target_state: "Waiting".to_string(),
transition_id: "t2".to_string(),
timestamp_ns: 1,
source: "unittest".to_string(),
};

let resp = receiver.send_state_change(Request::new(sc.clone())).await;
assert!(resp.is_ok());
let body = resp.unwrap().into_inner();
assert_eq!(body.error_code, ErrorCode::Success as i32);

// ensure message was forwarded
let forwarded = rx_state_change.recv().await;
assert!(forwarded.is_some());

// Failure: tx_state_change cannot send (receiver dropped)
let (bad_tx, bad_rx) = mpsc::channel::<StateChange>(1);
drop(bad_rx);
let receiver2 = StateManagerReceiver {
tx: tx.clone(),
tx_state_change: bad_tx,
};

let sc2 = StateChange {
transition_id: "t3".to_string(),
..sc.clone()
};
let resp2 = receiver2
.send_state_change(Request::new(sc2))
.await
.unwrap();
let inner = resp2.into_inner();
assert_eq!(inner.error_code, ErrorCode::ResourceUnavailable as i32);
}

#[tokio::test]
async fn test_send_action_returns_unavailable() {
let (tx, _rx) = mpsc::channel::<ContainerList>(1);
let (tx_state_change, _rx2) = mpsc::channel::<StateChange>(1);
let receiver = StateManagerReceiver {
tx,
tx_state_change,
};

let action = common::statemanager::Action {
action: "doit".to_string(),
};
let res = receiver.send_action(Request::new(action)).await;
assert!(res.is_err());
let status = res.err().unwrap();
assert_eq!(status.code(), tonic::Code::Unavailable);
assert_eq!(status.message(), "doit");
}

#[tokio::test]
async fn test_send_state_change_validation_failure_returns_invalid_request() {
// Create receiver; validation should fail before attempting to forward
let (tx, _rx) = mpsc::channel::<ContainerList>(1);
let (tx_state_change, _rx2) = mpsc::channel::<StateChange>(1);
let receiver = StateManagerReceiver {
tx,
tx_state_change,
};

// Build an invalid StateChange (timestamp_ns <= 0)
let sc = StateChange {
resource_type: ResourceType::Scenario as i32,
resource_name: "bad".to_string(),
current_state: "Idle".to_string(),
target_state: "Waiting".to_string(),
transition_id: "bad-tid".to_string(),
timestamp_ns: 0,
source: "unittest".to_string(),
};

let resp = receiver.send_state_change(Request::new(sc)).await;
assert!(resp.is_ok());
let inner = resp.unwrap().into_inner();
assert_eq!(inner.error_code, ErrorCode::InvalidRequest as i32);
}

#[tokio::test]
async fn test_send_state_change_invalid_resource_type_returns_invalid_request() {
let (tx, _rx) = mpsc::channel::<ContainerList>(1);
let (tx_state_change, _rx2) = mpsc::channel::<StateChange>(1);
let receiver = StateManagerReceiver {
tx,
tx_state_change,
};

let sc = StateChange {
resource_type: 9999, // invalid
resource_name: "res_invalid".to_string(),
current_state: "Idle".to_string(),
target_state: "Waiting".to_string(),
transition_id: "tid-invalid".to_string(),
timestamp_ns: 1,
source: "unittest".to_string(),
};

let resp = receiver.send_state_change(Request::new(sc)).await;
assert!(resp.is_ok());
let inner = resp.unwrap().into_inner();
assert_eq!(inner.error_code, ErrorCode::InvalidRequest as i32);
}

#[test]
fn test_resource_type_to_string_variants() {
let (tx, _rx) = mpsc::channel::<ContainerList>(1);
let (tx_state_change, _rx2) = mpsc::channel::<StateChange>(1);
let receiver = StateManagerReceiver {
tx,
tx_state_change,
};

assert_eq!(
receiver.resource_type_to_string(ResourceType::Scenario as i32),
"Scenario"
);
assert_eq!(
receiver.resource_type_to_string(ResourceType::Package as i32),
"Package"
);
assert_eq!(
receiver.resource_type_to_string(ResourceType::Model as i32),
"Model"
);
assert_eq!(
receiver.resource_type_to_string(ResourceType::Volume as i32),
"Volume"
);
assert_eq!(
receiver.resource_type_to_string(ResourceType::Network as i32),
"Network"
);
assert_eq!(
receiver.resource_type_to_string(ResourceType::Node as i32),
"Node"
);
assert_eq!(receiver.resource_type_to_string(9999), "Unknown");
}
}

// ========================================
// FUTURE IMPLEMENTATION NOTES
// ========================================
Expand Down
45 changes: 45 additions & 0 deletions src/player/statemanager/src/grpc/receiver/timpani.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,48 @@ impl FaultService for TimpaniReceiver {
Ok(Response::new(response))
}
}

#[cfg(test)]
mod tests {
use super::*;
use tonic::Request;

#[tokio::test]
async fn test_notify_fault_returns_success() {
let receiver = TimpaniReceiver::default();

// Use default FaultInfo (prost types implement Default)
let info = FaultInfo::default();

let req = Request::new(info);
let resp = receiver.notify_fault(req).await;

assert!(resp.is_ok());
let resp = resp.unwrap();
assert_eq!(resp.get_ref().status, 0);
}

#[tokio::test]
async fn test_notify_fault_concurrent_calls() {
let receiver = TimpaniReceiver::default();

// Spawn multiple concurrent notify_fault calls to ensure no panics and consistent responses
let mut handles = Vec::new();
for _ in 0..8 {
handles.push(tokio::spawn(async move {
let r = TimpaniReceiver::default();
let info = FaultInfo::default();
let req = Request::new(info);
let res = r.notify_fault(req).await;
res
}));
}

for h in handles {
let res = h.await.expect("task panicked");
assert!(res.is_ok());
let out = res.unwrap();
assert_eq!(out.get_ref().status, 0);
}
}
}
33 changes: 33 additions & 0 deletions src/player/statemanager/src/grpc/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,44 @@ use common::actioncontroller::{
action_controller_connection_client::ActionControllerConnectionClient, connect_server,
ReconcileRequest, ReconcileResponse,
};
use std::env;
use tonic::{Request, Response, Status};

pub async fn _send(condition: ReconcileRequest) -> Result<Response<ReconcileResponse>, Status> {
// Test mode bypass: return a fake successful response when env var is set
if env::var("PULLPIRI_TEST_MODE").is_ok() {
let resp = ReconcileResponse {
status: 0,
desc: "mock".to_string(),
};
return Ok(Response::new(resp));
}
let mut client = ActionControllerConnectionClient::connect(connect_server())
.await
.unwrap();
client.reconcile(Request::new(condition)).await
}

#[cfg(test)]
mod tests {
use super::*;
use std::env;

#[tokio::test]
async fn test_send_in_test_mode_returns_mock_response() {
env::set_var("PULLPIRI_TEST_MODE", "1");

let req = ReconcileRequest {
scenario_name: "s1".to_string(),
current: 0,
desired: 0,
};

let res = _send(req).await;
assert!(res.is_ok());
let r = res.unwrap();
assert_eq!(r.get_ref().status, 0);

env::remove_var("PULLPIRI_TEST_MODE");
}
}
Loading
Loading