diff --git a/src/rootserver/ddl_task/ob_column_redefinition_task.cpp b/src/rootserver/ddl_task/ob_column_redefinition_task.cpp index 6846bb33e8..d447eb724b 100644 --- a/src/rootserver/ddl_task/ob_column_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_column_redefinition_task.cpp @@ -97,7 +97,7 @@ int ObColumnRedefinitionTask::init(const ObDDLTaskRecord &task_record) } else if (!task_record.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret)); - } else if (OB_FAIL(deserlize_params_from_message(task_record.message_.ptr(), task_record.message_.length(), pos))) { + } else if (OB_FAIL(deserlize_params_from_message(task_record.tenant_id_, task_record.message_.ptr(), task_record.message_.length(), pos))) { LOG_WARN("deserialize params from message failed", K(ret)); } else if (OB_FAIL(set_ddl_stmt_str(task_record.ddl_stmt_str_))) { LOG_WARN("set ddl stmt str failed", K(ret)); @@ -494,17 +494,19 @@ int ObColumnRedefinitionTask::serialize_params_to_message(char *buf, const int64 return ret; } -int ObColumnRedefinitionTask::deserlize_params_from_message(const char *buf, const int64_t data_len, int64_t &pos) +int ObColumnRedefinitionTask::deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t data_len, int64_t &pos) { int ret = OB_SUCCESS; obrpc::ObAlterTableArg tmp_arg; - if (OB_UNLIKELY(nullptr == buf || data_len <= 0)) { + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || data_len <= 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret), KP(buf), K(data_len)); + LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(data_len)); } else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &task_version_))) { LOG_WARN("fail to deserialize task version", K(ret)); } else if (OB_FAIL(tmp_arg.deserialize(buf, data_len, pos))) { LOG_WARN("serialize table failed", K(ret)); + } else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) { + LOG_WARN("replace user tenant id failed", K(ret), K(tenant_id), K(tmp_arg)); } else if (OB_FAIL(deep_copy_table_arg(allocator_, tmp_arg, alter_table_arg_))) { LOG_WARN("deep copy table arg failed", K(ret)); } else { diff --git a/src/rootserver/ddl_task/ob_column_redefinition_task.h b/src/rootserver/ddl_task/ob_column_redefinition_task.h index d2c6b3141d..5013bf4dcb 100644 --- a/src/rootserver/ddl_task/ob_column_redefinition_task.h +++ b/src/rootserver/ddl_task/ob_column_redefinition_task.h @@ -63,7 +63,7 @@ private: int copy_table_constraints(); int copy_table_foreign_keys(); virtual int serialize_params_to_message(char *buf, const int64_t buf_len, int64_t &pos) const override; - virtual int deserlize_params_from_message(const char *buf, const int64_t data_len, int64_t &pos) override; + virtual int deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t data_len, int64_t &pos) override; virtual int64_t get_serialize_param_size() const override; private: static const int64_t OB_COLUMN_REDEFINITION_TASK_VERSION = 1L; diff --git a/src/rootserver/ddl_task/ob_constraint_task.cpp b/src/rootserver/ddl_task/ob_constraint_task.cpp index a472b554b8..40556791f0 100644 --- a/src/rootserver/ddl_task/ob_constraint_task.cpp +++ b/src/rootserver/ddl_task/ob_constraint_task.cpp @@ -550,7 +550,7 @@ int ObConstraintTask::init(const ObDDLTaskRecord &task_record) } else if (OB_ISNULL(root_service)) { ret = OB_ERR_SYS; LOG_WARN("error sys, root service must not be nullptr", K(ret)); - } else if (OB_FAIL(deserlize_params_from_message(task_record.message_.ptr(), task_record.message_.length(), pos))) { + } else if (OB_FAIL(deserlize_params_from_message(task_record.tenant_id_, task_record.message_.ptr(), task_record.message_.length(), pos))) { LOG_WARN("deserialize params from message failed", K(ret)); } else { object_id_ = table_id; @@ -1819,17 +1819,19 @@ int ObConstraintTask::serialize_params_to_message(char *buf, const int64_t buf_l return ret; } -int ObConstraintTask::deserlize_params_from_message(const char *buf, const int64_t data_len, int64_t &pos) +int ObConstraintTask::deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t data_len, int64_t &pos) { int ret = OB_SUCCESS; ObAlterTableArg tmp_arg; - if (OB_UNLIKELY(nullptr == buf || data_len <= 0)) { + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || data_len <= 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret), KP(buf), K(data_len)); + LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(data_len)); } else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &task_version_))) { LOG_WARN("fail to deserialize task version", K(ret)); } else if (OB_FAIL(tmp_arg.deserialize(buf, data_len, pos))) { LOG_WARN("serialize table failed", K(ret)); + } else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) { + LOG_WARN("replace user tenant id failed", K(ret), K(tenant_id), K(tmp_arg)); } else if (OB_FAIL(deep_copy_table_arg(allocator_, tmp_arg, alter_table_arg_))) { LOG_WARN("deep copy table arg failed", K(ret)); } diff --git a/src/rootserver/ddl_task/ob_constraint_task.h b/src/rootserver/ddl_task/ob_constraint_task.h index b5efd723c3..5c36075c67 100644 --- a/src/rootserver/ddl_task/ob_constraint_task.h +++ b/src/rootserver/ddl_task/ob_constraint_task.h @@ -104,7 +104,7 @@ public: virtual int process() override; int update_check_constraint_finish(const int ret_code); virtual int serialize_params_to_message(char *buf, const int64_t buf_size, int64_t &pos) const override; - virtual int deserlize_params_from_message(const char *buf, const int64_t buf_size, int64_t &pos) override; + virtual int deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t buf_size, int64_t &pos) override; virtual int64_t get_serialize_param_size() const override; virtual void flt_set_task_span_tag() const override; virtual void flt_set_status_span_tag() const override; diff --git a/src/rootserver/ddl_task/ob_ddl_retry_task.cpp b/src/rootserver/ddl_task/ob_ddl_retry_task.cpp index 9729df4d75..ed59a0cfe5 100644 --- a/src/rootserver/ddl_task/ob_ddl_retry_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_retry_task.cpp @@ -216,7 +216,7 @@ int ObDDLRetryTask::init(const ObDDLTaskRecord &task_record) is_schema_change_done_ = false; // do not worry about it, check_schema_change_done will correct it. if (nullptr != task_record.message_) { int64_t pos = 0; - if (OB_FAIL(deserlize_params_from_message(task_record.message_.ptr(), task_record.message_.length(), pos))) { + if (OB_FAIL(deserlize_params_from_message(task_record.tenant_id_, task_record.message_.ptr(), task_record.message_.length(), pos))) { LOG_WARN("fail to deserialize params from message", K(ret)); } } @@ -614,18 +614,20 @@ int ObDDLRetryTask::serialize_params_to_message(char *buf, const int64_t buf_siz return ret; } -int ObDDLRetryTask::deserlize_params_from_message(const char *buf, const int64_t buf_size, int64_t &pos) +int ObDDLRetryTask::deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t buf_size, int64_t &pos) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(nullptr == buf || buf_size <= 0)) { + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || buf_size <= 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret), KP(buf), K(buf_size)); + LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(buf_size)); } else if (OB_FAIL(serialization::decode_i64(buf, buf_size, pos, &task_version_))) { LOG_WARN("fail to deserialize task version", K(ret)); } else if (ObDDLType::DDL_DROP_DATABASE == task_type_) { obrpc::ObDropDatabaseArg tmp_arg; if (OB_FAIL(tmp_arg.deserialize(buf, buf_size, pos))) { LOG_WARN("serialize table failed", K(ret)); + } else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) { + LOG_WARN("replace user tenant id failed", K(ret), K(tenant_id), K(tmp_arg)); } else if (OB_FAIL(deep_copy_ddl_arg(allocator_, task_type_, &tmp_arg))) { LOG_WARN("deep copy table arg failed", K(ret)); } @@ -633,6 +635,8 @@ int ObDDLRetryTask::deserlize_params_from_message(const char *buf, const int64_t obrpc::ObDropTableArg tmp_arg; if (OB_FAIL(tmp_arg.deserialize(buf, buf_size, pos))) { LOG_WARN("serialize table failed", K(ret)); + } else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) { + LOG_WARN("replace user tenant id failed", K(ret), K(tenant_id), K(tmp_arg)); } else if (OB_FAIL(deep_copy_ddl_arg(allocator_, task_type_, &tmp_arg))) { LOG_WARN("deep copy table arg failed", K(ret)); } @@ -640,6 +644,8 @@ int ObDDLRetryTask::deserlize_params_from_message(const char *buf, const int64_t obrpc::ObTruncateTableArg tmp_arg; if (OB_FAIL(tmp_arg.deserialize(buf, buf_size, pos))) { LOG_WARN("serialize table failed", K(ret)); + } else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) { + LOG_WARN("replace user tenant id failed", K(ret), K(tenant_id), K(tmp_arg)); } else if (OB_FAIL(deep_copy_ddl_arg(allocator_, task_type_, &tmp_arg))) { LOG_WARN("deep copy table arg failed", K(ret)); } @@ -650,6 +656,8 @@ int ObDDLRetryTask::deserlize_params_from_message(const char *buf, const int64_t obrpc::ObAlterTableArg tmp_arg; if (OB_FAIL(tmp_arg.deserialize(buf, buf_size, pos))) { LOG_WARN("serialize table failed", K(ret)); + } else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) { + LOG_WARN("replace user tenant id failed", K(ret), K(tenant_id), K(tmp_arg)); } else if (OB_FAIL(deep_copy_ddl_arg(allocator_, task_type_, &tmp_arg))) { LOG_WARN("deep copy table arg failed", K(ret)); } diff --git a/src/rootserver/ddl_task/ob_ddl_retry_task.h b/src/rootserver/ddl_task/ob_ddl_retry_task.h index 8f616890e6..c7f8594778 100644 --- a/src/rootserver/ddl_task/ob_ddl_retry_task.h +++ b/src/rootserver/ddl_task/ob_ddl_retry_task.h @@ -39,7 +39,7 @@ public: virtual int process() override; virtual bool is_valid() const override; virtual int serialize_params_to_message(char *buf, const int64_t buf_size, int64_t &pos) const override; - virtual int deserlize_params_from_message(const char *buf, const int64_t buf_size, int64_t &pos) override; + virtual int deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t buf_size, int64_t &pos) override; virtual int64_t get_serialize_param_size() const override; static int update_task_status_wait_child_task_finish( common::ObMySQLTransaction &trans, diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp index 38e8b49950..9a10e8a713 100644 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp @@ -1524,6 +1524,18 @@ int ObDDLScheduler::insert_task_record( return ret; } +static bool is_tenant_primary(const ObIArray &primary_tenant_ids, const uint64_t tenant_id) +{ + bool is_primary = false; + for (int64_t i = 0; i < primary_tenant_ids.count(); ++i) { + if (primary_tenant_ids.at(i) == tenant_id) { + is_primary = true; + break; + } + } + return is_primary; +} + int ObDDLScheduler::recover_task() { int ret = OB_SUCCESS; @@ -1533,12 +1545,15 @@ int ObDDLScheduler::recover_task() } else { ObSqlString sql_string; ObArray task_records; + ObArray primary_tenant_ids; ObArenaAllocator allocator(lib::ObLabel("DdlTasRecord")); share::schema::ObMultiVersionSchemaService &schema_service = root_service_->get_schema_service(); if (OB_FAIL(ObDDLTaskRecordOperator::get_all_ddl_task_record(root_service_->get_sql_proxy(), allocator, task_records))) { LOG_WARN("get task record failed", K(ret), K(sql_string)); + } else if (OB_FAIL(ObAllTenantInfoProxy::get_primary_tenant_ids(&root_service_->get_sql_proxy(), primary_tenant_ids))) { + LOG_WARN("get primary tenant id failed", K(ret)); } - LOG_INFO("start processing ddl recovery", K(task_records)); + LOG_INFO("start processing ddl recovery", K(task_records), K(primary_tenant_ids)); for (int64_t i = 0; OB_SUCC(ret) && i < task_records.count(); ++i) { const ObDDLTaskRecord &cur_record = task_records.at(i); int64_t tenant_schema_version = 0; @@ -1547,8 +1562,11 @@ int ObDDLScheduler::recover_task() ObMySQLTransaction trans; if (OB_FAIL(schema_service.get_tenant_schema_version(cur_record.tenant_id_, tenant_schema_version))) { LOG_WARN("failed to get tenant schema version", K(ret), K(cur_record)); + } else if (!is_tenant_primary(primary_tenant_ids, cur_record.tenant_id_)) { + LOG_INFO("tenant not primary, skip schedule ddl task", K(cur_record)); } else if (tenant_schema_version < cur_record.schema_version_) { // schema has not publish, by pass now + LOG_INFO("skip schedule ddl task, because tenant schema version too old", K(tenant_schema_version), K(cur_record)); } else if (OB_FAIL(trans.start(&root_service_->get_sql_proxy(), cur_record.tenant_id_))) { LOG_WARN("start transaction failed", K(ret)); } else if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans, @@ -1616,21 +1634,14 @@ int ObDDLScheduler::schedule_ddl_task(const ObDDLTaskRecord &record) LOG_WARN("ddl task record is invalid", K(ret), K(record)); } else { switch (record.ddl_type_) { - case ObDDLType::DDL_CREATE_INDEX: { - if (OB_FAIL(schedule_build_index_task(record))) { - LOG_WARN("schedule global index task failed", K(ret), K(record)); - } + case ObDDLType::DDL_CREATE_INDEX: + ret = schedule_build_index_task(record); break; - } case ObDDLType::DDL_DROP_INDEX: - if (OB_FAIL(schedule_drop_index_task(record))) { - LOG_WARN("schedule drop index task failed", K(ret)); - } + ret = schedule_drop_index_task(record); break; case DDL_DROP_PRIMARY_KEY: - if (OB_FAIL(schedule_drop_primary_key_task(record))) { - LOG_WARN("schedule drop primary key task failed", K(ret)); - } + ret = schedule_drop_primary_key_task(record); break; case DDL_MODIFY_COLUMN: case DDL_ADD_PRIMARY_KEY: @@ -1639,29 +1650,20 @@ int ObDDLScheduler::schedule_ddl_task(const ObDDLTaskRecord &record) case DDL_CONVERT_TO_CHARACTER: case DDL_TABLE_REDEFINITION: case DDL_DIRECT_LOAD: - if (OB_FAIL(schedule_table_redefinition_task(record))) { - LOG_WARN("schedule table redefinition task failed", K(ret)); - } + ret = schedule_table_redefinition_task(record); break; case DDL_DROP_COLUMN: case DDL_ADD_COLUMN_OFFLINE: case DDL_COLUMN_REDEFINITION: - if(OB_FAIL(schedule_column_redefinition_task(record))) { - LOG_WARN("schedule column redefinition task failed", K(ret)); - } + ret = schedule_column_redefinition_task(record); break; - case DDL_CHECK_CONSTRAINT: case DDL_FOREIGN_KEY_CONSTRAINT: case DDL_ADD_NOT_NULL_COLUMN: - if (OB_FAIL(schedule_constraint_task(record))) { - LOG_WARN("schedule constraint task failed", K(ret)); - } + ret = schedule_constraint_task(record); break; case DDL_MODIFY_AUTO_INCREMENT: - if (OB_FAIL(schedule_modify_autoinc_task(record))) { - LOG_WARN("schedule modify autoinc task failed", K(ret)); - } + ret = schedule_modify_autoinc_task(record); break; case DDL_DROP_DATABASE: case DDL_DROP_TABLE: @@ -1670,9 +1672,7 @@ int ObDDLScheduler::schedule_ddl_task(const ObDDLTaskRecord &record) case DDL_DROP_SUB_PARTITION: case DDL_TRUNCATE_PARTITION: case DDL_TRUNCATE_SUB_PARTITION: - if (OB_FAIL(schedule_ddl_retry_task(record))) { - LOG_WARN("schedule ddl retry task failed", K(ret)); - } + ret = schedule_ddl_retry_task(record); break; default: { ret = OB_NOT_SUPPORTED; @@ -1681,6 +1681,9 @@ int ObDDLScheduler::schedule_ddl_task(const ObDDLTaskRecord &record) } } LOG_INFO("schedule ddl task", K(ret), K(record)); + if (OB_ENTRY_EXIST == ret) { + ret = OB_SUCCESS; + } } return ret; } @@ -1711,9 +1714,6 @@ int ObDDLScheduler::schedule_build_index_task( allocator_.free(build_index_task); build_index_task = nullptr; } - if (OB_ENTRY_EXIST == ret) { - ret = OB_SUCCESS; - } return ret; } @@ -1740,9 +1740,6 @@ int ObDDLScheduler::schedule_drop_primary_key_task(const ObDDLTaskRecord &task_r allocator_.free(drop_pk_task); drop_pk_task = nullptr; } - if (OB_ENTRY_EXIST == ret) { - ret = OB_SUCCESS; - } return ret; } @@ -1772,9 +1769,6 @@ int ObDDLScheduler::schedule_table_redefinition_task(const ObDDLTaskRecord &task allocator_.free(redefinition_task); redefinition_task = nullptr; } - if (OB_ENTRY_EXIST == ret) { - ret = OB_SUCCESS; - } return ret; } @@ -1801,9 +1795,6 @@ int ObDDLScheduler::schedule_column_redefinition_task(const ObDDLTaskRecord &tas allocator_.free(redefinition_task); redefinition_task = nullptr; } - if (OB_ENTRY_EXIST == ret) { - ret = OB_SUCCESS; - } return ret; } @@ -1830,9 +1821,6 @@ int ObDDLScheduler::schedule_ddl_retry_task(const ObDDLTaskRecord &task_record) allocator_.free(ddl_retry_task); ddl_retry_task = nullptr; } - if (OB_ENTRY_EXIST == ret) { - ret = OB_SUCCESS; - } return ret; } @@ -1859,9 +1847,6 @@ int ObDDLScheduler::schedule_constraint_task(const ObDDLTaskRecord &task_record) allocator_.free(constraint_task); constraint_task = nullptr; } - if (OB_ENTRY_EXIST == ret) { - ret = OB_SUCCESS; - } return ret; } @@ -1888,9 +1873,6 @@ int ObDDLScheduler::schedule_modify_autoinc_task(const ObDDLTaskRecord &task_rec allocator_.free(modify_autoinc_task); modify_autoinc_task = nullptr; } - if (OB_ENTRY_EXIST == ret) { - ret = OB_SUCCESS; - } return ret; } @@ -1917,9 +1899,6 @@ int ObDDLScheduler::schedule_drop_index_task(const ObDDLTaskRecord &task_record) allocator_.free(drop_index_task); drop_index_task = nullptr; } - if (OB_ENTRY_EXIST == ret) { - ret = OB_SUCCESS; - } return ret; } diff --git a/src/rootserver/ddl_task/ob_ddl_task.cpp b/src/rootserver/ddl_task/ob_ddl_task.cpp index b110556c49..e80654481f 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_task.cpp @@ -820,6 +820,7 @@ int ObDDLTask::switch_status(const ObDDLTaskStatus new_status, const bool enable int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; bool is_cancel = false; + bool is_standby_tenant = false; int real_ret_code = ret_code; bool is_tenant_dropped = false; ObDDLTaskStatus real_new_status = new_status; @@ -847,6 +848,11 @@ int ObDDLTask::switch_status(const ObDDLTaskStatus new_status, const bool enable } else if (is_tenant_dropped) { need_retry_ = false; LOG_INFO("tenant has been dropped, exit anyway", K(ret), K(tenant_id_)); + } else if (OB_FAIL(ObAllTenantInfoProxy::is_standby_tenant(&root_service->get_sql_proxy(), tenant_id_, is_standby_tenant))) { + LOG_WARN("check is standby tenant failed", K(ret), K(tenant_id_)); + } else if (is_standby_tenant) { + need_retry_ = false; + LOG_INFO("tenant is standby, exit anyway", K(ret), K(tenant_id_)); } else if (OB_FAIL(trans.start(&root_service->get_sql_proxy(), tenant_id_))) { LOG_WARN("start transaction failed", K(ret)); } else { diff --git a/src/rootserver/ddl_task/ob_ddl_task.h b/src/rootserver/ddl_task/ob_ddl_task.h index a8bfac4678..cc2c561525 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.h +++ b/src/rootserver/ddl_task/ob_ddl_task.h @@ -424,7 +424,7 @@ public: int64_t get_data_format_version() const { return data_format_version_; } static int fetch_new_task_id(ObMySQLProxy &sql_proxy, int64_t &new_task_id); virtual int serialize_params_to_message(char *buf, const int64_t buf_size, int64_t &pos) const = 0; - virtual int deserlize_params_from_message(const char *buf, const int64_t buf_size, int64_t &pos) = 0; + virtual int deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t buf_size, int64_t &pos) = 0; virtual int64_t get_serialize_param_size() const = 0; const ObString &get_ddl_stmt_str() const { return ddl_stmt_str_; } int set_ddl_stmt_str(const ObString &ddl_stmt_str); diff --git a/src/rootserver/ddl_task/ob_drop_index_task.cpp b/src/rootserver/ddl_task/ob_drop_index_task.cpp index b546839610..69dc83ccd9 100644 --- a/src/rootserver/ddl_task/ob_drop_index_task.cpp +++ b/src/rootserver/ddl_task/ob_drop_index_task.cpp @@ -90,7 +90,7 @@ int ObDropIndexTask::init( ret_code_ = task_record.ret_code_; if (nullptr != task_record.message_.ptr()) { int64_t pos = 0; - if (OB_FAIL(deserlize_params_from_message(task_record.message_.ptr(), task_record.message_.length(), pos))) { + if (OB_FAIL(deserlize_params_from_message(task_record.tenant_id_, task_record.message_.ptr(), task_record.message_.length(), pos))) { LOG_WARN("deserialize params from message failed", K(ret)); } } @@ -418,15 +418,17 @@ int ObDropIndexTask::serialize_params_to_message(char *buf, const int64_t buf_si return ret; } -int ObDropIndexTask::deserlize_params_from_message(const char *buf, const int64_t buf_size, int64_t &pos) +int ObDropIndexTask::deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t buf_size, int64_t &pos) { int ret = OB_SUCCESS; obrpc::ObDropIndexArg tmp_drop_index_arg; - if (OB_UNLIKELY(nullptr == buf || buf_size <= 0)) { + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || buf_size <= 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arg", K(ret), KP(buf), K(buf_size)); + LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(buf_size)); } else if (OB_FAIL(tmp_drop_index_arg.deserialize(buf, buf_size, pos))) { LOG_WARN("deserialize failed", K(ret)); + } else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_drop_index_arg))) { + LOG_WARN("replace user tenant id failed", K(ret), K(tenant_id), K(tmp_drop_index_arg)); } else if (OB_FAIL(deep_copy_index_arg(allocator_, tmp_drop_index_arg, drop_index_arg_))) { LOG_WARN("deep copy drop index arg failed", K(ret)); } diff --git a/src/rootserver/ddl_task/ob_drop_index_task.h b/src/rootserver/ddl_task/ob_drop_index_task.h index 3a25d8d5ad..50e34ba7e0 100644 --- a/src/rootserver/ddl_task/ob_drop_index_task.h +++ b/src/rootserver/ddl_task/ob_drop_index_task.h @@ -37,7 +37,7 @@ public: virtual int process() override; virtual bool is_valid() const override; virtual int serialize_params_to_message(char *buf, const int64_t buf_size, int64_t &pos) const override; - virtual int deserlize_params_from_message(const char *buf, const int64_t buf_size, int64_t &pos) override; + virtual int deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t buf_size, int64_t &pos) override; virtual int64_t get_serialize_param_size() const override; INHERIT_TO_STRING_KV("ObDDLTask", ObDDLTask, KP_(root_service)); virtual void flt_set_task_span_tag() const override; diff --git a/src/rootserver/ddl_task/ob_index_build_task.cpp b/src/rootserver/ddl_task/ob_index_build_task.cpp index 366b3b8fea..be219951e6 100644 --- a/src/rootserver/ddl_task/ob_index_build_task.cpp +++ b/src/rootserver/ddl_task/ob_index_build_task.cpp @@ -400,7 +400,7 @@ int ObIndexBuildTask::init(const ObDDLTaskRecord &task_record) } else if (!task_record.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), K(task_record)); - } else if (OB_FAIL(deserlize_params_from_message(task_record.message_.ptr(), task_record.message_.length(), pos))) { + } else if (OB_FAIL(deserlize_params_from_message(task_record.tenant_id_, task_record.message_.ptr(), task_record.message_.length(), pos))) { LOG_WARN("deserialize params from message failed", K(ret)); } else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard( task_record.tenant_id_, schema_guard, schema_version))) { @@ -1469,17 +1469,19 @@ int ObIndexBuildTask::serialize_params_to_message(char *buf, const int64_t buf_l return ret; } -int ObIndexBuildTask::deserlize_params_from_message(const char *buf, const int64_t data_len, int64_t &pos) +int ObIndexBuildTask::deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t data_len, int64_t &pos) { int ret = OB_SUCCESS; ObCreateIndexArg tmp_arg; - if (OB_UNLIKELY(nullptr == buf || data_len <= 0)) { + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || data_len <= 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret), KP(buf), K(data_len)); + LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(data_len)); } else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &task_version_))) { LOG_WARN("fail to deserialize task version", K(ret)); } else if (OB_FAIL(tmp_arg.deserialize(buf, data_len, pos))) { LOG_WARN("deserialize table failed", K(ret)); + } else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) { + LOG_WARN("replace user tenant id failed", K(ret), K(tenant_id), K(tmp_arg)); } else if (OB_FAIL(deep_copy_table_arg(allocator_, tmp_arg, create_index_arg_))) { LOG_WARN("deep copy create index arg failed", K(ret)); } else { diff --git a/src/rootserver/ddl_task/ob_index_build_task.h b/src/rootserver/ddl_task/ob_index_build_task.h index e040ebd508..f04d2848f5 100644 --- a/src/rootserver/ddl_task/ob_index_build_task.h +++ b/src/rootserver/ddl_task/ob_index_build_task.h @@ -107,7 +107,7 @@ public: virtual bool is_valid() const override; virtual int collect_longops_stat(share::ObLongopsValue &value) override; virtual int serialize_params_to_message(char *buf, const int64_t buf_size, int64_t &pos) const override; - virtual int deserlize_params_from_message(const char *buf, const int64_t buf_size, int64_t &pos) override; + virtual int deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t buf_size, int64_t &pos) override; virtual int64_t get_serialize_param_size() const override; virtual bool support_longops_monitoring() const override { return true; } static int deep_copy_index_arg(common::ObIAllocator &allocator, const obrpc::ObCreateIndexArg &source_arg, obrpc::ObCreateIndexArg &dest_arg); diff --git a/src/rootserver/ddl_task/ob_modify_autoinc_task.cpp b/src/rootserver/ddl_task/ob_modify_autoinc_task.cpp index 953685386a..c51207d96d 100644 --- a/src/rootserver/ddl_task/ob_modify_autoinc_task.cpp +++ b/src/rootserver/ddl_task/ob_modify_autoinc_task.cpp @@ -198,7 +198,7 @@ int ObModifyAutoincTask::init(const ObDDLTaskRecord &task_record) } else if (OB_UNLIKELY(!task_record.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), K(task_record)); - } else if (OB_FAIL(deserlize_params_from_message(task_record.message_.ptr(), task_record.message_.length(), pos))) { + } else if (OB_FAIL(deserlize_params_from_message(task_record.tenant_id_, task_record.message_.ptr(), task_record.message_.length(), pos))) { LOG_WARN("deserialize params from message failed", K(ret)); } else if (OB_FAIL(set_ddl_stmt_str(task_record.ddl_stmt_str_))) { LOG_WARN("set ddl stmt str failed", K(ret)); @@ -599,17 +599,19 @@ int ObModifyAutoincTask::serialize_params_to_message(char *buf, const int64_t bu return ret; } -int ObModifyAutoincTask::deserlize_params_from_message(const char *buf, const int64_t data_len, int64_t &pos) +int ObModifyAutoincTask::deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t data_len, int64_t &pos) { int ret = OB_SUCCESS; obrpc::ObAlterTableArg tmp_arg; - if (OB_UNLIKELY(nullptr == buf || data_len <= 0)) { + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || data_len <= 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret), KP(buf), K(data_len)); + LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(data_len)); } else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &task_version_))) { LOG_WARN("fail to deserialize task version", K(ret)); } else if (OB_FAIL(tmp_arg.deserialize(buf, data_len, pos))) { LOG_WARN("serialize table failed", K(ret)); + } else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) { + LOG_WARN("replace user tenant id failed", K(ret), K(tenant_id), K(tmp_arg)); } else if (OB_FAIL(deep_copy_table_arg(allocator_, tmp_arg, alter_table_arg_))) { LOG_WARN("deep copy table arg failed", K(ret)); } diff --git a/src/rootserver/ddl_task/ob_modify_autoinc_task.h b/src/rootserver/ddl_task/ob_modify_autoinc_task.h index 0eab21fe90..4971763567 100644 --- a/src/rootserver/ddl_task/ob_modify_autoinc_task.h +++ b/src/rootserver/ddl_task/ob_modify_autoinc_task.h @@ -64,7 +64,7 @@ public: int init(const ObDDLTaskRecord &task_record); virtual int process() override; virtual int serialize_params_to_message(char *buf, const int64_t buf_size, int64_t &pos) const override; - virtual int deserlize_params_from_message(const char *buf, const int64_t buf_size, int64_t &pos) override; + virtual int deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t buf_size, int64_t &pos) override; virtual int64_t get_serialize_param_size() const override; int notify_update_autoinc_finish(const uint64_t autoinc_val, const int ret_code); virtual void flt_set_task_span_tag() const override; diff --git a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp index 2555e0c253..4c822a0979 100644 --- a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp @@ -99,7 +99,7 @@ int ObTableRedefinitionTask::init(const ObDDLTaskRecord &task_record) } else if (!task_record.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), K(task_record)); - } else if (OB_FAIL(deserlize_params_from_message(task_record.message_.ptr(), task_record.message_.length(), pos))) { + } else if (OB_FAIL(deserlize_params_from_message(task_record.tenant_id_, task_record.message_.ptr(), task_record.message_.length(), pos))) { LOG_WARN("deserialize params from message failed", K(ret), K(task_record.message_), K(common::lbt())); } else if (OB_FAIL(set_ddl_stmt_str(task_record.ddl_stmt_str_))) { LOG_WARN("set ddl stmt str failed", K(ret)); @@ -962,7 +962,7 @@ int ObTableRedefinitionTask::serialize_params_to_message(char *buf, const int64_ return ret; } -int ObTableRedefinitionTask::deserlize_params_from_message(const char *buf, const int64_t data_len, int64_t &pos) +int ObTableRedefinitionTask::deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t data_len, int64_t &pos) { int ret = OB_SUCCESS; int8_t copy_indexes = 0; @@ -972,13 +972,15 @@ int ObTableRedefinitionTask::deserlize_params_from_message(const char *buf, cons int8_t ignore_errors = 0; int8_t do_finish = 0; obrpc::ObAlterTableArg tmp_arg; - if (OB_UNLIKELY(nullptr == buf || data_len <= 0)) { + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || data_len <= 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret), KP(buf), K(data_len)); + LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(data_len)); } else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &task_version_))) { LOG_WARN("fail to deserialize task version", K(ret)); } else if (OB_FAIL(tmp_arg.deserialize(buf, data_len, pos))) { LOG_WARN("serialize table failed", K(ret)); + } else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) { + LOG_WARN("replace user tenant id failed", K(ret), K(tenant_id), K(tmp_arg)); } else if (OB_FAIL(deep_copy_table_arg(allocator_, tmp_arg, alter_table_arg_))) { LOG_WARN("deep copy table arg failed", K(ret)); } else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, ¶llelism_))) { diff --git a/src/rootserver/ddl_task/ob_table_redefinition_task.h b/src/rootserver/ddl_task/ob_table_redefinition_task.h index f42c76b622..f3a370c751 100644 --- a/src/rootserver/ddl_task/ob_table_redefinition_task.h +++ b/src/rootserver/ddl_task/ob_table_redefinition_task.h @@ -57,7 +57,7 @@ public: inline void set_is_ignore_errors(const bool is_ignore_errors) {is_ignore_errors_ = is_ignore_errors;} inline void set_is_do_finish(const bool is_do_finish) {is_do_finish_ = is_do_finish;} virtual int serialize_params_to_message(char *buf, const int64_t buf_len, int64_t &pos) const override; - virtual int deserlize_params_from_message(const char *buf, const int64_t data_len, int64_t &pos) override; + virtual int deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t data_len, int64_t &pos) override; virtual int64_t get_serialize_param_size() const override; int assign(const ObTableRedefinitionTask *table_redef_task); virtual int collect_longops_stat(share::ObLongopsValue &value) override; diff --git a/src/share/ob_ddl_common.cpp b/src/share/ob_ddl_common.cpp index ceb029c670..a0850642d5 100644 --- a/src/share/ob_ddl_common.cpp +++ b/src/share/ob_ddl_common.cpp @@ -1070,7 +1070,7 @@ int ObDDLUtil::get_data_format_version( EXTRACT_VARCHAR_FIELD_MYSQL(*result, "message_unhex", task_message); if (ObDDLType::DDL_CREATE_INDEX == ddl_type) { SMART_VAR(rootserver::ObIndexBuildTask, task) { - if (OB_FAIL(task.deserlize_params_from_message(task_message.ptr(), task_message.length(), pos))) { + if (OB_FAIL(task.deserlize_params_from_message(tenant_id, task_message.ptr(), task_message.length(), pos))) { LOG_WARN("deserialize from msg failed", K(ret)); } else { data_format_version = task.get_data_format_version(); @@ -1078,7 +1078,7 @@ int ObDDLUtil::get_data_format_version( } } else { SMART_VAR(rootserver::ObTableRedefinitionTask, task) { - if (OB_FAIL(task.deserlize_params_from_message(task_message.ptr(), task_message.length(), pos))) { + if (OB_FAIL(task.deserlize_params_from_message(tenant_id, task_message.ptr(), task_message.length(), pos))) { LOG_WARN("deserialize from msg failed", K(ret)); } else { data_format_version = task.get_data_format_version(); @@ -1091,6 +1091,75 @@ int ObDDLUtil::get_data_format_version( return ret; } +static inline void try_replace_user_tenant_id(const uint64_t user_tenant_id, uint64_t &check_tenant_id) +{ + check_tenant_id = !is_user_tenant(check_tenant_id) ? check_tenant_id : user_tenant_id; +} + +int ObDDLUtil::replace_user_tenant_id(const uint64_t tenant_id, obrpc::ObAlterTableArg &alter_table_arg) +{ + int ret = OB_SUCCESS; + if (!is_user_tenant(tenant_id)) { + LOG_TRACE("not user tenant, no need to replace", K(tenant_id)); + } else { + try_replace_user_tenant_id(tenant_id, alter_table_arg.exec_tenant_id_); + for (int64_t i = 0; OB_SUCC(ret) && i < alter_table_arg.index_arg_list_.count(); ++i) { + obrpc::ObIndexArg *index_arg = alter_table_arg.index_arg_list_.at(i); + try_replace_user_tenant_id(tenant_id, index_arg->exec_tenant_id_); + try_replace_user_tenant_id(tenant_id, index_arg->tenant_id_); + } + for (int64_t i = 0; OB_SUCC(ret) && i < alter_table_arg.foreign_key_arg_list_.count(); ++i) { + obrpc::ObCreateForeignKeyArg &fk_arg = alter_table_arg.foreign_key_arg_list_.at(i); + try_replace_user_tenant_id(tenant_id, fk_arg.exec_tenant_id_); + try_replace_user_tenant_id(tenant_id, fk_arg.tenant_id_); + } + if (is_user_tenant(alter_table_arg.alter_table_schema_.get_tenant_id())) { + alter_table_arg.alter_table_schema_.set_tenant_id(tenant_id); + } + try_replace_user_tenant_id(tenant_id, alter_table_arg.sequence_ddl_arg_.exec_tenant_id_); + if (is_user_tenant(alter_table_arg.sequence_ddl_arg_.seq_schema_.get_tenant_id())) { + alter_table_arg.sequence_ddl_arg_.seq_schema_.set_tenant_id(tenant_id); + } + } + return ret; +} + +int ObDDLUtil::replace_user_tenant_id(const uint64_t tenant_id, obrpc::ObCreateIndexArg &create_index_arg) +{ + int ret = OB_SUCCESS; + if (!is_user_tenant(tenant_id)) { + LOG_TRACE("not user tenant, no need to replace", K(tenant_id)); + } else { + try_replace_user_tenant_id(tenant_id, create_index_arg.exec_tenant_id_); + try_replace_user_tenant_id(tenant_id, create_index_arg.tenant_id_); + if (is_user_tenant(create_index_arg.index_schema_.get_tenant_id())) { + create_index_arg.index_schema_.set_tenant_id(tenant_id); + } + } + return ret; +} + +#define REPLACE_DDL_ARG_FUNC(ArgType) \ +int ObDDLUtil::replace_user_tenant_id(const uint64_t tenant_id, ArgType &ddl_arg) \ +{ \ + int ret = OB_SUCCESS; \ + if (!is_user_tenant(tenant_id)) { \ + LOG_TRACE("not user tenant, no need to replace", K(tenant_id)); \ + } else { \ + try_replace_user_tenant_id(tenant_id, ddl_arg.exec_tenant_id_); \ + try_replace_user_tenant_id(tenant_id, ddl_arg.tenant_id_); \ + } \ + return ret; \ +} + +REPLACE_DDL_ARG_FUNC(obrpc::ObDropDatabaseArg) +REPLACE_DDL_ARG_FUNC(obrpc::ObDropTableArg) +REPLACE_DDL_ARG_FUNC(obrpc::ObDropIndexArg) +REPLACE_DDL_ARG_FUNC(obrpc::ObTruncateTableArg) + +#undef REPLACE_DDL_ARG_FUNC + + /****************** ObCheckTabletDataComplementOp *************/ int ObCheckTabletDataComplementOp::check_task_inner_sql_session_status( diff --git a/src/share/ob_ddl_common.h b/src/share/ob_ddl_common.h index 873d8ab4d6..b5d9419e15 100644 --- a/src/share/ob_ddl_common.h +++ b/src/share/ob_ddl_common.h @@ -24,7 +24,12 @@ namespace oceanbase namespace obrpc { class ObSrvRpcProxy; -class ObAlterTableArg; +struct ObAlterTableArg; +struct ObDropDatabaseArg; +struct ObDropTableArg; +struct ObDropIndexArg; +struct ObTruncateTableArg; +struct ObCreateIndexArg; } namespace sql { @@ -328,6 +333,13 @@ public: const uint64_t task_id, int64_t &data_format_version); + static int replace_user_tenant_id(const uint64_t tenant_id, obrpc::ObAlterTableArg &alter_table_arg); + static int replace_user_tenant_id(const uint64_t tenant_id, obrpc::ObDropDatabaseArg &drop_db_arg); + static int replace_user_tenant_id(const uint64_t tenant_id, obrpc::ObDropTableArg &drop_table_arg); + static int replace_user_tenant_id(const uint64_t tenant_id, obrpc::ObDropIndexArg &drop_index_arg); + static int replace_user_tenant_id(const uint64_t tenant_id, obrpc::ObTruncateTableArg &trucnate_table_arg); + static int replace_user_tenant_id(const uint64_t tenant_id, obrpc::ObCreateIndexArg &create_index_arg); + private: static int generate_column_name_str( const common::ObIArray &column_names, diff --git a/src/share/ob_tenant_info_proxy.cpp b/src/share/ob_tenant_info_proxy.cpp index ab758bc511..367d16eebd 100644 --- a/src/share/ob_tenant_info_proxy.cpp +++ b/src/share/ob_tenant_info_proxy.cpp @@ -270,6 +270,53 @@ int ObAllTenantInfoProxy::is_standby_tenant( return ret; } +int ObAllTenantInfoProxy::get_primary_tenant_ids( + ObISQLClient *proxy, + ObIArray &tenant_ids) +{ + int ret = OB_SUCCESS; + tenant_ids.reset(); + ObSqlString sql; + ObTenantRole primary_role(ObTenantRole::PRIMARY_TENANT); + if (OB_ISNULL(proxy)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("proxy is null", KR(ret), KP(proxy)); + } else if (OB_FAIL(sql.append_fmt("select tenant_id from %s where tenant_role = '%s'", + OB_ALL_VIRTUAL_TENANT_INFO_TNAME, primary_role.to_str()))) { + LOG_WARN("gnenerate sql failed", K(ret)); + } else { + HEAP_VAR(ObMySQLProxy::MySQLResult, res) { + common::sqlclient::ObMySQLResult *result = NULL; + if (OB_FAIL(proxy->read(res, OB_SYS_TENANT_ID, sql.ptr()))) { + LOG_WARN("failed to read", KR(ret), K(sql)); + } else if (OB_ISNULL(result = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to get sql result", KR(ret)); + } else { + while (OB_SUCC(ret)) { + if (OB_FAIL(result->next())) { + if (OB_ITER_END != ret) { + LOG_WARN("get next sql result failed", K(ret)); + } else { + ret = OB_SUCCESS; + break;; + } + } else { + uint64_t tenant_id = OB_INVALID_TENANT_ID; + EXTRACT_INT_FIELD_MYSQL(*result, "tenant_id", tenant_id, uint64_t); + if (OB_FAIL(ret)) { + LOG_WARN("failed to get result", KR(ret)); + } else if (OB_FAIL(tenant_ids.push_back(tenant_id))) { + LOG_WARN("push back tenant id failed", K(ret), K(tenant_id), K(tenant_ids.count())); + } + } + } + } + } + } + return ret; +} + int ObAllTenantInfoProxy::load_tenant_info(const uint64_t tenant_id, ObISQLClient *proxy, const bool for_update, diff --git a/src/share/ob_tenant_info_proxy.h b/src/share/ob_tenant_info_proxy.h index 7428c5f0c3..dded8fa67d 100644 --- a/src/share/ob_tenant_info_proxy.h +++ b/src/share/ob_tenant_info_proxy.h @@ -156,6 +156,10 @@ public: ObISQLClient *proxy, const uint64_t tenant_id, bool &is_standby); + + static int get_primary_tenant_ids( + ObISQLClient *proxy, + ObIArray &tenant_ids); /** * @description: get target tenant's tenant_info from inner table * @param[in] tenant_id diff --git a/src/sql/engine/cmd/ob_ddl_executor_util.cpp b/src/sql/engine/cmd/ob_ddl_executor_util.cpp index 3ba218561c..8aaca31e56 100644 --- a/src/sql/engine/cmd/ob_ddl_executor_util.cpp +++ b/src/sql/engine/cmd/ob_ddl_executor_util.cpp @@ -64,6 +64,7 @@ int ObDDLExecutorUtil::wait_ddl_finish( } else { int tmp_ret = OB_SUCCESS; bool is_tenant_dropped = false; + bool is_tenant_standby = false; while (OB_SUCC(ret)) { if (OB_SUCCESS == ObDDLErrorMessageTableOperator::get_ddl_error_message( tenant_id, task_id, -1 /* target_object_id */, unused_addr, false /* is_ddl_retry_task */, *GCTX.sql_proxy_, error_message, unused_user_msg_len)) { @@ -79,6 +80,13 @@ int ObDDLExecutorUtil::wait_ddl_finish( ret = OB_TENANT_HAS_BEEN_DROPPED; LOG_WARN("tenant has been dropped", K(ret), K(tenant_id)); break; + } else if (OB_TMP_FAIL(ObAllTenantInfoProxy::is_standby_tenant(GCTX.sql_proxy_, tenant_id, is_tenant_standby))) { + LOG_WARN("check is standby tenant failed", K(tmp_ret), K(tenant_id)); + } else if (is_tenant_standby) { + ret = OB_STANDBY_READ_ONLY; + FORWARD_USER_ERROR(ret, "DDL not finish, need check"); + LOG_WARN("tenant is standby now, stop wait", K(ret), K(tenant_id)); + break; } else if (OB_FAIL(handle_session_exception(session))) { LOG_WARN("session exeception happened", K(ret), K(is_support_cancel)); if (is_support_cancel && OB_TMP_FAIL(cancel_ddl_task(tenant_id, common_rpc_proxy))) { @@ -100,6 +108,7 @@ int ObDDLExecutorUtil::wait_build_index_finish(const uint64_t tenant_id, const i int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; bool is_tenant_dropped = false; + bool is_tenant_standby = false; ObAddr unused_addr; int64_t unused_user_msg_len = 0; THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + OB_MAX_USER_SPECIFIED_TIMEOUT); @@ -122,6 +131,12 @@ int ObDDLExecutorUtil::wait_build_index_finish(const uint64_t tenant_id, const i } else if (is_tenant_dropped) { ret = OB_TENANT_HAS_BEEN_DROPPED; LOG_WARN("tenant has been dropped", K(ret), K(tenant_id)); + } else if (OB_TMP_FAIL(ObAllTenantInfoProxy::is_standby_tenant(GCTX.sql_proxy_, tenant_id, is_tenant_standby))) { + LOG_WARN("check is standby tenant failed", K(tmp_ret), K(tenant_id)); + } else if (is_tenant_standby) { + ret = OB_STANDBY_READ_ONLY; + FORWARD_USER_ERROR(ret, "DDL not finish, need check"); + LOG_WARN("tenant is standby now, stop wait", K(ret), K(tenant_id)); } return ret; } @@ -146,6 +161,7 @@ int ObDDLExecutorUtil::wait_ddl_retry_task_finish( LOG_WARN("invalid argument", K(ret), K(tenant_id), K(task_id), KP(common_rpc_proxy)); } else { bool is_tenant_dropped = false; + bool is_tenant_standby = false; int tmp_ret = OB_SUCCESS; while (OB_SUCC(ret)) { if (OB_SUCCESS == ObDDLErrorMessageTableOperator::get_ddl_error_message( @@ -189,6 +205,13 @@ int ObDDLExecutorUtil::wait_ddl_retry_task_finish( ret = OB_TENANT_HAS_BEEN_DROPPED; LOG_WARN("tenant has been dropped", K(ret), K(tenant_id)); break; + } else if (OB_TMP_FAIL(ObAllTenantInfoProxy::is_standby_tenant(GCTX.sql_proxy_, tenant_id, is_tenant_standby))) { + LOG_WARN("check is standby tenant failed", K(tmp_ret), K(tenant_id)); + } else if (is_tenant_standby) { + ret = OB_STANDBY_READ_ONLY; + FORWARD_USER_ERROR(ret, "DDL not finish, need check"); + LOG_WARN("tenant is standby now, stop wait", K(ret), K(tenant_id)); + break; } else if (OB_FAIL(handle_session_exception(session))) { LOG_WARN("session exeception happened", K(ret)); if (OB_TMP_FAIL(cancel_ddl_task(tenant_id, common_rpc_proxy))) { diff --git a/src/sql/engine/cmd/ob_index_executor.cpp b/src/sql/engine/cmd/ob_index_executor.cpp index fb176a1a4d..ecbf8f7925 100644 --- a/src/sql/engine/cmd/ob_index_executor.cpp +++ b/src/sql/engine/cmd/ob_index_executor.cpp @@ -293,6 +293,7 @@ int ObDropIndexExecutor::wait_drop_index_finish( while (OB_SUCC(ret)) { int tmp_ret = OB_SUCCESS; bool is_tenant_dropped = false; + bool is_tenant_standby = false; if (OB_SUCCESS == share::ObDDLErrorMessageTableOperator::get_ddl_error_message( tenant_id, task_id, -1 /* target_object_id */, unused_addr, false /* is_ddl_retry_task */, *GCTX.sql_proxy_, error_message, unused_user_msg_len)) { ret = error_message.ret_code_; @@ -307,6 +308,13 @@ int ObDropIndexExecutor::wait_drop_index_finish( ret = OB_TENANT_HAS_BEEN_DROPPED; LOG_WARN("tenant has been dropped", K(ret), K(tenant_id)); break; + } else if (OB_TMP_FAIL(ObAllTenantInfoProxy::is_standby_tenant(GCTX.sql_proxy_, tenant_id, is_tenant_standby))) { + LOG_WARN("check is standby tenant failed", K(tmp_ret), K(tenant_id)); + } else if (is_tenant_standby) { + ret = OB_STANDBY_READ_ONLY; + FORWARD_USER_ERROR(ret, "DDL not finish, need check"); + LOG_WARN("tenant is standby now, stop wait", K(ret), K(tenant_id)); + break; } else if (OB_FAIL(session.check_session_status())) { LOG_WARN("session exeception happened", K(ret)); } else {