decrease min value of px_task_size

This commit is contained in:
sdc 2024-11-05 03:47:51 +00:00 committed by ob-robot
parent 4b7197c9fb
commit 8c7deb9fcf
6 changed files with 22 additions and 19 deletions

View File

@ -1142,7 +1142,7 @@ DEF_CAP(dtl_buffer_size, OB_CLUSTER_PARAMETER, "64K", "[4K,2M]", "to be removed"
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
// TODO bin.lb: to be remove // TODO bin.lb: to be remove
DEF_CAP(px_task_size, OB_CLUSTER_PARAMETER, "2M", "[2M,)", "to be removed", DEF_CAP(px_task_size, OB_CLUSTER_PARAMETER, "2M", "[1K,)", "to be removed",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_INT(_max_elr_dependent_trx_count, OB_CLUSTER_PARAMETER, "0", "[0,)", "max elr dependent transaction count", DEF_INT(_max_elr_dependent_trx_count, OB_CLUSTER_PARAMETER, "0", "[0,)", "max elr dependent transaction count",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));

View File

@ -45,7 +45,7 @@ void ObParallelBlockRangeTaskParams::reset()
expected_task_load_ = sql::OB_EXPECTED_TASK_LOAD; expected_task_load_ = sql::OB_EXPECTED_TASK_LOAD;
min_task_count_per_thread_ = sql::OB_MIN_PARALLEL_TASK_COUNT; min_task_count_per_thread_ = sql::OB_MIN_PARALLEL_TASK_COUNT;
max_task_count_per_thread_ = sql::OB_MAX_PARALLEL_TASK_COUNT; max_task_count_per_thread_ = sql::OB_MAX_PARALLEL_TASK_COUNT;
min_task_access_size_ = GCONF.px_task_size >> 20; min_task_access_size_ = GCONF.px_task_size >> 10;
} }
int ObParallelBlockRangeTaskParams::valid() const int ObParallelBlockRangeTaskParams::valid() const
@ -325,8 +325,8 @@ int ObGranuleUtil::split_block_granule(ObExecContext &exec_ctx,
partition_size))) { partition_size))) {
LOG_WARN("failed to get multi ranges cost", K(ret), K(tablet)); LOG_WARN("failed to get multi ranges cost", K(ret), K(tablet));
} else { } else {
// B to MB // B to KB
partition_size = partition_size / 1024 / 1024; partition_size = partition_size / 1024;
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
@ -340,6 +340,7 @@ int ObGranuleUtil::split_block_granule(ObExecContext &exec_ctx,
} }
} }
} }
LOG_TRACE("get multi ranges cost", K(empty_partition_cnt), K(size_each_partitions));
} }
// 3. calc the total number of tasks for all partitions // 3. calc the total number of tasks for all partitions
@ -347,7 +348,7 @@ int ObGranuleUtil::split_block_granule(ObExecContext &exec_ctx,
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
ObParallelBlockRangeTaskParams params; ObParallelBlockRangeTaskParams params;
params.parallelism_ = parallelism; params.parallelism_ = parallelism;
params.expected_task_load_ = tablet_size/1024/1024; params.expected_task_load_ = tablet_size/1024;
if (OB_FAIL(compute_total_task_count(params, total_size, esti_task_cnt_by_data_size))) { if (OB_FAIL(compute_total_task_count(params, total_size, esti_task_cnt_by_data_size))) {
LOG_WARN("compute task count failed", K(ret)); LOG_WARN("compute task count failed", K(ret));
} else { } else {
@ -448,16 +449,18 @@ int ObGranuleUtil::compute_total_task_count(const ObParallelBlockRangeTaskParams
tmp_total_task_count = min(params.min_task_count_per_thread_ * params.parallelism_, tmp_total_task_count = min(params.min_task_count_per_thread_ * params.parallelism_,
total_access_size/min_task_access_size); total_access_size/min_task_access_size);
tmp_total_task_count = max(tmp_total_task_count, total_access_size / expected_task_load); tmp_total_task_count = max(tmp_total_task_count, total_access_size / expected_task_load);
LOG_TRACE("the data is less than lower bound size", K(ret), K(tmp_total_task_count)); LOG_TRACE("the data is less than lower bound size", K(ret), K(tmp_total_task_count),
K(total_size), K(params));
} else if (total_access_size > upper_bound_size) { } else if (total_access_size > upper_bound_size) {
// the data size is greater than upper bound size // the data size is greater than upper bound size
tmp_total_task_count = params.max_task_count_per_thread_ * params.parallelism_; tmp_total_task_count = params.max_task_count_per_thread_ * params.parallelism_;
LOG_TRACE("the data size is greater upper bound size", K(ret), K(tmp_total_task_count)); LOG_TRACE("the data size is greater upper bound size", K(ret), K(tmp_total_task_count),
K(total_size), K(params));
} else { } else {
// the data size is between lower bound size and upper bound size // the data size is between lower bound size and upper bound size
tmp_total_task_count = total_access_size / expected_task_load; tmp_total_task_count = total_access_size / expected_task_load;
LOG_TRACE("the data size is between lower bound size and upper bound size", LOG_TRACE("the data size is between lower bound size and upper bound size",
K(ret), K(tmp_total_task_count)); K(ret), K(tmp_total_task_count), K(total_size), K(params));
} }
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {

View File

@ -106,7 +106,7 @@ struct ObParallelBlockRangeTaskParams
expected_task_load_(sql::OB_EXPECTED_TASK_LOAD), expected_task_load_(sql::OB_EXPECTED_TASK_LOAD),
min_task_count_per_thread_(sql::OB_MIN_PARALLEL_TASK_COUNT), min_task_count_per_thread_(sql::OB_MIN_PARALLEL_TASK_COUNT),
max_task_count_per_thread_(sql::OB_MAX_PARALLEL_TASK_COUNT), max_task_count_per_thread_(sql::OB_MAX_PARALLEL_TASK_COUNT),
min_task_access_size_(GCONF.px_task_size >> 20), min_task_access_size_(GCONF.px_task_size >> 10),
marcos_count_(0) marcos_count_(0)
{ } { }
virtual ~ObParallelBlockRangeTaskParams() virtual ~ObParallelBlockRangeTaskParams()
@ -117,8 +117,8 @@ struct ObParallelBlockRangeTaskParams
/* 并行度 */ /* 并行度 */
int64_t parallelism_; int64_t parallelism_;
/** /**
* MB * KB
* 100100M的数据 * 102400100M的数据
* 使使tablet size使 * 使使tablet size使
*/ */
int64_t expected_task_load_; int64_t expected_task_load_;
@ -133,8 +133,8 @@ struct ObParallelBlockRangeTaskParams
*/ */
int64_t max_task_count_per_thread_; int64_t max_task_count_per_thread_;
/** /**
* MB * KB
* 2M * 2M
* *
*/ */
int64_t min_task_access_size_; int64_t min_task_access_size_;

View File

@ -39,7 +39,7 @@ const int64_t OB_MIN_PARALLEL_TASK_COUNT = 13; //期望每一个并行度最低
const int64_t OB_MAX_PARALLEL_TASK_COUNT = 100; //期望每一个并行度最大持有task数量 const int64_t OB_MAX_PARALLEL_TASK_COUNT = 100; //期望每一个并行度最大持有task数量
const int64_t OB_MIN_MARCO_COUNT_IN_TASK = 1; //每个task最少负责的宏块个数 const int64_t OB_MIN_MARCO_COUNT_IN_TASK = 1; //每个task最少负责的宏块个数
const int64_t OB_INVAILD_PARALLEL_TASK_COUNT = -1; const int64_t OB_INVAILD_PARALLEL_TASK_COUNT = -1;
const int64_t OB_EXPECTED_TASK_LOAD = 100; //MB, one task will get 100MB data from disk const int64_t OB_EXPECTED_TASK_LOAD = 102400; //KB, one task will get 100MB data from disk
const int64_t OB_GET_MACROS_COUNT_BY_QUERY_RANGE = 1; const int64_t OB_GET_MACROS_COUNT_BY_QUERY_RANGE = 1;
const int64_t OB_GET_BLOCK_RANGE = 2; const int64_t OB_GET_BLOCK_RANGE = 2;
const int64_t OB_BROADCAST_THRESHOLD = 100; const int64_t OB_BROADCAST_THRESHOLD = 100;

View File

@ -10107,9 +10107,9 @@ int ObOptimizerUtil::compute_nlj_spf_storage_compute_parallel_skew(ObOptimizerCo
} else { } else {
ObParallelBlockRangeTaskParams params; ObParallelBlockRangeTaskParams params;
params.parallelism_ = compute_parallel; params.parallelism_ = compute_parallel;
params.expected_task_load_ = table_schema->get_tablet_size() / 1024 / 1024 <= 0 ? sql::OB_EXPECTED_TASK_LOAD : table_schema->get_tablet_size() / 1024 / 1024; params.expected_task_load_ = table_schema->get_tablet_size() / 1024 <= 0 ? sql::OB_EXPECTED_TASK_LOAD : table_schema->get_tablet_size() / 1024;
//convert from B -> MB //convert from B -> KB
int64_t esti_table_size = (esti_table_meta_info->micro_block_count_ * esti_table_meta_info->micro_block_size_) / 1024 / 1024; int64_t esti_table_size = (esti_table_meta_info->micro_block_count_ * esti_table_meta_info->micro_block_size_) / 1024;
double query_range_filter_ratio = esti_table_meta_info->row_count_ / esti_table_meta_info->table_row_count_; double query_range_filter_ratio = esti_table_meta_info->row_count_ / esti_table_meta_info->table_row_count_;
esti_table_size *= query_range_filter_ratio; esti_table_size *= query_range_filter_ratio;
esti_table_size = MAX(0, esti_table_size); esti_table_size = MAX(0, esti_table_size);

View File

@ -228,7 +228,7 @@ int ObComplementDataParam::split_task_ranges(
ObArrayArray<ObStoreRange> multi_range_split_array; ObArrayArray<ObStoreRange> multi_range_split_array;
ObParallelBlockRangeTaskParams params; ObParallelBlockRangeTaskParams params;
params.parallelism_ = hint_parallelism; params.parallelism_ = hint_parallelism;
params.expected_task_load_ = tablet_size / 1024 / 1024 <= 0 ? sql::OB_EXPECTED_TASK_LOAD : tablet_size / 1024 / 1024; params.expected_task_load_ = tablet_size / 1024 <= 0 ? sql::OB_EXPECTED_TASK_LOAD : tablet_size / 1024;
if (OB_FAIL(ranges.push_back(range))) { if (OB_FAIL(ranges.push_back(range))) {
LOG_WARN("push back range failed", K(ret)); LOG_WARN("push back range failed", K(ret));
} else if (OB_FAIL(tablet_service->get_multi_ranges_cost(tablet_id, } else if (OB_FAIL(tablet_service->get_multi_ranges_cost(tablet_id,
@ -239,7 +239,7 @@ int ObComplementDataParam::split_task_ranges(
if (OB_REPLICA_NOT_READABLE == ret) { if (OB_REPLICA_NOT_READABLE == ret) {
ret = OB_EAGAIN; ret = OB_EAGAIN;
} }
} else if (OB_FALSE_IT(total_size = total_size / 1024 / 1024 /* Byte -> MB */)) { } else if (OB_FALSE_IT(total_size = total_size / 1024 /* Byte -> KB */)) {
} else if (OB_FAIL(ObGranuleUtil::compute_total_task_count(params, } else if (OB_FAIL(ObGranuleUtil::compute_total_task_count(params,
total_size, total_size,
expected_task_count))) { expected_task_count))) {