Не подтверждена Коммит 7ecc4cc0 создал по автору Simon Binder's avatar Simon Binder
Просмотр файлов

sqlite web: Export message port to database

владелец 2a4778b5
......@@ -53,6 +53,11 @@ final class RemoteDatabase implements Database {
}
}
@override
Future<void> get closed {
return connection.closed;
}
@override
Future<void> dispose() async {
_isClosed = true;
......@@ -125,6 +130,16 @@ final class RemoteDatabase implements Database {
final result = await select('pragma user_version;');
return result.single[0] as int;
}
@override
Future<SqliteWebEndpoint> additionalConnection() async {
final response = await connection.sendRequest(
OpenAdditonalConnection(requestId: 0, databaseId: databaseId),
MessageType.endpointResponse,
);
final endpoint = response.endpoint;
return (endpoint.port, endpoint.lockName!);
}
}
final class WorkerConnection extends ProtocolChannel {
......@@ -301,6 +316,19 @@ final class DatabaseClient implements WebSqlite {
);
}
Future<Database> connectToExisting(SqliteWebEndpoint endpoint) async {
final channel = WorkerConnection(
WebEndpoint(port: endpoint.$1, lockName: endpoint.$2).connect());
return RemoteDatabase(
connection: channel,
// The database id for this pre-existing connection is always zero.
// It gets assigned by the worker handling the OpenAdditonalConnection
// request.
databaseId: 0,
);
}
@override
Future<Database> connect(
String name, StorageMode type, AccessMode access) async {
......
import 'dart:js_interop';
import 'package:sqlite3/wasm.dart';
import 'package:web/web.dart' hide FileSystem;
import 'types.dart';
import 'client.dart';
......@@ -29,6 +30,16 @@ abstract base class DatabaseController {
ClientConnection connection, JSAny? request);
}
/// An endpoint that can be used, by any running JavaScript context in the same
/// website, to connect to an existing [Database].
///
/// These endpoints are created by calling [Database.additionalConnection] and
/// consist of a [MessagePort] and a [String] internally identifying the
/// connection. Both objects can be transferred over send ports towards another
/// worker or context. That context can then use [WebSqlite.connectToPort] to
/// connect to the port already opened.
typedef SqliteWebEndpoint = (MessagePort, String);
/// Abstraction over a database either available locally or in a remote worker.
abstract class Database {
FileSystem get fileSystem;
......@@ -39,6 +50,15 @@ abstract class Database {
/// stream is active.
Stream<SqliteUpdate> get updates;
/// A future that resolves when the database is closed.
///
/// Typically, databases are closed because [dispose] is called. For databases
/// opened with [WebSqlite.connectToPort] however, it's possible that the
/// original worker hosting the database gets closed without this [Database]
/// instance being explicitly [dispose]d. In those cases, monitoring [closed]
/// is useful to react to databases closing.
Future<void> get closed;
/// Closes this database and instructs the worker to release associated
/// resources.
///
......@@ -67,6 +87,12 @@ abstract class Database {
/// Custom requests are handled by implementing `handleCustomRequest` in your
/// `WorkerDatabase` subclass.
Future<JSAny?> customRequest(JSAny? request);
/// Creates a [MessagePort] (a transferrable object that can be sent to
/// another JavaScript context like a worker) that can be used with
/// [WebSqlite.connectToPort] to open another instance of this database
/// remotely.
Future<SqliteWebEndpoint> additionalConnection();
}
/// A connection from a client from the perspective of a worker.
......@@ -157,4 +183,9 @@ abstract class WebSqlite {
}) {
return DatabaseClient(worker, wasmModule);
}
static Future<Database> connectToPort(SqliteWebEndpoint endpoint) {
final client = DatabaseClient(Uri.base, Uri.base);
return client.connectToExisting(endpoint);
}
}
......@@ -28,7 +28,9 @@ enum MessageType<T extends Message> {
simpleSuccessResponse<SimpleSuccessResponse>(),
rowsResponse<RowsResponse>(),
errorResponse<ErrorResponse>(),
endpointResponse<EndpointResponse>(),
closeDatabase<CloseDatabase>(),
openAdditionalConnection<OpenAdditonalConnection>(),
notifyUpdate<UpdateNotification>(),
;
......@@ -86,9 +88,12 @@ sealed class Message {
MessageType.fileSystemAccess => FileSystemAccess.deserialize(object),
MessageType.connect => ConnectRequest.deserialize(object),
MessageType.closeDatabase => CloseDatabase.deserialize(object),
MessageType.openAdditionalConnection =>
OpenAdditonalConnection.deserialize(object),
MessageType.updateRequest => UpdateStreamRequest.deserialize(object),
MessageType.simpleSuccessResponse =>
SimpleSuccessResponse.deserialize(object),
MessageType.endpointResponse => EndpointResponse.deserialize(object),
MessageType.rowsResponse => RowsResponse.deserialize(object),
MessageType.errorResponse => ErrorResponse.deserialize(object),
MessageType.notifyUpdate => UpdateNotification.deserialize(object),
......@@ -428,7 +433,7 @@ final class RunQuery extends Request {
}
}
class CloseDatabase extends Request {
final class CloseDatabase extends Request {
CloseDatabase({required super.requestId, required super.databaseId});
factory CloseDatabase.deserialize(JSObject object) {
......@@ -440,6 +445,23 @@ class CloseDatabase extends Request {
MessageType<Message> get type => MessageType.closeDatabase;
}
final class OpenAdditonalConnection extends Request {
OpenAdditonalConnection({
required super.requestId,
super.databaseId,
});
factory OpenAdditonalConnection.deserialize(JSObject object) {
return OpenAdditonalConnection(
requestId: object.requestId,
databaseId: object.databaseId,
);
}
@override
MessageType<Message> get type => MessageType.openAdditionalConnection;
}
final class SimpleSuccessResponse extends Response {
final JSAny? response;
......@@ -462,6 +484,29 @@ final class SimpleSuccessResponse extends Response {
}
}
final class EndpointResponse extends Response {
final WebEndpoint endpoint;
EndpointResponse({required super.requestId, required this.endpoint});
factory EndpointResponse.deserialize(JSObject object) {
return EndpointResponse(
requestId: object.requestId,
endpoint: object[_UniqueFieldNames.responseData] as WebEndpoint,
);
}
@override
MessageType<Message> get type => MessageType.endpointResponse;
@override
void serialize(JSObject object, List<JSObject> transferred) {
super.serialize(object, transferred);
object[_UniqueFieldNames.responseData] = endpoint;
transferred.add(endpoint.port);
}
}
final class RowsResponse extends Response {
final ResultSet resultSet;
......
......@@ -106,9 +106,11 @@ final class Shared extends WorkerEnvironment {
/// A database opened by a client.
final class _ConnectionDatabase {
final DatabaseState database;
final int id;
StreamSubscription<SqliteUpdate>? updates;
_ConnectionDatabase(this.database);
_ConnectionDatabase(this.database, [int? id]) : id = id ?? database.id;
Future<void> close() async {
updates?.cancel();
......@@ -173,17 +175,19 @@ final class _ClientConnection extends ProtocolChannel
case OpenRequest():
await _runner.loadWasmModule(request.wasmUri);
DatabaseState? database;
_ConnectionDatabase? connectionDatabase;
try {
database =
_runner.findDatabase(request.databaseName, request.storageMode);
await database.opened;
_openedDatabases.add(_ConnectionDatabase(database));
connectionDatabase = _ConnectionDatabase(database);
_openedDatabases.add(connectionDatabase);
return SimpleSuccessResponse(
response: database.id.toJS, requestId: request.requestId);
} catch (e) {
if (database != null) {
_openedDatabases.remove(database.id);
_openedDatabases.remove(connectionDatabase);
await database.decrementRefCount();
}
......@@ -220,6 +224,16 @@ final class _ClientConnection extends ProtocolChannel
}
return SimpleSuccessResponse(
response: null, requestId: request.requestId);
case OpenAdditonalConnection():
final database = _databaseFor(request)!.database;
database.refCount++;
final (endpoint, channel) = await createChannel();
final client = _runner._accept(channel);
client._openedDatabases.add(_ConnectionDatabase(database, 0));
return EndpointResponse(
requestId: request.requestId, endpoint: endpoint);
case CloseDatabase():
_openedDatabases.remove(database!);
await database.close();
......@@ -247,7 +261,7 @@ final class _ClientConnection extends ProtocolChannel
_ConnectionDatabase? _databaseFor(Request request) {
if (request.databaseId case final id?) {
return _openedDatabases.firstWhere((e) => e.database.id == id);
return _openedDatabases.firstWhere((e) => e.id == id);
} else {
return null;
}
......@@ -381,11 +395,13 @@ final class WorkerRunner {
}
}
void _accept(StreamChannel<Message> channel) {
_ClientConnection _accept(StreamChannel<Message> channel) {
final connection = _ClientConnection(
runner: this, channel: channel, id: _nextConnectionId++);
_connections.add(connection);
connection.closed.whenComplete(() => _connections.remove(connection));
return connection;
}
Future<CompatibilityResult> checkCompatibility(CompatibilityCheck check) {
......
Поддерживает Markdown
0% или .
You are about to add 0 people to the discussion. Proceed with caution.
Сначала завершите редактирование этого сообщения!
Пожалуйста, зарегистрируйтесь или чтобы прокомментировать