Skip to content

Commit

Permalink
[Description]
Browse files Browse the repository at this point in the history
In native engine, add ability to set a create directory callback and add the corresponding config as a property. We also expose native_op_trace_directory_create_config as a session property.
  • Loading branch information
yuandagits committed Nov 30, 2024
1 parent 810e799 commit 56a9e60
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 40 deletions.
10 changes: 10 additions & 0 deletions presto-native-execution/presto_cpp/main/SessionProperties.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,16 @@ SessionProperties::SessionProperties() {
QueryConfig::kOpTraceDirectoryCreateConfig,
c.opTraceDirectoryCreateConfig());

addSessionProperty(
kOpTraceDirectoryCreateConfig,
"Config used to create operator trace directory. This config is provided to"
" underlying file system and the config is free form. The form should be defined "
"by the underlying file system.",
VARCHAR(),
false,
QueryConfig::kOpTraceDirectoryCreateConfig,
c.opTraceDirectoryCreateConfig());

addSessionProperty(
kMaxOutputBufferSize,
"The maximum size in bytes for the task's buffered output. The buffer is"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ class SessionProperties {
/// underlying file system and the config is free form. The form should be
/// defined by the underlying file system.
static constexpr const char* kOpTraceDirectoryCreateConfig =
"native_op_trace_directory_create_config"
"native_op_trace_directory_create_config";

/// The maximum size in bytes for the task's buffered output. The buffer is
/// shared among all drivers.
Expand Down
68 changes: 38 additions & 30 deletions presto-native-execution/presto_cpp/main/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,31 +70,35 @@ static void maybeSetupTaskSpillDirectory(
const auto includeNodeInSpillPath =
SystemConfig::instance()->includeNodeInSpillPath();
auto nodeConfig = NodeConfig::instance();
const auto [taskSpillDirPath, taskDateStrPath] = TaskManager::buildTaskSpillDirectoryPath(
baseSpillDirectory,
nodeConfig->nodeInternalAddress(),
nodeConfig->nodeId(),
execTask.queryCtx()->queryId(),
execTask.taskId(),
includeNodeInSpillPath);
const auto [taskSpillDirPath, dateSpillDirPath] =
TaskManager::buildTaskSpillDirectoryPath(
baseSpillDirectory,
nodeConfig->nodeInternalAddress(),
nodeConfig->nodeId(),
execTask.queryCtx()->queryId(),
execTask.taskId(),
includeNodeInSpillPath);
execTask.setSpillDirectory(taskSpillDirPath, /*alreadyCreated=*/false);

execTask.setCreateSpillDirectoryCb([spillDir = taskSpillDirPath, dateStrDir = taskDateStrPath]() {
auto fs = filesystems::getFileSystem(dateStrDir, nullptr);
// First create the top level directory (date string of the query) with TTL or other configs if set.
filesystems::DirectoryOptions options;
auto config = SystemConfig::instance()->spillerDirectoryCreateConfig();
if (!config.empty()) {
options.values.emplace(
filesystems::DirectoryOptions::kMakeDirectoryConfig.toString(),
config);
}
fs->mkdir(dateStrDir, std::move(options));
execTask.setCreateSpillDirectoryCb(
[spillDir = taskSpillDirPath, dateStrDir = dateSpillDirPath]() {
auto fs = filesystems::getFileSystem(dateStrDir, nullptr);
// First create the top level directory (date string of the query) with
// TTL or other configs if set.
filesystems::DirectoryOptions options;
auto config = SystemConfig::instance()->spillerDirectoryCreateConfig();
if (!config.empty()) {
options.values.emplace(
filesystems::DirectoryOptions::kMakeDirectoryConfig.toString(),
config);
}
fs->mkdir(dateStrDir, std::move(options));

// After the parent directory is created, then create the spill directory.
fs->mkdir(spillDir);
return spillDir;
});
// After the parent directory is created, then create the spill
// then create the spill directory for the actual task.
fs->mkdir(spillDir);
return spillDir;
});
}

// Keep outstanding Promises in RequestHandler's state itself.
Expand Down Expand Up @@ -396,7 +400,8 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateErrorTask(
return std::make_unique<TaskInfo>(info);
}

/*static*/ std::tuple<std::string, std::string> TaskManager::buildTaskSpillDirectoryPath(
/*static*/ std::tuple<std::string, std::string>
TaskManager::buildTaskSpillDirectoryPath(
const std::string& baseSpillPath,
const std::string& nodeIp,
const std::string& nodeId,
Expand All @@ -414,17 +419,20 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateErrorTask(
queryId.substr(6, 2))
: "1970-01-01";

std::string path;
folly::toAppend(fmt::format("{}/presto_native/", baseSpillPath), &path);
std::string taskSpillDirPath;
folly::toAppend(
fmt::format("{}/presto_native/", baseSpillPath), &taskSpillDirPath);
if (includeNodeInSpillPath) {
folly::toAppend(fmt::format("{}_{}/", nodeIp, nodeId), &path);
folly::toAppend(fmt::format("{}_{}/", nodeIp, nodeId), &taskSpillDirPath);
}

std::string dateStringPath = path;
folly::toAppend(fmt::format("{}/", dateString), &dateStringPath);
std::string dateSpillDirPath = taskSpillDirPath;
folly::toAppend(fmt::format("{}/", dateString), &dateSpillDirPath);

folly::toAppend(fmt::format("{}/{}/{}/", dateString, queryId, taskId), &path);
return std::make_tuple(std::move(path), std::move(dateStringPath));
folly::toAppend(
fmt::format("{}/{}/{}/", dateString, queryId, taskId), &taskSpillDirPath);
return std::make_tuple(
std::move(taskSpillDirPath), std::move(dateSpillDirPath));
}

void TaskManager::getDataForResultRequests(
Expand Down
3 changes: 2 additions & 1 deletion presto-native-execution/presto_cpp/main/TaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ class TaskManager {

/// Build directory path for spilling for the given task.
/// Always returns tuple of non-empty string containing the spill directory
/// and the date string directory.
/// and the date string directory, which is parent directory of task spill
/// directory.
static std::tuple<std::string, std::string> buildTaskSpillDirectoryPath(
const std::string& baseSpillPath,
const std::string& nodeIp,
Expand Down
6 changes: 3 additions & 3 deletions presto-native-execution/presto_cpp/main/common/Configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,9 @@ class SystemConfig : public ConfigBase {
static constexpr std::string_view kSpillerFileCreateConfig{
"spiller.file-create-config"};

/// Config used to create spill directories. This config is provided to underlying
/// file system and the config is free form. The form should be defined by the
/// underlying file system.
/// Config used to create spill directories. This config is provided to
/// underlying file system and the config is free form. The form should be
/// defined by the underlying file system.
static constexpr std::string_view kSpillerDirectoryCreateConfig{
"spiller.directory-create-config"};

Expand Down
17 changes: 12 additions & 5 deletions presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1262,12 +1262,15 @@ TEST_P(TaskManagerTest, aggregationSpill) {

TEST_P(TaskManagerTest, buildTaskSpillDirectoryPath) {
EXPECT_EQ(
std::make_tuple("fs::/base/192.168.10.2_19/2022-12-20/presto_native/20221220-Q/Task1/",
"fs::/base/192.168.10.2_19/2022-12-20/presto_native/20221220-Q/"),
std::make_tuple(
"fs::/base/192.168.10.2_19/2022-12-20/presto_native/20221220-Q/Task1/",
"fs::/base/192.168.10.2_19/2022-12-20/presto_native/20221220-Q/"),
TaskManager::buildTaskSpillDirectoryPath(
"fs::/base", "192.168.10.2", "19", "20221220-Q", "Task1", true));
EXPECT_EQ(
std::make_tuple("fsx::/root/192.16.10.2_sample_node_id/1970-01-01/presto_native/Q100/Task22/","fsx::/root/192.16.10.2_sample_node_id/1970-01-01/presto_native/Q100/"),
std::make_tuple(
"fsx::/root/192.16.10.2_sample_node_id/1970-01-01/presto_native/Q100/Task22/",
"fsx::/root/192.16.10.2_sample_node_id/1970-01-01/presto_native/Q100/"),
TaskManager::buildTaskSpillDirectoryPath(
"fsx::/root",
"192.16.10.2",
Expand All @@ -1277,11 +1280,15 @@ TEST_P(TaskManagerTest, buildTaskSpillDirectoryPath) {
true));

EXPECT_EQ(
std::make_tuple("fs::/base/2022-12-20/presto_native/20221220-Q/Task1/", "fs::/base/2022-12-20/presto_native/20221220-Q/"),
std::make_tuple(
"fs::/base/2022-12-20/presto_native/20221220-Q/Task1/",
"fs::/base/2022-12-20/presto_native/20221220-Q/"),
TaskManager::buildTaskSpillDirectoryPath(
"fs::/base", "192.168.10.2", "19", "20221220-Q", "Task1", false));
EXPECT_EQ(
std::make_tuple("fsx::/root/1970-01-01/presto_native/Q100/Task22/","fsx::/root/1970-01-01/presto_native/Q100"),
std::make_tuple(
"fsx::/root/1970-01-01/presto_native/Q100/Task22/",
"fsx::/root/1970-01-01/presto_native/Q100"),
TaskManager::buildTaskSpillDirectoryPath(
"fsx::/root",
"192.16.10.2",
Expand Down

0 comments on commit 56a9e60

Please sign in to comment.