fix medium defense & fix not schedule in standby cluster
This commit is contained in:
parent
976456c113
commit
6510ee3da8
@ -711,6 +711,7 @@ int ObMediumCompactionScheduleFunc::prepare_medium_info(
|
||||
ObTablet *tablet = nullptr;
|
||||
ObTableStoreIterator table_iter;
|
||||
medium_info.cluster_id_ = GCONF.cluster_id;
|
||||
medium_info.tenant_id_ = MTL_ID();
|
||||
// get table schema
|
||||
if (OB_UNLIKELY(!tablet_handle_.is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
|
@ -353,6 +353,7 @@ ObMediumCompactionInfo::ObMediumCompactionInfo()
|
||||
contain_parallel_range_(false),
|
||||
medium_merge_reason_(ObAdaptiveMergePolicy::NONE),
|
||||
is_schema_changed_(false),
|
||||
tenant_id_(0),
|
||||
reserved_(0),
|
||||
cluster_id_(0),
|
||||
data_version_(0),
|
||||
@ -416,6 +417,7 @@ void ObMediumCompactionInfo::reset()
|
||||
contain_parallel_range_ = false;
|
||||
medium_merge_reason_ = ObAdaptiveMergePolicy::NONE;
|
||||
is_schema_changed_ = false;
|
||||
tenant_id_ = 0;
|
||||
cluster_id_ = 0;
|
||||
medium_snapshot_ = 0;
|
||||
last_medium_snapshot_ = 0;
|
||||
@ -545,7 +547,7 @@ int64_t ObMediumCompactionInfo::to_string(char* buf, const int64_t buf_len) cons
|
||||
J_KV(K_(cluster_id), K_(medium_compat_version), K_(data_version),
|
||||
"compaction_type", ObMediumCompactionInfo::get_compaction_type_str((ObCompactionType)compaction_type_),
|
||||
"medium_merge_reason", ObAdaptiveMergePolicy::merge_reason_to_str(medium_merge_reason_),
|
||||
K_(is_schema_changed), K_(cluster_id), K_(medium_snapshot), K_(last_medium_snapshot), K_(storage_schema),
|
||||
K_(is_schema_changed), K_(tenant_id), K_(cluster_id), K_(medium_snapshot), K_(last_medium_snapshot), K_(storage_schema),
|
||||
K_(contain_parallel_range), K_(parallel_merge_info));
|
||||
J_OBJ_END();
|
||||
}
|
||||
|
@ -195,7 +195,8 @@ public:
|
||||
}
|
||||
void reset();
|
||||
bool is_valid() const;
|
||||
bool from_cur_cluster() const { return cluster_id_ == GCONF.cluster_id; }
|
||||
bool from_cur_cluster() const { return cluster_id_ == GCONF.cluster_id && tenant_id_ == MTL_ID(); }
|
||||
bool cluster_id_equal() const { return cluster_id_ == GCONF.cluster_id; } // for compat
|
||||
// serialize & deserialize
|
||||
int serialize(char *buf, const int64_t buf_len, int64_t &pos) const;
|
||||
int deserialize(
|
||||
@ -213,7 +214,7 @@ public:
|
||||
|
||||
private:
|
||||
static const int32_t SCS_ONE_BIT = 1;
|
||||
static const int32_t SCS_RESERVED_BITS = 48;
|
||||
static const int32_t SCS_RESERVED_BITS = 32;
|
||||
|
||||
public:
|
||||
union {
|
||||
@ -224,6 +225,7 @@ public:
|
||||
uint64_t contain_parallel_range_ : SCS_ONE_BIT;
|
||||
uint64_t medium_merge_reason_ : 8;
|
||||
uint64_t is_schema_changed_ : SCS_ONE_BIT;
|
||||
uint64_t tenant_id_ : 16; // record tenant_id of ls primary_leader
|
||||
uint64_t reserved_ : SCS_RESERVED_BITS;
|
||||
};
|
||||
};
|
||||
|
@ -232,9 +232,10 @@ int ObTabletMediumCompactionInfoRecorder::inner_replay_clog(
|
||||
ObTabletHandle tmp_tablet_handle;
|
||||
if (OB_FAIL(replay_medium_info.deserialize(tmp_allocator, buf, size, pos))) {
|
||||
LOG_WARN("failed to deserialize medium compaction info", K(ret));
|
||||
} else if (!replay_medium_info.from_cur_cluster()
|
||||
} else if (!replay_medium_info.cluster_id_equal()
|
||||
&& replay_medium_info.is_medium_compaction()) {
|
||||
// throw medium compaction clog from other cluster
|
||||
ret = OB_NO_NEED_UPDATE;
|
||||
} else { // new mds path
|
||||
ObTabletMediumClogReplayExecutor replay_executor(replay_medium_info);
|
||||
if (OB_FAIL(replay_executor.init(scn))) {
|
||||
@ -246,7 +247,7 @@ int ObTabletMediumCompactionInfoRecorder::inner_replay_clog(
|
||||
LOG_WARN("failed to replay medium info", K(ret), K(replay_medium_info));
|
||||
}
|
||||
} else {
|
||||
FLOG_INFO("success to save medium info", K(ret), K_(tablet_id), K_(ls_id), K(replay_medium_info), K(max_saved_version_));
|
||||
FLOG_INFO("success to save medium info", K(ret), K_(tablet_id), K_(ls_id), K(scn), K(replay_medium_info), K(max_saved_version_));
|
||||
}
|
||||
}
|
||||
|
||||
@ -671,5 +672,58 @@ void ObMediumCompactionInfoList::gene_info(
|
||||
}
|
||||
}
|
||||
|
||||
int ObMediumCompactionInfoList::check_medium_info_and_last_major(
|
||||
const ObMediumCompactionInfo &medium_info,
|
||||
const ObITable *last_major_sstable,
|
||||
const bool force_check)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (nullptr != last_major_sstable
|
||||
&& ObMediumCompactionInfo::MEIDUM_COMPAT_VERSION_V2 == medium_info.medium_compat_version_
|
||||
&& medium_info.medium_snapshot_ > last_major_sstable->get_snapshot_version()) {
|
||||
if (medium_info.from_cur_cluster()) {
|
||||
if (OB_UNLIKELY(medium_info.last_medium_snapshot_ != last_major_sstable->get_snapshot_version())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("last medium snapshot in medium info is not equal to last "
|
||||
"major sstable, medium info may lost",
|
||||
KR(ret), K(medium_info), K(last_major_sstable));
|
||||
}
|
||||
} else { // check next freeze info in inner_table & medium_info
|
||||
const int64_t last_major_sstable_snapshot = last_major_sstable->get_snapshot_version();
|
||||
ObTenantFreezeInfoMgr::FreezeInfo freeze_info;
|
||||
if (OB_FAIL(MTL_CALL_FREEZE_INFO_MGR(
|
||||
get_freeze_info_behind_snapshot_version,
|
||||
last_major_sstable_snapshot, freeze_info))) {
|
||||
if (OB_ENTRY_NOT_EXIST != ret) {
|
||||
LOG_WARN("failed to get freeze info", K(ret), K(last_major_sstable_snapshot));
|
||||
} else if (force_check) {
|
||||
ret = OB_EAGAIN;
|
||||
LOG_WARN("next freeze info is not exist yet, need to check after refresh freeze info",
|
||||
KR(ret), K(medium_info), KPC(last_major_sstable));
|
||||
} // if force_check = false, not return errno; check next time
|
||||
} else if (OB_UNLIKELY(freeze_info.freeze_version < medium_info.medium_snapshot_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("next freeze info is not equal to last major sstable, medium info may lost",
|
||||
KR(ret), "freeze_version", freeze_info.freeze_version, K(medium_info), KPC(last_major_sstable));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
const ObMediumCompactionInfo * ObMediumCompactionInfoList::get_next_schedule_medium_info(const int64_t last_major_snapshot) const
|
||||
{
|
||||
const ObMediumCompactionInfo *ret_val = nullptr;
|
||||
const ObMediumCompactionInfo *info = medium_info_list_.get_first();
|
||||
while (nullptr != info && info != medium_info_list_.get_header()) {
|
||||
// get next schedule medium info
|
||||
if (info->medium_snapshot_ > last_major_snapshot) {
|
||||
ret_val = info;
|
||||
}
|
||||
info = info->get_next();
|
||||
}
|
||||
return ret_val;
|
||||
}
|
||||
|
||||
} //namespace compaction
|
||||
} // namespace oceanbase
|
||||
|
@ -47,6 +47,7 @@ public:
|
||||
int submit_medium_compaction_info(ObMediumCompactionInfo &medium_info, ObIAllocator &allocator);
|
||||
// follower
|
||||
int replay_medium_compaction_log(const share::SCN &scn, const char *buf, const int64_t size, int64_t &pos);
|
||||
INHERIT_TO_STRING_KV("ObIStorageClogRecorder", ObIStorageClogRecorder, K_(ignore_medium), K_(ls_id), K_(tablet_id));
|
||||
private:
|
||||
virtual int inner_replay_clog(
|
||||
const int64_t update_version,
|
||||
@ -118,6 +119,7 @@ public:
|
||||
{
|
||||
return !wait_check_flag_ && medium_info_list_.is_empty();
|
||||
}
|
||||
const ObMediumCompactionInfo *get_next_schedule_medium_info(const int64_t last_major_snapshot) const;
|
||||
OB_INLINE ObMediumCompactionInfo::ObCompactionType get_last_compaction_type() const
|
||||
{
|
||||
return (ObMediumCompactionInfo::ObCompactionType)last_compaction_type_;
|
||||
@ -141,7 +143,10 @@ public:
|
||||
int64_t get_serialize_size() const;
|
||||
|
||||
void gene_info(char* buf, const int64_t buf_len, int64_t &pos) const;
|
||||
|
||||
static int check_medium_info_and_last_major(
|
||||
const ObMediumCompactionInfo &medium_info,
|
||||
const ObITable *last_major_sstable,
|
||||
const bool force_check);
|
||||
TO_STRING_KV(K_(is_inited), K_(info), K_(last_compaction_type), K_(wait_check_flag), K_(last_medium_scn),
|
||||
"list_size", size(), K_(medium_info_list));
|
||||
|
||||
|
@ -738,7 +738,8 @@ int ObTabletMergeCtx::init_get_medium_compaction_info(
|
||||
LOG_ERROR("multi version data is discarded, should not compaction now", K(ret), K(param_), K(medium_snapshot));
|
||||
}
|
||||
}
|
||||
if (FAILEDx(check_medium_info_and_last_major(medium_info, get_merge_table_result))) {
|
||||
if (FAILEDx(ObMediumCompactionInfoList::check_medium_info_and_last_major(
|
||||
medium_info, get_merge_table_result.handle_.get_table(0), true/*force_check*/))) {
|
||||
LOG_WARN("failed to check medium info and last major sstable", KR(ret), K(medium_info), K(get_merge_table_result));
|
||||
} else {
|
||||
schema_ctx_.schema_version_ = medium_info.storage_schema_.schema_version_;
|
||||
@ -748,45 +749,6 @@ int ObTabletMergeCtx::init_get_medium_compaction_info(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletMergeCtx::check_medium_info_and_last_major(
|
||||
const ObMediumCompactionInfo &medium_info,
|
||||
const ObGetMergeTablesResult &get_merge_table_result) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (ObMediumCompactionInfo::MEIDUM_COMPAT_VERSION_V2 == medium_info.medium_compat_version_) {
|
||||
if (medium_info.from_cur_cluster()) {
|
||||
if (OB_UNLIKELY(medium_info.last_medium_snapshot_
|
||||
!= get_merge_table_result.handle_.get_table(0)->get_snapshot_version())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("last medium snapshot in medium info is not equal to last "
|
||||
"major sstable, medium info may lost",
|
||||
KR(ret), K(medium_info), K(get_merge_table_result));
|
||||
}
|
||||
} else { // check next freeze info in inner_table & medium_info
|
||||
const int64_t last_major_sstable_snapshot =
|
||||
get_merge_table_result.handle_.get_table(0)->get_snapshot_version();
|
||||
ObTenantFreezeInfoMgr::FreezeInfo freeze_info;
|
||||
if (OB_FAIL(MTL_CALL_FREEZE_INFO_MGR(
|
||||
get_freeze_info_behind_snapshot_version,
|
||||
last_major_sstable_snapshot, freeze_info))) {
|
||||
if (OB_ENTRY_NOT_EXIST != ret) {
|
||||
LOG_WARN("failed to get freeze info", K(ret),
|
||||
K(last_major_sstable_snapshot), KPC(this));
|
||||
} else {
|
||||
ret = OB_EAGAIN;
|
||||
LOG_WARN("next freeze info is not exist yet, need to check after refresh freeze info",
|
||||
KR(ret), K(medium_info), K(get_merge_table_result));
|
||||
}
|
||||
} else if (OB_UNLIKELY(freeze_info.freeze_version != medium_info.medium_snapshot_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("next freeze info is not equal to last major sstable, medium info may lost",
|
||||
KR(ret), "freeze_version", freeze_info.freeze_version, K(medium_info), K(get_merge_table_result));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletMergeCtx::inner_init_for_mini(bool &skip_rest_operation)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -194,11 +194,6 @@ struct ObTabletMergeCtx
|
||||
scn_range_.end_scn_.get_val_for_tx() : sstable_version_range_.snapshot_version_;
|
||||
}
|
||||
int get_merge_tables(ObGetMergeTablesResult &get_merge_table_result);
|
||||
int check_medium_info_and_last_major(
|
||||
const ObMediumCompactionInfo &medium_info,
|
||||
const ObGetMergeTablesResult &get_merge_table_result) const;
|
||||
|
||||
typedef common::ObSEArray<ObGetMergeTablesResult, ObPartitionMergePolicy::OB_MINOR_PARALLEL_INFO_ARRAY_SIZE> MinorParallelResultArray;
|
||||
static const int64_t LARGE_VOLUME_DATA_ROW_COUNT_THREASHOLD = 1000L * 1000L; // 100w
|
||||
static const int64_t LARGE_VOLUME_DATA_MACRO_COUNT_THREASHOLD = 300L;
|
||||
// 1. init in dag
|
||||
|
@ -260,7 +260,7 @@ int ObLSReservedSnapshotMgr::inner_replay_clog(
|
||||
UNUSEDx(scn, buf, size, pos);
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(inner_update_reserved_snapshot(update_version))) {
|
||||
LOG_WARN("failed to update reserved_snapshot", K(ret), K(update_version));
|
||||
LOG_WARN("failed to update reserved_snapshot", K(ret), K(scn), K(update_version));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -217,9 +217,13 @@ int ObIStorageClogRecorder::replay_clog(
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (update_version <= ATOMIC_LOAD(&max_saved_version_)) {
|
||||
LOG_INFO("skip clog with smaller version", K(update_version), K(max_saved_version_));
|
||||
LOG_INFO("skip clog with smaller version", K(update_version), K(max_saved_version_), KPC(this));
|
||||
} else if (OB_FAIL(inner_replay_clog(update_version, scn, buf, size, pos))) {
|
||||
LOG_WARN("fail to replay clog", K(ret), KPC(this));
|
||||
if (OB_NO_NEED_UPDATE == ret) { // not update max_saved_version_
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("fail to replay clog", K(ret), KPC(this));
|
||||
}
|
||||
} else {
|
||||
ATOMIC_STORE(&max_saved_version_, update_version);
|
||||
LOG_DEBUG("success to replay clog", K(ret), KPC(this), K(max_saved_version_));
|
||||
|
@ -74,7 +74,7 @@ public:
|
||||
ObIStorageClogRecorder(const ObIStorageClogRecorder&) = delete;
|
||||
ObIStorageClogRecorder& operator=(const ObIStorageClogRecorder&) = delete;
|
||||
|
||||
TO_STRING_KV(K(max_saved_version_), K(clog_scn_), KP(log_handler_));
|
||||
VIRTUAL_TO_STRING_KV(K(max_saved_version_), K(clog_scn_), KP(log_handler_));
|
||||
protected:
|
||||
// follower, check update version
|
||||
int replay_clog(
|
||||
|
@ -65,7 +65,7 @@ public:
|
||||
|
||||
ObStorageSchemaRecorder(const ObStorageSchemaRecorder&) = delete;
|
||||
ObStorageSchemaRecorder& operator=(const ObStorageSchemaRecorder&) = delete;
|
||||
TO_STRING_KV(K_(is_inited), K_(ls_id), K_(tablet_id));
|
||||
INHERIT_TO_STRING_KV("ObIStorageClogRecorder", ObIStorageClogRecorder, K_(ls_id), K_(tablet_id));
|
||||
|
||||
private:
|
||||
virtual int inner_replay_clog(
|
||||
|
@ -3073,7 +3073,7 @@ int ObTablet::replay_medium_compaction_clog(
|
||||
} else if (OB_UNLIKELY(buf_size <= pos || pos < 0 || buf_size <= 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(buf_size), K(pos));
|
||||
} else if (tablet_meta_.tablet_id_.is_special_merge_tablet()) {
|
||||
} else if (tablet_meta_.tablet_id_.is_ls_inner_tablet()) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(get_memtable_mgr(memtable_mgr))) {
|
||||
LOG_WARN("failed to get memtable mgr", K(ret));
|
||||
@ -4459,7 +4459,18 @@ int ObTablet::check_medium_list() const
|
||||
LOG_WARN("medium list is invalid for last major sstable", K(ret), K(medium_info_list), KPC(last_major));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && !medium_info_list.is_empty()) {
|
||||
const ObMediumCompactionInfo *next_schedule_info = medium_info_list.get_next_schedule_medium_info(last_major->get_snapshot_version());
|
||||
if (nullptr != next_schedule_info
|
||||
&& OB_FAIL(ObMediumCompactionInfoList::check_medium_info_and_last_major(
|
||||
*next_schedule_info, last_major, false/*force_check*/))) {
|
||||
LOG_WARN("failed to check medium info and last major", KR(ret), K(medium_info_list), KPC(last_major));
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOG_INFO("skip check medium list for non empty ha_status", KR(ret),
|
||||
"tablet_id", tablet_meta_.tablet_id_, K(tablet_meta_.ha_status_));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -560,8 +560,21 @@ int ObAdminParserLogEntry::parse_reserved_snapshot_log_()
|
||||
|
||||
int ObAdminParserLogEntry::parse_medium_log_()
|
||||
{
|
||||
//not supported so far, just reserved
|
||||
int ret = OB_NOT_SUPPORTED;
|
||||
int ret = OB_SUCCESS;
|
||||
ObArenaAllocator allocator;
|
||||
ObTabletID tablet_id;
|
||||
int64_t medium_snapshot = 0;
|
||||
compaction::ObMediumCompactionInfo medium_info;
|
||||
ObStorageSchema storage_schema;
|
||||
if (OB_FAIL(tablet_id.deserialize(buf_, buf_len_, pos_))) {
|
||||
LOG_WARN("fail to deserialize tablet id", K(ret));
|
||||
} else if (OB_FAIL(serialization::decode_i64(buf_, buf_len_, pos_, &medium_snapshot))) {
|
||||
LOG_WARN("fail to deserialize medium_snapshot", K(ret));
|
||||
} else if (OB_FAIL(medium_info.deserialize(allocator, buf_, buf_len_, pos_))) {
|
||||
LOG_WARN("fail to deserialize medium info", K(ret));
|
||||
} else {
|
||||
fprintf(stdout, " ###<MediumCompactionLog>: tablet_id:%ld, medium_info: %s\n", tablet_id.id(), to_cstring(medium_info));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user