diff --git a/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md b/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md
index 82873a5815..945c726890 100644
--- a/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md
+++ b/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md
@@ -35,7 +35,7 @@ Syntax:
     CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [database.]table_name
     (column_definition1[, column_definition2, ...]
     [, index_definition1[, ndex_definition12,]])
-    [ENGINE = [olap|mysql|broker]]
+    [ENGINE = [olap|mysql|broker|hive]]
     [key_desc]
     [COMMENT "table comment"]
     [partition_desc]
@@ -104,7 +104,7 @@ Syntax:
         Notice: Only support BITMAP index in current version, BITMAP can only apply to single column
 
 3. ENGINE type
-    Default is olap. Options are: olap, mysql, broker
+    Default is olap. Options are: olap, mysql, broker, hive
     1) For mysql, properties should include:
     ```
     PROPERTIES (
@@ -123,7 +123,7 @@ Syntax:
     table_name in CREATE TABLE stmt is table is Doris. They can be different or same.
     MySQL table created in Doris is for accessing data in MySQL database. Doris does not maintain and store any data from MySQL table.
 
-    1) For broker, properties should include:
+    2) For broker, properties should include:
     ```
     PROPERTIES (
@@ -145,6 +145,16 @@ Syntax:
     Notice: Files name in "path" is separated by ",". If file name includes ",", use "%2c" instead. If file name includes "%", use "%25" instead.
     Support CSV and Parquet. Support GZ, BZ2, LZ4, LZO(LZOP)
+    3) For hive, properties should include:
+    ```
+    PROPERTIES (
+    "database" = "hive_db_name",
+    "table" = "hive_table_name",
+    "hive.metastore.uris" = "thrift://127.0.0.1:9083"
+    )
+    ```
+    "database" is the name of the database corresponding to the hive table, "table" is the name of the hive table, and "hive.metastore.uris" is the hive metastore service address.
+    Notice: At present, hive external tables are only used for Spark Load; querying them is not supported.
 
 4. key_desc
     Syntax:
         key_type(k1[,k2 ...])
@@ -577,6 +587,23 @@ Syntax:
     PROPERTIES ("in_memory"="true");
 ```
+13. Create a hive external table
+```
+    CREATE TABLE example_db.table_hive
+    (
+    k1 TINYINT,
+    k2 VARCHAR(50),
+    v INT
+    )
+    ENGINE=hive
+    PROPERTIES
+    (
+    "database" = "hive_db_name",
+    "table" = "hive_table_name",
+    "hive.metastore.uris" = "thrift://127.0.0.1:9083"
+    );
+```
+
 ## keyword
 
 CREATE,TABLE
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md b/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md
index 3ded3b0025..1513fb7d5c 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md
@@ -35,7 +35,7 @@ under the License.
     CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [database.]table_name
     (column_definition1[, column_definition2, ...]
     [, index_definition1[, ndex_definition12,]])
-    [ENGINE = [olap|mysql|broker]]
+    [ENGINE = [olap|mysql|broker|hive]]
     [key_desc]
     [COMMENT "table comment"];
     [partition_desc]
@@ -111,7 +111,7 @@ under the License.
        当前仅支持BITMAP索引, BITMAP索引仅支持应用于单列
 
 3. ENGINE 类型
-    默认为 olap。可选 mysql, broker
+    默认为 olap。可选 mysql, broker, hive
     1) 如果是 mysql,则需要在 properties 提供以下信息:
     ```
@@ -127,11 +127,11 @@ under the License.
注意: "table" 条目中的 "table_name" 是 mysql 中的真实表名。 - 而 CREATE TABLE 语句中的 table_name 是该 mysql 表在 Palo 中的名字,可以不同。 + 而 CREATE TABLE 语句中的 table_name 是该 mysql 表在 Doris 中的名字,可以不同。 - 在 Palo 创建 mysql 表的目的是可以通过 Palo 访问 mysql 数据库。 - 而 Palo 本身并不维护、存储任何 mysql 数据。 - 1) 如果是 broker,表示表的访问需要通过指定的broker, 需要在 properties 提供以下信息: + 在 Doris 创建 mysql 表的目的是可以通过 Doris 访问 mysql 数据库。 + 而 Doris 本身并不维护、存储任何 mysql 数据。 + 2) 如果是 broker,表示表的访问需要通过指定的broker, 需要在 properties 提供以下信息: ``` PROPERTIES ( "broker_name" = "broker_name", @@ -152,6 +152,18 @@ under the License. "path" 中如果有多个文件,用逗号[,]分割。如果文件名中包含逗号,那么使用 %2c 来替代。如果文件名中包含 %,使用 %25 代替 现在文件内容格式支持CSV,支持GZ,BZ2,LZ4,LZO(LZOP) 压缩格式。 + 3) 如果是 hive,则需要在 properties 提供以下信息: + ``` + PROPERTIES ( + "database" = "hive_db_name", + "table" = "hive_table_name", + "hive.metastore.uris" = "thrift://127.0.0.1:9083" + ) + + ``` + 其中 database 是 hive 表对应的库名字,table 是 hive 表的名字,hive.metastore.uris 是 hive metastore 服务地址。 + 注意:目前hive外部表仅用于Spark Load使用,不支持查询。 + 1. key_desc 语法: `key_type(k1[,k2 ...])` @@ -162,7 +174,7 @@ under the License. 适合报表、多维分析等业务场景。 UNIQUE KEY:key列相同的记录,value列按导入顺序进行覆盖, 适合按key列进行增删改查的点查询业务。 - DUPLICATE KEY:key列相同的记录,同时存在于Palo中, + DUPLICATE KEY:key列相同的记录,同时存在于Doris中, 适合存储明细数据或者数据无聚合特性的业务场景。 默认为DUPLICATE KEY,key列为列定义中前36个字节, 如果前36个字节的列数小于3,将使用前三列。 注意: @@ -607,6 +619,24 @@ under the License. PROPERTIES ("in_memory"="true"); ``` +13. 创建一个hive外部表 + +``` + CREATE TABLE example_db.table_hive + ( + k1 TINYINT, + k2 VARCHAR(50), + v INT + ) + ENGINE=hive + PROPERTIES + ( + "database" = "hive_db_name", + "table" = "hive_table_name", + "hive.metastore.uris" = "thrift://127.0.0.1:9083" + ); +``` + ## keyword CREATE,TABLE diff --git a/fe/src/main/cup/sql_parser.cup b/fe/src/main/cup/sql_parser.cup index dce9763ed2..5d75cf3ca8 100644 --- a/fe/src/main/cup/sql_parser.cup +++ b/fe/src/main/cup/sql_parser.cup @@ -1244,6 +1244,15 @@ data_desc ::= RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, fileFormat, columnsFromPath, isNeg, colMappingList, whereExpr); :} + | KW_DATA KW_FROM KW_TABLE ident:srcTableName + opt_negative:isNeg + KW_INTO KW_TABLE ident:tableName + opt_partition_names:partitionNames + opt_col_mapping_list:colMappingList + where_clause:whereExpr + {: + RESULT = new DataDescription(tableName, partitionNames, srcTableName, isNeg, colMappingList, whereExpr); + :} ; opt_negative ::= diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java index 27d10e121c..9e7cd580ea 100644 --- a/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java @@ -87,11 +87,15 @@ public class CreateTableStmt extends DdlStmt { engineNames.add("mysql"); engineNames.add("broker"); engineNames.add("elasticsearch"); + engineNames.add("hive"); } // for backup. 
set to -1 for normal use
     private int tableSignature;
 
+    // TODO(wyb): spark-load
+    private static boolean disableHiveTable = true;
+
     public CreateTableStmt() {
         // for persist
         tableName = new TableName();
@@ -253,8 +257,12 @@ public class CreateTableStmt extends DdlStmt {
 
         analyzeEngineName();
 
+        // TODO(wyb): spark-load
+        if (engineName.equals("hive") && disableHiveTable) {
+            throw new AnalysisException("Spark Load from hive table is coming soon");
+        }
         // analyze key desc
-        if (!(engineName.equals("mysql") || engineName.equals("broker"))) {
+        if (!(engineName.equals("mysql") || engineName.equals("broker") || engineName.equals("hive"))) {
             // olap table
             if (keysDesc == null) {
                 List<String> keysColumnNames = Lists.newArrayList();
@@ -299,7 +307,7 @@ public class CreateTableStmt extends DdlStmt {
                 }
             }
         } else {
-            // mysql and broker do not need key desc
+            // mysql, broker and hive do not need key desc
             if (keysDesc != null) {
                 throw new AnalysisException("Create " + engineName + " table should not contain keys desc");
             }
@@ -433,7 +441,7 @@ public class CreateTableStmt extends DdlStmt {
         }
 
         if (engineName.equals("mysql") || engineName.equals("broker")
-                || engineName.equals("elasticsearch")) {
+                || engineName.equals("elasticsearch") || engineName.equals("hive")) {
             if (!isExternal) {
                 // this is for compatibility
                 isExternal = true;
diff --git a/fe/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/src/main/java/org/apache/doris/analysis/DataDescription.java
index 57cd8db5d4..7a1f339aae 100644
--- a/fe/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -58,6 +58,14 @@ import java.util.TreeSet;
 //          [(tmp_col1, tmp_col2, col3, ...)]
 //          [COLUMNS FROM PATH AS (col1, ...)]
 //          [SET (k1=f1(xx), k2=f2(xxx))]
+//          [where_clause]
+//
+//      DATA FROM TABLE external_hive_tbl_name
+//          [NEGATIVE]
+//          INTO TABLE tbl_name
+//          [PARTITION (p1, p2)]
+//          [SET (k1=f1(xx), k2=f2(xxx))]
+//          [where_clause]
 
 /**
  * The transform of columns should be added after the keyword named COLUMNS.
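The comment block above documents the new `DATA FROM TABLE` variant alongside the existing `DATA INFILE` form. For illustration, a complete load statement using this variant might look like the following sketch; the label, resource name, and column names are hypothetical, the `WITH RESOURCE` clause is the existing Spark Load syntax that LoadStmt requires for this form ("Load from table should use Spark Load"), and `bitmap_dict` is the mapping function exercised in DataDescriptionTest further down:

```
LOAD LABEL example_db.label1
(
    DATA FROM TABLE hive_t1          -- hive external table created with ENGINE=hive
    INTO TABLE tbl1                  -- target olap table
    SET (uuid = bitmap_dict(uuid))   -- optional column mapping
)
WITH RESOURCE 'spark0'
(
    "spark.executor.memory" = "2g"
)
PROPERTIES ("timeout" = "3600");
```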
@@ -94,6 +102,8 @@ public class DataDescription { private final List columnMappingList; private final Expr whereExpr; + private final String srcTableName; + // Used for mini load private TNetworkAddress beAddr; private String lineDelimiter; @@ -141,6 +151,27 @@ public class DataDescription { this.isNegative = isNegative; this.columnMappingList = columnMappingList; this.whereExpr = whereExpr; + this.srcTableName = null; + } + + // data from table external_hive_table + public DataDescription(String tableName, + PartitionNames partitionNames, + String srcTableName, + boolean isNegative, + List columnMappingList, + Expr whereExpr) { + this.tableName = tableName; + this.partitionNames = partitionNames; + this.filePaths = null; + this.fileFieldNames = null; + this.columnSeparator = null; + this.fileFormat = null; + this.columnsFromPath = null; + this.isNegative = isNegative; + this.columnMappingList = columnMappingList; + this.whereExpr = whereExpr; + this.srcTableName = srcTableName; } public String getTableName() { @@ -225,6 +256,14 @@ public class DataDescription { return isHadoopLoad; } + public String getSrcTableName() { + return srcTableName; + } + + public boolean isLoadFromTable() { + return !Strings.isNullOrEmpty(srcTableName); + } + /* * Analyze parsedExprMap and columnToHadoopFunction from columns, columns from path and columnMappingList * Example: @@ -530,6 +569,16 @@ public class DataDescription { ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), tableName); } + + // check hive table auth + if (isLoadFromTable()) { + if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), fullDbName, srcTableName, + PrivPredicate.SELECT)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "SELECT", + ConnectContext.get().getQualifiedUser(), + ConnectContext.get().getRemoteIP(), srcTableName); + } + } } public void analyze(String fullDbName) throws AnalysisException { @@ -538,11 +587,13 @@ public class DataDescription { } public void analyzeWithoutCheckPriv() throws AnalysisException { - if (filePaths == null || filePaths.isEmpty()) { - throw new AnalysisException("No file path in load statement."); - } - for (int i = 0; i < filePaths.size(); ++i) { - filePaths.set(i, filePaths.get(i).trim()); + if (!isLoadFromTable()) { + if (filePaths == null || filePaths.isEmpty()) { + throw new AnalysisException("No file path in load statement."); + } + for (int i = 0; i < filePaths.size(); ++i) { + filePaths.set(i, filePaths.get(i).trim()); + } } if (columnSeparator != null) { @@ -601,13 +652,17 @@ public class DataDescription { public String toSql() { StringBuilder sb = new StringBuilder(); - sb.append("DATA INFILE ("); - Joiner.on(", ").appendTo(sb, Lists.transform(filePaths, new Function() { - @Override - public String apply(String s) { - return "'" + s + "'"; - } - })).append(")"); + if (isLoadFromTable()) { + sb.append("DATA FROM TABLE ").append(srcTableName); + } else { + sb.append("DATA INFILE ("); + Joiner.on(", ").appendTo(sb, Lists.transform(filePaths, new Function() { + @Override + public String apply(String s) { + return "'" + s + "'"; + } + })).append(")"); + } if (isNegative) { sb.append(" NEGATIVE"); } diff --git a/fe/src/main/java/org/apache/doris/analysis/FromClause.java b/fe/src/main/java/org/apache/doris/analysis/FromClause.java index 1120675580..b5f1b12889 100644 --- a/fe/src/main/java/org/apache/doris/analysis/FromClause.java +++ b/fe/src/main/java/org/apache/doris/analysis/FromClause.java @@ -23,9 +23,15 @@ import 
java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
 
+import com.google.common.base.Strings;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -72,6 +78,39 @@ public class FromClause implements ParseNode, Iterable<TableRef> {
         });
     }
 
+    private void checkFromHiveTable(Analyzer analyzer) throws AnalysisException {
+        for (TableRef tblRef : tableRefs_) {
+            if (!(tblRef instanceof BaseTableRef)) {
+                continue;
+            }
+
+            TableName tableName = tblRef.getName();
+            String dbName = tableName.getDb();
+            if (Strings.isNullOrEmpty(dbName)) {
+                dbName = analyzer.getDefaultDb();
+            } else {
+                dbName = ClusterNamespace.getFullName(analyzer.getClusterName(), tblRef.getName().getDb());
+            }
+            if (Strings.isNullOrEmpty(dbName)) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
+            }
+
+            Database db = analyzer.getCatalog().getDb(dbName);
+            if (db == null) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_DB_ERROR, dbName);
+            }
+
+            String tblName = tableName.getTbl();
+            Table table = db.getTable(tblName);
+            if (table == null) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_TABLE_ERROR, tblName);
+            }
+            if (table.getType() == Table.TableType.HIVE) {
+                throw new AnalysisException("Query from hive table is not supported, table: " + tblName);
+            }
+        }
+    }
+
     @Override
     public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
         if (analyzed_) return;
@@ -98,6 +137,9 @@ public class FromClause implements ParseNode, Iterable<TableRef> {
             leftTblRef = tblRef;
         }
 
+        // TODO: remove when query from hive table is supported
+        checkFromHiveTable(analyzer);
+
         analyzed_ = true;
     }
 
diff --git a/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java
index 96593b6c04..a9f87c2f0d 100644
--- a/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -25,7 +25,6 @@ import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.Load;
-import org.apache.doris.load.loadv2.SparkLoadJob;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 
@@ -176,11 +175,6 @@ public class LoadStmt extends DdlStmt {
         }
 
         for (Entry<String, String> entry : properties.entrySet()) {
-            // temporary use for global dict
-            if (entry.getKey().startsWith(SparkLoadJob.BITMAP_DATA_PROPERTY)) {
-                continue;
-            }
-
             if (!PROPERTIES_SET.contains(entry.getKey())) {
                 throw new DdlException(entry.getKey() + " is invalid property");
             }
@@ -267,11 +261,27 @@
         if (dataDescriptions == null || dataDescriptions.isEmpty()) {
             throw new AnalysisException("No data file in load statement.");
         }
+        // check data descriptions, support 2 cases below:
+        // case 1: multiple file paths, multiple data descriptions
+        // case 2: one hive table, one data description
+        boolean isLoadFromTable = false;
         for (DataDescription dataDescription : dataDescriptions) {
             if (brokerDesc == null && resourceDesc == null) {
                 dataDescription.setIsHadoopLoad(true);
             }
             dataDescription.analyze(label.getDbName());
+
+            if 
(dataDescription.isLoadFromTable()) { + isLoadFromTable = true; + } + } + if (isLoadFromTable) { + if (dataDescriptions.size() > 1) { + throw new AnalysisException("Only support one olap table load from one external table"); + } + if (resourceDesc == null) { + throw new AnalysisException("Load from table should use Spark Load"); + } } if (resourceDesc != null) { @@ -296,7 +306,7 @@ public class LoadStmt extends DdlStmt { // if cluster is not null, use this hadoop cluster etlJobType = EtlJobType.HADOOP; } - + try { checkProperties(properties); } catch (DdlException e) { diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index 37b70c5cbe..773c76ee10 100755 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -2964,6 +2964,9 @@ public class Catalog { } else if (engineName.equalsIgnoreCase("elasticsearch") || engineName.equalsIgnoreCase("es")) { createEsTable(db, stmt); return; + } else if (engineName.equalsIgnoreCase("hive")) { + createHiveTable(db, stmt); + return; } else { ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_STORAGE_ENGINE, engineName); } @@ -3908,6 +3911,18 @@ public class Catalog { return; } + private void createHiveTable(Database db, CreateTableStmt stmt) throws DdlException { + String tableName = stmt.getTableName(); + List columns = stmt.getColumns(); + long tableId = getNextId(); + HiveTable hiveTable = new HiveTable(tableId, tableName, columns, stmt.getProperties()); + hiveTable.setComment(stmt.getComment()); + if (!db.createTableWithLock(hiveTable, false, stmt.isSetIfNotExists())) { + ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exist"); + } + LOG.info("successfully create table[{}-{}]", tableName, tableId); + } + public static void getDdlStmt(Table table, List createTableStmt, List addPartitionStmt, List createRollupStmt, boolean separatePartition, boolean hidePassword) { StringBuilder sb = new StringBuilder(); @@ -3925,7 +3940,7 @@ public class Catalog { // 1.2 other table type sb.append("CREATE "); if (table.getType() == TableType.MYSQL || table.getType() == TableType.ELASTICSEARCH - || table.getType() == TableType.BROKER) { + || table.getType() == TableType.BROKER || table.getType() == TableType.HIVE) { sb.append("EXTERNAL "); } sb.append("TABLE "); @@ -4097,6 +4112,17 @@ public class Catalog { sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isDocValueScanEnable()).append("\",\n"); sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isKeywordSniffEnable()).append("\"\n"); sb.append(")"); + } else if (table.getType() == TableType.HIVE) { + HiveTable hiveTable = (HiveTable) table; + if (!Strings.isNullOrEmpty(table.getComment())) { + sb.append("\nCOMMENT \"").append(table.getComment()).append("\""); + } + // properties + sb.append("\nPROPERTIES (\n"); + sb.append("\"database\" = \"").append(hiveTable.getHiveDb()).append("\",\n"); + sb.append("\"table\" = \"").append(hiveTable.getHiveTable()).append("\",\n"); + sb.append(new PrintableMap<>(hiveTable.getHiveProperties(), " = ", true, true, false).toString()); + sb.append("\n)"); } sb.append(";"); diff --git a/fe/src/main/java/org/apache/doris/catalog/HiveTable.java b/fe/src/main/java/org/apache/doris/catalog/HiveTable.java new file mode 100644 index 0000000000..919ebfcb19 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/catalog/HiveTable.java @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation 
(ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.io.Text;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * External hive table
+ * Currently only supports loading from hive table
+ */
+public class HiveTable extends Table {
+    private static final String PROPERTY_MISSING_MSG = "Hive %s is null. Please add properties('%s'='xxx') when creating table";
+
+    private static final String HIVE_DB = "database";
+    private static final String HIVE_TABLE = "table";
+    private static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
+
+    private String hiveDb;
+    private String hiveTable;
+    private Map<String, String> hiveProperties = Maps.newHashMap();
+
+    public HiveTable() {
+        super(TableType.HIVE);
+    }
+
+    public HiveTable(long id, String name, List<Column> schema, Map<String, String> properties) throws DdlException {
+        super(id, name, TableType.HIVE, schema);
+        validate(properties);
+    }
+
+    public String getHiveDbTable() {
+        return String.format("%s.%s", hiveDb, hiveTable);
+    }
+
+    public String getHiveDb() {
+        return hiveDb;
+    }
+
+    public String getHiveTable() {
+        return hiveTable;
+    }
+
+    public Map<String, String> getHiveProperties() {
+        return hiveProperties;
+    }
+
+    private void validate(Map<String, String> properties) throws DdlException {
+        if (properties == null) {
+            throw new DdlException("Please set properties of hive table, "
+                    + "they are: database, table and 'hive.metastore.uris'");
+        }
+
+        Map<String, String> copiedProps = Maps.newHashMap(properties);
+        hiveDb = copiedProps.get(HIVE_DB);
+        if (Strings.isNullOrEmpty(hiveDb)) {
+            throw new DdlException(String.format(PROPERTY_MISSING_MSG, HIVE_DB, HIVE_DB));
+        }
+        copiedProps.remove(HIVE_DB);
+
+        hiveTable = copiedProps.get(HIVE_TABLE);
+        if (Strings.isNullOrEmpty(hiveTable)) {
+            throw new DdlException(String.format(PROPERTY_MISSING_MSG, HIVE_TABLE, HIVE_TABLE));
+        }
+        copiedProps.remove(HIVE_TABLE);
+
+        // check hive properties
+        // hive.metastore.uris
+        String hiveMetastoreUris = copiedProps.get(HIVE_METASTORE_URIS);
+        if (Strings.isNullOrEmpty(hiveMetastoreUris)) {
+            throw new DdlException(String.format(PROPERTY_MISSING_MSG, HIVE_METASTORE_URIS, HIVE_METASTORE_URIS));
+        }
+        copiedProps.remove(HIVE_METASTORE_URIS);
+        hiveProperties.put(HIVE_METASTORE_URIS, hiveMetastoreUris);
+
+        if (!copiedProps.isEmpty()) {
+            throw new DdlException("Unknown table properties: " + copiedProps.toString());
+        }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        super.write(out);
+
+        Text.writeString(out, hiveDb);
+        Text.writeString(out, hiveTable);
+        out.writeInt(hiveProperties.size());
+        for (Map.Entry<String, String> entry : 
hiveProperties.entrySet()) { + Text.writeString(out, entry.getKey()); + Text.writeString(out, entry.getValue()); + } + } + + public void readFields(DataInput in) throws IOException { + super.readFields(in); + + hiveDb = Text.readString(in); + hiveTable = Text.readString(in); + int size = in.readInt(); + for (int i = 0; i < size; i++) { + String key = Text.readString(in); + String val = Text.readString(in); + hiveProperties.put(key, val); + } + } +} diff --git a/fe/src/main/java/org/apache/doris/catalog/Table.java b/fe/src/main/java/org/apache/doris/catalog/Table.java index 5c383433f4..e6f0bd4af1 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/src/main/java/org/apache/doris/catalog/Table.java @@ -52,7 +52,8 @@ public class Table extends MetaObject implements Writable { INLINE_VIEW, VIEW, BROKER, - ELASTICSEARCH + ELASTICSEARCH, + HIVE } protected long id; @@ -179,6 +180,8 @@ public class Table extends MetaObject implements Writable { table = new BrokerTable(); } else if (type == TableType.ELASTICSEARCH) { table = new EsTable(); + } else if (type == TableType.HIVE) { + table = new HiveTable(); } else { throw new IOException("Unknown table type: " + type.name()); } diff --git a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java index b8182be058..78115b57e9 100644 --- a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -27,6 +27,7 @@ import org.apache.doris.catalog.BrokerTable; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.HiveTable; import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.OlapTable.OlapTableState; @@ -78,6 +79,10 @@ public class BrokerFileGroup implements Writable { // filter the data which has been conformed private Expr whereExpr; + // load from table + private long srcTableId = -1; + private boolean isLoadFromTable = false; + // for unit test and edit log persistence private BrokerFileGroup() { } @@ -165,6 +170,33 @@ public class BrokerFileGroup implements Writable { // FilePath filePaths = dataDescription.getFilePaths(); + + if (dataDescription.isLoadFromTable()) { + String srcTableName = dataDescription.getSrcTableName(); + // src table should be hive table + Table srcTable = db.getTable(srcTableName); + if (srcTable == null) { + throw new DdlException("Unknown table " + srcTableName + " in database " + db.getFullName()); + } + if (!(srcTable instanceof HiveTable)) { + throw new DdlException("Source table " + srcTableName + " is not HiveTable"); + } + // src table columns should include all columns of loaded table + for (Column column : olapTable.getBaseSchema()) { + boolean isIncluded = false; + for (Column srcColumn : srcTable.getBaseSchema()) { + if (srcColumn.getName().equalsIgnoreCase(column.getName())) { + isIncluded = true; + break; + } + } + if (!isIncluded) { + throw new DdlException("Column " + column.getName() + " is not in Source table"); + } + } + srcTableId = srcTable.getId(); + isLoadFromTable = true; + } } public long getTableId() { @@ -207,10 +239,22 @@ public class BrokerFileGroup implements Writable { return columnExprList; } + public List getFileFieldNames() { + return fileFieldNames; + } + public Map>> getColumnToHadoopFunction() { return columnToHadoopFunction; } + public long getSrcTableId() { + return 
srcTableId; + } + + public boolean isLoadFromTable() { + return isLoadFromTable; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); @@ -261,6 +305,8 @@ public class BrokerFileGroup implements Writable { sb.append(path); } sb.append("]"); + sb.append(",srcTableId=").append(srcTableId); + sb.append(",isLoadFromTable=").append(isLoadFromTable); sb.append("}"); return sb.toString(); @@ -310,6 +356,13 @@ public class BrokerFileGroup implements Writable { out.writeBoolean(true); Text.writeString(out, fileFormat); } + + // src table + // TODO(wyb): spark-load + /* + out.writeLong(srcTableId); + out.writeBoolean(isLoadFromTable); + */ } public void readFields(DataInput in) throws IOException { @@ -360,6 +413,14 @@ public class BrokerFileGroup implements Writable { fileFormat = Text.readString(in); } } + // src table + // TODO(wyb): spark-load + /* + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.new_version) { + srcTableId = in.readLong(); + isLoadFromTable = in.readBoolean(); + } + */ // There are no columnExprList in the previous load job which is created before function is supported. // The columnExprList could not be analyzed without origin stmt in the previous load job. diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index 387452b976..8bb68f764e 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -58,9 +58,6 @@ import java.util.Set; public class SparkLoadJob extends BulkLoadJob { private static final Logger LOG = LogManager.getLogger(SparkLoadJob.class); - // for global dict - public static final String BITMAP_DATA_PROPERTY = "bitmap_data"; - // --- members below need persist --- // create from resourceDesc when job created private SparkResource sparkResource; @@ -75,10 +72,6 @@ public class SparkLoadJob extends BulkLoadJob { private Map> tabletMetaToFileInfo = Maps.newHashMap(); // --- members below not persist --- - // temporary use - // one SparkLoadJob has only one table to load - // hivedb.table for global dict - private String hiveTableName = ""; private ResourceDesc resourceDesc; // for spark standalone private SparkAppHandle sparkAppHandle; @@ -107,23 +100,12 @@ public class SparkLoadJob extends BulkLoadJob { jobType = EtlJobType.SPARK; } - public String getHiveTableName() { - return hiveTableName; - } - @Override protected void setJobProperties(Map properties) throws DdlException { super.setJobProperties(properties); // set spark resource and broker desc setResourceInfo(); - - // global dict - if (properties != null) { - if (properties.containsKey(BITMAP_DATA_PROPERTY)) { - hiveTableName = properties.get(BITMAP_DATA_PROPERTY); - } - } } /** @@ -190,7 +172,6 @@ public class SparkLoadJob extends BulkLoadJob { } } // clear job infos that not persist - hiveTableName = ""; sparkAppHandle = null; resourceDesc = null; tableToLoadPartitions.clear(); diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java b/fe/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java index 341633c1ba..b9767f7602 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java @@ -243,14 +243,19 @@ public class EtlJobConfig implements Serializable { } } - public static enum ConfigVersion { + public enum ConfigVersion { V1 } - public static enum 
FilePatternVersion { + public enum FilePatternVersion { V1 } + public enum SourceType { + FILE, + HIVE + } + public static class EtlTable implements Serializable { @SerializedName(value = "indexes") public List indexes; @@ -454,6 +459,8 @@ public class EtlJobConfig implements Serializable { } public static class EtlFileGroup implements Serializable { + @SerializedName(value = "sourceType") + public SourceType sourceType = SourceType.FILE; @SerializedName(value = "filePaths") public List filePaths; @SerializedName(value = "fileFieldNames") @@ -474,12 +481,17 @@ public class EtlJobConfig implements Serializable { public String where; @SerializedName(value = "partitions") public List partitions; - @SerializedName(value = "hiveTableName") - public String hiveTableName; + @SerializedName(value = "hiveDbTableName") + public String hiveDbTableName; + @SerializedName(value = "hiveTableProperties") + public Map hiveTableProperties; - public EtlFileGroup(List filePaths, List fileFieldNames, List columnsFromPath, - String columnSeparator, String lineDelimiter, boolean isNegative, String fileFormat, - Map columnMappings, String where, List partitions) { + // for data infile path + public EtlFileGroup(SourceType sourceType, List filePaths, List fileFieldNames, + List columnsFromPath, String columnSeparator, String lineDelimiter, + boolean isNegative, String fileFormat, Map columnMappings, + String where, List partitions) { + this.sourceType = sourceType; this.filePaths = filePaths; this.fileFieldNames = fileFieldNames; this.columnsFromPath = columnsFromPath; @@ -492,10 +504,24 @@ public class EtlJobConfig implements Serializable { this.partitions = partitions; } + // for data from table + public EtlFileGroup(SourceType sourceType, String hiveDbTableName, Map hiveTableProperties, + boolean isNegative, Map columnMappings, + String where, List partitions) { + this.sourceType = sourceType; + this.hiveDbTableName = hiveDbTableName; + this.hiveTableProperties = hiveTableProperties; + this.isNegative = isNegative; + this.columnMappings = columnMappings; + this.where = where; + this.partitions = partitions; + } + @Override public String toString() { return "EtlFileGroup{" + - "filePaths=" + filePaths + + "sourceType=" + sourceType + + ", filePaths=" + filePaths + ", fileFieldNames=" + fileFieldNames + ", columnsFromPath=" + columnsFromPath + ", columnSeparator='" + columnSeparator + '\'' + @@ -505,7 +531,8 @@ public class EtlJobConfig implements Serializable { ", columnMappings=" + columnMappings + ", where='" + where + '\'' + ", partitions=" + partitions + - ", hiveTableName='" + hiveTableName + '\'' + + ", hiveDbTableName='" + hiveDbTableName + '\'' + + ", hiveTableProperties=" + hiveTableProperties + '}'; } } diff --git a/fe/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java b/fe/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java index fbad0ae4b7..413f17f57d 100644 --- a/fe/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java +++ b/fe/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java @@ -144,6 +144,17 @@ public class DataDescriptionTest { sql = "DATA INFILE ('abc.txt') INTO TABLE testTable PARTITIONS (p1, p2) (k2, k3)" + " SET (`k1` = replace_value('', NULL))"; Assert.assertEquals(sql, desc.toString()); + + // data from table and set bitmap_dict + params.clear(); + params.add(new SlotRef(null, "k2")); + predicate = new BinaryPredicate(Operator.EQ, new SlotRef(null, "k1"), + new FunctionCallExpr("bitmap_dict", params)); + desc = new 
DataDescription("testTable", new PartitionNames(false, Lists.newArrayList("p1", "p2")),
+                "testHiveTable", false, Lists.newArrayList(predicate), null);
+        desc.analyze("testDb");
+        sql = "DATA FROM TABLE testHiveTable INTO TABLE testTable PARTITIONS (p1, p2) SET (`k1` = bitmap_dict(`k2`))";
+        Assert.assertEquals(sql, desc.toSql());
     }
 
     @Test(expected = AnalysisException.class)
diff --git a/fe/src/test/java/org/apache/doris/catalog/HiveTableTest.java b/fe/src/test/java/org/apache/doris/catalog/HiveTableTest.java
new file mode 100644
index 0000000000..00b42a4ec0
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/catalog/HiveTableTest.java
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.common.DdlException;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+public class HiveTableTest {
+    private String hiveDb;
+    private String hiveTable;
+    private List<Column> columns;
+    private Map<String, String> properties;
+
+    @Before
+    public void setUp() {
+        hiveDb = "db0";
+        hiveTable = "table0";
+
+        columns = Lists.newArrayList();
+        Column column = new Column("col1", PrimitiveType.BIGINT);
+        columns.add(column);
+
+        properties = Maps.newHashMap();
+        properties.put("database", hiveDb);
+        properties.put("table", hiveTable);
+        properties.put("hive.metastore.uris", "thrift://127.0.0.1:9083");
+    }
+
+    @Test
+    public void testNormal() throws DdlException {
+        HiveTable table = new HiveTable(1000, "hive_table", columns, properties);
+        Assert.assertEquals(String.format("%s.%s", hiveDb, hiveTable), table.getHiveDbTable());
+        Assert.assertEquals(1, table.getHiveProperties().size());
+    }
+
+    @Test(expected = DdlException.class)
+    public void testNoDb() throws DdlException {
+        properties.remove("database");
+        new HiveTable(1000, "hive_table", columns, properties);
+        Assert.fail("No exception thrown.");
+    }
+
+    @Test(expected = DdlException.class)
+    public void testNoTbl() throws DdlException {
+        properties.remove("table");
+        new HiveTable(1000, "hive_table", columns, properties);
+        Assert.fail("No exception thrown.");
+    }
+
+    @Test(expected = DdlException.class)
+    public void testNoHiveMetastoreUris() throws DdlException {
+        properties.remove("hive.metastore.uris");
+        new HiveTable(1000, "hive_table", columns, properties);
+        Assert.fail("No exception thrown.");
+    }
+}
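Taken together, the changes let a hive external table be created (Catalog.createHiveTable) and used as a Spark Load source (DataDescription, LoadStmt, BrokerFileGroup), while FromClause.checkFromHiveTable rejects it in queries. A minimal sketch of the user-visible behavior, reusing the table_hive example from the documentation above; the error text is the message thrown by checkFromHiveTable:

```
-- Creating the table and loading from it are accepted; a direct query fails:
SELECT * FROM example_db.table_hive;
-- ERROR: Query from hive table is not supported, table: table_hive
```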