Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#28981] remove anon functions from minimal wordcount #29179

Merged
merged 1 commit into from
Oct 28, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 25 additions & 9 deletions sdks/go/examples/minimal_wordcount/minimal_wordcount.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
//
// Concepts:
//
// 0. Registering transforms with Beam.
// 1. Reading data from text files
// 2. Specifying 'inline' transforms
// 3. Counting items in a PCollection
Expand Down Expand Up @@ -62,6 +63,7 @@ import (

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"

Expand All @@ -71,6 +73,26 @@ import (

var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)

func splitWords(line string, emit func(string)) {
for _, word := range wordRE.FindAllString(line, -1) {
emit(word)
}
}

func formatCounts(w string, c int) string {
return fmt.Sprintf("%s: %v", w, c)
}

// Concept #0: Transform functions executed by Beam need to be registered
// so they can be executed by portable runners. We use the register package
// in an init block to inform Beam of the functions we will be using, so
// it can access them on workers.
func init() {
register.Function2x0(splitWords)
register.Function2x1(formatCounts)
register.Emitter1[string]()
}

func main() {
// beam.Init() is an initialization hook that must be called on startup.
beam.Init()
Expand All @@ -91,15 +113,11 @@ func main() {
lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt")

// Concept #2: Invoke a ParDo transform on our PCollection of text lines.
// This ParDo invokes a DoFn (defined in-line) on each element that
// This ParDo invokes a DoFn (registered earlier) on each element that
// tokenizes the text line into individual words. The ParDo returns a
// PCollection of type string, where each element is an individual word in
// Shakespeare's collected texts.
words := beam.ParDo(s, func(line string, emit func(string)) {
for _, word := range wordRE.FindAllString(line, -1) {
emit(word)
}
}, lines)
words := beam.ParDo(s, splitWords, lines)

// Concept #3: Invoke the stats.Count transform on our PCollection of
// individual words. The Count transform returns a new PCollection of
Expand All @@ -110,9 +128,7 @@ func main() {
// Use a ParDo to format our PCollection of word counts into a printable
// string, suitable for writing to an output file. When each element
// produces exactly one element, the DoFn can simply return it.
formatted := beam.ParDo(s, func(w string, c int) string {
return fmt.Sprintf("%s: %v", w, c)
}, counted)
formatted := beam.ParDo(s, formatCounts, counted)

// Concept #4: Invoke textio.Write at the end of the pipeline to write
// the contents of a PCollection (in this case, our PCollection of
Expand Down