Skip to content

Commit

Permalink
add reset on table
Browse files Browse the repository at this point in the history
  • Loading branch information
firestar committed Sep 26, 2024
1 parent 7dcbfec commit 5582ea9
Showing 1 changed file with 15 additions and 3 deletions.
18 changes: 15 additions & 3 deletions src/main/java/com/nucleodb/library/mqs/ConsumerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import com.nucleodb.library.mqs.config.MQSSettings;
import com.nucleodb.library.mqs.exceptions.RequiredMethodNotImplementedException;

import java.util.Queue;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -29,20 +29,32 @@ public class ConsumerHandler implements Runnable{

private MQSSettings settings;

private Queue<Thread> queueThreads = Queues.newLinkedBlockingQueue();


public ConsumerHandler(MQSSettings settings, String table) {
this.table = table;
this.settings = settings;
}
public void start(int queues){
for (int x = 0; x < queues; x++)
new Thread(new QueueHandler(this)).start();
for (int x = 0; x < queues; x++) {
Thread queueThread = new Thread(new QueueHandler(this));
queueThreads.add(queueThread);
queueThread.start();
}
}

public void readFromStart(){
startupLoadCount.set(0);
queue.clear();
startupPhaseConsume.set(true);
leftToRead.set(0);
Thread queueThread;
while((queueThread = queueThreads.poll())!=null) {
try {
queueThread.interrupt();
}catch (Exception e){}
}
}

@Override
Expand Down

0 comments on commit 5582ea9

Please sign in to comment.