use small batch size to fetch checksum
This commit is contained in:
parent
8187d25c4f
commit
d8881da113
@ -2448,12 +2448,13 @@ int ObService::inner_fill_tablet_info_(
|
||||
LOG_WARN("fail to init a tablet replica", KR(ret), K(tenant_id),
|
||||
K(tablet_id), K(tablet_replica));
|
||||
} else if (!need_checksum) {
|
||||
} else if (OB_FAIL(tablet_checksum.set_tenant_id(tenant_id))) {
|
||||
LOG_WARN("failed to set tenant id", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(tablet_checksum.column_meta_.init(column_checksums))) {
|
||||
LOG_WARN("fail to init report column meta with column_checksums", KR(ret), K(column_checksums));
|
||||
} else if (OB_FAIL(tablet_checksum.compaction_scn_.convert_for_tx(snapshot_version))) {
|
||||
LOG_WARN("failed to convert scn", KR(ret), K(snapshot_version));
|
||||
} else {
|
||||
tablet_checksum.tenant_id_ = tenant_id;
|
||||
tablet_checksum.ls_id_ = ls->get_ls_id();
|
||||
tablet_checksum.tablet_id_ = tablet_id;
|
||||
tablet_checksum.server_ = gctx_.self_addr();
|
||||
|
@ -17,12 +17,17 @@ namespace compaction
|
||||
{
|
||||
void ObScheduleBatchSizeMgr::set_tablet_batch_size(const int64_t tablet_batch_size)
|
||||
{
|
||||
if (tablet_batch_size != tablet_batch_size_) {
|
||||
if (tablet_batch_size != tablet_batch_size_ && tablet_batch_size > 0) {
|
||||
LOG_INFO("succeeded to reload new merge schedule tablet batch cnt", K(tablet_batch_size));
|
||||
tablet_batch_size_ = tablet_batch_size;
|
||||
}
|
||||
}
|
||||
|
||||
int64_t ObScheduleBatchSizeMgr::get_checker_batch_size() const
|
||||
{
|
||||
return MAX(DEFAULT_CHECKER_BATCH_SIZE, tablet_batch_size_ / 100);
|
||||
}
|
||||
|
||||
void ObScheduleBatchSizeMgr::get_rs_check_batch_size(
|
||||
const int64_t table_cnt,
|
||||
int64_t &table_id_batch_size) const
|
||||
|
@ -23,6 +23,7 @@ struct ObScheduleBatchSizeMgr
|
||||
~ObScheduleBatchSizeMgr() {}
|
||||
void set_tablet_batch_size(const int64_t tablet_batch_size);
|
||||
int64_t get_schedule_batch_size() const { return tablet_batch_size_; }
|
||||
int64_t get_checker_batch_size() const;
|
||||
void get_rs_check_batch_size(
|
||||
const int64_t table_cnt,
|
||||
int64_t &table_id_batch_size) const;
|
||||
@ -37,6 +38,8 @@ private:
|
||||
const static int64_t TABLE_ID_BATCH_CHECK_SIZE = 200;
|
||||
const static int64_t TOTAL_TABLE_CNT_THREASHOLD = 100 * 1000; // 10w
|
||||
const static int64_t DEFAULT_INNER_TABLE_SCAN_BATCH_SIZE = 500;
|
||||
const static int64_t DEFAULT_CHECKER_BATCH_SIZE = 200;
|
||||
// cached compaction_schedule_tablet_batch_cnt: [10000,200000]
|
||||
int64_t tablet_batch_size_;
|
||||
};
|
||||
|
||||
|
@ -331,10 +331,11 @@ int ObTabletReplicaChecksumItem::assign(const ObTabletReplicaChecksumItem &other
|
||||
int ret = OB_SUCCESS;
|
||||
if (this != &other) {
|
||||
reset();
|
||||
if (OB_FAIL(column_meta_.assign(other.column_meta_))) {
|
||||
if (OB_FAIL(set_tenant_id(other.tenant_id_))) {
|
||||
LOG_WARN("failed to set tenant id", KR(ret), K(other));
|
||||
} else if (OB_FAIL(column_meta_.assign(other.column_meta_))) {
|
||||
LOG_WARN("fail to assign column meta", KR(ret), K(other));
|
||||
} else {
|
||||
tenant_id_ = other.tenant_id_;
|
||||
tablet_id_ = other.tablet_id_;
|
||||
ls_id_ = other.ls_id_;
|
||||
server_ = other.server_;
|
||||
@ -346,6 +347,19 @@ int ObTabletReplicaChecksumItem::assign(const ObTabletReplicaChecksumItem &other
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletReplicaChecksumItem::set_tenant_id(const uint64_t tenant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid tenant id", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
tenant_id_ = tenant_id;
|
||||
column_meta_.column_checksums_.set_attr(ObMemAttr(tenant_id, "RepCkmItem"));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/****************************** ObTabletReplicaChecksumOperator ******************************/
|
||||
|
||||
int ObTabletReplicaChecksumOperator::batch_remove_with_trans(
|
||||
@ -651,8 +665,9 @@ int ObTabletReplicaChecksumOperator::construct_tablet_replica_checksum_item_(
|
||||
|
||||
if (OB_FAIL(item.compaction_scn_.convert_for_inner_table_field(compaction_scn_val))) {
|
||||
LOG_WARN("fail to convert val to SCN", KR(ret), K(compaction_scn_val));
|
||||
} else if (OB_FAIL(item.set_tenant_id((uint64_t)int_tenant_id))) {
|
||||
LOG_WARN("failed to set tenant id", KR(ret), K(int_tenant_id));
|
||||
} else {
|
||||
item.tenant_id_ = (uint64_t)int_tenant_id;
|
||||
item.tablet_id_ = (uint64_t)int_tablet_id;
|
||||
item.ls_id_ = ls_id;
|
||||
if (OB_UNLIKELY(!item.server_.set_ip_addr(ip, static_cast<int32_t>(port)))) {
|
||||
|
@ -85,6 +85,7 @@ public:
|
||||
int verify_checksum(const ObTabletReplicaChecksumItem &other) const;
|
||||
int assign_key(const ObTabletReplicaChecksumItem &other);
|
||||
int assign(const ObTabletReplicaChecksumItem &other);
|
||||
int set_tenant_id(const uint64_t tenant_id);
|
||||
|
||||
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(tablet_id), K_(server), K_(row_count),
|
||||
K_(compaction_scn), K_(data_checksum), K_(column_meta));
|
||||
|
@ -990,7 +990,7 @@ int ObCompactionDiagnoseMgr::check_ls_status(
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ls is null", K(ret), K(ls_id));
|
||||
} else if (!ls_hash_exist) {
|
||||
if (OB_TMP_FAIL(ObTenantTabletScheduler::check_ls_state(*ls, need_merge))) {
|
||||
if (OB_TMP_FAIL(ObTabletMergeChecker::check_ls_state(*ls, need_merge))) {
|
||||
LOG_WARN("failed to check ls state", K(tmp_ret), KPC(ls), K(need_merge));
|
||||
} else if (need_merge) {
|
||||
weak_read_ts_ready = ObTenantTabletScheduler::check_weak_read_ts_ready(compaction_scn, *ls);
|
||||
|
@ -1202,18 +1202,16 @@ int ObMediumCompactionScheduleFunc::batch_check_medium_finish(
|
||||
checksum_items.set_attr(ObMemAttr(MTL_ID(), "CkmItems"));
|
||||
if (OB_FAIL(batch_check_medium_meta_table(tablet_ls_infos, ls_info_map, finish_tablet_ls_infos, time_guard))) {
|
||||
LOG_WARN("failed to check inner table", K(ret), K(tablet_ls_infos));
|
||||
} else {
|
||||
if (OB_SUCC(ret) && !finish_tablet_ls_infos.empty()) {
|
||||
if (OB_FAIL(checksum_items.reserve(finish_tablet_ls_infos.count()))) {
|
||||
LOG_WARN("failed to reserve array", KR(ret), "array_cnt", finish_tablet_ls_infos.count());
|
||||
} else if (OB_FAIL(ObTabletReplicaChecksumOperator::get_tablets_replica_checksum(
|
||||
MTL_ID(), finish_tablet_ls_infos, checksum_items))) {
|
||||
LOG_WARN("failed to get tablet checksum", K(ret));
|
||||
} else if (FALSE_IT(time_guard.click(ObCompactionScheduleTimeGuard::SEARCH_CHECKSUM))) {
|
||||
} else if (OB_FAIL(batch_check_medium_checksum(finish_tablet_ls_infos, checksum_items))) {
|
||||
LOG_WARN("failed to check medium tablets checksum", K(ret));
|
||||
} else if (FALSE_IT(time_guard.click(ObCompactionScheduleTimeGuard::CHECK_CHECKSUM))) {
|
||||
}
|
||||
} else if (!finish_tablet_ls_infos.empty()) {
|
||||
if (OB_FAIL(checksum_items.reserve(finish_tablet_ls_infos.count()))) {
|
||||
LOG_WARN("failed to reserve array", KR(ret), "array_cnt", finish_tablet_ls_infos.count());
|
||||
} else if (OB_FAIL(ObTabletReplicaChecksumOperator::get_tablets_replica_checksum(
|
||||
MTL_ID(), finish_tablet_ls_infos, checksum_items))) {
|
||||
LOG_WARN("failed to get tablet checksum", K(ret));
|
||||
} else if (FALSE_IT(time_guard.click(ObCompactionScheduleTimeGuard::SEARCH_CHECKSUM))) {
|
||||
} else if (OB_FAIL(batch_check_medium_checksum(finish_tablet_ls_infos, checksum_items))) {
|
||||
LOG_WARN("failed to check medium tablets checksum", K(ret));
|
||||
} else if (FALSE_IT(time_guard.click(ObCompactionScheduleTimeGuard::CHECK_CHECKSUM))) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include "lib/ob_errno.h"
|
||||
#include "storage/compaction/ob_compaction_util.h"
|
||||
#include "storage/tablet/ob_tablet.h"
|
||||
#include "storage/ls/ob_ls.h"
|
||||
|
||||
#define USING_LOG_PREFIX STORAGE_COMPACTION
|
||||
|
||||
@ -80,5 +81,45 @@ int ObTabletMergeChecker::check_could_merge_for_medium(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletMergeChecker::check_ls_state(ObLS &ls, bool &need_merge)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
need_merge = false;
|
||||
if (ls.is_deleted()) {
|
||||
if (REACH_TENANT_TIME_INTERVAL(PRINT_LOG_INVERVAL)) {
|
||||
LOG_INFO("ls is deleted", K(ret), K(ls));
|
||||
}
|
||||
} else if (ls.is_offline()) {
|
||||
if (REACH_TENANT_TIME_INTERVAL(PRINT_LOG_INVERVAL)) {
|
||||
LOG_INFO("ls is offline", K(ret), K(ls));
|
||||
}
|
||||
} else {
|
||||
need_merge = true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletMergeChecker::check_ls_state_in_major(ObLS &ls, bool &need_merge)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
need_merge = false;
|
||||
ObLSRestoreStatus restore_status;
|
||||
if (OB_FAIL(check_ls_state(ls, need_merge))) {
|
||||
LOG_WARN("failed to check ls state", KR(ret), "ls_id", ls.get_ls_id());
|
||||
} else if (!need_merge) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(ls.get_ls_meta().get_restore_status(restore_status))) {
|
||||
LOG_WARN("failed to get restore status", K(ret), K(ls));
|
||||
} else if (OB_UNLIKELY(!restore_status.is_restore_none())) {
|
||||
if (REACH_TENANT_TIME_INTERVAL(PRINT_LOG_INVERVAL)) {
|
||||
LOG_INFO("ls is in restore status, should not loop tablet to schedule", K(ret), "ls_id", ls.get_ls_id());
|
||||
}
|
||||
} else {
|
||||
need_merge = true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
} // namespace compaction
|
||||
} // namespace oceanbase
|
||||
|
@ -21,6 +21,7 @@ namespace oceanbase
|
||||
namespace storage
|
||||
{
|
||||
class ObTablet;
|
||||
class ObLS;
|
||||
}
|
||||
|
||||
namespace compaction
|
||||
@ -32,6 +33,8 @@ public:
|
||||
static int check_could_merge_for_medium(
|
||||
const storage::ObTablet &tablet,
|
||||
bool &could_schedule_merge);
|
||||
static int check_ls_state(storage::ObLS &ls, bool &need_merge);
|
||||
static int check_ls_state_in_major(storage::ObLS &ls, bool &need_merge);
|
||||
private:
|
||||
static const int64_t PRINT_LOG_INVERVAL = 2 * 60 * 1000 * 1000L; // 2m
|
||||
};
|
||||
|
@ -252,7 +252,7 @@ int ObTenantMediumChecker::check_medium_finish_schedule()
|
||||
tablet_ls_set_.clear();
|
||||
}
|
||||
}
|
||||
const int64_t batch_size = MTL(ObTenantTabletScheduler *)->get_schedule_batch_size();
|
||||
const int64_t batch_size = MTL(ObTenantTabletScheduler *)->get_checker_batch_size();
|
||||
if (OB_FAIL(ret) || tablet_ls_infos.empty()) {
|
||||
} else if (OB_FAIL(batch_tablet_ls_infos.reserve(batch_size))) {
|
||||
LOG_WARN("fail to reserve array", K(ret), "size", batch_size);
|
||||
|
@ -796,45 +796,6 @@ bool ObTenantTabletScheduler::check_tx_table_ready(ObLS &ls, const SCN &check_sc
|
||||
return tx_table_ready;
|
||||
}
|
||||
|
||||
int ObTenantTabletScheduler::check_ls_state(ObLS &ls, bool &need_merge)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
need_merge = false;
|
||||
if (ls.is_deleted()) {
|
||||
if (REACH_TENANT_TIME_INTERVAL(PRINT_LOG_INVERVAL)) {
|
||||
LOG_INFO("ls is deleted", K(ret), K(ls));
|
||||
}
|
||||
} else if (ls.is_offline()) {
|
||||
if (REACH_TENANT_TIME_INTERVAL(PRINT_LOG_INVERVAL)) {
|
||||
LOG_INFO("ls is offline", K(ret), K(ls));
|
||||
}
|
||||
} else {
|
||||
need_merge = true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantTabletScheduler::check_ls_state_in_major(ObLS &ls, bool &need_merge)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
need_merge = false;
|
||||
ObLSRestoreStatus restore_status;
|
||||
if (OB_FAIL(check_ls_state(ls, need_merge))) {
|
||||
LOG_WARN("failed to check ls state", KR(ret), "ls_id", ls.get_ls_id());
|
||||
} else if (!need_merge) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(ls.get_ls_meta().get_restore_status(restore_status))) {
|
||||
LOG_WARN("failed to get restore status", K(ret), K(ls));
|
||||
} else if (OB_UNLIKELY(!restore_status.is_restore_none())) {
|
||||
if (REACH_TENANT_TIME_INTERVAL(PRINT_LOG_INVERVAL)) {
|
||||
LOG_INFO("ls is in restore status, should not loop tablet to schedule", K(ret), "ls_id", ls.get_ls_id());
|
||||
}
|
||||
} else {
|
||||
need_merge = true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantTabletScheduler::schedule_merge_dag(
|
||||
const ObLSID &ls_id,
|
||||
const storage::ObTablet &tablet,
|
||||
@ -1156,7 +1117,7 @@ int ObTenantTabletScheduler::schedule_ls_minor_merge(
|
||||
bool need_fast_freeze = false;
|
||||
ObLS &ls = *ls_handle.get_ls();
|
||||
const ObLSID &ls_id = ls.get_ls_id();
|
||||
if (OB_FAIL(check_ls_state(ls, need_merge))) {
|
||||
if (OB_FAIL(ObTabletMergeChecker::check_ls_state(ls, need_merge))) {
|
||||
LOG_WARN("failed to check ls state", K(ret), K(ls));
|
||||
} else if (!need_merge) {
|
||||
// no need to merge, do nothing
|
||||
@ -1386,7 +1347,7 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge(
|
||||
ObLS &ls = *ls_handle.get_ls();
|
||||
const ObLSID &ls_id = ls.get_ls_id();
|
||||
bool ls_could_schedule_medium = false;
|
||||
if (OB_FAIL(check_ls_state_in_major(ls, need_merge))) {
|
||||
if (OB_FAIL(ObTabletMergeChecker::check_ls_state_in_major(ls, need_merge))) {
|
||||
LOG_WARN("failed to check ls can schedule medium", K(ret), K(ls));
|
||||
} else if (!need_merge) {
|
||||
// no need to merge, do nothing // TODO(@jingshui): add diagnose info
|
||||
@ -1822,7 +1783,7 @@ int ObTenantTabletScheduler::try_schedule_tablet_medium_merge(
|
||||
LOG_WARN("major compaction is suspended", K(ret), K(ls_id), K(tablet_id));
|
||||
} else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) {
|
||||
LOG_WARN("failed to get ls", K(ret), K(ls_id));
|
||||
} else if (OB_FAIL(check_ls_state_in_major(*ls_handle.get_ls(), can_merge))) {
|
||||
} else if (OB_FAIL(ObTabletMergeChecker::check_ls_state_in_major(*ls_handle.get_ls(), can_merge))) {
|
||||
LOG_WARN("failed to check ls can schedule medium", K(ret), K(ls_handle));
|
||||
} else if (!can_merge) {
|
||||
// can't merge, do nothing
|
||||
@ -1883,7 +1844,7 @@ int ObTenantTabletScheduler::update_report_scn_as_ls_leader(ObLS &ls)
|
||||
bool is_election_leader = false;
|
||||
const int64_t major_merged_scn = get_inner_table_merged_scn();
|
||||
bool need_merge = false;
|
||||
if (OB_FAIL(check_ls_state(ls, need_merge))) {
|
||||
if (OB_FAIL(ObTabletMergeChecker::check_ls_state(ls, need_merge))) {
|
||||
LOG_WARN("failed to check ls state", K(ret), K(ls_id));
|
||||
} else if (!need_merge) {
|
||||
ret = OB_STATE_NOT_MATCH; // do nothing
|
||||
|
@ -198,7 +198,6 @@ public:
|
||||
const blocksstable::MacroBlockId ¯o_id,
|
||||
const int64_t prefix_len);
|
||||
static bool check_tx_table_ready(ObLS &ls, const share::SCN &check_scn);
|
||||
static int check_ls_state(ObLS &ls, bool &need_merge);
|
||||
static int fill_minor_compaction_param(
|
||||
const ObTabletHandle &tablet_handle,
|
||||
const ObGetMergeTablesResult &result,
|
||||
@ -206,7 +205,6 @@ public:
|
||||
const int64_t parallel_dag_cnt,
|
||||
const int64_t create_time,
|
||||
compaction::ObTabletMergeDagParam ¶m);
|
||||
static int check_ls_state_in_major(ObLS &ls, bool &need_merge);
|
||||
template <class T>
|
||||
static int schedule_tablet_minor_merge(
|
||||
ObLSHandle &ls_handle,
|
||||
@ -244,6 +242,7 @@ public:
|
||||
const ObIArray<compaction::ObTabletCheckInfo> &tablet_ls_infos,
|
||||
const ObIArray<compaction::ObTabletCheckInfo> &finish_tablet_ls_infos);
|
||||
OB_INLINE int64_t get_schedule_batch_size() const { return batch_size_mgr_.get_schedule_batch_size(); }
|
||||
OB_INLINE int64_t get_checker_batch_size() const { return batch_size_mgr_.get_checker_batch_size(); }
|
||||
private:
|
||||
friend struct ObTenantTabletSchedulerTaskMgr;
|
||||
int schedule_next_medium_for_leader(
|
||||
|
Loading…
x
Reference in New Issue
Block a user