diff --git a/presto-native-execution/presto_cpp/main/SessionProperties.cpp b/presto-native-execution/presto_cpp/main/SessionProperties.cpp index 717bddd0bea9..6a3277e4b5dd 100644 --- a/presto-native-execution/presto_cpp/main/SessionProperties.cpp +++ b/presto-native-execution/presto_cpp/main/SessionProperties.cpp @@ -316,6 +316,16 @@ SessionProperties::SessionProperties() { QueryConfig::kOpTraceDirectoryCreateConfig, c.opTraceDirectoryCreateConfig()); + addSessionProperty( + kOpTraceDirectoryCreateConfig, + "Config used to create operator trace directory. This config is provided to" + " underlying file system and the config is free form. The form should be defined " + "by the underlying file system.", + VARCHAR(), + false, + QueryConfig::kOpTraceDirectoryCreateConfig, + c.opTraceDirectoryCreateConfig()); + addSessionProperty( kMaxOutputBufferSize, "The maximum size in bytes for the task's buffered output. The buffer is" diff --git a/presto-native-execution/presto_cpp/main/SessionProperties.h b/presto-native-execution/presto_cpp/main/SessionProperties.h index fd20af1f0c6c..4d6fb3d4c01e 100644 --- a/presto-native-execution/presto_cpp/main/SessionProperties.h +++ b/presto-native-execution/presto_cpp/main/SessionProperties.h @@ -211,7 +211,7 @@ class SessionProperties { /// underlying file system and the config is free form. The form should be /// defined by the underlying file system. static constexpr const char* kOpTraceDirectoryCreateConfig = - "native_op_trace_directory_create_config" + "native_op_trace_directory_create_config"; /// The maximum size in bytes for the task's buffered output. The buffer is /// shared among all drivers. 
diff --git a/presto-native-execution/presto_cpp/main/TaskManager.cpp b/presto-native-execution/presto_cpp/main/TaskManager.cpp index 94be0589cabb..6bb2ce4b56ee 100644 --- a/presto-native-execution/presto_cpp/main/TaskManager.cpp +++ b/presto-native-execution/presto_cpp/main/TaskManager.cpp @@ -70,31 +70,35 @@ static void maybeSetupTaskSpillDirectory( const auto includeNodeInSpillPath = SystemConfig::instance()->includeNodeInSpillPath(); auto nodeConfig = NodeConfig::instance(); - const auto [taskSpillDirPath, taskDateStrPath] = TaskManager::buildTaskSpillDirectoryPath( - baseSpillDirectory, - nodeConfig->nodeInternalAddress(), - nodeConfig->nodeId(), - execTask.queryCtx()->queryId(), - execTask.taskId(), - includeNodeInSpillPath); + const auto [taskSpillDirPath, dateSpillDirPath] = + TaskManager::buildTaskSpillDirectoryPath( + baseSpillDirectory, + nodeConfig->nodeInternalAddress(), + nodeConfig->nodeId(), + execTask.queryCtx()->queryId(), + execTask.taskId(), + includeNodeInSpillPath); execTask.setSpillDirectory(taskSpillDirPath, /*alreadyCreated=*/false); - execTask.setCreateSpillDirectoryCb([spillDir = taskSpillDirPath, dateStrDir = taskDateStrPath]() { - auto fs = filesystems::getFileSystem(dateStrDir, nullptr); - // First create the top level directory (date string of the query) with TTL or other configs if set. - filesystems::DirectoryOptions options; - auto config = SystemConfig::instance()->spillerDirectoryCreateConfig(); - if (!config.empty()) { - options.values.emplace( - filesystems::DirectoryOptions::kMakeDirectoryConfig.toString(), - config); - } - fs->mkdir(dateStrDir, std::move(options)); + execTask.setCreateSpillDirectoryCb( + [spillDir = taskSpillDirPath, dateStrDir = dateSpillDirPath]() { + auto fs = filesystems::getFileSystem(dateStrDir, nullptr); + // First create the top level directory (date string of the query) with + // TTL or other configs if set. 
+ filesystems::DirectoryOptions options; + auto config = SystemConfig::instance()->spillerDirectoryCreateConfig(); + if (!config.empty()) { + options.values.emplace( + filesystems::DirectoryOptions::kMakeDirectoryConfig.toString(), + config); + } + fs->mkdir(dateStrDir, std::move(options)); - // After the parent directory is created, then create the spill directory. - fs->mkdir(spillDir); - return spillDir; - }); + // After the parent directory is created, create the spill + // directory for the actual task. + fs->mkdir(spillDir); + return spillDir; + }); } // Keep outstanding Promises in RequestHandler's state itself. @@ -396,7 +400,8 @@ std::unique_ptr TaskManager::createOrUpdateErrorTask( return std::make_unique(info); } -/*static*/ std::tuple TaskManager::buildTaskSpillDirectoryPath( +/*static*/ std::tuple +TaskManager::buildTaskSpillDirectoryPath( const std::string& baseSpillPath, const std::string& nodeIp, const std::string& nodeId, @@ -414,17 +419,20 @@ std::unique_ptr TaskManager::createOrUpdateErrorTask( queryId.substr(6, 2)) : "1970-01-01"; - std::string path; - folly::toAppend(fmt::format("{}/presto_native/", baseSpillPath), &path); + std::string taskSpillDirPath; + folly::toAppend( + fmt::format("{}/presto_native/", baseSpillPath), &taskSpillDirPath); if (includeNodeInSpillPath) { - folly::toAppend(fmt::format("{}_{}/", nodeIp, nodeId), &path); + folly::toAppend(fmt::format("{}_{}/", nodeIp, nodeId), &taskSpillDirPath); } - std::string dateStringPath = path; - folly::toAppend(fmt::format("{}/", dateString), &dateStringPath); + std::string dateSpillDirPath = taskSpillDirPath; + folly::toAppend(fmt::format("{}/", dateString), &dateSpillDirPath); - folly::toAppend(fmt::format("{}/{}/{}/", dateString, queryId, taskId), &path); - return std::make_tuple(std::move(path), std::move(dateStringPath)); + folly::toAppend( + fmt::format("{}/{}/{}/", dateString, queryId, taskId), &taskSpillDirPath); + return std::make_tuple( + 
std::move(taskSpillDirPath), std::move(dateSpillDirPath)); } void TaskManager::getDataForResultRequests( diff --git a/presto-native-execution/presto_cpp/main/TaskManager.h b/presto-native-execution/presto_cpp/main/TaskManager.h index 746e4e0c5aed..063600e42b78 100644 --- a/presto-native-execution/presto_cpp/main/TaskManager.h +++ b/presto-native-execution/presto_cpp/main/TaskManager.h @@ -155,7 +155,8 @@ class TaskManager { /// Build directory path for spilling for the given task. /// Always returns tuple of non-empty string containing the spill directory - /// and the date string directory. + /// and the date string directory, which is parent directory of task spill + /// directory. static std::tuple buildTaskSpillDirectoryPath( const std::string& baseSpillPath, const std::string& nodeIp, diff --git a/presto-native-execution/presto_cpp/main/common/Configs.h b/presto-native-execution/presto_cpp/main/common/Configs.h index 9d77902ffbdf..56f54e30f3d4 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.h +++ b/presto-native-execution/presto_cpp/main/common/Configs.h @@ -263,9 +263,9 @@ class SystemConfig : public ConfigBase { static constexpr std::string_view kSpillerFileCreateConfig{ "spiller.file-create-config"}; - /// Config used to create spill directories. This config is provided to underlying - /// file system and the config is free form. The form should be defined by the - /// underlying file system. + /// Config used to create spill directories. This config is provided to + /// underlying file system and the config is free form. The form should be + /// defined by the underlying file system. 
static constexpr std::string_view kSpillerDirectoryCreateConfig{ "spiller.directory-create-config"}; diff --git a/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp b/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp index efbb4401c744..b5ad4d19d4df 100644 --- a/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp @@ -1262,12 +1262,15 @@ TEST_P(TaskManagerTest, aggregationSpill) { TEST_P(TaskManagerTest, buildTaskSpillDirectoryPath) { EXPECT_EQ( - std::make_tuple("fs::/base/192.168.10.2_19/2022-12-20/presto_native/20221220-Q/Task1/", - "fs::/base/192.168.10.2_19/2022-12-20/presto_native/20221220-Q/"), + std::make_tuple( + "fs::/base/192.168.10.2_19/2022-12-20/presto_native/20221220-Q/Task1/", + "fs::/base/192.168.10.2_19/2022-12-20/presto_native/20221220-Q/"), TaskManager::buildTaskSpillDirectoryPath( "fs::/base", "192.168.10.2", "19", "20221220-Q", "Task1", true)); EXPECT_EQ( - std::make_tuple("fsx::/root/192.16.10.2_sample_node_id/1970-01-01/presto_native/Q100/Task22/","fsx::/root/192.16.10.2_sample_node_id/1970-01-01/presto_native/Q100/"), + std::make_tuple( + "fsx::/root/192.16.10.2_sample_node_id/1970-01-01/presto_native/Q100/Task22/", + "fsx::/root/192.16.10.2_sample_node_id/1970-01-01/presto_native/Q100/"), TaskManager::buildTaskSpillDirectoryPath( "fsx::/root", "192.16.10.2", @@ -1277,11 +1280,15 @@ TEST_P(TaskManagerTest, buildTaskSpillDirectoryPath) { true)); EXPECT_EQ( - std::make_tuple("fs::/base/2022-12-20/presto_native/20221220-Q/Task1/", "fs::/base/2022-12-20/presto_native/20221220-Q/"), + std::make_tuple( + "fs::/base/2022-12-20/presto_native/20221220-Q/Task1/", + "fs::/base/2022-12-20/presto_native/20221220-Q/"), TaskManager::buildTaskSpillDirectoryPath( "fs::/base", "192.168.10.2", "19", "20221220-Q", "Task1", false)); EXPECT_EQ( - 
std::make_tuple("fsx::/root/1970-01-01/presto_native/Q100/Task22/","fsx::/root/1970-01-01/presto_native/Q100"), + std::make_tuple( + "fsx::/root/1970-01-01/presto_native/Q100/Task22/", + "fsx::/root/1970-01-01/presto_native/Q100"), TaskManager::buildTaskSpillDirectoryPath( "fsx::/root", "192.16.10.2",