Skip to content

Commit

Permalink
Allow sql_execute_df to execute on CPU or GPU independently of whether Arrow result sets are placed on host or device
Browse files Browse the repository at this point in the history
  • Loading branch information
tmostak authored and andrewseidl committed Aug 4, 2021
1 parent 4d5db0f commit 0204512
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 22 deletions.
45 changes: 42 additions & 3 deletions Tests/ArrowIpcIntegrationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ using namespace std::literals;
#include "QueryEngine/CompilationOptions.h"
#include "Shared/ArrowUtil.h"
#include "Shared/ThriftClient.h"
#include "Shared/scope.h"

#include "gen-cpp/OmniSci.h"

Expand Down Expand Up @@ -379,10 +380,9 @@ TEST_F(ArrowIpcBasic, IpcWire) {
}
}

TEST_F(ArrowIpcBasic, IpcCpu) {
auto data_frame =
execute_arrow_ipc("SELECT * FROM arrow_ipc_test;", ExecutorDeviceType::CPU);
namespace {

void check_cpu_dataframe(TDataFrame& data_frame) {
ASSERT_TRUE(data_frame.df_size > 0);

auto df =
Expand Down Expand Up @@ -439,6 +439,29 @@ TEST_F(ArrowIpcBasic, IpcCpu) {
ASSERT_EQ(str, truth_strings.GetString(i));
}
}
}

} // namespace

// Requests every row of arrow_ipc_test as an Arrow data frame with CPU as the
// device type, validates the frame with check_cpu_dataframe, and then releases
// it again.
TEST_F(ArrowIpcBasic, IpcCpu) {
  const auto results_device = ExecutorDeviceType::CPU;

  auto cpu_frame = execute_arrow_ipc("SELECT * FROM arrow_ipc_test;", results_device);
  check_cpu_dataframe(cpu_frame);

  // Release the frame with the same device type it was requested for.
  deallocate_df(cpu_frame, results_device);
}

// Same round trip as IpcCpu, but with the session's execution mode explicitly
// switched to CPU first, so the query executes on CPU and the Arrow result is
// also requested for CPU.
//
// This test has no g_cpu_only early-out, so it also runs in CPU-only
// configurations; the mode restore below must therefore not assume that GPU
// mode is available.
TEST_F(ArrowIpcBasic, IpcCpuWithCpuExecution) {
  g_client->set_execution_mode(g_session_id, TExecuteMode::CPU);
  ScopeGuard reset_execution_mode = [&] {
    // Only switch back to GPU when this is not a CPU-only run. In a CPU-only
    // run the session was never in GPU mode, and the server presumably rejects
    // the switch (NOTE(review): confirm against set_execution_mode); an
    // exception escaping this guard while the test scope unwinds could abort
    // the whole test binary.
    if (!g_cpu_only) {
      g_client->set_execution_mode(g_session_id, TExecuteMode::GPU);
    }
  };

  auto data_frame =
      execute_arrow_ipc("SELECT * FROM arrow_ipc_test;", ExecutorDeviceType::CPU);

  check_cpu_dataframe(data_frame);

  deallocate_df(data_frame, ExecutorDeviceType::CPU);
}
Expand Down Expand Up @@ -550,6 +573,22 @@ TEST_F(ArrowIpcBasic, IpcGpu) {
deallocate_df(data_frame, ExecutorDeviceType::GPU);
}

// Verifies that asking for a GPU-resident Arrow result while the session is in
// CPU execution mode is rejected: the request is expected to throw.
TEST_F(ArrowIpcBasic, IpcGpuWithCpuQuery) {
  // Nothing to verify without a GPU; log and bail out before touching the
  // session state.
  if (g_cpu_only) {
    LOG(ERROR) << "Test not valid in CPU mode.";
    return;
  }

  // Force CPU execution for this session and put it back into GPU mode when
  // the test scope ends, whether or not the expectation below holds.
  g_client->set_execution_mode(g_session_id, TExecuteMode::CPU);
  ScopeGuard restore_gpu_mode = [&] {
    g_client->set_execution_mode(g_session_id, TExecuteMode::GPU);
  };

  const size_t device_id{0};
  EXPECT_ANY_THROW(execute_arrow_ipc(
      "SELECT * FROM arrow_ipc_test;", ExecutorDeviceType::GPU, device_id));
}

TEST_F(ArrowIpcBasic, EmptyResultSet) {
char const* drop_flights = "DROP TABLE IF EXISTS flights;";
run_ddl_statement(drop_flights);
Expand Down
39 changes: 21 additions & 18 deletions ThriftHandler/DBHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1398,16 +1398,17 @@ int64_t DBHandler::process_geo_copy_from(const TSessionId& session_id) {
void DBHandler::sql_execute_df(TDataFrame& _return,
const TSessionId& session,
const std::string& query_str,
const TDeviceType::type device_type,
const TDeviceType::type results_device_type,
const int32_t device_id,
const int32_t first_n,
const TArrowTransport::type transport_method) {
auto session_ptr = get_session_ptr(session);
auto query_state = create_query_state(session_ptr, query_str);
auto stdlog = STDLOG(session_ptr, query_state);

if (device_type == TDeviceType::GPU) {
const auto executor_device_type = session_ptr->get_executor_device_type();
const auto executor_device_type = session_ptr->get_executor_device_type();

if (results_device_type == TDeviceType::GPU) {
if (executor_device_type != ExecutorDeviceType::GPU) {
THROW_MAPD_EXCEPTION(std::string("GPU mode is not allowed in this session"));
}
Expand Down Expand Up @@ -1453,8 +1454,10 @@ void DBHandler::sql_execute_df(TDataFrame& _return,
query_ra,
query_state_proxy,
*session_ptr,
device_type == TDeviceType::CPU ? ExecutorDeviceType::CPU
: ExecutorDeviceType::GPU,
executor_device_type,
results_device_type == TDeviceType::CPU
? ExecutorDeviceType::CPU
: ExecutorDeviceType::GPU,
static_cast<size_t>(device_id),
first_n,
transport_method);
Expand Down Expand Up @@ -5622,13 +5625,12 @@ void DBHandler::execute_rel_alg_df(TDataFrame& _return,
const std::string& query_ra,
QueryStateProxy query_state_proxy,
const Catalog_Namespace::SessionInfo& session_info,
const ExecutorDeviceType device_type,
const ExecutorDeviceType executor_device_type,
const ExecutorDeviceType results_device_type,
const size_t device_id,
const int32_t first_n,
const TArrowTransport::type transport_method) const {
const auto& cat = session_info.getCatalog();
CHECK(device_type == ExecutorDeviceType::CPU ||
session_info.get_executor_device_type() == ExecutorDeviceType::GPU);
auto executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID,
jit_debug_ ? "/tmp" : "",
jit_debug_ ? "mapdquery" : "",
Expand All @@ -5639,14 +5641,15 @@ void DBHandler::execute_rel_alg_df(TDataFrame& _return,
query_state_proxy.getQueryState().shared_from_this());
const auto& query_hints = ra_executor.getParsedQueryHints();
const bool cpu_mode_enabled = query_hints.isHintRegistered(QueryHint::kCpuMode);
CompilationOptions co = {cpu_mode_enabled ? ExecutorDeviceType::CPU : device_type,
/*hoist_literals=*/true,
ExecutorOptLevel::Default,
g_enable_dynamic_watchdog,
/*allow_lazy_fetch=*/true,
/*filter_on_deleted_column=*/true,
ExecutorExplainType::Default,
intel_jit_profile_};
CompilationOptions co = {
cpu_mode_enabled ? ExecutorDeviceType::CPU : executor_device_type,
/*hoist_literals=*/true,
ExecutorOptLevel::Default,
g_enable_dynamic_watchdog,
/*allow_lazy_fetch=*/true,
/*filter_on_deleted_column=*/true,
ExecutorExplainType::Default,
intel_jit_profile_};
ExecutionOptions eo = {
g_enable_columnar_output,
allow_multifrag_,
Expand Down Expand Up @@ -5681,7 +5684,7 @@ void DBHandler::execute_rel_alg_df(TDataFrame& _return,
const auto converter =
std::make_unique<ArrowResultSetConverter>(rs,
data_mgr_,
device_type,
results_device_type,
device_id,
getTargetNames(result.getTargetsMeta()),
first_n,
Expand All @@ -5696,7 +5699,7 @@ void DBHandler::execute_rel_alg_df(TDataFrame& _return,
std::string(arrow_result.df_handle.begin(), arrow_result.df_handle.end());
_return.df_buffer =
std::string(arrow_result.df_buffer.begin(), arrow_result.df_buffer.end());
if (device_type == ExecutorDeviceType::GPU) {
if (results_device_type == ExecutorDeviceType::GPU) {
std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
CHECK(!ipc_handle_to_dev_ptr_.count(_return.df_handle));
ipc_handle_to_dev_ptr_.insert(
Expand Down
3 changes: 2 additions & 1 deletion ThriftHandler/DBHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,8 @@ class DBHandler : public OmniSciIf {
const std::string& query_ra,
QueryStateProxy query_state_proxy,
const Catalog_Namespace::SessionInfo& session_info,
const ExecutorDeviceType device_type,
const ExecutorDeviceType executor_device_type,
const ExecutorDeviceType results_device_type,
const size_t device_id,
const int32_t first_n,
const TArrowTransport::type transport_method) const;
Expand Down

0 comments on commit 0204512

Please sign in to comment.