diff --git a/src/sql/engine/px/ob_granule_iterator_op.cpp b/src/sql/engine/px/ob_granule_iterator_op.cpp index 7b04bb10e8..2fa85886bf 100644 --- a/src/sql/engine/px/ob_granule_iterator_op.cpp +++ b/src/sql/engine/px/ob_granule_iterator_op.cpp @@ -191,7 +191,7 @@ ObGranuleIteratorOp::ObGranuleIteratorOp(ObExecContext &exec_ctx, const ObOpSpec void ObGranuleIteratorOp::destroy() { - rescan_tasks_.reset(); + rescan_tasks_info_.destroy(); pwj_rescan_task_infos_.reset(); table_location_keys_.reset(); pruning_partition_ids_.reset(); @@ -226,14 +226,14 @@ int ObGranuleIteratorOp::try_pruning_repart_partition( bool &partition_pruned) { int ret = OB_SUCCESS; - ObGranuleTaskInfo info; + uint64_t tablet_id = OB_INVALID_ID; if (OB_INVALID_ID == ctx_.get_gi_pruning_info().get_part_id()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("pruning id is not set", K(ret)); - } else if (OB_FAIL(taskset.get_task_at_pos(info, pos))) { + } else if (OB_FAIL(taskset.get_task_tablet_id_at_pos(pos, tablet_id))) { LOG_WARN("get task info failed", K(ret)); } else { - partition_pruned = repart_partition_pruned(info); + partition_pruned = tablet_id != ctx_.get_gi_pruning_info().get_part_id(); } return ret; } @@ -241,8 +241,8 @@ int ObGranuleIteratorOp::try_pruning_repart_partition( // 逻辑说明: // 对于 NLJ rescan 右表场景,它分为两步: // 1. 开始扫描之前,is_rescan_ = false, 会反复调用 try_fetch_task -// 把所有分区都填到 rescan_tasks_ 里, 然后会设置 is_rescan_ = true -// 2. 开始扫描后,对于左边来的每一行,都会反复从 rescan_tasks_ 选择合适的 task 来做 +// 把所有分区都填到 rescan_tasks_pos_ 里, 然后会设置 is_rescan_ = true +// 2. 开始扫描后,对于左边来的每一行,都会反复从 rescan_tasks_pos_ 选择合适的 task 来做 // 扫描。之所以引入 partition pruning 是为了处理 NLJ 右表是分区表场景 // 下,避免扫描无效分区。 int ObGranuleIteratorOp::try_fetch_task(ObGranuleTaskInfo &info) @@ -260,22 +260,14 @@ int ObGranuleIteratorOp::try_fetch_task(ObGranuleTaskInfo &info) LOG_WARN("the pump can not be null", K(ret)); } else { if (is_rescan_) { - bool partition_pruned = false; - do { - partition_pruned = false; - if (rescan_task_idx_ >= rescan_tasks_.count()) { - ret = OB_ITER_END; + ret = get_next_task_pos(pos, taskset); + if (OB_SUCC(ret)) { + if (OB_FAIL(taskset->get_task_at_pos(info, pos))) { + LOG_WARN("get task info failed", K(ret)); } else { - taskset = rescan_taskset_; - pos = rescan_tasks_.at(rescan_task_idx_++); - if (ObGranuleUtil::enable_partition_pruning(MY_SPEC.gi_attri_flag_) - && OB_FAIL(try_pruning_repart_partition(*taskset, pos, partition_pruned))) { - LOG_WARN("fail try prune partition", K(ret)); - } else if (partition_pruned) { - // next task - } + info.task_id_ = worker_id_; } - } while (OB_SUCC(ret) && partition_pruned); + } } else { const bool from_share_pool = !MY_SPEC.affinitize_ && !MY_SPEC.access_all_; if (OB_FAIL(gi_task_pump->fetch_granule_task(taskset, @@ -291,8 +283,11 @@ int ObGranuleIteratorOp::try_fetch_task(ObGranuleTaskInfo &info) } else if (NULL == taskset) { ret = OB_ERR_UNEXPECTED; LOG_WARN("NULL taskset returned", K(ret)); - } else if (OB_FAIL(rescan_tasks_.push_back(pos))) { - LOG_WARN("array push back failed", K(ret)); + } else if (OB_FAIL(taskset->get_task_at_pos(info, pos))) { + LOG_WARN("get task info failed", K(ret)); + } else if (FALSE_IT(info.task_id_ = worker_id_)) { + } else if (OB_FAIL(rescan_tasks_info_.insert_rescan_task(pos, info))) { + LOG_WARN("array push back failed", K(ret), K(info)); } else { if (NULL == rescan_taskset_) { rescan_taskset_ = taskset; @@ -303,16 +298,91 @@ int ObGranuleIteratorOp::try_fetch_task(ObGranuleTaskInfo &info) } } } - - if(OB_FAIL(ret)) { - } else if (OB_FAIL(taskset->get_task_at_pos(info, pos))) { - LOG_WARN("get task info failed", K(ret)); - } else { - info.task_id_ = worker_id_; - } return ret; } //GI has its own rescan + +int ObGranuleIteratorOp::get_next_task_pos(int64_t &pos, const ObGITaskSet *&taskset) +{ + int ret = OB_SUCCESS; + if (rescan_tasks_info_.use_opt_) { + taskset = rescan_taskset_; + if (rescan_task_idx_ > 0) { + ret = OB_ITER_END; + } else if (OB_FAIL(rescan_tasks_info_.rescan_tasks_map_.get_refactored( + ctx_.get_gi_pruning_info().get_part_id(), pos))) { + if (OB_HASH_NOT_EXIST == ret) { + ret = OB_ITER_END; + } else { + LOG_WARN("get tablet task pos failed", K(ret)); + } + } else { + rescan_task_idx_++; + } + } else { + bool partition_pruned = false; + do { + partition_pruned = false; + if (rescan_task_idx_ >= rescan_tasks_info_.rescan_tasks_pos_.count()) { + ret = OB_ITER_END; + } else { + taskset = rescan_taskset_; + pos = rescan_tasks_info_.rescan_tasks_pos_.at(rescan_task_idx_++); + if (ObGranuleUtil::enable_partition_pruning(MY_SPEC.gi_attri_flag_) + && OB_FAIL(try_pruning_repart_partition(*taskset, pos, partition_pruned))) { + LOG_WARN("fail try prune partition", K(ret)); + } else if (partition_pruned) { + // next task + } + } + } while (OB_SUCC(ret) && partition_pruned); + } + return ret; +} + +int ObGranuleIteratorOp::pw_get_next_task_pos(const common::ObIArray &op_ids) +{ + int ret = OB_SUCCESS; + if (rescan_tasks_info_.use_opt_) { + if (rescan_task_idx_ > 0) { + ret = OB_ITER_END; + state_ = GI_END; + all_task_fetched_ = true; + } else if (OB_FAIL(rescan_tasks_info_.rescan_tasks_map_.get_refactored( + ctx_.get_gi_pruning_info().get_part_id(), rescan_task_idx_))) { + if (OB_HASH_NOT_EXIST == ret) { + ret = OB_ITER_END; + } else { + LOG_WARN("get tablet task pos failed", K(ret), K(ctx_.get_gi_pruning_info().get_part_id())); + } + } + } else { + bool partition_pruned = false; + int64_t repart_idx = MY_SPEC.repart_pruning_tsc_idx_; + if (OB_UNLIKELY(repart_idx < 0 || repart_idx >= op_ids.count())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected repart pruning tsc index", K(ret), K(repart_idx)); + } + do { + if (rescan_task_idx_ >= pwj_rescan_task_infos_.count()) { + ret = OB_ITER_END; + state_ = GI_END; + all_task_fetched_ = true; + } else if (OB_UNLIKELY(rescan_task_idx_ + repart_idx >= pwj_rescan_task_infos_.count())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected pwj_rescan_task_infos_ count", K(ret), K(rescan_task_idx_), + K(repart_idx), K(pwj_rescan_task_infos_.count()), K(op_ids)); + } else { + partition_pruned = repart_partition_pruned(pwj_rescan_task_infos_.at(rescan_task_idx_ + repart_idx)); + if (partition_pruned) { + rescan_task_idx_ += op_ids.count(); + } + } + } while (OB_SUCC(ret) && partition_pruned); + } + return ret; +} + int ObGranuleIteratorOp::rescan() { int ret = ObOperator::inner_rescan(); @@ -325,7 +395,7 @@ int ObGranuleIteratorOp::rescan() pump_version_ = pump_->get_pump_version(); is_rescan_ = false; rescan_taskset_ = NULL; - rescan_tasks_.reset(); + rescan_tasks_info_.reset(); all_task_fetched_ = false; pwj_rescan_task_infos_.reset(); pruning_partition_ids_.reset(); @@ -387,6 +457,8 @@ int ObGranuleIteratorOp::inner_open() ObOperator *real_child = nullptr; if (OB_FAIL(parameters_init())) { LOG_WARN("parameters init failed", K(ret)); + } else if (OB_FAIL(init_rescan_tasks_info())) { + LOG_WARN("init rescan tasks info failed", K(ret)); } else { if (!MY_SPEC.full_partition_wise()) { if (OB_FAIL(get_gi_task_consumer_node(this, real_child))) { @@ -692,7 +764,9 @@ int ObGranuleIteratorOp::do_get_next_granule_task(bool &partition_pruning) if (OB_SUCC(ret)) { if (is_rescan_) { if (OB_FAIL(fetch_rescan_pw_task_infos(*op_ids_pointer, gi_prepare_map, gi_task_infos))) { - LOG_WARN("fail to fetch rescan pw task infos", K(ret)); + if (OB_ITER_END != ret) { + LOG_WARN("fail to fetch rescan pw task infos", K(ret)); + } } } else if (OB_FAIL(fetch_normal_pw_task_infos(*op_ids_pointer, gi_prepare_map, gi_task_infos))) { if (OB_ITER_END != ret) { @@ -1047,28 +1121,11 @@ int ObGranuleIteratorOp::fetch_rescan_pw_task_infos(const common::ObIArray= op_ids.count())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected repart pruning tsc index", K(ret), K(repart_idx)); - } - do { - if (rescan_task_idx_ >= pwj_rescan_task_infos_.count()) { - ret = OB_ITER_END; - state_ = GI_END; - all_task_fetched_ = true; - } else if (OB_UNLIKELY(rescan_task_idx_ + repart_idx >= pwj_rescan_task_infos_.count())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected pwj_rescan_task_infos_ count", K(ret), K(rescan_task_idx_), - K(repart_idx), K(pwj_rescan_task_infos_.count()), K(op_ids)); - } else { - partition_pruned = repart_partition_pruned(pwj_rescan_task_infos_.at(rescan_task_idx_ + repart_idx)); - if (partition_pruned) { - rescan_task_idx_ += op_ids.count(); - } + if (OB_FAIL(pw_get_next_task_pos(op_ids))) { + if (OB_ITER_END != ret) { + LOG_WARN("pw get next task pos failed", K(ret)); } - } while (OB_SUCC(ret) && partition_pruned); + } } ARRAY_FOREACH_X(op_ids, idx, cnt, OB_SUCC(ret)) { // GI在向Map中塞任务的时候,需要尝试清理上一次塞入的任务 @@ -1114,6 +1171,20 @@ int ObGranuleIteratorOp::fetch_normal_pw_task_infos(const common::ObIArray= gi_task_infos.count())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected gi task infos count", K(repart_idx), K(gi_task_infos.count()), + K(gi_task_infos)); + } else if (OB_FAIL(rescan_tasks_info_.rescan_tasks_map_.set_refactored( + gi_task_infos.at(repart_idx).tablet_loc_->tablet_id_.id(), pwj_rescan_task_infos_.count()))) { + LOG_WARN("set refactored failed", K(ret), KPC(gi_task_infos.at(repart_idx).tablet_loc_)); + } else { + LOG_TRACE("set pw rescan task pos", K(spec_.id_), K(repart_idx), K(pwj_rescan_task_infos_.count()), + K(gi_task_infos.at(repart_idx).tablet_loc_->tablet_id_)); + } + } for (int i = 0; i < gi_task_infos.count() && OB_SUCC(ret); ++i) { if (OB_FAIL(pwj_rescan_task_infos_.push_back(gi_task_infos.at(i)))) { LOG_WARN("fail to rescan pwj task info", K(ret)); @@ -1415,5 +1486,44 @@ int ObGranuleIteratorOp::do_parallel_runtime_filter_extract_query_range( return ret; } +int ObGranuleIteratorOp::RescanTasksInfo::insert_rescan_task(int64_t pos, const ObGranuleTaskInfo &info) +{ + int ret = OB_SUCCESS; + if (use_opt_) { + ret = rescan_tasks_map_.set_refactored(info.tablet_loc_->tablet_id_.id(), pos); + } else { + ret = rescan_tasks_pos_.push_back(pos); + } + return ret; +} + +int ObGranuleIteratorOp::init_rescan_tasks_info() +{ + int ret = OB_SUCCESS; + rescan_tasks_info_.use_opt_ = ObGranuleUtil::enable_partition_pruning(MY_SPEC.gi_attri_flag_); + if (!rescan_tasks_info_.use_opt_) { + } else if (OB_ISNULL(pump_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("null pump_", K(ret), K(spec_.id_)); + } else if (OB_UNLIKELY(parallelism_ <= 0 || pump_->get_pump_args().count() < 1 + || pump_->get_pump_args().at(0).tablet_arrays_.count() < 1)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("unexpected argument", K(parallelism_), K(pump_->get_pump_args())); + } else { + int64_t tablet_cnt = pump_->get_pump_args().at(0).tablet_arrays_.count(); + if (tablet_cnt < parallelism_) { + // no use optimization if parallelism_ is greater than tablet count. + rescan_tasks_info_.use_opt_ = false; + } else { + const ObMemAttr attr(MTL_ID(), "GIRescanTaskMap"); + int64_t bucket_num = tablet_cnt / parallelism_ * 2; + if (OB_FAIL(rescan_tasks_info_.rescan_tasks_map_.create(bucket_num, attr))) { + LOG_WARN("init map failed", K(ret)); + } + } + } + return ret; +} + } // end namespace sql } // end namespace oceanbase diff --git a/src/sql/engine/px/ob_granule_iterator_op.h b/src/sql/engine/px/ob_granule_iterator_op.h index 57cab5c877..f1c03784d1 100644 --- a/src/sql/engine/px/ob_granule_iterator_op.h +++ b/src/sql/engine/px/ob_granule_iterator_op.h @@ -151,6 +151,27 @@ private: GI_GET_NEXT_GRANULE_TASK, GI_END, }; + class RescanTasksInfo + { + public: + RescanTasksInfo() : use_opt_(false) {} + void reset() { + rescan_tasks_pos_.reset(); + rescan_tasks_map_.clear(); + } + void destroy() { + rescan_tasks_pos_.reset(); + rescan_tasks_map_.destroy(); + } + int insert_rescan_task(int64_t pos, const ObGranuleTaskInfo &info); + // use opt means partition_pruning is enabled and pos of task of each tablet is recorded in rescan_tasks_map_. + bool use_opt_; + common::ObSEArray rescan_tasks_pos_; + // key is tablet_id, value is + // 1.non-pw: pos. call ObGITaskSet::get_task_at_pos(pos) to get ObGranuleTaskInfo. + // 2.pw: rescan_task_idx_. pwj_rescan_task_infos_[rescan_task_idx_] to get ObGranuleTaskInfo. + hash::ObHashMap rescan_tasks_map_; + }; public: ObGranuleIteratorOp(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOpInput *input); ~ObGranuleIteratorOp() {} @@ -175,6 +196,8 @@ private: // 非full partition wise获得task的方式 // TODO: jiangting.lk 重构下函数名字 int try_fetch_task(ObGranuleTaskInfo &info); + int get_next_task_pos(int64_t &pos, const ObGITaskSet *&taskset); + int pw_get_next_task_pos(const common::ObIArray &op_ids); /** * @brief * full partition wise的模式下,通过op ids获得对应的task infos @@ -222,6 +245,7 @@ private: int wait_partition_runtime_filter_ready(bool &partition_pruning); int do_join_filter_partition_pruning(int64_t tablet_id, bool &partition_pruning); int try_build_tablet2part_id_map(); + int init_rescan_tasks_info(); //---end---- // for runtime filter extract query_range @@ -245,7 +269,7 @@ private: bool all_task_fetched_; bool is_rescan_; const ObGITaskSet *rescan_taskset_ = NULL; - common::ObSEArray rescan_tasks_; + RescanTasksInfo rescan_tasks_info_; int64_t rescan_task_idx_; // full pwj场景下, 在执行过程中缓存住了自己的任务队列. // 供GI rescan使用 diff --git a/src/sql/engine/px/ob_granule_pump.cpp b/src/sql/engine/px/ob_granule_pump.cpp index c518ca967b..bc99b5022c 100644 --- a/src/sql/engine/px/ob_granule_pump.cpp +++ b/src/sql/engine/px/ob_granule_pump.cpp @@ -64,6 +64,19 @@ int ObGITaskSet::get_task_at_pos(ObGranuleTaskInfo &info, const int64_t &pos) co return ret; } +int ObGITaskSet::get_task_tablet_id_at_pos(const int64_t &pos, uint64_t &tablet_id) const +{ + int ret = OB_SUCCESS; + if (pos < 0 || pos >= gi_task_set_.count()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(pos)); + } else { + int64_t cur_idx = gi_task_set_.at(pos).idx_; + tablet_id = gi_task_set_.at(pos).tablet_loc_->tablet_id_.id(); + } + return ret; +} + int ObGITaskSet::get_next_gi_task_pos(int64_t &pos) { int ret = OB_SUCCESS; @@ -439,7 +452,9 @@ int ObGranulePump::fetch_pw_granule_by_worker_id(ObIArray &in if (OB_FAIL(ret)) { } else if (OB_FAIL(check_pw_end(end_tsc_count, op_ids.count(), infos.count()))) { - LOG_WARN("incorrect state", K(ret)); + if (OB_ITER_END != ret) { + LOG_WARN("incorrect state", K(ret)); + } } LOG_TRACE("get a new partition wise join gi tasks", K(infos), K(ret)); return ret; diff --git a/src/sql/engine/px/ob_granule_pump.h b/src/sql/engine/px/ob_granule_pump.h index c49fd9a10b..ed00c51329 100644 --- a/src/sql/engine/px/ob_granule_pump.h +++ b/src/sql/engine/px/ob_granule_pump.h @@ -153,6 +153,8 @@ public: ObGITaskSet() : gi_task_set_(), cur_pos_(0) {} TO_STRING_KV(K(gi_task_set_), K(cur_pos_)); int get_task_at_pos(ObGranuleTaskInfo &info, const int64_t &pos) const; + int get_task_tablet_id_at_pos(const int64_t &pos, uint64_t &tablet_id) const; + int get_next_gi_task_pos(int64_t &pos); int get_next_gi_task(ObGranuleTaskInfo &info); int assign(const ObGITaskSet &other);