Skip to content

Commit

Permalink
Rollback inflight sync routine (#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
fogfish authored Mar 17, 2024
1 parent 06bb611 commit 0efe5b8
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 10 deletions.
25 changes: 23 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,8 @@ go get -u github.com/fogfish/swarm
- [Generic events](#generic-events)
- [Error handling](#error-handling)
- [Fail Fast](#fail-fast)
- [Internal channel architecture](#internal-channel-architecture)
- [Serverless](#serverless)
- [Supported queuing system and event brokers](#supported-queuing-system-and-event-brokers)
- [Race condition in Serverless](#race-condition-in-serverless)

### Produce (enqueue) messages

Expand Down Expand Up @@ -423,6 +422,28 @@ stack.NewSink(
```


### Race condition in Serverless

In serverless environment doing dequeue and enqueue might cause a raise condition. The dequeue loop might finish earlier than other messages emitted.

```go
rcv, ack := queue.Dequeue[/* .. */](broker1)
snd, dlq := queue.Enqueue[/* .. */](broker2)

for msg := range rcv {
snd <- // ...

// The ack would cause sleep of function in serverless.
// snd channel might not be flushed before function sleep.
// The library does not provide yet ultimate solution.
ack <- msg
}
```

Use one of the following techniques to overcome the issue
1. Add sleep before ack
2. Use sync version of sender

## How To Contribute

The library is [Apache Version 2.0](LICENSE) licensed and accepts contributions via GitHub pull requests:
Expand Down
10 changes: 2 additions & 8 deletions internal/kernel/kernel.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ type Kernel struct {
// The flag indicates if Await loop is started
waiting bool

// In-flight waiting group
inflight sync.WaitGroup

// On the wire protocol emitter (writer) and cathode (receiver)
Emitter Emitter
Cathode Cathode
Expand All @@ -78,7 +75,8 @@ func New(emitter Emitter, cathode Cathode, config swarm.Config) *Kernel {
mainStop: make(chan struct{}, 1), // MUST BE buffered
ctrlStop: make(chan struct{}),

router: map[string]interface{ Route(swarm.Bag) error }{},
router: map[string]interface{ Route(swarm.Bag) error }{},

Emitter: emitter,
Cathode: cathode,
}
Expand Down Expand Up @@ -215,8 +213,6 @@ exit:
close(k.ctrlAcks)
k.ctrlAcks = nil

k.inflight.Wait()

return err
}

Expand Down Expand Up @@ -270,9 +266,7 @@ func Enqueue[T any](k *Kernel, cat string, codec Codec[T]) ( /*snd*/ chan<- T /*
case <-k.ctrlStop:
break exit
case obj := <-snd:
k.inflight.Add(1)
emit(obj)
k.inflight.Done()
}
}

Expand Down

0 comments on commit 0efe5b8

Please sign in to comment.