fix incremental direct load ctx leak
This commit is contained in:
		@ -313,6 +313,7 @@ int ObDirectLoadControlAbortExecutor::process()
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
    ObTableLoadStore::abort_ctx(table_ctx, res_.is_stopped_);
 | 
					    ObTableLoadStore::abort_ctx(table_ctx, res_.is_stopped_);
 | 
				
			||||||
 | 
					    table_ctx->mark_delete();
 | 
				
			||||||
    if (res_.is_stopped_ && OB_FAIL(ObTableLoadService::remove_ctx(table_ctx))) {
 | 
					    if (res_.is_stopped_ && OB_FAIL(ObTableLoadService::remove_ctx(table_ctx))) {
 | 
				
			||||||
      LOG_WARN("fail to remove table ctx", KR(ret), K(key));
 | 
					      LOG_WARN("fail to remove table ctx", KR(ret), K(key));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
				
			|||||||
@ -150,7 +150,8 @@ void ObTableLoadService::ObGCTask::runTimerTask()
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
    for (int64_t i = 0; i < table_ctx_array.count(); ++i) {
 | 
					    for (int64_t i = 0; i < table_ctx_array.count(); ++i) {
 | 
				
			||||||
      ObTableLoadTableCtx *table_ctx = table_ctx_array.at(i);
 | 
					      ObTableLoadTableCtx *table_ctx = table_ctx_array.at(i);
 | 
				
			||||||
      if (gc_heart_beat_expired_ctx(table_ctx)) {
 | 
					      if (gc_mark_delete(table_ctx)) {
 | 
				
			||||||
 | 
					      } else if (gc_heart_beat_expired_ctx(table_ctx)) {
 | 
				
			||||||
      } else if (gc_table_not_exist_ctx(table_ctx)) {
 | 
					      } else if (gc_table_not_exist_ctx(table_ctx)) {
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
      manager.put_table_ctx(table_ctx);
 | 
					      manager.put_table_ctx(table_ctx);
 | 
				
			||||||
@ -158,6 +159,35 @@ void ObTableLoadService::ObGCTask::runTimerTask()
 | 
				
			|||||||
  }
 | 
					  }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					bool ObTableLoadService::ObGCTask::gc_mark_delete(ObTableLoadTableCtx *table_ctx)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
 | 
					  bool is_removed = false;
 | 
				
			||||||
 | 
					  if (OB_ISNULL(table_ctx)) {
 | 
				
			||||||
 | 
					    ret = OB_ERR_UNEXPECTED;
 | 
				
			||||||
 | 
					    LOG_WARN("unexpected table ctx is null", KR(ret));
 | 
				
			||||||
 | 
					    is_removed = true;
 | 
				
			||||||
 | 
					  } else {
 | 
				
			||||||
 | 
					    const uint64_t table_id = table_ctx->param_.table_id_;
 | 
				
			||||||
 | 
					    const int64_t task_id = table_ctx->ddl_param_.task_id_;
 | 
				
			||||||
 | 
					    const uint64_t dest_table_id = table_ctx->ddl_param_.dest_table_id_;
 | 
				
			||||||
 | 
					    // check if table ctx is removed
 | 
				
			||||||
 | 
					    if (table_ctx->is_dirty()) {
 | 
				
			||||||
 | 
					      LOG_DEBUG("table load ctx is dirty", K(tenant_id_), K(table_id), K(task_id), K(dest_table_id),
 | 
				
			||||||
 | 
					                "ref_count", table_ctx->get_ref_count());
 | 
				
			||||||
 | 
					      is_removed = true;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    // check is mark delete
 | 
				
			||||||
 | 
					    else if (table_ctx->is_mark_delete()) {
 | 
				
			||||||
 | 
					      if (table_ctx->is_stopped() && OB_FAIL(ObTableLoadService::remove_ctx(table_ctx))) {
 | 
				
			||||||
 | 
					        LOG_WARN("fail to remove table ctx", KR(ret), K(tenant_id_), K(table_id), K(task_id), K(dest_table_id));
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					      is_removed = true; // skip other gc
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					  return is_removed;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
bool ObTableLoadService::ObGCTask::gc_heart_beat_expired_ctx(ObTableLoadTableCtx *table_ctx)
 | 
					bool ObTableLoadService::ObGCTask::gc_heart_beat_expired_ctx(ObTableLoadTableCtx *table_ctx)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
@ -168,22 +198,27 @@ bool ObTableLoadService::ObGCTask::gc_heart_beat_expired_ctx(ObTableLoadTableCtx
 | 
				
			|||||||
    is_removed = true;
 | 
					    is_removed = true;
 | 
				
			||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
    const uint64_t table_id = table_ctx->param_.table_id_;
 | 
					    const uint64_t table_id = table_ctx->param_.table_id_;
 | 
				
			||||||
    const uint64_t hidden_table_id = table_ctx->ddl_param_.dest_table_id_;
 | 
					    const int64_t task_id = table_ctx->ddl_param_.task_id_;
 | 
				
			||||||
 | 
					    const uint64_t dest_table_id = table_ctx->ddl_param_.dest_table_id_;
 | 
				
			||||||
    // check if table ctx is removed
 | 
					    // check if table ctx is removed
 | 
				
			||||||
    if (table_ctx->is_dirty()) {
 | 
					    if (table_ctx->is_dirty()) {
 | 
				
			||||||
      LOG_DEBUG("table load ctx is dirty", K(tenant_id_), K(table_id), "ref_count",
 | 
					      LOG_DEBUG("table load ctx is dirty", K(tenant_id_), K(table_id), K(task_id), K(dest_table_id),
 | 
				
			||||||
                table_ctx->get_ref_count());
 | 
					                "ref_count", table_ctx->get_ref_count());
 | 
				
			||||||
      is_removed = true;
 | 
					      is_removed = true;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    // check if heart beat expired
 | 
					    // check if heart beat expired, ignore coordinator
 | 
				
			||||||
    else if (nullptr != table_ctx->store_ctx_ && table_ctx->store_ctx_->enable_heart_beat_check()) {
 | 
					    else if (nullptr == table_ctx->coordinator_ctx_ &&
 | 
				
			||||||
 | 
					             nullptr != table_ctx->store_ctx_ &&
 | 
				
			||||||
 | 
					             table_ctx->store_ctx_->enable_heart_beat_check()) {
 | 
				
			||||||
      if (OB_UNLIKELY(
 | 
					      if (OB_UNLIKELY(
 | 
				
			||||||
            table_ctx->store_ctx_->check_heart_beat_expired(HEART_BEEAT_EXPIRED_TIME_US))) {
 | 
					            table_ctx->store_ctx_->check_heart_beat_expired(HEART_BEEAT_EXPIRED_TIME_US))) {
 | 
				
			||||||
        LOG_INFO("store heart beat expired, abort", K(tenant_id_), K(table_id), K(hidden_table_id));
 | 
					        FLOG_INFO("store heart beat expired, abort", K(tenant_id_), K(table_id), K(task_id), K(dest_table_id));
 | 
				
			||||||
        bool is_stopped = false;
 | 
					        bool is_stopped = false;
 | 
				
			||||||
        ObTableLoadStore::abort_ctx(table_ctx, is_stopped);
 | 
					        ObTableLoadStore::abort_ctx(table_ctx, is_stopped);
 | 
				
			||||||
        // 先不移除, 防止心跳超时后, 网络恢复, 控制节点查不到table_ctx, 直接认为已经停止
 | 
					        table_ctx->mark_delete();
 | 
				
			||||||
        // 如果网络一直不恢复, 也可以通过table不存在来gc此table_ctx
 | 
					        if (is_stopped && OB_FAIL(ObTableLoadService::remove_ctx(table_ctx))) {
 | 
				
			||||||
 | 
					          LOG_WARN("fail to remove table ctx", KR(ret), K(tenant_id_), K(table_id), K(task_id), K(dest_table_id));
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
        is_removed = true; // skip other gc
 | 
					        is_removed = true; // skip other gc
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
				
			|||||||
@ -122,6 +122,7 @@ private:
 | 
				
			|||||||
    int init(uint64_t tenant_id);
 | 
					    int init(uint64_t tenant_id);
 | 
				
			||||||
    void runTimerTask() override;
 | 
					    void runTimerTask() override;
 | 
				
			||||||
  private:
 | 
					  private:
 | 
				
			||||||
 | 
					    bool gc_mark_delete(ObTableLoadTableCtx *table_ctx);
 | 
				
			||||||
    bool gc_heart_beat_expired_ctx(ObTableLoadTableCtx *table_ctx);
 | 
					    bool gc_heart_beat_expired_ctx(ObTableLoadTableCtx *table_ctx);
 | 
				
			||||||
    bool gc_table_not_exist_ctx(ObTableLoadTableCtx *table_ctx);
 | 
					    bool gc_table_not_exist_ctx(ObTableLoadTableCtx *table_ctx);
 | 
				
			||||||
  private:
 | 
					  private:
 | 
				
			||||||
 | 
				
			|||||||
@ -40,6 +40,7 @@ ObTableLoadTableCtx::ObTableLoadTableCtx()
 | 
				
			|||||||
    ref_count_(0),
 | 
					    ref_count_(0),
 | 
				
			||||||
    is_assigned_resource_(false),
 | 
					    is_assigned_resource_(false),
 | 
				
			||||||
    is_assigned_memory_(false),
 | 
					    is_assigned_memory_(false),
 | 
				
			||||||
 | 
					    mark_delete_(false),
 | 
				
			||||||
    is_dirty_(false),
 | 
					    is_dirty_(false),
 | 
				
			||||||
    is_inited_(false)
 | 
					    is_inited_(false)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
@ -317,5 +318,17 @@ void ObTableLoadTableCtx::free_trans_ctx(ObTableLoadTransCtx *trans_ctx)
 | 
				
			|||||||
  }
 | 
					  }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					bool ObTableLoadTableCtx::is_stopped() const
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  bool bret = true;
 | 
				
			||||||
 | 
					  if (nullptr != coordinator_ctx_ && !coordinator_ctx_->task_scheduler_->is_stopped()) {
 | 
				
			||||||
 | 
					    bret = false;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					  if (nullptr != store_ctx_ && !store_ctx_->task_scheduler_->is_stopped()) {
 | 
				
			||||||
 | 
					    bret = false;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					  return bret;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
}  // namespace observer
 | 
					}  // namespace observer
 | 
				
			||||||
}  // namespace oceanbase
 | 
					}  // namespace oceanbase
 | 
				
			||||||
 | 
				
			|||||||
@ -51,8 +51,18 @@ public:
 | 
				
			|||||||
  void set_assigned_memory() { is_assigned_memory_ = true; }
 | 
					  void set_assigned_memory() { is_assigned_memory_ = true; }
 | 
				
			||||||
  bool is_dirty() const { return is_dirty_; }
 | 
					  bool is_dirty() const { return is_dirty_; }
 | 
				
			||||||
  void set_dirty() { is_dirty_ = true; }
 | 
					  void set_dirty() { is_dirty_ = true; }
 | 
				
			||||||
  TO_STRING_KV(K_(param), KP_(coordinator_ctx), KP_(store_ctx), "ref_count", get_ref_count(),
 | 
					  bool is_mark_delete() const { return mark_delete_; }
 | 
				
			||||||
               K_(is_assigned_resource), K_(is_assigned_memory), K_(is_dirty), K_(is_inited));
 | 
					  void mark_delete() { mark_delete_ = true; }
 | 
				
			||||||
 | 
					  bool is_stopped() const;
 | 
				
			||||||
 | 
					  TO_STRING_KV(K_(param),
 | 
				
			||||||
 | 
					               KP_(coordinator_ctx),
 | 
				
			||||||
 | 
					               KP_(store_ctx),
 | 
				
			||||||
 | 
					               "ref_count", get_ref_count(),
 | 
				
			||||||
 | 
					               K_(is_assigned_resource),
 | 
				
			||||||
 | 
					               K_(is_assigned_memory),
 | 
				
			||||||
 | 
					               K_(mark_delete),
 | 
				
			||||||
 | 
					               K_(is_dirty),
 | 
				
			||||||
 | 
					               K_(is_inited));
 | 
				
			||||||
public:
 | 
					public:
 | 
				
			||||||
  int init_coordinator_ctx(const common::ObIArray<int64_t> &idx_array,
 | 
					  int init_coordinator_ctx(const common::ObIArray<int64_t> &idx_array,
 | 
				
			||||||
                           ObTableLoadExecCtx *exec_ctx);
 | 
					                           ObTableLoadExecCtx *exec_ctx);
 | 
				
			||||||
@ -85,6 +95,7 @@ private:
 | 
				
			|||||||
  int64_t ref_count_ CACHE_ALIGNED;
 | 
					  int64_t ref_count_ CACHE_ALIGNED;
 | 
				
			||||||
  bool is_assigned_resource_;
 | 
					  bool is_assigned_resource_;
 | 
				
			||||||
  bool is_assigned_memory_;
 | 
					  bool is_assigned_memory_;
 | 
				
			||||||
 | 
					  bool mark_delete_;
 | 
				
			||||||
  volatile bool is_dirty_;
 | 
					  volatile bool is_dirty_;
 | 
				
			||||||
  bool is_inited_;
 | 
					  bool is_inited_;
 | 
				
			||||||
  DISALLOW_COPY_AND_ASSIGN(ObTableLoadTableCtx);
 | 
					  DISALLOW_COPY_AND_ASSIGN(ObTableLoadTableCtx);
 | 
				
			||||||
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user