From 4b1f4e68acca0bdac89bd115995a136b411957e4 Mon Sep 17 00:00:00 2001 From: bzablocki Date: Thu, 13 Jun 2024 23:20:26 +0200 Subject: [PATCH] Solace Read connector: SolaceIO PTransform init (#31594) * Splitting the #31476 - Leaving only PTransform AutoValue configurations --- .../beam/gradle/BeamModulePlugin.groovy | 2 + sdks/java/io/solace/build.gradle | 6 +- .../apache/beam/sdk/io/solace/SolaceIO.java | 237 ++++++++++++++++++ .../io/solace/broker/SempClientFactory.java | 26 ++ .../solace/broker/SessionServiceFactory.java | 26 ++ .../sdk/io/solace/broker/package-info.java | 20 ++ .../beam/sdk/io/solace/data/Solace.java | 71 ++++++ .../beam/sdk/io/solace/data/package-info.java | 20 ++ .../beam/sdk/io/solace/package-info.java | 20 ++ 9 files changed, 427 insertions(+), 1 deletion(-) create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/package-info.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/package-info.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/package-info.java diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 65fcf7333407..af3568a359ef 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -636,6 +636,7 @@ class BeamModulePlugin implements Plugin { def singlestore_jdbc_version = "1.1.4" def 
slf4j_version = "1.7.30" def snakeyaml_version = "2.2" + def solace_version = "10.21.0" def spark2_version = "2.4.8" def spark3_version = "3.2.2" def spotbugs_version = "4.0.6" @@ -877,6 +878,7 @@ class BeamModulePlugin implements Plugin { slf4j_log4j12 : "org.slf4j:slf4j-log4j12:$slf4j_version", slf4j_jcl : "org.slf4j:slf4j-jcl:$slf4j_version", snappy_java : "org.xerial.snappy:snappy-java:1.1.10.4", + solace : "com.solacesystems:sol-jcsmp:$solace_version", spark_core : "org.apache.spark:spark-core_2.11:$spark2_version", spark_streaming : "org.apache.spark:spark-streaming_2.11:$spark2_version", spark3_core : "org.apache.spark:spark-core_2.12:$spark3_version", diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle index c09df4245015..c49b79f96a3d 100644 --- a/sdks/java/io/solace/build.gradle +++ b/sdks/java/io/solace/build.gradle @@ -30,4 +30,8 @@ ext.summary = """IO to read and write to Solace destinations (queues and topics) dependencies { implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) -} \ No newline at end of file + implementation library.java.vendored_guava_32_1_2_jre + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation library.java.joda_time + implementation library.java.solace +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java new file mode 100644 index 000000000000..ca8cd615ac68 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + +import com.google.auto.value.AutoValue; +import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.JCSMPFactory; +import com.solacesystems.jcsmp.Queue; +import com.solacesystems.jcsmp.Topic; +import org.apache.beam.sdk.io.solace.broker.SempClientFactory; +import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; + +public class SolaceIO { + + private static final boolean DEFAULT_DEDUPLICATE_RECORDS = false; + + /** Get a {@link Topic} object from the topic name. */ + static Topic topicFromName(String topicName) { + return JCSMPFactory.onlyInstance().createTopic(topicName); + } + + /** Get a {@link Queue} object from the queue name. 
*/ + static Queue queueFromName(String queueName) { + return JCSMPFactory.onlyInstance().createQueue(queueName); + } + + @AutoValue + public abstract static class Read extends PTransform> { + + /** Set the queue name to read from. Use this or the `from(Topic)` method. */ + public Read from(Solace.Queue queue) { + return toBuilder().setQueue(queueFromName(queue.getName())).build(); + } + + /** Set the topic name to read from. Use this or the `from(Queue)` method. */ + public Read from(Solace.Topic topic) { + return toBuilder().setTopic(topicFromName(topic.getName())).build(); + } + + /** + * The timestamp function, used for estimating the watermark, mapping the record T to an {@link + * Instant} + * + *

Optional when using the no-arg {@link SolaceIO#read()} method. Defaults to {@link + * SolaceIO#SENDER_TIMESTAMP_FUNCTION}. When using the {@link SolaceIO#read(TypeDescriptor, + * SerializableFunction, SerializableFunction)} method, the function mapping from T to {@link + * Instant} has to be passed as an argument. + */ + public Read withTimestampFn(SerializableFunction timestampFn) { + checkState( + timestampFn != null, + "SolaceIO.Read: timestamp function must be set or use the" + + " `Read.readSolaceRecords()` method"); + return toBuilder().setTimestampFn(timestampFn).build(); + } + + /** + * Optional. Sets the maximum number of connections to the broker. The actual number of sessions + * is determined by this and the number set by the runner. If not set, the number of sessions is + * determined by the runner. The number of connections created follows this logic: + * `numberOfConnections = min(maxNumConnections, desiredNumberOfSplits)`, where the + * `desiredNumberOfSplits` is set by the runner. + */ + public Read withMaxNumConnections(Integer maxNumConnections) { + return toBuilder().setMaxNumConnections(maxNumConnections).build(); + } + + /** + * Optional, default: false. Set to deduplicate messages based on the {@link + * BytesXMLMessage#getApplicationMessageId()} of the incoming {@link BytesXMLMessage}. If the + * field is null, then the {@link BytesXMLMessage#getReplicationGroupMessageId()} will be used, + * which is always set by Solace. + */ + public Read withDeduplicateRecords(boolean deduplicateRecords) { + return toBuilder().setDeduplicateRecords(deduplicateRecords).build(); + } + + /** + * Set a factory that creates a {@link org.apache.beam.sdk.io.solace.broker.SempClientFactory}. + * + *

The factory `create()` method is invoked in each instance of an {@link + * org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader}. The created {@link + * org.apache.beam.sdk.io.solace.broker.SempClient} has to communicate with the broker management + * API. It must support operations such as: + * + *

    + *
  • query for outstanding backlog bytes in a Queue, + *
  • query for metadata such as access-type of a Queue, + *
  • requesting creation of new Queues. + *
+ * + *

An existing implementation of the SempClientFactory includes {@link + * org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory} which implements connection + * to the SEMP with the Basic Authentication method. + * + *

To use it, specify the credentials with the builder methods. + * + *

The format of the host is `[Protocol://]Host[:Port]` + * + *

{@code
+     * .withSempClientFactory(
+     *         BasicAuthSempClientFactory.builder()
+     *               .host("your-host-name-with-protocol") // e.g. "http://12.34.56.78:8080"
+     *               .username("username")
+     *               .password("password")
+     *               .vpnName("vpn-name")
+     *               .build())
+     * }
+ */ + public Read withSempClientFactory(SempClientFactory sempClientFactory) { + checkState(sempClientFactory != null, "SolaceIO.Read: sempClientFactory must not be null."); + return toBuilder().setSempClientFactory(sempClientFactory).build(); + } + + /** + * Set a factory that creates a {@link SessionService}. + * + *

The factory `create()` method is invoked in each instance of an {@link + * org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader}. The created {@link SessionService} has + * to be able to: + * + *

    + *
  • initialize a connection with the broker, + *
  • check liveness of the connection, + *
  • close the connection, + *
  • create a {@link org.apache.beam.sdk.io.solace.broker.MessageReceiver}. + *
+ * + *

An existing implementation of the SessionServiceFactory includes {@link + * org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionService} which implements the Basic + * Authentication to Solace. + * + *

To use it, specify the credentials with the builder methods. + * + *

The host is the IPv4 or IPv6 or host name of the appliance. IPv6 addresses must be encoded + * in brackets ([]). For example, "12.34.56.78", or "[fe80::1]". If connecting to a non-default + * port, it can be specified here using the "Host:Port" format. For example, "12.34.56.78:4444", + * or "[fe80::1]:4444". + * + *

{@code
+     * BasicAuthJcsmpSessionServiceFactory.builder()
+     *     .host("your-host-name")
+     *           // e.g. "12.34.56.78", or "[fe80::1]", or "12.34.56.78:4444"
+     *     .username("semp-username")
+     *     .password("semp-password")
+     *     .vpnName("vpn-name")
+     *     .build()
+     * }
+ */ + public Read withSessionServiceFactory(SessionServiceFactory sessionServiceFactory) { + checkState( + sessionServiceFactory != null, "SolaceIO.Read: sessionServiceFactory must not be null."); + return toBuilder().setSessionServiceFactory(sessionServiceFactory).build(); + } + + abstract @Nullable Queue getQueue(); + + abstract @Nullable Topic getTopic(); + + abstract @Nullable SerializableFunction getTimestampFn(); + + abstract @Nullable Integer getMaxNumConnections(); + + abstract boolean getDeduplicateRecords(); + + abstract SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> getParseFn(); + + abstract @Nullable SempClientFactory getSempClientFactory(); + + abstract @Nullable SessionServiceFactory getSessionServiceFactory(); + + abstract TypeDescriptor getTypeDescriptor(); + + public static Builder builder() { + Builder builder = new org.apache.beam.sdk.io.solace.AutoValue_SolaceIO_Read.Builder(); + builder.setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS); + return builder; + } + + abstract Builder toBuilder(); + + @AutoValue.Builder + public abstract static class Builder { + + abstract Builder setQueue(Queue queue); + + abstract Builder setTopic(Topic topic); + + abstract Builder setTimestampFn(SerializableFunction timestampFn); + + abstract Builder setMaxNumConnections(Integer maxNumConnections); + + abstract Builder setDeduplicateRecords(boolean deduplicateRecords); + + abstract Builder setParseFn( + SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn); + + abstract Builder setSempClientFactory(SempClientFactory brokerServiceFactory); + + abstract Builder setSessionServiceFactory(SessionServiceFactory sessionServiceFactory); + + abstract Builder setTypeDescriptor(TypeDescriptor typeDescriptor); + + abstract Read build(); + } + + @Override + public PCollection expand(PBegin input) { + throw new UnsupportedOperationException(""); + } + } +} diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java new file mode 100644 index 000000000000..b5cb53e14b39 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import java.io.Serializable; + +/** + * This interface serves as a blueprint for creating SempClient objects, which are used to interact + * with a Solace message broker using the Solace Element Management Protocol (SEMP). 
+ */ +public interface SempClientFactory extends Serializable {} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java new file mode 100644 index 000000000000..ab1f55ae7a9c --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import java.io.Serializable; + +/** + * This abstract class serves as a blueprint for creating `SessionService` objects. It introduces a + * queue property and mandates the implementation of a create() method in concrete subclasses. 
+ */ +public abstract class SessionServiceFactory implements Serializable {} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/package-info.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/package-info.java new file mode 100644 index 000000000000..960e24e2a1b3 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Solace IO broker-related classes. */ +package org.apache.beam.sdk.io.solace.broker; diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java new file mode 100644 index 000000000000..076a16b96ceb --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.data; + +/** + * A record to be written to a Solace topic. + * + *

You need to transform your data into a {@link Solace.Record} to be able to write to Solace. For that, you + * can use the {@link Solace.Record.Builder} provided with this class. + * + *

For instance, to create a record, use the following code: + * + *

{@code
+ * Solace.Record record = Solace.Record.builder()
+ *         .setMessageId(messageId)
+ *         .setSenderTimestamp(timestampMillis)
+ *         .setPayload(payload)
+ *         .build();
+ * }
+ * + * Setting the message id and the timestamp is mandatory. + */ +public class Solace { + + public static class Queue { + private final String name; + + private Queue(String name) { + this.name = name; + } + + public static Queue fromName(String name) { + return new Queue(name); + } + + public String getName() { + return name; + } + } + + public static class Topic { + private final String name; + + private Topic(String name) { + this.name = name; + } + + public static Topic fromName(String name) { + return new Topic(name); + } + + public String getName() { + return name; + } + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/package-info.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/package-info.java new file mode 100644 index 000000000000..edf584310cab --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Solace IO connector - data-related classes. 
*/ +package org.apache.beam.sdk.io.solace.data; diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/package-info.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/package-info.java new file mode 100644 index 000000000000..3996b9ad3e04 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Solace IO connector. */ +package org.apache.beam.sdk.io.solace;