
Commit

Merge remote-tracking branch 'refs/remotes/upstream/master' into solace-connector

# Conflicts:
#	sdks/java/io/solace/build.gradle
#	sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java
#	sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java
#	sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
#	sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java
bzablocki committed Jun 14, 2024
2 parents 19b02be + 09bb197 commit bcc0a51
Showing 24 changed files with 1,452 additions and 62 deletions.
4 changes: 2 additions & 2 deletions .github/trigger_files/beam_PostCommit_Python.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run"
"comment": "Modify this file in a trivial way to cause this test suite to run."
}

3 changes: 3 additions & 0 deletions CHANGES.md
@@ -135,6 +135,9 @@
* Running a 2.57.0+ remote SDK pipeline containing a pre-2.57.0 Java SchemaTransform
* All direct uses of Python's [SchemaAwareExternalTransform](https://github.com/apache/beam/blob/a998107a1f5c3050821eef6a5ad5843d8adb8aec/sdks/python/apache_beam/transforms/external.py#L381)
should be updated to use new snake_case parameter names.
* Upgraded Jackson Databind to 2.15.4 (Java) ([#26743](https://github.com/apache/beam/issues/26743)).
jackson-2.15 has known breaking changes; an important one is that it imposes a buffer limit on the parser.
If your custom PTransform/DoFn is affected, refer to [#31580](https://github.com/apache/beam/pull/31580) for mitigation.

# [2.56.0] - 2024-05-01

42 changes: 38 additions & 4 deletions contributor-docs/release-guide.md
@@ -528,9 +528,36 @@ The following should be confirmed:
- [ ] There is a commit not on the release branch with the version adjusted.
- [ ] The RC tag points to that commit.
### Create a draft, pre-release Github release for the RC Tag
TODO: Automate these steps as a github action.
If this is for the first release candidate, create a new, draft, pre-release Github release.
* Go to https://github.com/apache/beam/releases/new to start creating a Github release.
If this is for subsequent release candidates, re-use the existing Github release for this version.
* Do not create a new release if one already exists; instead, navigate to the existing Github release for the previous RC.
Once on the release page:
* Update the Release tag to the current RC Tag.
* Title the release "Beam ${RELEASE_VERSION} release".
* The description may remain empty for now, but will eventually contain the release blog post.
* Set this release as a pre-release by checking the `Set as pre-release` box below the description box.
Once configured properly, press the `Save draft` button.
The following should be confirmed:
- [ ] The Github release is configured as a draft, pre-release.
- [ ] The Github release points to the current RC tag.
### Run build_release_candidate GitHub Action to create a release candidate
**Action** [build_release_candidate](https://github.com/apache/beam/actions/workflows/build_release_candidate.yml) (click `run workflow`)
and update the JSON configuration fields with "yes".
**The action will:**
@@ -542,11 +569,15 @@ The following should be confirmed:
5. Build javadoc, pydoc, typedocs for a PR to update beam-site.
- **NOTE**: Do not merge this PR until after an RC has been approved (see
"Finalize the Release").
6. Build Prism binaries for various platforms, and upload them into [dist.apache.org](https://dist.apache.org/repos/dist/dev/beam)
and the Github Release with the matching RC tag.
### Verify source distributions
### Verify source and artifact distributions
- [ ] Verify that the source zip of the whole project is present in [dist.apache.org](https://dist.apache.org/repos/dist/dev/beam).
- [ ] Verify that the Python binaries are present in [dist.apache.org](https://dist.apache.org/repos/dist/dev/beam).
- [ ] Verify that the Prism binaries are present in [dist.apache.org](https://dist.apache.org/repos/dist/dev/beam).
- [ ] Verify that the Prism binaries are attached to the Github Release created in the previous step.
### Verify docker images
@@ -1189,9 +1220,12 @@ Merge all of the website pull requests
### Publish release to Github
Once the tag is uploaded, publish the release notes to Github. From the [Beam release page on Github](https://github.com/apache/beam/releases) select
"Draft a new release." Title the release "Beam ${RELEASE_VERSION} release" and set the release at the version tag created above. Use the content of the
release blog post as the body of the release notes, set this version as the latest release, and publish it.
Once the tag is uploaded, publish the release notes to Github.
From the [Beam release page on Github](https://github.com/apache/beam/releases)
find and open the release for the final RC tag for editing.
Update the release with the final version tag created above.
Use the content of the release blog post as the body of the release notes,
set this version as the latest release, and publish it.
The release notes should now be visible on Github's [Releases](https://github.com/apache/beam/releases) page.
@@ -479,7 +479,11 @@ message StandardUserStateTypes {
// StateKey.MultimapKeysUserState or StateKey.MultimapUserState.
MULTIMAP = 1 [(beam_urn) = "beam:user_state:multimap:v1"];

// TODO(https://github.com/apache/beam/issues/20486): Add protocol to support OrderedListState
// Represents a user state specification that supports an ordered list.
//
// StateRequests performed on this user state must use
// StateKey.OrderedListUserState.
ORDERED_LIST = 2 [(beam_urn) = "beam:user_state:ordered_list:v1"];
}
}

2 changes: 1 addition & 1 deletion playground/infrastructure/requirements.txt
@@ -22,7 +22,7 @@ pytest-mock==3.6.1
PyYAML==6.0
tqdm~=4.62.3
sonora==0.2.2
pydantic==1.10.2
pydantic==1.10.13
grpcio-tools==1.62.1
protobuf==4.21.12
google-cloud-datastore==2.11.0
4 changes: 2 additions & 2 deletions release/build.gradle.kts
@@ -29,8 +29,8 @@ val library = project.extensions.extraProperties["library"] as Map<String, Map<S

dependencies {
implementation(library.getValue("groovy").getValue("groovy_all"))
implementation("commons-cli:commons-cli:1.6.0")
permitUnusedDeclared("commons-cli:commons-cli:1.6.0") // BEAM-11761
implementation("commons-cli:commons-cli:1.8.0")
permitUnusedDeclared("commons-cli:commons-cli:1.8.0") // BEAM-11761
}

task("runJavaExamplesValidationTask") {
@@ -34,6 +34,38 @@
@Internal
public class RowJsonUtils {

// Tracks the highest stream-read limit applied so far by increaseDefaultStreamReadConstraints.
private static int defaultBufferLimit;

/**
* Increase the default jackson-databind stream read constraint.
*
* <p>StreamReadConstraints was introduced in jackson 2.15, causing parsing of strings larger than
* 20 MB (5 MB in 2.15.0) to fail. This has caused regressions in projects that depend on it,
* including Beam. Here we override the default buffer size limit to 100 MB and expose this method
* for setting a higher limit. If needed, call this method during pipeline run time, e.g. in DoFn.setup.
*/
public static void increaseDefaultStreamReadConstraints(int newLimit) {
if (newLimit <= defaultBufferLimit) {
return;
}
try {
Class<?> unused = Class.forName("com.fasterxml.jackson.core.StreamReadConstraints");

com.fasterxml.jackson.core.StreamReadConstraints.overrideDefaultStreamReadConstraints(
com.fasterxml.jackson.core.StreamReadConstraints.builder()
.maxStringLength(newLimit)
.build());
} catch (ClassNotFoundException e) {
// <2.15, do nothing
}
defaultBufferLimit = newLimit;
}

static {
increaseDefaultStreamReadConstraints(100 * 1024 * 1024);
}

public static ObjectMapper newObjectMapperWith(RowJson.RowJsonDeserializer deserializer) {
SimpleModule module = new SimpleModule("rowDeserializationModule");
module.addDeserializer(Row.class, deserializer);
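For context on the change above, here is a minimal sketch of how a pipeline author could raise the limit from `DoFn.setup`, as the Javadoc suggests. The `ParseLargeJsonFn` class, its element types, and the chosen 200 MB limit are illustrative, and the import assumes `RowJsonUtils` lives in `org.apache.beam.sdk.util` as in current Beam.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.RowJsonUtils; // assumed package for RowJsonUtils

/** Hypothetical DoFn that parses very large JSON strings. */
class ParseLargeJsonFn extends DoFn<String, String> {

  @Setup
  public void setup() {
    // Raise the jackson-databind string length limit to 200 MB before any parsing happens.
    // This is a no-op on jackson < 2.15 and is ignored if an equal or higher limit is already set.
    RowJsonUtils.increaseDefaultStreamReadConstraints(200 * 1024 * 1024);
  }

  @ProcessElement
  public void processElement(@Element String json, OutputReceiver<String> out) {
    // ... parse json with an ObjectMapper here ...
    out.output(json);
  }
}
```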
@@ -119,6 +119,8 @@ public class ParDoTranslation {
public static final String BAG_USER_STATE = "beam:user_state:bag:v1";
/** Represents a user state specification that supports a multimap. */
public static final String MULTIMAP_USER_STATE = "beam:user_state:multimap:v1";
/** Represents a user state specification that supports an ordered list. */
public static final String ORDERED_LIST_USER_STATE = "beam:user_state:ordered_list:v1";

static {
checkState(
@@ -141,6 +143,8 @@ public class ParDoTranslation {
BeamUrns.getUrn(StandardRequirements.Enum.REQUIRES_ON_WINDOW_EXPIRATION)));
checkState(BAG_USER_STATE.equals(BeamUrns.getUrn(StandardUserStateTypes.Enum.BAG)));
checkState(MULTIMAP_USER_STATE.equals(BeamUrns.getUrn(StandardUserStateTypes.Enum.MULTIMAP)));
checkState(
ORDERED_LIST_USER_STATE.equals(BeamUrns.getUrn(StandardUserStateTypes.Enum.ORDERED_LIST)));
}

/** The URN for an unknown Java {@link DoFn}. */
@@ -601,9 +605,7 @@ public RunnerApi.StateSpec dispatchOrderedList(Coder<?> elementCoder) {
.setOrderedListSpec(
RunnerApi.OrderedListStateSpec.newBuilder()
.setElementCoderId(registerCoderOrThrow(components, elementCoder)))
// TODO(https://github.com/apache/beam/issues/20486): Update with correct protocol
// once the protocol is defined and
// the SDK harness uses it.
.setProtocol(FunctionSpec.newBuilder().setUrn(ORDERED_LIST_USER_STATE))
.build();
}

@@ -694,6 +696,10 @@ static StateSpec<?> fromProto(RunnerApi.StateSpec stateSpec, RehydratedComponent
case SET_SPEC:
return StateSpecs.set(components.getCoder(stateSpec.getSetSpec().getElementCoderId()));

case ORDERED_LIST_SPEC:
return StateSpecs.orderedList(
components.getCoder(stateSpec.getOrderedListSpec().getElementCoderId()));

case SPEC_NOT_SET:
default:
throw new IllegalArgumentException(
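To illustrate what this translation wiring enables, here is a minimal sketch of a stateful DoFn declaring ordered-list state with `StateSpecs.orderedList`. The DoFn, state id, and element types are illustrative; the `OrderedListState` API is assumed from `org.apache.beam.sdk.state`, and runner/SDK-harness support for the new `beam:user_state:ordered_list:v1` protocol is assumed.

```java
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.OrderedListState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;

/** Hypothetical stateful DoFn that buffers values in event-time order. */
class BufferInOrderFn extends DoFn<KV<String, Integer>, Integer> {

  // Ordered-list state keyed per element key; values stay sorted by insertion timestamp.
  @StateId("buffer")
  private final StateSpec<OrderedListState<Integer>> bufferSpec =
      StateSpecs.orderedList(VarIntCoder.of());

  @ProcessElement
  public void processElement(
      @Element KV<String, Integer> element,
      @Timestamp Instant timestamp,
      @StateId("buffer") OrderedListState<Integer> buffer) {
    // Insert the value at the element's event timestamp.
    buffer.add(TimestampedValue.of(element.getValue(), timestamp));
    // Typically the buffered values would later be read and emitted from an event-time timer.
  }
}
```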
@@ -245,6 +245,10 @@ public static Iterable<Object[]> stateSpecs() {
{
StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of()),
FunctionSpec.newBuilder().setUrn(ParDoTranslation.MULTIMAP_USER_STATE).build()
},
{
StateSpecs.orderedList(VarIntCoder.of()),
FunctionSpec.newBuilder().setUrn(ParDoTranslation.ORDERED_LIST_USER_STATE).build()
}
});
}
@@ -395,6 +395,7 @@ static Object toBeamObject(Object value, FieldType fieldType, boolean verifyValu
}
return ((ByteString) value).getBytes();
case ARRAY:
case ITERABLE:
return toBeamList((List<Object>) value, fieldType.getCollectionElementType(), verifyValues);
case MAP:
return toBeamMap(
@@ -558,6 +559,9 @@ private static Expression getBeamField(
case ROW:
value = Expressions.call(expression, "getRow", fieldName);
break;
case ITERABLE:
value = Expressions.call(expression, "getIterable", fieldName);
break;
case LOGICAL_TYPE:
String identifier = fieldType.getLogicalType().getIdentifier();
if (FixedString.IDENTIFIER.equals(identifier)
@@ -634,6 +638,7 @@ private static Expression toCalciteValue(Expression value, FieldType fieldType)
return nullOr(
value, Expressions.new_(ByteString.class, Expressions.convert_(value, byte[].class)));
case ARRAY:
case ITERABLE:
return nullOr(value, toCalciteList(value, fieldType.getCollectionElementType()));
case MAP:
return nullOr(value, toCalciteMap(value, fieldType.getMapValueType()));
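For reference, a small sketch of the schema-level feature these new `ITERABLE` cases handle: an iterable field declared on a Beam `Schema` and read back from a `Row` via `getIterable`, which is how the generated Calcite code now accesses such columns alongside arrays. The schema, field names, and values are illustrative.

```java
import java.util.Arrays;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.values.Row;

public class IterableFieldExample {
  public static void main(String[] args) {
    // Schema with a plain string field and an iterable-of-strings field.
    Schema schema =
        Schema.builder()
            .addStringField("name")
            .addIterableField("tags", FieldType.STRING)
            .build();

    Row row =
        Row.withSchema(schema)
            .addValues("beam", Arrays.asList("sql", "iterable"))
            .build();

    // The SQL layer reads such columns via getIterable, just like array columns via getArray.
    Iterable<String> tags = row.getIterable("tags");
    System.out.println(tags);
  }
}
```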
@@ -38,6 +38,7 @@
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
@@ -94,6 +95,7 @@ public static void prepareClass() throws ParseException {
.addDateTimeField("f_timestamp")
.addInt32Field("f_int2")
.addDecimalField("f_decimal")
.addIterableField("f_iterable", FieldType.STRING)
.build();

rowsInTableA =
@@ -111,7 +113,8 @@ public static void prepareClass() throws ParseException {
LocalDateTime.of(2017, 1, 1, 1, 1, 3),
parseTimestampWithoutTimeZone("2017-01-01 01:01:03"),
0,
new BigDecimal(1))
new BigDecimal(1),
Lists.newArrayList("s1", "s2"))
.addRows(
2,
2000L,
@@ -125,7 +128,8 @@ public static void prepareClass() throws ParseException {
LocalDateTime.of(2017, 1, 1, 1, 2, 3),
parseTimestampWithoutTimeZone("2017-01-01 01:02:03"),
0,
new BigDecimal(2))
new BigDecimal(2),
Lists.newArrayList("s1", "s2"))
.addRows(
3,
3000L,
@@ -139,7 +143,8 @@ public static void prepareClass() throws ParseException {
LocalDateTime.of(2017, 1, 1, 1, 6, 3),
parseTimestampWithoutTimeZone("2017-01-01 01:06:03"),
0,
new BigDecimal(3))
new BigDecimal(3),
Lists.newArrayList("s1", "s2"))
.addRows(
4,
4000L,
@@ -153,7 +158,8 @@ public static void prepareClass() throws ParseException {
LocalDateTime.of(2017, 1, 1, 2, 4, 3),
parseTimestampWithoutTimeZone("2017-01-01 02:04:03"),
0,
new BigDecimal(4))
new BigDecimal(4),
Lists.newArrayList("s1", "s2"))
.getRows();

monthlyRowsInTableA =
@@ -171,7 +177,8 @@ public static void prepareClass() throws ParseException {
LocalDateTime.of(2017, 1, 1, 1, 1, 3),
parseTimestampWithUTCTimeZone("2017-01-01 01:01:03"),
0,
new BigDecimal(1))
new BigDecimal(1),
Lists.newArrayList("s1", "s2"))
.addRows(
2,
2000L,
@@ -185,7 +192,8 @@ public static void prepareClass() throws ParseException {
LocalDateTime.of(2017, 1, 1, 1, 2, 3),
parseTimestampWithUTCTimeZone("2017-02-01 01:02:03"),
0,
new BigDecimal(2))
new BigDecimal(2),
Lists.newArrayList("s1", "s2"))
.addRows(
3,
3000L,
@@ -199,7 +207,8 @@ public static void prepareClass() throws ParseException {
LocalDateTime.of(2017, 1, 1, 1, 6, 3),
parseTimestampWithUTCTimeZone("2017-03-01 01:06:03"),
0,
new BigDecimal(3))
new BigDecimal(3),
Lists.newArrayList("s1", "s2"))
.getRows();

schemaFloatDouble =
