From e9c95707019ce144875ce77eae0e9b9e70da34b0 Mon Sep 17 00:00:00 2001 From: linbo-lin <1434442536@qq.com> Date: Thu, 21 Nov 2024 12:15:43 +0000 Subject: [PATCH] [FEAT MERGE] Refactor json multi-value index query and GIS index query based on DAS Iter Tree --- src/sql/CMakeLists.txt | 3 +- src/sql/code_generator/ob_tsc_cg_service.cpp | 194 ++++++- src/sql/code_generator/ob_tsc_cg_service.h | 5 + src/sql/das/iter/ob_das_doc_id_merge_iter.h | 2 + src/sql/das/iter/ob_das_iter_define.h | 10 + src/sql/das/iter/ob_das_iter_utils.cpp | 405 +++++++++++--- src/sql/das/iter/ob_das_iter_utils.h | 30 ++ src/sql/das/iter/ob_das_local_lookup_iter.h | 4 +- src/sql/das/iter/ob_das_lookup_iter.h | 28 +- src/sql/das/iter/ob_das_mvi_lookup_iter.cpp | 180 +++++++ src/sql/das/iter/ob_das_mvi_lookup_iter.h | 47 ++ src/sql/das/iter/ob_das_scan_iter.h | 11 +- src/sql/das/iter/ob_das_sort_iter.cpp | 127 +++-- src/sql/das/iter/ob_das_sort_iter.h | 11 +- src/sql/das/iter/ob_das_spatial_scan_iter.cpp | 152 ++++++ src/sql/das/iter/ob_das_spatial_scan_iter.h | 82 +++ src/sql/das/ob_das_scan_op.cpp | 92 ++-- .../das/ob_das_spatial_index_lookup_op.cpp | 313 ----------- src/sql/das/ob_das_spatial_index_lookup_op.h | 86 --- src/sql/das/ob_domain_index_lookup_op.cpp | 499 ------------------ src/sql/das/ob_domain_index_lookup_op.h | 82 --- src/sql/engine/sort/ob_sort_op_impl.cpp | 5 + src/sql/engine/sort/ob_sort_op_impl.h | 33 +- src/sql/optimizer/ob_join_order.cpp | 79 --- src/sql/optimizer/ob_join_order.h | 6 +- src/sql/optimizer/ob_log_table_scan.h | 1 + 26 files changed, 1210 insertions(+), 1277 deletions(-) create mode 100644 src/sql/das/iter/ob_das_mvi_lookup_iter.cpp create mode 100644 src/sql/das/iter/ob_das_mvi_lookup_iter.h create mode 100644 src/sql/das/iter/ob_das_spatial_scan_iter.cpp create mode 100644 src/sql/das/iter/ob_das_spatial_scan_iter.h delete mode 100644 src/sql/das/ob_das_spatial_index_lookup_op.cpp delete mode 100644 src/sql/das/ob_das_spatial_index_lookup_op.h diff --git a/src/sql/CMakeLists.txt b/src/sql/CMakeLists.txt index 25db98ee80..4e7e26f23b 100644 --- a/src/sql/CMakeLists.txt +++ b/src/sql/CMakeLists.txt @@ -61,7 +61,6 @@ ob_set_subtarget(ob_sql das das/ob_das_ir_define.cpp das/ob_das_vec_define.cpp das/ob_das_task_result.cpp - das/ob_das_spatial_index_lookup_op.cpp das/ob_das_retry_ctrl.cpp das/ob_das_simple_op.cpp das/ob_das_domain_utils.cpp @@ -82,6 +81,8 @@ ob_set_subtarget(ob_sql das das/iter/ob_das_doc_id_merge_iter.cpp das/iter/ob_das_vid_merge_iter.cpp das/iter/ob_das_index_merge_iter.cpp + das/iter/ob_das_mvi_lookup_iter.cpp + das/iter/ob_das_spatial_scan_iter.cpp ) ob_set_subtarget(ob_sql dtl diff --git a/src/sql/code_generator/ob_tsc_cg_service.cpp b/src/sql/code_generator/ob_tsc_cg_service.cpp index 6245dfc819..057ffea61f 100644 --- a/src/sql/code_generator/ob_tsc_cg_service.cpp +++ b/src/sql/code_generator/ob_tsc_cg_service.cpp @@ -178,16 +178,17 @@ int ObTscCgService::generate_tsc_ctdef(ObLogTableScan &op, ObTableScanCtDef &tsc } if (OB_SUCC(ret) && op.is_multivalue_index_scan()) { - ObDASIRAuxLookupCtDef *aux_lookup_ctdef = nullptr; - ObExpr *doc_id_col_expr = nullptr; - if (scan_ctdef.result_output_.count() == 0) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("failed to generate multivalue lookup ctdef, scan_ctdef.result_output_.count() is 0", K(ret)); - } else if (FALSE_IT(doc_id_col_expr = scan_ctdef.result_output_.at(scan_ctdef.result_output_.count() - 1))) { - } else if (OB_FAIL(generate_doc_id_lookup_ctdef(op, tsc_ctdef, root_ctdef, doc_id_col_expr, aux_lookup_ctdef))) { - LOG_WARN("failed to generate doc id lookup ctdef", K(ret)); + if (OB_FAIL(generate_multivalue_ir_ctdef(op, tsc_ctdef, root_ctdef))) { + LOG_WARN("failed to generate multivalue ir ctdef", K(ret)); + } else { + need_attach = true; + } + } + + if (OB_SUCC(ret) && op.is_spatial_index_scan()) { + if (OB_FAIL(generate_gis_ir_ctdef(op, tsc_ctdef, root_ctdef))) { + LOG_WARN("failed to generate spatial ir ctdef", K(ret)); } else { - root_ctdef = aux_lookup_ctdef; need_attach = true; } } @@ -1307,6 +1308,12 @@ int ObTscCgService::extract_das_output_column_ids(const ObLogTableScan &op, LOG_WARN("fail to get doc id column.", K(ret)); } else if (OB_FAIL(output_cids.push_back(doc_id_col_id))) { LOG_WARN("store colum id failed", K(ret)); + } else if (!op.get_index_back()){ + if (OB_FAIL(extract_tsc_access_columns(op, das_output_cols))) { + LOG_WARN("extract tsc access columns failed", K(ret)); + } else if (OB_FAIL(extract_das_column_ids(das_output_cols, output_cids))) { + LOG_WARN("extract column ids failed", K(ret)); + } } } @@ -1632,6 +1639,90 @@ int ObTscCgService::generate_vec_ir_ctdef(const ObLogTableScan &op, return ret; } +int ObTscCgService::generate_multivalue_ir_ctdef(const ObLogTableScan &op, + ObTableScanCtDef &tsc_ctdef, + ObDASBaseCtDef *&root_ctdef) +{ + int ret = OB_SUCCESS; + + int64_t rowkey_cnt = 0; + const ObTableSchema *table_schema = nullptr; + if (OB_FAIL(cg_.opt_ctx_->get_schema_guard()->get_table_schema(MTL_ID(), op.get_real_ref_table_id(), table_schema))) { + LOG_WARN("get table schema failed", K(ret), K(op.get_ref_table_id())); + } else if (OB_ISNULL(table_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected nullptr to table schema", K(ret)); + } else { + rowkey_cnt = table_schema->get_rowkey_column_num(); + } + + if (OB_SUCC(ret)) { + ObDASScanCtDef *scan_ctdef = &tsc_ctdef.scan_ctdef_; + ObDASIRAuxLookupCtDef *aux_lookup_ctdef = nullptr; + ObDASSortCtDef *sort_ctdef = nullptr; + ObExpr *doc_id_col_expr = nullptr; + + if (scan_ctdef->result_output_.count() < rowkey_cnt + 1) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to generate multivalue lookup ctdef, scan_ctdef.result_output_.count() is unexpected", K(ret)); + } else if (FALSE_IT(doc_id_col_expr = scan_ctdef->result_output_.at(rowkey_cnt))) { + } else if (OB_FAIL(generate_doc_id_lookup_ctdef(op, tsc_ctdef, root_ctdef, doc_id_col_expr, aux_lookup_ctdef))) { + LOG_WARN("failed to generate doc id lookup ctdef", K(ret)); + } else if (OB_FAIL(scan_ctdef->rowkey_exprs_.init(rowkey_cnt))) { + LOG_WARN("failed to init rowkey exprs", K(ret)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < rowkey_cnt; ++i) { + ObExpr *expr = scan_ctdef->result_output_.at(i); + if (OB_FAIL(scan_ctdef->rowkey_exprs_.push_back(expr))) { + LOG_WARN("append rowkey exprs failed", K(ret)); + } + } + + if (OB_SUCC(ret) && OB_FAIL(generate_das_sort_ctdef(scan_ctdef->rowkey_exprs_, aux_lookup_ctdef, sort_ctdef))) { + LOG_WARN("generate sort ctdef failed", K(ret)); + } else { + root_ctdef = sort_ctdef; + } + } + } + + return ret; +} + +int ObTscCgService::generate_gis_ir_ctdef(const ObLogTableScan &op, + ObTableScanCtDef &tsc_ctdef, + ObDASBaseCtDef *&root_ctdef) +{ + int ret = OB_SUCCESS; + + ObDASScanCtDef *scan_ctdef = &tsc_ctdef.scan_ctdef_; + ObSEArray rowkey_exprs; + if (scan_ctdef->result_output_.count() == 0) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to generate gis ir ctdef, scan_ctdef.result_output_.count() is 0", K(ret)); + } else { + int64_t rowkey_cnt = scan_ctdef->result_output_.count() - 1; + if (scan_ctdef->trans_info_expr_ != nullptr) { + rowkey_cnt = rowkey_cnt - 1; + } + for (int64_t i = 0; OB_SUCC(ret) && i < rowkey_cnt; ++i) { + ObExpr *expr = scan_ctdef->result_output_.at(i); + if (OB_FAIL(rowkey_exprs.push_back(expr))) { + LOG_WARN("append rowkey exprs failed", K(ret)); + } + } + } + + ObDASSortCtDef *sort_ctdef = nullptr; + if (OB_SUCC(ret) && OB_FAIL(generate_das_sort_ctdef(rowkey_exprs, scan_ctdef, sort_ctdef))) { + LOG_WARN("generate sort ctdef failed", K(ret)); + } else { + root_ctdef = sort_ctdef; + } + + return ret; +} + int ObTscCgService::generate_text_ir_ctdef(const ObLogTableScan &op, ObTableScanCtDef &tsc_ctdef, ObDASBaseCtDef *&root_ctdef) @@ -2418,7 +2509,14 @@ int ObTscCgService::generate_doc_id_lookup_ctdef(const ObLogTableScan &op, } if (OB_SUCC(ret)) { - if (OB_FAIL(aux_lookup_ctdef->result_output_.assign(result_outputs))) { + if (op.is_multivalue_index_scan()) { + ObDASScanCtDef *index_ctdef = static_cast(ir_scan_ctdef); + if (OB_FAIL(append_array_no_dup(result_outputs, index_ctdef->result_output_))) { + LOG_WARN("append result output failed", K(ret)); + } + } + + if (OB_SUCC(ret) && OB_FAIL(aux_lookup_ctdef->result_output_.assign(result_outputs))) { LOG_WARN("assign result output failed", K(ret)); } } @@ -3048,6 +3146,81 @@ int ObTscCgService::generate_das_sort_ctdef( } } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(append_array_no_dup(result_output, sort_ctdef->sort_exprs_))) { + LOG_WARN("failed to append sort exprs to result output", K(ret)); + } else if (ObDASTaskFactory::is_attached(child_ctdef->op_type_) + && OB_FAIL(append_array_no_dup(result_output, static_cast(child_ctdef)->result_output_))) { + LOG_WARN("failed to append child result output", K(ret)); + } else if (child_ctdef->op_type_ == DAS_OP_TABLE_SCAN + && OB_FAIL(append_array_no_dup(result_output, static_cast(child_ctdef)->result_output_))) { + LOG_WARN("failed to append child result output", K(ret)); + } else if (OB_FAIL(sort_ctdef->result_output_.assign(result_output))) { + LOG_WARN("failed to assign result output", K(ret)); + } else { + LOG_TRACE("generate sort ctdef finished", K(sort_keys), K(sort_ctdef->sort_exprs_), K(result_output), K(ret)); + } + return ret; +} + +int ObTscCgService::generate_das_sort_ctdef( + const ObIArray &sort_keys, + ObDASBaseCtDef *child_ctdef, + ObDASSortCtDef *&sort_ctdef) +{ + int ret = OB_SUCCESS; + const int64_t sort_cnt = sort_keys.count(); + ObIAllocator &ctdef_alloc = cg_.phy_plan_->get_allocator(); + if (OB_UNLIKELY(0 == sort_cnt) || OB_ISNULL(child_ctdef)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid sort arg", K(ret), K(sort_cnt), KPC(child_ctdef)); + } else if (OB_FAIL(ObDASTaskFactory::alloc_das_ctdef(DAS_OP_SORT, ctdef_alloc, sort_ctdef))) { + LOG_WARN("alloc sort ctdef failed ", K(ret)); + } else if (OB_FAIL(sort_ctdef->sort_collations_.init(sort_cnt))) { + LOG_WARN("failed to init sort collations", K(ret)); + } else if (OB_FAIL(sort_ctdef->sort_cmp_funcs_.init(sort_cnt))) { + LOG_WARN("failed to init sort cmp funcs", K(ret)); + } else if (OB_FAIL(sort_ctdef->sort_exprs_.init(sort_cnt))) { + LOG_WARN("failed to init sort exprs", K(ret)); + } else if (OB_ISNULL(sort_ctdef->children_ = OB_NEW_ARRAY(ObDASBaseCtDef*, &ctdef_alloc, 1))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("allocate ir scan ctdef children failed", K(ret)); + } else { + sort_ctdef->children_cnt_ = 1; + sort_ctdef->children_[0] = child_ctdef; + sort_ctdef->fetch_with_ties_ = false; + } + + ObSEArray result_output; + int64_t field_idx = 0; + for (int64_t i = 0; i < sort_keys.count() && OB_SUCC(ret); ++i) { + ObExpr *expr = sort_keys.at(i); + ObSortFieldCollation field_collation(field_idx, expr->datum_meta_.cs_type_, true, NULL_FIRST); + ObSortCmpFunc cmp_func; + cmp_func.cmp_func_ = ObDatumFuncs::get_nullsafe_cmp_func( + expr->datum_meta_.type_, + expr->datum_meta_.type_, + field_collation.null_pos_, + field_collation.cs_type_, + expr->datum_meta_.scale_, + lib::is_oracle_mode(), + expr->obj_meta_.has_lob_header(), + expr->datum_meta_.precision_, + expr->datum_meta_.precision_); + if (OB_ISNULL(cmp_func.cmp_func_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("cmp_func is null, check datatype is valid", K(ret)); + } else if (OB_FAIL(sort_ctdef->sort_cmp_funcs_.push_back(cmp_func))) { + LOG_WARN("failed to append sort function", K(ret)); + } else if (OB_FAIL(sort_ctdef->sort_collations_.push_back(field_collation))) { + LOG_WARN("failed to push back field collation", K(ret)); + } else if (OB_FAIL(sort_ctdef->sort_exprs_.push_back(expr))) { + LOG_WARN("failed to push back expr", K(ret)); + } else { + field_idx++; + } + } + if (OB_FAIL(ret)) { } else if (OB_FAIL(append_array_no_dup(result_output, sort_ctdef->sort_exprs_))) { LOG_WARN("failed to append sort exprs to result output", K(ret)); @@ -3060,7 +3233,6 @@ int ObTscCgService::generate_das_sort_ctdef( } else if (OB_FAIL(sort_ctdef->result_output_.assign(result_output))) { LOG_WARN("failed to assign result output", K(ret)); } - LOG_TRACE("generate sort ctdef finished", K(sort_keys), K(sort_ctdef->sort_exprs_), K(result_output), K(ret)); return ret; } diff --git a/src/sql/code_generator/ob_tsc_cg_service.h b/src/sql/code_generator/ob_tsc_cg_service.h index e48c4f75b1..77e99809ac 100644 --- a/src/sql/code_generator/ob_tsc_cg_service.h +++ b/src/sql/code_generator/ob_tsc_cg_service.h @@ -77,6 +77,8 @@ private: int generate_geo_access_ctdef(const ObLogTableScan &op, const ObTableSchema &index_schema, ObArray &access_exprs); int generate_text_ir_ctdef(const ObLogTableScan &op, ObTableScanCtDef &tsc_ctdef, ObDASBaseCtDef *&root_ctdef); int generate_vec_ir_ctdef(const ObLogTableScan &op, ObTableScanCtDef &tsc_ctdef, ObDASBaseCtDef *&root_ctdef); + int generate_multivalue_ir_ctdef(const ObLogTableScan &op, ObTableScanCtDef &tsc_ctdef, ObDASBaseCtDef *&root_ctdef); + int generate_gis_ir_ctdef(const ObLogTableScan &op, ObTableScanCtDef &tsc_ctdef, ObDASBaseCtDef *&root_ctdef); int extract_text_ir_access_columns(const ObLogTableScan &op, const ObDASScanCtDef &scan_ctdef, ObIArray &access_exprs); @@ -153,6 +155,9 @@ private: ObRawExpr *topk_offset_expr, ObDASBaseCtDef *child_ctdef, ObDASSortCtDef *&sort_ctdef); + int generate_das_sort_ctdef(const ObIArray &sort_keys, + ObDASBaseCtDef *child_ctdef, + ObDASSortCtDef *&sort_ctdef); int mapping_oracle_real_agent_virtual_exprs(const ObLogTableScan &op, common::ObIArray &access_exprs); int generate_mr_mv_scan_flag(const ObLogTableScan &op, ObQueryFlag &query_flag) const; diff --git a/src/sql/das/iter/ob_das_doc_id_merge_iter.h b/src/sql/das/iter/ob_das_doc_id_merge_iter.h index 717fdc619a..c0f1164203 100644 --- a/src/sql/das/iter/ob_das_doc_id_merge_iter.h +++ b/src/sql/das/iter/ob_das_doc_id_merge_iter.h @@ -26,6 +26,8 @@ namespace sql class ObDASDocIdMergeCtDef; class ObDASDocIdMergeRtDef; +class ObDASScanCtDef; +class ObDASScanRtDef; class ObDASDocIdMergeIterParam final : public ObDASIterParam { diff --git a/src/sql/das/iter/ob_das_iter_define.h b/src/sql/das/iter/ob_das_iter_define.h index 434b88fc8f..fea90f7fde 100644 --- a/src/sql/das/iter/ob_das_iter_define.h +++ b/src/sql/das/iter/ob_das_iter_define.h @@ -71,6 +71,16 @@ enum ObDASIterTreeType : uint32_t ITER_TREE_MAX }; +#define SUPPORTED_DAS_ITER_TREE(_type) \ +({ \ + ITER_TREE_PARTITION_SCAN == (_type) || \ + ITER_TREE_LOCAL_LOOKUP == (_type) || \ + ITER_TREE_TEXT_RETRIEVAL == (_type) || \ + ITER_TREE_INDEX_MERGE == (_type) || \ + ITER_TREE_MVI_LOOKUP == (_type) || \ + ITER_TREE_GIS_LOOKUP == (_type); \ +}) + struct ObDASRelatedTabletID { public: diff --git a/src/sql/das/iter/ob_das_iter_utils.cpp b/src/sql/das/iter/ob_das_iter_utils.cpp index f146c85300..78430d806c 100644 --- a/src/sql/das/iter/ob_das_iter_utils.cpp +++ b/src/sql/das/iter/ob_das_iter_utils.cpp @@ -18,6 +18,52 @@ namespace oceanbase namespace sql { /***************** PUBLIC BEGIN *****************/ +void ObDASIterUtils::init_scan_iter_param(ObDASScanIterParam ¶m, const ObDASScanCtDef *scan_ctdef, ObDASBaseRtDef *scan_rtdef) +{ + param.scan_ctdef_ = scan_ctdef; + param.max_size_ = scan_rtdef->eval_ctx_->is_vectorized() ? scan_rtdef->eval_ctx_->max_batch_size_ : 1; + param.eval_ctx_ = scan_rtdef->eval_ctx_; + param.exec_ctx_ = &scan_rtdef->eval_ctx_->exec_ctx_; + param.output_ = &scan_ctdef->result_output_; +} + +void ObDASIterUtils::init_spatial_scan_iter_param(ObDASSpatialScanIterParam ¶m, const ObDASScanCtDef *scan_ctdef, ObDASScanRtDef *scan_rtdef) +{ + param.scan_ctdef_ = scan_ctdef; + param.max_size_ = scan_rtdef->eval_ctx_->is_vectorized() ? scan_rtdef->eval_ctx_->max_batch_size_ : 1; + param.eval_ctx_ = scan_rtdef->eval_ctx_; + param.exec_ctx_ = &scan_rtdef->eval_ctx_->exec_ctx_; + param.output_ = &scan_ctdef->result_output_; + param.scan_rtdef_ = scan_rtdef; +} + +int ObDASIterUtils::create_das_spatial_scan_iter(ObIAllocator &alloc, ObDASSpatialScanIterParam ¶m, ObDASSpatialScanIter *&result) +{ + int ret = OB_SUCCESS; + ObDASSpatialScanIter *iter = nullptr; + + void *buf = alloc.alloc(sizeof(ObDASSpatialScanIter)); + if (OB_ISNULL(buf)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to alloc ObDASSpatialScanIter buf"); + } else { + iter= new(buf) ObDASSpatialScanIter(alloc); + if (OB_FAIL(iter->init(param))) { + LOG_WARN("failed to init ObDASSpatialScanIter", K(ret)); + } + } + if (OB_SUCC(ret)) { + result = iter; + } else { + if (OB_NOT_NULL(iter)) { + iter->release(); + alloc.free(iter); + iter = nullptr; + } + } + return ret; +} + int ObDASIterUtils::create_das_scan_iter_tree(ObDASIterTreeType tree_type, storage::ObTableScanParam &scan_param, const ObDASScanCtDef *scan_ctdef, @@ -54,6 +100,14 @@ int ObDASIterUtils::create_das_scan_iter_tree(ObDASIterTreeType tree_type, ret = create_index_merge_iter_tree(scan_param, alloc, attach_ctdef, attach_rtdef, related_tablet_ids, trans_desc, snapshot, iter_tree); break; } + case ITER_TREE_MVI_LOOKUP: { + ret = create_mvi_lookup_tree(scan_param, alloc, attach_ctdef, attach_rtdef, related_tablet_ids, trans_desc, snapshot, iter_tree); + break; + } + case ITER_TREE_GIS_LOOKUP: { + ret = create_gis_lookup_tree(scan_param, alloc, attach_ctdef, attach_rtdef, related_tablet_ids, trans_desc, snapshot, iter_tree); + break; + } default: { ret = OB_ERR_UNEXPECTED; } @@ -298,11 +352,7 @@ int ObDASIterUtils::create_partition_scan_tree(storage::ObTableScanParam &scan_p { int ret = OB_SUCCESS; ObDASScanIterParam param; - param.scan_ctdef_ = scan_ctdef; - param.max_size_ = scan_rtdef->eval_ctx_->is_vectorized() ? scan_rtdef->eval_ctx_->max_batch_size_ : 1; - param.eval_ctx_ = scan_rtdef->eval_ctx_; - param.exec_ctx_ = &scan_rtdef->eval_ctx_->exec_ctx_; - param.output_ = &scan_ctdef->result_output_; + init_scan_iter_param(param, scan_ctdef, scan_rtdef); ObDASScanIter *scan_iter = nullptr; if (OB_FAIL(create_das_iter(alloc, param, scan_iter))) { LOG_WARN("failed to create das scan iter", K(ret)); @@ -348,17 +398,9 @@ int ObDASIterUtils::create_local_lookup_tree(ObTableScanParam &scan_param, ObDASScanIter *data_table_iter = nullptr; ObDASLocalLookupIter *lookup_iter = nullptr; ObDASScanIterParam index_table_param; - index_table_param.scan_ctdef_ = scan_ctdef; - index_table_param.max_size_ = scan_rtdef->eval_ctx_->is_vectorized() ? scan_rtdef->eval_ctx_->max_batch_size_ : 1; - index_table_param.eval_ctx_ = scan_rtdef->eval_ctx_; - index_table_param.exec_ctx_ = &scan_rtdef->eval_ctx_->exec_ctx_; - index_table_param.output_ = &scan_ctdef->result_output_; + init_scan_iter_param(index_table_param, scan_ctdef, scan_rtdef); ObDASScanIterParam data_table_param; - data_table_param.scan_ctdef_ = lookup_ctdef; - data_table_param.max_size_ = lookup_rtdef->eval_ctx_->is_vectorized() ? lookup_rtdef->eval_ctx_->max_batch_size_ : 1; - data_table_param.eval_ctx_ = lookup_rtdef->eval_ctx_; - data_table_param.exec_ctx_ = &lookup_rtdef->eval_ctx_->exec_ctx_; - data_table_param.output_ = &lookup_ctdef->result_output_; + init_scan_iter_param(data_table_param, lookup_ctdef, lookup_rtdef); if (OB_FAIL(create_das_iter(alloc, index_table_param, index_table_iter))) { LOG_WARN("failed to create index table iter", K(ret)); } else if (OB_FAIL(create_das_iter(alloc, data_table_param, data_table_iter))) { @@ -400,7 +442,6 @@ int ObDASIterUtils::create_local_lookup_tree(ObTableScanParam &scan_param, lookup_param.eval_ctx_ = lookup_rtdef->eval_ctx_; lookup_param.exec_ctx_ = &lookup_rtdef->eval_ctx_->exec_ctx_; lookup_param.output_ = &lookup_ctdef->result_output_; - lookup_param.default_batch_row_count_ = 1000; // hard code 1000 for local lookup lookup_param.index_ctdef_ = scan_ctdef; lookup_param.index_rtdef_ = scan_rtdef; lookup_param.lookup_ctdef_ = lookup_ctdef; @@ -496,7 +537,7 @@ int ObDASIterUtils::create_text_retrieval_tree(ObTableScanParam &scan_param, const bool need_rewind = true; if (!has_sort) { // skip - } else if (OB_FAIL(create_sort_sub_tree(alloc, sort_ctdef, sort_rtdef, need_rewind, text_retrieval_result, sort_result))) { + } else if (OB_FAIL(create_sort_sub_tree(alloc, sort_ctdef, sort_rtdef, need_rewind, false/*need_distinct*/,text_retrieval_result, sort_result))) { LOG_WARN("failed to create sort sub tree", K(ret)); } else { root_iter = sort_result; @@ -584,11 +625,8 @@ int ObDASIterUtils::create_text_retrieval_sub_tree(const ObLSID &ls_id, } else if (merge_iter_param.query_tokens_.count() > OB_MAX_TEXT_RETRIEVAL_TOKEN_CNT || !ir_scan_ctdef->need_proj_relevance_score()) { if (!ir_scan_ctdef->need_estimate_total_doc_cnt()) { - doc_cnt_agg_param.scan_ctdef_ = ir_scan_ctdef->get_doc_id_idx_agg_ctdef(); + init_scan_iter_param(doc_cnt_agg_param, ir_scan_ctdef->get_doc_id_idx_agg_ctdef(), ir_scan_rtdef); doc_cnt_agg_param.max_size_ = ir_scan_rtdef->eval_ctx_->max_batch_size_; - doc_cnt_agg_param.eval_ctx_ = ir_scan_rtdef->eval_ctx_; - doc_cnt_agg_param.exec_ctx_ = &ir_scan_rtdef->eval_ctx_->exec_ctx_; - doc_cnt_agg_param.output_ = &ir_scan_ctdef->get_doc_id_idx_agg_ctdef()->result_output_; if (OB_FAIL(create_das_iter(alloc, doc_cnt_agg_param, doc_cnt_agg_iter))) { LOG_WARN("failed to create doc cnt agg scan iter", K(ret)); } else { @@ -605,11 +643,8 @@ int ObDASIterUtils::create_text_retrieval_sub_tree(const ObLSID &ls_id, } } else { if (ir_scan_ctdef->need_calc_relevance() && !ir_scan_ctdef->need_estimate_total_doc_cnt()) { - doc_cnt_agg_param.scan_ctdef_ = ir_scan_ctdef->get_doc_id_idx_agg_ctdef(); + init_scan_iter_param(doc_cnt_agg_param, ir_scan_ctdef->get_doc_id_idx_agg_ctdef(), ir_scan_rtdef); doc_cnt_agg_param.max_size_ = ir_scan_rtdef->eval_ctx_->max_batch_size_; - doc_cnt_agg_param.eval_ctx_ = ir_scan_rtdef->eval_ctx_; - doc_cnt_agg_param.exec_ctx_ = &ir_scan_rtdef->eval_ctx_->exec_ctx_; - doc_cnt_agg_param.output_ = &ir_scan_ctdef->get_doc_id_idx_agg_ctdef()->result_output_; if (OB_FAIL(create_das_iter(alloc, doc_cnt_agg_param, doc_cnt_agg_iter))) { LOG_WARN("failed to create doc cnt agg scan iter", K(ret)); } else { @@ -644,25 +679,17 @@ int ObDASIterUtils::create_text_retrieval_sub_tree(const ObLSID &ls_id, ObDASScanIterParam inv_idx_scan_iter_param; ObDASScanIter *inv_idx_scan_iter = nullptr; - inv_idx_scan_iter_param.scan_ctdef_ = ir_scan_ctdef->get_inv_idx_scan_ctdef(); + init_scan_iter_param(inv_idx_scan_iter_param, ir_scan_ctdef->get_inv_idx_scan_ctdef(), ir_scan_rtdef); inv_idx_scan_iter_param.max_size_ = ir_scan_rtdef->eval_ctx_->max_batch_size_; - inv_idx_scan_iter_param.eval_ctx_ = ir_scan_rtdef->eval_ctx_; - inv_idx_scan_iter_param.exec_ctx_ = &ir_scan_rtdef->eval_ctx_->exec_ctx_; - inv_idx_scan_iter_param.output_ = &ir_scan_ctdef->get_inv_idx_scan_ctdef()->result_output_; ObDASScanIterParam inv_idx_agg_iter_param; ObDASScanIter *inv_idx_agg_iter = nullptr; - inv_idx_agg_iter_param.scan_ctdef_ = ir_scan_ctdef->get_inv_idx_agg_ctdef(); + init_scan_iter_param(inv_idx_agg_iter_param, ir_scan_ctdef->get_inv_idx_agg_ctdef(), ir_scan_rtdef); inv_idx_agg_iter_param.max_size_ = ir_scan_rtdef->eval_ctx_->max_batch_size_; - inv_idx_agg_iter_param.eval_ctx_ = ir_scan_rtdef->eval_ctx_; - inv_idx_agg_iter_param.exec_ctx_ = &ir_scan_rtdef->eval_ctx_->exec_ctx_; - inv_idx_agg_iter_param.output_ = &ir_scan_ctdef->get_inv_idx_agg_ctdef()->result_output_; ObDASScanIterParam fwd_idx_iter_param; ObDASScanIter *fwd_idx_iter = nullptr; - fwd_idx_iter_param.scan_ctdef_ = ir_scan_ctdef->get_fwd_idx_agg_ctdef(); + init_scan_iter_param(fwd_idx_iter_param, ir_scan_ctdef->get_fwd_idx_agg_ctdef(), ir_scan_rtdef); fwd_idx_iter_param.max_size_ = ir_scan_rtdef->eval_ctx_->max_batch_size_; - fwd_idx_iter_param.eval_ctx_ = ir_scan_rtdef->eval_ctx_; - fwd_idx_iter_param.exec_ctx_ = &ir_scan_rtdef->eval_ctx_->exec_ctx_; - fwd_idx_iter_param.output_ = &ir_scan_ctdef->get_fwd_idx_agg_ctdef()->result_output_; + if (OB_FAIL(create_das_iter(alloc, inv_idx_scan_iter_param, inv_idx_scan_iter))) { LOG_WARN("failed to create inv idx iter", K(ret)); } else if (ir_scan_ctdef->need_inv_idx_agg() @@ -763,11 +790,7 @@ int ObDASIterUtils::create_doc_id_scan_sub_tree( ObDASDocIdMergeIter *doc_id_merge_iter = nullptr; ObDASScanIterParam rowkey_doc_param; ObDASScanIter *rowkey_doc_iter = nullptr; - rowkey_doc_param.scan_ctdef_ = static_cast(merge_ctdef->children_[1]); - rowkey_doc_param.max_size_ = merge_rtdef->eval_ctx_->is_vectorized() ? merge_rtdef->eval_ctx_->max_batch_size_ : 1; - rowkey_doc_param.eval_ctx_ = merge_rtdef->eval_ctx_; - rowkey_doc_param.exec_ctx_ = &merge_rtdef->eval_ctx_->exec_ctx_; - rowkey_doc_param.output_ = &rowkey_doc_param.scan_ctdef_->result_output_; + init_scan_iter_param(rowkey_doc_param, static_cast(merge_ctdef->children_[1]),merge_rtdef ); if (OB_FAIL(create_das_iter(alloc, rowkey_doc_param, rowkey_doc_iter))) { LOG_WARN("fail to create das scan iter", K(ret), K(rowkey_doc_param)); } else { @@ -816,12 +839,7 @@ int ObDASIterUtils::create_vid_scan_sub_tree( ObDASVIdMergeIter *vid_merge_iter = nullptr; ObDASScanIterParam rowkey_vid_param; ObDASScanIter *rowkey_vid_iter = nullptr; - rowkey_vid_param.scan_ctdef_ = static_cast(merge_ctdef->children_[1]); - rowkey_vid_param.max_size_ = merge_rtdef->eval_ctx_->is_vectorized() ? - merge_rtdef->eval_ctx_->max_batch_size_ : 1; - rowkey_vid_param.eval_ctx_ = merge_rtdef->eval_ctx_; - rowkey_vid_param.exec_ctx_ = &merge_rtdef->eval_ctx_->exec_ctx_; - rowkey_vid_param.output_ = &rowkey_vid_param.scan_ctdef_->result_output_; + init_scan_iter_param(rowkey_vid_param, static_cast(merge_ctdef->children_[1]), merge_rtdef); if (OB_FAIL(create_das_iter(alloc, rowkey_vid_param, rowkey_vid_iter))) { LOG_WARN("fail to create das scan iter", K(ret), K(rowkey_vid_param)); } else { @@ -991,10 +1009,275 @@ int ObDASIterUtils::create_domain_lookup_sub_tree(ObTableScanParam &scan_param, return ret; } + +/* local_lookup + * | | + * sort_distinct main_data_table + * | + * aux_local_lookup + * | | + * index_table docid_rowkey_table +*/ +int ObDASIterUtils::create_mvi_lookup_tree(ObTableScanParam &scan_param, + common::ObIAllocator &alloc, + const ObDASBaseCtDef *attach_ctdef, + ObDASBaseRtDef *attach_rtdef, + const ObDASRelatedTabletID &related_tablet_ids, + transaction::ObTxDesc *trans_desc, + transaction::ObTxReadSnapshot *snapshot, + ObDASIter *&iter_tree) +{ + int ret = OB_SUCCESS; + + const ObDASSortCtDef *sort_ctdef = nullptr; + ObDASSortRtDef *sort_rtdef = nullptr; + const ObDASTableLookupCtDef *lookup_ctdef = nullptr; + ObDASTableLookupRtDef *lookup_rtdef = nullptr; + const ObDASIRAuxLookupCtDef *mvi_lookup_ctdef = nullptr; + ObDASIRAuxLookupRtDef *mvi_lookup_rtdef = nullptr; + + ObDASScanIter *index_table_iter = nullptr; + ObDASScanIter *docid_rowkey_table_iter = nullptr; + ObDASMVILookupIter *mvi_lookup_iter = nullptr; + ObDASIter *sort_iter = nullptr; + + if (OB_ISNULL(attach_ctdef) || OB_ISNULL(attach_rtdef)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("table lookup param is nullptr", KP(attach_ctdef), KP(attach_rtdef)); + } else if (OB_FAIL(ObDASUtils::find_target_das_def(attach_ctdef, attach_rtdef, DAS_OP_SORT, sort_ctdef, sort_rtdef))) { + LOG_WARN("find sort def failed", K(ret)); + } else if (OB_FAIL(ObDASUtils::find_target_das_def(attach_ctdef, attach_rtdef, DAS_OP_IR_AUX_LOOKUP, mvi_lookup_ctdef, mvi_lookup_rtdef))) { + LOG_WARN("find ir aux lookup def failed", K(ret)); + } else if (mvi_lookup_ctdef->children_cnt_ != 2) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("find index def failed", K(ret), K(mvi_lookup_ctdef->children_cnt_)); + } else { + const ObDASScanCtDef* index_ctdef = static_cast(mvi_lookup_ctdef->children_[0]); + ObDASScanRtDef * index_rtdef = static_cast(mvi_lookup_rtdef->children_[0]); + const ObDASScanCtDef* docid_table_ctdef = mvi_lookup_ctdef->get_lookup_scan_ctdef(); + ObDASScanRtDef * docid_table_rtdef = mvi_lookup_rtdef->get_lookup_scan_rtdef(); + + ObDASScanIterParam index_table_param; + init_scan_iter_param(index_table_param, index_ctdef, index_rtdef); + ObDASScanIterParam docid_rowkey_table_param; + init_scan_iter_param(docid_rowkey_table_param, docid_table_ctdef, docid_table_rtdef); + + if (OB_FAIL(create_das_iter(alloc, index_table_param, index_table_iter))) { + LOG_WARN("failed to create index table scan iter", K(ret)); + } else if (OB_FAIL(create_das_iter(alloc, docid_rowkey_table_param, docid_rowkey_table_iter))){ + LOG_WARN("failed to create docid rowkey table scan iter", K(ret)); + } else { + ObDASLocalLookupIterParam mvi_lookup_param; + mvi_lookup_param.max_size_ = 1; + mvi_lookup_param.eval_ctx_ = mvi_lookup_rtdef->eval_ctx_; + mvi_lookup_param.exec_ctx_ = &mvi_lookup_rtdef->eval_ctx_->exec_ctx_; + mvi_lookup_param.output_ = &mvi_lookup_ctdef->result_output_; + mvi_lookup_param.index_ctdef_ = index_ctdef; + mvi_lookup_param.index_rtdef_ = index_rtdef; + mvi_lookup_param.lookup_ctdef_ = docid_table_ctdef; + mvi_lookup_param.lookup_rtdef_ = docid_table_rtdef; + mvi_lookup_param.index_table_iter_ = index_table_iter; + mvi_lookup_param.data_table_iter_ = docid_rowkey_table_iter; + mvi_lookup_param.trans_desc_ = trans_desc; + mvi_lookup_param.snapshot_ = snapshot; + mvi_lookup_param.rowkey_exprs_ = &mvi_lookup_ctdef->get_lookup_scan_ctdef()->rowkey_exprs_; + if (OB_FAIL(create_das_iter(alloc, mvi_lookup_param, mvi_lookup_iter))) { + LOG_WARN("failed to create mvi lookup iter", K(ret)); + } else if (OB_FAIL(create_iter_children_array(2, alloc, mvi_lookup_iter))) { + LOG_WARN("failed to create iter children array", K(ret)); + } else { + mvi_lookup_iter->get_children()[0] = index_table_iter; + mvi_lookup_iter->get_children()[1] = docid_rowkey_table_iter; + index_table_iter->set_scan_param(scan_param); + docid_rowkey_table_iter->set_scan_param(mvi_lookup_iter->get_lookup_param()); + mvi_lookup_iter->set_tablet_id(related_tablet_ids.aux_lookup_tablet_id_); + mvi_lookup_iter->set_ls_id(scan_param.ls_id_); + } + } + } + + if (OB_FAIL(ret)) { + } else if (OB_FAIL(create_sort_sub_tree(alloc, sort_ctdef, sort_rtdef,false/*need_rewind*/, + true/*need_distinct*/, mvi_lookup_iter, sort_iter))) { + LOG_WARN("failed to create sort sub tree", K(ret)); + } else if (OB_FAIL(ObDASUtils::find_target_das_def(attach_ctdef, attach_rtdef, DAS_OP_TABLE_LOOKUP, lookup_ctdef, lookup_rtdef))) { + // multivalue index scan and don't need to index back lookup. + ret = OB_SUCCESS; + iter_tree = sort_iter; + } else { + ObDASScanIter *data_table_iter = nullptr; + ObDASLocalLookupIter *local_lookup_iter = nullptr; + + const ObDASScanCtDef *data_table_ctdef = lookup_ctdef->get_lookup_scan_ctdef(); + ObDASScanRtDef *data_table_rtdef = lookup_rtdef->get_lookup_scan_rtdef(); + ObDASScanIterParam data_table_param; + init_scan_iter_param(data_table_param, data_table_ctdef, data_table_rtdef); + + if (OB_FAIL(create_das_iter(alloc, data_table_param, data_table_iter))) { + LOG_WARN("failed to create data table scan iter", K(ret)); + } else { + ObDASIter *tmp_data_table_iter = static_cast(data_table_iter); + + ObDASBaseCtDef *ctdef = lookup_ctdef->children_[1]; + ObDASBaseRtDef *rtdef = lookup_rtdef->children_[1]; + if (OB_ISNULL(ctdef) || OB_ISNULL(rtdef)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpeted error, ctdef or rtdef is nullptr", K(ret), KPC(ctdef), KPC(rtdef)); + } else if (ObDASOpType::DAS_OP_TABLE_SCAN == ctdef->op_type_) { + // nothing to do + } else if (OB_UNLIKELY(ObDASOpType::DAS_OP_VID_MERGE == ctdef->op_type_)) { + if (OB_FAIL(create_vid_scan_sub_tree(scan_param, alloc, static_cast(ctdef), + static_cast(rtdef), related_tablet_ids, trans_desc, snapshot, tmp_data_table_iter))) { + LOG_WARN("fail to create doc id scan sub tree", K(ret), K(scan_param), KPC(ctdef), KPC(rtdef)); + } + } else if (OB_UNLIKELY(ObDASOpType::DAS_OP_DOC_ID_MERGE != ctdef->op_type_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected error, attach op type isn't doc id merge", K(ret), K(ctdef->op_type_), KPC(ctdef)); + } else if (OB_FAIL(create_doc_id_scan_sub_tree(scan_param, alloc, static_cast(ctdef), + static_cast(rtdef), related_tablet_ids, trans_desc, snapshot, tmp_data_table_iter))) { + LOG_WARN("fail to create doc id scan sub tree", K(ret), K(scan_param), KPC(ctdef), KPC(rtdef)); + } + + if (OB_SUCC(ret)) { + ObDASLocalLookupIterParam lookup_param; + lookup_param.max_size_ = lookup_rtdef->eval_ctx_->is_vectorized() ? data_table_rtdef->eval_ctx_->max_batch_size_ : 1; + lookup_param.eval_ctx_ = lookup_rtdef->eval_ctx_; + lookup_param.exec_ctx_ = &lookup_rtdef->eval_ctx_->exec_ctx_; + lookup_param.output_ = &lookup_ctdef->result_output_; + lookup_param.index_ctdef_ = sort_ctdef; + lookup_param.index_rtdef_ = sort_rtdef; + lookup_param.lookup_ctdef_ = data_table_ctdef; + lookup_param.lookup_rtdef_ = data_table_rtdef; + lookup_param.index_table_iter_ = sort_iter; + lookup_param.data_table_iter_ = tmp_data_table_iter; + lookup_param.trans_desc_ = trans_desc; + lookup_param.snapshot_ = snapshot; + lookup_param.rowkey_exprs_ = &lookup_ctdef->get_lookup_scan_ctdef()->rowkey_exprs_; + + if (OB_FAIL(create_das_iter(alloc, lookup_param, local_lookup_iter))) { + LOG_WARN("failed to create mvi lookup iter", K(ret)); + } else if (OB_FAIL(create_iter_children_array(2, alloc, local_lookup_iter))) { + LOG_WARN("failed to create iter children array", K(ret)); + } else { + local_lookup_iter->get_children()[0] = sort_iter; + local_lookup_iter->get_children()[1] = tmp_data_table_iter; + data_table_iter->set_scan_param(local_lookup_iter->get_lookup_param()); + local_lookup_iter->set_tablet_id(related_tablet_ids.lookup_tablet_id_); + local_lookup_iter->set_ls_id(scan_param.ls_id_); + iter_tree = local_lookup_iter; + } + } + } + } + + return ret; +} + +int ObDASIterUtils::create_gis_lookup_tree(ObTableScanParam &scan_param, + common::ObIAllocator &alloc, + const ObDASBaseCtDef *attach_ctdef, + ObDASBaseRtDef *attach_rtdef, + const ObDASRelatedTabletID &related_tablet_ids, + transaction::ObTxDesc *trans_desc, + transaction::ObTxReadSnapshot *snapshot, + ObDASIter *&iter_tree) +{ + int ret = OB_SUCCESS; + + const ObDASTableLookupCtDef *lookup_ctdef = nullptr; + ObDASTableLookupRtDef *lookup_rtdef = nullptr; + const ObDASSortCtDef *sort_ctdef = nullptr; + ObDASSortRtDef *sort_rtdef = nullptr; + + if (OB_ISNULL(attach_ctdef) || OB_ISNULL(attach_rtdef)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("table lookup param is nullptr", KP(attach_ctdef), KP(attach_rtdef)); + } else if (OB_FAIL(ObDASUtils::find_target_das_def(attach_ctdef, + attach_rtdef, + DAS_OP_TABLE_LOOKUP, + lookup_ctdef, + lookup_rtdef))) { + LOG_WARN("find data table lookup def failed", K(ret)); + } else if (OB_FAIL(ObDASUtils::find_target_das_def(attach_ctdef, + attach_rtdef, + DAS_OP_SORT, + sort_ctdef, + sort_rtdef))) { + LOG_WARN("find sort def failed", K(ret)); + } else { + ObDASScanIter *data_table_iter = nullptr; + ObDASSpatialScanIter *index_table_iter = nullptr; + ObDASIter *sort_iter = nullptr; // ObDASSortDistinctIter + ObDASLocalLookupIter *local_lookup_iter = nullptr; + + const ObDASScanCtDef *data_table_ctdef = lookup_ctdef->get_lookup_scan_ctdef(); + ObDASScanRtDef *data_table_rtdef = lookup_rtdef->get_lookup_scan_rtdef(); + const ObDASScanCtDef* index_ctdef = static_cast(sort_ctdef->children_[0]); + ObDASScanRtDef * index_rtdef = static_cast(sort_rtdef->children_[0]); + + ObDASSpatialScanIterParam index_table_param; + init_spatial_scan_iter_param(index_table_param, index_ctdef, index_rtdef); + ObDASScanIterParam data_table_param; + init_scan_iter_param(data_table_param, data_table_ctdef, data_table_rtdef); + + if (OB_FAIL(create_das_spatial_scan_iter(alloc, index_table_param, index_table_iter))) { + LOG_WARN("failed to create index table scan iter", K(ret)); + } else if (OB_FALSE_IT(index_table_iter->set_scan_param(scan_param))) { + } else if (OB_FAIL(create_das_iter(alloc, data_table_param, data_table_iter))) { + LOG_WARN("failed to create data table scan iter", K(ret)); + } + + if (OB_SUCC(ret) && OB_FAIL(create_sort_sub_tree(alloc, + sort_ctdef, + sort_rtdef, + false/*need_rewind*/, + true/*need_distinct*/, + index_table_iter, + sort_iter))) { + LOG_WARN("failed to create sort sub tree", K(ret)); + } + + if (OB_SUCC(ret)) { + ObDASLocalLookupIterParam lookup_param; + lookup_param.max_size_ = lookup_rtdef->eval_ctx_->is_vectorized() ? data_table_rtdef->eval_ctx_->max_batch_size_ : 1; + lookup_param.eval_ctx_ = lookup_rtdef->eval_ctx_; + lookup_param.exec_ctx_ = &lookup_rtdef->eval_ctx_->exec_ctx_; + lookup_param.output_ = &lookup_ctdef->result_output_; + lookup_param.index_ctdef_ = sort_ctdef; + lookup_param.index_rtdef_ = sort_rtdef; + lookup_param.lookup_ctdef_ = data_table_ctdef; + lookup_param.lookup_rtdef_ = data_table_rtdef; + lookup_param.index_table_iter_ = sort_iter; + lookup_param.data_table_iter_ = data_table_iter; + lookup_param.trans_desc_ = trans_desc; + lookup_param.snapshot_ = snapshot; + lookup_param.rowkey_exprs_ = &lookup_ctdef->get_lookup_scan_ctdef()->rowkey_exprs_; + if (OB_FAIL(create_das_iter(alloc, lookup_param, local_lookup_iter))) { + LOG_WARN("failed to create mvi lookup iter", K(ret)); + } else if (OB_FAIL(create_iter_children_array(2, alloc, local_lookup_iter))) { + LOG_WARN("failed to create iter children array", K(ret)); + } else { + local_lookup_iter->get_children()[0] = sort_iter; + local_lookup_iter->get_children()[1] = data_table_iter; + data_table_iter->set_scan_param(local_lookup_iter->get_lookup_param()); + local_lookup_iter->set_tablet_id(related_tablet_ids.lookup_tablet_id_); + local_lookup_iter->set_ls_id(scan_param.ls_id_); + } + } + + if (OB_SUCC(ret)) { + iter_tree = local_lookup_iter; + } + } + + return ret; +} + int ObDASIterUtils::create_sort_sub_tree(common::ObIAllocator &alloc, const ObDASSortCtDef *sort_ctdef, ObDASSortRtDef *sort_rtdef, const bool need_rewind, + const bool need_distinct, ObDASIter *sort_input, ObDASIter *&sort_result) { @@ -1009,6 +1292,7 @@ int ObDASIterUtils::create_sort_sub_tree(common::ObIAllocator &alloc, sort_iter_param.sort_ctdef_ = sort_ctdef; sort_iter_param.child_ = sort_input; sort_iter_param.need_rewind_ = need_rewind; + sort_iter_param.need_distinct_ = need_distinct; if (OB_FAIL(create_das_iter(alloc, sort_iter_param, sort_iter))) { LOG_WARN("failed to create sort iter", K(ret)); } else if (OB_FAIL(create_iter_children_array(1, alloc, sort_iter))) { @@ -1120,7 +1404,6 @@ int ObDASIterUtils::create_global_lookup_iter_tree(const ObTableScanCtDef &tsc_c ObDASGlobalLookupIterParam lookup_param; lookup_param.assgin(param); lookup_param.type_ = DAS_ITER_GLOBAL_LOOKUP; - lookup_param.default_batch_row_count_ = 10000; // hard code 10000 for global lookup lookup_param.index_ctdef_ = scan_ctdef; lookup_param.index_rtdef_ = &tsc_rtdef.scan_rtdef_; lookup_param.lookup_ctdef_ = lookup_ctdef; @@ -1192,11 +1475,8 @@ int ObDASIterUtils::create_index_merge_iter_tree(ObTableScanParam &scan_param, const ObDASScanCtDef *lookup_ctdef = static_cast(attach_ctdef->children_[1]); ObDASScanRtDef *lookup_rtdef = static_cast(attach_rtdef->children_[1]); ObDASScanIterParam data_table_param; - data_table_param.scan_ctdef_ = lookup_ctdef; - data_table_param.max_size_ = lookup_rtdef->eval_ctx_->is_vectorized() ? lookup_rtdef->eval_ctx_->max_batch_size_ : 1; - data_table_param.eval_ctx_ = lookup_rtdef->eval_ctx_; - data_table_param.exec_ctx_ = &lookup_rtdef->eval_ctx_->exec_ctx_; - data_table_param.output_ = &lookup_ctdef->result_output_; + init_scan_iter_param(data_table_param, lookup_ctdef, lookup_rtdef); + if (OB_FAIL(create_das_iter(alloc, data_table_param, data_table_iter))) { LOG_WARN("failed to create data table iter", K(ret)); } else { @@ -1205,7 +1485,6 @@ int ObDASIterUtils::create_index_merge_iter_tree(ObTableScanParam &scan_param, lookup_param.eval_ctx_ = lookup_rtdef->eval_ctx_; lookup_param.exec_ctx_ = &lookup_rtdef->eval_ctx_->exec_ctx_; lookup_param.output_ = &lookup_ctdef->result_output_; - lookup_param.default_batch_row_count_ = 1000; // hard code 1000 for local lookup lookup_param.index_ctdef_ = index_merge_ctdef; lookup_param.index_rtdef_ = index_merge_rtdef; lookup_param.lookup_ctdef_ = lookup_ctdef; @@ -1251,11 +1530,8 @@ int ObDASIterUtils::create_index_merge_sub_tree(const ObLSID &ls_id, LOG_WARN("unexpected nullptr", K(ctdef), K(rtdef)); } else if (ctdef->op_type_ == DAS_OP_TABLE_SCAN) { ObDASScanIterParam scan_param; - scan_param.scan_ctdef_ = static_cast(ctdef); - scan_param.max_size_ = rtdef->eval_ctx_->is_vectorized() ? rtdef->eval_ctx_->max_batch_size_ : 1; - scan_param.eval_ctx_ = rtdef->eval_ctx_; - scan_param.exec_ctx_ = &rtdef->eval_ctx_->exec_ctx_; - scan_param.output_ = &scan_param.scan_ctdef_->result_output_; + init_scan_iter_param(scan_param, static_cast(ctdef), rtdef); + ObDASScanIter *scan_iter = nullptr; if (OB_FAIL(create_das_iter(alloc, scan_param, scan_iter))) { LOG_WARN("failed to create das scan iter", K(ret)); @@ -1270,16 +1546,13 @@ int ObDASIterUtils::create_index_merge_sub_tree(const ObLSID &ls_id, ctdef->children_[0]->op_type_ == DAS_OP_TABLE_SCAN); const ObDASScanCtDef *scan_ctdef = static_cast(ctdef->children_[0]); ObDASScanIterParam child_scan_param; - child_scan_param.scan_ctdef_ = scan_ctdef; - child_scan_param.max_size_ = rtdef->eval_ctx_->is_vectorized() ? rtdef->eval_ctx_->max_batch_size_ : 1; - child_scan_param.eval_ctx_ = rtdef->eval_ctx_; - child_scan_param.exec_ctx_ = &rtdef->eval_ctx_->exec_ctx_; - child_scan_param.output_ = &scan_ctdef->result_output_; + init_scan_iter_param(child_scan_param, scan_ctdef, rtdef); + ObDASScanIter *child_scan_iter = nullptr; ObDASIter *sort_iter = nullptr; if (OB_FAIL(create_das_iter(alloc, child_scan_param, child_scan_iter))) { LOG_WARN("failed to create das scan iter", K(ret)); - } else if (OB_FAIL(create_sort_sub_tree(alloc, sort_ctdef, sort_rtdef, false/*need_rewind*/,child_scan_iter, sort_iter))) { + } else if (OB_FAIL(create_sort_sub_tree(alloc, sort_ctdef, sort_rtdef, false/*need_rewind*/, false/*need_distinct*/, child_scan_iter, sort_iter))) { LOG_WARN("failed to create das sort iter", K(ret)); } else { iter = sort_iter; diff --git a/src/sql/das/iter/ob_das_iter_utils.h b/src/sql/das/iter/ob_das_iter_utils.h index 98b4545acb..a00a58be70 100644 --- a/src/sql/das/iter/ob_das_iter_utils.h +++ b/src/sql/das/iter/ob_das_iter_utils.h @@ -15,6 +15,7 @@ #include "sql/das/iter/ob_das_iter_define.h" #include "sql/das/iter/ob_das_scan_iter.h" +#include "sql/das/iter/ob_das_spatial_scan_iter.h" #include "sql/das/iter/ob_das_merge_iter.h" #include "sql/das/iter/ob_das_local_lookup_iter.h" #include "sql/das/iter/ob_das_global_lookup_iter.h" @@ -26,6 +27,7 @@ #include "sql/das/iter/ob_das_vid_merge_iter.h" #include "sql/das/iter/ob_das_index_merge_iter.h" #include "sql/engine/table/ob_table_scan_op.h" +#include "sql/das/iter/ob_das_mvi_lookup_iter.h" namespace oceanbase { @@ -151,6 +153,24 @@ private: ObDASIter *doc_id_iter, ObDASIter *&domain_lookup_result); + static int create_mvi_lookup_tree(ObTableScanParam &scan_param, + common::ObIAllocator &alloc, + const ObDASBaseCtDef *attach_ctdef, + ObDASBaseRtDef *attach_rtdef, + const ObDASRelatedTabletID &related_tablet_ids, + transaction::ObTxDesc *trans_desc, + transaction::ObTxReadSnapshot *snapshot, + ObDASIter *&iter_tree); + + static int create_gis_lookup_tree(ObTableScanParam &scan_param, + common::ObIAllocator &alloc, + const ObDASBaseCtDef *attach_ctdef, + ObDASBaseRtDef *attach_rtdef, + const ObDASRelatedTabletID &related_tablet_ids, + transaction::ObTxDesc *trans_desc, + transaction::ObTxReadSnapshot *snapshot, + ObDASIter *&iter_tree); + static int create_text_retrieval_sub_tree(const ObLSID &ls_id, common::ObIAllocator &alloc, const ObDASIRScanCtDef *ir_scan_ctdef, @@ -164,6 +184,7 @@ private: const ObDASSortCtDef *sort_ctdef, ObDASSortRtDef *sort_rtdef, const bool need_rewind, + const bool need_distinct, ObDASIter *sort_input, ObDASIter *&sort_result); @@ -230,6 +251,15 @@ private: return ret; } + static void init_scan_iter_param(ObDASScanIterParam ¶m, + const ObDASScanCtDef *scan_ctdef, + ObDASBaseRtDef *scan_rtdef); + static void init_spatial_scan_iter_param(ObDASSpatialScanIterParam ¶m, + const ObDASScanCtDef *scan_ctdef, + ObDASScanRtDef *scan_rtdef); + + static int create_das_spatial_scan_iter(ObIAllocator &alloc, ObDASSpatialScanIterParam ¶m, ObDASSpatialScanIter *&result); + ObDASIterUtils() = delete; ~ObDASIterUtils() = delete; }; diff --git a/src/sql/das/iter/ob_das_local_lookup_iter.h b/src/sql/das/iter/ob_das_local_lookup_iter.h index d48a7bfaf1..26c5558982 100644 --- a/src/sql/das/iter/ob_das_local_lookup_iter.h +++ b/src/sql/das/iter/ob_das_local_lookup_iter.h @@ -43,8 +43,8 @@ class ObDASScanRtDef; class ObDASLocalLookupIter : public ObDASLookupIter { public: - ObDASLocalLookupIter() - : ObDASLookupIter(ObDASIterType::DAS_ITER_LOCAL_LOOKUP), + ObDASLocalLookupIter(const ObDASIterType type = ObDASIterType::DAS_ITER_LOCAL_LOOKUP) + : ObDASLookupIter(type), trans_info_array_(), lookup_param_(), lookup_tablet_id_(), diff --git a/src/sql/das/iter/ob_das_lookup_iter.h b/src/sql/das/iter/ob_das_lookup_iter.h index 57291d3c6a..ab207d5abf 100644 --- a/src/sql/das/iter/ob_das_lookup_iter.h +++ b/src/sql/das/iter/ob_das_lookup_iter.h @@ -25,12 +25,16 @@ class ObDASBaseCtDef; class ObDASBaseRtDef; class ObDASScanCtDef; class ObDASScanRtDef; + struct ObDASLookupIterParam : public ObDASIterParam { public: + static const int64_t LOCAL_LOOKUP_ITER_DEFAULT_BATCH_ROW_COUNT = 1000; + static const int64_t GLOBAL_LOOKUP_ITER_DEFAULT_BATCH_ROW_COUNT = 10000; + ObDASLookupIterParam(bool is_global_index) : ObDASIterParam(is_global_index ? DAS_ITER_GLOBAL_LOOKUP : DAS_ITER_LOCAL_LOOKUP), - default_batch_row_count_(0), + default_batch_row_count_(is_global_index ? GLOBAL_LOOKUP_ITER_DEFAULT_BATCH_ROW_COUNT : LOCAL_LOOKUP_ITER_DEFAULT_BATCH_ROW_COUNT), index_ctdef_(nullptr), index_rtdef_(nullptr), lookup_ctdef_(nullptr), @@ -93,6 +97,14 @@ protected: virtual int check_index_lookup() = 0; protected: + enum LookupState : uint32_t + { + INDEX_SCAN, + DO_LOOKUP, + OUTPUT_ROWS, + FINISHED + }; + const ObDASBaseCtDef *index_ctdef_; ObDASBaseRtDef *index_rtdef_; const ObDASScanCtDef *lookup_ctdef_; @@ -102,22 +114,14 @@ protected: ObDASIter *data_table_iter_; int64_t lookup_rowkey_cnt_; int64_t lookup_row_cnt_; + LookupState state_; + bool index_end_; + int64_t default_batch_row_count_; int build_lookup_range(ObNewRange &range); int build_trans_info_datum(const ObExpr *trans_info_expr, ObDatum *&datum_ptr); common::ObArenaAllocator &get_arena_allocator() { return lookup_memctx_->get_arena_allocator(); } private: - enum LookupState : uint32_t - { - INDEX_SCAN, - DO_LOOKUP, - OUTPUT_ROWS, - FINISHED - }; - - LookupState state_; - bool index_end_; - int64_t default_batch_row_count_; lib::MemoryContext lookup_memctx_; }; diff --git a/src/sql/das/iter/ob_das_mvi_lookup_iter.cpp b/src/sql/das/iter/ob_das_mvi_lookup_iter.cpp new file mode 100644 index 0000000000..f0c41623d3 --- /dev/null +++ b/src/sql/das/iter/ob_das_mvi_lookup_iter.cpp @@ -0,0 +1,180 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX SQL_DAS +#include "sql/das/iter/ob_das_mvi_lookup_iter.h" +#include "sql/das/iter/ob_das_scan_iter.h" +#include "sql/das/ob_das_scan_op.h" +#include "storage/concurrency_control/ob_data_validation_service.h" + +namespace oceanbase +{ +using namespace common; +namespace sql +{ + +int ObDASMVILookupIter::inner_get_next_row() +{ + int ret = OB_SUCCESS; + + bool got_next_row = false; + do { + switch (state_) { + case INDEX_SCAN: { + index_table_iter_->clear_evaluated_flag(); + if (OB_SUCC(index_table_iter_->get_next_row())) { + bool has_rowkey = check_has_rowkey(); + if (has_rowkey) { + lookup_rowkey_cnt_++; + state_ = OUTPUT_ROWS; + } else { + state_ = DO_LOOKUP; + } + } + + if (OB_ITER_END == ret) { + state_ = FINISHED; + } + break; + } + case DO_LOOKUP: { + if (OB_FAIL(do_index_lookup())) { + LOG_WARN("failed to do index lookup", K(ret)); + } else { + state_ = OUTPUT_ROWS; + } + break; + } + case OUTPUT_ROWS: { + if (lookup_rowkey_cnt_ != 0) { + lookup_rowkey_cnt_ = 0; + } else { + data_table_iter_->clear_evaluated_flag(); + if (OB_FAIL(data_table_iter_->get_next_row())) { + LOG_WARN("failed to get next row from data table", K(ret)); + } + } + + got_next_row = true; + state_ = LookupState::INDEX_SCAN; + break; + } + case FINISHED: { + ret = OB_ITER_END; + break; + } + default: { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected loopup state", K(state_), K(ret)); + break; + } + } + } while (OB_SUCC(ret) && !got_next_row); + + return ret; +} + +int ObDASMVILookupIter::save_rowkey() +{ + int ret = OB_SUCCESS; + + const ObDASScanCtDef *index_table_ctdef = static_cast(index_ctdef_); + const ObDASScanCtDef *data_table_ctdef = static_cast(lookup_ctdef_); + + ObDASScanIter *scan_iter = static_cast(data_table_iter_); + ObTableScanParam &scan_param = scan_iter->get_scan_param(); + + int64_t rowkey_cnt = index_table_ctdef->rowkey_exprs_.count(); + ObExpr *doc_id_expr = index_table_ctdef->result_output_.at(rowkey_cnt); + ObDatum &doc_id_datum = doc_id_expr->locate_expr_datum(*lookup_rtdef_->eval_ctx_); + if (OB_UNLIKELY(doc_id_datum.is_null())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("docid and rowkey can't both be null", K(ret)); + } else { + ObObj *obj_ptr = nullptr; + ObArenaAllocator allocator("MulvalLookup"); + if (OB_ISNULL(obj_ptr = static_cast(allocator.alloc(sizeof(ObObj))))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("allocate buffer failed", K(ret)); + } else { + obj_ptr = new(obj_ptr) ObObj; + + if (OB_FAIL(doc_id_datum.to_obj(*obj_ptr, doc_id_expr->obj_meta_, doc_id_expr->obj_datum_map_))) { + LOG_WARN("failed to convert datum to obj", K(ret)); + } else { + ObRowkey aux_table_rowkey(obj_ptr, 1); + ObNewRange lookup_range; + if (OB_FAIL(lookup_range.build_range(lookup_ctdef_->ref_table_id_, aux_table_rowkey))) { + LOG_WARN("failed to build lookup range", K(ret), K(lookup_ctdef_->ref_table_id_), K(aux_table_rowkey)); + } else if (OB_FAIL(scan_param.key_ranges_.push_back(lookup_range))) { + LOG_WARN("failed to push back lookup range", K(ret)); + } else { + scan_param.is_get_ = true; + } + } + } + } + + return ret; +} + +int ObDASMVILookupIter::inner_get_next_rows(int64_t &count, int64_t capacity) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(ObDASMVILookupIter::inner_get_next_row())) { + LOG_WARN("ObDASMVILookupIter failed to get next row", K(ret)); + } else { + count = 1; + } + return ret; +} + +int ObDASMVILookupIter::do_index_lookup() +{ + int ret = OB_SUCCESS; + + if (OB_NOT_NULL(data_table_iter_) && OB_FAIL(data_table_iter_->reuse())) { + LOG_WARN("failed to reuse data table iter"); + } else if (OB_FAIL(save_rowkey())) { + LOG_WARN("failed to save rowkey", K(ret)); + } else if (OB_FAIL(ObDASLocalLookupIter::do_index_lookup())) { + LOG_WARN("failed to do index lookup", K(ret)); + } + + return ret; +} + +bool ObDASMVILookupIter::check_has_rowkey() +{ + const ObDASScanCtDef *index_table_ctdef = static_cast(index_ctdef_); + const ObDASScanCtDef *data_table_ctdef = static_cast(lookup_ctdef_); + int64_t rowkey_col_cnt = index_table_ctdef->rowkey_exprs_.count(); + int64_t rowkey_null_col_cnt = 0; + + for (int64_t i = 0; i < rowkey_col_cnt; ++i) { + ObExpr *expr = index_table_ctdef->result_output_.at(i); + if (T_PSEUDO_GROUP_ID == expr->type_) { + // do nothing + } else { + ObDatum &datum = expr->locate_expr_datum(*lookup_rtdef_->eval_ctx_); + if (datum.is_null()) { + rowkey_null_col_cnt++; + } + } + } + + return rowkey_null_col_cnt != rowkey_col_cnt; +} + + +} // namespace sql +} // namespace oceanbase diff --git a/src/sql/das/iter/ob_das_mvi_lookup_iter.h b/src/sql/das/iter/ob_das_mvi_lookup_iter.h new file mode 100644 index 0000000000..50af0449be --- /dev/null +++ b/src/sql/das/iter/ob_das_mvi_lookup_iter.h @@ -0,0 +1,47 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OBDEV_SRC_SQL_DAS_ITER_OB_DAS_MVI_LOOKUP_ITER_H_ +#define OBDEV_SRC_SQL_DAS_ITER_OB_DAS_MVI_LOOKUP_ITER_H_ + +#include "sql/das/iter/ob_das_lookup_iter.h" +#include "storage/access/ob_dml_param.h" + +namespace oceanbase +{ +using namespace common; +namespace sql +{ + +class ObDASScanCtDef; +class ObDASScanRtDef; +class ObDASMVILookupIter : public ObDASLocalLookupIter +{ +public: + ObDASMVILookupIter(): ObDASLocalLookupIter(ObDASIterType::DAS_ITER_MVI_LOOKUP) {} + virtual ~ObDASMVILookupIter() {} + +protected: + virtual int inner_get_next_row() override; + virtual int inner_get_next_rows(int64_t &count, int64_t capacity) override; + virtual int do_index_lookup() override; + +private: + bool check_has_rowkey(); + int save_rowkey(); +}; + +} // namespace sql +} // namespace oceanbase + + +#endif /* OBDEV_SRC_SQL_DAS_ITER_OB_DAS_MVI_ITER_H_ */ diff --git a/src/sql/das/iter/ob_das_scan_iter.h b/src/sql/das/iter/ob_das_scan_iter.h index 839a8e01c2..febe9e37c7 100644 --- a/src/sql/das/iter/ob_das_scan_iter.h +++ b/src/sql/das/iter/ob_das_scan_iter.h @@ -16,10 +16,19 @@ #include "sql/das/iter/ob_das_iter.h" namespace oceanbase { -using namespace common; +namespace common { +class ObITabletScan; +} + +namespace storage { +class ObTableScanParam; +} + namespace sql { +class ObDASScanCtDef; + // DASScanIter is a wrapper class for storage iter, it doesn't require eval_ctx or exprs like other iters. struct ObDASScanIterParam : public ObDASIterParam { diff --git a/src/sql/das/iter/ob_das_sort_iter.cpp b/src/sql/das/iter/ob_das_sort_iter.cpp index 1d94ecb370..cc5c4b91ec 100644 --- a/src/sql/das/iter/ob_das_sort_iter.cpp +++ b/src/sql/das/iter/ob_das_sort_iter.cpp @@ -37,6 +37,7 @@ int ObDASSortIter::inner_init(ObDASIterParam ¶m) ObDASSortIterParam &sort_param = static_cast(param); sort_ctdef_ = sort_param.sort_ctdef_; need_rewind_ = sort_param.need_rewind_; + need_distinct_ = sort_param.need_distinct_; child_ = sort_param.child_; // init top-n parameter if ((nullptr != sort_ctdef_->limit_expr_ || nullptr != sort_ctdef_->offset_expr_) @@ -67,27 +68,64 @@ int ObDASSortIter::inner_init(ObDASIterParam ¶m) } } - if (OB_SUCC(ret)) { - const bool top_k_overflow = INT64_MAX - limit_param_.offset_ < limit_param_.limit_; - const int64_t top_k = (limit_param_.is_valid() && !top_k_overflow) - ? (limit_param_.limit_ + limit_param_.offset_) : INT64_MAX; - if (OB_FAIL(sort_impl_.init(MTL_ID(), - &sort_ctdef_->sort_collations_, - &sort_ctdef_->sort_cmp_funcs_, - eval_ctx_, - exec_ctx_, - false, // enable encode sort key - false, // is local order - need_rewind_, // need rewind - 0, // part cnt - top_k, - sort_ctdef_->fetch_with_ties_))) { - LOG_WARN("failed to init sort impl", K(ret)); - } else if (OB_FAIL(append(sort_row_, sort_ctdef_->sort_exprs_))) { - LOG_WARN("failed to append sort exprs", K(ret)); - } else if (OB_FAIL(append_array_no_dup(sort_row_, *child_->get_output()))) { - LOG_WARN("failed to append sort rows", K(ret)); - } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(init_sort_impl())) { + LOG_WARN("failed to init sort impl", K(ret)); + } else if (OB_FAIL(append(sort_row_, sort_ctdef_->sort_exprs_))) { + LOG_WARN("failed to append sort exprs", K(ret)); + } else if (OB_FAIL(append_array_no_dup(sort_row_, *child_->get_output()))) { + LOG_WARN("failed to append sort rows", K(ret)); + } + } + } + + return ret; +} + +int ObDASSortIter::init_sort_impl() +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(sort_memctx_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null sort memctx", K(ret)); + } else { + ObIAllocator &allocator = sort_memctx_->get_arena_allocator(); + void *buf = nullptr; + if (!need_distinct_) { + if (OB_ISNULL(buf = allocator.alloc(sizeof(ObSortOpImpl)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to allocate ObSortOpImpl memory", K(ret), KP(buf)); + } else { + ObSortOpImpl *sort_impl = new (buf) ObSortOpImpl(); + sort_impl_ = static_cast(sort_impl); + } + } else { + if (OB_ISNULL(buf = allocator.alloc(sizeof(ObUniqueSortImpl)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to allocate ObUniqueSortImpl memory", K(ret), KP(buf)); + } else { + ObUniqueSortImpl *sort_impl = new (buf) ObUniqueSortImpl(); + sort_impl_ = static_cast(sort_impl); + } + } + + + if (OB_SUCC(ret)) { + const bool top_k_overflow = INT64_MAX - limit_param_.offset_ < limit_param_.limit_; + const int64_t top_k = (limit_param_.is_valid() && !top_k_overflow) + ? (limit_param_.limit_ + limit_param_.offset_) : INT64_MAX; + if (OB_FAIL(sort_impl_->init(MTL_ID(), + &sort_ctdef_->sort_collations_, + &sort_ctdef_->sort_cmp_funcs_, + eval_ctx_, + exec_ctx_, + false, // enable encode sort key + false, // is local order + need_rewind_, // need rewind + 0, // part cnt + top_k, + sort_ctdef_->fetch_with_ties_))) { + LOG_WARN("failed to init sort impl", K(ret)); } } } @@ -103,15 +141,19 @@ int ObDASSortIter::inner_reuse() LOG_WARN("failed to reuse child iter", K(ret)); } } + // TODO: check if we can reuse sort impl here + // reset sort impl here since ObSortOpImpl::outputted_rows_cnt_ is not reset in reuse() + if (OB_NOT_NULL(sort_impl_)) { + sort_impl_->reset(); + sort_impl_->~ObSortOpImpl(); + sort_impl_ = nullptr; + } if (OB_NOT_NULL(sort_memctx_)) { sort_memctx_->reset_remain_one_page(); } sort_finished_ = false; output_row_cnt_ = 0; input_row_cnt_ = 0; - // TODO: check if we can reuse sort impl here - // reset sort impl here since ObSortOpImpl::outputted_rows_cnt_ is not reset in reuse() - sort_impl_.reset(); fake_skip_ = nullptr; return ret; } @@ -119,7 +161,11 @@ int ObDASSortIter::inner_reuse() int ObDASSortIter::inner_release() { int ret = OB_SUCCESS; - sort_impl_.reset(); + if (OB_NOT_NULL(sort_impl_)) { + sort_impl_->reset(); + sort_impl_->~ObSortOpImpl(); + sort_impl_ = nullptr; + } if (OB_NOT_NULL(sort_memctx_)) { sort_memctx_->reset_remain_one_page(); DESTROY_CONTEXT(sort_memctx_); @@ -141,24 +187,13 @@ int ObDASSortIter::do_table_scan() int ObDASSortIter::rescan() { int ret = OB_SUCCESS; - const bool top_k_overflow = INT64_MAX - limit_param_.offset_ < limit_param_.limit_; - const int64_t top_k = (limit_param_.is_valid() && !top_k_overflow) - ? (limit_param_.limit_ + limit_param_.offset_) : INT64_MAX; + if (OB_FAIL(child_->rescan())) { LOG_WARN("failed to rescan child", K(ret)); - } else if (OB_FAIL(sort_impl_.init(MTL_ID(), - &sort_ctdef_->sort_collations_, - &sort_ctdef_->sort_cmp_funcs_, - eval_ctx_, - exec_ctx_, - false, // enable encode sort key - false, // is local order - need_rewind_, // need rewind - 0, // part cnt - top_k, - sort_ctdef_->fetch_with_ties_))) { + } else if (OB_FAIL(init_sort_impl())) { LOG_WARN("failed to init sort impl", K(ret)); } + return ret; } @@ -173,7 +208,7 @@ int ObDASSortIter::inner_get_next_row() } else { bool got_row = false; while (OB_SUCC(ret) && !got_row) { - if (OB_FAIL(sort_impl_.get_next_row(sort_row_))) { + if (OB_FAIL(sort_impl_->get_next_row(sort_row_))) { if (OB_UNLIKELY(OB_ITER_END != ret)) { LOG_WARN("failed to get next row from sort impl", K(ret)); } @@ -202,7 +237,7 @@ int ObDASSortIter::inner_get_next_rows(int64_t &count, int64_t capacity) int64_t need_offset_count = limit_param_.offset_; while (OB_SUCC(ret) && need_offset_count > 0) { int64_t got_count = 0; - if (OB_FAIL(sort_impl_.get_next_batch(sort_row_, OB_MIN(need_offset_count, capacity), got_count))) { + if (OB_FAIL(sort_impl_->get_next_batch(sort_row_, OB_MIN(need_offset_count, capacity), got_count))) { if (OB_UNLIKELY(OB_ITER_END != ret)) { LOG_WARN("failed to get next row from token merge", K(ret)); } @@ -226,7 +261,7 @@ int ObDASSortIter::inner_get_next_rows(int64_t &count, int64_t capacity) } if (OB_SUCC(ret)) { int64_t min_capacity = limit_param_.limit_ > 0 ? OB_MIN(capacity, limit_param_.limit_ - output_row_cnt_) : capacity; - if (OB_FAIL(sort_impl_.get_next_batch(sort_row_, min_capacity, count))) { + if (OB_FAIL(sort_impl_->get_next_batch(sort_row_, min_capacity, count))) { if (OB_UNLIKELY(OB_ITER_END != ret)) { LOG_WARN("failed to get next row from token merge", K(ret)); } @@ -254,13 +289,13 @@ int ObDASSortIter::do_sort(bool is_vectorized) if (OB_UNLIKELY(OB_ITER_END != ret)) { LOG_WARN("failed ro get next rows from child iter", K(ret)); } else if (read_size != 0) { - if (OB_FAIL(sort_impl_.add_batch(sort_row_, *fake_skip_, read_size, 0, nullptr))) { + if (OB_FAIL(sort_impl_->add_batch(sort_row_, *fake_skip_, read_size, 0, nullptr))) { LOG_WARN("failed to add batch to sort impl", K(ret)); } else { ret = OB_ITER_END; } } - } else if (OB_FAIL(sort_impl_.add_batch(sort_row_, *fake_skip_, read_size, 0, nullptr))) { + } else if (OB_FAIL(sort_impl_->add_batch(sort_row_, *fake_skip_, read_size, 0, nullptr))) { LOG_WARN("failed to add batch to sort impl", K(ret)); } } @@ -270,7 +305,7 @@ int ObDASSortIter::do_sort(bool is_vectorized) if (OB_UNLIKELY(OB_ITER_END != ret)) { LOG_WARN("failed ro get next rows from child iter", K(ret)); } - } else if (OB_FAIL(sort_impl_.add_row(sort_row_))) { + } else if (OB_FAIL(sort_impl_->add_row(sort_row_))) { LOG_WARN("failed to add row to sort impl", K(ret)); } } @@ -278,7 +313,7 @@ int ObDASSortIter::do_sort(bool is_vectorized) if (OB_LIKELY(OB_ITER_END == ret)) { ret = OB_SUCCESS; - if (OB_FAIL(sort_impl_.sort())) { + if (OB_FAIL(sort_impl_->sort())) { LOG_WARN("failed to do sort", K(ret)); } else { sort_finished_ = true; diff --git a/src/sql/das/iter/ob_das_sort_iter.h b/src/sql/das/iter/ob_das_sort_iter.h index 2b10ead50f..a0b7537d9d 100644 --- a/src/sql/das/iter/ob_das_sort_iter.h +++ b/src/sql/das/iter/ob_das_sort_iter.h @@ -30,12 +30,14 @@ public: sort_ctdef_(nullptr), child_(nullptr), limit_param_(), - need_rewind_(false) {} + need_rewind_(false), + need_distinct_(false) {} virtual ~ObDASSortIterParam() {} const ObDASSortCtDef *sort_ctdef_; ObDASIter *child_; common::ObLimitParam limit_param_; bool need_rewind_; + bool need_distinct_; virtual bool is_valid() const override { return ObDASIterParam::is_valid() && nullptr != sort_ctdef_ && nullptr != child_; @@ -47,13 +49,14 @@ class ObDASSortIter : public ObDASIter public: ObDASSortIter() : ObDASIter(ObDASIterType::DAS_ITER_SORT), - sort_impl_(), + sort_impl_(nullptr), sort_memctx_(), sort_ctdef_(nullptr), sort_row_(), child_(nullptr), sort_finished_(false), need_rewind_(false), + need_distinct_(false), limit_param_(), input_row_cnt_(0), output_row_cnt_(0), @@ -73,16 +76,18 @@ protected: virtual int inner_get_next_rows(int64_t &count, int64_t capacity) override; private: + int init_sort_impl(); int do_sort(bool is_vectorized); private: - ObSortOpImpl sort_impl_; + ObSortOpImpl *sort_impl_; lib::MemoryContext sort_memctx_; const ObDASSortCtDef *sort_ctdef_; ObSEArray sort_row_; ObDASIter *child_; bool sort_finished_; bool need_rewind_; + bool need_distinct_; // limit param was set only once at do_table_scan of TSC, which means it should not be reset at reuse, // input row cnt and output row cnt are the same as well. common::ObLimitParam limit_param_; diff --git a/src/sql/das/iter/ob_das_spatial_scan_iter.cpp b/src/sql/das/iter/ob_das_spatial_scan_iter.cpp new file mode 100644 index 0000000000..82716dd04a --- /dev/null +++ b/src/sql/das/iter/ob_das_spatial_scan_iter.cpp @@ -0,0 +1,152 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX SQL_DAS +#include "sql/das/iter/ob_das_spatial_scan_iter.h" +#include "sql/das/ob_das_scan_op.h" +#include "storage/tx_storage/ob_access_service.h" + +namespace oceanbase +{ +using namespace common; +namespace sql +{ + +int ObDASSpatialScanIter::inner_init(ObDASIterParam ¶m) +{ + int ret = OB_SUCCESS; + + if (OB_FAIL(ObDASScanIter::inner_init(param))) { + LOG_WARN("failed to init das scan iter", K(ret)); + } else { + ObDASSpatialScanIterParam& scan_param = static_cast(param); + scan_rtdef_ = scan_param.scan_rtdef_; + scan_ctdef_ = scan_param.scan_ctdef_; + } + + return ret; +} + +void ObDASSpatialScanIter::set_scan_param(storage::ObTableScanParam &scan_param) +{ + mbr_filters_ = &scan_param.mbr_filters_; + for (int64_t i = 0; i < scan_param.key_ranges_.count(); i++) { + if (scan_param.key_ranges_.at(i).is_whole_range()) { + is_whole_range_ = true; + } + } + is_whole_range_ |= (mbr_filters_->count() == 0); + + ObDASScanIter::set_scan_param(scan_param); +} + + +int ObDASSpatialScanIter::inner_get_next_row() +{ + int ret = OB_SUCCESS; + + bool got_row = false; + do { + if (OB_FAIL(ObDASScanIter::inner_get_next_row())) { + LOG_WARN("failed to get next row", K(ret)); + } else if (OB_FAIL(filter_by_mbr(got_row))){ + LOG_WARN("failed to process data table rowkey", K(ret)); + } + } while (OB_SUCC(ret) && !got_row); + + return ret; +} + +int ObDASSpatialScanIter::filter_by_mbr(bool &got_row) +{ + int ret = OB_SUCCESS; + int64_t rowkey_cnt = max_rowkey_cnt_; + if (max_rowkey_cnt_ < 0 || OB_ISNULL(obj_ptr_)) { + rowkey_cnt = scan_ctdef_->result_output_.count() - 1; + if (scan_ctdef_->trans_info_expr_ != nullptr) { + rowkey_cnt = rowkey_cnt - 1; + } + max_rowkey_cnt_ = rowkey_cnt; + + void *buf = nullptr; + if (OB_ISNULL(allocator_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("allocator is null", K(ret)); + } else if (OB_ISNULL(buf = allocator_->alloc(sizeof(ObObj) * rowkey_cnt))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("allocate buffer failed", K(ret), K(rowkey_cnt)); + } else { + obj_ptr_ = new(buf) ObObj[rowkey_cnt]; + } + } + + for (int64_t i = 0; OB_SUCC(ret) && i < rowkey_cnt; ++i) { + ObExpr *expr = scan_ctdef_->result_output_.at(i); + if (T_PSEUDO_GROUP_ID == expr->type_) { + // do nothing + } else { + ObDatum &col_datum = expr->locate_expr_datum(*scan_rtdef_->eval_ctx_); + if (OB_FAIL(col_datum.to_obj(obj_ptr_[i], expr->obj_meta_, expr->obj_datum_map_))) { + LOG_WARN("convert datum to obj failed", K(ret)); + } + } + } + + if (OB_SUCC(ret)) { + ObRowkey table_rowkey(obj_ptr_, rowkey_cnt); + bool pass_through = true; + + ObObj mbr_obj; + ObExpr *mbr_expr = scan_ctdef_->result_output_.at(rowkey_cnt); + ObDatum &mbr_datum = mbr_expr->locate_expr_datum(*scan_rtdef_->eval_ctx_); + if (OB_FAIL(mbr_datum.to_obj(mbr_obj, mbr_expr->obj_meta_, mbr_expr->obj_datum_map_))) { + LOG_WARN("convert datum to obj failed", K(ret)); + } else if (!is_whole_range_ && OB_FAIL(filter_by_mbr(mbr_obj, pass_through))) { + LOG_WARN("filter mbr failed", K(ret)); + } else if (!is_whole_range_ && pass_through) { + // not target + mbr_filter_cnt_++; + } else { + got_row = true; + } + } + + return ret; +} + +int ObDASSpatialScanIter::filter_by_mbr(const ObObj &mbr_obj, bool &pass_through) +{ + int ret = OB_SUCCESS; + + pass_through = true; + ObString mbr_str = mbr_obj.get_varchar(); + bool is_point = (WKB_POINT_DATA_SIZE == mbr_str.length()); + ObSpatialMBR idx_spa_mbr; + + if (OB_FAIL(ObSpatialMBR::from_string(mbr_str, ObDomainOpType::T_INVALID, idx_spa_mbr, is_point))) { + LOG_WARN("fail to create index spatial mbr", K(ret), K(mbr_obj)); + } else { + idx_spa_mbr.is_point_ = is_point; + for (int64_t i = 0; OB_SUCC(ret) && i < mbr_filters_->count() && pass_through; i++) { + const ObSpatialMBR &spa_mbr = mbr_filters_->at(i); + idx_spa_mbr.is_geog_ = spa_mbr.is_geog(); + if (OB_FAIL(idx_spa_mbr.filter(spa_mbr, spa_mbr.get_type(), pass_through))) { + LOG_WARN("fail to filter by s2", K(ret), K(spa_mbr), K(idx_spa_mbr)); + } + } + } + + return ret; +} + +} // namespace sql +} // namespace oceanbase diff --git a/src/sql/das/iter/ob_das_spatial_scan_iter.h b/src/sql/das/iter/ob_das_spatial_scan_iter.h new file mode 100644 index 0000000000..5581242db9 --- /dev/null +++ b/src/sql/das/iter/ob_das_spatial_scan_iter.h @@ -0,0 +1,82 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OBDEV_SRC_SQL_DAS_ITER_OB_DAS_SPATIAL_SCAN_ITER_H_ +#define OBDEV_SRC_SQL_DAS_ITER_OB_DAS_SPATIAL_SCAN_ITER_H_ + +#include "sql/das/iter/ob_das_iter.h" +#include "sql/das/iter/ob_das_scan_iter.h" +namespace oceanbase +{ +using namespace common; +namespace sql +{ + +struct ObDASSpatialScanIterParam : public ObDASScanIterParam +{ +public: + ObDASSpatialScanIterParam() + : ObDASScanIterParam(), + scan_rtdef_(nullptr) + {} + + ObDASScanRtDef *scan_rtdef_; + + virtual bool is_valid() const override + { + return nullptr != scan_rtdef_ && ObDASIterParam::is_valid(); + } +}; + +class ObDASSpatialScanIter : public ObDASScanIter +{ +public: + ObDASSpatialScanIter(ObIAllocator &allocator) + : ObDASScanIter(), + scan_ctdef_(nullptr), + scan_rtdef_(nullptr), + mbr_filters_(nullptr), + mbr_filter_cnt_(0), + max_rowkey_cnt_(-1), + allocator_(&allocator), + obj_ptr_(nullptr) {} + + void set_scan_param(storage::ObTableScanParam &scan_param); + +protected: + virtual int inner_init(ObDASIterParam ¶m) override; + virtual int inner_get_next_row() override; + +private: + int filter_by_mbr(bool &got_row); + int filter_by_mbr(const ObObj &mbr_obj, bool &pass_through); + + const ObDASScanCtDef *scan_ctdef_; + ObDASScanRtDef *scan_rtdef_; + + const ObMbrFilterArray *mbr_filters_; + bool is_whole_range_; + + int64_t mbr_filter_cnt_; + int64_t max_rowkey_cnt_; + + ObIAllocator* allocator_; + ObObj *obj_ptr_; +}; + + +} // namespace sql +} // namespace oceanbase + + + +#endif /* OBDEV_SRC_SQL_DAS_ITER_OB_DAS_SPATIAL_SCAN_ITER_H_ */ diff --git a/src/sql/das/ob_das_scan_op.cpp b/src/sql/das/ob_das_scan_op.cpp index 66ea1b9d0f..3a60c6558f 100644 --- a/src/sql/das/ob_das_scan_op.cpp +++ b/src/sql/das/ob_das_scan_op.cpp @@ -13,7 +13,6 @@ #define USING_LOG_PREFIX SQL_DAS #include "sql/das/ob_das_scan_op.h" #include "sql/das/ob_das_extra_data.h" -#include "sql/das/ob_das_spatial_index_lookup_op.h" #include "sql/das/ob_vector_index_lookup_op.h" #include "sql/das/ob_das_utils.h" #include "sql/engine/table/ob_table_scan_op.h" @@ -379,7 +378,9 @@ ObDASIterTreeType ObDASScanOp::get_iter_tree_type() const tree_type = ObDASIterTreeType::ITER_TREE_TEXT_RETRIEVAL; } else if (is_spatial_index) { tree_type = ObDASIterTreeType::ITER_TREE_GIS_LOOKUP; - } else if (is_multivalue_index || is_vector_index) { + } else if (is_multivalue_index) { + tree_type = ObDASIterTreeType::ITER_TREE_MVI_LOOKUP; + } else if (is_vector_index) { tree_type = ObDASIterTreeType::ITER_TREE_DOMAIN_LOOKUP; } else if (OB_UNLIKELY(is_index_merge(attach_ctdef_))) { tree_type = ObDASIterTreeType::ITER_TREE_INDEX_MERGE; @@ -419,7 +420,7 @@ int ObDASScanOp::init_related_tablet_ids(ObDASRelatedTabletID &related_tablet_id LOG_WARN("fail to get rowkey doc tablet id", K(ret)); } else if (OB_FAIL(get_rowkey_vid_tablet_id(related_tablet_ids.rowkey_vid_tablet_id_))) { LOG_WARN("fail to get rowkey vid tablet id", K(ret)); - } else if (OB_FAIL(get_aux_lookup_tablet_id(related_tablet_ids.aux_lookup_tablet_id_))) { + } else if (!scan_param_.table_param_->is_spatial_index() && OB_FAIL(get_aux_lookup_tablet_id(related_tablet_ids.aux_lookup_tablet_id_))) { LOG_WARN("failed to get aux lookup tablet id", K(ret)); } else if (OB_FAIL(get_text_ir_tablet_ids(related_tablet_ids.inv_idx_tablet_id_, related_tablet_ids.fwd_idx_tablet_id_, @@ -521,8 +522,7 @@ int ObDASScanOp::open_op() if (OB_FAIL(init_scan_param())) { LOG_WARN("init scan param failed", K(ret)); } else if (FALSE_IT(tree_type = get_iter_tree_type())) { - } else if (ITER_TREE_PARTITION_SCAN == tree_type || ITER_TREE_LOCAL_LOOKUP == tree_type - || ITER_TREE_TEXT_RETRIEVAL == tree_type || ITER_TREE_INDEX_MERGE == tree_type) { + } else if (SUPPORTED_DAS_ITER_TREE(tree_type)) { ObDASIter *result = nullptr; if (OB_FAIL(init_related_tablet_ids(tablet_ids_))) { LOG_WARN("failed to init related tablet ids", K(ret)); @@ -569,8 +569,7 @@ int ObDASScanOp::release_op() { int ret = OB_SUCCESS; ObDASIterTreeType tree_type = get_iter_tree_type(); - if (ITER_TREE_PARTITION_SCAN == tree_type || ITER_TREE_LOCAL_LOOKUP == tree_type - || ITER_TREE_TEXT_RETRIEVAL == tree_type || ITER_TREE_INDEX_MERGE == tree_type) { + if (SUPPORTED_DAS_ITER_TREE(tree_type)) { if (OB_NOT_NULL(result_)) { ObDASIter *result = static_cast(result_); if (OB_FAIL(result->release())) { @@ -680,28 +679,10 @@ int ObDASScanOp::do_local_index_lookup() { int ret = OB_SUCCESS; ObTabletID lookup_tablet_id; - if (scan_param_.table_param_->is_multivalue_index() || scan_param_.table_param_->is_vec_index()) { + if (scan_param_.table_param_->is_vec_index()) { if (OB_FAIL(do_domain_index_lookup())) { LOG_WARN("failed to do domain index lookup", K(ret)); } - } else if (scan_param_.table_param_->is_spatial_index()) { - void *buf = op_alloc_.alloc(sizeof(ObSpatialIndexLookupOp)); - if (OB_ISNULL(buf)) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("lookup op buf allocated failed", K(ret)); - } else { - ObSpatialIndexLookupOp *op = new(buf) ObSpatialIndexLookupOp(op_alloc_); - op->set_rowkey_iter(result_); - result_ = op; - if (OB_FAIL(op->init(get_lookup_ctdef(), get_lookup_rtdef(), scan_ctdef_, scan_rtdef_, - trans_desc_, snapshot_, scan_param_))) { - LOG_WARN("init spatial lookup op failed", K(ret)); - } else if (FALSE_IT(get_table_lookup_tablet_id(lookup_tablet_id))) { - } else { - op->set_tablet_id(lookup_tablet_id); - op->set_ls_id(ls_id_); - } - } } else { void *buf = op_alloc_.alloc(sizeof(ObLocalIndexLookupOp)); if (OB_ISNULL(buf)) { @@ -733,25 +714,7 @@ int ObDASScanOp::do_domain_index_lookup() int ret = OB_SUCCESS; ObTabletID doc_id_idx_tablet_id; ObTabletID lookup_tablet_id; - if (scan_param_.table_param_->is_multivalue_index()) { - ObMulValueIndexLookupOp* op = nullptr; - if (OB_FAIL(get_aux_lookup_tablet_id(doc_id_idx_tablet_id))) { - LOG_WARN("failed to get doc id idx tablet id", K(ret), K_(related_tablet_ids)); - } else if (OB_ISNULL(op = OB_NEWx(ObMulValueIndexLookupOp, &op_alloc_, op_alloc_))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("failed to allocate full text index lookup op", K(ret)); - } else if (FALSE_IT(op->set_rowkey_iter(result_))) { - } else if (FALSE_IT(result_ = op)) { - } else if (OB_FAIL(op->init(attach_ctdef_, attach_rtdef_, trans_desc_, snapshot_, scan_param_))) { - LOG_WARN("failed to init multivalue index lookup op", K(ret)); - } else if (FALSE_IT(get_table_lookup_tablet_id(lookup_tablet_id))) { - } else { - op->set_tablet_id(lookup_tablet_id); - op->set_doc_id_idx_tablet_id(doc_id_idx_tablet_id); - op->set_ls_id(ls_id_); - } - - } else if (scan_param_.table_param_->is_vec_index()) { + if (scan_param_.table_param_->is_vec_index()) { ObVectorIndexLookupOp *op = nullptr; ObTabletID doc_id_idx_tablet_id; const ObDASTableLookupCtDef *table_lookup_ctdef = nullptr; @@ -1027,8 +990,7 @@ int ObDASScanOp::rescan() "scan_range", scan_param_.key_ranges_, "range_pos", scan_param_.range_array_pos_); ObDASIterTreeType tree_type = get_iter_tree_type(); - if (ITER_TREE_PARTITION_SCAN == tree_type || ITER_TREE_LOCAL_LOOKUP == tree_type - || ITER_TREE_TEXT_RETRIEVAL == tree_type || ITER_TREE_INDEX_MERGE == tree_type) { + if (SUPPORTED_DAS_ITER_TREE(tree_type)) { if (OB_ISNULL(result_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected nullptr das iter tree", K(ret)); @@ -1071,8 +1033,7 @@ int ObDASScanOp::reuse_iter() ObDASIterTreeType tree_type = get_iter_tree_type(); ObITabletScan &tsc_service = get_tsc_service(); ObLocalIndexLookupOp *lookup_op = get_lookup_op(); - if (ITER_TREE_PARTITION_SCAN == tree_type || ITER_TREE_LOCAL_LOOKUP == tree_type - || ITER_TREE_TEXT_RETRIEVAL == tree_type || ITER_TREE_INDEX_MERGE == tree_type) { + if (SUPPORTED_DAS_ITER_TREE(tree_type)) { if (OB_ISNULL(result_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected nullptr das iter tree", K(ret)); @@ -1112,6 +1073,20 @@ int ObDASScanOp::reuse_iter() } break; } + case ITER_TREE_MVI_LOOKUP: { + if (OB_NOT_NULL(get_lookup_ctdef())) { + ObDASLocalLookupIter *lookup_iter = static_cast(result_); + lookup_iter->set_tablet_id(tablet_ids_.lookup_tablet_id_); + lookup_iter->set_ls_id(ls_id_); + } + break; + } + case ITER_TREE_GIS_LOOKUP: { + ObDASLocalLookupIter *lookup_iter = static_cast(result_); + lookup_iter->set_tablet_id(tablet_ids_.lookup_tablet_id_); + lookup_iter->set_ls_id(ls_id_); + break; + } default: { ret = OB_ERR_UNEXPECTED; } @@ -1134,11 +1109,6 @@ int ObDASScanOp::reuse_iter() && OB_FAIL(static_cast(lookup_op)->reuse_scan_iter(scan_param_.need_switch_param_))) { LOG_WARN("failed to reuse text lookup iters", K(ret)); } - } else if (scan_param_.table_param_->is_multivalue_index() && attach_ctdef_ != nullptr) { - if (nullptr != lookup_op - && OB_FAIL(static_cast(lookup_op)->reuse_scan_iter(scan_param_.need_switch_param_))) { - LOG_WARN("failed to reuse text lookup iters", K(ret)); - } } else if (OB_FAIL(tsc_service.reuse_scan_iter(scan_param_.need_switch_param_, get_storage_scan_iter()))) { LOG_WARN("reuse scan iterator failed", K(ret)); } else if (lookup_op != nullptr @@ -1338,6 +1308,20 @@ int ObDASScanOp::get_aux_lookup_tablet_id(common::ObTabletID &tablet_id) const tablet_id = related_tablet_ids_.at(i); } } + } else if (nullptr != attach_ctdef_ && ObDASOpType::DAS_OP_SORT == attach_ctdef_->op_type_) { + // for multivalue index and don't need to index back, + // sort_ctdef + // | + // aux_lookup_ctdef + // | | + // index_table docid-rowkey_table + const ObDASSortCtDef *sort_ctdef = static_cast(attach_ctdef_); + aux_lookup_ctdef = static_cast(sort_ctdef->children_[0]); + for (int i = 0; OB_NOT_NULL(aux_lookup_ctdef) && !tablet_id.is_valid() && i < related_ctdefs_.count(); ++i) { + if (aux_lookup_ctdef->get_lookup_scan_ctdef() == related_ctdefs_.at(i)) { + tablet_id = related_tablet_ids_.at(i); + } + } } return ret; } diff --git a/src/sql/das/ob_das_spatial_index_lookup_op.cpp b/src/sql/das/ob_das_spatial_index_lookup_op.cpp deleted file mode 100644 index 852dd6ceef..0000000000 --- a/src/sql/das/ob_das_spatial_index_lookup_op.cpp +++ /dev/null @@ -1,313 +0,0 @@ -/** - * Copyright (c) 2023 OceanBase - * OceanBase CE is licensed under Mulan PubL v2. - * You can use this software according to the terms and conditions of the Mulan PubL v2. - * You may obtain a copy of Mulan PubL v2 at: - * http://license.coscl.org.cn/MulanPubL-2.0 - * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, - * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, - * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. - * See the Mulan PubL v2 for more details. - */ - -#define USING_LOG_PREFIX SQL_DAS -#include "sql/das/ob_das_spatial_index_lookup_op.h" -#include "sql/engine/ob_exec_context.h" -#include "storage/access/ob_dml_param.h" -namespace oceanbase -{ -using namespace common; -using namespace storage; -using namespace transaction; -namespace sql -{ - -bool ObRowKeyCompare::operator()(const ObRowkey *left, const ObRowkey *right) -{ - bool bool_ret = false; - if (OB_UNLIKELY(common::OB_SUCCESS != result_code_)) { - //do nothing - } else if (OB_UNLIKELY(NULL == left) - || OB_UNLIKELY(NULL == right)) { - result_code_ = common::OB_INVALID_ARGUMENT; - LOG_WARN_RET(result_code_, "Invalid argument, ", KP(left), KP(right), K_(result_code)); - } else { - bool_ret = (*left) < (*right); - } - return bool_ret; -} - -int ObSpatialIndexLookupOp::init(const ObDASScanCtDef *lookup_ctdef, - ObDASScanRtDef *lookup_rtdef, - const ObDASScanCtDef *index_ctdef, - ObDASScanRtDef *index_rtdef, - ObTxDesc *tx_desc, - ObTxReadSnapshot *snapshot, - ObTableScanParam &scan_param) -{ - int ret = OB_SUCCESS; - if (OB_FAIL(ObLocalIndexLookupOp::init(lookup_ctdef, lookup_rtdef, index_ctdef, - index_rtdef, tx_desc, snapshot))) { - LOG_WARN("ObLocalIndexLookupOp init failed", K(ret)); - } else { - mbr_filters_ = &scan_param.mbr_filters_; - is_inited_ = false; - for (int64_t i = 0; OB_SUCC(ret) && i < scan_param.key_ranges_.count(); i++) { - if (scan_param.key_ranges_.at(i).is_whole_range()) { - is_whole_range_ = true; - } - } - is_whole_range_ |= (mbr_filters_->count() == 0); - mbr_filter_cnt_ = 0; - index_back_cnt_ = 0; - } - return ret; -} - -ObSpatialIndexLookupOp::~ObSpatialIndexLookupOp() -{ - sorter_.clean_up(); - sorter_.~ObExternalSort(); -} - -int ObSpatialIndexLookupOp::revert_iter() -{ - int ret = OB_SUCCESS; - if (OB_FAIL(ObLocalIndexLookupOp::revert_iter())) { - LOG_WARN("revert local index lookup iter from spatial fail.", K(ret)); - } - sorter_.clean_up(); - sorter_.~ObExternalSort(); - return ret; -} - -int ObSpatialIndexLookupOp::reset_lookup_state() -{ - is_inited_ = false; - return ObLocalIndexLookupOp::reset_lookup_state(); -} - -int ObSpatialIndexLookupOp::filter_by_mbr(const ObObj &mbr_obj, bool &pass_through) -{ - int ret = OB_SUCCESS; - pass_through = true; - ObString mbr_str = mbr_obj.get_varchar(); - ObSpatialMBR idx_spa_mbr; - bool is_point = (WKB_POINT_DATA_SIZE == mbr_str.length()); - - if (OB_FAIL(ObSpatialMBR::from_string(mbr_str, ObDomainOpType::T_INVALID, idx_spa_mbr, is_point))) { - LOG_WARN("fail to create index spatial mbr", K(ret), K(mbr_obj)); - } else { - idx_spa_mbr.is_point_ = is_point; - for (int64_t i = 0; OB_SUCC(ret) && i < mbr_filters_->count() && pass_through; i++) { - const ObSpatialMBR &spa_mbr = mbr_filters_->at(i); - idx_spa_mbr.is_geog_ = spa_mbr.is_geog(); - if (OB_FAIL(idx_spa_mbr.filter(spa_mbr, spa_mbr.get_type(), pass_through))) { - LOG_WARN("fail to filter by s2", K(ret), K(spa_mbr), K(idx_spa_mbr)); - } - } - } - return ret; -} - -int ObSpatialIndexLookupOp::save_rowkeys() -{ - int ret = OB_SUCCESS; - int64_t simulate_batch_row_cnt = - EVENT_CALL(EventTable::EN_TABLE_LOOKUP_BATCH_ROW_COUNT); - int64_t default_row_batch_cnt = simulate_batch_row_cnt > 0 ? simulate_batch_row_cnt : MAX_NUM_PER_BATCH; - LOG_DEBUG("simulate lookup row batch count", K(simulate_batch_row_cnt), K(default_row_batch_cnt)); - ObStoreRowkey src_key; - ObExtStoreRowkey dest_key; // original and collation_free rowkeys - const ObRowkey *idx_row = NULL; - for (int64_t i = 0; OB_SUCC(ret) && i < default_row_batch_cnt; ++i) { - if (OB_FAIL(sorter_.get_next_item(idx_row))) { - if (OB_ITER_END != ret) { - LOG_WARN("fail to get next sorted item", K(ret), K(i)); - } - } else if (last_rowkey_ != *idx_row) { - ObNewRange lookup_range; - uint64_t ref_table_id = lookup_ctdef_->ref_table_id_; - int64_t group_idx = 0; - for (int64_t i = 0; OB_SUCC(ret) && i < index_ctdef_->result_output_.count(); ++i) { - ObObj tmp_obj; - ObExpr *expr = index_ctdef_->result_output_.at(i); - if (T_PSEUDO_GROUP_ID == expr->type_) { - group_idx = ObNewRange::get_group_idx(expr->locate_expr_datum(*lookup_rtdef_->eval_ctx_).get_int()); - } - } - if (OB_FAIL(lookup_range.build_range(ref_table_id, *idx_row))) { - LOG_WARN("build lookup range failed", K(ret), K(ref_table_id), K(*idx_row)); - } else if (FALSE_IT(lookup_range.group_idx_ = group_idx)) { - } else if (OB_FAIL(scan_param_.key_ranges_.push_back(lookup_range))) { - LOG_WARN("store lookup key range failed", K(ret), K(scan_param_)); - } - last_rowkey_ = *idx_row; - index_back_cnt_++; - LOG_DEBUG("build data table range", K(ret), K(*idx_row), K(lookup_range), K(scan_param_.key_ranges_.count())); - } - } - return ret; -} - -int ObSpatialIndexLookupOp::get_next_row() -{ - int ret = OB_SUCCESS; - if (is_inited_ == false) { - /* init external sorter and mbr whole range*/ - cmp_ret_ = OB_SUCCESS; - new (&comparer_) ObRowKeyCompare(cmp_ret_); - const int64_t file_buf_size = ObExternalSortConstant::DEFAULT_FILE_READ_WRITE_BUFFER; - const int64_t expire_timestamp = 0; - const int64_t buf_limit = SORT_MEMORY_LIMIT; - const uint64_t tenant_id = MTL_ID(); - sorter_.clean_up(); - if (OB_FAIL(sorter_.init(buf_limit, file_buf_size, expire_timestamp, tenant_id, &comparer_))) { - STORAGE_LOG(WARN, "fail to init external sorter", K(ret)); - } else { - is_inited_ = true; - while (OB_SUCC(ret)) { - index_rtdef_->p_pd_expr_op_->clear_evaluated_flag(); - if (OB_FAIL(rowkey_iter_->get_next_row())) { - if (OB_ITER_END != ret) { - LOG_WARN("get next row from index scan failed", K(ret)); - } - } else if (OB_FAIL(process_data_table_rowkey())) { - LOG_WARN("process data table rowkey with das failed", K(ret)); - } else { - ++lookup_rowkey_cnt_; - } - } - - if (OB_ITER_END == ret && lookup_rowkey_cnt_ > 0) { - if (OB_FAIL(sorter_.do_sort(true))) { - LOG_WARN("sort candidates failed", K(ret)); - } - } - } - } - - bool got_next_row = false; - do { - switch (state_) { - case INDEX_SCAN: { - if (OB_FAIL(save_rowkeys())) { - if (ret != OB_ITER_END) { - LOG_WARN("failed get rowkey from sorter", K(ret)); - } - } - if (OB_SUCC(ret) || OB_ITER_END == ret) { - state_ = DO_LOOKUP; - index_end_ = (OB_ITER_END == ret); - if (ret == OB_ITER_END) { - LOG_INFO("test spatial index back cnt", K(lookup_rowkey_cnt_), K(index_back_cnt_), K(mbr_filter_cnt_)); - ret = OB_SUCCESS; - } - } - break; - } - case DO_LOOKUP: { - lookup_row_cnt_ = 0; - if (OB_FAIL(do_index_lookup())) { - LOG_WARN("do index lookup failed", K(ret)); - } else { - state_ = OUTPUT_ROWS; - } - break; - } - case OUTPUT_ROWS: { - lookup_rtdef_->p_pd_expr_op_->clear_evaluated_flag(); - if (scan_param_.key_ranges_.empty()) { - ret= OB_ITER_END; - state_ = FINISHED; - } else if (OB_FAIL(lookup_iter_->get_next_row())) { - if (OB_ITER_END == ret) { - ret = OB_SUCCESS; - if (!index_end_) { - // reuse lookup_iter_ only - ObLocalIndexLookupOp::reset_lookup_state(); - index_end_ = false; - state_ = INDEX_SCAN; - } else { - state_ = FINISHED; - } - } else { - LOG_WARN("look up get next row failed", K(ret)); - } - } else { - got_next_row = true; - ++lookup_row_cnt_; - } - break; - } - case FINISHED: { - if (OB_SUCC(ret) || OB_ITER_END == ret) { - ret = OB_ITER_END; - } - break; - } - default: { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected state", K(state_)); - } - } - } while (!got_next_row && OB_SUCC(ret)); - return ret; -} - -int ObSpatialIndexLookupOp::process_data_table_rowkey() -{ - int ret = OB_SUCCESS; - int64_t rowkey_cnt = max_rowkey_cnt_; - if (max_rowkey_cnt_ < 0 || OB_ISNULL(obj_ptr_)) { - rowkey_cnt = index_ctdef_->result_output_.count() - 1; - void *buf = nullptr; - if (index_ctdef_->trans_info_expr_ != nullptr) { - rowkey_cnt = rowkey_cnt - 1; - } - max_rowkey_cnt_ = rowkey_cnt; - if (OB_ISNULL(allocator_)) { - ret = OB_ERR_UNEXPECTED; - } else if (OB_ISNULL(buf = allocator_->alloc(sizeof(ObObj) * rowkey_cnt))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("allocate buffer failed", K(ret), K(rowkey_cnt)); - } else { - obj_ptr_ = new(buf) ObObj[rowkey_cnt]; - } - } - ObNewRange lookup_range; - for (int64_t i = 0; OB_SUCC(ret) && i < rowkey_cnt; ++i) { - ObExpr *expr = index_ctdef_->result_output_.at(i); - if (T_PSEUDO_GROUP_ID == expr->type_) { - // do nothing - } else { - ObDatum &col_datum = expr->locate_expr_datum(*lookup_rtdef_->eval_ctx_); - if (OB_FAIL(col_datum.to_obj(obj_ptr_[i], expr->obj_meta_, expr->obj_datum_map_))) { - LOG_WARN("convert datum to obj failed", K(ret)); - } - } - } - if (OB_SUCC(ret)) { - ObRowkey table_rowkey(obj_ptr_, rowkey_cnt); - ObObj mbr_obj; - bool pass_through = true; - ObExpr *mbr_expr = index_ctdef_->result_output_.at(rowkey_cnt); - ObDatum &mbr_datum = mbr_expr->locate_expr_datum(*lookup_rtdef_->eval_ctx_); - if (OB_FAIL(mbr_datum.to_obj(mbr_obj, mbr_expr->obj_meta_, mbr_expr->obj_datum_map_))) { - LOG_WARN("convert datum to obj failed", K(ret)); - } else if (!is_whole_range_ && OB_FAIL(filter_by_mbr(mbr_obj, pass_through))) { - LOG_WARN("filter mbr failed", K(ret)); - } else if (!is_whole_range_ && pass_through) { - // not target - mbr_filter_cnt_++; - } else if (OB_FAIL(sorter_.add_item(table_rowkey))) { - LOG_WARN("filter mbr failed", K(ret)); - } else { - LOG_TRACE("geo idx add rowkey success", K(table_rowkey), KP(obj_ptr_), K(obj_ptr_[0]), K(rowkey_cnt)); - } - } - return ret; -} - - -} // namespace sql -} // namespace oceanbase diff --git a/src/sql/das/ob_das_spatial_index_lookup_op.h b/src/sql/das/ob_das_spatial_index_lookup_op.h deleted file mode 100644 index 1aef55d802..0000000000 --- a/src/sql/das/ob_das_spatial_index_lookup_op.h +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Copyright (c) 2023 OceanBase - * OceanBase CE is licensed under Mulan PubL v2. - * You can use this software according to the terms and conditions of the Mulan PubL v2. - * You may obtain a copy of Mulan PubL v2 at: - * http://license.coscl.org.cn/MulanPubL-2.0 - * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, - * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, - * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. - * See the Mulan PubL v2 for more details. - */ - -#ifndef OBDEV_SRC_SQL_DAS_OB_DAS_SPATIAL_INDEX_LOOKUP_OP_H_ -#define OBDEV_SRC_SQL_DAS_OB_DAS_SPATIAL_INDEX_LOOKUP_OP_H_ -#include "sql/das/ob_das_scan_op.h" -#include "storage/ob_store_row_comparer.h" -#include "storage/ob_parallel_external_sort.h" -namespace oceanbase -{ -namespace sql -{ - - -class ObRowKeyCompare { -public: - ObRowKeyCompare(int &sort_ret) : result_code_(sort_ret) {} - bool operator()(const ObRowkey *left, const ObRowkey *right); - int &result_code_; -}; - -class ObSpatialIndexLookupOp : public ObLocalIndexLookupOp -{ -public: - ObSpatialIndexLookupOp(ObIAllocator &allocator) : - ObLocalIndexLookupOp(), - mbr_filters_(NULL), - cmp_ret_(0), - comparer_(cmp_ret_), - sorter_(allocator), - is_sorted_(false), - is_whole_range_(false), - is_inited_(false), - mbr_filter_cnt_(0), - index_back_cnt_(0), - max_rowkey_cnt_(-1), - allocator_(&allocator), - obj_ptr_(nullptr) {} - virtual ~ObSpatialIndexLookupOp(); - - int init(const ObDASScanCtDef *lookup_ctdef, - ObDASScanRtDef *lookup_rtdef, - const ObDASScanCtDef *index_ctdef, - ObDASScanRtDef *index_rtdef, - transaction::ObTxDesc *tx_desc, - transaction::ObTxReadSnapshot *snapshot, - storage::ObTableScanParam &scan_param); - int reset_lookup_state(); - int filter_by_mbr(const ObObj &mbr_obj, bool &pass_through); - int get_next_row(); - int revert_iter(); - -private: - int process_data_table_rowkey(); - int save_rowkeys(); -private: - static const int64_t SORT_MEMORY_LIMIT = 32L * 1024L * 1024L; - static const int64_t MAX_NUM_PER_BATCH = 10000; - - const ObMbrFilterArray *mbr_filters_; - int cmp_ret_; - ObRowKeyCompare comparer_; - ObExternalSort sorter_; // use ObRowKeyCompare to compare rowkey - ObRowkey last_rowkey_; // store last index row for distinct, who allocs the memory? // no need to use ObExtStoreRowkey - bool is_sorted_; - bool is_whole_range_; - bool is_inited_; - int64_t mbr_filter_cnt_; - int64_t index_back_cnt_; - int64_t max_rowkey_cnt_; - ObIAllocator* allocator_; - ObObj *obj_ptr_; -}; - -} // namespace sql -} // namespace oceanbase -#endif /* OBDEV_SRC_SQL_DAS_OB_DAS_SPATIAL_INDEX_LOOKUP_OP_H_ */ diff --git a/src/sql/das/ob_domain_index_lookup_op.cpp b/src/sql/das/ob_domain_index_lookup_op.cpp index 2088a4b08d..e611c1f47f 100644 --- a/src/sql/das/ob_domain_index_lookup_op.cpp +++ b/src/sql/das/ob_domain_index_lookup_op.cpp @@ -388,504 +388,5 @@ int ObDomainIndexLookupOp::reuse_scan_iter() return OB_SUCCESS; } -int ObMulValueIndexLookupOp::revert_iter() -{ - int ret = OB_SUCCESS; - if (nullptr != aux_lookup_iter_) { - ObITabletScan &tsc_service = get_tsc_service(); - if (OB_FAIL(tsc_service.revert_scan_iter(aux_lookup_iter_))) { - LOG_WARN("revert scan iterator failed", K(ret)); - } - aux_lookup_iter_ = nullptr; - } - - sorter_.clean_up(); - sorter_.~ObExternalSort(); - - aux_sorter_.clean_up(); - aux_sorter_.~ObExternalSort(); - - if (OB_SUCC(ret) && OB_FAIL(ObDomainIndexLookupOp::revert_iter())) { - LOG_WARN("failed to revert multivalue index lookup op iter", K(ret)); - } - return ret; -} - -void ObMulValueIndexLookupOp::do_clear_evaluated_flag() -{ - lookup_rtdef_->p_pd_expr_op_->clear_evaluated_flag(); - return ObDomainIndexLookupOp::do_clear_evaluated_flag(); -} - -int ObMulValueIndexLookupOp::init_scan_param() -{ - int ret = OB_SUCCESS; - - if (OB_FAIL(ObDomainIndexLookupOp::init_scan_param())) { - LOG_WARN("failed to init scan param", K(ret)); - } - - return ret; -} - -int ObMulValueIndexLookupOp::init(const ObDASBaseCtDef *table_lookup_ctdef, - ObDASBaseRtDef *table_lookup_rtdef, - ObTxDesc *tx_desc, - ObTxReadSnapshot *snapshot, - ObTableScanParam &scan_param) -{ - int ret = OB_SUCCESS; - - const ObDASTableLookupCtDef *tbl_lookup_ctdef = nullptr; - ObDASTableLookupRtDef *tbl_lookup_rtdef = nullptr; - const ObDASIRAuxLookupCtDef *aux_lookup_ctdef = nullptr; - ObDASIRAuxLookupRtDef *aux_lookup_rtdef = nullptr; - - if (OB_ISNULL(table_lookup_ctdef) || OB_ISNULL(table_lookup_rtdef)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("table lookup param is nullptr", KP(table_lookup_ctdef), KP(table_lookup_rtdef)); - } else if (OB_FAIL(ObDASUtils::find_target_das_def(table_lookup_ctdef, - table_lookup_rtdef, - DAS_OP_TABLE_LOOKUP, - tbl_lookup_ctdef, - tbl_lookup_rtdef))) { - LOG_WARN("find data table lookup def failed", K(ret)); - } else if (OB_FAIL(ObDASUtils::find_target_das_def(table_lookup_ctdef, - table_lookup_rtdef, - DAS_OP_IR_AUX_LOOKUP, - aux_lookup_ctdef, - aux_lookup_rtdef))) { - LOG_WARN("find ir aux lookup def failed", K(ret)); - } else if (aux_lookup_ctdef->children_cnt_ != 2) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("find index def failed", K(ret), K(aux_lookup_ctdef->children_cnt_)); - } else { - const ObDASScanCtDef* index_ctdef = static_cast(aux_lookup_ctdef->children_[0]); - ObDASScanRtDef * index_rtdef = static_cast(aux_lookup_rtdef->children_[0]); - - if (OB_FAIL(ObDomainIndexLookupOp::init(tbl_lookup_ctdef->get_lookup_scan_ctdef(), - tbl_lookup_rtdef->get_lookup_scan_rtdef(), - index_ctdef, - index_rtdef, - aux_lookup_ctdef->get_lookup_scan_ctdef(), - aux_lookup_rtdef->get_lookup_scan_rtdef(), - tx_desc, snapshot, scan_param))) { - LOG_WARN("ObLocalIndexLookupOp init failed", K(ret)); - } - } - return ret; -} - -int ObMulValueIndexLookupOp::reset_lookup_state() -{ - is_inited_ = false; - return ObDomainIndexLookupOp::reset_lookup_state(); -} - -int ObMulValueIndexLookupOp::init_sort() -{ - int ret = OB_SUCCESS; - if (!is_inited_) { - cmp_ret_ = OB_SUCCESS; - aux_cmp_ret_ = OB_SUCCESS; - new (&comparer_) ObDomainRowkeyComp(cmp_ret_); - new (&aux_comparer_) ObDomainRowkeyComp(aux_cmp_ret_); - const int64_t file_buf_size = ObExternalSortConstant::DEFAULT_FILE_READ_WRITE_BUFFER; - const int64_t expire_timestamp = 0; - const int64_t buf_limit = SORT_MEMORY_LIMIT; - const uint64_t tenant_id = MTL_ID(); - sorter_.clean_up(); - aux_sorter_.clean_up(); - if (OB_FAIL(sorter_.init(buf_limit, file_buf_size, expire_timestamp, tenant_id, &comparer_))) { - LOG_WARN("fail to init sorter", K(ret)); - } else if (OB_FAIL(aux_sorter_.init(buf_limit, file_buf_size, expire_timestamp, tenant_id, &aux_comparer_))) { - LOG_WARN("fail to init aux sorter", K(ret)); - } else { - is_inited_ = true; - } - } - - return ret; -} - -int ObMulValueIndexLookupOp::get_next_row() -{ - int ret = OB_SUCCESS; - bool got_next_row = false; - - if (!is_inited_) { - if (OB_FAIL(fetch_index_table_rowkey())) { - if (OB_UNLIKELY(ret != OB_ITER_END)) { - LOG_WARN("failed get index table rowkey", K(ret)); - } else { - ret = OB_SUCCESS; - } - } - - if (FAILEDx(fetch_rowkey_from_aux())) { - LOG_WARN("fetch rowkey from doc-rowkey table failed", K(ret)); - } else { - is_inited_ = true; - } - } - - while (OB_SUCC(ret) && !got_next_row) { - switch (state_) { - case INDEX_SCAN: { - if (OB_FAIL(save_rowkeys())) { - if (OB_UNLIKELY(ret != OB_ITER_END)) { - LOG_WARN("failed get index table rowkey", K(ret)); - } - } - - if (OB_SUCC(ret) || OB_ITER_END == ret) { - if (OB_ITER_END == ret) { - state_ = FINISHED; - index_end_ = true; - } else { - state_ = DO_LOOKUP; - ret = OB_SUCCESS; - } - } - break; - } - case DO_LOOKUP: { - if (OB_FAIL(do_index_lookup())) { - LOG_WARN("do index lookup failed", K(ret)); - } else { - state_ = OUTPUT_ROWS; - } - break; - } - case OUTPUT_ROWS: { - if (OB_FAIL(get_next_row_from_data_table())) { - if (OB_ITER_END == ret) { - if (!index_end_) { - ret = OB_SUCCESS; - state_ = INDEX_SCAN; - ObLocalIndexLookupOp::reset_lookup_state(); - } else { - state_ = FINISHED; - } - } else { - LOG_WARN("look up get next row failed", K(ret)); - } - } else { - got_next_row = true; - ++lookup_row_cnt_; - LOG_DEBUG("got next row from table lookup", K(ret), K(lookup_row_cnt_), K(lookup_rowkey_cnt_), "main table output", ROWEXPR2STR(get_eval_ctx(), get_output_expr()) ); - } - break; - } - case FINISHED: { - ret = OB_ITER_END; - break; - } - default: { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected state", K(state_)); - } - } - } - - return ret; -} - -int ObMulValueIndexLookupOp::save_doc_id_and_rowkey() -{ - int ret = OB_SUCCESS; - - // index_column_cnt : |multivalue column| rowkey column | doc-id column | - int64_t index_column_cnt = index_ctdef_->result_output_.count(); - const storage::ObTableReadInfo& read_info = lookup_ctdef_->table_param_.get_read_info(); - int64_t main_rowkey_column_cnt = read_info.get_schema_rowkey_count(); - ObObj *obj_ptr = nullptr; - - ObArenaAllocator allocator("MulvalLookup"); - if (OB_ISNULL(obj_ptr = static_cast(allocator.alloc(sizeof(ObObj) * index_column_cnt)))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("allocate buffer failed", K(ret), K(index_column_cnt)); - } else { - obj_ptr = new(obj_ptr) ObObj[index_column_cnt]; - } - - int64_t rowkey_null_count = 0; - - for (int64_t i = 0; OB_SUCC(ret) && i < main_rowkey_column_cnt; ++i) { - ObExpr *expr = index_ctdef_->result_output_.at(i); - if (T_PSEUDO_GROUP_ID == expr->type_) { - // do nothing - } else { - ObDatum &col_datum = expr->locate_expr_datum(*lookup_rtdef_->eval_ctx_); - if (OB_FAIL(col_datum.to_obj(obj_ptr[i], expr->obj_meta_, expr->obj_datum_map_))) { - LOG_WARN("convert datum to obj failed", K(ret)); - } else if (col_datum.is_null()) { - rowkey_null_count++; - } - } - } - - if (OB_FAIL(ret)) { - } else if (rowkey_null_count != main_rowkey_column_cnt) { - ++index_rowkey_cnt_; - ++lookup_rowkey_cnt_; - ObRowkey main_rowkey(obj_ptr, main_rowkey_column_cnt); - if (OB_FAIL(sorter_.add_item(main_rowkey))) { - LOG_WARN("filter mbr failed", K(ret)); - } - } else { - ++aux_key_count_; - ++lookup_rowkey_cnt_; - // last column is doc-id - int64_t doc_id_idx = main_rowkey_column_cnt; - ObExpr* doc_id_expr = index_ctdef_->result_output_.at(doc_id_idx); - ObDatum& doc_id_datum = doc_id_expr->locate_expr_datum(*lookup_rtdef_->eval_ctx_); - if (OB_FAIL(doc_id_datum.to_obj(obj_ptr[doc_id_idx], doc_id_expr->obj_meta_, doc_id_expr->obj_datum_map_))) { - LOG_WARN("convert datum to obj failed", K(ret)); - } else if (doc_id_datum.is_null()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("docid and rowkey can't both be null", K(ret)); - } else { - ObRowkey table_rowkey(&obj_ptr[doc_id_idx], 1); - if (OB_FAIL(aux_sorter_.add_item(table_rowkey))) { - LOG_WARN("filter mbr failed", K(ret)); - } - } - } - - return ret; -} - -int ObMulValueIndexLookupOp::fetch_index_table_rowkey() -{ - int ret = OB_SUCCESS; - ObITabletScan &tsc_service = get_tsc_service(); - - if (is_inited_) { - } else if (OB_FAIL(init_sort())) { - LOG_WARN("fail to init sorter", K(ret)); - } else { - while (OB_SUCC(ret)) { - index_rtdef_->p_pd_expr_op_->clear_evaluated_flag(); - if (OB_FAIL(rowkey_iter_->get_next_row())) { - if (OB_ITER_END != ret) { - LOG_WARN("get next row from index scan failed", K(ret)); - } - } else if (OB_FAIL(save_doc_id_and_rowkey())) { - LOG_WARN("process data table rowkey with das failed", K(ret)); - } - } - } - return ret; -} - -int ObMulValueIndexLookupOp::save_aux_rowkeys() -{ - INIT_SUCC(ret); - - doc_id_scan_param_.key_ranges_.reset(); - const ObRowkey *idx_row = nullptr; - - int64_t simulate_batch_row_cnt = - EVENT_CALL(EventTable::EN_TABLE_LOOKUP_BATCH_ROW_COUNT); - int64_t default_row_batch_cnt = simulate_batch_row_cnt > 0 ? simulate_batch_row_cnt : MAX_NUM_PER_BATCH; - - if (OB_FAIL(aux_sorter_.do_sort(true))) { - LOG_WARN("do docid sort failed", K(ret)); - } - - for (int64_t i = 0; OB_SUCC(ret) && i < default_row_batch_cnt; ++i) { - if (OB_FAIL(aux_sorter_.get_next_item(idx_row))) { - if (OB_ITER_END != ret) { - LOG_WARN("fail to get next sorted item", K(ret), K(i)); - } else { - ret = OB_SUCCESS; - } - } else if (aux_last_rowkey_ != *idx_row) { - ObNewRange lookup_range; - uint64_t ref_table_id = doc_id_lookup_ctdef_->ref_table_id_; - if (OB_FAIL(lookup_range.build_range(ref_table_id, *idx_row))) { - LOG_WARN("build lookup range failed", K(ret), K(ref_table_id), K(*idx_row)); - } else if (OB_FAIL(doc_id_scan_param_.key_ranges_.push_back(lookup_range))) { - LOG_WARN("store lookup key range failed", K(ret), K(doc_id_scan_param_)); - } - aux_last_rowkey_ = *idx_row; - LOG_DEBUG("build data table range", K(ret), K(*idx_row), K(lookup_range), K(doc_id_scan_param_.key_ranges_.count())); - } - } - return ret; -} - -int ObMulValueIndexLookupOp::save_rowkeys() -{ - int ret = OB_SUCCESS; - ObStoreRowkey src_key; - const ObRowkey *idx_row = NULL; - int64_t simulate_batch_row_cnt = - EVENT_CALL(EventTable::EN_TABLE_LOOKUP_BATCH_ROW_COUNT); - int64_t default_row_batch_cnt = simulate_batch_row_cnt > 0 ? simulate_batch_row_cnt : MAX_NUM_PER_BATCH; - for (int64_t i = 0; OB_SUCC(ret) && i < default_row_batch_cnt; ++i) { - if (OB_FAIL(sorter_.get_next_item(idx_row))) { - if (ret == OB_ITER_END) { - ret = i > 0 ? OB_SUCCESS : ret; - } else if (OB_ITER_END != ret) { - LOG_WARN("fail to get next sorted item", K(ret), K(i)); - } - } else if (last_rowkey_ != *idx_row) { - int64_t group_idx = 0; - for (int64_t i = 0; OB_SUCC(ret) && i < index_ctdef_->result_output_.count(); ++i) { - ObObj tmp_obj; - ObExpr *expr = index_ctdef_->result_output_.at(i); - if (T_PSEUDO_GROUP_ID == expr->type_) { - group_idx = expr->locate_expr_datum(*lookup_rtdef_->eval_ctx_).get_int(); - } - } - - ObNewRange lookup_range; - uint64_t ref_table_id = lookup_ctdef_->ref_table_id_; - if (OB_FAIL(lookup_range.build_range(ref_table_id, *idx_row))) { - LOG_WARN("build lookup range failed", K(ret), K(ref_table_id), K(*idx_row)); - } else if (FALSE_IT(lookup_range.group_idx_ = group_idx)) { - } else if (OB_FAIL(scan_param_.key_ranges_.push_back(lookup_range))) { - LOG_WARN("store lookup key range failed", K(ret), K(scan_param_)); - } - last_rowkey_ = *idx_row; - LOG_DEBUG("build data table range", K(ret), K(*idx_row), K(lookup_range), K(scan_param_.key_ranges_.count())); - } - } - return ret; -} - -int ObMulValueIndexLookupOp::get_aux_table_rowkey() -{ - INIT_SUCC(ret); - - if (OB_FAIL(fetch_rowkey_from_aux())) { - LOG_WARN("fetch rowkey from doc-rowkey table failed", K(ret)); - } else if (OB_FAIL(save_rowkeys())) { - LOG_WARN("store rowkeys failed", K(ret)); - } - - return ret; -} - - -int ObMulValueIndexLookupOp::fetch_rowkey_from_aux() -{ - INIT_SUCC(ret); - - ObITabletScan &tsc_service = get_tsc_service(); - ObNewRowIterator *&storage_iter = get_aux_lookup_iter(); - - if (aux_key_count_ == 0) { - //do nothing - } else if (storage_iter == nullptr) { - //first index lookup, init scan param and do table scan - if (OB_FAIL(set_doc_id_idx_lookup_param( - doc_id_lookup_ctdef_, doc_id_lookup_rtdef_, doc_id_scan_param_, doc_id_idx_tablet_id_, ls_id_))) { - LOG_WARN("failed to init doc id lookup scan param", K(ret)); - } else if (OB_FAIL(save_aux_rowkeys())) { - LOG_WARN("failed to save aux keys failed", K(ret)); - } else if (OB_FAIL(tsc_service.table_scan(doc_id_scan_param_, - storage_iter))) { - if (OB_SNAPSHOT_DISCARDED == ret && doc_id_scan_param_.fb_snapshot_.is_valid()) { - ret = OB_INVALID_QUERY_TIMESTAMP; - } else if (OB_TRY_LOCK_ROW_CONFLICT != ret) { - LOG_WARN("fail to scan table", K(doc_id_scan_param_), K(ret)); - } - } - } else { - const ObTabletID &storage_tablet_id = doc_id_scan_param_.tablet_id_; - doc_id_scan_param_.need_switch_param_ = (storage_tablet_id.is_valid() && storage_tablet_id != tablet_id_ ? true : false); - doc_id_scan_param_.tablet_id_ = tablet_id_; - doc_id_scan_param_.ls_id_ = ls_id_; - if (OB_FAIL(save_aux_rowkeys())) { - LOG_WARN("failed to save aux keys failed", K(ret)); - } else if (OB_FAIL(tsc_service.table_rescan(doc_id_scan_param_, storage_iter))) { - LOG_WARN("table_rescan scan iter failed", K(ret)); - } - } - - if (aux_key_count_ > 0) { - while (OB_SUCC(ret)) { - doc_id_lookup_rtdef_->p_pd_expr_op_->clear_evaluated_flag(); - if (OB_FAIL(storage_iter->get_next_row())) { - if (OB_ITER_END != ret) { - LOG_WARN("get next row from index scan failed", K(ret)); - } else { - ret = OB_SUCCESS; - break; - } - } else { - ObObj *obj_ptr = nullptr; - ObArenaAllocator allocator("MulvalLookup"); - - const storage::ObTableReadInfo& read_info = lookup_ctdef_->table_param_.get_read_info(); - int64_t main_rowkey_column_cnt = read_info.get_schema_rowkey_count(); - - if (OB_ISNULL(obj_ptr = static_cast(allocator.alloc(sizeof(ObObj) * main_rowkey_column_cnt)))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("allocate buffer failed", K(ret), K(main_rowkey_column_cnt)); - } else { - obj_ptr = new(obj_ptr) ObObj[main_rowkey_column_cnt]; - } - - for (int64_t i = 0; OB_SUCC(ret) && i < main_rowkey_column_cnt; ++i) { - ObExpr *expr = doc_id_lookup_ctdef_->result_output_.at(i); - if (T_PSEUDO_GROUP_ID == expr->type_) { - // do nothing - } else { - ObDatum &rowkey_datum = expr->locate_expr_datum(*doc_id_lookup_rtdef_->eval_ctx_); - if (OB_FAIL(rowkey_datum.to_obj(obj_ptr[i], expr->obj_meta_, expr->obj_datum_map_))) { - LOG_WARN("convert datum to obj failed", K(ret)); - } - } - } - if (OB_SUCC(ret)) { - ObRowkey table_rowkey(obj_ptr, main_rowkey_column_cnt); - if (OB_FAIL(sorter_.add_item(table_rowkey))) { - LOG_WARN("filter mbr failed", K(ret)); - } else { - LOG_TRACE("add rowkey success", K(table_rowkey), K(obj_ptr), K(obj_ptr[0]), K(main_rowkey_column_cnt)); - } - } - } - } - } - - if (OB_FAIL(ret)) { - } else if (OB_FAIL(sorter_.do_sort(true))) { - LOG_WARN("do rowkey sort failed", K(ret)); - } else { - lookup_rowkey_cnt_ = 0; - } - - return ret; -} - -int ObMulValueIndexLookupOp::reuse_scan_iter(bool need_switch_param) -{ - int ret = OB_SUCCESS; - ObITabletScan &tsc_service = get_tsc_service(); - - // reset var for multi value - aux_last_rowkey_.reset(); - last_rowkey_.reset(); - - if (OB_FAIL(ObDomainIndexLookupOp::reuse_scan_iter())) { - LOG_WARN("failed to reuse scan iter", K(ret)); - } else if (OB_FAIL(tsc_service.reuse_scan_iter(need_switch_param, rowkey_iter_))) { - LOG_WARN("failed to reuse scan iter", K(ret)); - } else if (OB_NOT_NULL(aux_lookup_iter_)) { - const ObTabletID &scan_tablet_id = doc_id_scan_param_.tablet_id_; - doc_id_scan_param_.need_switch_param_ = scan_tablet_id.is_valid() && scan_tablet_id != doc_id_idx_tablet_id_; - if (OB_FAIL(tsc_service.reuse_scan_iter(doc_id_scan_param_.need_switch_param_, aux_lookup_iter_))) { - LOG_WARN("failed to reuse scan iter", K(ret)); - } else { - doc_id_scan_param_.key_ranges_.reuse(); - doc_id_scan_param_.ss_key_ranges_.reuse(); - } - } - return ret; -} - } // namespace sql } // namespace oceanbase diff --git a/src/sql/das/ob_domain_index_lookup_op.h b/src/sql/das/ob_domain_index_lookup_op.h index 9f07fafb24..5fd613ce50 100644 --- a/src/sql/das/ob_domain_index_lookup_op.h +++ b/src/sql/das/ob_domain_index_lookup_op.h @@ -19,27 +19,6 @@ namespace oceanbase { namespace sql { -class ObDomainRowkeyComp { -public: - ObDomainRowkeyComp(int &sort_ret) : result_code_(sort_ret) {} - - bool operator()(const ObRowkey *left, const ObRowkey *right) - { - bool bool_ret = false; - if (OB_UNLIKELY(common::OB_SUCCESS != result_code_)) { - //do nothing - } else if (OB_UNLIKELY(NULL == left) - || OB_UNLIKELY(NULL == right)) { - result_code_ = common::OB_INVALID_ARGUMENT; - LOG_WARN_RET(result_code_, "Invaid argument, ", KP(left), KP(right), K_(result_code)); - } else { - bool_ret = (*left) < (*right); - } - return bool_ret; - } - - int &result_code_; -}; class ObDomainIndexLookupOp : public ObLocalIndexLookupOp { @@ -115,68 +94,7 @@ protected: static const int64_t MAX_NUM_PER_BATCH = 1000; }; -class ObMulValueIndexLookupOp : public ObDomainIndexLookupOp -{ -public: - explicit ObMulValueIndexLookupOp(ObIAllocator &allocator) - : ObDomainIndexLookupOp(allocator), - cmp_ret_(0), - aux_cmp_ret_(0), - aux_key_count_(0), - index_rowkey_cnt_(0), - comparer_(cmp_ret_), - aux_comparer_(aux_cmp_ret_), - sorter_(allocator), - aux_sorter_(allocator), - aux_lookup_iter_(nullptr), - last_rowkey_(), - aux_last_rowkey_(), - is_inited_(false) {} - virtual ~ObMulValueIndexLookupOp() - { - sorter_.clean_up(); - sorter_.~ObExternalSort(); - - aux_sorter_.clean_up(); - aux_sorter_.~ObExternalSort(); - } - virtual void do_clear_evaluated_flag() override; - int init(const ObDASBaseCtDef *table_lookup_ctdef, - ObDASBaseRtDef *table_lookup_rtdef, - transaction::ObTxDesc *tx_desc, - transaction::ObTxReadSnapshot *snapshot, - storage::ObTableScanParam &scan_param); - int reuse_scan_iter(bool need_switch_param); -protected: - virtual int init_scan_param() override; -protected: - virtual int fetch_index_table_rowkey(); - virtual int revert_iter() override; - int init_sort(); - int save_aux_rowkeys(); - int save_rowkeys(); - int save_doc_id_and_rowkey(); - int fetch_rowkey_from_aux(); - virtual int get_next_row() override; - void reset_sorter(); - virtual int reset_lookup_state() override; - virtual int get_aux_table_rowkey() override; - ObNewRowIterator*& get_aux_lookup_iter() { return aux_lookup_iter_; } -private: - int cmp_ret_; - int aux_cmp_ret_; - uint32_t aux_key_count_; - int index_rowkey_cnt_; - ObDomainRowkeyComp comparer_; - ObDomainRowkeyComp aux_comparer_; - ObExternalSort sorter_; // use ObRowKeyCompare to compare rowkey - ObExternalSort aux_sorter_; - common::ObNewRowIterator *aux_lookup_iter_; - ObRowkey last_rowkey_; - ObRowkey aux_last_rowkey_; - bool is_inited_; -}; } // namespace sql } // namespace oceanbase #endif /* OBDEV_SRC_SQL_DAS_OB_DOMAIN_INDEX_LOOKUP_OP_H_ */ diff --git a/src/sql/engine/sort/ob_sort_op_impl.cpp b/src/sql/engine/sort/ob_sort_op_impl.cpp index ecc24de74f..ae46615f2b 100644 --- a/src/sql/engine/sort/ob_sort_op_impl.cpp +++ b/src/sql/engine/sort/ob_sort_op_impl.cpp @@ -3354,6 +3354,11 @@ int ObPrefixSortImpl::get_next_batch(const common::ObIArray &exprs, return ret; } +void ObPrefixSortImpl::reuse() +{ + ObSortOpImpl::reuse(); +} + /*********************************** end ObPrefixSortImpl *****************************/ /*********************************** start ObUniqueSortImpl *****************************/ diff --git a/src/sql/engine/sort/ob_sort_op_impl.h b/src/sql/engine/sort/ob_sort_op_impl.h index 1b201dca6a..fd602af2e6 100644 --- a/src/sql/engine/sort/ob_sort_op_impl.h +++ b/src/sql/engine/sort/ob_sort_op_impl.h @@ -273,9 +273,9 @@ public: virtual int64_t get_prefix_pos() const { return 0; } // keep initialized, can sort same rows (same cell type, cell count, projector) after reuse. - void reuse(); + virtual void reuse(); // reset to state before init - void reset(); + virtual void reset(); void destroy() { reset(); } // Add row and return the stored row. @@ -345,9 +345,9 @@ public: int add_stored_row(const ObChunkDatumStore::StoredRow &input_row); - int sort(); + virtual int sort(); - int get_next_row(const common::ObIArray &exprs) + virtual int get_next_row(const common::ObIArray &exprs) { int ret = OB_SUCCESS; const ObChunkDatumStore::StoredRow *sr = NULL; @@ -390,7 +390,7 @@ public: // get next batch rows, %max_cnt should equal or smaller than max batch size. // return OB_ITER_END for EOF - int get_next_batch(const common::ObIArray &exprs, + virtual int get_next_batch(const common::ObIArray &exprs, const int64_t max_cnt, int64_t &read_rows); // rewind get_next_row() iterator to begin. @@ -910,13 +910,13 @@ public: bool is_fetch_with_ties = false); int64_t get_prefix_pos() const { return prefix_pos_; } - int get_next_row(const common::ObIArray &exprs); + virtual int get_next_row(const common::ObIArray &exprs); - int get_next_batch(const common::ObIArray &exprs, + virtual int get_next_batch(const common::ObIArray &exprs, const int64_t max_cnt, int64_t &read_rows); - void reuse(); - void reset(); + virtual void reuse(); + virtual void reset(); private: // fetch rows in same prefix && do sort, set %next_prefix_row_ to NULL // when all child rows are fetched. @@ -974,9 +974,8 @@ private: class ObUniqueSortImpl : public ObSortOpImpl { public: - explicit ObUniqueSortImpl(ObMonitorNode &op_monitor_info) : ObSortOpImpl(op_monitor_info), prev_row_(NULL), prev_buf_size_(0) - { - } + explicit ObUniqueSortImpl(ObMonitorNode &op_monitor_info) : ObSortOpImpl(op_monitor_info), prev_row_(NULL), prev_buf_size_(0) {} + ObUniqueSortImpl() : ObSortOpImpl(), prev_row_(NULL), prev_buf_size_(0) {} virtual ~ObUniqueSortImpl() { @@ -1005,16 +1004,16 @@ public: default_block_size); } - int get_next_row(const common::ObIArray &exprs); + virtual int get_next_row(const common::ObIArray &exprs); int get_next_stored_row(const ObChunkDatumStore::StoredRow *&sr); - int get_next_batch(const common::ObIArray &exprs, + virtual int get_next_batch(const common::ObIArray &exprs, const int64_t max_cnt, int64_t &read_rows); - void reuse(); - void reset(); + virtual void reuse(); + virtual void reset(); - int sort() + virtual int sort() { free_prev_row(); return ObSortOpImpl::sort(); diff --git a/src/sql/optimizer/ob_join_order.cpp b/src/sql/optimizer/ob_join_order.cpp index 5cc7842233..fe1b83a4ea 100755 --- a/src/sql/optimizer/ob_join_order.cpp +++ b/src/sql/optimizer/ob_join_order.cpp @@ -3310,20 +3310,7 @@ int ObJoinOrder::get_valid_index_ids(const uint64_t table_id, // for use index hint, get index ids from hint. if (OB_FAIL(valid_index_ids.assign(log_table_hint->index_list_))) { LOG_WARN("failed to assign index ids", K(ret)); - // TODO, yunyi,zixia need multivalue das iter refactor - } else if (OB_FAIL(temp_prune_candidate_multivalue_index(schema_guard, - ref_table_id, - nullptr, - index_count, - valid_index_ids))) { - LOG_WARN("failed to prune multivalue index", K(ref_table_id), K(ret)); - } else if (valid_index_ids.count() < log_table_hint->index_list_.count() && valid_index_ids.empty()) { - LOG_WARN("hint index may containt multivalue index, which is pruned, using other access path", - K(ref_table_id)); } - } - - if (OB_FAIL(ret) || (valid_index_ids.count() > 0)) { } else if (OB_FAIL(schema_guard->get_can_read_index_array(ref_table_id, tids, index_count, @@ -3342,12 +3329,6 @@ int ObJoinOrder::get_valid_index_ids(const uint64_t table_id, log_table_hint->index_list_, valid_index_ids))) { LOG_WARN("failed to get hint index ids", K(ret)); - } else if (OB_FAIL(temp_prune_candidate_multivalue_index(schema_guard, - ref_table_id, - tids, - index_count, - valid_index_ids))) { - LOG_WARN("failed to prune multivalue index", K(ref_table_id), K(ret)); } else { if (0 == valid_index_ids.count()) { for (int64_t i = -1; OB_SUCC(ret) && i < index_count; ++i) { @@ -16734,15 +16715,6 @@ int ObJoinOrder::try_get_json_generated_col_index_expr(ObRawExpr *depend_expr, } } } - - const ObDMLStmt *stmt = get_plan()->get_stmt(); - if (OB_NOT_NULL(stmt) && stmt->get_stmt_type() != stmt::StmtType::T_SELECT) { - // TODO weiyouchao.wyc - // as domain query not support das iter refractor, in certain case there's happened coredump bug - // temperal ban multivalue access-path when not select - new_qual = nullptr; - LOG_INFO("not surppot delete update replace with domain predict", K(stmt->get_stmt_type())); - } return ret; } @@ -17537,54 +17509,3 @@ int ObJoinOrder::get_matched_inv_index_tid(ObMatchFunRawExpr *match_expr, } return ret; } - -int ObJoinOrder::temp_prune_candidate_multivalue_index(ObSqlSchemaGuard *schema_guard, - const uint64_t table_id, - uint64_t *index_tid_array, - int64_t &size, - ObIArray &valid_index_ids) -{ - // TODO: yunyi, zixia wait multivalue das iter refractor - int ret = OB_SUCCESS; - const uint64_t tenant_id = MTL_ID(); - const ObTableSchema *schema = NULL; - const ObDMLStmt *stmt = get_plan()->get_stmt(); - bool is_link = ObSqlSchemaGuard::is_link_table(stmt, table_id); - ObArray index_tids; - ObArray tmp_valid_index_ids; - - if (!get_plan()->get_stmt()->is_select_stmt() - && (OB_NOT_NULL(index_tid_array) > 0 || valid_index_ids.count() > 0)) { - - for (int64_t i = 0; OB_SUCC(ret) && OB_NOT_NULL(index_tid_array) && i < size; ++i) { - schema_guard->get_table_schema(index_tid_array[i], schema, is_link); - if (OB_ISNULL(schema)) { - } else if (schema->is_multivalue_index_aux()) { - } else if (OB_FAIL(index_tids.push_back(index_tid_array[i]))) { - LOG_WARN("fail to push back index tid array", K(ret), K(index_tids.count())); - } - } - - for (int64_t i = 0; OB_SUCC(ret) && i < valid_index_ids.count(); ++i) { - schema_guard->get_table_schema(valid_index_ids.at(i), schema, is_link); - if (OB_ISNULL(schema)) { - } else if (schema->is_multivalue_index_aux()) { - } else if (OB_FAIL(tmp_valid_index_ids.push_back(valid_index_ids.at(i)))) { - LOG_WARN("fail to push back index tid array", K(ret), K(index_tids.count())); - } - } - - if (OB_FAIL(ret)) { - } else if (OB_FAIL(valid_index_ids.assign(tmp_valid_index_ids))) { - LOG_WARN("fail to assign index tids", K(ret)); - } else if (OB_NOT_NULL(index_tid_array)) { - int64_t i = 0; - for (; OB_FAIL(ret) && i < index_tids.count(); ++i) { - index_tid_array[i] = index_tids.at(i); - } - size = index_tids.count(); - } - } - - return ret; -} diff --git a/src/sql/optimizer/ob_join_order.h b/src/sql/optimizer/ob_join_order.h index 99272052da..e35436e6bd 100755 --- a/src/sql/optimizer/ob_join_order.h +++ b/src/sql/optimizer/ob_join_order.h @@ -1486,11 +1486,7 @@ struct NullAwareAntiJoinInfo { int get_matched_inv_index_tid(ObMatchFunRawExpr *match_expr, uint64_t ref_table_id, uint64_t &inv_idx_tid); - int temp_prune_candidate_multivalue_index(ObSqlSchemaGuard *schema_guard, - const uint64_t table_id, - uint64_t *index_tid_array, - int64_t &size, - ObIArray &valid_index_ids); + int get_vector_inv_index_tid(ObSqlSchemaGuard *schema_guard, ObRawExpr *vector_expr, const uint64_t table_id, diff --git a/src/sql/optimizer/ob_log_table_scan.h b/src/sql/optimizer/ob_log_table_scan.h index 834ca20058..1c91f69d05 100644 --- a/src/sql/optimizer/ob_log_table_scan.h +++ b/src/sql/optimizer/ob_log_table_scan.h @@ -660,6 +660,7 @@ public: inline bool is_tsc_with_vid() const { return is_tsc_with_vid_; } inline bool is_text_retrieval_scan() const { return is_index_scan() && NULL != text_retrieval_info_.match_expr_; } inline bool is_multivalue_index_scan() const { return is_multivalue_index_; } + inline bool is_spatial_index_scan() const { return is_spatial_index_; } inline ObTextRetrievalInfo &get_text_retrieval_info() { return text_retrieval_info_; } inline const ObTextRetrievalInfo &get_text_retrieval_info() const { return text_retrieval_info_; } int prepare_text_retrieval_dep_exprs();