diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d9b347..05e5817 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## Unreleased + +* `PowerSyncDatabase(dbFilename:)` now accepts an absolute path (starting with `/`), used + as-is so the database can live in an App Group container shared with app extensions. + Plain filenames keep the existing behavior. The SDK coordinates opening the database to + avoid conflicts and can share update notifications across the main app and extensions. + ## 1.14.4 - Fix crash when running a statement in a cursor callback ([#148](https://github.com/powersync-ja/powersync-swift/issues/148)). diff --git a/Sources/PowerSync/Implementation/AsyncConnectionPool.swift b/Sources/PowerSync/Implementation/AsyncConnectionPool.swift index b765903..8adaf1e 100644 --- a/Sources/PowerSync/Implementation/AsyncConnectionPool.swift +++ b/Sources/PowerSync/Implementation/AsyncConnectionPool.swift @@ -5,37 +5,49 @@ import Foundation enum DatabaseLocation { case inMemory case inDefaultDirectory(name: String) - + case atPath(String) + + /// The on-disk path other processes can share, or `nil` when the database can't be + /// shared. Only absolute paths (typically an App Group container) are shareable; the + /// default directory is inside the app's own sandbox, which extensions cannot reach. + var sharedPath: String? { + switch self { + case .inMemory, .inDefaultDirectory: + return nil + case let .atPath(path): + return path + } + } + func openConnection(writer: Bool) throws -> RawSqliteConnection { - var db: OpaquePointer? - let rc: Int32 - let path: String - switch self { case .inMemory: - path = ":memory:" - rc = sqlite3_open_v2(path, &db, SQLITE_OPEN_READWRITE, nil) - case .inDefaultDirectory(let name): - let fileManager = FileManager.default - let databaseDirectory = (try DatabaseLocation.appleDefaultDatabaseDirectory()).path - - if !fileManager.fileExists(atPath: databaseDirectory) { - try fileManager.createDirectory(atPath: databaseDirectory, withIntermediateDirectories: true) - } + return try DatabaseLocation.open(path: ":memory:", flags: SQLITE_OPEN_READWRITE) + case let .inDefaultDirectory(name): + let directory = (try DatabaseLocation.appleDefaultDatabaseDirectory()).path + return try DatabaseLocation.openFile(at: "\(directory)/\(name)", in: directory, writer: writer) + case let .atPath(absolutePath): + let directory = (absolutePath as NSString).deletingLastPathComponent + return try DatabaseLocation.openFile(at: absolutePath, in: directory, writer: writer) + } + } - path = "\(databaseDirectory)/\(name)" - let flags = if writer { - SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE - } else { - SQLITE_OPEN_READONLY - } - rc = sqlite3_open_v2(path, &db, flags, nil) + /// Creates `directory` if needed, then opens the database file with the right flags. + private static func openFile(at path: String, in directory: String, writer: Bool) throws -> RawSqliteConnection { + let fileManager = FileManager.default + if !fileManager.fileExists(atPath: directory) { + try fileManager.createDirectory(atPath: directory, withIntermediateDirectories: true) } + let flags = writer ? (SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE) : SQLITE_OPEN_READONLY + return try open(path: path, flags: flags) + } + private static func open(path: String, flags: Int32) throws -> RawSqliteConnection { + var db: OpaquePointer? + let rc = sqlite3_open_v2(path, &db, flags, nil) if rc != 0 { throw PowerSyncError.sqliteError(extendedResultCode: rc, offset: nil, message: "Could not open database \(path)", errorString: nil, sql: nil) } - return RawSqliteConnection(connection: db!) } @@ -51,7 +63,8 @@ enum DatabaseLocation { throw PowerSyncError.operationFailed(message: "Unable to find application support directory") } - return documentsDirectory.appendingPathComponent("databases") + // `isDirectory: true` avoids a blocking stat to infer the URL kind. + return documentsDirectory.appendingPathComponent("databases", isDirectory: true) } } @@ -62,11 +75,16 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol { private let logger: any LoggerProtocol private let tableUpdatesStream = BroadcastStream>() private let opener = PoolOpener() + /// Cross-process change signaling; `nil` for in-memory databases (nothing to share). + private let changeSignal: CrossProcessChangeSignal? init(location: DatabaseLocation, logger: any LoggerProtocol, initialStatements: [String] = []) { self.location = location self.logger = logger self.initialStatements = initialStatements + self.changeSignal = location.sharedPath.map { + CrossProcessChangeSignal(databasePath: $0, logger: logger) + } } var tableUpdates: AsyncStream> { @@ -88,12 +106,15 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol { let _ = try context.execute(sql: stmt, parameters: []) } + // The busy handler is installed first so later statements wait instead of failing, + // but note it does NOT apply to the WAL transition below. + let _ = try context.execute(sql: "pragma busy_timeout = 30000", parameters: []) + if isWriter { let _ = try context.execute(sql: "pragma journal_mode = WAL", parameters: []) } let _ = try context.execute(sql: "pragma journal_size_limit = \(6 * 1024 * 1024)", parameters: []) - let _ = try context.execute(sql: "pragma busy_timeout = 30000", parameters: []) let _ = try context.execute(sql: "pragma cache_size = -\(50 * 1024)", parameters: []) if isWriter { @@ -110,6 +131,63 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol { } } + /// Whether an error from opening/configuring a connection is transient contention + /// (another process holds the file, e.g. mid WAL-recovery) and worth retrying. + /// `pragma journal_mode = WAL` reports SQLITE_BUSY/SQLITE_BUSY_RECOVERY without + /// consulting the busy handler, so `busy_timeout` cannot cover the open path. + private static func isTransientOpenError(_ error: any Error) -> Bool { + guard case let PowerSyncError.sqliteError(extendedResultCode, _, _, _, _) = error else { + return false + } + let primary = extendedResultCode & 0xFF + return primary == SQLITE_BUSY || primary == SQLITE_LOCKED + } + + /// Opens and configures all connections of the pool in a single blocking unit of work. + /// One attempt: `RawSqliteConnection` is `~Copyable` and cannot cross the async + /// boundary, so the whole pool is built here and the retry/backoff lives in the async + /// caller (``buildPoolWithRetry(handleUpdates:)``). + fileprivate func buildPool(handleUpdates: @escaping @Sendable (Set) -> Void) throws -> NativeConnectionPool { + let writer = try location.openConnection(writer: true) + try configureConnection(connection: writer, isWriter: true) + + if case .inMemory = location { + return NativeConnectionPool(singleConnection: writer, logger: logger, handleUpdates: handleUpdates) + } + let numReaders = 4 + var readers = RigidDeque(capacity: numReaders) + while !readers.isFull { + let reader = try location.openConnection(writer: false) + try configureConnection(connection: reader, isWriter: false) + readers.append(reader) + } + return NativeConnectionPool(writer: writer, readers: readers, logger: logger, handleUpdates: handleUpdates) + } + + /// Builds the pool, retrying with asynchronous backoff while another process holds the + /// database (apps and their widgets/extensions open concurrently). Awaiting between + /// attempts pins no thread and is cancellable, unlike a blocking sleep. + private func buildPoolWithRetry(handleUpdates: @escaping @Sendable (Set) -> Void) async throws -> NativeConnectionPool { + // ~5s total budget: 10ms doubling to a 250ms cap. Concurrent opens (app + widget) + // resolve in tens of milliseconds; a database still busy after seconds is stuck. + // `Task.sleep(nanoseconds:)` keeps the package's iOS 15 / macOS 12 floor while + // staying async and cancellable. + var delayNanoseconds: UInt64 = 10_000_000 + let deadline = DispatchTime.now() + .seconds(5) + while true { + do { + return try await runBlocking { try self.buildPool(handleUpdates: handleUpdates) } + } catch where Self.isTransientOpenError(error) && DispatchTime.now() < deadline { + logger.debug( + "database busy while opening (another process holds it); retrying in \(delayNanoseconds / 1_000_000)ms", + tag: "AsyncConnectionPool" + ) + try await Task.sleep(nanoseconds: delayNanoseconds) + delayNanoseconds = min(delayNanoseconds * 2, 250_000_000) + } + } + } + /// Opens connections on a background thread to obtain the native connection pool. private func obtainInner() async throws -> NativeConnectionPool { try await opener.obtainPool(pool: self) @@ -137,6 +215,7 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol { } func close() async throws { + changeSignal?.stop() try await self.opener.close() } @@ -152,27 +231,16 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol { try registerPowerSyncCoreExtension() let handleUpdates: @Sendable (_: Set) -> () = { [weak context] updates in context?.tableUpdatesStream.dispatch(event: updates) + // Tell other processes sharing this file that tables changed. + context?.changeSignal?.post() } - - let pool = try await context.runBlocking { - let writer = try context.location.openConnection(writer: true) - try context.configureConnection(connection: writer, isWriter: true) - - if case .inMemory = context.location { - return NativeConnectionPool(singleConnection: writer, logger: context.logger, handleUpdates: handleUpdates) - } else { - let numReaders = 4 - var readers = RigidDeque(capacity: numReaders) - while !readers.isFull { - let connection = try context.location.openConnection(writer: false) - try context.configureConnection(connection: connection, isWriter: false) - readers.append(connection) - } - - return NativeConnectionPool(writer: writer, readers: readers, logger: context.logger, handleUpdates: handleUpdates) - } + context.changeSignal?.start { [weak context] in + // Another process (or this one; harmless, throttled downstream) changed + // the database outside this pool's update hooks. + context?.tableUpdatesStream.dispatch(event: [EXTERNAL_CHANGES_MARKER]) } + let pool = try await context.buildPoolWithRetry(handleUpdates: handleUpdates) self.pool = pool return pool } diff --git a/Sources/PowerSync/Implementation/CrossProcessChangeSignal.swift b/Sources/PowerSync/Implementation/CrossProcessChangeSignal.swift new file mode 100644 index 0000000..c06984b --- /dev/null +++ b/Sources/PowerSync/Implementation/CrossProcessChangeSignal.swift @@ -0,0 +1,75 @@ +import Foundation +import notify + +/// Cross-process change signaling for a database file, built on Darwin notifications — +/// the same mechanism Core Data uses for its remote-change notifications. +/// +/// PowerSync's update hooks only observe writes made through the local connection pool. +/// When several processes share a database file (an app and its widgets or App Intents +/// extensions), each process posts this signal after committing a write; the others +/// re-emit their `tableUpdates` with ``EXTERNAL_CHANGES_MARKER`` so `watch` queries +/// re-run and the upload client checks `ps_crud`. +/// +/// Darwin notifications carry no payload and are coalesced under pressure, which is fine: +/// the marker means "something changed, re-query". Deliveries to the posting process +/// itself are not suppressed — a redundant re-query (already throttled) is preferable to +/// the race a sender-stamp suppression scheme introduces, where an external change could +/// be misattributed and silently dropped. +final class CrossProcessChangeSignal: @unchecked Sendable { + private let name: String + private let logger: any LoggerProtocol + private var token: Int32 = NOTIFY_TOKEN_INVALID + private let queue = DispatchQueue(label: "powersync.cross-process-signal") + + init(databasePath: String, logger: any LoggerProtocol) { + // Stable across processes: both sides derive the name from the canonical path. + let canonical = URL(fileURLWithPath: databasePath).standardizedFileURL.path + self.name = "com.powersync.changes.\(Self.fnv1a(canonical))" + self.logger = logger + } + + /// Starts listening; `onExternalChange` runs on a private queue for every signal + /// (including this process's own posts). + func start(onChange: @escaping @Sendable () -> Void) { + guard token == NOTIFY_TOKEN_INVALID else { + return + } + let status = notify_register_dispatch(name, &token, queue) { _ in + onChange() + } + if status != NOTIFY_STATUS_OK { + logger.warning( + "could not register cross-process change signal (status \(status)); " + + "changes from other processes will not wake watch queries", + tag: "CrossProcessChangeSignal" + ) + token = NOTIFY_TOKEN_INVALID + } + } + + /// Posts the signal; called after every committed write. + func post() { + notify_post(name) + } + + func stop() { + if token != NOTIFY_TOKEN_INVALID { + notify_cancel(token) + token = NOTIFY_TOKEN_INVALID + } + } + + deinit { + stop() + } + + /// FNV-1a 64-bit, hex-encoded: deterministic and dependency-free. + private static func fnv1a(_ input: String) -> String { + var hash: UInt64 = 0xCBF2_9CE4_8422_2325 + for byte in input.utf8 { + hash ^= UInt64(byte) + hash = hash &* 0x0000_0100_0000_01B3 + } + return String(hash, radix: 16) + } +} diff --git a/Sources/PowerSync/Implementation/PowerSyncDatabaseImpl.swift b/Sources/PowerSync/Implementation/PowerSyncDatabaseImpl.swift index 2a52571..2e68427 100644 --- a/Sources/PowerSync/Implementation/PowerSyncDatabaseImpl.swift +++ b/Sources/PowerSync/Implementation/PowerSyncDatabaseImpl.swift @@ -121,9 +121,13 @@ final class PowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { func close(deleteDatabase: Bool) async throws { try await close() if deleteDatabase, let dbFilename { - // We can use the supplied dbLocation when we support that in future - let directory = try DatabaseLocation.appleDefaultDatabaseDirectory() - try deleteSQLiteFiles(dbFilename: dbFilename, in: directory) + if dbFilename.hasPrefix("/") { + let url = URL(fileURLWithPath: dbFilename) + try deleteSQLiteFiles(dbFilename: url.lastPathComponent, in: url.deletingLastPathComponent()) + } else { + let directory = try DatabaseLocation.appleDefaultDatabaseDirectory() + try deleteSQLiteFiles(dbFilename: dbFilename, in: directory) + } } } diff --git a/Sources/PowerSync/Implementation/queries/watch.swift b/Sources/PowerSync/Implementation/queries/watch.swift index 00567bb..60e6c51 100644 --- a/Sources/PowerSync/Implementation/queries/watch.swift +++ b/Sources/PowerSync/Implementation/queries/watch.swift @@ -29,6 +29,8 @@ func watchImpl(db: PowerSyncDatabaseImpl, options: WatchOptio let updateNotifications = pool.tableUpdates.filter { changedTables in changedTables.contains(where: watchedTables.contains) + // Another process changed unknown tables: conservatively re-query. + || changedTables.contains(EXTERNAL_CHANGES_MARKER) }.map { _ in () } // Allows emitting the first result even if there aren't changes let withInitial = AsyncAlgorithms.merge([()].async, updateNotifications) diff --git a/Sources/PowerSync/Implementation/sync/StreamingSyncClient.swift b/Sources/PowerSync/Implementation/sync/StreamingSyncClient.swift index 7191f7d..dc108a8 100644 --- a/Sources/PowerSync/Implementation/sync/StreamingSyncClient.swift +++ b/Sources/PowerSync/Implementation/sync/StreamingSyncClient.swift @@ -37,7 +37,9 @@ final class StreamingSyncClient: Sendable { } private func uploadLoop(signals: SyncSignals) async throws { - let updates = db.pool.tableUpdates.filter { updates in updates.contains("ps_crud") }.map { _ in () } + let updates = db.pool.tableUpdates.filter { updates in + updates.contains("ps_crud") || updates.contains(EXTERNAL_CHANGES_MARKER) + }.map { _ in () } let allTriggers = MergeItemSequence(inner: AsyncAlgorithms.merge(updates, signals.signalCrudUpload.subscribe())).makeAsyncIterator() // Use a do-while loop to ensure we start an upload iteration even if we can't connect to the service. diff --git a/Sources/PowerSync/PowerSyncDatabase.swift b/Sources/PowerSync/PowerSyncDatabase.swift index 034b4e9..f9ed966 100644 --- a/Sources/PowerSync/PowerSyncDatabase.swift +++ b/Sources/PowerSync/PowerSyncDatabase.swift @@ -6,7 +6,13 @@ public let DEFAULT_DB_FILENAME = "powersync.db" /// Creates a PowerSyncDatabase instance /// - Parameters: /// - schema: The database schema -/// - dbFilename: The database filename. Defaults to "powersync.db" +/// - dbFilename: The database filename. Defaults to "powersync.db". Plain names are +/// stored in the default databases directory; an absolute path (starting with "/") is +/// used as-is, which allows sharing the database with app extensions through an App +/// Group container. The database itself can be used concurrently from the main app and +/// its extensions, but only the main app should call `connect`. Two sync connections on +/// the same database waste resources and are untested (and could corrupt the sync +/// client); let extensions read and write, and leave syncing to the app. /// - logger: Optional logging interface /// - initialStatements: An optional list of statements to run as the database is opened. /// - Returns: A configured PowerSyncDatabase instance @@ -18,6 +24,8 @@ public func PowerSyncDatabase( ) -> PowerSyncDatabaseProtocol { let (location, group) = if dbFilename == ":memory:" { (DatabaseLocation.inMemory, DatabaseGroupCollection()) + } else if dbFilename.hasPrefix("/") { + (DatabaseLocation.atPath(dbFilename), .shared) } else { (DatabaseLocation.inDefaultDirectory(name: dbFilename), .shared) } diff --git a/Sources/PowerSync/Protocol/SQLiteConnectionPool.swift b/Sources/PowerSync/Protocol/SQLiteConnectionPool.swift index 8edb486..fb36b22 100644 --- a/Sources/PowerSync/Protocol/SQLiteConnectionPool.swift +++ b/Sources/PowerSync/Protocol/SQLiteConnectionPool.swift @@ -57,6 +57,11 @@ public protocol SQLiteStatementIteratorProtocol { /// This is the underlying pool implementation on which the higher-level PowerSync Swift SDK is built on. /// /// This is an internal protocol and should not be implemented outside of the PowerSync SDK. +/// Emitted on `tableUpdates` when another process changed the database. The concrete +/// tables are unknown (cross-process signals carry no payload), so consumers must treat +/// this as potentially matching every table they watch. +let EXTERNAL_CHANGES_MARKER = "__powersync_external_changes__" + public protocol SQLiteConnectionPoolProtocol: Sendable { var tableUpdates: AsyncStream> { get } diff --git a/Tests/PowerSyncTests/Implementation/AbsolutePathTests.swift b/Tests/PowerSyncTests/Implementation/AbsolutePathTests.swift new file mode 100644 index 0000000..ae1320e --- /dev/null +++ b/Tests/PowerSyncTests/Implementation/AbsolutePathTests.swift @@ -0,0 +1,39 @@ +import Foundation +@testable import PowerSync +import Testing + +struct AbsolutePathTests { + /// An absolute `dbFilename` opens the database at that exact path (creating parent + /// directories), persists across instances, and `close(deleteDatabase:)` removes the + /// files there. This is what app extensions need to share a database through an App + /// Group container. + @Test func opensPersistsAndDeletesAtAbsolutePath() async throws { + let schema = Schema(tables: [ + Table(name: "items", columns: [.text("name")]), + ]) + let directory = FileManager.default.temporaryDirectory + .appendingPathComponent("powersync-absolute-\(UUID().uuidString)") + let path = directory.appendingPathComponent("nested/shared.db").path + + let database = PowerSyncDatabase(schema: schema, dbFilename: path, logger: DefaultLogger()) + try await database.disconnectAndClear() + _ = try await database.execute( + sql: "INSERT INTO items (id, name) VALUES (?, ?)", + parameters: ["i1", "shared"] + ) + #expect(FileManager.default.fileExists(atPath: path)) + try await database.close() + + // A second instance over the same absolute path sees the data. + let reopened = PowerSyncDatabase(schema: schema, dbFilename: path, logger: DefaultLogger()) + let name = try await reopened.get(sql: "SELECT name FROM items WHERE id = ?", parameters: ["i1"]) { + try $0.getString(index: 0) + } + #expect(name == "shared") + + try await reopened.close(deleteDatabase: true) + #expect(!FileManager.default.fileExists(atPath: path)) + + try? FileManager.default.removeItem(at: directory) + } +} diff --git a/Tests/PowerSyncTests/Implementation/ConcurrentOpenTests.swift b/Tests/PowerSyncTests/Implementation/ConcurrentOpenTests.swift new file mode 100644 index 0000000..441ced9 --- /dev/null +++ b/Tests/PowerSyncTests/Implementation/ConcurrentOpenTests.swift @@ -0,0 +1,128 @@ +import Foundation +@testable import PowerSync +import SQLite3 +import Testing + +/// Opening a database while another process holds it must retry instead of failing. +/// +/// `pragma journal_mode = WAL` needs a moment of exclusive access; when another process +/// (an app while its widget refreshes, a widget while the app launches) holds any lock — +/// or is mid WAL-recovery — SQLite reports SQLITE_BUSY/SQLITE_BUSY_RECOVERY *without* +/// consulting the busy handler, so `busy_timeout` cannot save the open. The pool has to +/// retry with backoff. Reproduced here in-process with a system-SQLite connection holding +/// a reserved lock, which is byte-for-byte the same file-level contention two processes +/// produce. +@Suite("Concurrent open") +struct ConcurrentOpenTests { + /// Sendable wrapper for the lock-holding SQLite connection used across tasks. + private final class LockHolder: @unchecked Sendable { + var connection: OpaquePointer? + func exec(_ sql: String) -> Int32 { sqlite3_exec(connection, sql, nil, nil, nil) } + func close() { sqlite3_close_v2(connection) } + } + + @available(iOS 16, macOS 13, watchOS 9, tvOS 16, *) + @Test(.timeLimit(.minutes(1))) + func openRetriesWhileAnotherConnectionHoldsTheDatabase() async throws { + let path = FileManager.default.temporaryDirectory + .appendingPathComponent("concurrent-open-\(UUID().uuidString).db").path + + // Simulate the other process: a plain SQLite connection (journal mode still + // DELETE) holding a reserved lock when our pool tries to switch to WAL. + let holder = LockHolder() + #expect(sqlite3_open(path, &holder.connection) == SQLITE_OK) + defer { holder.close() } + #expect(holder.exec("CREATE TABLE other(x); BEGIN IMMEDIATE; INSERT INTO other VALUES (1);") == SQLITE_OK) + + // Release the lock shortly after the pool starts opening. + let releaser = Task { + try await Task.sleep(for: .milliseconds(500)) + _ = holder.exec("COMMIT") + } + + let database = PowerSyncDatabase( + schema: Schema(tables: [Table(name: "item", columns: [.text("title")])]), + dbFilename: path, + logger: DefaultLogger(minSeverity: .warning) + ) + // Forces the pool open (pragma journal_mode = WAL) while the lock is held. + _ = try await database.execute("SELECT 1") + let mode = try await database.get(sql: "pragma journal_mode", parameters: []) { + try $0.getString(index: 0) + } + #expect(mode.lowercased() == "wal") + + try await releaser.value + try await database.close(deleteDatabase: true) + } + + @available(iOS 16, macOS 13, watchOS 9, tvOS 16, *) + @Test(.timeLimit(.minutes(1))) + func openStillFailsWhenTheDatabaseNeverFreesUp() async throws { + let path = FileManager.default.temporaryDirectory + .appendingPathComponent("concurrent-open-stuck-\(UUID().uuidString).db").path + + let holder = LockHolder() + #expect(sqlite3_open(path, &holder.connection) == SQLITE_OK) + defer { + _ = holder.exec("COMMIT") + holder.close() + try? FileManager.default.removeItem(atPath: path) + } + #expect(holder.exec("CREATE TABLE other(x); BEGIN IMMEDIATE; INSERT INTO other VALUES (1);") == SQLITE_OK) + + // The lock is never released: the retry budget must eventually surface the error + // (bounded, not an infinite hang). + let database = PowerSyncDatabase( + schema: Schema(tables: [Table(name: "item", columns: [.text("title")])]), + dbFilename: path, + logger: DefaultLogger(minSeverity: .warning) + ) + await #expect(throws: (any Error).self) { + _ = try await database.execute("SELECT 1") + } + try? await database.close() + } + + @available(iOS 16, macOS 13, watchOS 9, tvOS 16, *) + @Test(.timeLimit(.minutes(1))) + func openRetryIsCancellable() async throws { + // The async retry awaits between attempts instead of blocking a thread, so a task + // opening a database that is held by another connection can be cancelled promptly + // rather than waiting out the full retry budget. + let path = FileManager.default.temporaryDirectory + .appendingPathComponent("concurrent-open-cancel-\(UUID().uuidString).db").path + + let holder = LockHolder() + #expect(sqlite3_open(path, &holder.connection) == SQLITE_OK) + defer { + _ = holder.exec("COMMIT") + holder.close() + try? FileManager.default.removeItem(atPath: path) + } + #expect(holder.exec("CREATE TABLE other(x); BEGIN IMMEDIATE; INSERT INTO other VALUES (1);") == SQLITE_OK) + + let database = PowerSyncDatabase( + schema: Schema(tables: [Table(name: "item", columns: [.text("title")])]), + dbFilename: path, + logger: DefaultLogger(minSeverity: .warning) + ) + let opening = Task { + _ = try await database.execute("SELECT 1") + } + // Let a couple of retry attempts happen, then cancel. + try await Task.sleep(for: .milliseconds(100)) + opening.cancel() + + let start = ContinuousClock.now + let result = await opening.result + let elapsed = ContinuousClock.now - start + // Cancellation resolves well before the ~5s retry budget would expire. + #expect(elapsed < .seconds(2)) + if case .success = result { + Issue.record("opening should not have succeeded while the database is held") + } + + try? await database.close() + } +} diff --git a/Tests/PowerSyncTests/Implementation/CrossProcessSignalTests.swift b/Tests/PowerSyncTests/Implementation/CrossProcessSignalTests.swift new file mode 100644 index 0000000..d57ef5f --- /dev/null +++ b/Tests/PowerSyncTests/Implementation/CrossProcessSignalTests.swift @@ -0,0 +1,84 @@ +import Foundation +@testable import PowerSync +import Testing + +/// Changes made by another process must wake `watch` queries. +/// +/// PowerSync's update hooks are per-pool: a write that goes through a different pool +/// (another process — an app's widget or App Intent extension) never reaches this pool's +/// `tableUpdates`, so `watch` (and everything built on it: the SwiftData change observer, +/// the upload trigger) was structurally blind to it. A cross-process Darwin notification +/// posted after every committed write closes the gap. +/// +/// Two `PowerSyncDatabase` instances over the same file inside one test process use two +/// independent pools, reproducing byte-for-byte the blindness two processes exhibit. +@Suite("Cross-process change signal") +struct CrossProcessSignalTests { + private static func makeDatabase(path: String) -> any PowerSyncDatabaseProtocol { + PowerSyncDatabase( + schema: Schema(tables: [Table(name: "item", columns: [.text("title")])]), + dbFilename: path, + logger: DefaultLogger(minSeverity: .warning) + ) + } + + @available(iOS 16, macOS 13, watchOS 9, tvOS 16, *) + @Test(.timeLimit(.minutes(1))) + func watchWakesUpForWritesFromAnotherPool() async throws { + let path = FileManager.default.temporaryDirectory + .appendingPathComponent("cross-signal-\(UUID().uuidString).db").path + + let observerSide = Self.makeDatabase(path: path) + let writerSide = Self.makeDatabase(path: path) + + // Collect watch emissions on the observer side. + let counts = try observerSide.watch( + sql: "SELECT COUNT(*) FROM item", + parameters: [] + ) { try $0.getInt(index: 0) } + var iterator = counts.makeAsyncIterator() + let initial = try await iterator.next() + #expect(initial == [0]) + + // A write through the OTHER pool: same file, different update hooks. + _ = try await writerSide.execute( + sql: "INSERT INTO item (id, title) VALUES (uuid(), ?)", + parameters: ["externa"] + ) + + // Without the cross-process signal this hangs until the time limit. + let afterExternalWrite = try await iterator.next() + #expect(afterExternalWrite == [1]) + + try await observerSide.close() + try await writerSide.close(deleteDatabase: true) + } + + @available(iOS 16, macOS 13, watchOS 9, tvOS 16, *) + @Test(.timeLimit(.minutes(1))) + func externalChangeMarkerMatchesEveryWatchedTable() async throws { + // The marker says "unknown tables changed"; a watch over any table must re-query. + let path = FileManager.default.temporaryDirectory + .appendingPathComponent("cross-signal-marker-\(UUID().uuidString).db").path + + let observerSide = Self.makeDatabase(path: path) + let writerSide = Self.makeDatabase(path: path) + + // Two watches over different shapes (table scan and aggregate) both wake up. + let titles = try observerSide.watch( + sql: "SELECT title FROM item ORDER BY title", + parameters: [] + ) { try $0.getString(index: 0) } + var titlesIterator = titles.makeAsyncIterator() + _ = try await titlesIterator.next() + + _ = try await writerSide.execute( + sql: "INSERT INTO item (id, title) VALUES (uuid(), ?)", + parameters: ["uno"] + ) + #expect(try await titlesIterator.next() == [["uno"]].first) + + try await observerSide.close() + try await writerSide.close(deleteDatabase: true) + } +} diff --git a/Tests/PowerSyncTests/SyncTests.swift b/Tests/PowerSyncTests/SyncTests.swift index 322b6e6..2521bca 100644 --- a/Tests/PowerSyncTests/SyncTests.swift +++ b/Tests/PowerSyncTests/SyncTests.swift @@ -236,9 +236,9 @@ class InMemorySyncIntegrationTests { @Test @MainActor func uploadsOfflineWrites() async throws { let channel = AsyncThrowingChannel() - var allowConnection = false - let mockClient = MockHttpClient { @MainActor request in - if allowConnection { + let allowConnection = Mutex(false) + let mockClient = MockHttpClient { _ in + if allowConnection.withLock({ $0 }) { return channel } throw PowerSyncError.operationFailed(message: "Fake IO error for test", underlyingError: nil) @@ -254,7 +254,7 @@ class InMemorySyncIntegrationTests { var query = try db.watch("SELECT name FROM users") { try $0.getString(index: 0) }.makeAsyncIterator() try #require(try await query.next() == ["local write"]) - allowConnection = true + allowConnection.withLock { $0 = true } try await channel.pushLine(.fullCheckpoint(Checkpoint(last_op_id: "1", buckets: [BucketChecksum(bucket: "a", checksum: 0)], writeCheckpoint: "1"))) try await channel.pushLine(.syncDataBucket(SyncDataBucket(bucket: "a", data: [OplogEntry( checksum: 0,