add for insert values statement split&rewrite batch optimization

This commit is contained in:
yishenglanlingzui
2023-08-14 15:12:40 +00:00
committed by ob-robot
parent 15934d24ac
commit 9b3f07d4ad
37 changed files with 1653 additions and 290 deletions

View File

@ -637,17 +637,294 @@ int ObPlanCache::construct_fast_parser_result(common::ObIAllocator &allocator,
FPContext fp_ctx(conn_coll);
fp_ctx.enable_batched_multi_stmt_ = pc_ctx.sql_ctx_.handle_batched_multi_stmt();
fp_ctx.sql_mode_ = sql_mode;
bool can_do_batch_insert = false;
ObString first_truncated_sql;
int64_t batch_count = 0;
if (OB_FAIL(ObSqlParameterization::fast_parser(allocator,
fp_ctx,
raw_sql,
fp_result))) {
LOG_WARN("failed to fast parser", K(ret), K(sql_mode), K(pc_ctx.raw_sql_));
} else { /*do nothing*/ }
} else if (OB_FAIL(check_can_do_insert_opt(allocator,
pc_ctx,
fp_result,
can_do_batch_insert,
batch_count,
first_truncated_sql))) {
LOG_WARN("fail to do insert optimization", K(ret));
} else if (!can_do_batch_insert) {
LOG_INFO("can't do batch insert optimization", K(raw_sql));
// can't do batch insert
} else if (pc_ctx.insert_batch_opt_info_.multi_raw_params_.empty()) {
LOG_INFO("unexpected multi_raw_params, can't do batch insert opt, but not need to return error",
K(batch_count), K(first_truncated_sql), K(pc_ctx.raw_sql_), K(fp_result));
} else if (OB_ISNULL(pc_ctx.insert_batch_opt_info_.multi_raw_params_.at(0))) {
LOG_INFO("unexpected null ptr, can't do batch insert opt, but not need to return error",
K(batch_count), K(first_truncated_sql), K(pc_ctx.raw_sql_), K(fp_result));
} else {
fp_result.raw_params_.reset();
fp_result.raw_params_.set_allocator(&allocator);
fp_result.raw_params_.set_capacity(pc_ctx.insert_batch_opt_info_.multi_raw_params_.at(0)->count());
if (OB_FAIL(fp_result.raw_params_.assign(*pc_ctx.insert_batch_opt_info_.multi_raw_params_.at(0)))) {
LOG_WARN("fail to assign raw_param", K(ret));
} else {
pc_ctx.sql_ctx_.set_is_do_insert_batch_opt(batch_count);
fp_result.pc_key_.name_.assign_ptr(first_truncated_sql.ptr(), first_truncated_sql.length());
LOG_DEBUG("print new fp_result.pc_key_.name_", K(fp_result.pc_key_.name_));
}
}
}
}
return ret;
}
int ObPlanCache::do_construct_sql(common::ObIAllocator &allocator,
ObPlanCacheCtx &pc_ctx,
const ObIArray<ObPCParam *> &raw_params,
int64_t ins_params_count,
int64_t delta_length,
ObString &no_param_sql)
{
int ret = OB_SUCCESS;
char *buf = NULL;
int32_t pos = 0;
ObPCParam *pc_param = NULL;
int64_t idx = 0;
int64_t buff_len = pc_ctx.raw_sql_.length();
if (OB_ISNULL(buf = (char *)allocator.alloc(pc_ctx.raw_sql_.length()))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("buff is null", K(ret));
} else if (raw_params.count() < ins_params_count) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected params count", K(ret), K(raw_params.count()), K(ins_params_count));
}
for (int64_t i = 0; OB_SUCC(ret) && i < raw_params.count(); i++) {
if (OB_ISNULL(pc_param = raw_params.at(i))) {
ret = OB_ERR_UNEXPECTED;
} else if (i < ins_params_count) {
int32_t len = (int32_t)pc_param->node_->pos_ - idx;
LOG_TRACE("print raw_params", K(i), K(buff_len), K(len), K(idx), K(raw_params.at(i)->node_->pos_),
K(ObString(raw_params.at(i)->node_->text_len_, raw_params.at(i)->node_->raw_text_)));
if (len > buff_len - pos) {
ret = OB_BUF_NOT_ENOUGH;
} else if (len == 0) {
MEMCPY(buf + pos, pc_param->node_->raw_text_, pc_param->node_->text_len_);
pos += (int32_t)pc_param->node_->text_len_;
idx = (int32_t)pc_param->node_->pos_ + 1;
} else if (len > 0) {
//copy text
MEMCPY(buf + pos, no_param_sql.ptr() + idx, len);
idx = (int32_t)pc_param->node_->pos_ + 1;
pos += len;
//copy raw param
MEMCPY(buf + pos, pc_param->node_->raw_text_, pc_param->node_->text_len_);
pos += (int32_t)pc_param->node_->text_len_;
}
} else {
int32_t len = (int32_t)pc_param->node_->pos_ - delta_length - idx;
LOG_TRACE("print delta_length", K(i), K(pc_param->node_->pos_), K(delta_length), K(idx));
if (len > buff_len - pos) {
ret = OB_BUF_NOT_ENOUGH;
} else if (len == 0) {
MEMCPY(buf + pos, pc_param->node_->raw_text_, pc_param->node_->text_len_);
pos += (int32_t)pc_param->node_->text_len_;
idx = (int32_t)pc_param->node_->pos_ + 1;
} else if (len > 0) {
//copy text
MEMCPY(buf + pos, no_param_sql.ptr() + idx, len);
idx = (int32_t)pc_param->node_->pos_ - delta_length + 1;
pos += len;
//copy raw param
MEMCPY(buf + pos, pc_param->node_->raw_text_, pc_param->node_->text_len_);
pos += (int32_t)pc_param->node_->text_len_;
}
}
}
if (OB_SUCCESS == ret) {
int32_t len = no_param_sql.length() - idx;
if (len > buff_len - pos) {
ret = OB_BUF_NOT_ENOUGH;
} else if (len > 0) {
MEMCPY(buf + pos, no_param_sql.ptr() + idx, len);
idx += len;
pos += len;
}
}
if (OB_SUCC(ret)) {
buf[pos] = ';';
pos++;
pc_ctx.insert_batch_opt_info_.new_reconstruct_sql_.assign_ptr(buf, pos);
LOG_TRACE("print new_truncated_sql", K(pc_ctx.insert_batch_opt_info_.new_reconstruct_sql_), K(pos));
}
return ret;
}
int ObPlanCache::rebuild_raw_params(common::ObIAllocator &allocator,
ObPlanCacheCtx &pc_ctx,
ObFastParserResult &fp_result,
int64_t row_count,
int64_t insert_param_count,
int64_t upd_param_count)
{
int ret = OB_SUCCESS;
void *array_ptr = nullptr;
int64_t params_idx = 0;
ObSEArray<ObPCParam *, 8> update_raw_params;
int64_t one_row_params_cnt = insert_param_count + upd_param_count;
if ((row_count * insert_param_count + upd_param_count) != fp_result.raw_params_.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected raw_params", K(ret),
K(row_count), K(insert_param_count), K(upd_param_count), K(fp_result.raw_params_.count()));
} else {
pc_ctx.insert_batch_opt_info_.multi_raw_params_.set_capacity(row_count);
}
int64_t upd_start_idx = row_count * insert_param_count;
for (int64_t i = upd_start_idx; OB_SUCC(ret) && i < fp_result.raw_params_.count(); i++) {
if (OB_FAIL(update_raw_params.push_back(fp_result.raw_params_.at(i)))) {
LOG_WARN("fail to push back raw_param", K(ret), K(i));
}
}
if (OB_SUCC(ret)) {
if (update_raw_params.count() != upd_param_count) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected update_raw_params count", K(ret), K(upd_param_count), K(upd_start_idx));
}
}
for (int64_t i = 0; OB_SUCC(ret) && i < row_count; i++) {
void *buf = nullptr;
ObRawParams *params_array = nullptr;
if (OB_ISNULL(buf = allocator.alloc(sizeof(ObRawParams)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc memory", K(ret), K(sizeof(ObRawParams)));
} else {
params_array = new(buf) ObRawParams(allocator);
params_array->set_capacity(one_row_params_cnt);
}
for (int64_t j = 0; OB_SUCC(ret) && j < insert_param_count; j++) {
if (params_idx >= fp_result.raw_params_.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected params_idx", K(ret), K(i), K(j), K(params_idx), K(fp_result.raw_params_));
} else if (OB_FAIL(params_array->push_back(fp_result.raw_params_.at(params_idx)))) {
LOG_WARN("fail to push back", K(ret), K(i), K(j), K(params_idx));
} else {
params_idx++;
}
}
if (OB_SUCC(ret)) {
if (0 != upd_param_count && OB_FAIL(append(*params_array, update_raw_params))) {
LOG_WARN("fail to append update raw params", K(ret));
} else if (OB_FAIL(pc_ctx.insert_batch_opt_info_.multi_raw_params_.push_back(params_array))) {
LOG_WARN("fail to push params array", K(ret));
}
}
}
return ret;
}
bool ObPlanCache::can_do_insert_batch_opt(ObPlanCacheCtx &pc_ctx)
{
bool bret = false;
ObSQLSessionInfo *session_info = nullptr;
if (OB_NOT_NULL(session_info = pc_ctx.sql_ctx_.session_info_)) {
if (!pc_ctx.sql_ctx_.is_batch_params_execute() &&
GCONF._sql_insert_multi_values_split_opt &&
!pc_ctx.sql_ctx_.get_enable_user_defined_rewrite() &&
!session_info->is_inner() &&
OB_BATCHED_MULTI_STMT_ROLLBACK != session_info->get_retry_info().get_last_query_retry_err()) {
bret = true;
} else {
LOG_TRACE("can't do insert batch optimization",
"is_arraybinding", pc_ctx.sql_ctx_.is_batch_params_execute(),
"is_open_switch", GCONF._sql_insert_multi_values_split_opt,
"is_inner_sql", session_info->is_inner(),
"udr", pc_ctx.sql_ctx_.get_enable_user_defined_rewrite(),
"last_ret", session_info->get_retry_info().get_last_query_retry_err(),
"curr_sql", pc_ctx.raw_sql_);
}
}
return bret;
}
int ObPlanCache::check_can_do_insert_opt(common::ObIAllocator &allocator,
ObPlanCacheCtx &pc_ctx,
ObFastParserResult &fp_result,
bool &can_do_batch,
int64_t &batch_count,
ObString &first_truncated_sql)
{
int ret = OB_SUCCESS;
can_do_batch = false;
batch_count = 0;
if (fp_result.values_token_pos_ != 0 &&
can_do_insert_batch_opt(pc_ctx)) {
char *new_param_sql = nullptr;
int64_t new_param_sql_len = 0;
int64_t ins_params_count = 0;
int64_t upd_params_count = 0;
int64_t delta_length = 0;
ObSQLMode sql_mode = pc_ctx.sql_ctx_.session_info_->get_sql_mode();
ObCollationType conn_coll = pc_ctx.sql_ctx_.session_info_->get_local_collation_connection();
FPContext fp_ctx(conn_coll);
fp_ctx.enable_batched_multi_stmt_ = pc_ctx.sql_ctx_.handle_batched_multi_stmt();
fp_ctx.sql_mode_ = sql_mode;
ObFastParserMysql fp(allocator, fp_ctx);
if (OB_FAIL(fp.parser_insert_str(allocator,
fp_result.values_token_pos_,
fp_result.pc_key_.name_,
first_truncated_sql,
can_do_batch,
ins_params_count,
upd_params_count,
delta_length,
batch_count))) {
LOG_WARN("fail to parser insert string", K(ret), K(fp_result.pc_key_.name_));
} else if (!can_do_batch || ins_params_count <= 0 || batch_count <= 1) {
can_do_batch = false;
LOG_INFO("can not do batch insert opt", K(ret), K(can_do_batch), K(upd_params_count),
K(ins_params_count), K(batch_count), K(pc_ctx.raw_sql_));
} else if (upd_params_count > 0 && delta_length <= 0) {
can_do_batch = false;
LOG_INFO("can not do batch insert opt", K(ret), K(can_do_batch), K(ins_params_count), K(batch_count), K(pc_ctx.raw_sql_));
} else if (OB_FAIL(rebuild_raw_params(allocator,
pc_ctx,
fp_result,
batch_count,
ins_params_count,
upd_params_count))) {
LOG_WARN("fail to rebuild raw_param", K(ret));
} else if (pc_ctx.insert_batch_opt_info_.multi_raw_params_.empty()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected multi_raw_params", K(ret));
} else if (OB_ISNULL(pc_ctx.insert_batch_opt_info_.multi_raw_params_.at(0))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null", K(ret));
} else {
pc_ctx.insert_batch_opt_info_.insert_params_count_ = ins_params_count;
pc_ctx.insert_batch_opt_info_.update_params_count_ = upd_params_count;
pc_ctx.insert_batch_opt_info_.sql_delta_length_ = delta_length;
// do nothing
}
}
if (ret != OB_SUCCESS) {
// 这里边的无论什么报错,都可以被吞掉,只是报错后就不能再做batch优化
can_do_batch = false;
LOG_WARN("can't do insert batch optimization, cover the error code by design", K(ret), K(pc_ctx.raw_sql_));
ret = OB_SUCCESS;
}
return ret;
}
//plan cache中add plan 入口
//1.检查内存限制
//2.添加plan