Skip to content

Commit

Permalink
[samples/ffi] Sample for asynchronous callbacks
Browse files Browse the repository at this point in the history
Issue: #37022 (comment)

Change-Id: If30d168e6666131b6d96d5885a0dbe32291b1ef9
Cq-Include-Trybots: luci.dart.try:vm-ffi-android-debug-arm-try,vm-ffi-android-debug-arm64-try,app-kernel-linux-debug-x64-try,vm-kernel-linux-debug-ia32-try,vm-kernel-win-debug-x64-try,vm-kernel-win-debug-ia32-try,vm-kernel-precomp-linux-debug-x64-try,vm-dartkb-linux-release-x64-abi-try,vm-kernel-precomp-android-release-arm64-try,vm-kernel-asan-linux-release-x64-try,vm-kernel-linux-release-simarm-try,vm-kernel-linux-release-simarm64-try,vm-kernel-precomp-android-release-arm_x64-try,vm-kernel-precomp-obfuscate-linux-release-x64-try,dart-sdk-linux-try,analyzer-analysis-server-linux-try,analyzer-linux-release-try,front-end-linux-release-x64-try,vm-kernel-precomp-win-release-x64-try
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/134704
Reviewed-by: Martin Kustermann <[email protected]>
  • Loading branch information
dcharkes authored and [email protected] committed Feb 18, 2020
1 parent b20c35c commit 76ef075
Show file tree
Hide file tree
Showing 5 changed files with 327 additions and 5 deletions.
2 changes: 2 additions & 0 deletions runtime/bin/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,7 @@ shared_library("ffi_test_dynamic_library") {
cflags = [ "-fPIC" ]
}
if (is_win) {
# TODO(dartbug.com/40579): This wrongly links in dart.exe on precompiled.
libs = [ "dart.lib" ]
abs_root_out_dir = rebase_path(root_out_dir)
ldflags = [ "/LIBPATH:$abs_root_out_dir" ]
Expand Down Expand Up @@ -1153,6 +1154,7 @@ shared_library("ffi_test_functions") {
cflags = [ "-fPIC" ]
}
if (is_win) {
# TODO(dartbug.com/40579): This wrongly links in dart.exe on precompiled.
libs = [ "dart.lib" ]
abs_root_out_dir = rebase_path(root_out_dir)
ldflags = [ "/LIBPATH:$abs_root_out_dir" ]
Expand Down
202 changes: 199 additions & 3 deletions runtime/bin/ffi_test/ffi_test_functions_vmspecific.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,26 @@
#include "platform/globals.h"
#if defined(HOST_OS_WINDOWS)
#include <psapi.h>
#include <windows.h>
#else
#include <unistd.h>
#endif

// Only OK to use here because this is test code.
#include <condition_variable> // NOLINT(build/c++11)
#include <functional> // NOLINT(build/c++11)
#include <mutex> // NOLINT(build/c++11)
#include <queue> // NOLINT(build/c++11)
#include <thread> // NOLINT(build/c++11)
#endif

#include <setjmp.h>
#include <signal.h>
#include <setjmp.h> // NOLINT
#include <signal.h> // NOLINT
#include <iostream>
#include <limits>

// TODO(dartbug.com/40579): This requires static linking to either link
// dart.exe or dart_precompiled_runtime.exe on Windows.
// The sample currently fails on Windows in AOT mode.
#include "include/dart_api.h"
#include "include/dart_native_api.h"

Expand Down Expand Up @@ -264,4 +269,195 @@ DART_EXPORT intptr_t TestCallbackWrongIsolate(void (*fn)()) {

#endif // defined(TARGET_OS_LINUX)

////////////////////////////////////////////////////////////////////////////////
// Functions for async callbacks example.
//
// sample_async_callback.dart

void Fatal(char const* file, int line, char const* error) {
printf("FATAL %s:%i\n", file, line);
printf("%s\n", error);
Dart_DumpNativeStackTrace(NULL);
Dart_PrepareToAbort();
abort();
}

#define FATAL(error) Fatal(__FILE__, __LINE__, error)

void SleepOnAnyOS(intptr_t seconds) {
#if defined(HOST_OS_WINDOWS)
Sleep(1000 * seconds);
#else
sleep(seconds);
#endif
}

intptr_t (*my_callback_blocking_fp_)(intptr_t);
Dart_Port my_callback_blocking_send_port_;

void (*my_callback_non_blocking_fp_)(intptr_t);
Dart_Port my_callback_non_blocking_send_port_;

typedef std::function<void()> Work;

// Notify Dart through a port that the C lib has pending async callbacks.
//
// Expects heap allocated `work` so delete can be called on it.
//
// The `send_port` should be from the isolate which registered the callback.
void NotifyDart(Dart_Port send_port, const Work* work) {
const intptr_t work_addr = reinterpret_cast<intptr_t>(work);
printf("C : Posting message (port: %" Px64 ", work: %" Px ").\n",
send_port, work_addr);

Dart_CObject dart_object;
dart_object.type = Dart_CObject_kInt64;
dart_object.value.as_int64 = work_addr;

const bool result = Dart_PostCObject(send_port, &dart_object);
if (!result) {
FATAL("C : Posting message to port failed.");
}
}

// Do a callback to Dart in a blocking way, being interested in the result.
//
// Dart returns `a + 3`.
intptr_t MyCallbackBlocking(intptr_t a) {
std::mutex mutex;
std::unique_lock<std::mutex> lock(mutex);
intptr_t result;
auto callback = my_callback_blocking_fp_; // Define storage duration.
std::condition_variable cv;
bool notified = false;
const Work work = [a, &result, callback, &cv, &notified]() {
result = callback(a);
printf("C Da: Notify result ready.\n");
notified = true;
cv.notify_one();
};
const Work* work_ptr = new Work(work); // Copy to heap.
NotifyDart(my_callback_blocking_send_port_, work_ptr);
printf("C : Waiting for result.\n");
while (!notified) {
cv.wait(lock);
}
printf("C : Received result.\n");
return result;
}

// Do a callback to Dart in a non-blocking way.
//
// Dart sums all numbers posted to it.
void MyCallbackNonBlocking(intptr_t a) {
auto callback = my_callback_non_blocking_fp_; // Define storage duration.
const Work work = [a, callback]() { callback(a); };
// Copy to heap to make it outlive the function scope.
const Work* work_ptr = new Work(work);
NotifyDart(my_callback_non_blocking_send_port_, work_ptr);
}

// Simulated work for Thread #1.
//
// Simulates heavy work with sleeps.
void Work1() {
printf("C T1: Work1 Start.\n");
SleepOnAnyOS(1);
const intptr_t val1 = 3;
printf("C T1: MyCallbackBlocking(%" Pd ").\n", val1);
const intptr_t val2 = MyCallbackBlocking(val1); // val2 = 6.
printf("C T1: MyCallbackBlocking returned %" Pd ".\n", val2);
SleepOnAnyOS(1);
const intptr_t val3 = val2 - 1; // val3 = 5.
printf("C T1: MyCallbackNonBlocking(%" Pd ").\n", val3);
MyCallbackNonBlocking(val3); // Post 5 to Dart.
printf("C T1: Work1 Done.\n");
}

// Simulated work for Thread #2.
//
// Simulates lighter work, no sleeps.
void Work2() {
printf("C T2: Work2 Start.\n");
const intptr_t val1 = 5;
printf("C T2: MyCallbackNonBlocking(%" Pd ").\n", val1);
MyCallbackNonBlocking(val1); // Post 5 to Dart.
const intptr_t val2 = 1;
printf("C T2: MyCallbackBlocking(%" Pd ").\n", val2);
const intptr_t val3 = MyCallbackBlocking(val2); // val3 = 4.
printf("C T2: MyCallbackBlocking returned %" Pd ".\n", val3);
printf("C T2: MyCallbackNonBlocking(%" Pd ").\n", val3);
MyCallbackNonBlocking(val3); // Post 4 to Dart.
printf("C T2: Work2 Done.\n");
}

// Simulator that simulates concurrent work with multiple threads.
class SimulateWork {
public:
static void StartWorkSimulator() {
running_work_simulator_ = new SimulateWork();
running_work_simulator_->Start();
}

static void StopWorkSimulator() {
running_work_simulator_->Stop();
delete running_work_simulator_;
running_work_simulator_ = nullptr;
}

private:
static SimulateWork* running_work_simulator_;

void Start() {
printf("C Da: Starting SimulateWork.\n");
printf("C Da: Starting worker threads.\n");
thread1 = new std::thread(Work1);
thread2 = new std::thread(Work2);
printf("C Da: Started SimulateWork.\n");
}

void Stop() {
printf("C Da: Stopping SimulateWork.\n");
printf("C Da: Waiting for worker threads to finish.\n");
thread1->join();
thread2->join();
delete thread1;
delete thread2;
printf("C Da: Stopped SimulateWork.\n");
}

std::thread* thread1;
std::thread* thread2;
};
SimulateWork* SimulateWork::running_work_simulator_ = 0;

DART_EXPORT void RegisterMyCallbackBlocking(Dart_Port send_port,
intptr_t (*callback1)(intptr_t)) {
my_callback_blocking_fp_ = callback1;
my_callback_blocking_send_port_ = send_port;
}

DART_EXPORT void RegisterMyCallbackNonBlocking(Dart_Port send_port,
void (*callback)(intptr_t)) {
my_callback_non_blocking_fp_ = callback;
my_callback_non_blocking_send_port_ = send_port;
}

DART_EXPORT void StartWorkSimulator() {
SimulateWork::StartWorkSimulator();
}

DART_EXPORT void StopWorkSimulator() {
SimulateWork::StopWorkSimulator();
}

DART_EXPORT void ExecuteCallback(Work* work_ptr) {
printf("C Da: ExecuteCallback(%" Pp ").\n",
reinterpret_cast<intptr_t>(work_ptr));
const Work work = *work_ptr;
work();
delete work_ptr;
printf("C Da: ExecuteCallback done.\n");
}

} // namespace dart
13 changes: 13 additions & 0 deletions samples/ffi/async/async_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
//
// This file exercises the sample files so that they are tested.
//
// SharedObjects=ffi_test_dynamic_library ffi_test_functions

import 'sample_async_callback.dart' as sample0;

main() {
sample0.main();
}
109 changes: 109 additions & 0 deletions samples/ffi/async/sample_async_callback.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
//
// Sample showing how to do async callbacks by telling the Dart isolate to
// yields its execution thread to C so it can perform the callbacks on the
// main Dart thread.
//
// TODO(dartbug.com/37022): Update this when we get real async callbacks.
// TODO(dartbug.com/40564): On Windows DLL is wrongly linked against dart.exe
// instead of dart_precompiled_runtime.exe

import 'dart:ffi';
import 'dart:isolate';

import 'package:expect/expect.dart';

import '../dylib_utils.dart';

int globalResult = 0;
int numCallbacks1 = 0;
int numCallbacks2 = 0;

main() async {
print("Dart = Dart mutator thread executing Dart.");
print("C Da = Dart mutator thread executing C.");
print("C T1 = Some C thread executing C.");
print("C T2 = Some C thread executing C.");
print("C = C T1 or C T2.");
print("Dart: Setup.");
final interactiveCppRequests = ReceivePort()..listen(requestExecuteCallback);

final int nativePort = interactiveCppRequests.sendPort.nativePort;
registerCallback1(nativePort, callback1FP);
registerCallback2(nativePort, callback2FP);
print("Dart: Tell C to start worker threads.");
startWorkSimulator();

// We need to yield control in order to be able to receive messages.
while (numCallbacks2 < 3) {
print("Dart: Yielding (able to receive messages on port).");
await asyncSleep(500);
}
print("Dart: Received expected number of callbacks.");

Expect.equals(2, numCallbacks1);
Expect.equals(3, numCallbacks2);
Expect.equals(14, globalResult);

print("Dart: Tell C to stop worker threads.");
stopWorkSimulator();
interactiveCppRequests.close();
print("Dart: Done.");
}

int callback1(int a) {
print("Dart: callback1($a).");
numCallbacks1++;
return a + 3;
}

void callback2(int a) {
print("Dart: callback2($a).");
globalResult += a;
numCallbacks2++;
}

void requestExecuteCallback(dynamic message) {
final int work_address = message;
final work = Pointer<Work>.fromAddress(work_address);
print("Dart: Calling into C to execute callback ($work).");
executeCallback(work);
print("Dart: Done with callback.");
}

final callback1FP = Pointer.fromFunction<IntPtr Function(IntPtr)>(callback1, 0);

final callback2FP = Pointer.fromFunction<Void Function(IntPtr)>(callback2);

final dl = dlopenPlatformSpecific("ffi_test_functions");

final registerCallback1 = dl.lookupFunction<
Void Function(Int64 sendPort,
Pointer<NativeFunction<IntPtr Function(IntPtr)>> functionPointer),
void Function(int sendPort,
Pointer<NativeFunction<IntPtr Function(IntPtr)>> functionPointer)>(
'RegisterMyCallbackBlocking');

final registerCallback2 = dl.lookupFunction<
Void Function(Int64 sendPort,
Pointer<NativeFunction<Void Function(IntPtr)>> functionPointer),
void Function(int sendPort,
Pointer<NativeFunction<Void Function(IntPtr)>> functionPointer)>(
'RegisterMyCallbackNonBlocking');

final startWorkSimulator =
dl.lookupFunction<Void Function(), void Function()>('StartWorkSimulator');

final stopWorkSimulator =
dl.lookupFunction<Void Function(), void Function()>('StopWorkSimulator');

final executeCallback = dl.lookupFunction<Void Function(Pointer<Work>),
void Function(Pointer<Work>)>('ExecuteCallback');

class Work extends Struct {}

Future asyncSleep(int ms) {
return new Future.delayed(Duration(milliseconds: ms));
}
6 changes: 4 additions & 2 deletions samples/samples.status
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ sample_extension/test/sample_extension_app_snapshot_test: SkipByDesign # This te
[ $compiler == dart2js && $runtime == none ]
*: Fail, Pass # TODO(ahe): Triage these tests.

[ $compiler == dartkp && $system == windows ]
ffi/async/async_test: Skip # dartbug.com/40564 dartbug.com/40579

[ $compiler == none && $mode == debug && $runtime == vm && $system == windows ]
sample_extension/test/sample_extension_app_snapshot_test: Pass, RuntimeError # Issue 28842

[ $compiler == none && $runtime == vm && $system == fuchsia ]
*: Skip # Not yet triaged.

[ $arch == simarm || $arch == simarm64 ]
ffi/resource_management/resource_management_test: SkipByDesign
ffi/samples_test: SkipByDesign # FFI skips, see ffi.status
ffi/*: SkipByDesign # FFI skips, see ffi.status

[ $arch != x64 || $compiler != dartk || $system != linux || $hot_reload || $hot_reload_rollback ]
ffi/sqlite/test/sqlite_test: SkipByDesign # FFI not supported or libsqlite3.so not available.
Expand Down

0 comments on commit 76ef075

Please sign in to comment.