Skip to content

Commit

Permalink
mailbox: use prefixed logger
Browse files Browse the repository at this point in the history
  • Loading branch information
ellemouton committed Nov 22, 2023
1 parent b2f5f14 commit 22ef935
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 55 deletions.
14 changes: 9 additions & 5 deletions mailbox/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net"
"sync"

"github.com/btcsuite/btclog"
"github.com/lightninglabs/lightning-node-connect/hashmailrpc"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -38,6 +39,8 @@ type Client struct {
sid [64]byte

ctx context.Context //nolint:containedctx

log btclog.Logger
}

// NewClient creates a new Client object which will handle the mailbox
Expand All @@ -56,6 +59,7 @@ func NewClient(ctx context.Context, serverHost string, connData *ConnData,
connData: connData,
status: ClientStatusNotConnected,
sid: sid,
log: newPrefixedLogger(false),
}

// Apply functional options.
Expand Down Expand Up @@ -98,12 +102,12 @@ func (c *Client) Dial(_ context.Context, _ string) (net.Conn, error) {
// If there is currently an active connection, block here until the
// previous connection as been closed.
if c.mailboxConn != nil {
log.Debugf("Dial: have existing mailbox connection, waiting")
c.log.Debugf("Dial: have existing mailbox connection, waiting")
<-c.mailboxConn.Done()
log.Debugf("Dial: done with existing conn")
c.log.Debugf("Dial: done with existing conn")
}

log.Debugf("Client: Dialing...")
c.log.Debugf("Dialing...")

sid, err := c.connData.SID()
if err != nil {
Expand All @@ -115,7 +119,7 @@ func (c *Client) Dial(_ context.Context, _ string) (net.Conn, error) {
if !bytes.Equal(c.sid[:], sid[:]) && c.mailboxConn != nil {
err := c.mailboxConn.Close()
if err != nil {
log.Errorf("could not close mailbox conn: %v", err)
c.log.Errorf("Could not close mailbox conn: %v", err)
}

c.mailboxConn = nil
Expand All @@ -126,7 +130,7 @@ func (c *Client) Dial(_ context.Context, _ string) (net.Conn, error) {
if c.mailboxConn == nil {
mailboxConn, err := NewClientConn(
c.ctx, c.sid, c.serverHost, c.grpcClient,
func(status ClientStatus) {
c.log, func(status ClientStatus) {
c.statusMu.Lock()
c.status = status
c.statusMu.Unlock()
Expand Down
47 changes: 30 additions & 17 deletions mailbox/client_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/btcsuite/btclog"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/lightninglabs/lightning-node-connect/gbn"
"github.com/lightninglabs/lightning-node-connect/hashmailrpc"
Expand Down Expand Up @@ -124,13 +125,15 @@ type ClientConn struct {
quit chan struct{}
cancel func()
closeOnce sync.Once

log btclog.Logger
}

// NewClientConn creates a new client connection with the given receive and send
// session identifiers. The context given as the first parameter will be used
// throughout the connection lifetime.
func NewClientConn(ctx context.Context, sid [64]byte, serverHost string,
client hashmailrpc.HashMailClient,
client hashmailrpc.HashMailClient, logger btclog.Logger,
onNewStatus func(status ClientStatus)) (*ClientConn, error) {

receiveSID := GetSID(sid, true)
Expand All @@ -141,7 +144,7 @@ func NewClientConn(ctx context.Context, sid [64]byte, serverHost string,
sendSID: sendSID[:],
}

log.Debugf("New client conn, read_stream=%x, write_stream=%x",
logger.Debugf("New client conn, read_stream=%x, write_stream=%x",
receiveSID[:], sendSID[:])

ctxc, cancel := context.WithCancel(ctx)
Expand All @@ -166,6 +169,7 @@ func NewClientConn(ctx context.Context, sid [64]byte, serverHost string,
onNewStatus: onNewStatus,
quit: make(chan struct{}),
cancel: cancel,
log: logger,
}
c.connKit = &connKit{
ctx: ctxc,
Expand Down Expand Up @@ -201,10 +205,11 @@ func RefreshClientConn(ctx context.Context, c *ClientConn) (*ClientConn,
c.statusMu.Lock()
defer c.statusMu.Unlock()

log.Debugf("Refreshing client conn, read_stream=%x, write_stream=%x",
c.log.Debugf("Refreshing client conn, read_stream=%x, write_stream=%x",
c.receiveSID[:], c.sendSID[:])

cc := &ClientConn{
log: c.log,
transport: c.transport.Refresh(),
status: ClientStatusNotConnected,
onNewStatus: c.onNewStatus,
Expand Down Expand Up @@ -299,7 +304,7 @@ func (c *ClientConn) recv(ctx context.Context) ([]byte, error) {
return nil, err
}

log.Debugf("Client: got failure on receive "+
c.log.Debugf("Client: got failure on receive "+
"socket/stream, re-trying: %v", err)

c.setStatus(errStatus)
Expand Down Expand Up @@ -343,8 +348,8 @@ func (c *ClientConn) send(ctx context.Context, payload []byte) error {
return err
}

log.Debugf("Client: got failure on send "+
"socket/stream, re-trying: %v", err)
c.log.Debugf("Got failure on send socket/stream, "+
"re-trying: %v", err)

c.setStatus(errStatus)
c.createSendMailBox(ctx, retryWait)
Expand Down Expand Up @@ -377,13 +382,14 @@ func (c *ClientConn) createReceiveMailBox(ctx context.Context,
waiter.Wait()

if err := c.transport.ConnectReceive(ctx); err != nil {
log.Errorf("Client: error connecting to receive "+
c.log.Errorf("Error connecting to receive "+
"socket/stream: %v", err)

continue
}

log.Debugf("Client: receive mailbox initialized")
c.log.Debugf("Receive mailbox initialized")

return
}
}
Expand All @@ -406,14 +412,17 @@ func (c *ClientConn) createSendMailBox(ctx context.Context,

waiter.Wait()

log.Debugf("Client: Attempting to create send socket/stream")
c.log.Debugf("Attempting to create send socket/stream")

if err := c.transport.ConnectSend(ctx); err != nil {
log.Debugf("Client: error connecting to send "+
c.log.Debugf("Error connecting to send "+
"stream/socket %v", err)

continue
}

log.Debugf("Client: Connected to send socket/stream")
c.log.Debugf("Connected to send socket/stream")

return
}
}
Expand Down Expand Up @@ -460,29 +469,33 @@ func (c *ClientConn) SetSendTimeout(timeout time.Duration) {
func (c *ClientConn) Close() error {
var returnErr error
c.closeOnce.Do(func() {
log.Debugf("Closing client connection")
c.log.Debugf("Closing connection")

if c.gbnConn != nil {
if err := c.gbnConn.Close(); err != nil {
log.Debugf("Error closing gbn connection: %v",
c.log.Debugf("Error closing gbn connection: %v",
err)

returnErr = err
}
}

c.receiveMu.Lock()
log.Debugf("closing receive stream/socket")
c.log.Debugf("Closing receive stream/socket")
if err := c.transport.CloseReceive(); err != nil {
log.Errorf("Error closing receive stream/socket: %v", err)
c.log.Errorf("Error closing receive stream/socket: %v",
err)

returnErr = err
}
c.receiveMu.Unlock()

c.sendMu.Lock()
log.Debugf("closing send stream/socket")
c.log.Debugf("Closing send stream/socket")
if err := c.transport.CloseSend(); err != nil {
log.Errorf("Error closing send stream/socket: %v", err)
c.log.Errorf("Error closing send stream/socket: %v",
err)

returnErr = err
}
c.sendMu.Unlock()
Expand Down
13 changes: 13 additions & 0 deletions mailbox/log.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package mailbox

import (
"fmt"

"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/build"
"google.golang.org/grpc/grpclog"
Expand Down Expand Up @@ -29,6 +31,17 @@ func UseLogger(logger btclog.Logger) {
log = logger
}

// nePrefixedLogger constructs a new prefixed logger.
func newPrefixedLogger(isServer bool) *build.PrefixLog {
identifier := "client"
if isServer {
identifier = "server"
}
prefix := fmt.Sprintf("(%s)", identifier)

return build.NewPrefixLog(prefix, log)
}

// GrpcLogLogger is a wrapper around a btclog logger to make it compatible with
// the grpclog logger package. By default we downgrade the info level to debug
// to reduce the verbosity of the logger.
Expand Down
19 changes: 13 additions & 6 deletions mailbox/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"net"

"github.com/btcsuite/btclog"
"github.com/lightninglabs/lightning-node-connect/hashmailrpc"
"google.golang.org/grpc"
)
Expand All @@ -30,6 +31,8 @@ type Server struct {

quit chan struct{}
cancel func()

log btclog.Logger
}

func NewServer(serverHost string, connData *ConnData,
Expand All @@ -55,6 +58,7 @@ func NewServer(serverHost string, connData *ConnData,
connData: connData,
sid: sid,
onNewStatus: onNewStatus,
log: newPrefixedLogger(true),
quit: make(chan struct{}),
}

Expand All @@ -79,12 +83,14 @@ func (s *Server) Accept() (net.Conn, error) {
// If there is currently an active connection, block here until the
// previous connection as been closed.
if s.mailboxConn != nil {
log.Debugf("Accept: have existing mailbox connection, waiting")
s.log.Debugf("Accept: have existing mailbox connection, " +
"waiting")

select {
case <-s.quit:
return nil, io.EOF
case <-s.mailboxConn.Done():
log.Debugf("Accept: done with existing conn")
s.log.Debugf("Accept: done with existing conn")
}
}

Expand All @@ -98,7 +104,7 @@ func (s *Server) Accept() (net.Conn, error) {
if !bytes.Equal(s.sid[:], sid[:]) && s.mailboxConn != nil {
err := s.mailboxConn.Stop()
if err != nil {
log.Errorf("could not close mailbox conn: %v", err)
s.log.Errorf("Could not close mailbox conn: %v", err)
}

s.mailboxConn = nil
Expand All @@ -110,7 +116,8 @@ func (s *Server) Accept() (net.Conn, error) {
// otherwise, we just refresh the ServerConn.
if s.mailboxConn == nil {
mailboxConn, err := NewServerConn(
s.ctx, s.serverHost, s.client, sid, s.onNewStatus,
s.ctx, s.serverHost, s.client, sid, s.log,
s.onNewStatus,
)
if err != nil {
return nil, &temporaryError{err}
Expand Down Expand Up @@ -143,13 +150,13 @@ func (e *temporaryError) Temporary() bool {
}

func (s *Server) Close() error {
log.Debugf("conn being closed")
s.log.Debugf("Conn being closed")

close(s.quit)

if s.mailboxConn != nil {
if err := s.mailboxConn.Stop(); err != nil {
log.Errorf("error closing mailboxConn %v", err)
s.log.Errorf("Error closing mailboxConn %v", err)
}
}
s.cancel()
Expand Down
Loading

0 comments on commit 22ef935

Please sign in to comment.