Skip to content

Commit

Permalink
Solace Read connector: adding Basic Authentication support (#31541)
Browse files Browse the repository at this point in the history
* Add support for BasicAuth to Solace

* Address PR comments

* Use `checkStateNotNull`
  • Loading branch information
bzablocki authored Jul 1, 2024
1 parent 957a7cd commit 3b8ddda
Show file tree
Hide file tree
Showing 3 changed files with 294 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.broker;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.solacesystems.jcsmp.ConsumerFlowProperties;
import com.solacesystems.jcsmp.EndpointProperties;
import com.solacesystems.jcsmp.FlowReceiver;
import com.solacesystems.jcsmp.InvalidPropertiesException;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.JCSMPProperties;
import com.solacesystems.jcsmp.JCSMPSession;
import com.solacesystems.jcsmp.Queue;
import java.io.IOException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.solace.RetryCallableManager;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;

/**
* A class that manages a connection to a Solace broker using basic authentication.
*
* <p>This class provides a way to connect to a Solace broker and receive messages from a queue. The
* connection is established using basic authentication.
*/
public class BasicAuthJcsmpSessionService implements SessionService {
private final String queueName;
private final String host;
private final String username;
private final String password;
private final String vpnName;
@Nullable private JCSMPSession jcsmpSession;
private final RetryCallableManager retryCallableManager = RetryCallableManager.create();

/**
* Creates a new {@link BasicAuthJcsmpSessionService} with the given parameters.
*
* @param queueName The name of the queue to receive messages from.
* @param host The host name or IP address of the Solace broker. Format: Host[:Port]
* @param username The username to use for authentication.
* @param password The password to use for authentication.
* @param vpnName The name of the VPN to connect to.
*/
public BasicAuthJcsmpSessionService(
String queueName, String host, String username, String password, String vpnName) {
this.queueName = queueName;
this.host = host;
this.username = username;
this.password = password;
this.vpnName = vpnName;
}

@Override
public void connect() {
retryCallableManager.retryCallable(this::connectSession, ImmutableSet.of(JCSMPException.class));
}

@Override
public void close() {
if (isClosed()) {
return;
}
retryCallableManager.retryCallable(
() -> {
checkStateNotNull(jcsmpSession).closeSession();
return 0;
},
ImmutableSet.of(IOException.class));
}

@Override
public MessageReceiver createReceiver() {
return retryCallableManager.retryCallable(
this::createFlowReceiver, ImmutableSet.of(JCSMPException.class));
}

@Override
public boolean isClosed() {
return jcsmpSession == null || jcsmpSession.isClosed();
}

private MessageReceiver createFlowReceiver() throws JCSMPException, IOException {
if (isClosed()) {
connectSession();
}

Queue queue = JCSMPFactory.onlyInstance().createQueue(queueName);

ConsumerFlowProperties flowProperties = new ConsumerFlowProperties();
flowProperties.setEndpoint(queue);
flowProperties.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT);

EndpointProperties endpointProperties = new EndpointProperties();
endpointProperties.setAccessType(EndpointProperties.ACCESSTYPE_NONEXCLUSIVE);
if (jcsmpSession != null) {
return new SolaceMessageReceiver(
createFlowReceiver(jcsmpSession, flowProperties, endpointProperties));
}
throw new IOException(
"SolaceIO.Read: Could not create a receiver from the Jcsmp session: session object is null.");
}

// The `@SuppressWarning` is needed here, because the checkerframework reports an error for the
// first argument of the `createFlow` being null, even though the documentation allows it:
// https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/com/solacesystems/jcsmp/JCSMPSession.html#createFlow-com.solacesystems.jcsmp.XMLMessageListener-com.solacesystems.jcsmp.ConsumerFlowProperties-com.solacesystems.jcsmp.EndpointProperties-
@SuppressWarnings("nullness")
private static FlowReceiver createFlowReceiver(
JCSMPSession jcsmpSession,
ConsumerFlowProperties flowProperties,
EndpointProperties endpointProperties)
throws JCSMPException {
return jcsmpSession.createFlow(null, flowProperties, endpointProperties);
}

private int connectSession() throws JCSMPException {
if (jcsmpSession == null) {
jcsmpSession = createSessionObject();
}
jcsmpSession.connect();
return 0;
}

private JCSMPSession createSessionObject() throws InvalidPropertiesException {
JCSMPProperties properties = new JCSMPProperties();
properties.setProperty(JCSMPProperties.HOST, host);
properties.setProperty(JCSMPProperties.USERNAME, username);
properties.setProperty(JCSMPProperties.PASSWORD, password);
properties.setProperty(JCSMPProperties.VPN_NAME, vpnName);

return JCSMPFactory.onlyInstance().createSession(properties);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.broker;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.google.auto.value.AutoValue;

/**
* A factory for creating {@link BasicAuthJcsmpSessionService} instances. Extends {@link
* SessionServiceFactory}.
*
* <p>This factory provides a way to create {@link BasicAuthJcsmpSessionService} instances with
* authenticate to Solace with Basic Authentication.
*/
@AutoValue
public abstract class BasicAuthJcsmpSessionServiceFactory extends SessionServiceFactory {
public abstract String host();

public abstract String username();

public abstract String password();

public abstract String vpnName();

public static Builder builder() {
return new AutoValue_BasicAuthJcsmpSessionServiceFactory.Builder();
}

@AutoValue.Builder
public abstract static class Builder {

/**
* Set Solace host, format: Host[:Port] e.g. "12.34.56.78", or "[fe80::1]", or
* "12.34.56.78:4444".
*/
public abstract Builder host(String host);

/** Set Solace username. */
public abstract Builder username(String username);
/** Set Solace password. */
public abstract Builder password(String password);

/** Set Solace vpn name. */
public abstract Builder vpnName(String vpnName);

public abstract BasicAuthJcsmpSessionServiceFactory build();
}

@Override
public SessionService create() {
return new BasicAuthJcsmpSessionService(
checkStateNotNull(queue, "SolaceIO.Read: Queue is not set.").getName(),
host(),
username(),
password(),
vpnName());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.broker;

import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.FlowReceiver;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.StaleSessionException;
import java.io.IOException;
import org.apache.beam.sdk.io.solace.RetryCallableManager;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SolaceMessageReceiver implements MessageReceiver {
private static final Logger LOG = LoggerFactory.getLogger(SolaceMessageReceiver.class);

public static final int DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS = 100;
private final FlowReceiver flowReceiver;
private final RetryCallableManager retryCallableManager = RetryCallableManager.create();

public SolaceMessageReceiver(FlowReceiver flowReceiver) {
this.flowReceiver = flowReceiver;
}

@Override
public void start() {
startFlowReceiver();
}

private void startFlowReceiver() {
retryCallableManager.retryCallable(
() -> {
flowReceiver.start();
return 0;
},
ImmutableSet.of(JCSMPException.class));
}

@Override
public boolean isClosed() {
return flowReceiver == null || flowReceiver.isClosed();
}

@Override
public BytesXMLMessage receive() throws IOException {
try {
return flowReceiver.receive(DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS);
} catch (StaleSessionException e) {
LOG.warn("SolaceIO: Caught StaleSessionException, restarting the FlowReceiver.");
startFlowReceiver();
throw new IOException(e);
} catch (JCSMPException e) {
throw new IOException(e);
}
}
}

0 comments on commit 3b8ddda

Please sign in to comment.