[website][Go SDK] Add Go testing advice to test your pipeline. (#32549)
* Add Go testing advice to test your pipeline.

* Remove package name in comp test example.

* ws lint

* additional ws

* fix line readings

---------

Co-authored-by: lostluck <[email protected]>
lostluck and lostluck authored Sep 26, 2024
1 parent 8e8c914 commit 9053dbf
Showing 1 changed file with 115 additions and 0 deletions.
@@ -65,6 +65,27 @@ with TestPipeline as p:
...
{{< /highlight >}}

{{< highlight go >}}
import (
	"testing"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)

// Override TestMain with ptest.Main,
// once per package.
func TestMain(m *testing.M) {
	ptest.Main(m)
}

func TestPipeline(t *testing.T) {
	...
	// The Go SDK doesn't use a TestPipeline concept,
	// and recommends using the ptest harness
	// to wrap pipeline construction.
	pr := ptest.BuildAndRun(t, func(s beam.Scope) {
		...
	})
	...
}
{{< /highlight >}}

> **Note:** Read about testing unbounded pipelines in Beam in [this blog post](/blog/2016/10/20/test-stream.html).
### Using the Create Transform
@@ -100,6 +121,16 @@ assert_that(
equal_to(["elem1", "elem3", "elem2"]))
{{< /highlight >}}

{{< highlight go >}}
import "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"

output := ... // beam.PCollection

// Check whether a PCollection contains some elements in any order.
passert.EqualsList(s, output, []string{"elem1", "elem3", "elem2"})
{{< /highlight >}}
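
To make "in any order" concrete, here is a minimal plain-Go sketch of an order-insensitive comparison. It does not use Beam or show how `passert` is implemented; `sameElements` is a hypothetical helper introduced only for illustration, and whether `passert.EqualsList` treats duplicate elements exactly this way is an assumption.

```go
package main

import "fmt"

// sameElements is a hypothetical helper (not part of the Beam SDK).
// It reports whether got and want contain the same elements with the
// same multiplicities, ignoring order.
func sameElements(got, want []string) bool {
	if len(got) != len(want) {
		return false
	}
	// Tally how many times each element is expected.
	counts := make(map[string]int, len(want))
	for _, w := range want {
		counts[w]++
	}
	// Consume one expected occurrence per observed element.
	for _, g := range got {
		counts[g]--
		if counts[g] < 0 {
			return false
		}
	}
	return true
}

func main() {
	want := []string{"elem1", "elem3", "elem2"}
	fmt.Println(sameElements([]string{"elem2", "elem1", "elem3"}, want)) // true
	fmt.Println(sameElements([]string{"elem1", "elem1", "elem2"}, want)) // false
}
```

The length check plus the per-element tally means a reordered slice passes, while a missing, extra, or duplicated element fails.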


{{< paragraph class="language-java" >}}
Any Java code that uses `PAssert` must link in `JUnit` and `Hamcrest`. If you're using Maven, you can link in `Hamcrest` by adding the following dependency to your project's `pom.xml` file:
{{< /paragraph >}}
@@ -194,6 +225,48 @@ class CountTest(unittest.TestCase):
# The pipeline will run and verify the results.
{{< /highlight >}}

{{< highlight go >}}
import (
	"fmt"
	"testing"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
)

// formatFn takes a key-value pair and puts them
// into a single string for comparison.
func formatFn(w string, c int) string {
	return fmt.Sprintf("%s: %d", w, c)
}

// Register the functional DoFn to ensure execution on workers.
func init() {
	register.Function2x1(formatFn)
}

func TestCountWords(t *testing.T) {
	// The pipeline will run and verify the results.
	ptest.BuildAndRun(t, func(s beam.Scope) {
		words := []string{"hi", "there", "hi", "hi", "sue", "bob",
			"hi", "sue", "", "", "ZOW", "bob", ""}

		// stats.Count counts every distinct element,
		// including the empty string and "ZOW".
		wantCounts := []string{"hi: 4", "there: 1", "sue: 2", "bob: 2", ": 3", "ZOW: 1"}

		// Create a PCollection from the words static input data.
		input := beam.CreateList(s, words)

		// Apply the Count transform under test.
		output := stats.Count(s, input)
		formatted := beam.ParDo(s, formatFn, output)

		// Assert that the output PCollection matches the wantCounts data.
		// EqualsList takes the slice directly; a []string cannot be
		// expanded into the variadic ...any parameter of passert.Equals.
		passert.EqualsList(s, formatted, wantCounts)
	})
}
{{< /highlight >}}
## Testing a Pipeline End-to-End

You can use the test classes in the Beam SDKs (such as `TestPipeline` and `PAssert` in the Beam SDK for Java) to test an entire pipeline end-to-end. Typically, to test an entire pipeline, you do the following:
@@ -283,3 +356,45 @@ class WordCountTest(unittest.TestCase):

# The pipeline will run and verify the results.
{{< /highlight >}}

{{< highlight go >}}
package wordcount

import (
	"testing"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)

// CountWords and formatFn are omitted for conciseness.
// Code for the full transforms can be found here:
// https://github.com/apache/beam/blob/master/sdks/go/examples/debugging_wordcount/debugging_wordcount.go

func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { ... }

func formatFn(w string, c int) string { ... }

func TestCountWords(t *testing.T) {
	// The pipeline will run and verify the results.
	ptest.BuildAndRun(t, func(s beam.Scope) {
		words := []string{"hi", "there", "hi", "hi", "sue", "bob",
			"hi", "sue", "", "", "ZOW", "bob", ""}

		// Assumes CountWords extracts alphabetic words from each line,
		// as in the linked example, so empty strings produce no output.
		wantCounts := []string{"hi: 4", "there: 1", "sue: 2", "bob: 2", "ZOW: 1"}

		// Create a PCollection from the words static input data.
		input := beam.CreateList(s, words)

		// Run ALL the pipeline's transforms
		// (in this case, the CountWords composite transform).
		output := CountWords(s, input)
		formatted := beam.ParDo(s, formatFn, output)

		// Assert that the output PCollection matches
		// the wantCounts data. EqualsList takes the slice directly.
		passert.EqualsList(s, formatted, wantCounts)
	})
}
{{< /highlight >}}