Skip to content
This repository has been archived by the owner on Apr 28, 2024. It is now read-only.

Commit

Permalink
v0.4.3
Browse files Browse the repository at this point in the history
Added:
- Hooks for lifecycle events
Fixed:
- Close errors channel
- Small fixes
  • Loading branch information
Alexander Kiryukhin committed Apr 4, 2019
1 parent 04db7b6 commit c691d42
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 28 deletions.
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,27 @@ err := <- r.Errors()

Disabled by default. Use `r.With(rutina.WithErrChan())` to turn on.

## Events and hooks

Rutina has own simple lifecycle events system. You can subscribe your hooks on any of this events:

* `EventRoutineStart` - Fires when starts new routine
* `EventRoutineStop` - Fires when routine stopped with any result
* `EventRoutineComplete` - Fires when routine stopped without errors
* `EventRoutineFail` - Fires when routine stopped with error
* `EventAppStop` - Fires when all routines stopped with any result
* `EventAppComplete` - Fires when all routines stopped with no errors
* `EventAppFail` - Fires when all routines stopped with error

Example:

```go
r.RegisterHook(rutina.EventRoutineStart, func(ev rutina.Event, rid int) error {
log.Println("Started routine with ID", rid)
return nil
})
```

## Mixins

### Usage
Expand Down
29 changes: 29 additions & 0 deletions event_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//go:generate stringer -type=Event
package rutina

// Event represents lifecycle events
type Event int

const (
EventRoutineStart Event = iota
EventRoutineStop
EventRoutineComplete
EventRoutineFail
EventAppStop
EventAppComplete
EventAppFail
)

// Hook is function that calls when event fired
// Params:
// ev Event - fired event
// r *Rutina - pointer to rutina
// rid int - ID of routine if present, 0 - otherwise
type Hook func(ev Event, r *Rutina, rid int) error
2 changes: 1 addition & 1 deletion example/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func main() {
// New instance with builtin context
r := rutina.New()

r = r.With(rutina.WithErrChan())
r = r.With(rutina.WithErrChan(), rutina.WithStdLogger())

r.Go(func(ctx context.Context) error {
<-time.After(1 * time.Second)
Expand Down
2 changes: 1 addition & 1 deletion mixins.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func WithLogger(logger *log.Logger) *MixinLogger {

// WithStdLogger adds standard logger to rutina
func WithStdLogger() *MixinLogger {
return &MixinLogger{Logger: log.New(os.Stdout, "rutina", log.LstdFlags)}
return &MixinLogger{Logger: log.New(os.Stdout, "[rutina]", log.LstdFlags)}
}

func (o MixinLogger) apply(r *Rutina) {
Expand Down
76 changes: 50 additions & 26 deletions rutina.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,23 @@ import (

//Rutina is routine manager
type Rutina struct {
ctx context.Context // State of application (started/stopped)
Cancel func() // Cancel func that stops all routines
wg sync.WaitGroup // WaitGroup that wait all routines to complete
o sync.Once // Flag that prevents overwrite first error that shutdowns all routines
err error // First error that shutdowns all routines
logger *log.Logger // Optional logger
counter *uint64 // Optional counter that names routines with increment ids for debug purposes at logger
errCh chan error // Optional channel for errors when RestartIfFail and DoNothingIfFail
ctx context.Context // State of application (started/stopped)
Cancel func() // Cancel func that stops all routines
wg sync.WaitGroup // WaitGroup that wait all routines to complete
onceErr sync.Once // Flag that prevents overwrite first error that shutdowns all routines
onceWait sync.Once // Flag that prevents wait already waited rutina
err error // First error that shutdowns all routines
logger *log.Logger // Optional logger
counter *uint64 // Optional counter that names routines with increment ids for debug purposes at logger
errCh chan error // Optional channel for errors when RestartIfFail and DoNothingIfFail
hooks map[Event][]Hook // Lifecycle hooks
}

// New instance with builtin context
func New(mixins ...Mixin) *Rutina {
ctx, cancel := context.WithCancel(context.Background())
var counter uint64
r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, errCh: nil}
r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, errCh: nil, hooks: map[Event][]Hook{}}
return r.With(mixins...)
}

Expand All @@ -38,8 +40,16 @@ func (r *Rutina) With(mixins ...Mixin) *Rutina {
return r
}

func (r *Rutina) RegisterHook(ev Event, hook Hook) {
r.hooks[ev] = append(r.hooks[ev], hook)
}

// Go routine
func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) {
// Check that context is not canceled yet
if r.ctx.Err() != nil {
return
}
onFail := ShutdownIfFail
for _, o := range opts {
switch o {
Expand All @@ -66,49 +76,39 @@ func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) {
r.wg.Add(1)
go func() {
defer r.wg.Done()
// Check that context is not canceled yet
if r.ctx.Err() != nil {
return
}
id := atomic.AddUint64(r.counter, 1)
r.log("starting #%d", id)
r.fire(EventRoutineStart, int(id))
if err := doer(r.ctx); err != nil {
r.fire(EventRoutineFail, int(id))
r.fire(EventRoutineStop, int(id))
// errors history
if r.errCh != nil {
r.errCh <- err
}
// region routine failed
r.log("error at #%d : %v", id, err)
switch onFail {
case ShutdownIfFail:
r.log("stopping #%d", id)
// Save error only if shutdown all routines
r.o.Do(func() {
r.onceErr.Do(func() {
r.err = err
})
r.Cancel()
case RestartIfFail:
r.log("restarting #%d", id)
r.Go(doer, opts...)
case DoNothingIfFail:
r.log("stopping #%d", id)
}
// endregion
} else {
r.fire(EventRoutineComplete, int(id))
r.fire(EventRoutineStop, int(id))
// region routine successfully done
switch onDone {
case ShutdownIfDone:
r.log("stopping #%d with shutdown", id)
r.Cancel()
case RestartIfDone:
r.log("restarting #%d", id)
r.Go(doer, opts...)
case DoNothingIfDone:
r.log("stopping #%d", id)
}
// endregion
}

}()
}

Expand Down Expand Up @@ -138,10 +138,34 @@ func (r *Rutina) ListenOsSignals(signals ...os.Signal) {

// Wait all routines and returns first error or nil if all routines completes without errors
func (r *Rutina) Wait() error {
r.wg.Wait()
r.onceWait.Do(func() {
r.wg.Wait()
r.fire(EventAppStop, 0)
if r.err == nil {
r.fire(EventAppComplete, 0)
} else {
r.fire(EventAppFail, 0)
}
if r.errCh != nil {
close(r.errCh)
}
})
return r.err
}

func (r *Rutina) fire(ev Event, rid int) {
r.log("Event = %s Routine ID = %d", ev.String(), rid)
if hooks, ok := r.hooks[ev]; ok == true {
for _, h := range hooks {
if err := h(ev, r, rid); err != nil {
if r.errCh != nil {
r.errCh <- err
}
}
}
}
}

// Log if can
func (r *Rutina) log(format string, args ...interface{}) {
if r.logger != nil {
Expand Down

0 comments on commit c691d42

Please sign in to comment.