diff --git a/build.gradle.kts b/build.gradle.kts index 59161809f37c..e01237c658da 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -54,6 +54,7 @@ tasks.rat { // Proto/grpc generated wrappers "**/apache_beam/portability/api/**/*_pb2*.py", "**/go/pkg/beam/**/*.pb.go", + "**/go/test/**/*.pb.go", "**/mock-apis/**/*.pb.go", // Ignore go.sum files, which don't permit headers diff --git a/sdks/go/pkg/beam/io/rrio/rrio.go b/sdks/go/pkg/beam/io/rrio/rrio.go index d96787eed814..75c7b86f06c9 100644 --- a/sdks/go/pkg/beam/io/rrio/rrio.go +++ b/sdks/go/pkg/beam/io/rrio/rrio.go @@ -1,28 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package rrio supports reading from and writing to Web APIs. package rrio import ( "bytes" "context" "fmt" - "github.com/apache/beam/sdks/v2/go/pkg/beam" "reflect" + "time" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" ) func init() { beam.RegisterType(reflect.TypeOf((*configuration)(nil))) + beam.RegisterType(reflect.TypeOf((*noOpSetupTeardown)(nil))) beam.RegisterDoFn(reflect.TypeOf((*callerFn)(nil))) } +// Option applies optional features to Execute. type Option interface { apply(config *configuration) } +// WithSetupTeardown applies an optional SetupTeardown that Execute invokes +// during its DoFn's setup and teardown methods. func WithSetupTeardown(impl SetupTeardown) Option { return &withSetupTeardown{ impl: impl, } } +// Execute a Caller with requests. +// +// Returns output and failure PCollections depending on the outcome of invoking +// the Caller's call func. Additional opts apply additional Option features +// to the transform. func Execute(s beam.Scope, caller Caller, requests beam.PCollection, opts ...Option) (beam.PCollection, beam.PCollection) { config := &configuration{ caller: caller, @@ -52,24 +79,45 @@ func tryExecute(s beam.Scope, config *configuration, requests beam.PCollection) return output, failures, nil } +// Request is a request represented as a byte array payload. type Request struct { Payload []byte } +// Response is a response represented as a byte array payload. type Response struct { Payload []byte } +// ApiIOError encapsulates an error and its associated Request. type ApiIOError struct { - Message string `beam:"message"` + Request *Request `beam:"request"` + Message string `beam:"message"` + ObservedTime time.Time `beam:"observed_time"` } +// Caller invokes an API with a Request, yielding either a Response or error. +// +// User code is responsible to convert from a Request's Payload and to +// Response's Payload after successful invocation of an API call. It is +// not recommended to instantiate the API client for each Call invocation. +// Consider implementing the SetupTeardown interface and providing this via +// the WithSetupTeardown Option. Note that a Caller and SetupTeardown can +// represent the same struct type. type Caller interface { + + // Call an API with a Request. Call(ctx context.Context, request *Request) (*Response, error) } +// SetupTeardown performs the setup and teardown of an API client. Execute's +// DoFn invokes SetupTeardown during its own setup and teardown methods, if +// provided when calling Execute with the WithSetupTeardown Option. type SetupTeardown interface { + // Setup an API client. Setup(ctx context.Context) error + + // Teardown an API client. Teardown(ctx context.Context) error } @@ -95,7 +143,9 @@ func (fn *callerFn) ProcessElement(ctx context.Context, request *Request, output resp, err := fn.Config.caller.Call(ctx, request) if err != nil { failures(&ApiIOError{ - Message: err.Error(), + Request: request, + Message: err.Error(), + ObservedTime: time.Now(), }) return nil } @@ -105,12 +155,12 @@ func (fn *callerFn) ProcessElement(ctx context.Context, request *Request, output type noOpSetupTeardown struct{} -func (client *noOpSetupTeardown) Setup(ctx context.Context) error { +func (*noOpSetupTeardown) Setup(ctx context.Context) error { // No Op return nil } -func (client *noOpSetupTeardown) Teardown(ctx context.Context) error { +func (*noOpSetupTeardown) Teardown(ctx context.Context) error { // No Op return nil } diff --git a/sdks/go/test/integration/io/rrio/rrio_test.go b/sdks/go/test/integration/io/rrio/rrio_test.go index 3a49fb4428da..efeaa1e77d78 100644 --- a/sdks/go/test/integration/io/rrio/rrio_test.go +++ b/sdks/go/test/integration/io/rrio/rrio_test.go @@ -1,8 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package rrio import ( + "bytes" "context" + "encoding/json" + "flag" "fmt" + "net/http" + "os" + "path" + "reflect" + "strings" + "testing" + "time" + "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rrio" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" @@ -11,43 +37,95 @@ import ( "github.com/golang/protobuf/proto" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "reflect" - "testing" - "time" ) //go:generate cp -R ../../../../../../.test-infra/mock-apis/src/main/go/internal/proto/echo echo const ( + grpcEndpointFlag = "grpcEndpointAddress" + httpEndpointFlag = "httpEndpointAddress" + echoPath = "v1/echo" echoShouldNeverExceedQuotaId = "echo-should-never-exceed-quota" - timeout = time.Hour * 3 + echoShouldExceedQuotaId = "echo-should-exceed-quota" + timeout = time.Second * 3 + moreInfoUrl = "https://github.com/apache/beam/tree/master/.test-infra/mock-apis#integration" +) + +var ( + moreInfo = fmt.Sprintf("See %s for more information on how to get the relevant value for your test.", moreInfoUrl) + grpcAddress = flag.String(grpcEndpointFlag, "", "The endpoint to target gRPC calls to the Echo service. "+moreInfo) + httpAddress = flag.String(httpEndpointFlag, "", "The endpoint to target HTTP calls to the Echo service. "+moreInfo) ) func init() { - beam.RegisterType(reflect.TypeOf((*echoClient)(nil)).Elem()) + beam.RegisterType(reflect.TypeOf((*echoGrpcClient)(nil)).Elem()) beam.RegisterDoFn(encodeFn) + beam.RegisterDoFn(errMessageFn) } func TestExecute(t *testing.T) { - client := &echoClient{ - Address: "localhost:50051", + grpcClient := &echoGrpcClient{ + Address: *grpcAddress, + } + httpClient := &echoHttpClient{ + Host: *httpAddress, } payload := []byte("payload") + type args struct { + caller rrio.Caller + setupTeardown rrio.SetupTeardown + } for _, tt := range []struct { - quotaId string - numCalls int - wantFailures bool + args args + quotaId string + numCalls int + drainQuotaFirst bool + wantErrs []any }{ { + args: args{ + caller: grpcClient, + setupTeardown: grpcClient, + }, + quotaId: echoShouldNeverExceedQuotaId, + numCalls: 1, + }, + { + args: args{ + caller: httpClient, + }, quotaId: echoShouldNeverExceedQuotaId, numCalls: 1, }, + { + args: args{ + caller: grpcClient, + setupTeardown: grpcClient, + }, + quotaId: echoShouldExceedQuotaId, + drainQuotaFirst: true, + wantErrs: []any{ + "error: Echo(id:\"echo-should-exceed-quota\" payload:\"payload\") err rpc error: code = ResourceExhausted desc = error: resource exhausted for: echo-should-exceed-quota", + }, + numCalls: 1, + }, } { t.Run(tt.quotaId, func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() + + if tt.drainQuotaFirst { + drainQuota(ctx, t, grpcClient, tt.quotaId, 3) + } + var reqs []any var want []any + + var opts []rrio.Option + if tt.args.setupTeardown != nil { + opts = append(opts, rrio.WithSetupTeardown(tt.args.setupTeardown)) + } + for i := 0; i < tt.numCalls; i++ { b, err := proto.Marshal(&echo.EchoRequest{ Id: tt.quotaId, @@ -59,7 +137,7 @@ func TestExecute(t *testing.T) { reqs = append(reqs, &rrio.Request{ Payload: b, }) - if !tt.wantFailures { + if len(tt.wantErrs) == 0 { want = append(want, &echo.EchoResponse{ Id: tt.quotaId, Payload: payload, @@ -69,14 +147,12 @@ func TestExecute(t *testing.T) { p, s := beam.NewPipelineWithRoot() s = s.WithContext(ctx, "rrio.TestExecute") input := beam.Create(s, reqs...) - output, failures := rrio.Execute(s, client, input, rrio.WithSetupTeardown(client)) + output, failures := rrio.Execute(s, grpcClient, input, opts...) got := beam.ParDo(s, encodeFn, output) - wantFailureCount := 0 - if tt.wantFailures { - wantFailureCount = tt.numCalls - } - passert.Count(s, failures, "failures", wantFailureCount) + errs := beam.ParDo(s, errMessageFn, failures) + + passert.Equals(s, errs, tt.wantErrs...) passert.Equals(s, got, want...) ptest.RunAndValidate(t, p) @@ -84,16 +160,43 @@ func TestExecute(t *testing.T) { } } -var _ rrio.Caller = &echoClient{} -var _ rrio.SetupTeardown = &echoClient{} +// This method is needed in situations where we want to test quota errors. +// Due to the architecture constraints, we need a quota of at least 1. +// Therefore, it needs to be drained before testing expected exceeded quota +// errors. +func drainQuota(ctx context.Context, t *testing.T, client *echoGrpcClient, quotaId string, nCalls int) { + t.Helper() + if err := client.Setup(ctx); err != nil { + t.Fatalf("error: echoGrpcClient.Setup() %v", err) + } + req := &echo.EchoRequest{ + Id: quotaId, + Payload: []byte(""), + } + b, err := proto.Marshal(req) + if err != nil { + t.Fatalf("error: proto.Marshal(echo.EchoRequest) %v", err) + } + for i := 0; i < nCalls; i++ { + // we don't care about either the response or the error in this helper + // method. The responses and errors are tested later. + _, _ = client.Call(ctx, &rrio.Request{ + Payload: b, + }) + } +} -type echoClient struct { +// Verify interface implementation +var _ rrio.Caller = &echoGrpcClient{} +var _ rrio.SetupTeardown = &echoGrpcClient{} + +type echoGrpcClient struct { Address string conn *grpc.ClientConn internal echo.EchoServiceClient } -func (client *echoClient) Setup(ctx context.Context) error { +func (client *echoGrpcClient) Setup(ctx context.Context) error { conn, err := grpc.DialContext(ctx, client.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return fmt.Errorf("error dialing gRPC endpoint: %s, err %w", client.Address, err) @@ -104,11 +207,11 @@ func (client *echoClient) Setup(ctx context.Context) error { return nil } -func (client *echoClient) Teardown(_ context.Context) error { +func (client *echoGrpcClient) Teardown(_ context.Context) error { return client.conn.Close() } -func (client *echoClient) Call(ctx context.Context, request *rrio.Request) (*rrio.Response, error) { +func (client *echoGrpcClient) Call(ctx context.Context, request *rrio.Request) (*rrio.Response, error) { var req echo.EchoRequest if err := proto.Unmarshal(request.Payload, &req); err != nil { return nil, fmt.Errorf("proto.Unmarshall(rrio.Request.Payload) err %w", err) @@ -126,6 +229,40 @@ func (client *echoClient) Call(ctx context.Context, request *rrio.Request) (*rri }, nil } +// Verify interface implementation +var _ rrio.Caller = &echoHttpClient{} + +type echoHttpClient struct { + Host string +} + +func (client *echoHttpClient) Call(ctx context.Context, request *rrio.Request) (*rrio.Response, error) { + var req *echo.EchoRequest + var response *echo.EchoResponse + var body bytes.Buffer + if err := proto.Unmarshal(request.Payload, req); err != nil { + return nil, fmt.Errorf("proto.Unmarshall(rrio.Request.Payload) err %w", err) + } + if err := json.NewEncoder(&body).Encode(req); err != nil { + return nil, fmt.Errorf("json.Encode(echo.EchoRequest) err %w", err) + } + rawUrl := path.Join(client.Host, echoPath) + resp, err := http.DefaultClient.Post(rawUrl, "application/json", &body) + if err != nil { + return nil, fmt.Errorf("http Post(%s, %+v) err %w", rawUrl, req, err) + } + if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { + return nil, fmt.Errorf("json.Decode(echo.EchoResponse) err %w", err) + } + b, err := proto.Marshal(response) + if err != nil { + return nil, fmt.Errorf("proto.Marshal(echo.EchoResponse) err %w", err) + } + return &rrio.Response{ + Payload: b, + }, nil +} + func encodeFn(resp *rrio.Response) (*echo.EchoResponse, error) { var v echo.EchoResponse if err := proto.Unmarshal(resp.Payload, &v); err != nil { @@ -134,6 +271,28 @@ func encodeFn(resp *rrio.Response) (*echo.EchoResponse, error) { return &v, nil } +func errMessageFn(err *rrio.ApiIOError) string { + return err.Message +} + func TestMain(m *testing.M) { - ptest.Main(m) + beam.Init() + if !flag.Parsed() { + flag.Parse() + } + var missing []string + for _, f := range []string{ + grpcEndpointFlag, + httpEndpointFlag, + } { + if flag.Lookup(f).Value.String() == "" { + missing = append(missing, "--"+f) + } + } + + if len(missing) > 0 { + panic(fmt.Sprintf("missing required flags: %s", strings.Join(missing, " "))) + } + + os.Exit(m.Run()) }