diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp b/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp index eb590d58f..b1d9a6418 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp +++ b/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp @@ -313,6 +313,7 @@ int ObDirectLoadControlAbortExecutor::process() } } else { ObTableLoadStore::abort_ctx(table_ctx, res_.is_stopped_); + table_ctx->mark_delete(); if (res_.is_stopped_ && OB_FAIL(ObTableLoadService::remove_ctx(table_ctx))) { LOG_WARN("fail to remove table ctx", KR(ret), K(key)); } diff --git a/src/observer/table_load/ob_table_load_service.cpp b/src/observer/table_load/ob_table_load_service.cpp index 3542459de..459eb1880 100644 --- a/src/observer/table_load/ob_table_load_service.cpp +++ b/src/observer/table_load/ob_table_load_service.cpp @@ -150,7 +150,8 @@ void ObTableLoadService::ObGCTask::runTimerTask() } for (int64_t i = 0; i < table_ctx_array.count(); ++i) { ObTableLoadTableCtx *table_ctx = table_ctx_array.at(i); - if (gc_heart_beat_expired_ctx(table_ctx)) { + if (gc_mark_delete(table_ctx)) { + } else if (gc_heart_beat_expired_ctx(table_ctx)) { } else if (gc_table_not_exist_ctx(table_ctx)) { } manager.put_table_ctx(table_ctx); @@ -158,6 +159,35 @@ void ObTableLoadService::ObGCTask::runTimerTask() } } +bool ObTableLoadService::ObGCTask::gc_mark_delete(ObTableLoadTableCtx *table_ctx) +{ + int ret = OB_SUCCESS; + bool is_removed = false; + if (OB_ISNULL(table_ctx)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected table ctx is null", KR(ret)); + is_removed = true; + } else { + const uint64_t table_id = table_ctx->param_.table_id_; + const int64_t task_id = table_ctx->ddl_param_.task_id_; + const uint64_t dest_table_id = table_ctx->ddl_param_.dest_table_id_; + // check if table ctx is removed + if (table_ctx->is_dirty()) { + LOG_DEBUG("table load ctx is dirty", K(tenant_id_), K(table_id), K(task_id), K(dest_table_id), + "ref_count", table_ctx->get_ref_count()); + is_removed = true; + } + // check is mark delete + else if (table_ctx->is_mark_delete()) { + if (table_ctx->is_stopped() && OB_FAIL(ObTableLoadService::remove_ctx(table_ctx))) { + LOG_WARN("fail to remove table ctx", KR(ret), K(tenant_id_), K(table_id), K(task_id), K(dest_table_id)); + } + is_removed = true; // skip other gc + } + } + return is_removed; +} + bool ObTableLoadService::ObGCTask::gc_heart_beat_expired_ctx(ObTableLoadTableCtx *table_ctx) { int ret = OB_SUCCESS; @@ -168,22 +198,27 @@ bool ObTableLoadService::ObGCTask::gc_heart_beat_expired_ctx(ObTableLoadTableCtx is_removed = true; } else { const uint64_t table_id = table_ctx->param_.table_id_; - const uint64_t hidden_table_id = table_ctx->ddl_param_.dest_table_id_; + const int64_t task_id = table_ctx->ddl_param_.task_id_; + const uint64_t dest_table_id = table_ctx->ddl_param_.dest_table_id_; // check if table ctx is removed if (table_ctx->is_dirty()) { - LOG_DEBUG("table load ctx is dirty", K(tenant_id_), K(table_id), "ref_count", - table_ctx->get_ref_count()); + LOG_DEBUG("table load ctx is dirty", K(tenant_id_), K(table_id), K(task_id), K(dest_table_id), + "ref_count", table_ctx->get_ref_count()); is_removed = true; } - // check if heart beat expired - else if (nullptr != table_ctx->store_ctx_ && table_ctx->store_ctx_->enable_heart_beat_check()) { + // check if heart beat expired, ignore coordinator + else if (nullptr == table_ctx->coordinator_ctx_ && + nullptr != table_ctx->store_ctx_ && + table_ctx->store_ctx_->enable_heart_beat_check()) { if (OB_UNLIKELY( table_ctx->store_ctx_->check_heart_beat_expired(HEART_BEEAT_EXPIRED_TIME_US))) { - LOG_INFO("store heart beat expired, abort", K(tenant_id_), K(table_id), K(hidden_table_id)); + FLOG_INFO("store heart beat expired, abort", K(tenant_id_), K(table_id), K(task_id), K(dest_table_id)); bool is_stopped = false; ObTableLoadStore::abort_ctx(table_ctx, is_stopped); - // 先不移除, 防止心跳超时后, 网络恢复, 控制节点查不到table_ctx, 直接认为已经停止 - // 如果网络一直不恢复, 也可以通过table不存在来gc此table_ctx + table_ctx->mark_delete(); + if (is_stopped && OB_FAIL(ObTableLoadService::remove_ctx(table_ctx))) { + LOG_WARN("fail to remove table ctx", KR(ret), K(tenant_id_), K(table_id), K(task_id), K(dest_table_id)); + } is_removed = true; // skip other gc } } diff --git a/src/observer/table_load/ob_table_load_service.h b/src/observer/table_load/ob_table_load_service.h index ee0ca6622..db5687abc 100644 --- a/src/observer/table_load/ob_table_load_service.h +++ b/src/observer/table_load/ob_table_load_service.h @@ -122,6 +122,7 @@ private: int init(uint64_t tenant_id); void runTimerTask() override; private: + bool gc_mark_delete(ObTableLoadTableCtx *table_ctx); bool gc_heart_beat_expired_ctx(ObTableLoadTableCtx *table_ctx); bool gc_table_not_exist_ctx(ObTableLoadTableCtx *table_ctx); private: diff --git a/src/observer/table_load/ob_table_load_table_ctx.cpp b/src/observer/table_load/ob_table_load_table_ctx.cpp index 5c0ec6de6..3f399746e 100644 --- a/src/observer/table_load/ob_table_load_table_ctx.cpp +++ b/src/observer/table_load/ob_table_load_table_ctx.cpp @@ -40,6 +40,7 @@ ObTableLoadTableCtx::ObTableLoadTableCtx() ref_count_(0), is_assigned_resource_(false), is_assigned_memory_(false), + mark_delete_(false), is_dirty_(false), is_inited_(false) { @@ -317,5 +318,17 @@ void ObTableLoadTableCtx::free_trans_ctx(ObTableLoadTransCtx *trans_ctx) } } +bool ObTableLoadTableCtx::is_stopped() const +{ + bool bret = true; + if (nullptr != coordinator_ctx_ && !coordinator_ctx_->task_scheduler_->is_stopped()) { + bret = false; + } + if (nullptr != store_ctx_ && !store_ctx_->task_scheduler_->is_stopped()) { + bret = false; + } + return bret; +} + } // namespace observer } // namespace oceanbase diff --git a/src/observer/table_load/ob_table_load_table_ctx.h b/src/observer/table_load/ob_table_load_table_ctx.h index 62fa66fea..dc7567116 100644 --- a/src/observer/table_load/ob_table_load_table_ctx.h +++ b/src/observer/table_load/ob_table_load_table_ctx.h @@ -51,8 +51,18 @@ public: void set_assigned_memory() { is_assigned_memory_ = true; } bool is_dirty() const { return is_dirty_; } void set_dirty() { is_dirty_ = true; } - TO_STRING_KV(K_(param), KP_(coordinator_ctx), KP_(store_ctx), "ref_count", get_ref_count(), - K_(is_assigned_resource), K_(is_assigned_memory), K_(is_dirty), K_(is_inited)); + bool is_mark_delete() const { return mark_delete_; } + void mark_delete() { mark_delete_ = true; } + bool is_stopped() const; + TO_STRING_KV(K_(param), + KP_(coordinator_ctx), + KP_(store_ctx), + "ref_count", get_ref_count(), + K_(is_assigned_resource), + K_(is_assigned_memory), + K_(mark_delete), + K_(is_dirty), + K_(is_inited)); public: int init_coordinator_ctx(const common::ObIArray &idx_array, ObTableLoadExecCtx *exec_ctx); @@ -85,6 +95,7 @@ private: int64_t ref_count_ CACHE_ALIGNED; bool is_assigned_resource_; bool is_assigned_memory_; + bool mark_delete_; volatile bool is_dirty_; bool is_inited_; DISALLOW_COPY_AND_ASSIGN(ObTableLoadTableCtx);