diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala index 0b1d131d318e..ae92e0e52ee7 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala @@ -22,6 +22,7 @@ import org.apache.gluten.exception.GlutenException import org.apache.gluten.execution.datasource.{GlutenOrcWriterInjects, GlutenParquetWriterInjects, GlutenRowSplitter} import org.apache.gluten.expression.UDFMappings import org.apache.gluten.init.NativeBackendInitializer +import org.apache.gluten.storage.memory.NativeDataCache import org.apache.gluten.utils._ import org.apache.gluten.vectorized.{JniLibLoader, JniWorkspace} @@ -30,6 +31,7 @@ import org.apache.spark.api.plugin.PluginContext import org.apache.spark.sql.execution.datasources.velox.{VeloxOrcWriterInjects, VeloxParquetWriterInjects, VeloxRowSplitter} import org.apache.spark.sql.expression.UDFResolver import org.apache.spark.sql.internal.{GlutenConfigUtil, StaticSQLConf} +import org.apache.spark.storage.memory.{GlutenExtMemStoreInjects, GlutenMemStoreInjects} import org.apache.spark.util.SparkDirectoryUtil import org.apache.commons.lang3.StringUtils @@ -56,6 +58,17 @@ class VeloxListenerApi extends ListenerApi { override def onExecutorStart(pc: PluginContext): Unit = { UDFResolver.resolveUdfConf(pc.conf(), isDriver = false) initialize(pc.conf()) + if (GlutenMemStoreInjects.getReservationListener() == null) { + // scalastyle:off println + println("!!!!ReservationListenser is null.") + // scalastyle:on println + } + NativeDataCache.setAsyncDataCache( + GlutenMemStoreInjects.getMemStoreSize(), + GlutenMemStoreInjects.getReservationListener()) + // scalastyle:off println + println("!!!!Data cache is initialized.") + // scalastyle:on println } override def onExecutorShutdown(): Unit = shutdown() @@ -198,6 +211,8 @@ class VeloxListenerApi extends ListenerApi { GlutenParquetWriterInjects.setInstance(new VeloxParquetWriterInjects()) GlutenOrcWriterInjects.setInstance(new VeloxOrcWriterInjects()) GlutenRowSplitter.setInstance(new VeloxRowSplitter()) + // for memStore evict + GlutenMemStoreInjects.setInstance(new GlutenExtMemStoreInjects()) } private def shutdown(): Unit = { diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt index dc9ce3435c38..9437b3804647 100644 --- a/cpp/core/CMakeLists.txt +++ b/cpp/core/CMakeLists.txt @@ -207,6 +207,10 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS file(MAKE_DIRECTORY ${root_directory}/releases) add_library(gluten SHARED ${SPARK_COLUMNAR_PLUGIN_SRCS}) add_dependencies(gluten jni_proto) +target_include_directories(gluten PUBLIC + ${CMAKE_CURRENT_SOURCE_DIR} + ${VELOX_HOME}/ +) if(ENABLE_GLUTEN_VCPKG) # Hide symbols of some static dependencies. Otherwise, if such dependencies are already diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index 05ecf9635eb0..ef3e5c1c5c6f 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -303,6 +303,7 @@ set(VELOX_SRCS memory/BufferOutputStream.cc memory/VeloxColumnarBatch.cc memory/VeloxMemoryManager.cc + memory/VeloxListenableMmapAllocator.cc operators/functions/RegistrationAllFunctions.cc operators/functions/RowConstructorWithNull.cc operators/functions/SparkTokenizer.cc @@ -352,6 +353,7 @@ target_include_directories(velox PUBLIC ${CMAKE_SYSTEM_INCLUDE_PATH} ${JNI_INCLUDE_DIRS} ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/../ ${VELOX_HOME}/ ${VELOX_BUILD_PATH}/ ${VELOX_BUILD_PATH}/_deps/xsimd-src/include/ diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index 10d1c7529710..d17a0c8c569f 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -15,6 +15,7 @@ * limitations under the License. */ #include +#include #include "VeloxBackend.h" @@ -108,7 +109,7 @@ void VeloxBackend::init(const std::unordered_map& conf // Setup and register. velox::filesystems::registerLocalFileSystem(); initJolFilesystem(); - initCache(); + // initCache(); initConnector(); // Register Velox functions @@ -126,27 +127,20 @@ void VeloxBackend::init(const std::unordered_map& conf facebook::velox::memory::MemoryManager::initialize({}); } -facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() const { - return asyncDataCache_.get(); +uint64_t VeloxBackend::shrinkAsyncDataCache(uint64_t size) const { + std::cout << "Will shrink:" << size << std::endl; + return asyncDataCache_->shrink(size); } -// JNI-or-local filesystem, for spilling-to-heap if we have extra JVM heap spaces -void VeloxBackend::initJolFilesystem() { - int64_t maxSpillFileSize = backendConf_->get(kMaxSpillFileSize, kMaxSpillFileSizeDefault); - - // FIXME It's known that if spill compression is disabled, the actual spill file size may - // in crease beyond this limit a little (maximum 64 rows which is by default - // one compression page) - gluten::registerJolFileSystem(maxSpillFileSize); -} - -void VeloxBackend::initCache() { - if (backendConf_->get(kVeloxCacheEnabled, false)) { +void VeloxBackend::setAsyncDataCache( + int64_t memCacheSize, + gluten::VeloxListenableMmapAllocator* allocator) { + if (true) { FLAGS_ssd_odirect = true; FLAGS_ssd_odirect = backendConf_->get(kVeloxSsdODirectEnabled, false); - uint64_t memCacheSize = backendConf_->get(kVeloxMemCacheSize, kVeloxMemCacheSizeDefault); + // uint64_t memCacheSize = backendConf_->get(kVeloxMemCacheSize, kVeloxMemCacheSizeDefault); uint64_t ssdCacheSize = backendConf_->get(kVeloxSsdCacheSize, kVeloxSsdCacheSizeDefault); int32_t ssdCacheShards = backendConf_->get(kVeloxSsdCacheShards, kVeloxSsdCacheShardsDefault); int32_t ssdCacheIOThreads = backendConf_->get(kVeloxSsdCacheIOThreads, kVeloxSsdCacheIOThreadsDefault); @@ -167,25 +161,38 @@ void VeloxBackend::initCache() { "free space: " + std::to_string(si.available)) } - velox::memory::MmapAllocator::Options options; - options.capacity = memCacheSize; - cacheAllocator_ = std::make_shared(options); if (ssdCacheSize == 0) { LOG(INFO) << "AsyncDataCache will do memory caching only as ssd cache size is 0"; // TODO: this is not tracked by Spark. - asyncDataCache_ = velox::cache::AsyncDataCache::create(cacheAllocator_.get()); + asyncDataCache_ = velox::cache::AsyncDataCache::create(allocator); } else { // TODO: this is not tracked by Spark. - asyncDataCache_ = velox::cache::AsyncDataCache::create(cacheAllocator_.get(), std::move(ssd)); + // asyncDataCache_ = velox::cache::AsyncDataCache::create(allocator, std::move(ssd)); + asyncDataCache_ = velox::cache::AsyncDataCache::create(allocator); } VELOX_CHECK_NOT_NULL(dynamic_cast(asyncDataCache_.get())) - LOG(INFO) << "STARTUP: Using AsyncDataCache memory cache size: " << memCacheSize + std::cout << "STARTUP: Using AsyncDataCache memory cache size: " << memCacheSize << ", ssdCache prefix: " << ssdCachePath << ", ssdCache size: " << ssdCacheSize - << ", ssdCache shards: " << ssdCacheShards << ", ssdCache IO threads: " << ssdCacheIOThreads; + << ", ssdCache shards: " << ssdCacheShards << ", ssdCache IO threads: " << ssdCacheIOThreads + << std::endl; } } +facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() const { + return asyncDataCache_.get(); +} + +// JNI-or-local filesystem, for spilling-to-heap if we have extra JVM heap spaces +void VeloxBackend::initJolFilesystem() { + int64_t maxSpillFileSize = backendConf_->get(kMaxSpillFileSize, kMaxSpillFileSizeDefault); + + // FIXME It's known that if spill compression is disabled, the actual spill file size may + // in crease beyond this limit a little (maximum 64 rows which is by default + // one compression page) + gluten::registerJolFileSystem(maxSpillFileSize); +} + void VeloxBackend::initConnector() { // The configs below are used at process level. std::unordered_map connectorConfMap = backendConf_->values(); diff --git a/cpp/velox/compute/VeloxBackend.h b/cpp/velox/compute/VeloxBackend.h index e8298eeed192..db081b229a6d 100644 --- a/cpp/velox/compute/VeloxBackend.h +++ b/cpp/velox/compute/VeloxBackend.h @@ -28,6 +28,7 @@ #include "velox/common/memory/MemoryPool.h" #include "velox/common/memory/MmapAllocator.h" #include "velox/core/Config.h" +#include "velox/memory/VeloxListenableMmapAllocator.h" namespace gluten { /// As a static instance in per executor, initialized at executor startup. @@ -51,8 +52,12 @@ class VeloxBackend { static VeloxBackend* get(); + void setAsyncDataCache(int64_t memCacheSize, gluten::VeloxListenableMmapAllocator* allocator); + facebook::velox::cache::AsyncDataCache* getAsyncDataCache() const; + uint64_t shrinkAsyncDataCache(uint64_t size) const; + std::shared_ptr getBackendConf() const { return backendConf_; } @@ -70,7 +75,6 @@ class VeloxBackend { } void init(const std::unordered_map& conf); - void initCache(); void initConnector(); void initUdf(); @@ -87,7 +91,6 @@ class VeloxBackend { std::unique_ptr ssdCacheExecutor_; std::unique_ptr ioExecutor_; - std::shared_ptr cacheAllocator_; std::string cachePathPrefix_; std::string cacheFilePrefix_; diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index 9da7355d1b3a..bfc80dcd01ba 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -16,6 +16,7 @@ */ #include +#include #include #include @@ -36,6 +37,11 @@ using namespace facebook; +static jclass javaReservationListenerClass; + +static jmethodID reserveMemoryMethod; +static jmethodID unreserveMemoryMethod; + #ifdef __cplusplus extern "C" { #endif @@ -45,7 +51,14 @@ jint JNI_OnLoad(JavaVM* vm, void*) { if (vm->GetEnv(reinterpret_cast(&env), jniVersion) != JNI_OK) { return JNI_ERR; } + javaReservationListenerClass = createGlobalClassReference( + env, + "Lorg/apache/spark/storage/memory/" + "ExtMemStoreReservationListener;"); + reserveMemoryMethod = getMethodIdOrError(env, javaReservationListenerClass, "reserve", "(J)J"); + unreserveMemoryMethod = getMethodIdOrError(env, javaReservationListenerClass, "unreserve", "(J)J"); + gluten::getJniCommonState()->ensureInitialized(env); gluten::getJniErrorState()->ensureInitialized(env); gluten::initVeloxJniFileSystem(env); @@ -85,6 +98,42 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_init_NativeBackendInitializer_shut JNI_METHOD_END() } +JNIEXPORT void JNICALL Java_org_apache_gluten_storage_memory_NativeDataCache_setAsyncDataCache( // NOLINT + JNIEnv* env, + jclass, + jlong size, + jobject jlistener) { + JNI_METHOD_START + JavaVM* vm; + if (env->GetJavaVM(&vm) != JNI_OK) { + throw gluten::GlutenException("Unable to get JavaVM instance"); + } + std::cout << "try creating allocationListener. size: " << size << std::endl; + gluten::AllocationListener* listener = + new SparkAllocationListener(vm, jlistener, reserveMemoryMethod, unreserveMemoryMethod); + velox::memory::MmapAllocator::Options options; + options.capacity = size; + + facebook::velox::memory::MmapAllocator* cacheAllocator = new facebook::velox::memory::MmapAllocator(options); + gluten::VeloxListenableMmapAllocator* listenableAlloc = + new gluten::VeloxListenableMmapAllocator(cacheAllocator, listener, options); + if (gluten::VeloxBackend::get() == nullptr) { + std::cout << "backend is null" << std::endl; + } + gluten::VeloxBackend::get()->setAsyncDataCache(size, listenableAlloc); + JNI_METHOD_END() +} + +JNIEXPORT jlong JNICALL Java_org_apache_gluten_storage_memory_NativeDataCache_shrinkAsyncDataCache( // NOLINT + JNIEnv* env, + jclass, + jlong size) { + JNI_METHOD_START + uint64_t shrinkedSize = gluten::VeloxBackend::get()->shrinkAsyncDataCache(static_cast(size)); + return static_cast(shrinkedSize); + JNI_METHOD_END(gluten::kInvalidResourceHandle) +} + JNIEXPORT void JNICALL Java_org_apache_gluten_udf_UdfJniWrapper_getFunctionSignatures( // NOLINT JNIEnv* env, jclass) { diff --git a/cpp/velox/memory/VeloxListenableMmapAllocator.cc b/cpp/velox/memory/VeloxListenableMmapAllocator.cc new file mode 100644 index 000000000000..84ef5637b58f --- /dev/null +++ b/cpp/velox/memory/VeloxListenableMmapAllocator.cc @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "VeloxListenableMmapAllocator.h" +#include + +namespace gluten { + +int64_t VeloxListenableMmapAllocator::freeNonContiguous(facebook::velox::memory::Allocation& allocation) { + std::cout << "calling freeNonContiguous in VeloxListenableMmapAllocator" << std::endl; + int64_t freedSize = delegated_->freeNonContiguous(allocation); + std::cout << "freedSize is " << freedSize << std::endl; + listener_->allocationChanged(-freedSize); + return freedSize; +} + +bool VeloxListenableMmapAllocator::allocateNonContiguousWithoutRetry( + const facebook::velox::memory::MemoryAllocator::SizeMix& sizeMix, + facebook::velox::memory::Allocation& out) { + std::cout << "calling allocateNonContiguousWithoutRetry in VeloxListenableMmapAllocator" << std::endl; + // Firstly assure such amount of memory can be allocated from StorageMemoryPool + const int64_t acquired = facebook::velox::memory::AllocationTraits::pageBytes(sizeMix.totalPages); + listener_->allocationChanged(acquired); + // If comes to here, means allocation succeed as no exception pops + const bool success = facebook::velox::memory::MmapAllocator::allocateNonContiguousWithoutRetry(sizeMix, out); + if (success) { + VELOX_CHECK(!out.empty()); + } + // Do nothing if allocateNonContiguousWithoutRetry fails as cleanupAllocAndReleaseReservation + // will be called in MemoryAllocator to release corresponding allocated bytes. + return success; +} + +} // namespace gluten diff --git a/cpp/velox/memory/VeloxListenableMmapAllocator.h b/cpp/velox/memory/VeloxListenableMmapAllocator.h new file mode 100644 index 000000000000..7e20335b3aad --- /dev/null +++ b/cpp/velox/memory/VeloxListenableMmapAllocator.h @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "memory/AllocationListener.h" +#include "velox/common/memory/MmapAllocator.h" + +namespace gluten { + +class VeloxListenableMmapAllocator final : public facebook::velox::memory::MmapAllocator { + public: + VeloxListenableMmapAllocator( + facebook::velox::memory::MmapAllocator* delegated, + AllocationListener* listener, + const Options& options) + : facebook::velox::memory::MmapAllocator(options) { + delegated_ = delegated; + listener_ = listener; + } + + public: + int64_t freeNonContiguous(facebook::velox::memory::Allocation& allocation) override; + + bool allocateNonContiguousWithoutRetry( + const facebook::velox::memory::MemoryAllocator::SizeMix& sizeMix, + facebook::velox::memory::Allocation& out) override; + + private: + facebook::velox::memory::MmapAllocator* delegated_; + AllocationListener* listener_; +}; + +} // namespace gluten diff --git a/package/pom.xml b/package/pom.xml index db4056a7e109..2a514fa94e0e 100644 --- a/package/pom.xml +++ b/package/pom.xml @@ -265,6 +265,7 @@ org.apache.spark.sql.execution.columnar.ByteBufferHelper$ org.apache.spark.rdd.EmptyRDD + org.apache.spark.storage.memory.ExternalMemoryStore org.apache.spark.sql.hive.execution.HiveFileFormat org.apache.spark.sql.hive.execution.HiveFileFormat$$$$anon$1 org.apache.spark.sql.hive.execution.HiveOutputWriter diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index d76e698dcf2a..b91660b0cb9f 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -1246,13 +1246,6 @@ object GlutenConfig { .bytesConf(ByteUnit.BYTE) .createWithDefaultString("1GB") - val COLUMNAR_VELOX_MEM_INIT_CAPACITY = - buildConf("spark.gluten.sql.columnar.backend.velox.memInitCapacity") - .internal() - .doc("The initial memory capacity to reserve for a newly created Velox query memory pool.") - .bytesConf(ByteUnit.BYTE) - .createWithDefaultString("8MB") - val COLUMNAR_VELOX_SSD_CACHE_PATH = buildStaticConf("spark.gluten.sql.columnar.backend.velox.ssdCachePath") .internal() diff --git a/shims/common/src/main/scala/org/apache/gluten/storage/memory/NativeDataCache.java b/shims/common/src/main/scala/org/apache/gluten/storage/memory/NativeDataCache.java new file mode 100644 index 000000000000..c8571a36e409 --- /dev/null +++ b/shims/common/src/main/scala/org/apache/gluten/storage/memory/NativeDataCache.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.storage.memory; + +import org.apache.spark.storage.memory.ExtMemStoreReservationListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public final class NativeDataCache { + private static final Logger LOG = LoggerFactory.getLogger(NativeDataCache.class); + + public static native void setAsyncDataCache( + long size, ExtMemStoreReservationListener listener); + + public static native long shrinkAsyncDataCache(long size); +} diff --git a/shims/common/src/main/scala/org/apache/spark/storage/memory/ExtMemStoreReservationListener.java b/shims/common/src/main/scala/org/apache/spark/storage/memory/ExtMemStoreReservationListener.java new file mode 100644 index 000000000000..3585ae1b9136 --- /dev/null +++ b/shims/common/src/main/scala/org/apache/spark/storage/memory/ExtMemStoreReservationListener.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.storage.memory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Reserve Spark managed memory. */ +public class ExtMemStoreReservationListener { + + private static final Logger LOG = + LoggerFactory.getLogger(ExtMemStoreReservationListener.class); + + private final ExternalMemoryStore extMemoryStore; + + public ExtMemStoreReservationListener(ExternalMemoryStore extMemoryStore) { + this.extMemoryStore = extMemoryStore; + } + + public long reserve(long size) { + synchronized (this) { + try { + System.out.println("*******Data cache tries to acquire " + size + " bytes memory."); + boolean success = extMemoryStore.acquireStorageMemory(size); + if (success) { + return size; + } else { + throw new RuntimeException("Failed acquire enough memory from storage memory pool"); + } + } catch (Exception e) { + LOG.error("Error reserving memory from native memory store", e); + throw e; + } + } + } + + public long unreserve(long size) { + synchronized (this) { + System.out.println("*******Data cache tries to release " + size + " bytes memory."); + extMemoryStore.releaseStorageMemory(size); + return size; + } + } +} diff --git a/shims/common/src/main/scala/org/apache/spark/storage/memory/GlutenExtMemStoreInjects.scala b/shims/common/src/main/scala/org/apache/spark/storage/memory/GlutenExtMemStoreInjects.scala new file mode 100644 index 000000000000..9736bf30cb4f --- /dev/null +++ b/shims/common/src/main/scala/org/apache/spark/storage/memory/GlutenExtMemStoreInjects.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.storage.memory + +import org.apache.gluten.storage.memory.NativeDataCache + +class GlutenExtMemStoreInjects { + def initExternalCache(size: Long, memoryStore: ExternalMemoryStore): Unit = { + // scalastyle:off println + println("*******initializing data cache through injects.") + // scalastyle:on println + val reservationListener = new ExtMemStoreReservationListener(memoryStore) + NativeDataCache.setAsyncDataCache(size, reservationListener) + } + def evictEntriesToFreeSpace(spaceToFree: Long): Long = { + // scalastyle:off println + println("*******Will shrink data cache with size" + spaceToFree + " bytes.") + // scalastyle:on println + val shrinkedSize = NativeDataCache.shrinkAsyncDataCache(spaceToFree) + // scalastyle:off println + println("*******Shrinked size: " + shrinkedSize + " bytes.") + // scalastyle:on println + shrinkedSize + } +} diff --git a/shims/common/src/main/scala/org/apache/spark/storage/memory/GlutenMemStoreInjects.scala b/shims/common/src/main/scala/org/apache/spark/storage/memory/GlutenMemStoreInjects.scala new file mode 100644 index 000000000000..c65038c0ce9a --- /dev/null +++ b/shims/common/src/main/scala/org/apache/spark/storage/memory/GlutenMemStoreInjects.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.storage.memory + +object GlutenMemStoreInjects { + private var INSTANCE: GlutenExtMemStoreInjects = _ + private var memStoreSize: Long = _ + private var reservationListener: ExtMemStoreReservationListener = _ + + def setInstance(instance: GlutenExtMemStoreInjects): Unit = { + INSTANCE = instance + } + def getInstance(): GlutenExtMemStoreInjects = { + if (INSTANCE == null) { + throw new IllegalStateException("GlutenExtMemStoreInjects is not initialized") +// INSTANCE = new GlutenExtMemStoreInjects() + } + INSTANCE + } + + def setMemStoreSize(size: Long): Unit = { + memStoreSize = size; + } + + def getMemStoreSize(): Long = { + memStoreSize + } + + def setReservationListener(listener: ExtMemStoreReservationListener): Unit = { + reservationListener = listener + } + + def getReservationListener(): ExtMemStoreReservationListener = { + reservationListener + } +} diff --git a/shims/spark33/src/main/scala/org/apache/spark/storage/memory/ExternalMemoryStore.scala b/shims/spark33/src/main/scala/org/apache/spark/storage/memory/ExternalMemoryStore.scala new file mode 100644 index 000000000000..39b584d0af83 --- /dev/null +++ b/shims/spark33/src/main/scala/org/apache/spark/storage/memory/ExternalMemoryStore.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.storage.memory + +import org.apache.gluten.GlutenConfig +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{EXTERNAL_MEMORY_STORAGE_FRACTION, MEMORY_OFFHEAP_SIZE, MEMORY_STORAGE_FRACTION} +import org.apache.spark.memory.{MemoryManager, MemoryMode} +import org.apache.spark.storage.BlockId + +private[spark] class ExternalMemoryStore(conf: SparkConf, memoryManager: MemoryManager) + extends Logging { + + // Note: all changes to memory allocations, notably evicting entries and + // acquiring memory, must be synchronized on `memoryManager`! + + private[spark] def initExternalCache(conf: SparkConf): Unit = { + val reservationListener = new ExtMemStoreReservationListener(this) +// NativeBackendInitializer.setAsyncDataCache(size, reservationListener) + if (GlutenConfig.getConf.enableVeloxCache) { + // scalastyle:off println + println("*******Will initializing data cache late in velox backend") + // scalastyle:on println + val size = (conf.get(MEMORY_OFFHEAP_SIZE) * conf.get(MEMORY_STORAGE_FRACTION) + * conf.get(EXTERNAL_MEMORY_STORAGE_FRACTION)).toLong + GlutenMemStoreInjects.setMemStoreSize(size) + GlutenMemStoreInjects.setReservationListener(reservationListener) + } + } + + // This method will call memoryManager.acquireStorageMemory() + // NativeStorageMemoryListener will connect this method with velox memory allocator.allocate() + private[spark] def acquireStorageMemory(numBytes: Long): Boolean = memoryManager.synchronized { + // scalastyle:off println + println("*******acquireStorageMemory in NativeMemoryStore") + // scalastyle:on println + memoryManager.acquireStorageMemory(BlockId("test_file_cache"), numBytes, MemoryMode.OFF_HEAP) + } + // We don't need handle increase spaceToFree in storage memory pool in this method + // as releaseStorageMemory() will be automatically triggered during eviction + private[spark] def evictEntriesToFreeSpace(spaceToFree: Long): Long = { + if (GlutenConfig.getConf.enableVeloxCache) { + // scalastyle:off println + println("*******trying to evict data cache: " + spaceToFree) + // scalastyle:on println + GlutenMemStoreInjects.getInstance().evictEntriesToFreeSpace(spaceToFree) + } else { + 0L + } + } + // This method will call memoryManager.releaseStorageMemory() + // NativeStorageMemoryListener will connect this method with velox memory allocator.free() + private[spark] def releaseStorageMemory(numBytes: Long): Unit = memoryManager.synchronized { + // scalastyle:off println + println("*******releaseStorageMemory in NativeMemoryStore") + // scalastyle:on println + memoryManager.releaseStorageMemory(numBytes, MemoryMode.OFF_HEAP) + } +}