Skip to content

Commit

Permalink
MB-59601: Fix data race in CheckpointManager::takeAndResetCursors
Browse files Browse the repository at this point in the history
The method did not take a queueLock and could mutate the
CheckpointManager while it is being accessed, e.g. in
CheckpointManager::getListOfCursorsToDrop.

CheckpointMemRecoveryTask calls getListOfCursorsToDrop which iterates
CM::cursors. A concurrent RollbackTask can result in resetting the
vbucket and calling CM::takeAndResetCursors, which among others
mutates CM::cursors.

WARNING: ThreadSanitizer: data race (pid=47061)
  Write of size 8 at 0x00010d3b77a8 by main thread (mutexes: write M0, write M1, write M2):
    #0 CheckpointManager::takeAndResetCursors(CheckpointManager&) checkpoint_manager.cc:1754 (ep-engine_ep_unit_tests:arm64+0x1003c7dd8)
    #1 KVBucket::resetVBucket_UNLOCKED(LockedVBucketPtr&, std::__1::unique_lock<std::__1::mutex>&) kv_bucket.cc:1273 (ep-engine_ep_unit_tests:arm64+0x1001fc414)
    #2 KVBucket::rollback(Vbid, unsigned long long) kv_bucket.cc:2634 (ep-engine_ep_unit_tests:arm64+0x10020a910)
    #3 CheckpointRemoverTest_MB59601_Test::TestBody() checkpoint_remover_test.cc:518 (ep-engine_ep_unit_tests:arm64+0x1005d2224)
    #4 virtual thunk to CheckpointRemoverTest_MB59601_Test::TestBody() checkpoint_remover_test.cc (ep-engine_ep_unit_tests:arm64+0x1005d24e8)
    #5 void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) gtest.cc:2648 (ep-engine_ep_unit_tests:arm64+0x101b8f6bc)
    #6 <null> <null> (0x000186e390e0)

  Previous read of size 8 at 0x00010d3b77a8 by thread T1 (mutexes: write M3):
    #0 CheckpointManager::getListOfCursorsToDrop() checkpoint_manager.cc:842 (ep-engine_ep_unit_tests:arm64+0x1003c1af0)
    #1 CheckpointMemRecoveryTask::attemptCursorDropping() checkpoint_remover.cc:183 (ep-engine_ep_unit_tests:arm64+0x1003caf8c)
    #2 CheckpointMemRecoveryTask::runInner(bool) checkpoint_remover.cc:245 (ep-engine_ep_unit_tests:arm64+0x1003cb77c)
    #3 EpNotifiableTask::run() ep_task.cc:56 (ep-engine_ep_unit_tests:arm64+0x10028763c)
    #4 void* std::__1::__thread_proxy[abi:v160006]<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, CheckpointRemoverTest_MB59601_Test::TestBody()::$_2::operator()() const::'lambda0'()>>(void*) thread:299 (ep-engine_ep_unit_tests:arm64+0x100600c30)

Change-Id: I15c1e9ccc6f45f3251ebd7f78649c8a446d65b54
Reviewed-on: https://review.couchbase.org/c/kv_engine/+/203302
Reviewed-by: Vesko Karaganev <[email protected]>
Tested-by: Build Bot <[email protected]>
Reviewed-by: Paolo Cocchi <[email protected]>
  • Loading branch information
pavlosg committed Jan 12, 2024
1 parent e0e4d9d commit 7a17903
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 5 deletions.
22 changes: 17 additions & 5 deletions engines/ep/src/checkpoint_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,8 @@ std::vector<Cursor> CheckpointManager::getListOfCursorsToDrop() {
? *backupFound->second
: *persistenceCursor;

getListOfCursorsToDropHook();

for (const auto& pair : cursors) {
const auto cursor = pair.second;
// Note: Strict condition here.
Expand Down Expand Up @@ -1734,13 +1736,23 @@ void CheckpointManager::addStats(const AddStatFn& add_stat,
}

void CheckpointManager::takeAndResetCursors(CheckpointManager& other) {
pCursor = other.pCursor;
persistenceCursor = pCursor.lock().get();
for (auto& cursor : other.cursors) {
cursors[cursor.second->getName()] = cursor.second;
other.takeAndResetCursorsHook();

Cursor otherPCursor;
cursor_index otherCursors;
{
std::lock_guard<std::mutex> otherLH(other.queueLock);
otherPCursor = other.pCursor;
otherCursors = std::move(other.cursors);
other.cursors.clear();
}
other.cursors.clear();

std::lock_guard<std::mutex> lh(queueLock);
pCursor = std::move(otherPCursor);
persistenceCursor = pCursor.lock().get();
for (auto& cursor : otherCursors) {
cursors[cursor.second->getName()] = std::move(cursor.second);
}
resetCursors();
}

Expand Down
8 changes: 8 additions & 0 deletions engines/ep/src/checkpoint_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,14 @@ class CheckpointManager {
// (and not yet re-acquired) the CM::lock. Introduced in MB-56644.
TestingHook<> expelHook;

/// Testing hook called at the start of CM::takeAndResetCursors.
/// Introduced in MB-59601.
TestingHook<> takeAndResetCursorsHook;

/// Testing hook called just before iterating CM::cursors in
/// CM::getListOfCursorsToDrop. Introduced in MB-59601.
TestingHook<> getListOfCursorsToDropHook;

protected:
/**
* @param lh, the queueLock held
Expand Down
65 changes: 65 additions & 0 deletions engines/ep/tests/module_tests/checkpoint_remover_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "collections/vbucket_manifest_handles.h"
#include "dcp/response.h"
#include "test_helpers.h"
#include "thread_gate.h"
#include "vbucket.h"

#include <folly/portability/GMock.h>
Expand Down Expand Up @@ -454,6 +455,70 @@ TEST_P(CheckpointRemoverTest, MemRecoveryByCheckpointCreation) {
EXPECT_EQ(0, store->getRequiredCMMemoryReduction());
}

// Without the fix, there is a data race in
// CheckpointManager::takeAndResetCursors which did not take a queueLock,
// and could mutate the CheckpointManager while it is being accessed,
// e.g. in CheckpointManager::getListOfCursorsToDrop.
TEST_P(CheckpointRemoverTest, MB59601) {
if (!isPersistent()) {
GTEST_SKIP();
}

setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
auto& config = engine->getConfiguration();
config.setChkExpelEnabled(false);
config.setMaxSize(100UL * 1024 * 1024);
// Disable the mem-based checkpoint creation in this test, we would end up
// doing straight CheckpointRemoval rather than ItemExpel/CursorDrop
config.setCheckpointMaxSize(std::numeric_limits<size_t>::max());
const auto chkptMemRecoveryLimit =
config.getMaxSize() * store->getCheckpointMemoryRatio() *
store->getCheckpointMemoryRecoveryUpperMark();
auto& stats = engine->getEpStats();
stats.mem_low_wat.store(1);

int numItems = 0;
const std::string value(1024 * 1024, 'x');
while (stats.getCheckpointManagerEstimatedMemUsage() <
chkptMemRecoveryLimit) {
auto docKey = "key_" + std::to_string(++numItems);
store_item(vbid, makeStoredDocKey(docKey), value);
}
flushVBucketToDiskIfPersistent(vbid, numItems);

// VB needs to be replica to rollback
store->setVBucketState(vbid, vbucket_state_replica);

EXPECT_GT(stats.getNumCheckpoints(), 0);
EXPECT_GT(store->getRequiredCMMemoryReduction(), 0);

/// Synchronises just before accessing and mutating CM::cursors
ThreadGate tg(2);
std::thread bgThread;

auto& oldManager = *store->getVBucket(vbid)->checkpointManager;
oldManager.takeAndResetCursorsHook = [this, &tg, &bgThread]() {
// Note: takeAndResetCursorsHook is executed *after* the new VBucket
// has already been created

auto& newManager = *store->getVBucket(vbid)->checkpointManager;
newManager.getListOfCursorsToDropHook = [&tg]() { tg.threadUp(); };
bgThread = std::thread([this]() {
auto remover = std::make_shared<CheckpointMemRecoveryTask>(
*engine,
engine->getEpStats(),
engine->getConfiguration().getChkRemoverStime(),
0);
remover->run();
});

tg.threadUp();
};

store->rollback(vbid, 0);
bgThread.join();
}

// Test written for MB-36366. With the fix removed this test failed because
// post expel, we continued onto cursor dropping.
// MB-36447 - unreliable test, disabling for now
Expand Down

0 comments on commit 7a17903

Please sign in to comment.