keep all medium info for standby cluster
This commit is contained in:
@ -248,7 +248,7 @@ int ObAllVirtualSysStat::update_all_stats_(const int64_t tenant_id, ObStatEventS
|
||||
// it is ok to not have any records
|
||||
}
|
||||
|
||||
{
|
||||
if (!is_virtual_tenant_id(tenant_id)) { // skip virtual tenant
|
||||
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
|
||||
if (tenant_config.is_valid()) {
|
||||
MTL_SWITCH(tenant_id) {
|
||||
|
||||
@ -12,6 +12,7 @@
|
||||
#include "storage/tx_storage/ob_ls_service.h"
|
||||
#include "storage/meta_mem/ob_tenant_meta_mem_mgr.h"
|
||||
#include "storage/compaction/ob_medium_compaction_mgr.h"
|
||||
#include "storage/compaction/ob_medium_compaction_func.h"
|
||||
|
||||
using namespace oceanbase;
|
||||
using namespace common;
|
||||
@ -199,7 +200,8 @@ int ObAllVirtualTabletCompactionInfo::process_curr_tenant(common::ObNewRow *&row
|
||||
cur_row_.cells_[i].set_int(medium_info_list->get_wait_check_medium_scn());
|
||||
break;
|
||||
case MAX_RECEIVED_SCN:
|
||||
if (OB_SUCCESS == tablet->get_max_sync_medium_scn(max_sync_medium_scn)) {
|
||||
if (OB_SUCCESS == compaction::ObMediumCompactionScheduleFunc::get_max_sync_medium_scn(
|
||||
*tablet, *medium_info_list, max_sync_medium_scn)) {
|
||||
cur_row_.cells_[i].set_int(max_sync_medium_scn);
|
||||
} else {
|
||||
cur_row_.cells_[i].set_int(-1);
|
||||
|
||||
@ -1152,7 +1152,10 @@ int ObCompactionDiagnoseMgr::diagnose_tablet_major_and_medium(
|
||||
LOG_WARN("fail to fetch table store", K(ret));
|
||||
} else if (OB_ISNULL(last_major_sstable =
|
||||
table_store_wrapper.get_member()->get_major_sstables().get_boundary_table(true/*last*/))) {
|
||||
} else if (OB_FAIL(tablet.get_max_sync_medium_scn(max_sync_medium_scn))){
|
||||
} else if (OB_FAIL(tablet.read_medium_info_list(allocator, medium_list))) {
|
||||
LOG_WARN("failed to load medium info list", K(ret), K(tablet));
|
||||
} else if (OB_FAIL(ObMediumCompactionScheduleFunc::get_max_sync_medium_scn(
|
||||
tablet, *medium_list, max_sync_medium_scn))){
|
||||
LOG_WARN("failed to get max sync medium scn", K(ret), K(ls_id), K(tablet_id));
|
||||
} else {
|
||||
// diagnose medium
|
||||
|
||||
@ -181,7 +181,7 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot(
|
||||
}
|
||||
} else {
|
||||
medium_info.compaction_type_ = ObMediumCompactionInfo::MAJOR_COMPACTION;
|
||||
medium_info.medium_merge_reason_ = ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE;
|
||||
medium_info.medium_merge_reason_ = ObAdaptiveMergePolicy::AdaptiveMergeReason::TENANT_MAJOR;
|
||||
medium_info.medium_snapshot_ = freeze_info.freeze_version;
|
||||
schema_version = freeze_info.schema_version;
|
||||
}
|
||||
@ -275,6 +275,41 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_for_leader(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::get_adaptive_reason(
|
||||
const int64_t schedule_major_snapshot,
|
||||
ObAdaptiveMergePolicy::AdaptiveMergeReason &adaptive_merge_reason)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t max_sync_medium_scn = 0;
|
||||
ObTablet *tablet = tablet_handle_.get_obj();
|
||||
if (OB_FAIL(ObMediumCompactionScheduleFunc::get_max_sync_medium_scn(
|
||||
*tablet, *medium_info_list_, max_sync_medium_scn))) {
|
||||
LOG_WARN("failed to get max received medium scn", KR(ret), KPC(this));
|
||||
} else if (schedule_major_snapshot > max_sync_medium_scn) {
|
||||
adaptive_merge_reason = ObAdaptiveMergePolicy::AdaptiveMergeReason::TENANT_MAJOR;
|
||||
} else if (OB_FAIL(ObAdaptiveMergePolicy::get_adaptive_merge_reason(*tablet, adaptive_merge_reason))) {
|
||||
if (OB_HASH_NOT_EXIST != ret) {
|
||||
LOG_WARN("failed to get meta merge priority", K(ret), KPC(this));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
#ifdef ERRSIM
|
||||
if (OB_SUCC(ret)) {
|
||||
if (tablet->get_tablet_meta().tablet_id_.id() > ObTabletID::MIN_USER_TABLET_ID) {
|
||||
ret = OB_E(EventTable::EN_SCHEDULE_MEDIUM_COMPACTION) ret;
|
||||
LOG_INFO("errsim", K(ret), KPC(this));
|
||||
if (OB_FAIL(ret)) {
|
||||
FLOG_INFO("errsim EN_SCHEDULE_MEDIUM_COMPACTION", KPC(this));
|
||||
ret = OB_SUCCESS;
|
||||
adaptive_merge_reason = ObAdaptiveMergePolicy::AdaptiveMergeReason::LOAD_DATA_SCENE;
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::schedule_next_medium_primary_cluster(
|
||||
const int64_t schedule_major_snapshot,
|
||||
ObTenantTabletScheduler::ObScheduleStatistics &schedule_stat)
|
||||
@ -284,8 +319,6 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_primary_cluster(
|
||||
ObTabletCompactionScnInfo ret_info;
|
||||
// check last medium type, select inner table for last major
|
||||
bool schedule_medium_flag = false;
|
||||
int64_t max_sync_medium_scn = 0;
|
||||
const bool is_major = 0 != schedule_major_snapshot;
|
||||
ObITable *last_major = nullptr;
|
||||
ObTablet *tablet = nullptr;
|
||||
ObTabletMemberWrapper<ObTabletTableStore> table_store_wrapper;
|
||||
@ -305,38 +338,15 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_primary_cluster(
|
||||
} else if (OB_ISNULL(medium_info_list_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("medium info list is unexpected null", K(ret), KPC(this), KPC_(medium_info_list));
|
||||
} else if (!medium_info_list_->could_schedule_next_round()) { // check serialized list
|
||||
} else if (!medium_info_list_->could_schedule_next_round()) { // check medium list int mds_table
|
||||
// do nothing
|
||||
} else if (OB_FAIL(tablet->get_max_sync_medium_scn(max_sync_medium_scn))) { // check info in memory
|
||||
LOG_WARN("failed to get max sync medium scn", K(ret), K(max_sync_medium_scn));
|
||||
} else if (nullptr != last_major && last_major->get_snapshot_version() < max_sync_medium_scn) {
|
||||
// there is unfinish medium info, schedule next time
|
||||
} else if (is_major && schedule_major_snapshot > max_sync_medium_scn) {
|
||||
schedule_medium_flag = true;
|
||||
} else if (OB_FAIL(ObAdaptiveMergePolicy::get_adaptive_merge_reason(*tablet, adaptive_merge_reason))) {
|
||||
if (OB_HASH_NOT_EXIST != ret) {
|
||||
LOG_WARN("failed to get meta merge priority", K(ret), KPC(this));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
} else if (OB_FAIL(get_adaptive_reason(schedule_major_snapshot, adaptive_merge_reason))) {
|
||||
LOG_WARN("failed to get adaptive reason", KR(ret), K(schedule_major_snapshot));
|
||||
} else if (adaptive_merge_reason > ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE) {
|
||||
schedule_medium_flag = true;
|
||||
}
|
||||
LOG_TRACE("schedule next medium in primary cluster", K(ret), KPC(this), K(schedule_medium_flag),
|
||||
K(schedule_major_snapshot), K(adaptive_merge_reason), KPC(last_major), KPC_(medium_info_list), K(max_sync_medium_scn));
|
||||
#ifdef ERRSIM
|
||||
if (OB_SUCC(ret)) {
|
||||
if (tablet->get_tablet_meta().tablet_id_.id() > ObTabletID::MIN_USER_TABLET_ID) {
|
||||
ret = OB_E(EventTable::EN_SCHEDULE_MEDIUM_COMPACTION) ret;
|
||||
LOG_INFO("errsim", K(ret), KPC(this));
|
||||
if (OB_FAIL(ret)) {
|
||||
FLOG_INFO("set schedule medium with errsim", KPC(this));
|
||||
ret = OB_SUCCESS;
|
||||
schedule_medium_flag = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
K(schedule_major_snapshot), K(adaptive_merge_reason), KPC(last_major), KPC_(medium_info_list));
|
||||
|
||||
if (OB_FAIL(ret) || !schedule_medium_flag) {
|
||||
} else if (ObMediumCompactionInfo::MAJOR_COMPACTION == medium_info_list_->get_last_compaction_type()) {
|
||||
@ -346,13 +356,13 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_primary_cluster(
|
||||
LOG_WARN("failed to get status from inner tablet", K(ret), KPC(this));
|
||||
} else if (ret_info.could_schedule_next_round(medium_info_list_->get_last_compaction_scn())) {
|
||||
LOG_INFO("success to check RS major checksum validation finished", K(ret), KPC(this), K(ret_info));
|
||||
ret = decide_medium_snapshot(is_major);
|
||||
ret = decide_medium_snapshot(adaptive_merge_reason);
|
||||
} else {
|
||||
++schedule_stat.wait_rs_validate_cnt_;
|
||||
LOG_TRACE("cannot schedule next round merge now", K(ret), K(ret_info), KPC_(medium_info_list), KPC(tablet));
|
||||
LOG_TRACE("cannot schedule next round merge now", K(ret), K(ret_info), KPC_(medium_info_list), KPC(tablet), K(adaptive_merge_reason));
|
||||
}
|
||||
} else {
|
||||
ret = decide_medium_snapshot(is_major, adaptive_merge_reason);
|
||||
ret = decide_medium_snapshot(adaptive_merge_reason);
|
||||
}
|
||||
|
||||
return ret;
|
||||
@ -453,12 +463,12 @@ int ObMediumCompactionScheduleFunc::choose_medium_schema_version(
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
|
||||
const bool is_major,
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason merge_reason)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
ObTablet *tablet = nullptr;
|
||||
const bool is_major = (merge_reason == ObAdaptiveMergePolicy::TENANT_MAJOR);
|
||||
if (OB_UNLIKELY(!tablet_handle_.is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid tablet_handle", K(ret), K(tablet_handle_));
|
||||
@ -466,41 +476,32 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
|
||||
tablet = tablet_handle_.get_obj();
|
||||
const ObTabletID &tablet_id = tablet->get_tablet_meta().tablet_id_;
|
||||
int64_t max_sync_medium_scn = 0;
|
||||
uint64_t compat_version = 0;
|
||||
LOG_TRACE("decide_medium_snapshot", K(ret), KPC(this), K(tablet_id));
|
||||
if (OB_FAIL(tablet->get_max_sync_medium_scn(max_sync_medium_scn))) {
|
||||
LOG_WARN("failed to get max sync medium scn", K(ret), KPC(this));
|
||||
ObMediumCompactionInfo medium_info;
|
||||
LOG_TRACE("decide_medium_snapshot", K(ret), KPC(this), K(tablet_id), K(merge_reason));
|
||||
if (OB_FAIL(ObMediumCompactionScheduleFunc::get_max_sync_medium_scn(
|
||||
*tablet, *medium_info_list_, max_sync_medium_scn))) {
|
||||
LOG_WARN("failed to get max sync medium scn", KR(ret), KPC(this));
|
||||
} else if (OB_FAIL(ls_.add_dependent_medium_tablet(tablet_id))) { // add dependent_id in ObLSReservedSnapshotMgr
|
||||
LOG_WARN("failed to add dependent tablet", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), compat_version))) {
|
||||
LOG_WARN("fail to get data version", K(ret));
|
||||
} else if (OB_UNLIKELY(compat_version < DATA_VERSION_4_1_0_0)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid data version to schedule medium compaction", K(ret), K(compat_version));
|
||||
} else if (OB_FAIL(medium_info.init_data_version())) {
|
||||
LOG_WARN("fail to set data version", K(ret));
|
||||
} else {
|
||||
int64_t max_reserved_snapshot = 0;
|
||||
ObGetMergeTablesResult result;
|
||||
ObMediumCompactionInfo medium_info;
|
||||
medium_info.data_version_ = compat_version;
|
||||
int64_t schema_version = 0;
|
||||
if (medium_info.data_version_ < DATA_VERSION_4_2_0_0) {
|
||||
medium_info.medium_compat_version_ = ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION;
|
||||
} else {
|
||||
medium_info.medium_compat_version_ = ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION_V2;
|
||||
}
|
||||
|
||||
if (OB_FAIL(choose_medium_scn[is_major](*this, ls_, *tablet, merge_reason, allocator_, medium_info, result, schema_version))) {
|
||||
if (OB_NO_NEED_MERGE != ret) {
|
||||
LOG_WARN("failed to choose medium snapshot", K(ret), KPC(this));
|
||||
}
|
||||
} else if (medium_info.medium_snapshot_ <= max_sync_medium_scn) {
|
||||
// for standby -> primary cluster, new chosen medium_snapshot should larger than max_sync_medium_scn
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
} else if (is_major) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(get_max_reserved_snapshot(max_reserved_snapshot))) {
|
||||
LOG_WARN("failed to get multi_version_start", K(ret), KPC(this));
|
||||
} else if (medium_info.medium_snapshot_ < max_reserved_snapshot &&
|
||||
OB_FAIL(choose_new_medium_snapshot(max_reserved_snapshot, medium_info, result))) {
|
||||
} else if (medium_info.medium_snapshot_ < max_reserved_snapshot
|
||||
&& OB_FAIL(choose_new_medium_snapshot(max_reserved_snapshot, medium_info, result))) {
|
||||
// chosen medium snapshot is far too old
|
||||
LOG_WARN("failed to choose new medium snapshot", KR(ret), K(max_reserved_snapshot), K(medium_info));
|
||||
} else if (OB_FAIL(choose_medium_schema_version(allocator_, medium_info.medium_snapshot_, *tablet, schema_version))) {
|
||||
@ -545,13 +546,10 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
|
||||
if (OB_TABLE_IS_DELETED == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("failed to prepare medium info", K(ret), K(result));
|
||||
LOG_WARN("failed to prepare medium info", K(ret), K(result), K(schema_version));
|
||||
}
|
||||
} else if (OB_FAIL(submit_medium_clog(medium_info))) {
|
||||
LOG_WARN("failed to submit medium clog and update inner table", K(ret), KPC(this));
|
||||
} else if (OB_TMP_FAIL(ls_.tablet_freeze(tablet_id, true/*is_sync*/))) {
|
||||
// need to freeze memtable with MediumCompactionInfo
|
||||
LOG_WARN("failed to freeze tablet", K(tmp_ret), KPC(this));
|
||||
}
|
||||
// delete tablet_id in ObLSReservedSnapshotMgr even if submit clog or update inner table failed
|
||||
if (OB_TMP_FAIL(ls_.del_dependent_medium_tablet(tablet_id))) {
|
||||
@ -1234,5 +1232,26 @@ int ObMediumCompactionScheduleFunc::check_need_merge_and_schedule(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::get_max_sync_medium_scn(
|
||||
const ObTablet &tablet,
|
||||
const ObMediumCompactionInfoList &medium_list,
|
||||
int64_t &max_sync_medium_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
max_sync_medium_scn = 0;
|
||||
int64_t max_sync_medium_scn_on_tablet = 0;
|
||||
int64_t max_sync_medium_scn_from_list = 0;
|
||||
if (OB_FAIL(tablet.get_max_sync_medium_scn(max_sync_medium_scn_on_tablet))) {
|
||||
LOG_WARN("failed to get max sync medium scn from tablet", KR(ret), K(tablet));
|
||||
} else if (OB_FAIL(medium_list.get_max_sync_medium_scn(max_sync_medium_scn_from_list))) {
|
||||
LOG_WARN("failed to get max sync medium scn from medium_list", KR(ret), K(medium_list));
|
||||
} else {
|
||||
max_sync_medium_scn = MAX(max_sync_medium_scn_on_tablet, max_sync_medium_scn_from_list);
|
||||
LOG_TRACE("get max sync medium scn", KR(ret), K(max_sync_medium_scn), K(max_sync_medium_scn_on_tablet),
|
||||
K(max_sync_medium_scn_from_list));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} //namespace compaction
|
||||
} // namespace oceanbase
|
||||
|
||||
@ -48,6 +48,10 @@ public:
|
||||
ObMediumCompactionInfo::ObCompactionType &compaction_type,
|
||||
int64_t &schedule_scn);
|
||||
static int get_palf_role(const share::ObLSID &ls_id, ObRole &role);
|
||||
static int get_max_sync_medium_scn(
|
||||
const ObTablet &tablet,
|
||||
const ObMediumCompactionInfoList &medium_list,
|
||||
int64_t &max_sync_medium_scn);
|
||||
static int get_table_schema_to_merge(
|
||||
ObMultiVersionSchemaService &schema_service,
|
||||
const ObTablet &tablet,
|
||||
@ -56,10 +60,8 @@ public:
|
||||
ObMediumCompactionInfo &medium_info);
|
||||
|
||||
int schedule_next_medium_for_leader(const int64_t major_snapshot, ObTenantTabletScheduler::ObScheduleStatistics &schedule_stat);
|
||||
|
||||
int decide_medium_snapshot(
|
||||
const bool is_major,
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason merge_reason = ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE);
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason merge_reason);
|
||||
|
||||
int check_medium_finish(const ObLSLocality &ls_locality);
|
||||
|
||||
@ -149,6 +151,9 @@ protected:
|
||||
const ObTabletID &tablet_id,
|
||||
const int64_t schema_version,
|
||||
uint64_t &table_id);
|
||||
int get_adaptive_reason(
|
||||
const int64_t schedule_major_snapshot,
|
||||
ObAdaptiveMergePolicy::AdaptiveMergeReason &adaptive_merge_reason);
|
||||
static const int64_t DEFAULT_SCHEDULE_MEDIUM_INTERVAL = 60L * 1000L * 1000L; // 60s
|
||||
static constexpr double SCHEDULE_RANGE_INC_ROW_COUNT_PERCENRAGE_THRESHOLD = 0.2;
|
||||
static const int64_t SCHEDULE_RANGE_ROW_COUNT_THRESHOLD = 1000 * 1000L; // 100w
|
||||
|
||||
@ -359,7 +359,7 @@ const char *ObMediumCompactionInfo::get_compaction_type_str(enum ObCompactionTyp
|
||||
}
|
||||
|
||||
ObMediumCompactionInfo::ObMediumCompactionInfo()
|
||||
: medium_compat_version_(MEDIUM_COMPAT_VERSION_V2),
|
||||
: medium_compat_version_(MEDIUM_COMPAT_VERSION_V3),
|
||||
compaction_type_(COMPACTION_TYPE_MAX),
|
||||
contain_parallel_range_(false),
|
||||
medium_merge_reason_(ObAdaptiveMergePolicy::NONE),
|
||||
@ -409,6 +409,28 @@ int ObMediumCompactionInfo::init(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMediumCompactionInfo::init_data_version()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
uint64_t compat_version = 0;
|
||||
if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), compat_version))) {
|
||||
LOG_WARN("fail to get data version", K(ret));
|
||||
} else if (OB_UNLIKELY(compat_version < DATA_VERSION_4_1_0_0)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid data version to schedule medium compaction", K(ret), K(compat_version));
|
||||
} else {
|
||||
data_version_ = compat_version;
|
||||
if (compat_version < DATA_VERSION_4_2_0_0) {
|
||||
medium_compat_version_ = ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION;
|
||||
} else if (compat_version < DATA_VERSION_4_2_1_0) {
|
||||
medium_compat_version_ = ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION_V2;
|
||||
} else {
|
||||
medium_compat_version_ = ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION_V3;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObMediumCompactionInfo::is_valid() const
|
||||
{
|
||||
return COMPACTION_TYPE_MAX != compaction_type_
|
||||
@ -417,7 +439,7 @@ bool ObMediumCompactionInfo::is_valid() const
|
||||
&& storage_schema_.is_valid()
|
||||
&& parallel_merge_info_.is_valid()
|
||||
&& (MEDIUM_COMPAT_VERSION == medium_compat_version_
|
||||
|| (MEDIUM_COMPAT_VERSION_V2 == medium_compat_version_ && last_medium_snapshot_ != 0));
|
||||
|| (MEDIUM_COMPAT_VERSION_V2 <= medium_compat_version_ && last_medium_snapshot_ != 0));
|
||||
}
|
||||
|
||||
void ObMediumCompactionInfo::reset()
|
||||
@ -437,6 +459,19 @@ void ObMediumCompactionInfo::reset()
|
||||
parallel_merge_info_.destroy();
|
||||
}
|
||||
|
||||
bool ObMediumCompactionInfo::should_throw_for_standby_cluster() const
|
||||
{
|
||||
bool bret = false;
|
||||
if (medium_compat_version_ < MEDIUM_COMPAT_VERSION_V3) {
|
||||
// for old version medium, should throw if cluster_id is different
|
||||
bret = !cluster_id_equal() && is_medium_compaction();
|
||||
} else {
|
||||
// for new version medium, all medium should be kept
|
||||
bret = false;
|
||||
}
|
||||
return bret;
|
||||
}
|
||||
|
||||
int ObMediumCompactionInfo::gene_parallel_info(
|
||||
ObIAllocator &allocator,
|
||||
ObArrayArray<ObStoreRange> ¶l_range)
|
||||
@ -475,7 +510,7 @@ int ObMediumCompactionInfo::serialize(char *buf, const int64_t buf_len, int64_t
|
||||
OB_UNIS_ENCODE,
|
||||
parallel_merge_info_);
|
||||
}
|
||||
if (OB_SUCC(ret) && MEDIUM_COMPAT_VERSION_V2 == medium_compat_version_) {
|
||||
if (OB_SUCC(ret) && MEDIUM_COMPAT_VERSION_V2 <= medium_compat_version_) {
|
||||
LST_DO_CODE(
|
||||
OB_UNIS_ENCODE,
|
||||
last_medium_snapshot_);
|
||||
@ -512,7 +547,7 @@ int ObMediumCompactionInfo::deserialize(
|
||||
clear_parallel_range();
|
||||
LOG_DEBUG("ObMediumCompactionInfo::deserialize", K(ret), K(buf), K(data_len), K(pos));
|
||||
}
|
||||
if (OB_SUCC(ret) && MEDIUM_COMPAT_VERSION_V2 == medium_compat_version_) {
|
||||
if (OB_SUCC(ret) && MEDIUM_COMPAT_VERSION_V2 <= medium_compat_version_) {
|
||||
LST_DO_CODE(
|
||||
OB_UNIS_DECODE,
|
||||
last_medium_snapshot_);
|
||||
@ -534,7 +569,7 @@ int64_t ObMediumCompactionInfo::get_serialize_size() const
|
||||
if (contain_parallel_range_) {
|
||||
LST_DO_CODE(OB_UNIS_ADD_LEN, parallel_merge_info_);
|
||||
}
|
||||
if (MEDIUM_COMPAT_VERSION_V2 == medium_compat_version_) {
|
||||
if (MEDIUM_COMPAT_VERSION_V2 <= medium_compat_version_) {
|
||||
LST_DO_CODE(
|
||||
OB_UNIS_ADD_LEN,
|
||||
last_medium_snapshot_);
|
||||
|
||||
@ -180,6 +180,7 @@ public:
|
||||
|
||||
int assign(ObIAllocator &allocator, const ObMediumCompactionInfo &medium_info);
|
||||
int init(ObIAllocator &allocator, const ObMediumCompactionInfo &medium_info);
|
||||
int init_data_version();
|
||||
int gene_parallel_info(
|
||||
ObIAllocator &allocator,
|
||||
common::ObArrayArray<ObStoreRange> ¶l_range);
|
||||
@ -197,6 +198,7 @@ public:
|
||||
bool is_valid() const;
|
||||
bool from_cur_cluster() const { return cluster_id_ == GCONF.cluster_id && tenant_id_ == MTL_ID(); }
|
||||
bool cluster_id_equal() const { return cluster_id_ == GCONF.cluster_id; } // for compat
|
||||
bool should_throw_for_standby_cluster() const;
|
||||
// serialize & deserialize
|
||||
int serialize(char *buf, const int64_t buf_len, int64_t &pos) const;
|
||||
int deserialize(
|
||||
@ -210,8 +212,8 @@ public:
|
||||
int64_t to_string(char* buf, const int64_t buf_len) const;
|
||||
public:
|
||||
static const int64_t MEDIUM_COMPAT_VERSION = 1;
|
||||
static const int64_t MEDIUM_COMPAT_VERSION_V2 = 2;
|
||||
|
||||
static const int64_t MEDIUM_COMPAT_VERSION_V2 = 2; // for add last_medium_snapshot_
|
||||
static const int64_t MEDIUM_COMPAT_VERSION_V3 = 3; // for stanby tenant, not throw medium info
|
||||
private:
|
||||
static const int32_t SCS_ONE_BIT = 1;
|
||||
static const int32_t SCS_RESERVED_BITS = 32;
|
||||
|
||||
@ -235,8 +235,7 @@ int ObTabletMediumCompactionInfoRecorder::inner_replay_clog(
|
||||
ObMediumCompactionInfo replay_medium_info;
|
||||
if (OB_FAIL(replay_medium_info.deserialize(tmp_allocator, buf, size, pos))) {
|
||||
LOG_WARN("failed to deserialize medium compaction info", K(ret));
|
||||
} else if (!replay_medium_info.cluster_id_equal()
|
||||
&& replay_medium_info.is_medium_compaction()) {
|
||||
} else if (replay_medium_info.should_throw_for_standby_cluster()) {
|
||||
// throw medium compaction clog from other cluster
|
||||
ret = OB_NO_NEED_UPDATE;
|
||||
} else { // new mds path
|
||||
@ -661,5 +660,20 @@ void ObMediumCompactionInfoList::gene_info(
|
||||
}
|
||||
}
|
||||
|
||||
int ObMediumCompactionInfoList::get_max_sync_medium_scn(int64_t &max_sync_medium_scn) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
max_sync_medium_scn = 0;
|
||||
if (OB_UNLIKELY(!is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("medium list is invalid", KR(ret), KPC(this));
|
||||
} else if (FALSE_IT(max_sync_medium_scn = get_last_compaction_scn())) {
|
||||
} else if (!medium_info_list_.is_empty()) {
|
||||
max_sync_medium_scn = MAX(max_sync_medium_scn,
|
||||
((ObMediumCompactionInfo *)medium_info_list_.get_last())->medium_snapshot_);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} //namespace compaction
|
||||
} // namespace oceanbase
|
||||
|
||||
@ -131,6 +131,7 @@ public:
|
||||
{
|
||||
return extra_info_;
|
||||
}
|
||||
int get_max_sync_medium_scn(int64_t &max_received_medium_scn) const;
|
||||
|
||||
// serialize & deserialize
|
||||
int serialize(char *buf, const int64_t buf_len, int64_t &pos) const;
|
||||
|
||||
@ -30,6 +30,7 @@
|
||||
#include "observer/omt/ob_tenant_config_mgr.h"
|
||||
#include "share/scn.h"
|
||||
#include "storage/compaction/ob_tenant_tablet_scheduler.h"
|
||||
#include "storage/compaction/ob_medium_compaction_func.h"
|
||||
|
||||
using namespace oceanbase;
|
||||
using namespace common;
|
||||
@ -430,7 +431,12 @@ int ObPartitionMergePolicy::get_boundary_snapshot_version(
|
||||
}
|
||||
|
||||
int64_t max_medium_scn = 0;
|
||||
if (OB_FAIL(tablet.get_max_sync_medium_scn(max_medium_scn))) {
|
||||
ObArenaAllocator allocator("GetMediumList", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
|
||||
const compaction::ObMediumCompactionInfoList *medium_list = nullptr;
|
||||
if (OB_FAIL(tablet.read_medium_info_list(allocator, medium_list))) {
|
||||
LOG_WARN("failed to read medium info list", K(ret), KPC(medium_list));
|
||||
} else if (OB_FAIL(compaction::ObMediumCompactionScheduleFunc::get_max_sync_medium_scn(
|
||||
tablet, *medium_list, max_medium_scn))) {
|
||||
LOG_WARN("failed to get max medium snapshot", K(ret), K(tablet));
|
||||
} else {
|
||||
min_snapshot = max(min_snapshot, max_medium_scn);
|
||||
@ -941,9 +947,8 @@ int ObPartitionMergePolicy::check_need_medium_merge(
|
||||
|
||||
if (OB_FAIL(tablet.fetch_table_store(table_store_wrapper))) {
|
||||
LOG_WARN("fail to fetch table store", K(ret));
|
||||
} else if (FALSE_IT(last_major =
|
||||
} else if (OB_ISNULL(last_major =
|
||||
table_store_wrapper.get_member()->get_major_sstables().get_boundary_table(true/*last*/))) {
|
||||
} else if (nullptr == last_major) {
|
||||
// no major, no medium
|
||||
} else {
|
||||
need_merge = last_major->get_snapshot_version() < medium_snapshot;
|
||||
@ -986,8 +991,7 @@ int ObPartitionMergePolicy::check_need_medium_merge(
|
||||
tablet_id,
|
||||
ObSuspectInfoType::SUSPECT_CANT_MAJOR_MERGE,
|
||||
medium_snapshot, static_cast<int64_t>(is_tablet_data_status_complete),
|
||||
static_cast<int64_t>(need_force_freeze),
|
||||
tablet.get_tablet_meta().max_serialized_medium_scn_))) {
|
||||
static_cast<int64_t>(need_force_freeze)))) {
|
||||
LOG_WARN("failed to add suspect info", K(tmp_ret));
|
||||
}
|
||||
}
|
||||
@ -1269,7 +1273,8 @@ const char * ObAdaptiveMergeReasonStr[] = {
|
||||
"LOAD_DATA_SCENE",
|
||||
"TOMBSTONE_SCENE",
|
||||
"INEFFICIENT_QUERY",
|
||||
"FREQUENT_WRITE"
|
||||
"FREQUENT_WRITE",
|
||||
"TENANT_MAJOR"
|
||||
};
|
||||
|
||||
const char* ObAdaptiveMergePolicy::merge_reason_to_str(const int64_t merge_reason)
|
||||
|
||||
@ -218,6 +218,7 @@ public:
|
||||
TOMBSTONE_SCENE = 2,
|
||||
INEFFICIENT_QUERY = 3,
|
||||
FREQUENT_WRITE = 4,
|
||||
TENANT_MAJOR = 5,
|
||||
INVALID_REASON
|
||||
};
|
||||
|
||||
|
||||
@ -1183,7 +1183,11 @@ int ObTabletMergeCtx::try_swap_tablet_handle()
|
||||
const ObTabletMapKey key(param_.ls_id_, param_.tablet_id_);
|
||||
if (OB_FAIL(MTL(ObTenantMetaMemMgr*)->get_tablet_with_allocator(
|
||||
WashTabletPriority::WTP_LOW, key, allocator_, tablet_handle_, true/*force_alloc_new*/))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
ret = OB_TABLET_NOT_EXIST;
|
||||
} else {
|
||||
LOG_WARN("failed to get alloc tablet handle", K(ret), K(key));
|
||||
}
|
||||
} else if (OB_FAIL(inner_init_for_medium())) {
|
||||
LOG_WARN("failed to init for medium", K(ret), K(param_));
|
||||
} else if (OB_FAIL(tablet_handle_.get_obj()->clear_memtables_on_table_store())) {
|
||||
|
||||
@ -668,7 +668,8 @@ const char *ObProhibitScheduleMediumMap::ProhibitFlagStr[] = {
|
||||
"MEDIUM",
|
||||
};
|
||||
ObProhibitScheduleMediumMap::ObProhibitScheduleMediumMap()
|
||||
: lock_(),
|
||||
: transfer_flag_cnt_(0),
|
||||
lock_(),
|
||||
ls_id_map_()
|
||||
{
|
||||
STATIC_ASSERT(static_cast<int64_t>(FLAG_MAX) == ARRAYSIZEOF(ProhibitFlagStr), "flag str len is mismatch");
|
||||
@ -739,6 +740,7 @@ int ObProhibitScheduleMediumMap::clear_flag(const ObLSID &ls_id, const ProhibitF
|
||||
|
||||
void ObProhibitScheduleMediumMap::destroy()
|
||||
{
|
||||
transfer_flag_cnt_ = 0;
|
||||
ls_id_map_.destroy();
|
||||
}
|
||||
|
||||
@ -922,10 +924,11 @@ int ObTenantTabletScheduler::schedule_tablet_meta_major_merge(
|
||||
if (OB_FAIL(tablet_handle.get_obj()->fetch_table_store(table_store_wrapper))) {
|
||||
LOG_WARN("fail to fetch table store", K(ret));
|
||||
} else if (FALSE_IT(last_major = table_store_wrapper.get_member()->get_major_sstables().get_boundary_table(true/*last*/))) {
|
||||
} else if (OB_FAIL(tablet_handle.get_obj()->get_max_sync_medium_scn(max_sync_medium_scn))) {
|
||||
LOG_WARN("failed to get max sync medium snapshot", K(ret), K(ls_id), K(tablet_id));
|
||||
} else if (OB_FAIL(tablet_handle.get_obj()->read_medium_info_list(allocator, medium_list))) {
|
||||
LOG_WARN("failed to read medium info list", K(ret), K(tablet_id));
|
||||
} else if (OB_FAIL(ObMediumCompactionScheduleFunc::get_max_sync_medium_scn(
|
||||
*tablet_handle.get_obj(), *medium_list, max_sync_medium_scn))) {
|
||||
LOG_WARN("failed to get max sync medium snapshot", K(ret), K(ls_id), K(tablet_id));
|
||||
} else if ((nullptr != medium_list && medium_list->size() > 0)
|
||||
|| nullptr == last_major
|
||||
|| max_sync_medium_scn > last_major->get_snapshot_version()) {
|
||||
|
||||
@ -1077,7 +1077,7 @@ int ObStorageHATabletsBuilder::hold_local_reuse_sstable_(
|
||||
} else if (OB_FAIL(hold_local_complete_tablet_sstable_(tablet, tables_handle))) {
|
||||
LOG_WARN("failed to hold local complete tablet sstable", K(ret), KP(tablet));
|
||||
} else {
|
||||
if (!storage_schema.is_valid()
|
||||
if (!storage_schema.is_inited()
|
||||
|| storage_schema.compare_schema_newer(*tablet_storage_schema)) {
|
||||
storage_schema.reset();
|
||||
if (OB_FAIL(storage_schema.init(allocator, *tablet_storage_schema))) {
|
||||
|
||||
Reference in New Issue
Block a user