diff --git a/platform/consensus/execution/BUILD b/platform/consensus/execution/BUILD index 23366749a..162a1ebb6 100644 --- a/platform/consensus/execution/BUILD +++ b/platform/consensus/execution/BUILD @@ -1,4 +1,4 @@ -package(default_visibility = ["//platform/consensus/ordering:__subpackages__"]) +package(default_visibility = ["//platform/consensus:__subpackages__"]) cc_library( name = "system_info", @@ -111,4 +111,4 @@ cc_library( "//common:comm", "//common/utils", ], -) \ No newline at end of file +) diff --git a/platform/consensus/execution/system_info.cpp b/platform/consensus/execution/system_info.cpp index 69d0ec31b..38bc502d8 100644 --- a/platform/consensus/execution/system_info.cpp +++ b/platform/consensus/execution/system_info.cpp @@ -29,6 +29,8 @@ namespace resdb { +SystemInfo::SystemInfo() : primary_id_(1), view_(1) {} + SystemInfo::SystemInfo(const ResDBConfig& config) : primary_id_(config.GetReplicaInfos()[0].id()), view_(1) { SetReplicas(config.GetReplicaInfos()); @@ -37,12 +39,7 @@ SystemInfo::SystemInfo(const ResDBConfig& config) uint32_t SystemInfo::GetPrimaryId() const { return primary_id_; } -void SystemInfo::SetPrimary(uint32_t id) { - if(primary_id_ != id){ - LOG(ERROR) << "[SetPrimary]: " << id; - } - primary_id_ = id; -} +void SystemInfo::SetPrimary(uint32_t id) { primary_id_ = id; } uint64_t SystemInfo::GetCurrentView() const { return view_; } diff --git a/platform/consensus/execution/system_info.h b/platform/consensus/execution/system_info.h index 4c06691f5..ecfde2596 100644 --- a/platform/consensus/execution/system_info.h +++ b/platform/consensus/execution/system_info.h @@ -36,6 +36,7 @@ namespace resdb { // has been agreed on, like the primary, the replicas,etc.. class SystemInfo { public: + SystemInfo(); SystemInfo(const ResDBConfig& config); virtual ~SystemInfo() = default; diff --git a/platform/consensus/ordering/pbft/commitment.cpp b/platform/consensus/ordering/pbft/commitment.cpp index 37bf1947b..061ead698 100644 --- a/platform/consensus/ordering/pbft/commitment.cpp +++ b/platform/consensus/ordering/pbft/commitment.cpp @@ -157,7 +157,7 @@ int Commitment::ProcessProposeMsg(std::unique_ptr context, return -2; } if (request->is_recovery()) { - if (request->seq() == message_manager_->GetNextSeq()) { + if (request->seq() >= message_manager_->GetNextSeq()) { message_manager_->SetNextSeq(request->seq() + 1); } return message_manager_->AddConsensusMsg(context->signature, diff --git a/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp b/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp index 4727a6549..b09861bbb 100644 --- a/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp +++ b/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp @@ -61,6 +61,7 @@ ConsensusManagerPBFT::ConsensusManagerPBFT( config_, checkpoint_manager_.get(), message_manager_.get(), system_info_.get(), GetBroadCastClient(), GetSignatureVerifier())), recovery_(std::make_unique(config_, checkpoint_manager_.get(), + system_info_.get(), message_manager_->GetStorage())) { LOG(INFO) << "is running is performance mode:" << config_.IsPerformanceRunning(); @@ -69,6 +70,10 @@ ConsensusManagerPBFT::ConsensusManagerPBFT( view_change_manager_->SetDuplicateManager(commitment_->GetDuplicateManager()); recovery_->ReadLogs( + [&](const SystemInfoData& data) { + system_info_->SetCurrentView(data.view()); + system_info_->SetPrimary(data.primary_id()); + }, [&](std::unique_ptr context, std::unique_ptr request) { return InternalConsensusCommit(std::move(context), std::move(request)); }); diff --git a/platform/consensus/recovery/BUILD b/platform/consensus/recovery/BUILD index 6632775e7..8426a64bf 100644 --- a/platform/consensus/recovery/BUILD +++ b/platform/consensus/recovery/BUILD @@ -9,8 +9,10 @@ cc_library( "//common/utils", "//platform/config:resdb_config", "//platform/consensus/checkpoint", + "//platform/consensus/execution:system_info", "//platform/networkstrate:server_comm", "//platform/proto:resdb_cc_proto", + "//platform/proto:system_info_data_cc_proto", ], ) diff --git a/platform/consensus/recovery/recovery.cpp b/platform/consensus/recovery/recovery.cpp index b65d8d57a..ff85ab66f 100644 --- a/platform/consensus/recovery/recovery.cpp +++ b/platform/consensus/recovery/recovery.cpp @@ -38,8 +38,11 @@ namespace resdb { Recovery::Recovery(const ResDBConfig& config, CheckPoint* checkpoint, - Storage* storage) - : config_(config), checkpoint_(checkpoint), storage_(storage) { + SystemInfo* system_info, Storage* storage) + : config_(config), + checkpoint_(checkpoint), + system_info_(system_info), + storage_(storage) { recovery_enabled_ = config_.GetConfigData().recovery_enabled(); file_path_ = config_.GetConfigData().recovery_path(); if (file_path_.empty()) { @@ -166,7 +169,6 @@ std::string Recovery::GenerateFile(int64_t seq, int64_t min_seq, } void Recovery::FinishFile(int64_t seq) { - std::string next_file_path = GenerateFile(seq, -1, -1); std::unique_lock lk(mutex_); Flush(); if (storage_) { @@ -182,6 +184,8 @@ void Recovery::FinishFile(int64_t seq) { std::rename(file_path_.c_str(), new_file_path.c_str()); + LOG(ERROR) << "rename:" << file_path_ << " to:" << new_file_path; + std::string next_file_path = GenerateFile(seq, -1, -1); file_path_ = next_file_path; OpenFile(file_path_); @@ -194,7 +198,7 @@ void Recovery::SwitchFile(const std::string& file_path) { max_seq_ = -1; ReadLogsFromFiles( - file_path, 0, + file_path, 0, 0, [&](const SystemInfoData& data) {}, [&](std::unique_ptr context, std::unique_ptr request) { min_seq_ == -1 ? min_seq_ = request->seq() @@ -203,11 +207,12 @@ void Recovery::SwitchFile(const std::string& file_path) { }); OpenFile(file_path); - LOG(ERROR) << "switch to file:" << file_path << " seq:" - << "[" << min_seq_ << "," << max_seq_ << "]"; + LOG(INFO) << "switch to file:" << file_path << " seq:" + << "[" << min_seq_ << "," << max_seq_ << "]"; } void Recovery::OpenFile(const std::string& path) { + LOG(ERROR) << "open file:" << path; if (fd_ >= 0) { close(fd_); } @@ -215,11 +220,33 @@ void Recovery::OpenFile(const std::string& path) { if (fd_ < 0) { LOG(ERROR) << "open file fail:" << path << " error:" << strerror(errno); } + + int pos = lseek(fd_, 0, SEEK_END); + LOG(INFO) << "file path:" << path << " len:" << pos; + if (pos == 0) { + WriteSystemInfo(); + } + lseek(fd_, 0, SEEK_END); LOG(ERROR) << "open file:" << path << " pos:" << lseek(fd_, 0, SEEK_CUR); assert(fd_ >= 0); } +void Recovery::WriteSystemInfo() { + int view = system_info_->GetCurrentView(); + int primary_id = system_info_->GetPrimaryId(); + LOG(ERROR) << "write system info:" << view << " primary id:" << primary_id; + SystemInfoData data; + data.set_view(view); + data.set_primary_id(primary_id); + + std::string data_str; + data.SerializeToString(&data_str); + + AppendData(data_str); + Flush(); +} + void Recovery::AddRequest(const Context* context, const Request* request) { if (recovery_enabled_ == false) { return; @@ -301,6 +328,21 @@ std::vector> Recovery::ParseData( return request_list; } +std::vector Recovery::ParseRawData(const std::string& data) { + std::vector data_list; + int pos = 0; + while (pos < data.size()) { + size_t len; + memcpy(&len, data.c_str() + pos, sizeof(len)); + pos += sizeof(len); + + std::string item = data.substr(pos, len); + pos += len; + data_list.push_back(item); + } + return data_list; +} + void Recovery::MayFlush() { if (buffer_.size() > buffer_size_) { Flush(); @@ -392,22 +434,26 @@ Recovery::GetRecoveryFiles() { return std::make_pair(list, last_ckpt); } -void Recovery::ReadLogs(std::function context, - std::unique_ptr request)> - call_back) { +void Recovery::ReadLogs( + std::function system_callback, + std::function context, + std::unique_ptr request)> + call_back) { if (recovery_enabled_ == false) { return; } std::unique_lock lk(mutex_); auto recovery_files_pair = GetRecoveryFiles(); int64_t ckpt = recovery_files_pair.second; + int idx = 0; for (auto path : recovery_files_pair.first) { - ReadLogsFromFiles(path.second, ckpt, call_back); + ReadLogsFromFiles(path.second, ckpt, idx++, system_callback, call_back); } } void Recovery::ReadLogsFromFiles( - const std::string& path, int64_t ckpt, + const std::string& path, int64_t ckpt, int file_idx, + std::function system_callback, std::function context, std::unique_ptr request)> call_back) { @@ -417,9 +463,32 @@ void Recovery::ReadLogsFromFiles( } assert(fd >= 0); + size_t data_len = 0; + Read(fd, sizeof(data_len), reinterpret_cast(&data_len)); + { + std::string data; + char* buf = new char[data_len]; + if (!Read(fd, data_len, buf)) { + LOG(ERROR) << "Read system info fail"; + return; + } + data = std::string(buf, data_len); + delete buf; + std::vector data_list = ParseRawData(data); + + SystemInfoData info; + if (data_list.empty() || !info.ParseFromString(data_list[0])) { + LOG(ERROR) << "parse info fail:" << data.size(); + return; + } + LOG(ERROR) << "read system info:" << info.DebugString(); + if (file_idx == 0) { + system_callback(info); + } + } + std::vector> request_list; - size_t data_len = 0; while (Read(fd, sizeof(data_len), reinterpret_cast(&data_len))) { std::string data; char* buf = new char[data_len]; diff --git a/platform/consensus/recovery/recovery.h b/platform/consensus/recovery/recovery.h index 3d7cd27d1..f8b0e8473 100644 --- a/platform/consensus/recovery/recovery.h +++ b/platform/consensus/recovery/recovery.h @@ -30,19 +30,23 @@ #include "chain/storage/storage.h" #include "platform/config/resdb_config.h" #include "platform/consensus/checkpoint/checkpoint.h" +#include "platform/consensus/execution/system_info.h" #include "platform/networkstrate/server_comm.h" #include "platform/proto/resdb.pb.h" +#include "platform/proto/system_info_data.pb.h" namespace resdb { class Recovery { public: - Recovery(const ResDBConfig& config, CheckPoint* checkpoint, Storage* storage); + Recovery(const ResDBConfig& config, CheckPoint* checkpoint, + SystemInfo* system_info, Storage* storage); virtual ~Recovery(); virtual void AddRequest(const Context* context, const Request* request); - void ReadLogs(std::function context, + void ReadLogs(std::function system_callback, + std::function context, std::unique_ptr request)> call_back); @@ -58,6 +62,7 @@ class Recovery { void WriteLog(const Context* context, const Request* request); void AppendData(const std::string& data); std::vector> ParseData(const std::string& data); + std::vector ParseRawData(const std::string& data); void Flush(); void MayFlush(); @@ -66,6 +71,7 @@ class Recovery { std::string GenerateFile(int64_t seq, int64_t min_seq, int64_t max_seq); void GetLastFile(); + void WriteSystemInfo(); void OpenFile(const std::string& path); void FinishFile(int64_t seq); @@ -74,10 +80,12 @@ class Recovery { void UpdateStableCheckPoint(); std::pair>, int64_t> GetRecoveryFiles(); - void ReadLogsFromFiles(const std::string& path, int64_t ckpt, - std::function context, - std::unique_ptr request)> - call_back); + void ReadLogsFromFiles( + const std::string& path, int64_t ckpt, int file_idx, + std::function system_callback, + std::function context, + std::unique_ptr request)> + call_back); protected: ResDBConfig config_; @@ -95,6 +103,7 @@ class Recovery { std::mutex ckpt_mutex_; std::atomic stop_; int recovery_ckpt_time_s_; + SystemInfo* system_info_; Storage* storage_; }; diff --git a/platform/consensus/recovery/recovery_test.cpp b/platform/consensus/recovery/recovery_test.cpp index d561fd593..f02cb9dff 100644 --- a/platform/consensus/recovery/recovery_test.cpp +++ b/platform/consensus/recovery/recovery_test.cpp @@ -57,10 +57,10 @@ ResConfigData GetConfigData(int buf_size = 10) { return data; } -std::vector Listlogs(const std::string& path) { +std::vector Listlogs(const std::string &path) { std::vector ret; std::string dir = std::filesystem::path(path).parent_path(); - for (const auto& entry : std::filesystem::directory_iterator(dir)) { + for (const auto &entry : std::filesystem::directory_iterator(dir)) { LOG(ERROR) << "path:" << entry.path(); ret.push_back(entry.path()); } @@ -70,13 +70,15 @@ std::vector Listlogs(const std::string& path) { class RecoveryTest : public Test { public: RecoveryTest() - : config_(GetConfigData(), ReplicaInfo(), KeyInfo(), CertificateInfo()) { + : config_(GetConfigData(), ReplicaInfo(), KeyInfo(), CertificateInfo()), + system_info_() { std::string dir = std::filesystem::path(log_path).parent_path(); std::filesystem::remove_all(dir); } protected: ResDBConfig config_; + SystemInfo system_info_; MockCheckPoint checkpoint_; }; @@ -91,7 +93,7 @@ TEST_F(RecoveryTest, ReadLog) { }; { - Recovery recovery(config_, &checkpoint_, nullptr); + Recovery recovery(config_, &checkpoint_, &system_info_, nullptr); for (int t : types) { std::unique_ptr request = @@ -102,8 +104,9 @@ TEST_F(RecoveryTest, ReadLog) { } { std::vector list; - Recovery recovery(config_, &checkpoint_, nullptr); + Recovery recovery(config_, &checkpoint_, &system_info_, nullptr); recovery.ReadLogs( + [&](const SystemInfoData &data) {}, [&](std::unique_ptr context, std::unique_ptr request) { list.push_back(*request); }); @@ -129,7 +132,7 @@ TEST_F(RecoveryTest, ReadLog_FlushOnce) { }; { - Recovery recovery(config, &checkpoint_, nullptr); + Recovery recovery(config, &checkpoint_, &system_info_, nullptr); for (int t : types) { std::unique_ptr request = @@ -140,12 +143,13 @@ TEST_F(RecoveryTest, ReadLog_FlushOnce) { } { std::vector list; - Recovery recovery(config, &checkpoint_, nullptr); - recovery.ReadLogs([&](std::unique_ptr context, + Recovery recovery(config, &checkpoint_, &system_info_, nullptr); + recovery.ReadLogs([&](const SystemInfoData &data) {}, + [&](std::unique_ptr context, std::unique_ptr request) { - LOG(ERROR) << "call back:" << request->seq(); - list.push_back(*request); - }); + LOG(ERROR) << "call back:" << request->seq(); + list.push_back(*request); + }); EXPECT_EQ(list.size(), expected_types.size()); @@ -180,7 +184,7 @@ TEST_F(RecoveryTest, CheckPoint) { })); { - Recovery recovery(config, &checkpoint_, nullptr); + Recovery recovery(config, &checkpoint_, &system_info_, nullptr); for (int i = 1; i < 10; ++i) { for (int t : types) { @@ -205,12 +209,13 @@ TEST_F(RecoveryTest, CheckPoint) { EXPECT_EQ(log_list.size(), 2); { std::vector list; - Recovery recovery(config, &checkpoint_, nullptr); - recovery.ReadLogs([&](std::unique_ptr context, + Recovery recovery(config, &checkpoint_, &system_info_, nullptr); + recovery.ReadLogs([&](const SystemInfoData &data) {}, + [&](std::unique_ptr context, std::unique_ptr request) { - list.push_back(*request); - // LOG(ERROR)<<"call back:"<seq(); - }); + list.push_back(*request); + // LOG(ERROR)<<"call back:"<seq(); + }); EXPECT_EQ(list.size(), types.size() * 14); @@ -256,7 +261,7 @@ TEST_F(RecoveryTest, CheckPoint2) { })); { - Recovery recovery(config, &checkpoint_, &storage); + Recovery recovery(config, &checkpoint_, &system_info_, &storage); for (int i = 1; i < 10; ++i) { for (int t : types) { @@ -281,12 +286,13 @@ TEST_F(RecoveryTest, CheckPoint2) { EXPECT_EQ(log_list.size(), 2); { std::vector list; - Recovery recovery(config, &checkpoint_, &storage); - recovery.ReadLogs([&](std::unique_ptr context, + Recovery recovery(config, &checkpoint_, &system_info_, &storage); + recovery.ReadLogs([&](const SystemInfoData &data) {}, + [&](std::unique_ptr context, std::unique_ptr request) { - list.push_back(*request); - // LOG(ERROR)<<"call back:"<seq(); - }); + list.push_back(*request); + // LOG(ERROR)<<"call back:"<seq(); + }); EXPECT_EQ(list.size(), types.size() * 14); @@ -317,12 +323,13 @@ TEST_F(RecoveryTest, CheckPoint2) { { std::vector list; - Recovery recovery(config, &checkpoint_, &storage); - recovery.ReadLogs([&](std::unique_ptr context, + Recovery recovery(config, &checkpoint_, &system_info_, &storage); + recovery.ReadLogs([&](const SystemInfoData &data) {}, + [&](std::unique_ptr context, std::unique_ptr request) { - list.push_back(*request); - // LOG(ERROR)<<"call back:"<seq(); - }); + list.push_back(*request); + // LOG(ERROR)<<"call back:"<seq(); + }); EXPECT_EQ(list.size(), types.size() * 9); @@ -334,6 +341,128 @@ TEST_F(RecoveryTest, CheckPoint2) { } } +TEST_F(RecoveryTest, SystemInfo) { + ResDBConfig config(GetConfigData(1024), ReplicaInfo(), KeyInfo(), + CertificateInfo()); + MockStorage storage; + EXPECT_CALL(storage, Flush).Times(2).WillRepeatedly(Return(true)); + + std::vector types = {Request::TYPE_PRE_PREPARE, Request::TYPE_PREPARE, + Request::TYPE_COMMIT}; + + std::vector expected_types = { + Request::TYPE_PRE_PREPARE, Request::TYPE_PREPARE, Request::TYPE_COMMIT}; + + std::promise insert_done, ckpt, insert_done2, ckpt2; + std::future insert_done_future = insert_done.get_future(), + ckpt_future = ckpt.get_future(); + std::future insert_done2_future = insert_done2.get_future(); + std::future ckpt_future2 = ckpt2.get_future(); + int time = 1; + EXPECT_CALL(checkpoint_, GetStableCheckpoint()).WillRepeatedly(Invoke([&]() { + if (time == 1) { + insert_done_future.get(); + } else if (time == 2) { + ckpt.set_value(true); + } else if (time == 3) { + insert_done2_future.get(); + } else if (time == 4) { + ckpt2.set_value(true); + } + time++; + if (time > 3) { + return 25; + } + return 5; + })); + + { + Recovery recovery(config, &checkpoint_, &system_info_, &storage); + system_info_.SetCurrentView(2); + system_info_.SetPrimary(2); + + for (int i = 1; i < 10; ++i) { + for (int t : types) { + std::unique_ptr request = + NewRequest(static_cast(t), Request(), i); + request->set_seq(i); + recovery.AddRequest(nullptr, request.get()); + } + } + insert_done.set_value(true); + ckpt_future.get(); + for (int i = 10; i < 20; ++i) { + for (int t : types) { + std::unique_ptr request = + NewRequest(static_cast(t), Request(), i); + request->set_seq(i); + recovery.AddRequest(nullptr, request.get()); + } + } + } + std::vector log_list = Listlogs(log_path); + EXPECT_EQ(log_list.size(), 2); + { + std::vector list; + SystemInfoData data; + Recovery recovery(config, &checkpoint_, &system_info_, &storage); + recovery.ReadLogs([&](const SystemInfoData &r_data) { data = r_data; }, + [&](std::unique_ptr context, + std::unique_ptr request) { + list.push_back(*request); + // LOG(ERROR)<<"call back:"<seq(); + }); + + EXPECT_EQ(list.size(), types.size() * 14); + + for (int i = 0; i < expected_types.size(); ++i) { + EXPECT_EQ(list[i].type(), expected_types[i]); + } + + for (int i = 20; i < 30; ++i) { + for (int t : types) { + std::unique_ptr request = + NewRequest(static_cast(t), Request(), i); + request->set_seq(i); + recovery.AddRequest(nullptr, request.get()); + } + } + insert_done2.set_value(true); + ckpt_future2.get(); + + for (int i = 30; i < 35; ++i) { + for (int t : types) { + std::unique_ptr request = + NewRequest(static_cast(t), Request(), i); + request->set_seq(i); + recovery.AddRequest(nullptr, request.get()); + } + } + } + + { + std::vector list; + SystemInfoData data; + Recovery recovery(config, &checkpoint_, &system_info_, &storage); + recovery.ReadLogs([&](const SystemInfoData &r_data) { data = r_data; }, + [&](std::unique_ptr context, + std::unique_ptr request) { + list.push_back(*request); + // LOG(ERROR)<<"call back:"<seq(); + }); + + EXPECT_EQ(data.view(), 2); + EXPECT_EQ(data.primary_id(), 2); + EXPECT_EQ(list.size(), types.size() * 9); + + for (int i = 0; i < expected_types.size(); ++i) { + EXPECT_EQ(list[i].type(), expected_types[i]); + } + EXPECT_EQ(recovery.GetMinSeq(), 30); + EXPECT_EQ(recovery.GetMaxSeq(), 34); + } +} + } // namespace } // namespace resdb diff --git a/platform/proto/BUILD b/platform/proto/BUILD index c71b8f728..7b26b2abf 100644 --- a/platform/proto/BUILD +++ b/platform/proto/BUILD @@ -131,3 +131,15 @@ cc_proto_library( name = "broadcast_cc_proto", deps = [":broadcast_proto"], ) + +proto_library( + name = "system_info_data_proto", + srcs = ["system_info_data.proto"], +) + +cc_proto_library( + name = "system_info_data_cc_proto", + deps = [ + ":system_info_data_proto", + ], +) diff --git a/platform/proto/system_info_data.proto b/platform/proto/system_info_data.proto new file mode 100644 index 000000000..a8452f6ba --- /dev/null +++ b/platform/proto/system_info_data.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +package resdb; + +message SystemInfoData { + int32 view = 1; + int32 primary_id = 2; +}