use medium in memtable to schedule

This commit is contained in:
obdev
2023-01-28 18:00:20 +08:00
committed by ob-robot
parent 555fb0026b
commit 0a9f32a5d1
13 changed files with 247 additions and 175 deletions

View File

@ -21,14 +21,15 @@ const char * ObMergeTypeStr[] = {
"MAJOR_MERGE",
"MEDIUM_MERGE",
"DDL_KV_MERGE",
"BACKFILL_TX_MERGE"
"BACKFILL_TX_MERGE",
"EMPTY_MERGE_TYPE"
};
const char *merge_type_to_str(const ObMergeType &merge_type)
{
STATIC_ASSERT(static_cast<int64_t>(MERGE_TYPE_MAX) == ARRAYSIZEOF(ObMergeTypeStr), "merge type str len is mismatch");
STATIC_ASSERT(static_cast<int64_t>(MERGE_TYPE_MAX + 1) == ARRAYSIZEOF(ObMergeTypeStr), "merge type str len is mismatch");
const char *str = "";
if (merge_type >= MERGE_TYPE_MAX || merge_type <= INVALID_MERGE_TYPE) {
if (merge_type > MERGE_TYPE_MAX || merge_type <= INVALID_MERGE_TYPE) {
str = "invalid_merge_type";
} else {
str = ObMergeTypeStr[merge_type];

View File

@ -262,7 +262,7 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
if (medium_info.medium_snapshot_ == tablet_.get_snapshot_version() // no uncommitted sstable
&& weak_read_ts.get_val_for_tx() <= max_reserved_snapshot
&& weak_read_ts.get_val_for_tx() + DEFAULT_SCHEDULE_MEDIUM_INTERVAL < ObTimeUtility::current_time_ns()) {
medium_info.medium_snapshot_ = weak_read_ts.get_val_for_tx();
medium_info.medium_snapshot_ = MAX(max_reserved_snapshot, weak_read_ts.get_val_for_tx());
LOG_INFO("use weak_read_ts to schedule medium", K(ret), KPC(this),
K(medium_info), K(max_reserved_snapshot), K(weak_read_ts));
} else {
@ -562,29 +562,14 @@ int ObMediumCompactionScheduleFunc::get_table_schema_to_merge(
LOG_WARN("table is deleted", K(ret), K(table_id));
}
}
if (OB_SUCC(ret)) {
int64_t max_schema_version = 0;
if (OB_FAIL(tablet_.get_max_sync_storage_schema_version(max_schema_version))) {
LOG_WARN("failed to get max sync storage schema version", K(ret), KPC(this));
} else if (max_schema_version < table_schema->get_schema_version()) {
// need sync schema clog
if (OB_FAIL(tablet_.try_update_storage_schema(
table_id,
table_schema->get_schema_version(),
allocator_,
DEFAULT_SYNC_SCHEMA_CLOG_TIMEOUT))) {
LOG_WARN("failed to sync schema clog", K(ret), KPC(this), KPC(table_schema));
}
}
if (FAILEDx(medium_info.storage_schema_.init(
allocator_,
*table_schema,
tablet_.get_tablet_meta().compat_mode_))) {
LOG_WARN("failed to init storage schema", K(ret), K(schema_version));
} else {
FLOG_INFO("get schema to merge", K(table_id), K(schema_version), K(save_schema_version),
K(*reinterpret_cast<const ObPrintableTableSchema*>(table_schema)));
}
if (FAILEDx(medium_info.storage_schema_.init(
allocator_,
*table_schema,
tablet_.get_tablet_meta().compat_mode_))) {
LOG_WARN("failed to init storage schema", K(ret), K(schema_version));
} else {
FLOG_INFO("get schema to merge", K(table_id), K(schema_version), K(save_schema_version),
K(*reinterpret_cast<const ObPrintableTableSchema*>(table_schema)));
}
return ret;
}
@ -749,7 +734,8 @@ int ObMediumCompactionScheduleFunc::check_medium_finish()
int ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge(
ObLS &ls,
ObTablet &tablet,
const int64_t input_major_snapshot)
const int64_t input_major_snapshot,
const bool schedule_with_memtable)
{
int ret = OB_SUCCESS;
#ifdef ERRSIM
@ -763,12 +749,19 @@ int ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge(
const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
const ObLSID &ls_id = ls.get_ls_id();
int64_t major_frozen_snapshot = 0 == input_major_snapshot ? MTL(ObTenantTabletScheduler *)->get_frozen_version() : input_major_snapshot;
const int64_t schedule_scn = tablet.get_medium_compaction_info_list().get_schedule_scn(major_frozen_snapshot);
ObMediumCompactionInfo::ObCompactionType compaction_type = ObMediumCompactionInfo::COMPACTION_TYPE_MAX;
int64_t schedule_scn = 0;
bool need_merge = false;
LOG_DEBUG("schedule_tablet_medium_merge", K(schedule_scn), K(ls_id), K(tablet_id));
if (schedule_scn > 0) {
if (OB_FAIL(check_need_merge_and_schedule(ls, tablet, schedule_scn, need_merge))) {
(void)tablet.get_medium_compaction_info_list().get_schedule_scn(major_frozen_snapshot, schedule_scn, compaction_type);
LOG_DEBUG("schedule_tablet_medium_merge", K(schedule_scn), K(major_frozen_snapshot), K(schedule_with_memtable), K(ls_id), K(tablet_id));
if (0 == schedule_scn
&& schedule_with_memtable
&& OB_FAIL(get_schedule_medium_from_memtable(tablet, major_frozen_snapshot, schedule_scn, compaction_type))) {
LOG_WARN("failed to get schedule medium scn from memtables", K(ret));
} else if (schedule_scn > 0) {
if (OB_FAIL(check_need_merge_and_schedule(ls, tablet, schedule_scn, compaction_type))) {
LOG_WARN("failed to check medium merge", K(ret), K(ls_id), K(tablet_id), K(schedule_scn));
}
}
@ -792,39 +785,45 @@ int ObMediumCompactionScheduleFunc::get_palf_role(const ObLSID &ls_id, ObRole &r
return ret;
}
int ObMediumCompactionScheduleFunc::freeze_memtable_to_get_medium_info()
int ObMediumCompactionScheduleFunc::get_schedule_medium_from_memtable(
ObTablet &tablet,
const int64_t major_frozen_snapshot,
int64_t &schedule_medium_scn,
ObMediumCompactionInfo::ObCompactionType &compaction_type)
{
int ret = OB_SUCCESS;
ObSEArray<ObITable*, MAX_MEMSTORE_CNT> memtables;
if (OB_FAIL(tablet_.get_table_store().get_memtables(memtables, true/*need_active*/))) {
LOG_WARN("failed to get memtables", K(ret), KPC(this));
} else if (memtables.empty()) {
// do nothing
} else {
memtable::ObMemtable *memtable = nullptr;
bool receive_medium_info = false;
bool has_medium_info = false;
for (int i = 0; OB_SUCC(ret) && i < memtables.count(); ++i) {
if (OB_ISNULL(memtable = static_cast<memtable::ObMemtable*>(memtables.at(i)))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("memtable is null", K(ret), K(i), KPC(memtables.at(i)), K(memtable));
} else if (memtable->has_multi_source_data_unit(memtable::MultiSourceDataUnitType::MEDIUM_COMPACTION_INFO)) {
has_medium_info = true;
if (memtable->is_active_memtable()) {
receive_medium_info = true;
break;
schedule_medium_scn = 0;
compaction_type = ObMediumCompactionInfo::COMPACTION_TYPE_MAX;
ObITable *last_major = tablet.get_table_store().get_major_sstables().get_boundary_table(true/*last*/);
if (OB_NOT_NULL(last_major)) {
ObArenaAllocator tmp_allocator;
ObMediumCompactionInfoList tmp_medium_list;
const int64_t last_major_snapshot = last_major->get_snapshot_version();
ObSEArray<ObITable*, MAX_MEMSTORE_CNT> memtables;
if (OB_FAIL(tablet.get_table_store().get_memtables(memtables, true/*need_active*/))) {
LOG_WARN("failed to get memtables", K(ret), "tablet_id", tablet.get_tablet_meta().tablet_id_);
} else if (memtables.empty()) {
// do nothing
} else if (OB_FAIL(get_medium_info_list_from_memtable(tmp_allocator, memtables, tmp_medium_list))) {
LOG_WARN("failed to get medium info list from memtable", K(ret));
} else if (!tmp_medium_list.is_empty()) {
const ObMediumCompactionInfo *info_in_list = nullptr;
DLIST_FOREACH_X(info, tmp_medium_list.get_list(), OB_SUCC(ret)) {
if (OB_UNLIKELY(memtable::MultiSourceDataUnitType::MEDIUM_COMPACTION_INFO != info->type())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("return info is invalid", K(ret), KPC(info));
} else if (FALSE_IT(info_in_list = static_cast<const ObMediumCompactionInfo *>(info))) {
} else if (info_in_list->medium_snapshot_ <= last_major_snapshot) {
// finished, this medium info could recycle
} else {
if (info_in_list->is_medium_compaction() || info_in_list->medium_snapshot_ <= major_frozen_snapshot) {
schedule_medium_scn = info_in_list->medium_snapshot_;
compaction_type = (ObMediumCompactionInfo::ObCompactionType)info_in_list->compaction_type_;
}
break; // found one unfinish medium info, loop end
}
}
} // end of for
if (OB_FAIL(ret)) {
} else if (receive_medium_info) {
if (OB_FAIL(ls_.tablet_freeze(tablet_.get_tablet_meta().tablet_id_, true/*is_sync*/))) {
if (OB_TABLE_NOT_EXIST != ret) {
LOG_WARN("failed to freeze tablet", K(ret), KPC(this));
}
}
} else if (has_medium_info) {
LOG_INFO("received medium info, the memtable is frozen, no need to freeze tablet again", K(ret), K(tablet_));
}
}
return ret;
@ -834,17 +833,20 @@ int ObMediumCompactionScheduleFunc::check_need_merge_and_schedule(
ObLS &ls,
ObTablet &tablet,
const int64_t schedule_scn,
bool &need_merge)
const ObMediumCompactionInfo::ObCompactionType compaction_type)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
need_merge = false;
bool need_merge = false;
bool can_merge = false;
bool need_force_freeze = false;
const ObLSID &ls_id = ls.get_ls_id();
const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
if (OB_FAIL(ObPartitionMergePolicy::check_need_medium_merge(
if (OB_UNLIKELY(0 == schedule_scn || !ObMediumCompactionInfo::is_valid_compaction_type(compaction_type))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(schedule_scn), K(compaction_type));
} else if (OB_FAIL(ObPartitionMergePolicy::check_need_medium_merge(
ls,
tablet,
schedule_scn,
@ -854,14 +856,12 @@ int ObMediumCompactionScheduleFunc::check_need_merge_and_schedule(
LOG_WARN("failed to check medium merge", K(ret), K(ls_id), K(tablet_id));
} else if (need_merge && can_merge) {
const ObMediumCompactionInfo *medium_info = nullptr;
if (OB_FAIL(tablet.get_medium_compaction_info_list().get_specified_scn_info(schedule_scn, medium_info))) {
LOG_WARN("failed to get specified scn info", K(ret), K(schedule_scn));
} else if (OB_TMP_FAIL(ObTenantTabletScheduler::schedule_merge_dag(
if (OB_TMP_FAIL(ObTenantTabletScheduler::schedule_merge_dag(
ls.get_ls_id(),
tablet.get_tablet_meta().tablet_id_,
tablet_id,
MEDIUM_MERGE,
schedule_scn,
medium_info->is_major_compaction()))) {
ObMediumCompactionInfo::is_major_compaction(compaction_type)))) {
if (OB_SIZE_OVERFLOW != tmp_ret && OB_EAGAIN != tmp_ret) {
ret = tmp_ret;
LOG_WARN("failed to schedule medium merge dag", K(ret), K(ls_id), K(tablet_id));
@ -870,8 +870,8 @@ int ObMediumCompactionScheduleFunc::check_need_merge_and_schedule(
LOG_DEBUG("success to schedule medium merge dag", K(ret), K(schedule_scn));
}
} else if (need_force_freeze) {
if (OB_FAIL(MTL(ObTenantFreezer *)->tablet_freeze(tablet_id, true/*force_freeze*/))) {
LOG_WARN("failed to force freeze tablet", K(ret), K(ls_id), K(tablet_id));
if (OB_TMP_FAIL(MTL(ObTenantFreezer *)->tablet_freeze(tablet_id, true/*force_freeze*/, true/*is_sync*/))) {
LOG_WARN("failed to force freeze tablet", K(tmp_ret), K(ls_id), K(tablet_id));
}
}
return ret;
@ -910,5 +910,45 @@ int ObMediumCompactionScheduleFunc::get_latest_storage_schema_from_memtable(
return ret;
}
// Gathers the medium compaction infos stored in the given memtables into medium_list.
//
// make sure only data tablet could schedule this func
//
// @param allocator    used to init medium_list and handed to each memtable when its
//                     multi-source data unit list is fetched
// @param memtables    tables to scan; every entry must be non-null and must actually be a
//                     memtable::ObMemtable (checked with dynamic_cast)
// @param medium_list  output; initialized here with allocator, then every fetched
//                     MEDIUM_COMPACTION_INFO unit is added to it in memtable order
// @return OB_SUCCESS on success; OB_ERR_UNEXPECTED for a null table, a table that is not an
//         ObMemtable, or a returned unit whose type is not MEDIUM_COMPACTION_INFO; otherwise
//         the error code of the failing callee (init / get list / add info)
int ObMediumCompactionScheduleFunc::get_medium_info_list_from_memtable(
    ObIAllocator &allocator,
    const ObIArray<ObITable *> &memtables,
    ObMediumCompactionInfoList &medium_list)
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(medium_list.init(allocator))) {
    LOG_WARN("failed to init merge list", K(ret));
  }
  ObITable *table = nullptr;
  memtable::ObMemtable *memtable = nullptr;
  // Only passed as the first argument of get_multi_source_data_unit_list; its contents are
  // not read by this function afterwards.
  compaction::ObMediumCompactionInfo useless_medium_info;
  memtable::ObMultiSourceData::ObIMultiSourceDataUnitList dst_list;
  // The loop condition checks ret, so a failed init above skips the scan entirely.
  for (int64_t i = 0; OB_SUCC(ret) && i < memtables.count(); ++i) {
    dst_list.reset(); // start every memtable with an empty result list
    if (OB_ISNULL(table = memtables.at(i))) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("table in tables_handle is invalid", K(ret), KPC(table));
    } else if (OB_ISNULL(memtable = dynamic_cast<memtable::ObMemtable *>(table))) {
      ret = OB_ERR_UNEXPECTED;
      // K(ret) added so this branch reports the error code like every other failure path here
      LOG_WARN("table pointer does not point to a ObMemtable object", K(ret), KPC(table));
    } else if (OB_FAIL(memtable->get_multi_source_data_unit_list(&useless_medium_info, dst_list, &allocator))) {
      LOG_WARN("failed to get medium info from memtable", K(ret), KPC(table));
    } else {
      ObMediumCompactionInfo *info_in_list = nullptr;
      DLIST_FOREACH_X(info, dst_list, OB_SUCC(ret)) {
        if (OB_UNLIKELY(memtable::MultiSourceDataUnitType::MEDIUM_COMPACTION_INFO != info->type())) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("return info is invalid", K(ret), KPC(info));
        } else if (FALSE_IT(info_in_list = static_cast<ObMediumCompactionInfo *>(info))) {
          // FALSE_IT only performs the assignment; this branch body is never taken
        } else if (OB_FAIL(medium_list.add_medium_compaction_info(*info_in_list))) {
          LOG_WARN("failed to add medium compaction info", K(ret), KPC(info_in_list));
        }
      }
    }
  } // end of for
  return ret;
}
} //namespace compaction
} // namespace oceanbase

View File

@ -35,11 +35,16 @@ public:
static int schedule_tablet_medium_merge(
ObLS &ls,
ObTablet &tablet,
const int64_t major_frozen_scn = 0);
const int64_t major_frozen_scn = 0,
const bool schedule_with_memtable = false);
static int get_latest_storage_schema_from_memtable(
ObIAllocator &allocator,
const ObIArray<ObITable *> &memtables,
ObStorageSchema &storage_schema);
static int get_medium_info_list_from_memtable(
ObIAllocator &allocator,
const ObIArray<ObITable *> &memtables,
ObMediumCompactionInfoList &medium_list);
static int get_palf_role(const share::ObLSID &ls_id, ObRole &role);
int schedule_next_medium_for_leader(const int64_t major_snapshot);
@ -50,8 +55,6 @@ public:
int check_medium_finish();
int freeze_memtable_to_get_medium_info();
TO_STRING_KV("ls_id", ls_.get_ls_id(), "tablet_id", tablet_.get_tablet_meta().tablet_id_);
protected:
int get_status_from_inner_table(share::ObTabletCompactionScnInfo &ret_info);
@ -98,12 +101,17 @@ protected:
ObLS &ls,
ObTablet &tablet,
const int64_t schedule_scn,
bool &need_merge);
const ObMediumCompactionInfo::ObCompactionType compaction_type);
int schedule_next_medium_primary_cluster(const int64_t major_snapshot);
int get_table_schema_to_merge(const int64_t schema_version, ObMediumCompactionInfo &medium_info);
int get_max_reserved_snapshot(int64_t &max_reserved_snapshot);
static int get_schedule_medium_from_memtable(
ObTablet &tablet,
const int64_t major_frozen_snapshot,
int64_t &schedule_medium_scn,
ObMediumCompactionInfo::ObCompactionType &compaction_type);
static int get_table_id(
ObMultiVersionSchemaService &schema_service,
const ObTabletID &tablet_id,

View File

@ -93,6 +93,7 @@ public:
int gene_parallel_info(
ObIAllocator &allocator,
common::ObArrayArray<ObStoreRange> &paral_range);
// True when type lies in the half-open range [MEDIUM_COMPACTION, COMPACTION_TYPE_MAX).
static inline bool is_valid_compaction_type(const ObCompactionType type)
{
  return type >= MEDIUM_COMPACTION && type < COMPACTION_TYPE_MAX;
}
static inline bool is_medium_compaction(const ObCompactionType type) { return MEDIUM_COMPACTION == type; }
static inline bool is_major_compaction(const ObCompactionType type) { return MAJOR_COMPACTION == type; }
inline bool is_major_compaction() const { return is_major_compaction((ObCompactionType)compaction_type_); }

View File

@ -363,29 +363,30 @@ int ObMediumCompactionInfoList::init(common::ObIAllocator &allocator,
const ObMediumCompactionInfoList *old_list,
const ObMediumCompactionInfoList *dump_list,
const int64_t finish_medium_scn/*= 0*/,
const bool update_in_major_type_merge/*= false*/)
const ObMergeType merge_type/*= MERGE_TYPE_MAX*/)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", K(ret));
} else if (FALSE_IT(allocator_ = &allocator)) {
} else if (nullptr != old_list && OB_FAIL(append_list_with_deep_copy(finish_medium_scn, update_in_major_type_merge, *old_list))) {
} else if (nullptr != old_list && OB_FAIL(append_list_with_deep_copy(finish_medium_scn, *old_list))) {
LOG_WARN("failed to deep copy list", K(ret), K(old_list));
} else if (nullptr != dump_list && OB_FAIL(append_list_with_deep_copy(finish_medium_scn, update_in_major_type_merge, *dump_list))) {
} else if (nullptr != dump_list && OB_FAIL(append_list_with_deep_copy(finish_medium_scn, *dump_list))) {
LOG_WARN("failed to deep copy list", K(ret), K(dump_list));
} else {
// if update_in_major_type_merge = true, will update wait_check_medium_scn in delete_medium_compaction_info
if (!update_in_major_type_merge && nullptr != old_list) {
last_compaction_type_ = old_list->last_compaction_type_;
wait_check_medium_scn_ = old_list->get_wait_check_medium_scn();
}
} else if (is_major_merge_type(merge_type)) { // update list after major_type_merge
last_compaction_type_ = is_major_merge(merge_type) ? ObMediumCompactionInfo::MAJOR_COMPACTION : ObMediumCompactionInfo::MEDIUM_COMPACTION;
wait_check_medium_scn_ = finish_medium_scn;
} else if (OB_NOT_NULL(old_list)) { // update list with old_list
last_compaction_type_ = old_list->last_compaction_type_;
wait_check_medium_scn_ = old_list->get_wait_check_medium_scn();
}
if (OB_SUCC(ret)) {
compat_ = MEDIUM_LIST_VERSION;
is_inited_ = true;
if (medium_info_list_.get_size() > 0 || wait_check_medium_scn_ > 0) {
LOG_INFO("success to init list", K(ret), KPC(this), KPC(old_list));
LOG_INFO("success to init list", K(ret), KPC(this), KPC(old_list), K(finish_medium_scn),
"merge_type", merge_type_to_str(merge_type));
}
} else if (OB_UNLIKELY(!is_inited_)) {
reset();
@ -406,7 +407,7 @@ int ObMediumCompactionInfoList::init_after_check_finish(
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(old_list));
} else if (FALSE_IT(allocator_ = &allocator)) {
} else if (OB_FAIL(append_list_with_deep_copy(wait_check_medium_scn_, false, old_list))) {
} else if (OB_FAIL(append_list_with_deep_copy(wait_check_medium_scn_, old_list))) {
LOG_WARN("failed to deep copy list", K(ret), K(wait_check_medium_scn_));
} else {
last_compaction_type_ = old_list.last_compaction_type_;
@ -421,7 +422,6 @@ int ObMediumCompactionInfoList::init_after_check_finish(
return ret;
}
void ObMediumCompactionInfoList::reset_list()
{
DLIST_REMOVE_ALL_NORET(info, medium_info_list_) {
@ -487,18 +487,22 @@ int ObMediumCompactionInfoList::get_specified_scn_info(
return ret;
}
int64_t ObMediumCompactionInfoList::get_schedule_scn(const int64_t major_compaction_scn) const
void ObMediumCompactionInfoList::get_schedule_scn(
const int64_t major_compaction_scn,
int64_t &schedule_scn,
ObMediumCompactionInfo::ObCompactionType &compaction_type) const
{
int64_t ret_scn = 0;
schedule_scn = 0;
compaction_type = ObMediumCompactionInfo::COMPACTION_TYPE_MAX;
if (size() > 0) {
const ObMediumCompactionInfo *first_medium_info = get_first_medium_info();
if (first_medium_info->is_medium_compaction()
|| (first_medium_info->is_major_compaction() && major_compaction_scn >= first_medium_info->medium_snapshot_)) {
// for standby cluster, receive several medium info, only schedule what scheduler have received
ret_scn = first_medium_info->medium_snapshot_;
schedule_scn = first_medium_info->medium_snapshot_;
compaction_type = (ObMediumCompactionInfo::ObCompactionType)first_medium_info->compaction_type_;
}
}
return ret_scn;
}
int ObMediumCompactionInfoList::inner_deep_copy_node(

View File

@ -95,7 +95,7 @@ public:
const ObMediumCompactionInfoList *old_list,
const ObMediumCompactionInfoList *dump_list = nullptr,
const int64_t finish_medium_scn = 0,
const bool update_in_major_type_merge = false);
const ObMergeType merge_type = MERGE_TYPE_MAX);
int init_after_check_finish(
ObIAllocator &allocator,
@ -124,7 +124,10 @@ public:
{
return (ObMediumCompactionInfo::ObCompactionType)last_compaction_type_;
}
int64_t get_schedule_scn(const int64_t major_compaction_scn) const;
void get_schedule_scn(
const int64_t major_compaction_scn,
int64_t &schedule_scn,
ObMediumCompactionInfo::ObCompactionType &compaction_type) const;
int get_specified_scn_info(
const int64_t snapshot,
@ -165,17 +168,11 @@ private:
}
OB_INLINE int append_list_with_deep_copy(
const int64_t finish_scn,
const bool update_in_major_type_merge,
const ObMediumCompactionInfoList &input_list)
{
int ret = OB_SUCCESS;
DLIST_FOREACH_X(input_info, input_list.medium_info_list_, OB_SUCC(ret)) {
const ObMediumCompactionInfo *medium_info = static_cast<const ObMediumCompactionInfo *>(input_info);
if (update_in_major_type_merge
&& medium_info->medium_snapshot_ == finish_scn) {
last_compaction_type_ = medium_info->compaction_type_;
wait_check_medium_scn_ = finish_scn;
}
if (medium_info->medium_snapshot_ > finish_scn) {
ret = inner_deep_copy_node(*medium_info);
}

View File

@ -646,7 +646,6 @@ int ObTabletMergeCtx::get_merge_range(int64_t parallel_idx, ObDatumRange &merge_
int ObTabletMergeCtx::inner_init_for_medium()
{
int ret = OB_SUCCESS;
const ObMediumCompactionInfo *medium_info = nullptr;
ObGetMergeTablesParam get_merge_table_param;
ObGetMergeTablesResult get_merge_table_result;
get_merge_table_param.merge_type_ = param_.merge_type_;
@ -667,11 +666,8 @@ int ObTabletMergeCtx::inner_init_for_medium()
ret = OB_EAGAIN;
LOG_INFO("tx table is not ready. waiting for max_decided_log_ts ...",
KR(ret), "merge_scn", get_merge_table_result.scn_range_.end_scn_);
} else if (OB_FAIL(init_get_medium_compaction_info(param_.merge_version_, medium_info))) { // have checked medium info inside
} else if (OB_FAIL(init_get_medium_compaction_info(param_.merge_version_, get_merge_table_result))) { // have checked medium info inside
LOG_WARN("failed to get medium compaction info", K(ret), KPC(this));
} else if (FALSE_IT(get_merge_table_result.schema_version_ = medium_info->storage_schema_.schema_version_)) {
} else if (FALSE_IT(data_version_ = medium_info->data_version_)) {
} else if (FALSE_IT(is_tenant_major_merge_ = medium_info->is_major_compaction())) {
} else if (OB_FAIL(get_basic_info_from_result(get_merge_table_result))) {
LOG_WARN("failed to set basic info to ctx", K(ret), K(get_merge_table_result), KPC(this));
} else if (OB_FAIL(cal_major_merge_param(get_merge_table_result))) {
@ -682,23 +678,27 @@ int ObTabletMergeCtx::inner_init_for_medium()
int ObTabletMergeCtx::init_get_medium_compaction_info(
const int64_t medium_snapshot,
const ObMediumCompactionInfo *&medium_info)
ObGetMergeTablesResult &get_merge_table_result)
{
int ret = OB_SUCCESS;
medium_info = nullptr;
const ObMediumCompactionInfoList &medium_list = tablet_handle_.get_obj()->get_medium_compaction_info_list();
ObTablet *tablet = tablet_handle_.get_obj();
const ObMediumCompactionInfoList &medium_list = tablet->get_medium_compaction_info_list();
const bool medium_in_storage = medium_snapshot <= tablet->get_tablet_meta().max_serialized_medium_scn_;
ObMediumCompactionInfo medium_info;
const ObMediumCompactionInfo *medium_info_ptr = &medium_info;
if (OB_UNLIKELY(!medium_list.is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("medium compaction mgr is invalid", K(ret), KPC(this), K(medium_list));
} else if (OB_FAIL(medium_list.get_specified_scn_info(medium_snapshot, medium_info))) {
} else if (medium_in_storage && OB_FAIL(medium_list.get_specified_scn_info(medium_snapshot, medium_info_ptr))) {
LOG_WARN("failed to get medium info from mgr", K(ret), K(medium_snapshot), K(medium_list));
} else if (OB_UNLIKELY(nullptr == medium_info || !medium_info->is_valid())) {
} else if (!medium_in_storage && OB_FAIL(get_specified_medium_compaction_info_from_memtable(allocator_, medium_snapshot, medium_info))) {
LOG_WARN("failed to get medium info from memtable", K(ret), K(medium_snapshot));
} else if (OB_UNLIKELY(nullptr == medium_info_ptr || !medium_info_ptr->is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("medium compaction info is invalid", K(ret), KPC(this), K(medium_list), KPC(medium_info));
} else if (medium_info->contain_parallel_range_
&& OB_FAIL(parallel_merge_ctx_.init(*medium_info))) {
LOG_WARN("failed to init parallel merge ctx", K(ret), KPC(medium_info));
LOG_ERROR("medium compaction info is invalid", K(ret), KPC(this), K(medium_list), KPC(medium_info_ptr));
} else if (medium_info_ptr->contain_parallel_range_
&& OB_FAIL(parallel_merge_ctx_.init(*medium_info_ptr))) {
LOG_WARN("failed to init parallel merge ctx", K(ret), KPC(medium_info_ptr));
} else {
void *buf = nullptr;
if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObStorageSchema)))) {
@ -709,8 +709,8 @@ int ObTabletMergeCtx::init_get_medium_compaction_info(
storage_schema = new(buf) ObStorageSchema();
schema_ctx_.storage_schema_ = storage_schema;
schema_ctx_.allocated_storage_schema_ = true;
if (OB_FAIL(storage_schema->init(allocator_, medium_info->storage_schema_))) {
LOG_WARN("failed to init storage schema from current medium info", K(ret), KPC(medium_info));
if (OB_FAIL(storage_schema->init(allocator_, medium_info_ptr->storage_schema_))) {
LOG_WARN("failed to init storage schema from current medium info", K(ret), KPC(medium_info_ptr));
}
}
@ -720,7 +720,11 @@ int ObTabletMergeCtx::init_get_medium_compaction_info(
LOG_ERROR("multi version data is discarded, should not compaction now", K(ret), K(param_), K(medium_snapshot));
}
}
if (OB_SUCC(ret)) {
get_merge_table_result.schema_version_ = medium_info_ptr->storage_schema_.schema_version_;
data_version_ = medium_info_ptr->data_version_;
is_tenant_major_merge_ = medium_info_ptr->is_major_compaction();
}
return ret;
}
@ -1007,39 +1011,13 @@ int ObTabletMergeCtx::get_medium_compaction_info_to_store()
{
int ret = OB_SUCCESS;
if (is_mini_merge(param_.merge_type_)) {
if (OB_FAIL(merge_list_.init(allocator_))) {
LOG_WARN("failed to init merge list", K(ret));
}
ObITable *table = nullptr;
memtable::ObMemtable * memtable = nullptr;
compaction::ObMediumCompactionInfo medium_info;
memtable::ObMultiSourceData::ObIMultiSourceDataUnitList dst_list;
for (int i = 0; OB_SUCC(ret) && i < tables_handle_.get_count(); ++i) {
dst_list.reset();
if (OB_UNLIKELY(nullptr == (table = tables_handle_.get_table(i)) || !table->is_frozen_memtable())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table in tables_handle is invalid", K(ret), KPC(table));
} else if (OB_ISNULL(memtable = dynamic_cast<memtable::ObMemtable *>(table))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table pointer does not point to a ObMemtable object", KPC(table));
} else if (OB_FAIL(memtable->get_multi_source_data_unit_list(&medium_info, dst_list, &allocator_))) {
LOG_WARN("failed to get medium info from memtable", K(ret), KPC(table));
} else if (dst_list.is_empty()) {
// do nothing
} else {
ObMediumCompactionInfo *input_info = nullptr;
DLIST_FOREACH_X(info, dst_list, OB_SUCC(ret)) {
if (OB_UNLIKELY(memtable::MultiSourceDataUnitType::MEDIUM_COMPACTION_INFO != info->type())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("return info is invalid", K(ret), KPC(info));
} else if (FALSE_IT(input_info = static_cast<ObMediumCompactionInfo *>(info))) {
} else if (OB_FAIL(merge_list_.add_medium_compaction_info(*input_info))) {
LOG_WARN("failed to add medium compaction info", K(ret), KPC(input_info));
}
}
}
} // end of for
if (OB_SUCC(ret)) {
ObSEArray<ObITable*, MAX_MEMSTORE_CNT> memtables;
if (OB_FAIL(tables_handle_.get_tables(memtables))) {
LOG_WARN("failed to get tables", K(ret), K(memtables));
} else if (OB_FAIL(ObMediumCompactionScheduleFunc::get_medium_info_list_from_memtable(
allocator_, memtables, merge_list_))) {
LOG_WARN("failed to get medium info list from memtable", K(ret));
} else if (merge_list_.size() > 0) {
LOG_INFO("success get medium info list", "ls_id", param_.ls_id_,
"tablet_id", param_.tablet_id_, K(merge_list_));
}
@ -1047,6 +1025,51 @@ int ObTabletMergeCtx::get_medium_compaction_info_to_store()
return ret;
}
// Looks for the medium compaction info whose medium_snapshot_ equals medium_snapshot among
// the infos recorded in this tablet's memtables (fetched with need_active = true) and, when
// found, initializes medium_info from it using the caller-provided allocator.
//
// @param allocator        allocator passed to medium_info.init() for the result
// @param medium_snapshot  snapshot of the medium info being searched for
// @param medium_info      output; only initialized when a matching info is found
// @return OB_ENTRY_NOT_EXIST when there is no memtable, when the collected list is empty, or
//         when an info with a larger snapshot is reached before an equal one;
//         OB_ERR_UNEXPECTED when a list node is not a MEDIUM_COMPACTION_INFO unit;
//         otherwise the error code of the failing callee, or OB_SUCCESS.
// NOTE(review): if every info in the list has a snapshot smaller than medium_snapshot, the
// loop finishes with ret == OB_SUCCESS and medium_info untouched; callers need to check
// medium_info.is_valid() themselves — confirm this is intended.
int ObTabletMergeCtx::get_specified_medium_compaction_info_from_memtable(
ObIAllocator &allocator,
const int64_t medium_snapshot,
ObMediumCompactionInfo &medium_info)
{
int ret = OB_SUCCESS;
// tmp_allocator backs the temporary list only; the matching info is re-initialized with
// `allocator` below (presumably so the result outlives this function — confirm).
ObArenaAllocator tmp_allocator;
ObSEArray<ObITable*, MAX_MEMSTORE_CNT> memtables;
ObMediumCompactionInfoList tmp_medium_list;
if (OB_FAIL(tablet_handle_.get_obj()->get_table_store().get_memtables(memtables, true/*need_active*/))) {
LOG_WARN("failed to get memtables", K(ret), K(param_));
} else if (memtables.empty()) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("no memtable", K(ret), K(memtables), K(param_));
} else if (OB_FAIL(ObMediumCompactionScheduleFunc::get_medium_info_list_from_memtable(
tmp_allocator, memtables, tmp_medium_list))) {
LOG_WARN("failed to get medium info list from memtable", K(ret));
} else if (tmp_medium_list.is_empty()) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("have memtables, but medium list is empty", K(ret), K(memtables), K(tmp_medium_list));
} else {
const ObMediumCompactionInfo *info_in_list = nullptr;
// The loop stops as soon as ret becomes a failure (loop condition) or on the break below.
DLIST_FOREACH_X(info, tmp_medium_list.get_list(), OB_SUCC(ret)) {
if (OB_UNLIKELY(memtable::MultiSourceDataUnitType::MEDIUM_COMPACTION_INFO != info->type())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("return info is invalid", K(ret), KPC(info));
} else if (FALSE_IT(info_in_list = static_cast<const ObMediumCompactionInfo *>(info))) {
} else if (medium_snapshot < info_in_list->medium_snapshot_) {
// presumably the list is ordered by ascending medium_snapshot_, so passing the target
// means it is absent — TODO confirm ordering guarantee
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("not found specified medium info in medium list", K(ret), K(param_), K(memtables), K(tmp_medium_list));
} else if (medium_snapshot == info_in_list->medium_snapshot_) {
if (OB_FAIL(medium_info.init(allocator, *info_in_list))) {
LOG_WARN("failed to init medium info", K(ret));
}
// exact match handled (successfully or not); no need to look further
break;
}
}
if (OB_SUCC(ret) && medium_info.is_valid()) {
LOG_INFO("success get medium info", "ls_id", param_.ls_id_,
"tablet_id", param_.tablet_id_, K(medium_info));
}
}
return ret;
}
int ObTabletMergeCtx::get_storage_schema_to_merge(
const ObTablesHandleArray &merge_tables_handle,
const bool get_schema_on_memtable)

View File

@ -167,7 +167,11 @@ struct ObTabletMergeCtx
int inner_init_for_mini(bool &skip_rest_operation);
int inner_init_for_medium();
int init_get_medium_compaction_info(const int64_t medium_snapshot, const ObMediumCompactionInfo *&medium_info);
int init_get_medium_compaction_info(const int64_t medium_snapshot, ObGetMergeTablesResult &result);
int get_specified_medium_compaction_info_from_memtable(
ObIAllocator &allocator,
const int64_t medium_snapshot,
ObMediumCompactionInfo &info);
int get_schema_and_gene_from_result(const ObGetMergeTablesResult &get_merge_table_result);
int get_storage_schema_and_gene_from_result(const ObGetMergeTablesResult &get_merge_table_result);
int get_storage_schema_to_merge(const ObTablesHandleArray &merge_tables_handle, const bool get_schema_on_memtable = true);

View File

@ -1098,18 +1098,12 @@ int ObTabletMergeFinishTask::get_merged_sstable(ObTabletMergeCtx &ctx, ObSSTable
int ObTabletMergeFinishTask::add_sstable_for_merge(ObTabletMergeCtx &ctx)
{
int ret = OB_SUCCESS;
const ObStorageSchema *update_storage_schema = ctx.schema_ctx_.storage_schema_;
ObTablet *old_tablet = ctx.tablet_handle_.get_obj();
const ObMergeType merge_type = ctx.param_.merge_type_;
if (OB_UNLIKELY(!ctx.is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error of merge ctx", K(ctx));
} else if (is_major_merge_type(merge_type)
&& update_storage_schema->schema_version_ > old_tablet->get_storage_schema().schema_version_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("schema in major can't have larger schema version than tablet", K(ret),
KPC(update_storage_schema), K(old_tablet->get_storage_schema()));
} else if (is_mini_merge(merge_type) && !ctx.param_.tablet_id_.is_special_merge_tablet()) {
// if only one medium compaction info need store, just use ObUpdateTableStoreParam
// OR need to read from inner table to decide what need to keep after release memtable
@ -1130,7 +1124,8 @@ int ObTabletMergeFinishTask::add_sstable_for_merge(ObTabletMergeCtx &ctx)
clog_checkpoint_scn,
is_minor_merge(ctx.param_.merge_type_)/*need_check_sstable*/,
false/*allow_duplicate_sstable*/,
&ctx.merge_list_);
&ctx.merge_list_,
ctx.param_.get_merge_type());
ObTablet *old_tablet = ctx.tablet_handle_.get_obj();
ObTabletHandle new_tablet_handle;
if (ctx.param_.tablet_id_.is_special_merge_tablet()) {

View File

@ -982,19 +982,12 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge(
if (could_major_merge && OB_TMP_FAIL(ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge(
ls,
*tablet,
major_frozen_scn))) {
major_frozen_scn,
true/*schedule_with_memtable*/))) {
if (OB_EAGAIN != ret) {
LOG_WARN("failed to schedule medium", K(tmp_ret), K(ls_id), K(tablet_id));
}
}
// get info from memtable to check have received new medium info
if (OB_TMP_FAIL(func.freeze_memtable_to_get_medium_info())) {
if (OB_TABLE_NOT_EXIST != tmp_ret) {
LOG_WARN("failed to freeze memtable", K(tmp_ret), K(ls_id), K(tablet_id));
}
}
ls_merge_finish &= tablet_merge_finish;
}
} // end of while

View File

@ -233,7 +233,8 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam(
tx_data_(),
binding_info_(),
auto_inc_seq_(),
medium_info_list_(nullptr)
medium_info_list_(nullptr),
merge_type_(MERGE_TYPE_MAX)
{
clog_checkpoint_scn_.set_min();
}
@ -248,7 +249,8 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam(
const SCN clog_checkpoint_scn,
const bool need_check_sstable,
const bool allow_duplicate_sstable,
const compaction::ObMediumCompactionInfoList *medium_info_list)
const compaction::ObMediumCompactionInfoList *medium_info_list,
const ObMergeType merge_type)
: table_handle_(table_handle),
snapshot_version_(snapshot_version),
clog_checkpoint_scn_(),
@ -268,7 +270,8 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam(
tx_data_(),
binding_info_(),
auto_inc_seq_(),
medium_info_list_(medium_info_list)
medium_info_list_(medium_info_list),
merge_type_(merge_type)
{
clog_checkpoint_scn_ = clog_checkpoint_scn;
}
@ -301,7 +304,8 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam(
tx_data_(),
binding_info_(),
auto_inc_seq_(),
medium_info_list_(nullptr)
medium_info_list_(nullptr),
merge_type_(MERGE_TYPE_MAX)
{
clog_checkpoint_scn_.set_min();
}

View File

@ -310,7 +310,8 @@ struct ObUpdateTableStoreParam
const share::SCN clog_checkpoint_scn = share::SCN::min_scn(),
const bool need_check_sstable = false,
const bool allow_duplicate_sstable = false,
const compaction::ObMediumCompactionInfoList *medium_info_list = nullptr);
const compaction::ObMediumCompactionInfoList *medium_info_list = nullptr,
const ObMergeType merge_type = MERGE_TYPE_MAX);
ObUpdateTableStoreParam( // for ddl merge task only
const ObTableHandleV2 &table_handle,
@ -327,7 +328,7 @@ struct ObUpdateTableStoreParam
K_(keep_old_ddl_sstable), K_(need_report), KPC_(storage_schema), K_(rebuild_seq), K_(update_with_major_flag),
K_(need_check_sstable), K_(ddl_checkpoint_scn), K_(ddl_start_scn), K_(ddl_snapshot_version),
K_(ddl_execution_id), K_(ddl_cluster_version), K_(allow_duplicate_sstable), K_(tx_data), K_(binding_info), K_(auto_inc_seq),
KPC_(medium_info_list));
KPC_(medium_info_list), "merge_type", merge_type_to_str(merge_type_));
ObTableHandleV2 table_handle_;
int64_t snapshot_version_;
@ -352,6 +353,7 @@ struct ObUpdateTableStoreParam
share::ObTabletAutoincSeq auto_inc_seq_;
const compaction::ObMediumCompactionInfoList *medium_info_list_;
ObMergeType merge_type_; // set merge_type only when update tablet in compaction
};
struct ObBatchUpdateTableStoreParam final

View File

@ -243,7 +243,7 @@ int ObTablet::init(
param.medium_info_list_,
// delete all medium before latest finish major snapshot
nullptr != last_major ? last_major->get_snapshot_version() : 0,
update_in_major_type_merge))) {
param.merge_type_))) {
LOG_WARN("failed to init medium info list", K(ret));
} else if (OB_FAIL(build_read_info(*allocator_))) {
LOG_WARN("failed to build read info", K(ret));