diff --git a/CHANGELOG.md b/CHANGELOG.md index 034a0ccd..7cb1f015 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,8 +14,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - ### Fixed -- +- **Postgres lifecycle waits for usable databases**: managed Postgres + container startup now waits for real `psql SELECT 1` connectivity, creates + the configured database if the cluster is alive but it is absent, and + includes container logs on readiness failures. This prevents major-upgrade + restores from treating `pg_isready` as sufficient before the target database + exists. +- **Postgres major upgrades require a clean live volume**: the upgrade + snapshot and rollback phases now remove the temporary copy sidecar before + deleting the old live data volume, fail if Docker still cannot remove that + volume, and wait for the final healthy Postgres container before creating + the target database, preventing Postgres 18+ containers from reopening stale + 17-era PGDATA after a supposedly clean handoff. +### Tests +- **Postgres upgrade Docker tests use real backup FK rows**: + `postgres_upgrade` integration fixtures now insert a matching `backups` row + for the fake pre-upgrade backup id, keeping the existing rollback/upgrade + assertions valid after FK enforcement. ## [0.1.0-beta.38] - 2026-06-23 diff --git a/crates/temps-providers/src/externalsvc/postgres_upgrade.rs b/crates/temps-providers/src/externalsvc/postgres_upgrade.rs index b0edbe1a..0aa38b66 100644 --- a/crates/temps-providers/src/externalsvc/postgres_upgrade.rs +++ b/crates/temps-providers/src/externalsvc/postgres_upgrade.rs @@ -662,8 +662,15 @@ impl PostgresUpgradeOrchestrator { self.copy_volume(row, &source_volume, &rollback_volume) .await?; - // Remove the original — new_container phase will recreate it empty. - self.remove_volume_best_effort(&source_volume).await; + // Remove the original. 
The new_container phase must start from an + // empty live volume; reusing old-version PGDATA breaks Postgres 18+. + self.remove_volume_or_fail(&source_volume) + .await + .map_err(|reason| PostgresUpgradeError::SnapshotFailed { + upgrade_id: row.id, + service_id: row.service_id, + reason: format!("remove old live volume '{}': {}", source_volume, reason), + })?; let expires_at = chrono::Utc::now() + chrono::Duration::days(ROLLBACK_RETENTION_DAYS); let current = self.load_upgrade(row.id).await?; @@ -1235,7 +1242,7 @@ impl PostgresUpgradeOrchestrator { read_only: Some(true), ..Default::default() }]), - auto_remove: Some(true), + auto_remove: Some(false), ..Default::default() }), ..Default::default() @@ -1500,6 +1507,8 @@ impl PostgresUpgradeOrchestrator { dest: &str, ) -> Result<(), PostgresUpgradeError> { use futures::TryStreamExt; + use std::time::{Duration, Instant}; + let copy_container_name = format!( "temps_pg_upgrade_{}_copy_{}", row.id, @@ -1564,7 +1573,8 @@ impl PostgresUpgradeOrchestrator { reason: format!("start copy container: {}", e), })?; - self.docker + let wait_result = self + .docker .wait_container( &created.id, None::, @@ -1577,6 +1587,82 @@ impl PostgresUpgradeOrchestrator { reason: format!("wait copy container: {}", e), })?; + let remove_result = self + .docker + .remove_container( + &created.id, + Some(bollard::query_parameters::RemoveContainerOptions { + v: true, + force: true, + ..Default::default() + }), + ) + .await; + + if let Err(e) = remove_result { + let msg = e.to_string(); + if !msg.contains("removal of container") || !msg.contains("is already in progress") { + return Err(PostgresUpgradeError::SnapshotFailed { + upgrade_id: row.id, + service_id: row.service_id, + reason: format!("remove copy container: {}", msg), + }); + } + } + + let deadline = Instant::now() + Duration::from_secs(30); + loop { + match self + .docker + .inspect_container( + &created.id, + None::, + ) + .await + { + Ok(_) if Instant::now() < deadline => { + 
tokio::time::sleep(Duration::from_millis(250)).await; + } + Ok(_) => { + return Err(PostgresUpgradeError::SnapshotFailed { + upgrade_id: row.id, + service_id: row.service_id, + reason: format!( + "copy container '{}' still exists after removal timeout", + created.id + ), + }); + } + Err(e) => { + let msg = e.to_string(); + if msg.contains("No such container") + || msg.contains("no such container") + || msg.contains("not found") + || msg.contains("404") + { + break; + } + return Err(PostgresUpgradeError::SnapshotFailed { + upgrade_id: row.id, + service_id: row.service_id, + reason: format!("inspect copy container after removal: {}", msg), + }); + } + } + } + + if let Some(nonzero) = wait_result + .iter() + .map(|status| status.status_code) + .find(|status_code| *status_code != 0) + { + return Err(PostgresUpgradeError::SnapshotFailed { + upgrade_id: row.id, + service_id: row.service_id, + reason: format!("copy container exited with status {}", nonzero), + }); + } + Ok(()) } @@ -1593,6 +1679,26 @@ impl PostgresUpgradeOrchestrator { .await; } + async fn remove_volume_or_fail(&self, volume_name: &str) -> Result<(), String> { + self.docker + .remove_volume( + volume_name, + Some(bollard::query_parameters::RemoveVolumeOptions { force: true }), + ) + .await + .map_err(|e| e.to_string()) + .or_else(|reason| { + if reason.contains("No such volume") + || reason.contains("not found") + || reason.contains("404") + { + Ok(()) + } else { + Err(reason) + } + }) + } + // ---- Rollback volume retention ------------------------------------- /// Sweep rollback volumes whose 7-day retention has expired. @@ -1770,8 +1876,15 @@ impl PostgresUpgradeOrchestrator { // 2. Remove the live volume so the copy produces a clean replica of // the rollback volume (any lingering new-version WAL/catalog is - // discarded). `remove_volume_best_effort` tolerates absence. - self.remove_volume_best_effort(&live_volume).await; + // discarded). 
This is required; overlaying rollback data onto a + // dirty live volume can leave incompatible catalog/WAL files. + self.remove_volume_or_fail(&live_volume) + .await + .map_err(|reason| PostgresUpgradeError::RollbackFailed { + upgrade_id, + service_id: row.service_id, + reason: format!("remove live volume '{}': {}", live_volume, reason), + })?; self.create_volume_if_missing(&row, &live_volume).await?; self.copy_volume(&row, &rollback_volume, &live_volume) .await @@ -2790,14 +2903,16 @@ mod tests { use sea_orm::{ActiveModelTrait, ActiveValue::Set, DatabaseConnection, EntityTrait}; use serde_json::json; use std::sync::{ - atomic::{AtomicU64, Ordering}, + atomic::{AtomicI32, AtomicU64, Ordering}, Arc, Mutex, }; use std::time::{Duration, Instant}; use tempfile::TempDir; use temps_core::EncryptionService; use temps_database::test_utils::TestDatabase; - use temps_entities::{external_services, postgres_major_upgrades, users}; + use temps_entities::{ + backups, external_services, postgres_major_upgrades, s3_sources, users, + }; use temps_logs::LogService; use crate::postgres_lifecycle::PostgresLifecycleAdapter; @@ -3105,35 +3220,101 @@ mod tests { } } - /// Stub `PreUpgradeBackupProvider` — always returns a synthetic - /// backup id (42) without touching S3. The orchestrator only reads - /// this id back onto the row; no downstream check validates it. + /// Stub `PreUpgradeBackupProvider` — creates a real control-plane + /// backup row without touching S3. The orchestrator stores the + /// returned id on `postgres_major_upgrades.pre_upgrade_backup_id`, + /// which has a real FK to `backups.id` in the test database. 
pub struct StubBackupProvider { + db: Arc, pub calls: Arc, + pub last_backup_id: Arc, } impl StubBackupProvider { - pub fn new() -> Self { + pub fn new(db: Arc) -> Self { Self { + db, calls: Arc::new(AtomicU64::new(0)), + last_backup_id: Arc::new(AtomicI32::new(0)), } } } #[async_trait] impl PreUpgradeBackupProvider for StubBackupProvider { - async fn default_s3_source_id(&self, _service_id: i32) -> Result, String> { - Ok(Some(1)) + async fn default_s3_source_id(&self, service_id: i32) -> Result, String> { + let now = chrono::Utc::now(); + let source = s3_sources::ActiveModel { + id: sea_orm::NotSet, + name: Set(format!( + "upgrade-test-s3-{}-{}", + service_id, + now.timestamp_nanos_opt().unwrap_or(0) + )), + bucket_name: Set("upgrade-test-bucket".to_string()), + region: Set("us-east-1".to_string()), + endpoint: Set(Some("http://127.0.0.1:9000".to_string())), + bucket_path: Set("postgres-upgrade-tests".to_string()), + access_key_id: Set("test-access-key".to_string()), + secret_key: Set("test-secret-key".to_string()), + force_path_style: Set(Some(true)), + is_default: Set(true), + created_at: Set(now), + updated_at: Set(now), + } + .insert(self.db.as_ref()) + .await + .map_err(|e| format!("insert test s3 source: {}", e))?; + + Ok(Some(source.id)) } async fn create_pre_upgrade_backup( &self, - _service_id: i32, - _s3_source_id: i32, - _created_by: i32, + service_id: i32, + s3_source_id: i32, + created_by: i32, ) -> Result { self.calls.fetch_add(1, Ordering::SeqCst); - Ok(42) + let now = chrono::Utc::now(); + let backup_uuid = format!( + "upgrade-pre-backup-{}-{}", + service_id, + now.timestamp_nanos_opt().unwrap_or(0) + ); + let backup = backups::ActiveModel { + id: sea_orm::NotSet, + name: Set(format!("Pre-upgrade backup {}", service_id)), + backup_id: Set(backup_uuid.clone()), + schedule_id: Set(None), + backup_type: Set("pre_upgrade".to_string()), + state: Set("completed".to_string()), + started_at: Set(now), + finished_at: Set(Some(now)), + size_bytes: 
Set(Some(0)), + file_count: Set(Some(0)), + s3_source_id: Set(s3_source_id), + s3_location: Set(format!("s3://upgrade-test-bucket/{}", backup_uuid)), + error_message: Set(None), + metadata: Set(serde_json::json!({ + "test": true, + "service_id": service_id, + "purpose": "postgres_major_upgrade_pre_backup" + }) + .to_string()), + checksum: Set(None), + compression_type: Set("none".to_string()), + created_by: Set(created_by), + expires_at: Set(None), + tags: Set("[]".to_string()), + schedule_run_id: sea_orm::NotSet, + } + .insert(self.db.as_ref()) + .await + .map_err(|e| format!("insert test backup: {}", e))?; + + self.last_backup_id.store(backup.id, Ordering::SeqCst); + Ok(backup.id) } } @@ -3278,7 +3459,7 @@ mod tests { let ctx = UpgradeTestCtx::new("rollback").await; let backup_provider: Arc = - Arc::new(StubBackupProvider::new()); + Arc::new(StubBackupProvider::new(ctx.test_db.db.clone())); let lifecycle: Arc = ctx.lifecycle_adapter.clone(); // Seed pre-upgrade data on v17. @@ -3439,7 +3620,7 @@ mod tests { } let ctx = UpgradeTestCtx::new("happy").await; - let backup_provider = Arc::new(StubBackupProvider::new()); + let backup_provider = Arc::new(StubBackupProvider::new(ctx.test_db.db.clone())); let backup_provider_trait: Arc = backup_provider.clone(); let lifecycle: Arc = ctx.lifecycle_adapter.clone(); @@ -3478,7 +3659,11 @@ mod tests { assert!(row.finished_at.is_some(), "finished_at unset"); assert_eq!( row.pre_upgrade_backup_id, - Some(42), + Some( + backup_provider + .last_backup_id + .load(std::sync::atomic::Ordering::SeqCst) + ), "pre_upgrade_backup_id not persisted" ); assert!( @@ -3585,7 +3770,7 @@ mod tests { let ctx = UpgradeTestCtx::new("snap-order").await; let backup_provider: Arc = - Arc::new(StubBackupProvider::new()); + Arc::new(StubBackupProvider::new(ctx.test_db.db.clone())); // Wrap the real adapter in a recorder so we can observe call order. 
/// Quote `s` as a single POSIX-shell word.
///
/// The value is wrapped in single quotes; every embedded single quote is
/// emitted as `'\''` (close the quote, escaped quote, reopen the quote).
fn shell_escape(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('\'');
    for ch in s.chars() {
        match ch {
            '\'' => quoted.push_str("'\\''"),
            other => quoted.push(other),
        }
    }
    quoted.push('\'');
    quoted
}

/// Render `s` as a single-quoted SQL string literal, doubling every
/// embedded single quote.
fn sql_string_literal(s: &str) -> String {
    let doubled = s.split('\'').collect::<Vec<_>>().join("''");
    let mut literal = String::with_capacity(doubled.len() + 2);
    literal.push('\'');
    literal.push_str(&doubled);
    literal.push('\'');
    literal
}
Result<(Option, String), String> { + let exec = self + .docker + .create_exec( + container_name, + bollard::models::ExecConfig { + cmd: Some(cmd), + env, + attach_stdout: Some(true), + attach_stderr: Some(true), + ..Default::default() + }, + ) + .await + .map_err(|e| format!("create_exec({}) failed: {}", container_name, e))?; + + let mut output_text = String::new(); + if let Ok(bollard::exec::StartExecResults::Attached { mut output, .. }) = + self.docker.start_exec(&exec.id, None).await + { + while let Some(chunk) = output.next().await { + match chunk { + Ok(chunk) => output_text.push_str(&chunk.to_string()), + Err(e) => output_text.push_str(&format!("", e)), + } + } + } + + let inspect = self + .docker + .inspect_exec(&exec.id) + .await + .map_err(|e| format!("inspect_exec({}) failed: {}", container_name, e))?; + Ok((inspect.exit_code, output_text)) + } + + async fn container_logs(&self, container_name: &str) -> String { + self.docker + .logs( + container_name, + Some( + bollard::query_parameters::LogsOptionsBuilder::new() + .stdout(true) + .stderr(true) + .build(), + ), + ) + .try_collect::>() + .await + .map(|v| v.into_iter().map(|c| c.to_string()).collect::()) + .unwrap_or_else(|e| format!("", e)) + } + + async fn wait_for_psql_database( + &self, + container_name: &str, + cfg: &PostgresConfig, + database: &str, + timeout: Duration, + ) -> Result<(), String> { + let deadline = Instant::now() + timeout; + let mut last_exit = None; + let mut last_output = String::new(); + while Instant::now() < deadline { + let result = self + .exec_in_container( + container_name, + vec![ + "psql".to_string(), + "-v".to_string(), + "ON_ERROR_STOP=1".to_string(), + "-U".to_string(), + cfg.username.clone(), + "-d".to_string(), + database.to_string(), + "-tAc".to_string(), + "SELECT 1".to_string(), + ], + Some(vec![format!("PGPASSWORD={}", cfg.password)]), + ) + .await; + + if let Ok((exit_code, output)) = result { + last_exit = exit_code; + last_output = output; + if exit_code == 
Some(0) { + return Ok(()); + } + } + tokio::time::sleep(Duration::from_millis(500)).await; + } + + let logs = self.container_logs(container_name).await; + Err(format!( + "container '{}' failed psql readiness for database '{}' within {}s \ + (last exit={:?}, output={}):\n{}", + container_name, + database, + timeout.as_secs(), + last_exit, + last_output.trim(), + logs + )) + } + + async fn wait_for_container_healthy( + &self, + container_name: &str, + timeout: Duration, + ) -> Result<(), String> { + let deadline = Instant::now() + timeout; + let mut last_state = String::new(); + let mut last_health = String::new(); + + while Instant::now() < deadline { + let inspect = self + .docker + .inspect_container( + container_name, + None::, + ) + .await + .map_err(|e| format!("inspect_container({}) failed: {}", container_name, e))?; + + if let Some(state) = inspect.state { + last_state = state + .status + .map(|status| status.to_string()) + .unwrap_or_default(); + last_health = state + .health + .and_then(|health| health.status) + .map(|status| status.to_string()) + .unwrap_or_default(); + + if state.running == Some(false) || state.dead == Some(true) { + let logs = self.container_logs(container_name).await; + return Err(format!( + "container '{}' stopped before becoming healthy \ + (state='{}', health='{}', exit={:?}, error={:?}):\n{}", + container_name, last_state, last_health, state.exit_code, state.error, logs + )); + } + + if last_health == "healthy" { + return Ok(()); + } + } + + tokio::time::sleep(Duration::from_millis(500)).await; + } + + let logs = self.container_logs(container_name).await; + Err(format!( + "container '{}' did not become healthy within {}s \ + (last state='{}', health='{}'):\n{}", + container_name, + timeout.as_secs(), + last_state, + last_health, + logs + )) + } + + async fn ensure_database_exists( + &self, + container_name: &str, + cfg: &PostgresConfig, + ) -> Result<(), String> { + let sql = format!( + "SELECT 1 FROM pg_database WHERE datname = 
{}", + Self::sql_string_literal(&cfg.database) + ); + let cmd = format!( + "set -e; \ + if psql -v ON_ERROR_STOP=1 -U {user} -d postgres -tAc {sql} | grep -q 1; then \ + exit 0; \ + fi; \ + if createdb -U {user} {db} 2>/tmp/createdb.err; then \ + exit 0; \ + fi; \ + psql -v ON_ERROR_STOP=1 -U {user} -d postgres -tAc {sql} | grep -q 1 \ + || {{ cat /tmp/createdb.err >&2; exit 1; }}", + user = Self::shell_escape(&cfg.username), + sql = Self::shell_escape(&sql), + db = Self::shell_escape(&cfg.database), + ); + let (exit_code, output) = self + .exec_in_container( + container_name, + vec!["sh".to_string(), "-lc".to_string(), cmd], + Some(vec![format!("PGPASSWORD={}", cfg.password)]), + ) + .await?; + + if exit_code == Some(0) { + Ok(()) + } else { + let logs = self.container_logs(container_name).await; + Err(format!( + "failed to ensure database '{}' exists in '{}' (exit={:?}, output={}):\n{}", + cfg.database, + container_name, + exit_code, + output.trim(), + logs + )) + } + } } #[async_trait] @@ -293,51 +516,21 @@ impl PostgresContainerLifecycle for PostgresLifecycleAdapter { .await .map_err(|e| format!("start_container({}) failed: {}", container_name, e))?; - // Block until Postgres accepts connections. - let deadline = Instant::now() + Duration::from_secs(120); - loop { - if Instant::now() > deadline { - return Err(format!( - "container '{}' failed to become ready within 120s", - container_name - )); - } - let exec = self - .docker - .create_exec( - &container_name, - bollard::models::ExecConfig { - cmd: Some(vec![ - "pg_isready".to_string(), - "-U".to_string(), - cfg.username.clone(), - "-d".to_string(), - cfg.database.clone(), - ]), - attach_stdout: Some(true), - attach_stderr: Some(true), - ..Default::default() - }, - ) - .await; - if let Ok(id) = exec { - // Drain the attached stream before inspect_exec, otherwise stdout - // backpressure stalls the exec and exit_code never surfaces. 
- use futures::StreamExt; - if let Ok(bollard::exec::StartExecResults::Attached { mut output, .. }) = - self.docker.start_exec(&id.id, None).await - { - while output.next().await.is_some() {} - } - let inspect = self.docker.inspect_exec(&id.id).await; - if let Ok(info) = inspect { - if info.exit_code == Some(0) { - break; - } - } - } - tokio::time::sleep(Duration::from_millis(500)).await; - } + // Block until Docker's healthcheck sees the final post-entrypoint + // server, then make sure the configured application database exists. + // A transient init server can accept SQL briefly before shutdown. + self.wait_for_container_healthy(&container_name, Duration::from_secs(120)) + .await?; + self.wait_for_psql_database(&container_name, &cfg, "postgres", Duration::from_secs(120)) + .await?; + self.ensure_database_exists(&container_name, &cfg).await?; + self.wait_for_psql_database( + &container_name, + &cfg, + &cfg.database, + Duration::from_secs(30), + ) + .await?; Ok(()) }