use small batch size to fetch checksum
This commit is contained in:
		| @ -2448,12 +2448,13 @@ int ObService::inner_fill_tablet_info_( | ||||
|       LOG_WARN("fail to init a tablet replica", KR(ret), K(tenant_id), | ||||
|           K(tablet_id), K(tablet_replica)); | ||||
|     } else if (!need_checksum) { | ||||
|     } else if (OB_FAIL(tablet_checksum.set_tenant_id(tenant_id))) { | ||||
|       LOG_WARN("failed to set tenant id", KR(ret), K(tenant_id)); | ||||
|     } else if (OB_FAIL(tablet_checksum.column_meta_.init(column_checksums))) { | ||||
|       LOG_WARN("fail to init report column meta with column_checksums", KR(ret), K(column_checksums)); | ||||
|     } else if (OB_FAIL(tablet_checksum.compaction_scn_.convert_for_tx(snapshot_version))) { | ||||
|       LOG_WARN("failed to convert scn", KR(ret), K(snapshot_version)); | ||||
|     } else { | ||||
|       tablet_checksum.tenant_id_ = tenant_id; | ||||
|       tablet_checksum.ls_id_ = ls->get_ls_id(); | ||||
|       tablet_checksum.tablet_id_ = tablet_id; | ||||
|       tablet_checksum.server_ = gctx_.self_addr(); | ||||
|  | ||||
| @ -17,12 +17,17 @@ namespace compaction | ||||
| { | ||||
| void ObScheduleBatchSizeMgr::set_tablet_batch_size(const int64_t tablet_batch_size) | ||||
| { | ||||
|   if (tablet_batch_size != tablet_batch_size_) { | ||||
|   if (tablet_batch_size != tablet_batch_size_ && tablet_batch_size > 0) { | ||||
|     LOG_INFO("succeeded to reload new merge schedule tablet batch cnt", K(tablet_batch_size)); | ||||
|     tablet_batch_size_ = tablet_batch_size; | ||||
|   } | ||||
| } | ||||
|  | ||||
| int64_t ObScheduleBatchSizeMgr::get_checker_batch_size() const | ||||
| { | ||||
|   // Checker batch size: one hundredth of the cached tablet batch size, | ||||
|   // but never smaller than DEFAULT_CHECKER_BATCH_SIZE. | ||||
|   const int64_t scaled_batch_size = tablet_batch_size_ / 100; | ||||
|   return MAX(DEFAULT_CHECKER_BATCH_SIZE, scaled_batch_size); | ||||
| } | ||||
|  | ||||
| void ObScheduleBatchSizeMgr::get_rs_check_batch_size( | ||||
|     const int64_t table_cnt, | ||||
|     int64_t &table_id_batch_size) const | ||||
|  | ||||
| @ -23,6 +23,7 @@ struct ObScheduleBatchSizeMgr | ||||
|   ~ObScheduleBatchSizeMgr() {} | ||||
|   void set_tablet_batch_size(const int64_t tablet_batch_size); | ||||
|   int64_t get_schedule_batch_size() const { return tablet_batch_size_; } | ||||
|   int64_t get_checker_batch_size() const; | ||||
|   void get_rs_check_batch_size( | ||||
|     const int64_t table_cnt, | ||||
|     int64_t &table_id_batch_size) const; | ||||
| @ -37,6 +38,8 @@ private: | ||||
|   const static int64_t TABLE_ID_BATCH_CHECK_SIZE = 200; | ||||
|   const static int64_t TOTAL_TABLE_CNT_THREASHOLD = 100 * 1000; // 10w | ||||
|   const static int64_t DEFAULT_INNER_TABLE_SCAN_BATCH_SIZE = 500; | ||||
|   const static int64_t DEFAULT_CHECKER_BATCH_SIZE = 200; | ||||
|   // cached compaction_schedule_tablet_batch_cnt: [10000,200000] | ||||
|   int64_t tablet_batch_size_; | ||||
| }; | ||||
|  | ||||
|  | ||||
| @ -331,10 +331,11 @@ int ObTabletReplicaChecksumItem::assign(const ObTabletReplicaChecksumItem &other | ||||
|   int ret = OB_SUCCESS; | ||||
|   if (this != &other) { | ||||
|     reset(); | ||||
|     if (OB_FAIL(column_meta_.assign(other.column_meta_))) { | ||||
|     if (OB_FAIL(set_tenant_id(other.tenant_id_))) { | ||||
|       LOG_WARN("failed to set tenant id", KR(ret), K(other)); | ||||
|     } else if (OB_FAIL(column_meta_.assign(other.column_meta_))) { | ||||
|       LOG_WARN("fail to assign column meta", KR(ret), K(other)); | ||||
|     } else { | ||||
|       tenant_id_ = other.tenant_id_; | ||||
|       tablet_id_ = other.tablet_id_; | ||||
|       ls_id_ = other.ls_id_; | ||||
|       server_ = other.server_; | ||||
| @ -346,6 +347,19 @@ int ObTabletReplicaChecksumItem::assign(const ObTabletReplicaChecksumItem &other | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
| int ObTabletReplicaChecksumItem::set_tenant_id(const uint64_t tenant_id) | ||||
| { | ||||
|   // Stores tenant_id in this item and sets the attr of column_meta_.column_checksums_ | ||||
|   // to ObMemAttr(tenant_id, "RepCkmItem"). | ||||
|   // Returns OB_INVALID_ARGUMENT, leaving the item unmodified, when tenant_id is not valid. | ||||
|   int ret = OB_SUCCESS; | ||||
|   const bool is_invalid_tenant = !is_valid_tenant_id(tenant_id); | ||||
|   if (OB_UNLIKELY(is_invalid_tenant)) { | ||||
|     ret = OB_INVALID_ARGUMENT; | ||||
|     LOG_WARN("invalid tenant id", KR(ret), K(tenant_id)); | ||||
|   } else { | ||||
|     const ObMemAttr checksum_attr(tenant_id, "RepCkmItem"); | ||||
|     tenant_id_ = tenant_id; | ||||
|     column_meta_.column_checksums_.set_attr(checksum_attr); | ||||
|   } | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
| /****************************** ObTabletReplicaChecksumOperator ******************************/ | ||||
|  | ||||
| int ObTabletReplicaChecksumOperator::batch_remove_with_trans( | ||||
| @ -651,8 +665,9 @@ int ObTabletReplicaChecksumOperator::construct_tablet_replica_checksum_item_( | ||||
|  | ||||
|   if (OB_FAIL(item.compaction_scn_.convert_for_inner_table_field(compaction_scn_val))) { | ||||
|     LOG_WARN("fail to convert val to SCN", KR(ret), K(compaction_scn_val)); | ||||
|   } else if (OB_FAIL(item.set_tenant_id((uint64_t)int_tenant_id))) { | ||||
|     LOG_WARN("failed to set tenant id", KR(ret), K(int_tenant_id)); | ||||
|   } else { | ||||
|     item.tenant_id_ = (uint64_t)int_tenant_id; | ||||
|     item.tablet_id_ = (uint64_t)int_tablet_id; | ||||
|     item.ls_id_ = ls_id; | ||||
|     if (OB_UNLIKELY(!item.server_.set_ip_addr(ip, static_cast<int32_t>(port)))) { | ||||
|  | ||||
| @ -85,6 +85,7 @@ public: | ||||
|   int verify_checksum(const ObTabletReplicaChecksumItem &other) const; | ||||
|   int assign_key(const ObTabletReplicaChecksumItem &other); | ||||
|   int assign(const ObTabletReplicaChecksumItem &other); | ||||
|   int set_tenant_id(const uint64_t tenant_id); | ||||
|  | ||||
|   TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(tablet_id), K_(server), K_(row_count), | ||||
|       K_(compaction_scn), K_(data_checksum), K_(column_meta)); | ||||
|  | ||||
| @ -990,7 +990,7 @@ int ObCompactionDiagnoseMgr::check_ls_status( | ||||
|     ret = OB_ERR_UNEXPECTED; | ||||
|     LOG_WARN("ls is null", K(ret), K(ls_id)); | ||||
|   } else if (!ls_hash_exist) { | ||||
|     if (OB_TMP_FAIL(ObTenantTabletScheduler::check_ls_state(*ls, need_merge))) { | ||||
|     if (OB_TMP_FAIL(ObTabletMergeChecker::check_ls_state(*ls, need_merge))) { | ||||
|       LOG_WARN("failed to check ls state", K(tmp_ret), KPC(ls), K(need_merge)); | ||||
|     } else if (need_merge) { | ||||
|       weak_read_ts_ready = ObTenantTabletScheduler::check_weak_read_ts_ready(compaction_scn, *ls); | ||||
|  | ||||
| @ -1202,18 +1202,16 @@ int ObMediumCompactionScheduleFunc::batch_check_medium_finish( | ||||
|     checksum_items.set_attr(ObMemAttr(MTL_ID(), "CkmItems")); | ||||
|     if (OB_FAIL(batch_check_medium_meta_table(tablet_ls_infos, ls_info_map, finish_tablet_ls_infos, time_guard))) { | ||||
|       LOG_WARN("failed to check inner table", K(ret), K(tablet_ls_infos)); | ||||
|     } else { | ||||
|       if (OB_SUCC(ret) && !finish_tablet_ls_infos.empty()) { | ||||
|         if (OB_FAIL(checksum_items.reserve(finish_tablet_ls_infos.count()))) { | ||||
|           LOG_WARN("failed to reserve array", KR(ret), "array_cnt", finish_tablet_ls_infos.count()); | ||||
|         } else if (OB_FAIL(ObTabletReplicaChecksumOperator::get_tablets_replica_checksum( | ||||
|             MTL_ID(), finish_tablet_ls_infos, checksum_items))) { | ||||
|           LOG_WARN("failed to get tablet checksum", K(ret)); | ||||
|         } else if (FALSE_IT(time_guard.click(ObCompactionScheduleTimeGuard::SEARCH_CHECKSUM))) { | ||||
|         } else if (OB_FAIL(batch_check_medium_checksum(finish_tablet_ls_infos, checksum_items))) { | ||||
|           LOG_WARN("failed to check medium tablets checksum", K(ret)); | ||||
|         } else if (FALSE_IT(time_guard.click(ObCompactionScheduleTimeGuard::CHECK_CHECKSUM))) { | ||||
|         } | ||||
|     } else if (!finish_tablet_ls_infos.empty()) { | ||||
|       if (OB_FAIL(checksum_items.reserve(finish_tablet_ls_infos.count()))) { | ||||
|         LOG_WARN("failed to reserve array", KR(ret), "array_cnt", finish_tablet_ls_infos.count()); | ||||
|       } else if (OB_FAIL(ObTabletReplicaChecksumOperator::get_tablets_replica_checksum( | ||||
|           MTL_ID(), finish_tablet_ls_infos, checksum_items))) { | ||||
|         LOG_WARN("failed to get tablet checksum", K(ret)); | ||||
|       } else if (FALSE_IT(time_guard.click(ObCompactionScheduleTimeGuard::SEARCH_CHECKSUM))) { | ||||
|       } else if (OB_FAIL(batch_check_medium_checksum(finish_tablet_ls_infos, checksum_items))) { | ||||
|         LOG_WARN("failed to check medium tablets checksum", K(ret)); | ||||
|       } else if (FALSE_IT(time_guard.click(ObCompactionScheduleTimeGuard::CHECK_CHECKSUM))) { | ||||
|       } | ||||
|     } | ||||
|   } | ||||
|  | ||||
| @ -15,6 +15,7 @@ | ||||
| #include "lib/ob_errno.h" | ||||
| #include "storage/compaction/ob_compaction_util.h" | ||||
| #include "storage/tablet/ob_tablet.h" | ||||
| #include "storage/ls/ob_ls.h" | ||||
|  | ||||
| #define USING_LOG_PREFIX STORAGE_COMPACTION | ||||
|  | ||||
| @ -80,5 +81,45 @@ int ObTabletMergeChecker::check_could_merge_for_medium( | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
| int ObTabletMergeChecker::check_ls_state(ObLS &ls, bool &need_merge) | ||||
| { | ||||
|   // Reports through need_merge whether the ls is neither deleted nor offline. | ||||
|   // The return code is always OB_SUCCESS; skipped ls are only logged (rate limited). | ||||
|   int ret = OB_SUCCESS; | ||||
|   const bool ls_deleted = ls.is_deleted(); | ||||
|   // is_offline() is only consulted when the ls is not deleted | ||||
|   const bool ls_offline = !ls_deleted && ls.is_offline(); | ||||
|   need_merge = !(ls_deleted || ls_offline); | ||||
|   if (ls_deleted) { | ||||
|     if (REACH_TENANT_TIME_INTERVAL(PRINT_LOG_INVERVAL)) { | ||||
|       LOG_INFO("ls is deleted", K(ret), K(ls)); | ||||
|     } | ||||
|   } else if (ls_offline) { | ||||
|     if (REACH_TENANT_TIME_INTERVAL(PRINT_LOG_INVERVAL)) { | ||||
|       LOG_INFO("ls is offline", K(ret), K(ls)); | ||||
|     } | ||||
|   } | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
| int ObTabletMergeChecker::check_ls_state_in_major(ObLS &ls, bool &need_merge) | ||||
| { | ||||
|   // Decides whether major/medium merge may be scheduled on this ls. | ||||
|   // need_merge becomes true only when check_ls_state() accepts the ls AND its | ||||
|   // restore status is "none"; on every other path (including errors) it is false. | ||||
|   int ret = OB_SUCCESS; | ||||
|   need_merge = false; | ||||
|   // Keep the basic state check in a local flag: writing it straight into need_merge | ||||
|   // would leave need_merge == true for an ls that is still in restore. | ||||
|   bool ls_state_ok = false; | ||||
|   ObLSRestoreStatus restore_status; | ||||
|   if (OB_FAIL(check_ls_state(ls, ls_state_ok))) { | ||||
|     LOG_WARN("failed to check ls state", KR(ret), "ls_id", ls.get_ls_id()); | ||||
|   } else if (!ls_state_ok) { | ||||
|     // ls is deleted or offline, do nothing | ||||
|   } else if (OB_FAIL(ls.get_ls_meta().get_restore_status(restore_status))) { | ||||
|     LOG_WARN("failed to get restore status", K(ret), K(ls)); | ||||
|   } else if (OB_UNLIKELY(!restore_status.is_restore_none())) { | ||||
|     // need_merge stays false: tablets of a restoring ls must not be scheduled | ||||
|     if (REACH_TENANT_TIME_INTERVAL(PRINT_LOG_INVERVAL)) { | ||||
|       LOG_INFO("ls is in restore status, should not loop tablet to schedule", K(ret), "ls_id", ls.get_ls_id()); | ||||
|     } | ||||
|   } else { | ||||
|     need_merge = true; | ||||
|   } | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
|  | ||||
| } // namespace compaction | ||||
| } // namespace oceanbase | ||||
|  | ||||
| @ -21,6 +21,7 @@ namespace oceanbase | ||||
| namespace storage | ||||
| { | ||||
| class ObTablet; | ||||
| class ObLS; | ||||
| } | ||||
|  | ||||
| namespace compaction | ||||
| @ -32,6 +33,8 @@ public: | ||||
|   static int check_could_merge_for_medium( | ||||
|     const storage::ObTablet &tablet, | ||||
|     bool &could_schedule_merge); | ||||
|   static int check_ls_state(storage::ObLS &ls, bool &need_merge); | ||||
|   static int check_ls_state_in_major(storage::ObLS &ls, bool &need_merge); | ||||
| private: | ||||
|   static const int64_t PRINT_LOG_INVERVAL = 2 * 60 * 1000 * 1000L; // 2m | ||||
| }; | ||||
|  | ||||
| @ -252,7 +252,7 @@ int ObTenantMediumChecker::check_medium_finish_schedule() | ||||
|         tablet_ls_set_.clear(); | ||||
|       } | ||||
|     } | ||||
|     const int64_t batch_size = MTL(ObTenantTabletScheduler *)->get_schedule_batch_size(); | ||||
|     const int64_t batch_size = MTL(ObTenantTabletScheduler *)->get_checker_batch_size(); | ||||
|     if (OB_FAIL(ret) || tablet_ls_infos.empty()) { | ||||
|     } else if (OB_FAIL(batch_tablet_ls_infos.reserve(batch_size))) { | ||||
|       LOG_WARN("fail to reserve array", K(ret), "size", batch_size); | ||||
|  | ||||
| @ -796,45 +796,6 @@ bool ObTenantTabletScheduler::check_tx_table_ready(ObLS &ls, const SCN &check_sc | ||||
|   return tx_table_ready; | ||||
| } | ||||
|  | ||||
| int ObTenantTabletScheduler::check_ls_state(ObLS &ls, bool &need_merge) | ||||
| { | ||||
|   int ret = OB_SUCCESS; | ||||
|   need_merge = false; | ||||
|   if (ls.is_deleted()) { | ||||
|     if (REACH_TENANT_TIME_INTERVAL(PRINT_LOG_INVERVAL)) { | ||||
|       LOG_INFO("ls is deleted", K(ret), K(ls)); | ||||
|     } | ||||
|   } else if (ls.is_offline()) { | ||||
|     if (REACH_TENANT_TIME_INTERVAL(PRINT_LOG_INVERVAL)) { | ||||
|       LOG_INFO("ls is offline", K(ret), K(ls)); | ||||
|     } | ||||
|   } else { | ||||
|     need_merge = true; | ||||
|   } | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
| int ObTenantTabletScheduler::check_ls_state_in_major(ObLS &ls, bool &need_merge) | ||||
| { | ||||
|   int ret = OB_SUCCESS; | ||||
|   need_merge = false; | ||||
|   ObLSRestoreStatus restore_status; | ||||
|   if (OB_FAIL(check_ls_state(ls, need_merge))) { | ||||
|     LOG_WARN("failed to check ls state", KR(ret), "ls_id", ls.get_ls_id()); | ||||
|   } else if (!need_merge) { | ||||
|     // do nothing | ||||
|   } else if (OB_FAIL(ls.get_ls_meta().get_restore_status(restore_status))) { | ||||
|     LOG_WARN("failed to get restore status", K(ret), K(ls)); | ||||
|   } else if (OB_UNLIKELY(!restore_status.is_restore_none())) { | ||||
|     if (REACH_TENANT_TIME_INTERVAL(PRINT_LOG_INVERVAL)) { | ||||
|       LOG_INFO("ls is in restore status, should not loop tablet to schedule", K(ret), "ls_id", ls.get_ls_id()); | ||||
|     } | ||||
|   } else { | ||||
|     need_merge = true; | ||||
|   } | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
| int ObTenantTabletScheduler::schedule_merge_dag( | ||||
|     const ObLSID &ls_id, | ||||
|     const storage::ObTablet &tablet, | ||||
| @ -1156,7 +1117,7 @@ int ObTenantTabletScheduler::schedule_ls_minor_merge( | ||||
|   bool need_fast_freeze = false; | ||||
|   ObLS &ls = *ls_handle.get_ls(); | ||||
|   const ObLSID &ls_id = ls.get_ls_id(); | ||||
|   if (OB_FAIL(check_ls_state(ls, need_merge))) { | ||||
|   if (OB_FAIL(ObTabletMergeChecker::check_ls_state(ls, need_merge))) { | ||||
|     LOG_WARN("failed to check ls state", K(ret), K(ls)); | ||||
|   } else if (!need_merge) { | ||||
|     // no need to merge, do nothing | ||||
| @ -1386,7 +1347,7 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge( | ||||
|   ObLS &ls = *ls_handle.get_ls(); | ||||
|   const ObLSID &ls_id = ls.get_ls_id(); | ||||
|   bool ls_could_schedule_medium = false; | ||||
|   if (OB_FAIL(check_ls_state_in_major(ls, need_merge))) { | ||||
|   if (OB_FAIL(ObTabletMergeChecker::check_ls_state_in_major(ls, need_merge))) { | ||||
|     LOG_WARN("failed to check ls can schedule medium", K(ret), K(ls)); | ||||
|   } else if (!need_merge) { | ||||
|     // no need to merge, do nothing // TODO(@jingshui): add diagnose info | ||||
| @ -1822,7 +1783,7 @@ int ObTenantTabletScheduler::try_schedule_tablet_medium_merge( | ||||
|     LOG_WARN("major compaction is suspended", K(ret), K(ls_id), K(tablet_id)); | ||||
|   } else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) { | ||||
|     LOG_WARN("failed to get ls", K(ret), K(ls_id)); | ||||
|   } else if (OB_FAIL(check_ls_state_in_major(*ls_handle.get_ls(), can_merge))) { | ||||
|   } else if (OB_FAIL(ObTabletMergeChecker::check_ls_state_in_major(*ls_handle.get_ls(), can_merge))) { | ||||
|     LOG_WARN("failed to check ls can schedule medium", K(ret), K(ls_handle)); | ||||
|   } else if (!can_merge) { | ||||
|     // can't merge, do nothing | ||||
| @ -1883,7 +1844,7 @@ int ObTenantTabletScheduler::update_report_scn_as_ls_leader(ObLS &ls) | ||||
|   bool is_election_leader = false; | ||||
|   const int64_t major_merged_scn = get_inner_table_merged_scn(); | ||||
|   bool need_merge = false; | ||||
|   if (OB_FAIL(check_ls_state(ls, need_merge))) { | ||||
|   if (OB_FAIL(ObTabletMergeChecker::check_ls_state(ls, need_merge))) { | ||||
|     LOG_WARN("failed to check ls state", K(ret), K(ls_id)); | ||||
|   } else if (!need_merge) { | ||||
|     ret = OB_STATE_NOT_MATCH; // do nothing | ||||
|  | ||||
| @ -198,7 +198,6 @@ public: | ||||
|       const blocksstable::MacroBlockId &macro_id, | ||||
|       const int64_t prefix_len); | ||||
|   static bool check_tx_table_ready(ObLS &ls, const share::SCN &check_scn); | ||||
|   static int check_ls_state(ObLS &ls, bool &need_merge); | ||||
|   static int fill_minor_compaction_param( | ||||
|       const ObTabletHandle &tablet_handle, | ||||
|       const ObGetMergeTablesResult &result, | ||||
| @ -206,7 +205,6 @@ public: | ||||
|       const int64_t parallel_dag_cnt, | ||||
|       const int64_t create_time, | ||||
|       compaction::ObTabletMergeDagParam &param); | ||||
|   static int check_ls_state_in_major(ObLS &ls, bool &need_merge); | ||||
|   template <class T> | ||||
|   static int schedule_tablet_minor_merge( | ||||
|       ObLSHandle &ls_handle, | ||||
| @ -244,6 +242,7 @@ public: | ||||
|     const ObIArray<compaction::ObTabletCheckInfo> &tablet_ls_infos, | ||||
|     const ObIArray<compaction::ObTabletCheckInfo> &finish_tablet_ls_infos); | ||||
|   OB_INLINE int64_t get_schedule_batch_size() const { return batch_size_mgr_.get_schedule_batch_size(); } | ||||
|   OB_INLINE int64_t get_checker_batch_size() const { return batch_size_mgr_.get_checker_batch_size(); } | ||||
| private: | ||||
|   friend struct ObTenantTabletSchedulerTaskMgr; | ||||
|   int schedule_next_medium_for_leader( | ||||
|  | ||||
		Reference in New Issue
	
	Block a user
	 obdev
					obdev