[FEAT MERGE]:Merge foreign key feature to master

Co-authored-by: YangEfei <yangyifei96@outlook.com>
This commit is contained in:
obdev
2023-08-30 13:10:42 +00:00
committed by ob-robot
parent b4427e1a69
commit 044fadf593
59 changed files with 2587 additions and 223 deletions

View File

@ -26,6 +26,7 @@
#include "observer/ob_inner_sql_connection_pool.h"
#include "lib/worker.h"
#include "share/ob_debug_sync.h"
#include "sql/engine/dml/ob_fk_checker.h"
namespace oceanbase
{
@ -52,6 +53,7 @@ int ForeignKeyHandle::do_handle(ObTableModifyOp &op,
}
for (int i = 0; OB_SUCC(ret) && i < dml_ctdef.fk_args_.count(); i++) {
const ObForeignKeyArg &fk_arg = dml_ctdef.fk_args_.at(i);
ObForeignKeyChecker *fk_checker = dml_rtdef.fk_checker_array_.at(i);
if (OB_SUCC(ret) && !new_row.empty()) {
if (ACTION_CHECK_EXIST == fk_arg.ref_action_) {
// insert or update.
@ -61,7 +63,7 @@ int ForeignKeyHandle::do_handle(ObTableModifyOp &op,
} else if (is_foreign_key_cascade) {
// nested update can not check parent row.
LOG_DEBUG("skip foreign_key_check_exist in nested session");
} else if (OB_FAIL(check_exist(op, fk_arg, new_row, false))) {
} else if (OB_FAIL(check_exist(op, fk_arg, new_row, fk_checker, false))) {
LOG_WARN("failed to check exist", K(ret), K(fk_arg), K(new_row));
}
}
@ -75,7 +77,7 @@ int ForeignKeyHandle::do_handle(ObTableModifyOp &op,
K(ret), K(fk_arg), K(old_row), K(new_row));
} else if (!has_changed) {
// nothing.
} else if (OB_FAIL(check_exist(op, fk_arg, old_row, true))) {
} else if (OB_FAIL(check_exist(op, fk_arg, old_row, fk_checker, true))) {
LOG_WARN("failed to check exist", K(ret), K(fk_arg), K(old_row));
}
} else if (ACTION_CASCADE == fk_arg.ref_action_) {
@ -111,6 +113,10 @@ int ForeignKeyHandle::do_handle(ObTableModifyOp &op,
}
}
}
} else if (ACTION_SET_NULL == fk_arg.ref_action_) {
if (OB_FAIL(set_null(op, fk_arg, old_row))) {
LOG_WARN("failed to perform set null for foreign key", K(ret));
}
}
} // if (old_row.is_valid())
} // for
@ -149,7 +155,40 @@ int ForeignKeyHandle::value_changed(ObTableModifyOp &op,
return ret;
}
int ForeignKeyHandle::check_exist(ObTableModifyOp &op,
int ForeignKeyHandle::check_exist(ObTableModifyOp &modify_op, const ObForeignKeyArg &fk_arg,
const ObExprPtrIArray &row, ObForeignKeyChecker *fk_checker, bool expect_zero)
{
int ret = OB_SUCCESS;
if (!expect_zero) {
ret = check_exist_scan_task(modify_op, fk_arg, row, fk_checker, expect_zero);
} else {
ret = check_exist_inner_sql(modify_op, fk_arg, row, expect_zero);
}
return ret;
}
int ForeignKeyHandle::check_exist_scan_task(ObTableModifyOp &modify_op, const ObForeignKeyArg &fk_arg,
const ObExprPtrIArray &row, ObForeignKeyChecker *fk_checker, bool expect_zero)
{
int ret = OB_SUCCESS;
bool has_result = false;
DEBUG_SYNC(BEFORE_FOREIGN_KEY_CONSTRAINT_CHECK);
if (OB_ISNULL(fk_checker)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("foreign key checker is nullptr", K(ret));
} else if (OB_FAIL(fk_checker->do_fk_check_single_row(fk_arg.columns_, row, has_result))) {
LOG_WARN("failed to perform foreign key check by das scan tasks", K(ret));
} else {
if (!has_result) {
ret = OB_ERR_NO_REFERENCED_ROW;
} else {
ret = OB_SUCCESS;
}
}
return ret;
}
int ForeignKeyHandle::check_exist_inner_sql(ObTableModifyOp &op,
const ObForeignKeyArg &fk_arg,
const ObExprPtrIArray &row,
bool expect_zero)
@ -382,6 +421,85 @@ int ForeignKeyHandle::cascade(ObTableModifyOp &op,
return ret;
}
int ForeignKeyHandle::set_null(ObTableModifyOp &op,
const ObForeignKeyArg &fk_arg,
const ObExprPtrIArray &old_row)
{
int ret = OB_SUCCESS;
static const char *UPDATE_FMT_MYSQL = "update `%.*s`.`%.*s` set %.*s where %.*s";
static const char *UPDATE_FMT_ORACLE = "update \"%.*s\".\"%.*s\" set %.*s where %.*s";
ObArenaAllocator alloc(ObModIds::OB_MODULE_PAGE_ALLOCATOR,
OB_MALLOC_NORMAL_BLOCK_SIZE,
MTL_ID());
char *stmt_buf = nullptr;
int64_t stmt_len = 0;
char *where_buf = nullptr;
int64_t where_len = 0;
int64_t stmt_pos = 0;
int64_t where_pos = 0;
const ObString &database_name = fk_arg.database_name_;
const ObString &table_name = fk_arg.table_name_;
if (old_row.empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("old row is invalid", K(ret));
} else if (OB_FAIL(gen_where(op.get_eval_ctx(), where_buf, where_len, where_pos,
fk_arg.columns_, old_row, alloc, op.get_obj_print_params()))) {
if (OB_LIKELY(OB_ERR_NULL_VALUE == ret)) {
// skip cascade if any column in foreign key is NULL.
ret = OB_SUCCESS;
stmt_pos = 0;
} else {
LOG_WARN("failed to gen foreign key where", K(ret), K(old_row), K(fk_arg.columns_));
}
} else {
const char *update_fmt = lib::is_mysql_mode() ? UPDATE_FMT_MYSQL : UPDATE_FMT_ORACLE;
char *set_buf = nullptr;
int64_t set_len = 0;
int64_t set_pos = 0;
if (OB_FAIL(gen_column_null_value(op.get_eval_ctx(), set_buf, set_len, set_pos, fk_arg.columns_, alloc, op.get_obj_print_params()))) {
LOG_WARN("failed to gen foreign key set null", K(ret), K(fk_arg.columns_));
} else if (OB_FAIL(databuff_printf(stmt_buf, stmt_len, stmt_pos, alloc, update_fmt,
database_name.length(), database_name.ptr(),
table_name.length(), table_name.ptr(),
static_cast<int>(set_pos), set_buf,
static_cast<int>(where_pos), where_buf))) {
LOG_WARN("failed to print stmt", K(ret), K(table_name), K(set_buf), K(where_buf));
} else {
stmt_buf[stmt_pos++] = 0;
}
}
if (OB_SUCC(ret) && stmt_pos > 0) {
LOG_DEBUG("foreign key cascade set null", "stmt", stmt_buf, K(old_row), K(fk_arg));
if (OB_FAIL(op.begin_nested_session(fk_arg.is_self_ref_))) {
LOG_WARN("failed to begin nested session", K(ret));
} else {
if (OB_FAIL(op.set_foreign_key_cascade(true))) {
LOG_WARN("failed to set foreign key cascade", K(ret));
} else if (OB_FAIL(op.execute_write(stmt_buf))) {
LOG_WARN("failed to execute stmt", K(ret), K(stmt_buf));
}
int reset_ret = op.set_foreign_key_cascade(false);
if (OB_SUCCESS != reset_ret) {
LOG_WARN("failed to reset foreign key cascade", K(reset_ret));
if (OB_SUCCESS == ret) {
ret = reset_ret;
}
}
int end_ret = op.end_nested_session();
if (OB_SUCCESS != end_ret) {
LOG_WARN("failed to end nested session", K(end_ret));
if (OB_SUCCESS == ret) {
ret = end_ret;
}
}
}
}
return ret;
}
int ForeignKeyHandle::gen_set(ObEvalCtx &eval_ctx, char *&buf, int64_t &len, int64_t &pos,
const ObIArray<ObForeignKeyColumn> &columns,
const ObExprPtrIArray &row,
@ -448,6 +566,45 @@ int ForeignKeyHandle::gen_column_value(ObEvalCtx &eval_ctx, char *&buf, int64_t
return ret;
}
int ForeignKeyHandle::gen_column_null_value(ObEvalCtx &ctx, char *&buf, int64_t &len, int64_t &pos,
const common::ObIArray<ObForeignKeyColumn> &columns,
common::ObIAllocator &alloc,
const common::ObObjPrintParams &print_params)
{
int ret = OB_SUCCESS;
static const char *COLUMN_FMT_MYSQL = "`%.*s` = ";
static const char *COLUMN_FMT_ORACLE = "\"%.*s\" = ";
const char *column_fmt = lib::is_mysql_mode() ? COLUMN_FMT_MYSQL : COLUMN_FMT_ORACLE;
const char *delimiter = ", ";
if (OB_ISNULL(delimiter) || OB_ISNULL(print_params.tz_info_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("argument is invalid", K(ret), KP(buf), KP(delimiter), KP(print_params.tz_info_));
} else if (columns.count() <= 0) {
ret= OB_ERR_UNEXPECTED;
LOG_ERROR("columns count of fk is zero or less than zero", K(ret), K(columns.count()), K(columns));
}
ObObj col_obj;
col_obj.set_null();
for (int64_t i = 0; OB_SUCC(ret) && i < columns.count(); i++) {
if (OB_SUCC(ret)) {
const ObString &col_name = columns.at(i).name_;
if (OB_FAIL(databuff_printf(buf, len, pos, alloc, column_fmt,
col_name.length(), col_name.ptr()))) {
LOG_WARN("failed to print column name", K(ret), K(col_name));
} else if (OB_FAIL(col_obj.print_sql_literal(buf, len, pos, alloc, print_params))) {
LOG_WARN("failed to print column value", K(ret), K(col_obj));
} else if (OB_FAIL(databuff_printf(buf, len, pos, alloc, "%s", delimiter))) {
LOG_WARN("failed to print delimiter", K(ret), K(delimiter));
}
}
}
if (OB_SUCC(ret)) {
pos -= STRLEN(delimiter);
buf[pos++] = 0;
}
return ret;
}
int ForeignKeyHandle::is_self_ref_row(ObEvalCtx &eval_ctx,
const ObExprPtrIArray &row,
const ObForeignKeyArg &fk_arg,
@ -925,8 +1082,12 @@ int ObTableModifyOp::set_foreign_key_cascade(bool is_cascade)
int ObTableModifyOp::get_foreign_key_cascade(bool &is_cascade) const
{
int ret = OB_SUCCESS;
OV (OB_NOT_NULL(inner_conn_));
OZ (inner_conn_->get_foreign_key_cascade(is_cascade));
if (OB_NOT_NULL(inner_conn_)) {
OZ (inner_conn_->get_foreign_key_cascade(is_cascade));
} else {
//if inner connection is null, it means not need use inner sql, so set is_cascade false
is_cascade = false;
}
return ret;
}
@ -1028,7 +1189,7 @@ int ObTableModifyOp::submit_all_dml_task()
LOG_WARN("execute all dml das task failed", K(ret));
} else if (OB_FAIL(dml_rtctx_.das_ref_.close_all_task())) {
LOG_WARN("close all das task failed", K(ret));
} else if (OB_FAIL(ObDMLService::handle_after_row_processing(execute_single_row_, &get_dml_modify_row_list()))) {
} else if (OB_FAIL(ObDMLService::handle_after_row_processing(this, &get_dml_modify_row_list()))) {
LOG_WARN("perform batch foreign key constraints and after row trigger failed", K(ret));
} else {
dml_modify_rows_.clear();
@ -1129,5 +1290,23 @@ int ObTableModifyOp::inner_get_next_row()
}
return ret;
}
int ObTableModifyOp::perform_batch_fk_check()
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < fk_checkers_.count(); ++i) {
bool all_has_result = false;
ObForeignKeyChecker *fk_checker = fk_checkers_.at(i);
if (OB_ISNULL(fk_checker)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("foreign key checker is null", K(ret), K(i));
} else if (OB_FAIL(fk_checker->do_fk_check_batch(all_has_result))) {
LOG_WARN("failed to perform batch foreign key check", K(ret));
} else if (!all_has_result) {
ret = OB_ERR_NO_REFERENCED_ROW;
}
}
return ret;
}
} // namespace sql
} // namespace oceanbase