Skip to content

Commit

Permalink
Fix bug introduced with the refactoring of code for this PR.
Browse files Browse the repository at this point in the history
I forgot to pass the submission mode when the write session is created, and I
called the wrong method in the base class because it was defined as public.

This makes sure that the submission mode is passed to the session when the
session is created for writing messages.
  • Loading branch information
iht committed Sep 15, 2024
1 parent de617fa commit 0389117
Show file tree
Hide file tree
Showing 10 changed files with 69 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.solace.RetryCallableManager;
import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;

/**
Expand All @@ -47,12 +48,16 @@
public abstract class BasicAuthJcsmpSessionService extends SessionService {
/** The name of the queue to receive messages from. */
public abstract @Nullable String queueName();

/** The host name or IP address of the Solace broker. Format: Host[:Port] */
public abstract String host();

/** The username to use for authentication. */
public abstract String username();

/** The password to use for authentication. */
public abstract String password();

/** The name of the VPN to connect to. */
public abstract String vpnName();

Expand Down Expand Up @@ -114,11 +119,11 @@ public MessageReceiver getReceiver() {
}

@Override
public MessageProducer getProducer() {
public MessageProducer getProducer(SubmissionMode submissionMode) {
if (this.messageProducer == null || this.messageProducer.isClosed()) {
Callable<MessageProducer> create = () -> createXMLMessageProducer(submissionMode);
this.messageProducer =
retryCallableManager.retryCallable(
this::createXMLMessageProducer, ImmutableSet.of(JCSMPException.class));
retryCallableManager.retryCallable(create, ImmutableSet.of(JCSMPException.class));
}
return checkStateNotNull(this.messageProducer);
}
Expand All @@ -128,9 +133,10 @@ public boolean isClosed() {
return jcsmpSession == null || jcsmpSession.isClosed();
}

private MessageProducer createXMLMessageProducer() throws JCSMPException, IOException {
private MessageProducer createXMLMessageProducer(SubmissionMode submissionMode)
throws JCSMPException, IOException {
if (isClosed()) {
connectSession();
connectWriteSession(submissionMode);
}

@SuppressWarnings("nullness")
Expand Down Expand Up @@ -165,7 +171,8 @@ private MessageReceiver createFlowReceiver() throws JCSMPException, IOException
createFlowReceiver(jcsmpSession, flowProperties, endpointProperties));
}
throw new IOException(
"SolaceIO.Read: Could not create a receiver from the Jcsmp session: session object is null.");
"SolaceIO.Read: Could not create a receiver from the Jcsmp session: session object is"
+ " null.");
}

// The `@SuppressWarning` is needed here, because the checkerframework reports an error for the
Expand All @@ -188,11 +195,24 @@ private int connectSession() throws JCSMPException {
return 0;
}

private int connectWriteSession(SubmissionMode mode) throws JCSMPException {
if (jcsmpSession == null) {
jcsmpSession = createWriteSessionObject(mode);
}
jcsmpSession.connect();
return 0;
}

private JCSMPSession createSessionObject() throws InvalidPropertiesException {
JCSMPProperties properties = initializeSessionProperties(new JCSMPProperties());
return JCSMPFactory.onlyInstance().createSession(properties);
}

private JCSMPSession createWriteSessionObject(SubmissionMode mode)
throws InvalidPropertiesException {
return JCSMPFactory.onlyInstance().createSession(initializeWriteSessionProperties(mode));
}

@Override
public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProps) {
baseProps.setProperty(JCSMPProperties.VPN_NAME, vpnName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@
*/
@AutoValue
public abstract class BasicAuthJcsmpSessionServiceFactory extends SessionServiceFactory {
/** The host name or IP address of the Solace broker. Format: Host[:Port] */
public abstract String host();

/** The username to use for authentication. */
public abstract String username();

/** The password to use for authentication. */
public abstract String password();

/** The name of the VPN to connect to. */
public abstract String vpnName();

public static Builder builder() {
Expand All @@ -53,6 +57,7 @@ public abstract static class Builder {

/** Set Solace username. */
public abstract Builder username(String username);

/** Set Solace password. */
public abstract Builder password(String password);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.solacesystems.jcsmp.JCSMPProperties;
import java.io.Serializable;
import org.apache.beam.sdk.io.solace.SolaceIO;
import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -136,7 +137,7 @@ public abstract class SessionService implements Serializable {
* this method is used, the producer is created from the session instance, otherwise it returns
* the producer created initially.
*/
public abstract MessageProducer getProducer();
public abstract MessageProducer getProducer(SubmissionMode mode);

/**
* Override this method and provide your specific properties, including all those related to
Expand Down Expand Up @@ -272,7 +273,7 @@ private static JCSMPProperties overrideConnectorProperties(
LOG.info(
" SolaceIO.Write: Using the custom JCSMP properties set by the user. No property has"
+ " been overridden by the connector.");
break;
break;
case TESTING:
LOG.warn(
"SolaceIO.Write: Overriding JCSMP properties for testing. **IF THIS IS AN"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.solacesystems.jcsmp.Queue;
import java.io.Serializable;
import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
Expand Down Expand Up @@ -59,6 +60,12 @@ public abstract class SessionServiceFactory implements Serializable {
*/
@Nullable Queue queue;

/**
* The write submission mode. This is set when the writers are created. This property is used only
* by the write connector.
*/
@Nullable SubmissionMode submissionMode;

/**
* This is the core method that subclasses must implement. It defines how to construct and return
* a SessionService object.
Expand All @@ -78,6 +85,7 @@ public abstract class SessionServiceFactory implements Serializable {
*/
@Override
public abstract int hashCode();

/**
* This method is called in the {@link
* org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method
Expand All @@ -86,4 +94,15 @@ public abstract class SessionServiceFactory implements Serializable {
public void setQueue(Queue queue) {
this.queue = queue;
}

/**
* Called by the write connector to set the submission mode used to create the message producers.
*/
public void setSubmissionMode(SubmissionMode submissionMode) {
this.submissionMode = submissionMode;
}

public @Nullable SubmissionMode getSubmissionMode() {
return submissionMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package org.apache.beam.sdk.io.solace.write;

import static org.apache.beam.sdk.io.solace.SolaceIO.DEFAULT_WRITER_CLIENTS_PER_WORKER;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.google.auto.value.AutoValue;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
import org.apache.beam.sdk.io.solace.broker.SessionService;
import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory;

Expand Down Expand Up @@ -55,7 +57,11 @@ private static SessionService createSessionAndStartProducer(SessionConfiguration
SessionServiceFactory factory = key.sessionServiceFactory();
SessionService sessionService = factory.create();
// Start the producer now that the initialization is locked for other threads
sessionService.getProducer();
SubmissionMode mode = factory.getSubmissionMode();
checkStateNotNull(
mode,
"SolaceIO.Write: Submission mode is not set. You need to set it to create write sessions.");
sessionService.getProducer(mode);
return sessionService;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ private void publishBatch(List<Solace.Record> records) {
try {
int entriesPublished =
solaceSessionService()
.getProducer()
.getProducer(getSubmissionMode())
.publishBatch(
records, shouldPublishLatencyMetrics(), getDestinationFn(), getDeliveryMode());
sentToBroker.inc(entriesPublished);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ public UnboundedSolaceWriter(
boolean publishLatencyMetrics) {
this.destinationFn = destinationFn;
this.sessionServiceFactory = sessionServiceFactory;
// Make sure that we set the submission mode now that we know which mode has been set by the
// user.
this.sessionServiceFactory.setSubmissionMode(submissionMode);
this.deliveryMode = deliveryMode;
this.submissionMode = submissionMode;
this.producersMapCardinality = producersMapCardinality;
Expand Down Expand Up @@ -279,7 +282,7 @@ public DeliveryMode getDeliveryMode() {
return deliveryMode;
}

public SubmissionMode getDispatchMode() {
public SubmissionMode getSubmissionMode() {
return submissionMode;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void processElement(
// The publish method will retry, let's send a failure message if all the retries fail
try {
solaceSessionService()
.getProducer()
.getProducer(getSubmissionMode())
.publishSingleMessage(
record,
getDestinationFn().apply(record),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.auto.value.AutoValue;
import com.solacesystems.jcsmp.JCSMPProperties;
import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
import org.apache.beam.sdk.io.solace.broker.MessageProducer;
import org.apache.beam.sdk.io.solace.broker.MessageReceiver;
import org.apache.beam.sdk.io.solace.broker.SessionService;
Expand Down Expand Up @@ -48,7 +49,7 @@ public MessageReceiver getReceiver() {
}

@Override
public MessageProducer getProducer() {
public MessageProducer getProducer(SubmissionMode mode) {
throw new UnsupportedOperationException(exceptionMessage);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public MessageReceiver getReceiver() {
}

@Override
public MessageProducer getProducer() {
public MessageProducer getProducer(SubmissionMode mode) {
if (messageProducer == null) {
messageProducer = new MockProducer(new PublishResultHandler());
}
Expand Down

0 comments on commit 0389117

Please sign in to comment.