
[Bug]: Pointer dereference error with Spanner partitioned reads on Dataflow #28959

Open
2 of 16 tasks
azach opened this issue Oct 12, 2023 · 5 comments


@azach

azach commented Oct 12, 2023

What happened?

Running into the following panic when attempting a partitioned read on Dataflow:

func main() {
	flag.Parse()
	beam.Init()

	p, s := beam.NewPipelineWithRoot()

	spannerRows := spannerio.Query(s, *spannerConnStr, "select id from table", reflect.TypeOf(SpannerEntity{}), spannerio.WithBatching(true))
	_ = spannerRows // downstream transforms omitted; Go rejects unused variables

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf(context.Background(), "Failed to execute job: %v", err)
	}
}

Error seen on Dataflow:

Full error:
while executing Process for Plan[s01-10]:
1: DataSource[S[ptransform-7@localhost:12371], 0] Out:8 Coder:W;coder-35<bytes;coder-36>!GWC
8: ParDo[spannerio.generatePartitionsFn] Out:[7] Sig: func(context.Context, []uint8, func(spannerio.partitionedRead)) error, SideInputs: []
7: PCollection[s01-10-pcollection-15] Out:[6]
6: SDF.PairWithRestriction[spannerio.readBatchFn] UID:6 Out:[5]
5: PCollection[s01-10-pcollection-18] Out:[4]
4: SDF.SplitAndSizeRestrictions[spannerio.readBatchFn] UID:4 Out:[2]
3: PCollection[s01-10-pcollection-25] Out:[2]
2: DataSink[S[ptransform-8@localhost:12371]] Coder:W;coder-41<KV;coder-42<KV;coder-43<LP;coder-44<R[spannerio.partitionedRead]>,KV;coder-45<offsetrange.Restriction[offsetrange.Restriction;c4];coder-46,bool;coder-47>>,double;coder-48>>!GWC
	caused by:
panic: runtime error: invalid memory address or nil pointer dereference goroutine 41 [running]:
runtime/debug.Stack()
	runtime/debug/stack.go:24 +0x5e
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.callNoPanic.func1()
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/util.go:58 +0x94
panic({0x1b94d60?, 0x4011170?})
	runtime/panic.go:914 +0x21f
cloud.google.com/go/spanner.(*Client).BatchReadOnlyTransactionFromID(0x0, {{0xc0009f3300, 0x3e, 0x40}, {0xc000ad3cc0, 0xa0}, {0x30c20288, 0xedcb9b466, 0x4046f80}})
	cloud.google.com/go/[email protected]/client.go:464 +0x57
github.com/apache/beam/sdks/v2/go/pkg/beam/io/spannerio.(*readBatchFn).CreateInitialRestriction(0xc000644fb0?, {{{0xc0009f3300, 0x3e, 0x40}, {0xc000ad3cc0, 0xa0}, {0x30c20288, 0xedcb9b466, 0x4046f80}}, 0xc000b22d50})
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/spannerio/read_batch.go:70 +0x6d
reflect.Value.call({0x1ace740?, 0xc00043bf80?, 0x452909?}, {0x22605ba, 0x4}, {0xc000b74ba0, 0x1, 0x530ccb?})
	reflect/value.go:596 +0xce7
reflect.Value.Call({0x1ace740?, 0xc00043bf80?, 0x99?}, {0xc000b74ba0?, 0xc0006c60e8?, 0xc000670c80?})
	reflect/value.go:380 +0xb9
github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx.(*reflectFunc).Call(0xc0001686c0, {0xc000672440?, 0xc000b06240?, 0x246bf60?})
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/util/reflectx/call.go:87 +0x53
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*cirInvoker).initCallFn.func7()
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/sdf_invokers_arity.go:78 +0x32
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*cirInvoker).Invoke(0x0?, {0x24a30d0?, 0xc000671d00?}, 0x1de8f00?)
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/sdf_invokers.go:84 +0xff
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*PairWithRestriction).ProcessElement(0xc000665050, {0x24a30d0, 0xc000671d00}, 0xc0002a44d0, {0x0, 0x0, 0x0})
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/sdf.go:93 +0x5e
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*PCollection).ProcessElement(0xc00065f290?, {0x24a30d0?, 0xc000671d00?}, 0x1?, {0x0?, 0x0?, 0x0?})
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pcollection.go:100 +0x151
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*emitValue).invoke(0xc00052ce00, {0xc000b74b88, 0x1, 0x101?})
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/emit.go:137 +0x2fb
github.com/apache/beam/sdks/v2/go/pkg/beam/io/spannerio.(*generatePartitionsFn).ProcessElement(0xc00052c380, {0x24a30d0, 0xc000671d00}, {0xc0006ccd20?, 0xc000646920?, 0x40e45a?}, 0xc00068f980)
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/spannerio/generate_partitions.go:96 +0x2f3
github.com/apache/beam/sdks/v2/go/pkg/beam/register.registerDoFn3x1StructWrappersAndFuncs[...].func2.1({0x40764e0, 0x0, 0x0}, 0xf20000c00067fd01)
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/register/register.go:3382 +0x78
github.com/apache/beam/sdks/v2/go/pkg/beam/register.(*caller3x1[...]).Call3x1(0x2529200, {0x1e0a160, 0xc000671d00?}, {0x1a5b3c0, 0xc00069e918?}, {0x1a9b660, 0xc00068f980?})
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/register/register.go:3371 +0xa5
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).initCall.func13({0x3, 0x1, 0x1, 0x0, 0x0}, {0x4010b00, 0x1, 0x1}, 0x445ce0?)
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/fn_arity.go:109 +0xa9
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).invokeWithOpts(0xc0000fadc0, {0x24a30d0?, 0xc000671d00}, {0x3, 0x1, 0x1, 0x0, 0x0}, {0x4010b00, 0x1, ...}, ...)
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/fn.go:297 +0xf90
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).invokeProcessFn(0xc000674000, {0x24a30d0, 0xc000671d00}, {0x3, 0x1, 0x1, 0x0, 0x0}, {0x4010b00, 0x1, ...}, ...)
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:508 +0x29f
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processSingleWindow(0xc000674000, 0xc0007001b0)
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:194 +0x8f
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processMainInput(0x0?, 0x24a30d0?)
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:172 +0x7f
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).ProcessElement(0xc000674000, {0x246bdc0?, 0xc00069e750?}, 0xc00052db90, {0x0, 0x0, 0x0})
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:156 +0x129
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*DataSource).Process.func1(0xc00069e750, {0xc000647328?, 0xc0006c62c0?})
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/datasource.go:240 +0x223
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*DataSource).process(0xc0006302c0, {0x24a30d0, 0xc00043a680}, 0xc000647618, 0xc0006475f8)
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/datasource.go:146 +0x4d3
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*DataSource).Process(0xc0006302c0, {0x24a30d0, 0xc00043a680})
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/datasource.go:214 +0x20a
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*Plan).Execute.func2({0x24a30d0?, 0xc00043a680?})
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/plan.go:151 +0x3b
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.callNoPanic({0x24a30d0?, 0xc00043a680?}, 0x418488?)
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/util.go:62 +0x62
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc00065f5f0, {0x24a30d0, 0xc00043a680}, {0xc000350300, 0x24}, {{0x24a11a0?, 0xc000670500?}, {0x24f2bd8?, 0xc0006654d0?}})
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/plan.go:150 +0x3b8
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness.(*control).handleInstruction(0xc000436000, {0x24a2ad8?, 0xc000310f30?}, 0xc0004fc460)
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/harness/harness.go:409 +0x9f8
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness.MainWithOptions.func4({0x24a2ad8?, 0xc000310f30?}, 0xc000106900?)
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/harness/harness.go:195 +0x74
created by github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness.MainWithOptions in goroutine 1
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/harness/harness.go:213 +0xede

This read works when batching is disabled. I've also confirmed that this is a root-partitionable query (it's just a simple read from the table).

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@pequalsnp
Contributor

I am running into the same error using spannerio.Query:

{
insertId: "1qg0xpqd1fse"
labels: {4}
logName: "projects/backend-core-prod/logs/dataflow.googleapis.com%2Fjob-message"
receiveTimestamp: "2023-10-12T16:13:53.007588424Z"
resource: {2}
severity: "ERROR"
textPayload: "Error message from worker: process bundle failed for instruction process_bundle-536767401968947855-0 using plan s01-10 : panic: runtime error: invalid memory address or nil pointer dereference
Full error:
while executing Process for Plan[s01-10]:
1: DataSource[S[ptransform-7@localhost:12371], 0] Out:8 Coder:W;coder-35<bytes;coder-36>!GWC 
8: ParDo[spannerio.generatePartitionsFn] Out:[7] Sig: func(context.Context, []uint8, func(spannerio.partitionedRead)) error, SideInputs: []
7: PCollection[s01-10-pcollection-15] Out:[6]
6: SDF.PairWithRestriction[spannerio.readBatchFn] UID:6 Out:[5]
5: PCollection[s01-10-pcollection-18] Out:[4]
4: SDF.SplitAndSizeRestrictions[spannerio.readBatchFn] UID:4 Out:[2]
3: PCollection[s01-10-pcollection-25] Out:[2]
2: DataSink[S[ptransform-8@localhost:12371]] Coder:W;coder-41<KV;coder-42<KV;coder-43<LP;coder-44<R[spannerio.partitionedRead]>,KV;coder-45<offsetrange.Restriction[offsetrange.Restriction;c4];coder-46,bool;coder-47>>,double;coder-48>>!GWC
	caused by:
panic: runtime error: invalid memory address or nil pointer dereference goroutine 41 [running]:
runtime/debug.Stack()
	runtime/debug/stack.go:24 +0x65
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.callNoPanic.func1()
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/util.go:58 +0xa5
panic({0x2578d40, 0x4d742d0})
	runtime/panic.go:884 +0x213
cloud.google.com/go/spanner.(*Client).BatchReadOnlyTransactionFromID(0x0, {{0xc000a18240, 0x3e, 0x40}, {0xc00014fc20, 0x98}, {0x358a15d0, 0xedcba123e, 0x4da8320}})
	cloud.google.com/go/[email protected]/client.go:464 +0x5a
github.com/apache/beam/sdks/v2/go/pkg/beam/io/spannerio.(*readBatchFn).CreateInitialRestriction(0xc000496fe8?, {{{0xc000a18240, 0x3e, 0x40}, {0xc00014fc20, 0x98}, {0x358a15d0, 0xedcba123e, 0x4da8320}}, 0xc000a7de60})
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/spannerio/read_batch.go:70 +0x78
reflect.Value.call({0x2474420?, 0xc00060e680?, 0x44eb32?}, {0x3166478, 0x4}, {0xc00061ba28, 0x1, 0x5167f1?})
	reflect/value.go:586 +0xb0b
reflect.Value.Call({0x2474420?, 0xc00060e680?, 0x99?}, {0xc00061ba28?, 0xc000a3c558?, 0xc00060f300?})
	reflect/value.go:370 +0xbc
github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx.(*reflectFunc).Call(0xc000228f30, {0xc0005fb500?, 0xc00017fc20?, 0x33bfd00?})
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/util/reflectx/call.go:87 +0x59
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*cirInvoker).initCallFn.func7()
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/sdf_invokers_arity.go:78 +0x38
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*cirInvoker).Invoke(0x7f38661e8458?, {0x34103c0?, 0xc00064e400?}, 0x299a4e0?)
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/sdf_invokers.go:84 +0x114
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*PairWithRestriction).ProcessElement(0xc000614240, {0x34103c0, 0xc00064e400}, 0xc0004ee5b0, {0x0, 0x0, 0x0})
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/sdf.go:93 +0x6a
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*PCollection).ProcessElement(0xc000603170?, {0x34103c0?, 0xc00064e400?}, 0x1?, {0x0?, 0x0?, 0x0?})
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pcollection.go:100 +0x151
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*emitValue).invoke(0xc0004ef6c0, {0xc00061ba10, 0x1, 0x101?})
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/emit.go:137 +0x314
github.com/apache/beam/sdks/v2/go/pkg/beam/io/spannerio.(*generatePartitionsFn).ProcessElement(0xc0004ef420, {0x34103c0, 0xc00064e400}, {0xc0006900c0?, 0xc000498930?, 0x40bb05?}, 0xc00064cc30)
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/spannerio/generate_partitions.go:96 +0x2ec
github.com/apache/beam/sdks/v2/go/pkg/beam/register.registerDoFn3x1StructWrappersAndFuncs[...].func2.1({0x4dd79b0, 0x0, 0x0}, 0x8)
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/register/register.go:3382 +0x82
github.com/apache/beam/sdks/v2/go/pkg/beam/register.(*caller3x1[...]).Call3x1(0x3524800, {0x29b7b00, 0xc00064e400?}, {0x2412680, 0xc00061afc0?}, {0x24448e0, 0xc00064cc30?})
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/register/register.go:3371 +0xae
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).initCall.func13({0x3, 0x1, 0x1, 0x0, 0x0}, {0x4d73bb0, 0x1, 0x1}, 0x442220?)
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/fn_arity.go:109 +0xad
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).invokeWithOpts(0xc0001508c0, {0x34103c0?, 0xc00064e400}, {0x3, 0x1, 0x1, 0x0, 0x0}, {0x4d73bb0, 0x1, ...}, ...)
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/fn.go:297 +0xfec
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).invokeProcessFn(0xc000618240, {0x34103c0, 0xc00064e400}, {0x3, 0x1, 0x1, 0x0, 0x0}, {0x4d73bb0, 0x1, ...}, ...)
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:508 +0x2b5
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processSingleWindow(0xc000618240, 0xc00069c750)
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:194 +0x9b
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processMainInput(0x0?, 0x34103c0?)
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:172 +0x91
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).ProcessElement(0xc000618240, {0x33bfce0?, 0xc00061adf8?}, 0xc0004efb20, {0x0, 0x0, 0x0})
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:156 +0x125
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*DataSource).Process.func1(0xc00061adf8, {0xc000499338?, 0xc00065a4f0?})
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/datasource.go:240 +0x21e
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*DataSource).process(0xc00044c2c0, {0x34103c0, 0xc000220080}, 0xc000499628, 0xc000499608)
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/datasource.go:146 +0x4f6
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*DataSource).Process(0xc00044c2c0, {0x34103c0, 0xc000220080})
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/datasource.go:214 +0x226
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*Plan).Execute.func2({0x34103c0?, 0xc000220080?})
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/plan.go:151 +0x42
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.callNoPanic({0x34103c0?, 0xc000220080?}, 0x415d70?)
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/util.go:62 +0x6c
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc0006034d0, {0x34103c0, 0xc000220080}, {0xc000308b40, 0x23}, {{0x33de540?, 0xc00060eb80?}, {0x34b9418?, 0xc000614660?}})
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/plan.go:150 +0x3df
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness.(*control).handleInstruction(0xc0001f8000, {0x34102a8?, 0xc0002df200?}, 0xc0000b7f40)
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/harness/harness.go:409 +0xa37
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness.MainWithOptions.func4({0x34102a8?, 0xc0002df200?}, 0xc0000f6900?)
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/harness/harness.go:195 +0x7a
created by github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness.MainWithOptions
	github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/harness/harness.go:213 +0xf1d
"
timestamp: "2023-10-12T16:13:51.268439744Z"
}

Using spannerio.Read works.

@pequalsnp
Contributor

This appears to be entirely related to partitioned reads, because I can get Query to work if I set WithBatching(false).

@lostluck
Copy link
Contributor

This does seem like a bug in the batch partitioned reads.

The failing line on the SDK side:

https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/io/spannerio/read_batch.go#L70

It appears to be passing in a bad BatchReadOnlyTransactionID from the partitionedRead.

The issue appears to be that those spanner types don't serialize to JSON; they implement MarshalBinary instead. So naturally the value is bad after encode/decode. Beam doesn't automatically handle binary marshalling, and neither does the final fallback JSON encoding.

https://pkg.go.dev/cloud.google.com/go/spanner#BatchReadOnlyTransactionID
https://pkg.go.dev/cloud.google.com/go/spanner#Partition

The fastest fix is to register encoder and decoder functions for the type, i.e. a coder for spannerio.partitionedRead, since its fields are not being encoded properly.

For example, something similar to the following should be added to the spannerio package, with error handling added and the pseudo code filled in.

func init() {
	beam.RegisterCoder(reflect.TypeOf(partitionedRead{}), encodePartitionedRead, decodePartitionedRead)
}

func encodePartitionedRead(v partitionedRead) []byte {
	a, _ := v.BatchTransactionId.MarshalBinary()
	b, _ := v.Partition.MarshalBinary()
	// Length-prefix a, append b, and return the combined bytes.
}

func decodePartitionedRead(data []byte) partitionedRead {
	var ret partitionedRead
	// Decode the length prefix n so we can split the fields.
	_ = ret.BatchTransactionId.UnmarshalBinary(data[:n])
	_ = ret.Partition.UnmarshalBinary(data[n:])
	return ret
}
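The length-prefix step that the pseudo code leaves as a comment can be sketched with the standard library alone. This is a minimal sketch, assuming each field has already been turned into an opaque byte slice via MarshalBinary; the helper names `frame` and `unframe` are hypothetical and not part of spannerio or Beam.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// frame concatenates two opaque byte slices (e.g. the outputs of the two
// MarshalBinary calls), prefixing the first with its length as a uvarint
// so the decoder knows where to split.
func frame(a, b []byte) []byte {
	out := binary.AppendUvarint(nil, uint64(len(a)))
	out = append(out, a...)
	return append(out, b...)
}

// unframe reverses frame, returning the two original slices.
// The returned slices alias data rather than copying it.
func unframe(data []byte) (a, b []byte, err error) {
	n, k := binary.Uvarint(data)
	if k <= 0 {
		return nil, nil, errors.New("invalid length prefix")
	}
	rest := data[k:]
	if n > uint64(len(rest)) {
		return nil, nil, fmt.Errorf("length prefix %d exceeds remaining %d bytes", n, len(rest))
	}
	return rest[:n], rest[n:], nil
}

func main() {
	enc := frame([]byte("txn-id"), []byte("partition"))
	a, b, err := unframe(enc)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s %s\n", a, b) // prints "txn-id partition"
}
```

In a real coder, the encoder would pass the two MarshalBinary outputs to something like `frame`, and the decoder would call UnmarshalBinary on each half. If the Partition field is a pointer (both stack traces print it as a single address), the decoder would also need to allocate it, e.g. `ret.Partition = &spanner.Partition{}`, before calling UnmarshalBinary.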

See https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam#RegisterCoder for some light documentation. It still claims JSON is the default coder, which hasn't been true for a while, but the rest of it remains valid.

If one of you (@pequalsnp, @azach) could try it and validate it for your use case(s), that would be a big help compared with my simple guess. IOs are hard because one needs to be an expert in both Beam and the data source to write them well. I'm very happy to review and merge any changes that work for y'all.

@pequalsnp
Copy link
Contributor

I did write a coder, essentially like this (I took this from an SDF I re-wrote):

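import (
	"bytes"
	"encoding/gob"
	"reflect"

	"cloud.google.com/go/spanner"
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
)

// SpannerPartitionRestriction holds a batch transaction ID and the partitions to read.
type SpannerPartitionRestriction struct {
	TransactionID spanner.BatchReadOnlyTransactionID
	Partitions    []*spanner.Partition
}

// rawSpannerPartitionRestriction carries the MarshalBinary output of each field;
// gob can encode it because its fields are exported byte slices.
type rawSpannerPartitionRestriction struct {
	Tid []byte
	Ps  [][]byte
}

func init() {
	beam.RegisterCoder(reflect.TypeOf(SpannerPartitionRestriction{}),
		// Encoder: binary-marshal each field, then gob-encode the raw struct.
		func(read SpannerPartitionRestriction) ([]byte, error) {
			tid, err := read.TransactionID.MarshalBinary()
			if err != nil {
				return nil, err
			}
			ps := [][]byte{}
			for _, partition := range read.Partitions {
				p, err := partition.MarshalBinary()
				if err != nil {
					return nil, err
				}
				ps = append(ps, p)
			}
			var buf bytes.Buffer
			enc := gob.NewEncoder(&buf)
			err = enc.Encode(rawSpannerPartitionRestriction{Tid: tid, Ps: ps})
			if err != nil {
				return nil, err
			}
			return buf.Bytes(), nil
		},
		// Decoder: gob-decode the raw struct, then binary-unmarshal each field.
		func(raw []byte) (SpannerPartitionRestriction, error) {
			buf := bytes.NewBuffer(raw)
			dec := gob.NewDecoder(buf)
			var rawStruct rawSpannerPartitionRestriction
			err := dec.Decode(&rawStruct)
			if err != nil {
				return SpannerPartitionRestriction{}, err
			}
			tid := spanner.BatchReadOnlyTransactionID{}
			err = tid.UnmarshalBinary(rawStruct.Tid)
			if err != nil {
				return SpannerPartitionRestriction{}, err
			}
			var ps []*spanner.Partition
			for _, rawP := range rawStruct.Ps {
				p := spanner.Partition{}
				err = p.UnmarshalBinary(rawP)
				if err != nil {
					return SpannerPartitionRestriction{}, err
				}
				ps = append(ps, &p)
			}
			return SpannerPartitionRestriction{TransactionID: tid, Partitions: ps}, nil
		})
}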

That gets rid of that error, but there were other issues. For example, the client is nil in CreateInitialRestriction because Setup hasn't been called yet. I'm not sure whether that's a bug in the library or intentional.

@lostluck
Contributor

I'm 50/50 on whether that's the right behavior. SDFs get split into several execution components, so if Setup is expensive, it could be called on a node that never actually needs those parts set up.

This could be worked around by moving the necessary client setup code into a sync.Once-guarded function that is called before the client is needed.

But if we do feel it is a bug with Beam Go SDFs:

Setup is called here for normal ParDos:
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/pardo.go#L105

For creating restrictions, the fix would go here in exec/sdf.go:

https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/sdf.go#L52C35-L52C35

It would also be needed here for splitting and sizing restrictions:
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/sdf.go#L141

Here for truncations: https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/sdf.go#L268

It is already being called via the up call for ProcessSizedElementsAndRestrictions, but I'm mentioning it for completeness:
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/sdf.go#L460
And here for the fallback when SDFs execute without sub-element splits: https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/sdf.go#L994

Projects
None yet
Development

No branches or pull requests

3 participants