add schema changed defense
This commit is contained in:
parent
bd2fea0f7e
commit
b57e618713
@ -304,7 +304,7 @@ int ObCOTabletMergeCtx::cal_merge_param()
|
||||
force_full_merge = true;
|
||||
static_param_.is_rebuild_column_store_ = true;
|
||||
}
|
||||
if (FAILEDx(static_param_.cal_major_merge_param(force_full_merge, progressive_merge_mgr_))) {
|
||||
if (FAILEDx(ObBasicTabletMergeCtx::cal_major_merge_param(force_full_merge, progressive_merge_mgr_))) {
|
||||
LOG_WARN("failed to calc major merge param", KR(ret), K(force_full_merge));
|
||||
}
|
||||
return ret;
|
||||
|
@ -279,7 +279,8 @@ int ObStaticMergeParam::cal_major_merge_param(
|
||||
LOG_WARN("failed to init progressive merge mgr", KR(ret), K_(is_full_merge), K(sstable_meta_hdl), KPC(schema_));
|
||||
} else if (is_full_merge_
|
||||
|| (merge_level_ != MACRO_BLOCK_MERGE_LEVEL && is_schema_changed_)
|
||||
|| (data_version_ >= DATA_VERSION_4_3_3_0 && progressive_mgr.need_calc_progressive_merge())) {
|
||||
|| (data_version_ >= DATA_VERSION_4_3_3_0 && progressive_mgr.need_calc_progressive_merge())
|
||||
|| (data_version_ >= DATA_VERSION_4_3_3_0 && data_version_ < DATA_VERSION_4_3_4_0 && !get_tablet_id().is_user_tablet())) {
|
||||
merge_level_ = MACRO_BLOCK_MERGE_LEVEL;
|
||||
// ATTENTION! Critical diagnostic log, DO NOT CHANGE!!!
|
||||
LOG_INFO("set merge_level to MACRO_BLOCK_MERGE_LEVEL", K_(is_schema_changed), K(force_full_merge),
|
||||
@ -1224,6 +1225,30 @@ int ObBasicTabletMergeCtx::get_medium_compaction_info()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObBasicTabletMergeCtx::cal_major_merge_param(
|
||||
const bool force_full_merge,
|
||||
ObProgressiveMergeMgr &progressive_mgr)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(static_param_.cal_major_merge_param(force_full_merge,
|
||||
progressive_mgr))) {
|
||||
LOG_WARN("failed to calc major param", KR(ret), K_(static_param));
|
||||
} else if (!progressive_merge_mgr_.need_calc_progressive_merge() && static_param_.data_version_ >= DATA_VERSION_4_3_3_0) {
|
||||
bool is_schema_changed = false;
|
||||
if (OB_FAIL(ObMediumCompactionScheduleFunc::check_if_schema_changed(*get_tablet(), *get_schema(), is_schema_changed))) {
|
||||
LOG_WARN("failed to check is schema changed", KR(ret), K_(static_param), KPC(get_schema()));
|
||||
} else if (is_schema_changed && !static_param_.is_schema_changed_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("found schema changed when compare sstable & schema but progressive merge round is not increasing", KR(ret),
|
||||
K(is_schema_changed), "param", get_dag_param(), KPC(get_schema()));
|
||||
#ifdef ERRSIM
|
||||
SERVER_EVENT_SYNC_ADD("merge_errsim", "found_schema_changed", "ls_id", get_ls_id(), "tablet_id", get_tablet_id());
|
||||
#endif
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObBasicTabletMergeCtx::check_medium_info(
|
||||
const ObMediumCompactionInfo &next_medium_info,
|
||||
const int64_t last_major_snapshot)
|
||||
|
@ -261,6 +261,8 @@ public:
|
||||
VIRTUAL_TO_STRING_KV(K_(static_param), K_(static_desc), K_(parallel_merge_ctx), K_(tablet_handle),
|
||||
K_(info_collector), KP_(merge_dag));
|
||||
protected:
|
||||
int cal_major_merge_param(const bool force_full_merge,
|
||||
ObProgressiveMergeMgr &progressive_mgr);
|
||||
virtual int get_merge_tables(ObGetMergeTablesResult &get_merge_table_result);
|
||||
virtual int try_swap_tablet(ObGetMergeTablesResult &get_merge_table_result)
|
||||
{
|
||||
|
@ -41,7 +41,7 @@ using namespace common;
|
||||
|
||||
namespace compaction
|
||||
{
|
||||
|
||||
ERRSIM_POINT_DEF(EN_COMPACTION_SKIP_INIT_SCHEMA_CHANGED);
|
||||
int64_t ObMediumCompactionScheduleFunc::to_string(char *buf, const int64_t buf_len) const
|
||||
{
|
||||
int64_t pos = 0;
|
||||
@ -697,36 +697,54 @@ int ObMediumCompactionScheduleFunc::errsim_choose_medium_snapshot(
|
||||
}
|
||||
#endif
|
||||
|
||||
int ObMediumCompactionScheduleFunc::check_if_schema_changed(ObMediumCompactionInfo &medium_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool is_schema_changed = false;
|
||||
if (OB_FAIL(check_if_schema_changed(*tablet_handle_.get_obj(), medium_info.storage_schema_, is_schema_changed))) {
|
||||
LOG_WARN("failed to get check if schema changed", K(ret), K(medium_info));
|
||||
#ifdef ERRSIM
|
||||
} else if (OB_UNLIKELY(EN_COMPACTION_SKIP_INIT_SCHEMA_CHANGED)) {
|
||||
bool is_progressive_merge = false;
|
||||
medium_info.is_schema_changed_ = false;
|
||||
medium_info.storage_schema_.progressive_merge_round_ = 1;
|
||||
FLOG_INFO("ERRSIM EN_COMPACTION_SKIP_INIT_SCHEMA_CHANGED", KPC(this), K(is_schema_changed), K(is_progressive_merge),
|
||||
K(medium_info));
|
||||
#endif
|
||||
} else {
|
||||
medium_info.is_schema_changed_ = is_schema_changed;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::check_if_schema_changed(
|
||||
ObMediumCompactionInfo &medium_info)
|
||||
const ObTablet &tablet,
|
||||
const ObStorageSchema &schema,
|
||||
bool &is_schema_changed)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t full_stored_col_cnt = 0;
|
||||
const ObStorageSchema &schema = medium_info.storage_schema_;
|
||||
if (OB_UNLIKELY(!schema.is_inited())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("schema is not inited", KR(ret), K(schema));
|
||||
} else if (OB_FAIL(schema.get_stored_column_count_in_sstable(full_stored_col_cnt))) {
|
||||
LOG_WARN("failed to get stored column count in sstable", K(ret), K(schema));
|
||||
} else if (OB_UNLIKELY(tablet_handle_.get_obj()->get_last_major_column_count() > full_stored_col_cnt)) {
|
||||
} else if (OB_UNLIKELY(tablet.get_last_major_column_count() > full_stored_col_cnt)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("stored col cnt in curr schema is less than old major sstable", K(ret),
|
||||
"col_cnt_in_sstable", tablet_handle_.get_obj()->get_last_major_column_count(),
|
||||
"col_cnt_in_schema", full_stored_col_cnt, KPC(this));
|
||||
} else if (medium_info.data_version_ >= DATA_VERSION_4_3_3_0) {
|
||||
// do not check schema changed
|
||||
medium_info.is_schema_changed_ = false;
|
||||
} else if (tablet_handle_.get_obj()->get_last_major_column_count() != full_stored_col_cnt
|
||||
|| tablet_handle_.get_obj()->get_last_major_compressor_type() != schema.get_compressor_type()
|
||||
|| (ObRowStoreType::DUMMY_ROW_STORE != tablet_handle_.get_obj()->get_last_major_latest_row_store_type()
|
||||
&& tablet_handle_.get_obj()->get_last_major_latest_row_store_type() != schema.row_store_type_)) {
|
||||
medium_info.is_schema_changed_ = true;
|
||||
LOG_INFO("schema changed", KPC(this), K(schema), K(full_stored_col_cnt),
|
||||
"col_cnt_in_sstable", tablet_handle_.get_obj()->get_last_major_column_count(),
|
||||
"compressor_type_in_sstable", tablet_handle_.get_obj()->get_last_major_compressor_type(),
|
||||
"latest_row_store_type_in_sstable", tablet_handle_.get_obj()->get_last_major_latest_row_store_type());
|
||||
"col_cnt_in_sstable", tablet.get_last_major_column_count(),
|
||||
"col_cnt_in_schema", full_stored_col_cnt, K(schema));
|
||||
} else if (tablet.get_last_major_column_count() != full_stored_col_cnt
|
||||
|| tablet.get_last_major_compressor_type() != schema.get_compressor_type()
|
||||
|| (ObRowStoreType::DUMMY_ROW_STORE != tablet.get_last_major_latest_row_store_type()
|
||||
&& tablet.get_last_major_latest_row_store_type() != schema.row_store_type_)) {
|
||||
is_schema_changed = true;
|
||||
LOG_INFO("schema changed", K(schema), K(schema), K(full_stored_col_cnt),
|
||||
"col_cnt_in_sstable", tablet.get_last_major_column_count(),
|
||||
"compressor_type_in_sstable", tablet.get_last_major_compressor_type(),
|
||||
"latest_row_store_type_in_sstable", tablet.get_last_major_latest_row_store_type());
|
||||
} else {
|
||||
medium_info.is_schema_changed_ = false;
|
||||
is_schema_changed = false;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -91,6 +91,10 @@ public:
|
||||
const ObTabletID &tablet_id,
|
||||
const int64_t schema_version,
|
||||
uint64_t &table_id);
|
||||
static int check_if_schema_changed(
|
||||
const ObTablet &tablet,
|
||||
const ObStorageSchema &storage_schema,
|
||||
bool &is_schema_changed);
|
||||
#ifdef OB_BUILD_SHARED_STORAGE
|
||||
// medium compaction is not considered
|
||||
int prepare_ls_major_merge_info(
|
||||
@ -106,7 +110,6 @@ public:
|
||||
const storage::ObStorageSchema &storage_schema,
|
||||
bool &is_progressive_merge);
|
||||
#endif
|
||||
|
||||
int64_t to_string(char* buf, const int64_t buf_len) const;
|
||||
protected:
|
||||
int decide_medium_snapshot(bool &medium_clog_submitted);
|
||||
|
@ -150,6 +150,7 @@ const static char * ObTabletScheduleNewRoundStateStr[] = {
|
||||
"DURING_SPLIT",
|
||||
"NEED_CHECK_LAST_MEDIUM_CKM",
|
||||
"EXIST_UNFINISH_MEDIUM",
|
||||
"SCHEDULE_CONFLICT",
|
||||
"NONE",
|
||||
"STATE_MAX"
|
||||
};
|
||||
@ -367,7 +368,12 @@ int ObTabletStatusCache::register_map(
|
||||
bool could_schedule_merge = false;
|
||||
if (OB_FAIL(MTL(ObTenantTabletScheduler*)->tablet_start_schedule_medium(
|
||||
tablet_id, could_schedule_merge))) {
|
||||
LOG_WARN("failed to add tablet", K(ret), K(tablet_id));
|
||||
if (OB_ENTRY_EXIST == ret) {
|
||||
new_round_state_ = SCHEDULE_CONFLICT;
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("failed to add tablet", K(ret), K(tablet_id));
|
||||
}
|
||||
} else if (could_schedule_merge) {
|
||||
new_round_state_ = CAN_SCHEDULE_NEW_ROUND;
|
||||
} else {
|
||||
|
@ -93,6 +93,7 @@ public:
|
||||
DURING_SPLIT,
|
||||
NEED_CHECK_LAST_MEDIUM_CKM,
|
||||
EXIST_UNFINISH_MEDIUM,
|
||||
SCHEDULE_CONFLICT,
|
||||
DIAGNOSE_NORMAL, // for diagnose
|
||||
NEW_ROUND_STATE_MAX,
|
||||
};
|
||||
@ -131,6 +132,8 @@ public:
|
||||
bool need_diagnose() const;
|
||||
bool could_schedule_new_round() const { return can_merge() && inner_check_new_round_state(); }
|
||||
bool tablet_merge_finish() const { return tablet_merge_finish_; }
|
||||
|
||||
// CAREFUL! medium list may be NULL for some situation
|
||||
const compaction::ObMediumCompactionInfoList *medium_list() const { return medium_list_; }
|
||||
TabletExecuteState get_execute_state() const { return execute_state_; }
|
||||
TabletScheduleNewRoundState get_new_round_state() const { return new_round_state_; }
|
||||
|
@ -549,7 +549,6 @@ int ObTabletMajorMergeCtx::prepare_schema()
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
#ifdef OB_BUILD_SHARED_STORAGE
|
||||
/*
|
||||
* ----------------------------------------------ObSSMergeCtx--------------------------------------------------
|
||||
|
@ -106,8 +106,8 @@ protected:
|
||||
virtual int try_swap_tablet(ObGetMergeTablesResult &get_merge_table_result) override
|
||||
{ return ObBasicTabletMergeCtx::swap_tablet(get_merge_table_result); }
|
||||
virtual int cal_merge_param() override {
|
||||
return static_param_.cal_major_merge_param(false /*force_full_merge*/,
|
||||
progressive_merge_mgr_);
|
||||
return ObBasicTabletMergeCtx::cal_major_merge_param(
|
||||
false /*force_full_merge*/, progressive_merge_mgr_);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -754,7 +754,7 @@ int ObProhibitScheduleMediumMap::add_flag(const ObTabletID &tablet_id, const Pro
|
||||
LOG_INFO("flag in conflict", K(ret), K(tablet_id), K(tmp_flag), K(input_flag));
|
||||
}
|
||||
} else { // tmp_flag == input_flag
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
ret = OB_ENTRY_EXIST;
|
||||
LOG_WARN("flag in already exist", K(ret), K(tablet_id), K(tmp_flag), K(input_flag));
|
||||
}
|
||||
}
|
||||
@ -950,7 +950,7 @@ int ObTenantTabletScheduler::tablet_start_schedule_medium(const ObTabletID &tabl
|
||||
if (OB_EAGAIN == ret) {
|
||||
tablet_could_schedule_medium = false;
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
} else if (OB_ENTRY_EXIST != ret) {
|
||||
LOG_WARN("failed to add flag for tablet schedule medium", K(ret), K(tablet_id));
|
||||
}
|
||||
} else {
|
||||
|
@ -30,6 +30,20 @@ using namespace share::schema;
|
||||
using namespace blocksstable;
|
||||
using namespace compaction;
|
||||
using namespace unittest;
|
||||
namespace compaction
|
||||
{
|
||||
int ObBasicTabletMergeCtx::cal_major_merge_param(
|
||||
const bool force_full_merge,
|
||||
ObProgressiveMergeMgr &progressive_mgr)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(static_param_.cal_major_merge_param(force_full_merge,
|
||||
progressive_mgr))) {
|
||||
LOG_WARN("failed to calc major param", KR(ret), K_(static_param));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
} // namespace compaction
|
||||
namespace storage
|
||||
{
|
||||
namespace mds
|
||||
|
Loading…
x
Reference in New Issue
Block a user