diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 6863faa4ac..9ad8d8805c 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -21,7 +21,6 @@ #include "olap/rowset/rowset_writer.h" #include "util/brpc_client_cache.h" -#include "util/debug_points.h" #include "util/network_util.h" #include "util/thrift_util.h" #include "util/uid_util.h" diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 5c9b3d2f31..a5c9d7464a 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -60,6 +60,7 @@ #include "runtime/thread_context.h" #include "runtime/types.h" #include "util/countdown_latch.h" +#include "util/debug_points.h" #include "util/runtime_profile.h" #include "util/stopwatch.hpp" #include "vec/columns/column.h" @@ -183,6 +184,10 @@ public: // wait remote to close stream, // remote will close stream when it receives CLOSE_LOAD Status close_wait(int64_t timeout_ms = 0) { + DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", { + while (true) { + }; + }); if (!_is_init.load() || _handler.is_closed()) { return Status::OK(); } diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp index c867756d8f..3cf168a131 100644 --- a/be/src/vec/sink/load_stream_stub_pool.cpp +++ b/be/src/vec/sink/load_stream_stub_pool.cpp @@ -26,7 +26,7 @@ class TExpr; LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStreamStubPool* pool) : _load_id(load_id), _dst_id(dst_id), _use_cnt(num_use), _pool(pool) {} -void LoadStreams::release() { +void LoadStreams::release(Status status) { int num_use = --_use_cnt; DBUG_EXECUTE_IF("LoadStreams.release.keeping_streams", { num_use = 1; }); if (num_use == 0) { @@ -39,12 +39,14 @@ void LoadStreams::release() { LOG(WARNING) << "close stream failed " << st; } } - for (auto& stream : _streams) { - auto st = stream->close_wait(); - DBUG_EXECUTE_IF("LoadStreams.release.close_wait_failed", - { st = Status::InternalError("stream close wait timeout"); }); - if (!st.ok()) { - LOG(WARNING) << "close wait failed " << st; + if (status.ok()) { + for (auto& stream : _streams) { + auto st = stream->close_wait(); + DBUG_EXECUTE_IF("LoadStreams.release.close_wait_failed", + { st = Status::InternalError("stream close wait timeout"); }); + if (!st.ok()) { + LOG(WARNING) << "close wait failed " << st; + } } } _pool->erase(_load_id, _dst_id); diff --git a/be/src/vec/sink/load_stream_stub_pool.h b/be/src/vec/sink/load_stream_stub_pool.h index e41083825b..b34383b25f 100644 --- a/be/src/vec/sink/load_stream_stub_pool.h +++ b/be/src/vec/sink/load_stream_stub_pool.h @@ -76,7 +76,7 @@ class LoadStreams { public: LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStreamStubPool* pool); - void release(); + void release(Status status); Streams& streams() { return _streams; } diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index d1e65c804d..e02fba7c21 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -456,7 +456,7 @@ Status VTabletWriterV2::_cancel(Status status) { _delta_writer_for_tablet.reset(); } for (const auto& [_, streams] : _streams_for_node) { - streams->release(); + streams->release(status); } return Status::OK(); } @@ -513,7 +513,7 @@ Status VTabletWriterV2::close(Status exec_status) { // defer stream release to prevent memory leak Defer defer([&] { for (const auto& [_, streams] : _streams_for_node) { - streams->release(); + streams->release(status); } _streams_for_node.clear(); }); diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp b/be/test/vec/exec/load_stream_stub_pool_test.cpp index 24da3bb699..bea5443b4f 100644 --- a/be/test/vec/exec/load_stream_stub_pool_test.cpp +++ b/be/test/vec/exec/load_stream_stub_pool_test.cpp @@ -32,6 +32,7 @@ TEST_F(LoadStreamStubPoolTest, test) { LoadStreamStubPool pool; int64_t src_id = 100; PUniqueId load_id; + Status st = Status::OK(); load_id.set_hi(1); load_id.set_hi(2); auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1); @@ -41,9 +42,9 @@ TEST_F(LoadStreamStubPoolTest, test) { EXPECT_EQ(1, pool.templates_size()); EXPECT_EQ(streams1, streams3); EXPECT_NE(streams1, streams2); - streams1->release(); - streams2->release(); - streams3->release(); + streams1->release(st); + streams2->release(st); + streams3->release(st); EXPECT_EQ(0, pool.size()); EXPECT_EQ(0, pool.templates_size()); } diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy new file mode 100644 index 0000000000..4a87f1daf6 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods +import org.apache.doris.regression.util.Http + +suite("test_load_stream_stub_close_wait_fault_injection", "nonConcurrent") { + sql """ set enable_memtable_on_sink_node=true """ + sql """ DROP TABLE IF EXISTS `baseall` """ + sql """ DROP TABLE IF EXISTS `test` """ + sql """ + CREATE TABLE IF NOT EXISTS `baseall` ( + `k0` boolean null comment "", + `k1` tinyint(4) null comment "", + `k2` smallint(6) null comment "", + `k3` int(11) null comment "", + `k4` bigint(20) null comment "", + `k5` decimal(9, 3) null comment "", + `k6` char(5) null comment "", + `k10` date null comment "", + `k11` datetime null comment "", + `k7` varchar(20) null comment "", + `k8` double max null comment "", + `k9` float sum null comment "", + `k12` string replace null comment "", + `k13` largeint(40) replace null comment "" + ) engine=olap + DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1") + """ + sql """ + CREATE TABLE IF NOT EXISTS `test` ( + `k0` boolean null comment "", + `k1` tinyint(4) null comment "", + `k2` smallint(6) null comment "", + `k3` int(11) null comment "", + `k4` bigint(20) null comment "", + `k5` decimal(9, 3) null comment "", + `k6` char(5) null comment "", + `k10` date null comment "", + `k11` datetime null comment "", + `k7` varchar(20) null comment "", + `k8` double max null comment "", + `k9` float sum null comment "", + `k12` string replace_if_not_null null comment "", + `k13` largeint(40) replace null comment "" + ) engine=olap + DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1") + """ + + GetDebugPoint().clearDebugPointsForAllBEs() + streamLoad { + table "baseall" + db "regression_test_fault_injection_p0" + set 'column_separator', ',' + file "baseall.txt" + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("VTabletWriterV2.close.cancel") + GetDebugPoint().enableDebugPointForAllBEs("LoadStreamStub::close_wait.long_wait") + def res = sql "insert into test select * from baseall where k1 <= 3" + logger.info(res.toString()) + } catch(Exception e) { + logger.info(e.getMessage()) + assertTrue(e.getMessage().contains("cancel")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("VTabletWriterV2.close.cancel") + GetDebugPoint().disableDebugPointForAllBEs("LoadStreamStub::close_wait.long_wait") + } + + sql """ DROP TABLE IF EXISTS `baseall` """ + sql """ DROP TABLE IF EXISTS `test` """ + sql """ set enable_memtable_on_sink_node=false """ +} \ No newline at end of file