Skip to content

Commit fd6cbf6

Browse files
authored
fix(server): retry sandbox delete phase conflicts (NVIDIA#1905)
Signed-off-by: Taylor Mutch <taylormutch@gmail.com>
1 parent 294c64e commit fd6cbf6

1 file changed

Lines changed: 163 additions & 11 deletions

File tree

  • crates/openshell-server/src/compute

crates/openshell-server/src/compute/mod.rs

Lines changed: 163 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,8 @@ type DriverWatchStream = Pin<Box<dyn Stream<Item = Result<WatchSandboxesEvent, S
5151
/// `Arc`-wrapped, `Send + Sync` trait object for a `ComputeDriver` whose
/// `WatchSandboxesStream` associated type is `DriverWatchStream`.
type SharedComputeDriver =
5252
Arc<dyn ComputeDriver<WatchSandboxesStream = DriverWatchStream> + Send + Sync>;
5353

54+
/// Maximum number of attempts made when writing the `Deleting` phase for a
/// sandbox; a version conflict on the last attempt is returned to the caller.
const DELETE_PHASE_CAS_RETRY_LIMIT: usize = 3;
55+
5456
#[tonic::async_trait]
5557
trait ShutdownCleanup: Send + Sync {
5658
async fn cleanup_on_shutdown(&self) -> Result<(), String>;
@@ -540,17 +542,7 @@ impl ComputeRuntime {
540542

541543
let id = sandbox.object_id().to_string();
542544

543-
// Use CAS to set phase to Deleting
544-
// TODO: Accept expected_version from DeleteSandboxRequest for proper client-driven CAS
545-
let sandbox = self
546-
.store
547-
.update_message_cas::<Sandbox, _>(&id, 0, |s| {
548-
s.set_phase(SandboxPhase::Deleting as i32);
549-
})
550-
.await
551-
.map_err(|e| {
552-
crate::grpc::persistence_error_to_status(e, "set sandbox phase to Deleting")
553-
})?;
545+
let sandbox = self.set_sandbox_phase_deleting_with_retry(&id).await?;
554546

555547
self.sandbox_index.update_from_sandbox(&sandbox);
556548
self.sandbox_watch_bus.notify(&id);
@@ -574,6 +566,114 @@ impl ComputeRuntime {
574566
Ok(deleted)
575567
}
576568

569+
/// Moves the sandbox identified by `sandbox_id` into the `Deleting` phase.
///
/// Delegates to `set_sandbox_phase_deleting_with_initial_snapshot` with no
/// initial snapshot, so the first attempt reads the sandbox from the store.
/// Returns the updated `Sandbox`, or the `Status` produced by that helper.
async fn set_sandbox_phase_deleting_with_retry(
570+
&self,
571+
sandbox_id: &str,
572+
) -> Result<Sandbox, Status> {
573+
self.set_sandbox_phase_deleting_with_initial_snapshot(sandbox_id, None)
574+
.await
575+
}
576+
577+
/// Sets the sandbox's phase to `Deleting`, retrying when the conditional
/// write reports a resource-version conflict.
///
/// Up to `DELETE_PHASE_CAS_RETRY_LIMIT` attempts are made. The first attempt
/// uses `initial_snapshot` when one is supplied; every attempt without a
/// snapshot re-reads the sandbox from the store.
///
/// # Errors
///
/// * `Status::internal` when reading the sandbox from the store fails.
/// * `Status::not_found` when the store has no sandbox for `sandbox_id`.
/// * The status built by `persistence_error_to_status` for any non-conflict
///   write error, or for a conflict on the final attempt.
async fn set_sandbox_phase_deleting_with_initial_snapshot(
578+
&self,
579+
sandbox_id: &str,
580+
mut initial_snapshot: Option<Sandbox>,
581+
) -> Result<Sandbox, Status> {
582+
// Operation label handed to `persistence_error_to_status` on failure.
let operation = "set sandbox phase to Deleting";
583+
584+
for attempt in 1..=DELETE_PHASE_CAS_RETRY_LIMIT {
585+
// `take()` leaves `None` behind, so a caller-supplied snapshot is used at
// most once; later iterations fall through to the store read.
let sandbox = match initial_snapshot.take() {
586+
Some(sandbox) => sandbox,
587+
None => self
588+
.store
589+
.get_message::<Sandbox>(sandbox_id)
590+
.await
591+
.map_err(|e| Status::internal(format!("fetch sandbox failed: {e}")))?
592+
.ok_or_else(|| Status::not_found("sandbox not found"))?,
593+
};
594+
595+
match self
596+
.write_sandbox_phase_deleting_from_snapshot(sandbox)
597+
.await
598+
{
599+
Ok(sandbox) => {
600+
// Only log when at least one earlier attempt conflicted.
if attempt > 1 {
601+
debug!(
602+
sandbox_id,
603+
attempt, "Retried sandbox delete phase transition after CAS conflict"
604+
);
605+
}
606+
return Ok(sandbox);
607+
}
608+
Err(crate::persistence::PersistenceError::Conflict {
609+
current_resource_version,
610+
}) => {
611+
// The pattern destructured the error, so rebuild it to hand to the
// status conversion if this was the last attempt.
let err = crate::persistence::PersistenceError::Conflict {
612+
current_resource_version,
613+
};
614+
if attempt == DELETE_PHASE_CAS_RETRY_LIMIT {
615+
return Err(crate::grpc::persistence_error_to_status(err, operation));
616+
}
617+
debug!(
618+
sandbox_id,
619+
attempt,
620+
current_resource_version,
621+
"Sandbox delete phase transition conflicted; retrying"
622+
);
623+
// Yield to the runtime before the next iteration re-reads the sandbox.
tokio::task::yield_now().await;
624+
}
625+
// Any other persistence error is returned immediately without retrying.
Err(err) => return Err(crate::grpc::persistence_error_to_status(err, operation)),
626+
}
627+
}
628+
629+
// Every iteration returns except a conflict before the final attempt, and
// the final attempt returns on conflict too, so the loop never falls through.
unreachable!("delete phase retry loop always returns")
630+
}
631+
632+
/// Writes `sandbox` back to the store with its phase set to `Deleting`,
/// conditioned on the resource version carried by the snapshot.
///
/// The expected version comes from the snapshot's metadata, or 0 when the
/// snapshot has no metadata. Labels are serialized to JSON only when the
/// metadata holds a non-empty label set. On success the returned sandbox's
/// metadata (if present) carries the resource version reported by the store.
///
/// # Errors
///
/// Returns `PersistenceError::Encode` when label serialization fails and
/// propagates any error from `put_if`.
// NOTE(review): the retry caller matches on `PersistenceError::Conflict`, so
// `put_if` presumably returns it when the stored version differs — confirm.
async fn write_sandbox_phase_deleting_from_snapshot(
633+
&self,
634+
mut sandbox: Sandbox,
635+
) -> crate::persistence::PersistenceResult<Sandbox> {
636+
let id = sandbox.object_id().to_string();
637+
let name = sandbox.object_name().to_string();
638+
// Capture the version the snapshot was read at, before mutating it.
let expected_resource_version = sandbox
639+
.metadata
640+
.as_ref()
641+
.map_or(0, |metadata| metadata.resource_version);
642+
643+
sandbox.set_phase(SandboxPhase::Deleting as i32);
644+
645+
// `None` when there is no metadata or the label map is empty; otherwise the
// labels encoded as a JSON string.
let labels_json = sandbox
646+
.metadata
647+
.as_ref()
648+
.map(|metadata| &metadata.labels)
649+
.filter(|labels| !labels.is_empty())
650+
.map(serde_json::to_string)
651+
.transpose()
652+
.map_err(|e| {
653+
crate::persistence::PersistenceError::Encode(format!(
654+
"failed to serialize labels: {e}"
655+
))
656+
})?;
657+
658+
let result = self
659+
.store
660+
.put_if(
661+
Sandbox::object_type(),
662+
&id,
663+
&name,
664+
&sandbox.encode_to_vec(),
665+
labels_json.as_deref(),
666+
WriteCondition::MatchResourceVersion(expected_resource_version),
667+
)
668+
.await?;
669+
670+
// Reflect the version reported by the store in the value handed back.
if let Some(metadata) = sandbox.metadata.as_mut() {
671+
metadata.resource_version = result.resource_version;
672+
}
673+
674+
Ok(sandbox)
675+
}
676+
577677
pub fn spawn_watchers(&self, shutdown_rx: watch::Receiver<bool>) {
578678
let runtime = Arc::new(self.clone());
579679
if self.store.is_single_replica() {
@@ -2536,6 +2636,58 @@ mod tests {
25362636
));
25372637
}
25382638

2639+
// Starts the delete-phase transition from a snapshot read before another
// update, then checks that the result is in `Deleting` and still carries that
// intervening update (policy version 7), both in the returned value and in
// the store.
#[tokio::test]
2640+
async fn set_sandbox_phase_deleting_retries_after_stale_snapshot_conflict() {
2641+
let runtime = test_runtime(Arc::new(TestDriver::default())).await;
2642+
let sandbox = sandbox_record("sb-1", "sandbox-a", SandboxPhase::Ready);
2643+
runtime.store.put_message(&sandbox).await.unwrap();
2644+
2645+
// Snapshot read before the update below, so it becomes stale.
let stale_snapshot = runtime
2646+
.store
2647+
.get_message::<Sandbox>("sb-1")
2648+
.await
2649+
.unwrap()
2650+
.unwrap();
2651+
2652+
// Update the stored record after the snapshot was read.
runtime
2653+
.store
2654+
.update_message_cas::<Sandbox, _>("sb-1", 0, |sandbox| {
2655+
sandbox.set_current_policy_version(7);
2656+
})
2657+
.await
2658+
.unwrap();
2659+
2660+
let updated = runtime
2661+
.set_sandbox_phase_deleting_with_initial_snapshot("sb-1", Some(stale_snapshot))
2662+
.await
2663+
.unwrap();
2664+
2665+
assert_eq!(
2666+
SandboxPhase::try_from(updated.phase()).unwrap(),
2667+
SandboxPhase::Deleting
2668+
);
2669+
// The intervening policy update must survive the phase transition.
assert_eq!(updated.current_policy_version(), 7);
2670+
// NOTE(review): 3 presumably counts the initial put, the policy update, and
// the Deleting write — confirm against the store's versioning scheme.
assert_eq!(
2671+
updated
2672+
.metadata
2673+
.as_ref()
2674+
.map_or(0, |metadata| metadata.resource_version),
2675+
3
2676+
);
2677+
2678+
// Re-read from the store to check the persisted record as well.
let stored = runtime
2679+
.store
2680+
.get_message::<Sandbox>("sb-1")
2681+
.await
2682+
.unwrap()
2683+
.unwrap();
2684+
assert_eq!(
2685+
SandboxPhase::try_from(stored.phase()).unwrap(),
2686+
SandboxPhase::Deleting
2687+
);
2688+
assert_eq!(stored.current_policy_version(), 7);
2689+
}
2690+
25392691
#[tokio::test]
25402692
async fn apply_sandbox_update_allows_delete_failures_to_recover() {
25412693
let runtime = test_runtime(Arc::new(TestDriver::default())).await;

0 commit comments

Comments
 (0)