fix bk update report_scn & use cluster_version to compat
This commit is contained in:
@ -209,8 +209,7 @@ int ObTabletMetaTableCompactionOperator::batch_update_unequal_report_scn_tablet(
|
|||||||
} else if (OB_FAIL(append_tablet_id_array(tenant_id, input_tablet_id_array, start_idx, end_idx, sql))) {
|
} else if (OB_FAIL(append_tablet_id_array(tenant_id, input_tablet_id_array, start_idx, end_idx, sql))) {
|
||||||
LOG_WARN("fail to append tablet id array", KR(ret), K(tenant_id),
|
LOG_WARN("fail to append tablet id array", KR(ret), K(tenant_id),
|
||||||
K(input_tablet_id_array.count()), K(start_idx), K(end_idx));
|
K(input_tablet_id_array.count()), K(start_idx), K(end_idx));
|
||||||
} else if (OB_FAIL(sql.append_fmt(") AND compaction_scn = '%lu' AND report_scn < '%lu'",
|
} else if (OB_FAIL(sql.append_fmt(") AND report_scn < '%lu'", major_frozen_scn))) {
|
||||||
major_frozen_scn, major_frozen_scn))) {
|
|
||||||
LOG_WARN("failed to assign sql", K(ret), K(tenant_id), K(start_idx));
|
LOG_WARN("failed to assign sql", K(ret), K(tenant_id), K(start_idx));
|
||||||
} else {
|
} else {
|
||||||
SMART_VAR(ObISQLClient::ReadResult, result) {
|
SMART_VAR(ObISQLClient::ReadResult, result) {
|
||||||
@ -303,21 +302,23 @@ int ObTabletMetaTableCompactionOperator::inner_batch_update_unequal_report_scn_t
|
|||||||
int64_t affected_rows = 0;
|
int64_t affected_rows = 0;
|
||||||
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id);
|
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id);
|
||||||
ObSqlString sql;
|
ObSqlString sql;
|
||||||
if (OB_FAIL(sql.append_fmt("UPDATE %s SET report_scn='%lu' WHERE tenant_id='%lu' AND ls_id='%ld' AND tablet_id IN (",
|
if (OB_FAIL(sql.append_fmt(
|
||||||
OB_ALL_TABLET_META_TABLE_TNAME,
|
"UPDATE %s t1 SET report_scn=if(compaction_scn>'%lu' ,'%lu', compaction_scn) WHERE "
|
||||||
major_frozen_scn,
|
"tenant_id='%lu' AND ls_id='%ld' AND tablet_id IN (",
|
||||||
tenant_id,
|
OB_ALL_TABLET_META_TABLE_TNAME, major_frozen_scn, major_frozen_scn,
|
||||||
ls_id.id()))) {
|
tenant_id, ls_id.id()))) {
|
||||||
LOG_WARN("failed to append fmt", K(ret), K(tenant_id), K(ls_id));
|
LOG_WARN("failed to append fmt", K(ret), K(tenant_id), K(ls_id));
|
||||||
} else if (OB_FAIL(append_tablet_id_array(tenant_id, unequal_tablet_id_array, 0, unequal_tablet_id_array.count(), sql))) {
|
} else if (OB_FAIL(append_tablet_id_array(tenant_id, unequal_tablet_id_array,
|
||||||
|
0, unequal_tablet_id_array.count(),
|
||||||
|
sql))) {
|
||||||
LOG_WARN("fail to append tablet id array", KR(ret), K(tenant_id), K(unequal_tablet_id_array));
|
LOG_WARN("fail to append tablet id array", KR(ret), K(tenant_id), K(unequal_tablet_id_array));
|
||||||
} else if (OB_FAIL(sql.append_fmt(") AND compaction_scn >= '%lu' AND report_scn <'%lu'",
|
} else if (OB_FAIL(sql.append_fmt(") AND report_scn <'%lu'", major_frozen_scn))) {
|
||||||
major_frozen_scn, major_frozen_scn))) {
|
|
||||||
LOG_WARN("failed to assign sql", K(ret), K(tenant_id), K(ls_id));
|
LOG_WARN("failed to assign sql", K(ret), K(tenant_id), K(ls_id));
|
||||||
} else if (OB_FAIL(GCTX.sql_proxy_->write(meta_tenant_id, sql.ptr(), affected_rows))) {
|
} else if (OB_FAIL(GCTX.sql_proxy_->write(meta_tenant_id, sql.ptr(),
|
||||||
|
affected_rows))) {
|
||||||
LOG_WARN("fail to execute sql", KR(ret), K(tenant_id), K(meta_tenant_id), K(sql));
|
LOG_WARN("fail to execute sql", KR(ret), K(tenant_id), K(meta_tenant_id), K(sql));
|
||||||
} else if (affected_rows > 0) {
|
} else if (affected_rows > 0) {
|
||||||
LOG_INFO("success to update unequal report_scn", K(ret), K(tenant_id), K(ls_id), K(unequal_tablet_id_array.count()));
|
LOG_INFO("success to update unequal report_scn", K(ret), K(sql), K(tenant_id), K(ls_id), K(unequal_tablet_id_array.count()));
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -360,7 +361,7 @@ int ObTabletMetaTableCompactionOperator::batch_update_report_scn(
|
|||||||
} else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, compat_version))) {
|
} else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, compat_version))) {
|
||||||
LOG_WARN("fail to get data version", KR(ret), K(tenant_id));
|
LOG_WARN("fail to get data version", KR(ret), K(tenant_id));
|
||||||
} else if (compat_version < DATA_VERSION_4_1_0_0) {
|
} else if (compat_version < DATA_VERSION_4_1_0_0) {
|
||||||
// do nothing
|
// do nothing until schema upgrade
|
||||||
} else {
|
} else {
|
||||||
ObMySQLTransaction trans;
|
ObMySQLTransaction trans;
|
||||||
int64_t affected_rows = 0;
|
int64_t affected_rows = 0;
|
||||||
@ -409,7 +410,7 @@ int ObTabletMetaTableCompactionOperator::batch_update_status(
|
|||||||
} else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, compat_version))) {
|
} else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, compat_version))) {
|
||||||
LOG_WARN("fail to get data version", KR(ret), K(tenant_id));
|
LOG_WARN("fail to get data version", KR(ret), K(tenant_id));
|
||||||
} else if (compat_version < DATA_VERSION_4_1_0_0) {
|
} else if (compat_version < DATA_VERSION_4_1_0_0) {
|
||||||
// do nothing
|
// do nothing until schema upgrade
|
||||||
} else {
|
} else {
|
||||||
ObMySQLTransaction trans;
|
ObMySQLTransaction trans;
|
||||||
int64_t affected_rows = 0;
|
int64_t affected_rows = 0;
|
||||||
|
|||||||
@ -279,6 +279,9 @@ int ObMediumCompactionScheduleFunc::get_max_reserved_snapshot(int64_t &max_reser
|
|||||||
if (0 == table_store.get_major_sstables().count()) {
|
if (0 == table_store.get_major_sstables().count()) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("major sstable should not be empty", K(ret), K(tablet_));
|
LOG_WARN("major sstable should not be empty", K(ret), K(tablet_));
|
||||||
|
} else if (0 == ls_.get_min_reserved_snapshot()) {
|
||||||
|
ret = OB_NO_NEED_MERGE;
|
||||||
|
// not sync reserved snapshot yet, should not schedule now
|
||||||
} else if (FALSE_IT(max_merged_snapshot = table_store.get_major_sstables().get_boundary_table(true/*last*/)->get_snapshot_version())) {
|
} else if (FALSE_IT(max_merged_snapshot = table_store.get_major_sstables().get_boundary_table(true/*last*/)->get_snapshot_version())) {
|
||||||
} else if (OB_FAIL(MTL(ObTenantFreezeInfoMgr*)->get_min_reserved_snapshot(
|
} else if (OB_FAIL(MTL(ObTenantFreezeInfoMgr*)->get_min_reserved_snapshot(
|
||||||
tablet_.get_tablet_meta().tablet_id_, max_merged_snapshot, min_reserved_snapshot))) {
|
tablet_.get_tablet_meta().tablet_id_, max_merged_snapshot, min_reserved_snapshot))) {
|
||||||
|
|||||||
@ -22,7 +22,7 @@ namespace compaction
|
|||||||
{
|
{
|
||||||
|
|
||||||
const char *ObServerCompactionEvent::ObCompactionEventStr[] = {
|
const char *ObServerCompactionEvent::ObCompactionEventStr[] = {
|
||||||
"GET_COMPACTION_INFO",
|
"RECEIVE_BROADCAST_SCN",
|
||||||
"GET_FREEZE_INFO",
|
"GET_FREEZE_INFO",
|
||||||
"WEAK_READ_TS_READY",
|
"WEAK_READ_TS_READY",
|
||||||
"SCHEDULER_LOOP",
|
"SCHEDULER_LOOP",
|
||||||
@ -34,7 +34,7 @@ const char *ObServerCompactionEvent::get_comp_event_str(enum ObCompactionEvent e
|
|||||||
{
|
{
|
||||||
STATIC_ASSERT(static_cast<int64_t>(COMPACTION_EVENT_MAX) == ARRAYSIZEOF(ObCompactionEventStr), "events str len is mismatch");
|
STATIC_ASSERT(static_cast<int64_t>(COMPACTION_EVENT_MAX) == ARRAYSIZEOF(ObCompactionEventStr), "events str len is mismatch");
|
||||||
const char *str = "";
|
const char *str = "";
|
||||||
if (event >= COMPACTION_EVENT_MAX || event < GET_COMPACTION_INFO) {
|
if (event >= COMPACTION_EVENT_MAX || event < RECEIVE_BROADCAST_SCN) {
|
||||||
str = "invalid_type";
|
str = "invalid_type";
|
||||||
} else {
|
} else {
|
||||||
str = ObCompactionEventStr[event];
|
str = ObCompactionEventStr[event];
|
||||||
|
|||||||
@ -25,7 +25,7 @@ struct ObServerCompactionEvent
|
|||||||
public:
|
public:
|
||||||
enum ObCompactionEvent
|
enum ObCompactionEvent
|
||||||
{
|
{
|
||||||
GET_COMPACTION_INFO = 0,
|
RECEIVE_BROADCAST_SCN = 0,
|
||||||
GET_FREEZE_INFO,
|
GET_FREEZE_INFO,
|
||||||
WEAK_READ_TS_READY,
|
WEAK_READ_TS_READY,
|
||||||
SCHEDULER_LOOP,
|
SCHEDULER_LOOP,
|
||||||
|
|||||||
@ -523,7 +523,7 @@ int ObTenantTabletScheduler::schedule_merge(const int64_t broadcast_version)
|
|||||||
MTL_ID(),
|
MTL_ID(),
|
||||||
MAJOR_MERGE,
|
MAJOR_MERGE,
|
||||||
broadcast_version,
|
broadcast_version,
|
||||||
ObServerCompactionEvent::GET_COMPACTION_INFO,
|
ObServerCompactionEvent::RECEIVE_BROADCAST_SCN,
|
||||||
schedule_stats_.start_timestamp_,
|
schedule_stats_.start_timestamp_,
|
||||||
"last_merged_version",
|
"last_merged_version",
|
||||||
merged_version_);
|
merged_version_);
|
||||||
@ -1197,6 +1197,7 @@ int ObTenantTabletScheduler::get_min_dependent_schema_version(int64_t &min_schem
|
|||||||
int ObTenantTabletScheduler::update_report_scn_as_ls_leader(ObLS &ls)
|
int ObTenantTabletScheduler::update_report_scn_as_ls_leader(ObLS &ls)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
const ObLSID &ls_id = ls.get_ls_id();
|
||||||
ObRole role = INVALID_ROLE;
|
ObRole role = INVALID_ROLE;
|
||||||
const int64_t major_merged_scn = get_inner_table_merged_scn();
|
const int64_t major_merged_scn = get_inner_table_merged_scn();
|
||||||
bool need_merge = false;
|
bool need_merge = false;
|
||||||
@ -1204,10 +1205,11 @@ int ObTenantTabletScheduler::update_report_scn_as_ls_leader(ObLS &ls)
|
|||||||
LOG_WARN("failed to check ls state", K(ret), K(ls_id));
|
LOG_WARN("failed to check ls state", K(ret), K(ls_id));
|
||||||
} else if (!need_merge) {
|
} else if (!need_merge) {
|
||||||
// do nothing
|
// do nothing
|
||||||
} else if (OB_FAIL(ls.get_ls_role(role))) {
|
} else if (OB_FAIL(ObMediumCompactionScheduleFunc::get_palf_role(ls_id, role))) {
|
||||||
LOG_WARN("failed to get ls role", K(ret), K(ls));
|
if (OB_LS_NOT_EXIST != ret) {
|
||||||
} else if (LEADER == role) {
|
LOG_WARN("failed to get palf handle role", K(ret), K(ls_id));
|
||||||
const ObLSID &ls_id = ls.get_ls_id();
|
}
|
||||||
|
} else if (is_leader_by_election(role)) {
|
||||||
ObSEArray<ObTabletID, 200> tablet_id_array;
|
ObSEArray<ObTabletID, 200> tablet_id_array;
|
||||||
if (OB_FAIL(ls.get_tablet_svr()->get_all_tablet_ids(true/*except_ls_inner_tablet*/, tablet_id_array))) {
|
if (OB_FAIL(ls.get_tablet_svr()->get_all_tablet_ids(true/*except_ls_inner_tablet*/, tablet_id_array))) {
|
||||||
LOG_WARN("failed to get tablet id", K(ret), K(ls_id));
|
LOG_WARN("failed to get tablet id", K(ret), K(ls_id));
|
||||||
|
|||||||
@ -2474,11 +2474,8 @@ int ObTablet::get_kept_multi_version_start(
|
|||||||
LOG_WARN("failed to get min medium snapshot", K(ret), K(tablet));
|
LOG_WARN("failed to get min medium snapshot", K(ret), K(tablet));
|
||||||
}
|
}
|
||||||
|
|
||||||
// for compat, if cluster not upgrade to 4.1, should not consider ls.get_min_reserved_snapshot()
|
// for compat, if receive ls_reserved_snapshot clog, should consider ls.get_min_reserved_snapshot()
|
||||||
uint64_t compat_version = 0;
|
if (ls.get_min_reserved_snapshot() > 0) {
|
||||||
if (OB_TMP_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), compat_version))) {
|
|
||||||
LOG_WARN("fail to get data version", K(tmp_ret));
|
|
||||||
} else if (compat_version >= DATA_VERSION_4_1_0_0 && ls.get_min_reserved_snapshot() > 0) {
|
|
||||||
ls_min_reserved_snapshot = ls.get_min_reserved_snapshot();
|
ls_min_reserved_snapshot = ls.get_min_reserved_snapshot();
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
|
|||||||
Reference in New Issue
Block a user