[enhancement](stmt-forward) record query result for proxy query to avoid EOF (#30536)

This commit is contained in:
Siyang Tang
2024-02-08 12:01:07 +08:00
committed by yiguolei
parent ff82e2ab59
commit 08c196f3dc
11 changed files with 169 additions and 20 deletions

View File

@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mysql;
import com.google.common.collect.Lists;
import java.nio.ByteBuffer;
import java.util.List;
/**
* An interceptor for proxy query, keeping all packets to be sent to master mysql channel.
*/
public class ProxyMysqlChannel extends MysqlChannel {
private final List<ByteBuffer> proxyResultBuffer = Lists.newArrayList();
@Override
public void sendOnePacket(ByteBuffer packet) {
proxyResultBuffer.add(packet);
}
public List<ByteBuffer> getProxyResultBufferList() {
return proxyResultBuffer;
}
}

View File

@ -45,6 +45,7 @@ import org.apache.doris.mysql.MysqlCapability;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.MysqlSslContext;
import org.apache.doris.mysql.ProxyMysqlChannel;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
@ -326,14 +327,20 @@ public class ConnectContext {
}
public ConnectContext() {
this((StreamConnection) null);
this(null);
}
public ConnectContext(StreamConnection connection) {
this(connection, false);
}
public ConnectContext(StreamConnection connection, boolean isProxy) {
connectType = ConnectType.MYSQL;
serverCapability = MysqlCapability.DEFAULT_CAPABILITY;
if (connection != null) {
mysqlChannel = new MysqlChannel(connection, this);
} else if (isProxy) {
mysqlChannel = new ProxyMysqlChannel();
} else {
mysqlChannel = new DummyMysqlChannel();
}

View File

@ -445,7 +445,11 @@ public abstract class ConnectProcessor {
&& ctx.getState().getStateType() != QueryState.MysqlStateType.ERR) {
ShowResultSet resultSet = executor.getShowResultSet();
if (resultSet == null) {
packet = executor.getOutputPacket();
if (executor.sendProxyQueryResult()) {
packet = getResultPacket();
} else {
packet = executor.getOutputPacket();
}
} else {
executor.sendResultSet(resultSet);
packet = getResultPacket();
@ -594,8 +598,12 @@ public abstract class ConnectProcessor {
result.setStatusCode(ctx.getState().getErrorCode().getCode());
result.setErrMessage(ctx.getState().getErrorMessage());
}
if (executor != null && executor.getProxyResultSet() != null) {
result.setResultSet(executor.getProxyResultSet().tothrift());
if (executor != null) {
if (executor.getProxyShowResultSet() != null) {
result.setResultSet(executor.getProxyShowResultSet().tothrift());
} else if (!executor.getProxyQueryResultBufList().isEmpty()) {
result.setQueryResultBufList(executor.getProxyQueryResultBufList());
}
}
return result;
}

View File

@ -35,6 +35,8 @@ import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class MasterOpExecutor {
@ -144,7 +146,7 @@ public class MasterOpExecutor {
private TMasterOpRequest buildStmtForwardParams() {
TMasterOpRequest params = new TMasterOpRequest();
//node ident
// node ident
params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
params.setSql(originStmt.originStmt);
@ -170,7 +172,7 @@ public class MasterOpExecutor {
private TMasterOpRequest buildSyncJournalParmas() {
final TMasterOpRequest params = new TMasterOpRequest();
//node ident
// node ident
params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
params.setSyncJournalOnly(true);
@ -235,6 +237,10 @@ public class MasterOpExecutor {
}
}
public List<ByteBuffer> getQueryResultBufList() {
return result.isSetQueryResultBufList() ? result.getQueryResultBufList() : Collections.emptyList();
}
public void setResult(TMasterOpResult result) {
this.result = result;
}

View File

@ -100,6 +100,8 @@ import org.apache.doris.common.Version;
import org.apache.doris.common.profile.Profile;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.profile.SummaryProfile.SummaryBuilder;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.common.util.ProfileManager.ProfileType;
@ -115,6 +117,7 @@ import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.MysqlEofPacket;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.mysql.ProxyMysqlChannel;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
@ -224,7 +227,7 @@ public class StmtExecutor {
private RedirectStatus redirectStatus = null;
private Planner planner;
private boolean isProxy;
private ShowResultSet proxyResultSet = null;
private ShowResultSet proxyShowResultSet = null;
private Data.PQueryStatistics.Builder statisticsForAuditLog;
private boolean isCached;
private String stmtName;
@ -363,7 +366,7 @@ public class StmtExecutor {
// this is a query stmt, but this non-master FE can not read, forward it to master
if (isQuery() && !Env.getCurrentEnv().isMaster()
&& !Env.getCurrentEnv().canRead()) {
&& (!Env.getCurrentEnv().canRead() || debugForwardAllQueries())) {
return true;
}
@ -374,6 +377,11 @@ public class StmtExecutor {
}
}
private boolean debugForwardAllQueries() {
DebugPoint debugPoint = DebugPointUtil.getDebugPoint("StmtExecutor.forward_all_queries");
return debugPoint != null && debugPoint.param("forwardAllQueries", true);
}
public ByteBuffer getOutputPacket() {
if (masterOpExecutor == null) {
return null;
@ -382,8 +390,8 @@ public class StmtExecutor {
}
}
public ShowResultSet getProxyResultSet() {
return proxyResultSet;
public ShowResultSet getProxyShowResultSet() {
return proxyShowResultSet;
}
public ShowResultSet getShowResultSet() {
@ -2358,7 +2366,7 @@ public class StmtExecutor {
return;
}
if (isProxy) {
proxyResultSet = resultSet;
proxyShowResultSet = resultSet;
return;
}
@ -2865,8 +2873,8 @@ public class StmtExecutor {
}
public void setProxyResultSet(ShowResultSet proxyResultSet) {
this.proxyResultSet = proxyResultSet;
public void setProxyShowResultSet(ShowResultSet proxyShowResultSet) {
this.proxyShowResultSet = proxyShowResultSet;
}
public ConnectContext getContext() {
@ -2883,6 +2891,24 @@ public class StmtExecutor {
}
return "";
}
public List<ByteBuffer> getProxyQueryResultBufList() {
return ((ProxyMysqlChannel) context.getMysqlChannel()).getProxyResultBufferList();
}
public boolean sendProxyQueryResult() throws IOException {
if (masterOpExecutor == null) {
return false;
}
List<ByteBuffer> queryResultBufList = masterOpExecutor.getQueryResultBufList();
if (queryResultBufList.isEmpty()) {
return false;
}
for (ByteBuffer byteBuffer : queryResultBufList) {
context.getMysqlChannel().sendOnePacket(byteBuffer);
}
return true;
}
}

View File

@ -959,7 +959,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
// add this log so that we can track this stmt
LOG.debug("receive forwarded stmt {} from FE: {}", params.getStmtId(), params.getClientNodeHost());
ConnectContext context = new ConnectContext();
ConnectContext context = new ConnectContext(null, true);
// Set current connected FE to the client address, so that we can know where this request come from.
context.setCurrentConnectedFEIp(params.getClientNodeHost());

View File

@ -282,7 +282,7 @@ public class AnalysisManager implements Writable {
if (!proxy) {
ConnectContext.get().getExecutor().sendResultSet(commonResultSet);
} else {
ConnectContext.get().getExecutor().setProxyResultSet(commonResultSet);
ConnectContext.get().getExecutor().setProxyShowResultSet(commonResultSet);
}
} catch (Throwable t) {
LOG.warn("Failed to send job id to user", t);