Skip to content

Commit

Permalink
Add both http and grpc tests
Browse files Browse the repository at this point in the history
  • Loading branch information
damondouglas committed Nov 22, 2023
1 parent f05ae33 commit eba3533
Show file tree
Hide file tree
Showing 3 changed files with 239 additions and 29 deletions.
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ tasks.rat {
// Proto/grpc generated wrappers
"**/apache_beam/portability/api/**/*_pb2*.py",
"**/go/pkg/beam/**/*.pb.go",
"**/go/test/**/*.pb.go",
"**/mock-apis/**/*.pb.go",

// Ignore go.sum files, which don't permit headers
Expand Down
60 changes: 55 additions & 5 deletions sdks/go/pkg/beam/io/rrio/rrio.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,55 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package rrio supports reading from and writing to Web APIs.
package rrio

import (
"bytes"
"context"
"fmt"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"reflect"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
)

func init() {
beam.RegisterType(reflect.TypeOf((*configuration)(nil)))
beam.RegisterType(reflect.TypeOf((*noOpSetupTeardown)(nil)))
beam.RegisterDoFn(reflect.TypeOf((*callerFn)(nil)))
}

// Option applies optional features to Execute.
type Option interface {
apply(config *configuration)
}

// WithSetupTeardown applies an optional SetupTeardown that Execute invokes
// during its DoFn's setup and teardown methods.
func WithSetupTeardown(impl SetupTeardown) Option {
return &withSetupTeardown{
impl: impl,
}
}

// Execute a Caller with requests.
//
// Returns output and failure PCollections depending on the outcome of invoking
// the Caller's call func. Additional opts apply additional Option features
// to the transform.
func Execute(s beam.Scope, caller Caller, requests beam.PCollection, opts ...Option) (beam.PCollection, beam.PCollection) {
config := &configuration{
caller: caller,
Expand Down Expand Up @@ -52,24 +79,45 @@ func tryExecute(s beam.Scope, config *configuration, requests beam.PCollection)
return output, failures, nil
}

// Request is a request represented as a byte array payload.
type Request struct {
Payload []byte
}

// Response is a response represented as a byte array payload.
type Response struct {
Payload []byte
}

// ApiIOError encapsulates an error and its associated Request.
type ApiIOError struct {
Message string `beam:"message"`
Request *Request `beam:"request"`
Message string `beam:"message"`
ObservedTime time.Time `beam:"observed_time"`
}

// Caller invokes an API with a Request, yielding either a Response or error.
//
// User code is responsible to convert from a Request's Payload and to
// Response's Payload after successful invocation of an API call. It is
// not recommended to instantiate the API client for each Call invocation.
// Consider implementing the SetupTeardown interface and providing this via
// the WithSetupTeardown Option. Note that a Caller and SetupTeardown can
// represent the same struct type.
type Caller interface {

// Call an API with a Request.
Call(ctx context.Context, request *Request) (*Response, error)
}

// SetupTeardown performs the setup and teardown of an API client. Execute's
// DoFn invokes SetupTeardown during its own setup and teardown methods, if
// provided when calling Execute with the WithSetupTeardown Option.
type SetupTeardown interface {
// Setup an API client.
Setup(ctx context.Context) error

// Teardown an API client.
Teardown(ctx context.Context) error
}

Expand All @@ -95,7 +143,9 @@ func (fn *callerFn) ProcessElement(ctx context.Context, request *Request, output
resp, err := fn.Config.caller.Call(ctx, request)
if err != nil {
failures(&ApiIOError{
Message: err.Error(),
Request: request,
Message: err.Error(),
ObservedTime: time.Now(),
})
return nil
}
Expand All @@ -105,12 +155,12 @@ func (fn *callerFn) ProcessElement(ctx context.Context, request *Request, output

type noOpSetupTeardown struct{}

func (client *noOpSetupTeardown) Setup(ctx context.Context) error {
func (*noOpSetupTeardown) Setup(ctx context.Context) error {
// No Op
return nil
}

func (client *noOpSetupTeardown) Teardown(ctx context.Context) error {
func (*noOpSetupTeardown) Teardown(ctx context.Context) error {
// No Op
return nil
}
Expand Down
Loading

0 comments on commit eba3533

Please sign in to comment.