Skip to content

Commit

Permalink
feat: Events now include information about delivery attempt
Browse files Browse the repository at this point in the history
  • Loading branch information
aholstenson committed Jun 28, 2024
1 parent 1f61e95 commit dee161f
Show file tree
Hide file tree
Showing 11 changed files with 6,307 additions and 119 deletions.
9 changes: 5 additions & 4 deletions internal/api/events/v1alpha1/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,11 @@ func (e *EventsServiceServer) Events(server eventsv1alpha1.EventsService_EventsS
err = server.Send(&eventsv1alpha1.EventsResponse{
Response: &eventsv1alpha1.EventsResponse_Event{
Event: &eventsv1alpha1.Event{
Id: event.StreamSeq,
Data: event.Data,
Subject: event.Subject,
Headers: headers,
Id: event.StreamSeq,
Data: event.Data,
Subject: event.Subject,
Headers: headers,
DeliveryAttempt: event.DeliveryAttempt,
},
},
})
Expand Down
45 changes: 34 additions & 11 deletions internal/api/events/v1alpha1/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"
eventsv1alpha1 "windshift/service/internal/proto/windshift/events/v1alpha1"
testv1 "windshift/service/internal/proto/windshift/test/v1"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -94,7 +95,7 @@ var _ = Describe("Events", func() {
// Send an event
_, err = service.PublishEvent(ctx, &eventsv1alpha1.PublishEventRequest{
Subject: "events.test",
Data: Data(&emptypb.Empty{}),
Data: Data(&testv1.StringValue{Value: "test"}),
})
Expect(err).ToNot(HaveOccurred())

Expand All @@ -104,6 +105,11 @@ var _ = Describe("Events", func() {
if _, ok := in.Response.(*eventsv1alpha1.EventsResponse_Event); !ok {
Fail("expected Event message")
}

e := in.GetEvent()
Expect(e.Subject).To(Equal("events.test"))
Expect(e.DeliveryAttempt).To(BeNumerically("==", 1))
Expect(e.Data).To(Equal(Data(&testv1.StringValue{Value: "test"})))
})
})

Expand Down Expand Up @@ -141,7 +147,7 @@ var _ = Describe("Events", func() {
// Send an event
_, err = service.PublishEvent(ctx, &eventsv1alpha1.PublishEventRequest{
Subject: "events.test",
Data: Data(&emptypb.Empty{}),
Data: Data(&testv1.StringValue{Value: "test"}),
})
Expect(err).ToNot(HaveOccurred())

Expand All @@ -152,7 +158,12 @@ var _ = Describe("Events", func() {
Fail("expected Event message")
}

eventID := in.GetEvent().GetId()
e := in.GetEvent()
Expect(e.Subject).To(Equal("events.test"))
Expect(e.DeliveryAttempt).To(BeNumerically("==", 1))
Expect(e.Data).To(Equal(Data(&testv1.StringValue{Value: "test"})))

eventID := e.GetId()

// Acknowledge the event
err = client.Send(&eventsv1alpha1.EventsRequest{
Expand All @@ -170,8 +181,7 @@ var _ = Describe("Events", func() {
if r, ok := in.Response.(*eventsv1alpha1.EventsResponse_AckConfirmation_); ok {
Expect(r.AckConfirmation.Ids).To(Equal([]uint64{eventID}))
} else {

Fail("expected AckConfiramtion message")
Fail("expected AckConfirmation message")
}
})

Expand Down Expand Up @@ -221,8 +231,7 @@ var _ = Describe("Events", func() {
if r, ok := in.Response.(*eventsv1alpha1.EventsResponse_AckConfirmation_); ok {
Expect(r.AckConfirmation.InvalidIds).To(Equal([]uint64{1}))
} else {

Fail("expected AckConfiramtion message")
Fail("expected AckConfirmation message")
}
})

Expand Down Expand Up @@ -337,7 +346,7 @@ var _ = Describe("Events", func() {
// Send an event
_, err = service.PublishEvent(ctx, &eventsv1alpha1.PublishEventRequest{
Subject: "events.test",
Data: Data(&emptypb.Empty{}),
Data: Data(&testv1.StringValue{Value: "test"}),
})
Expect(err).ToNot(HaveOccurred())

Expand All @@ -348,7 +357,12 @@ var _ = Describe("Events", func() {
Fail("expected Event message")
}

eventID := in.GetEvent().GetId()
e := in.GetEvent()
Expect(e.Subject).To(Equal("events.test"))
Expect(e.DeliveryAttempt).To(BeNumerically("==", 1))
Expect(e.Data).To(Equal(Data(&testv1.StringValue{Value: "test"})))

eventID := e.GetId()

// Reject the event
err = client.Send(&eventsv1alpha1.EventsRequest{
Expand All @@ -374,6 +388,11 @@ var _ = Describe("Events", func() {
if _, ok := in.Response.(*eventsv1alpha1.EventsResponse_Event); !ok {
Fail("expected Event message")
}

e = in.GetEvent()
Expect(e.Subject).To(Equal("events.test"))
Expect(e.DeliveryAttempt).To(BeNumerically("==", 2))
Expect(e.Data).To(Equal(Data(&testv1.StringValue{Value: "test"})))
})

It("rejecting unknown event fails", NodeTimeout(5*time.Second), func(ctx context.Context) {
Expand Down Expand Up @@ -458,7 +477,7 @@ var _ = Describe("Events", func() {
// Send an event
_, err = service.PublishEvent(ctx, &eventsv1alpha1.PublishEventRequest{
Subject: "events.test",
Data: Data(&emptypb.Empty{}),
Data: Data(&testv1.StringValue{Value: "test"}),
})
Expect(err).ToNot(HaveOccurred())

Expand All @@ -469,7 +488,11 @@ var _ = Describe("Events", func() {
Fail("expected Event message")
}

eventID := in.GetEvent().GetId()
e := in.GetEvent()
Expect(e.Subject).To(Equal("events.test"))
Expect(e.DeliveryAttempt).To(BeNumerically("==", 1))
Expect(e.Data).To(Equal(Data(&testv1.StringValue{Value: "test"})))
eventID := e.GetId()

// Reject the event
err = client.Send(&eventsv1alpha1.EventsRequest{
Expand Down
25 changes: 15 additions & 10 deletions internal/events/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type Event struct {
// resume from there on the next run.
StreamSeq uint64

// DeliveryAttempt is the number of times the event has been delivered to
// a consumer. The first delivery is 1.
DeliveryAttempt uint64

// Headers contains the headers of the event.
Headers *Headers

Expand Down Expand Up @@ -108,16 +112,17 @@ func newEvent(
}

return &Event{
span: span,
logger: logger,
msg: msg,
onProcess: onProcess,
Context: ctx,
Subject: msg.Subject(),
ConsumerSeq: md.Sequence.Stream,
StreamSeq: md.Sequence.Consumer,
Headers: headers,
Data: data,
span: span,
logger: logger,
msg: msg,
onProcess: onProcess,
Context: ctx,
Subject: msg.Subject(),
ConsumerSeq: md.Sequence.Stream,
StreamSeq: md.Sequence.Consumer,
DeliveryAttempt: md.NumDelivered,
Headers: headers,
Data: data,
}, nil
}

Expand Down
13 changes: 12 additions & 1 deletion internal/events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ var _ = Describe("Event Consumption", func() {
select {
case event := <-ec.Incoming():
Expect(event).ToNot(BeNil())
Expect(event.DeliveryAttempt).To(BeNumerically("==", 1))

err = event.Reject()
Expect(err).ToNot(HaveOccurred())
Expand All @@ -569,6 +570,7 @@ var _ = Describe("Event Consumption", func() {
select {
case event := <-ec.Incoming():
Expect(event).ToNot(BeNil())
Expect(event.DeliveryAttempt).To(BeNumerically("==", 2))

err = event.Ack()
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -611,6 +613,7 @@ var _ = Describe("Event Consumption", func() {
select {
case event := <-ec.Incoming():
Expect(event).ToNot(BeNil())
Expect(event.DeliveryAttempt).To(BeNumerically("==", 1))

event.DiscardData()
err = event.Reject()
Expand All @@ -622,6 +625,7 @@ var _ = Describe("Event Consumption", func() {
select {
case event := <-ec.Incoming():
Expect(event).ToNot(BeNil())
Expect(event.DeliveryAttempt).To(BeNumerically("==", 2))

err = event.Ack()
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -665,6 +669,7 @@ var _ = Describe("Event Consumption", func() {

event := <-ec1.Incoming()
Expect(event).ToNot(BeNil())
Expect(event.DeliveryAttempt).To(BeNumerically("==", 1))
err = ec1.Close()
Expect(err).ToNot(HaveOccurred())

Expand All @@ -675,6 +680,7 @@ var _ = Describe("Event Consumption", func() {
select {
case event = <-ec2.Incoming():
Expect(event).ToNot(BeNil())
Expect(event.DeliveryAttempt).To(BeNumerically("==", 2))
case <-time.After(500 * time.Millisecond):
Fail("timeout waiting for event")
}
Expand Down Expand Up @@ -709,6 +715,7 @@ var _ = Describe("Event Consumption", func() {
select {
case event := <-ec.Incoming():
Expect(event).ToNot(BeNil())
Expect(event.DeliveryAttempt).To(BeNumerically("==", 1))
case <-time.After(200 * time.Millisecond):
Fail("no event received")
}
Expand All @@ -718,6 +725,7 @@ var _ = Describe("Event Consumption", func() {
select {
case event := <-ec.Incoming():
Expect(event).ToNot(BeNil())
Expect(event.DeliveryAttempt).To(BeNumerically("==", 2))

err = event.Ack()
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -758,17 +766,20 @@ var _ = Describe("Event Consumption", func() {

event := <-ec.Incoming()
Expect(event).ToNot(BeNil())
Expect(event.DeliveryAttempt).To(BeNumerically("==", 1))

start := time.Now()
err = event.RejectWithDelay(100 * time.Millisecond)
Expect(err).ToNot(HaveOccurred())

// Receive the event again
select {
case <-ec.Incoming():
case event := <-ec.Incoming():
if time.Since(start) < 100*time.Millisecond {
Fail("event received too early")
}

Expect(event.DeliveryAttempt).To(BeNumerically("==", 2))
case <-time.After(200 * time.Millisecond):
Fail("no event received")
}
Expand Down
Loading

0 comments on commit dee161f

Please sign in to comment.