Skip to content

Commit

Permalink
feat(core): reuse unregistered node when requesting for next node id (#…
Browse files Browse the repository at this point in the history
…2200)

* feat(core): reuse unregistered node when requesting for next node id (#2183)

Signed-off-by: Shichao Nie <[email protected]>

* fix(core): fix getting duplicated node id

Signed-off-by: Shichao Nie <[email protected]>

---------

Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Dec 2, 2024
1 parent d6641d7 commit 6217fb1
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.ControllerRegistrationRequestData;
import org.apache.kafka.common.message.GetKVsRequestData;
import org.apache.kafka.common.message.PutKVsRequestData;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
Expand All @@ -39,6 +41,7 @@
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.stream.KVControlManager;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
Expand All @@ -59,8 +62,10 @@

import org.slf4j.Logger;

import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -100,11 +105,17 @@ static class Builder {

// AutoMQ for Kafka inject start
private List<String> quorumVoters;
private KVControlManager kvControlManager;

// Sets the quorum voter connection strings; used later to derive the max controller id.
Builder setQuorumVoters(List<String> quorumVoters) {
this.quorumVoters = quorumVoters;
return this;
}

// Injects the KV manager used to persist the reusable node id set in cluster metadata.
Builder setKVControlManager(KVControlManager kvControlManager) {
this.kvControlManager = kvControlManager;
return this;
}
// AutoMQ for Kafka inject end

Builder setLogContext(LogContext logContext) {
Expand Down Expand Up @@ -180,7 +191,10 @@ ClusterControlManager build() {
featureControl,
zkMigrationEnabled,
brokerUncleanShutdownHandler,
quorumVoters
// AutoMQ inject start
quorumVoters,
kvControlManager
// AutoMQ inject end
);
}
}
Expand Down Expand Up @@ -292,6 +306,12 @@ boolean check() {
* The real next available node id is generally one greater than this value.
*/
private AtomicInteger nextNodeId = new AtomicInteger(-1);

/**
 * KV manager used to persist, under {@code REUSABLE_NODE_IDS_KEY}, the set of node IDs
 * that have been unregistered and can be reused for new node assignments.
 */
private final KVControlManager kvControlManager;
private static final String REUSABLE_NODE_IDS_KEY = "__automq_reusable_node_ids/";
// AutoMQ for Kafka inject end

private ClusterControlManager(
Expand All @@ -304,7 +324,10 @@ private ClusterControlManager(
FeatureControlManager featureControl,
boolean zkMigrationEnabled,
BrokerUncleanShutdownHandler brokerUncleanShutdownHandler,
List<String> quorumVoters
// AutoMQ inject start
List<String> quorumVoters,
KVControlManager kvControlManager
// AutoMQ inject end
) {
this.logContext = logContext;
this.clusterId = clusterId;
Expand All @@ -323,6 +346,7 @@ private ClusterControlManager(
this.brokerUncleanShutdownHandler = brokerUncleanShutdownHandler;
// AutoMQ for Kafka inject start
this.maxControllerId = QuorumConfig.parseVoterConnections(quorumVoters).keySet().stream().max(Integer::compareTo).orElse(0);
this.kvControlManager = kvControlManager;
// AutoMQ for Kafka inject end
}

Expand Down Expand Up @@ -369,16 +393,73 @@ boolean zkRegistrationAllowed() {

// AutoMQ for Kafka inject start
/**
 * Computes the next node id to hand out to a new node.
 *
 * <p>Prefers an id from the reusable set (ids of previously unregistered nodes); if
 * one is taken, a record shrinking the persisted set is returned. Otherwise a fresh
 * id is generated past the highest known broker/controller id, starting at 1000 so
 * brokers are easy to distinguish from controllers.
 *
 * @return an atomic {@link ControllerResult} whose response is the chosen node id and
 *         whose records persist either the updated reusable set or the new high-water mark
 */
public ControllerResult<Integer> getNextNodeId() {
    int nextId;
    Set<Integer> reusableNodeIds = getReusableNodeIds();
    if (!reusableNodeIds.isEmpty()) {
        Iterator<Integer> iterator = reusableNodeIds.iterator();
        nextId = iterator.next();
        // we simply remove the id from reusable id set because we're unable to determine if the id
        // will finally be used.
        iterator.remove();
        return ControllerResult.atomicOf(putReusableNodeIds(reusableNodeIds), nextId);
    } else {
        int maxBrokerId = brokerRegistrations.keySet().stream().max(Integer::compareTo).orElse(0);
        int maxNodeId = Math.max(maxBrokerId, maxControllerId);
        nextId = this.nextNodeId.accumulateAndGet(maxNodeId, (x, y) -> Math.max(x, y) + 1);
        // Let the broker's nodeId start from 1000 to easily distinguish broker and controller.
        nextId = Math.max(nextId, 1000);
        UpdateNextNodeIdRecord record = new UpdateNextNodeIdRecord().setNodeId(nextId);

        List<ApiMessageAndVersion> records = new ArrayList<>();
        records.add(new ApiMessageAndVersion(record, (short) 0));
        return ControllerResult.atomicOf(records, nextId);
    }
}

// Reads the persisted reusable node id set from the KV store; empty set when absent.
Set<Integer> getReusableNodeIds() {
    GetKVsRequestData.GetKVRequest request =
        new GetKVsRequestData.GetKVRequest().setKey(REUSABLE_NODE_IDS_KEY);
    byte[] rawValue = kvControlManager.getKV(request).value();
    return deserializeReusableNodeIds(rawValue);
}

// Serializes the given id set and returns the KV records that persist it (overwriting any prior value).
List<ApiMessageAndVersion> putReusableNodeIds(Set<Integer> reusableNodeIds) {
    PutKVsRequestData.PutKVRequest request = new PutKVsRequestData.PutKVRequest()
        .setKey(REUSABLE_NODE_IDS_KEY)
        .setValue(serializeReusableNodeIds(reusableNodeIds))
        .setOverwrite(true);
    return kvControlManager.putKV(request).records();
}

// Decodes a packed big-endian int array into a set of node ids; null input yields an empty set.
private Set<Integer> deserializeReusableNodeIds(byte[] value) {
    Set<Integer> nodeIds = new HashSet<>();
    if (value != null) {
        ByteBuffer buffer = ByteBuffer.wrap(value);
        while (buffer.hasRemaining()) {
            nodeIds.add(buffer.getInt());
        }
    }
    return nodeIds;
}

// Encodes the node id set as a packed big-endian int array (iteration order of the set).
private byte[] serializeReusableNodeIds(Set<Integer> reusableNodeIds) {
    ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * reusableNodeIds.size());
    for (int nodeId : reusableNodeIds) {
        buffer.putInt(nodeId);
    }
    return buffer.array();
}

/**
 * Claims {@code brokerId} back from the reusable id set when a broker registers with it.
 *
 * @param brokerId the id of the broker being registered
 * @return records persisting the shrunken reusable set, or an empty list if the id
 *         was not in the set
 */
public List<ApiMessageAndVersion> registerBrokerRecords(int brokerId) {
    Set<Integer> reusableNodeIds = getReusableNodeIds();
    // Set.remove returns true only when the id was present, so a single
    // lookup replaces the original contains-then-remove pair.
    if (reusableNodeIds.remove(brokerId)) {
        return putReusableNodeIds(reusableNodeIds);
    }
    return Collections.emptyList();
}

/**
 * Releases {@code brokerId} into the reusable id set when its broker is unregistered.
 *
 * @param brokerId the id of the broker being unregistered
 * @return records persisting the grown reusable set
 */
public List<ApiMessageAndVersion> unRegisterBrokerRecords(int brokerId) {
    Set<Integer> updatedIds = getReusableNodeIds();
    updatedIds.add(brokerId);
    return putReusableNodeIds(updatedIds);
}
// AutoMQ for Kafka inject end

Expand Down Expand Up @@ -496,6 +577,10 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
}
heartbeatManager.register(brokerId, record.fenced());

// AutoMQ for Kafka inject start
records.addAll(registerBrokerRecords(brokerId));
// AutoMQ for Kafka inject end

return ControllerResult.atomicOf(records, new BrokerRegistrationReply(record.brokerEpoch()));
}

Expand Down Expand Up @@ -583,6 +668,7 @@ public void replay(RegisterBrokerRecord record, long offset) {
if (prevRegistration != null) heartbeatManager.remove(brokerId);
heartbeatManager.register(brokerId, record.fenced());
}

if (prevRegistration == null) {
log.info("Replayed initial RegisterBrokerRecord for broker {}: {}", record.brokerId(), record);
} else if (prevRegistration.incarnationId().equals(record.incarnationId())) {
Expand All @@ -608,6 +694,7 @@ public void replay(UnregisterBrokerRecord record) {
if (heartbeatManager != null) heartbeatManager.remove(brokerId);
updateDirectories(brokerId, registration.directories(), null);
brokerRegistrations.remove(brokerId);
// AutoMQ injection end
log.info("Replayed {}", record);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2047,6 +2047,9 @@ private QuorumController(
this.time = time;
this.controllerMetrics = controllerMetrics;
this.snapshotRegistry = new SnapshotRegistry(logContext);
// AutoMQ for Kafka inject start
this.kvControlManager = new KVControlManager(snapshotRegistry, logContext);
// AutoMQ for Kafka inject end
this.deferredEventQueue = new DeferredEventQueue(logContext);
this.deferredUnstableEventQueue = new DeferredEventQueue(logContext);
this.offsetControl = new OffsetControlManager.Builder().
Expand Down Expand Up @@ -2094,6 +2097,7 @@ private QuorumController(
setZkMigrationEnabled(zkMigrationEnabled).
// AutoMQ for Kafka inject start
setQuorumVoters(quorumVoters).
setKVControlManager(kvControlManager).
// AutoMQ for Kafka inject end
setBrokerUncleanShutdownHandler(this::handleUncleanBrokerShutdown).
build();
Expand Down Expand Up @@ -2156,7 +2160,6 @@ private QuorumController(
featureControl::autoMQVersion, time);
this.streamControlManager = new StreamControlManager(this, snapshotRegistry, logContext,
this.s3ObjectControlManager, clusterControl, featureControl, replicationControl);
this.kvControlManager = new KVControlManager(snapshotRegistry, logContext);
this.topicDeletionManager = new TopicDeletionManager(snapshotRegistry, this, streamControlManager, kvControlManager);
this.nodeControlManager = new NodeControlManager(snapshotRegistry, new DefaultNodeRuntimeInfoGetter(clusterControl, streamControlManager));
this.extension = extension.apply(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1486,6 +1486,7 @@ void handleBrokerUnregistered(int brokerId, long brokerEpoch,
(short) 0));
// AutoMQ for Kafka inject start
records.add(nodeControlManager.unregisterNodeRecord(brokerId));
records.addAll(clusterControl.unRegisterBrokerRecords(brokerId));
// AutoMQ for Kafka inject end
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.common.message.ControllerRegistrationRequestData;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.KVRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection;
Expand All @@ -36,6 +37,7 @@
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.controller.stream.KVControlManager;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
Expand All @@ -52,6 +54,7 @@
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.timeline.SnapshotRegistry;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -60,12 +63,14 @@
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

import static java.util.Arrays.asList;
Expand Down Expand Up @@ -718,4 +723,57 @@ public void testReRegistrationAndBrokerEpoch(boolean newIncarnationId) {
clusterControl.brokerRegistrations().get(1).epoch());
}
}

@Test
public void testReusableNodeIds() {
    MockTime time = new MockTime(0, 0, 0);
    SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
    KVControlManager kvManager = new KVControlManager(registry, new LogContext());
    FeatureControlManager featureManager = new FeatureControlManager.Builder().
        setSnapshotRegistry(registry).
        setQuorumFeatures(new QuorumFeatures(0,
            QuorumFeatures.defaultFeatureMap(true),
            Collections.singletonList(0))).
        setMetadataVersion(MetadataVersion.IBP_3_9_IV0).
        build();
    ClusterControlManager clusterManager = new ClusterControlManager.Builder().
        setTime(time).
        setSnapshotRegistry(registry).
        setSessionTimeoutNs(1000).
        setFeatureControlManager(featureManager).
        setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
        setQuorumVoters(new ArrayList<>()).
        setKVControlManager(kvManager).
        build();
    clusterManager.activate();

    // Initially no node id has been released for reuse.
    Assertions.assertTrue(clusterManager.getReusableNodeIds().isEmpty());

    // Seed the reusable set with ids 1..3 and replay the resulting KV records.
    clusterManager.putReusableNodeIds(Set.of(1, 2, 3)).forEach(record ->
        kvManager.replay((KVRecord) record.message()));
    Assertions.assertEquals(Set.of(1, 2, 3), clusterManager.getReusableNodeIds());

    // Unregistering broker 4 releases its id into the reusable set.
    clusterManager.unRegisterBrokerRecords(4).forEach(record ->
        kvManager.replay((KVRecord) record.message()));
    Assertions.assertEquals(Set.of(1, 2, 3, 4), clusterManager.getReusableNodeIds());

    // Registering broker 2 claims its id back out of the reusable set.
    clusterManager.registerBrokerRecords(2).forEach(record ->
        kvManager.replay((KVRecord) record.message()));
    Assertions.assertEquals(Set.of(1, 3, 4), clusterManager.getReusableNodeIds());

    // Requesting the next node id consumes exactly one of the remaining reusable ids.
    ControllerResult<Integer> result = clusterManager.getNextNodeId();
    result.records().forEach(record -> {
        if (record.message() instanceof KVRecord) {
            kvManager.replay((KVRecord) record.message());
        }
    });
    Set<Integer> expectedIds = new HashSet<>(Set.of(1, 3, 4));
    expectedIds.remove(result.response());
    Assertions.assertEquals(expectedIds, clusterManager.getReusableNodeIds());
}
}

0 comments on commit 6217fb1

Please sign in to comment.