Fix direct load client task not auto abort
This commit is contained in:
@ -90,6 +90,41 @@ int ObTableDirectLoadBeginExecutor::process()
|
||||
ret = OB_SUCCESS;
|
||||
client_task_ = nullptr;
|
||||
}
|
||||
} else {
|
||||
bool need_wait_finish = false;
|
||||
ObTableLoadClientStatus wait_client_status;
|
||||
ObTableLoadClientStatus client_status = client_task_->get_status();
|
||||
switch (client_status) {
|
||||
case ObTableLoadClientStatus::RUNNING:
|
||||
case ObTableLoadClientStatus::COMMITTING:
|
||||
if (arg_.force_create_) {
|
||||
if (OB_FAIL(ObTableLoadClientService::abort_task(client_task_))) {
|
||||
LOG_WARN("fail to abort client task", KR(ret));
|
||||
} else {
|
||||
need_wait_finish = true;
|
||||
wait_client_status = ObTableLoadClientStatus::ABORT;
|
||||
}
|
||||
}
|
||||
break;
|
||||
case ObTableLoadClientStatus::COMMIT:
|
||||
case ObTableLoadClientStatus::ABORT:
|
||||
need_wait_finish = true;
|
||||
wait_client_status = client_status;
|
||||
break;
|
||||
default:
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected client status", KR(ret), KPC(client_task_), K(client_status));
|
||||
break;
|
||||
}
|
||||
if (OB_SUCC(ret) && need_wait_finish) {
|
||||
if (OB_FAIL(ObTableLoadClientService::wait_task_finish(client_task_, wait_client_status))) {
|
||||
LOG_WARN("fail to wait client task finish", KR(ret), KPC(client_task_),
|
||||
K(wait_client_status));
|
||||
} else {
|
||||
ObTableLoadClientService::revert_task(client_task_);
|
||||
client_task_ = nullptr;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -322,6 +357,9 @@ int ObTableDirectLoadAbortExecutor::process()
|
||||
LOG_WARN("fail to get client task", KR(ret), K(key));
|
||||
} else if (OB_FAIL(ObTableLoadClientService::abort_task(client_task))) {
|
||||
LOG_WARN("fail to abort client task", KR(ret));
|
||||
} else if (OB_FAIL(ObTableLoadClientService::wait_task_finish(client_task,
|
||||
ObTableLoadClientStatus::ABORT))) {
|
||||
LOG_WARN("fail to abort client task", KR(ret));
|
||||
}
|
||||
if (nullptr != client_task) {
|
||||
ObTableLoadClientService::revert_task(client_task);
|
||||
|
@ -25,7 +25,8 @@ OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadBeginArg,
|
||||
parallel_,
|
||||
max_error_row_count_,
|
||||
dup_action_,
|
||||
timeout_);
|
||||
timeout_,
|
||||
force_create_);
|
||||
|
||||
OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadBeginRes,
|
||||
table_id_,
|
||||
|
@ -30,16 +30,19 @@ public:
|
||||
: parallel_(0),
|
||||
max_error_row_count_(0),
|
||||
dup_action_(sql::ObLoadDupActionType::LOAD_INVALID_MODE),
|
||||
timeout_(0)
|
||||
timeout_(0),
|
||||
force_create_(false)
|
||||
{
|
||||
}
|
||||
TO_STRING_KV(K_(table_name), K_(parallel), K_(max_error_row_count), K_(dup_action), K_(timeout));
|
||||
TO_STRING_KV(K_(table_name), K_(parallel), K_(max_error_row_count), K_(dup_action), K_(timeout),
|
||||
K_(force_create));
|
||||
public:
|
||||
ObString table_name_;
|
||||
int64_t parallel_;
|
||||
uint64_t max_error_row_count_;
|
||||
sql::ObLoadDupActionType dup_action_;
|
||||
int64_t timeout_;
|
||||
bool force_create_;
|
||||
};
|
||||
|
||||
struct ObTableDirectLoadBeginRes
|
||||
|
@ -528,6 +528,40 @@ int ObTableLoadClientService::abort_task(ObTableLoadClientTask *client_task)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadClientService::wait_task_finish(ObTableLoadClientTask *client_task,
|
||||
ObTableLoadClientStatus client_status)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(nullptr == client_task)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), KPC(client_task));
|
||||
} else if (OB_FAIL(client_task->check_status(client_status))) {
|
||||
LOG_WARN("fail to check status", KR(ret), KPC(client_task), K(client_status));
|
||||
} else {
|
||||
ObTimeoutCtx ctx;
|
||||
if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, 10LL * 1000 * 1000))) {
|
||||
LOG_WARN("fail to set default timeout ctx", KR(ret));
|
||||
}
|
||||
while (OB_SUCC(ret)) {
|
||||
if (ctx.is_timeouted()) {
|
||||
ret = OB_TIMEOUT;
|
||||
LOG_WARN("timeouted", KR(ret), K(ctx));
|
||||
} else {
|
||||
ObTableLoadClientStatus client_status = client_task->get_status();
|
||||
if (client_task->get_ref_count() > 2) {
|
||||
// wait
|
||||
ob_usleep(100LL * 1000);
|
||||
} else if (OB_FAIL(remove_task(client_task))) {
|
||||
LOG_WARN("fail to remove client task", KR(ret), KPC(client_task));
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadClientService::add_client_task(const ObTableLoadUniqueKey &key,
|
||||
ObTableLoadClientTask *client_task)
|
||||
{
|
||||
|
@ -48,6 +48,8 @@ public:
|
||||
static int get_task(const ObTableLoadKey &key, ObTableLoadClientTask *&client_task);
|
||||
static int commit_task(ObTableLoadClientTask *client_task);
|
||||
static int abort_task(ObTableLoadClientTask *client_task);
|
||||
static int wait_task_finish(ObTableLoadClientTask *client_task,
|
||||
table::ObTableLoadClientStatus client_status);
|
||||
|
||||
int add_client_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *client_task);
|
||||
int remove_client_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *client_task);
|
||||
|
@ -218,6 +218,7 @@ void ObTableLoadSchema::reset()
|
||||
column_descs_.reset();
|
||||
multi_version_column_descs_.reset();
|
||||
datum_utils_.reset();
|
||||
cmp_funcs_.reset();
|
||||
partition_ids_.reset();
|
||||
allocator_.reset();
|
||||
is_inited_ = false;
|
||||
@ -327,7 +328,7 @@ int ObTableLoadSchema::prepare_col_desc(const ObTableSchema *table_schema, commo
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadSchema::init_cmp_funcs(const ObArray<ObColDesc> &col_descs,
|
||||
int ObTableLoadSchema::init_cmp_funcs(const ObIArray<ObColDesc> &col_descs,
|
||||
const bool is_oracle_mode)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -48,7 +48,7 @@ public:
|
||||
private:
|
||||
int init_table_schema(const share::schema::ObTableSchema *table_schema);
|
||||
int prepare_col_desc(const ObTableSchema *table_schema, common::ObIArray<share::schema::ObColDesc> &col_descs);
|
||||
int init_cmp_funcs(const common::ObArray<share::schema::ObColDesc> &column_descs,
|
||||
int init_cmp_funcs(const common::ObIArray<share::schema::ObColDesc> &column_descs,
|
||||
const bool is_oracle_mode);
|
||||
public:
|
||||
common::ObArenaAllocator allocator_;
|
||||
|
@ -175,6 +175,48 @@ void ObTableLoadService::ObReleaseTask::runTimerTask()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* ObClientTaskAutoAbortTask
|
||||
*/
|
||||
|
||||
int ObTableLoadService::ObClientTaskAutoAbortTask::init(uint64_t tenant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_INIT) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("ObTableLoadService::ObClientTaskAutoAbortTask init twice", KR(ret), KP(this));
|
||||
} else {
|
||||
tenant_id_ = tenant_id;
|
||||
is_inited_ = true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObTableLoadService::ObClientTaskAutoAbortTask::runTimerTask()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadService::ObClientTaskAutoAbortTask not init", KR(ret), KP(this));
|
||||
} else {
|
||||
LOG_DEBUG("table load auto abort client task", K(tenant_id_));
|
||||
ObArray<ObTableLoadClientTask *> client_task_array;
|
||||
if (OB_FAIL(service_.get_client_service().get_all_client_task(client_task_array))) {
|
||||
LOG_WARN("fail to get all client task", KR(ret));
|
||||
} else {
|
||||
for (int64_t i = 0; i < client_task_array.count(); ++i) {
|
||||
ObTableLoadClientTask *client_task = client_task_array.at(i);
|
||||
if (ObTableLoadClientStatus::ERROR == client_task->get_status()) {
|
||||
if (OB_FAIL(ObTableLoadClientService::abort_task(client_task))) {
|
||||
LOG_WARN("fail to abort client task", KR(ret), KPC(client_task));
|
||||
}
|
||||
}
|
||||
ObTableLoadClientService::revert_task(client_task);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* ObClientTaskPurgeTask
|
||||
*/
|
||||
@ -372,6 +414,7 @@ ObTableLoadService::ObTableLoadService()
|
||||
: check_tenant_task_(*this),
|
||||
gc_task_(*this),
|
||||
release_task_(*this),
|
||||
client_task_auto_abort_task_(*this),
|
||||
client_task_purge_task_(*this),
|
||||
is_stop_(false),
|
||||
is_inited_(false)
|
||||
@ -394,6 +437,8 @@ int ObTableLoadService::init(uint64_t tenant_id)
|
||||
LOG_WARN("fail to init gc task", KR(ret));
|
||||
} else if (OB_FAIL(release_task_.init(tenant_id))) {
|
||||
LOG_WARN("fail to init release task", KR(ret));
|
||||
} else if (OB_FAIL(client_task_auto_abort_task_.init(tenant_id))) {
|
||||
LOG_WARN("fail to init client task auto abort task", KR(ret));
|
||||
} else if (OB_FAIL(client_task_purge_task_.init(tenant_id))) {
|
||||
LOG_WARN("fail to init client task purge task", KR(ret));
|
||||
} else {
|
||||
@ -418,6 +463,9 @@ int ObTableLoadService::start()
|
||||
LOG_WARN("fail to schedule gc task", KR(ret));
|
||||
} else if (OB_FAIL(timer_.schedule(release_task_, RELEASE_INTERVAL, true))) {
|
||||
LOG_WARN("fail to schedule release task", KR(ret));
|
||||
} else if (OB_FAIL(timer_.schedule(client_task_auto_abort_task_,
|
||||
CLIENT_TASK_AUTO_ABORT_INTERVAL, true))) {
|
||||
LOG_WARN("fail to schedule client task auto abort task", KR(ret));
|
||||
} else if (OB_FAIL(
|
||||
timer_.schedule(client_task_purge_task_, CLIENT_TASK_PURGE_INTERVAL, true))) {
|
||||
LOG_WARN("fail to schedule client task purge task", KR(ret));
|
||||
|
@ -55,7 +55,8 @@ private:
|
||||
static const int64_t CHECK_TENANT_INTERVAL = 1LL * 1000 * 1000; // 1s
|
||||
static const int64_t GC_INTERVAL = 30LL * 1000 * 1000; // 30s
|
||||
static const int64_t RELEASE_INTERVAL = 1LL * 1000 * 1000; // 1s
|
||||
static const int64_t CLIENT_TASK_PURGE_INTERVAL = 1LL * 1000 * 1000; // 30s
|
||||
static const int64_t CLIENT_TASK_AUTO_ABORT_INTERVAL = 1LL * 1000 * 1000; // 1s
|
||||
static const int64_t CLIENT_TASK_PURGE_INTERVAL = 1LL * 1000 * 1000; // 1s
|
||||
class ObCheckTenantTask : public common::ObTimerTask
|
||||
{
|
||||
public:
|
||||
@ -95,6 +96,21 @@ private:
|
||||
uint64_t tenant_id_;
|
||||
bool is_inited_;
|
||||
};
|
||||
class ObClientTaskAutoAbortTask : public common::ObTimerTask
|
||||
{
|
||||
public:
|
||||
ObClientTaskAutoAbortTask(ObTableLoadService &service)
|
||||
: service_(service), tenant_id_(common::OB_INVALID_ID), is_inited_(false)
|
||||
{
|
||||
}
|
||||
virtual ~ObClientTaskAutoAbortTask() = default;
|
||||
int init(uint64_t tenant_id);
|
||||
void runTimerTask() override;
|
||||
private:
|
||||
ObTableLoadService &service_;
|
||||
uint64_t tenant_id_;
|
||||
bool is_inited_;
|
||||
};
|
||||
class ObClientTaskPurgeTask : public common::ObTimerTask
|
||||
{
|
||||
public:
|
||||
@ -117,6 +133,7 @@ private:
|
||||
ObCheckTenantTask check_tenant_task_;
|
||||
ObGCTask gc_task_;
|
||||
ObReleaseTask release_task_;
|
||||
ObClientTaskAutoAbortTask client_task_auto_abort_task_;
|
||||
ObClientTaskPurgeTask client_task_purge_task_;
|
||||
volatile bool is_stop_;
|
||||
bool is_inited_;
|
||||
|
Reference in New Issue
Block a user