From b57e6187130f2ab17cb5536708f24437a89e482d Mon Sep 17 00:00:00 2001 From: yangqise7en <877793735@qq.com> Date: Mon, 21 Oct 2024 07:13:50 +0000 Subject: [PATCH] add schema changed defense --- src/storage/column_store/ob_co_merge_ctx.cpp | 2 +- .../compaction/ob_basic_tablet_merge_ctx.cpp | 27 ++++++++- .../compaction/ob_basic_tablet_merge_ctx.h | 2 + .../compaction/ob_medium_compaction_func.cpp | 56 ++++++++++++------- .../compaction/ob_medium_compaction_func.h | 5 +- .../compaction/ob_schedule_status_cache.cpp | 8 ++- .../compaction/ob_schedule_status_cache.h | 3 + .../compaction/ob_tablet_merge_ctx.cpp | 1 - src/storage/compaction/ob_tablet_merge_ctx.h | 4 +- .../compaction/ob_tenant_tablet_scheduler.cpp | 4 +- unittest/storage/test_major_rows_merger.cpp | 14 +++++ 11 files changed, 98 insertions(+), 28 deletions(-) diff --git a/src/storage/column_store/ob_co_merge_ctx.cpp b/src/storage/column_store/ob_co_merge_ctx.cpp index f9e14df5f..102ba95e7 100644 --- a/src/storage/column_store/ob_co_merge_ctx.cpp +++ b/src/storage/column_store/ob_co_merge_ctx.cpp @@ -304,7 +304,7 @@ int ObCOTabletMergeCtx::cal_merge_param() force_full_merge = true; static_param_.is_rebuild_column_store_ = true; } - if (FAILEDx(static_param_.cal_major_merge_param(force_full_merge, progressive_merge_mgr_))) { + if (FAILEDx(ObBasicTabletMergeCtx::cal_major_merge_param(force_full_merge, progressive_merge_mgr_))) { LOG_WARN("failed to calc major merge param", KR(ret), K(force_full_merge)); } return ret; diff --git a/src/storage/compaction/ob_basic_tablet_merge_ctx.cpp b/src/storage/compaction/ob_basic_tablet_merge_ctx.cpp index 59e2f1290..54f0d9af4 100644 --- a/src/storage/compaction/ob_basic_tablet_merge_ctx.cpp +++ b/src/storage/compaction/ob_basic_tablet_merge_ctx.cpp @@ -279,7 +279,8 @@ int ObStaticMergeParam::cal_major_merge_param( LOG_WARN("failed to init progressive merge mgr", KR(ret), K_(is_full_merge), K(sstable_meta_hdl), KPC(schema_)); } else if (is_full_merge_ || (merge_level_ != MACRO_BLOCK_MERGE_LEVEL && is_schema_changed_) - || (data_version_ >= DATA_VERSION_4_3_3_0 && progressive_mgr.need_calc_progressive_merge())) { + || (data_version_ >= DATA_VERSION_4_3_3_0 && progressive_mgr.need_calc_progressive_merge()) + || (data_version_ >= DATA_VERSION_4_3_3_0 && data_version_ < DATA_VERSION_4_3_4_0 && !get_tablet_id().is_user_tablet())) { merge_level_ = MACRO_BLOCK_MERGE_LEVEL; // ATTENTION! Critical diagnostic log, DO NOT CHANGE!!! LOG_INFO("set merge_level to MACRO_BLOCK_MERGE_LEVEL", K_(is_schema_changed), K(force_full_merge), @@ -1224,6 +1225,30 @@ int ObBasicTabletMergeCtx::get_medium_compaction_info() return ret; } +int ObBasicTabletMergeCtx::cal_major_merge_param( + const bool force_full_merge, + ObProgressiveMergeMgr &progressive_mgr) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(static_param_.cal_major_merge_param(force_full_merge, + progressive_mgr))) { + LOG_WARN("failed to calc major param", KR(ret), K_(static_param)); + } else if (!progressive_merge_mgr_.need_calc_progressive_merge() && static_param_.data_version_ >= DATA_VERSION_4_3_3_0) { + bool is_schema_changed = false; + if (OB_FAIL(ObMediumCompactionScheduleFunc::check_if_schema_changed(*get_tablet(), *get_schema(), is_schema_changed))) { + LOG_WARN("failed to check is schema changed", KR(ret), K_(static_param), KPC(get_schema())); + } else if (is_schema_changed && !static_param_.is_schema_changed_) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("found schema changed when compare sstable & schema but progressive merge round is not increasing", KR(ret), + K(is_schema_changed), "param", get_dag_param(), KPC(get_schema())); +#ifdef ERRSIM + SERVER_EVENT_SYNC_ADD("merge_errsim", "found_schema_changed", "ls_id", get_ls_id(), "tablet_id", get_tablet_id()); +#endif + } + } + return ret; +} + int ObBasicTabletMergeCtx::check_medium_info( const ObMediumCompactionInfo &next_medium_info, const int64_t last_major_snapshot) diff --git a/src/storage/compaction/ob_basic_tablet_merge_ctx.h b/src/storage/compaction/ob_basic_tablet_merge_ctx.h index 57f944f6b..fa800d80b 100644 --- a/src/storage/compaction/ob_basic_tablet_merge_ctx.h +++ b/src/storage/compaction/ob_basic_tablet_merge_ctx.h @@ -261,6 +261,8 @@ public: VIRTUAL_TO_STRING_KV(K_(static_param), K_(static_desc), K_(parallel_merge_ctx), K_(tablet_handle), K_(info_collector), KP_(merge_dag)); protected: + int cal_major_merge_param(const bool force_full_merge, + ObProgressiveMergeMgr &progressive_mgr); virtual int get_merge_tables(ObGetMergeTablesResult &get_merge_table_result); virtual int try_swap_tablet(ObGetMergeTablesResult &get_merge_table_result) { diff --git a/src/storage/compaction/ob_medium_compaction_func.cpp b/src/storage/compaction/ob_medium_compaction_func.cpp index da851d6a6..a83642fa6 100644 --- a/src/storage/compaction/ob_medium_compaction_func.cpp +++ b/src/storage/compaction/ob_medium_compaction_func.cpp @@ -41,7 +41,7 @@ using namespace common; namespace compaction { - +ERRSIM_POINT_DEF(EN_COMPACTION_SKIP_INIT_SCHEMA_CHANGED); int64_t ObMediumCompactionScheduleFunc::to_string(char *buf, const int64_t buf_len) const { int64_t pos = 0; @@ -697,36 +697,54 @@ int ObMediumCompactionScheduleFunc::errsim_choose_medium_snapshot( } #endif +int ObMediumCompactionScheduleFunc::check_if_schema_changed(ObMediumCompactionInfo &medium_info) +{ + int ret = OB_SUCCESS; + bool is_schema_changed = false; + if (OB_FAIL(check_if_schema_changed(*tablet_handle_.get_obj(), medium_info.storage_schema_, is_schema_changed))) { + LOG_WARN("failed to get check if schema changed", K(ret), K(medium_info)); +#ifdef ERRSIM + } else if (OB_UNLIKELY(EN_COMPACTION_SKIP_INIT_SCHEMA_CHANGED)) { + bool is_progressive_merge = false; + medium_info.is_schema_changed_ = false; + medium_info.storage_schema_.progressive_merge_round_ = 1; + FLOG_INFO("ERRSIM EN_COMPACTION_SKIP_INIT_SCHEMA_CHANGED", KPC(this), K(is_schema_changed), K(is_progressive_merge), + K(medium_info)); +#endif + } else { + medium_info.is_schema_changed_ = is_schema_changed; + } + return ret; +} + int ObMediumCompactionScheduleFunc::check_if_schema_changed( - ObMediumCompactionInfo &medium_info) + const ObTablet &tablet, + const ObStorageSchema &schema, + bool &is_schema_changed) { int ret = OB_SUCCESS; int64_t full_stored_col_cnt = 0; - const ObStorageSchema &schema = medium_info.storage_schema_; if (OB_UNLIKELY(!schema.is_inited())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("schema is not inited", KR(ret), K(schema)); } else if (OB_FAIL(schema.get_stored_column_count_in_sstable(full_stored_col_cnt))) { LOG_WARN("failed to get stored column count in sstable", K(ret), K(schema)); - } else if (OB_UNLIKELY(tablet_handle_.get_obj()->get_last_major_column_count() > full_stored_col_cnt)) { + } else if (OB_UNLIKELY(tablet.get_last_major_column_count() > full_stored_col_cnt)) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("stored col cnt in curr schema is less than old major sstable", K(ret), - "col_cnt_in_sstable", tablet_handle_.get_obj()->get_last_major_column_count(), - "col_cnt_in_schema", full_stored_col_cnt, KPC(this)); - } else if (medium_info.data_version_ >= DATA_VERSION_4_3_3_0) { - // do not check schema changed - medium_info.is_schema_changed_ = false; - } else if (tablet_handle_.get_obj()->get_last_major_column_count() != full_stored_col_cnt - || tablet_handle_.get_obj()->get_last_major_compressor_type() != schema.get_compressor_type() - || (ObRowStoreType::DUMMY_ROW_STORE != tablet_handle_.get_obj()->get_last_major_latest_row_store_type() - && tablet_handle_.get_obj()->get_last_major_latest_row_store_type() != schema.row_store_type_)) { - medium_info.is_schema_changed_ = true; - LOG_INFO("schema changed", KPC(this), K(schema), K(full_stored_col_cnt), - "col_cnt_in_sstable", tablet_handle_.get_obj()->get_last_major_column_count(), - "compressor_type_in_sstable", tablet_handle_.get_obj()->get_last_major_compressor_type(), - "latest_row_store_type_in_sstable", tablet_handle_.get_obj()->get_last_major_latest_row_store_type()); + "col_cnt_in_sstable", tablet.get_last_major_column_count(), + "col_cnt_in_schema", full_stored_col_cnt, K(schema)); + } else if (tablet.get_last_major_column_count() != full_stored_col_cnt + || tablet.get_last_major_compressor_type() != schema.get_compressor_type() + || (ObRowStoreType::DUMMY_ROW_STORE != tablet.get_last_major_latest_row_store_type() + && tablet.get_last_major_latest_row_store_type() != schema.row_store_type_)) { + is_schema_changed = true; + LOG_INFO("schema changed", K(schema), K(schema), K(full_stored_col_cnt), + "col_cnt_in_sstable", tablet.get_last_major_column_count(), + "compressor_type_in_sstable", tablet.get_last_major_compressor_type(), + "latest_row_store_type_in_sstable", tablet.get_last_major_latest_row_store_type()); } else { - medium_info.is_schema_changed_ = false; + is_schema_changed = false; } return ret; } diff --git a/src/storage/compaction/ob_medium_compaction_func.h b/src/storage/compaction/ob_medium_compaction_func.h index 7ac268fe2..ddd5b2a8e 100644 --- a/src/storage/compaction/ob_medium_compaction_func.h +++ b/src/storage/compaction/ob_medium_compaction_func.h @@ -91,6 +91,10 @@ public: const ObTabletID &tablet_id, const int64_t schema_version, uint64_t &table_id); + static int check_if_schema_changed( + const ObTablet &tablet, + const ObStorageSchema &storage_schema, + bool &is_schema_changed); #ifdef OB_BUILD_SHARED_STORAGE // medium compaction is not considered int prepare_ls_major_merge_info( @@ -106,7 +110,6 @@ public: const storage::ObStorageSchema &storage_schema, bool &is_progressive_merge); #endif - int64_t to_string(char* buf, const int64_t buf_len) const; protected: int decide_medium_snapshot(bool &medium_clog_submitted); diff --git a/src/storage/compaction/ob_schedule_status_cache.cpp b/src/storage/compaction/ob_schedule_status_cache.cpp index 5b6dc28f2..4fe41ee14 100644 --- a/src/storage/compaction/ob_schedule_status_cache.cpp +++ b/src/storage/compaction/ob_schedule_status_cache.cpp @@ -150,6 +150,7 @@ const static char * ObTabletScheduleNewRoundStateStr[] = { "DURING_SPLIT", "NEED_CHECK_LAST_MEDIUM_CKM", "EXIST_UNFINISH_MEDIUM", + "SCHEDULE_CONFLICT", "NONE", "STATE_MAX" }; @@ -367,7 +368,12 @@ int ObTabletStatusCache::register_map( bool could_schedule_merge = false; if (OB_FAIL(MTL(ObTenantTabletScheduler*)->tablet_start_schedule_medium( tablet_id, could_schedule_merge))) { - LOG_WARN("failed to add tablet", K(ret), K(tablet_id)); + if (OB_ENTRY_EXIST == ret) { + new_round_state_ = SCHEDULE_CONFLICT; + ret = OB_SUCCESS; + } else { + LOG_WARN("failed to add tablet", K(ret), K(tablet_id)); + } } else if (could_schedule_merge) { new_round_state_ = CAN_SCHEDULE_NEW_ROUND; } else { diff --git a/src/storage/compaction/ob_schedule_status_cache.h b/src/storage/compaction/ob_schedule_status_cache.h index 351a4d706..966621a23 100644 --- a/src/storage/compaction/ob_schedule_status_cache.h +++ b/src/storage/compaction/ob_schedule_status_cache.h @@ -93,6 +93,7 @@ public: DURING_SPLIT, NEED_CHECK_LAST_MEDIUM_CKM, EXIST_UNFINISH_MEDIUM, + SCHEDULE_CONFLICT, DIAGNOSE_NORMAL, // for diagnose NEW_ROUND_STATE_MAX, }; @@ -131,6 +132,8 @@ public: bool need_diagnose() const; bool could_schedule_new_round() const { return can_merge() && inner_check_new_round_state(); } bool tablet_merge_finish() const { return tablet_merge_finish_; } + + // CAREFUL! medium list may be NULL for some situation const compaction::ObMediumCompactionInfoList *medium_list() const { return medium_list_; } TabletExecuteState get_execute_state() const { return execute_state_; } TabletScheduleNewRoundState get_new_round_state() const { return new_round_state_; } diff --git a/src/storage/compaction/ob_tablet_merge_ctx.cpp b/src/storage/compaction/ob_tablet_merge_ctx.cpp index fcbf255c6..d5ba08c48 100644 --- a/src/storage/compaction/ob_tablet_merge_ctx.cpp +++ b/src/storage/compaction/ob_tablet_merge_ctx.cpp @@ -549,7 +549,6 @@ int ObTabletMajorMergeCtx::prepare_schema() return ret; } - #ifdef OB_BUILD_SHARED_STORAGE /* * ----------------------------------------------ObSSMergeCtx-------------------------------------------------- diff --git a/src/storage/compaction/ob_tablet_merge_ctx.h b/src/storage/compaction/ob_tablet_merge_ctx.h index 722c76484..2c5eb3979 100644 --- a/src/storage/compaction/ob_tablet_merge_ctx.h +++ b/src/storage/compaction/ob_tablet_merge_ctx.h @@ -106,8 +106,8 @@ protected: virtual int try_swap_tablet(ObGetMergeTablesResult &get_merge_table_result) override { return ObBasicTabletMergeCtx::swap_tablet(get_merge_table_result); } virtual int cal_merge_param() override { - return static_param_.cal_major_merge_param(false /*force_full_merge*/, - progressive_merge_mgr_); + return ObBasicTabletMergeCtx::cal_major_merge_param( + false /*force_full_merge*/, progressive_merge_mgr_); } }; diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp index cd2823322..8bbce5d30 100644 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp @@ -754,7 +754,7 @@ int ObProhibitScheduleMediumMap::add_flag(const ObTabletID &tablet_id, const Pro LOG_INFO("flag in conflict", K(ret), K(tablet_id), K(tmp_flag), K(input_flag)); } } else { // tmp_flag == input_flag - ret = OB_ERR_UNEXPECTED; + ret = OB_ENTRY_EXIST; LOG_WARN("flag in already exist", K(ret), K(tablet_id), K(tmp_flag), K(input_flag)); } } @@ -950,7 +950,7 @@ int ObTenantTabletScheduler::tablet_start_schedule_medium(const ObTabletID &tabl if (OB_EAGAIN == ret) { tablet_could_schedule_medium = false; ret = OB_SUCCESS; - } else { + } else if (OB_ENTRY_EXIST != ret) { LOG_WARN("failed to add flag for tablet schedule medium", K(ret), K(tablet_id)); } } else { diff --git a/unittest/storage/test_major_rows_merger.cpp b/unittest/storage/test_major_rows_merger.cpp index b4c4407a1..85f142dda 100644 --- a/unittest/storage/test_major_rows_merger.cpp +++ b/unittest/storage/test_major_rows_merger.cpp @@ -30,6 +30,20 @@ using namespace share::schema; using namespace blocksstable; using namespace compaction; using namespace unittest; +namespace compaction +{ +int ObBasicTabletMergeCtx::cal_major_merge_param( + const bool force_full_merge, + ObProgressiveMergeMgr &progressive_mgr) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(static_param_.cal_major_merge_param(force_full_merge, + progressive_mgr))) { + LOG_WARN("failed to calc major param", KR(ret), K_(static_param)); + } + return ret; +} +} // namespace compaction namespace storage { namespace mds