From 379fbf754cbff241b09ecfc12ecb514fd407730b Mon Sep 17 00:00:00 2001 From: Hongqin-Li Date: Thu, 29 Jun 2023 12:48:11 +0000 Subject: [PATCH] Acquire table lock before create hidden table --- src/rootserver/ddl_task/ob_ddl_scheduler.cpp | 36 ++++++++-------- src/rootserver/ddl_task/ob_ddl_scheduler.h | 4 ++ src/rootserver/ddl_task/ob_ddl_task.cpp | 7 ++-- src/rootserver/ddl_task/ob_ddl_task.h | 6 ++- src/rootserver/ob_ddl_service.cpp | 43 ++++++++++++-------- 5 files changed, 53 insertions(+), 43 deletions(-) diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp index 7bb0d23f90..15028d8eb8 100755 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp @@ -881,6 +881,7 @@ int ObDDLScheduler::create_ddl_task(const ObCreateDDLTaskParam ¶m, param.dest_table_schema_, param.parallelism_, param.consumer_group_id_, + param.task_id_, static_cast(param.ddl_arg_), *param.allocator_, task_record))) { @@ -895,6 +896,7 @@ int ObDDLScheduler::create_ddl_task(const ObCreateDDLTaskParam ¶m, param.dest_table_schema_, param.parallelism_, param.consumer_group_id_, + param.task_id_, alter_table_arg, *param.allocator_, task_record))) { @@ -926,6 +928,7 @@ int ObDDLScheduler::create_ddl_task(const ObCreateDDLTaskParam ¶m, param.dest_table_schema_, param.parallelism_, param.consumer_group_id_, + param.task_id_, static_cast(param.ddl_arg_), *param.allocator_, task_record))) { @@ -938,6 +941,7 @@ int ObDDLScheduler::create_ddl_task(const ObCreateDDLTaskParam ¶m, param.src_table_schema_->get_table_id(), param.schema_version_, param.consumer_group_id_, + param.task_id_, static_cast(param.ddl_arg_), *param.allocator_, task_record))) { @@ -1430,21 +1434,19 @@ int ObDDLScheduler::create_table_redefinition_task( const share::schema::ObTableSchema *dest_schema, const int64_t parallelism, const int64_t consumer_group_id, + const int64_t task_id, const obrpc::ObAlterTableArg *alter_table_arg, ObIAllocator &allocator, ObDDLTaskRecord &task_record) { int ret = OB_SUCCESS; - int64_t task_id = 0; SMART_VAR(ObTableRedefinitionTask, redefinition_task) { if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObDDLScheduler has not been inited", K(ret)); - } else if (OB_ISNULL(alter_table_arg) || OB_ISNULL(src_schema) || OB_ISNULL(dest_schema)) { + } else if (OB_UNLIKELY(0 == task_id) || OB_ISNULL(alter_table_arg) || OB_ISNULL(src_schema) || OB_ISNULL(dest_schema)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret), KP(alter_table_arg), KP(src_schema), KP(dest_schema)); - } else if (OB_FAIL(ObDDLTask::fetch_new_task_id(root_service_->get_sql_proxy(), src_schema->get_tenant_id(), task_id))) { - LOG_WARN("fetch new task id failed", K(ret)); + LOG_WARN("invalid arguments", K(ret), K(task_id), KP(alter_table_arg), KP(src_schema), KP(dest_schema)); } else if (OB_FAIL(redefinition_task.init(src_schema->get_tenant_id(), task_id, type, @@ -1472,21 +1474,19 @@ int ObDDLScheduler::create_drop_primary_key_task( const ObTableSchema *dest_schema, const int64_t parallelism, const int64_t consumer_group_id, + const int64_t task_id, const obrpc::ObAlterTableArg *alter_table_arg, ObIAllocator &allocator, ObDDLTaskRecord &task_record) { int ret = OB_SUCCESS; - int64_t task_id = 0; SMART_VAR(ObDropPrimaryKeyTask, drop_pk_task) { if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObDDLScheduler has not been inited", K(ret)); - } else if (OB_ISNULL(alter_table_arg) || OB_ISNULL(src_schema) || OB_ISNULL(dest_schema)) { + } else if (OB_UNLIKELY(0 == task_id) || OB_ISNULL(alter_table_arg) || OB_ISNULL(src_schema) || OB_ISNULL(dest_schema)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret), KP(alter_table_arg), KP(src_schema), KP(dest_schema)); - } else if (OB_FAIL(ObDDLTask::fetch_new_task_id(root_service_->get_sql_proxy(), src_schema->get_tenant_id(), task_id))) { - LOG_WARN("fetch new task id failed", K(ret)); + LOG_WARN("invalid arguments", K(ret), K(task_id), KP(alter_table_arg), KP(src_schema), KP(dest_schema)); } else if (OB_FAIL(drop_pk_task.init(src_schema->get_tenant_id(), task_id, type, @@ -1514,21 +1514,19 @@ int ObDDLScheduler::create_column_redefinition_task( const share::schema::ObTableSchema *dest_schema, const int64_t parallelism, const int64_t consumer_group_id, + const int64_t task_id, const obrpc::ObAlterTableArg *alter_table_arg, ObIAllocator &allocator, ObDDLTaskRecord &task_record) { int ret = OB_SUCCESS; - int64_t task_id = 0; SMART_VAR(ObColumnRedefinitionTask, redefinition_task) { if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObDDLScheduler has not been inited", K(ret)); - } else if (OB_ISNULL(alter_table_arg) || OB_ISNULL(src_schema) || OB_ISNULL(dest_schema)) { + } else if (OB_UNLIKELY(0 == task_id) || OB_ISNULL(alter_table_arg) || OB_ISNULL(src_schema) || OB_ISNULL(dest_schema)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret), KP(alter_table_arg), KP(src_schema), KP(dest_schema)); - } else if (OB_FAIL(ObDDLTask::fetch_new_task_id(root_service_->get_sql_proxy(), src_schema->get_tenant_id(), task_id))) { - LOG_WARN("fetch new task id failed", K(ret)); + LOG_WARN("invalid arguments", K(ret), K(task_id), KP(alter_table_arg), KP(src_schema), KP(dest_schema)); } else if (OB_FAIL(redefinition_task.init(src_schema->get_tenant_id(), task_id, type, @@ -1555,22 +1553,20 @@ int ObDDLScheduler::create_modify_autoinc_task( const int64_t table_id, const int64_t schema_version, const int64_t consumer_group_id, + const int64_t task_id, const obrpc::ObAlterTableArg *arg, ObIAllocator &allocator, ObDDLTaskRecord &task_record) { int ret = OB_SUCCESS; SMART_VAR(ObModifyAutoincTask, modify_autoinc_task) { - int64_t task_id = 0; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); } else if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || OB_INVALID_ID == table_id - || schema_version <= 0 || nullptr == arg || !arg->is_valid())) { + || schema_version <= 0 || 0 == task_id || nullptr == arg || !arg->is_valid())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(tenant_id), K(table_id), K(schema_version), K(arg)); - } else if (OB_FAIL(ObDDLTask::fetch_new_task_id(root_service_->get_sql_proxy(), tenant_id, task_id))) { - LOG_WARN("fetch new task id failed", K(ret)); + LOG_WARN("invalid argument", K(ret), K(tenant_id), K(table_id), K(schema_version), K(task_id), K(arg)); } else if (OB_FAIL(modify_autoinc_task.init(tenant_id, task_id, table_id, schema_version, consumer_group_id, *arg))) { LOG_WARN("init global index task failed", K(ret), K(table_id), K(arg)); } else if (OB_FAIL(modify_autoinc_task.set_trace_id(*ObCurTraceId::get_trace_id()))) { diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.h b/src/rootserver/ddl_task/ob_ddl_scheduler.h index 13b815f7c6..488dc63851 100755 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.h +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.h @@ -346,6 +346,7 @@ private: const share::schema::ObTableSchema *dest_schema, const int64_t parallelism, const int64_t consumer_group_id, + const int64_t task_id, const obrpc::ObAlterTableArg *alter_table_arg, ObIAllocator &allocator, ObDDLTaskRecord &task_record); @@ -357,6 +358,7 @@ private: const ObTableSchema *dest_schema, const int64_t parallelism, const int64_t consumer_group_id, + const int64_t task_id, const obrpc::ObAlterTableArg *alter_table_arg, ObIAllocator &allocator, ObDDLTaskRecord &task_record); @@ -368,6 +370,7 @@ private: const share::schema::ObTableSchema *dest_schema, const int64_t parallelism, const int64_t consumer_group_id, + const int64_t task_id, const obrpc::ObAlterTableArg *alter_table_arg, ObIAllocator &allocator, ObDDLTaskRecord &task_record); @@ -378,6 +381,7 @@ private: const int64_t table_id, const int64_t schema_version, const int64_t consumer_group_id, + const int64_t task_id, const obrpc::ObAlterTableArg *alter_table_arg, ObIAllocator &allocator, ObDDLTaskRecord &task_record); diff --git a/src/rootserver/ddl_task/ob_ddl_task.cpp b/src/rootserver/ddl_task/ob_ddl_task.cpp index 8d18f7e5b0..bda01f580c 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_task.cpp @@ -157,7 +157,7 @@ OB_SERIALIZE_MEMBER(ObDDLTaskSerializeField, is_abort_); ObCreateDDLTaskParam::ObCreateDDLTaskParam() - : tenant_id_(OB_INVALID_ID), object_id_(OB_INVALID_ID), schema_version_(0), parallelism_(0), consumer_group_id_(0), parent_task_id_(0), + : tenant_id_(OB_INVALID_ID), object_id_(OB_INVALID_ID), schema_version_(0), parallelism_(0), consumer_group_id_(0), parent_task_id_(0), task_id_(0), type_(DDL_INVALID), src_table_schema_(nullptr), dest_table_schema_(nullptr), ddl_arg_(nullptr), allocator_(nullptr) { } @@ -172,9 +172,10 @@ ObCreateDDLTaskParam::ObCreateDDLTaskParam(const uint64_t tenant_id, const int64_t consumer_group_id, ObIAllocator *allocator, const obrpc::ObDDLArg *ddl_arg, - const int64_t parent_task_id) + const int64_t parent_task_id, + const int64_t task_id) : tenant_id_(tenant_id), object_id_(object_id), schema_version_(schema_version), parallelism_(parallelism), consumer_group_id_(consumer_group_id), - parent_task_id_(parent_task_id), type_(type), src_table_schema_(src_table_schema), dest_table_schema_(dest_table_schema), + parent_task_id_(parent_task_id), task_id_(task_id), type_(type), src_table_schema_(src_table_schema), dest_table_schema_(dest_table_schema), ddl_arg_(ddl_arg), allocator_(allocator) { } diff --git a/src/rootserver/ddl_task/ob_ddl_task.h b/src/rootserver/ddl_task/ob_ddl_task.h index a9186dd81f..fa3fae3ed9 100755 --- a/src/rootserver/ddl_task/ob_ddl_task.h +++ b/src/rootserver/ddl_task/ob_ddl_task.h @@ -143,11 +143,12 @@ public: const int64_t consumer_group_id, ObIAllocator *allocator, const obrpc::ObDDLArg *ddl_arg = nullptr, - const int64_t parent_task_id = 0); + const int64_t parent_task_id = 0, + const int64_t task_id = 0); ~ObCreateDDLTaskParam() = default; bool is_valid() const { return OB_INVALID_ID != tenant_id_ && type_ > share::DDL_INVALID && type_ < share::DDL_MAX && nullptr != allocator_; } - TO_STRING_KV(K_(tenant_id), K_(object_id), K_(schema_version), K_(parallelism), K_(consumer_group_id), K_(parent_task_id), + TO_STRING_KV(K_(tenant_id), K_(object_id), K_(schema_version), K_(parallelism), K_(consumer_group_id), K_(parent_task_id), K_(task_id), K_(type), KPC_(src_table_schema), KPC_(dest_table_schema), KPC_(ddl_arg)); public: uint64_t tenant_id_; @@ -156,6 +157,7 @@ public: int64_t parallelism_; int64_t consumer_group_id_; int64_t parent_task_id_; + int64_t task_id_; share::ObDDLType type_; const ObTableSchema *src_table_schema_; const ObTableSchema *dest_table_schema_; diff --git a/src/rootserver/ob_ddl_service.cpp b/src/rootserver/ob_ddl_service.cpp index df548db1f3..2585643b63 100755 --- a/src/rootserver/ob_ddl_service.cpp +++ b/src/rootserver/ob_ddl_service.cpp @@ -11213,12 +11213,20 @@ int ObDDLService::do_offline_ddl_in_trans(obrpc::ObAlterTableArg &alter_table_ar } else { ObDDLSQLTransaction trans(schema_service_); ObDDLTaskRecord task_record; + int64_t task_id = 0; int64_t refreshed_schema_version = 0; bool with_primary_key_operation = false; if (OB_FAIL(schema_guard.get_schema_version(tenant_id, refreshed_schema_version))) { LOG_WARN("failed to get tenant schema version", KR(ret), K(tenant_id)); } else if (OB_FAIL(trans.start(sql_proxy_, tenant_id, refreshed_schema_version))) { LOG_WARN("start transaction failed", KR(ret), K(tenant_id), K(refreshed_schema_version)); + } else if (OB_FAIL(ObDDLTask::fetch_new_task_id(root_service->get_sql_proxy(), tenant_id, task_id))) { + LOG_WARN("fetch new task id failed", K(ret)); + } else if (OB_FAIL(ObDDLLock::lock_for_offline_ddl(*orig_table_schema, + nullptr, + ObTableLockOwnerID(task_id), + trans))) { + LOG_WARN("failed to lock ddl lock", K(ret)); } // TODO yiren, refactor it, create user hidden table after alter index/column/part/cst... if (OB_FAIL(ret)) { @@ -11369,7 +11377,9 @@ int ObDDLService::do_offline_ddl_in_trans(obrpc::ObAlterTableArg &alter_table_ar alter_table_arg.parallelism_, alter_table_arg.consumer_group_id_, &alter_table_arg.allocator_, - &alter_table_arg); + &alter_table_arg, + 0/*parent_task_id*/, + task_id); if (orig_table_schema->is_tmp_table() || orig_table_schema->is_external_table()) { ret = OB_OP_NOT_ALLOW; char err_msg[OB_MAX_ERROR_MSG_LEN] = {0}; @@ -11383,11 +11393,6 @@ int ObDDLService::do_offline_ddl_in_trans(obrpc::ObAlterTableArg &alter_table_ar LOG_WARN("failed to alter table that has conflict ddl", K(ret), K(orig_table_schema->get_table_id())); } else if (OB_FAIL(root_service->get_ddl_scheduler().create_ddl_task(param, trans, task_record))) { LOG_WARN("submit ddl task failed", K(ret)); - } else if (OB_FAIL(ObDDLLock::lock_for_offline_ddl(*orig_table_schema, - bind_tablets ? &new_table_schema : nullptr, - ObTableLockOwnerID(task_record.task_id_), - trans))) { - LOG_WARN("failed to lock ddl lock", K(ret)); } else { res.task_id_ = task_record.task_id_; } @@ -11401,14 +11406,11 @@ int ObDDLService::do_offline_ddl_in_trans(obrpc::ObAlterTableArg &alter_table_ar alter_table_arg.parallelism_, alter_table_arg.consumer_group_id_, &alter_table_arg.allocator_, - &alter_table_arg); + &alter_table_arg, + 0/*parent_task_id*/, + task_id); if (OB_FAIL(root_service->get_ddl_scheduler().create_ddl_task(param, trans, task_record))) { LOG_WARN("submit ddl task failed", K(ret)); - } else if (OB_FAIL(ObDDLLock::lock_for_offline_ddl(new_table_schema, - nullptr, - ObTableLockOwnerID(task_record.task_id_), - trans))) { - LOG_WARN("failed to lock ddl lock", K(ret)); } else { res.task_id_ = task_record.task_id_; } @@ -11480,6 +11482,7 @@ int ObDDLService::create_hidden_table( ObDDLSQLTransaction trans(schema_service_); common::ObArenaAllocator allocator; ObDDLTaskRecord task_record; + int64_t task_id = 0; int64_t refreshed_schema_version = 0; new_table_schema.set_tenant_id(dest_tenant_id); new_table_schema.set_table_state_flag(ObTableStateFlag::TABLE_STATE_OFFLINE_DDL); @@ -11487,6 +11490,13 @@ int ObDDLService::create_hidden_table( LOG_WARN("failed to get tenant schema version", KR(ret), K(tenant_id)); } else if (OB_FAIL(trans.start(sql_proxy_, tenant_id, refreshed_schema_version))) { LOG_WARN("start transaction failed", KR(ret), K(tenant_id), K(refreshed_schema_version)); + } else if (OB_FAIL(ObDDLTask::fetch_new_task_id(root_service->get_sql_proxy(), tenant_id, task_id))) { + LOG_WARN("fetch new task id failed", K(ret)); + } else if (OB_FAIL(ObDDLLock::lock_for_offline_ddl(*orig_table_schema, + nullptr, + ObTableLockOwnerID(task_id), + trans))) { + LOG_WARN("failed to lock ddl lock", K(ret)); } else if (OB_FAIL(create_user_hidden_table( *orig_table_schema, new_table_schema, @@ -11527,14 +11537,11 @@ int ObDDLService::create_hidden_table( create_hidden_table_arg.parallelism_, create_hidden_table_arg.consumer_group_id_, &allocator_for_redef, - &alter_table_arg); + &alter_table_arg, + 0, + task_id); if (OB_FAIL(root_service->get_ddl_scheduler().create_ddl_task(param, trans, task_record))) { LOG_WARN("submit ddl task failed", K(ret)); - } else if (OB_FAIL(ObDDLLock::lock_for_offline_ddl(*orig_table_schema, - bind_tablets ? &new_table_schema : nullptr, - ObTableLockOwnerID(task_record.task_id_), - trans))) { - LOG_WARN("failed to lock ddl lock", K(ret)); } else if (orig_table_schema->get_table_state_flag() == ObTableStateFlag::TABLE_STATE_OFFLINE_DDL) { ret = OB_OP_NOT_ALLOW; LOG_WARN("offline ddl is being executed, other ddl operations are not allowed, create hidden table fail", K(ret), K(create_hidden_table_arg));