Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import 'dart:convert';
import 'dart:isolate';
import 'package:meta/meta.dart';

import 'package:http/http.dart' as http;
import 'package:logging/logging.dart';
import 'package:powersync/src/abort_controller.dart';
import 'package:powersync/src/sync/bucket_storage.dart';
Expand Down Expand Up @@ -284,7 +283,6 @@ Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
crudUpdateTriggerStream: database
.onChange(['ps_crud'], throttle: args.options.crudThrottleTime),
options: args.options,
client: http.Client(),
syncMutex: mutexes.mutex('sync'),
crudMutex: mutexes.mutex('crud'),
);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import 'dart:async';
import 'dart:convert';
import 'package:meta/meta.dart';
import 'package:http/browser_client.dart';
import 'package:powersync/src/abort_controller.dart';
import 'package:powersync/src/sync/bucket_storage.dart';
import 'package:powersync/src/connector.dart';
Expand Down Expand Up @@ -75,7 +74,6 @@ final class WebPowerSyncDatabase extends BasePowerSyncDatabase {
connector: InternalConnector.wrap(connector, this),
crudUpdateTriggerStream: crudStream,
options: options,
client: BrowserClient(),
activeSubscriptions: initiallyActiveStreams,
// Only allows 1 sync implementation to run at a time per database
// This should be global (across tabs) when using Navigator locks.
Expand Down
4 changes: 3 additions & 1 deletion packages/powersync/lib/src/setup/web.dart
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ Future<void> downloadWebAssets(List<String> arguments) async {
return;
}

final httpClient = HttpClient();
try {
final httpClient = HttpClient();
const sqlitePackageName = 'sqlite3';

final (tag: powersyncTag, version: powerSyncVersion) =
Expand Down Expand Up @@ -112,6 +112,8 @@ Future<void> downloadWebAssets(List<String> arguments) async {
} catch (e) {
print(e);
exit(1);
} finally {
httpClient.close();
}
}

Expand Down
23 changes: 23 additions & 0 deletions packages/powersync/lib/src/sync/options.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
import 'package:collection/collection.dart';
import 'package:http/http.dart';
import 'package:meta/meta.dart';

/// The signature of a function creating a http [Client] to use by the PowerSync
/// client.
///
/// PowerSync will use [Client.new] by default, but a custom factory can be used
/// as [SyncOptions.httpClient]. This allows transforming requests and
/// responses, e.g. to add additional headers or allow custom TLS certificates.
///
/// On native platforms, these functions are sent across send ports (and thus
/// must not capture non-sendable state).
typedef HttpClientFactory = Client Function();

/// Options that affect how the sync client connects to the sync service.
final class SyncOptions {
/// A map of application metadata that is passed to the PowerSync service.
Expand Down Expand Up @@ -38,13 +50,20 @@ final class SyncOptions {
/// This is enabled by default.
final bool? includeDefaultStreams;

/// A function to create http clients used by the PowerSync SDK.
///
/// Custom clients can be used to configure TLS options, inject additional
/// headers, or otherwise customize networking.
final HttpClientFactory? httpClient;

const SyncOptions({
this.crudThrottleTime,
this.retryDelay,
this.params,
this.syncImplementation = SyncClientImplementation.defaultClient,
this.includeDefaultStreams,
this.appMetadata,
this.httpClient,
});

SyncOptions _copyWith({
Expand All @@ -60,6 +79,7 @@ final class SyncOptions {
syncImplementation: syncImplementation,
includeDefaultStreams: includeDefaultStreams,
appMetadata: appMetadata ?? this.appMetadata,
httpClient: httpClient,
);
}
}
Expand Down Expand Up @@ -109,6 +129,8 @@ extension type ResolvedSyncOptions(SyncOptions source) {

bool get includeDefaultStreams => source.includeDefaultStreams ?? true;

Client createHttpClient() => source.httpClient?.call() ?? Client();

(ResolvedSyncOptions, bool) applyFrom(SyncOptions other) {
final newOptions = SyncOptions(
crudThrottleTime: other.crudThrottleTime ?? crudThrottleTime,
Expand All @@ -118,6 +140,7 @@ extension type ResolvedSyncOptions(SyncOptions source) {
includeDefaultStreams:
other.includeDefaultStreams ?? includeDefaultStreams,
appMetadata: other.appMetadata ?? appMetadata,
httpClient: other.httpClient ?? source.httpClient,
);

final didChange = !_mapEquality.equals(newOptions.params, params) ||
Expand Down
3 changes: 1 addition & 2 deletions packages/powersync/lib/src/sync/streaming_sync.dart
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ class StreamingSyncImplementation implements StreamingSync {
required this.connector,
required this.crudUpdateTriggerStream,
required this.options,
required http.Client client,
List<SubscribedStream> activeSubscriptions = const [],
Mutex? syncMutex,
Mutex? crudMutex,
Expand All @@ -82,7 +81,7 @@ class StreamingSyncImplementation implements StreamingSync {
/// A unique identifier for this streaming sync implementation
/// A good value is typically the DB file path which it will mutate when syncing.
String? identifier = "unknown",
}) : _client = client,
}) : _client = options.createHttpClient(),
syncMutex = syncMutex ?? potentiallySharedMutex("sync-$identifier"),
crudMutex = crudMutex ?? potentiallySharedMutex("crud-$identifier"),
_userAgentHeaders = userAgentHeaders(),
Expand Down
116 changes: 116 additions & 0 deletions packages/powersync/lib/src/web/http/client.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import 'dart:async';
import 'dart:convert';
import 'dart:js_interop';
import 'dart:typed_data';

import 'package:http/http.dart';

import '../sync_worker_protocol.dart';
import 'protocol.dart';

/// An HTTP client implemented through `postMessage` calls on message ports.
///
/// The sync worker uses this when a connecting tab indicates that a custom
/// HTTP client (which we can't send through ports as an object) should be used.
final class RemoteHttpClient extends BaseClient {
final WorkerCommunicationChannel _channel;

int _nextTransactionId = 0;

RemoteHttpClient(this._channel);

@override
Future<StreamedResponse> send(BaseRequest request) async {
final body = await request.finalize().toBytes();
final bodyBuffer = body.buffer;
// This will always be the case with the toBytes() implementation. We
// couldn't safely transfer the entire buffer without it.
assert(body.offsetInBytes == 0 && body.length == bodyBuffer.lengthInBytes);
final jsBuffer = bodyBuffer.toJS;

final txId = _nextTransactionId++;

// Send request with other port.
final responseFuture = _channel.sendHttpRequest(
HttpRequest(
requestId: 0, // Set by sendHttpRequest
transactionId: txId,
uri: request.url.toString(),
method: request.method,
headers: json.encode(request.headers),
body: jsBuffer,
),
);

if (request is Abortable) {
request.abortTrigger?.whenComplete(() => sendAbort(txId, false));
}

final rawResponse = await responseFuture;

return StreamedResponse(
_ResponseStream(this, txId).stream,
rawResponse.statusCode,
request: request,
headers: rawResponse.decodedHeaders,
);
}

void sendAbort(int txId, bool abortStream) {
_channel.port.postMessage(
SyncWorkerMessage(
type: SyncWorkerMessageType.abortHttpRequest.name,
payload: AbortHttpResponse(
cancelStream: abortStream,
transactionId: txId,
),
),
);
}
}

final class _ResponseStream {
final RemoteHttpClient client;
final int txId;

final streamController = StreamController<Uint8List>(sync: true);
var isFetching = false;

Stream<Uint8List> get stream => streamController.stream;

_ResponseStream(this.client, this.txId) {
streamController
..onListen = fetchIfHasListener
..onResume = fetchIfHasListener
..onCancel = () => client.sendAbort(txId, true);
}

void fetchChunk() async {
assert(!isFetching);
isFetching = true;

try {
final chunk = await client._channel.readHttpResponseChunk(txId);
if (chunk != null) {
streamController.add(chunk.toDart.asUint8List());
} else {
streamController.close();
}
} catch (e, s) {
streamController.addError(e, s);
streamController.close();
} finally {
isFetching = false;
fetchIfHasListener();
}
}

void fetchIfHasListener() {
if (!isFetching &&
streamController.hasListener &&
!streamController.isClosed &&
!streamController.isPaused) {
fetchChunk();
}
}
}
90 changes: 90 additions & 0 deletions packages/powersync/lib/src/web/http/protocol.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import 'dart:async';
import 'dart:convert';
import 'dart:js_interop';

extension type HttpMessage._(JSObject _) implements JSObject {
/// JSON-encoded request or response headers headers.
@JS('h')
external String headers;

Map<String, String> get decodedHeaders {
final decoded = json.decode(headers) as Map<String, Object?>;
return decoded.cast();
}
}

/// A serialized HTTP request.
extension type HttpRequest._(JSObject _) implements HttpMessage {
external factory HttpRequest({
@JS('r') required int requestId,
@JS('i') required int transactionId,
@JS('u') required String uri,
@JS('m') required String method,
@JS('h') required String headers,
@JS('b') required JSArrayBuffer body,
});

@JS('r')
external int requestId;

/// A client-generated id for the HTTP transaction.
///
/// This can be used to identify the request in subsequent commands to read
/// or abort the response.
@JS('i')
external int transactionId;

@JS('u')
external String uri;

@JS('m')
external String method;

/// The full request body (we don't support streaming request bodies, these
/// aren't used by the SDK).
@JS('b')
external JSArrayBuffer body;
}

/// A serialized HTTP response
extension type HttpResponse._(JSObject _) implements HttpMessage {
external factory HttpResponse({
@JS('s') required int statusCode,
@JS('h') required String headers,
});

@JS('s')
external int statusCode;
}

/// A message sent to cancel an in-flight HTTP request.
extension type AbortHttpResponse._(JSObject _) implements JSObject {
external factory AbortHttpResponse({
@JS('r') required bool cancelStream,
@JS('i') required int transactionId,
});

/// Whether the abort is from a [StreamSubscription.cancel] call (as opposed
/// to an abort trigger on an abortable request).
@JS('r')
external bool cancelStream;

/// The same as [HttpRequest.transactionId].
@JS('i')
external int transactionId;
}

/// A request to read a chunk of HTTP response data.
extension type ReadStreamChunk._(JSObject _) implements JSObject {
external factory ReadStreamChunk({
@JS('r') required int requestId,
@JS('i') required int transactionId,
});

@JS('r')
external int requestId;

/// The same as [HttpRequest.transactionId].
@JS('i')
external int transactionId;
}
Loading