From 0a86a4d966140f842a06b3264e73c2d7f38d6bb7 Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Sat, 3 Aug 2024 11:35:40 +0800 Subject: [PATCH 1/5] feat(s3stream):add storage status check --- s3stream/src/main/java/com/automq/stream/s3/S3Storage.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index 76373b37dd..d191fc6194 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -123,6 +123,7 @@ public class S3Storage implements Storage { ThreadUtils.createThreadFactory("storage-timeout-detect", true), 1, TimeUnit.SECONDS, 100); private long lastLogTimestamp = 0L; private volatile double maxDataWriteRate = 0.0; + private volatile boolean shutdown; private final AtomicLong pendingUploadBytes = new AtomicLong(0L); @@ -377,6 +378,7 @@ void recover0(WriteAheadLog deltaWAL, StreamManager streamManager, ObjectManager @Override public void shutdown() { + shutdown = true; drainBackoffTask.cancel(false); for (WalWriteRequest request : backoffRecords) { request.cf.completeExceptionally(new IOException("S3Storage is shutdown")); @@ -417,7 +419,9 @@ public CompletableFuture append(AppendContext context, StreamRecordBatch s * @return backoff status. */ public boolean append0(AppendContext context, WalWriteRequest request, boolean fromBackoff) { - // TODO: storage status check, fast fail the request when storage closed. + if (shutdown) { + LOGGER.warn("S3Storage is shutdown"); + } if (!fromBackoff && !backoffRecords.isEmpty()) { backoffRecords.offer(request); return true; From a4dbfa1c1863cf57cf50e9c559378da72a074492 Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Sat, 3 Aug 2024 11:55:26 +0800 Subject: [PATCH 2/5] feat(s3stream):add storage status check --- .../src/main/java/com/automq/stream/s3/S3Storage.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index d191fc6194..d5076f8583 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -419,9 +419,7 @@ public CompletableFuture append(AppendContext context, StreamRecordBatch s * @return backoff status. */ public boolean append0(AppendContext context, WalWriteRequest request, boolean fromBackoff) { - if (shutdown) { - LOGGER.warn("S3Storage is shutdown"); - } + checkStatus(); if (!fromBackoff && !backoffRecords.isEmpty()) { backoffRecords.offer(request); return true; @@ -578,6 +576,12 @@ private void handleTimeout(Timeout timeout, long streamId, long startOffset, lon } } + public void checkStatus() { + if (shutdown) { + new IOException("S3Storage is shutdown"); + } + } + private void continuousCheck(List records) { long expectStartOffset = -1L; for (StreamRecordBatch record : records) { From 8895f450fe073161bd09343c7355bd1f6cf024cd Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Sun, 4 Aug 2024 18:33:23 +0800 Subject: [PATCH 3/5] feat(s3stream):add storage status check --- s3stream/src/main/java/com/automq/stream/s3/S3Storage.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index d5076f8583..24a2ea0cc7 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -56,6 +56,7 @@ import java.util.PriorityQueue; import java.util.Queue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; @@ -578,7 +579,7 @@ private void handleTimeout(Timeout timeout, long streamId, long startOffset, lon public void checkStatus() { if (shutdown) { - new IOException("S3Storage is shutdown"); + new CompletionException(new IOException("S3Storage is shutdown")); } } From 7efc0136b0e581edac78d0ae0daae1587b383a25 Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Tue, 6 Aug 2024 10:26:08 +0800 Subject: [PATCH 4/5] feat(s3stream): add storage status check --- s3stream/src/main/java/com/automq/stream/s3/S3Storage.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index 24a2ea0cc7..19bb2c8159 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -56,7 +56,6 @@ import java.util.PriorityQueue; import java.util.Queue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; @@ -420,7 +419,7 @@ public CompletableFuture append(AppendContext context, StreamRecordBatch s * @return backoff status. */ public boolean append0(AppendContext context, WalWriteRequest request, boolean fromBackoff) { - checkStatus(); + checkStatus(request); if (!fromBackoff && !backoffRecords.isEmpty()) { backoffRecords.offer(request); return true; @@ -577,9 +576,9 @@ private void handleTimeout(Timeout timeout, long streamId, long startOffset, lon } } - public void checkStatus() { + public void checkStatus(WalWriteRequest request) { if (shutdown) { - new CompletionException(new IOException("S3Storage is shutdown")); + request.cf.completeExceptionally(new IOException("S3Storage is shutdown")); } } From bd28a93a34864005152339cffdf0ff62baeb86ef Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Wed, 14 Aug 2024 00:05:00 +0800 Subject: [PATCH 5/5] feat(s3stream):add storage status check