From 90e79ae373ab38cf4e48e9854c28aaffb0938458 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johanna=20=C3=96jeling?= <51084516+johannaojeling@users.noreply.github.com> Date: Tue, 12 Dec 2023 22:19:06 +0100 Subject: [PATCH] [Go SDK]: Implement natsio.Read transform for reading from NATS (#29410) --- CHANGES.md | 1 + sdks/go/pkg/beam/io/natsio/common.go | 4 + sdks/go/pkg/beam/io/natsio/end_estimator.go | 77 +++++ .../pkg/beam/io/natsio/end_estimator_test.go | 78 +++++ sdks/go/pkg/beam/io/natsio/example_test.go | 18 ++ sdks/go/pkg/beam/io/natsio/helper_test.go | 48 ++- sdks/go/pkg/beam/io/natsio/read.go | 289 ++++++++++++++++++ sdks/go/pkg/beam/io/natsio/read_option.go | 98 ++++++ sdks/go/pkg/beam/io/natsio/read_test.go | 212 +++++++++++++ sdks/go/pkg/beam/io/natsio/time_policy.go | 50 +++ .../go/pkg/beam/io/natsio/time_policy_test.go | 45 +++ .../pkg/beam/io/natsio/watermark_estimator.go | 33 ++ .../io/natsio/watermark_estimator_test.go | 67 ++++ sdks/go/pkg/beam/io/natsio/write_test.go | 4 +- 14 files changed, 1020 insertions(+), 4 deletions(-) create mode 100644 sdks/go/pkg/beam/io/natsio/end_estimator.go create mode 100644 sdks/go/pkg/beam/io/natsio/end_estimator_test.go create mode 100644 sdks/go/pkg/beam/io/natsio/read.go create mode 100644 sdks/go/pkg/beam/io/natsio/read_option.go create mode 100644 sdks/go/pkg/beam/io/natsio/read_test.go create mode 100644 sdks/go/pkg/beam/io/natsio/time_policy.go create mode 100644 sdks/go/pkg/beam/io/natsio/time_policy_test.go create mode 100644 sdks/go/pkg/beam/io/natsio/watermark_estimator.go create mode 100644 sdks/go/pkg/beam/io/natsio/watermark_estimator_test.go diff --git a/CHANGES.md b/CHANGES.md index 60b5a820cf3b..70ce1a70b7ee 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -68,6 +68,7 @@ * Adding support for LowCardinality DataType in ClickHouse (Java) ([#29533](https://github.com/apache/beam/pull/29533)). 
* Added support for handling bad records to KafkaIO (Java) ([#29546](https://github.com/apache/beam/pull/29546)) * Add support for generating text embeddings in MLTransform for Vertex AI and Hugging Face Hub models.([#29564](https://github.com/apache/beam/pull/29564)) +* NATS IO connector added (Go) ([#29000](https://github.com/apache/beam/issues/29000)). ## New Features / Improvements diff --git a/sdks/go/pkg/beam/io/natsio/common.go b/sdks/go/pkg/beam/io/natsio/common.go index 53f595516987..72640894c76e 100644 --- a/sdks/go/pkg/beam/io/natsio/common.go +++ b/sdks/go/pkg/beam/io/natsio/common.go @@ -31,6 +31,10 @@ type natsFn struct { } func (fn *natsFn) Setup() error { + if fn.nc != nil && fn.js != nil { + return nil + } + var opts []nats.Option if fn.CredsFile != "" { opts = append(opts, nats.UserCredentials(fn.CredsFile)) diff --git a/sdks/go/pkg/beam/io/natsio/end_estimator.go b/sdks/go/pkg/beam/io/natsio/end_estimator.go new file mode 100644 index 000000000000..6b2f18e10ce3 --- /dev/null +++ b/sdks/go/pkg/beam/io/natsio/end_estimator.go @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package natsio
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	"github.com/nats-io/nats.go/jetstream"
+)
+
+type endEstimator struct {
+	js      jetstream.JetStream // client used to look up the stream
+	stream  string              // name of the stream to inspect
+	subject string              // subject whose last message bounds the range
+}
+
+func newEndEstimator(js jetstream.JetStream, stream string, subject string) *endEstimator {
+	return &endEstimator{
+		js:      js,
+		stream:  stream,
+		subject: subject,
+	}
+}
+
+func (e *endEstimator) Estimate() int64 {
+	ctx := context.Background()
+	end, err := e.getEndSeqNo(ctx)
+	if err != nil {
+		panic(err) // this signature has no error return to report the failure through
+	}
+	return end
+}
+
+func (e *endEstimator) getEndSeqNo(ctx context.Context) (int64, error) {
+	str, err := e.js.Stream(ctx, e.stream)
+	if err != nil {
+		return -1, fmt.Errorf("error getting stream: %w", err)
+	}
+
+	msg, err := str.GetLastMsgForSubject(ctx, e.subject)
+	if err != nil {
+		if isMessageNotFound(err) {
+			return 1, nil // nothing stored for the subject yet: report an end of 1
+		}
+
+		return -1, fmt.Errorf("error getting last message: %w", err)
+	}
+
+	return int64(msg.Sequence) + 1, nil // one past the last message's sequence number
+}
+
+func isMessageNotFound(err error) bool {
+	var jsErr jetstream.JetStreamError
+	if errors.As(err, &jsErr) {
+		apiErr := jsErr.APIError() // NOTE(review): presumably nil for errors without an API payload
+		if apiErr != nil && apiErr.ErrorCode == jetstream.JSErrCodeMessageNotFound {
+			return true
+		}
+	}
+
+	return false
+}
diff --git a/sdks/go/pkg/beam/io/natsio/end_estimator_test.go b/sdks/go/pkg/beam/io/natsio/end_estimator_test.go
new file mode 100644
index 000000000000..855547a0e297
--- /dev/null
+++ b/sdks/go/pkg/beam/io/natsio/end_estimator_test.go
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package natsio + +import ( + "context" + "fmt" + "testing" + + "github.com/nats-io/nats.go" +) + +func Test_endEstimator_Estimate(t *testing.T) { + tests := []struct { + name string + msgs []*nats.Msg + subject string + want int64 + }{ + { + name: "Estimate end for published messages", + msgs: []*nats.Msg{ + { + Subject: "subject.1", + Data: []byte("msg1"), + }, + { + Subject: "subject.1", + Data: []byte("msg2"), + }, + { + Subject: "subject.2", + Data: []byte("msg3"), + }, + }, + subject: "subject.1", + want: 3, + }, + { + name: "Estimate end for no published messages", + subject: "subject.1", + want: 1, + }, + } + for i, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + srv := newServer(t) + url := srv.ClientURL() + conn := newConn(t, url) + js := newJetStream(t, conn) + + stream := fmt.Sprintf("STREAM-%d", i) + subjectFilter := "subject.*" + + createStream(ctx, t, js, stream, []string{subjectFilter}) + publishMessages(ctx, t, js, tt.msgs) + + estimator := newEndEstimator(js, stream, tt.subject) + if got := estimator.Estimate(); got != tt.want { + t.Fatalf("Estimate() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/sdks/go/pkg/beam/io/natsio/example_test.go b/sdks/go/pkg/beam/io/natsio/example_test.go index 0516b8efa921..984261a3686b 100644 --- a/sdks/go/pkg/beam/io/natsio/example_test.go +++ b/sdks/go/pkg/beam/io/natsio/example_test.go @@ -22,9 +22,27 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/natsio" 
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" "github.com/nats-io/nats.go" ) +func ExampleRead() { + beam.Init() + + p, s := beam.NewPipelineWithRoot() + + uri := "nats://localhost:4222" + stream := "EVENTS" + subject := "events.*" + + col := natsio.Read(s, uri, stream, subject) + debug.Print(s, col) + + if err := beamx.Run(context.Background(), p); err != nil { + log.Fatalf("Failed to execute job: %v", err) + } +} + func ExampleWrite() { beam.Init() diff --git a/sdks/go/pkg/beam/io/natsio/helper_test.go b/sdks/go/pkg/beam/io/natsio/helper_test.go index cd47ed331de0..ac7eedac1d44 100644 --- a/sdks/go/pkg/beam/io/natsio/helper_test.go +++ b/sdks/go/pkg/beam/io/natsio/helper_test.go @@ -18,6 +18,7 @@ package natsio import ( "context" "testing" + "time" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats-server/v2/test" @@ -62,8 +63,8 @@ func newJetStream(t *testing.T, conn *nats.Conn) jetstream.JetStream { } func createStream( - t *testing.T, ctx context.Context, + t *testing.T, js jetstream.JetStream, stream string, subjects []string, @@ -89,8 +90,8 @@ func createStream( } func createConsumer( - t *testing.T, ctx context.Context, + t *testing.T, js jetstream.JetStream, stream string, subjects []string, @@ -128,3 +129,46 @@ func fetchMessages(t *testing.T, cons jetstream.Consumer, size int) []jetstream. 
return result } + +func publishMessages(ctx context.Context, t *testing.T, js jetstream.JetStream, msgs []*nats.Msg) { + t.Helper() + + for _, msg := range msgs { + if _, err := js.PublishMsg(ctx, msg); err != nil { + t.Fatalf("Failed to publish message: %v", err) + } + } +} + +func messagesWithPublishingTime( + t *testing.T, + pubMsgs []jetstream.Msg, + pubIndices []int, + want []any, +) []any { + t.Helper() + + wantWTime := make([]any, len(want)) + + for i := range want { + pubIdx := pubIndices[i] + pubMsg := pubMsgs[pubIdx] + + wantMsg := want[i].(ConsumerMessage) + wantMsg.PublishingTime = messageTimestamp(t, pubMsg) + wantWTime[i] = wantMsg + } + + return wantWTime +} + +func messageTimestamp(t *testing.T, msg jetstream.Msg) time.Time { + t.Helper() + + metadata, err := msg.Metadata() + if err != nil { + t.Fatalf("Failed to retrieve metadata: %v", err) + } + + return metadata.Timestamp +} diff --git a/sdks/go/pkg/beam/io/natsio/read.go b/sdks/go/pkg/beam/io/natsio/read.go new file mode 100644 index 000000000000..df5a53accbe1 --- /dev/null +++ b/sdks/go/pkg/beam/io/natsio/read.go @@ -0,0 +1,289 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package natsio
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"reflect"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	"github.com/nats-io/nats.go"
+	"github.com/nats-io/nats.go/jetstream"
+)
+
+func init() {
+	register.DoFn5x2[
+		context.Context, *watermarkEstimator, *sdf.LockRTracker, []byte,
+		func(beam.EventTime, ConsumerMessage), sdf.ProcessContinuation, error,
+	](
+		&readFn{},
+	)
+	register.Emitter2[beam.EventTime, ConsumerMessage]()
+	beam.RegisterType(reflect.TypeOf((*ConsumerMessage)(nil)).Elem())
+}
+
+const (
+	defaultFetchSize  = 100             // messages requested per fetch unless ReadFetchSize is given
+	defaultStartSeqNo = 1               // start of the initial restriction unless ReadStartSeqNo is given
+	defaultEndSeqNo   = math.MaxInt64   // this end value makes createRTracker use a growable tracker
+	fetchTimeout      = 3 * time.Second // maximum wait of a single fetch
+	assumedLag        = 1 * time.Second // subtracted from time.Now when a fetch returned nothing
+	resumeDelay       = 5 * time.Second // delay requested before resuming after an empty fetch
+)
+
+// ConsumerMessage is the element type emitted by Read: one message fetched from the stream.
+type ConsumerMessage struct {
+	Subject        string              // subject of the message
+	PublishingTime time.Time           // timestamp taken from the message metadata
+	ID             string              // value of the nats.MsgIdHdr header
+	Headers        map[string][]string // all headers of the message
+	Data           []byte              // message payload
+}
+
+// Read reads messages from NATS JetStream and returns a PCollection<ConsumerMessage>.
+// Read takes a variable number of ReadOptionFn to configure the read operation:
+//   - ReadUserCredentials: path to the user credentials file. Defaults to empty.
+//   - ReadProcessingTimePolicy: use the pipeline processing time as the event time (default).
+//   - ReadPublishingTimePolicy: use the publishing time of the messages as the event time.
+//   - ReadFetchSize: the maximum number of messages to retrieve at a time. Defaults to 100.
+//   - ReadStartSeqNo: the start sequence number of messages to read. Defaults to 1.
+//   - ReadEndSeqNo: the end sequence number of messages to read (exclusive). Defaults to math.MaxInt64.
+func Read(
+	s beam.Scope,
+	uri string,
+	stream string,
+	subject string,
+	opts ...ReadOptionFn,
+) beam.PCollection {
+	s = s.Scope("natsio.Read")
+
+	option := &readOption{
+		TimePolicy: processingTimePolicy,
+		FetchSize:  defaultFetchSize,
+		StartSeqNo: defaultStartSeqNo,
+		EndSeqNo:   defaultEndSeqNo,
+	}
+
+	for _, opt := range opts {
+		if err := opt(option); err != nil {
+			panic(fmt.Sprintf("natsio.Read: invalid option: %v", err))
+		}
+	}
+
+	imp := beam.Impulse(s)
+	return beam.ParDo(s, newReadFn(uri, stream, subject, option), imp)
+}
+
+// readFn is the splittable DoFn behind Read; its restrictions are ranges of sequence numbers.
+type readFn struct {
+	natsFn
+	Stream      string
+	Subject     string
+	TimePolicy  timePolicy
+	FetchSize   int
+	StartSeqNo  int64
+	EndSeqNo    int64
+	timestampFn timestampFn // derived from TimePolicy in Setup
+}
+
+func newReadFn(uri string, stream string, subject string, option *readOption) *readFn {
+	return &readFn{
+		natsFn: natsFn{
+			URI:       uri,
+			CredsFile: option.CredsFile,
+		},
+		Stream:     stream,
+		Subject:    subject,
+		TimePolicy: option.TimePolicy,
+		FetchSize:  option.FetchSize,
+		StartSeqNo: option.StartSeqNo,
+		EndSeqNo:   option.EndSeqNo,
+	}
+}
+
+func (fn *readFn) Setup() error {
+	if err := fn.natsFn.Setup(); err != nil {
+		return err
+	}
+
+	fn.timestampFn = fn.TimePolicy.TimestampFn()
+	return nil
+}
+
+func (fn *readFn) CreateInitialRestriction(_ []byte) offsetrange.Restriction {
+	return offsetrange.Restriction{
+		Start: fn.StartSeqNo,
+		End:   fn.EndSeqNo,
+	}
+}
+
+func (fn *readFn) SplitRestriction(
+	_ []byte,
+	rest offsetrange.Restriction,
+) []offsetrange.Restriction {
+	return []offsetrange.Restriction{rest} // the restriction is never split up front
+}
+
+func (fn *readFn) RestrictionSize(_ []byte, rest offsetrange.Restriction) (float64, error) {
+	if err := fn.natsFn.Setup(); err != nil { // createRTracker below may need fn.js
+		return -1, err
+	}
+
+	rt, err := fn.createRTracker(rest)
+	if err != nil {
+		return -1, err
+	}
+
+	_, remaining := rt.GetProgress()
+	return remaining, nil
+}
+
+func (fn *readFn) CreateTracker(rest offsetrange.Restriction) (*sdf.LockRTracker, error) {
+	rt, err := fn.createRTracker(rest)
+	if err != nil {
+		return nil, err
+	}
+
+	return sdf.NewLockRTracker(rt), nil
+}
+
+func (fn *readFn) TruncateRestriction(rt *sdf.LockRTracker, _ []byte) offsetrange.Restriction {
+	start := rt.GetRestriction().(offsetrange.Restriction).Start
+	return offsetrange.Restriction{ // empty range [start, start)
+		Start: start,
+		End:   start,
+	}
+}
+
+func (fn *readFn) InitialWatermarkEstimatorState(
+	et beam.EventTime,
+	_ offsetrange.Restriction,
+	_ []byte,
+) int64 {
+	return et.Milliseconds()
+}
+
+func (fn *readFn) CreateWatermarkEstimator(ms int64) *watermarkEstimator {
+	return &watermarkEstimator{state: ms}
+}
+
+func (fn *readFn) WatermarkEstimatorState(we *watermarkEstimator) int64 {
+	return we.state
+}
+
+func (fn *readFn) ProcessElement(
+	ctx context.Context,
+	we *watermarkEstimator,
+	rt *sdf.LockRTracker,
+	_ []byte,
+	emit func(beam.EventTime, ConsumerMessage),
+) (sdf.ProcessContinuation, error) {
+	startSeqNo := rt.GetRestriction().(offsetrange.Restriction).Start
+	cons, err := fn.createConsumer(ctx, startSeqNo)
+	if err != nil {
+		return sdf.StopProcessing(), err
+	}
+
+	for {
+		msgs, err := cons.Fetch(fn.FetchSize, jetstream.FetchMaxWait(fetchTimeout))
+		if err != nil {
+			return sdf.StopProcessing(), fmt.Errorf("error fetching messages: %w", err)
+		}
+
+		count := 0
+		for msg := range msgs.Messages() {
+			metadata, err := msg.Metadata()
+			if err != nil {
+				return sdf.StopProcessing(), fmt.Errorf("error retrieving metadata: %w", err)
+			}
+
+			seqNo := int64(metadata.Sequence.Stream)
+			if !rt.TryClaim(seqNo) { // the tracker refused the sequence number: stop here
+				return sdf.StopProcessing(), nil
+			}
+
+			et := fn.timestampFn(metadata.Timestamp)
+			consMsg := createConsumerMessage(msg, metadata.Timestamp)
+			emit(et, consMsg)
+
+			count++
+		}
+
+		if err := msgs.Error(); err != nil {
+			return sdf.StopProcessing(), fmt.Errorf("error in message batch: %w", err)
+		}
+
+		if count == 0 { // the fetch delivered nothing: advance the watermark and resume later
+			fn.updateWatermarkManually(we)
+			return sdf.ResumeProcessingIn(resumeDelay), nil
+		}
+	}
+}
+
+func (fn *readFn) createRTracker(rest offsetrange.Restriction) (sdf.RTracker, error) {
+	if rest.End < math.MaxInt64 { // explicit end: a plain offset range tracker suffices
+		return offsetrange.NewTracker(rest), nil
+	}
+
+	estimator := newEndEstimator(fn.js, fn.Stream, fn.Subject)
+	rt, err := offsetrange.NewGrowableTracker(rest, estimator)
+	if err != nil {
+		return nil, fmt.Errorf("error creating growable tracker: %w", err)
+	}
+
+	return rt, nil
+}
+
+func (fn *readFn) createConsumer(
+	ctx context.Context,
+	startSeqNo int64,
+) (jetstream.Consumer, error) {
+	cfg := jetstream.OrderedConsumerConfig{
+		FilterSubjects:   []string{fn.Subject},
+		DeliverPolicy:    jetstream.DeliverByStartSequencePolicy,
+		OptStartSeq:      uint64(startSeqNo),
+		MaxResetAttempts: 5,
+	}
+
+	cons, err := fn.js.OrderedConsumer(ctx, fn.Stream, cfg)
+	if err != nil {
+		return nil, fmt.Errorf("error creating consumer: %w", err)
+	}
+
+	return cons, nil
+}
+
+func createConsumerMessage(msg jetstream.Msg, publishingTime time.Time) ConsumerMessage {
+	return ConsumerMessage{
+		Subject:        msg.Subject(),
+		PublishingTime: publishingTime,
+		ID:             msg.Headers().Get(nats.MsgIdHdr),
+		Headers:        msg.Headers(),
+		Data:           msg.Data(),
+	}
+}
+
+func (fn *readFn) updateWatermarkManually(we *watermarkEstimator) {
+	t := time.Now().Add(-1 * assumedLag)
+	et := fn.timestampFn(t) // the processing-time timestampFn ignores t and returns the current time
+	we.ObserveTimestamp(et.ToTime())
+}
diff --git a/sdks/go/pkg/beam/io/natsio/read_option.go b/sdks/go/pkg/beam/io/natsio/read_option.go
new file mode 100644
index 000000000000..f9d715be1022
--- /dev/null
+++ b/sdks/go/pkg/beam/io/natsio/read_option.go
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package natsio
+
+import "errors"
+
+var (
+	errInvalidFetchSize  = errors.New("fetch size must be greater than 0")
+	errInvalidStartSeqNo = errors.New("start sequence number must be greater than 0")
+	errInvalidEndSeqNo   = errors.New("end sequence number must be greater than 0")
+)
+
+type readOption struct {
+	CredsFile  string     // path handed to ReadUserCredentials
+	TimePolicy timePolicy // how event times are derived
+	FetchSize  int        // messages per fetch
+	StartSeqNo int64      // first sequence number to read
+	EndSeqNo   int64      // end sequence number (exclusive)
+}
+
+// ReadOptionFn configures a Read. Each function updates the pending read options and returns
+// an error when the supplied value is rejected.
+type ReadOptionFn func(option *readOption) error
+
+// ReadUserCredentials sets the path of the user credentials file used when connecting to NATS.
+func ReadUserCredentials(credsFile string) ReadOptionFn {
+	return func(opt *readOption) error {
+		opt.CredsFile = credsFile
+		return nil
+	}
+}
+
+// ReadProcessingTimePolicy selects the pipeline processing time of the messages as the basis
+// for their timestamps and for the watermark estimate.
+func ReadProcessingTimePolicy() ReadOptionFn {
+	return func(opt *readOption) error {
+		opt.TimePolicy = processingTimePolicy
+		return nil
+	}
+}
+
+// ReadPublishingTimePolicy selects the publishing time of the messages as the basis for their
+// timestamps and for the watermark estimate.
+func ReadPublishingTimePolicy() ReadOptionFn {
+	return func(opt *readOption) error {
+		opt.TimePolicy = publishingTimePolicy
+		return nil
+	}
+}
+
+// ReadFetchSize sets the maximum number of messages requested per fetch; size must be positive.
+func ReadFetchSize(size int) ReadOptionFn {
+	return func(opt *readOption) error {
+		if size < 1 {
+			return errInvalidFetchSize
+		}
+
+		opt.FetchSize = size
+		return nil
+	}
+}
+
+// ReadStartSeqNo sets the first sequence number to read; seqNo must be positive.
+func ReadStartSeqNo(seqNo int64) ReadOptionFn {
+	return func(opt *readOption) error {
+		if seqNo < 1 {
+			return errInvalidStartSeqNo
+		}
+
+		opt.StartSeqNo = seqNo
+		return nil
+	}
+}
+
+// ReadEndSeqNo sets the sequence number at which reading ends (exclusive); seqNo must be positive.
+func ReadEndSeqNo(seqNo int64) ReadOptionFn {
+	return func(opt *readOption) error {
+		if seqNo < 1 {
+			return errInvalidEndSeqNo
+		}
+
+		opt.EndSeqNo = seqNo
+		return nil
+	}
+}
diff --git a/sdks/go/pkg/beam/io/natsio/read_test.go b/sdks/go/pkg/beam/io/natsio/read_test.go
new file mode 100644
index 000000000000..faf0b0540c81
--- /dev/null
+++ b/sdks/go/pkg/beam/io/natsio/read_test.go
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package natsio
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+	"github.com/nats-io/nats.go"
+)
+
+func TestRead(t *testing.T) {
+	cases := []struct {
+		name       string
+		input      []*nats.Msg    // messages published before the pipeline runs
+		subject    string         // subject passed to Read
+		opts       []ReadOptionFn // options passed to Read
+		pubIndices []int          // for each wanted message, its index in input
+		want       []any          // expected output, publishing time filled in later
+	}{
+		{
+			name: "Read messages from bounded stream with single subject",
+			input: []*nats.Msg{
+				{
+					Subject: "subject.1",
+					Header:  nats.Header{nats.MsgIdHdr: []string{"123"}},
+					Data:    []byte("msg1"),
+				},
+				{
+					Subject: "subject.2",
+					Header:  nats.Header{nats.MsgIdHdr: []string{"124"}},
+					Data:    []byte("msg2"),
+				},
+				{
+					Subject: "subject.1",
+					Header:  nats.Header{nats.MsgIdHdr: []string{"125"}},
+					Data:    []byte("msg3"),
+				},
+			},
+			subject: "subject.1",
+			opts: []ReadOptionFn{
+				ReadEndSeqNo(4), // exclusive end, one past the three published messages
+			},
+			pubIndices: []int{0, 2},
+			want: []any{
+				ConsumerMessage{
+					Subject: "subject.1",
+					ID:      "123",
+					Headers: map[string][]string{nats.MsgIdHdr: {"123"}},
+					Data:    []byte("msg1"),
+				},
+				ConsumerMessage{
+					Subject: "subject.1",
+					ID:      "125",
+					Headers: map[string][]string{nats.MsgIdHdr: {"125"}},
+					Data:    []byte("msg3"),
+				},
+			},
+		},
+		{
+			name: "Read messages from bounded stream with wildcard subject",
+			input: []*nats.Msg{
+				{
+					Subject: "subject.1",
+					Header:  nats.Header{nats.MsgIdHdr: []string{"123"}},
+					Data:    []byte("msg1"),
+				},
+				{
+					Subject: "subject.2",
+					Header:  nats.Header{nats.MsgIdHdr: []string{"124"}},
+					Data:    []byte("msg2"),
+				},
+				{
+					Subject: "subject.1",
+					Header:  nats.Header{nats.MsgIdHdr: []string{"125"}},
+					Data:    []byte("msg3"),
+				},
+			},
+			subject: "subject.*",
+			opts: []ReadOptionFn{
+				ReadEndSeqNo(4),
+			},
+			pubIndices: []int{0, 1, 2},
+			want: []any{
+				ConsumerMessage{
+					Subject: "subject.1",
+					ID:      "123",
+					Headers: map[string][]string{nats.MsgIdHdr: {"123"}},
+					Data:    []byte("msg1"),
+				},
+				ConsumerMessage{
+					Subject: "subject.2",
+					ID:      "124",
+					Headers: map[string][]string{nats.MsgIdHdr: {"124"}},
+					Data:    []byte("msg2"),
+				},
+				ConsumerMessage{
+					Subject: "subject.1",
+					ID:      "125",
+					Headers: map[string][]string{nats.MsgIdHdr: {"125"}},
+					Data:    []byte("msg3"),
+				},
+			},
+		},
+		{
+			name: "Read messages from bounded stream with custom fetch size",
+			input: []*nats.Msg{
+				{
+					Subject: "subject.1",
+					Header:  nats.Header{nats.MsgIdHdr: []string{"123"}},
+					Data:    []byte("msg1"),
+				},
+				{
+					Subject: "subject.1",
+					Header:  nats.Header{nats.MsgIdHdr: []string{"124"}},
+					Data:    []byte("msg2"),
+				},
+			},
+			subject: "subject.1",
+			opts: []ReadOptionFn{
+				ReadFetchSize(1), // smaller than the number of messages, so several fetches are needed
+				ReadEndSeqNo(3),
+			},
+			pubIndices: []int{0, 1},
+			want: []any{
+				ConsumerMessage{
+					Subject: "subject.1",
+					ID:      "123",
+					Headers: map[string][]string{nats.MsgIdHdr: {"123"}},
+					Data:    []byte("msg1"),
+				},
+				ConsumerMessage{
+					Subject: "subject.1",
+					ID:      "124",
+					Headers: map[string][]string{nats.MsgIdHdr: {"124"}},
+					Data:    []byte("msg2"),
+				},
+			},
+		},
+		{
+			name: "Read messages from bounded stream with custom start seq no",
+			input: []*nats.Msg{
+				{
+					Subject: "subject.1",
+					Header:  nats.Header{nats.MsgIdHdr: []string{"123"}},
+					Data:    []byte("msg1"),
+				},
+				{
+					Subject: "subject.1",
+					Header:  nats.Header{nats.MsgIdHdr: []string{"124"}},
+					Data:    []byte("msg2"),
+				},
+			},
+			subject: "subject.1",
+			opts: []ReadOptionFn{
+				ReadStartSeqNo(2), // only the second message is expected back
+				ReadEndSeqNo(3),
+			},
+			pubIndices: []int{1},
+			want: []any{
+				ConsumerMessage{
+					Subject: "subject.1",
+					ID:      "124",
+					Headers: map[string][]string{nats.MsgIdHdr: {"124"}},
+					Data:    []byte("msg2"),
+				},
+			},
+		},
+	}
+	for idx, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			ctx := context.Background()
+			server := newServer(t)
+			url := server.ClientURL()
+			conn := newConn(t, url)
+			js := newJetStream(t, conn)
+
+			stream := fmt.Sprintf("STREAM-%d", idx)
+			subjectFilter := "subject.*"
+
+			createStream(ctx, t, js, stream, []string{subjectFilter})
+			publishMessages(ctx, t, js, tc.input)
+
+			cons := createConsumer(ctx, t, js, stream, []string{subjectFilter})
+			published := fetchMessages(t, cons, len(tc.input))
+			expected := messagesWithPublishingTime(t, published, tc.pubIndices, tc.want)
+
+			p, s := beam.NewPipelineWithRoot()
+			got := Read(s, url, stream, tc.subject, tc.opts...)
+
+			passert.Equals(s, got, expected...)
+			ptest.RunAndValidate(t, p)
+		})
+	}
+}
diff --git a/sdks/go/pkg/beam/io/natsio/time_policy.go b/sdks/go/pkg/beam/io/natsio/time_policy.go
new file mode 100644
index 000000000000..1c2dbf5165f7
--- /dev/null
+++ b/sdks/go/pkg/beam/io/natsio/time_policy.go
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package natsio
+
+import (
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+)
+
+type timePolicy int // selects which timestampFn is applied to messages
+
+const (
+	processingTimePolicy timePolicy = iota // timestamp = time of processing (mtime.Now)
+	publishingTimePolicy                   // timestamp = publishing time of the message
+)
+
+type timestampFn func(time.Time) mtime.Time // maps a publishing time to a timestamp
+
+func processingTime(_ time.Time) mtime.Time {
+	return mtime.Now() // the publishing time is deliberately ignored
+}
+
+func publishingTime(published time.Time) mtime.Time {
+	return mtime.FromTime(published)
+}
+
+func (policy timePolicy) TimestampFn() timestampFn {
+	switch policy {
+	case publishingTimePolicy:
+		return publishingTime
+	case processingTimePolicy:
+		return processingTime
+	default:
+		panic("unsupported time policy")
+	}
+}
diff --git a/sdks/go/pkg/beam/io/natsio/time_policy_test.go b/sdks/go/pkg/beam/io/natsio/time_policy_test.go
new file mode 100644
index 000000000000..2452334c8cdd
--- /dev/null
+++ b/sdks/go/pkg/beam/io/natsio/time_policy_test.go
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package natsio
+
+import (
+	"testing"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+)
+
+func Test_timePolicy_TimestampFn(t *testing.T) {
+	t.Run("processingTime", func(t *testing.T) {
+		published := time.Date(2020, 1, 2, 3, 4, 5, 6e6, time.UTC) // must not influence the result
+
+		lower := mtime.Now()
+		got := processingTimePolicy.TimestampFn()(published)
+		upper := mtime.Now()
+
+		if lower > got || got > upper {
+			t.Errorf("timestamp = %v, want between %v and %v", got, lower, upper)
+		}
+	})
+
+	t.Run("publishingTime", func(t *testing.T) {
+		published := time.Date(2020, 1, 2, 3, 4, 5, 6e6, time.UTC)
+
+		if got, want := publishingTimePolicy.TimestampFn()(published), mtime.FromTime(published); got != want {
+			t.Errorf("timestamp = %v, want %v", got, want)
+		}
+	})
+}
diff --git a/sdks/go/pkg/beam/io/natsio/watermark_estimator.go b/sdks/go/pkg/beam/io/natsio/watermark_estimator.go
new file mode 100644
index 000000000000..b23eb37ac855
--- /dev/null
+++ b/sdks/go/pkg/beam/io/natsio/watermark_estimator.go
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package natsio
+
+import "time"
+
+type watermarkEstimator struct {
+	state int64 // current watermark in Unix milliseconds
+}
+
+func (e *watermarkEstimator) CurrentWatermark() time.Time {
+	return time.UnixMilli(e.state)
+}
+
+func (e *watermarkEstimator) ObserveTimestamp(t time.Time) {
+	observed := t.UnixMilli()
+	if e.state < observed { // the state only moves forward; older timestamps are ignored
+		e.state = observed
+	}
+}
diff --git a/sdks/go/pkg/beam/io/natsio/watermark_estimator_test.go b/sdks/go/pkg/beam/io/natsio/watermark_estimator_test.go
new file mode 100644
index 000000000000..91a9a840a6e0
--- /dev/null
+++ b/sdks/go/pkg/beam/io/natsio/watermark_estimator_test.go
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package natsio + +import ( + "testing" + "time" +) + +func Test_watermarkEstimator_CurrentWatermark(t *testing.T) { + ms := int64(1577934245000) + we := &watermarkEstimator{ + state: ms, + } + if got, want := we.CurrentWatermark(), time.UnixMilli(ms); got != want { + t.Errorf("CurrentWatermark() = %v, want %v", got, want) + } +} + +func Test_watermarkEstimator_ObserveTimestamp(t *testing.T) { + t1 := time.Date(2020, 1, 2, 3, 4, 5, 6e6, time.UTC) + t2 := time.Date(2020, 1, 2, 3, 4, 5, 7e6, time.UTC) + + tests := []struct { + name string + state int64 + t time.Time + want int64 + }{ + { + name: "Update watermark when the time is greater than the current state", + state: t1.UnixMilli(), + t: t2, + want: t2.UnixMilli(), + }, + { + name: "Keep existing watermark when the time is not greater than the current state", + state: t2.UnixMilli(), + t: t1, + want: t2.UnixMilli(), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + we := &watermarkEstimator{ + state: tt.state, + } + we.ObserveTimestamp(tt.t) + if got, want := we.state, tt.want; got != want { + t.Errorf("state = %v, want %v", got, want) + } + }) + } +} diff --git a/sdks/go/pkg/beam/io/natsio/write_test.go b/sdks/go/pkg/beam/io/natsio/write_test.go index 5e9387ece5f6..678874ce11e9 100644 --- a/sdks/go/pkg/beam/io/natsio/write_test.go +++ b/sdks/go/pkg/beam/io/natsio/write_test.go @@ -187,8 +187,8 @@ func TestWrite(t *testing.T) { js := newJetStream(t, conn) subjects := []string{subject} - createStream(t, ctx, js, stream, subjects) - cons := createConsumer(t, ctx, js, stream, subjects) + createStream(ctx, t, js, stream, subjects) + cons := createConsumer(ctx, t, js, stream, subjects) p, s := beam.NewPipelineWithRoot()