From ae78918e05bee4156b13dd332cad32a7591a0eac Mon Sep 17 00:00:00 2001 From: Nathaniel Davidson Date: Sat, 14 Sep 2024 01:44:11 -0700 Subject: [PATCH] add lock manager to consumer --- build.gradle | 2 +- .../library/database/lock/LockManager.java | 1 + .../nucleodb/library/mqs/QueueHandler.java | 2 +- .../mqs/kafka/KafkaConsumerHandler.java | 14 ++++++++--- .../library/mqs/kafka/KafkaSettings.java | 25 +++++++++++-------- 5 files changed, 28 insertions(+), 16 deletions(-) diff --git a/build.gradle b/build.gradle index c205d00..6979e67 100644 --- a/build.gradle +++ b/build.gradle @@ -7,7 +7,7 @@ plugins { } group = 'com.nucleodb' -version = '1.18.2' +version = '1.18.3' repositories { mavenCentral() diff --git a/src/main/java/com/nucleodb/library/database/lock/LockManager.java b/src/main/java/com/nucleodb/library/database/lock/LockManager.java index 9c89e36..80c195d 100644 --- a/src/main/java/com/nucleodb/library/database/lock/LockManager.java +++ b/src/main/java/com/nucleodb/library/database/lock/LockManager.java @@ -172,6 +172,7 @@ void addToQueue(LockReference lockReference){ public void push(LockReference lockReference) { try { String key = String.format("%s_%s", lockReference.getTableName(), lockReference.getKey()); + log(key, "Lock Request Sent To Cluster"); producerHandler.push(key, Serializer.getObjectMapper().getOm().writeValueAsString(lockReference)); } catch (JsonProcessingException e) { e.printStackTrace(); diff --git a/src/main/java/com/nucleodb/library/mqs/QueueHandler.java b/src/main/java/com/nucleodb/library/mqs/QueueHandler.java index f9a2940..b71d328 100644 --- a/src/main/java/com/nucleodb/library/mqs/QueueHandler.java +++ b/src/main/java/com/nucleodb/library/mqs/QueueHandler.java @@ -55,7 +55,7 @@ public void run() { } } }else if(lockdownType){ - //logger.info("processing lockdown"); + logger.info("processing lockdown"); this.consumerHandler.getLockManager().lockAction( Serializer.getObjectMapper().getOm().readValue(entry, LockReference.class) ); diff --git a/src/main/java/com/nucleodb/library/mqs/kafka/KafkaConsumerHandler.java b/src/main/java/com/nucleodb/library/mqs/kafka/KafkaConsumerHandler.java index 4bedcc5..8e0a856 100644 --- a/src/main/java/com/nucleodb/library/mqs/kafka/KafkaConsumerHandler.java +++ b/src/main/java/com/nucleodb/library/mqs/kafka/KafkaConsumerHandler.java @@ -159,8 +159,7 @@ public void run() { } seek(offsets); super.setStartupLoadCount(getDatabase().getStartupLoadCount()); - } - if (connectionType) { + }else if (connectionType) { offsets = getConnectionHandler().getPartitionOffsets(); while (assigned.size() < offsets.size()) { try { @@ -174,8 +173,7 @@ public void run() { } seek(offsets); super.setStartupLoadCount(getConnectionHandler().getStartupLoadCount()); - } - if (lockManagerType) { + }else if (lockManagerType) { offsets = new HashMap<>(); super.setStartupLoadCount(new AtomicInteger(0)); } @@ -210,6 +208,14 @@ public void run() { } } + if(lockManagerType){ + getQueue().add(pop); + getLeftToRead().incrementAndGet(); + synchronized (getQueue()) { + getQueue().notifyAll(); + } + } + if (saveConnection) this.getConnectionHandler().getPartitionOffsets().put(action.partition(), action.offset()); if (saveDatabase) diff --git a/src/main/java/com/nucleodb/library/mqs/kafka/KafkaSettings.java b/src/main/java/com/nucleodb/library/mqs/kafka/KafkaSettings.java index dc6112d..2e06c42 100644 --- a/src/main/java/com/nucleodb/library/mqs/kafka/KafkaSettings.java +++ b/src/main/java/com/nucleodb/library/mqs/kafka/KafkaSettings.java @@ -16,20 +16,25 @@ public class KafkaSettings extends MQSSettings{ public KafkaSettings(Map objs) { super(objs); - if(objs.containsKey("servers")) { - this.servers = (String) objs.get("servers"); + Object servers = objs.get("servers"); + Object groupName = objs.get("groupName"); + Object partitions = objs.get("partitions"); + Object replicas = objs.get("replicas"); + Object offsetReset = objs.get("offsetReset"); + if(servers!=null) { + this.servers = (String) servers; } - if(objs.containsKey("groupName")) { - this.groupName = (String) objs.get("groupName"); + if(groupName!=null) { + this.groupName = (String) groupName; } - if(objs.containsKey("partitions")) { - this.partitions = (int) objs.get("partitions"); + if(partitions!=null) { + this.partitions = (int) partitions; } - if(objs.containsKey("replicas")) { - this.replicas = (int) objs.get("replicas"); + if(replicas!=null) { + this.replicas = (int) replicas; } - if(objs.containsKey("offsetReset")) { - this.offsetReset = (String) objs.get("offsetReset"); + if(offsetReset!=null) { + this.offsetReset = (String) offsetReset; } }