[fix](move-memtable) close wait on all sinks (#33710)

This commit is contained in:
Kaijie Chen
2024-04-17 10:40:17 +08:00
committed by yiguolei
parent f52067415b
commit 89c4fa5a75
2 changed files with 19 additions and 5 deletions

View File

@ -61,15 +61,23 @@ bool LoadStreamMap::contains(int64_t dst_id) {
}
void LoadStreamMap::for_each(std::function<void(int64_t, const Streams&)> fn) {
std::lock_guard<std::mutex> lock(_mutex);
for (auto& [dst_id, streams] : _streams_for_node) {
decltype(_streams_for_node) snapshot;
{
std::lock_guard<std::mutex> lock(_mutex);
snapshot = _streams_for_node;
}
for (auto& [dst_id, streams] : snapshot) {
fn(dst_id, *streams);
}
}
Status LoadStreamMap::for_each_st(std::function<Status(int64_t, const Streams&)> fn) {
std::lock_guard<std::mutex> lock(_mutex);
for (auto& [dst_id, streams] : _streams_for_node) {
decltype(_streams_for_node) snapshot;
{
std::lock_guard<std::mutex> lock(_mutex);
snapshot = _streams_for_node;
}
for (auto& [dst_id, streams] : snapshot) {
RETURN_IF_ERROR(fn(dst_id, *streams));
}
return Status::OK();

View File

@ -546,9 +546,15 @@ Status VTabletWriterV2::close(Status exec_status) {
LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << is_last_sink
<< ", load_id=" << print_id(_load_id);
// send CLOSE_LOAD and close_wait on all streams
// send CLOSE_LOAD on all streams if this is the last sink
if (is_last_sink) {
RETURN_IF_ERROR(_load_stream_map->close_load());
}
// close_wait on all streams, even if this is not the last sink.
// because some per-instance data structures are now shared among all sinks
// due to sharing delta writers and load stream stubs.
{
SCOPED_TIMER(_close_load_timer);
RETURN_IF_ERROR(_load_stream_map->for_each_st([this](int64_t dst_id,
const Streams& streams) -> Status {