[CP] Fix direct load work thread hang when finish many trans at once

This commit is contained in:
suz-yang 2023-12-22 12:43:06 +00:00 committed by ob-robot
parent a1d0111dba
commit 7dea12b77c
4 changed files with 41 additions and 86 deletions

View File

@ -290,22 +290,15 @@ int ObTableDirectLoadBeginExecutor::do_begin()
{
int ret = OB_SUCCESS;
ObTableLoadCoordinator coordinator(table_ctx_);
ObTableLoadTransId trans_id;
if (OB_FAIL(coordinator.init())) {
LOG_WARN("fail to init coordinator", KR(ret));
} else if (OB_FAIL(coordinator.begin())) {
LOG_WARN("fail to coordinator begin", KR(ret));
}
// start trans
for (int64_t i = 1; OB_SUCC(ret) && i <= table_ctx_->param_.session_count_; ++i) {
ObTableLoadSegmentID segment_id(i);
ObTableLoadTransId trans_id;
if (OB_FAIL(coordinator.start_trans(segment_id, trans_id))) {
LOG_WARN("fail to start trans", KR(ret), K(i));
} else if (OB_FAIL(client_task_->add_trans_id(trans_id))) {
LOG_WARN("fail to add trans id", KR(ret), K(trans_id));
}
}
if (OB_SUCC(ret)) {
} else if (OB_FAIL(coordinator.start_trans(ObTableLoadSegmentID(1), trans_id))) {
LOG_WARN("fail to start trans", KR(ret));
} else {
client_task_->set_trans_id(trans_id);
if (OB_FAIL(client_task_->set_status_running())) {
LOG_WARN("fail to set status running", KR(ret));
}
@ -459,15 +452,14 @@ int ObTableDirectLoadInsertExecutor::process()
LOG_WARN("fail to get table ctx", KR(ret));
} else {
ObTableLoadCoordinator coordinator(table_ctx);
ObTableLoadTransId trans_id;
int64_t batch_id = client_task->get_next_batch_id();
const ObTableLoadTransId &trans_id = client_task->get_trans_id();
const int64_t batch_id = client_task->get_next_batch_id();
const int32_t session_id = batch_id % table_ctx->param_.session_count_ + 1;
if (OB_FAIL(set_batch_seq_no(batch_id, obj_rows))) {
LOG_WARN("fail to set batch seq no", KR(ret));
} else if (OB_FAIL(client_task->get_next_trans_id(trans_id))) {
LOG_WARN("fail to get next trans id", KR(ret));
} else if (OB_FAIL(coordinator.init())) {
LOG_WARN("fail to init coordinator", KR(ret));
} else if (OB_FAIL(coordinator.write(trans_id, obj_rows))) {
} else if (OB_FAIL(coordinator.write(trans_id, session_id, 0 /*seq_no*/, obj_rows))) {
LOG_WARN("fail to coordinator write", KR(ret));
}
}

View File

@ -106,16 +106,12 @@ private:
if (OB_FAIL(client_task_->check_status(ObTableLoadClientStatus::COMMITTING))) {
LOG_WARN("fail to check status", KR(ret));
} else {
const ObIArray<ObTableLoadTransId> &trans_ids = client_task_->get_trans_ids();
ObTableLoadCoordinator coordinator(table_ctx_);
const ObTableLoadTransId &trans_id = client_task_->get_trans_id();
if (OB_FAIL(coordinator.init())) {
LOG_WARN("fail to init coordinator", KR(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < trans_ids.count(); ++i) {
const ObTableLoadTransId &trans_id = trans_ids.at(i);
if (OB_FAIL(coordinator.finish_trans(trans_id))) {
LOG_WARN("fail to coordinator finish trans", KR(ret), K(i), K(trans_id));
}
} else if (OB_FAIL(coordinator.finish_trans(trans_id))) {
LOG_WARN("fail to coordinator finish trans", KR(ret), K(trans_id));
}
if (OB_FAIL(ret)) {
client_task_->set_status_error(ret);
@ -126,7 +122,7 @@ private:
int check_all_trans_commit()
{
int ret = OB_SUCCESS;
const ObIArray<ObTableLoadTransId> &trans_ids = client_task_->get_trans_ids();
const ObTableLoadTransId &trans_id = client_task_->get_trans_id();
while (OB_SUCC(ret)) {
if (OB_FAIL(client_task_->check_status(ObTableLoadClientStatus::COMMITTING))) {
LOG_WARN("fail to check status", KR(ret));
@ -134,37 +130,33 @@ private:
if (OB_FAIL(client_task_->get_exec_ctx()->check_status())) {
LOG_WARN("fail to check exec status", KR(ret));
} else {
bool all_commit = true;
ObTableLoadCoordinator coordinator(table_ctx_);
ObTableLoadTransStatusType trans_status = ObTableLoadTransStatusType::NONE;
int error_code = OB_SUCCESS;
bool try_again = false;
if (OB_FAIL(coordinator.init())) {
LOG_WARN("fail to init coordinator", KR(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < trans_ids.count(); ++i) {
const ObTableLoadTransId &trans_id = trans_ids.at(i);
ObTableLoadTransStatusType trans_status = ObTableLoadTransStatusType::NONE;
int error_code = OB_SUCCESS;
if (OB_FAIL(coordinator.get_trans_status(trans_id, trans_status, error_code))) {
LOG_WARN("fail to coordinator get status", KR(ret), K(i), K(trans_id));
} else {
switch (trans_status) {
case ObTableLoadTransStatusType::FROZEN:
all_commit = false;
break;
case ObTableLoadTransStatusType::COMMIT:
break;
case ObTableLoadTransStatusType::ERROR:
ret = error_code;
LOG_WARN("trans has error", KR(ret));
break;
default:
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected trans status", KR(ret), K(trans_status));
break;
}
} else if (OB_FAIL(coordinator.get_trans_status(trans_id, trans_status, error_code))) {
LOG_WARN("fail to coordinator get status", KR(ret), K(trans_id));
} else {
switch (trans_status) {
case ObTableLoadTransStatusType::FROZEN:
try_again = true;
break;
case ObTableLoadTransStatusType::COMMIT:
break;
case ObTableLoadTransStatusType::ERROR:
ret = error_code;
LOG_WARN("trans has error", KR(ret));
break;
default:
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected trans status", KR(ret), K(trans_status));
break;
}
}
if (OB_SUCC(ret)) {
if (all_commit) {
if (!try_again) {
break;
} else {
ob_usleep(1000 * 1000);

View File

@ -37,7 +37,6 @@ ObTableLoadClientTask::ObTableLoadClientTask()
session_info_(nullptr),
exec_ctx_(nullptr),
task_scheduler_(nullptr),
next_trans_idx_(0),
next_batch_id_(0),
table_ctx_(nullptr),
client_status_(ObTableLoadClientStatus::MAX_STATUS),
@ -262,28 +261,6 @@ int ObTableLoadClientTask::get_table_ctx(ObTableLoadTableCtx *&table_ctx)
return ret;
}
int ObTableLoadClientTask::add_trans_id(const ObTableLoadTransId &trans_id)
{
int ret = OB_SUCCESS;
if (OB_FAIL(trans_ids_.push_back(trans_id))) {
LOG_WARN("fail to push back trans id", KR(ret), K(trans_id));
}
return ret;
}
int ObTableLoadClientTask::get_next_trans_id(ObTableLoadTransId &trans_id)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(trans_ids_.empty())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected empty trans id", KR(ret));
} else {
const int64_t trans_idx = ATOMIC_FAA(&next_trans_idx_, 1) % trans_ids_.count();
trans_id = trans_ids_.at(trans_idx);
}
return ret;
}
int ObTableLoadClientTask::set_status_running()
{
int ret = OB_SUCCESS;

View File

@ -42,13 +42,9 @@ public:
int get_table_ctx(ObTableLoadTableCtx *&table_ctx);
OB_INLINE sql::ObSQLSessionInfo *get_session_info() { return session_info_; }
OB_INLINE ObTableLoadClientExecCtx *get_exec_ctx() { return exec_ctx_; }
int add_trans_id(const table::ObTableLoadTransId &trans_id);
int get_next_trans_id(table::ObTableLoadTransId &trans_id);
OB_INLINE void set_trans_id(const table::ObTableLoadTransId &trans_id) { trans_id_ = trans_id; }
OB_INLINE const table::ObTableLoadTransId &get_trans_id() const { return trans_id_; }
int64_t get_next_batch_id() { return ATOMIC_FAA(&next_batch_id_, 1); }
OB_INLINE const common::ObIArray<table::ObTableLoadTransId> &get_trans_ids() const
{
return trans_ids_;
}
int set_status_running();
int set_status_committing();
int set_status_commit();
@ -62,13 +58,12 @@ public:
int add_task(ObTableLoadTask *task);
TO_STRING_KV(K_(tenant_id), K_(user_id), K_(database_id), K_(table_id), K_(ddl_param),
K_(column_names), K_(column_idxs), K_(result_info), KP_(session_info),
K_(free_session_ctx), KP_(exec_ctx), KP_(task_scheduler), K_(trans_ids),
K_(next_trans_idx), KP_(table_ctx), K_(client_status), K_(error_code),
K_(ref_count));
K_(free_session_ctx), KP_(exec_ctx), KP_(task_scheduler), K_(trans_id),
KP_(table_ctx), K_(client_status), K_(error_code), K_(ref_count));
private:
int create_session_info(uint64_t user_id, uint64_t database_id, uint64_t table_id,
sql::ObSQLSessionInfo *&session_info,
sql::ObFreeSessionCtx &free_session_ctx);
sql::ObSQLSessionInfo *&session_info,
sql::ObFreeSessionCtx &free_session_ctx);
int init_column_names_and_idxs();
int init_exec_ctx(int64_t timeout_us, int64_t heartbeat_timeout_us);
public:
@ -87,8 +82,7 @@ private:
ObTableLoadClientExecCtx *exec_ctx_;
ObTableLoadObjectAllocator<ObTableLoadTask> task_allocator_;
ObITableLoadTaskScheduler *task_scheduler_;
common::ObArray<table::ObTableLoadTransId> trans_ids_;
int64_t next_trans_idx_;
table::ObTableLoadTransId trans_id_;
int64_t next_batch_id_ CACHE_ALIGNED;
mutable obsys::ObRWLock rw_lock_;
ObTableLoadTableCtx *table_ctx_;