[improvement](dry-run)(tvf) support csv schema in tvf and add "dry_run_query" variable (#16983)

This CL mainly changes:

1. Support specifying a CSV schema manually in the s3/hdfs table-valued functions:

select * from s3 (
'URI' = 'https://bucket1/inventory.dat',
'ACCESS_KEY' = 'ak',
'SECRET_KEY' = 'sk',
'FORMAT' = 'csv',
'column_separator' = '|',
'csv_schema' = 'k1:int;k2:int;k3:int;k4:decimal(38,10)',
'use_path_style' = 'true'
);
2. Add a new session variable dry_run_query

If set to true, the query does not return the real result set; instead, it returns only the number of returned rows:

mysql> set dry_run_query = true;

mysql> select * from bigtable;
+--------------+
| ReturnedRows |
+--------------+
| 10000000     |
+--------------+
This avoids the transmission time of large result sets and makes it easier to focus on the actual execution time of the query engine. It is intended for debugging and analysis purposes.
Mingyu Chen authored on 2023-03-02 16:51:27 +08:00, committed by GitHub
parent 17f4990bd3
commit 39f59f554a
15 changed files with 384 additions and 12 deletions

@ -32,5 +32,4 @@ public class CommonResultSet extends AbstractResultSet {
super(columns);
}
}
}

@ -1213,6 +1213,10 @@ public class Coordinator {
LOG.debug("no block query, return num >= limit rows, need cancel");
cancelInternal(Types.PPlanFragmentCancelReason.LIMIT_REACH);
}
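// In dry run mode, record the row count reported by the query statistics instead of counting received rows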
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().dryRunQuery) {
numReceivedRows = resultBatch.getQueryStatistics().getReturnedRows();
}
} else if (resultBatch.getBatch() != null) {
numReceivedRows += resultBatch.getBatch().getRowsSize();
}
@ -3324,3 +3328,4 @@ public class Coordinator {
}
}

@ -283,6 +283,8 @@ public class SessionVariable implements Serializable, Writable {
// Fix the replica to query. If set to 1, query the smallest replica; if set to 2, the second smallest, and so on.
public static final String USE_FIX_REPLICA = "use_fix_replica";
public static final String DRY_RUN_QUERY = "dry_run_query";
// session origin value
public Map<Field, String> sessionOriginValue = new HashMap<Field, String>();
// check stmt is or not [select /*+ SET_VAR(...)*/ ...]
@ -752,6 +754,11 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = USE_FIX_REPLICA)
public int useFixReplica = -1;
// If set to true, all queries will be executed without returning any results
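// needForward = true: this variable is carried along when a statement is forwarded to the master FE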
@VariableMgr.VarAttr(name = DRY_RUN_QUERY, needForward = true)
public boolean dryRunQuery = false;
// If this FE is in fuzzy mode, initFuzzyModeVariables() is used to generate some variables,
// instead of the default values set in the code.
public void initFuzzyModeVariables() {
@ -1617,6 +1624,10 @@ public class SessionVariable implements Serializable, Writable {
tResult.setEnableFileCache(enableFileCache);
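// Propagate the dry run flag to the backends via the thrift query options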
if (dryRunQuery) {
tResult.setDryRunQuery(true);
}
return tResult;
}

@ -109,6 +109,7 @@ import org.apache.doris.planner.ScanNode;
import org.apache.doris.proto.Data;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types;
import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.cache.Cache;
import org.apache.doris.qe.cache.CacheAnalyzer;
@ -198,6 +199,11 @@ public class StmtExecutor implements ProfileWriter {
private String stmtName;
private PrepareStmt prepareStmt;
// The result schema used when "dry_run_query" is true.
// It contains a single column reporting the number of rows actually returned.
private static final CommonResultSetMetaData DRY_RUN_QUERY_METADATA = new CommonResultSetMetaData(
Lists.newArrayList(new Column("ReturnedRows", PrimitiveType.STRING)));
// this constructor is mainly for proxy
public StmtExecutor(ConnectContext context, OriginStatement originStmt, boolean isProxy) {
this.context = context;
@ -1289,7 +1295,17 @@ public class StmtExecutor implements ProfileWriter {
}
if (!isSendFields) {
if (!isOutfileQuery) {
sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs()));
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().dryRunQuery) {
// Return a one-row, one-column result set containing the real number of returned rows
List<String> data = Lists.newArrayList(batch.getQueryStatistics() == null ? "0"
: batch.getQueryStatistics().getReturnedRows() + "");
ResultSet resultSet = new CommonResultSet(DRY_RUN_QUERY_METADATA,
Collections.singletonList(data));
sendResultSet(resultSet);
return;
} else {
sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs()));
}
} else {
sendFields(OutFileClause.RESULT_COL_NAMES, OutFileClause.RESULT_COL_TYPES);
}

@ -23,8 +23,10 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.planner.PlanNodeId;
@ -51,11 +53,12 @@ import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPrimitiveType;
import org.apache.doris.thrift.TStatusCode;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
@ -63,6 +66,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* ExternalFileTableValuedFunction is used for S3/HDFS/LOCAL table-valued-function
@ -82,6 +87,11 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
protected static final String FUZZY_PARSE = "fuzzy_parse";
protected static final String TRIM_DOUBLE_QUOTES = "trim_double_quotes";
protected static final String SKIP_LINES = "skip_lines";
protected static final String CSV_SCHEMA = "csv_schema";
// decimal(p,s)
private static final Pattern DECIMAL_TYPE_PATTERN = Pattern.compile("decimal\\((\\d+),(\\d+)\\)");
// datetime(p)
private static final Pattern DATETIME_TYPE_PATTERN = Pattern.compile("datetime\\((\\d+)\\)");
protected static final ImmutableSet<String> FILE_FORMAT_PROPERTIES = new ImmutableSet.Builder<String>()
.add(FORMAT)
@ -95,10 +105,14 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
.add(LINE_DELIMITER)
.add(TRIM_DOUBLE_QUOTES)
.add(SKIP_LINES)
.add(CSV_SCHEMA)
.build();
// Columns obtained from the file
protected List<Column> columns = null;
// User-specified CSV columns; if present, they override the columns obtained from the file
private List<Column> csvSchema = Lists.newArrayList();
protected List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
protected Map<String, String> locationProperties;
@ -130,6 +144,10 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
return locationProperties;
}
public List<Column> getCsvSchema() {
return csvSchema;
}
public String getFsName() {
TFileType fileType = getTFileType();
if (fileType == TFileType.FILE_HDFS) {
@ -187,6 +205,82 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
fuzzyParse = Boolean.valueOf(validParams.get(FUZZY_PARSE)).booleanValue();
trimDoubleQuotes = Boolean.valueOf(validParams.get(TRIM_DOUBLE_QUOTES)).booleanValue();
skipLines = Integer.valueOf(validParams.getOrDefault(SKIP_LINES, "0")).intValue();
if (formatString.equals("csv") || formatString.equals("csv_with_names")
|| formatString.equals("csv_with_names_and_types")) {
parseCsvSchema(csvSchema, validParams);
}
}
// public for unit test
public static void parseCsvSchema(List<Column> csvSchema, Map<String, String> validParams)
throws AnalysisException {
String csvSchemaStr = validParams.get(CSV_SCHEMA);
if (Strings.isNullOrEmpty(csvSchemaStr)) {
return;
}
// the schema str is like: "k1:int;k2:bigint;k3:varchar(20);k4:datetime(6)"
String[] schemaStrs = csvSchemaStr.split(";");
try {
for (String schemaStr : schemaStrs) {
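// Strip spaces so entries like "decimal( 38,10)" still parse, then split into name and type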
String[] kv = schemaStr.replace(" ", "").split(":");
if (kv.length != 2) {
throw new AnalysisException("invalid csv schema: " + csvSchemaStr);
}
Column column = null;
String name = kv[0].toLowerCase();
FeNameFormat.checkColumnName(name);
String type = kv[1].toLowerCase();
if (type.equals("tinyint")) {
column = new Column(name, PrimitiveType.TINYINT, true);
} else if (type.equals("smallint")) {
column = new Column(name, PrimitiveType.SMALLINT, true);
} else if (type.equals("int")) {
column = new Column(name, PrimitiveType.INT, true);
} else if (type.equals("bigint")) {
column = new Column(name, PrimitiveType.BIGINT, true);
} else if (type.equals("largeint")) {
column = new Column(name, PrimitiveType.LARGEINT, true);
} else if (type.equals("float")) {
column = new Column(name, PrimitiveType.FLOAT, true);
} else if (type.equals("double")) {
column = new Column(name, PrimitiveType.DOUBLE, true);
} else if (type.startsWith("decimal")) {
// regex decimal(p, s)
Matcher matcher = DECIMAL_TYPE_PATTERN.matcher(type);
if (!matcher.find()) {
throw new AnalysisException("invalid decimal type: " + type);
}
int precision = Integer.parseInt(matcher.group(1));
int scale = Integer.parseInt(matcher.group(2));
column = new Column(name, ScalarType.createDecimalV3Type(precision, scale), false, null, true, null,
"");
} else if (type.equals("date")) {
column = new Column(name, ScalarType.createDateType(), false, null, true, null, "");
} else if (type.startsWith("datetime")) {
int scale = 0;
if (!type.equals("datetime")) {
// regex datetime(s)
Matcher matcher = DATETIME_TYPE_PATTERN.matcher(type);
if (!matcher.find()) {
throw new AnalysisException("invalid datetime type: " + type);
}
scale = Integer.parseInt(matcher.group(1));
}
column = new Column(name, ScalarType.createDatetimeV2Type(scale), false, null, true, null, "");
} else if (type.equals("string")) {
column = new Column(name, PrimitiveType.STRING, true);
} else if (type.equals("boolean")) {
column = new Column(name, PrimitiveType.BOOLEAN, true);
} else {
throw new AnalysisException("unsupported column type: " + type);
}
csvSchema.add(column);
}
LOG.debug("get csv schema: {}", csvSchema);
} catch (Exception e) {
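// Surface any parsing failure (e.g. NumberFormatException from Integer.parseInt) as an AnalysisException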
throw new AnalysisException("invalid csv schema: " + e.getMessage());
}
}
public List<TBrokerFileStatus> getFileStatuses() {
@ -221,6 +315,9 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
@Override
public List<Column> getTableColumns() throws AnalysisException {
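// A user-specified CSV schema takes precedence over the schema derived from the file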
if (!csvSchema.isEmpty()) {
return csvSchema;
}
if (this.columns != null) {
return columns;
}
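For reference, here is a minimal standalone sketch of the regex handling above. Only the two patterns are taken from the diff; the class and main() are hypothetical, added purely for illustration. It shows why the test below accepts "decimal( 38,10)" and "datetime(5)":

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical standalone sketch; not part of the commit.
public class CsvTypeParseSketch {
    // Same patterns as in the diff above: decimal(p,s) and datetime(p)
    private static final Pattern DECIMAL_TYPE_PATTERN = Pattern.compile("decimal\\((\\d+),(\\d+)\\)");
    private static final Pattern DATETIME_TYPE_PATTERN = Pattern.compile("datetime\\((\\d+)\\)");

    public static void main(String[] args) {
        // Spaces are stripped before matching, so "decimal( 38,10)" parses
        String type = "decimal( 38,10)".replace(" ", "").toLowerCase();
        Matcher m = DECIMAL_TYPE_PATTERN.matcher(type);
        if (m.find()) {
            int precision = Integer.parseInt(m.group(1)); // 38
            int scale = Integer.parseInt(m.group(2));     // 10
            System.out.println("decimal precision=" + precision + ", scale=" + scale);
        }

        // A bare "datetime" keeps the default scale of 0; "datetime(5)" extracts scale 5
        Matcher dm = DATETIME_TYPE_PATTERN.matcher("datetime(5)");
        int dtScale = dm.find() ? Integer.parseInt(dm.group(1)) : 0;
        System.out.println("datetime scale=" + dtScale); // 5
    }
}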

@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.tablefunction;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.Map;
public class ExternalFileTableValuedFunctionTest {
@Test
public void testCsvSchemaParse() {
Config.enable_date_conversion = true;
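// With date conversion enabled, "date" and "datetime" map to the v2 types (DATEV2 / DATETIMEV2)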
Map<String, String> properties = Maps.newHashMap();
properties.put(ExternalFileTableValuedFunction.CSV_SCHEMA,
"k1:int;k2:bigint;k3:float;k4:double;k5:smallint;k6:tinyint;k7:bool;"
+ "k8:char(10);k9:varchar(20);k10:date;k11:datetime;k12:decimal(10,2)");
List<Column> csvSchema = Lists.newArrayList();
try {
ExternalFileTableValuedFunction.parseCsvSchema(csvSchema, properties);
Assert.fail();
} catch (AnalysisException e) {
e.printStackTrace();
Assert.assertTrue(e.getMessage().contains("unsupported column type: bool"));
}
csvSchema.clear();
properties.put(ExternalFileTableValuedFunction.CSV_SCHEMA,
"k1:int;k2:bigint;k3:float;k4:double;k5:smallint;k6:tinyint;k7:boolean;"
+ "k8:string;k9:date;k10:datetime;k11:decimal(10, 2);k12:decimal( 38,10); k13:datetime(5)");
try {
ExternalFileTableValuedFunction.parseCsvSchema(csvSchema, properties);
Assert.assertEquals(13, csvSchema.size());
Column decimalCol = csvSchema.get(10);
Assert.assertEquals(10, decimalCol.getPrecision());
Assert.assertEquals(2, decimalCol.getScale());
decimalCol = csvSchema.get(11);
Assert.assertEquals(38, decimalCol.getPrecision());
Assert.assertEquals(10, decimalCol.getScale());
Column datetimeCol = csvSchema.get(12);
Assert.assertEquals(5, datetimeCol.getScale());
for (int i = 0; i < csvSchema.size(); i++) {
Column col = csvSchema.get(i);
switch (col.getName()) {
case "k1":
Assert.assertEquals(PrimitiveType.INT, col.getType().getPrimitiveType());
break;
case "k2":
Assert.assertEquals(PrimitiveType.BIGINT, col.getType().getPrimitiveType());
break;
case "k3":
Assert.assertEquals(PrimitiveType.FLOAT, col.getType().getPrimitiveType());
break;
case "k4":
Assert.assertEquals(PrimitiveType.DOUBLE, col.getType().getPrimitiveType());
break;
case "k5":
Assert.assertEquals(PrimitiveType.SMALLINT, col.getType().getPrimitiveType());
break;
case "k6":
Assert.assertEquals(PrimitiveType.TINYINT, col.getType().getPrimitiveType());
break;
case "k7":
Assert.assertEquals(PrimitiveType.BOOLEAN, col.getType().getPrimitiveType());
break;
case "k8":
Assert.assertEquals(PrimitiveType.STRING, col.getType().getPrimitiveType());
break;
case "k9":
Assert.assertEquals(PrimitiveType.DATEV2, col.getType().getPrimitiveType());
break;
case "k10":
Assert.assertEquals(PrimitiveType.DATETIMEV2, col.getType().getPrimitiveType());
break;
case "k11":
Assert.assertEquals(PrimitiveType.DECIMAL64, col.getType().getPrimitiveType());
break;
case "k12":
Assert.assertEquals(PrimitiveType.DECIMAL128, col.getType().getPrimitiveType());
break;
case "k13":
Assert.assertEquals(PrimitiveType.DATETIMEV2, col.getType().getPrimitiveType());
break;
default:
Assert.fail("unknown column name: " + col.getName());
}
}
} catch (AnalysisException e) {
e.printStackTrace();
Assert.fail();
}
}
}