Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
-

### Fixed
-
- **Postgres lifecycle waits for usable databases**: managed Postgres
container startup now waits for real `psql SELECT 1` connectivity, creates
the configured database if the cluster is alive but it is absent, and
includes container logs on readiness failures. This prevents major-upgrade
restores from treating `pg_isready` as sufficient before the target database
exists.
- **Postgres major upgrades require a clean live volume**: the upgrade
snapshot and rollback phases now remove the temporary copy sidecar before
deleting the old live data volume, fail if Docker still cannot remove that
volume, and wait for the final healthy Postgres container before creating
the target database, preventing Postgres 18+ containers from reopening stale
17-era PGDATA after a supposedly clean handoff.

### Tests
- **Postgres upgrade Docker tests use real backup FK rows**:
`postgres_upgrade` integration fixtures now insert a matching `backups` row
for the fake pre-upgrade backup id, keeping the existing rollback/upgrade
assertions valid after FK enforcement.

## [0.1.0-beta.38] - 2026-06-23

Expand Down
235 changes: 210 additions & 25 deletions crates/temps-providers/src/externalsvc/postgres_upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,8 +662,15 @@ impl PostgresUpgradeOrchestrator {

self.copy_volume(row, &source_volume, &rollback_volume)
.await?;
// Remove the original — new_container phase will recreate it empty.
self.remove_volume_best_effort(&source_volume).await;
// Remove the original. The new_container phase must start from an
// empty live volume; reusing old-version PGDATA breaks Postgres 18+.
self.remove_volume_or_fail(&source_volume)
.await
.map_err(|reason| PostgresUpgradeError::SnapshotFailed {
upgrade_id: row.id,
service_id: row.service_id,
reason: format!("remove old live volume '{}': {}", source_volume, reason),
})?;

let expires_at = chrono::Utc::now() + chrono::Duration::days(ROLLBACK_RETENTION_DAYS);
let current = self.load_upgrade(row.id).await?;
Expand Down Expand Up @@ -1235,7 +1242,7 @@ impl PostgresUpgradeOrchestrator {
read_only: Some(true),
..Default::default()
}]),
auto_remove: Some(true),
auto_remove: Some(false),
..Default::default()
}),
..Default::default()
Expand Down Expand Up @@ -1500,6 +1507,8 @@ impl PostgresUpgradeOrchestrator {
dest: &str,
) -> Result<(), PostgresUpgradeError> {
use futures::TryStreamExt;
use std::time::{Duration, Instant};

let copy_container_name = format!(
"temps_pg_upgrade_{}_copy_{}",
row.id,
Expand Down Expand Up @@ -1564,7 +1573,8 @@ impl PostgresUpgradeOrchestrator {
reason: format!("start copy container: {}", e),
})?;

self.docker
let wait_result = self
.docker
.wait_container(
&created.id,
None::<bollard::query_parameters::WaitContainerOptions>,
Expand All @@ -1577,6 +1587,82 @@ impl PostgresUpgradeOrchestrator {
reason: format!("wait copy container: {}", e),
})?;

let remove_result = self
.docker
.remove_container(
&created.id,
Some(bollard::query_parameters::RemoveContainerOptions {
v: true,
force: true,
..Default::default()
}),
)
.await;

if let Err(e) = remove_result {
let msg = e.to_string();
if !msg.contains("removal of container") || !msg.contains("is already in progress") {
return Err(PostgresUpgradeError::SnapshotFailed {
upgrade_id: row.id,
service_id: row.service_id,
reason: format!("remove copy container: {}", msg),
});
}
}

let deadline = Instant::now() + Duration::from_secs(30);
loop {
match self
.docker
.inspect_container(
&created.id,
None::<bollard::query_parameters::InspectContainerOptions>,
)
.await
{
Ok(_) if Instant::now() < deadline => {
tokio::time::sleep(Duration::from_millis(250)).await;
}
Ok(_) => {
return Err(PostgresUpgradeError::SnapshotFailed {
upgrade_id: row.id,
service_id: row.service_id,
reason: format!(
"copy container '{}' still exists after removal timeout",
created.id
),
});
}
Err(e) => {
let msg = e.to_string();
if msg.contains("No such container")
|| msg.contains("no such container")
|| msg.contains("not found")
|| msg.contains("404")
{
break;
}
return Err(PostgresUpgradeError::SnapshotFailed {
upgrade_id: row.id,
service_id: row.service_id,
reason: format!("inspect copy container after removal: {}", msg),
});
}
}
}

if let Some(nonzero) = wait_result
.iter()
.map(|status| status.status_code)
.find(|status_code| *status_code != 0)
{
return Err(PostgresUpgradeError::SnapshotFailed {
upgrade_id: row.id,
service_id: row.service_id,
reason: format!("copy container exited with status {}", nonzero),
});
}

Ok(())
}

Expand All @@ -1593,6 +1679,26 @@ impl PostgresUpgradeOrchestrator {
.await;
}

async fn remove_volume_or_fail(&self, volume_name: &str) -> Result<(), String> {
self.docker
.remove_volume(
volume_name,
Some(bollard::query_parameters::RemoveVolumeOptions { force: true }),
)
.await
.map_err(|e| e.to_string())
.or_else(|reason| {
if reason.contains("No such volume")
|| reason.contains("not found")
|| reason.contains("404")
{
Ok(())
} else {
Err(reason)
}
})
}

// ---- Rollback volume retention -------------------------------------

/// Sweep rollback volumes whose 7-day retention has expired.
Expand Down Expand Up @@ -1770,8 +1876,15 @@ impl PostgresUpgradeOrchestrator {

// 2. Remove the live volume so the copy produces a clean replica of
// the rollback volume (any lingering new-version WAL/catalog is
// discarded). `remove_volume_best_effort` tolerates absence.
self.remove_volume_best_effort(&live_volume).await;
// discarded). This is required; overlaying rollback data onto a
// dirty live volume can leave incompatible catalog/WAL files.
self.remove_volume_or_fail(&live_volume)
.await
.map_err(|reason| PostgresUpgradeError::RollbackFailed {
upgrade_id,
service_id: row.service_id,
reason: format!("remove live volume '{}': {}", live_volume, reason),
})?;
self.create_volume_if_missing(&row, &live_volume).await?;
self.copy_volume(&row, &rollback_volume, &live_volume)
.await
Expand Down Expand Up @@ -2790,14 +2903,16 @@ mod tests {
use sea_orm::{ActiveModelTrait, ActiveValue::Set, DatabaseConnection, EntityTrait};
use serde_json::json;
use std::sync::{
atomic::{AtomicU64, Ordering},
atomic::{AtomicI32, AtomicU64, Ordering},
Arc, Mutex,
};
use std::time::{Duration, Instant};
use tempfile::TempDir;
use temps_core::EncryptionService;
use temps_database::test_utils::TestDatabase;
use temps_entities::{external_services, postgres_major_upgrades, users};
use temps_entities::{
backups, external_services, postgres_major_upgrades, s3_sources, users,
};
use temps_logs::LogService;

use crate::postgres_lifecycle::PostgresLifecycleAdapter;
Expand Down Expand Up @@ -3105,35 +3220,101 @@ mod tests {
}
}

/// Stub `PreUpgradeBackupProvider` — always returns a synthetic
/// backup id (42) without touching S3. The orchestrator only reads
/// this id back onto the row; no downstream check validates it.
/// Stub `PreUpgradeBackupProvider` — creates a real control-plane
/// backup row without touching S3. The orchestrator stores the
/// returned id on `postgres_major_upgrades.pre_upgrade_backup_id`,
/// which has a real FK to `backups.id` in the test database.
pub struct StubBackupProvider {
db: Arc<DatabaseConnection>,
pub calls: Arc<AtomicU64>,
pub last_backup_id: Arc<AtomicI32>,
}

impl StubBackupProvider {
pub fn new() -> Self {
pub fn new(db: Arc<DatabaseConnection>) -> Self {
Self {
db,
calls: Arc::new(AtomicU64::new(0)),
last_backup_id: Arc::new(AtomicI32::new(0)),
}
}
}

#[async_trait]
impl PreUpgradeBackupProvider for StubBackupProvider {
async fn default_s3_source_id(&self, _service_id: i32) -> Result<Option<i32>, String> {
Ok(Some(1))
async fn default_s3_source_id(&self, service_id: i32) -> Result<Option<i32>, String> {
let now = chrono::Utc::now();
let source = s3_sources::ActiveModel {
id: sea_orm::NotSet,
name: Set(format!(
"upgrade-test-s3-{}-{}",
service_id,
now.timestamp_nanos_opt().unwrap_or(0)
)),
bucket_name: Set("upgrade-test-bucket".to_string()),
region: Set("us-east-1".to_string()),
endpoint: Set(Some("http://127.0.0.1:9000".to_string())),
bucket_path: Set("postgres-upgrade-tests".to_string()),
access_key_id: Set("test-access-key".to_string()),
secret_key: Set("test-secret-key".to_string()),
force_path_style: Set(Some(true)),
is_default: Set(true),
created_at: Set(now),
updated_at: Set(now),
}
.insert(self.db.as_ref())
.await
.map_err(|e| format!("insert test s3 source: {}", e))?;

Ok(Some(source.id))
}

async fn create_pre_upgrade_backup(
&self,
_service_id: i32,
_s3_source_id: i32,
_created_by: i32,
service_id: i32,
s3_source_id: i32,
created_by: i32,
) -> Result<i32, String> {
self.calls.fetch_add(1, Ordering::SeqCst);
Ok(42)
let now = chrono::Utc::now();
let backup_uuid = format!(
"upgrade-pre-backup-{}-{}",
service_id,
now.timestamp_nanos_opt().unwrap_or(0)
);
let backup = backups::ActiveModel {
id: sea_orm::NotSet,
name: Set(format!("Pre-upgrade backup {}", service_id)),
backup_id: Set(backup_uuid.clone()),
schedule_id: Set(None),
backup_type: Set("pre_upgrade".to_string()),
state: Set("completed".to_string()),
started_at: Set(now),
finished_at: Set(Some(now)),
size_bytes: Set(Some(0)),
file_count: Set(Some(0)),
s3_source_id: Set(s3_source_id),
s3_location: Set(format!("s3://upgrade-test-bucket/{}", backup_uuid)),
error_message: Set(None),
metadata: Set(serde_json::json!({
"test": true,
"service_id": service_id,
"purpose": "postgres_major_upgrade_pre_backup"
})
.to_string()),
checksum: Set(None),
compression_type: Set("none".to_string()),
created_by: Set(created_by),
expires_at: Set(None),
tags: Set("[]".to_string()),
schedule_run_id: sea_orm::NotSet,
}
.insert(self.db.as_ref())
.await
.map_err(|e| format!("insert test backup: {}", e))?;

self.last_backup_id.store(backup.id, Ordering::SeqCst);
Ok(backup.id)
}
}

Expand Down Expand Up @@ -3278,7 +3459,7 @@ mod tests {

let ctx = UpgradeTestCtx::new("rollback").await;
let backup_provider: Arc<dyn PreUpgradeBackupProvider> =
Arc::new(StubBackupProvider::new());
Arc::new(StubBackupProvider::new(ctx.test_db.db.clone()));
let lifecycle: Arc<dyn PostgresContainerLifecycle> = ctx.lifecycle_adapter.clone();

// Seed pre-upgrade data on v17.
Expand Down Expand Up @@ -3439,7 +3620,7 @@ mod tests {
}

let ctx = UpgradeTestCtx::new("happy").await;
let backup_provider = Arc::new(StubBackupProvider::new());
let backup_provider = Arc::new(StubBackupProvider::new(ctx.test_db.db.clone()));
let backup_provider_trait: Arc<dyn PreUpgradeBackupProvider> = backup_provider.clone();
let lifecycle: Arc<dyn PostgresContainerLifecycle> = ctx.lifecycle_adapter.clone();

Expand Down Expand Up @@ -3478,7 +3659,11 @@ mod tests {
assert!(row.finished_at.is_some(), "finished_at unset");
assert_eq!(
row.pre_upgrade_backup_id,
Some(42),
Some(
backup_provider
.last_backup_id
.load(std::sync::atomic::Ordering::SeqCst)
),
"pre_upgrade_backup_id not persisted"
);
assert!(
Expand Down Expand Up @@ -3585,7 +3770,7 @@ mod tests {

let ctx = UpgradeTestCtx::new("snap-order").await;
let backup_provider: Arc<dyn PreUpgradeBackupProvider> =
Arc::new(StubBackupProvider::new());
Arc::new(StubBackupProvider::new(ctx.test_db.db.clone()));

// Wrap the real adapter in a recorder so we can observe call order.
let recorder = RecordingLifecycle::new(ctx.lifecycle_adapter.clone());
Expand Down Expand Up @@ -3744,7 +3929,7 @@ mod tests {

let ctx = UpgradeTestCtx::new("stream-drain").await;
let backup_provider: Arc<dyn PreUpgradeBackupProvider> =
Arc::new(StubBackupProvider::new());
Arc::new(StubBackupProvider::new(ctx.test_db.db.clone()));
let lifecycle: Arc<dyn PostgresContainerLifecycle> = ctx.lifecycle_adapter.clone();

let result = async {
Expand Down Expand Up @@ -3843,7 +4028,7 @@ mod tests {

let ctx = UpgradeTestCtx::new("dump-idem").await;
let backup_provider: Arc<dyn PreUpgradeBackupProvider> =
Arc::new(StubBackupProvider::new());
Arc::new(StubBackupProvider::new(ctx.test_db.db.clone()));
let lifecycle: Arc<dyn PostgresContainerLifecycle> = ctx.lifecycle_adapter.clone();

let result = async {
Expand Down Expand Up @@ -3913,7 +4098,7 @@ mod tests {
// state, the whole retry should complete FAST (< 60s typ).
let retry_start = std::time::Instant::now();
let orch2 = ctx.orchestrator(
Arc::new(StubBackupProvider::new()),
Arc::new(StubBackupProvider::new(ctx.test_db.db.clone())),
ctx.lifecycle_adapter.clone(),
);
orch2
Expand Down
Loading
Loading