diff --git a/deps/oblib/src/lib/mysqlclient/ob_isql_connection.h b/deps/oblib/src/lib/mysqlclient/ob_isql_connection.h index 7c6d7635c9..77833606fd 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_isql_connection.h +++ b/deps/oblib/src/lib/mysqlclient/ob_isql_connection.h @@ -78,6 +78,7 @@ public: dblink_id_(OB_INVALID_ID), dblink_driver_proto_(-1), sessid_(-1), + consumer_group_id_(0), has_reverse_link_credentials_(false), usable_(true) {} @@ -135,6 +136,8 @@ public: virtual int64_t get_cluster_id() const { return common::OB_INVALID_ID; } void set_init_remote_env(bool flag) { is_init_remote_env_ = flag;} bool get_init_remote_env() const { return is_init_remote_env_; } + void set_group_id(const int64_t v) {consumer_group_id_ = v; } + int64_t get_group_id() const {return consumer_group_id_; } void set_reverse_link_creadentials(bool flag) { has_reverse_link_credentials_ = flag; } bool get_reverse_link_creadentials() { return has_reverse_link_credentials_; } void set_usable(bool flag) { usable_ = flag; } @@ -146,6 +149,7 @@ protected: uint64_t dblink_id_; // for dblink, record dblink_id of a connection used by dblink int64_t dblink_driver_proto_; //for dblink, record DblinkDriverProto of a connection used by dblink uint32_t sessid_; + int64_t consumer_group_id_; //for resource isolation bool has_reverse_link_credentials_; // for dblink, mark if this link has credentials set bool usable_; // usable_ = false: connection is unusable, should not execute query again. }; diff --git a/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.cpp b/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.cpp index 1af13f0831..ff20c9fb8c 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.cpp +++ b/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.cpp @@ -91,6 +91,7 @@ int ObCommonSqlProxy::read(ReadResult &result, const uint64_t tenant_id, const c } else if (nullptr != session_param) { conn->set_ddl_info(&session_param->ddl_info_); conn->set_use_external_session(session_param->use_external_session_); + conn->set_group_id(session_param->consumer_group_id_); if (nullptr != session_param->sql_mode_) { if (OB_FAIL(conn->set_session_variable("sql_mode", *session_param->sql_mode_))) { LOG_WARN("set inner connection sql mode failed", K(ret)); @@ -197,6 +198,7 @@ int ObCommonSqlProxy::write(const uint64_t tenant_id, const ObString sql, if (OB_SUCC(ret) && nullptr != param) { conn->set_is_load_data_exec(param->is_load_data_exec_); conn->set_use_external_session(param->use_external_session_); + conn->set_group_id(param->consumer_group_id_); if (param->is_load_data_exec_) { is_user_sql = true; } diff --git a/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.h b/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.h index e6c4f33dd0..99d5c47344 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.h +++ b/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.h @@ -67,7 +67,7 @@ struct ObSessionParam final { public: ObSessionParam() - : sql_mode_(nullptr), tz_info_wrap_(nullptr), ddl_info_(), is_load_data_exec_(false), use_external_session_(false), nls_formats_{} + : sql_mode_(nullptr), tz_info_wrap_(nullptr), ddl_info_(), is_load_data_exec_(false), use_external_session_(false), consumer_group_id_(0), nls_formats_{} {} ~ObSessionParam() = default; public: @@ -76,6 +76,7 @@ public: ObSessionDDLInfo ddl_info_; bool is_load_data_exec_; bool use_external_session_; // need init remote inner sql conn with sess getting from sess mgr + int64_t consumer_group_id_; common::ObString nls_formats_[common::ObNLSFormatEnum::NLS_MAX]; }; diff --git a/src/observer/ob_inner_sql_connection.cpp b/src/observer/ob_inner_sql_connection.cpp index f716f35579..6ff6b4b3ad 100644 --- a/src/observer/ob_inner_sql_connection.cpp +++ b/src/observer/ob_inner_sql_connection.cpp @@ -1837,6 +1837,7 @@ int ObInnerSQLConnection::execute_write_inner(const uint64_t tenant_id, const Ob TimeoutGuard timeout_guard(*this); // backup && restore worker/session timeout int64_t query_timeout = OB_DEFAULT_SESSION_TIMEOUT; int64_t trx_timeout = OB_DEFAULT_SESSION_TIMEOUT; + int64_t consumer_group_id = get_group_id(); ObSQLMode sql_mode = 0; const ObSessionDDLInfo &ddl_info = get_session().get_ddl_info(); bool is_load_data_exec = get_session().is_load_data_exec_session(); @@ -1877,7 +1878,7 @@ int ObInnerSQLConnection::execute_write_inner(const uint64_t tenant_id, const Ob sql, ObInnerSQLTransmitArg::OPERATION_TYPE_EXECUTE_WRITE, lib::Worker::CompatMode::ORACLE == get_compat_mode(), GCONF.cluster_id, THIS_WORKER.get_timeout_ts(), query_timeout, trx_timeout, sql_mode, - ddl_info, is_load_data_exec, use_external_session_); + ddl_info, is_load_data_exec, use_external_session_, consumer_group_id); arg.set_nls_formats(get_session().get_local_nls_date_format(), get_session().get_local_nls_timestamp_format(), get_session().get_local_nls_timestamp_tz_format()); diff --git a/src/observer/ob_inner_sql_rpc_processor.cpp b/src/observer/ob_inner_sql_rpc_processor.cpp index 46cdc98532..b036df5cdb 100644 --- a/src/observer/ob_inner_sql_rpc_processor.cpp +++ b/src/observer/ob_inner_sql_rpc_processor.cpp @@ -534,6 +534,7 @@ int ObInnerSqlRpcP::process() LOG_WARN("failed to acquire inner connection", K(ret), K(transmit_arg)); } /* init session info */ + const int64_t group_id = transmit_arg.get_consumer_group_id(); if (OB_NOT_NULL(tmp_session)) { tmp_session->set_current_trace_id(ObCurTraceId::get_trace_id()); tmp_session->switch_tenant(transmit_arg.get_tenant_id()); @@ -558,6 +559,7 @@ int ObInnerSqlRpcP::process() if (OB_FAIL(inner_conn->set_session_timeout(transmit_arg.get_query_timeout(), transmit_arg.get_trx_timeout()))) { LOG_WARN("failed to set_session_timeout", K(ret), K(transmit_arg)); } else if (FALSE_IT(THIS_WORKER.set_timeout_ts(transmit_arg.get_worker_timeout()))) { + } else if (FALSE_IT(THIS_WORKER.set_group_id(group_id))) { //for ddl data_complement process } else { switch (transmit_arg.get_operation_type()) { case ObInnerSQLTransmitArg::OPERATION_TYPE_START_TRANSACTION: { diff --git a/src/observer/ob_inner_sql_rpc_proxy.cpp b/src/observer/ob_inner_sql_rpc_proxy.cpp index ac8430f047..5b5db95095 100644 --- a/src/observer/ob_inner_sql_rpc_proxy.cpp +++ b/src/observer/ob_inner_sql_rpc_proxy.cpp @@ -39,7 +39,8 @@ OB_DEF_SERIALIZE(obrpc::ObInnerSQLTransmitArg) ddl_info_, is_load_data_exec_, nls_formats_, - use_external_session_); + use_external_session_, + consumer_group_id_); return ret; } @@ -63,7 +64,8 @@ OB_DEF_DESERIALIZE(obrpc::ObInnerSQLTransmitArg) ddl_info_, is_load_data_exec_, nls_formats_, - use_external_session_); + use_external_session_, + consumer_group_id_); if (OB_SUCC(ret)) { (void)sql::ObSQLUtils::adjust_time_by_ntp_offset(worker_timeout_); } @@ -90,7 +92,8 @@ OB_DEF_SERIALIZE_SIZE(obrpc::ObInnerSQLTransmitArg) ddl_info_, is_load_data_exec_, nls_formats_, - use_external_session_); + use_external_session_, + consumer_group_id_); return len; } // diff --git a/src/observer/ob_inner_sql_rpc_proxy.h b/src/observer/ob_inner_sql_rpc_proxy.h index dbb9f5968b..e4b7d9a457 100644 --- a/src/observer/ob_inner_sql_rpc_proxy.h +++ b/src/observer/ob_inner_sql_rpc_proxy.h @@ -57,21 +57,21 @@ public: worker_timeout_(OB_DEFAULT_SESSION_TIMEOUT), query_timeout_(OB_DEFAULT_SESSION_TIMEOUT), trx_timeout_(OB_DEFAULT_SESSION_TIMEOUT), sql_mode_(0), tz_info_wrap_(), ddl_info_(), is_load_data_exec_(false), nls_formats_{}, - use_external_session_(false) {}; + use_external_session_(false), consumer_group_id_(0) {}; ObInnerSQLTransmitArg(common::ObAddr ctrl_svr, common::ObAddr runner_svr, uint64_t tenant_id, uint64_t conn_id, common::ObString inner_sql, InnerSQLOperationType operation_type, bool is_oracle_mode, const int64_t source_cluster_id, const int64_t worker_timeout, const int64_t query_timeout, const int64_t trx_timeout, ObSQLMode sql_mode, ObSessionDDLInfo ddl_info, const bool is_load_data_exec, - const bool use_external_session) + const bool use_external_session, const int64_t consumer_group_id = 0) : ctrl_svr_(ctrl_svr), runner_svr_(runner_svr), tenant_id_(tenant_id), conn_id_(conn_id), inner_sql_(inner_sql), operation_type_(operation_type), is_oracle_mode_(is_oracle_mode), source_cluster_id_(source_cluster_id), worker_timeout_(worker_timeout), query_timeout_(query_timeout), trx_timeout_(trx_timeout), sql_mode_(sql_mode), tz_info_wrap_(), ddl_info_(ddl_info), is_load_data_exec_(is_load_data_exec), nls_formats_{}, - use_external_session_(use_external_session) {} + use_external_session_(use_external_session), consumer_group_id_(consumer_group_id) {} ~ObInnerSQLTransmitArg() {} const common::ObAddr &get_ctrl_svr() const { return ctrl_svr_; } @@ -120,6 +120,12 @@ public: int64_t get_trx_timeout() const { return trx_timeout_; } + void set_consumer_group_id(const int64_t consumer_group_id) { + consumer_group_id_ = consumer_group_id; + } + int64_t get_consumer_group_id() const { + return consumer_group_id_; + } inline int set_tz_info_wrap(const ObTimeZoneInfoWrap &other) { return tz_info_wrap_.deep_copy(other); } void set_nls_formats(const common::ObString &nls_date_format, const common::ObString &nls_timestamp_format, @@ -153,7 +159,8 @@ public: K_(ddl_info), K_(is_load_data_exec), K_(nls_formats), - K_(use_external_session)); + K_(use_external_session), + K_(consumer_group_id)); private: common::ObAddr ctrl_svr_; @@ -173,6 +180,7 @@ private: bool is_load_data_exec_; common::ObString nls_formats_[common::ObNLSFormatEnum::NLS_MAX]; bool use_external_session_; + int64_t consumer_group_id_; }; class ObInnerSQLTransmitResult diff --git a/src/observer/table_load/ob_table_load_redef_table.cpp b/src/observer/table_load/ob_table_load_redef_table.cpp index b0fa0f89cb..39394e606e 100644 --- a/src/observer/table_load/ob_table_load_redef_table.cpp +++ b/src/observer/table_load/ob_table_load_redef_table.cpp @@ -44,6 +44,7 @@ int ObTableLoadRedefTable::start(const ObTableLoadRedefTableStartArg &arg, create_table_arg.nls_formats_[ObNLSFormatEnum::NLS_DATE] = session_info.get_local_nls_date_format(); create_table_arg.nls_formats_[ObNLSFormatEnum::NLS_TIMESTAMP] = session_info.get_local_nls_timestamp_format(); create_table_arg.nls_formats_[ObNLSFormatEnum::NLS_TIMESTAMP_TZ] = session_info.get_local_nls_timestamp_tz_format(); + create_table_arg.consumer_group_id_ = THIS_WORKER.get_group_id(); if (OB_FAIL(create_table_arg.tz_info_wrap_.deep_copy(session_info.get_tz_info_wrap()))) { LOG_WARN("failed to deep copy tz_info_wrap", KR(ret)); } else if (OB_FAIL(ObDDLServerClient::create_hidden_table(create_table_arg, create_table_res, snapshot_version, session_info))) { diff --git a/src/rootserver/ddl_task/ob_column_redefinition_task.cpp b/src/rootserver/ddl_task/ob_column_redefinition_task.cpp index ed3a45466d..4df44ab525 100644 --- a/src/rootserver/ddl_task/ob_column_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_column_redefinition_task.cpp @@ -39,7 +39,7 @@ ObColumnRedefinitionTask::~ObColumnRedefinitionTask() } int ObColumnRedefinitionTask::init(const uint64_t tenant_id, const int64_t task_id, const share::ObDDLType &ddl_type, - const int64_t data_table_id, const int64_t dest_table_id, const int64_t schema_version, const int64_t parallelism, + const int64_t data_table_id, const int64_t dest_table_id, const int64_t schema_version, const int64_t parallelism, const int64_t consumer_group_id, const obrpc::ObAlterTableArg &alter_table_arg, const int64_t task_status, const int64_t snapshot_version) { int ret = OB_SUCCESS; @@ -69,6 +69,7 @@ int ObColumnRedefinitionTask::init(const uint64_t tenant_id, const int64_t task_ task_version_ = OB_COLUMN_REDEFINITION_TASK_VERSION; task_id_ = task_id; parallelism_ = parallelism; + consumer_group_id_ = consumer_group_id; execution_id_ = 1L; start_time_ = ObTimeUtility::current_time(); if (OB_FAIL(init_ddl_task_monitor_info(target_object_id_))) { @@ -187,6 +188,7 @@ int ObColumnRedefinitionTask::send_build_single_replica_request() param.parallelism_ = alter_table_arg_.parallelism_; param.execution_id_ = execution_id_; param.data_format_version_ = data_format_version_; + param.consumer_group_id_ = alter_table_arg_.consumer_group_id_; if (OB_FAIL(ObDDLUtil::get_tablets(tenant_id_, object_id_, param.source_tablet_ids_))) { LOG_WARN("fail to get tablets", K(ret), K(tenant_id_), K(object_id_)); } else if (OB_FAIL(ObDDLUtil::get_tablets(tenant_id_, target_object_id_, param.dest_tablet_ids_))) { @@ -335,6 +337,7 @@ int ObColumnRedefinitionTask::copy_table_indexes() 0/*object_id*/, index_schema->get_schema_version(), parallelism_ / index_ids.count()/*parallelism*/, + consumer_group_id_, &allocator_, &create_index_arg, task_id_); diff --git a/src/rootserver/ddl_task/ob_column_redefinition_task.h b/src/rootserver/ddl_task/ob_column_redefinition_task.h index 5013bf4dcb..fd0c553a1e 100644 --- a/src/rootserver/ddl_task/ob_column_redefinition_task.h +++ b/src/rootserver/ddl_task/ob_column_redefinition_task.h @@ -36,6 +36,7 @@ public: const int64_t dest_table_id, const int64_t schema_version, const int64_t parallelism, + const int64_t consumer_group_id, const obrpc::ObAlterTableArg &alter_table_arg, const int64_t task_status = share::ObDDLTaskStatus::PREPARE, const int64_t snapshot_version = 0); diff --git a/src/rootserver/ddl_task/ob_constraint_task.cpp b/src/rootserver/ddl_task/ob_constraint_task.cpp index ea2fd9ed2b..3ed9d816e2 100644 --- a/src/rootserver/ddl_task/ob_constraint_task.cpp +++ b/src/rootserver/ddl_task/ob_constraint_task.cpp @@ -484,6 +484,7 @@ int ObConstraintTask::init( const ObDDLType type, const int64_t schema_version, const ObAlterTableArg &alter_table_arg, + const int64_t consumer_group_id, const int64_t parent_task_id, const int64_t status, const int64_t snapshot_version) @@ -516,6 +517,7 @@ int ObConstraintTask::init( root_service_ = root_service; task_id_ = task_id; parent_task_id_ = parent_task_id; + consumer_group_id_ = consumer_group_id; task_version_ = OB_CONSTRAINT_TASK_VERSION; is_table_hidden_ = table_schema->is_user_hidden_table(); is_inited_ = true; diff --git a/src/rootserver/ddl_task/ob_constraint_task.h b/src/rootserver/ddl_task/ob_constraint_task.h index 5c36075c67..46a4db80bb 100644 --- a/src/rootserver/ddl_task/ob_constraint_task.h +++ b/src/rootserver/ddl_task/ob_constraint_task.h @@ -97,6 +97,7 @@ public: const share::ObDDLType ddl_type, const int64_t schema_version, const obrpc::ObAlterTableArg &alter_table_arg, + const int64_t consumer_group_id, const int64_t parent_task_id = 0, const int64_t status = share::ObDDLTaskStatus::WAIT_TRANS_END, const int64_t snapshot_version = 0); diff --git a/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp b/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp index f63607997e..c2cc9c7b1a 100644 --- a/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp @@ -42,6 +42,7 @@ ObDDLRedefinitionSSTableBuildTask::ObDDLRedefinitionSSTableBuildTask( const int64_t schema_version, const int64_t snapshot_version, const int64_t execution_id, + const int64_t consumer_group_id, const ObSQLMode &sql_mode, const common::ObCurTraceId::TraceId &trace_id, const int64_t parallelism, @@ -50,7 +51,7 @@ ObDDLRedefinitionSSTableBuildTask::ObDDLRedefinitionSSTableBuildTask( const common::ObAddr &inner_sql_exec_addr) : is_inited_(false), tenant_id_(tenant_id), task_id_(task_id), data_table_id_(data_table_id), dest_table_id_(dest_table_id), schema_version_(schema_version), snapshot_version_(snapshot_version), - execution_id_(execution_id), sql_mode_(sql_mode), trace_id_(trace_id), + execution_id_(execution_id), consumer_group_id_(consumer_group_id), sql_mode_(sql_mode), trace_id_(trace_id), parallelism_(parallelism), use_heap_table_ddl_plan_(use_heap_table_ddl_plan), root_service_(root_service), inner_sql_exec_addr_(inner_sql_exec_addr) { @@ -135,6 +136,7 @@ int ObDDLRedefinitionSSTableBuildTask::process() session_param.ddl_info_.set_dest_table_hidden(true); session_param.ddl_info_.set_heap_table_ddl(use_heap_table_ddl_plan_); session_param.use_external_session_ = true; // means session id dispatched by session mgr + session_param.consumer_group_id_ = consumer_group_id_; common::ObAddr *sql_exec_addr = nullptr; if (inner_sql_exec_addr_.is_valid()) { @@ -188,6 +190,7 @@ ObAsyncTask *ObDDLRedefinitionSSTableBuildTask::deep_copy(char *buf, const int64 schema_version_, snapshot_version_, execution_id_, + consumer_group_id_, sql_mode_, trace_id_, parallelism_, @@ -776,6 +779,7 @@ int ObDDLRedefinitionTask::add_constraint_ddl_task(const int64_t constraint_id) constraint_id, table_schema->get_schema_version(), 0L/*parallelism*/, + consumer_group_id_, &allocator_, &alter_table_arg, task_id_); @@ -903,6 +907,7 @@ int ObDDLRedefinitionTask::add_fk_ddl_task(const int64_t fk_id) fk_id, hidden_table_schema->get_schema_version(), 0L/*parallelism*/, + consumer_group_id_, &allocator_, &alter_table_arg, task_id_); diff --git a/src/rootserver/ddl_task/ob_ddl_redefinition_task.h b/src/rootserver/ddl_task/ob_ddl_redefinition_task.h index a007081427..38d64604bc 100644 --- a/src/rootserver/ddl_task/ob_ddl_redefinition_task.h +++ b/src/rootserver/ddl_task/ob_ddl_redefinition_task.h @@ -32,6 +32,7 @@ public: const int64_t schema_version, const int64_t snapshot_version, const int64_t execution_id, + const int64_t consumer_group_id, const ObSQLMode &sql_mode, const common::ObCurTraceId::TraceId &trace_id, const int64_t parallelism, @@ -56,6 +57,7 @@ private: int64_t schema_version_; int64_t snapshot_version_; int64_t execution_id_; + int64_t consumer_group_id_; ObSQLMode sql_mode_; ObTimeZoneInfoWrap tz_info_wrap_; share::ObColumnNameMap col_name_map_; diff --git a/src/rootserver/ddl_task/ob_ddl_retry_task.cpp b/src/rootserver/ddl_task/ob_ddl_retry_task.cpp index f400e7a0b2..5e55fcfe44 100644 --- a/src/rootserver/ddl_task/ob_ddl_retry_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_retry_task.cpp @@ -155,6 +155,7 @@ int ObDDLRetryTask::init(const uint64_t tenant_id, const int64_t task_id, const uint64_t object_id, const int64_t schema_version, + const int64_t consumer_group_id, const share::ObDDLType &ddl_type, const obrpc::ObDDLArg *ddl_arg, const int64_t task_status) @@ -181,6 +182,7 @@ int ObDDLRetryTask::init(const uint64_t tenant_id, object_id_ = object_id; target_object_id_ = object_id; schema_version_ = schema_version; + consumer_group_id_ = consumer_group_id; tenant_id_ = tenant_id; task_id_ = task_id; task_type_ = ddl_type; diff --git a/src/rootserver/ddl_task/ob_ddl_retry_task.h b/src/rootserver/ddl_task/ob_ddl_retry_task.h index c7f8594778..b24ab0fe64 100644 --- a/src/rootserver/ddl_task/ob_ddl_retry_task.h +++ b/src/rootserver/ddl_task/ob_ddl_retry_task.h @@ -32,6 +32,7 @@ public: const int64_t task_id, const uint64_t object_id, const int64_t schema_version, + const int64_t consumer_group_id, const share::ObDDLType &type, const obrpc::ObDDLArg *ddl_arg, const int64_t task_status = share::ObDDLTaskStatus::PREPARE); diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp index 87cc6e0df8..3aa74b47d2 100644 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp @@ -671,6 +671,7 @@ int ObDDLScheduler::create_ddl_task(const ObCreateDDLTaskParam ¶m, param.dest_table_schema_, param.parallelism_, param.parent_task_id_, + param.consumer_group_id_, create_index_arg, *param.allocator_, task_record))) { @@ -683,6 +684,7 @@ int ObDDLScheduler::create_ddl_task(const ObCreateDDLTaskParam ¶m, if (OB_FAIL(create_drop_index_task(proxy, param.src_table_schema_, param.parent_task_id_, + param.consumer_group_id_, drop_index_arg, *param.allocator_, task_record))) { @@ -702,6 +704,7 @@ int ObDDLScheduler::create_ddl_task(const ObCreateDDLTaskParam ¶m, param.src_table_schema_, param.dest_table_schema_, param.parallelism_, + param.consumer_group_id_, static_cast(param.ddl_arg_), *param.allocator_, task_record))) { @@ -715,6 +718,7 @@ int ObDDLScheduler::create_ddl_task(const ObCreateDDLTaskParam ¶m, param.src_table_schema_, param.dest_table_schema_, param.parallelism_, + param.consumer_group_id_, alter_table_arg, *param.allocator_, task_record))) { @@ -731,6 +735,7 @@ int ObDDLScheduler::create_ddl_task(const ObCreateDDLTaskParam ¶m, param.schema_version_, static_cast(param.ddl_arg_), param.parent_task_id_, + param.consumer_group_id_, *param.allocator_, task_record))) { LOG_WARN("fail to create constraint task failed", K(ret)); @@ -744,6 +749,7 @@ int ObDDLScheduler::create_ddl_task(const ObCreateDDLTaskParam ¶m, param.src_table_schema_, param.dest_table_schema_, param.parallelism_, + param.consumer_group_id_, static_cast(param.ddl_arg_), *param.allocator_, task_record))) { @@ -755,6 +761,7 @@ int ObDDLScheduler::create_ddl_task(const ObCreateDDLTaskParam ¶m, param.tenant_id_, param.src_table_schema_->get_table_id(), param.schema_version_, + param.consumer_group_id_, static_cast(param.ddl_arg_), *param.allocator_, task_record))) { @@ -772,6 +779,7 @@ int ObDDLScheduler::create_ddl_task(const ObCreateDDLTaskParam ¶m, param.tenant_id_, param.object_id_, param.schema_version_, + param.consumer_group_id_, param.type_, param.ddl_arg_, *param.allocator_, @@ -854,6 +862,8 @@ int ObDDLScheduler::prepare_alter_table_arg(const ObPrepareAlterTableArgParam &p // do nothing } else if (FALSE_IT(alter_table_arg.sql_mode_ = param.sql_mode_)) { // do nothing + } else if (FALSE_IT(alter_table_arg.consumer_group_id_ = param.sql_mode_)) { + // do nothing } else if (FALSE_IT(alter_table_arg.ddl_stmt_str_.assign_ptr(ddl_stmt_str.ptr(), ddl_stmt_str.length()))) { // do nothing } else if (OB_FAIL(alter_table_arg.tz_info_.assign(param.tz_info_))) { @@ -1189,6 +1199,7 @@ int ObDDLScheduler::start_redef_table(const obrpc::ObStartRedefTableArg &arg, ob LOG_WARN("failed to build alter table arg", K(ret)); } else { common::ObArenaAllocator allocator(lib::ObLabel("StartRedefTable")); + int64_t group_id = THIS_WORKER.get_group_id(); //TODO qilu: pass id when directload_arg_init ObCreateDDLTaskParam param(tenant_id, arg.ddl_type_, orig_table_schema, @@ -1196,6 +1207,7 @@ int ObDDLScheduler::start_redef_table(const obrpc::ObStartRedefTableArg &arg, ob orig_table_schema->get_table_id(), orig_table_schema->get_schema_version(), arg.parallelism_, + group_id, &allocator, &alter_table_arg, 0); @@ -1220,6 +1232,7 @@ int ObDDLScheduler::create_build_index_task( const ObTableSchema *index_schema, const int64_t parallelism, const int64_t parent_task_id, + const int64_t consumer_group_id, const obrpc::ObCreateIndexArg *create_index_arg, ObIAllocator &allocator, ObDDLTaskRecord &task_record) @@ -1241,6 +1254,7 @@ int ObDDLScheduler::create_build_index_task( index_schema, index_schema->get_schema_version(), parallelism, + consumer_group_id, *create_index_arg, parent_task_id))) { LOG_WARN("init global index task failed", K(ret), K(data_table_schema), K(index_schema)); @@ -1259,6 +1273,7 @@ int ObDDLScheduler::create_drop_index_task( common::ObISQLClient &proxy, const share::schema::ObTableSchema *index_schema, const int64_t parent_task_id, + const int64_t consumer_group_id, const obrpc::ObDropIndexArg *drop_index_arg, ObIAllocator &allocator, ObDDLTaskRecord &task_record) @@ -1286,6 +1301,7 @@ int ObDDLScheduler::create_drop_index_task( index_table_id, index_schema->get_schema_version(), parent_task_id, + consumer_group_id, *drop_index_arg))) { LOG_WARN("init drop index task failed", K(ret), K(data_table_id), K(index_table_id)); } else if (OB_FAIL(index_task.set_trace_id(*ObCurTraceId::get_trace_id()))) { @@ -1305,6 +1321,7 @@ int ObDDLScheduler::create_constraint_task( const int64_t schema_version, const obrpc::ObAlterTableArg *arg, const int64_t parent_task_id, + const int64_t consumer_group_id, ObIAllocator &allocator, ObDDLTaskRecord &task_record) { @@ -1320,7 +1337,7 @@ int ObDDLScheduler::create_constraint_task( LOG_WARN("invalid argument", K(ret), K(table_schema), K(constraint_id), K(schema_version), K(arg)); } else if (OB_FAIL(ObDDLTask::fetch_new_task_id(root_service_->get_sql_proxy(), task_id))) { LOG_WARN("fetch new task id failed", K(ret)); - } else if (OB_FAIL(constraint_task.init(task_id, table_schema, constraint_id, ddl_type, schema_version, *arg, parent_task_id))) { + } else if (OB_FAIL(constraint_task.init(task_id, table_schema, constraint_id, ddl_type, schema_version, *arg, consumer_group_id, parent_task_id))) { LOG_WARN("init constraint task failed", K(ret), K(table_schema), K(constraint_id)); } else if (OB_FAIL(constraint_task.set_trace_id(*ObCurTraceId::get_trace_id()))) { LOG_WARN("set trace id failed", K(ret)); @@ -1338,6 +1355,7 @@ int ObDDLScheduler::create_table_redefinition_task( const share::schema::ObTableSchema *src_schema, const share::schema::ObTableSchema *dest_schema, const int64_t parallelism, + const int64_t consumer_group_id, const obrpc::ObAlterTableArg *alter_table_arg, ObIAllocator &allocator, ObDDLTaskRecord &task_record) @@ -1360,6 +1378,7 @@ int ObDDLScheduler::create_table_redefinition_task( dest_schema->get_table_id(), dest_schema->get_schema_version(), parallelism, + consumer_group_id, *alter_table_arg))) { LOG_WARN("fail to init redefinition task", K(ret)); } else if (OB_FAIL(redefinition_task.set_trace_id(*ObCurTraceId::get_trace_id()))) { @@ -1378,6 +1397,7 @@ int ObDDLScheduler::create_drop_primary_key_task( const ObTableSchema *src_schema, const ObTableSchema *dest_schema, const int64_t parallelism, + const int64_t consumer_group_id, const obrpc::ObAlterTableArg *alter_table_arg, ObIAllocator &allocator, ObDDLTaskRecord &task_record) @@ -1400,6 +1420,7 @@ int ObDDLScheduler::create_drop_primary_key_task( dest_schema->get_table_id(), dest_schema->get_schema_version(), parallelism, + consumer_group_id, *alter_table_arg))) { LOG_WARN("fail to init redefinition task", K(ret)); } else if (OB_FAIL(drop_pk_task.set_trace_id(*ObCurTraceId::get_trace_id()))) { @@ -1418,6 +1439,7 @@ int ObDDLScheduler::create_column_redefinition_task( const share::schema::ObTableSchema *src_schema, const share::schema::ObTableSchema *dest_schema, const int64_t parallelism, + const int64_t consumer_group_id, const obrpc::ObAlterTableArg *alter_table_arg, ObIAllocator &allocator, ObDDLTaskRecord &task_record) @@ -1440,6 +1462,7 @@ int ObDDLScheduler::create_column_redefinition_task( dest_schema->get_table_id(), dest_schema->get_schema_version(), parallelism, + consumer_group_id, *alter_table_arg))) { LOG_WARN("fail to init redefinition task", K(ret)); } else if (OB_FAIL(redefinition_task.set_trace_id(*ObCurTraceId::get_trace_id()))) { @@ -1457,6 +1480,7 @@ int ObDDLScheduler::create_modify_autoinc_task( const uint64_t tenant_id, const int64_t table_id, const int64_t schema_version, + const int64_t consumer_group_id, const obrpc::ObAlterTableArg *arg, ObIAllocator &allocator, ObDDLTaskRecord &task_record) @@ -1473,7 +1497,7 @@ int ObDDLScheduler::create_modify_autoinc_task( LOG_WARN("invalid argument", K(ret), K(tenant_id), K(table_id), K(schema_version), K(arg)); } else if (OB_FAIL(ObDDLTask::fetch_new_task_id(root_service_->get_sql_proxy(), task_id))) { LOG_WARN("fetch new task id failed", K(ret)); - } else if (OB_FAIL(modify_autoinc_task.init(tenant_id, task_id, table_id, schema_version, *arg))) { + } else if (OB_FAIL(modify_autoinc_task.init(tenant_id, task_id, table_id, schema_version, consumer_group_id, *arg))) { LOG_WARN("init global index task failed", K(ret), K(table_id), K(arg)); } else if (OB_FAIL(modify_autoinc_task.set_trace_id(*ObCurTraceId::get_trace_id()))) { LOG_WARN("set trace id failed", K(ret)); @@ -1492,6 +1516,7 @@ int ObDDLScheduler::create_ddl_retry_task( const uint64_t tenant_id, const uint64_t object_id, const int64_t schema_version, + const int64_t consumer_group_id, const share::ObDDLType &type, const obrpc::ObDDLArg *arg, ObIAllocator &allocator, @@ -1509,7 +1534,7 @@ int ObDDLScheduler::create_ddl_retry_task( LOG_WARN("invalid argument", K(ret), K(tenant_id), K(object_id), K(schema_version), K(arg)); } else if (OB_FAIL(ObDDLTask::fetch_new_task_id(root_service_->get_sql_proxy(), task_id))) { LOG_WARN("fetch new task id failed", K(ret)); - } else if (OB_FAIL(ddl_retry_task.init(tenant_id, task_id, object_id, schema_version, type, arg))) { + } else if (OB_FAIL(ddl_retry_task.init(tenant_id, task_id, object_id, schema_version, consumer_group_id, type, arg))) { LOG_WARN("init ddl retry task failed", K(ret), K(arg)); } else if (OB_FAIL(ddl_retry_task.set_trace_id(*ObCurTraceId::get_trace_id()))) { LOG_WARN("set trace id failed", K(ret)); diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.h b/src/rootserver/ddl_task/ob_ddl_scheduler.h index d9420968aa..d8f1020c2c 100644 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.h +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.h @@ -276,6 +276,7 @@ private: const share::schema::ObTableSchema *index_schema, const int64_t parallelism, const int64_t parent_task_id, + const int64_t consumer_group_id, const obrpc::ObCreateIndexArg *create_index_arg, ObIAllocator &allocator, ObDDLTaskRecord &task_record); @@ -287,6 +288,7 @@ private: const int64_t schema_version, const obrpc::ObAlterTableArg *arg, const int64_t parent_task_id, + const int64_t consumer_group_id, ObIAllocator &allocator, ObDDLTaskRecord &task_record); @@ -296,6 +298,7 @@ private: const share::schema::ObTableSchema *src_schema, const share::schema::ObTableSchema *dest_schema, const int64_t parallelism, + const int64_t consumer_group_id, const obrpc::ObAlterTableArg *alter_table_arg, ObIAllocator &allocator, ObDDLTaskRecord &task_record); @@ -306,6 +309,7 @@ private: const ObTableSchema *src_schema, const ObTableSchema *dest_schema, const int64_t parallelism, + const int64_t consumer_group_id, const obrpc::ObAlterTableArg *alter_table_arg, ObIAllocator &allocator, ObDDLTaskRecord &task_record); @@ -316,6 +320,7 @@ private: const share::schema::ObTableSchema *src_schema, const share::schema::ObTableSchema *dest_schema, const int64_t parallelism, + const int64_t consumer_group_id, const obrpc::ObAlterTableArg *alter_table_arg, ObIAllocator &allocator, ObDDLTaskRecord &task_record); @@ -325,6 +330,7 @@ private: const uint64_t tenant_id, const int64_t table_id, const int64_t schema_version, + const int64_t consumer_group_id, const obrpc::ObAlterTableArg *alter_table_arg, ObIAllocator &allocator, ObDDLTaskRecord &task_record); @@ -333,6 +339,7 @@ private: common::ObISQLClient &proxy, const share::schema::ObTableSchema *index_schema, const int64_t parent_task_id, + const int64_t consumer_group_id, const obrpc::ObDropIndexArg *drop_index_arg, ObIAllocator &allocator, ObDDLTaskRecord &task_record); @@ -342,6 +349,7 @@ private: const uint64_t tenant_id, const uint64_t object_id, const int64_t schema_version, + const int64_t consumer_group_id, const share::ObDDLType &type, const obrpc::ObDDLArg *arg, ObIAllocator &allocator, diff --git a/src/rootserver/ddl_task/ob_ddl_single_replica_executor.cpp b/src/rootserver/ddl_task/ob_ddl_single_replica_executor.cpp index a70edef930..10180db7fb 100644 --- a/src/rootserver/ddl_task/ob_ddl_single_replica_executor.cpp +++ b/src/rootserver/ddl_task/ob_ddl_single_replica_executor.cpp @@ -45,6 +45,7 @@ int ObDDLSingleReplicaExecutor::build(const ObDDLSingleReplicaExecutorParam &par execution_id_ = param.execution_id_; parallelism_ = param.parallelism_; data_format_version_ = param.data_format_version_; + consumer_group_id_ = param.consumer_group_id_; common::ObIArray &build_infos = partition_build_stat_; common::ObIArray &tablet_ids = source_tablet_ids_; @@ -120,6 +121,7 @@ int ObDDLSingleReplicaExecutor::schedule_task() arg.execution_id_ = execution_id_; arg.data_format_version_ = data_format_version_; arg.tablet_task_id_ = tablet_task_ids_.at(i); + arg.consumer_group_id_ = consumer_group_id_; if (OB_FAIL(location_service->get(tenant_id_, arg.source_tablet_id_, expire_renew_time, is_cache_hit, ls_id))) { LOG_WARN("get ls failed", K(ret), K(arg.source_tablet_id_)); diff --git a/src/rootserver/ddl_task/ob_ddl_single_replica_executor.h b/src/rootserver/ddl_task/ob_ddl_single_replica_executor.h index 92a36c54e6..7045efd3cc 100644 --- a/src/rootserver/ddl_task/ob_ddl_single_replica_executor.h +++ b/src/rootserver/ddl_task/ob_ddl_single_replica_executor.h @@ -37,7 +37,8 @@ public: task_id_(0), parallelism_(0), execution_id_(-1), - data_format_version_(0) + data_format_version_(0), + consumer_group_id_(0) {} ~ObDDLSingleReplicaExecutorParam() = default; bool is_valid() const { @@ -45,11 +46,12 @@ public: && source_tablet_ids_.count() > 0 && dest_tablet_ids_.count() > 0 && common::OB_INVALID_ID != source_table_id_ && common::OB_INVALID_ID != dest_table_id_ && schema_version_ > 0 && snapshot_version_ > 0 && task_id_ > 0 && execution_id_ >= 0 - && data_format_version_ > 0; + && data_format_version_ > 0 && consumer_group_id_ >= 0; } TO_STRING_KV(K_(tenant_id), K_(type), K_(source_tablet_ids), K_(dest_tablet_ids), K_(source_table_id), K_(dest_table_id), K_(schema_version), - K_(snapshot_version), K_(task_id), K_(parallelism), K_(execution_id), K_(data_format_version)); + K_(snapshot_version), K_(task_id), K_(parallelism), K_(execution_id), + K_(data_format_version), K_(consumer_group_id)); public: uint64_t tenant_id_; share::ObDDLType type_; @@ -63,6 +65,7 @@ public: int64_t parallelism_; int64_t execution_id_; int64_t data_format_version_; + int64_t consumer_group_id_; }; class ObDDLSingleReplicaExecutor @@ -122,6 +125,7 @@ private: int64_t parallelism_; int64_t execution_id_; int64_t data_format_version_; + int64_t consumer_group_id_; common::ObArray partition_build_stat_; common::ObSpinLock lock_; }; diff --git a/src/rootserver/ddl_task/ob_ddl_task.cpp b/src/rootserver/ddl_task/ob_ddl_task.cpp index 68a2c2f86e..9b30de3128 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_task.cpp @@ -125,12 +125,16 @@ int ObDDLTaskID::assign(const ObDDLTaskID &other) return ret; } -ObDDLTaskSerializeField::ObDDLTaskSerializeField(const int64_t task_version, const int64_t parallelism, - const int64_t data_format_version, const bool is_abort) +ObDDLTaskSerializeField::ObDDLTaskSerializeField(const int64_t task_version, + const int64_t parallelism, + const int64_t data_format_version, + const int64_t consumer_group_id, + const bool is_abort) { task_version_ = task_version; parallelism_ = parallelism; data_format_version_ = data_format_version; + consumer_group_id_ = consumer_group_id; is_abort_ = is_abort; } @@ -139,13 +143,19 @@ void ObDDLTaskSerializeField::reset() task_version_ = 0; parallelism_ = 0; data_format_version_ = 0; + consumer_group_id_ = 0; is_abort_ = false; } -OB_SERIALIZE_MEMBER(ObDDLTaskSerializeField, task_version_, parallelism_, data_format_version_, is_abort_); +OB_SERIALIZE_MEMBER(ObDDLTaskSerializeField, + task_version_, + parallelism_, + data_format_version_, + consumer_group_id_, + is_abort_); ObCreateDDLTaskParam::ObCreateDDLTaskParam() - : tenant_id_(OB_INVALID_ID), object_id_(OB_INVALID_ID), schema_version_(0), parallelism_(0), parent_task_id_(0), + : tenant_id_(OB_INVALID_ID), object_id_(OB_INVALID_ID), schema_version_(0), parallelism_(0), consumer_group_id_(0), parent_task_id_(0), type_(DDL_INVALID), src_table_schema_(nullptr), dest_table_schema_(nullptr), ddl_arg_(nullptr), allocator_(nullptr) { } @@ -157,10 +167,11 @@ ObCreateDDLTaskParam::ObCreateDDLTaskParam(const uint64_t tenant_id, const int64_t object_id, const int64_t schema_version, const int64_t parallelism, + const int64_t consumer_group_id, ObIAllocator *allocator, const obrpc::ObDDLArg *ddl_arg, const int64_t parent_task_id) - : tenant_id_(tenant_id), object_id_(object_id), schema_version_(schema_version), parallelism_(parallelism), + : tenant_id_(tenant_id), object_id_(object_id), schema_version_(schema_version), parallelism_(parallelism), consumer_group_id_(consumer_group_id), parent_task_id_(parent_task_id), type_(type), src_table_schema_(src_table_schema), dest_table_schema_(dest_table_schema), ddl_arg_(ddl_arg), allocator_(allocator) { @@ -812,7 +823,7 @@ int ObDDLTask::set_ddl_stmt_str(const ObString &ddl_stmt_str) int ObDDLTask::serialize_params_to_message(char *buf, const int64_t buf_size, int64_t &pos) const { int ret = OB_SUCCESS; - ObDDLTaskSerializeField serialize_field(task_version_, parallelism_, data_format_version_, is_abort_); + ObDDLTaskSerializeField serialize_field(task_version_, parallelism_, data_format_version_, consumer_group_id_, is_abort_); if (OB_UNLIKELY(nullptr == buf || buf_size <= 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), KP(buf), K(buf_size)); @@ -827,15 +838,13 @@ int ObDDLTask::deserlize_params_from_message(const uint64_t tenant_id, const cha int ret = OB_SUCCESS; ObDDLTaskSerializeField serialize_field; serialize_field.reset(); - if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || buf_size <= 0)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret), KP(buf), K(tenant_id), K(buf_size)); - } else if (OB_FAIL(serialize_field.deserialize(buf, buf_size, pos))) { + if (OB_FAIL(serialize_field.deserialize(buf, buf_size, pos))) { LOG_WARN("serialize_field deserialize failed", K(ret)); } else { task_version_ = serialize_field.task_version_; parallelism_ = serialize_field.parallelism_; data_format_version_ = serialize_field.data_format_version_; + consumer_group_id_ = serialize_field.consumer_group_id_; is_abort_ = serialize_field.is_abort_; } return ret; @@ -843,7 +852,7 @@ int ObDDLTask::deserlize_params_from_message(const uint64_t tenant_id, const cha int64_t ObDDLTask::get_serialize_param_size() const { - ObDDLTaskSerializeField serialize_field(task_version_, parallelism_, data_format_version_, is_abort_); + ObDDLTaskSerializeField serialize_field(task_version_, parallelism_, data_format_version_, consumer_group_id_, is_abort_); return serialize_field.get_serialize_size(); } @@ -3212,5 +3221,6 @@ int ObDDLTask::init_ddl_task_monitor_info(const uint64_t target_table_id) } + } // end namespace rootserver } // end namespace oceanbase diff --git a/src/rootserver/ddl_task/ob_ddl_task.h b/src/rootserver/ddl_task/ob_ddl_task.h index e8e4ad62e7..3f8bf256da 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.h +++ b/src/rootserver/ddl_task/ob_ddl_task.h @@ -108,18 +108,23 @@ struct ObDDLTaskSerializeField final { OB_UNIS_VERSION(1); public: - TO_STRING_KV(K_(task_version), K_(parallelism), K_(data_format_version), K_(is_abort)); - ObDDLTaskSerializeField() : task_version_(0), parallelism_(0), data_format_version_(0), is_abort_(false) {} - ObDDLTaskSerializeField(const int64_t task_version, const int64_t parallelism, - const int64_t data_format_version, const bool is_abort); + TO_STRING_KV(K_(task_version), K_(parallelism), K_(data_format_version), K_(consumer_group_id), K_(is_abort)); + ObDDLTaskSerializeField() : task_version_(0), parallelism_(0), data_format_version_(0), consumer_group_id_(0), is_abort_(false) {} + ObDDLTaskSerializeField(const int64_t task_version, + const int64_t parallelism, + const int64_t data_format_version, + const int64_t consumer_group_id, + const bool is_abort); ~ObDDLTaskSerializeField() = default; void reset(); public: int64_t task_version_; int64_t parallelism_; int64_t data_format_version_; + int64_t consumer_group_id_; bool is_abort_; }; + struct ObCreateDDLTaskParam final { public: @@ -131,19 +136,21 @@ public: const int64_t object_id, const int64_t schema_version, const int64_t parallelism, + const int64_t consumer_group_id, ObIAllocator *allocator, const obrpc::ObDDLArg *ddl_arg = nullptr, const int64_t parent_task_id = 0); ~ObCreateDDLTaskParam() = default; bool is_valid() const { return OB_INVALID_ID != tenant_id_ && type_ > share::DDL_INVALID && type_ < share::DDL_MAX && nullptr != allocator_; } - TO_STRING_KV(K_(tenant_id), K_(object_id), K_(schema_version), K_(parallelism), K_(parent_task_id), K_(type), - KPC_(src_table_schema), KPC_(dest_table_schema), KPC_(ddl_arg)); + TO_STRING_KV(K_(tenant_id), K_(object_id), K_(schema_version), K_(parallelism), K_(consumer_group_id), K_(parent_task_id), + K_(type), KPC_(src_table_schema), KPC_(dest_table_schema), KPC_(ddl_arg)); public: uint64_t tenant_id_; int64_t object_id_; int64_t schema_version_; int64_t parallelism_; + int64_t consumer_group_id_; int64_t parent_task_id_; share::ObDDLType type_; const ObTableSchema *src_table_schema_; @@ -437,6 +444,7 @@ public: void set_task_status(const share::ObDDLTaskStatus new_status) {task_status_ = new_status; } void set_is_abort(const bool is_abort) { is_abort_ = is_abort; } bool get_is_abort() { return is_abort_; } + void set_consumer_group_id(const int64_t group_id) { consumer_group_id_ = group_id; } bool try_set_running() { return !ATOMIC_CAS(&is_running_, false, true); } uint64_t get_tenant_id() const { return tenant_id_; } uint64_t get_object_id() const { return object_id_; } @@ -506,7 +514,7 @@ public: K_(ret_code), K_(task_id), K_(parent_task_id), K_(parent_task_key), K_(task_version), K_(parallelism), K_(ddl_stmt_str), K_(compat_mode), K_(sys_task_id), K_(err_code_occurence_cnt), K_(stat_info), - K_(next_schedule_ts), K_(delay_schedule_time), K(execution_id_), K(sql_exec_addr_), K_(data_format_version)); + K_(next_schedule_ts), K_(delay_schedule_time), K(execution_id_), K(sql_exec_addr_), K_(data_format_version), K(consumer_group_id_)); protected: int gather_redefinition_stats(const uint64_t tenant_id, const int64_t task_id, @@ -572,6 +580,7 @@ protected: common::ObAddr sql_exec_addr_; int64_t start_time_; int64_t data_format_version_; + int64_t consumer_group_id_; }; enum ColChecksumStat diff --git a/src/rootserver/ddl_task/ob_drop_index_task.cpp b/src/rootserver/ddl_task/ob_drop_index_task.cpp index 934cca3c3c..39493d075b 100644 --- a/src/rootserver/ddl_task/ob_drop_index_task.cpp +++ b/src/rootserver/ddl_task/ob_drop_index_task.cpp @@ -41,6 +41,7 @@ int ObDropIndexTask::init( const uint64_t index_table_id, const int64_t schema_version, const int64_t parent_task_id, + const int64_t consumer_group_id, const obrpc::ObDropIndexArg &drop_index_arg) { int ret = OB_SUCCESS; @@ -61,6 +62,7 @@ int ObDropIndexTask::init( schema_version_ = schema_version; task_id_ = task_id; parent_task_id_ = parent_task_id; + consumer_group_id_ = consumer_group_id; task_version_ = OB_DROP_INDEX_TASK_VERSION; is_inited_ = true; ddl_tracing_.open(); diff --git a/src/rootserver/ddl_task/ob_drop_index_task.h b/src/rootserver/ddl_task/ob_drop_index_task.h index 50e34ba7e0..a0a52523e3 100644 --- a/src/rootserver/ddl_task/ob_drop_index_task.h +++ b/src/rootserver/ddl_task/ob_drop_index_task.h @@ -32,6 +32,7 @@ public: const uint64_t index_table_id, const int64_t schema_version, const int64_t parent_task_id, + const int64_t consumer_group_id, const obrpc::ObDropIndexArg &drop_index_arg); int init(const ObDDLTaskRecord &task_record); virtual int process() override; diff --git a/src/rootserver/ddl_task/ob_drop_primary_key_task.cpp b/src/rootserver/ddl_task/ob_drop_primary_key_task.cpp index b044b85431..1469e95239 100644 --- a/src/rootserver/ddl_task/ob_drop_primary_key_task.cpp +++ b/src/rootserver/ddl_task/ob_drop_primary_key_task.cpp @@ -40,14 +40,15 @@ ObDropPrimaryKeyTask::~ObDropPrimaryKeyTask() int ObDropPrimaryKeyTask::init(const uint64_t tenant_id, const int64_t task_id, const share::ObDDLType &ddl_type, const int64_t data_table_id, const int64_t dest_table_id, const int64_t schema_version, const int64_t parallelism, - const obrpc::ObAlterTableArg &alter_table_arg, const int64_t task_status, const int64_t snapshot_version) + const int64_t consumer_group_id, const obrpc::ObAlterTableArg &alter_table_arg, const int64_t task_status, const int64_t snapshot_version) { int ret = OB_SUCCESS; if (OB_FAIL(ObTableRedefinitionTask::init(tenant_id, task_id, ddl_type, data_table_id, - dest_table_id, schema_version, parallelism, + dest_table_id, schema_version, parallelism, consumer_group_id, alter_table_arg, task_status, snapshot_version))) { LOG_WARN("fail to init ObDropPrimaryKeyTask", K(ret)); } else { + consumer_group_id_ = consumer_group_id; task_version_ = OB_DROP_PRIMARY_KEY_TASK_VERSION; ddl_tracing_.open(); } diff --git a/src/rootserver/ddl_task/ob_drop_primary_key_task.h b/src/rootserver/ddl_task/ob_drop_primary_key_task.h index 1755e54f58..ee5524cfc3 100644 --- a/src/rootserver/ddl_task/ob_drop_primary_key_task.h +++ b/src/rootserver/ddl_task/ob_drop_primary_key_task.h @@ -35,6 +35,7 @@ public: const int64_t dest_table_id, const int64_t schema_version, const int64_t parallelism, + const int64_t consumer_group_id, const obrpc::ObAlterTableArg &alter_table_arg, const int64_t task_status = share::ObDDLTaskStatus::PREPARE, const int64_t snapshot_version = 0); diff --git a/src/rootserver/ddl_task/ob_index_build_task.cpp b/src/rootserver/ddl_task/ob_index_build_task.cpp index 9cea44e13b..396cd0dc68 100644 --- a/src/rootserver/ddl_task/ob_index_build_task.cpp +++ b/src/rootserver/ddl_task/ob_index_build_task.cpp @@ -110,6 +110,7 @@ int ObIndexSSTableBuildTask::process() session_param.nls_formats_[ObNLSFormatEnum::NLS_TIMESTAMP] = nls_timestamp_format_; session_param.nls_formats_[ObNLSFormatEnum::NLS_TIMESTAMP_TZ] = nls_timestamp_tz_format_; session_param.use_external_session_ = true; // means session id dispatched by session mgr + session_param.consumer_group_id_ = consumer_group_id_; common::ObAddr *sql_exec_addr = nullptr; if (inner_sql_exec_addr_.is_valid()) { @@ -165,6 +166,7 @@ ObAsyncTask *ObIndexSSTableBuildTask::deep_copy(char *buf, const int64_t buf_siz schema_version_, snapshot_version_, execution_id_, + consumer_group_id_, trace_id_, parallelism_, root_service_, @@ -309,6 +311,7 @@ int ObIndexBuildTask::init( const ObTableSchema *index_schema, const int64_t schema_version, const int64_t parallelism, + const int64_t consumer_group_id, const obrpc::ObCreateIndexArg &create_index_arg, const int64_t parent_task_id /* = 0 */, const int64_t task_status /* = TaskStatus::PREPARE */, @@ -357,6 +360,7 @@ int ObIndexBuildTask::init( if (ObDDLTaskStatus::VALIDATE_CHECKSUM == task_status) { sstable_complete_ts_ = ObTimeUtility::current_time(); } + consumer_group_id_ = consumer_group_id; task_id_ = task_id; parent_task_id_ = parent_task_id; task_version_ = OB_INDEX_BUILD_TASK_VERSION; @@ -788,6 +792,7 @@ int ObIndexBuildTask::send_build_single_replica_request() schema_version_, snapshot_version_, new_execution_id, + consumer_group_id_, trace_id_, parallelism_, root_service_, diff --git a/src/rootserver/ddl_task/ob_index_build_task.h b/src/rootserver/ddl_task/ob_index_build_task.h index c2fc3ee249..d619d4a5da 100644 --- a/src/rootserver/ddl_task/ob_index_build_task.h +++ b/src/rootserver/ddl_task/ob_index_build_task.h @@ -31,13 +31,14 @@ public: const int64_t schema_version, const int64_t snapshot_version, const int64_t execution_id, + const int64_t consumer_group_id, const common::ObCurTraceId::TraceId &trace_id, const int64_t parallelism, ObRootService *root_service, const common::ObAddr &inner_sql_exec_addr) : task_id_(task_id), tenant_id_(tenant_id), data_table_id_(data_table_id), dest_table_id_(dest_table_id), schema_version_(schema_version), snapshot_version_(snapshot_version), execution_id_(execution_id), - trace_id_(trace_id), parallelism_(parallelism), allocator_("IdxSSTBuildTask"), + consumer_group_id_(consumer_group_id), trace_id_(trace_id), parallelism_(parallelism), allocator_("IdxSSTBuildTask"), root_service_(root_service), inner_sql_exec_addr_(inner_sql_exec_addr) { set_retry_times(0); @@ -52,7 +53,7 @@ public: virtual int64_t get_deep_copy_size() const override { return sizeof(*this); } virtual ObAsyncTask *deep_copy(char *buf, const int64_t buf_size) const override; TO_STRING_KV(K_(data_table_id), K_(dest_table_id), K_(schema_version), K_(snapshot_version), - K_(execution_id), K_(trace_id), K_(parallelism), K_(nls_date_format), + K_(execution_id), K_(consumer_group_id), K_(trace_id), K_(parallelism), K_(nls_date_format), K_(nls_timestamp_format), K_(nls_timestamp_tz_format)); private: @@ -63,6 +64,7 @@ private: int64_t schema_version_; int64_t snapshot_version_; int64_t execution_id_; + int64_t consumer_group_id_; common::ObCurTraceId::TraceId trace_id_; int64_t parallelism_; common::ObArenaAllocator allocator_; @@ -87,6 +89,7 @@ public: const share::schema::ObTableSchema *index_schema, const int64_t schema_version, const int64_t parallel, + const int64_t consumer_group_id, const obrpc::ObCreateIndexArg &create_index_arg, const int64_t parent_task_id /* = 0 */, const int64_t task_status = share::ObDDLTaskStatus::PREPARE, diff --git a/src/rootserver/ddl_task/ob_modify_autoinc_task.cpp b/src/rootserver/ddl_task/ob_modify_autoinc_task.cpp index 447fd3b751..fcd69764b3 100644 --- a/src/rootserver/ddl_task/ob_modify_autoinc_task.cpp +++ b/src/rootserver/ddl_task/ob_modify_autoinc_task.cpp @@ -153,6 +153,7 @@ int ObModifyAutoincTask::init(const uint64_t tenant_id, const int64_t task_id, const int64_t table_id, const int64_t schema_version, + const int64_t consumer_group_id, const obrpc::ObAlterTableArg &alter_table_arg, const int64_t task_status, const int64_t snapshot_version) @@ -174,6 +175,7 @@ int ObModifyAutoincTask::init(const uint64_t tenant_id, object_id_ = table_id; target_object_id_ = table_id; schema_version_ = schema_version; + consumer_group_id_ = consumer_group_id; task_status_ = static_cast(task_status); snapshot_version_ = snapshot_version; tenant_id_ = tenant_id; diff --git a/src/rootserver/ddl_task/ob_modify_autoinc_task.h b/src/rootserver/ddl_task/ob_modify_autoinc_task.h index 4971763567..d8e9757847 100644 --- a/src/rootserver/ddl_task/ob_modify_autoinc_task.h +++ b/src/rootserver/ddl_task/ob_modify_autoinc_task.h @@ -58,6 +58,7 @@ public: const int64_t task_id, const int64_t table_id, const int64_t schema_version, + const int64_t consumer_group_id, const obrpc::ObAlterTableArg &alter_table_arg, const int64_t task_status = share::ObDDLTaskStatus::LOCK_TABLE, const int64_t snapshot_version = 0); diff --git a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp index 33afeb8313..b0bd47b454 100644 --- a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp @@ -45,7 +45,7 @@ ObTableRedefinitionTask::~ObTableRedefinitionTask() int ObTableRedefinitionTask::init(const uint64_t tenant_id, const int64_t task_id, const share::ObDDLType &ddl_type, const int64_t data_table_id, const int64_t dest_table_id, const int64_t schema_version, const int64_t parallelism, - const ObAlterTableArg &alter_table_arg, const int64_t task_status, const int64_t snapshot_version) + const int64_t consumer_group_id, const ObAlterTableArg &alter_table_arg, const int64_t task_status, const int64_t snapshot_version) { int ret = OB_SUCCESS; uint64_t tenant_data_format_version = 0; @@ -63,6 +63,7 @@ int ObTableRedefinitionTask::init(const uint64_t tenant_id, const int64_t task_i } else if (OB_FAIL(ObShareUtil::fetch_current_data_version(*GCTX.sql_proxy_, tenant_id, tenant_data_format_version))) { LOG_WARN("get min data version failed", K(ret), K(tenant_id)); } else { + consumer_group_id_ = consumer_group_id; task_type_ = ddl_type; object_id_ = data_table_id; target_object_id_ = dest_table_id; @@ -228,6 +229,7 @@ int ObTableRedefinitionTask::send_build_replica_request_by_sql() schema_version_, snapshot_version_, new_execution_id, + consumer_group_id_, sql_mode, trace_id_, parallelism_, @@ -469,6 +471,7 @@ int ObTableRedefinitionTask::copy_table_indexes() 0/*object_id*/, index_schema->get_schema_version(), parallelism_ / index_ids.count()/*parallelism*/, + consumer_group_id_, &allocator_, &create_index_arg, task_id_); diff --git a/src/rootserver/ddl_task/ob_table_redefinition_task.h b/src/rootserver/ddl_task/ob_table_redefinition_task.h index f3a370c751..a138baa91b 100644 --- a/src/rootserver/ddl_task/ob_table_redefinition_task.h +++ b/src/rootserver/ddl_task/ob_table_redefinition_task.h @@ -39,6 +39,7 @@ public: const int64_t dest_table_id, const int64_t schema_version, const int64_t parallelism, + const int64_t consumer_group_id, const obrpc::ObAlterTableArg &alter_table_arg, const int64_t task_status = share::ObDDLTaskStatus::PREPARE, const int64_t snapshot_version = 0); diff --git a/src/rootserver/ob_ddl_service.cpp b/src/rootserver/ob_ddl_service.cpp index 94e3b8eaf7..fca20cbf26 100644 --- a/src/rootserver/ob_ddl_service.cpp +++ b/src/rootserver/ob_ddl_service.cpp @@ -4962,6 +4962,7 @@ int ObDDLService::alter_table_index(const obrpc::ObAlterTableArg &alter_table_ar 0/*object_id*/, new_index_schema.get_schema_version(), 0L/*parallelism*/, + drop_index_arg->consumer_group_id_, &allocator, drop_index_arg); if (OB_FAIL(GCTX.root_service_->get_ddl_scheduler().create_ddl_task(param, trans, task_record))) { @@ -10305,7 +10306,8 @@ int ObDDLService::alter_table_in_trans(obrpc::ObAlterTableArg &alter_table_arg, &index_schema, alter_table_arg.parallelism_, alter_table_arg.allocator_, - task_record))) { + task_record, + alter_table_arg.consumer_group_id_))) { LOG_WARN("fail to submit build index task", K(ret), "type", create_index_arg->index_type_); } else if (OB_FAIL(ddl_tasks.push_back(task_record))) { LOG_WARN("fail to push ddl task", K(ret), K(task_record)); @@ -10355,6 +10357,7 @@ int ObDDLService::alter_table_in_trans(obrpc::ObAlterTableArg &alter_table_arg, (*iter)->get_constraint_id(), new_table_schema.get_schema_version(), 0/*parallelsim*/, + const_alter_table_arg.consumer_group_id_, &alter_table_arg.allocator_, &const_alter_table_arg); if (OB_FAIL(GCTX.root_service_->get_ddl_scheduler().create_ddl_task(param, trans, task_record))) { @@ -10400,6 +10403,7 @@ int ObDDLService::alter_table_in_trans(obrpc::ObAlterTableArg &alter_table_arg, fk_id, new_table_schema.get_schema_version(), 0/*parallelism*/, + const_alter_table_arg.consumer_group_id_, &alter_table_arg.allocator_, &const_alter_table_arg); if (OB_FAIL(GCTX.root_service_->get_ddl_scheduler().create_ddl_task(param, trans, task_record))) { @@ -10886,6 +10890,7 @@ int ObDDLService::do_offline_ddl_in_trans(obrpc::ObAlterTableArg &alter_table_ar 0/*object_id*/, new_table_schema.get_schema_version(), alter_table_arg.parallelism_, + alter_table_arg.consumer_group_id_, &alter_table_arg.allocator_, &alter_table_arg); if (orig_table_schema->is_tmp_table()) { @@ -10912,6 +10917,7 @@ int ObDDLService::do_offline_ddl_in_trans(obrpc::ObAlterTableArg &alter_table_ar 0/*object_id*/, new_table_schema.get_schema_version(), alter_table_arg.parallelism_, + alter_table_arg.consumer_group_id_, &alter_table_arg.allocator_, &alter_table_arg); if (OB_FAIL(root_service->get_ddl_scheduler().create_ddl_task(param, trans, task_record))) { @@ -11026,6 +11032,7 @@ int ObDDLService::create_hidden_table( } else if (OB_FAIL(root_service->get_ddl_scheduler().prepare_alter_table_arg(param, &new_table_schema, alter_table_arg))) { LOG_WARN("prepare alter table arg fail", K(ret)); } else { + alter_table_arg.consumer_group_id_ = create_hidden_table_arg.consumer_group_id_; LOG_DEBUG("alter table arg preparation complete!", K(ret), K(alter_table_arg)); ObCreateDDLTaskParam param(tenant_id, create_hidden_table_arg.ddl_type_, @@ -11034,6 +11041,7 @@ int ObDDLService::create_hidden_table( table_id, orig_table_schema->get_schema_version(), create_hidden_table_arg.parallelism_, + create_hidden_table_arg.consumer_group_id_, &allocator_for_redef, &alter_table_arg); if (OB_FAIL(root_service->get_ddl_scheduler().create_ddl_task(param, trans, task_record))) { @@ -19559,7 +19567,8 @@ int ObDDLService::rebuild_index(const ObRebuildIndexArg &arg, obrpc::ObAlterTabl &new_table_schema, arg.parallelism_, allocator, - task_record))) { + task_record, + arg.consumer_group_id_))) { LOG_WARN("fail to submit build global index task", KR(ret)); } else { res.index_table_id_ = new_table_schema.get_table_id(); diff --git a/src/rootserver/ob_index_builder.cpp b/src/rootserver/ob_index_builder.cpp index ad34cef4ee..5ce8c3c757 100644 --- a/src/rootserver/ob_index_builder.cpp +++ b/src/rootserver/ob_index_builder.cpp @@ -293,7 +293,8 @@ int ObIndexBuilder::do_create_global_index( &index_schema, arg.parallelism_, allocator, - task_record))) { + task_record, + arg.consumer_group_id_))) { LOG_WARN("fail to submit build global index task", K(ret)); } } @@ -328,7 +329,8 @@ int ObIndexBuilder::submit_build_index_task( const ObTableSchema *index_schema, const int64_t parallelism, common::ObIAllocator &allocator, - ObDDLTaskRecord &task_record) + ObDDLTaskRecord &task_record, + const int64_t group_id) { int ret = OB_SUCCESS; ObCreateDDLTaskParam param(index_schema->get_tenant_id(), @@ -338,6 +340,7 @@ int ObIndexBuilder::submit_build_index_task( 0/*object_id*/, index_schema->get_schema_version(), parallelism, + group_id, &allocator, &create_index_arg); if (OB_ISNULL(data_schema) || OB_ISNULL(index_schema)) { @@ -369,6 +372,7 @@ int ObIndexBuilder::submit_drop_index_task(const ObTableSchema &index_schema, 0/*object_id*/, index_schema.get_schema_version(), 0/*parallelism*/, + arg.consumer_group_id_, &allocator, &arg); if (OB_FAIL(GCTX.root_service_->get_ddl_task_scheduler().create_ddl_task(param, ddl_service_.get_sql_proxy(), task_record))) { @@ -463,7 +467,8 @@ int ObIndexBuilder::do_create_local_index( &index_schema, create_index_arg.parallelism_, allocator, - task_record))) { + task_record, + create_index_arg.consumer_group_id_))) { LOG_WARN("failt to submit build local index task", K(ret)); } else { res.index_table_id_ = index_schema.get_table_id(); diff --git a/src/rootserver/ob_index_builder.h b/src/rootserver/ob_index_builder.h index 45a02de33f..4b4f3f1366 100644 --- a/src/rootserver/ob_index_builder.h +++ b/src/rootserver/ob_index_builder.h @@ -88,7 +88,8 @@ public: const share::schema::ObTableSchema *index_schema, const int64_t parallelism, common::ObIAllocator &allocator, - ObDDLTaskRecord &task_record); + ObDDLTaskRecord &task_record, + const int64_t group_id); private: typedef common::ObArray > OrderFTColumns; class FulltextColumnOrder diff --git a/src/rootserver/ob_root_service.cpp b/src/rootserver/ob_root_service.cpp index 81a4136768..c0dd992e92 100644 --- a/src/rootserver/ob_root_service.cpp +++ b/src/rootserver/ob_root_service.cpp @@ -4083,6 +4083,7 @@ int ObRootService::alter_table(const obrpc::ObAlterTableArg &arg, obrpc::ObAlter orig_table_schema->get_table_id(), orig_table_schema->get_schema_version(), arg.parallelism_, + arg.consumer_group_id_, &allocator, &arg, 0 /* parent task id*/); @@ -4202,6 +4203,7 @@ int ObRootService::drop_table(const obrpc::ObDropTableArg &arg, obrpc::ObDDLRes target_object_id, schema_version, arg.parallelism_, + arg.consumer_group_id_, &allocator, &arg, 0 /* parent task id*/); @@ -4261,6 +4263,7 @@ int ObRootService::drop_database(const obrpc::ObDropDatabaseArg &arg, ObDropData database_id, schema_version, arg.parallelism_, + arg.consumer_group_id_, &allocator, &arg, 0 /* parent task id*/); @@ -4451,6 +4454,7 @@ int ObRootService::truncate_table(const obrpc::ObTruncateTableArg &arg, obrpc::O table_schema->get_table_id(), table_schema->get_schema_version(), arg.parallelism_, + arg.consumer_group_id_, &allocator, &arg, 0 /* parent task id*/); diff --git a/src/share/ob_rpc_struct.cpp b/src/share/ob_rpc_struct.cpp index 2ad351125b..19dcf242e3 100644 --- a/src/share/ob_rpc_struct.cpp +++ b/src/share/ob_rpc_struct.cpp @@ -79,6 +79,7 @@ int ObDDLArg::assign(const ObDDLArg &other) sync_from_primary_ = other.sync_from_primary_; parallelism_ = other.parallelism_; task_id_ = other.task_id_; + consumer_group_id_ = other.consumer_group_id_; } return ret; } @@ -534,6 +535,7 @@ int ObCreateHiddenTableArg::assign(const ObCreateHiddenTableArg &arg) } else { tenant_id_ = arg.tenant_id_; table_id_ = arg.table_id_; + consumer_group_id_ = arg.consumer_group_id_; dest_tenant_id_ = arg.dest_tenant_id_; session_id_ = arg.session_id_; parallelism_ = arg.parallelism_; @@ -558,6 +560,7 @@ OB_DEF_SERIALIZE(ObCreateHiddenTableArg) LST_DO_CODE(OB_UNIS_ENCODE, tenant_id_, table_id_, + consumer_group_id_, dest_tenant_id_, session_id_, parallelism_, @@ -585,6 +588,7 @@ OB_DEF_DESERIALIZE(ObCreateHiddenTableArg) LST_DO_CODE(OB_UNIS_DECODE, tenant_id_, table_id_, + consumer_group_id_, dest_tenant_id_, session_id_, parallelism_, @@ -627,6 +631,7 @@ OB_DEF_SERIALIZE_SIZE(ObCreateHiddenTableArg) LST_DO_CODE(OB_UNIS_ADD_LEN, tenant_id_, table_id_, + consumer_group_id_, dest_tenant_id_, session_id_, parallelism_, @@ -902,7 +907,8 @@ OB_SERIALIZE_MEMBER(ObDDLArg, sync_from_primary_, based_schema_object_infos_, parallelism_, - task_id_); + task_id_, + consumer_group_id_); ////////////////////////////////////////////// // @@ -6396,7 +6402,7 @@ OB_SERIALIZE_MEMBER(ObLogReqLoadProxyProgressResponse, err_, progress_); OB_SERIALIZE_MEMBER(ObDDLBuildSingleReplicaRequestArg, tenant_id_, ls_id_, source_tablet_id_, dest_tablet_id_, source_table_id_, dest_schema_id_, schema_version_, snapshot_version_, ddl_type_, task_id_, execution_id_, - parallelism_, tablet_task_id_, data_format_version_); + parallelism_, tablet_task_id_, data_format_version_, consumer_group_id_); int ObDDLBuildSingleReplicaRequestArg::assign(const ObDDLBuildSingleReplicaRequestArg &other) { @@ -6415,6 +6421,7 @@ int ObDDLBuildSingleReplicaRequestArg::assign(const ObDDLBuildSingleReplicaReque execution_id_ = other.execution_id_; tablet_task_id_ = other.tablet_task_id_; data_format_version_ = other.data_format_version_; + consumer_group_id_ = other.consumer_group_id_; return ret; } diff --git a/src/share/ob_rpc_struct.h b/src/share/ob_rpc_struct.h index 02c1c95288..ed2d79b15d 100644 --- a/src/share/ob_rpc_struct.h +++ b/src/share/ob_rpc_struct.h @@ -222,7 +222,8 @@ public: sync_from_primary_(false), based_schema_object_infos_(), parallelism_(0), - task_id_(0) + task_id_(0), + consumer_group_id_(0) { } virtual ~ObDDLArg() = default; bool is_need_check_based_schema_objects() const @@ -248,9 +249,10 @@ public: based_schema_object_infos_.reset(); parallelism_ = 0; task_id_ = 0; + consumer_group_id_ = 0; } TO_STRING_KV(K_(ddl_stmt_str), K_(exec_tenant_id), K_(ddl_id_str), K_(sync_from_primary), K_(based_schema_object_infos), - K_(parallelism), K_(task_id)); + K_(parallelism), K_(task_id), K_(consumer_group_id)); common::ObString ddl_stmt_str_; uint64_t exec_tenant_id_; @@ -259,6 +261,7 @@ public: common::ObSArray based_schema_object_infos_; int64_t parallelism_; int64_t task_id_; + int64_t consumer_group_id_; }; struct ObAlterResourceUnitArg : public ObDDLArg @@ -1743,6 +1746,7 @@ public: TO_STRING_KV(K_(exec_tenant_id), K_(tenant_id), K_(table_id), + K_(consumer_group_id), K_(dest_tenant_id), K_(session_id), K_(parallelism), @@ -1755,6 +1759,7 @@ public: ObDDLArg(), tenant_id_(common::OB_INVALID_ID), table_id_(common::OB_INVALID_ID), + consumer_group_id_(0), dest_tenant_id_(common::OB_INVALID_ID), session_id_(common::OB_INVALID_ID), ddl_type_(share::DDL_INVALID), @@ -1772,6 +1777,7 @@ public: { tenant_id_ = common::OB_INVALID_ID; table_id_ = common::OB_INVALID_ID; + consumer_group_id_ = 0; dest_tenant_id_ = common::OB_INVALID_ID; session_id_ = common::OB_INVALID_ID; ddl_type_ = share::DDL_INVALID; @@ -1782,6 +1788,7 @@ public: public: uint64_t tenant_id_; int64_t table_id_; + int64_t consumer_group_id_; uint64_t dest_tenant_id_; uint64_t session_id_; uint64_t parallelism_; @@ -2327,6 +2334,7 @@ public: nls_timestamp_tz_format_ = other.nls_timestamp_tz_format_; sql_mode_ = other.sql_mode_; inner_sql_exec_addr_ = other.inner_sql_exec_addr_; + consumer_group_id_ = other.consumer_group_id_; } return ret; } @@ -7814,16 +7822,18 @@ public: ObDDLBuildSingleReplicaRequestArg() : tenant_id_(OB_INVALID_ID), ls_id_(), source_tablet_id_(), dest_tablet_id_(), source_table_id_(OB_INVALID_ID), dest_schema_id_(OB_INVALID_ID), schema_version_(0), snapshot_version_(0), ddl_type_(0), task_id_(0), - parallelism_(0), execution_id_(-1), tablet_task_id_(0), data_format_version_(0) {} + parallelism_(0), execution_id_(-1), tablet_task_id_(0), data_format_version_(0), + consumer_group_id_(0) {} bool is_valid() const { return OB_INVALID_ID != tenant_id_ && ls_id_.is_valid() && source_tablet_id_.is_valid() && dest_tablet_id_.is_valid() && OB_INVALID_ID != source_table_id_ && OB_INVALID_ID != dest_schema_id_ && schema_version_ > 0 && snapshot_version_ > 0 - && task_id_ > 0 && parallelism_ > 0 && tablet_task_id_ > 0 && data_format_version_ > 0; + && task_id_ > 0 && parallelism_ > 0 && tablet_task_id_ > 0 && data_format_version_ > 0 && consumer_group_id_ >= 0; } int assign(const ObDDLBuildSingleReplicaRequestArg &other); TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(source_tablet_id), K_(dest_tablet_id), K_(source_table_id), K_(dest_schema_id), K_(schema_version), K_(snapshot_version), - K_(task_id), K_(parallelism), K_(execution_id), K_(tablet_task_id), K_(data_format_version)); + K_(task_id), K_(parallelism), K_(execution_id), K_(tablet_task_id), K_(data_format_version), + K_(consumer_group_id)); public: uint64_t tenant_id_; share::ObLSID ls_id_; @@ -7839,6 +7849,7 @@ public: int64_t execution_id_; int64_t tablet_task_id_; int64_t data_format_version_; + int64_t consumer_group_id_; }; struct ObDDLBuildSingleReplicaRequestResult final diff --git a/src/share/scheduler/ob_dag_scheduler.cpp b/src/share/scheduler/ob_dag_scheduler.cpp index 2c22f21b20..7d7abed943 100644 --- a/src/share/scheduler/ob_dag_scheduler.cpp +++ b/src/share/scheduler/ob_dag_scheduler.cpp @@ -365,6 +365,7 @@ ObIDag::ObIDag(const ObDagType::ObDagTypeEnum type) dag_ret_(OB_SUCCESS), add_time_(0), start_time_(0), + consumer_group_id_(0), is_inited_(false), type_(type), priority_(OB_DAG_TYPES[type].init_dag_prio_), @@ -416,6 +417,7 @@ void ObIDag::clear_running_info() { add_time_ = 0; start_time_ = 0; + consumer_group_id_ = 0; running_task_cnt_ = 0; dag_status_ = ObDagStatus::DAG_STATUS_INITING; dag_ret_ = OB_SUCCESS; @@ -718,7 +720,7 @@ int64_t ObIDag::to_string(char *buf, const int64_t buf_len) const } else { J_OBJ_START(); J_KV(KP(this), K_(type), "name", get_dag_type_str(type_), K_(id), K_(dag_ret), K_(dag_status), - K_(start_time), K_(running_task_cnt), K_(indegree), "hash", hash()); + K_(start_time), K_(running_task_cnt), K_(indegree), K_(consumer_group_id), "hash", hash()); J_OBJ_END(); } return pos; @@ -1290,7 +1292,7 @@ ObTenantDagWorker::ObTenantDagWorker() check_period_(0), last_check_time_(0), function_type_(0), - group_id_(-1), + group_id_(INT64_MAX), tg_id_(-1), is_inited_(false) { @@ -1351,7 +1353,7 @@ void ObTenantDagWorker::destroy() check_period_ = 0; last_check_time_ = 0; function_type_ = 0; - group_id_ = -1; + group_id_ = INT64_MAX; self_ = NULL; is_inited_ = false; TG_DESTROY(tg_id_); @@ -1370,21 +1372,28 @@ void ObTenantDagWorker::resume() notify(DWS_RUNNABLE); } -int ObTenantDagWorker::set_dag_resource() +int ObTenantDagWorker::set_dag_resource(const uint64_t group_id) { int ret = OB_SUCCESS; - uint64_t group_id = 0; if (nullptr == GCTX.cgroup_ctrl_ || OB_UNLIKELY(!GCTX.cgroup_ctrl_->is_valid())) { //invalid cgroup, cannot bind thread and control resource - } else if (OB_FAIL(G_RES_MGR.get_mapping_rule_mgr().get_group_id_by_function_type(MTL_ID(), function_type_, group_id))) { - LOG_WARN("fail to get group id by function", K(ret), K(MTL_ID()), K(function_type_), K(group_id)); - } else if (group_id == group_id_) { - // group not change, do nothing - } else if (OB_FAIL(GCTX.cgroup_ctrl_->add_self_to_group(MTL_ID(), group_id))) { - LOG_WARN("bind back thread to group failed", K(ret), K(GETTID()), K(MTL_ID()), K(group_id)); - } else { - ATOMIC_SET(&group_id_, group_id); - THIS_WORKER.set_group_id(static_cast(group_id)); + } else { + uint64_t consumer_group_id = 0; + if (group_id != 0) { + //user level + consumer_group_id = group_id; + } else if (OB_FAIL(G_RES_MGR.get_mapping_rule_mgr().get_group_id_by_function_type(MTL_ID(), function_type_, consumer_group_id))) { + //function level + LOG_WARN("fail to get group id by function", K(ret), K(MTL_ID()), K(function_type_), K(consumer_group_id)); + } + if (OB_SUCC(ret) && consumer_group_id != group_id_) { + if (OB_FAIL(GCTX.cgroup_ctrl_->add_self_to_group(MTL_ID(), group_id))) { + LOG_WARN("bind back thread to group failed", K(ret), K(GETTID()), K(MTL_ID()), K(group_id)); + } else { + ATOMIC_SET(&group_id_, group_id); + THIS_WORKER.set_group_id(static_cast(group_id)); + } + } } return ret; } @@ -1417,7 +1426,7 @@ void ObTenantDagWorker::run1() COMMON_LOG(WARN, "invalid compat mode", K(ret), K(*dag)); } else { THIS_WORKER.set_compatibility_mode(compat_mode); - if (OB_FAIL(set_dag_resource())) { + if (OB_FAIL(set_dag_resource(dag->get_consumer_group_id()))) { LOG_WARN("isolate dag CPU and IOPS failed", K(ret)); } else if (OB_FAIL(task_->do_work())) { if (!dag->ignore_warning()) { diff --git a/src/share/scheduler/ob_dag_scheduler.h b/src/share/scheduler/ob_dag_scheduler.h index 4d8d9148a9..50ab1af14e 100644 --- a/src/share/scheduler/ob_dag_scheduler.h +++ b/src/share/scheduler/ob_dag_scheduler.h @@ -350,12 +350,14 @@ public: virtual int create_first_task() { return common::OB_SUCCESS; } virtual int fill_dag_key(char *buf, const int64_t buf_len) const = 0; virtual lib::Worker::CompatMode get_compat_mode() const = 0; + virtual uint64_t get_consumer_group_id() const = 0; int remove_task(ObITask &task); protected: int dag_ret_; int64_t add_time_; int64_t start_time_; + uint64_t consumer_group_id_; private: typedef common::ObDList TaskList; @@ -707,7 +709,7 @@ public: void yield(); void set_task(ObITask *task) { task_ = task; } void set_function_type(const int64_t function_type) { function_type_ = function_type; } - int set_dag_resource(); + int set_dag_resource(const uint64_t group_id); bool need_wake_up() const; ObITask *get_task() const { return task_; } static ObTenantDagWorker *self() { return self_; } diff --git a/src/sql/engine/cmd/ob_database_executor.cpp b/src/sql/engine/cmd/ob_database_executor.cpp index 45f385b633..cae9d12f82 100644 --- a/src/sql/engine/cmd/ob_database_executor.cpp +++ b/src/sql/engine/cmd/ob_database_executor.cpp @@ -44,11 +44,13 @@ int ObCreateDatabaseExecutor::execute(ObExecContext &ctx, ObCreateDatabaseStmt & ObTaskExecutorCtx *task_exec_ctx = NULL; obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL; const obrpc::ObCreateDatabaseArg &create_database_arg = stmt.get_create_database_arg(); + obrpc::ObCreateDatabaseArg &tmp_arg = const_cast(create_database_arg); ObString first_stmt; if (OB_FAIL(stmt.get_first_stmt(first_stmt))) { SQL_ENG_LOG(WARN, "fail to get first stmt" , K(ret)); } else { - const_cast(create_database_arg).ddl_stmt_str_ = first_stmt; + tmp_arg.ddl_stmt_str_ = first_stmt; + tmp_arg.consumer_group_id_ = THIS_WORKER.get_group_id(); } if (OB_FAIL(ret)) { } else if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) { @@ -135,6 +137,7 @@ int ObAlterDatabaseExecutor::execute(ObExecContext &ctx, ObAlterDatabaseStmt &st ObTaskExecutorCtx *task_exec_ctx = NULL; obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL; const obrpc::ObAlterDatabaseArg &alter_database_arg = stmt.get_alter_database_arg(); + obrpc::ObAlterDatabaseArg &tmp_arg = const_cast(alter_database_arg); ObString first_stmt; ObSQLSessionInfo *session = NULL; if (OB_ISNULL(session = ctx.get_my_session())) { @@ -143,7 +146,8 @@ int ObAlterDatabaseExecutor::execute(ObExecContext &ctx, ObAlterDatabaseStmt &st } else if (OB_FAIL(stmt.get_first_stmt(first_stmt))) { SQL_ENG_LOG(WARN, "fail to get first stmt" , K(ret)); } else { - const_cast(alter_database_arg).ddl_stmt_str_ = first_stmt; + tmp_arg.ddl_stmt_str_ = first_stmt; + tmp_arg.consumer_group_id_ = THIS_WORKER.get_group_id(); } if (OB_FAIL(ret)) { } else if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) { @@ -183,11 +187,13 @@ int ObDropDatabaseExecutor::execute(ObExecContext &ctx, ObDropDatabaseStmt &stmt ObTaskExecutorCtx *task_exec_ctx = NULL; obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL; const obrpc::ObDropDatabaseArg &drop_database_arg = stmt.get_drop_database_arg(); + obrpc::ObDropDatabaseArg &tmp_arg = const_cast(drop_database_arg); ObString first_stmt; if (OB_FAIL(stmt.get_first_stmt(first_stmt))) { SQL_ENG_LOG(WARN, "fail to get first stmt" , K(ret)); } else { - const_cast(drop_database_arg).ddl_stmt_str_ = first_stmt; + tmp_arg.ddl_stmt_str_ = first_stmt; + tmp_arg.consumer_group_id_ = THIS_WORKER.get_group_id(); } if (OB_FAIL(ret)) { } else if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) { @@ -253,13 +259,15 @@ int ObFlashBackDatabaseExecutor::execute(ObExecContext &ctx, ObFlashBackDatabase { int ret = OB_SUCCESS; const obrpc::ObFlashBackDatabaseArg &flashback_database_arg = stmt.get_flashback_database_arg(); + obrpc::ObFlashBackDatabaseArg &tmp_arg = const_cast(flashback_database_arg); ObTaskExecutorCtx *task_exec_ctx = NULL; obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL; ObString first_stmt; if (OB_FAIL(stmt.get_first_stmt(first_stmt))) { SQL_ENG_LOG(WARN, "fail to get first stmt" , K(ret)); } else { - const_cast(flashback_database_arg).ddl_stmt_str_ = first_stmt; + tmp_arg.ddl_stmt_str_ = first_stmt; + tmp_arg.consumer_group_id_ = THIS_WORKER.get_group_id(); } if (OB_FAIL(ret)) { } else if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) { @@ -280,13 +288,15 @@ int ObPurgeDatabaseExecutor::execute(ObExecContext &ctx, ObPurgeDatabaseStmt &st { int ret = OB_SUCCESS; const obrpc::ObPurgeDatabaseArg &purge_database_arg = stmt.get_purge_database_arg(); + obrpc::ObPurgeDatabaseArg &tmp_arg = const_cast(purge_database_arg); ObTaskExecutorCtx *task_exec_ctx = NULL; obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL; ObString first_stmt; if (OB_FAIL(stmt.get_first_stmt(first_stmt))) { SQL_ENG_LOG(WARN, "fail to get first stmt" , K(ret)); } else { - const_cast(purge_database_arg).ddl_stmt_str_ = first_stmt; + tmp_arg.ddl_stmt_str_ = first_stmt; + tmp_arg.consumer_group_id_ = THIS_WORKER.get_group_id(); } if (OB_FAIL(ret)) { } else if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) { diff --git a/src/sql/engine/cmd/ob_index_executor.cpp b/src/sql/engine/cmd/ob_index_executor.cpp index 13678a96e9..d073b03bde 100644 --- a/src/sql/engine/cmd/ob_index_executor.cpp +++ b/src/sql/engine/cmd/ob_index_executor.cpp @@ -81,6 +81,7 @@ int ObCreateIndexExecutor::execute(ObExecContext &ctx, ObCreateIndexStmt &stmt) //impossible } else if (FALSE_IT(create_index_arg.is_inner_ = my_session->is_inner())) { } else if (FALSE_IT(create_index_arg.parallelism_ = stmt.get_parallelism())) { + } else if (FALSE_IT(create_index_arg.consumer_group_id_ = THIS_WORKER.get_group_id())) { } else if (OB_FAIL(common_rpc_proxy->create_index(create_index_arg, res))) { //send the signal of creating index to rs LOG_WARN("rpc proxy create index failed", K(create_index_arg), "dst", common_rpc_proxy->get_server(), K(ret)); @@ -336,14 +337,16 @@ int ObDropIndexExecutor::execute(ObExecContext &ctx, ObDropIndexStmt &stmt) ObTaskExecutorCtx *task_exec_ctx = NULL; obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL; const obrpc::ObDropIndexArg &drop_index_arg = stmt.get_drop_index_arg(); + obrpc::ObDropIndexArg &tmp_arg = const_cast(drop_index_arg); ObSQLSessionInfo *my_session = ctx.get_my_session(); ObString first_stmt; ObDropIndexRes res; - const_cast(drop_index_arg).is_add_to_scheduler_ = true; + tmp_arg.is_add_to_scheduler_ = true; if (OB_FAIL(stmt.get_first_stmt(first_stmt))) { LOG_WARN("fail to get first stmt" , K(ret)); } else { - const_cast(drop_index_arg).ddl_stmt_str_ = first_stmt; + tmp_arg.ddl_stmt_str_ = first_stmt; + } if (OB_FAIL(ret)) { } else if (NULL == my_session) { @@ -358,8 +361,9 @@ int ObDropIndexExecutor::execute(ObExecContext &ctx, ObDropIndexStmt &stmt) ret = OB_ERR_UNEXPECTED; LOG_WARN("common rpc proxy should not be null", K(ret)); } else if (OB_INVALID_ID == drop_index_arg.session_id_ - && FALSE_IT(const_cast(drop_index_arg).session_id_ = my_session->get_sessid_for_table())) { + && FALSE_IT(tmp_arg.session_id_ = my_session->get_sessid_for_table())) { //impossible + } else if (FALSE_IT(tmp_arg.consumer_group_id_ = THIS_WORKER.get_group_id())) { } else if (OB_FAIL(common_rpc_proxy->drop_index(drop_index_arg, res))) { LOG_WARN("rpc proxy drop index failed", "dst", common_rpc_proxy->get_server(), K(ret)); } else if (OB_FAIL(wait_drop_index_finish(res.tenant_id_, res.task_id_, *my_session))) { diff --git a/src/sql/engine/cmd/ob_table_executor.cpp b/src/sql/engine/cmd/ob_table_executor.cpp index ff3f09c90f..6192e677ed 100644 --- a/src/sql/engine/cmd/ob_table_executor.cpp +++ b/src/sql/engine/cmd/ob_table_executor.cpp @@ -459,6 +459,7 @@ int ObCreateTableExecutor::execute(ObExecContext &ctx, ObCreateTableStmt &stmt) LOG_WARN("get first statement failed", K(ret)); } else { create_table_arg.is_inner_ = my_session->is_inner(); + create_table_arg.consumer_group_id_ = THIS_WORKER.get_group_id(); const_cast(create_table_arg).ddl_stmt_str_ = first_stmt; if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) { ret = OB_NOT_INIT; @@ -779,6 +780,7 @@ int ObAlterTableExecutor::execute(ObExecContext &ctx, ObAlterTableStmt &stmt) } else if (FALSE_IT(alter_table_arg.sql_mode_ = my_session->get_sql_mode())) { // do nothing } else if (FALSE_IT(alter_table_arg.parallelism_ = stmt.get_parallelism())) { + } else if (FALSE_IT(alter_table_arg.consumer_group_id_ = THIS_WORKER.get_group_id())) { } else if (OB_FAIL(check_alter_partition(ctx, stmt, alter_table_arg))) { LOG_WARN("check alter partition failed", K(ret)); } else if (OB_FAIL(alter_table_arg.alter_table_schema_.check_if_oracle_compat_mode(is_oracle_mode))) { @@ -1586,6 +1588,7 @@ int ObDropTableExecutor::execute(ObExecContext &ctx, ObDropTableStmt &stmt) obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL; obrpc::ObDDLRes res; const obrpc::ObDropTableArg &drop_table_arg = stmt.get_drop_table_arg(); + obrpc::ObDropTableArg &tmp_arg = const_cast(drop_table_arg); ObString first_stmt; ObSQLSessionInfo *my_session = NULL; int64_t foreign_key_checks = 0; @@ -1593,7 +1596,8 @@ int ObDropTableExecutor::execute(ObExecContext &ctx, ObDropTableStmt &stmt) LOG_WARN("get first statement failed", K(ret)); } else { int64_t affected_rows = 0; - const_cast(drop_table_arg).ddl_stmt_str_ = first_stmt; + tmp_arg.ddl_stmt_str_ = first_stmt; + tmp_arg.consumer_group_id_ = THIS_WORKER.get_group_id(); my_session = ctx.get_my_session(); if (NULL == my_session) { ret = OB_ERR_UNEXPECTED; @@ -1607,12 +1611,11 @@ int ObDropTableExecutor::execute(ObExecContext &ctx, ObDropTableStmt &stmt) ret = OB_ERR_UNEXPECTED; LOG_WARN("common rpc proxy should not be null", K(ret)); } else if (OB_INVALID_ID == drop_table_arg.session_id_ - && FALSE_IT(const_cast(drop_table_arg).session_id_ = my_session->get_sessid_for_table())) { + && FALSE_IT(tmp_arg.session_id_ = my_session->get_sessid_for_table())) { //impossible } else if (FALSE_IT(my_session->get_foreign_key_checks(foreign_key_checks))) { - } else if (FALSE_IT(const_cast(drop_table_arg).foreign_key_checks_ = is_oracle_mode() || (is_mysql_mode() && foreign_key_checks))) { - } else if (FALSE_IT(const_cast(drop_table_arg).compat_mode_ = - ORACLE_MODE == my_session->get_compatibility_mode() ? + } else if (FALSE_IT(tmp_arg.foreign_key_checks_ = is_oracle_mode() || (is_mysql_mode() && foreign_key_checks))) { + } else if (FALSE_IT(tmp_arg.compat_mode_ = ORACLE_MODE == my_session->get_compatibility_mode() ? lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL)) { } else if (OB_FAIL(common_rpc_proxy->drop_table(drop_table_arg, res))) { LOG_WARN("rpc proxy drop table failed", K(ret), "dst", common_rpc_proxy->get_server()); @@ -1670,13 +1673,15 @@ int ObTruncateTableExecutor::execute(ObExecContext &ctx, ObTruncateTableStmt &st { int ret = OB_SUCCESS; const obrpc::ObTruncateTableArg &truncate_table_arg = stmt.get_truncate_table_arg(); + obrpc::ObTruncateTableArg &tmp_arg = const_cast(truncate_table_arg); ObString first_stmt; obrpc::ObDDLRes res; ObSQLSessionInfo *my_session = ctx.get_my_session(); if (OB_FAIL(stmt.get_first_stmt(first_stmt))) { LOG_WARN("get first statement failed", K(ret)); } else { - const_cast(truncate_table_arg).ddl_stmt_str_ = first_stmt; + tmp_arg.ddl_stmt_str_ = first_stmt; + tmp_arg.consumer_group_id_ = THIS_WORKER.get_group_id(); ObTaskExecutorCtx *task_exec_ctx = NULL; obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL; @@ -1692,14 +1697,14 @@ int ObTruncateTableExecutor::execute(ObExecContext &ctx, ObTruncateTableStmt &st ret = OB_ERR_UNEXPECTED; LOG_WARN("failed to get my session", K(ret), K(ctx)); } else if (OB_INVALID_ID == truncate_table_arg.session_id_ - && FALSE_IT(const_cast(truncate_table_arg).session_id_ = my_session->get_sessid_for_table())) { + && FALSE_IT(tmp_arg.session_id_ = my_session->get_sessid_for_table())) { //impossible } else if (!stmt.is_truncate_oracle_temp_table()) { int64_t foreign_key_checks = 0; share::schema::ObSchemaGetterGuard schema_guard; my_session->get_foreign_key_checks(foreign_key_checks); - const_cast(truncate_table_arg).foreign_key_checks_ = is_oracle_mode() || (is_mysql_mode() && foreign_key_checks); - const_cast(truncate_table_arg).compat_mode_ = ORACLE_MODE == my_session->get_compatibility_mode() + tmp_arg.foreign_key_checks_ = is_oracle_mode() || (is_mysql_mode() && foreign_key_checks); + tmp_arg.compat_mode_ = ORACLE_MODE == my_session->get_compatibility_mode() ? lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL; int64_t affected_rows = 0; uint64_t compat_version = 0; @@ -1819,13 +1824,14 @@ int ObCreateTableLikeExecutor::execute(ObExecContext &ctx, ObCreateTableLikeStmt { int ret = OB_SUCCESS; const obrpc::ObCreateTableLikeArg &create_table_like_arg = stmt.get_create_table_like_arg(); + obrpc::ObCreateTableLikeArg &tmp_arg = const_cast(create_table_like_arg); ObString first_stmt; if (OB_FAIL(stmt.get_first_stmt(first_stmt))) { LOG_WARN("get first statement failed", K(ret)); } else { - const_cast(create_table_like_arg).ddl_stmt_str_ = first_stmt; - const_cast(create_table_like_arg).session_id_ = - ctx.get_my_session()->get_sessid_for_table(); + tmp_arg.ddl_stmt_str_ = first_stmt; + tmp_arg.consumer_group_id_ = THIS_WORKER.get_group_id(); + tmp_arg.session_id_ = ctx.get_my_session()->get_sessid_for_table(); ObTaskExecutorCtx *task_exec_ctx = NULL; obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL; if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) { @@ -1847,11 +1853,13 @@ int ObFlashBackTableFromRecyclebinExecutor::execute(ObExecContext &ctx, ObFlashB { int ret = OB_SUCCESS; const obrpc::ObFlashBackTableFromRecyclebinArg &flashback_table_arg = stmt.get_flashback_table_arg(); + obrpc::ObFlashBackTableFromRecyclebinArg &tmp_arg = const_cast(flashback_table_arg); ObString first_stmt; if (OB_FAIL(stmt.get_first_stmt(first_stmt))) { LOG_WARN("get first statement failed", K(ret)); } else { - const_cast(flashback_table_arg).ddl_stmt_str_ = first_stmt; + tmp_arg.ddl_stmt_str_ = first_stmt; + tmp_arg.consumer_group_id_ = THIS_WORKER.get_group_id(); ObTaskExecutorCtx *task_exec_ctx = NULL; obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL; if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) { @@ -1872,6 +1880,7 @@ int ObFlashBackTableFromRecyclebinExecutor::execute(ObExecContext &ctx, ObFlashB int ObFlashBackTableToScnExecutor::execute(ObExecContext &ctx, ObFlashBackTableToScnStmt &stmt) { int ret = OB_SUCCESS; obrpc::ObFlashBackTableToScnArg &arg = stmt.flashback_table_to_scn_arg_; + arg.consumer_group_id_ = THIS_WORKER.get_group_id(); RowDesc row_desc; ObTempExpr *temp_expr = NULL; CK(OB_NOT_NULL(ctx.get_sql_ctx())); @@ -1935,11 +1944,13 @@ int ObPurgeTableExecutor::execute(ObExecContext &ctx, ObPurgeTableStmt &stmt) { int ret = OB_SUCCESS; const obrpc::ObPurgeTableArg &purge_table_arg = stmt.get_purge_table_arg(); + obrpc::ObPurgeTableArg &tmp_arg = const_cast(purge_table_arg); ObString first_stmt; if (OB_FAIL(stmt.get_first_stmt(first_stmt))) { LOG_WARN("get first statement failed", K(ret)); } else { - const_cast(purge_table_arg).ddl_stmt_str_ = first_stmt; + tmp_arg.ddl_stmt_str_ = first_stmt; + tmp_arg.consumer_group_id_ = THIS_WORKER.get_group_id(); ObTaskExecutorCtx *task_exec_ctx = NULL; obrpc::ObCommonRpcProxy *common_rpc_proxy = NULL; @@ -1967,6 +1978,7 @@ int ObOptimizeTableExecutor::execute(ObExecContext &ctx, ObOptimizeTableStmt &st LOG_WARN("fail to get first stmt", K(ret)); } else { arg.ddl_stmt_str_ = first_stmt; + arg.consumer_group_id_ = THIS_WORKER.get_group_id(); ObTaskExecutorCtx *task_exec_ctx = nullptr; obrpc::ObCommonRpcProxy *common_rpc_proxy = nullptr; if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) { @@ -1997,6 +2009,7 @@ int ObOptimizeTenantExecutor::execute(ObExecContext &ctx, ObOptimizeTenantStmt & LOG_WARN("error unexpected, my session must not be NULL", K(ret)); } else { arg.ddl_stmt_str_ = first_stmt; + arg.consumer_group_id_ = THIS_WORKER.get_group_id(); ObTaskExecutorCtx *task_exec_ctx = nullptr; obrpc::ObCommonRpcProxy *common_rpc_proxy = nullptr; const observer::ObGlobalContext &gctx = observer::ObServer::get_instance().get_gctx(); @@ -2082,6 +2095,7 @@ int ObOptimizeAllExecutor::execute(ObExecContext &ctx, ObOptimizeAllStmt &stmt) LOG_WARN("fail to get first stmt", K(ret)); } else { arg.ddl_stmt_str_ = first_stmt; + arg.consumer_group_id_ = THIS_WORKER.get_group_id(); ObTaskExecutorCtx *task_exec_ctx = nullptr; obrpc::ObCommonRpcProxy *common_rpc_proxy = nullptr; ObSchemaGetterGuard schema_guard; diff --git a/src/storage/backup/ob_backup_task.h b/src/storage/backup/ob_backup_task.h index dd7d0f22cb..42fcac540c 100644 --- a/src/storage/backup/ob_backup_task.h +++ b/src/storage/backup/ob_backup_task.h @@ -243,6 +243,7 @@ public: virtual int fill_dag_key(char *buf, const int64_t buf_len) const override; virtual int64_t hash() const override; virtual lib::Worker::CompatMode get_compat_mode() const override; + virtual uint64_t get_consumer_group_id() const override { return consumer_group_id_; } INHERIT_TO_STRING_KV("ObIDag", ObIDag, K_(is_inited)); private: @@ -266,6 +267,7 @@ public: virtual int fill_dag_key(char *buf, const int64_t buf_len) const override; virtual int64_t hash() const override; virtual lib::Worker::CompatMode get_compat_mode() const override; + virtual uint64_t get_consumer_group_id() const override { return consumer_group_id_; } INHERIT_TO_STRING_KV("ObIDag", ObIDag, K_(is_inited)); private: @@ -296,6 +298,7 @@ public: virtual int64_t hash() const override; virtual bool check_can_schedule() override; virtual lib::Worker::CompatMode get_compat_mode() const override; + virtual uint64_t get_consumer_group_id() const override { return consumer_group_id_; } INHERIT_TO_STRING_KV("ObIDag", ObIDag, K_(is_inited)); private: @@ -321,6 +324,7 @@ public: virtual int fill_comment(char *buf, const int64_t buf_len) const override; virtual int fill_dag_key(char *buf, const int64_t buf_len) const override; virtual lib::Worker::CompatMode get_compat_mode() const override; + virtual uint64_t get_consumer_group_id() const override { return consumer_group_id_; } INHERIT_TO_STRING_KV("ObIDag", ObIDag, K_(param)); protected: @@ -366,6 +370,7 @@ public: virtual bool operator==(const ObIDag &other) const override; virtual int64_t hash() const override; virtual lib::Worker::CompatMode get_compat_mode() const override; + virtual uint64_t get_consumer_group_id() const override { return consumer_group_id_; } private: int get_file_id_list_(common::ObIArray &file_id_list); @@ -397,6 +402,7 @@ public: virtual bool operator==(const ObIDag &other) const override; virtual int64_t hash() const override; virtual lib::Worker::CompatMode get_compat_mode() const override; + virtual uint64_t get_consumer_group_id() const override { return consumer_group_id_; } private: bool is_inited_; diff --git a/src/storage/backup/ob_ls_backup_clean_mgr.h b/src/storage/backup/ob_ls_backup_clean_mgr.h index 63dad667b8..bbbf58ef3d 100644 --- a/src/storage/backup/ob_ls_backup_clean_mgr.h +++ b/src/storage/backup/ob_ls_backup_clean_mgr.h @@ -98,6 +98,8 @@ public: virtual int fill_dag_key(char *buf, const int64_t buf_len) const override; virtual lib::Worker::CompatMode get_compat_mode() const override { return lib::Worker::CompatMode::MYSQL; } + virtual uint64_t get_consumer_group_id() const override + { return consumer_group_id_; } int create_first_task(); INHERIT_TO_STRING_KV("ObIDag", ObIDag, KP(this), K_(param), K_(result)); diff --git a/src/storage/compaction/ob_tablet_merge_task.h b/src/storage/compaction/ob_tablet_merge_task.h index 0181e5db61..43f47adc20 100644 --- a/src/storage/compaction/ob_tablet_merge_task.h +++ b/src/storage/compaction/ob_tablet_merge_task.h @@ -218,7 +218,7 @@ public: int get_tablet_and_compat_mode(); virtual int64_t to_string(char* buf, const int64_t buf_len) const override; virtual lib::Worker::CompatMode get_compat_mode() const override { return compat_mode_; } - + virtual uint64_t get_consumer_group_id() const override { return consumer_group_id_; } static int generate_merge_task( ObBasicTabletMergeDag &merge_dag, ObTabletMergeCtx &ctx, diff --git a/src/storage/ddl/ob_build_index_task.h b/src/storage/ddl/ob_build_index_task.h index 067d1fc552..36b1dd3b69 100644 --- a/src/storage/ddl/ob_build_index_task.h +++ b/src/storage/ddl/ob_build_index_task.h @@ -183,6 +183,8 @@ public: virtual int fill_dag_key(char *buf, const int64_t buf_len) const override; virtual lib::Worker::CompatMode get_compat_mode() const override { return compat_mode_; } + virtual uint64_t get_consumer_group_id() const override + { return consumer_group_id_; } private: bool is_inited_; uint64_t tenant_id_; diff --git a/src/storage/ddl/ob_complement_data_task.cpp b/src/storage/ddl/ob_complement_data_task.cpp index f96fe723d1..e9d579088d 100644 --- a/src/storage/ddl/ob_complement_data_task.cpp +++ b/src/storage/ddl/ob_complement_data_task.cpp @@ -331,6 +331,7 @@ int ObComplementDataDag::init(const ObDDLBuildSingleReplicaRequestArg &arg) ret = OB_ERR_UNEXPECTED; LOG_WARN("error unexpected", K(ret), K(param_)); } else { + consumer_group_id_ = arg.consumer_group_id_; is_inited_ = true; } LOG_INFO("finish to init complement data dag", K(ret), K(param_)); diff --git a/src/storage/ddl/ob_complement_data_task.h b/src/storage/ddl/ob_complement_data_task.h index e34bdd7981..56e9011bee 100644 --- a/src/storage/ddl/ob_complement_data_task.h +++ b/src/storage/ddl/ob_complement_data_task.h @@ -150,6 +150,8 @@ public: int fill_dag_key(char *buf, const int64_t buf_len) const override; virtual lib::Worker::CompatMode get_compat_mode() const override { return param_.compat_mode_; } + virtual uint64_t get_consumer_group_id() const override + { return consumer_group_id_; } virtual int create_first_task() override; // report replica build status to RS. int report_replica_build_status(); diff --git a/src/storage/ddl/ob_ddl_merge_task.h b/src/storage/ddl/ob_ddl_merge_task.h index 7d490ed336..61ff58ae8a 100644 --- a/src/storage/ddl/ob_ddl_merge_task.h +++ b/src/storage/ddl/ob_ddl_merge_task.h @@ -84,6 +84,8 @@ public: virtual bool ignore_warning() override; virtual lib::Worker::CompatMode get_compat_mode() const override { return compat_mode_; } + virtual uint64_t get_consumer_group_id() const override + { return consumer_group_id_; } private: bool is_inited_; ObDDLTableMergeDagParam ddl_param_; diff --git a/src/storage/high_availability/ob_ls_remove_member_dag.h b/src/storage/high_availability/ob_ls_remove_member_dag.h index 0ec2169888..e537d2aafc 100644 --- a/src/storage/high_availability/ob_ls_remove_member_dag.h +++ b/src/storage/high_availability/ob_ls_remove_member_dag.h @@ -74,6 +74,7 @@ public: virtual int fill_dag_key(char *buf, const int64_t buf_len) const override; virtual int64_t to_string(char* buf, const int64_t buf_len) const override; virtual lib::Worker::CompatMode get_compat_mode() const override { return lib::Worker::CompatMode::MYSQL; } + virtual uint64_t get_consumer_group_id() const override { return consumer_group_id_; } virtual int init_by_param(const share::ObIDagInitParam *param) override; virtual int create_first_task() override; protected: diff --git a/src/storage/high_availability/ob_storage_ha_dag.h b/src/storage/high_availability/ob_storage_ha_dag.h index 470a6ea5ae..3a929e2018 100644 --- a/src/storage/high_availability/ob_storage_ha_dag.h +++ b/src/storage/high_availability/ob_storage_ha_dag.h @@ -145,6 +145,8 @@ public: virtual int report_result(); virtual lib::Worker::CompatMode get_compat_mode() const override { return compat_mode_; } + virtual uint64_t get_consumer_group_id() const override + { return consumer_group_id_; } ObStorageHADagType get_sub_type() const { return sub_type_; } ObIHADagNetCtx *get_ha_dag_net_ctx() const { return ha_dag_net_ctx_; } diff --git a/unittest/share/scheduler/test_dag_net_in_dag_scheduler.cpp b/unittest/share/scheduler/test_dag_net_in_dag_scheduler.cpp index 31be0dfdff..c44adc0845 100644 --- a/unittest/share/scheduler/test_dag_net_in_dag_scheduler.cpp +++ b/unittest/share/scheduler/test_dag_net_in_dag_scheduler.cpp @@ -135,6 +135,8 @@ public: virtual int fill_dag_key(char *buf,const int64_t size) const override { UNUSEDx(buf, size); return OB_SUCCESS; } virtual lib::Worker::CompatMode get_compat_mode() const override { return lib::Worker::CompatMode::MYSQL; } + virtual uint64_t get_consumer_group_id() const override + { return consumer_group_id_; } INHERIT_TO_STRING_KV("ObIDag", ObIDag, K_(is_inited), K_(type), K_(id), K(task_list_.get_size()), K_(dag_ret)); diff --git a/unittest/share/scheduler/test_dag_scheduler.cpp b/unittest/share/scheduler/test_dag_scheduler.cpp index bbf6c63844..19f48f486f 100644 --- a/unittest/share/scheduler/test_dag_scheduler.cpp +++ b/unittest/share/scheduler/test_dag_scheduler.cpp @@ -438,6 +438,8 @@ public: } virtual lib::Worker::CompatMode get_compat_mode() const override { return lib::Worker::CompatMode::MYSQL; } + virtual uint64_t get_consumer_group_id() const override + { return consumer_group_id_; } VIRTUAL_TO_STRING_KV(K_(is_inited), K_(type), K_(id), K(task_list_.get_size())); protected: int64_t id_;