From cb60e4b78e2de671ba1fd1e678c8e408ae41a9d4 Mon Sep 17 00:00:00 2001 From: Daco Harkes Date: Tue, 18 Feb 2020 08:42:16 +0000 Subject: [PATCH] [samples/ffi] Sample for asynchronous native port calls Issue: https://github.com/dart-lang/sdk/issues/37022#issuecomment-567122704 Change-Id: I774befa1d9843c043883038e59c0f8b629bf3c77 Cq-Include-Trybots: luci.dart.try:vm-ffi-android-debug-arm-try,vm-ffi-android-debug-arm64-try,app-kernel-linux-debug-x64-try,vm-kernel-linux-debug-ia32-try,vm-kernel-win-debug-x64-try,vm-kernel-win-debug-ia32-try,vm-kernel-precomp-linux-debug-x64-try,vm-dartkb-linux-release-x64-abi-try,vm-kernel-precomp-android-release-arm64-try,vm-kernel-asan-linux-release-x64-try,vm-kernel-linux-release-simarm-try,vm-kernel-linux-release-simarm64-try,vm-kernel-precomp-android-release-arm_x64-try,vm-kernel-precomp-obfuscate-linux-release-x64-try,dart-sdk-linux-try,analyzer-analysis-server-linux-try,analyzer-linux-release-try,front-end-linux-release-x64-try,vm-kernel-precomp-win-release-x64-try,vm-kernel-mac-debug-x64-try Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/134822 Commit-Queue: Daco Harkes Reviewed-by: Martin Kustermann --- .../ffi_test/ffi_test_functions_vmspecific.cc | 269 ++++++++++++++++++ samples/ffi/async/async_test.dart | 2 + .../ffi/async/sample_native_port_call.dart | 131 +++++++++ 3 files changed, 402 insertions(+) create mode 100644 samples/ffi/async/sample_native_port_call.dart diff --git a/runtime/bin/ffi_test/ffi_test_functions_vmspecific.cc b/runtime/bin/ffi_test/ffi_test_functions_vmspecific.cc index b847e4326206..7ecc22236cb4 100644 --- a/runtime/bin/ffi_test/ffi_test_functions_vmspecific.cc +++ b/runtime/bin/ffi_test/ffi_test_functions_vmspecific.cc @@ -460,4 +460,273 @@ DART_EXPORT void ExecuteCallback(Work* work_ptr) { printf("C Da: ExecuteCallback done.\n"); } +//////////////////////////////////////////////////////////////////////////////// +// Functions for async callbacks example. +// +// sample_native_port_call.dart + +Dart_Port send_port_; + +static void FreeFinalizer(void*, Dart_WeakPersistentHandle, void* value) { + free(value); +} + +class PendingCall { + public: + PendingCall(void** buffer, size_t* length) + : response_buffer_(buffer), response_length_(length) { + receive_port_ = + Dart_NewNativePort("cpp-response", &PendingCall::HandleResponse, + /*handle_concurrently=*/false); + } + ~PendingCall() { Dart_CloseNativePort(receive_port_); } + + Dart_Port port() const { return receive_port_; } + + void PostAndWait(Dart_Port port, Dart_CObject* object) { + std::unique_lock lock(mutex); + const bool success = Dart_PostCObject(send_port_, object); + if (!success) FATAL("Failed to send message, invalid port or isolate died"); + + printf("C : Waiting for result.\n"); + while (!notified) { + cv.wait(lock); + } + } + + static void HandleResponse(Dart_Port p, Dart_CObject* message) { + if (message->type != Dart_CObject_kArray) { + FATAL("C : Wrong Data: message->type != Dart_CObject_kArray.\n"); + } + Dart_CObject** c_response_args = message->value.as_array.values; + Dart_CObject* c_pending_call = c_response_args[0]; + Dart_CObject* c_message = c_response_args[1]; + printf("C : HandleResponse (call: %" Px ", message: %" Px ").\n", + reinterpret_cast(c_pending_call), + reinterpret_cast(c_message)); + + auto pending_call = reinterpret_cast( + c_pending_call->type == Dart_CObject_kInt64 + ? c_pending_call->value.as_int64 + : c_pending_call->value.as_int32); + + pending_call->ResolveCall(c_message); + } + + private: + static bool NonEmptyBuffer(void** value) { return *value != nullptr; } + + void ResolveCall(Dart_CObject* bytes) { + assert(bytes->type == Dart_CObject_kTypedData); + if (bytes->type != Dart_CObject_kTypedData) { + FATAL("C : Wrong Data: bytes->type != Dart_CObject_kTypedData.\n"); + } + const intptr_t response_length = bytes->value.as_typed_data.length; + const uint8_t* response_buffer = bytes->value.as_typed_data.values; + printf("C : ResolveCall(length: %" Pd ", buffer: %" Px ").\n", + response_length, reinterpret_cast(response_buffer)); + + void* buffer = malloc(response_length); + memmove(buffer, response_buffer, response_length); + + *response_buffer_ = buffer; + *response_length_ = response_length; + + printf("C : Notify result ready.\n"); + notified = true; + cv.notify_one(); + } + + std::mutex mutex; + std::condition_variable cv; + bool notified = false; + + Dart_Port receive_port_; + void** response_buffer_; + size_t* response_length_; +}; + +// Do a callback to Dart in a blocking way, being interested in the result. +// +// Dart returns `a + 3`. +uint8_t MyCallback1(uint8_t a) { + const char* methodname = "myCallback1"; + size_t request_length = sizeof(uint8_t) * 1; + void* request_buffer = malloc(request_length); // FreeFinalizer. + reinterpret_cast(request_buffer)[0] = a; // Populate buffer. + void* response_buffer = nullptr; + size_t response_length = 0; + + PendingCall pending_call(&response_buffer, &response_length); + + Dart_CObject c_send_port; + c_send_port.type = Dart_CObject_kSendPort; + c_send_port.value.as_send_port.id = pending_call.port(); + c_send_port.value.as_send_port.origin_id = ILLEGAL_PORT; + + Dart_CObject c_pending_call; + c_pending_call.type = Dart_CObject_kInt64; + c_pending_call.value.as_int64 = reinterpret_cast(&pending_call); + + Dart_CObject c_method_name; + c_method_name.type = Dart_CObject_kString; + c_method_name.value.as_string = const_cast(methodname); + + Dart_CObject c_request_data; + c_request_data.type = Dart_CObject_kExternalTypedData; + c_request_data.value.as_external_typed_data.type = Dart_TypedData_kUint8; + c_request_data.value.as_external_typed_data.length = request_length; + c_request_data.value.as_external_typed_data.data = + static_cast(request_buffer); + c_request_data.value.as_external_typed_data.peer = request_buffer; + c_request_data.value.as_external_typed_data.callback = FreeFinalizer; + + Dart_CObject* c_request_arr[] = {&c_send_port, &c_pending_call, + &c_method_name, &c_request_data}; + Dart_CObject c_request; + c_request.type = Dart_CObject_kArray; + c_request.value.as_array.values = c_request_arr; + c_request.value.as_array.length = + sizeof(c_request_arr) / sizeof(c_request_arr[0]); + + printf("C : Dart_PostCObject(request: %" Px ", call: %" Px ").\n", + reinterpret_cast(&c_request), + reinterpret_cast(&c_pending_call)); + pending_call.PostAndWait(send_port_, &c_request); + printf("C : Received result.\n"); + + const intptr_t result = reinterpret_cast(response_buffer)[0]; + free(response_buffer); + + return result; +} + +// Do a callback to Dart in a non-blocking way. +// +// Dart sums all numbers posted to it. +void MyCallback2(uint8_t a) { + const char* methodname = "myCallback2"; + void* request_buffer = malloc(sizeof(uint8_t) * 1); // FreeFinalizer. + reinterpret_cast(request_buffer)[0] = a; // Populate buffer. + const size_t request_length = sizeof(uint8_t) * 1; + + Dart_CObject c_send_port; + c_send_port.type = Dart_CObject_kNull; + + Dart_CObject c_pending_call; + c_pending_call.type = Dart_CObject_kNull; + + Dart_CObject c_method_name; + c_method_name.type = Dart_CObject_kString; + c_method_name.value.as_string = const_cast(methodname); + + Dart_CObject c_request_data; + c_request_data.type = Dart_CObject_kExternalTypedData; + c_request_data.value.as_external_typed_data.type = Dart_TypedData_kUint8; + c_request_data.value.as_external_typed_data.length = request_length; + c_request_data.value.as_external_typed_data.data = + static_cast(request_buffer); + c_request_data.value.as_external_typed_data.peer = request_buffer; + c_request_data.value.as_external_typed_data.callback = FreeFinalizer; + + Dart_CObject* c_request_arr[] = {&c_send_port, &c_pending_call, + &c_method_name, &c_request_data}; + Dart_CObject c_request; + c_request.type = Dart_CObject_kArray; + c_request.value.as_array.values = c_request_arr; + c_request.value.as_array.length = + sizeof(c_request_arr) / sizeof(c_request_arr[0]); + + printf("C : Dart_PostCObject(request: %" Px ", call: %" Px ").\n", + reinterpret_cast(&c_request), + reinterpret_cast(&c_pending_call)); + Dart_PostCObject(send_port_, &c_request); +} + +// Simulated work for Thread #1. +// +// Simulates heavy work with sleeps. +void Work1_2() { + printf("C T1: Work1 Start.\n"); + SleepOnAnyOS(1); + const intptr_t val1 = 3; + printf("C T1: MyCallback1(%" Pd ").\n", val1); + const intptr_t val2 = MyCallback1(val1); // val2 = 6. + printf("C T1: MyCallback1 returned %" Pd ".\n", val2); + SleepOnAnyOS(1); + const intptr_t val3 = val2 - 1; // val3 = 5. + printf("C T1: MyCallback2(%" Pd ").\n", val3); + MyCallback2(val3); // Post 5 to Dart. + printf("C T1: Work1 Done.\n"); +} + +// Simulated work for Thread #2. +// +// Simulates lighter work, no sleeps. +void Work2_2() { + printf("C T2: Work2 Start.\n"); + const intptr_t val1 = 5; + printf("C T2: MyCallback2(%" Pd ").\n", val1); + MyCallback2(val1); // Post 5 to Dart. + const intptr_t val2 = 1; + printf("C T2: MyCallback1(%" Pd ").\n", val2); + const intptr_t val3 = MyCallback1(val2); // val3 = 4. + printf("C T2: MyCallback1 returned %" Pd ".\n", val3); + printf("C T2: MyCallback2(%" Pd ").\n", val3); + MyCallback2(val3); // Post 4 to Dart. + printf("C T2: Work2 Done.\n"); +} + +// Simulator that simulates concurrent work with multiple threads. +class SimulateWork2 { + public: + static void StartWorkSimulator() { + running_work_simulator_ = new SimulateWork2(); + running_work_simulator_->Start(); + } + + static void StopWorkSimulator() { + running_work_simulator_->Stop(); + delete running_work_simulator_; + running_work_simulator_ = nullptr; + } + + private: + static SimulateWork2* running_work_simulator_; + + void Start() { + printf("C Da: Starting SimulateWork.\n"); + printf("C Da: Starting worker threads.\n"); + thread1 = new std::thread(Work1_2); + thread2 = new std::thread(Work2_2); + printf("C Da: Started SimulateWork.\n"); + } + + void Stop() { + printf("C Da: Stopping SimulateWork.\n"); + printf("C Da: Waiting for worker threads to finish.\n"); + thread1->join(); + thread2->join(); + delete thread1; + delete thread2; + printf("C Da: Stopped SimulateWork.\n"); + } + + std::thread* thread1; + std::thread* thread2; +}; +SimulateWork2* SimulateWork2::running_work_simulator_ = 0; + +DART_EXPORT void RegisterSendPort(Dart_Port send_port) { + send_port_ = send_port; +} + +DART_EXPORT void StartWorkSimulator2() { + SimulateWork2::StartWorkSimulator(); +} + +DART_EXPORT void StopWorkSimulator2() { + SimulateWork2::StopWorkSimulator(); +} + } // namespace dart diff --git a/samples/ffi/async/async_test.dart b/samples/ffi/async/async_test.dart index 1ddad47bce1b..f43858215aac 100644 --- a/samples/ffi/async/async_test.dart +++ b/samples/ffi/async/async_test.dart @@ -7,7 +7,9 @@ // SharedObjects=ffi_test_dynamic_library ffi_test_functions import 'sample_async_callback.dart' as sample0; +import 'sample_native_port_call.dart' as sample1; main() { sample0.main(); + sample1.main(); } diff --git a/samples/ffi/async/sample_native_port_call.dart b/samples/ffi/async/sample_native_port_call.dart new file mode 100644 index 000000000000..87958e9cd269 --- /dev/null +++ b/samples/ffi/async/sample_native_port_call.dart @@ -0,0 +1,131 @@ +// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. +// +// Sample showing how to do calls from C into Dart through native ports. +// +// This sample does not use FFI callbacks to do the callbacks at all. Instead, +// it sends a message to Dart through native ports, decodes the message in Dart +// does a method call in Dart and sends the result back to C through a native +// port. +// +// The disadvantage of this approach compared to `sample_async_callback.dart` +// is that it requires more boilerplate, because it does not use the automatic +// marshalling of data of the FFI. +// +// The advantage is that finalizers can be used when passing ownership of data +// (buffers) from C to Dart. + +import 'dart:ffi'; +import 'dart:isolate'; +import 'dart:typed_data'; + +import 'package:expect/expect.dart'; + +import '../dylib_utils.dart'; + +var globalResult = 0; +var numCallbacks1 = 0; +var numCallbacks2 = 0; + +main() async { + print("Dart = Dart mutator thread executing Dart."); + print("C Da = Dart mutator thread executing C."); + print("C T1 = Some C thread executing C."); + print("C T2 = Some C thread executing C."); + print("C = C T1 or C T2."); + print("Dart: Setup."); + final interactiveCppRequests = ReceivePort()..listen(handleCppRequests); + final int nativePort = interactiveCppRequests.sendPort.nativePort; + registerSendPort(nativePort); + print("Dart: Tell C to start worker threads."); + startWorkSimulator2(); + + // We need to yield control in order to be able to receive messages. + while (numCallbacks2 < 3) { + print("Dart: Yielding (able to receive messages on port)."); + await asyncSleep(500); + } + print("Dart: Received expected number of callbacks."); + + Expect.equals(2, numCallbacks1); + Expect.equals(3, numCallbacks2); + Expect.equals(14, globalResult); + + print("Dart: Tell C to stop worker threads."); + stopWorkSimulator2(); + interactiveCppRequests.close(); + print("Dart: Done."); +} + +int myCallback1(int a) { + print("Dart: myCallback1($a)."); + numCallbacks1++; + return a + 3; +} + +void myCallback2(int a) { + print("Dart: myCallback2($a)."); + globalResult += a; + numCallbacks2++; +} + +class CppRequest { + final SendPort replyPort; + final int pendingCall; + final String method; + final Uint8List data; + + factory CppRequest.fromCppMessage(List message) { + return CppRequest._(message[0], message[1], message[2], message[3]); + } + + CppRequest._(this.replyPort, this.pendingCall, this.method, this.data); + + String toString() => 'CppRequest(method: $method, ${data.length} bytes)'; +} + +class CppResponse { + final int pendingCall; + final Uint8List data; + + CppResponse(this.pendingCall, this.data); + + List toCppMessage() => List.from([pendingCall, data], growable: false); + + String toString() => 'CppResponse(message: ${data.length})'; +} + +void handleCppRequests(dynamic message) { + final cppRequest = CppRequest.fromCppMessage(message); + print('Dart: Got message: $cppRequest'); + + if (cppRequest.method == 'myCallback1') { + // Use the data in any way you like. Here we just take the first byte as + // the argument to the function. + final int argument = cppRequest.data[0]; + final int result = myCallback1(argument); + final cppResponse = + CppResponse(cppRequest.pendingCall, Uint8List.fromList([result])); + print('Dart: Responding: $cppResponse'); + cppRequest.replyPort.send(cppResponse.toCppMessage()); + } else if (cppRequest.method == 'myCallback2') { + final int argument = cppRequest.data[0]; + myCallback2(argument); + } +} + +final dl = dlopenPlatformSpecific("ffi_test_functions"); + +final registerSendPort = dl.lookupFunction('RegisterSendPort'); + +final startWorkSimulator2 = + dl.lookupFunction('StartWorkSimulator2'); + +final stopWorkSimulator2 = + dl.lookupFunction('StopWorkSimulator2'); + +Future asyncSleep(int ms) { + return new Future.delayed(Duration(milliseconds: ms), () => true); +}