Skip to content

Commit

Permalink
Add deletion-triggered compaction to RocksJava
Browse files Browse the repository at this point in the history
  • Loading branch information
rhubner committed Oct 29, 2023
1 parent 526f36b commit 6eeb4f5
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 0 deletions.
2 changes: 2 additions & 0 deletions java/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ set(JNI_NATIVE_SOURCES
rocksjni/table.cc
rocksjni/table_filter.cc
rocksjni/table_filter_jnicallback.cc
rocksjni/table_properties_collector_factory.cc
rocksjni/testable_event_listener.cc
rocksjni/thread_status.cc
rocksjni/trace_writer.cc
Expand Down Expand Up @@ -250,6 +251,7 @@ set(JAVA_MAIN_CLASSES
src/main/java/org/rocksdb/TableFileDeletionInfo.java
src/main/java/org/rocksdb/TableFilter.java
src/main/java/org/rocksdb/TableProperties.java
src/main/java/org/rocksdb/TablePropertiesCollectorFactory.java
src/main/java/org/rocksdb/TableFormatConfig.java
src/main/java/org/rocksdb/ThreadType.java
src/main/java/org/rocksdb/ThreadStatus.java
Expand Down
58 changes: 58 additions & 0 deletions java/rocksjni/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@
#include "rocksdb/sst_partitioner.h"
#include "rocksdb/statistics.h"
#include "rocksdb/table.h"
// #include "rocksdb/utilities/table_properties_collectors.h"
#include "rocksjni/comparatorjnicallback.h"
#include "rocksjni/cplusplus_to_java_convert.h"
#include "rocksjni/portal.h"
#include "rocksjni/statisticsjni.h"
#include "rocksjni/table_filter_jnicallback.h"
#include "rocksjni/table_properties_collector_factory.h"
#include "utilities/merge_operators.h"

/*
Expand Down Expand Up @@ -3927,6 +3929,62 @@ jint Java_org_rocksdb_Options_memtableMaxRangeDeletions(JNIEnv*, jobject,
return static_cast<jint>(opts->memtable_max_range_deletions);
}

/*
* Class: org_rocksdb_Options
* Method: tablePropertiesCollectorFactory
* Signature: (J)[J
*/
jlongArray Java_org_rocksdb_Options_tablePropertiesCollectorFactory(
JNIEnv* env, jclass, jlong jhandle) {
auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::Options*>(jhandle);
const auto size = opt->table_properties_collector_factories.size();
jlongArray retVal = env->NewLongArray(static_cast<jsize>(size));
if (retVal == nullptr) {
// exception thrown: OutOfMemoryError
return nullptr;
}
jlong* buf = env->GetLongArrayElements(retVal, NULL);
if (buf == nullptr) {
// exception thrown: OutOfMemoryError
return nullptr;
}

for (int i = 0; i < size; i++) {
auto* wrapper = new TablePropertiesCollectorFactoriesJniWrapper();
wrapper->table_properties_collector_factories =
opt->table_properties_collector_factories[i];
buf[i] = GET_CPLUSPLUS_POINTER(wrapper);
}
env->ReleaseLongArrayElements(retVal, buf, 0);
return retVal;
}

/*
* Class: org_rocksdb_Options
* Method: setTablePropertiesCollectorFactory
* Signature: (J[J)V
*/
void Java_org_rocksdb_Options_setTablePropertiesCollectorFactory(
JNIEnv* env, jclass, jlong jhandle, jlongArray j_factory_handlers) {
auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::Options*>(jhandle);
const auto size = env->GetArrayLength(j_factory_handlers);

jlong* buf = env->GetLongArrayElements(j_factory_handlers, NULL);
if (buf == nullptr) {
// exception thrown: OutOfMemoryError
return;
}

opt->table_properties_collector_factories.clear();
for (int i = 0; i < size; i++) {
auto* wrapper =
reinterpret_cast<TablePropertiesCollectorFactoriesJniWrapper*>(buf[i]);
opt->table_properties_collector_factories.emplace_back(
wrapper->table_properties_collector_factories);
}
env->ReleaseLongArrayElements(j_factory_handlers, buf, JNI_ABORT);
}

//////////////////////////////////////////////////////////////////////////////
// ROCKSDB_NAMESPACE::ColumnFamilyOptions

Expand Down
37 changes: 37 additions & 0 deletions java/rocksjni/table_properties_collector_factory.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//
// Created by rhubner on 23-Oct-23.
//

#include "java/rocksjni/table_properties_collector_factory.h"
#include "java/include/org_rocksdb_TablePropertiesCollectorFactory.h"
#include "java/rocksjni/cplusplus_to_java_convert.h"
#include "rocksdb/db.h"
#include "rocksdb/utilities/table_properties_collectors.h"

/*
* Class: org_rocksdb_TablePropertiesCollectorFactory
* Method: newCompactOnDeletionCollectorFactory
* Signature: (JJD)J
*/
jlong Java_org_rocksdb_TablePropertiesCollectorFactory_newCompactOnDeletionCollectorFactory
(JNIEnv *, jclass, jlong sliding_window_size, jlong deletion_trigger, jdouble deletion_ratio) {

auto* wrapper = new TablePropertiesCollectorFactoriesJniWrapper();
wrapper->table_properties_collector_factories = ROCKSDB_NAMESPACE::NewCompactOnDeletionCollectorFactory(sliding_window_size, deletion_trigger, deletion_ratio);
return GET_CPLUSPLUS_POINTER(wrapper);

}

/*
* Class: org_rocksdb_TablePropertiesCollectorFactory
* Method: deleteCompactOnDeletionCollectorFactory
* Signature: (J)J
*/
void Java_org_rocksdb_TablePropertiesCollectorFactory_deleteCompactOnDeletionCollectorFactory
(JNIEnv *, jclass, jlong jhandle) {

auto instance = reinterpret_cast<TablePropertiesCollectorFactoriesJniWrapper*>(jhandle);
delete instance;

}

14 changes: 14 additions & 0 deletions java/rocksjni/table_properties_collector_factory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
//
// Created by rhubner on 24-Oct-23.
//

#include "rocksdb/utilities/table_properties_collectors.h"
#include "rocksdb/table_properties.h"

#ifndef ROCKSDB_TABLE_PROPERTIES_COLLECTOR_FACTORY_H
#define ROCKSDB_TABLE_PROPERTIES_COLLECTOR_FACTORY_H

struct TablePropertiesCollectorFactoriesJniWrapper {
std::shared_ptr<rocksdb::TablePropertiesCollectorFactory> table_properties_collector_factories;
};
#endif // ROCKSDB_TABLE_PROPERTIES_COLLECTOR_FACTORY_H
33 changes: 33 additions & 0 deletions java/src/main/java/org/rocksdb/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.nio.file.Paths;
import java.util.*;
import java.util.stream.Collectors;

/**
* Options to control the behavior of a database. It will be used
Expand Down Expand Up @@ -2119,6 +2120,35 @@ public PrepopulateBlobCache prepopulateBlobCache() {
return PrepopulateBlobCache.getPrepopulateBlobCache(prepopulateBlobCache(nativeHandle_));
}

/**
* Return copy of TablePropertiesCollectorFactory list. Modifying this list will not change
* underlying options C++ object. {@link #setTablePropertiesCollectorFactory(List)
* setTablePropertiesCollectorFactory} must be called to propagate changes. All instance must be
* properly closed to prevent memory leaks.
* @return copy of TablePropertiesCollectorFactory list.
*/
public List<TablePropertiesCollectorFactory> tablePropertiesCollectorFactory() {
long[] factoryHandlers = tablePropertiesCollectorFactory(nativeHandle_);

return Arrays.stream(factoryHandlers)
.mapToObj(factoryHandle -> TablePropertiesCollectorFactory.newWrapper(factoryHandle))
.collect(Collectors.toList());
}

/**
* Set TablePropertiesCollectorFactory in undulation C++ object.
* This method create its own copy of the list. Caller is responsible for
* closing all the instances in the list.
* @param factories
*/
public void setTablePropertiesCollectorFactory(List<TablePropertiesCollectorFactory> factories) {
long[] factoryHandlers = new long[factories.size()];
for (int i = 0; i < factoryHandlers.length; i++) {
factoryHandlers[i] = factories.get(i).getNativeHandle();
}
setTablePropertiesCollectorFactory(nativeHandle_, factoryHandlers);
}

//
// END options for blobs (integrated BlobDB)
//
Expand Down Expand Up @@ -2563,6 +2593,9 @@ private native void setBlobFileStartingLevel(
private native void setPrepopulateBlobCache(
final long nativeHandle_, final byte prepopulateBlobCache);
private native byte prepopulateBlobCache(final long nativeHandle_);
private static native long[] tablePropertiesCollectorFactory(long nativeHandle);
private static native void setTablePropertiesCollectorFactory(
long nativeHandle, long[] factoryHandlers);

// instance variables
// NOTE: If you add new member variables, please update the copy constructor above!
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.rocksdb;

public abstract class TablePropertiesCollectorFactory extends RocksObject {

private TablePropertiesCollectorFactory(final long nativeHandle) {
super(nativeHandle);
}

public static TablePropertiesCollectorFactory NewCompactOnDeletionCollectorFactory(final long sliding_window_size,
final long deletion_trigger,
final double deletion_ratio) {

//TODO - Call native method to create instance of deletion factory and get pointer.

long handle = newCompactOnDeletionCollectorFactory(sliding_window_size, deletion_trigger, deletion_ratio);
return new TablePropertiesCollectorFactory(handle) {
@Override
protected void disposeInternal(long handle) {
TablePropertiesCollectorFactory.deleteCompactOnDeletionCollectorFactory(handle);
}
};
}

/**
* Internal API. Do not use.
* @param nativeHandle
* @return
*/
static TablePropertiesCollectorFactory newWrapper(final long nativeHandle ) {
return new TablePropertiesCollectorFactory(nativeHandle) {
@Override
protected void disposeInternal(long handle) {
TablePropertiesCollectorFactory.deleteCompactOnDeletionCollectorFactory(handle);
}
};
}

private static native long newCompactOnDeletionCollectorFactory(final long sliding_window_size,
final long deletion_trigger,
final double deletion_ratio);

private static native void deleteCompactOnDeletionCollectorFactory(final long handle);


}
16 changes: 16 additions & 0 deletions java/src/test/java/org/rocksdb/OptionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1496,4 +1496,20 @@ public void onMemTableSealed(final MemTableInfo memTableInfo) {
assertEquals(0, listeners2.size());
}
}
@Test
public void tablePropertiesCollectorFactory() {
try (final Options options = new Options()) {
try (TablePropertiesCollectorFactory collectorFactory =
TablePropertiesCollectorFactory.NewCompactOnDeletionCollectorFactory(10, 10, 1.0)) {
List<TablePropertiesCollectorFactory> factories = List.of(collectorFactory);
options.setTablePropertiesCollectorFactory(factories);
}
List<TablePropertiesCollectorFactory> factories = options.tablePropertiesCollectorFactory();
try {
assertThat(factories).hasSize(1);
} finally {
factories.stream().forEach(TablePropertiesCollectorFactory::close);
}
}
}
}
1 change: 1 addition & 0 deletions src.mk
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,7 @@ JNI_NATIVE_SOURCES = \
java/rocksjni/table.cc \
java/rocksjni/table_filter.cc \
java/rocksjni/table_filter_jnicallback.cc \
java/rocksjni/rocksjni/table_filter_jnicallback.cc \
java/rocksjni/thread_status.cc \
java/rocksjni/trace_writer.cc \
java/rocksjni/trace_writer_jnicallback.cc \
Expand Down

0 comments on commit 6eeb4f5

Please sign in to comment.