[Spark load][Fe 4/6] Add hive external table and update hive table syntax in loadstmt (#3819)

* Add hive external table and update hive table syntax in loadstmt

* Move check hive table from SelectStmt to FromClause and update doc

* Update hive external table en sql reference
Mingyu Chen
2020-06-13 16:28:24 +08:00
committed by GitHub
15 changed files with 561 additions and 62 deletions

View File

@@ -35,7 +35,7 @@ Syntax:
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [database.]table_name
(column_definition1[, column_definition2, ...]
[, index_definition1[, index_definition2, ...]])
[ENGINE = [olap|mysql|broker]]
[ENGINE = [olap|mysql|broker|hive]]
[key_desc]
[COMMENT "table comment"]
[partition_desc]
@@ -104,7 +104,7 @@ Syntax:
Notice:
Only BITMAP indexes are supported in the current version, and a BITMAP index can only be applied to a single column
3. ENGINE type
Default is olap. Options are: olap, mysql, broker
Default is olap. Options are: olap, mysql, broker, hive
1) For mysql, properties should include:
```
@@ -123,7 +123,7 @@ Syntax:
table_name in the CREATE TABLE stmt is the name of this table in Doris. They can be the same or different.
The MySQL table created in Doris is for accessing data in the MySQL database.
Doris itself does not maintain or store any data from the MySQL table.
1) For broker, properties should include:
2) For broker, properties should include:
```
PROPERTIES (
@@ -145,6 +145,16 @@ Syntax:
Notice:
File names in "path" are separated by ",". If a file name includes ",", use "%2c" instead. If a file name includes "%", use "%25" instead.
CSV and Parquet file formats are supported, as are the GZ, BZ2, LZ4, and LZO (LZOP) compression formats.
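For example, a "path" value whose first file name contains a comma might be written as follows (the HDFS host and file names here are hypothetical):
```
"path" = "hdfs://host:port/dir/part-0%2c1.csv,hdfs://host:port/dir/part-2.csv"
```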
3) For hive, properties should include:
```
PROPERTIES (
"database" = "hive_db_name",
"table" = "hive_table_name",
"hive.metastore.uris" = "thrift://127.0.0.1:9083"
)
```
"database" is the name of the database corresponding to the hive table, "table" is the name of the hive table, and "hive.metastore.uris" is the hive metastore service address.
Notice: At present, hive external tables are only used for Spark Load and query is not supported.
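As a sketch of that usage (the label, target table, and Spark resource name below are hypothetical placeholders), a Hive external table serves as the data source of a Spark Load statement:
```
LOAD LABEL example_db.label1
(
    DATA FROM TABLE table_hive
    INTO TABLE doris_olap_table
)
WITH RESOURCE 'spark0';
```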
4. key_desc
Syntax:
key_type(k1[,k2 ...])
@@ -577,6 +587,23 @@ Syntax:
PROPERTIES ("in_memory"="true");
```
13. Create a hive external table
```
CREATE TABLE example_db.table_hive
(
k1 TINYINT,
k2 VARCHAR(50),
v INT
)
ENGINE=hive
PROPERTIES
(
"database" = "hive_db_name",
"table" = "hive_table_name",
"hive.metastore.uris" = "thrift://127.0.0.1:9083"
);
```
## keyword
CREATE,TABLE

View File

@@ -35,7 +35,7 @@ under the License.
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [database.]table_name
(column_definition1[, column_definition2, ...]
[, index_definition1[, index_definition2, ...]])
[ENGINE = [olap|mysql|broker]]
[ENGINE = [olap|mysql|broker|hive]]
[key_desc]
[COMMENT "table comment"]
[partition_desc]
@@ -111,7 +111,7 @@ under the License.
Only BITMAP indexes are supported in the current version, and a BITMAP index can only be applied to a single column
3. ENGINE type
Default is olap. Options: mysql, broker
Default is olap. Options: mysql, broker, hive
1) For mysql, the following information must be provided in properties:
```
@@ -127,11 +127,11 @@ under the License.
Notice:
The "table_name" in the "table" entry is the real table name in MySQL.
The table_name in the CREATE TABLE statement is the name of this MySQL table in Palo; they can be different.
The table_name in the CREATE TABLE statement is the name of this MySQL table in Doris; they can be different.
Palo creates MySQL tables so that the MySQL database can be accessed through Palo.
Palo itself does not maintain or store any MySQL data.
1) For broker, the table is accessed through the specified broker, and the following information must be provided in properties:
Doris creates MySQL tables so that the MySQL database can be accessed through Doris.
Doris itself does not maintain or store any MySQL data.
2) For broker, the table is accessed through the specified broker, and the following information must be provided in properties:
```
PROPERTIES (
"broker_name" = "broker_name",
@@ -152,6 +152,18 @@ under the License.
If "path" contains multiple files, separate them with commas [,]. If a file name contains a comma, use %2c instead. If a file name contains %, use %25 instead.
Currently the file content format supports CSV, along with the GZ, BZ2, LZ4, and LZO (LZOP) compression formats.
3) For hive, the following information must be provided in properties:
```
PROPERTIES (
"database" = "hive_db_name",
"table" = "hive_table_name",
"hive.metastore.uris" = "thrift://127.0.0.1:9083"
)
```
"database" is the name of the database to which the Hive table belongs, "table" is the name of the Hive table, and "hive.metastore.uris" is the Hive metastore service address.
Notice: at present, Hive external tables are only used for Spark Load; querying them directly is not supported.
4. key_desc
Syntax:
`key_type(k1[,k2 ...])`
@@ -162,7 +174,7 @@ under the License.
Suitable for business scenarios such as reporting and multi-dimensional analysis.
UNIQUE KEY: for records with identical key columns, value columns are overwritten in import order.
Suitable for point-query workloads that insert, delete, update, and query by key columns.
DUPLICATE KEY: records with identical key columns coexist in Palo.
DUPLICATE KEY: records with identical key columns coexist in Doris.
Suitable for storing detail data, or data with no aggregation semantics.
The default is DUPLICATE KEY, and the key columns are the first 36 bytes of the column definitions; if there are fewer than 3 columns within the first 36 bytes, the first three columns are used.
Notice:
@@ -607,6 +619,24 @@ under the License.
PROPERTIES ("in_memory"="true");
```
13. Create a Hive external table
```
CREATE TABLE example_db.table_hive
(
k1 TINYINT,
k2 VARCHAR(50),
v INT
)
ENGINE=hive
PROPERTIES
(
"database" = "hive_db_name",
"table" = "hive_table_name",
"hive.metastore.uris" = "thrift://127.0.0.1:9083"
);
```
## keyword
CREATE,TABLE

View File

@@ -1244,6 +1244,15 @@ data_desc ::=
RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, fileFormat,
columnsFromPath, isNeg, colMappingList, whereExpr);
:}
| KW_DATA KW_FROM KW_TABLE ident:srcTableName
opt_negative:isNeg
KW_INTO KW_TABLE ident:tableName
opt_partition_names:partitionNames
opt_col_mapping_list:colMappingList
where_clause:whereExpr
{:
RESULT = new DataDescription(tableName, partitionNames, srcTableName, isNeg, colMappingList, whereExpr);
:}
;
opt_negative ::=

View File

@@ -87,11 +87,15 @@ public class CreateTableStmt extends DdlStmt {
engineNames.add("mysql");
engineNames.add("broker");
engineNames.add("elasticsearch");
engineNames.add("hive");
}
// for backup. set to -1 for normal use
private int tableSignature;
// TODO(wyb): spark-load
private static boolean disableHiveTable = true;
public CreateTableStmt() {
// for persist
tableName = new TableName();
@@ -253,8 +257,12 @@
analyzeEngineName();
// TODO(wyb): spark-load
if (engineName.equals("hive") && disableHiveTable) {
throw new AnalysisException("Spark Load from hive table is comming soon");
}
// analyze key desc
if (!(engineName.equals("mysql") || engineName.equals("broker"))) {
if (!(engineName.equals("mysql") || engineName.equals("broker") || engineName.equals("hive"))) {
// olap table
if (keysDesc == null) {
List<String> keysColumnNames = Lists.newArrayList();
@@ -299,7 +307,7 @@
}
}
} else {
// mysql and broker do not need key desc
// mysql, broker and hive do not need key desc
if (keysDesc != null) {
throw new AnalysisException("Create " + engineName + " table should not contain keys desc");
}
@@ -433,7 +441,7 @@
}
if (engineName.equals("mysql") || engineName.equals("broker")
|| engineName.equals("elasticsearch")) {
|| engineName.equals("elasticsearch") || engineName.equals("hive")) {
if (!isExternal) {
// this is for compatibility
isExternal = true;

View File

@@ -58,6 +58,14 @@ import java.util.TreeSet;
// [(tmp_col1, tmp_col2, col3, ...)]
// [COLUMNS FROM PATH AS (col1, ...)]
// [SET (k1=f1(xx), k2=f2(xxx))]
// [where_clause]
//
// DATA FROM TABLE external_hive_tbl_name
// [NEGATIVE]
// INTO TABLE tbl_name
// [PARTITION (p1, p2)]
// [SET (k1=f1(xx), k2=f2(xxx))]
// [where_clause]
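// A concrete (hypothetical) instance of the second form, with placeholder
// table and column names:
//
// DATA FROM TABLE hive_ext_tbl
// INTO TABLE doris_tbl
// PARTITION (p1, p2)
// SET (k1 = bitmap_dict(k2))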
/**
* The transform of columns should be added after the keyword named COLUMNS.
@@ -94,6 +102,8 @@ public class DataDescription {
private final List<Expr> columnMappingList;
private final Expr whereExpr;
private final String srcTableName;
// Used for mini load
private TNetworkAddress beAddr;
private String lineDelimiter;
@@ -141,6 +151,27 @@
this.isNegative = isNegative;
this.columnMappingList = columnMappingList;
this.whereExpr = whereExpr;
this.srcTableName = null;
}
// data from table external_hive_table
public DataDescription(String tableName,
PartitionNames partitionNames,
String srcTableName,
boolean isNegative,
List<Expr> columnMappingList,
Expr whereExpr) {
this.tableName = tableName;
this.partitionNames = partitionNames;
this.filePaths = null;
this.fileFieldNames = null;
this.columnSeparator = null;
this.fileFormat = null;
this.columnsFromPath = null;
this.isNegative = isNegative;
this.columnMappingList = columnMappingList;
this.whereExpr = whereExpr;
this.srcTableName = srcTableName;
}
public String getTableName() {
@@ -225,6 +256,14 @@
return isHadoopLoad;
}
public String getSrcTableName() {
return srcTableName;
}
public boolean isLoadFromTable() {
return !Strings.isNullOrEmpty(srcTableName);
}
/*
* Analyze parsedExprMap and columnToHadoopFunction from columns, columns from path and columnMappingList
* Example:
@@ -530,6 +569,16 @@
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(), tableName);
}
// check hive table auth
if (isLoadFromTable()) {
if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), fullDbName, srcTableName,
PrivPredicate.SELECT)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "SELECT",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(), srcTableName);
}
}
}
public void analyze(String fullDbName) throws AnalysisException {
@@ -538,11 +587,13 @@
}
public void analyzeWithoutCheckPriv() throws AnalysisException {
if (filePaths == null || filePaths.isEmpty()) {
throw new AnalysisException("No file path in load statement.");
}
for (int i = 0; i < filePaths.size(); ++i) {
filePaths.set(i, filePaths.get(i).trim());
if (!isLoadFromTable()) {
if (filePaths == null || filePaths.isEmpty()) {
throw new AnalysisException("No file path in load statement.");
}
for (int i = 0; i < filePaths.size(); ++i) {
filePaths.set(i, filePaths.get(i).trim());
}
}
if (columnSeparator != null) {
@@ -601,13 +652,17 @@
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append("DATA INFILE (");
Joiner.on(", ").appendTo(sb, Lists.transform(filePaths, new Function<String, String>() {
@Override
public String apply(String s) {
return "'" + s + "'";
}
})).append(")");
if (isLoadFromTable()) {
sb.append("DATA FROM TABLE ").append(srcTableName);
} else {
sb.append("DATA INFILE (");
Joiner.on(", ").appendTo(sb, Lists.transform(filePaths, new Function<String, String>() {
@Override
public String apply(String s) {
return "'" + s + "'";
}
})).append(")");
}
if (isNegative) {
sb.append(" NEGATIVE");
}

View File

@@ -23,9 +23,15 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Table;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import com.google.common.base.Strings;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -72,6 +78,39 @@ public class FromClause implements ParseNode, Iterable<TableRef> {
});
}
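// Rejects queries that reference a Hive external table. For example, a
// (hypothetical) "SELECT * FROM hive_ext_tbl" fails analysis here with
// "Query from hive table is not supported, table: hive_ext_tbl".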
private void checkFromHiveTable(Analyzer analyzer) throws AnalysisException {
for (TableRef tblRef : tableRefs_) {
if (!(tblRef instanceof BaseTableRef)) {
continue;
}
TableName tableName = tblRef.getName();
String dbName = tableName.getDb();
if (Strings.isNullOrEmpty(dbName)) {
dbName = analyzer.getDefaultDb();
} else {
dbName = ClusterNamespace.getFullName(analyzer.getClusterName(), tblRef.getName().getDb());
}
if (Strings.isNullOrEmpty(dbName)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
}
Database db = analyzer.getCatalog().getDb(dbName);
if (db == null) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_DB_ERROR, dbName);
}
String tblName = tableName.getTbl();
Table table = db.getTable(tblName);
if (table == null) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_TABLE_ERROR, tblName);
}
if (table.getType() == Table.TableType.HIVE) {
throw new AnalysisException("Query from hive table is not supported, table: " + tblName);
}
}
}
@Override
public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
if (analyzed_) return;
@@ -98,6 +137,9 @@ public class FromClause implements ParseNode, Iterable<TableRef> {
leftTblRef = tblRef;
}
// TODO: remove when query from hive table is supported
checkFromHiveTable(analyzer);
analyzed_ = true;
}

View File

@@ -25,7 +25,6 @@ import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.Load;
import org.apache.doris.load.loadv2.SparkLoadJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
@@ -176,11 +175,6 @@
}
for (Entry<String, String> entry : properties.entrySet()) {
// temporary use for global dict
if (entry.getKey().startsWith(SparkLoadJob.BITMAP_DATA_PROPERTY)) {
continue;
}
if (!PROPERTIES_SET.contains(entry.getKey())) {
throw new DdlException(entry.getKey() + " is invalid property");
}
@@ -267,11 +261,27 @@
if (dataDescriptions == null || dataDescriptions.isEmpty()) {
throw new AnalysisException("No data file in load statement.");
}
// check data descriptions; the following 2 cases are supported:
// case 1: multiple file paths, multiple data descriptions
// case 2: one hive table, one data description
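// For example (labels, paths and resource name below are hypothetical):
//   case 1: LOAD LABEL db1.l1 (DATA INFILE('hdfs://host/a') INTO TABLE t1,
//                              DATA INFILE('hdfs://host/b') INTO TABLE t2) ...
//   case 2: LOAD LABEL db1.l2 (DATA FROM TABLE hive_ext_tbl INTO TABLE t1)
//           WITH RESOURCE 'spark0' ...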
boolean isLoadFromTable = false;
for (DataDescription dataDescription : dataDescriptions) {
if (brokerDesc == null && resourceDesc == null) {
dataDescription.setIsHadoopLoad(true);
}
dataDescription.analyze(label.getDbName());
if (dataDescription.isLoadFromTable()) {
isLoadFromTable = true;
}
}
if (isLoadFromTable) {
if (dataDescriptions.size() > 1) {
throw new AnalysisException("Only support one olap table load from one external table");
}
if (resourceDesc == null) {
throw new AnalysisException("Load from table should use Spark Load");
}
}
if (resourceDesc != null) {
@@ -296,7 +306,7 @@
// if cluster is not null, use this hadoop cluster
etlJobType = EtlJobType.HADOOP;
}
try {
checkProperties(properties);
} catch (DdlException e) {

View File

@@ -2964,6 +2964,9 @@ public class Catalog {
} else if (engineName.equalsIgnoreCase("elasticsearch") || engineName.equalsIgnoreCase("es")) {
createEsTable(db, stmt);
return;
} else if (engineName.equalsIgnoreCase("hive")) {
createHiveTable(db, stmt);
return;
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_STORAGE_ENGINE, engineName);
}
@@ -3908,6 +3911,18 @@
return;
}
private void createHiveTable(Database db, CreateTableStmt stmt) throws DdlException {
String tableName = stmt.getTableName();
List<Column> columns = stmt.getColumns();
long tableId = getNextId();
HiveTable hiveTable = new HiveTable(tableId, tableName, columns, stmt.getProperties());
hiveTable.setComment(stmt.getComment());
if (!db.createTableWithLock(hiveTable, false, stmt.isSetIfNotExists())) {
ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exists");
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
}
public static void getDdlStmt(Table table, List<String> createTableStmt, List<String> addPartitionStmt,
List<String> createRollupStmt, boolean separatePartition, boolean hidePassword) {
StringBuilder sb = new StringBuilder();
@@ -3925,7 +3940,7 @@
// 1.2 other table type
sb.append("CREATE ");
if (table.getType() == TableType.MYSQL || table.getType() == TableType.ELASTICSEARCH
|| table.getType() == TableType.BROKER) {
|| table.getType() == TableType.BROKER || table.getType() == TableType.HIVE) {
sb.append("EXTERNAL ");
}
sb.append("TABLE ");
@@ -4097,6 +4112,17 @@
sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isDocValueScanEnable()).append("\",\n");
sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isKeywordSniffEnable()).append("\"\n");
sb.append(")");
} else if (table.getType() == TableType.HIVE) {
HiveTable hiveTable = (HiveTable) table;
if (!Strings.isNullOrEmpty(table.getComment())) {
sb.append("\nCOMMENT \"").append(table.getComment()).append("\"");
}
// properties
sb.append("\nPROPERTIES (\n");
sb.append("\"database\" = \"").append(hiveTable.getHiveDb()).append("\",\n");
sb.append("\"table\" = \"").append(hiveTable.getHiveTable()).append("\",\n");
sb.append(new PrintableMap<>(hiveTable.getHiveProperties(), " = ", true, true, false).toString());
sb.append("\n)");
}
sb.append(";");

View File

@@ -0,0 +1,130 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* External hive table
* Currently only supports loading from a Hive table
*/
public class HiveTable extends Table {
private static final String PROPERTY_MISSING_MSG = "Hive %s is null. Please add properties('%s'='xxx') when creating the table";
private static final String HIVE_DB = "database";
private static final String HIVE_TABLE = "table";
private static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
private String hiveDb;
private String hiveTable;
private Map<String, String> hiveProperties = Maps.newHashMap();
public HiveTable() {
super(TableType.HIVE);
}
public HiveTable(long id, String name, List<Column> schema, Map<String, String> properties) throws DdlException {
super(id, name, TableType.HIVE, schema);
validate(properties);
}
public String getHiveDbTable() {
return String.format("%s.%s", hiveDb, hiveTable);
}
public String getHiveDb() {
return hiveDb;
}
public String getHiveTable() {
return hiveTable;
}
public Map<String, String> getHiveProperties() {
return hiveProperties;
}
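// validate() requires the "database", "table" and "hive.metastore.uris" properties.
// For example, a (hypothetical) map {"database"="db0", "table"="tbl0",
// "hive.metastore.uris"="thrift://127.0.0.1:9083"} passes validation, and only
// hive.metastore.uris is retained in hiveProperties afterwards.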
private void validate(Map<String, String> properties) throws DdlException {
if (properties == null) {
throw new DdlException("Please set properties of hive table, "
+ "they are: database, table and 'hive.metastore.uris'");
}
Map<String, String> copiedProps = Maps.newHashMap(properties);
hiveDb = copiedProps.get(HIVE_DB);
if (Strings.isNullOrEmpty(hiveDb)) {
throw new DdlException(String.format(PROPERTY_MISSING_MSG, HIVE_DB, HIVE_DB));
}
copiedProps.remove(HIVE_DB);
hiveTable = copiedProps.get(HIVE_TABLE);
if (Strings.isNullOrEmpty(hiveTable)) {
throw new DdlException(String.format(PROPERTY_MISSING_MSG, HIVE_TABLE, HIVE_TABLE));
}
copiedProps.remove(HIVE_TABLE);
// check hive properties
// hive.metastore.uris
String hiveMetastoreUris = copiedProps.get(HIVE_METASTORE_URIS);
if (Strings.isNullOrEmpty(hiveMetastoreUris)) {
throw new DdlException(String.format(PROPERTY_MISSING_MSG, HIVE_METASTORE_URIS, HIVE_METASTORE_URIS));
}
copiedProps.remove(HIVE_METASTORE_URIS);
hiveProperties.put(HIVE_METASTORE_URIS, hiveMetastoreUris);
if (!copiedProps.isEmpty()) {
throw new DdlException("Unknown table properties: " + copiedProps.toString());
}
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
Text.writeString(out, hiveDb);
Text.writeString(out, hiveTable);
out.writeInt(hiveProperties.size());
for (Map.Entry<String, String> entry : hiveProperties.entrySet()) {
Text.writeString(out, entry.getKey());
Text.writeString(out, entry.getValue());
}
}
public void readFields(DataInput in) throws IOException {
super.readFields(in);
hiveDb = Text.readString(in);
hiveTable = Text.readString(in);
int size = in.readInt();
for (int i = 0; i < size; i++) {
String key = Text.readString(in);
String val = Text.readString(in);
hiveProperties.put(key, val);
}
}
}

View File

@@ -52,7 +52,8 @@ public class Table extends MetaObject implements Writable {
INLINE_VIEW,
VIEW,
BROKER,
ELASTICSEARCH
ELASTICSEARCH,
HIVE
}
protected long id;
@@ -179,6 +180,8 @@
table = new BrokerTable();
} else if (type == TableType.ELASTICSEARCH) {
table = new EsTable();
} else if (type == TableType.HIVE) {
table = new HiveTable();
} else {
throw new IOException("Unknown table type: " + type.name());
}

View File

@@ -27,6 +27,7 @@ import org.apache.doris.catalog.BrokerTable;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.HiveTable;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.OlapTable.OlapTableState;
@@ -78,6 +79,10 @@ public class BrokerFileGroup implements Writable {
// filter the data which has been conformed
private Expr whereExpr;
// load from table
private long srcTableId = -1;
private boolean isLoadFromTable = false;
// for unit test and edit log persistence
private BrokerFileGroup() {
}
@@ -165,6 +170,33 @@
// FilePath
filePaths = dataDescription.getFilePaths();
if (dataDescription.isLoadFromTable()) {
String srcTableName = dataDescription.getSrcTableName();
// src table should be hive table
Table srcTable = db.getTable(srcTableName);
if (srcTable == null) {
throw new DdlException("Unknown table " + srcTableName + " in database " + db.getFullName());
}
if (!(srcTable instanceof HiveTable)) {
throw new DdlException("Source table " + srcTableName + " is not HiveTable");
}
// src table columns should include all columns of loaded table
for (Column column : olapTable.getBaseSchema()) {
boolean isIncluded = false;
for (Column srcColumn : srcTable.getBaseSchema()) {
if (srcColumn.getName().equalsIgnoreCase(column.getName())) {
isIncluded = true;
break;
}
}
if (!isIncluded) {
throw new DdlException("Column " + column.getName() + " is not in Source table");
}
}
srcTableId = srcTable.getId();
isLoadFromTable = true;
}
}
public long getTableId() {
@@ -207,10 +239,22 @@
return columnExprList;
}
public List<String> getFileFieldNames() {
return fileFieldNames;
}
public Map<String, Pair<String, List<String>>> getColumnToHadoopFunction() {
return columnToHadoopFunction;
}
public long getSrcTableId() {
return srcTableId;
}
public boolean isLoadFromTable() {
return isLoadFromTable;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -261,6 +305,8 @@
sb.append(path);
}
sb.append("]");
sb.append(",srcTableId=").append(srcTableId);
sb.append(",isLoadFromTable=").append(isLoadFromTable);
sb.append("}");
return sb.toString();
@@ -310,6 +356,13 @@
out.writeBoolean(true);
Text.writeString(out, fileFormat);
}
// src table
// TODO(wyb): spark-load
/*
out.writeLong(srcTableId);
out.writeBoolean(isLoadFromTable);
*/
}
public void readFields(DataInput in) throws IOException {
@@ -360,6 +413,14 @@
fileFormat = Text.readString(in);
}
}
// src table
// TODO(wyb): spark-load
/*
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.new_version) {
srcTableId = in.readLong();
isLoadFromTable = in.readBoolean();
}
*/
// There is no columnExprList in previous load jobs created before this function was supported.
// The columnExprList cannot be analyzed without the origin stmt of the previous load job.

View File

@@ -58,9 +58,6 @@ import java.util.Set;
public class SparkLoadJob extends BulkLoadJob {
private static final Logger LOG = LogManager.getLogger(SparkLoadJob.class);
// for global dict
public static final String BITMAP_DATA_PROPERTY = "bitmap_data";
// --- members below need persist ---
// create from resourceDesc when job created
private SparkResource sparkResource;
@@ -75,10 +72,6 @@
private Map<String, Pair<String, Long>> tabletMetaToFileInfo = Maps.newHashMap();
// --- members below not persist ---
// temporary use
// one SparkLoadJob has only one table to load
// hivedb.table for global dict
private String hiveTableName = "";
private ResourceDesc resourceDesc;
// for spark standalone
private SparkAppHandle sparkAppHandle;
@@ -107,23 +100,12 @@
jobType = EtlJobType.SPARK;
}
public String getHiveTableName() {
return hiveTableName;
}
@Override
protected void setJobProperties(Map<String, String> properties) throws DdlException {
super.setJobProperties(properties);
// set spark resource and broker desc
setResourceInfo();
// global dict
if (properties != null) {
if (properties.containsKey(BITMAP_DATA_PROPERTY)) {
hiveTableName = properties.get(BITMAP_DATA_PROPERTY);
}
}
}
/**
@@ -190,7 +172,6 @@
}
}
// clear job info that is not persisted
hiveTableName = "";
sparkAppHandle = null;
resourceDesc = null;
tableToLoadPartitions.clear();

View File

@@ -243,14 +243,19 @@
}
}
public static enum ConfigVersion {
public enum ConfigVersion {
V1
}
public static enum FilePatternVersion {
public enum FilePatternVersion {
V1
}
public enum SourceType {
FILE,
HIVE
}
public static class EtlTable implements Serializable {
@SerializedName(value = "indexes")
public List<EtlIndex> indexes;
@@ -454,6 +459,8 @@
}
public static class EtlFileGroup implements Serializable {
@SerializedName(value = "sourceType")
public SourceType sourceType = SourceType.FILE;
@SerializedName(value = "filePaths")
public List<String> filePaths;
@SerializedName(value = "fileFieldNames")
@@ -474,12 +481,17 @@
public String where;
@SerializedName(value = "partitions")
public List<Long> partitions;
@SerializedName(value = "hiveTableName")
public String hiveTableName;
@SerializedName(value = "hiveDbTableName")
public String hiveDbTableName;
@SerializedName(value = "hiveTableProperties")
public Map<String, String> hiveTableProperties;
public EtlFileGroup(List<String> filePaths, List<String> fileFieldNames, List<String> columnsFromPath,
String columnSeparator, String lineDelimiter, boolean isNegative, String fileFormat,
Map<String, EtlColumnMapping> columnMappings, String where, List<Long> partitions) {
// for data infile path
public EtlFileGroup(SourceType sourceType, List<String> filePaths, List<String> fileFieldNames,
List<String> columnsFromPath, String columnSeparator, String lineDelimiter,
boolean isNegative, String fileFormat, Map<String, EtlColumnMapping> columnMappings,
String where, List<Long> partitions) {
this.sourceType = sourceType;
this.filePaths = filePaths;
this.fileFieldNames = fileFieldNames;
this.columnsFromPath = columnsFromPath;
@@ -492,10 +504,24 @@
this.partitions = partitions;
}
// for data from table
public EtlFileGroup(SourceType sourceType, String hiveDbTableName, Map<String, String> hiveTableProperties,
boolean isNegative, Map<String, EtlColumnMapping> columnMappings,
String where, List<Long> partitions) {
this.sourceType = sourceType;
this.hiveDbTableName = hiveDbTableName;
this.hiveTableProperties = hiveTableProperties;
this.isNegative = isNegative;
this.columnMappings = columnMappings;
this.where = where;
this.partitions = partitions;
}
@Override
public String toString() {
return "EtlFileGroup{" +
"filePaths=" + filePaths +
"sourceType=" + sourceType +
", filePaths=" + filePaths +
", fileFieldNames=" + fileFieldNames +
", columnsFromPath=" + columnsFromPath +
", columnSeparator='" + columnSeparator + '\'' +
@@ -505,7 +531,8 @@
", columnMappings=" + columnMappings +
", where='" + where + '\'' +
", partitions=" + partitions +
", hiveTableName='" + hiveTableName + '\'' +
", hiveDbTableName='" + hiveDbTableName + '\'' +
", hiveTableProperties=" + hiveTableProperties +
'}';
}
}

View File

@@ -144,6 +144,17 @@ public class DataDescriptionTest {
sql = "DATA INFILE ('abc.txt') INTO TABLE testTable PARTITIONS (p1, p2) (k2, k3)"
+ " SET (`k1` = replace_value('', NULL))";
Assert.assertEquals(sql, desc.toString());
// data from table and set bitmap_dict
params.clear();
params.add(new SlotRef(null, "k2"));
predicate = new BinaryPredicate(Operator.EQ, new SlotRef(null, "k1"),
new FunctionCallExpr("bitmap_dict", params));
desc = new DataDescription("testTable", new PartitionNames(false, Lists.newArrayList("p1", "p2")),
"testHiveTable", false, Lists.newArrayList(predicate), null);
desc.analyze("testDb");
sql = "DATA FROM TABLE testHiveTable INTO TABLE testTable PARTITIONS (p1, p2) SET (`k1` = bitmap_dict(`k2`))";
Assert.assertEquals(sql, desc.toSql());
}
@Test(expected = AnalysisException.class)

View File

@@ -0,0 +1,79 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.common.DdlException;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.Map;
public class HiveTableTest {
private String hiveDb;
private String hiveTable;
private List<Column> columns;
private Map<String, String> properties;
@Before
public void setUp() {
hiveDb = "db0";
hiveTable = "table0";
columns = Lists.newArrayList();
Column column = new Column("col1", PrimitiveType.BIGINT);
columns.add(column);
properties = Maps.newHashMap();
properties.put("database", hiveDb);
properties.put("table", hiveTable);
properties.put("hive.metastore.uris", "thrift://127.0.0.1:9083");
}
@Test
public void testNormal() throws DdlException {
HiveTable table = new HiveTable(1000, "hive_table", columns, properties);
Assert.assertEquals(String.format("%s.%s", hiveDb, hiveTable), table.getHiveDbTable());
Assert.assertEquals(1, table.getHiveProperties().size());
}
@Test(expected = DdlException.class)
public void testNoDb() throws DdlException {
properties.remove("database");
new HiveTable(1000, "hive_table", columns, properties);
Assert.fail("No exception throws.");
}
@Test(expected = DdlException.class)
public void testNoTbl() throws DdlException {
properties.remove("table");
new HiveTable(1000, "hive_table", columns, properties);
Assert.fail("No exception throws.");
}
@Test(expected = DdlException.class)
public void testNoHiveMetastoreUris() throws DdlException {
properties.remove("hive.metastore.uris");
new HiveTable(1000, "hive_table", columns, properties);
Assert.fail("No exception throws.");
}
}