diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index bbd23a08bcd6..e603e49f842f 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -758,6 +758,7 @@ class BeamModulePlugin implements Plugin { // [bomupgrader] the BOM version is set by scripts/tools/bomupgrader.py. If update manually, also update // libraries-bom version on sdks/java/container/license_scripts/dep_urls_java.yaml google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:26.39.0", + google_cloud_secret_manager : "com.google.cloud:google-cloud-secretmanager", // google_cloud_platform_libraries_bom sets version google_cloud_spanner : "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version google_cloud_spanner_test : "com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests", google_cloud_vertexai : "com.google.cloud:google-cloud-vertexai", // google_cloud_platform_libraries_bom sets version @@ -858,6 +859,7 @@ class BeamModulePlugin implements Plugin { proto_google_cloud_firestore_v1 : "com.google.api.grpc:proto-google-cloud-firestore-v1", // google_cloud_platform_libraries_bom sets version proto_google_cloud_pubsub_v1 : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version proto_google_cloud_pubsublite_v1 : "com.google.api.grpc:proto-google-cloud-pubsublite-v1", // google_cloud_platform_libraries_bom sets version + proto_google_cloud_secret_manager_v1 : "com.google.api.grpc:proto-google-cloud-secretmanager-v1", // google_cloud_platform_libraries_bom sets version proto_google_cloud_spanner_v1 : "com.google.api.grpc:proto-google-cloud-spanner-v1", // google_cloud_platform_libraries_bom sets version proto_google_cloud_spanner_admin_database_v1: "com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1", // google_cloud_platform_libraries_bom sets version proto_google_common_protos : "com.google.api.grpc:proto-google-common-protos", // google_cloud_platform_libraries_bom sets version diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle index 7a74236539fb..741db51a5772 100644 --- a/sdks/java/io/solace/build.gradle +++ b/sdks/java/io/solace/build.gradle @@ -34,6 +34,9 @@ dependencies { implementation library.java.joda_time implementation library.java.solace implementation library.java.google_cloud_core + implementation library.java.google_cloud_secret_manager + implementation library.java.proto_google_cloud_secret_manager_v1 + implementation library.java.protobuf_java implementation library.java.vendored_guava_32_1_2_jre implementation project(":sdks:java:extensions:avro") implementation library.java.avro diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java index e6b0dd34b184..980267b3f358 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java @@ -181,6 +181,10 @@ * * } * + *

Writing

+ * + * TBD + * *

Authentication

* *

When reading from Solace, the user must use {@link @@ -209,6 +213,12 @@ public class SolaceIO { }; private static final boolean DEFAULT_DEDUPLICATE_RECORDS = false; + // Part of the new write connector, documentation to be updated in upcoming pull requests + public enum SubmissionMode { + HIGHER_THROUGHPUT, + LOWER_LATENCY + } + /** Get a {@link Topic} object from the topic name. */ static Topic topicFromName(String topicName) { return JCSMPFactory.onlyInstance().createTopic(topicName); diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java index 7863dbd129ce..df814b5c2be1 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java @@ -39,7 +39,7 @@ *

This class provides a way to connect to a Solace broker and receive messages from a queue. The * connection is established using basic authentication. */ -public class BasicAuthJcsmpSessionService implements SessionService { +public class BasicAuthJcsmpSessionService extends SessionService { private final String queueName; private final String host; private final String username; @@ -137,12 +137,19 @@ private int connectSession() throws JCSMPException { } private JCSMPSession createSessionObject() throws InvalidPropertiesException { - JCSMPProperties properties = new JCSMPProperties(); - properties.setProperty(JCSMPProperties.HOST, host); - properties.setProperty(JCSMPProperties.USERNAME, username); - properties.setProperty(JCSMPProperties.PASSWORD, password); - properties.setProperty(JCSMPProperties.VPN_NAME, vpnName); - + JCSMPProperties properties = initializeSessionProperties(new JCSMPProperties()); return JCSMPFactory.onlyInstance().createSession(properties); } + + @Override + public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProps) { + baseProps.setProperty(JCSMPProperties.VPN_NAME, vpnName); + + baseProps.setProperty( + JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_BASIC); + baseProps.setProperty(JCSMPProperties.USERNAME, username); + baseProps.setProperty(JCSMPProperties.PASSWORD, password); + baseProps.setProperty(JCSMPProperties.HOST, host); + return baseProps; + } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java index 8cb4ff0af053..2084e61b7e38 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.solace.broker; +import static org.apache.beam.sdk.io.solace.broker.SessionService.DEFAULT_VPN_NAME; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import com.google.auto.value.AutoValue; @@ -39,7 +40,7 @@ public abstract class BasicAuthJcsmpSessionServiceFactory extends SessionService public abstract String vpnName(); public static Builder builder() { - return new AutoValue_BasicAuthJcsmpSessionServiceFactory.Builder(); + return new AutoValue_BasicAuthJcsmpSessionServiceFactory.Builder().vpnName(DEFAULT_VPN_NAME); } @AutoValue.Builder diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/GCPSecretSessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/GCPSecretSessionServiceFactory.java new file mode 100644 index 000000000000..dd87e1d75fa5 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/GCPSecretSessionServiceFactory.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import static org.apache.beam.sdk.io.solace.broker.SessionService.DEFAULT_VPN_NAME; + +import com.google.auto.value.AutoValue; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretVersionName; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class implements a {@link SessionServiceFactory} that retrieve the basic authentication + * credentials from a Google Cloud Secret Manager secret. + * + *

It can be used to avoid having to pass the password as an option of your pipeline. For this + * provider to work, the worker where the job runs needs to have the necessary credentials to access + * the secret. In Dataflow, this implies adding the necessary permissions to the worker service + * account. For other runners, set the credentials in the pipeline options using {@link + * org.apache.beam.sdk.extensions.gcp.options.GcpOptions}. + * + *

It also shows how to implement a {@link SessionServiceFactory} that depends on using external + * resources to retrieve the Solace session properties. In this case, using the Google Cloud Secrete + * Manager client. + * + *

Example of how to create the provider object: + * + *

{@code
+ * GCPSecretSessionServiceFactory factory =
+ *     GCPSecretSessionServiceFactory.builder()
+ *         .username("user")
+ *         .host("host:port")
+ *         .passwordSecretName("secret-name")
+ *         .build();
+ *
+ * SessionService serviceUsingGCPSecret = factory.create();
+ * }
+ */ +@AutoValue +public abstract class GCPSecretSessionServiceFactory extends SessionServiceFactory { + + private static final Logger LOG = LoggerFactory.getLogger(GCPSecretSessionServiceFactory.class); + + private static final String PROJECT_NOT_FOUND = "PROJECT-NOT-FOUND"; + + public abstract String username(); + + public abstract String host(); + + public abstract String passwordSecretName(); + + public abstract String vpnName(); + + public abstract @Nullable String secretManagerProjectId(); + + public abstract String passwordSecretVersion(); + + public static GCPSecretSessionServiceFactory.Builder builder() { + return new AutoValue_GCPSecretSessionServiceFactory.Builder() + .passwordSecretVersion("latest") + .vpnName(DEFAULT_VPN_NAME); + } + + @AutoValue.Builder + public abstract static class Builder { + + /** Username to be used to authenticate with the broker. */ + public abstract GCPSecretSessionServiceFactory.Builder username(String username); + + /** + * The location of the broker, including port details if it is not listening in the default + * port. + */ + public abstract GCPSecretSessionServiceFactory.Builder host(String host); + + /** The Secret Manager secret name where the password is stored. */ + public abstract GCPSecretSessionServiceFactory.Builder passwordSecretName(String name); + + /** Optional. Solace broker VPN name. If not set, "default" is used. */ + public abstract GCPSecretSessionServiceFactory.Builder vpnName(String name); + + /** + * Optional for Dataflow or VMs running on Google Cloud. The project id of the project where the + * secret is stored. If not set, the project id where the job is running is used. + */ + public abstract GCPSecretSessionServiceFactory.Builder secretManagerProjectId(String id); + + /** Optional. Solace broker password secret version. If not set, "latest" is used. */ + public abstract GCPSecretSessionServiceFactory.Builder passwordSecretVersion(String version); + + public abstract GCPSecretSessionServiceFactory build(); + } + + @Override + public SessionService create() { + String password = null; + try { + password = retrieveSecret(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + BasicAuthJcsmpSessionServiceFactory factory = + BasicAuthJcsmpSessionServiceFactory.builder() + .username(username()) + .host(host()) + .password(password) + .vpnName(vpnName()) + .build(); + + return factory.create(); + } + + private String retrieveSecret() throws IOException { + try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) { + String projectId = + Optional.ofNullable(secretManagerProjectId()).orElse(getProjectIdFromVmMetadata()); + SecretVersionName secretVersionName = + SecretVersionName.of(projectId, passwordSecretName(), passwordSecretVersion()); + return client.accessSecretVersion(secretVersionName).getPayload().getData().toStringUtf8(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private String getProjectIdFromVmMetadata() throws IOException { + URL metadataUrl = + new URL("http://metadata.google.internal/computeMetadata/v1/project/project-id"); + HttpURLConnection connection = (HttpURLConnection) metadataUrl.openConnection(); + connection.setRequestProperty("Metadata-Flavor", "Google"); + + String output; + try (BufferedReader reader = + new BufferedReader( + new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) { + output = reader.readLine(); + } + + if (output == null || output.isEmpty()) { + LOG.error( + "Cannot retrieve project id from VM metadata, please set a project id in your GoogleCloudSecretProvider."); + } + return output != null ? output : PROJECT_NOT_FOUND; + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java index cd368865f0c3..aed700a71ded 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java @@ -17,34 +17,220 @@ */ package org.apache.beam.sdk.io.solace.broker; +import com.solacesystems.jcsmp.JCSMPProperties; import java.io.Serializable; +import org.apache.beam.sdk.io.solace.SolaceIO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The SessionService interface provides a set of methods for managing a session with the Solace * messaging system. It allows for establishing a connection, creating a message-receiver object, * checking if the connection is closed or not, and gracefully closing the session. + * + *

Override this class and the method {@link #initializeSessionProperties(JCSMPProperties)} with + * your specific properties, including all those related to authentication. + * + *

The connector will call the method only once per session created, so you can perform + * relatively heavy operations in that method (e.g. connect to a store or vault to retrieve + * credentials). + * + *

There are some default properties that are set by default and can be overridden in this + * provider, that are relevant for the writer connector, and not used in the case of the read + * connector (since they are not necessary for reading): + * + *

+ * + *

The connector overrides other properties, regardless of what this provider sends to the + * connector. Those properties are the following. Again, these properties are only relevant for the + * write connector. + * + *

+ * + * Those properties are set by the connector based on the values of {@link + * org.apache.beam.sdk.io.solace.SolaceIO.Write#withWriterType(SolaceIO.WriterType)} and {@link + * org.apache.beam.sdk.io.solace.SolaceIO.Write#withSubmissionMode(SolaceIO.SubmissionMode)}. + * + *

The method will always run in a worker thread or task, and not in the driver program. If you + * need to access any resource to set the properties, you need to make sure that the worker has the + * network connectivity required for that, and that any credential or configuration is passed to the + * provider through the constructor. + * + *

The connector ensures that no two threads will be calling that method at the same time, so you + * don't have to take any specific precautions to avoid race conditions. + * + *

For basic authentication, use {@link BasicAuthJcsmpSessionService} and {@link + * BasicAuthJcsmpSessionServiceFactory}. + * + *

For other situations, you need to extend this class. For instance: + * + *

{@code
+ * public class MySessionService extends SessionService {
+ *   private final String authToken;
+ *
+ *   public MySessionService(String token) {
+ *    this.oauthToken = token;
+ *    ...
+ *   }
+ *
+ *   {@literal }@Override
+ *   public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProps) {
+ *     baseProps.setProperty(JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_OAUTH2);
+ *     baseProps.setProperty(JCSMPProperties.OAUTH2_ACCESS_TOKEN, authToken);
+ *     return props;
+ *   }
+ *
+ *   {@literal }@Override
+ *   public void connect() {
+ *       ...
+ *   }
+ *
+ *   ...
+ * }
+ * }
*/ -public interface SessionService extends Serializable { +public abstract class SessionService implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(SessionService.class); + + public static final String DEFAULT_VPN_NAME = "default"; + + private static final int STREAMING_PUB_ACK_WINDOW = 50; + private static final int BATCHED_PUB_ACK_WINDOW = 255; /** * Establishes a connection to the service. This could involve providing connection details like * host, port, VPN name, username, and password. */ - void connect(); + public abstract void connect(); /** Gracefully closes the connection to the service. */ - void close(); + public abstract void close(); /** * Checks whether the connection to the service is currently closed. This method is called when an * `UnboundedSolaceReader` is starting to read messages - a session will be created if this * returns true. */ - boolean isClosed(); + public abstract boolean isClosed(); /** * Creates a MessageReceiver object for receiving messages from Solace. Typically, this object is * created from the session instance. */ - MessageReceiver createReceiver(); + public abstract MessageReceiver createReceiver(); + + /** + * Override this method and provide your specific properties, including all those related to + * authentication, and possibly others too. The {@code}baseProperties{@code} parameter sets the + * Solace VPN to "default" if none is specified. + * + *

You should add your properties to the parameter {@code}baseProperties{@code}, and return the + * result. + * + *

The method will be used whenever the session needs to be created or refreshed. If you are + * setting credentials with expiration, just make sure that the latest available credentials (e.g. + * renewed token) are set when the method is called. + * + *

For a list of all the properties that can be set, please check the following link: + * + *

+ */ + public abstract JCSMPProperties initializeSessionProperties(JCSMPProperties baseProperties); + + /** + * This method will be called by the write connector when a new session is started. + * + *

This call will happen in the worker, so you need to make sure that the worker has access to + * the resources you need to set the properties. + * + *

The call will happen only once per session initialization. Typically, that will be when the + * worker and the client are created. But if for any reason the session is lost (e.g. expired auth + * token), this method will be called again. + */ + public final JCSMPProperties initializeWriteSessionProperties(SolaceIO.SubmissionMode mode) { + JCSMPProperties jcsmpProperties = initializeSessionProperties(getDefaultProperties()); + return overrideConnectorProperties(jcsmpProperties, mode); + } + + private static JCSMPProperties getDefaultProperties() { + JCSMPProperties props = new JCSMPProperties(); + props.setProperty(JCSMPProperties.VPN_NAME, DEFAULT_VPN_NAME); + // Outgoing messages will have a sender timestamp field populated + props.setProperty(JCSMPProperties.GENERATE_SEND_TIMESTAMPS, true); + // Make XMLProducer safe to access from several threads. This is the default value, setting + // it just in case. + props.setProperty(JCSMPProperties.PUB_MULTI_THREAD, true); + + return props; + } + + /** + * This method overrides some properties for the broker session to prevent misconfiguration, + * taking into account how the write connector works. + */ + private static JCSMPProperties overrideConnectorProperties( + JCSMPProperties props, SolaceIO.SubmissionMode mode) { + + // PUB_ACK_WINDOW_SIZE heavily affects performance when publishing persistent + // messages. It can be a value between 1 and 255. This is the batch size for the ack + // received from Solace. A value of 1 will have the lowest latency, but a very low + // throughput and a monumental backpressure. + + // This controls how the messages are sent to Solace + if (mode == SolaceIO.SubmissionMode.HIGHER_THROUGHPUT) { + // Create a parallel thread and a queue to send the messages + + Boolean msgCbProp = props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR); + if (msgCbProp != null && msgCbProp) { + LOG.warn( + "SolaceIO.Write: Overriding MESSAGE_CALLBACK_ON_REACTOR to false since" + + " HIGHER_THROUGHPUT mode was selected"); + } + + props.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, false); + + Integer ackWindowSize = props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE); + if ((ackWindowSize != null && ackWindowSize != BATCHED_PUB_ACK_WINDOW)) { + LOG.warn( + String.format( + "SolaceIO.Write: Overriding PUB_ACK_WINDOW_SIZE to %d since" + + " HIGHER_THROUGHPUT mode was selected", + BATCHED_PUB_ACK_WINDOW)); + } + props.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, BATCHED_PUB_ACK_WINDOW); + } else { + // Send from the same thread where the produced is being called. This offers the lowest + // latency, but a low throughput too. + Boolean msgCbProp = props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR); + if (msgCbProp != null && !msgCbProp) { + LOG.warn( + "SolaceIO.Write: Overriding MESSAGE_CALLBACK_ON_REACTOR to true since" + + " LOWER_LATENCY mode was selected"); + } + + props.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, true); + + Integer ackWindowSize = props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE); + if ((ackWindowSize != null && ackWindowSize != STREAMING_PUB_ACK_WINDOW)) { + LOG.warn( + String.format( + "SolaceIO.Write: Overriding PUB_ACK_WINDOW_SIZE to %d since" + + " LOWER_LATENCY mode was selected", + STREAMING_PUB_ACK_WINDOW)); + } + + props.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, STREAMING_PUB_ACK_WINDOW); + } + return props; + } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java index 7d1dee7a1187..027de2cff134 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java @@ -26,9 +26,8 @@ * queue property and mandates the implementation of a create() method in concrete subclasses. */ public abstract class SessionServiceFactory implements Serializable { - /** - * A reference to a Queue object. This is set when the pipline is constructed (in the {@link + * A reference to a Queue object. This is set when the pipeline is constructed (in the {@link * org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method). * This could be used to associate the created SessionService with a specific queue for message * handling. diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java index 285c1cb8a7e8..ec0ae7194686 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java @@ -17,10 +17,11 @@ */ package org.apache.beam.sdk.io.solace; +import com.solacesystems.jcsmp.JCSMPProperties; import org.apache.beam.sdk.io.solace.broker.MessageReceiver; import org.apache.beam.sdk.io.solace.broker.SessionService; -public class MockEmptySessionService implements SessionService { +public class MockEmptySessionService extends SessionService { String exceptionMessage = "This is an empty client, use a MockSessionService instead."; @@ -43,4 +44,9 @@ public MessageReceiver createReceiver() { public void connect() { throw new UnsupportedOperationException(exceptionMessage); } + + @Override + public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProperties) { + throw new UnsupportedOperationException(exceptionMessage); + } } diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java index 7b14da138c64..207cfef9c62c 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java @@ -18,23 +18,35 @@ package org.apache.beam.sdk.io.solace; import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.JCSMPProperties; import java.io.IOException; import java.io.Serializable; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode; import org.apache.beam.sdk.io.solace.broker.MessageReceiver; import org.apache.beam.sdk.io.solace.broker.SessionService; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.checkerframework.checker.nullness.qual.Nullable; -public class MockSessionService implements SessionService { +public class MockSessionService extends SessionService { private final SerializableFunction getRecordFn; private MessageReceiver messageReceiver = null; private final int minMessagesReceived; + private final @Nullable SubmissionMode mode; public MockSessionService( - SerializableFunction getRecordFn, int minMessagesReceived) { + SerializableFunction getRecordFn, + int minMessagesReceived, + @Nullable SubmissionMode mode) { this.getRecordFn = getRecordFn; this.minMessagesReceived = minMessagesReceived; + this.mode = mode; + } + + public MockSessionService( + SerializableFunction getRecordFn, int minMessagesReceived) { + this(getRecordFn, minMessagesReceived, null); } @Override @@ -85,4 +97,16 @@ public boolean isEOF() { return counter.get() >= minMessagesReceived; } } + + @Override + public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProperties) { + // Let's override some properties that will be overriden by the connector + // Opposite of the mode, to test that is overriden + baseProperties.setProperty( + JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, mode == SubmissionMode.HIGHER_THROUGHPUT); + + baseProperties.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, 87); + + return baseProperties; + } } diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/BasicAuthWriterSessionTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/BasicAuthWriterSessionTest.java new file mode 100644 index 000000000000..e33917641e33 --- /dev/null +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/BasicAuthWriterSessionTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import static org.apache.beam.sdk.io.solace.broker.SessionService.DEFAULT_VPN_NAME; +import static org.junit.Assert.assertEquals; + +import com.solacesystems.jcsmp.JCSMPFactory; +import com.solacesystems.jcsmp.JCSMPProperties; +import com.solacesystems.jcsmp.Queue; +import org.apache.beam.sdk.io.solace.SolaceIO; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class BasicAuthWriterSessionTest { + private final String username = "Some Username"; + private final String password = "Some Password"; + private final String host = "Some Host"; + private final String vpn = "Some non default VPN"; + SessionService withVpn; + SessionService withoutVpn; + + @Before + public void setUp() throws Exception { + Queue q = JCSMPFactory.onlyInstance().createQueue("test-queue"); + + BasicAuthJcsmpSessionServiceFactory factoryWithVpn = + BasicAuthJcsmpSessionServiceFactory.builder() + .username(username) + .password(password) + .host(host) + .vpnName(vpn) + .build(); + factoryWithVpn.setQueue(q); + withVpn = factoryWithVpn.create(); + + BasicAuthJcsmpSessionServiceFactory factoryNoVpn = + BasicAuthJcsmpSessionServiceFactory.builder() + .username(username) + .password(password) + .host(host) + .build(); + factoryNoVpn.setQueue(q); + withoutVpn = factoryNoVpn.create(); + } + + @Test + public void testAuthProperties() { + SolaceIO.SubmissionMode mode = SolaceIO.SubmissionMode.HIGHER_THROUGHPUT; + JCSMPProperties props = withoutVpn.initializeWriteSessionProperties(mode); + assertEquals(username, props.getStringProperty(JCSMPProperties.USERNAME)); + assertEquals(password, props.getStringProperty(JCSMPProperties.PASSWORD)); + assertEquals(host, props.getStringProperty(JCSMPProperties.HOST)); + assertEquals( + JCSMPProperties.AUTHENTICATION_SCHEME_BASIC, + props.getStringProperty(JCSMPProperties.AUTHENTICATION_SCHEME)); + } + + @Test + public void testVpnNames() { + SolaceIO.SubmissionMode mode = SolaceIO.SubmissionMode.LOWER_LATENCY; + JCSMPProperties propsWithoutVpn = withoutVpn.initializeWriteSessionProperties(mode); + assertEquals(DEFAULT_VPN_NAME, propsWithoutVpn.getStringProperty(JCSMPProperties.VPN_NAME)); + JCSMPProperties propsWithVpn = withVpn.initializeWriteSessionProperties(mode); + assertEquals(vpn, propsWithVpn.getStringProperty(JCSMPProperties.VPN_NAME)); + } + + @Test + public void testOverrideWithHigherThroughput() { + SolaceIO.SubmissionMode mode = SolaceIO.SubmissionMode.HIGHER_THROUGHPUT; + JCSMPProperties props = withoutVpn.initializeWriteSessionProperties(mode); + + assertEquals(false, props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR)); + assertEquals( + Long.valueOf(255), + Long.valueOf(props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE))); + } + + @Test + public void testOverrideWithLowerLatency() { + SolaceIO.SubmissionMode mode = SolaceIO.SubmissionMode.LOWER_LATENCY; + JCSMPProperties props = withoutVpn.initializeWriteSessionProperties(mode); + assertEquals(true, props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR)); + assertEquals( + Long.valueOf(50), + Long.valueOf(props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE))); + } +} diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/OverrideWriterPropertiesTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/OverrideWriterPropertiesTest.java new file mode 100644 index 000000000000..0c6f88a7c9d5 --- /dev/null +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/OverrideWriterPropertiesTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import static org.junit.Assert.assertEquals; + +import com.solacesystems.jcsmp.JCSMPProperties; +import org.apache.beam.sdk.io.solace.MockSessionService; +import org.apache.beam.sdk.io.solace.SolaceIO; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class OverrideWriterPropertiesTest { + @Test + public void testOverrideForHigherThroughput() { + SolaceIO.SubmissionMode mode = SolaceIO.SubmissionMode.HIGHER_THROUGHPUT; + MockSessionService service = new MockSessionService(null, 0, mode); + + // Test HIGHER_THROUGHPUT mode + JCSMPProperties props = service.initializeWriteSessionProperties(mode); + assertEquals(false, props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR)); + assertEquals( + Long.valueOf(255), + Long.valueOf(props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE))); + } + + @Test + public void testOverrideForLowerLatency() { + SolaceIO.SubmissionMode mode = SolaceIO.SubmissionMode.LOWER_LATENCY; + MockSessionService service = new MockSessionService(null, 0, mode); + + // Test HIGHER_THROUGHPUT mode + JCSMPProperties props = service.initializeWriteSessionProperties(mode); + assertEquals(true, props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR)); + assertEquals( + Long.valueOf(50), + Long.valueOf(props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE))); + } +}