Commit 6b8c4ed

Fix compatibility

Signed-off-by: Yang Zhang <[email protected]>
v01dstar committed Oct 5, 2024
1 parent 31fdadf commit 6b8c4ed
Showing 30 changed files with 206 additions and 218 deletions.
8 changes: 4 additions & 4 deletions CMakeLists.txt
@@ -5,11 +5,11 @@ enable_language(C)
find_package(Git)

if (NOT ROCKSDB_GIT_REPO)
- set(ROCKSDB_GIT_REPO "https://github.com/tikv/rocksdb.git")
+ set(ROCKSDB_GIT_REPO "https://github.com/v01dstar/rocksdb.git")
endif()

if (NOT ROCKSDB_GIT_BRANCH)
- set(ROCKSDB_GIT_BRANCH "6.29.tikv") # should at least or newer than commit dcf2f8d56092285381be2acbf8f04b8aeeb7ad79
+ set(ROCKSDB_GIT_BRANCH "compile-option") # should at least or newer than commit dcf2f8d56092285381be2acbf8f04b8aeeb7ad79
endif()

if (NOT DEFINED ROCKSDB_DIR)
@@ -92,7 +92,7 @@ if (WITH_TITAN_TESTS OR WITH_TITAN_TOOLS)
add_subdirectory(${ROCKSDB_DIR} rocksdb EXCLUDE_FROM_ALL)
# Check if -latomic is required or not
if (NOT MSVC)
- set(CMAKE_REQUIRED_FLAGS "--std=c++11")
+ set(CMAKE_REQUIRED_FLAGS "--std=c++17")
CHECK_CXX_SOURCE_COMPILES("
#include <atomic>
std::atomic<uint64_t> x(0);
@@ -111,7 +111,7 @@ endif()

# Check if -latomic is required or not
if (NOT MSVC)
- set(CMAKE_REQUIRED_FLAGS "--std=c++11")
+ set(CMAKE_REQUIRED_FLAGS "--std=c++17")
CHECK_CXX_SOURCE_COMPILES("
#include <atomic>
std::atomic<uint64_t> x(0);
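Note: the c++11 to c++17 bumps above change the dialect used for the -latomic probes as well as for the rest of the build. A minimal sketch of the probe program that CHECK_CXX_SOURCE_COMPILES builds here; the main() body is an assumption, since the diff only shows the include and the global:

    #include <atomic>
    #include <cstdint>

    std::atomic<uint64_t> x(0);

    int main() {
      // If this links without -latomic under -std=c++17, the extra library
      // is not required; otherwise the build appends it.
      uint64_t i = x.fetch_add(1, std::memory_order_relaxed);
      return static_cast<int>(i);
    }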
2 changes: 1 addition & 1 deletion cmake/rocksdb_flags.cmake
@@ -63,7 +63,7 @@ else()
if(MINGW)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-format")
endif()
- set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++17")
if(NOT CMAKE_BUILD_TYPE STREQUAL "Debug")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-omit-frame-pointer")
include(CheckCXXCompilerFlag)
1 change: 1 addition & 0 deletions include/titan/options.h
@@ -3,6 +3,7 @@
#include <map>
#include <unordered_map>

+ #include "rocksdb/advanced_cache.h"
#include "rocksdb/options.h"

namespace rocksdb {
12 changes: 8 additions & 4 deletions src/blob_file_cache.cc
@@ -1,6 +1,7 @@
#include "blob_file_cache.h"

#include "file/filename.h"
+ #include "rocksdb/advanced_cache.h"

#include "util.h"

@@ -9,6 +10,9 @@ namespace titandb {

namespace {

+ const Cache::CacheItemHelper kBlobFileReaderCacheItemHelper(
+     CacheEntryRole::kBlockBasedTableReader, &DeleteCacheValue<BlobFileReader>);

Slice EncodeFileNumber(const uint64_t* number) {
return Slice(reinterpret_cast<const char*>(number), sizeof(*number));
}
@@ -27,13 +31,13 @@ BlobFileCache::BlobFileCache(const TitanDBOptions& db_options,

Status BlobFileCache::Get(const ReadOptions& options, uint64_t file_number,
const BlobHandle& handle, BlobRecord* record,
-                           OwnedSlice* buffer, bool for_compaction) {
+                           OwnedSlice* buffer) {
Cache::Handle* cache_handle = nullptr;
Status s = GetBlobFileReaderHandle(file_number, &cache_handle);
if (!s.ok()) return s;

auto reader = reinterpret_cast<BlobFileReader*>(cache_->Value(cache_handle));
- s = reader->Get(options, handle, record, buffer, for_compaction);
+ s = reader->Get(options, handle, record, buffer);
cache_->Release(cache_handle);
return s;
}
@@ -87,8 +91,8 @@ Status BlobFileCache::GetBlobFileReaderHandle(uint64_t file_number,
stats_);
if (!s.ok()) return s;

- cache_->Insert(cache_key, reader.release(), 1,
-                &DeleteCacheValue<BlobFileReader>, handle);
+ cache_->Insert(cache_key, reader.release(), &kBlobFileReaderCacheItemHelper,
+                1, handle);
return s;
}

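Note: this file, together with the new rocksdb/advanced_cache.h include, adapts Titan to the newer RocksDB cache API: Cache::Insert takes a const Cache::CacheItemHelper* that bundles the deleter with a CacheEntryRole (used for cache statistics) instead of a bare deleter function, and the charge argument moves after the helper. A minimal sketch under that assumption; the value type, role, and names are illustrative, not from the commit:

    #include <memory>

    #include "rocksdb/advanced_cache.h"

    using rocksdb::Cache;
    using rocksdb::CacheEntryRole;

    struct MyValue { int payload = 0; };  // stand-in for BlobFileReader

    // Deleter with the newer signature: it receives the stored object pointer
    // and the cache's memory allocator (unused here).
    void DeleteMyValue(Cache::ObjectPtr obj, rocksdb::MemoryAllocator* /*alloc*/) {
      delete static_cast<MyValue*>(obj);
    }

    // The helper couples the deleter with a role for usage accounting.
    const Cache::CacheItemHelper kMyHelper(CacheEntryRole::kMisc, &DeleteMyValue);

    rocksdb::Status InsertOne(Cache* cache, const rocksdb::Slice& key) {
      auto value = std::make_unique<MyValue>();
      Cache::Handle* handle = nullptr;
      // The helper pointer replaces the deleter; the charge (a flat 1 per
      // entry, as in the commit) now follows it.
      rocksdb::Status s =
          cache->Insert(key, value.release(), &kMyHelper, /*charge=*/1, &handle);
      if (s.ok() && handle != nullptr) cache->Release(handle);
      return s;
    }

The blob_storage.cc hunk later in this commit applies the same pattern to cached blob values, additionally passing Cache::Priority::BOTTOM.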
3 changes: 1 addition & 2 deletions src/blob_file_cache.h
@@ -22,8 +22,7 @@ class BlobFileCache {
// bytes. The provided buffer is used to store the record data, so
// the buffer must be valid when the record is used.
Status Get(const ReadOptions& options, uint64_t file_number,
-            const BlobHandle& handle, BlobRecord* record, OwnedSlice* buffer,
-            bool for_compaction = false);
+            const BlobHandle& handle, BlobRecord* record, OwnedSlice* buffer);

// Creates a prefetcher for the specified file number.
Status NewPrefetcher(uint64_t file_number,
53 changes: 26 additions & 27 deletions src/blob_file_iterator.cc
@@ -22,11 +22,10 @@ BlobFileIterator::~BlobFileIterator() {}
bool BlobFileIterator::Init() {
Slice slice;
char header_buf[BlobFileHeader::kV3EncodedLength];
- // With for_compaction=true, rate_limiter is enabled. Since BlobFileIterator
- // is only used for GC, we always set for_compaction to true.
- status_ =
-     file_->Read(IOOptions(), 0, BlobFileHeader::kV3EncodedLength, &slice,
-                 header_buf, nullptr /*aligned_buf*/, true /*for_compaction*/);
+ IOOptions io_options;
+ io_options.rate_limiter_priority = Env::IOPriority::IO_LOW;
+ status_ = file_->Read(io_options, 0, BlobFileHeader::kV3EncodedLength, &slice,
+                       header_buf, nullptr /*aligned_buf*/);
if (!status_.ok()) {
return false;
}
@@ -39,12 +38,9 @@ bool BlobFileIterator::Init() {
header_size_ = blob_file_header.size();

char footer_buf[BlobFileFooter::kEncodedLength];
- // With for_compaction=true, rate_limiter is enabled. Since BlobFileIterator
- // is only used for GC, we always set for_compaction to true.
- status_ =
-     file_->Read(IOOptions(), file_size_ - BlobFileFooter::kEncodedLength,
-                 BlobFileFooter::kEncodedLength, &slice, footer_buf,
-                 nullptr /*aligned_buf*/, true /*for_compaction*/);
+ status_ = file_->Read(io_options, file_size_ - BlobFileFooter::kEncodedLength,
+                       BlobFileFooter::kEncodedLength, &slice, footer_buf,
+                       nullptr /*aligned_buf*/);
if (!status_.ok()) return false;
BlobFileFooter blob_file_footer;
status_ = blob_file_footer.DecodeFrom(&slice);
@@ -126,12 +122,13 @@ void BlobFileIterator::IterateForPrev(uint64_t offset) {
FixedSlice<kRecordHeaderSize> header_buffer;
iterate_offset_ = header_size_;
while (iterate_offset_ < offset) {
-   // With for_compaction=true, rate_limiter is enabled. Since
-   // BlobFileIterator is only used for GC, we always set for_compaction to
-   // true.
-   status_ = file_->Read(IOOptions(), iterate_offset_, kRecordHeaderSize,
+   IOOptions io_options;
+   // Since BlobFileIterator is only used for GC, we always set IO priority to
+   // low.
+   io_options.rate_limiter_priority = Env::IOPriority::IO_LOW;
+   status_ = file_->Read(io_options, iterate_offset_, kRecordHeaderSize,
                          &header_buffer, header_buffer.get(),
-                         nullptr /*aligned_buf*/, true /*for_compaction*/);
+                         nullptr /*aligned_buf*/);
if (!status_.ok()) return;
status_ = decoder_.DecodeHeader(&header_buffer);
if (!status_.ok()) return;
@@ -148,23 +145,23 @@

void BlobFileIterator::GetBlobRecord() {
FixedSlice<kRecordHeaderSize> header_buffer;
- // With for_compaction=true, rate_limiter is enabled. Since BlobFileIterator
- // is only used for GC, we always set for_compaction to true.
- status_ = file_->Read(IOOptions(), iterate_offset_, kRecordHeaderSize,
-                       &header_buffer, header_buffer.get(),
-                       nullptr /*aligned_buf*/, true /*for_compaction*/);
+ // Since BlobFileIterator is only used for GC, we always set IO priority to
+ // low.
+ IOOptions io_options;
+ io_options.rate_limiter_priority = Env::IOPriority::IO_LOW;
+ status_ =
+     file_->Read(io_options, iterate_offset_, kRecordHeaderSize,
+                 &header_buffer, header_buffer.get(), nullptr /*aligned_buf*/);
if (!status_.ok()) return;
status_ = decoder_.DecodeHeader(&header_buffer);
if (!status_.ok()) return;

Slice record_slice;
auto record_size = decoder_.GetRecordSize();
buffer_.resize(record_size);
- // With for_compaction=true, rate_limiter is enabled. Since BlobFileIterator
- // is only used for GC, we always set for_compaction to true.
- status_ = file_->Read(IOOptions(), iterate_offset_ + kRecordHeaderSize,
-                       record_size, &record_slice, buffer_.data(),
-                       nullptr /*aligned_buf*/, true /*for_compaction*/);
+ status_ =
+     file_->Read(io_options, iterate_offset_ + kRecordHeaderSize, record_size,
+                 &record_slice, buffer_.data(), nullptr /*aligned_buf*/);
if (status_.ok()) {
status_ =
decoder_.DecodeRecord(&record_slice, &cur_blob_record_, &uncompressed_,
@@ -199,7 +196,9 @@ void BlobFileIterator::PrefetchAndGet() {
while (readahead_end_offset_ + readahead_size_ <= min_blob_size &&
readahead_size_ < kMaxReadaheadSize)
readahead_size_ <<= 1;
- file_->Prefetch(readahead_end_offset_, readahead_size_);
+ IOOptions io_options;
+ io_options.rate_limiter_priority = Env::IOPriority::IO_LOW;
+ file_->Prefetch(io_options, readahead_end_offset_, readahead_size_);
readahead_end_offset_ += readahead_size_;
readahead_size_ = std::min(kMaxReadaheadSize, readahead_size_ << 1);
}
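Note: every hunk in this file replaces the old for_compaction flag on Read/Prefetch, previously the way a read opted into the rate limiter, with an explicit per-IO priority carried by IOOptions. GC reads are tagged Env::IO_LOW so they yield to foreground traffic; the prefetcher in blob_file_reader.cc below uses Env::IO_HIGH instead. A minimal sketch of the pattern; the Read call shape follows the diff, while the wrapper function itself is illustrative:

    #include "file/random_access_file_reader.h"  // RocksDB-internal, as used here
    #include "rocksdb/env.h"

    namespace rocksdb {

    // Read n bytes at offset, letting the rate limiter treat this IO as
    // low-priority background work (the GC case above).
    Status ReadForGC(RandomAccessFileReader* file, uint64_t offset, size_t n,
                     Slice* result, char* scratch) {
      IOOptions io_options;
      io_options.rate_limiter_priority = Env::IOPriority::IO_LOW;
      return file->Read(io_options, offset, n, result, scratch,
                        nullptr /*aligned_buf*/);
    }

    }  // namespace rocksdb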
12 changes: 7 additions & 5 deletions src/blob_file_reader.cc
@@ -130,20 +130,20 @@ BlobFileReader::BlobFileReader(const TitanCFOptions& options,

Status BlobFileReader::Get(const ReadOptions& _options,
const BlobHandle& handle, BlobRecord* record,
-                            OwnedSlice* buffer, bool for_compaction) {
+                            OwnedSlice* buffer) {
TEST_SYNC_POINT("BlobFileReader::Get");
Slice blob;
CacheAllocationPtr ubuf =
AllocateBlock(handle.size, options_.memory_allocator());
Status s = file_->Read(IOOptions(), handle.offset, handle.size, &blob,
-                        ubuf.get(), nullptr /*aligned_buf*/, for_compaction);
+                        ubuf.get(), nullptr /*aligned_buf*/);
if (!s.ok()) {
return s;
}
if (handle.size != static_cast<uint64_t>(blob.size())) {
    return Status::Corruption(
-        "ReadRecord actual size: " + ToString(blob.size()) +
-        " not equal to blob size " + ToString(handle.size));
+        "ReadRecord actual size: " + std::to_string(blob.size()) +
+        " not equal to blob size " + std::to_string(handle.size));
}

BlobDecoder decoder(uncompression_dict_ == nullptr
@@ -165,7 +165,9 @@ Status BlobFilePrefetcher::Get(const ReadOptions& options,
last_offset_ = handle.offset + handle.size;
if (handle.offset + handle.size > readahead_limit_) {
readahead_size_ = std::max(handle.size, readahead_size_);
-     reader_->file_->Prefetch(handle.offset, readahead_size_);
+     IOOptions io_options;
+     io_options.rate_limiter_priority = Env::IOPriority::IO_HIGH;
+     reader_->file_->Prefetch(io_options, handle.offset, readahead_size_);
readahead_limit_ = handle.offset + readahead_size_;
readahead_size_ = std::min(kMaxReadaheadSize, readahead_size_ * 2);
}
3 changes: 1 addition & 2 deletions src/blob_file_reader.h
@@ -28,8 +28,7 @@ class BlobFileReader {
// of the record is stored in the value slice underlying, so the value slice
// must be valid when the record is used.
Status Get(const ReadOptions& options, const BlobHandle& handle,
-             BlobRecord* record, OwnedSlice* buffer,
-             bool for_compaction = false);
+             BlobRecord* record, OwnedSlice* buffer);

private:
friend class BlobFilePrefetcher;
4 changes: 2 additions & 2 deletions src/blob_file_size_collector_test.cc
@@ -78,7 +78,7 @@ class BlobFileSizeCollectorTest : public testing::Test {
void NewTableReader(std::unique_ptr<RandomAccessFileReader>&& file,
std::unique_ptr<TableReader>* result) {
TableReaderOptions options(ioptions_, prefix_extractor_, env_options_,
-                                cf_ioptions_.internal_comparator);
+                                cf_ioptions_.internal_comparator, 0);
uint64_t file_size = 0;
ASSERT_OK(env_->GetFileSize(file->file_name(), &file_size));
ASSERT_TRUE(file_size > 0);
@@ -102,7 +102,7 @@ TEST_F(BlobFileSizeCollectorTest, Basic) {
ParsedInternalKey ikey;
snprintf(buf, sizeof(buf), "%15d", i);
ikey.user_key = buf;
-   ikey.type = kTypeBlobIndex;
+   ikey.type = kTypeTitanBlobIndex;
std::string key;
AppendInternalKey(&key, ikey);

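Note: the test now tags its internal keys kTypeTitanBlobIndex instead of kTypeBlobIndex; in the tikv/rocksdb fork this commit targets, Titan blob-index entries get a value type distinct from upstream BlobDB's kTypeBlobIndex. A small sketch of the key construction the test performs; the helper function and its sequence-number handling are illustrative:

    #include <string>

    #include "db/dbformat.h"  // ParsedInternalKey, AppendInternalKey (RocksDB-internal)

    std::string MakeTitanBlobIndexKey(const rocksdb::Slice& user_key,
                                      rocksdb::SequenceNumber seq) {
      rocksdb::ParsedInternalKey ikey;
      ikey.user_key = user_key;
      ikey.sequence = seq;
      ikey.type = rocksdb::kTypeTitanBlobIndex;  // fork-specific value type
      std::string key;
      rocksdb::AppendInternalKey(&key, ikey);  // appends user_key, then seq/type tag
      return key;
    }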
2 changes: 2 additions & 0 deletions src/blob_format.cc
@@ -1,5 +1,7 @@
#include "blob_format.h"

+ #include <cinttypes>

#include "test_util/sync_point.h"
#include "util/crc32c.h"

4 changes: 2 additions & 2 deletions src/blob_format.h
@@ -64,7 +64,7 @@ class BlobEncoder {
BlobEncoder(CompressionType compression, CompressionOptions compression_opt,
const CompressionDict* compression_dict)
: compression_opt_(compression_opt),
-       compression_ctx_(compression),
+       compression_ctx_(compression, compression_opt_),
compression_dict_(compression_dict),
compression_info_(new CompressionInfo(
compression_opt_, compression_ctx_, *compression_dict_, compression,
@@ -347,7 +347,7 @@ struct BlobFileHeader {
if (ver != BlobFileHeader::kVersion1 && ver != BlobFileHeader::kVersion2 &&
ver != BlobFileHeader::kVersion3) {
return Status::InvalidArgument("unrecognized blob file version " +
-                                      ToString(ver));
+                                      std::to_string(ver));
}
return Status::OK();
}
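Note: the BlobEncoder hunk tracks a RocksDB change in which CompressionContext is constructed from the compression type and the CompressionOptions together, where it previously took the type alone. A minimal sketch of the member-initialization shape with illustrative names; the options member must be declared before the context so it is initialized first:

    #include "rocksdb/advanced_options.h"  // CompressionType, CompressionOptions
    #include "util/compression.h"          // CompressionContext (RocksDB-internal)

    class EncoderSketch {
     public:
      EncoderSketch(rocksdb::CompressionType type,
                    const rocksdb::CompressionOptions& opts)
          : opts_(opts),
            ctx_(type, opts_) {}  // older API: ctx_(type)

     private:
      rocksdb::CompressionOptions opts_;  // declared first, initialized first
      rocksdb::CompressionContext ctx_;
    };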
4 changes: 2 additions & 2 deletions src/blob_gc_job.cc
@@ -312,8 +312,8 @@ void BlobGCJob::BatchWriteNewIndices(BlobFileBuilder::OutContexts& contexts,
rewrite_batches_.emplace_back(
std::make_pair(WriteBatch(), std::move(callback)));
auto& wb = rewrite_batches_.back().first;
-     *s = WriteBatchInternal::PutBlobIndex(&wb, cfh->GetID(), ikey.user_key,
-                                           index_entry);
+     *s = WriteBatchInternal::PutTitanBlobIndex(&wb, cfh->GetID(), ikey.user_key,
+                                                index_entry);
if (!s->ok()) break;
}
}
13 changes: 10 additions & 3 deletions src/blob_gc_job_test.cc
@@ -33,16 +33,22 @@ class BlobGCJobTest : public testing::Test {
TitanOptions options_;
port::Mutex* mutex_;

- BlobGCJobTest() : dbname_(test::TmpDir()) {
+ void ResetOptions() {
options_.dirname = dbname_ + "/titandb";
options_.create_if_missing = true;
options_.disable_background_gc = true;
options_.min_blob_size = 0;
options_.disable_auto_compactions = true;
+ options_.level_compaction_dynamic_level_bytes = false;
options_.env->CreateDirIfMissing(dbname_);
options_.env->CreateDirIfMissing(options_.dirname);
}
- ~BlobGCJobTest() { Close(); }

+ BlobGCJobTest() : dbname_(test::TmpDir()) { ResetOptions(); }
+ ~BlobGCJobTest() {
+   Close();
+   ResetOptions();
+ }

void DisableMergeSmall() { options_.merge_small_file_threshold = 0; }

@@ -212,7 +218,8 @@ class BlobGCJobTest : public testing::Test {
blob_index.EncodeTo(&res);
std::string key = "test_discard_entry";
WriteBatch wb;
-     ASSERT_OK(WriteBatchInternal::PutBlobIndex(&wb, cfh->GetID(), key, res));
+     ASSERT_OK(
+         WriteBatchInternal::PutTitanBlobIndex(&wb, cfh->GetID(), key, res));
auto rewrite_status = base_db_->Write(WriteOptions(), &wb);

std::vector<std::shared_ptr<BlobFileMeta>> tmp;
8 changes: 4 additions & 4 deletions src/blob_storage.cc
@@ -49,17 +49,17 @@ Status BlobStorage::Get(const ReadOptions& options, const BlobIndex& index,

OwnedSlice blob;
Status s = file_cache_->Get(options, index.file_number, index.blob_handle,
-                               record, &blob, for_compaction);
+                               record, &blob);
if (!s.ok()) {
return s;
}

if (blob_cache_ && options.fill_cache) {
Cache::Handle* cache_handle = nullptr;
auto cache_value = new OwnedSlice(std::move(blob));
-     blob_cache_->Insert(
-         cache_key, cache_value, cache_value->size() + sizeof(*cache_value),
-         &DeleteCacheValue<OwnedSlice>, &cache_handle, Cache::Priority::BOTTOM);
+     blob_cache_->Insert(cache_key, cache_value, &kBlobValueCacheItemHelper,
+                         cache_value->size() + sizeof(*cache_value),
+                         &cache_handle, Cache::Priority::BOTTOM);
value->PinSlice(record->value, UnrefCacheHandle, blob_cache_.get(),
cache_handle);
} else {
13 changes: 6 additions & 7 deletions src/compaction_filter.h
@@ -34,9 +34,8 @@ class TitanCompactionFilter final : public CompactionFilter {

bool IsStackedBlobDbInternalCompactionFilter() const override { return true; }

-   Decision UnsafeFilter(int level, const Slice &key, SequenceNumber seqno,
-                         ValueType value_type, const Slice &value,
-                         std::string *new_value,
+   Decision UnsafeFilter(int level, const Slice &key, ValueType value_type,
+                         const Slice &value, std::string *new_value,
std::string *skip_until) const override {
Status s;
Slice user_key = key;
@@ -61,12 +60,12 @@ }
}

if (skip_value_) {
-       return original_filter_->UnsafeFilter(level, user_key, seqno, value_type,
+       return original_filter_->UnsafeFilter(level, user_key, value_type,
Slice(), new_value, skip_until);
}
if (value_type != kBlobIndex) {
-       return original_filter_->UnsafeFilter(level, user_key, seqno, value_type,
-                                             value, new_value, skip_until);
+       return original_filter_->UnsafeFilter(level, user_key, value_type, value,
+                                             new_value, skip_until);
}

BlobIndex blob_index;
@@ -96,7 +95,7 @@
return Decision::kKeep;
} else if (s.ok()) {
auto decision = original_filter_->UnsafeFilter(
-           level, user_key, seqno, kValue, record.value, new_value, skip_until);
+           level, user_key, kValue, record.value, new_value, skip_until);

// It would be a problem if it change the value whereas the value_type
// is still kBlobIndex. For now, just returns kKeep.
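Note: UnsafeFilter is an extension of CompactionFilter in the tikv/rocksdb fork (upstream RocksDB has no such method), and these hunks drop its SequenceNumber parameter to match the fork revision this commit targets. A minimal sketch of an override with the new shape, assuming the fork's virtual exists as shown; the filter class itself is illustrative:

    #include <string>

    #include "rocksdb/compaction_filter.h"

    class KeepAllFilter : public rocksdb::CompactionFilter {
     public:
      const char* Name() const override { return "KeepAllFilter"; }

      // New shape: no SequenceNumber between the key and the value type.
      Decision UnsafeFilter(int level, const rocksdb::Slice& key,
                            ValueType value_type, const rocksdb::Slice& value,
                            std::string* new_value,
                            std::string* skip_until) const override {
        (void)level; (void)key; (void)value_type; (void)value;
        (void)new_value; (void)skip_until;
        return Decision::kKeep;  // keep every entry unchanged
      }
    };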
(Diffs for the remaining 15 changed files are not shown.)
