From f9c5a71b26307789406961813402d91dcc5c999a Mon Sep 17 00:00:00 2001 From: Johanna Ojeling Date: Sat, 16 Sep 2023 13:42:03 +0200 Subject: [PATCH] Retrieve element type from input PCollection in parquetio.Write --- CHANGES.md | 1 + sdks/go/pkg/beam/io/parquetio/parquetio.go | 5 +++-- sdks/go/pkg/beam/io/parquetio/parquetio_test.go | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 3705cebc88df..47c35fe3491b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -73,6 +73,7 @@ * Removed fastjson library dependency for Beam SQL. Table property is changed to be based on jackson ObjectNode (Java) ([#24154](https://github.com/apache/beam/issues/24154)). * Removed TensorFlow from Beam Python container images [PR](https://github.com/apache/beam/pull/28424). If you have been negatively affected by this change, please comment on [#20605](https://github.com/apache/beam/issues/20605). +* Removed the parameter `t reflect.Type` from `parquetio.Write`. The element type is derived from the input PCollection (Go) ([#28490](https://github.com/apache/beam/issues/28490)) ## Deprecations diff --git a/sdks/go/pkg/beam/io/parquetio/parquetio.go b/sdks/go/pkg/beam/io/parquetio/parquetio.go index 9c48d134014b..eb2a611f6836 100644 --- a/sdks/go/pkg/beam/io/parquetio/parquetio.go +++ b/sdks/go/pkg/beam/io/parquetio/parquetio.go @@ -96,7 +96,7 @@ func (a *parquetReadFn) ProcessElement(ctx context.Context, file fileio.Readable } // Write writes a PCollection to .parquet file. 
-// Write expects a type t of struct with parquet tags +// Write expects the elements of col to have a struct type with parquet tags. // For example: // // type Student struct { @@ -108,7 +108,8 @@ func (a *parquetReadFn) ProcessElement(ctx context.Context, file fileio.Readable // Day int32 `parquet:"name=day, type=INT32, convertedtype=DATE"` // Ignored int32 //without parquet tag and won't write // } -func Write(s beam.Scope, filename string, t reflect.Type, col beam.PCollection) { +func Write(s beam.Scope, filename string, col beam.PCollection) { + t := col.Type().Type() s = s.Scope("parquetio.Write") filesystem.ValidateScheme(filename) pre := beam.AddFixedKey(s, col) diff --git a/sdks/go/pkg/beam/io/parquetio/parquetio_test.go b/sdks/go/pkg/beam/io/parquetio/parquetio_test.go index 1cceefcef46b..f3c901395609 100644 --- a/sdks/go/pkg/beam/io/parquetio/parquetio_test.go +++ b/sdks/go/pkg/beam/io/parquetio/parquetio_test.go @@ -95,7 +95,7 @@ func TestWrite(t *testing.T) { } p, s, sequence := ptest.CreateList(studentList) parquetFile := "./write_student.parquet" - Write(s, parquetFile, reflect.TypeOf(Student{}), sequence) + Write(s, parquetFile, sequence) t.Cleanup(func() { os.Remove(parquetFile) })