add node filter to allow for sharding
firestar committed Aug 17, 2024
1 parent 0bf752e commit f5126fc
Showing 7 changed files with 87 additions and 8 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -7,7 +7,7 @@ plugins {
}

group = 'com.nucleodb'
version = '1.17.4'
version = '1.18.0'

repositories {
mavenCentral()
@@ -1,6 +1,7 @@
package com.nucleodb.library.database.tables.connection;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.nucleodb.library.database.tables.annotation.Conn;
import com.nucleodb.library.database.utils.StartupRun;
import com.nucleodb.library.event.ConnectionEventListener;
import com.nucleodb.library.mqs.config.MQSConfiguration;
@@ -31,6 +32,7 @@ public class ConnectionConfig implements Serializable{
MQSConfiguration mqsConfiguration = new KafkaConfiguration();
Map<String, Object> settingsMap = new TreeMap<>();
String connectionFileName;
NodeFilter nodeFilter = new NodeFilter();
@JsonIgnore
private transient StartupRun startupRun = null;

@@ -182,4 +184,12 @@ public String getConnectionFileName() {
public void setConnectionFileName(String connectionFileName) {
this.connectionFileName = connectionFileName;
}

public NodeFilter getNodeFilter() {
return nodeFilter;
}

public void setNodeFilter(NodeFilter nodeFilter) {
this.nodeFilter = nodeFilter;
}
}
@@ -0,0 +1,18 @@
package com.nucleodb.library.database.tables.connection;

import com.nucleodb.library.database.modifications.*;

public class NodeFilter {
public boolean create(ConnectionCreate c){
return true;
}
public <C extends Connection> boolean delete(ConnectionDelete d, C existing){
return true;
}
public <C extends Connection> boolean update(ConnectionUpdate u, C existing){
return true;
}
public <C extends Connection> boolean accept(String key){
return true;
}
}
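The default filter above accepts every change and key, so nothing is sharded out of the box; sharding comes from swapping in a subclass via the new ConnectionConfig.setNodeFilter(...) setter. A minimal sketch (not part of this commit): a filter that overrides accept(String), the hook the consumer calls with each record key, so a node only ingests connection changes whose key hashes into its shard. The class, its name, and the nodeIndex/nodeCount parameters are illustrative; it sits in the same package with the same wildcard import purely so Connection resolves exactly as it does in the class above.

package com.nucleodb.library.database.tables.connection;

import com.nucleodb.library.database.modifications.*;

// Illustrative subclass: shard connection changes by key hash across a fixed node set.
public class HashShardConnectionFilter extends NodeFilter {
  private final int nodeIndex; // this node's shard index (hypothetical parameter)
  private final int nodeCount; // total nodes consuming the topic (hypothetical parameter)

  public HashShardConnectionFilter(int nodeIndex, int nodeCount) {
    this.nodeIndex = nodeIndex;
    this.nodeCount = nodeCount;
  }

  @Override
  public <C extends Connection> boolean accept(String key) {
    // floorMod keeps the bucket non-negative even when hashCode() is negative
    return Math.floorMod(key.hashCode(), nodeCount) == nodeIndex;
  }
}

A call such as connectionConfig.setNodeFilter(new HashShardConnectionFilter(0, 3)) would then wire it in through the setter added above.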
@@ -553,10 +553,14 @@ private void itemRequeue() {
if (this.startupPhase.get()) this.startupLoadCount.incrementAndGet();
}

public void modify(Modification mod, Object modification) {
public void modify(Modification mod, Object modification) throws ExecutionException {
switch (mod) {
case CREATE:
Create c = (Create) modification;
if(!config.getNodeFilter().create(c)){
consumerResponse(null, c.getChangeUUID());
return;
}
//if(!startupPhase.get()) logger.info("Create statement called");
if (c != null) {
try {
@@ -601,17 +605,23 @@ public void modify(Modification mod, Object modification) {
break;
case DELETE:
Delete d = (Delete) modification;

//if(!startupPhase.get()) logger.info("Delete statement called");
if (d != null) {
try {
itemProcessed();
T de = keyToEntry.get(d.getKey());
if(de!=null && !config.getNodeFilter().delete(d, de)){
consumerResponse(null, d.getChangeUUID());
return;
}
if (this.config.getReadToTime() != null && d.getTime().isAfter(this.config.getReadToTime())) {
consumerResponse(null, d.getChangeUUID());
fireListeners(Modification.DELETE, null);
return;
}

T de = keyToEntry.get(d.getKey());

if (de != null) {
if (de.getVersion() >= d.getVersion()) {
logger.info("Ignore already saved change.");
@@ -70,6 +70,7 @@ public int compareTo(@NotNull Object o) {
MQSConfiguration mqsConfiguration = new KafkaConfiguration();
Map<String, Object> settingsMap = new TreeMap<>();
String tableFileName;
NodeFilter nodeFilter = new NodeFilter();
@JsonIgnore
private transient StartupRun startupRun = null;

@@ -220,4 +221,12 @@ public long getExportInterval() {
public void setExportInterval(long exportInterval) {
this.exportInterval = exportInterval;
}

public NodeFilter getNodeFilter() {
return nodeFilter;
}

public void setNodeFilter(NodeFilter nodeFilter) {
this.nodeFilter = nodeFilter;
}
}
@@ -0,0 +1,18 @@
package com.nucleodb.library.database.tables.table;

import com.nucleodb.library.database.modifications.*;

public class NodeFilter {
public boolean create(Create c){
return true;
}
public <T extends DataEntry> boolean delete(Delete d, T existing){
return true;
}
public <T extends DataEntry> boolean update(Update u, T existing){
return true;
}
public <T extends DataEntry> boolean accept(String key){
return true;
}
}
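The table-side default is the same pass-through; a data table could be sharded in the same way by overriding accept(String) and registering the filter through the setNodeFilter setter added to the table config in this commit. The subclass below mirrors the connection-side sketch and is equally illustrative, with the same same-package/same-import placement so DataEntry resolves as it does in the class above.

package com.nucleodb.library.database.tables.table;

import com.nucleodb.library.database.modifications.*;

// Illustrative subclass: keep only the DataEntry keys that belong to this node's shard.
public class HashShardTableFilter extends NodeFilter {
  private final int nodeIndex; // hypothetical shard index
  private final int nodeCount; // hypothetical node count

  public HashShardTableFilter(int nodeIndex, int nodeCount) {
    this.nodeIndex = nodeIndex;
    this.nodeCount = nodeCount;
  }

  @Override
  public <T extends DataEntry> boolean accept(String key) {
    return Math.floorMod(key.hashCode(), nodeCount) == nodeIndex;
  }
}

Wiring would look like tableConfig.setNodeFilter(new HashShardTableFilter(0, 3)); the table config class name is not visible in this hunk, so that call site is only assumed.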
@@ -182,7 +182,7 @@ public void run() {
Map<Integer, OffsetAndMetadata> offsetMetaMap = new HashMap<>();
try {
do {
ConsumerRecords<Integer, String> rs = getConsumer().poll(Duration.ofMillis(1000));
ConsumerRecords<String, String> rs = getConsumer().poll(Duration.ofMillis(1000));
if (rs.count() > 0) {
Map<Integer, Long> finalOffsets = offsets;
rs.iterator().forEachRemaining(action -> {
@@ -191,11 +191,25 @@
if (getStartupPhaseConsume().get()) getStartupLoadCount().incrementAndGet();
String pop = action.value();
//System.out.println("Change added to queue.");
getQueue().add(pop);
getLeftToRead().incrementAndGet();
synchronized (getQueue()) {
getQueue().notifyAll();
if(connectionType){
if(this.getConnectionHandler().getConfig().getNodeFilter().accept(action.key())){
getQueue().add(pop);
getLeftToRead().incrementAndGet();
synchronized (getQueue()) {
getQueue().notifyAll();
}
}
}
if(databaseType){
if(this.getDatabase().getConfig().getNodeFilter().accept(action.key())){
getQueue().add(pop);
getLeftToRead().incrementAndGet();
synchronized (getQueue()) {
getQueue().notifyAll();
}
}
}

if (saveConnection)
this.getConnectionHandler().getPartitionOffsets().put(action.partition(), action.offset());
if (saveDatabase)
