Skip to content

Commit

Permalink
Support parameter control for overlaps join via query hint
Browse files Browse the repository at this point in the history
* Support two param control via query hint: 1) overlaps_bucket_threshold AND 2) overlaps_max_size

* Improve the code for parsing / passing query hint

* assign default query hint iff query_dag_ is nullptr

* Fixup codes / Add test cases
  • Loading branch information
yoonminnam authored and andrewseidl committed Jan 25, 2021
1 parent 15827c0 commit 3a86f6e
Show file tree
Hide file tree
Showing 20 changed files with 409 additions and 131 deletions.
2 changes: 2 additions & 0 deletions QueryEngine/CardinalityEstimator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ RelAlgExecutionUnit create_ndv_execution_unit(const RelAlgExecutionUnit& ra_exe_
: makeExpr<Analyzer::NDVEstimator>(ra_exe_unit.groupby_exprs),
SortInfo{{}, SortAlgorithm::Default, 0, 0},
0,
ra_exe_unit.query_hint,
false,
ra_exe_unit.union_all,
ra_exe_unit.query_state};
Expand All @@ -114,6 +115,7 @@ RelAlgExecutionUnit create_count_all_execution_unit(
nullptr,
SortInfo{{}, SortAlgorithm::Default, 0, 0},
0,
ra_exe_unit.query_hint,
false,
ra_exe_unit.union_all,
ra_exe_unit.query_state};
Expand Down
7 changes: 5 additions & 2 deletions QueryEngine/Execute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1335,6 +1335,7 @@ RelAlgExecutionUnit replace_scan_limit(const RelAlgExecutionUnit& ra_exe_unit_in
ra_exe_unit_in.estimator,
ra_exe_unit_in.sort_info,
new_scan_limit,
ra_exe_unit_in.query_hint,
ra_exe_unit_in.use_bump_allocator,
ra_exe_unit_in.union_all,
ra_exe_unit_in.query_state};
Expand Down Expand Up @@ -3120,7 +3121,8 @@ Executor::JoinHashTableOrError Executor::buildHashTableForQualifier(
const std::vector<InputTableInfo>& query_infos,
const MemoryLevel memory_level,
const HashType preferred_hash_type,
ColumnCacheMap& column_cache) {
ColumnCacheMap& column_cache,
const QueryHint& query_hint) {
if (!g_enable_overlaps_hashjoin && qual_bin_oper->is_overlaps_oper()) {
return {nullptr, "Overlaps hash join disabled, attempting to fall back to loop join"};
}
Expand All @@ -3137,7 +3139,8 @@ Executor::JoinHashTableOrError Executor::buildHashTableForQualifier(
preferred_hash_type,
deviceCountForMemoryLevel(memory_level),
column_cache,
this);
this,
query_hint);
return {tbl, ""};
} catch (const HashJoinFail& e) {
return {nullptr, e.what()};
Expand Down
3 changes: 2 additions & 1 deletion QueryEngine/Execute.h
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,8 @@ class Executor {
const std::vector<InputTableInfo>& query_infos,
const MemoryLevel memory_level,
const HashType preferred_hash_type,
ColumnCacheMap& column_cache);
ColumnCacheMap& column_cache,
const QueryHint& query_hint);
void nukeOldState(const bool allow_lazy_fetch,
const std::vector<InputTableInfo>& query_infos,
const PlanState::DeletedColumnsMap& deleted_cols_map,
Expand Down
1 change: 1 addition & 0 deletions QueryEngine/ExecutionKernel.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class SharedKernelContext {
std::vector<uint64_t> all_frag_row_offsets_;
std::mutex all_frag_row_offsets_mutex_;
const std::vector<InputTableInfo>& query_infos_;
const QueryHint query_hint_;
};

class ExecutionKernel {
Expand Down
3 changes: 2 additions & 1 deletion QueryEngine/IRCodegen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,8 @@ std::shared_ptr<HashJoin> Executor::buildCurrentLevelHashTable(
co.device_type == ExecutorDeviceType::GPU ? MemoryLevel::GPU_LEVEL
: MemoryLevel::CPU_LEVEL,
HashType::OneToOne,
column_cache);
column_cache,
ra_exe_unit.query_hint);
current_level_hash_table = hash_table_or_error.hash_table;
}
if (hash_table_or_error.hash_table) {
Expand Down
20 changes: 15 additions & 5 deletions QueryEngine/JoinHashTable/HashJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ std::shared_ptr<HashJoin> HashJoin::getInstance(
const HashType preferred_hash_type,
const int device_count,
ColumnCacheMap& column_cache,
Executor* executor) {
Executor* executor,
const QueryHint& query_hint) {
auto timer = DEBUG_TIMER(__func__);
std::shared_ptr<HashJoin> join_hash_table;
CHECK_GT(device_count, 0);
Expand All @@ -232,8 +233,13 @@ std::shared_ptr<HashJoin> HashJoin::getInstance(
}
if (qual_bin_oper->is_overlaps_oper()) {
VLOG(1) << "Trying to build geo hash table:";
join_hash_table = OverlapsJoinHashTable::getInstance(
qual_bin_oper, query_infos, memory_level, device_count, column_cache, executor);
join_hash_table = OverlapsJoinHashTable::getInstance(qual_bin_oper,
query_infos,
memory_level,
device_count,
column_cache,
executor,
query_hint);
} else if (dynamic_cast<const Analyzer::ExpressionTuple*>(
qual_bin_oper->get_left_operand())) {
VLOG(1) << "Trying to build keyed hash table:";
Expand Down Expand Up @@ -458,14 +464,16 @@ std::shared_ptr<HashJoin> HashJoin::getSyntheticInstance(
AllColumnVarsVisitor().visit(qual_bin_oper.get());
auto query_infos = getSyntheticInputTableInfo(cvs, executor);
setupSyntheticCaching(cvs, executor);
QueryHint query_hint = QueryHint::defaults();

auto hash_table = HashJoin::getInstance(qual_bin_oper,
query_infos,
memory_level,
preferred_hash_type,
device_count,
column_cache,
executor);
executor,
query_hint);
return hash_table;
}

Expand All @@ -481,14 +489,16 @@ std::shared_ptr<HashJoin> HashJoin::getSyntheticInstance(
AllColumnVarsVisitor().visit(qual_bin_oper.get());
auto query_infos = getSyntheticInputTableInfo(cvs, executor);
setupSyntheticCaching(cvs, executor);
QueryHint query_hint = QueryHint::defaults();

auto hash_table = HashJoin::getInstance(qual_bin_oper,
query_infos,
memory_level,
preferred_hash_type,
device_count,
column_cache,
executor);
executor,
query_hint);
return hash_table;
}

Expand Down
3 changes: 2 additions & 1 deletion QueryEngine/JoinHashTable/HashJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ class HashJoin {
const HashType preferred_hash_type,
const int device_count,
ColumnCacheMap& column_cache,
Executor* executor);
Executor* executor,
const QueryHint& query_hint);

//! Make hash table from named tables and columns (such as for testing).
static std::shared_ptr<HashJoin> getSyntheticInstance(
Expand Down
107 changes: 71 additions & 36 deletions QueryEngine/JoinHashTable/OverlapsJoinHashTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ std::shared_ptr<OverlapsJoinHashTable> OverlapsJoinHashTable::getInstance(
const Data_Namespace::MemoryLevel memory_level,
const int device_count,
ColumnCacheMap& column_cache,
Executor* executor) {
Executor* executor,
const QueryHint& query_hint) {
decltype(std::chrono::steady_clock::now()) ts1, ts2;
auto inner_outer_pairs = normalize_column_pairs(
condition.get(), *executor->getCatalog(), executor->getTemporaryTables());
Expand Down Expand Up @@ -95,6 +96,9 @@ std::shared_ptr<OverlapsJoinHashTable> OverlapsJoinHashTable::getInstance(
executor,
inner_outer_pairs,
device_count);
if (query_hint.hint_delivered) {
join_hash_table->registerQueryHint(query_hint);
}
try {
join_hash_table->reify(layout);
} catch (const HashJoinFail& e) {
Expand Down Expand Up @@ -130,6 +134,34 @@ void OverlapsJoinHashTable::reifyWithLayout(const HashType layout) {
if (query_info.fragments.empty()) {
return;
}

auto overlaps_max_table_size_bytes = g_overlaps_max_table_size_bytes;
bool use_user_given_bucket_threshold = false;
auto query_hint = getRegisteredQueryHint();
if (query_hint.hint_delivered) {
if (query_hint.overlaps_bucket_threshold != overlaps_hashjoin_bucket_threshold_) {
VLOG(1) << "User changes a threshold \'overlaps_hashjoin_bucket_threshold\' via "
"query hint: "
<< overlaps_hashjoin_bucket_threshold_ << " -> "
<< query_hint.overlaps_bucket_threshold;
overlaps_hashjoin_bucket_threshold_ = query_hint.overlaps_bucket_threshold;
use_user_given_bucket_threshold = true;
}
if (query_hint.overlaps_max_size != overlaps_max_table_size_bytes) {
std::ostringstream oss;
oss << "User requests to change a threshold \'overlaps_max_table_size_bytes\' via "
"query hint: "
<< overlaps_max_table_size_bytes << " -> " << query_hint.overlaps_max_size;
if (!use_user_given_bucket_threshold) {
overlaps_max_table_size_bytes = query_hint.overlaps_max_size;
} else {
oss << ", but is skipped since the query hint also changes the threshold "
"\'overlaps_hashjoin_bucket_threshold\'";
}
VLOG(1) << oss.str();
}
}

std::vector<ColumnsForDevice> columns_per_device;
const auto catalog = executor_->getCatalog();
CHECK(catalog);
Expand Down Expand Up @@ -175,42 +207,45 @@ void OverlapsJoinHashTable::reifyWithLayout(const HashType layout) {

// Auto-tuner: Pre-calculate some possible hash table sizes.
std::lock_guard<std::mutex> guard(auto_tuner_cache_mutex_);
auto atc = auto_tuner_cache_.find(cache_key);
if (atc != auto_tuner_cache_.end()) {
overlaps_hashjoin_bucket_threshold_ = atc->second;
VLOG(1) << "Auto tuner using cached overlaps hash table size of: "
<< overlaps_hashjoin_bucket_threshold_;
} else {
VLOG(1) << "Auto tuning for the overlaps hash table size:";
// TODO(jclay): Currently, joining on large poly sets
// will lead to lengthy construction times (and large hash tables)
// tune this to account for the characteristics of the data being joined.
const double min_threshold{1e-5};
const double max_threshold{1};
double good_threshold{max_threshold};
for (double threshold = max_threshold; threshold >= min_threshold;
threshold /= 10.0) {
overlaps_hashjoin_bucket_threshold_ = threshold;
size_t entry_count;
size_t emitted_keys_count;
std::tie(entry_count, emitted_keys_count) =
calculateCounts(shard_count, query_info, columns_per_device);
size_t hash_table_size = calculateHashTableSize(
bucket_sizes_for_dimension_.size(), emitted_keys_count, entry_count);
bucket_sizes_for_dimension_.clear();
VLOG(1) << "Calculated bin threshold of " << std::fixed << threshold
<< " giving: entry count " << entry_count << " hash table size "
<< hash_table_size;
if (hash_table_size <= g_overlaps_max_table_size_bytes) {
good_threshold = overlaps_hashjoin_bucket_threshold_;
} else {
VLOG(1) << "Rejected bin threshold of " << std::fixed << threshold;
break;
if (!use_user_given_bucket_threshold) {
// auto-tuning is valid iff no query hint is delivered to change bucket threshold
auto atc = auto_tuner_cache_.find(cache_key);
if (atc != auto_tuner_cache_.end()) {
overlaps_hashjoin_bucket_threshold_ = atc->second;
VLOG(1) << "Auto tuner using cached overlaps hash table size of: "
<< overlaps_hashjoin_bucket_threshold_;
} else {
VLOG(1) << "Auto tuning for the overlaps hash table size:";
// TODO(jclay): Currently, joining on large poly sets
// will lead to lengthy construction times (and large hash tables)
// tune this to account for the characteristics of the data being joined.
const double min_threshold{1e-5};
const double max_threshold{1};
double good_threshold{max_threshold};
for (double threshold = max_threshold; threshold >= min_threshold;
threshold /= 10.0) {
overlaps_hashjoin_bucket_threshold_ = threshold;
size_t entry_count;
size_t emitted_keys_count;
std::tie(entry_count, emitted_keys_count) =
calculateCounts(shard_count, query_info, columns_per_device);
size_t hash_table_size = calculateHashTableSize(
bucket_sizes_for_dimension_.size(), emitted_keys_count, entry_count);
bucket_sizes_for_dimension_.clear();
VLOG(1) << "Calculated bin threshold of " << std::fixed << threshold
<< " giving: entry count " << entry_count << " hash table size "
<< hash_table_size;
if (hash_table_size <= overlaps_max_table_size_bytes) {
good_threshold = overlaps_hashjoin_bucket_threshold_;
} else {
VLOG(1) << "Rejected bin threshold of " << std::fixed << threshold;
break;
}
}
overlaps_hashjoin_bucket_threshold_ = good_threshold;
if (!cache_key_contains_intermediate_table(cache_key)) {
auto_tuner_cache_[cache_key] = overlaps_hashjoin_bucket_threshold_;
}
}
overlaps_hashjoin_bucket_threshold_ = good_threshold;
if (!cache_key_contains_intermediate_table(cache_key)) {
auto_tuner_cache_[cache_key] = overlaps_hashjoin_bucket_threshold_;
}
}

Expand Down
10 changes: 9 additions & 1 deletion QueryEngine/JoinHashTable/OverlapsJoinHashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class OverlapsJoinHashTable : public HashJoin {
, device_count_(device_count) {
CHECK_GT(device_count_, 0);
hash_tables_for_device_.resize(std::max(device_count_, 1));
query_hint_ = QueryHint::defaults();
}

virtual ~OverlapsJoinHashTable() {}
Expand All @@ -50,7 +51,8 @@ class OverlapsJoinHashTable : public HashJoin {
const Data_Namespace::MemoryLevel memory_level,
const int device_count,
ColumnCacheMap& column_cache,
Executor* executor);
Executor* executor,
const QueryHint& query_hint);

static auto getCacheInvalidator() -> std::function<void()> {
VLOG(1) << "Invalidate " << auto_tuner_cache_.size() << " cached overlaps hashtable.";
Expand Down Expand Up @@ -135,6 +137,10 @@ class OverlapsJoinHashTable : public HashJoin {
return nullptr;
}

const QueryHint& getRegisteredQueryHint() { return query_hint_; }

void registerQueryHint(const QueryHint& query_hint) { query_hint_ = query_hint; }

static std::map<HashTableCacheKey, double> auto_tuner_cache_;
static std::mutex auto_tuner_cache_mutex_;

Expand Down Expand Up @@ -245,4 +251,6 @@ class OverlapsJoinHashTable : public HashJoin {
using HashTableCacheValue = std::shared_ptr<HashTable>;
static std::unique_ptr<HashTableCache<HashTableCacheKey, HashTableCacheValue>>
hash_table_cache_;

QueryHint query_hint_;
};
51 changes: 50 additions & 1 deletion QueryEngine/QueryHint.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,57 @@
#ifndef OMNISCI_QUERYHINT_H
#define OMNISCI_QUERYHINT_H

#include "ThriftHandler/CommandLineOptions.h"

struct QueryHint {
bool cpu_mode{false};
// for each hint "H", we first define its value as the corresponding system-defined
// default value "D"
// After then, if we detect at least one hint is registered (via hint_delivered),
// we can compare the value btw. "H" and "D" during the query compilation step that H
// is involved and then use the "H" iff "H" != "D"
// since that indicates user-given hint is delivered
// (otherwise, "H" should be the equal to "D")
// note that we should check if H is valid W.R.T the proper value range
// i.e., if H is valid in 0.0 ~ 1.0, then we check that at the point
// when we decide to use H, and use D iff given H does not have a valid value
QueryHint() {
hint_delivered = false;
cpu_mode = false;
overlaps_bucket_threshold = 0.1;
overlaps_max_size = g_overlaps_max_table_size_bytes;
}

QueryHint& operator=(const QueryHint& other) {
hint_delivered = other.hint_delivered;
cpu_mode = other.cpu_mode;
overlaps_bucket_threshold = other.overlaps_bucket_threshold;
overlaps_max_size = other.overlaps_max_size;
return *this;
}

QueryHint(const QueryHint& other) {
hint_delivered = other.hint_delivered;
cpu_mode = other.cpu_mode;
overlaps_bucket_threshold = other.overlaps_bucket_threshold;
overlaps_max_size = other.overlaps_max_size;
}

// set true if at least one query hint is delivered
bool hint_delivered;

// general query execution
bool cpu_mode;

// overlaps hash join
double overlaps_bucket_threshold; // defined in "OverlapsJoinHashTable.h"
size_t overlaps_max_size;

std::unordered_map<std::string, int> OMNISCI_SUPPORTED_HINT_CLASS = {
{"cpu_mode", 0},
{"overlaps_bucket_threshold", 1},
{"overlaps_max_size", 2}};

static QueryHint defaults() { return QueryHint(); }
};

#endif // OMNISCI_QUERYHINT_H
3 changes: 3 additions & 0 deletions QueryEngine/QueryRewrite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ RelAlgExecutionUnit QueryRewriter::rewriteOverlapsJoin(
ra_exe_unit_in.estimator,
ra_exe_unit_in.sort_info,
ra_exe_unit_in.scan_limit,
ra_exe_unit_in.query_hint,
ra_exe_unit_in.use_bump_allocator};
}

Expand Down Expand Up @@ -369,6 +370,7 @@ RelAlgExecutionUnit QueryRewriter::rewriteColumnarUpdate(
ra_exe_unit_in.estimator,
ra_exe_unit_in.sort_info,
ra_exe_unit_in.scan_limit,
ra_exe_unit_in.query_hint,
ra_exe_unit_in.use_bump_allocator,
ra_exe_unit_in.union_all,
ra_exe_unit_in.query_state};
Expand Down Expand Up @@ -468,6 +470,7 @@ RelAlgExecutionUnit QueryRewriter::rewriteColumnarDelete(
ra_exe_unit_in.estimator,
ra_exe_unit_in.sort_info,
ra_exe_unit_in.scan_limit,
ra_exe_unit_in.query_hint,
ra_exe_unit_in.use_bump_allocator,
ra_exe_unit_in.union_all,
ra_exe_unit_in.query_state};
Expand Down
Loading

0 comments on commit 3a86f6e

Please sign in to comment.