[fix](move-memtable) abstract multi-streams to one logical stream (apache#42039)

## Proposed changes
Currently, an upstream BE (sink_v2) will open multiple streams to a
downstream BE (load_stream).
If any of these streams fails, the use_cnt on the downstream BE becomes
inconsistent, and the load_stream never reports any successful tablets back to
the sink_v2, since from its point of view there are still unfinished streams.

So fault tolerance when opening streams is not meaningful in practice, and
may cause data loss: the upstream thinks there are still working streams
transferring data, but the downstream never reports any commit info.

This PR removes fault tolerance when opening multiple streams to the same
backend.
If any of the opens fails, the upstream sink_v2 should mark the replicas on
the downstream BE as failed.
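
As an illustration of the intended semantics, here is a minimal, self-contained C++ sketch of the "one logical stream" behavior: the per-node group of streams either opens completely or is cancelled as a whole, so the caller can mark the node's replicas as failed instead of writing through a partially opened group. The types and helper names below are stand-ins for this sketch only, not the Doris classes changed in this commit.

```cpp
#include <functional>
#include <iostream>
#include <string>
#include <vector>

// One "logical stream": a group of physical streams to the same node.
struct Stream {
    std::function<bool()> open;                      // returns true on success
    std::function<void(const std::string&)> cancel;  // cancel with a reason
};

// Open every stream in the group; if any open fails, cancel the whole group
// and report failure so the caller can mark the node's replicas as failed,
// rather than keep writing through a partially opened group.
bool open_all_or_fail(std::vector<Stream>& group) {
    bool ok = true;
    for (auto& s : group) {
        if (!s.open()) {
            ok = false;  // keep going over the remaining streams, no early break
        }
    }
    if (!ok) {
        for (auto& s : group) {
            s.cancel("at least one stream in the group failed to open");
        }
    }
    return ok;
}

int main() {
    auto log_cancel = [](const std::string& reason) { std::cout << "cancel: " << reason << "\n"; };
    std::vector<Stream> group = {
            {[] { return true; }, log_cancel},
            {[] { return false; }, log_cancel},  // simulate one failed open
    };
    std::cout << (open_all_or_fail(group) ? "node usable" : "mark node replicas as failed") << "\n";
    return 0;
}
```

This mirrors the new `LoadStreamStubs::open()` in the diff below, which marks the group as open only when every stream opens successfully and cancels all streams otherwise.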
kaijchen authored Oct 22, 2024
1 parent b7faf57 commit 9e5f9cb
Showing 8 changed files with 234 additions and 145 deletions.
50 changes: 16 additions & 34 deletions be/src/vec/sink/load_stream_map_pool.cpp
@@ -35,22 +35,20 @@ LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams,
DCHECK(num_use > 0) << "use num should be greater than 0";
}

std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id, bool incremental) {
std::shared_ptr<LoadStreamStubs> LoadStreamMap::get_or_create(int64_t dst_id, bool incremental) {
std::lock_guard<std::mutex> lock(_mutex);
std::shared_ptr<Streams> streams = _streams_for_node[dst_id];
std::shared_ptr<LoadStreamStubs> streams = _streams_for_node[dst_id];
if (streams != nullptr) {
return streams;
}
streams = std::make_shared<Streams>();
for (int i = 0; i < _num_streams; i++) {
streams->emplace_back(new LoadStreamStub(_load_id, _src_id, _tablet_schema_for_index,
_enable_unique_mow_for_index, incremental));
}
streams = std::make_shared<LoadStreamStubs>(_num_streams, _load_id, _src_id,
_tablet_schema_for_index,
_enable_unique_mow_for_index, incremental);
_streams_for_node[dst_id] = streams;
return streams;
}

std::shared_ptr<Streams> LoadStreamMap::at(int64_t dst_id) {
std::shared_ptr<LoadStreamStubs> LoadStreamMap::at(int64_t dst_id) {
std::lock_guard<std::mutex> lock(_mutex);
return _streams_for_node.at(dst_id);
}
@@ -60,7 +58,7 @@ bool LoadStreamMap::contains(int64_t dst_id) {
return _streams_for_node.contains(dst_id);
}

void LoadStreamMap::for_each(std::function<void(int64_t, const Streams&)> fn) {
void LoadStreamMap::for_each(std::function<void(int64_t, LoadStreamStubs&)> fn) {
decltype(_streams_for_node) snapshot;
{
std::lock_guard<std::mutex> lock(_mutex);
@@ -71,7 +69,7 @@ void LoadStreamMap::for_each(std::function<void(int64_t, const Streams&)> fn) {
}
}

Status LoadStreamMap::for_each_st(std::function<Status(int64_t, const Streams&)> fn) {
Status LoadStreamMap::for_each_st(std::function<Status(int64_t, LoadStreamStubs&)> fn) {
decltype(_streams_for_node) snapshot;
{
std::lock_guard<std::mutex> lock(_mutex);
@@ -108,38 +106,22 @@ bool LoadStreamMap::release() {
}

void LoadStreamMap::close_load(bool incremental) {
auto st = for_each_st([this, incremental](int64_t dst_id, const Streams& streams) -> Status {
for (auto& [dst_id, streams] : _streams_for_node) {
if (streams->is_incremental()) {
continue;
}
std::vector<PTabletID> tablets_to_commit;
const auto& tablets = _tablets_to_commit[dst_id];
tablets_to_commit.reserve(tablets.size());
for (const auto& [tablet_id, tablet] : tablets) {
tablets_to_commit.push_back(tablet);
tablets_to_commit.back().set_num_segments(_segments_for_tablet[tablet_id]);
}
Status status = Status::OK();
bool first = true;
for (auto& stream : streams) {
if (stream->is_incremental() != incremental) {
continue;
}
if (first) {
auto st = stream->close_load(tablets_to_commit);
if (!st.ok() && status.ok()) {
status = st;
}
first = false;
} else {
auto st = stream->close_load({});
if (!st.ok() && status.ok()) {
status = st;
}
}
auto st = streams->close_load(tablets_to_commit);
if (!st.ok()) {
LOG(WARNING) << "close_load for " << (incremental ? "incremental" : "non-incremental")
<< " streams failed: " << st << ", load_id=" << _load_id;
}
return status;
});
if (!st.ok()) {
LOG(WARNING) << "close_load for " << (incremental ? "incremental" : "non-incremental")
<< " streams failed: " << st << ", load_id=" << _load_id;
}
}

12 changes: 5 additions & 7 deletions be/src/vec/sink/load_stream_map_pool.h
@@ -70,22 +70,20 @@ class LoadStreamStub;

class LoadStreamMapPool;

using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;

class LoadStreamMap {
public:
LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use,
LoadStreamMapPool* pool);

std::shared_ptr<Streams> get_or_create(int64_t dst_id, bool incremental = false);
std::shared_ptr<LoadStreamStubs> get_or_create(int64_t dst_id, bool incremental = false);

std::shared_ptr<Streams> at(int64_t dst_id);
std::shared_ptr<LoadStreamStubs> at(int64_t dst_id);

bool contains(int64_t dst_id);

void for_each(std::function<void(int64_t, const Streams&)> fn);
void for_each(std::function<void(int64_t, LoadStreamStubs&)> fn);

Status for_each_st(std::function<Status(int64_t, const Streams&)> fn);
Status for_each_st(std::function<Status(int64_t, LoadStreamStubs&)> fn);

void save_tablets_to_commit(int64_t dst_id, const std::vector<PTabletID>& tablets_to_commit);

@@ -106,7 +104,7 @@ class LoadStreamMap {
const int _num_streams;
std::atomic<int> _use_cnt;
std::mutex _mutex;
std::unordered_map<int64_t, std::shared_ptr<Streams>> _streams_for_node;
std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>> _streams_for_node;
LoadStreamMapPool* _pool = nullptr;
std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index;
std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
66 changes: 65 additions & 1 deletion be/src/vec/sink/load_stream_stub.cpp
@@ -207,7 +207,8 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
LOG(INFO) << "open load stream to host=" << node_info.host << ", port=" << node_info.brpc_port
<< ", " << *this;
_is_open.store(true);
return Status::OK();
_status = Status::OK();
return _status;
}

// APPEND_DATA
@@ -505,4 +506,67 @@ inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub)
return ostr;
}

Status LoadStreamStubs::open(BrpcClientCache<PBackendService_Stub>* client_cache,
const NodeInfo& node_info, int64_t txn_id,
const OlapTableSchemaParam& schema,
const std::vector<PTabletID>& tablets_for_schema, int total_streams,
int64_t idle_timeout_ms, bool enable_profile) {
bool get_schema = true;
auto status = Status::OK();
for (auto& stream : _streams) {
Status st;
if (get_schema) {
st = stream->open(client_cache, node_info, txn_id, schema, tablets_for_schema,
total_streams, idle_timeout_ms, enable_profile);
} else {
st = stream->open(client_cache, node_info, txn_id, schema, {}, total_streams,
idle_timeout_ms, enable_profile);
}
if (st.ok()) {
get_schema = false;
} else {
LOG(WARNING) << "open stream failed: " << st << "; stream: " << *stream;
status = st;
// no break here to try get schema from the rest streams
}
}
// only mark open when all streams open success
_open_success.store(status.ok());
// cancel all streams if open failed
if (!status.ok()) {
cancel(status);
}
return status;
}

Status LoadStreamStubs::close_load(const std::vector<PTabletID>& tablets_to_commit) {
if (!_open_success.load()) {
return Status::InternalError("streams not open");
}
bool first = true;
auto status = Status::OK();
for (auto& stream : _streams) {
Status st;
if (first) {
st = stream->close_load(tablets_to_commit);
first = false;
} else {
st = stream->close_load({});
}
if (!st.ok()) {
LOG(WARNING) << "close_load failed: " << st << "; stream: " << *stream;
}
}
return status;
}

Status LoadStreamStubs::close_wait(RuntimeState* state, int64_t timeout_ms) {
MonotonicStopWatch watch;
watch.start();
for (auto& stream : _streams) {
RETURN_IF_ERROR(stream->close_wait(state, timeout_ms - watch.elapsed_time() / 1000 / 1000));
}
return Status::OK();
}

} // namespace doris
69 changes: 69 additions & 0 deletions be/src/vec/sink/load_stream_stub.h
@@ -265,4 +265,73 @@ class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
bool _is_incremental = false;
};

// a collection of LoadStreams connect to the same node
class LoadStreamStubs {
public:
LoadStreamStubs(size_t num_streams, UniqueId load_id, int64_t src_id,
std::shared_ptr<IndexToTabletSchema> schema_map,
std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false)
: _is_incremental(incremental) {
_streams.reserve(num_streams);
for (size_t i = 0; i < num_streams; i++) {
_streams.emplace_back(
new LoadStreamStub(load_id, src_id, schema_map, mow_map, incremental));
}
}

Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info,
int64_t txn_id, const OlapTableSchemaParam& schema,
const std::vector<PTabletID>& tablets_for_schema, int total_streams,
int64_t idle_timeout_ms, bool enable_profile);

bool is_incremental() const { return _is_incremental; }

size_t size() const { return _streams.size(); }

// for UT only
void mark_open() { _open_success.store(true); }

std::shared_ptr<LoadStreamStub> select_one_stream() {
if (!_open_success.load()) {
return nullptr;
}
size_t i = _select_index.fetch_add(1);
return _streams[i % _streams.size()];
}

void cancel(Status reason) {
for (auto& stream : _streams) {
stream->cancel(reason);
}
}

Status close_load(const std::vector<PTabletID>& tablets_to_commit);

Status close_wait(RuntimeState* state, int64_t timeout_ms = 0);

std::unordered_set<int64_t> success_tablets() {
std::unordered_set<int64_t> s;
for (auto& stream : _streams) {
auto v = stream->success_tablets();
std::copy(v.begin(), v.end(), std::inserter(s, s.end()));
}
return s;
}

std::unordered_map<int64_t, Status> failed_tablets() {
std::unordered_map<int64_t, Status> m;
for (auto& stream : _streams) {
auto v = stream->failed_tablets();
m.insert(v.begin(), v.end());
}
return m;
}

private:
std::vector<std::shared_ptr<LoadStreamStub>> _streams;
std::atomic<bool> _open_success = false;
std::atomic<size_t> _select_index = 0;
const bool _is_incremental;
};

} // namespace doris
