diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h index f57850324a..aea8db6ba3 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h @@ -1072,7 +1072,7 @@ PCODE_DEF(OB_DIRECT_LOAD_CONTROL, 0x1604) //160Cfor transfer partition //PCODE_DEF(OB_TRABSFER_PARTITION, 0x160C) -//PCODE_DEF(OB_CREATE_TRIGGER_WITH_RES, 0x160D) +PCODE_DEF(OB_CREATE_TRIGGER_WITH_RES, 0x160D) // Shared storage network throt // PCODE_DEF(OB_SHARE_STORAGE_NET_THROT_REGISTER, 0x160E) diff --git a/src/observer/ob_srv_xlator_rootserver.cpp b/src/observer/ob_srv_xlator_rootserver.cpp index ce427bd077..557873ab05 100644 --- a/src/observer/ob_srv_xlator_rootserver.cpp +++ b/src/observer/ob_srv_xlator_rootserver.cpp @@ -128,6 +128,7 @@ void oceanbase::observer::init_srv_xlator_for_rootserver(ObSrvRpcXlator *xlator) RPC_PROCESSOR(rootserver::ObRpcAlterPackageP, *gctx_.root_service_); RPC_PROCESSOR(rootserver::ObRpcDropPackageP, *gctx_.root_service_); RPC_PROCESSOR(rootserver::ObRpcCreateTriggerP, *gctx_.root_service_); + RPC_PROCESSOR(rootserver::ObRpcCreateTriggerWithResP, *gctx_.root_service_); RPC_PROCESSOR(rootserver::ObRpcAlterTriggerP, *gctx_.root_service_); RPC_PROCESSOR(rootserver::ObRpcDropTriggerP, *gctx_.root_service_); RPC_PROCESSOR(rootserver::ObRpcCreateSynonymP, *gctx_.root_service_); diff --git a/src/rootserver/ob_ddl_operator.cpp b/src/rootserver/ob_ddl_operator.cpp index abee6b1920..b000d4837b 100644 --- a/src/rootserver/ob_ddl_operator.cpp +++ b/src/rootserver/ob_ddl_operator.cpp @@ -8647,12 +8647,13 @@ int ObDDLOperator::create_trigger(ObTriggerInfo &trigger_info, ObMySQLTransaction &trans, ObErrorInfo &error_info, ObIArray &dep_infos, + int64_t &table_schema_version, const ObString *ddl_stmt_str, bool is_update_table_schema_version, bool is_for_truncate_table) { int ret = OB_SUCCESS; - + table_schema_version = OB_INVALID_VERSION; ObSchemaService *schema_service = schema_service_.get_schema_service(); const uint64_t tenant_id = trigger_info.get_tenant_id(); int64_t new_schema_version = OB_INVALID_VERSION; @@ -8674,8 +8675,10 @@ int ObDDLOperator::create_trigger(ObTriggerInfo &trigger_info, trigger_info.get_trigger_name(), is_replace); if (!trigger_info.is_system_type() && is_update_table_schema_version) { uint64_t base_table_id = trigger_info.get_base_object_id(); + OZ (schema_service_.gen_new_schema_version(tenant_id, new_schema_version)); + OX (table_schema_version = new_schema_version); OZ (schema_service->get_table_sql_service().update_data_table_schema_version( - trans, tenant_id, base_table_id, false/*in offline ddl white list*/), + trans, tenant_id, base_table_id, false/*in offline ddl white list*/, new_schema_version), base_table_id, trigger_info.get_trigger_name()); } if (OB_FAIL(ret)) { diff --git a/src/rootserver/ob_ddl_operator.h b/src/rootserver/ob_ddl_operator.h index 3c30348ccb..b80026244c 100644 --- a/src/rootserver/ob_ddl_operator.h +++ b/src/rootserver/ob_ddl_operator.h @@ -799,6 +799,7 @@ public: common::ObMySQLTransaction &trans, share::schema::ObErrorInfo &error_info, ObIArray &dep_infos, + int64_t &table_schema_version, const common::ObString *ddl_stmt_str/*=NULL*/, bool is_update_table_schema_version = true, bool is_for_truncate_table = false); diff --git a/src/rootserver/ob_ddl_service.cpp b/src/rootserver/ob_ddl_service.cpp index 6c6da1f9e1..aeefee991d 100755 --- a/src/rootserver/ob_ddl_service.cpp +++ b/src/rootserver/ob_ddl_service.cpp @@ -16052,8 +16052,9 @@ int ObDDLService::rebuild_triggers_on_hidden_table( } if (OB_SUCC(ret)) { ObSEArray dep_infos; + int64_t table_schema_version = OB_INVALID_VERSION; OZ (ddl_operator.create_trigger(new_trigger_info, trans, error_info, dep_infos, - nullptr, false/*is_update_table_schema_version*/)); + table_schema_version, nullptr, false/*is_update_table_schema_version*/)); } } } @@ -30701,10 +30702,11 @@ int ObDDLService::drop_package(const ObPackageInfo &package_info, return ret; } -int ObDDLService::create_trigger(const ObCreateTriggerArg &arg) +int ObDDLService::create_trigger(const ObCreateTriggerArg &arg, + ObSchemaGetterGuard &schema_guard, + ObCreateTriggerRes *res) { int ret = OB_SUCCESS; - ObSchemaGetterGuard schema_guard; ObTriggerInfo new_trigger_info; //in_second_stage_ is false, Indicates that the trigger is created normally //true Indicates that the error message is inserted into the system table after the trigger is created @@ -30719,8 +30721,6 @@ int ObDDLService::create_trigger(const ObCreateTriggerArg &arg) if (OB_FAIL(new_trigger_info.assign(arg.trigger_info_))) { LOG_WARN("assign trigger_info failed", K(ret)); } else if (FALSE_IT(tenant_id = new_trigger_info.get_tenant_id())) { - } else if (OB_FAIL(get_tenant_schema_guard_with_version_in_inner_table(tenant_id, schema_guard))) { - LOG_WARN("o get schema guard in inner table failed", KR(ret), K(tenant_id)); } else { const ObTriggerInfo *old_trigger_info = NULL; if (!arg.is_valid()) { @@ -30744,14 +30744,24 @@ int ObDDLService::create_trigger(const ObCreateTriggerArg &arg) } } } - if (OB_SUCC(ret) && - OB_FAIL(create_trigger_in_trans(new_trigger_info, - const_cast(arg.error_info_), - const_cast &>(arg.dependency_infos_), - &arg.ddl_stmt_str_, - arg.in_second_stage_, - schema_guard))) { - LOG_WARN("create trigger in trans failed", K(ret)); + if (OB_SUCC(ret)) { + bool with_res = (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_2_1_2); + int64_t table_schema_version = OB_INVALID_VERSION; + if (with_res && OB_ISNULL(res)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("res is NULL", K(ret)); + } else if (OB_FAIL(create_trigger_in_trans(new_trigger_info, + const_cast(arg.error_info_), + const_cast &>(arg.dependency_infos_), + &arg.ddl_stmt_str_, + arg.in_second_stage_, + schema_guard, + table_schema_version))) { + LOG_WARN("create trigger in trans failed", K(ret)); + } else if (with_res && !arg.in_second_stage_) { + res->table_schema_version_ = table_schema_version; + res->trigger_schema_version_ = new_trigger_info.get_schema_version(); + } } return ret; } @@ -30761,7 +30771,8 @@ int ObDDLService::create_trigger_in_trans(ObTriggerInfo &trigger_info, ObIArray &dep_infos, const ObString *ddl_stmt_str, bool in_second_stage, - share::schema::ObSchemaGetterGuard &schema_guard) + share::schema::ObSchemaGetterGuard &schema_guard, + int64_t &table_schema_version) { int ret = OB_SUCCESS; const uint64_t tenant_id = trigger_info.get_tenant_id(); @@ -30776,7 +30787,7 @@ int ObDDLService::create_trigger_in_trans(ObTriggerInfo &trigger_info, if (OB_SUCC(ret) && !in_second_stage) { OZ (adjust_trigger_action_order(schema_guard, trans, ddl_operator, trigger_info, true)); } - OZ (ddl_operator.create_trigger(trigger_info, trans, error_info, dep_infos, ddl_stmt_str)); + OZ (ddl_operator.create_trigger(trigger_info, trans, error_info, dep_infos, table_schema_version, ddl_stmt_str)); if (trans.is_started()) { int temp_ret = OB_SUCCESS; if (OB_SUCCESS != (temp_ret = trans.end(OB_SUCC(ret)))) { @@ -31029,7 +31040,9 @@ int ObDDLService::create_trigger_for_truncate_table(ObSchemaGetterGuard &schema_ new_table_schema.get_trigger_list().push_back(new_trigger_id); if (OB_SUCC(ret)) { ObSEArray dep_infos; + int64_t table_schema_version = OB_INVALID_VERSION; if (OB_FAIL(ddl_operator.create_trigger(new_trigger_info, trans, error_info, dep_infos, + table_schema_version, &origin_trigger_info->get_trigger_body(), is_update_table_schema_version, true))) { diff --git a/src/rootserver/ob_ddl_service.h b/src/rootserver/ob_ddl_service.h index 4ad44a75bf..26c188b21b 100644 --- a/src/rootserver/ob_ddl_service.h +++ b/src/rootserver/ob_ddl_service.h @@ -936,7 +936,9 @@ int check_table_udt_id_is_exist(share::schema::ObSchemaGetterGuard &schema_guard //----End of functions for managing package---- //----Functions for managing trigger---- - virtual int create_trigger(const obrpc::ObCreateTriggerArg &arg); + virtual int create_trigger(const obrpc::ObCreateTriggerArg &arg, + ObSchemaGetterGuard &schema_guard, + obrpc::ObCreateTriggerRes *res); virtual int drop_trigger(const obrpc::ObDropTriggerArg &arg); virtual int alter_trigger(const obrpc::ObAlterTriggerArg &arg); //----End of functions for managing trigger---- @@ -1026,7 +1028,8 @@ int check_table_udt_id_is_exist(share::schema::ObSchemaGetterGuard &schema_guard ObIArray &dep_infos, const common::ObString *ddl_stmt_str, bool for_insert_errors, - share::schema::ObSchemaGetterGuard &schema_guard); + share::schema::ObSchemaGetterGuard &schema_guard, + int64_t &table_schema_version); int drop_trigger_in_trans(const share::schema::ObTriggerInfo &trigger_info, const common::ObString *ddl_stmt_str, share::schema::ObSchemaGetterGuard &schema_guard); diff --git a/src/rootserver/ob_root_service.cpp b/src/rootserver/ob_root_service.cpp index e394cbe92b..20e96e16b5 100755 --- a/src/rootserver/ob_root_service.cpp +++ b/src/rootserver/ob_root_service.cpp @@ -6963,10 +6963,33 @@ int ObRootService::drop_package(const obrpc::ObDropPackageArg &arg) int ObRootService::create_trigger(const obrpc::ObCreateTriggerArg &arg) { int ret = OB_SUCCESS; + ObSchemaGetterGuard schema_guard; if (!inited_) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); - } else if (OB_FAIL(ddl_service_.create_trigger(arg))) { + } else if (OB_FAIL(ddl_service_.get_tenant_schema_guard_with_version_in_inner_table( + arg.trigger_info_.get_tenant_id(), schema_guard))) { + LOG_WARN("get schema guard with version in inner table failed", K(ret)); + } else if (OB_FAIL(ddl_service_.create_trigger(arg, schema_guard, NULL))) { + LOG_WARN("failed to create trigger", K(ret)); + } + return ret; +} + +int ObRootService::create_trigger_with_res(const obrpc::ObCreateTriggerArg &arg, + obrpc::ObCreateTriggerRes &res) +{ + int ret = OB_SUCCESS; + ObSchemaGetterGuard schema_guard; + if (!inited_) { + ret = OB_NOT_INIT; + LOG_WARN("not init", K(ret)); + } else if (OB_FAIL(ddl_service_.get_tenant_schema_guard_with_version_in_inner_table( + arg.trigger_info_.get_tenant_id(), schema_guard))) { + LOG_WARN("get schema guard with version in inner table failed", K(ret)); + } else if (OB_FAIL(check_parallel_ddl_conflict(schema_guard, arg))) { + LOG_WARN("check parallel ddl conflict failed", K(ret)); + } else if (OB_FAIL(ddl_service_.create_trigger(arg, schema_guard, &res))) { LOG_WARN("failed to create trigger", K(ret)); } return ret; diff --git a/src/rootserver/ob_root_service.h b/src/rootserver/ob_root_service.h index cbaa33051b..59fbdddda0 100644 --- a/src/rootserver/ob_root_service.h +++ b/src/rootserver/ob_root_service.h @@ -607,6 +607,8 @@ public: //----Functions for managing trigger---- int create_trigger(const obrpc::ObCreateTriggerArg &arg); + int create_trigger_with_res(const obrpc::ObCreateTriggerArg &arg, + obrpc::ObCreateTriggerRes &res); int alter_trigger(const obrpc::ObAlterTriggerArg &arg); int drop_trigger(const obrpc::ObDropTriggerArg &arg); //----End of functions for managing trigger---- diff --git a/src/rootserver/ob_rs_rpc_processor.h b/src/rootserver/ob_rs_rpc_processor.h index bce7adfc3b..c2d3e09ade 100644 --- a/src/rootserver/ob_rs_rpc_processor.h +++ b/src/rootserver/ob_rs_rpc_processor.h @@ -438,6 +438,7 @@ DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_ALTER_PACKAGE, ObRpcAlterPackageP, alter_p DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_DROP_PACKAGE, ObRpcDropPackageP, drop_package(arg_)); DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_CREATE_TRIGGER, ObRpcCreateTriggerP, create_trigger(arg_)); +DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_CREATE_TRIGGER_WITH_RES, ObRpcCreateTriggerWithResP, create_trigger_with_res(arg_, result_)); DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_ALTER_TRIGGER, ObRpcAlterTriggerP, alter_trigger(arg_)); DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_DROP_TRIGGER, ObRpcDropTriggerP, drop_trigger(arg_)); diff --git a/src/share/ob_common_rpc_proxy.h b/src/share/ob_common_rpc_proxy.h index fb7ff86958..2030dc10f4 100644 --- a/src/share/ob_common_rpc_proxy.h +++ b/src/share/ob_common_rpc_proxy.h @@ -175,6 +175,7 @@ public: RPC_S(PRD drop_package, obrpc::OB_DROP_PACKAGE, (ObDropPackageArg)); RPC_S(PRD create_trigger, obrpc::OB_CREATE_TRIGGER, (ObCreateTriggerArg)); + RPC_S(PRD create_trigger_with_res, obrpc::OB_CREATE_TRIGGER_WITH_RES, (ObCreateTriggerArg), ObCreateTriggerRes); RPC_S(PRD alter_trigger, obrpc::OB_ALTER_TRIGGER, (ObAlterTriggerArg)); RPC_S(PRD drop_trigger, obrpc::OB_DROP_TRIGGER, (ObDropTriggerArg)); diff --git a/src/share/ob_rpc_struct.cpp b/src/share/ob_rpc_struct.cpp index 7799a8cd6d..39ef2748cd 100755 --- a/src/share/ob_rpc_struct.cpp +++ b/src/share/ob_rpc_struct.cpp @@ -5397,6 +5397,10 @@ OB_SERIALIZE_MEMBER((ObCreateTriggerArg, ObDDLArg), error_info_, dependency_infos_); +OB_SERIALIZE_MEMBER(ObCreateTriggerRes, + table_schema_version_, + trigger_schema_version_); + bool ObDropTriggerArg::is_valid() const { return OB_INVALID_TENANT_ID != tenant_id_ diff --git a/src/share/ob_rpc_struct.h b/src/share/ob_rpc_struct.h index 811d327c60..3598171b69 100755 --- a/src/share/ob_rpc_struct.h +++ b/src/share/ob_rpc_struct.h @@ -6418,6 +6418,25 @@ public: common::ObSArray dependency_infos_; }; +struct ObCreateTriggerRes +{ + OB_UNIS_VERSION(1); + +public: + ObCreateTriggerRes() : + table_schema_version_(OB_INVALID_VERSION), + trigger_schema_version_(OB_INVALID_VERSION) + {} + int assign(const ObCreateTriggerRes &other) { + table_schema_version_ = other.table_schema_version_; + trigger_schema_version_ = other.trigger_schema_version_; + return common::OB_SUCCESS; + } + TO_STRING_KV(K_(table_schema_version), K_(trigger_schema_version)); + int64_t table_schema_version_; + int64_t trigger_schema_version_; +}; + struct ObDropTriggerArg : public ObDDLArg { OB_UNIS_VERSION(1); diff --git a/src/sql/engine/cmd/ob_trigger_executor.cpp b/src/sql/engine/cmd/ob_trigger_executor.cpp index a3d4b81c8a..e1b3853581 100644 --- a/src/sql/engine/cmd/ob_trigger_executor.cpp +++ b/src/sql/engine/cmd/ob_trigger_executor.cpp @@ -38,15 +38,22 @@ int ObCreateTriggerExecutor::execute(ObExecContext &ctx, ObCreateTriggerStmt &st ObCreateTriggerArg &arg = stmt.get_trigger_arg(); uint64_t tenant_id = arg.trigger_info_.get_tenant_id(); ObString first_stmt; + obrpc::ObCreateTriggerRes res; + bool with_res = (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_2_1_2); OZ (stmt.get_first_stmt(first_stmt)); arg.ddl_stmt_str_ = first_stmt; OV (OB_NOT_NULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx)), OB_NOT_INIT); OZ (task_exec_ctx->get_common_rpc(common_rpc_proxy)); OV (OB_NOT_NULL(common_rpc_proxy)); - OZ (common_rpc_proxy->create_trigger(arg), common_rpc_proxy->get_server()); + if (with_res) { + OZ (common_rpc_proxy->create_trigger_with_res(arg, res), common_rpc_proxy->get_server()); + } else { + OZ (common_rpc_proxy->create_trigger(arg), common_rpc_proxy->get_server()); + } //这里需要刷新schema,否则可能获取不到最新的trigger_info OZ (ObSPIService::force_refresh_schema(tenant_id)); CK (OB_NOT_NULL(ctx.get_sql_ctx())); + CK (OB_NOT_NULL(ctx.get_sql_ctx()->schema_guard_)); CK (OB_NOT_NULL(ctx.get_my_session())); CK (OB_NOT_NULL(ctx.get_sql_proxy())); CK (OB_NOT_NULL(ctx.get_task_exec_ctx().schema_service_)); @@ -58,11 +65,26 @@ int ObCreateTriggerExecutor::execute(ObExecContext &ctx, ObCreateTriggerStmt &st ctx.get_sql_proxy(), ctx.get_allocator(), arg)); + OZ (ctx.get_sql_ctx()->schema_guard_->reset()); if (OB_SUCC(ret)) { arg.ddl_stmt_str_.reset(); - OZ (common_rpc_proxy->create_trigger(arg), common_rpc_proxy->get_server()); + if (with_res) { + arg.based_schema_object_infos_.reset(); + OZ (arg.based_schema_object_infos_.push_back(ObBasedSchemaObjectInfo(arg.trigger_info_.get_base_object_id(), + TABLE_SCHEMA, + res.table_schema_version_))); + OZ (arg.based_schema_object_infos_.push_back(ObBasedSchemaObjectInfo(arg.trigger_info_.get_trigger_id(), + TRIGGER_SCHEMA, + res.trigger_schema_version_))); + OZ (common_rpc_proxy->create_trigger_with_res(arg, res), common_rpc_proxy->get_server()); + if (OB_ERR_PARALLEL_DDL_CONFLICT == ret) { + LOG_WARN("trigger or base table maybe changed by other session, ignore the error", K(ret), K(res)); + ret = OB_SUCCESS; + } + } else { + OZ (common_rpc_proxy->create_trigger(arg), common_rpc_proxy->get_server()); + } } - OZ (ctx.get_sql_ctx()->schema_guard_->reset()); return ret; } @@ -108,7 +130,7 @@ int ObCreateTriggerExecutor::analyze_dependencies(ObSchemaGetterGuard &schema_gu { int ret = OB_SUCCESS; uint64_t tenant_id = arg.trigger_info_.get_tenant_id(); - const ObString &trigger_name = arg.trigger_info_.get_trigger_name();\ + const ObString &trigger_name = arg.trigger_info_.get_trigger_name(); const ObString &db_name = arg.trigger_database_; const ObTriggerInfo *trigger_info = NULL; if (OB_FAIL(schema_guard.get_trigger_info(tenant_id, arg.trigger_info_.get_database_id(), diff --git a/src/sql/resolver/ddl/ob_trigger_resolver.cpp b/src/sql/resolver/ddl/ob_trigger_resolver.cpp index c07e7ea6f6..3a0d0ed990 100644 --- a/src/sql/resolver/ddl/ob_trigger_resolver.cpp +++ b/src/sql/resolver/ddl/ob_trigger_resolver.cpp @@ -158,6 +158,18 @@ int ObTriggerResolver::resolve_create_trigger_stmt(const ParseNode &parse_node, OX (trigger_arg.trigger_info_.set_owner_id(session_info_->get_user_id())); OZ (resolve_sp_definer(is_ora ? nullptr : parse_node.children_[0], trigger_arg)); OZ (resolve_trigger_source(*parse_node.children_[is_ora ? 0 : 1], trigger_arg)); + if (OB_SUCC(ret)) { + const ObTableSchema *table_schema = NULL; + CK (OB_NOT_NULL(schema_checker_)); + CK (OB_NOT_NULL(schema_checker_->get_schema_guard())); + OZ (schema_checker_->get_schema_guard()->get_table_schema(trigger_arg.trigger_info_.get_tenant_id(), + trigger_arg.trigger_info_.get_base_object_id(), + table_schema)); + CK (OB_NOT_NULL(table_schema)); + OZ (trigger_arg.based_schema_object_infos_.push_back(ObBasedSchemaObjectInfo(table_schema->get_table_id(), + TABLE_SCHEMA, + table_schema->get_schema_version()))); + } if (OB_SUCC(ret)) { ObErrorInfo &error_info = trigger_arg.error_info_; error_info.collect_error_info(&(trigger_arg.trigger_info_));