bp #36559
This commit is contained in:
@ -0,0 +1,35 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.common;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* This is a special exception.
|
||||
* If this exception is thrown, it means that the connection to the server is abnormal.
|
||||
* We need to kill the connection actively.
|
||||
*/
|
||||
public class ConnectionException extends IOException {
|
||||
public ConnectionException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public ConnectionException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.doris.mysql;
|
||||
|
||||
import org.apache.doris.common.ConnectionException;
|
||||
import org.apache.doris.common.util.NetUtils;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.ConnectProcessor;
|
||||
@ -401,11 +402,13 @@ public class MysqlChannel implements BytesChannel {
|
||||
protected void realNetSend(ByteBuffer buffer) throws IOException {
|
||||
buffer = encryptData(buffer);
|
||||
long bufLen = buffer.remaining();
|
||||
long start = System.currentTimeMillis();
|
||||
long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer, context.getNetWriteTimeout(),
|
||||
TimeUnit.SECONDS);
|
||||
if (bufLen != writeLen) {
|
||||
throw new IOException("Write mysql packet failed.[write=" + writeLen
|
||||
+ ", needToWrite=" + bufLen + "]");
|
||||
long duration = System.currentTimeMillis() - start;
|
||||
throw new ConnectionException("Write mysql packet failed.[write=" + writeLen
|
||||
+ ", needToWrite=" + bufLen + "], duration: " + duration + " ms");
|
||||
}
|
||||
Channels.flushBlocking(conn.getSinkChannel(), context.getNetWriteTimeout(), TimeUnit.SECONDS);
|
||||
isSend = true;
|
||||
|
||||
@ -32,6 +32,7 @@ import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.ConnectionException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.NotImplementedException;
|
||||
@ -198,9 +199,11 @@ public abstract class ConnectProcessor {
|
||||
}
|
||||
|
||||
// only throw an exception when there is a problem interacting with the requesting client
|
||||
protected void handleQuery(MysqlCommand mysqlCommand, String originStmt) {
|
||||
protected void handleQuery(MysqlCommand mysqlCommand, String originStmt) throws ConnectionException {
|
||||
try {
|
||||
executeQuery(mysqlCommand, originStmt);
|
||||
} catch (ConnectionException exception) {
|
||||
throw exception;
|
||||
} catch (Exception ignored) {
|
||||
// saved use handleQueryException
|
||||
}
|
||||
@ -414,14 +417,18 @@ public abstract class ConnectProcessor {
|
||||
|
||||
// Use a handler for exception to avoid big try catch block which is a little hard to understand
|
||||
protected void handleQueryException(Throwable throwable, String origStmt,
|
||||
StatementBase parsedStmt, Data.PQueryStatistics statistics) {
|
||||
StatementBase parsedStmt, Data.PQueryStatistics statistics) throws ConnectionException {
|
||||
if (ctx.getMinidump() != null) {
|
||||
MinidumpUtils.saveMinidumpString(ctx.getMinidump(), DebugUtil.printId(ctx.queryId()));
|
||||
}
|
||||
if (throwable instanceof IOException) {
|
||||
if (throwable instanceof ConnectionException) {
|
||||
// Throw this exception to close the connection outside.
|
||||
LOG.warn("Process one query failed because ConnectionException: ", throwable);
|
||||
throw (ConnectionException) throwable;
|
||||
} else if (throwable instanceof IOException) {
|
||||
// Client failed.
|
||||
LOG.warn("Process one query failed because IOException: ", throwable);
|
||||
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Doris process failed");
|
||||
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Doris process failed: " + throwable.getMessage());
|
||||
} else if (throwable instanceof UserException) {
|
||||
LOG.warn("Process one query failed because.", throwable);
|
||||
ctx.getState().setError(((UserException) throwable).getMysqlErrorCode(), throwable.getMessage());
|
||||
@ -479,7 +486,7 @@ public abstract class ConnectProcessor {
|
||||
|
||||
// Get the column definitions of a table
|
||||
@SuppressWarnings("rawtypes")
|
||||
protected void handleFieldList(String tableName) {
|
||||
protected void handleFieldList(String tableName) throws ConnectionException {
|
||||
// Already get command code.
|
||||
if (Strings.isNullOrEmpty(tableName)) {
|
||||
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_TABLE, "Empty tableName");
|
||||
|
||||
@ -24,6 +24,7 @@ import org.apache.doris.analysis.PrepareStmt;
|
||||
import org.apache.doris.analysis.QueryStmt;
|
||||
import org.apache.doris.analysis.StatementBase;
|
||||
import org.apache.doris.catalog.MysqlColType;
|
||||
import org.apache.doris.common.ConnectionException;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.mysql.MysqlChannel;
|
||||
@ -245,7 +246,7 @@ public class MysqlConnectProcessor extends ConnectProcessor {
|
||||
}
|
||||
|
||||
// Process COM_QUERY statement,
|
||||
private void handleQuery(MysqlCommand mysqlCommand) {
|
||||
private void handleQuery(MysqlCommand mysqlCommand) throws ConnectionException {
|
||||
// convert statement to Java string
|
||||
byte[] bytes = packetBuf.array();
|
||||
int ending = packetBuf.limit() - 1;
|
||||
@ -307,7 +308,7 @@ public class MysqlConnectProcessor extends ConnectProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
private void handleFieldList() {
|
||||
private void handleFieldList() throws ConnectionException {
|
||||
String tableName = new String(MysqlProto.readNulTerminateString(packetBuf), StandardCharsets.UTF_8);
|
||||
handleFieldList(tableName);
|
||||
}
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.service.arrowflight;
|
||||
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.common.ConnectionException;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.Status;
|
||||
@ -81,7 +82,7 @@ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoC
|
||||
ctx.setStartTime();
|
||||
}
|
||||
|
||||
public void handleQuery(String query) {
|
||||
public void handleQuery(String query) throws ConnectionException {
|
||||
MysqlCommand command = MysqlCommand.COM_QUERY;
|
||||
prepare(command);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user