Fix direct load exiting while a worker is still running
This commit is contained in:
@ -154,7 +154,9 @@ int ObTableLoadCoordinator::abort_peers_ctx(ObTableLoadTableCtx *ctx)
|
|||||||
static const int64_t max_retry_times = 100; // ensure store ctx detect heart beat timeout and abort
|
static const int64_t max_retry_times = 100; // ensure store ctx detect heart beat timeout and abort
|
||||||
ObArray<ObAddr> addr_array1, addr_array2;
|
ObArray<ObAddr> addr_array1, addr_array2;
|
||||||
ObIArray<ObAddr> *curr_round = &addr_array1, *next_round = &addr_array2;
|
ObIArray<ObAddr> *curr_round = &addr_array1, *next_round = &addr_array2;
|
||||||
|
int64_t running_cnt = 0;
|
||||||
int64_t fail_cnt = 0;
|
int64_t fail_cnt = 0;
|
||||||
|
int64_t round = 0;
|
||||||
int64_t tries = 0;
|
int64_t tries = 0;
|
||||||
ObDirectLoadControlAbortArg arg;
|
ObDirectLoadControlAbortArg arg;
|
||||||
ObDirectLoadControlAbortRes res;
|
ObDirectLoadControlAbortRes res;
|
||||||
@ -170,6 +172,8 @@ int ObTableLoadCoordinator::abort_peers_ctx(ObTableLoadTableCtx *ctx)
|
|||||||
}
|
}
|
||||||
while (!curr_round->empty() && tries < max_retry_times) {
|
while (!curr_round->empty() && tries < max_retry_times) {
|
||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
|
++round;
|
||||||
|
running_cnt = 0;
|
||||||
fail_cnt = 0;
|
fail_cnt = 0;
|
||||||
for (int64_t i = 0; i < curr_round->count(); ++i) {
|
for (int64_t i = 0; i < curr_round->count(); ++i) {
|
||||||
const ObAddr &addr = curr_round->at(i);
|
const ObAddr &addr = curr_round->at(i);
|
||||||
@ -189,19 +193,28 @@ int ObTableLoadCoordinator::abort_peers_ctx(ObTableLoadTableCtx *ctx)
|
|||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
++fail_cnt;
|
++fail_cnt;
|
||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
|
} else {
|
||||||
|
++running_cnt;
|
||||||
}
|
}
|
||||||
if (OB_FAIL(next_round->push_back(addr))) {
|
if (OB_FAIL(next_round->push_back(addr))) {
|
||||||
LOG_WARN("fail to push back", KR(ret));
|
LOG_WARN("fail to push back", KR(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
++tries;
|
if (running_cnt > 0 || fail_cnt > 0) {
|
||||||
if (tries % 10 == 0) {
|
if (running_cnt > 0) {
|
||||||
LOG_WARN("retry too many times", K(tries), K(fail_cnt), KPC(next_round));
|
// peer still running, keep waiting
|
||||||
|
tries = 0;
|
||||||
|
} else {
|
||||||
|
++tries;
|
||||||
|
}
|
||||||
|
if (round % 10 == 0) {
|
||||||
|
FLOG_WARN("retry too many times", K(round), K(running_cnt), K(fail_cnt), K(tries), KPC(next_round));
|
||||||
|
}
|
||||||
|
std::swap(curr_round, next_round);
|
||||||
|
next_round->reuse();
|
||||||
|
ob_usleep(WAIT_INTERVAL_US);
|
||||||
}
|
}
|
||||||
std::swap(curr_round, next_round);
|
|
||||||
next_round->reuse();
|
|
||||||
ob_usleep(WAIT_INTERVAL_US);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
Reference in New Issue
Block a user