[opt](compatibility) fill schema and table name in protocol column def (#38126) (#38522)

pick from master #38126

according to mysql text protocol Protocol::ColumnDefinition41, should
fill schema name, table name into it if column from table.
This commit is contained in:
morrySnow
2024-07-30 17:41:07 +08:00
committed by GitHub
parent 43ec98a30b
commit e25eb733c1
12 changed files with 266 additions and 22 deletions

View File

@ -17,6 +17,8 @@
package org.apache.doris.analysis;
import org.apache.doris.mysql.FieldInfo;
import java.util.ArrayList;
import java.util.List;
@ -36,5 +38,9 @@ public interface Queriable {
ArrayList<String> getColLabels();
default List<FieldInfo> getFieldInfos() {
return null;
}
String toDigest();
}

View File

@ -240,7 +240,7 @@ public class NereidsSqlCacheManager {
String cachedPlan = sqlCacheContext.getPhysicalPlan();
LogicalSqlCache logicalSqlCache = new LogicalSqlCache(
sqlCacheContext.getQueryId(), sqlCacheContext.getColLabels(),
sqlCacheContext.getQueryId(), sqlCacheContext.getColLabels(), sqlCacheContext.getFieldInfos(),
sqlCacheContext.getResultExprs(), resultSetInFe, ImmutableList.of(),
"none", cachedPlan
);
@ -265,7 +265,7 @@ public class NereidsSqlCacheManager {
MetricRepo.COUNTER_CACHE_HIT_SQL.increase(1L);
LogicalSqlCache logicalSqlCache = new LogicalSqlCache(
sqlCacheContext.getQueryId(), sqlCacheContext.getColLabels(),
sqlCacheContext.getQueryId(), sqlCacheContext.getColLabels(), sqlCacheContext.getFieldInfos(),
sqlCacheContext.getResultExprs(), Optional.empty(),
cacheValues, backendAddress, cachedPlan
);

View File

@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mysql;
/**
* according to mysql text protocol ColumnDefinition41. Field should be filled by these attribute.
*/
public class FieldInfo {
private final String schema;
private final String table;
private final String originalTable;
private final String name;
private final String originalName;
public FieldInfo(String schema, String table, String originalTable, String name, String originalName) {
this.schema = schema;
this.table = table;
this.originalTable = originalTable;
this.name = name;
this.originalName = originalName;
}
public String getSchema() {
return schema;
}
public String getTable() {
return table;
}
public String getOriginalTable() {
return originalTable;
}
public String getName() {
return name;
}
public String getOriginalName() {
return originalName;
}
}

View File

@ -161,6 +161,35 @@ public class MysqlSerializer {
}
}
public void writeField(FieldInfo fieldInfo, Type type) {
// Catalog Name: length encoded string
writeLenEncodedString("def");
// Schema: length encoded string
writeLenEncodedString(fieldInfo.getSchema());
// Table: length encoded string
writeLenEncodedString(fieldInfo.getTable());
// Origin Table: length encoded string
writeLenEncodedString(fieldInfo.getOriginalTable());
// Name: length encoded string
writeLenEncodedString(fieldInfo.getName());
// Original Name: length encoded string
writeLenEncodedString(fieldInfo.getOriginalName());
// length of the following fields(always 0x0c)
writeVInt(0x0c);
// Character set: two byte integer
writeInt2(33);
// Column length: four byte integer
writeInt4(getMysqlTypeLength(type));
// Column type: one byte integer
writeInt1(type.getPrimitiveType().toMysqlType().getCode());
// Flags: two byte integer
writeInt2(0);
// Decimals: one byte integer
writeInt1(getMysqlDecimals(type));
// filler: two byte integer
writeInt2(0);
}
public void writeField(String db, String table, Column column, boolean sendDefault) {
// Catalog Name: length encoded string
writeLenEncodedString("def");

View File

@ -21,11 +21,13 @@ import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.ExplainOptions;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.FormatOptions;
import org.apache.doris.common.NereidsException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
import org.apache.doris.mysql.FieldInfo;
import org.apache.doris.nereids.CascadesContext.Lock;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
@ -46,6 +48,7 @@ import org.apache.doris.nereids.processor.pre.PlanPreprocessors;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.exploration.mv.MaterializationContext;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.ComputeResultSet;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
@ -76,7 +79,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* Planner to do query plan in Nereids.
@ -170,7 +172,8 @@ public class NereidsPlanner extends Planner {
rewrittenPlan = analyzedPlan = plan;
LogicalSqlCache logicalSqlCache = (LogicalSqlCache) plan;
physicalPlan = new PhysicalSqlCache(
logicalSqlCache.getQueryId(), logicalSqlCache.getColumnLabels(),
logicalSqlCache.getQueryId(),
logicalSqlCache.getColumnLabels(), logicalSqlCache.getFieldInfos(),
logicalSqlCache.getResultExprs(), logicalSqlCache.getResultSetInFe(),
logicalSqlCache.getCacheValues(), logicalSqlCache.getBackendAddress(),
logicalSqlCache.getPlanBody()
@ -337,13 +340,35 @@ public class NereidsPlanner extends Planner {
}
// set output exprs
logicalPlanAdapter.setResultExprs(root.getOutputExprs());
ArrayList<String> columnLabelList = physicalPlan.getOutput().stream().map(NamedExpression::getName)
.collect(Collectors.toCollection(ArrayList::new));
logicalPlanAdapter.setColLabels(columnLabelList);
ArrayList<String> columnLabels = Lists.newArrayListWithExpectedSize(physicalPlan.getOutput().size());
List<FieldInfo> fieldInfos = Lists.newArrayListWithExpectedSize(physicalPlan.getOutput().size());
for (NamedExpression output : physicalPlan.getOutput()) {
Optional<Column> column = Optional.empty();
Optional<TableIf> table = Optional.empty();
if (output instanceof SlotReference) {
SlotReference slotReference = (SlotReference) output;
column = slotReference.getColumn();
table = slotReference.getTable();
}
columnLabels.add(output.getName());
FieldInfo fieldInfo = new FieldInfo(
table.isPresent() ? (table.get().getDatabase() != null
? table.get().getDatabase().getFullName() : "") : "",
!output.getQualifier().isEmpty() ? output.getQualifier().get(output.getQualifier().size() - 1)
: (table.isPresent() ? table.get().getName() : ""),
table.isPresent() ? table.get().getName() : "",
output.getName(),
column.isPresent() ? column.get().getName() : ""
);
fieldInfos.add(fieldInfo);
}
logicalPlanAdapter.setColLabels(columnLabels);
logicalPlanAdapter.setFieldInfos(fieldInfos);
logicalPlanAdapter.setViewDdlSqls(statementContext.getViewDdlSqls());
if (statementContext.getSqlCacheContext().isPresent()) {
SqlCacheContext sqlCacheContext = statementContext.getSqlCacheContext().get();
sqlCacheContext.setColLabels(columnLabelList);
sqlCacheContext.setColLabels(columnLabels);
sqlCacheContext.setFieldInfos(fieldInfos);
sqlCacheContext.setResultExprs(root.getOutputExprs());
sqlCacheContext.setPhysicalPlan(resultPlan.treeString());
}

View File

@ -23,6 +23,7 @@ import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.mysql.FieldInfo;
import org.apache.doris.mysql.privilege.DataMaskPolicy;
import org.apache.doris.mysql.privilege.RowFilterPolicy;
import org.apache.doris.nereids.trees.expressions.Expression;
@ -82,6 +83,7 @@ public class SqlCacheContext {
private volatile List<Expr> resultExprs;
private volatile List<String> colLabels;
private volatile List<FieldInfo> fieldInfos;
private volatile PUniqueId cacheKeyMd5;
private volatile ResultSet resultSetInFe;
@ -320,6 +322,14 @@ public class SqlCacheContext {
this.colLabels = ImmutableList.copyOf(colLabels);
}
public List<FieldInfo> getFieldInfos() {
return fieldInfos;
}
public void setFieldInfos(List<FieldInfo> fieldInfos) {
this.fieldInfos = fieldInfos;
}
public TUniqueId getQueryId() {
return queryId;
}

View File

@ -23,6 +23,7 @@ import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.analysis.Queriable;
import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.mysql.FieldInfo;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.Plan;
@ -46,6 +47,7 @@ public class LogicalPlanAdapter extends StatementBase implements Queriable {
private final LogicalPlan logicalPlan;
private List<Expr> resultExprs;
private ArrayList<String> colLabels;
private List<FieldInfo> fieldInfos;
private List<String> viewDdlSqls;
public LogicalPlanAdapter(LogicalPlan logicalPlan, StatementContext statementContext) {
@ -100,6 +102,11 @@ public class LogicalPlanAdapter extends StatementBase implements Queriable {
return colLabels;
}
@Override
public List<FieldInfo> getFieldInfos() {
return fieldInfos;
}
public List<String> getViewDdlSqls() {
return viewDdlSqls;
}
@ -117,6 +124,10 @@ public class LogicalPlanAdapter extends StatementBase implements Queriable {
this.colLabels = colLabels;
}
public void setFieldInfos(List<FieldInfo> fieldInfos) {
this.fieldInfos = fieldInfos;
}
public void setViewDdlSqls(List<String> viewDdlSqls) {
this.viewDdlSqls = viewDdlSqls;
}

View File

@ -19,6 +19,7 @@ package org.apache.doris.nereids.trees.plans.logical;
import org.apache.doris.analysis.Expr;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.mysql.FieldInfo;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
@ -44,6 +45,7 @@ import java.util.Optional;
public class LogicalSqlCache extends LogicalLeaf implements SqlCache, TreeStringPlan, BlockFuncDepsPropagation {
private final TUniqueId queryId;
private final List<String> columnLabels;
private final List<FieldInfo> fieldInfos;
private final List<Expr> resultExprs;
private final Optional<ResultSet> resultSetInFe;
private final List<InternalService.PCacheValue> cacheValues;
@ -52,12 +54,13 @@ public class LogicalSqlCache extends LogicalLeaf implements SqlCache, TreeString
/** LogicalSqlCache */
public LogicalSqlCache(TUniqueId queryId,
List<String> columnLabels, List<Expr> resultExprs,
List<String> columnLabels, List<FieldInfo> fieldInfos, List<Expr> resultExprs,
Optional<ResultSet> resultSetInFe, List<InternalService.PCacheValue> cacheValues,
String backendAddress, String planBody) {
super(PlanType.LOGICAL_SQL_CACHE, Optional.empty(), Optional.empty());
this.queryId = Objects.requireNonNull(queryId, "queryId can not be null");
this.columnLabels = Objects.requireNonNull(columnLabels, "columnLabels can not be null");
this.fieldInfos = Objects.requireNonNull(fieldInfos, "fieldInfos can not be null");
this.resultExprs = Objects.requireNonNull(resultExprs, "resultExprs can not be null");
this.resultSetInFe = Objects.requireNonNull(resultSetInFe, "resultSetInFe can not be null");
this.cacheValues = Objects.requireNonNull(cacheValues, "cacheValues can not be null");
@ -85,6 +88,10 @@ public class LogicalSqlCache extends LogicalLeaf implements SqlCache, TreeString
return columnLabels;
}
public List<FieldInfo> getFieldInfos() {
return fieldInfos;
}
public List<Expr> getResultExprs() {
return resultExprs;
}

View File

@ -19,6 +19,7 @@ package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.analysis.Expr;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.mysql.FieldInfo;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.SqlCacheContext;
import org.apache.doris.nereids.memo.GroupExpression;
@ -50,6 +51,7 @@ import java.util.Optional;
public class PhysicalSqlCache extends PhysicalLeaf implements SqlCache, TreeStringPlan, ComputeResultSet {
private final TUniqueId queryId;
private final List<String> columnLabels;
private final List<FieldInfo> fieldInfos;
private final List<Expr> resultExprs;
private final Optional<ResultSet> resultSet;
private final List<InternalService.PCacheValue> cacheValues;
@ -58,13 +60,14 @@ public class PhysicalSqlCache extends PhysicalLeaf implements SqlCache, TreeStri
/** PhysicalSqlCache */
public PhysicalSqlCache(TUniqueId queryId,
List<String> columnLabels, List<Expr> resultExprs,
List<String> columnLabels, List<FieldInfo> fieldInfos, List<Expr> resultExprs,
Optional<ResultSet> resultSet, List<InternalService.PCacheValue> cacheValues,
String backendAddress, String planBody) {
super(PlanType.PHYSICAL_SQL_CACHE, Optional.empty(),
new LogicalProperties(() -> ImmutableList.of(), () -> FunctionalDependencies.EMPTY_FUNC_DEPS));
this.queryId = Objects.requireNonNull(queryId, "queryId can not be null");
this.columnLabels = Objects.requireNonNull(columnLabels, "colNames can not be null");
this.fieldInfos = Objects.requireNonNull(fieldInfos, "fieldInfos can not be null");
this.resultExprs = Objects.requireNonNull(resultExprs, "resultExprs can not be null");
this.resultSet = Objects.requireNonNull(resultSet, "resultSet can not be null");
this.cacheValues = Objects.requireNonNull(cacheValues, "cacheValues can not be null");
@ -92,6 +95,10 @@ public class PhysicalSqlCache extends PhysicalLeaf implements SqlCache, TreeStri
return columnLabels;
}
public List<FieldInfo> getFieldInfos() {
return fieldInfos;
}
public List<Expr> getResultExprs() {
return resultExprs;
}

View File

@ -401,6 +401,7 @@ public abstract class ConnectProcessor {
logicalPlanAdapter.setColLabels(
Lists.newArrayList(logicalSqlCache.getColumnLabels())
);
logicalPlanAdapter.setFieldInfos(Lists.newArrayList(logicalSqlCache.getFieldInfos()));
logicalPlanAdapter.setResultExprs(logicalSqlCache.getResultExprs());
logicalPlanAdapter.setOrigStmt(statementContext.getOriginStatement());
logicalPlanAdapter.setUserInfo(ctx.getCurrentUserIdentity());

View File

@ -123,6 +123,7 @@ import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.LoadJobRowResult;
import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.load.loadv2.LoadManagerAdapter;
import org.apache.doris.mysql.FieldInfo;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.MysqlEofPacket;
@ -1579,7 +1580,8 @@ public class StmtExecutor {
batch.setEos(true);
if (!isSend) {
// send meta fields before sending first data batch.
sendFields(selectStmt.getColLabels(), exprToType(selectStmt.getResultExprs()));
sendFields(selectStmt.getColLabels(), selectStmt.getFieldInfos(),
exprToType(selectStmt.getResultExprs()));
isSend = true;
}
for (ByteBuffer row : batch.getBatch().getRows()) {
@ -1594,7 +1596,8 @@ public class StmtExecutor {
? null : batch.getQueryStatistics().toBuilder();
}
if (!isSend) {
sendFields(selectStmt.getColLabels(), exprToType(selectStmt.getResultExprs()));
sendFields(selectStmt.getColLabels(), selectStmt.getFieldInfos(),
exprToType(selectStmt.getResultExprs()));
isSend = true;
}
context.getState().setEof();
@ -1686,7 +1689,7 @@ public class StmtExecutor {
&& context.getCommand() != MysqlCommand.COM_STMT_EXECUTE) {
Optional<ResultSet> resultSet = planner.handleQueryInFe(parsedStmt);
if (resultSet.isPresent()) {
sendResultSet(resultSet.get());
sendResultSet(resultSet.get(), ((Queriable) parsedStmt).getFieldInfos());
isHandleQueryInFe = true;
LOG.info("Query {} finished", DebugUtil.printId(context.queryId));
return;
@ -1731,7 +1734,7 @@ public class StmtExecutor {
LOG.debug("ignore handle limit 0 ,sql:{}", parsedSelectStmt.toSql());
}
sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs()));
sendFields(queryStmt.getColLabels(), queryStmt.getFieldInfos(), exprToType(queryStmt.getResultExprs()));
context.getState().setEof();
LOG.info("Query {} finished", DebugUtil.printId(context.queryId));
return;
@ -1810,7 +1813,8 @@ public class StmtExecutor {
// so We need to send fields after first batch arrived
if (!isSendFields) {
if (!isOutfileQuery) {
sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs()));
sendFields(queryStmt.getColLabels(), queryStmt.getFieldInfos(),
exprToType(queryStmt.getResultExprs()));
} else {
if (!Strings.isNullOrEmpty(queryStmt.getOutFileClause().getSuccessFileName())) {
outfileWriteSuccess(queryStmt.getOutFileClause());
@ -1856,7 +1860,8 @@ public class StmtExecutor {
sendResultSet(resultSet);
return;
} else {
sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs()));
sendFields(queryStmt.getColLabels(), queryStmt.getFieldInfos(),
exprToType(queryStmt.getResultExprs()));
}
} else {
sendFields(OutFileClause.RESULT_COL_NAMES, OutFileClause.RESULT_COL_TYPES);
@ -2472,16 +2477,25 @@ public class StmtExecutor {
}
private void sendMetaData(ResultSetMetaData metaData) throws IOException {
sendMetaData(metaData, null);
}
private void sendMetaData(ResultSetMetaData metaData, List<FieldInfo> fieldInfos) throws IOException {
Preconditions.checkState(context.getConnectType() == ConnectType.MYSQL);
// sends how many columns
serializer.reset();
serializer.writeVInt(metaData.getColumnCount());
context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
// send field one by one
for (Column col : metaData.getColumns()) {
for (int i = 0; i < metaData.getColumns().size(); i++) {
Column col = metaData.getColumn(i);
serializer.reset();
// TODO(zhaochun): only support varchar type
serializer.writeField(col.getName(), col.getType());
if (fieldInfos == null) {
// TODO(zhaochun): only support varchar type
serializer.writeField(col.getName(), col.getType());
} else {
serializer.writeField(fieldInfos.get(i), col.getType());
}
context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
}
// send EOF
@ -2539,6 +2553,10 @@ public class StmtExecutor {
}
private void sendFields(List<String> colNames, List<Type> types) throws IOException {
sendFields(colNames, null, types);
}
private void sendFields(List<String> colNames, List<FieldInfo> fieldInfos, List<Type> types) throws IOException {
Preconditions.checkState(context.getConnectType() == ConnectType.MYSQL);
// sends how many columns
serializer.reset();
@ -2556,13 +2574,21 @@ public class StmtExecutor {
// we send a field
byte[] serializedField = ((PrepareStmt) prepareStmt).getSerializedField(colNames.get(i));
if (serializedField == null) {
serializer.writeField(colNames.get(i), types.get(i));
if (fieldInfos != null) {
serializer.writeField(fieldInfos.get(i), types.get(i));
} else {
serializer.writeField(colNames.get(i), types.get(i));
}
serializedField = serializer.toArray();
((PrepareStmt) prepareStmt).setSerializedField(colNames.get(i), serializedField);
}
context.getMysqlChannel().sendOnePacket(ByteBuffer.wrap(serializedField));
} else {
serializer.writeField(colNames.get(i), types.get(i));
if (fieldInfos != null) {
serializer.writeField(fieldInfos.get(i), types.get(i));
} else {
serializer.writeField(colNames.get(i), types.get(i));
}
context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
}
}
@ -2574,10 +2600,14 @@ public class StmtExecutor {
}
public void sendResultSet(ResultSet resultSet) throws IOException {
sendResultSet(resultSet, null);
}
public void sendResultSet(ResultSet resultSet, List<FieldInfo> fieldInfos) throws IOException {
if (context.getConnectType().equals(ConnectType.MYSQL)) {
context.updateReturnRows(resultSet.getResultRows().size());
// Send meta data.
sendMetaData(resultSet.getMetaData());
sendMetaData(resultSet.getMetaData(), fieldInfos);
// Send result set.
for (List<String> row : resultSet.getResultRows()) {