diff --git a/src/observer/mysql/ob_async_cmd_driver.cpp b/src/observer/mysql/ob_async_cmd_driver.cpp index 0a8650905f..917a36b738 100644 --- a/src/observer/mysql/ob_async_cmd_driver.cpp +++ b/src/observer/mysql/ob_async_cmd_driver.cpp @@ -85,6 +85,9 @@ int ObAsyncCmdDriver::response_result(ObMySQLResultSet &result) LOG_WARN("send error packet fail", K(sret), K(ret)); } } + } else if (result.is_with_rows()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("async end trans should not have rows", K(ret)); } else { //what if begin;select 1; select 2; commit; commit; //we should still have to respond a packet to client in terms of the last commit diff --git a/src/sql/engine/cmd/ob_routine_executor.cpp b/src/sql/engine/cmd/ob_routine_executor.cpp index 46b4a62d18..70918a8f53 100644 --- a/src/sql/engine/cmd/ob_routine_executor.cpp +++ b/src/sql/engine/cmd/ob_routine_executor.cpp @@ -542,13 +542,17 @@ int ObAnonymousBlockExecutor::execute(ObExecContext &ctx, ObAnonymousBlockStmt & LOG_WARN("fail to alloc obj array", K(ret), K(stmt.get_params()->count())); } CK (OB_NOT_NULL(ctx.get_field_columns())); - OV (ctx.get_field_columns()->count() == out_args.num_members() - || 0 == ctx.get_field_columns()->count(), - OB_ERR_UNEXPECTED, ctx.get_field_columns()->count(), out_args.num_members()); - if (OB_SUCC(ret) && 0 == ctx.get_field_columns()->count()) { + + // prepare of prexecue protocol fill "field_columns" with question mark count. + // prepare of ps protocol fill "field_columns" with out_args count. + // if count of "field_columns" not equal to out_args, that is legal, reset "field_columns", and refill it. + if (OB_SUCC(ret) && ctx.get_field_columns()->count() != out_args.num_members()) { + CK (ctx.get_field_columns()->count() == stmt.get_params()->count()); + OX (ctx.get_field_columns()->reset()); OZ (ctx.get_field_columns()->reserve(out_args.num_members())); OX (need_push = true); } + int64_t out_idx = 0; for (int64_t i = 0; OB_SUCC(ret) && i < stmt.get_params()->count(); ++i) { if (out_args.has_member(i)) { diff --git a/src/sql/ob_sql.cpp b/src/sql/ob_sql.cpp index 4b8ae44b06..686ba9fa8d 100644 --- a/src/sql/ob_sql.cpp +++ b/src/sql/ob_sql.cpp @@ -1268,17 +1268,39 @@ int ObSql::do_real_prepare(const ObString &sql, param_cnt = parse_result.question_mark_ctx_.count_; info_ctx.normalized_sql_ = sql; if (stmt::T_ANONYMOUS_BLOCK == stmt_type - && context.is_prepare_protocol_ - && context.is_prepare_stage_ - && context.is_pre_execute_) { + && context.is_prepare_protocol_ + && context.is_prepare_stage_ + && context.is_pre_execute_) { + OZ (result.reserve_param_columns(param_cnt)); for (int64_t i = 0; OB_SUCC(ret) && i < param_cnt; ++i) { ObField param_field; param_field.type_.set_type(ObIntType); param_field.cname_ = ObString::make_string("?"); + + // 1. why mark 'SP_PARAM_INOUT' here ? + // if this prepare result reused by ps protocol. (here is prexecute protocol prepare) + // ps protocol will use this result to charge `Is this anonymous block has out row or not?` + // and if anonymous block has not out row, anonymous block can use AsyncCmdPlanDriver. + // but for now, anonymous block do not resolve, we can not know about out row infos. + // so we treat all parameter as inout paramter. + // + // 2. why inout parameter is ok ? + // actully when anonymous block execute, it will do real resolve again, + // and will fill out row infos again, we only use prepare inout infos to avoid to use AsyncCmdPlanDriver, + // because AsyncCmdPlanDriver nerver response result row. + // + // 3. what if new ps prepare reuse prexecute prepare result? + // it is not possible, because, we mark prexecute flag to PsStmtInfo, + // when ps prepare match PsStmtInfo, will check prexecute flag, if flag is true, will evcit it, and do real prepare. + // so here is a defense, avoid ps prepare reuse wrong. + // if for some reason, ps reuse this, we can make sure result is correct. + + param_field.inout_mode_ = ObRoutineParamInOut::SP_PARAM_INOUT; OZ (result.add_param_column(param_field), K(param_field), K(i), K(param_cnt)); } } + } else { if (context.is_dynamic_sql_ && !context.is_dbms_sql_) { parse_result.input_sql_ = parse_result.no_param_sql_; @@ -1896,6 +1918,14 @@ int ObSql::handle_ps_prepare(const ObString &stmt, *stmt_info, is_expired))) { LOG_WARN("fail to check schema version", K(ret)); + } else if (!is_expired + && !context.is_pre_execute_ // ps prepare + && stmt_info->get_is_prexecute() // prexecute prepare + && stmt::T_ANONYMOUS_BLOCK == stmt_info->get_stmt_type() + && FALSE_IT(is_expired = true)) { + // prexecute prepare anonymous block result can not reused by ps prepare. + // but ps prepare anonymous block can reused by prexecute. + // here, we replace to ps prepare result. } else if (is_expired) { stmt_info->set_is_expired(); if (OB_FAIL(ps_cache->erase_stmt_item(inner_stmt_id, ps_key))) { diff --git a/src/sql/plan_cache/ob_prepare_stmt_struct.cpp b/src/sql/plan_cache/ob_prepare_stmt_struct.cpp index 2266645d42..ca4f8316c6 100644 --- a/src/sql/plan_cache/ob_prepare_stmt_struct.cpp +++ b/src/sql/plan_cache/ob_prepare_stmt_struct.cpp @@ -271,6 +271,7 @@ ObPsStmtInfo::ObPsStmtInfo(ObIAllocator *inner_allocator) ref_count_(1), question_mark_count_(0), can_direct_use_param_(false), + is_prexecute_(false), item_and_info_size_(0), last_closed_timestamp_(0), dep_objs_(NULL), @@ -300,6 +301,7 @@ ObPsStmtInfo::ObPsStmtInfo(ObIAllocator *inner_allocator, ref_count_(1), question_mark_count_(0), can_direct_use_param_(false), + is_prexecute_(false), item_and_info_size_(0), last_closed_timestamp_(0), dep_objs_(NULL), @@ -456,6 +458,7 @@ int ObPsStmtInfo::deep_copy(const ObPsStmtInfo &other) num_of_returning_into_ = other.num_of_returning_into_; is_sensitive_sql_ = other.is_sensitive_sql_; can_direct_use_param_ = other.can_direct_use_param(); + is_prexecute_ = other.get_is_prexecute(); item_and_info_size_ = other.item_and_info_size_; ps_item_ = other.ps_item_; tenant_version_ = other.tenant_version_; diff --git a/src/sql/plan_cache/ob_prepare_stmt_struct.h b/src/sql/plan_cache/ob_prepare_stmt_struct.h index 826c11037c..d7ba12e08d 100644 --- a/src/sql/plan_cache/ob_prepare_stmt_struct.h +++ b/src/sql/plan_cache/ob_prepare_stmt_struct.h @@ -193,6 +193,10 @@ public: inline const ObPsSqlMeta &get_ps_sql_meta() const { return ps_sql_meta_; } inline bool can_direct_use_param() const { return can_direct_use_param_; } inline void set_can_direct_use_param(bool v) { can_direct_use_param_ = v; } + + inline bool get_is_prexecute() const { return is_prexecute_; } + inline void set_is_prexecute(bool v) { is_prexecute_ = v; } + inline void set_ps_stmt_checksum(uint64_t ps_checksum) { ps_stmt_checksum_ = ps_checksum; } inline uint64_t get_ps_stmt_checksum() const { return ps_stmt_checksum_; } @@ -266,6 +270,7 @@ private: // for call procedure bool can_direct_use_param_; + bool is_prexecute_; int64_t item_and_info_size_; // mem_used_; int64_t last_closed_timestamp_; //引用计数上次减到1时的时间; ObSchemaObjVersion *dep_objs_; diff --git a/src/sql/plan_cache/ob_ps_cache.cpp b/src/sql/plan_cache/ob_ps_cache.cpp index c9d8188d20..c3334cd3c4 100644 --- a/src/sql/plan_cache/ob_ps_cache.cpp +++ b/src/sql/plan_cache/ob_ps_cache.cpp @@ -527,6 +527,7 @@ int ObPsCache::fill_ps_stmt_info(const ObResultSet &result, } } if (OB_SUCC(ret)) { + ps_stmt_info.set_is_prexecute(sql_ctx->is_pre_execute_); ps_stmt_info.set_question_mark_count(param_cnt); // only used when returning into ps_stmt_info.set_num_of_returning_into(returning_into_parm_num);