diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java index 2c042edaf7c0..55199cf01637 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java @@ -35,6 +35,7 @@ import java.util.concurrent.Callable; import javax.annotation.Nullable; import org.apache.beam.sdk.io.solace.RetryCallableManager; +import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; /** @@ -47,12 +48,16 @@ public abstract class BasicAuthJcsmpSessionService extends SessionService { /** The name of the queue to receive messages from. */ public abstract @Nullable String queueName(); + /** The host name or IP address of the Solace broker. Format: Host[:Port] */ public abstract String host(); + /** The username to use for authentication. */ public abstract String username(); + /** The password to use for authentication. */ public abstract String password(); + /** The name of the VPN to connect to. */ public abstract String vpnName(); @@ -114,11 +119,11 @@ public MessageReceiver getReceiver() { } @Override - public MessageProducer getProducer() { + public MessageProducer getProducer(SubmissionMode submissionMode) { if (this.messageProducer == null || this.messageProducer.isClosed()) { + Callable create = () -> createXMLMessageProducer(submissionMode); this.messageProducer = - retryCallableManager.retryCallable( - this::createXMLMessageProducer, ImmutableSet.of(JCSMPException.class)); + retryCallableManager.retryCallable(create, ImmutableSet.of(JCSMPException.class)); } return checkStateNotNull(this.messageProducer); } @@ -128,9 +133,10 @@ public boolean isClosed() { return jcsmpSession == null || jcsmpSession.isClosed(); } - private MessageProducer createXMLMessageProducer() throws JCSMPException, IOException { + private MessageProducer createXMLMessageProducer(SubmissionMode submissionMode) + throws JCSMPException, IOException { if (isClosed()) { - connectSession(); + connectWriteSession(submissionMode); } @SuppressWarnings("nullness") @@ -165,7 +171,8 @@ private MessageReceiver createFlowReceiver() throws JCSMPException, IOException createFlowReceiver(jcsmpSession, flowProperties, endpointProperties)); } throw new IOException( - "SolaceIO.Read: Could not create a receiver from the Jcsmp session: session object is null."); + "SolaceIO.Read: Could not create a receiver from the Jcsmp session: session object is" + + " null."); } // The `@SuppressWarning` is needed here, because the checkerframework reports an error for the @@ -188,11 +195,24 @@ private int connectSession() throws JCSMPException { return 0; } + private int connectWriteSession(SubmissionMode mode) throws JCSMPException { + if (jcsmpSession == null) { + jcsmpSession = createWriteSessionObject(mode); + } + jcsmpSession.connect(); + return 0; + } + private JCSMPSession createSessionObject() throws InvalidPropertiesException { JCSMPProperties properties = initializeSessionProperties(new JCSMPProperties()); return JCSMPFactory.onlyInstance().createSession(properties); } + private JCSMPSession createWriteSessionObject(SubmissionMode mode) + throws InvalidPropertiesException { + return JCSMPFactory.onlyInstance().createSession(initializeWriteSessionProperties(mode)); + } + @Override public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProps) { baseProps.setProperty(JCSMPProperties.VPN_NAME, vpnName()); diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java index 31834d237847..199dcccee854 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java @@ -30,12 +30,16 @@ */ @AutoValue public abstract class BasicAuthJcsmpSessionServiceFactory extends SessionServiceFactory { + /** The host name or IP address of the Solace broker. Format: Host[:Port] */ public abstract String host(); + /** The username to use for authentication. */ public abstract String username(); + /** The password to use for authentication. */ public abstract String password(); + /** The name of the VPN to connect to. */ public abstract String vpnName(); public static Builder builder() { @@ -53,6 +57,7 @@ public abstract static class Builder { /** Set Solace username. */ public abstract Builder username(String username); + /** Set Solace password. */ public abstract Builder password(String password); diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java index 027d920eed49..7ae092c74923 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java @@ -20,6 +20,7 @@ import com.solacesystems.jcsmp.JCSMPProperties; import java.io.Serializable; import org.apache.beam.sdk.io.solace.SolaceIO; +import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -136,7 +137,7 @@ public abstract class SessionService implements Serializable { * this method is used, the producer is created from the session instance, otherwise it returns * the producer created initially. */ - public abstract MessageProducer getProducer(); + public abstract MessageProducer getProducer(SubmissionMode mode); /** * Override this method and provide your specific properties, including all those related to @@ -272,7 +273,7 @@ private static JCSMPProperties overrideConnectorProperties( LOG.info( " SolaceIO.Write: Using the custom JCSMP properties set by the user. No property has" + " been overridden by the connector."); - break; + break; case TESTING: LOG.warn( "SolaceIO.Write: Overriding JCSMP properties for testing. **IF THIS IS AN" diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java index 98059bc9248f..612b5d9e3cef 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java @@ -19,6 +19,7 @@ import com.solacesystems.jcsmp.Queue; import java.io.Serializable; +import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode; import org.checkerframework.checker.nullness.qual.Nullable; /** @@ -59,6 +60,12 @@ public abstract class SessionServiceFactory implements Serializable { */ @Nullable Queue queue; + /** + * The write submission mode. This is set when the writers are created. This property is used only + * by the write connector. + */ + @Nullable SubmissionMode submissionMode; + /** * This is the core method that subclasses must implement. It defines how to construct and return * a SessionService object. @@ -78,6 +85,7 @@ public abstract class SessionServiceFactory implements Serializable { */ @Override public abstract int hashCode(); + /** * This method is called in the {@link * org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method @@ -86,4 +94,15 @@ public abstract class SessionServiceFactory implements Serializable { public void setQueue(Queue queue) { this.queue = queue; } + + /** + * Called by the write connector to set the submission mode used to create the message producers. + */ + public void setSubmissionMode(SubmissionMode submissionMode) { + this.submissionMode = submissionMode; + } + + public @Nullable SubmissionMode getSubmissionMode() { + return submissionMode; + } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceWriteSessionsHandler.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceWriteSessionsHandler.java index d705035cbf75..f10654af6d88 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceWriteSessionsHandler.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceWriteSessionsHandler.java @@ -18,9 +18,11 @@ package org.apache.beam.sdk.io.solace.write; import static org.apache.beam.sdk.io.solace.SolaceIO.DEFAULT_WRITER_CLIENTS_PER_WORKER; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import com.google.auto.value.AutoValue; import java.util.concurrent.ConcurrentHashMap; +import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode; import org.apache.beam.sdk.io.solace.broker.SessionService; import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; @@ -55,7 +57,11 @@ private static SessionService createSessionAndStartProducer(SessionConfiguration SessionServiceFactory factory = key.sessionServiceFactory(); SessionService sessionService = factory.create(); // Start the producer now that the initialization is locked for other threads - sessionService.getProducer(); + SubmissionMode mode = factory.getSubmissionMode(); + checkStateNotNull( + mode, + "SolaceIO.Write: Submission mode is not set. You need to set it to create write sessions."); + sessionService.getProducer(mode); return sessionService; } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedBatchedSolaceWriter.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedBatchedSolaceWriter.java index 25dbda4c3485..c0c01927c379 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedBatchedSolaceWriter.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedBatchedSolaceWriter.java @@ -143,7 +143,7 @@ private void publishBatch(List records) { try { int entriesPublished = solaceSessionService() - .getProducer() + .getProducer(getSubmissionMode()) .publishBatch( records, shouldPublishLatencyMetrics(), getDestinationFn(), getDeliveryMode()); sentToBroker.inc(entriesPublished); diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedSolaceWriter.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedSolaceWriter.java index 7cba7eca5add..7eb65b53f365 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedSolaceWriter.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedSolaceWriter.java @@ -89,6 +89,9 @@ public UnboundedSolaceWriter( boolean publishLatencyMetrics) { this.destinationFn = destinationFn; this.sessionServiceFactory = sessionServiceFactory; + // Make sure that we set the submission mode now that we know which mode has been set by the + // user. + this.sessionServiceFactory.setSubmissionMode(submissionMode); this.deliveryMode = deliveryMode; this.submissionMode = submissionMode; this.producersMapCardinality = producersMapCardinality; @@ -279,7 +282,7 @@ public DeliveryMode getDeliveryMode() { return deliveryMode; } - public SubmissionMode getDispatchMode() { + public SubmissionMode getSubmissionMode() { return submissionMode; } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedStreamingSolaceWriter.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedStreamingSolaceWriter.java index f2e0b940b237..65ded0f2c376 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedStreamingSolaceWriter.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedStreamingSolaceWriter.java @@ -111,7 +111,7 @@ public void processElement( // The publish method will retry, let's send a failure message if all the retries fail try { solaceSessionService() - .getProducer() + .getProducer(getSubmissionMode()) .publishSingleMessage( record, getDestinationFn().apply(record), diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java index 18e896988a33..5fa3e0c53527 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java @@ -19,6 +19,7 @@ import com.google.auto.value.AutoValue; import com.solacesystems.jcsmp.JCSMPProperties; +import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode; import org.apache.beam.sdk.io.solace.broker.MessageProducer; import org.apache.beam.sdk.io.solace.broker.MessageReceiver; import org.apache.beam.sdk.io.solace.broker.SessionService; @@ -48,7 +49,7 @@ public MessageReceiver getReceiver() { } @Override - public MessageProducer getProducer() { + public MessageProducer getProducer(SubmissionMode mode) { throw new UnsupportedOperationException(exceptionMessage); } diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java index cb40a41b2c1b..2b104b770a15 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java @@ -81,7 +81,7 @@ public MessageReceiver getReceiver() { } @Override - public MessageProducer getProducer() { + public MessageProducer getProducer(SubmissionMode mode) { if (messageProducer == null) { messageProducer = new MockProducer(new PublishResultHandler()); }