From 89c4fa5a75876035db0394bd877e1894516c2b63 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Wed, 17 Apr 2024 10:40:17 +0800 Subject: [PATCH] [fix](move-memtable) close wait on all sinks (#33710) --- be/src/vec/sink/load_stream_map_pool.cpp | 16 ++++++++++++---- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 8 +++++++- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/be/src/vec/sink/load_stream_map_pool.cpp b/be/src/vec/sink/load_stream_map_pool.cpp index f335f05e16..fdcfe190db 100644 --- a/be/src/vec/sink/load_stream_map_pool.cpp +++ b/be/src/vec/sink/load_stream_map_pool.cpp @@ -61,15 +61,23 @@ bool LoadStreamMap::contains(int64_t dst_id) { } void LoadStreamMap::for_each(std::function fn) { - std::lock_guard lock(_mutex); - for (auto& [dst_id, streams] : _streams_for_node) { + decltype(_streams_for_node) snapshot; + { + std::lock_guard lock(_mutex); + snapshot = _streams_for_node; + } + for (auto& [dst_id, streams] : snapshot) { fn(dst_id, *streams); } } Status LoadStreamMap::for_each_st(std::function fn) { - std::lock_guard lock(_mutex); - for (auto& [dst_id, streams] : _streams_for_node) { + decltype(_streams_for_node) snapshot; + { + std::lock_guard lock(_mutex); + snapshot = _streams_for_node; + } + for (auto& [dst_id, streams] : snapshot) { RETURN_IF_ERROR(fn(dst_id, *streams)); } return Status::OK(); diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 617e96c6cc..1f1756b5a1 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -546,9 +546,15 @@ Status VTabletWriterV2::close(Status exec_status) { LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << is_last_sink << ", load_id=" << print_id(_load_id); - // send CLOSE_LOAD and close_wait on all streams + // send CLOSE_LOAD on all streams if this is the last sink if (is_last_sink) { RETURN_IF_ERROR(_load_stream_map->close_load()); + } + + // close_wait on all streams, even if this is not the last sink. + // because some per-instance data structures are now shared among all sinks + // due to sharing delta writers and load stream stubs. + { SCOPED_TIMER(_close_load_timer); RETURN_IF_ERROR(_load_stream_map->for_each_st([this](int64_t dst_id, const Streams& streams) -> Status {