Skip to content

Commit

Permalink
[GLUTEN-5884][VL] support native data cache spill in Gluten
Browse files Browse the repository at this point in the history
  • Loading branch information
yma11 committed Jun 7, 2024
1 parent 17697fa commit f4632e5
Show file tree
Hide file tree
Showing 15 changed files with 453 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.gluten.exception.GlutenException
import org.apache.gluten.execution.datasource.{GlutenOrcWriterInjects, GlutenParquetWriterInjects, GlutenRowSplitter}
import org.apache.gluten.expression.UDFMappings
import org.apache.gluten.init.NativeBackendInitializer
import org.apache.gluten.storage.memory.NativeDataCache
import org.apache.gluten.utils._
import org.apache.gluten.vectorized.{JniLibLoader, JniWorkspace}

Expand All @@ -30,6 +31,7 @@ import org.apache.spark.api.plugin.PluginContext
import org.apache.spark.sql.execution.datasources.velox.{VeloxOrcWriterInjects, VeloxParquetWriterInjects, VeloxRowSplitter}
import org.apache.spark.sql.expression.UDFResolver
import org.apache.spark.sql.internal.{GlutenConfigUtil, StaticSQLConf}
import org.apache.spark.storage.memory.{GlutenExtMemStoreInjects, GlutenMemStoreInjects}
import org.apache.spark.util.SparkDirectoryUtil

import org.apache.commons.lang3.StringUtils
Expand All @@ -56,6 +58,17 @@ class VeloxListenerApi extends ListenerApi {
override def onExecutorStart(pc: PluginContext): Unit = {
UDFResolver.resolveUdfConf(pc.conf(), isDriver = false)
initialize(pc.conf())
if (GlutenMemStoreInjects.getReservationListener() == null) {
// scalastyle:off println
println("!!!!ReservationListenser is null.")
// scalastyle:on println
}
NativeDataCache.setAsyncDataCache(
GlutenMemStoreInjects.getMemStoreSize(),
GlutenMemStoreInjects.getReservationListener())
// scalastyle:off println
println("!!!!Data cache is initialized.")
// scalastyle:on println
}

override def onExecutorShutdown(): Unit = shutdown()
Expand Down Expand Up @@ -198,6 +211,8 @@ class VeloxListenerApi extends ListenerApi {
GlutenParquetWriterInjects.setInstance(new VeloxParquetWriterInjects())
GlutenOrcWriterInjects.setInstance(new VeloxOrcWriterInjects())
GlutenRowSplitter.setInstance(new VeloxRowSplitter())
// for memStore evict
GlutenMemStoreInjects.setInstance(new GlutenExtMemStoreInjects())
}

private def shutdown(): Unit = {
Expand Down
4 changes: 4 additions & 0 deletions cpp/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS
file(MAKE_DIRECTORY ${root_directory}/releases)
add_library(gluten SHARED ${SPARK_COLUMNAR_PLUGIN_SRCS})
add_dependencies(gluten jni_proto)
target_include_directories(gluten PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}
${VELOX_HOME}/
)

if(ENABLE_GLUTEN_VCPKG)
# Hide symbols of some static dependencies. Otherwise, if such dependencies are already
Expand Down
2 changes: 2 additions & 0 deletions cpp/velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ set(VELOX_SRCS
memory/BufferOutputStream.cc
memory/VeloxColumnarBatch.cc
memory/VeloxMemoryManager.cc
memory/VeloxListenableMmapAllocator.cc
operators/functions/RegistrationAllFunctions.cc
operators/functions/RowConstructorWithNull.cc
operators/functions/SparkTokenizer.cc
Expand Down Expand Up @@ -352,6 +353,7 @@ target_include_directories(velox PUBLIC
${CMAKE_SYSTEM_INCLUDE_PATH}
${JNI_INCLUDE_DIRS}
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/../
${VELOX_HOME}/
${VELOX_BUILD_PATH}/
${VELOX_BUILD_PATH}/_deps/xsimd-src/include/
Expand Down
53 changes: 30 additions & 23 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/
#include <filesystem>
#include <iostream>

#include "VeloxBackend.h"

Expand Down Expand Up @@ -108,7 +109,7 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
// Setup and register.
velox::filesystems::registerLocalFileSystem();
initJolFilesystem();
initCache();
// initCache();
initConnector();

// Register Velox functions
Expand All @@ -126,27 +127,20 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
facebook::velox::memory::MemoryManager::initialize({});
}

facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() const {
return asyncDataCache_.get();
uint64_t VeloxBackend::shrinkAsyncDataCache(uint64_t size) const {
std::cout << "Will shrink:" << size << std::endl;
return asyncDataCache_->shrink(size);
}

// JNI-or-local filesystem, for spilling-to-heap if we have extra JVM heap spaces
void VeloxBackend::initJolFilesystem() {
int64_t maxSpillFileSize = backendConf_->get<int64_t>(kMaxSpillFileSize, kMaxSpillFileSizeDefault);

// FIXME It's known that if spill compression is disabled, the actual spill file size may
// in crease beyond this limit a little (maximum 64 rows which is by default
// one compression page)
gluten::registerJolFileSystem(maxSpillFileSize);
}

void VeloxBackend::initCache() {
if (backendConf_->get<bool>(kVeloxCacheEnabled, false)) {
void VeloxBackend::setAsyncDataCache(
int64_t memCacheSize,
gluten::VeloxListenableMmapAllocator* allocator) {
if (true) {
FLAGS_ssd_odirect = true;

FLAGS_ssd_odirect = backendConf_->get<bool>(kVeloxSsdODirectEnabled, false);

uint64_t memCacheSize = backendConf_->get<uint64_t>(kVeloxMemCacheSize, kVeloxMemCacheSizeDefault);
// uint64_t memCacheSize = backendConf_->get<uint64_t>(kVeloxMemCacheSize, kVeloxMemCacheSizeDefault);
uint64_t ssdCacheSize = backendConf_->get<uint64_t>(kVeloxSsdCacheSize, kVeloxSsdCacheSizeDefault);
int32_t ssdCacheShards = backendConf_->get<int32_t>(kVeloxSsdCacheShards, kVeloxSsdCacheShardsDefault);
int32_t ssdCacheIOThreads = backendConf_->get<int32_t>(kVeloxSsdCacheIOThreads, kVeloxSsdCacheIOThreadsDefault);
Expand All @@ -167,25 +161,38 @@ void VeloxBackend::initCache() {
"free space: " + std::to_string(si.available))
}

velox::memory::MmapAllocator::Options options;
options.capacity = memCacheSize;
cacheAllocator_ = std::make_shared<velox::memory::MmapAllocator>(options);
if (ssdCacheSize == 0) {
LOG(INFO) << "AsyncDataCache will do memory caching only as ssd cache size is 0";
// TODO: this is not tracked by Spark.
asyncDataCache_ = velox::cache::AsyncDataCache::create(cacheAllocator_.get());
asyncDataCache_ = velox::cache::AsyncDataCache::create(allocator);
} else {
// TODO: this is not tracked by Spark.
asyncDataCache_ = velox::cache::AsyncDataCache::create(cacheAllocator_.get(), std::move(ssd));
// asyncDataCache_ = velox::cache::AsyncDataCache::create(allocator, std::move(ssd));
asyncDataCache_ = velox::cache::AsyncDataCache::create(allocator);
}

VELOX_CHECK_NOT_NULL(dynamic_cast<velox::cache::AsyncDataCache*>(asyncDataCache_.get()))
LOG(INFO) << "STARTUP: Using AsyncDataCache memory cache size: " << memCacheSize
std::cout << "STARTUP: Using AsyncDataCache memory cache size: " << memCacheSize
<< ", ssdCache prefix: " << ssdCachePath << ", ssdCache size: " << ssdCacheSize
<< ", ssdCache shards: " << ssdCacheShards << ", ssdCache IO threads: " << ssdCacheIOThreads;
<< ", ssdCache shards: " << ssdCacheShards << ", ssdCache IO threads: " << ssdCacheIOThreads
<< std::endl;
}
}

facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() const {
return asyncDataCache_.get();
}

// JNI-or-local filesystem, for spilling-to-heap if we have extra JVM heap spaces
void VeloxBackend::initJolFilesystem() {
int64_t maxSpillFileSize = backendConf_->get<int64_t>(kMaxSpillFileSize, kMaxSpillFileSizeDefault);

// FIXME It's known that if spill compression is disabled, the actual spill file size may
// in crease beyond this limit a little (maximum 64 rows which is by default
// one compression page)
gluten::registerJolFileSystem(maxSpillFileSize);
}

void VeloxBackend::initConnector() {
// The configs below are used at process level.
std::unordered_map<std::string, std::string> connectorConfMap = backendConf_->values();
Expand Down
7 changes: 5 additions & 2 deletions cpp/velox/compute/VeloxBackend.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "velox/common/memory/MemoryPool.h"
#include "velox/common/memory/MmapAllocator.h"
#include "velox/core/Config.h"
#include "velox/memory/VeloxListenableMmapAllocator.h"

namespace gluten {
/// As a static instance in per executor, initialized at executor startup.
Expand All @@ -51,8 +52,12 @@ class VeloxBackend {

static VeloxBackend* get();

void setAsyncDataCache(int64_t memCacheSize, gluten::VeloxListenableMmapAllocator* allocator);

facebook::velox::cache::AsyncDataCache* getAsyncDataCache() const;

uint64_t shrinkAsyncDataCache(uint64_t size) const;

std::shared_ptr<facebook::velox::Config> getBackendConf() const {
return backendConf_;
}
Expand All @@ -70,7 +75,6 @@ class VeloxBackend {
}

void init(const std::unordered_map<std::string, std::string>& conf);
void initCache();
void initConnector();
void initUdf();

Expand All @@ -87,7 +91,6 @@ class VeloxBackend {

std::unique_ptr<folly::IOThreadPoolExecutor> ssdCacheExecutor_;
std::unique_ptr<folly::IOThreadPoolExecutor> ioExecutor_;
std::shared_ptr<facebook::velox::memory::MmapAllocator> cacheAllocator_;

std::string cachePathPrefix_;
std::string cacheFilePrefix_;
Expand Down
49 changes: 49 additions & 0 deletions cpp/velox/jni/VeloxJniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/

#include <jni.h>
#include <iostream>

#include <glog/logging.h>
#include <jni/JniCommon.h>
Expand All @@ -36,6 +37,11 @@

using namespace facebook;

static jclass javaReservationListenerClass;

static jmethodID reserveMemoryMethod;
static jmethodID unreserveMemoryMethod;

#ifdef __cplusplus
extern "C" {
#endif
Expand All @@ -45,7 +51,14 @@ jint JNI_OnLoad(JavaVM* vm, void*) {
if (vm->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) {
return JNI_ERR;
}
javaReservationListenerClass = createGlobalClassReference(
env,
"Lorg/apache/spark/storage/memory/"
"ExtMemStoreReservationListener;");

reserveMemoryMethod = getMethodIdOrError(env, javaReservationListenerClass, "reserve", "(J)J");
unreserveMemoryMethod = getMethodIdOrError(env, javaReservationListenerClass, "unreserve", "(J)J");

gluten::getJniCommonState()->ensureInitialized(env);
gluten::getJniErrorState()->ensureInitialized(env);
gluten::initVeloxJniFileSystem(env);
Expand Down Expand Up @@ -85,6 +98,42 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_init_NativeBackendInitializer_shut
JNI_METHOD_END()
}

JNIEXPORT void JNICALL Java_org_apache_gluten_storage_memory_NativeDataCache_setAsyncDataCache( // NOLINT
JNIEnv* env,
jclass,
jlong size,
jobject jlistener) {
JNI_METHOD_START
JavaVM* vm;
if (env->GetJavaVM(&vm) != JNI_OK) {
throw gluten::GlutenException("Unable to get JavaVM instance");
}
std::cout << "try creating allocationListener. size: " << size << std::endl;
gluten::AllocationListener* listener =
new SparkAllocationListener(vm, jlistener, reserveMemoryMethod, unreserveMemoryMethod);
velox::memory::MmapAllocator::Options options;
options.capacity = size;

facebook::velox::memory::MmapAllocator* cacheAllocator = new facebook::velox::memory::MmapAllocator(options);
gluten::VeloxListenableMmapAllocator* listenableAlloc =
new gluten::VeloxListenableMmapAllocator(cacheAllocator, listener, options);
if (gluten::VeloxBackend::get() == nullptr) {
std::cout << "backend is null" << std::endl;
}
gluten::VeloxBackend::get()->setAsyncDataCache(size, listenableAlloc);
JNI_METHOD_END()
}

JNIEXPORT jlong JNICALL Java_org_apache_gluten_storage_memory_NativeDataCache_shrinkAsyncDataCache( // NOLINT
JNIEnv* env,
jclass,
jlong size) {
JNI_METHOD_START
uint64_t shrinkedSize = gluten::VeloxBackend::get()->shrinkAsyncDataCache(static_cast<uint64_t>(size));
return static_cast<jlong>(shrinkedSize);
JNI_METHOD_END(gluten::kInvalidResourceHandle)
}

JNIEXPORT void JNICALL Java_org_apache_gluten_udf_UdfJniWrapper_getFunctionSignatures( // NOLINT
JNIEnv* env,
jclass) {
Expand Down
47 changes: 47 additions & 0 deletions cpp/velox/memory/VeloxListenableMmapAllocator.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "VeloxListenableMmapAllocator.h"
#include <iostream>

namespace gluten {

int64_t VeloxListenableMmapAllocator::freeNonContiguous(facebook::velox::memory::Allocation& allocation) {
std::cout << "calling freeNonContiguous in VeloxListenableMmapAllocator" << std::endl;
int64_t freedSize = delegated_->freeNonContiguous(allocation);
std::cout << "freedSize is " << freedSize << std::endl;
listener_->allocationChanged(-freedSize);
return freedSize;
}

bool VeloxListenableMmapAllocator::allocateNonContiguousWithoutRetry(
const facebook::velox::memory::MemoryAllocator::SizeMix& sizeMix,
facebook::velox::memory::Allocation& out) {
std::cout << "calling allocateNonContiguousWithoutRetry in VeloxListenableMmapAllocator" << std::endl;
// Firstly assure such amount of memory can be allocated from StorageMemoryPool
const int64_t acquired = facebook::velox::memory::AllocationTraits::pageBytes(sizeMix.totalPages);
listener_->allocationChanged(acquired);
// If comes to here, means allocation succeed as no exception pops
const bool success = facebook::velox::memory::MmapAllocator::allocateNonContiguousWithoutRetry(sizeMix, out);
if (success) {
VELOX_CHECK(!out.empty());
}
// Do nothing if allocateNonContiguousWithoutRetry fails as cleanupAllocAndReleaseReservation
// will be called in MemoryAllocator to release corresponding allocated bytes.
return success;
}

} // namespace gluten
48 changes: 48 additions & 0 deletions cpp/velox/memory/VeloxListenableMmapAllocator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "memory/AllocationListener.h"
#include "velox/common/memory/MmapAllocator.h"

namespace gluten {

class VeloxListenableMmapAllocator final : public facebook::velox::memory::MmapAllocator {
public:
VeloxListenableMmapAllocator(
facebook::velox::memory::MmapAllocator* delegated,
AllocationListener* listener,
const Options& options)
: facebook::velox::memory::MmapAllocator(options) {
delegated_ = delegated;
listener_ = listener;
}

public:
int64_t freeNonContiguous(facebook::velox::memory::Allocation& allocation) override;

bool allocateNonContiguousWithoutRetry(
const facebook::velox::memory::MemoryAllocator::SizeMix& sizeMix,
facebook::velox::memory::Allocation& out) override;

private:
facebook::velox::memory::MmapAllocator* delegated_;
AllocationListener* listener_;
};

} // namespace gluten
1 change: 1 addition & 0 deletions package/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@
<ignoreClass>org.apache.spark.sql.execution.columnar.ByteBufferHelper$</ignoreClass>
<!-- The overridden class list by Gluten. Carefully add entries to this list only when you knew exactly what is going to happen -->
<ignoreClass>org.apache.spark.rdd.EmptyRDD</ignoreClass>
<ignoreClass>org.apache.spark.storage.memory.ExternalMemoryStore</ignoreClass>
<ignoreClass>org.apache.spark.sql.hive.execution.HiveFileFormat</ignoreClass>
<ignoreClass>org.apache.spark.sql.hive.execution.HiveFileFormat$$$$anon$1</ignoreClass>
<ignoreClass>org.apache.spark.sql.hive.execution.HiveOutputWriter</ignoreClass>
Expand Down
Loading

0 comments on commit f4632e5

Please sign in to comment.