diff --git a/mailbox/client.go b/mailbox/client.go index cb37ed1..14ec95a 100644 --- a/mailbox/client.go +++ b/mailbox/client.go @@ -7,6 +7,7 @@ import ( "net" "sync" + "github.com/btcsuite/btclog" "github.com/lightninglabs/lightning-node-connect/hashmailrpc" "google.golang.org/grpc" ) @@ -38,6 +39,8 @@ type Client struct { sid [64]byte ctx context.Context //nolint:containedctx + + log btclog.Logger } // NewClient creates a new Client object which will handle the mailbox @@ -56,6 +59,7 @@ func NewClient(ctx context.Context, serverHost string, connData *ConnData, connData: connData, status: ClientStatusNotConnected, sid: sid, + log: newPrefixedLogger(false), } // Apply functional options. @@ -98,12 +102,12 @@ func (c *Client) Dial(_ context.Context, _ string) (net.Conn, error) { // If there is currently an active connection, block here until the // previous connection as been closed. if c.mailboxConn != nil { - log.Debugf("Dial: have existing mailbox connection, waiting") + c.log.Debugf("Dial: have existing mailbox connection, waiting") <-c.mailboxConn.Done() - log.Debugf("Dial: done with existing conn") + c.log.Debugf("Dial: done with existing conn") } - log.Debugf("Client: Dialing...") + c.log.Debugf("Dialing...") sid, err := c.connData.SID() if err != nil { @@ -115,7 +119,7 @@ func (c *Client) Dial(_ context.Context, _ string) (net.Conn, error) { if !bytes.Equal(c.sid[:], sid[:]) && c.mailboxConn != nil { err := c.mailboxConn.Close() if err != nil { - log.Errorf("could not close mailbox conn: %v", err) + c.log.Errorf("Could not close mailbox conn: %v", err) } c.mailboxConn = nil @@ -126,7 +130,7 @@ func (c *Client) Dial(_ context.Context, _ string) (net.Conn, error) { if c.mailboxConn == nil { mailboxConn, err := NewClientConn( c.ctx, c.sid, c.serverHost, c.grpcClient, - func(status ClientStatus) { + c.log, func(status ClientStatus) { c.statusMu.Lock() c.status = status c.statusMu.Unlock() diff --git a/mailbox/client_conn.go b/mailbox/client_conn.go index b642834..4edd9fc 100644 --- a/mailbox/client_conn.go +++ b/mailbox/client_conn.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/btcsuite/btclog" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/lightninglabs/lightning-node-connect/gbn" "github.com/lightninglabs/lightning-node-connect/hashmailrpc" @@ -124,13 +125,15 @@ type ClientConn struct { quit chan struct{} cancel func() closeOnce sync.Once + + log btclog.Logger } // NewClientConn creates a new client connection with the given receive and send // session identifiers. The context given as the first parameter will be used // throughout the connection lifetime. func NewClientConn(ctx context.Context, sid [64]byte, serverHost string, - client hashmailrpc.HashMailClient, + client hashmailrpc.HashMailClient, logger btclog.Logger, onNewStatus func(status ClientStatus)) (*ClientConn, error) { receiveSID := GetSID(sid, true) @@ -141,7 +144,7 @@ func NewClientConn(ctx context.Context, sid [64]byte, serverHost string, sendSID: sendSID[:], } - log.Debugf("New client conn, read_stream=%x, write_stream=%x", + logger.Debugf("New client conn, read_stream=%x, write_stream=%x", receiveSID[:], sendSID[:]) ctxc, cancel := context.WithCancel(ctx) @@ -166,6 +169,7 @@ func NewClientConn(ctx context.Context, sid [64]byte, serverHost string, onNewStatus: onNewStatus, quit: make(chan struct{}), cancel: cancel, + log: logger, } c.connKit = &connKit{ ctx: ctxc, @@ -201,10 +205,11 @@ func RefreshClientConn(ctx context.Context, c *ClientConn) (*ClientConn, c.statusMu.Lock() defer c.statusMu.Unlock() - log.Debugf("Refreshing client conn, read_stream=%x, write_stream=%x", + c.log.Debugf("Refreshing client conn, read_stream=%x, write_stream=%x", c.receiveSID[:], c.sendSID[:]) cc := &ClientConn{ + log: c.log, transport: c.transport.Refresh(), status: ClientStatusNotConnected, onNewStatus: c.onNewStatus, @@ -299,7 +304,7 @@ func (c *ClientConn) recv(ctx context.Context) ([]byte, error) { return nil, err } - log.Debugf("Client: got failure on receive "+ + c.log.Debugf("Client: got failure on receive "+ "socket/stream, re-trying: %v", err) c.setStatus(errStatus) @@ -343,8 +348,8 @@ func (c *ClientConn) send(ctx context.Context, payload []byte) error { return err } - log.Debugf("Client: got failure on send "+ - "socket/stream, re-trying: %v", err) + c.log.Debugf("Got failure on send socket/stream, "+ + "re-trying: %v", err) c.setStatus(errStatus) c.createSendMailBox(ctx, retryWait) @@ -377,13 +382,14 @@ func (c *ClientConn) createReceiveMailBox(ctx context.Context, waiter.Wait() if err := c.transport.ConnectReceive(ctx); err != nil { - log.Errorf("Client: error connecting to receive "+ + c.log.Errorf("Error connecting to receive "+ "socket/stream: %v", err) continue } - log.Debugf("Client: receive mailbox initialized") + c.log.Debugf("Receive mailbox initialized") + return } } @@ -406,14 +412,17 @@ func (c *ClientConn) createSendMailBox(ctx context.Context, waiter.Wait() - log.Debugf("Client: Attempting to create send socket/stream") + c.log.Debugf("Attempting to create send socket/stream") + if err := c.transport.ConnectSend(ctx); err != nil { - log.Debugf("Client: error connecting to send "+ + c.log.Debugf("Error connecting to send "+ "stream/socket %v", err) + continue } - log.Debugf("Client: Connected to send socket/stream") + c.log.Debugf("Connected to send socket/stream") + return } } @@ -460,11 +469,11 @@ func (c *ClientConn) SetSendTimeout(timeout time.Duration) { func (c *ClientConn) Close() error { var returnErr error c.closeOnce.Do(func() { - log.Debugf("Closing client connection") + c.log.Debugf("Closing connection") if c.gbnConn != nil { if err := c.gbnConn.Close(); err != nil { - log.Debugf("Error closing gbn connection: %v", + c.log.Debugf("Error closing gbn connection: %v", err) returnErr = err @@ -472,17 +481,21 @@ func (c *ClientConn) Close() error { } c.receiveMu.Lock() - log.Debugf("closing receive stream/socket") + c.log.Debugf("Closing receive stream/socket") if err := c.transport.CloseReceive(); err != nil { - log.Errorf("Error closing receive stream/socket: %v", err) + c.log.Errorf("Error closing receive stream/socket: %v", + err) + returnErr = err } c.receiveMu.Unlock() c.sendMu.Lock() - log.Debugf("closing send stream/socket") + c.log.Debugf("Closing send stream/socket") if err := c.transport.CloseSend(); err != nil { - log.Errorf("Error closing send stream/socket: %v", err) + c.log.Errorf("Error closing send stream/socket: %v", + err) + returnErr = err } c.sendMu.Unlock() diff --git a/mailbox/log.go b/mailbox/log.go index 13cbb5f..98f8a20 100644 --- a/mailbox/log.go +++ b/mailbox/log.go @@ -1,6 +1,8 @@ package mailbox import ( + "fmt" + "github.com/btcsuite/btclog" "github.com/lightningnetwork/lnd/build" "google.golang.org/grpc/grpclog" @@ -29,6 +31,17 @@ func UseLogger(logger btclog.Logger) { log = logger } +// nePrefixedLogger constructs a new prefixed logger. +func newPrefixedLogger(isServer bool) *build.PrefixLog { + identifier := "client" + if isServer { + identifier = "server" + } + prefix := fmt.Sprintf("(%s)", identifier) + + return build.NewPrefixLog(prefix, log) +} + // GrpcLogLogger is a wrapper around a btclog logger to make it compatible with // the grpclog logger package. By default we downgrade the info level to debug // to reduce the verbosity of the logger. diff --git a/mailbox/server.go b/mailbox/server.go index f810fbc..c80314e 100644 --- a/mailbox/server.go +++ b/mailbox/server.go @@ -7,6 +7,7 @@ import ( "io" "net" + "github.com/btcsuite/btclog" "github.com/lightninglabs/lightning-node-connect/hashmailrpc" "google.golang.org/grpc" ) @@ -30,6 +31,8 @@ type Server struct { quit chan struct{} cancel func() + + log btclog.Logger } func NewServer(serverHost string, connData *ConnData, @@ -55,6 +58,7 @@ func NewServer(serverHost string, connData *ConnData, connData: connData, sid: sid, onNewStatus: onNewStatus, + log: newPrefixedLogger(true), quit: make(chan struct{}), } @@ -79,12 +83,14 @@ func (s *Server) Accept() (net.Conn, error) { // If there is currently an active connection, block here until the // previous connection as been closed. if s.mailboxConn != nil { - log.Debugf("Accept: have existing mailbox connection, waiting") + s.log.Debugf("Accept: have existing mailbox connection, " + + "waiting") + select { case <-s.quit: return nil, io.EOF case <-s.mailboxConn.Done(): - log.Debugf("Accept: done with existing conn") + s.log.Debugf("Accept: done with existing conn") } } @@ -98,7 +104,7 @@ func (s *Server) Accept() (net.Conn, error) { if !bytes.Equal(s.sid[:], sid[:]) && s.mailboxConn != nil { err := s.mailboxConn.Stop() if err != nil { - log.Errorf("could not close mailbox conn: %v", err) + s.log.Errorf("Could not close mailbox conn: %v", err) } s.mailboxConn = nil @@ -110,7 +116,8 @@ func (s *Server) Accept() (net.Conn, error) { // otherwise, we just refresh the ServerConn. if s.mailboxConn == nil { mailboxConn, err := NewServerConn( - s.ctx, s.serverHost, s.client, sid, s.onNewStatus, + s.ctx, s.serverHost, s.client, sid, s.log, + s.onNewStatus, ) if err != nil { return nil, &temporaryError{err} @@ -143,13 +150,13 @@ func (e *temporaryError) Temporary() bool { } func (s *Server) Close() error { - log.Debugf("conn being closed") + s.log.Debugf("Conn being closed") close(s.quit) if s.mailboxConn != nil { if err := s.mailboxConn.Stop(); err != nil { - log.Errorf("error closing mailboxConn %v", err) + s.log.Errorf("Error closing mailboxConn %v", err) } } s.cancel() diff --git a/mailbox/server_conn.go b/mailbox/server_conn.go index a1c8930..ac75055 100644 --- a/mailbox/server_conn.go +++ b/mailbox/server_conn.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/btcsuite/btclog" "github.com/lightninglabs/lightning-node-connect/gbn" "github.com/lightninglabs/lightning-node-connect/hashmailrpc" "google.golang.org/grpc/codes" @@ -55,6 +56,8 @@ type ServerConn struct { onNewStatus func(status ServerStatus) statusMu sync.Mutex + log btclog.Logger + cancel func() quit chan struct{} @@ -64,7 +67,7 @@ type ServerConn struct { // NewServerConn creates a new net.Conn compatible server connection that uses // a gRPC based connection to tunnel traffic over a mailbox server. func NewServerConn(ctx context.Context, serverHost string, - client hashmailrpc.HashMailClient, sid [64]byte, + client hashmailrpc.HashMailClient, sid [64]byte, logger btclog.Logger, onNewStatus func(status ServerStatus)) (*ServerConn, error) { ctxc, cancel := context.WithCancel(ctx) @@ -84,6 +87,7 @@ func NewServerConn(ctx context.Context, serverHost string, ), }, status: ServerStatusNotConnected, + log: logger, onNewStatus: onNewStatus, } c.connKit = &connKit{ @@ -94,7 +98,7 @@ func NewServerConn(ctx context.Context, serverHost string, sendSID: sendSID, } - log.Debugf("ServerConn: creating gbn, waiting for sync") + logger.Debugf("Creating gbn, waiting for sync") var err error c.gbnConn, err = gbn.NewServerConn( ctxc, c.sendToStream, c.recvFromStream, c.gbnOptions..., @@ -102,7 +106,7 @@ func NewServerConn(ctx context.Context, serverHost string, if err != nil { return nil, err } - log.Debugf("ServerConn: done creating gbn") + logger.Debugf("Done creating gbn") return c, nil } @@ -128,6 +132,7 @@ func RefreshServerConn(s *ServerConn) (*ServerConn, error) { cancel: s.cancel, status: ServerStatusNotConnected, onNewStatus: s.onNewStatus, + log: s.log, quit: make(chan struct{}), } @@ -140,7 +145,7 @@ func RefreshServerConn(s *ServerConn) (*ServerConn, error) { sendSID: s.connKit.sendSID, } - log.Debugf("ServerConn: creating gbn") + s.log.Debugf("ServerConn: creating gbn") var err error sc.gbnConn, err = gbn.NewServerConn( sc.ctx, sc.sendToStream, sc.recvFromStream, sc.gbnOptions..., @@ -149,7 +154,7 @@ func RefreshServerConn(s *ServerConn) (*ServerConn, error) { return nil, err } - log.Debugf("ServerConn: done creating gbn") + s.log.Debugf("ServerConn: done creating gbn") return sc, nil } @@ -174,7 +179,7 @@ func (c *ServerConn) recvFromStream(ctx context.Context) ([]byte, error) { c.receiveStreamMu.Lock() controlMsg, err := c.receiveStream.Recv() if err != nil { - log.Debugf("Server: got failure on receive socket, "+ + c.log.Debugf("Got failure on receive socket, "+ "re-trying: %v", err) c.setStatus(ServerStatusNotConnected) @@ -215,7 +220,7 @@ func (c *ServerConn) sendToStream(ctx context.Context, payload []byte) error { Msg: payload, }) if err != nil { - log.Debugf("Server: got failure on send socket, "+ + c.log.Debugf("Got failure on send socket, "+ "re-trying: %v", err) c.setStatus(ServerStatusNotConnected) @@ -289,8 +294,8 @@ func (c *ServerConn) createReceiveMailBox(ctx context.Context, // Create receive mailbox and get receive stream. err := initAccountCipherBox(ctx, c.client, c.receiveSID) if err != nil && !isErrAlreadyExists(err) { - log.Debugf("Server: failed to re-create read stream "+ - "mbox: %v", err) + c.log.Debugf("Failed to re-create read stream mbox: %v", + err) continue } @@ -306,8 +311,7 @@ func (c *ServerConn) createReceiveMailBox(ctx context.Context, } readStream, err := c.client.RecvStream(ctx, streamDesc) if err != nil { - log.Debugf("Server: failed to create read stream: %w", - err) + c.log.Debugf("Failed to create read stream: %w", err) continue } @@ -315,7 +319,8 @@ func (c *ServerConn) createReceiveMailBox(ctx context.Context, c.setStatus(ServerStatusIdle) c.receiveStream = readStream - log.Debugf("Server: receive mailbox created") + c.log.Debugf("Receive mailbox created") + return } } @@ -341,7 +346,8 @@ func (c *ServerConn) createSendMailBox(ctx context.Context, // Create send mailbox and get send stream. err := initAccountCipherBox(ctx, c.client, c.sendSID) if err != nil && !isErrAlreadyExists(err) { - log.Debugf("error creating send cipher box: %v", err) + c.log.Debugf("Error creating send cipher box: %v", err) + continue } c.sendBoxCreated = true @@ -353,12 +359,14 @@ func (c *ServerConn) createSendMailBox(ctx context.Context, // and exit if needed. writeStream, err := c.client.SendStream(ctx) if err != nil { - log.Debugf("unable to create send stream: %w", err) + c.log.Debugf("Unable to create send stream: %w", err) + continue } c.sendStream = writeStream - log.Debugf("Server: Send mailbox created") + c.log.Debugf("Send mailbox created") + return } } @@ -368,19 +376,23 @@ func (c *ServerConn) createSendMailBox(ctx context.Context, func (c *ServerConn) Stop() error { var returnErr error if err := c.Close(); err != nil { - log.Errorf("error closing mailbox") + c.log.Errorf("Error closing mailbox") + returnErr = err } if c.receiveBoxCreated { - if err := delCipherBox(c.ctx, c.client, c.receiveSID); err != nil { - log.Errorf("error removing receive cipher box: %v", err) + err := delCipherBox(c.ctx, c.client, c.receiveSID) + if err != nil { + c.log.Errorf("Error removing receive cipher box: %v", + err) + returnErr = err } } if c.sendBoxCreated { if err := delCipherBox(c.ctx, c.client, c.sendSID); err != nil { - log.Errorf("error removing send cipher box: %v", err) + c.log.Errorf("Error removing send cipher box: %v", err) returnErr = err } } @@ -403,34 +415,39 @@ func (c *ServerConn) Close() error { var returnErr error c.closeOnce.Do(func() { - log.Debugf("Server connection is closing") + c.log.Debugf("Connection is closing") if c.gbnConn != nil { if err := c.gbnConn.Close(); err != nil { - log.Debugf("Error closing gbn connection in " + - "server conn") + c.log.Debugf("Error closing gbn connection " + + "in server conn") + returnErr = err } } if c.receiveStream != nil { - log.Debugf("closing receive stream") + c.log.Debugf("Closing receive stream") if err := c.receiveStream.CloseSend(); err != nil { - log.Errorf("error closing receive stream: %v", err) + c.log.Errorf("Error closing receive stream: %v", + err) + returnErr = err } } if c.sendStream != nil { - log.Debugf("closing send stream") + c.log.Debugf("Closing send stream") if err := c.sendStream.CloseSend(); err != nil { - log.Errorf("error closing send stream: %v", err) + c.log.Errorf("Error closing send stream: %v", + err) + returnErr = err } } close(c.quit) - log.Debugf("Server connection closed") + c.log.Debugf("Connection closed") }) return returnErr