From 70904af004428ce7bf5d5be1ff67b3aa4a259c20 Mon Sep 17 00:00:00 2001 From: obdev Date: Tue, 6 Jun 2023 12:48:18 +0000 Subject: [PATCH] optimise truncate table parallel tenant inner task wait --- src/rootserver/ob_ddl_service.cpp | 4 +- src/share/schema/ob_ddl_trans_controller.cpp | 75 ++++++++++++++------ src/share/schema/ob_ddl_trans_controller.h | 8 +-- 3 files changed, 61 insertions(+), 26 deletions(-) diff --git a/src/rootserver/ob_ddl_service.cpp b/src/rootserver/ob_ddl_service.cpp index e93335008b..7aaa610982 100644 --- a/src/rootserver/ob_ddl_service.cpp +++ b/src/rootserver/ob_ddl_service.cpp @@ -16510,7 +16510,7 @@ int ObDDLService::generate_table_schemas(const ObIArray &o } } int64_t finish_time = ObTimeUtility::current_time(); - LOG_INFO("finish generate_table_schema", KR(ret), "cost_ts", finish_time - start_time); + LOG_INFO("finish generate_table_schema", KR(ret), "cost_ts", finish_time - start_time, K(task_id)); return ret; } @@ -16641,7 +16641,7 @@ int ObDDLService::new_truncate_table_in_trans(const ObIArrayget_ddl_trans_controller().wait_task_ready(task_id, THIS_WORKER.get_timeout_remain()))) { + if (FAILEDx(schema_service_->get_ddl_trans_controller().wait_task_ready(tenant_id, task_id, THIS_WORKER.get_timeout_remain()))) { LOG_WARN("wait_task_ready", KR(ret), K(table_name), K(task_id)); } int64_t wait_task = ObTimeUtility::current_time(); diff --git a/src/share/schema/ob_ddl_trans_controller.cpp b/src/share/schema/ob_ddl_trans_controller.cpp index 1d4c01f4c3..17b6009346 100644 --- a/src/share/schema/ob_ddl_trans_controller.cpp +++ b/src/share/schema/ob_ddl_trans_controller.cpp @@ -203,34 +203,59 @@ int ObDDLTransController::create_task_and_assign_schema_version(const uint64_t t LOG_WARN("register_task_and_assign_schema_version", KR(ret)); } } - if (OB_FAIL(ret)) { - } else if (tasks_.count() > 0 && schema_version_res.at(0) <= tasks_.at(tasks_.count()-1).task_id_) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("assign schema_version", KR(ret), K(tasks_), K(schema_version_res)); - } else if (OB_FAIL(tasks_.push_back(TaskDesc{tenant_id, schema_version_res.at(0), false}))) { - LOG_WARN("register_task_and_assign_schema_version", KR(ret)); - } else { - task_id = schema_version_res.at(0); + if (OB_SUCC(ret)) { + int64_t first_schema_version = schema_version_res.at(0); + int64_t last_schema_version = schema_version_res.at(schema_version_res.count() - 1); + // check tenant schema_version + for (int64_t i = tasks_.count() - 1; i >= 0; i--) { + if (tasks_.at(i).tenant_id_ == tenant_id) { + if (first_schema_version <= tasks_.at(i).task_id_) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("assign schema_version", KR(ret), K(tasks_), K(schema_version_res)); + } + break; + } + } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(tasks_.push_back(TaskDesc{tenant_id, last_schema_version, false}))) { + LOG_WARN("register_task_and_assign_schema_version", KR(ret)); + } else { + task_id = last_schema_version; + } } } + LOG_INFO("create_task_and_assign_schema_version", KR(ret), K(tenant_id), K(task_id)); return ret; } -int ObDDLTransController::check_task_ready(int64_t task_id, bool &ready) +int ObDDLTransController::check_task_ready(uint64_t tenant_id, int64_t task_id, bool &ready) { int ret = OB_SUCCESS; int idx = OB_INVALID_INDEX; + int pre_task_count = 0; SpinWLockGuard guard(lock_); for (int i = 0; i < tasks_.count(); i++) { + if (tasks_.at(i).tenant_id_ == tenant_id) { + pre_task_count++; + } if (tasks_.at(i).task_id_ == task_id) { + if (tenant_id != tasks_.at(i).tenant_id_) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("task tenant_id mismatch", KR(ret), K(tenant_id), K(tasks_)); + } idx = i; break; } } ready = false; - if (OB_INVALID_INDEX == idx) { + if (OB_FAIL(ret)) { + } else if (OB_INVALID_INDEX == idx) { ret = OB_ENTRY_NOT_EXIST; - } else if (0 == idx) { + LOG_WARN("task_id not found", KR(ret), K(tenant_id), K(task_id), K(tasks_)); + } else if (pre_task_count == 0) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("pre_task is null", KR(ret), K(tenant_id), K(task_id), K(tasks_)); + } else if (pre_task_count == 1) { ready = true; } else { // gc end task @@ -248,14 +273,14 @@ int ObDDLTransController::check_task_ready(int64_t task_id, bool &ready) return ret; } -int ObDDLTransController::wait_task_ready(int64_t task_id, int64_t wait_us) +int ObDDLTransController::wait_task_ready(uint64_t tenant_id, int64_t task_id, int64_t wait_us) { int ret = OB_SUCCESS; bool ready = false; uint64_t cond_idx = task_id % DDL_TASK_COND_SLOT; int64_t start_time = ObTimeUtility::current_time(); while (OB_SUCC(ret) && ObTimeUtility::current_time() - start_time < wait_us) { - if (OB_FAIL(check_task_ready(task_id, ready))) { + if (OB_FAIL(check_task_ready(tenant_id, task_id, ready))) { LOG_WARN("wait_task_ready", KR(ret), K(task_id), K(ready)); } else if (ready) { break; @@ -277,12 +302,13 @@ int ObDDLTransController::remove_task(int64_t task_id) { int ret = OB_SUCCESS; int idx = OB_INVALID_INDEX; + uint64_t tenant_id = OB_INVALID_TENANT_ID; SpinWLockGuard guard(lock_); for (int i = 0; i < tasks_.count(); i++) { if (tasks_.at(i).task_id_ == task_id) { tasks_.at(i).task_end_ = true; idx = i; - uint64_t tenant_id = tasks_.at(i).tenant_id_; + tenant_id = tasks_.at(i).tenant_id_; if (OB_FAIL(tasks_.remove(i))) { LOG_WARN("remove_task fail", KR(ret), K(task_id)); } else if (OB_FAIL(tenants_.set_refactored(tenant_id, 1, 0, 1))) { @@ -296,16 +322,25 @@ int ObDDLTransController::remove_task(int64_t task_id) if (OB_FAIL(ret)) { } else if (OB_INVALID_INDEX == idx) { ret = OB_ENTRY_NOT_EXIST; - } else if (idx < tasks_.count()) { + LOG_WARN("task_id not found", KR(ret), K(task_id), K(tasks_)); + } else if (OB_INVALID_TENANT_ID == tenant_id) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("tenant_id is invalid", KR(ret), K(task_id), K(tasks_)); + } else { // wake up next - int64_t next_task_id = tasks_.at(idx).task_id_; - uint64_t cond_idx = next_task_id % DDL_TASK_COND_SLOT; - cond_slot_[cond_idx].broadcast(); + for (int next = idx; next < tasks_.count(); next++) { + if (tasks_.at(next).tenant_id_ == tenant_id) { + int64_t next_task_id = tasks_.at(next).task_id_; + uint64_t cond_idx = next_task_id % DDL_TASK_COND_SLOT; + cond_slot_[cond_idx].broadcast(); + break; + } + } } return ret; } -int ObDDLTransController::check_enable_ddl_trans_new_lock(int64_t tenant_id, bool &res) +int ObDDLTransController::check_enable_ddl_trans_new_lock(uint64_t tenant_id, bool &res) { int ret = OB_SUCCESS; if (!inited_) { @@ -327,7 +362,7 @@ int ObDDLTransController::check_enable_ddl_trans_new_lock(int64_t tenant_id, boo return ret; } -int ObDDLTransController::set_enable_ddl_trans_new_lock(int64_t tenant_id) +int ObDDLTransController::set_enable_ddl_trans_new_lock(uint64_t tenant_id) { int ret = OB_SUCCESS; if (!inited_) { diff --git a/src/share/schema/ob_ddl_trans_controller.h b/src/share/schema/ob_ddl_trans_controller.h index ab328c13e2..15ae120385 100644 --- a/src/share/schema/ob_ddl_trans_controller.h +++ b/src/share/schema/ob_ddl_trans_controller.h @@ -51,16 +51,16 @@ public: const uint64_t schema_version_count, int64_t &task_id, ObIArray &schema_version_res); - int wait_task_ready(int64_t task_id, int64_t wait_us); + int wait_task_ready(uint64_t tenant_id, int64_t task_id, int64_t wait_us); int remove_task(int64_t task_id); - int check_enable_ddl_trans_new_lock(int64_t tenant_id, bool &res); - int set_enable_ddl_trans_new_lock(int64_t tenant_id); + int check_enable_ddl_trans_new_lock(uint64_t tenant_id, bool &res); + int set_enable_ddl_trans_new_lock(uint64_t tenant_id); int broadcast_consensus_version(const int64_t tenant_id, const int64_t schema_version, const ObArray &server_list); private: virtual void run1() override; - int check_task_ready(int64_t task_id, bool &ready); + int check_task_ready(uint64_t tenant_id, int64_t task_id, bool &ready); private: bool inited_; common::ObThreadCond cond_slot_[DDL_TASK_COND_SLOT];