diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index 76373b37dd..19bb2c8159 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -123,6 +123,7 @@ public class S3Storage implements Storage { ThreadUtils.createThreadFactory("storage-timeout-detect", true), 1, TimeUnit.SECONDS, 100); private long lastLogTimestamp = 0L; private volatile double maxDataWriteRate = 0.0; + private volatile boolean shutdown; private final AtomicLong pendingUploadBytes = new AtomicLong(0L); @@ -377,6 +378,7 @@ void recover0(WriteAheadLog deltaWAL, StreamManager streamManager, ObjectManager @Override public void shutdown() { + shutdown = true; drainBackoffTask.cancel(false); for (WalWriteRequest request : backoffRecords) { request.cf.completeExceptionally(new IOException("S3Storage is shutdown")); @@ -417,7 +419,7 @@ public CompletableFuture append(AppendContext context, StreamRecordBatch s * @return backoff status. */ public boolean append0(AppendContext context, WalWriteRequest request, boolean fromBackoff) { - // TODO: storage status check, fast fail the request when storage closed. + checkStatus(request); if (!fromBackoff && !backoffRecords.isEmpty()) { backoffRecords.offer(request); return true; @@ -574,6 +576,12 @@ private void handleTimeout(Timeout timeout, long streamId, long startOffset, lon } } + public void checkStatus(WalWriteRequest request) { + if (shutdown) { + request.cf.completeExceptionally(new IOException("S3Storage is shutdown")); + } + } + private void continuousCheck(List records) { long expectStartOffset = -1L; for (StreamRecordBatch record : records) {