diff --git a/CMakeLists.txt b/CMakeLists.txt
index 4d17a7ff6..f1a31c173 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -5,11 +5,11 @@ enable_language(C)
 find_package(Git)
 
 if (NOT ROCKSDB_GIT_REPO)
-  set(ROCKSDB_GIT_REPO "https://github.com/tikv/rocksdb.git")
+  set(ROCKSDB_GIT_REPO "https://github.com/v01dstar/rocksdb.git")
 endif()
 if (NOT ROCKSDB_GIT_BRANCH)
-  set(ROCKSDB_GIT_BRANCH "6.29.tikv") # should at least or newer than commit dcf2f8d56092285381be2acbf8f04b8aeeb7ad79
+  set(ROCKSDB_GIT_BRANCH "8.10-tikv") # should be at least as new as commit dcf2f8d56092285381be2acbf8f04b8aeeb7ad79
 endif()
 
 if (NOT DEFINED ROCKSDB_DIR)
@@ -92,7 +92,7 @@ if (WITH_TITAN_TESTS OR WITH_TITAN_TOOLS)
   add_subdirectory(${ROCKSDB_DIR} rocksdb EXCLUDE_FROM_ALL)
   # Check if -latomic is required or not
   if (NOT MSVC)
-    set(CMAKE_REQUIRED_FLAGS "--std=c++11")
+    set(CMAKE_REQUIRED_FLAGS "--std=c++17")
     CHECK_CXX_SOURCE_COMPILES("
 #include <atomic>
 std::atomic<uint64_t> x(0);
@@ -111,7 +111,7 @@ endif()
 
 # Check if -latomic is required or not
 if (NOT MSVC)
-  set(CMAKE_REQUIRED_FLAGS "--std=c++11")
+  set(CMAKE_REQUIRED_FLAGS "--std=c++17")
   CHECK_CXX_SOURCE_COMPILES("
 #include <atomic>
 std::atomic<uint64_t> x(0);
diff --git a/cmake/rocksdb_flags.cmake b/cmake/rocksdb_flags.cmake
index fa4199754..c2086b616 100644
--- a/cmake/rocksdb_flags.cmake
+++ b/cmake/rocksdb_flags.cmake
@@ -63,7 +63,7 @@ else()
   if(MINGW)
     set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-format")
   endif()
-  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++17")
  if(NOT CMAKE_BUILD_TYPE STREQUAL "Debug")
    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-omit-frame-pointer")
    include(CheckCXXCompilerFlag)
diff --git a/include/titan/options.h b/include/titan/options.h
index 47a920578..b02da2d96 100644
--- a/include/titan/options.h
+++ b/include/titan/options.h
@@ -3,6 +3,7 @@
 #include
 #include
 
+#include "rocksdb/advanced_cache.h"
 #include "rocksdb/options.h"
 
 namespace rocksdb {
diff --git a/src/blob_file_cache.cc b/src/blob_file_cache.cc
index ed757d6b0..5a6841f67 100644
--- a/src/blob_file_cache.cc
+++ b/src/blob_file_cache.cc
@@ -1,6 +1,7 @@
 #include "blob_file_cache.h"
 
 #include "file/filename.h"
+#include "rocksdb/advanced_cache.h"
 #include "util.h"
 
@@ -9,6 +10,9 @@ namespace titandb {
 
 namespace {
 
+const Cache::CacheItemHelper kBlobFileReaderCacheItemHelper(
+    CacheEntryRole::kBlockBasedTableReader, &DeleteCacheValue<BlobFileReader>);
+
 Slice EncodeFileNumber(const uint64_t* number) {
   return Slice(reinterpret_cast<const char*>(number), sizeof(*number));
 }
@@ -87,8 +91,8 @@ Status BlobFileCache::GetBlobFileReaderHandle(uint64_t file_number,
                              stats_);
   if (!s.ok()) return s;
 
-  cache_->Insert(cache_key, reader.release(), 1,
-                 &DeleteCacheValue<BlobFileReader>, handle);
+  cache_->Insert(cache_key, reader.release(), &kBlobFileReaderCacheItemHelper,
+                 1, handle);
   return s;
 }
diff --git a/src/blob_file_iterator.cc b/src/blob_file_iterator.cc
index 7b4d51f3f..45b9683f1 100644
--- a/src/blob_file_iterator.cc
+++ b/src/blob_file_iterator.cc
@@ -22,11 +21,10 @@ BlobFileIterator::~BlobFileIterator() {}
 bool BlobFileIterator::Init() {
   Slice slice;
   char header_buf[BlobFileHeader::kV3EncodedLength];
-  // With for_compaction=true, rate_limiter is enabled. Since BlobFileIterator
-  // is only used for GC, we always set for_compaction to true.
-  status_ =
-      file_->Read(IOOptions(), 0, BlobFileHeader::kV3EncodedLength, &slice,
-                  header_buf, nullptr /*aligned_buf*/, true /*for_compaction*/);
+  IOOptions io_options;
+  io_options.rate_limiter_priority = Env::IOPriority::IO_LOW;
+  status_ = file_->Read(io_options, 0, BlobFileHeader::kV3EncodedLength, &slice,
+                        header_buf, nullptr /*aligned_buf*/);
   if (!status_.ok()) {
     return false;
   }
@@ -39,12 +38,9 @@ bool BlobFileIterator::Init() {
   header_size_ = blob_file_header.size();
 
   char footer_buf[BlobFileFooter::kEncodedLength];
-  // With for_compaction=true, rate_limiter is enabled. Since BlobFileIterator
-  // is only used for GC, we always set for_compaction to true.
-  status_ =
-      file_->Read(IOOptions(), file_size_ - BlobFileFooter::kEncodedLength,
-                  BlobFileFooter::kEncodedLength, &slice, footer_buf,
-                  nullptr /*aligned_buf*/, true /*for_compaction*/);
+  status_ = file_->Read(io_options, file_size_ - BlobFileFooter::kEncodedLength,
+                        BlobFileFooter::kEncodedLength, &slice, footer_buf,
+                        nullptr /*aligned_buf*/);
   if (!status_.ok()) return false;
   BlobFileFooter blob_file_footer;
   status_ = blob_file_footer.DecodeFrom(&slice);
@@ -125,13 +121,14 @@ void BlobFileIterator::IterateForPrev(uint64_t offset) {
   uint64_t total_length = 0;
   FixedSlice header_buffer;
   iterate_offset_ = header_size_;
-  while (iterate_offset_ < offset) {
-    // With for_compaction=true, rate_limiter is enabled. Since
-    // BlobFileIterator is only used for GC, we always set for_compaction to
-    // true.
-    status_ = file_->Read(IOOptions(), iterate_offset_, kRecordHeaderSize,
+  IOOptions io_options;
+  // Since BlobFileIterator is only used for GC, we always set IO priority to
+  // low.
+  io_options.rate_limiter_priority = Env::IOPriority::IO_LOW;
+  for (; iterate_offset_ < offset; iterate_offset_ += total_length) {
+    status_ = file_->Read(io_options, iterate_offset_, kRecordHeaderSize,
                           &header_buffer, header_buffer.get(),
-                          nullptr /*aligned_buf*/, true /*for_compaction*/);
+                          nullptr /*aligned_buf*/);
     if (!status_.ok()) return;
     status_ = decoder_.DecodeHeader(&header_buffer);
     if (!status_.ok()) return;
@@ -148,11 +145,13 @@ void BlobFileIterator::IterateForPrev(uint64_t offset) {
 
 void BlobFileIterator::GetBlobRecord() {
   FixedSlice header_buffer;
-  // With for_compaction=true, rate_limiter is enabled. Since BlobFileIterator
-  // is only used for GC, we always set for_compaction to true.
-  status_ = file_->Read(IOOptions(), iterate_offset_, kRecordHeaderSize,
-                        &header_buffer, header_buffer.get(),
-                        nullptr /*aligned_buf*/, true /*for_compaction*/);
+  // Since BlobFileIterator is only used for GC, we always set IO priority to
+  // low.
+  IOOptions io_options;
+  io_options.rate_limiter_priority = Env::IOPriority::IO_LOW;
+  status_ =
+      file_->Read(io_options, iterate_offset_, kRecordHeaderSize,
+                  &header_buffer, header_buffer.get(), nullptr /*aligned_buf*/);
   if (!status_.ok()) return;
   status_ = decoder_.DecodeHeader(&header_buffer);
   if (!status_.ok()) return;
@@ -160,11 +159,9 @@ void BlobFileIterator::GetBlobRecord() {
   Slice record_slice;
   auto record_size = decoder_.GetRecordSize();
   buffer_.resize(record_size);
-  // With for_compaction=true, rate_limiter is enabled. Since BlobFileIterator
-  // is only used for GC, we always set for_compaction to true.
-  status_ = file_->Read(IOOptions(), iterate_offset_ + kRecordHeaderSize,
-                        record_size, &record_slice, buffer_.data(),
-                        nullptr /*aligned_buf*/, true /*for_compaction*/);
+  status_ =
+      file_->Read(io_options, iterate_offset_ + kRecordHeaderSize, record_size,
+                  &record_slice, buffer_.data(), nullptr /*aligned_buf*/);
   if (status_.ok()) {
     status_ =
         decoder_.DecodeRecord(&record_slice, &cur_blob_record_, &uncompressed_,
@@ -199,7 +196,9 @@ void BlobFileIterator::PrefetchAndGet() {
   while (readahead_end_offset_ + readahead_size_ <= min_blob_size &&
          readahead_size_ < kMaxReadaheadSize)
     readahead_size_ <<= 1;
-  file_->Prefetch(readahead_end_offset_, readahead_size_);
+  IOOptions io_options;
+  io_options.rate_limiter_priority = Env::IOPriority::IO_LOW;
+  file_->Prefetch(io_options, readahead_end_offset_, readahead_size_);
   readahead_end_offset_ += readahead_size_;
   readahead_size_ = std::min(kMaxReadaheadSize, readahead_size_ << 1);
 }
diff --git a/src/blob_file_reader.cc b/src/blob_file_reader.cc
index 0fb5d1771..6e8594429 100644
--- a/src/blob_file_reader.cc
+++ b/src/blob_file_reader.cc
@@ -142,8 +142,8 @@ Status BlobFileReader::Get(const ReadOptions& _options,
   }
   if (handle.size != static_cast<uint64_t>(blob.size())) {
     return Status::Corruption(
-        "ReadRecord actual size: " + ToString(blob.size()) +
-        " not equal to blob size " + ToString(handle.size));
+        "ReadRecord actual size: " + std::to_string(blob.size()) +
+        " not equal to blob size " + std::to_string(handle.size));
   }
 
   BlobDecoder decoder(uncompression_dict_ == nullptr
@@ -165,7 +165,9 @@ Status BlobFilePrefetcher::Get(const ReadOptions& options,
     last_offset_ = handle.offset + handle.size;
     if (handle.offset + handle.size > readahead_limit_) {
       readahead_size_ = std::max(handle.size, readahead_size_);
-      reader_->file_->Prefetch(handle.offset, readahead_size_);
+      IOOptions io_options;
+      io_options.rate_limiter_priority = Env::IOPriority::IO_HIGH;
+      reader_->file_->Prefetch(io_options, handle.offset, readahead_size_);
       readahead_limit_ = handle.offset + readahead_size_;
       readahead_size_ = std::min(kMaxReadaheadSize, readahead_size_ * 2);
     }
diff --git a/src/blob_file_size_collector_test.cc b/src/blob_file_size_collector_test.cc
index 182819b60..2f33b489b 100644
--- a/src/blob_file_size_collector_test.cc
+++ b/src/blob_file_size_collector_test.cc
@@ -78,7 +78,7 @@ class BlobFileSizeCollectorTest : public testing::Test {
   void NewTableReader(std::unique_ptr<RandomAccessFileReader>&& file,
                       std::unique_ptr<TableReader>* result) {
     TableReaderOptions options(ioptions_, prefix_extractor_, env_options_,
-                               cf_ioptions_.internal_comparator);
+                               cf_ioptions_.internal_comparator, 0);
     uint64_t file_size = 0;
     ASSERT_OK(env_->GetFileSize(file->file_name(), &file_size));
     ASSERT_TRUE(file_size > 0);
diff --git a/src/blob_format.h b/src/blob_format.h
index 1a72791e0..38ff63245 100644
--- a/src/blob_format.h
+++ b/src/blob_format.h
@@ -64,7 +64,7 @@ class BlobEncoder {
   BlobEncoder(CompressionType compression, CompressionOptions compression_opt,
               const CompressionDict* compression_dict)
       : compression_opt_(compression_opt),
-        compression_ctx_(compression),
+        compression_ctx_(compression, compression_opt_),
         compression_dict_(compression_dict),
         compression_info_(new CompressionInfo(
             compression_opt_, compression_ctx_, *compression_dict_, compression,
@@ -347,7 +347,7 @@ struct BlobFileHeader {
     if (ver != BlobFileHeader::kVersion1 && ver != BlobFileHeader::kVersion2 &&
         ver != BlobFileHeader::kVersion3) {
       return Status::InvalidArgument("unrecognized blob file version " +
-                                     ToString(ver));
+                                     std::to_string(ver));
     }
     return Status::OK();
   }
diff --git a/src/blob_gc_job_test.cc b/src/blob_gc_job_test.cc
index 8fabce4e9..79982122d 100644
--- a/src/blob_gc_job_test.cc
+++ b/src/blob_gc_job_test.cc
@@ -33,16 +33,22 @@ class BlobGCJobTest : public testing::Test {
   TitanOptions options_;
   port::Mutex* mutex_;
 
-  BlobGCJobTest() : dbname_(test::TmpDir()) {
+  void ResetOptions() {
     options_.dirname = dbname_ + "/titandb";
     options_.create_if_missing = true;
     options_.disable_background_gc = true;
     options_.min_blob_size = 0;
     options_.disable_auto_compactions = true;
+    options_.level_compaction_dynamic_level_bytes = false;
     options_.env->CreateDirIfMissing(dbname_);
     options_.env->CreateDirIfMissing(options_.dirname);
   }
-  ~BlobGCJobTest() { Close(); }
+
+  BlobGCJobTest() : dbname_(test::TmpDir()) { ResetOptions(); }
+  ~BlobGCJobTest() {
+    Close();
+    ResetOptions();
+  }
 
   void DisableMergeSmall() { options_.merge_small_file_threshold = 0; }
diff --git a/src/blob_storage.cc b/src/blob_storage.cc
index b314c047d..95cd8ddb7 100644
--- a/src/blob_storage.cc
+++ b/src/blob_storage.cc
@@ -57,9 +57,9 @@ Status BlobStorage::Get(const ReadOptions& options, const BlobIndex& index,
   if (blob_cache_ && options.fill_cache) {
     Cache::Handle* cache_handle = nullptr;
     auto cache_value = new OwnedSlice(std::move(blob));
-    blob_cache_->Insert(
-        cache_key, cache_value, cache_value->size() + sizeof(*cache_value),
-        &DeleteCacheValue<OwnedSlice>, &cache_handle, Cache::Priority::BOTTOM);
+    blob_cache_->Insert(cache_key, cache_value, &kBlobValueCacheItemHelper,
+                        cache_value->size() + sizeof(*cache_value),
+                        &cache_handle, Cache::Priority::BOTTOM);
     value->PinSlice(record->value, UnrefCacheHandle, blob_cache_.get(),
                     cache_handle);
   } else {
diff --git a/src/db_impl.cc b/src/db_impl.cc
index eb764e457..81791c7e3 100644
--- a/src/db_impl.cc
+++ b/src/db_impl.cc
@@ -785,9 +785,9 @@ Iterator* TitanDBImpl::NewIteratorImpl(
   }
 
   std::unique_ptr<ArenaWrappedDBIter> iter(db_impl_->NewIteratorImpl(
-      options, cfd, options.snapshot->GetSequenceNumber(),
-      nullptr /*read_callback*/, true /*expose_blob_index*/,
-      true /*allow_refresh*/));
+      options, cfd, cfd->GetReferencedSuperVersion(db_impl_),
+      options.snapshot->GetSequenceNumber(), nullptr /*read_callback*/,
+      true /*expose_blob_index*/, true /*allow_refresh*/));
   return new TitanDBIterator(options, storage.get(), snapshot, std::move(iter),
                              env_->GetSystemClock().get(), stats_.get(),
                              db_options_.info_log.get());
@@ -936,8 +936,8 @@ Status TitanDBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
                                     file_meta->fd.GetPathId());
       if (props.count(fname) == 0) {
         std::shared_ptr<const TableProperties> table_properties;
-        Status s =
-            version->GetTableProperties(&table_properties, file_meta, &fname);
+        Status s = version->GetTableProperties(
+            ReadOptions(), &table_properties, file_meta, &fname);
         if (s.ok() && table_properties) {
           props.insert({fname, table_properties});
         } else {
diff --git a/src/db_impl_gc.cc b/src/db_impl_gc.cc
index 0a5e6bf26..aeb97dfc4 100644
--- a/src/db_impl_gc.cc
+++ b/src/db_impl_gc.cc
@@ -103,7 +103,7 @@ Status TitanDBImpl::AsyncInitializeGC(
                      cf_handle->GetName().c_str());
       TablePropertiesCollection collection;
       // this operation may be slow
-      s = cf.second->GetPropertiesOfAllTables(&collection);
+      s = cf.second->GetPropertiesOfAllTables(ReadOptions(), &collection);
       unref(cf.second);
       if (!s.ok()) {
         MutexLock l(&mutex_);
diff --git a/src/db_iter.h b/src/db_iter.h
index e0364968a..1177ebfb2 100644
--- a/src/db_iter.h
+++ b/src/db_iter.h
@@ -179,10 +179,9 @@ class TitanDBIterator : public Iterator {
       if (blob_cache && options_.fill_cache) {
         Cache::Handle* cache_handle = nullptr;
         auto cache_value = new OwnedSlice(std::move(blob));
-        blob_cache->Insert(cache_key, cache_value,
+        blob_cache->Insert(cache_key, cache_value, &kBlobValueCacheItemHelper,
                            cache_value->size() + sizeof(*cache_value),
-                           &DeleteCacheValue<OwnedSlice>, &cache_handle,
-                           Cache::Priority::BOTTOM);
+                           &cache_handle, Cache::Priority::BOTTOM);
         buffer_.PinSlice(*cache_value, UnrefCacheHandle, blob_cache,
                          cache_handle);
       } else {
diff --git a/src/edit_collector.h b/src/edit_collector.h
index e05f8dac5..d26a2be7c 100644
--- a/src/edit_collector.h
+++ b/src/edit_collector.h
@@ -46,10 +46,10 @@ class EditCollector {
 
     if (edit.has_next_file_number_) {
       if (edit.next_file_number_ < next_file_number_) {
-        status_ =
-            Status::Corruption("Edit has a smaller next file number " +
-                               ToString(edit.next_file_number_) +
-                               " than current " + ToString(next_file_number_));
+        status_ = Status::Corruption("Edit has a smaller next file number " +
+                                     std::to_string(edit.next_file_number_) +
+                                     " than current " +
+                                     std::to_string(next_file_number_));
         return status_;
       }
       next_file_number_ = edit.next_file_number_;
@@ -138,7 +138,7 @@ class EditCollector {
                       "blob file %" PRIu64 " has been deleted twice\n", number);
       if (paranoid_check_) {
-        return Status::Corruption("Blob file " + ToString(number) +
+        return Status::Corruption("Blob file " + std::to_string(number) +
                                   " has been added twice");
       } else {
         return Status::OK();
@@ -154,7 +154,7 @@ class EditCollector {
                       "blob file %" PRIu64 " has been deleted twice\n", number);
       if (paranoid_check_) {
-        return Status::Corruption("Blob file " + ToString(number) +
+        return Status::Corruption("Blob file " + std::to_string(number) +
                                   " has been deleted twice");
       } else {
         return Status::OK();
@@ -173,14 +173,14 @@ class EditCollector {
         TITAN_LOG_ERROR(storage->db_options().info_log,
                         "blob file %" PRIu64 " has been deleted before\n",
                         number);
-        return Status::Corruption("Blob file " + ToString(number) +
+        return Status::Corruption("Blob file " + std::to_string(number) +
                                   " has been deleted before");
       } else {
         TITAN_LOG_ERROR(storage->db_options().info_log,
                         "blob file %" PRIu64 " has been added before\n",
                         number);
-        return Status::Corruption("Blob file " + ToString(number) +
+        return Status::Corruption("Blob file " + std::to_string(number) +
                                   " has been added before");
       }
     }
@@ -196,14 +196,14 @@ class EditCollector {
         TITAN_LOG_ERROR(storage->db_options().info_log,
                         "blob file %" PRIu64 " doesn't exist before\n",
                         number);
-        return Status::Corruption("Blob file " + ToString(number) +
+        return Status::Corruption("Blob file " + std::to_string(number) +
                                   " doesn't exist before");
       } else if (blob->is_obsolete()) {
         TITAN_LOG_ERROR(storage->db_options().info_log,
                         "blob file %" PRIu64 " has been deleted already\n",
                         number);
         if (paranoid_check_) {
-          return Status::Corruption("Blob file " + ToString(number) +
+          return Status::Corruption("Blob file " + std::to_string(number) +
                                     " has been deleted already");
         }
       }
diff --git a/src/table_builder.cc b/src/table_builder.cc
index 9232a9df2..ba28686e4 100644
--- a/src/table_builder.cc
+++ b/src/table_builder.cc
@@ -6,7 +6,7 @@
 
 #include
 
-#include "monitoring/statistics.h"
+#include "monitoring/statistics_impl.h"
 #include "titan_logging.h"
 
@@ -37,10 +37,10 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
   uint64_t prev_bytes_written = 0;
   SavePrevIOBytes(&prev_bytes_read, &prev_bytes_written);
 
+  Slice copy = value;
   if (ikey.type == kTypeBlobIndex &&
       cf_options_.blob_run_mode == TitanBlobRunMode::kFallback) {
     // we ingest value from blob file
-    Slice copy = value;
     BlobIndex index;
     status_ = index.DecodeFrom(&copy);
     if (!ok()) {
@@ -71,17 +71,16 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
              cf_options_.blob_run_mode == TitanBlobRunMode::kNormal) {
     bool is_small_kv = value.size() < cf_options_.min_blob_size;
     if (is_small_kv) {
-      AddBase(key, ikey, value);
+      AddBase(key, ikey, copy);
     } else {
       // We write to blob file and insert index
-      AddBlob(ikey, value);
+      AddBlob(ikey, copy);
     }
   } else if (ikey.type == kTypeBlobIndex && cf_options_.level_merge &&
              target_level_ >= merge_level_ &&
             cf_options_.blob_run_mode == TitanBlobRunMode::kNormal) {
     // we merge value to new blob file
     BlobIndex index;
-    Slice copy = value;
     status_ = index.DecodeFrom(&copy);
     if (!ok()) {
       return;
@@ -108,11 +107,12 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
                        index.file_number, get_status.ToString().c_str());
       }
     }
-    AddBase(key, ikey, value);
+    Slice another_copy = value;
+    AddBase(key, ikey, another_copy);
   } else {
     // Mainly processing kTypeMerge and kTypeBlobIndex in both flushing and
     // compaction.
-    AddBase(key, ikey, value);
+    AddBase(key, ikey, copy);
   }
 }
diff --git a/src/table_builder_test.cc b/src/table_builder_test.cc
index bb361a9b5..1c6956828 100644
--- a/src/table_builder_test.cc
+++ b/src/table_builder_test.cc
@@ -240,7 +240,7 @@ class TableBuilderTest : public testing::Test {
     uint64_t file_size = 0;
     ASSERT_OK(env_->GetFileSize(file->file_name(), &file_size));
     TableReaderOptions options(ioptions_, prefix_extractor_, env_options_,
-                               cf_ioptions_.internal_comparator);
+                               cf_ioptions_.internal_comparator, 0);
     options.cur_file_num = file_number;
     ASSERT_OK(table_factory_->NewTableReader(options, std::move(file),
                                              file_size, result));
@@ -261,7 +261,7 @@ class TableBuilderTest : public testing::Test {
         ioptions_, cf_moptions_, cf_ioptions_.internal_comparator,
         &collectors_, kNoCompression, compression_opts,
         0 /*column_family_id*/, kDefaultColumnFamilyName, target_level, false,
-        TableFileCreationReason::kMisc, 0, 0, 0, "", "", 0, file_number);
+        TableFileCreationReason::kMisc, 0, 0, "", "", 0, file_number);
     result->reset(table_factory_->NewTableBuilder(options, file));
   }
diff --git a/src/titan_checkpoint_impl.cc b/src/titan_checkpoint_impl.cc
index 1c8d91c35..3ec4e2945 100644
--- a/src/titan_checkpoint_impl.cc
+++ b/src/titan_checkpoint_impl.cc
@@ -6,6 +6,7 @@
 #include "file/file_util.h"
 #include "file/filename.h"
 #include "port/port.h"
+#include "rocksdb/advanced_options.h"
 #include "rocksdb/transaction_log.h"
 #include "test_util/sync_point.h"
 #include "utilities/checkpoint/checkpoint_impl.h"
@@ -159,7 +160,8 @@ Status TitanCheckpointImpl::CreateCheckpoint(
           TITAN_LOG_INFO(titandb_options.info_log, "Copying %s", fname.c_str());
           return CopyFile(db_->GetFileSystem(), src_dirname + fname,
                           full_private_path + fname, size_limit_bytes,
-                          titandb_options.use_fsync);
+                          titandb_options.use_fsync, nullptr,
+                          Temperature::kUnknown);
         } /* copy_file_cb */,
         [&](const std::string& fname, const std::string& contents, FileType) {
           TITAN_LOG_INFO(titandb_options.info_log, "Creating %s",
diff --git a/src/titan_checkpoint_test.cc b/src/titan_checkpoint_test.cc
index 28274a428..bcef258cf 100644
--- a/src/titan_checkpoint_test.cc
+++ b/src/titan_checkpoint_test.cc
@@ -386,8 +386,8 @@ TEST_F(CheckpointTest, CheckpointCF) {
   CreateAndReopenWithCF({"one", "two", "three", "four", "five", "six"},
                         options);
   rocksdb::SyncPoint::GetInstance()->LoadDependency(
-      {{"CheckpointTest::CheckpointCF:2", "DBImpl::GetLiveFiles:2"},
-       {"DBImpl::GetLiveFiles:1", "CheckpointTest::CheckpointCF:1"}});
+      {{"CheckpointTest::CheckpointCF:2", "DBImpl::FlushAllColumnFamilies:2"},
+       {"DBImpl::FlushAllColumnFamilies:1", "CheckpointTest::CheckpointCF:1"}});
 
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
diff --git a/src/titan_db_test.cc b/src/titan_db_test.cc
index e4dc1f287..784a06163 100644
--- a/src/titan_db_test.cc
+++ b/src/titan_db_test.cc
@@ -3,7 +3,7 @@
 
 #include "db/db_impl/db_impl.h"
 #include "file/filename.h"
-#include "monitoring/statistics.h"
+#include "monitoring/statistics_impl.h"
 #include "options/cf_options.h"
 #include "port/port.h"
 #include "rocksdb/utilities/debug.h"
@@ -39,6 +39,7 @@ class TitanDBTest : public testing::Test {
     options_.min_blob_size = 32;
     options_.min_gc_batch_size = 1;
     options_.disable_background_gc = true;
+    options_.disable_auto_compactions = true;
     options_.blob_file_compression = CompressionType::kLZ4Compression;
     options_.statistics = CreateDBStatistics();
     DeleteDir(env_, options_.dirname);
diff --git a/src/titan_stats.cc b/src/titan_stats.cc
index 973bcf9f2..1ba875898 100644
--- a/src/titan_stats.cc
+++ b/src/titan_stats.cc
@@ -4,7 +4,6 @@
 #include
 #include
 
-#include "monitoring/statistics.h"
 #include "monitoring/statistics_impl.h"
 
 #include "blob_file_set.h"
diff --git a/src/titan_stats.h b/src/titan_stats.h
index 41526c47e..1390f0136 100644
--- a/src/titan_stats.h
+++ b/src/titan_stats.h
@@ -8,7 +8,7 @@
 
 #include "logging/log_buffer.h"
 #include "monitoring/histogram.h"
-#include "monitoring/statistics.h"
+#include "monitoring/statistics_impl.h"
 #include "rocksdb/iostats_context.h"
 #include "rocksdb/statistics.h"
 #include "util/string_util.h"
diff --git a/src/util.h b/src/util.h
index cef2e5f8b..62d3b8a46 100644
--- a/src/util.h
+++ b/src/util.h
@@ -71,10 +71,13 @@ Status Uncompress(const UncompressionInfo& info, const Slice& input,
 void UnrefCacheHandle(void* cache, void* handle);
 
 template <class T>
-void DeleteCacheValue(const Slice&, void* value) {
+void DeleteCacheValue(void* value, MemoryAllocator*) {
   delete reinterpret_cast<T*>(value);
 }
 
+const Cache::CacheItemHelper kBlobValueCacheItemHelper(
+    CacheEntryRole::kBlobValue, &DeleteCacheValue<OwnedSlice>);
+
 Status SyncTitanManifest(TitanStats* stats,
                          const ImmutableDBOptions* db_options,
                          WritableFileWriter* file);
diff --git a/src/util_test.cc b/src/util_test.cc
index 0e30cbdbc..17a0bd0b7 100644
--- a/src/util_test.cc
+++ b/src/util_test.cc
@@ -12,7 +12,7 @@ TEST(UtilTest, Compression) {
   for (auto compression :
        {kSnappyCompression, kZlibCompression, kLZ4Compression, kZSTD}) {
     CompressionOptions compression_opt;
-    CompressionContext compression_ctx(compression);
+    CompressionContext compression_ctx(compression, compression_opt);
     CompressionInfo compression_info(
         compression_opt, compression_ctx, CompressionDict::GetEmptyDict(),
         compression, 0 /* sample_for_compression */);
diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index e7b0292d3..7336a14c1 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -23,6 +23,7 @@
 #include
 #include
 #include
+#include <limits>
 #include
 #include
 
@@ -36,9 +37,8 @@
 #include "db/db_impl/db_impl.h"
 #include "db/malloc_stats.h"
 #include "db/version_set.h"
-#include "hdfs/env_hdfs.h"
 #include "monitoring/histogram.h"
-#include "monitoring/statistics.h"
+#include "monitoring/statistics_impl.h"
 #include "options/cf_options.h"
 #include "port/port.h"
 #include "port/stack_trace.h"
@@ -500,9 +500,6 @@
 DEFINE_int32(file_opening_threads, rocksdb::Options().max_file_opening_threads,
             "If open_files is set to -1, this option set the number of "
"threads that will be used to open files during DB::Open()"); -DEFINE_bool(new_table_reader_for_compaction_inputs, true, - "If true, uses a separate file handle for compaction inputs"); - DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size"); DEFINE_int32(random_access_max_buffer_size, 1024 * 1024, @@ -940,10 +937,6 @@ static bool ValidateRateLimit(const char* flagname, double value) { } return true; } -DEFINE_double(soft_rate_limit, 0.0, "DEPRECATED"); - -DEFINE_double(hard_rate_limit, 0.0, "DEPRECATED"); - DEFINE_uint64(soft_pending_compaction_bytes_limit, 64ull * 1024 * 1024 * 1024, "Slowdown writes if pending compaction bytes exceed this number"); @@ -1253,12 +1246,6 @@ DEFINE_bool(report_file_operations, false, "operations"); DEFINE_int32(readahead_size, 0, "Iterator readahead size"); -static const bool FLAGS_soft_rate_limit_dummy __attribute__((__unused__)) = - RegisterFlagValidator(&FLAGS_soft_rate_limit, &ValidateRateLimit); - -static const bool FLAGS_hard_rate_limit_dummy __attribute__((__unused__)) = - RegisterFlagValidator(&FLAGS_hard_rate_limit, &ValidateRateLimit); - static const bool FLAGS_prefix_size_dummy __attribute__((__unused__)) = RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize); @@ -1619,9 +1606,9 @@ class ReporterAgent { auto secs_elapsed = (env_->NowMicros() - time_started + kMicrosInSecond / 2) / kMicrosInSecond; - std::string report = ToString(secs_elapsed) + "," + - ToString(total_ops_done_snapshot - last_report_) + - "\n"; + std::string report = + std::to_string(secs_elapsed) + "," + + std::to_string(total_ops_done_snapshot - last_report_) + "\n"; auto s = report_file_->Append(report); if (s.ok()) { s = report_file_->Flush(); @@ -1867,7 +1854,7 @@ class Stats { if (db->GetProperty( db_with_cfh->cfh[i], "rocksdb.aggregated-table-properties-at-level" + - ToString(level), + std::to_string(level), &stats)) { if (stats.find("# entries=0") == std::string::npos) { fprintf(stderr, "Level[%d]: %s\n", level, @@ -1885,7 +1872,7 @@ class Stats { for (int level = 0; level < FLAGS_num_levels; ++level) { if (db->GetProperty( "rocksdb.aggregated-table-properties-at-level" + - ToString(level), + std::to_string(level), &stats)) { if (stats.find("# entries=0") == std::string::npos) { fprintf(stderr, "Level[%d]: %s\n", level, stats.c_str()); @@ -2314,7 +2301,7 @@ class Benchmark { std::string input_str(len, 'y'); std::string compressed; CompressionOptions opts; - CompressionContext context(FLAGS_compression_type_e); + CompressionContext context(FLAGS_compression_type_e, opts); CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), FLAGS_compression_type_e, FLAGS_sample_for_compression); @@ -2372,9 +2359,9 @@ class Benchmark { Slice val = TrimSpace(Slice(sep + 1)); if (key == "model name") { ++num_cpus; - cpu_type = val.ToString(); + cpu_type = val.std::to_string(); } else if (key == "cache size") { - cache_size = val.ToString(); + cache_size = val.std::to_string(); } } fclose(cpuinfo); @@ -2628,7 +2615,7 @@ class Benchmark { } #endif } - return base_name + ToString(id); + return base_name + std::to_string(id); } void VerifyDBFromDB(std::string& truth_db_name) { @@ -3207,7 +3194,7 @@ class Benchmark { void Crc32c(ThreadState* thread) { // Checksum about 500MB of data total const int size = FLAGS_block_size; // use --block_size option for db_bench - std::string labels = "(" + ToString(FLAGS_block_size) + " per op)"; + std::string labels = "(" + std::to_string(FLAGS_block_size) + " per op)"; const char* label = labels.c_str(); std::string 
data(size, 'x'); @@ -3268,7 +3255,7 @@ class Benchmark { bool ok = true; std::string compressed; CompressionOptions opts; - CompressionContext context(FLAGS_compression_type_e); + CompressionContext context(FLAGS_compression_type_e, opts); CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), FLAGS_compression_type_e, FLAGS_sample_for_compression); @@ -3297,8 +3284,9 @@ class Benchmark { Slice input = gen.Generate(FLAGS_block_size); std::string compressed; - CompressionContext compression_ctx(FLAGS_compression_type_e); CompressionOptions compression_opts; + CompressionContext compression_ctx(FLAGS_compression_type_e, + compression_opts); CompressionInfo compression_info( compression_opts, compression_ctx, CompressionDict::GetEmptyDict(), FLAGS_compression_type_e, FLAGS_sample_for_compression); @@ -3337,8 +3325,8 @@ class Benchmark { DBOptions db_opts; std::vector cf_descs; if (FLAGS_options_file != "") { - auto s = LoadOptionsFromFile(FLAGS_options_file, Env::Default(), &db_opts, - &cf_descs); + auto s = LoadOptionsFromFile(ConfigOptions(), FLAGS_options_file, + &db_opts, &cf_descs); if (s.ok()) { *opts = Options(db_opts, cf_descs[0].options); return true; @@ -3411,8 +3399,6 @@ class Benchmark { } options.bloom_locality = FLAGS_bloom_locality; options.max_file_opening_threads = FLAGS_file_opening_threads; - options.new_table_reader_for_compaction_inputs = - FLAGS_new_table_reader_for_compaction_inputs; options.compaction_readahead_size = FLAGS_compaction_readahead_size; options.random_access_max_buffer_size = FLAGS_random_access_max_buffer_size; options.writable_file_max_buffer_size = FLAGS_writable_file_max_buffer_size; @@ -3537,7 +3523,6 @@ class Benchmark { true; } block_based_options.block_cache = cache_; - block_based_options.block_cache_compressed = compressed_cache_; block_based_options.block_size = FLAGS_block_size; block_based_options.block_restart_interval = FLAGS_block_restart_interval; block_based_options.index_block_restart_interval = @@ -3632,8 +3617,6 @@ class Benchmark { options.compression_per_level[i] = FLAGS_compression_type_e; } } - options.soft_rate_limit = FLAGS_soft_rate_limit; - options.hard_rate_limit = FLAGS_hard_rate_limit; options.soft_pending_compaction_bytes_limit = FLAGS_soft_pending_compaction_bytes_limit; options.hard_pending_compaction_bytes_limit = @@ -3649,8 +3632,6 @@ class Benchmark { options.unordered_write = FLAGS_unordered_write; options.write_thread_max_yield_usec = FLAGS_write_thread_max_yield_usec; options.write_thread_slow_yield_usec = FLAGS_write_thread_slow_yield_usec; - options.rate_limit_delay_max_milliseconds = - FLAGS_rate_limit_delay_max_milliseconds; options.table_cache_numshardbits = FLAGS_table_cache_numshardbits; options.max_compaction_bytes = FLAGS_max_compaction_bytes; options.disable_auto_compactions = FLAGS_disable_auto_compactions; @@ -3772,13 +3753,6 @@ class Benchmark { } if (FLAGS_rate_limiter_bytes_per_sec > 0) { - if (FLAGS_rate_limit_bg_reads && - !FLAGS_new_table_reader_for_compaction_inputs) { - fprintf(stderr, - "rate limit compaction reads must have " - "new_table_reader_for_compaction_inputs set\n"); - exit(1); - } options.rate_limiter.reset(NewGenericRateLimiter( FLAGS_rate_limiter_bytes_per_sec, 100 * 1000 /* refill_period_us */, 10 /* fairness */, @@ -6290,7 +6264,8 @@ class Benchmark { } std::unique_ptr shi; - Status s = db->GetStatsHistory(0, port::kMaxUint64, &shi); + Status s = + db->GetStatsHistory(0, std::numeric_limits::max(), &shi); if (!s.ok()) { fprintf(stdout, "%s\n", s.ToString().c_str()); 
       return;
@@ -6428,7 +6403,8 @@ int db_bench_tool(int argc, char** argv) {
     fprintf(stderr, "Cannot provide both --hdfs and --env_uri.\n");
     exit(1);
   } else if (!FLAGS_env_uri.empty()) {
-    Status s = Env::LoadEnv(FLAGS_env_uri, &FLAGS_env);
+    Status s =
+        Env::CreateFromString(ConfigOptions(), FLAGS_env_uri, &FLAGS_env);
     if (FLAGS_env == nullptr) {
       fprintf(stderr, "No Env registered for URI: %s\n", FLAGS_env_uri.c_str());
       exit(1);
@@ -6442,10 +6418,6 @@
     exit(1);
   }
 
-  if (!FLAGS_hdfs.empty()) {
-    FLAGS_env = new rocksdb::HdfsEnv(FLAGS_hdfs);
-  }
-
   if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NONE"))
     FLAGS_compaction_fadvice_e = rocksdb::Options::NONE;
   else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NORMAL"))
diff --git a/tools/titandb_stress.cc b/tools/titandb_stress.cc
index 168572005..215d7635b 100644
--- a/tools/titandb_stress.cc
+++ b/tools/titandb_stress.cc
@@ -43,7 +43,6 @@ int main() {
 
 #include "db/db_impl/db_impl.h"
 #include "db/version_set.h"
-#include "hdfs/env_hdfs.h"
 #include "logging/logging.h"
 #include "monitoring/histogram.h"
 #include "options/options_helper.h"
@@ -54,7 +53,7 @@ int main() {
 #include "rocksdb/slice.h"
 #include "rocksdb/slice_transform.h"
 #include "rocksdb/statistics.h"
-#include "rocksdb/utilities/backupable_db.h"
+#include "rocksdb/utilities/backup_engine.h"
 #include "rocksdb/utilities/checkpoint.h"
 #include "rocksdb/utilities/db_ttl.h"
 #include "rocksdb/utilities/debug.h"
@@ -1478,20 +1477,20 @@ class StressTest {
     std::unordered_map<std::string, std::vector<std::string> > options_tbl = {
         {"write_buffer_size",
-         {ToString(options_.write_buffer_size),
-          ToString(options_.write_buffer_size * 2),
-          ToString(options_.write_buffer_size * 4)}},
+         {std::to_string(options_.write_buffer_size),
+          std::to_string(options_.write_buffer_size * 2),
+          std::to_string(options_.write_buffer_size * 4)}},
         {"max_write_buffer_number",
-         {ToString(options_.max_write_buffer_number),
-          ToString(options_.max_write_buffer_number * 2),
-          ToString(options_.max_write_buffer_number * 4)}},
+         {std::to_string(options_.max_write_buffer_number),
+          std::to_string(options_.max_write_buffer_number * 2),
+          std::to_string(options_.max_write_buffer_number * 4)}},
         {"arena_block_size",
          {
-             ToString(options_.arena_block_size),
-             ToString(options_.write_buffer_size / 4),
-             ToString(options_.write_buffer_size / 8),
+             std::to_string(options_.arena_block_size),
+             std::to_string(options_.write_buffer_size / 4),
+             std::to_string(options_.write_buffer_size / 8),
          }},
-        {"memtable_huge_page_size", {"0", ToString(2 * 1024 * 1024)}},
+        {"memtable_huge_page_size", {"0", std::to_string(2 * 1024 * 1024)}},
         {"max_successive_merges", {"0", "2", "4"}},
         {"inplace_update_num_locks", {"100", "200", "300"}},
         // TODO(ljin): enable test for this option
         {"hard_rate_limit", {"0", "1.1", "2.0"}},
@@ -1500,49 +1499,49 @@ class StressTest {
         {"level0_file_num_compaction_trigger",
          {
-             ToString(options_.level0_file_num_compaction_trigger),
-             ToString(options_.level0_file_num_compaction_trigger + 2),
-             ToString(options_.level0_file_num_compaction_trigger + 4),
+             std::to_string(options_.level0_file_num_compaction_trigger),
+             std::to_string(options_.level0_file_num_compaction_trigger + 2),
+             std::to_string(options_.level0_file_num_compaction_trigger + 4),
          }},
         {"level0_slowdown_writes_trigger",
          {
-             ToString(options_.level0_slowdown_writes_trigger),
-             ToString(options_.level0_slowdown_writes_trigger + 2),
-             ToString(options_.level0_slowdown_writes_trigger + 4),
+             std::to_string(options_.level0_slowdown_writes_trigger),
+             std::to_string(options_.level0_slowdown_writes_trigger + 2),
+             std::to_string(options_.level0_slowdown_writes_trigger + 4),
          }},
         {"level0_stop_writes_trigger",
          {
-             ToString(options_.level0_stop_writes_trigger),
-             ToString(options_.level0_stop_writes_trigger + 2),
-             ToString(options_.level0_stop_writes_trigger + 4),
+             std::to_string(options_.level0_stop_writes_trigger),
+             std::to_string(options_.level0_stop_writes_trigger + 2),
+             std::to_string(options_.level0_stop_writes_trigger + 4),
          }},
         {"max_compaction_bytes",
          {
-             ToString(options_.target_file_size_base * 5),
-             ToString(options_.target_file_size_base * 15),
-             ToString(options_.target_file_size_base * 100),
+             std::to_string(options_.target_file_size_base * 5),
+             std::to_string(options_.target_file_size_base * 15),
+             std::to_string(options_.target_file_size_base * 100),
          }},
         {"target_file_size_base",
          {
-             ToString(options_.target_file_size_base),
-             ToString(options_.target_file_size_base * 2),
-             ToString(options_.target_file_size_base * 4),
+             std::to_string(options_.target_file_size_base),
+             std::to_string(options_.target_file_size_base * 2),
+             std::to_string(options_.target_file_size_base * 4),
          }},
         {"target_file_size_multiplier",
          {
-             ToString(options_.target_file_size_multiplier),
+             std::to_string(options_.target_file_size_multiplier),
              "1",
              "2",
          }},
         {"max_bytes_for_level_base",
          {
-             ToString(options_.max_bytes_for_level_base / 2),
-             ToString(options_.max_bytes_for_level_base),
-             ToString(options_.max_bytes_for_level_base * 2),
+             std::to_string(options_.max_bytes_for_level_base / 2),
+             std::to_string(options_.max_bytes_for_level_base),
+             std::to_string(options_.max_bytes_for_level_base * 2),
          }},
         {"max_bytes_for_level_multiplier",
          {
-             ToString(options_.max_bytes_for_level_multiplier),
+             std::to_string(options_.max_bytes_for_level_multiplier),
              "1",
              "2",
          }},
@@ -1863,7 +1862,8 @@ class StressTest {
     if (snap_state.status != s) {
       return Status::Corruption(
           "The snapshot gave inconsistent results for key " +
-          ToString(Hash(snap_state.key.c_str(), snap_state.key.size(), 0)) +
+          std::to_string(
+              Hash(snap_state.key.c_str(), snap_state.key.size(), 0)) +
          " in cf " + cf->GetName() + ": (" + snap_state.status.ToString() +
          ") vs. (" + s.ToString() + ")");
     }
@@ -2162,8 +2162,9 @@ class StressTest {
     if (FLAGS_compact_range_one_in > 0 &&
         thread->rand.Uniform(FLAGS_compact_range_one_in) == 0) {
       int64_t end_key_num;
-      if (port::kMaxInt64 - rand_key < FLAGS_compact_range_width) {
-        end_key_num = port::kMaxInt64;
+      if (std::numeric_limits<int64_t>::max() - rand_key <
+          FLAGS_compact_range_width) {
+        end_key_num = std::numeric_limits<int64_t>::max();
       } else {
         end_key_num = FLAGS_compact_range_width + rand_key;
       }
@@ -2473,18 +2474,21 @@
     // dropped while the locks for `rand_keys` are held. So we should not have
     // to worry about accessing those column families throughout this function.
     assert(rand_column_families.size() == rand_keys.size());
-    std::string backup_dir = FLAGS_db + "/.backup" + ToString(thread->tid);
-    std::string restore_dir = FLAGS_db + "/.restore" + ToString(thread->tid);
-    BackupableDBOptions backup_opts(backup_dir);
+    std::string backup_dir =
+        FLAGS_db + "/.backup" + std::to_string(thread->tid);
+    std::string restore_dir =
+        FLAGS_db + "/.restore" + std::to_string(thread->tid);
+    BackupEngineOptions backup_engine_opts(backup_dir);
     BackupEngine* backup_engine = nullptr;
-    Status s = BackupEngine::Open(FLAGS_env, backup_opts, &backup_engine);
+    Status s =
+        BackupEngine::Open(FLAGS_env, backup_engine_opts, &backup_engine);
     if (s.ok()) {
       s = backup_engine->CreateNewBackup(db_);
     }
     if (s.ok()) {
       delete backup_engine;
       backup_engine = nullptr;
-      s = BackupEngine::Open(FLAGS_env, backup_opts, &backup_engine);
+      s = BackupEngine::Open(FLAGS_env, backup_engine_opts, &backup_engine);
     }
     if (s.ok()) {
       s = backup_engine->RestoreDBFromLatestBackup(restore_dir /* db_dir */,
@@ -2561,7 +2565,7 @@ class StressTest {
     // to worry about accessing those column families throughout this function.
     assert(rand_column_families.size() == rand_keys.size());
     std::string checkpoint_dir =
-        FLAGS_db + "/.checkpoint" + ToString(thread->tid);
+        FLAGS_db + "/.checkpoint" + std::to_string(thread->tid);
     DestroyDB(checkpoint_dir, Options());
     Checkpoint* checkpoint = nullptr;
     Status s = Checkpoint::Create(db_, &checkpoint);
@@ -2745,7 +2749,6 @@ class StressTest {
       block_based_options.block_cache = cache_;
       block_based_options.cache_index_and_filter_blocks =
           FLAGS_cache_index_and_filter_blocks;
-      block_based_options.block_cache_compressed = compressed_cache_;
       block_based_options.checksum = FLAGS_checksum_type_e;
       block_based_options.block_size = FLAGS_block_size;
       block_based_options.format_version =
@@ -2824,7 +2827,7 @@ class StressTest {
 #else
     DBOptions db_options;
     std::vector<ColumnFamilyDescriptor> cf_descriptors;
-    Status s = LoadOptionsFromFile(FLAGS_options_file, Env::Default(),
+    Status s = LoadOptionsFromFile(ConfigOptions(), FLAGS_options_file,
                                    &db_options, &cf_descriptors);
     if (!s.ok()) {
       fprintf(stderr, "Unable to load options file %s --- %s\n",
@@ -2841,9 +2844,6 @@
           10 /* fairness */,
          FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
                                    : RateLimiter::Mode::kWritesOnly));
-      if (FLAGS_rate_limit_bg_reads) {
-        options_.new_table_reader_for_compaction_inputs = true;
-      }
     }
 
     if (FLAGS_prefix_size == 0 && FLAGS_rep_factory == kHashSkipList) {
@@ -2929,7 +2929,7 @@ class StressTest {
       cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
     }
     while (cf_descriptors.size() < (size_t)FLAGS_column_families) {
-      std::string name = ToString(new_column_family_name_.load());
+      std::string name = std::to_string(new_column_family_name_.load());
       new_column_family_name_++;
       cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
       column_family_names_.push_back(name);
@@ -3215,7 +3215,8 @@ class NonBatchedOpsStressTest : public StressTest {
     if (thread->rand.OneIn(FLAGS_clear_column_family_one_in)) {
       // drop column family and then create it again (can't drop default)
       int cf = thread->rand.Next() % (FLAGS_column_families - 1) + 1;
-      std::string new_name = ToString(new_column_family_name_.fetch_add(1));
+      std::string new_name =
+          std::to_string(new_column_family_name_.fetch_add(1));
       {
         MutexLock l(thread->shared->GetMutex());
         fprintf(
@@ -3592,7 +3593,7 @@ class NonBatchedOpsStressTest : public StressTest {
       ThreadState* thread, const std::vector<int>& rand_column_families,
       const std::vector<int64_t>& rand_keys, std::unique_ptr<MutexLock>& lock) {
     const std::string sst_filename =
-        FLAGS_db + "/." + ToString(thread->tid) + ".sst";
+        FLAGS_db + "/." + std::to_string(thread->tid) + ".sst";
     Status s;
     if (FLAGS_env->FileExists(sst_filename).ok()) {
       // Maybe we terminated abnormally before, so cleanup to give this file
@@ -4283,7 +4284,7 @@ class CfConsistencyStressTest : public StressTest {
       ThreadState* thread, const std::vector<int>& /* rand_column_families */,
       const std::vector<int64_t>& /* rand_keys */) {
     std::string checkpoint_dir =
-        FLAGS_db + "/.checkpoint" + ToString(thread->tid);
+        FLAGS_db + "/.checkpoint" + std::to_string(thread->tid);
     DestroyDB(checkpoint_dir, Options());
     Checkpoint* checkpoint = nullptr;
     Status s = Checkpoint::Create(db_, &checkpoint);
@@ -4507,9 +4508,6 @@ int main(int argc, char** argv) {
   FLAGS_compression_type_e =
       StringToCompressionType(FLAGS_compression_type.c_str());
   FLAGS_checksum_type_e = StringToChecksumType(FLAGS_checksum_type.c_str());
-  if (!FLAGS_hdfs.empty()) {
-    FLAGS_env = new rocksdb::HdfsEnv(FLAGS_hdfs);
-  }
   FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str());
 
   // The number of background threads should be at least as much the