Files
oceanbase/src/sql/engine/dml/ob_table_modify_op.cpp

1179 lines
46 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SQL_ENG
#include "sql/engine/dml/ob_table_modify_op.h"
#include "sql/engine/dml/ob_dml_service.h"
#include "sql/engine/dml/ob_table_insert_op.h"
#include "sql/engine/dml/ob_table_update_op.h"
#include "sql/engine/basic/ob_expr_values_op.h"
#include "sql/engine/expr/ob_expr_column_conv.h"
#include "sql/engine/expr/ob_expr_calc_partition_id.h"
#include "sql/executor/ob_task_spliter.h"
#include "sql/das/ob_das_utils.h"
#include "storage/ob_i_store.h"
#include "lib/mysqlclient/ob_isql_client.h"
#include "observer/ob_inner_sql_connection_pool.h"
#include "lib/worker.h"
#include "share/ob_debug_sync.h"
namespace oceanbase
{
using namespace common;
using namespace common::sqlclient;
using namespace storage;
using namespace share;
using namespace share::schema;
using namespace observer;
namespace sql
{
/**
 * Apply every foreign key action attached to one modified row.
 *
 * Nothing is done unless op.need_foreign_key_checks() is true. Otherwise each entry of
 * dml_ctdef.fk_args_ is handled according to its ref_action_:
 *  - ACTION_CHECK_EXIST (only when new_row is not empty): check_exist() on new_row, skipped
 *    when op.get_foreign_key_cascade() reports that a cascade is in progress;
 *  - ACTION_RESTRICT / ACTION_NO_ACTION (only when old_row is not empty): check_exist() on
 *    old_row with expect_zero = true, but only if value_changed() reports a change;
 *  - ACTION_CASCADE (only when old_row is not empty): cascade(), followed by a fix-up of the
 *    row datums for a self-referencing update.
 * The DAS context flag is_fk_cascading_ is set to true on entry to the checked path and set
 * back to false before returning.
 *
 * @param op         operator executing the DML; supplies eval ctx, exec ctx and inner connection
 * @param dml_ctdef  compile-time definition providing old_row_, new_row_ and fk_args_
 * @param dml_rtdef  not referenced in this function body
 * @return OB_SUCCESS, or the first error returned by a helper
 */
int ForeignKeyHandle::do_handle(ObTableModifyOp &op,
const ObDMLBaseCtDef &dml_ctdef,
ObDMLBaseRtDef &dml_rtdef)
{
int ret = OB_SUCCESS;
if (op.need_foreign_key_checks()) {
const ObExprPtrIArray &old_row = dml_ctdef.old_row_;
const ObExprPtrIArray &new_row = dml_ctdef.new_row_;
op.get_exec_ctx().get_das_ctx().is_fk_cascading_ = true;
LOG_DEBUG("do foreign_key_handle", K(old_row), K(new_row));
// A failed stack check leaves ret != OB_SUCCESS, which makes the loop below a no-op.
if (OB_FAIL(op.check_stack())) {
LOG_WARN("fail to check stack", K(ret));
}
for (int i = 0; OB_SUCC(ret) && i < dml_ctdef.fk_args_.count(); i++) {
const ObForeignKeyArg &fk_arg = dml_ctdef.fk_args_.at(i);
if (OB_SUCC(ret) && !new_row.empty()) {
if (ACTION_CHECK_EXIST == fk_arg.ref_action_) {
// insert or update.
bool is_foreign_key_cascade = false;
if (OB_FAIL(op.get_foreign_key_cascade(is_foreign_key_cascade))) {
LOG_WARN("failed to get foreign key cascade", K(ret), K(fk_arg), K(new_row));
} else if (is_foreign_key_cascade) {
// nested update can not check parent row.
LOG_DEBUG("skip foreign_key_check_exist in nested session");
} else if (OB_FAIL(check_exist(op, fk_arg, new_row, false))) {
LOG_WARN("failed to check exist", K(ret), K(fk_arg), K(new_row));
}
}
}
if (OB_SUCC(ret) && !old_row.empty()) {
if (ACTION_RESTRICT == fk_arg.ref_action_ || ACTION_NO_ACTION == fk_arg.ref_action_) {
// update or delete.
bool has_changed = false;
if (OB_FAIL(value_changed(op, fk_arg.columns_, old_row, new_row, has_changed))) {
LOG_WARN("failed to check if foreign key value has changed",
K(ret), K(fk_arg), K(old_row), K(new_row));
} else if (!has_changed) {
// nothing.
} else if (OB_FAIL(check_exist(op, fk_arg, old_row, true))) {
LOG_WARN("failed to check exist", K(ret), K(fk_arg), K(old_row));
}
} else if (ACTION_CASCADE == fk_arg.ref_action_) {
// update or delete.
bool is_self_ref = false;
if (OB_FAIL(is_self_ref_row(op.get_eval_ctx(), old_row, fk_arg, is_self_ref))) {
LOG_WARN("is_self_ref_row failed", K(ret), K(old_row), K(fk_arg));
} else if (new_row.empty() && is_self_ref && op.is_fk_nested_session()) {
// deleting a self-referenced row inside a nested session must not cascade again.
} else if (OB_FAIL(cascade(op, fk_arg, old_row, new_row))) {
LOG_WARN("failed to cascade", K(ret), K(fk_arg), K(old_row), K(new_row));
} else if (!new_row.empty() && is_self_ref) {
// we got here only when:
// 1. handling update operator and
// 2. foreign key constraint is self reference
// need to change %new_row and %old_row
// (see https://aone.alibaba-inc.com/issue/17476162)
// xxx_row_res_info helps to restore row. restore row before get_next_row()
//op.fk_self_ref_row_res_infos_.reset();
// NOTE: this inner index `i` shadows the outer fk_args_ loop index of the same name.
for (int64_t i = 0; OB_SUCC(ret) && i < fk_arg.columns_.count(); i++) {
const int32_t val_idx = fk_arg.columns_.at(i).idx_;
const int32_t name_idx = fk_arg.columns_.at(i).name_idx_;
if (OB_SUCC(ret)) {
// Copy the datum of new_row[idx_] into the name_idx_ column of both rows.
const ObDatum &updated_value = new_row.at(val_idx)->
locate_expr_datum(op.get_eval_ctx());
ObDatum &new_row_name_col = new_row.at(name_idx)->
locate_expr_datum(op.get_eval_ctx());
ObDatum &old_row_name_col = old_row.at(name_idx)->
locate_expr_datum(op.get_eval_ctx());
OX(new_row_name_col = updated_value);
OX(old_row_name_col = updated_value);
}
}
}
}
} // if (!old_row.empty())
} // for
//fk cascading end
op.get_exec_ctx().get_das_ctx().is_fk_cascading_ = false;
} else {
LOG_DEBUG("skip foreign_key_handle");
}
return ret;
}
/**
 * Report whether any foreign key column differs between %old_row and %new_row.
 *
 * When either row is empty there is nothing to compare and %has_changed is set to true.
 * Otherwise the columns are evaluated pairwise and compared with ObDatum::binary_equal();
 * the scan stops at the first difference or the first error.
 *
 * @param op           operator supplying the evaluation context
 * @param columns      foreign key columns; idx_ selects the expression inside each row
 * @param old_row      expressions of the row before the change
 * @param new_row      expressions of the row after the change
 * @param has_changed  [out] true when a difference was found or a row is empty
 * @return OB_SUCCESS, or the error raised by a sanity check / expression evaluation
 */
int ForeignKeyHandle::value_changed(ObTableModifyOp &op,
                                    const ObIArray<ObForeignKeyColumn> &columns,
                                    const ObExprPtrIArray &old_row,
                                    const ObExprPtrIArray &new_row,
                                    bool &has_changed)
{
  int ret = OB_SUCCESS;
  if (old_row.empty() || new_row.empty()) {
    // One side is missing, so there is no pair of values to compare.
    has_changed = true;
  } else {
    has_changed = false;
    ObDatum *old_row_col = NULL;
    ObDatum *new_row_col = NULL;
    int64_t i = 0;
    while (OB_SUCC(ret) && !has_changed && i < columns.count()) {
      int64_t col_idx = columns.at(i).idx_;
      CK(col_idx < old_row.count());
      CK(col_idx < new_row.count());
      CK(OB_NOT_NULL(old_row.at(col_idx)));
      CK(OB_NOT_NULL(new_row.at(col_idx)));
      OZ(old_row.at(col_idx)->eval(op.get_eval_ctx(), old_row_col));
      OZ(new_row.at(col_idx)->eval(op.get_eval_ctx(), new_row_col));
      OX(has_changed = (false == ObDatum::binary_equal(*old_row_col, *new_row_col)));
      ++i;
    }
  }
  return ret;
}
/**
 * Probe the table named in %fk_arg to enforce a foreign key constraint for %row.
 *
 * A "select 1 from <db>.<table> where <fk column = value ...>" statement (capped at two rows)
 * is built from the foreign key columns of %row and executed in a nested session. Whether the
 * probe returned no row is then compared against %expect_zero. When any foreign key column of
 * %row is NULL, gen_where() reports OB_ERR_NULL_VALUE and the probe is skipped.
 *
 * Cleanup contract: once begin_nested_session() succeeded, the check-exist flag is reset and
 * end_nested_session() is called on every path; the result set is closed only if it exists.
 *
 * @param op           operator owning the inner connection used for the nested query
 * @param fk_arg       foreign key description (database/table name, columns, self-ref flag)
 * @param row          expressions of the row whose foreign key values are probed
 * @param expect_zero  true when no matching row may exist (do_handle passes true for
 *                     RESTRICT / NO ACTION), false when a matching row must exist
 * @return OB_SUCCESS when the constraint holds or the probe was skipped;
 *         OB_ERR_NO_REFERENCED_ROW when nothing matched, a match was required and %row does
 *         not reference itself;
 *         OB_ERR_ROW_IS_REFERENCED when a row matched although none was allowed;
 *         OB_INVALID_ARGUMENT for an empty %row; otherwise the first error encountered.
 */
int ForeignKeyHandle::check_exist(ObTableModifyOp &op,
                                  const ObForeignKeyArg &fk_arg,
                                  const ObExprPtrIArray &row,
                                  bool expect_zero)
{
  DEBUG_SYNC(BEFORE_FOREIGN_KEY_CONSTRAINT_CHECK);
  int ret = OB_SUCCESS;
  static const char *SELECT_FMT_MYSQL =
    "select /*+ no_parallel */ 1 from `%.*s`.`%.*s` where %.*s limit 2";
  static const char *SELECT_FMT_ORACLE =
    "select /*+ no_parallel */ 1 from \"%.*s\".\"%.*s\" where %.*s and rownum <= 2";
  const char *select_fmt = lib::is_mysql_mode() ? SELECT_FMT_MYSQL : SELECT_FMT_ORACLE;
  ObArenaAllocator alloc(ObModIds::OB_MODULE_PAGE_ALLOCATOR,
                         OB_MALLOC_NORMAL_BLOCK_SIZE,
                         op.get_eval_ctx().exec_ctx_.get_my_session()->get_effective_tenant_id());
  char *stmt_buf = nullptr;
  int64_t stmt_len = 0;
  char *where_buf = nullptr;
  int64_t where_len = 0;
  int64_t stmt_pos = 0;
  int64_t where_pos = 0;
  const ObString &database_name = fk_arg.database_name_;
  const ObString &table_name = fk_arg.table_name_;
  if (row.empty()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("row is invalid", K(ret));
  } else if (OB_FAIL(gen_where(op.get_eval_ctx(), where_buf, where_len, where_pos,
                               fk_arg.columns_, row, alloc, op.get_obj_print_params()))) {
    if (OB_LIKELY(OB_ERR_NULL_VALUE == ret)) {
      // skip check exist if any column in foreign key is NULL.
      ret = OB_SUCCESS;
      stmt_pos = 0;
    } else {
      LOG_WARN("failed to gen foreign key where", K(ret), K(row), K(fk_arg.columns_));
    }
  } else if (OB_FAIL(databuff_printf(stmt_buf, stmt_len, stmt_pos, alloc, select_fmt,
                                     database_name.length(), database_name.ptr(),
                                     table_name.length(), table_name.ptr(),
                                     static_cast<int>(where_pos), where_buf))) {
    LOG_WARN("failed to print stmt", K(ret), K(table_name), K(where_buf));
  } else {
    stmt_buf[stmt_pos++] = 0;
  }
  // stmt_pos stays 0 when the probe was skipped because of a NULL foreign key value.
  if (OB_SUCC(ret) && stmt_pos > 0) {
    LOG_DEBUG("foreign_key_check_exist", "stmt", stmt_buf, K(row), K(fk_arg));
    SMART_VAR(ObMySQLProxy::MySQLResult, res) {
      if (OB_FAIL(op.begin_nested_session(fk_arg.is_self_ref_))) {
        LOG_WARN("failed to begin nested session", K(ret), K(stmt_buf));
      } else {
        // must call end_nested_session() if begin_nested_session() success,
        // including when setting the check-exist flag fails.
        bool is_zero = false;
        if (OB_FAIL(op.set_foreign_key_check_exist(true))) {
          LOG_WARN("failed to set foreign key check exist", K(ret));
        } else if (OB_FAIL(op.execute_read(stmt_buf, res))) {
          LOG_WARN("failed to execute stmt", K(ret), K(stmt_buf));
        } else if (OB_ISNULL(res.get_result())) {
          // No result object: there is nothing to iterate and nothing to close.
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("result is NULL", K(ret));
        } else {
          // must call res.get_result()->close() from here on.
          if (OB_FAIL(res.get_result()->next())) {
            if (OB_ITER_END == ret) {
              is_zero = true;
              ret = OB_SUCCESS;
            } else {
              LOG_WARN("failed to get next", K(ret));
            }
          }
          if (OB_SUCC(ret) && is_zero != expect_zero) {
            // for a self-referencing foreign key the new row may reference itself.
            bool is_self_ref = false;
            // oracle for update does not support aggregate functions, so when is_zero is false
            // judge whether only one row is affected by whether the second record can be obtained
            // NOTE(review): is_affect_only_one is computed but not consulted by the decision
            // below; confirm whether the ROW_IS_REFERENCED branch was meant to use it.
            bool is_affect_only_one = false;
            if (!is_zero && OB_FAIL(res.get_result()->next())) {
              if (OB_ITER_END == ret) {
                is_affect_only_one = true;
                ret = OB_SUCCESS;
              } else {
                LOG_WARN("failed to get next", K(ret));
              }
            }
            /**
             * expect_zero is false when fk_arg.ref_action_ is equal to
             * ACTION_CHECK_EXIST, if the is_zero != expect_zero condition is
             * true, then is_zero is true, need to exclude the case of self reference.
             * other cases return OB_ERR_NO_REFERENCED_ROW.
             *
             * expect_zero is true when fk_arg.ref_action_ is equal to
             * ACTION_RESTRICT or ACTION_NO_ACTION, if is_zero != expect_zero condition
             * is true, then is_zero is false, need to exclude the case of self reference and
             * only affect one row. other cases return OB_ERR_ROW_IS_REFERENCED.
             */
            if (OB_FAIL(ret)) {
              // keep the fetch error instead of overwriting it with a constraint error.
            } else if (OB_FAIL(is_self_ref_row(op.get_eval_ctx(), row, fk_arg, is_self_ref))) {
              LOG_WARN("is_self_ref_row failed", K(ret), K(row), K(fk_arg));
            } else if (is_zero && !is_self_ref) {
              ret = OB_ERR_NO_REFERENCED_ROW;
              LOG_WARN("parent row is not exist", K(ret), K(fk_arg), K(row));
            } else if (!is_zero) {
              ret = OB_ERR_ROW_IS_REFERENCED;
              LOG_WARN("child row is exist", K(ret), K(fk_arg), K(row));
            }
          }
          int close_ret = res.get_result()->close();
          if (OB_SUCCESS != close_ret) {
            LOG_WARN("failed to close", K(close_ret));
            if (OB_SUCCESS == ret) {
              ret = close_ret;
            }
          }
        }
        // Cleanup never overrides an earlier error; it only reports its own failure
        // when everything before it succeeded.
        int reset_ret = op.set_foreign_key_check_exist(false);
        if (OB_SUCCESS != reset_ret) {
          LOG_WARN("failed to reset foreign key check exist", K(reset_ret));
          if (OB_SUCCESS == ret) {
            ret = reset_ret;
          }
        }
        int end_ret = op.end_nested_session();
        if (OB_SUCCESS != end_ret) {
          LOG_WARN("failed to end nested session", K(end_ret));
          if (OB_SUCCESS == ret) {
            ret = end_ret;
          }
        }
      }
    }
  }
  return ret;
}
/**
 * Propagate an update or delete of %old_row to the table named in %fk_arg.
 *
 * The WHERE clause is generated from the foreign key columns of %old_row. When %new_row is
 * not empty an "update ... set ... where ..." statement is built (SET list from %new_row),
 * otherwise a "delete from ... where ..." statement. The statement is executed through
 * op.execute_write() inside a nested session. If any foreign key column of %old_row is NULL,
 * gen_where() reports OB_ERR_NULL_VALUE and no statement is executed.
 *
 * @param op       operator owning the inner connection used for the nested statement
 * @param fk_arg   foreign key description (database/table name, columns, self-ref flag)
 * @param old_row  expressions of the row before the change; must not be empty
 * @param new_row  expressions of the row after the change; empty means delete
 * @return OB_SUCCESS, OB_INVALID_ARGUMENT for an empty %old_row, or the first error hit
 */
int ForeignKeyHandle::cascade(ObTableModifyOp &op,
const ObForeignKeyArg &fk_arg,
const ObExprPtrIArray &old_row,
const ObExprPtrIArray &new_row)
{
static const char *UPDATE_FMT_MYSQL = "update `%.*s`.`%.*s` set %.*s where %.*s";
static const char *UPDATE_FMT_ORACLE = "update \"%.*s\".\"%.*s\" set %.*s where %.*s";
static const char *DELETE_FMT_MYSQL = "delete from `%.*s`.`%.*s` where %.*s";
static const char *DELETE_FMT_ORACLE = "delete from \"%.*s\".\"%.*s\" where %.*s";
int ret = OB_SUCCESS;
ObArenaAllocator alloc(ObModIds::OB_MODULE_PAGE_ALLOCATOR,
OB_MALLOC_NORMAL_BLOCK_SIZE,
op.get_eval_ctx().exec_ctx_.get_my_session()->get_effective_tenant_id());
char *stmt_buf = nullptr;
int64_t stmt_len = 0;
char *where_buf = nullptr;
int64_t where_len = 0;
int64_t stmt_pos = 0;
int64_t where_pos = 0;
const ObString &database_name = fk_arg.database_name_;
const ObString &table_name = fk_arg.table_name_;
if (OB_FAIL(op.get_exec_ctx().check_status())) {
LOG_WARN("failed check status", K(ret));
} else if (old_row.empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("old row is invalid", K(ret));
} else if (OB_FAIL(gen_where(op.get_eval_ctx(), where_buf, where_len, where_pos,
fk_arg.columns_, old_row, alloc, op.get_obj_print_params()))) {
if (OB_LIKELY(OB_ERR_NULL_VALUE == ret)) {
// skip cascade if any column in foreign key is NULL.
ret = OB_SUCCESS;
stmt_pos = 0;
} else {
LOG_WARN("failed to gen foreign key where", K(ret), K(old_row), K(fk_arg.columns_));
}
} else {
if (!new_row.empty()) {
// update.
const char *update_fmt = lib::is_mysql_mode() ? UPDATE_FMT_MYSQL : UPDATE_FMT_ORACLE;
char *set_buf = nullptr;
int64_t set_len = 0;
int64_t set_pos = 0;
if (OB_FAIL(gen_set(op.get_eval_ctx(), set_buf, set_len, set_pos,
fk_arg.columns_, new_row, alloc, op.get_obj_print_params()))) {
LOG_WARN("failed to gen foreign key set", K(ret), K(new_row), K(fk_arg.columns_));
} else if (OB_FAIL(databuff_printf(stmt_buf, stmt_len, stmt_pos, alloc, update_fmt,
database_name.length(), database_name.ptr(),
table_name.length(), table_name.ptr(),
static_cast<int>(set_pos), set_buf,
static_cast<int>(where_pos), where_buf))) {
LOG_WARN("failed to print stmt", K(ret), K(table_name), K(set_buf), K(where_buf));
} else {
stmt_buf[stmt_pos++] = 0;
}
} else {
// delete.
const char *delete_fmt = lib::is_mysql_mode() ? DELETE_FMT_MYSQL : DELETE_FMT_ORACLE;
if (OB_FAIL(databuff_printf(stmt_buf, stmt_len, stmt_pos, alloc, delete_fmt,
database_name.length(), database_name.ptr(),
table_name.length(), table_name.ptr(),
static_cast<int>(where_pos), where_buf))) {
LOG_WARN("failed to print stmt", K(ret), K(table_name), K(where_buf));
} else {
stmt_buf[stmt_pos++] = 0;
}
}
}
// stmt_pos stays 0 when the cascade was skipped because of a NULL foreign key value.
if (OB_SUCC(ret) && stmt_pos > 0) {
LOG_DEBUG("foreign_key_cascade", "stmt", stmt_buf, K(old_row), K(new_row), K(fk_arg));
if (OB_FAIL(op.begin_nested_session(fk_arg.is_self_ref_))) {
LOG_WARN("failed to begin nested session", K(ret));
} else {
// must call end_nested_session() if begin_nested_session() success.
// https://work.aone.alibaba-inc.com/issue/29621871
// skip modify_ctx.set_foreign_key_cascade when cascade update and self ref.
if (!(fk_arg.is_self_ref_ && !new_row.empty()) &&
OB_FAIL(op.set_foreign_key_cascade(true))) {
LOG_WARN("failed to set foreign key cascade", K(ret));
} else if (OB_FAIL(op.execute_write(stmt_buf))) {
LOG_WARN("failed to execute stmt", K(ret), K(stmt_buf));
}
// The flag is reset unconditionally, even on the path that skipped setting it.
// Cleanup errors are reported only when nothing failed before them.
int reset_ret = op.set_foreign_key_cascade(false);
if (OB_SUCCESS != reset_ret) {
LOG_WARN("failed to reset foreign key cascade", K(reset_ret));
if (OB_SUCCESS == ret) {
ret = reset_ret;
}
}
int end_ret = op.end_nested_session();
if (OB_SUCCESS != end_ret) {
LOG_WARN("failed to end nested session", K(end_ret));
if (OB_SUCCESS == ret) {
ret = end_ret;
}
}
}
}
return ret;
}
/**
 * Print the "col = value, col = value" list used in the SET clause of a cascaded update.
 * Thin wrapper over gen_column_value(): entries are joined with ", " and NULL values are
 * accepted (forbid_null = false).
 */
int ForeignKeyHandle::gen_set(ObEvalCtx &eval_ctx, char *&buf, int64_t &len, int64_t &pos,
                              const ObIArray<ObForeignKeyColumn> &columns,
                              const ObExprPtrIArray &row,
                              ObIAllocator &alloc,
                              const ObObjPrintParams &print_params)
{
  const char *set_delimiter = ", ";
  const bool forbid_null = false;
  return gen_column_value(eval_ctx, buf, len, pos, columns, row,
                          set_delimiter, alloc, print_params, forbid_null);
}
/**
 * Print the "col = value and col = value" predicate used in the WHERE clause of the
 * foreign key probe / cascade statements.
 * Thin wrapper over gen_column_value(): entries are joined with " and " and a NULL value
 * makes the call fail with OB_ERR_NULL_VALUE (forbid_null = true).
 */
int ForeignKeyHandle::gen_where(ObEvalCtx &eval_ctx, char *&buf, int64_t &len, int64_t &pos,
                                const ObIArray<ObForeignKeyColumn> &columns,
                                const ObExprPtrIArray &row,
                                ObIAllocator &alloc,
                                const ObObjPrintParams &print_params)
{
  const char *where_delimiter = " and ";
  const bool forbid_null = true;
  return gen_column_value(eval_ctx, buf, len, pos, columns, row,
                          where_delimiter, alloc, print_params, forbid_null);
}
/**
 * Append "<quoted column name> = <sql literal>" for every foreign key column to %buf,
 * separating the entries with %delimiter, then replace the trailing delimiter with a
 * terminating zero byte.
 *
 * @param eval_ctx      context used to evaluate the row expressions
 * @param buf           [in/out] output buffer, passed to databuff_printf() together with %alloc
 * @param len           [in/out] buffer length maintained by databuff_printf()
 * @param pos           [in/out] write position; on success it points one past the zero byte
 * @param columns       foreign key columns; idx_ selects the expression in %row, name_ is printed
 * @param row           row expressions supplying the values
 * @param delimiter     separator printed after every entry; must not be NULL
 * @param alloc         allocator handed to the print helpers
 * @param print_params  literal print parameters; tz_info_ must not be NULL
 * @param forbid_null   when true a NULL column value aborts with OB_ERR_NULL_VALUE (not logged)
 * @return OB_SUCCESS, OB_INVALID_ARGUMENT, OB_ERR_UNEXPECTED for an empty column list,
 *         OB_ERR_NULL_VALUE, or the error of an evaluation / print helper
 */
int ForeignKeyHandle::gen_column_value(ObEvalCtx &eval_ctx, char *&buf, int64_t &len, int64_t &pos,
const ObIArray<ObForeignKeyColumn> &columns,
const ObExprPtrIArray &row,
const char *delimiter,
ObIAllocator &alloc,
const ObObjPrintParams &print_params,
bool forbid_null)
{
// Column names are quoted with backticks in MySQL mode and double quotes otherwise.
static const char *COLUMN_FMT_MYSQL = "`%.*s` = ";
static const char *COLUMN_FMT_ORACLE = "\"%.*s\" = ";
const char *column_fmt = lib::is_mysql_mode() ? COLUMN_FMT_MYSQL : COLUMN_FMT_ORACLE;
int ret = OB_SUCCESS;
ObDatum *col_datum = NULL;
if (OB_ISNULL(delimiter) || OB_ISNULL(print_params.tz_info_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("argument is invalid", K(ret), KP(buf), KP(delimiter), KP(print_params.tz_info_));
} else if (columns.count() <= 0) {
ret= OB_ERR_UNEXPECTED;
LOG_ERROR("columns count of fk is zero or less than zero", K(ret), K(columns.count()), K(columns));
}
for (int64_t i = 0; OB_SUCC(ret) && i < columns.count(); i++) {
// NOTE(review): idx_ is used without a bounds or NULL check here, unlike value_changed().
OZ(row.at(columns.at(i).idx_)->eval(eval_ctx, col_datum));
if (OB_SUCC(ret)) {
const ObString &col_name = columns.at(i).name_;
const ObObjMeta &col_obj_meta = row.at(columns.at(i).idx_)->obj_meta_;
ObObj col_obj;
if (forbid_null && col_datum->is_null()) {
ret = OB_ERR_NULL_VALUE;
// NO LOG.
} else if (OB_FAIL(databuff_printf(buf, len, pos, alloc, column_fmt,
col_name.length(), col_name.ptr()))) {
LOG_WARN("failed to print column name", K(ret), K(col_name));
} else if (OB_FAIL(col_datum->to_obj(col_obj, col_obj_meta))) {
LOG_WARN("to_obj failed", K(ret), K(*col_datum), K(col_obj_meta));
} else if (OB_FAIL(col_obj.print_sql_literal(buf, len, pos, alloc, print_params))) {
LOG_WARN("failed to print column value", K(ret), K(*col_datum), K(col_obj));
} else if (OB_FAIL(databuff_printf(buf, len, pos, alloc, "%s", delimiter))) {
LOG_WARN("failed to print delimiter", K(ret), K(delimiter));
}
}
}
if (OB_SUCC(ret)) {
// Success implies at least one entry was printed, so a trailing delimiter exists:
// step back over it and overwrite its first byte with the terminator.
pos -= STRLEN(delimiter);
buf[pos++] = 0;
}
return ret;
}
/**
 * Decide whether %row references itself through the foreign key %fk_arg.
 *
 * The answer starts as fk_arg.is_self_ref_. For a self-referencing key every column pair
 * (row[name_idx_], row[idx_]) is evaluated and compared with the null_first_cmp_ function of
 * the name_idx_ expression; the row is self-referencing only if all pairs compare equal.
 * The scan stops at the first unequal pair or at the first evaluation error.
 *
 * @param eval_ctx     context used to evaluate the row expressions
 * @param row          row expressions to inspect
 * @param fk_arg       foreign key description (is_self_ref_ flag and column index pairs)
 * @param is_self_ref  [out] result; only meaningful when OB_SUCCESS is returned
 * @return OB_SUCCESS, or the error raised while evaluating an expression
 */
int ForeignKeyHandle::is_self_ref_row(ObEvalCtx &eval_ctx,
                                      const ObExprPtrIArray &row,
                                      const ObForeignKeyArg &fk_arg,
                                      bool &is_self_ref)
{
  int ret = OB_SUCCESS;
  is_self_ref = fk_arg.is_self_ref_;
  ObDatum *name_col = NULL;
  ObDatum *val_col = NULL;
  // Stop on error as well: once ret is set the macros below become no-ops, so further
  // iterations could not change the outcome.
  for (int64_t i = 0; OB_SUCC(ret) && is_self_ref && i < fk_arg.columns_.count(); i++) {
    const int32_t name_idx = fk_arg.columns_.at(i).name_idx_;
    const int32_t val_idx = fk_arg.columns_.at(i).idx_;
    ObExprCmpFuncType cmp_func = row.at(name_idx)->basic_funcs_->null_first_cmp_;
    // TODO qubin.qb: uncomment below block revert the defensive check
    //OZ((cmp_func = (row.at(name_idx)->basic_funcs_->null_first_cmp_))
    //   != row.at(val_idx)->basic_funcs_->null_first_cmp_);
    OZ(row.at(name_idx)->eval(eval_ctx, name_col));
    OZ(row.at(val_idx)->eval(eval_ctx, val_col));
    OX(is_self_ref = (0 == cmp_func(*name_col, *val_col)));
  }
  return ret;
}
// File-local accessor that the MY_SPEC macro relies on: returns the spec of %op.
OB_INLINE static const ObTableModifySpec &get_my_spec(const ObTableModifyOp &op)
{
  const ObTableModifySpec &my_spec = op.get_spec();
  return my_spec;
}
/**
 * Construct a table-modify operator spec.
 * Forwards %alloc and %type to ObOpSpec and starts with no expression frame info,
 * no ab_stmt_id_ expression and all flag bits cleared.
 */
ObTableModifySpec::ObTableModifySpec(common::ObIAllocator &alloc,
const ObPhyOperatorType type)
: ObOpSpec(alloc, type),
expr_frame_info_(NULL),
ab_stmt_id_(nullptr),
flags_(0)
{
}
// Serialization of ObTableModifySpec: the ObOpSpec base part first, then flags_ and
// ab_stmt_id_. The field order must stay in sync with the deserialize and
// serialize-size definitions.
OB_DEF_SERIALIZE(ObTableModifySpec)
{
int ret = OB_SUCCESS;
BASE_SER((ObTableModifySpec, ObOpSpec));
OB_UNIS_ENCODE(flags_);
OB_UNIS_ENCODE(ab_stmt_id_);
return ret;
}
// Deserialization of ObTableModifySpec: reads the ObOpSpec base part, then flags_ and
// ab_stmt_id_, in the same order in which the serialize definition writes them.
OB_DEF_DESERIALIZE(ObTableModifySpec)
{
int ret = OB_SUCCESS;
BASE_DESER((ObTableModifySpec, ObOpSpec));
OB_UNIS_DECODE(flags_);
OB_UNIS_DECODE(ab_stmt_id_);
return ret;
}
// Serialized size of ObTableModifySpec: base part plus flags_ and ab_stmt_id_, covering
// the same fields as the serialize definition.
OB_DEF_SERIALIZE_SIZE(ObTableModifySpec)
{
int64_t len = 0;
BASE_ADD_LEN((ObTableModifySpec, ObOpSpec));
OB_UNIS_ADD_LEN(flags_);
OB_UNIS_ADD_LEN(ab_stmt_id_);
return len;
}
// ObTableModifyOpInput is registered for serialization with no member fields listed.
OB_SERIALIZE_MEMBER(ObTableModifyOpInput);
/**
 * Construct a table-modify operator.
 * All connection related members start empty, foreign key checks are off, and the literal
 * print parameters used for generated foreign key statements are derived from the session.
 * NOTE(review): ctx_.get_my_session() is dereferenced without a NULL check below — confirm
 * that a session is always attached when operators are created.
 */
ObTableModifyOp::ObTableModifyOp(ObExecContext &ctx,
const ObOpSpec &spec,
ObOpInput *input)
: ObOperator(ctx, spec, input),
sql_proxy_(NULL),
inner_conn_(NULL),
tenant_id_(0),
saved_conn_(),
foreign_key_checks_(false),
need_close_conn_(false),
iter_end_(false),
dml_rtctx_(eval_ctx_, ctx, *this),
is_error_logging_(false),
execute_single_row_(false),
err_log_rt_def_(),
dml_modify_rows_(ctx.get_allocator()),
saved_session_(NULL)
{
obj_print_params_ = CREATE_OBJ_PRINT_PARAM(ctx_.get_my_session());
// Printed literals are wrapped in a cast expression.
obj_print_params_.need_cast_expr_ = true;
// bugfix:https://work.aone.alibaba-inc.com/issue/36658497
// in NO_BACKSLASH_ESCAPES, obj_print_sql<ObVarcharType> won't escape.
// We use skip_escape_ to indicate this case. It will finally be passed to ObHexEscapeSqlStr.
GET_SQL_MODE_BIT(IS_NO_BACKSLASH_ESCAPES, ctx_.get_my_session()->get_sql_mode(), obj_print_params_.skip_escape_);
}
/**
 * Tell whether this operator runs in the outermost session of a foreign key operation.
 * That is the case when foreign key checks are needed and either there is no parent
 * execution context or the parent's DAS context is not currently cascading.
 */
bool ObTableModifyOp::is_fk_root_session() {
  bool parent_is_cascading = false;
  if (nullptr != ctx_.get_parent_ctx()) {
    parent_is_cascading = ctx_.get_parent_ctx()->get_das_ctx().is_fk_cascading_;
  }
  return !parent_is_cascading && need_foreign_key_checks();
}
/**
 * Open the operator: enable foreign key checks if required, acquire the inner SQL
 * connection when the plan contains nested SQL, resolve the single table location, decide
 * on single-row execution and finally initialise the DAS DML context.
 *
 * @return OB_SUCCESS, or the error of the first step that failed (later steps are skipped)
 */
int ObTableModifyOp::inner_open()
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(init_foreign_key_operation())) {
    LOG_WARN("failed to init foreign key operation", K(ret));
  } else if (MY_SPEC.plan_->has_nested_sql() && OB_FAIL(open_inner_conn())) {
    LOG_WARN("failed to open inner conn", K(ret));
  } else if (OB_FAIL(calc_single_table_loc())) {
    LOG_WARN("calc single table loc failed", K(ret));
  } else if (OB_FAIL(check_need_exec_single_row())) {
    LOG_WARN("failed to perform single row execution check", K(ret));
  } else if (OB_FAIL(init_das_dml_ctx())) {
    // Propagate the result instead of discarding it, like every other step above.
    LOG_WARN("failed to init das dml ctx", K(ret));
  }
  return ret;
}
/**
 * Mark %clear_expr as not evaluated and do the same, recursively, for all of its parents.
 * Batch-result expressions clear only the bit of the current batch index; other
 * expressions clear the flag in their eval info.
 */
void ObTableModifyOp::clear_dml_evaluated_flag(ObExpr *clear_expr)
{
  if (!clear_expr->is_batch_result()) {
    clear_expr->get_eval_info(eval_ctx_).clear_evaluated_flag();
  } else {
    clear_expr->get_evaluated_flags(eval_ctx_).unset(eval_ctx_.get_batch_idx());
  }
  // Anything computed from this expression is stale as well.
  if (clear_expr->parent_cnt_ > 0) {
    clear_dml_evaluated_flag(clear_expr->parent_cnt_, clear_expr->parents_);
  }
}
/**
 * Clear the evaluated flag of each of the first %parent_cnt expressions in %parent_exprs
 * (and, through the single-expression overload, of their own parents).
 */
void ObTableModifyOp::clear_dml_evaluated_flag(int64_t parent_cnt, ObExpr **parent_exprs)
{
  int64_t idx = 0;
  while (idx < parent_cnt) {
    clear_dml_evaluated_flag(parent_exprs[idx]);
    ++idx;
  }
}
void ObTableModifyOp::clear_dml_evaluated_flag()
{
// Clear the parent expr because the assignment expr may be changed
for (int64_t i = 0; i < trigger_clear_exprs_.count(); ++i) {
ObExpr *expr = trigger_clear_exprs_.at(i);
clear_dml_evaluated_flag(expr);
}
}
/**
 * Configure the DAS reference of the DML runtime context: expression frame info (the spec's
 * own one if set, otherwise the plan's), a memory attribute labelled after the operator
 * type, and whether tasks execute directly (when the spec does not use distributed DAS).
 *
 * @return always OB_SUCCESS
 */
OB_INLINE int ObTableModifyOp::init_das_dml_ctx()
{
  // Memory label by operator type; anything unlisted uses the generic label.
  const char *label = "DmlDASCtx";
  switch (MY_SPEC.type_) {
    case PHY_INSERT: {
      label = "InsDASCtx";
      break;
    }
    case PHY_UPDATE: {
      label = "UpdDASCtx";
      break;
    }
    case PHY_DELETE: {
      label = "DelDASCtx";
      break;
    }
    case PHY_LOCK: {
      label = "LockDASCtx";
      break;
    }
    default: {
      break;
    }
  }
  ObSQLSessionInfo *session = GET_MY_SESSION(ctx_);
  ObMemAttr memattr(session->get_effective_tenant_id(), label, ObCtxIds::EXECUTE_CTX_ID);
  dml_rtctx_.das_ref_.set_expr_frame_info(nullptr != MY_SPEC.expr_frame_info_
                                          ? MY_SPEC.expr_frame_info_
                                          : &MY_SPEC.plan_->get_expr_frame_info());
  dml_rtctx_.das_ref_.set_mem_attr(memattr);
  dml_rtctx_.das_ref_.set_execute_directly(!MY_SPEC.use_dist_das_);
  return OB_SUCCESS;
}
/**
 * Fold the row counters of the current statement into the plan context's implicit cursor
 * identified by the value of the spec's ab_stmt_id_ expression.
 *
 * Nothing is merged when ab_stmt_id_ is not set or when the statement item reports
 * is_ins_multi_val_opt(). Only positive insert/update/delete counts contribute to
 * affected_rows_; for updates the contribution is %found_rows if the client has the
 * OB_CLIENT_FOUND_ROWS capability and %update_rows otherwise.
 *
 * @param insert_rows  rows inserted
 * @param update_rows  rows updated (also added to duplicated_rows_)
 * @param delete_rows  rows deleted
 * @param found_rows   rows found (added to found_rows_ and matched_rows_)
 * @return OB_SUCCESS, or the error of evaluating the stmt id / merging the cursor info
 */
int ObTableModifyOp::merge_implict_cursor(int64_t insert_rows,
                                          int64_t update_rows,
                                          int64_t delete_rows,
                                          int64_t found_rows)
{
  int ret = OB_SUCCESS;
  const bool is_ins_val_opt = ctx_.get_sql_ctx()->multi_stmt_item_.is_ins_multi_val_opt();
  ObDatum *stmt_id_datum = nullptr;
  if (nullptr == MY_SPEC.ab_stmt_id_ || is_ins_val_opt) {
    // no stmt id to attribute the counters to.
  } else if (OB_FAIL(MY_SPEC.ab_stmt_id_->eval(eval_ctx_, stmt_id_datum))) {
    LOG_WARN("eval ab stmt id failed", K(ret));
  } else {
    ObImplicitCursorInfo implicit_cursor;
    implicit_cursor.stmt_id_ = stmt_id_datum->get_int();
    implicit_cursor.found_rows_ += found_rows;
    implicit_cursor.matched_rows_ += found_rows;
    if (insert_rows > 0) {
      implicit_cursor.affected_rows_ += insert_rows;
    }
    if (update_rows > 0) {
      ObSQLSessionInfo *session = GET_MY_SESSION(ctx_);
      const bool client_found_rows = session->get_capability().cap_flags_.OB_CLIENT_FOUND_ROWS;
      implicit_cursor.duplicated_rows_ += update_rows;
      implicit_cursor.affected_rows_ += client_found_rows ? found_rows : update_rows;
    }
    if (delete_rows > 0) {
      implicit_cursor.affected_rows_ += delete_rows;
    }
    ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_);
    if (OB_FAIL(plan_ctx->merge_implicit_cursor_info(implicit_cursor))) {
      LOG_WARN("merge implicit cursor info to plan ctx failed", K(ret), K(implicit_cursor));
    }
    LOG_DEBUG("merge implicit cursor", K(ret), K(implicit_cursor));
  }
  return ret;
}
/**
 * Switch to the next iterator by delegating to ObOperator::inner_switch_iterator();
 * no table-modify specific state is touched here.
 *
 * @return the result of the base class call
 */
int ObTableModifyOp::inner_switch_iterator()
{
int ret = OB_SUCCESS;
if (OB_FAIL(ObOperator::inner_switch_iterator())) {
LOG_WARN("switch iterator failed", K(ret));
}
return ret;
}
int ObTableModifyOp::inner_close()
{
int ret = OB_SUCCESS;
ObPhysicalPlanCtx *plan_ctx = ctx_.get_physical_plan_ctx();
// release cache_handle for auto-increment
share::ObAutoincrementService &auto_service = share::ObAutoincrementService::get_instance();
ObIArray<share::AutoincParam> &autoinc_params = plan_ctx->get_autoinc_params();
for (int64_t i = 0; i < autoinc_params.count(); ++i) {
if (NULL != autoinc_params.at(i).cache_handle_) {
auto_service.release_handle(autoinc_params.at(i).cache_handle_);
}
}
close_inner_conn();
if (dml_rtctx_.das_ref_.has_task()) {
if (OB_FAIL(dml_rtctx_.das_ref_.close_all_task())) {
LOG_WARN("close all insert das task failed", K(ret));
} else {
dml_rtctx_.das_ref_.reset();
}
}
dml_modify_rows_.clear();
// Release the hash sets created at fk root ctx for delete distinct checks
if (OB_SUCC(ret) && get_exec_ctx().is_fk_root_ctx()) {
DASDelCtxList& del_ctx_list = get_exec_ctx().get_das_ctx().get_das_del_ctx_list();
DASDelCtxList::iterator iter = del_ctx_list.begin();
for (; OB_SUCC(ret)&& iter != del_ctx_list.end(); iter++) {
DmlRowkeyDistCtx del_ctx = *iter;
if (del_ctx.deleted_rows_ != nullptr) {
del_ctx.deleted_rows_->destroy();
del_ctx.deleted_rows_ = nullptr;
}
}
del_ctx_list.destroy();
}
return ret;
}
/**
 * Enable foreign key checking for this operator when it applies: in MySQL mode only if the
 * physical plan context asks for it, in Oracle mode always.
 *
 * @return OB_SUCCESS, or OB_ERR_UNEXPECTED when the physical plan context is NULL
 */
OB_INLINE int ObTableModifyOp::init_foreign_key_operation()
{
  int ret = OB_SUCCESS;
  ObPhysicalPlanCtx *phy_plan_ctx = GET_PHY_PLAN_CTX(ctx_);
  if (OB_ISNULL(phy_plan_ctx)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("modify_ctx or phy_plan_ctx is NULL", K(ret), KP(phy_plan_ctx));
  } else {
    const bool checks_required =
        (lib::is_mysql_mode() && phy_plan_ctx->need_foreign_key_checks())
        || lib::is_oracle_mode();
    if (checks_required) {
      set_foreign_key_checks();
    }
  }
  return ret;
}
/**
 * Prepare the operator for another scan: rescan through the base class, clear the iter-end
 * mark, close outstanding DAS tasks and reuse the DML runtime context, drop the buffered
 * modified rows and recompute the single table location.
 *
 * @return OB_SUCCESS, or the error of the first failing step
 */
int ObTableModifyOp::inner_rescan()
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(ObOperator::inner_rescan())) {
    LOG_WARN("rescan child operator failed", K(ret));
  } else {
    iter_end_ = false;
    if (!dml_rtctx_.das_ref_.has_task()) {
      // no DAS task outstanding, nothing to close.
    } else if (OB_FAIL(dml_rtctx_.das_ref_.close_all_task())) {
      LOG_WARN("close all insert das task failed", K(ret));
    } else {
      dml_rtctx_.reuse();
    }
  }
  if (OB_SUCC(ret)) {
    dml_modify_rows_.clear();
    if (OB_FAIL(calc_single_table_loc())) {
      LOG_WARN("calc single table loc failed", K(ret));
    }
  }
  return ret;
}
/**
 * Base implementation of the single-row execution check: row-by-row execution is
 * never required here, so the flag is cleared.
 *
 * @return always OB_SUCCESS
 */
int ObTableModifyOp::check_need_exec_single_row() {
  execute_single_row_ = false;
  return OB_SUCCESS;
}
/**
 * Fetch the task prepared for this operator in the GI task map and store its tablet
 * location (and, if not yet set, the table location) in the operator input.
 * When the map has no entry for this operator id, iter_end_ is set and OB_SUCCESS returned.
 *
 * @return OB_SUCCESS; OB_ERR_UNEXPECTED when the table location cannot be found; otherwise
 *         the error of looking up the location id or the task map
 */
int ObTableModifyOp::get_gi_task()
{
int ret = OB_SUCCESS;
ObGranuleTaskInfo gi_task_info;
GIPrepareTaskMap *gi_prepare_map = nullptr;
ObDASCtx &das_ctx = ctx_.get_das_ctx();
ObTableID table_loc_id = OB_INVALID_ID;
ObTableID ref_table_id = OB_INVALID_ID;
// NOTE(review): tablet_id is declared but never used in this function.
ObTabletID tablet_id;
if (OB_FAIL(MY_SPEC.get_single_table_loc_id(table_loc_id, ref_table_id))) {
LOG_WARN("get single table loc id failed", K(ret));
} else if (OB_FAIL(ctx_.get_gi_task_map(gi_prepare_map))) {
LOG_WARN("Failed to get gi task map", K(ret));
} else if (OB_FAIL(gi_prepare_map->get_refactored(MY_SPEC.id_, gi_task_info))) {
if (ret != OB_HASH_NOT_EXIST) {
LOG_WARN("failed to get prepare gi task", K(ret), K(MY_SPEC.get_id()));
} else {
LOG_DEBUG("no prepared task info, set table modify to end",
K(MY_SPEC.get_id()), K(this), K(lbt()));
// The DML operator cannot obtain a task from GI, which means it has reached iter end.
iter_end_ = true;
ret = OB_SUCCESS;
}
// The table location is looked up (and cached in the input) only when it is not set yet;
// the assignment happens inside the second OB_ISNULL().
} else if (OB_ISNULL(get_input()->table_loc_) &&
OB_ISNULL(get_input()->table_loc_ = das_ctx.get_table_loc_by_id(table_loc_id, ref_table_id))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get table location by table id failed", K(ret), K(table_loc_id), K(ref_table_id));
} else {
get_input()->tablet_loc_ = gi_task_info.tablet_loc_;
LOG_DEBUG("DML operator consume a task", K(ret),
K(gi_task_info), KPC(get_input()), K(MY_SPEC.id_), K(get_input()));
}
return ret;
}
/**
 * Resolve the table/tablet location for the non-distributed DAS case.
 * Does nothing when the spec uses distributed DAS. With a GI operator above, the location
 * comes from the GI task; otherwise the table location is looked up by id in the DAS
 * context and its first tablet location is used.
 *
 * @return OB_SUCCESS; OB_ERR_UNEXPECTED when the table location is missing; otherwise the
 *         error of the failing helper
 */
int ObTableModifyOp::calc_single_table_loc()
{
  int ret = OB_SUCCESS;
  ObTableID table_loc_id = OB_INVALID_ID;
  ObTableID ref_table_id = OB_INVALID_ID;
  ObDASCtx &das_ctx = ctx_.get_das_ctx();
  ObDASTableLoc *table_loc = nullptr;
  if (MY_SPEC.use_dist_das_) {
    // locations are not resolved here for distributed DAS.
  } else if (OB_UNLIKELY(MY_SPEC.gi_above_)) {
    if (OB_FAIL(get_gi_task())) {
      LOG_WARN("get gi task failed", K(ret));
    }
  } else if (OB_FAIL(MY_SPEC.get_single_table_loc_id(table_loc_id, ref_table_id))) {
    LOG_WARN("get single table loc id failed", K(ret));
  } else if (OB_ISNULL(table_loc = das_ctx.get_table_loc_by_id(table_loc_id, ref_table_id))) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("get table location by table id failed", K(ret),
             K(table_loc_id), K(ref_table_id), K(das_ctx.get_table_loc_list()));
  } else {
    get_input()->table_loc_ = table_loc;
    get_input()->tablet_loc_ = table_loc->get_first_tablet_loc();
  }
  return ret;
}
/**
 * Make an inner SQL connection available to this operator.
 * If the session already carries an inner connection it is reused; otherwise one is
 * acquired from the SQL proxy's inner connection pool, stored in the session and this
 * operator becomes responsible for closing it (need_close_conn_).
 *
 * @return OB_SUCCESS with inner_conn_ and tenant_id_ set;
 *         OB_NOT_INIT when the SQL proxy, session or pool is missing;
 *         OB_ERR_UNEXPECTED when the pool is not an inner pool;
 *         otherwise the error of acquiring the connection
 */
int ObTableModifyOp::open_inner_conn()
{
  int ret = OB_SUCCESS;
  ObInnerSQLConnectionPool *pool = NULL;
  ObSQLSessionInfo *session = NULL;
  ObISQLConnection *conn = NULL;
  if (OB_ISNULL(sql_proxy_ = ctx_.get_sql_proxy())) {
    ret = OB_NOT_INIT;
    LOG_WARN("sql proxy is NULL", K(ret));
  } else if (OB_ISNULL(session = ctx_.get_my_session())) {
    ret = OB_NOT_INIT;
    LOG_WARN("session is NULL", K(ret));
  } else if (NULL != session->get_inner_conn()) {
    // do nothing.
  } else if (OB_ISNULL(pool = static_cast<ObInnerSQLConnectionPool*>(sql_proxy_->get_pool()))) {
    ret = OB_NOT_INIT;
    LOG_WARN("connection pool is NULL", K(ret));
  } else if (INNER_POOL != pool->get_type()) {
    // Without an error code the function would report success while leaving
    // inner_conn_ NULL, and the failure would only surface later in the nested session calls.
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("connection pool type is not inner", K(ret), K(pool->get_type()));
  } else if (OB_FAIL(pool->acquire(session, conn))) {
    LOG_WARN("failed to acquire inner connection", K(ret));
  } else {
    /**
     * session is the only data struct which can pass through multi layer nested sql,
     * so we put inner conn in session to share it within multi layer nested sql.
     */
    session->set_inner_conn(conn);
    need_close_conn_ = true;
  }
  if (OB_SUCC(ret)) {
    inner_conn_ = static_cast<ObInnerSQLConnection *>(session->get_inner_conn());
    tenant_id_ = session->get_effective_tenant_id();
  }
  return ret;
}
/**
 * Release the inner SQL connection if this operator is the one that acquired it, and
 * forget the cached proxy / connection pointers in any case.
 *
 * @return OB_SUCCESS; OB_INVALID_ARGUMENT when the proxy or session is missing while a
 *         close is required; otherwise the error of closing the connection
 */
int ObTableModifyOp::close_inner_conn()
{
/**
* we can call it even if open_inner_conn() failed, because only the one who call
* open_inner_conn() succeed first will do close job by "if (need_close_conn_)".
*/
int ret = OB_SUCCESS;
if (need_close_conn_) {
ObSQLSessionInfo *session = ctx_.get_my_session();
if (OB_ISNULL(sql_proxy_) || OB_ISNULL(session)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("sql_proxy of session is NULL", K(ret), KP(sql_proxy_), KP(session));
} else {
// The session's connection pointer is cleared only if closing succeeded.
OZ(sql_proxy_->close(static_cast<ObInnerSQLConnection *>(session->get_inner_conn()), true));
OX(session->set_inner_conn(NULL));
}
need_close_conn_ = false;
}
sql_proxy_ = NULL;
inner_conn_ = NULL;
return ret;
}
int ObTableModifyOp::begin_nested_session(bool skip_cur_stmt_tables)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(inner_conn_)) {
ret = OB_NOT_INIT;
LOG_WARN("inner connection is NULL", K(ret));
} else if (OB_FAIL(inner_conn_->begin_nested_session(get_saved_session(), saved_conn_,
skip_cur_stmt_tables))) {
LOG_WARN("failed to begin nested session", K(ret));
}
return ret;
}
int ObTableModifyOp::end_nested_session()
{
int ret = OB_SUCCESS;
if (OB_ISNULL(inner_conn_)) {
ret = OB_NOT_INIT;
LOG_WARN("inner connection is NULL", K(ret));
} else if (OB_FAIL(inner_conn_->end_nested_session(get_saved_session(), saved_conn_))) {
LOG_WARN("failed to end nested session", K(ret));
}
return ret;
}
/**
 * Set the "foreign key cascade in progress" flag on the inner connection.
 * Fails (via the OV check) when there is no inner connection.
 */
int ObTableModifyOp::set_foreign_key_cascade(bool is_cascade)
{
int ret = OB_SUCCESS;
OV (OB_NOT_NULL(inner_conn_));
OZ (inner_conn_->set_foreign_key_cascade(is_cascade));
return ret;
}
/**
 * Read the "foreign key cascade in progress" flag from the inner connection into
 * %is_cascade. Fails (via the OV check) when there is no inner connection.
 */
int ObTableModifyOp::get_foreign_key_cascade(bool &is_cascade) const
{
int ret = OB_SUCCESS;
OV (OB_NOT_NULL(inner_conn_));
OZ (inner_conn_->get_foreign_key_cascade(is_cascade));
return ret;
}
/**
 * Set the "foreign key existence check in progress" flag on the inner connection.
 * Fails (via the OV check) when there is no inner connection.
 */
int ObTableModifyOp::set_foreign_key_check_exist(bool is_check_exist)
{
int ret = OB_SUCCESS;
OV (OB_NOT_NULL(inner_conn_));
OZ (inner_conn_->set_foreign_key_check_exist(is_check_exist));
return ret;
}
/**
 * Read the "foreign key existence check in progress" flag from the inner connection into
 * %is_check_exist. Fails (via the OV check) when there is no inner connection.
 */
int ObTableModifyOp::get_foreign_key_check_exist(bool &is_check_exist) const
{
int ret = OB_SUCCESS;
OV (OB_NOT_NULL(inner_conn_));
OZ (inner_conn_->get_foreign_key_check_exist(is_check_exist));
return ret;
}
/**
 * Execute a write statement on the inner connection for the tenant recorded when the
 * connection was opened. The affected row count reported by the connection is discarded.
 *
 * @param sql  zero-terminated statement text; must not be NULL
 * @return OB_SUCCESS; OB_NOT_INIT without an inner connection; OB_ERR_UNEXPECTED for a NULL
 *         statement; otherwise the error returned by the connection
 */
int ObTableModifyOp::execute_write(const char *sql)
{
  int ret = OB_SUCCESS;
  if (OB_ISNULL(inner_conn_)) {
    ret = OB_NOT_INIT;
    LOG_WARN("inner connection is NULL", K(ret));
  } else if (OB_ISNULL(sql)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("sql is NULL");
  } else {
    // The connection API needs an out parameter for the row count; it is not used here.
    int64_t affected_rows = 0;
    if (OB_FAIL(inner_conn_->execute_write(tenant_id_, sql, affected_rows))) {
      LOG_WARN("failed to execute sql", K(ret), K(tenant_id_), K(sql));
    }
  }
  return ret;
}
/**
 * Execute a read statement on the inner connection for the tenant recorded when the
 * connection was opened, leaving the result set in %res.
 *
 * @param sql  zero-terminated statement text; must not be NULL
 * @param res  [out] receives the result of the query
 * @return OB_SUCCESS; OB_NOT_INIT without an inner connection or SQL proxy;
 *         OB_ERR_UNEXPECTED for a NULL statement; otherwise the connection's error
 */
int ObTableModifyOp::execute_read(const char *sql, ObMySQLProxy::MySQLResult &res)
{
  int ret = OB_SUCCESS;
  if (OB_ISNULL(inner_conn_) || OB_ISNULL(sql_proxy_)) {
    ret = OB_NOT_INIT;
    LOG_WARN("inner connection or sql proxy is NULL", K(ret), KP(inner_conn_), KP(sql_proxy_));
  } else if (OB_ISNULL(sql)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("sql is NULL");
  } else {
    if (OB_FAIL(inner_conn_->execute_read(tenant_id_, sql, res))) {
      LOG_WARN("failed to execute sql", K(ret), K(tenant_id_), K(sql));
    }
  }
  return ret;
}
// Guards against runaway nesting before further recursive work is started.
// Two limits are enforced: the session's nested count must not exceed max_stack_deep,
// and check_stack_overflow() must not report an overflow.
// @return OB_ERR_NULL_VALUE when the session is NULL, OB_SIZE_OVERFLOW when either limit
//         is exceeded, the error of check_stack_overflow() if that call fails,
//         OB_SUCCESS otherwise.
int ObTableModifyOp::check_stack()
{
  int ret = OB_SUCCESS;
  const int max_stack_deep = 16;
  ObSQLSessionInfo *session = ctx_.get_my_session();
  if (OB_ISNULL(session)) {
    ret = OB_ERR_NULL_VALUE;
    LOG_WARN("sql session is NULL", K(ret));
  } else {
    if (session->get_nested_count() > max_stack_deep) {
      // Logical nesting limit reached.
      ret = OB_SIZE_OVERFLOW;
      LOG_WARN("too deep recursive", K(ret), K(max_stack_deep), K(session->get_nested_count()));
    } else {
      // Nesting depth is acceptable; now consult check_stack_overflow().
      bool is_stack_overflow = false;
      if (OB_FAIL(check_stack_overflow(is_stack_overflow))) {
        LOG_WARN("fail to check stack overflow", K(ret), K(is_stack_overflow));
      } else if (is_stack_overflow) {
        ret = OB_SIZE_OVERFLOW;
        LOG_WARN("too deep recursive", K(ret), K(is_stack_overflow));
      }
    }
  }
  return ret;
}
// Returns the OperatorOpenOrder for this operator.
// Both the PX and the non-PX path currently yield OPEN_CHILDREN_FIRST; the PX branch is
// kept separate to carry the explanation below.
OperatorOpenOrder ObTableModifyOp::get_operator_open_order() const
{
  if (spec_.plan_->is_use_px()) {
    // When PX is enabled for a DML statement there are two cases:
    // 1. PDML: the pdml-op does not need its input to provide any runtime parameters,
    //    so OPEN_CHILDREN_FIRST is returned directly.
    // 2. DML+PX: the runtime parameters of the DML are pushed in entirely by the GI
    //    operator above it, for example in a plan like:
    //      PX COORD
    //        PX TRANSMIT
    //          GI (FULL PARTITION WISE)
    //            DELETE
    //              TSC
    //    In such a plan the GI drives the delete: every time the GI iterates to a new
    //    task it rescans the delete operator and hands that task's information to it.
    return OPEN_CHILDREN_FIRST;
  }
  return OPEN_CHILDREN_FIRST;
}
// Emits the user-facing error that corresponds to ret for the column at col_idx.
// @param ret          error code produced while processing the row.
// @param col_idx      index into column_infos of the offending column.
// @param row_num      row number forwarded to the user error message.
// @param column_infos column descriptors; column_name_ of the entry at col_idx is reported.
// Codes that are not handled here, or that need a column name while col_idx is outside
// column_infos, only produce a warning log.
// NOTE(review): ctx_.get_my_session() is dereferenced without a NULL check and the return
// value of copy_and_convert_string_charset() is ignored -- confirm both are acceptable.
void ObTableModifyOp::log_user_error_inner(int ret, int64_t col_idx, int64_t row_num,
                                           const ObIArray<ColumnContent> &column_infos) const
{
  const bool col_in_range = (col_idx >= 0 && col_idx < column_infos.count());
  const bool out_of_range_err = col_in_range && (OB_DATA_OUT_OF_RANGE == ret);
  const bool too_long_err = col_in_range && (OB_ERR_DATA_TOO_LONG == ret);
  const bool bad_date_err = col_in_range
      && (OB_INVALID_DATE_VALUE == ret || OB_INVALID_DATE_FORMAT == ret);

  if (out_of_range_err || too_long_err || bad_date_err) {
    // All three error kinds report the column name converted from CS_TYPE_UTF8MB4_BIN to
    // the session's connection collation.
    ObString column_name = column_infos.at(col_idx).column_name_;
    ObSQLUtils::copy_and_convert_string_charset(ctx_.get_allocator(), column_name, column_name,
        CS_TYPE_UTF8MB4_BIN, ctx_.get_my_session()->get_local_collation_connection());
    if (out_of_range_err) {
      LOG_USER_ERROR(OB_DATA_OUT_OF_RANGE, column_name.length(), column_name.ptr(), row_num);
    } else if (bad_date_err) {
      LOG_USER_ERROR(OB_ERR_INVALID_DATE_MSG_FMT_V2, column_name.length(),
                     column_name.ptr(), row_num);
    } else if (lib::is_mysql_mode()) {
      // OB_ERR_DATA_TOO_LONG: Oracle and MySQL use different messages. The Oracle message
      // has already been emitted in ObExprColumnConv, so only MySQL mode reports here.
      LOG_USER_ERROR(OB_ERR_DATA_TOO_LONG, column_name.length(), column_name.ptr(), row_num);
    }
  } else if (OB_ERR_VALUE_LARGER_THAN_ALLOWED == ret && col_idx >= 0) {
    LOG_USER_ERROR(OB_ERR_VALUE_LARGER_THAN_ALLOWED);
  } else {
    LOG_WARN("fail to operate row", K(ret));
  }
}
// Emits the user-facing error that corresponds to ret for a single, already selected column.
// @param ret         error code produced while processing the row.
// @param row_num     row number forwarded to the user error message.
// @param column_info descriptor of the offending column; its column_name_ is reported.
// Codes that are not handled here only produce a warning log.
// NOTE(review): ctx_.get_my_session() is dereferenced without a NULL check and the return
// value of copy_and_convert_string_charset() is ignored -- confirm both are acceptable.
void ObTableModifyOp::log_user_error_inner(int ret, int64_t row_num, const ColumnContent &column_info) const
{
  const bool out_of_range_err = (OB_DATA_OUT_OF_RANGE == ret);
  const bool too_long_err = (OB_ERR_DATA_TOO_LONG == ret);
  const bool bad_date_err = (OB_INVALID_DATE_VALUE == ret || OB_INVALID_DATE_FORMAT == ret);

  if (out_of_range_err || too_long_err || bad_date_err) {
    // All three error kinds report the column name converted from CS_TYPE_UTF8MB4_BIN to
    // the session's connection collation.
    ObString column_name = column_info.column_name_;
    ObSQLUtils::copy_and_convert_string_charset(ctx_.get_allocator(), column_name, column_name,
        CS_TYPE_UTF8MB4_BIN, ctx_.get_my_session()->get_local_collation_connection());
    if (out_of_range_err) {
      LOG_USER_ERROR(OB_DATA_OUT_OF_RANGE, column_name.length(), column_name.ptr(), row_num);
    } else if (bad_date_err) {
      LOG_USER_ERROR(OB_ERR_INVALID_DATE_MSG_FMT_V2, column_name.length(),
                     column_name.ptr(), row_num);
    } else if (lib::is_mysql_mode()) {
      // OB_ERR_DATA_TOO_LONG: Oracle and MySQL use different messages. The Oracle message
      // has already been emitted in ObExprColumnConv, so only MySQL mode reports here.
      LOG_USER_ERROR(OB_ERR_DATA_TOO_LONG, column_name.length(), column_name.ptr(), row_num);
    }
  } else if (OB_ERR_VALUE_LARGER_THAN_ALLOWED == ret) {
    LOG_USER_ERROR(OB_ERR_VALUE_LARGER_THAN_ALLOWED);
  } else {
    LOG_WARN("fail to operate row", K(ret));
  }
}
// Flushes every pending DAS task held by dml_rtctx_.das_ref_.
// Steps, each attempted only if the previous one succeeded:
//   1. optionally move the delete task to the front,
//   2. execute all tasks,
//   3. close all tasks,
//   4. run ObDMLService::handle_after_row_processing() on the modified-row list,
//   5. clear dml_modify_rows_ and reuse dml_rtctx_.
// Does nothing when there is no pending task.
// @return OB_SUCCESS, or the error code of the first failing step.
int ObTableModifyOp::submit_all_dml_task()
{
  int ret = OB_SUCCESS;
  if (dml_rtctx_.das_ref_.has_task()) {
    if (dml_rtctx_.need_pick_del_task_first()) {
      if (OB_FAIL(dml_rtctx_.das_ref_.pick_del_task_to_first())) {
        LOG_WARN("fail to pick delete das task to first", K(ret));
      }
    }
    if (OB_SUCCESS == ret) {
      if (OB_FAIL(dml_rtctx_.das_ref_.execute_all_task())) {
        LOG_WARN("execute all dml das task failed", K(ret));
      } else if (OB_FAIL(dml_rtctx_.das_ref_.close_all_task())) {
        LOG_WARN("close all das task failed", K(ret));
      } else if (OB_FAIL(ObDMLService::handle_after_row_processing(execute_single_row_, &get_dml_modify_row_list()))) {
        LOG_WARN("perform batch foreign key constraints and after row trigger failed", K(ret));
      } else {
        // Everything was flushed; drop the per-batch state.
        dml_modify_rows_.clear();
        dml_rtctx_.reuse();
      }
    }
  }
  return ret;
}
// Rows produced by DML are buffered in the DAS write buffer. Once the memory used by
// das_ref_ reaches das::OB_DAS_MAX_TOTAL_PACKET_SIZE (6M according to the original note),
// or when execute_single_row_ is set, the buffered tasks are submitted so the memory can
// be released.
// @return OB_SUCCESS when no flush was needed, otherwise the result of submit_all_dml_task().
int ObTableModifyOp::discharge_das_write_buffer()
{
  int ret = OB_SUCCESS;
  const bool buffer_full =
      dml_rtctx_.das_ref_.get_das_mem_used() >= das::OB_DAS_MAX_TOTAL_PACKET_SIZE;
  const bool need_flush = buffer_full || execute_single_row_;
  if (need_flush) {
    LOG_INFO("DASWriteBuffer full or need single row execution, now to write storage",
             "buffer memory", dml_rtctx_.das_ref_.get_das_alloc().used(), K(execute_single_row_));
    ret = submit_all_dml_task();
  }
  return ret;
}
// Fetches the next row from child_ after clearing the evaluated flags.
// @return the result of child_->get_next_row(); OB_ITER_END is passed through without a
//         warning, any other failure is logged.
int ObTableModifyOp::get_next_row_from_child()
{
  int ret = OB_SUCCESS;
  clear_evaluated_flag();
  ret = child_->get_next_row();
  if (OB_SUCCESS == ret) {
    LOG_TRACE("child output row", "row", ROWEXPR2STR(eval_ctx_, child_->get_spec().output_));
  } else if (OB_ITER_END != ret) {
    // End of iteration is an expected outcome, so only other failures are reported.
    LOG_WARN("fail to get next row", K(ret));
  }
  return ret;
}
// Main row loop of the DML operator: pulls rows from the child, writes each one into the
// DAS buffer, flushes the buffer when required, and finally runs write_rows_post_proc().
// @return OB_ITER_END once the child is exhausted and all later steps succeeded (also on
//         any call made after that); OB_SUCCESS when the loop stopped early for a
//         returning statement; otherwise the first error encountered or the error
//         returned by write_rows_post_proc().
int ObTableModifyOp::inner_get_next_row()
{
int ret = OB_SUCCESS;
if (iter_end_) {
// A previous call already reached the end of the child; keep reporting end-of-iteration.
LOG_DEBUG("can't get gi task, iter end", K(MY_SPEC.id_), K(iter_end_));
ret = OB_ITER_END;
} else {
// Counts loop iterations that reach the bottom of the loop body; it is only consumed by
// record_err_for_load_data() when the loop ends with an error.
int64_t row_count = 0;
while (OB_SUCC(ret)) {
if (OB_FAIL(try_check_status())) {
LOG_WARN("check status failed", K(ret));
} else if (OB_FAIL(get_next_row_from_child())) {
if (OB_ITER_END != ret) {
LOG_WARN("fail to get next row", K(ret));
} else {
// Child exhausted: remember it, turn the code back into success and leave the loop so
// that the remaining buffered rows can be submitted below.
iter_end_ = true;
ret = OB_SUCCESS;
break;
}
} else if (OB_FAIL(write_row_to_das_buffer())) {
LOG_WARN("write row to das failed", K(ret));
} else if (OB_FAIL(discharge_das_write_buffer())) {
LOG_WARN("discharge das write buffer failed", K(ret));
} else if (is_error_logging_ && err_log_rt_def_.first_err_ret_ != OB_SUCCESS) {
// Error-logging mode recorded a failure for this row: update the error-log bookkeeping
// and move on to the next row. The `continue` skips the row_count increment below.
clear_evaluated_flag();
err_log_rt_def_.curr_err_log_record_num_++;
err_log_rt_def_.reset();
continue;
} else if (MY_SPEC.is_returning_) {
// Returning statements stop after one row so that it can be handed to the caller.
break;
}
// Reached on the plain success path and also after any failing branch above; in the
// failure case the loop condition then ends the loop with row_count already incremented.
row_count ++;
}
if (OB_FAIL(ret)) {
record_err_for_load_data(ret, row_count);
}
if (OB_SUCC(ret) && iter_end_ && dml_rtctx_.das_ref_.has_task()) {
//DML operator reach iter end,
//now submit the remaining rows in the DAS Write Buffer to the storage
if (OB_FAIL(submit_all_dml_task())) {
LOG_WARN("failed to submit the remaining dml tasks", K(ret));
}
}
//to post process the DML info after writing all data to the storage or returning one row
// The current ret is passed in and replaced by whatever write_rows_post_proc() returns.
ret = write_rows_post_proc(ret);
if (OB_SUCC(ret) && iter_end_) {
// Everything succeeded and the child is drained: signal end-of-iteration to the caller.
ret = OB_ITER_END;
}
}
return ret;
}
} // namespace sql
} // namespace oceanbase