[FEAT MERGE] skip scan feature and performance optimization
This commit is contained in:
@ -39,10 +39,13 @@ int ObGITaskSet::get_task_at_pos(ObGranuleTaskInfo &info, const int64_t &pos) co
|
||||
int64_t cur_idx = gi_task_set_.at(pos).idx_;
|
||||
info.tablet_loc_ = const_cast<ObDASTabletLoc *>(gi_task_set_.at(pos).tablet_loc_);
|
||||
info.ranges_.reset();
|
||||
info.ss_ranges_.reset();
|
||||
for (int64_t i = pos; OB_SUCC(ret) && i < gi_task_set_.count(); i++) {
|
||||
if (cur_idx == gi_task_set_.at(i).idx_) {
|
||||
if (OB_FAIL(info.ranges_.push_back(gi_task_set_.at(i).range_))) {
|
||||
LOG_WARN("push back ranges failed", K(ret));
|
||||
} else if (OB_FAIL(info.ss_ranges_.push_back(gi_task_set_.at(i).ss_range_))) {
|
||||
LOG_WARN("push back skip scan ranges failed", K(ret));
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
@ -89,10 +92,13 @@ int ObGITaskSet::get_next_gi_task(ObGranuleTaskInfo &info)
|
||||
int64_t cur_idx = gi_task_set_.at(cur_pos_).idx_;
|
||||
info.tablet_loc_ = gi_task_set_.at(cur_pos_).tablet_loc_;
|
||||
info.ranges_.reset();
|
||||
info.ss_ranges_.reset();
|
||||
for (int64_t i = cur_pos_; OB_SUCC(ret) && i < gi_task_set_.count(); i++) {
|
||||
if (cur_idx == gi_task_set_.at(i).idx_) {
|
||||
if (OB_FAIL(info.ranges_.push_back(gi_task_set_.at(i).range_))) {
|
||||
LOG_WARN("push back ranges failed", K(ret));
|
||||
} else if (OB_FAIL(info.ss_ranges_.push_back(gi_task_set_.at(i).ss_range_))) {
|
||||
LOG_WARN("push back skip scan ranges failed", K(ret));
|
||||
}
|
||||
if (i == (gi_task_set_.count() - 1)) {
|
||||
cur_pos_ = gi_task_set_.count();
|
||||
@ -199,25 +205,30 @@ int ObGITaskSet::set_block_order(bool desc)
|
||||
|
||||
int ObGITaskSet::construct_taskset(ObIArray<ObDASTabletLoc*> &taskset_tablets,
|
||||
ObIArray<ObNewRange> &taskset_ranges,
|
||||
ObIArray<ObNewRange> &ss_ranges,
|
||||
ObIArray<int64_t> &taskset_idxs,
|
||||
ObGIRandomType random_type)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (taskset_tablets.count() != taskset_ranges.count() ||
|
||||
taskset_tablets.count() != taskset_idxs.count() ||
|
||||
taskset_tablets.empty()) {
|
||||
if (OB_UNLIKELY(taskset_tablets.count() != taskset_ranges.count() ||
|
||||
taskset_tablets.count() != taskset_idxs.count() ||
|
||||
taskset_tablets.empty() || ss_ranges.count() > 1)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("taskest count err", K(taskset_tablets.count()),
|
||||
K(taskset_ranges),
|
||||
K(taskset_idxs));
|
||||
K(taskset_idxs),
|
||||
K(ss_ranges.count()));
|
||||
} else if (!(GI_RANDOM_NONE <= random_type && random_type <= GI_RANDOM_RANGE)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("random type err", K(random_type));
|
||||
} else if (gi_task_set_.empty() && OB_FAIL(gi_task_set_.reserve(taskset_tablets.count()))) {
|
||||
LOG_WARN("failed to prepare allocate", K(ret));
|
||||
} else {
|
||||
ObNewRange whole_range;
|
||||
whole_range.set_whole_range();
|
||||
ObNewRange &ss_range = ss_ranges.empty() ? whole_range : ss_ranges.at(0);
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < taskset_tablets.count(); i++) {
|
||||
ObGITaskInfo task_info(taskset_tablets.at(i), taskset_ranges.at(i), taskset_idxs.at(i));
|
||||
ObGITaskInfo task_info(taskset_tablets.at(i), taskset_ranges.at(i), ss_range, taskset_idxs.at(i));
|
||||
if (random_type != ObGITaskSet::GI_RANDOM_NONE) {
|
||||
task_info.hash_value_ = common::murmurhash(&task_info.idx_, sizeof(task_info.idx_), 0);
|
||||
}
|
||||
@ -673,6 +684,7 @@ int ObGranuleSplitter::split_gi_task(ObGranulePumpArgs &args,
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSEArray<ObNewRange, 16> ranges;
|
||||
ObSEArray<ObNewRange, 16> ss_ranges;
|
||||
DASTabletLocSEArray taskset_tablets;
|
||||
ObSEArray<ObNewRange, 16> taskset_ranges;
|
||||
ObSEArray<int64_t, 16> taskset_idxs;
|
||||
@ -686,7 +698,8 @@ int ObGranuleSplitter::split_gi_task(ObGranulePumpArgs &args,
|
||||
} else if (tablets.count() <= 0 || OB_ISNULL(args.ctx_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("the task has an empty tablets", K(ret), K(tablets));
|
||||
} else if (OB_FAIL(get_query_range(*args.ctx_, tsc->get_query_range(), ranges, table_id, op_id, partition_granule, args.with_param_down()))) {
|
||||
} else if (OB_FAIL(get_query_range(*args.ctx_, tsc->get_query_range(), ranges, ss_ranges,
|
||||
table_id, op_id, partition_granule, args.with_param_down()))) {
|
||||
LOG_WARN("get query range failed", K(ret));
|
||||
} else if (ranges.count() <= 0) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -703,9 +716,14 @@ int ObGranuleSplitter::split_gi_task(ObGranulePumpArgs &args,
|
||||
taskset_idxs,
|
||||
range_independent))) {
|
||||
LOG_WARN("failed to get graunle task", K(ret), K(ranges), K(tablets));
|
||||
} else if (OB_FAIL(task_set.construct_taskset(taskset_tablets, taskset_ranges, taskset_idxs, random_type))) {
|
||||
} else if (OB_FAIL(task_set.construct_taskset(taskset_tablets,
|
||||
taskset_ranges,
|
||||
ss_ranges,
|
||||
taskset_idxs,
|
||||
random_type))) {
|
||||
LOG_WARN("construct taskset failed", K(ret), K(taskset_tablets),
|
||||
K(taskset_ranges),
|
||||
K(ss_ranges),
|
||||
K(taskset_idxs),
|
||||
K(random_type));
|
||||
}
|
||||
@ -715,6 +733,7 @@ int ObGranuleSplitter::split_gi_task(ObGranulePumpArgs &args,
|
||||
int ObGranuleSplitter::get_query_range(ObExecContext &ctx,
|
||||
const ObQueryRange &tsc_pre_query_range,
|
||||
ObIArray<ObNewRange> &ranges,
|
||||
ObIArray<ObNewRange> &ss_ranges,
|
||||
int64_t table_id,
|
||||
int64_t op_id,
|
||||
bool partition_granule,
|
||||
@ -722,6 +741,7 @@ int ObGranuleSplitter::get_query_range(ObExecContext &ctx,
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObQueryRangeArray scan_ranges;
|
||||
ObQueryRangeArray skip_scan_ranges;
|
||||
ObGetMethodArray get_method;
|
||||
ObPhysicalPlanCtx *plan_ctx = nullptr;
|
||||
bool has_extract_query_range = false;
|
||||
@ -757,6 +777,12 @@ int ObGranuleSplitter::get_query_range(ObExecContext &ctx,
|
||||
get_method,
|
||||
ObBasicSessionInfo::create_dtc_params(ctx.get_my_session())))) {
|
||||
LOG_WARN("failed to get scan ranges", K(ret));
|
||||
} else if (OB_FAIL(tsc_pre_query_range.get_ss_tablet_ranges(
|
||||
ctx.get_allocator(),
|
||||
ctx,
|
||||
skip_scan_ranges,
|
||||
ObBasicSessionInfo::create_dtc_params(ctx.get_my_session())))) {
|
||||
LOG_WARN("failed to final extract index skip query range", K(ret));
|
||||
} else {
|
||||
has_extract_query_range = true;
|
||||
}
|
||||
@ -783,12 +809,37 @@ int ObGranuleSplitter::get_query_range(ObExecContext &ctx,
|
||||
get_method,
|
||||
ObBasicSessionInfo::create_dtc_params(ctx.get_my_session())))) {
|
||||
LOG_WARN("failed to get scan ranges", K(ret));
|
||||
} else if (OB_FAIL(tsc_pre_query_range.get_ss_tablet_ranges(
|
||||
ctx.get_allocator(),
|
||||
ctx,
|
||||
skip_scan_ranges,
|
||||
ObBasicSessionInfo::create_dtc_params(ctx.get_my_session())))) {
|
||||
LOG_WARN("failed to final extract index skip query range", K(ret));
|
||||
} else {
|
||||
has_extract_query_range = true;
|
||||
}
|
||||
}
|
||||
|
||||
LOG_DEBUG("gi get the scan range", K(ret), K(partition_granule), K(has_extract_query_range), K(scan_ranges));
|
||||
LOG_DEBUG("gi get the scan range", K(ret), K(partition_granule), K(has_extract_query_range),
|
||||
K(scan_ranges), K(skip_scan_ranges));
|
||||
if (OB_SUCC(ret)) {
|
||||
// index skip scan, ranges from extract_pre_query_range/get_ss_tablet_ranges,
|
||||
// the prefix range and the postfix range are each a single range
|
||||
ObNewRange *ss_range = NULL;
|
||||
ObNewRange whole_range;
|
||||
whole_range.set_whole_range();
|
||||
if (!skip_scan_ranges.empty() &&
|
||||
(OB_ISNULL(skip_scan_ranges.at(0)) ||
|
||||
OB_UNLIKELY(1 != skip_scan_ranges.count() || 1 != scan_ranges.count()))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected index skip scan range", K(ret), K(scan_ranges), K(skip_scan_ranges));
|
||||
} else if (OB_FAIL(ss_ranges.push_back(skip_scan_ranges.empty()
|
||||
? whole_range : *skip_scan_ranges.at(0)))) {
|
||||
LOG_WARN("push back ranges failed", K(ret));
|
||||
} else {
|
||||
ss_ranges.at(ss_ranges.count() - 1).table_id_ = table_id;
|
||||
}
|
||||
}
|
||||
for (int64_t i = 0; i < scan_ranges.count() && OB_SUCC(ret); ++i) {
|
||||
if (OB_ISNULL(scan_ranges.at(i))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -1187,6 +1238,7 @@ int ObPartitionWiseGranuleSplitter::split_insert_gi_task(ObGranulePumpArgs &args
|
||||
// For INSERT, the range of each partition defaults to [min_rowkey, max_rowkey]
|
||||
ObNewRange each_partition_range;
|
||||
ObSEArray<ObNewRange, 4> ranges;
|
||||
ObSEArray<ObNewRange, 1> empty_ss_ranges;
|
||||
DASTabletLocSEArray taskset_tablets;
|
||||
ObSEArray<ObNewRange, 4> taskset_ranges;
|
||||
ObSEArray<int64_t, 4> taskset_idxs;
|
||||
@ -1213,7 +1265,8 @@ int ObPartitionWiseGranuleSplitter::split_insert_gi_task(ObGranulePumpArgs &args
|
||||
taskset_idxs,
|
||||
range_independent))) {
|
||||
LOG_WARN("failed to get insert graunle task", K(ret), K(each_partition_range), K(tablets));
|
||||
} else if (OB_FAIL(task_set.construct_taskset(taskset_tablets, taskset_ranges, taskset_idxs, random_type))) {
|
||||
} else if (OB_FAIL(task_set.construct_taskset(taskset_tablets, taskset_ranges,
|
||||
empty_ss_ranges, taskset_idxs, random_type))) {
|
||||
// INSERT task splitting is always partition-wise, and on each rescan the INSERT operator only needs the partition key of its task;
|
||||
// task parameters such as `ranges` and `idx` are not needed
|
||||
LOG_WARN("construct taskset failed", K(ret), K(taskset_tablets),
|
||||
|
||||
Reference in New Issue
Block a user