[fix](jdbc) fix Unknown command(27) (#41621) (#41637)

pick https://github.com/apache/doris/pull/41621
This commit is contained in:
meiyi
2024-10-10 17:46:45 +08:00
committed by GitHub
parent a26079c09d
commit 070bd6cf99
3 changed files with 116 additions and 2 deletions

View File

@ -48,8 +48,8 @@ public enum MysqlCommand {
COM_STMT_SEND_LONG_DATA("COM_STMT_SEND_LONG_DATA", 24),
COM_STMT_CLOSE("COM_STMT_CLOSE", 25),
COM_STMT_RESET("COM_STMT_RESET", 26),
COM_SET_OPTION("COM_RESET_CONNECTION", 27),
COM_STMT_FETCH("COM_RESET_CONNECTION", 28),
COM_SET_OPTION("COM_SET_OPTION", 27),
COM_STMT_FETCH("COM_STMT_FETCH", 28),
COM_DAEMON("COM_DAEMON", 29),
COM_RESET_CONNECTION("COM_RESET_CONNECTION", 31);

View File

@ -336,6 +336,9 @@ public class MysqlConnectProcessor extends ConnectProcessor {
case COM_STMT_CLOSE:
handleStmtClose();
break;
case COM_SET_OPTION:
handleSetOption();
break;
default:
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_COM_ERROR, "Unsupported command(" + command + ")");
LOG.warn("Unsupported command(" + command + ")");
@ -436,6 +439,15 @@ public class MysqlConnectProcessor extends ConnectProcessor {
ctx.getState().setOk();
}
private void handleSetOption() {
// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_set_option.html
int optionOperation = MysqlProto.readInt2(packetBuf);
LOG.debug("option_operation {}", optionOperation);
// Do nothing for now.
// https://dev.mysql.com/doc/c-api/8.0/en/mysql-set-server-option.html
ctx.getState().setOk();
}
// Process a MySQL request
public void processOnce() throws IOException {
// set status of query to OK.

View File

@ -0,0 +1,102 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import java.util.Arrays
import java.util.stream.Collectors
suite("test_jdbc") {
def user = context.config.jdbcUser
def password = context.config.jdbcPassword
def realDb = "regression_test_insert_p0"
def tableName = realDb + ".test_jdbc"
sql "CREATE DATABASE IF NOT EXISTS ${realDb}"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
`id` int(11) NULL,
`phone` varchar(50) NULL,
) ENGINE=OLAP
unique KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
"""
// Parse url
String jdbcUrl = context.config.jdbcUrl
String urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":"))
def sql_port
if (urlWithoutSchema.indexOf("/") >= 0) {
// e.g: jdbc:mysql://localhost:8080/?a=b
sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1, urlWithoutSchema.indexOf("/"))
} else {
// e.g: jdbc:mysql://localhost:8080
sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1)
}
String url = String.format("jdbc:mysql://%s:%s/%s?useLocalSessionState=true", sql_ip, sql_port, realDb)
def batchSize = 5
def urls = [
url,
url + "&rewriteBatchedStatements=true",
url + "&rewriteBatchedStatements=true&allowMultiQueries=true",
url + "&rewriteBatchedStatements=true&allowMultiQueries=false"
]
def insert = { jdbc_url ->
connect(user = user, password = password, url = jdbc_url) {
logger.info("insert url: {}", jdbc_url)
def ps = prepareStatement "insert into ${tableName} values(?, ?)"
for (int i = 0; i < batchSize; i++) {
String phone = UUID.randomUUID().toString()
ps.setInt(1, i + 1)
ps.setString(2, phone)
logger.info((i + 1) + ", " + phone)
ps.addBatch()
}
int[] results = ps.executeBatch()
logger.info("insert results: {}", Arrays.stream(results).boxed().map(i -> String.valueOf(i)).collect(Collectors.joining(", ")))
ps.close()
}
}
def update = { jdbc_url ->
connect(user = user, password = password, url = jdbc_url) {
logger.info("update url: {}", jdbc_url)
def ps = prepareStatement "update ${tableName} set phone = ? where id = ?";
for (int i = 0; i < batchSize; i++) {
String phone = UUID.randomUUID().toString()
ps.setInt(2, i + 1)
ps.setString(1, phone)
logger.info((i + 1) + ", " + phone)
ps.addBatch()
}
int[] results = ps.executeBatch()
logger.info("update results: {}", Arrays.stream(results).boxed().map(i -> String.valueOf(i)).collect(Collectors.joining(", ")))
ps.close()
}
}
for (final def jdbc_url in urls) {
insert(jdbc_url)
update(jdbc_url)
}
}