[all] Changed Kafka from LI's 2.4.1.78 to Apache's 2.4.1 #1000

Merged · 4 commits · Nov 4, 2024 · Changes shown from 2 commits
14 changes: 8 additions & 6 deletions build.gradle
@@ -40,8 +40,9 @@ if (project.hasProperty('overrideBuildEnvironment')) {
 def avroVersion = '1.10.2'
 def avroUtilVersion = '0.3.21'
 def grpcVersion = '1.49.2'
-def kafkaGroup = 'com.linkedin.kafka'
-def kafkaVersion = '2.4.1.65'
+// N.B.: The build should also work when substituting Kafka from the Apache fork to LinkedIn's fork:
+def kafkaGroup = 'org.apache.kafka' // 'com.linkedin.kafka'
+def kafkaVersion = '2.4.1' // '2.4.1.65'
 def log4j2Version = '2.17.1'
 def pegasusVersion = '29.31.0'
 def protobufVersion = '3.21.7'
@@ -52,14 +53,15 @@ def alpnAgentVersion = '2.0.10'
 def hadoopVersion = '2.10.2'
 def apacheSparkVersion = '3.3.3'
 def antlrVersion = '4.8'
+def scala = '2.12'
 
 ext.libraries = [
   alpnAgent: "org.mortbay.jetty.alpn:jetty-alpn-agent:${alpnAgentVersion}",
   antlr4: "org.antlr:antlr4:${antlrVersion}",
   antlr4Runtime: "org.antlr:antlr4-runtime:${antlrVersion}",
-  apacheSparkAvro: "org.apache.spark:spark-avro_2.12:${apacheSparkVersion}",
-  apacheSparkCore: "org.apache.spark:spark-core_2.12:${apacheSparkVersion}",
-  apacheSparkSql: "org.apache.spark:spark-sql_2.12:${apacheSparkVersion}",
+  apacheSparkAvro: "org.apache.spark:spark-avro_${scala}:${apacheSparkVersion}",
+  apacheSparkCore: "org.apache.spark:spark-core_${scala}:${apacheSparkVersion}",
+  apacheSparkSql: "org.apache.spark:spark-sql_${scala}:${apacheSparkVersion}",
   avro: "org.apache.avro:avro:${avroVersion}",
   avroCompiler: "org.apache.avro:avro-compiler:${avroVersion}",
   avroMapred: "org.apache.avro:avro-mapred:${avroVersion}",

@@ -101,7 +103,7 @@ ext.libraries = [
   jna: 'net.java.dev.jna:jna:4.5.1',
   jsr305: 'com.google.code.findbugs:jsr305:3.0.2',
   joptSimple: 'net.sf.jopt-simple:jopt-simple:3.2',
-  kafka: "${kafkaGroup}:kafka_2.12:${kafkaVersion}",
+  kafka: "${kafkaGroup}:kafka_${scala}:${kafkaVersion}",
   kafkaClients: "${kafkaGroup}:kafka-clients:${kafkaVersion}",
   kafkaClientsTest: "${kafkaGroup}:kafka-clients:${kafkaVersion}:test",
   log4j2api: "org.apache.logging.log4j:log4j-api:${log4j2Version}",
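
The N.B. comment above suggests the swap is meant to stay reversible by hand-editing the two `def` lines. A minimal sketch of how the same toggle could instead be driven by a project property; the `useLinkedInKafka` property name is an illustration, not part of this PR:

// Hypothetical alternative: pick the fork at invocation time, e.g.
//   ./gradlew build -PuseLinkedInKafka=true
// The property name is an assumption for illustration only.
def useLinkedInKafka = project.hasProperty('useLinkedInKafka')
def kafkaGroup = useLinkedInKafka ? 'com.linkedin.kafka' : 'org.apache.kafka'
def kafkaVersion = useLinkedInKafka ? '2.4.1.65' : '2.4.1'
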
VeniceClusterConfig.java
@@ -27,6 +27,7 @@
 import com.linkedin.venice.exceptions.ConfigurationException;
 import com.linkedin.venice.exceptions.UndefinedPropertyException;
 import com.linkedin.venice.meta.PersistenceType;
+import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
 import com.linkedin.venice.utils.KafkaSSLUtils;
 import com.linkedin.venice.utils.RegionUtils;
 import com.linkedin.venice.utils.VeniceProperties;

@@ -44,7 +45,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

@@ -89,8 +89,8 @@ public class VeniceClusterConfig {

   private final VeniceProperties clusterProperties;
 
-  private final SecurityProtocol kafkaSecurityProtocol;
-  private final Map<String, SecurityProtocol> kafkaBootstrapUrlToSecurityProtocol;
+  private final PubSubSecurityProtocol kafkaSecurityProtocol;
+  private final Map<String, PubSubSecurityProtocol> kafkaBootstrapUrlToSecurityProtocol;
   private final Optional<SSLConfig> sslConfig;
 
   public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String, String>> kafkaClusterMap)

@@ -135,17 +135,17 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String
     LOGGER.info("Final region name for this node: {}", this.regionName);
 
     String kafkaSecurityProtocolString =
-        clusterProps.getString(KAFKA_SECURITY_PROTOCOL, SecurityProtocol.PLAINTEXT.name());
+        clusterProps.getString(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.PLAINTEXT.name());
     if (!KafkaSSLUtils.isKafkaProtocolValid(kafkaSecurityProtocolString)) {
       throw new ConfigurationException("Invalid kafka security protocol: " + kafkaSecurityProtocolString);
     }
-    this.kafkaSecurityProtocol = SecurityProtocol.forName(kafkaSecurityProtocolString);
+    this.kafkaSecurityProtocol = PubSubSecurityProtocol.forName(kafkaSecurityProtocolString);
 
     Int2ObjectMap<String> tmpKafkaClusterIdToUrlMap = new Int2ObjectOpenHashMap<>();
     Object2IntMap<String> tmpKafkaClusterUrlToIdMap = new Object2IntOpenHashMap<>();
     Int2ObjectMap<String> tmpKafkaClusterIdToAliasMap = new Int2ObjectOpenHashMap<>();
     Object2IntMap<String> tmpKafkaClusterAliasToIdMap = new Object2IntOpenHashMap<>();
-    Map<String, SecurityProtocol> tmpKafkaBootstrapUrlToSecurityProtocol = new HashMap<>();
+    Map<String, PubSubSecurityProtocol> tmpKafkaBootstrapUrlToSecurityProtocol = new HashMap<>();
     Map<String, String> tmpKafkaUrlResolution = new HashMap<>();
 
     boolean foundBaseKafkaUrlInMappingIfItIsPopulated = kafkaClusterMap.isEmpty();

@@ -167,7 +167,7 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String
       tmpKafkaClusterUrlToIdMap.put(url, clusterId);
       tmpKafkaUrlResolution.put(url, url);
       if (securityProtocolString != null) {
-        tmpKafkaBootstrapUrlToSecurityProtocol.put(url, SecurityProtocol.valueOf(securityProtocolString));
+        tmpKafkaBootstrapUrlToSecurityProtocol.put(url, PubSubSecurityProtocol.valueOf(securityProtocolString));
       }
     }
     if (baseKafkaBootstrapServers.equals(url)) {

@@ -228,7 +228,7 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String
       throw new ConfigurationException("Invalid kafka security protocol: " + kafkaSecurityProtocolString);
     }
     if (KafkaSSLUtils.isKafkaSSLProtocol(kafkaSecurityProtocolString)
-        || kafkaBootstrapUrlToSecurityProtocol.containsValue(SecurityProtocol.SSL)) {
+        || kafkaBootstrapUrlToSecurityProtocol.containsValue(PubSubSecurityProtocol.SSL)) {
       this.sslConfig = Optional.of(new SSLConfig(clusterProps));
     } else {
       this.sslConfig = Optional.empty();

@@ -260,8 +260,8 @@ public String getKafkaBootstrapServers() {
     return kafkaBootstrapServers;
   }
 
-  public SecurityProtocol getKafkaSecurityProtocol(String kafkaBootstrapUrl) {
-    SecurityProtocol clusterSpecificSecurityProtocol = kafkaBootstrapUrlToSecurityProtocol.get(kafkaBootstrapUrl);
+  public PubSubSecurityProtocol getKafkaSecurityProtocol(String kafkaBootstrapUrl) {
+    PubSubSecurityProtocol clusterSpecificSecurityProtocol = kafkaBootstrapUrlToSecurityProtocol.get(kafkaBootstrapUrl);
     return clusterSpecificSecurityProtocol == null ? kafkaSecurityProtocol : clusterSpecificSecurityProtocol;
   }
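
The reworked `getKafkaSecurityProtocol` keeps the same fallback semantics as before, only with the new enum type: a per-bootstrap-URL override wins, and any unmapped URL falls back to the cluster-wide `kafka.security.protocol`. A self-contained sketch of that lookup (URLs and protocol strings are made up for illustration):

import java.util.HashMap;
import java.util.Map;

public class SecurityProtocolFallbackSketch {
  // Mirrors the null-check fallback in getKafkaSecurityProtocol(...) above.
  static String resolve(Map<String, String> perUrlOverrides, String clusterWideDefault, String url) {
    String specific = perUrlOverrides.get(url);
    return specific == null ? clusterWideDefault : specific;
  }

  public static void main(String[] args) {
    Map<String, String> overrides = new HashMap<>();
    overrides.put("kafka-ssl.example:16637", "SSL"); // hypothetical mapped cluster

    System.out.println(resolve(overrides, "PLAINTEXT", "kafka-ssl.example:16637")); // SSL
    System.out.println(resolve(overrides, "PLAINTEXT", "kafka.example:9092"));      // PLAINTEXT
  }
}
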
@@ -58,6 +58,7 @@
 import com.linkedin.venice.pubsub.PubSubTopicRepository;
 import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerConfig;
 import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
+import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
 import com.linkedin.venice.pubsub.api.PubSubTopic;
 import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
 import com.linkedin.venice.pubsub.manager.TopicManagerContext;

@@ -114,7 +115,6 @@
 import java.util.function.Function;
 import org.apache.avro.Schema;
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

@@ -1106,15 +1106,15 @@ private VeniceProperties getPubSubSSLPropertiesFromServerConfig(String kafkaBootstrapUrls) {
       kafkaBootstrapUrls = resolvedKafkaUrl;
     }
     properties.setProperty(KAFKA_BOOTSTRAP_SERVERS, kafkaBootstrapUrls);
-    SecurityProtocol securityProtocol = serverConfig.getKafkaSecurityProtocol(kafkaBootstrapUrls);
+    PubSubSecurityProtocol securityProtocol = serverConfig.getKafkaSecurityProtocol(kafkaBootstrapUrls);
     if (KafkaSSLUtils.isKafkaSSLProtocol(securityProtocol)) {
       Optional<SSLConfig> sslConfig = serverConfig.getSslConfig();
       if (!sslConfig.isPresent()) {
         throw new VeniceException("SSLConfig should be present when Kafka SSL is enabled");
       }
       properties.putAll(sslConfig.get().getKafkaSSLConfig());
     }
-    properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name);
+    properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name());
     return new VeniceProperties(properties);
   }
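
The `.name` to `.name()` change above is the subtle part of this migration: Kafka's `SecurityProtocol` exposes the protocol name as a public field (`securityProtocol.name`), whereas `PubSubSecurityProtocol` is a plain Java enum, so the name comes from the standard `name()` method. A minimal sketch of the distinction:

public class EnumNameSketch {
  enum LocalProtocol { // stand-in for PubSubSecurityProtocol
    PLAINTEXT, SSL
  }

  public static void main(String[] args) {
    // java.lang.Enum only provides the name() method...
    System.out.println(LocalProtocol.SSL.name()); // "SSL"
    // ...while org.apache.kafka.common.security.auth.SecurityProtocol also
    // declares `public final String name;`, which is why code written against
    // Kafka's enum could say `securityProtocol.name` without parentheses.
  }
}
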
@@ -43,6 +43,7 @@
 import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
 import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
 import com.linkedin.venice.pubsub.PubSubTopicRepository;
+import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
 import com.linkedin.venice.pubsub.api.PubSubTopic;
 import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
 import com.linkedin.venice.schema.SchemaEntry;

@@ -60,7 +61,6 @@
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.function.Function;
 import org.apache.avro.Schema;
-import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;

@@ -126,7 +126,7 @@ private void setupMockConfig() {
     doReturn(VeniceProperties.empty()).when(mockVeniceServerConfig).getKafkaConsumerConfigsForLocalConsumption();
     doReturn(getConsumerAssignmentStrategy()).when(mockVeniceServerConfig).getSharedConsumerAssignmentStrategy();
     doReturn(1).when(mockVeniceServerConfig).getConsumerPoolSizePerKafkaCluster();
-    doReturn(SecurityProtocol.PLAINTEXT).when(mockVeniceServerConfig).getKafkaSecurityProtocol(dummyKafkaUrl);
+    doReturn(PubSubSecurityProtocol.PLAINTEXT).when(mockVeniceServerConfig).getKafkaSecurityProtocol(dummyKafkaUrl);
     doReturn(10).when(mockVeniceServerConfig).getKafkaMaxPollRecords();
     doReturn(2).when(mockVeniceServerConfig).getTopicManagerMetadataFetcherConsumerPoolSize();
     doReturn(2).when(mockVeniceServerConfig).getTopicManagerMetadataFetcherThreadPoolSize();
2 changes: 2 additions & 0 deletions clients/venice-admin-tool/build.gradle
@@ -28,6 +28,8 @@ dependencies {
     exclude group: 'org.apache.helix'
   }
   implementation('org.apache.helix:metrics-common:1.4.1:jdk8')
+  implementation libraries.zstd
+
   testImplementation project(':internal:venice-common').sourceSets.test.output
 }
PubSubSecurityProtocol.java (new file)

@@ -0,0 +1,25 @@
+package com.linkedin.venice.pubsub.api;
+
+import java.util.Locale;
+
+
+/**
+ * This enum is equivalent to Kafka's SecurityProtocol enum.
+ *
+ * We need this abstraction because Kafka's enum is present in two different namespaces, which are different between
+ * LinkedIn's fork and the Apache fork.
+ */
+public enum PubSubSecurityProtocol {
+  /** Un-authenticated, non-encrypted channel */
+  PLAINTEXT,
+  /** SSL channel */
+  SSL,
+  /** SASL authenticated, non-encrypted channel */
+  SASL_PLAINTEXT,
+  /** SASL authenticated, SSL channel */
+  SASL_SSL;
+
+  public static PubSubSecurityProtocol forName(String name) {
+    return PubSubSecurityProtocol.valueOf(name.toUpperCase(Locale.ROOT));
+  }
+}
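
A quick usage sketch for the new enum's `forName` helper, which normalizes case via `Locale.ROOT` before delegating to `valueOf`:

import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;

public class ForNameSketch {
  public static void main(String[] args) {
    // forName is case-insensitive thanks to toUpperCase(Locale.ROOT)...
    System.out.println(PubSubSecurityProtocol.forName("sasl_ssl")); // SASL_SSL
    // ...whereas the built-in valueOf is exact-match only:
    // PubSubSecurityProtocol.valueOf("sasl_ssl") would throw IllegalArgumentException.
  }
}
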
KafkaSSLUtils.java

@@ -1,12 +1,12 @@
 package com.linkedin.venice.utils;
 
 import com.linkedin.venice.exceptions.VeniceException;
+import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.protocol.SecurityProtocol;
 
 
 public class KafkaSSLUtils {
@@ -26,26 +26,20 @@ public class KafkaSSLUtils {
       SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG,
       SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG);
 
-  /**
-   * Right now, Venice only supports two Kafka protocols:
-   * {@link SecurityProtocol#PLAINTEXT}
-   * {@link SecurityProtocol#SSL}
-   *
-   * @param kafkaProtocol
-   * @return
-   */
   public static boolean isKafkaProtocolValid(String kafkaProtocol) {
-    return kafkaProtocol.equals(SecurityProtocol.PLAINTEXT.name()) || kafkaProtocol.equals(SecurityProtocol.SSL.name())
-        || kafkaProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name())
-        || kafkaProtocol.equals(SecurityProtocol.SASL_SSL.name());
+    return kafkaProtocol.equals(PubSubSecurityProtocol.PLAINTEXT.name())
+        || kafkaProtocol.equals(PubSubSecurityProtocol.SSL.name())
+        || kafkaProtocol.equals(PubSubSecurityProtocol.SASL_PLAINTEXT.name())
+        || kafkaProtocol.equals(PubSubSecurityProtocol.SASL_SSL.name());
   }
 
   public static boolean isKafkaSSLProtocol(String kafkaProtocol) {
-    return kafkaProtocol.equals(SecurityProtocol.SSL.name()) || kafkaProtocol.equals(SecurityProtocol.SASL_SSL.name());
+    return kafkaProtocol.equals(PubSubSecurityProtocol.SSL.name())
+        || kafkaProtocol.equals(PubSubSecurityProtocol.SASL_SSL.name());
   }
 
-  public static boolean isKafkaSSLProtocol(SecurityProtocol kafkaProtocol) {
-    return kafkaProtocol == SecurityProtocol.SSL || kafkaProtocol == SecurityProtocol.SASL_SSL;
+  public static boolean isKafkaSSLProtocol(PubSubSecurityProtocol kafkaProtocol) {
+    return kafkaProtocol == PubSubSecurityProtocol.SSL || kafkaProtocol == PubSubSecurityProtocol.SASL_SSL;
   }
 
   /**
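
Putting the two reworked predicates together: all four protocol names pass validation, but only the SSL-backed ones trigger the `SSLConfig` requirement seen in `VeniceClusterConfig` above. A small sketch:

import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
import com.linkedin.venice.utils.KafkaSSLUtils;

public class SslPredicateSketch {
  public static void main(String[] args) {
    // Any of the four enum names is a valid protocol string...
    System.out.println(KafkaSSLUtils.isKafkaProtocolValid("SASL_PLAINTEXT")); // true
    // ...but only SSL and SASL_SSL count as SSL protocols:
    System.out.println(KafkaSSLUtils.isKafkaSSLProtocol(PubSubSecurityProtocol.SASL_SSL)); // true
    System.out.println(KafkaSSLUtils.isKafkaSSLProtocol(PubSubSecurityProtocol.PLAINTEXT)); // false
  }
}
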
@@ -27,6 +27,7 @@
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;

@@ -192,7 +193,7 @@ public void testDeleteTopicValidTopicDeletion() throws Exception {
     DeleteTopicsResult deleteTopicsResultMock = mock(DeleteTopicsResult.class);
     KafkaFuture<Void> topicDeletionFutureMock = mock(KafkaFuture.class);
 
-    when(internalKafkaAdminClientMock.deleteTopics(any())).thenReturn(deleteTopicsResultMock);
+    when(internalKafkaAdminClientMock.deleteTopics(any(Collection.class))).thenReturn(deleteTopicsResultMock);
     when(deleteTopicsResultMock.all()).thenReturn(topicDeletionFutureMock);
     when(topicDeletionFutureMock.get(eq(1000L), eq(TimeUnit.MILLISECONDS))).thenReturn(null);

@@ -208,7 +209,7 @@ public void testDeleteTopicThrowsException() throws Exception {
     DeleteTopicsResult deleteTopicsResultMock = mock(DeleteTopicsResult.class);
     KafkaFuture<Void> topicDeletionFutureMock = mock(KafkaFuture.class);
 
-    when(internalKafkaAdminClientMock.deleteTopics(any())).thenReturn(deleteTopicsResultMock);
+    when(internalKafkaAdminClientMock.deleteTopics(any(Collection.class))).thenReturn(deleteTopicsResultMock);
     when(deleteTopicsResultMock.all()).thenReturn(topicDeletionFutureMock);
 
     when(topicDeletionFutureMock.get(eq(1000L), eq(TimeUnit.MILLISECONDS)))
@@ -2,13 +2,13 @@

 import static org.testng.Assert.*;
 
+import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
 import com.linkedin.venice.utils.VeniceProperties;
 import java.util.Properties;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.testng.annotations.Test;
 
 
@@ -20,23 +20,23 @@ public class ApacheKafkaAdminConfigTest {

   @Test
   public void testSetupSaslInKafkaAdminPlaintext() {
-    testSetupSaslInKafkaAdmin(SecurityProtocol.SASL_PLAINTEXT);
+    testSetupSaslInKafkaAdmin(PubSubSecurityProtocol.SASL_PLAINTEXT);
   }
 
   @Test
   public void testSetupSaslInKafkaAdminSSL() {
-    testSetupSaslInKafkaAdmin(SecurityProtocol.SASL_SSL);
+    testSetupSaslInKafkaAdmin(PubSubSecurityProtocol.SASL_SSL);
   }
 
-  private void testSetupSaslInKafkaAdmin(SecurityProtocol securityProtocol) {
+  private void testSetupSaslInKafkaAdmin(PubSubSecurityProtocol securityProtocol) {
     Properties properties = new Properties();
     properties.put("cluster.name", "cluster");
     properties.put("zookeeper.address", "localhost:2181");
     properties.put("kafka.bootstrap.servers", "localhost:9092");
     properties.put("kafka.sasl.jaas.config", SASL_JAAS_CONFIG);
     properties.put("kafka.sasl.mechanism", SASL_MECHANISM);
-    properties.put("kafka.security.protocol", securityProtocol.name);
-    if (securityProtocol.name.contains("SSL")) {
+    properties.put("kafka.security.protocol", securityProtocol.name());
+    if (securityProtocol.name().contains("SSL")) {
       properties.put("ssl.truststore.location", "-");
       properties.put("ssl.truststore.password", "");
       properties.put("ssl.truststore.type", "JKS");

@@ -49,7 +49,7 @@ private void testSetupSaslInKafkaAdmin(SecurityProtocol securityProtocol) {
     Properties adminProperties = serverConfig.getAdminProperties();
     assertEquals(SASL_JAAS_CONFIG, adminProperties.get("sasl.jaas.config"));
     assertEquals(SASL_MECHANISM, adminProperties.get("sasl.mechanism"));
-    assertEquals(securityProtocol.name, adminProperties.get("security.protocol"));
+    assertEquals(securityProtocol.name(), adminProperties.get("security.protocol"));
   }
 
   @Test
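
The assertions above imply that the admin config layer strips the `kafka.` prefix before handing properties to the underlying client (e.g. `kafka.sasl.mechanism` becomes `sasl.mechanism`). A self-contained sketch of that convention, under the assumption (not shown in this diff) that such stripping is what `getAdminProperties` performs:

import java.util.Properties;

public class PrefixStripSketch {
  public static void main(String[] args) {
    Properties veniceProps = new Properties();
    veniceProps.setProperty("kafka.sasl.mechanism", "PLAIN");
    veniceProps.setProperty("kafka.security.protocol", "SASL_SSL");

    // Strip the "kafka." prefix, as the test assertions above imply.
    String prefix = "kafka.";
    Properties adminProps = new Properties();
    for (String key : veniceProps.stringPropertyNames()) {
      if (key.startsWith(prefix)) {
        adminProps.setProperty(key.substring(prefix.length()), veniceProps.getProperty(key));
      }
    }
    System.out.println(adminProps); // {security.protocol=SASL_SSL, sasl.mechanism=PLAIN} (order may vary)
  }
}
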