fix distinguish ha dag/dag_net

This commit is contained in:
a1iive
2023-05-23 09:17:24 +00:00
committed by ob-robot
parent 5d1467efcc
commit 2bc0002996
7 changed files with 40 additions and 46 deletions

View File

@ -1666,7 +1666,7 @@ void ObTenantDagScheduler::destroy()
}
if (dag_net_map_[RUNNING_DAG_NET_MAP].created()) {
for (DagNetMap::iterator iter = dag_net_map_[RUNNING_DAG_NET_MAP].begin(); iter != dag_net_map_[RUNNING_DAG_NET_MAP].end(); ++iter) {
const bool ha_dag_net = is_ha_dag_net(iter->second->get_type());
const bool ha_dag_net = iter->second->is_ha_dag_net();
iter->second->~ObIDagNet();
if (ha_dag_net) {
ha_allocator_.free(iter->second);
@ -1733,7 +1733,7 @@ void ObTenantDagScheduler::inner_free_dag(ObIDag &dag)
if (OB_UNLIKELY(nullptr != dag.prev_ || nullptr != dag.next_)) {
LOG_ERROR_RET(OB_ERR_UNEXPECTED, "dag is in dag_list", K(dag), K(dag.prev_), K(dag.next_));
}
const bool ha_dag = is_ha_dag(dag.get_type());
const bool ha_dag = dag.is_ha_dag();
dag.~ObIDag();
if (ha_dag) {
ha_allocator_.free(&dag);

View File

@ -336,6 +336,8 @@ public:
virtual int generate_next_dag(ObIDag *&next_dag) { UNUSED(next_dag); return common::OB_ITER_END; }
virtual int set_result(const int32_t result) { UNUSED(result); return common::OB_SUCCESS; }
virtual bool is_ha_dag() const { return false; }
DECLARE_VIRTUAL_TO_STRING;
DISABLE_COPY_ASSIGN(ObIDag);
public:
@ -473,6 +475,7 @@ public:
{
return OB_SUCCESS;
}
virtual bool is_ha_dag_net() const { return true; }
public:
friend class ObTenantDagScheduler;
@ -822,17 +825,6 @@ public:
int cancel_dag_net(const ObDagId &dag_id);
int get_complement_data_dag_progress(const ObIDag *dag, int64_t &row_scanned, int64_t &row_inserted);
OB_INLINE bool is_ha_dag(ObDagType::ObDagTypeEnum type) const
{
return ObDagType::DAG_TYPE_MIGRATE <= type &&
ObDagType::DAG_TYPE_REMOVE_MEMBER >= type;
}
OB_INLINE bool is_ha_dag_net(ObDagNetType::ObDagNetTypeEnum type) const
{
return ObDagNetType::DAG_NET_TYPE_MIGARTION <= type &&
ObDagNetType::DAG_NET_TYPE_BACKUP_CLEAN >= type;
}
private:
typedef common::ObDList<ObIDag> DagList;
typedef common::ObDList<ObIDagNet> DagNetList;
@ -1006,7 +998,7 @@ int ObTenantDagScheduler::alloc_dag(T *&dag)
COMMON_LOG(WARN, "Dag Object is too large", K(ret), K(sizeof(T)));
} else {
T tmp_dag;
common::ObFIFOAllocator *allocator = is_ha_dag(tmp_dag.get_type()) ? &ha_allocator_ : &allocator_;
common::ObFIFOAllocator *allocator = tmp_dag.is_ha_dag() ? &ha_allocator_ : &allocator_;
if (NULL == (buf = allocator->alloc(sizeof(T)))) {
ret = common::OB_ALLOCATE_MEMORY_FAILED;
COMMON_LOG(WARN, "failed to alloc dag", K(ret));
@ -1030,7 +1022,7 @@ template<typename T>
void ObTenantDagScheduler::free_dag_net(T *&dag_net)
{
if (OB_NOT_NULL(dag_net)) {
const bool ha_dag_net = is_ha_dag_net(dag_net->get_type());
const bool ha_dag_net = dag_net->is_ha_dag_net();
dag_net->~T();
if (ha_dag_net) {
ha_allocator_.free(dag_net);
@ -1054,7 +1046,7 @@ int ObTenantDagScheduler::create_and_add_dag_net(const ObIDagInitParam *param, T
COMMON_LOG(WARN, "scheduler is not init", K(ret));
} else {
T tmp_dag_net;
common::ObFIFOAllocator *allocator = is_ha_dag_net(tmp_dag_net.get_type()) ? &ha_allocator_ : &allocator_;
common::ObFIFOAllocator *allocator = tmp_dag_net.is_ha_dag_net() ? &ha_allocator_ : &allocator_;
if (NULL == (buf = allocator->alloc(sizeof(T)))) {
ret = common::OB_ALLOCATE_MEMORY_FAILED;
COMMON_LOG(WARN, "failed to alloc dag_net", K(ret));

View File

@ -227,6 +227,7 @@ class ObBackupDag : public share::ObIDag
public:
explicit ObBackupDag(const ObBackupDagSubType &sub_type);
virtual ~ObBackupDag();
virtual bool is_ha_dag() const override { return true; }
ObBackupDagSubType get_sub_type() const { return sub_type_; };
INHERIT_TO_STRING_KV("ObIDag", ObIDag, K_(sub_type));
protected:

View File

@ -100,6 +100,7 @@ public:
{ return lib::Worker::CompatMode::MYSQL; }
virtual uint64_t get_consumer_group_id() const override
{ return consumer_group_id_; }
virtual bool is_ha_dag() const override { return true; }
int create_first_task();
INHERIT_TO_STRING_KV("ObIDag", ObIDag, KP(this), K_(param), K_(result));

View File

@ -648,38 +648,36 @@ int ObCompactionDiagnoseMgr::do_tenant_major_merge_diagnose(
} else {
ObTenantTabletScheduler *scheduler = MTL(ObTenantTabletScheduler*);
const int64_t frozen_scn = MAX(scheduler->get_frozen_version(), MTL(ObTenantFreezeInfoMgr*)->get_latest_frozen_version());
if (frozen_scn == scheduler->get_merged_version()) {
SMART_VAR(ObArray<ObTabletReplica>, uncompacted_tablets) {
if (OB_FAIL(major_freeze_service->get_uncompacted_tablets(uncompacted_tablets))) {
LOG_WARN("fail to get uncompacted tablets", KR(ret));
} else {
int64_t uncompacted_tablets_cnt = uncompacted_tablets.count();
LOG_INFO("finish get uncompacted tablets for diagnose", K(ret), K(uncompacted_tablets_cnt));
for (int64_t i = 0; (OB_SUCCESS == ret) && i < uncompacted_tablets_cnt; ++i) {
if (can_add_diagnose_info()) {
bool compaction_scn_not_valid = frozen_scn > uncompacted_tablets.at(i).get_snapshot_version();
const char *status =
ObTabletReplica::SCN_STATUS_ERROR == uncompacted_tablets.at(i).get_status()
? "CHECKSUM_ERROR"
: (compaction_scn_not_valid ? "compaction_scn_not_update" : "report_scn_not_update");
if (OB_FAIL(SET_DIAGNOSE_INFO(
info_array_[idx_++], MAJOR_MERGE, MTL_ID(),
uncompacted_tablets.at(i).get_ls_id(),
uncompacted_tablets.at(i).get_tablet_id(),
ObCompactionDiagnoseInfo::DIA_STATUS_RS_UNCOMPACTED,
ObTimeUtility::fast_current_time(), "server",
uncompacted_tablets.at(i).get_server(), "status", status,
"frozen_scn", frozen_scn,
"compaction_scn", uncompacted_tablets.at(i).get_snapshot_version(),
"report_scn", uncompacted_tablets.at(i).get_report_scn()))) {
LOG_WARN("fail to set diagnose info", KR(ret), "uncompacted_tablet",
uncompacted_tablets.at(i));
ret = OB_SUCCESS; // ignore ret, and process next uncompacted_tablet
}
} else {
LOG_INFO("can not add diagnose info", K_(idx), K_(max_cnt), "uncompacted_tablet",
SMART_VAR(ObArray<ObTabletReplica>, uncompacted_tablets) {
if (OB_FAIL(major_freeze_service->get_uncompacted_tablets(uncompacted_tablets))) {
LOG_WARN("fail to get uncompacted tablets", KR(ret));
} else {
int64_t uncompacted_tablets_cnt = uncompacted_tablets.count();
LOG_INFO("finish get uncompacted tablets for diagnose", K(ret), K(uncompacted_tablets_cnt));
for (int64_t i = 0; (OB_SUCCESS == ret) && i < uncompacted_tablets_cnt; ++i) {
if (can_add_diagnose_info()) {
bool compaction_scn_not_valid = frozen_scn > uncompacted_tablets.at(i).get_snapshot_version();
const char *status =
ObTabletReplica::SCN_STATUS_ERROR == uncompacted_tablets.at(i).get_status()
? "CHECKSUM_ERROR"
: (compaction_scn_not_valid ? "compaction_scn_not_update" : "report_scn_not_update");
if (OB_FAIL(SET_DIAGNOSE_INFO(
info_array_[idx_++], MAJOR_MERGE, MTL_ID(),
uncompacted_tablets.at(i).get_ls_id(),
uncompacted_tablets.at(i).get_tablet_id(),
ObCompactionDiagnoseInfo::DIA_STATUS_RS_UNCOMPACTED,
ObTimeUtility::fast_current_time(), "server",
uncompacted_tablets.at(i).get_server(), "status", status,
"frozen_scn", frozen_scn,
"compaction_scn", uncompacted_tablets.at(i).get_snapshot_version(),
"report_scn", uncompacted_tablets.at(i).get_report_scn()))) {
LOG_WARN("fail to set diagnose info", KR(ret), "uncompacted_tablet",
uncompacted_tablets.at(i));
ret = OB_SUCCESS; // ignore ret, and process next uncompacted_tablet
}
} else {
LOG_INFO("can not add diagnose info", K_(idx), K_(max_cnt), "uncompacted_tablet",
uncompacted_tablets.at(i));
}
}
}

View File

@ -77,6 +77,7 @@ public:
virtual uint64_t get_consumer_group_id() const override { return consumer_group_id_; }
virtual int init_by_param(const share::ObIDagInitParam *param) override;
virtual int create_first_task() override;
virtual bool is_ha_dag() const override { return true; }
protected:
bool is_inited_;
ObLSRemoveMemberCtx ctx_;

View File

@ -149,6 +149,7 @@ public:
{ return consumer_group_id_; }
ObStorageHADagType get_sub_type() const { return sub_type_; }
ObIHADagNetCtx *get_ha_dag_net_ctx() const { return ha_dag_net_ctx_; }
virtual bool is_ha_dag() const override { return true; }
INHERIT_TO_STRING_KV("ObIDag", ObIDag, KPC_(ha_dag_net_ctx), K_(sub_type), K_(result_mgr));
protected: