[CP] Fix table direct load deadlock wait
This commit is contained in:
parent
3347f6118c
commit
b8f2ed8666
@ -94,7 +94,7 @@ int ObTableDirectLoadBeginExecutor::process()
|
||||
}
|
||||
|
||||
// get the existing client task if it exists
|
||||
if (OB_SUCC(ret)) {
|
||||
while (OB_SUCC(ret)) {
|
||||
ObTableLoadKey key(tenant_id, table_id);
|
||||
if (OB_FAIL(ObTableLoadClientService::get_task(key, client_task_))) {
|
||||
if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) {
|
||||
@ -102,6 +102,7 @@ int ObTableDirectLoadBeginExecutor::process()
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
client_task_ = nullptr;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
bool need_wait_finish = false;
|
||||
@ -129,13 +130,15 @@ int ObTableDirectLoadBeginExecutor::process()
|
||||
LOG_WARN("unexpected client status", KR(ret), KPC(client_task_), K(client_status));
|
||||
break;
|
||||
}
|
||||
if (OB_SUCC(ret) && need_wait_finish) {
|
||||
if (OB_FAIL(ObTableLoadClientService::wait_task_finish(client_task_, wait_client_status))) {
|
||||
LOG_WARN("fail to wait client task finish", KR(ret), KPC(client_task_),
|
||||
K(wait_client_status));
|
||||
} else {
|
||||
ObTableLoadClientService::revert_task(client_task_);
|
||||
client_task_ = nullptr;
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (!need_wait_finish) {
|
||||
break;
|
||||
} else {
|
||||
ObTableLoadUniqueKey task_key(table_id, client_task_->ddl_param_.task_id_);
|
||||
ObTableLoadClientService::revert_task(client_task_);
|
||||
client_task_ = nullptr;
|
||||
if (OB_FAIL(ObTableLoadClientService::wait_task_finish(task_key))) {
|
||||
LOG_WARN("fail to wait client task finish", KR(ret), K(task_key), K(wait_client_status));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -370,14 +373,14 @@ int ObTableDirectLoadAbortExecutor::process()
|
||||
LOG_WARN("fail to get client task", KR(ret), K(key));
|
||||
} else if (OB_FAIL(ObTableLoadClientService::abort_task(client_task))) {
|
||||
LOG_WARN("fail to abort client task", KR(ret));
|
||||
} else if (OB_FAIL(ObTableLoadClientService::wait_task_finish(client_task,
|
||||
ObTableLoadClientStatus::ABORT))) {
|
||||
LOG_WARN("fail to abort client task", KR(ret));
|
||||
}
|
||||
if (nullptr != client_task) {
|
||||
ObTableLoadClientService::revert_task(client_task);
|
||||
client_task = nullptr;
|
||||
}
|
||||
if (OB_SUCC(ret) && OB_FAIL(ObTableLoadClientService::wait_task_finish(key))) {
|
||||
LOG_WARN("fail to wait client task finish", KR(ret), K(key));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -490,6 +490,21 @@ int ObTableLoadClientService::get_task(const ObTableLoadKey &key,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadClientService::exist_task(const ObTableLoadUniqueKey &key, bool &is_exist)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTableLoadService *service = nullptr;
|
||||
if (OB_ISNULL(service = MTL(ObTableLoadService *))) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("null table load service", KR(ret));
|
||||
} else {
|
||||
if (OB_FAIL(service->get_client_service().exist_client_task(key, is_exist))) {
|
||||
LOG_WARN("fail to check exist client task", KR(ret), K(key));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadClientService::commit_task(ObTableLoadClientTask *client_task)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -528,34 +543,27 @@ int ObTableLoadClientService::abort_task(ObTableLoadClientTask *client_task)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadClientService::wait_task_finish(ObTableLoadClientTask *client_task,
|
||||
ObTableLoadClientStatus client_status)
|
||||
int ObTableLoadClientService::wait_task_finish(const ObTableLoadUniqueKey &key)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(nullptr == client_task)) {
|
||||
if (OB_UNLIKELY(!key.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), KPC(client_task));
|
||||
} else if (OB_FAIL(client_task->check_status(client_status))) {
|
||||
LOG_WARN("fail to check status", KR(ret), KPC(client_task), K(client_status));
|
||||
LOG_WARN("invalid args", KR(ret), K(key));
|
||||
} else {
|
||||
bool is_exist = true;
|
||||
ObTimeoutCtx ctx;
|
||||
if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, 10LL * 1000 * 1000))) {
|
||||
LOG_WARN("fail to set default timeout ctx", KR(ret));
|
||||
}
|
||||
while (OB_SUCC(ret)) {
|
||||
while (OB_SUCC(ret) && is_exist) {
|
||||
if (ctx.is_timeouted()) {
|
||||
ret = OB_TIMEOUT;
|
||||
LOG_WARN("timeouted", KR(ret), K(ctx));
|
||||
} else {
|
||||
ObTableLoadClientStatus client_status = client_task->get_status();
|
||||
if (client_task->get_ref_count() > 2) {
|
||||
// wait
|
||||
ob_usleep(100LL * 1000);
|
||||
} else if (OB_FAIL(remove_task(client_task))) {
|
||||
LOG_WARN("fail to remove client task", KR(ret), KPC(client_task));
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
} else if (OB_FAIL(exist_task(key, is_exist))) {
|
||||
LOG_WARN("fail to check exist client task", KR(ret), K(key));
|
||||
} else if (is_exist) {
|
||||
// wait
|
||||
ob_usleep(100LL * 1000);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -731,6 +739,29 @@ int ObTableLoadClientService::get_client_task_by_table_id(uint64_t table_id,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadClientService::exist_client_task(const ObTableLoadUniqueKey &key, bool &is_exist)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
is_exist = false;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadClientService not init", KR(ret), KP(this));
|
||||
} else {
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
ObTableLoadClientTask *client_task = nullptr;
|
||||
if (OB_FAIL(client_task_map_.get_refactored(key, client_task))) {
|
||||
if (OB_UNLIKELY(OB_HASH_NOT_EXIST != ret)) {
|
||||
LOG_WARN("fail to get refactored", KR(ret), K(key));
|
||||
} else {
|
||||
ret = OB_ENTRY_NOT_EXIST;
|
||||
}
|
||||
} else {
|
||||
is_exist = true;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int64_t ObTableLoadClientService::get_client_task_count() const
|
||||
{
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
|
@ -46,16 +46,17 @@ public:
|
||||
static int remove_task(ObTableLoadClientTask *client_task);
|
||||
static int get_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *&client_task);
|
||||
static int get_task(const ObTableLoadKey &key, ObTableLoadClientTask *&client_task);
|
||||
static int exist_task(const ObTableLoadUniqueKey &key, bool &is_exist);
|
||||
static int commit_task(ObTableLoadClientTask *client_task);
|
||||
static int abort_task(ObTableLoadClientTask *client_task);
|
||||
static int wait_task_finish(ObTableLoadClientTask *client_task,
|
||||
table::ObTableLoadClientStatus client_status);
|
||||
static int wait_task_finish(const ObTableLoadUniqueKey &key);
|
||||
|
||||
int add_client_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *client_task);
|
||||
int remove_client_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *client_task);
|
||||
int get_all_client_task(common::ObIArray<ObTableLoadClientTask *> &client_task_array);
|
||||
int get_client_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *&client_task);
|
||||
int get_client_task_by_table_id(uint64_t table_id, ObTableLoadClientTask *&client_task);
|
||||
int exist_client_task(const ObTableLoadUniqueKey &key, bool &is_exist);
|
||||
int64_t get_client_task_count() const;
|
||||
void purge_client_task();
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user