Skip to content

Commit

Permalink
Merge pull request #141 from noxiouz/tiny_fixes
Browse files Browse the repository at this point in the history
daemon: bugfix in when more than 1 listener is handled
  • Loading branch information
noxiouz authored Feb 13, 2017
2 parents 9b8fbc0 + a9642c8 commit 50802f4
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 26 deletions.
76 changes: 53 additions & 23 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@ import (
)

type Daemon struct {
boxes isolate.Boxes
cfg *config.Config
boxes isolate.Boxes
cfg *config.Config
listeners []net.Listener

muListeners sync.Mutex
}

func New(ctx context.Context, configuration *config.Config) (*Daemon, error) {
checkLimits(ctx)
d := Daemon{
cfg: configuration,
boxes: make(isolate.Boxes),
cfg: configuration,
boxes: make(isolate.Boxes),
listeners: make([]net.Listener, 0),
}

boxTypes := map[string]struct{}{}
Expand Down Expand Up @@ -90,43 +94,52 @@ func (d *Daemon) RegisterHTTPHandlers(ctx context.Context, mux *http.ServeMux) {
}

func (d *Daemon) Serve(ctx context.Context) error {
logger := log.G(ctx)
ctx, cancelFunc := context.WithCancel(context.WithValue(ctx, isolate.BoxesTag, d.boxes))
defer cancelFunc()

var wg sync.WaitGroup
var listeners = make([]net.Listener, 0, len(d.cfg.Endpoints))
for _, endpoint := range d.cfg.Endpoints {
logger.WithField("endpoint", endpoint).Info("start TCP server")
log.G(ctx).WithField("endpoint", endpoint).Info("start TCP server")
ln, err := net.Listen("tcp", endpoint)
if err != nil {
logger.WithError(err).WithField("endpoint", endpoint).Fatal("unable to listen to")
log.G(ctx).WithError(err).WithField("endpoint", endpoint).Error("unable to listen to")
closeListeners(listeners)
return err
}
defer ln.Close()
listeners = append(listeners, ln)
}

return d.ServeOnListeners(ctx, listeners)
}

func (d *Daemon) ServeOnListeners(ctx context.Context, listeners []net.Listener) error {
d.closeListeners()

wg.Add(0)
func(ln net.Listener) {
d.muListeners.Lock()
d.listeners = listeners
d.muListeners.Unlock()

ctx, cancelFunc := context.WithCancel(context.WithValue(ctx, isolate.BoxesTag, d.boxes))
defer cancelFunc()

var wg sync.WaitGroup
for _, ln := range listeners {
wg.Add(1)
go func(ln net.Listener) {
defer wg.Done()
lnLogger := logger.WithField("listener", ln.Addr())
lnLogger := log.G(ctx).WithField("listener", ln.Addr())
for {
conn, err := ln.Accept()
if err != nil {
lnLogger.WithError(err).Error("Accept")
continue
return
}

// TODO: more optimal way
connID := fmt.Sprintf("%.3x", md5.Sum([]byte(fmt.Sprintf("%s.%d", conn.RemoteAddr().String(), time.Now().Unix()))))
lnLogger.WithFields(apexlog.Fields{"remote.addr": conn.RemoteAddr(), "conn.id": connID}).Info("accepted new connection")

connHandler, err := isolate.NewConnectionHandler(context.WithValue(ctx, "conn.id", connID))
if err != nil {
lnLogger.WithError(err).Fatal("unable to create connection handler")
}

go func() {
conns.Inc(0)
defer conns.Dec(0)
connHandler.HandleConn(conn)
isolate.NewConnectionHandler(context.WithValue(ctx, "conn.id", connID)).HandleConn(conn)
}()
}
}(ln)
Expand All @@ -136,8 +149,25 @@ func (d *Daemon) Serve(ctx context.Context) error {
return nil
}

func (d *Daemon) Close() {
func closeListeners(listeners []net.Listener) {
for _, ln := range listeners {
ln.Close()
}
}

func (d *Daemon) closeListeners() {
d.muListeners.Lock()
defer d.muListeners.Unlock()
closeListeners(d.listeners)
}

func (d *Daemon) closeBoxes() {
for _, box := range d.boxes {
box.Close()
}
}

func (d *Daemon) Close() {
d.closeListeners()
d.closeBoxes()
}
6 changes: 3 additions & 3 deletions isolate/conn_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ type ConnectionHandler struct {
}

// NewConnectionHandler creates new ConnectionHandler
func NewConnectionHandler(ctx context.Context) (*ConnectionHandler, error) {
func NewConnectionHandler(ctx context.Context) *ConnectionHandler {
return newConnectionHandler(ctx, newInitialDispatch)
}

func newConnectionHandler(ctx context.Context, newDisp dispatcherInit) (*ConnectionHandler, error) {
func newConnectionHandler(ctx context.Context, newDisp dispatcherInit) *ConnectionHandler {
connID := getID(ctx)
ctx = log.WithLogger(ctx, log.G(ctx).WithField("conn.id", connID))

Expand All @@ -77,7 +77,7 @@ func newConnectionHandler(ctx context.Context, newDisp dispatcherInit) (*Connect
newDispatcher: newDisp,

connID: connID,
}, nil
}
}

func getID(ctx context.Context) string {
Expand Down

0 comments on commit 50802f4

Please sign in to comment.