From bbf58c5aa42d40e66bc6ccc9ed91a4fcb4bdfff7 Mon Sep 17 00:00:00 2001 From: HHoflittlefish777 <77738092+HHoflittlefish777@users.noreply.github.com> Date: Sun, 31 Dec 2023 16:26:41 +0800 Subject: [PATCH] [improve](move-memtable) cancel load rapidly when stream close wait (#29322) --- be/src/vec/sink/load_stream_stub.cpp | 11 +- be/src/vec/sink/load_stream_stub.h | 54 ++++++++- be/src/vec/sink/load_stream_stub_pool.cpp | 13 ++- be/src/vec/sink/load_stream_stub_pool.h | 4 +- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 5 +- .../io/fs/stream_sink_file_writer_test.cpp | 6 +- .../vec/exec/load_stream_stub_pool_test.cpp | 7 +- ...eam_stub_close_wait_fault_injection.groovy | 106 ++++++++++++++++++ 8 files changed, 185 insertions(+), 21 deletions(-) create mode 100644 regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 4b557cae93..a5f4f59dda 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -90,19 +90,22 @@ int LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId void LoadStreamStub::LoadStreamReplyHandler::on_closed(brpc::StreamId id) { LOG(INFO) << "on_closed, load_id=" << _load_id << ", stream_id=" << id; std::lock_guard lock(_mutex); + DBUG_EXECUTE_IF("LoadStreamStub::LoadStreamReplyHandler::on_closed.close_wait", { return; }); _is_closed.store(true); _close_cv.notify_all(); } -LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use) - : _use_cnt(num_use), +LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use, RuntimeState* state) + : _state(state), + _use_cnt(num_use), _load_id(load_id), _src_id(src_id), _tablet_schema_for_index(std::make_shared()), _enable_unique_mow_for_index(std::make_shared()) {}; -LoadStreamStub::LoadStreamStub(LoadStreamStub& stub) - : _use_cnt(stub._use_cnt.load()), 
+LoadStreamStub::LoadStreamStub(LoadStreamStub& stub, RuntimeState* state) + : _state(state), + _use_cnt(stub._use_cnt.load()), _load_id(stub._load_id), _src_id(stub._src_id), _tablet_schema_for_index(stub._tablet_schema_for_index), diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index edbbbda1e6..9c45926d3c 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -57,6 +57,7 @@ #include "gutil/ref_counted.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" +#include "runtime/runtime_state.h" #include "runtime/thread_context.h" #include "runtime/types.h" #include "util/countdown_latch.h" @@ -101,11 +102,29 @@ private: Status close_wait(int64_t timeout_ms) { DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0"; std::unique_lock lock(_mutex); - if (_is_closed) { - return Status::OK(); + int ret = 0; + MonotonicStopWatch watch; + watch.start(); + while (true) { + if (_is_closed) { + return Status::OK(); + } + if (_stub->is_cancelled()) { + return _stub->cancelled_state(); + } + if (_stub->runtime_state()->is_cancelled()) { + return Status::Cancelled(_stub->runtime_state()->cancel_reason()); + } + // wait_for() takes microseconds: wait at most 1s per round, then re-check the cancel flags. + ret = _close_cv.wait_for(lock, 1000 * 1000); + if (ret == 0 && _is_closed) { + return Status::OK(); + } + if (watch.elapsed_time() / 1000 / 1000 >= timeout_ms) { + return Status::InternalError("stream close wait timeout, result: {}", ret); + } } - int ret = _close_cv.wait_for(lock, timeout_ms * 1000); - return ret == 0 ? 
Status::OK() : Status::Error(ret, "stream close_wait timeout"); + return Status::OK(); }; std::vector success_tablets() { @@ -138,10 +157,28 @@ private: public: // construct new stub - LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use); + LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use, RuntimeState* state); // copy constructor, shared_ptr members are shared - LoadStreamStub(LoadStreamStub& stub); + LoadStreamStub(LoadStreamStub& stub, RuntimeState* state); + + void cancel(Status status) { + _cancel = true; + _cancel_status = status; + } + + RuntimeState* runtime_state() const { return _state; } + + bool is_cancelled() const { return _cancel; } + + Status cancelled_state() const { return _cancel_status; } + + std::string cancel_reason() const { + if (_state == nullptr) { + return ""; + } + return _state->cancel_reason(); + } // for mock this class in UT #ifdef BE_TEST @@ -231,6 +268,11 @@ private: Status _send_with_retry(butil::IOBuf& buf); protected: + RuntimeState* _state = nullptr; + + bool _cancel = false; + Status _cancel_status; + std::atomic _is_init; bthread::Mutex _mutex; diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp index c867756d8f..94838acdc0 100644 --- a/be/src/vec/sink/load_stream_stub_pool.cpp +++ b/be/src/vec/sink/load_stream_stub_pool.cpp @@ -54,13 +54,19 @@ void LoadStreams::release() { } } +void LoadStreams::cancel(Status status) { + for (auto& stream : _streams) { + stream->cancel(status); + } +} + LoadStreamStubPool::LoadStreamStubPool() = default; LoadStreamStubPool::~LoadStreamStubPool() = default; std::shared_ptr LoadStreamStubPool::get_or_create(PUniqueId load_id, int64_t src_id, int64_t dst_id, int num_streams, - int num_sink) { + int num_sink, RuntimeState* state) { auto key = std::make_pair(UniqueId(load_id), dst_id); std::lock_guard lock(_mutex); std::shared_ptr streams = _pool[key]; @@ -69,11 +75,12 @@ std::shared_ptr 
LoadStreamStubPool::get_or_create(PUniqueId load_id } DCHECK(num_streams > 0) << "stream num should be greater than 0"; DCHECK(num_sink > 0) << "sink num should be greater than 0"; - auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id, num_sink}); + auto [it, _] = + _template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id, num_sink, state}); streams = std::make_shared(load_id, dst_id, num_sink, this); for (int32_t i = 0; i < num_streams; i++) { // copy construct, internal tablet schema map will be shared among all stubs - streams->streams().emplace_back(new LoadStreamStub {*it->second}); + streams->streams().emplace_back(new LoadStreamStub {*it->second, state}); } _pool[key] = streams; return streams; diff --git a/be/src/vec/sink/load_stream_stub_pool.h b/be/src/vec/sink/load_stream_stub_pool.h index e41083825b..b548d31a14 100644 --- a/be/src/vec/sink/load_stream_stub_pool.h +++ b/be/src/vec/sink/load_stream_stub_pool.h @@ -78,6 +78,8 @@ public: void release(); + void cancel(Status status); + Streams& streams() { return _streams; } private: @@ -95,7 +97,7 @@ public: ~LoadStreamStubPool(); std::shared_ptr get_or_create(PUniqueId load_id, int64_t src_id, int64_t dst_id, - int num_streams, int num_sink); + int num_streams, int num_sink, RuntimeState* state); void erase(UniqueId load_id, int64_t dst_id); diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 77df9b07f1..b0a39ac2a7 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -113,7 +113,7 @@ Status VTabletWriterV2::_incremental_open_streams( } for (int64_t node_id : new_backends) { auto load_streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create( - _load_id, _backend_id, node_id, _stream_per_node, _num_local_sink); + _load_id, _backend_id, node_id, _stream_per_node, _num_local_sink, _state); 
RETURN_IF_ERROR(_open_streams_to_backend(node_id, *load_streams)); _streams_for_node[node_id] = load_streams; } @@ -261,7 +261,7 @@ Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) { Status VTabletWriterV2::_open_streams(int64_t src_id) { for (auto& [dst_id, _] : _tablets_for_node) { auto streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create( - _load_id, src_id, dst_id, _stream_per_node, _num_local_sink); + _load_id, src_id, dst_id, _stream_per_node, _num_local_sink, _state); RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams)); _streams_for_node[dst_id] = streams; } @@ -457,6 +457,7 @@ Status VTabletWriterV2::_cancel(Status status) { } for (const auto& [_, streams] : _streams_for_node) { + streams->cancel(status); streams->release(); } return Status::OK(); } diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp b/be/test/io/fs/stream_sink_file_writer_test.cpp index 7e5bdd350f..a0daa15916 100644 --- a/be/test/io/fs/stream_sink_file_writer_test.cpp +++ b/be/test/io/fs/stream_sink_file_writer_test.cpp @@ -51,7 +51,8 @@ static std::atomic g_num_request; class StreamSinkFileWriterTest : public testing::Test { class MockStreamStub : public LoadStreamStub { public: - MockStreamStub(PUniqueId load_id, int64_t src_id) : LoadStreamStub(load_id, src_id, 1) {}; + MockStreamStub(PUniqueId load_id, int64_t src_id, RuntimeState* state) + : LoadStreamStub(load_id, src_id, 1, state) {}; virtual ~MockStreamStub() = default; @@ -85,8 +86,9 @@ protected: virtual void SetUp() { _load_id.set_hi(LOAD_ID_HI); _load_id.set_lo(LOAD_ID_LO); + RuntimeState state; for (int src_id = 0; src_id < NUM_STREAM; src_id++) { - _streams.emplace_back(new MockStreamStub(_load_id, src_id)); + _streams.emplace_back(new MockStreamStub(_load_id, src_id, &state)); } } diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp b/be/test/vec/exec/load_stream_stub_pool_test.cpp index 24da3bb699..7faa5fb2e5 100644 --- 
a/be/test/vec/exec/load_stream_stub_pool_test.cpp +++ b/be/test/vec/exec/load_stream_stub_pool_test.cpp @@ -30,13 +30,14 @@ public: TEST_F(LoadStreamStubPoolTest, test) { LoadStreamStubPool pool; + RuntimeState state; int64_t src_id = 100; PUniqueId load_id; load_id.set_hi(1); load_id.set_hi(2); - auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1); - auto streams2 = pool.get_or_create(load_id, src_id, 102, 5, 1); - auto streams3 = pool.get_or_create(load_id, src_id, 101, 5, 1); + auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1, &state); + auto streams2 = pool.get_or_create(load_id, src_id, 102, 5, 1, &state); + auto streams3 = pool.get_or_create(load_id, src_id, 101, 5, 1, &state); EXPECT_EQ(2, pool.size()); EXPECT_EQ(1, pool.templates_size()); EXPECT_EQ(streams1, streams3); diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy new file mode 100644 index 0000000000..85ad5f7bc9 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods +import org.apache.doris.regression.util.Http + +suite("test_load_stream_stub_close_wait_fault_injection", "nonConcurrent") { + sql """ set enable_memtable_on_sink_node=true """ + sql """ DROP TABLE IF EXISTS `baseall` """ + sql """ DROP TABLE IF EXISTS `test` """ + sql """ + CREATE TABLE IF NOT EXISTS `baseall` ( + `k0` boolean null comment "", + `k1` tinyint(4) null comment "", + `k2` smallint(6) null comment "", + `k3` int(11) null comment "", + `k4` bigint(20) null comment "", + `k5` decimal(9, 3) null comment "", + `k6` char(5) null comment "", + `k10` date null comment "", + `k11` datetime null comment "", + `k7` varchar(20) null comment "", + `k8` double max null comment "", + `k9` float sum null comment "", + `k12` string replace null comment "", + `k13` largeint(40) replace null comment "" + ) engine=olap + DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1") + """ + sql """ + CREATE TABLE IF NOT EXISTS `test` ( + `k0` boolean null comment "", + `k1` tinyint(4) null comment "", + `k2` smallint(6) null comment "", + `k3` int(11) null comment "", + `k4` bigint(20) null comment "", + `k5` decimal(9, 3) null comment "", + `k6` char(5) null comment "", + `k10` date null comment "", + `k11` datetime null comment "", + `k7` varchar(20) null comment "", + `k8` double max null comment "", + `k9` float sum null comment "", + `k12` string replace_if_not_null null comment "", + `k13` largeint(40) replace null comment "" + ) engine=olap + DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1") + """ + + GetDebugPoint().clearDebugPointsForAllBEs() + streamLoad { + table "baseall" + db "regression_test_fault_injection_p0" + set 'column_separator', ',' + file "baseall.txt" + } + + try { + 
GetDebugPoint().enableDebugPointForAllBEs("LoadStreamStub::LoadStreamReplyHandler::on_closed.close_wait") + def thread1 = new Thread({ + try { + def res = sql "insert into test select * from baseall where k1 <= 3" + logger.info(res.toString()) + } catch(Exception e) { + logger.info(e.getMessage()) + assertTrue(e.getMessage().contains("Communications link failure")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("LoadStreamStub::LoadStreamReplyHandler::on_closed.close_wait") + } + }) + thread1.start() + + sleep(1000) + + def processList = sql "show processlist" + logger.info(processList.toString()) + processList.each { item -> + logger.info(item[1].toString()) + logger.info(item[11].toString()) + if (item[11].toString() == "insert into test select * from baseall where k1 <= 3".toString()){ + def res = sql "kill ${item[1]}" + logger.info(res.toString()) + } + } + } catch(Exception e) { + logger.info(e.getMessage()) + } + + sql """ DROP TABLE IF EXISTS `baseall` """ + sql """ DROP TABLE IF EXISTS `test` """ + sql """ set enable_memtable_on_sink_node=false """ +} \ No newline at end of file