[fix](task) Abort creating replica task if sending RPC failed #42276 (#42963)

cherry pick from #42276
This commit is contained in:
walter
2024-11-08 10:44:27 +08:00
committed by GitHub
parent 6006907c79
commit 1b6d47d351
6 changed files with 54 additions and 14 deletions

View File

@ -314,7 +314,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
ok = false;
}
if (!ok) {
if (!ok || !countDownLatch.getStatus().ok()) {
// create replicas failed. just cancel the job
// clear tasks and show the failed replicas to user
AgentTaskQueue.removeBatchTask(batchTask, TTaskType.CREATE);

View File

@ -981,7 +981,7 @@ public class RestoreJob extends AbstractJob {
ok = true;
}
if (ok) {
if (ok && latch.getStatus().ok()) {
if (LOG.isDebugEnabled()) {
LOG.debug("finished to create all restored replicas. {}", this);
}
@ -1045,8 +1045,13 @@ public class RestoreJob extends AbstractJob {
.map(item -> "(backendId = " + item.getKey() + ", tabletId = " + item.getValue() + ")")
.collect(Collectors.toList());
String idStr = Joiner.on(", ").join(subList);
status = new Status(ErrCode.COMMON_ERROR,
"Failed to create replicas for restore. unfinished marks: " + idStr);
String reason = "TIMEDOUT";
if (!latch.getStatus().ok()) {
reason = latch.getStatus().getErrorMsg();
}
String errMsg = String.format(
"Failed to create replicas for restore: %s, unfinished marks: %s", reason, idStr);
status = new Status(ErrCode.COMMON_ERROR, errMsg);
return;
}
LOG.info("finished to prepare meta. {}", this);

View File

@ -47,6 +47,20 @@ public class MarkedCountDownLatch<K, V> extends CountDownLatch {
return false;
}
public synchronized boolean markedCountDownWithStatus(K key, V value, Status status) {
// update status first before countDown.
// so that the waiting thread will get the correct status.
if (st.ok()) {
st = status;
}
if (marks.remove(key, value)) {
super.countDown();
return true;
}
return false;
}
public synchronized List<Entry<K, V>> getLeftMarks() {
return Lists.newArrayList(marks.entries());
}

View File

@ -155,9 +155,11 @@ public class AgentBatchTask implements Runnable {
BackendService.Client client = null;
TNetworkAddress address = null;
boolean ok = false;
String errMsg = "";
try {
Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
if (backend == null || !backend.isAlive()) {
errMsg = String.format("backend %d is not alive", backendId);
continue;
}
List<AgentTask> tasks = this.backendIdToTasks.get(backendId);
@ -167,30 +169,28 @@ public class AgentBatchTask implements Runnable {
client = ClientPool.backendPool.borrowObject(address);
List<TAgentTaskRequest> agentTaskRequests = new LinkedList<TAgentTaskRequest>();
for (AgentTask task : tasks) {
try {
agentTaskRequests.add(toAgentTaskRequest(task));
} catch (Exception e) {
task.failed();
throw e;
}
agentTaskRequests.add(toAgentTaskRequest(task));
}
client.submitTasks(agentTaskRequests);
if (LOG.isDebugEnabled()) {
for (AgentTask task : tasks) {
if (LOG.isDebugEnabled()) {
LOG.debug("send task: type[{}], backend[{}], signature[{}]",
task.getTaskType(), backendId, task.getSignature());
}
LOG.debug("send task: type[{}], backend[{}], signature[{}]",
task.getTaskType(), backendId, task.getSignature());
}
}
ok = true;
} catch (Exception e) {
LOG.warn("task exec error. backend[{}]", backendId, e);
errMsg = String.format("task exec error: %s. backend[%d]", e.getMessage(), backendId);
} finally {
if (ok) {
ClientPool.backendPool.returnObject(address, client);
} else {
ClientPool.backendPool.invalidateObject(address, client);
List<AgentTask> tasks = this.backendIdToTasks.get(backendId);
for (AgentTask task : tasks) {
task.failedWithMsg(errMsg);
}
}
}
} // end for backend

View File

@ -110,6 +110,10 @@ public abstract class AgentTask {
++this.failedTimes;
}
public void failedWithMsg(String errMsg) {
failed();
}
public int getFailedTimes() {
return this.failedTimes;
}

View File

@ -228,6 +228,23 @@ public class CreateReplicaTask extends AgentTask {
}
}
@Override
public void failedWithMsg(String errMsg) {
super.failedWithMsg(errMsg);
// CreateReplicaTask will not trigger a retry in ReportTask. Therefore, it needs to
// be marked as failed here and all threads waiting for the result of
// CreateReplicaTask need to be awakened.
if (this.latch != null) {
Status s = new Status(TStatusCode.CANCELLED, errMsg);
latch.markedCountDownWithStatus(getBackendId(), getTabletId(), s);
if (LOG.isDebugEnabled()) {
LOG.debug("CreateReplicaTask failed with msg: {}, tablet: {}, backend: {}",
errMsg, getTabletId(), getBackendId());
}
}
}
public void setLatch(MarkedCountDownLatch<Long, Long> latch) {
this.latch = latch;
}