Skip to content

Commit

Permalink
Merge pull request stellar#4291 from marta-lokhova/minor_improvements
Browse files Browse the repository at this point in the history
Overlay improvements

Reviewed-by: SirTyson
  • Loading branch information
latobarita authored Apr 20, 2024
2 parents 25525d4 + f4b2002 commit 97e5b18
Show file tree
Hide file tree
Showing 17 changed files with 56 additions and 48 deletions.
2 changes: 1 addition & 1 deletion src/database/Database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ soci::session&
Database::getSession()
{
// global session can only be used from the main thread
assertThreadIsMain();
releaseAssert(threadIsMain());
return mSession;
}

Expand Down
2 changes: 1 addition & 1 deletion src/herder/HerderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1787,7 +1787,7 @@ HerderImpl::checkAndMaybeReanalyzeQuorumMap()
mLastQuorumMapIntersectionState.mInterruptFlag = false;
mLastQuorumMapIntersectionState.mCheckingQuorumMapHash = curr;
auto& cfg = mApp.getConfig();
assertThreadIsMain();
releaseAssert(threadIsMain());
auto seed = gRandomEngine();
auto qic = QuorumIntersectionChecker::create(
qmap, cfg, mLastQuorumMapIntersectionState.mInterruptFlag, seed);
Expand Down
2 changes: 1 addition & 1 deletion src/ledger/LedgerTxn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2501,7 +2501,7 @@ LedgerTxnRoot::Impl::addChild(AbstractLedgerTxn& child, TransactionMode mode)
{
// Read-only transactions are only allowed on the main thread to ensure
// we're not competing with writes
assertThreadIsMain();
releaseAssert(threadIsMain());
}

mChild = &child;
Expand Down
4 changes: 2 additions & 2 deletions src/main/ApplicationImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ std::string
ApplicationImpl::manualClose(std::optional<uint32_t> const& manualLedgerSeq,
std::optional<TimePoint> const& manualCloseTime)
{
assertThreadIsMain();
releaseAssert(threadIsMain());

// Manual close only makes sense for validating nodes
if (!mConfig.NODE_IS_VALIDATOR)
Expand Down Expand Up @@ -1452,7 +1452,7 @@ ApplicationImpl::createDatabase()
AbstractLedgerTxnParent&
ApplicationImpl::getLedgerTxnRoot()
{
assertThreadIsMain();
releaseAssert(threadIsMain());
return mConfig.MODE_USES_IN_MEMORY_LEDGER ? *mNeverCommittingLedgerTxn
: *mLedgerTxnRoot;
}
Expand Down
11 changes: 6 additions & 5 deletions src/main/ApplicationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,6 @@ class ApplicationImpl : public Application

virtual void resetDBForInMemoryMode() override;

protected:
std::unique_ptr<LedgerManager>
mLedgerManager; // allow to change that for tests
std::unique_ptr<Herder> mHerder; // allow to change that for tests

private:
VirtualClock& mVirtualClock;
Config mConfig;
Expand All @@ -152,6 +147,12 @@ class ApplicationImpl : public Application
std::unique_ptr<BucketManager> mBucketManager;
std::unique_ptr<Database> mDatabase;
std::unique_ptr<OverlayManager> mOverlayManager;

protected:
std::unique_ptr<LedgerManager>
mLedgerManager; // allow to change that for tests
std::unique_ptr<Herder> mHerder; // allow to change that for tests
private:
std::unique_ptr<CatchupManager> mCatchupManager;
std::unique_ptr<HerderPersistence> mHerderPersistence;
std::unique_ptr<HistoryArchiveManager> mHistoryArchiveManager;
Expand Down
15 changes: 14 additions & 1 deletion src/overlay/FlowControl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ FlowControl::FlowControl(Application& app)
void
FlowControl::sendSendMore(uint32_t numMessages, std::shared_ptr<Peer> peer)
{
releaseAssert(threadIsMain());

ZoneScoped;
StellarMessage m;
m.type(SEND_MORE);
Expand All @@ -55,6 +57,8 @@ void
FlowControl::sendSendMore(uint32_t numMessages, uint32_t numBytes,
std::shared_ptr<Peer> peer)
{
releaseAssert(threadIsMain());

ZoneScoped;
StellarMessage m;
m.type(SEND_MORE_EXTENDED);
Expand All @@ -68,6 +72,7 @@ FlowControl::sendSendMore(uint32_t numMessages, uint32_t numBytes,
bool
FlowControl::hasOutboundCapacity(StellarMessage const& msg) const
{
releaseAssert(threadIsMain());
releaseAssert(mFlowControlCapacity);
return mFlowControlCapacity->hasOutboundCapacity(msg) &&
(!mFlowControlBytesCapacity ||
Expand All @@ -80,6 +85,8 @@ FlowControl::start(std::weak_ptr<Peer> peer,
std::function<void(StellarMessage const&)> sendCb,
bool enableFCBytes)
{
releaseAssert(threadIsMain());

auto peerPtr = peer.lock();
if (!peerPtr)
{
Expand Down Expand Up @@ -242,7 +249,7 @@ bool
FlowControl::maybeSendMessage(std::shared_ptr<StellarMessage const> msg)
{
ZoneScoped;
if (mApp.getOverlayManager().isFloodMessage(*msg))
if (OverlayManager::isFloodMessage(*msg))
{
addMsgAndMaybeTrimQueue(msg);
maybeSendNextBatch();
Expand Down Expand Up @@ -281,6 +288,7 @@ FlowControl::endMessageProcessing(StellarMessage const& msg,
std::weak_ptr<Peer> peer)
{
ZoneScoped;
releaseAssert(threadIsMain());

mFloodDataProcessed += mFlowControlCapacity->releaseLocalCapacity(msg);
if (mFlowControlBytesCapacity)
Expand Down Expand Up @@ -339,6 +347,7 @@ bool
FlowControl::isSendMoreValid(StellarMessage const& msg,
std::string& errorMsg) const
{
releaseAssert(threadIsMain());
bool sendMoreExtendedType =
mFlowControlBytesCapacity && msg.type() == SEND_MORE_EXTENDED;
bool sendMoreType = !mFlowControlBytesCapacity && msg.type() == SEND_MORE;
Expand Down Expand Up @@ -385,6 +394,7 @@ bool
dropMessageAfterTimeout(FlowControl::QueuedOutboundMessage const& queuedMsg,
VirtualClock::time_point now)
{
releaseAssert(threadIsMain());
auto const& msg = *(queuedMsg.mMessage);
bool dropType = msg.type() == TRANSACTION || msg.type() == FLOOD_ADVERT ||
msg.type() == FLOOD_DEMAND;
Expand Down Expand Up @@ -554,6 +564,8 @@ FlowControl::addMsgAndMaybeTrimQueue(std::shared_ptr<StellarMessage const> msg)
Json::Value
FlowControl::getFlowControlJsonInfo(bool compact) const
{
releaseAssert(threadIsMain());

Json::Value res;
if (mFlowControlCapacity->getCapacity().mTotalCapacity)
{
Expand Down Expand Up @@ -608,5 +620,6 @@ FlowControl::FlowControlMetrics::FlowControlMetrics()
Peer::PEER_METRICS_RATE_UNIT,
Peer::PEER_METRICS_WINDOW_SIZE))
{
releaseAssert(threadIsMain());
}
}
6 changes: 3 additions & 3 deletions src/overlay/FlowControlCapacity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ void
FlowControlCapacity::lockOutboundCapacity(StellarMessage const& msg)
{
ZoneScoped;
if (mApp.getOverlayManager().isFloodMessage(msg))
if (OverlayManager::isFloodMessage(msg))
{
releaseAssert(hasOutboundCapacity(msg));
mOutboundCapacity -= getMsgResourceCount(msg);
Expand All @@ -152,7 +152,7 @@ FlowControlCapacity::lockLocalCapacity(StellarMessage const& msg)
*mCapacity.mTotalCapacity -= msgResources;
}

if (mApp.getOverlayManager().isFloodMessage(msg))
if (OverlayManager::isFloodMessage(msg))
{
// No capacity to process flood message
if (mCapacity.mFloodCapacity < msgResources)
Expand Down Expand Up @@ -182,7 +182,7 @@ FlowControlCapacity::releaseLocalCapacity(StellarMessage const& msg)
*mCapacity.mTotalCapacity += resourcesFreed;
}

if (mApp.getOverlayManager().isFloodMessage(msg))
if (OverlayManager::isFloodMessage(msg))
{
if (mCapacity.mFloodCapacity == 0)
{
Expand Down
3 changes: 1 addition & 2 deletions src/overlay/OverlayManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class OverlayManager

// Drop all PeerRecords from the Database
static void dropAll(Database& db);
static bool isFloodMessage(StellarMessage const& msg);

// Flush all FloodGate and ItemFetcher state for ledgers older than
// `ledgerSeq`.
Expand Down Expand Up @@ -141,8 +142,6 @@ class OverlayManager
virtual bool isPossiblyPreferred(std::string const& ip) const = 0;
virtual bool haveSpaceForConnection(std::string const& ip) const = 0;

virtual bool isFloodMessage(StellarMessage const& msg) = 0;

// Return the current in-memory set of inbound pending peers.
virtual std::vector<Peer::pointer> const&
getInboundPendingPeers() const = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/overlay/OverlayManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1071,7 +1071,7 @@ OverlayManagerImpl::isPreferred(Peer* peer) const
}

bool
OverlayManagerImpl::isFloodMessage(StellarMessage const& msg)
OverlayManager::isFloodMessage(StellarMessage const& msg)
{
return msg.type() == SCP_MESSAGE || msg.type() == TRANSACTION ||
msg.type() == FLOOD_DEMAND || msg.type() == FLOOD_ADVERT;
Expand Down
1 change: 0 additions & 1 deletion src/overlay/OverlayManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ class OverlayManagerImpl : public OverlayManager

bool acceptAuthenticatedPeer(Peer::pointer peer) override;
bool isPreferred(Peer* peer) const override;
bool isFloodMessage(StellarMessage const& msg) override;
std::vector<Peer::pointer> const& getInboundPendingPeers() const override;
std::vector<Peer::pointer> const& getOutboundPendingPeers() const override;
std::vector<Peer::pointer> getPendingPeers() const override;
Expand Down
1 change: 1 addition & 0 deletions src/overlay/PeerManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ PeerManager::removePeersWithManyFailures(size_t minNumFailures,
PeerBareAddress const* address)
{
ZoneScoped;
releaseAssert(threadIsMain());
try
{
auto& db = mApp.getDatabase();
Expand Down
24 changes: 12 additions & 12 deletions src/overlay/TCPPeer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ TCPPeer::initiate(Application& app, PeerBareAddress const& address)
releaseAssert(address.getType() == PeerBareAddress::Type::IPv4);

CLOG_DEBUG(Overlay, "TCPPeer:initiate to {}", address.toString());
assertThreadIsMain();
releaseAssert(threadIsMain());
auto socket = make_shared<SocketType>(app.getClock().getIOContext(), BUFSZ);
auto result = make_shared<TCPPeer>(app, WE_CALLED_REMOTE, socket);
result->mAddress = address;
Expand Down Expand Up @@ -90,7 +90,7 @@ TCPPeer::initiate(Application& app, PeerBareAddress const& address)
TCPPeer::pointer
TCPPeer::accept(Application& app, shared_ptr<TCPPeer::SocketType> socket)
{
assertThreadIsMain();
releaseAssert(threadIsMain());

auto extractIP = [](shared_ptr<SocketType> socket) {
std::string result;
Expand Down Expand Up @@ -152,7 +152,7 @@ TCPPeer::accept(Application& app, shared_ptr<TCPPeer::SocketType> socket)

TCPPeer::~TCPPeer()
{
assertThreadIsMain();
releaseAssert(threadIsMain());
Peer::shutdown();
if (mRole == REMOTE_CALLED_US)
{
Expand Down Expand Up @@ -181,7 +181,7 @@ TCPPeer::sendMessage(xdr::msg_ptr&& xdrBytes)
return;
}

assertThreadIsMain();
releaseAssert(threadIsMain());

TimestampedMessage msg;
msg.mEnqueuedTime = mApp.getClock().now();
Expand Down Expand Up @@ -263,7 +263,7 @@ void
TCPPeer::messageSender()
{
ZoneScoped;
assertThreadIsMain();
releaseAssert(threadIsMain());

// if nothing to do, mark progress and return.
if (mWriteQueue.empty())
Expand Down Expand Up @@ -372,7 +372,7 @@ TCPPeer::writeHandler(asio::error_code const& error,
size_t messages_transferred)
{
ZoneScoped;
assertThreadIsMain();
releaseAssert(threadIsMain());
mLastWrite = mApp.getClock().now();

if (error)
Expand Down Expand Up @@ -473,7 +473,7 @@ TCPPeer::scheduleRead()

releaseAssert(canRead());

assertThreadIsMain();
releaseAssert(threadIsMain());
if (shouldAbort())
{
return;
Expand All @@ -488,7 +488,7 @@ void
TCPPeer::startRead()
{
ZoneScoped;
assertThreadIsMain();
releaseAssert(threadIsMain());
releaseAssert(canRead());
if (shouldAbort())
{
Expand Down Expand Up @@ -637,7 +637,7 @@ void
TCPPeer::readHeaderHandler(asio::error_code const& error,
std::size_t bytes_transferred)
{
assertThreadIsMain();
releaseAssert(threadIsMain());
if (error)
{
noteErrorReadHeader(bytes_transferred, error);
Expand Down Expand Up @@ -669,7 +669,7 @@ TCPPeer::readBodyHandler(asio::error_code const& error,
std::size_t bytes_transferred,
std::size_t expected_length)
{
assertThreadIsMain();
releaseAssert(threadIsMain());

if (error)
{
Expand Down Expand Up @@ -706,7 +706,7 @@ void
TCPPeer::recvMessage()
{
ZoneScoped;
assertThreadIsMain();
releaseAssert(threadIsMain());
releaseAssert(canRead());

try
Expand Down Expand Up @@ -737,7 +737,7 @@ void
TCPPeer::drop(std::string const& reason, DropDirection dropDirection,
DropMode dropMode)
{
assertThreadIsMain();
releaseAssert(threadIsMain());
if (shouldAbort())
{
return;
Expand Down
10 changes: 5 additions & 5 deletions src/test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ struct TestContextListener : Catch::TestEventListenerBase
{
if (gTestTxMetaMode != TestTxMetaMode::META_TEST_IGNORE)
{
assertThreadIsMain();
releaseAssert(threadIsMain());
releaseAssert(!sTestCtx.has_value());
sTestCtx.emplace(testInfo);
}
Expand All @@ -106,7 +106,7 @@ struct TestContextListener : Catch::TestEventListenerBase
{
if (gTestTxMetaMode != TestTxMetaMode::META_TEST_IGNORE)
{
assertThreadIsMain();
releaseAssert(threadIsMain());
releaseAssert(sTestCtx.has_value());
releaseAssert(sSectCtx.empty());
sTestCtx.reset();
Expand All @@ -117,7 +117,7 @@ struct TestContextListener : Catch::TestEventListenerBase
{
if (gTestTxMetaMode != TestTxMetaMode::META_TEST_IGNORE)
{
assertThreadIsMain();
releaseAssert(threadIsMain());
sSectCtx.emplace_back(sectionInfo);
}
}
Expand All @@ -126,7 +126,7 @@ struct TestContextListener : Catch::TestEventListenerBase
{
if (gTestTxMetaMode != TestTxMetaMode::META_TEST_IGNORE)
{
assertThreadIsMain();
releaseAssert(threadIsMain());
sSectCtx.pop_back();
}
}
Expand Down Expand Up @@ -584,7 +584,7 @@ logFatalAndThrow(std::string const& msg)
static std::pair<stdfs::path, std::string>
getCurrentTestContext()
{
assertThreadIsMain();
releaseAssert(threadIsMain());

releaseAssert(TestContextListener::sTestCtx.has_value());
auto& tc = TestContextListener::sTestCtx.value();
Expand Down
2 changes: 2 additions & 0 deletions src/transactions/TransactionFrame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ TransactionFrame::TransactionFrame(Hash const& networkID,
Hash const&
TransactionFrame::getFullHash() const
{
ZoneScoped;
if (isZero(mFullHash))
{
mFullHash = xdrSha256(mEnvelope);
Expand All @@ -93,6 +94,7 @@ TransactionFrame::getFullHash() const
Hash const&
TransactionFrame::getContentsHash() const
{
ZoneScoped;
#ifdef _DEBUG
// force recompute
Hash oldHash;
Expand Down
Loading

0 comments on commit 97e5b18

Please sign in to comment.