diff --git a/CHANGES.md b/CHANGES.md index e1cb4be078e9..9ce29336ec20 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -89,6 +89,7 @@ * Fixed incorrect service account impersonation flow for Python pipelines using BigQuery IOs ([#32030](https://github.com/apache/beam/issues/32030)). * Auto-disable broken and meaningless `upload_graph` feature when using Dataflow Runner V2 ([#32159](https://github.com/apache/beam/issues/32159)). * (Python) Upgraded google-cloud-storage to version 2.18.2 to fix a data corruption issue ([#32135](https://github.com/apache/beam/pull/32135)). +* (Go) Fix corruption on State API writes. ([#32245](https://github.com/apache/beam/issues/32245)). ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). diff --git a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go index 061cfca011f5..4c1bc0b55fe3 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go @@ -16,6 +16,7 @@ package harness import ( + "bytes" "context" "fmt" "io" @@ -452,6 +453,9 @@ func (r *stateKeyReader) Read(buf []byte) (int, error) { r.buf = nil default: r.buf = r.buf[n:] + if len(r.buf) == 0 { + r.buf = nil + } } return n, nil } @@ -469,6 +473,7 @@ func (r *stateKeyWriter) Write(buf []byte) (int, error) { localChannel := r.ch r.mu.Unlock() + toSend := bytes.Clone(buf) var req *fnpb.StateRequest switch r.writeType { case writeTypeAppend: @@ -478,7 +483,7 @@ func (r *stateKeyWriter) Write(buf []byte) (int, error) { StateKey: r.key, Request: &fnpb.StateRequest_Append{ Append: &fnpb.StateAppendRequest{ - Data: buf, + Data: toSend, }, }, } @@ -499,7 +504,7 @@ func (r *stateKeyWriter) Write(buf []byte) (int, error) { if err != nil { return 0, err } - return len(buf), nil + return len(toSend), nil } // StateChannelManager manages data channels over the State API. A fixed number of channels