fix not schedule after migrate
This commit is contained in:
@ -191,7 +191,7 @@ int ObAllVirtualTabletCompactionInfo::process_curr_tenant(common::ObNewRow *&row
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case SERIALIZE_SCN_LIST:
|
case SERIALIZE_SCN_LIST:
|
||||||
if (medium_info_list.size() > 0) {
|
if (medium_info_list.size() > 0 || compaction::ObMediumCompactionInfo::MAJOR_COMPACTION == medium_info_list.get_last_compaction_type()) {
|
||||||
int64_t pos = 0;
|
int64_t pos = 0;
|
||||||
medium_info_list.gene_info(medium_info_buf_, OB_MAX_VARCHAR_LENGTH, pos);
|
medium_info_list.gene_info(medium_info_buf_, OB_MAX_VARCHAR_LENGTH, pos);
|
||||||
cur_row_.cells_[i].set_varchar(medium_info_buf_);
|
cur_row_.cells_[i].set_varchar(medium_info_buf_);
|
||||||
|
|||||||
@ -331,8 +331,9 @@ ObMediumCompactionInfoList::ObMediumCompactionInfoList()
|
|||||||
allocator_(nullptr),
|
allocator_(nullptr),
|
||||||
compat_(MEDIUM_LIST_VERSION),
|
compat_(MEDIUM_LIST_VERSION),
|
||||||
last_compaction_type_(0),
|
last_compaction_type_(0),
|
||||||
|
wait_check_flag_(0),
|
||||||
reserved_(0),
|
reserved_(0),
|
||||||
wait_check_medium_scn_(0)
|
last_medium_scn_(0)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -355,13 +356,14 @@ int ObMediumCompactionInfoList::init(common::ObIAllocator &allocator)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
// MINI: dump_list is from memtable
|
// MINI: other_list is from memtable
|
||||||
|
// MIGRATE: other_list is from migrate_src
|
||||||
// finish_medium_scn = last_major_scn
|
// finish_medium_scn = last_major_scn
|
||||||
// init_by_ha = true: need force set wait_check = finish_scn
|
// init_by_ha = true: need force set wait_check = finish_scn
|
||||||
// if wait_check=0 after restore, report_scn don't will be updated by leader
|
// if wait_check=0 after restore, report_scn don't will be updated by leader
|
||||||
int ObMediumCompactionInfoList::init(common::ObIAllocator &allocator,
|
int ObMediumCompactionInfoList::init(common::ObIAllocator &allocator,
|
||||||
const ObMediumCompactionInfoList *old_list,
|
const ObMediumCompactionInfoList *old_list,
|
||||||
const ObMediumCompactionInfoList *dump_list,
|
const ObMediumCompactionInfoList *other_list,
|
||||||
const int64_t finish_medium_scn/*= 0*/,
|
const int64_t finish_medium_scn/*= 0*/,
|
||||||
const ObMergeType merge_type/*= MERGE_TYPE_MAX*/)
|
const ObMergeType merge_type/*= MERGE_TYPE_MAX*/)
|
||||||
{
|
{
|
||||||
@ -369,22 +371,33 @@ int ObMediumCompactionInfoList::init(common::ObIAllocator &allocator,
|
|||||||
if (IS_INIT) {
|
if (IS_INIT) {
|
||||||
ret = OB_INIT_TWICE;
|
ret = OB_INIT_TWICE;
|
||||||
LOG_WARN("init twice", K(ret));
|
LOG_WARN("init twice", K(ret));
|
||||||
|
} else if (OB_UNLIKELY(nullptr == old_list && nullptr != other_list)) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("invalid argument", K(ret), K(merge_type), KPC(old_list), KPC(other_list));
|
||||||
} else if (FALSE_IT(allocator_ = &allocator)) {
|
} else if (FALSE_IT(allocator_ = &allocator)) {
|
||||||
} else if (nullptr != old_list && OB_FAIL(append_list_with_deep_copy(finish_medium_scn, *old_list))) {
|
} else if (nullptr != old_list && OB_FAIL(append_list_with_deep_copy(finish_medium_scn, *old_list))) {
|
||||||
LOG_WARN("failed to deep copy list", K(ret), K(old_list));
|
LOG_WARN("failed to deep copy list", K(ret), K(old_list));
|
||||||
} else if (nullptr != dump_list && OB_FAIL(append_list_with_deep_copy(finish_medium_scn, *dump_list))) {
|
} else if (nullptr != other_list && OB_FAIL(append_list_with_deep_copy(finish_medium_scn, *other_list))) {
|
||||||
LOG_WARN("failed to deep copy list", K(ret), K(dump_list));
|
LOG_WARN("failed to deep copy list", K(ret), K(other_list));
|
||||||
} else if (is_major_merge_type(merge_type)) { // update list after major_type_merge
|
} else if (is_major_merge_type(merge_type)) { // update list after major_type_merge
|
||||||
last_compaction_type_ = is_major_merge(merge_type) ? ObMediumCompactionInfo::MAJOR_COMPACTION : ObMediumCompactionInfo::MEDIUM_COMPACTION;
|
last_compaction_type_ = is_major_merge(merge_type) ? ObMediumCompactionInfo::MAJOR_COMPACTION : ObMediumCompactionInfo::MEDIUM_COMPACTION;
|
||||||
wait_check_medium_scn_ = finish_medium_scn;
|
last_medium_scn_ = finish_medium_scn;
|
||||||
} else if (OB_NOT_NULL(old_list)) { // update list with old_list
|
wait_check_flag_ = true;
|
||||||
last_compaction_type_ = old_list->last_compaction_type_;
|
} else { // update info with newest list
|
||||||
wait_check_medium_scn_ = old_list->get_wait_check_medium_scn();
|
const ObMediumCompactionInfoList *update_list = old_list;
|
||||||
|
if (OB_NOT_NULL(other_list)
|
||||||
|
&& OB_NOT_NULL(old_list)
|
||||||
|
&& other_list->last_medium_scn_ > old_list->last_medium_scn_) { // compare get newer list
|
||||||
|
update_list = other_list;
|
||||||
|
}
|
||||||
|
if (OB_NOT_NULL(update_list)) {
|
||||||
|
set_basic_info(*update_list);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
compat_ = MEDIUM_LIST_VERSION;
|
compat_ = MEDIUM_LIST_VERSION;
|
||||||
is_inited_ = true;
|
is_inited_ = true;
|
||||||
if (medium_info_list_.get_size() > 0 || wait_check_medium_scn_ > 0) {
|
if (medium_info_list_.get_size() > 0 || wait_check_flag_) {
|
||||||
LOG_INFO("success to init list", K(ret), KPC(this), KPC(old_list), K(finish_medium_scn),
|
LOG_INFO("success to init list", K(ret), KPC(this), KPC(old_list), K(finish_medium_scn),
|
||||||
"merge_type", merge_type_to_str(merge_type));
|
"merge_type", merge_type_to_str(merge_type));
|
||||||
}
|
}
|
||||||
@ -407,11 +420,12 @@ int ObMediumCompactionInfoList::init_after_check_finish(
|
|||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", K(ret), K(old_list));
|
LOG_WARN("invalid argument", K(ret), K(old_list));
|
||||||
} else if (FALSE_IT(allocator_ = &allocator)) {
|
} else if (FALSE_IT(allocator_ = &allocator)) {
|
||||||
} else if (OB_FAIL(append_list_with_deep_copy(wait_check_medium_scn_, old_list))) {
|
} else if (OB_FAIL(append_list_with_deep_copy(last_medium_scn_, old_list))) {
|
||||||
LOG_WARN("failed to deep copy list", K(ret), K(wait_check_medium_scn_));
|
LOG_WARN("failed to deep copy list", K(ret), K(last_medium_scn_));
|
||||||
} else {
|
} else {
|
||||||
last_compaction_type_ = old_list.last_compaction_type_;
|
last_compaction_type_ = old_list.last_compaction_type_;
|
||||||
wait_check_medium_scn_ = 0; // update after check finished, should reset wait_check_medium_scn
|
last_medium_scn_ = old_list.last_medium_scn_;
|
||||||
|
wait_check_flag_ = false; // update after check finished, should reset wait_check_medium_scn
|
||||||
compat_ = MEDIUM_LIST_VERSION;
|
compat_ = MEDIUM_LIST_VERSION;
|
||||||
is_inited_ = true;
|
is_inited_ = true;
|
||||||
LOG_INFO("success to init list", K(ret), KPC(this), K(old_list));
|
LOG_INFO("success to init list", K(ret), KPC(this), K(old_list));
|
||||||
@ -438,7 +452,7 @@ void ObMediumCompactionInfoList::reset()
|
|||||||
}
|
}
|
||||||
is_inited_ = false;
|
is_inited_ = false;
|
||||||
info_ = 0;
|
info_ = 0;
|
||||||
wait_check_medium_scn_ = 0;
|
last_medium_scn_ = 0;
|
||||||
allocator_ = nullptr;
|
allocator_ = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -549,7 +563,7 @@ int ObMediumCompactionInfoList::serialize(char *buf, const int64_t buf_len, int6
|
|||||||
LOG_WARN("medium info list is invalid", K(ret), KPC(this));
|
LOG_WARN("medium info list is invalid", K(ret), KPC(this));
|
||||||
} else if (OB_FAIL(serialization::encode_vi64(buf, buf_len, new_pos, info_))) {
|
} else if (OB_FAIL(serialization::encode_vi64(buf, buf_len, new_pos, info_))) {
|
||||||
STORAGE_LOG(WARN, "failed to serialize info", K(ret), K(buf_len), K(pos));
|
STORAGE_LOG(WARN, "failed to serialize info", K(ret), K(buf_len), K(pos));
|
||||||
} else if (OB_FAIL(serialization::encode_vi64(buf, buf_len, new_pos, wait_check_medium_scn_))) {
|
} else if (OB_FAIL(serialization::encode_vi64(buf, buf_len, new_pos, last_medium_scn_))) {
|
||||||
STORAGE_LOG(WARN, "failed to serialize wait_check_medium_scn", K(ret), K(buf_len), K(pos));
|
STORAGE_LOG(WARN, "failed to serialize wait_check_medium_scn", K(ret), K(buf_len), K(pos));
|
||||||
} else if (OB_FAIL(serialization::encode_vi64(buf, buf_len, new_pos, medium_info_list_.get_size()))) {
|
} else if (OB_FAIL(serialization::encode_vi64(buf, buf_len, new_pos, medium_info_list_.get_size()))) {
|
||||||
LOG_WARN("failed to serialize medium status", K(ret), K(buf_len));
|
LOG_WARN("failed to serialize medium status", K(ret), K(buf_len));
|
||||||
@ -594,7 +608,7 @@ int ObMediumCompactionInfoList::deserialize(
|
|||||||
LOG_WARN("list count should be zero in old version medium list", K(ret), K(list_count));
|
LOG_WARN("list count should be zero in old version medium list", K(ret), K(list_count));
|
||||||
}
|
}
|
||||||
} else if (FALSE_IT(info_ = deserialize_info)) {
|
} else if (FALSE_IT(info_ = deserialize_info)) {
|
||||||
} else if (OB_FAIL(serialization::decode_vi64(buf, data_len, new_pos, &wait_check_medium_scn_))) {
|
} else if (OB_FAIL(serialization::decode_vi64(buf, data_len, new_pos, &last_medium_scn_))) {
|
||||||
LOG_WARN("failed to deserialize wait_check_medium_scn", K(ret), K(data_len));
|
LOG_WARN("failed to deserialize wait_check_medium_scn", K(ret), K(data_len));
|
||||||
} else if (OB_FAIL(serialization::decode_vi64(buf, data_len, new_pos, &list_count))) {
|
} else if (OB_FAIL(serialization::decode_vi64(buf, data_len, new_pos, &list_count))) {
|
||||||
LOG_WARN("failed to deserialize list count", K(ret), K(data_len));
|
LOG_WARN("failed to deserialize list count", K(ret), K(data_len));
|
||||||
@ -642,7 +656,7 @@ int64_t ObMediumCompactionInfoList::get_serialize_size() const
|
|||||||
{
|
{
|
||||||
int64_t len = 0;
|
int64_t len = 0;
|
||||||
len += serialization::encoded_length_vi64(info_);
|
len += serialization::encoded_length_vi64(info_);
|
||||||
len += serialization::encoded_length_vi64(wait_check_medium_scn_);
|
len += serialization::encoded_length_vi64(last_medium_scn_);
|
||||||
len += serialization::encoded_length_vi64(medium_info_list_.get_size());
|
len += serialization::encoded_length_vi64(medium_info_list_.get_size());
|
||||||
DLIST_FOREACH_NORET(info, medium_info_list_){
|
DLIST_FOREACH_NORET(info, medium_info_list_){
|
||||||
len += static_cast<const ObMediumCompactionInfo *>(info)->get_serialize_size();
|
len += static_cast<const ObMediumCompactionInfo *>(info)->get_serialize_size();
|
||||||
@ -657,7 +671,7 @@ void ObMediumCompactionInfoList::gene_info(
|
|||||||
// do nothing
|
// do nothing
|
||||||
} else {
|
} else {
|
||||||
J_OBJ_START();
|
J_OBJ_START();
|
||||||
J_KV("size", size(), K_(info), K_(wait_check_medium_scn));
|
J_KV("size", size(), K_(last_compaction_type), K_(wait_check_flag), K_(last_medium_scn));
|
||||||
J_COMMA();
|
J_COMMA();
|
||||||
BUF_PRINTF("info_list");
|
BUF_PRINTF("info_list");
|
||||||
J_COLON();
|
J_COLON();
|
||||||
|
|||||||
@ -113,12 +113,12 @@ public:
|
|||||||
int add_medium_compaction_info(const ObMediumCompactionInfo &input_info);
|
int add_medium_compaction_info(const ObMediumCompactionInfo &input_info);
|
||||||
|
|
||||||
OB_INLINE const MediumInfoList &get_list() const { return medium_info_list_; }
|
OB_INLINE const MediumInfoList &get_list() const { return medium_info_list_; }
|
||||||
OB_INLINE int64_t get_wait_check_medium_scn() const { return wait_check_medium_scn_; }
|
OB_INLINE int64_t get_wait_check_medium_scn() const { return wait_check_flag_ ? last_medium_scn_ : 0; }
|
||||||
OB_INLINE bool need_check_finish() const { return 0 != wait_check_medium_scn_; }
|
OB_INLINE bool need_check_finish() const { return wait_check_flag_; }
|
||||||
// check status on serialized medium list
|
// check status on serialized medium list
|
||||||
OB_INLINE bool could_schedule_next_round() const
|
OB_INLINE bool could_schedule_next_round() const
|
||||||
{
|
{
|
||||||
return 0 == wait_check_medium_scn_ && medium_info_list_.is_empty();
|
return !wait_check_flag_ && medium_info_list_.is_empty();
|
||||||
}
|
}
|
||||||
OB_INLINE ObMediumCompactionInfo::ObCompactionType get_last_compaction_type() const
|
OB_INLINE ObMediumCompactionInfo::ObCompactionType get_last_compaction_type() const
|
||||||
{
|
{
|
||||||
@ -156,7 +156,7 @@ public:
|
|||||||
|
|
||||||
void gene_info(char* buf, const int64_t buf_len, int64_t &pos) const;
|
void gene_info(char* buf, const int64_t buf_len, int64_t &pos) const;
|
||||||
|
|
||||||
TO_STRING_KV(K_(is_inited), K_(info), K_(last_compaction_type), K_(wait_check_medium_scn),
|
TO_STRING_KV(K_(is_inited), K_(info), K_(last_compaction_type), K_(wait_check_flag), K_(last_medium_scn),
|
||||||
"list_size", size(), K_(medium_info_list));
|
"list_size", size(), K_(medium_info_list));
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@ -164,7 +164,7 @@ private:
|
|||||||
OB_INLINE bool inner_is_valid() const
|
OB_INLINE bool inner_is_valid() const
|
||||||
{
|
{
|
||||||
return last_compaction_type_ < ObMediumCompactionInfo::COMPACTION_TYPE_MAX
|
return last_compaction_type_ < ObMediumCompactionInfo::COMPACTION_TYPE_MAX
|
||||||
&& wait_check_medium_scn_ >= 0 && size() >= 0;
|
&& last_medium_scn_ >= 0 && size() >= 0;
|
||||||
}
|
}
|
||||||
OB_INLINE int append_list_with_deep_copy(
|
OB_INLINE int append_list_with_deep_copy(
|
||||||
const int64_t finish_scn,
|
const int64_t finish_scn,
|
||||||
@ -180,11 +180,17 @@ private:
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
int inner_deep_copy_node(const ObMediumCompactionInfo &medium_info);
|
int inner_deep_copy_node(const ObMediumCompactionInfo &medium_info);
|
||||||
|
OB_INLINE void set_basic_info(const ObMediumCompactionInfoList &input_list)
|
||||||
|
{
|
||||||
|
last_compaction_type_ = input_list.last_compaction_type_;
|
||||||
|
last_medium_scn_ = input_list.last_medium_scn_;
|
||||||
|
wait_check_flag_ = input_list.wait_check_flag_;
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
static const int64_t MEDIUM_LIST_VERSION = 1;
|
static const int64_t MEDIUM_LIST_VERSION = 1;
|
||||||
static const int64_t MAX_SERIALIZE_SIZE = 2;
|
static const int64_t MAX_SERIALIZE_SIZE = 2;
|
||||||
static const int32_t MEDIUM_LIST_INFO_RESERVED_BITS = 52;
|
static const int32_t MEDIUM_LIST_INFO_RESERVED_BITS = 51;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool is_inited_;
|
bool is_inited_;
|
||||||
@ -192,14 +198,15 @@ private:
|
|||||||
|
|
||||||
// need serialize
|
// need serialize
|
||||||
union {
|
union {
|
||||||
int64_t info_;
|
uint64_t info_;
|
||||||
struct {
|
struct {
|
||||||
int64_t compat_ : 8;
|
uint64_t compat_ : 8;
|
||||||
int64_t last_compaction_type_ : 4; // check inner_table when last_compaction is major
|
uint64_t last_compaction_type_ : 4; // check inner_table when last_compaction is major
|
||||||
int64_t reserved_ : MEDIUM_LIST_INFO_RESERVED_BITS;
|
uint64_t wait_check_flag_ : 1; // true: need check finish, false: don't need check
|
||||||
|
uint64_t reserved_ : MEDIUM_LIST_INFO_RESERVED_BITS;
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
int64_t wait_check_medium_scn_;
|
int64_t last_medium_scn_;
|
||||||
|
|
||||||
MediumInfoList medium_info_list_;
|
MediumInfoList medium_info_list_;
|
||||||
};
|
};
|
||||||
|
|||||||
@ -1127,7 +1127,6 @@ int ObTenantTabletScheduler::schedule_all_tablets_medium()
|
|||||||
ObTimeUtility::fast_current_time(),
|
ObTimeUtility::fast_current_time(),
|
||||||
"schedule_stats",
|
"schedule_stats",
|
||||||
schedule_stats_);
|
schedule_stats_);
|
||||||
schedule_stats_.clear_tablet_cnt();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1156,6 +1155,7 @@ int ObTenantTabletScheduler::schedule_all_tablets_medium()
|
|||||||
|
|
||||||
LOG_INFO("finish schedule all tablet merge", K(merge_version), K(schedule_stats_), K(tenant_merge_finish),
|
LOG_INFO("finish schedule all tablet merge", K(merge_version), K(schedule_stats_), K(tenant_merge_finish),
|
||||||
K(merged_version_));
|
K(merged_version_));
|
||||||
|
schedule_stats_.clear_tablet_cnt();
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user