diff --git a/src/sql/engine/table/ob_table_scan_op.cpp b/src/sql/engine/table/ob_table_scan_op.cpp index ac74abb75d..055cd103cf 100644 --- a/src/sql/engine/table/ob_table_scan_op.cpp +++ b/src/sql/engine/table/ob_table_scan_op.cpp @@ -604,10 +604,6 @@ ObTableScanOp::ObTableScanOp(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOp column_checksum_(), scan_task_id_(0), report_checksum_(false), - range_buffers_(NULL), - range_buffer_idx_(0), - group_size_(0), - max_group_size_(0), in_rescan_(false), global_index_lookup_op_(NULL), spat_index_() @@ -780,8 +776,8 @@ int ObTableScanOp::prepare_all_das_tasks() if (MY_SPEC.batch_scan_flag_) { if (OB_SUCC(ret)) { if (!tsc_rtdef_.bnlj_params_.empty()) { - group_size_ = tsc_rtdef_.bnlj_params_.at(0).second->count_; - if (OB_UNLIKELY(group_size_ > max_group_size_)) { + tsc_rtdef_.group_size_ = tsc_rtdef_.bnlj_params_.at(0).gr_param_->count_; + if (OB_UNLIKELY(tsc_rtdef_.group_size_ > tsc_rtdef_.max_group_size_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("The amount of data exceeds the pre allocated memory", K(ret)); } @@ -797,12 +793,10 @@ int ObTableScanOp::prepare_all_das_tasks() LOG_WARN("prepare das task failed", K(ret)); } } else { - int64_t group_size = MY_SPEC.batch_scan_flag_ ? group_size_ : 1; + int64_t group_size = MY_SPEC.batch_scan_flag_ ? tsc_rtdef_.group_size_ : 1; + GroupRescanParamGuard grp_guard(tsc_rtdef_, GET_PHY_PLAN_CTX(ctx_)->get_param_store_for_update()); for (int64_t i = 0; OB_SUCC(ret) && i < group_size; ++i) { - if (MY_SPEC.batch_scan_flag_) { - replace_bnlj_param(i); - range_buffer_idx_ = i; - } + grp_guard.switch_group_rescan_param(i); if (OB_FAIL(prepare_single_scan_range(i))) { LOG_WARN("prepare single scan range failed", K(ret)); } else if (OB_FAIL(prepare_das_task())) { @@ -815,8 +809,8 @@ int ObTableScanOp::prepare_all_das_tasks() } } if (OB_SUCC(ret)) { - if (OB_FAIL(init_das_group_range(0, group_size_))) { - LOG_WARN("set group range failed", K(ret), K_(group_size)); + if (OB_FAIL(init_das_group_range(0, tsc_rtdef_.group_size_))) { + LOG_WARN("set group range failed", K(ret), K_(tsc_rtdef_.group_size)); } } return ret; @@ -1023,8 +1017,8 @@ int ObTableScanOp::prepare_batch_scan_range() int64_t batch_size = 0; if (OB_SUCC(ret)) { if (!tsc_rtdef_.bnlj_params_.empty()) { - group_size_ = tsc_rtdef_.bnlj_params_.at(0).second->count_; - if (OB_UNLIKELY(group_size_ > max_group_size_)) { + tsc_rtdef_.group_size_ = tsc_rtdef_.bnlj_params_.at(0).gr_param_->count_; + if (OB_UNLIKELY(tsc_rtdef_.group_size_ > tsc_rtdef_.max_group_size_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("The amount of data exceeds the pre allocated memory", K(ret)); } @@ -1033,10 +1027,10 @@ int ObTableScanOp::prepare_batch_scan_range() LOG_WARN("batch nlj params is empry", K(ret)); } } - for (int64_t i = 0; OB_SUCC(ret) && i < group_size_; ++i) { + GroupRescanParamGuard grp_guard(tsc_rtdef_, GET_PHY_PLAN_CTX(ctx_)->get_param_store_for_update()); + for (int64_t i = 0; OB_SUCC(ret) && i < tsc_rtdef_.group_size_; ++i) { //replace real param to param store to extract scan range - replace_bnlj_param(i); - range_buffer_idx_ = i; + grp_guard.switch_group_rescan_param(i); LOG_DEBUG("replace bnlj param to extract range", K(plan_ctx->get_param_store())); if (OB_FAIL(prepare_single_scan_range(i))) { LOG_WARN("prepare single scan range failed", K(ret)); @@ -1060,7 +1054,7 @@ int ObTableScanOp::build_bnlj_params() const ObObjParam &bnlj_param = plan_ctx->get_param_store().at(param_idx); if (bnlj_param.is_ext_sql_array()) { ObSqlArrayObj *array_obj = reinterpret_cast(bnlj_param.get_ext()); - OZ(tsc_rtdef_.bnlj_params_.push_back(BNLJParamInfo(param_idx, array_obj))); + OZ(tsc_rtdef_.bnlj_params_.push_back(GroupRescanParamInfo(param_idx, array_obj))); } } if (OB_SUCC(ret) && tsc_rtdef_.bnlj_params_.empty()) { @@ -1086,8 +1080,8 @@ int ObTableScanOp::prepare_single_scan_range(int64_t group_idx) } else if (is_same_type && MY_CTDEF.pre_query_range_.get_is_equal_and()) { int64_t column_count = MY_CTDEF.pre_query_range_.get_column_count(); size_t range_size = sizeof(ObNewRange) + sizeof(ObObj) * column_count * 2; - void *range_buffers = static_cast(range_buffers_) + range_buffer_idx_ * range_size; - if (range_buffer_idx_ < 0 || range_buffer_idx_ >= max_group_size_) { + void *range_buffers = static_cast(tsc_rtdef_.range_buffers_) + tsc_rtdef_.range_buffer_idx_ * range_size; + if (tsc_rtdef_.range_buffer_idx_ < 0 || tsc_rtdef_.range_buffer_idx_ >= tsc_rtdef_.max_group_size_) { ret = OB_ERROR_OUT_OF_RANGE; LOG_WARN("get wrong offset of range_buffers_", K(ret)); } else if (OB_FAIL(ObSQLUtils::extract_equal_pre_query_range( @@ -1201,19 +1195,6 @@ int ObTableScanOp::single_equal_scan_check_type(const ParamStore ¶m_store, b return ret; } -void ObTableScanOp::replace_bnlj_param(int64_t batch_idx) -{ - ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_); - //replace real param to param store to extract scan range - for (int64_t i = 0; i < tsc_rtdef_.bnlj_params_.count(); ++i) { - ObSqlArrayObj *array_obj = tsc_rtdef_.bnlj_params_.at(i).second; - int64_t param_idx = tsc_rtdef_.bnlj_params_.at(i).first; - ObObjParam &dst_param = plan_ctx->get_param_store_for_update().at(param_idx); - dst_param = array_obj->data_[batch_idx]; - dst_param.set_param_meta(); - } -} - int ObTableScanOp::init_converter() { int ret = OB_SUCCESS; @@ -1302,23 +1283,23 @@ int ObTableScanOp::inner_open() if (OB_SUCC(ret)) { // here need add plan batch_size, because in vectorized execution, // left batch may greater than OB_MAX_BULK_JOIN_ROWS - max_group_size_ = OB_MAX_BULK_JOIN_ROWS + MY_SPEC.plan_->get_batch_size(); + tsc_rtdef_.max_group_size_ = OB_MAX_BULK_JOIN_ROWS + MY_SPEC.plan_->get_batch_size(); if (MY_CTDEF.pre_query_range_.get_is_equal_and()) { int64_t column_count = MY_CTDEF.pre_query_range_.get_column_count(); size_t range_size = sizeof(ObNewRange) + sizeof(ObObj) * column_count * 2; if (!MY_SPEC.batch_scan_flag_) { - range_buffers_ = ctx_.get_allocator().alloc(range_size); + tsc_rtdef_.range_buffers_ = ctx_.get_allocator().alloc(range_size); } else { - range_buffers_ = ctx_.get_allocator().alloc(max_group_size_ * range_size); + tsc_rtdef_.range_buffers_ = ctx_.get_allocator().alloc(tsc_rtdef_.max_group_size_ * range_size); } - if (OB_ISNULL(range_buffers_)) { + if (OB_ISNULL(tsc_rtdef_.range_buffers_)) { ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("allocate memory failed", K(ret), K(range_size), K(range_buffers_)); + LOG_WARN("allocate memory failed", K(ret), K(range_size), K(tsc_rtdef_.range_buffers_)); } else if (!MY_SPEC.batch_scan_flag_) { - ObNewRange *key_range = new(range_buffers_) ObNewRange(); + ObNewRange *key_range = new(tsc_rtdef_.range_buffers_) ObNewRange(); } else { - for (int64_t i = 0; i < max_group_size_; ++i) { - char *range_buffers_off = static_cast(range_buffers_) + i * range_size; + for (int64_t i = 0; i < tsc_rtdef_.max_group_size_; ++i) { + char *range_buffers_off = static_cast(tsc_rtdef_.range_buffers_) + i * range_size; ObNewRange *key_range = new(range_buffers_off) ObNewRange(); } } @@ -1546,7 +1527,7 @@ int ObTableScanOp::inner_rescan_for_tsc() // Therefore, we need to get and save bnlj parameters here or they will be // replaced by NLJ. LOG_WARN("build batch nlj params failed", KR(ret)); - } else if (!need_fetch_batch_result()) { + } else if (!need_real_rescan()) { ret = set_batch_iter(ctx_.get_das_ctx().jump_read_group_id_); } else { if (is_virtual_table(MY_SPEC.ref_table_id_) @@ -1614,8 +1595,8 @@ int ObTableScanOp::local_iter_rescan() if (OB_SUCC(ret)) { if (OB_FAIL(cherry_pick_range_by_tablet_id(scan_op))) { LOG_WARN("prune query range by partition id failed", K(ret)); - } else if (OB_FAIL(init_das_group_range(0, group_size_))) { - LOG_WARN("set group range failed", K(ret), K_(group_size)); + } else if (OB_FAIL(init_das_group_range(0, tsc_rtdef_.group_size_))) { + LOG_WARN("set group range failed", K(ret), K_(tsc_rtdef_.group_size)); } else if (OB_FAIL(MTL(ObDataAccessService*)->rescan_das_task(das_ref_, *scan_op))) { LOG_WARN("rescan das task failed", K(ret)); } @@ -1666,14 +1647,14 @@ int ObTableScanOp::switch_iterator() return OB_NOT_SUPPORTED; } -bool ObTableScanOp::need_fetch_batch_result() +bool ObTableScanOp::need_real_rescan() { bool bret = false; - if (tsc_rtdef_.bnlj_params_.empty()) { + if (!MY_SPEC.batch_scan_flag_) { bret = true; } else { ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_); - int64_t param_idx = tsc_rtdef_.bnlj_params_.at(0).first; + int64_t param_idx = tsc_rtdef_.bnlj_params_.at(0).param_idx_; //param store has been inited by nlj, to fetch next batch result bret = plan_ctx->get_param_store().at(param_idx).is_ext_sql_array(); } @@ -2236,7 +2217,7 @@ int ObTableScanOp::cherry_pick_range_by_tablet_id(ObDASScanOp *scan_op) } } if (OB_SUCC(ret)) { - LOG_DEBUG("range after pruning", K(input_ranges), K(scan_ranges), K_(group_size), + LOG_DEBUG("range after pruning", K(input_ranges), K(scan_ranges), K_(tsc_rtdef_.group_size), "tablet_id", scan_op->get_tablet_id(), K(input_ss_ranges), K(ss_ranges)); } diff --git a/src/sql/engine/table/ob_table_scan_op.h b/src/sql/engine/table/ob_table_scan_op.h index de6303f83f..b1dd1c0a92 100644 --- a/src/sql/engine/table/ob_table_scan_op.h +++ b/src/sql/engine/table/ob_table_scan_op.h @@ -105,8 +105,25 @@ public: }; typedef common::ObFixedArray Int64FixedArray; -typedef std::pair BNLJParamInfo; -typedef common::ObFixedArray FixedBNLJParamArray; +struct GroupRescanParamInfo +{ + GroupRescanParamInfo() + : param_idx_(common::OB_INVALID_ID), + gr_param_(nullptr), + cur_param_() + { } + GroupRescanParamInfo(int64_t param_idx, ObSqlArrayObj *gr_param) + : param_idx_(param_idx), + gr_param_(gr_param) + { } + TO_STRING_KV(K_(param_idx), + KPC_(gr_param), + K_(cur_param)); + int64_t param_idx_; + ObSqlArrayObj *gr_param_; //group rescan param + common::ObObjParam cur_param_; //current param in param store, used to restore paramstore state after the completion of group rescan. +}; +typedef common::ObFixedArray GroupRescanParamArray; struct ObTableScanCtDef { OB_UNIS_VERSION(1); @@ -178,18 +195,30 @@ struct ObTableScanRtDef ObTableScanRtDef(common::ObIAllocator &allocator) : bnlj_params_(allocator), scan_rtdef_(), - lookup_rtdef_(nullptr) + lookup_rtdef_(nullptr), + range_buffers_(nullptr), + range_buffer_idx_(0), + group_size_(0), + max_group_size_(0) { } void prepare_multi_part_limit_param(); bool has_lookup_limit() const { return lookup_rtdef_ != nullptr && lookup_rtdef_->limit_param_.is_valid(); } TO_STRING_KV(K_(scan_rtdef), - KPC_(lookup_rtdef)); + KPC_(lookup_rtdef), + K_(group_size), + K_(max_group_size)); - FixedBNLJParamArray bnlj_params_; + GroupRescanParamArray bnlj_params_; ObDASScanRtDef scan_rtdef_; ObDASScanRtDef *lookup_rtdef_; + // for equal_query_range opt + void *range_buffers_; + int64_t range_buffer_idx_; + // for equal_query_range opt end + int64_t group_size_; + int64_t max_group_size_; }; // table scan operator input @@ -469,7 +498,7 @@ protected: int report_ddl_column_checksum(); int get_next_batch_with_das(int64_t &count, int64_t capacity); void replace_bnlj_param(int64_t batch_idx); - bool need_fetch_batch_result(); + bool need_real_rescan(); static int check_is_physical_rowid(ObIAllocator &allocator, ObRowkey &row_key, bool &is_physical_rowid, @@ -501,6 +530,54 @@ protected: } bool is_foreign_check_nested_session() { return ObSQLUtils::is_fk_nested_sql(&ctx_);} + class GroupRescanParamGuard + { + public: + GroupRescanParamGuard(ObTableScanRtDef &tsc_rtdef, ParamStore ¶m_store) + : tsc_rtdef_(tsc_rtdef), + param_store_(param_store), + range_buffer_idx_(0) + { + //Save the original state in param store. + //The param store may be modified during the execution of group rescan. + //After the execution is completed, the original state needs to be restored. + for (int64_t i = 0; i < tsc_rtdef_.bnlj_params_.count(); ++i) { + int64_t param_idx = tsc_rtdef_.bnlj_params_.at(i).param_idx_; + common::ObObjParam &cur_param = param_store_.at(param_idx); + tsc_rtdef_.bnlj_params_.at(i).cur_param_ = cur_param; + } + range_buffer_idx_ = tsc_rtdef_.range_buffer_idx_; + } + + void switch_group_rescan_param(int64_t group_idx) + { + //replace real param to param store to execute group rescan in TSC + for (int64_t i = 0; i < tsc_rtdef_.bnlj_params_.count(); ++i) { + ObSqlArrayObj *array_obj = tsc_rtdef_.bnlj_params_.at(i).gr_param_; + int64_t param_idx = tsc_rtdef_.bnlj_params_.at(i).param_idx_; + common::ObObjParam &dst_param = param_store_.at(param_idx); + dst_param = array_obj->data_[group_idx]; + dst_param.set_param_meta(); + } + tsc_rtdef_.range_buffer_idx_ = group_idx; + } + + ~GroupRescanParamGuard() + { + //restore the original state to param store + for (int64_t i = 0; i < tsc_rtdef_.bnlj_params_.count(); ++i) { + int64_t param_idx = tsc_rtdef_.bnlj_params_.at(i).param_idx_; + common::ObObjParam &cur_param = param_store_.at(param_idx); + cur_param = tsc_rtdef_.bnlj_params_.at(i).cur_param_; + } + tsc_rtdef_.range_buffer_idx_ = range_buffer_idx_; + } + private: + ObTableScanRtDef &tsc_rtdef_; + ParamStore ¶m_store_; + int64_t range_buffer_idx_; + }; + private: const ObTableScanSpec& get_tsc_spec() {return MY_SPEC;} const ObTableScanCtDef& get_tsc_ctdef() {return MY_SPEC.tsc_ctdef_;} @@ -534,12 +611,6 @@ protected: common::ObFixedArray column_checksum_; int64_t scan_task_id_; bool report_checksum_; - // for equal_query_range opt - void *range_buffers_; - int64_t range_buffer_idx_; - // for equal_query_range opt end - int64_t group_size_; - int64_t max_group_size_; bool in_rescan_; ObGlobalIndexLookupOpImpl *global_index_lookup_op_; ObSpatialIndexCache spat_index_;