Skip to content

Commit

Permalink
eventbridge add example of sending binary data
Browse files Browse the repository at this point in the history
  • Loading branch information
fogfish committed Sep 29, 2024
1 parent e2d2359 commit bda64c2
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 118 deletions.
2 changes: 1 addition & 1 deletion broker/eventbridge/awscdk.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2021 - 2022 Dmitry Kolesnikov
// Copyright (C) 2021 - 2024 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
Expand Down
2 changes: 1 addition & 1 deletion broker/eventbridge/awscdk_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2021 - 2022 Dmitry Kolesnikov
// Copyright (C) 2021 - 2024 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
Expand Down
8 changes: 8 additions & 0 deletions broker/eventbridge/config.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
//
// Copyright (C) 2021 - 2024 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
// https://github.com/fogfish/swarm
//

package eventbridge

import (
Expand Down
2 changes: 1 addition & 1 deletion broker/eventbridge/eventbridge.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2021 - 2022 Dmitry Kolesnikov
// Copyright (C) 2021 - 2024 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
Expand Down
114 changes: 1 addition & 113 deletions broker/eventbridge/eventbridge_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2021 - 2022 Dmitry Kolesnikov
// Copyright (C) 2021 - 2024 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
Expand All @@ -23,118 +23,6 @@ import (
"github.com/fogfish/swarm/kernel"
)

/*
import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/eventbridge"
"github.com/fogfish/swarm"
sut "github.com/fogfish/swarm/broker/eventbridge"
"github.com/fogfish/swarm/qtest"
"github.com/fogfish/swarm/queue"
)
func TestEnqueueEventBridge(t *testing.T) {
qtest.TestEnqueueTyped(t, newMockEnqueue)
qtest.TestEnqueueBytes(t, newMockEnqueue)
qtest.TestEnqueueEvent(t, newMockEnqueue)
}
func TestDequeueEventBridge(t *testing.T) {
qtest.TestDequeueTyped(t, newMockDequeue)
qtest.TestDequeueBytes(t, newMockDequeue)
qtest.TestDequeueEvent(t, newMockDequeue)
}
// Mock AWS EventBridge Enqueue
type mockEnqueue struct {
sut.EventBridge
expectCategory string
loopback chan string
}
func newMockEnqueue(
loopback chan string,
queueName string,
expectCategory string,
opts ...swarm.Option,
) swarm.Broker {
mock := &mockEnqueue{expectCategory: expectCategory, loopback: loopback}
conf := append(opts, swarm.WithService(mock))
return queue.Must(sut.New(queueName, conf...))
}
func (m *mockEnqueue) PutEvents(ctx context.Context, req *eventbridge.PutEventsInput, opts ...func(*eventbridge.Options)) (*eventbridge.PutEventsOutput, error) {
if len(req.Entries) != 1 {
return nil, fmt.Errorf("Bad request")
}
if !strings.HasPrefix(*req.Entries[0].DetailType, m.expectCategory) {
return nil, fmt.Errorf("Bad message category")
}
m.loopback <- aws.ToString(req.Entries[0].Detail)
return &eventbridge.PutEventsOutput{
FailedEntryCount: 0,
}, nil
}
// Mock AWS EventBridge Dequeue
func newMockDequeue(
loopback chan string,
queueName string,
returnCategory string,
returnMessage string,
returnReceipt string,
opts ...swarm.Option,
) swarm.Broker {
mock := &mockLambda{
loopback: loopback,
returnCategory: returnCategory,
returnMessage: returnMessage,
returnReceipt: returnReceipt,
}
conf := append(opts, swarm.WithService(mock))
return queue.Must(sut.New(queueName, conf...))
}
type mockLambda struct {
sut.EventBridge
loopback chan string
returnCategory string
returnMessage string
returnReceipt string
}
func (mock *mockLambda) Start(handler interface{}) {
msg, _ := json.Marshal(
events.CloudWatchEvent{
ID: "abc-def",
DetailType: mock.returnCategory,
Detail: json.RawMessage(mock.returnMessage),
},
)
h := lambda.NewHandler(handler)
_, err := h.Invoke(context.Background(), msg)
if err != nil {
panic(err)
}
mock.loopback <- mock.returnReceipt
}
*/

func TestReader(t *testing.T) {
var bag []swarm.Bag
bridge := &bridge{kernel.NewBridge(100 * time.Millisecond)}
Expand Down
45 changes: 45 additions & 0 deletions broker/eventbridge/examples/dequeue/bytes/eventbridge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
//
// Copyright (C) 2021 - 2024 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
// https://github.com/fogfish/swarm
//

package main

import (
"log/slog"

"github.com/fogfish/swarm"
"github.com/fogfish/swarm/broker/eventbridge"
"github.com/fogfish/swarm/dequeue"
)

func main() {
q, err := eventbridge.NewReader("swarm-example-eventbridge",
eventbridge.WithConfig(
swarm.WithLogStdErr(),
),
)
if err != nil {
slog.Error("eventbridge reader has failed", "err", err)
return
}

//
go actor("user").handle(dequeue.Bytes(q, "User"))
go actor("note").handle(dequeue.Bytes(q, "Note"))
go actor("like").handle(dequeue.Bytes(q, "Like"))

q.Await()
}

type actor string

func (a actor) handle(rcv <-chan swarm.Msg[[]byte], ack chan<- swarm.Msg[[]byte]) {
for msg := range rcv {
slog.Info("Event", "type", a, "msg", string(msg.Object))
ack <- msg
}
}
3 changes: 2 additions & 1 deletion broker/eventbridge/examples/dequeue/event/eventbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func bus(rcv <-chan swarm.Msg[Event], ack chan<- swarm.Msg[Event]) {
}

v, _ := json.MarshalIndent(msg, prefix, " ")
fmt.Printf("common note > \n %s\n", v)
fmt.Printf("event > \n %s\n", v)

ack <- msg
}
}
40 changes: 40 additions & 0 deletions broker/eventbridge/examples/enqueue/bytes/eventbridge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//
// Copyright (C) 2021 - 2024 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
// https://github.com/fogfish/swarm
//

package main

import (
"log/slog"

"github.com/fogfish/swarm"
"github.com/fogfish/swarm/broker/eventbridge"
"github.com/fogfish/swarm/enqueue"
)

func main() {
q, err := eventbridge.NewWriter("swarm-example-eventbridge",
eventbridge.WithConfig(
swarm.WithSource("swarm-example-eventbridge"),
swarm.WithLogStdErr(),
),
)
if err != nil {
slog.Error("eventbridge writer has failed", "err", err)
return
}

user := swarm.LogDeadLetters(enqueue.Bytes(q, "User"))
note := swarm.LogDeadLetters(enqueue.Bytes(q, "Note"))
like := swarm.LogDeadLetters(enqueue.Bytes(q, "Like"))

user <- []byte(`User Signed in`)
note <- []byte(`User wrote note`)
like <- []byte(`User liked note`)

q.Close()
}
14 changes: 13 additions & 1 deletion broker/eventbridge/examples/serverless/eventbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func main() {
},
},
)
//.

//
broker := eventbridge.NewBroker(stack, jsii.String("Broker"), nil)
broker.NewEventBus(nil)

Expand All @@ -53,5 +54,16 @@ func main() {
},
)

broker.NewSink(
&eventbridge.SinkProps{
Source: []string{"swarm-example-eventbridge"},
Categories: []string{"User", "Note", "Like"},
Function: &scud.FunctionGoProps{
SourceCodeModule: "github.com/fogfish/swarm/broker/eventbridge",
SourceCodeLambda: "examples/dequeue/bytes",
},
},
)

app.Synth(nil)
}

0 comments on commit bda64c2

Please sign in to comment.