fix unit gc blocked by running compaction dag by adding dag_yield

This commit is contained in:
Tsunaou 2023-11-08 13:39:24 +00:00 committed by ob-robot
parent e3c7ed0253
commit b5b2a5f052
8 changed files with 33 additions and 71 deletions

View File

@ -683,6 +683,7 @@ class EventTable
EN_SCHEDULE_MEDIUM_FAILED = 731,
EN_SPECIAL_TABLE_HAVE_LARGER_SCN = 732,
EN_COMPACTION_CO_PUSH_TABLES_FAILED = 733,
EN_COMPACTION_CO_MERGE_PARTITION_LONG_TIME = 734,
// please add new trace point after 750
EN_SESSION_LEAK_COUNT_THRESHOLD = 751,

View File

@ -163,12 +163,7 @@ static int advance_checkpoint_by_flush(const uint64_t tenant_id, const share::Ob
}
ob_usleep(CHECK_TIME_INTERVAL);
if (OB_FAIL(share::dag_yield())) {
if (OB_CANCELED == ret) {
LOG_INFO("Cancel this task since the whole dag is canceled", K(ret));
break;
} else {
LOG_WARN("Invalid return value for dag_yield", K(ret));
}
LOG_WARN("fail to yield dag", KR(ret));
}
}
} while (OB_SUCC(ret));
@ -5047,12 +5042,7 @@ int ObLSBackupComplementLogTask::wait_piece_frozen_(const share::ObTenantArchive
LOG_INFO("wait piece frozen", K(piece));
ob_usleep(CHECK_TIME_INTERVAL);
if (OB_FAIL(share::dag_yield())) {
if (OB_CANCELED == ret) {
LOG_INFO("Cancel this task since the whole dag is canceled", K(ret));
break;
} else {
LOG_WARN("Invalid return value for dag_yield", K(ret));
}
LOG_WARN("fail to yield dag", KR(ret));
}
}
} while (OB_SUCC(ret));

View File

@ -410,7 +410,23 @@ int ObCOMerger::merge_partition(ObBasicTabletMergeCtx &ctx, const int64_t idx)
bool need_replay_mergelog = true;
bool need_move_row_iter = false;
while (OB_SUCC(ret) && !merge_helper_->is_iter_end()) {
if (need_replay_mergelog == false) {
if (OB_FAIL(share::dag_yield())) {
STORAGE_LOG(WARN, "fail to yield co merge dag", KR(ret));
}
#ifdef ERRSIM
if (OB_SUCC(ret)) {
ret = OB_E(EventTable::EN_COMPACTION_CO_MERGE_PARTITION_LONG_TIME) ret;
if (OB_FAIL(ret)) {
if (REACH_TENANT_TIME_INTERVAL(ObPartitionMergeProgress::UPDATE_INTERVAL)) {
LOG_INFO("ERRSIM EN_COMPACTION_CO_MERGE_PARTITION_LONG_TIME", K(ret));
}
ret = OB_SUCCESS;
continue;
}
}
#endif
if (OB_FAIL(ret)) {
} else if (need_replay_mergelog == false) {
//reuse result_row
} else if (OB_FAIL(merge_helper_->find_rowkey_minimum_iters(minimum_iters))) {
STORAGE_LOG(WARN, "failed to find_rowkey_minimum_iters", K(ret), KPC(merge_helper_));

View File

@ -685,14 +685,8 @@ int ObPartitionMajorMerger::merge_partition(
macro_block_count = merge_info_.macro_block_count_;
ctx.mem_ctx_.mem_click();
if (OB_FAIL(share::dag_yield())) {
if (OB_CANCELED == ret) {
STORAGE_LOG(WARN, "Cancel this task since the whole dag is canceled", K(ret));
break;
} else {
STORAGE_LOG(WARN, "Invalid return value for dag_yield", K(ret));
}
}
if (OB_UNLIKELY(!MTL(ObTenantTabletScheduler *)->could_major_merge_start())) {
STORAGE_LOG(WARN, "fail to yield dag", KR(ret));
} else if (OB_UNLIKELY(!MTL(ObTenantTabletScheduler *)->could_major_merge_start())) {
ret = OB_CANCELED;
STORAGE_LOG(WARN, "Major merge has been paused", K(ret));
CTX_SET_DIAGNOSE_LOCATION(ctx);
@ -1044,15 +1038,8 @@ int ObPartitionMinorMerger::merge_partition(
macro_block_count = merge_info_.macro_block_count_;
ctx.mem_ctx_.mem_click();
if (OB_FAIL(share::dag_yield())) {
if (OB_CANCELED == ret) {
STORAGE_LOG(WARN, "Cancel this task since the whole dag is canceled", K(ret));
break;
} else {
STORAGE_LOG(WARN, "Invalid return value for dag_yield", K(ret));
}
}
//find minimum merge iter
if (merge_helper_->is_iter_end()) {
STORAGE_LOG(WARN, "fail to yield dag", KR(ret));
} else if (merge_helper_->is_iter_end()) { //find minimum merge iter
ret = OB_ITER_END;
} else if (OB_FAIL(merge_helper_->find_rowkey_minimum_iters(rowkey_minimum_iters))) {
STORAGE_LOG(WARN, "Failed to find minimum iters", K(ret), KPC(merge_helper_));

View File

@ -129,12 +129,7 @@ int ObUniqueIndexChecker::calc_column_checksum(
column_checksum.at(i) += row->storage_datums_[i].checksum(0);
}
if (OB_FAIL(dag_yield())) {
if (OB_CANCELED == ret) {
STORAGE_LOG(INFO, "Cancel this task since the whole dag is canceled", K(ret));
break;
} else {
STORAGE_LOG(WARN, "Invalid return value for dag_yield", K(ret));
}
STORAGE_LOG(WARN, "fail to yield dag", KR(ret));
}
}
}
@ -566,12 +561,8 @@ int ObUniqueIndexChecker::check_unique_index(ObIDag *dag)
LOG_WARN("fail to generate index ddl error message", K(ret), K(tmp_ret), KPC(index_schema_), K(tablet_id_), K(self_addr));
ob_usleep(RETRY_INTERVAL);
if (OB_FAIL(dag_yield())) {
if (OB_CANCELED == ret) {
LOG_INFO("Cancel this task since the whole dag is canceled", K(ret));
break;
} else {
LOG_WARN("Invalid return value for dag_yield", K(ret));
}
LOG_WARN("fail to yield dag", KR(ret));
keep_report_err_msg = false;
}
} else {
if (OB_ERR_PRIMARY_KEY_DUPLICATE == ret && OB_ERR_DUPLICATED_UNIQUE_KEY == report_ret_code) {
@ -616,12 +607,7 @@ int ObUniqueIndexChecker::wait_trans_end(ObIDag *dag)
ret = OB_SUCCESS;
ob_usleep(RETRY_INTERVAL);
if (OB_FAIL(dag_yield())) {
if (OB_CANCELED == ret) {
LOG_INFO("Cancel this task since the whole dag is canceled", K(ret));
break;
} else {
LOG_WARN("Invalid return value for dag_yield", K(ret));
}
LOG_WARN("fail to yield dag", KR(ret));
}
} else {
LOG_WARN("fail to check modify time elapsed", K(ret));

View File

@ -1201,12 +1201,7 @@ int ObComplementWriteTask::append_row(ObScan *scan)
ObColumnChecksumCalculator *checksum_calculator = nullptr;
t1 = ObTimeUtility::current_time();
if (OB_FAIL(dag_yield())) {
if (OB_CANCELED == ret) {
LOG_INFO("Cancel this task since the whole dag is canceled", K(ret));
break;
} else {
LOG_WARN("Invalid return value for dag_yield", K(ret));
}
LOG_WARN("fail to yield dag", KR(ret));
} else if (OB_FAIL(scan->get_next_row(tmp_row, reshape_row_only_for_remote_scan))) {
if (OB_UNLIKELY(OB_ITER_END != ret)) {
LOG_WARN("fail to get next row", K(ret));

View File

@ -133,15 +133,8 @@ int ObStorageHAMacroBlockWriter::process(blocksstable::ObMacroBlocksWriteCtx &co
break;
}
if (OB_FAIL(dag_yield())) {
if (OB_CANCELED == ret) {
STORAGE_LOG(INFO, "Cancel this task since the whole dag is canceled", K(ret));
break;
} else {
STORAGE_LOG(WARN, "Invalid return value for dag_yield", K(ret));
}
}
if (OB_FAIL(reader_->get_next_macro_block(header, data))) {
STORAGE_LOG(WARN, "fail to yield dag", KR(ret));
} else if (OB_FAIL(reader_->get_next_macro_block(header, data))) {
if (OB_ITER_END != ret) {
STORAGE_LOG(WARN, "failed to get next macro block", K(ret));
} else {

View File

@ -1253,14 +1253,8 @@ int ObExternalSortRound<T, Compare>::do_one_run(
while (OB_SUCC(ret)) {
if (OB_FAIL(share::dag_yield())) {
if (OB_CANCELED == ret) {
STORAGE_LOG(INFO, "Cancel this task since the whole dag is canceled", K(ret));
break;
} else {
STORAGE_LOG(WARN, "Invalid return value for dag_yield", K(ret));
}
}
if (OB_FAIL(merger_.get_next_item(item))) {
STORAGE_LOG(WARN, "fail to yield dag", KR(ret));
} else if (OB_FAIL(merger_.get_next_item(item))) {
if (common::OB_ITER_END != ret) {
STORAGE_LOG(WARN, "fail to get next item", K(ret));
} else {