From ca212055039b50a7aaf3fcab2d7aa59d7001ed8f Mon Sep 17 00:00:00 2001 From: Nathaniel Davidson Date: Sat, 28 Sep 2024 02:17:34 -0700 Subject: [PATCH] additional reset of consumer --- build.gradle | 2 +- .../com/nucleodb/library/mqs/ConsumerHandler.java | 1 + .../library/mqs/kafka/KafkaConsumerHandler.java | 11 ++--------- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/build.gradle b/build.gradle index ef237ee..3f13e67 100644 --- a/build.gradle +++ b/build.gradle @@ -7,7 +7,7 @@ plugins { } group = 'com.nucleodb' -version = '1.18.16' +version = '1.18.17' repositories { mavenCentral() diff --git a/src/main/java/com/nucleodb/library/mqs/ConsumerHandler.java b/src/main/java/com/nucleodb/library/mqs/ConsumerHandler.java index 6df9195..4823dbe 100644 --- a/src/main/java/com/nucleodb/library/mqs/ConsumerHandler.java +++ b/src/main/java/com/nucleodb/library/mqs/ConsumerHandler.java @@ -53,6 +53,7 @@ public void readFromStart() throws InterruptedException { queue.clear(); startupPhaseConsume.set(true); leftToRead.set(0); + startupItems = -1; queueTasks.shutdownNow(); queueTasks.awaitTermination(4, TimeUnit.SECONDS); queueTasks = Executors.newFixedThreadPool(60); diff --git a/src/main/java/com/nucleodb/library/mqs/kafka/KafkaConsumerHandler.java b/src/main/java/com/nucleodb/library/mqs/kafka/KafkaConsumerHandler.java index 843c6ab..abf3557 100644 --- a/src/main/java/com/nucleodb/library/mqs/kafka/KafkaConsumerHandler.java +++ b/src/main/java/com/nucleodb/library/mqs/kafka/KafkaConsumerHandler.java @@ -176,10 +176,7 @@ public void run() { try { getConsumer().poll(Duration.ofMillis(100)); Thread.sleep(1000); - } catch (InterruptedException e) { -// throw new RuntimeException(e); - e.printStackTrace(); - } + } catch (InterruptedException e) {} } seek(offsets); super.setStartupLoadCount(getDatabase().getStartupLoadCount()); @@ -189,11 +186,7 @@ public void run() { try { getConsumer().poll(Duration.ofMillis(100)); Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - //throw new RuntimeException(e); - - } + } catch (InterruptedException e) {} } seek(offsets); super.setStartupLoadCount(getConnectionHandler().getStartupLoadCount());