Refactor reactor wiring logic to support parallel processing of messages #1534

Open

evan-forbes opened this issue Nov 18, 2024 · 0 comments

Labels: p2p, refactor, WS: Big Blonks 🔭 Improving consensus critical gossiping protocols

@evan-forbes (Member)

We can refactor the wiring of the Reactor code so that multiple messages can be processed at the same time. This is likely crucial for preventing long message processing times from blocking other incoming messages.

This isn't a bottleneck at the moment: the Big Blonks testnet was not using this change and was still able to hit 27 MB/s. However, it would likely dramatically limit the damage that could be done by inscription events, where the network is spammed with many small stateful messages that each require a blocking call to CheckTx.

Relevant code and potential implementations below:

```go
// recvRoutine reads PacketMsgs and reconstructs the message using the channels' "recving" buffer.
// After a whole message has been assembled, it's pushed to onReceive().
// Blocks depending on how the connection is throttled.
// Otherwise, it never blocks.
func (c *MConnection) recvRoutine() {
	defer c._recover()

	protoReader := protoio.NewDelimitedReader(c.bufConnReader, c._maxPacketMsgSize)

FOR_LOOP:
	for {
		// Block until .recvMonitor says we can read.
		c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true)

		// Peek into bufConnReader for debugging
		/*
			if numBytes := c.bufConnReader.Buffered(); numBytes > 0 {
				bz, err := c.bufConnReader.Peek(cmtmath.MinInt(numBytes, 100))
				if err == nil {
					// return
				} else {
					c.Logger.Debug("Error peeking connection buffer", "err", err)
					// return nil
				}
				c.Logger.Info("Peek connection buffer", "numBytes", numBytes, "bz", bz)
			}
		*/

		// Read packet type
		var packet tmp2p.Packet

		_n, err := protoReader.ReadMsg(&packet)
		c.recvMonitor.Update(_n)
		if err != nil {
			// stopServices was invoked and we are shutting down
			// receiving is excpected to fail since we will close the connection
			select {
			case <-c.quitRecvRoutine:
				break FOR_LOOP
			default:
			}

			if c.IsRunning() {
				if err == io.EOF {
					c.Logger.Info("Connection is closed @ recvRoutine (likely by the other side)", "conn", c)
				} else {
					c.Logger.Debug("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
				}
				c.stopForError(err)
			}
			break FOR_LOOP
		}

		// Read more depending on packet type.
		switch pkt := packet.Sum.(type) {
		case *tmp2p.Packet_PacketPing:
			// TODO: prevent abuse, as they cause flush()'s.
			// https://github.com/tendermint/tendermint/issues/1190
			c.Logger.Debug("Receive Ping")
			select {
			case c.pong <- struct{}{}:
			default:
				// never block
			}
		case *tmp2p.Packet_PacketPong:
			c.Logger.Debug("Receive Pong")
			select {
			case c.pongTimeoutCh <- false:
			default:
				// never block
			}
		case *tmp2p.Packet_PacketMsg:
			channelID := byte(pkt.PacketMsg.ChannelID)
			channel, ok := c.channelsIdx[channelID]
			if pkt.PacketMsg.ChannelID < 0 || pkt.PacketMsg.ChannelID > math.MaxUint8 || !ok || channel == nil {
				err := fmt.Errorf("unknown channel %X", pkt.PacketMsg.ChannelID)
				c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err)
				c.stopForError(err)
				break FOR_LOOP
			}

			msgBytes, err := channel.recvPacketMsg(*pkt.PacketMsg)
			if err != nil {
				if c.IsRunning() {
					c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err)
					c.stopForError(err)
				}
				break FOR_LOOP
			}
			if msgBytes != nil {
				c.Logger.Debug("Received bytes", "chID", channelID, "msgBytes", msgBytes)
				// NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
				c.onReceive(channelID, msgBytes)
			}
		default:
			err := fmt.Errorf("unknown message type %v", reflect.TypeOf(packet))
			c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
			c.stopForError(err)
			break FOR_LOOP
		}
	}

	// Cleanup
	close(c.pong)
	for range c.pong {
		// Drain
	}
}
```
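
The coupling point is the synchronous `c.onReceive(channelID, msgBytes)` call above: whatever the reactor does with the message runs on the same goroutine that reads from the wire. As a minimal, self-contained sketch of one way to decouple the two (the names `asyncReceiver`, `receivedMsg`, and `enqueue` are hypothetical and not part of celestia-core), the read loop could only enqueue assembled messages onto a bounded queue drained by a separate goroutine:

```go
// Hypothetical sketch: decouple message handling from the read loop.
// The read loop would call enqueue() where it currently calls onReceive()
// directly; a dedicated goroutine drains the queue.
type receivedMsg struct {
	chID     byte
	msgBytes []byte
}

type asyncReceiver struct {
	queue     chan receivedMsg
	onReceive func(chID byte, msgBytes []byte)
}

func newAsyncReceiver(onReceive func(byte, []byte), size int) *asyncReceiver {
	r := &asyncReceiver{
		queue:     make(chan receivedMsg, size),
		onReceive: onReceive,
	}
	go r.loop()
	return r
}

// loop runs in its own goroutine and performs the actual (possibly slow)
// handling, e.g. a Receive that ends up in a blocking CheckTx.
func (r *asyncReceiver) loop() {
	for m := range r.queue {
		r.onReceive(m.chID, m.msgBytes)
	}
}

// enqueue blocks only when the bounded queue is full, which applies
// backpressure to the peer instead of stalling on every message.
func (r *asyncReceiver) enqueue(chID byte, msgBytes []byte) {
	r.queue <- receivedMsg{chID: chID, msgBytes: msgBytes}
}
```

Note that this alone only moves the serialization point: all channels still share one queue, which is why the per-channel direction sketched at the end of this issue is the more interesting follow-up.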

celestia-core/p2p/peer.go, lines 548 to 586 at 8581b47:

```go
onReceive := func(chID byte, msgBytes []byte) {
	reactor := reactorsByCh[chID]
	if reactor == nil {
		// Note that its ok to panic here as it's caught in the conn._recover,
		// which does onPeerError.
		panic(fmt.Sprintf("Unknown channel %X", chID))
	}
	mt := msgTypeByChID[chID]
	msg := proto.Clone(mt)
	err := proto.Unmarshal(msgBytes, msg)
	if err != nil {
		panic(fmt.Errorf("unmarshaling message: %s into type: %s", err, reflect.TypeOf(mt)))
	}
	if w, ok := msg.(Unwrapper); ok {
		msg, err = w.Unwrap()
		if err != nil {
			panic(fmt.Errorf("unwrapping message: %s", err))
		}
	}
	labels := []string{
		"peer_id", string(p.ID()),
		"chID", fmt.Sprintf("%#x", chID),
	}
	p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
	p.metrics.MessageReceiveBytesTotal.With(append(labels, "message_type", p.mlc.ValueToMetricLabel(msg))...).Add(float64(len(msgBytes)))
	schema.WriteReceivedBytes(p.traceClient, string(p.ID()), chID, len(msgBytes))
	if nr, ok := reactor.(EnvelopeReceiver); ok {
		nr.ReceiveEnvelope(Envelope{
			ChannelID: chID,
			Src:       p,
			Message:   msg,
		})
	} else {
		reactor.Receive(chID, p, msgBytes)
	}
}
```

Potential implementation: https://github.com/cometbft/cometbft/pull/3230/files

This would get us to the point where multiple reactors can process messages at the same time. In the future, it might be more advantageous to refactor further, for example to let each channel process messages independently, as sketched below.
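
For the per-channel direction, here is a hedged sketch (this is not the linked PR's implementation; `startChannelWorkers` and `inboundMsg` are made-up names, and `fmt` is assumed to be imported) that gives every channel ID its own bounded queue and worker goroutine, so a slow reactor only delays traffic on its own channels:

```go
// Hypothetical sketch: fan inbound messages out to one worker per channel.
// handle would be the existing onReceive body (unmarshal, metrics, dispatch).
type inboundMsg struct {
	chID     byte
	msgBytes []byte
}

func startChannelWorkers(
	chIDs []byte,
	handle func(chID byte, msgBytes []byte),
	queueSize int,
) func(chID byte, msgBytes []byte) {
	queues := make(map[byte]chan inboundMsg, len(chIDs))
	for _, id := range chIDs {
		q := make(chan inboundMsg, queueSize)
		queues[id] = q
		go func(q chan inboundMsg) {
			for m := range q {
				handle(m.chID, m.msgBytes)
			}
		}(q)
	}
	// The returned function has the same shape as the onReceive closure
	// above, so it could be handed to the connection as its receive callback.
	return func(chID byte, msgBytes []byte) {
		q, ok := queues[chID]
		if !ok {
			panic(fmt.Sprintf("unknown channel %X", chID))
		}
		q <- inboundMsg{chID: chID, msgBytes: msgBytes} // blocks (backpressure) only when this channel's queue is full
	}
}
```

Because each channel keeps a single worker, per-channel message ordering is preserved while different channels make progress independently.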

@evan-forbes evan-forbes added the WS: Big Blonks 🔭 Improving consensus critical gossiping protocols label Nov 18, 2024