Skip to content

Commit

Permalink
add lock manager to consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
firestar committed Sep 14, 2024
1 parent ff49f94 commit ae78918
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 16 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ plugins {
}

group = 'com.nucleodb'
version = '1.18.2'
version = '1.18.3'

repositories {
mavenCentral()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ void addToQueue(LockReference lockReference){
public void push(LockReference lockReference) {
try {
String key = String.format("%s_%s", lockReference.getTableName(), lockReference.getKey());
log(key, "Lock Request Sent To Cluster");
producerHandler.push(key, Serializer.getObjectMapper().getOm().writeValueAsString(lockReference));
} catch (JsonProcessingException e) {
e.printStackTrace();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/nucleodb/library/mqs/QueueHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void run() {
}
}
}else if(lockdownType){
//logger.info("processing lockdown");
logger.info("processing lockdown");
this.consumerHandler.getLockManager().lockAction(
Serializer.getObjectMapper().getOm().readValue(entry, LockReference.class)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,7 @@ public void run() {
}
seek(offsets);
super.setStartupLoadCount(getDatabase().getStartupLoadCount());
}
if (connectionType) {
}else if (connectionType) {
offsets = getConnectionHandler().getPartitionOffsets();
while (assigned.size() < offsets.size()) {
try {
Expand All @@ -174,8 +173,7 @@ public void run() {
}
seek(offsets);
super.setStartupLoadCount(getConnectionHandler().getStartupLoadCount());
}
if (lockManagerType) {
}else if (lockManagerType) {
offsets = new HashMap<>();
super.setStartupLoadCount(new AtomicInteger(0));
}
Expand Down Expand Up @@ -210,6 +208,14 @@ public void run() {
}
}

if(lockManagerType){
getQueue().add(pop);
getLeftToRead().incrementAndGet();
synchronized (getQueue()) {
getQueue().notifyAll();
}
}

if (saveConnection)
this.getConnectionHandler().getPartitionOffsets().put(action.partition(), action.offset());
if (saveDatabase)
Expand Down
25 changes: 15 additions & 10 deletions src/main/java/com/nucleodb/library/mqs/kafka/KafkaSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,25 @@ public class KafkaSettings extends MQSSettings{

public KafkaSettings(Map<String, Object> objs) {
super(objs);
if(objs.containsKey("servers")) {
this.servers = (String) objs.get("servers");
Object servers = objs.get("servers");
Object groupName = objs.get("groupName");
Object partitions = objs.get("partitions");
Object replicas = objs.get("replicas");
Object offsetReset = objs.get("offsetReset");
if(servers!=null) {
this.servers = (String) servers;
}
if(objs.containsKey("groupName")) {
this.groupName = (String) objs.get("groupName");
if(groupName!=null) {
this.groupName = (String) groupName;
}
if(objs.containsKey("partitions")) {
this.partitions = (int) objs.get("partitions");
if(partitions!=null) {
this.partitions = (int) partitions;
}
if(objs.containsKey("replicas")) {
this.replicas = (int) objs.get("replicas");
if(replicas!=null) {
this.replicas = (int) replicas;
}
if(objs.containsKey("offsetReset")) {
this.offsetReset = (String) objs.get("offsetReset");
if(offsetReset!=null) {
this.offsetReset = (String) offsetReset;
}
}

Expand Down

0 comments on commit ae78918

Please sign in to comment.