diff --git a/packages/powersync/lib/src/web/sync_controller.dart b/packages/powersync/lib/src/web/sync_controller.dart index 342e1cac..aa8bf88a 100644 --- a/packages/powersync/lib/src/web/sync_controller.dart +++ b/packages/powersync/lib/src/web/sync_controller.dart @@ -1,6 +1,7 @@ import 'dart:async'; import 'dart:js_interop'; +import 'package:meta/meta.dart'; import 'package:powersync/src/web/worker_utils.dart'; import 'package:sqlite_async/web.dart'; import 'package:web/web.dart'; @@ -21,17 +22,20 @@ class SyncWorkerHandle implements StreamingSync { final StreamController _status = StreamController.broadcast(); - SyncWorkerHandle._({ + @visibleForTesting + SyncWorkerHandle({ required this.database, required this.connector, required this.options, required MessagePort sendToWorker, - required SharedWorker worker, + required SharedWorker? worker, required this.subscriptions, }) { _channel = WorkerCommunicationChannel( port: sendToWorker, - errors: EventStreamProviders.errorEvent.forTarget(worker), + errors: worker != null + ? EventStreamProviders.errorEvent.forTarget(worker) + : null, logger: database.logger, requestHandler: (type, payload) async { switch (type) { @@ -102,7 +106,7 @@ class SyncWorkerHandle implements StreamingSync { [port2].toJS, ); - final handle = SyncWorkerHandle._( + final handle = SyncWorkerHandle( options: options, database: database, connector: connector, @@ -119,7 +123,12 @@ class SyncWorkerHandle implements StreamingSync { Future close() async { await abort(); - await _channel.close(); + await closeChannel(); + } + + @visibleForTesting + Future closeChannel() { + return _channel.close(); } @override diff --git a/packages/powersync/lib/src/web/sync_worker.dart b/packages/powersync/lib/src/web/sync_worker.dart index bdc68996..df225c08 100644 --- a/packages/powersync/lib/src/web/sync_worker.dart +++ b/packages/powersync/lib/src/web/sync_worker.dart @@ -1,6 +1,7 @@ /// This file needs to be compiled to JavaScript with the command /// dart compile js -O4 packages/powersync/lib/src/web/sync_worker.dart -o assets/powersync_sync.worker.js /// The output should then be included in each project's `web` directory +@internal library; import 'dart:async'; @@ -11,6 +12,7 @@ import 'package:async/async.dart'; import 'package:collection/collection.dart'; import 'package:http/browser_client.dart'; import 'package:logging/logging.dart'; +import 'package:meta/meta.dart'; import 'package:powersync/powersync.dart'; import 'package:sqlite_async/sqlite_async.dart'; import 'package:powersync/src/sync/internal_connector.dart'; @@ -26,20 +28,21 @@ import 'web_bucket_storage.dart'; final _logger = autoLogger; class SyncWorker { - final Map _requestedSyncTasks = {}; + @visibleForTesting + final Map requestedSyncTasks = {}; void trackPort(MessagePort port) { - _ConnectedClient(port, this); + ConnectedClient(port, this); } - _SyncRunner _referenceSyncTask( + SyncRunner _referenceSyncTask( String databaseIdentifier, SyncOptions options, String schemaJson, List subscriptions, - _ConnectedClient client) { - return _requestedSyncTasks.putIfAbsent(databaseIdentifier, () { - return _SyncRunner(databaseIdentifier); + ConnectedClient client) { + return requestedSyncTasks.putIfAbsent(databaseIdentifier, () { + return SyncRunner(databaseIdentifier); }) ..registerClient( client, @@ -50,20 +53,23 @@ class SyncWorker { } } -class _ConnectedClient { +@visibleForTesting +class ConnectedClient { late WorkerCommunicationChannel channel; final SyncWorker _worker; - _SyncRunner? _runner; + SyncRunner? _runner; StreamSubscription? _logSubscription; - _ConnectedClient(MessagePort port, this._worker) { + ConnectedClient(MessagePort port, this._worker) { channel = WorkerCommunicationChannel( port: port, requestHandler: (type, payload) async { switch (type) { case SyncWorkerMessageType.startSynchronization: final request = payload as StartSynchronization; + channel.observeRemoteLockName(request.lockName); + final recoveredOptions = SyncOptions( crudThrottleTime: Duration(milliseconds: request.crudThrottleTimeMs), @@ -108,6 +114,7 @@ class _ConnectedClient { } }, ); + channel.closed.whenComplete(markClosed); _logSubscription = _logger.onRecord.listen((record) { final msg = StringBuffer( @@ -139,7 +146,8 @@ class _ConnectedClient { } } -class _SyncRunner { +@visibleForTesting +class SyncRunner { final String identifier; ResolvedSyncOptions options = ResolvedSyncOptions(SyncOptions()); String schemaJson = '{}'; @@ -149,11 +157,11 @@ class _SyncRunner { StreamingSyncImplementation? sync; SyncStatus? _lastSyncStatus; - _ConnectedClient? databaseHost; - final connections = <_ConnectedClient, List>{}; + ConnectedClient? databaseHost; + final connections = >{}; List currentStreams = []; - _SyncRunner(this.identifier) { + SyncRunner(this.identifier) { _group.add(_mainEvents.stream); Future(() async { @@ -186,32 +194,18 @@ class _SyncRunner { if (_lastSyncStatus case final status?) { client.sendSyncStatus(SerializedSyncStatus.from(status)); } - case _RemoveConnection(:final client): connections.remove(client); if (connections.isEmpty) { await sync?.abort(); sync = null; + } else if (client == databaseHost) { + await _activeClientHasClosed(); } case _DisconnectClient(:final client): connections.remove(client); await sync?.abort(); sync = null; - case _ActiveDatabaseClosed(): - _logger.info('Remote database closed, finding a new client'); - sync?.abort(); - sync = null; - - // The only reliable notification we get for a client closing is - // when that client is currently hosting the database. Use the - // opportunity to check whether secondary clients have also closed - // in the meantime. - final newHost = await _collectActiveClients(); - if (newHost == null) { - _logger.info('No client remains'); - } else { - await _requestDatabase(newHost); - } case _ClientSubscriptionsChanged( :final client, :final subscriptions @@ -226,6 +220,19 @@ class _SyncRunner { }); } + Future _activeClientHasClosed() async { + _logger.info('Remote database closed, finding a new client'); + await sync?.abort(); + sync = null; + + final newHost = await _collectActiveClients(); + if (newHost == null) { + _logger.info('No client remains'); + } else { + await _requestDatabase(newHost); + } + } + /// Updates [currentStreams] to the union of values in [connections]. void reindexSubscriptions() { final before = currentStreams.toSet(); @@ -242,13 +249,13 @@ class _SyncRunner { /// (as they are likely closed tabs as well). /// /// Returns the first client that responds (without waiting for others). - Future<_ConnectedClient?> _collectActiveClients() async { + Future _collectActiveClients() async { final candidates = connections.keys.toList(); if (candidates.isEmpty) { return null; } - final firstResponder = Completer<_ConnectedClient?>(); + final firstResponder = Completer(); var pendingRequests = candidates.length; for (final candidate in candidates) { @@ -270,7 +277,7 @@ class _SyncRunner { return firstResponder.future; } - Future _requestDatabase(_ConnectedClient client) async { + Future _requestDatabase(ConnectedClient client) async { _logger.info('Sync setup: Requesting database'); // This is the first client, ask for a database connection @@ -287,12 +294,6 @@ class _SyncRunner { database.closedFuture.then((_) { _logger.fine('Detected closed client'); client.markClosed(); - - if (client == databaseHost) { - _logger - .info('Tab providing sync database has gone down, reconnecting...'); - _mainEvents.add(const _ActiveDatabaseClosed()); - } }); final tables = ['ps_crud']; @@ -336,23 +337,23 @@ class _SyncRunner { sync!.streamingSync(); } - void registerClient(_ConnectedClient client, SyncOptions options, + void registerClient(ConnectedClient client, SyncOptions options, String schemaJson, List subscriptions) { _mainEvents.add(_AddConnection(client, options, schemaJson, subscriptions)); } /// Remove a client, disconnecting if no clients remain.. - void unregisterClient(_ConnectedClient client) { + void unregisterClient(ConnectedClient client) { _mainEvents.add(_RemoveConnection(client)); } /// Remove a client, and immediately disconnect. - void disconnectClient(_ConnectedClient client) { + void disconnectClient(ConnectedClient client) { _mainEvents.add(_DisconnectClient(client)); } void updateClientSubscriptions( - _ConnectedClient client, List subscriptions) { + ConnectedClient client, List subscriptions) { _mainEvents.add(_ClientSubscriptionsChanged(client, subscriptions)); } } @@ -360,7 +361,7 @@ class _SyncRunner { sealed class _RunnerEvent {} final class _AddConnection implements _RunnerEvent { - final _ConnectedClient client; + final ConnectedClient client; final SyncOptions options; final String schemaJson; final List subscriptions; @@ -370,24 +371,20 @@ final class _AddConnection implements _RunnerEvent { } final class _RemoveConnection implements _RunnerEvent { - final _ConnectedClient client; + final ConnectedClient client; _RemoveConnection(this.client); } final class _DisconnectClient implements _RunnerEvent { - final _ConnectedClient client; + final ConnectedClient client; _DisconnectClient(this.client); } final class _ClientSubscriptionsChanged implements _RunnerEvent { - final _ConnectedClient client; + final ConnectedClient client; final List subscriptions; _ClientSubscriptionsChanged(this.client, this.subscriptions); } - -final class _ActiveDatabaseClosed implements _RunnerEvent { - const _ActiveDatabaseClosed(); -} diff --git a/packages/powersync/lib/src/web/sync_worker_protocol.dart b/packages/powersync/lib/src/web/sync_worker_protocol.dart index 727d8eb3..ff10663e 100644 --- a/packages/powersync/lib/src/web/sync_worker_protocol.dart +++ b/packages/powersync/lib/src/web/sync_worker_protocol.dart @@ -1,15 +1,17 @@ import 'dart:async'; import 'dart:convert'; import 'dart:js_interop'; +import 'dart:js_interop_unsafe'; import 'package:logging/logging.dart'; import 'package:powersync/src/schema.dart'; import 'package:powersync/src/sync/options.dart'; import 'package:powersync/src/sync/stream.dart'; -import 'package:web/web.dart'; +import 'package:web/web.dart' hide HttpRequest, Client; import '../connector.dart'; import '../log.dart'; +import '../platform_specific/web.dart'; import '../sync/streaming_sync.dart'; import '../sync/sync_status.dart'; @@ -78,6 +80,7 @@ extension type StartSynchronization._(JSObject _) implements JSObject { required int retryDelayMs, required String implementationName, required String schemaJson, + required String lockName, String? syncParamsEncoded, UpdateSubscriptions? subscriptions, String? appMetadataEncoded, @@ -92,6 +95,7 @@ extension type StartSynchronization._(JSObject _) implements JSObject { external String? get syncParamsEncoded; external UpdateSubscriptions? get subscriptions; external String? get appMetadataEncoded; + external String get lockName; } @anonymous @@ -341,11 +345,17 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { final class WorkerCommunicationChannel { final Map> _pendingRequests = {}; + final Completer _closed = Completer(); int _nextRequestId = 0; bool _hasError = false; + bool _hasReceivedRemoteLockName = false; StreamSubscription? _incomingMessages; StreamSubscription? _incomingErrors; + /// The name of a navigator lock held by this channel, it's sent to the remote + /// when requested so that it can detect when this channel is closed. + late final Future _lockName = _acquireLock(); + final MessagePort port; final FutureOr<(JSAny?, JSArray?)> Function(SyncWorkerMessageType, JSAny) requestHandler; @@ -355,6 +365,8 @@ final class WorkerCommunicationChannel { Stream<(SyncWorkerMessageType, JSAny)> get events => _events.stream; + Future get closed => _closed.future; + WorkerCommunicationChannel({ required this.port, required this.requestHandler, @@ -382,11 +394,7 @@ final class WorkerCommunicationChannel { switch (type) { case SyncWorkerMessageType.ping: requestId = (message.payload as JSNumber).toDartInt; - port.postMessage(SyncWorkerMessage( - type: SyncWorkerMessageType.okResponse.name, - payload: OkResponse(requestId: requestId, payload: null), - )); - return; + return _respond(requestId, () async => (null, null)); case SyncWorkerMessageType.startSynchronization: requestId = (message.payload as StartSynchronization).requestId; case SyncWorkerMessageType.updateSubscriptions: @@ -416,31 +424,35 @@ final class WorkerCommunicationChannel { return; } - try { - final (response, transfer) = - await requestHandler(type, message.payload); - final responseMessage = SyncWorkerMessage( - type: SyncWorkerMessageType.okResponse.name, - payload: OkResponse(requestId: requestId, payload: response), - ); - - if (transfer != null) { - port.postMessage(responseMessage, transfer); - } else { - port.postMessage(responseMessage); - } - } catch (e) { - port.postMessage(SyncWorkerMessage( - type: SyncWorkerMessageType.errorResponse.name, - payload: ErrorResponse( - requestId: requestId, errorMessage: e.toString().toJS), - )); - } + await _respond(requestId, () => requestHandler(type, message.payload)); }); } + Future _respond(int requestId, + FutureOr<(JSAny?, JSArray?)> Function() generateResponse) async { + try { + final (response, transfer) = await generateResponse(); + final responseMessage = SyncWorkerMessage( + type: SyncWorkerMessageType.okResponse.name, + payload: OkResponse(requestId: requestId, payload: response), + ); + + if (transfer != null) { + port.postMessage(responseMessage, transfer); + } else { + port.postMessage(responseMessage); + } + } catch (e) { + port.postMessage(SyncWorkerMessage( + type: SyncWorkerMessageType.errorResponse.name, + payload: ErrorResponse( + requestId: requestId, errorMessage: e.toString().toJS), + )); + } + } + (int, Future) _newRequest() { - if (_hasError) { + if (_hasError || _closed.isCompleted) { throw StateError('Channel has error, cannot send new requests'); } @@ -464,6 +476,14 @@ final class WorkerCommunicationChannel { await _numericRequest(SyncWorkerMessageType.ping); } + void observeRemoteLockName(String name) { + if (!_hasReceivedRemoteLockName) { + _hasReceivedRemoteLockName = true; + // Once we're able to acquire this lock, we know the remote has closed. + potentiallySharedMutex(name).lock(close); + } + } + Future startSynchronization( String databaseName, ResolvedSyncOptions options, @@ -489,6 +509,7 @@ final class WorkerCommunicationChannel { null => null, final appMetadata => jsonEncode(appMetadata), }, + lockName: await _lockName, ), )); await completion; @@ -534,8 +555,42 @@ final class WorkerCommunicationChannel { } Future close() async { - _incomingMessages?.cancel(); - _incomingErrors?.cancel(); - port.close(); + if (!_closed.isCompleted) { + _incomingMessages?.cancel(); + _incomingErrors?.cancel(); + port.close(); + + for (final pending in _pendingRequests.values) { + pending.completeError(const ChannelClosedException()); + } + + _closed.complete(); + } + } + + Future _acquireLock() async { + final name = _generateRandomLockName(); + final hasLock = Completer.sync(); + potentiallySharedMutex(name).lock(() async { + hasLock.complete(); + return _closed.future; + }); + + await hasLock.future; + return name; + } + + static String _generateRandomLockName() { + final crypto = (globalContext['crypto'] as Crypto); + return 'powersync-worker-keepalive-${crypto.randomUUID()}'; + } +} + +final class ChannelClosedException implements Exception { + const ChannelClosedException(); + + @override + String toString() { + return 'Worker communication channel closed'; } } diff --git a/packages/powersync/test/web/sync_worker_test.dart b/packages/powersync/test/web/sync_worker_test.dart new file mode 100644 index 00000000..b9f03066 --- /dev/null +++ b/packages/powersync/test/web/sync_worker_test.dart @@ -0,0 +1,114 @@ +@TestOn('browser') +library; + +import 'dart:async'; + +import 'package:logging/logging.dart'; +import 'package:powersync/powersync.dart'; +import 'package:powersync/src/sync/streaming_sync.dart'; +import 'package:powersync/src/web/sync_controller.dart'; +import 'package:powersync/src/web/sync_worker.dart'; +import 'package:test/test.dart'; +import 'package:web/web.dart' show MessageChannel; + +import '../utils/web_test_utils.dart'; + +void main() { + late final TestUtils utils; + late SyncWorker syncWorker; + late PowerSyncDatabase db; + + setUpAll(() async { + utils = TestUtils(); + }); + + setUp(() async { + syncWorker = SyncWorker(); + + db = await utils.setupPowerSync(logger: Logger.detached('test_logger')); + await db.initialize(); + }); + + SyncWorkerHandle createWorkerHandle({ + required PowerSyncBackendConnector connector, + SyncOptions options = const SyncOptions(), + List subscriptions = const [], + }) { + final rawChannel = MessageChannel(); + syncWorker.trackPort(rawChannel.port1); + + final handle = SyncWorkerHandle( + database: db, + connector: connector, + options: options, + sendToWorker: rawChannel.port2, + worker: null, + subscriptions: subscriptions, + ); + handle.statusStream.forEach(db.setStatus); + return handle; + } + + test('aborts sync when database is closed', () async { + final handle = createWorkerHandle(connector: _ThrowingBackendConnector()); + final hasError = expectLater( + db.statusStream, + emitsThrough(isA() + .having((e) => e.downloadError, 'downloadError', isNotNull))); + await handle.streamingSync(); + await hasError; + + expect(db.currentStatus.downloadError.toString(), + contains('Expected error from fetchCredentials')); + final syncRunner = syncWorker.requestedSyncTasks.values.single; + expect(syncRunner.sync, isNotNull); + expect(syncRunner.connections, hasLength(1)); + + await handle.closeChannel(); + await pumpEventQueue(); + + // This should abort the sync process + expect(syncRunner.sync, isNull); + expect(syncRunner.connections, isEmpty); + }); + + test('handles tabs closing while serving a request', () async { + late SyncWorkerHandle handle; + final didRequestCredentials = Completer(); + handle = createWorkerHandle(connector: _ThrowingBackendConnector(() { + // When the fetchCredentials request is sent, there should be a sync + // process. + final syncRunner = syncWorker.requestedSyncTasks.values.single; + expect(syncRunner.sync, isNotNull); + expect(syncRunner.connections, hasLength(1)); + + // Close the handle while the fetchCredentials request is active, meaning + // the sync worker will never receive a response. + handle.closeChannel(); + didRequestCredentials.complete(); + })); + + await handle.streamingSync(); + await didRequestCredentials.future; + await pumpEventQueue(); + + final syncRunner = syncWorker.requestedSyncTasks.values.single; + expect(syncRunner.sync, isNull); + expect(syncRunner.connections, isEmpty); + }); +} + +final class _ThrowingBackendConnector extends PowerSyncBackendConnector { + void Function()? onFetchCredentials; + + _ThrowingBackendConnector([this.onFetchCredentials]); + + @override + Future fetchCredentials() async { + onFetchCredentials?.call(); + throw UnsupportedError('Expected error from fetchCredentials'); + } + + @override + Future uploadData(PowerSyncDatabase database) async {} +}