Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "[all] Changed Kafka from LI's 2.4.1.78 to Apache's 2.4.1" #1283

Merged
merged 1 commit into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ if (project.hasProperty('overrideBuildEnvironment')) {
def avroVersion = '1.10.2'
def avroUtilVersion = '0.3.21'
def grpcVersion = '1.49.2'
// N.B.: The build should also work when substituting Kafka from the Apache fork to LinkedIn's fork:
def kafkaGroup = 'org.apache.kafka' // 'com.linkedin.kafka'
def kafkaVersion = '2.4.1' // '2.4.1.65'
def kafkaGroup = 'com.linkedin.kafka'
def kafkaVersion = '2.4.1.78'
def log4j2Version = '2.17.1'
def pegasusVersion = '29.31.0'
def protobufVersion = '3.21.7'
Expand All @@ -53,15 +52,14 @@ def alpnAgentVersion = '2.0.10'
def hadoopVersion = '2.10.2'
def apacheSparkVersion = '3.3.3'
def antlrVersion = '4.8'
def scala = '2.12'

ext.libraries = [
alpnAgent: "org.mortbay.jetty.alpn:jetty-alpn-agent:${alpnAgentVersion}",
antlr4: "org.antlr:antlr4:${antlrVersion}",
antlr4Runtime: "org.antlr:antlr4-runtime:${antlrVersion}",
apacheSparkAvro: "org.apache.spark:spark-avro_${scala}:${apacheSparkVersion}",
apacheSparkCore: "org.apache.spark:spark-core_${scala}:${apacheSparkVersion}",
apacheSparkSql: "org.apache.spark:spark-sql_${scala}:${apacheSparkVersion}",
apacheSparkAvro: "org.apache.spark:spark-avro_2.12:${apacheSparkVersion}",
apacheSparkCore: "org.apache.spark:spark-core_2.12:${apacheSparkVersion}",
apacheSparkSql: "org.apache.spark:spark-sql_2.12:${apacheSparkVersion}",
avro: "org.apache.avro:avro:${avroVersion}",
avroCompiler: "org.apache.avro:avro-compiler:${avroVersion}",
avroMapred: "org.apache.avro:avro-mapred:${avroVersion}",
Expand Down Expand Up @@ -103,7 +101,7 @@ ext.libraries = [
jna: 'net.java.dev.jna:jna:4.5.1',
jsr305: 'com.google.code.findbugs:jsr305:3.0.2',
joptSimple: 'net.sf.jopt-simple:jopt-simple:3.2',
kafka: "${kafkaGroup}:kafka_${scala}:${kafkaVersion}",
kafka: "${kafkaGroup}:kafka_2.12:${kafkaVersion}",
kafkaClients: "${kafkaGroup}:kafka-clients:${kafkaVersion}",
kafkaClientsTest: "${kafkaGroup}:kafka-clients:${kafkaVersion}:test",
log4j2api: "org.apache.logging.log4j:log4j-api:${log4j2Version}",
Expand Down Expand Up @@ -196,8 +194,6 @@ subprojects {

if (JavaVersion.current() >= JavaVersion.VERSION_1_9) {
tasks.withType(JavaCompile) {
// Compiler arguments can be injected here...
// options.compilerArgs << '-Xlint:unchecked'
options.release = 8
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.linkedin.venice.exceptions.ConfigurationException;
import com.linkedin.venice.exceptions.UndefinedPropertyException;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
import com.linkedin.venice.utils.KafkaSSLUtils;
import com.linkedin.venice.utils.RegionUtils;
import com.linkedin.venice.utils.Utils;
Expand All @@ -46,6 +45,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -90,8 +90,8 @@ public class VeniceClusterConfig {

private final VeniceProperties clusterProperties;

private final PubSubSecurityProtocol kafkaSecurityProtocol;
private final Map<String, PubSubSecurityProtocol> kafkaBootstrapUrlToSecurityProtocol;
private final SecurityProtocol kafkaSecurityProtocol;
private final Map<String, SecurityProtocol> kafkaBootstrapUrlToSecurityProtocol;
private final Optional<SSLConfig> sslConfig;

public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String, String>> kafkaClusterMap)
Expand Down Expand Up @@ -136,17 +136,17 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String
LOGGER.info("Final region name for this node: {}", this.regionName);

String kafkaSecurityProtocolString =
clusterProps.getString(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.PLAINTEXT.name());
clusterProps.getString(KAFKA_SECURITY_PROTOCOL, SecurityProtocol.PLAINTEXT.name());
if (!KafkaSSLUtils.isKafkaProtocolValid(kafkaSecurityProtocolString)) {
throw new ConfigurationException("Invalid kafka security protocol: " + kafkaSecurityProtocolString);
}
this.kafkaSecurityProtocol = PubSubSecurityProtocol.forName(kafkaSecurityProtocolString);
this.kafkaSecurityProtocol = SecurityProtocol.forName(kafkaSecurityProtocolString);

Int2ObjectMap<String> tmpKafkaClusterIdToUrlMap = new Int2ObjectOpenHashMap<>();
Object2IntMap<String> tmpKafkaClusterUrlToIdMap = new Object2IntOpenHashMap<>();
Int2ObjectMap<String> tmpKafkaClusterIdToAliasMap = new Int2ObjectOpenHashMap<>();
Object2IntMap<String> tmpKafkaClusterAliasToIdMap = new Object2IntOpenHashMap<>();
Map<String, PubSubSecurityProtocol> tmpKafkaBootstrapUrlToSecurityProtocol = new HashMap<>();
Map<String, SecurityProtocol> tmpKafkaBootstrapUrlToSecurityProtocol = new HashMap<>();
Map<String, String> tmpKafkaUrlResolution = new HashMap<>();

boolean foundBaseKafkaUrlInMappingIfItIsPopulated = kafkaClusterMap.isEmpty();
Expand Down Expand Up @@ -183,7 +183,7 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String
tmpKafkaClusterUrlToIdMap.put(url, clusterId);
tmpKafkaUrlResolution.put(url, url);
if (securityProtocolString != null) {
tmpKafkaBootstrapUrlToSecurityProtocol.put(url, PubSubSecurityProtocol.valueOf(securityProtocolString));
tmpKafkaBootstrapUrlToSecurityProtocol.put(url, SecurityProtocol.valueOf(securityProtocolString));
}
}
if (baseKafkaBootstrapServers.equals(url)) {
Expand Down Expand Up @@ -244,7 +244,7 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String
throw new ConfigurationException("Invalid kafka security protocol: " + kafkaSecurityProtocolString);
}
if (KafkaSSLUtils.isKafkaSSLProtocol(kafkaSecurityProtocolString)
|| kafkaBootstrapUrlToSecurityProtocol.containsValue(PubSubSecurityProtocol.SSL)) {
|| kafkaBootstrapUrlToSecurityProtocol.containsValue(SecurityProtocol.SSL)) {
this.sslConfig = Optional.of(new SSLConfig(clusterProps));
} else {
this.sslConfig = Optional.empty();
Expand Down Expand Up @@ -276,8 +276,8 @@ public String getKafkaBootstrapServers() {
return kafkaBootstrapServers;
}

public PubSubSecurityProtocol getKafkaSecurityProtocol(String kafkaBootstrapUrl) {
PubSubSecurityProtocol clusterSpecificSecurityProtocol = kafkaBootstrapUrlToSecurityProtocol.get(kafkaBootstrapUrl);
public SecurityProtocol getKafkaSecurityProtocol(String kafkaBootstrapUrl) {
SecurityProtocol clusterSpecificSecurityProtocol = kafkaBootstrapUrlToSecurityProtocol.get(kafkaBootstrapUrl);
return clusterSpecificSecurityProtocol == null ? kafkaSecurityProtocol : clusterSpecificSecurityProtocol;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerConfig;
import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.pubsub.manager.TopicManagerContext;
Expand Down Expand Up @@ -114,6 +113,7 @@
import java.util.function.BooleanSupplier;
import org.apache.avro.Schema;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -1119,15 +1119,15 @@ private VeniceProperties getPubSubSSLPropertiesFromServerConfig(String kafkaBoot
kafkaBootstrapUrls = resolvedKafkaUrl;
}
properties.setProperty(KAFKA_BOOTSTRAP_SERVERS, kafkaBootstrapUrls);
PubSubSecurityProtocol securityProtocol = serverConfig.getKafkaSecurityProtocol(kafkaBootstrapUrls);
SecurityProtocol securityProtocol = serverConfig.getKafkaSecurityProtocol(kafkaBootstrapUrls);
if (KafkaSSLUtils.isKafkaSSLProtocol(securityProtocol)) {
Optional<SSLConfig> sslConfig = serverConfig.getSslConfig();
if (!sslConfig.isPresent()) {
throw new VeniceException("SSLConfig should be present when Kafka SSL is enabled");
}
properties.putAll(sslConfig.get().getKafkaSSLConfig());
}
properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name());
properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name);
return new VeniceProperties(properties);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.schema.SchemaEntry;
Expand All @@ -64,6 +63,7 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -129,7 +129,7 @@ private void setupMockConfig() {
doReturn(VeniceProperties.empty()).when(mockVeniceServerConfig).getKafkaConsumerConfigsForLocalConsumption();
doReturn(getConsumerAssignmentStrategy()).when(mockVeniceServerConfig).getSharedConsumerAssignmentStrategy();
doReturn(1).when(mockVeniceServerConfig).getConsumerPoolSizePerKafkaCluster();
doReturn(PubSubSecurityProtocol.PLAINTEXT).when(mockVeniceServerConfig).getKafkaSecurityProtocol(dummyKafkaUrl);
doReturn(SecurityProtocol.PLAINTEXT).when(mockVeniceServerConfig).getKafkaSecurityProtocol(dummyKafkaUrl);
doReturn(10).when(mockVeniceServerConfig).getKafkaMaxPollRecords();
doReturn(2).when(mockVeniceServerConfig).getTopicManagerMetadataFetcherConsumerPoolSize();
doReturn(2).when(mockVeniceServerConfig).getTopicManagerMetadataFetcherThreadPoolSize();
Expand Down
2 changes: 0 additions & 2 deletions clients/venice-admin-tool/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ dependencies {
exclude group: 'org.apache.helix'
}
implementation('org.apache.helix:metrics-common:1.4.1:jdk8')
implementation libraries.zstd

testImplementation project(':internal:venice-common').sourceSets.test.output
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.linkedin.venice.utils;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.protocol.SecurityProtocol;


public class KafkaSSLUtils {
Expand All @@ -26,20 +26,26 @@ public class KafkaSSLUtils {
SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG,
SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG);

/**
* Right now, Venice only supports two Kafka protocols:
* {@link SecurityProtocol#PLAINTEXT}
* {@link SecurityProtocol#SSL}
*
* @param kafkaProtocol
* @return
*/
public static boolean isKafkaProtocolValid(String kafkaProtocol) {
return kafkaProtocol.equals(PubSubSecurityProtocol.PLAINTEXT.name())
|| kafkaProtocol.equals(PubSubSecurityProtocol.SSL.name())
|| kafkaProtocol.equals(PubSubSecurityProtocol.SASL_PLAINTEXT.name())
|| kafkaProtocol.equals(PubSubSecurityProtocol.SASL_SSL.name());
return kafkaProtocol.equals(SecurityProtocol.PLAINTEXT.name()) || kafkaProtocol.equals(SecurityProtocol.SSL.name())
|| kafkaProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name())
|| kafkaProtocol.equals(SecurityProtocol.SASL_SSL.name());
}

public static boolean isKafkaSSLProtocol(String kafkaProtocol) {
return kafkaProtocol.equals(PubSubSecurityProtocol.SSL.name())
|| kafkaProtocol.equals(PubSubSecurityProtocol.SASL_SSL.name());
return kafkaProtocol.equals(SecurityProtocol.SSL.name()) || kafkaProtocol.equals(SecurityProtocol.SASL_SSL.name());
}

public static boolean isKafkaSSLProtocol(PubSubSecurityProtocol kafkaProtocol) {
return kafkaProtocol == PubSubSecurityProtocol.SSL || kafkaProtocol == PubSubSecurityProtocol.SASL_SSL;
public static boolean isKafkaSSLProtocol(SecurityProtocol kafkaProtocol) {
return kafkaProtocol == SecurityProtocol.SSL || kafkaProtocol == SecurityProtocol.SASL_SSL;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -193,7 +192,7 @@ public void testDeleteTopicValidTopicDeletion() throws Exception {
DeleteTopicsResult deleteTopicsResultMock = mock(DeleteTopicsResult.class);
KafkaFuture<Void> topicDeletionFutureMock = mock(KafkaFuture.class);

when(internalKafkaAdminClientMock.deleteTopics(any(Collection.class))).thenReturn(deleteTopicsResultMock);
when(internalKafkaAdminClientMock.deleteTopics(any())).thenReturn(deleteTopicsResultMock);
when(deleteTopicsResultMock.all()).thenReturn(topicDeletionFutureMock);
when(topicDeletionFutureMock.get(eq(1000L), eq(TimeUnit.MILLISECONDS))).thenReturn(null);

Expand All @@ -209,7 +208,7 @@ public void testDeleteTopicThrowsException() throws Exception {
DeleteTopicsResult deleteTopicsResultMock = mock(DeleteTopicsResult.class);
KafkaFuture<Void> topicDeletionFutureMock = mock(KafkaFuture.class);

when(internalKafkaAdminClientMock.deleteTopics(any(Collection.class))).thenReturn(deleteTopicsResultMock);
when(internalKafkaAdminClientMock.deleteTopics(any())).thenReturn(deleteTopicsResultMock);
when(deleteTopicsResultMock.all()).thenReturn(topicDeletionFutureMock);

when(topicDeletionFutureMock.get(eq(1000L), eq(TimeUnit.MILLISECONDS)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

import static org.testng.Assert.*;

import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
import com.linkedin.venice.utils.VeniceProperties;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.testng.annotations.Test;


Expand All @@ -20,23 +20,23 @@ public class ApacheKafkaAdminConfigTest {

@Test
public void testSetupSaslInKafkaAdminPlaintext() {
testSetupSaslInKafkaAdmin(PubSubSecurityProtocol.SASL_PLAINTEXT);
testSetupSaslInKafkaAdmin(SecurityProtocol.SASL_PLAINTEXT);
}

@Test
public void testSetupSaslInKafkaAdminSSL() {
testSetupSaslInKafkaAdmin(PubSubSecurityProtocol.SASL_SSL);
testSetupSaslInKafkaAdmin(SecurityProtocol.SASL_SSL);
}

private void testSetupSaslInKafkaAdmin(PubSubSecurityProtocol securityProtocol) {
private void testSetupSaslInKafkaAdmin(SecurityProtocol securityProtocol) {
Properties properties = new Properties();
properties.put("cluster.name", "cluster");
properties.put("zookeeper.address", "localhost:2181");
properties.put("kafka.bootstrap.servers", "localhost:9092");
properties.put("kafka.sasl.jaas.config", SASL_JAAS_CONFIG);
properties.put("kafka.sasl.mechanism", SASL_MECHANISM);
properties.put("kafka.security.protocol", securityProtocol.name());
if (securityProtocol.name().contains("SSL")) {
properties.put("kafka.security.protocol", securityProtocol.name);
if (securityProtocol.name.contains("SSL")) {
properties.put("ssl.truststore.location", "-");
properties.put("ssl.truststore.password", "");
properties.put("ssl.truststore.type", "JKS");
Expand All @@ -49,7 +49,7 @@ private void testSetupSaslInKafkaAdmin(PubSubSecurityProtocol securityProtocol)
Properties adminProperties = serverConfig.getAdminProperties();
assertEquals(SASL_JAAS_CONFIG, adminProperties.get("sasl.jaas.config"));
assertEquals(SASL_MECHANISM, adminProperties.get("sasl.mechanism"));
assertEquals(securityProtocol.name(), adminProperties.get("security.protocol"));
assertEquals(securityProtocol.name, adminProperties.get("security.protocol"));
}

@Test
Expand Down
Loading