[CP] fix bug insert_up core with sanity_binary

This commit is contained in:
yishenglanlingzui
2023-08-17 11:46:19 +00:00
committed by ob-robot
parent 16e059ba84
commit bbc291bf78
3 changed files with 102 additions and 95 deletions

View File

@ -5497,12 +5497,7 @@ int ObSql::get_first_batched_multi_stmt(ObPlanCacheCtx &pc_ctx, ObMultiStmtItem
int ret = OB_SUCCESS;
if (pc_ctx.sql_ctx_.is_do_insert_batch_opt()) {
// Restore the original SQL according to the first row of parameters
if (OB_FAIL(ObPlanCache::do_construct_sql(pc_ctx.allocator_,
pc_ctx,
*pc_ctx.insert_batch_opt_info_.multi_raw_params_.at(0),
pc_ctx.insert_batch_opt_info_.insert_params_count_,
pc_ctx.insert_batch_opt_info_.sql_delta_length_,
pc_ctx.fp_result_.pc_key_.name_))) {
if (OB_FAIL(ObPlanCache::restore_param_to_truncated_sql(pc_ctx))) {
LOG_WARN("fail to do construct sql",
K(ret), K(pc_ctx.fp_result_.pc_key_.name_), K(pc_ctx.insert_batch_opt_info_.new_reconstruct_sql_));
// if rebuild origin sql fail, this sql would rollback

View File

@ -721,11 +721,18 @@ int ObPlanCache::construct_fast_parser_result(common::ObIAllocator &allocator,
} else if (!can_do_batch_insert) {
LOG_INFO("can't do batch insert optimization", K(raw_sql));
// can't do batch insert
} else if (OB_FAIL(rebuild_raw_params(allocator,
pc_ctx,
fp_result,
batch_count))) {
LOG_WARN("fail to rebuild raw_param", K(ret), K(batch_count));
} else if (pc_ctx.insert_batch_opt_info_.multi_raw_params_.empty()) {
LOG_INFO("unexpected multi_raw_params, can't do batch insert opt, but not need to return error",
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected multi_raw_params, can't do batch insert opt, but not need to return error",
K(batch_count), K(first_truncated_sql), K(pc_ctx.raw_sql_), K(fp_result));
} else if (OB_ISNULL(pc_ctx.insert_batch_opt_info_.multi_raw_params_.at(0))) {
LOG_INFO("unexpected null ptr, can't do batch insert opt, but not need to return error",
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null ptr, can't do batch insert opt, but not need to return error",
K(batch_count), K(first_truncated_sql), K(pc_ctx.raw_sql_), K(fp_result));
} else {
fp_result.raw_params_.reset();
@ -744,42 +751,69 @@ int ObPlanCache::construct_fast_parser_result(common::ObIAllocator &allocator,
return ret;
}
int ObPlanCache::do_construct_sql(common::ObIAllocator &allocator,
ObPlanCacheCtx &pc_ctx,
const ObIArray<ObPCParam *> &raw_params,
int64_t ins_params_count,
int64_t delta_length,
ObString &no_param_sql)
// For insert into t1 values(1,1),(2,2),(3,3); After parameterization,
// the SQL will become insert into t1 values(?,?),(?,?),(?,?);
// After inspection, it is found that insert multi-values ​​batch optimization can be done,
// and the SQL is truncated to insert into t1 values(?,?); If the SQL does not hit the plan from plan_cache,
// hard parsing is required, then insert into t1 values(?,?); Revert to insert into t1 values(1,1);
// This function is to complete the parameter reduction in parameterless SQL,
// so that the original SQL can be used as a parser later
// replace into statement and insert_up statement are same
int ObPlanCache::restore_param_to_truncated_sql(ObPlanCacheCtx &pc_ctx)
{
int ret = OB_SUCCESS;
char *buf = NULL;
int32_t pos = 0;
ObPCParam *pc_param = NULL;
int64_t idx = 0;
const ObIArray<ObPCParam *> *raw_params = nullptr;
int64_t buff_len = pc_ctx.raw_sql_.length();
if (OB_ISNULL(buf = (char *)allocator.alloc(pc_ctx.raw_sql_.length()))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("buff is null", K(ret));
} else if (raw_params.count() < ins_params_count) {
int64_t ins_params_count = pc_ctx.insert_batch_opt_info_.insert_params_count_;
ObString &no_param_sql = pc_ctx.fp_result_.pc_key_.name_;
if (pc_ctx.insert_batch_opt_info_.multi_raw_params_.empty()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected params count", K(ret), K(raw_params.count()), K(ins_params_count));
LOG_WARN("unexpected params count", K(ret), K(pc_ctx.insert_batch_opt_info_.multi_raw_params_.count()));
} else if (OB_ISNULL(raw_params = pc_ctx.insert_batch_opt_info_.multi_raw_params_.at(0))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null ptr", K(ret), K(raw_params));
} else if (OB_ISNULL(buf = (char *)pc_ctx.allocator_.alloc(pc_ctx.raw_sql_.length()))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("buff is null", K(ret), K(pc_ctx.raw_sql_.length()));
} else if (raw_params->count() < ins_params_count) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected params count", K(ret), K(raw_params->count()), K(ins_params_count));
}
for (int64_t i = 0; OB_SUCC(ret) && i < raw_params.count(); i++) {
if (OB_ISNULL(pc_param = raw_params.at(i))) {
for (int64_t i = 0; OB_SUCC(ret) && i < raw_params->count(); i++) {
ObPCParam *pc_param = nullptr;
if (OB_ISNULL(pc_param = raw_params->at(i))) {
ret = OB_ERR_UNEXPECTED;
} else if (i < ins_params_count) {
LOG_WARN("unexpected null ptr", K(ret), K(i), K(raw_params));
} else {
int32_t len = (int32_t)pc_param->node_->pos_ - idx;
LOG_TRACE("print raw_params", K(i), K(buff_len), K(len), K(idx), K(raw_params.at(i)->node_->pos_),
K(ObString(raw_params.at(i)->node_->text_len_, raw_params.at(i)->node_->raw_text_)));
if (len > buff_len - pos) {
LOG_TRACE("print raw_params", K(i), K(buff_len), K(len), K(idx), K(pc_param->node_->pos_),
K(ObString(pc_param->node_->text_len_, pc_param->node_->raw_text_)));
if (len == 0) {
// insert into t1 values(2-1,2-2); becomes insert into t1 values(??,??); after parameterization
// So this scenario len == 0 is needed
if (pc_param->node_->text_len_ > buff_len - pos) {
ret = OB_BUF_NOT_ENOUGH;
} else if (len == 0) {
LOG_WARN("unexpected len", K(ret), K(i), K(buff_len), K(pc_param->node_->text_len_), K(pos), K(no_param_sql));
} else {
MEMCPY(buf + pos, pc_param->node_->raw_text_, pc_param->node_->text_len_);
pos += (int32_t)pc_param->node_->text_len_;
idx = (int32_t)pc_param->node_->pos_ + 1;
}
} else if (len > 0) {
//copy text
if (len > buff_len - pos) {
ret = OB_BUF_NOT_ENOUGH;
LOG_WARN("unexpected len", K(ret), K(i), K(buff_len), K(idx), K(pos), K(no_param_sql));
} else if (pc_param->node_->text_len_ > (buff_len - pos - len)) {
ret = OB_BUF_NOT_ENOUGH;
LOG_WARN("unexpected len", K(ret), K(i), K(buff_len), K(idx), K(pc_param->node_->text_len_), K(pos), K(no_param_sql));
} else {
// copy sql text
// insert into t1 values(?,?);
// first times, it copy 'insert into t1 values('
MEMCPY(buf + pos, no_param_sql.ptr() + idx, len);
idx = (int32_t)pc_param->node_->pos_ + 1;
pos += len;
@ -787,23 +821,6 @@ int ObPlanCache::do_construct_sql(common::ObIAllocator &allocator,
MEMCPY(buf + pos, pc_param->node_->raw_text_, pc_param->node_->text_len_);
pos += (int32_t)pc_param->node_->text_len_;
}
} else {
int32_t len = (int32_t)pc_param->node_->pos_ - delta_length - idx;
LOG_TRACE("print delta_length", K(i), K(pc_param->node_->pos_), K(delta_length), K(idx));
if (len > buff_len - pos) {
ret = OB_BUF_NOT_ENOUGH;
} else if (len == 0) {
MEMCPY(buf + pos, pc_param->node_->raw_text_, pc_param->node_->text_len_);
pos += (int32_t)pc_param->node_->text_len_;
idx = (int32_t)pc_param->node_->pos_ + 1;
} else if (len > 0) {
//copy text
MEMCPY(buf + pos, no_param_sql.ptr() + idx, len);
idx = (int32_t)pc_param->node_->pos_ - delta_length + 1;
pos += len;
//copy raw param
MEMCPY(buf + pos, pc_param->node_->raw_text_, pc_param->node_->text_len_);
pos += (int32_t)pc_param->node_->text_len_;
}
}
}
@ -812,6 +829,7 @@ int ObPlanCache::do_construct_sql(common::ObIAllocator &allocator,
int32_t len = no_param_sql.length() - idx;
if (len > buff_len - pos) {
ret = OB_BUF_NOT_ENOUGH;
LOG_WARN("unexpected len", K(ret), K(buff_len), K(pos), K(idx), K(no_param_sql.length()), K(no_param_sql));
} else if (len > 0) {
MEMCPY(buf + pos, no_param_sql.ptr() + idx, len);
idx += len;
@ -828,19 +846,28 @@ int ObPlanCache::do_construct_sql(common::ObIAllocator &allocator,
return ret;
}
// For insert into t1 values(1,1),(2,2); after finishing fast_parser,
// all the parameters will be extracted,and the parameter array becomes [1, 1, 2, 2]
// But we will truncate the SQL to insert into t1 values(?,?);
// so the parameter array needs to be changed to a two-dimensional matrix like {[1, 1], [2, 2]}
// This function is to complete the conversion. At the same time, for the insert_up statement,
// insert into t1 values(1,1),(2,2) on duplicate key update c1 = 3, c2 = 4;
// After the SQL is truncated, the two parameters in the update part correspond to the pos_ in the truncated SQL
// that needs to be subtracted from the truncated part
int ObPlanCache::rebuild_raw_params(common::ObIAllocator &allocator,
ObPlanCacheCtx &pc_ctx,
ObFastParserResult &fp_result,
int64_t row_count,
int64_t insert_param_count,
int64_t upd_param_count)
int64_t row_count)
{
int ret = OB_SUCCESS;
void *array_ptr = nullptr;
int64_t params_idx = 0;
ObSEArray<ObPCParam *, 8> update_raw_params;
int64_t insert_param_count = pc_ctx.insert_batch_opt_info_.insert_params_count_;
int64_t upd_param_count = pc_ctx.insert_batch_opt_info_.update_params_count_;
int64_t one_row_params_cnt = insert_param_count + upd_param_count;
if ((row_count * insert_param_count + upd_param_count) != fp_result.raw_params_.count()) {
int64_t upd_start_idx = row_count * insert_param_count;
int64_t sql_delta_length = pc_ctx.insert_batch_opt_info_.sql_delta_length_;
if (((row_count * insert_param_count) + upd_param_count) != fp_result.raw_params_.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected raw_params", K(ret),
K(row_count), K(insert_param_count), K(upd_param_count), K(fp_result.raw_params_.count()));
@ -848,17 +875,18 @@ int ObPlanCache::rebuild_raw_params(common::ObIAllocator &allocator,
pc_ctx.insert_batch_opt_info_.multi_raw_params_.set_capacity(row_count);
}
int64_t upd_start_idx = row_count * insert_param_count;
for (int64_t i = upd_start_idx; OB_SUCC(ret) && i < fp_result.raw_params_.count(); i++) {
if (OB_FAIL(update_raw_params.push_back(fp_result.raw_params_.at(i)))) {
LOG_WARN("fail to push back raw_param", K(ret), K(i));
}
}
if (OB_SUCC(ret)) {
if (update_raw_params.count() != upd_param_count) {
// As sql: insert into t1 values(1,1),(2,2) on duplicate key update c1 = 3, c2 = 4;
// After the SQL is truncated, the two parameters in the update part correspond to the pos_ in the truncated SQL
// that needs to be subtracted from the truncated part, sql_delta_length is the length of truncated part.
ObPCParam *pc_param = fp_result.raw_params_.at(i);
if (OB_ISNULL(pc_param)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected update_raw_params count", K(ret), K(upd_param_count), K(upd_start_idx));
LOG_WARN("unexpected null ptr", K(ret), K(i));
} else if (FALSE_IT(pc_param->node_->pos_ = pc_param->node_->pos_ - sql_delta_length)) {
// For the parameters of the update part, pos_ needs to subtract the length of the truncated part
} else if (OB_FAIL(update_raw_params.push_back(pc_param))) {
LOG_WARN("fail to push back raw_param", K(ret), K(i));
}
}
@ -953,31 +981,22 @@ int ObPlanCache::check_can_do_insert_opt(common::ObIAllocator &allocator,
delta_length,
batch_count))) {
LOG_WARN("fail to parser insert string", K(ret), K(fp_result.pc_key_.name_));
} else if (!can_do_batch || ins_params_count <= 0 || batch_count <= 1) {
} else if (!can_do_batch || ins_params_count <= 0) {
can_do_batch = false;
// Only the insert ... values ​​... statement will print this,after trying to do insert batch optimization failure
LOG_INFO("can not do batch insert opt", K(ret), K(can_do_batch), K(upd_params_count),
K(ins_params_count), K(batch_count), K(pc_ctx.raw_sql_));
} else if (batch_count <= 1) {
can_do_batch = false;
} else if (upd_params_count > 0 && delta_length <= 0) {
// Only the insert ... values ​​... on duplicate key update ... statement will print this log
// after trying to do insert batch optimization failure
can_do_batch = false;
LOG_INFO("can not do batch insert opt", K(ret), K(can_do_batch), K(ins_params_count), K(batch_count), K(pc_ctx.raw_sql_));
} else if (OB_FAIL(rebuild_raw_params(allocator,
pc_ctx,
fp_result,
batch_count,
ins_params_count,
upd_params_count))) {
LOG_WARN("fail to rebuild raw_param", K(ret));
} else if (pc_ctx.insert_batch_opt_info_.multi_raw_params_.empty()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected multi_raw_params", K(ret));
} else if (OB_ISNULL(pc_ctx.insert_batch_opt_info_.multi_raw_params_.at(0))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null", K(ret));
} else {
pc_ctx.insert_batch_opt_info_.insert_params_count_ = ins_params_count;
pc_ctx.insert_batch_opt_info_.update_params_count_ = upd_params_count;
pc_ctx.insert_batch_opt_info_.sql_delta_length_ = delta_length;
// do nothing
}
}

View File

@ -246,16 +246,9 @@ public:
static int rebuild_raw_params(common::ObIAllocator &allocator,
ObPlanCacheCtx &pc_ctx,
ObFastParserResult &fp_result,
int64_t row_count,
int64_t insert_param_count,
int64_t upd_param_count);
int64_t row_count);
static int do_construct_sql(common::ObIAllocator &allocator,
ObPlanCacheCtx &pc_ctx,
const ObIArray<ObPCParam *> &raw_params,
int64_t ins_params_count,
int64_t delta_length,
ObString &no_param_sql);
static int restore_param_to_truncated_sql(ObPlanCacheCtx &pc_ctx);
static bool can_do_insert_batch_opt(ObPlanCacheCtx &pc_ctx);