Skip to content

Commit

Permalink
fix(interactive): Support concurrent compact operation (#3706)
Browse files Browse the repository at this point in the history
  • Loading branch information
bufapiqi authored Apr 12, 2024
1 parent 60e3e31 commit c30b34a
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,7 @@ public class StoreConfig {

public static final Config<String> STORE_WAL_DIR =
Config.stringConfig("store.rocksdb.wal.dir", "");

public static final Config<Integer> STORE_COMPACT_THREAD_NUM =
Config.intConfig("store.compact.thread.num", 1);
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class StoreService implements MetricsAgent {
private final Configs storeConfigs;
private final int storeId;
private final int writeThreadCount;
private final int compactThreadCount;
private final MetaService metaService;
private Map<Integer, GraphPartition> idToPartition;
private ExecutorService writeExecutor;
Expand All @@ -84,6 +85,7 @@ public StoreService(
this.storeId = CommonConfig.NODE_IDX.get(storeConfigs);
this.enableGc = StoreConfig.STORE_GC_ENABLE.get(storeConfigs);
this.writeThreadCount = StoreConfig.STORE_WRITE_THREAD_COUNT.get(storeConfigs);
this.compactThreadCount = StoreConfig.STORE_COMPACT_THREAD_NUM.get(storeConfigs);
this.metaService = metaService;
this.isSecondary = CommonConfig.SECONDARY_INSTANCE_ENABLED.get(storeConfigs);
metricsCollector.register(this, this::updateMetrics);
Expand Down Expand Up @@ -121,13 +123,18 @@ public void start() throws IOException {
new LinkedBlockingQueue<>(1),
ThreadFactoryUtils.daemonThreadFactoryWithLogExceptionHandler(
"store-ingest", logger));
int partitionCount = partitionIds.size();
int compactQueueLength =
partitionCount - this.compactThreadCount <= 0
? 1
: partitionCount - this.compactThreadCount;
this.compactExecutor =
new ThreadPoolExecutor(
1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1),
this.compactThreadCount,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(compactQueueLength),
ThreadFactoryUtils.daemonThreadFactoryWithLogExceptionHandler(
"store-compact", logger));
this.garbageCollectExecutor =
Expand Down Expand Up @@ -407,21 +414,36 @@ public void compactDB(CompletionCallback<Void> callback) {
callback.onCompleted(null);
return;
}
this.compactExecutor.execute(
() -> {
logger.info("compact DB");
try {
for (GraphPartition partition : this.idToPartition.values()) {
int partitionCount = this.idToPartition.values().size();
CountDownLatch compactCountDownLatch = new CountDownLatch(partitionCount);
AtomicInteger successCompactJobCount = new AtomicInteger(partitionCount);
logger.info("compact DB");
for (GraphPartition partition : this.idToPartition.values()) {
this.compactExecutor.execute(
() -> {
try {
partition.compact();
logger.info("Compaction {} partition finished", partition.getId());
successCompactJobCount.decrementAndGet();
} catch (Exception e) {
logger.error("compact DB failed", e);
} finally {
compactCountDownLatch.countDown();
}
logger.info("compact DB finished");
callback.onCompleted(null);
} catch (Exception e) {
logger.error("compact DB failed", e);
callback.onError(e);
}
});
});
}

try {
compactCountDownLatch.await();
} catch (InterruptedException e) {
logger.error("compact DB has been InterruptedException", e);
}

if (successCompactJobCount.get() > 0) {
callback.onError(new Exception("not all partition compact success. please check log."));
} else {
callback.onCompleted(null);
}
}

public void tryCatchUpWithPrimary() throws IOException {
Expand Down

0 comments on commit c30b34a

Please sign in to comment.