diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 17a5587f07..dd5d68dfbf 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1184,14 +1184,15 @@ DEFINE_mString(ca_cert_file_paths, "/etc/ssl/ca-bundle.pem"); /** Table sink configurations(currently contains only external table types) **/ -// Minimum data processed to scale writers when non partition writing +// Minimum data processed to scale writers in exchange when non partition writing DEFINE_mInt64(table_sink_non_partition_write_scaling_data_processed_threshold, - "125829120"); // 120MB -// Minimum data processed to start rebalancing in exchange when partition writing -DEFINE_mInt64(table_sink_partition_write_data_processed_threshold, "209715200"); // 200MB + "26214400"); // 25MB // Minimum data processed to trigger skewed partition rebalancing in exchange when partition writing -DEFINE_mInt64(table_sink_partition_write_skewed_data_processed_rebalance_threshold, - "209715200"); // 200MB +DEFINE_mInt64(table_sink_partition_write_min_data_processed_rebalance_threshold, + "26214400"); // 25MB +// Minimum partition data processed to rebalance writers in exchange when partition writing +DEFINE_mInt64(table_sink_partition_write_min_partition_data_processed_rebalance_threshold, + "15728640"); // 15MB // Maximum processed partition nums of per writer when partition writing DEFINE_mInt32(table_sink_partition_write_max_partition_nums_per_writer, "128"); diff --git a/be/src/common/config.h b/be/src/common/config.h index ae39a6e5eb..d8c77bf949 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1258,12 +1258,12 @@ DECLARE_String(tmp_file_dir); DECLARE_mString(ca_cert_file_paths); /** Table sink configurations(currently contains only external table types) **/ -// Minimum data processed to scale writers when non partition writing +// Minimum data processed to scale writers in exchange when non partition writing 
DECLARE_mInt64(table_sink_non_partition_write_scaling_data_processed_threshold); -// Minimum data processed to start rebalancing in exchange when partition writing -DECLARE_mInt64(table_sink_partition_write_data_processed_threshold); // Minimum data processed to trigger skewed partition rebalancing in exchange when partition writing -DECLARE_mInt64(table_sink_partition_write_skewed_data_processed_rebalance_threshold); +DECLARE_mInt64(table_sink_partition_write_min_data_processed_rebalance_threshold); +// Minimum partition data processed to rebalance writers in exchange when partition writing +DECLARE_mInt64(table_sink_partition_write_min_partition_data_processed_rebalance_threshold); // Maximum processed partition nums of per writer when partition writing DECLARE_mInt32(table_sink_partition_write_max_partition_nums_per_writer); diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index bc55bc8f80..8323e20cfd 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -257,17 +257,23 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf _partitioner.reset( new vectorized::Crc32HashPartitioner(_partition_count)); _partition_function.reset(new HashPartitionFunction(_partitioner.get())); - // const long MEGABYTE = 1024 * 1024; - // const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 10000 * MEGABYTE; // 1MB - // const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50000 * MEGABYTE; // 50MB - // const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 1; // 1MB - // const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 1; // 50MB + // Scale each rebalance threshold down by the number of tasks. task_num() is 0 until + // set_task_num() is called, so clamp the divisor to 1 instead of dividing by zero. + const int threshold_divisor = state->task_num() > 0 ? state->task_num() : 1; + const auto per_task_threshold = [threshold_divisor](auto threshold) { + const auto share = threshold / threshold_divisor; + // Keep the full threshold when the per-task share rounds down to 0. + return share == 0 ? threshold : share; + }; scale_writer_partitioning_exchanger.reset(new vectorized::ScaleWriterPartitioningExchanger< HashPartitionFunction>( channels.size(), *_partition_function, _partition_count, channels.size(), 1, - config::table_sink_partition_write_data_processed_threshold, - 
config::table_sink_partition_write_skewed_data_processed_rebalance_threshold)); + per_task_threshold( + config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold), + per_task_threshold( + config::table_sink_partition_write_min_data_processed_rebalance_threshold))); + RETURN_IF_ERROR(_partitioner->init(p._texprs)); RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); _profile->add_info_string("Partitioner", diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 61c0bbfbf9..5d49aaf408 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -591,6 +591,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( init_runtime_state(task_runtime_state); auto cur_task_id = _total_tasks++; task_runtime_state->set_task_id(cur_task_id); + task_runtime_state->set_task_num(pipeline->num_tasks()); auto task = std::make_unique( pipeline, cur_task_id, get_task_runtime_state(cur_task_id), this, pipeline_id_to_profile[pip_idx].get(), get_local_exchange_state(pipeline), diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 07655c71b6..644db3e32e 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -619,6 +619,10 @@ public: int task_id() const { return _task_id; } + void set_task_num(int task_num) { _task_num = task_num; } + + int task_num() const { return _task_num; } + private: Status create_error_log_file(); @@ -729,6 +733,7 @@ private: 
std::vector _error_tablet_infos; int _max_operator_id = 0; int _task_id = -1; + int _task_num = 0; std::vector _hive_partition_updates;