From f8a2a71f849a90c339cca75bed04881226ebf1bf Mon Sep 17 00:00:00 2001 From: "Asier G. Morato" Date: Mon, 15 Jun 2026 16:35:30 +0200 Subject: [PATCH 1/3] Multi-process database support: absolute paths, open retry, cross-process change signal Three core changes enabling a PowerSync database file to be shared across processes (an app and its widgets or App Intents extensions), extracted from the SwiftData DataStore work for separate review. 1. Absolute dbFilename paths. PowerSyncDatabase(dbFilename:) treats a path starting with '/' as absolute and uses it as-is, so the database can live in an App Group container. close(deleteDatabase: true) deletes the files at that location. Plain names are unchanged. 2. Concurrent-open retry. 'pragma journal_mode = WAL' needs a moment of exclusive access and reports SQLITE_BUSY/SQLITE_BUSY_RECOVERY without consulting the busy handler, so two processes opening the same file concurrently failed (~80% in a two-process harness). Opening and configuring a connection now retries with backoff. Reproduced in-process with a system-SQLite lock holder. 3. Cross-process change signal. Update hooks are per-pool, so watch was blind to writes from other processes. Each pool posts a Darwin notification (the mechanism Core Data uses for remote changes) after every committed write and re-emits tableUpdates with EXTERNAL_CHANGES_MARKER on receipt, which watch and the upload client treat as 'unknown tables changed'. Two pools over one file reproduce the cross-process behavior deterministically. Builds with -strict-concurrency=complete; 35 tests across the new suites and the in-memory sync integration tests pass. --- CHANGELOG.md | 15 +++ .../Implementation/AsyncConnectionPool.swift | 91 +++++++++++++++++-- .../CrossProcessChangeSignal.swift | 75 +++++++++++++++ .../PowerSyncDatabaseImpl.swift | 10 +- .../Implementation/queries/watch.swift | 2 + .../sync/StreamingSyncClient.swift | 4 +- Sources/PowerSync/PowerSyncDatabase.swift | 7 +- .../Protocol/SQLiteConnectionPool.swift | 5 + .../Implementation/AbsolutePathTests.swift | 39 ++++++++ .../Implementation/ConcurrentOpenTests.swift | 86 ++++++++++++++++++ .../CrossProcessSignalTests.swift | 84 +++++++++++++++++ Tests/PowerSyncTests/SyncTests.swift | 8 +- 12 files changed, 410 insertions(+), 16 deletions(-) create mode 100644 Sources/PowerSync/Implementation/CrossProcessChangeSignal.swift create mode 100644 Tests/PowerSyncTests/Implementation/AbsolutePathTests.swift create mode 100644 Tests/PowerSyncTests/Implementation/ConcurrentOpenTests.swift create mode 100644 Tests/PowerSyncTests/Implementation/CrossProcessSignalTests.swift diff --git a/CHANGELOG.md b/CHANGELOG.md index e4e13d4..9f8f100 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,20 @@ # Changelog +## Unreleased + +* `PowerSyncDatabase(dbFilename:)` now accepts an absolute path (starting with `/`), used + as-is so the database can live in an App Group container shared with app extensions. + Plain filenames keep the existing behavior, and `close(deleteDatabase: true)` deletes the + files at the absolute location. +* Opening the connection pool retries while another process holds the database. The + `pragma journal_mode = WAL` transition reports `SQLITE_BUSY`/`SQLITE_BUSY_RECOVERY` + without consulting the busy handler, so concurrent cold opens (an app launching while its + extension opens the same file) used to fail; the pool now retries with backoff. +* Added an opt-in cross-process change signal: each pool posts a Darwin notification after + every committed write and, on receipt, re-emits `tableUpdates` with + `EXTERNAL_CHANGES_MARKER` so `watch` queries and the upload client wake for writes made by + other processes sharing the database file. In-memory databases skip the signal. + ## 1.14.3 - Fix CRUD uploads entering a `Delaying due to previously encountered CRUD item` loop. diff --git a/Sources/PowerSync/Implementation/AsyncConnectionPool.swift b/Sources/PowerSync/Implementation/AsyncConnectionPool.swift index b765903..7816d91 100644 --- a/Sources/PowerSync/Implementation/AsyncConnectionPool.swift +++ b/Sources/PowerSync/Implementation/AsyncConnectionPool.swift @@ -5,7 +5,20 @@ import Foundation enum DatabaseLocation { case inMemory case inDefaultDirectory(name: String) - + case atPath(String) + + /// The on-disk path other processes can share, or `nil` for in-memory databases. + var sharedPath: String? { + switch self { + case .inMemory: + return nil + case let .inDefaultDirectory(name): + return (try? DatabaseLocation.appleDefaultDatabaseDirectory().path).map { "\($0)/\(name)" } + case let .atPath(path): + return path + } + } + func openConnection(writer: Bool) throws -> RawSqliteConnection { var db: OpaquePointer? let rc: Int32 @@ -30,6 +43,21 @@ enum DatabaseLocation { SQLITE_OPEN_READONLY } rc = sqlite3_open_v2(path, &db, flags, nil) + case .atPath(let absolutePath): + let fileManager = FileManager.default + let directory = (absolutePath as NSString).deletingLastPathComponent + + if !fileManager.fileExists(atPath: directory) { + try fileManager.createDirectory(atPath: directory, withIntermediateDirectories: true) + } + + path = absolutePath + let flags = if writer { + SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE + } else { + SQLITE_OPEN_READONLY + } + rc = sqlite3_open_v2(path, &db, flags, nil) } if rc != 0 { @@ -62,11 +90,16 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol { private let logger: any LoggerProtocol private let tableUpdatesStream = BroadcastStream>() private let opener = PoolOpener() + /// Cross-process change signaling; `nil` for in-memory databases (nothing to share). + private let changeSignal: CrossProcessChangeSignal? init(location: DatabaseLocation, logger: any LoggerProtocol, initialStatements: [String] = []) { self.location = location self.logger = logger self.initialStatements = initialStatements + self.changeSignal = location.sharedPath.map { + CrossProcessChangeSignal(databasePath: $0, logger: logger) + } } var tableUpdates: AsyncStream> { @@ -88,12 +121,15 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol { let _ = try context.execute(sql: stmt, parameters: []) } + // The busy handler is installed first so later statements wait instead of failing, + // but note it does NOT apply to the WAL transition below. + let _ = try context.execute(sql: "pragma busy_timeout = 30000", parameters: []) + if isWriter { let _ = try context.execute(sql: "pragma journal_mode = WAL", parameters: []) } let _ = try context.execute(sql: "pragma journal_size_limit = \(6 * 1024 * 1024)", parameters: []) - let _ = try context.execute(sql: "pragma busy_timeout = 30000", parameters: []) let _ = try context.execute(sql: "pragma cache_size = -\(50 * 1024)", parameters: []) if isWriter { @@ -110,6 +146,42 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol { } } + /// Whether an error from opening/configuring a connection is transient contention + /// (another process holds the file, e.g. mid WAL-recovery) and worth retrying. + /// `pragma journal_mode = WAL` reports SQLITE_BUSY/SQLITE_BUSY_RECOVERY without + /// consulting the busy handler, so `busy_timeout` cannot cover the open path. + private static func isTransientOpenError(_ error: any Error) -> Bool { + guard case let PowerSyncError.sqliteError(extendedResultCode, _, _, _, _) = error else { + return false + } + let primary = extendedResultCode & 0xFF + return primary == SQLITE_BUSY || primary == SQLITE_LOCKED + } + + /// Opens and configures a connection, retrying with backoff while another process + /// holds the database (apps and their widgets/extensions open concurrently). + fileprivate func openConfiguredConnection(writer: Bool) throws -> RawSqliteConnection { + // ~5s total budget: 10ms doubling to a 250ms cap. Concurrent opens (app + widget) + // resolve in tens of milliseconds; a database still busy after seconds is stuck. + var delayMicroseconds: UInt32 = 10_000 + let deadline = DispatchTime.now() + .seconds(5) + while true { + do { + let connection = try location.openConnection(writer: writer) + try configureConnection(connection: connection, isWriter: writer) + return connection + } catch where Self.isTransientOpenError(error) && DispatchTime.now() < deadline { + // The failed connection is dropped (closed by deinit); reopen fresh. + logger.debug( + "database busy while opening (another process holds it); retrying in \(delayMicroseconds / 1000)ms", + tag: "AsyncConnectionPool" + ) + usleep(delayMicroseconds) + delayMicroseconds = min(delayMicroseconds * 2, 250_000) + } + } + } + /// Opens connections on a background thread to obtain the native connection pool. private func obtainInner() async throws -> NativeConnectionPool { try await opener.obtainPool(pool: self) @@ -137,6 +209,7 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol { } func close() async throws { + changeSignal?.stop() try await self.opener.close() } @@ -152,11 +225,17 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol { try registerPowerSyncCoreExtension() let handleUpdates: @Sendable (_: Set) -> () = { [weak context] updates in context?.tableUpdatesStream.dispatch(event: updates) + // Tell other processes sharing this file that tables changed. + context?.changeSignal?.post() + } + context.changeSignal?.start { [weak context] in + // Another process (or this one; harmless, throttled downstream) changed + // the database outside this pool's update hooks. + context?.tableUpdatesStream.dispatch(event: [EXTERNAL_CHANGES_MARKER]) } let pool = try await context.runBlocking { - let writer = try context.location.openConnection(writer: true) - try context.configureConnection(connection: writer, isWriter: true) + let writer = try context.openConfiguredConnection(writer: true) if case .inMemory = context.location { return NativeConnectionPool(singleConnection: writer, logger: context.logger, handleUpdates: handleUpdates) @@ -164,9 +243,7 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol { let numReaders = 4 var readers = RigidDeque(capacity: numReaders) while !readers.isFull { - let connection = try context.location.openConnection(writer: false) - try context.configureConnection(connection: connection, isWriter: false) - readers.append(connection) + readers.append(try context.openConfiguredConnection(writer: false)) } return NativeConnectionPool(writer: writer, readers: readers, logger: context.logger, handleUpdates: handleUpdates) diff --git a/Sources/PowerSync/Implementation/CrossProcessChangeSignal.swift b/Sources/PowerSync/Implementation/CrossProcessChangeSignal.swift new file mode 100644 index 0000000..2178b13 --- /dev/null +++ b/Sources/PowerSync/Implementation/CrossProcessChangeSignal.swift @@ -0,0 +1,75 @@ +import Foundation +import notify + +/// Cross-process change signaling for a database file, built on Darwin notifications — +/// the same mechanism Core Data uses for its remote-change notifications. +/// +/// PowerSync's update hooks only observe writes made through the local connection pool. +/// When several processes share a database file (an app and its widgets or App Intents +/// extensions), each process posts this signal after committing a write; the others +/// re-emit their `tableUpdates` with ``EXTERNAL_CHANGES_MARKER`` so `watch` queries +/// re-run and the upload client checks `ps_crud`. +/// +/// Darwin notifications carry no payload and are coalesced under pressure, which is fine: +/// the marker means "something changed, re-query". Deliveries to the posting process +/// itself are not suppressed — a redundant re-query (already throttled) is preferable to +/// the race a sender-stamp suppression scheme introduces, where an external change could +/// be misattributed and silently dropped. +final class CrossProcessChangeSignal: @unchecked Sendable { + private let name: String + private let logger: any LoggerProtocol + private var token: Int32 = NOTIFY_TOKEN_INVALID + private let queue = DispatchQueue(label: "powersync.cross-process-signal") + + init(databasePath: String, logger: any LoggerProtocol) { + // Stable across processes: both sides derive the name from the canonical path. + let canonical = URL(fileURLWithPath: databasePath).standardizedFileURL.path + self.name = "co.powersync.changes.\(Self.fnv1a(canonical))" + self.logger = logger + } + + /// Starts listening; `onExternalChange` runs on a private queue for every signal + /// (including this process's own posts). + func start(onChange: @escaping @Sendable () -> Void) { + guard token == NOTIFY_TOKEN_INVALID else { + return + } + let status = notify_register_dispatch(name, &token, queue) { _ in + onChange() + } + if status != NOTIFY_STATUS_OK { + logger.warning( + "could not register cross-process change signal (status \(status)); " + + "changes from other processes will not wake watch queries", + tag: "CrossProcessChangeSignal" + ) + token = NOTIFY_TOKEN_INVALID + } + } + + /// Posts the signal; called after every committed write. + func post() { + notify_post(name) + } + + func stop() { + if token != NOTIFY_TOKEN_INVALID { + notify_cancel(token) + token = NOTIFY_TOKEN_INVALID + } + } + + deinit { + stop() + } + + /// FNV-1a 64-bit, hex-encoded: deterministic and dependency-free. + private static func fnv1a(_ input: String) -> String { + var hash: UInt64 = 0xCBF2_9CE4_8422_2325 + for byte in input.utf8 { + hash ^= UInt64(byte) + hash = hash &* 0x0000_0100_0000_01B3 + } + return String(hash, radix: 16) + } +} diff --git a/Sources/PowerSync/Implementation/PowerSyncDatabaseImpl.swift b/Sources/PowerSync/Implementation/PowerSyncDatabaseImpl.swift index 2a52571..2e68427 100644 --- a/Sources/PowerSync/Implementation/PowerSyncDatabaseImpl.swift +++ b/Sources/PowerSync/Implementation/PowerSyncDatabaseImpl.swift @@ -121,9 +121,13 @@ final class PowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { func close(deleteDatabase: Bool) async throws { try await close() if deleteDatabase, let dbFilename { - // We can use the supplied dbLocation when we support that in future - let directory = try DatabaseLocation.appleDefaultDatabaseDirectory() - try deleteSQLiteFiles(dbFilename: dbFilename, in: directory) + if dbFilename.hasPrefix("/") { + let url = URL(fileURLWithPath: dbFilename) + try deleteSQLiteFiles(dbFilename: url.lastPathComponent, in: url.deletingLastPathComponent()) + } else { + let directory = try DatabaseLocation.appleDefaultDatabaseDirectory() + try deleteSQLiteFiles(dbFilename: dbFilename, in: directory) + } } } diff --git a/Sources/PowerSync/Implementation/queries/watch.swift b/Sources/PowerSync/Implementation/queries/watch.swift index 00567bb..60e6c51 100644 --- a/Sources/PowerSync/Implementation/queries/watch.swift +++ b/Sources/PowerSync/Implementation/queries/watch.swift @@ -29,6 +29,8 @@ func watchImpl(db: PowerSyncDatabaseImpl, options: WatchOptio let updateNotifications = pool.tableUpdates.filter { changedTables in changedTables.contains(where: watchedTables.contains) + // Another process changed unknown tables: conservatively re-query. + || changedTables.contains(EXTERNAL_CHANGES_MARKER) }.map { _ in () } // Allows emitting the first result even if there aren't changes let withInitial = AsyncAlgorithms.merge([()].async, updateNotifications) diff --git a/Sources/PowerSync/Implementation/sync/StreamingSyncClient.swift b/Sources/PowerSync/Implementation/sync/StreamingSyncClient.swift index 7191f7d..dc108a8 100644 --- a/Sources/PowerSync/Implementation/sync/StreamingSyncClient.swift +++ b/Sources/PowerSync/Implementation/sync/StreamingSyncClient.swift @@ -37,7 +37,9 @@ final class StreamingSyncClient: Sendable { } private func uploadLoop(signals: SyncSignals) async throws { - let updates = db.pool.tableUpdates.filter { updates in updates.contains("ps_crud") }.map { _ in () } + let updates = db.pool.tableUpdates.filter { updates in + updates.contains("ps_crud") || updates.contains(EXTERNAL_CHANGES_MARKER) + }.map { _ in () } let allTriggers = MergeItemSequence(inner: AsyncAlgorithms.merge(updates, signals.signalCrudUpload.subscribe())).makeAsyncIterator() // Use a do-while loop to ensure we start an upload iteration even if we can't connect to the service. diff --git a/Sources/PowerSync/PowerSyncDatabase.swift b/Sources/PowerSync/PowerSyncDatabase.swift index 034b4e9..8a7d04e 100644 --- a/Sources/PowerSync/PowerSyncDatabase.swift +++ b/Sources/PowerSync/PowerSyncDatabase.swift @@ -6,7 +6,10 @@ public let DEFAULT_DB_FILENAME = "powersync.db" /// Creates a PowerSyncDatabase instance /// - Parameters: /// - schema: The database schema -/// - dbFilename: The database filename. Defaults to "powersync.db" +/// - dbFilename: The database filename. Defaults to "powersync.db". Plain names are +/// stored in the default databases directory; an absolute path (starting with "/") is +/// used as-is, which allows sharing the database with app extensions through an App +/// Group container. /// - logger: Optional logging interface /// - initialStatements: An optional list of statements to run as the database is opened. /// - Returns: A configured PowerSyncDatabase instance @@ -18,6 +21,8 @@ public func PowerSyncDatabase( ) -> PowerSyncDatabaseProtocol { let (location, group) = if dbFilename == ":memory:" { (DatabaseLocation.inMemory, DatabaseGroupCollection()) + } else if dbFilename.hasPrefix("/") { + (DatabaseLocation.atPath(dbFilename), .shared) } else { (DatabaseLocation.inDefaultDirectory(name: dbFilename), .shared) } diff --git a/Sources/PowerSync/Protocol/SQLiteConnectionPool.swift b/Sources/PowerSync/Protocol/SQLiteConnectionPool.swift index 8edb486..598a736 100644 --- a/Sources/PowerSync/Protocol/SQLiteConnectionPool.swift +++ b/Sources/PowerSync/Protocol/SQLiteConnectionPool.swift @@ -57,6 +57,11 @@ public protocol SQLiteStatementIteratorProtocol { /// This is the underlying pool implementation on which the higher-level PowerSync Swift SDK is built on. /// /// This is an internal protocol and should not be implemented outside of the PowerSync SDK. +/// Emitted on `tableUpdates` when another process changed the database. The concrete +/// tables are unknown (cross-process signals carry no payload), so consumers must treat +/// this as potentially matching every table they watch. +public let EXTERNAL_CHANGES_MARKER = "__powersync_external_changes__" + public protocol SQLiteConnectionPoolProtocol: Sendable { var tableUpdates: AsyncStream> { get } diff --git a/Tests/PowerSyncTests/Implementation/AbsolutePathTests.swift b/Tests/PowerSyncTests/Implementation/AbsolutePathTests.swift new file mode 100644 index 0000000..ae1320e --- /dev/null +++ b/Tests/PowerSyncTests/Implementation/AbsolutePathTests.swift @@ -0,0 +1,39 @@ +import Foundation +@testable import PowerSync +import Testing + +struct AbsolutePathTests { + /// An absolute `dbFilename` opens the database at that exact path (creating parent + /// directories), persists across instances, and `close(deleteDatabase:)` removes the + /// files there. This is what app extensions need to share a database through an App + /// Group container. + @Test func opensPersistsAndDeletesAtAbsolutePath() async throws { + let schema = Schema(tables: [ + Table(name: "items", columns: [.text("name")]), + ]) + let directory = FileManager.default.temporaryDirectory + .appendingPathComponent("powersync-absolute-\(UUID().uuidString)") + let path = directory.appendingPathComponent("nested/shared.db").path + + let database = PowerSyncDatabase(schema: schema, dbFilename: path, logger: DefaultLogger()) + try await database.disconnectAndClear() + _ = try await database.execute( + sql: "INSERT INTO items (id, name) VALUES (?, ?)", + parameters: ["i1", "shared"] + ) + #expect(FileManager.default.fileExists(atPath: path)) + try await database.close() + + // A second instance over the same absolute path sees the data. + let reopened = PowerSyncDatabase(schema: schema, dbFilename: path, logger: DefaultLogger()) + let name = try await reopened.get(sql: "SELECT name FROM items WHERE id = ?", parameters: ["i1"]) { + try $0.getString(index: 0) + } + #expect(name == "shared") + + try await reopened.close(deleteDatabase: true) + #expect(!FileManager.default.fileExists(atPath: path)) + + try? FileManager.default.removeItem(at: directory) + } +} diff --git a/Tests/PowerSyncTests/Implementation/ConcurrentOpenTests.swift b/Tests/PowerSyncTests/Implementation/ConcurrentOpenTests.swift new file mode 100644 index 0000000..3ba01c7 --- /dev/null +++ b/Tests/PowerSyncTests/Implementation/ConcurrentOpenTests.swift @@ -0,0 +1,86 @@ +import Foundation +@testable import PowerSync +import SQLite3 +import Testing + +/// Opening a database while another process holds it must retry instead of failing. +/// +/// `pragma journal_mode = WAL` needs a moment of exclusive access; when another process +/// (an app while its widget refreshes, a widget while the app launches) holds any lock — +/// or is mid WAL-recovery — SQLite reports SQLITE_BUSY/SQLITE_BUSY_RECOVERY *without* +/// consulting the busy handler, so `busy_timeout` cannot save the open. The pool has to +/// retry with backoff. Reproduced here in-process with a system-SQLite connection holding +/// a reserved lock, which is byte-for-byte the same file-level contention two processes +/// produce. +@Suite("Concurrent open") +struct ConcurrentOpenTests { + /// Sendable wrapper for the lock-holding SQLite connection used across tasks. + private final class LockHolder: @unchecked Sendable { + var connection: OpaquePointer? + func exec(_ sql: String) -> Int32 { sqlite3_exec(connection, sql, nil, nil, nil) } + func close() { sqlite3_close_v2(connection) } + } + + @available(iOS 16, macOS 13, watchOS 9, tvOS 16, *) + @Test(.timeLimit(.minutes(1))) + func openRetriesWhileAnotherConnectionHoldsTheDatabase() async throws { + let path = FileManager.default.temporaryDirectory + .appendingPathComponent("concurrent-open-\(UUID().uuidString).db").path + + // Simulate the other process: a plain SQLite connection (journal mode still + // DELETE) holding a reserved lock when our pool tries to switch to WAL. + let holder = LockHolder() + #expect(sqlite3_open(path, &holder.connection) == SQLITE_OK) + defer { holder.close() } + #expect(holder.exec("CREATE TABLE other(x); BEGIN IMMEDIATE; INSERT INTO other VALUES (1);") == SQLITE_OK) + + // Release the lock shortly after the pool starts opening. + let releaser = Task { + try await Task.sleep(for: .milliseconds(500)) + _ = holder.exec("COMMIT") + } + + let database = PowerSyncDatabase( + schema: Schema(tables: [Table(name: "item", columns: [.text("title")])]), + dbFilename: path, + logger: DefaultLogger(minSeverity: .warning) + ) + // Forces the pool open (pragma journal_mode = WAL) while the lock is held. + _ = try await database.execute("SELECT 1") + let mode = try await database.get(sql: "pragma journal_mode", parameters: []) { + try $0.getString(index: 0) + } + #expect(mode.lowercased() == "wal") + + try await releaser.value + try await database.close(deleteDatabase: true) + } + + @available(iOS 16, macOS 13, watchOS 9, tvOS 16, *) + @Test(.timeLimit(.minutes(1))) + func openStillFailsWhenTheDatabaseNeverFreesUp() async throws { + let path = FileManager.default.temporaryDirectory + .appendingPathComponent("concurrent-open-stuck-\(UUID().uuidString).db").path + + let holder = LockHolder() + #expect(sqlite3_open(path, &holder.connection) == SQLITE_OK) + defer { + _ = holder.exec("COMMIT") + holder.close() + try? FileManager.default.removeItem(atPath: path) + } + #expect(holder.exec("CREATE TABLE other(x); BEGIN IMMEDIATE; INSERT INTO other VALUES (1);") == SQLITE_OK) + + // The lock is never released: the retry budget must eventually surface the error + // (bounded, not an infinite hang). + let database = PowerSyncDatabase( + schema: Schema(tables: [Table(name: "item", columns: [.text("title")])]), + dbFilename: path, + logger: DefaultLogger(minSeverity: .warning) + ) + await #expect(throws: (any Error).self) { + _ = try await database.execute("SELECT 1") + } + try? await database.close() + } +} diff --git a/Tests/PowerSyncTests/Implementation/CrossProcessSignalTests.swift b/Tests/PowerSyncTests/Implementation/CrossProcessSignalTests.swift new file mode 100644 index 0000000..d57ef5f --- /dev/null +++ b/Tests/PowerSyncTests/Implementation/CrossProcessSignalTests.swift @@ -0,0 +1,84 @@ +import Foundation +@testable import PowerSync +import Testing + +/// Changes made by another process must wake `watch` queries. +/// +/// PowerSync's update hooks are per-pool: a write that goes through a different pool +/// (another process — an app's widget or App Intent extension) never reaches this pool's +/// `tableUpdates`, so `watch` (and everything built on it: the SwiftData change observer, +/// the upload trigger) was structurally blind to it. A cross-process Darwin notification +/// posted after every committed write closes the gap. +/// +/// Two `PowerSyncDatabase` instances over the same file inside one test process use two +/// independent pools, reproducing byte-for-byte the blindness two processes exhibit. +@Suite("Cross-process change signal") +struct CrossProcessSignalTests { + private static func makeDatabase(path: String) -> any PowerSyncDatabaseProtocol { + PowerSyncDatabase( + schema: Schema(tables: [Table(name: "item", columns: [.text("title")])]), + dbFilename: path, + logger: DefaultLogger(minSeverity: .warning) + ) + } + + @available(iOS 16, macOS 13, watchOS 9, tvOS 16, *) + @Test(.timeLimit(.minutes(1))) + func watchWakesUpForWritesFromAnotherPool() async throws { + let path = FileManager.default.temporaryDirectory + .appendingPathComponent("cross-signal-\(UUID().uuidString).db").path + + let observerSide = Self.makeDatabase(path: path) + let writerSide = Self.makeDatabase(path: path) + + // Collect watch emissions on the observer side. + let counts = try observerSide.watch( + sql: "SELECT COUNT(*) FROM item", + parameters: [] + ) { try $0.getInt(index: 0) } + var iterator = counts.makeAsyncIterator() + let initial = try await iterator.next() + #expect(initial == [0]) + + // A write through the OTHER pool: same file, different update hooks. + _ = try await writerSide.execute( + sql: "INSERT INTO item (id, title) VALUES (uuid(), ?)", + parameters: ["externa"] + ) + + // Without the cross-process signal this hangs until the time limit. + let afterExternalWrite = try await iterator.next() + #expect(afterExternalWrite == [1]) + + try await observerSide.close() + try await writerSide.close(deleteDatabase: true) + } + + @available(iOS 16, macOS 13, watchOS 9, tvOS 16, *) + @Test(.timeLimit(.minutes(1))) + func externalChangeMarkerMatchesEveryWatchedTable() async throws { + // The marker says "unknown tables changed"; a watch over any table must re-query. + let path = FileManager.default.temporaryDirectory + .appendingPathComponent("cross-signal-marker-\(UUID().uuidString).db").path + + let observerSide = Self.makeDatabase(path: path) + let writerSide = Self.makeDatabase(path: path) + + // Two watches over different shapes (table scan and aggregate) both wake up. + let titles = try observerSide.watch( + sql: "SELECT title FROM item ORDER BY title", + parameters: [] + ) { try $0.getString(index: 0) } + var titlesIterator = titles.makeAsyncIterator() + _ = try await titlesIterator.next() + + _ = try await writerSide.execute( + sql: "INSERT INTO item (id, title) VALUES (uuid(), ?)", + parameters: ["uno"] + ) + #expect(try await titlesIterator.next() == [["uno"]].first) + + try await observerSide.close() + try await writerSide.close(deleteDatabase: true) + } +} diff --git a/Tests/PowerSyncTests/SyncTests.swift b/Tests/PowerSyncTests/SyncTests.swift index 322b6e6..2521bca 100644 --- a/Tests/PowerSyncTests/SyncTests.swift +++ b/Tests/PowerSyncTests/SyncTests.swift @@ -236,9 +236,9 @@ class InMemorySyncIntegrationTests { @Test @MainActor func uploadsOfflineWrites() async throws { let channel = AsyncThrowingChannel() - var allowConnection = false - let mockClient = MockHttpClient { @MainActor request in - if allowConnection { + let allowConnection = Mutex(false) + let mockClient = MockHttpClient { _ in + if allowConnection.withLock({ $0 }) { return channel } throw PowerSyncError.operationFailed(message: "Fake IO error for test", underlyingError: nil) @@ -254,7 +254,7 @@ class InMemorySyncIntegrationTests { var query = try db.watch("SELECT name FROM users") { try $0.getString(index: 0) }.makeAsyncIterator() try #require(try await query.next() == ["local write"]) - allowConnection = true + allowConnection.withLock { $0 = true } try await channel.pushLine(.fullCheckpoint(Checkpoint(last_op_id: "1", buckets: [BucketChecksum(bucket: "a", checksum: 0)], writeCheckpoint: "1"))) try await channel.pushLine(.syncDataBucket(SyncDataBucket(bucket: "a", data: [OplogEntry( checksum: 0, From dac1f693e4371a7b5f3b10c25ede45dd5e3fe097 Mon Sep 17 00:00:00 2001 From: "Asier G. Morato" Date: Thu, 18 Jun 2026 15:43:22 +0200 Subject: [PATCH 2/3] Address review on multi-process core changes - Open retry is now asynchronous: the retry/backoff loop awaits with Task.sleep between attempts instead of blocking a thread with usleep, which also makes a database open cancellable (new test). The whole pool is still built in one runBlocking unit since RawSqliteConnection is ~Copyable and cannot cross the async boundary. - The cross-process change signal is only enabled for absolute-path databases (App Group containers); the default directory lives in the app's sandbox and is not reachable by extensions, so signalling it was wasted overhead. - EXTERNAL_CHANGES_MARKER is now internal; watch already accounts for it and there is no public onChange API, so consumers don't need it. - Extracted a shared connection-open helper between the default-directory and absolute-path cases. - Use isDirectory: true when building the default database directory URL to avoid a blocking stat, per Apple's file-access guidance. 109 XCTests plus 55 Swift Testing tests across 11 suites pass with -strict-concurrency=complete. --- CHANGELOG.md | 4 +- .../Implementation/AsyncConnectionPool.swift | 125 ++++++++---------- .../Protocol/SQLiteConnectionPool.swift | 2 +- .../Implementation/ConcurrentOpenTests.swift | 42 ++++++ 4 files changed, 104 insertions(+), 69 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f8f100..a073d54 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,9 @@ * Added an opt-in cross-process change signal: each pool posts a Darwin notification after every committed write and, on receipt, re-emits `tableUpdates` with `EXTERNAL_CHANGES_MARKER` so `watch` queries and the upload client wake for writes made by - other processes sharing the database file. In-memory databases skip the signal. + other processes sharing the database file. Only databases opened from an absolute path + (an App Group container) use the signal; in-memory and default-directory databases skip + it, since they can't be shared across processes. ## 1.14.3 diff --git a/Sources/PowerSync/Implementation/AsyncConnectionPool.swift b/Sources/PowerSync/Implementation/AsyncConnectionPool.swift index 7816d91..8adaf1e 100644 --- a/Sources/PowerSync/Implementation/AsyncConnectionPool.swift +++ b/Sources/PowerSync/Implementation/AsyncConnectionPool.swift @@ -7,63 +7,47 @@ enum DatabaseLocation { case inDefaultDirectory(name: String) case atPath(String) - /// The on-disk path other processes can share, or `nil` for in-memory databases. + /// The on-disk path other processes can share, or `nil` when the database can't be + /// shared. Only absolute paths (typically an App Group container) are shareable; the + /// default directory is inside the app's own sandbox, which extensions cannot reach. var sharedPath: String? { switch self { - case .inMemory: + case .inMemory, .inDefaultDirectory: return nil - case let .inDefaultDirectory(name): - return (try? DatabaseLocation.appleDefaultDatabaseDirectory().path).map { "\($0)/\(name)" } case let .atPath(path): return path } } func openConnection(writer: Bool) throws -> RawSqliteConnection { - var db: OpaquePointer? - let rc: Int32 - let path: String - switch self { case .inMemory: - path = ":memory:" - rc = sqlite3_open_v2(path, &db, SQLITE_OPEN_READWRITE, nil) - case .inDefaultDirectory(let name): - let fileManager = FileManager.default - let databaseDirectory = (try DatabaseLocation.appleDefaultDatabaseDirectory()).path - - if !fileManager.fileExists(atPath: databaseDirectory) { - try fileManager.createDirectory(atPath: databaseDirectory, withIntermediateDirectories: true) - } - - path = "\(databaseDirectory)/\(name)" - let flags = if writer { - SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE - } else { - SQLITE_OPEN_READONLY - } - rc = sqlite3_open_v2(path, &db, flags, nil) - case .atPath(let absolutePath): - let fileManager = FileManager.default + return try DatabaseLocation.open(path: ":memory:", flags: SQLITE_OPEN_READWRITE) + case let .inDefaultDirectory(name): + let directory = (try DatabaseLocation.appleDefaultDatabaseDirectory()).path + return try DatabaseLocation.openFile(at: "\(directory)/\(name)", in: directory, writer: writer) + case let .atPath(absolutePath): let directory = (absolutePath as NSString).deletingLastPathComponent + return try DatabaseLocation.openFile(at: absolutePath, in: directory, writer: writer) + } + } - if !fileManager.fileExists(atPath: directory) { - try fileManager.createDirectory(atPath: directory, withIntermediateDirectories: true) - } - - path = absolutePath - let flags = if writer { - SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE - } else { - SQLITE_OPEN_READONLY - } - rc = sqlite3_open_v2(path, &db, flags, nil) + /// Creates `directory` if needed, then opens the database file with the right flags. + private static func openFile(at path: String, in directory: String, writer: Bool) throws -> RawSqliteConnection { + let fileManager = FileManager.default + if !fileManager.fileExists(atPath: directory) { + try fileManager.createDirectory(atPath: directory, withIntermediateDirectories: true) } + let flags = writer ? (SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE) : SQLITE_OPEN_READONLY + return try open(path: path, flags: flags) + } + private static func open(path: String, flags: Int32) throws -> RawSqliteConnection { + var db: OpaquePointer? + let rc = sqlite3_open_v2(path, &db, flags, nil) if rc != 0 { throw PowerSyncError.sqliteError(extendedResultCode: rc, offset: nil, message: "Could not open database \(path)", errorString: nil, sql: nil) } - return RawSqliteConnection(connection: db!) } @@ -79,7 +63,8 @@ enum DatabaseLocation { throw PowerSyncError.operationFailed(message: "Unable to find application support directory") } - return documentsDirectory.appendingPathComponent("databases") + // `isDirectory: true` avoids a blocking stat to infer the URL kind. + return documentsDirectory.appendingPathComponent("databases", isDirectory: true) } } @@ -158,26 +143,47 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol { return primary == SQLITE_BUSY || primary == SQLITE_LOCKED } - /// Opens and configures a connection, retrying with backoff while another process - /// holds the database (apps and their widgets/extensions open concurrently). - fileprivate func openConfiguredConnection(writer: Bool) throws -> RawSqliteConnection { + /// Opens and configures all connections of the pool in a single blocking unit of work. + /// One attempt: `RawSqliteConnection` is `~Copyable` and cannot cross the async + /// boundary, so the whole pool is built here and the retry/backoff lives in the async + /// caller (``buildPoolWithRetry(handleUpdates:)``). + fileprivate func buildPool(handleUpdates: @escaping @Sendable (Set) -> Void) throws -> NativeConnectionPool { + let writer = try location.openConnection(writer: true) + try configureConnection(connection: writer, isWriter: true) + + if case .inMemory = location { + return NativeConnectionPool(singleConnection: writer, logger: logger, handleUpdates: handleUpdates) + } + let numReaders = 4 + var readers = RigidDeque(capacity: numReaders) + while !readers.isFull { + let reader = try location.openConnection(writer: false) + try configureConnection(connection: reader, isWriter: false) + readers.append(reader) + } + return NativeConnectionPool(writer: writer, readers: readers, logger: logger, handleUpdates: handleUpdates) + } + + /// Builds the pool, retrying with asynchronous backoff while another process holds the + /// database (apps and their widgets/extensions open concurrently). Awaiting between + /// attempts pins no thread and is cancellable, unlike a blocking sleep. + private func buildPoolWithRetry(handleUpdates: @escaping @Sendable (Set) -> Void) async throws -> NativeConnectionPool { // ~5s total budget: 10ms doubling to a 250ms cap. Concurrent opens (app + widget) // resolve in tens of milliseconds; a database still busy after seconds is stuck. - var delayMicroseconds: UInt32 = 10_000 + // `Task.sleep(nanoseconds:)` keeps the package's iOS 15 / macOS 12 floor while + // staying async and cancellable. + var delayNanoseconds: UInt64 = 10_000_000 let deadline = DispatchTime.now() + .seconds(5) while true { do { - let connection = try location.openConnection(writer: writer) - try configureConnection(connection: connection, isWriter: writer) - return connection + return try await runBlocking { try self.buildPool(handleUpdates: handleUpdates) } } catch where Self.isTransientOpenError(error) && DispatchTime.now() < deadline { - // The failed connection is dropped (closed by deinit); reopen fresh. logger.debug( - "database busy while opening (another process holds it); retrying in \(delayMicroseconds / 1000)ms", + "database busy while opening (another process holds it); retrying in \(delayNanoseconds / 1_000_000)ms", tag: "AsyncConnectionPool" ) - usleep(delayMicroseconds) - delayMicroseconds = min(delayMicroseconds * 2, 250_000) + try await Task.sleep(nanoseconds: delayNanoseconds) + delayNanoseconds = min(delayNanoseconds * 2, 250_000_000) } } } @@ -234,22 +240,7 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol { context?.tableUpdatesStream.dispatch(event: [EXTERNAL_CHANGES_MARKER]) } - let pool = try await context.runBlocking { - let writer = try context.openConfiguredConnection(writer: true) - - if case .inMemory = context.location { - return NativeConnectionPool(singleConnection: writer, logger: context.logger, handleUpdates: handleUpdates) - } else { - let numReaders = 4 - var readers = RigidDeque(capacity: numReaders) - while !readers.isFull { - readers.append(try context.openConfiguredConnection(writer: false)) - } - - return NativeConnectionPool(writer: writer, readers: readers, logger: context.logger, handleUpdates: handleUpdates) - } - } - + let pool = try await context.buildPoolWithRetry(handleUpdates: handleUpdates) self.pool = pool return pool } diff --git a/Sources/PowerSync/Protocol/SQLiteConnectionPool.swift b/Sources/PowerSync/Protocol/SQLiteConnectionPool.swift index 598a736..fb36b22 100644 --- a/Sources/PowerSync/Protocol/SQLiteConnectionPool.swift +++ b/Sources/PowerSync/Protocol/SQLiteConnectionPool.swift @@ -60,7 +60,7 @@ public protocol SQLiteStatementIteratorProtocol { /// Emitted on `tableUpdates` when another process changed the database. The concrete /// tables are unknown (cross-process signals carry no payload), so consumers must treat /// this as potentially matching every table they watch. -public let EXTERNAL_CHANGES_MARKER = "__powersync_external_changes__" +let EXTERNAL_CHANGES_MARKER = "__powersync_external_changes__" public protocol SQLiteConnectionPoolProtocol: Sendable { var tableUpdates: AsyncStream> { get } diff --git a/Tests/PowerSyncTests/Implementation/ConcurrentOpenTests.swift b/Tests/PowerSyncTests/Implementation/ConcurrentOpenTests.swift index 3ba01c7..441ced9 100644 --- a/Tests/PowerSyncTests/Implementation/ConcurrentOpenTests.swift +++ b/Tests/PowerSyncTests/Implementation/ConcurrentOpenTests.swift @@ -83,4 +83,46 @@ struct ConcurrentOpenTests { } try? await database.close() } + + @available(iOS 16, macOS 13, watchOS 9, tvOS 16, *) + @Test(.timeLimit(.minutes(1))) + func openRetryIsCancellable() async throws { + // The async retry awaits between attempts instead of blocking a thread, so a task + // opening a database that is held by another connection can be cancelled promptly + // rather than waiting out the full retry budget. + let path = FileManager.default.temporaryDirectory + .appendingPathComponent("concurrent-open-cancel-\(UUID().uuidString).db").path + + let holder = LockHolder() + #expect(sqlite3_open(path, &holder.connection) == SQLITE_OK) + defer { + _ = holder.exec("COMMIT") + holder.close() + try? FileManager.default.removeItem(atPath: path) + } + #expect(holder.exec("CREATE TABLE other(x); BEGIN IMMEDIATE; INSERT INTO other VALUES (1);") == SQLITE_OK) + + let database = PowerSyncDatabase( + schema: Schema(tables: [Table(name: "item", columns: [.text("title")])]), + dbFilename: path, + logger: DefaultLogger(minSeverity: .warning) + ) + let opening = Task { + _ = try await database.execute("SELECT 1") + } + // Let a couple of retry attempts happen, then cancel. + try await Task.sleep(for: .milliseconds(100)) + opening.cancel() + + let start = ContinuousClock.now + let result = await opening.result + let elapsed = ContinuousClock.now - start + // Cancellation resolves well before the ~5s retry budget would expire. + #expect(elapsed < .seconds(2)) + if case .success = result { + Issue.record("opening should not have succeeded while the database is held") + } + + try? await database.close() + } } From 541e7f93a327d8ff78ad355db0635c6072d2b9e9 Mon Sep 17 00:00:00 2001 From: "Asier G. Morato" Date: Sun, 28 Jun 2026 14:46:01 +0200 Subject: [PATCH 3/3] Address minor review comments on multi-process core - Darwin notification name prefix com.powersync.changes (was co.). - Simplify the changelog entry to one feature bullet, dropping the internal-only details. - Document that only the main app should call connect(); extensions can read and write the shared database but must not open a second sync connection. --- CHANGELOG.md | 14 ++------------ .../Implementation/CrossProcessChangeSignal.swift | 2 +- Sources/PowerSync/PowerSyncDatabase.swift | 5 ++++- 3 files changed, 7 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a073d54..f524544 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,18 +4,8 @@ * `PowerSyncDatabase(dbFilename:)` now accepts an absolute path (starting with `/`), used as-is so the database can live in an App Group container shared with app extensions. - Plain filenames keep the existing behavior, and `close(deleteDatabase: true)` deletes the - files at the absolute location. -* Opening the connection pool retries while another process holds the database. The - `pragma journal_mode = WAL` transition reports `SQLITE_BUSY`/`SQLITE_BUSY_RECOVERY` - without consulting the busy handler, so concurrent cold opens (an app launching while its - extension opens the same file) used to fail; the pool now retries with backoff. -* Added an opt-in cross-process change signal: each pool posts a Darwin notification after - every committed write and, on receipt, re-emits `tableUpdates` with - `EXTERNAL_CHANGES_MARKER` so `watch` queries and the upload client wake for writes made by - other processes sharing the database file. Only databases opened from an absolute path - (an App Group container) use the signal; in-memory and default-directory databases skip - it, since they can't be shared across processes. + Plain filenames keep the existing behavior. The SDK coordinates opening the database to + avoid conflicts and can share update notifications across the main app and extensions. ## 1.14.3 diff --git a/Sources/PowerSync/Implementation/CrossProcessChangeSignal.swift b/Sources/PowerSync/Implementation/CrossProcessChangeSignal.swift index 2178b13..c06984b 100644 --- a/Sources/PowerSync/Implementation/CrossProcessChangeSignal.swift +++ b/Sources/PowerSync/Implementation/CrossProcessChangeSignal.swift @@ -24,7 +24,7 @@ final class CrossProcessChangeSignal: @unchecked Sendable { init(databasePath: String, logger: any LoggerProtocol) { // Stable across processes: both sides derive the name from the canonical path. let canonical = URL(fileURLWithPath: databasePath).standardizedFileURL.path - self.name = "co.powersync.changes.\(Self.fnv1a(canonical))" + self.name = "com.powersync.changes.\(Self.fnv1a(canonical))" self.logger = logger } diff --git a/Sources/PowerSync/PowerSyncDatabase.swift b/Sources/PowerSync/PowerSyncDatabase.swift index 8a7d04e..f9ed966 100644 --- a/Sources/PowerSync/PowerSyncDatabase.swift +++ b/Sources/PowerSync/PowerSyncDatabase.swift @@ -9,7 +9,10 @@ public let DEFAULT_DB_FILENAME = "powersync.db" /// - dbFilename: The database filename. Defaults to "powersync.db". Plain names are /// stored in the default databases directory; an absolute path (starting with "/") is /// used as-is, which allows sharing the database with app extensions through an App -/// Group container. +/// Group container. The database itself can be used concurrently from the main app and +/// its extensions, but only the main app should call `connect`. Two sync connections on +/// the same database waste resources and are untested (and could corrupt the sync +/// client); let extensions read and write, and leave syncing to the app. /// - logger: Optional logging interface /// - initialStatements: An optional list of statements to run as the database is opened. /// - Returns: A configured PowerSyncDatabase instance