diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java index d6469e209c67..bb9f0c6ea689 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java @@ -22,6 +22,7 @@ import com.google.auto.value.AutoValue; import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.DeliveryMode; import com.solacesystems.jcsmp.Destination; import com.solacesystems.jcsmp.JCSMPFactory; import com.solacesystems.jcsmp.Queue; @@ -31,18 +32,22 @@ import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionServiceFactory; +import org.apache.beam.sdk.io.solace.broker.GCPSecretSessionServiceFactory; import org.apache.beam.sdk.io.solace.broker.SempClientFactory; import org.apache.beam.sdk.io.solace.broker.SessionService; import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; import org.apache.beam.sdk.io.solace.data.Solace; import org.apache.beam.sdk.io.solace.data.Solace.SolaceRecordMapper; import org.apache.beam.sdk.io.solace.read.UnboundedSolaceSource; +import org.apache.beam.sdk.io.solace.write.SolaceOutput; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.checkerframework.checker.nullness.qual.Nullable; @@ -181,10 +186,6 @@ * * } * - *

Writing

- * - * TBD - * *

Authentication

* *

When reading from Solace, the user must use {@link @@ -198,6 +199,186 @@ *

For the authentication to the SEMP API ({@link Read#withSempClientFactory(SempClientFactory)}) * the connector provides {@link org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory} to * authenticate using the Basic Authentication. + * + *

Writing

+ * + *

To write to Solace, use {@link #write()} with a {@link PCollection}. You can + * also use {@link #write(SerializableFunction)} to specify a format function to convert the input + * type to {@link Solace.Record}. + * + *

Writing to a static topic or queue

+ * + *

The connector uses the Solace JCSMP API. + * The clients will write to a SMF + * topic to the port 55555 of the host. If you want to use a different port, specify it in the + * host property with the format "X.X.X.X:PORT". + * + *

Once you have a {@link PCollection} of {@link Solace.Record}, you can write to Solace using: + * + *

{@code
+ * PCollection solaceRecs = ...;
+ *
+ * PCollection results =
+ *         solaceRecs.apply(
+ *                 "Write to Solace",
+ *                 SolaceIO.write()
+ *                         .to(SolaceIO.topicFromName("some-topic"))
+ *                         .withSessionServiceFactory(
+ *                            BasicAuthJcsmpSessionServiceFactory.builder()
+ *                              .username("username")
+ *                              .password("password")
+ *                              .host("host:port")
+ *                              .build()));
+ * }
+ * + *

The above code snippet will write to the VPN named "default", using 4 clients per worker (VM + * in Dataflow), and a maximum of 20 workers/VMs for writing (default values). You can change the + * default VPN name by setting the required JCSMP property in the session factory (in this case, + * with {@link BasicAuthJcsmpSessionServiceFactory#vpnName()}), the number of clients per worker + * with {@link Write#withNumberOfClientsPerWorker(int)} and the number of parallel write clients + * using {@link Write#withMaxNumOfUsedWorkers(int)}. + * + *

Writing to dynamic destinations

+ * + * To write to dynamic destinations, don't set the {@link Write#to(Solace.Queue)} or {@link + * Write#to(Solace.Topic)} property and make sure that all the {@link Solace.Record}s have their + * destination field set to either a topic or a queue. You can do this prior to calling the write + * connector, or by using a format function and {@link #write(SerializableFunction)}. + * + *

For instance, you can create a function like the following: + * + *

{@code
+ * // Generate Record with different destinations
+ * SerializableFunction formatFn =
+ *    (MyType msg) -> {
+ *       int queue = ... // some random number
+ *       return Solace.Record.builder()
+ *         .setDestination(Solace.Destination.builder()
+ *                        .setName(String.format("q%d", queue))
+ *                        .setType(Solace.DestinationType.QUEUE)
+ *                        .build())
+ *         .setMessageId(msg.getMessageId())
+ *         .build();
+ * };
+ * }
+ * + * And then use the connector as follows: + * + *
{@code
+ * // Ignore "to" method to use dynamic destinations
+ * SolaceOutput solaceResponses = msgs.apply("Write to Solace",
+ *   SolaceIO.write(formatFn)
+ *        .withDeliveryMode(DeliveryMode.PERSISTENT)
+ *        .withWriterType(SolaceIO.WriterType.STREAMING)
+ * ...
+ * }
+ * + *

Direct and persistent messages, and latency metrics

+ * + *

The connector can write either direct or persistent messages. The default mode is DIRECT. + * + *

The connector returns a {@link PCollection} of {@link Solace.PublishResult}, that you can use + * to get a confirmation of messages that have been published, or rejected, but only if it is + * publishing persistent messages. + * + *

If you are publishing persistent messages, then you can have some feedback about whether the + * messages have been published, and some publishing latency metrics. If the message has been + * published, {@link Solace.PublishResult#getPublished()} will be true. If it is false, it means + * that the message could not be published, and {@link Solace.PublishResult#getError()} will contain + * more details about why the message could not be published. To get latency metrics as well as the + * results, set the property {@link Write#publishLatencyMetrics()}. + * + *

Throughput and latency

+ * + *

This connector can work in two main modes: high latency or high throughput. The default mode + * favors high throughput over high latency. You can control this behavior with the methods {@link + * Write#withSubmissionMode(SubmissionMode)} and {@link Write#withWriterType(WriterType)}. + * + *

The default mode works like the following options: + * + *

{@code
+ * PCollection solaceRecs = ...;
+ *
+ * PCollection results =
+ *         solaceRecs.apply(
+ *                 "Write to Solace",
+ *                 SolaceIO.write()
+ *                         .to(SolaceIO.topicFromName("some-topic"))
+ *                         .withSessionServiceFactory(
+ *                            BasicAuthJcsmpSessionServiceFactory.builder()
+ *                              .username("username")
+ *                              .password("password")
+ *                              .host("host:port")
+ *                              .build())
+ *                         .withSubmissionMode(SubmissionMode.HIGHER_THROUGHPUT)
+ *                         .withWriterType(WriterType.BATCHED));
+ * }
+ * + *

{@link SubmissionMode#HIGHER_THROUGHPUT} and {@link WriterType#BATCHED} are the default + * values, and offer the higher possible throughput, and the lowest usage of resources in the runner + * side (due to the lower backpressure). + * + *

This connector writes bundles of 50 messages, using a bulk publish JCSMP method. This will + * increase the latency, since a message needs to "wait" until 50 messages are accumulated, before + * they are submitted to Solace. + * + *

For the lowest latency possible, use {@link SubmissionMode#LOWER_LATENCY} and {@link + * WriterType#STREAMING}. + * + *

{@code
+ * PCollection results =
+ *         solaceRecs.apply(
+ *                 "Write to Solace",
+ *                 SolaceIO.write()
+ *                         .to(SolaceIO.topicFromName("some-topic"))
+ *                         .withSessionServiceFactory(
+ *                            BasicAuthJcsmpSessionServiceFactory.builder()
+ *                              .username("username")
+ *                              .password("password")
+ *                              .host("host:port")
+ *                              .build())
+ *                         .withSubmissionMode(SubmissionMode.LOWER_LATENCY)
+ *                         .withWriterType(WriterType.STREAMING));
+ * }
+ * + *

The streaming connector publishes each message individually, without holding up or batching + * before the message is sent to Solace. This will ensure the lowest possible latency, but it will + * offer a much lower throughput. The streaming connector does not use state & timers. + * + *

Both connectors uses state & timers to control the level of parallelism. If you are using + * Cloud Dataflow, it is recommended that you enable Streaming Engine to use this + * connector. + * + *

Authentication

+ * + *

When writing to Solace, the user must use {@link + * Write#withSessionServiceFactory(SessionServiceFactory)} to create a JCSMP session. + * + *

See {@link Write#withSessionServiceFactory(SessionServiceFactory)} for session authentication. + * The connector provides implementation of the {@link SessionServiceFactory} using basic + * authentication ({@link BasicAuthJcsmpSessionServiceFactory}), and another implementation using + * basic authentication but with a password stored as a secret in Google Cloud Secret Manager + * ({@link GCPSecretSessionServiceFactory}) + * + *

Connector retries

+ * + *

When the worker using the connector is created, the connector will attempt to connect to + * Solace. + * + *

If the client cannot connect to Solace for whatever reason, the connector will retry the + * connections using the following strategy. There will be a maximum of 4 retries. The first retry + * will be attempted 1 second after the first connection attempt. Every subsequent retry will + * multiply that time by a factor of two, with a maximum of 10 seconds. + * + *

If after those retries the client is still unable to connect to Solace, the connector will + * attempt to reconnect using the same strategy repeated for every single incoming message. If for + * some reason, there is a persistent issue that prevents the connection (e.g. client quota + * exhausted), you will need to stop your job manually, or the connector will keep retrying. + * + *

This strategy is applied to all the remote calls sent to Solace, either to connect, pull + * messages, push messages, etc. */ @Internal public class SolaceIO { @@ -213,11 +394,13 @@ public class SolaceIO { }; private static final boolean DEFAULT_DEDUPLICATE_RECORDS = false; - // Part of the new write connector, documentation to be updated in upcoming pull requests - public enum SubmissionMode { - HIGHER_THROUGHPUT, - LOWER_LATENCY - } + public static final int DEFAULT_WRITER_MAX_NUMBER_OF_WORKERS = 20; + public static final int DEFAULT_WRITER_CLIENTS_PER_WORKER = 4; + public static final Boolean DEFAULT_WRITER_PUBLISH_LATENCY_METRICS = false; + public static final SubmissionMode DEFAULT_WRITER_SUBMISSION_MODE = + SubmissionMode.HIGHER_THROUGHPUT; + public static final DeliveryMode DEFAULT_WRITER_DELIVERY_MODE = DeliveryMode.DIRECT; + public static final WriterType DEFAULT_WRITER_TYPE = WriterType.BATCHED; /** Get a {@link Topic} object from the topic name. */ static Topic topicFromName(String topicName) { @@ -287,6 +470,24 @@ public static Read read( .setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS)); } + /** + * Create a {@link Write} transform, to write to Solace with a custom type. + * + *

If you are using a custom data class, the format function should return a {@link + * Solace.Record} corresponding to your custom data class instance. + * + *

If you are using this formatting function with dynamic destinations, you must ensure that + * you set the right value in the destination value of the {@link Solace.Record} messages. + */ + public static Write write(SerializableFunction formatFunction) { + return Write.builder().setFormatFunction(formatFunction).build(); + } + + /** Create a {@link Write} transform, to write to Solace using {@link Solace.Record} objects. */ + public static Write write() { + return Write.builder().build(); + } + public static class Read extends PTransform> { private static final Logger LOG = LoggerFactory.getLogger(Read.class); @@ -579,4 +780,232 @@ private Queue initializeQueueForTopicIfNeeded( } } } + + public enum SubmissionMode { + HIGHER_THROUGHPUT, + LOWER_LATENCY + } + + public enum WriterType { + STREAMING, + BATCHED + } + + @AutoValue + public abstract static class Write extends PTransform, SolaceOutput> { + + public static final TupleTag FAILED_PUBLISH_TAG = + new TupleTag() {}; + public static final TupleTag SUCCESSFUL_PUBLISH_TAG = + new TupleTag() {}; + + /** + * Write to a Solace topic. + * + *

The topic does not need to exist before launching the pipeline. + * + *

This will write all records to the same topic, ignoring their destination field. + * + *

Optional. If not specified, the connector will use dynamic destinations based on the + * destination field of {@link Solace.Record}. + */ + public Write to(Solace.Topic topic) { + return toBuilder().setDestination(topicFromName(topic.getName())).build(); + } + + /** + * Write to a Solace queue. + * + *

The queue must exist prior to launching the pipeline. + * + *

This will write all records to the same queue, ignoring their destination field. + * + *

Optional. If not specified, the connector will use dynamic destinations based on the + * destination field of {@link Solace.Record}. + */ + public Write to(Solace.Queue queue) { + return toBuilder().setDestination(queueFromName(queue.getName())).build(); + } + + /** + * The number of workers used by the job to write to Solace. + * + *

This is optional, the default value is 20. + * + *

This is the maximum value that the job would use, but depending on the amount of data, the + * actual number of writers may be lower than this value. With the Dataflow runner, the + * connector will as maximum this number of VMs in the job (but the job itself may use more + * VMs). + * + *

Set this number taking into account the limits in the number of clients in your Solace + * cluster, and the need for performance when writing to Solace (more workers will achieve + * higher throughput). + */ + public Write withMaxNumOfUsedWorkers(int maxNumOfUsedWorkers) { + return toBuilder().setMaxNumOfUsedWorkers(maxNumOfUsedWorkers).build(); + } + + /** + * The number of clients that each worker will create. + * + *

This is optional, the default number is 4. + * + *

The number of clients is per worker. If there are more than one worker, the number of + * clients will be multiplied by the number of workers. With the Dataflow runner, this will be + * the number of clients created per VM. The clients will be re-used across different threads in + * the same worker. + * + *

Set this number in combination with {@link #withMaxNumOfUsedWorkers}, to ensure that the + * limit for number of clients in your Solace cluster is not exceeded. + * + *

Normally, using a higher number of clients with fewer workers will achieve better + * throughput at a lower cost, since the workers are better utilized. A good rule of thumb to + * use is setting as many clients per worker as vCPUs the worker has. + */ + public Write withNumberOfClientsPerWorker(int numberOfClientsPerWorker) { + return toBuilder().setNumberOfClientsPerWorker(numberOfClientsPerWorker).build(); + } + + /** + * Set the delivery mode. This is optional, the default value is DIRECT. + * + *

For more details, see https://docs.solace.com/API/API-Developer-Guide/Message-Delivery-Modes.htm + */ + public Write withDeliveryMode(DeliveryMode deliveryMode) { + return toBuilder().setDeliveryMode(deliveryMode).build(); + } + + /** + * Publish latency metrics using Beam metrics. + * + *

Latency metrics are only available if {@link #withDeliveryMode(DeliveryMode)} is set to + * PERSISTENT. In that mode, latency is measured for each single message, as the time difference + * between the message creation and the reception of the publishing confirmation. + * + *

For the batched writer, the creation time is set for every message in a batch shortly + * before the batch is submitted. So the latency is very close to the actual publishing latency, + * and it does not take into account the time spent waiting for the batch to be submitted. + * + *

This is optional, the default value is false (don't publish latency metrics). + */ + public Write publishLatencyMetrics() { + return toBuilder().setPublishLatencyMetrics(true).build(); + } + + /** + * This setting controls the JCSMP property MESSAGE_CALLBACK_ON_REACTOR. Optional. + * + *

For full details, please check https://docs.solace.com/API/API-Developer-Guide/Java-API-Best-Practices.htm. + * + *

The Solace JCSMP client libraries can dispatch messages using two different modes: + * + *

One of the modes dispatches messages directly from the same thread that is doing the rest + * of I/O work. This mode favors lower latency but lower throughput. Set this to LOWER_LATENCY + * to use that mode (MESSAGE_CALLBACK_ON_REACTOR set to True). + * + *

The other mode uses a parallel thread to accumulate and dispatch messages. This mode + * favors higher throughput but also has higher latency. Set this to HIGHER_THROUGHPUT to use + * that mode. This is the default mode (MESSAGE_CALLBACK_ON_REACTOR set to False). + * + *

This is optional, the default value is HIGHER_THROUGHPUT. + */ + public Write withSubmissionMode(SubmissionMode submissionMode) { + return toBuilder().setDispatchMode(submissionMode).build(); + } + + /** + * Set the type of writer used by the connector. Optional. + * + *

The Solace writer can either use the JCSMP modes in streaming or batched. + * + *

In streaming mode, the publishing latency will be lower, but the throughput will also be + * lower. + * + *

With the batched mode, messages are accumulated until a batch size of 50 is reached, or 5 + * seconds have elapsed since the first message in the batch was received. The 50 messages are + * sent to Solace in a single batch. This writer offers higher throughput but higher publishing + * latency, as messages can be held up for up to 5 seconds until they are published. + * + *

Notice that this is the message publishing latency, not the end-to-end latency. For very + * large scale pipelines, you will probably prefer to use the HIGHER_THROUGHPUT mode, as with + * lower throughput messages will accumulate in the pipeline, and the end-to-end latency may + * actually be higher. + * + *

This is optional, the default is the BATCHED writer. + */ + public Write withWriterType(WriterType writerType) { + return toBuilder().setWriterType(writerType).build(); + } + + /** + * Set the provider used to obtain the properties to initialize a new session in the broker. + * + *

This provider should define the destination host where the broker is listening, and all + * the properties related to authentication (base auth, client certificate, etc.). + */ + public Write withSessionServiceFactory(SessionServiceFactory factory) { + return toBuilder().setSessionServiceFactory(factory).build(); + } + + abstract int getMaxNumOfUsedWorkers(); + + abstract int getNumberOfClientsPerWorker(); + + abstract @Nullable Destination getDestination(); + + abstract DeliveryMode getDeliveryMode(); + + abstract boolean getPublishLatencyMetrics(); + + abstract SubmissionMode getDispatchMode(); + + abstract WriterType getWriterType(); + + abstract @Nullable SerializableFunction getFormatFunction(); + + abstract @Nullable SessionServiceFactory getSessionServiceFactory(); + + static Builder builder() { + return new AutoValue_SolaceIO_Write.Builder() + .setDeliveryMode(DEFAULT_WRITER_DELIVERY_MODE) + .setMaxNumOfUsedWorkers(DEFAULT_WRITER_MAX_NUMBER_OF_WORKERS) + .setNumberOfClientsPerWorker(DEFAULT_WRITER_CLIENTS_PER_WORKER) + .setPublishLatencyMetrics(DEFAULT_WRITER_PUBLISH_LATENCY_METRICS) + .setDispatchMode(DEFAULT_WRITER_SUBMISSION_MODE) + .setWriterType(DEFAULT_WRITER_TYPE); + } + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setMaxNumOfUsedWorkers(int maxNumOfUsedWorkers); + + abstract Builder setNumberOfClientsPerWorker(int numberOfClientsPerWorker); + + abstract Builder setDestination(Destination topicOrQueue); + + abstract Builder setDeliveryMode(DeliveryMode deliveryMode); + + abstract Builder setPublishLatencyMetrics(Boolean publishLatencyMetrics); + + abstract Builder setDispatchMode(SubmissionMode submissionMode); + + abstract Builder setWriterType(WriterType writerType); + + abstract Builder setFormatFunction(SerializableFunction formatFunction); + + abstract Builder setSessionServiceFactory(SessionServiceFactory factory); + + abstract Write build(); + } + + @Override + public SolaceOutput expand(PCollection input) { + // TODO: will be sent in upcoming PR + return SolaceOutput.in(input.getPipeline(), null, null); + } + } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java index 18fee9184446..00b94b5b9ea9 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,6 +119,7 @@ public abstract static class Record { * * @return The message ID, or null if not available. */ + @SchemaFieldNumber("0") public abstract @Nullable String getMessageId(); /** @@ -127,6 +129,7 @@ public abstract static class Record { * * @return The message payload. */ + @SchemaFieldNumber("1") public abstract ByteBuffer getPayload(); /** * Gets the destination (topic or queue) to which the message was sent. @@ -135,6 +138,7 @@ public abstract static class Record { * * @return The destination, or null if not available. */ + @SchemaFieldNumber("2") public abstract @Nullable Destination getDestination(); /** @@ -146,6 +150,7 @@ public abstract static class Record { * * @return The expiration timestamp. */ + @SchemaFieldNumber("3") public abstract long getExpiration(); /** @@ -155,6 +160,7 @@ public abstract static class Record { * * @return The message priority. */ + @SchemaFieldNumber("4") public abstract int getPriority(); /** @@ -164,6 +170,7 @@ public abstract static class Record { * * @return True if redelivered, false otherwise. */ + @SchemaFieldNumber("5") public abstract boolean getRedelivered(); /** @@ -173,6 +180,7 @@ public abstract static class Record { * * @return The reply-to destination, or null if not specified. */ + @SchemaFieldNumber("6") public abstract @Nullable Destination getReplyTo(); /** @@ -183,6 +191,7 @@ public abstract static class Record { * * @return The timestamp. */ + @SchemaFieldNumber("7") public abstract long getReceiveTimestamp(); /** @@ -191,6 +200,7 @@ public abstract static class Record { * * @return The sender timestamp, or null if not available. */ + @SchemaFieldNumber("8") public abstract @Nullable Long getSenderTimestamp(); /** @@ -200,6 +210,7 @@ public abstract static class Record { * * @return The sequence number, or null if not available. */ + @SchemaFieldNumber("9") public abstract @Nullable Long getSequenceNumber(); /** @@ -210,6 +221,7 @@ public abstract static class Record { * * @return The time-to-live value. */ + @SchemaFieldNumber("10") public abstract long getTimeToLive(); /** @@ -225,7 +237,9 @@ public abstract static class Record { * * @return The replication group message ID, or null if not present. */ + @SchemaFieldNumber("11") public abstract @Nullable String getReplicationGroupMessageId(); + /** * Gets the attachment data of the message as a ByteString, if any. This might represent files * or other binary content associated with the message. @@ -234,6 +248,7 @@ public abstract static class Record { * * @return The attachment data, or an empty ByteString if no attachment is present. */ + @SchemaFieldNumber("12") public abstract ByteBuffer getAttachmentBytes(); static Builder builder() { @@ -271,6 +286,90 @@ abstract static class Builder { abstract Record build(); } } + + /** + * The result of writing a message to Solace. This will be returned by the {@link + * com.google.cloud.dataflow.dce.io.solace.SolaceIO.Write} connector. + * + *

This class provides a builder to create instances, but you will probably not need it. The + * write connector will create and return instances of {@link Solace.PublishResult}. + * + *

If the message has been published, {@link Solace.PublishResult#getPublished()} will be true. + * If it is false, it means that the message could not be published, and {@link + * Solace.PublishResult#getError()} will contain more details about why the message could not be + * published. + */ + @AutoValue + @DefaultSchema(AutoValueSchema.class) + public abstract static class PublishResult { + /** The message id of the message that was published. */ + @SchemaFieldNumber("0") + public abstract String getMessageId(); + + /** Whether the message was published or not. */ + @SchemaFieldNumber("1") + public abstract Boolean getPublished(); + + /** + * The publishing latency in milliseconds. This is the difference between the time the message + * was created, and the time the message was published. It is only available if the {@link + * CorrelationKey} class is used as correlation key of the messages. + */ + @SchemaFieldNumber("2") + public abstract @Nullable Long getLatencyMilliseconds(); + + /** The error details if the message could not be published. */ + @SchemaFieldNumber("3") + public abstract @Nullable String getError(); + + public static Builder builder() { + return new AutoValue_Solace_PublishResult.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setMessageId(String messageId); + + public abstract Builder setPublished(Boolean published); + + public abstract Builder setLatencyMilliseconds(Long latencyMs); + + public abstract Builder setError(String error); + + public abstract PublishResult build(); + } + } + + /** + * The correlation key is an object that is passed back to the client during the event broker ack + * or nack. + * + *

In the streaming writer is optionally used to calculate publish latencies, by calculating + * the time difference between the creation of the correlation key, and the time of the ack. + */ + @AutoValue + @DefaultSchema(AutoValueSchema.class) + public abstract static class CorrelationKey { + @SchemaFieldNumber("0") + public abstract String getMessageId(); + + @SchemaFieldNumber("1") + public abstract long getPublishMonotonicMillis(); + + public static Builder builder() { + return new AutoValue_Solace_CorrelationKey.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setMessageId(String messageId); + + public abstract Builder setPublishMonotonicMillis(long millis); + + public abstract CorrelationKey build(); + } + } + /** * A utility class for mapping {@link BytesXMLMessage} instances to {@link Solace.Record} objects. * This simplifies the process of converting raw Solace messages into a format suitable for use diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceOutput.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceOutput.java new file mode 100644 index 000000000000..6c37f879ae7f --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceOutput.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.write; + +import java.util.Map; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.solace.SolaceIO; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * The {@link SolaceIO.Write} transform's output return this type, containing both the successful + * publishes ({@link #getSuccessfulPublish()}) and the failed publishes ({@link + * #getFailedPublish()}). + * + *

The streaming writer with DIRECT messages does not return anything, and the output {@link + * PCollection}s will be equal to null. + */ +public final class SolaceOutput implements POutput { + private final Pipeline pipeline; + private final TupleTag failedPublishTag; + private final TupleTag successfulPublishTag; + private final @Nullable PCollection failedPublish; + private final @Nullable PCollection successfulPublish; + + public @Nullable PCollection getFailedPublish() { + return failedPublish; + } + + public @Nullable PCollection getSuccessfulPublish() { + return successfulPublish; + } + + public static SolaceOutput in( + Pipeline pipeline, + @Nullable PCollection failedPublish, + @Nullable PCollection successfulPublish) { + return new SolaceOutput( + pipeline, + SolaceIO.Write.FAILED_PUBLISH_TAG, + SolaceIO.Write.SUCCESSFUL_PUBLISH_TAG, + failedPublish, + successfulPublish); + } + + private SolaceOutput( + Pipeline pipeline, + TupleTag failedPublishTag, + TupleTag successfulPublishTag, + @Nullable PCollection failedPublish, + @Nullable PCollection successfulPublish) { + this.pipeline = pipeline; + this.failedPublishTag = failedPublishTag; + this.successfulPublishTag = successfulPublishTag; + this.failedPublish = failedPublish; + this.successfulPublish = successfulPublish; + } + + @Override + public Pipeline getPipeline() { + return pipeline; + } + + @Override + public Map, PValue> expand() { + ImmutableMap.Builder, PValue> builder = ImmutableMap., PValue>builder(); + + if (failedPublish != null) { + builder.put(failedPublishTag, failedPublish); + } + + if (successfulPublish != null) { + builder.put(successfulPublishTag, successfulPublish); + } + + return builder.build(); + } + + @Override + public void finishSpecifyingOutput( + String transformName, PInput input, PTransform transform) {} +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/package-info.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/package-info.java new file mode 100644 index 000000000000..65974b9b29c2 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** SolaceIO Write connector. */ +package org.apache.beam.sdk.io.solace.write;