Skip to content

Commit

Permalink
Enrich write batch (#379)
Browse files Browse the repository at this point in the history
 

Signed-off-by: Little-Wallace <[email protected]>
Signed-off-by: tabokie <[email protected]>
Signed-off-by: Yang Zhang <[email protected]>

Co-authored-by: Wallace <[email protected]>
  • Loading branch information
v01dstar and Little-Wallace authored Sep 27, 2024
1 parent 6506e42 commit 96574bd
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 0 deletions.
36 changes: 36 additions & 0 deletions db/write_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3100,6 +3100,16 @@ Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
return Status::OK();
}

Status WriteBatchInternal::AppendContents(WriteBatch* dst,
const Slice& content) {
size_t src_len = content.size() - WriteBatchInternal::kHeader;
SetCount(dst, Count(dst) + DecodeFixed32(content.data() + 8));
assert(content.size() >= WriteBatchInternal::kHeader);
dst->rep_.append(content.data() + WriteBatchInternal::kHeader, src_len);
dst->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
return Status::OK();
}

Status WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src,
const bool wal_only) {
assert(dst->Count() == 0 ||
Expand Down Expand Up @@ -3191,4 +3201,30 @@ Status WriteBatchInternal::UpdateProtectionInfo(WriteBatch* wb,
"WriteBatch protection info must be zero or eight bytes/key");
}

void WriteBatch::Iterator::SeekToFirst() {
input_ = rep_;
if (input_.size() < WriteBatchInternal::kHeader) {
valid_ = false;
return;
}
input_.remove_prefix(WriteBatchInternal::kHeader);
valid_ = true;
Next();
}

void WriteBatch::Iterator::Next() {
if (input_.empty() || !valid_) {
valid_ = false;
return;
}
Slice blob, xid;
Status s = ReadRecordFromWriteBatch(&input_, &tag_, &column_family_, &key_,
&value_, &blob, &xid);
valid_ = s.ok();
}

int WriteBatch::WriteBatchRef::Count() const {
return DecodeFixed32(rep_.data() + 8);
}

} // namespace ROCKSDB_NAMESPACE
2 changes: 2 additions & 0 deletions db/write_batch_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ class WriteBatchInternal {
static Status Append(WriteBatch* dst, const WriteBatch* src,
const bool WAL_only = false);

static Status AppendContents(WriteBatch* dst, const Slice& content);

// Returns the byte size of appending a WriteBatch with ByteSize
// leftByteSize and a WriteBatch with ByteSize rightByteSize
static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize);
Expand Down
40 changes: 40 additions & 0 deletions include/rocksdb/write_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,8 @@ class WriteBatch : public WriteBatchBase {
}
};
Status Iterate(Handler* handler) const;
class Iterator;
Iterator* NewIterator() const { return new Iterator(rep_); }

// Retrieve the serialized version of this batch.
const std::string& Data() const { return rep_; }
Expand Down Expand Up @@ -506,6 +508,44 @@ class WriteBatch : public WriteBatchBase {

protected:
std::string rep_; // See comment in write_batch.cc for the format of rep_
public:
class Iterator {
private:
Slice rep_;
Slice input_;
Slice key_;
Slice value_;
uint32_t column_family_;
char tag_;
bool valid_;

public:
explicit Iterator(const Slice& rep) : rep_(rep), valid_(false) {}

bool Valid() const { return valid_; }

Slice Key() const { return key_; }

Slice Value() const { return value_; }

uint32_t GetColumnFamilyId() const { return column_family_; }

char GetValueType() const { return tag_; };

void SeekToFirst();

void Next();
};
class WriteBatchRef {
public:
explicit WriteBatchRef(const Slice& rep) : rep_(rep) {}
Iterator* NewIterator() const { return new Iterator(rep_); }

int Count() const;

private:
const Slice& rep_;
};
};

} // namespace ROCKSDB_NAMESPACE

0 comments on commit 96574bd

Please sign in to comment.