optimise truncate table parallel tenant inner task wait

This commit is contained in:
obdev
2023-06-06 12:48:18 +00:00
committed by ob-robot
parent 25539cc048
commit 70904af004
3 changed files with 61 additions and 26 deletions

View File

@ -16510,7 +16510,7 @@ int ObDDLService::generate_table_schemas(const ObIArray<const ObTableSchema*> &o
}
}
int64_t finish_time = ObTimeUtility::current_time();
LOG_INFO("finish generate_table_schema", KR(ret), "cost_ts", finish_time - start_time);
LOG_INFO("finish generate_table_schema", KR(ret), "cost_ts", finish_time - start_time, K(task_id));
return ret;
}
@ -16641,7 +16641,7 @@ int ObDDLService::new_truncate_table_in_trans(const ObIArray<const ObTableSchema
int64_t before_wait_task = ObTimeUtility::current_time();
// Serial Submit
if (FAILEDx(schema_service_->get_ddl_trans_controller().wait_task_ready(task_id, THIS_WORKER.get_timeout_remain()))) {
if (FAILEDx(schema_service_->get_ddl_trans_controller().wait_task_ready(tenant_id, task_id, THIS_WORKER.get_timeout_remain()))) {
LOG_WARN("wait_task_ready", KR(ret), K(table_name), K(task_id));
}
int64_t wait_task = ObTimeUtility::current_time();

View File

@ -203,34 +203,59 @@ int ObDDLTransController::create_task_and_assign_schema_version(const uint64_t t
LOG_WARN("register_task_and_assign_schema_version", KR(ret));
}
}
if (OB_FAIL(ret)) {
} else if (tasks_.count() > 0 && schema_version_res.at(0) <= tasks_.at(tasks_.count()-1).task_id_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("assign schema_version", KR(ret), K(tasks_), K(schema_version_res));
} else if (OB_FAIL(tasks_.push_back(TaskDesc{tenant_id, schema_version_res.at(0), false}))) {
LOG_WARN("register_task_and_assign_schema_version", KR(ret));
} else {
task_id = schema_version_res.at(0);
if (OB_SUCC(ret)) {
int64_t first_schema_version = schema_version_res.at(0);
int64_t last_schema_version = schema_version_res.at(schema_version_res.count() - 1);
// check tenant schema_version
for (int64_t i = tasks_.count() - 1; i >= 0; i--) {
if (tasks_.at(i).tenant_id_ == tenant_id) {
if (first_schema_version <= tasks_.at(i).task_id_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("assign schema_version", KR(ret), K(tasks_), K(schema_version_res));
}
break;
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(tasks_.push_back(TaskDesc{tenant_id, last_schema_version, false}))) {
LOG_WARN("register_task_and_assign_schema_version", KR(ret));
} else {
task_id = last_schema_version;
}
}
}
LOG_INFO("create_task_and_assign_schema_version", KR(ret), K(tenant_id), K(task_id));
return ret;
}
int ObDDLTransController::check_task_ready(int64_t task_id, bool &ready)
int ObDDLTransController::check_task_ready(uint64_t tenant_id, int64_t task_id, bool &ready)
{
int ret = OB_SUCCESS;
int idx = OB_INVALID_INDEX;
int pre_task_count = 0;
SpinWLockGuard guard(lock_);
for (int i = 0; i < tasks_.count(); i++) {
if (tasks_.at(i).tenant_id_ == tenant_id) {
pre_task_count++;
}
if (tasks_.at(i).task_id_ == task_id) {
if (tenant_id != tasks_.at(i).tenant_id_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("task tenant_id mismatch", KR(ret), K(tenant_id), K(tasks_));
}
idx = i;
break;
}
}
ready = false;
if (OB_INVALID_INDEX == idx) {
if (OB_FAIL(ret)) {
} else if (OB_INVALID_INDEX == idx) {
ret = OB_ENTRY_NOT_EXIST;
} else if (0 == idx) {
LOG_WARN("task_id not found", KR(ret), K(tenant_id), K(task_id), K(tasks_));
} else if (pre_task_count == 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pre_task is null", KR(ret), K(tenant_id), K(task_id), K(tasks_));
} else if (pre_task_count == 1) {
ready = true;
} else {
// gc end task
@ -248,14 +273,14 @@ int ObDDLTransController::check_task_ready(int64_t task_id, bool &ready)
return ret;
}
int ObDDLTransController::wait_task_ready(int64_t task_id, int64_t wait_us)
int ObDDLTransController::wait_task_ready(uint64_t tenant_id, int64_t task_id, int64_t wait_us)
{
int ret = OB_SUCCESS;
bool ready = false;
uint64_t cond_idx = task_id % DDL_TASK_COND_SLOT;
int64_t start_time = ObTimeUtility::current_time();
while (OB_SUCC(ret) && ObTimeUtility::current_time() - start_time < wait_us) {
if (OB_FAIL(check_task_ready(task_id, ready))) {
if (OB_FAIL(check_task_ready(tenant_id, task_id, ready))) {
LOG_WARN("wait_task_ready", KR(ret), K(task_id), K(ready));
} else if (ready) {
break;
@ -277,12 +302,13 @@ int ObDDLTransController::remove_task(int64_t task_id)
{
int ret = OB_SUCCESS;
int idx = OB_INVALID_INDEX;
uint64_t tenant_id = OB_INVALID_TENANT_ID;
SpinWLockGuard guard(lock_);
for (int i = 0; i < tasks_.count(); i++) {
if (tasks_.at(i).task_id_ == task_id) {
tasks_.at(i).task_end_ = true;
idx = i;
uint64_t tenant_id = tasks_.at(i).tenant_id_;
tenant_id = tasks_.at(i).tenant_id_;
if (OB_FAIL(tasks_.remove(i))) {
LOG_WARN("remove_task fail", KR(ret), K(task_id));
} else if (OB_FAIL(tenants_.set_refactored(tenant_id, 1, 0, 1))) {
@ -296,16 +322,25 @@ int ObDDLTransController::remove_task(int64_t task_id)
if (OB_FAIL(ret)) {
} else if (OB_INVALID_INDEX == idx) {
ret = OB_ENTRY_NOT_EXIST;
} else if (idx < tasks_.count()) {
LOG_WARN("task_id not found", KR(ret), K(task_id), K(tasks_));
} else if (OB_INVALID_TENANT_ID == tenant_id) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tenant_id is invalid", KR(ret), K(task_id), K(tasks_));
} else {
// wake up next
int64_t next_task_id = tasks_.at(idx).task_id_;
uint64_t cond_idx = next_task_id % DDL_TASK_COND_SLOT;
cond_slot_[cond_idx].broadcast();
for (int next = idx; next < tasks_.count(); next++) {
if (tasks_.at(next).tenant_id_ == tenant_id) {
int64_t next_task_id = tasks_.at(next).task_id_;
uint64_t cond_idx = next_task_id % DDL_TASK_COND_SLOT;
cond_slot_[cond_idx].broadcast();
break;
}
}
}
return ret;
}
int ObDDLTransController::check_enable_ddl_trans_new_lock(int64_t tenant_id, bool &res)
int ObDDLTransController::check_enable_ddl_trans_new_lock(uint64_t tenant_id, bool &res)
{
int ret = OB_SUCCESS;
if (!inited_) {
@ -327,7 +362,7 @@ int ObDDLTransController::check_enable_ddl_trans_new_lock(int64_t tenant_id, boo
return ret;
}
int ObDDLTransController::set_enable_ddl_trans_new_lock(int64_t tenant_id)
int ObDDLTransController::set_enable_ddl_trans_new_lock(uint64_t tenant_id)
{
int ret = OB_SUCCESS;
if (!inited_) {

View File

@ -51,16 +51,16 @@ public:
const uint64_t schema_version_count,
int64_t &task_id,
ObIArray<int64_t> &schema_version_res);
int wait_task_ready(int64_t task_id, int64_t wait_us);
int wait_task_ready(uint64_t tenant_id, int64_t task_id, int64_t wait_us);
int remove_task(int64_t task_id);
int check_enable_ddl_trans_new_lock(int64_t tenant_id, bool &res);
int set_enable_ddl_trans_new_lock(int64_t tenant_id);
int check_enable_ddl_trans_new_lock(uint64_t tenant_id, bool &res);
int set_enable_ddl_trans_new_lock(uint64_t tenant_id);
int broadcast_consensus_version(const int64_t tenant_id,
const int64_t schema_version,
const ObArray<ObAddr> &server_list);
private:
virtual void run1() override;
int check_task_ready(int64_t task_id, bool &ready);
int check_task_ready(uint64_t tenant_id, int64_t task_id, bool &ready);
private:
bool inited_;
common::ObThreadCond cond_slot_[DDL_TASK_COND_SLOT];