diff --git a/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst b/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst index 060158c89136..6c692b4eee13 100644 --- a/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst +++ b/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst @@ -304,4 +304,14 @@ Use ``0`` to disable prefix-sort. * **Default value:** ``130`` Minimum number of rows to use prefix-sort. -The default value has been derived using micro-benchmarking. \ No newline at end of file +The default value has been derived using micro-benchmarking. + +``native_op_trace_directory_create_config`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``varchar`` +* **Default value:** ``""`` + +Native Execution only. Config used to create operator trace directory. This config is provided +to underlying file system and the config is free form. The form should be defined by the +underlying file system. \ No newline at end of file diff --git a/presto-main/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java b/presto-main/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java index 15a2404b4d1b..d55531260b1b 100644 --- a/presto-main/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java +++ b/presto-main/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java @@ -61,6 +61,7 @@ public class NativeWorkerSessionPropertyProvider public static final String NATIVE_QUERY_TRACE_NODE_IDS = "native_query_trace_node_ids"; public static final String NATIVE_QUERY_TRACE_MAX_BYTES = "native_query_trace_max_bytes"; public static final String NATIVE_QUERY_TRACE_REG_EXP = "native_query_trace_task_reg_exp"; + public static final String NATIVE_OP_TRACE_DIR_CREATE_CONFIG = "native_op_trace_directory_create_config"; public static final String NATIVE_MAX_LOCAL_EXCHANGE_PARTITION_COUNT = 
"native_max_local_exchange_partition_count"; public static final String NATIVE_SPILL_PREFIXSORT_ENABLED = "native_spill_prefixsort_enabled"; public static final String NATIVE_PREFIXSORT_NORMALIZED_KEY_MAX_BYTES = "native_prefixsort_normalized_key_max_bytes"; @@ -226,6 +227,11 @@ public NativeWorkerSessionPropertyProvider(FeaturesConfig featuresConfig) "The regexp of traced task id. We only enable trace on a task if its id matches.", "", !nativeExecution), + stringProperty(NATIVE_OP_TRACE_DIR_CREATE_CONFIG, + "Config used to create operator trace directory. This config is provided to underlying file" + + " system and the config is free form. The form should be defined by the underlying file system.", + "", + !nativeExecution), longProperty(NATIVE_MAX_OUTPUT_BUFFER_SIZE, "The maximum size in bytes for the task's buffered output. The buffer is shared among all drivers.", 200L << 20, diff --git a/presto-native-execution/presto_cpp/main/SessionProperties.cpp b/presto-native-execution/presto_cpp/main/SessionProperties.cpp index 85e5d9be2ee0..1c50d2d91d95 100644 --- a/presto-native-execution/presto_cpp/main/SessionProperties.cpp +++ b/presto-native-execution/presto_cpp/main/SessionProperties.cpp @@ -306,6 +306,16 @@ SessionProperties::SessionProperties() { QueryConfig::kQueryTraceTaskRegExp, c.queryTraceTaskRegExp()); + addSessionProperty( + kOpTraceDirectoryCreateConfig, + "Config used to create operator trace directory. This config is provided to" + " underlying file system and the config is free form. The form should be defined " + "by the underlying file system.", + VARCHAR(), + false, + QueryConfig::kOpTraceDirectoryCreateConfig, + c.opTraceDirectoryCreateConfig()); + addSessionProperty( kMaxOutputBufferSize, "The maximum size in bytes for the task's buffered output. 
The buffer is" diff --git a/presto-native-execution/presto_cpp/main/SessionProperties.h b/presto-native-execution/presto_cpp/main/SessionProperties.h index 13a8f51e3d7a..4d6fb3d4c01e 100644 --- a/presto-native-execution/presto_cpp/main/SessionProperties.h +++ b/presto-native-execution/presto_cpp/main/SessionProperties.h @@ -207,6 +207,12 @@ class SessionProperties { static constexpr const char* kQueryTraceTaskRegExp = "native_query_trace_task_reg_exp"; + /// Config used to create operator trace directory. This config is provided to + /// underlying file system and the config is free form. The form should be + /// defined by the underlying file system. + static constexpr const char* kOpTraceDirectoryCreateConfig = + "native_op_trace_directory_create_config"; + /// The maximum size in bytes for the task's buffered output. The buffer is /// shared among all drivers. static constexpr const char* kMaxOutputBufferSize = diff --git a/presto-native-execution/presto_cpp/main/TaskManager.cpp b/presto-native-execution/presto_cpp/main/TaskManager.cpp index 3e5955e56546..6bb2ce4b56ee 100644 --- a/presto-native-execution/presto_cpp/main/TaskManager.cpp +++ b/presto-native-execution/presto_cpp/main/TaskManager.cpp @@ -70,14 +70,35 @@ static void maybeSetupTaskSpillDirectory( const auto includeNodeInSpillPath = SystemConfig::instance()->includeNodeInSpillPath(); auto nodeConfig = NodeConfig::instance(); - const auto taskSpillDirPath = TaskManager::buildTaskSpillDirectoryPath( - baseSpillDirectory, - nodeConfig->nodeInternalAddress(), - nodeConfig->nodeId(), - execTask.queryCtx()->queryId(), - execTask.taskId(), - includeNodeInSpillPath); + const auto [taskSpillDirPath, dateSpillDirPath] = + TaskManager::buildTaskSpillDirectoryPath( + baseSpillDirectory, + nodeConfig->nodeInternalAddress(), + nodeConfig->nodeId(), + execTask.queryCtx()->queryId(), + execTask.taskId(), + includeNodeInSpillPath); execTask.setSpillDirectory(taskSpillDirPath, /*alreadyCreated=*/false); + + 
execTask.setCreateSpillDirectoryCb( + [spillDir = taskSpillDirPath, dateStrDir = dateSpillDirPath]() { + auto fs = filesystems::getFileSystem(dateStrDir, nullptr); + // First create the top level directory (date string of the query) with + // TTL or other configs if set. + filesystems::DirectoryOptions options; + auto config = SystemConfig::instance()->spillerDirectoryCreateConfig(); + if (!config.empty()) { + options.values.emplace( + filesystems::DirectoryOptions::kMakeDirectoryConfig.toString(), + config); + } + fs->mkdir(dateStrDir, std::move(options)); + + // After the top level directory is created, create the spill directory + // for the actual task. + fs->mkdir(spillDir); + return spillDir; + }); } // Keep outstanding Promises in RequestHandler's state itself. @@ -379,7 +400,8 @@ std::unique_ptr TaskManager::createOrUpdateErrorTask( return std::make_unique(info); } -/*static*/ std::string TaskManager::buildTaskSpillDirectoryPath( +/*static*/ std::tuple +TaskManager::buildTaskSpillDirectoryPath( const std::string& baseSpillPath, const std::string& nodeIp, const std::string& nodeId, @@ -397,13 +419,20 @@ std::unique_ptr TaskManager::createOrUpdateErrorTask( queryId.substr(6, 2)) : "1970-01-01"; - std::string path; - folly::toAppend(fmt::format("{}/presto_native/", baseSpillPath), &path); + std::string taskSpillDirPath; + folly::toAppend( + fmt::format("{}/presto_native/", baseSpillPath), &taskSpillDirPath); if (includeNodeInSpillPath) { - folly::toAppend(fmt::format("{}_{}/", nodeIp, nodeId), &path); + folly::toAppend(fmt::format("{}_{}/", nodeIp, nodeId), &taskSpillDirPath); } - folly::toAppend(fmt::format("{}/{}/{}/", dateString, queryId, taskId), &path); - return path; + + std::string dateSpillDirPath = taskSpillDirPath; + folly::toAppend(fmt::format("{}/", dateString), &dateSpillDirPath); + + folly::toAppend( + fmt::format("{}/{}/{}/", dateString, queryId, taskId), &taskSpillDirPath); + return std::make_tuple( + 
std::move(taskSpillDirPath), std::move(dateSpillDirPath)); } void TaskManager::getDataForResultRequests( diff --git a/presto-native-execution/presto_cpp/main/TaskManager.h b/presto-native-execution/presto_cpp/main/TaskManager.h index 5453a188309f..063600e42b78 100644 --- a/presto-native-execution/presto_cpp/main/TaskManager.h +++ b/presto-native-execution/presto_cpp/main/TaskManager.h @@ -154,8 +154,10 @@ class TaskManager { std::vector& stuckOpCalls) const; /// Build directory path for spilling for the given task. - /// Always returns non-empty string. - static std::string buildTaskSpillDirectoryPath( + /// Always returns a tuple of two non-empty strings: the task spill + /// directory and the date string directory, which is an ancestor of the task + /// spill directory. + static std::tuple buildTaskSpillDirectoryPath( const std::string& baseSpillPath, const std::string& nodeIp, const std::string& nodeId, diff --git a/presto-native-execution/presto_cpp/main/common/Configs.cpp b/presto-native-execution/presto_cpp/main/common/Configs.cpp index dfd37e507164..0929f43436a8 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.cpp +++ b/presto-native-execution/presto_cpp/main/common/Configs.cpp @@ -164,6 +164,7 @@ SystemConfig::SystemConfig() { NUM_PROP(kDriverNumStuckOperatorsToDetachWorker, 8), NUM_PROP(kSpillerNumCpuThreadsHwMultiplier, 1.0), STR_PROP(kSpillerFileCreateConfig, ""), + STR_PROP(kSpillerDirectoryCreateConfig, ""), NONE_PROP(kSpillerSpillPath), NUM_PROP(kShutdownOnsetSec, 10), NUM_PROP(kSystemMemoryGb, 40), @@ -409,6 +410,10 @@ std::string SystemConfig::spillerFileCreateConfig() const { return optionalProperty(kSpillerFileCreateConfig).value(); } +std::string SystemConfig::spillerDirectoryCreateConfig() const { + return optionalProperty(kSpillerDirectoryCreateConfig).value(); +} + folly::Optional SystemConfig::spillerSpillPath() const { return optionalProperty(kSpillerSpillPath); } diff --git 
a/presto-native-execution/presto_cpp/main/common/Configs.h b/presto-native-execution/presto_cpp/main/common/Configs.h index 77cfa1feac6a..56f54e30f3d4 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.h +++ b/presto-native-execution/presto_cpp/main/common/Configs.h @@ -263,6 +263,12 @@ class SystemConfig : public ConfigBase { static constexpr std::string_view kSpillerFileCreateConfig{ "spiller.file-create-config"}; + /// Config used to create spill directories. This config is provided to + /// underlying file system and the config is free form. The form should be + /// defined by the underlying file system. + static constexpr std::string_view kSpillerDirectoryCreateConfig{ + "spiller.directory-create-config"}; + static constexpr std::string_view kSpillerSpillPath{ "experimental.spiller-spill-path"}; static constexpr std::string_view kShutdownOnsetSec{"shutdown-onset-sec"}; @@ -734,6 +740,8 @@ class SystemConfig : public ConfigBase { std::string spillerFileCreateConfig() const; + std::string spillerDirectoryCreateConfig() const; + folly::Optional spillerSpillPath() const; int32_t shutdownOnsetSec() const; diff --git a/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp b/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp index eb953a8b543e..43190f2e5c03 100644 --- a/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp @@ -1262,11 +1262,15 @@ TEST_P(TaskManagerTest, aggregationSpill) { TEST_P(TaskManagerTest, buildTaskSpillDirectoryPath) { EXPECT_EQ( - "fs::/base/presto_native/192.168.10.2_19/2022-12-20/20221220-Q/Task1/", + std::make_tuple( + "fs::/base/presto_native/192.168.10.2_19/2022-12-20/20221220-Q/Task1/", + "fs::/base/presto_native/192.168.10.2_19/2022-12-20/"), TaskManager::buildTaskSpillDirectoryPath( "fs::/base", "192.168.10.2", "19", "20221220-Q", "Task1", true)); EXPECT_EQ( - 
"fsx::/root/presto_native/192.16.10.2_sample_node_id/1970-01-01/Q100/Task22/", + std::make_tuple( + "fsx::/root/presto_native/192.16.10.2_sample_node_id/1970-01-01/Q100/Task22/", + "fsx::/root/presto_native/192.16.10.2_sample_node_id/1970-01-01/"), TaskManager::buildTaskSpillDirectoryPath( "fsx::/root", "192.16.10.2", @@ -1274,12 +1278,17 @@ TEST_P(TaskManagerTest, buildTaskSpillDirectoryPath) { "Q100", "Task22", true)); + EXPECT_EQ( - "fs::/base/presto_native/2022-12-20/20221220-Q/Task1/", + std::make_tuple( + "fs::/base/presto_native/2022-12-20/20221220-Q/Task1/", + "fs::/base/presto_native/2022-12-20/"), TaskManager::buildTaskSpillDirectoryPath( "fs::/base", "192.168.10.2", "19", "20221220-Q", "Task1", false)); EXPECT_EQ( - "fsx::/root/presto_native/1970-01-01/Q100/Task22/", + std::make_tuple( + "fsx::/root/presto_native/1970-01-01/Q100/Task22/", + "fsx::/root/presto_native/1970-01-01/"), TaskManager::buildTaskSpillDirectoryPath( "fsx::/root", "192.16.10.2",