-
Notifications
You must be signed in to change notification settings - Fork 93
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
remove mutex for write #197
base: 6.4.tikv
Are you sure you want to change the base?
Changes from 12 commits
5730f5e
1ca2b0e
add5369
cf42e20
c9493c7
9da6f60
b84bd77
b914b6b
914ca62
1f0b313
d17b969
59ab24b
46d7b19
55409a2
5a17178
bc172cb
6bacce4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -169,7 +169,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, | |||||
log_dir_synced_(false), | ||||||
log_empty_(true), | ||||||
persist_stats_cf_handle_(nullptr), | ||||||
log_sync_cv_(&mutex_), | ||||||
log_sync_cv_(&log_write_mutex_), | ||||||
total_log_size_(0), | ||||||
is_snapshot_supported_(true), | ||||||
write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()), | ||||||
|
@@ -258,6 +258,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, | |||||
// we won't drop any deletion markers until SetPreserveDeletesSequenceNumber() | ||||||
// is called by client and this seqnum is advanced. | ||||||
preserve_deletes_seqnum_.store(0); | ||||||
max_total_wal_size_.store(mutable_db_options_.max_total_wal_size, | ||||||
std::memory_order_relaxed); | ||||||
} | ||||||
|
||||||
Status DBImpl::Resume() { | ||||||
|
@@ -526,25 +528,28 @@ Status DBImpl::CloseHelper() { | |||||
mutex_.Lock(); | ||||||
} | ||||||
|
||||||
for (auto l : logs_to_free_) { | ||||||
delete l; | ||||||
} | ||||||
for (auto& log : logs_) { | ||||||
uint64_t log_number = log.writer->get_log_number(); | ||||||
Status s = log.ClearWriter(); | ||||||
if (!s.ok()) { | ||||||
ROCKS_LOG_WARN( | ||||||
immutable_db_options_.info_log, | ||||||
"Unable to Sync WAL file %s with error -- %s", | ||||||
LogFileName(immutable_db_options_.wal_dir, log_number).c_str(), | ||||||
s.ToString().c_str()); | ||||||
// Retain the first error | ||||||
if (ret.ok()) { | ||||||
ret = s; | ||||||
{ | ||||||
InstrumentedMutexLock lock(&log_write_mutex_); | ||||||
for (auto l : logs_to_free_) { | ||||||
delete l; | ||||||
} | ||||||
for (auto& log : logs_) { | ||||||
uint64_t log_number = log.writer->get_log_number(); | ||||||
Status s = log.ClearWriter(); | ||||||
if (!s.ok()) { | ||||||
ROCKS_LOG_WARN( | ||||||
immutable_db_options_.info_log, | ||||||
"Unable to Sync WAL file %s with error -- %s", | ||||||
LogFileName(immutable_db_options_.wal_dir, log_number).c_str(), | ||||||
s.ToString().c_str()); | ||||||
// Retain the first error | ||||||
if (ret.ok()) { | ||||||
ret = s; | ||||||
} | ||||||
} | ||||||
} | ||||||
logs_.clear(); | ||||||
} | ||||||
logs_.clear(); | ||||||
|
||||||
// Table cache may have table handles holding blocks from the block cache. | ||||||
// We need to release them before the block cache is destroyed. The block | ||||||
|
@@ -1014,11 +1019,12 @@ Status DBImpl::SetDBOptions( | |||||
mutable_db_options_.max_background_jobs, | ||||||
mutable_db_options_.base_background_compactions, | ||||||
/* parallelize_compactions */ true); | ||||||
const BGJobLimits new_bg_job_limits = GetBGJobLimits( | ||||||
new_options.max_background_flushes, | ||||||
new_options.max_background_compactions, | ||||||
new_options.max_background_jobs, | ||||||
new_options.base_background_compactions, /* parallelize_compactions */ true); | ||||||
const BGJobLimits new_bg_job_limits = | ||||||
GetBGJobLimits(new_options.max_background_flushes, | ||||||
new_options.max_background_compactions, | ||||||
new_options.max_background_jobs, | ||||||
new_options.base_background_compactions, | ||||||
/* parallelize_compactions */ true); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It seems that all RocksDB code places the comment before the parameter. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You're half right. It's a per-file style, and this file happens to put the comment after the parameter, which is rare within the whole codebase... |
||||||
|
||||||
const bool max_flushes_increased = | ||||||
new_bg_job_limits.max_flushes > current_bg_job_limits.max_flushes; | ||||||
|
@@ -1072,6 +1078,11 @@ Status DBImpl::SetDBOptions( | |||||
thread_persist_stats_.reset(); | ||||||
} | ||||||
} | ||||||
if (new_options.max_total_wal_size != | ||||||
mutable_db_options_.max_total_wal_size) { | ||||||
max_total_wal_size_.store(new_options.max_total_wal_size, | ||||||
std::memory_order_release); | ||||||
} | ||||||
write_controller_.set_max_delayed_write_rate( | ||||||
new_options.delayed_write_rate); | ||||||
table_cache_.get()->SetCapacity(new_options.max_open_files == -1 | ||||||
|
@@ -1187,7 +1198,7 @@ Status DBImpl::SyncWAL() { | |||||
uint64_t current_log_number; | ||||||
|
||||||
{ | ||||||
InstrumentedMutexLock l(&mutex_); | ||||||
InstrumentedMutexLock l(&log_write_mutex_); | ||||||
assert(!logs_.empty()); | ||||||
|
||||||
// This SyncWAL() call only cares about logs up to this number. | ||||||
|
@@ -1235,7 +1246,7 @@ Status DBImpl::SyncWAL() { | |||||
|
||||||
TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1"); | ||||||
{ | ||||||
InstrumentedMutexLock l(&mutex_); | ||||||
InstrumentedMutexLock l(&log_write_mutex_); | ||||||
MarkLogsSynced(current_log_number, need_log_dir_sync, status); | ||||||
} | ||||||
TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2"); | ||||||
|
@@ -1264,7 +1275,7 @@ Status DBImpl::UnlockWAL() { | |||||
|
||||||
void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir, | ||||||
const Status& status) { | ||||||
mutex_.AssertHeld(); | ||||||
log_write_mutex_.AssertHeld(); | ||||||
if (synced_dir && logfile_number_ == up_to && status.ok()) { | ||||||
log_dir_synced_ = true; | ||||||
} | ||||||
|
@@ -1273,8 +1284,6 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir, | |||||
assert(log.getting_synced); | ||||||
if (status.ok() && logs_.size() > 1) { | ||||||
logs_to_free_.push_back(log.ReleaseWriter()); | ||||||
// To modify logs_ both mutex_ and log_write_mutex_ must be held | ||||||
InstrumentedMutexLock l(&log_write_mutex_); | ||||||
it = logs_.erase(it); | ||||||
} else { | ||||||
log.getting_synced = false; | ||||||
|
@@ -2123,7 +2132,6 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options, | |||||
s = cfd->AddDirectories(); | ||||||
} | ||||||
if (s.ok()) { | ||||||
single_column_family_mode_ = false; | ||||||
auto* cfd = | ||||||
versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name); | ||||||
assert(cfd != nullptr); | ||||||
|
@@ -2140,6 +2148,7 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options, | |||||
ROCKS_LOG_INFO(immutable_db_options_.info_log, | ||||||
"Created column family [%s] (ID %u)", | ||||||
column_family_name.c_str(), (unsigned)cfd->GetID()); | ||||||
single_column_family_mode_.store(false, std::memory_order_release); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why move this line from 2135? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I forget why I placed it here... Maybe I can try moving it back to its original position. |
||||||
} else { | ||||||
ROCKS_LOG_ERROR(immutable_db_options_.info_log, | ||||||
"Creating column family [%s] FAILED -- %s", | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -936,10 +936,10 @@ class DBImpl : public DB { | |
|
||
// only used for dynamically adjusting max_total_wal_size. it is a sum of | ||
// [write_buffer_size * max_write_buffer_number] over all column families | ||
uint64_t max_total_in_memory_state_; | ||
std::atomic<uint64_t> max_total_in_memory_state_; | ||
// If true, we have only one (default) column family. We use this to optimize | ||
// some code-paths | ||
bool single_column_family_mode_; | ||
std::atomic<bool> single_column_family_mode_; | ||
|
||
// The options to access storage files | ||
const EnvOptions env_options_; | ||
|
@@ -1131,7 +1131,13 @@ class DBImpl : public DB { | |
} | ||
} | ||
}; | ||
|
||
struct LogContext { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add newlines around this struct definition. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
explicit LogContext(bool need_sync = false) | ||
: need_log_sync(need_sync), need_log_dir_sync(need_sync) {} | ||
bool need_log_sync; | ||
bool need_log_dir_sync; | ||
log::Writer* writer; | ||
}; | ||
struct LogFileNumberSize { | ||
explicit LogFileNumberSize(uint64_t _number) : number(_number) {} | ||
void AddSize(uint64_t new_size) { size += new_size; } | ||
|
@@ -1401,8 +1407,8 @@ class DBImpl : public DB { | |
Status HandleWriteBufferFull(WriteContext* write_context); | ||
|
||
// REQUIRES: mutex locked | ||
Status PreprocessWrite(const WriteOptions& write_options, bool* need_log_sync, | ||
WriteContext* write_context); | ||
Status PreprocessWrite(const WriteOptions& write_options, | ||
LogContext* log_context, WriteContext* write_context); | ||
|
||
WriteBatch* MergeBatch(const WriteThread::WriteGroup& write_group, | ||
WriteBatch* tmp_batch, size_t* write_with_wal, | ||
|
@@ -1661,11 +1667,10 @@ class DBImpl : public DB { | |
std::deque<LogFileNumberSize> alive_log_files_; | ||
// Log files that aren't fully synced, and the current log file. | ||
// Synchronization: | ||
// - push_back() is done from write_thread_ with locked mutex_ and | ||
// log_write_mutex_ | ||
// - push_back() is done from write_thread_ with locked log_write_mutex_ | ||
// - pop_front() is done from any thread with locked mutex_ and | ||
// log_write_mutex_ | ||
// - reads are done with either locked mutex_ or log_write_mutex_ | ||
// - reads are done with locked log_write_mutex_ | ||
// - back() and items with getting_synced=true are not popped, | ||
// - The same thread that sets getting_synced=true will reset it. | ||
// - it follows that the object referred by back() can be safely read from | ||
|
@@ -1687,7 +1692,7 @@ class DBImpl : public DB { | |
std::atomic<uint64_t> total_log_size_; | ||
|
||
// If this is non-empty, we need to delete these log files in background | ||
// threads. Protected by db mutex. | ||
// threads. Protected by log_write_mutex_. | |
autovector<log::Writer*> logs_to_free_; | ||
|
||
bool is_snapshot_supported_; | ||
|
@@ -1934,6 +1939,7 @@ class DBImpl : public DB { | |
InstrumentedCondVar atomic_flush_install_cv_; | ||
|
||
bool wal_in_db_path_; | ||
std::atomic<uint64_t> max_total_wal_size_; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Put it together with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok |
||
}; | ||
|
||
extern Options SanitizeOptions(const std::string& db, const Options& src); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -80,7 +80,7 @@ bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force, | |
|
||
Status DBImpl::SyncClosedLogs(JobContext* job_context) { | ||
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start"); | ||
mutex_.AssertHeld(); | ||
InstrumentedMutexLock l(&log_write_mutex_); | ||
autovector<log::Writer*, 1> logs_to_sync; | ||
uint64_t current_log_number = logfile_number_; | ||
while (logs_.front().number < current_log_number && | ||
|
@@ -97,7 +97,7 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) { | |
|
||
Status s; | ||
if (!logs_to_sync.empty()) { | ||
mutex_.Unlock(); | ||
log_write_mutex_.Unlock(); | ||
|
||
for (log::Writer* log : logs_to_sync) { | ||
ROCKS_LOG_INFO(immutable_db_options_.info_log, | ||
|
@@ -119,13 +119,12 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) { | |
s = directories_.GetWalDir()->Fsync(); | ||
} | ||
|
||
mutex_.Lock(); | ||
log_write_mutex_.Lock(); | ||
|
||
// "number <= current_log_number - 1" is equivalent to | ||
// "number < current_log_number". | ||
MarkLogsSynced(current_log_number - 1, true, s); | ||
if (!s.ok()) { | ||
error_handler_.SetBGError(s, BackgroundErrorReason::kFlush); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Where do you set this error now? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It will be saved at "db/db_impl/db_impl_compaction_flush.cc" L549 and L210. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In this function we no longer hold the mutex, so we cannot save the BG error here. |
||
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed"); | ||
return s; | ||
} | ||
|
@@ -175,7 +174,9 @@ Status DBImpl::FlushMemTableToOutputFile( | |
// flushed SST may contain data from write batches whose updates to | ||
// other column families are missing. | ||
// SyncClosedLogs() may unlock and re-lock the db_mutex. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "db_mutex" -> log_write_mutex_ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
mutex_.Unlock(); | ||
s = SyncClosedLogs(job_context); | ||
mutex_.Lock(); | ||
} else { | ||
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip"); | ||
} | ||
|
@@ -357,7 +358,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( | |
if (logfile_number_ > 0) { | ||
// TODO (yanqin) investigate whether we should sync the closed logs for | ||
// single column family case. | ||
mutex_.Unlock(); | ||
s = SyncClosedLogs(job_context); | ||
mutex_.Lock(); | ||
} | ||
|
||
// exec_status stores the execution status of flush_jobs as | ||
|
@@ -1988,7 +1991,8 @@ DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes, | |
} | ||
if (!parallelize_compactions) { | ||
// throttle background compactions until we deem necessary | ||
res.max_compactions = std::max(1, std::min(base_background_compactions, res.max_compactions)); | ||
res.max_compactions = | ||
std::max(1, std::min(base_background_compactions, res.max_compactions)); | ||
} | ||
return res; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -180,6 +180,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, | |
|
||
// logs_ is empty when called during recovery, in which case there can't yet | ||
// be any tracked obsolete logs | ||
InstrumentedMutexLock l(&log_write_mutex_); | ||
if (!alive_log_files_.empty() && !logs_.empty()) { | ||
uint64_t min_log_number = job_context->log_number; | ||
size_t num_alive_log_files = alive_log_files_.size(); | ||
|
@@ -201,13 +202,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, | |
} | ||
job_context->size_log_to_delete += earliest.size; | ||
total_log_size_ -= earliest.size; | ||
if (two_write_queues_) { | ||
log_write_mutex_.Lock(); | ||
} | ||
alive_log_files_.pop_front(); | ||
if (two_write_queues_) { | ||
log_write_mutex_.Unlock(); | ||
} | ||
|
||
// Current log should always stay alive since it can't have | ||
// number < MinLogNumber(). | ||
assert(alive_log_files_.size()); | ||
|
@@ -220,10 +216,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, | |
continue; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, it will release the pairing mutex. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good catch... I'm thinking about how to solve this problem... There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm not sure whether it is still correct to release There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Maybe you can skip those unsync-ed log files, and clean them in the next round? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK I will unlock |
||
} | ||
logs_to_free_.push_back(log.ReleaseWriter()); | ||
{ | ||
InstrumentedMutexLock wl(&log_write_mutex_); | ||
logs_.pop_front(); | ||
} | ||
logs_.pop_front(); | ||
} | ||
// Current log cannot be obsolete. | ||
assert(!logs_.empty()); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the benefit of having logs_to_free_ protected by both log_write_mutex_ and mutex_?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It makes no sense. But I think unlocking `mutex_` during this function may cause other bad cases.