Skip to content

Commit

Permalink
[Description]
Browse files Browse the repository at this point in the history
In native engine, add ability to set a create directory callback and add the corresponding config as a property. We also expose native_op_trace_directory_create_config as a session property.

[Description]
In native engine, add ability to set a create directory callback and add the corresponding config as a property. We also expose native_op_trace_directory_create_config as a session property.
  • Loading branch information
yuandagits committed Nov 30, 2024
1 parent bd10f61 commit dcc1631
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 20 deletions.
12 changes: 11 additions & 1 deletion presto-docs/src/main/sphinx/presto_cpp/properties-session.rst
Original file line number Diff line number Diff line change
Expand Up @@ -304,4 +304,14 @@ Use ``0`` to disable prefix-sort.
* **Default value:** ``130``

Minimum number of rows to use prefix-sort.
The default value has been derived using micro-benchmarking.
The default value has been derived using micro-benchmarking.

``native_op_trace_directory_create_config``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``varchar``
* **Default value:** ``""``

Native Execution only. Config used to create operator trace directory. This config is provided
to underlying file system and the config is free form. The form should be defined by the
underlying file system.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class NativeWorkerSessionPropertyProvider
public static final String NATIVE_QUERY_TRACE_NODE_IDS = "native_query_trace_node_ids";
public static final String NATIVE_QUERY_TRACE_MAX_BYTES = "native_query_trace_max_bytes";
public static final String NATIVE_QUERY_TRACE_REG_EXP = "native_query_trace_task_reg_exp";
public static final String NATIVE_OP_TRACE_DIR_CREATE_CONFIG = "native_op_trace_directory_create_config";
public static final String NATIVE_MAX_LOCAL_EXCHANGE_PARTITION_COUNT = "native_max_local_exchange_partition_count";
public static final String NATIVE_SPILL_PREFIXSORT_ENABLED = "native_spill_prefixsort_enabled";
public static final String NATIVE_PREFIXSORT_NORMALIZED_KEY_MAX_BYTES = "native_prefixsort_normalized_key_max_bytes";
Expand Down Expand Up @@ -226,6 +227,11 @@ public NativeWorkerSessionPropertyProvider(FeaturesConfig featuresConfig)
"The regexp of traced task id. We only enable trace on a task if its id matches.",
"",
!nativeExecution),
stringProperty(NATIVE_OP_TRACE_DIR_CREATE_CONFIG,
"Config used to create operator trace directory. This config is provided to underlying file" +
" system and the config is free form. The form should be defined by the underlying file system.",
"",
!nativeExecution),
longProperty(NATIVE_MAX_OUTPUT_BUFFER_SIZE,
"The maximum size in bytes for the task's buffered output. The buffer is shared among all drivers.",
200L << 20,
Expand Down
10 changes: 10 additions & 0 deletions presto-native-execution/presto_cpp/main/SessionProperties.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,16 @@ SessionProperties::SessionProperties() {
QueryConfig::kQueryTraceTaskRegExp,
c.queryTraceTaskRegExp());

addSessionProperty(
kOpTraceDirectoryCreateConfig,
"Config used to create operator trace directory. This config is provided to"
" underlying file system and the config is free form. The form should be defined "
"by the underlying file system.",
VARCHAR(),
false,
QueryConfig::kOpTraceDirectoryCreateConfig,
c.opTraceDirectoryCreateConfig());

addSessionProperty(
kMaxOutputBufferSize,
"The maximum size in bytes for the task's buffered output. The buffer is"
Expand Down
6 changes: 6 additions & 0 deletions presto-native-execution/presto_cpp/main/SessionProperties.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ class SessionProperties {
static constexpr const char* kQueryTraceTaskRegExp =
"native_query_trace_task_reg_exp";

/// Config used to create operator trace directory. This config is provided to
/// underlying file system and the config is free form. The form should be
/// defined by the underlying file system.
static constexpr const char* kOpTraceDirectoryCreateConfig =
"native_op_trace_directory_create_config";

/// The maximum size in bytes for the task's buffered output. The buffer is
/// shared among all drivers.
static constexpr const char* kMaxOutputBufferSize =
Expand Down
55 changes: 42 additions & 13 deletions presto-native-execution/presto_cpp/main/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,35 @@ static void maybeSetupTaskSpillDirectory(
const auto includeNodeInSpillPath =
SystemConfig::instance()->includeNodeInSpillPath();
auto nodeConfig = NodeConfig::instance();
const auto taskSpillDirPath = TaskManager::buildTaskSpillDirectoryPath(
baseSpillDirectory,
nodeConfig->nodeInternalAddress(),
nodeConfig->nodeId(),
execTask.queryCtx()->queryId(),
execTask.taskId(),
includeNodeInSpillPath);
const auto [taskSpillDirPath, dateSpillDirPath] =
TaskManager::buildTaskSpillDirectoryPath(
baseSpillDirectory,
nodeConfig->nodeInternalAddress(),
nodeConfig->nodeId(),
execTask.queryCtx()->queryId(),
execTask.taskId(),
includeNodeInSpillPath);
execTask.setSpillDirectory(taskSpillDirPath, /*alreadyCreated=*/false);

execTask.setCreateSpillDirectoryCb(
[spillDir = taskSpillDirPath, dateStrDir = dateSpillDirPath]() {
auto fs = filesystems::getFileSystem(dateStrDir, nullptr);
// First create the top level directory (date string of the query) with
// TTL or other configs if set.
filesystems::DirectoryOptions options;
auto config = SystemConfig::instance()->spillerDirectoryCreateConfig();
if (!config.empty()) {
options.values.emplace(
filesystems::DirectoryOptions::kMakeDirectoryConfig.toString(),
config);
}
fs->mkdir(dateStrDir, std::move(options));

// After the parent directory is created, then create the spill
// then create the spill directory for the actual task.
fs->mkdir(spillDir);
return spillDir;
});
}

// Keep outstanding Promises in RequestHandler's state itself.
Expand Down Expand Up @@ -379,7 +400,8 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateErrorTask(
return std::make_unique<TaskInfo>(info);
}

/*static*/ std::string TaskManager::buildTaskSpillDirectoryPath(
/*static*/ std::tuple<std::string, std::string>
TaskManager::buildTaskSpillDirectoryPath(
const std::string& baseSpillPath,
const std::string& nodeIp,
const std::string& nodeId,
Expand All @@ -397,13 +419,20 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateErrorTask(
queryId.substr(6, 2))
: "1970-01-01";

std::string path;
folly::toAppend(fmt::format("{}/presto_native/", baseSpillPath), &path);
std::string taskSpillDirPath;
folly::toAppend(
fmt::format("{}/presto_native/", baseSpillPath), &taskSpillDirPath);
if (includeNodeInSpillPath) {
folly::toAppend(fmt::format("{}_{}/", nodeIp, nodeId), &path);
folly::toAppend(fmt::format("{}_{}/", nodeIp, nodeId), &taskSpillDirPath);
}
folly::toAppend(fmt::format("{}/{}/{}/", dateString, queryId, taskId), &path);
return path;

std::string dateSpillDirPath = taskSpillDirPath;
folly::toAppend(fmt::format("{}/", dateString), &dateSpillDirPath);

folly::toAppend(
fmt::format("{}/{}/{}/", dateString, queryId, taskId), &taskSpillDirPath);
return std::make_tuple(
std::move(taskSpillDirPath), std::move(dateSpillDirPath));
}

void TaskManager::getDataForResultRequests(
Expand Down
6 changes: 4 additions & 2 deletions presto-native-execution/presto_cpp/main/TaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,10 @@ class TaskManager {
std::vector<velox::exec::Task::OpCallInfo>& stuckOpCalls) const;

/// Build directory path for spilling for the given task.
/// Always returns non-empty string.
static std::string buildTaskSpillDirectoryPath(
/// Always returns tuple of non-empty string containing the spill directory
/// and the date string directory, which is parent directory of task spill
/// directory.
static std::tuple<std::string, std::string> buildTaskSpillDirectoryPath(
const std::string& baseSpillPath,
const std::string& nodeIp,
const std::string& nodeId,
Expand Down
5 changes: 5 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ SystemConfig::SystemConfig() {
NUM_PROP(kDriverNumStuckOperatorsToDetachWorker, 8),
NUM_PROP(kSpillerNumCpuThreadsHwMultiplier, 1.0),
STR_PROP(kSpillerFileCreateConfig, ""),
STR_PROP(kSpillerDirectoryCreateConfig, ""),
NONE_PROP(kSpillerSpillPath),
NUM_PROP(kShutdownOnsetSec, 10),
NUM_PROP(kSystemMemoryGb, 40),
Expand Down Expand Up @@ -409,6 +410,10 @@ std::string SystemConfig::spillerFileCreateConfig() const {
return optionalProperty<std::string>(kSpillerFileCreateConfig).value();
}

std::string SystemConfig::spillerDirectoryCreateConfig() const {
return optionalProperty<std::string>(kSpillerDirectoryCreateConfig).value();
}

folly::Optional<std::string> SystemConfig::spillerSpillPath() const {
return optionalProperty(kSpillerSpillPath);
}
Expand Down
8 changes: 8 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,12 @@ class SystemConfig : public ConfigBase {
static constexpr std::string_view kSpillerFileCreateConfig{
"spiller.file-create-config"};

/// Config used to create spill directories. This config is provided to
/// underlying file system and the config is free form. The form should be
/// defined by the underlying file system.
static constexpr std::string_view kSpillerDirectoryCreateConfig{
"spiller.directory-create-config"};

static constexpr std::string_view kSpillerSpillPath{
"experimental.spiller-spill-path"};
static constexpr std::string_view kShutdownOnsetSec{"shutdown-onset-sec"};
Expand Down Expand Up @@ -734,6 +740,8 @@ class SystemConfig : public ConfigBase {

std::string spillerFileCreateConfig() const;

std::string spillerDirectoryCreateConfig() const;

folly::Optional<std::string> spillerSpillPath() const;

int32_t shutdownOnsetSec() const;
Expand Down
17 changes: 13 additions & 4 deletions presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1262,24 +1262,33 @@ TEST_P(TaskManagerTest, aggregationSpill) {

TEST_P(TaskManagerTest, buildTaskSpillDirectoryPath) {
EXPECT_EQ(
"fs::/base/presto_native/192.168.10.2_19/2022-12-20/20221220-Q/Task1/",
std::make_tuple(
"fs::/base/presto_native/192.168.10.2_19/2022-12-20/20221220-Q/Task1/",
"fs::/base/presto_native/192.168.10.2_19/2022-12-20/"),
TaskManager::buildTaskSpillDirectoryPath(
"fs::/base", "192.168.10.2", "19", "20221220-Q", "Task1", true));
EXPECT_EQ(
"fsx::/root/presto_native/192.16.10.2_sample_node_id/1970-01-01/Q100/Task22/",
std::make_tuple(
"fsx::/root/presto_native/192.16.10.2_sample_node_id/1970-01-01/Q100/Task22/",
"fsx::/root/presto_native/192.16.10.2_sample_node_id/1970-01-01/"),
TaskManager::buildTaskSpillDirectoryPath(
"fsx::/root",
"192.16.10.2",
"sample_node_id",
"Q100",
"Task22",
true));

EXPECT_EQ(
"fs::/base/presto_native/2022-12-20/20221220-Q/Task1/",
std::make_tuple(
"fs::/base/presto_native/2022-12-20/20221220-Q/Task1/",
"fs::/base/presto_native/2022-12-20/"),
TaskManager::buildTaskSpillDirectoryPath(
"fs::/base", "192.168.10.2", "19", "20221220-Q", "Task1", false));
EXPECT_EQ(
"fsx::/root/presto_native/1970-01-01/Q100/Task22/",
std::make_tuple(
"fsx::/root/presto_native/1970-01-01/Q100/Task22/",
"fsx::/root/presto_native/1970-01-01/"),
TaskManager::buildTaskSpillDirectoryPath(
"fsx::/root",
"192.16.10.2",
Expand Down

0 comments on commit dcc1631

Please sign in to comment.