From 2fb8e00907e193995d4c05f8ab700358ffd67688 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 28 Mar 2025 09:57:53 +0800 Subject: [PATCH] branch-2.1: [chore](task) log the thrift message size if a broken pipe occurs #49492 (#49509) Cherry-picked from #49492 Co-authored-by: walter --- .../org/apache/doris/task/AgentBatchTask.java | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java index ebfdb28a16..0045e05ccb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java @@ -20,6 +20,7 @@ package org.apache.doris.task; import org.apache.doris.catalog.Env; import org.apache.doris.common.ClientPool; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.Pair; import org.apache.doris.common.ThriftUtils; import org.apache.doris.system.Backend; import org.apache.doris.thrift.BackendService; @@ -60,6 +61,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /* * This class group tasks by backend @@ -166,6 +168,7 @@ public class AgentBatchTask implements Runnable { TNetworkAddress address = null; boolean ok = false; String errMsg = ""; + List agentTaskRequests = new LinkedList(); try { Backend backend = Env.getCurrentSystemInfo().getBackend(backendId); if (backend == null || !backend.isAlive()) { @@ -177,7 +180,6 @@ public class AgentBatchTask implements Runnable { String host = FeConstants.runningUnitTest ? 
"127.0.0.1" : backend.getHost(); address = new TNetworkAddress(host, backend.getBePort()); client = ClientPool.backendPool.borrowObject(address); - List agentTaskRequests = new LinkedList(); for (AgentTask task : tasks) { agentTaskRequests.add(toAgentTaskRequest(task)); if (agentTaskRequests.size() >= batchSize) { @@ -190,6 +192,21 @@ public class AgentBatchTask implements Runnable { } catch (Exception e) { LOG.warn("task exec error. backend[{}]", backendId, e); errMsg = String.format("task exec error: %s. backend[%d]", e.getMessage(), backendId); + if (!agentTaskRequests.isEmpty() && errMsg.contains("Broken pipe")) { + // Log the task binary message size and the max task type, to help debug the + // large thrift message size issue. + List> taskTypeAndSize = agentTaskRequests.stream() + .map(req -> Pair.of(req.getTaskType(), ThriftUtils.getBinaryMessageSize(req))) + .collect(Collectors.toList()); + Pair maxTaskTypeAndSize = taskTypeAndSize.stream() + .max((p1, p2) -> Long.compare(p1.value(), p2.value())) + .orElse(null); // taskTypeAndSize is not empty + TTaskType maxType = maxTaskTypeAndSize.first; + long maxSize = maxTaskTypeAndSize.second; + long totalSize = taskTypeAndSize.stream().map(Pair::value).reduce(0L, Long::sum); + LOG.warn("submit {} tasks to backend[{}], total size: {}, max task type: {}, size: {}. msg: {}", + agentTaskRequests.size(), backendId, totalSize, maxType, maxSize, e.getMessage()); + } } finally { if (ok) { ClientPool.backendPool.returnObject(address, client);