diff --git a/website/www/site/content/en/documentation/pipelines/test-your-pipeline.md b/website/www/site/content/en/documentation/pipelines/test-your-pipeline.md index 744b12aa1625..05767fea4846 100644 --- a/website/www/site/content/en/documentation/pipelines/test-your-pipeline.md +++ b/website/www/site/content/en/documentation/pipelines/test-your-pipeline.md @@ -65,6 +65,27 @@ with TestPipeline as p: ... {{< /highlight >}} +{{< highlight go >}} +import "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" + +// Override TestMain with ptest.Main, +// once per package. +func TestMain(m *testing.M) { + ptest.Main(m) +} + +func TestPipeline(t *testing.T) { + ... + // The Go SDK doesn't use a TestPipeline concept, + // and recommends using the ptest harness + // to wrap pipeline construction. + pr := ptest.BuildAndRun(t, func(s beam.Scope) { + ... + }) + ... +} +{{< /highlight >}} + > **Note:** Read about testing unbounded pipelines in Beam in [this blog post](/blog/2016/10/20/test-stream.html). ### Using the Create Transform @@ -100,6 +121,16 @@ assert_that( equal_to(["elem1", "elem3", "elem2"])) {{< /highlight >}} +{{< highlight go >}} +import "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" + +output := ... // beam.PCollection + +// Check whether a PCollection contains some elements in any order. +passert.EqualsList(s, output, ["elem1", "elem3", "elem2"]) +{{< /highlight >}} + + {{< paragraph class="language-java" >}} Any Java code that uses `PAssert` must link in `JUnit` and `Hamcrest`. If you're using Maven, you can link in `Hamcrest` by adding the following dependency to your project's `pom.xml` file: {{< /paragraph >}} @@ -194,6 +225,48 @@ class CountTest(unittest.TestCase): # The pipeline will run and verify the results. {{< /highlight >}} +{{< highlight go >}} +import ( + "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" + "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" +) + +// formatFn takes a key value pair and puts them +// into a single string for comparison. +func formatFn(w string, c int) string { + return fmt.Sprintf("%s: %d", w, c) +} + +// Register the functional DoFn to ensure execution on workers. +func init() { + register.Function2x1(formatFn) +} + +func TestCountWords(t *testing.T) { + // The pipeline will run and verify the results. + ptest.BuildAndRun(t, func(s beam.Scope) { + words := []string{"hi", "there", "hi", "hi", "sue", "bob", + "hi", "sue", "", "", "ZOW", "bob", ""} + + wantCounts := []string{"hi: 5", "there: 1", "sue: 2", "bob: 2"} + + // Create a PCollection from the words static input data. + input := beam.CreateList(s, words) + + // Apply the Count transform under test. + output := stats.Count(s, col) + formatted := beam.ParDo(s, formatFn, output) + + // Assert that the output PCollection matches the wantCounts data. + passert.Equals(s, formatted, wantCounts...) + }) +} +{{< /highlight >}} ## Testing a Pipeline End-to-End You can use the test classes in the Beam SDKs (such as `TestPipeline` and `PAssert` in the Beam SDK for Java) to test an entire pipeline end-to-end. Typically, to test an entire pipeline, you do the following: @@ -283,3 +356,45 @@ class WordCountTest(unittest.TestCase): # The pipeline will run and verify the results. {{< /highlight >}} + +{{< highlight go >}} +package wordcount + +import ( + "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" +) + +// CountWords and formatFn are omitted for conciseness. +// Code for the Full transforms can be found here: +// https://github.com/apache/beam/blob/master/sdks/go/examples/debugging_wordcount/debugging_wordcount.go + +func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { ... } + +func formatFn(w string, c int) string { ... } + +func TestCountWords(t *testing.T) { + // The pipeline will run and verify the results. + ptest.BuildAndRun(t, func(s beam.Scope) { + words := []string{"hi", "there", "hi", "hi", "sue", "bob", + "hi", "sue", "", "", "ZOW", "bob", ""} + + wantCounts := []string{"hi: 5", "there: 1", "sue: 2", "bob: 2"} + + // Create a PCollection from the words static input data. + input := beam.CreateList(s, words) + + // Run ALL the pipeline's transforms + // (in this case, the CountWords composite transform). + output := CountWords(s, input) + formatted := beam.ParDo(s, formatFn, output) + + // Assert that the output PCollection matches + // the wantCounts data. + passert.Equals(s, formatted, wantCounts...) + }) +} +{{< /highlight >}}