diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 2506261a7e..b678d85559 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -838,6 +838,13 @@ DEFINE_Int32(routine_load_consumer_pool_size, "10"); // if the size of batch is more than this threshold, we will request plans for all related tables. DEFINE_Int32(multi_table_batch_plan_threshold, "200"); +// Used in single-stream-multi-table load. When receiving a batch of messages from Kafka, +// if the number of tables waiting for a plan reaches this threshold, we will request plans for all related tables. +// This parameter is intended to avoid requesting and executing too many plans at once. +// Processing multiple tables in small batches during the load process can reduce the pressure on a single RPC +// and improve the real-time processing of data. +DEFINE_Int32(multi_table_max_wait_tables, "5"); + // When the timeout of a load task is less than this threshold, // Doris treats it as a high priority task. // high priority tasks use a separate thread pool for flush and do not block rpc by memory cleanup logic. diff --git a/be/src/common/config.h b/be/src/common/config.h index 0ba2fa9111..ea44d718bc 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -893,6 +893,13 @@ DECLARE_Int32(routine_load_consumer_pool_size); // if the size of batch is more than this threshold, we will request plans for all related tables. DECLARE_Int32(multi_table_batch_plan_threshold); +// Used in single-stream-multi-table load. When receiving a batch of messages from Kafka, +// if the number of tables waiting for a plan reaches this threshold, we will request plans for all related tables. +// This parameter is intended to avoid requesting and executing too many plans at once. +// Processing multiple tables in small batches during the load process can reduce the pressure on a single RPC +// and improve the real-time processing of data. 
+DECLARE_Int32(multi_table_max_wait_tables); + // When the timeout of a load task is less than this threshold, // Doris treats it as a high priority task. // high priority tasks use a separate thread pool for flush and do not block rpc by memory cleanup logic. diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp index da46645fd4..36976a2397 100644 --- a/be/src/io/fs/multi_table_pipe.cpp +++ b/be/src/io/fs/multi_table_pipe.cpp @@ -118,10 +118,12 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size "append failed in unplanned kafka pipe"); ++_unplanned_row_cnt; - size_t threshold = config::multi_table_batch_plan_threshold; - if (_unplanned_row_cnt >= threshold) { - LOG(INFO) << fmt::format("unplanned row cnt={} reach threshold={}, plan them", - _unplanned_row_cnt, threshold); + if (_unplanned_row_cnt >= _row_threshold || + _unplanned_pipes.size() >= _wait_tables_threshold) { + LOG(INFO) << fmt::format( + "unplanned row cnt={} reach row_threshold={} or wait_plan_table_threshold={}, " + "plan them", + _unplanned_row_cnt, _row_threshold, _wait_tables_threshold); Status st = request_and_exec_plans(); _unplanned_row_cnt = 0; if (!st.ok()) { diff --git a/be/src/io/fs/multi_table_pipe.h b/be/src/io/fs/multi_table_pipe.h index 3ded0fc608..694794638a 100644 --- a/be/src/io/fs/multi_table_pipe.h +++ b/be/src/io/fs/multi_table_pipe.h @@ -92,6 +92,9 @@ private: #endif std::mutex _pipe_map_lock; std::unordered_map> _pipe_map; + + uint32_t _row_threshold = config::multi_table_batch_plan_threshold; + uint32_t _wait_tables_threshold = config::multi_table_max_wait_tables; }; } // namespace io } // end namespace doris diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md index 62329d214c..3f00e62d38 100644 --- a/docs/en/docs/admin-manual/config/be-config.md +++ b/docs/en/docs/admin-manual/config/be-config.md @@ -779,6 +779,12 @@ BaseCompaction:546859: * Description: For 
single-stream-multi-table load. When receive a batch of messages from kafka, if the size of batch is more than this threshold, we will request plans for all related tables. * Default value: 200 +#### `multi_table_max_wait_tables` + +* Type: int32 +* Description: Used in single-stream-multi-table load. When receiving a batch of messages from Kafka, if the number of tables waiting for a plan reaches this threshold, we will request plans for all related tables. This parameter is intended to avoid requesting and executing too many plans at once. Processing multiple tables in small batches during the load process can reduce the pressure on a single RPC and improve the real-time processing of data. +* Default value: 5 + #### `single_replica_load_download_num_workers` * Type: int32 diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md index 26e47822b7..dc8418d548 100644 --- a/docs/zh-CN/docs/admin-manual/config/be-config.md +++ b/docs/zh-CN/docs/admin-manual/config/be-config.md @@ -805,6 +805,13 @@ BaseCompaction:546859: * 描述:一流多表使用该配置,表示攒多少条数据再进行规划。过小的值会导致规划频繁,多大的值会增加内存压力和导入延迟。 * 默认值:200 +#### `multi_table_max_wait_tables` + +* 类型:int32 +* 描述:一流多表使用该配置,如果等待规划的表的数量达到此阈值,将请求并执行所有相关表的计划。该参数旨在避免一次同时请求和执行过多的计划。 +将导入过程的多表进行小批处理,可以减少单次 RPC 的压力,同时可以提高导入数据处理的实时性。 +* 默认值:5 + #### `single_replica_load_download_num_workers` * 类型: int32 * 描述: 单副本数据导入功能中,Slave副本通过HTTP从Master副本下载数据文件的工作线程数。导入并发增大时,可以适当调大该参数来保证Slave副本及时同步Master副本数据。必要时也应相应地调大`webserver_num_workers`来提高IO效率。