From 2da24d0644f1fc978febbc940865ca73e669b258 Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Tue, 20 Aug 2024 13:52:30 -0700 Subject: [PATCH] [#32245][Go SDK] Copy bytes sent over the State API Writer. (#32246) * [#32245] Copy bytes sent from the State API. * Mention #32245 in changes.md * remove unnecessary changed line * weird copypasta --------- Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com> --- CHANGES.md | 1 + sdks/go/pkg/beam/core/runtime/harness/statemgr.go | 9 +++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index e1cb4be078e9..9ce29336ec20 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -89,6 +89,7 @@ * Fixed incorrect service account impersonation flow for Python pipelines using BigQuery IOs ([#32030](https://github.com/apache/beam/issues/32030)). * Auto-disable broken and meaningless `upload_graph` feature when using Dataflow Runner V2 ([#32159](https://github.com/apache/beam/issues/32159)). * (Python) Upgraded google-cloud-storage to version 2.18.2 to fix a data corruption issue ([#32135](https://github.com/apache/beam/pull/32135)). +* (Go) Fixed corruption on State API writes ([#32245](https://github.com/apache/beam/issues/32245)). ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). 
diff --git a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go index 061cfca011f5..4c1bc0b55fe3 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go @@ -16,6 +16,7 @@ package harness import ( + "bytes" "context" "fmt" "io" @@ -452,6 +453,9 @@ func (r *stateKeyReader) Read(buf []byte) (int, error) { r.buf = nil default: r.buf = r.buf[n:] + if len(r.buf) == 0 { + r.buf = nil + } } return n, nil } @@ -469,6 +473,7 @@ func (r *stateKeyWriter) Write(buf []byte) (int, error) { localChannel := r.ch r.mu.Unlock() + toSend := bytes.Clone(buf) var req *fnpb.StateRequest switch r.writeType { case writeTypeAppend: @@ -478,7 +483,7 @@ func (r *stateKeyWriter) Write(buf []byte) (int, error) { StateKey: r.key, Request: &fnpb.StateRequest_Append{ Append: &fnpb.StateAppendRequest{ - Data: buf, + Data: toSend, }, }, } @@ -499,7 +504,7 @@ func (r *stateKeyWriter) Write(buf []byte) (int, error) { if err != nil { return 0, err } - return len(buf), nil + return len(toSend), nil } // StateChannelManager manages data channels over the State API. A fixed number of channels