Skip to content

Commit

Permalink
Merge branch 'apache:develop' into rebalance
Browse files Browse the repository at this point in the history
  • Loading branch information
3424672656 authored Nov 14, 2024
2 parents 383285f + 66ba456 commit e067d7c
Show file tree
Hide file tree
Showing 67 changed files with 2,246 additions and 250 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ public static byte[] combineBytes(byte[] b1, byte[] b2) {
}

public static String calSignature(byte[] data, String secretKey) {
String signature = AclSigner.calSignature(data, secretKey);
return signature;
return AclSigner.calSignature(data, secretKey);
}

public static void IPv6AddressCheck(String netAddress) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -35,6 +36,7 @@
import org.apache.rocketmq.acl.PermissionChecker;
import org.apache.rocketmq.acl.common.AclConstants;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclSigner;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.common.AclConfig;
Expand Down Expand Up @@ -618,7 +620,8 @@ public void validate(PlainAccessResource plainAccessResource) {

// Check the signature
String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey());
if (!signature.equals(plainAccessResource.getSignature())) {
if (plainAccessResource.getSignature() == null
|| !MessageDigest.isEqual(signature.getBytes(AclSigner.DEFAULT_CHARSET), plainAccessResource.getSignature().getBytes(AclSigner.DEFAULT_CHARSET))) {
throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.auth.authentication.chain;

import java.security.MessageDigest;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -62,7 +63,8 @@ protected void doAuthenticate(DefaultAuthenticationContext context, User user) {
throw new AuthenticationException("User:{} is disabled.", context.getUsername());
}
String signature = AclSigner.calSignature(context.getContent(), user.getPassword());
if (!StringUtils.equals(signature, context.getSignature())) {
if (context.getSignature() == null
|| !MessageDigest.isEqual(signature.getBytes(AclSigner.DEFAULT_CHARSET), context.getSignature().getBytes(AclSigner.DEFAULT_CHARSET))) {
throw new AuthenticationException("check signature failed.");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager;
import org.apache.rocketmq.broker.offset.RocksDBConsumerOffsetManager;
import org.apache.rocketmq.broker.offset.RocksDBLmqConsumerOffsetManager;
import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
import org.apache.rocketmq.broker.config.v1.RocksDBLmqConsumerOffsetManager;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
import org.apache.rocketmq.broker.processor.AckMessageProcessor;
Expand All @@ -99,12 +99,16 @@
import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager;
import org.apache.rocketmq.broker.subscription.RocksDBLmqSubscriptionGroupManager;
import org.apache.rocketmq.broker.subscription.RocksDBSubscriptionGroupManager;
import org.apache.rocketmq.broker.config.v1.RocksDBLmqSubscriptionGroupManager;
import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.config.v2.ConsumerOffsetManagerV2;
import org.apache.rocketmq.broker.config.v2.SubscriptionGroupManagerV2;
import org.apache.rocketmq.broker.config.v2.TopicConfigManagerV2;
import org.apache.rocketmq.broker.config.v2.ConfigStorage;
import org.apache.rocketmq.broker.topic.LmqTopicConfigManager;
import org.apache.rocketmq.broker.topic.RocksDBLmqTopicConfigManager;
import org.apache.rocketmq.broker.topic.RocksDBTopicConfigManager;
import org.apache.rocketmq.broker.config.v1.RocksDBLmqTopicConfigManager;
import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService;
import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
Expand All @@ -124,6 +128,7 @@
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.config.ConfigManagerVersion;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.MessageExt;
Expand Down Expand Up @@ -239,6 +244,11 @@ public class BrokerController {
protected RemotingServer remotingServer;
protected CountDownLatch remotingServerStartLatch;
protected RemotingServer fastRemotingServer;

/**
* If {Topic, SubscriptionGroup, Offset}ManagerV2 are used, config entries are stored in RocksDB.
*/
protected ConfigStorage configStorage;
protected TopicConfigManager topicConfigManager;
protected SubscriptionGroupManager subscriptionGroupManager;
protected TopicQueueMappingManager topicQueueMappingManager;
Expand Down Expand Up @@ -334,7 +344,12 @@ public BrokerController(
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), getListenPort()));
this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());
this.broadcastOffsetManager = new BroadcastOffsetManager(this);
if (this.messageStoreConfig.isEnableRocksDBStore()) {
if (ConfigManagerVersion.V2.getVersion().equals(brokerConfig.getConfigManagerVersion())) {
this.configStorage = new ConfigStorage(messageStoreConfig.getStorePathRootDir());
this.topicConfigManager = new TopicConfigManagerV2(this, configStorage);
this.subscriptionGroupManager = new SubscriptionGroupManagerV2(this, configStorage);
this.consumerOffsetManager = new ConsumerOffsetManagerV2(this, configStorage);
} else if (this.messageStoreConfig.isEnableRocksDBStore()) {
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqTopicConfigManager(this) : new RocksDBTopicConfigManager(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqSubscriptionGroupManager(this) : new RocksDBSubscriptionGroupManager(this);
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqConsumerOffsetManager(this) : new RocksDBConsumerOffsetManager(this);
Expand Down Expand Up @@ -771,7 +786,11 @@ private void updateNamesrvAddr() {
}

public boolean initializeMetadata() {
boolean result = this.topicConfigManager.load();
boolean result = true;
if (null != configStorage) {
result = configStorage.start();
}
result = result && this.topicConfigManager.load();
result = result && this.topicQueueMappingManager.load();
result = result && this.consumerOffsetManager.load();
result = result && this.subscriptionGroupManager.load();
Expand Down Expand Up @@ -1547,6 +1566,10 @@ protected void shutdownBasicService() {
this.consumerOffsetManager.stop();
}

if (null != configStorage) {
configStorage.shutdown();
}

if (this.authenticationMetadataManager != null) {
this.authenticationMetadataManager.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,40 @@
package org.apache.rocketmq.broker;

import com.alibaba.fastjson.JSON;
import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.rocksdb.CompressionType;
import org.rocksdb.FlushOptions;
import org.rocksdb.RocksIterator;
import org.rocksdb.Statistics;
import org.rocksdb.WriteBatch;

import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;

public class RocksDBConfigManager {
protected static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
public volatile boolean isStop = false;
public ConfigRocksDBStorage configRocksDBStorage = null;
private FlushOptions flushOptions = null;
private volatile long lastFlushMemTableMicroSecond = 0;

private final String filePath;
private final long memTableFlushInterval;
private final CompressionType compressionType;
private DataVersion kvDataVersion = new DataVersion();


public RocksDBConfigManager(String filePath, long memTableFlushInterval) {
public RocksDBConfigManager(String filePath, long memTableFlushInterval, CompressionType compressionType) {
this.filePath = filePath;
this.memTableFlushInterval = memTableFlushInterval;
this.compressionType = compressionType;
}

public boolean init() {
this.isStop = false;
this.configRocksDBStorage = new ConfigRocksDBStorage(filePath);
this.configRocksDBStorage = new ConfigRocksDBStorage(filePath, compressionType);
return this.configRocksDBStorage.start();
}
public boolean loadDataVersion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.offset;
package org.apache.rocketmq.broker.config.v1;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
Expand All @@ -24,12 +24,14 @@
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.RocksDBConfigManager;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.rocksdb.CompressionType;
import org.rocksdb.WriteBatch;

public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {
Expand All @@ -40,7 +42,9 @@ public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {

public RocksDBConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs(),
CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType()));

}

@Override
Expand All @@ -60,10 +64,6 @@ public boolean loadConsumerOffset() {
}

private boolean merge() {
if (!brokerController.getMessageStoreConfig().isTransferOffsetJsonToRocksdb()) {
log.info("the switch transferOffsetJsonToRocksdb is off, no merge offset operation is needed.");
return true;
}
if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) {
log.info("consumerOffset json file does not exist, so skip merge");
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.offset;
package org.apache.rocketmq.broker.config.v1;

import java.util.HashMap;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.subscription;
package org.apache.rocketmq.broker.config.v1;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.MixAll;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.topic;
package org.apache.rocketmq.broker.config.v1;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.MixAll;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.offset;
package org.apache.rocketmq.broker.config.v1;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.subscription;
package org.apache.rocketmq.broker.config.v1;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
Expand All @@ -27,10 +27,12 @@
import java.util.function.BiConsumer;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.RocksDBConfigManager;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.rocksdb.CompressionType;
import org.rocksdb.RocksIterator;

public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager {
Expand All @@ -39,7 +41,8 @@ public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager {

public RocksDBSubscriptionGroupManager(BrokerController brokerController) {
super(brokerController, false);
this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs(),
CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType()));
}

@Override
Expand Down Expand Up @@ -77,10 +80,6 @@ public boolean loadForbidden(BiConsumer<byte[], byte[]> biConsumer) {


private boolean merge() {
if (!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) {
log.info("the switch transferMetadataJsonToRocksdb is off, no merge subGroup operation is needed.");
return true;
}
if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) {
log.info("subGroup json file does not exist, so skip merge");
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.topic;
package org.apache.rocketmq.broker.config.v1;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
Expand All @@ -23,18 +23,21 @@
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.RocksDBConfigManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.rocksdb.CompressionType;

public class RocksDBTopicConfigManager extends TopicConfigManager {

protected RocksDBConfigManager rocksDBConfigManager;

public RocksDBTopicConfigManager(BrokerController brokerController) {
super(brokerController, false);
this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs(),
CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType()));
}

@Override
Expand All @@ -58,10 +61,6 @@ public boolean loadDataVersion() {
}

private boolean merge() {
if (!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) {
log.info("the switch transferMetadataJsonToRocksdb is off, no merge topic operation is needed.");
return true;
}
if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) {
log.info("topic json file does not exist, so skip merge");
return true;
Expand Down
Loading

0 comments on commit e067d7c

Please sign in to comment.