/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #define USING_LOG_PREFIX SQL #include "sql/ob_result_set.h" #include "lib/oblog/ob_trace_log.h" #include "lib/charset/ob_charset.h" #include "lib/utility/ob_macro_utils.h" #include "rpc/obmysql/ob_mysql_global.h" #include "rpc/obmysql/ob_mysql_field.h" #include "lib/oblog/ob_log_module.h" #include "engine/ob_physical_plan.h" #include "sql/parser/parse_malloc.h" #include "share/system_variable/ob_system_variable.h" #include "share/system_variable/ob_system_variable_alias.h" #include "sql/session/ob_sql_session_info.h" #include "sql/resolver/ob_cmd.h" #include "sql/engine/px/ob_px_admission.h" #include "sql/engine/cmd/ob_table_direct_insert_service.h" #include "sql/executor/ob_executor.h" #include "sql/executor/ob_cmd_executor.h" #include "sql/resolver/dml/ob_select_stmt.h" #include "sql/resolver/cmd/ob_call_procedure_stmt.h" #include "sql/optimizer/ob_optimizer_util.h" #include "sql/optimizer/ob_log_plan_factory.h" #include "sql/ob_sql_trans_util.h" #include "sql/ob_end_trans_callback.h" #include "sql/session/ob_sql_session_info.h" #include "lib/profile/ob_perf_event.h" #include "sql/plan_cache/ob_cache_object_factory.h" #include "share/ob_cluster_version.h" #include "storage/tx/ob_trans_define.h" #include "pl/ob_pl_user_type.h" #include "pl/ob_pl_stmt.h" #include "observer/ob_server_struct.h" #include "storage/tx/wrs/ob_weak_read_service.h" // ObWeakReadService #include "storage/tx/wrs/ob_i_weak_read_service.h" // WRS_LEVEL_SERVER #include "storage/tx/wrs/ob_weak_read_util.h" // ObWeakReadUtil #include "observer/ob_req_time_service.h" #include "sql/dblink/ob_dblink_utils.h" #include "sql/dblink/ob_tm_service.h" #include using namespace oceanbase::sql; using namespace oceanbase::common; using namespace oceanbase::share; using namespace oceanbase::share::schema; using namespace oceanbase::storage; using namespace oceanbase::transaction; ObResultSet::~ObResultSet() { bool is_remote_sql = false; if (OB_NOT_NULL(get_exec_context().get_sql_ctx())) { is_remote_sql = get_exec_context().get_sql_ctx()->is_remote_sql_; } ObPhysicalPlan* physical_plan = get_physical_plan(); if (OB_NOT_NULL(physical_plan) && !is_remote_sql && OB_UNLIKELY(physical_plan->is_limited_concurrent_num())) { physical_plan->dec_concurrent_num(); } // when ObExecContext is destroyed, it also depends on the physical plan, so need to ensure // that inner_exec_ctx_ is destroyed before cache_obj_guard_ if (NULL != inner_exec_ctx_) { inner_exec_ctx_->~ObExecContext(); inner_exec_ctx_ = NULL; } ObPlanCache *pc = my_session_.get_plan_cache_directly(); if (OB_NOT_NULL(pc)) { cache_obj_guard_.force_early_release(pc); } // Always called at the end of the ObResultSet destructor update_end_time(); is_init_ = false; } int ObResultSet::open_cmd() { int ret = OB_SUCCESS; FLTSpanGuard(cmd_open); if (OB_ISNULL(cmd_)) { LOG_ERROR("cmd and physical_plan both not init", K(stmt_type_)); ret = common::OB_NOT_INIT; } else if (OB_FAIL(init_cmd_exec_context(get_exec_context()))) { LOG_WARN("fail init exec context", K(ret), K_(stmt_type)); } else if (OB_FAIL(on_cmd_execute())) { LOG_WARN("fail start cmd trans", K(ret), K_(stmt_type)); } else if (OB_FAIL(ObCmdExecutor::execute(get_exec_context(), *cmd_))) { SQL_LOG(WARN, "execute cmd failed", K(ret)); } return ret; } OB_INLINE int ObResultSet::open_plan() { int ret = OB_SUCCESS; //ObLimit *limit_opt = NULL; ObPhysicalPlan* physical_plan_ = static_cast(cache_obj_guard_.get_cache_obj()); if (OB_ISNULL(physical_plan_)) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("invalid physical plan", K(physical_plan_)); } else { has_top_limit_ = physical_plan_->has_top_limit(); if (OB_SUCC(ret)) { if (OB_FAIL(ObPxAdmission::enter_query_admission(my_session_, get_exec_context(), get_stmt_type(), *get_physical_plan()))) { // query is not admitted to run // Note: explain statement's phy plan is target query's plan, don't enable admission test LOG_DEBUG("Query is not admitted to run, try again", K(ret)); } else if (THIS_WORKER.is_timeout()) { // packet有可能在队列里面呆的时间过长,到这里已经超时, // 如果这里不检查,则会继续运行,很可能进入到其他模块里面, // 其他模块检测到超时,引起其他模块的疑惑,因此在这里加上超时检查。 // 之所以在这个位置才检查,是为了考虑hint中的超时时间, // 因为在init_plan_exec_context函数里面才将hint的超时时间设进THIS_WORKER中。 ret = OB_TIMEOUT; LOG_WARN("query is timeout", K(ret), "timeout_ts", THIS_WORKER.get_timeout_ts(), "start_time", my_session_.get_query_start_time()); } else if (stmt::T_PREPARE != stmt_type_) { int64_t retry = 0; if (OB_SUCC(ret)) { do { ret = do_open_plan(get_exec_context()); } while (transaction_set_violation_and_retry(ret, retry)); } } } } return ret; } int ObResultSet::open() { int ret = OB_SUCCESS; my_session_.set_process_query_time(ObTimeUtility::current_time()); LinkExecCtxGuard link_guard(my_session_, get_exec_context()); FLTSpanGuard(open); if (lib::is_oracle_mode() && get_exec_context().get_nested_level() >= OB_MAX_RECURSIVE_SQL_LEVELS) { ret = OB_ERR_RECURSIVE_SQL_LEVELS_EXCEEDED; LOG_ORACLE_USER_ERROR(OB_ERR_RECURSIVE_SQL_LEVELS_EXCEEDED, OB_MAX_RECURSIVE_SQL_LEVELS); } else if (OB_FAIL(execute())) { LOG_WARN("execute plan failed", K(ret)); } else if (OB_FAIL(open_result())) { LOG_WARN("open result set failed", K(ret)); } if (OB_SUCC(ret)) { ObPhysicalPlan* physical_plan_ = static_cast(cache_obj_guard_.get_cache_obj()); if (OB_NOT_NULL(cmd_)) { // cmd not set } else if (ret != OB_NOT_INIT && OB_ISNULL(physical_plan_)) { LOG_WARN("empty physical plan", K(ret)); } else if (OB_FAIL(ret)) { physical_plan_->set_is_last_exec_succ(false); } else { physical_plan_->set_is_last_exec_succ(true); } } return ret; } int ObResultSet::execute() { int ret = common::OB_SUCCESS; ObPhysicalPlan* physical_plan_ = static_cast(cache_obj_guard_.get_cache_obj()); // if (stmt::T_PREPARE != stmt_type_ // && stmt::T_DEALLOCATE != stmt_type_) { if (NULL != physical_plan_) { ret = open_plan(); } else if (NULL != cmd_) { ret = open_cmd(); } else { // inner sql executor, no plan or cmd. do nothing. } // } else { // //T_PREPARE//T_DEALLOCATE do nothing // } set_errcode(ret); return ret; } int ObResultSet::open_result() { int ret = OB_SUCCESS; ObPhysicalPlan* physical_plan_ = static_cast(cache_obj_guard_.get_cache_obj()); if (NULL != physical_plan_) { if (OB_ISNULL(exec_result_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("exec result is null", K(ret)); } else if (OB_FAIL(exec_result_->open(get_exec_context()))) { if (OB_TRANSACTION_SET_VIOLATION != ret && OB_TRY_LOCK_ROW_CONFLICT != ret) { SQL_LOG(WARN, "fail open main query", K(ret)); } } else if (OB_FAIL(drive_dml_query())) { LOG_WARN("fail to drive dml query", K(ret)); } else if ((stmt::T_INSERT == get_stmt_type()) && (ObTableDirectInsertService::is_direct_insert(*physical_plan_))) { // for insert /*+ append */ into select clause if (OB_FAIL(ObTableDirectInsertService::commit_direct_insert(get_exec_context(), *physical_plan_))) { LOG_WARN("fail to commit direct insert", KR(ret)); } } } if (OB_SUCC(ret)) { if (!is_inner_result_set_ && OB_FAIL(set_mysql_info())) { SQL_LOG(WARN, "fail to get mysql info", K(ret)); } else if (NULL != get_exec_context().get_physical_plan_ctx()) { SQL_LOG(DEBUG, "get affected row", K(get_stmt_type()), K(get_exec_context().get_physical_plan_ctx()->get_affected_rows())); set_affected_rows(get_exec_context().get_physical_plan_ctx()->get_affected_rows()); } if (OB_SUCC(ret) && get_stmt_type() == stmt::T_ANONYMOUS_BLOCK) { // Compatible with oracle anonymous block affect rows setting set_affected_rows(1); } } set_errcode(ret); return ret; } /* on_cmd_execute - prepare before start execute cmd * * some command implicit commit current transaction state, * include release savepoints */ int ObResultSet::on_cmd_execute() { int ret = OB_SUCCESS; bool ac = true; if (OB_ISNULL(cmd_)) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("invalid inner state", K(cmd_)); } else if (cmd_->cause_implicit_commit()) { // not allow implicit commit in xa trans if (my_session_.associated_xa()) { ret = OB_TRANS_XA_ERR_COMMIT; get_exec_context().set_need_disconnect(false); const transaction::ObTxDesc *tx_desc = my_session_.get_tx_desc(); LOG_WARN("COMMIT is not allowed in a xa trans", K(ret), KPC(tx_desc)); } else { // commit current open transaction, synchronously if (OB_FAIL(ObSqlTransControl::implicit_end_trans(get_exec_context(), false))) { SQL_ENG_LOG(WARN, "fail end implicit trans on cmd execute", K(ret)); } } } return ret; } // open transaction if need (eg. ac=1 DML) int ObResultSet::start_stmt() { NG_TRACE(sql_start_stmt_begin); int ret = OB_SUCCESS; bool ac = true; ObPhysicalPlan* phy_plan = static_cast(cache_obj_guard_.get_cache_obj()); if (OB_ISNULL(phy_plan)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid inner state", K(phy_plan)); } else if (OB_ISNULL(phy_plan->get_root_op_spec())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("root_op_spec of phy_plan is NULL", K(phy_plan), K(ret)); } else if (OB_FAIL(my_session_.get_autocommit(ac))) { LOG_WARN("fail to get autocommit", K(ret)); } else if (phy_plan->is_link_dml_plan() || phy_plan->has_link_sfd()) { if (ac) { my_session_.set_autocommit(false); my_session_.set_restore_auto_commit(); // auto commit will be restored at tm_rollback or tm_commit; } LOG_DEBUG("dblink xa trascaction need skip start_stmt()", K(PHY_LINK_DML == phy_plan->get_root_op_spec()->type_), K(my_session_.get_dblink_context().is_dblink_xa_tras())); } else { if (-1 != phy_plan->tx_id_) { const transaction::ObTransID tx_id(phy_plan->tx_id_); } else { LOG_DEBUG("recover dblink xa skip", K(phy_plan->tx_id_)); } bool in_trans = my_session_.get_in_transaction(); // 1. 无论是否处于事务中,只要不是select并且plan为REMOTE的,就反馈给客户端不命中 // 2. feedback this misshit to obproxy (bug#6255177) // 3. 对于multi-stmt,只反馈首个partition hit信息给客户端 // 4. 需要考虑到重试的情况,需要反馈给客户端的是首个重试成功的partition hit。 if (OB_SUCC(ret) && stmt::T_SELECT != stmt_type_) { my_session_.partition_hit().try_set_bool( OB_PHY_PLAN_REMOTE != phy_plan->get_plan_type()); } if (OB_FAIL(ret)) { // do nothing } else if (ObSqlTransUtil::is_remote_trans( ac, in_trans, phy_plan->get_plan_type())) { // pass } else if (OB_LIKELY(phy_plan->is_need_trans())) { if (get_trans_state().is_start_stmt_executed()) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("invalid transaction state", K(get_trans_state().is_start_stmt_executed())); } else if (OB_FAIL(ObSqlTransControl::start_stmt(get_exec_context()))) { SQL_LOG(WARN, "fail to start stmt", K(ret), K(phy_plan->get_dependency_table())); } else { auto literal_stmt_type = literal_stmt_type_ != stmt::T_NONE ? literal_stmt_type_ : stmt_type_; my_session_.set_first_need_txn_stmt_type(literal_stmt_type); } get_trans_state().set_start_stmt_executed(OB_SUCC(ret)); } } NG_TRACE(sql_start_stmt_end); return ret; } int ObResultSet::end_stmt(const bool is_rollback) { int ret = OB_SUCCESS; NG_TRACE(start_end_stmt); if (get_trans_state().is_start_stmt_executed() && get_trans_state().is_start_stmt_success()) { ObPhysicalPlan* physical_plan_ = static_cast(cache_obj_guard_.get_cache_obj()); if (OB_ISNULL(physical_plan_)) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("invalid inner state", K(physical_plan_)); } else if (physical_plan_->is_need_trans()) { if (OB_FAIL(ObSqlTransControl::end_stmt(get_exec_context(), is_rollback))) { SQL_LOG(WARN, "fail to end stmt", K(ret), K(is_rollback)); } } get_trans_state().clear_start_stmt_executed(); } else { // do nothing } if (need_revert_tx_) { // ignore ret } NG_TRACE(end_stmt); return ret; } //@notice: this interface can not be used in the interface of ObResultSet //otherwise, will cause the exec context reference mistake in session info //see the call reference in LinkExecCtxGuard int ObResultSet::get_next_row(const common::ObNewRow *&row) { LinkExecCtxGuard link_guard(my_session_, get_exec_context()); return inner_get_next_row(row); } OB_INLINE int ObResultSet::inner_get_next_row(const common::ObNewRow *&row) { int &ret = errcode_; ObPhysicalPlan* physical_plan_ = static_cast(cache_obj_guard_.get_cache_obj()); // last_exec_succ default values is true if (OB_LIKELY(NULL != physical_plan_)) { // take this branch more frequently if (OB_ISNULL(exec_result_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("exec result is null", K(ret)); } else if (OB_FAIL(exec_result_->get_next_row(get_exec_context(), row))) { if (OB_ITER_END != ret) { LOG_WARN("get next row from exec result failed", K(ret)); // marked last execute status physical_plan_->set_is_last_exec_succ(false); } } else { return_rows_++; } } else if (NULL != cmd_) { if (is_pl_stmt(static_cast(cmd_->get_cmd_type()))) { if (return_rows_ > 0) { errcode_ = OB_ITER_END; } else { row = get_exec_context().get_output_row(); return_rows_++; } } else { ret = OB_ERR_UNEXPECTED; _OB_LOG(ERROR, "should not call get_next_row in CMD SQL"); } } else { _OB_LOG(ERROR, "phy_plan not init"); ret = OB_NOT_INIT; } return ret; } // 触发本错误的条件: A、B两个SQL,同时修改了某几行数据(修改内容有交集)。 // 微观上,修改操作要先读出符合条件的行,然后再更新。在读的时候,会记录一个版本号, // 更新的时候,会检查版本号是否有变化。如果有变化,则说明在读之后、写之前,数据被其它 // 更新操作修改了。这个时候如果强行更新,则会违反一致性。举个例子: // tbl里包含一行数据 0,希望通过A、B两个并发的操作让它变成2: // A: update tbl set a = a + 1; // B: update tbl set a = a + 1; // 如果在A读a之后、更新a之前,B完成了整个更新操作,则B的更新会丢失,最终结果为a=1 // // 解决方案:在更新数据之前检查版本号,发现版本号不一致,则抛出OB_TRANSACTION_SET_VIOLATION // 错误,由SQL层在这里重试,保证读写版本号一致。 // // Note: 由于本重试不涉及到刷新Schema或更新Location Cache,所以无需做整个语句级重试, // 执行级别的重试即可。 bool ObResultSet::transaction_set_violation_and_retry(int &err, int64_t &retry_times) { bool bret = false; ObSqlCtx *sql_ctx = get_exec_context().get_sql_ctx(); bool is_batched_stmt = false; if (sql_ctx != nullptr) { is_batched_stmt = sql_ctx->multi_stmt_item_.is_batched_multi_stmt(); } if ((OB_SNAPSHOT_DISCARDED == err || OB_TRANSACTION_SET_VIOLATION == err) && retry_times < TRANSACTION_SET_VIOLATION_MAX_RETRY && !is_batched_stmt) { //batched stmt本身是一种优化,其中cursor状态在快速重路径中无法维护,不走快速重试路径 ObTxIsolationLevel isolation = my_session_.get_tx_isolation(); bool is_isolation_RR_or_SE = (isolation == ObTxIsolationLevel::RR || isolation == ObTxIsolationLevel::SERIAL); // bug#6361189 pass err to force rollback stmt in do_close_plan() if (OB_TRANSACTION_SET_VIOLATION == err && 0 == retry_times && !is_isolation_RR_or_SE) { // TSC错误重试时,只在第一次打印WARN日志 LOG_WARN_RET(err, "transaction set consistency violation, will retry"); } int ret = do_close_plan(err, get_exec_context()); ObPhysicalPlanCtx *plan_ctx = get_exec_context().get_physical_plan_ctx(); if (OB_NOT_NULL(plan_ctx)) { plan_ctx->reset_for_quick_retry(); } if (OB_SUCCESS != ret) { // 注意:如果do_close失败,会影响到TSC冲突重试,导致本该重试却实际没有重试的问题 // 这里如果出现bug,则需要看每个涉及到的Operator的close实现 LOG_WARN("failed to close plan", K(err), K(ret)); } else { // OB_SNAPSHOT_DISCARDED should not retry now, see: // // so we remove this condition: OB_TRANSACTION_SET_VIOLATION == err if (/*OB_TRANSACTION_SET_VIOLATION == err &&*/ is_isolation_RR_or_SE) { // rewrite err in ObQueryRetryCtrl::test_and_save_retry_state(). // err = OB_TRANS_CANNOT_SERIALIZE; bret = false; } else { ++retry_times; bret = true; } } LOG_DEBUG("transaction set consistency violation and retry", "retry", bret, K(retry_times), K(err)); } return bret; } OB_INLINE int ObResultSet::do_open_plan(ObExecContext &ctx) { ObPhysicalPlan* physical_plan_ = static_cast(cache_obj_guard_.get_cache_obj()); NG_TRACE_EXT(do_open_plan_begin, OB_ID(plan_id), physical_plan_->get_plan_id()); int ret = OB_SUCCESS; ctx.reset_op_env(); exec_result_ = &(ctx.get_task_exec_ctx().get_execute_result()); if (stmt::T_PREPARE != stmt_type_) { if (OB_FAIL(ctx.init_phy_op(physical_plan_->get_phy_operator_size()))) { LOG_WARN("fail init exec phy op ctx", K(ret)); } else if (OB_FAIL(ctx.init_expr_op(physical_plan_->get_expr_operator_size()))) { LOG_WARN("fail init exec expr op ctx", K(ret)); } } if (OB_SUCC(ret) && my_session_.get_ddl_info().is_ddl() && stmt::T_INSERT == get_stmt_type()) { if (OB_FAIL(ObDDLUtil::clear_ddl_checksum(physical_plan_))) { LOG_WARN("fail to clear ddl checksum", K(ret)); } } // for insert /*+ append */ into select clause if (OB_SUCC(ret) && (stmt::T_INSERT == get_stmt_type()) && (ObTableDirectInsertService::is_direct_insert(*physical_plan_))) { if (OB_FAIL(ObTableDirectInsertService::start_direct_insert(ctx, *physical_plan_))) { LOG_WARN("fail to start direct insert", KR(ret)); } } if (OB_FAIL(ret)) { } else if (OB_FAIL(start_stmt())) { LOG_WARN("fail start stmt", K(ret)); } else { /* 将exec_result_设置到executor的运行时环境中,用于返回数据 */ /* 执行plan, * 无论是本地、远程、还是分布式plan,除RootJob外,其余都会在execute_plan函数返回之前执行完毕 * exec_result_负责执行最后一个Job: RootJob **/ if (OB_FAIL(executor_.init(physical_plan_))) { SQL_LOG(WARN, "fail to init executor", K(ret), K(physical_plan_)); } else if (OB_FAIL(executor_.execute_plan(ctx))) { SQL_LOG(WARN, "fail execute plan", K(ret)); } } NG_TRACE(do_open_plan_end); return ret; } int ObResultSet::set_mysql_info() { int ret = OB_SUCCESS; ObPhysicalPlanCtx *plan_ctx = get_exec_context().get_physical_plan_ctx(); int64_t pos = 0; if (OB_ISNULL(plan_ctx)) { ret = OB_ERR_UNEXPECTED; SQL_LOG(WARN, "fail to get physical plan ctx"); } else if (stmt::T_UPDATE == get_stmt_type()) { int result_len = snprintf(message_ + pos, MSG_SIZE - pos, OB_UPDATE_MSG_FMT, plan_ctx->get_row_matched_count(), plan_ctx->get_row_duplicated_count(), warning_count_); if (OB_UNLIKELY(result_len < 0) || OB_UNLIKELY(result_len >= MSG_SIZE - pos)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("fail to snprintf to buff", K(ret)); } } else if (stmt::T_REPLACE == get_stmt_type() || stmt::T_INSERT == get_stmt_type()) { if (plan_ctx->get_row_matched_count() <= 1) { //nothing to do } else { int result_len = snprintf(message_ + pos, MSG_SIZE - pos, OB_INSERT_MSG_FMT, plan_ctx->get_row_matched_count(), plan_ctx->get_row_duplicated_count(), warning_count_); if (OB_UNLIKELY(result_len < 0) || OB_UNLIKELY(result_len >= MSG_SIZE - pos)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("fail to snprintf to buff", K(ret)); } } } else if (stmt::T_LOAD_DATA == get_stmt_type()) { int result_len = snprintf(message_ + pos, MSG_SIZE - pos, OB_LOAD_DATA_MSG_FMT, plan_ctx->get_row_matched_count(), plan_ctx->get_row_deleted_count(), plan_ctx->get_row_duplicated_count(), warning_count_); if (OB_UNLIKELY(result_len < 0) || OB_UNLIKELY(result_len >= MSG_SIZE - pos)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("fail to snprintf to buff", K(ret)); } } else { //nothing to do } return ret; } OB_INLINE void ObResultSet::store_affected_rows(ObPhysicalPlanCtx &plan_ctx) { int64_t affected_row = 0; if (!ObStmt::is_dml_stmt(get_stmt_type()) && (lib::is_oracle_mode() || !is_pl_stmt(get_stmt_type()))) { affected_row = 0; } else if (stmt::T_SELECT == get_stmt_type()) { affected_row = lib::is_oracle_mode() ? plan_ctx.get_affected_rows() : -1; } else { affected_row = get_affected_rows(); } NG_TRACE_EXT(affected_rows, OB_ID(affected_rows), affected_row); my_session_.set_affected_rows(affected_row); } OB_INLINE void ObResultSet::store_found_rows(ObPhysicalPlanCtx &plan_ctx) { int64_t rows = 1; //如果是execute语句的话,则需要判断对应prepare语句的类型,如果是select的话,则会影响found_rows的值; //to do by rongxuan.lc 20151127 if (plan_ctx.is_affect_found_row()) { if (OB_UNLIKELY(stmt::T_EXPLAIN == get_stmt_type())) { rows = 0; my_session_.set_found_rows(rows); } else { int64_t found_rows = -1; found_rows = plan_ctx.get_found_rows(); rows = found_rows == 0 ? return_rows_ : found_rows; my_session_.set_found_rows(rows); NG_TRACE_EXT(store_found_rows, OB_ID(found_rows), found_rows, OB_ID(return_rows), return_rows_); } } return; } int ObResultSet::update_last_insert_id() { return store_last_insert_id(get_exec_context()); } int ObResultSet::update_last_insert_id_to_client() { int ret = OB_SUCCESS; ObPhysicalPlanCtx *plan_ctx = get_exec_context().get_physical_plan_ctx(); if (OB_ISNULL(plan_ctx)) { ret = OB_ERR_UNEXPECTED; SQL_LOG(WARN, "get plan ctx is NULL", K(ret)); } else { set_last_insert_id_to_client(plan_ctx->calc_last_insert_id_to_client()); } return ret; } int ObResultSet::update_is_result_accurate() { int ret = OB_SUCCESS; if (stmt::T_SELECT == stmt_type_) { ObPhysicalPlanCtx *plan_ctx = get_exec_context().get_physical_plan_ctx(); bool old_is_result_accurate = true; if (OB_ISNULL(plan_ctx)) { ret = OB_ERR_UNEXPECTED; SQL_LOG(WARN, "get plan ctx is NULL", K(ret)); } else if (OB_FAIL(my_session_.get_is_result_accurate(old_is_result_accurate))) { SQL_LOG(WARN, "faile to get is_result_accurate", K(ret)); } else { bool is_result_accurate = plan_ctx->is_result_accurate(); SQL_LOG(DEBUG, "debug is_result_accurate for session", K(is_result_accurate), K(old_is_result_accurate), K(ret)); if (is_result_accurate != old_is_result_accurate) { // FIXME @qianfu 暂时写成update_sys_variable函数,以后再实现一个可以传入ObSysVarClassType并且 // 和set系统变量语句走同一套逻辑的函数,在这里调用 if (OB_FAIL(my_session_.update_sys_variable(SYS_VAR_IS_RESULT_ACCURATE, is_result_accurate ? 1 : 0))) { LOG_WARN("fail to update result accurate", K(ret)); } } } } return ret; } int ObResultSet::store_last_insert_id(ObExecContext &ctx) { int ret = OB_SUCCESS; ObPhysicalPlan* physical_plan_ = static_cast(cache_obj_guard_.get_cache_obj()); if (ObStmt::is_dml_stmt(stmt_type_) && OB_LIKELY(NULL != physical_plan_) && OB_LIKELY(!physical_plan_->is_affected_last_insert_id())) { //nothing to do } else { ObPhysicalPlanCtx *plan_ctx = ctx.get_physical_plan_ctx(); if (OB_ISNULL(plan_ctx)) { ret = OB_ERR_UNEXPECTED; SQL_LOG(WARN, "get plan ctx is NULL", K(plan_ctx)); } else { uint64_t last_insert_id_session = plan_ctx->calc_last_insert_id_session(); SQL_LOG(DEBUG, "debug last_insert_id for session", K(last_insert_id_session), K(ret)); SQL_LOG(DEBUG, "debug last_insert_id changed", K(ret), "last_insert_id_changed", plan_ctx->get_last_insert_id_changed()); if (plan_ctx->get_last_insert_id_changed()) { ObObj last_insert_id; last_insert_id.set_uint64(last_insert_id_session); // FIXME @qianfu 暂时写成update_sys_variable函数,以后再实现一个可以传入ObSysVarClassType并且 // 和set系统变量语句走同一套逻辑的函数,在这里调用 if (OB_FAIL(my_session_.update_sys_variable(SYS_VAR_LAST_INSERT_ID, last_insert_id))) { LOG_WARN("fail to update last_insert_id", K(ret)); } else if (OB_FAIL(my_session_.update_sys_variable(SYS_VAR_IDENTITY, last_insert_id))) { LOG_WARN("succ update last_insert_id, but fail to update identity", K(ret)); } else { NG_TRACE_EXT(last_insert_id, OB_ID(last_insert_id), last_insert_id_session); } } if (OB_SUCC(ret)) { // TODO when does observer should return last_insert_id to client? set_last_insert_id_to_client(plan_ctx->calc_last_insert_id_to_client()); SQL_LOG(DEBUG, "zixu debug", K(ret), "last_insert_id_to_client", get_last_insert_id_to_client()); } } } return ret; } bool ObResultSet::need_rollback(int ret, int errcode, bool is_error_ignored) const { bool bret = false; if (OB_SUCCESS != ret //当前close执行语句出错 || (errcode != OB_SUCCESS && errcode != OB_ITER_END) //errcode是open阶段特意保存下来的错误 || is_error_ignored) { //曾经出错,但是被忽略了; bret = true; } return bret; } OB_INLINE int ObResultSet::do_close_plan(int errcode, ObExecContext &ctx) { int ret = common::OB_SUCCESS; int pret = OB_SUCCESS; int sret = OB_SUCCESS; NG_TRACE(close_plan_begin); ObPhysicalPlan* physical_plan_ = static_cast(cache_obj_guard_.get_cache_obj()); if (OB_LIKELY(NULL != physical_plan_)) { // executor_.close要在exec_result_.close之后,因为主线程get_next_row的时候会占着锁, // exec_result_.close会释放锁,而executor_.close需要拿写锁来删掉该scheduler。 // FIXME qianfu.zpf 这里本来应该要调度线程结束才调用exec_result_.close的。调度线程给主线程push完最后一个 // task结果之后主线程就会往下走,有可能在调度线程结束之前主线程就调用了exec_result_.close, // 但是由于目前调度线程给主线程push完最后一个task结果之后, 调度线程不会用到exec_result_.close中释放的变量, // 因此目前这样写暂时是没有问题的。 // 以后要修正这里的逻辑。 if (OB_ISNULL(exec_result_)) { ret = OB_NOT_INIT; LOG_WARN("exec result is null", K(ret)); } else if (OB_FAIL(exec_result_->close(ctx))) { SQL_LOG(WARN, "fail close main query", K(ret)); } // 通知该plan的所有task删掉对应的中间结果 int close_ret = OB_SUCCESS; ObPhysicalPlanCtx *plan_ctx = NULL; if (OB_ISNULL(plan_ctx = get_exec_context().get_physical_plan_ctx())) { close_ret = OB_ERR_UNEXPECTED; LOG_WARN("physical plan ctx is null"); } else if (OB_SUCCESS != (close_ret = executor_.close(ctx))) { // executor_.close里面会等到调度线程结束才返回。 SQL_LOG(WARN, "fail to close executor", K(ret), K(close_ret)); } ObPxAdmission::exit_query_admission(my_session_, get_exec_context(), get_stmt_type(), *get_physical_plan()); // Finishing direct-insert must be executed after ObPxTargetMgr::release_target() if ((OB_SUCCESS == close_ret) && (OB_SUCCESS == errcode || OB_ITER_END == errcode) && (stmt::T_INSERT == get_stmt_type()) && (ObTableDirectInsertService::is_direct_insert(*physical_plan_))) { // for insert /*+ append */ into select clause int tmp_ret = OB_SUCCESS; if (OB_TMP_FAIL(ObTableDirectInsertService::finish_direct_insert(ctx, *physical_plan_))) { errcode_ = tmp_ret; // record error code errcode = tmp_ret; LOG_WARN("fail to finish direct insert", KR(tmp_ret)); } } // // 必须要在executor_.execute_plan运行之后再调用exec_result_的一系列函数。 // if (OB_FAIL(exec_result_.close(ctx))) { // SQL_LOG(WARN, "fail close main query", K(ret)); // } // 无论如何都reset_op_ctx bool err_ignored = false; if (OB_ISNULL(plan_ctx)) { LOG_ERROR("plan is not NULL, but plan ctx is NULL", K(ret), K(errcode)); } else { err_ignored = plan_ctx->is_error_ignored(); } bool rollback = need_rollback(ret, errcode, err_ignored); get_exec_context().set_errcode(errcode); sret = end_stmt(rollback || OB_SUCCESS != pret); // SQL_LOG(INFO, "end_stmt err code", K_(errcode), K(ret), K(pret), K(sret)); // if branch fail is returned from end_stmt, then return it first if (OB_TRANS_XA_BRANCH_FAIL == sret) { ret = OB_TRANS_XA_BRANCH_FAIL; } else if (OB_FAIL(ret)) { // nop } else if (OB_SUCCESS != pret) { ret = pret; } else if (OB_SUCCESS != sret) { ret = sret; } if (OB_SUCC(ret)) { if (physical_plan_->need_record_plan_info()) { if (ctx.get_feedback_info().is_valid() && physical_plan_->get_logical_plan().is_valid() && OB_FAIL(physical_plan_->set_feedback_info(ctx))) { LOG_WARN("failed to set feedback info", K(ret)); } else { physical_plan_->set_record_plan_info(false); } } } } else { ret = OB_ERR_UNEXPECTED; } // 无论如何都reset掉executor_,否则前面调executor_.init的时候可能会报init twice executor_.reset(); NG_TRACE(close_plan_end); return ret; } int ObResultSet::close() { int ret = OB_SUCCESS; LinkExecCtxGuard link_guard(my_session_, get_exec_context()); FLTSpanGuard(close); int do_close_plan_ret = OB_SUCCESS; ObPhysicalPlan* physical_plan_ = static_cast(cache_obj_guard_.get_cache_obj()); if (OB_LIKELY(NULL != physical_plan_)) { if (OB_FAIL(my_session_.reset_tx_variable_if_remote_trans( physical_plan_->get_plan_type()))) { LOG_WARN("fail to reset tx_read_only if it is remote trans", K(ret)); } // 无论如何必须执行do_close_plan if (OB_UNLIKELY(OB_SUCCESS != (do_close_plan_ret = do_close_plan(errcode_, get_exec_context())))) { SQL_LOG(WARN, "fail close main query", K(ret), K(do_close_plan_ret)); } if (OB_SUCC(ret)) { ret = do_close_plan_ret; } } else if (NULL != cmd_) { ret = OB_SUCCESS; // cmd mode always return true in close phase } else { // inner sql executor, no plan or cmd. do nothing. } // open, get_next_row无论是否成功,close总是要调用 // 下面的逻辑保证对外吐出的是首次出现的错误码。 by xiaochu.yh // 保存close之前的错误码,用来判断本次stmt是否需要回滚; by rongxuan.lc if (OB_SUCCESS != errcode_ && OB_ITER_END != errcode_) { ret = errcode_; } if (OB_SUCC(ret)) { ObPhysicalPlanCtx *plan_ctx = get_exec_context().get_physical_plan_ctx(); if (OB_ISNULL(plan_ctx)) { ret = OB_NOT_INIT; LOG_WARN("result set isn't init", K(ret)); } else { store_affected_rows(*plan_ctx); store_found_rows(*plan_ctx); } } if (OB_SUCC(ret) && get_stmt_type() == stmt::T_SELECT) { if (OB_FAIL(update_is_result_accurate())) { SQL_LOG(WARN, "failed to update is result_accurate", K(ret)); } } // set last_insert_id int ins_ret = OB_SUCCESS; if (OB_SUCCESS != ret && get_stmt_type() != stmt::T_INSERT && get_stmt_type() != stmt::T_REPLACE) { // ignore when OB_SUCCESS != ret and stmt like select/update/delete... executed } else if (OB_SUCCESS != (ins_ret = store_last_insert_id(get_exec_context()))) { SQL_LOG(WARN, "failed to store last_insert_id", K(ret), K(ins_ret)); } if (OB_SUCC(ret)) { ret = ins_ret; } if (OB_SUCC(ret)) { if (!get_exec_context().get_das_ctx().is_partition_hit()) { my_session_.partition_hit().try_set_bool(false); } } int prev_ret = ret; bool async = false; // for debug purpose if (OB_TRANS_XA_BRANCH_FAIL == ret) { if (my_session_.associated_xa()) { //兼容oracle,这里需要重置session状态 LOG_WARN("branch fail in global transaction", KPC(my_session_.get_tx_desc())); ObSqlTransControl::clear_xa_branch(my_session_.get_xid(), my_session_.get_tx_desc()); my_session_.reset_tx_variable(); my_session_.disassociate_xa(); } } else if (OB_NOT_NULL(physical_plan_)) { //Because of the async close result we need set the partition_hit flag //to the call back param, than close the result. //But the das framwork set the patition_hit after result is closed. //So we need to set the partition info at here. if (is_end_trans_async()) { ObCurTraceId::TraceId *cur_trace_id = NULL; if (OB_ISNULL(cur_trace_id = ObCurTraceId::get_trace_id())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("current trace id is NULL", K(ret)); set_end_trans_async(false); } else { observer::ObSqlEndTransCb &sql_end_cb = my_session_.get_mysql_end_trans_cb(); ObEndTransCbPacketParam pkt_param; int fill_ret = OB_SUCCESS; fill_ret = sql_end_cb.set_packet_param(pkt_param.fill(*this, my_session_, *cur_trace_id)); if (OB_SUCCESS != fill_ret) { LOG_WARN("fail set packet param", K(ret)); set_end_trans_async(false); } } } ret = auto_end_plan_trans(*physical_plan_, ret, async); } if (is_user_sql_ && my_session_.need_reset_package()) { // need_reset_package is set, it must be reset package, wether exec succ or not. int tmp_ret = my_session_.reset_all_package_state_by_dbms_session(true); if (OB_SUCCESS != tmp_ret) { LOG_WARN("reset all package fail. ", K(tmp_ret), K(ret)); ret = OB_SUCCESS == ret ? tmp_ret : ret; } } //NG_TRACE_EXT(result_set_close, OB_ID(ret), ret, OB_ID(arg1), prev_ret, //OB_ID(arg2), ins_ret, OB_ID(arg3), errcode_, OB_ID(async), async); return ret; // 后面所有的操作都通过callback来完成 } // 异步调用end_trans // 1. 仅仅针对AC=1的phy plan,不针对cmd (除了还没实现的commit/rollback) // 2. TODO:对于commit/rollback这个cmd,后面也需要走这个路径。现在还是走同步Callback。 OB_INLINE int ObResultSet::auto_end_plan_trans(ObPhysicalPlan& plan, int ret, bool &async) { NG_TRACE(auto_end_plan_begin); bool in_trans = my_session_.is_in_transaction(); bool ac = true; bool explicit_trans = my_session_.has_explicit_start_trans(); bool is_rollback = false; my_session_.get_autocommit(ac); async = false; LOG_TRACE("auto_end_plan_trans.start", K(ret), K(in_trans), K(ac), K(explicit_trans), K(is_async_end_trans_submitted())); // explicit start trans will disable auto-commit if (!explicit_trans && ac) { ObPhysicalPlanCtx *plan_ctx = NULL; if (OB_ISNULL(plan_ctx = get_exec_context().get_physical_plan_ctx())) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("physical plan ctx is null, won't call end trans!!!", K(ret)); } else { //bool is_rollback = (OB_FAIL(ret) || plan_ctx->is_force_rollback()); is_rollback = need_rollback(OB_SUCCESS, ret, plan_ctx->is_error_ignored()); // 对于UPDATE等异步提交的语句,如果需要重试,那么中途的rollback也走同步接口 if (OB_LIKELY(false == is_end_trans_async()) || OB_LIKELY(false == is_user_sql_)) { // 如果没有设置end_trans_cb,就走同步接口。这个主要是为了InnerSQL提供的。 // 因为InnerSQL没有走Obmp_query接口,而是直接操作ResultSet int save_ret = ret; if (OB_FAIL(ObSqlTransControl::implicit_end_trans(get_exec_context(), is_rollback))) { if (OB_REPLICA_NOT_READABLE != ret) { LOG_WARN("sync end trans callback return an error!", K(ret), K(is_rollback), KPC(my_session_.get_tx_desc())); } } ret = OB_SUCCESS != save_ret? save_ret : ret; async = false; } else { // 外部SQL请求,走异步接口 int save_ret = ret; my_session_.get_end_trans_cb().set_last_error(ret); ret = ObSqlTransControl::implicit_end_trans(get_exec_context(), is_rollback, &my_session_.get_end_trans_cb()); // NOTE: async callback client will not issued if: // 1) it is a rollback, which will succeed immediately // 2) the commit submit/starting failed, in this case // the connection will be released // so the error code should not override, and return to plan-driver // to decide whether a packet should be sent to client ret = save_ret == OB_SUCCESS ? ret : save_ret; async = true; } get_trans_state().clear_start_trans_executed(); } } NG_TRACE(auto_end_plan_end); LOG_TRACE("auto_end_plan_trans.end", K(ret), K(in_trans), K(ac), K(explicit_trans), K(is_rollback), K(async), K(is_async_end_trans_submitted())); return ret; } void ObResultSet::set_statement_name(const common::ObString name) { statement_name_ = name; } int ObResultSet::from_plan(const ObPhysicalPlan &phy_plan, const ObIArray &raw_params) { int ret = OB_SUCCESS; ObPhysicalPlanCtx *plan_ctx = NULL; if (OB_ISNULL(plan_ctx = get_exec_context().get_physical_plan_ctx())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("physical plan ctx is null or ref handle is invalid", K(ret), K(plan_ctx)); } else if (phy_plan.contain_paramed_column_field() && OB_FAIL(copy_field_columns(phy_plan))) { // 因为会修改ObField中的cname_,所以这里深拷计划中的field_columns_ LOG_WARN("failed to copy field columns", K(ret)); } else if (phy_plan.contain_paramed_column_field() && OB_FAIL(construct_field_name(raw_params, false))) { LOG_WARN("failed to construct field name", K(ret)); } else { int64_t ps_param_count = plan_ctx->get_orig_question_mark_cnt(); p_field_columns_ = phy_plan.contain_paramed_column_field() ? &field_columns_ : &phy_plan.get_field_columns(); p_returning_param_columns_ = &phy_plan.get_returning_param_fields(); stmt_type_ = phy_plan.get_stmt_type(); literal_stmt_type_ = phy_plan.get_literal_stmt_type(); is_returning_ = phy_plan.is_returning(); plan_ctx->set_is_affect_found_row(phy_plan.is_affect_found_row()); if (is_ps_protocol() && ps_param_count != phy_plan.get_param_fields().count()) { if (OB_FAIL(reserve_param_columns(ps_param_count))) { LOG_WARN("reserve param columns failed", K(ret), K(ps_param_count)); } for (int64_t i = 0; OB_SUCC(ret) && i < ps_param_count; ++i) { ObField param_field; param_field.type_.set_type(ObIntType); // @bug param_field.cname_ = ObString::make_string("?"); OZ (add_param_column(param_field), K(param_field), K(i), K(ps_param_count)); } LOG_DEBUG("reset param count ", K(ps_param_count), K(plan_ctx->get_orig_question_mark_cnt()), K(phy_plan.get_returning_param_fields().count()), K(phy_plan.get_param_fields().count())); } else { p_param_columns_ = &phy_plan.get_param_fields(); } } return ret; } int ObResultSet::to_plan(const PlanCacheMode mode, ObPhysicalPlan *phy_plan) { int ret = OB_SUCCESS; if (OB_ISNULL(phy_plan)) { LOG_ERROR("invalid argument", K(phy_plan)); ret = OB_INVALID_ARGUMENT; } else { if (OB_FAIL(phy_plan->set_field_columns(field_columns_))) { LOG_WARN("Failed to copy field info to plan", K(ret)); } else if ((PC_PS_MODE == mode || PC_PL_MODE == mode) && OB_FAIL(phy_plan->set_param_fields(param_columns_))) { // param fields is only needed ps mode LOG_WARN("failed to copy param field to plan", K(ret)); } else if ((PC_PS_MODE == mode || PC_PL_MODE == mode) && OB_FAIL(phy_plan->set_returning_param_fields(returning_param_columns_))) { // returning param fields is only needed ps mode LOG_WARN("failed to copy returning param field to plan", K(ret)); } } return ret; } int ObResultSet::get_read_consistency(ObConsistencyLevel &consistency) { consistency = INVALID_CONSISTENCY; int ret = OB_SUCCESS; ObPhysicalPlan* physical_plan_ = static_cast(cache_obj_guard_.get_cache_obj()); if (OB_ISNULL(physical_plan_) || OB_ISNULL(exec_ctx_) || OB_ISNULL(exec_ctx_->get_sql_ctx())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("physical_plan", K_(physical_plan), K(exec_ctx_->get_sql_ctx()), K(ret)); } else { const ObPhyPlanHint &phy_hint = physical_plan_->get_phy_plan_hint(); if (stmt::T_SELECT == stmt_type_) { // select才有weak if (exec_ctx_->get_sql_ctx()->is_protocol_weak_read_) { consistency = WEAK; } else if (OB_UNLIKELY(phy_hint.read_consistency_ != INVALID_CONSISTENCY)) { consistency = phy_hint.read_consistency_; } else { consistency = my_session_.get_consistency_level(); } } else { consistency = STRONG; } } return ret; } int ObResultSet::init_cmd_exec_context(ObExecContext &exec_ctx) { int ret = OB_SUCCESS; ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(exec_ctx); void *buf = NULL; if (OB_ISNULL(cmd_) || OB_ISNULL(plan_ctx)) { ret = OB_NOT_INIT; LOG_WARN("cmd or ctx is NULL", K(ret), K(cmd_), K(plan_ctx)); ret = OB_ERR_UNEXPECTED; } else if ((ObStmt::is_ddl_stmt(stmt_type_, true) || ObStmt::is_dcl_stmt(stmt_type_)) && stmt::T_VARIABLE_SET != stmt_type_ && my_session_.associated_xa()) { ret = OB_TRANS_XA_ERR_COMMIT; const transaction::ObTxDesc *tx_desc = my_session_.get_tx_desc(); LOG_WARN("COMMIT is not allowed in a xa trans", K(ret), KPC(tx_desc)); } else if (OB_ISNULL(buf = get_mem_pool().alloc(sizeof(ObNewRow)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc memory", K(sizeof(ObNewRow)), K(ret)); } else { exec_ctx.set_output_row(new(buf)ObNewRow()); exec_ctx.set_field_columns(&field_columns_); int64_t plan_timeout = 0; if (OB_FAIL(my_session_.get_query_timeout(plan_timeout))) { LOG_WARN("fail to get query timeout", K(ret)); } else { int64_t start_time = my_session_.get_query_start_time(); plan_ctx->set_timeout_timestamp(start_time + plan_timeout); THIS_WORKER.set_timeout_ts(plan_ctx->get_timeout_timestamp()); } } return ret; } // obmp_query中重试整个SQL之前,可能需要调用本接口来刷新Location,以避免总是发给了错误的服务器 void ObResultSet::refresh_location_cache(bool is_nonblock, int err) { DAS_CTX(get_exec_context()).get_location_router().refresh_location_cache(is_nonblock, err); } // 告诉mysql是否要传入一个EndTransCallback bool ObResultSet::need_end_trans_callback() const { int ret = OB_SUCCESS; bool need = false; ObPhysicalPlan* physical_plan_ = static_cast(cache_obj_guard_.get_cache_obj()); if (stmt::T_SELECT == get_stmt_type()) { // 对于select语句,总不走callback,无论事务状态如何 need = false; } else if (is_returning_) { need = false; } else if (my_session_.get_has_temp_table_flag() || my_session_.has_tx_level_temp_table()) { need = false; } else if (stmt::T_END_TRANS == get_stmt_type()) { need = true; } else { bool explicit_start_trans = my_session_.has_explicit_start_trans(); bool ac = true; if (OB_FAIL(my_session_.get_autocommit(ac))) { LOG_ERROR("fail to get autocommit", K(ret)); } else {} if (stmt::T_ANONYMOUS_BLOCK == get_stmt_type() && is_oracle_mode()) { need = ac && !explicit_start_trans && !is_with_rows(); } else if (OB_LIKELY(NULL != physical_plan_) && OB_LIKELY(physical_plan_->is_need_trans()) && !physical_plan_->is_link_dml_plan()) { need = (true == ObSqlTransUtil::plan_can_end_trans(ac, explicit_start_trans)) && (false == ObSqlTransUtil::is_remote_trans(ac, explicit_start_trans, physical_plan_->get_plan_type())); } } return need; } int ObResultSet::ExternalRetrieveInfo::build_into_exprs( ObStmt &stmt, pl::ObPLBlockNS *ns, bool is_dynamic_sql) { int ret = OB_SUCCESS; if (stmt.is_select_stmt()) { ObSelectIntoItem *into_item = (static_cast(stmt)).get_select_into(); if (OB_NOT_NULL(into_item)) { OZ (into_exprs_.assign(into_item->pl_vars_)); } is_select_for_update_ = (static_cast(stmt)).has_for_update(); has_hidden_rowid_ = (static_cast(stmt)).has_hidden_rowid(); } else if (stmt.is_insert_stmt() || stmt.is_update_stmt() || stmt.is_delete_stmt()) { ObDelUpdStmt &dml_stmt = static_cast(stmt); OZ (into_exprs_.assign(dml_stmt.get_returning_into_exprs())); } return ret; } int ObResultSet::ExternalRetrieveInfo::check_into_exprs(ObStmt &stmt, ObArray &basic_types, ObBitSet<> &basic_into) { int ret = OB_SUCCESS; CK (stmt.is_select_stmt() || stmt.is_insert_stmt() || stmt.is_update_stmt() || stmt.is_delete_stmt()); if (OB_SUCC(ret)) { if (stmt.is_select_stmt()) { ObSelectStmt &select_stmt = static_cast(stmt); if (basic_types.count() != select_stmt.get_select_item_size() && into_exprs_.count() != select_stmt.get_select_item_size()) { if (basic_types.count() > select_stmt.get_select_item_size() || into_exprs_.count() > select_stmt.get_select_item_size()) { ret = OB_ERR_TOO_MANY_VALUES; LOG_WARN("ORA-00913: too many values", K(ret), K(basic_types.count()), K(select_stmt.get_select_item_size())); } else if (basic_types.count() < select_stmt.get_select_item_size() || into_exprs_.count() < select_stmt.get_select_item_size()) { ret = OB_ERR_NOT_ENOUGH_VALUES; LOG_WARN("not enough values", K(ret), K(basic_types.count()), K(select_stmt.get_select_item_size())); } } } else { ObDelUpdStmt &dml_stmt = static_cast(stmt); const ObIArray &returning_exprs = dml_stmt.get_returning_exprs(); if (basic_types.count() > returning_exprs.count()) { ret = OB_ERR_TOO_MANY_VALUES; LOG_WARN("ORA-00913: too many values", K(ret), K(basic_types.count()), K(returning_exprs.count())); } else if (basic_types.count() < returning_exprs.count()) { ret = OB_ERR_NOT_ENOUGH_VALUES; LOG_WARN("not enough values", K(ret), K(basic_types.count()), K(returning_exprs.count())); } for (int64_t i = 0; OB_SUCC(ret) && i < returning_exprs.count(); ++i) { if (basic_into.has_member(i)) { // 往基本类型里into,不必检查类型匹配,存储之前做强转 } else if (OB_ISNULL(returning_exprs.at(i))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("returning expr is NULL", K(i), K(ret)); } else if (returning_exprs.at(i)->get_result_type().get_obj_meta() != basic_types.at(i).get_meta_type() || returning_exprs.at(i)->get_result_type().get_accuracy() != basic_types.at(i).get_accuracy()) { LOG_DEBUG("select item type and into expr type are not match", K(i), K(returning_exprs.at(i)->get_result_type()), K(basic_types.at(i)), K(ret)); } } } } return ret; } int ObResultSet::ExternalRetrieveInfo::recount_dynamic_param_info( common::ObIArray> ¶m_info) { int ret = OB_SUCCESS; int64_t current_position = INT64_MAX; for (int64_t i = 0; OB_SUCC(ret) && i < into_exprs_.count(); ++i) { ObRawExpr *into = into_exprs_.at(i); if (OB_NOT_NULL(into) && T_QUESTIONMARK == into->get_expr_type() && static_cast(into)->get_value().get_unknown() < current_position) { current_position = static_cast(into)->get_value().get_unknown(); } } // sort ObSEArray recount_params; for (int64_t i = 0; current_position != INT64_MAX && OB_SUCC(ret) && i < param_info.count(); ++i) { ObRawExpr *param = param_info.at(i).first; if (OB_NOT_NULL(param) && T_QUESTIONMARK == param->get_expr_type() && static_cast(param)->get_value().get_unknown() > current_position) { OZ (recount_params.push_back(static_cast(param))); } } if (OB_SUCC(ret) && recount_params.count() >= 2) { std::sort(recount_params.begin(), recount_params.end(), [](ObConstRawExpr *a, ObConstRawExpr *b) { return a->get_value().get_unknown() < b->get_value().get_unknown(); }); } for (int64_t i = 0; current_position != INT64_MAX && OB_SUCC(ret) && i < recount_params.count(); ++i) { recount_params.at(i)->get_value().set_unknown(current_position); current_position ++; } return ret; } int ObResultSet::ExternalRetrieveInfo::build( ObStmt &stmt, ObSQLSessionInfo &session_info, pl::ObPLBlockNS *ns, bool is_dynamic_sql, common::ObIArray> ¶m_info) { int ret = OB_SUCCESS; OZ (build_into_exprs(stmt, ns, is_dynamic_sql)); CK (OB_NOT_NULL(session_info.get_cur_exec_ctx())); CK (OB_NOT_NULL(session_info.get_cur_exec_ctx()->get_sql_ctx())); OX (is_bulk_ = session_info.get_cur_exec_ctx()->get_sql_ctx()->is_bulk_); OX (session_info.get_cur_exec_ctx()->get_sql_ctx()->is_bulk_ = false); if (OB_SUCC(ret)) { ObSchemaGetterGuard *schema_guard = NULL; if (OB_ISNULL(stmt.get_query_ctx()) || OB_ISNULL(schema_guard = stmt.get_query_ctx()->sql_schema_guard_.get_schema_guard())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("query_ctx is null", K(ret)); } else if (param_info.empty() && into_exprs_.empty()) { if (stmt.is_dml_stmt()) { OZ (ObSQLUtils::reconstruct_sql(allocator_, &stmt, stmt.get_query_ctx()->get_sql_stmt(), schema_guard, session_info.create_obj_print_params())); } else { // other stmt do not need reconstruct. } } else { if (is_dynamic_sql && !into_exprs_.empty()) { OZ (recount_dynamic_param_info(param_info)); } OZ (ObSQLUtils::reconstruct_sql(allocator_, &stmt, stmt.get_query_ctx()->get_sql_stmt(), schema_guard, session_info.create_obj_print_params())); /* * stmt里的?是按照表达式resolve的顺序编号的,这个顺序在prepare阶段需要依赖的 * 但是route_sql里的?需要按照入参在符号表里的下标进行编号,这个编号是proxy做路由的时候依赖的,所以这里要改掉stmt里QUESTIONMARK的值 */ external_params_.set_capacity(param_info.count()); for (int64_t i = 0; OB_SUCC(ret) && i < param_info.count(); ++i) { if (OB_ISNULL(param_info.at(i).first) || OB_ISNULL(param_info.at(i).second)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("param expr is NULL", K(i), K(param_info.at(i).first), K(param_info.at(i).second), K(ret)); } else if (OB_FAIL(external_params_.push_back(param_info.at(i).first))) { LOG_WARN("push back error", K(i), K(param_info.at(i).first), K(ret)); } else if (T_QUESTIONMARK == param_info.at(i).first->get_expr_type()) { ObConstRawExpr *const_expr = static_cast(param_info.at(i).first); if (OB_FAIL(param_info.at(i).second->get_value().apply(const_expr->get_value()))) { LOG_WARN("apply error", K(const_expr->get_value()), K(param_info.at(i).second->get_value()), K(ret)); } } else { //如果不是PL的local变量,需要把QUESTIONMARK的值改为无效值,以免使proxy混淆 param_info.at(i).second->get_value().set_unknown(OB_INVALID_INDEX); param_info.at(i).second->set_expr_obj_meta( param_info.at(i).second->get_value().get_meta()); if (stmt_sql_.empty()) { OZ (ob_write_string(allocator_, stmt.get_query_ctx()->get_sql_stmt(), stmt_sql_)); } } } OZ (ObSQLUtils::reconstruct_sql(allocator_, &stmt, route_sql_, schema_guard, session_info.create_obj_print_params())); } } LOG_INFO("reconstruct sql:", K(stmt.get_query_ctx()->get_prepare_param_count()), K(stmt.get_query_ctx()->get_sql_stmt()), K(route_sql_), K(ret)); return ret; } int ObResultSet::drive_dml_query() { // DML使用PX执行框架,在非returning情况下,需要主动 // 调用get_next_row才能驱动整个框架的执行 int ret = OB_SUCCESS; if (get_physical_plan()->is_returning() || (get_physical_plan()->is_local_or_remote_plan() && !get_physical_plan()->need_drive_dml_query_)) { //1. dml returning will drive dml write through result.get_next_row() //2. partial dml query will drive dml write through operator open //3. partial dml query need to drive dml write through result.drive_dml_query, //use the flag result.drive_dml_query to distinguish situation 2,3 } else if (get_physical_plan()->need_drive_dml_query_) { const ObNewRow *row = nullptr; if (OB_LIKELY(OB_ITER_END == (ret = inner_get_next_row(row)))) { ret = OB_SUCCESS; } else { LOG_WARN("do dml query failed", K(ret)); } } else if (get_physical_plan()->is_use_px() && !get_physical_plan()->is_use_pdml()) { // DML + PX情况下,需要调用get_next_row 来驱动PX框架 stmt::StmtType type = get_physical_plan()->get_stmt_type(); if (type == stmt::T_INSERT || type == stmt::T_DELETE || type == stmt::T_UPDATE || type == stmt::T_REPLACE || type == stmt::T_MERGE) { const ObNewRow *row = nullptr; // 非returning类型的PX+DML的驱动需要主动调用 get_next_row if (OB_LIKELY(OB_ITER_END == (ret = inner_get_next_row(row)))) { // 外层调用已经处理了对应的affected rows ret = OB_SUCCESS; } else { LOG_WARN("do dml query failed", K(ret)); } } } else if (get_physical_plan()->is_use_pdml()) { const ObNewRow *row = nullptr; // pdml+非returning情况下,需要调用get_next_row来驱动PX框架 if (OB_LIKELY(OB_ITER_END == (ret = inner_get_next_row(row)))) { // 外层调用已经处理了对应的affected rows ret = OB_SUCCESS; } else { LOG_WARN("do pdml query failed", K(ret)); } } return ret; } int ObResultSet::copy_field_columns(const ObPhysicalPlan &plan) { int ret = OB_SUCCESS; field_columns_.reset(); int64_t N = plan.get_field_columns().count(); ObField field; if (N > 0 && OB_FAIL(field_columns_.reserve(N))) { LOG_WARN("failed to reserve field column array", K(ret), K(N)); } for (int64_t i = 0; OB_SUCC(ret) && i < N; i++) { const ObField &ofield = plan.get_field_columns().at(i); if (OB_FAIL(field.deep_copy(ofield, &get_mem_pool()))) { LOG_WARN("deep copy field failed", K(ret)); } else if (OB_FAIL(field_columns_.push_back(field))) { LOG_WARN("push back field column failed", K(ret)); } else { LOG_DEBUG("succs to copy field", K(field)); } } return ret; } int ObResultSet::construct_field_name(const common::ObIArray &raw_params, const bool is_first_parse) { int ret = OB_SUCCESS; for (int64_t i = 0; OB_SUCC(ret) && i < field_columns_.count(); i++) { if (OB_FAIL(construct_display_field_name(field_columns_.at(i), raw_params, is_first_parse))) { LOG_WARN("failed to construct display name", K(ret), K(field_columns_.at(i))); } else { // do nothing } } return ret; } int ObResultSet::construct_display_field_name(common::ObField &field, const ObIArray &raw_params, const bool is_first_parse) { int ret = OB_SUCCESS; char *buf = nullptr; // mysql模式下,select '\t\t\t\t aaa'会去掉头部的转义字符以及空格,最后的长度不能大于MAX_COLUMN_CHAR_LENGTH // 所以这里多给一些buffer来处理这种情况 // 如果转义字符多了还是会有问题 int32_t buf_len = MAX_COLUMN_CHAR_LENGTH * 2; int32_t pos = 0; int32_t name_pos = 0; if (!field.is_paramed_select_item_ || NULL == field.paramed_ctx_) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(field.is_paramed_select_item_), K(field.paramed_ctx_)); } else if (0 == field.paramed_ctx_->paramed_cname_.length()) { // 1. 参数化的cname长度为0,说明指定了列名 // 2. 指定了别名,别名存在cname_里,直接使用即可 // do nothing } else if (OB_ISNULL(buf = static_cast(get_mem_pool().alloc(buf_len)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("failed to allocate memory", K(ret), K(buf_len)); } else { LOG_DEBUG("construct display field name", K(field)); #define PARAM_CTX field.paramed_ctx_ for (int64_t i = 0; OB_SUCC(ret) && pos <= buf_len && i < PARAM_CTX->param_idxs_.count(); i++) { int64_t idx = PARAM_CTX->param_idxs_.at(i); if (idx >= raw_params.count()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid index", K(i), K(raw_params.count())); } else if (OB_ISNULL(raw_params.at(idx)) || OB_ISNULL(raw_params.at(idx)->node_)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(raw_params.at(idx)), K(raw_params.at(idx)->node_)); } else { int32_t len = (int32_t)PARAM_CTX->param_str_offsets_.at(i) - name_pos; len = std::min(buf_len - pos, len); if (len > 0) { MEMCPY(buf + pos, PARAM_CTX->paramed_cname_.ptr() + name_pos, len); } name_pos = (int32_t)PARAM_CTX->param_str_offsets_.at(i) + 1; // skip '?' pos += len; if (is_first_parse && PARAM_CTX->neg_param_idxs_.has_member(idx) && pos < buf_len) { buf[pos++] = '-'; // insert `-` for negative value } int64_t copy_str_len = 0; const char *copy_str = NULL; // 1. // 如果有词法分析得到常量节点有is_copy_raw_text_标记,则是有前缀的常量,比如 // select date'2012-12-12' from dual, column显示为date'2012-12-12', // 词法分析得到的常量节点的raw_text为date'2012-12-12', // 所以直接拷贝raw_text,在oracle模式下需要还需要去空格以及转换大小写 // 2. // 如果esc_str_flag_被标记,那么投影列是是常量字符串,其内部字符串需要转义, // 常量节点的str_value是转义好的字符串,直接使用即可, // 但是在mysql模式下,字符串开头的某些转义字符不会显示,ObResultSet::make_final_field_name会处理 // oracle模式下,需要在加上单引号 // 比如MySQL模式:select '\'hello' from dual,列名显示'hello // select '\thello' from dual,列名显示hello (去掉了左边的转义字符) // Oracle模式:select 'hello' from dual, 列名显示 'hello', (带引号) // select '''hello' from dual, 列名显示 ''hello' // 3. // mysql模式下, 有以下对于以下sql: // select 'a' 'abc' from dual // 行为是a作为column name,’abc‘作为值,但是参数化后的sql为select ? from dual // 在词法节点两个字符串被concat,整体作为一个varchar节点,但是该节点有一个T_CONCAT_STRING子节点,节点的str_val保存的是column name // 所以这种情况下,直接去T_CONCAT_STRING的str_val作为列名 if (1 == raw_params.at(idx)->node_->is_copy_raw_text_) { copy_str_len = raw_params.at(idx)->node_->text_len_; copy_str = raw_params.at(idx)->node_->raw_text_; } else if (PARAM_CTX->esc_str_flag_) { if (lib::is_mysql_mode() && 1 == raw_params.at(idx)->node_->num_child_) { LOG_DEBUG("concat str node"); if (OB_ISNULL(raw_params.at(idx)->node_->children_) || OB_ISNULL(raw_params.at(idx)->node_->children_[0]) || T_CONCAT_STRING != raw_params.at(idx)->node_->children_[0]->type_) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret)); } else { copy_str_len = raw_params.at(idx)->node_->children_[0]->str_len_; copy_str = raw_params.at(idx)->node_->children_[0]->str_value_; } } else { copy_str_len = raw_params.at(idx)->node_->str_len_; copy_str = raw_params.at(idx)->node_->str_value_; if (lib::is_oracle_mode() && pos < buf_len) { buf[pos++] = '\''; } } } else { copy_str_len = raw_params.at(idx)->node_->text_len_; copy_str = raw_params.at(idx)->node_->raw_text_; } if (OB_SUCC(ret)) { len = std::min(buf_len - pos, (int32_t)copy_str_len); if (len > 0) { MEMCPY(buf + pos, copy_str, len); } if (len > 0 && 1 == raw_params.at(idx)->node_->is_copy_raw_text_ && lib::is_oracle_mode()) { // 如果直接拷贝了raw_text_,可能需要转换大小写和去空格 ObCollationType col_type = static_cast(field.charsetnr_); len = (int32_t)remove_extra_space(buf + pos, len); ObString str(len, (const char *)buf+pos); ObString dest; if (OB_FAIL(ObCharset::caseup(col_type, str, dest, get_mem_pool()))) { LOG_WARN("failed to do charset caseup", K(ret)); } else { len = dest.length(); MEMCPY(buf+pos, dest.ptr(), len); } get_mem_pool().free(dest.ptr()); dest.reset(); str.reset(); } pos += len; if (PARAM_CTX->esc_str_flag_ && lib::is_oracle_mode() && pos < buf_len) { buf[pos++] = '\''; } } } } // for end if (OB_FAIL(ret)) { // do nothing } else if (name_pos < PARAM_CTX->paramed_cname_.length()) { // 如果最后一个常量后面还有字符串 int32_t len = std::min((int32_t)PARAM_CTX->paramed_cname_.length() - name_pos, buf_len - pos); if (len > 0) { MEMCPY(buf + pos, PARAM_CTX->paramed_cname_.ptr() + name_pos, len); } pos += len; } if (OB_FAIL(ret) || 0 == PARAM_CTX->param_idxs_.count()) { // 如果没有任何参数化信息,不做任何处理 // do nothing } else if (lib::is_oracle_mode()) { // 如果oracle模式下cp_str_value_flag_为true,说明有alias name,直接使用即可(空格也不用去) // 否则,oracle模式下需要转换大写,去掉多余的空格 ObCollationType col_type = static_cast(field.charsetnr_); pos = (int32_t)remove_extra_space(buf, pos); ObString str(pos, (const char *)buf); ObString dest; if (OB_FAIL(ObCharset::caseup(col_type, str, dest, get_mem_pool()))) { LOG_WARN("failed to do charset caseup", K(ret)); } else { pos = dest.length(); MEMCPY(buf, dest.ptr(), pos); } get_mem_pool().free(dest.ptr()); dest.reset(); str.reset(); } if (OB_FAIL(ret)) { // do nothing } else if (lib::is_mysql_mode() && OB_FAIL(make_final_field_name(buf, pos, field.cname_))) { LOG_WARN("failed to make final field name", K(ret)); } else if (lib::is_oracle_mode()) { pos = pos < MAX_COLUMN_CHAR_LENGTH ? pos : MAX_COLUMN_CHAR_LENGTH; //field.cname_.assign(buf, pos); if (OB_FAIL(ob_write_string(get_mem_pool(), ObString(pos, buf), field.cname_))) { LOG_WARN("failed to copy paramed cname", K(ret)); } } else { // do nothing } #undef PARAM_CTX } return ret; } int64_t ObResultSet::remove_extra_space(char *buff, int64_t len) { int64_t length = 0; if (OB_LIKELY(NULL != buff)) { for (int64_t i = 0; i < len; i++) { if (!std::isspace(buff[i])) { buff[length++] = buff[i]; } } } return length; } void ObResultSet::replace_lob_type(const ObSQLSessionInfo &session, const ObField &field, obmysql::ObMySQLField &mfield) { bool is_use_lob_locator = session.is_client_use_lob_locator(); if (lib::is_oracle_mode()) { // 如果客户端不用lob locator, 而server返回了lob locator数据类型 // 则在field中改为老的方式的type:MYSQL_TYPE_LONG_BLOB; // 如果客户端使用lob locator, 则根据collation, 判断是clob还是blob; // 然后返回blob/clob的type if (is_lob_locator(field.type_.get_type()) || is_lob(field.type_.get_type())) { if (!is_use_lob_locator && is_lob_locator(field.type_.get_type())) { mfield.type_ = obmysql::EMySQLFieldType::MYSQL_TYPE_LONG_BLOB; } else if (is_use_lob_locator) { if (CS_TYPE_BINARY == field.charsetnr_) { mfield.type_ = obmysql::EMySQLFieldType::MYSQL_TYPE_ORA_BLOB; } else { mfield.type_ = obmysql::EMySQLFieldType::MYSQL_TYPE_ORA_CLOB; } } } LOG_TRACE("init field", K(is_use_lob_locator), K(field), K(mfield.type_)); } } int ObResultSet::make_final_field_name(char *buf, int64_t len, common::ObString &field_name) { int ret = OB_SUCCESS; ObCollationType cs_type = common::CS_TYPE_INVALID; if (OB_ISNULL(buf) || 0 == len) { field_name.assign(buf, static_cast(len)); } else if (OB_FAIL(my_session_.get_collation_connection(cs_type))) { LOG_WARN("failed to get collation_connection", K(ret)); } else { while (len && !ObCharset::is_graph(cs_type, *buf)) { buf++; len--; } len = len < MAX_COLUMN_CHAR_LENGTH ? len : MAX_COLUMN_CHAR_LENGTH; field_name.assign(buf, static_cast(len)); } return ret; } uint64_t ObResultSet::get_field_cnt() const { int64_t cnt = 0; uint64_t ret = 0; if (OB_ISNULL(get_field_columns())) { LOG_ERROR("unexpected error. field columns is null"); right_to_die_or_duty_to_live(); } cnt = get_field_columns()->count(); if (cnt >= 0) { ret = static_cast(cnt); } return ret; } bool ObResultSet::has_implicit_cursor() const { bool bret = false; if (get_exec_context().get_physical_plan_ctx() != nullptr) { bret = !get_exec_context().get_physical_plan_ctx()->get_implicit_cursor_infos().empty(); } return bret; } int ObResultSet::switch_implicit_cursor() { int ret = OB_SUCCESS; ObPhysicalPlanCtx *plan_ctx = get_exec_context().get_physical_plan_ctx(); if (OB_ISNULL(plan_ctx)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("plan_ctx is null", K(ret)); } else if (OB_FAIL(plan_ctx->switch_implicit_cursor())) { if (OB_ITER_END != ret) { LOG_WARN("cursor_idx is invalid", K(ret)); } } else { set_affected_rows(plan_ctx->get_affected_rows()); memset(message_, 0, sizeof(message_)); if (OB_FAIL(set_mysql_info())) { LOG_WARN("set mysql info failed", K(ret)); } } return ret; } bool ObResultSet::is_cursor_end() const { bool bret = false; ObPhysicalPlanCtx *plan_ctx = get_exec_context().get_physical_plan_ctx(); if (plan_ctx != nullptr) { bret = (plan_ctx->get_cur_stmt_id() >= plan_ctx->get_implicit_cursor_infos().count()); } return bret; } ObRemoteResultSet::ObRemoteResultSet(common::ObIAllocator &allocator) : mem_pool_(allocator), remote_resp_handler_(NULL), field_columns_(), scanner_(NULL), scanner_iter_(), all_data_empty_(false), cur_data_empty_(true), first_response_received_(false), found_rows_(0), stmt_type_(stmt::T_NONE) {} ObRemoteResultSet::~ObRemoteResultSet() { if (NULL != remote_resp_handler_) { remote_resp_handler_->~ObInnerSqlRpcStreamHandle(); remote_resp_handler_ = NULL; } mem_pool_.reset(); } int ObRemoteResultSet::reset_and_init_remote_resp_handler() { int ret = OB_SUCCESS; if (NULL != remote_resp_handler_) { remote_resp_handler_->~ObInnerSqlRpcStreamHandle(); remote_resp_handler_ = NULL; } ObInnerSqlRpcStreamHandle *buffer = NULL; if (OB_ISNULL(buffer = static_cast( mem_pool_.alloc(sizeof(ObInnerSqlRpcStreamHandle))))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc memory for ObInnerSqlRpcStreamHandle", K(ret)); } else { remote_resp_handler_ = new (buffer) ObInnerSqlRpcStreamHandle(); } return ret; } int ObRemoteResultSet::copy_field_columns( const common::ObSArray &src_field_columns) { int ret = OB_SUCCESS; field_columns_.reset(); int64_t N = src_field_columns.count(); ObField field; if (N > 0 && OB_FAIL(field_columns_.reserve(N))) { LOG_WARN("failed to reserve field column array", K(ret), K(N)); } for (int64_t i = 0; OB_SUCC(ret) && i < N; i++) { const ObField &ofield = src_field_columns.at(i); if (OB_FAIL(field.deep_copy(ofield, &get_mem_pool()))) { LOG_WARN("deep copy field failed", K(ret)); } else if (OB_FAIL(field_columns_.push_back(field))) { LOG_WARN("push back field column failed", K(ret)); } else { LOG_DEBUG("succs to copy field", K(field)); } } return ret; } int ObRemoteResultSet::setup_next_scanner() { int ret = OB_SUCCESS; if (OB_ISNULL(remote_resp_handler_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("resp_handler is NULL", K(ret)); } else { ObInnerSQLTransmitResult *transmit_result= NULL; if (!first_response_received_) { /* has not gotten the first scanner responsed */ if (OB_ISNULL(transmit_result = remote_resp_handler_->get_result())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("transmit_result is NULL", K(ret)); } else if (OB_FAIL(transmit_result->get_err_code())) { LOG_WARN("while fetching first scanner, the remote rcode is not OB_SUCCESS", K(ret)); } else { scanner_ = &transmit_result->get_scanner(); scanner_iter_ = scanner_->begin(); first_response_received_ = true; /* has gotten the first scanner responsed already */ found_rows_ += scanner_->get_found_rows(); stmt_type_ = transmit_result->get_stmt_type(); const common::ObSArray &src_field_columns = transmit_result->get_field_columns(); if (0 >= src_field_columns.count()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("the count of field_columns is unexpected", K(ret), K(src_field_columns.count())); } else if (OB_FAIL(copy_field_columns(src_field_columns))) { LOG_WARN("copy_field_columns failed", K(ret), K(src_field_columns.count())); } else { const int64_t column_count = field_columns_.count(); if (column_count <= 0) { ret = OB_INVALID_ARGUMENT; LOG_WARN("column_count is invalid", K(column_count)); } } } } else { /* successor request will use handle to send get_more to the resource observer */ ObInnerSqlRpcStreamHandle::InnerSQLSSHandle &handle = remote_resp_handler_->get_handle(); if (handle.has_more()) { if (OB_FAIL(remote_resp_handler_->reset_and_init_scanner())) { LOG_WARN("fail reset and init result", K(ret)); } else if (OB_ISNULL(transmit_result = remote_resp_handler_->get_result())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("succ to alloc result, but result scanner is NULL", K(ret)); } else if (OB_FAIL(handle.get_more(*transmit_result))) { LOG_WARN("fail wait response", K(ret)); } else { scanner_ = &transmit_result->get_scanner(); scanner_iter_ = scanner_->begin(); found_rows_ += scanner_->get_found_rows(); } } else { ret = OB_ITER_END; LOG_DEBUG("no more scanners in the handle", K(ret)); } } } return ret; } int ObRemoteResultSet::get_next_row_from_cur_scanner(const common::ObNewRow *&row) { int ret = OB_SUCCESS; ObNewRow *new_row = NULL; if (OB_FAIL(scanner_iter_.get_next_row(new_row, NULL))) { if (OB_UNLIKELY(OB_ITER_END != ret)) { LOG_WARN("fail get next row", K(ret)); } } else { row = new_row; } return ret; } int ObRemoteResultSet::get_next_row(const common::ObNewRow *&row) { int ret = OB_SUCCESS; bool has_got_a_row = false; while (OB_SUCC(ret) && false == has_got_a_row) { if (all_data_empty_) { // has no more data ret = OB_ITER_END; } else if (cur_data_empty_) { // current scanner has no more data if (OB_FAIL(setup_next_scanner())) { if (OB_UNLIKELY(OB_ITER_END != ret)) { LOG_WARN("fail to setup next scanner", K(ret)); } else { all_data_empty_ = true; } } else { cur_data_empty_ = false; } } else { // current scanner has new rows if (OB_FAIL(get_next_row_from_cur_scanner(row))) { if (OB_UNLIKELY(OB_ITER_END != ret)) { LOG_WARN("fail to get next row from cur scanner", K(ret)); } else { cur_data_empty_ = true; ret = OB_SUCCESS; } } else { has_got_a_row = true; // get one row and break } } } return ret; } int ObRemoteResultSet::close() { int ret = OB_SUCCESS; if (OB_ISNULL(remote_resp_handler_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("resp_handler is NULL", K(ret)); } else { ObInnerSqlRpcStreamHandle::InnerSQLSSHandle &handle = remote_resp_handler_->get_handle(); if (handle.has_more()) { if (OB_FAIL(handle.abort())) { LOG_WARN("fail to abort", K(ret)); } } } return ret; }