[improve](move-memtable) cancel load rapidly when stream close wait (#29322)
This commit is contained in:
committed by
GitHub
parent
6c74313f60
commit
bbf58c5aa4
@ -90,19 +90,22 @@ int LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
|
||||
void LoadStreamStub::LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
|
||||
LOG(INFO) << "on_closed, load_id=" << _load_id << ", stream_id=" << id;
|
||||
std::lock_guard<bthread::Mutex> lock(_mutex);
|
||||
DBUG_EXECUTE_IF("LoadStreamStub::LoadStreamReplyHandler::on_closed.close_wait", { return; });
|
||||
_is_closed.store(true);
|
||||
_close_cv.notify_all();
|
||||
}
|
||||
|
||||
LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use)
|
||||
: _use_cnt(num_use),
|
||||
LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use, RuntimeState* state)
|
||||
: _state(state),
|
||||
_use_cnt(num_use),
|
||||
_load_id(load_id),
|
||||
_src_id(src_id),
|
||||
_tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
|
||||
_enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {};
|
||||
|
||||
LoadStreamStub::LoadStreamStub(LoadStreamStub& stub)
|
||||
: _use_cnt(stub._use_cnt.load()),
|
||||
LoadStreamStub::LoadStreamStub(LoadStreamStub& stub, RuntimeState* state)
|
||||
: _state(state),
|
||||
_use_cnt(stub._use_cnt.load()),
|
||||
_load_id(stub._load_id),
|
||||
_src_id(stub._src_id),
|
||||
_tablet_schema_for_index(stub._tablet_schema_for_index),
|
||||
|
||||
@ -57,6 +57,7 @@
|
||||
#include "gutil/ref_counted.h"
|
||||
#include "runtime/exec_env.h"
|
||||
#include "runtime/memory/mem_tracker.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "runtime/thread_context.h"
|
||||
#include "runtime/types.h"
|
||||
#include "util/countdown_latch.h"
|
||||
@ -101,11 +102,29 @@ private:
|
||||
Status close_wait(int64_t timeout_ms) {
|
||||
DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
|
||||
std::unique_lock<bthread::Mutex> lock(_mutex);
|
||||
if (_is_closed) {
|
||||
return Status::OK();
|
||||
int ret = 0;
|
||||
MonotonicStopWatch watch;
|
||||
watch.start();
|
||||
while (true) {
|
||||
if (_is_closed) {
|
||||
return Status::OK();
|
||||
}
|
||||
if (_stub->is_cancelled()) {
|
||||
return _stub->cancelled_state();
|
||||
}
|
||||
if (_stub->runtime_state()->is_cancelled()) {
|
||||
return Status::Cancelled(_stub->runtime_state()->cancel_reason());
|
||||
}
|
||||
// wait 1s once time.
|
||||
ret = _close_cv.wait_for(lock, 1);
|
||||
if (ret == 0) {
|
||||
return Status::OK();
|
||||
}
|
||||
if (watch.elapsed_time() / 1000 / 1000 >= timeout_ms) {
|
||||
return Status::InternalError("stream close wait timeout, result: {}", ret);
|
||||
}
|
||||
}
|
||||
int ret = _close_cv.wait_for(lock, timeout_ms * 1000);
|
||||
return ret == 0 ? Status::OK() : Status::Error<true>(ret, "stream close_wait timeout");
|
||||
return Status::OK();
|
||||
};
|
||||
|
||||
std::vector<int64_t> success_tablets() {
|
||||
@ -138,10 +157,28 @@ private:
|
||||
|
||||
public:
|
||||
// construct new stub
|
||||
LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use);
|
||||
LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use, RuntimeState* state);
|
||||
|
||||
// copy constructor, shared_ptr members are shared
|
||||
LoadStreamStub(LoadStreamStub& stub);
|
||||
LoadStreamStub(LoadStreamStub& stub, RuntimeState* state);
|
||||
|
||||
void cancel(Status status) {
|
||||
_cancel = true;
|
||||
_cancel_status = status;
|
||||
}
|
||||
|
||||
RuntimeState* runtime_state() const { return _state; }
|
||||
|
||||
bool is_cancelled() const { return _cancel; }
|
||||
|
||||
Status cancelled_state() const { return _cancel_status; }
|
||||
|
||||
std::string cancel_reason() const {
|
||||
if (_state == nullptr) {
|
||||
return "";
|
||||
}
|
||||
return _state->cancel_reason();
|
||||
}
|
||||
|
||||
// for mock this class in UT
|
||||
#ifdef BE_TEST
|
||||
@ -231,6 +268,11 @@ private:
|
||||
Status _send_with_retry(butil::IOBuf& buf);
|
||||
|
||||
protected:
|
||||
RuntimeState* _state = nullptr;
|
||||
|
||||
bool _cancel = false;
|
||||
Status _cancel_status;
|
||||
|
||||
std::atomic<bool> _is_init;
|
||||
bthread::Mutex _mutex;
|
||||
|
||||
|
||||
@ -54,13 +54,19 @@ void LoadStreams::release() {
|
||||
}
|
||||
}
|
||||
|
||||
void LoadStreams::cancel(Status status) {
|
||||
for (auto& stream : _streams) {
|
||||
stream->cancel(status);
|
||||
}
|
||||
}
|
||||
|
||||
LoadStreamStubPool::LoadStreamStubPool() = default;
|
||||
|
||||
LoadStreamStubPool::~LoadStreamStubPool() = default;
|
||||
|
||||
std::shared_ptr<LoadStreams> LoadStreamStubPool::get_or_create(PUniqueId load_id, int64_t src_id,
|
||||
int64_t dst_id, int num_streams,
|
||||
int num_sink) {
|
||||
int num_sink, RuntimeState* state) {
|
||||
auto key = std::make_pair(UniqueId(load_id), dst_id);
|
||||
std::lock_guard<std::mutex> lock(_mutex);
|
||||
std::shared_ptr<LoadStreams> streams = _pool[key];
|
||||
@ -69,11 +75,12 @@ std::shared_ptr<LoadStreams> LoadStreamStubPool::get_or_create(PUniqueId load_id
|
||||
}
|
||||
DCHECK(num_streams > 0) << "stream num should be greater than 0";
|
||||
DCHECK(num_sink > 0) << "sink num should be greater than 0";
|
||||
auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id, num_sink});
|
||||
auto [it, _] =
|
||||
_template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id, num_sink, state});
|
||||
streams = std::make_shared<LoadStreams>(load_id, dst_id, num_sink, this);
|
||||
for (int32_t i = 0; i < num_streams; i++) {
|
||||
// copy construct, internal tablet schema map will be shared among all stubs
|
||||
streams->streams().emplace_back(new LoadStreamStub {*it->second});
|
||||
streams->streams().emplace_back(new LoadStreamStub {*it->second, state});
|
||||
}
|
||||
_pool[key] = streams;
|
||||
return streams;
|
||||
|
||||
@ -78,6 +78,8 @@ public:
|
||||
|
||||
void release();
|
||||
|
||||
void cancel(Status status);
|
||||
|
||||
Streams& streams() { return _streams; }
|
||||
|
||||
private:
|
||||
@ -95,7 +97,7 @@ public:
|
||||
~LoadStreamStubPool();
|
||||
|
||||
std::shared_ptr<LoadStreams> get_or_create(PUniqueId load_id, int64_t src_id, int64_t dst_id,
|
||||
int num_streams, int num_sink);
|
||||
int num_streams, int num_sink, RuntimeState* state);
|
||||
|
||||
void erase(UniqueId load_id, int64_t dst_id);
|
||||
|
||||
|
||||
@ -113,7 +113,7 @@ Status VTabletWriterV2::_incremental_open_streams(
|
||||
}
|
||||
for (int64_t node_id : new_backends) {
|
||||
auto load_streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
|
||||
_load_id, _backend_id, node_id, _stream_per_node, _num_local_sink);
|
||||
_load_id, _backend_id, node_id, _stream_per_node, _num_local_sink, _state);
|
||||
RETURN_IF_ERROR(_open_streams_to_backend(node_id, *load_streams));
|
||||
_streams_for_node[node_id] = load_streams;
|
||||
}
|
||||
@ -261,7 +261,7 @@ Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) {
|
||||
Status VTabletWriterV2::_open_streams(int64_t src_id) {
|
||||
for (auto& [dst_id, _] : _tablets_for_node) {
|
||||
auto streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
|
||||
_load_id, src_id, dst_id, _stream_per_node, _num_local_sink);
|
||||
_load_id, src_id, dst_id, _stream_per_node, _num_local_sink, _state);
|
||||
RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
|
||||
_streams_for_node[dst_id] = streams;
|
||||
}
|
||||
@ -457,6 +457,7 @@ Status VTabletWriterV2::_cancel(Status status) {
|
||||
}
|
||||
for (const auto& [_, streams] : _streams_for_node) {
|
||||
streams->release();
|
||||
streams->cancel(status);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -51,7 +51,8 @@ static std::atomic<int64_t> g_num_request;
|
||||
class StreamSinkFileWriterTest : public testing::Test {
|
||||
class MockStreamStub : public LoadStreamStub {
|
||||
public:
|
||||
MockStreamStub(PUniqueId load_id, int64_t src_id) : LoadStreamStub(load_id, src_id, 1) {};
|
||||
MockStreamStub(PUniqueId load_id, int64_t src_id, RuntimeState* state)
|
||||
: LoadStreamStub(load_id, src_id, 1, state) {};
|
||||
|
||||
virtual ~MockStreamStub() = default;
|
||||
|
||||
@ -85,8 +86,9 @@ protected:
|
||||
virtual void SetUp() {
|
||||
_load_id.set_hi(LOAD_ID_HI);
|
||||
_load_id.set_lo(LOAD_ID_LO);
|
||||
RuntimeState state;
|
||||
for (int src_id = 0; src_id < NUM_STREAM; src_id++) {
|
||||
_streams.emplace_back(new MockStreamStub(_load_id, src_id));
|
||||
_streams.emplace_back(new MockStreamStub(_load_id, src_id, &state));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -30,13 +30,14 @@ public:
|
||||
|
||||
TEST_F(LoadStreamStubPoolTest, test) {
|
||||
LoadStreamStubPool pool;
|
||||
RuntimeState state;
|
||||
int64_t src_id = 100;
|
||||
PUniqueId load_id;
|
||||
load_id.set_hi(1);
|
||||
load_id.set_hi(2);
|
||||
auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1);
|
||||
auto streams2 = pool.get_or_create(load_id, src_id, 102, 5, 1);
|
||||
auto streams3 = pool.get_or_create(load_id, src_id, 101, 5, 1);
|
||||
auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1, &state);
|
||||
auto streams2 = pool.get_or_create(load_id, src_id, 102, 5, 1, &state);
|
||||
auto streams3 = pool.get_or_create(load_id, src_id, 101, 5, 1, &state);
|
||||
EXPECT_EQ(2, pool.size());
|
||||
EXPECT_EQ(1, pool.templates_size());
|
||||
EXPECT_EQ(streams1, streams3);
|
||||
|
||||
@ -0,0 +1,106 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
import org.codehaus.groovy.runtime.IOGroovyMethods
|
||||
import org.apache.doris.regression.util.Http
|
||||
|
||||
suite("test_load_stream_stub_close_wait_fault_injection", "nonConcurrent") {
|
||||
sql """ set enable_memtable_on_sink_node=true """
|
||||
sql """ DROP TABLE IF EXISTS `baseall` """
|
||||
sql """ DROP TABLE IF EXISTS `test` """
|
||||
sql """
|
||||
CREATE TABLE IF NOT EXISTS `baseall` (
|
||||
`k0` boolean null comment "",
|
||||
`k1` tinyint(4) null comment "",
|
||||
`k2` smallint(6) null comment "",
|
||||
`k3` int(11) null comment "",
|
||||
`k4` bigint(20) null comment "",
|
||||
`k5` decimal(9, 3) null comment "",
|
||||
`k6` char(5) null comment "",
|
||||
`k10` date null comment "",
|
||||
`k11` datetime null comment "",
|
||||
`k7` varchar(20) null comment "",
|
||||
`k8` double max null comment "",
|
||||
`k9` float sum null comment "",
|
||||
`k12` string replace null comment "",
|
||||
`k13` largeint(40) replace null comment ""
|
||||
) engine=olap
|
||||
DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
|
||||
"""
|
||||
sql """
|
||||
CREATE TABLE IF NOT EXISTS `test` (
|
||||
`k0` boolean null comment "",
|
||||
`k1` tinyint(4) null comment "",
|
||||
`k2` smallint(6) null comment "",
|
||||
`k3` int(11) null comment "",
|
||||
`k4` bigint(20) null comment "",
|
||||
`k5` decimal(9, 3) null comment "",
|
||||
`k6` char(5) null comment "",
|
||||
`k10` date null comment "",
|
||||
`k11` datetime null comment "",
|
||||
`k7` varchar(20) null comment "",
|
||||
`k8` double max null comment "",
|
||||
`k9` float sum null comment "",
|
||||
`k12` string replace_if_not_null null comment "",
|
||||
`k13` largeint(40) replace null comment ""
|
||||
) engine=olap
|
||||
DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
|
||||
"""
|
||||
|
||||
GetDebugPoint().clearDebugPointsForAllBEs()
|
||||
streamLoad {
|
||||
table "baseall"
|
||||
db "regression_test_fault_injection_p0"
|
||||
set 'column_separator', ','
|
||||
file "baseall.txt"
|
||||
}
|
||||
|
||||
try {
|
||||
GetDebugPoint().enableDebugPointForAllBEs("LoadStreamStub::LoadStreamReplyHandler::on_closed.close_wait")
|
||||
def thread1 = new Thread({
|
||||
try {
|
||||
def res = sql "insert into test select * from baseall where k1 <= 3"
|
||||
logger.info(res.toString())
|
||||
} catch(Exception e) {
|
||||
logger.info(e.getMessage())
|
||||
assertTrue(e.getMessage().contains("Communications link failure"))
|
||||
} finally {
|
||||
GetDebugPoint().disableDebugPointForAllBEs("LoadStreamStub::LoadStreamReplyHandler::on_closed.close_wait")
|
||||
}
|
||||
})
|
||||
thread1.start()
|
||||
|
||||
sleep(1000)
|
||||
|
||||
def processList = sql "show processlist"
|
||||
logger.info(processList.toString())
|
||||
processList.each { item ->
|
||||
logger.info(item[1].toString())
|
||||
logger.info(item[11].toString())
|
||||
if (item[11].toString() == "insert into test select * from baseall where k1 <= 3".toString()){
|
||||
def res = sql "kill ${item[1]}"
|
||||
logger.info(res.toString())
|
||||
}
|
||||
}
|
||||
} catch(Exception e) {
|
||||
logger.info(e.getMessage())
|
||||
}
|
||||
|
||||
sql """ DROP TABLE IF EXISTS `baseall` """
|
||||
sql """ DROP TABLE IF EXISTS `test` """
|
||||
sql """ set enable_memtable_on_sink_node=false """
|
||||
}
|
||||
Reference in New Issue
Block a user