diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index ba6279a1349..4b7dd4ed781 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -900,6 +900,7 @@ class BeamModulePlugin implements Plugin { testcontainers_oracle : "org.testcontainers:oracle-xe:$testcontainers_version", testcontainers_postgresql : "org.testcontainers:postgresql:$testcontainers_version", testcontainers_rabbitmq : "org.testcontainers:rabbitmq:$testcontainers_version", + testcontainers_solace : "org.testcontainers:solace:$testcontainers_version", truth : "com.google.truth:truth:1.1.5", threetenbp : "org.threeten:threetenbp:1.6.8", vendored_grpc_1_60_1 : "org.apache.beam:beam-vendor-grpc-1_60_1:0.2", diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle index 2d720ab7d92..e22016c7907 100644 --- a/sdks/java/io/solace/build.gradle +++ b/sdks/java/io/solace/build.gradle @@ -41,6 +41,7 @@ dependencies { implementation library.java.avro permitUnusedDeclared library.java.avro implementation library.java.google_api_common + implementation library.java.threetenbp implementation library.java.gax implementation library.java.threetenbp implementation library.java.google_http_client @@ -52,5 +53,6 @@ dependencies { testImplementation project(path: ":sdks:java:io:common") testImplementation project(path: ":sdks:java:testing:test-utils") testRuntimeOnly library.java.slf4j_jdk14 + testImplementation library.java.testcontainers_solace testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") } diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceContainerManager.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceContainerManager.java new file mode 100644 index 00000000000..e9c3fe7dfcb --- /dev/null +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceContainerManager.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.it; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.solace.Service; +import org.testcontainers.solace.SolaceContainer; +import org.testcontainers.utility.DockerImageName; + +public class SolaceContainerManager { + + public static final String VPN_NAME = "default"; + public static final String PASSWORD = "password"; + public static final String USERNAME = "username"; + public static final String TOPIC_NAME = "test_topic"; + private static final Logger LOG = LoggerFactory.getLogger(SolaceContainerManager.class); + private final SolaceContainer container; + + public SolaceContainerManager() { + this.container = + new SolaceContainer(DockerImageName.parse("solace/solace-pubsub-standard:10.7")) { + { + addFixedExposedPort(55555, 55555); + addFixedExposedPort(9000, 9000); + addFixedExposedPort(8080, 8080); + addFixedExposedPort(80, 80); + } + }.withVpn(VPN_NAME) + .withCredentials(USERNAME, PASSWORD) + // .withExposedPorts(Service.SMF.getPort()); + .withTopic(TOPIC_NAME, Service.SMF) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + container.addExposedPort(8080); + container.addExposedPort(55555); + } + + public void start() { + container.start(); + } + + void createQueueWithSubscriptionTopic(String queueName) { + executeCommand( + "curl", + "http://localhost:8080/SEMP/v2/config/msgVpns/" + VPN_NAME + "/topicEndpoints", + "-X", + "POST", + "-u", + "admin:admin", + "-H", + "Content-Type:application/json", + "-d", + "{\"topicEndpointName\":\"" + + TOPIC_NAME + + "\",\"accessType\":\"exclusive\",\"permission\":\"modify-topic\",\"ingressEnabled\":true,\"egressEnabled\":true}"); + executeCommand( + "curl", + "http://localhost:8080/SEMP/v2/config/msgVpns/" + VPN_NAME + "/queues", + "-X", + "POST", + "-u", + "admin:admin", + "-H", + "Content-Type:application/json", + "-d", + "{\"queueName\":\"" + + queueName + + "\",\"accessType\":\"non-exclusive\",\"maxMsgSpoolUsage\":200,\"permission\":\"consume\",\"ingressEnabled\":true,\"egressEnabled\":true}"); + executeCommand( + "curl", + "http://localhost:8080/SEMP/v2/config/msgVpns/" + + VPN_NAME + + "/queues/" + + queueName + + "/subscriptions", + "-X", + "POST", + "-u", + "admin:admin", + "-H", + "Content-Type:application/json", + "-d", + "{\"subscriptionTopic\":\"" + TOPIC_NAME + "\"}"); + } + + private void executeCommand(String... command) { + try { + org.testcontainers.containers.Container.ExecResult execResult = + container.execInContainer(command); + if (execResult.getExitCode() != 0) { + logCommandError(execResult.getStderr(), command); + } else { + LOG.info(execResult.getStdout()); + } + } catch (IOException | InterruptedException e) { + logCommandError(e.getMessage(), command); + } + } + + private void logCommandError(String error, String... command) { + LOG.error("Could not execute command {}: {}", command, error); + } + + public void stop() { + if (container != null) { + container.stop(); + } + } + + public void getQueueDetails(String queueName) { + executeCommand( + "curl", + "http://localhost:8080/SEMP/v2/monitor/msgVpns/" + + VPN_NAME + + "/queues/" + + queueName + + "/msgs", + "-X", + "GET", + "-u", + "admin:admin"); + } + + public void sendToTopic(String payload, List additionalHeaders) { + // https://docs.solace.com/API/RESTMessagingPrtl/Solace-REST-Message-Encoding.htm + + List command = + new ArrayList<>( + Arrays.asList( + "curl", + "http://localhost:9000/TOPIC/" + TOPIC_NAME, + "-X", + "POST", + "-u", + USERNAME + ":" + PASSWORD, + "--header", + "Content-Type:application/json", + "-d", + payload)); + + for (String additionalHeader : additionalHeaders) { + command.add("--header"); + command.add(additionalHeader); + } + + executeCommand(command.toArray(new String[0])); + } +} diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java new file mode 100644 index 00000000000..7dcd1829609 --- /dev/null +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.it; + +import static org.junit.Assert.assertEquals; + +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.io.solace.SolaceIO; +import org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionServiceFactory; +import org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory; +import org.apache.beam.sdk.io.solace.data.Solace.Queue; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.StreamingOptions; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.testutils.metrics.MetricsReader; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.joda.time.Duration; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +public class SolaceIOIT { + private static final String NAMESPACE = SolaceIOIT.class.getName(); + private static final String READ_COUNT = "read_count"; + private static SolaceContainerManager solaceContainerManager; + private static final TestPipelineOptions readPipelineOptions; + + static { + readPipelineOptions = PipelineOptionsFactory.create().as(TestPipelineOptions.class); + readPipelineOptions.setBlockOnRun(false); + readPipelineOptions.as(TestPipelineOptions.class).setBlockOnRun(false); + readPipelineOptions.as(StreamingOptions.class).setStreaming(false); + } + + @Rule public final TestPipeline readPipeline = TestPipeline.fromOptions(readPipelineOptions); + + @BeforeClass + public static void setup() { + solaceContainerManager = new SolaceContainerManager(); + solaceContainerManager.start(); + } + + @AfterClass + public static void afterClass() { + if (solaceContainerManager != null) { + solaceContainerManager.stop(); + } + } + + @Test + public void testRead() { + String queueName = "test_queue"; + solaceContainerManager.createQueueWithSubscriptionTopic(queueName); + + // todo this is very slow, needs to be replaced with the SolaceIO.write connector. + int publishMessagesCount = 20; + for (int i = 0; i < publishMessagesCount; i++) { + solaceContainerManager.sendToTopic( + "{\"field_str\":\"value\",\"field_int\":123}", + ImmutableList.of("Solace-Message-ID:m" + i)); + } + + readPipeline + .apply( + "Read from Solace", + SolaceIO.read() + .from(Queue.fromName(queueName)) + .withMaxNumConnections(1) + .withSempClientFactory( + BasicAuthSempClientFactory.builder() + .host("http://localhost:8080") + .username("admin") + .password("admin") + .vpnName(SolaceContainerManager.VPN_NAME) + .build()) + .withSessionServiceFactory( + BasicAuthJcsmpSessionServiceFactory.builder() + .host("localhost") + .username(SolaceContainerManager.USERNAME) + .password(SolaceContainerManager.PASSWORD) + .vpnName(SolaceContainerManager.VPN_NAME) + .build())) + .apply("Count", ParDo.of(new CountingFn<>(NAMESPACE, READ_COUNT))); + + PipelineResult pipelineResult = readPipeline.run(); + pipelineResult.waitUntilFinish(Duration.standardSeconds(15)); + + MetricsReader metricsReader = new MetricsReader(pipelineResult, NAMESPACE); + long actualRecordsCount = metricsReader.getCounterMetric(READ_COUNT); + assertEquals(publishMessagesCount, actualRecordsCount); + } + + private static class CountingFn extends DoFn { + + private final Counter elementCounter; + + CountingFn(String namespace, String name) { + elementCounter = Metrics.counter(namespace, name); + } + + @ProcessElement + public void processElement(@Element T record, OutputReceiver c) { + elementCounter.inc(1L); + c.output(record); + } + } +}