Skip to content

Commit

Permalink
[Go SDK]: Implement natsio.Read transform for reading from NATS (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
johannaojeling authored Dec 12, 2023
1 parent d59e192 commit 90e79ae
Show file tree
Hide file tree
Showing 14 changed files with 1,020 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
* Adding support for LowCardinality DataType in ClickHouse (Java) ([#29533](https://github.com/apache/beam/pull/29533)).
* Added support for handling bad records to KafkaIO (Java) ([#29546](https://github.com/apache/beam/pull/29546))
* Add support for generating text embeddings in MLTransform for Vertex AI and Hugging Face Hub models.([#29564](https://github.com/apache/beam/pull/29564))
* NATS IO connector added (Go) ([#29000](https://github.com/apache/beam/issues/29000)).

## New Features / Improvements

Expand Down
4 changes: 4 additions & 0 deletions sdks/go/pkg/beam/io/natsio/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type natsFn struct {
}

func (fn *natsFn) Setup() error {
if fn.nc != nil && fn.js != nil {
return nil
}

var opts []nats.Option
if fn.CredsFile != "" {
opts = append(opts, nats.UserCredentials(fn.CredsFile))
Expand Down
77 changes: 77 additions & 0 deletions sdks/go/pkg/beam/io/natsio/end_estimator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package natsio

import (
"context"
"errors"
"fmt"

"github.com/nats-io/nats.go/jetstream"
)

type endEstimator struct {
js jetstream.JetStream
stream string
subject string
}

func newEndEstimator(js jetstream.JetStream, stream string, subject string) *endEstimator {
return &endEstimator{
js: js,
stream: stream,
subject: subject,
}
}

func (e *endEstimator) Estimate() int64 {
ctx := context.Background()
end, err := e.getEndSeqNo(ctx)
if err != nil {
panic(err)
}
return end
}

func (e *endEstimator) getEndSeqNo(ctx context.Context) (int64, error) {
str, err := e.js.Stream(ctx, e.stream)
if err != nil {
return -1, fmt.Errorf("error getting stream: %v", err)
}

msg, err := str.GetLastMsgForSubject(ctx, e.subject)
if err != nil {
if isMessageNotFound(err) {
return 1, nil
}

return -1, fmt.Errorf("error getting last message: %v", err)
}

return int64(msg.Sequence) + 1, nil
}

func isMessageNotFound(err error) bool {
var jsErr jetstream.JetStreamError
if errors.As(err, &jsErr) {
apiErr := jsErr.APIError()
if apiErr.ErrorCode == jetstream.JSErrCodeMessageNotFound {
return true
}
}

return false
}
78 changes: 78 additions & 0 deletions sdks/go/pkg/beam/io/natsio/end_estimator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package natsio

import (
"context"
"fmt"
"testing"

"github.com/nats-io/nats.go"
)

func Test_endEstimator_Estimate(t *testing.T) {
tests := []struct {
name string
msgs []*nats.Msg
subject string
want int64
}{
{
name: "Estimate end for published messages",
msgs: []*nats.Msg{
{
Subject: "subject.1",
Data: []byte("msg1"),
},
{
Subject: "subject.1",
Data: []byte("msg2"),
},
{
Subject: "subject.2",
Data: []byte("msg3"),
},
},
subject: "subject.1",
want: 3,
},
{
name: "Estimate end for no published messages",
subject: "subject.1",
want: 1,
},
}
for i, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
srv := newServer(t)
url := srv.ClientURL()
conn := newConn(t, url)
js := newJetStream(t, conn)

stream := fmt.Sprintf("STREAM-%d", i)
subjectFilter := "subject.*"

createStream(ctx, t, js, stream, []string{subjectFilter})
publishMessages(ctx, t, js, tt.msgs)

estimator := newEndEstimator(js, stream, tt.subject)
if got := estimator.Estimate(); got != tt.want {
t.Fatalf("Estimate() = %v, want %v", got, tt.want)
}
})
}
}
18 changes: 18 additions & 0 deletions sdks/go/pkg/beam/io/natsio/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,27 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/natsio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
"github.com/nats-io/nats.go"
)

func ExampleRead() {
beam.Init()

p, s := beam.NewPipelineWithRoot()

uri := "nats://localhost:4222"
stream := "EVENTS"
subject := "events.*"

col := natsio.Read(s, uri, stream, subject)
debug.Print(s, col)

if err := beamx.Run(context.Background(), p); err != nil {
log.Fatalf("Failed to execute job: %v", err)
}
}

func ExampleWrite() {
beam.Init()

Expand Down
48 changes: 46 additions & 2 deletions sdks/go/pkg/beam/io/natsio/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package natsio
import (
"context"
"testing"
"time"

"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats-server/v2/test"
Expand Down Expand Up @@ -62,8 +63,8 @@ func newJetStream(t *testing.T, conn *nats.Conn) jetstream.JetStream {
}

func createStream(
t *testing.T,
ctx context.Context,
t *testing.T,
js jetstream.JetStream,
stream string,
subjects []string,
Expand All @@ -89,8 +90,8 @@ func createStream(
}

func createConsumer(
t *testing.T,
ctx context.Context,
t *testing.T,
js jetstream.JetStream,
stream string,
subjects []string,
Expand Down Expand Up @@ -128,3 +129,46 @@ func fetchMessages(t *testing.T, cons jetstream.Consumer, size int) []jetstream.

return result
}

func publishMessages(ctx context.Context, t *testing.T, js jetstream.JetStream, msgs []*nats.Msg) {
t.Helper()

for _, msg := range msgs {
if _, err := js.PublishMsg(ctx, msg); err != nil {
t.Fatalf("Failed to publish message: %v", err)
}
}
}

func messagesWithPublishingTime(
t *testing.T,
pubMsgs []jetstream.Msg,
pubIndices []int,
want []any,
) []any {
t.Helper()

wantWTime := make([]any, len(want))

for i := range want {
pubIdx := pubIndices[i]
pubMsg := pubMsgs[pubIdx]

wantMsg := want[i].(ConsumerMessage)
wantMsg.PublishingTime = messageTimestamp(t, pubMsg)
wantWTime[i] = wantMsg
}

return wantWTime
}

func messageTimestamp(t *testing.T, msg jetstream.Msg) time.Time {
t.Helper()

metadata, err := msg.Metadata()
if err != nil {
t.Fatalf("Failed to retrieve metadata: %v", err)
}

return metadata.Timestamp
}
Loading

0 comments on commit 90e79ae

Please sign in to comment.