From 84e91176f3909a28d29bdf5a766e4c2c72cd4461 Mon Sep 17 00:00:00 2001 From: Felix GV Date: Tue, 21 May 2024 18:34:56 -0700 Subject: [PATCH 1/3] Changed Kafka from LI's 2.4.1.65 to Apache's 2.4.1 Several code changes to make it work. Introduced a new PubSubSecurityProtocol to replace all usage of Kafka's SecurityProtocol enum, since that one has a different package name between the Apache and LinkedIn forks of Kafka. AK uses: org.apache.kafka.common.security.auth.SecurityProtocol While LI uses: org.apache.kafka.common.protocol.SecurityProtocol --- build.gradle | 14 ++-- .../davinci/config/VeniceClusterConfig.java | 20 +++--- .../consumer/KafkaStoreIngestionService.java | 4 +- .../KafkaStoreIngestionServiceTest.java | 4 +- clients/venice-admin-tool/build.gradle | 2 + .../pubsub/api/PubSubSecurityProtocol.java | 36 ++++++++++ .../linkedin/venice/utils/KafkaSSLUtils.java | 24 +++---- .../admin/ApacheKafkaAdminAdapterTest.java | 5 +- .../admin/ApacheKafkaAdminConfigTest.java | 8 +-- .../api/PubSubSecurityProtocolTest.java | 67 +++++++++++++++++++ .../venice/utils/KafkaSSLUtilsTest.java | 10 +-- internal/venice-test-common/build.gradle | 3 +- .../integration/utils/KafkaTestUtils.java | 4 +- .../utils/VeniceControllerWrapper.java | 4 +- .../utils/VeniceServerWrapper.java | 4 +- ...woLayerMultiRegionMultiClusterWrapper.java | 14 ++-- .../VeniceControllerClusterConfig.java | 4 +- 17 files changed, 165 insertions(+), 62 deletions(-) create mode 100644 internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocol.java create mode 100644 internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocolTest.java diff --git a/build.gradle b/build.gradle index 7d1a332888..767b18c554 100644 --- a/build.gradle +++ b/build.gradle @@ -40,8 +40,9 @@ if (project.hasProperty('overrideBuildEnvironment')) { def avroVersion = '1.10.2' def avroUtilVersion = '0.3.21' def grpcVersion = '1.49.2' -def kafkaGroup = 'com.linkedin.kafka' -def kafkaVersion = '2.4.1.65' +// N.B.: The build should also work when substituting Kafka from the Apache fork to LinkedIn's fork: +def kafkaGroup = 'org.apache.kafka' // 'com.linkedin.kafka' +def kafkaVersion = '2.4.1' // '2.4.1.65' def log4j2Version = '2.17.1' def pegasusVersion = '29.31.0' def protobufVersion = '3.21.7' @@ -52,14 +53,15 @@ def alpnAgentVersion = '2.0.10' def hadoopVersion = '2.10.2' def apacheSparkVersion = '3.3.3' def antlrVersion = '4.8' +def scala = '2.12' ext.libraries = [ alpnAgent: "org.mortbay.jetty.alpn:jetty-alpn-agent:${alpnAgentVersion}", antlr4: "org.antlr:antlr4:${antlrVersion}", antlr4Runtime: "org.antlr:antlr4-runtime:${antlrVersion}", - apacheSparkAvro: "org.apache.spark:spark-avro_2.12:${apacheSparkVersion}", - apacheSparkCore: "org.apache.spark:spark-core_2.12:${apacheSparkVersion}", - apacheSparkSql: "org.apache.spark:spark-sql_2.12:${apacheSparkVersion}", + apacheSparkAvro: "org.apache.spark:spark-avro_${scala}:${apacheSparkVersion}", + apacheSparkCore: "org.apache.spark:spark-core_${scala}:${apacheSparkVersion}", + apacheSparkSql: "org.apache.spark:spark-sql_${scala}:${apacheSparkVersion}", avro: "org.apache.avro:avro:${avroVersion}", avroCompiler: "org.apache.avro:avro-compiler:${avroVersion}", avroMapred: "org.apache.avro:avro-mapred:${avroVersion}", @@ -101,7 +103,7 @@ ext.libraries = [ jna: 'net.java.dev.jna:jna:4.5.1', jsr305: 'com.google.code.findbugs:jsr305:3.0.2', joptSimple: 'net.sf.jopt-simple:jopt-simple:3.2', - kafka: "${kafkaGroup}:kafka_2.12:${kafkaVersion}", + kafka: "${kafkaGroup}:kafka_${scala}:${kafkaVersion}", kafkaClients: "${kafkaGroup}:kafka-clients:${kafkaVersion}", kafkaClientsTest: "${kafkaGroup}:kafka-clients:${kafkaVersion}:test", log4j2api: "org.apache.logging.log4j:log4j-api:${log4j2Version}", diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceClusterConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceClusterConfig.java index cd1ab58ff7..04dabe3c44 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceClusterConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceClusterConfig.java @@ -27,6 +27,7 @@ import com.linkedin.venice.exceptions.ConfigurationException; import com.linkedin.venice.exceptions.UndefinedPropertyException; import com.linkedin.venice.meta.PersistenceType; +import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol; import com.linkedin.venice.utils.KafkaSSLUtils; import com.linkedin.venice.utils.RegionUtils; import com.linkedin.venice.utils.VeniceProperties; @@ -44,7 +45,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -89,8 +89,8 @@ public class VeniceClusterConfig { private final VeniceProperties clusterProperties; - private final SecurityProtocol kafkaSecurityProtocol; - private final Map kafkaBootstrapUrlToSecurityProtocol; + private final PubSubSecurityProtocol kafkaSecurityProtocol; + private final Map kafkaBootstrapUrlToSecurityProtocol; private final Optional sslConfig; public VeniceClusterConfig(VeniceProperties clusterProps, Map> kafkaClusterMap) @@ -135,17 +135,17 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map tmpKafkaClusterIdToUrlMap = new Int2ObjectOpenHashMap<>(); Object2IntMap tmpKafkaClusterUrlToIdMap = new Object2IntOpenHashMap<>(); Int2ObjectMap tmpKafkaClusterIdToAliasMap = new Int2ObjectOpenHashMap<>(); Object2IntMap tmpKafkaClusterAliasToIdMap = new Object2IntOpenHashMap<>(); - Map tmpKafkaBootstrapUrlToSecurityProtocol = new HashMap<>(); + Map tmpKafkaBootstrapUrlToSecurityProtocol = new HashMap<>(); Map tmpKafkaUrlResolution = new HashMap<>(); boolean foundBaseKafkaUrlInMappingIfItIsPopulated = kafkaClusterMap.isEmpty(); @@ -167,7 +167,7 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map sslConfig = serverConfig.getSslConfig(); if (!sslConfig.isPresent()) { diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java index b537fa8291..c39bd07317 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java @@ -43,6 +43,7 @@ import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.schema.SchemaEntry; @@ -60,7 +61,6 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.function.Function; import org.apache.avro.Schema; -import org.apache.kafka.common.protocol.SecurityProtocol; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.BeforeClass; @@ -126,7 +126,7 @@ private void setupMockConfig() { doReturn(VeniceProperties.empty()).when(mockVeniceServerConfig).getKafkaConsumerConfigsForLocalConsumption(); doReturn(getConsumerAssignmentStrategy()).when(mockVeniceServerConfig).getSharedConsumerAssignmentStrategy(); doReturn(1).when(mockVeniceServerConfig).getConsumerPoolSizePerKafkaCluster(); - doReturn(SecurityProtocol.PLAINTEXT).when(mockVeniceServerConfig).getKafkaSecurityProtocol(dummyKafkaUrl); + doReturn(PubSubSecurityProtocol.PLAINTEXT).when(mockVeniceServerConfig).getKafkaSecurityProtocol(dummyKafkaUrl); doReturn(10).when(mockVeniceServerConfig).getKafkaMaxPollRecords(); doReturn(2).when(mockVeniceServerConfig).getTopicManagerMetadataFetcherConsumerPoolSize(); doReturn(2).when(mockVeniceServerConfig).getTopicManagerMetadataFetcherThreadPoolSize(); diff --git a/clients/venice-admin-tool/build.gradle b/clients/venice-admin-tool/build.gradle index 32de0e29f8..a6f46156a6 100644 --- a/clients/venice-admin-tool/build.gradle +++ b/clients/venice-admin-tool/build.gradle @@ -28,6 +28,8 @@ dependencies { exclude group: 'org.apache.helix' } implementation('org.apache.helix:metrics-common:1.4.1:jdk8') + implementation libraries.zstd + testImplementation project(':internal:venice-common').sourceSets.test.output } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocol.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocol.java new file mode 100644 index 0000000000..05f0e5d13b --- /dev/null +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocol.java @@ -0,0 +1,36 @@ +package com.linkedin.venice.pubsub.api; + +import java.util.Locale; + + +/** + * This enum is equivalent to Kafka's SecurityProtocol enum. + * + * We need this abstraction because Kafka's enum is present in two different namespaces, which are different between + * LinkedIn's fork and the Apache fork. + */ +public enum PubSubSecurityProtocol { + /** Un-authenticated, non-encrypted channel */ + PLAINTEXT(0, "PLAINTEXT"), + /** SSL channel */ + SSL(1, "SSL"), + /** SASL authenticated, non-encrypted channel */ + SASL_PLAINTEXT(2, "SASL_PLAINTEXT"), + /** SASL authenticated, SSL channel */ + SASL_SSL(3, "SASL_SSL"); + + /** The permanent and immutable id of a security protocol -- this can't change, and must match kafka.cluster.SecurityProtocol */ + public final short id; + + /** Name of the security protocol. This may be used by client configuration. */ + public final String name; + + PubSubSecurityProtocol(int id, String name) { + this.id = (short) id; + this.name = name; + } + + public static PubSubSecurityProtocol forName(String name) { + return PubSubSecurityProtocol.valueOf(name.toUpperCase(Locale.ROOT)); + } +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/utils/KafkaSSLUtils.java b/internal/venice-common/src/main/java/com/linkedin/venice/utils/KafkaSSLUtils.java index 9946842eed..19ed1a1a2c 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/utils/KafkaSSLUtils.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/utils/KafkaSSLUtils.java @@ -1,12 +1,12 @@ package com.linkedin.venice.utils; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol; import java.util.Arrays; import java.util.List; import java.util.Properties; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.config.SslConfigs; -import org.apache.kafka.common.protocol.SecurityProtocol; public class KafkaSSLUtils { @@ -26,26 +26,20 @@ public class KafkaSSLUtils { SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG); - /** - * Right now, Venice only supports two Kafka protocols: - * {@link SecurityProtocol#PLAINTEXT} - * {@link SecurityProtocol#SSL} - * - * @param kafkaProtocol - * @return - */ public static boolean isKafkaProtocolValid(String kafkaProtocol) { - return kafkaProtocol.equals(SecurityProtocol.PLAINTEXT.name()) || kafkaProtocol.equals(SecurityProtocol.SSL.name()) - || kafkaProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name()) - || kafkaProtocol.equals(SecurityProtocol.SASL_SSL.name()); + return kafkaProtocol.equals(PubSubSecurityProtocol.PLAINTEXT.name()) + || kafkaProtocol.equals(PubSubSecurityProtocol.SSL.name()) + || kafkaProtocol.equals(PubSubSecurityProtocol.SASL_PLAINTEXT.name()) + || kafkaProtocol.equals(PubSubSecurityProtocol.SASL_SSL.name()); } public static boolean isKafkaSSLProtocol(String kafkaProtocol) { - return kafkaProtocol.equals(SecurityProtocol.SSL.name()) || kafkaProtocol.equals(SecurityProtocol.SASL_SSL.name()); + return kafkaProtocol.equals(PubSubSecurityProtocol.SSL.name()) + || kafkaProtocol.equals(PubSubSecurityProtocol.SASL_SSL.name()); } - public static boolean isKafkaSSLProtocol(SecurityProtocol kafkaProtocol) { - return kafkaProtocol == SecurityProtocol.SSL || kafkaProtocol == SecurityProtocol.SASL_SSL; + public static boolean isKafkaSSLProtocol(PubSubSecurityProtocol kafkaProtocol) { + return kafkaProtocol == PubSubSecurityProtocol.SSL || kafkaProtocol == PubSubSecurityProtocol.SASL_SSL; } /** diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminAdapterTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminAdapterTest.java index ecfea7c98c..00e2a9da70 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminAdapterTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminAdapterTest.java @@ -27,6 +27,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -192,7 +193,7 @@ public void testDeleteTopicValidTopicDeletion() throws Exception { DeleteTopicsResult deleteTopicsResultMock = mock(DeleteTopicsResult.class); KafkaFuture topicDeletionFutureMock = mock(KafkaFuture.class); - when(internalKafkaAdminClientMock.deleteTopics(any())).thenReturn(deleteTopicsResultMock); + when(internalKafkaAdminClientMock.deleteTopics(any(Collection.class))).thenReturn(deleteTopicsResultMock); when(deleteTopicsResultMock.all()).thenReturn(topicDeletionFutureMock); when(topicDeletionFutureMock.get(eq(1000L), eq(TimeUnit.MILLISECONDS))).thenReturn(null); @@ -208,7 +209,7 @@ public void testDeleteTopicThrowsException() throws Exception { DeleteTopicsResult deleteTopicsResultMock = mock(DeleteTopicsResult.class); KafkaFuture topicDeletionFutureMock = mock(KafkaFuture.class); - when(internalKafkaAdminClientMock.deleteTopics(any())).thenReturn(deleteTopicsResultMock); + when(internalKafkaAdminClientMock.deleteTopics(any(Collection.class))).thenReturn(deleteTopicsResultMock); when(deleteTopicsResultMock.all()).thenReturn(topicDeletionFutureMock); when(topicDeletionFutureMock.get(eq(1000L), eq(TimeUnit.MILLISECONDS))) diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminConfigTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminConfigTest.java index a0a197af02..5091e0fd01 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminConfigTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminConfigTest.java @@ -2,13 +2,13 @@ import static org.testng.Assert.*; +import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol; import com.linkedin.venice.utils.VeniceProperties; import java.util.Properties; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SslConfigs; -import org.apache.kafka.common.protocol.SecurityProtocol; import org.testng.annotations.Test; @@ -20,15 +20,15 @@ public class ApacheKafkaAdminConfigTest { @Test public void testSetupSaslInKafkaAdminPlaintext() { - testSetupSaslInKafkaAdmin(SecurityProtocol.SASL_PLAINTEXT); + testSetupSaslInKafkaAdmin(PubSubSecurityProtocol.SASL_PLAINTEXT); } @Test public void testSetupSaslInKafkaAdminSSL() { - testSetupSaslInKafkaAdmin(SecurityProtocol.SASL_SSL); + testSetupSaslInKafkaAdmin(PubSubSecurityProtocol.SASL_SSL); } - private void testSetupSaslInKafkaAdmin(SecurityProtocol securityProtocol) { + private void testSetupSaslInKafkaAdmin(PubSubSecurityProtocol securityProtocol) { Properties properties = new Properties(); properties.put("cluster.name", "cluster"); properties.put("zookeeper.address", "localhost:2181"); diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocolTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocolTest.java new file mode 100644 index 0000000000..75029fa273 --- /dev/null +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocolTest.java @@ -0,0 +1,67 @@ +package com.linkedin.venice.pubsub.api; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +import com.linkedin.venice.utils.ReflectUtils; +import java.util.HashSet; +import java.util.Set; +import org.testng.annotations.Test; + + +public class PubSubSecurityProtocolTest { + private static final Class SPECIFIC_ENUM_CLASS = getFQCN(); + + private static Class getFQCN() { + String simpleClassName = "SecurityProtocol"; + try { + String apacheKafkaFQCN = "org.apache.kafka.common.security.auth" + "." + simpleClassName; + Class apacheKafkaSecurityProtocol = ReflectUtils.loadClass(apacheKafkaFQCN); + if (apacheKafkaSecurityProtocol != null && apacheKafkaSecurityProtocol.isEnum()) { + return apacheKafkaSecurityProtocol; + } + } catch (Exception e) { + // Expected if not using Apache Kafka. + } + try { + String liKafkaFQCN = "org.apache.kafka.common.protocol" + "." + simpleClassName; + Class liKafkaSecurityProtocol = ReflectUtils.loadClass(liKafkaFQCN); + if (liKafkaSecurityProtocol != null && liKafkaSecurityProtocol.isEnum()) { + return liKafkaSecurityProtocol; + } + } catch (Exception e) { + // Expected if not using LI Kafka... + } + throw new IllegalStateException( + "Neither the Apache Kafka nor LinkedIn Kafka version of " + simpleClassName + " were found on the classpath!"); + } + + /** + * This is a proof of concept of how we could instantiate the specific enum, though as of now there is no need for it. + * + * @return an enum instance specific to the pub sub library present on the class path. + */ + private static Enum getPubSubSpecificEnum(String enumValueName) { + return Enum.valueOf(SPECIFIC_ENUM_CLASS, enumValueName); + } + + /** + * This test merely checks that our own PubSubSecurityProtocol contains only values which exist in Kafka's own enum. + * It can be run with the classpath containing either the Apache fork or LinkedIn's fork of Kafka, and succeed. + */ + @Test + public void testInstantiation() { + for (PubSubSecurityProtocol value: PubSubSecurityProtocol.values()) { + Enum specificEnum = getPubSubSpecificEnum(value.name()); + assertNotNull(specificEnum); + assertTrue(specificEnum.getClass().isEnum()); + assertEquals(specificEnum.getClass().getSimpleName(), "SecurityProtocol"); + Set expectedPackageNames = new HashSet<>(2); + expectedPackageNames.add("org.apache.kafka.common.security.auth"); + expectedPackageNames.add("org.apache.kafka.common.protocol"); + assertTrue(expectedPackageNames.contains(specificEnum.getClass().getPackage().getName())); + assertEquals(specificEnum.name(), value.name()); + } + } +} diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/utils/KafkaSSLUtilsTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/utils/KafkaSSLUtilsTest.java index 0a36349c14..ef67ef05ae 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/utils/KafkaSSLUtilsTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/utils/KafkaSSLUtilsTest.java @@ -2,7 +2,7 @@ import static org.testng.Assert.*; -import org.apache.kafka.common.protocol.SecurityProtocol; +import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol; import org.testng.annotations.Test; @@ -25,9 +25,9 @@ public void testIsKafkaSSLProtocol() { @Test public void testTestIsKafkaSSLProtocol() { - assertTrue(KafkaSSLUtils.isKafkaSSLProtocol(SecurityProtocol.SSL)); - assertFalse(KafkaSSLUtils.isKafkaSSLProtocol(SecurityProtocol.PLAINTEXT)); - assertTrue(KafkaSSLUtils.isKafkaSSLProtocol(SecurityProtocol.SASL_SSL)); - assertFalse(KafkaSSLUtils.isKafkaSSLProtocol(SecurityProtocol.SASL_PLAINTEXT)); + assertTrue(KafkaSSLUtils.isKafkaSSLProtocol(PubSubSecurityProtocol.SSL)); + assertFalse(KafkaSSLUtils.isKafkaSSLProtocol(PubSubSecurityProtocol.PLAINTEXT)); + assertTrue(KafkaSSLUtils.isKafkaSSLProtocol(PubSubSecurityProtocol.SASL_SSL)); + assertFalse(KafkaSSLUtils.isKafkaSSLProtocol(PubSubSecurityProtocol.SASL_PLAINTEXT)); } } diff --git a/internal/venice-test-common/build.gradle b/internal/venice-test-common/build.gradle index b037f6eb07..c29fcb2d7b 100644 --- a/internal/venice-test-common/build.gradle +++ b/internal/venice-test-common/build.gradle @@ -36,7 +36,6 @@ configurations { } } implementation { - exclude group: 'org.apache.kafka' exclude group: 'org.mortbay.jetty', module: 'servlet-api' } integrationTestImplementation.extendsFrom testImplementation @@ -109,6 +108,7 @@ dependencies { implementation libraries.samzaApi implementation libraries.spark implementation libraries.testng + implementation libraries.zstd implementation (libraries.mapreduceClientJobClient) { exclude group: 'org.apache.avro' @@ -119,6 +119,7 @@ dependencies { testImplementation project(':internal:venice-common').sourceSets.test.output testImplementation libraries.log4j2core testImplementation libraries.log4j2api + testImplementation libraries.kafkaClients jmhAnnotationProcessor 'org.openjdk.jmh:jmh-generator-annprocess:' + jmh.jmhVersion.get() jmhImplementation project(path: ':internal:venice-test-common', configuration: 'integrationTestUtils') diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/KafkaTestUtils.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/KafkaTestUtils.java index aa0d6d7730..b65d2b1f9a 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/KafkaTestUtils.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/KafkaTestUtils.java @@ -1,12 +1,12 @@ package com.linkedin.venice.integration.utils; +import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol; import com.linkedin.venice.utils.SslUtils; import com.linkedin.venice.utils.SslUtils.VeniceTlsConfiguration; import java.util.Properties; import kafka.server.KafkaConfig; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.config.SslConfigs; -import org.apache.kafka.common.protocol.SecurityProtocol; /** @@ -43,7 +43,7 @@ public static Properties getLocalCommonKafkaSSLConfig(VeniceTlsConfiguration tls public static Properties getLocalKafkaClientSSLConfig() { Properties properties = new Properties(); - properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name()); + properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, PubSubSecurityProtocol.SSL.name()); properties.putAll(getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration())); return properties; } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java index a13c8ba6eb..3f1de7c66f 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java @@ -65,6 +65,7 @@ import com.linkedin.venice.meta.PersistenceType; import com.linkedin.venice.pubsub.PubSubClientsFactory; import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapter; +import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol; import com.linkedin.venice.servicediscovery.ServiceDiscoveryAnnouncer; import com.linkedin.venice.stats.TehutiUtils; import com.linkedin.venice.utils.PropertyBuilder; @@ -82,7 +83,6 @@ import java.util.Optional; import java.util.stream.Collectors; import org.apache.commons.lang.StringUtils; -import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -228,7 +228,7 @@ static StatefulServiceProvider generateService(VeniceCo } if (options.isSslToKafka()) { - builder.put(KAFKA_SECURITY_PROTOCOL, SecurityProtocol.SSL.name); + builder.put(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.SSL.name); builder.put(KafkaTestUtils.getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration())); } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java index 82c7393b4f..2783a7a8f2 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java @@ -42,6 +42,7 @@ import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.helix.AllowlistAccessor; import com.linkedin.venice.helix.ZkAllowlistAccessor; +import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol; import com.linkedin.venice.security.SSLFactory; import com.linkedin.venice.server.VeniceServer; import com.linkedin.venice.server.VeniceServerContext; @@ -72,7 +73,6 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.io.FileUtils; -import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -256,7 +256,7 @@ static StatefulServiceProvider generateService( .put(SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS, 5000) .put(SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED, true); if (sslToKafka) { - serverPropsBuilder.put(KAFKA_SECURITY_PROTOCOL, SecurityProtocol.SSL.name); + serverPropsBuilder.put(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.SSL.name); serverPropsBuilder.put(KafkaTestUtils.getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration())); } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java index 5065c3d52e..8ebaa243e2 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java @@ -18,6 +18,7 @@ import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_PARENT_DATA_CENTER_REGION_NAME; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; import java.io.File; @@ -33,7 +34,6 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; -import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -257,8 +257,8 @@ private static Map> addKafkaClusterIDMappingToServer List regionNames, List kafkaBrokers) { if (serverProperties.isPresent()) { - SecurityProtocol baseSecurityProtocol = SecurityProtocol - .valueOf(serverProperties.get().getProperty(KAFKA_SECURITY_PROTOCOL, SecurityProtocol.PLAINTEXT.name)); + PubSubSecurityProtocol baseSecurityProtocol = PubSubSecurityProtocol + .valueOf(serverProperties.get().getProperty(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.PLAINTEXT.name)); Map> kafkaClusterMap = new HashMap<>(); Map mapping; @@ -266,21 +266,21 @@ private static Map> addKafkaClusterIDMappingToServer mapping = new HashMap<>(); int clusterId = i - 1; mapping.put(KAFKA_CLUSTER_MAP_KEY_NAME, regionNames.get(clusterId)); - SecurityProtocol securityProtocol = baseSecurityProtocol; + PubSubSecurityProtocol securityProtocol = baseSecurityProtocol; if (clusterId > 0) { // Testing mixed security on any 2-layer setup with 2 or more DCs. - securityProtocol = SecurityProtocol.SSL; + securityProtocol = PubSubSecurityProtocol.SSL; } mapping.put(KAFKA_CLUSTER_MAP_SECURITY_PROTOCOL, securityProtocol.name); // N.B. the first Kafka broker in the list is the parent, which we're excluding from the mapping, so this // is why the index here is offset by 1 compared to the cluster ID. PubSubBrokerWrapper pubSubBrokerWrapper = kafkaBrokers.get(i); - String kafkaAddress = securityProtocol == SecurityProtocol.SSL + String kafkaAddress = securityProtocol == PubSubSecurityProtocol.SSL ? pubSubBrokerWrapper.getSSLAddress() : pubSubBrokerWrapper.getAddress(); mapping.put(KAFKA_CLUSTER_MAP_KEY_URL, kafkaAddress); - String otherKafkaAddress = securityProtocol == SecurityProtocol.PLAINTEXT + String otherKafkaAddress = securityProtocol == PubSubSecurityProtocol.PLAINTEXT ? pubSubBrokerWrapper.getSSLAddress() : pubSubBrokerWrapper.getAddress(); mapping.put(KAFKA_CLUSTER_MAP_KEY_OTHER_URLS, otherKafkaAddress); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java index 7d390d3b99..9f9397a9f9 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java @@ -182,6 +182,7 @@ import com.linkedin.venice.meta.PersistenceType; import com.linkedin.venice.meta.ReadStrategy; import com.linkedin.venice.meta.RoutingStrategy; +import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol; import com.linkedin.venice.pubsub.PubSubAdminAdapterFactory; import com.linkedin.venice.pubsub.PubSubClientsFactory; import com.linkedin.venice.pushmonitor.LeakedPushStatusCleanUpService; @@ -206,7 +207,6 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; -import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -592,7 +592,7 @@ public VeniceControllerClusterConfig(VeniceProperties props) { this.sslKafkaBootStrapServers = sslToKafka ? props.getString(SSL_KAFKA_BOOTSTRAP_SERVERS) : null; this.helixSendMessageTimeoutMilliseconds = props.getInt(HELIX_SEND_MESSAGE_TIMEOUT_MS, 10000); - this.kafkaSecurityProtocol = props.getString(KAFKA_SECURITY_PROTOCOL, SecurityProtocol.PLAINTEXT.name()); + this.kafkaSecurityProtocol = props.getString(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.PLAINTEXT.name()); if (!KafkaSSLUtils.isKafkaProtocolValid(kafkaSecurityProtocol)) { throw new ConfigurationException("Invalid kafka security protocol: " + kafkaSecurityProtocol); } From bfaf0a9c39c75300959f7d69111ae6abca443047 Mon Sep 17 00:00:00 2001 From: Felix GV Date: Fri, 7 Jun 2024 09:46:48 -0700 Subject: [PATCH 2/3] Trimmed down the copy-pasted code in the PubSubSecurityProtocol. --- .../consumer/KafkaStoreIngestionService.java | 2 +- .../pubsub/api/PubSubSecurityProtocol.java | 19 ++++--------------- .../admin/ApacheKafkaAdminConfigTest.java | 6 +++--- .../api/PubSubSecurityProtocolTest.java | 19 +++++++++++++++++++ .../utils/VeniceControllerWrapper.java | 2 +- .../utils/VeniceServerWrapper.java | 2 +- ...woLayerMultiRegionMultiClusterWrapper.java | 6 +++--- 7 files changed, 32 insertions(+), 24 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java index 931a95db6c..c4bf6bec80 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java @@ -1114,7 +1114,7 @@ private VeniceProperties getPubSubSSLPropertiesFromServerConfig(String kafkaBoot } properties.putAll(sslConfig.get().getKafkaSSLConfig()); } - properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name); + properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name()); return new VeniceProperties(properties); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocol.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocol.java index 05f0e5d13b..b09d63dabc 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocol.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocol.java @@ -11,24 +11,13 @@ */ public enum PubSubSecurityProtocol { /** Un-authenticated, non-encrypted channel */ - PLAINTEXT(0, "PLAINTEXT"), + PLAINTEXT, /** SSL channel */ - SSL(1, "SSL"), + SSL, /** SASL authenticated, non-encrypted channel */ - SASL_PLAINTEXT(2, "SASL_PLAINTEXT"), + SASL_PLAINTEXT, /** SASL authenticated, SSL channel */ - SASL_SSL(3, "SASL_SSL"); - - /** The permanent and immutable id of a security protocol -- this can't change, and must match kafka.cluster.SecurityProtocol */ - public final short id; - - /** Name of the security protocol. This may be used by client configuration. */ - public final String name; - - PubSubSecurityProtocol(int id, String name) { - this.id = (short) id; - this.name = name; - } + SASL_SSL; public static PubSubSecurityProtocol forName(String name) { return PubSubSecurityProtocol.valueOf(name.toUpperCase(Locale.ROOT)); diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminConfigTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminConfigTest.java index 5091e0fd01..1f2a998d7b 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminConfigTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminConfigTest.java @@ -35,8 +35,8 @@ private void testSetupSaslInKafkaAdmin(PubSubSecurityProtocol securityProtocol) properties.put("kafka.bootstrap.servers", "localhost:9092"); properties.put("kafka.sasl.jaas.config", SASL_JAAS_CONFIG); properties.put("kafka.sasl.mechanism", SASL_MECHANISM); - properties.put("kafka.security.protocol", securityProtocol.name); - if (securityProtocol.name.contains("SSL")) { + properties.put("kafka.security.protocol", securityProtocol.name()); + if (securityProtocol.name().contains("SSL")) { properties.put("ssl.truststore.location", "-"); properties.put("ssl.truststore.password", ""); properties.put("ssl.truststore.type", "JKS"); @@ -49,7 +49,7 @@ private void testSetupSaslInKafkaAdmin(PubSubSecurityProtocol securityProtocol) Properties adminProperties = serverConfig.getAdminProperties(); assertEquals(SASL_JAAS_CONFIG, adminProperties.get("sasl.jaas.config")); assertEquals(SASL_MECHANISM, adminProperties.get("sasl.mechanism")); - assertEquals(securityProtocol.name, adminProperties.get("security.protocol")); + assertEquals(securityProtocol.name(), adminProperties.get("security.protocol")); } @Test diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocolTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocolTest.java index 75029fa273..5edbd018e2 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocolTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocolTest.java @@ -64,4 +64,23 @@ public void testInstantiation() { assertEquals(specificEnum.name(), value.name()); } } + + @Test + public void testForName() { + assertEquals(PubSubSecurityProtocol.forName("plaintext"), PubSubSecurityProtocol.PLAINTEXT); + assertEquals(PubSubSecurityProtocol.forName("PLAINTEXT"), PubSubSecurityProtocol.PLAINTEXT); + assertEquals(PubSubSecurityProtocol.forName("Plaintext"), PubSubSecurityProtocol.PLAINTEXT); + + assertEquals(PubSubSecurityProtocol.forName("ssl"), PubSubSecurityProtocol.SSL); + assertEquals(PubSubSecurityProtocol.forName("SSL"), PubSubSecurityProtocol.SSL); + assertEquals(PubSubSecurityProtocol.forName("Ssl"), PubSubSecurityProtocol.SSL); + + assertEquals(PubSubSecurityProtocol.forName("sasl_plaintext"), PubSubSecurityProtocol.SASL_PLAINTEXT); + assertEquals(PubSubSecurityProtocol.forName("SASL_PLAINTEXT"), PubSubSecurityProtocol.SASL_PLAINTEXT); + assertEquals(PubSubSecurityProtocol.forName("Sasl_Plaintext"), PubSubSecurityProtocol.SASL_PLAINTEXT); + + assertEquals(PubSubSecurityProtocol.forName("sasl_ssl"), PubSubSecurityProtocol.SASL_SSL); + assertEquals(PubSubSecurityProtocol.forName("SASL_SSL"), PubSubSecurityProtocol.SASL_SSL); + assertEquals(PubSubSecurityProtocol.forName("Sasl_Ssl"), PubSubSecurityProtocol.SASL_SSL); + } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java index 3f1de7c66f..78e51df38b 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java @@ -228,7 +228,7 @@ static StatefulServiceProvider generateService(VeniceCo } if (options.isSslToKafka()) { - builder.put(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.SSL.name); + builder.put(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.SSL.name()); builder.put(KafkaTestUtils.getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration())); } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java index 2783a7a8f2..669abfa9fe 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java @@ -256,7 +256,7 @@ static StatefulServiceProvider generateService( .put(SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS, 5000) .put(SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED, true); if (sslToKafka) { - serverPropsBuilder.put(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.SSL.name); + serverPropsBuilder.put(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.SSL.name()); serverPropsBuilder.put(KafkaTestUtils.getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration())); } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java index 8ebaa243e2..3c9e760c5c 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java @@ -257,8 +257,8 @@ private static Map> addKafkaClusterIDMappingToServer List regionNames, List kafkaBrokers) { if (serverProperties.isPresent()) { - PubSubSecurityProtocol baseSecurityProtocol = PubSubSecurityProtocol - .valueOf(serverProperties.get().getProperty(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.PLAINTEXT.name)); + PubSubSecurityProtocol baseSecurityProtocol = PubSubSecurityProtocol.valueOf( + serverProperties.get().getProperty(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.PLAINTEXT.name())); Map> kafkaClusterMap = new HashMap<>(); Map mapping; @@ -271,7 +271,7 @@ private static Map> addKafkaClusterIDMappingToServer // Testing mixed security on any 2-layer setup with 2 or more DCs. securityProtocol = PubSubSecurityProtocol.SSL; } - mapping.put(KAFKA_CLUSTER_MAP_SECURITY_PROTOCOL, securityProtocol.name); + mapping.put(KAFKA_CLUSTER_MAP_SECURITY_PROTOCOL, securityProtocol.name()); // N.B. the first Kafka broker in the list is the parent, which we're excluding from the mapping, so this // is why the index here is offset by 1 compared to the cluster ID. From 66e451346feacb4d7d7b9c9395abe7c2052a135c Mon Sep 17 00:00:00 2001 From: Felix GV Date: Mon, 30 Sep 2024 07:58:44 -0700 Subject: [PATCH 3/3] Fixed spotless violation and some compiler warnings. --- build.gradle | 4 +++- .../com/linkedin/venice/utils/VeniceEnumValueTest.java | 9 ++++++--- .../venice/controller/VeniceControllerClusterConfig.java | 2 +- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/build.gradle b/build.gradle index 767b18c554..9c0e5b3e0d 100644 --- a/build.gradle +++ b/build.gradle @@ -196,6 +196,8 @@ subprojects { if (JavaVersion.current() >= JavaVersion.VERSION_1_9) { tasks.withType(JavaCompile) { + // Compiler arguments can be injected here... + // options.compilerArgs << '-Xlint:unchecked' options.release = 8 } } @@ -818,4 +820,4 @@ task verifyJdkVersion { gradle.taskGraph.whenReady { // Ensure the JDK version is verified before any other tasks verifyJdkVersion -} \ No newline at end of file +} diff --git a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/VeniceEnumValueTest.java b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/VeniceEnumValueTest.java index 05a0861ac6..fedfec47a8 100644 --- a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/VeniceEnumValueTest.java +++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/VeniceEnumValueTest.java @@ -60,7 +60,9 @@ public void test() { Function valueOfFunction = value -> { try { - return (T) valueOfMethod.invoke(null, value); + @SuppressWarnings("unchecked") + T valueOfReturn = (T) valueOfMethod.invoke(null, value); + return valueOfReturn; } catch (Exception e) { if (e.getClass() == InvocationTargetException.class && e.getCause() instanceof VeniceException) { // Those are expected for invalid values, so we bubble them up. @@ -95,7 +97,8 @@ public void test() { // Check that no other enum values exist besides those that are expected Method valuesFunction = getPublicStaticFunction(this.enumClass, VALUES_METHOD_NAME, new Class[0]); try { - T[] types = (T[]) valuesFunction.invoke(null, new Class[0]); + @SuppressWarnings("unchecked") + T[] types = (T[]) valuesFunction.invoke(null, new Object[0]); for (T type: types) { assertTrue( expectedMapping.containsKey(type.getValue()), @@ -106,7 +109,7 @@ public void test() { } } - private static Method getPublicStaticFunction(Class klass, String functionName, Class... params) { + private static Method getPublicStaticFunction(Class klass, String functionName, Class... params) { try { Method function = klass.getDeclaredMethod(functionName, params); assertTrue( diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java index 9f9397a9f9..b29d14ec70 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java @@ -182,9 +182,9 @@ import com.linkedin.venice.meta.PersistenceType; import com.linkedin.venice.meta.ReadStrategy; import com.linkedin.venice.meta.RoutingStrategy; -import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol; import com.linkedin.venice.pubsub.PubSubAdminAdapterFactory; import com.linkedin.venice.pubsub.PubSubClientsFactory; +import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol; import com.linkedin.venice.pushmonitor.LeakedPushStatusCleanUpService; import com.linkedin.venice.pushmonitor.PushMonitorType; import com.linkedin.venice.status.BatchJobHeartbeatConfigs;