diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 0d571d46b2..2a5acd6f55 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -26,12 +26,16 @@ import org.apache.doris.common.util.ParseUtil;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TResultFileSinkOptions;
+
+import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -139,7 +143,11 @@ public class OutFileClause {
         return brokerDesc;
     }
 
-    public void analyze(Analyzer analyzer) throws AnalysisException {
+    public List<List<String>> getSchema() {
+        return schema;
+    }
+
+    private void analyze(Analyzer analyzer) throws AnalysisException {
         analyzeFilePath();
 
         if (Strings.isNullOrEmpty(filePath)) {
@@ -167,14 +175,114 @@ public class OutFileClause {
 
     public void analyze(Analyzer analyzer, SelectStmt stmt) throws AnalysisException {
         analyze(analyzer);
-        List<SelectListItem> items = stmt.getSelectList().getItems();
-        for (SelectListItem item : items) {
-            if (item.getExpr().getType() == Type.LARGEINT && isParquetFormat()) {
-                throw new AnalysisException("currently parquet do not support largeint type");
+
+        if (isParquetFormat()) {
+            analyzeForParquetFormat(stmt.getResultExprs());
+        }
+    }
+
+    private void analyzeForParquetFormat(List<Expr> resultExprs) throws AnalysisException {
+        if (this.schema.isEmpty()) {
+            genParquetSchema(resultExprs);
+        }
+
+        // check schema number
+        if (resultExprs.size() != this.schema.size()) {
+            throw new AnalysisException("Parquet schema number does not equal select item number");
+        }
+
+        // check type
+        for (int i = 0; i < this.schema.size(); ++i) {
+            String type = this.schema.get(i).get(1);
+            Type resultType = resultExprs.get(i).getType();
+            switch (resultType.getPrimitiveType()) {
+                case BOOLEAN:
+                    if (!type.equals("boolean")) {
+                        throw new AnalysisException("project field type is BOOLEAN, should use boolean, but the type of column "
+                                + i + " is " + type);
+                    }
+                    break;
+                case TINYINT:
+                case SMALLINT:
+                case INT:
+                    if (!type.equals("int32")) {
+                        throw new AnalysisException("project field type is TINYINT/SMALLINT/INT, should use int32, "
+                                + "but the definition type of column " + i + " is " + type);
+                    }
+                    break;
+                case BIGINT:
+                case DATE:
+                case DATETIME:
+                    if (!type.equals("int64")) {
+                        throw new AnalysisException("project field type is BIGINT/DATE/DATETIME, should use int64, "
+                                + "but the definition type of column " + i + " is " + type);
+                    }
+                    break;
+                case FLOAT:
+                    if (!type.equals("float")) {
+                        throw new AnalysisException("project field type is FLOAT, should use float, but the definition type of column "
+                                + i + " is " + type);
+                    }
+                    break;
+                case DOUBLE:
+                    if (!type.equals("double")) {
+                        throw new AnalysisException("project field type is DOUBLE, should use double, but the definition type of column "
+                                + i + " is " + type);
+                    }
+                    break;
+                case CHAR:
+                case VARCHAR:
+                case DECIMALV2:
+                    if (!type.equals("byte_array")) {
+                        throw new AnalysisException("project field type is CHAR/VARCHAR/DECIMAL, should use byte_array, "
+                                + "but the definition type of column " + i + " is " + type);
+                    }
+                    break;
+                default:
+                    throw new AnalysisException("Parquet format does not support column type: " + resultType.getPrimitiveType());
             }
         }
     }
 
+    private void genParquetSchema(List<Expr> resultExprs) throws AnalysisException {
+        Preconditions.checkState(this.schema.isEmpty());
+        for (int i = 0; i < resultExprs.size(); ++i) {
+            Expr expr = resultExprs.get(i);
+            List<String> column = new ArrayList<>();
+            column.add("required");
+            switch (expr.getType().getPrimitiveType()) {
+                case BOOLEAN:
+                    column.add("boolean");
+                    break;
+                case TINYINT:
+                case SMALLINT:
+                case INT:
+                    column.add("int32");
+                    break;
+                case BIGINT:
+                case DATE:
+                case DATETIME:
+                    column.add("int64");
+                    break;
+                case FLOAT:
+                    column.add("float");
+                    break;
+                case DOUBLE:
+                    column.add("double");
+                    break;
+                case CHAR:
+                case VARCHAR:
+                case DECIMALV2:
+                    column.add("byte_array");
+                    break;
+                default:
+                    throw new AnalysisException("currently parquet does not support column type: " + expr.getType().getPrimitiveType());
+            }
+            column.add("col" + i);
+            this.schema.add(column);
+        }
+    }
+
     private void analyzeFilePath() throws AnalysisException {
         if (Strings.isNullOrEmpty(filePath)) {
             throw new AnalysisException("Must specify file in OUTFILE clause");
@@ -238,7 +346,6 @@ public class OutFileClause {
             throw new AnalysisException("Unknown properties: " + properties.keySet().stream()
                     .filter(k -> !processedPropKeys.contains(k)).collect(Collectors.toList()));
         }
-
     }
 
     private void getBrokerProperties(Set<String> processedPropKeys) {
@@ -273,14 +380,28 @@
      * currently only supports: compression, disable_dictionary, version
      */
    private void getParquetProperties(Set<String> processedPropKeys) throws AnalysisException {
-        String schema = properties.get(SCHEMA);
-        if (schema == null || schema.length() <= 0) {
-            throw new AnalysisException("schema is required for parquet file");
+        // save all parquet prefix properties
+        Iterator<Map.Entry<String, String>> iter = properties.entrySet().iterator();
+        while (iter.hasNext()) {
+            Map.Entry<String, String> entry = iter.next();
+            if (entry.getKey().startsWith(PARQUET_PROP_PREFIX)) {
+                processedPropKeys.add(entry.getKey());
+                fileProperties.put(entry.getKey().substring(PARQUET_PROP_PREFIX.length()), entry.getValue());
+            }
         }
-        schema = schema.replace(" ","");
+
+        // check schema. if schema is not set, Doris will generate the schema from the select items
+        String schema = properties.get(SCHEMA);
+        if (schema == null) {
+            return;
+        }
+        if (schema.isEmpty()) {
+            throw new AnalysisException("Parquet schema property should not be empty");
+        }
+        schema = schema.replace(" ", "");
         schema = schema.toLowerCase();
         String[] schemas = schema.split(";");
-        for (String item:schemas) {
+        for (String item : schemas) {
             String[] properties = item.split(",");
             if (properties.length != 3) {
                 throw new AnalysisException("must only contains repetition type/column type/column name");
@@ -299,14 +420,6 @@
             this.schema.add(column);
         }
         processedPropKeys.add(SCHEMA);
-        Iterator<Map.Entry<String, String>> iter = properties.entrySet().iterator();
-        while (iter.hasNext()) {
-            Map.Entry<String, String> entry = iter.next();
-            if (entry.getKey().startsWith(PARQUET_PROP_PREFIX)) {
-                processedPropKeys.add(entry.getKey());
-                fileProperties.put(entry.getKey().substring(PARQUET_PROP_PREFIX.length()), entry.getValue());
-            }
-        }
     }
 
     private boolean isCsvFormat() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
index 728e994468..20bbb1acc3 100755
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.doris.analysis;
 
-import mockit.Mock;
-import mockit.MockUp;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.util.Util;
@@ -27,8 +25,10 @@ import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.VariableMgr;
 import org.apache.doris.utframe.DorisAssert;
 import org.apache.doris.utframe.UtFrameUtils;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -40,6 +40,9 @@ import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 
+import mockit.Mock;
+import mockit.MockUp;
+
 public class SelectStmtTest {
     private static String runningDir = "fe/mocked/DemoTest/" + UUID.randomUUID().toString() + "/";
     private static DorisAssert dorisAssert;
@@ -584,15 +587,27 @@
     public void testOutfile() throws Exception {
         ConnectContext ctx = UtFrameUtils.createDefaultCtx();
         Config.enable_outfile_to_local = true;
-        String sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"required,int32,siteid;required,byte_array,username;\");";
+        String sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"required,byte_array,col0\");";
         dorisAssert.query(sql).explainQuery();
-        // must contains schema
+        // if schema is not set, gen schema from select items
         sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET;";
         try {
             SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
+            Assert.assertEquals(1, stmt.getOutFileClause().getSchema().size());
+            Assert.assertEquals(Lists.newArrayList("required", "byte_array", "col0"),
+                    stmt.getOutFileClause().getSchema().get(0));
         } catch (Exception e) {
-            Assert.assertTrue(e.getMessage().contains("schema is required for parquet file"));
+            Assert.fail(e.getMessage());
         }
+
+        // schema cannot be empty
+        sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"\");";
+        try {
+            SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
+        } catch (Exception e) {
+            Assert.assertTrue(e.getMessage().contains("Parquet schema property should not be empty"));
+        }
+
         // schema must contains 3 fields
         sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"int32,siteid;\");";
         try {
@@ -600,6 +615,7 @@
         } catch (Exception e) {
             Assert.assertTrue(e.getMessage().contains("must only contains repetition type/column type/column name"));
         }
+
         // unknown repetition type
         sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"repeat, int32,siteid;\");";
         try {
@@ -607,6 +623,7 @@
         } catch (Exception e) {
             Assert.assertTrue(e.getMessage().contains("unknown repetition type"));
         }
+
         // only support required type
         sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"repeated,int32,siteid;\");";
         try {
@@ -614,6 +631,7 @@
         } catch (Exception e) {
             Assert.assertTrue(e.getMessage().contains("currently only support required type"));
         }
+
         // unknown data type
         sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"required,int128,siteid;\");";
         try {
@@ -621,26 +639,57 @@
         } catch (Exception e) {
             Assert.assertTrue(e.getMessage().contains("data type is not supported"));
         }
+
         // contains parquet properties
-        sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"required,int32,siteid;\", 'parquet.compression'='snappy');";
+        sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"required,byte_array,siteid;\", 'parquet.compression'='snappy');";
         dorisAssert.query(sql).explainQuery();
         // support parquet for broker
-        sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" FORMAT AS PARQUET PROPERTIES ( \"broker.name\" = \"hdfs_broker\", \"broker.hadoop.security.authentication\" = \"kerberos\", \"broker.kerberos_principal\" = \"test\", \"broker.kerberos_keytab_content\" = \"test\" , \"schema\"=\"required,int32,siteid;\");";
+        sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" FORMAT AS PARQUET "
+                + "PROPERTIES ( \"broker.name\" = \"hdfs_broker\", "
+                + "\"broker.hadoop.security.authentication\" = \"kerberos\", "
+                + "\"broker.kerberos_principal\" = \"test\", "
+                + "\"broker.kerberos_keytab_content\" = \"test\" , "
+                + "\"schema\"=\"required,byte_array,siteid;\");";
+
         dorisAssert.query(sql).explainQuery();
 
         // do not support large int type
         try {
-            sql = "SELECT k5 FROM db1.tbl1 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" FORMAT AS PARQUET PROPERTIES ( \"broker.name\" = \"hdfs_broker\", \"broker.hadoop.security.authentication\" = \"kerberos\", \"broker.kerberos_principal\" = \"test\", \"broker.kerberos_keytab_content\" = \"test\" , \"schema\"=\"required,int32,siteid;\");";
+            sql = "SELECT k5 FROM db1.tbl1 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" FORMAT AS PARQUET "
+                    + "PROPERTIES ( \"broker.name\" = \"hdfs_broker\", "
+                    + "\"broker.hadoop.security.authentication\" = \"kerberos\", "
+                    + "\"broker.kerberos_principal\" = \"test\", "
+                    + "\"broker.kerberos_keytab_content\" = \"test\" ,"
+                    + " \"schema\"=\"required,int32,siteid;\");";
             SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
         } catch (Exception e) {
-            Assert.assertTrue(e.getMessage().contains("currently parquet do not support largeint type"));
+            e.printStackTrace();
+            Assert.assertTrue(e.getMessage().contains("Parquet format does not support column type: LARGEINT"));
         }
 
         // do not support large int type, contains function
         try {
-            sql = "SELECT sum(k5) FROM db1.tbl1 group by k5 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" FORMAT AS PARQUET PROPERTIES ( \"broker.name\" = \"hdfs_broker\", \"broker.hadoop.security.authentication\" = \"kerberos\", \"broker.kerberos_principal\" = \"test\", \"broker.kerberos_keytab_content\" = \"test\" , \"schema\"=\"required,int32,siteid;\");";
+            sql = "SELECT sum(k5) FROM db1.tbl1 group by k5 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" "
+                    + "FORMAT AS PARQUET PROPERTIES ( \"broker.name\" = \"hdfs_broker\", "
+                    + "\"broker.hadoop.security.authentication\" = \"kerberos\", "
+                    + "\"broker.kerberos_principal\" = \"test\", "
+                    + "\"broker.kerberos_keytab_content\" = \"test\" , "
+                    + "\"schema\"=\"required,int32,siteid;\");";
             SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
         } catch (Exception e) {
-            Assert.assertTrue(e.getMessage().contains("currently parquet do not support largeint type"));
+            Assert.assertTrue(e.getMessage().contains("Parquet format does not support column type: LARGEINT"));
+        }
+
+        // support cast
+        try {
+            sql = "SELECT cast(sum(k5) as bigint) FROM db1.tbl1 group by k5 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" "
+                    + "FORMAT AS PARQUET PROPERTIES ( \"broker.name\" = \"hdfs_broker\", "
+                    + "\"broker.hadoop.security.authentication\" = \"kerberos\", "
+                    + "\"broker.kerberos_principal\" = \"test\", "
+                    + "\"broker.kerberos_keytab_content\" = \"test\" , "
+                    + "\"schema\"=\"required,int64,siteid;\");";
+            SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
+        } catch (Exception e) {
+            Assert.fail(e.getMessage());
         }
     }
 }
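
Usage sketch, mirroring the test queries above (db1.tbl1, k1, the file:/// path, and the property values come from SelectStmtTest; only the line breaks are illustrative): with this patch the "schema" property becomes optional for PARQUET output. When it is omitted, genParquetSchema() derives one column per select item (the test asserts that SELECT k1 yields "required", "byte_array", "col0"); when it is given, analyzeForParquetFormat() checks each declared type against the mapping boolean, int32 (TINYINT/SMALLINT/INT), int64 (BIGINT/DATE/DATETIME), float, double, byte_array (CHAR/VARCHAR/DECIMAL).

    -- schema omitted: generated from the select items as required,byte_array,col0
    SELECT k1 FROM db1.tbl1
    INTO OUTFILE "file:///root/doris/"
    FORMAT AS PARQUET;

    -- schema given explicitly, plus a parquet.-prefixed writer property
    SELECT k1 FROM db1.tbl1
    INTO OUTFILE "file:///root/doris/"
    FORMAT AS PARQUET
    PROPERTIES ("schema" = "required,byte_array,col0", "parquet.compression" = "snappy");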