Skip to content

Commit

Permalink
improve documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
fogfish committed Sep 28, 2024
1 parent ed446a3 commit 0cf37a3
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 3 deletions.
12 changes: 11 additions & 1 deletion kernel/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,14 @@ func NewBridge(timeToFlight time.Duration) *Bridge {
}
}

// Dispatch the batch of messages in the context of Lambda handler
// Dispatch the batch of messages in the context of Lambda handler.
//
// lambda.Start(
// func(evt events.CloudWatchEvent) error {
// ...
// bridge.Dispatch(bag)
// }
// )
func (s *Bridge) Dispatch(seq []swarm.Bag) error {
s.inflight = map[string]struct{}{}
for _, bag := range seq {
Expand All @@ -49,6 +56,7 @@ func (s *Bridge) Dispatch(seq []swarm.Bag) error {
}
}

// Ask converts input of Lambda handler to the context of the kernel
func (s *Bridge) Ask(ctx context.Context) ([]swarm.Bag, error) {
select {
case <-ctx.Done():
Expand All @@ -58,6 +66,7 @@ func (s *Bridge) Ask(ctx context.Context) ([]swarm.Bag, error) {
}
}

// Acknowledge processed message, allowing lambda handler progress
func (s *Bridge) Ack(ctx context.Context, digest string) error {
delete(s.inflight, digest)
if len(s.inflight) == 0 {
Expand All @@ -67,6 +76,7 @@ func (s *Bridge) Ack(ctx context.Context, digest string) error {
return nil
}

// Acknowledge error, allowing lambda handler progress
func (s *Bridge) Err(ctx context.Context, digest string, err error) error {
delete(s.inflight, digest)
s.session <- err
Expand Down
5 changes: 4 additions & 1 deletion kernel/cathode.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/fogfish/swarm"
)

// Cathode defines on-the-wire protocol for [swarm.Bag], covering the ingress.
type Cathode interface {
Ack(ctx context.Context, digest string) error
Err(ctx context.Context, digest string, err error) error
Expand Down Expand Up @@ -48,6 +49,7 @@ type Dequeuer struct {
Cathode Cathode
}

// Creates instance of broker reader
func NewDequeuer(cathode Cathode, config swarm.Config) *Dequeuer {
ctx, can := context.WithCancel(context.Background())

Expand All @@ -60,12 +62,13 @@ func NewDequeuer(cathode Cathode, config swarm.Config) *Dequeuer {
}
}

// Close enqueuer
// Closes broker reader, gracefully shutdowns all I/O
func (k *Dequeuer) Close() {
k.cancel()
k.WaitGroup.Wait()
}

// Await reader to complete
func (k *Dequeuer) Await() {
if spawner, ok := k.Cathode.(interface{ Run() }); ok {
go spawner.Run()
Expand Down
3 changes: 2 additions & 1 deletion kernel/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/fogfish/swarm"
)

// Emitter defines on-the-wire protocol for [swarm.Bag]
// Emitter defines on-the-wire protocol for [swarm.Bag], covering egress.
type Emitter interface {
Enq(context.Context, swarm.Bag) error
}
Expand All @@ -38,6 +38,7 @@ type Enqueuer struct {
Emitter Emitter
}

// Creates instance of broker writer
func NewEnqueuer(emitter Emitter, config swarm.Config) *Enqueuer {
ctx, can := context.WithCancel(context.Background())

Expand Down

0 comments on commit 0cf37a3

Please sign in to comment.