[CP] fix serialize ObSqlArrayObj bugs
This commit is contained in:
@ -189,6 +189,8 @@ int ObValuesTableCompression::rebuild_new_raw_sql(ObPlanCacheCtx &pc_ctx,
|
||||
const int64_t param_cnt,
|
||||
const int64_t delta_length,
|
||||
const ObString &no_param_sql,
|
||||
ObIArray<int64_t> &no_param_pos,
|
||||
ObIArray<int64_t> &raw_sql_offset,
|
||||
ObString &new_raw_sql,
|
||||
int64_t &no_param_sql_pos,
|
||||
int64_t &new_raw_pos)
|
||||
@ -211,6 +213,7 @@ int ObValuesTableCompression::rebuild_new_raw_sql(ObPlanCacheCtx &pc_ctx,
|
||||
LOG_WARN("get unexpected NULL ptr", K(ret), KP(pc_param));
|
||||
} else {
|
||||
int64_t param_pos = pc_param->node_->pos_ - delta_length; // get pos is in new no param sql
|
||||
int64_t param_raw_offset = pc_param->node_->raw_sql_offset_;
|
||||
int64_t param_len = pc_param->node_->text_len_;
|
||||
len = param_pos - no_param_sql_pos;
|
||||
if (OB_UNLIKELY(len < 0) || OB_UNLIKELY(new_raw_pos + len + param_len > buff_len)) {
|
||||
@ -225,10 +228,17 @@ int ObValuesTableCompression::rebuild_new_raw_sql(ObPlanCacheCtx &pc_ctx,
|
||||
}
|
||||
if (param_pos == no_param_sql_pos) {
|
||||
//copy raw param
|
||||
param_raw_offset = new_raw_pos;
|
||||
MEMCPY(buff + new_raw_pos, pc_param->node_->raw_text_, param_len);
|
||||
new_raw_pos += param_len;
|
||||
no_param_sql_pos += 1;
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(no_param_pos.push_back(param_pos))) {
|
||||
LOG_WARN("failed to push back", K(ret));
|
||||
} else if (OB_FAIL(raw_sql_offset.push_back(param_raw_offset))) {
|
||||
LOG_WARN("failed to push back", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -260,7 +270,8 @@ int ObValuesTableCompression::try_batch_exec_params(ObIAllocator &allocator,
|
||||
int64_t no_param_sql_pos = 0;
|
||||
int64_t new_raw_sql_pos = 0;
|
||||
ObString &new_raw_sql = pc_ctx.new_raw_sql_;
|
||||
ObSEArray<int64_t, 16> raw_pos;
|
||||
ObSEArray<int64_t, 16> no_param_pos;
|
||||
ObSEArray<int64_t, 16> raw_sql_offset;
|
||||
ObPhysicalPlanCtx *phy_ctx = NULL;
|
||||
uint64_t data_version = 0;
|
||||
if (pc_ctx.sql_ctx_.handle_batched_multi_stmt() ||
|
||||
@ -270,9 +281,8 @@ int ObValuesTableCompression::try_batch_exec_params(ObIAllocator &allocator,
|
||||
fp_result.values_tokens_.empty() ||
|
||||
!GCONF._enable_values_table_folding) {
|
||||
/* do nothing */
|
||||
} else if (OB_FAIL(GET_MIN_DATA_VERSION(session_info.get_effective_tenant_id(), data_version))) {
|
||||
LOG_WARN("get tenant data version failed", K(ret), K(session_info.get_effective_tenant_id()));
|
||||
} else if (data_version < DATA_VERSION_4_2_1_0 ||
|
||||
/* TODO NOTE@yejingtao.yjt: remove following upgrade checking after next barrier version */
|
||||
} else if (GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_2_1_2 ||
|
||||
!is_support_compress_values_table(pc_ctx.raw_sql_)) {
|
||||
/* do nothing */
|
||||
} else if (OB_ISNULL(phy_ctx = pc_ctx.exec_ctx_.get_physical_plan_ctx())) {
|
||||
@ -306,20 +316,15 @@ int ObValuesTableCompression::try_batch_exec_params(ObIAllocator &allocator,
|
||||
}
|
||||
}
|
||||
for (int64_t j = last_raw_param_idx; OB_SUCC(ret) && j < param_idx + param_count; j++) {
|
||||
ObPCParam *pc_param = pc_ctx.fp_result_.raw_params_.at(j);
|
||||
if (OB_ISNULL(pc_param) || OB_ISNULL(pc_param->node_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("pc_param is null", K(ret), KP(pc_param));
|
||||
} else if (OB_FAIL(temp_store.push_back(pc_param))) {
|
||||
LOG_WARN("failed to push back", K(ret));
|
||||
} else if (OB_FAIL(raw_pos.push_back(pc_param->node_->pos_ - total_delta_len))) {
|
||||
if (OB_FAIL(temp_store.push_back(pc_ctx.fp_result_.raw_params_.at(j)))) {
|
||||
LOG_WARN("failed to push back", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(rebuild_new_raw_sql(pc_ctx, temp_store, new_raw_idx,
|
||||
temp_store.count() - new_raw_idx, total_delta_len, new_no_param_sql,
|
||||
new_raw_sql, no_param_sql_pos, new_raw_sql_pos))) {
|
||||
no_param_pos, raw_sql_offset, new_raw_sql, no_param_sql_pos,
|
||||
new_raw_sql_pos))) {
|
||||
LOG_WARN("failed to rebuild new raw sql", K(ret));
|
||||
} else {
|
||||
int64_t batch_begin_idx = new_raw_idx + param_idx - last_raw_param_idx;
|
||||
@ -337,24 +342,20 @@ int ObValuesTableCompression::try_batch_exec_params(ObIAllocator &allocator,
|
||||
}
|
||||
if (OB_SUCC(ret) && can_fold_params) {
|
||||
for (int64_t j = last_raw_param_idx; OB_SUCC(ret) && j < pc_ctx.fp_result_.raw_params_.count(); j++) {
|
||||
ObPCParam *pc_param = pc_ctx.fp_result_.raw_params_.at(j);
|
||||
if (OB_ISNULL(pc_param) || OB_ISNULL(pc_param->node_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("pc_param is null", K(ret), KP(pc_param));
|
||||
} else if (OB_FAIL(temp_store.push_back(pc_param))) {
|
||||
LOG_WARN("failed to push back", K(ret));
|
||||
} else if (OB_FAIL(raw_pos.push_back(pc_param->node_->pos_ - total_delta_len))) {
|
||||
if (OB_FAIL(temp_store.push_back(pc_ctx.fp_result_.raw_params_.at(j)))) {
|
||||
LOG_WARN("failed to push back", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(rebuild_new_raw_sql(pc_ctx, temp_store, new_raw_idx,
|
||||
temp_store.count() - new_raw_idx, total_delta_len, new_no_param_sql,
|
||||
new_raw_sql, no_param_sql_pos, new_raw_sql_pos))) {
|
||||
no_param_pos, raw_sql_offset, new_raw_sql, no_param_sql_pos,
|
||||
new_raw_sql_pos))) {
|
||||
LOG_WARN("failed to rebuild new raw sql", K(ret));
|
||||
} else if (OB_UNLIKELY(raw_pos.count() != temp_store.count())) {
|
||||
} else if (OB_UNLIKELY(no_param_pos.count() != temp_store.count()) ||
|
||||
OB_UNLIKELY(raw_sql_offset.count() != temp_store.count())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("param is invalid", K(ret));
|
||||
LOG_WARN("params is invalid", K(ret));
|
||||
} else {
|
||||
int64_t len = new_no_param_sql.length() - no_param_sql_pos;
|
||||
if (OB_UNLIKELY(len < 0) || OB_UNLIKELY(new_raw_sql_pos + len > buff_len)) {
|
||||
@ -385,7 +386,8 @@ int ObValuesTableCompression::try_batch_exec_params(ObIAllocator &allocator,
|
||||
fp_result.raw_params_.set_capacity(temp_store.count());
|
||||
for (int64_t i = 0; i < temp_store.count(); i++) {
|
||||
// checked null before
|
||||
temp_store.at(i)->node_->pos_ = raw_pos.at(i);
|
||||
temp_store.at(i)->node_->pos_ = no_param_pos.at(i);
|
||||
temp_store.at(i)->node_->raw_sql_offset_ = raw_sql_offset.at(i);
|
||||
}
|
||||
if (OB_FAIL(fp_result.raw_params_.assign(temp_store))) {
|
||||
LOG_WARN("fail to assign raw_param", K(ret));
|
||||
@ -446,12 +448,14 @@ int ObValuesTableCompression::resolve_params_for_values_clause(ObPlanCacheCtx &p
|
||||
obj_param, is_param, enable_decimal_int))) {
|
||||
LOG_WARN("failed to resolver param", K(ret), K(raw_idx));
|
||||
} else if (!is_param) {
|
||||
not_param_cnt++; // in value clause, which wonn't happen actually
|
||||
not_param_cnt++;
|
||||
} else if (OB_FAIL(ab_params->push_back(obj_param))) {
|
||||
LOG_WARN("fail to push item to array", K(ret), K(raw_idx));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
array_param_groups.at(i).start_param_idx_ -= not_param_cnt;
|
||||
}
|
||||
// 1.2 build array_param in batch group
|
||||
for (int64_t j = 0; OB_SUCC(ret) && j < param_num; j++, raw_idx++, array_param_idx++) {
|
||||
ObArrayPCParam *raw_array_param = pc_ctx.fp_result_.array_params_.at(array_param_idx);
|
||||
@ -564,7 +568,7 @@ int ObValuesTableCompression::resolve_params_for_values_clause(ObPlanCacheCtx &p
|
||||
bool enable_decimal_int = false;
|
||||
const ObIArray<ObCharsetType> ¶m_charset_type = pc_ctx.param_charset_type_;
|
||||
if (OB_UNLIKELY(!pc_ctx.exec_ctx_.has_dynamic_values_table()) || OB_ISNULL(session) ||
|
||||
OB_ISNULL(phy_ctx) || OB_UNLIKELY(param_charset_type.count() != raw_param_cnt)) {
|
||||
OB_ISNULL(phy_ctx) || OB_UNLIKELY(param_charset_type.count() > raw_param_cnt)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("sql should be mutil stmt", K(ret), KP(session), KP(phy_ctx), K(raw_param_cnt),
|
||||
K(param_charset_type.count()));
|
||||
@ -573,6 +577,29 @@ int ObValuesTableCompression::resolve_params_for_values_clause(ObPlanCacheCtx &p
|
||||
} else {
|
||||
ParamStore &phy_param_store = phy_ctx->get_param_store_for_update();
|
||||
ObIArray<ObArrayParamGroup> &array_param_groups = phy_ctx->get_array_param_groups();
|
||||
int64_t not_param_offset = 0;
|
||||
int64_t tmp_raw_idx = 0;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < array_param_groups.count(); i++) {
|
||||
int64_t array_idx = array_param_groups.at(i).start_param_idx_;
|
||||
while (OB_SUCC(ret) && tmp_raw_idx < array_idx && tmp_raw_idx < raw_param_cnt) {
|
||||
ObPCParam *pc_param = pc_ctx.fp_result_.raw_params_.at(tmp_raw_idx);
|
||||
if (OB_ISNULL(pc_param)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("null expr", K(ret));
|
||||
} else if (pc_param->flag_ == NOT_PARAM) {
|
||||
not_param_offset++;
|
||||
}
|
||||
tmp_raw_idx++;
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_UNLIKELY(not_param_offset > array_idx)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected param", K(ret), K(array_idx), K(not_param_offset));
|
||||
} else {
|
||||
array_param_groups.at(i).start_param_idx_ -= not_param_offset;
|
||||
}
|
||||
}
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < array_param_groups.count(); ++i) {
|
||||
int64_t param_num = array_param_groups.at(i).column_count_;
|
||||
int64_t batch_num = array_param_groups.at(i).row_count_;
|
||||
|
||||
@ -74,6 +74,8 @@ private:
|
||||
const int64_t param_cnt,
|
||||
const int64_t delta_length,
|
||||
const common::ObString &no_param_sql,
|
||||
common::ObIArray<int64_t> &no_param_pos,
|
||||
common::ObIArray<int64_t> &raw_sql_offset,
|
||||
common::ObString &new_raw_sql,
|
||||
int64_t &no_param_sql_pos,
|
||||
int64_t &new_raw_pos);
|
||||
|
||||
Reference in New Issue
Block a user