From ef885399c929d6763240ad59e150e31ae3aa1ea5 Mon Sep 17 00:00:00 2001 From: George Ma <111381964+georgecma@users.noreply.github.com> Date: Tue, 18 Jun 2024 12:22:01 -0400 Subject: [PATCH 1/3] CassandraIO: Fix generic SelectAll clauses provided by ValueProvider causing CassandraIO.read to fail (#31623) * Fix ValueProvider not expanding properly when query does not have 'WHERE' in it * Tweak test case 'WHERE' clause * Linted CassandraIO --- .../apache/beam/sdk/io/cassandra/ReadFn.java | 13 +-- .../sdk/io/cassandra/CassandraIOTest.java | 89 +++++++++++++++++++ 2 files changed, 96 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java index b8fff2297121..44e3808f53f8 100644 --- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java +++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java @@ -107,7 +107,7 @@ private static String getHighestSplitQuery( String finalHighQuery = (spec.query() == null) ? buildInitialQuery(spec, true) + highestClause - : spec.query() + " AND " + highestClause; + : spec.query() + getJoinerClause(spec.query().get()) + highestClause; LOG.debug("CassandraIO generated a wrapAround query : {}", finalHighQuery); return finalHighQuery; } @@ -117,7 +117,7 @@ private static String getLowestSplitQuery(Read spec, String partitionKey, Big String finalLowQuery = (spec.query() == null) ? buildInitialQuery(spec, true) + lowestClause - : spec.query() + " AND " + lowestClause; + : spec.query() + getJoinerClause(spec.query().get()) + lowestClause; LOG.debug("CassandraIO generated a wrapAround query : {}", finalLowQuery); return finalLowQuery; } @@ -141,9 +141,10 @@ private static String buildInitialQuery(Read spec, Boolean hasRingRange) { return (spec.query() == null) ? String.format("SELECT * FROM %s.%s", spec.keyspace().get(), spec.table().get()) + " WHERE " - : spec.query().get() - + (hasRingRange - ? spec.query().get().toUpperCase().contains("WHERE") ? " AND " : " WHERE " - : ""); + : spec.query().get() + (hasRingRange ? getJoinerClause(spec.query().get()) : ""); + } + + private static String getJoinerClause(String queryString) { + return queryString.toUpperCase().contains("WHERE") ? " AND " : " WHERE "; } } diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java index a7090d5c7bcc..747f803ea46b 100644 --- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java +++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java @@ -61,6 +61,7 @@ import javax.management.remote.JMXServiceURL; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.common.NetworkTestHelper; +import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; @@ -489,6 +490,94 @@ public void testReadWithQuery() throws Exception { pipeline.run(); } + /** + * Create a mock value provider class that tests how the query gets expanded in + * CassandraIO.ReadFn. + */ + static class MockQueryProvider implements ValueProvider { + private volatile String query; + + MockQueryProvider(String query) { + this.query = query; + } + + @Override + public String get() { + return query; + } + + @Override + public boolean isAccessible() { + return !query.isEmpty(); + } + } + + @Test + public void testReadWithQueryProvider() throws Exception { + String query = + String.format( + "select person_id, writetime(person_name) from %s.%s", + CASSANDRA_KEYSPACE, CASSANDRA_TABLE); + + PCollection output = + pipeline.apply( + CassandraIO.read() + .withHosts(Collections.singletonList(CASSANDRA_HOST)) + .withPort(cassandraPort) + .withKeyspace(CASSANDRA_KEYSPACE) + .withTable(CASSANDRA_TABLE) + .withMinNumberOfSplits(20) + .withQuery(new MockQueryProvider(query)) + .withCoder(SerializableCoder.of(Scientist.class)) + .withEntity(Scientist.class)); + + PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(NUM_ROWS); + PAssert.that(output) + .satisfies( + input -> { + for (Scientist sci : input) { + assertNull(sci.name); + assertTrue(sci.nameTs != null && sci.nameTs > 0); + } + return null; + }); + + pipeline.run(); + } + + @Test + public void testReadWithQueryProviderWithWhereQuery() throws Exception { + String query = + String.format( + "select person_id, writetime(person_name) from %s.%s where person_id=10 AND person_department='logic'", + CASSANDRA_KEYSPACE, CASSANDRA_TABLE); + + PCollection output = + pipeline.apply( + CassandraIO.read() + .withHosts(Collections.singletonList(CASSANDRA_HOST)) + .withPort(cassandraPort) + .withKeyspace(CASSANDRA_KEYSPACE) + .withTable(CASSANDRA_TABLE) + .withMinNumberOfSplits(20) + .withQuery(new MockQueryProvider(query)) + .withCoder(SerializableCoder.of(Scientist.class)) + .withEntity(Scientist.class)); + + PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(1L); + PAssert.that(output) + .satisfies( + input -> { + for (Scientist sci : input) { + assertNull(sci.name); + assertTrue(sci.nameTs != null && sci.nameTs > 0); + } + return null; + }); + + pipeline.run(); + } + @Test public void testReadWithUnfilteredQuery() throws Exception { String query = From 96b9de0fee9d8d86d37e86d73a6e90b891469493 Mon Sep 17 00:00:00 2001 From: Bartosz Zablocki Date: Tue, 18 Jun 2024 19:27:54 +0200 Subject: [PATCH 2/3] Solace Read connector: UnboundedSource and UnboundedReader (#31636) * Add interfaces for broker-helper classes * Add UnboundedSolaceReader and UnboundedSolaceSource --- sdks/java/io/solace/build.gradle | 3 + .../sdk/io/solace/broker/MessageReceiver.java | 59 ++++++ .../beam/sdk/io/solace/broker/SempClient.java | 49 +++++ .../io/solace/broker/SempClientFactory.java | 11 +- .../sdk/io/solace/broker/SessionService.java | 50 +++++ .../solace/broker/SessionServiceFactory.java | 9 +- .../io/solace/read/SolaceCheckpointMark.java | 6 +- .../io/solace/read/UnboundedSolaceReader.java | 191 ++++++++++++++++++ .../io/solace/read/UnboundedSolaceSource.java | 148 ++++++++++++++ 9 files changed, 523 insertions(+), 3 deletions(-) create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClient.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle index fbf096abd22f..b33b8fb18027 100644 --- a/sdks/java/io/solace/build.gradle +++ b/sdks/java/io/solace/build.gradle @@ -32,7 +32,10 @@ dependencies { implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) implementation library.java.vendored_guava_32_1_2_jre implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation library.java.slf4j_api implementation library.java.joda_time implementation library.java.solace implementation project(":sdks:java:extensions:avro") + implementation library.java.avro + permitUnusedDeclared library.java.avro } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java new file mode 100644 index 000000000000..199a83e322bd --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import com.solacesystems.jcsmp.BytesXMLMessage; +import java.io.IOException; + +/** + * Interface for receiving messages from a Solace broker. + * + *

Implementations of this interface are responsible for managing the connection to the broker + * and for receiving messages from the broker. + */ +public interface MessageReceiver { + /** + * Starts the message receiver. + * + *

This method is called in the {@link + * org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader#start()} method. + */ + void start(); + + /** + * Returns {@literal true} if the message receiver is closed, {@literal false} otherwise. + * + *

A message receiver is closed when it is no longer able to receive messages. + */ + boolean isClosed(); + + /** + * Receives a message from the broker. + * + *

This method will block until a message is received. + */ + BytesXMLMessage receive() throws IOException; + + /** + * Test clients may return {@literal true} to signal that all expected messages have been pulled + * and the test may complete. Real clients should always return {@literal false}. + */ + default boolean isEOF() { + return false; + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClient.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClient.java new file mode 100644 index 000000000000..465f37c14036 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClient.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import com.solacesystems.jcsmp.Queue; +import java.io.IOException; +import java.io.Serializable; + +/** + * This interface defines methods for interacting with a Solace message broker using the Solace + * Element Management Protocol (SEMP). SEMP provides a way to manage and monitor various aspects of + * the broker, including queues and topics. + */ +public interface SempClient extends Serializable { + + /** + * Determines if the specified queue is non-exclusive. In Solace, non-exclusive queues allow + * multiple consumers to receive messages from the queue. + */ + boolean isQueueNonExclusive(String queueName) throws IOException; + + /** + * This is only called when a user requests to read data from a topic. This method creates a new + * queue on the Solace broker and associates it with the specified topic. This ensures that + * messages published to the topic are delivered to the queue, allowing consumers to receive them. + */ + Queue createQueueForTopic(String queueName, String topicName) throws IOException; + + /** + * Retrieves the size of the backlog (in bytes) for the specified queue. The backlog represents + * the amount of data in messages that are waiting to be delivered to consumers. + */ + long getBacklogBytes(String queueName) throws IOException; +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java index b5cb53e14b39..79f690fde175 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java @@ -23,4 +23,13 @@ * This interface serves as a blueprint for creating SempClient objects, which are used to interact * with a Solace message broker using the Solace Element Management Protocol (SEMP). */ -public interface SempClientFactory extends Serializable {} +public interface SempClientFactory extends Serializable { + + /** + * This method is the core of the factory interface. It defines how to construct and return a + * SempClient object. Implementations of this interface will provide the specific logic for + * creating a client instance, which might involve connecting to the broker, handling + * authentication, and configuring other settings. + */ + SempClient create(); +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java new file mode 100644 index 000000000000..cd368865f0c3 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import java.io.Serializable; + +/** + * The SessionService interface provides a set of methods for managing a session with the Solace + * messaging system. It allows for establishing a connection, creating a message-receiver object, + * checking if the connection is closed or not, and gracefully closing the session. + */ +public interface SessionService extends Serializable { + + /** + * Establishes a connection to the service. This could involve providing connection details like + * host, port, VPN name, username, and password. + */ + void connect(); + + /** Gracefully closes the connection to the service. */ + void close(); + + /** + * Checks whether the connection to the service is currently closed. This method is called when an + * `UnboundedSolaceReader` is starting to read messages - a session will be created if this + * returns true. + */ + boolean isClosed(); + + /** + * Creates a MessageReceiver object for receiving messages from Solace. Typically, this object is + * created from the session instance. + */ + MessageReceiver createReceiver(); +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java index ab1f55ae7a9c..9b4ef99eba77 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java @@ -23,4 +23,11 @@ * This abstract class serves as a blueprint for creating `SessionService` objects. It introduces a * queue property and mandates the implementation of a create() method in concrete subclasses. */ -public abstract class SessionServiceFactory implements Serializable {} +public abstract class SessionServiceFactory implements Serializable { + + /** + * This is the core method that subclasses must implement. It defines how to construct and return + * a SessionService object. + */ + public abstract SessionService create(); +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java index f429df3f8cd1..77f6eed8f62c 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java @@ -23,9 +23,11 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.checkerframework.checker.nullness.qual.Nullable; /** @@ -33,7 +35,9 @@ * acknowledged. */ @DefaultCoder(AvroCoder.class) -class SolaceCheckpointMark implements UnboundedSource.CheckpointMark { +@Internal +@VisibleForTesting +public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark { private transient AtomicBoolean activeReader; // BytesXMLMessage is not serializable so if a job restarts from the checkpoint, we cannot retry // these messages here. We relay on Solace's retry mechanism. diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java new file mode 100644 index 000000000000..0155345a2323 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.read; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; + +import com.solacesystems.jcsmp.BytesXMLMessage; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; +import org.apache.beam.sdk.io.solace.broker.MessageReceiver; +import org.apache.beam.sdk.io.solace.broker.SempClient; +import org.apache.beam.sdk.io.solace.broker.SessionService; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Unbounded Reader to read messages from a Solace Router. */ +@VisibleForTesting +class UnboundedSolaceReader extends UnboundedReader { + + private static final Logger LOG = LoggerFactory.getLogger(UnboundedSolaceReader.class); + private final UnboundedSolaceSource currentSource; + private final WatermarkPolicy watermarkPolicy; + private final SempClient sempClient; + private @Nullable BytesXMLMessage solaceOriginalRecord; + private @Nullable T solaceMappedRecord; + private @Nullable MessageReceiver messageReceiver; + private @Nullable SessionService sessionService; + AtomicBoolean active = new AtomicBoolean(true); + + /** + * Queue to place advanced messages before {@link #getCheckpointMark()} be called non-concurrent + * queue, should only be accessed by the reader thread A given {@link UnboundedReader} object will + * only be accessed by a single thread at once. + */ + private final java.util.Queue elementsToCheckpoint = new ArrayDeque<>(); + + public UnboundedSolaceReader(UnboundedSolaceSource currentSource) { + this.currentSource = currentSource; + this.watermarkPolicy = WatermarkPolicy.create(currentSource.getTimestampFn()); + this.sessionService = currentSource.getSessionServiceFactory().create(); + this.sempClient = currentSource.getSempClientFactory().create(); + } + + @Override + public boolean start() { + populateSession(); + populateMessageConsumer(); + return advance(); + } + + public void populateSession() { + if (sessionService == null) { + sessionService = getCurrentSource().getSessionServiceFactory().create(); + } + if (sessionService.isClosed()) { + checkNotNull(sessionService).connect(); + } + } + + private void populateMessageConsumer() { + if (messageReceiver == null) { + messageReceiver = checkNotNull(sessionService).createReceiver(); + messageReceiver.start(); + } + MessageReceiver receiver = checkNotNull(messageReceiver); + if (receiver.isClosed()) { + receiver.start(); + } + } + + @Override + public boolean advance() { + BytesXMLMessage receivedXmlMessage; + try { + receivedXmlMessage = checkNotNull(messageReceiver).receive(); + } catch (IOException e) { + LOG.warn("SolaceIO.Read: Exception when pulling messages from the broker.", e); + return false; + } + + if (receivedXmlMessage == null) { + return false; + } + elementsToCheckpoint.add(receivedXmlMessage); + solaceOriginalRecord = receivedXmlMessage; + solaceMappedRecord = getCurrentSource().getParseFn().apply(receivedXmlMessage); + watermarkPolicy.update(solaceMappedRecord); + return true; + } + + @Override + public void close() { + active.set(false); + checkNotNull(sessionService).close(); + } + + @Override + public Instant getWatermark() { + // should be only used by a test receiver + if (checkNotNull(messageReceiver).isEOF()) { + return BoundedWindow.TIMESTAMP_MAX_VALUE; + } + return watermarkPolicy.getWatermark(); + } + + @Override + public UnboundedSource.CheckpointMark getCheckpointMark() { + List ackQueue = new ArrayList<>(); + while (!elementsToCheckpoint.isEmpty()) { + BytesXMLMessage msg = elementsToCheckpoint.poll(); + if (msg != null) { + ackQueue.add(msg); + } + } + return new SolaceCheckpointMark(active, ackQueue); + } + + @Override + public T getCurrent() throws NoSuchElementException { + if (solaceMappedRecord == null) { + throw new NoSuchElementException(); + } + return solaceMappedRecord; + } + + @Override + public byte[] getCurrentRecordId() throws NoSuchElementException { + if (solaceOriginalRecord == null) { + throw new NoSuchElementException(); + } + if (solaceOriginalRecord.getApplicationMessageId() != null) { + return checkNotNull(solaceOriginalRecord) + .getApplicationMessageId() + .getBytes(StandardCharsets.UTF_8); + } else { + return checkNotNull(solaceOriginalRecord) + .getReplicationGroupMessageId() + .toString() + .getBytes(StandardCharsets.UTF_8); + } + } + + @Override + public UnboundedSolaceSource getCurrentSource() { + return currentSource; + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + if (getCurrent() == null) { + throw new NoSuchElementException(); + } + return currentSource.getTimestampFn().apply(getCurrent()); + } + + @Override + public long getTotalBacklogBytes() { + try { + return sempClient.getBacklogBytes(currentSource.getQueue().getName()); + } catch (IOException e) { + LOG.warn("SolaceIO.Read: Could not query backlog bytes. Returning BACKLOG_UNKNOWN", e); + return BACKLOG_UNKNOWN; + } + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java new file mode 100644 index 000000000000..370159994941 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.read; + +import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.Queue; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.io.solace.broker.SempClientFactory; +import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Internal +public class UnboundedSolaceSource extends UnboundedSource { + private static final long serialVersionUID = 42L; + private static final Logger LOG = LoggerFactory.getLogger(UnboundedSolaceSource.class); + private final Queue queue; + private final @Nullable Integer maxNumConnections; + private final Coder coder; + private final boolean enableDeduplication; + private final SempClientFactory sempClientFactory; + private final SessionServiceFactory sessionServiceFactory; + private final SerializableFunction timestampFn; + private final SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn; + + public Queue getQueue() { + return queue; + } + + public SessionServiceFactory getSessionServiceFactory() { + return sessionServiceFactory; + } + + public SempClientFactory getSempClientFactory() { + return sempClientFactory; + } + + public SerializableFunction getTimestampFn() { + return timestampFn; + } + + public SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> getParseFn() { + return parseFn; + } + + public UnboundedSolaceSource( + Queue queue, + SempClientFactory sempClientFactory, + SessionServiceFactory sessionServiceFactory, + @Nullable Integer maxNumConnections, + boolean enableDeduplication, + Coder coder, + SerializableFunction timestampFn, + SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn) { + this.queue = queue; + this.sempClientFactory = sempClientFactory; + this.sessionServiceFactory = sessionServiceFactory; + this.maxNumConnections = maxNumConnections; + this.enableDeduplication = enableDeduplication; + this.coder = coder; + this.timestampFn = timestampFn; + this.parseFn = parseFn; + } + + @Override + public UnboundedReader createReader( + PipelineOptions options, @Nullable SolaceCheckpointMark checkpointMark) { + // it makes no sense to resume a Solace Session with the previous checkpoint + // so don't need the pass a checkpoint to new a Solace Reader + return new UnboundedSolaceReader<>(this); + } + + @Override + public List> split(int desiredNumSplits, PipelineOptions options) + throws IOException { + boolean queueNonExclusive = sempClientFactory.create().isQueueNonExclusive(queue.getName()); + if (queueNonExclusive) { + return getSolaceSources(desiredNumSplits, maxNumConnections); + } else { + LOG.warn("SolaceIO.Read: The queue {} is exclusive. Provisioning only 1 read client.", queue); + return getSolaceSources(desiredNumSplits, 1); + } + } + + private List> getSolaceSources( + int desiredNumSplits, @Nullable Integer maxNumConnections) { + List> sourceList = new ArrayList<>(); + int numSplits = + maxNumConnections != null + ? Math.min(desiredNumSplits, maxNumConnections) + : desiredNumSplits; + LOG.info("SolaceIO.Read: UnboundedSolaceSource: creating {} read connections.", numSplits); + for (int i = 0; i < numSplits; i++) { + UnboundedSolaceSource source = + new UnboundedSolaceSource<>( + queue, + sempClientFactory, + sessionServiceFactory, + maxNumConnections, + enableDeduplication, + coder, + timestampFn, + parseFn); + sourceList.add(source); + } + return sourceList; + } + + @Override + public Coder getCheckpointMarkCoder() { + return AvroCoder.of(SolaceCheckpointMark.class); + } + + @Override + public Coder getOutputCoder() { + return coder; + } + + @Override + public boolean requiresDeduping() { + return enableDeduplication; + } +} From 2b9078506ca28c3c0ceef8e8aca554c16001dccd Mon Sep 17 00:00:00 2001 From: Bartosz Zablocki Date: Tue, 18 Jun 2024 19:28:34 +0200 Subject: [PATCH 3/3] Add interfaces for broker-helper classes (#31635)