Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Solace Read connector: SolaceIO PTransform init #31594

Merged
merged 47 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
f404eec
wip solace connector
bzablocki Apr 16, 2024
00274a2
wip solace connector
bzablocki Apr 16, 2024
399ffcc
some checker errors resolved
bzablocki Apr 17, 2024
62dfa4a
all checker errors resolved
bzablocki Apr 17, 2024
ea202fb
improving unit tests
bzablocki Apr 18, 2024
260f645
respond to pr commments
bzablocki Apr 19, 2024
9057860
Documentation
bzablocki Apr 19, 2024
a74b6a6
Small refactor - move data classes out of the client
bzablocki Apr 22, 2024
552cab4
refactor
bzablocki Apr 23, 2024
5d847b3
Add github action for integration test of Solace
bzablocki Apr 23, 2024
23292a1
testing github workflow
bzablocki Apr 24, 2024
a69e10a
bump testcontainers to 1.19.7 - soalce semp was updated with an admin…
bzablocki Apr 24, 2024
f4b0d6c
Use FlowHandle to acknowledge messages to make SolaceCheckpointMark's…
bzablocki Apr 29, 2024
a1ca8b3
Handle StaleSessionException error
bzablocki May 8, 2024
8007a33
Add @Internal annotation to mark the SolaceIO API beta and subject to…
bzablocki May 8, 2024
cff3105
Improve documentation
bzablocki May 13, 2024
f3eaabe
Back to ack based on bytesxmlmessages. Deduplicate default to false.
bzablocki May 29, 2024
34c4170
Merge remote-tracking branch 'origin/master' into solace-connector
bzablocki May 29, 2024
7b24fdb
update changes.md with Solace read connector
bzablocki May 29, 2024
be74c86
remove ack by id code
bzablocki Jun 3, 2024
a85c68c
remove todo comment
bzablocki Jun 3, 2024
533f122
Add licenses to package-info.java files
bzablocki Jun 3, 2024
2f41380
Restructure documentation
bzablocki Jun 3, 2024
9a317a8
update aws test after upgrading testcontainers version.
bzablocki Jun 5, 2024
38b6f0c
Merge branch 'master' of github.com:bzablocki/beam into solace-connector
bzablocki Jun 5, 2024
cf5d24d
Merge branch 'solace-connector' of github.com:bzablocki/beam into sol…
bzablocki Jun 5, 2024
aa29100
Disable publishing docs until the first pass on the master branch
bzablocki Jun 6, 2024
94101a8
Merge branch 'master' of github.com:apache/beam into solace-connector
bzablocki Jun 6, 2024
c625069
Remove files from this branch to split PR into smaller chunks
bzablocki Jun 6, 2024
1521f40
refactor tests for readability
bzablocki Jun 6, 2024
b815286
revert upgrade of testcontainers - not needed in this PR chunk
bzablocki Jun 6, 2024
08b777c
revert upgrade of testcontainers - not needed in this PR chunk
bzablocki Jun 6, 2024
0f63749
spotless
bzablocki Jun 6, 2024
e90e69a
remove IT tests from this pr
bzablocki Jun 6, 2024
f6833d2
Tech Writer review
bzablocki Jun 7, 2024
df1eb6c
Add a field to Solace.Record mapped from BytesXMLMessage.getAttachmen…
bzablocki Jun 7, 2024
5015b32
Add and fix some documentation
bzablocki Jun 10, 2024
2e1c10e
Remove CheckpointMark's reference to the UnboundedSolaceReader - unne…
bzablocki Jun 10, 2024
4d02b99
Revert "Remove CheckpointMark's reference to the UnboundedSolaceReade…
bzablocki Jun 11, 2024
bee8faf
Solace project init - github workflow file, gradle module
bzablocki Jun 11, 2024
a5b29ed
Merge branch 'solace-connector-0-project-init' into solace-connector
bzablocki Jun 11, 2024
09368d6
Merge branch 'master' of github.com:apache/beam into solace-connector
bzablocki Jun 13, 2024
f2d284a
Splitting the #31476 - Leaving only PTransform AutoValue configurations
bzablocki Jun 13, 2024
9af3185
remove unnecessary dependencies
bzablocki Jun 13, 2024
60d854d
remove info from CHANGES.md
bzablocki Jun 13, 2024
4f65427
Update sdks/java/io/solace/build.gradle
Abacn Jun 13, 2024
c662360
Update sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solac…
Abacn Jun 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ class BeamModulePlugin implements Plugin<Project> {
def singlestore_jdbc_version = "1.1.4"
def slf4j_version = "1.7.30"
def snakeyaml_version = "2.2"
def solace_version = "10.21.0"
def spark2_version = "2.4.8"
def spark3_version = "3.2.2"
def spotbugs_version = "4.0.6"
Expand Down Expand Up @@ -877,6 +878,7 @@ class BeamModulePlugin implements Plugin<Project> {
slf4j_log4j12 : "org.slf4j:slf4j-log4j12:$slf4j_version",
slf4j_jcl : "org.slf4j:slf4j-jcl:$slf4j_version",
snappy_java : "org.xerial.snappy:snappy-java:1.1.10.4",
solace : "com.solacesystems:sol-jcsmp:$solace_version",
spark_core : "org.apache.spark:spark-core_2.11:$spark2_version",
spark_streaming : "org.apache.spark:spark-streaming_2.11:$spark2_version",
spark3_core : "org.apache.spark:spark-core_2.12:$spark3_version",
Expand Down
7 changes: 6 additions & 1 deletion sdks/java/io/solace/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,9 @@ ext.summary = """IO to read and write to Solace destinations (queues and topics)

dependencies {
implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
}
implementation library.java.vendored_guava_32_1_2_jre
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation library.java.joda_time
implementation library.java.solace
Abacn marked this conversation as resolved.
Show resolved Hide resolved
implementation library.java.vendored_guava_32_1_2_jre
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import com.google.auto.value.AutoValue;
import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.Queue;
import com.solacesystems.jcsmp.Topic;
import org.apache.beam.sdk.io.solace.broker.SempClientFactory;
import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory;
import org.apache.beam.sdk.io.solace.data.Solace;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;

public class SolaceIO {

private static final boolean DEFAULT_DEDUPLICATE_RECORDS = false;

/** Get a {@link Topic} object from the topic name. */
static Topic topicFromName(String topicName) {
return JCSMPFactory.onlyInstance().createTopic(topicName);
}

/** Get a {@link Queue} object from the queue name. */
static Queue queueFromName(String queueName) {
return JCSMPFactory.onlyInstance().createQueue(queueName);
}

@AutoValue
public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {

/** Set the queue name to read from. Use this or the `from(Topic)` method. */
public Read<T> from(Solace.Queue queue) {
return toBuilder().setQueue(queueFromName(queue.getName())).build();
}

/** Set the topic name to read from. Use this or the `from(Queue)` method. */
public Read<T> from(Solace.Topic topic) {
return toBuilder().setTopic(topicFromName(topic.getName())).build();
}

/**
* The timestamp function, used for estimating the watermark, mapping the record T to an {@link
* Instant}
*
* <p>Optional when using the no-arg {@link SolaceIO#read()} method. Defaults to {@link
* SolaceIO#SENDER_TIMESTAMP_FUNCTION}. When using the {@link SolaceIO#read(TypeDescriptor,
* SerializableFunction, SerializableFunction)} method, the function mapping from T to {@link
* Instant} has to be passed as an argument.
*/
public Read<T> withTimestampFn(SerializableFunction<T, Instant> timestampFn) {
checkState(
timestampFn != null,
"SolaceIO.Read: timestamp function must be set or use the"
+ " `Read.readSolaceRecords()` method");
return toBuilder().setTimestampFn(timestampFn).build();
}

/**
* Optional. Sets the maximum number of connections to the broker. The actual number of sessions
* is determined by this and the number set by the runner. If not set, the number of sessions is
* determined by the runner. The number of connections created follows this logic:
* `numberOfConnections = min(maxNumConnections, desiredNumberOfSplits)`, where the
* `desiredNumberOfSplits` is set by the runner.
*/
public Read<T> withMaxNumConnections(Integer maxNumConnections) {
return toBuilder().setMaxNumConnections(maxNumConnections).build();
}

/**
* Optional, default: false. Set to deduplicate messages based on the {@link
* BytesXMLMessage#getApplicationMessageId()} of the incoming {@link BytesXMLMessage}. If the
* field is null, then the {@link BytesXMLMessage#getReplicationGroupMessageId()} will be used,
* which is always set by Solace.
*/
public Read<T> withDeduplicateRecords(boolean deduplicateRecords) {
return toBuilder().setDeduplicateRecords(deduplicateRecords).build();
}

/**
* Set a factory that creates a {@link org.apache.beam.sdk.io.solace.broker.SempClientFactory}.
*
* <p>The factory `create()` method is invoked in each instance of an {@link
* org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader}. Created {@link
* org.apache.beam.sdk.io.solace.broker.SempClient} has to communicate with broker management
* API. It must support operations such as:
*
* <ul>
* <li>query for outstanding backlog bytes in a Queue,
* <li>query for metadata such as access-type of a Queue,
* <li>requesting creation of new Queues.
* </ul>
*
* <p>An existing implementation of the SempClientFactory includes {@link
* org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory} which implements connection
* to the SEMP with the Basic Authentication method.
*
* <p>To use it, specify the credentials with the builder methods.
*
* <p>The format of the host is `[Protocol://]Host[:Port]`
*
* <pre>{@code
* .withSempClientFactory(
* BasicAuthSempClientFactory.builder()
* .host("your-host-name-with-protocol") // e.g. "http://12.34.56.78:8080"
* .username("username")
* .password("password")
* .vpnName("vpn-name")
* .build())
* }</pre>
*/
public Read<T> withSempClientFactory(SempClientFactory sempClientFactory) {
checkState(sempClientFactory != null, "SolaceIO.Read: sempClientFactory must not be null.");
return toBuilder().setSempClientFactory(sempClientFactory).build();
}

/**
* Set a factory that creates a {@link SessionService}.
*
* <p>The factory `create()` method is invoked in each instance of an {@link
* org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader}. Created {@link SessionService} has
* to be able to:
*
* <ul>
* <li>initialize a connection with the broker,
* <li>check liveliness of the connection,
* <li>close the connection,
* <li>create a {@link org.apache.beam.sdk.io.solace.broker.MessageReceiver}.
* </ul>
*
* <p>An existing implementation of the SempClientFactory includes {@link
* org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionService} which implements the Basic
* Authentication to Solace. *
*
* <p>To use it, specify the credentials with the builder methods. *
*
* <p>The host is the IPv4 or IPv6 or host name of the appliance. IPv5 addresses must be encoded
Abacn marked this conversation as resolved.
Show resolved Hide resolved
* in brackets ([]). For example, "12.34.56.78", or "[fe80::1]". If connecting to a non-default
* port, it can be specified here using the "Host:Port" format. For example, "12.34.56.78:4444",
* or "[fe80::1]:4444".
*
* <pre>{@code
* BasicAuthJcsmpSessionServiceFactory.builder()
* .host("your-host-name")
* // e.g. "12.34.56.78", or "[fe80::1]", or "12.34.56.78:4444"
* .username("semp-username")
* .password("semp-password")
* .vpnName("vpn-name")
* .build()));
* }</pre>
*/
public Read<T> withSessionServiceFactory(SessionServiceFactory sessionServiceFactory) {
checkState(
sessionServiceFactory != null, "SolaceIO.Read: sessionServiceFactory must not be null.");
return toBuilder().setSessionServiceFactory(sessionServiceFactory).build();
}

abstract @Nullable Queue getQueue();

abstract @Nullable Topic getTopic();

abstract @Nullable SerializableFunction<T, Instant> getTimestampFn();

abstract @Nullable Integer getMaxNumConnections();

abstract boolean getDeduplicateRecords();

abstract SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> getParseFn();

abstract @Nullable SempClientFactory getSempClientFactory();

abstract @Nullable SessionServiceFactory getSessionServiceFactory();

abstract TypeDescriptor<T> getTypeDescriptor();

public static <T> Builder<T> builder() {
Builder<T> builder = new org.apache.beam.sdk.io.solace.AutoValue_SolaceIO_Read.Builder<T>();
builder.setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS);
return builder;
}

abstract Builder<T> toBuilder();

@AutoValue.Builder
public abstract static class Builder<T> {

abstract Builder<T> setQueue(Queue queue);

abstract Builder<T> setTopic(Topic topic);

abstract Builder<T> setTimestampFn(SerializableFunction<T, Instant> timestampFn);

abstract Builder<T> setMaxNumConnections(Integer maxNumConnections);

abstract Builder<T> setDeduplicateRecords(boolean deduplicateRecords);

abstract Builder<T> setParseFn(
SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn);

abstract Builder<T> setSempClientFactory(SempClientFactory brokerServiceFactory);

abstract Builder<T> setSessionServiceFactory(SessionServiceFactory sessionServiceFactory);

abstract Builder<T> setTypeDescriptor(TypeDescriptor<T> typeDescriptor);

abstract Read<T> build();
}

@Override
public PCollection<T> expand(PBegin input) {
throw new UnsupportedOperationException("");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.broker;

import java.io.Serializable;

/**
* This interface serves as a blueprint for creating SempClient objects, which are used to interact
* with a Solace message broker using the Solace Element Management Protocol (SEMP).
*/
public interface SempClientFactory extends Serializable {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.broker;

import java.io.Serializable;

/**
* This abstract class serves as a blueprint for creating `SessionService` objects. It introduces a
* queue property and mandates the implementation of a create() method in concrete subclasses.
*/
public abstract class SessionServiceFactory implements Serializable {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/** Solace IO broker-related classes. */
package org.apache.beam.sdk.io.solace.broker;
Loading
Loading