From c6f8434e6fd8a21c0091f7633ed24b97671226ba Mon Sep 17 00:00:00 2001 From: suz-yang Date: Thu, 28 Mar 2024 11:56:18 +0000 Subject: [PATCH] Fix direct load exit when worker still running --- .../table_load/ob_table_load_coordinator.cpp | 25 ++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/src/observer/table_load/ob_table_load_coordinator.cpp b/src/observer/table_load/ob_table_load_coordinator.cpp index 095d43ed58..5c15dc0553 100644 --- a/src/observer/table_load/ob_table_load_coordinator.cpp +++ b/src/observer/table_load/ob_table_load_coordinator.cpp @@ -154,7 +154,9 @@ int ObTableLoadCoordinator::abort_peers_ctx(ObTableLoadTableCtx *ctx) static const int64_t max_retry_times = 100; // ensure store ctx detect heart beat timeout and abort ObArray addr_array1, addr_array2; ObIArray *curr_round = &addr_array1, *next_round = &addr_array2; + int64_t running_cnt = 0; int64_t fail_cnt = 0; + int64_t round = 0; int64_t tries = 0; ObDirectLoadControlAbortArg arg; ObDirectLoadControlAbortRes res; @@ -170,6 +172,8 @@ int ObTableLoadCoordinator::abort_peers_ctx(ObTableLoadTableCtx *ctx) } while (!curr_round->empty() && tries < max_retry_times) { ret = OB_SUCCESS; + ++round; + running_cnt = 0; fail_cnt = 0; for (int64_t i = 0; i < curr_round->count(); ++i) { const ObAddr &addr = curr_round->at(i); @@ -189,19 +193,28 @@ int ObTableLoadCoordinator::abort_peers_ctx(ObTableLoadTableCtx *ctx) if (OB_FAIL(ret)) { ++fail_cnt; ret = OB_SUCCESS; + } else { + ++running_cnt; } if (OB_FAIL(next_round->push_back(addr))) { LOG_WARN("fail to push back", KR(ret)); } } } - ++tries; - if (tries % 10 == 0) { - LOG_WARN("retry too many times", K(tries), K(fail_cnt), KPC(next_round)); + if (running_cnt > 0 || fail_cnt > 0) { + if (running_cnt > 0) { + // peer still running, keep waiting + tries = 0; + } else { + ++tries; + } + if (round % 10 == 0) { + FLOG_WARN("retry too many times", K(round), K(running_cnt), K(fail_cnt), K(tries), KPC(next_round)); + } + std::swap(curr_round, next_round); + next_round->reuse(); + ob_usleep(WAIT_INTERVAL_US); } - std::swap(curr_round, next_round); - next_round->reuse(); - ob_usleep(WAIT_INTERVAL_US); } } return ret;