[opt](session variable) max_msg_size_of_result_receiver #31809

This commit is contained in:
zhiqiang
2024-03-06 14:50:27 +08:00
committed by yiguolei
parent f897db666c
commit 2e4201e406
7 changed files with 155 additions and 8 deletions

View File

@ -659,7 +659,8 @@ public class Coordinator implements CoordInterface {
if (topDataSink instanceof ResultSink || topDataSink instanceof ResultFileSink) {
TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host;
receiver = new ResultReceiver(queryId, topParams.instanceExecParams.get(0).instanceId,
addressToBackendID.get(execBeAddr), toBrpcHost(execBeAddr), this.timeoutDeadline);
addressToBackendID.get(execBeAddr), toBrpcHost(execBeAddr), this.timeoutDeadline,
context.getSessionVariable().getMaxMsgSizeOfResultReceiver());
if (!context.isReturnResultFromLocal()) {
Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));

View File

@ -35,6 +35,7 @@ import org.apache.doris.proto.InternalService.KeyTuple;
import org.apache.doris.proto.Types;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.rpc.TCustomProtocolFactory;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TExpr;
import org.apache.doris.thrift.TExprList;
@ -85,6 +86,8 @@ public class PointQueryExec implements CoordInterface {
// using this ID to find for this prepared statement
private UUID cacheID;
private final int maxMsgSizeOfResultReceiver;
private OlapScanNode getPlanRoot() {
List<PlanFragment> fragments = planner.getFragments();
PlanFragment fragment = fragments.get(0);
@ -96,7 +99,7 @@ public class PointQueryExec implements CoordInterface {
return planRoot;
}
public PointQueryExec(Planner planner, Analyzer analyzer) {
public PointQueryExec(Planner planner, Analyzer analyzer, int maxMessageSize) {
// init from planner
this.planner = planner;
List<PlanFragment> fragments = planner.getFragments();
@ -117,6 +120,7 @@ public class PointQueryExec implements CoordInterface {
// TODO
// planner.getDescTable().toThrift();
}
this.maxMsgSizeOfResultReceiver = maxMessageSize;
}
void setScanRangeLocations() throws Exception {
@ -310,8 +314,17 @@ public class PointQueryExec implements CoordInterface {
} else if (pResult.hasRowBatch() && pResult.getRowBatch().size() > 0) {
byte[] serialResult = pResult.getRowBatch().toByteArray();
TResultBatch resultBatch = new TResultBatch();
TDeserializer deserializer = new TDeserializer();
deserializer.deserialize(resultBatch, serialResult);
TDeserializer deserializer = new TDeserializer(
new TCustomProtocolFactory(this.maxMsgSizeOfResultReceiver));
try {
deserializer.deserialize(resultBatch, serialResult);
} catch (TException e) {
if (e.getMessage().contains("MaxMessageSize reached")) {
throw new TException("MaxMessageSize reached, try increase max_msg_size_of_result_receiver");
} else {
throw e;
}
}
rowBatch.setBatch(resultBatch);
rowBatch.setEos(true);
return rowBatch;

View File

@ -23,6 +23,7 @@ import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.rpc.TCustomProtocolFactory;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TResultBatch;
import org.apache.doris.thrift.TStatusCode;
@ -53,12 +54,16 @@ public class ResultReceiver {
private Future<InternalService.PFetchDataResult> fetchDataAsyncFuture = null;
public String cancelReason = "";
public ResultReceiver(TUniqueId queryId, TUniqueId tid, Long backendId, TNetworkAddress address, long timeoutTs) {
int maxMsgSizeOfResultReceiver;
public ResultReceiver(TUniqueId queryId, TUniqueId tid, Long backendId, TNetworkAddress address, long timeoutTs,
int maxMsgSizeOfResultReceiver) {
this.queryId = Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build();
this.finstId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
this.backendId = backendId;
this.address = address;
this.timeoutTs = timeoutTs;
this.maxMsgSizeOfResultReceiver = maxMsgSizeOfResultReceiver;
}
public RowBatch getNext(Status status) throws TException {
@ -136,8 +141,19 @@ public class ResultReceiver {
} else if (pResult.hasRowBatch() && pResult.getRowBatch().size() > 0) {
byte[] serialResult = pResult.getRowBatch().toByteArray();
TResultBatch resultBatch = new TResultBatch();
TDeserializer deserializer = new TDeserializer();
deserializer.deserialize(resultBatch, serialResult);
TDeserializer deserializer = new TDeserializer(
new TCustomProtocolFactory(this.maxMsgSizeOfResultReceiver));
try {
deserializer.deserialize(resultBatch, serialResult);
} catch (TException e) {
if (e.getMessage().contains("MaxMessageSize reached")) {
throw new TException(
"MaxMessageSize reached, try increase max_msg_size_of_result_receiver");
} else {
throw e;
}
}
rowBatch.setBatch(resultBatch);
rowBatch.setEos(pResult.getEos());
return rowBatch;

View File

@ -45,6 +45,7 @@ import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TConfiguration;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
@ -515,6 +516,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String SHOW_ALL_FE_CONNECTION = "show_all_fe_connection";
public static final String MAX_MSG_SIZE_OF_RESULT_RECEIVER = "max_msg_size_of_result_receiver";
public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
SKIP_DELETE_PREDICATE,
SKIP_DELETE_BITMAP,
@ -1636,6 +1639,13 @@ public class SessionVariable implements Serializable, Writable {
"当变量为true时,show processlist命令展示所有fe的连接"})
public boolean showAllFeConnection = false;
@VariableMgr.VarAttr(name = MAX_MSG_SIZE_OF_RESULT_RECEIVER,
description = {"Max message size during result deserialization, change this if you meet error"
+ " like \"MaxMessageSize reached\"",
"用于控制结果反序列化时 thrift 字段的最大值,当遇到类似\"MaxMessageSize reached\"这样的错误时可以考虑修改该参数"})
public int maxMsgSizeOfResultReceiver = TConfiguration.DEFAULT_MAX_MESSAGE_SIZE;
// CLOUD_VARIABLES_BEGIN
@VariableMgr.VarAttr(name = CLOUD_CLUSTER)
public String cloudCluster = "";
@ -3450,4 +3460,8 @@ public class SessionVariable implements Serializable, Writable {
return this.showAllFeConnection;
}
public int getMaxMsgSizeOfResultReceiver() {
return this.maxMsgSizeOfResultReceiver;
}
}

View File

@ -1569,7 +1569,8 @@ public class StmtExecutor {
RowBatch batch;
CoordInterface coordBase = null;
if (queryStmt instanceof SelectStmt && ((SelectStmt) parsedStmt).isPointQueryShortCircuit()) {
coordBase = new PointQueryExec(planner, analyzer);
coordBase = new PointQueryExec(planner, analyzer,
context.getSessionVariable().getMaxMsgSizeOfResultReceiver());
} else {
coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator());
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),

View File

@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.rpc;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TTransport;
/*
* CustomProtocolFactory can change maxMessageSize of TTransport,
* so that we can transfer message whose filed size is very large.
*/
public class TCustomProtocolFactory implements TProtocolFactory {
private final int maxMessageSize;
@Override
public TProtocol getProtocol(TTransport tTransport) {
tTransport.getConfiguration().setMaxMessageSize(maxMessageSize);
return new TBinaryProtocol(tTransport);
}
public TCustomProtocolFactory(int maxMessageSize) {
this.maxMessageSize = maxMessageSize;
}
}