[CP] fix batch rescan report -4016

This commit is contained in:
leslieyuchen
2023-10-18 12:39:28 +00:00
committed by ob-robot
parent b4ef37cba2
commit bcd29d1bc9
2 changed files with 113 additions and 61 deletions

View File

@ -604,10 +604,6 @@ ObTableScanOp::ObTableScanOp(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOp
column_checksum_(),
scan_task_id_(0),
report_checksum_(false),
range_buffers_(NULL),
range_buffer_idx_(0),
group_size_(0),
max_group_size_(0),
in_rescan_(false),
global_index_lookup_op_(NULL),
spat_index_()
@ -780,8 +776,8 @@ int ObTableScanOp::prepare_all_das_tasks()
if (MY_SPEC.batch_scan_flag_) {
if (OB_SUCC(ret)) {
if (!tsc_rtdef_.bnlj_params_.empty()) {
group_size_ = tsc_rtdef_.bnlj_params_.at(0).second->count_;
if (OB_UNLIKELY(group_size_ > max_group_size_)) {
tsc_rtdef_.group_size_ = tsc_rtdef_.bnlj_params_.at(0).gr_param_->count_;
if (OB_UNLIKELY(tsc_rtdef_.group_size_ > tsc_rtdef_.max_group_size_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("The amount of data exceeds the pre allocated memory", K(ret));
}
@ -797,12 +793,10 @@ int ObTableScanOp::prepare_all_das_tasks()
LOG_WARN("prepare das task failed", K(ret));
}
} else {
int64_t group_size = MY_SPEC.batch_scan_flag_ ? group_size_ : 1;
int64_t group_size = MY_SPEC.batch_scan_flag_ ? tsc_rtdef_.group_size_ : 1;
GroupRescanParamGuard grp_guard(tsc_rtdef_, GET_PHY_PLAN_CTX(ctx_)->get_param_store_for_update());
for (int64_t i = 0; OB_SUCC(ret) && i < group_size; ++i) {
if (MY_SPEC.batch_scan_flag_) {
replace_bnlj_param(i);
range_buffer_idx_ = i;
}
grp_guard.switch_group_rescan_param(i);
if (OB_FAIL(prepare_single_scan_range(i))) {
LOG_WARN("prepare single scan range failed", K(ret));
} else if (OB_FAIL(prepare_das_task())) {
@ -815,8 +809,8 @@ int ObTableScanOp::prepare_all_das_tasks()
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(init_das_group_range(0, group_size_))) {
LOG_WARN("set group range failed", K(ret), K_(group_size));
if (OB_FAIL(init_das_group_range(0, tsc_rtdef_.group_size_))) {
LOG_WARN("set group range failed", K(ret), K_(tsc_rtdef_.group_size));
}
}
return ret;
@ -1023,8 +1017,8 @@ int ObTableScanOp::prepare_batch_scan_range()
int64_t batch_size = 0;
if (OB_SUCC(ret)) {
if (!tsc_rtdef_.bnlj_params_.empty()) {
group_size_ = tsc_rtdef_.bnlj_params_.at(0).second->count_;
if (OB_UNLIKELY(group_size_ > max_group_size_)) {
tsc_rtdef_.group_size_ = tsc_rtdef_.bnlj_params_.at(0).gr_param_->count_;
if (OB_UNLIKELY(tsc_rtdef_.group_size_ > tsc_rtdef_.max_group_size_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("The amount of data exceeds the pre allocated memory", K(ret));
}
@ -1033,10 +1027,10 @@ int ObTableScanOp::prepare_batch_scan_range()
LOG_WARN("batch nlj params is empry", K(ret));
}
}
for (int64_t i = 0; OB_SUCC(ret) && i < group_size_; ++i) {
GroupRescanParamGuard grp_guard(tsc_rtdef_, GET_PHY_PLAN_CTX(ctx_)->get_param_store_for_update());
for (int64_t i = 0; OB_SUCC(ret) && i < tsc_rtdef_.group_size_; ++i) {
//replace real param to param store to extract scan range
replace_bnlj_param(i);
range_buffer_idx_ = i;
grp_guard.switch_group_rescan_param(i);
LOG_DEBUG("replace bnlj param to extract range", K(plan_ctx->get_param_store()));
if (OB_FAIL(prepare_single_scan_range(i))) {
LOG_WARN("prepare single scan range failed", K(ret));
@ -1060,7 +1054,7 @@ int ObTableScanOp::build_bnlj_params()
const ObObjParam &bnlj_param = plan_ctx->get_param_store().at(param_idx);
if (bnlj_param.is_ext_sql_array()) {
ObSqlArrayObj *array_obj = reinterpret_cast<ObSqlArrayObj*>(bnlj_param.get_ext());
OZ(tsc_rtdef_.bnlj_params_.push_back(BNLJParamInfo(param_idx, array_obj)));
OZ(tsc_rtdef_.bnlj_params_.push_back(GroupRescanParamInfo(param_idx, array_obj)));
}
}
if (OB_SUCC(ret) && tsc_rtdef_.bnlj_params_.empty()) {
@ -1086,8 +1080,8 @@ int ObTableScanOp::prepare_single_scan_range(int64_t group_idx)
} else if (is_same_type && MY_CTDEF.pre_query_range_.get_is_equal_and()) {
int64_t column_count = MY_CTDEF.pre_query_range_.get_column_count();
size_t range_size = sizeof(ObNewRange) + sizeof(ObObj) * column_count * 2;
void *range_buffers = static_cast<char*>(range_buffers_) + range_buffer_idx_ * range_size;
if (range_buffer_idx_ < 0 || range_buffer_idx_ >= max_group_size_) {
void *range_buffers = static_cast<char*>(tsc_rtdef_.range_buffers_) + tsc_rtdef_.range_buffer_idx_ * range_size;
if (tsc_rtdef_.range_buffer_idx_ < 0 || tsc_rtdef_.range_buffer_idx_ >= tsc_rtdef_.max_group_size_) {
ret = OB_ERROR_OUT_OF_RANGE;
LOG_WARN("get wrong offset of range_buffers_", K(ret));
} else if (OB_FAIL(ObSQLUtils::extract_equal_pre_query_range(
@ -1201,19 +1195,6 @@ int ObTableScanOp::single_equal_scan_check_type(const ParamStore &param_store, b
return ret;
}
void ObTableScanOp::replace_bnlj_param(int64_t batch_idx)
{
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_);
//replace real param to param store to extract scan range
for (int64_t i = 0; i < tsc_rtdef_.bnlj_params_.count(); ++i) {
ObSqlArrayObj *array_obj = tsc_rtdef_.bnlj_params_.at(i).second;
int64_t param_idx = tsc_rtdef_.bnlj_params_.at(i).first;
ObObjParam &dst_param = plan_ctx->get_param_store_for_update().at(param_idx);
dst_param = array_obj->data_[batch_idx];
dst_param.set_param_meta();
}
}
int ObTableScanOp::init_converter()
{
int ret = OB_SUCCESS;
@ -1302,23 +1283,23 @@ int ObTableScanOp::inner_open()
if (OB_SUCC(ret)) {
// here need add plan batch_size, because in vectorized execution,
// left batch may greater than OB_MAX_BULK_JOIN_ROWS
max_group_size_ = OB_MAX_BULK_JOIN_ROWS + MY_SPEC.plan_->get_batch_size();
tsc_rtdef_.max_group_size_ = OB_MAX_BULK_JOIN_ROWS + MY_SPEC.plan_->get_batch_size();
if (MY_CTDEF.pre_query_range_.get_is_equal_and()) {
int64_t column_count = MY_CTDEF.pre_query_range_.get_column_count();
size_t range_size = sizeof(ObNewRange) + sizeof(ObObj) * column_count * 2;
if (!MY_SPEC.batch_scan_flag_) {
range_buffers_ = ctx_.get_allocator().alloc(range_size);
tsc_rtdef_.range_buffers_ = ctx_.get_allocator().alloc(range_size);
} else {
range_buffers_ = ctx_.get_allocator().alloc(max_group_size_ * range_size);
tsc_rtdef_.range_buffers_ = ctx_.get_allocator().alloc(tsc_rtdef_.max_group_size_ * range_size);
}
if (OB_ISNULL(range_buffers_)) {
if (OB_ISNULL(tsc_rtdef_.range_buffers_)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret), K(range_size), K(range_buffers_));
LOG_WARN("allocate memory failed", K(ret), K(range_size), K(tsc_rtdef_.range_buffers_));
} else if (!MY_SPEC.batch_scan_flag_) {
ObNewRange *key_range = new(range_buffers_) ObNewRange();
ObNewRange *key_range = new(tsc_rtdef_.range_buffers_) ObNewRange();
} else {
for (int64_t i = 0; i < max_group_size_; ++i) {
char *range_buffers_off = static_cast<char*>(range_buffers_) + i * range_size;
for (int64_t i = 0; i < tsc_rtdef_.max_group_size_; ++i) {
char *range_buffers_off = static_cast<char*>(tsc_rtdef_.range_buffers_) + i * range_size;
ObNewRange *key_range = new(range_buffers_off) ObNewRange();
}
}
@ -1546,7 +1527,7 @@ int ObTableScanOp::inner_rescan_for_tsc()
// Therefore, we need to get and save bnlj parameters here or they will be
// replaced by NLJ.
LOG_WARN("build batch nlj params failed", KR(ret));
} else if (!need_fetch_batch_result()) {
} else if (!need_real_rescan()) {
ret = set_batch_iter(ctx_.get_das_ctx().jump_read_group_id_);
} else {
if (is_virtual_table(MY_SPEC.ref_table_id_)
@ -1614,8 +1595,8 @@ int ObTableScanOp::local_iter_rescan()
if (OB_SUCC(ret)) {
if (OB_FAIL(cherry_pick_range_by_tablet_id(scan_op))) {
LOG_WARN("prune query range by partition id failed", K(ret));
} else if (OB_FAIL(init_das_group_range(0, group_size_))) {
LOG_WARN("set group range failed", K(ret), K_(group_size));
} else if (OB_FAIL(init_das_group_range(0, tsc_rtdef_.group_size_))) {
LOG_WARN("set group range failed", K(ret), K_(tsc_rtdef_.group_size));
} else if (OB_FAIL(MTL(ObDataAccessService*)->rescan_das_task(das_ref_, *scan_op))) {
LOG_WARN("rescan das task failed", K(ret));
}
@ -1666,14 +1647,14 @@ int ObTableScanOp::switch_iterator()
return OB_NOT_SUPPORTED;
}
bool ObTableScanOp::need_fetch_batch_result()
bool ObTableScanOp::need_real_rescan()
{
bool bret = false;
if (tsc_rtdef_.bnlj_params_.empty()) {
if (!MY_SPEC.batch_scan_flag_) {
bret = true;
} else {
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_);
int64_t param_idx = tsc_rtdef_.bnlj_params_.at(0).first;
int64_t param_idx = tsc_rtdef_.bnlj_params_.at(0).param_idx_;
//param store has been inited by nlj, to fetch next batch result
bret = plan_ctx->get_param_store().at(param_idx).is_ext_sql_array();
}
@ -2236,7 +2217,7 @@ int ObTableScanOp::cherry_pick_range_by_tablet_id(ObDASScanOp *scan_op)
}
}
if (OB_SUCC(ret)) {
LOG_DEBUG("range after pruning", K(input_ranges), K(scan_ranges), K_(group_size),
LOG_DEBUG("range after pruning", K(input_ranges), K(scan_ranges), K_(tsc_rtdef_.group_size),
"tablet_id", scan_op->get_tablet_id(),
K(input_ss_ranges), K(ss_ranges));
}