[branch-2.1] Picks "[Fix](delete) Fix delete job timeout when executing delete from ... #37363" (#37374)
## Proposed changes picks https://github.com/apache/doris/pull/37363
This commit is contained in:
@ -373,6 +373,12 @@ public class DeleteJob extends AbstractTxnStateChangeCallback implements DeleteJ
|
||||
long timeoutMs = getTimeoutMs();
|
||||
boolean ok = countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
|
||||
if (ok) {
|
||||
if (!countDownLatch.getStatus().ok()) {
|
||||
// encounter some errors that don't need to retry, abort directly
|
||||
LOG.warn("delete job failed, errmsg={}", countDownLatch.getStatus().getErrorMsg());
|
||||
throw new UserException(String.format("delete job failed, errmsg:%s",
|
||||
countDownLatch.getStatus().getErrorMsg()));
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@ -137,7 +137,8 @@ public class MasterImpl {
|
||||
&& taskType != TTaskType.DOWNLOAD && taskType != TTaskType.MOVE
|
||||
&& taskType != TTaskType.CLONE && taskType != TTaskType.PUBLISH_VERSION
|
||||
&& taskType != TTaskType.CREATE && taskType != TTaskType.UPDATE_TABLET_META_INFO
|
||||
&& taskType != TTaskType.STORAGE_MEDIUM_MIGRATE) {
|
||||
&& taskType != TTaskType.STORAGE_MEDIUM_MIGRATE
|
||||
&& taskType != TTaskType.REALTIME_PUSH) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
@ -150,7 +151,6 @@ public class MasterImpl {
|
||||
finishCreateReplica(task, request);
|
||||
break;
|
||||
case REALTIME_PUSH:
|
||||
checkHasTabletInfo(request);
|
||||
Preconditions.checkState(request.isSetReportVersion());
|
||||
finishRealtimePush(task, request);
|
||||
break;
|
||||
@ -295,16 +295,32 @@ public class MasterImpl {
|
||||
}
|
||||
}
|
||||
|
||||
private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) {
|
||||
List<TTabletInfo> finishTabletInfos = request.getFinishTabletInfos();
|
||||
Preconditions.checkState(finishTabletInfos != null && !finishTabletInfos.isEmpty());
|
||||
|
||||
private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) throws Exception {
|
||||
PushTask pushTask = (PushTask) task;
|
||||
|
||||
long dbId = pushTask.getDbId();
|
||||
long backendId = pushTask.getBackendId();
|
||||
long signature = task.getSignature();
|
||||
long transactionId = ((PushTask) task).getTransactionId();
|
||||
|
||||
if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) {
|
||||
if (pushTask.getPushType() == TPushType.DELETE) {
|
||||
// DeleteHandler may return status code DELETE_INVALID_CONDITION and DELETE_INVALID_PARAMETERS,
|
||||
// we don't need to retry if meet them.
|
||||
// note that they will be converted to TStatusCode.INTERNAL_ERROR when being sent from be to fe
|
||||
if (request.getTaskStatus().getStatusCode() == TStatusCode.INTERNAL_ERROR) {
|
||||
pushTask.countDownToZero(request.getTaskStatus().getStatusCode(),
|
||||
task.getBackendId() + ": " + request.getTaskStatus().getErrorMsgs().toString());
|
||||
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
|
||||
LOG.warn("finish push replica error: {}", request.getTaskStatus().getErrorMsgs().toString());
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
checkHasTabletInfo(request);
|
||||
List<TTabletInfo> finishTabletInfos = request.getFinishTabletInfos();
|
||||
|
||||
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
|
||||
if (db == null) {
|
||||
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
|
||||
|
||||
@ -26,6 +26,7 @@ import org.apache.doris.analysis.LiteralExpr;
|
||||
import org.apache.doris.analysis.Predicate;
|
||||
import org.apache.doris.analysis.SlotRef;
|
||||
import org.apache.doris.common.MarkedCountDownLatch;
|
||||
import org.apache.doris.common.Status;
|
||||
import org.apache.doris.thrift.TBrokerScanRange;
|
||||
import org.apache.doris.thrift.TColumn;
|
||||
import org.apache.doris.thrift.TCondition;
|
||||
@ -34,6 +35,7 @@ import org.apache.doris.thrift.TPriority;
|
||||
import org.apache.doris.thrift.TPushReq;
|
||||
import org.apache.doris.thrift.TPushType;
|
||||
import org.apache.doris.thrift.TResourceInfo;
|
||||
import org.apache.doris.thrift.TStatusCode;
|
||||
import org.apache.doris.thrift.TTaskType;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
@ -211,6 +213,16 @@ public class PushTask extends AgentTask {
|
||||
}
|
||||
}
|
||||
|
||||
// call this always means one of tasks is failed. count down to zero to finish entire task
|
||||
public void countDownToZero(TStatusCode code, String errMsg) {
|
||||
if (this.latch != null) {
|
||||
latch.countDownToZero(new Status(code, errMsg));
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("PushTask count down to zero. error msg: {}", errMsg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public long getReplicaId() {
|
||||
return replicaId;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user