split compaction parallel range for big table
This commit is contained in:
1
deps/oblib/src/lib/utility/ob_tracepoint.h
vendored
1
deps/oblib/src/lib/utility/ob_tracepoint.h
vendored
@ -709,6 +709,7 @@ class EventTable
|
||||
EN_CO_MREGE_DAG_READY_FOREVER = 738,
|
||||
EN_CO_MREGE_DAG_SCHEDULE_REST = 739,
|
||||
EN_COMPACTION_SCHEDULE_MEDIUM_MERGE_AFTER_MINI = 740,
|
||||
EN_COMPACTION_MEDIUM_INIT_LARGE_PARALLEL_RANGE = 741,
|
||||
|
||||
// please add new trace point after 750
|
||||
EN_SESSION_LEAK_COUNT_THRESHOLD = 751,
|
||||
|
@ -601,6 +601,7 @@ void ObMajorMergeProgressChecker::print_unfinish_info(const int64_t cost_us)
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t array_cnt = ObUncompactInfo::DEBUG_INFO_CNT;
|
||||
ObSEArray<uint64_t, array_cnt> tmp_table_id_array;
|
||||
ObSEArray<uint64_t, array_cnt> tmp_tablets_array;
|
||||
ObSEArray<ObTabletReplica, array_cnt> uncompacted_replica_array;
|
||||
ObSEArray<uint64_t, array_cnt> uncompacted_table_array;
|
||||
(void) uncompact_info_.get_uncompact_info(uncompacted_replica_array, uncompacted_table_array);
|
||||
@ -612,6 +613,12 @@ void ObMajorMergeProgressChecker::print_unfinish_info(const int64_t cost_us)
|
||||
}
|
||||
}
|
||||
}
|
||||
for (int64_t idx = 0; OB_SUCC(ret) && idx < uncompacted_replica_array.count(); ++idx) {
|
||||
if (OB_FAIL(tmp_tablets_array.push_back(
|
||||
uncompacted_replica_array.at(idx).get_tablet_id().id()))) {
|
||||
LOG_WARN("failed to push array", KR(ret));
|
||||
}
|
||||
}
|
||||
// table in table_ids_ may finish verified in deal_with_rest_data_table()
|
||||
// need next loop to delete from array
|
||||
ADD_RS_COMPACTION_EVENT(
|
||||
@ -620,6 +627,7 @@ void ObMajorMergeProgressChecker::print_unfinish_info(const int64_t cost_us)
|
||||
common::ObTimeUtility::fast_current_time(),
|
||||
K(cost_us), K_(progress), "remain_table_id_count", table_ids_.count(),
|
||||
"remain_table_ids", tmp_table_id_array,
|
||||
"remain_tablet_ids", tmp_tablets_array,
|
||||
K_(total_time_guard), K_(validator_statistics));
|
||||
LOG_INFO("succ to check merge progress", K_(tenant_id), K_(loop_cnt), K_(compaction_scn), K(cost_us),
|
||||
K_(progress), "remain_table_id_count", table_ids_.count(),
|
||||
|
@ -13674,7 +13674,7 @@ def_table_schema(**gen_iterate_private_virtual_table_def(
|
||||
# 12454: __all_virtual_wr_sqltext
|
||||
# 12455: __all_virtual_trusted_root_certificate_info
|
||||
# 12456: __all_virtual_dbms_lock_allocated
|
||||
# 12457: __all_virtual_sharing_storage_compaction_info
|
||||
# 12457: __all_virtual_shared_storage_compaction_info
|
||||
|
||||
def_table_schema(
|
||||
owner = 'wendongbodongbo.wd',
|
||||
|
@ -161,9 +161,9 @@ int ObStaticDataStoreDesc::init(
|
||||
STORAGE_LOG(WARN, "fail to get data version", K(ret));
|
||||
} else {
|
||||
major_working_cluster_version_ = compat_version;
|
||||
STORAGE_LOG(INFO, "success to set major working cluster version", K(ret), "merge_type", merge_type_to_str(merge_type),
|
||||
K(cluster_version), K(major_working_cluster_version_));
|
||||
}
|
||||
STORAGE_LOG(INFO, "success to set major working cluster version", K(ret), "merge_type", merge_type_to_str(merge_type),
|
||||
K(cluster_version), K(major_working_cluster_version_));
|
||||
} else if (compressor_type_ != ObCompressorType::NONE_COMPRESSOR) {
|
||||
// for mini/minor, use default compressor
|
||||
compressor_type_ = DEFAULT_MINOR_COMPRESSOR_TYPE;
|
||||
@ -705,7 +705,7 @@ int ObDataStoreDesc::init(
|
||||
if (OB_FAIL(inner_init(merge_schema, row_store_type))) {
|
||||
STORAGE_LOG(WARN, "failed inner init", KR(ret), K(merge_schema));
|
||||
} else {
|
||||
STORAGE_LOG(INFO, "success to init data desc", K(ret), KPC(this), K(merge_schema));
|
||||
STORAGE_LOG(TRACE, "success to init data desc", K(ret), KPC(this), K(merge_schema));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
reset();
|
||||
|
@ -713,6 +713,13 @@ int ObMediumCompactionScheduleFunc::init_parallel_range_and_schema_changed(
|
||||
expected_task_count = 2;
|
||||
LOG_INFO("ERRSIM EN_COMPACTION_MEDIUM_INIT_PARALLEL_RANGE", KPC(this), K(expected_task_count));
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
ret = OB_E(EventTable::EN_COMPACTION_MEDIUM_INIT_LARGE_PARALLEL_RANGE) ret;
|
||||
if (OB_FAIL(ret)) {
|
||||
expected_task_count = 64;
|
||||
LOG_INFO("ERRSIM EN_COMPACTION_MEDIUM_INIT_LARGE_PARALLEL_RANGE", KPC(this), K(expected_task_count));
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
@ -738,16 +745,50 @@ int ObMediumCompactionScheduleFunc::init_parallel_range_and_schema_changed(
|
||||
LOG_WARN("failed to get table iter", K(ret), K(range_array));
|
||||
} else if (OB_FAIL(input_range_array.push_back(range))) {
|
||||
LOG_WARN("failed to push back range", K(ret), K(range));
|
||||
} else if (OB_FAIL(range_spliter.get_split_multi_ranges(
|
||||
input_range_array,
|
||||
expected_task_count,
|
||||
tablet->get_rowkey_read_info(),
|
||||
table_iter,
|
||||
allocator_,
|
||||
range_array))) {
|
||||
LOG_WARN("failed to get split multi range", K(ret), K(range_array));
|
||||
} else if (OB_FAIL(medium_info.gene_parallel_info(allocator_, range_array))) {
|
||||
LOG_WARN("failed to get parallel ranges", K(ret), K(range_array));
|
||||
} else {
|
||||
bool recalc_count_flag = false;
|
||||
do {
|
||||
if (OB_FAIL(range_spliter.get_split_multi_ranges(
|
||||
input_range_array,
|
||||
expected_task_count,
|
||||
tablet->get_rowkey_read_info(),
|
||||
table_iter,
|
||||
allocator_,
|
||||
range_array))) {
|
||||
LOG_WARN("failed to get split multi range", K(ret), K(range_array));
|
||||
} else if (OB_FAIL(medium_info.gene_parallel_info(allocator_, range_array))) {
|
||||
LOG_WARN("failed to get parallel ranges", K(ret), K(range_array));
|
||||
} else {
|
||||
int64_t buf_len = ObTabletMediumCompactionInfoRecorder::cal_buf_len(tablet->get_tablet_meta().tablet_id_, medium_info, nullptr/*log_header*/);
|
||||
#ifdef ERRSIM
|
||||
ret = OB_E(EventTable::EN_COMPACTION_MEDIUM_INIT_LARGE_PARALLEL_RANGE) ret;
|
||||
if (OB_FAIL(ret)) {
|
||||
ret = OB_SUCCESS;
|
||||
if (!recalc_count_flag) {
|
||||
buf_len = common::OB_MAX_LOG_ALLOWED_SIZE;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
if (buf_len < common::OB_MAX_LOG_ALLOWED_SIZE) {
|
||||
LOG_TRACE("success to split ranges", KR(ret), K(buf_len), K(medium_info.parallel_merge_info_), K(range_array), K(medium_info.parallel_merge_info_.get_serialize_size()));
|
||||
break;
|
||||
} else if (recalc_count_flag) {
|
||||
expected_task_count -= MAX(1, expected_task_count / 5);
|
||||
} else {
|
||||
recalc_count_flag = true;
|
||||
// get parallel info serialize size
|
||||
const int64_t parallel_size = medium_info.parallel_merge_info_.get_serialize_size();
|
||||
const double avg_range_size = (parallel_size + 0.0) / range_array.count();
|
||||
const int64_t rest_info_size = buf_len - parallel_size;
|
||||
expected_task_count = MAX(1, (common::OB_MAX_LOG_ALLOWED_SIZE - 1 - rest_info_size) / avg_range_size);
|
||||
expected_task_count = MIN(expected_task_count, MAX_MERGE_THREAD);
|
||||
LOG_INFO("success to recalc ranges", KR(ret), K(buf_len), K(expected_task_count), K(avg_range_size), K(rest_info_size));
|
||||
}
|
||||
medium_info.clear_parallel_range();
|
||||
table_iter.resume();
|
||||
range_array.reuse();
|
||||
}
|
||||
} while (OB_SUCC(ret) && !medium_info.contain_parallel_range_ && expected_task_count > 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -944,7 +985,7 @@ int ObMediumCompactionScheduleFunc::submit_medium_clog(
|
||||
|
||||
int ObMediumCompactionScheduleFunc::batch_check_medium_meta_table(
|
||||
const ObIArray<ObTabletCheckInfo> &tablet_ls_infos,
|
||||
hash::ObHashMap<ObLSID, share::ObLSInfo> &ls_info_map,
|
||||
const hash::ObHashMap<ObLSID, share::ObLSInfo> &ls_info_map,
|
||||
ObIArray<ObTabletCheckInfo> &finish_tablet_ls,
|
||||
ObCompactionTimeGuard &time_guard)
|
||||
{
|
||||
@ -993,7 +1034,7 @@ int ObMediumCompactionScheduleFunc::check_medium_meta_table(
|
||||
const int64_t check_medium_snapshot,
|
||||
const ObTabletInfo &tablet_info,
|
||||
const share::ObTabletReplicaFilterHolder &filters,
|
||||
hash::ObHashMap<ObLSID, share::ObLSInfo> &ls_info_map,
|
||||
const hash::ObHashMap<ObLSID, share::ObLSInfo> &ls_info_map,
|
||||
bool &merge_finish)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1167,7 +1208,7 @@ int ObMediumCompactionScheduleFunc::batch_check_medium_checksum(
|
||||
|
||||
// for Leader, clean wait_check_medium_scn
|
||||
int ObMediumCompactionScheduleFunc::batch_check_medium_finish(
|
||||
hash::ObHashMap<ObLSID, share::ObLSInfo> &ls_info_map,
|
||||
const hash::ObHashMap<ObLSID, share::ObLSInfo> &ls_info_map,
|
||||
ObIArray<ObTabletCheckInfo> &finish_tablet_ls_infos,
|
||||
const ObIArray<ObTabletCheckInfo> &tablet_ls_infos,
|
||||
ObCompactionTimeGuard &time_guard)
|
||||
|
@ -82,7 +82,7 @@ public:
|
||||
ObIAllocator &allocator,
|
||||
storage::ObStorageSchema &storage_schema);
|
||||
static int batch_check_medium_finish(
|
||||
hash::ObHashMap<ObLSID, share::ObLSInfo> &ls_info_map,
|
||||
const hash::ObHashMap<ObLSID, share::ObLSInfo> &ls_info_map,
|
||||
ObIArray<ObTabletCheckInfo> &finish_tablet_ls_infos,
|
||||
const ObIArray<ObTabletCheckInfo> &tablet_ls_infos,
|
||||
ObCompactionTimeGuard &time_guard);
|
||||
@ -112,14 +112,14 @@ protected:
|
||||
int submit_medium_clog(ObMediumCompactionInfo &medium_info);
|
||||
static int batch_check_medium_meta_table(
|
||||
const ObIArray<ObTabletCheckInfo> &tablet_ls_infos,
|
||||
hash::ObHashMap<ObLSID, share::ObLSInfo> &ls_info_map,
|
||||
const hash::ObHashMap<ObLSID, share::ObLSInfo> &ls_info_map,
|
||||
ObIArray<ObTabletCheckInfo> &finish_tablet_ls,
|
||||
ObCompactionTimeGuard &time_guard);
|
||||
static int check_medium_meta_table(
|
||||
const int64_t medium_snapshot,
|
||||
const ObTabletInfo &tablet_info,
|
||||
const share::ObTabletReplicaFilterHolder &filters,
|
||||
hash::ObHashMap<ObLSID, share::ObLSInfo> &ls_info_map,
|
||||
const hash::ObHashMap<ObLSID, share::ObLSInfo> &ls_info_map,
|
||||
bool &merge_finish);
|
||||
static int init_tablet_filters(share::ObTabletReplicaFilterHolder &filters);
|
||||
static int check_medium_checksum(
|
||||
|
@ -196,10 +196,6 @@ int ObParallelMergeInfo::generate_from_range_array(
|
||||
LOG_DEBUG("parallel range info", K(ret), KPC(this), K(paral_range), K(paral_range.count()), K(paral_range.at(0)));
|
||||
if (OB_FAIL(ret)) {
|
||||
destroy();
|
||||
} else if (get_serialize_size() > MAX_PARALLEL_RANGE_SERIALIZE_LEN) {
|
||||
ret = OB_SIZE_OVERFLOW;
|
||||
LOG_DEBUG("parallel range info is too large to sync", K(ret), KPC(this));
|
||||
destroy();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -484,8 +480,6 @@ int ObMediumCompactionInfo::gene_parallel_info(
|
||||
if (OB_FAIL(parallel_merge_info_.generate_from_range_array(allocator, paral_range))) {
|
||||
if (OB_UNLIKELY(OB_SIZE_OVERFLOW != ret)) {
|
||||
LOG_WARN("failed to generate parallel merge info", K(ret), K(paral_range));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
} else if (parallel_merge_info_.get_size() > 0) {
|
||||
contain_parallel_range_ = true;
|
||||
|
@ -301,6 +301,27 @@ int ObTabletMediumCompactionInfoRecorder::submit_trans_on_mds_table(const bool i
|
||||
return ret;
|
||||
}
|
||||
|
||||
int64_t ObTabletMediumCompactionInfoRecorder::cal_buf_len(
|
||||
const common::ObTabletID &tablet_id,
|
||||
const ObMediumCompactionInfo &medium_info,
|
||||
const logservice::ObLogBaseHeader *log_header)
|
||||
{
|
||||
int64_t buf_len = 0;
|
||||
if (OB_NOT_NULL(log_header)) {
|
||||
buf_len += log_header->get_serialize_size();
|
||||
} else {
|
||||
const logservice::ObLogBaseHeader tmp_log_header(
|
||||
logservice::ObLogBaseType::MEDIUM_COMPACTION_LOG_BASE_TYPE,
|
||||
logservice::ObReplayBarrierType::NO_NEED_BARRIER,
|
||||
tablet_id.id());
|
||||
buf_len += tmp_log_header.get_serialize_size();
|
||||
}
|
||||
buf_len += (tablet_id.get_serialize_size()
|
||||
+ serialization::encoded_length_i64(medium_info.medium_snapshot_)
|
||||
+ medium_info.get_serialize_size());
|
||||
return buf_len;
|
||||
}
|
||||
|
||||
// log_header + tablet_id + medium_snapshot + medium_compaction_info
|
||||
int ObTabletMediumCompactionInfoRecorder::prepare_struct_in_lock(
|
||||
int64_t &update_version,
|
||||
@ -322,10 +343,7 @@ int ObTabletMediumCompactionInfoRecorder::prepare_struct_in_lock(
|
||||
char *buf = nullptr;
|
||||
char *alloc_clog_buf = nullptr;
|
||||
int64_t alloc_buf_offset = 0;
|
||||
const int64_t buf_len = log_header.get_serialize_size()
|
||||
+ tablet_id_.get_serialize_size()
|
||||
+ serialization::encoded_length_i64(medium_info_->medium_snapshot_)
|
||||
+ medium_info_->get_serialize_size();
|
||||
const int64_t buf_len = cal_buf_len(tablet_id_, *medium_info_, &log_header);
|
||||
const int64_t alloc_buf_size = buf_len + sizeof(ObTabletHandle) + sizeof(ObStorageCLogCb) + sizeof(mds::MdsCtx);
|
||||
|
||||
if (OB_UNLIKELY(nullptr == medium_info_ || nullptr == allocator)) {
|
||||
|
@ -47,6 +47,10 @@ public:
|
||||
int submit_medium_compaction_info(ObMediumCompactionInfo &medium_info, ObIAllocator &allocator);
|
||||
// follower
|
||||
int replay_medium_compaction_log(const share::SCN &scn, const char *buf, const int64_t size, int64_t &pos);
|
||||
static int64_t cal_buf_len(
|
||||
const common::ObTabletID &tablet_id,
|
||||
const ObMediumCompactionInfo &medium_info,
|
||||
const logservice::ObLogBaseHeader *log_header);
|
||||
INHERIT_TO_STRING_KV("ObIStorageClogRecorder", ObIStorageClogRecorder, K_(ignore_medium), K_(ls_id), K_(tablet_id));
|
||||
private:
|
||||
virtual int inner_replay_clog(
|
||||
|
@ -170,6 +170,7 @@ DEFINE_COMPACTION_EVENT_PRINT_KV(3)
|
||||
DEFINE_COMPACTION_EVENT_PRINT_KV(4)
|
||||
DEFINE_COMPACTION_EVENT_PRINT_KV(5)
|
||||
DEFINE_COMPACTION_EVENT_PRINT_KV(6)
|
||||
DEFINE_COMPACTION_EVENT_PRINT_KV(7)
|
||||
|
||||
|
||||
} // namespace compaction
|
||||
|
Reference in New Issue
Block a user