[CP] Fix direct load work thread hang when finish many trans at once
This commit is contained in:
@ -290,22 +290,15 @@ int ObTableDirectLoadBeginExecutor::do_begin()
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObTableLoadCoordinator coordinator(table_ctx_);
|
ObTableLoadCoordinator coordinator(table_ctx_);
|
||||||
|
ObTableLoadTransId trans_id;
|
||||||
if (OB_FAIL(coordinator.init())) {
|
if (OB_FAIL(coordinator.init())) {
|
||||||
LOG_WARN("fail to init coordinator", KR(ret));
|
LOG_WARN("fail to init coordinator", KR(ret));
|
||||||
} else if (OB_FAIL(coordinator.begin())) {
|
} else if (OB_FAIL(coordinator.begin())) {
|
||||||
LOG_WARN("fail to coordinator begin", KR(ret));
|
LOG_WARN("fail to coordinator begin", KR(ret));
|
||||||
}
|
} else if (OB_FAIL(coordinator.start_trans(ObTableLoadSegmentID(1), trans_id))) {
|
||||||
// start trans
|
LOG_WARN("fail to start trans", KR(ret));
|
||||||
for (int64_t i = 1; OB_SUCC(ret) && i <= table_ctx_->param_.session_count_; ++i) {
|
} else {
|
||||||
ObTableLoadSegmentID segment_id(i);
|
client_task_->set_trans_id(trans_id);
|
||||||
ObTableLoadTransId trans_id;
|
|
||||||
if (OB_FAIL(coordinator.start_trans(segment_id, trans_id))) {
|
|
||||||
LOG_WARN("fail to start trans", KR(ret), K(i));
|
|
||||||
} else if (OB_FAIL(client_task_->add_trans_id(trans_id))) {
|
|
||||||
LOG_WARN("fail to add trans id", KR(ret), K(trans_id));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (OB_SUCC(ret)) {
|
|
||||||
if (OB_FAIL(client_task_->set_status_running())) {
|
if (OB_FAIL(client_task_->set_status_running())) {
|
||||||
LOG_WARN("fail to set status running", KR(ret));
|
LOG_WARN("fail to set status running", KR(ret));
|
||||||
}
|
}
|
||||||
@ -459,15 +452,14 @@ int ObTableDirectLoadInsertExecutor::process()
|
|||||||
LOG_WARN("fail to get table ctx", KR(ret));
|
LOG_WARN("fail to get table ctx", KR(ret));
|
||||||
} else {
|
} else {
|
||||||
ObTableLoadCoordinator coordinator(table_ctx);
|
ObTableLoadCoordinator coordinator(table_ctx);
|
||||||
ObTableLoadTransId trans_id;
|
const ObTableLoadTransId &trans_id = client_task->get_trans_id();
|
||||||
int64_t batch_id = client_task->get_next_batch_id();
|
const int64_t batch_id = client_task->get_next_batch_id();
|
||||||
|
const int32_t session_id = batch_id % table_ctx->param_.session_count_ + 1;
|
||||||
if (OB_FAIL(set_batch_seq_no(batch_id, obj_rows))) {
|
if (OB_FAIL(set_batch_seq_no(batch_id, obj_rows))) {
|
||||||
LOG_WARN("fail to set batch seq no", KR(ret));
|
LOG_WARN("fail to set batch seq no", KR(ret));
|
||||||
} else if (OB_FAIL(client_task->get_next_trans_id(trans_id))) {
|
|
||||||
LOG_WARN("fail to get next trans id", KR(ret));
|
|
||||||
} else if (OB_FAIL(coordinator.init())) {
|
} else if (OB_FAIL(coordinator.init())) {
|
||||||
LOG_WARN("fail to init coordinator", KR(ret));
|
LOG_WARN("fail to init coordinator", KR(ret));
|
||||||
} else if (OB_FAIL(coordinator.write(trans_id, obj_rows))) {
|
} else if (OB_FAIL(coordinator.write(trans_id, session_id, 0 /*seq_no*/, obj_rows))) {
|
||||||
LOG_WARN("fail to coordinator write", KR(ret));
|
LOG_WARN("fail to coordinator write", KR(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -106,16 +106,12 @@ private:
|
|||||||
if (OB_FAIL(client_task_->check_status(ObTableLoadClientStatus::COMMITTING))) {
|
if (OB_FAIL(client_task_->check_status(ObTableLoadClientStatus::COMMITTING))) {
|
||||||
LOG_WARN("fail to check status", KR(ret));
|
LOG_WARN("fail to check status", KR(ret));
|
||||||
} else {
|
} else {
|
||||||
const ObIArray<ObTableLoadTransId> &trans_ids = client_task_->get_trans_ids();
|
|
||||||
ObTableLoadCoordinator coordinator(table_ctx_);
|
ObTableLoadCoordinator coordinator(table_ctx_);
|
||||||
|
const ObTableLoadTransId &trans_id = client_task_->get_trans_id();
|
||||||
if (OB_FAIL(coordinator.init())) {
|
if (OB_FAIL(coordinator.init())) {
|
||||||
LOG_WARN("fail to init coordinator", KR(ret));
|
LOG_WARN("fail to init coordinator", KR(ret));
|
||||||
}
|
} else if (OB_FAIL(coordinator.finish_trans(trans_id))) {
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && i < trans_ids.count(); ++i) {
|
LOG_WARN("fail to coordinator finish trans", KR(ret), K(trans_id));
|
||||||
const ObTableLoadTransId &trans_id = trans_ids.at(i);
|
|
||||||
if (OB_FAIL(coordinator.finish_trans(trans_id))) {
|
|
||||||
LOG_WARN("fail to coordinator finish trans", KR(ret), K(i), K(trans_id));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
client_task_->set_status_error(ret);
|
client_task_->set_status_error(ret);
|
||||||
@ -126,7 +122,7 @@ private:
|
|||||||
int check_all_trans_commit()
|
int check_all_trans_commit()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
const ObIArray<ObTableLoadTransId> &trans_ids = client_task_->get_trans_ids();
|
const ObTableLoadTransId &trans_id = client_task_->get_trans_id();
|
||||||
while (OB_SUCC(ret)) {
|
while (OB_SUCC(ret)) {
|
||||||
if (OB_FAIL(client_task_->check_status(ObTableLoadClientStatus::COMMITTING))) {
|
if (OB_FAIL(client_task_->check_status(ObTableLoadClientStatus::COMMITTING))) {
|
||||||
LOG_WARN("fail to check status", KR(ret));
|
LOG_WARN("fail to check status", KR(ret));
|
||||||
@ -134,37 +130,33 @@ private:
|
|||||||
if (OB_FAIL(client_task_->get_exec_ctx()->check_status())) {
|
if (OB_FAIL(client_task_->get_exec_ctx()->check_status())) {
|
||||||
LOG_WARN("fail to check exec status", KR(ret));
|
LOG_WARN("fail to check exec status", KR(ret));
|
||||||
} else {
|
} else {
|
||||||
bool all_commit = true;
|
|
||||||
ObTableLoadCoordinator coordinator(table_ctx_);
|
ObTableLoadCoordinator coordinator(table_ctx_);
|
||||||
|
ObTableLoadTransStatusType trans_status = ObTableLoadTransStatusType::NONE;
|
||||||
|
int error_code = OB_SUCCESS;
|
||||||
|
bool try_again = false;
|
||||||
if (OB_FAIL(coordinator.init())) {
|
if (OB_FAIL(coordinator.init())) {
|
||||||
LOG_WARN("fail to init coordinator", KR(ret));
|
LOG_WARN("fail to init coordinator", KR(ret));
|
||||||
}
|
} else if (OB_FAIL(coordinator.get_trans_status(trans_id, trans_status, error_code))) {
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && i < trans_ids.count(); ++i) {
|
LOG_WARN("fail to coordinator get status", KR(ret), K(trans_id));
|
||||||
const ObTableLoadTransId &trans_id = trans_ids.at(i);
|
} else {
|
||||||
ObTableLoadTransStatusType trans_status = ObTableLoadTransStatusType::NONE;
|
switch (trans_status) {
|
||||||
int error_code = OB_SUCCESS;
|
case ObTableLoadTransStatusType::FROZEN:
|
||||||
if (OB_FAIL(coordinator.get_trans_status(trans_id, trans_status, error_code))) {
|
try_again = true;
|
||||||
LOG_WARN("fail to coordinator get status", KR(ret), K(i), K(trans_id));
|
break;
|
||||||
} else {
|
case ObTableLoadTransStatusType::COMMIT:
|
||||||
switch (trans_status) {
|
break;
|
||||||
case ObTableLoadTransStatusType::FROZEN:
|
case ObTableLoadTransStatusType::ERROR:
|
||||||
all_commit = false;
|
ret = error_code;
|
||||||
break;
|
LOG_WARN("trans has error", KR(ret));
|
||||||
case ObTableLoadTransStatusType::COMMIT:
|
break;
|
||||||
break;
|
default:
|
||||||
case ObTableLoadTransStatusType::ERROR:
|
ret = OB_ERR_UNEXPECTED;
|
||||||
ret = error_code;
|
LOG_WARN("unexpected trans status", KR(ret), K(trans_status));
|
||||||
LOG_WARN("trans has error", KR(ret));
|
break;
|
||||||
break;
|
|
||||||
default:
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_WARN("unexpected trans status", KR(ret), K(trans_status));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
if (all_commit) {
|
if (!try_again) {
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
ob_usleep(1000 * 1000);
|
ob_usleep(1000 * 1000);
|
||||||
|
|||||||
@ -37,7 +37,6 @@ ObTableLoadClientTask::ObTableLoadClientTask()
|
|||||||
session_info_(nullptr),
|
session_info_(nullptr),
|
||||||
exec_ctx_(nullptr),
|
exec_ctx_(nullptr),
|
||||||
task_scheduler_(nullptr),
|
task_scheduler_(nullptr),
|
||||||
next_trans_idx_(0),
|
|
||||||
next_batch_id_(0),
|
next_batch_id_(0),
|
||||||
table_ctx_(nullptr),
|
table_ctx_(nullptr),
|
||||||
client_status_(ObTableLoadClientStatus::MAX_STATUS),
|
client_status_(ObTableLoadClientStatus::MAX_STATUS),
|
||||||
@ -262,28 +261,6 @@ int ObTableLoadClientTask::get_table_ctx(ObTableLoadTableCtx *&table_ctx)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTableLoadClientTask::add_trans_id(const ObTableLoadTransId &trans_id)
|
|
||||||
{
|
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
if (OB_FAIL(trans_ids_.push_back(trans_id))) {
|
|
||||||
LOG_WARN("fail to push back trans id", KR(ret), K(trans_id));
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
int ObTableLoadClientTask::get_next_trans_id(ObTableLoadTransId &trans_id)
|
|
||||||
{
|
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
if (OB_UNLIKELY(trans_ids_.empty())) {
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_WARN("unexpected empty trans id", KR(ret));
|
|
||||||
} else {
|
|
||||||
const int64_t trans_idx = ATOMIC_FAA(&next_trans_idx_, 1) % trans_ids_.count();
|
|
||||||
trans_id = trans_ids_.at(trans_idx);
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
int ObTableLoadClientTask::set_status_running()
|
int ObTableLoadClientTask::set_status_running()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|||||||
@ -42,13 +42,9 @@ public:
|
|||||||
int get_table_ctx(ObTableLoadTableCtx *&table_ctx);
|
int get_table_ctx(ObTableLoadTableCtx *&table_ctx);
|
||||||
OB_INLINE sql::ObSQLSessionInfo *get_session_info() { return session_info_; }
|
OB_INLINE sql::ObSQLSessionInfo *get_session_info() { return session_info_; }
|
||||||
OB_INLINE ObTableLoadClientExecCtx *get_exec_ctx() { return exec_ctx_; }
|
OB_INLINE ObTableLoadClientExecCtx *get_exec_ctx() { return exec_ctx_; }
|
||||||
int add_trans_id(const table::ObTableLoadTransId &trans_id);
|
OB_INLINE void set_trans_id(const table::ObTableLoadTransId &trans_id) { trans_id_ = trans_id; }
|
||||||
int get_next_trans_id(table::ObTableLoadTransId &trans_id);
|
OB_INLINE const table::ObTableLoadTransId &get_trans_id() const { return trans_id_; }
|
||||||
int64_t get_next_batch_id() { return ATOMIC_FAA(&next_batch_id_, 1); }
|
int64_t get_next_batch_id() { return ATOMIC_FAA(&next_batch_id_, 1); }
|
||||||
OB_INLINE const common::ObIArray<table::ObTableLoadTransId> &get_trans_ids() const
|
|
||||||
{
|
|
||||||
return trans_ids_;
|
|
||||||
}
|
|
||||||
int set_status_running();
|
int set_status_running();
|
||||||
int set_status_committing();
|
int set_status_committing();
|
||||||
int set_status_commit();
|
int set_status_commit();
|
||||||
@ -62,13 +58,12 @@ public:
|
|||||||
int add_task(ObTableLoadTask *task);
|
int add_task(ObTableLoadTask *task);
|
||||||
TO_STRING_KV(K_(tenant_id), K_(user_id), K_(database_id), K_(table_id), K_(ddl_param),
|
TO_STRING_KV(K_(tenant_id), K_(user_id), K_(database_id), K_(table_id), K_(ddl_param),
|
||||||
K_(column_names), K_(column_idxs), K_(result_info), KP_(session_info),
|
K_(column_names), K_(column_idxs), K_(result_info), KP_(session_info),
|
||||||
K_(free_session_ctx), KP_(exec_ctx), KP_(task_scheduler), K_(trans_ids),
|
K_(free_session_ctx), KP_(exec_ctx), KP_(task_scheduler), K_(trans_id),
|
||||||
K_(next_trans_idx), KP_(table_ctx), K_(client_status), K_(error_code),
|
KP_(table_ctx), K_(client_status), K_(error_code), K_(ref_count));
|
||||||
K_(ref_count));
|
|
||||||
private:
|
private:
|
||||||
int create_session_info(uint64_t user_id, uint64_t database_id, uint64_t table_id,
|
int create_session_info(uint64_t user_id, uint64_t database_id, uint64_t table_id,
|
||||||
sql::ObSQLSessionInfo *&session_info,
|
sql::ObSQLSessionInfo *&session_info,
|
||||||
sql::ObFreeSessionCtx &free_session_ctx);
|
sql::ObFreeSessionCtx &free_session_ctx);
|
||||||
int init_column_names_and_idxs();
|
int init_column_names_and_idxs();
|
||||||
int init_exec_ctx(int64_t timeout_us, int64_t heartbeat_timeout_us);
|
int init_exec_ctx(int64_t timeout_us, int64_t heartbeat_timeout_us);
|
||||||
public:
|
public:
|
||||||
@ -87,8 +82,7 @@ private:
|
|||||||
ObTableLoadClientExecCtx *exec_ctx_;
|
ObTableLoadClientExecCtx *exec_ctx_;
|
||||||
ObTableLoadObjectAllocator<ObTableLoadTask> task_allocator_;
|
ObTableLoadObjectAllocator<ObTableLoadTask> task_allocator_;
|
||||||
ObITableLoadTaskScheduler *task_scheduler_;
|
ObITableLoadTaskScheduler *task_scheduler_;
|
||||||
common::ObArray<table::ObTableLoadTransId> trans_ids_;
|
table::ObTableLoadTransId trans_id_;
|
||||||
int64_t next_trans_idx_;
|
|
||||||
int64_t next_batch_id_ CACHE_ALIGNED;
|
int64_t next_batch_id_ CACHE_ALIGNED;
|
||||||
mutable obsys::ObRWLock rw_lock_;
|
mutable obsys::ObRWLock rw_lock_;
|
||||||
ObTableLoadTableCtx *table_ctx_;
|
ObTableLoadTableCtx *table_ctx_;
|
||||||
|
|||||||
Reference in New Issue
Block a user