Skip to content

Instantly share code, notes, and snippets.

@buehler
Last active June 4, 2024 15:21
Show Gist options
  • Save buehler/97da456110c696f6ecbbabf0b70229ed to your computer and use it in GitHub Desktop.
Save buehler/97da456110c696f6ecbbabf0b70229ed to your computer and use it in GitHub Desktop.
Generic Dart Isolate Worker, allows spawning a new isolate in dart and compute _anything_ in futures or streams that can be transmitted to isolates. Works with closures instead of messages.
import 'dart:async';
import 'dart:isolate';
class IsolateWorker {
IsolateWorker._(this._receivePort, this._sendPort);
final ReceivePort _receivePort;
final SendPort _sendPort;
bool _isDisposed = false;
static Future<IsolateWorker> create({String? debugName}) async {
final receivePort = ReceivePort();
await Isolate.spawn(
_workerEntry,
receivePort.sendPort,
debugName: debugName,
);
return IsolateWorker._(receivePort, await receivePort.first as SendPort);
}
static void _workerEntry(SendPort sendPort) {
final receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);
late final StreamSubscription sub;
sub = receivePort.listen((message) async {
switch (message) {
case _ComputeCall(:final sendPort, :final computation):
sendPort.send(await computation());
break;
case _StreamComputeCall(:final sendPort, :final computation):
await for (final value in computation()) {
sendPort.send(value);
}
sendPort.send(_StreamDone());
break;
default:
sub.cancel();
receivePort.close();
}
});
}
Future<R> compute<R>(FutureOr<R> Function() computation) async {
if (_isDisposed) {
throw StateError('Worker is already disposed');
}
final sendPort = ReceivePort();
_sendPort.send(_ComputeCall(sendPort.sendPort, computation));
return await sendPort.first as R;
}
Stream<R> computeStream<R>(Stream<R> Function() computation) async* {
if (_isDisposed) {
throw StateError('Worker is already disposed');
}
final sendPort = ReceivePort();
_sendPort.send(_StreamComputeCall(sendPort.sendPort, computation));
await for (final value in sendPort) {
if (value is _StreamDone) {
break;
}
yield value as R;
}
}
void dispose() {
try {
_isDisposed = true;
_sendPort.send(null);
_receivePort.close();
} catch (e) {
print('Failed to dispose worker (possible double-dispose?): $e');
}
}
}
class _ComputeCall<R> {
_ComputeCall(this.sendPort, this.computation);
final SendPort sendPort;
final FutureOr<R> Function() computation;
}
class _StreamComputeCall<R> {
_StreamComputeCall(this.sendPort, this.computation);
final SendPort sendPort;
final Stream<R> Function() computation;
}
class _StreamDone {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment