[FEAT MERGE] INDEX MERGE PHASE I
This commit is contained in:
@ -942,7 +942,9 @@ int ObTableScanOp::prepare_all_das_tasks()
|
||||
if (need_perform_real_batch_rescan()) {
|
||||
grp_guard.switch_group_rescan_param(i);
|
||||
}
|
||||
if (OB_FAIL(prepare_single_scan_range(i))) {
|
||||
if (MY_CTDEF.use_index_merge_ && OB_FAIL(prepare_index_merge_scan_range(i))) {
|
||||
LOG_WARN("failed to prepare index merge scan range", K(ret));
|
||||
} else if (!MY_CTDEF.use_index_merge_ && OB_FAIL(prepare_single_scan_range(i))) {
|
||||
LOG_WARN("prepare single scan range failed", K(ret));
|
||||
} else if (OB_FAIL(prepare_das_task())) {
|
||||
LOG_WARN("prepare das task failed", K(ret));
|
||||
@ -1172,7 +1174,11 @@ int ObTableScanOp::prepare_scan_range()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!need_perform_real_batch_rescan()) {
|
||||
ret = prepare_single_scan_range();
|
||||
if (MY_CTDEF.use_index_merge_ && OB_FAIL(prepare_index_merge_scan_range())) {
|
||||
LOG_WARN("failed to prepare index merge range", K(ret));
|
||||
} else if (!MY_CTDEF.use_index_merge_ && OB_FAIL(prepare_single_scan_range())) {
|
||||
LOG_WARN("failed to prepare single scan range", K(ret));
|
||||
}
|
||||
} else {
|
||||
ret = prepare_batch_scan_range();
|
||||
}
|
||||
@ -1371,6 +1377,123 @@ int ObTableScanOp::prepare_single_scan_range(int64_t group_idx)
|
||||
return ret;
|
||||
}
|
||||
|
||||
// for index merge, disable equal range optimization
|
||||
int ObTableScanOp::prepare_index_merge_scan_range(int64_t group_idx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_);
|
||||
ObIAllocator &range_allocator = (table_rescan_allocator_ != nullptr ?
|
||||
*table_rescan_allocator_ : ctx_.get_allocator());
|
||||
ObDASBaseRtDef *attach_rtdef = tsc_rtdef_.attach_rtinfo_->attach_rtdef_;
|
||||
if (OB_ISNULL(attach_rtdef)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KPC(attach_rtdef), K(ret));
|
||||
} else {
|
||||
ObDASBaseRtDef *index_merge_rtdef = attach_rtdef->op_type_ == DAS_OP_TABLE_LOOKUP ?
|
||||
attach_rtdef->children_[0] : attach_rtdef;
|
||||
if (OB_FAIL(prepare_range_for_each_index(group_idx, range_allocator, index_merge_rtdef))) {
|
||||
LOG_WARN("failed to prepare range for each index", KPC(index_merge_rtdef), K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableScanOp::prepare_range_for_each_index(int64_t group_idx,
|
||||
ObIAllocator &allocator,
|
||||
ObDASBaseRtDef *rtdef)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(rtdef)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KPC(rtdef), K(ret));
|
||||
} else if (rtdef->op_type_ == DAS_OP_TABLE_SCAN) {
|
||||
ObQueryRangeArray key_ranges;
|
||||
ObQueryRangeArray ss_key_ranges;
|
||||
ObDASScanRtDef *scan_rtdef = static_cast<ObDASScanRtDef*>(rtdef);
|
||||
const ObDASScanCtDef *scan_ctdef = static_cast<const ObDASScanCtDef*>(rtdef->ctdef_);
|
||||
const ObQueryRange &pre_query_range = scan_ctdef->pre_query_range_;
|
||||
scan_rtdef->key_ranges_.reuse();
|
||||
scan_rtdef->ss_key_ranges_.reuse();
|
||||
scan_rtdef->mbr_filters_.reuse();
|
||||
if (OB_UNLIKELY(!pre_query_range.has_range())) {
|
||||
// virtual table, do nothing
|
||||
} else if (pre_query_range.is_contain_geo_filters() &&
|
||||
OB_FAIL(ObSQLUtils::extract_geo_query_range(
|
||||
pre_query_range,
|
||||
allocator,
|
||||
ctx_,
|
||||
key_ranges,
|
||||
scan_rtdef->mbr_filters_,
|
||||
ObBasicSessionInfo::create_dtc_params(ctx_.get_my_session())))) {
|
||||
LOG_WARN("failed to extract pre query ranges", K(ret));
|
||||
} else if (!pre_query_range.is_contain_geo_filters() &&
|
||||
OB_FAIL(ObSQLUtils::extract_pre_query_range(
|
||||
pre_query_range,
|
||||
allocator,
|
||||
ctx_,
|
||||
key_ranges,
|
||||
ObBasicSessionInfo::create_dtc_params(ctx_.get_my_session())))) {
|
||||
LOG_WARN("failed to extract pre query ranges", K(ret));
|
||||
} else if (OB_FAIL(pre_query_range.get_ss_tablet_ranges(allocator,
|
||||
ctx_,
|
||||
ss_key_ranges,
|
||||
ObBasicSessionInfo::create_dtc_params(ctx_.get_my_session())))) {
|
||||
LOG_WARN("failed to final extract index skip query range", K(ret));
|
||||
} else if (!ss_key_ranges.empty()) {
|
||||
// index skip scan, ranges from extract_pre_query_range/get_ss_tablet_ranges,
|
||||
// prefix range and postfix range is single range
|
||||
if (1 != ss_key_ranges.count() || 1 != key_ranges.count()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected index skip scan range", K(ret), K(key_ranges), K(ss_key_ranges));
|
||||
} else {
|
||||
key_ranges.at(0)->table_id_ = scan_ctdef->ref_table_id_;
|
||||
key_ranges.at(0)->group_idx_ = group_idx;
|
||||
ss_key_ranges.at(0)->table_id_ = scan_ctdef->ref_table_id_;
|
||||
ss_key_ranges.at(0)->group_idx_ = group_idx;
|
||||
if (OB_FAIL(scan_rtdef->key_ranges_.push_back(*key_ranges.at(0))) ||
|
||||
OB_FAIL(scan_rtdef->ss_key_ranges_.push_back(*ss_key_ranges.at(0)))) {
|
||||
LOG_WARN("failed to push back ss key range", KPC(scan_rtdef), K(ret));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ObNewRange whole_range;
|
||||
ObNewRange *key_range = nullptr;
|
||||
whole_range.set_whole_range();
|
||||
whole_range.table_id_ = scan_ctdef->ref_table_id_;
|
||||
whole_range.group_idx_ = group_idx;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < key_ranges.count(); ++i) {
|
||||
key_range = key_ranges.at(i);
|
||||
key_range->table_id_ = scan_ctdef->ref_table_id_;
|
||||
key_range->group_idx_ = group_idx;
|
||||
if (OB_FAIL(scan_rtdef->key_ranges_.push_back(*key_range)) ||
|
||||
OB_FAIL(scan_rtdef->ss_key_ranges_.push_back(whole_range))) {
|
||||
LOG_WARN("failed to push back key range", KPC(scan_rtdef), K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && MY_SPEC.is_vt_mapping_) {
|
||||
OZ(vt_result_converter_->convert_key_ranges(scan_rtdef->key_ranges_));
|
||||
}
|
||||
} else if (rtdef->op_type_ == DAS_OP_SORT) {
|
||||
OB_ASSERT(rtdef->children_ != nullptr && rtdef->children_cnt_ == 1);
|
||||
if (OB_FAIL(prepare_range_for_each_index(group_idx, allocator, rtdef->children_[0]))) {
|
||||
LOG_WARN("failed to prepare scan range for child", K(ret));
|
||||
}
|
||||
} else if (rtdef->op_type_ == DAS_OP_INDEX_MERGE) {
|
||||
OB_ASSERT(rtdef->children_ != nullptr && rtdef->children_cnt_ == 2);
|
||||
if (OB_FAIL(prepare_range_for_each_index(group_idx, allocator, rtdef->children_[0]))) {
|
||||
LOG_WARN("failed to prepare scan range for left tree", K(ret));
|
||||
} else if (OB_FAIL(prepare_range_for_each_index(group_idx, allocator, rtdef->children_[1]))) {
|
||||
LOG_WARN("failed to prepare scan range for right tree", K(ret));
|
||||
}
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected rtdef type", K(rtdef), K(ret));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableScanOp::single_equal_scan_check_type(const ParamStore ¶m_store, bool& is_same_type)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -137,6 +137,7 @@ struct GroupRescanParamInfo
|
||||
common::ObObjParam cur_param_; //current param in param store, used to restore paramstore state after the completion of group rescan.
|
||||
};
|
||||
typedef common::ObFixedArray<GroupRescanParamInfo, common::ObIAllocator> GroupRescanParamArray;
|
||||
|
||||
struct ObTableScanCtDef
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
@ -158,7 +159,9 @@ public:
|
||||
{ }
|
||||
const ExprFixedArray &get_das_output_exprs() const
|
||||
{
|
||||
return lookup_ctdef_ != nullptr ? lookup_ctdef_->result_output_ : scan_ctdef_.result_output_;
|
||||
return attach_spec_.attach_ctdef_ != nullptr ? attach_spec_.get_result_output()
|
||||
: lookup_ctdef_ != nullptr ? lookup_ctdef_->result_output_
|
||||
: scan_ctdef_.result_output_;
|
||||
}
|
||||
const UIntFixedArray &get_full_acccess_cids() const
|
||||
{
|
||||
@ -179,7 +182,9 @@ public:
|
||||
KPC_(das_dppr_tbl),
|
||||
KPC_(calc_part_id_expr),
|
||||
K_(global_index_rowkey_exprs),
|
||||
K_(attach_spec));
|
||||
K_(attach_spec),
|
||||
K_(is_das_keep_order),
|
||||
K_(use_index_merge));
|
||||
//the query range of index scan/table scan
|
||||
ObQueryRange pre_query_range_;
|
||||
FlashBackItem flashback_item_;
|
||||
@ -211,7 +216,8 @@ public:
|
||||
uint64_t flags_;
|
||||
struct {
|
||||
uint64_t is_das_keep_order_ : 1; // whether das need keep ordering
|
||||
uint64_t reserved_ : 63;
|
||||
uint64_t use_index_merge_ : 1; // whether use index merge
|
||||
uint64_t reserved_ : 62;
|
||||
};
|
||||
};
|
||||
};
|
||||
@ -485,7 +491,8 @@ protected:
|
||||
int single_equal_scan_check_type(const ParamStore ¶m_store, bool& is_same_type);
|
||||
bool need_extract_range() const { return MY_SPEC.tsc_ctdef_.pre_query_range_.has_range(); }
|
||||
int prepare_single_scan_range(int64_t group_idx = 0);
|
||||
|
||||
int prepare_index_merge_scan_range(int64_t group_idx = 0);
|
||||
int prepare_range_for_each_index(int64_t group_idx, ObIAllocator &allocator, ObDASBaseRtDef *rtdef);
|
||||
int reuse_table_rescan_allocator();
|
||||
|
||||
int local_iter_rescan();
|
||||
|
||||
Reference in New Issue
Block a user