[FEAT MERGE] implement values statement

Co-authored-by: wangt1xiuyi <13547954130@163.com>
This commit is contained in:
jingtaoye35
2023-08-30 10:44:18 +00:00
committed by ob-robot
parent 5a62e3cee9
commit 8015a958d0
59 changed files with 3135 additions and 536 deletions

View File

@ -83,6 +83,7 @@ int ObExprValuesSpec::serialize(char *buf,
OB_UNIS_ENCODE(contain_ab_param_);
OB_UNIS_ENCODE(ins_values_batch_opt_);
OB_UNIS_ENCODE(column_names_);
OB_UNIS_ENCODE(array_group_idx_);
}
}
@ -107,6 +108,7 @@ OB_DEF_SERIALIZE_SIZE(ObExprValuesSpec)
OB_UNIS_ADD_LEN(contain_ab_param_);
OB_UNIS_ADD_LEN(ins_values_batch_opt_);
OB_UNIS_ADD_LEN(column_names_);
OB_UNIS_ADD_LEN(array_group_idx_);
return len;
}
@ -120,6 +122,7 @@ OB_DEF_SERIALIZE(ObExprValuesSpec)
OB_UNIS_ENCODE(contain_ab_param_);
OB_UNIS_ENCODE(ins_values_batch_opt_);
OB_UNIS_ENCODE(column_names_);
OB_UNIS_ENCODE(array_group_idx_);
return ret;
}
@ -133,6 +136,7 @@ OB_DEF_DESERIALIZE(ObExprValuesSpec)
OB_UNIS_DECODE(contain_ab_param_);
OB_UNIS_DECODE(ins_values_batch_opt_);
OB_UNIS_DECODE(column_names_);
OB_UNIS_DECODE(array_group_idx_);
return ret;
}
@ -180,6 +184,7 @@ int64_t ObExprValuesSpec::get_serialize_size_(const ObPhyOpSeriCtx &seri_ctx) co
OB_UNIS_ADD_LEN(contain_ab_param_);
OB_UNIS_ADD_LEN(ins_values_batch_opt_);
OB_UNIS_ADD_LEN(column_names_);
OB_UNIS_ADD_LEN(array_group_idx_);
return len;
}
@ -193,7 +198,10 @@ ObExprValuesOp::ObExprValuesOp(ObExecContext &exec_ctx,
cm_(CM_NONE),
err_log_service_(get_eval_ctx()),
err_log_rt_def_(),
has_sequence_(false)
has_sequence_(false),
real_value_cnt_(0),
param_idx_(0),
param_cnt_(0)
{
}
@ -203,14 +211,17 @@ int ObExprValuesOp::inner_open()
node_idx_ = 0;
const bool is_explicit_cast = false;
const int32_t result_flag = 0;
if (OB_FAIL(datum_caster_.init(eval_ctx_.exec_ctx_))) {
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_);
if (OB_ISNULL(plan_ctx) || OB_ISNULL(ctx_.get_sql_ctx())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected NULL ptr", K(ret), KP(plan_ctx), KP(ctx_.get_sql_ctx()));
} else if (OB_FAIL(datum_caster_.init(eval_ctx_.exec_ctx_))) {
LOG_WARN("fail to init datum_caster", K(ret));
} else if (OB_FAIL(ObSQLUtils::get_default_cast_mode(is_explicit_cast, result_flag,
ctx_.get_my_session(), cm_))) {
LOG_WARN("fail to get_default_cast_mode", K(ret));
} else {
// see ObSQLUtils::wrap_column_convert_ctx(), add CM_WARN_ON_FAIL for INSERT IGNORE.
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_);
cm_ = cm_ | CM_COLUMN_CONVERT;
if (plan_ctx->is_ignore_stmt() || !is_strict_mode(ctx_.get_my_session()->get_sql_mode())) {
// CM_CHARSET_CONVERT_IGNORE_ERR is will give '?' when do string_string convert.
@ -226,6 +237,29 @@ int ObExprValuesOp::inner_open()
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected child cnt", K(child_cnt_), K(ret));
}
if (OB_SUCC(ret)) {
if (MY_SPEC.contain_ab_param_ && !ctx_.has_dynamic_values_table()) {
int64_t value_group = MY_SPEC.contain_ab_param_ ?
ctx_.get_sql_ctx()->get_batch_params_count() : 1;
real_value_cnt_ = MY_SPEC.get_value_count() * value_group;
param_idx_ = 0;
param_cnt_ = plan_ctx->get_datum_param_store().count();
} else if (MY_SPEC.contain_ab_param_ && ctx_.has_dynamic_values_table() &&
MY_SPEC.array_group_idx_ >= 0) {
if (OB_UNLIKELY(MY_SPEC.array_group_idx_ >= plan_ctx->get_array_param_groups().count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected idx", K(ret), K(MY_SPEC.array_group_idx_));
} else {
ObArrayParamGroup &array_param_group = plan_ctx->get_array_param_groups().at(MY_SPEC.array_group_idx_);
real_value_cnt_ = MY_SPEC.get_value_count() * array_param_group.row_count_;
param_cnt_ = array_param_group.column_count_;
param_idx_ = array_param_group.start_param_idx_;
}
} else {
real_value_cnt_ = MY_SPEC.get_value_count();
}
LOG_TRACE("init expr values op", K(real_value_cnt_), K(param_cnt_), K(param_idx_));
}
}
return ret;
}
@ -314,11 +348,9 @@ int ObExprValuesOp::get_real_batch_obj_type(ObDatumMeta &src_meta,
int64_t group_idx)
{
int ret = OB_SUCCESS;
if (MY_SPEC.ins_values_batch_opt_
&& T_QUESTIONMARK == src_expr->type_
&& (src_expr->frame_idx_
< spec_.plan_->get_expr_frame_info().const_frame_.count()
+ spec_.plan_->get_expr_frame_info().param_frame_.count())) {
if (T_QUESTIONMARK == src_expr->type_ &&
src_expr->frame_idx_ < spec_.plan_->get_expr_frame_info().const_frame_.count() +
spec_.plan_->get_expr_frame_info().param_frame_.count()) {
int64_t param_idx = src_expr->extra_;
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_);
const ObSqlArrayObj *array_obj = NULL;
@ -426,20 +458,16 @@ OB_INLINE int ObExprValuesOp::calc_next_row()
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_);
int64_t col_num = MY_SPEC.get_output_count();
int64_t col_idx = 0;
int64_t batch_cnt = ctx_.get_sql_ctx()->get_batch_params_count();
int64_t value_group = (MY_SPEC.contain_ab_param_ ? batch_cnt : 1);
int64_t real_value_cnt = MY_SPEC.get_value_count() * value_group;
if (node_idx_ == real_value_cnt) {
if (node_idx_ == real_value_cnt_) {
// there is no values any more
ret = OB_ITER_END;
} else {
bool is_break = false;
ObDatum *datum = NULL;
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_);
int64_t group_idx = 0;
if (MY_SPEC.contain_ab_param_) {
group_idx = node_idx_ / MY_SPEC.get_value_count();
if (OB_FAIL(plan_ctx->replace_batch_param_datum(group_idx))) {
if (OB_FAIL(plan_ctx->replace_batch_param_datum(group_idx, param_idx_, param_cnt_))) {
LOG_WARN("replace batch param datum failed", K(ret), K(group_idx));
}
}
@ -450,7 +478,7 @@ OB_INLINE int ObExprValuesOp::calc_next_row()
}
}
}
while (OB_SUCC(ret) && node_idx_ < real_value_cnt && !is_break) {
while (OB_SUCC(ret) && node_idx_ < real_value_cnt_ && !is_break) {
int64_t real_node_idx = node_idx_ % MY_SPEC.get_value_count();
int64_t row_num = real_node_idx / col_num + 1;
ObExpr *src_expr = MY_SPEC.values_.at(real_node_idx);

View File

@ -36,9 +36,10 @@ public:
str_values_array_(alloc),
err_log_ct_def_(alloc),
contain_ab_param_(0),
ins_values_batch_opt_(false)
ins_values_batch_opt_(false),
array_group_idx_(-1)
{ }
INHERIT_TO_STRING_KV("op_spec", ObOpSpec, K(array_group_idx_));
int64_t get_value_count() const { return values_.count(); }
int64_t get_is_strict_json_desc_count() const { return is_strict_json_desc_.count(); }
virtual int serialize(char *buf,
@ -57,6 +58,7 @@ public:
ObErrLogCtDef err_log_ct_def_;
int64_t contain_ab_param_;
bool ins_values_batch_opt_;
int64_t array_group_idx_;
};
class ObExprValuesOp : public ObOperator
@ -94,6 +96,9 @@ private:
ObErrLogService err_log_service_;
ObErrLogRtDef err_log_rt_def_;
bool has_sequence_;
int64_t real_value_cnt_;
int64_t param_idx_;
int64_t param_cnt_;
};
} // end namespace sql

View File

@ -539,7 +539,8 @@ OB_NOINLINE int ObPreCalcExprFrameInfo::do_batch_stmt_eval(ObExecContext &exec_c
} // for end
//replace array param to the real param and eval the pre calc expr
for (int64_t group_id = 0; OB_SUCC(ret) && group_id < group_cnt; ++group_id) {
if (OB_FAIL(exec_ctx.get_physical_plan_ctx()->replace_batch_param_datum(group_id))) {
if (OB_FAIL(exec_ctx.get_physical_plan_ctx()->replace_batch_param_datum(group_id, 0,
exec_ctx.get_physical_plan_ctx()->get_datum_param_store().count()))) {
LOG_WARN("replace batch param frame failed", K(ret));
}
//params of each group need to clear the datum evaluted flags before calc the pre_calc_expr

View File

@ -470,6 +470,13 @@ public:
void set_cur_rownum(int64_t cur_rownum) { cur_row_num_ = cur_rownum; }
int64_t get_cur_rownum() { return cur_row_num_; }
bool use_temp_expr_ctx_cache() const { return use_temp_expr_ctx_cache_; }
bool has_dynamic_values_table() const {
bool ret = false;
if (NULL != phy_plan_ctx_) {
ret = phy_plan_ctx_->get_array_param_groups().count() > 0;
}
return ret;
}
private:
int build_temp_expr_ctx(const ObTempExpr &temp_expr, ObTempExprCtx *&temp_expr_ctx);
int set_phy_op_ctx_ptr(uint64_t index, void *phy_op);

View File

@ -77,6 +77,7 @@ ObPhysicalPlanCtx::ObPhysicalPlanCtx(common::ObIAllocator &allocator)
tenant_schema_version_(OB_INVALID_VERSION),
orig_question_mark_cnt_(0),
tenant_srs_version_(OB_INVALID_VERSION),
array_param_groups_(),
affected_rows_(0),
is_affect_found_row_(false),
found_rows_(0),
@ -555,30 +556,65 @@ OB_INLINE void ObPhysicalPlanCtx::get_param_frame_info(int64_t param_idx,
eval_info = reinterpret_cast<ObEvalInfo *>(param_frame_ptrs_.at(idx) + off + sizeof(ObDatum));
}
int ObPhysicalPlanCtx::replace_batch_param_datum(int64_t cur_group_id)
int ObPhysicalPlanCtx::replace_batch_param_datum(const int64_t cur_group_id,
const int64_t start_param,
const int64_t param_cnt)
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < datum_param_store_.count(); i++) {
if (datum_param_store_.at(i).meta_.is_ext_sql_array()) {
//need to expand the real param to param frame
ObDatum *datum = nullptr;
ObEvalInfo *eval_info = nullptr;
get_param_frame_info(i, datum, eval_info);
const ObSqlDatumArray *datum_array = datum_param_store_.at(i).get_sql_datum_array();;
if (OB_UNLIKELY(cur_group_id < 0) || OB_UNLIKELY(cur_group_id >= datum_array->count_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid group id", K(ret), K(cur_group_id), K(datum_array->count_));
} else {
//assign datum ptr to the real param datum
*datum = datum_array->data_[cur_group_id];
eval_info->evaluated_ = true;
LOG_DEBUG("replace batch param datum", K(cur_group_id), KPC(datum), K(datum));
const int64_t datum_cnt = datum_param_store_.count();
if (OB_UNLIKELY(start_param < 0 || start_param + param_cnt > datum_cnt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected params", K(ret), K(start_param), K(param_cnt), K(datum_cnt));
} else {
for (int64_t i = start_param; OB_SUCC(ret) && i < start_param + param_cnt; i++) {
if (datum_param_store_.at(i).meta_.is_ext_sql_array()) {
//need to expand the real param to param frame
ObDatum *datum = nullptr;
ObEvalInfo *eval_info = nullptr;
get_param_frame_info(i, datum, eval_info);
const ObSqlDatumArray *datum_array = datum_param_store_.at(i).get_sql_datum_array();;
if (OB_UNLIKELY(cur_group_id < 0) || OB_UNLIKELY(cur_group_id >= datum_array->count_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid group id", K(ret), K(cur_group_id), K(datum_array->count_));
} else {
//assign datum ptr to the real param datum
*datum = datum_array->data_[cur_group_id];
eval_info->evaluated_ = true;
LOG_DEBUG("replace batch param datum", K(cur_group_id), KPC(datum), K(datum));
}
}
}
}
return ret;
}
OB_DEF_SERIALIZE(ObArrayParamGroup)
{
int ret = OB_SUCCESS;
OB_UNIS_ENCODE(row_count_);
OB_UNIS_ENCODE(column_count_);
OB_UNIS_ENCODE(start_param_idx_);
return ret;
}
OB_DEF_SERIALIZE_SIZE(ObArrayParamGroup)
{
int64_t len = 0;
OB_UNIS_ADD_LEN(row_count_);
OB_UNIS_ADD_LEN(column_count_);
OB_UNIS_ADD_LEN(start_param_idx_);
return len;
}
OB_DEF_DESERIALIZE(ObArrayParamGroup)
{
int ret = OB_SUCCESS;
OB_UNIS_DECODE(row_count_);
OB_UNIS_DECODE(column_count_);
OB_UNIS_DECODE(start_param_idx_);
return ret;
}
OB_DEF_SERIALIZE(ObPhysicalPlanCtx)
{
int ret = OB_SUCCESS;
@ -686,6 +722,13 @@ OB_DEF_SERIALIZE(ObPhysicalPlanCtx)
OB_UNIS_ENCODE(plan_start_time_);
OB_UNIS_ENCODE(last_trace_id_);
OB_UNIS_ENCODE(tenant_srs_version_);
OB_UNIS_ENCODE(original_param_cnt_);
OB_UNIS_ENCODE(array_param_groups_.count());
if (OB_SUCC(ret) && array_param_groups_.count() > 0) {
for (int64_t i = 0 ; OB_SUCC(ret) && i < array_param_groups_.count(); i++) {
OB_UNIS_ENCODE(array_param_groups_.at(i));
}
}
return ret;
}
@ -769,6 +812,13 @@ OB_DEF_SERIALIZE_SIZE(ObPhysicalPlanCtx)
OB_UNIS_ADD_LEN(plan_start_time_);
OB_UNIS_ADD_LEN(last_trace_id_);
OB_UNIS_ADD_LEN(tenant_srs_version_);
OB_UNIS_ADD_LEN(original_param_cnt_);
OB_UNIS_ADD_LEN(array_param_groups_.count());
if (array_param_groups_.count() > 0) {
for (int64_t i = 0 ; i < array_param_groups_.count(); i++) {
OB_UNIS_ADD_LEN(array_param_groups_.at(i));
}
}
return len;
}
@ -851,6 +901,33 @@ OB_DEF_DESERIALIZE(ObPhysicalPlanCtx)
}
OB_UNIS_DECODE(last_trace_id_);
OB_UNIS_DECODE(tenant_srs_version_);
OB_UNIS_DECODE(original_param_cnt_);
int64_t array_group_count = 0;
OB_UNIS_DECODE(array_group_count);
if (OB_SUCC(ret) && array_group_count > 0) {
for (int64_t i = 0 ; OB_SUCC(ret) && i < array_group_count; i++) {
ObArrayParamGroup array_p_group;
OB_UNIS_DECODE(array_p_group);
if (OB_FAIL(array_param_groups_.push_back(array_p_group))) {
LOG_WARN("failed to push back");
}
}
}
if (OB_SUCC(ret) && array_group_count > 0 &&
datum_param_store_.count() == 0 &&
datum_param_store_.count() != param_store_.count()) {
if (OB_FAIL(datum_param_store_.prepare_allocate(param_store_.count()))) {
LOG_WARN("fail to prepare allocate", K(ret), K(param_store_.count()));
}
for (int64_t i = 0; OB_SUCC(ret) && i < param_store_.count(); i++) {
ObDatumObjParam &datum_param = datum_param_store_.at(i);
if (OB_FAIL(datum_param.alloc_datum_reserved_buff(param_store_.at(i).meta_, allocator_))) {
LOG_WARN("alloc datum reserved buffer failed", K(ret));
} else if (OB_FAIL(datum_param.from_objparam(param_store_.at(i), &allocator_))) {
LOG_WARN("fail to convert obj param", K(ret), K(param_store_.at(i)));
}
}
}
return ret;
}

View File

@ -72,6 +72,21 @@ struct ObRemoteSqlInfo
bool sql_from_pl_;
};
/* refer to a group of array params
* values clause using now
*/
struct ObArrayParamGroup {
OB_UNIS_VERSION(1);
public:
ObArrayParamGroup() : row_count_(0), column_count_(0), start_param_idx_(0) {}
ObArrayParamGroup(const int64_t row_cnt, const int64_t param_cnt, const int64_t start_param_idx)
: row_count_(row_cnt), column_count_(param_cnt), start_param_idx_(start_param_idx) {}
int64_t row_count_;
int64_t column_count_;
int64_t start_param_idx_; // in param store
TO_STRING_KV(K(row_count_), K(column_count_), K(start_param_idx_));
};
class ObPhysicalPlanCtx
{
OB_UNIS_VERSION(1);
@ -435,7 +450,7 @@ public:
bool get_is_ps_rewrite_sql() const { return is_ps_rewrite_sql_; }
void set_plan_start_time(int64_t t) { plan_start_time_ = t; }
int64_t get_plan_start_time() const { return plan_start_time_; }
int replace_batch_param_datum(int64_t cur_group_id);
int replace_batch_param_datum(const int64_t cur_group_id, const int64_t start_param, const int64_t param_cnt);
void set_last_trace_id(const common::ObCurTraceId::TraceId &trace_id)
{
last_trace_id_ = trace_id;
@ -443,7 +458,8 @@ public:
const common::ObCurTraceId::TraceId &get_last_trace_id() const { return last_trace_id_; }
common::ObCurTraceId::TraceId &get_last_trace_id() { return last_trace_id_; }
void set_spm_timeout_timestamp(const int64_t timeout) { spm_ts_timeout_us_ = timeout; }
const ObIArray<ObArrayParamGroup> &get_array_param_groups() const { return array_param_groups_; }
ObIArray<ObArrayParamGroup> &get_array_param_groups() { return array_param_groups_; }
private:
void reset_datum_frame(char *frame, int64_t expr_cnt);
int extend_param_frame(const int64_t old_size);
@ -507,6 +523,7 @@ private:
int64_t orig_question_mark_cnt_;
common::ObCurTraceId::TraceId last_trace_id_;
int64_t tenant_srs_version_;
ObSEArray<ObArrayParamGroup, 2> array_param_groups_;
private:
/**