From 6eeb4f526f785b09948475c30aacd3a7581cc5a0 Mon Sep 17 00:00:00 2001 From: Radek Hubner Date: Mon, 30 Oct 2023 01:08:53 +0400 Subject: [PATCH] Add deletion-triggered compaction to RocksJava --- java/CMakeLists.txt | 2 + java/rocksjni/options.cc | 58 +++++++++++++++++++ .../table_properties_collector_factory.cc | 37 ++++++++++++ .../table_properties_collector_factory.h | 14 +++++ java/src/main/java/org/rocksdb/Options.java | 33 +++++++++++ .../TablePropertiesCollectorFactory.java | 45 ++++++++++++++ .../test/java/org/rocksdb/OptionsTest.java | 16 +++++ src.mk | 1 + 8 files changed, 206 insertions(+) create mode 100644 java/rocksjni/table_properties_collector_factory.cc create mode 100644 java/rocksjni/table_properties_collector_factory.h create mode 100644 java/src/main/java/org/rocksdb/TablePropertiesCollectorFactory.java diff --git a/java/CMakeLists.txt b/java/CMakeLists.txt index 45f1a6c9421b..c1906058aa0a 100644 --- a/java/CMakeLists.txt +++ b/java/CMakeLists.txt @@ -74,6 +74,7 @@ set(JNI_NATIVE_SOURCES rocksjni/table.cc rocksjni/table_filter.cc rocksjni/table_filter_jnicallback.cc + rocksjni/table_properties_collector_factory.cc rocksjni/testable_event_listener.cc rocksjni/thread_status.cc rocksjni/trace_writer.cc @@ -250,6 +251,7 @@ set(JAVA_MAIN_CLASSES src/main/java/org/rocksdb/TableFileDeletionInfo.java src/main/java/org/rocksdb/TableFilter.java src/main/java/org/rocksdb/TableProperties.java + src/main/java/org/rocksdb/TablePropertiesCollectorFactory.java src/main/java/org/rocksdb/TableFormatConfig.java src/main/java/org/rocksdb/ThreadType.java src/main/java/org/rocksdb/ThreadStatus.java diff --git a/java/rocksjni/options.cc b/java/rocksjni/options.cc index 0d84901c9176..8281c0c316c7 100644 --- a/java/rocksjni/options.cc +++ b/java/rocksjni/options.cc @@ -32,11 +32,13 @@ #include "rocksdb/sst_partitioner.h" #include "rocksdb/statistics.h" #include "rocksdb/table.h" +// #include "rocksdb/utilities/table_properties_collectors.h" #include "rocksjni/comparatorjnicallback.h" #include "rocksjni/cplusplus_to_java_convert.h" #include "rocksjni/portal.h" #include "rocksjni/statisticsjni.h" #include "rocksjni/table_filter_jnicallback.h" +#include "rocksjni/table_properties_collector_factory.h" #include "utilities/merge_operators.h" /* @@ -3927,6 +3929,62 @@ jint Java_org_rocksdb_Options_memtableMaxRangeDeletions(JNIEnv*, jobject, return static_cast(opts->memtable_max_range_deletions); } +/* + * Class: org_rocksdb_Options + * Method: tablePropertiesCollectorFactory + * Signature: (J)[J + */ +jlongArray Java_org_rocksdb_Options_tablePropertiesCollectorFactory( + JNIEnv* env, jclass, jlong jhandle) { + auto* opt = reinterpret_cast(jhandle); + const auto size = opt->table_properties_collector_factories.size(); + jlongArray retVal = env->NewLongArray(static_cast(size)); + if (retVal == nullptr) { + // exception thrown: OutOfMemoryError + return nullptr; + } + jlong* buf = env->GetLongArrayElements(retVal, NULL); + if (buf == nullptr) { + // exception thrown: OutOfMemoryError + return nullptr; + } + + for (int i = 0; i < size; i++) { + auto* wrapper = new TablePropertiesCollectorFactoriesJniWrapper(); + wrapper->table_properties_collector_factories = + opt->table_properties_collector_factories[i]; + buf[i] = GET_CPLUSPLUS_POINTER(wrapper); + } + env->ReleaseLongArrayElements(retVal, buf, 0); + return retVal; +} + +/* + * Class: org_rocksdb_Options + * Method: setTablePropertiesCollectorFactory + * Signature: (J[J)V + */ +void Java_org_rocksdb_Options_setTablePropertiesCollectorFactory( + JNIEnv* env, jclass, jlong jhandle, jlongArray j_factory_handlers) { + auto* opt = reinterpret_cast(jhandle); + const auto size = env->GetArrayLength(j_factory_handlers); + + jlong* buf = env->GetLongArrayElements(j_factory_handlers, NULL); + if (buf == nullptr) { + // exception thrown: OutOfMemoryError + return; + } + + opt->table_properties_collector_factories.clear(); + for (int i = 0; i < size; i++) { + auto* wrapper = + reinterpret_cast(buf[i]); + opt->table_properties_collector_factories.emplace_back( + wrapper->table_properties_collector_factories); + } + env->ReleaseLongArrayElements(j_factory_handlers, buf, JNI_ABORT); +} + ////////////////////////////////////////////////////////////////////////////// // ROCKSDB_NAMESPACE::ColumnFamilyOptions diff --git a/java/rocksjni/table_properties_collector_factory.cc b/java/rocksjni/table_properties_collector_factory.cc new file mode 100644 index 000000000000..3b5c6cbc1753 --- /dev/null +++ b/java/rocksjni/table_properties_collector_factory.cc @@ -0,0 +1,37 @@ +// +// Created by rhubner on 23-Oct-23. +// + +#include "java/rocksjni/table_properties_collector_factory.h" +#include "java/include/org_rocksdb_TablePropertiesCollectorFactory.h" +#include "java/rocksjni/cplusplus_to_java_convert.h" +#include "rocksdb/db.h" +#include "rocksdb/utilities/table_properties_collectors.h" + +/* + * Class: org_rocksdb_TablePropertiesCollectorFactory + * Method: newCompactOnDeletionCollectorFactory + * Signature: (JJD)J + */ +jlong Java_org_rocksdb_TablePropertiesCollectorFactory_newCompactOnDeletionCollectorFactory + (JNIEnv *, jclass, jlong sliding_window_size, jlong deletion_trigger, jdouble deletion_ratio) { + + auto* wrapper = new TablePropertiesCollectorFactoriesJniWrapper(); + wrapper->table_properties_collector_factories = ROCKSDB_NAMESPACE::NewCompactOnDeletionCollectorFactory(sliding_window_size, deletion_trigger, deletion_ratio); + return GET_CPLUSPLUS_POINTER(wrapper); + +} + +/* + * Class: org_rocksdb_TablePropertiesCollectorFactory + * Method: deleteCompactOnDeletionCollectorFactory + * Signature: (J)J + */ +void Java_org_rocksdb_TablePropertiesCollectorFactory_deleteCompactOnDeletionCollectorFactory + (JNIEnv *, jclass, jlong jhandle) { + + auto instance = reinterpret_cast(jhandle); + delete instance; + +} + diff --git a/java/rocksjni/table_properties_collector_factory.h b/java/rocksjni/table_properties_collector_factory.h new file mode 100644 index 000000000000..1c469f127db8 --- /dev/null +++ b/java/rocksjni/table_properties_collector_factory.h @@ -0,0 +1,14 @@ +// +// Created by rhubner on 24-Oct-23. +// + +#include "rocksdb/utilities/table_properties_collectors.h" +#include "rocksdb/table_properties.h" + +#ifndef ROCKSDB_TABLE_PROPERTIES_COLLECTOR_FACTORY_H +#define ROCKSDB_TABLE_PROPERTIES_COLLECTOR_FACTORY_H + +struct TablePropertiesCollectorFactoriesJniWrapper { + std::shared_ptr table_properties_collector_factories; +}; +#endif // ROCKSDB_TABLE_PROPERTIES_COLLECTOR_FACTORY_H diff --git a/java/src/main/java/org/rocksdb/Options.java b/java/src/main/java/org/rocksdb/Options.java index 29f5e8e0d233..1279f54e780e 100644 --- a/java/src/main/java/org/rocksdb/Options.java +++ b/java/src/main/java/org/rocksdb/Options.java @@ -7,6 +7,7 @@ import java.nio.file.Paths; import java.util.*; +import java.util.stream.Collectors; /** * Options to control the behavior of a database. It will be used @@ -2119,6 +2120,35 @@ public PrepopulateBlobCache prepopulateBlobCache() { return PrepopulateBlobCache.getPrepopulateBlobCache(prepopulateBlobCache(nativeHandle_)); } + /** + * Return copy of TablePropertiesCollectorFactory list. Modifying this list will not change + * underlying options C++ object. {@link #setTablePropertiesCollectorFactory(List) + * setTablePropertiesCollectorFactory} must be called to propagate changes. All instance must be + * properly closed to prevent memory leaks. + * @return copy of TablePropertiesCollectorFactory list. + */ + public List tablePropertiesCollectorFactory() { + long[] factoryHandlers = tablePropertiesCollectorFactory(nativeHandle_); + + return Arrays.stream(factoryHandlers) + .mapToObj(factoryHandle -> TablePropertiesCollectorFactory.newWrapper(factoryHandle)) + .collect(Collectors.toList()); + } + + /** + * Set TablePropertiesCollectorFactory in undulation C++ object. + * This method create its own copy of the list. Caller is responsible for + * closing all the instances in the list. + * @param factories + */ + public void setTablePropertiesCollectorFactory(List factories) { + long[] factoryHandlers = new long[factories.size()]; + for (int i = 0; i < factoryHandlers.length; i++) { + factoryHandlers[i] = factories.get(i).getNativeHandle(); + } + setTablePropertiesCollectorFactory(nativeHandle_, factoryHandlers); + } + // // END options for blobs (integrated BlobDB) // @@ -2563,6 +2593,9 @@ private native void setBlobFileStartingLevel( private native void setPrepopulateBlobCache( final long nativeHandle_, final byte prepopulateBlobCache); private native byte prepopulateBlobCache(final long nativeHandle_); + private static native long[] tablePropertiesCollectorFactory(long nativeHandle); + private static native void setTablePropertiesCollectorFactory( + long nativeHandle, long[] factoryHandlers); // instance variables // NOTE: If you add new member variables, please update the copy constructor above! diff --git a/java/src/main/java/org/rocksdb/TablePropertiesCollectorFactory.java b/java/src/main/java/org/rocksdb/TablePropertiesCollectorFactory.java new file mode 100644 index 000000000000..6fda1e8d7bd2 --- /dev/null +++ b/java/src/main/java/org/rocksdb/TablePropertiesCollectorFactory.java @@ -0,0 +1,45 @@ +package org.rocksdb; + +public abstract class TablePropertiesCollectorFactory extends RocksObject { + + private TablePropertiesCollectorFactory(final long nativeHandle) { + super(nativeHandle); + } + + public static TablePropertiesCollectorFactory NewCompactOnDeletionCollectorFactory(final long sliding_window_size, + final long deletion_trigger, + final double deletion_ratio) { + + //TODO - Call native method to create instance of deletion factory and get pointer. + + long handle = newCompactOnDeletionCollectorFactory(sliding_window_size, deletion_trigger, deletion_ratio); + return new TablePropertiesCollectorFactory(handle) { + @Override + protected void disposeInternal(long handle) { + TablePropertiesCollectorFactory.deleteCompactOnDeletionCollectorFactory(handle); + } + }; + } + + /** + * Internal API. Do not use. + * @param nativeHandle + * @return + */ + static TablePropertiesCollectorFactory newWrapper(final long nativeHandle ) { + return new TablePropertiesCollectorFactory(nativeHandle) { + @Override + protected void disposeInternal(long handle) { + TablePropertiesCollectorFactory.deleteCompactOnDeletionCollectorFactory(handle); + } + }; + } + + private static native long newCompactOnDeletionCollectorFactory(final long sliding_window_size, + final long deletion_trigger, + final double deletion_ratio); + + private static native void deleteCompactOnDeletionCollectorFactory(final long handle); + + +} diff --git a/java/src/test/java/org/rocksdb/OptionsTest.java b/java/src/test/java/org/rocksdb/OptionsTest.java index 4b59464b1e30..1d310ab24beb 100644 --- a/java/src/test/java/org/rocksdb/OptionsTest.java +++ b/java/src/test/java/org/rocksdb/OptionsTest.java @@ -1496,4 +1496,20 @@ public void onMemTableSealed(final MemTableInfo memTableInfo) { assertEquals(0, listeners2.size()); } } + @Test + public void tablePropertiesCollectorFactory() { + try (final Options options = new Options()) { + try (TablePropertiesCollectorFactory collectorFactory = + TablePropertiesCollectorFactory.NewCompactOnDeletionCollectorFactory(10, 10, 1.0)) { + List factories = List.of(collectorFactory); + options.setTablePropertiesCollectorFactory(factories); + } + List factories = options.tablePropertiesCollectorFactory(); + try { + assertThat(factories).hasSize(1); + } finally { + factories.stream().forEach(TablePropertiesCollectorFactory::close); + } + } + } } diff --git a/src.mk b/src.mk index a019205ae96a..f842e01c96f3 100644 --- a/src.mk +++ b/src.mk @@ -701,6 +701,7 @@ JNI_NATIVE_SOURCES = \ java/rocksjni/table.cc \ java/rocksjni/table_filter.cc \ java/rocksjni/table_filter_jnicallback.cc \ + java/rocksjni/rocksjni/table_filter_jnicallback.cc \ java/rocksjni/thread_status.cc \ java/rocksjni/trace_writer.cc \ java/rocksjni/trace_writer_jnicallback.cc \