[fix](move-memtable) set idle timeout equal to load timeout (#29839)

This commit is contained in:
Kaijie Chen
2024-01-12 21:34:36 +08:00
committed by yiguolei
parent a974e96841
commit 620cfc3cd7
8 changed files with 26 additions and 15 deletions

View File

@ -776,8 +776,6 @@ DEFINE_Int64(open_load_stream_timeout_ms, "60000"); // 60s
// timeout for load stream close wait in ms
DEFINE_Int64(close_load_stream_timeout_ms, "600000"); // 10 min
// idle timeout for load stream in ms
DEFINE_mInt64(load_stream_idle_timeout_ms, "600000");
// brpc streaming max_buf_size in bytes
DEFINE_Int64(load_stream_max_buf_size, "20971520"); // 20MB
// brpc streaming messages_in_batch

View File

@ -829,8 +829,6 @@ DECLARE_Int64(open_load_stream_timeout_ms);
// timeout for load stream close wait in ms
DECLARE_Int64(close_load_stream_timeout_ms);
// idle timeout for load stream in ms
DECLARE_Int64(load_stream_idle_timeout_ms);
// brpc streaming max_buf_size in bytes
DECLARE_Int64(load_stream_max_buf_size);
// brpc streaming messages_in_batch

View File

@ -391,7 +391,9 @@ void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* con
}
stream_options.handler = load_stream.get();
stream_options.idle_timeout_ms = config::load_stream_idle_timeout_ms;
stream_options.idle_timeout_ms = request->idle_timeout_ms();
DBUG_EXECUTE_IF("PInternalServiceImpl.open_load_stream.set_idle_timeout",
{ stream_options.idle_timeout_ms = 1; });
StreamId streamid;
if (brpc::StreamAccept(&streamid, *cntl, &stream_options) != 0) {

View File

@ -151,7 +151,7 @@ Status LoadStreamStub::open(std::shared_ptr<LoadStreamStub> self,
const NodeInfo& node_info, int64_t txn_id,
const OlapTableSchemaParam& schema,
const std::vector<PTabletID>& tablets_for_schema, int total_streams,
bool enable_profile) {
int64_t idle_timeout_ms, bool enable_profile) {
std::unique_lock<bthread::Mutex> lock(_open_mutex);
if (_is_init.load()) {
return Status::OK();
@ -160,7 +160,7 @@ Status LoadStreamStub::open(std::shared_ptr<LoadStreamStub> self,
std::string host_port = get_host_port(node_info.host, node_info.brpc_port);
brpc::StreamOptions opt;
opt.max_buf_size = config::load_stream_max_buf_size;
opt.idle_timeout_ms = config::load_stream_idle_timeout_ms;
opt.idle_timeout_ms = idle_timeout_ms;
opt.messages_in_batch = config::load_stream_messages_in_batch;
opt.handler = new LoadStreamReplyHandler(_load_id, _dst_id, self);
brpc::Controller cntl;
@ -174,6 +174,7 @@ Status LoadStreamStub::open(std::shared_ptr<LoadStreamStub> self,
request.set_txn_id(txn_id);
request.set_enable_profile(enable_profile);
request.set_total_streams(total_streams);
request.set_idle_timeout_ms(idle_timeout_ms);
schema.to_protobuf(request.mutable_schema());
for (auto& tablet : tablets_for_schema) {
*request.add_tablets() = tablet;

View File

@ -125,7 +125,7 @@ public:
BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info,
int64_t txn_id, const OlapTableSchemaParam& schema,
const std::vector<PTabletID>& tablets_for_schema, int total_streams,
bool enable_profile);
int64_t idle_timeout_ms, bool enable_profile);
// for mocking this class in unit tests
#ifdef BE_TEST

View File

@ -275,18 +275,19 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, LoadStreams& st
if (node_info == nullptr) {
return Status::InternalError("Unknown node {} in tablet location", dst_id);
}
auto idle_timeout_ms = _state->execution_timeout() * 1000;
// get tablet schema from each backend only in the 1st stream
for (auto& stream : streams.streams() | std::ranges::views::take(1)) {
const std::vector<PTabletID>& tablets_for_schema = _indexes_from_node[node_info->id];
RETURN_IF_ERROR(stream->open(stream, _state->exec_env()->brpc_internal_client_cache(),
*node_info, _txn_id, *_schema, tablets_for_schema,
_total_streams, _state->enable_profile()));
_total_streams, idle_timeout_ms, _state->enable_profile()));
}
// for the remaining streams, open without getting the tablet schema
for (auto& stream : streams.streams() | std::ranges::views::drop(1)) {
RETURN_IF_ERROR(stream->open(stream, _state->exec_env()->brpc_internal_client_cache(),
*node_info, _txn_id, *_schema, {}, _total_streams,
_state->enable_profile()));
idle_timeout_ms, _state->enable_profile()));
}
return Status::OK();
}

View File

@ -754,6 +754,7 @@ message POpenLoadStreamRequest {
repeated PTabletID tablets = 5;
optional bool enable_profile = 6 [default = false];
optional int64 total_streams = 7;
optional int64 idle_timeout_ms = 8;
}
message PTabletSchemaWithIndex {

View File

@ -125,6 +125,20 @@ suite("load_stream_fault_injection", "nonConcurrent") {
}
}
// Runs the test insert with two debug points enabled on all BEs at the same time.
//   firstPoint / secondPoint: names of the debug points to enable.
//   expectedMsg: substring the exception message must contain when the insert throws.
// When the insert does not throw, nothing is asserted. The debug points are
// disabled again in the finally block whether or not the insert threw.
def load_with_injection2 = { firstPoint, secondPoint, expectedMsg ->
    def debugPoints = [firstPoint, secondPoint]
    try {
        // enable in the given order: first, then second
        debugPoints.each { GetDebugPoint().enableDebugPointForAllBEs(it) }
        sql "insert into test select * from baseall where k1 <= 3"
    } catch (Exception ex) {
        def failure = ex.getMessage()
        logger.info(failure)
        assertTrue(failure.contains(expectedMsg))
    } finally {
        // disable in the same order they were enabled
        debugPoints.each { GetDebugPoint().disableDebugPointForAllBEs(it) }
    }
}
// LoadStreamWriter create file failed
load_with_injection("LocalFileSystem.create_file_impl.open_file_failed", "")
// LoadStreamWriter append_data meets null file writer error
@ -161,14 +175,10 @@ suite("load_stream_fault_injection", "nonConcurrent") {
load_with_injection("LoadStream._dispatch.unknown_srcid", "")
// LoadStream meets StreamRPC idle timeout
get_be_param("load_stream_idle_timeout_ms")
set_be_param("load_stream_idle_timeout_ms", 500)
try {
load_with_injection("LoadStreamStub._send_with_retry.delay_before_send", "")
load_with_injection2("LoadStreamStub._send_with_retry.delay_before_send", "PInternalServiceImpl.open_load_stream.set_idle_timeout", "")
} catch(Exception e) {
logger.info(e.getMessage())
} finally {
reset_be_param("load_stream_idle_timeout_ms")
}
}