diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 59ff35a3a5..2f52bd4417 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -746,6 +746,13 @@ DEFINE_Bool(share_delta_writers, "true"); // timeout for open load stream rpc in ms DEFINE_Int64(open_load_stream_timeout_ms, "500"); +// idle timeout for load stream in ms +DEFINE_Int64(load_stream_idle_timeout_ms, "600000"); +// brpc streaming max_buf_size in bytes +DEFINE_Int64(load_stream_max_buf_size, "20971520"); // 20MB +// brpc streaming messages_in_batch +DEFINE_Int32(load_stream_messages_in_batch, "128"); + // max send batch parallelism for OlapTableSink // The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job, // if exceed, the value of send_batch_parallelism would be max_send_batch_parallelism_per_job diff --git a/be/src/common/config.h b/be/src/common/config.h index bc93816581..e13d6dcfd4 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -803,6 +803,13 @@ DECLARE_Bool(share_delta_writers); // timeout for open load stream rpc in ms DECLARE_Int64(open_load_stream_timeout_ms); +// idle timeout for load stream in ms +DECLARE_Int64(load_stream_idle_timeout_ms); +// brpc streaming max_buf_size in bytes +DECLARE_Int64(load_stream_max_buf_size); +// brpc streaming messages_in_batch +DECLARE_Int32(load_stream_messages_in_batch); + // max send batch parallelism for OlapTableSink // The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job, // if exceed, the value of send_batch_parallelism would be max_send_batch_parallelism_per_job diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 793098a3e9..76907713fd 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -134,9 +134,9 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, _handler.set_load_id(_load_id); std::string host_port = get_host_port(node_info.host, node_info.brpc_port); brpc::StreamOptions opt; - opt.max_buf_size = 20 << 20; // 20MB - opt.idle_timeout_ms = 30000; - opt.messages_in_batch = 128; + opt.max_buf_size = config::load_stream_max_buf_size; + opt.idle_timeout_ms = config::load_stream_idle_timeout_ms; + opt.messages_in_batch = config::load_stream_messages_in_batch; opt.handler = &_handler; brpc::Controller cntl; if (int ret = StreamCreate(&_stream_id, cntl, &opt)) {