[Bug] Fix bug where SELECT INTO OUTFILE in Parquet format may cause NPE (#6054)

1. Check the Parquet schema property on the FE side.
2. Auto-generate the Parquet schema if it is not specified (see the usage sketch below).
Author: Mingyu Chen
Date: 2021-06-23 11:33:47 +08:00
Committed by: GitHub
parent c8899ee5bd
commit 2998373354
2 changed files with 192 additions and 30 deletions
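A quick usage sketch of the two code paths this commit establishes. The table name, output path, and schema string are taken from the test cases in this diff; the comments summarize behavior stated in the commit message and tests, not output verified against a running cluster.

-- Without a "schema" property, analysis used to fail ("schema is required for
-- parquet file"), and per the commit title the Parquet path could hit an NPE.
-- Now the FE generates the schema from the select list; for column k1 this
-- yields "required,byte_array,col0", as asserted in the test below.
SELECT k1 FROM db1.tbl1
INTO OUTFILE "file:///root/doris/"
FORMAT AS PARQUET;

-- An explicit schema is still accepted, but it is now validated against the
-- select list: the column count and each column's Parquet type must match.
SELECT k1 FROM db1.tbl1
INTO OUTFILE "file:///root/doris/"
FORMAT AS PARQUET
PROPERTIES ("schema" = "required,byte_array,col0");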

OutFileClause.java

@@ -26,12 +26,16 @@ import org.apache.doris.common.util.ParseUtil;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TResultFileSinkOptions;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -139,7 +143,11 @@ public class OutFileClause {
return brokerDesc;
}
public void analyze(Analyzer analyzer) throws AnalysisException {
public List<List<String>> getSchema() {
return schema;
}
private void analyze(Analyzer analyzer) throws AnalysisException {
analyzeFilePath();
if (Strings.isNullOrEmpty(filePath)) {
@@ -167,14 +175,114 @@ public class OutFileClause {
public void analyze(Analyzer analyzer, SelectStmt stmt) throws AnalysisException {
analyze(analyzer);
List<SelectListItem> items = stmt.getSelectList().getItems();
for (SelectListItem item:items) {
if (item.getExpr().getType() == Type.LARGEINT && isParquetFormat()) {
throw new AnalysisException("currently parquet do not support largeint type");
if (isParquetFormat()) {
analyzeForParquetFormat(stmt.getResultExprs());
}
}
private void analyzeForParquetFormat(List<Expr> resultExprs) throws AnalysisException {
if (this.schema.isEmpty()) {
genParquetSchema(resultExprs);
}
// check schema number
if (resultExprs.size() != this.schema.size()) {
throw new AnalysisException("Parquet schema number does not equal to select item number");
}
// check type
for (int i = 0; i < this.schema.size(); ++i) {
String type = this.schema.get(i).get(1);
Type resultType = resultExprs.get(i).getType();
switch (resultType.getPrimitiveType()) {
case BOOLEAN:
if (!type.equals("boolean")) {
throw new AnalysisException("project field type is BOOLEAN, should use boolean, but the type of column "
+ i + " is " + type);
}
break;
case TINYINT:
case SMALLINT:
case INT:
if (!type.equals("int32")) {
throw new AnalysisException("project field type is TINYINT/SMALLINT/INT, should use int32, "
+ "but the definition type of column " + i + " is " + type);
}
break;
case BIGINT:
case DATE:
case DATETIME:
if (!type.equals("int64")) {
throw new AnalysisException("project field type is BIGINT/DATE/DATETIME, should use int64, " +
"but the definition type of column " + i + " is " + type);
}
break;
case FLOAT:
if (!type.equals("float")) {
throw new AnalysisException("project field type is FLOAT, should use float, but the definition type of column "
+ i + " is " + type);
}
break;
case DOUBLE:
if (!type.equals("double")) {
throw new AnalysisException("project field type is DOUBLE, should use double, but the definition type of column "
+ i + " is " + type);
}
break;
case CHAR:
case VARCHAR:
case DECIMALV2:
if (!type.equals("byte_array")) {
throw new AnalysisException("project field type is CHAR/VARCHAR/DECIMAL, should use byte_array, " +
"but the definition type of column " + i + " is " + type);
}
break;
default:
throw new AnalysisException("Parquet format does not support column type: " + resultType.getPrimitiveType());
}
}
}
private void genParquetSchema(List<Expr> resultExprs) throws AnalysisException {
Preconditions.checkState(this.schema.isEmpty());
for (int i = 0; i < resultExprs.size(); ++i) {
Expr expr = resultExprs.get(i);
List<String> column = new ArrayList<>();
column.add("required");
switch (expr.getType().getPrimitiveType()) {
case BOOLEAN:
column.add("boolean");
break;
case TINYINT:
case SMALLINT:
case INT:
column.add("int32");
break;
case BIGINT:
case DATE:
case DATETIME:
column.add("int64");
break;
case FLOAT:
column.add("float");
break;
case DOUBLE:
column.add("double");
break;
case CHAR:
case VARCHAR:
case DECIMALV2:
column.add("byte_array");
break;
default:
throw new AnalysisException("currently parquet do not support column type: " + expr.getType().getPrimitiveType());
}
column.add("col" + i);
this.schema.add(column);
}
}
private void analyzeFilePath() throws AnalysisException {
if (Strings.isNullOrEmpty(filePath)) {
throw new AnalysisException("Must specify file in OUTFILE clause");
@@ -238,7 +346,6 @@ public class OutFileClause {
throw new AnalysisException("Unknown properties: " + properties.keySet().stream()
.filter(k -> !processedPropKeys.contains(k)).collect(Collectors.toList()));
}
}
private void getBrokerProperties(Set<String> processedPropKeys) {
@@ -273,14 +380,28 @@
* currently only supports: compression, disable_dictionary, version
*/
private void getParquetProperties(Set<String> processedPropKeys) throws AnalysisException {
String schema = properties.get(SCHEMA);
if (schema == null || schema.length() <= 0) {
throw new AnalysisException("schema is required for parquet file");
// save all properties with the parquet prefix
Iterator<Map.Entry<String, String>> iter = properties.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, String> entry = iter.next();
if (entry.getKey().startsWith(PARQUET_PROP_PREFIX)) {
processedPropKeys.add(entry.getKey());
fileProperties.put(entry.getKey().substring(PARQUET_PROP_PREFIX.length()), entry.getValue());
}
}
schema = schema.replace(" ","");
// check schema. if schema is not set, Doris will generate the schema from the select items
String schema = properties.get(SCHEMA);
if (schema == null) {
return;
}
if (schema.isEmpty()) {
throw new AnalysisException("Parquet schema property should not be empty");
}
schema = schema.replace(" ", "");
schema = schema.toLowerCase();
String[] schemas = schema.split(";");
for (String item:schemas) {
for (String item : schemas) {
String[] properties = item.split(",");
if (properties.length != 3) {
throw new AnalysisException("must only contains repetition type/column type/column name");
@@ -299,14 +420,6 @@
this.schema.add(column);
}
processedPropKeys.add(SCHEMA);
Iterator<Map.Entry<String, String>> iter = properties.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, String> entry = iter.next();
if (entry.getKey().startsWith(PARQUET_PROP_PREFIX)) {
processedPropKeys.add(entry.getKey());
fileProperties.put(entry.getKey().substring(PARQUET_PROP_PREFIX.length()), entry.getValue());
}
}
}
private boolean isCsvFormat() {

SelectStmtTest.java

@@ -17,8 +17,6 @@
package org.apache.doris.analysis;
import mockit.Mock;
import mockit.MockUp;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.Util;
@@ -27,8 +25,10 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.utframe.DorisAssert;
import org.apache.doris.utframe.UtFrameUtils;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -40,6 +40,9 @@ import java.util.List;
import java.util.Set;
import java.util.UUID;
import mockit.Mock;
import mockit.MockUp;
public class SelectStmtTest {
private static String runningDir = "fe/mocked/DemoTest/" + UUID.randomUUID().toString() + "/";
private static DorisAssert dorisAssert;
@@ -584,15 +587,27 @@ public class SelectStmtTest {
public void testOutfile() throws Exception {
ConnectContext ctx = UtFrameUtils.createDefaultCtx();
Config.enable_outfile_to_local = true;
String sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"required,int32,siteid;required,byte_array,username;\");";
String sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"required,byte_array,col0\");";
dorisAssert.query(sql).explainQuery();
// must contains schema
// if schema is not set, gen schema
sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET;";
try {
SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
Assert.assertEquals(1, stmt.getOutFileClause().getSchema().size());
Assert.assertEquals(Lists.newArrayList("required", "byte_array", "col0"),
stmt.getOutFileClause().getSchema().get(0));
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("schema is required for parquet file"));
Assert.fail(e.getMessage());
}
// schema cannot be empty
sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"\");";
try {
SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Parquet schema property should not be empty"));
}
// schema must contain 3 fields
sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"int32,siteid;\");";
try {
@@ -600,6 +615,7 @@
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("must only contains repetition type/column type/column name"));
}
// unknown repetition type
sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"repeat, int32,siteid;\");";
try {
@@ -607,6 +623,7 @@
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("unknown repetition type"));
}
// only support required type
sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"repeated,int32,siteid;\");";
try {
@@ -614,6 +631,7 @@
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("currently only support required type"));
}
// unknown data type
sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"required,int128,siteid;\");";
try {
@@ -621,26 +639,57 @@
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("data type is not supported"));
}
// contains parquet properties
sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"required,int32,siteid;\", 'parquet.compression'='snappy');";
sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"required,byte_array,siteid;\", 'parquet.compression'='snappy');";
dorisAssert.query(sql).explainQuery();
// support parquet for broker
sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" FORMAT AS PARQUET PROPERTIES ( \"broker.name\" = \"hdfs_broker\", \"broker.hadoop.security.authentication\" = \"kerberos\", \"broker.kerberos_principal\" = \"test\", \"broker.kerberos_keytab_content\" = \"test\" , \"schema\"=\"required,int32,siteid;\");";
sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" FORMAT AS PARQUET " +
"PROPERTIES ( \"broker.name\" = \"hdfs_broker\", " +
"\"broker.hadoop.security.authentication\" = \"kerberos\", " +
"\"broker.kerberos_principal\" = \"test\", " +
"\"broker.kerberos_keytab_content\" = \"test\" , " +
"\"schema\"=\"required,byte_array,siteid;\");";
dorisAssert.query(sql).explainQuery();
// do not support large int type
try {
sql = "SELECT k5 FROM db1.tbl1 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" FORMAT AS PARQUET PROPERTIES ( \"broker.name\" = \"hdfs_broker\", \"broker.hadoop.security.authentication\" = \"kerberos\", \"broker.kerberos_principal\" = \"test\", \"broker.kerberos_keytab_content\" = \"test\" , \"schema\"=\"required,int32,siteid;\");";
sql = "SELECT k5 FROM db1.tbl1 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" FORMAT AS PARQUET " +
"PROPERTIES ( \"broker.name\" = \"hdfs_broker\", " +
"\"broker.hadoop.security.authentication\" = \"kerberos\", " +
"\"broker.kerberos_principal\" = \"test\", " +
"\"broker.kerberos_keytab_content\" = \"test\" ," +
" \"schema\"=\"required,int32,siteid;\");";
SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("currently parquet do not support largeint type"));
e.printStackTrace();
Assert.assertTrue(e.getMessage().contains("Parquet format does not support column type: LARGEINT"));
}
// do not support large int type, contains function
try {
sql = "SELECT sum(k5) FROM db1.tbl1 group by k5 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" FORMAT AS PARQUET PROPERTIES ( \"broker.name\" = \"hdfs_broker\", \"broker.hadoop.security.authentication\" = \"kerberos\", \"broker.kerberos_principal\" = \"test\", \"broker.kerberos_keytab_content\" = \"test\" , \"schema\"=\"required,int32,siteid;\");";
sql = "SELECT sum(k5) FROM db1.tbl1 group by k5 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" " +
"FORMAT AS PARQUET PROPERTIES ( \"broker.name\" = \"hdfs_broker\", " +
"\"broker.hadoop.security.authentication\" = \"kerberos\", " +
"\"broker.kerberos_principal\" = \"test\", " +
"\"broker.kerberos_keytab_content\" = \"test\" , " +
"\"schema\"=\"required,int32,siteid;\");";
SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("currently parquet do not support largeint type"));
Assert.assertTrue(e.getMessage().contains("Parquet format does not support column type: LARGEINT"));
}
// support cast
try {
sql = "SELECT cast(sum(k5) as bigint) FROM db1.tbl1 group by k5 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" " +
"FORMAT AS PARQUET PROPERTIES ( \"broker.name\" = \"hdfs_broker\", " +
"\"broker.hadoop.security.authentication\" = \"kerberos\", " +
"\"broker.kerberos_principal\" = \"test\", " +
"\"broker.kerberos_keytab_content\" = \"test\" , " +
"\"schema\"=\"required,int64,siteid;\");";
SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
}
}