branch-2.1: [chore](task) log the thrift message size if the broken pipe is occurred #49492 (#49509)

Cherry-picked from #49492

Co-authored-by: walter <maochuan@selectdb.com>
This commit is contained in:
github-actions[bot]
2025-03-28 09:57:53 +08:00
committed by GitHub
parent 63fb74b480
commit 2fb8e00907

View File

@ -20,6 +20,7 @@ package org.apache.doris.task;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThriftUtils;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
@ -60,6 +61,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/*
* This class group tasks by backend
@ -166,6 +168,7 @@ public class AgentBatchTask implements Runnable {
TNetworkAddress address = null;
boolean ok = false;
String errMsg = "";
List<TAgentTaskRequest> agentTaskRequests = new LinkedList<TAgentTaskRequest>();
try {
Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
if (backend == null || !backend.isAlive()) {
@ -177,7 +180,6 @@ public class AgentBatchTask implements Runnable {
String host = FeConstants.runningUnitTest ? "127.0.0.1" : backend.getHost();
address = new TNetworkAddress(host, backend.getBePort());
client = ClientPool.backendPool.borrowObject(address);
List<TAgentTaskRequest> agentTaskRequests = new LinkedList<TAgentTaskRequest>();
for (AgentTask task : tasks) {
agentTaskRequests.add(toAgentTaskRequest(task));
if (agentTaskRequests.size() >= batchSize) {
@ -190,6 +192,21 @@ public class AgentBatchTask implements Runnable {
} catch (Exception e) {
LOG.warn("task exec error. backend[{}]", backendId, e);
errMsg = String.format("task exec error: %s. backend[%d]", e.getMessage(), backendId);
if (!agentTaskRequests.isEmpty() && errMsg.contains("Broken pipe")) {
// Log the task binary message size and the max task type, to help debug the
// large thrift message size issue.
List<Pair<TTaskType, Long>> taskTypeAndSize = agentTaskRequests.stream()
.map(req -> Pair.of(req.getTaskType(), ThriftUtils.getBinaryMessageSize(req)))
.collect(Collectors.toList());
Pair<TTaskType, Long> maxTaskTypeAndSize = taskTypeAndSize.stream()
.max((p1, p2) -> Long.compare(p1.value(), p2.value()))
.orElse(null); // taskTypeAndSize is not empty
TTaskType maxType = maxTaskTypeAndSize.first;
long maxSize = maxTaskTypeAndSize.second;
long totalSize = taskTypeAndSize.stream().map(Pair::value).reduce(0L, Long::sum);
LOG.warn("submit {} tasks to backend[{}], total size: {}, max task type: {}, size: {}. msg: {}",
agentTaskRequests.size(), backendId, totalSize, maxType, maxSize, e.getMessage());
}
} finally {
if (ok) {
ClientPool.backendPool.returnObject(address, client);