Skip to content

Commit

Permalink
feat(events): Change naming used in gRPC API
Browse files Browse the repository at this point in the history
  • Loading branch information
aholstenson committed Jan 14, 2024
1 parent 99bde72 commit b5fbd01
Show file tree
Hide file tree
Showing 14 changed files with 887 additions and 889 deletions.
18 changes: 9 additions & 9 deletions cmd/test/events/consume/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func main() {
Name: name,
Stream: "test",
Subjects: []string{"test"},
Pointer: &eventsv1alpha1.StreamPointer{
From: &eventsv1alpha1.StreamPointer{
Pointer: &eventsv1alpha1.StreamPointer_Start{
Start: true,
},
Expand All @@ -69,16 +69,16 @@ func main() {
log.Fatal(err)
}

stream, err := client.Consume(ctx)
stream, err := client.Events(ctx)
if err != nil {
log.Fatal(err)
}

// Send the initial subscription request
maxProcessingEvents := uint64(*parallelism)
err = stream.Send(&eventsv1alpha1.ConsumeRequest{
Request: &eventsv1alpha1.ConsumeRequest_Subscribe_{
Subscribe: &eventsv1alpha1.ConsumeRequest_Subscribe{
err = stream.Send(&eventsv1alpha1.EventsRequest{
Request: &eventsv1alpha1.EventsRequest_Subscribe_{
Subscribe: &eventsv1alpha1.EventsRequest_Subscribe{
Stream: "test",
Consumer: c.Id,
MaxProcessingEvents: &maxProcessingEvents,
Expand All @@ -95,7 +95,7 @@ func main() {
log.Fatal(err)
}

incoming := make(chan *eventsv1alpha1.ConsumeResponse)
incoming := make(chan *eventsv1alpha1.EventsResponse)
go func() {
for {
resp, err2 := stream.Recv()
Expand Down Expand Up @@ -151,9 +151,9 @@ func main() {
time.Sleep(time.Duration(sleepInMS) * time.Millisecond)

// Acknowledge the event
err = stream.Send(&eventsv1alpha1.ConsumeRequest{
Request: &eventsv1alpha1.ConsumeRequest_Ack_{
Ack: &eventsv1alpha1.ConsumeRequest_Ack{
err = stream.Send(&eventsv1alpha1.EventsRequest{
Request: &eventsv1alpha1.EventsRequest_Ack_{
Ack: &eventsv1alpha1.EventsRequest_Ack{
Ids: []uint64{event.Id},
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func (e *EventsServiceServer) EnsureConsumer(ctx context.Context, req *eventsv1a
config.Timeout = req.ProcessingTimeout.AsDuration()
}

if req.Pointer != nil {
config.Pointer = toStreamPointer(req.Pointer)
if req.From != nil {
config.From = toStreamPointer(req.From)
}

consumer, err := e.events.EnsureConsumer(ctx, config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
. "github.com/onsi/gomega"
)

var _ = Describe("Subscriptions", func() {
var _ = Describe("Consumers", func() {
var service eventsv1alpha1.EventsServiceClient

BeforeEach(func(ctx context.Context) {
Expand All @@ -29,7 +29,7 @@ var _ = Describe("Subscriptions", func() {
})

Describe("Ephemeral", func() {
It("can create a subscription", func(ctx context.Context) {
It("can create a consumer", func(ctx context.Context) {
_, err := service.EnsureConsumer(ctx, &eventsv1alpha1.EnsureConsumerRequest{
Stream: "test",
Subjects: []string{
Expand All @@ -41,7 +41,7 @@ var _ = Describe("Subscriptions", func() {
})

Describe("Durable", func() {
It("can update subject of subscription", func(ctx context.Context) {
It("can update subject of consumer", func(ctx context.Context) {
subID := "test-sub"
_, err := service.EnsureConsumer(ctx, &eventsv1alpha1.EnsureConsumerRequest{
Stream: "test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

func (e *EventsServiceServer) Consume(server eventsv1alpha1.EventsService_ConsumeServer) error {
func (e *EventsServiceServer) Events(server eventsv1alpha1.EventsService_EventsServer) error {
ctx := server.Context()

subscribe, err := server.Recv()
Expand All @@ -28,7 +28,7 @@ func (e *EventsServiceServer) Consume(server eventsv1alpha1.EventsService_Consum
config := e.createEventConsumeConfig(sub)

var err2 error
events, err2 = e.events.Consume(ctx, config)
events, err2 = e.events.Events(ctx, config)
if err2 != nil {
return errors.Wrap(err2, "could not subscribe")
}
Expand All @@ -38,9 +38,9 @@ func (e *EventsServiceServer) Consume(server eventsv1alpha1.EventsService_Consum
defer events.Close()

// Send initial response
err = server.Send(&eventsv1alpha1.ConsumeResponse{
Response: &eventsv1alpha1.ConsumeResponse_Subscribed_{
Subscribed: &eventsv1alpha1.ConsumeResponse_Subscribed{
err = server.Send(&eventsv1alpha1.EventsResponse{
Response: &eventsv1alpha1.EventsResponse_Subscribed_{
Subscribed: &eventsv1alpha1.EventsResponse_Subscribed{
ProcessingTimeout: durationpb.New(events.Timeout),
},
},
Expand All @@ -50,7 +50,7 @@ func (e *EventsServiceServer) Consume(server eventsv1alpha1.EventsService_Consum
}

// Start a goroutine to read incoming messages and send them to a channel
messages := make(chan *eventsv1alpha1.ConsumeRequest)
messages := make(chan *eventsv1alpha1.EventsRequest)
go func() {
for {
if ctx.Err() != nil {
Expand Down Expand Up @@ -90,7 +90,7 @@ func (e *EventsServiceServer) Consume(server eventsv1alpha1.EventsService_Consum
return nil
case <-e.globalStop:
return nil
case event := <-events.Events():
case event := <-events.Incoming():
eventMap.Add(event)

// Create the common headers
Expand All @@ -105,8 +105,8 @@ func (e *EventsServiceServer) Consume(server eventsv1alpha1.EventsService_Consum
})

// Send the actual event
err = server.Send(&eventsv1alpha1.ConsumeResponse{
Response: &eventsv1alpha1.ConsumeResponse_Event{
err = server.Send(&eventsv1alpha1.EventsResponse{
Response: &eventsv1alpha1.EventsResponse_Event{
Event: &eventsv1alpha1.Event{
Id: event.StreamSeq,
Data: event.Data,
Expand All @@ -122,19 +122,19 @@ func (e *EventsServiceServer) Consume(server eventsv1alpha1.EventsService_Consum
event.DiscardData()
case request := <-messages:
switch r := request.Request.(type) {
case *eventsv1alpha1.ConsumeRequest_Subscribe_:
case *eventsv1alpha1.EventsRequest_Subscribe_:
return errors.New("cannot subscribe again")
case *eventsv1alpha1.ConsumeRequest_Ack_:
case *eventsv1alpha1.EventsRequest_Ack_:
err = e.handleAck(server, eventMap, r)
if err != nil {
return err
}
case *eventsv1alpha1.ConsumeRequest_Reject_:
case *eventsv1alpha1.EventsRequest_Reject_:
err = e.handleReject(server, eventMap, r)
if err != nil {
return err
}
case *eventsv1alpha1.ConsumeRequest_Ping_:
case *eventsv1alpha1.EventsRequest_Ping_:
err = e.handlePing(server, eventMap, r)
if err != nil {
return err
Expand All @@ -145,9 +145,9 @@ func (e *EventsServiceServer) Consume(server eventsv1alpha1.EventsService_Consum
}

func (e *EventsServiceServer) handleAck(
server eventsv1alpha1.EventsService_ConsumeServer,
server eventsv1alpha1.EventsService_EventsServer,
eventMap eventTracker,
r *eventsv1alpha1.ConsumeRequest_Ack_,
r *eventsv1alpha1.EventsRequest_Ack_,
) error {
ids := r.Ack.Ids

Expand Down Expand Up @@ -175,9 +175,9 @@ func (e *EventsServiceServer) handleAck(
}
}

err := server.Send(&eventsv1alpha1.ConsumeResponse{
Response: &eventsv1alpha1.ConsumeResponse_AckConfirmation_{
AckConfirmation: &eventsv1alpha1.ConsumeResponse_AckConfirmation{
err := server.Send(&eventsv1alpha1.EventsResponse{
Response: &eventsv1alpha1.EventsResponse_AckConfirmation_{
AckConfirmation: &eventsv1alpha1.EventsResponse_AckConfirmation{
Ids: processedIds,
InvalidIds: invalidIds,
TemporaryFailedIds: temporaryErrors,
Expand All @@ -192,9 +192,9 @@ func (e *EventsServiceServer) handleAck(
}

func (e *EventsServiceServer) handleReject(
server eventsv1alpha1.EventsService_ConsumeServer,
server eventsv1alpha1.EventsService_EventsServer,
eventMap eventTracker,
r *eventsv1alpha1.ConsumeRequest_Reject_,
r *eventsv1alpha1.EventsRequest_Reject_,
) error {
ids := r.Reject.Ids
permanently := r.Reject.Permanently
Expand Down Expand Up @@ -232,9 +232,9 @@ func (e *EventsServiceServer) handleReject(
}
}

err := server.Send(&eventsv1alpha1.ConsumeResponse{
Response: &eventsv1alpha1.ConsumeResponse_RejectConfirmation_{
RejectConfirmation: &eventsv1alpha1.ConsumeResponse_RejectConfirmation{
err := server.Send(&eventsv1alpha1.EventsResponse{
Response: &eventsv1alpha1.EventsResponse_RejectConfirmation_{
RejectConfirmation: &eventsv1alpha1.EventsResponse_RejectConfirmation{
Ids: processedIds,
InvalidIds: invalidIds,
TemporaryFailedIds: temporaryErrors,
Expand All @@ -249,9 +249,9 @@ func (e *EventsServiceServer) handleReject(
}

func (e *EventsServiceServer) handlePing(
server eventsv1alpha1.EventsService_ConsumeServer,
server eventsv1alpha1.EventsService_EventsServer,
eventMap eventTracker,
r *eventsv1alpha1.ConsumeRequest_Ping_,
r *eventsv1alpha1.EventsRequest_Ping_,
) error {
ids := r.Ping.Ids

Expand All @@ -278,9 +278,9 @@ func (e *EventsServiceServer) handlePing(
}
}

err := server.Send(&eventsv1alpha1.ConsumeResponse{
Response: &eventsv1alpha1.ConsumeResponse_PingConfirmation_{
PingConfirmation: &eventsv1alpha1.ConsumeResponse_PingConfirmation{
err := server.Send(&eventsv1alpha1.EventsResponse{
Response: &eventsv1alpha1.EventsResponse_PingConfirmation_{
PingConfirmation: &eventsv1alpha1.EventsResponse_PingConfirmation{
Ids: processedIds,
InvalidIds: invalidIds,
TemporaryFailedIds: temporaryErrors,
Expand All @@ -294,7 +294,7 @@ func (e *EventsServiceServer) handlePing(
return nil
}

func (*EventsServiceServer) createEventConsumeConfig(sub *eventsv1alpha1.ConsumeRequest_Subscribe) *events.EventConsumeConfig {
func (*EventsServiceServer) createEventConsumeConfig(sub *eventsv1alpha1.EventsRequest_Subscribe) *events.EventConsumeConfig {
maxPendingEvents := uint(0)
if sub.MaxProcessingEvents != nil {
maxPendingEvents = uint(*sub.MaxProcessingEvents)
Expand Down
Loading

0 comments on commit b5fbd01

Please sign in to comment.