From 17b71e39f0ceeb250fbd3a2ef73e97fdaed80158 Mon Sep 17 00:00:00 2001 From: Vitaly Terentyev Date: Thu, 13 Jun 2024 17:23:37 +0400 Subject: [PATCH 01/12] Support Row fields with type ITERABLE in Beam SQL Extension (#31588) --- .../extensions/sql/impl/rel/BeamCalcRel.java | 5 ++++ .../sdk/extensions/sql/BeamSqlDslBase.java | 23 +++++++++++++------ 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java index d647027a9ae5..4895c1478766 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java @@ -395,6 +395,7 @@ static Object toBeamObject(Object value, FieldType fieldType, boolean verifyValu } return ((ByteString) value).getBytes(); case ARRAY: + case ITERABLE: return toBeamList((List) value, fieldType.getCollectionElementType(), verifyValues); case MAP: return toBeamMap( @@ -558,6 +559,9 @@ private static Expression getBeamField( case ROW: value = Expressions.call(expression, "getRow", fieldName); break; + case ITERABLE: + value = Expressions.call(expression, "getIterable", fieldName); + break; case LOGICAL_TYPE: String identifier = fieldType.getLogicalType().getIdentifier(); if (FixedString.IDENTIFIER.equals(identifier) @@ -634,6 +638,7 @@ private static Expression toCalciteValue(Expression value, FieldType fieldType) return nullOr( value, Expressions.new_(ByteString.class, Expressions.convert_(value, byte[].class))); case ARRAY: + case ITERABLE: return nullOr(value, toCalciteList(value, fieldType.getCollectionElementType())); case MAP: return nullOr(value, toCalciteMap(value, fieldType.getMapValueType())); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java index 426b95ae6df6..68b672e1814c 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java @@ -38,6 +38,7 @@ import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; @@ -94,6 +95,7 @@ public static void prepareClass() throws ParseException { .addDateTimeField("f_timestamp") .addInt32Field("f_int2") .addDecimalField("f_decimal") + .addIterableField("f_iterable", FieldType.STRING) .build(); rowsInTableA = @@ -111,7 +113,8 @@ public static void prepareClass() throws ParseException { LocalDateTime.of(2017, 1, 1, 1, 1, 3), parseTimestampWithoutTimeZone("2017-01-01 01:01:03"), 0, - new BigDecimal(1)) + new BigDecimal(1), + Lists.newArrayList("s1", "s2")) .addRows( 2, 2000L, @@ -125,7 +128,8 @@ public static void prepareClass() throws ParseException { LocalDateTime.of(2017, 1, 1, 1, 2, 3), parseTimestampWithoutTimeZone("2017-01-01 01:02:03"), 0, - new BigDecimal(2)) + new BigDecimal(2), + Lists.newArrayList("s1", "s2")) .addRows( 3, 3000L, @@ -139,7 +143,8 @@ public static void prepareClass() throws ParseException { LocalDateTime.of(2017, 1, 1, 1, 6, 3), parseTimestampWithoutTimeZone("2017-01-01 01:06:03"), 0, - new BigDecimal(3)) + new BigDecimal(3), + Lists.newArrayList("s1", "s2")) .addRows( 4, 4000L, @@ -153,7 +158,8 @@ public static void prepareClass() throws ParseException { LocalDateTime.of(2017, 1, 1, 2, 4, 3), parseTimestampWithoutTimeZone("2017-01-01 02:04:03"), 0, - new BigDecimal(4)) + new BigDecimal(4), + Lists.newArrayList("s1", "s2")) .getRows(); monthlyRowsInTableA = @@ -171,7 +177,8 @@ public static void prepareClass() throws ParseException { LocalDateTime.of(2017, 1, 1, 1, 1, 3), parseTimestampWithUTCTimeZone("2017-01-01 01:01:03"), 0, - new BigDecimal(1)) + new BigDecimal(1), + Lists.newArrayList("s1", "s2")) .addRows( 2, 2000L, @@ -185,7 +192,8 @@ public static void prepareClass() throws ParseException { LocalDateTime.of(2017, 1, 1, 1, 2, 3), parseTimestampWithUTCTimeZone("2017-02-01 01:02:03"), 0, - new BigDecimal(2)) + new BigDecimal(2), + Lists.newArrayList("s1", "s2")) .addRows( 3, 3000L, @@ -199,7 +207,8 @@ public static void prepareClass() throws ParseException { LocalDateTime.of(2017, 1, 1, 1, 6, 3), parseTimestampWithUTCTimeZone("2017-03-01 01:06:03"), 0, - new BigDecimal(3)) + new BigDecimal(3), + Lists.newArrayList("s1", "s2")) .getRows(); schemaFloatDouble = From 814cf43140fd31139de7faf1a3f00694df53cb18 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 13 Jun 2024 15:30:34 +0200 Subject: [PATCH 02/12] Call out enrichment in side input patterns (#31578) --- .../www/site/content/en/documentation/patterns/side-inputs.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/website/www/site/content/en/documentation/patterns/side-inputs.md b/website/www/site/content/en/documentation/patterns/side-inputs.md index 814a735bf724..136eeef29ada 100644 --- a/website/www/site/content/en/documentation/patterns/side-inputs.md +++ b/website/www/site/content/en/documentation/patterns/side-inputs.md @@ -19,6 +19,8 @@ limitations under the License. The samples on this page show you common Beam side input patterns. A side input is an additional input that your `DoFn` can access each time it processes an element in the input `PCollection`. For more information, see the [programming guide section on side inputs](/documentation/programming-guide/#side-inputs). +If you are trying to enrich your data by doing a key-value lookup to a remote service, you may first want to consider the [Enrichment transform](https://beam.apache.org/documentation/transforms/python/elementwise/enrichment/) which can abstract away some of the details of side inputs and provide additional benefits like client-side throttling. + {{< language-switcher java py >}} ## Slowly updating global window side inputs From fd5b1de4f1daaa1c3fb930593f9b27062412d38b Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Thu, 13 Jun 2024 10:46:45 -0400 Subject: [PATCH 03/12] Fix StreamConstraintsException introduced in jackson 2.15 (#31580) * Fix StreamConstraintsException introduced in jackson 2.15 * Fix spotless * Fix checkstyle * Add changes.md --- CHANGES.md | 3 ++ .../apache/beam/sdk/util/RowJsonUtils.java | 32 +++++++++++++++++++ .../io/gcp/bigquery/TableRowJsonCoder.java | 24 +++++++++----- .../gcp/bigquery/TableRowJsonCoderTest.java | 8 +++++ 4 files changed, 59 insertions(+), 8 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index a93e7089e7b7..f970a4087887 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -133,6 +133,9 @@ * Running a 2.57.0+ remote SDK pipeline containing a pre-2.57.0 Java SchemaTransform * All direct uses of Python's [SchemaAwareExternalTransform](https://github.com/apache/beam/blob/a998107a1f5c3050821eef6a5ad5843d8adb8aec/sdks/python/apache_beam/transforms/external.py#L381) should be updated to use new snake_case parameter names. +* Upgraded Jackson Databind to 2.15.4 (Java) ([#26743](https://github.com/apache/beam/issues/26743)). + jackson-2.15 has known breaking changes. An important one is it imposed a buffer limit for parser. + If your custom PTransform/DoFn are affected, refer to [#31580](https://github.com/apache/beam/pull/31580) for mitigation. # [2.56.0] - 2024-05-01 diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java index 6538a1459290..408143fb1ebe 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java @@ -34,6 +34,38 @@ @Internal public class RowJsonUtils { + // + private static int defaultBufferLimit; + + /** + * Increase the default jackson-databind stream read constraint. + * + *

StreamReadConstraints was introduced in jackson 2.15 causing string > 20MB (5MB in 2.15.0) + * parsing failure. This has caused regressions in its dependencies include Beam. Here we + * overwrite the default buffer size limit to 100 MB, and exposes this interface for higher limit. + * If needed, call this method during pipeline run time, e.g. in DoFn.setup. + */ + public static void increaseDefaultStreamReadConstraints(int newLimit) { + if (newLimit <= defaultBufferLimit) { + return; + } + try { + Class unused = Class.forName("com.fasterxml.jackson.core.StreamReadConstraints"); + + com.fasterxml.jackson.core.StreamReadConstraints.overrideDefaultStreamReadConstraints( + com.fasterxml.jackson.core.StreamReadConstraints.builder() + .maxStringLength(newLimit) + .build()); + } catch (ClassNotFoundException e) { + // <2.15, do nothing + } + defaultBufferLimit = newLimit; + } + + static { + increaseDefaultStreamReadConstraints(100 * 1024 * 1024); + } + public static ObjectMapper newObjectMapperWith(RowJson.RowJsonDeserializer deserializer) { SimpleModule module = new SimpleModule("rowDeserializationModule"); module.addDeserializer(Row.class, deserializer); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java index 9b80c0b552b6..8cf3eeb479c0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.util.RowJsonUtils; import org.apache.beam.sdk.values.TypeDescriptor; /** A {@link Coder} that encodes BigQuery {@link TableRow} objects in their native JSON format. */ @@ -69,15 +70,22 @@ public long getEncodedElementByteSize(TableRow value) throws Exception { // FAIL_ON_EMPTY_BEANS is disabled in order to handle null values in // TableRow. - private static final ObjectMapper MAPPER = - new ObjectMapper() - .registerModule(new JavaTimeModule()) - .registerModule(new JodaModule()) - .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS) - .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); + private static final ObjectMapper MAPPER;; + private static final TableRowJsonCoder INSTANCE; + private static final TypeDescriptor TYPE_DESCRIPTOR; - private static final TableRowJsonCoder INSTANCE = new TableRowJsonCoder(); - private static final TypeDescriptor TYPE_DESCRIPTOR = new TypeDescriptor() {}; + static { + RowJsonUtils.increaseDefaultStreamReadConstraints(100 * 1024 * 1024); + + MAPPER = + new ObjectMapper() + .registerModule(new JavaTimeModule()) + .registerModule(new JodaModule()) + .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS) + .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); + INSTANCE = new TableRowJsonCoder(); + TYPE_DESCRIPTOR = new TypeDescriptor() {}; + } private TableRowJsonCoder() {} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoderTest.java index b09832085a7d..9e767be48d81 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoderTest.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.commons.lang3.StringUtils; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -67,6 +68,13 @@ public void testDecodeEncodeEqual() throws Exception { } } + @Test + public void testLargeRow() throws Exception { + String val = StringUtils.repeat("BEAM", 10 * 1024 * 1024); // 40 MB + TableRow testValue = new TableRowBuilder().set("a", val).set("b", "1").build(); + CoderProperties.coderDecodeEncodeEqual(TEST_CODER, testValue); + } + /** * Generated data to check that the wire format has not changed. To regenerate, see {@link * org.apache.beam.sdk.coders.PrintBase64Encodings}. From af31d35a0007648b0fc44d0bcb1d6f94cb88d706 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 13 Jun 2024 14:46:36 -0400 Subject: [PATCH 04/12] Implement ordered list state for FnApi. (#30317) * Add request and response proto messages for ordered list state. * Initial implementation of OrderedListState for fnApi. * Discard the use of the value coder in FakeBeamFnStateClient. * Fix the behavior of pre-existing iterators on local change of state If there are changes on a state after we obtain iterators from calling read() and readRange(), the behavior of these pre-existing iterators were incorrect in the previous implementation. The change introduced here will make sure that these iterators will still work as if no local change is made. * Support continuation token for ordered list get request in fake client * Add copyright notices to the new files * Add binding for ordered list state in fnapi state accessor * Clean up comments * Apply spotless and checkStyle to reformat * Add an encode-only coder for the use in the fake client. * Remove request and response messages for ordered list state get. * The range information is placed in the state key of ordered list * For consistency, we reuse the existing get request and response mesasages of other states like Bag, MultiMap, etc. * Remove request and response messages for ordered list state update * Reuse existing messages of clear and append. * Minor fixes based on feedbacks from reviewers * Replace String::size() > 0 with String::isEmpty() * Return this in readLater and readRangeLater instead of throwing an exception * Remove the added SupressWarnings("unchecked") * Apply spotless * Use data field in AppendRequest for ordered list state Previously, we used a repeated OrderedListEntry field in the AppendRequest particularly for ordered list state. For consistency, we now get rid of that and use the same data field as other states. * Apply spotless * Minor renaming of a variable * Create a new coder for TimestampedValue according to the notes in proto. * Address feedback from the reviewer - Add a test to cover the case when an add/clear operation happens while we are partway through an existing iterable. - Use clear() instead of clearRange(min, max) when we can. - Fix a typo. * Apply spotless * Add urn for ordered list state. * Add ordered list spec to ParDoTranslation. * Fix an edge case when async called after clear. Minor fix based on reviwer comments. * Refactor some variable names. Add a notes on the order of pendingAdds and pendingRemoves during async_close() --- .../model/pipeline/v1/beam_runner_api.proto | 6 +- .../util/construction/ParDoTranslation.java | 12 +- .../construction/ParDoTranslationTest.java | 4 + .../fn/harness/state/FnApiStateAccessor.java | 96 ++- .../harness/state/OrderedListUserState.java | 329 +++++++++ .../harness/state/StateFetchingIterators.java | 7 +- .../harness/state/FakeBeamFnStateClient.java | 205 +++++- .../state/OrderedListUserStateTest.java | 684 ++++++++++++++++++ 8 files changed, 1314 insertions(+), 29 deletions(-) create mode 100644 sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java create mode 100644 sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/OrderedListUserStateTest.java diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto index 4672c98fd073..422c2e1a5f7c 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto @@ -479,7 +479,11 @@ message StandardUserStateTypes { // StateKey.MultimapKeysUserState or StateKey.MultimapUserState. MULTIMAP = 1 [(beam_urn) = "beam:user_state:multimap:v1"]; - // TODO(https://github.com/apache/beam/issues/20486): Add protocol to support OrderedListState + // Represents a user state specification that supports an ordered list. + // + // StateRequests performed on this user state must use + // StateKey.OrderedListUserState. + ORDERED_LIST = 2 [(beam_urn) = "beam:user_state:ordered_list:v1"]; } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ParDoTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ParDoTranslation.java index 23906c733ae3..c873a2a7b860 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ParDoTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ParDoTranslation.java @@ -119,6 +119,8 @@ public class ParDoTranslation { public static final String BAG_USER_STATE = "beam:user_state:bag:v1"; /** Represents a user state specification that supports a multimap. */ public static final String MULTIMAP_USER_STATE = "beam:user_state:multimap:v1"; + /** Represents a user state specification that supports an ordered list. */ + public static final String ORDERED_LIST_USER_STATE = "beam:user_state:ordered_list:v1"; static { checkState( @@ -141,6 +143,8 @@ public class ParDoTranslation { BeamUrns.getUrn(StandardRequirements.Enum.REQUIRES_ON_WINDOW_EXPIRATION))); checkState(BAG_USER_STATE.equals(BeamUrns.getUrn(StandardUserStateTypes.Enum.BAG))); checkState(MULTIMAP_USER_STATE.equals(BeamUrns.getUrn(StandardUserStateTypes.Enum.MULTIMAP))); + checkState( + ORDERED_LIST_USER_STATE.equals(BeamUrns.getUrn(StandardUserStateTypes.Enum.ORDERED_LIST))); } /** The URN for an unknown Java {@link DoFn}. */ @@ -601,9 +605,7 @@ public RunnerApi.StateSpec dispatchOrderedList(Coder elementCoder) { .setOrderedListSpec( RunnerApi.OrderedListStateSpec.newBuilder() .setElementCoderId(registerCoderOrThrow(components, elementCoder))) - // TODO(https://github.com/apache/beam/issues/20486): Update with correct protocol - // once the protocol is defined and - // the SDK harness uses it. + .setProtocol(FunctionSpec.newBuilder().setUrn(ORDERED_LIST_USER_STATE)) .build(); } @@ -694,6 +696,10 @@ static StateSpec fromProto(RunnerApi.StateSpec stateSpec, RehydratedComponent case SET_SPEC: return StateSpecs.set(components.getCoder(stateSpec.getSetSpec().getElementCoderId())); + case ORDERED_LIST_SPEC: + return StateSpecs.orderedList( + components.getCoder(stateSpec.getOrderedListSpec().getElementCoderId())); + case SPEC_NOT_SET: default: throw new IllegalArgumentException( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/ParDoTranslationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/ParDoTranslationTest.java index fa308163ed3a..6ef836038196 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/ParDoTranslationTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/ParDoTranslationTest.java @@ -245,6 +245,10 @@ public static Iterable stateSpecs() { { StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of()), FunctionSpec.newBuilder().setUrn(ParDoTranslation.MULTIMAP_USER_STATE).build() + }, + { + StateSpecs.orderedList(VarIntCoder.of()), + FunctionSpec.newBuilder().setUrn(ParDoTranslation.ORDERED_LIST_USER_STATE).build() } }); } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java index cfc247039503..93f89301d158 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.BagState; import org.apache.beam.sdk.state.CombiningState; +import org.apache.beam.sdk.state.GroupingState; import org.apache.beam.sdk.state.MapState; import org.apache.beam.sdk.state.MultimapState; import org.apache.beam.sdk.state.OrderedListState; @@ -63,12 +64,14 @@ import org.apache.beam.sdk.util.CombineFnUtil; import org.apache.beam.sdk.util.construction.BeamUrns; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; /** Provides access to side inputs and state via a {@link BeamFnStateClient}. */ @SuppressWarnings({ @@ -600,8 +603,73 @@ public MultimapState bindMultimap( @Override public OrderedListState bindOrderedList( String id, StateSpec> spec, Coder elemCoder) { - throw new UnsupportedOperationException( - "TODO: Add support for a sorted-list state to the Fn API."); + return (OrderedListState) + stateKeyObjectCache.computeIfAbsent( + createOrderedListUserStateKey(id), + new Function() { + @Override + public Object apply(StateKey key) { + return new OrderedListState() { + private final OrderedListUserState impl = + createOrderedListUserState(key, elemCoder); + + @Override + public void clear() { + impl.clear(); + } + + @Override + public void add(TimestampedValue value) { + impl.add(value); + } + + @Override + public ReadableState isEmpty() { + return new ReadableState() { + @Override + public @Nullable Boolean read() { + return !impl.read().iterator().hasNext(); + } + + @Override + public ReadableState readLater() { + return this; + } + }; + } + + @Nullable + @Override + public Iterable> read() { + return readRange( + Instant.ofEpochMilli(Long.MIN_VALUE), Instant.ofEpochMilli(Long.MAX_VALUE)); + } + + @Override + public GroupingState, Iterable>> + readLater() { + return this; + } + + @Override + public Iterable> readRange( + Instant minTimestamp, Instant limitTimestamp) { + return impl.readRange(minTimestamp, limitTimestamp); + } + + @Override + public void clearRange(Instant minTimestamp, Instant limitTimestamp) { + impl.clearRange(minTimestamp, limitTimestamp); + } + + @Override + public OrderedListState readRangeLater( + Instant minTimestamp, Instant limitTimestamp) { + return this; + } + }; + } + }); } @Override @@ -849,6 +917,30 @@ private StateKey createMultimapKeysUserStateKey(String stateId) { return builder.build(); } + private OrderedListUserState createOrderedListUserState( + StateKey stateKey, Coder valueCoder) { + OrderedListUserState rval = + new OrderedListUserState<>( + getCacheFor(stateKey), + beamFnStateClient, + processBundleInstructionId.get(), + stateKey, + valueCoder); + stateFinalizers.add(rval::asyncClose); + return rval; + } + + private StateKey createOrderedListUserStateKey(String stateId) { + StateKey.Builder builder = StateKey.newBuilder(); + builder + .getOrderedListUserStateBuilder() + .setWindow(encodedCurrentWindowSupplier.get()) + .setKey(encodedCurrentKeySupplier.get()) + .setTransformId(ptransformId) + .setUserStateId(stateId); + return builder.build(); + } + public void finalizeState() { // Persist all dirty state cells try { diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java new file mode 100644 index 000000000000..47b5057880b9 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.fn.harness.state; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.concurrent.CompletableFuture; +import org.apache.beam.fn.harness.Cache; +import org.apache.beam.fn.harness.Caches; +import org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.LengthPrefixCoder; +import org.apache.beam.sdk.coders.StructuredCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.fn.stream.PrefetchableIterable; +import org.apache.beam.sdk.fn.stream.PrefetchableIterables; +import org.apache.beam.sdk.util.ByteStringOutputStream; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeParameter; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BoundType; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Range; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.TreeRangeSet; +import org.joda.time.Instant; + +/** + * An implementation of an ordered list user state that utilizes the Beam Fn State API to fetch, + * clear and persist values. + * + *

Calling {@link #asyncClose()} schedules any required persistence changes. This object should + * no longer be used after it is closed. + * + *

TODO: Move to an async persist model where persistence is signalled based upon cache memory + * pressure and its need to flush. + */ +public class OrderedListUserState { + private final BeamFnStateClient beamFnStateClient; + private final StateRequest requestTemplate; + private final TimestampedValueCoder timestampedValueCoder; + // Pending updates to persistent storage + // (a) The elements in pendingAdds are the ones that should be added to the persistent storage + // during the next async_close(). It doesn't include the ones that are removed by + // clear_range() or clear() after the last add. + // (b) The elements in pendingRemoves are the sort keys that should be removed from the persistent + // storage. + // (c) When syncing local copy with persistent storage, pendingRemoves are performed first and + // then pendingAdds. Switching this order may result in wrong results, because a value added + // later could be removed from an earlier clear. + private NavigableMap> pendingAdds = Maps.newTreeMap(); + private TreeRangeSet pendingRemoves = TreeRangeSet.create(); + + private boolean isCleared = false; + private boolean isClosed = false; + + public static class TimestampedValueCoder extends StructuredCoder> { + + private final Coder valueCoder; + + // Internally, a TimestampedValue is encoded with a KvCoder, where the key is encoded with + // a VarLongCoder and the value is encoded with a LengthPrefixCoder. + // Refer to the comment in StateAppendRequest + // (org/apache/beam/model/fn_execution/v1/beam_fn_api.proto) for more detail. + private final KvCoder internalKvCoder; + + public static OrderedListUserState.TimestampedValueCoder of(Coder valueCoder) { + return new OrderedListUserState.TimestampedValueCoder<>(valueCoder); + } + + @Override + public Object structuralValue(TimestampedValue value) { + Object structuralValue = valueCoder.structuralValue(value.getValue()); + return TimestampedValue.of(structuralValue, value.getTimestamp()); + } + + @SuppressWarnings("unchecked") + TimestampedValueCoder(Coder valueCoder) { + this.valueCoder = checkNotNull(valueCoder); + this.internalKvCoder = KvCoder.of(VarLongCoder.of(), LengthPrefixCoder.of(valueCoder)); + } + + @Override + public void encode(TimestampedValue timestampedValue, OutputStream outStream) + throws IOException { + internalKvCoder.encode( + KV.of(timestampedValue.getTimestamp().getMillis(), timestampedValue.getValue()), + outStream); + } + + @Override + public TimestampedValue decode(InputStream inStream) throws IOException { + KV kv = internalKvCoder.decode(inStream); + return TimestampedValue.of(kv.getValue(), Instant.ofEpochMilli(kv.getKey())); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + verifyDeterministic( + this, "TimestampedValueCoder requires a deterministic valueCoder", valueCoder); + } + + @Override + public List> getCoderArguments() { + return Arrays.>asList(valueCoder); + } + + public Coder getValueCoder() { + return valueCoder; + } + + @Override + public TypeDescriptor> getEncodedTypeDescriptor() { + return new TypeDescriptor>() {}.where( + new TypeParameter() {}, valueCoder.getEncodedTypeDescriptor()); + } + + @Override + public List> getComponents() { + return Collections.singletonList(valueCoder); + } + } + + public OrderedListUserState( + Cache cache, + BeamFnStateClient beamFnStateClient, + String instructionId, + StateKey stateKey, + Coder valueCoder) { + checkArgument( + stateKey.hasOrderedListUserState(), + "Expected OrderedListUserState StateKey but received %s.", + stateKey); + this.beamFnStateClient = beamFnStateClient; + this.timestampedValueCoder = TimestampedValueCoder.of(valueCoder); + this.requestTemplate = + StateRequest.newBuilder().setInstructionId(instructionId).setStateKey(stateKey).build(); + } + + public void add(TimestampedValue value) { + checkState( + !isClosed, + "OrderedList user state is no longer usable because it is closed for %s", + requestTemplate.getStateKey()); + Instant timestamp = value.getTimestamp(); + pendingAdds.putIfAbsent(timestamp, new ArrayList<>()); + pendingAdds.get(timestamp).add(value.getValue()); + } + + public Iterable> readRange(Instant minTimestamp, Instant limitTimestamp) { + checkState( + !isClosed, + "OrderedList user state is no longer usable because it is closed for %s", + requestTemplate.getStateKey()); + + // Store pendingAdds whose sort key is in the query range and values are truncated by the + // current size. The values (collections) of pendingAdds are kept, so that they will still be + // accessible in pre-existing iterables even after: + // (1) a sort key is added to or removed from pendingAdds, or + // (2) a new value is added to an existing sort key + ArrayList>> pendingAddsInRange = new ArrayList<>(); + for (Entry> kv : + pendingAdds.subMap(minTimestamp, limitTimestamp).entrySet()) { + pendingAddsInRange.add( + PrefetchableIterables.limit( + Iterables.transform(kv.getValue(), (v) -> TimestampedValue.of(v, kv.getKey())), + kv.getValue().size())); + } + Iterable> valuesInRange = Iterables.concat(pendingAddsInRange); + + if (!isCleared) { + StateRequest.Builder getRequestBuilder = this.requestTemplate.toBuilder(); + getRequestBuilder + .getStateKeyBuilder() + .getOrderedListUserStateBuilder() + .getRangeBuilder() + .setStart(minTimestamp.getMillis()) + .setEnd(limitTimestamp.getMillis()); + + // TODO: consider use cache here + CachingStateIterable> persistentValues = + StateFetchingIterators.readAllAndDecodeStartingFrom( + Caches.noop(), + this.beamFnStateClient, + getRequestBuilder.build(), + this.timestampedValueCoder); + + // Make a snapshot of the current pendingRemoves and use them to filter persistent values. + // The values of pendingRemoves are copied, so that they will still be accessible in + // pre-existing iterables even after a sort key is removed. + TreeRangeSet pendingRemovesSnapshot = TreeRangeSet.create(pendingRemoves); + Iterable> persistentValuesAfterRemoval = + Iterables.filter( + persistentValues, v -> !pendingRemovesSnapshot.contains(v.getTimestamp())); + + return Iterables.mergeSorted( + ImmutableList.of(persistentValuesAfterRemoval, valuesInRange), + Comparator.comparing(TimestampedValue::getTimestamp)); + } + + return valuesInRange; + } + + public Iterable> read() { + checkState( + !isClosed, + "OrderedList user state is no longer usable because it is closed for %s", + requestTemplate.getStateKey()); + + return readRange(Instant.ofEpochMilli(Long.MIN_VALUE), Instant.ofEpochMilli(Long.MAX_VALUE)); + } + + public void clearRange(Instant minTimestamp, Instant limitTimestamp) { + checkState( + !isClosed, + "OrderedList user state is no longer usable because it is closed for %s", + requestTemplate.getStateKey()); + + // Remove items (in a collection) in the specific range from pendingAdds. + // The old values of the removed sub map are kept, so that they will still be accessible in + // pre-existing iterables even after the sort key is cleared. + pendingAdds.subMap(minTimestamp, limitTimestamp).clear(); + if (!isCleared) { + pendingRemoves.add( + Range.range(minTimestamp, BoundType.CLOSED, limitTimestamp, BoundType.OPEN)); + } + } + + public void clear() { + checkState( + !isClosed, + "OrderedList user state is no longer usable because it is closed for %s", + requestTemplate.getStateKey()); + isCleared = true; + // Create a new object for pendingRemoves and clear the mappings in pendingAdds. + // The entire tree range set of pendingRemoves and the old values in the pendingAdds are kept, + // so that they will still be accessible in pre-existing iterables even after the state is + // cleared. + pendingRemoves = TreeRangeSet.create(); + pendingRemoves.add( + Range.range( + Instant.ofEpochMilli(Long.MIN_VALUE), + BoundType.CLOSED, + Instant.ofEpochMilli(Long.MAX_VALUE), + BoundType.OPEN)); + pendingAdds.clear(); + } + + public void asyncClose() throws Exception { + isClosed = true; + + if (!pendingRemoves.isEmpty()) { + for (Range r : pendingRemoves.asRanges()) { + StateRequest.Builder stateRequest = this.requestTemplate.toBuilder(); + stateRequest.setClear(StateClearRequest.newBuilder().build()); + stateRequest + .getStateKeyBuilder() + .getOrderedListUserStateBuilder() + .getRangeBuilder() + .setStart(r.lowerEndpoint().getMillis()) + .setEnd(r.upperEndpoint().getMillis()); + + CompletableFuture response = beamFnStateClient.handle(stateRequest); + if (!response.get().getError().isEmpty()) { + throw new IllegalStateException(response.get().getError()); + } + } + pendingRemoves.clear(); + } + + if (!pendingAdds.isEmpty()) { + ByteStringOutputStream outStream = new ByteStringOutputStream(); + + for (Entry> entry : pendingAdds.entrySet()) { + for (T v : entry.getValue()) { + TimestampedValue tv = TimestampedValue.of(v, entry.getKey()); + try { + timestampedValueCoder.encode(tv, outStream); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + } + StateRequest.Builder stateRequest = this.requestTemplate.toBuilder(); + stateRequest.getAppendBuilder().setData(outStream.toByteString()); + + CompletableFuture response = beamFnStateClient.handle(stateRequest); + if (!response.get().getError().isEmpty()) { + throw new IllegalStateException(response.get().getError()); + } + pendingAdds.clear(); + } + } +} diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java index 6f8622d736fe..3b9fccfa2a5e 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java @@ -605,13 +605,16 @@ public ByteString next() { } prefetchedResponse = null; + ByteString tokenFromResponse = stateResponse.getGet().getContinuationToken(); + // If the continuation token is empty, that means we have reached EOF. - if (ByteString.EMPTY.equals(stateResponse.getGet().getContinuationToken())) { + if (ByteString.EMPTY.equals(tokenFromResponse)) { continuationToken = null; } else { - continuationToken = stateResponse.getGet().getContinuationToken(); + continuationToken = tokenFromResponse; prefetch(); } + return stateResponse.getGet().getData(); } } diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java index 468b4f6b4251..34dec41771b0 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java @@ -17,17 +17,25 @@ */ package org.apache.beam.fn.harness.state; +import static org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.RequestCase.GET; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.NavigableSet; +import java.util.NoSuchElementException; +import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.beam.fn.harness.state.OrderedListUserState.TimestampedValueCoder; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListRange; import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendResponse; import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearResponse; import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateGetResponse; @@ -36,9 +44,14 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest; import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.RequestCase; import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse; +import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.util.ByteStringOutputStream; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; @@ -47,6 +60,7 @@ public class FakeBeamFnStateClient implements BeamFnStateClient { private static final int DEFAULT_CHUNK_SIZE = 6; private final Map> data; private int currentId; + private final Map> orderedListSortKeysFromStateKey; public FakeBeamFnStateClient(Coder valueCoder, Map> initialData) { this(valueCoder, initialData, DEFAULT_CHUNK_SIZE); @@ -97,6 +111,27 @@ public FakeBeamFnStateClient(Map, List>> initialData, i } return chunks; })); + + List orderedListStateKeys = + initialData.keySet().stream() + .filter((k) -> k.getTypeCase() == TypeCase.ORDERED_LIST_USER_STATE) + .collect(Collectors.toList()); + + this.orderedListSortKeysFromStateKey = new HashMap<>(); + for (StateKey key : orderedListStateKeys) { + long sortKey = key.getOrderedListUserState().getRange().getStart(); + + StateKey.Builder keyBuilder = key.toBuilder(); + + // clear the range in the state key before using it as a key to store, because ordered list + // with different ranges would be mapped to the same set of sort keys. + keyBuilder.getOrderedListUserStateBuilder().clearRange(); + + this.orderedListSortKeysFromStateKey + .computeIfAbsent(keyBuilder.build(), (unused) -> new TreeSet<>()) + .add(sortKey); + } + this.data = new ConcurrentHashMap<>( Maps.filterValues(encodedData, byteStrings -> !byteStrings.isEmpty())); @@ -134,7 +169,7 @@ public CompletableFuture handle(StateRequest.Builder requestBuild assertNotEquals(TypeCase.TYPE_NOT_SET, key.getTypeCase()); // multimap side input and runner based state keys only support get requests if (key.getTypeCase() == TypeCase.MULTIMAP_SIDE_INPUT || key.getTypeCase() == TypeCase.RUNNER) { - assertEquals(RequestCase.GET, request.getRequestCase()); + assertEquals(GET, request.getRequestCase()); } if (key.getTypeCase() == TypeCase.MULTIMAP_KEYS_VALUES_SIDE_INPUT && !data.containsKey(key)) { // Allow testing this not being supported rather than blindly returning the empty list. @@ -143,34 +178,162 @@ public CompletableFuture handle(StateRequest.Builder requestBuild switch (request.getRequestCase()) { case GET: - List byteStrings = - data.getOrDefault(request.getStateKey(), Collections.singletonList(ByteString.EMPTY)); - int block = 0; - if (request.getGet().getContinuationToken().size() > 0) { - block = Integer.parseInt(request.getGet().getContinuationToken().toStringUtf8()); - } - ByteString returnBlock = byteStrings.get(block); - ByteString continuationToken = ByteString.EMPTY; - if (byteStrings.size() > block + 1) { - continuationToken = ByteString.copyFromUtf8(Integer.toString(block + 1)); + if (key.getTypeCase() == TypeCase.ORDERED_LIST_USER_STATE) { + long start = key.getOrderedListUserState().getRange().getStart(); + long end = key.getOrderedListUserState().getRange().getEnd(); + + KvCoder coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of()); + long sortKey = start; + int index = 0; + if (!request.getGet().getContinuationToken().isEmpty()) { + try { + // The continuation format here is the sort key (long) followed by an index (int) + KV cursor = + coder.decode(request.getGet().getContinuationToken().newInput()); + sortKey = cursor.getKey(); + index = cursor.getValue(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + ByteString continuationToken; + ByteString returnBlock = ByteString.EMPTY; + try { + if (sortKey < start || sortKey >= end) { + throw new IndexOutOfBoundsException("sort key out of range"); + } + + StateKey.Builder stateKeyWithoutRange = request.getStateKey().toBuilder(); + stateKeyWithoutRange.getOrderedListUserStateBuilder().clearRange(); + NavigableSet subset = + orderedListSortKeysFromStateKey + .getOrDefault(stateKeyWithoutRange.build(), new TreeSet<>()) + .subSet(sortKey, true, end, false); + + // get the effective sort key currently, can throw NoSuchElementException + Long nextSortKey = subset.first(); + + StateKey.Builder keyBuilder = request.getStateKey().toBuilder(); + keyBuilder + .getOrderedListUserStateBuilder() + .getRangeBuilder() + .setStart(nextSortKey) + .setEnd(nextSortKey + 1); + List byteStrings = + data.getOrDefault(keyBuilder.build(), Collections.singletonList(ByteString.EMPTY)); + + // get the block specified in continuation token, can throw IndexOutOfBoundsException + returnBlock = byteStrings.get(index); + + if (byteStrings.size() > index + 1) { + // more blocks from this sort key + index += 1; + } else { + // finish navigating the current sort key and need to find the next one, + // can throw NoSuchElementException + nextSortKey = subset.tailSet(nextSortKey, false).first(); + index = 0; + } + + ByteStringOutputStream outputStream = new ByteStringOutputStream(); + try { + KV cursor = KV.of(nextSortKey, index); + coder.encode(cursor, outputStream); + } catch (IOException e) { + throw new RuntimeException(e); + } + continuationToken = outputStream.toByteString(); + } catch (NoSuchElementException | IndexOutOfBoundsException e) { + continuationToken = ByteString.EMPTY; + } + response = + StateResponse.newBuilder() + .setGet( + StateGetResponse.newBuilder() + .setData(returnBlock) + .setContinuationToken(continuationToken)); + } else { + List byteStrings = + data.getOrDefault(request.getStateKey(), Collections.singletonList(ByteString.EMPTY)); + int block = 0; + if (!request.getGet().getContinuationToken().isEmpty()) { + block = Integer.parseInt(request.getGet().getContinuationToken().toStringUtf8()); + } + ByteString returnBlock = byteStrings.get(block); + ByteString continuationToken = ByteString.EMPTY; + if (byteStrings.size() > block + 1) { + continuationToken = ByteString.copyFromUtf8(Integer.toString(block + 1)); + } + response = + StateResponse.newBuilder() + .setGet( + StateGetResponse.newBuilder() + .setData(returnBlock) + .setContinuationToken(continuationToken)); } - response = - StateResponse.newBuilder() - .setGet( - StateGetResponse.newBuilder() - .setData(returnBlock) - .setContinuationToken(continuationToken)); break; case CLEAR: - data.remove(request.getStateKey()); + if (key.getTypeCase() == TypeCase.ORDERED_LIST_USER_STATE) { + OrderedListRange r = request.getStateKey().getOrderedListUserState().getRange(); + StateKey.Builder stateKeyWithoutRange = request.getStateKey().toBuilder(); + stateKeyWithoutRange.getOrderedListUserStateBuilder().clearRange(); + + List keysToRemove = + new ArrayList<>( + orderedListSortKeysFromStateKey + .getOrDefault(stateKeyWithoutRange.build(), new TreeSet<>()) + .subSet(r.getStart(), true, r.getEnd(), false)); + for (Long l : keysToRemove) { + StateKey.Builder keyBuilder = request.getStateKey().toBuilder(); + keyBuilder.getOrderedListUserStateBuilder().getRangeBuilder().setStart(l).setEnd(l + 1); + data.remove(keyBuilder.build()); + orderedListSortKeysFromStateKey.get(stateKeyWithoutRange.build()).remove(l); + } + } else { + data.remove(request.getStateKey()); + } response = StateResponse.newBuilder().setClear(StateClearResponse.getDefaultInstance()); break; case APPEND: - List previousValue = - data.computeIfAbsent(request.getStateKey(), (unused) -> new ArrayList<>()); - previousValue.add(request.getAppend().getData()); + if (key.getTypeCase() == TypeCase.ORDERED_LIST_USER_STATE) { + InputStream inStream = request.getAppend().getData().newInput(); + TimestampedValueCoder coder = TimestampedValueCoder.of(ByteArrayCoder.of()); + try { + while (inStream.available() > 0) { + TimestampedValue tv = coder.decode(inStream); + ByteStringOutputStream outStream = new ByteStringOutputStream(); + coder.encode(tv, outStream); + ByteString output = outStream.toByteString(); + + StateKey.Builder keyBuilder = request.getStateKey().toBuilder(); + long sortKey = tv.getTimestamp().getMillis(); + keyBuilder + .getOrderedListUserStateBuilder() + .getRangeBuilder() + .setStart(sortKey) + .setEnd(sortKey + 1); + + List previousValues = + data.computeIfAbsent(keyBuilder.build(), (unused) -> new ArrayList<>()); + previousValues.add(output); + + StateKey.Builder stateKeyWithoutRange = request.getStateKey().toBuilder(); + stateKeyWithoutRange.getOrderedListUserStateBuilder().clearRange(); + orderedListSortKeysFromStateKey + .computeIfAbsent(stateKeyWithoutRange.build(), (unused) -> new TreeSet<>()) + .add(sortKey); + } + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } else { + List previousValue = + data.computeIfAbsent(request.getStateKey(), (unused) -> new ArrayList<>()); + previousValue.add(request.getAppend().getData()); + } response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance()); break; diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/OrderedListUserStateTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/OrderedListUserStateTest.java new file mode 100644 index 000000000000..efade508843b --- /dev/null +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/OrderedListUserStateTest.java @@ -0,0 +1,684 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.fn.harness.state; + +import static java.util.Arrays.asList; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.emptyIterable; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import org.apache.beam.fn.harness.Caches; +import org.apache.beam.fn.harness.state.OrderedListUserState.TimestampedValueCoder; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListRange; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.util.ByteStringOutputStream; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class OrderedListUserStateTest { + private static final TimestampedValue A1 = + TimestampedValue.of("A1", Instant.ofEpochMilli(1)); + private static final TimestampedValue B1 = + TimestampedValue.of("B1", Instant.ofEpochMilli(1)); + private static final TimestampedValue C1 = + TimestampedValue.of("C1", Instant.ofEpochMilli(1)); + private static final TimestampedValue A2 = + TimestampedValue.of("A2", Instant.ofEpochMilli(2)); + private static final TimestampedValue B2 = + TimestampedValue.of("B2", Instant.ofEpochMilli(2)); + private static final TimestampedValue A3 = + TimestampedValue.of("A3", Instant.ofEpochMilli(3)); + private static final TimestampedValue A4 = + TimestampedValue.of("A4", Instant.ofEpochMilli(4)); + + private final String pTransformId = "pTransformId"; + private final String stateId = "stateId"; + private final String encodedWindow = "encodedWindow"; + private final Coder> timestampedValueCoder = + TimestampedValueCoder.of(StringUtf8Coder.of()); + + @Test + public void testNoPersistedValues() throws Exception { + FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(Collections.emptyMap()); + OrderedListUserState userState = + new OrderedListUserState<>( + Caches.noop(), + fakeClient, + "instructionId", + createOrderedListStateKey("A"), + StringUtf8Coder.of()); + assertThat(userState.read(), is(emptyIterable())); + } + + @Test + public void testRead() throws Exception { + FakeBeamFnStateClient fakeClient = + new FakeBeamFnStateClient( + timestampedValueCoder, + ImmutableMap.of(createOrderedListStateKey("A", 1), asList(A1, B1))); + OrderedListUserState userState = + new OrderedListUserState<>( + Caches.noop(), + fakeClient, + "instructionId", + createOrderedListStateKey("A"), + StringUtf8Coder.of()); + + assertArrayEquals( + asList(A1, B1).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); + userState.asyncClose(); + assertThrows(IllegalStateException.class, () -> userState.read()); + } + + @Test + public void testReadRange() throws Exception { + FakeBeamFnStateClient fakeClient = + new FakeBeamFnStateClient( + timestampedValueCoder, + ImmutableMap.of( + createOrderedListStateKey("A", 1), asList(A1, B1), + createOrderedListStateKey("A", 4), Collections.singletonList(A4), + createOrderedListStateKey("A", 2), Collections.singletonList(A2))); + + OrderedListUserState userState = + new OrderedListUserState<>( + Caches.noop(), + fakeClient, + "instructionId", + createOrderedListStateKey("A"), + StringUtf8Coder.of()); + + Iterable> stateBeforeB2 = + userState.readRange(Instant.ofEpochMilli(2), Instant.ofEpochMilli(4)); + assertArrayEquals( + Collections.singletonList(A2).toArray(), + Iterables.toArray(stateBeforeB2, TimestampedValue.class)); + + // Add a new value to an existing sort key + userState.add(B2); + assertArrayEquals( + Collections.singletonList(A2).toArray(), + Iterables.toArray(stateBeforeB2, TimestampedValue.class)); + assertArrayEquals( + asList(A2, B2).toArray(), + Iterables.toArray( + userState.readRange(Instant.ofEpochMilli(2), Instant.ofEpochMilli(4)), + TimestampedValue.class)); + + // Add a new value to a new sort key + userState.add(A3); + assertArrayEquals( + Collections.singletonList(A2).toArray(), + Iterables.toArray(stateBeforeB2, TimestampedValue.class)); + assertArrayEquals( + asList(A2, B2, A3).toArray(), + Iterables.toArray( + userState.readRange(Instant.ofEpochMilli(2), Instant.ofEpochMilli(4)), + TimestampedValue.class)); + + userState.asyncClose(); + assertThrows( + IllegalStateException.class, + () -> userState.readRange(Instant.ofEpochMilli(1), Instant.ofEpochMilli(2))); + } + + @Test + public void testAdd() throws Exception { + FakeBeamFnStateClient fakeClient = + new FakeBeamFnStateClient( + timestampedValueCoder, + ImmutableMap.of( + createOrderedListStateKey("A", 1), + Collections.singletonList(A1), + createOrderedListStateKey("A", 4), + Collections.singletonList(A4), + createOrderedListStateKey("A", 2), + asList(A2, B2))); + + OrderedListUserState userState = + new OrderedListUserState<>( + Caches.noop(), + fakeClient, + "instructionId", + createOrderedListStateKey("A"), + StringUtf8Coder.of()); + + // add to an existing timestamp + userState.add(B1); + assertArrayEquals( + asList(A1, B1, A2, B2, A4).toArray(), + Iterables.toArray(userState.read(), TimestampedValue.class)); + + // add to a nonexistent timestamp + userState.add(A3); + assertArrayEquals( + asList(A1, B1, A2, B2, A3, A4).toArray(), + Iterables.toArray(userState.read(), TimestampedValue.class)); + + // add a duplicated value + userState.add(B1); + assertArrayEquals( + asList(A1, B1, B1, A2, B2, A3, A4).toArray(), + Iterables.toArray(userState.read(), TimestampedValue.class)); + + userState.asyncClose(); + assertThrows(IllegalStateException.class, () -> userState.add(A1)); + } + + @Test + public void testClearRange() throws Exception { + FakeBeamFnStateClient fakeClient = + new FakeBeamFnStateClient( + timestampedValueCoder, + ImmutableMap.of( + createOrderedListStateKey("A", 1), + asList(A1, B1), + createOrderedListStateKey("A", 4), + Collections.singletonList(A4), + createOrderedListStateKey("A", 2), + asList(A2, B2), + createOrderedListStateKey("A", 3), + Collections.singletonList(A3))); + + OrderedListUserState userState = + new OrderedListUserState<>( + Caches.noop(), + fakeClient, + "instructionId", + createOrderedListStateKey("A"), + StringUtf8Coder.of()); + + Iterable> initStateFrom2To3 = + userState.readRange(Instant.ofEpochMilli(2), Instant.ofEpochMilli(4)); + + // clear range below the current timestamp range + userState.clearRange(Instant.ofEpochMilli(-1), Instant.ofEpochMilli(0)); + assertArrayEquals( + asList(A2, B2, A3).toArray(), Iterables.toArray(initStateFrom2To3, TimestampedValue.class)); + assertArrayEquals( + asList(A1, B1, A2, B2, A3, A4).toArray(), + Iterables.toArray(userState.read(), TimestampedValue.class)); + + // clear range above the current timestamp range + userState.clearRange(Instant.ofEpochMilli(5), Instant.ofEpochMilli(10)); + assertArrayEquals( + asList(A2, B2, A3).toArray(), Iterables.toArray(initStateFrom2To3, TimestampedValue.class)); + assertArrayEquals( + asList(A1, B1, A2, B2, A3, A4).toArray(), + Iterables.toArray(userState.read(), TimestampedValue.class)); + + // clear range that falls inside the current timestamp range + userState.clearRange(Instant.ofEpochMilli(2), Instant.ofEpochMilli(4)); + assertArrayEquals( + asList(A2, B2, A3).toArray(), Iterables.toArray(initStateFrom2To3, TimestampedValue.class)); + assertArrayEquals( + asList(A1, B1, A4).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); + + // clear range that partially covers the current timestamp range + userState.clearRange(Instant.ofEpochMilli(3), Instant.ofEpochMilli(5)); + assertArrayEquals( + asList(A2, B2, A3).toArray(), Iterables.toArray(initStateFrom2To3, TimestampedValue.class)); + assertArrayEquals( + asList(A1, B1).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); + + // clear range that fully covers the current timestamp range + userState.clearRange(Instant.ofEpochMilli(-1), Instant.ofEpochMilli(10)); + assertArrayEquals( + asList(A2, B2, A3).toArray(), Iterables.toArray(initStateFrom2To3, TimestampedValue.class)); + assertThat(userState.read(), is(emptyIterable())); + + userState.asyncClose(); + assertThrows( + IllegalStateException.class, + () -> userState.clearRange(Instant.ofEpochMilli(1), Instant.ofEpochMilli(2))); + } + + @Test + public void testClear() throws Exception { + FakeBeamFnStateClient fakeClient = + new FakeBeamFnStateClient( + timestampedValueCoder, + ImmutableMap.of( + createOrderedListStateKey("A", 1), + asList(A1, B1), + createOrderedListStateKey("A", 4), + Collections.singletonList(A4), + createOrderedListStateKey("A", 2), + asList(A2, B2), + createOrderedListStateKey("A", 3), + Collections.singletonList(A3))); + + OrderedListUserState userState = + new OrderedListUserState<>( + Caches.noop(), + fakeClient, + "instructionId", + createOrderedListStateKey("A"), + StringUtf8Coder.of()); + + Iterable> stateBeforeClear = userState.read(); + userState.clear(); + assertArrayEquals( + asList(A1, B1, A2, B2, A3, A4).toArray(), + Iterables.toArray(stateBeforeClear, TimestampedValue.class)); + assertThat(userState.read(), is(emptyIterable())); + + userState.asyncClose(); + assertThrows(IllegalStateException.class, () -> userState.clear()); + } + + @Test + public void testAddAndClearRange() throws Exception { + FakeBeamFnStateClient fakeClient = + new FakeBeamFnStateClient( + timestampedValueCoder, + ImmutableMap.of( + createOrderedListStateKey("A", 1), + Collections.singletonList(A1), + createOrderedListStateKey("A", 3), + Collections.singletonList(A3), + createOrderedListStateKey("A", 4), + Collections.singletonList(A4))); + + OrderedListUserState userState = + new OrderedListUserState<>( + Caches.noop(), + fakeClient, + "instructionId", + createOrderedListStateKey("A"), + StringUtf8Coder.of()); + + // add to a non-existing timestamp, clear, and then add + userState.add(A2); + Iterable> stateBeforeFirstClearRange = userState.read(); + userState.clearRange(Instant.ofEpochMilli(2), Instant.ofEpochMilli(3)); + assertArrayEquals( + asList(A1, A2, A3, A4).toArray(), + Iterables.toArray(stateBeforeFirstClearRange, TimestampedValue.class)); + assertArrayEquals( + asList(A1, A3, A4).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); + userState.add(B2); + assertArrayEquals( + asList(A1, A2, A3, A4).toArray(), + Iterables.toArray(stateBeforeFirstClearRange, TimestampedValue.class)); + assertArrayEquals( + asList(A1, B2, A3, A4).toArray(), + Iterables.toArray(userState.read(), TimestampedValue.class)); + + // add to an existing timestamp, clear, and then add + userState.add(B1); + userState.clearRange(Instant.ofEpochMilli(1), Instant.ofEpochMilli(2)); + assertArrayEquals( + asList(A1, A2, A3, A4).toArray(), + Iterables.toArray(stateBeforeFirstClearRange, TimestampedValue.class)); + assertArrayEquals( + asList(B2, A3, A4).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); + userState.add(B1); + assertArrayEquals( + asList(A1, A2, A3, A4).toArray(), + Iterables.toArray(stateBeforeFirstClearRange, TimestampedValue.class)); + assertArrayEquals( + asList(B1, B2, A3, A4).toArray(), + Iterables.toArray(userState.read(), TimestampedValue.class)); + + // add a duplicated value, clear, and then add + userState.add(A3); + userState.clearRange(Instant.ofEpochMilli(3), Instant.ofEpochMilli(4)); + assertArrayEquals( + asList(A1, A2, A3, A4).toArray(), + Iterables.toArray(stateBeforeFirstClearRange, TimestampedValue.class)); + assertArrayEquals( + asList(B1, B2, A4).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); + userState.add(A3); + assertArrayEquals( + asList(A1, A2, A3, A4).toArray(), + Iterables.toArray(stateBeforeFirstClearRange, TimestampedValue.class)); + assertArrayEquals( + asList(B1, B2, A3, A4).toArray(), + Iterables.toArray(userState.read(), TimestampedValue.class)); + } + + @Test + public void testAddAndClearRangeAfterClear() throws Exception { + FakeBeamFnStateClient fakeClient = + new FakeBeamFnStateClient( + timestampedValueCoder, + ImmutableMap.of( + createOrderedListStateKey("A", 1), + Collections.singletonList(A1), + createOrderedListStateKey("A", 3), + Collections.singletonList(A3), + createOrderedListStateKey("A", 4), + Collections.singletonList(A4))); + + OrderedListUserState userState = + new OrderedListUserState<>( + Caches.noop(), + fakeClient, + "instructionId", + createOrderedListStateKey("A"), + StringUtf8Coder.of()); + + userState.clear(); + userState.clearRange(Instant.ofEpochMilli(0), Instant.ofEpochMilli(5)); + assertThat(userState.read(), is(emptyIterable())); + + userState.add(A1); + assertArrayEquals( + Collections.singletonList(A1).toArray(), + Iterables.toArray(userState.read(), TimestampedValue.class)); + + userState.add(A2); + userState.add(A3); + assertArrayEquals( + asList(A1, A2, A3).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); + + userState.clearRange(Instant.ofEpochMilli(2), Instant.ofEpochMilli(3)); + assertArrayEquals( + asList(A1, A3).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); + } + + @Test + public void testNoopAsyncCloseAndRead() throws Exception { + FakeBeamFnStateClient fakeClient = + new FakeBeamFnStateClient( + timestampedValueCoder, + ImmutableMap.of( + createOrderedListStateKey("A", 1), + Collections.singletonList(A1), + createOrderedListStateKey("A", 3), + Collections.singletonList(A3), + createOrderedListStateKey("A", 4), + Collections.singletonList(A4))); + { + OrderedListUserState userState = + new OrderedListUserState<>( + Caches.noop(), + fakeClient, + "instructionId", + createOrderedListStateKey("A"), + StringUtf8Coder.of()); + + userState.asyncClose(); + } + + { + OrderedListUserState userState = + new OrderedListUserState<>( + Caches.noop(), + fakeClient, + "instructionId", + createOrderedListStateKey("A"), + StringUtf8Coder.of()); + + assertArrayEquals( + asList(A1, A3, A4).toArray(), + Iterables.toArray(userState.read(), TimestampedValue.class)); + } + } + + @Test + public void testAddAsyncCloseAndRead() throws Exception { + FakeBeamFnStateClient fakeClient = + new FakeBeamFnStateClient( + timestampedValueCoder, + ImmutableMap.of( + createOrderedListStateKey("A", 1), + Collections.singletonList(A1), + createOrderedListStateKey("A", 3), + Collections.singletonList(A3), + createOrderedListStateKey("A", 4), + Collections.singletonList(A4))); + { + OrderedListUserState userState = + new OrderedListUserState<>( + Caches.noop(), + fakeClient, + "instructionId", + createOrderedListStateKey("A"), + StringUtf8Coder.of()); + + userState.add(B1); + userState.add(A2); + userState.asyncClose(); + } + { + OrderedListUserState userState = + new OrderedListUserState<>( + Caches.noop(), + fakeClient, + "instructionId", + createOrderedListStateKey("A"), + StringUtf8Coder.of()); + + assertArrayEquals( + asList(A1, B1, A2, A3, A4).toArray(), + Iterables.toArray(userState.read(), TimestampedValue.class)); + } + } + + @Test + public void testClearRangeAsyncCloseAndRead() throws Exception { + FakeBeamFnStateClient fakeClient = + new FakeBeamFnStateClient( + timestampedValueCoder, + ImmutableMap.of( + createOrderedListStateKey("A", 1), + Collections.singletonList(A1), + createOrderedListStateKey("A", 2), + Collections.singletonList(A2), + createOrderedListStateKey("A", 3), + Collections.singletonList(A3), + createOrderedListStateKey("A", 4), + Collections.singletonList(A4))); + { + OrderedListUserState userState = + new OrderedListUserState<>( + Caches.noop(), + fakeClient, + "instructionId", + createOrderedListStateKey("A"), + StringUtf8Coder.of()); + + userState.clearRange(Instant.ofEpochMilli(1), Instant.ofEpochMilli(3)); + userState.clearRange(Instant.ofEpochMilli(4), Instant.ofEpochMilli(5)); + userState.asyncClose(); + } + { + OrderedListUserState userState = + new OrderedListUserState<>( + Caches.noop(), + fakeClient, + "instructionId", + createOrderedListStateKey("A"), + StringUtf8Coder.of()); + + assertArrayEquals( + Collections.singletonList(A3).toArray(), + Iterables.toArray(userState.read(), TimestampedValue.class)); + } + } + + @Test + public void testAddClearRangeAsyncCloseAndRead() throws Exception { + FakeBeamFnStateClient fakeClient = + new FakeBeamFnStateClient( + timestampedValueCoder, + ImmutableMap.of( + createOrderedListStateKey("A", 1), + Collections.singletonList(A1), + createOrderedListStateKey("A", 4), + Collections.singletonList(A4))); + { + OrderedListUserState userState = + new OrderedListUserState<>( + Caches.noop(), + fakeClient, + "instructionId", + createOrderedListStateKey("A"), + StringUtf8Coder.of()); + + userState.add(B1); + userState.add(A2); + userState.add(A3); + userState.clearRange(Instant.ofEpochMilli(1), Instant.ofEpochMilli(3)); + userState.clearRange(Instant.ofEpochMilli(4), Instant.ofEpochMilli(5)); + userState.asyncClose(); + } + { + OrderedListUserState userState = + new OrderedListUserState<>( + Caches.noop(), + fakeClient, + "instructionId", + createOrderedListStateKey("A"), + StringUtf8Coder.of()); + + assertArrayEquals( + Collections.singletonList(A3).toArray(), + Iterables.toArray(userState.read(), TimestampedValue.class)); + } + } + + @Test + public void testClearAsyncCloseAndRead() throws Exception { + FakeBeamFnStateClient fakeClient = + new FakeBeamFnStateClient( + timestampedValueCoder, + ImmutableMap.of( + createOrderedListStateKey("A", 1), + Collections.singletonList(A1), + createOrderedListStateKey("A", 2), + Collections.singletonList(A2), + createOrderedListStateKey("A", 3), + Collections.singletonList(A3), + createOrderedListStateKey("A", 4), + Collections.singletonList(A4))); + { + OrderedListUserState userState = + new OrderedListUserState<>( + Caches.noop(), + fakeClient, + "instructionId", + createOrderedListStateKey("A"), + StringUtf8Coder.of()); + + userState.clear(); + userState.asyncClose(); + } + { + OrderedListUserState userState = + new OrderedListUserState<>( + Caches.noop(), + fakeClient, + "instructionId", + createOrderedListStateKey("A"), + StringUtf8Coder.of()); + + assertThat(userState.read(), is(emptyIterable())); + } + } + + @Test + public void testOperationsDuringNavigatingIterable() throws Exception { + FakeBeamFnStateClient fakeClient = + new FakeBeamFnStateClient( + timestampedValueCoder, + ImmutableMap.of( + createOrderedListStateKey("A", 1), + asList(A1, B1), + createOrderedListStateKey("A", 2), + asList(A2, B2), + createOrderedListStateKey("A", 3), + Collections.singletonList(A3), + createOrderedListStateKey("A", 4), + Collections.singletonList(A4))); + + OrderedListUserState userState = + new OrderedListUserState<>( + Caches.noop(), + fakeClient, + "instructionId", + createOrderedListStateKey("A"), + StringUtf8Coder.of()); + + Iterator> iter = userState.read().iterator(); + assertEquals(iter.next(), A1); + + // Adding a C1 locally, but it should not be returned after B1 in the existing iterable. + userState.add(C1); + assertEquals(iter.next(), B1); + assertEquals(iter.next(), A2); + + // Clearing range [2,4) locally, but B2 and A3 should still be returned. + userState.clearRange(Instant.ofEpochMilli(2), Instant.ofEpochMilli(4)); + assertEquals(iter.next(), B2); + assertEquals(iter.next(), A3); + + // Clearing all ranges locally, but A4 should still be returned. + userState.clear(); + assertEquals(iter.next(), A4); + } + + private ByteString encode(String... values) throws IOException { + ByteStringOutputStream out = new ByteStringOutputStream(); + for (String value : values) { + StringUtf8Coder.of().encode(value, out); + } + return out.toByteString(); + } + + private StateKey createOrderedListStateKey(String key) throws IOException { + return StateKey.newBuilder() + .setOrderedListUserState( + StateKey.OrderedListUserState.newBuilder() + .setWindow(encode(encodedWindow)) + .setTransformId(pTransformId) + .setUserStateId(stateId) + .setKey(encode(key))) + .build(); + } + + private StateKey createOrderedListStateKey(String key, long sortKey) throws IOException { + return StateKey.newBuilder() + .setOrderedListUserState( + StateKey.OrderedListUserState.newBuilder() + .setWindow(encode(encodedWindow)) + .setTransformId(pTransformId) + .setUserStateId(stateId) + .setKey(encode(key)) + .setRange( + OrderedListRange.newBuilder().setStart(sortKey).setEnd(sortKey + 1).build())) + .build(); + } +} From 30cce4450e3ada1142c798adcff18733620ff9e6 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 13 Jun 2024 20:47:08 +0200 Subject: [PATCH 05/12] Clone dataflow containers (#31591) --- sdks/python/apache_beam/runners/dataflow/internal/names.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py index 20d2facd6bd1..40147e9926dc 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/names.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py @@ -34,6 +34,6 @@ # Unreleased sdks use container image tag specified below. # Update this tag whenever there is a change that # requires changes to SDK harness container or SDK harness launcher. -BEAM_DEV_SDK_CONTAINER_TAG = 'beam-master-20240524' +BEAM_DEV_SDK_CONTAINER_TAG = 'beam-master-20240613' DATAFLOW_CONTAINER_IMAGE_REPOSITORY = 'gcr.io/cloud-dataflow/v1beta3' From 635372f67c0be356dd1b7540ffeedbc560b6e9d0 Mon Sep 17 00:00:00 2001 From: tvalentyn Date: Thu, 13 Jun 2024 13:41:56 -0700 Subject: [PATCH 06/12] Limit the size of bundles of elements emitted by SDK into the data output stream. (#31581) * Limit the size of bundles of elements emitted by SDK into the data output stream. * Trigger tests. * Use a type-compliant sentinel. --- .github/trigger_files/beam_PostCommit_Python.json | 4 ++-- sdks/python/apache_beam/runners/worker/data_plane.py | 12 ++++++++---- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 63bd5651def0..0ff79c010935 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,4 +1,4 @@ { - "comment": "Modify this file in a trivial way to cause this test suite to run" + "comment": "Modify this file in a trivial way to cause this test suite to run." } - + diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py index 3dd6bdbe9ae2..2f9de24594b2 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane.py +++ b/sdks/python/apache_beam/runners/worker/data_plane.py @@ -452,7 +452,7 @@ def close(self): class _GrpcDataChannel(DataChannel): """Base class for implementing a BeamFnData-based DataChannel.""" - _WRITES_FINISHED = object() + _WRITES_FINISHED = beam_fn_api_pb2.Elements.Data() def __init__(self, data_buffer_time_limit_ms=0): # type: (int) -> None @@ -475,7 +475,7 @@ def __init__(self, data_buffer_time_limit_ms=0): def close(self): # type: () -> None - self._to_send.put(self._WRITES_FINISHED) # type: ignore[arg-type] + self._to_send.put(self._WRITES_FINISHED) self._closed = True def wait(self, timeout=None): @@ -639,8 +639,12 @@ def _write_outputs(self): streams = [self._to_send.get()] try: # Coalesce up to 100 other items. - for _ in range(100): - streams.append(self._to_send.get_nowait()) + total_size_bytes = streams[0].ByteSize() + while (total_size_bytes < _DEFAULT_SIZE_FLUSH_THRESHOLD and + len(streams) <= 100): + data_or_timer = self._to_send.get_nowait() + total_size_bytes += data_or_timer.ByteSize() + streams.append(data_or_timer) except queue.Empty: pass if streams[-1] is self._WRITES_FINISHED: From 4b1f4e68acca0bdac89bd115995a136b411957e4 Mon Sep 17 00:00:00 2001 From: bzablocki Date: Thu, 13 Jun 2024 23:20:26 +0200 Subject: [PATCH 07/12] Solace Read connector: SolaceIO PTransform init (#31594) * Splitting the #31476 - Leaving only PTransform AutoValue configurations --- .../beam/gradle/BeamModulePlugin.groovy | 2 + sdks/java/io/solace/build.gradle | 6 +- .../apache/beam/sdk/io/solace/SolaceIO.java | 237 ++++++++++++++++++ .../io/solace/broker/SempClientFactory.java | 26 ++ .../solace/broker/SessionServiceFactory.java | 26 ++ .../sdk/io/solace/broker/package-info.java | 20 ++ .../beam/sdk/io/solace/data/Solace.java | 71 ++++++ .../beam/sdk/io/solace/data/package-info.java | 20 ++ .../beam/sdk/io/solace/package-info.java | 20 ++ 9 files changed, 427 insertions(+), 1 deletion(-) create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/package-info.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/package-info.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/package-info.java diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 65fcf7333407..af3568a359ef 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -636,6 +636,7 @@ class BeamModulePlugin implements Plugin { def singlestore_jdbc_version = "1.1.4" def slf4j_version = "1.7.30" def snakeyaml_version = "2.2" + def solace_version = "10.21.0" def spark2_version = "2.4.8" def spark3_version = "3.2.2" def spotbugs_version = "4.0.6" @@ -877,6 +878,7 @@ class BeamModulePlugin implements Plugin { slf4j_log4j12 : "org.slf4j:slf4j-log4j12:$slf4j_version", slf4j_jcl : "org.slf4j:slf4j-jcl:$slf4j_version", snappy_java : "org.xerial.snappy:snappy-java:1.1.10.4", + solace : "com.solacesystems:sol-jcsmp:$solace_version", spark_core : "org.apache.spark:spark-core_2.11:$spark2_version", spark_streaming : "org.apache.spark:spark-streaming_2.11:$spark2_version", spark3_core : "org.apache.spark:spark-core_2.12:$spark3_version", diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle index c09df4245015..c49b79f96a3d 100644 --- a/sdks/java/io/solace/build.gradle +++ b/sdks/java/io/solace/build.gradle @@ -30,4 +30,8 @@ ext.summary = """IO to read and write to Solace destinations (queues and topics) dependencies { implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) -} \ No newline at end of file + implementation library.java.vendored_guava_32_1_2_jre + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation library.java.joda_time + implementation library.java.solace +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java new file mode 100644 index 000000000000..ca8cd615ac68 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + +import com.google.auto.value.AutoValue; +import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.JCSMPFactory; +import com.solacesystems.jcsmp.Queue; +import com.solacesystems.jcsmp.Topic; +import org.apache.beam.sdk.io.solace.broker.SempClientFactory; +import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; + +public class SolaceIO { + + private static final boolean DEFAULT_DEDUPLICATE_RECORDS = false; + + /** Get a {@link Topic} object from the topic name. */ + static Topic topicFromName(String topicName) { + return JCSMPFactory.onlyInstance().createTopic(topicName); + } + + /** Get a {@link Queue} object from the queue name. */ + static Queue queueFromName(String queueName) { + return JCSMPFactory.onlyInstance().createQueue(queueName); + } + + @AutoValue + public abstract static class Read extends PTransform> { + + /** Set the queue name to read from. Use this or the `from(Topic)` method. */ + public Read from(Solace.Queue queue) { + return toBuilder().setQueue(queueFromName(queue.getName())).build(); + } + + /** Set the topic name to read from. Use this or the `from(Queue)` method. */ + public Read from(Solace.Topic topic) { + return toBuilder().setTopic(topicFromName(topic.getName())).build(); + } + + /** + * The timestamp function, used for estimating the watermark, mapping the record T to an {@link + * Instant} + * + *

Optional when using the no-arg {@link SolaceIO#read()} method. Defaults to {@link + * SolaceIO#SENDER_TIMESTAMP_FUNCTION}. When using the {@link SolaceIO#read(TypeDescriptor, + * SerializableFunction, SerializableFunction)} method, the function mapping from T to {@link + * Instant} has to be passed as an argument. + */ + public Read withTimestampFn(SerializableFunction timestampFn) { + checkState( + timestampFn != null, + "SolaceIO.Read: timestamp function must be set or use the" + + " `Read.readSolaceRecords()` method"); + return toBuilder().setTimestampFn(timestampFn).build(); + } + + /** + * Optional. Sets the maximum number of connections to the broker. The actual number of sessions + * is determined by this and the number set by the runner. If not set, the number of sessions is + * determined by the runner. The number of connections created follows this logic: + * `numberOfConnections = min(maxNumConnections, desiredNumberOfSplits)`, where the + * `desiredNumberOfSplits` is set by the runner. + */ + public Read withMaxNumConnections(Integer maxNumConnections) { + return toBuilder().setMaxNumConnections(maxNumConnections).build(); + } + + /** + * Optional, default: false. Set to deduplicate messages based on the {@link + * BytesXMLMessage#getApplicationMessageId()} of the incoming {@link BytesXMLMessage}. If the + * field is null, then the {@link BytesXMLMessage#getReplicationGroupMessageId()} will be used, + * which is always set by Solace. + */ + public Read withDeduplicateRecords(boolean deduplicateRecords) { + return toBuilder().setDeduplicateRecords(deduplicateRecords).build(); + } + + /** + * Set a factory that creates a {@link org.apache.beam.sdk.io.solace.broker.SempClientFactory}. + * + *

The factory `create()` method is invoked in each instance of an {@link + * org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader}. Created {@link + * org.apache.beam.sdk.io.solace.broker.SempClient} has to communicate with broker management + * API. It must support operations such as: + * + *

    + *
  • query for outstanding backlog bytes in a Queue, + *
  • query for metadata such as access-type of a Queue, + *
  • requesting creation of new Queues. + *
+ * + *

An existing implementation of the SempClientFactory includes {@link + * org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory} which implements connection + * to the SEMP with the Basic Authentication method. + * + *

To use it, specify the credentials with the builder methods. + * + *

The format of the host is `[Protocol://]Host[:Port]` + * + *

{@code
+     * .withSempClientFactory(
+     *         BasicAuthSempClientFactory.builder()
+     *               .host("your-host-name-with-protocol") // e.g. "http://12.34.56.78:8080"
+     *               .username("username")
+     *               .password("password")
+     *               .vpnName("vpn-name")
+     *               .build())
+     * }
+ */ + public Read withSempClientFactory(SempClientFactory sempClientFactory) { + checkState(sempClientFactory != null, "SolaceIO.Read: sempClientFactory must not be null."); + return toBuilder().setSempClientFactory(sempClientFactory).build(); + } + + /** + * Set a factory that creates a {@link SessionService}. + * + *

The factory `create()` method is invoked in each instance of an {@link + * org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader}. Created {@link SessionService} has + * to be able to: + * + *

    + *
  • initialize a connection with the broker, + *
  • check liveliness of the connection, + *
  • close the connection, + *
  • create a {@link org.apache.beam.sdk.io.solace.broker.MessageReceiver}. + *
+ * + *

An existing implementation of the SempClientFactory includes {@link + * org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionService} which implements the Basic + * Authentication to Solace. * + * + *

To use it, specify the credentials with the builder methods. * + * + *

The host is the IPv4 or IPv6 or host name of the appliance. IPv6 addresses must be encoded + * in brackets ([]). For example, "12.34.56.78", or "[fe80::1]". If connecting to a non-default + * port, it can be specified here using the "Host:Port" format. For example, "12.34.56.78:4444", + * or "[fe80::1]:4444". + * + *

{@code
+     * BasicAuthJcsmpSessionServiceFactory.builder()
+     *     .host("your-host-name")
+     *           // e.g. "12.34.56.78", or "[fe80::1]", or "12.34.56.78:4444"
+     *     .username("semp-username")
+     *     .password("semp-password")
+     *     .vpnName("vpn-name")
+     *     .build()));
+     * }
+ */ + public Read withSessionServiceFactory(SessionServiceFactory sessionServiceFactory) { + checkState( + sessionServiceFactory != null, "SolaceIO.Read: sessionServiceFactory must not be null."); + return toBuilder().setSessionServiceFactory(sessionServiceFactory).build(); + } + + abstract @Nullable Queue getQueue(); + + abstract @Nullable Topic getTopic(); + + abstract @Nullable SerializableFunction getTimestampFn(); + + abstract @Nullable Integer getMaxNumConnections(); + + abstract boolean getDeduplicateRecords(); + + abstract SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> getParseFn(); + + abstract @Nullable SempClientFactory getSempClientFactory(); + + abstract @Nullable SessionServiceFactory getSessionServiceFactory(); + + abstract TypeDescriptor getTypeDescriptor(); + + public static Builder builder() { + Builder builder = new org.apache.beam.sdk.io.solace.AutoValue_SolaceIO_Read.Builder(); + builder.setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS); + return builder; + } + + abstract Builder toBuilder(); + + @AutoValue.Builder + public abstract static class Builder { + + abstract Builder setQueue(Queue queue); + + abstract Builder setTopic(Topic topic); + + abstract Builder setTimestampFn(SerializableFunction timestampFn); + + abstract Builder setMaxNumConnections(Integer maxNumConnections); + + abstract Builder setDeduplicateRecords(boolean deduplicateRecords); + + abstract Builder setParseFn( + SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn); + + abstract Builder setSempClientFactory(SempClientFactory brokerServiceFactory); + + abstract Builder setSessionServiceFactory(SessionServiceFactory sessionServiceFactory); + + abstract Builder setTypeDescriptor(TypeDescriptor typeDescriptor); + + abstract Read build(); + } + + @Override + public PCollection expand(PBegin input) { + throw new UnsupportedOperationException(""); + } + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java new file mode 100644 index 000000000000..b5cb53e14b39 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import java.io.Serializable; + +/** + * This interface serves as a blueprint for creating SempClient objects, which are used to interact + * with a Solace message broker using the Solace Element Management Protocol (SEMP). + */ +public interface SempClientFactory extends Serializable {} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java new file mode 100644 index 000000000000..ab1f55ae7a9c --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import java.io.Serializable; + +/** + * This abstract class serves as a blueprint for creating `SessionService` objects. It introduces a + * queue property and mandates the implementation of a create() method in concrete subclasses. + */ +public abstract class SessionServiceFactory implements Serializable {} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/package-info.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/package-info.java new file mode 100644 index 000000000000..960e24e2a1b3 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Solace IO broker-related classes. */ +package org.apache.beam.sdk.io.solace.broker; diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java new file mode 100644 index 000000000000..076a16b96ceb --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.data; + +/** + * A record to be written to a Solace topic. + * + *

You need to transform to {@link Solace.Record} to be able to write to Solace. For that, you + * can use the {@link Solace.Record.Builder} provided with this class. + * + *

For instance, to create a record, use the following code: + * + *

{@code
+ * Solace.Record record = Solace.Record.builder()
+ *         .setMessageId(messageId)
+ *         .setSenderTimestamp(timestampMillis)
+ *         .setPayload(payload)
+ *         .build();
+ * }
+ * + * Setting the message id and the timestamp is mandatory. + */ +public class Solace { + + public static class Queue { + private final String name; + + private Queue(String name) { + this.name = name; + } + + public static Queue fromName(String name) { + return new Queue(name); + } + + public String getName() { + return name; + } + } + + public static class Topic { + private final String name; + + private Topic(String name) { + this.name = name; + } + + public static Topic fromName(String name) { + return new Topic(name); + } + + public String getName() { + return name; + } + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/package-info.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/package-info.java new file mode 100644 index 000000000000..edf584310cab --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Solace IO connector - data-related classes. */ +package org.apache.beam.sdk.io.solace.data; diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/package-info.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/package-info.java new file mode 100644 index 000000000000..3996b9ad3e04 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Solace IO connector. */ +package org.apache.beam.sdk.io.solace; From 791d55790da2aaa39e0fa443ecdfd32eb22bfc71 Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Thu, 13 Jun 2024 15:32:27 -0700 Subject: [PATCH 08/12] [#29697] Update the Release guide with currently manual GitHub release steps for prism. (#31582) --- contributor-docs/release-guide.md | 42 ++++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/contributor-docs/release-guide.md b/contributor-docs/release-guide.md index 52ce3fb0a293..c0e8e7c67ce7 100644 --- a/contributor-docs/release-guide.md +++ b/contributor-docs/release-guide.md @@ -528,9 +528,36 @@ The following should be confirmed: - [ ] There is a commit not on the release branch with the version adjusted. - [ ] The RC tag points to that commit. +### Create a draft, pre-release Github release for the RC Tag + +TODO: Automate these steps as a github action. + +If this is for the first release candidate, create a new, draft, pre-release Github release. + +* Go to https://github.com/apache/beam/releases/new to start creating a Github release. + +If this is for subsequent release candidates re-use the existing Github release for this version. + +* Do not create a new release if one already exists, navigate to the existing Github release for the previous RC. + +Once on the release page: + +* Update the Release tag to the current RC Tag. +* Title the release "Beam ${RELEASE_VERSION} release". +* The description may remain empty for now, but will eventually contain the release blog post. +* Set this release as a pre-release, by checking the `Set as pre-release` box below the description box. + +Once configured properly, press the `Save draft` button. + +The following should be confirmed: + +- [ ] The Github release is configured as a draft, pre-release. +- [ ] The Github release points to the current RC tag. + ### Run build_release_candidate GitHub Action to create a release candidate **Action** [build_release_candidate](https://github.com/apache/beam/actions/workflows/build_release_candidate.yml) (click `run workflow`) +and update the JSON configuration fields with "yes". **The action will:** @@ -542,11 +569,15 @@ The following should be confirmed: 5. Build javadoc, pydoc, typedocs for a PR to update beam-site. - **NOTE**: Do not merge this PR until after an RC has been approved (see "Finalize the Release"). +6. Build Prism binaries for various platforms, and upload them into [dist.apache.org](https://dist.apache.org/repos/dist/dev/beam) + and the Github Release with the matching RC tag. -### Verify source distributions +### Verify source and artifact distributions - [ ] Verify that the source zip of the whole project is present in [dist.apache.org](https://dist.apache.org/repos/dist/dev/beam). - [ ] Verify that the Python binaries are present in [dist.apache.org](https://dist.apache.org/repos/dist/dev/beam). + - [ ] Verify that the Prism binaries are present in [dist.apache.org](https://dist.apache.org/repos/dist/dev/beam). + - [ ] Verify that the Prism binaries are attached to the Github Release created in the previous step. ### Verify docker images @@ -1189,9 +1220,12 @@ Merge all of the website pull requests ### Publish release to Github -Once the tag is uploaded, publish the release notes to Github. From the [Beam release page on Github](https://github.com/apache/beam/releases) select -"Draft a new release." Title the release "Beam ${RELEASE_VERSION} release" and set the release at the version tag created above. Use the content of the -release blog post as the body of the release notes, set this version as the latest release, and publish it. +Once the tag is uploaded, publish the release notes to Github. +From the [Beam release page on Github](https://github.com/apache/beam/releases) +find and open the release for the final RC tag for for editing. +Update the release with the final version tag created above. +Use the content of the release blog post as the body of the release notes, +set this version as the latest release, and publish it. The release notes should now be visible on Github's [Releases](https://github.com/apache/beam/releases) page. From 1e394072f8620c394b83667e72d03a72bdab847b Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 14 Jun 2024 13:48:26 +0200 Subject: [PATCH 09/12] Bump com.gradle.enterprise from 3.17.4 to 3.17.5 (#31585) Bumps com.gradle.enterprise from 3.17.4 to 3.17.5. --- updated-dependencies: - dependency-name: com.gradle.enterprise dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- settings.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/settings.gradle.kts b/settings.gradle.kts index fd2f29ea001e..7386d4562148 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -24,7 +24,7 @@ pluginManagement { } plugins { - id("com.gradle.enterprise") version "3.17.4" + id("com.gradle.enterprise") version "3.17.5" id("com.gradle.common-custom-user-data-gradle-plugin") version "1.12.1" } From e357b4298dc73c7c36060b7731d06db39acc068c Mon Sep 17 00:00:00 2001 From: Florian Bernard Date: Fri, 14 Jun 2024 13:55:44 +0200 Subject: [PATCH 10/12] Improve javadoc in ParquetIO (#31409) --- .../main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java index 33940aa27a5c..24c18f382817 100644 --- a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java +++ b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java @@ -1046,13 +1046,13 @@ public Sink withCompressionCodec(CompressionCodecName compressionCodecName) { return toBuilder().setCompressionCodec(compressionCodecName).build(); } - /** Specifies configuration to be passed into the sink's writer. */ + /** Specify Hadoop configuration for ParquetWriter. */ public Sink withConfiguration(Map configuration) { checkArgument(configuration != null, "configuration can not be null"); return toBuilder().setConfiguration(SerializableConfiguration.fromMap(configuration)).build(); } - /** Specify Hadoop configuration for ParquetReader. */ + /** Specify Hadoop configuration for ParquetWriter. */ public Sink withConfiguration(Configuration configuration) { checkArgument(configuration != null, "configuration can not be null"); return toBuilder().setConfiguration(new SerializableConfiguration(configuration)).build(); From 7fb85637c1fc6417f99347c77ba7d53fe950a1f9 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 14 Jun 2024 13:59:39 +0200 Subject: [PATCH 11/12] Bump pydantic from 1.10.2 to 1.10.13 in /playground/infrastructure (#31099) Bumps [pydantic](https://github.com/pydantic/pydantic) from 1.10.2 to 1.10.13. - [Release notes](https://github.com/pydantic/pydantic/releases) - [Changelog](https://github.com/pydantic/pydantic/blob/main/HISTORY.md) - [Commits](https://github.com/pydantic/pydantic/compare/v1.10.2...v1.10.13) --- updated-dependencies: - dependency-name: pydantic dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- playground/infrastructure/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/playground/infrastructure/requirements.txt b/playground/infrastructure/requirements.txt index cfbd67145b14..16297f1fdc54 100644 --- a/playground/infrastructure/requirements.txt +++ b/playground/infrastructure/requirements.txt @@ -22,7 +22,7 @@ pytest-mock==3.6.1 PyYAML==6.0 tqdm~=4.62.3 sonora==0.2.2 -pydantic==1.10.2 +pydantic==1.10.13 grpcio-tools==1.62.1 protobuf==4.21.12 google-cloud-datastore==2.11.0 From 09bb1972103daeb46c2687d1e57eea3236cf8df6 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 14 Jun 2024 14:03:04 +0200 Subject: [PATCH 12/12] Bump commons-cli:commons-cli from 1.6.0 to 1.8.0 (#31387) Bumps commons-cli:commons-cli from 1.6.0 to 1.8.0. --- updated-dependencies: - dependency-name: commons-cli:commons-cli dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- release/build.gradle.kts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/release/build.gradle.kts b/release/build.gradle.kts index ca9982806b9f..3b55542f9398 100644 --- a/release/build.gradle.kts +++ b/release/build.gradle.kts @@ -29,8 +29,8 @@ val library = project.extensions.extraProperties["library"] as Map