From 5fe5fd797b15a0f476efc3a42168561d91f46796 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 1 Jun 2026 12:02:09 +0200 Subject: [PATCH 1/6] Support custom HTTP clients on native platforms --- .../native/native_powersync_database.dart | 6 ++++-- packages/powersync/lib/src/sync/options.dart | 20 +++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/packages/powersync/lib/src/database/native/native_powersync_database.dart b/packages/powersync/lib/src/database/native/native_powersync_database.dart index 85bca49d..f71fcc14 100644 --- a/packages/powersync/lib/src/database/native/native_powersync_database.dart +++ b/packages/powersync/lib/src/database/native/native_powersync_database.dart @@ -3,7 +3,6 @@ import 'dart:convert'; import 'dart:isolate'; import 'package:meta/meta.dart'; -import 'package:http/http.dart' as http; import 'package:logging/logging.dart'; import 'package:powersync/src/abort_controller.dart'; import 'package:powersync/src/sync/bucket_storage.dart'; @@ -172,6 +171,7 @@ final class NativePowerSyncDatabase extends BasePowerSyncDatabase { database.openFactory.path, options, jsonEncode(schema), + options.source.httpClient, ), debugName: 'Sync ${database.openFactory.path}', onError: receiveUnhandledErrors.sendPort, @@ -191,12 +191,14 @@ class _PowerSyncDatabaseIsolateArgs { final String databaseName; final ResolvedSyncOptions options; final String schemaJson; + final HttpClientFactory httpClient; _PowerSyncDatabaseIsolateArgs( this.sPort, this.databaseName, this.options, this.schemaJson, + this.httpClient, ); } @@ -284,7 +286,7 @@ Future _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async { crudUpdateTriggerStream: database .onChange(['ps_crud'], throttle: args.options.crudThrottleTime), options: args.options, - client: http.Client(), + client: args.httpClient(), syncMutex: mutexes.mutex('sync'), crudMutex: mutexes.mutex('crud'), ); diff --git a/packages/powersync/lib/src/sync/options.dart b/packages/powersync/lib/src/sync/options.dart index 381b3a5d..9ba351f8 100644 --- a/packages/powersync/lib/src/sync/options.dart +++ b/packages/powersync/lib/src/sync/options.dart @@ -1,6 +1,18 @@ import 'package:collection/collection.dart'; +import 'package:http/http.dart'; import 'package:meta/meta.dart'; +/// The signature of a function creating a http [Client] to use by the PowerSync +/// client. +/// +/// PowerSync will use [Client.new] by default, but a custom factory can be used +/// as [SyncOptions.httpClient]. This allows transforming requests and +/// responses, e.g. to add additional headers or allow custom TLS certificates. +/// +/// On native platforms, these functions are sent across send ports (and thus +/// must not capture non-sendable state). +typedef HttpClientFactory = Client Function(); + /// Options that affect how the sync client connects to the sync service. final class SyncOptions { /// A map of application metadata that is passed to the PowerSync service. @@ -38,6 +50,12 @@ final class SyncOptions { /// This is enabled by default. final bool? includeDefaultStreams; + /// A function to create http clients used by the PowerSync SDK. + /// + /// Custom clients can be used to configure TLS options, inject additional + /// headers, or otherwise customize networking. + final HttpClientFactory httpClient; + const SyncOptions({ this.crudThrottleTime, this.retryDelay, @@ -45,6 +63,7 @@ final class SyncOptions { this.syncImplementation = SyncClientImplementation.defaultClient, this.includeDefaultStreams, this.appMetadata, + this.httpClient = Client.new, }); SyncOptions _copyWith({ @@ -60,6 +79,7 @@ final class SyncOptions { syncImplementation: syncImplementation, includeDefaultStreams: includeDefaultStreams, appMetadata: appMetadata ?? this.appMetadata, + httpClient: httpClient, ); } } From 5b4f2409a79da37b82448f97da1891be644f4e1e Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 2 Jun 2026 17:03:09 +0200 Subject: [PATCH 2/6] Custom HTTP client for workers --- .../native/native_powersync_database.dart | 3 +- .../database/web/web_powersync_database.dart | 4 +- packages/powersync/lib/src/sync/options.dart | 5 +- .../powersync/lib/src/web/http/client.dart | 112 +++++++++++++ .../powersync/lib/src/web/http/protocol.dart | 92 +++++++++++ .../powersync/lib/src/web/http/server.dart | 123 +++++++++++++++ .../lib/src/web/sync_controller.dart | 7 +- .../powersync/lib/src/web/sync_worker.dart | 4 + .../lib/src/web/sync_worker_protocol.dart | 92 ++++++++++- packages/powersync/test/web/http_test.dart | 147 ++++++++++++++++++ 10 files changed, 579 insertions(+), 10 deletions(-) create mode 100644 packages/powersync/lib/src/web/http/client.dart create mode 100644 packages/powersync/lib/src/web/http/protocol.dart create mode 100644 packages/powersync/lib/src/web/http/server.dart create mode 100644 packages/powersync/test/web/http_test.dart diff --git a/packages/powersync/lib/src/database/native/native_powersync_database.dart b/packages/powersync/lib/src/database/native/native_powersync_database.dart index f71fcc14..f4869897 100644 --- a/packages/powersync/lib/src/database/native/native_powersync_database.dart +++ b/packages/powersync/lib/src/database/native/native_powersync_database.dart @@ -1,6 +1,7 @@ import 'dart:async'; import 'dart:convert'; import 'dart:isolate'; +import 'package:http/http.dart'; import 'package:meta/meta.dart'; import 'package:logging/logging.dart'; @@ -171,7 +172,7 @@ final class NativePowerSyncDatabase extends BasePowerSyncDatabase { database.openFactory.path, options, jsonEncode(schema), - options.source.httpClient, + options.source.httpClient ?? Client.new, ), debugName: 'Sync ${database.openFactory.path}', onError: receiveUnhandledErrors.sendPort, diff --git a/packages/powersync/lib/src/database/web/web_powersync_database.dart b/packages/powersync/lib/src/database/web/web_powersync_database.dart index 717936b5..10c91c74 100644 --- a/packages/powersync/lib/src/database/web/web_powersync_database.dart +++ b/packages/powersync/lib/src/database/web/web_powersync_database.dart @@ -1,7 +1,7 @@ import 'dart:async'; import 'dart:convert'; +import 'package:http/http.dart'; import 'package:meta/meta.dart'; -import 'package:http/browser_client.dart'; import 'package:powersync/src/abort_controller.dart'; import 'package:powersync/src/sync/bucket_storage.dart'; import 'package:powersync/src/connector.dart'; @@ -75,7 +75,7 @@ final class WebPowerSyncDatabase extends BasePowerSyncDatabase { connector: InternalConnector.wrap(connector, this), crudUpdateTriggerStream: crudStream, options: options, - client: BrowserClient(), + client: options.source.httpClient?.call() ?? Client(), activeSubscriptions: initiallyActiveStreams, // Only allows 1 sync implementation to run at a time per database // This should be global (across tabs) when using Navigator locks. diff --git a/packages/powersync/lib/src/sync/options.dart b/packages/powersync/lib/src/sync/options.dart index 9ba351f8..865b6664 100644 --- a/packages/powersync/lib/src/sync/options.dart +++ b/packages/powersync/lib/src/sync/options.dart @@ -54,7 +54,7 @@ final class SyncOptions { /// /// Custom clients can be used to configure TLS options, inject additional /// headers, or otherwise customize networking. - final HttpClientFactory httpClient; + final HttpClientFactory? httpClient; const SyncOptions({ this.crudThrottleTime, @@ -63,7 +63,7 @@ final class SyncOptions { this.syncImplementation = SyncClientImplementation.defaultClient, this.includeDefaultStreams, this.appMetadata, - this.httpClient = Client.new, + this.httpClient, }); SyncOptions _copyWith({ @@ -138,6 +138,7 @@ extension type ResolvedSyncOptions(SyncOptions source) { includeDefaultStreams: other.includeDefaultStreams ?? includeDefaultStreams, appMetadata: other.appMetadata ?? appMetadata, + httpClient: other.httpClient ?? source.httpClient, ); final didChange = !_mapEquality.equals(newOptions.params, params) || diff --git a/packages/powersync/lib/src/web/http/client.dart b/packages/powersync/lib/src/web/http/client.dart new file mode 100644 index 00000000..61d9d634 --- /dev/null +++ b/packages/powersync/lib/src/web/http/client.dart @@ -0,0 +1,112 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:js_interop'; +import 'dart:typed_data'; + +import 'package:http/http.dart'; + +import '../sync_worker_protocol.dart'; +import 'protocol.dart'; + +final class RemoteHttpClient extends BaseClient { + final WorkerCommunicationChannel _channel; + + int _nextTransactionId = 0; + + RemoteHttpClient(this._channel); + + @override + Future send(BaseRequest request) async { + final body = await request.finalize().toBytes(); + final bodyBuffer = body.buffer; + // This will always be the case with the toBytes() implementation. We + // couldn't safely transfer the entire buffer without it. + assert(body.offsetInBytes == 0 && body.length == bodyBuffer.lengthInBytes); + final jsBuffer = bodyBuffer.toJS; + + final txId = _nextTransactionId++; + + // Send request with other port. + final responseFuture = _channel.sendHttpRequest( + HttpRequest( + requestId: 0, // Set by sendHttpRequest + transactionId: txId, + uri: request.url.toString(), + method: request.method, + headers: json.encode(request.headers), + body: jsBuffer, + ), + ); + + if (request is Abortable) { + request.abortTrigger?.whenComplete(() => sendAbort(txId, false)); + } + + final rawResponse = await responseFuture; + + return StreamedResponse( + _ResponseStream(this, txId, rawResponse.lockName).stream, + rawResponse.statusCode, + request: request, + headers: rawResponse.decodedHeaders, + ); + } + + void sendAbort(int txId, bool abortStream) { + _channel.port.postMessage( + SyncWorkerMessage( + type: SyncWorkerMessageType.abortHttpRequest.name, + payload: AbortHttpResponse( + cancelStream: abortStream, + transactionId: txId, + ), + ), + ); + } +} + +final class _ResponseStream { + final RemoteHttpClient client; + final int txId; + + final streamController = StreamController(sync: true); + var isFetching = false; + + Stream get stream => streamController.stream; + + _ResponseStream(this.client, this.txId, String lockName) { + streamController + ..onListen = fetchIfHasListener + ..onResume = fetchIfHasListener + ..onCancel = () => client.sendAbort(txId, true); + } + + void fetchChunk() async { + assert(!isFetching); + isFetching = true; + + try { + final chunk = await client._channel.readHttpResponseChunk(txId); + if (chunk != null) { + streamController.add(chunk.toDart.asUint8List()); + } else { + streamController.close(); + } + } catch (e, s) { + streamController.addError(e, s); + streamController.close(); + } finally { + isFetching = false; + fetchIfHasListener(); + } + } + + void fetchIfHasListener() { + if (!isFetching && + streamController.hasListener && + !streamController.isClosed && + !streamController.isPaused) { + fetchChunk(); + } + } +} diff --git a/packages/powersync/lib/src/web/http/protocol.dart b/packages/powersync/lib/src/web/http/protocol.dart new file mode 100644 index 00000000..0e83d2db --- /dev/null +++ b/packages/powersync/lib/src/web/http/protocol.dart @@ -0,0 +1,92 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:js_interop'; + +extension type HttpMessage._(JSObject _) implements JSObject { + /// JSON-encoded request or response headers headers. + @JS('h') + external String headers; + + Map get decodedHeaders { + final decoded = json.decode(headers) as Map; + return decoded.cast(); + } +} + +/// A serialized HTTP request. +extension type HttpRequest._(JSObject _) implements HttpMessage { + external factory HttpRequest({ + @JS('r') required int requestId, + @JS('i') required int transactionId, + @JS('u') required String uri, + @JS('m') required String method, + @JS('h') required String headers, + @JS('b') required JSArrayBuffer body, + }); + + @JS('r') + external int requestId; + + /// A client-generated id for the HTTP transaction. + /// + /// This can be used to identify the request in subsequent commands to read + /// or abort the response. + @JS('i') + external int transactionId; + + @JS('u') + external String uri; + + @JS('m') + external String method; + + /// The full request body (we don't support streaming request bodies, these + /// aren't used by the SDK). + @JS('b') + external JSArrayBuffer body; +} + +/// A serialized HTTP response +extension type HttpResponse._(JSObject _) implements HttpMessage { + external factory HttpResponse({ + @JS('l') required String lockName, + @JS('s') required int statusCode, + @JS('h') required String headers, + }); + + @JS('l') + external String lockName; + + @JS('s') + external int statusCode; +} + +extension type AbortHttpResponse._(JSObject _) implements JSObject { + external factory AbortHttpResponse({ + @JS('r') required bool cancelStream, + @JS('i') required int transactionId, + }); + + /// Whether the abort is from a [StreamSubscription.cancel] call (as opposed + /// to an abort trigger on an abortable request). + @JS('r') + external bool cancelStream; + + /// The same as [HttpRequest.transactionId]. + @JS('i') + external int transactionId; +} + +extension type ReadStreamChunk._(JSObject _) implements JSObject { + external factory ReadStreamChunk({ + @JS('r') required int requestId, + @JS('i') required int transactionId, + }); + + @JS('r') + external int requestId; + + /// The same as [HttpRequest.transactionId]. + @JS('i') + external int transactionId; +} diff --git a/packages/powersync/lib/src/web/http/server.dart b/packages/powersync/lib/src/web/http/server.dart new file mode 100644 index 00000000..6ceff49e --- /dev/null +++ b/packages/powersync/lib/src/web/http/server.dart @@ -0,0 +1,123 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:js_interop'; +import 'dart:js_interop_unsafe'; +import 'dart:typed_data'; + +import 'package:http/http.dart'; +import 'package:web/web.dart' show Crypto; + +import '../../platform_specific/web.dart'; +import 'protocol.dart'; + +final class RemoteHttpServer { + /// The http client to expose to a worker. + final Client client; + + final Map _pendingTransactions = {}; + + RemoteHttpServer(this.client); + + Future<(HttpResponse, JSArray?)> handle(HttpRequest request) async { + final state = _HttpRequest(); + _pendingTransactions[request.transactionId] = state; + + final lockName = await state.acquireLock(); + final inner = AbortableRequest(request.method, Uri.parse(request.uri), + abortTrigger: state._abortController.future); + inner.bodyBytes = request.body.toDart.asUint8List(); + request.decodedHeaders.forEach((k, v) => inner.headers[k] = v); + + final response = await client.send(inner); + state.response = StreamIterator(response.stream); + + return ( + HttpResponse( + lockName: lockName, + statusCode: response.statusCode, + headers: json.encode(response.headers), + ), + null + ); + } + + Future<(JSArrayBuffer?, JSArray?)> readResponse(int transactionId) async { + final state = _pendingTransactions[transactionId]; + final response = state?.response; + if (state == null || response == null) { + throw ArgumentError('Unknown HTTP transaction: $transactionId'); + } + + if (await response.moveNext()) { + final asJsBuffer = _byteListToArrayBuffer(response.current); + return (asJsBuffer, [asJsBuffer].toJS); + } else if (state._abortController.isCompleted) { + throw RequestAbortedException(); + } else { + // End of stream + _pendingTransactions.remove(transactionId); + state.close(); + + return (null, null); + } + } + + void abort(int transactionId, bool cancelStream) { + _pendingTransactions.remove(transactionId)?.abort(cancelStream); + } + + static JSArrayBuffer _byteListToArrayBuffer(List bytes) { + if (bytes is Uint8List) { + final buffer = bytes.buffer; + if (bytes.offsetInBytes == 0 && buffer.lengthInBytes == bytes.length) { + // Not a sublist view, we can transfer the buffer at once. + return buffer.toJS; + } + } + + return Uint8List.fromList(bytes).buffer.toJS; + } +} + +final class _HttpRequest { + var _closed = false; + + final Completer _abortController = Completer.sync(); + StreamIterator>? response; + + void close() { + if (!_closed) { + _closed = true; + + response?.cancel(); + abort(false); + } + } + + void abort(bool abortStream) { + if (!_abortController.isCompleted) { + if (abortStream) { + response?.cancel(); + } + + _abortController.complete(); + } + } + + Future acquireLock() async { + final name = _generateRandomLockName(); + final hasLock = Completer.sync(); + potentiallySharedMutex(name).lock(() async { + hasLock.complete(); + return _abortController.future; + }); + + await hasLock.future; + return name; + } + + static String _generateRandomLockName() { + final crypto = (globalContext['crypto'] as Crypto); + return 'http-remote-${crypto.randomUUID()}'; + } +} diff --git a/packages/powersync/lib/src/web/sync_controller.dart b/packages/powersync/lib/src/web/sync_controller.dart index aa8bf88a..288ce5d6 100644 --- a/packages/powersync/lib/src/web/sync_controller.dart +++ b/packages/powersync/lib/src/web/sync_controller.dart @@ -2,9 +2,10 @@ import 'dart:async'; import 'dart:js_interop'; import 'package:meta/meta.dart'; +import 'package:http/http.dart'; import 'package:powersync/src/web/worker_utils.dart'; import 'package:sqlite_async/web.dart'; -import 'package:web/web.dart'; +import 'package:web/web.dart' hide Client; import '../connector.dart'; import '../database/powersync_database.dart'; @@ -29,6 +30,7 @@ class SyncWorkerHandle implements StreamingSync { required this.options, required MessagePort sendToWorker, required SharedWorker? worker, + required Client? client, required this.subscriptions, }) { _channel = WorkerCommunicationChannel( @@ -37,6 +39,7 @@ class SyncWorkerHandle implements StreamingSync { ? EventStreamProviders.errorEvent.forTarget(worker) : null, logger: database.logger, + exposedHttpClient: client, requestHandler: (type, payload) async { switch (type) { case SyncWorkerMessageType.requestEndpoint: @@ -113,6 +116,7 @@ class SyncWorkerHandle implements StreamingSync { sendToWorker: port1, worker: worker, subscriptions: subscriptions, + client: options.httpClient?.call(), ); // Make sure that the worker is working, or throw immediately. @@ -146,6 +150,7 @@ class SyncWorkerHandle implements StreamingSync { ResolvedSyncOptions(options), database.schema, subscriptions, + options.httpClient != null, ); } diff --git a/packages/powersync/lib/src/web/sync_worker.dart b/packages/powersync/lib/src/web/sync_worker.dart index df225c08..b938f219 100644 --- a/packages/powersync/lib/src/web/sync_worker.dart +++ b/packages/powersync/lib/src/web/sync_worker.dart @@ -22,6 +22,7 @@ import 'package:sqlite_async/web.dart'; import 'package:web/web.dart' hide RequestMode; import '../database/powersync_database.dart'; +import 'http/client.dart'; import 'sync_worker_protocol.dart'; import 'web_bucket_storage.dart'; @@ -91,6 +92,9 @@ class ConnectedClient { final encodedAppMetadata => Map.from( jsonDecode(encodedAppMetadata) as Map), }, + httpClient: request.customHttpClient == true + ? () => RemoteHttpClient(channel) + : null, ); _runner = _worker._referenceSyncTask( diff --git a/packages/powersync/lib/src/web/sync_worker_protocol.dart b/packages/powersync/lib/src/web/sync_worker_protocol.dart index ff10663e..5283c0e7 100644 --- a/packages/powersync/lib/src/web/sync_worker_protocol.dart +++ b/packages/powersync/lib/src/web/sync_worker_protocol.dart @@ -3,6 +3,7 @@ import 'dart:convert'; import 'dart:js_interop'; import 'dart:js_interop_unsafe'; +import 'package:http/http.dart'; import 'package:logging/logging.dart'; import 'package:powersync/src/schema.dart'; import 'package:powersync/src/sync/options.dart'; @@ -14,6 +15,8 @@ import '../log.dart'; import '../platform_specific/web.dart'; import '../sync/streaming_sync.dart'; import '../sync/sync_status.dart'; +import 'http/protocol.dart'; +import 'http/server.dart'; /// Names used in [SyncWorkerMessage] enum SyncWorkerMessageType { @@ -58,6 +61,24 @@ enum SyncWorkerMessageType { /// The payload is a [JSString]. logEvent, + /// Send an HTTP request. + /// + /// The request payload is a [HttpRequest], the response is a [HttpResponse]. + sendHttpRequest, + + /// Abort an in-flight HTTP request. + /// + /// The payload is a [AbortHttpResponse] object. + /// + /// The payload is a [JSNumber] matching the [HttpRequest.transactionId]. + abortHttpRequest, + + /// Requests a chunk of data from an HTTP response. + /// + /// The request payload is a [ReadStreamChunk], the response is a + /// [JSArrayBuffer] (or `null` if the end of the response was reached). + readResponseChunk, + okResponse, errorResponse, } @@ -84,6 +105,7 @@ extension type StartSynchronization._(JSObject _) implements JSObject { String? syncParamsEncoded, UpdateSubscriptions? subscriptions, String? appMetadataEncoded, + bool? customHttpClient, }); external String get databaseName; @@ -96,6 +118,7 @@ extension type StartSynchronization._(JSObject _) implements JSObject { external UpdateSubscriptions? get subscriptions; external String? get appMetadataEncoded; external String get lockName; + external bool? get customHttpClient; } @anonymous @@ -155,11 +178,16 @@ extension type OkResponse._(JSObject _) implements JSObject { extension type ErrorResponse._(JSObject _) implements JSObject { external factory ErrorResponse({ required int requestId, + required int? recognizedType, required JSString errorMessage, }); external int get requestId; + external int? get recognizedType; external JSString get errorMessage; + + static const recognizedTypeNone = 0; + static const recognizedTypeRequestAbortedException = 1; } @anonymous @@ -372,6 +400,7 @@ final class WorkerCommunicationChannel { required this.requestHandler, Stream? errors, Logger? logger, + Client? exposedHttpClient, }) : _logger = logger ?? autoLogger { port.start(); _incomingErrors = errors?.listen((event) { @@ -383,6 +412,9 @@ final class WorkerCommunicationChannel { _pendingRequests.clear(); }); + final client = + exposedHttpClient == null ? null : RemoteHttpServer(exposedHttpClient); + _incomingMessages = EventStreamProviders.messageEvent.forTarget(port).listen((event) async { final message = event.data as SyncWorkerMessage; @@ -405,15 +437,33 @@ final class WorkerCommunicationChannel { case SyncWorkerMessageType.invalidCredentialsCallback: case SyncWorkerMessageType.uploadCrud: requestId = (message.payload as JSNumber).toDartInt; + case SyncWorkerMessageType.sendHttpRequest: + final request = message.payload as HttpRequest; + _respond(request.requestId, () => client!.handle(request)); + return; + case SyncWorkerMessageType.abortHttpRequest: + final payload = message.payload as AbortHttpResponse; + client!.abort(payload.transactionId, payload.cancelStream); + return; + case SyncWorkerMessageType.readResponseChunk: + final request = message.payload as ReadStreamChunk; + _respond(request.requestId, + () => client!.readResponse(request.transactionId)); + return; case SyncWorkerMessageType.okResponse: final payload = message.payload as OkResponse; _pendingRequests.remove(payload.requestId)!.complete(payload.payload); return; case SyncWorkerMessageType.errorResponse: final payload = message.payload as ErrorResponse; - _pendingRequests - .remove(payload.requestId)! - .completeError(payload.errorMessage.toDart); + final error = switch ( + payload.recognizedType ?? ErrorResponse.recognizedTypeNone) { + ErrorResponse.recognizedTypeRequestAbortedException => + RequestAbortedException(), + _ => payload.errorMessage.toDart, + }; + + _pendingRequests.remove(payload.requestId)!.completeError(error); return; case SyncWorkerMessageType.notifySyncStatus: _events.add((type, message.payload)); @@ -446,7 +496,14 @@ final class WorkerCommunicationChannel { port.postMessage(SyncWorkerMessage( type: SyncWorkerMessageType.errorResponse.name, payload: ErrorResponse( - requestId: requestId, errorMessage: e.toString().toJS), + requestId: requestId, + recognizedType: switch (e) { + RequestAbortedException() => + ErrorResponse.recognizedTypeRequestAbortedException, + _ => ErrorResponse.recognizedTypeNone, + }, + errorMessage: e.toString().toJS, + ), )); } } @@ -489,6 +546,7 @@ final class WorkerCommunicationChannel { ResolvedSyncOptions options, Schema schema, List streams, + bool customHttpClient, ) async { final (id, completion) = _newRequest(); port.postMessage(SyncWorkerMessage( @@ -510,6 +568,7 @@ final class WorkerCommunicationChannel { final appMetadata => jsonEncode(appMetadata), }, lockName: await _lockName, + customHttpClient: customHttpClient, ), )); await completion; @@ -554,6 +613,31 @@ final class WorkerCommunicationChannel { await _numericRequest(SyncWorkerMessageType.uploadCrud); } + Future sendHttpRequest(HttpRequest request) async { + final (id, completion) = _newRequest(); + request.requestId = id; + port.postMessage( + SyncWorkerMessage( + payload: request, + type: SyncWorkerMessageType.sendHttpRequest.name, + ), + [request.body].toJS, + ); + return (await completion) as HttpResponse; + } + + Future readHttpResponseChunk(int transactionId) async { + final (id, completion) = _newRequest(); + port.postMessage( + SyncWorkerMessage( + type: SyncWorkerMessageType.readResponseChunk.name, + payload: ReadStreamChunk(requestId: id, transactionId: transactionId), + ), + ); + + return (await completion) as JSArrayBuffer?; + } + Future close() async { if (!_closed.isCompleted) { _incomingMessages?.cancel(); diff --git a/packages/powersync/test/web/http_test.dart b/packages/powersync/test/web/http_test.dart new file mode 100644 index 00000000..7953650b --- /dev/null +++ b/packages/powersync/test/web/http_test.dart @@ -0,0 +1,147 @@ +@TestOn('browser') +library; + +import 'dart:async'; +import 'dart:typed_data'; + +import 'package:async/async.dart'; +import 'package:http/http.dart'; +import 'package:http/testing.dart'; +import 'package:powersync/src/web/http/client.dart'; +import 'package:powersync/src/web/sync_worker_protocol.dart'; +import 'package:test/test.dart'; +import 'package:web/web.dart' show MessageChannel; + +void main() { + final uri = Uri.parse('https://powersync.com/foo/bar'); + + test('can send http requests', () async { + final (client, _) = createRemoteClient(MockClient((request) async { + expect(request.url, uri); + expect(request.method, 'POST'); + expect(request.headers, containsPair('Foo', 'Bar')); + expect(request.body, 'body'); + + return Response('ok', 200, headers: {'Response': 'Ok'}); + })); + + final response = + await client.post(uri, headers: {'Foo': 'Bar'}, body: 'body'); + expect(response.statusCode, 200); + expect(response.headers, {'Response': 'Ok'}); + expect(response.body, 'ok'); + }); + + test('response stream control', () async { + final responseStream = StreamController(); + final (client, _) = + createRemoteClient(MockClient.streaming((request, stream) async { + await stream.drain(); + return StreamedResponse(responseStream.stream, 200); + })); + + final response = await client.send(Request('GET', uri)); + expect(responseStream.hasListener, isFalse); + + final receivedChunks = >[]; + final sub = response.stream.listen(receivedChunks.add); + await pumpEventQueue(); + expect(responseStream.hasListener, isTrue); + expect(responseStream.isPaused, isFalse); + + responseStream.add(Uint8List(123)); + await pumpEventQueue(); + expect(receivedChunks, [hasLength(123)]); + + responseStream.add(Uint8List(42)); + sub.pause(); + await pumpEventQueue(); + expect(responseStream.isPaused, isTrue); + + sub.resume(); + await pumpEventQueue(); + expect(responseStream.isPaused, isFalse); + + sub.cancel(); + await pumpEventQueue(); + expect(responseStream.hasListener, isFalse); + }); + + test('can abort requests before receiving response', () async { + final (client, _) = + createRemoteClient(MockClient.streaming((request, _) async { + await (request as Abortable).abortTrigger!; + throw RequestAbortedException(); + })); + + await expectLater( + client.send(AbortableRequest('GET', uri, abortTrigger: Future.value())), + throwsA(isA()), + ); + }); + + test('can abort requests in response', () async { + final abort = Completer(); + final responseStream = StreamController(); + var aborted = false; + + final (client, _) = + createRemoteClient(MockClient.streaming((request, _) async { + (request as Abortable).abortTrigger!.whenComplete(() { + aborted = true; + responseStream + ..addError(RequestAbortedException()) + ..close(); + }); + return StreamedResponse(responseStream.stream, 200); + })); + + final response = await client + .send(AbortableRequest('GET', uri, abortTrigger: abort.future)); + responseStream.add(Uint8List(42)); + final receivedResponseStream = StreamQueue(response.stream); + await expectLater(receivedResponseStream, emits(hasLength(42))); + + abort.complete(); + await expectLater( + receivedResponseStream, emitsError(isA())); + expect(aborted, isTrue); + }); + + test('can abort via stream cancel', () async { + final responseStream = StreamController(); + + final (client, _) = + createRemoteClient(MockClient.streaming((request, _) async { + return StreamedResponse(responseStream.stream, 200); + })); + + final response = await client.send(AbortableRequest('GET', uri)); + responseStream.add(Uint8List(42)); + + final receivedResponseStream = StreamQueue(response.stream); + await expectLater(receivedResponseStream, emits(hasLength(42))); + await receivedResponseStream.cancel(); + await pumpEventQueue(); + expect(responseStream.hasListener, isFalse); + }); +} + +(Client, WorkerCommunicationChannel) createRemoteClient(Client original) { + final channel = MessageChannel(); + + final local = WorkerCommunicationChannel( + port: channel.port1, + requestHandler: (type, payload) async { + throw UnimplementedError(); + }, + exposedHttpClient: original, + ); + final remote = WorkerCommunicationChannel( + port: channel.port2, + requestHandler: (type, payload) async { + throw UnimplementedError(); + }, + ); + return (RemoteHttpClient(remote), local); +} From 8ad5012109be8551e1bf0790e04a293b8088e85c Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 2 Jun 2026 18:00:21 +0200 Subject: [PATCH 3/6] Add test --- packages/powersync/lib/src/setup/web.dart | 4 +- .../test/sync/custom_client_test.dart | 50 +++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) create mode 100644 packages/powersync/test/sync/custom_client_test.dart diff --git a/packages/powersync/lib/src/setup/web.dart b/packages/powersync/lib/src/setup/web.dart index 23256bc1..faf064a7 100644 --- a/packages/powersync/lib/src/setup/web.dart +++ b/packages/powersync/lib/src/setup/web.dart @@ -58,8 +58,8 @@ Future downloadWebAssets(List arguments) async { return; } + final httpClient = HttpClient(); try { - final httpClient = HttpClient(); const sqlitePackageName = 'sqlite3'; final (tag: powersyncTag, version: powerSyncVersion) = @@ -112,6 +112,8 @@ Future downloadWebAssets(List arguments) async { } catch (e) { print(e); exit(1); + } finally { + httpClient.close(); } } diff --git a/packages/powersync/test/sync/custom_client_test.dart b/packages/powersync/test/sync/custom_client_test.dart new file mode 100644 index 00000000..8418f7f4 --- /dev/null +++ b/packages/powersync/test/sync/custom_client_test.dart @@ -0,0 +1,50 @@ +import 'package:http/http.dart'; +import 'package:powersync/powersync.dart'; +import 'package:test/test.dart'; + +import '../server/sync_server/in_memory_sync_server.dart'; +import '../utils/abstract_test_utils.dart'; +import '../utils/in_memory_http.dart'; +import '../utils/test_utils_impl.dart'; +import 'utils.dart'; + +void main() { + late PowerSyncDatabase powersync; + late String path; + + setUp(() async { + path = _testUtils.dbPath(); + await _testUtils.cleanDb(path: path); + + powersync = await _testUtils.setupPowerSync(path: path); + }); + + tearDown(() => powersync.close()); + + test('can use custom http client', () async { + await powersync.connect( + connector: TestConnector( + () async => PowerSyncCredentials( + endpoint: 'http://test.powersync.example.org', token: 'token'), + ), + options: SyncOptions( + httpClient: _createMockClient, + ), + ); + + await powersync.waitForFirstSync(); + }); +} + +Client _createMockClient() { + final (client, server) = inMemoryServer(); + final service = MockSyncService(); + server.mount((r) => service.router(r)); + + service + ..addLine(checkpoint(lastOpId: 1)) + ..addLine(checkpointComplete()); + return client; +} + +final _testUtils = TestUtils(); From ba45b0e9cec155c308291828b8f229dd54f2fedb Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 3 Jun 2026 11:25:51 +0200 Subject: [PATCH 4/6] Documentation --- .../powersync/lib/src/web/http/client.dart | 8 +- .../powersync/lib/src/web/http/protocol.dart | 6 +- .../powersync/lib/src/web/http/server.dart | 58 ++++---- .../lib/src/web/sync_worker_protocol.dart | 41 ++++-- packages/powersync/test/web/http_test.dart | 133 ++++++++++-------- 5 files changed, 138 insertions(+), 108 deletions(-) diff --git a/packages/powersync/lib/src/web/http/client.dart b/packages/powersync/lib/src/web/http/client.dart index 61d9d634..79dbaf21 100644 --- a/packages/powersync/lib/src/web/http/client.dart +++ b/packages/powersync/lib/src/web/http/client.dart @@ -8,6 +8,10 @@ import 'package:http/http.dart'; import '../sync_worker_protocol.dart'; import 'protocol.dart'; +/// An HTTP client implemented through `postMessage` calls on message ports. +/// +/// The sync worker uses this when a connecting tab indicates that a custom +/// HTTP client (which we can't send through ports as an object) should be used. final class RemoteHttpClient extends BaseClient { final WorkerCommunicationChannel _channel; @@ -45,7 +49,7 @@ final class RemoteHttpClient extends BaseClient { final rawResponse = await responseFuture; return StreamedResponse( - _ResponseStream(this, txId, rawResponse.lockName).stream, + _ResponseStream(this, txId).stream, rawResponse.statusCode, request: request, headers: rawResponse.decodedHeaders, @@ -74,7 +78,7 @@ final class _ResponseStream { Stream get stream => streamController.stream; - _ResponseStream(this.client, this.txId, String lockName) { + _ResponseStream(this.client, this.txId) { streamController ..onListen = fetchIfHasListener ..onResume = fetchIfHasListener diff --git a/packages/powersync/lib/src/web/http/protocol.dart b/packages/powersync/lib/src/web/http/protocol.dart index 0e83d2db..fbb0da9b 100644 --- a/packages/powersync/lib/src/web/http/protocol.dart +++ b/packages/powersync/lib/src/web/http/protocol.dart @@ -49,18 +49,15 @@ extension type HttpRequest._(JSObject _) implements HttpMessage { /// A serialized HTTP response extension type HttpResponse._(JSObject _) implements HttpMessage { external factory HttpResponse({ - @JS('l') required String lockName, @JS('s') required int statusCode, @JS('h') required String headers, }); - @JS('l') - external String lockName; - @JS('s') external int statusCode; } +/// A message sent to cancel an in-flight HTTP request. extension type AbortHttpResponse._(JSObject _) implements JSObject { external factory AbortHttpResponse({ @JS('r') required bool cancelStream, @@ -77,6 +74,7 @@ extension type AbortHttpResponse._(JSObject _) implements JSObject { external int transactionId; } +/// A request to read a chunk of HTTP response data. extension type ReadStreamChunk._(JSObject _) implements JSObject { external factory ReadStreamChunk({ @JS('r') required int requestId, diff --git a/packages/powersync/lib/src/web/http/server.dart b/packages/powersync/lib/src/web/http/server.dart index 6ceff49e..6e0d87e9 100644 --- a/packages/powersync/lib/src/web/http/server.dart +++ b/packages/powersync/lib/src/web/http/server.dart @@ -1,15 +1,13 @@ import 'dart:async'; import 'dart:convert'; import 'dart:js_interop'; -import 'dart:js_interop_unsafe'; import 'dart:typed_data'; import 'package:http/http.dart'; -import 'package:web/web.dart' show Crypto; -import '../../platform_specific/web.dart'; import 'protocol.dart'; +/// A proxy exposing an HTTP [Client] through a message port protocol. final class RemoteHttpServer { /// The http client to expose to a worker. final Client client; @@ -18,11 +16,14 @@ final class RemoteHttpServer { RemoteHttpServer(this.client); - Future<(HttpResponse, JSArray?)> handle(HttpRequest request) async { + /// Handles an http request, returning the serialized response. + /// + /// The response does not include a body, which must be read in chunks via + /// [readResponse]. + Future handle(HttpRequest request) async { final state = _HttpRequest(); _pendingTransactions[request.transactionId] = state; - final lockName = await state.acquireLock(); final inner = AbortableRequest(request.method, Uri.parse(request.uri), abortTrigger: state._abortController.future); inner.bodyBytes = request.body.toDart.asUint8List(); @@ -31,17 +32,17 @@ final class RemoteHttpServer { final response = await client.send(inner); state.response = StreamIterator(response.stream); - return ( - HttpResponse( - lockName: lockName, - statusCode: response.statusCode, - headers: json.encode(response.headers), - ), - null + return HttpResponse( + statusCode: response.statusCode, + headers: json.encode(response.headers), ); } - Future<(JSArrayBuffer?, JSArray?)> readResponse(int transactionId) async { + /// Reads a chunk of an HTTP response stream. + /// + /// Returns the chunk as an array buffer, or null if the end of the stream has + /// been reached. + Future readResponse(int transactionId) async { final state = _pendingTransactions[transactionId]; final response = state?.response; if (state == null || response == null) { @@ -49,8 +50,7 @@ final class RemoteHttpServer { } if (await response.moveNext()) { - final asJsBuffer = _byteListToArrayBuffer(response.current); - return (asJsBuffer, [asJsBuffer].toJS); + return _byteListToArrayBuffer(response.current); } else if (state._abortController.isCompleted) { throw RequestAbortedException(); } else { @@ -58,7 +58,7 @@ final class RemoteHttpServer { _pendingTransactions.remove(transactionId); state.close(); - return (null, null); + return null; } } @@ -66,6 +66,15 @@ final class RemoteHttpServer { _pendingTransactions.remove(transactionId)?.abort(cancelStream); } + void forceClose() { + for (final pending in _pendingTransactions.values) { + pending.close(); + } + _pendingTransactions.clear(); + + client.close(); + } + static JSArrayBuffer _byteListToArrayBuffer(List bytes) { if (bytes is Uint8List) { final buffer = bytes.buffer; @@ -103,21 +112,4 @@ final class _HttpRequest { _abortController.complete(); } } - - Future acquireLock() async { - final name = _generateRandomLockName(); - final hasLock = Completer.sync(); - potentiallySharedMutex(name).lock(() async { - hasLock.complete(); - return _abortController.future; - }); - - await hasLock.future; - return name; - } - - static String _generateRandomLockName() { - final crypto = (globalContext['crypto'] as Crypto); - return 'http-remote-${crypto.randomUUID()}'; - } } diff --git a/packages/powersync/lib/src/web/sync_worker_protocol.dart b/packages/powersync/lib/src/web/sync_worker_protocol.dart index 5283c0e7..481c1792 100644 --- a/packages/powersync/lib/src/web/sync_worker_protocol.dart +++ b/packages/powersync/lib/src/web/sync_worker_protocol.dart @@ -20,6 +20,11 @@ import 'http/server.dart'; /// Names used in [SyncWorkerMessage] enum SyncWorkerMessageType { + /// Probes whether the remote is still active. + /// + /// This makes the remote acquire a randomly-named navigator lock and reply + /// with its name. Once the other end can acquire that, it knows the port is + /// closed. ping, /// Sent from client to the sync worker to request the synchronization @@ -68,9 +73,7 @@ enum SyncWorkerMessageType { /// Abort an in-flight HTTP request. /// - /// The payload is a [AbortHttpResponse] object. - /// - /// The payload is a [JSNumber] matching the [HttpRequest.transactionId]. + /// The payload is an [AbortHttpResponse] object. abortHttpRequest, /// Requests a chunk of data from an HTTP response. @@ -390,6 +393,7 @@ final class WorkerCommunicationChannel { final StreamController<(SyncWorkerMessageType, JSAny)> _events = StreamController(); final Logger _logger; + final RemoteHttpServer? _httpServer; Stream<(SyncWorkerMessageType, JSAny)> get events => _events.stream; @@ -401,7 +405,10 @@ final class WorkerCommunicationChannel { Stream? errors, Logger? logger, Client? exposedHttpClient, - }) : _logger = logger ?? autoLogger { + }) : _logger = logger ?? autoLogger, + _httpServer = exposedHttpClient == null + ? null + : RemoteHttpServer(exposedHttpClient) { port.start(); _incomingErrors = errors?.listen((event) { _hasError = true; @@ -412,9 +419,6 @@ final class WorkerCommunicationChannel { _pendingRequests.clear(); }); - final client = - exposedHttpClient == null ? null : RemoteHttpServer(exposedHttpClient); - _incomingMessages = EventStreamProviders.messageEvent.forTarget(port).listen((event) async { final message = event.data as SyncWorkerMessage; @@ -439,17 +443,21 @@ final class WorkerCommunicationChannel { requestId = (message.payload as JSNumber).toDartInt; case SyncWorkerMessageType.sendHttpRequest: final request = message.payload as HttpRequest; - _respond(request.requestId, () => client!.handle(request)); - return; + return _respond(request.requestId, + () async => (await _httpServer!.handle(request), null)); case SyncWorkerMessageType.abortHttpRequest: final payload = message.payload as AbortHttpResponse; - client!.abort(payload.transactionId, payload.cancelStream); + _httpServer!.abort(payload.transactionId, payload.cancelStream); return; case SyncWorkerMessageType.readResponseChunk: final request = message.payload as ReadStreamChunk; - _respond(request.requestId, - () => client!.readResponse(request.transactionId)); - return; + return _respond(request.requestId, () async { + return switch ( + await _httpServer!.readResponse(request.transactionId)) { + null => (null, null), + final buffer => (buffer, [buffer].toJS), + }; + }); case SyncWorkerMessageType.okResponse: final payload = message.payload as OkResponse; _pendingRequests.remove(payload.requestId)!.complete(payload.payload); @@ -530,7 +538,11 @@ final class WorkerCommunicationChannel { } Future ping() async { - await _numericRequest(SyncWorkerMessageType.ping); + final response = await _numericRequest(SyncWorkerMessageType.ping); + if (response.isA()) { + // Once we're able to acquire this lock, we know the remote has closed. + potentiallySharedMutex((response as JSString).toDart).lock(close); + } } void observeRemoteLockName(String name) { @@ -643,6 +655,7 @@ final class WorkerCommunicationChannel { _incomingMessages?.cancel(); _incomingErrors?.cancel(); port.close(); + _httpServer?.forceClose(); for (final pending in _pendingRequests.values) { pending.completeError(const ChannelClosedException()); diff --git a/packages/powersync/test/web/http_test.dart b/packages/powersync/test/web/http_test.dart index 7953650b..cb0a3e21 100644 --- a/packages/powersync/test/web/http_test.dart +++ b/packages/powersync/test/web/http_test.dart @@ -67,63 +67,86 @@ void main() { expect(responseStream.hasListener, isFalse); }); - test('can abort requests before receiving response', () async { - final (client, _) = - createRemoteClient(MockClient.streaming((request, _) async { - await (request as Abortable).abortTrigger!; - throw RequestAbortedException(); - })); - - await expectLater( - client.send(AbortableRequest('GET', uri, abortTrigger: Future.value())), - throwsA(isA()), - ); + group('can abort', () { + test('before receiving response', () async { + final (client, _) = + createRemoteClient(MockClient.streaming((request, _) async { + await (request as Abortable).abortTrigger!; + throw RequestAbortedException(); + })); + + await expectLater( + client.send(AbortableRequest('GET', uri, abortTrigger: Future.value())), + throwsA(isA()), + ); + }); + + test('in response', () async { + final abort = Completer(); + final responseStream = StreamController(); + var aborted = false; + + final (client, _) = + createRemoteClient(MockClient.streaming((request, _) async { + (request as Abortable).abortTrigger!.whenComplete(() { + aborted = true; + responseStream + ..addError(RequestAbortedException()) + ..close(); + }); + return StreamedResponse(responseStream.stream, 200); + })); + + final response = await client + .send(AbortableRequest('GET', uri, abortTrigger: abort.future)); + responseStream.add(Uint8List(42)); + final receivedResponseStream = StreamQueue(response.stream); + await expectLater(receivedResponseStream, emits(hasLength(42))); + + abort.complete(); + await expectLater( + receivedResponseStream, emitsError(isA())); + expect(aborted, isTrue); + }); + + test('via stream cancel', () async { + final responseStream = StreamController(); + + final (client, _) = + createRemoteClient(MockClient.streaming((request, _) async { + return StreamedResponse(responseStream.stream, 200); + })); + + final response = await client.send(AbortableRequest('GET', uri)); + responseStream.add(Uint8List(42)); + + final receivedResponseStream = StreamQueue(response.stream); + await expectLater(receivedResponseStream, emits(hasLength(42))); + await receivedResponseStream.cancel(); + await pumpEventQueue(); + expect(responseStream.hasListener, isFalse); + }); }); - test('can abort requests in response', () async { - final abort = Completer(); - final responseStream = StreamController(); - var aborted = false; - - final (client, _) = - createRemoteClient(MockClient.streaming((request, _) async { - (request as Abortable).abortTrigger!.whenComplete(() { - aborted = true; - responseStream - ..addError(RequestAbortedException()) - ..close(); - }); - return StreamedResponse(responseStream.stream, 200); - })); - - final response = await client - .send(AbortableRequest('GET', uri, abortTrigger: abort.future)); - responseStream.add(Uint8List(42)); - final receivedResponseStream = StreamQueue(response.stream); - await expectLater(receivedResponseStream, emits(hasLength(42))); - - abort.complete(); - await expectLater( - receivedResponseStream, emitsError(isA())); - expect(aborted, isTrue); - }); - - test('can abort via stream cancel', () async { - final responseStream = StreamController(); - - final (client, _) = - createRemoteClient(MockClient.streaming((request, _) async { - return StreamedResponse(responseStream.stream, 200); - })); - - final response = await client.send(AbortableRequest('GET', uri)); - responseStream.add(Uint8List(42)); - - final receivedResponseStream = StreamQueue(response.stream); - await expectLater(receivedResponseStream, emits(hasLength(42))); - await receivedResponseStream.cancel(); - await pumpEventQueue(); - expect(responseStream.hasListener, isFalse); + group('reacts to closed channel', () { + test('in response stream', () async { + final responseStream = StreamController(); + final (client, channel) = + createRemoteClient(MockClient.streaming((request, _) async { + return StreamedResponse(responseStream.stream, 200); + })); + + final response = await client.send(AbortableRequest('GET', uri)); + responseStream.add(Uint8List(42)); + final receivedResponseStream = StreamQueue(response.stream); + await expectLater(receivedResponseStream, emits(hasLength(42))); + + final expectation = expectLater( + receivedResponseStream, emitsError(isA())); + await pumpEventQueue(); + channel.close(); + await expectation; + }); }); } From 1383a3d989e39e6c5493e7ed37894b46c7bcc509 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 3 Jun 2026 13:55:15 +0200 Subject: [PATCH 5/6] Better support for closed tabs --- .../lib/src/web/sync_worker_protocol.dart | 6 ++-- packages/powersync/test/web/http_test.dart | 34 ++++++++++++++----- .../powersync/test/web/sync_worker_test.dart | 1 + 3 files changed, 31 insertions(+), 10 deletions(-) diff --git a/packages/powersync/lib/src/web/sync_worker_protocol.dart b/packages/powersync/lib/src/web/sync_worker_protocol.dart index 481c1792..e5856f17 100644 --- a/packages/powersync/lib/src/web/sync_worker_protocol.dart +++ b/packages/powersync/lib/src/web/sync_worker_protocol.dart @@ -5,6 +5,7 @@ import 'dart:js_interop_unsafe'; import 'package:http/http.dart'; import 'package:logging/logging.dart'; +import 'package:meta/meta.dart'; import 'package:powersync/src/schema.dart'; import 'package:powersync/src/sync/options.dart'; import 'package:powersync/src/sync/stream.dart'; @@ -385,7 +386,8 @@ final class WorkerCommunicationChannel { /// The name of a navigator lock held by this channel, it's sent to the remote /// when requested so that it can detect when this channel is closed. - late final Future _lockName = _acquireLock(); + @visibleForTesting + late final Future lockName = _acquireLock(); final MessagePort port; final FutureOr<(JSAny?, JSArray?)> Function(SyncWorkerMessageType, JSAny) @@ -579,7 +581,7 @@ final class WorkerCommunicationChannel { null => null, final appMetadata => jsonEncode(appMetadata), }, - lockName: await _lockName, + lockName: await lockName, customHttpClient: customHttpClient, ), )); diff --git a/packages/powersync/test/web/http_test.dart b/packages/powersync/test/web/http_test.dart index cb0a3e21..4f0160a2 100644 --- a/packages/powersync/test/web/http_test.dart +++ b/packages/powersync/test/web/http_test.dart @@ -16,7 +16,7 @@ void main() { final uri = Uri.parse('https://powersync.com/foo/bar'); test('can send http requests', () async { - final (client, _) = createRemoteClient(MockClient((request) async { + final (client, _) = await createRemoteClient(MockClient((request) async { expect(request.url, uri); expect(request.method, 'POST'); expect(request.headers, containsPair('Foo', 'Bar')); @@ -35,7 +35,7 @@ void main() { test('response stream control', () async { final responseStream = StreamController(); final (client, _) = - createRemoteClient(MockClient.streaming((request, stream) async { + await createRemoteClient(MockClient.streaming((request, stream) async { await stream.drain(); return StreamedResponse(responseStream.stream, 200); })); @@ -70,7 +70,7 @@ void main() { group('can abort', () { test('before receiving response', () async { final (client, _) = - createRemoteClient(MockClient.streaming((request, _) async { + await createRemoteClient(MockClient.streaming((request, _) async { await (request as Abortable).abortTrigger!; throw RequestAbortedException(); })); @@ -87,7 +87,7 @@ void main() { var aborted = false; final (client, _) = - createRemoteClient(MockClient.streaming((request, _) async { + await createRemoteClient(MockClient.streaming((request, _) async { (request as Abortable).abortTrigger!.whenComplete(() { aborted = true; responseStream @@ -113,7 +113,7 @@ void main() { final responseStream = StreamController(); final (client, _) = - createRemoteClient(MockClient.streaming((request, _) async { + await createRemoteClient(MockClient.streaming((request, _) async { return StreamedResponse(responseStream.stream, 200); })); @@ -129,10 +129,24 @@ void main() { }); group('reacts to closed channel', () { + test('before receiving response', () async { + late Client client; + late WorkerCommunicationChannel channel; + + (client, channel) = + await createRemoteClient(MockClient.streaming((request, _) async { + channel.close(); + return StreamedResponse(Stream.empty(), 200); + })); + + await expectLater(client.send(AbortableRequest('GET', uri)), + throwsA(isA())); + }); + test('in response stream', () async { final responseStream = StreamController(); final (client, channel) = - createRemoteClient(MockClient.streaming((request, _) async { + await createRemoteClient(MockClient.streaming((request, _) async { return StreamedResponse(responseStream.stream, 200); })); @@ -142,7 +156,7 @@ void main() { await expectLater(receivedResponseStream, emits(hasLength(42))); final expectation = expectLater( - receivedResponseStream, emitsError(isA())); + receivedResponseStream, emitsError(isA())); await pumpEventQueue(); channel.close(); await expectation; @@ -150,7 +164,8 @@ void main() { }); } -(Client, WorkerCommunicationChannel) createRemoteClient(Client original) { +Future<(Client, WorkerCommunicationChannel)> createRemoteClient( + Client original) async { final channel = MessageChannel(); final local = WorkerCommunicationChannel( @@ -166,5 +181,8 @@ void main() { throw UnimplementedError(); }, ); + local.observeRemoteLockName(await remote.lockName); + remote.observeRemoteLockName(await local.lockName); + return (RemoteHttpClient(remote), local); } diff --git a/packages/powersync/test/web/sync_worker_test.dart b/packages/powersync/test/web/sync_worker_test.dart index b9f03066..5bf855fc 100644 --- a/packages/powersync/test/web/sync_worker_test.dart +++ b/packages/powersync/test/web/sync_worker_test.dart @@ -44,6 +44,7 @@ void main() { sendToWorker: rawChannel.port2, worker: null, subscriptions: subscriptions, + client: null, ); handle.statusStream.forEach(db.setStatus); return handle; From 9533e1af69d0dd533ca17669b2165991fd956d8f Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 3 Jun 2026 14:43:50 +0200 Subject: [PATCH 6/6] Support custom clients in sync workers --- .../native/native_powersync_database.dart | 5 ---- .../database/web/web_powersync_database.dart | 2 -- packages/powersync/lib/src/sync/options.dart | 2 ++ .../lib/src/sync/streaming_sync.dart | 3 +- .../lib/src/web/sync_controller.dart | 5 +--- .../powersync/lib/src/web/sync_worker.dart | 2 -- .../test/utils/abstract_test_utils.dart | 29 +++---------------- .../powersync/test/web/sync_worker_test.dart | 24 ++++++++++++++- 8 files changed, 31 insertions(+), 41 deletions(-) diff --git a/packages/powersync/lib/src/database/native/native_powersync_database.dart b/packages/powersync/lib/src/database/native/native_powersync_database.dart index f4869897..d59037c3 100644 --- a/packages/powersync/lib/src/database/native/native_powersync_database.dart +++ b/packages/powersync/lib/src/database/native/native_powersync_database.dart @@ -1,7 +1,6 @@ import 'dart:async'; import 'dart:convert'; import 'dart:isolate'; -import 'package:http/http.dart'; import 'package:meta/meta.dart'; import 'package:logging/logging.dart'; @@ -172,7 +171,6 @@ final class NativePowerSyncDatabase extends BasePowerSyncDatabase { database.openFactory.path, options, jsonEncode(schema), - options.source.httpClient ?? Client.new, ), debugName: 'Sync ${database.openFactory.path}', onError: receiveUnhandledErrors.sendPort, @@ -192,14 +190,12 @@ class _PowerSyncDatabaseIsolateArgs { final String databaseName; final ResolvedSyncOptions options; final String schemaJson; - final HttpClientFactory httpClient; _PowerSyncDatabaseIsolateArgs( this.sPort, this.databaseName, this.options, this.schemaJson, - this.httpClient, ); } @@ -287,7 +283,6 @@ Future _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async { crudUpdateTriggerStream: database .onChange(['ps_crud'], throttle: args.options.crudThrottleTime), options: args.options, - client: args.httpClient(), syncMutex: mutexes.mutex('sync'), crudMutex: mutexes.mutex('crud'), ); diff --git a/packages/powersync/lib/src/database/web/web_powersync_database.dart b/packages/powersync/lib/src/database/web/web_powersync_database.dart index 10c91c74..2ac85502 100644 --- a/packages/powersync/lib/src/database/web/web_powersync_database.dart +++ b/packages/powersync/lib/src/database/web/web_powersync_database.dart @@ -1,6 +1,5 @@ import 'dart:async'; import 'dart:convert'; -import 'package:http/http.dart'; import 'package:meta/meta.dart'; import 'package:powersync/src/abort_controller.dart'; import 'package:powersync/src/sync/bucket_storage.dart'; @@ -75,7 +74,6 @@ final class WebPowerSyncDatabase extends BasePowerSyncDatabase { connector: InternalConnector.wrap(connector, this), crudUpdateTriggerStream: crudStream, options: options, - client: options.source.httpClient?.call() ?? Client(), activeSubscriptions: initiallyActiveStreams, // Only allows 1 sync implementation to run at a time per database // This should be global (across tabs) when using Navigator locks. diff --git a/packages/powersync/lib/src/sync/options.dart b/packages/powersync/lib/src/sync/options.dart index 865b6664..45bfc971 100644 --- a/packages/powersync/lib/src/sync/options.dart +++ b/packages/powersync/lib/src/sync/options.dart @@ -129,6 +129,8 @@ extension type ResolvedSyncOptions(SyncOptions source) { bool get includeDefaultStreams => source.includeDefaultStreams ?? true; + Client createHttpClient() => source.httpClient?.call() ?? Client(); + (ResolvedSyncOptions, bool) applyFrom(SyncOptions other) { final newOptions = SyncOptions( crudThrottleTime: other.crudThrottleTime ?? crudThrottleTime, diff --git a/packages/powersync/lib/src/sync/streaming_sync.dart b/packages/powersync/lib/src/sync/streaming_sync.dart index 927a5c77..6e7bc860 100644 --- a/packages/powersync/lib/src/sync/streaming_sync.dart +++ b/packages/powersync/lib/src/sync/streaming_sync.dart @@ -73,7 +73,6 @@ class StreamingSyncImplementation implements StreamingSync { required this.connector, required this.crudUpdateTriggerStream, required this.options, - required http.Client client, List activeSubscriptions = const [], Mutex? syncMutex, Mutex? crudMutex, @@ -82,7 +81,7 @@ class StreamingSyncImplementation implements StreamingSync { /// A unique identifier for this streaming sync implementation /// A good value is typically the DB file path which it will mutate when syncing. String? identifier = "unknown", - }) : _client = client, + }) : _client = options.createHttpClient(), syncMutex = syncMutex ?? potentiallySharedMutex("sync-$identifier"), crudMutex = crudMutex ?? potentiallySharedMutex("crud-$identifier"), _userAgentHeaders = userAgentHeaders(), diff --git a/packages/powersync/lib/src/web/sync_controller.dart b/packages/powersync/lib/src/web/sync_controller.dart index 288ce5d6..66854748 100644 --- a/packages/powersync/lib/src/web/sync_controller.dart +++ b/packages/powersync/lib/src/web/sync_controller.dart @@ -2,7 +2,6 @@ import 'dart:async'; import 'dart:js_interop'; import 'package:meta/meta.dart'; -import 'package:http/http.dart'; import 'package:powersync/src/web/worker_utils.dart'; import 'package:sqlite_async/web.dart'; import 'package:web/web.dart' hide Client; @@ -30,7 +29,6 @@ class SyncWorkerHandle implements StreamingSync { required this.options, required MessagePort sendToWorker, required SharedWorker? worker, - required Client? client, required this.subscriptions, }) { _channel = WorkerCommunicationChannel( @@ -39,7 +37,7 @@ class SyncWorkerHandle implements StreamingSync { ? EventStreamProviders.errorEvent.forTarget(worker) : null, logger: database.logger, - exposedHttpClient: client, + exposedHttpClient: options.httpClient?.call(), requestHandler: (type, payload) async { switch (type) { case SyncWorkerMessageType.requestEndpoint: @@ -116,7 +114,6 @@ class SyncWorkerHandle implements StreamingSync { sendToWorker: port1, worker: worker, subscriptions: subscriptions, - client: options.httpClient?.call(), ); // Make sure that the worker is working, or throw immediately. diff --git a/packages/powersync/lib/src/web/sync_worker.dart b/packages/powersync/lib/src/web/sync_worker.dart index b938f219..7df8a665 100644 --- a/packages/powersync/lib/src/web/sync_worker.dart +++ b/packages/powersync/lib/src/web/sync_worker.dart @@ -10,7 +10,6 @@ import 'dart:js_interop'; import 'package:async/async.dart'; import 'package:collection/collection.dart'; -import 'package:http/browser_client.dart'; import 'package:logging/logging.dart'; import 'package:meta/meta.dart'; import 'package:powersync/powersync.dart'; @@ -324,7 +323,6 @@ class SyncRunner { ), crudUpdateTriggerStream: crudStream, options: options, - client: BrowserClient(), identifier: identifier, activeSubscriptions: currentStreams, logger: _logger, diff --git a/packages/powersync/test/utils/abstract_test_utils.dart b/packages/powersync/test/utils/abstract_test_utils.dart index aabe3955..1c1096df 100644 --- a/packages/powersync/test/utils/abstract_test_utils.dart +++ b/packages/powersync/test/utils/abstract_test_utils.dart @@ -168,10 +168,13 @@ final class TestDatabase extends BasePowerSyncDatabase { required AbortController abort, required Zone asyncWorkZone, }) async { + if (httpClient case final client?) { + options = options.applyFrom(SyncOptions(httpClient: () => client)).$1; + } + final impl = StreamingSyncImplementation( adapter: BucketStorage(this), schemaJson: jsonEncode(this.schema), - client: httpClient!, options: options, connector: InternalConnector.wrap(connector, this), logger: logger, @@ -207,27 +210,3 @@ final class TestDatabase extends BasePowerSyncDatabase { debugContext: debugContext, lockTimeout: lockTimeout); } } - -extension MockSync on PowerSyncDatabase { - StreamingSyncImplementation connectWithMockService( - Client client, - PowerSyncBackendConnector connector, { - Logger? logger, - SyncOptions options = const SyncOptions(retryDelay: Duration(seconds: 5)), - Schema? customSchema, - }) { - final impl = StreamingSyncImplementation( - adapter: BucketStorage(this), - schemaJson: jsonEncode(customSchema ?? schema), - client: client, - options: ResolvedSyncOptions(options), - connector: InternalConnector.wrap(connector, this), - logger: logger, - crudUpdateTriggerStream: database - .onChange(['ps_crud'], throttle: const Duration(milliseconds: 10)), - ); - impl.statusStream.listen(setStatus); - - return impl; - } -} diff --git a/packages/powersync/test/web/sync_worker_test.dart b/packages/powersync/test/web/sync_worker_test.dart index 5bf855fc..8939bd9e 100644 --- a/packages/powersync/test/web/sync_worker_test.dart +++ b/packages/powersync/test/web/sync_worker_test.dart @@ -11,6 +11,10 @@ import 'package:powersync/src/web/sync_worker.dart'; import 'package:test/test.dart'; import 'package:web/web.dart' show MessageChannel; +import '../server/sync_server/in_memory_sync_server.dart'; +import '../sync/utils.dart'; +import '../utils/abstract_test_utils.dart'; +import '../utils/in_memory_http.dart'; import '../utils/web_test_utils.dart'; void main() { @@ -44,7 +48,6 @@ void main() { sendToWorker: rawChannel.port2, worker: null, subscriptions: subscriptions, - client: null, ); handle.statusStream.forEach(db.setStatus); return handle; @@ -97,6 +100,25 @@ void main() { expect(syncRunner.sync, isNull); expect(syncRunner.connections, isEmpty); }); + + test('can use custom http client', () async { + final (client, server) = inMemoryServer(); + final service = MockSyncService(); + server.mount((r) => service.router(r)); + service + ..addLine(checkpoint(lastOpId: 1)) + ..addLine(checkpointComplete()); + + final handle = createWorkerHandle( + connector: TestConnector( + () async => PowerSyncCredentials( + endpoint: 'http://test.powersync.example.org', token: 'token'), + ), + options: SyncOptions(httpClient: () => client), + ); + await handle.streamingSync(); + await db.waitForFirstSync(); + }); } final class _ThrowingBackendConnector extends PowerSyncBackendConnector {