Skip to content

Commit

Permalink
wal support asynchronous write
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaochen-zhou committed Apr 3, 2024
1 parent aded562 commit 76d77b5
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.seatunnel.engine.imap.storage.api.common;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class ImapStorageThreadFactory implements ThreadFactory {
private final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);

private final String namePrefix;

public ImapStorageThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "Imap-StorageThread-" + poolNumber.getAndIncrement() + "-thread-";
}

@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement(), 0);
if (thread.isDaemon()) {
thread.setDaemon(false);
}
if (thread.getPriority() != Thread.NORM_PRIORITY) {
thread.setPriority(Thread.NORM_PRIORITY);
}
return thread;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData;
import org.apache.seatunnel.engine.imap.storage.file.common.FileConstants;
import org.apache.seatunnel.engine.imap.storage.file.common.WALReader;
import org.apache.seatunnel.engine.imap.storage.file.common.WALSyncType;
import org.apache.seatunnel.engine.imap.storage.file.config.AbstractConfiguration;
import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration;
import org.apache.seatunnel.engine.imap.storage.file.disruptor.WALDisruptor;
Expand Down Expand Up @@ -71,6 +72,8 @@ public class IMapFileStorage implements IMapStorage {

private static final String STORAGE_TYPE_KEY = "storage.type";

private static final String WAL_SYNC_TYPE_KEY = "wal.sync.type";

public FileSystem fs;

public String namespace;
Expand Down Expand Up @@ -107,6 +110,7 @@ public class IMapFileStorage implements IMapStorage {
public static final long DEFAULT_WRITE_DATA_TIMEOUT_MILLISECONDS = 1000 * 60;

private Configuration conf;
private WALSyncType walSyncType;

private FileConfiguration fileConfiguration;

Expand All @@ -120,9 +124,16 @@ public void initialize(Map<String, Object> configuration) {

String storageType =
String.valueOf(
configuration.getOrDefault(
STORAGE_TYPE_KEY, FileConfiguration.HDFS.toString()));
this.fileConfiguration = FileConfiguration.valueOf(storageType.toUpperCase());
configuration.getOrDefault(
STORAGE_TYPE_KEY, FileConfiguration.HDFS.toString()))
.toUpperCase();
String walSyncMethod =
String.valueOf(
configuration.getOrDefault(
WAL_SYNC_TYPE_KEY, WALSyncType.SYNC.toString()))
.toUpperCase();
this.walSyncType = WALSyncType.valueOf(walSyncMethod);
this.fileConfiguration = FileConfiguration.valueOf(storageType);
// build configuration
AbstractConfiguration fileConfiguration = this.fileConfiguration.getConfiguration();

Expand Down Expand Up @@ -156,7 +167,8 @@ public void initialize(Map<String, Object> configuration) {
this.walDisruptor =
new WALDisruptor(
fs,
FileConfiguration.valueOf(storageType.toUpperCase()),
FileConfiguration.valueOf(storageType),
WALSyncType.valueOf(walSyncMethod),
businessRootPath + region + DEFAULT_IMAP_FILE_PATH_SPLIT,
serializer);
}
Expand Down Expand Up @@ -306,6 +318,9 @@ private long sendToDisruptorQueue(IMapFileData data, WALEventType type) {
}

private boolean queryExecuteStatus(long requestId) {
if (WALSyncType.ASYNC == walSyncType) {
return true;
}
return queryExecuteStatus(requestId, this.writDataTimeoutMilliseconds);
}

Expand All @@ -327,17 +342,8 @@ private boolean queryExecuteStatus(long requestId, long timeout) {
private Set<Object> batchQueryExecuteFailsStatus(
Map<Long, Object> requestMap, Set<Object> failures) {
for (Map.Entry<Long, Object> entry : requestMap.entrySet()) {
boolean success = false;
RequestFuture requestFuture = RequestFutureCache.get(entry.getKey());
try {
if (requestFuture.isDone() || Boolean.TRUE.equals(requestFuture.get())) {
success = true;
}
} catch (Exception e) {
log.error("wait for write status error", e);
} finally {
RequestFutureCache.remove(entry.getKey());
}
Long requestId = entry.getKey();
boolean success = queryExecuteStatus(requestId);
if (!success) {
failures.add(entry.getValue());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.seatunnel.engine.imap.storage.file.common;

public enum WALSyncType {
SYNC,
ASYNC
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package org.apache.seatunnel.engine.imap.storage.file.common;

import org.apache.seatunnel.engine.imap.storage.api.common.ImapStorageThreadFactory;
import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData;
import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration;
import org.apache.seatunnel.engine.imap.storage.file.wal.DiscoveryWalFileFactory;
Expand All @@ -29,29 +30,85 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Slf4j
public class WALWriter implements AutoCloseable {

IFileWriter writer;

private ExecutorService walWriterService;

Future<?> writeTaskFuture;

private static final int DEFAULT_THREAD_POOL_MIN_SIZE =
Runtime.getRuntime().availableProcessors() * 2 + 1;

private static final int DEFAULT_THREAD_POOL_MAX_SIZE =
Runtime.getRuntime().availableProcessors() * 4 + 1;

private static final int DEFAULT_THREAD_POOL_QUENE_SIZE = 1024;

private final WALSyncType walSyncType;

public WALWriter(
FileSystem fs,
FileConfiguration fileConfiguration,
WALSyncType walSyncType,
Path parentPath,
Serializer serializer)
throws IOException {
this.writer = DiscoveryWalFileFactory.getWriter(fileConfiguration.getName());
this.writer.setBlockSize(fileConfiguration.getConfiguration().getBlockSize());
this.writer.initialize(fs, parentPath, serializer);
this.walSyncType = walSyncType;
if (WALSyncType.ASYNC == walSyncType) {
this.walWriterService =
new ThreadPoolExecutor(
DEFAULT_THREAD_POOL_MIN_SIZE,
DEFAULT_THREAD_POOL_MAX_SIZE,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(DEFAULT_THREAD_POOL_QUENE_SIZE),
new ImapStorageThreadFactory());
}
}

public void write(IMapFileData data) throws IOException {
this.writer.write(data);

switch (walSyncType) {
case SYNC:
this.writer.write(data);
return;
case ASYNC:
writeTaskFuture =
this.walWriterService.submit(
() -> {
try {
this.writer.write(data);
} catch (Exception e) {
log.error(String.format("store imap failed : %s", data), e);
}
});
return;
}
}

@Override
public void close() throws Exception {
if (WALSyncType.ASYNC == walSyncType && writeTaskFuture != null) {
writeTaskFuture.cancel(false);
if (walWriterService != null) {
walWriterService.shutdown();
}
}
this.writer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException;
import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData;
import org.apache.seatunnel.engine.imap.storage.file.common.WALSyncType;
import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration;
import org.apache.seatunnel.engine.serializer.api.Serializer;

Expand Down Expand Up @@ -62,6 +63,7 @@ public class WALDisruptor implements Closeable {
public WALDisruptor(
FileSystem fs,
FileConfiguration fileConfiguration,
WALSyncType walSyncType,
String parentPath,
Serializer serializer) {
// todo should support multi thread producer
Expand All @@ -73,9 +75,8 @@ public WALDisruptor(
threadFactory,
ProducerType.SINGLE,
new BlockingWaitStrategy());

disruptor.handleEventsWithWorkerPool(
new WALWorkHandler(fs, fileConfiguration, parentPath, serializer));
new WALWorkHandler(fs, fileConfiguration, walSyncType, parentPath, serializer));

disruptor.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException;
import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData;
import org.apache.seatunnel.engine.imap.storage.file.common.WALSyncType;
import org.apache.seatunnel.engine.imap.storage.file.common.WALWriter;
import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration;
import org.apache.seatunnel.engine.imap.storage.file.future.RequestFutureCache;
Expand All @@ -44,10 +45,13 @@ public class WALWorkHandler implements WorkHandler<FileWALEvent> {
public WALWorkHandler(
FileSystem fs,
FileConfiguration fileConfiguration,
WALSyncType walSyncType,
String parentPath,
Serializer serializer) {
try {
writer = new WALWriter(fs, fileConfiguration, new Path(parentPath), serializer);
writer =
new WALWriter(
fs, fileConfiguration, walSyncType, new Path(parentPath), serializer);
} catch (IOException e) {
throw new IMapStorageException(
e, "create new current writer failed, parent path is %s", parentPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ public void setBlockSize(Long blockSize) {
}
}

// TODO Synchronous write, asynchronous write can be added in the future
@Override
public void write(IMapFileData data) throws IOException {
byte[] bytes = serializer.serialize(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ public static void init() throws IOException {
}

@Test
public void testWriterAndReader() throws Exception {
WALWriter writer = new WALWriter(FS, FileConfiguration.HDFS, PARENT_PATH, SERIALIZER);
public void testSyncWriterAndReader() throws Exception {
WALWriter writer =
new WALWriter(
FS, FileConfiguration.HDFS, WALSyncType.SYNC, PARENT_PATH, SERIALIZER);
IMapFileData data;
boolean isDelete;
for (int i = 0; i < 1024; i++) {
Expand Down Expand Up @@ -114,6 +116,61 @@ public void testWriterAndReader() throws Exception {
Assertions.assertNull(result.get("key519"));
}

@Test
public void testAsyncWriterAndReader() throws Exception {
WALWriter writer =
new WALWriter(
FS, FileConfiguration.HDFS, WALSyncType.ASYNC, PARENT_PATH, SERIALIZER);
IMapFileData data;
boolean isDelete;
for (int i = 0; i < 1024; i++) {
data =
IMapFileData.builder()
.key(SERIALIZER.serialize("key" + i))
.keyClassName(String.class.getName())
.value(SERIALIZER.serialize("value" + i))
.valueClassName(Integer.class.getName())
.timestamp(System.nanoTime())
.build();
if (i % 2 == 0) {
isDelete = true;
data.setKey(SERIALIZER.serialize(i));
data.setKeyClassName(Integer.class.getName());
} else {
isDelete = false;
}
data.setDeleted(isDelete);

writer.write(data);
}
// update key 511
data =
IMapFileData.builder()
.key(SERIALIZER.serialize("key" + 511))
.keyClassName(String.class.getName())
.value(SERIALIZER.serialize("Kristen"))
.valueClassName(String.class.getName())
.deleted(false)
.timestamp(System.nanoTime())
.build();
writer.write(data);
// delete key 519
data =
IMapFileData.builder()
.key(SERIALIZER.serialize("key" + 519))
.keyClassName(String.class.getName())
.deleted(true)
.timestamp(System.nanoTime())
.build();

writer.write(data);
writer.close();
await().atMost(10, java.util.concurrent.TimeUnit.SECONDS).await();

WALReader reader = new WALReader(FS, FileConfiguration.HDFS, new ProtoStuffSerializer());
Map<Object, Object> result = reader.loadAllData(PARENT_PATH, new HashSet<>());
}

@AfterAll
public static void close() throws IOException {
FS.delete(PARENT_PATH, true);
Expand Down
Loading

0 comments on commit 76d77b5

Please sign in to comment.