fix recycle freeze info can not schedule compaction in standy tenant
This commit is contained in:
@ -1386,7 +1386,9 @@ inline bool is_compaction_dag(ObDagType::ObDagTypeEnum dag_type)
|
|||||||
ObDagType::DAG_TYPE_CO_MERGE_FINISH == dag_type ||
|
ObDagType::DAG_TYPE_CO_MERGE_FINISH == dag_type ||
|
||||||
ObDagType::DAG_TYPE_MAJOR_MERGE == dag_type ||
|
ObDagType::DAG_TYPE_MAJOR_MERGE == dag_type ||
|
||||||
ObDagType::DAG_TYPE_MINI_MERGE == dag_type ||
|
ObDagType::DAG_TYPE_MINI_MERGE == dag_type ||
|
||||||
ObDagType::DAG_TYPE_MERGE_EXECUTE == dag_type;
|
ObDagType::DAG_TYPE_MERGE_EXECUTE == dag_type ||
|
||||||
|
ObDagType::DAG_TYPE_TX_TABLE_MERGE == dag_type ||
|
||||||
|
ObDagType::DAG_TYPE_MDS_TABLE_MERGE == dag_type;
|
||||||
}
|
}
|
||||||
|
|
||||||
inline int dag_yield()
|
inline int dag_yield()
|
||||||
|
|||||||
@ -1220,40 +1220,33 @@ int ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
const int64_t last_major_snapshot = tablet.get_last_major_snapshot_version();
|
const int64_t last_major_snapshot = tablet.get_last_major_snapshot_version(); // current compaction scn
|
||||||
if (MTL(ObTenantTabletScheduler *)->could_major_merge_start() && last_major_snapshot > 0) {
|
if (MTL(ObTenantTabletScheduler *)->could_major_merge_start() && last_major_snapshot > 0) {
|
||||||
const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
|
bool is_standy_tenant = !MTL_TENANT_ROLE_CACHE_IS_PRIMARY_OR_INVALID();
|
||||||
const ObLSID &ls_id = ls.get_ls_id();
|
if (is_standy_tenant && !scheduler_called) {
|
||||||
ObArenaAllocator temp_allocator("GetMediumInfo", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); // for load medium info
|
// standy tenant should not visit inner table if it is not scheduler_called, wait for scheduler loop
|
||||||
const ObMediumCompactionInfoList *medium_list = nullptr;
|
} else {
|
||||||
bool schedule_flag = false;
|
const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
|
||||||
const int64_t inner_table_merged_version = MTL(ObTenantTabletScheduler *)->get_inner_table_merged_scn();
|
const ObLSID &ls_id = ls.get_ls_id();
|
||||||
if (OB_FAIL(tablet.read_medium_info_list(temp_allocator, medium_list))) {
|
ObArenaAllocator temp_allocator("GetMediumInfo", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); // for load medium info
|
||||||
LOG_WARN("failed to load medium info list", K(ret), K(tablet));
|
const ObMediumCompactionInfoList *medium_list = nullptr;
|
||||||
} else if (ObMediumCompactionInfo::MAJOR_COMPACTION == medium_list->get_last_compaction_type()
|
bool schedule_flag = false;
|
||||||
&& inner_table_merged_version < medium_list->get_last_compaction_scn()
|
const int64_t major_frozen_snapshot = 0 == input_major_snapshot ? MTL(ObTenantTabletScheduler *)->get_frozen_version() : input_major_snapshot; // broadcast scn
|
||||||
&& !MTL_TENANT_ROLE_CACHE_IS_PRIMARY_OR_INVALID()) { // for STANDBY/RESTORE TENANT
|
ObMediumCompactionInfo::ObCompactionType compaction_type = ObMediumCompactionInfo::COMPACTION_TYPE_MAX;
|
||||||
ObTabletCompactionScnInfo ret_info;
|
int64_t schedule_scn = 0; // medium_snapshot in medium info
|
||||||
// for standby/restore tenant, need select inner_table to check RS status before schedule new round
|
if (OB_FAIL(tablet.read_medium_info_list(temp_allocator, medium_list))) {
|
||||||
if (!scheduler_called) { // should not visit inner table, wait for scheduler loop
|
LOG_WARN("failed to load medium info list", K(ret), K(tablet));
|
||||||
} else if (OB_FAIL(get_status_from_inner_table(ls_id, tablet_id, ret_info))) {
|
} else if (OB_FAIL(read_medium_info_from_list(*medium_list, last_major_snapshot, major_frozen_snapshot, compaction_type, schedule_scn))) {
|
||||||
LOG_WARN("failed to get status from inner tablet", K(ret), K(ls_id), K(tablet_id));
|
LOG_WARN("failed to read medium info from list", K(ret), K(ls_id), K(tablet_id), KPC(medium_list), K(last_major_snapshot));
|
||||||
} else if (ret_info.could_schedule_next_round(medium_list->get_last_compaction_scn())) {
|
} else if (is_standy_tenant) { // for STANDBY/RESTORE TENANT
|
||||||
LOG_INFO("success to check RS major checksum validation finished", K(ret), K(ls_id), K(tablet_id));
|
if (OB_FAIL(decide_standy_tenant_schedule(ls_id, tablet_id, compaction_type, schedule_scn, major_frozen_snapshot, *medium_list, schedule_flag))) {
|
||||||
|
LOG_WARN("failed to decide whehter to schedule standy schedule", K(ret), K(ls_id), K(tablet_id), K(compaction_type), K(schedule_scn), K(major_frozen_snapshot), K(medium_list));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
schedule_flag = true;
|
schedule_flag = true;
|
||||||
}
|
}
|
||||||
} else {
|
if (OB_FAIL(ret) || !schedule_flag) {
|
||||||
schedule_flag = true;
|
} else if (schedule_scn > 0 && OB_FAIL(check_need_merge_and_schedule(ls, tablet, schedule_scn, tablet_need_freeze_flag, create_dag_flag))) {
|
||||||
}
|
|
||||||
if (OB_FAIL(ret) || !schedule_flag) {
|
|
||||||
} else {
|
|
||||||
const int64_t major_frozen_snapshot = 0 == input_major_snapshot ? MTL(ObTenantTabletScheduler *)->get_frozen_version() : input_major_snapshot;
|
|
||||||
int64_t schedule_scn = 0;
|
|
||||||
if (OB_FAIL(read_medium_info_from_list(*medium_list, last_major_snapshot,
|
|
||||||
major_frozen_snapshot, schedule_scn))) {
|
|
||||||
LOG_WARN("failed to read medium info from list", K(ret), K(ls_id), K(tablet_id), KPC(medium_list), K(last_major_snapshot));
|
|
||||||
} else if (schedule_scn > 0
|
|
||||||
&& OB_FAIL(check_need_merge_and_schedule(ls, tablet, schedule_scn, tablet_need_freeze_flag, create_dag_flag))) {
|
|
||||||
LOG_WARN("failed to check medium merge", K(ret), K(ls_id), K(tablet_id), K(schedule_scn));
|
LOG_WARN("failed to check medium merge", K(ret), K(ls_id), K(tablet_id), K(schedule_scn));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1262,10 +1255,47 @@ int ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObMediumCompactionScheduleFunc::decide_standy_tenant_schedule(
|
||||||
|
const ObLSID &ls_id,
|
||||||
|
const ObTabletID &tablet_id,
|
||||||
|
const ObMediumCompactionInfo::ObCompactionType &compaction_type,
|
||||||
|
const int64_t schedule_scn,
|
||||||
|
const int64_t major_frozen_snapshot,
|
||||||
|
const ObMediumCompactionInfoList &medium_list,
|
||||||
|
bool &schedule_flag)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
schedule_flag = false;
|
||||||
|
if (ObMediumCompactionInfo::MAJOR_COMPACTION == compaction_type) {
|
||||||
|
if (schedule_scn > major_frozen_snapshot) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("schedule_scn of current major is bigger than broadcast scn, wait for next round loop", K(ret), K(schedule_scn), K(major_frozen_snapshot));
|
||||||
|
} else {
|
||||||
|
schedule_flag = true;
|
||||||
|
}
|
||||||
|
} else if (ObMediumCompactionInfo::MEDIUM_COMPACTION == compaction_type) {
|
||||||
|
if (schedule_scn > major_frozen_snapshot && ObMediumCompactionInfo::MAJOR_COMPACTION == medium_list.get_last_compaction_type()) {
|
||||||
|
ObTabletCompactionScnInfo ret_info;
|
||||||
|
if (OB_FAIL(get_status_from_inner_table(ls_id, tablet_id, ret_info))) {
|
||||||
|
LOG_WARN("failed to get status from inner tablet", K(ret), K(ls_id), K(tablet_id));
|
||||||
|
} else if (ret_info.could_schedule_next_round(medium_list.get_last_compaction_scn())) {
|
||||||
|
LOG_INFO("success to check RS major checksum validation finished", K(ret), K(ls_id), K(tablet_id));
|
||||||
|
schedule_flag = true;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
schedule_flag = true;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// does not read valid medium info, wait for next round scheduler loop
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int ObMediumCompactionScheduleFunc::read_medium_info_from_list(
|
int ObMediumCompactionScheduleFunc::read_medium_info_from_list(
|
||||||
const ObMediumCompactionInfoList &medium_list,
|
const ObMediumCompactionInfoList &medium_list,
|
||||||
const int64_t last_major_snapshot,
|
const int64_t last_major_snapshot,
|
||||||
const int64_t major_frozen_snapshot,
|
const int64_t major_frozen_snapshot,
|
||||||
|
ObMediumCompactionInfo::ObCompactionType &compaction_type,
|
||||||
int64_t &schedule_scn)
|
int64_t &schedule_scn)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
@ -1276,6 +1306,7 @@ int ObMediumCompactionScheduleFunc::read_medium_info_from_list(
|
|||||||
if (info->is_medium_compaction()
|
if (info->is_medium_compaction()
|
||||||
|| info->medium_snapshot_ <= major_frozen_snapshot) {
|
|| info->medium_snapshot_ <= major_frozen_snapshot) {
|
||||||
schedule_scn = info->medium_snapshot_;
|
schedule_scn = info->medium_snapshot_;
|
||||||
|
compaction_type = (ObMediumCompactionInfo::ObCompactionType)info->compaction_type_;
|
||||||
}
|
}
|
||||||
break; // found one unfinish medium info, loop end
|
break; // found one unfinish medium info, loop end
|
||||||
}
|
}
|
||||||
|
|||||||
@ -51,10 +51,23 @@ public:
|
|||||||
bool &create_dag_flag,
|
bool &create_dag_flag,
|
||||||
const int64_t major_frozen_scn = 0,
|
const int64_t major_frozen_scn = 0,
|
||||||
const bool scheduler_called = false);
|
const bool scheduler_called = false);
|
||||||
|
/*
|
||||||
|
* see
|
||||||
|
* standby tenant should catch up broadcast scn when freeze info is recycled
|
||||||
|
*/
|
||||||
|
static int decide_standy_tenant_schedule(
|
||||||
|
const ObLSID &ls_id,
|
||||||
|
const ObTabletID &tablet_id,
|
||||||
|
const ObMediumCompactionInfo::ObCompactionType &compaction_type,
|
||||||
|
const int64_t schedule_scn,
|
||||||
|
const int64_t major_frozen_snapshot,
|
||||||
|
const ObMediumCompactionInfoList &medium_list,
|
||||||
|
bool &schedule_flag);
|
||||||
static int read_medium_info_from_list(
|
static int read_medium_info_from_list(
|
||||||
const ObMediumCompactionInfoList &medium_list,
|
const ObMediumCompactionInfoList &medium_list,
|
||||||
const int64_t major_frozen_snapshot,
|
const int64_t major_frozen_snapshot,
|
||||||
const int64_t last_major_snapshot,
|
const int64_t last_major_snapshot,
|
||||||
|
ObMediumCompactionInfo::ObCompactionType &compaction_type,
|
||||||
int64_t &schedule_scn);
|
int64_t &schedule_scn);
|
||||||
static int is_election_leader(const share::ObLSID &ls_id, bool &ls_election_leader);
|
static int is_election_leader(const share::ObLSID &ls_id, bool &ls_election_leader);
|
||||||
static int get_max_sync_medium_scn(
|
static int get_max_sync_medium_scn(
|
||||||
|
|||||||
@ -45,16 +45,6 @@ using namespace blocksstable;
|
|||||||
namespace compaction
|
namespace compaction
|
||||||
{
|
{
|
||||||
|
|
||||||
bool is_merge_dag(ObDagType::ObDagTypeEnum dag_type)
|
|
||||||
{
|
|
||||||
return dag_type == ObDagType::DAG_TYPE_MAJOR_MERGE
|
|
||||||
|| dag_type == ObDagType::DAG_TYPE_MERGE_EXECUTE
|
|
||||||
|| dag_type == ObDagType::DAG_TYPE_MINI_MERGE
|
|
||||||
|| dag_type == ObDagType::DAG_TYPE_TX_TABLE_MERGE
|
|
||||||
|| dag_type == ObDagType::DAG_TYPE_MDS_TABLE_MERGE;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ----------------------------------------------ObCompactionTimeGuard--------------------------------------------------
|
* ----------------------------------------------ObCompactionTimeGuard--------------------------------------------------
|
||||||
*/
|
*/
|
||||||
@ -809,7 +799,7 @@ int ObTabletMergePrepareTask::init()
|
|||||||
} else if (OB_ISNULL(dag_)) {
|
} else if (OB_ISNULL(dag_)) {
|
||||||
ret = OB_ERR_SYS;
|
ret = OB_ERR_SYS;
|
||||||
LOG_WARN("dag must not null", K(ret));
|
LOG_WARN("dag must not null", K(ret));
|
||||||
} else if (OB_UNLIKELY(!is_merge_dag(dag_->get_type()))) {
|
} else if (OB_UNLIKELY(!is_compaction_dag(dag_->get_type()))) {
|
||||||
ret = OB_ERR_SYS;
|
ret = OB_ERR_SYS;
|
||||||
LOG_ERROR("dag type not match", K(ret), KPC(dag_));
|
LOG_ERROR("dag type not match", K(ret), KPC(dag_));
|
||||||
} else {
|
} else {
|
||||||
@ -964,7 +954,7 @@ int ObTabletMergeFinishTask::init()
|
|||||||
} else if (OB_ISNULL(dag_)) {
|
} else if (OB_ISNULL(dag_)) {
|
||||||
ret = OB_ERR_SYS;
|
ret = OB_ERR_SYS;
|
||||||
LOG_WARN("dag must not null", K(ret));
|
LOG_WARN("dag must not null", K(ret));
|
||||||
} else if (!is_merge_dag(dag_->get_type())) {
|
} else if (!is_compaction_dag(dag_->get_type())) {
|
||||||
ret = OB_ERR_SYS;
|
ret = OB_ERR_SYS;
|
||||||
LOG_ERROR("dag type not match", K(ret), KPC(dag_));
|
LOG_ERROR("dag type not match", K(ret), KPC(dag_));
|
||||||
} else {
|
} else {
|
||||||
@ -1072,7 +1062,7 @@ int ObTabletMergeTask::generate_next_task(ObITask *&next_task)
|
|||||||
LOG_WARN("not init", K(ret));
|
LOG_WARN("not init", K(ret));
|
||||||
} else if (idx_ + 1 == ctx_->get_concurrent_cnt()) {
|
} else if (idx_ + 1 == ctx_->get_concurrent_cnt()) {
|
||||||
ret = OB_ITER_END;
|
ret = OB_ITER_END;
|
||||||
} else if (!is_merge_dag(dag_->get_type())) {
|
} else if (!is_compaction_dag(dag_->get_type())) {
|
||||||
ret = OB_ERR_SYS;
|
ret = OB_ERR_SYS;
|
||||||
LOG_ERROR("dag type not match", K(ret), KPC(dag_));
|
LOG_ERROR("dag type not match", K(ret), KPC(dag_));
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@ -596,7 +596,7 @@ int ObTenantTabletScheduler::schedule_merge(const int64_t broadcast_version)
|
|||||||
LOG_WARN("failed to add progress", K(tmp_ret), K(broadcast_version));
|
LOG_WARN("failed to add progress", K(tmp_ret), K(broadcast_version));
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
obsys::ObRLockGuard frozen_version_guard(frozen_version_lock_);
|
obsys::ObWLockGuard frozen_version_guard(frozen_version_lock_);
|
||||||
frozen_version_ = broadcast_version;
|
frozen_version_ = broadcast_version;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user