Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into solace-connector
Browse files Browse the repository at this point in the history
# Conflicts:
#	sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java
  • Loading branch information
bzablocki committed Jun 21, 2024
2 parents a39f950 + 3695d49 commit da26ab5
Show file tree
Hide file tree
Showing 66 changed files with 1,624 additions and 1,479 deletions.
3 changes: 0 additions & 3 deletions .github/REVIEWERS.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@ labels:
reviewers:
- lostluck
- jrmccluskey
- riteshghorse
exclusionList:
- youngoli
- name: Python
reviewers:
- damccorm
- jrmccluskey
- tvalentyn
- riteshghorse
- liferoad
- shunping
exclusionList: []
Expand Down Expand Up @@ -85,7 +83,6 @@ fallbackReviewers:
- jrmccluskey
- kennknowles
- lostluck
- riteshghorse
- robertwb
- shunping
- tvalentyn
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ jobs:
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: |
:sdks:java:io:elasticsearch-tests:elasticsearch-tests-5:build \
:sdks:java:io:elasticsearch-tests:elasticsearch-tests-7:build \
:sdks:java:io:elasticsearch-tests:elasticsearch-tests-8:build \
:sdks:java:io:elasticsearch-tests:elasticsearch-tests-common:build \
Expand Down
18 changes: 9 additions & 9 deletions .github/workflows/build_release_candidate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ on:
--
beam_site_pr: create the documentation update PR against apache/beam-site.
--
prism: build and upload the artifacts to the release for this tag
prism: build and upload the artifacts to the release for this tag
required: true
default: |
{java_artifacts: "no",
Expand Down Expand Up @@ -339,7 +339,7 @@ jobs:
pip install -U pip
pip install tox
# TODO(https://github.com/apache/beam/issues/20209): Don't hardcode py version in this file.
tox -e py38-docs
tox -e docs
rm -rf target/docs/_build/.doctrees
- name: Build Typescript Docs
working-directory: beam/sdks/typescript
Expand Down Expand Up @@ -469,7 +469,7 @@ jobs:
svn co https://dist.apache.org/repos/dist/dev/beam
mkdir -p "${SVN_ARTIFACTS_DIR}"
OUTPUT_DIR=$ROOT_DIR/$SVN_ARTIFACTS_DIR
mkdir -p $OUTPUT_DIR
echo "OUTPUT_DIR=$OUTPUT_DIR"
Expand All @@ -486,8 +486,8 @@ jobs:
BUILD_DIR=`pwd`
echo "............Build and stage prism artifacts in the Github Release $GH_RELEASE_ID for tag $RC_TAG.........."
# Loop through and build desired set from allowed types.
for OS in linux windows darwin; do
for ARCH in amd64 arm64; do
Expand All @@ -498,7 +498,7 @@ jobs:
OUTPUT_FILE="$TARGET_DIR/$TARGET_NAME"
if GOOS=$OS GOARCH=$ARCH CGO_ENABLED=0 go build -trimpath -buildvcs=false -o "$OUTPUT_FILE" . > /dev/null 2>&1; then
cd $TARGET_DIR
# Extract real output name. Windows builds automatically have .exe added.
# Extract real output name. Windows builds automatically have .exe added.
ARTIFACT=`ls`
echo "target built - $ARTIFACT"
Expand All @@ -511,10 +511,10 @@ jobs:
# Upload to the release.
gh release upload $RC_TAG $ZIP_NAME ${ZIP_NAME}.sha512 ${ZIP_NAME}.asc --clobber
# Remove the binary from the release directory
rm $ARTIFACT
# Return to our root build dir.
cd $BUILD_DIR
else
Expand All @@ -530,6 +530,6 @@ jobs:
echo "...........Adding prism artifacts to the Dev Apache SVN repo..........."
svn add --force prism
svn add --force --parents prism
svn status
svn commit -m "Staging Prism artifacts for Apache Beam ${RELEASE} RC${RC_NUM}" --non-interactive --username "${{ github.event.inputs.APACHE_ID }}" --password "${{ github.event.inputs.APACHE_PASSWORD }}"
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@
# - group
# - strings

# [START groupbykey]
import apache_beam as beam
def groupbykey():
# [START groupbykey]
import apache_beam as beam

with beam.Pipeline() as p:
with beam.Pipeline() as p:

(p | beam.Create(['apple', 'ball', 'car', 'bear', 'cheetah', 'ant'])
| beam.Map(lambda word: (word[0], word))
| beam.GroupByKey()
| beam.LogElements())
# [END groupbykey]
(p | beam.Create(['apple', 'ball', 'car', 'bear', 'cheetah', 'ant'])
| beam.Map(lambda word: (word[0], word))
| beam.GroupByKey()
| beam.LogElements())
# [END groupbykey]

if __name__ == '__main__':
groupbykey()
16 changes: 8 additions & 8 deletions sdks/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ require (
github.com/google/uuid v1.6.0
github.com/johannesboyne/gofakes3 v0.0.0-20221110173912-32fb85c5aed6
github.com/lib/pq v1.10.9
github.com/linkedin/goavro/v2 v2.12.0
github.com/linkedin/goavro/v2 v2.13.0
github.com/nats-io/nats-server/v2 v2.10.12
github.com/nats-io/nats.go v1.33.1
github.com/proullon/ramsql v0.1.3
Expand All @@ -54,11 +54,11 @@ require (
github.com/xitongsys/parquet-go v1.6.2
github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c
go.mongodb.org/mongo-driver v1.13.1
golang.org/x/net v0.24.0
golang.org/x/net v0.26.0
golang.org/x/oauth2 v0.18.0
golang.org/x/sync v0.6.0
golang.org/x/sys v0.19.0
golang.org/x/text v0.14.0
golang.org/x/sync v0.7.0
golang.org/x/sys v0.21.0
golang.org/x/text v0.16.0
google.golang.org/api v0.171.0
google.golang.org/genproto v0.0.0-20240308144416-29370a3891b7
google.golang.org/grpc v1.63.2
Expand Down Expand Up @@ -181,9 +181,9 @@ require (
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/tools v0.14.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240314234333-6e1732d8331c // indirect
Expand Down
32 changes: 16 additions & 16 deletions sdks/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/linkedin/goavro/v2 v2.12.0 h1:rIQQSj8jdAUlKQh6DttK8wCRv4t4QO09g1C4aBWXslg=
github.com/linkedin/goavro/v2 v2.12.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
github.com/linkedin/goavro/v2 v2.13.0 h1:L8eI8GcuciwUkt41Ej62joSZS4kKaYIUdze+6for9NU=
github.com/linkedin/goavro/v2 v2.13.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
Expand Down Expand Up @@ -494,8 +494,8 @@ golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down Expand Up @@ -528,8 +528,8 @@ golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzB
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY=
golang.org/x/mod v0.13.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand All @@ -553,8 +553,8 @@ golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwY
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand All @@ -570,8 +570,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -607,8 +607,8 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand All @@ -620,8 +620,8 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down Expand Up @@ -660,8 +660,8 @@ golang.org/x/tools v0.0.0-20200224181240-023911ca70b2/go.mod h1:TB2adYChydJhpapK
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc=
golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
67 changes: 60 additions & 7 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
urns.TransformCombinePerKey,
urns.TransformCombineGlobally, // Used by Java SDK
urns.TransformCombineGroupedValues, // Used by Java SDK
urns.TransformMerge, // Used directly by Python SDK if "pre-optimized"
urns.TransformPreCombine, // Used directly by Python SDK if "pre-optimized"
urns.TransformExtract, // Used directly by Python SDK if "pre-optimized"
urns.TransformAssignWindows:
// Very few expected transforms types for submitted pipelines.
// Most URNs are for the runner to communicate back to the SDK for execution.
Expand Down Expand Up @@ -165,12 +168,6 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo

check("OnWindowExpirationTimerFamily", pardo.GetOnWindowExpirationTimerFamilySpec(), "") // Unsupported for now.

case "":
// Composites can often have no spec
if len(t.GetSubtransforms()) > 0 {
continue
}
fallthrough
case urns.TransformTestStream:
var testStream pipepb.TestStreamPayload
if err := proto.Unmarshal(t.GetSpec().GetPayload(), &testStream); err != nil {
Expand All @@ -179,7 +176,15 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo

t.EnvironmentId = "" // Unset the environment, to ensure it's handled prism side.
testStreamIds = append(testStreamIds, tid)

default:
// Composites can often have some unknown urn, permit those.
// Eg. The Python SDK has urns "beam:transform:generic_composite:v1", "beam:transform:pickled_python:v1", as well as the deprecated "beam:transform:read:v1",
// but they are composites. Since we don't do anything special with the high level, we simply use their internal subgraph.
if len(t.GetSubtransforms()) > 0 {
continue
}
// But if not, fail.
check("PTransform.Spec.Urn", urn+" "+t.GetUniqueName(), "<doesn't exist>")
}
}
Expand All @@ -191,7 +196,8 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
// Inspect Windowing strategies for unsupported features.
for wsID, ws := range job.Pipeline.GetComponents().GetWindowingStrategies() {
check("WindowingStrategy.AllowedLateness", ws.GetAllowedLateness(), int64(0))
check("WindowingStrategy.ClosingBehaviour", ws.GetClosingBehavior(), pipepb.ClosingBehavior_EMIT_IF_NONEMPTY)
// Both Closing behaviors are identical without additional trigger firings.
check("WindowingStrategy.ClosingBehaviour", ws.GetClosingBehavior(), pipepb.ClosingBehavior_EMIT_IF_NONEMPTY, pipepb.ClosingBehavior_EMIT_ALWAYS)
check("WindowingStrategy.AccumulationMode", ws.GetAccumulationMode(), pipepb.AccumulationMode_DISCARDING)
if ws.GetWindowFn().GetUrn() != urns.WindowFnSession {
check("WindowingStrategy.MergeStatus", ws.GetMergeStatus(), pipepb.MergeStatus_NON_MERGING)
Expand Down Expand Up @@ -398,3 +404,50 @@ func (s *Server) GetState(_ context.Context, req *jobpb.GetJobStateRequest) (*jo
Timestamp: timestamppb.New(j.stateTime),
}, nil
}

// DescribePipelineOptions is a no-op since it's unclear how it is to function.
// Apparently only implemented in the Python SDK.
func (s *Server) DescribePipelineOptions(context.Context, *jobpb.DescribePipelineOptionsRequest) (*jobpb.DescribePipelineOptionsResponse, error) {
return &jobpb.DescribePipelineOptionsResponse{
Options: []*jobpb.PipelineOptionDescriptor{},
}, nil
}

// GetStateStream returns the job state as it changes.
func (s *Server) GetStateStream(req *jobpb.GetJobStateRequest, stream jobpb.JobService_GetStateStreamServer) error {
s.mu.Lock()
job, ok := s.jobs[req.GetJobId()]
s.mu.Unlock()
if !ok {
return fmt.Errorf("job with id %v not found", req.GetJobId())
}

job.streamCond.L.Lock()
defer job.streamCond.L.Unlock()

state := job.state.Load().(jobpb.JobState_Enum)
for {
job.streamCond.L.Unlock()
stream.Send(&jobpb.JobStateEvent{
State: state,
Timestamp: timestamppb.Now(),
})
job.streamCond.L.Lock()
switch state {
case jobpb.JobState_CANCELLED, jobpb.JobState_DONE, jobpb.JobState_DRAINED, jobpb.JobState_UPDATED, jobpb.JobState_FAILED:
// Reached terminal state.
return nil
}
newState := job.state.Load().(jobpb.JobState_Enum)
for state == newState {
select { // Quit out if the external connection is done.
case <-stream.Context().Done():
return context.Cause(stream.Context())
default:
}
job.streamCond.Wait()
newState = job.state.Load().(jobpb.JobState_Enum)
}
state = newState
}
}
15 changes: 15 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,21 @@ func (wk *W) State(state fnpb.BeamFnState_StateServer) error {

data = winMap[w]

case *fnpb.StateKey_MultimapKeysSideInput_:
mmkey := key.GetMultimapKeysSideInput()
wKey := mmkey.GetWindow()
var w typex.Window = window.GlobalWindow{}
if len(wKey) > 0 {
w, err = exec.MakeWindowDecoder(coder.NewIntervalWindow()).DecodeSingle(bytes.NewBuffer(wKey))
if err != nil {
panic(fmt.Sprintf("error decoding multimap side input window key %v: %v", wKey, err))
}
}
winMap := b.MultiMapSideInputData[SideInputKey{TransformID: mmkey.GetTransformId(), Local: mmkey.GetSideInputId()}]
for k := range winMap[w] {
data = append(data, []byte(k))
}

case *fnpb.StateKey_MultimapSideInput_:
mmkey := key.GetMultimapSideInput()
wKey := mmkey.GetWindow()
Expand Down
Loading

0 comments on commit da26ab5

Please sign in to comment.