Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions packages/powersync/lib/src/web/sync_controller.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'dart:async';
import 'dart:js_interop';

import 'package:meta/meta.dart';
import 'package:powersync/src/web/worker_utils.dart';
import 'package:sqlite_async/web.dart';
import 'package:web/web.dart';
Expand All @@ -21,17 +22,20 @@ class SyncWorkerHandle implements StreamingSync {

final StreamController<SyncStatus> _status = StreamController.broadcast();

SyncWorkerHandle._({
@visibleForTesting
SyncWorkerHandle({
required this.database,
required this.connector,
required this.options,
required MessagePort sendToWorker,
required SharedWorker worker,
required SharedWorker? worker,
required this.subscriptions,
}) {
_channel = WorkerCommunicationChannel(
port: sendToWorker,
errors: EventStreamProviders.errorEvent.forTarget(worker),
errors: worker != null
? EventStreamProviders.errorEvent.forTarget(worker)
: null,
logger: database.logger,
requestHandler: (type, payload) async {
switch (type) {
Expand Down Expand Up @@ -102,7 +106,7 @@ class SyncWorkerHandle implements StreamingSync {
[port2].toJS,
);

final handle = SyncWorkerHandle._(
final handle = SyncWorkerHandle(
options: options,
database: database,
connector: connector,
Expand All @@ -119,7 +123,12 @@ class SyncWorkerHandle implements StreamingSync {

Future<void> close() async {
await abort();
await _channel.close();
await closeChannel();
}

@visibleForTesting
Future<void> closeChannel() {
return _channel.close();
}

@override
Expand Down
97 changes: 47 additions & 50 deletions packages/powersync/lib/src/web/sync_worker.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/// This file needs to be compiled to JavaScript with the command
/// dart compile js -O4 packages/powersync/lib/src/web/sync_worker.dart -o assets/powersync_sync.worker.js
/// The output should then be included in each project's `web` directory
@internal
library;

import 'dart:async';
Expand All @@ -11,6 +12,7 @@ import 'package:async/async.dart';
import 'package:collection/collection.dart';
import 'package:http/browser_client.dart';
import 'package:logging/logging.dart';
import 'package:meta/meta.dart';
import 'package:powersync/powersync.dart';
import 'package:sqlite_async/sqlite_async.dart';
import 'package:powersync/src/sync/internal_connector.dart';
Expand All @@ -26,20 +28,21 @@ import 'web_bucket_storage.dart';
final _logger = autoLogger;

class SyncWorker {
final Map<String, _SyncRunner> _requestedSyncTasks = {};
@visibleForTesting
final Map<String, SyncRunner> requestedSyncTasks = {};

void trackPort(MessagePort port) {
_ConnectedClient(port, this);
ConnectedClient(port, this);
}

_SyncRunner _referenceSyncTask(
SyncRunner _referenceSyncTask(
String databaseIdentifier,
SyncOptions options,
String schemaJson,
List<SubscribedStream> subscriptions,
_ConnectedClient client) {
return _requestedSyncTasks.putIfAbsent(databaseIdentifier, () {
return _SyncRunner(databaseIdentifier);
ConnectedClient client) {
return requestedSyncTasks.putIfAbsent(databaseIdentifier, () {
return SyncRunner(databaseIdentifier);
})
..registerClient(
client,
Expand All @@ -50,20 +53,23 @@ class SyncWorker {
}
}

class _ConnectedClient {
@visibleForTesting
class ConnectedClient {
late WorkerCommunicationChannel channel;
final SyncWorker _worker;

_SyncRunner? _runner;
SyncRunner? _runner;
StreamSubscription<LogRecord>? _logSubscription;

_ConnectedClient(MessagePort port, this._worker) {
ConnectedClient(MessagePort port, this._worker) {
channel = WorkerCommunicationChannel(
port: port,
requestHandler: (type, payload) async {
switch (type) {
case SyncWorkerMessageType.startSynchronization:
final request = payload as StartSynchronization;
channel.observeRemoteLockName(request.lockName);

final recoveredOptions = SyncOptions(
crudThrottleTime:
Duration(milliseconds: request.crudThrottleTimeMs),
Expand Down Expand Up @@ -108,6 +114,7 @@ class _ConnectedClient {
}
},
);
channel.closed.whenComplete(markClosed);

_logSubscription = _logger.onRecord.listen((record) {
final msg = StringBuffer(
Expand Down Expand Up @@ -139,7 +146,8 @@ class _ConnectedClient {
}
}

class _SyncRunner {
@visibleForTesting
class SyncRunner {
final String identifier;
ResolvedSyncOptions options = ResolvedSyncOptions(SyncOptions());
String schemaJson = '{}';
Expand All @@ -149,11 +157,11 @@ class _SyncRunner {

StreamingSyncImplementation? sync;
SyncStatus? _lastSyncStatus;
_ConnectedClient? databaseHost;
final connections = <_ConnectedClient, List<SubscribedStream>>{};
ConnectedClient? databaseHost;
final connections = <ConnectedClient, List<SubscribedStream>>{};
List<SubscribedStream> currentStreams = [];

_SyncRunner(this.identifier) {
SyncRunner(this.identifier) {
_group.add(_mainEvents.stream);

Future(() async {
Expand Down Expand Up @@ -186,32 +194,18 @@ class _SyncRunner {
if (_lastSyncStatus case final status?) {
client.sendSyncStatus(SerializedSyncStatus.from(status));
}

case _RemoveConnection(:final client):
connections.remove(client);
if (connections.isEmpty) {
await sync?.abort();
sync = null;
} else if (client == databaseHost) {
await _activeClientHasClosed();
}
case _DisconnectClient(:final client):
connections.remove(client);
await sync?.abort();
sync = null;
case _ActiveDatabaseClosed():
_logger.info('Remote database closed, finding a new client');
sync?.abort();
sync = null;

// The only reliable notification we get for a client closing is
// when that client is currently hosting the database. Use the
// opportunity to check whether secondary clients have also closed
// in the meantime.
final newHost = await _collectActiveClients();
if (newHost == null) {
_logger.info('No client remains');
} else {
await _requestDatabase(newHost);
}
case _ClientSubscriptionsChanged(
:final client,
:final subscriptions
Expand All @@ -226,6 +220,19 @@ class _SyncRunner {
});
}

Future<void> _activeClientHasClosed() async {
_logger.info('Remote database closed, finding a new client');
await sync?.abort();
sync = null;

final newHost = await _collectActiveClients();
if (newHost == null) {
_logger.info('No client remains');
} else {
await _requestDatabase(newHost);
}
}

/// Updates [currentStreams] to the union of values in [connections].
void reindexSubscriptions() {
final before = currentStreams.toSet();
Expand All @@ -242,13 +249,13 @@ class _SyncRunner {
/// (as they are likely closed tabs as well).
///
/// Returns the first client that responds (without waiting for others).
Future<_ConnectedClient?> _collectActiveClients() async {
Future<ConnectedClient?> _collectActiveClients() async {
final candidates = connections.keys.toList();
if (candidates.isEmpty) {
return null;
}

final firstResponder = Completer<_ConnectedClient?>();
final firstResponder = Completer<ConnectedClient?>();
var pendingRequests = candidates.length;

for (final candidate in candidates) {
Expand All @@ -270,7 +277,7 @@ class _SyncRunner {
return firstResponder.future;
}

Future<void> _requestDatabase(_ConnectedClient client) async {
Future<void> _requestDatabase(ConnectedClient client) async {
_logger.info('Sync setup: Requesting database');

// This is the first client, ask for a database connection
Expand All @@ -287,12 +294,6 @@ class _SyncRunner {
database.closedFuture.then((_) {
_logger.fine('Detected closed client');
client.markClosed();

if (client == databaseHost) {
_logger
.info('Tab providing sync database has gone down, reconnecting...');
_mainEvents.add(const _ActiveDatabaseClosed());
}
});

final tables = ['ps_crud'];
Expand Down Expand Up @@ -336,31 +337,31 @@ class _SyncRunner {
sync!.streamingSync();
}

void registerClient(_ConnectedClient client, SyncOptions options,
void registerClient(ConnectedClient client, SyncOptions options,
String schemaJson, List<SubscribedStream> subscriptions) {
_mainEvents.add(_AddConnection(client, options, schemaJson, subscriptions));
}

/// Remove a client, disconnecting if no clients remain..
void unregisterClient(_ConnectedClient client) {
void unregisterClient(ConnectedClient client) {
_mainEvents.add(_RemoveConnection(client));
}

/// Remove a client, and immediately disconnect.
void disconnectClient(_ConnectedClient client) {
void disconnectClient(ConnectedClient client) {
_mainEvents.add(_DisconnectClient(client));
}

void updateClientSubscriptions(
_ConnectedClient client, List<SubscribedStream> subscriptions) {
ConnectedClient client, List<SubscribedStream> subscriptions) {
_mainEvents.add(_ClientSubscriptionsChanged(client, subscriptions));
}
}

sealed class _RunnerEvent {}

final class _AddConnection implements _RunnerEvent {
final _ConnectedClient client;
final ConnectedClient client;
final SyncOptions options;
final String schemaJson;
final List<SubscribedStream> subscriptions;
Expand All @@ -370,24 +371,20 @@ final class _AddConnection implements _RunnerEvent {
}

final class _RemoveConnection implements _RunnerEvent {
final _ConnectedClient client;
final ConnectedClient client;

_RemoveConnection(this.client);
}

final class _DisconnectClient implements _RunnerEvent {
final _ConnectedClient client;
final ConnectedClient client;

_DisconnectClient(this.client);
}

final class _ClientSubscriptionsChanged implements _RunnerEvent {
final _ConnectedClient client;
final ConnectedClient client;
final List<SubscribedStream> subscriptions;

_ClientSubscriptionsChanged(this.client, this.subscriptions);
}

final class _ActiveDatabaseClosed implements _RunnerEvent {
const _ActiveDatabaseClosed();
}
Loading