diff --git a/it/google-cloud-platform/build.gradle b/it/google-cloud-platform/build.gradle
index 1c63ade152d0..681b034adbfa 100644
--- a/it/google-cloud-platform/build.gradle
+++ b/it/google-cloud-platform/build.gradle
@@ -41,6 +41,7 @@ dependencies {
     implementation 'org.apache.parquet:parquet-avro:1.12.0'
     implementation 'org.apache.parquet:parquet-common:1.12.0'
     implementation 'org.apache.parquet:parquet-hadoop:1.12.0'
+    implementation 'org.apache.commons:commons-lang3:3.9'
     implementation library.java.gax
     implementation library.java.google_api_common
     implementation library.java.protobuf_java_util
@@ -72,10 +73,11 @@ dependencies {
     testImplementation project(path: ":sdks:java:testing:test-utils")
     testImplementation project(path: ":sdks:java:io:google-cloud-platform")
+    testImplementation project(path: ":sdks:java:extensions:protobuf", configuration: "testRuntimeMigration")
     testImplementation project(path: ":sdks:java:io:synthetic")
     testImplementation library.java.mockito_inline
     testImplementation project(path: ":sdks:java:extensions:google-cloud-platform-core", configuration: "testRuntimeMigration")
-    testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadowTest")
+    testImplementation project(path: ":runners:direct-java", configuration: "shadow")
     testRuntimeOnly library.java.slf4j_simple
 }
@@ -84,4 +86,6 @@ tasks.register("BigTablePerformanceTest", IoPerformanceTestUtilities.IoPerforman
 tasks.register("BigQueryPerformanceTest", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigQueryIOLT', ['configuration':'medium','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
 tasks.register("BigQueryStressTest", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigQueryIOST', ['configuration':'medium','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
 tasks.register("BigQueryStorageApiStreamingPerformanceTest", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigQueryStreamingLT', ['configuration':'large', 'project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
+tasks.register("PubSubPerformanceTest", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'PubSubIOLT', ['configuration':'large','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
 tasks.register("WordCountIntegrationTest", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'WordCountIT', ['project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
+
diff --git a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/pubsub/PubSubIOLT.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/pubsub/PubSubIOLT.java
new file mode 100644
index 000000000000..f5bd4f59149b
--- /dev/null
+++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/pubsub/PubSubIOLT.java
@@ -0,0 +1,475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.gcp.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.cloud.Timestamp;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.SubscriptionName;
+import com.google.pubsub.v1.TopicName;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import org.apache.beam.it.common.PipelineLauncher;
+import org.apache.beam.it.common.PipelineOperator;
+import org.apache.beam.it.common.TestProperties;
+import org.apache.beam.it.common.utils.ResourceManagerUtils;
+import org.apache.beam.it.gcp.IOLoadTestBase;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Primitive;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
+import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
+import org.apache.beam.sdk.io.synthetic.SyntheticUnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.testutils.NamedTestResult;
+import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** PubSubIO performance tests. */
+public class PubSubIOLT extends IOLoadTestBase {
+
+  private static final int NUMBER_OF_BUNDLES_FOR_LOCAL = 10;
+  private static final int NUMBER_OF_BUNDLES_FOR_MEDIUM_AND_LARGE = 20;
+  private static final String READ_ELEMENT_METRIC_NAME = "read_count";
+  private static final String MAP_RECORDS_STEP_NAME = "Map records";
+  private static final String WRITE_TO_PUBSUB_STEP_NAME = "Write to PubSub";
+  private static final Map<String, Configuration> TEST_CONFIGS_PRESET;
+  private static TopicName topicName;
+  private static String testConfigName;
+  private static Configuration configuration;
+  private static SubscriptionName subscription;
+  private static InfluxDBSettings influxDBSettings;
+  private static PubsubResourceManager resourceManager;
+
+  @Rule public transient TestPipeline writePipeline = TestPipeline.create();
+  @Rule public transient TestPipeline readPipeline = TestPipeline.create();
+
+  static {
+    try {
+      TEST_CONFIGS_PRESET =
+          ImmutableMap.of(
+              "local",
+              PubSubIOLT.Configuration.fromJsonString(
+                  "{\"numRecords\":200,\"valueSizeBytes\":1000,\"pipelineTimeout\":7,\"runner\":\"DirectRunner\",\"numWorkers\":1}",
+                  PubSubIOLT.Configuration.class), // 0.2 MB
+              "medium",
+              PubSubIOLT.Configuration.fromJsonString(
+                  "{\"numRecords\":10000000,\"valueSizeBytes\":1000,\"pipelineTimeout\":20,\"runner\":\"DataflowRunner\",\"numWorkers\":10}",
+                  PubSubIOLT.Configuration.class), // 10 GB
+              "large",
+              PubSubIOLT.Configuration.fromJsonString(
+                  "{\"numRecords\":100000000,\"valueSizeBytes\":1000,\"pipelineTimeout\":50,\"runner\":\"DataflowRunner\",\"numWorkers\":20}",
+                  PubSubIOLT.Configuration.class) // 100 GB
+              );
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
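+
+  // Instead of a preset name, the "configuration" property may also carry a raw JSON config,
+  // e.g. (illustrative value): -Dconfiguration='{"numRecords":1000,"valueSizeBytes":1000}'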
+
+  @Before
+  public void setup() throws IOException {
+    resourceManager =
+        PubsubResourceManager.builder("io-pubsub-lt", project, CREDENTIALS_PROVIDER).build();
+    topicName = resourceManager.createTopic("topic");
+    subscription = resourceManager.createSubscription(topicName, "subscription");
+    PipelineOptionsFactory.register(TestPipelineOptions.class);
+
+    // parse configuration
+    testConfigName =
+        TestProperties.getProperty("configuration", "local", TestProperties.Type.PROPERTY);
+    configuration = TEST_CONFIGS_PRESET.get(testConfigName);
+    if (configuration == null) {
+      try {
+        configuration =
+            PubSubIOLT.Configuration.fromJsonString(testConfigName, PubSubIOLT.Configuration.class);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Unknown test configuration: [%s]. Pass a valid configuration JSON, or use one of"
+                    + " the config presets: %s",
+                testConfigName, TEST_CONFIGS_PRESET.keySet()));
+      }
+    }
+
+    // Explicitly set the number of initial bundles in SyntheticUnboundedSource to work around a
+    // bug in its implementation: the number of records lost in the streaming pipeline equals the
+    // number of initial bundles.
+    configuration.forceNumInitialBundles =
+        testConfigName.equals("local")
+            ? NUMBER_OF_BUNDLES_FOR_LOCAL
+            : NUMBER_OF_BUNDLES_FOR_MEDIUM_AND_LARGE;
+
+    // tempLocation needs to be set for DataflowRunner
+    if (!Strings.isNullOrEmpty(tempBucketName)) {
+      String tempLocation = String.format("gs://%s/temp/", tempBucketName);
+      writePipeline.getOptions().as(TestPipelineOptions.class).setTempRoot(tempLocation);
+      writePipeline.getOptions().setTempLocation(tempLocation);
+      readPipeline.getOptions().as(TestPipelineOptions.class).setTempRoot(tempLocation);
+      readPipeline.getOptions().setTempLocation(tempLocation);
+    }
+    writePipeline.getOptions().as(PubsubOptions.class).setProject(project);
+    readPipeline.getOptions().as(PubsubOptions.class).setProject(project);
+    writePipeline.getOptions().as(DirectOptions.class).setBlockOnRun(false);
+    readPipeline.getOptions().as(DirectOptions.class).setBlockOnRun(false);
+  }
+
+  @After
+  public void tearDownClass() {
+    ResourceManagerUtils.cleanResources(resourceManager);
+  }
+
+  @Test
+  public void testStringWriteAndRead() throws IOException {
+    configuration.writeAndReadFormat = WriteAndReadFormat.STRING.toString();
+    testWriteAndRead();
+  }
+
+  @Test
+  public void testAvroGenericClassWriteAndRead() throws IOException {
+    configuration.writeAndReadFormat = WriteAndReadFormat.AVRO.toString();
+    testWriteAndRead();
+  }
+
+  @Test
+  public void testProtoPrimitiveWriteAndRead() throws IOException {
+    configuration.writeAndReadFormat = WriteAndReadFormat.PROTO.toString();
+    testWriteAndRead();
+  }
+
+  @Test
+  public void testPubsubMessageWriteAndRead() throws IOException {
+    configuration.writeAndReadFormat = WriteAndReadFormat.PUBSUB_MESSAGE.toString();
+    testWriteAndRead();
+  }
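+
+  /**
+   * Shared test body: launches the write pipeline and the streaming read pipeline for the
+   * configured format, waits on the read side, validates the read counter, and exports metrics.
+   */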
+  public void testWriteAndRead() throws IOException {
+    if (configuration.exportMetricsToInfluxDB) {
+      influxDBSettings =
+          InfluxDBSettings.builder()
+              .withHost(configuration.influxHost)
+              .withDatabase(configuration.influxDatabase)
+              .withMeasurement(
+                  configuration.influxMeasurement
+                      + "_"
+                      + testConfigName
+                      + "_"
+                      + configuration.writeAndReadFormat)
+              .get();
+    }
+
+    WriteAndReadFormat format = WriteAndReadFormat.valueOf(configuration.writeAndReadFormat);
+    PipelineLauncher.LaunchInfo writeLaunchInfo = testWrite(format);
+    PipelineLauncher.LaunchInfo readLaunchInfo = testRead(format);
+    try {
+      PipelineOperator.Result readResult =
+          pipelineOperator.waitUntilDone(
+              createConfig(readLaunchInfo, Duration.ofMinutes(configuration.pipelineTimeout)));
+
+      // Check that the initial launch didn't fail
+      assertNotEquals(PipelineOperator.Result.LAUNCH_FAILED, readResult);
+
+      // A streaming read pipeline does not terminate on its own; fail the test if it is not in
+      // the RUNNING state.
+      assertEquals(
+          PipelineLauncher.JobState.RUNNING,
+          pipelineLauncher.getJobStatus(project, region, readLaunchInfo.jobId()));
+
+      // check metrics
+      double numRecords =
+          pipelineLauncher.getMetric(
+              project,
+              region,
+              readLaunchInfo.jobId(),
+              getBeamMetricsName(PipelineMetricsType.COUNTER, READ_ELEMENT_METRIC_NAME));
+
+      // Assert that the actual record count is greater than or equal to the expected count,
+      // since duplicates may occur when testing with large amounts of data.
+      long expectedDataNum = configuration.numRecords - configuration.forceNumInitialBundles;
+      assertTrue(numRecords >= expectedDataNum);
+
+      // export metrics
+      MetricsConfiguration writeMetricsConfig =
+          MetricsConfiguration.builder()
+              .setInputPCollection("Map records.out0")
+              .setInputPCollectionV2("Map records/ParMultiDo(MapKVToV).out0")
+              .build();
+
+      MetricsConfiguration readMetricsConfig =
+          MetricsConfiguration.builder()
+              .setOutputPCollection("Counting element.out0")
+              .setOutputPCollectionV2("Counting element/ParMultiDo(Counting).out0")
+              .build();
+
+      exportMetrics(writeLaunchInfo, writeMetricsConfig);
+      exportMetrics(readLaunchInfo, readMetricsConfig);
+    } catch (ParseException | InterruptedException e) {
+      throw new RuntimeException(e);
+    } finally {
+      cancelJobIfRunning(writeLaunchInfo);
+      cancelJobIfRunning(readLaunchInfo);
+    }
+  }
+
+  private PipelineLauncher.LaunchInfo testWrite(WriteAndReadFormat format) throws IOException {
+    PCollection<KV<byte[], byte[]>> dataFromSource =
+        writePipeline.apply(
+            "Read from source", Read.from(new SyntheticUnboundedSource(configuration)));
+
+    switch (format) {
+      case STRING:
+        dataFromSource
+            .apply(MAP_RECORDS_STEP_NAME, ParDo.of(new MapKVtoString()))
+            .apply(WRITE_TO_PUBSUB_STEP_NAME, PubsubIO.writeStrings().to(topicName.toString()));
+        break;
+      case AVRO:
+        dataFromSource
+            .apply(MAP_RECORDS_STEP_NAME, ParDo.of(new MapKVtoGenericClass()))
+            .apply(
+                WRITE_TO_PUBSUB_STEP_NAME,
+                PubsubIO.writeAvros(GenericClass.class).to(topicName.toString()));
+        break;
+      case PROTO:
+        dataFromSource
+            .apply(MAP_RECORDS_STEP_NAME, ParDo.of(new MapKVtoPrimitiveProto()))
+            .apply(
+                WRITE_TO_PUBSUB_STEP_NAME,
+                PubsubIO.writeProtos(Primitive.class).to(topicName.toString()));
+        break;
+      case PUBSUB_MESSAGE:
+        dataFromSource
+            .apply(MAP_RECORDS_STEP_NAME, ParDo.of(new MapKVtoPubSubMessage()))
+            .apply(WRITE_TO_PUBSUB_STEP_NAME, PubsubIO.writeMessages().to(topicName.toString()));
+        break;
+    }
+
+    PipelineLauncher.LaunchConfig writeOptions =
+        PipelineLauncher.LaunchConfig.builder("write-pubsub")
+            .setSdk(PipelineLauncher.Sdk.JAVA)
+            .setPipeline(writePipeline)
+            .addParameter("runner", configuration.runner)
+            .addParameter("streaming", "true")
+            .addParameter("numWorkers", String.valueOf(configuration.numWorkers))
+            .build();
+
+    return pipelineLauncher.launch(project, region, writeOptions);
+  }
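+
+  /**
+   * Builds the streaming read pipeline for the given format (reading from the test subscription
+   * and counting elements) and launches it.
+   */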
+  private PipelineLauncher.LaunchInfo testRead(WriteAndReadFormat format) throws IOException {
+    PubsubIO.Read<?> read = null;
+
+    switch (format) {
+      case STRING:
+        read = PubsubIO.readStrings().fromSubscription(subscription.toString());
+        break;
+      case AVRO:
+        read = PubsubIO.readAvros(GenericClass.class).fromSubscription(subscription.toString());
+        break;
+      case PROTO:
+        read = PubsubIO.readProtos(Primitive.class).fromSubscription(subscription.toString());
+        break;
+      case PUBSUB_MESSAGE:
+        read = PubsubIO.readMessages().fromSubscription(subscription.toString());
+        break;
+    }
+
+    readPipeline
+        .apply("Read from PubSub", read)
+        .apply("Counting element", ParDo.of(new CountingFn<>(READ_ELEMENT_METRIC_NAME)));
+
+    PipelineLauncher.LaunchConfig readOptions =
+        PipelineLauncher.LaunchConfig.builder("read-pubsub")
+            .setSdk(PipelineLauncher.Sdk.JAVA)
+            .setPipeline(readPipeline)
+            .addParameter("runner", configuration.runner)
+            .addParameter("streaming", "true")
+            .addParameter("numWorkers", String.valueOf(configuration.numWorkers))
+            .build();
+
+    return pipelineLauncher.launch(project, region, readOptions);
+  }
+
+  private void cancelJobIfRunning(PipelineLauncher.LaunchInfo pipelineLaunchInfo)
+      throws IOException {
+    if (pipelineLauncher.getJobStatus(project, region, pipelineLaunchInfo.jobId())
+        == PipelineLauncher.JobState.RUNNING) {
+      pipelineLauncher.cancelJob(project, region, pipelineLaunchInfo.jobId());
+    }
+  }
+
+  private void exportMetrics(
+      PipelineLauncher.LaunchInfo launchInfo, MetricsConfiguration metricsConfig)
+      throws IOException, ParseException, InterruptedException {
+
+    Map<String, Double> metrics = getMetrics(launchInfo, metricsConfig);
+    String testId = UUID.randomUUID().toString();
+    String testTimestamp = Timestamp.now().toString();
+
+    if (configuration.exportMetricsToInfluxDB) {
+      Collection<NamedTestResult> namedTestResults = new ArrayList<>();
+      for (Map.Entry<String, Double> entry : metrics.entrySet()) {
+        NamedTestResult metricResult =
+            NamedTestResult.create(testId, testTimestamp, entry.getKey(), entry.getValue());
+        namedTestResults.add(metricResult);
+      }
+      IOITMetrics.publishToInflux(testId, testTimestamp, namedTestResults, influxDBSettings);
+    } else {
+      exportMetricsToBigQuery(launchInfo, metrics);
+    }
+  }
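+
+  // The mappers below convert the synthetic KV<byte[], byte[]> records into the payload type
+  // used by each write/read format.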
+
+  /** Mapper class to convert data from KV<byte[], byte[]> to String. */
+  private static class MapKVtoString extends DoFn<KV<byte[], byte[]>, String> {
+    @ProcessElement
+    public void process(ProcessContext context) {
+      byte[] byteValue = Objects.requireNonNull(context.element()).getValue();
+      context.output(ByteString.copyFrom(byteValue).toString(StandardCharsets.UTF_8));
+    }
+  }
+
+  /** Mapper class to convert data from KV<byte[], byte[]> to GenericClass. */
+  private static class MapKVtoGenericClass extends DoFn<KV<byte[], byte[]>, GenericClass> {
+    @ProcessElement
+    public void process(ProcessContext context) {
+      byte[] byteValue = Objects.requireNonNull(context.element()).getValue();
+      GenericClass pojo = new GenericClass(byteValue);
+      context.output(pojo);
+    }
+  }
+
+  /** Mapper class to convert data from KV<byte[], byte[]> to Proto Primitive. */
+  private static class MapKVtoPrimitiveProto extends DoFn<KV<byte[], byte[]>, Primitive> {
+    @ProcessElement
+    public void process(ProcessContext context) {
+      byte[] byteValue = Objects.requireNonNull(context.element()).getValue();
+      Primitive proto =
+          Primitive.newBuilder()
+              .setPrimitiveBytes(ByteString.copyFrom(byteValue))
+              // Interprets the first four bytes of the payload as an int32; requires
+              // valueSizeBytes >= 4.
+              .setPrimitiveInt32(ByteBuffer.wrap(byteValue).getInt())
+              .build();
+      context.output(proto);
+    }
+  }
+
+  /** Mapper class to convert data from KV<byte[], byte[]> to PubsubMessage. */
+  private static class MapKVtoPubSubMessage extends DoFn<KV<byte[], byte[]>, PubsubMessage> {
+    @ProcessElement
+    public void process(ProcessContext context) {
+      byte[] byteValue = Objects.requireNonNull(context.element()).getValue();
+      PubsubMessage pubsubMessage = new PubsubMessage(byteValue, Collections.emptyMap());
+      context.output(pubsubMessage);
+    }
+  }
+
+  /** Example of a generic POJO to test the PubsubIO.writeAvros()/readAvros() methods. */
+  static class GenericClass implements Serializable {
+    byte[] byteField;
+
+    public GenericClass() {}
+
+    public GenericClass(byte[] byteField) {
+      this.byteField = byteField;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass()).add("byteField", byteField).toString();
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(Arrays.hashCode(byteField));
+    }
+
+    @Override
+    public boolean equals(@Nullable Object other) {
+      if (!(other instanceof GenericClass)) {
+        return false;
+      }
+      GenericClass o = (GenericClass) other;
+      return Arrays.equals(byteField, o.byteField);
+    }
+  }
+
+  private enum WriteAndReadFormat {
+    STRING,
+    AVRO,
+    PROTO,
+    PUBSUB_MESSAGE
+  }
+
+  /** Options for the PubSub IO load test. */
+  static class Configuration extends SyntheticSourceOptions {
+    /** Pipeline timeout in minutes. Must be a positive value. */
+    @JsonProperty public int pipelineTimeout = 20;
+
+    /** Runner specified to run the pipeline. */
+    @JsonProperty public String runner = "DirectRunner";
+
+    /** PubSub write and read format: STRING/AVRO/PROTO/PUBSUB_MESSAGE. */
+    @JsonProperty public String writeAndReadFormat = "STRING";
+
+    /** Number of workers for the pipeline. */
+    @JsonProperty public int numWorkers = 20;
+
+    /**
+     * Determines the destination for exported metrics. If set to true, metrics are exported to
+     * InfluxDB and displayed with Grafana. If set to false, metrics are exported to BigQuery and
+     * displayed with Looker Studio.
+     */
+    @JsonProperty public boolean exportMetricsToInfluxDB = false;
+
+    /** InfluxDB measurement to publish results to. */
+    @JsonProperty public String influxMeasurement;
+
+    /** InfluxDB host to publish metrics to. */
+    @JsonProperty public String influxHost;
+
+    /** InfluxDB database to publish metrics to. */
+    @JsonProperty public String influxDatabase;
+  }
+}