Skip to content

Commit

Permalink
add system info in local recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
cjcchen committed Aug 21, 2023
1 parent 91109b1 commit 6006713
Show file tree
Hide file tree
Showing 11 changed files with 287 additions and 55 deletions.
4 changes: 2 additions & 2 deletions platform/consensus/execution/BUILD
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package(default_visibility = ["//platform/consensus/ordering:__subpackages__"])
package(default_visibility = ["//platform/consensus:__subpackages__"])

cc_library(
name = "system_info",
Expand Down Expand Up @@ -111,4 +111,4 @@ cc_library(
"//common:comm",
"//common/utils",
],
)
)
9 changes: 3 additions & 6 deletions platform/consensus/execution/system_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

namespace resdb {

SystemInfo::SystemInfo() : primary_id_(1), view_(1) {}

SystemInfo::SystemInfo(const ResDBConfig& config)
: primary_id_(config.GetReplicaInfos()[0].id()), view_(1) {
SetReplicas(config.GetReplicaInfos());
Expand All @@ -37,12 +39,7 @@ SystemInfo::SystemInfo(const ResDBConfig& config)

uint32_t SystemInfo::GetPrimaryId() const { return primary_id_; }

void SystemInfo::SetPrimary(uint32_t id) {
if(primary_id_ != id){
LOG(ERROR) << "[SetPrimary]: " << id;
}
primary_id_ = id;
}
void SystemInfo::SetPrimary(uint32_t id) { primary_id_ = id; }

uint64_t SystemInfo::GetCurrentView() const { return view_; }

Expand Down
1 change: 1 addition & 0 deletions platform/consensus/execution/system_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ namespace resdb {
// has been agreed on, like the primary, the replicas,etc..
class SystemInfo {
public:
SystemInfo();
SystemInfo(const ResDBConfig& config);
virtual ~SystemInfo() = default;

Expand Down
2 changes: 1 addition & 1 deletion platform/consensus/ordering/pbft/commitment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ int Commitment::ProcessProposeMsg(std::unique_ptr<Context> context,
return -2;
}
if (request->is_recovery()) {
if (request->seq() == message_manager_->GetNextSeq()) {
if (request->seq() >= message_manager_->GetNextSeq()) {
message_manager_->SetNextSeq(request->seq() + 1);
}
return message_manager_->AddConsensusMsg(context->signature,
Expand Down
5 changes: 5 additions & 0 deletions platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ ConsensusManagerPBFT::ConsensusManagerPBFT(
config_, checkpoint_manager_.get(), message_manager_.get(),
system_info_.get(), GetBroadCastClient(), GetSignatureVerifier())),
recovery_(std::make_unique<Recovery>(config_, checkpoint_manager_.get(),
system_info_.get(),
message_manager_->GetStorage())) {
LOG(INFO) << "is running is performance mode:"
<< config_.IsPerformanceRunning();
Expand All @@ -69,6 +70,10 @@ ConsensusManagerPBFT::ConsensusManagerPBFT(
view_change_manager_->SetDuplicateManager(commitment_->GetDuplicateManager());

recovery_->ReadLogs(
[&](const SystemInfoData& data) {
system_info_->SetCurrentView(data.view());
system_info_->SetPrimary(data.primary_id());
},
[&](std::unique_ptr<Context> context, std::unique_ptr<Request> request) {
return InternalConsensusCommit(std::move(context), std::move(request));
});
Expand Down
2 changes: 2 additions & 0 deletions platform/consensus/recovery/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ cc_library(
"//common/utils",
"//platform/config:resdb_config",
"//platform/consensus/checkpoint",
"//platform/consensus/execution:system_info",
"//platform/networkstrate:server_comm",
"//platform/proto:resdb_cc_proto",
"//platform/proto:system_info_data_cc_proto",
],
)

Expand Down
93 changes: 81 additions & 12 deletions platform/consensus/recovery/recovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@
namespace resdb {

Recovery::Recovery(const ResDBConfig& config, CheckPoint* checkpoint,
Storage* storage)
: config_(config), checkpoint_(checkpoint), storage_(storage) {
SystemInfo* system_info, Storage* storage)
: config_(config),
checkpoint_(checkpoint),
system_info_(system_info),
storage_(storage) {
recovery_enabled_ = config_.GetConfigData().recovery_enabled();
file_path_ = config_.GetConfigData().recovery_path();
if (file_path_.empty()) {
Expand Down Expand Up @@ -166,7 +169,6 @@ std::string Recovery::GenerateFile(int64_t seq, int64_t min_seq,
}

void Recovery::FinishFile(int64_t seq) {
std::string next_file_path = GenerateFile(seq, -1, -1);
std::unique_lock<std::mutex> lk(mutex_);
Flush();
if (storage_) {
Expand All @@ -182,6 +184,8 @@ void Recovery::FinishFile(int64_t seq) {

std::rename(file_path_.c_str(), new_file_path.c_str());

LOG(ERROR) << "rename:" << file_path_ << " to:" << new_file_path;
std::string next_file_path = GenerateFile(seq, -1, -1);
file_path_ = next_file_path;

OpenFile(file_path_);
Expand All @@ -194,7 +198,7 @@ void Recovery::SwitchFile(const std::string& file_path) {
max_seq_ = -1;

ReadLogsFromFiles(
file_path, 0,
file_path, 0, 0, [&](const SystemInfoData& data) {},
[&](std::unique_ptr<Context> context, std::unique_ptr<Request> request) {
min_seq_ == -1
? min_seq_ = request->seq()
Expand All @@ -203,23 +207,46 @@ void Recovery::SwitchFile(const std::string& file_path) {
});

OpenFile(file_path);
LOG(ERROR) << "switch to file:" << file_path << " seq:"
<< "[" << min_seq_ << "," << max_seq_ << "]";
LOG(INFO) << "switch to file:" << file_path << " seq:"
<< "[" << min_seq_ << "," << max_seq_ << "]";
}

void Recovery::OpenFile(const std::string& path) {
LOG(ERROR) << "open file:" << path;
if (fd_ >= 0) {
close(fd_);
}
fd_ = open(path.c_str(), O_CREAT | O_WRONLY, 0666);
if (fd_ < 0) {
LOG(ERROR) << "open file fail:" << path << " error:" << strerror(errno);
}

int pos = lseek(fd_, 0, SEEK_END);
LOG(INFO) << "file path:" << path << " len:" << pos;
if (pos == 0) {
WriteSystemInfo();
}

lseek(fd_, 0, SEEK_END);
LOG(ERROR) << "open file:" << path << " pos:" << lseek(fd_, 0, SEEK_CUR);
assert(fd_ >= 0);
}

void Recovery::WriteSystemInfo() {
int view = system_info_->GetCurrentView();
int primary_id = system_info_->GetPrimaryId();
LOG(ERROR) << "write system info:" << view << " primary id:" << primary_id;
SystemInfoData data;
data.set_view(view);
data.set_primary_id(primary_id);

std::string data_str;
data.SerializeToString(&data_str);

AppendData(data_str);
Flush();
}

void Recovery::AddRequest(const Context* context, const Request* request) {
if (recovery_enabled_ == false) {
return;
Expand Down Expand Up @@ -301,6 +328,21 @@ std::vector<std::unique_ptr<Recovery::RecoveryData>> Recovery::ParseData(
return request_list;
}

std::vector<std::string> Recovery::ParseRawData(const std::string& data) {
std::vector<std::string> data_list;
int pos = 0;
while (pos < data.size()) {
size_t len;
memcpy(&len, data.c_str() + pos, sizeof(len));
pos += sizeof(len);

std::string item = data.substr(pos, len);
pos += len;
data_list.push_back(item);
}
return data_list;
}

void Recovery::MayFlush() {
if (buffer_.size() > buffer_size_) {
Flush();
Expand Down Expand Up @@ -392,22 +434,26 @@ Recovery::GetRecoveryFiles() {
return std::make_pair(list, last_ckpt);
}

void Recovery::ReadLogs(std::function<void(std::unique_ptr<Context> context,
std::unique_ptr<Request> request)>
call_back) {
void Recovery::ReadLogs(
std::function<void(const SystemInfoData& data)> system_callback,
std::function<void(std::unique_ptr<Context> context,
std::unique_ptr<Request> request)>
call_back) {
if (recovery_enabled_ == false) {
return;
}
std::unique_lock<std::mutex> lk(mutex_);
auto recovery_files_pair = GetRecoveryFiles();
int64_t ckpt = recovery_files_pair.second;
int idx = 0;
for (auto path : recovery_files_pair.first) {
ReadLogsFromFiles(path.second, ckpt, call_back);
ReadLogsFromFiles(path.second, ckpt, idx++, system_callback, call_back);
}
}

void Recovery::ReadLogsFromFiles(
const std::string& path, int64_t ckpt,
const std::string& path, int64_t ckpt, int file_idx,
std::function<void(const SystemInfoData& data)> system_callback,
std::function<void(std::unique_ptr<Context> context,
std::unique_ptr<Request> request)>
call_back) {
Expand All @@ -417,9 +463,32 @@ void Recovery::ReadLogsFromFiles(
}
assert(fd >= 0);

size_t data_len = 0;
Read(fd, sizeof(data_len), reinterpret_cast<char*>(&data_len));
{
std::string data;
char* buf = new char[data_len];
if (!Read(fd, data_len, buf)) {
LOG(ERROR) << "Read system info fail";
return;
}
data = std::string(buf, data_len);
delete buf;
std::vector<std::string> data_list = ParseRawData(data);

SystemInfoData info;
if (data_list.empty() || !info.ParseFromString(data_list[0])) {
LOG(ERROR) << "parse info fail:" << data.size();
return;
}
LOG(ERROR) << "read system info:" << info.DebugString();
if (file_idx == 0) {
system_callback(info);
}
}

std::vector<std::unique_ptr<RecoveryData>> request_list;

size_t data_len = 0;
while (Read(fd, sizeof(data_len), reinterpret_cast<char*>(&data_len))) {
std::string data;
char* buf = new char[data_len];
Expand Down
21 changes: 15 additions & 6 deletions platform/consensus/recovery/recovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,23 @@
#include "chain/storage/storage.h"
#include "platform/config/resdb_config.h"
#include "platform/consensus/checkpoint/checkpoint.h"
#include "platform/consensus/execution/system_info.h"
#include "platform/networkstrate/server_comm.h"
#include "platform/proto/resdb.pb.h"
#include "platform/proto/system_info_data.pb.h"

namespace resdb {

class Recovery {
public:
Recovery(const ResDBConfig& config, CheckPoint* checkpoint, Storage* storage);
Recovery(const ResDBConfig& config, CheckPoint* checkpoint,
SystemInfo* system_info, Storage* storage);
virtual ~Recovery();

virtual void AddRequest(const Context* context, const Request* request);

void ReadLogs(std::function<void(std::unique_ptr<Context> context,
void ReadLogs(std::function<void(const SystemInfoData& data)> system_callback,
std::function<void(std::unique_ptr<Context> context,
std::unique_ptr<Request> request)>
call_back);

Expand All @@ -58,6 +62,7 @@ class Recovery {
void WriteLog(const Context* context, const Request* request);
void AppendData(const std::string& data);
std::vector<std::unique_ptr<RecoveryData>> ParseData(const std::string& data);
std::vector<std::string> ParseRawData(const std::string& data);
void Flush();
void MayFlush();

Expand All @@ -66,6 +71,7 @@ class Recovery {

std::string GenerateFile(int64_t seq, int64_t min_seq, int64_t max_seq);
void GetLastFile();
void WriteSystemInfo();

void OpenFile(const std::string& path);
void FinishFile(int64_t seq);
Expand All @@ -74,10 +80,12 @@ class Recovery {
void UpdateStableCheckPoint();
std::pair<std::vector<std::pair<int64_t, std::string>>, int64_t>
GetRecoveryFiles();
void ReadLogsFromFiles(const std::string& path, int64_t ckpt,
std::function<void(std::unique_ptr<Context> context,
std::unique_ptr<Request> request)>
call_back);
void ReadLogsFromFiles(
const std::string& path, int64_t ckpt, int file_idx,
std::function<void(const SystemInfoData& data)> system_callback,
std::function<void(std::unique_ptr<Context> context,
std::unique_ptr<Request> request)>
call_back);

protected:
ResDBConfig config_;
Expand All @@ -95,6 +103,7 @@ class Recovery {
std::mutex ckpt_mutex_;
std::atomic<bool> stop_;
int recovery_ckpt_time_s_;
SystemInfo* system_info_;
Storage* storage_;
};

Expand Down
Loading

0 comments on commit 6006713

Please sign in to comment.