/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #define USING_LOG_PREFIX SQL_ENG #include "sql/engine/dml/ob_table_modify_op.h" #include "sql/engine/dml/ob_dml_service.h" #include "sql/engine/dml/ob_table_insert_op.h" #include "sql/engine/dml/ob_table_update_op.h" #include "sql/engine/basic/ob_expr_values_op.h" #include "sql/engine/expr/ob_expr_column_conv.h" #include "sql/engine/expr/ob_expr_calc_partition_id.h" #include "sql/executor/ob_task_spliter.h" #include "sql/das/ob_das_utils.h" #include "storage/ob_i_store.h" #include "lib/mysqlclient/ob_isql_client.h" #include "observer/ob_inner_sql_connection_pool.h" #include "lib/worker.h" namespace oceanbase { using namespace common; using namespace common::sqlclient; using namespace storage; using namespace share; using namespace share::schema; using namespace observer; namespace sql { int ForeignKeyHandle::do_handle(ObTableModifyOp &op, const ObDMLBaseCtDef &dml_ctdef, ObDMLBaseRtDef &dml_rtdef) { int ret = OB_SUCCESS; if (op.need_foreign_key_checks()) { const ObExprPtrIArray &old_row = dml_ctdef.old_row_; const ObExprPtrIArray &new_row = dml_ctdef.new_row_; op.get_exec_ctx().get_das_ctx().is_fk_cascading_ = true; LOG_DEBUG("do foreign_key_handle", K(old_row), K(new_row)); if (OB_FAIL(op.check_stack())) { LOG_WARN("fail to check stack", K(ret)); } for (int i = 0; OB_SUCC(ret) && i < dml_ctdef.fk_args_.count(); i++) { const ObForeignKeyArg &fk_arg = dml_ctdef.fk_args_.at(i); if (OB_SUCC(ret) && !new_row.empty()) { if (ACTION_CHECK_EXIST == fk_arg.ref_action_) { // insert or update. bool is_foreign_key_cascade = false; if (OB_FAIL(op.get_foreign_key_cascade(is_foreign_key_cascade))) { LOG_WARN("failed to get foreign key cascade", K(ret), K(fk_arg), K(new_row)); } else if (is_foreign_key_cascade) { // nested update can not check parent row. LOG_DEBUG("skip foreign_key_check_exist in nested session"); } else if (OB_FAIL(check_exist(op, fk_arg, new_row, false))) { LOG_WARN("failed to check exist", K(ret), K(fk_arg), K(new_row)); } } } if (OB_SUCC(ret) && !old_row.empty()) { if (ACTION_RESTRICT == fk_arg.ref_action_ || ACTION_NO_ACTION == fk_arg.ref_action_) { // update or delete. bool has_changed = false; if (OB_FAIL(value_changed(op, fk_arg.columns_, old_row, new_row, has_changed))) { LOG_WARN("failed to check if foreign key value has changed", K(ret), K(fk_arg), K(old_row), K(new_row)); } else if (!has_changed) { // nothing. } else if (OB_FAIL(check_exist(op, fk_arg, old_row, true))) { LOG_WARN("failed to check exist", K(ret), K(fk_arg), K(old_row)); } } else if (ACTION_CASCADE == fk_arg.ref_action_) { // update or delete. bool is_self_ref = false; if (OB_FAIL(is_self_ref_row(op.get_eval_ctx(), old_row, fk_arg, is_self_ref))) { LOG_WARN("is_self_ref_row failed", K(ret), K(old_row), K(fk_arg)); } else if (new_row.empty() && is_self_ref && op.is_fk_nested_session()) { // delete self refercnced row should not cascade delete. } else if (OB_FAIL(cascade(op, fk_arg, old_row, new_row))) { LOG_WARN("failed to cascade", K(ret), K(fk_arg), K(old_row), K(new_row)); } else if (!new_row.empty() && is_self_ref) { // we got here only when: // 1. handling update operator and // 2. foreign key constraint is self reference // need to change %new_row and %old_row // (see https://aone.alibaba-inc.com/issue/17476162) // xxx_row_res_info helps to restore row. restore row before get_next_row() //op.fk_self_ref_row_res_infos_.reset(); for (int64_t i = 0; OB_SUCC(ret) && i < fk_arg.columns_.count(); i++) { const int32_t val_idx = fk_arg.columns_.at(i).idx_; const int32_t name_idx = fk_arg.columns_.at(i).name_idx_; if (OB_SUCC(ret)) { const ObDatum &updated_value = new_row.at(val_idx)-> locate_expr_datum(op.get_eval_ctx()); ObDatum &new_row_name_col = new_row.at(name_idx)-> locate_expr_datum(op.get_eval_ctx()); ObDatum &old_row_name_col = old_row.at(name_idx)-> locate_expr_datum(op.get_eval_ctx()); OX(new_row_name_col = updated_value); OX(old_row_name_col = updated_value); } } } } } // if (old_row.is_valid()) } // for //fk cascading end op.get_exec_ctx().get_das_ctx().is_fk_cascading_ = false; } else { LOG_DEBUG("skip foreign_key_handle"); } return ret; } int ForeignKeyHandle::value_changed(ObTableModifyOp &op, const ObIArray &columns, const ObExprPtrIArray &old_row, const ObExprPtrIArray &new_row, bool &has_changed) { int ret = OB_SUCCESS; has_changed = false; if (!old_row.empty() && !new_row.empty()) { ObDatum *old_row_col = NULL; ObDatum *new_row_col = NULL; for (int64_t i = 0; OB_SUCC(ret) && !has_changed && i < columns.count(); ++i) { int64_t col_idx = columns.at(i).idx_; CK(col_idx < old_row.count()); CK(col_idx < new_row.count()); CK(OB_NOT_NULL(old_row.at(col_idx))); CK(OB_NOT_NULL(new_row.at(col_idx))); OZ(old_row.at(col_idx)->eval(op.get_eval_ctx(), old_row_col)); OZ(new_row.at(col_idx)->eval(op.get_eval_ctx(), new_row_col)); OX(has_changed = (false == ObDatum::binary_equal(*old_row_col, *new_row_col))); } } else { has_changed = true; } return ret; } int ForeignKeyHandle::check_exist(ObTableModifyOp &op, const ObForeignKeyArg &fk_arg, const ObExprPtrIArray &row, bool expect_zero) { int ret = OB_SUCCESS; static const char *SELECT_FMT_MYSQL = "select /*+ no_parallel */ 1 from `%.*s`.`%.*s` where %.*s limit 2 for update"; static const char *SELECT_FMT_ORACLE = "select /*+ no_parallel */ 1 from \"%.*s\".\"%.*s\" where %.*s and rownum <= 2 for update"; const char *select_fmt = lib::is_mysql_mode() ? SELECT_FMT_MYSQL : SELECT_FMT_ORACLE; ObArenaAllocator alloc(ObModIds::OB_MODULE_PAGE_ALLOCATOR, OB_MALLOC_NORMAL_BLOCK_SIZE, op.get_eval_ctx().exec_ctx_.get_my_session()->get_effective_tenant_id()); char *stmt_buf = nullptr; int64_t stmt_len = 0; char *where_buf = nullptr; int64_t where_len = 0; int64_t stmt_pos = 0; int64_t where_pos = 0; const ObString &database_name = fk_arg.database_name_; const ObString &table_name = fk_arg.table_name_; if (row.empty()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("row is invalid", K(ret)); } else if (OB_FAIL(gen_where(op.get_eval_ctx(), where_buf, where_len, where_pos, fk_arg.columns_, row, alloc, op.get_obj_print_params()))) { if (OB_LIKELY(OB_ERR_NULL_VALUE == ret)) { // skip check exist if any column in foreign key is NULL. ret = OB_SUCCESS; stmt_pos = 0; } else { LOG_WARN("failed to gen foreign key where", K(ret), K(row), K(fk_arg.columns_)); } } else if (OB_FAIL(databuff_printf(stmt_buf, stmt_len, stmt_pos, alloc, select_fmt, database_name.length(), database_name.ptr(), table_name.length(), table_name.ptr(), static_cast(where_pos), where_buf))) { LOG_WARN("failed to print stmt", K(ret), K(table_name), K(where_buf)); } else { stmt_buf[stmt_pos++] = 0; } if (OB_SUCC(ret) && stmt_pos > 0) { LOG_DEBUG("foreign_key_check_exist", "stmt", stmt_buf, K(row), K(fk_arg)); SMART_VAR(ObMySQLProxy::MySQLResult, res) { if (OB_FAIL(op.begin_nested_session(fk_arg.is_self_ref_))) { LOG_WARN("failed to begin nested session", K(ret), K(stmt_buf)); } else if (OB_FAIL(op.set_foreign_key_check_exist(true))) { LOG_WARN("failed to set foreign key cascade", K(ret)); } else { // must call end_nested_session() if begin_nested_session() success. bool is_zero = false; if (OB_FAIL(op.execute_read(stmt_buf, res))) { LOG_WARN("failed to execute stmt", K(ret), K(stmt_buf)); } else { // must call res.get_result()->close() if execute_read() success. if (OB_ISNULL(res.get_result())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("result is NULL", K(ret)); } else if (OB_FAIL(res.get_result()->next())) { if (OB_ITER_END == ret) { is_zero = true; ret = OB_SUCCESS; } else { LOG_WARN("failed to get next", K(ret)); } } if (OB_SUCC(ret)) { if (is_zero != expect_zero) { // 如果是自引用外键 新行引用了新行 bool is_self_ref = false; // oracle for update does not support aggregate functions, so when is_zero is false // judge whether only one row is affected by whether the second record can be obtained bool is_affect_only_one = false; if (!is_zero && OB_FAIL(res.get_result()->next())) { if (OB_ITER_END == ret) { is_affect_only_one = true; ret = OB_SUCCESS; } else { LOG_WARN("failed to get next", K(ret)); } } /** * expect_zero is false when fk_arg.ref_action_ is equal to * ACTION_CHECK_EXIST, if the is_zero != expect_zero condition is * true, then is_zero is true, need to exclude the case of self reference. * other cases return OB_ERR_NO_REFERENCED_ROW. * * expect_zero is true when fk_arg.ref_action_ is equal to * ACTION_RESTRICT or ACTION_NO_ACTION, if is_zero != expect_zero condition * is true, then is_zero is false, need to exclude the case of self reference and * only affect one row. other cases return OB_ERR_ROW_IS_REFERENCED. */ if (OB_FAIL(is_self_ref_row(op.get_eval_ctx(), row, fk_arg, is_self_ref))) { LOG_WARN("is_self_ref_row failed", K(ret), K(row), K(fk_arg)); } else if (is_zero && !is_self_ref) { ret = OB_ERR_NO_REFERENCED_ROW; LOG_WARN("parent row is not exist", K(ret), K(fk_arg), K(row)); } else if (!is_zero && (!is_self_ref || !is_affect_only_one)) { ret = OB_ERR_ROW_IS_REFERENCED; LOG_WARN("child row is exist", K(ret), K(fk_arg), K(row)); } } } int close_ret = res.get_result()->close(); if (OB_SUCCESS != close_ret) { LOG_WARN("failed to close", K(close_ret)); if (OB_SUCCESS == ret) { ret = close_ret; } } } int reset_ret = op.set_foreign_key_check_exist(false); if (OB_SUCCESS != reset_ret) { LOG_WARN("failed to reset foreign key cascade", K(reset_ret)); if (OB_SUCCESS == ret) { ret = reset_ret; } } int end_ret = op.end_nested_session(); if (OB_SUCCESS != end_ret) { LOG_WARN("failed to end nested session", K(end_ret)); if (OB_SUCCESS == ret) { ret = end_ret; } } } } } return ret; } int ForeignKeyHandle::cascade(ObTableModifyOp &op, const ObForeignKeyArg &fk_arg, const ObExprPtrIArray &old_row, const ObExprPtrIArray &new_row) { static const char *UPDATE_FMT_MYSQL = "update `%.*s`.`%.*s` set %.*s where %.*s"; static const char *UPDATE_FMT_ORACLE = "update \"%.*s\".\"%.*s\" set %.*s where %.*s"; static const char *DELETE_FMT_MYSQL = "delete from `%.*s`.`%.*s` where %.*s"; static const char *DELETE_FMT_ORACLE = "delete from \"%.*s\".\"%.*s\" where %.*s"; int ret = OB_SUCCESS; ObArenaAllocator alloc(ObModIds::OB_MODULE_PAGE_ALLOCATOR, OB_MALLOC_NORMAL_BLOCK_SIZE, op.get_eval_ctx().exec_ctx_.get_my_session()->get_effective_tenant_id()); char *stmt_buf = nullptr; int64_t stmt_len = 0; char *where_buf = nullptr; int64_t where_len = 0; int64_t stmt_pos = 0; int64_t where_pos = 0; const ObString &database_name = fk_arg.database_name_; const ObString &table_name = fk_arg.table_name_; if (old_row.empty()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("old row is invalid", K(ret)); } else if (OB_FAIL(gen_where(op.get_eval_ctx(), where_buf, where_len, where_pos, fk_arg.columns_, old_row, alloc, op.get_obj_print_params()))) { if (OB_LIKELY(OB_ERR_NULL_VALUE == ret)) { // skip cascade if any column in foreign key is NULL. ret = OB_SUCCESS; stmt_pos = 0; } else { LOG_WARN("failed to gen foreign key where", K(ret), K(old_row), K(fk_arg.columns_)); } } else { if (!new_row.empty()) { // update. const char *update_fmt = lib::is_mysql_mode() ? UPDATE_FMT_MYSQL : UPDATE_FMT_ORACLE; char *set_buf = nullptr; int64_t set_len = 0; int64_t set_pos = 0; if (OB_FAIL(gen_set(op.get_eval_ctx(), set_buf, set_len, set_pos, fk_arg.columns_, new_row, alloc, op.get_obj_print_params()))) { LOG_WARN("failed to gen foreign key set", K(ret), K(new_row), K(fk_arg.columns_)); } else if (OB_FAIL(databuff_printf(stmt_buf, stmt_len, stmt_pos, alloc, update_fmt, database_name.length(), database_name.ptr(), table_name.length(), table_name.ptr(), static_cast(set_pos), set_buf, static_cast(where_pos), where_buf))) { LOG_WARN("failed to print stmt", K(ret), K(table_name), K(set_buf), K(where_buf)); } else { stmt_buf[stmt_pos++] = 0; } } else { // delete. const char *delete_fmt = lib::is_mysql_mode() ? DELETE_FMT_MYSQL : DELETE_FMT_ORACLE; if (OB_FAIL(databuff_printf(stmt_buf, stmt_len, stmt_pos, alloc, delete_fmt, database_name.length(), database_name.ptr(), table_name.length(), table_name.ptr(), static_cast(where_pos), where_buf))) { LOG_WARN("failed to print stmt", K(ret), K(table_name), K(where_buf)); } else { stmt_buf[stmt_pos++] = 0; } } } if (OB_SUCC(ret) && stmt_pos > 0) { LOG_DEBUG("foreign_key_cascade", "stmt", stmt_buf, K(old_row), K(new_row), K(fk_arg)); if (OB_FAIL(op.begin_nested_session(fk_arg.is_self_ref_))) { LOG_WARN("failed to begin nested session", K(ret)); } else { // must call end_nested_session() if begin_nested_session() success. // https://work.aone.alibaba-inc.com/issue/29621871 // skip modify_ctx.set_foreign_key_cascade when cascade update and self ref. if (!(fk_arg.is_self_ref_ && !new_row.empty()) && OB_FAIL(op.set_foreign_key_cascade(true))) { LOG_WARN("failed to set foreign key cascade", K(ret)); } else if (OB_FAIL(op.execute_write(stmt_buf))) { LOG_WARN("failed to execute stmt", K(ret), K(stmt_buf)); } int reset_ret = op.set_foreign_key_cascade(false); if (OB_SUCCESS != reset_ret) { LOG_WARN("failed to reset foreign key cascade", K(reset_ret)); if (OB_SUCCESS == ret) { ret = reset_ret; } } int end_ret = op.end_nested_session(); if (OB_SUCCESS != end_ret) { LOG_WARN("failed to end nested session", K(end_ret)); if (OB_SUCCESS == ret) { ret = end_ret; } } } } return ret; } int ForeignKeyHandle::gen_set(ObEvalCtx &eval_ctx, char *&buf, int64_t &len, int64_t &pos, const ObIArray &columns, const ObExprPtrIArray &row, ObIAllocator &alloc, const ObObjPrintParams &print_params) { return gen_column_value(eval_ctx, buf, len, pos, columns, row, ", ", alloc, print_params, false); } int ForeignKeyHandle::gen_where(ObEvalCtx &eval_ctx, char *&buf, int64_t &len, int64_t &pos, const ObIArray &columns, const ObExprPtrIArray &row, ObIAllocator &alloc, const ObObjPrintParams &print_params) { return gen_column_value(eval_ctx, buf, len, pos, columns, row, " and ", alloc, print_params, true); } int ForeignKeyHandle::gen_column_value(ObEvalCtx &eval_ctx, char *&buf, int64_t &len, int64_t &pos, const ObIArray &columns, const ObExprPtrIArray &row, const char *delimiter, ObIAllocator &alloc, const ObObjPrintParams &print_params, bool forbid_null) { static const char *COLUMN_FMT_MYSQL = "`%.*s` = "; static const char *COLUMN_FMT_ORACLE = "\"%.*s\" = "; const char *column_fmt = lib::is_mysql_mode() ? COLUMN_FMT_MYSQL : COLUMN_FMT_ORACLE; int ret = OB_SUCCESS; ObDatum *col_datum = NULL; if (OB_ISNULL(delimiter) || OB_ISNULL(print_params.tz_info_)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("argument is invalid", K(ret), KP(buf), KP(delimiter), KP(print_params.tz_info_)); } else if (columns.count() <= 0) { ret= OB_ERR_UNEXPECTED; LOG_ERROR("columns count of fk is zero or less than zero", K(ret), K(columns.count()), K(columns)); } for (int64_t i = 0; OB_SUCC(ret) && i < columns.count(); i++) { OZ(row.at(columns.at(i).idx_)->eval(eval_ctx, col_datum)); if (OB_SUCC(ret)) { const ObString &col_name = columns.at(i).name_; const ObObjMeta &col_obj_meta = row.at(columns.at(i).idx_)->obj_meta_; ObObj col_obj; if (forbid_null && col_datum->is_null()) { ret = OB_ERR_NULL_VALUE; // NO LOG. } else if (OB_FAIL(databuff_printf(buf, len, pos, alloc, column_fmt, col_name.length(), col_name.ptr()))) { LOG_WARN("failed to print column name", K(ret), K(col_name)); } else if (OB_FAIL(col_datum->to_obj(col_obj, col_obj_meta))) { LOG_WARN("to_obj failed", K(ret), K(*col_datum), K(col_obj_meta)); } else if (OB_FAIL(col_obj.print_sql_literal(buf, len, pos, alloc, print_params))) { LOG_WARN("failed to print column value", K(ret), K(*col_datum), K(col_obj)); } else if (OB_FAIL(databuff_printf(buf, len, pos, alloc, "%s", delimiter))) { LOG_WARN("failed to print delimiter", K(ret), K(delimiter)); } } } if (OB_SUCC(ret)) { pos -= STRLEN(delimiter); buf[pos++] = 0; } return ret; } int ForeignKeyHandle::is_self_ref_row(ObEvalCtx &eval_ctx, const ObExprPtrIArray &row, const ObForeignKeyArg &fk_arg, bool &is_self_ref) { int ret = OB_SUCCESS; is_self_ref = fk_arg.is_self_ref_; ObDatum *name_col = NULL; ObDatum *val_col = NULL; for (int64_t i = 0; is_self_ref && i < fk_arg.columns_.count(); i++) { const int32_t name_idx = fk_arg.columns_.at(i).name_idx_; const int32_t val_idx = fk_arg.columns_.at(i).idx_; ObExprCmpFuncType cmp_func = row.at(name_idx)->basic_funcs_->null_first_cmp_; // TODO qubin.qb: uncomment below block revert the defensive check //OZ((cmp_func = (row.at(name_idx)->basic_funcs_->null_first_cmp_)) // != row.at(val_idx)->basic_funcs_->null_first_cmp_); OZ(row.at(name_idx)->eval(eval_ctx, name_col)); OZ(row.at(val_idx)->eval(eval_ctx, val_col)); OX(is_self_ref = (0 == cmp_func(*name_col, *val_col))); } return ret; } // make MY_SPEC macro available. OB_INLINE static const ObTableModifySpec &get_my_spec(const ObTableModifyOp &op) { return op.get_spec(); } ObTableModifySpec::ObTableModifySpec(common::ObIAllocator &alloc, const ObPhyOperatorType type) : ObOpSpec(alloc, type), expr_frame_info_(NULL), ab_stmt_id_(nullptr), flags_(0) { } OB_DEF_SERIALIZE(ObTableModifySpec) { int ret = OB_SUCCESS; BASE_SER((ObTableModifySpec, ObOpSpec)); OB_UNIS_ENCODE(flags_); OB_UNIS_ENCODE(ab_stmt_id_); return ret; } OB_DEF_DESERIALIZE(ObTableModifySpec) { int ret = OB_SUCCESS; BASE_DESER((ObTableModifySpec, ObOpSpec)); OB_UNIS_DECODE(flags_); OB_UNIS_DECODE(ab_stmt_id_); return ret; } OB_DEF_SERIALIZE_SIZE(ObTableModifySpec) { int64_t len = 0; BASE_ADD_LEN((ObTableModifySpec, ObOpSpec)); OB_UNIS_ADD_LEN(flags_); OB_UNIS_ADD_LEN(ab_stmt_id_); return len; } OB_SERIALIZE_MEMBER(ObTableModifyOpInput); ObTableModifyOp::ObTableModifyOp(ObExecContext &ctx, const ObOpSpec &spec, ObOpInput *input) : ObOperator(ctx, spec, input), sql_proxy_(NULL), inner_conn_(NULL), tenant_id_(0), saved_conn_(), foreign_key_checks_(false), need_close_conn_(false), iter_end_(false), dml_rtctx_(eval_ctx_, ctx, *this), is_error_logging_(false), err_log_rt_def_(), saved_session_(NULL) { obj_print_params_ = CREATE_OBJ_PRINT_PARAM(ctx_.get_my_session()); obj_print_params_.need_cast_expr_ = true; // bugfix:https://work.aone.alibaba-inc.com/issue/36658497 // in NO_BACKSLASH_ESCAPES, obj_print_sql won't escape. // We use skip_escape_ to indicate this case. It will finally be passed to ObHexEscapeSqlStr. GET_SQL_MODE_BIT(IS_NO_BACKSLASH_ESCAPES, ctx_.get_my_session()->get_sql_mode(), obj_print_params_.skip_escape_); } bool ObTableModifyOp::is_fk_root_session() { bool ret = false; if (OB_ISNULL(ctx_.get_parent_ctx())) { if (this->need_foreign_key_checks()) { ret = true; } } else { if (!ctx_.get_parent_ctx()->get_das_ctx().is_fk_cascading_ && need_foreign_key_checks()) { ret = true; } } return ret; } int ObTableModifyOp::inner_open() { int ret = OB_SUCCESS; ObSQLSessionInfo *my_session = GET_MY_SESSION(ctx_); if (OB_FAIL(init_foreign_key_operation())) { LOG_WARN("failed to init foreign key operation", K(ret)); } else if (MY_SPEC.plan_->has_nested_sql() && OB_FAIL(open_inner_conn())) { LOG_WARN("failed to open inner conn", K(ret)); } else if (OB_FAIL(calc_single_table_loc())) { LOG_WARN("calc single table loc failed", K(ret)); } else { init_das_dml_ctx(); } return ret; } void ObTableModifyOp::clear_dml_evaluated_flag(ObExpr *clear_expr) { if (clear_expr->is_batch_result()) { clear_expr->get_evaluated_flags(eval_ctx_).unset(eval_ctx_.get_batch_idx()); } else { clear_expr->get_eval_info(eval_ctx_).clear_evaluated_flag(); } if (0 < clear_expr->parent_cnt_) { clear_dml_evaluated_flag(clear_expr->parent_cnt_, clear_expr->parents_); } } void ObTableModifyOp::clear_dml_evaluated_flag(int64_t parent_cnt, ObExpr **parent_exprs) { for (int64_t i = 0; i < parent_cnt; ++i) { clear_dml_evaluated_flag(parent_exprs[i]); } } void ObTableModifyOp::clear_dml_evaluated_flag() { // Clear the parent expr because the assignment expr may be changed for (int64_t i = 0; i < trigger_clear_exprs_.count(); ++i) { ObExpr *expr = trigger_clear_exprs_.at(i); clear_dml_evaluated_flag(expr); } } OB_INLINE int ObTableModifyOp::init_das_dml_ctx() { ObSQLSessionInfo *session = GET_MY_SESSION(ctx_); const char *label = nullptr; switch(MY_SPEC.type_) { case PHY_INSERT: label = "InsDASCtx"; break; case PHY_UPDATE: label = "UpdDASCtx"; break; case PHY_DELETE: label = "DelDASCtx"; break; case PHY_LOCK: label = "LockDASCtx"; break; default: label = "DmlDASCtx"; break; } ObMemAttr memattr(session->get_effective_tenant_id(), label, ObCtxIds::EXECUTE_CTX_ID); dml_rtctx_.das_ref_.set_expr_frame_info(nullptr != MY_SPEC.expr_frame_info_ ? MY_SPEC.expr_frame_info_ : &MY_SPEC.plan_->get_expr_frame_info()); dml_rtctx_.das_ref_.set_mem_attr(memattr); dml_rtctx_.das_ref_.set_execute_directly(!MY_SPEC.use_dist_das_); return OB_SUCCESS; } int ObTableModifyOp::merge_implict_cursor(int64_t insert_rows, int64_t update_rows, int64_t delete_rows, int64_t found_rows) { int ret = OB_SUCCESS; bool is_ins_val_opt = ctx_.get_sql_ctx()->multi_stmt_item_.is_ins_multi_val_opt(); if (MY_SPEC.ab_stmt_id_ != nullptr && !is_ins_val_opt) { ObDatum *stmt_id_datum = nullptr; if (OB_FAIL(MY_SPEC.ab_stmt_id_->eval(eval_ctx_, stmt_id_datum))) { LOG_WARN("eval ab stmt id failed", K(ret)); } else { ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_); int64_t stmt_id = stmt_id_datum->get_int(); ObImplicitCursorInfo implicit_cursor; implicit_cursor.stmt_id_ = stmt_id; implicit_cursor.found_rows_ += found_rows; implicit_cursor.matched_rows_ += found_rows; if (insert_rows > 0) { implicit_cursor.affected_rows_ += insert_rows; } if (update_rows > 0) { ObSQLSessionInfo *session = GET_MY_SESSION(ctx_); bool client_found_rows = session->get_capability().cap_flags_.OB_CLIENT_FOUND_ROWS; implicit_cursor.duplicated_rows_ += update_rows; implicit_cursor.affected_rows_ += client_found_rows ? found_rows : update_rows; } if (delete_rows > 0) { implicit_cursor.affected_rows_ += delete_rows; } if (OB_FAIL(plan_ctx->merge_implicit_cursor_info(implicit_cursor))) { LOG_WARN("merge implicit cursor info to plan ctx failed", K(ret), K(implicit_cursor)); } LOG_DEBUG("merge implicit cursor", K(ret), K(implicit_cursor)); } } return ret; } int ObTableModifyOp::inner_switch_iterator() { int ret = OB_SUCCESS; if (OB_FAIL(ObOperator::inner_switch_iterator())) { LOG_WARN("switch iterator failed", K(ret)); } return ret; } int ObTableModifyOp::inner_close() { int ret = OB_SUCCESS; ObPhysicalPlanCtx *plan_ctx = ctx_.get_physical_plan_ctx(); // release cache_handle for auto-increment share::ObAutoincrementService &auto_service = share::ObAutoincrementService::get_instance(); ObIArray &autoinc_params = plan_ctx->get_autoinc_params(); for (int64_t i = 0; i < autoinc_params.count(); ++i) { if (NULL != autoinc_params.at(i).cache_handle_) { auto_service.release_handle(autoinc_params.at(i).cache_handle_); } } close_inner_conn(); if (dml_rtctx_.das_ref_.has_task()) { if (OB_FAIL(dml_rtctx_.das_ref_.close_all_task())) { LOG_WARN("close all insert das task failed", K(ret)); } else { dml_rtctx_.das_ref_.reset(); } } // Release the hash sets created at root ctx for delete distinct check if (OB_SUCC(ret) && get_exec_ctx().is_root_ctx()) { DASDelCtxList& del_ctx_list = get_exec_ctx().get_das_ctx().get_das_del_ctx_list(); DASDelCtxList::iterator iter = del_ctx_list.begin(); for (; OB_SUCC(ret)&& iter != del_ctx_list.end(); iter++) { DmlRowkeyDistCtx del_ctx = *iter; del_ctx.deleted_rows_->destroy(); } del_ctx_list.destroy(); } return ret; } OB_INLINE int ObTableModifyOp::init_foreign_key_operation() { int ret = OB_SUCCESS; ObPhysicalPlanCtx *phy_plan_ctx = GET_PHY_PLAN_CTX(ctx_); if (OB_ISNULL(phy_plan_ctx)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("modify_ctx or phy_plan_ctx is NULL", K(ret), KP(phy_plan_ctx)); } else if ((lib::is_mysql_mode() && phy_plan_ctx->need_foreign_key_checks()) || lib::is_oracle_mode()) { // set fk check even if ret != OB_SUCCESS. see ObTableModifyOp::inner_close() set_foreign_key_checks(); } return ret; } int ObTableModifyOp::inner_rescan() { int ret = OB_SUCCESS; if (OB_FAIL(ObOperator::inner_rescan())) { LOG_WARN("rescan child operator failed", K(ret)); } else { iter_end_ = false; if (dml_rtctx_.das_ref_.has_task()) { if (OB_FAIL(dml_rtctx_.das_ref_.close_all_task())) { LOG_WARN("close all insert das task failed", K(ret)); } else { dml_rtctx_.reuse(); } } } if (OB_SUCC(ret)) { if (OB_FAIL(calc_single_table_loc())) { LOG_WARN("calc single table loc failed", K(ret)); } } return ret; } int ObTableModifyOp::get_gi_task() { int ret = OB_SUCCESS; ObGranuleTaskInfo gi_task_info; GIPrepareTaskMap *gi_prepare_map = nullptr; ObDASCtx &das_ctx = ctx_.get_das_ctx(); ObTableID table_loc_id = OB_INVALID_ID; ObTableID ref_table_id = OB_INVALID_ID; ObTabletID tablet_id; if (OB_FAIL(MY_SPEC.get_single_table_loc_id(table_loc_id, ref_table_id))) { LOG_WARN("get single table loc id failed", K(ret)); } else if (OB_FAIL(ctx_.get_gi_task_map(gi_prepare_map))) { LOG_WARN("Failed to get gi task map", K(ret)); } else if (OB_FAIL(gi_prepare_map->get_refactored(MY_SPEC.id_, gi_task_info))) { if (ret != OB_HASH_NOT_EXIST) { LOG_WARN("failed to get prepare gi task", K(ret), K(MY_SPEC.get_id())); } else { LOG_DEBUG("no prepared task info, set table modify to end", K(MY_SPEC.get_id()), K(this), K(lbt())); // 当前DML算子无法从 GI中获得 task,表示当前DML算子iter end iter_end_ = true; ret = OB_SUCCESS; } } else if (OB_ISNULL(get_input()->table_loc_) && OB_ISNULL(get_input()->table_loc_ = das_ctx.get_table_loc_by_id(table_loc_id, ref_table_id))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get table location by table id failed", K(ret), K(table_loc_id), K(ref_table_id)); } else { get_input()->tablet_loc_ = gi_task_info.tablet_loc_; LOG_DEBUG("DML operator consume a task", K(ret), K(gi_task_info), KPC(get_input()), K(MY_SPEC.id_), K(get_input())); } return ret; } int ObTableModifyOp::calc_single_table_loc() { int ret = OB_SUCCESS; if (!MY_SPEC.use_dist_das_) { ObTableID table_loc_id = OB_INVALID_ID; ObTableID ref_table_id = OB_INVALID_ID; ObDASCtx &das_ctx = ctx_.get_das_ctx(); ObDASTableLoc *table_loc = nullptr; if (OB_UNLIKELY(MY_SPEC.gi_above_)) { if (OB_FAIL(get_gi_task())) { LOG_WARN("get gi task failed", K(ret)); } } else if (OB_FAIL(MY_SPEC.get_single_table_loc_id(table_loc_id, ref_table_id))) { LOG_WARN("get single table loc id failed", K(ret)); } else if (OB_ISNULL(table_loc = das_ctx.get_table_loc_by_id(table_loc_id, ref_table_id))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get table location by table id failed", K(ret), K(table_loc_id), K(ref_table_id), K(das_ctx.get_table_loc_list())); } else { get_input()->table_loc_ = table_loc; get_input()->tablet_loc_ = table_loc->get_first_tablet_loc(); } } return ret; } int ObTableModifyOp::open_inner_conn() { int ret = OB_SUCCESS; ObInnerSQLConnectionPool *pool = NULL; ObSQLSessionInfo *session = NULL; ObISQLConnection *conn; if (OB_ISNULL(sql_proxy_ = ctx_.get_sql_proxy())) { ret = OB_NOT_INIT; LOG_WARN("sql proxy is NULL", K(ret)); } else if (OB_ISNULL(session = ctx_.get_my_session())) { ret = OB_NOT_INIT; LOG_WARN("session is NULL", K(ret)); } else if (NULL != session->get_inner_conn()) { // do nothing. } else if (OB_ISNULL(pool = static_cast(sql_proxy_->get_pool()))) { ret = OB_NOT_INIT; LOG_WARN("connection pool is NULL", K(ret)); } else if (INNER_POOL != pool->get_type()) { LOG_WARN("connection pool type is not inner", K(ret), K(pool->get_type())); } else if (OB_FAIL(pool->acquire(session, conn))) { LOG_WARN("failed to acquire inner connection", K(ret)); } else { /** * session is the only data struct which can pass through multi layer nested sql, * so we put inner conn in session to share it within multi layer nested sql. */ session->set_inner_conn(conn); need_close_conn_ = true; } if (OB_SUCC(ret)) { inner_conn_ = static_cast(session->get_inner_conn()); tenant_id_ = session->get_effective_tenant_id(); } return ret; } int ObTableModifyOp::close_inner_conn() { /** * we can call it even if open_inner_conn() failed, because only the one who call * open_inner_conn() succeed first will do close job by "if (need_close_conn_)". */ int ret = OB_SUCCESS; if (need_close_conn_) { ObSQLSessionInfo *session = ctx_.get_my_session(); if (OB_ISNULL(sql_proxy_) || OB_ISNULL(session)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("sql_proxy of session is NULL", K(ret), KP(sql_proxy_), KP(session)); } else { OZ(sql_proxy_->close(static_cast(session->get_inner_conn()), true)); OX(session->set_inner_conn(NULL)); } need_close_conn_ = false; } sql_proxy_ = NULL; inner_conn_ = NULL; return ret; } int ObTableModifyOp::begin_nested_session(bool skip_cur_stmt_tables) { int ret = OB_SUCCESS; if (OB_ISNULL(inner_conn_)) { ret = OB_NOT_INIT; LOG_WARN("inner connection is NULL", K(ret)); } else if (OB_FAIL(inner_conn_->begin_nested_session(get_saved_session(), saved_conn_, skip_cur_stmt_tables))) { LOG_WARN("failed to begin nested session", K(ret)); } return ret; } int ObTableModifyOp::end_nested_session() { int ret = OB_SUCCESS; if (OB_ISNULL(inner_conn_)) { ret = OB_NOT_INIT; LOG_WARN("inner connection is NULL", K(ret)); } else if (OB_FAIL(inner_conn_->end_nested_session(get_saved_session(), saved_conn_))) { LOG_WARN("failed to end nested session", K(ret)); } return ret; } int ObTableModifyOp::set_foreign_key_cascade(bool is_cascade) { int ret = OB_SUCCESS; OV (OB_NOT_NULL(inner_conn_)); OZ (inner_conn_->set_foreign_key_cascade(is_cascade)); return ret; } int ObTableModifyOp::get_foreign_key_cascade(bool &is_cascade) const { int ret = OB_SUCCESS; OV (OB_NOT_NULL(inner_conn_)); OZ (inner_conn_->get_foreign_key_cascade(is_cascade)); return ret; } int ObTableModifyOp::set_foreign_key_check_exist(bool is_check_exist) { int ret = OB_SUCCESS; OV (OB_NOT_NULL(inner_conn_)); OZ (inner_conn_->set_foreign_key_check_exist(is_check_exist)); return ret; } int ObTableModifyOp::get_foreign_key_check_exist(bool &is_check_exist) const { int ret = OB_SUCCESS; OV (OB_NOT_NULL(inner_conn_)); OZ (inner_conn_->get_foreign_key_check_exist(is_check_exist)); return ret; } int ObTableModifyOp::execute_write(const char *sql) { int ret = OB_SUCCESS; int64_t affected_rows = 0; if (OB_ISNULL(inner_conn_)) { ret = OB_NOT_INIT; LOG_WARN("inner connection is NULL", K(ret)); } else if (OB_ISNULL(sql)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("sql is NULL"); } else if (OB_FAIL(inner_conn_->execute_write(tenant_id_, sql, affected_rows))) { LOG_WARN("failed to execute sql", K(ret), K(tenant_id_), K(sql)); } return ret; } int ObTableModifyOp::execute_read(const char *sql, ObMySQLProxy::MySQLResult &res) { int ret = OB_SUCCESS; if (OB_ISNULL(inner_conn_) || OB_ISNULL(sql_proxy_)) { ret = OB_NOT_INIT; LOG_WARN("inner connection or sql proxy is NULL", K(ret), KP(inner_conn_), KP(sql_proxy_)); } else if (OB_ISNULL(sql)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("sql is NULL"); } else if (OB_FAIL(inner_conn_->execute_read(tenant_id_, sql, res))) { LOG_WARN("failed to execute sql", K(ret), K(tenant_id_), K(sql)); } return ret; } int ObTableModifyOp::check_stack() { int ret = OB_SUCCESS; const int max_stack_deep = 16; bool is_stack_overflow = false; ObSQLSessionInfo *session = ctx_.get_my_session(); if (OB_ISNULL(session)) { ret = OB_ERR_NULL_VALUE; LOG_WARN("sql session is NULL", K(ret)); } else if (session->get_nested_count() > max_stack_deep) { ret = OB_SIZE_OVERFLOW; LOG_WARN("too deep recursive", K(ret), K(max_stack_deep), K(session->get_nested_count())); } else if (OB_FAIL(check_stack_overflow(is_stack_overflow))) { LOG_WARN("fail to check stack overflow", K(ret), K(is_stack_overflow)); } else if (is_stack_overflow) { ret = OB_SIZE_OVERFLOW; LOG_WARN("too deep recursive", K(ret), K(is_stack_overflow)); } return ret; } OperatorOpenOrder ObTableModifyOp::get_operator_open_order() const { OperatorOpenOrder open_order = OPEN_CHILDREN_FIRST; if (spec_.plan_->is_use_px()) { // 如果在dml中开启了PX,有两种情况: // 1. PDML:在pdml中,pdml-op完全不需要其对应的input来提供 运行时的参数,所以会直接返回 // open_order = OPEN_CHILDREN_FIRST // 2. DML+PX: 在这种情况下,dml运行是的参数完全是由其头上的GI算子塞过来的,例如这样的计划 : // PX COORD // PX TRANSMIT // GI (FULL PARTITION WISE) // DELETE // TSC // 这样的计划,需要GI来驱动删除,GI每次迭代一个新的Task,就rescan delete, // 将对应的Task的信息塞给delete算子。 open_order = OPEN_CHILDREN_FIRST; } return open_order; } void ObTableModifyOp::log_user_error_inner(int ret, int64_t col_idx, int64_t row_num, const ObIArray &column_infos) const { if (OB_DATA_OUT_OF_RANGE == ret && (0 <= col_idx && col_idx < column_infos.count())) { ObString column_name = column_infos.at(col_idx).column_name_; ObSQLUtils::copy_and_convert_string_charset(ctx_.get_allocator(), column_name, column_name, CS_TYPE_UTF8MB4_BIN, ctx_.get_my_session()->get_local_collation_connection()); LOG_USER_ERROR(OB_DATA_OUT_OF_RANGE, column_name.length(), column_name.ptr(), row_num); } else if (OB_ERR_DATA_TOO_LONG == ret && (0 <= col_idx && col_idx < column_infos.count())) { ObString column_name = column_infos.at(col_idx).column_name_; ObSQLUtils::copy_and_convert_string_charset(ctx_.get_allocator(), column_name, column_name, CS_TYPE_UTF8MB4_BIN, ctx_.get_my_session()->get_local_collation_connection()); if (lib::is_mysql_mode()) { //for error code OB_ERR_DATA_TOO_LONG, oracle and mysql has the different error msg //oracle error msg has output in ObExprColumnConv, so need to skip oracle mode here LOG_USER_ERROR(OB_ERR_DATA_TOO_LONG, column_name.length(), column_name.ptr(), row_num); } } else if (OB_ERR_VALUE_LARGER_THAN_ALLOWED == ret && col_idx >= 0) { LOG_USER_ERROR(OB_ERR_VALUE_LARGER_THAN_ALLOWED); } else if ((OB_INVALID_DATE_VALUE == ret || OB_INVALID_DATE_FORMAT == ret) && (0 <= col_idx && col_idx < column_infos.count())) { ObString column_name = column_infos.at(col_idx).column_name_; ObSQLUtils::copy_and_convert_string_charset(ctx_.get_allocator(), column_name, column_name, CS_TYPE_UTF8MB4_BIN, ctx_.get_my_session()->get_local_collation_connection()); LOG_USER_ERROR(OB_ERR_INVALID_DATE_MSG_FMT_V2, column_name.length(), column_name.ptr(), row_num); } else { LOG_WARN("fail to operate row", K(ret)); } } void ObTableModifyOp::log_user_error_inner(int ret, int64_t row_num, const ColumnContent &column_info) const { if (OB_DATA_OUT_OF_RANGE == ret) { ObString column_name = column_info.column_name_; ObSQLUtils::copy_and_convert_string_charset(ctx_.get_allocator(), column_name, column_name, CS_TYPE_UTF8MB4_BIN, ctx_.get_my_session()->get_local_collation_connection()); LOG_USER_ERROR(OB_DATA_OUT_OF_RANGE, column_name.length(), column_name.ptr(), row_num); } else if (OB_ERR_DATA_TOO_LONG == ret) { ObString column_name = column_info.column_name_; ObSQLUtils::copy_and_convert_string_charset(ctx_.get_allocator(), column_name, column_name, CS_TYPE_UTF8MB4_BIN, ctx_.get_my_session()->get_local_collation_connection()); if (lib::is_mysql_mode()) { //for error code OB_ERR_DATA_TOO_LONG, oracle and mysql has the different error msg //oracle error msg has output in ObExprColumnConv, so need to skip oracle mode here LOG_USER_ERROR(OB_ERR_DATA_TOO_LONG, column_name.length(), column_name.ptr(), row_num); } } else if (OB_ERR_VALUE_LARGER_THAN_ALLOWED == ret) { LOG_USER_ERROR(OB_ERR_VALUE_LARGER_THAN_ALLOWED); } else if (OB_INVALID_DATE_VALUE == ret || OB_INVALID_DATE_FORMAT == ret) { ObString column_name = column_info.column_name_; ObSQLUtils::copy_and_convert_string_charset(ctx_.get_allocator(), column_name, column_name, CS_TYPE_UTF8MB4_BIN, ctx_.get_my_session()->get_local_collation_connection()); LOG_USER_ERROR(OB_ERR_INVALID_DATE_MSG_FMT_V2, column_name.length(), column_name.ptr(), row_num); } else { LOG_WARN("fail to operate row", K(ret)); } } int ObTableModifyOp::submit_all_dml_task() { int ret = OB_SUCCESS; if (dml_rtctx_.das_ref_.has_task()) { if (OB_FAIL(dml_rtctx_.das_ref_.execute_all_task())) { LOG_WARN("execute all dml das task failed", K(ret)); } else if (OB_FAIL(dml_rtctx_.das_ref_.close_all_task())) { LOG_WARN("close all das task failed", K(ret)); } else { dml_rtctx_.reuse(); } } return ret; } //The data to be written by DML will be buffered in the DAS Write Buffer //When the buffer data exceeds 6M, //needs to be written to the storage to release the memory. int ObTableModifyOp::discharge_das_write_buffer() { int ret = OB_SUCCESS; if (dml_rtctx_.das_ref_.get_das_alloc().used() >= das::OB_DAS_MAX_TOTAL_PACKET_SIZE) { LOG_INFO("DASWriteBuffer full, now to write storage", "buffer memory", dml_rtctx_.das_ref_.get_das_alloc().used()); ret = submit_all_dml_task(); } return ret; } int ObTableModifyOp::get_next_row_from_child() { int ret = OB_SUCCESS; clear_evaluated_flag(); if (OB_FAIL(child_->get_next_row())) { if (OB_ITER_END != ret) { LOG_WARN("fail to get next row", K(ret)); } } else { LOG_TRACE("child output row", "row", ROWEXPR2STR(eval_ctx_, child_->get_spec().output_)); } return ret; } int ObTableModifyOp::inner_get_next_row() { int ret = OB_SUCCESS; if (iter_end_) { LOG_DEBUG("can't get gi task, iter end", K(MY_SPEC.id_), K(iter_end_)); ret = OB_ITER_END; } else { while (OB_SUCC(ret)) { if (OB_FAIL(try_check_status())) { LOG_WARN("check status failed", K(ret)); } else if (OB_FAIL(get_next_row_from_child())) { if (OB_ITER_END != ret) { LOG_WARN("fail to get next row", K(ret)); } else { iter_end_ = true; ret = OB_SUCCESS; break; } } else if (OB_FAIL(write_row_to_das_buffer())) { LOG_WARN("write row to das failed", K(ret)); } else if (OB_FAIL(discharge_das_write_buffer())) { LOG_WARN("discharge das write buffer failed", K(ret)); } else if (is_error_logging_ && err_log_rt_def_.first_err_ret_ != OB_SUCCESS) { clear_evaluated_flag(); err_log_rt_def_.curr_err_log_record_num_++; err_log_rt_def_.reset(); continue; } else if (MY_SPEC.is_returning_) { break; } } if (OB_SUCC(ret) && iter_end_ && dml_rtctx_.das_ref_.has_task()) { //DML operator reach iter end, //now submit the remaining rows in the DAS Write Buffer to the storage if (dml_rtctx_.need_pick_del_task_first() && OB_FAIL(dml_rtctx_.das_ref_.pick_del_task_to_first())) { LOG_WARN("pick delete das task to first failed", K(ret)); } else if (OB_FAIL(dml_rtctx_.das_ref_.execute_all_task())) { LOG_WARN("execute all dml das task failed", K(ret)); } else if (OB_FAIL(dml_rtctx_.das_ref_.close_all_task())) { LOG_WARN("close all das task failed", K(ret)); } } //to post process the DML info after writing all data to the storage or returning one row ret = write_rows_post_proc(ret); if (OB_SUCC(ret) && iter_end_) { ret = OB_ITER_END; } } return ret; } } // namespace sql } // namespace oceanbase