[Enhancement](hive-writer) Adjust table sink exchange rebalancer params. (#33397)

Issue Number:  #31442

Change table sink exchange rebalancer params to node level and adjust these params to improve write performance by better balance.

rebalancer params:
```
DEFINE_mInt64(table_sink_partition_write_min_data_processed_rebalance_threshold,
              "26214400"); // 25MB
// Minimum partition data processed to rebalance writers in exchange when partition writing
DEFINE_mInt64(table_sink_partition_write_min_partition_data_processed_rebalance_threshold,
              "15728640"); // 15MB
```
This commit is contained in:
Qi Chen
2024-04-12 13:09:44 +08:00
committed by morningman
parent d31bca199f
commit e841d82ffb
5 changed files with 30 additions and 17 deletions

View File

@ -1184,14 +1184,15 @@ DEFINE_mString(ca_cert_file_paths,
"/etc/ssl/ca-bundle.pem");
/** Table sink configurations(currently contains only external table types) **/
// Minimum data processed to scale writers when non partition writing
// Minimum data processed to scale writers in exchange when non partition writing
DEFINE_mInt64(table_sink_non_partition_write_scaling_data_processed_threshold,
"125829120"); // 120MB
// Minimum data processed to start rebalancing in exchange when partition writing
DEFINE_mInt64(table_sink_partition_write_data_processed_threshold, "209715200"); // 200MB
"26214400"); // 25MB
// Minimum data processed to trigger skewed partition rebalancing in exchange when partition writing
DEFINE_mInt64(table_sink_partition_write_skewed_data_processed_rebalance_threshold,
"209715200"); // 200MB
DEFINE_mInt64(table_sink_partition_write_min_data_processed_rebalance_threshold,
"26214400"); // 25MB
// Minimum partition data processed to rebalance writers in exchange when partition writing
DEFINE_mInt64(table_sink_partition_write_min_partition_data_processed_rebalance_threshold,
"15728640"); // 15MB
// Maximum processed partition nums of per writer when partition writing
DEFINE_mInt32(table_sink_partition_write_max_partition_nums_per_writer, "128");

View File

@ -1258,12 +1258,12 @@ DECLARE_String(tmp_file_dir);
DECLARE_mString(ca_cert_file_paths);
/** Table sink configurations(currently contains only external table types) **/
// Minimum data processed to scale writers when non partition writing
// Minimum data processed to scale writers in exchange when non partition writing
DECLARE_mInt64(table_sink_non_partition_write_scaling_data_processed_threshold);
// Minimum data processed to start rebalancing in exchange when partition writing
DECLARE_mInt64(table_sink_partition_write_data_processed_threshold);
// Minimum data processed to trigger skewed partition rebalancing in exchange when partition writing
DECLARE_mInt64(table_sink_partition_write_skewed_data_processed_rebalance_threshold);
DECLARE_mInt64(table_sink_partition_write_min_data_processed_rebalance_threshold);
// Minimum partition data processed to rebalance writers in exchange when partition writing
DECLARE_mInt64(table_sink_partition_write_min_partition_data_processed_rebalance_threshold);
// Maximum processed partition nums of per writer when partition writing
DECLARE_mInt32(table_sink_partition_write_max_partition_nums_per_writer);

View File

@ -257,17 +257,23 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
_partitioner.reset(
new vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(_partition_count));
_partition_function.reset(new HashPartitionFunction(_partitioner.get()));
// const long MEGABYTE = 1024 * 1024;
// const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 10000 * MEGABYTE; // 1MB
// const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50000 * MEGABYTE; // 50MB
// const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 1; // 1MB
// const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 1; // 50MB
scale_writer_partitioning_exchanger.reset(new vectorized::ScaleWriterPartitioningExchanger<
HashPartitionFunction>(
channels.size(), *_partition_function, _partition_count, channels.size(), 1,
config::table_sink_partition_write_data_processed_threshold,
config::table_sink_partition_write_skewed_data_processed_rebalance_threshold));
config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold /
state->task_num() ==
0
? config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold
: config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold /
state->task_num(),
config::table_sink_partition_write_min_data_processed_rebalance_threshold /
state->task_num() ==
0
? config::table_sink_partition_write_min_data_processed_rebalance_threshold
: config::table_sink_partition_write_min_data_processed_rebalance_threshold /
state->task_num()));
RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
_profile->add_info_string("Partitioner",

View File

@ -591,6 +591,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
init_runtime_state(task_runtime_state);
auto cur_task_id = _total_tasks++;
task_runtime_state->set_task_id(cur_task_id);
task_runtime_state->set_task_num(pipeline->num_tasks());
auto task = std::make_unique<PipelineXTask>(
pipeline, cur_task_id, get_task_runtime_state(cur_task_id), this,
pipeline_id_to_profile[pip_idx].get(), get_local_exchange_state(pipeline),

View File

@ -619,6 +619,10 @@ public:
int task_id() const { return _task_id; }
void set_task_num(int task_num) { _task_num = task_num; }
int task_num() const { return _task_num; }
private:
Status create_error_log_file();
@ -729,6 +733,7 @@ private:
std::vector<TErrorTabletInfo> _error_tablet_infos;
int _max_operator_id = 0;
int _task_id = -1;
int _task_num = 0;
std::vector<THivePartitionUpdate> _hive_partition_updates;