Skip to content

Commit

Permalink
Split kernel on distinct emitter (writer) and cathode (reader).
Browse files Browse the repository at this point in the history
  • Loading branch information
fogfish committed Sep 27, 2024
1 parent 9d3c7b1 commit 7b3dfea
Show file tree
Hide file tree
Showing 12 changed files with 801 additions and 349 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ require (
github.com/fogfish/golem/pure v0.10.1
github.com/fogfish/guid/v2 v2.0.4
github.com/fogfish/it v1.0.0
github.com/fogfish/it/v2 v2.0.2
github.com/fogfish/logger/v3 v3.1.1
)

require github.com/fogfish/golem/hseq v1.2.0 // indirect
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ github.com/fogfish/it v1.0.0 h1:kiwFHZcrkRLUydZoIoY0gTuMfj38trwvLo0YRyIkeG8=
github.com/fogfish/it v1.0.0/go.mod h1:NQJG4Ygvek85y7zGj0Gny8+6ygAnHjfBORhI7TdQhp4=
github.com/fogfish/it/v2 v2.0.2 h1:UR6yVemf8zD3WVs6Bq0zE6LJwapZ8urv9zvU5VB5E6o=
github.com/fogfish/it/v2 v2.0.2/go.mod h1:HHwufnTaZTvlRVnSesPl49HzzlMrQtweKbf+8Co/ll4=
github.com/fogfish/logger/v3 v3.1.1 h1:awmTNpBWRvSj086H3RWIUnc+FSu9qXHJgBa49wYpNCE=
github.com/fogfish/logger/v3 v3.1.1/go.mod h1:hsucoJz/3OX90UdYrXykcKvjjteBnPcYSTr4Rie0ZqU=
74 changes: 74 additions & 0 deletions kernel/bridge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//
// Copyright (C) 2021 - 2024 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
// https://github.com/fogfish/swarm
//

package kernel

import (
"context"
"fmt"
"time"

"github.com/fogfish/swarm"
)

// Bridge Lambda's main function to [Cathode] interface
type Bridge struct {
timeToFlight time.Duration
inflight map[string]struct{}
session chan error
ch chan []swarm.Bag
}

func NewBridge(timeToFlight time.Duration) *Bridge {
return &Bridge{
ch: make(chan []swarm.Bag),
session: make(chan error),
timeToFlight: timeToFlight,
}
}

// Dispatch the batch of messages in the context of Lambda handler
func (s *Bridge) Dispatch(seq []swarm.Bag) error {
s.inflight = map[string]struct{}{}
for _, bag := range seq {
s.inflight[bag.Ctx.Digest] = struct{}{}
}

s.ch <- seq

select {
case err := <-s.session:
return err
case <-time.After(s.timeToFlight):
return fmt.Errorf("ack timeout")
}
}

func (s *Bridge) Ask(ctx context.Context) ([]swarm.Bag, error) {
select {
case <-ctx.Done():
return nil, nil
case bag := <-s.ch:
return bag, nil
}
}

func (s *Bridge) Ack(ctx context.Context, digest string) error {
delete(s.inflight, digest)
if len(s.inflight) == 0 {
s.session <- nil
}

return nil
}

func (s *Bridge) Err(ctx context.Context, digest string, err error) error {
delete(s.inflight, digest)
s.session <- err
return nil
}
108 changes: 108 additions & 0 deletions kernel/bridge_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
//
// Copyright (C) 2021 - 2024 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
// https://github.com/fogfish/swarm
//

package kernel

import (
"context"
"log/slog"
"testing"
"time"

"github.com/fogfish/it/v2"
"github.com/fogfish/logger/v3"
"github.com/fogfish/swarm"
)

func init() {
slog.SetDefault(
logger.New(
logger.WithSourceShorten(),
logger.WithoutTimestamp(),
logger.WithLogLevel(slog.LevelDebug),
),
)
}

func TestBridge(t *testing.T) {

seq := []swarm.Bag{
{Ctx: &swarm.Context{Category: "test", Digest: "1"}, Object: []byte(`"1"`)},
{Ctx: &swarm.Context{Category: "test", Digest: "2"}, Object: []byte(`"2"`)},
}

t.Run("None", func(t *testing.T) {
k := NewDequeuer(mockSpawner(seq), swarm.Config{PollFrequency: 1 * time.Second})
go k.Await()
k.Close()
})

t.Run("Dequeue.1", func(t *testing.T) {
mock := mockSpawner(seq)
k := NewDequeuer(mock, swarm.Config{PollFrequency: 10 * time.Millisecond})
rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]())
go k.Await()

ack <- <-rcv
ack <- <-rcv

k.Close()

it.Then(t).Should(
it.Seq(mock.ack).Equal(`1`, `2`),
)
})

t.Run("Timeout", func(t *testing.T) {
mock := mockSpawner(seq)
k := NewDequeuer(mock, swarm.Config{PollFrequency: 0 * time.Millisecond})
rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]())
go k.Await()

ack <- <-rcv
<-rcv
time.Sleep(1 * time.Millisecond)

k.Close()
time.Sleep(1 * time.Millisecond)

it.Then(t).ShouldNot(
it.Nil(mock.err),
)
})

}

//------------------------------------------------------------------------------

type spawner struct {
*Bridge
seq []swarm.Bag
ack []string
err error
}

func mockSpawner(seq []swarm.Bag) *spawner {
return &spawner{
Bridge: NewBridge(100 * time.Millisecond),
seq: seq,
}
}

func (s *spawner) Ack(ctx context.Context, digest string) error {
if err := s.Bridge.Ack(ctx, digest); err != nil {
return err
}

s.ack = append(s.ack, digest)
return nil
}

func (s *spawner) Run() {
s.err = s.Bridge.Dispatch(s.seq)
}
191 changes: 191 additions & 0 deletions kernel/cathode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
//
// Copyright (C) 2021 - 2024 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
// https://github.com/fogfish/swarm
//

package kernel

import (
"context"
"log/slog"
"sync"
"time"

"github.com/fogfish/swarm"
)

type Cathode interface {
Ack(ctx context.Context, digest string) error
Err(ctx context.Context, digest string, err error) error
Ask(ctx context.Context) ([]swarm.Bag, error)
}

// Decode message from wire format
type Decoder[T any] interface{ Decode([]byte) (T, error) }

type Dequeuer struct {
sync.WaitGroup
sync.RWMutex

// Control-plane stop channel used by go routines to stop I/O on data channels
context context.Context
cancel context.CancelFunc

// Kernel configuration
Config swarm.Config

// event router, binds category with destination channel
router map[string]interface{ Route(swarm.Bag) error }

// Cathode is the reader port on message broker
Cathode Cathode
}

func NewDequeuer(cathode Cathode, config swarm.Config) *Dequeuer {
ctx, can := context.WithCancel(context.Background())

return &Dequeuer{
Config: config,
context: ctx,
cancel: can,
router: map[string]interface{ Route(swarm.Bag) error }{},
Cathode: cathode,
}
}

// Close enqueuer
func (k *Dequeuer) Close() {
k.cancel()
k.WaitGroup.Wait()
}

func (k *Dequeuer) Await() {
if spawner, ok := k.Cathode.(interface{ Run() }); ok {
go spawner.Run()
}

k.receive()
<-k.context.Done()
k.WaitGroup.Wait()
}

// internal infinite receive loop.
// waiting for message from event buses and queues and schedules it for delivery.
func (k *Dequeuer) receive() {
asker := func() {
seq, err := k.Cathode.Ask(k.context)
if k.Config.StdErr != nil && err != nil {
k.Config.StdErr <- err
return
}

for i := 0; i < len(seq); i++ {
bag := seq[i]

k.RWMutex.RLock()
r, has := k.router[bag.Ctx.Category]
k.RWMutex.RUnlock()

if has {
err := r.Route(bag)
if k.Config.StdErr != nil && err != nil {
k.Config.StdErr <- err
return
}
}
}
}

k.WaitGroup.Add(1)
go func() {
slog.Debug("kernel dequeue loop started")

exit:
for {
select {
case <-k.context.Done():
break exit
default:
}

select {
case <-k.context.Done():
break exit
case <-time.After(k.Config.PollFrequency):
asker()
}
}

k.WaitGroup.Done()
slog.Debug("kernel dequeue loop stopped")
}()
}

// Dequeue creates pair of channels within kernel to enqueue messages
func Dequeue[T any](k *Dequeuer, cat string, codec Decoder[T]) ( /*rcv*/ <-chan swarm.Msg[T] /*ack*/, chan<- swarm.Msg[T]) {
rcv := make(chan swarm.Msg[T], k.Config.CapRcv)
ack := make(chan swarm.Msg[T], k.Config.CapAck)

k.RWMutex.Lock()
k.router[cat] = router[T]{ch: rcv, codec: codec}
k.RWMutex.Unlock()

// emitter routine
acks := func(msg swarm.Msg[T]) {
if msg.Ctx.Error == nil {
err := k.Cathode.Ack(k.context, msg.Ctx.Digest)
if k.Config.StdErr != nil && err != nil {
k.Config.StdErr <- err
}
} else {
err := k.Cathode.Err(k.context, msg.Ctx.Digest, msg.Ctx.Error)
if k.Config.StdErr != nil && err != nil {
k.Config.StdErr <- err
}
}
}

k.WaitGroup.Add(1)
go func() {
slog.Debug("kernel dequeue started", "cat", cat)

exit:
for {
// The try-receive operation here is to
// try to exit the sender goroutine as
// early as possible. Try-receive and
// try-send select blocks are specially
// optimized by the standard Go
// compiler, so they are very efficient.
select {
case <-k.context.Done():
break exit
default:
}

select {
case <-k.context.Done():
break exit
case msg := <-ack:
acks(msg)
}
}

backlog := len(ack)
close(ack)

if backlog != 0 {
for msg := range ack {
acks(msg)
}
}

k.WaitGroup.Done()
slog.Debug("kernel dequeue stopped", "cat", cat)
}()

return rcv, ack
}
Loading

0 comments on commit 7b3dfea

Please sign in to comment.