Skip to content

Commit

Permalink
[samples/ffi] Sample for asynchronous native port calls
Browse files Browse the repository at this point in the history
Issue: #37022 (comment)

Change-Id: I774befa1d9843c043883038e59c0f8b629bf3c77
Cq-Include-Trybots: luci.dart.try:vm-ffi-android-debug-arm-try,vm-ffi-android-debug-arm64-try,app-kernel-linux-debug-x64-try,vm-kernel-linux-debug-ia32-try,vm-kernel-win-debug-x64-try,vm-kernel-win-debug-ia32-try,vm-kernel-precomp-linux-debug-x64-try,vm-dartkb-linux-release-x64-abi-try,vm-kernel-precomp-android-release-arm64-try,vm-kernel-asan-linux-release-x64-try,vm-kernel-linux-release-simarm-try,vm-kernel-linux-release-simarm64-try,vm-kernel-precomp-android-release-arm_x64-try,vm-kernel-precomp-obfuscate-linux-release-x64-try,dart-sdk-linux-try,analyzer-analysis-server-linux-try,analyzer-linux-release-try,front-end-linux-release-x64-try,vm-kernel-precomp-win-release-x64-try,vm-kernel-mac-debug-x64-try
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/134822
Commit-Queue: Daco Harkes <[email protected]>
Reviewed-by: Martin Kustermann <[email protected]>
  • Loading branch information
dcharkes authored and [email protected] committed Feb 18, 2020
1 parent 76ef075 commit cb60e4b
Show file tree
Hide file tree
Showing 3 changed files with 402 additions and 0 deletions.
269 changes: 269 additions & 0 deletions runtime/bin/ffi_test/ffi_test_functions_vmspecific.cc
Original file line number Diff line number Diff line change
Expand Up @@ -460,4 +460,273 @@ DART_EXPORT void ExecuteCallback(Work* work_ptr) {
printf("C Da: ExecuteCallback done.\n");
}

////////////////////////////////////////////////////////////////////////////////
// Functions for async callbacks example.
//
// sample_native_port_call.dart

Dart_Port send_port_;

static void FreeFinalizer(void*, Dart_WeakPersistentHandle, void* value) {
free(value);
}

class PendingCall {
public:
PendingCall(void** buffer, size_t* length)
: response_buffer_(buffer), response_length_(length) {
receive_port_ =
Dart_NewNativePort("cpp-response", &PendingCall::HandleResponse,
/*handle_concurrently=*/false);
}
~PendingCall() { Dart_CloseNativePort(receive_port_); }

Dart_Port port() const { return receive_port_; }

void PostAndWait(Dart_Port port, Dart_CObject* object) {
std::unique_lock<std::mutex> lock(mutex);
const bool success = Dart_PostCObject(send_port_, object);
if (!success) FATAL("Failed to send message, invalid port or isolate died");

printf("C : Waiting for result.\n");
while (!notified) {
cv.wait(lock);
}
}

static void HandleResponse(Dart_Port p, Dart_CObject* message) {
if (message->type != Dart_CObject_kArray) {
FATAL("C : Wrong Data: message->type != Dart_CObject_kArray.\n");
}
Dart_CObject** c_response_args = message->value.as_array.values;
Dart_CObject* c_pending_call = c_response_args[0];
Dart_CObject* c_message = c_response_args[1];
printf("C : HandleResponse (call: %" Px ", message: %" Px ").\n",
reinterpret_cast<intptr_t>(c_pending_call),
reinterpret_cast<intptr_t>(c_message));

auto pending_call = reinterpret_cast<PendingCall*>(
c_pending_call->type == Dart_CObject_kInt64
? c_pending_call->value.as_int64
: c_pending_call->value.as_int32);

pending_call->ResolveCall(c_message);
}

private:
static bool NonEmptyBuffer(void** value) { return *value != nullptr; }

void ResolveCall(Dart_CObject* bytes) {
assert(bytes->type == Dart_CObject_kTypedData);
if (bytes->type != Dart_CObject_kTypedData) {
FATAL("C : Wrong Data: bytes->type != Dart_CObject_kTypedData.\n");
}
const intptr_t response_length = bytes->value.as_typed_data.length;
const uint8_t* response_buffer = bytes->value.as_typed_data.values;
printf("C : ResolveCall(length: %" Pd ", buffer: %" Px ").\n",
response_length, reinterpret_cast<intptr_t>(response_buffer));

void* buffer = malloc(response_length);
memmove(buffer, response_buffer, response_length);

*response_buffer_ = buffer;
*response_length_ = response_length;

printf("C : Notify result ready.\n");
notified = true;
cv.notify_one();
}

std::mutex mutex;
std::condition_variable cv;
bool notified = false;

Dart_Port receive_port_;
void** response_buffer_;
size_t* response_length_;
};

// Do a callback to Dart in a blocking way, being interested in the result.
//
// Dart returns `a + 3`.
uint8_t MyCallback1(uint8_t a) {
const char* methodname = "myCallback1";
size_t request_length = sizeof(uint8_t) * 1;
void* request_buffer = malloc(request_length); // FreeFinalizer.
reinterpret_cast<uint8_t*>(request_buffer)[0] = a; // Populate buffer.
void* response_buffer = nullptr;
size_t response_length = 0;

PendingCall pending_call(&response_buffer, &response_length);

Dart_CObject c_send_port;
c_send_port.type = Dart_CObject_kSendPort;
c_send_port.value.as_send_port.id = pending_call.port();
c_send_port.value.as_send_port.origin_id = ILLEGAL_PORT;

Dart_CObject c_pending_call;
c_pending_call.type = Dart_CObject_kInt64;
c_pending_call.value.as_int64 = reinterpret_cast<int64_t>(&pending_call);

Dart_CObject c_method_name;
c_method_name.type = Dart_CObject_kString;
c_method_name.value.as_string = const_cast<char*>(methodname);

Dart_CObject c_request_data;
c_request_data.type = Dart_CObject_kExternalTypedData;
c_request_data.value.as_external_typed_data.type = Dart_TypedData_kUint8;
c_request_data.value.as_external_typed_data.length = request_length;
c_request_data.value.as_external_typed_data.data =
static_cast<uint8_t*>(request_buffer);
c_request_data.value.as_external_typed_data.peer = request_buffer;
c_request_data.value.as_external_typed_data.callback = FreeFinalizer;

Dart_CObject* c_request_arr[] = {&c_send_port, &c_pending_call,
&c_method_name, &c_request_data};
Dart_CObject c_request;
c_request.type = Dart_CObject_kArray;
c_request.value.as_array.values = c_request_arr;
c_request.value.as_array.length =
sizeof(c_request_arr) / sizeof(c_request_arr[0]);

printf("C : Dart_PostCObject(request: %" Px ", call: %" Px ").\n",
reinterpret_cast<intptr_t>(&c_request),
reinterpret_cast<intptr_t>(&c_pending_call));
pending_call.PostAndWait(send_port_, &c_request);
printf("C : Received result.\n");

const intptr_t result = reinterpret_cast<uint8_t*>(response_buffer)[0];
free(response_buffer);

return result;
}

// Do a callback to Dart in a non-blocking way.
//
// Dart sums all numbers posted to it.
void MyCallback2(uint8_t a) {
const char* methodname = "myCallback2";
void* request_buffer = malloc(sizeof(uint8_t) * 1); // FreeFinalizer.
reinterpret_cast<uint8_t*>(request_buffer)[0] = a; // Populate buffer.
const size_t request_length = sizeof(uint8_t) * 1;

Dart_CObject c_send_port;
c_send_port.type = Dart_CObject_kNull;

Dart_CObject c_pending_call;
c_pending_call.type = Dart_CObject_kNull;

Dart_CObject c_method_name;
c_method_name.type = Dart_CObject_kString;
c_method_name.value.as_string = const_cast<char*>(methodname);

Dart_CObject c_request_data;
c_request_data.type = Dart_CObject_kExternalTypedData;
c_request_data.value.as_external_typed_data.type = Dart_TypedData_kUint8;
c_request_data.value.as_external_typed_data.length = request_length;
c_request_data.value.as_external_typed_data.data =
static_cast<uint8_t*>(request_buffer);
c_request_data.value.as_external_typed_data.peer = request_buffer;
c_request_data.value.as_external_typed_data.callback = FreeFinalizer;

Dart_CObject* c_request_arr[] = {&c_send_port, &c_pending_call,
&c_method_name, &c_request_data};
Dart_CObject c_request;
c_request.type = Dart_CObject_kArray;
c_request.value.as_array.values = c_request_arr;
c_request.value.as_array.length =
sizeof(c_request_arr) / sizeof(c_request_arr[0]);

printf("C : Dart_PostCObject(request: %" Px ", call: %" Px ").\n",
reinterpret_cast<intptr_t>(&c_request),
reinterpret_cast<intptr_t>(&c_pending_call));
Dart_PostCObject(send_port_, &c_request);
}

// Simulated work for Thread #1.
//
// Simulates heavy work with sleeps.
void Work1_2() {
printf("C T1: Work1 Start.\n");
SleepOnAnyOS(1);
const intptr_t val1 = 3;
printf("C T1: MyCallback1(%" Pd ").\n", val1);
const intptr_t val2 = MyCallback1(val1); // val2 = 6.
printf("C T1: MyCallback1 returned %" Pd ".\n", val2);
SleepOnAnyOS(1);
const intptr_t val3 = val2 - 1; // val3 = 5.
printf("C T1: MyCallback2(%" Pd ").\n", val3);
MyCallback2(val3); // Post 5 to Dart.
printf("C T1: Work1 Done.\n");
}

// Simulated work for Thread #2.
//
// Simulates lighter work, no sleeps.
void Work2_2() {
printf("C T2: Work2 Start.\n");
const intptr_t val1 = 5;
printf("C T2: MyCallback2(%" Pd ").\n", val1);
MyCallback2(val1); // Post 5 to Dart.
const intptr_t val2 = 1;
printf("C T2: MyCallback1(%" Pd ").\n", val2);
const intptr_t val3 = MyCallback1(val2); // val3 = 4.
printf("C T2: MyCallback1 returned %" Pd ".\n", val3);
printf("C T2: MyCallback2(%" Pd ").\n", val3);
MyCallback2(val3); // Post 4 to Dart.
printf("C T2: Work2 Done.\n");
}

// Simulator that simulates concurrent work with multiple threads.
class SimulateWork2 {
public:
static void StartWorkSimulator() {
running_work_simulator_ = new SimulateWork2();
running_work_simulator_->Start();
}

static void StopWorkSimulator() {
running_work_simulator_->Stop();
delete running_work_simulator_;
running_work_simulator_ = nullptr;
}

private:
static SimulateWork2* running_work_simulator_;

void Start() {
printf("C Da: Starting SimulateWork.\n");
printf("C Da: Starting worker threads.\n");
thread1 = new std::thread(Work1_2);
thread2 = new std::thread(Work2_2);
printf("C Da: Started SimulateWork.\n");
}

void Stop() {
printf("C Da: Stopping SimulateWork.\n");
printf("C Da: Waiting for worker threads to finish.\n");
thread1->join();
thread2->join();
delete thread1;
delete thread2;
printf("C Da: Stopped SimulateWork.\n");
}

std::thread* thread1;
std::thread* thread2;
};
SimulateWork2* SimulateWork2::running_work_simulator_ = 0;

DART_EXPORT void RegisterSendPort(Dart_Port send_port) {
send_port_ = send_port;
}

DART_EXPORT void StartWorkSimulator2() {
SimulateWork2::StartWorkSimulator();
}

DART_EXPORT void StopWorkSimulator2() {
SimulateWork2::StopWorkSimulator();
}

} // namespace dart
2 changes: 2 additions & 0 deletions samples/ffi/async/async_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
// SharedObjects=ffi_test_dynamic_library ffi_test_functions

import 'sample_async_callback.dart' as sample0;
import 'sample_native_port_call.dart' as sample1;

main() {
sample0.main();
sample1.main();
}
Loading

0 comments on commit cb60e4b

Please sign in to comment.