fix direct-load thread hang when unit deleting

This commit is contained in:
medcll 2025-01-02 11:18:40 +00:00 committed by ob-robot
parent e0e8ab0dc5
commit 77a3b2e596
4 changed files with 5 additions and 14 deletions

View File

@ -154,8 +154,7 @@ bool ObTableLoadService::ObGCTask::gc_heart_beat_expired_ctx(ObTableLoadTableCtx
is_removed = true;
}
// check if heart beat expired, ignore coordinator
else if (nullptr == table_ctx->coordinator_ctx_ && nullptr != table_ctx->store_ctx_ &&
table_ctx->store_ctx_->enable_heart_beat_check()) {
else if (nullptr == table_ctx->coordinator_ctx_ && nullptr != table_ctx->store_ctx_) {
if (OB_UNLIKELY(
table_ctx->store_ctx_->check_heart_beat_expired(HEART_BEEAT_EXPIRED_TIME_US))) {
FLOG_INFO("store heart beat expired, abort", K(table_id), K(task_id), K(dest_table_id));
@ -959,6 +958,7 @@ void ObTableLoadService::fail_all_ctx(int error_code)
{
int ret = OB_SUCCESS;
ObArray<ObTableLoadTableCtx *> table_ctx_array;
bool is_stopped = false;
table_ctx_array.set_tenant_id(MTL_ID());
if (OB_FAIL(manager_.get_all_table_ctx(table_ctx_array))) {
LOG_WARN("fail to get all table ctx list", KR(ret));
@ -968,10 +968,12 @@ void ObTableLoadService::fail_all_ctx(int error_code)
// fail coordinator
if (nullptr != table_ctx->coordinator_ctx_) {
table_ctx->coordinator_ctx_->set_status_error(error_code);
ObTableLoadStore::abort_ctx(table_ctx, is_stopped);
}
// fail store
if (nullptr != table_ctx->store_ctx_) {
table_ctx->store_ctx_->set_status_error(error_code);
ObTableLoadStore::abort_ctx(table_ctx, is_stopped);
}
manager_.revert_table_ctx(table_ctx);
}

View File

@ -101,9 +101,7 @@ void ObTableLoadStore::abort_ctx(ObTableLoadTableCtx *ctx, bool &is_stopped)
if (OB_SUCCESS != (tmp_ret = ctx->store_ctx_->set_status_abort())) {
LOG_WARN("fail to set store status abort", KR(tmp_ret));
}
// 2. disable heart beat check
ctx->store_ctx_->set_enable_heart_beat_check(false);
// 3. mark all active trans abort
// 2. mark all active trans abort
if (OB_SUCCESS != (tmp_ret = abort_active_trans(ctx))) {
LOG_WARN("fail to abort active trans", KR(tmp_ret));
}
@ -191,7 +189,6 @@ int ObTableLoadStore::confirm_begin()
} else {
LOG_INFO("store confirm begin");
store_ctx_->heart_beat(); // init heart beat
store_ctx_->set_enable_heart_beat_check(true);
if (ObDirectLoadMethod::is_incremental(param_.method_)) {
if (OB_FAIL(open_insert_table_ctx())) {
LOG_WARN("fail to open insert_table_ctx", KR(ret));
@ -505,7 +502,6 @@ int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info,
}
ctx_->reset_assigned_memory();
}
store_ctx_->set_enable_heart_beat_check(false);
result_info = store_ctx_->result_info_;
}
}

View File

@ -64,7 +64,6 @@ ObTableLoadStoreCtx::ObTableLoadStoreCtx(ObTableLoadTableCtx *ctx)
status_(ObTableLoadStatusType::NONE),
error_code_(OB_SUCCESS),
last_heart_beat_ts_(0),
enable_heart_beat_check_(false),
is_inited_(false)
{
allocator_.set_tenant_id(MTL_ID());

View File

@ -105,11 +105,6 @@ public:
int set_status_error(int error_code);
int set_status_abort();
int check_status(table::ObTableLoadStatusType status) const;
OB_INLINE bool enable_heart_beat_check() const { return enable_heart_beat_check_; }
OB_INLINE void set_enable_heart_beat_check(bool enable_heart_beat_check)
{
enable_heart_beat_check_ = enable_heart_beat_check;
}
void heart_beat();
bool check_heart_beat_expired(const uint64_t expired_time_us);
private:
@ -203,7 +198,6 @@ private:
SegmentCtxMap segment_ctx_map_;
common::ObArray<ObTableLoadTransStore *> committed_trans_store_array_;
uint64_t last_heart_beat_ts_;
bool enable_heart_beat_check_;
bool is_inited_;
};