Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## Unreleased

* `PowerSyncDatabase(dbFilename:)` now accepts an absolute path (starting with `/`), used
as-is so the database can live in an App Group container shared with app extensions.
Plain filenames keep the existing behavior. The SDK coordinates opening the database to
avoid conflicts and can share update notifications across the main app and extensions.

## 1.14.4

- Fix crash when running a statement in a cursor callback ([#148](https://github.com/powersync-ja/powersync-swift/issues/148)).
Expand Down
152 changes: 110 additions & 42 deletions Sources/PowerSync/Implementation/AsyncConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,49 @@ import Foundation
enum DatabaseLocation {
case inMemory
case inDefaultDirectory(name: String)

case atPath(String)

/// The on-disk path other processes can share, or `nil` when the database can't be
/// shared. Only absolute paths (typically an App Group container) are shareable; the
/// default directory is inside the app's own sandbox, which extensions cannot reach.
var sharedPath: String? {
switch self {
case .inMemory, .inDefaultDirectory:
return nil
case let .atPath(path):
return path
}
}

func openConnection(writer: Bool) throws -> RawSqliteConnection {
var db: OpaquePointer?
let rc: Int32
let path: String

switch self {
case .inMemory:
path = ":memory:"
rc = sqlite3_open_v2(path, &db, SQLITE_OPEN_READWRITE, nil)
case .inDefaultDirectory(let name):
let fileManager = FileManager.default
let databaseDirectory = (try DatabaseLocation.appleDefaultDatabaseDirectory()).path

if !fileManager.fileExists(atPath: databaseDirectory) {
try fileManager.createDirectory(atPath: databaseDirectory, withIntermediateDirectories: true)
}
return try DatabaseLocation.open(path: ":memory:", flags: SQLITE_OPEN_READWRITE)
case let .inDefaultDirectory(name):
let directory = (try DatabaseLocation.appleDefaultDatabaseDirectory()).path
return try DatabaseLocation.openFile(at: "\(directory)/\(name)", in: directory, writer: writer)
case let .atPath(absolutePath):
let directory = (absolutePath as NSString).deletingLastPathComponent
return try DatabaseLocation.openFile(at: absolutePath, in: directory, writer: writer)
}
}

path = "\(databaseDirectory)/\(name)"
let flags = if writer {
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE
} else {
SQLITE_OPEN_READONLY
}
rc = sqlite3_open_v2(path, &db, flags, nil)
/// Creates `directory` if needed, then opens the database file with the right flags.
private static func openFile(at path: String, in directory: String, writer: Bool) throws -> RawSqliteConnection {
let fileManager = FileManager.default
if !fileManager.fileExists(atPath: directory) {
try fileManager.createDirectory(atPath: directory, withIntermediateDirectories: true)
}
let flags = writer ? (SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE) : SQLITE_OPEN_READONLY
return try open(path: path, flags: flags)
}

private static func open(path: String, flags: Int32) throws -> RawSqliteConnection {
var db: OpaquePointer?
let rc = sqlite3_open_v2(path, &db, flags, nil)
if rc != 0 {
throw PowerSyncError.sqliteError(extendedResultCode: rc, offset: nil, message: "Could not open database \(path)", errorString: nil, sql: nil)
}

return RawSqliteConnection(connection: db!)
}

Expand All @@ -51,7 +63,8 @@ enum DatabaseLocation {
throw PowerSyncError.operationFailed(message: "Unable to find application support directory")
}

return documentsDirectory.appendingPathComponent("databases")
// `isDirectory: true` avoids a blocking stat to infer the URL kind.
return documentsDirectory.appendingPathComponent("databases", isDirectory: true)
}
}

Expand All @@ -62,11 +75,16 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol {
private let logger: any LoggerProtocol
private let tableUpdatesStream = BroadcastStream<Set<String>>()
private let opener = PoolOpener()
/// Cross-process change signaling; `nil` for in-memory databases (nothing to share).
private let changeSignal: CrossProcessChangeSignal?
Comment thread
simolus3 marked this conversation as resolved.

init(location: DatabaseLocation, logger: any LoggerProtocol, initialStatements: [String] = []) {
self.location = location
self.logger = logger
self.initialStatements = initialStatements
self.changeSignal = location.sharedPath.map {
CrossProcessChangeSignal(databasePath: $0, logger: logger)
}
}

var tableUpdates: AsyncStream<Set<String>> {
Expand All @@ -88,12 +106,15 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol {
let _ = try context.execute(sql: stmt, parameters: [])
}

// The busy handler is installed first so later statements wait instead of failing,
// but note it does NOT apply to the WAL transition below.
let _ = try context.execute(sql: "pragma busy_timeout = 30000", parameters: [])

if isWriter {
let _ = try context.execute(sql: "pragma journal_mode = WAL", parameters: [])
}

let _ = try context.execute(sql: "pragma journal_size_limit = \(6 * 1024 * 1024)", parameters: [])
let _ = try context.execute(sql: "pragma busy_timeout = 30000", parameters: [])
let _ = try context.execute(sql: "pragma cache_size = -\(50 * 1024)", parameters: [])

if isWriter {
Expand All @@ -110,6 +131,63 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol {
}
}

/// Whether an error from opening/configuring a connection is transient contention
/// (another process holds the file, e.g. mid WAL-recovery) and worth retrying.
/// `pragma journal_mode = WAL` reports SQLITE_BUSY/SQLITE_BUSY_RECOVERY without
/// consulting the busy handler, so `busy_timeout` cannot cover the open path.
private static func isTransientOpenError(_ error: any Error) -> Bool {
guard case let PowerSyncError.sqliteError(extendedResultCode, _, _, _, _) = error else {
return false
}
let primary = extendedResultCode & 0xFF
return primary == SQLITE_BUSY || primary == SQLITE_LOCKED
}

/// Opens and configures all connections of the pool in a single blocking unit of work.
/// One attempt: `RawSqliteConnection` is `~Copyable` and cannot cross the async
/// boundary, so the whole pool is built here and the retry/backoff lives in the async
/// caller (``buildPoolWithRetry(handleUpdates:)``).
fileprivate func buildPool(handleUpdates: @escaping @Sendable (Set<String>) -> Void) throws -> NativeConnectionPool {
let writer = try location.openConnection(writer: true)
try configureConnection(connection: writer, isWriter: true)

if case .inMemory = location {
return NativeConnectionPool(singleConnection: writer, logger: logger, handleUpdates: handleUpdates)
}
let numReaders = 4
var readers = RigidDeque<RawSqliteConnection>(capacity: numReaders)
while !readers.isFull {
let reader = try location.openConnection(writer: false)
try configureConnection(connection: reader, isWriter: false)
readers.append(reader)
}
return NativeConnectionPool(writer: writer, readers: readers, logger: logger, handleUpdates: handleUpdates)
}

/// Builds the pool, retrying with asynchronous backoff while another process holds the
/// database (apps and their widgets/extensions open concurrently). Awaiting between
/// attempts pins no thread and is cancellable, unlike a blocking sleep.
private func buildPoolWithRetry(handleUpdates: @escaping @Sendable (Set<String>) -> Void) async throws -> NativeConnectionPool {
// ~5s total budget: 10ms doubling to a 250ms cap. Concurrent opens (app + widget)
// resolve in tens of milliseconds; a database still busy after seconds is stuck.
// `Task.sleep(nanoseconds:)` keeps the package's iOS 15 / macOS 12 floor while
// staying async and cancellable.
var delayNanoseconds: UInt64 = 10_000_000
let deadline = DispatchTime.now() + .seconds(5)
while true {
do {
return try await runBlocking { try self.buildPool(handleUpdates: handleUpdates) }
} catch where Self.isTransientOpenError(error) && DispatchTime.now() < deadline {
logger.debug(
"database busy while opening (another process holds it); retrying in \(delayNanoseconds / 1_000_000)ms",
tag: "AsyncConnectionPool"
)
try await Task.sleep(nanoseconds: delayNanoseconds)
delayNanoseconds = min(delayNanoseconds * 2, 250_000_000)
}
}
}

/// Opens connections on a background thread to obtain the native connection pool.
private func obtainInner() async throws -> NativeConnectionPool {
try await opener.obtainPool(pool: self)
Expand Down Expand Up @@ -137,6 +215,7 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol {
}

func close() async throws {
changeSignal?.stop()
try await self.opener.close()
}

Expand All @@ -152,27 +231,16 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol {
try registerPowerSyncCoreExtension()
let handleUpdates: @Sendable (_: Set<String>) -> () = { [weak context] updates in
context?.tableUpdatesStream.dispatch(event: updates)
// Tell other processes sharing this file that tables changed.
context?.changeSignal?.post()
}

let pool = try await context.runBlocking {
let writer = try context.location.openConnection(writer: true)
try context.configureConnection(connection: writer, isWriter: true)

if case .inMemory = context.location {
return NativeConnectionPool(singleConnection: writer, logger: context.logger, handleUpdates: handleUpdates)
} else {
let numReaders = 4
var readers = RigidDeque<RawSqliteConnection>(capacity: numReaders)
while !readers.isFull {
let connection = try context.location.openConnection(writer: false)
try context.configureConnection(connection: connection, isWriter: false)
readers.append(connection)
}

return NativeConnectionPool(writer: writer, readers: readers, logger: context.logger, handleUpdates: handleUpdates)
}
context.changeSignal?.start { [weak context] in
// Another process (or this one; harmless, throttled downstream) changed
// the database outside this pool's update hooks.
context?.tableUpdatesStream.dispatch(event: [EXTERNAL_CHANGES_MARKER])
}

let pool = try await context.buildPoolWithRetry(handleUpdates: handleUpdates)
self.pool = pool
return pool
}
Expand Down
75 changes: 75 additions & 0 deletions Sources/PowerSync/Implementation/CrossProcessChangeSignal.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import Foundation
import notify

/// Cross-process change signaling for a database file, built on Darwin notifications —
/// the same mechanism Core Data uses for its remote-change notifications.
///
/// PowerSync's update hooks only observe writes made through the local connection pool.
/// When several processes share a database file (an app and its widgets or App Intents
/// extensions), each process posts this signal after committing a write; the others
/// re-emit their `tableUpdates` with ``EXTERNAL_CHANGES_MARKER`` so `watch` queries
/// re-run and the upload client checks `ps_crud`.
///
/// Darwin notifications carry no payload and are coalesced under pressure, which is fine:
/// the marker means "something changed, re-query". Deliveries to the posting process
/// itself are not suppressed — a redundant re-query (already throttled) is preferable to
/// the race a sender-stamp suppression scheme introduces, where an external change could
/// be misattributed and silently dropped.
final class CrossProcessChangeSignal: @unchecked Sendable {
private let name: String
private let logger: any LoggerProtocol
private var token: Int32 = NOTIFY_TOKEN_INVALID
private let queue = DispatchQueue(label: "powersync.cross-process-signal")

init(databasePath: String, logger: any LoggerProtocol) {
// Stable across processes: both sides derive the name from the canonical path.
let canonical = URL(fileURLWithPath: databasePath).standardizedFileURL.path
self.name = "com.powersync.changes.\(Self.fnv1a(canonical))"
self.logger = logger
}

/// Starts listening; `onExternalChange` runs on a private queue for every signal
/// (including this process's own posts).
func start(onChange: @escaping @Sendable () -> Void) {
guard token == NOTIFY_TOKEN_INVALID else {
return
}
let status = notify_register_dispatch(name, &token, queue) { _ in
onChange()
}
if status != NOTIFY_STATUS_OK {
logger.warning(
"could not register cross-process change signal (status \(status)); "
+ "changes from other processes will not wake watch queries",
tag: "CrossProcessChangeSignal"
)
token = NOTIFY_TOKEN_INVALID
}
}

/// Posts the signal; called after every committed write.
func post() {
notify_post(name)
}

func stop() {
if token != NOTIFY_TOKEN_INVALID {
notify_cancel(token)
token = NOTIFY_TOKEN_INVALID
}
}

deinit {
stop()
}

/// FNV-1a 64-bit, hex-encoded: deterministic and dependency-free.
private static func fnv1a(_ input: String) -> String {
var hash: UInt64 = 0xCBF2_9CE4_8422_2325
for byte in input.utf8 {
hash ^= UInt64(byte)
hash = hash &* 0x0000_0100_0000_01B3
}
return String(hash, radix: 16)
}
}
10 changes: 7 additions & 3 deletions Sources/PowerSync/Implementation/PowerSyncDatabaseImpl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,13 @@ final class PowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
func close(deleteDatabase: Bool) async throws {
try await close()
if deleteDatabase, let dbFilename {
// We can use the supplied dbLocation when we support that in future
let directory = try DatabaseLocation.appleDefaultDatabaseDirectory()
try deleteSQLiteFiles(dbFilename: dbFilename, in: directory)
if dbFilename.hasPrefix("/") {
let url = URL(fileURLWithPath: dbFilename)
try deleteSQLiteFiles(dbFilename: url.lastPathComponent, in: url.deletingLastPathComponent())
} else {
let directory = try DatabaseLocation.appleDefaultDatabaseDirectory()
try deleteSQLiteFiles(dbFilename: dbFilename, in: directory)
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions Sources/PowerSync/Implementation/queries/watch.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ func watchImpl<RowType: Sendable>(db: PowerSyncDatabaseImpl, options: WatchOptio

let updateNotifications = pool.tableUpdates.filter { changedTables in
changedTables.contains(where: watchedTables.contains)
// Another process changed unknown tables: conservatively re-query.
|| changedTables.contains(EXTERNAL_CHANGES_MARKER)
}.map { _ in () }
// Allows emitting the first result even if there aren't changes
let withInitial = AsyncAlgorithms.merge([()].async, updateNotifications)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ final class StreamingSyncClient: Sendable {
}

private func uploadLoop(signals: SyncSignals) async throws {
let updates = db.pool.tableUpdates.filter { updates in updates.contains("ps_crud") }.map { _ in () }
let updates = db.pool.tableUpdates.filter { updates in
updates.contains("ps_crud") || updates.contains(EXTERNAL_CHANGES_MARKER)
}.map { _ in () }
let allTriggers = MergeItemSequence(inner: AsyncAlgorithms.merge(updates, signals.signalCrudUpload.subscribe())).makeAsyncIterator()

// Use a do-while loop to ensure we start an upload iteration even if we can't connect to the service.
Expand Down
10 changes: 9 additions & 1 deletion Sources/PowerSync/PowerSyncDatabase.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@ public let DEFAULT_DB_FILENAME = "powersync.db"
/// Creates a PowerSyncDatabase instance
/// - Parameters:
/// - schema: The database schema
/// - dbFilename: The database filename. Defaults to "powersync.db"
/// - dbFilename: The database filename. Defaults to "powersync.db". Plain names are
/// stored in the default databases directory; an absolute path (starting with "/") is
/// used as-is, which allows sharing the database with app extensions through an App
Comment thread
simolus3 marked this conversation as resolved.
/// Group container. The database itself can be used concurrently from the main app and
/// its extensions, but only the main app should call `connect`. Two sync connections on
/// the same database waste resources and are untested (and could corrupt the sync
/// client); let extensions read and write, and leave syncing to the app.
/// - logger: Optional logging interface
/// - initialStatements: An optional list of statements to run as the database is opened.
/// - Returns: A configured PowerSyncDatabase instance
Expand All @@ -18,6 +24,8 @@ public func PowerSyncDatabase(
) -> PowerSyncDatabaseProtocol {
let (location, group) = if dbFilename == ":memory:" {
(DatabaseLocation.inMemory, DatabaseGroupCollection())
} else if dbFilename.hasPrefix("/") {
(DatabaseLocation.atPath(dbFilename), .shared)
} else {
(DatabaseLocation.inDefaultDirectory(name: dbFilename), .shared)
}
Expand Down
5 changes: 5 additions & 0 deletions Sources/PowerSync/Protocol/SQLiteConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public protocol SQLiteStatementIteratorProtocol {
/// This is the underlying pool implementation on which the higher-level PowerSync Swift SDK is built on.
///
/// This is an internal protocol and should not be implemented outside of the PowerSync SDK.
/// Emitted on `tableUpdates` when another process changed the database. The concrete
/// tables are unknown (cross-process signals carry no payload), so consumers must treat
/// this as potentially matching every table they watch.
let EXTERNAL_CHANGES_MARKER = "__powersync_external_changes__"

public protocol SQLiteConnectionPoolProtocol: Sendable {
var tableUpdates: AsyncStream<Set<String>> { get }

Expand Down
Loading