From c6a7354b32f522d98f9f7b0aa595fd5161c4b257 Mon Sep 17 00:00:00 2001
From: Israel Herraiz
Date: Wed, 13 Nov 2024 17:11:37 +0100
Subject: [PATCH] SolaceIO write connector (#32060)

* This is a follow-up PR to #31953, and part of the issue #31905.
This PR adds the actual writer functionality, and some additional testing, including integration testing.
This should be the final PR for the SolaceIO write connector to be complete.

* Use static imports for Preconditions

* Remove unused method

* Logging has built-in formatting support

* Use TypeDescriptors to check the type used as input

* Fix parameter name

* Use interface + utils class for MessageProducer

* Use null instead of optional

* Avoid using ByteString just to create an empty byte array.

* Fix documentation, we are not using ByteString now.

* Not needed anymore, we are not using ByteString

* Defer transforming latency from nanos to millis.
The transform into millis is done at the presentation moment, when the metric is reported to Beam.

* Avoid using top level classes with a single inner class.
A couple of DoFns are moved to their own files too, as the abstract class for the UnboundedSolaceWriter was in practice a "package".
This commit addresses a few comments about the structure of UnboundedSolaceWriter and some base classes of that abstract class.

* Remove using a state variable, there is already a timer.
This DoFn is a stateful DoFn to force a shuffling with a given input key set cardinality.

* Properties must always be set.
The warnings are only shown if the user decided to set the properties that are overridden by the connector.
This was changed in one of the previous commits but it is actually a bug.
I am reverting that change and changing this to a switch block, to make it clearer that the properties always need to be set by the connector.

* Add a new custom mode so no JCSMP property is overridden.
This lets the user fully control all the properties used by the connector, instead of making sensible choices on its behalf.
This also adds some logging to be more explicit about what the connector is doing. This does not add too much logging pressure; it only adds logging at the producer creation moment.

* Add some more documentation about the new custom submission mode.

* Fix bug introduced with the refactoring of code for this PR.
I forgot to pass the submission mode when the write session is created, and I called the wrong method in the base class because it was defined as public.
This makes sure that the submission mode is passed to the session when the session is created for writing messages.

* Remove unnecessary Serializable annotation.

* Make the PublishResult class for handling callbacks non-static to handle pipelines with multiple write transforms.

* Rename maxNumOfUsedWorkers to numShards

* Use RoundRobin assignment of producers to process bundles.

* Output results in a GlobalWindow

* Add ErrorHandler

* Fix docs

* Remove PublishResultHandler class that was just a wrapper around a Queue

* small refactors

* Revert CsvIO docs fix

* Add withErrorHandler docs

* fix var scope

---------

Co-authored-by: Bartosz Zablocki
---
 CHANGES.md | 1 +
 sdks/java/io/solace/build.gradle | 1 +
 .../apache/beam/sdk/io/solace/SolaceIO.java | 201 ++++++++--
 .../broker/BasicAuthJcsmpSessionService.java | 150 +++++--
 .../BasicAuthJcsmpSessionServiceFactory.java | 22 +-
 .../GCPSecretSessionServiceFactory.java | 2 +-
 .../sdk/io/solace/broker/MessageProducer.java | 61 +++
 .../solace/broker/MessageProducerUtils.java | 110 ++++++
 .../solace/broker/PublishResultHandler.java | 100 +++++
 .../sdk/io/solace/broker/SessionService.java | 160 +++++---
 .../solace/broker/SessionServiceFactory.java | 64 ++-
 .../solace/broker/SolaceMessageProducer.java | 87 ++++
 .../beam/sdk/io/solace/data/Solace.java | 88 +++--
 .../io/solace/read/UnboundedSolaceReader.java | 19 +-
 .../sdk/io/solace/write/AddShardKeyDoFn.java | 45 +++
 .../write/RecordToPublishResultDoFn.java | 41 ++
 .../sdk/io/solace/write/SolaceOutput.java | 34 +-
 .../write/SolaceWriteSessionsHandler.java | 112 ++++++
 .../write/UnboundedBatchedSolaceWriter.java | 164 ++++++++
 .../solace/write/UnboundedSolaceWriter.java | 373 ++++++++++++++++++
 .../write/UnboundedStreamingSolaceWriter.java | 138 +++++++
 .../io/solace/MockEmptySessionService.java | 24 +-
 .../beam/sdk/io/solace/MockProducer.java | 110 ++++++
 .../sdk/io/solace/MockSessionService.java | 101 +++--
 .../io/solace/MockSessionServiceFactory.java | 68 +++-
 ...olaceIOTest.java => SolaceIOReadTest.java} | 260 ++++++------
 .../beam/sdk/io/solace/SolaceIOWriteTest.java | 208 ++++++++++
 .../broker/OverrideWriterPropertiesTest.java | 20 +-
 .../sdk/io/solace/data/SolaceDataUtils.java | 4 +-
 .../beam/sdk/io/solace/it/SolaceIOIT.java | 132 ++++++-
 30 files changed, 2508 insertions(+), 392 deletions(-)
 create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageProducer.java
 create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageProducerUtils.java
 create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/PublishResultHandler.java
 create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageProducer.java
 create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/AddShardKeyDoFn.java
 create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/RecordToPublishResultDoFn.java
 create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceWriteSessionsHandler.java
 create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedBatchedSolaceWriter.java
 create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedSolaceWriter.java
 create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedStreamingSolaceWriter.java
 create mode 100644 sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockProducer.java
 rename sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/{SolaceIOTest.java => SolaceIOReadTest.java} (72%)
 create mode 100644 sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOWriteTest.java
diff --git a/CHANGES.md b/CHANGES.md
index c5731bcff313..6962b0fb8ded 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -70,6 +70,7 @@
 * [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688))
 * BigQuery CDC writes are now available in Python SDK, only supported when using StorageWrite API at least once mode ([#32527](https://github.com/apache/beam/issues/32527))
 * [Managed Iceberg] Allow updating table partition specs during pipeline runtime ([#32879](https://github.com/apache/beam/pull/32879))
+* Support for writing to [Solace message queues](https://solace.com/) (`SolaceIO.Write`) added (Java) ([#31905](https://github.com/apache/beam/issues/31905)).
## New Features / Improvements diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle index 741db51a5772..ef0d49891f08 100644 --- a/sdks/java/io/solace/build.gradle +++ b/sdks/java/io/solace/build.gradle @@ -53,6 +53,7 @@ dependencies { testImplementation library.java.junit testImplementation project(path: ":sdks:java:io:common") testImplementation project(path: ":sdks:java:testing:test-utils") + testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") testRuntimeOnly library.java.slf4j_jdk14 testImplementation library.java.testcontainers_solace testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java index dcfdcc4fabb9..a55d8a0a4217 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.solace; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; @@ -38,16 +39,29 @@ import org.apache.beam.sdk.io.solace.broker.SessionService; import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.io.solace.data.Solace.Record; import org.apache.beam.sdk.io.solace.data.Solace.SolaceRecordMapper; import org.apache.beam.sdk.io.solace.read.UnboundedSolaceSource; +import org.apache.beam.sdk.io.solace.write.AddShardKeyDoFn; import org.apache.beam.sdk.io.solace.write.SolaceOutput; +import 
org.apache.beam.sdk.io.solace.write.UnboundedBatchedSolaceWriter; +import org.apache.beam.sdk.io.solace.write.UnboundedStreamingSolaceWriter; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.checkerframework.checker.nullness.qual.Nullable; @@ -147,7 +161,7 @@ * function. * *
{@code
- * @DefaultSchema(JavaBeanSchema.class)
+ * {@literal @}DefaultSchema(JavaBeanSchema.class)
  * public static class SimpleRecord {
  *    public String payload;
  *    public String messageId;
@@ -238,7 +252,7 @@
  * default VPN name by setting the required JCSMP property in the session factory (in this case,
  * with {@link BasicAuthJcsmpSessionServiceFactory#vpnName()}), the number of clients per worker
  * with {@link Write#withNumberOfClientsPerWorker(int)} and the number of parallel write clients
- * using {@link Write#withMaxNumOfUsedWorkers(int)}.
+ * using {@link Write#withNumShards(int)}.
  *
  * 

Writing to dynamic destinations

* @@ -345,13 +359,17 @@ * *

The streaming connector publishes each message individually, without holding up or batching * before the message is sent to Solace. This will ensure the lowest possible latency, but it will - * offer a much lower throughput. The streaming connector does not use state & timers. + * offer a much lower throughput. The streaming connector does not use state and timers. * - *

Both connectors uses state & timers to control the level of parallelism. If you are using + *

Both connectors use state and timers to control the level of parallelism. If you are using * Cloud Dataflow, it is recommended that you enable Streaming Engine to use this * connector. * + *

For full control over all the properties, use {@link SubmissionMode#CUSTOM}. The connector + * will not override any property that you set, and you will have full control over all the JCSMP + * properties. + * *

Authentication

* *

When writing to Solace, the user must use {@link @@ -396,7 +414,7 @@ public class SolaceIO { private static final boolean DEFAULT_DEDUPLICATE_RECORDS = false; private static final Duration DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD = Duration.standardSeconds(30); - public static final int DEFAULT_WRITER_MAX_NUMBER_OF_WORKERS = 20; + public static final int DEFAULT_WRITER_NUM_SHARDS = 20; public static final int DEFAULT_WRITER_CLIENTS_PER_WORKER = 4; public static final Boolean DEFAULT_WRITER_PUBLISH_LATENCY_METRICS = false; public static final SubmissionMode DEFAULT_WRITER_SUBMISSION_MODE = @@ -445,6 +463,7 @@ public static Read read() { .setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS) .setWatermarkIdleDurationThreshold(DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD)); } + /** * Create a {@link Read} transform, to read from Solace. Specify a {@link SerializableFunction} to * map incoming {@link BytesXMLMessage} records, to the object of your choice. You also need to @@ -805,7 +824,9 @@ private Queue initializeQueueForTopicIfNeeded( public enum SubmissionMode { HIGHER_THROUGHPUT, - LOWER_LATENCY + LOWER_LATENCY, + CUSTOM, // Don't override any property set by the user + TESTING // Send acks 1 by 1, this will be very slow, never use this in an actual pipeline! } public enum WriterType { @@ -816,8 +837,9 @@ public enum WriterType { @AutoValue public abstract static class Write extends PTransform, SolaceOutput> { - public static final TupleTag FAILED_PUBLISH_TAG = - new TupleTag() {}; + private static final Logger LOG = LoggerFactory.getLogger(Write.class); + + public static final TupleTag FAILED_PUBLISH_TAG = new TupleTag() {}; public static final TupleTag SUCCESSFUL_PUBLISH_TAG = new TupleTag() {}; @@ -863,8 +885,8 @@ public Write to(Solace.Queue queue) { * cluster, and the need for performance when writing to Solace (more workers will achieve * higher throughput). 
*/ - public Write withMaxNumOfUsedWorkers(int maxNumOfUsedWorkers) { - return toBuilder().setMaxNumOfUsedWorkers(maxNumOfUsedWorkers).build(); + public Write withNumShards(int numShards) { + return toBuilder().setNumShards(numShards).build(); } /** @@ -877,8 +899,8 @@ public Write withMaxNumOfUsedWorkers(int maxNumOfUsedWorkers) { * the number of clients created per VM. The clients will be re-used across different threads in * the same worker. * - *

Set this number in combination with {@link #withMaxNumOfUsedWorkers}, to ensure that the - * limit for number of clients in your Solace cluster is not exceeded. + *

Set this number in combination with {@link #withNumShards}, to ensure that the limit for + * number of clients in your Solace cluster is not exceeded. * *

Normally, using a higher number of clients with fewer workers will achieve better * throughput at a lower cost, since the workers are better utilized. A good rule of thumb to @@ -921,15 +943,19 @@ public Write publishLatencyMetrics() { *

For full details, please check https://docs.solace.com/API/API-Developer-Guide/Java-API-Best-Practices.htm. * - *

The Solace JCSMP client libraries can dispatch messages using two different modes: + *

The Solace JCSMP client libraries can dispatch messages using three different modes: * *

One of the modes dispatches messages directly from the same thread that is doing the rest * of I/O work. This mode favors lower latency but lower throughput. Set this to LOWER_LATENCY * to use that mode (MESSAGE_CALLBACK_ON_REACTOR set to True). * - *

The other mode uses a parallel thread to accumulate and dispatch messages. This mode - * favors higher throughput but also has higher latency. Set this to HIGHER_THROUGHPUT to use - * that mode. This is the default mode (MESSAGE_CALLBACK_ON_REACTOR set to False). + *

Another mode uses a parallel thread to accumulate and dispatch messages. This mode favors + * higher throughput but also has higher latency. Set this to HIGHER_THROUGHPUT to use that + * mode. This is the default mode (MESSAGE_CALLBACK_ON_REACTOR set to False). + * + *

If you prefer to have full control over all the JCSMP properties, set this to CUSTOM, and + * override the classes {@link SessionServiceFactory} and {@link SessionService} to have full + * control over how to create the JCSMP sessions and producers used by the connector. * *

This is optional, the default value is HIGHER_THROUGHPUT. */ @@ -945,10 +971,12 @@ public Write withSubmissionMode(SubmissionMode submissionMode) { *

In streaming mode, the publishing latency will be lower, but the throughput will also be * lower. * - *

With the batched mode, messages are accumulated until a batch size of 50 is reached, or 5 - * seconds have elapsed since the first message in the batch was received. The 50 messages are - * sent to Solace in a single batch. This writer offers higher throughput but higher publishing - * latency, as messages can be held up for up to 5 seconds until they are published. + *

With the batched mode, messages are accumulated until a batch size of 50 is reached, or + * {@link UnboundedBatchedSolaceWriter#ACKS_FLUSHING_INTERVAL_SECS} seconds have elapsed since + * the first message in the batch was received. The 50 messages are sent to Solace in a single + * batch. This writer offers higher throughput but higher publishing latency, as messages can be + * held up for up to {@link UnboundedBatchedSolaceWriter#ACKS_FLUSHING_INTERVAL_SECS} seconds + * until they are published. * *

Notice that this is the message publishing latency, not the end-to-end latency. For very * large scale pipelines, you will probably prefer to use the HIGHER_THROUGHPUT mode, as with @@ -971,7 +999,20 @@ public Write withSessionServiceFactory(SessionServiceFactory factory) { return toBuilder().setSessionServiceFactory(factory).build(); } - abstract int getMaxNumOfUsedWorkers(); + /** + * An optional error handler for handling records that failed to publish to Solace. + * + *

If provided, this error handler will be invoked for each record that could not be + * successfully published. The error handler can implement custom logic for dealing with failed + * records, such as writing them to a dead-letter queue or logging them. + * + *

If no error handler is provided, failed records will be ignored. + */ + public Write withErrorHandler(ErrorHandler errorHandler) { + return toBuilder().setErrorHandler(errorHandler).build(); + } + + abstract int getNumShards(); abstract int getNumberOfClientsPerWorker(); @@ -989,10 +1030,12 @@ public Write withSessionServiceFactory(SessionServiceFactory factory) { abstract @Nullable SessionServiceFactory getSessionServiceFactory(); + abstract @Nullable ErrorHandler getErrorHandler(); + static Builder builder() { return new AutoValue_SolaceIO_Write.Builder() .setDeliveryMode(DEFAULT_WRITER_DELIVERY_MODE) - .setMaxNumOfUsedWorkers(DEFAULT_WRITER_MAX_NUMBER_OF_WORKERS) + .setNumShards(DEFAULT_WRITER_NUM_SHARDS) .setNumberOfClientsPerWorker(DEFAULT_WRITER_CLIENTS_PER_WORKER) .setPublishLatencyMetrics(DEFAULT_WRITER_PUBLISH_LATENCY_METRICS) .setDispatchMode(DEFAULT_WRITER_SUBMISSION_MODE) @@ -1003,7 +1046,7 @@ static Builder builder() { @AutoValue.Builder abstract static class Builder { - abstract Builder setMaxNumOfUsedWorkers(int maxNumOfUsedWorkers); + abstract Builder setNumShards(int numShards); abstract Builder setNumberOfClientsPerWorker(int numberOfClientsPerWorker); @@ -1021,13 +1064,121 @@ abstract static class Builder { abstract Builder setSessionServiceFactory(SessionServiceFactory factory); + abstract Builder setErrorHandler(ErrorHandler errorHandler); + abstract Write build(); } @Override public SolaceOutput expand(PCollection input) { - // TODO: will be sent in upcoming PR - return SolaceOutput.in(input.getPipeline(), null, null); + boolean usingSolaceRecord = + TypeDescriptor.of(Solace.Record.class) + .isSupertypeOf(checkNotNull(input.getTypeDescriptor())); + + validateWriteTransform(usingSolaceRecord); + + boolean usingDynamicDestinations = getDestination() == null; + SerializableFunction destinationFn; + if (usingDynamicDestinations) { + destinationFn = x -> SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination())); + } else { + // Constant 
destination for all messages (same topic or queue) + // This should not be null, as nulls would have been flagged by the + // validateWriteTransform method + destinationFn = x -> checkNotNull(getDestination()); + } + + @SuppressWarnings("unchecked") + PCollection records = + usingSolaceRecord + ? (PCollection) input + : input.apply( + "Format records", + MapElements.into(TypeDescriptor.of(Solace.Record.class)) + .via(checkNotNull(getFormatFunction()))); + + PCollection withGlobalWindow = + records.apply("Global window", Window.into(new GlobalWindows())); + + PCollection> withShardKeys = + withGlobalWindow.apply("Add shard key", ParDo.of(new AddShardKeyDoFn(getNumShards()))); + + String label = + getWriterType() == WriterType.STREAMING ? "Publish (streaming)" : "Publish (batched)"; + + PCollectionTuple solaceOutput = withShardKeys.apply(label, getWriterTransform(destinationFn)); + + SolaceOutput output; + if (getDeliveryMode() == DeliveryMode.PERSISTENT) { + if (getErrorHandler() != null) { + checkNotNull(getErrorHandler()).addErrorCollection(solaceOutput.get(FAILED_PUBLISH_TAG)); + } + output = SolaceOutput.in(input.getPipeline(), solaceOutput.get(SUCCESSFUL_PUBLISH_TAG)); + } else { + LOG.info( + "Solace.Write: omitting writer output because delivery mode is {}", getDeliveryMode()); + output = SolaceOutput.in(input.getPipeline(), null); + } + + return output; + } + + private ParDo.MultiOutput, Solace.PublishResult> getWriterTransform( + SerializableFunction destinationFn) { + + ParDo.SingleOutput, Solace.PublishResult> writer = + ParDo.of( + getWriterType() == WriterType.STREAMING + ? 
new UnboundedStreamingSolaceWriter( + destinationFn, + checkNotNull(getSessionServiceFactory()), + getDeliveryMode(), + getDispatchMode(), + getNumberOfClientsPerWorker(), + getPublishLatencyMetrics()) + : new UnboundedBatchedSolaceWriter( + destinationFn, + checkNotNull(getSessionServiceFactory()), + getDeliveryMode(), + getDispatchMode(), + getNumberOfClientsPerWorker(), + getPublishLatencyMetrics())); + + return writer.withOutputTags(SUCCESSFUL_PUBLISH_TAG, TupleTagList.of(FAILED_PUBLISH_TAG)); + } + + /** + * Called before running the Pipeline to verify this transform is fully and correctly specified. + */ + private void validateWriteTransform(boolean usingSolaceRecords) { + if (!usingSolaceRecords) { + checkNotNull( + getFormatFunction(), + "SolaceIO.Write: If you are not using Solace.Record as the input type, you" + + " must set a format function using withFormatFunction()."); + } + + checkArgument( + getNumShards() > 0, "SolaceIO.Write: The number of used workers must be positive."); + checkArgument( + getNumberOfClientsPerWorker() > 0, + "SolaceIO.Write: The number of clients per worker must be positive."); + checkArgument( + getDeliveryMode() == DeliveryMode.DIRECT || getDeliveryMode() == DeliveryMode.PERSISTENT, + String.format( + "SolaceIO.Write: Delivery mode must be either DIRECT or PERSISTENT. %s" + + " not supported", + getDeliveryMode())); + if (getPublishLatencyMetrics()) { + checkArgument( + getDeliveryMode() == DeliveryMode.PERSISTENT, + "SolaceIO.Write: Publish latency metrics can only be enabled for PERSISTENT" + + " delivery mode."); + } + checkNotNull( + getSessionServiceFactory(), + "SolaceIO: You need to pass a session service factory. 
For basic" + + " authentication, you can use BasicAuthJcsmpSessionServiceFactory."); } } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java index 2137d574b09a..b2196dbf1067 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import com.google.auto.value.AutoValue; import com.solacesystems.jcsmp.ConsumerFlowProperties; import com.solacesystems.jcsmp.EndpointProperties; import com.solacesystems.jcsmp.FlowReceiver; @@ -28,9 +29,15 @@ import com.solacesystems.jcsmp.JCSMPProperties; import com.solacesystems.jcsmp.JCSMPSession; import com.solacesystems.jcsmp.Queue; +import com.solacesystems.jcsmp.XMLMessageProducer; import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; import javax.annotation.Nullable; import org.apache.beam.sdk.io.solace.RetryCallableManager; +import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode; +import org.apache.beam.sdk.io.solace.data.Solace.PublishResult; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; /** @@ -39,34 +46,50 @@ *

This class provides a way to connect to a Solace broker and receive messages from a queue. The * connection is established using basic authentication. */ -public class BasicAuthJcsmpSessionService extends SessionService { - private final String queueName; - private final String host; - private final String username; - private final String password; - private final String vpnName; - @Nullable private JCSMPSession jcsmpSession; - @Nullable private MessageReceiver messageReceiver; - private final RetryCallableManager retryCallableManager = RetryCallableManager.create(); +@AutoValue +public abstract class BasicAuthJcsmpSessionService extends SessionService { + + /** The name of the queue to receive messages from. */ + public abstract @Nullable String queueName(); + + /** The host name or IP address of the Solace broker. Format: Host[:Port] */ + public abstract String host(); + + /** The username to use for authentication. */ + public abstract String username(); + + /** The password to use for authentication. */ + public abstract String password(); + + /** The name of the VPN to connect to. */ + public abstract String vpnName(); + + public static Builder builder() { + return new AutoValue_BasicAuthJcsmpSessionService.Builder().vpnName(DEFAULT_VPN_NAME); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder queueName(@Nullable String queueName); + + public abstract Builder host(String host); - /** - * Creates a new {@link BasicAuthJcsmpSessionService} with the given parameters. - * - * @param queueName The name of the queue to receive messages from. - * @param host The host name or IP address of the Solace broker. Format: Host[:Port] - * @param username The username to use for authentication. - * @param password The password to use for authentication. - * @param vpnName The name of the VPN to connect to. 
- */ - public BasicAuthJcsmpSessionService( - String queueName, String host, String username, String password, String vpnName) { - this.queueName = queueName; - this.host = host; - this.username = username; - this.password = password; - this.vpnName = vpnName; + public abstract Builder username(String username); + + public abstract Builder password(String password); + + public abstract Builder vpnName(String vpnName); + + public abstract BasicAuthJcsmpSessionService build(); } + @Nullable private transient JCSMPSession jcsmpSession; + @Nullable private transient MessageReceiver messageReceiver; + @Nullable private transient MessageProducer messageProducer; + private final java.util.Queue publishedResultsQueue = + new ConcurrentLinkedQueue<>(); + private final RetryCallableManager retryCallableManager = RetryCallableManager.create(); + @Override public void connect() { retryCallableManager.retryCallable(this::connectSession, ImmutableSet.of(JCSMPException.class)); @@ -79,6 +102,9 @@ public void close() { if (messageReceiver != null) { messageReceiver.close(); } + if (messageProducer != null) { + messageProducer.close(); + } if (!isClosed()) { checkStateNotNull(jcsmpSession).closeSession(); } @@ -88,24 +114,64 @@ public void close() { } @Override - public MessageReceiver createReceiver() { - this.messageReceiver = - retryCallableManager.retryCallable( - this::createFlowReceiver, ImmutableSet.of(JCSMPException.class)); + public MessageReceiver getReceiver() { + if (this.messageReceiver == null) { + this.messageReceiver = + retryCallableManager.retryCallable( + this::createFlowReceiver, ImmutableSet.of(JCSMPException.class)); + } return this.messageReceiver; } + @Override + public MessageProducer getInitializedProducer(SubmissionMode submissionMode) { + if (this.messageProducer == null || this.messageProducer.isClosed()) { + Callable create = () -> createXMLMessageProducer(submissionMode); + this.messageProducer = + retryCallableManager.retryCallable(create, 
ImmutableSet.of(JCSMPException.class)); + } + return checkStateNotNull(this.messageProducer); + } + + @Override + public java.util.Queue getPublishedResultsQueue() { + return publishedResultsQueue; + } + @Override public boolean isClosed() { return jcsmpSession == null || jcsmpSession.isClosed(); } + private MessageProducer createXMLMessageProducer(SubmissionMode submissionMode) + throws JCSMPException, IOException { + + if (isClosed()) { + connectWriteSession(submissionMode); + } + + @SuppressWarnings("nullness") + Callable initProducer = + () -> + Objects.requireNonNull(jcsmpSession) + .getMessageProducer(new PublishResultHandler(publishedResultsQueue)); + + XMLMessageProducer producer = + retryCallableManager.retryCallable(initProducer, ImmutableSet.of(JCSMPException.class)); + if (producer == null) { + throw new IOException("SolaceIO.Write: Could not create producer, producer object is null"); + } + return new SolaceMessageProducer(producer); + } + private MessageReceiver createFlowReceiver() throws JCSMPException, IOException { if (isClosed()) { connectSession(); } - Queue queue = JCSMPFactory.onlyInstance().createQueue(queueName); + Queue queue = + JCSMPFactory.onlyInstance() + .createQueue(checkStateNotNull(queueName(), "SolaceIO.Read: Queue is not set.")); ConsumerFlowProperties flowProperties = new ConsumerFlowProperties(); flowProperties.setEndpoint(queue); @@ -118,7 +184,8 @@ private MessageReceiver createFlowReceiver() throws JCSMPException, IOException createFlowReceiver(jcsmpSession, flowProperties, endpointProperties)); } throw new IOException( - "SolaceIO.Read: Could not create a receiver from the Jcsmp session: session object is null."); + "SolaceIO.Read: Could not create a receiver from the Jcsmp session: session object is" + + " null."); } // The `@SuppressWarning` is needed here, because the checkerframework reports an error for the @@ -141,20 +208,33 @@ private int connectSession() throws JCSMPException { return 0; } + private int 
connectWriteSession(SubmissionMode mode) throws JCSMPException { + if (jcsmpSession == null) { + jcsmpSession = createWriteSessionObject(mode); + } + jcsmpSession.connect(); + return 0; + } + private JCSMPSession createSessionObject() throws InvalidPropertiesException { JCSMPProperties properties = initializeSessionProperties(new JCSMPProperties()); return JCSMPFactory.onlyInstance().createSession(properties); } + private JCSMPSession createWriteSessionObject(SubmissionMode mode) + throws InvalidPropertiesException { + return JCSMPFactory.onlyInstance().createSession(initializeWriteSessionProperties(mode)); + } + @Override public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProps) { - baseProps.setProperty(JCSMPProperties.VPN_NAME, vpnName); + baseProps.setProperty(JCSMPProperties.VPN_NAME, vpnName()); baseProps.setProperty( JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_BASIC); - baseProps.setProperty(JCSMPProperties.USERNAME, username); - baseProps.setProperty(JCSMPProperties.PASSWORD, password); - baseProps.setProperty(JCSMPProperties.HOST, host); + baseProps.setProperty(JCSMPProperties.USERNAME, username()); + baseProps.setProperty(JCSMPProperties.PASSWORD, password()); + baseProps.setProperty(JCSMPProperties.HOST, host()); return baseProps; } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java index 2084e61b7e38..199dcccee854 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io.solace.broker; import static org.apache.beam.sdk.io.solace.broker.SessionService.DEFAULT_VPN_NAME; -import static 
org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import com.google.auto.value.AutoValue; @@ -31,12 +30,16 @@ */ @AutoValue public abstract class BasicAuthJcsmpSessionServiceFactory extends SessionServiceFactory { + /** The host name or IP address of the Solace broker. Format: Host[:Port] */ public abstract String host(); + /** The username to use for authentication. */ public abstract String username(); + /** The password to use for authentication. */ public abstract String password(); + /** The name of the VPN to connect to. */ public abstract String vpnName(); public static Builder builder() { @@ -54,6 +57,7 @@ public abstract static class Builder { /** Set Solace username. */ public abstract Builder username(String username); + /** Set Solace password. */ public abstract Builder password(String password); @@ -65,11 +69,15 @@ public abstract static class Builder { @Override public SessionService create() { - return new BasicAuthJcsmpSessionService( - checkStateNotNull(queue, "SolaceIO.Read: Queue is not set.").getName(), - host(), - username(), - password(), - vpnName()); + BasicAuthJcsmpSessionService.Builder builder = BasicAuthJcsmpSessionService.builder(); + if (queue != null) { + builder = builder.queueName(queue.getName()); + } + return builder + .host(host()) + .username(username()) + .password(password()) + .vpnName(vpnName()) + .build(); } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/GCPSecretSessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/GCPSecretSessionServiceFactory.java index dd87e1d75fa5..7f691b46be31 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/GCPSecretSessionServiceFactory.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/GCPSecretSessionServiceFactory.java @@ -117,7 +117,7 @@ public abstract static class Builder { @Override public SessionService create() { - String password = 
null; + String password; try { password = retrieveSecret(); } catch (IOException e) { diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageProducer.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageProducer.java new file mode 100644 index 000000000000..8aa254b92cb1 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageProducer.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import com.solacesystems.jcsmp.DeliveryMode; +import com.solacesystems.jcsmp.Destination; +import java.util.List; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.transforms.SerializableFunction; + +/** + * Base class for publishing messages to a Solace broker. + * + *

Implementations of this interface are responsible for managing the connection to the broker + * and for publishing messages to the broker. + */ +@Internal +public interface MessageProducer { + + /** Publishes a message to the broker. */ + void publishSingleMessage( + Solace.Record msg, + Destination topicOrQueue, + boolean useCorrelationKeyLatency, + DeliveryMode deliveryMode); + + /** + * Publishes a batch of messages to the broker. + * + *

The size of the batch cannot exceed 50 messages; this is a limitation of the Solace API. + * + *

It returns the number of messages written. + */ + int publishBatch( + List records, + boolean useCorrelationKeyLatency, + SerializableFunction destinationFn, + DeliveryMode deliveryMode); + + /** Returns {@literal true} if the message producer is closed, {@literal false} otherwise. */ + boolean isClosed(); + + /** Closes the message producer. */ + void close(); +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageProducerUtils.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageProducerUtils.java new file mode 100644 index 000000000000..dd4610910ff4 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageProducerUtils.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.solace.broker; + +import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.DeliveryMode; +import com.solacesystems.jcsmp.Destination; +import com.solacesystems.jcsmp.JCSMPFactory; +import com.solacesystems.jcsmp.JCSMPSendMultipleEntry; +import java.util.List; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.transforms.SerializableFunction; + +@Internal +public class MessageProducerUtils { + // This is the batch limit supported by the send multiple JCSMP API method. + static final int SOLACE_BATCH_LIMIT = 50; + + /** + * Create a {@link BytesXMLMessage} to be published in Solace. + * + * @param record The record to be published. + * @param useCorrelationKeyLatency Whether to use a complex key for tracking latency. + * @param deliveryMode The {@link DeliveryMode} used to publish the message. + * @return A {@link BytesXMLMessage} that can be sent to Solace "as is". 
+ */ + public static BytesXMLMessage createBytesXMLMessage( + Solace.Record record, boolean useCorrelationKeyLatency, DeliveryMode deliveryMode) { + JCSMPFactory jcsmpFactory = JCSMPFactory.onlyInstance(); + BytesXMLMessage msg = jcsmpFactory.createBytesXMLMessage(); + byte[] payload = record.getPayload(); + msg.writeBytes(payload); + + Long senderTimestamp = record.getSenderTimestamp(); + if (senderTimestamp == null) { + senderTimestamp = System.currentTimeMillis(); + } + msg.setSenderTimestamp(senderTimestamp); + msg.setDeliveryMode(deliveryMode); + if (useCorrelationKeyLatency) { + Solace.CorrelationKey key = + Solace.CorrelationKey.builder() + .setMessageId(record.getMessageId()) + .setPublishMonotonicNanos(System.nanoTime()) + .build(); + msg.setCorrelationKey(key); + } else { + // Use only a string as correlation key + msg.setCorrelationKey(record.getMessageId()); + } + msg.setApplicationMessageId(record.getMessageId()); + return msg; + } + + /** + * Create a {@link JCSMPSendMultipleEntry} array to be published in Solace. This can be used with + * `sendMultiple` to send all the messages in a single API call. + * + *

The size of the list cannot be larger than 50 messages. This is a hard limit enforced by the + * Solace API. + * + * @param records A {@link List} of records to be published + * @param useCorrelationKeyLatency Whether to use a complex key for tracking latency. + * @param destinationFn A function that maps every record to its destination. + * @param deliveryMode The {@link DeliveryMode} used to publish the message. + * @return A {@link JCSMPSendMultipleEntry} array that can be sent to Solace "as is". + */ + public static JCSMPSendMultipleEntry[] createJCSMPSendMultipleEntry( + List records, + boolean useCorrelationKeyLatency, + SerializableFunction destinationFn, + DeliveryMode deliveryMode) { + if (records.size() > SOLACE_BATCH_LIMIT) { + throw new RuntimeException( + String.format( + "SolaceIO.Write: Trying to create a batch of %d, but Solace supports a" + + " maximum of %d. The batch will likely be rejected by Solace.", + records.size(), SOLACE_BATCH_LIMIT)); + } + + JCSMPSendMultipleEntry[] entries = new JCSMPSendMultipleEntry[records.size()]; + for (int i = 0; i < records.size(); i++) { + Solace.Record record = records.get(i); + JCSMPSendMultipleEntry entry = + JCSMPFactory.onlyInstance() + .createSendMultipleEntry( + createBytesXMLMessage(record, useCorrelationKeyLatency, deliveryMode), + destinationFn.apply(record)); + entries[i] = entry; + } + + return entries; + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/PublishResultHandler.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/PublishResultHandler.java new file mode 100644 index 000000000000..1153bfcb7a1c --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/PublishResultHandler.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import com.solacesystems.jcsmp.JCSMPException; +import com.solacesystems.jcsmp.JCSMPStreamingPublishCorrelatingEventHandler; +import java.util.Queue; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.io.solace.data.Solace.PublishResult; +import org.apache.beam.sdk.io.solace.write.UnboundedSolaceWriter; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is required to handle callbacks from Solace, to find out if messages were actually + * published or there were any kind of error. + * + *

This class is also used to calculate the latency of the publication. The correlation key + * contains the original timestamp of when the message was sent from the pipeline to Solace. The + * comparison of that value with the clock now, using a monotonic clock, is understood as the + * latency of the publication + */ +public final class PublishResultHandler implements JCSMPStreamingPublishCorrelatingEventHandler { + + private static final Logger LOG = LoggerFactory.getLogger(PublishResultHandler.class); + private final Queue publishResultsQueue; + private final Counter batchesRejectedByBroker = + Metrics.counter(UnboundedSolaceWriter.class, "batches_rejected"); + + public PublishResultHandler(Queue publishResultsQueue) { + this.publishResultsQueue = publishResultsQueue; + } + + @Override + public void handleErrorEx(Object key, JCSMPException cause, long timestamp) { + processKey(key, false, cause); + } + + @Override + public void responseReceivedEx(Object key) { + processKey(key, true, null); + } + + private void processKey(Object key, boolean isPublished, @Nullable JCSMPException cause) { + PublishResult.Builder resultBuilder = PublishResult.builder(); + String messageId; + if (key == null) { + messageId = ""; + } else if (key instanceof Solace.CorrelationKey) { + messageId = ((Solace.CorrelationKey) key).getMessageId(); + long latencyNanos = calculateLatency((Solace.CorrelationKey) key); + resultBuilder = resultBuilder.setLatencyNanos(latencyNanos); + } else { + messageId = key.toString(); + } + + resultBuilder = resultBuilder.setMessageId(messageId).setPublished(isPublished); + if (!isPublished) { + batchesRejectedByBroker.inc(); + if (cause != null) { + resultBuilder = resultBuilder.setError(cause.getMessage()); + } else { + resultBuilder = resultBuilder.setError("NULL - Not set by Solace"); + } + } else if (cause != null) { + LOG.warn( + "Message with id {} is published but exception is populated. 
Ignoring exception", + messageId); + } + + PublishResult publishResult = resultBuilder.build(); + // The queue is shared with the session; it receives the callbacks of all + // publications made through this handler + publishResultsQueue.add(publishResult); + } + + private static long calculateLatency(Solace.CorrelationKey key) { + long currentNanos = System.nanoTime(); + long publishNanos = key.getPublishMonotonicNanos(); + return currentNanos - publishNanos; + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java index aed700a71ded..84a876a9d0bc 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java @@ -19,7 +19,11 @@ import com.solacesystems.jcsmp.JCSMPProperties; import java.io.Serializable; +import java.util.Queue; import org.apache.beam.sdk.io.solace.SolaceIO; +import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode; +import org.apache.beam.sdk.io.solace.data.Solace.PublishResult; +import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,21 +73,23 @@ *

For basic authentication, use {@link BasicAuthJcsmpSessionService} and {@link * BasicAuthJcsmpSessionServiceFactory}. * - *

For other situations, you need to extend this class. For instance: + *

For other situations, you need to extend this class and implement the `equals` method, so two + * instances of your class can be compared by value. We recommend using AutoValue for that. For + * instance: * *

{@code
+ * {@literal }@AutoValue
  * public class MySessionService extends SessionService {
- *   private final String authToken;
+ *   abstract String authToken();
  *
- *   public MySessionService(String token) {
- *    this.oauthToken = token;
- *    ...
+ *   public static MySessionService create(String authToken) {
+ *     return new AutoValue_MySessionService(authToken);
  *   }
  *
  *   {@literal }@Override
  *   public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProps) {
  *     baseProps.setProperty(JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_OAUTH2);
- *     baseProps.setProperty(JCSMPProperties.OAUTH2_ACCESS_TOKEN, authToken);
+ *     baseProps.setProperty(JCSMPProperties.OAUTH2_ACCESS_TOKEN, authToken());
  *     return props;
  *   }
  *
@@ -101,6 +107,7 @@ public abstract class SessionService implements Serializable {
 
   public static final String DEFAULT_VPN_NAME = "default";
 
+  private static final int TESTING_PUB_ACK_WINDOW = 1;
   private static final int STREAMING_PUB_ACK_WINDOW = 50;
   private static final int BATCHED_PUB_ACK_WINDOW = 255;
 
@@ -121,10 +128,25 @@ public abstract class SessionService implements Serializable {
   public abstract boolean isClosed();
 
   /**
-   * Creates a MessageReceiver object for receiving messages from Solace. Typically, this object is
-   * created from the session instance.
+   * Returns a MessageReceiver object for receiving messages from Solace. If it is the first time
+   * this method is used, the receiver is created from the session instance; otherwise it returns
+   * the receiver created initially.
    */
-  public abstract MessageReceiver createReceiver();
+  public abstract MessageReceiver getReceiver();
+
+  /**
+   * Returns a MessageProducer object for publishing messages to Solace. If it is the first time
+   * this method is used, the producer is created from the session instance; otherwise it returns
+   * the producer created initially.
+   */
+  public abstract MessageProducer getInitializedProducer(SubmissionMode mode);
+
+  /**
+   * Returns the {@link Queue} instance associated with this session, which holds the callbacks
+   * received asynchronously from Solace for message publications. The queue implementation has to
+   * be thread-safe for production use cases.
+   */
+  public abstract Queue<PublishResult> getPublishedResultsQueue();
 
   /**
    * Override this method and provide your specific properties, including all those related to
@@ -147,6 +169,20 @@ public abstract class SessionService implements Serializable {
    */
   public abstract JCSMPProperties initializeSessionProperties(JCSMPProperties baseProperties);
 
+  /**
+   * You need to override this method to be able to compare these objects by value. We recommend
+   * using AutoValue for that.
+   */
+  @Override
+  public abstract boolean equals(@Nullable Object other);
+
+  /**
+   * You need to override this method to be able to compare these objects by value. We recommend
+   * using AutoValue for that.
+   */
+  @Override
+  public abstract int hashCode();
+
   /**
    * This method will be called by the write connector when a new session is started.
    *
@@ -186,50 +222,80 @@ private static JCSMPProperties overrideConnectorProperties(
     // received from Solace. A value of 1 will have the lowest latency, but a very low
     // throughput and a monumental backpressure.
 
-    // This controls how the messages are sent to Solace
-    if (mode == SolaceIO.SubmissionMode.HIGHER_THROUGHPUT) {
-      // Create a parallel thread and a queue to send the messages
+    // Retrieve current values of the properties
+    Boolean msgCbProp = props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR);
+    Integer ackWindowSize = props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE);
 
-      Boolean msgCbProp = props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR);
-      if (msgCbProp != null && msgCbProp) {
-        LOG.warn(
-            "SolaceIO.Write: Overriding MESSAGE_CALLBACK_ON_REACTOR to false since"
-                + " HIGHER_THROUGHPUT mode was selected");
-      }
+    switch (mode) {
+      case HIGHER_THROUGHPUT:
+        // Check if it was set by user, show override warning
+        if (msgCbProp != null && msgCbProp) {
+          LOG.warn(
+              "SolaceIO.Write: Overriding MESSAGE_CALLBACK_ON_REACTOR to false since"
+                  + " HIGHER_THROUGHPUT mode was selected");
+        }
+        if ((ackWindowSize != null && ackWindowSize != BATCHED_PUB_ACK_WINDOW)) {
+          LOG.warn(
+              String.format(
+                  "SolaceIO.Write: Overriding PUB_ACK_WINDOW_SIZE to %d since"
+                      + " HIGHER_THROUGHPUT mode was selected",
+                  BATCHED_PUB_ACK_WINDOW));
+        }
 
-      props.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, false);
+        // Override the properties
+        // Use a dedicated thread for callbacks, increase the ack window size
+        props.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, false);
+        props.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, BATCHED_PUB_ACK_WINDOW);
+        LOG.info(
+            "SolaceIO.Write: Using HIGHER_THROUGHPUT mode, MESSAGE_CALLBACK_ON_REACTOR is FALSE,"
+                + " PUB_ACK_WINDOW_SIZE is {}",
+            BATCHED_PUB_ACK_WINDOW);
+        break;
+      case LOWER_LATENCY:
+        // Check if it was set by user, show override warning
+        if (msgCbProp != null && !msgCbProp) {
+          LOG.warn(
+              "SolaceIO.Write: Overriding MESSAGE_CALLBACK_ON_REACTOR to true since"
+                  + " LOWER_LATENCY mode was selected");
+        }
 
-      Integer ackWindowSize = props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE);
-      if ((ackWindowSize != null && ackWindowSize != BATCHED_PUB_ACK_WINDOW)) {
-        LOG.warn(
-            String.format(
-                "SolaceIO.Write: Overriding PUB_ACK_WINDOW_SIZE to %d since"
-                    + " HIGHER_THROUGHPUT mode was selected",
-                BATCHED_PUB_ACK_WINDOW));
-      }
-      props.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, BATCHED_PUB_ACK_WINDOW);
-    } else {
-      // Send from the same thread where the produced is being called. This offers the lowest
-      // latency, but a low throughput too.
-      Boolean msgCbProp = props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR);
-      if (msgCbProp != null && !msgCbProp) {
-        LOG.warn(
-            "SolaceIO.Write: Overriding MESSAGE_CALLBACK_ON_REACTOR to true since"
-                + " LOWER_LATENCY mode was selected");
-      }
+        if ((ackWindowSize != null && ackWindowSize != STREAMING_PUB_ACK_WINDOW)) {
+          LOG.warn(
+              String.format(
+                  "SolaceIO.Write: Overriding PUB_ACK_WINDOW_SIZE to %d since"
+                      + " LOWER_LATENCY mode was selected",
+                  STREAMING_PUB_ACK_WINDOW));
+        }
 
-      props.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, true);
+        // Override the properties
+        // Send from the same thread where the producer is being called. This offers the lowest
+        // latency, but a low throughput too.
+        props.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, true);
+        props.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, STREAMING_PUB_ACK_WINDOW);
+        LOG.info(
+            "SolaceIO.Write: Using LOWER_LATENCY mode, MESSAGE_CALLBACK_ON_REACTOR is TRUE,"
+                + " PUB_ACK_WINDOW_SIZE is {}",
+            STREAMING_PUB_ACK_WINDOW);
 
-      Integer ackWindowSize = props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE);
-      if ((ackWindowSize != null && ackWindowSize != STREAMING_PUB_ACK_WINDOW)) {
+        break;
+      case CUSTOM:
+        LOG.info(
+            "SolaceIO.Write: Using the custom JCSMP properties set by the user. No property has"
+                + " been overridden by the connector.");
+        break;
+      case TESTING:
         LOG.warn(
-            String.format(
-                "SolaceIO.Write: Overriding PUB_ACK_WINDOW_SIZE to %d since"
-                    + " LOWER_LATENCY mode was selected",
-                STREAMING_PUB_ACK_WINDOW));
-      }
-
-      props.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, STREAMING_PUB_ACK_WINDOW);
+            "SolaceIO.Write: Overriding JCSMP properties for testing. **IF THIS IS AN"
+                + " ACTUAL PIPELINE, CHANGE THE SUBMISSION MODE TO HIGHER_THROUGHPUT "
+                + "OR LOWER_LATENCY.**");
+        // Minimize multi-threading for testing
+        props.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, true);
+        props.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, TESTING_PUB_ACK_WINDOW);
+        break;
+      default:
+        LOG.error(
+            "SolaceIO.Write: no submission mode is selected. Set the submission mode to"
+                + " HIGHER_THROUGHPUT or LOWER_LATENCY.");
     }
     return props;
   }
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
index 027de2cff134..bd1f3c23694d 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
@@ -19,11 +19,40 @@
 
 import com.solacesystems.jcsmp.Queue;
 import java.io.Serializable;
+import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
- * This abstract class serves as a blueprint for creating `SessionService` objects. It introduces a
- * queue property and mandates the implementation of a create() method in concrete subclasses.
+ * This abstract class serves as a blueprint for creating `SessionServiceFactory` objects. It
+ * introduces a queue property and mandates the implementation of a create() method in concrete
+ * subclasses.
+ *
+ * 

For basic authentication, use {@link BasicAuthJcsmpSessionServiceFactory}. + * + *

For other situations, you need to extend this class. Classes extending from this abstract + * class must implement the `equals` method so two instances can be compared by value, and not by + * reference. We recommend using AutoValue for that. + * + *

{@code
+ * {@literal @}AutoValue
+ * public abstract class MyFactory extends SessionServiceFactory {
+ *
+ *   abstract String value1();
+ *
+ *   abstract String value2();
+ *
+ *   public static MyFactory create(String value1, String value2) {
+ *     return new AutoValue_MyFactory(value1, value2);
+ *   }
+ *
+ *   ...
+ *
+ *   {@literal @}Override
+ *   public SessionService create() {
+ *     ...
+ *   }
+ * }
+ * }
*/ public abstract class SessionServiceFactory implements Serializable { /** @@ -34,12 +63,32 @@ public abstract class SessionServiceFactory implements Serializable { */ @Nullable Queue queue; + /** + * The write submission mode. This is set when the writers are created. This property is used only + * by the write connector. + */ + @Nullable SubmissionMode submissionMode; + /** * This is the core method that subclasses must implement. It defines how to construct and return * a SessionService object. */ public abstract SessionService create(); + /** + * You need to override this method to be able to compare these objects by value. We recommend + * using AutoValue for that. + */ + @Override + public abstract boolean equals(@Nullable Object other); + + /** + * You need to override this method to be able to compare these objects by value. We recommend + * using AutoValue for that. + */ + @Override + public abstract int hashCode(); + /** * This method is called in the {@link * org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method @@ -48,4 +97,15 @@ public abstract class SessionServiceFactory implements Serializable { public void setQueue(Queue queue) { this.queue = queue; } + + /** + * Called by the write connector to set the submission mode used to create the message producers. 
+ */ + public void setSubmissionMode(SubmissionMode submissionMode) { + this.submissionMode = submissionMode; + } + + public @Nullable SubmissionMode getSubmissionMode() { + return submissionMode; + } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageProducer.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageProducer.java new file mode 100644 index 000000000000..b3806b5afae9 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageProducer.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.solace.broker; + +import static org.apache.beam.sdk.io.solace.broker.MessageProducerUtils.createBytesXMLMessage; +import static org.apache.beam.sdk.io.solace.broker.MessageProducerUtils.createJCSMPSendMultipleEntry; + +import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.DeliveryMode; +import com.solacesystems.jcsmp.Destination; +import com.solacesystems.jcsmp.JCSMPException; +import com.solacesystems.jcsmp.JCSMPSendMultipleEntry; +import com.solacesystems.jcsmp.XMLMessageProducer; +import java.util.List; +import java.util.concurrent.Callable; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.solace.RetryCallableManager; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; + +@Internal +public class SolaceMessageProducer implements MessageProducer { + + private final XMLMessageProducer producer; + private final RetryCallableManager retryCallableManager = RetryCallableManager.create(); + + public SolaceMessageProducer(XMLMessageProducer producer) { + this.producer = producer; + } + + @Override + public void publishSingleMessage( + Solace.Record record, + Destination topicOrQueue, + boolean useCorrelationKeyLatency, + DeliveryMode deliveryMode) { + BytesXMLMessage msg = createBytesXMLMessage(record, useCorrelationKeyLatency, deliveryMode); + Callable publish = + () -> { + producer.send(msg, topicOrQueue); + return 0; + }; + + retryCallableManager.retryCallable(publish, ImmutableSet.of(JCSMPException.class)); + } + + @Override + public int publishBatch( + List records, + boolean useCorrelationKeyLatency, + SerializableFunction destinationFn, + DeliveryMode deliveryMode) { + JCSMPSendMultipleEntry[] batch = + createJCSMPSendMultipleEntry( + records, useCorrelationKeyLatency, destinationFn, deliveryMode); + Callable publish = () -> 
producer.sendMultiple(batch, 0, batch.length, 0); + return retryCallableManager.retryCallable(publish, ImmutableSet.of(JCSMPException.class)); + } + + @Override + public boolean isClosed() { + return producer == null || producer.isClosed(); + } + + @Override + public void close() { + if (!isClosed()) { + this.producer.close(); + } + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java index 00b94b5b9ea9..21274237f46a 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java @@ -21,7 +21,6 @@ import com.solacesystems.jcsmp.BytesXMLMessage; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber; @@ -52,6 +51,7 @@ public String getName() { return name; } } + /** Represents a Solace topic. */ public static class Topic { private final String name; @@ -68,6 +68,7 @@ public String getName() { return name; } } + /** Represents a Solace destination type. 
*/ public enum DestinationType { TOPIC, @@ -93,17 +94,17 @@ public abstract static class Destination { */ public abstract DestinationType getType(); - static Builder builder() { + public static Builder builder() { return new AutoValue_Solace_Destination.Builder(); } @AutoValue.Builder - abstract static class Builder { - abstract Builder setName(String name); + public abstract static class Builder { + public abstract Builder setName(String name); - abstract Builder setType(DestinationType type); + public abstract Builder setType(DestinationType type); - abstract Destination build(); + public abstract Destination build(); } } @@ -120,17 +121,19 @@ public abstract static class Record { * @return The message ID, or null if not available. */ @SchemaFieldNumber("0") - public abstract @Nullable String getMessageId(); + public abstract String getMessageId(); /** - * Gets the payload of the message as a ByteString. + * Gets the payload of the message as a byte array. * *

Mapped from {@link BytesXMLMessage#getBytes()} * * @return The message payload. */ + @SuppressWarnings("mutable") @SchemaFieldNumber("1") - public abstract ByteBuffer getPayload(); + public abstract byte[] getPayload(); + /** * Gets the destination (topic or queue) to which the message was sent. * @@ -192,7 +195,7 @@ public abstract static class Record { * @return The timestamp. */ @SchemaFieldNumber("7") - public abstract long getReceiveTimestamp(); + public abstract @Nullable Long getReceiveTimestamp(); /** * Gets the timestamp (in milliseconds since the Unix epoch) when the message was sent by the @@ -241,55 +244,62 @@ public abstract static class Record { public abstract @Nullable String getReplicationGroupMessageId(); /** - * Gets the attachment data of the message as a ByteString, if any. This might represent files + * Gets the attachment data of the message as a byte array, if any. This might represent files * or other binary content associated with the message. * *

Mapped from {@link BytesXMLMessage#getAttachmentByteBuffer()} * - * @return The attachment data, or an empty ByteString if no attachment is present. + * @return The attachment data, or an empty byte array if no attachment is present. */ + @SuppressWarnings("mutable") @SchemaFieldNumber("12") - public abstract ByteBuffer getAttachmentBytes(); + public abstract byte[] getAttachmentBytes(); - static Builder builder() { - return new AutoValue_Solace_Record.Builder(); + public static Builder builder() { + return new AutoValue_Solace_Record.Builder() + .setExpiration(0L) + .setPriority(-1) + .setRedelivered(false) + .setTimeToLive(0) + .setAttachmentBytes(new byte[0]); } @AutoValue.Builder - abstract static class Builder { - abstract Builder setMessageId(@Nullable String messageId); + public abstract static class Builder { + public abstract Builder setMessageId(String messageId); - abstract Builder setPayload(ByteBuffer payload); + public abstract Builder setPayload(byte[] payload); - abstract Builder setDestination(@Nullable Destination destination); + public abstract Builder setDestination(@Nullable Destination destination); - abstract Builder setExpiration(long expiration); + public abstract Builder setExpiration(long expiration); - abstract Builder setPriority(int priority); + public abstract Builder setPriority(int priority); - abstract Builder setRedelivered(boolean redelivered); + public abstract Builder setRedelivered(boolean redelivered); - abstract Builder setReplyTo(@Nullable Destination replyTo); + public abstract Builder setReplyTo(@Nullable Destination replyTo); - abstract Builder setReceiveTimestamp(long receiveTimestamp); + public abstract Builder setReceiveTimestamp(@Nullable Long receiveTimestamp); - abstract Builder setSenderTimestamp(@Nullable Long senderTimestamp); + public abstract Builder setSenderTimestamp(@Nullable Long senderTimestamp); - abstract Builder setSequenceNumber(@Nullable Long sequenceNumber); + public abstract Builder 
setSequenceNumber(@Nullable Long sequenceNumber); - abstract Builder setTimeToLive(long timeToLive); + public abstract Builder setTimeToLive(long timeToLive); - abstract Builder setReplicationGroupMessageId(@Nullable String replicationGroupMessageId); + public abstract Builder setReplicationGroupMessageId( + @Nullable String replicationGroupMessageId); - abstract Builder setAttachmentBytes(ByteBuffer attachmentBytes); + public abstract Builder setAttachmentBytes(byte[] attachmentBytes); - abstract Record build(); + public abstract Record build(); } } /** * The result of writing a message to Solace. This will be returned by the {@link - * com.google.cloud.dataflow.dce.io.solace.SolaceIO.Write} connector. + * org.apache.beam.sdk.io.solace.SolaceIO.Write} connector. * *

This class provides a builder to create instances, but you will probably not need it. The * write connector will create and return instances of {@link Solace.PublishResult}. @@ -311,12 +321,12 @@ public abstract static class PublishResult { public abstract Boolean getPublished(); /** - * The publishing latency in milliseconds. This is the difference between the time the message + * The publishing latency in nanoseconds. This is the difference between the time the message * was created, and the time the message was published. It is only available if the {@link - * CorrelationKey} class is used as correlation key of the messages. + * CorrelationKey} class is used as correlation key of the messages, and null otherwise. */ @SchemaFieldNumber("2") - public abstract @Nullable Long getLatencyMilliseconds(); + public abstract @Nullable Long getLatencyNanos(); /** The error details if the message could not be published. */ @SchemaFieldNumber("3") @@ -332,7 +342,7 @@ public abstract static class Builder { public abstract Builder setPublished(Boolean published); - public abstract Builder setLatencyMilliseconds(Long latencyMs); + public abstract Builder setLatencyNanos(Long latencyNanos); public abstract Builder setError(String error); @@ -354,7 +364,7 @@ public abstract static class CorrelationKey { public abstract String getMessageId(); @SchemaFieldNumber("1") - public abstract long getPublishMonotonicMillis(); + public abstract long getPublishMonotonicNanos(); public static Builder builder() { return new AutoValue_Solace_CorrelationKey.Builder(); @@ -364,7 +374,7 @@ public static Builder builder() { public abstract static class Builder { public abstract Builder setMessageId(String messageId); - public abstract Builder setPublishMonotonicMillis(long millis); + public abstract Builder setPublishMonotonicNanos(long nanos); public abstract CorrelationKey build(); } @@ -414,7 +424,7 @@ public static class SolaceRecordMapper { Destination destination = 
getDestination(msg.getCorrelationId(), msg.getDestination()); return Record.builder() .setMessageId(msg.getApplicationMessageId()) - .setPayload(ByteBuffer.wrap(payloadBytesStream.toByteArray())) + .setPayload(payloadBytesStream.toByteArray()) .setDestination(destination) .setExpiration(msg.getExpiration()) .setPriority(msg.getPriority()) @@ -428,7 +438,7 @@ public static class SolaceRecordMapper { msg.getReplicationGroupMessageId() != null ? msg.getReplicationGroupMessageId().toString() : null) - .setAttachmentBytes(ByteBuffer.wrap(attachmentBytesStream.toByteArray())) + .setAttachmentBytes(attachmentBytesStream.toByteArray()) .build(); } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java index c18a9d110b2a..a421970370da 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java @@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; -import org.apache.beam.sdk.io.solace.broker.MessageReceiver; import org.apache.beam.sdk.io.solace.broker.SempClient; import org.apache.beam.sdk.io.solace.broker.SessionService; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -49,7 +48,6 @@ class UnboundedSolaceReader extends UnboundedReader { private final SempClient sempClient; private @Nullable BytesXMLMessage solaceOriginalRecord; private @Nullable T solaceMappedRecord; - private @Nullable MessageReceiver messageReceiver; private @Nullable SessionService sessionService; AtomicBoolean active = new AtomicBoolean(true); @@ -72,7 +70,7 @@ public UnboundedSolaceReader(UnboundedSolaceSource currentSource) { @Override public boolean start() { 
populateSession(); - populateMessageConsumer(); + checkNotNull(sessionService).getReceiver().start(); return advance(); } @@ -85,22 +83,11 @@ public void populateSession() { } } - private void populateMessageConsumer() { - if (messageReceiver == null) { - messageReceiver = checkNotNull(sessionService).createReceiver(); - messageReceiver.start(); - } - MessageReceiver receiver = checkNotNull(messageReceiver); - if (receiver.isClosed()) { - receiver.start(); - } - } - @Override public boolean advance() { BytesXMLMessage receivedXmlMessage; try { - receivedXmlMessage = checkNotNull(messageReceiver).receive(); + receivedXmlMessage = checkNotNull(sessionService).getReceiver().receive(); } catch (IOException e) { LOG.warn("SolaceIO.Read: Exception when pulling messages from the broker.", e); return false; @@ -125,7 +112,7 @@ public void close() { @Override public Instant getWatermark() { // should be only used by a test receiver - if (checkNotNull(messageReceiver).isEOF()) { + if (checkNotNull(sessionService).getReceiver().isEOF()) { return BoundedWindow.TIMESTAMP_MAX_VALUE; } return watermarkPolicy.getWatermark(); diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/AddShardKeyDoFn.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/AddShardKeyDoFn.java new file mode 100644 index 000000000000..12d8a8507d8a --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/AddShardKeyDoFn.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.write; + +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.KV; + +/** + * This class adds a pseudo-key with a given cardinality. The downstream steps will use state {@literal + * &} timers to distribute the data and control for the number of parallel workers used for writing. + */ +@Internal +public class AddShardKeyDoFn extends DoFn<Solace.Record, KV<Integer, Solace.Record>> { + private final int shardCount; + private int shardKey; + + public AddShardKeyDoFn(int shardCount) { + this.shardCount = shardCount; + shardKey = -1; + } + + @ProcessElement + public void processElement( + @Element Solace.Record record, OutputReceiver<KV<Integer, Solace.Record>> c) { + shardKey = (shardKey + 1) % shardCount; + c.output(KV.of(shardKey, record)); + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/RecordToPublishResultDoFn.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/RecordToPublishResultDoFn.java new file mode 100644 index 000000000000..4be5b0a014b3 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/RecordToPublishResultDoFn.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.write; + +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.transforms.DoFn; + +/** + * This class just transforms records to PublishResult to be able to capture the windowing with the right + * strategy. The output is not used for anything else. + */ +@Internal +public class RecordToPublishResultDoFn extends DoFn<Solace.Record, Solace.PublishResult> { + @ProcessElement + public void processElement( + @Element Solace.Record record, OutputReceiver<Solace.PublishResult> receiver) { + Solace.PublishResult result = + Solace.PublishResult.builder() + .setPublished(true) + .setMessageId(record.getMessageId()) + .setLatencyNanos(0L) + .build(); + receiver.output(result); + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceOutput.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceOutput.java index 6c37f879ae7f..d9c37326f83f 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceOutput.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceOutput.java @@ -22,6 +22,7 @@ import org.apache.beam.sdk.io.solace.SolaceIO; import org.apache.beam.sdk.io.solace.data.Solace; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; import
org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; @@ -31,50 +32,33 @@ import org.checkerframework.checker.nullness.qual.Nullable; /** - * The {@link SolaceIO.Write} transform's output return this type, containing both the successful - * publishes ({@link #getSuccessfulPublish()}) and the failed publishes ({@link - * #getFailedPublish()}). + * The {@link SolaceIO.Write} transform's output return this type, containing the successful + * publishes ({@link #getSuccessfulPublish()}). To access failed records, configure the connector + * with {@link SolaceIO.Write#withErrorHandler(ErrorHandler)}. * *

The streaming writer with DIRECT messages does not return anything, and the output {@link - * PCollection}s will be equal to null. + * PCollection} will be equal to null. */ public final class SolaceOutput implements POutput { private final Pipeline pipeline; - private final TupleTag failedPublishTag; private final TupleTag successfulPublishTag; - private final @Nullable PCollection failedPublish; private final @Nullable PCollection successfulPublish; - public @Nullable PCollection getFailedPublish() { - return failedPublish; - } - public @Nullable PCollection getSuccessfulPublish() { return successfulPublish; } public static SolaceOutput in( - Pipeline pipeline, - @Nullable PCollection failedPublish, - @Nullable PCollection successfulPublish) { - return new SolaceOutput( - pipeline, - SolaceIO.Write.FAILED_PUBLISH_TAG, - SolaceIO.Write.SUCCESSFUL_PUBLISH_TAG, - failedPublish, - successfulPublish); + Pipeline pipeline, @Nullable PCollection successfulPublish) { + return new SolaceOutput(pipeline, SolaceIO.Write.SUCCESSFUL_PUBLISH_TAG, successfulPublish); } private SolaceOutput( Pipeline pipeline, - TupleTag failedPublishTag, TupleTag successfulPublishTag, - @Nullable PCollection failedPublish, @Nullable PCollection successfulPublish) { this.pipeline = pipeline; - this.failedPublishTag = failedPublishTag; this.successfulPublishTag = successfulPublishTag; - this.failedPublish = failedPublish; this.successfulPublish = successfulPublish; } @@ -87,10 +71,6 @@ public Pipeline getPipeline() { public Map, PValue> expand() { ImmutableMap.Builder, PValue> builder = ImmutableMap., PValue>builder(); - if (failedPublish != null) { - builder.put(failedPublishTag, failedPublish); - } - if (successfulPublish != null) { builder.put(successfulPublishTag, successfulPublish); } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceWriteSessionsHandler.java 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceWriteSessionsHandler.java new file mode 100644 index 000000000000..109010231d17 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceWriteSessionsHandler.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.write; + +import static org.apache.beam.sdk.io.solace.SolaceIO.DEFAULT_WRITER_CLIENTS_PER_WORKER; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import com.google.auto.value.AutoValue; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode; +import org.apache.beam.sdk.io.solace.broker.SessionService; +import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; + +/** + * All the writer threads belonging to the same factory share the same instance of this class, to + * control for the number of clients that are connected to Solace, and minimize problems with quotas + * and limits. + * + *

This class maintains a map of all the sessions open in a worker, and controls the size of that + * map, to avoid creating more sessions than Solace could handle. + * + *

This class is thread-safe and creates a pool of producers per SessionServiceFactory. If there + * is only a Write transform in the pipeline, this is effectively a singleton. If there are more + * than one, each {@link SessionServiceFactory} instance keeps their own pool of producers. + */ +final class SolaceWriteSessionsHandler { + + private static final ConcurrentHashMap sessionsMap = + new ConcurrentHashMap<>(DEFAULT_WRITER_CLIENTS_PER_WORKER); + + public static SessionService getSessionServiceWithProducer( + int producerIndex, SessionServiceFactory sessionServiceFactory, UUID writerTransformUuid) { + SessionConfigurationIndex key = + SessionConfigurationIndex.builder() + .producerIndex(producerIndex) + .sessionServiceFactory(sessionServiceFactory) + .writerTransformUuid(writerTransformUuid) + .build(); + return sessionsMap.computeIfAbsent( + key, SolaceWriteSessionsHandler::createSessionAndStartProducer); + } + + private static SessionService createSessionAndStartProducer(SessionConfigurationIndex key) { + SessionServiceFactory factory = key.sessionServiceFactory(); + SessionService sessionService = factory.create(); + // Start the producer now that the initialization is locked for other threads + SubmissionMode mode = factory.getSubmissionMode(); + checkStateNotNull( + mode, + "SolaceIO.Write: Submission mode is not set. You need to set it to create write sessions."); + sessionService.getInitializedProducer(mode); + return sessionService; + } + + /** Disconnect all the sessions from Solace, and clear the corresponding state. 
*/ + public static void disconnectFromSolace( + SessionServiceFactory factory, int producersCardinality, UUID writerTransformUuid) { + for (int i = 0; i < producersCardinality; i++) { + SessionConfigurationIndex key = + SessionConfigurationIndex.builder() + .producerIndex(i) + .sessionServiceFactory(factory) + .writerTransformUuid(writerTransformUuid) + .build(); + + SessionService sessionService = sessionsMap.remove(key); + if (sessionService != null) { + sessionService.close(); + } + } + } + + @AutoValue + abstract static class SessionConfigurationIndex { + abstract int producerIndex(); + + abstract SessionServiceFactory sessionServiceFactory(); + + abstract UUID writerTransformUuid(); + + static Builder builder() { + return new AutoValue_SolaceWriteSessionsHandler_SessionConfigurationIndex.Builder(); + } + + @AutoValue.Builder + abstract static class Builder { + abstract Builder producerIndex(int producerIndex); + + abstract Builder sessionServiceFactory(SessionServiceFactory sessionServiceFactory); + + abstract Builder writerTransformUuid(UUID writerTransformUuid); + + abstract SessionConfigurationIndex build(); + } + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedBatchedSolaceWriter.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedBatchedSolaceWriter.java new file mode 100644 index 000000000000..dd4f81eeb082 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedBatchedSolaceWriter.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.write; + +import com.solacesystems.jcsmp.DeliveryMode; +import com.solacesystems.jcsmp.Destination; +import java.io.IOException; +import java.util.List; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode; +import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.io.solace.data.Solace.Record; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.KV; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This DoFn is responsible for writing to Solace in batch mode (holding up any messages), and + * for emitting the corresponding output (success or failure; only for persistent messages), so the + * SolaceIO.Write connector can be composed with other subsequent transforms in the pipeline. + * + *

The DoFn will create several JCSMP sessions per VM, and the sessions and producers will be + * reused across different threads (if the number of threads is higher than the number of sessions, + * which is probably the most common case). + * + *

The producer uses the JCSMP send multiple mode to publish a batch of messages together with a + * single API call. The acks from this publication are also processed in batch, and returned as the + * output of the DoFn. + * + *

The batch size is 50, and this is currently the maximum value supported by Solace. + * + *

There are no acks if the delivery mode is set to DIRECT. + * + *

This writer DoFn offers higher throughput than {@link UnboundedStreamingSolaceWriter} but also + * higher latency. + */ +@Internal +public final class UnboundedBatchedSolaceWriter extends UnboundedSolaceWriter { + + private static final Logger LOG = LoggerFactory.getLogger(UnboundedBatchedSolaceWriter.class); + + private static final int ACKS_FLUSHING_INTERVAL_SECS = 10; + + private final Counter sentToBroker = + Metrics.counter(UnboundedBatchedSolaceWriter.class, "msgs_sent_to_broker"); + + private final Counter batchesRejectedByBroker = + Metrics.counter(UnboundedSolaceWriter.class, "batches_rejected"); + + // State variables are never explicitly "used" + @SuppressWarnings("UnusedVariable") + @TimerId("bundle_flusher") + private final TimerSpec bundleFlusherTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + public UnboundedBatchedSolaceWriter( + SerializableFunction destinationFn, + SessionServiceFactory sessionServiceFactory, + DeliveryMode deliveryMode, + SubmissionMode submissionMode, + int producersMapCardinality, + boolean publishLatencyMetrics) { + super( + destinationFn, + sessionServiceFactory, + deliveryMode, + submissionMode, + producersMapCardinality, + publishLatencyMetrics); + } + + // The state variable is here just to force a shuffling with a certain cardinality + @ProcessElement + public void processElement( + @Element KV element, + @TimerId("bundle_flusher") Timer bundleFlusherTimer, + @Timestamp Instant timestamp) { + + setCurrentBundleTimestamp(timestamp); + + Solace.Record record = element.getValue(); + + if (record == null) { + LOG.error( + "SolaceIO.Write: Found null record with key {}. 
Ignoring record.", element.getKey()); + } else { + addToCurrentBundle(record); + // Extend timer for bundle flushing + bundleFlusherTimer + .offset(Duration.standardSeconds(ACKS_FLUSHING_INTERVAL_SECS)) + .setRelative(); + } + } + + @FinishBundle + public void finishBundle(FinishBundleContext context) throws IOException { + // Take messages in groups of 50 (if there are enough messages) + List currentBundle = getCurrentBundle(); + for (int i = 0; i < currentBundle.size(); i += SOLACE_BATCH_LIMIT) { + int toIndex = Math.min(i + SOLACE_BATCH_LIMIT, currentBundle.size()); + List batch = currentBundle.subList(i, toIndex); + if (batch.isEmpty()) { + continue; + } + publishBatch(batch); + } + getCurrentBundle().clear(); + + publishResults(BeamContextWrapper.of(context)); + } + + @OnTimer("bundle_flusher") + public void flushBundle(OnTimerContext context) throws IOException { + publishResults(BeamContextWrapper.of(context)); + } + + private void publishBatch(List records) { + try { + int entriesPublished = + solaceSessionServiceWithProducer() + .getInitializedProducer(getSubmissionMode()) + .publishBatch( + records, shouldPublishLatencyMetrics(), getDestinationFn(), getDeliveryMode()); + sentToBroker.inc(entriesPublished); + } catch (Exception e) { + batchesRejectedByBroker.inc(); + Solace.PublishResult errorPublish = + Solace.PublishResult.builder() + .setPublished(false) + .setMessageId(String.format("BATCH_OF_%d_ENTRIES", records.size())) + .setError( + String.format( + "Batch could not be published after several" + " retries. 
Error: %s", + e.getMessage())) + .setLatencyNanos(System.nanoTime()) + .build(); + solaceSessionServiceWithProducer().getPublishedResultsQueue().add(errorPublish); + } + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedSolaceWriter.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedSolaceWriter.java new file mode 100644 index 000000000000..1c98113c2416 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedSolaceWriter.java @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.solace.write; + +import static org.apache.beam.sdk.io.solace.SolaceIO.Write.FAILED_PUBLISH_TAG; +import static org.apache.beam.sdk.io.solace.SolaceIO.Write.SUCCESSFUL_PUBLISH_TAG; + +import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.DeliveryMode; +import com.solacesystems.jcsmp.Destination; +import com.solacesystems.jcsmp.JCSMPFactory; +import com.solacesystems.jcsmp.JCSMPSendMultipleEntry; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.solace.SolaceIO; +import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode; +import org.apache.beam.sdk.io.solace.broker.SessionService; +import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.io.solace.data.Solace.PublishResult; +import org.apache.beam.sdk.io.solace.data.Solace.Record; +import org.apache.beam.sdk.metrics.Distribution; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This DoFn encapsulates common code used both for the {@link UnboundedBatchedSolaceWriter} and + * {@link 
UnboundedStreamingSolaceWriter}. + */ +@Internal +public abstract class UnboundedSolaceWriter + extends DoFn, Solace.PublishResult> { + + private static final Logger LOG = LoggerFactory.getLogger(UnboundedSolaceWriter.class); + + // This is the batch limit supported by the send multiple JCSMP API method. + static final int SOLACE_BATCH_LIMIT = 50; + private final Distribution latencyPublish = + Metrics.distribution(SolaceIO.Write.class, "latency_publish_ms"); + + private final Distribution latencyErrors = + Metrics.distribution(SolaceIO.Write.class, "latency_failed_ms"); + + private final SerializableFunction destinationFn; + + private final SessionServiceFactory sessionServiceFactory; + private final DeliveryMode deliveryMode; + private final SubmissionMode submissionMode; + private final int producersMapCardinality; + private final boolean publishLatencyMetrics; + private static final AtomicInteger bundleProducerIndexCounter = new AtomicInteger(); + private int currentBundleProducerIndex = 0; + + private final List batchToEmit; + + private @Nullable Instant bundleTimestamp; + + final UUID writerTransformUuid = UUID.randomUUID(); + + public UnboundedSolaceWriter( + SerializableFunction destinationFn, + SessionServiceFactory sessionServiceFactory, + DeliveryMode deliveryMode, + SubmissionMode submissionMode, + int producersMapCardinality, + boolean publishLatencyMetrics) { + this.destinationFn = destinationFn; + this.sessionServiceFactory = sessionServiceFactory; + // Make sure that we set the submission mode now that we know which mode has been set by the + // user. 
+ this.sessionServiceFactory.setSubmissionMode(submissionMode); + this.deliveryMode = deliveryMode; + this.submissionMode = submissionMode; + this.producersMapCardinality = producersMapCardinality; + this.publishLatencyMetrics = publishLatencyMetrics; + this.batchToEmit = new ArrayList<>(); + } + + @Teardown + public void teardown() { + SolaceWriteSessionsHandler.disconnectFromSolace( + sessionServiceFactory, producersMapCardinality, writerTransformUuid); + } + + public void updateProducerIndex() { + currentBundleProducerIndex = + bundleProducerIndexCounter.getAndIncrement() % producersMapCardinality; + } + + @StartBundle + public void startBundle() { + // Pick the next producer in round-robin order for this bundle, reuse for the whole bundle + updateProducerIndex(); + batchToEmit.clear(); + } + + public SessionService solaceSessionServiceWithProducer() { + return SolaceWriteSessionsHandler.getSessionServiceWithProducer( + currentBundleProducerIndex, sessionServiceFactory, writerTransformUuid); + } + + public void publishResults(BeamContextWrapper context) { + long sumPublish = 0; + long countPublish = 0; + long minPublish = Long.MAX_VALUE; + long maxPublish = 0; + + long sumFailed = 0; + long countFailed = 0; + long minFailed = Long.MAX_VALUE; + long maxFailed = 0; + + Queue<Solace.PublishResult> publishResultsQueue = + solaceSessionServiceWithProducer().getPublishedResultsQueue(); + Solace.PublishResult result = publishResultsQueue.poll(); + + if (result != null) { + if (getCurrentBundleTimestamp() == null) { + setCurrentBundleTimestamp(Instant.now()); + } + } + + while (result != null) { + Long latency = result.getLatencyNanos(); + + if (latency == null && shouldPublishLatencyMetrics()) { + LOG.error( + "SolaceIO.Write: Latency is null but user asked for latency metrics."
+ + " This may be a bug."); + } + + if (latency != null) { + if (result.getPublished()) { + sumPublish += latency; + countPublish++; + minPublish = Math.min(minPublish, latency); + maxPublish = Math.max(maxPublish, latency); + } else { + sumFailed += latency; + countFailed++; + minFailed = Math.min(minFailed, latency); + maxFailed = Math.max(maxFailed, latency); + } + } + if (result.getPublished()) { + context.output( + SUCCESSFUL_PUBLISH_TAG, result, getCurrentBundleTimestamp(), GlobalWindow.INSTANCE); + } else { + try { + BadRecord b = + BadRecord.fromExceptionInformation( + result, + null, + null, + Optional.ofNullable(result.getError()).orElse("SolaceIO.Write: unknown error.")); + context.output(FAILED_PUBLISH_TAG, b, getCurrentBundleTimestamp(), GlobalWindow.INSTANCE); + } catch (IOException e) { + // ignore, the exception is thrown when the exception argument in the + // `BadRecord.fromExceptionInformation` is not null. + } + } + + result = publishResultsQueue.poll(); + } + + if (shouldPublishLatencyMetrics()) { + // Report all latency value in milliseconds + if (countPublish > 0) { + getPublishLatencyMetric() + .update( + TimeUnit.NANOSECONDS.toMillis(sumPublish), + countPublish, + TimeUnit.NANOSECONDS.toMillis(minPublish), + TimeUnit.NANOSECONDS.toMillis(maxPublish)); + } + + if (countFailed > 0) { + getFailedLatencyMetric() + .update( + TimeUnit.NANOSECONDS.toMillis(sumFailed), + countFailed, + TimeUnit.NANOSECONDS.toMillis(minFailed), + TimeUnit.NANOSECONDS.toMillis(maxFailed)); + } + } + } + + public BytesXMLMessage createSingleMessage( + Solace.Record record, boolean useCorrelationKeyLatency) { + JCSMPFactory jcsmpFactory = JCSMPFactory.onlyInstance(); + BytesXMLMessage msg = jcsmpFactory.createBytesXMLMessage(); + byte[] payload = record.getPayload(); + msg.writeBytes(payload); + + Long senderTimestamp = record.getSenderTimestamp(); + if (senderTimestamp == null) { + LOG.error( + "SolaceIO.Write: Record with id {} has no sender timestamp. 
Using current" + + " worker clock as timestamp.", + record.getMessageId()); + senderTimestamp = System.currentTimeMillis(); + } + msg.setSenderTimestamp(senderTimestamp); + msg.setDeliveryMode(getDeliveryMode()); + if (useCorrelationKeyLatency) { + Solace.CorrelationKey key = + Solace.CorrelationKey.builder() + .setMessageId(record.getMessageId()) + .setPublishMonotonicNanos(System.nanoTime()) + .build(); + msg.setCorrelationKey(key); + } else { + // Use only a string as correlation key + msg.setCorrelationKey(record.getMessageId()); + } + msg.setApplicationMessageId(record.getMessageId()); + return msg; + } + + public JCSMPSendMultipleEntry[] createMessagesArray( + Iterable<Solace.Record> records, boolean useCorrelationKeyLatency) { + // Solace batch publishing only supports 50 elements max, so it is safe to convert to + // list here + ArrayList<Solace.Record> recordsList = Lists.newArrayList(records); + if (recordsList.size() > SOLACE_BATCH_LIMIT) { + LOG.error( + "SolaceIO.Write: Trying to create a batch of {}, but Solace supports a" + + " maximum of {}.
The batch will likely be rejected by Solace.", + recordsList.size(), + SOLACE_BATCH_LIMIT); + } + + JCSMPSendMultipleEntry[] entries = new JCSMPSendMultipleEntry[recordsList.size()]; + for (int i = 0; i < recordsList.size(); i++) { + Solace.Record record = recordsList.get(i); + JCSMPSendMultipleEntry entry = + JCSMPFactory.onlyInstance() + .createSendMultipleEntry( + createSingleMessage(record, useCorrelationKeyLatency), + getDestinationFn().apply(record)); + entries[i] = entry; + } + + return entries; + } + + public int getProducersMapCardinality() { + return producersMapCardinality; + } + + public Distribution getPublishLatencyMetric() { + return latencyPublish; + } + + public Distribution getFailedLatencyMetric() { + return latencyErrors; + } + + public boolean shouldPublishLatencyMetrics() { + return publishLatencyMetrics; + } + + public SerializableFunction<Solace.Record, Destination> getDestinationFn() { + return destinationFn; + } + + public DeliveryMode getDeliveryMode() { + return deliveryMode; + } + + public SubmissionMode getSubmissionMode() { + return submissionMode; + } + + public void addToCurrentBundle(Solace.Record record) { + batchToEmit.add(record); + } + + public List<Solace.Record> getCurrentBundle() { + return batchToEmit; + } + + public @Nullable Instant getCurrentBundleTimestamp() { + return bundleTimestamp; + } + + public void setCurrentBundleTimestamp(Instant bundleTimestamp) { + if (this.bundleTimestamp == null || bundleTimestamp.isBefore(this.bundleTimestamp)) { + this.bundleTimestamp = bundleTimestamp; + } + } + + /** + * Since we need to publish from on-timer methods as well as finish-bundle methods, we need a consistent + * way to handle both WindowedContext and FinishBundleContext.
+ */ + static class BeamContextWrapper { + private @Nullable WindowedContext windowedContext; + private @Nullable FinishBundleContext finishBundleContext; + + private BeamContextWrapper() {} + + public static BeamContextWrapper of(WindowedContext windowedContext) { + BeamContextWrapper beamContextWrapper = new BeamContextWrapper(); + beamContextWrapper.windowedContext = windowedContext; + return beamContextWrapper; + } + + public static BeamContextWrapper of(FinishBundleContext finishBundleContext) { + BeamContextWrapper beamContextWrapper = new BeamContextWrapper(); + beamContextWrapper.finishBundleContext = finishBundleContext; + return beamContextWrapper; + } + + public <T> void output( + TupleTag<T> tag, + T output, + @Nullable Instant timestamp, // Not required for windowed context + @Nullable BoundedWindow window) { // Not required for windowed context + if (windowedContext != null) { + windowedContext.output(tag, output); + } else if (finishBundleContext != null) { + if (timestamp == null) { + throw new IllegalStateException( + "SolaceIO.Write.UnboundedSolaceWriter.Context: Timestamp is required for a" + + " FinishBundleContext."); + } + if (window == null) { + throw new IllegalStateException( + "SolaceIO.Write.UnboundedSolaceWriter.Context: BoundedWindow is required for a" + + " FinishBundleContext."); + } + finishBundleContext.output(tag, output, timestamp, window); + } else { + throw new IllegalStateException( + "SolaceIO.Write.UnboundedSolaceWriter.Context: No context provided"); + } + } + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedStreamingSolaceWriter.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedStreamingSolaceWriter.java new file mode 100644 index 000000000000..6d6d0b27e2bb --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedStreamingSolaceWriter.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.write; + +import com.solacesystems.jcsmp.DeliveryMode; +import com.solacesystems.jcsmp.Destination; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.solace.SolaceIO; +import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.KV; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This DoFn is responsible for writing to Solace in streaming mode (one message at a time, not + * holding up any message), and for emitting the corresponding output (success or fail; only for + * persistent messages), so the SolaceIO.Write connector can be composed with other subsequent + * transforms in the pipeline. + * + * <p>
The DoFn will create several JCSMP sessions per VM, and the sessions and producers will be + * reused across different threads (if the number of threads is higher than the number of sessions, + * which is probably the most common case). + * + * <p>
The producer uses the JCSMP streaming mode to publish a single message at a time, processing + * the acks from this publication, and returning them as output of the DoFn. + * + * <p>
There are no acks if the delivery mode is set to DIRECT. + * + * <p>
This writer DoFn offers lower latency and lower throughput than {@link + * UnboundedBatchedSolaceWriter}. + */ +@Internal +public final class UnboundedStreamingSolaceWriter extends UnboundedSolaceWriter { + + private static final Logger LOG = LoggerFactory.getLogger(UnboundedStreamingSolaceWriter.class); + + private final Counter sentToBroker = + Metrics.counter(UnboundedStreamingSolaceWriter.class, "msgs_sent_to_broker"); + + private final Counter rejectedByBroker = + Metrics.counter(UnboundedStreamingSolaceWriter.class, "msgs_rejected_by_broker"); + + // We use a state variable to force a shuffling and ensure the cardinality of the processing + @SuppressWarnings("UnusedVariable") + @StateId("current_key") + private final StateSpec<ValueState<Integer>> currentKeySpec = StateSpecs.value(); + + public UnboundedStreamingSolaceWriter( + SerializableFunction<Solace.Record, Destination> destinationFn, + SessionServiceFactory sessionServiceFactory, + DeliveryMode deliveryMode, + SolaceIO.SubmissionMode submissionMode, + int producersMapCardinality, + boolean publishLatencyMetrics) { + super( + destinationFn, + sessionServiceFactory, + deliveryMode, + submissionMode, + producersMapCardinality, + publishLatencyMetrics); + } + + @ProcessElement + public void processElement( + @Element KV<Integer, Solace.Record> element, + @Timestamp Instant timestamp, + @AlwaysFetched @StateId("current_key") ValueState<Integer> currentKeyState) { + + setCurrentBundleTimestamp(timestamp); + + Integer currentKey = currentKeyState.read(); + Integer elementKey = element.getKey(); + Solace.Record record = element.getValue(); + + if (currentKey == null || !currentKey.equals(elementKey)) { + currentKeyState.write(elementKey); + } + + if (record == null) { + LOG.error("SolaceIO.Write: Found null record with key {}.
Ignoring record.", elementKey); + return; + } + + // The publish method will retry, let's send a failure message if all the retries fail + try { + solaceSessionServiceWithProducer() + .getInitializedProducer(getSubmissionMode()) + .publishSingleMessage( + record, + getDestinationFn().apply(record), + shouldPublishLatencyMetrics(), + getDeliveryMode()); + sentToBroker.inc(); + } catch (Exception e) { + rejectedByBroker.inc(); + Solace.PublishResult errorPublish = + Solace.PublishResult.builder() + .setPublished(false) + .setMessageId(record.getMessageId()) + .setError( + String.format( + "Message could not be published after several" + " retries. Error: %s", + e.getMessage())) + .setLatencyNanos(System.nanoTime()) + .build(); + solaceSessionServiceWithProducer().getPublishedResultsQueue().add(errorPublish); + } + } + + @FinishBundle + public void finishBundle(FinishBundleContext context) { + publishResults(BeamContextWrapper.of(context)); + } +} diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java index ec0ae7194686..38b4953a5984 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java @@ -17,14 +17,24 @@ */ package org.apache.beam.sdk.io.solace; +import com.google.auto.value.AutoValue; import com.solacesystems.jcsmp.JCSMPProperties; +import java.util.Queue; +import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode; +import org.apache.beam.sdk.io.solace.broker.MessageProducer; import org.apache.beam.sdk.io.solace.broker.MessageReceiver; import org.apache.beam.sdk.io.solace.broker.SessionService; +import org.apache.beam.sdk.io.solace.data.Solace.PublishResult; -public class MockEmptySessionService extends SessionService { +@AutoValue +public abstract class 
MockEmptySessionService extends SessionService { String exceptionMessage = "This is an empty client, use a MockSessionService instead."; + public static MockEmptySessionService create() { + return new AutoValue_MockEmptySessionService(); + } + @Override public void close() { throw new UnsupportedOperationException(exceptionMessage); @@ -36,7 +46,17 @@ public boolean isClosed() { } @Override - public MessageReceiver createReceiver() { + public MessageReceiver getReceiver() { + throw new UnsupportedOperationException(exceptionMessage); + } + + @Override + public MessageProducer getInitializedProducer(SubmissionMode mode) { + throw new UnsupportedOperationException(exceptionMessage); + } + + @Override + public Queue getPublishedResultsQueue() { throw new UnsupportedOperationException(exceptionMessage); } diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockProducer.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockProducer.java new file mode 100644 index 000000000000..271310359577 --- /dev/null +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockProducer.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.solace; + +import com.solacesystems.jcsmp.DeliveryMode; +import com.solacesystems.jcsmp.Destination; +import com.solacesystems.jcsmp.JCSMPException; +import java.time.Instant; +import java.util.List; +import org.apache.beam.sdk.io.solace.broker.MessageProducer; +import org.apache.beam.sdk.io.solace.broker.PublishResultHandler; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.io.solace.data.Solace.Record; +import org.apache.beam.sdk.transforms.SerializableFunction; + +public abstract class MockProducer implements MessageProducer { + final PublishResultHandler handler; + + public MockProducer(PublishResultHandler handler) { + this.handler = handler; + } + + @Override + public int publishBatch( + List<Record> records, + boolean useCorrelationKeyLatency, + SerializableFunction<Record, Destination> destinationFn, + DeliveryMode deliveryMode) { + for (Record record : records) { + this.publishSingleMessage( + record, destinationFn.apply(record), useCorrelationKeyLatency, deliveryMode); + } + return records.size(); + } + + @Override + public boolean isClosed() { + return false; + } + + @Override + public void close() {} + + public static class MockSuccessProducer extends MockProducer { + public MockSuccessProducer(PublishResultHandler handler) { + super(handler); + } + + @Override + public void publishSingleMessage( + Record msg, + Destination topicOrQueue, + boolean useCorrelationKeyLatency, + DeliveryMode deliveryMode) { + if (useCorrelationKeyLatency) { + handler.responseReceivedEx( + Solace.PublishResult.builder() + .setPublished(true) + .setMessageId(msg.getMessageId()) + .build()); + } else { + handler.responseReceivedEx(msg.getMessageId()); + } + } + } + + public static class MockFailedProducer extends MockProducer { + public MockFailedProducer(PublishResultHandler handler) { + super(handler); + } + + @Override + public void publishSingleMessage( + Record msg, + Destination topicOrQueue, + boolean useCorrelationKeyLatency, +
DeliveryMode deliveryMode) { + if (useCorrelationKeyLatency) { + handler.handleErrorEx( + Solace.PublishResult.builder() + .setPublished(false) + .setMessageId(msg.getMessageId()) + .setError("Some error") + .build(), + new JCSMPException("Some JCSMPException"), + Instant.now().toEpochMilli()); + } else { + handler.handleErrorEx( + msg.getMessageId(), + new JCSMPException("Some JCSMPException"), + Instant.now().toEpochMilli()); + } + } + } +} diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java index a4d6a42ef302..bd52dee7ea86 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java @@ -17,38 +17,63 @@ */ package org.apache.beam.sdk.io.solace; +import com.google.auto.value.AutoValue; import com.solacesystems.jcsmp.BytesXMLMessage; import com.solacesystems.jcsmp.JCSMPProperties; import java.io.IOException; -import java.io.Serializable; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import org.apache.beam.sdk.io.solace.MockProducer.MockSuccessProducer; import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode; +import org.apache.beam.sdk.io.solace.broker.MessageProducer; import org.apache.beam.sdk.io.solace.broker.MessageReceiver; +import org.apache.beam.sdk.io.solace.broker.PublishResultHandler; import org.apache.beam.sdk.io.solace.broker.SessionService; +import org.apache.beam.sdk.io.solace.data.Solace.PublishResult; import org.apache.beam.sdk.transforms.SerializableFunction; import org.checkerframework.checker.nullness.qual.Nullable; -public class MockSessionService extends SessionService { +@AutoValue +public abstract class MockSessionService extends SessionService { + 
public static int ackWindowSizeForTesting = 87; + public static boolean callbackOnReactor = true; - private final SerializableFunction getRecordFn; - private MessageReceiver messageReceiver = null; - private final int minMessagesReceived; - private final @Nullable SubmissionMode mode; - - public MockSessionService( - SerializableFunction getRecordFn, - int minMessagesReceived, - @Nullable SubmissionMode mode) { - this.getRecordFn = getRecordFn; - this.minMessagesReceived = minMessagesReceived; - this.mode = mode; + public abstract @Nullable SerializableFunction recordFn(); + + public abstract int minMessagesReceived(); + + public abstract @Nullable SubmissionMode mode(); + + public abstract Function mockProducerFn(); + + private final Queue publishedResultsReceiver = new ConcurrentLinkedQueue<>(); + + public static Builder builder() { + return new AutoValue_MockSessionService.Builder() + .minMessagesReceived(0) + .mockProducerFn(MockSuccessProducer::new); } - public MockSessionService( - SerializableFunction getRecordFn, int minMessagesReceived) { - this(getRecordFn, minMessagesReceived, null); + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder recordFn( + @Nullable SerializableFunction recordFn); + + public abstract Builder minMessagesReceived(int minMessagesReceived); + + public abstract Builder mode(@Nullable SubmissionMode mode); + + public abstract Builder mockProducerFn( + Function mockProducerFn); + + public abstract MockSessionService build(); } + private MessageReceiver messageReceiver = null; + private MockProducer messageProducer = null; + @Override public void close() {} @@ -58,17 +83,41 @@ public boolean isClosed() { } @Override - public MessageReceiver createReceiver() { + public MessageReceiver getReceiver() { if (messageReceiver == null) { - messageReceiver = new MockReceiver(getRecordFn, minMessagesReceived); + messageReceiver = new MockReceiver(recordFn(), minMessagesReceived()); } return messageReceiver; } + 
@Override + public MessageProducer getInitializedProducer(SubmissionMode mode) { + if (messageProducer == null) { + messageProducer = mockProducerFn().apply(new PublishResultHandler(publishedResultsReceiver)); + } + return messageProducer; + } + + @Override + public Queue getPublishedResultsQueue() { + return publishedResultsReceiver; + } + @Override public void connect() {} - public static class MockReceiver implements MessageReceiver, Serializable { + @Override + public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProperties) { + // Let's override some properties that will be overriden by the connector + // Opposite of the mode, to test that is overriden + baseProperties.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, callbackOnReactor); + + baseProperties.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, ackWindowSizeForTesting); + + return baseProperties; + } + + public static class MockReceiver implements MessageReceiver { private final AtomicInteger counter = new AtomicInteger(); private final SerializableFunction getRecordFn; private final int minMessagesReceived; @@ -100,16 +149,4 @@ public boolean isEOF() { return counter.get() >= minMessagesReceived; } } - - @Override - public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProperties) { - // Let's override some properties that will be overriden by the connector - // Opposite of the mode, to test that is overriden - baseProperties.setProperty( - JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, mode == SubmissionMode.HIGHER_THROUGHPUT); - - baseProperties.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, 87); - - return baseProperties; - } } diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionServiceFactory.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionServiceFactory.java index 603a30ad2c90..9c17ca604201 100644 --- 
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionServiceFactory.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionServiceFactory.java @@ -17,22 +17,78 @@ */ package org.apache.beam.sdk.io.solace; +import com.google.auto.value.AutoValue; +import com.solacesystems.jcsmp.BytesXMLMessage; +import org.apache.beam.sdk.io.solace.MockProducer.MockFailedProducer; +import org.apache.beam.sdk.io.solace.MockProducer.MockSuccessProducer; +import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode; import org.apache.beam.sdk.io.solace.broker.SessionService; import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.checkerframework.checker.nullness.qual.Nullable; -public class MockSessionServiceFactory extends SessionServiceFactory { - SessionService sessionService; +@AutoValue +public abstract class MockSessionServiceFactory extends SessionServiceFactory { + public abstract @Nullable SubmissionMode mode(); - public MockSessionServiceFactory(SessionService clientService) { - this.sessionService = clientService; + public abstract @Nullable SerializableFunction recordFn(); + + public abstract int minMessagesReceived(); + + public abstract SessionServiceType sessionServiceType(); + + public static Builder builder() { + return new AutoValue_MockSessionServiceFactory.Builder() + .minMessagesReceived(0) + .sessionServiceType(SessionServiceType.WITH_SUCCEEDING_PRODUCER); } public static SessionServiceFactory getDefaultMock() { - return new MockSessionServiceFactory(new MockEmptySessionService()); + return MockSessionServiceFactory.builder().build(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder mode(@Nullable SubmissionMode mode); + + public abstract Builder recordFn( + @Nullable SerializableFunction recordFn); + + public abstract Builder minMessagesReceived(int minMessagesReceived); + + public 
abstract Builder sessionServiceType(SessionServiceType sessionServiceType); + + public abstract MockSessionServiceFactory build(); } @Override public SessionService create() { - return sessionService; + switch (sessionServiceType()) { + case EMPTY: + return MockEmptySessionService.create(); + case WITH_SUCCEEDING_PRODUCER: + return MockSessionService.builder() + .recordFn(recordFn()) + .minMessagesReceived(minMessagesReceived()) + .mode(mode()) + .mockProducerFn(MockSuccessProducer::new) + .build(); + case WITH_FAILING_PRODUCER: + return MockSessionService.builder() + .recordFn(recordFn()) + .minMessagesReceived(minMessagesReceived()) + .mode(mode()) + .mockProducerFn(MockFailedProducer::new) + .build(); + default: + throw new RuntimeException( + String.format("Unknown sessionServiceType: %s", sessionServiceType().name())); + } + } + + public enum SessionServiceType { + EMPTY, + WITH_SUCCEEDING_PRODUCER, + WITH_FAILING_PRODUCER } } diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java similarity index 72% rename from sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOTest.java rename to sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java index cc1fa1d667aa..c718c55e1b48 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOTest.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java @@ -31,10 +31,12 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; +import org.apache.beam.sdk.io.solace.MockSessionServiceFactory.SessionServiceType; 
import org.apache.beam.sdk.io.solace.SolaceIO.Read; import org.apache.beam.sdk.io.solace.SolaceIO.Read.Configuration; import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; @@ -49,6 +51,7 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; @@ -61,7 +64,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public class SolaceIOTest { +public class SolaceIOReadTest { @Rule public final transient TestPipeline pipeline = TestPipeline.create(); @@ -69,7 +72,6 @@ private Read getDefaultRead() { return SolaceIO.read() .from(Solace.Queue.fromName("queue")) .withSempClientFactory(MockSempClientFactory.getDefaultMock()) - .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock()) .withMaxNumConnections(1); } @@ -77,7 +79,6 @@ private Read getDefaultReadForTopic() { return SolaceIO.read() .from(Solace.Topic.fromName("topic")) .withSempClientFactory(MockSempClientFactory.getDefaultMock()) - .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock()) .withMaxNumConnections(1); } @@ -102,20 +103,18 @@ private static UnboundedSolaceSource getSource(Read spec, TestPi @Test public void testReadMessages() { // Broker that creates input data - MockSessionService mockClientService = - new MockSessionService( - index -> { - List messages = - ImmutableList.of( - SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), - SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), - SolaceDataUtils.getBytesXmlMessage("payload_test2", "452")); - return getOrNull(index, messages); - }, - 3); + SerializableFunction recordFn = + index -> { + List messages = + ImmutableList.of( + SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), + 
SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), + SolaceDataUtils.getBytesXmlMessage("payload_test2", "452")); + return getOrNull(index, messages); + }; SessionServiceFactory fakeSessionServiceFactory = - new MockSessionServiceFactory(mockClientService); + MockSessionServiceFactory.builder().minMessagesReceived(3).recordFn(recordFn).build(); // Expected data List expected = new ArrayList<>(); @@ -137,20 +136,18 @@ public void testReadMessages() { @Test public void testReadMessagesWithDeduplication() { // Broker that creates input data - MockSessionService mockClientService = - new MockSessionService( - index -> { - List messages = - ImmutableList.of( - SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), - SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), - SolaceDataUtils.getBytesXmlMessage("payload_test2", "451")); - return getOrNull(index, messages); - }, - 3); + SerializableFunction recordFn = + index -> { + List messages = + ImmutableList.of( + SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), + SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), + SolaceDataUtils.getBytesXmlMessage("payload_test2", "451")); + return getOrNull(index, messages); + }; SessionServiceFactory fakeSessionServiceFactory = - new MockSessionServiceFactory(mockClientService); + MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(3).build(); // Expected data List expected = new ArrayList<>(); @@ -172,19 +169,18 @@ public void testReadMessagesWithDeduplication() { @Test public void testReadMessagesWithoutDeduplication() { // Broker that creates input data - MockSessionService mockClientService = - new MockSessionService( - index -> { - List messages = - ImmutableList.of( - SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), - SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), - SolaceDataUtils.getBytesXmlMessage("payload_test2", "451")); - return getOrNull(index, messages); - }, - 3); + 
SerializableFunction recordFn = + index -> { + List messages = + ImmutableList.of( + SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), + SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), + SolaceDataUtils.getBytesXmlMessage("payload_test2", "451")); + return getOrNull(index, messages); + }; + SessionServiceFactory fakeSessionServiceFactory = - new MockSessionServiceFactory(mockClientService); + MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(3).build(); // Expected data List expected = new ArrayList<>(); @@ -206,32 +202,38 @@ public void testReadMessagesWithoutDeduplication() { @Test public void testReadMessagesWithDeduplicationOnReplicationGroupMessageId() { // Broker that creates input data - MockSessionService mockClientService = - new MockSessionService( - index -> { - List messages = - ImmutableList.of( - SolaceDataUtils.getBytesXmlMessage( - "payload_test0", null, null, new ReplicationGroupMessageIdImpl(2L, 1L)), - SolaceDataUtils.getBytesXmlMessage( - "payload_test1", null, null, new ReplicationGroupMessageIdImpl(2L, 2L)), - SolaceDataUtils.getBytesXmlMessage( - "payload_test2", null, null, new ReplicationGroupMessageIdImpl(2L, 2L))); - return getOrNull(index, messages); - }, - 3); + + String id0 = UUID.randomUUID().toString(); + String id1 = UUID.randomUUID().toString(); + String id2 = UUID.randomUUID().toString(); + + SerializableFunction recordFn = + index -> { + List messages = + ImmutableList.of( + SolaceDataUtils.getBytesXmlMessage( + "payload_test0", id0, null, new ReplicationGroupMessageIdImpl(2L, 1L)), + SolaceDataUtils.getBytesXmlMessage( + "payload_test1", id1, null, new ReplicationGroupMessageIdImpl(2L, 2L)), + SolaceDataUtils.getBytesXmlMessage( + "payload_test2", id2, null, new ReplicationGroupMessageIdImpl(2L, 2L))); + return getOrNull(index, messages); + }; SessionServiceFactory fakeSessionServiceFactory = - new MockSessionServiceFactory(mockClientService); + 
MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(3).build(); // Expected data List expected = new ArrayList<>(); expected.add( SolaceDataUtils.getSolaceRecord( - "payload_test0", null, new ReplicationGroupMessageIdImpl(2L, 1L))); + "payload_test0", id0, new ReplicationGroupMessageIdImpl(2L, 1L))); + expected.add( + SolaceDataUtils.getSolaceRecord( + "payload_test1", id1, new ReplicationGroupMessageIdImpl(2L, 2L))); expected.add( SolaceDataUtils.getSolaceRecord( - "payload_test1", null, new ReplicationGroupMessageIdImpl(2L, 2L))); + "payload_test2", id2, new ReplicationGroupMessageIdImpl(2L, 2L))); // Run the pipeline PCollection events = @@ -248,19 +250,18 @@ public void testReadMessagesWithDeduplicationOnReplicationGroupMessageId() { @Test public void testReadWithCoderAndParseFnAndTimestampFn() { // Broker that creates input data - MockSessionService mockClientService = - new MockSessionService( - index -> { - List messages = - ImmutableList.of( - SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), - SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), - SolaceDataUtils.getBytesXmlMessage("payload_test2", "452")); - return getOrNull(index, messages); - }, - 3); + SerializableFunction recordFn = + index -> { + List messages = + ImmutableList.of( + SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), + SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), + SolaceDataUtils.getBytesXmlMessage("payload_test2", "452")); + return getOrNull(index, messages); + }; + SessionServiceFactory fakeSessionServiceFactory = - new MockSessionServiceFactory(mockClientService); + MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(3).build(); // Expected data List expected = new ArrayList<>(); @@ -304,7 +305,10 @@ public void testSplitsForExclusiveQueue() throws Exception { SolaceIO.read() .from(Solace.Queue.fromName("queue")) .withSempClientFactory(new MockSempClientFactory(mockSempClient)) - 
.withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock()); + .withSessionServiceFactory( + MockSessionServiceFactory.builder() + .sessionServiceType(SessionServiceType.EMPTY) + .build()); int desiredNumSplits = 5; @@ -316,7 +320,10 @@ public void testSplitsForExclusiveQueue() throws Exception { @Test public void testSplitsForNonExclusiveQueueWithMaxNumConnections() throws Exception { - Read spec = getDefaultRead().withMaxNumConnections(3); + Read spec = + getDefaultRead() + .withMaxNumConnections(3) + .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock()); int desiredNumSplits = 5; @@ -328,7 +335,10 @@ public void testSplitsForNonExclusiveQueueWithMaxNumConnections() throws Excepti @Test public void testSplitsForNonExclusiveQueueWithMaxNumConnectionsRespectDesired() throws Exception { - Read spec = getDefaultRead().withMaxNumConnections(10); + Read spec = + getDefaultRead() + .withMaxNumConnections(10) + .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock()); int desiredNumSplits = 5; UnboundedSolaceSource initialSource = getSource(spec, pipeline); @@ -346,7 +356,9 @@ public void testCreateQueueForTopic() { .build(); Read spec = - getDefaultReadForTopic().withSempClientFactory(new MockSempClientFactory(mockSempClient)); + getDefaultReadForTopic() + .withSempClientFactory(new MockSempClientFactory(mockSempClient)) + .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock()); spec.expand(PBegin.in(TestPipeline.create())); // check if createQueueForTopic was executed assertEquals(1, createQueueForTopicFnCounter.get()); @@ -358,22 +370,22 @@ public void testCheckpointMark() throws Exception { AtomicInteger countAckMessages = new AtomicInteger(0); // Broker that creates input data - MockSessionService mockClientService = - new MockSessionService( - index -> { - List messages = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - messages.add( - SolaceDataUtils.getBytesXmlMessage( - "payload_test" + i, "45" + i, 
(num) -> countAckMessages.incrementAndGet())); - } - countConsumedMessages.incrementAndGet(); - return getOrNull(index, messages); - }, - 10); + + SerializableFunction recordFn = + index -> { + List messages = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + messages.add( + SolaceDataUtils.getBytesXmlMessage( + "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet())); + } + countConsumedMessages.incrementAndGet(); + return getOrNull(index, messages); + }; SessionServiceFactory fakeSessionServiceFactory = - new MockSessionServiceFactory(mockClientService); + MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build(); + Read spec = getDefaultRead().withSessionServiceFactory(fakeSessionServiceFactory); UnboundedSolaceSource initialSource = getSource(spec, pipeline); @@ -407,21 +419,20 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception { AtomicInteger countAckMessages = new AtomicInteger(0); // Broker that creates input data - MockSessionService mockClientService = - new MockSessionService( - index -> { - List messages = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - messages.add( - SolaceDataUtils.getBytesXmlMessage( - "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet())); - } - countConsumedMessages.incrementAndGet(); - return getOrNull(index, messages); - }, - 10); + SerializableFunction recordFn = + index -> { + List messages = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + messages.add( + SolaceDataUtils.getBytesXmlMessage( + "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet())); + } + countConsumedMessages.incrementAndGet(); + return getOrNull(index, messages); + }; + SessionServiceFactory fakeSessionServiceFactory = - new MockSessionServiceFactory(mockClientService); + MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build(); Read spec = getDefaultRead() @@ -467,22 +478,21 @@ public void 
testCheckpointMarkSafety() throws Exception { AtomicInteger countAckMessages = new AtomicInteger(0); // Broker that creates input data - MockSessionService mockClientService = - new MockSessionService( - index -> { - List messages = new ArrayList<>(); - for (int i = 0; i < messagesToProcess; i++) { - messages.add( - SolaceDataUtils.getBytesXmlMessage( - "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet())); - } - countConsumedMessages.incrementAndGet(); - return getOrNull(index, messages); - }, - 10); + SerializableFunction recordFn = + index -> { + List messages = new ArrayList<>(); + for (int i = 0; i < messagesToProcess; i++) { + messages.add( + SolaceDataUtils.getBytesXmlMessage( + "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet())); + } + countConsumedMessages.incrementAndGet(); + return getOrNull(index, messages); + }; SessionServiceFactory fakeSessionServiceFactory = - new MockSessionServiceFactory(mockClientService); + MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build(); + Read spec = getDefaultRead() .withSessionServiceFactory(fakeSessionServiceFactory) @@ -558,20 +568,18 @@ public void testDestinationTopicQueueCreation() { @Test public void testTopicEncoding() { - MockSessionService mockClientService = - new MockSessionService( - index -> { - List messages = - ImmutableList.of( - SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), - SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), - SolaceDataUtils.getBytesXmlMessage("payload_test2", "452")); - return getOrNull(index, messages); - }, - 3); + SerializableFunction recordFn = + index -> { + List messages = + ImmutableList.of( + SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), + SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), + SolaceDataUtils.getBytesXmlMessage("payload_test2", "452")); + return getOrNull(index, messages); + }; SessionServiceFactory fakeSessionServiceFactory = - new 
MockSessionServiceFactory(mockClientService); + MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(3).build(); // Run PCollection events = diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOWriteTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOWriteTest.java new file mode 100644 index 000000000000..e92657c3c3d2 --- /dev/null +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOWriteTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.solace; + +import static org.apache.beam.sdk.values.TypeDescriptors.strings; + +import com.solacesystems.jcsmp.DeliveryMode; +import java.util.List; +import java.util.Objects; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; +import org.apache.beam.sdk.io.solace.MockSessionServiceFactory.SessionServiceType; +import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode; +import org.apache.beam.sdk.io.solace.SolaceIO.WriterType; +import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.io.solace.data.Solace.Record; +import org.apache.beam.sdk.io.solace.data.SolaceDataUtils; +import org.apache.beam.sdk.io.solace.write.SolaceOutput; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils.ErrorSinkTransform; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class SolaceIOWriteTest { + + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + private final List<String> keys = ImmutableList.of("450", "451", "452"); + private final List<String> payloads = ImmutableList.of("payload0", "payload1", "payload2"); + + private
PCollection<Record> getRecords(Pipeline p) { + TestStream.Builder<KV<String, String>> kvBuilder = + TestStream.create(KvCoder.of(AvroCoder.of(String.class), AvroCoder.of(String.class))) + .advanceWatermarkTo(Instant.EPOCH); + + assert keys.size() == payloads.size(); + + for (int k = 0; k < keys.size(); k++) { + kvBuilder = + kvBuilder + .addElements(KV.of(keys.get(k), payloads.get(k))) + .advanceProcessingTime(Duration.standardSeconds(60)); + } + + TestStream<KV<String, String>> testStream = kvBuilder.advanceWatermarkToInfinity(); + PCollection<KV<String, String>> kvs = p.apply("Test stream", testStream); + + return kvs.apply( + "To Record", + MapElements.into(TypeDescriptor.of(Record.class)) + .via(kv -> SolaceDataUtils.getSolaceRecord(kv.getValue(), kv.getKey()))); + } + + private SolaceOutput getWriteTransform( + SubmissionMode mode, + WriterType writerType, + Pipeline p, + ErrorHandler<BadRecord, ?> errorHandler) { + SessionServiceFactory fakeSessionServiceFactory = + MockSessionServiceFactory.builder().mode(mode).build(); + + PCollection<Record> records = getRecords(p); + return records.apply( + "Write to Solace", + SolaceIO.write() + .to(Solace.Queue.fromName("queue")) + .withSubmissionMode(mode) + .withWriterType(writerType) + .withDeliveryMode(DeliveryMode.PERSISTENT) + .withSessionServiceFactory(fakeSessionServiceFactory) + .withErrorHandler(errorHandler)); + } + + private static PCollection<String> getIdsPCollection(SolaceOutput output) { + return output + .getSuccessfulPublish() + .apply( + "Get message ids", MapElements.into(strings()).via(Solace.PublishResult::getMessageId)); + } + + @Test + public void testWriteLatencyStreaming() throws Exception { + SubmissionMode mode = SubmissionMode.LOWER_LATENCY; + WriterType writerType = WriterType.STREAMING; + + ErrorHandler<BadRecord, PCollection<Long>> errorHandler = + pipeline.registerBadRecordErrorHandler(new ErrorSinkTransform()); + SolaceOutput output = getWriteTransform(mode, writerType, pipeline, errorHandler); + PCollection<String> ids = getIdsPCollection(output); + + PAssert.that(ids).containsInAnyOrder(keys); +
errorHandler.close(); + PAssert.that(errorHandler.getOutput()).empty(); + + pipeline.run(); + } + + @Test + public void testWriteThroughputStreaming() throws Exception { + SubmissionMode mode = SubmissionMode.HIGHER_THROUGHPUT; + WriterType writerType = WriterType.STREAMING; + ErrorHandler<BadRecord, PCollection<Long>> errorHandler = + pipeline.registerBadRecordErrorHandler(new ErrorSinkTransform()); + SolaceOutput output = getWriteTransform(mode, writerType, pipeline, errorHandler); + PCollection<String> ids = getIdsPCollection(output); + + PAssert.that(ids).containsInAnyOrder(keys); + errorHandler.close(); + PAssert.that(errorHandler.getOutput()).empty(); + + pipeline.run(); + } + + @Test + public void testWriteLatencyBatched() throws Exception { + SubmissionMode mode = SubmissionMode.LOWER_LATENCY; + WriterType writerType = WriterType.BATCHED; + ErrorHandler<BadRecord, PCollection<Long>> errorHandler = + pipeline.registerBadRecordErrorHandler(new ErrorSinkTransform()); + SolaceOutput output = getWriteTransform(mode, writerType, pipeline, errorHandler); + PCollection<String> ids = getIdsPCollection(output); + + PAssert.that(ids).containsInAnyOrder(keys); + errorHandler.close(); + PAssert.that(errorHandler.getOutput()).empty(); + pipeline.run(); + } + + @Test + public void testWriteThroughputBatched() throws Exception { + SubmissionMode mode = SubmissionMode.HIGHER_THROUGHPUT; + WriterType writerType = WriterType.BATCHED; + ErrorHandler<BadRecord, PCollection<Long>> errorHandler = + pipeline.registerBadRecordErrorHandler(new ErrorSinkTransform()); + SolaceOutput output = getWriteTransform(mode, writerType, pipeline, errorHandler); + PCollection<String> ids = getIdsPCollection(output); + + PAssert.that(ids).containsInAnyOrder(keys); + errorHandler.close(); + PAssert.that(errorHandler.getOutput()).empty(); + pipeline.run(); + } + + @Test + public void testWriteWithFailedRecords() throws Exception { + SubmissionMode mode = SubmissionMode.HIGHER_THROUGHPUT; + WriterType writerType = WriterType.BATCHED; + ErrorHandler<BadRecord, PCollection<Long>> errorHandler = + pipeline.registerBadRecordErrorHandler(new
ErrorSinkTransform()); + + SessionServiceFactory fakeSessionServiceFactory = + MockSessionServiceFactory.builder() + .mode(mode) + .sessionServiceType(SessionServiceType.WITH_FAILING_PRODUCER) + .build(); + + PCollection<Record> records = getRecords(pipeline); + SolaceOutput output = + records.apply( + "Write to Solace", + SolaceIO.write() + .to(Solace.Queue.fromName("queue")) + .withSubmissionMode(mode) + .withWriterType(writerType) + .withDeliveryMode(DeliveryMode.PERSISTENT) + .withSessionServiceFactory(fakeSessionServiceFactory) + .withErrorHandler(errorHandler)); + + PCollection<String> ids = getIdsPCollection(output); + + PAssert.that(ids).empty(); + errorHandler.close(); + PAssert.thatSingleton(Objects.requireNonNull(errorHandler.getOutput())) + .isEqualTo((long) payloads.size()); + pipeline.run(); + } +} diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/OverrideWriterPropertiesTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/OverrideWriterPropertiesTest.java index 0c6f88a7c9d5..357734f18aad 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/OverrideWriterPropertiesTest.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/OverrideWriterPropertiesTest.java @@ -31,9 +31,8 @@ public class OverrideWriterPropertiesTest { @Test public void testOverrideForHigherThroughput() { SolaceIO.SubmissionMode mode = SolaceIO.SubmissionMode.HIGHER_THROUGHPUT; - MockSessionService service = new MockSessionService(null, 0, mode); + MockSessionService service = MockSessionService.builder().mode(mode).build(); - // Test HIGHER_THROUGHPUT mode JCSMPProperties props = service.initializeWriteSessionProperties(mode); assertEquals(false, props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR)); assertEquals( @@ -44,13 +43,26 @@ public void testOverrideForHigherThroughput() { @Test public void testOverrideForLowerLatency() { SolaceIO.SubmissionMode mode =
SolaceIO.SubmissionMode.LOWER_LATENCY; - MockSessionService service = new MockSessionService(null, 0, mode); + MockSessionService service = MockSessionService.builder().mode(mode).build(); - // Test HIGHER_THROUGHPUT mode JCSMPProperties props = service.initializeWriteSessionProperties(mode); assertEquals(true, props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR)); assertEquals( Long.valueOf(50), Long.valueOf(props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE))); } + + @Test + public void testDontOverrideForCustom() { + SolaceIO.SubmissionMode mode = SolaceIO.SubmissionMode.CUSTOM; + MockSessionService service = MockSessionService.builder().mode(mode).build(); + + JCSMPProperties props = service.initializeWriteSessionProperties(mode); + assertEquals( + MockSessionService.callbackOnReactor, + props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR)); + assertEquals( + Long.valueOf(MockSessionService.ackWindowSizeForTesting), + Long.valueOf(props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE))); + } } diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java index 5134bd131d73..9e04c4cfd276 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java @@ -100,7 +100,7 @@ public static Solace.Record getSolaceRecord( : DEFAULT_REPLICATION_GROUP_ID.toString(); return Solace.Record.builder() - .setPayload(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8))) + .setPayload(payload.getBytes(StandardCharsets.UTF_8)) .setMessageId(messageId) .setDestination( Solace.Destination.builder() @@ -116,7 +116,7 @@ public static Solace.Record getSolaceRecord( .setTimeToLive(1000L) .setSenderTimestamp(null) 
.setReplicationGroupMessageId(replicationGroupMessageIdString) - .setAttachmentBytes(ByteBuffer.wrap(new byte[0])) + .setAttachmentBytes(new byte[0]) .build(); } diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java index 1a2a056efd45..ee5d206533dc 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java @@ -17,49 +17,71 @@ */ package org.apache.beam.sdk.io.solace.it; +import static org.apache.beam.sdk.io.solace.it.SolaceContainerManager.TOPIC_NAME; +import static org.apache.beam.sdk.values.TypeDescriptors.strings; import static org.junit.Assert.assertEquals; +import com.solacesystems.jcsmp.DeliveryMode; import java.io.IOException; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.sdk.io.solace.SolaceIO; import org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionServiceFactory; import org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory; +import org.apache.beam.sdk.io.solace.data.Solace; import org.apache.beam.sdk.io.solace.data.Solace.Queue; +import org.apache.beam.sdk.io.solace.data.SolaceDataUtils; +import org.apache.beam.sdk.io.solace.write.SolaceOutput; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.testutils.metrics.MetricsReader; import org.apache.beam.sdk.transforms.DoFn; +import 
org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.Duration; +import org.joda.time.Instant; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.FixMethodOrder; import org.junit.Rule; import org.junit.Test; +import org.junit.runners.MethodSorters; +@FixMethodOrder(MethodSorters.NAME_ASCENDING) public class SolaceIOIT { private static final String NAMESPACE = SolaceIOIT.class.getName(); private static final String READ_COUNT = "read_count"; + private static final String WRITE_COUNT = "write_count"; private static SolaceContainerManager solaceContainerManager; - private static final TestPipelineOptions readPipelineOptions; + private static final String queueName = "test_queue"; + private static final TestPipelineOptions pipelineOptions; + private static final long PUBLISH_MESSAGE_COUNT = 20; static { - readPipelineOptions = PipelineOptionsFactory.create().as(TestPipelineOptions.class); - readPipelineOptions.setBlockOnRun(false); - readPipelineOptions.as(TestPipelineOptions.class).setBlockOnRun(false); - readPipelineOptions.as(StreamingOptions.class).setStreaming(false); + pipelineOptions = PipelineOptionsFactory.create().as(TestPipelineOptions.class); + pipelineOptions.as(StreamingOptions.class).setStreaming(true); + // For the read connector tests, we need to make sure that p.run() does not block + pipelineOptions.setBlockOnRun(false); + pipelineOptions.as(TestPipelineOptions.class).setBlockOnRun(false); } - @Rule public final TestPipeline readPipeline = TestPipeline.fromOptions(readPipelineOptions); + @Rule public final TestPipeline pipeline = TestPipeline.fromOptions(pipelineOptions); @BeforeClass public static void setup() throws IOException { 
solaceContainerManager = new SolaceContainerManager(); solaceContainerManager.start(); + solaceContainerManager.createQueueWithSubscriptionTopic(queueName); } @AfterClass @@ -69,20 +91,17 @@ public static void afterClass() { } } + // The order of the following tests matters. The first test publishes some messages in a Solace + // queue, and those messages are read by the second test. If another writer test is run before + // the read test, that will alter the count for the read test and will make it fail. @Test - public void testRead() { - String queueName = "test_queue"; - solaceContainerManager.createQueueWithSubscriptionTopic(queueName); - - // todo this is very slow, needs to be replaced with the SolaceIO.write connector. - int publishMessagesCount = 20; - for (int i = 0; i < publishMessagesCount; i++) { - solaceContainerManager.sendToTopic( - "{\"field_str\":\"value\",\"field_int\":123}", - ImmutableList.of("Solace-Message-ID:m" + i)); - } + public void test01WriteStreaming() { + testWriteConnector(SolaceIO.WriterType.STREAMING); + } - readPipeline + @Test + public void test02Read() { + pipeline .apply( "Read from Solace", SolaceIO.read() @@ -105,12 +124,83 @@ public void testRead() { .build())) .apply("Count", ParDo.of(new CountingFn<>(NAMESPACE, READ_COUNT))); - PipelineResult pipelineResult = readPipeline.run(); + PipelineResult pipelineResult = pipeline.run(); + // We need enough time for Beam to pull all messages from the queue, but we need a timeout too, + // as the Read connector will keep attempting to read forever.
pipelineResult.waitUntilFinish(Duration.standardSeconds(15)); MetricsReader metricsReader = new MetricsReader(pipelineResult, NAMESPACE); long actualRecordsCount = metricsReader.getCounterMetric(READ_COUNT); - assertEquals(publishMessagesCount, actualRecordsCount); + assertEquals(PUBLISH_MESSAGE_COUNT, actualRecordsCount); + } + + @Test + public void test03WriteBatched() { + testWriteConnector(SolaceIO.WriterType.BATCHED); + } + + private void testWriteConnector(SolaceIO.WriterType writerType) { + Pipeline p = createWriterPipeline(writerType); + + PipelineResult pipelineResult = p.run(); + pipelineResult.waitUntilFinish(); + MetricsReader metricsReader = new MetricsReader(pipelineResult, NAMESPACE); + long actualRecordsCount = metricsReader.getCounterMetric(WRITE_COUNT); + assertEquals(PUBLISH_MESSAGE_COUNT, actualRecordsCount); + } + + private Pipeline createWriterPipeline(SolaceIO.WriterType writerType) { + TestStream.Builder<KV<String, String>> kvBuilder = + TestStream.create(KvCoder.of(AvroCoder.of(String.class), AvroCoder.of(String.class))) + .advanceWatermarkTo(Instant.EPOCH); + + for (int i = 0; i < PUBLISH_MESSAGE_COUNT; i++) { + String key = "Solace-Message-ID:m" + i; + String payload = String.format("{\"field_str\":\"value\",\"field_int\":123%d}", i); + kvBuilder = + kvBuilder + .addElements(KV.of(key, payload)) + .advanceProcessingTime(Duration.standardSeconds(60)); + } + + TestStream<KV<String, String>> testStream = kvBuilder.advanceWatermarkToInfinity(); + + PCollection<KV<String, String>> kvs = + pipeline.apply(String.format("Test stream %s", writerType), testStream); + + PCollection<Solace.Record> records = + kvs.apply( + String.format("To Record %s", writerType), + MapElements.into(TypeDescriptor.of(Solace.Record.class)) + .via(kv -> SolaceDataUtils.getSolaceRecord(kv.getValue(), kv.getKey()))); + + SolaceOutput result = + records.apply( + String.format("Write to Solace %s", writerType), + SolaceIO.write() + .to(Solace.Topic.fromName(TOPIC_NAME)) + .withSubmissionMode(SolaceIO.SubmissionMode.TESTING) +
.withWriterType(writerType) + .withDeliveryMode(DeliveryMode.PERSISTENT) + .withNumberOfClientsPerWorker(1) + .withNumShards(1) + .withSessionServiceFactory( + BasicAuthJcsmpSessionServiceFactory.builder() + .host("localhost:" + solaceContainerManager.jcsmpPortMapped) + .username(SolaceContainerManager.USERNAME) + .password(SolaceContainerManager.PASSWORD) + .vpnName(SolaceContainerManager.VPN_NAME) + .build())); + result + .getSuccessfulPublish() + .apply( + String.format("Get ids %s", writerType), + MapElements.into(strings()).via(Solace.PublishResult::getMessageId)) + .apply( + String.format("Count %s", writerType), + ParDo.of(new CountingFn<>(NAMESPACE, WRITE_COUNT))); + + return pipeline; } private static class CountingFn<T> extends DoFn<T, T> {