diff --git a/packages/powersync/lib/src/database/native/native_powersync_database.dart b/packages/powersync/lib/src/database/native/native_powersync_database.dart index 85bca49d..d59037c3 100644 --- a/packages/powersync/lib/src/database/native/native_powersync_database.dart +++ b/packages/powersync/lib/src/database/native/native_powersync_database.dart @@ -3,7 +3,6 @@ import 'dart:convert'; import 'dart:isolate'; import 'package:meta/meta.dart'; -import 'package:http/http.dart' as http; import 'package:logging/logging.dart'; import 'package:powersync/src/abort_controller.dart'; import 'package:powersync/src/sync/bucket_storage.dart'; @@ -284,7 +283,6 @@ Future _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async { crudUpdateTriggerStream: database .onChange(['ps_crud'], throttle: args.options.crudThrottleTime), options: args.options, - client: http.Client(), syncMutex: mutexes.mutex('sync'), crudMutex: mutexes.mutex('crud'), ); diff --git a/packages/powersync/lib/src/database/web/web_powersync_database.dart b/packages/powersync/lib/src/database/web/web_powersync_database.dart index 717936b5..2ac85502 100644 --- a/packages/powersync/lib/src/database/web/web_powersync_database.dart +++ b/packages/powersync/lib/src/database/web/web_powersync_database.dart @@ -1,7 +1,6 @@ import 'dart:async'; import 'dart:convert'; import 'package:meta/meta.dart'; -import 'package:http/browser_client.dart'; import 'package:powersync/src/abort_controller.dart'; import 'package:powersync/src/sync/bucket_storage.dart'; import 'package:powersync/src/connector.dart'; @@ -75,7 +74,6 @@ final class WebPowerSyncDatabase extends BasePowerSyncDatabase { connector: InternalConnector.wrap(connector, this), crudUpdateTriggerStream: crudStream, options: options, - client: BrowserClient(), activeSubscriptions: initiallyActiveStreams, // Only allows 1 sync implementation to run at a time per database // This should be global (across tabs) when using Navigator locks. diff --git a/packages/powersync/lib/src/setup/web.dart b/packages/powersync/lib/src/setup/web.dart index 23256bc1..faf064a7 100644 --- a/packages/powersync/lib/src/setup/web.dart +++ b/packages/powersync/lib/src/setup/web.dart @@ -58,8 +58,8 @@ Future downloadWebAssets(List arguments) async { return; } + final httpClient = HttpClient(); try { - final httpClient = HttpClient(); const sqlitePackageName = 'sqlite3'; final (tag: powersyncTag, version: powerSyncVersion) = @@ -112,6 +112,8 @@ Future downloadWebAssets(List arguments) async { } catch (e) { print(e); exit(1); + } finally { + httpClient.close(); } } diff --git a/packages/powersync/lib/src/sync/options.dart b/packages/powersync/lib/src/sync/options.dart index 381b3a5d..45bfc971 100644 --- a/packages/powersync/lib/src/sync/options.dart +++ b/packages/powersync/lib/src/sync/options.dart @@ -1,6 +1,18 @@ import 'package:collection/collection.dart'; +import 'package:http/http.dart'; import 'package:meta/meta.dart'; +/// The signature of a function creating a http [Client] to use by the PowerSync +/// client. +/// +/// PowerSync will use [Client.new] by default, but a custom factory can be used +/// as [SyncOptions.httpClient]. This allows transforming requests and +/// responses, e.g. to add additional headers or allow custom TLS certificates. +/// +/// On native platforms, these functions are sent across send ports (and thus +/// must not capture non-sendable state). +typedef HttpClientFactory = Client Function(); + /// Options that affect how the sync client connects to the sync service. final class SyncOptions { /// A map of application metadata that is passed to the PowerSync service. @@ -38,6 +50,12 @@ final class SyncOptions { /// This is enabled by default. final bool? includeDefaultStreams; + /// A function to create http clients used by the PowerSync SDK. + /// + /// Custom clients can be used to configure TLS options, inject additional + /// headers, or otherwise customize networking. + final HttpClientFactory? httpClient; + const SyncOptions({ this.crudThrottleTime, this.retryDelay, @@ -45,6 +63,7 @@ final class SyncOptions { this.syncImplementation = SyncClientImplementation.defaultClient, this.includeDefaultStreams, this.appMetadata, + this.httpClient, }); SyncOptions _copyWith({ @@ -60,6 +79,7 @@ final class SyncOptions { syncImplementation: syncImplementation, includeDefaultStreams: includeDefaultStreams, appMetadata: appMetadata ?? this.appMetadata, + httpClient: httpClient, ); } } @@ -109,6 +129,8 @@ extension type ResolvedSyncOptions(SyncOptions source) { bool get includeDefaultStreams => source.includeDefaultStreams ?? true; + Client createHttpClient() => source.httpClient?.call() ?? Client(); + (ResolvedSyncOptions, bool) applyFrom(SyncOptions other) { final newOptions = SyncOptions( crudThrottleTime: other.crudThrottleTime ?? crudThrottleTime, @@ -118,6 +140,7 @@ extension type ResolvedSyncOptions(SyncOptions source) { includeDefaultStreams: other.includeDefaultStreams ?? includeDefaultStreams, appMetadata: other.appMetadata ?? appMetadata, + httpClient: other.httpClient ?? source.httpClient, ); final didChange = !_mapEquality.equals(newOptions.params, params) || diff --git a/packages/powersync/lib/src/sync/streaming_sync.dart b/packages/powersync/lib/src/sync/streaming_sync.dart index 927a5c77..6e7bc860 100644 --- a/packages/powersync/lib/src/sync/streaming_sync.dart +++ b/packages/powersync/lib/src/sync/streaming_sync.dart @@ -73,7 +73,6 @@ class StreamingSyncImplementation implements StreamingSync { required this.connector, required this.crudUpdateTriggerStream, required this.options, - required http.Client client, List activeSubscriptions = const [], Mutex? syncMutex, Mutex? crudMutex, @@ -82,7 +81,7 @@ class StreamingSyncImplementation implements StreamingSync { /// A unique identifier for this streaming sync implementation /// A good value is typically the DB file path which it will mutate when syncing. String? identifier = "unknown", - }) : _client = client, + }) : _client = options.createHttpClient(), syncMutex = syncMutex ?? potentiallySharedMutex("sync-$identifier"), crudMutex = crudMutex ?? potentiallySharedMutex("crud-$identifier"), _userAgentHeaders = userAgentHeaders(), diff --git a/packages/powersync/lib/src/web/http/client.dart b/packages/powersync/lib/src/web/http/client.dart new file mode 100644 index 00000000..79dbaf21 --- /dev/null +++ b/packages/powersync/lib/src/web/http/client.dart @@ -0,0 +1,116 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:js_interop'; +import 'dart:typed_data'; + +import 'package:http/http.dart'; + +import '../sync_worker_protocol.dart'; +import 'protocol.dart'; + +/// An HTTP client implemented through `postMessage` calls on message ports. +/// +/// The sync worker uses this when a connecting tab indicates that a custom +/// HTTP client (which we can't send through ports as an object) should be used. +final class RemoteHttpClient extends BaseClient { + final WorkerCommunicationChannel _channel; + + int _nextTransactionId = 0; + + RemoteHttpClient(this._channel); + + @override + Future send(BaseRequest request) async { + final body = await request.finalize().toBytes(); + final bodyBuffer = body.buffer; + // This will always be the case with the toBytes() implementation. We + // couldn't safely transfer the entire buffer without it. + assert(body.offsetInBytes == 0 && body.length == bodyBuffer.lengthInBytes); + final jsBuffer = bodyBuffer.toJS; + + final txId = _nextTransactionId++; + + // Send request with other port. + final responseFuture = _channel.sendHttpRequest( + HttpRequest( + requestId: 0, // Set by sendHttpRequest + transactionId: txId, + uri: request.url.toString(), + method: request.method, + headers: json.encode(request.headers), + body: jsBuffer, + ), + ); + + if (request is Abortable) { + request.abortTrigger?.whenComplete(() => sendAbort(txId, false)); + } + + final rawResponse = await responseFuture; + + return StreamedResponse( + _ResponseStream(this, txId).stream, + rawResponse.statusCode, + request: request, + headers: rawResponse.decodedHeaders, + ); + } + + void sendAbort(int txId, bool abortStream) { + _channel.port.postMessage( + SyncWorkerMessage( + type: SyncWorkerMessageType.abortHttpRequest.name, + payload: AbortHttpResponse( + cancelStream: abortStream, + transactionId: txId, + ), + ), + ); + } +} + +final class _ResponseStream { + final RemoteHttpClient client; + final int txId; + + final streamController = StreamController(sync: true); + var isFetching = false; + + Stream get stream => streamController.stream; + + _ResponseStream(this.client, this.txId) { + streamController + ..onListen = fetchIfHasListener + ..onResume = fetchIfHasListener + ..onCancel = () => client.sendAbort(txId, true); + } + + void fetchChunk() async { + assert(!isFetching); + isFetching = true; + + try { + final chunk = await client._channel.readHttpResponseChunk(txId); + if (chunk != null) { + streamController.add(chunk.toDart.asUint8List()); + } else { + streamController.close(); + } + } catch (e, s) { + streamController.addError(e, s); + streamController.close(); + } finally { + isFetching = false; + fetchIfHasListener(); + } + } + + void fetchIfHasListener() { + if (!isFetching && + streamController.hasListener && + !streamController.isClosed && + !streamController.isPaused) { + fetchChunk(); + } + } +} diff --git a/packages/powersync/lib/src/web/http/protocol.dart b/packages/powersync/lib/src/web/http/protocol.dart new file mode 100644 index 00000000..fbb0da9b --- /dev/null +++ b/packages/powersync/lib/src/web/http/protocol.dart @@ -0,0 +1,90 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:js_interop'; + +extension type HttpMessage._(JSObject _) implements JSObject { + /// JSON-encoded request or response headers headers. + @JS('h') + external String headers; + + Map get decodedHeaders { + final decoded = json.decode(headers) as Map; + return decoded.cast(); + } +} + +/// A serialized HTTP request. +extension type HttpRequest._(JSObject _) implements HttpMessage { + external factory HttpRequest({ + @JS('r') required int requestId, + @JS('i') required int transactionId, + @JS('u') required String uri, + @JS('m') required String method, + @JS('h') required String headers, + @JS('b') required JSArrayBuffer body, + }); + + @JS('r') + external int requestId; + + /// A client-generated id for the HTTP transaction. + /// + /// This can be used to identify the request in subsequent commands to read + /// or abort the response. + @JS('i') + external int transactionId; + + @JS('u') + external String uri; + + @JS('m') + external String method; + + /// The full request body (we don't support streaming request bodies, these + /// aren't used by the SDK). + @JS('b') + external JSArrayBuffer body; +} + +/// A serialized HTTP response +extension type HttpResponse._(JSObject _) implements HttpMessage { + external factory HttpResponse({ + @JS('s') required int statusCode, + @JS('h') required String headers, + }); + + @JS('s') + external int statusCode; +} + +/// A message sent to cancel an in-flight HTTP request. +extension type AbortHttpResponse._(JSObject _) implements JSObject { + external factory AbortHttpResponse({ + @JS('r') required bool cancelStream, + @JS('i') required int transactionId, + }); + + /// Whether the abort is from a [StreamSubscription.cancel] call (as opposed + /// to an abort trigger on an abortable request). + @JS('r') + external bool cancelStream; + + /// The same as [HttpRequest.transactionId]. + @JS('i') + external int transactionId; +} + +/// A request to read a chunk of HTTP response data. +extension type ReadStreamChunk._(JSObject _) implements JSObject { + external factory ReadStreamChunk({ + @JS('r') required int requestId, + @JS('i') required int transactionId, + }); + + @JS('r') + external int requestId; + + /// The same as [HttpRequest.transactionId]. + @JS('i') + external int transactionId; +} diff --git a/packages/powersync/lib/src/web/http/server.dart b/packages/powersync/lib/src/web/http/server.dart new file mode 100644 index 00000000..6e0d87e9 --- /dev/null +++ b/packages/powersync/lib/src/web/http/server.dart @@ -0,0 +1,115 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:js_interop'; +import 'dart:typed_data'; + +import 'package:http/http.dart'; + +import 'protocol.dart'; + +/// A proxy exposing an HTTP [Client] through a message port protocol. +final class RemoteHttpServer { + /// The http client to expose to a worker. + final Client client; + + final Map _pendingTransactions = {}; + + RemoteHttpServer(this.client); + + /// Handles an http request, returning the serialized response. + /// + /// The response does not include a body, which must be read in chunks via + /// [readResponse]. + Future handle(HttpRequest request) async { + final state = _HttpRequest(); + _pendingTransactions[request.transactionId] = state; + + final inner = AbortableRequest(request.method, Uri.parse(request.uri), + abortTrigger: state._abortController.future); + inner.bodyBytes = request.body.toDart.asUint8List(); + request.decodedHeaders.forEach((k, v) => inner.headers[k] = v); + + final response = await client.send(inner); + state.response = StreamIterator(response.stream); + + return HttpResponse( + statusCode: response.statusCode, + headers: json.encode(response.headers), + ); + } + + /// Reads a chunk of an HTTP response stream. + /// + /// Returns the chunk as an array buffer, or null if the end of the stream has + /// been reached. + Future readResponse(int transactionId) async { + final state = _pendingTransactions[transactionId]; + final response = state?.response; + if (state == null || response == null) { + throw ArgumentError('Unknown HTTP transaction: $transactionId'); + } + + if (await response.moveNext()) { + return _byteListToArrayBuffer(response.current); + } else if (state._abortController.isCompleted) { + throw RequestAbortedException(); + } else { + // End of stream + _pendingTransactions.remove(transactionId); + state.close(); + + return null; + } + } + + void abort(int transactionId, bool cancelStream) { + _pendingTransactions.remove(transactionId)?.abort(cancelStream); + } + + void forceClose() { + for (final pending in _pendingTransactions.values) { + pending.close(); + } + _pendingTransactions.clear(); + + client.close(); + } + + static JSArrayBuffer _byteListToArrayBuffer(List bytes) { + if (bytes is Uint8List) { + final buffer = bytes.buffer; + if (bytes.offsetInBytes == 0 && buffer.lengthInBytes == bytes.length) { + // Not a sublist view, we can transfer the buffer at once. + return buffer.toJS; + } + } + + return Uint8List.fromList(bytes).buffer.toJS; + } +} + +final class _HttpRequest { + var _closed = false; + + final Completer _abortController = Completer.sync(); + StreamIterator>? response; + + void close() { + if (!_closed) { + _closed = true; + + response?.cancel(); + abort(false); + } + } + + void abort(bool abortStream) { + if (!_abortController.isCompleted) { + if (abortStream) { + response?.cancel(); + } + + _abortController.complete(); + } + } +} diff --git a/packages/powersync/lib/src/web/sync_controller.dart b/packages/powersync/lib/src/web/sync_controller.dart index aa8bf88a..66854748 100644 --- a/packages/powersync/lib/src/web/sync_controller.dart +++ b/packages/powersync/lib/src/web/sync_controller.dart @@ -4,7 +4,7 @@ import 'dart:js_interop'; import 'package:meta/meta.dart'; import 'package:powersync/src/web/worker_utils.dart'; import 'package:sqlite_async/web.dart'; -import 'package:web/web.dart'; +import 'package:web/web.dart' hide Client; import '../connector.dart'; import '../database/powersync_database.dart'; @@ -37,6 +37,7 @@ class SyncWorkerHandle implements StreamingSync { ? EventStreamProviders.errorEvent.forTarget(worker) : null, logger: database.logger, + exposedHttpClient: options.httpClient?.call(), requestHandler: (type, payload) async { switch (type) { case SyncWorkerMessageType.requestEndpoint: @@ -146,6 +147,7 @@ class SyncWorkerHandle implements StreamingSync { ResolvedSyncOptions(options), database.schema, subscriptions, + options.httpClient != null, ); } diff --git a/packages/powersync/lib/src/web/sync_worker.dart b/packages/powersync/lib/src/web/sync_worker.dart index df225c08..7df8a665 100644 --- a/packages/powersync/lib/src/web/sync_worker.dart +++ b/packages/powersync/lib/src/web/sync_worker.dart @@ -10,7 +10,6 @@ import 'dart:js_interop'; import 'package:async/async.dart'; import 'package:collection/collection.dart'; -import 'package:http/browser_client.dart'; import 'package:logging/logging.dart'; import 'package:meta/meta.dart'; import 'package:powersync/powersync.dart'; @@ -22,6 +21,7 @@ import 'package:sqlite_async/web.dart'; import 'package:web/web.dart' hide RequestMode; import '../database/powersync_database.dart'; +import 'http/client.dart'; import 'sync_worker_protocol.dart'; import 'web_bucket_storage.dart'; @@ -91,6 +91,9 @@ class ConnectedClient { final encodedAppMetadata => Map.from( jsonDecode(encodedAppMetadata) as Map), }, + httpClient: request.customHttpClient == true + ? () => RemoteHttpClient(channel) + : null, ); _runner = _worker._referenceSyncTask( @@ -320,7 +323,6 @@ class SyncRunner { ), crudUpdateTriggerStream: crudStream, options: options, - client: BrowserClient(), identifier: identifier, activeSubscriptions: currentStreams, logger: _logger, diff --git a/packages/powersync/lib/src/web/sync_worker_protocol.dart b/packages/powersync/lib/src/web/sync_worker_protocol.dart index ff10663e..e5856f17 100644 --- a/packages/powersync/lib/src/web/sync_worker_protocol.dart +++ b/packages/powersync/lib/src/web/sync_worker_protocol.dart @@ -3,7 +3,9 @@ import 'dart:convert'; import 'dart:js_interop'; import 'dart:js_interop_unsafe'; +import 'package:http/http.dart'; import 'package:logging/logging.dart'; +import 'package:meta/meta.dart'; import 'package:powersync/src/schema.dart'; import 'package:powersync/src/sync/options.dart'; import 'package:powersync/src/sync/stream.dart'; @@ -14,9 +16,16 @@ import '../log.dart'; import '../platform_specific/web.dart'; import '../sync/streaming_sync.dart'; import '../sync/sync_status.dart'; +import 'http/protocol.dart'; +import 'http/server.dart'; /// Names used in [SyncWorkerMessage] enum SyncWorkerMessageType { + /// Probes whether the remote is still active. + /// + /// This makes the remote acquire a randomly-named navigator lock and reply + /// with its name. Once the other end can acquire that, it knows the port is + /// closed. ping, /// Sent from client to the sync worker to request the synchronization @@ -58,6 +67,22 @@ enum SyncWorkerMessageType { /// The payload is a [JSString]. logEvent, + /// Send an HTTP request. + /// + /// The request payload is a [HttpRequest], the response is a [HttpResponse]. + sendHttpRequest, + + /// Abort an in-flight HTTP request. + /// + /// The payload is an [AbortHttpResponse] object. + abortHttpRequest, + + /// Requests a chunk of data from an HTTP response. + /// + /// The request payload is a [ReadStreamChunk], the response is a + /// [JSArrayBuffer] (or `null` if the end of the response was reached). + readResponseChunk, + okResponse, errorResponse, } @@ -84,6 +109,7 @@ extension type StartSynchronization._(JSObject _) implements JSObject { String? syncParamsEncoded, UpdateSubscriptions? subscriptions, String? appMetadataEncoded, + bool? customHttpClient, }); external String get databaseName; @@ -96,6 +122,7 @@ extension type StartSynchronization._(JSObject _) implements JSObject { external UpdateSubscriptions? get subscriptions; external String? get appMetadataEncoded; external String get lockName; + external bool? get customHttpClient; } @anonymous @@ -155,11 +182,16 @@ extension type OkResponse._(JSObject _) implements JSObject { extension type ErrorResponse._(JSObject _) implements JSObject { external factory ErrorResponse({ required int requestId, + required int? recognizedType, required JSString errorMessage, }); external int get requestId; + external int? get recognizedType; external JSString get errorMessage; + + static const recognizedTypeNone = 0; + static const recognizedTypeRequestAbortedException = 1; } @anonymous @@ -354,7 +386,8 @@ final class WorkerCommunicationChannel { /// The name of a navigator lock held by this channel, it's sent to the remote /// when requested so that it can detect when this channel is closed. - late final Future _lockName = _acquireLock(); + @visibleForTesting + late final Future lockName = _acquireLock(); final MessagePort port; final FutureOr<(JSAny?, JSArray?)> Function(SyncWorkerMessageType, JSAny) @@ -362,6 +395,7 @@ final class WorkerCommunicationChannel { final StreamController<(SyncWorkerMessageType, JSAny)> _events = StreamController(); final Logger _logger; + final RemoteHttpServer? _httpServer; Stream<(SyncWorkerMessageType, JSAny)> get events => _events.stream; @@ -372,7 +406,11 @@ final class WorkerCommunicationChannel { required this.requestHandler, Stream? errors, Logger? logger, - }) : _logger = logger ?? autoLogger { + Client? exposedHttpClient, + }) : _logger = logger ?? autoLogger, + _httpServer = exposedHttpClient == null + ? null + : RemoteHttpServer(exposedHttpClient) { port.start(); _incomingErrors = errors?.listen((event) { _hasError = true; @@ -405,15 +443,37 @@ final class WorkerCommunicationChannel { case SyncWorkerMessageType.invalidCredentialsCallback: case SyncWorkerMessageType.uploadCrud: requestId = (message.payload as JSNumber).toDartInt; + case SyncWorkerMessageType.sendHttpRequest: + final request = message.payload as HttpRequest; + return _respond(request.requestId, + () async => (await _httpServer!.handle(request), null)); + case SyncWorkerMessageType.abortHttpRequest: + final payload = message.payload as AbortHttpResponse; + _httpServer!.abort(payload.transactionId, payload.cancelStream); + return; + case SyncWorkerMessageType.readResponseChunk: + final request = message.payload as ReadStreamChunk; + return _respond(request.requestId, () async { + return switch ( + await _httpServer!.readResponse(request.transactionId)) { + null => (null, null), + final buffer => (buffer, [buffer].toJS), + }; + }); case SyncWorkerMessageType.okResponse: final payload = message.payload as OkResponse; _pendingRequests.remove(payload.requestId)!.complete(payload.payload); return; case SyncWorkerMessageType.errorResponse: final payload = message.payload as ErrorResponse; - _pendingRequests - .remove(payload.requestId)! - .completeError(payload.errorMessage.toDart); + final error = switch ( + payload.recognizedType ?? ErrorResponse.recognizedTypeNone) { + ErrorResponse.recognizedTypeRequestAbortedException => + RequestAbortedException(), + _ => payload.errorMessage.toDart, + }; + + _pendingRequests.remove(payload.requestId)!.completeError(error); return; case SyncWorkerMessageType.notifySyncStatus: _events.add((type, message.payload)); @@ -446,7 +506,14 @@ final class WorkerCommunicationChannel { port.postMessage(SyncWorkerMessage( type: SyncWorkerMessageType.errorResponse.name, payload: ErrorResponse( - requestId: requestId, errorMessage: e.toString().toJS), + requestId: requestId, + recognizedType: switch (e) { + RequestAbortedException() => + ErrorResponse.recognizedTypeRequestAbortedException, + _ => ErrorResponse.recognizedTypeNone, + }, + errorMessage: e.toString().toJS, + ), )); } } @@ -473,7 +540,11 @@ final class WorkerCommunicationChannel { } Future ping() async { - await _numericRequest(SyncWorkerMessageType.ping); + final response = await _numericRequest(SyncWorkerMessageType.ping); + if (response.isA()) { + // Once we're able to acquire this lock, we know the remote has closed. + potentiallySharedMutex((response as JSString).toDart).lock(close); + } } void observeRemoteLockName(String name) { @@ -489,6 +560,7 @@ final class WorkerCommunicationChannel { ResolvedSyncOptions options, Schema schema, List streams, + bool customHttpClient, ) async { final (id, completion) = _newRequest(); port.postMessage(SyncWorkerMessage( @@ -509,7 +581,8 @@ final class WorkerCommunicationChannel { null => null, final appMetadata => jsonEncode(appMetadata), }, - lockName: await _lockName, + lockName: await lockName, + customHttpClient: customHttpClient, ), )); await completion; @@ -554,11 +627,37 @@ final class WorkerCommunicationChannel { await _numericRequest(SyncWorkerMessageType.uploadCrud); } + Future sendHttpRequest(HttpRequest request) async { + final (id, completion) = _newRequest(); + request.requestId = id; + port.postMessage( + SyncWorkerMessage( + payload: request, + type: SyncWorkerMessageType.sendHttpRequest.name, + ), + [request.body].toJS, + ); + return (await completion) as HttpResponse; + } + + Future readHttpResponseChunk(int transactionId) async { + final (id, completion) = _newRequest(); + port.postMessage( + SyncWorkerMessage( + type: SyncWorkerMessageType.readResponseChunk.name, + payload: ReadStreamChunk(requestId: id, transactionId: transactionId), + ), + ); + + return (await completion) as JSArrayBuffer?; + } + Future close() async { if (!_closed.isCompleted) { _incomingMessages?.cancel(); _incomingErrors?.cancel(); port.close(); + _httpServer?.forceClose(); for (final pending in _pendingRequests.values) { pending.completeError(const ChannelClosedException()); diff --git a/packages/powersync/test/sync/custom_client_test.dart b/packages/powersync/test/sync/custom_client_test.dart new file mode 100644 index 00000000..8418f7f4 --- /dev/null +++ b/packages/powersync/test/sync/custom_client_test.dart @@ -0,0 +1,50 @@ +import 'package:http/http.dart'; +import 'package:powersync/powersync.dart'; +import 'package:test/test.dart'; + +import '../server/sync_server/in_memory_sync_server.dart'; +import '../utils/abstract_test_utils.dart'; +import '../utils/in_memory_http.dart'; +import '../utils/test_utils_impl.dart'; +import 'utils.dart'; + +void main() { + late PowerSyncDatabase powersync; + late String path; + + setUp(() async { + path = _testUtils.dbPath(); + await _testUtils.cleanDb(path: path); + + powersync = await _testUtils.setupPowerSync(path: path); + }); + + tearDown(() => powersync.close()); + + test('can use custom http client', () async { + await powersync.connect( + connector: TestConnector( + () async => PowerSyncCredentials( + endpoint: 'http://test.powersync.example.org', token: 'token'), + ), + options: SyncOptions( + httpClient: _createMockClient, + ), + ); + + await powersync.waitForFirstSync(); + }); +} + +Client _createMockClient() { + final (client, server) = inMemoryServer(); + final service = MockSyncService(); + server.mount((r) => service.router(r)); + + service + ..addLine(checkpoint(lastOpId: 1)) + ..addLine(checkpointComplete()); + return client; +} + +final _testUtils = TestUtils(); diff --git a/packages/powersync/test/utils/abstract_test_utils.dart b/packages/powersync/test/utils/abstract_test_utils.dart index aabe3955..1c1096df 100644 --- a/packages/powersync/test/utils/abstract_test_utils.dart +++ b/packages/powersync/test/utils/abstract_test_utils.dart @@ -168,10 +168,13 @@ final class TestDatabase extends BasePowerSyncDatabase { required AbortController abort, required Zone asyncWorkZone, }) async { + if (httpClient case final client?) { + options = options.applyFrom(SyncOptions(httpClient: () => client)).$1; + } + final impl = StreamingSyncImplementation( adapter: BucketStorage(this), schemaJson: jsonEncode(this.schema), - client: httpClient!, options: options, connector: InternalConnector.wrap(connector, this), logger: logger, @@ -207,27 +210,3 @@ final class TestDatabase extends BasePowerSyncDatabase { debugContext: debugContext, lockTimeout: lockTimeout); } } - -extension MockSync on PowerSyncDatabase { - StreamingSyncImplementation connectWithMockService( - Client client, - PowerSyncBackendConnector connector, { - Logger? logger, - SyncOptions options = const SyncOptions(retryDelay: Duration(seconds: 5)), - Schema? customSchema, - }) { - final impl = StreamingSyncImplementation( - adapter: BucketStorage(this), - schemaJson: jsonEncode(customSchema ?? schema), - client: client, - options: ResolvedSyncOptions(options), - connector: InternalConnector.wrap(connector, this), - logger: logger, - crudUpdateTriggerStream: database - .onChange(['ps_crud'], throttle: const Duration(milliseconds: 10)), - ); - impl.statusStream.listen(setStatus); - - return impl; - } -} diff --git a/packages/powersync/test/web/http_test.dart b/packages/powersync/test/web/http_test.dart new file mode 100644 index 00000000..4f0160a2 --- /dev/null +++ b/packages/powersync/test/web/http_test.dart @@ -0,0 +1,188 @@ +@TestOn('browser') +library; + +import 'dart:async'; +import 'dart:typed_data'; + +import 'package:async/async.dart'; +import 'package:http/http.dart'; +import 'package:http/testing.dart'; +import 'package:powersync/src/web/http/client.dart'; +import 'package:powersync/src/web/sync_worker_protocol.dart'; +import 'package:test/test.dart'; +import 'package:web/web.dart' show MessageChannel; + +void main() { + final uri = Uri.parse('https://powersync.com/foo/bar'); + + test('can send http requests', () async { + final (client, _) = await createRemoteClient(MockClient((request) async { + expect(request.url, uri); + expect(request.method, 'POST'); + expect(request.headers, containsPair('Foo', 'Bar')); + expect(request.body, 'body'); + + return Response('ok', 200, headers: {'Response': 'Ok'}); + })); + + final response = + await client.post(uri, headers: {'Foo': 'Bar'}, body: 'body'); + expect(response.statusCode, 200); + expect(response.headers, {'Response': 'Ok'}); + expect(response.body, 'ok'); + }); + + test('response stream control', () async { + final responseStream = StreamController(); + final (client, _) = + await createRemoteClient(MockClient.streaming((request, stream) async { + await stream.drain(); + return StreamedResponse(responseStream.stream, 200); + })); + + final response = await client.send(Request('GET', uri)); + expect(responseStream.hasListener, isFalse); + + final receivedChunks = >[]; + final sub = response.stream.listen(receivedChunks.add); + await pumpEventQueue(); + expect(responseStream.hasListener, isTrue); + expect(responseStream.isPaused, isFalse); + + responseStream.add(Uint8List(123)); + await pumpEventQueue(); + expect(receivedChunks, [hasLength(123)]); + + responseStream.add(Uint8List(42)); + sub.pause(); + await pumpEventQueue(); + expect(responseStream.isPaused, isTrue); + + sub.resume(); + await pumpEventQueue(); + expect(responseStream.isPaused, isFalse); + + sub.cancel(); + await pumpEventQueue(); + expect(responseStream.hasListener, isFalse); + }); + + group('can abort', () { + test('before receiving response', () async { + final (client, _) = + await createRemoteClient(MockClient.streaming((request, _) async { + await (request as Abortable).abortTrigger!; + throw RequestAbortedException(); + })); + + await expectLater( + client.send(AbortableRequest('GET', uri, abortTrigger: Future.value())), + throwsA(isA()), + ); + }); + + test('in response', () async { + final abort = Completer(); + final responseStream = StreamController(); + var aborted = false; + + final (client, _) = + await createRemoteClient(MockClient.streaming((request, _) async { + (request as Abortable).abortTrigger!.whenComplete(() { + aborted = true; + responseStream + ..addError(RequestAbortedException()) + ..close(); + }); + return StreamedResponse(responseStream.stream, 200); + })); + + final response = await client + .send(AbortableRequest('GET', uri, abortTrigger: abort.future)); + responseStream.add(Uint8List(42)); + final receivedResponseStream = StreamQueue(response.stream); + await expectLater(receivedResponseStream, emits(hasLength(42))); + + abort.complete(); + await expectLater( + receivedResponseStream, emitsError(isA())); + expect(aborted, isTrue); + }); + + test('via stream cancel', () async { + final responseStream = StreamController(); + + final (client, _) = + await createRemoteClient(MockClient.streaming((request, _) async { + return StreamedResponse(responseStream.stream, 200); + })); + + final response = await client.send(AbortableRequest('GET', uri)); + responseStream.add(Uint8List(42)); + + final receivedResponseStream = StreamQueue(response.stream); + await expectLater(receivedResponseStream, emits(hasLength(42))); + await receivedResponseStream.cancel(); + await pumpEventQueue(); + expect(responseStream.hasListener, isFalse); + }); + }); + + group('reacts to closed channel', () { + test('before receiving response', () async { + late Client client; + late WorkerCommunicationChannel channel; + + (client, channel) = + await createRemoteClient(MockClient.streaming((request, _) async { + channel.close(); + return StreamedResponse(Stream.empty(), 200); + })); + + await expectLater(client.send(AbortableRequest('GET', uri)), + throwsA(isA())); + }); + + test('in response stream', () async { + final responseStream = StreamController(); + final (client, channel) = + await createRemoteClient(MockClient.streaming((request, _) async { + return StreamedResponse(responseStream.stream, 200); + })); + + final response = await client.send(AbortableRequest('GET', uri)); + responseStream.add(Uint8List(42)); + final receivedResponseStream = StreamQueue(response.stream); + await expectLater(receivedResponseStream, emits(hasLength(42))); + + final expectation = expectLater( + receivedResponseStream, emitsError(isA())); + await pumpEventQueue(); + channel.close(); + await expectation; + }); + }); +} + +Future<(Client, WorkerCommunicationChannel)> createRemoteClient( + Client original) async { + final channel = MessageChannel(); + + final local = WorkerCommunicationChannel( + port: channel.port1, + requestHandler: (type, payload) async { + throw UnimplementedError(); + }, + exposedHttpClient: original, + ); + final remote = WorkerCommunicationChannel( + port: channel.port2, + requestHandler: (type, payload) async { + throw UnimplementedError(); + }, + ); + local.observeRemoteLockName(await remote.lockName); + remote.observeRemoteLockName(await local.lockName); + + return (RemoteHttpClient(remote), local); +} diff --git a/packages/powersync/test/web/sync_worker_test.dart b/packages/powersync/test/web/sync_worker_test.dart index b9f03066..8939bd9e 100644 --- a/packages/powersync/test/web/sync_worker_test.dart +++ b/packages/powersync/test/web/sync_worker_test.dart @@ -11,6 +11,10 @@ import 'package:powersync/src/web/sync_worker.dart'; import 'package:test/test.dart'; import 'package:web/web.dart' show MessageChannel; +import '../server/sync_server/in_memory_sync_server.dart'; +import '../sync/utils.dart'; +import '../utils/abstract_test_utils.dart'; +import '../utils/in_memory_http.dart'; import '../utils/web_test_utils.dart'; void main() { @@ -96,6 +100,25 @@ void main() { expect(syncRunner.sync, isNull); expect(syncRunner.connections, isEmpty); }); + + test('can use custom http client', () async { + final (client, server) = inMemoryServer(); + final service = MockSyncService(); + server.mount((r) => service.router(r)); + service + ..addLine(checkpoint(lastOpId: 1)) + ..addLine(checkpointComplete()); + + final handle = createWorkerHandle( + connector: TestConnector( + () async => PowerSyncCredentials( + endpoint: 'http://test.powersync.example.org', token: 'token'), + ), + options: SyncOptions(httpClient: () => client), + ); + await handle.streamingSync(); + await db.waitForFirstSync(); + }); } final class _ThrowingBackendConnector extends PowerSyncBackendConnector {