Skip to content

Commit

Permalink
Fix all tour examples from part 1 (apache#30478)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dal-Papa authored Mar 25, 2024
1 parent ea7a27d commit c8ad120
Show file tree
Hide file tree
Showing 13 changed files with 254 additions and 221 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ package main

import (
"context"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
Expand All @@ -37,6 +38,7 @@ import (

func main() {
ctx := context.Background()
beam.Init()

p, s := beam.NewPipelineWithRoot()

Expand All @@ -51,7 +53,7 @@ func main() {
err := beamx.Run(ctx, p)

if err != nil {
log.Exitf(context.Background(), "Failed to execute job: %v", err)
log.Exitf(ctx, "Failed to execute job: %v", err)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,34 +27,37 @@
package main

import (
"context"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
"context"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
)

func main() {
ctx := context.Background()
ctx := context.Background()
beam.Init()

p, s := beam.NewPipelineWithRoot()
p, s := beam.NewPipelineWithRoot()

// List of elements
input := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// List of elements
input := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// The applyTransform() converts [input] to [output]
output := applyTransform(s,input)
// The applyTransform() converts [input] to [output]
output := applyTransform(s, input)

debug.Printf(s, "PCollection maximum value: %v", output)
debug.Printf(s, "PCollection maximum value: %v", output)

err := beamx.Run(ctx, p)
err := beamx.Run(ctx, p)

if err != nil {
log.Exitf(context.Background(), "Failed to execute job: %v", err)
}
if err != nil {
log.Exitf(ctx, "Failed to execute job: %v", err)
}
}

// Return the maximum number from `PCollection`.
func applyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
return stats.Max(s, input)
}
return stats.Max(s, input)
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,35 +27,37 @@
package main

import (
"context"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
"context"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
)

func main() {
ctx := context.Background()
ctx := context.Background()
beam.Init()

p, s := beam.NewPipelineWithRoot()
p, s := beam.NewPipelineWithRoot()

// List of elements
input := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// List of elements
input := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// The applyTransform() converts [input] to [output]
output := applyTransform(s, input)
// The applyTransform() converts [input] to [output]
output := applyTransform(s, input)

debug.Printf(s, "PCollection mean value: %v", output)
debug.Printf(s, "PCollection mean value: %v", output)

err := beamx.Run(ctx, p)
err := beamx.Run(ctx, p)

if err != nil {
log.Exitf(context.Background(), "Failed to execute job: %v", err)
}
if err != nil {
log.Exitf(ctx, "Failed to execute job: %v", err)
}
}

// Return the mean of numbers from `PCollection`.
func applyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
return stats.Mean(s, input)
}
return stats.Mean(s, input)
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,35 +27,37 @@
package main

import (
"context"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
"context"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
)

func main() {
ctx := context.Background()
ctx := context.Background()
beam.Init()

p, s := beam.NewPipelineWithRoot()
p, s := beam.NewPipelineWithRoot()

// List of elements
input := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// List of elements
input := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// The applyTransform() converts [input] to [output]
output := applyTransform(s, input)
// The applyTransform() converts [input] to [output]
output := applyTransform(s, input)

debug.Printf(s, "PCollection minimum value: %v", output)
debug.Printf(s, "PCollection minimum value: %v", output)

err := beamx.Run(ctx, p)
err := beamx.Run(ctx, p)

if err != nil {
log.Exitf(context.Background(), "Failed to execute job: %v", err)
}
if err != nil {
log.Exitf(ctx, "Failed to execute job: %v", err)
}
}

// Return the minimum of numbers from `PCollection`.
func applyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
return stats.Min(s, input)
}
return stats.Min(s, input)
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ package main

import (
"context"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
Expand All @@ -37,6 +38,7 @@ import (

func main() {
ctx := context.Background()
beam.Init()

p, s := beam.NewPipelineWithRoot()

Expand All @@ -51,7 +53,7 @@ func main() {
err := beamx.Run(ctx, p)

if err != nil {
log.Exitf(context.Background(), "Failed to execute job: %v", err)
log.Exitf(ctx, "Failed to execute job: %v", err)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,39 +27,39 @@
package main

import (
"context"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter"
"context"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
)

func main() {
ctx := context.Background()
ctx := context.Background()
beam.Init()

p, s := beam.NewPipelineWithRoot()
p, s := beam.NewPipelineWithRoot()

// List of elements
input := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// List of elements
input := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// The [input] filtered with the applyTransform()
output := applyTransform(s, input)
// The [input] filtered with the applyTransform()
output := applyTransform(s, input)

debug.Printf(s, "PCollection filtered value: %v", output)
debug.Printf(s, "PCollection filtered value: %v", output)

err := beamx.Run(ctx, p)
err := beamx.Run(ctx, p)

if err != nil {
log.Exitf(context.Background(), "Failed to execute job: %v", err)
}
if err != nil {
log.Exitf(ctx, "Failed to execute job: %v", err)
}
}

// The method filters the collection so that the numbers are even
func applyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
return filter.Exclude(s, input, func(element int) bool {
return element % 2 == 1
})
return filter.Exclude(s, input, func(element int) bool {
return element%2 == 1
})
}


Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,21 @@ package main

import (
"context"
"strconv"
"strings"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
"strconv"
"strings"
)

func main() {
ctx := context.Background()
beam.Init()

p, s := beam.NewPipelineWithRoot()

Expand Down Expand Up @@ -71,7 +73,7 @@ func main() {
err := beamx.Run(ctx, p)

if err != nil {
log.Exitf(context.Background(), "Failed to execute job: %v", err)
log.Exitf(ctx, "Failed to execute job: %v", err)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,21 @@ package main

import (
"context"
"strconv"
"strings"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
"strconv"
"strings"
)

func main() {
ctx := context.Background()
beam.Init()

p, s := beam.NewPipelineWithRoot()

Expand Down Expand Up @@ -73,7 +75,7 @@ func main() {
err := beamx.Run(ctx, p)

if err != nil {
log.Exitf(context.Background(), "Failed to execute job: %v", err)
log.Exitf(ctx, "Failed to execute job: %v", err)
}
}

Expand Down
Loading

0 comments on commit c8ad120

Please sign in to comment.