Files
umbrix/lib/services/singbox/ffi_singbox_service.dart

334 lines
8.8 KiB
Dart
Raw Normal View History

2023-08-19 22:27:23 +03:30
import 'dart:async';
2023-09-01 15:00:41 +03:30
import 'dart:convert';
2023-08-19 22:27:23 +03:30
import 'dart:ffi';
import 'dart:io';
2023-08-28 13:15:57 +03:30
import 'dart:isolate';
2023-08-19 22:27:23 +03:30
import 'package:combine/combine.dart';
import 'package:ffi/ffi.dart';
import 'package:fpdart/fpdart.dart';
2023-09-10 20:25:04 +03:30
import 'package:hiddify/domain/connectivity/connectivity.dart';
2023-09-01 15:00:41 +03:30
import 'package:hiddify/domain/singbox/config_options.dart';
2023-08-19 22:27:23 +03:30
import 'package:hiddify/gen/singbox_generated_bindings.dart';
2023-09-10 20:25:04 +03:30
import 'package:hiddify/services/singbox/shared.dart';
2023-08-19 22:27:23 +03:30
import 'package:hiddify/services/singbox/singbox_service.dart';
import 'package:hiddify/utils/utils.dart';
2023-08-21 00:54:11 +03:00
import 'package:loggy/loggy.dart';
2023-08-19 22:27:23 +03:30
import 'package:path/path.dart' as p;
2023-09-10 20:25:04 +03:30
import 'package:rxdart/rxdart.dart';
2023-08-19 22:27:23 +03:30
2023-08-21 00:54:11 +03:00
// Library-level logger. Used where the instance `loggy` is not used in this
// file: the static [FFISingboxService._gen] and the stream cancel callbacks.
final _logger = Loggy('FFISingboxService');
2023-09-10 20:25:04 +03:30
/// [SingboxService] implementation that talks to the native `libcore`
/// library through `dart:ffi`.
///
/// Most native calls are dispatched through `CombineWorker().execute`, while
/// connection status, stats and outbound updates are delivered over
/// [ReceivePort]s whose native ports are handed to the library.
class FFISingboxService
with ServiceStatus, InfraLogger
implements SingboxService {
2023-08-19 22:27:23 +03:30
/// Generated FFI bindings for the native library, created by [_gen].
static final SingboxNativeLibrary _box = _gen();
2023-09-10 20:25:04 +03:30
/// Connection status stream returned by [watchConnectionStatus].
///
/// Assigned in [init]; reading it earlier fails because the field is `late`.
late final ValueStream<ConnectionStatus> _connectionStatus;
/// Port whose events are decoded into [_connectionStatus].
///
/// Assigned in [init]; its native port is passed to the library in [setup].
late final ReceivePort _connectionStatusReceiver;
2023-08-28 13:15:57 +03:30
/// Stream handed out by [watchStats], kept so repeated calls share it.
/// Reset to `null` by that stream's cancel callback.
Stream<String>? _statusStream;
2023-08-29 19:32:31 +03:30
/// Stream handed out by [watchOutbounds], kept so repeated calls share it.
/// Reset to `null` by that stream's cancel callback.
Stream<String>? _groupsStream;
2023-08-28 13:15:57 +03:30
2023-08-19 22:27:23 +03:30
/// Opens the platform-specific `libcore` dynamic library and wraps it in the
/// generated [SingboxNativeLibrary] bindings.
///
/// The file name is `libcore.dll` on Windows, `libcore.dylib` on macOS and
/// `libcore.so` everywhere else. When the `FLUTTER_TEST` environment variable
/// is present, the file is looked up inside a `libcore` directory.
static SingboxNativeLibrary _gen() {
  final directory =
      Platform.environment.containsKey('FLUTTER_TEST') ? "libcore" : "";

  final String fileName;
  if (Platform.isWindows) {
    fileName = "libcore.dll";
  } else if (Platform.isMacOS) {
    fileName = "libcore.dylib";
  } else {
    fileName = "libcore.so";
  }

  final libraryPath = p.join(directory, fileName);
  _logger.debug('singbox native libs path: "$libraryPath"');
  return SingboxNativeLibrary(DynamicLibrary.open(libraryPath));
}
2023-09-10 20:25:04 +03:30
/// Creates the connection status receive port and the status stream built
/// on top of it.
///
/// Every port message is cast to a JSON string, decoded into a map and
/// converted with `mapEventToStatus`. The resulting stream is seeded with
/// [ConnectionStatus.disconnected].
@override
Future<void> init() async {
  loggy.debug("initializing");

  final receiver = ReceivePort('service status receiver');
  _connectionStatusReceiver = receiver;

  Map<String, dynamic> decodeEvent(dynamic event) =>
      jsonDecode(event as String) as Map<String, dynamic>;

  final statusEvents =
      receiver.asBroadcastStream().map(decodeEvent).map(mapEventToStatus);

  _connectionStatus = ValueConnectableStream.seeded(
    statusEvents,
    const ConnectionStatus.disconnected(),
  ).autoConnect();
}
2023-08-19 22:27:23 +03:30
/// Initializes the native library with its directories and the native port
/// of the connection status receiver.
///
/// The receiver is the one created in [init], so [init] must have run first
/// (the field is `late`). The native calls run through
/// `CombineWorker().execute`; the task always completes with `right(unit)`
/// because the native `setup` result is not inspected.
@override
TaskEither<String, Unit> setup(
  String baseDir,
  String workingDir,
  String tempDir,
) {
  final port = _connectionStatusReceiver.sendPort.nativePort;
  return TaskEither(
    () => CombineWorker().execute(
      () {
        _box.setupOnce(NativeApi.initializeApiDLData);

        // `toNativeUtf8` allocates with `malloc` by default and leaves
        // ownership with the caller, so the buffers are released here.
        final baseDirPtr = baseDir.toNativeUtf8();
        final workingDirPtr = workingDir.toNativeUtf8();
        final tempDirPtr = tempDir.toNativeUtf8();
        try {
          _box.setup(
            baseDirPtr.cast(),
            workingDirPtr.cast(),
            tempDirPtr.cast(),
            port,
          );
        } finally {
          // NOTE(review): assumes the native side copies the strings before
          // returning and does not retain these pointers — confirm.
          malloc.free(baseDirPtr);
          malloc.free(workingDirPtr);
          malloc.free(tempDirPtr);
        }
        return right(unit);
      },
    ),
  );
}
/// Asks the native library to parse the config at [path].
///
/// [tempPath] and [debug] (sent as `1`/`0`) are forwarded unchanged. The
/// native call returns a string: a non-empty string becomes `left(error)`,
/// an empty one becomes `right(unit)`.
@override
TaskEither<String, Unit> parseConfig(
  String path,
  String tempPath,
  bool debug,
) {
  return TaskEither(
    () => CombineWorker().execute(
      () {
        // `toNativeUtf8` allocates with `malloc` by default and leaves
        // ownership with the caller, so the buffers are released below.
        final pathPtr = path.toNativeUtf8();
        final tempPathPtr = tempPath.toNativeUtf8();
        try {
          final err = _box
              .parse(
                pathPtr.cast(),
                tempPathPtr.cast(),
                debug ? 1 : 0,
              )
              .cast<Utf8>()
              .toDartString();
          if (err.isNotEmpty) {
            return left(err);
          }
          return right(unit);
        } finally {
          // NOTE(review): assumes the native side copies the strings before
          // returning and does not retain these pointers — confirm.
          malloc.free(pathPtr);
          malloc.free(tempPathPtr);
        }
      },
    ),
  );
}
/// Sends [options], encoded as JSON, to the native library.
///
/// The native call returns a string: a non-empty string becomes
/// `left(error)`, an empty one becomes `right(unit)`.
@override
TaskEither<String, Unit> changeConfigOptions(ConfigOptions options) {
  return TaskEither(
    () => CombineWorker().execute(
      () {
        final json = jsonEncode(options.toJson());
        // `toNativeUtf8` allocates with `malloc` by default and leaves
        // ownership with the caller, so the buffer is released below.
        final jsonPtr = json.toNativeUtf8();
        try {
          final err = _box
              .changeConfigOptions(jsonPtr.cast())
              .cast<Utf8>()
              .toDartString();
          if (err.isNotEmpty) {
            return left(err);
          }
          return right(unit);
        } finally {
          // NOTE(review): assumes the native side copies the string before
          // returning and does not retain this pointer — confirm.
          malloc.free(jsonPtr);
        }
      },
    ),
  );
}
/// Calls the native `start` with the config file at [configPath].
///
/// The native call returns a string: a non-empty string becomes
/// `left(error)`, an empty one becomes `right(unit)`.
@override
TaskEither<String, Unit> start(String configPath) {
  return TaskEither(
    () => CombineWorker().execute(
      () {
        // `toNativeUtf8` allocates with `malloc` by default and leaves
        // ownership with the caller, so the buffer is released below.
        final configPathPtr = configPath.toNativeUtf8();
        try {
          final err =
              _box.start(configPathPtr.cast()).cast<Utf8>().toDartString();
          if (err.isNotEmpty) {
            return left(err);
          }
          return right(unit);
        } finally {
          // NOTE(review): assumes the native side copies the string before
          // returning and does not retain this pointer — confirm.
          malloc.free(configPathPtr);
        }
      },
    ),
  );
}
/// Calls the native `stop`.
///
/// An empty string returned by the native call yields `right(unit)`; any
/// other string is reported as `left(error)`.
@override
TaskEither<String, Unit> stop() {
  return TaskEither(
    () => CombineWorker().execute(
      () {
        final nativeResult = _box.stop();
        final message = nativeResult.cast<Utf8>().toDartString();
        if (message.isEmpty) {
          return right(unit);
        }
        return left(message);
      },
    ),
  );
}
/// Calls the native `restart` with the config file at [configPath].
///
/// The native call returns a string: a non-empty string becomes
/// `left(error)`, an empty one becomes `right(unit)`.
@override
TaskEither<String, Unit> restart(String configPath) {
  return TaskEither(
    () => CombineWorker().execute(
      () {
        // `toNativeUtf8` allocates with `malloc` by default and leaves
        // ownership with the caller, so the buffer is released below.
        final configPathPtr = configPath.toNativeUtf8();
        try {
          final err =
              _box.restart(configPathPtr.cast()).cast<Utf8>().toDartString();
          if (err.isNotEmpty) {
            return left(err);
          }
          return right(unit);
        } finally {
          // NOTE(review): assumes the native side copies the string before
          // returning and does not retain this pointer — confirm.
          malloc.free(configPathPtr);
        }
      },
    ),
  );
}
2023-08-28 13:15:57 +03:30
/// Returns the connection status stream that [init] creates.
@override
Stream<ConnectionStatus> watchConnectionStatus() {
  return _connectionStatus;
}
/// Returns a broadcast stream of messages from native command client `1`.
///
/// The stream is cached, so repeated calls share one receive port and one
/// native client until the stream's cancel callback runs. That callback
/// stops the native client, closes the port and clears the cache.
///
/// Stream errors: a message starting with `error:` is rethrown without that
/// prefix; a non-string message is rethrown as `"invalid type"`.
///
/// Throws the native error string if the command client cannot be started.
@override
Stream<String> watchStats() {
  final cached = _statusStream;
  if (cached != null) return cached;

  final receiver = ReceivePort('status receiver');
  final statusStream = receiver.asBroadcastStream(
    onCancel: (_) {
      _logger.debug("stopping status command client");
      final err = _box.stopCommandClient(1).cast<Utf8>().toDartString();
      if (err.isNotEmpty) {
        // Include the native message; it used to be dropped from the log.
        _logger.warning("error stopping status client: $err");
      }
      receiver.close();
      _statusStream = null;
    },
  ).map(
    (event) {
      if (event is! String) {
        loggy.warning("[status client] unexpected type, msg: $event");
        throw "invalid type";
      }
      if (event.startsWith('error:')) {
        loggy.warning("[status client] error received: $event");
        throw event.replaceFirst('error:', "");
      }
      return event;
    },
  );

  final err = _box
      .startCommandClient(1, receiver.sendPort.nativePort)
      .cast<Utf8>()
      .toDartString();
  if (err.isNotEmpty) {
    loggy.warning("error starting status command: $err");
    // Nothing will ever listen to this port, so the cancel callback would
    // never close it; close it here instead of leaking it.
    receiver.close();
    throw err;
  }
  return _statusStream = statusStream;
}
2023-08-29 19:32:31 +03:30
/// Returns a broadcast stream of messages from native command client `4`.
///
/// The stream is cached, so repeated calls share one receive port and one
/// native client until the stream's cancel callback runs. That callback
/// stops the native client, closes the port and clears the cache.
///
/// Stream errors: a message starting with `error:` is rethrown without that
/// prefix; a non-string message is rethrown as `"invalid type"`.
///
/// Throws the native error string if the command client cannot be started.
@override
Stream<String> watchOutbounds() {
  final cached = _groupsStream;
  if (cached != null) return cached;

  final receiver = ReceivePort('outbounds receiver');
  final groupsStream = receiver.asBroadcastStream(
    onCancel: (_) {
      _logger.debug("stopping group command client");
      final err = _box.stopCommandClient(4).cast<Utf8>().toDartString();
      if (err.isNotEmpty) {
        // Include the native message; it used to be dropped from the log.
        _logger.warning("error stopping group client: $err");
      }
      receiver.close();
      _groupsStream = null;
    },
  ).map(
    (event) {
      if (event is! String) {
        loggy.warning("[group client] unexpected type, msg: $event");
        throw "invalid type";
      }
      if (event.startsWith('error:')) {
        loggy.warning("[group client] error received: $event");
        throw event.replaceFirst('error:', "");
      }
      return event;
    },
  );

  final err = _box
      .startCommandClient(4, receiver.sendPort.nativePort)
      .cast<Utf8>()
      .toDartString();
  if (err.isNotEmpty) {
    loggy.warning("error starting group command: $err");
    // Nothing will ever listen to this port, so the cancel callback would
    // never close it; close it here instead of leaking it.
    receiver.close();
    throw err;
  }
  return _groupsStream = groupsStream;
}
/// Calls the native `selectOutbound` with [groupTag] and [outboundTag].
///
/// The native call returns a string: a non-empty string becomes
/// `left(error)`, an empty one becomes `right(unit)`.
@override
TaskEither<String, Unit> selectOutbound(String groupTag, String outboundTag) {
  return TaskEither(
    () => CombineWorker().execute(
      () {
        // `toNativeUtf8` allocates with `malloc` by default and leaves
        // ownership with the caller, so the buffers are released below.
        final groupTagPtr = groupTag.toNativeUtf8();
        final outboundTagPtr = outboundTag.toNativeUtf8();
        try {
          final err = _box
              .selectOutbound(
                groupTagPtr.cast(),
                outboundTagPtr.cast(),
              )
              .cast<Utf8>()
              .toDartString();
          if (err.isNotEmpty) {
            return left(err);
          }
          return right(unit);
        } finally {
          // NOTE(review): assumes the native side copies the strings before
          // returning and does not retain these pointers — confirm.
          malloc.free(groupTagPtr);
          malloc.free(outboundTagPtr);
        }
      },
    ),
  );
}
/// Calls the native `urlTest` for the group identified by [groupTag].
///
/// The native call returns a string: a non-empty string becomes
/// `left(error)`, an empty one becomes `right(unit)`.
@override
TaskEither<String, Unit> urlTest(String groupTag) {
  return TaskEither(
    () => CombineWorker().execute(
      () {
        // `toNativeUtf8` allocates with `malloc` by default and leaves
        // ownership with the caller, so the buffer is released below.
        final groupTagPtr = groupTag.toNativeUtf8();
        try {
          final err =
              _box.urlTest(groupTagPtr.cast()).cast<Utf8>().toDartString();
          if (err.isNotEmpty) {
            return left(err);
          }
          return right(unit);
        } finally {
          // NOTE(review): assumes the native side copies the string before
          // returning and does not retain this pointer — confirm.
          malloc.free(groupTagPtr);
        }
      },
    ),
  );
}
2023-08-19 22:27:23 +03:30
/// Emits the lines of the log file at [path], polling it once per second.
///
/// Each poll reads the file through [_readLogs], starting at the line count
/// recorded by the previous poll, and emits the returned lines one by one.
@override
Stream<String> watchLogs(String path) {
  // Line count reported by the most recent poll.
  var offset = 0;

  Future<List<String>> poll(dynamic _) async {
    final (freshLines, total) = await _readLogs(path, offset);
    offset = total;
    return freshLines;
  }

  return Stream.periodic(const Duration(seconds: 1))
      .asyncMap(poll)
      .expand((batch) => batch);
}
/// Reads the file at [path] and returns the lines from index [from] onwards
/// together with the file's current total line count.
///
/// If the file now holds fewer than [from] lines (or [from] is negative),
/// every line is returned. Previously `sublist` threw a [RangeError] in that
/// case, which surfaced as an error on every later poll.
Future<(List<String>, int)> _readLogs(String path, int from) async {
  return CombineWorker().execute(
    () async {
      final lines = await File(path).readAsLines();
      final to = lines.length;
      // An out-of-range start is treated as a truncated or replaced file:
      // start over from the first line.
      final start = (from < 0 || from > to) ? 0 : from;
      return (lines.sublist(start), to);
    },
  );
}
}