[CP] optimize performance of gi partition pruning in pkey plan

This commit is contained in:
sdc
2024-02-27 09:15:17 +00:00
committed by ob-robot
parent 7339c60cb9
commit 86e6b53016
4 changed files with 205 additions and 54 deletions

View File

@ -191,7 +191,7 @@ ObGranuleIteratorOp::ObGranuleIteratorOp(ObExecContext &exec_ctx, const ObOpSpec
void ObGranuleIteratorOp::destroy()
{
rescan_tasks_.reset();
rescan_tasks_info_.destroy();
pwj_rescan_task_infos_.reset();
table_location_keys_.reset();
pruning_partition_ids_.reset();
@ -226,14 +226,14 @@ int ObGranuleIteratorOp::try_pruning_repart_partition(
bool &partition_pruned)
{
int ret = OB_SUCCESS;
ObGranuleTaskInfo info;
uint64_t tablet_id = OB_INVALID_ID;
if (OB_INVALID_ID == ctx_.get_gi_pruning_info().get_part_id()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pruning id is not set", K(ret));
} else if (OB_FAIL(taskset.get_task_at_pos(info, pos))) {
} else if (OB_FAIL(taskset.get_task_tablet_id_at_pos(pos, tablet_id))) {
LOG_WARN("get task info failed", K(ret));
} else {
partition_pruned = repart_partition_pruned(info);
partition_pruned = tablet_id != ctx_.get_gi_pruning_info().get_part_id();
}
return ret;
}
@ -241,8 +241,8 @@ int ObGranuleIteratorOp::try_pruning_repart_partition(
// 逻辑说明:
// 对于 NLJ rescan 右表场景,它分为两步:
// 1. 开始扫描之前,is_rescan_ = false, 会反复调用 try_fetch_task
// 把所有分区都填到 rescan_tasks_ 里, 然后会设置 is_rescan_ = true
// 2. 开始扫描后,对于左边来的每一行,都会反复从 rescan_tasks_ 选择合适的 task 来做
// 把所有分区都填到 rescan_tasks_pos_ 里, 然后会设置 is_rescan_ = true
// 2. 开始扫描后,对于左边来的每一行,都会反复从 rescan_tasks_pos_ 选择合适的 task 来做
// 扫描。之所以引入 partition pruning 是为了处理 NLJ 右表是分区表场景
// 下,避免扫描无效分区。
int ObGranuleIteratorOp::try_fetch_task(ObGranuleTaskInfo &info)
@ -260,22 +260,14 @@ int ObGranuleIteratorOp::try_fetch_task(ObGranuleTaskInfo &info)
LOG_WARN("the pump can not be null", K(ret));
} else {
if (is_rescan_) {
bool partition_pruned = false;
do {
partition_pruned = false;
if (rescan_task_idx_ >= rescan_tasks_.count()) {
ret = OB_ITER_END;
ret = get_next_task_pos(pos, taskset);
if (OB_SUCC(ret)) {
if (OB_FAIL(taskset->get_task_at_pos(info, pos))) {
LOG_WARN("get task info failed", K(ret));
} else {
taskset = rescan_taskset_;
pos = rescan_tasks_.at(rescan_task_idx_++);
if (ObGranuleUtil::enable_partition_pruning(MY_SPEC.gi_attri_flag_)
&& OB_FAIL(try_pruning_repart_partition(*taskset, pos, partition_pruned))) {
LOG_WARN("fail try prune partition", K(ret));
} else if (partition_pruned) {
// next task
}
info.task_id_ = worker_id_;
}
} while (OB_SUCC(ret) && partition_pruned);
}
} else {
const bool from_share_pool = !MY_SPEC.affinitize_ && !MY_SPEC.access_all_;
if (OB_FAIL(gi_task_pump->fetch_granule_task(taskset,
@ -291,8 +283,11 @@ int ObGranuleIteratorOp::try_fetch_task(ObGranuleTaskInfo &info)
} else if (NULL == taskset) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL taskset returned", K(ret));
} else if (OB_FAIL(rescan_tasks_.push_back(pos))) {
LOG_WARN("array push back failed", K(ret));
} else if (OB_FAIL(taskset->get_task_at_pos(info, pos))) {
LOG_WARN("get task info failed", K(ret));
} else if (FALSE_IT(info.task_id_ = worker_id_)) {
} else if (OB_FAIL(rescan_tasks_info_.insert_rescan_task(pos, info))) {
LOG_WARN("array push back failed", K(ret), K(info));
} else {
if (NULL == rescan_taskset_) {
rescan_taskset_ = taskset;
@ -303,16 +298,91 @@ int ObGranuleIteratorOp::try_fetch_task(ObGranuleTaskInfo &info)
}
}
}
if(OB_FAIL(ret)) {
} else if (OB_FAIL(taskset->get_task_at_pos(info, pos))) {
LOG_WARN("get task info failed", K(ret));
} else {
info.task_id_ = worker_id_;
}
return ret;
}
//GI has its own rescan
int ObGranuleIteratorOp::get_next_task_pos(int64_t &pos, const ObGITaskSet *&taskset)
{
int ret = OB_SUCCESS;
if (rescan_tasks_info_.use_opt_) {
taskset = rescan_taskset_;
if (rescan_task_idx_ > 0) {
ret = OB_ITER_END;
} else if (OB_FAIL(rescan_tasks_info_.rescan_tasks_map_.get_refactored(
ctx_.get_gi_pruning_info().get_part_id(), pos))) {
if (OB_HASH_NOT_EXIST == ret) {
ret = OB_ITER_END;
} else {
LOG_WARN("get tablet task pos failed", K(ret));
}
} else {
rescan_task_idx_++;
}
} else {
bool partition_pruned = false;
do {
partition_pruned = false;
if (rescan_task_idx_ >= rescan_tasks_info_.rescan_tasks_pos_.count()) {
ret = OB_ITER_END;
} else {
taskset = rescan_taskset_;
pos = rescan_tasks_info_.rescan_tasks_pos_.at(rescan_task_idx_++);
if (ObGranuleUtil::enable_partition_pruning(MY_SPEC.gi_attri_flag_)
&& OB_FAIL(try_pruning_repart_partition(*taskset, pos, partition_pruned))) {
LOG_WARN("fail try prune partition", K(ret));
} else if (partition_pruned) {
// next task
}
}
} while (OB_SUCC(ret) && partition_pruned);
}
return ret;
}
int ObGranuleIteratorOp::pw_get_next_task_pos(const common::ObIArray<int64_t> &op_ids)
{
int ret = OB_SUCCESS;
if (rescan_tasks_info_.use_opt_) {
if (rescan_task_idx_ > 0) {
ret = OB_ITER_END;
state_ = GI_END;
all_task_fetched_ = true;
} else if (OB_FAIL(rescan_tasks_info_.rescan_tasks_map_.get_refactored(
ctx_.get_gi_pruning_info().get_part_id(), rescan_task_idx_))) {
if (OB_HASH_NOT_EXIST == ret) {
ret = OB_ITER_END;
} else {
LOG_WARN("get tablet task pos failed", K(ret), K(ctx_.get_gi_pruning_info().get_part_id()));
}
}
} else {
bool partition_pruned = false;
int64_t repart_idx = MY_SPEC.repart_pruning_tsc_idx_;
if (OB_UNLIKELY(repart_idx < 0 || repart_idx >= op_ids.count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected repart pruning tsc index", K(ret), K(repart_idx));
}
do {
if (rescan_task_idx_ >= pwj_rescan_task_infos_.count()) {
ret = OB_ITER_END;
state_ = GI_END;
all_task_fetched_ = true;
} else if (OB_UNLIKELY(rescan_task_idx_ + repart_idx >= pwj_rescan_task_infos_.count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected pwj_rescan_task_infos_ count", K(ret), K(rescan_task_idx_),
K(repart_idx), K(pwj_rescan_task_infos_.count()), K(op_ids));
} else {
partition_pruned = repart_partition_pruned(pwj_rescan_task_infos_.at(rescan_task_idx_ + repart_idx));
if (partition_pruned) {
rescan_task_idx_ += op_ids.count();
}
}
} while (OB_SUCC(ret) && partition_pruned);
}
return ret;
}
int ObGranuleIteratorOp::rescan()
{
int ret = ObOperator::inner_rescan();
@ -325,7 +395,7 @@ int ObGranuleIteratorOp::rescan()
pump_version_ = pump_->get_pump_version();
is_rescan_ = false;
rescan_taskset_ = NULL;
rescan_tasks_.reset();
rescan_tasks_info_.reset();
all_task_fetched_ = false;
pwj_rescan_task_infos_.reset();
pruning_partition_ids_.reset();
@ -387,6 +457,8 @@ int ObGranuleIteratorOp::inner_open()
ObOperator *real_child = nullptr;
if (OB_FAIL(parameters_init())) {
LOG_WARN("parameters init failed", K(ret));
} else if (OB_FAIL(init_rescan_tasks_info())) {
LOG_WARN("init rescan tasks info failed", K(ret));
} else {
if (!MY_SPEC.full_partition_wise()) {
if (OB_FAIL(get_gi_task_consumer_node(this, real_child))) {
@ -692,7 +764,9 @@ int ObGranuleIteratorOp::do_get_next_granule_task(bool &partition_pruning)
if (OB_SUCC(ret)) {
if (is_rescan_) {
if (OB_FAIL(fetch_rescan_pw_task_infos(*op_ids_pointer, gi_prepare_map, gi_task_infos))) {
LOG_WARN("fail to fetch rescan pw task infos", K(ret));
if (OB_ITER_END != ret) {
LOG_WARN("fail to fetch rescan pw task infos", K(ret));
}
}
} else if (OB_FAIL(fetch_normal_pw_task_infos(*op_ids_pointer, gi_prepare_map, gi_task_infos))) {
if (OB_ITER_END != ret) {
@ -1047,28 +1121,11 @@ int ObGranuleIteratorOp::fetch_rescan_pw_task_infos(const common::ObIArray<int64
all_task_fetched_ = true;
} else {
if (ObGranuleUtil::enable_partition_pruning(MY_SPEC.gi_attri_flag_)) {
bool partition_pruned = false;
int64_t repart_idx = MY_SPEC.repart_pruning_tsc_idx_;
if (OB_UNLIKELY(repart_idx < 0 || repart_idx >= op_ids.count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected repart pruning tsc index", K(ret), K(repart_idx));
}
do {
if (rescan_task_idx_ >= pwj_rescan_task_infos_.count()) {
ret = OB_ITER_END;
state_ = GI_END;
all_task_fetched_ = true;
} else if (OB_UNLIKELY(rescan_task_idx_ + repart_idx >= pwj_rescan_task_infos_.count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected pwj_rescan_task_infos_ count", K(ret), K(rescan_task_idx_),
K(repart_idx), K(pwj_rescan_task_infos_.count()), K(op_ids));
} else {
partition_pruned = repart_partition_pruned(pwj_rescan_task_infos_.at(rescan_task_idx_ + repart_idx));
if (partition_pruned) {
rescan_task_idx_ += op_ids.count();
}
if (OB_FAIL(pw_get_next_task_pos(op_ids))) {
if (OB_ITER_END != ret) {
LOG_WARN("pw get next task pos failed", K(ret));
}
} while (OB_SUCC(ret) && partition_pruned);
}
}
ARRAY_FOREACH_X(op_ids, idx, cnt, OB_SUCC(ret)) {
// GI在向Map中塞任务的时候,需要尝试清理上一次塞入的任务
@ -1114,6 +1171,20 @@ int ObGranuleIteratorOp::fetch_normal_pw_task_infos(const common::ObIArray<int64
all_task_fetched_ = true;
}
} else {
if (rescan_tasks_info_.use_opt_) {
int64_t repart_idx = MY_SPEC.repart_pruning_tsc_idx_;
if (OB_UNLIKELY(repart_idx >= gi_task_infos.count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected gi task infos count", K(repart_idx), K(gi_task_infos.count()),
K(gi_task_infos));
} else if (OB_FAIL(rescan_tasks_info_.rescan_tasks_map_.set_refactored(
gi_task_infos.at(repart_idx).tablet_loc_->tablet_id_.id(), pwj_rescan_task_infos_.count()))) {
LOG_WARN("set refactored failed", K(ret), KPC(gi_task_infos.at(repart_idx).tablet_loc_));
} else {
LOG_TRACE("set pw rescan task pos", K(spec_.id_), K(repart_idx), K(pwj_rescan_task_infos_.count()),
K(gi_task_infos.at(repart_idx).tablet_loc_->tablet_id_));
}
}
for (int i = 0; i < gi_task_infos.count() && OB_SUCC(ret); ++i) {
if (OB_FAIL(pwj_rescan_task_infos_.push_back(gi_task_infos.at(i)))) {
LOG_WARN("fail to rescan pwj task info", K(ret));
@ -1415,5 +1486,44 @@ int ObGranuleIteratorOp::do_parallel_runtime_filter_extract_query_range(
return ret;
}
int ObGranuleIteratorOp::RescanTasksInfo::insert_rescan_task(int64_t pos, const ObGranuleTaskInfo &info)
{
int ret = OB_SUCCESS;
if (use_opt_) {
ret = rescan_tasks_map_.set_refactored(info.tablet_loc_->tablet_id_.id(), pos);
} else {
ret = rescan_tasks_pos_.push_back(pos);
}
return ret;
}
int ObGranuleIteratorOp::init_rescan_tasks_info()
{
int ret = OB_SUCCESS;
rescan_tasks_info_.use_opt_ = ObGranuleUtil::enable_partition_pruning(MY_SPEC.gi_attri_flag_);
if (!rescan_tasks_info_.use_opt_) {
} else if (OB_ISNULL(pump_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null pump_", K(ret), K(spec_.id_));
} else if (OB_UNLIKELY(parallelism_ <= 0 || pump_->get_pump_args().count() < 1
|| pump_->get_pump_args().at(0).tablet_arrays_.count() < 1)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("unexpected argument", K(parallelism_), K(pump_->get_pump_args()));
} else {
int64_t tablet_cnt = pump_->get_pump_args().at(0).tablet_arrays_.count();
if (tablet_cnt < parallelism_) {
// no use optimization if parallelism_ is greater than tablet count.
rescan_tasks_info_.use_opt_ = false;
} else {
const ObMemAttr attr(MTL_ID(), "GIRescanTaskMap");
int64_t bucket_num = tablet_cnt / parallelism_ * 2;
if (OB_FAIL(rescan_tasks_info_.rescan_tasks_map_.create(bucket_num, attr))) {
LOG_WARN("init map failed", K(ret));
}
}
}
return ret;
}
} // end namespace sql
} // end namespace oceanbase

View File

@ -151,6 +151,27 @@ private:
GI_GET_NEXT_GRANULE_TASK,
GI_END,
};
class RescanTasksInfo
{
public:
RescanTasksInfo() : use_opt_(false) {}
void reset() {
rescan_tasks_pos_.reset();
rescan_tasks_map_.clear();
}
void destroy() {
rescan_tasks_pos_.reset();
rescan_tasks_map_.destroy();
}
int insert_rescan_task(int64_t pos, const ObGranuleTaskInfo &info);
// use opt means partition_pruning is enabled and pos of task of each tablet is recorded in rescan_tasks_map_.
bool use_opt_;
common::ObSEArray<int64_t, OB_MIN_PARALLEL_TASK_COUNT * 2> rescan_tasks_pos_;
// key is tablet_id, value is
// 1.non-pw: pos. call ObGITaskSet::get_task_at_pos(pos) to get ObGranuleTaskInfo.
// 2.pw: rescan_task_idx_. pwj_rescan_task_infos_[rescan_task_idx_] to get ObGranuleTaskInfo.
hash::ObHashMap<uint64_t, int64_t, common::hash::NoPthreadDefendMode> rescan_tasks_map_;
};
public:
ObGranuleIteratorOp(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOpInput *input);
~ObGranuleIteratorOp() {}
@ -175,6 +196,8 @@ private:
// 非full partition wise获得task的方式
// TODO: jiangting.lk 重构下函数名字
int try_fetch_task(ObGranuleTaskInfo &info);
int get_next_task_pos(int64_t &pos, const ObGITaskSet *&taskset);
int pw_get_next_task_pos(const common::ObIArray<int64_t> &op_ids);
/**
* @brief
* full partition wise的模式下,通过op ids获得对应的task infos
@ -222,6 +245,7 @@ private:
int wait_partition_runtime_filter_ready(bool &partition_pruning);
int do_join_filter_partition_pruning(int64_t tablet_id, bool &partition_pruning);
int try_build_tablet2part_id_map();
int init_rescan_tasks_info();
//---end----
// for runtime filter extract query_range
@ -245,7 +269,7 @@ private:
bool all_task_fetched_;
bool is_rescan_;
const ObGITaskSet *rescan_taskset_ = NULL;
common::ObSEArray<int64_t, OB_MIN_PARALLEL_TASK_COUNT * 2> rescan_tasks_;
RescanTasksInfo rescan_tasks_info_;
int64_t rescan_task_idx_;
// full pwj场景下, 在执行过程中缓存住了自己的任务队列.
// 供GI rescan使用

View File

@ -64,6 +64,19 @@ int ObGITaskSet::get_task_at_pos(ObGranuleTaskInfo &info, const int64_t &pos) co
return ret;
}
int ObGITaskSet::get_task_tablet_id_at_pos(const int64_t &pos, uint64_t &tablet_id) const
{
int ret = OB_SUCCESS;
if (pos < 0 || pos >= gi_task_set_.count()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(pos));
} else {
int64_t cur_idx = gi_task_set_.at(pos).idx_;
tablet_id = gi_task_set_.at(pos).tablet_loc_->tablet_id_.id();
}
return ret;
}
int ObGITaskSet::get_next_gi_task_pos(int64_t &pos)
{
int ret = OB_SUCCESS;
@ -439,7 +452,9 @@ int ObGranulePump::fetch_pw_granule_by_worker_id(ObIArray<ObGranuleTaskInfo> &in
if (OB_FAIL(ret)) {
} else if (OB_FAIL(check_pw_end(end_tsc_count, op_ids.count(), infos.count()))) {
LOG_WARN("incorrect state", K(ret));
if (OB_ITER_END != ret) {
LOG_WARN("incorrect state", K(ret));
}
}
LOG_TRACE("get a new partition wise join gi tasks", K(infos), K(ret));
return ret;

View File

@ -153,6 +153,8 @@ public:
ObGITaskSet() : gi_task_set_(), cur_pos_(0) {}
TO_STRING_KV(K(gi_task_set_), K(cur_pos_));
int get_task_at_pos(ObGranuleTaskInfo &info, const int64_t &pos) const;
int get_task_tablet_id_at_pos(const int64_t &pos, uint64_t &tablet_id) const;
int get_next_gi_task_pos(int64_t &pos);
int get_next_gi_task(ObGranuleTaskInfo &info);
int assign(const ObGITaskSet &other);