From 620cfc3cd7c199f771cc7f70f0b4bfc1b353cb56 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Fri, 12 Jan 2024 21:34:36 +0800 Subject: [PATCH] [fix](move-memtable) set idle timeout equal to load timeout (#29839) --- be/src/common/config.cpp | 2 -- be/src/common/config.h | 2 -- be/src/service/internal_service.cpp | 4 +++- be/src/vec/sink/load_stream_stub.cpp | 5 +++-- be/src/vec/sink/load_stream_stub.h | 2 +- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 5 +++-- gensrc/proto/internal_service.proto | 1 + .../test_load_stream_fault_injection.groovy | 20 ++++++++++++++----- 8 files changed, 26 insertions(+), 15 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 6d1e0f7187..8f104fc430 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -776,8 +776,6 @@ DEFINE_Int64(open_load_stream_timeout_ms, "60000"); // 60s // timeout for load stream close wait in ms DEFINE_Int64(close_load_stream_timeout_ms, "600000"); // 10 min -// idle timeout for load stream in ms -DEFINE_mInt64(load_stream_idle_timeout_ms, "600000"); // brpc streaming max_buf_size in bytes DEFINE_Int64(load_stream_max_buf_size, "20971520"); // 20MB // brpc streaming messages_in_batch diff --git a/be/src/common/config.h b/be/src/common/config.h index b9854db921..d0ed029f74 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -829,8 +829,6 @@ DECLARE_Int64(open_load_stream_timeout_ms); // timeout for load stream close wait in ms DECLARE_Int64(close_load_stream_timeout_ms); -// idle timeout for load stream in ms -DECLARE_Int64(load_stream_idle_timeout_ms); // brpc streaming max_buf_size in bytes DECLARE_Int64(load_stream_max_buf_size); // brpc streaming messages_in_batch diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 18a8325e4c..e7255f017b 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -391,7 +391,9 @@ void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* con } stream_options.handler = load_stream.get(); - stream_options.idle_timeout_ms = config::load_stream_idle_timeout_ms; + stream_options.idle_timeout_ms = request->idle_timeout_ms(); + DBUG_EXECUTE_IF("PInternalServiceImpl.open_load_stream.set_idle_timeout", + { stream_options.idle_timeout_ms = 1; }); StreamId streamid; if (brpc::StreamAccept(&streamid, *cntl, &stream_options) != 0) { diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 59fc29c60f..40ce75d24e 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -151,7 +151,7 @@ Status LoadStreamStub::open(std::shared_ptr self, const NodeInfo& node_info, int64_t txn_id, const OlapTableSchemaParam& schema, const std::vector& tablets_for_schema, int total_streams, - bool enable_profile) { + int64_t idle_timeout_ms, bool enable_profile) { std::unique_lock lock(_open_mutex); if (_is_init.load()) { return Status::OK(); @@ -160,7 +160,7 @@ Status LoadStreamStub::open(std::shared_ptr self, std::string host_port = get_host_port(node_info.host, node_info.brpc_port); brpc::StreamOptions opt; opt.max_buf_size = config::load_stream_max_buf_size; - opt.idle_timeout_ms = config::load_stream_idle_timeout_ms; + opt.idle_timeout_ms = idle_timeout_ms; opt.messages_in_batch = config::load_stream_messages_in_batch; opt.handler = new LoadStreamReplyHandler(_load_id, _dst_id, self); brpc::Controller cntl; @@ -174,6 +174,7 @@ Status LoadStreamStub::open(std::shared_ptr self, request.set_txn_id(txn_id); request.set_enable_profile(enable_profile); request.set_total_streams(total_streams); + request.set_idle_timeout_ms(idle_timeout_ms); schema.to_protobuf(request.mutable_schema()); for (auto& tablet : tablets_for_schema) { *request.add_tablets() = tablet; diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 81ec99fa45..6aae778dc9 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -125,7 +125,7 @@ public: BrpcClientCache* client_cache, const NodeInfo& node_info, int64_t txn_id, const OlapTableSchemaParam& schema, const std::vector& tablets_for_schema, int total_streams, - bool enable_profile); + int64_t idle_timeout_ms, bool enable_profile); // for mock this class in UT #ifdef BE_TEST diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 1263608887..02b4054925 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -275,18 +275,19 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, LoadStreams& st if (node_info == nullptr) { return Status::InternalError("Unknown node {} in tablet location", dst_id); } + auto idle_timeout_ms = _state->execution_timeout() * 1000; // get tablet schema from each backend only in the 1st stream for (auto& stream : streams.streams() | std::ranges::views::take(1)) { const std::vector& tablets_for_schema = _indexes_from_node[node_info->id]; RETURN_IF_ERROR(stream->open(stream, _state->exec_env()->brpc_internal_client_cache(), *node_info, _txn_id, *_schema, tablets_for_schema, - _total_streams, _state->enable_profile())); + _total_streams, idle_timeout_ms, _state->enable_profile())); } // for the rest streams, open without getting tablet schema for (auto& stream : streams.streams() | std::ranges::views::drop(1)) { RETURN_IF_ERROR(stream->open(stream, _state->exec_env()->brpc_internal_client_cache(), *node_info, _txn_id, *_schema, {}, _total_streams, - _state->enable_profile())); + idle_timeout_ms, _state->enable_profile())); } return Status::OK(); } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index c91a4865ca..f197cd162d 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -754,6 +754,7 @@ message POpenLoadStreamRequest { repeated PTabletID tablets = 5; optional bool enable_profile = 6 [default = false]; optional int64 total_streams = 7; + optional int64 idle_timeout_ms = 8; } message PTabletSchemaWithIndex { diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy index f5dcbb1b7e..d5cd8097f9 100644 --- a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy @@ -125,6 +125,20 @@ suite("load_stream_fault_injection", "nonConcurrent") { } } + def load_with_injection2 = { injection1, injection2, error_msg-> + try { + GetDebugPoint().enableDebugPointForAllBEs(injection1) + GetDebugPoint().enableDebugPointForAllBEs(injection2) + sql "insert into test select * from baseall where k1 <= 3" + } catch(Exception e) { + logger.info(e.getMessage()) + assertTrue(e.getMessage().contains(error_msg)) + } finally { + GetDebugPoint().disableDebugPointForAllBEs(injection1) + GetDebugPoint().disableDebugPointForAllBEs(injection2) + } + } + // LoadStreamWriter create file failed load_with_injection("LocalFileSystem.create_file_impl.open_file_failed", "") // LoadStreamWriter append_data meet null file writer error @@ -161,14 +175,10 @@ suite("load_stream_fault_injection", "nonConcurrent") { load_with_injection("LoadStream._dispatch.unknown_srcid", "") // LoadStream meets StreamRPC idle timeout - get_be_param("load_stream_idle_timeout_ms") - set_be_param("load_stream_idle_timeout_ms", 500) try { - load_with_injection("LoadStreamStub._send_with_retry.delay_before_send", "") + load_with_injection2("LoadStreamStub._send_with_retry.delay_before_send", "PInternalServiceImpl.open_load_stream.set_idle_timeout", "") } catch(Exception e) { logger.info(e.getMessage()) - } finally { - reset_be_param("load_stream_idle_timeout_ms") } }