Support sequence column for UNIQUE_KEYS Table (#4256)

* add sequence  col

Co-authored-by: yangwenbo6 <yangwenbo3@jd.com>
This commit is contained in:
Youngwb
2020-09-04 10:10:17 +08:00
committed by GitHub
parent 72f04ebdb8
commit 068707484d
49 changed files with 989 additions and 40 deletions

View File

@ -287,7 +287,7 @@ nonterminal StatementBase stmt, show_stmt, show_param, help_stmt, load_stmt,
describe_stmt, alter_stmt,
use_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt, create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt,
link_stmt, migrate_stmt, enter_stmt, unsupported_stmt, export_stmt, admin_stmt, truncate_stmt,
import_columns_stmt, import_delete_on_stmt, import_where_stmt, install_plugin_stmt, uninstall_plugin_stmt;
import_columns_stmt, import_delete_on_stmt, import_sequence_stmt, import_where_stmt, install_plugin_stmt, uninstall_plugin_stmt;
nonterminal ImportColumnDesc import_column_desc;
nonterminal List<ImportColumnDesc> import_column_descs;
@ -344,6 +344,7 @@ nonterminal FunctionName function_name;
nonterminal Expr where_clause;
nonterminal Expr delete_on_clause;
nonterminal Expr where_clause_without_null;
nonterminal String sequence_col_clause;
nonterminal Predicate predicate, between_predicate, comparison_predicate,
compound_predicate, in_predicate, like_predicate, exists_predicate;
nonterminal ArrayList<Expr> opt_partition_by_clause;
@ -535,6 +536,10 @@ stmts ::=
{:
RESULT = Lists.newArrayList(stmt);
:}
| import_sequence_stmt:stmt
{:
RESULT = Lists.newArrayList(stmt);
:}
;
import_columns_stmt ::=
@ -581,6 +586,13 @@ import_delete_on_stmt ::=
:}
;
import_sequence_stmt ::=
KW_ORDER KW_BY ident:s
{:
RESULT = new ImportSequenceStmt(s);
:}
;
stmt ::=
alter_stmt:stmt
{: RESULT = stmt; :}
@ -1345,9 +1357,10 @@ data_desc ::=
opt_col_mapping_list:colMappingList
where_clause:whereExpr
delete_on_clause:deleteExpr
sequence_col_clause:sequenceColName
{:
RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, fileFormat,
columnsFromPath, isNeg, colMappingList, whereExpr, mergeType, deleteExpr);
columnsFromPath, isNeg, colMappingList, whereExpr, mergeType, deleteExpr, sequenceColName);
:}
| opt_merge_type:mergeType KW_DATA KW_FROM KW_TABLE ident:srcTableName
opt_negative:isNeg
@ -1543,6 +1556,10 @@ load_property ::=
{:
RESULT = deletePredicate;
:}
| import_sequence_stmt:sequenceColumn
{:
RESULT = sequenceColumn;
:}
| partition_names:partitionNames
{:
RESULT = partitionNames;
@ -3668,6 +3685,13 @@ delete_on_clause ::=
{: RESULT = e; :}
;
sequence_col_clause ::=
/* empty */
{: RESULT = null; :}
| KW_ORDER KW_BY ident:s
{: RESULT = s; :}
;
where_clause_without_null ::=
KW_WHERE expr:e
{: RESULT = e; :}

View File

@ -506,6 +506,7 @@ public class MaterializedViewHandler extends AlterHandler {
// a. all columns should exist in base rollup schema
// b. value after key
// c. if rollup contains REPLACE column, all keys on base index should be included.
// d. if base index has sequence column for unique_keys, rollup should add the sequence column
List<Column> rollupSchema = new ArrayList<Column>();
// check (a)(b)
boolean meetValue = false;
@ -552,6 +553,19 @@ public class MaterializedViewHandler extends AlterHandler {
}
}
}
if (KeysType.UNIQUE_KEYS == keysType && olapTable.hasSequenceCol()) {
if (meetValue) {
// check sequence column already exist in the rollup schema
for (Column col : rollupSchema) {
if (col.isSequenceColumn()) {
throw new DdlException("sequence column already exist in the Rollup schema");
}
}
// add the sequence column
rollupSchema.add(new Column(Column.SEQUENCE_COL, olapTable.getSequenceType(),
false, AggregateType.REPLACE, true, null, "", false));
}
}
} else if (KeysType.DUP_KEYS == keysType) {
// supplement the duplicate key
if (addRollupClause.getDupKeys() == null || addRollupClause.getDupKeys().isEmpty()) {

View File

@ -306,6 +306,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
ColumnSeparator columnSeparator = null;
ImportColumnsStmt importColumnsStmt = null;
ImportWhereStmt importWhereStmt = null;
ImportSequenceStmt importSequenceStmt = null;
PartitionNames partitionNames = null;
ImportDeleteOnStmt importDeleteOnStmt = null;
for (ParseNode parseNode : loadPropertyList) {
@ -341,10 +342,17 @@ public class CreateRoutineLoadStmt extends DdlStmt {
throw new AnalysisException("repeat setting of delete predicate");
}
importDeleteOnStmt = (ImportDeleteOnStmt) parseNode;
} else if (parseNode instanceof ImportSequenceStmt) {
// check sequence column
if (importSequenceStmt != null) {
throw new AnalysisException("repeat setting of sequence column");
}
importSequenceStmt = (ImportSequenceStmt) parseNode;
}
}
routineLoadDesc = new RoutineLoadDesc(columnSeparator, importColumnsStmt, importWhereStmt,
partitionNames, importDeleteOnStmt == null ? null : importDeleteOnStmt.getExpr(), mergeType);
partitionNames, importDeleteOnStmt == null ? null : importDeleteOnStmt.getExpr(), mergeType,
importSequenceStmt == null ? null : importSequenceStmt.getSequenceColName());
}
private void checkJobProperties() throws UserException {

View File

@ -20,7 +20,10 @@ package org.apache.doris.analysis;
import org.apache.doris.analysis.BinaryPredicate.Operator;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
@ -109,6 +112,8 @@ public class DataDescription {
private TNetworkAddress beAddr;
private String lineDelimiter;
private String sequenceCol;
// Merged from fileFieldNames, columnsFromPath and columnMappingList
// ImportColumnDesc: column name to (expr or null)
private List<ImportColumnDesc> parsedColumnExprList = Lists.newArrayList();
@ -133,7 +138,7 @@ public class DataDescription {
boolean isNegative,
List<Expr> columnMappingList) {
this(tableName, partitionNames, filePaths, columns, columnSeparator, fileFormat, null,
isNegative, columnMappingList, null, LoadTask.MergeType.APPEND, null);
isNegative, columnMappingList, null, LoadTask.MergeType.APPEND, null, null);
}
public DataDescription(String tableName,
@ -147,7 +152,8 @@ public class DataDescription {
List<Expr> columnMappingList,
Expr whereExpr,
LoadTask.MergeType mergeType,
Expr deleteCondition) {
Expr deleteCondition,
String sequenceColName) {
this.tableName = tableName;
this.partitionNames = partitionNames;
this.filePaths = filePaths;
@ -161,6 +167,7 @@ public class DataDescription {
this.srcTableName = null;
this.mergeType = mergeType;
this.deleteCondition = deleteCondition;
this.sequenceCol = sequenceColName;
}
// data from table external_hive_table
@ -256,6 +263,14 @@ public class DataDescription {
this.lineDelimiter = lineDelimiter;
}
public String getSequenceCol() {
return sequenceCol;
}
public boolean hasSequenceCol() {
return !Strings.isNullOrEmpty(sequenceCol);
}
@Deprecated
public void addColumnMapping(String functionName, Pair<String, List<String>> pair) {
if (Strings.isNullOrEmpty(functionName) || pair == null) {
@ -408,6 +423,55 @@ public class DataDescription {
columnToHadoopFunction.put(columnName, functionPair);
}
private void analyzeSequenceCol(String fullDbName) throws AnalysisException {
Database db = Catalog.getCurrentCatalog().getDb(fullDbName);
if (db == null) {
throw new AnalysisException("Database[" + fullDbName + "] does not exist");
}
Table table = db.getTable(tableName);
if (table == null) {
throw new AnalysisException("Unknown table " + tableName
+ " in database " + db.getFullName());
}
if (!(table instanceof OlapTable)) {
throw new AnalysisException("Table " + table.getName() + " is not OlapTable");
}
OlapTable olapTable = (OlapTable) table;
// no sequence column in load and table schema
if (!hasSequenceCol() && !olapTable.hasSequenceCol()) {
return;
}
// check olapTable schema and sequenceCol
if (olapTable.hasSequenceCol() && !hasSequenceCol()) {
throw new AnalysisException("Table " + table.getName() + " has sequence column, need to specify the sequence column");
}
if (hasSequenceCol() && !olapTable.hasSequenceCol()) {
throw new AnalysisException("There is no sequence column in the table " + table.getName());
}
// check source sequence column is in parsedColumnExprList or Table base schema
boolean hasSourceSequenceCol = false;
if (!parsedColumnExprList.isEmpty()) {
for (ImportColumnDesc importColumnDesc : parsedColumnExprList) {
if (importColumnDesc.getColumnName().equals(sequenceCol)) {
hasSourceSequenceCol = true;
break;
}
}
} else {
List<Column> columns = olapTable.getBaseSchema();
for (Column column : columns) {
if (column.getName().equals(sequenceCol)) {
hasSourceSequenceCol = true;
break;
}
}
}
if (!hasSourceSequenceCol) {
throw new AnalysisException("There is no sequence column " + sequenceCol + " in the " + table.getName()
+ " or the COLUMNS and SET clause");
}
}
public static void validateMappingFunction(String functionName, List<String> args,
Map<String, String> columnNameMap,
Column mappingColumn, boolean isHadoopLoad) throws AnalysisException {
@ -616,13 +680,13 @@ public class DataDescription {
throw new AnalysisException("not support MERGE or DELETE with NEGATIVE");
}
checkLoadPriv(fullDbName);
analyzeWithoutCheckPriv();
analyzeWithoutCheckPriv(fullDbName);
if (isNegative && mergeType != LoadTask.MergeType.APPEND) {
throw new AnalysisException("Negative is only used when merge type is append.");
}
}
public void analyzeWithoutCheckPriv() throws AnalysisException {
public void analyzeWithoutCheckPriv(String fullDbName) throws AnalysisException {
if (!isLoadFromTable()) {
if (filePaths == null || filePaths.isEmpty()) {
throw new AnalysisException("No file path in load statement.");
@ -641,6 +705,7 @@ public class DataDescription {
}
analyzeColumns();
analyzeSequenceCol(fullDbName);
}
/*

View File

@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
public class ImportSequenceStmt extends StatementBase {
private String sequenceColName;
public ImportSequenceStmt(String sequenceColName) {
this.sequenceColName = sequenceColName;
}
public String getSequenceColName() {
return sequenceColName;
}
@Override
public RedirectStatus getRedirectStatus() {
return null;
}
}

View File

@ -3716,6 +3716,17 @@ public class Catalog {
rollupSchemaHash, rollupShortKeyColumnCount, rollupIndexStorageType, keysType);
}
// analyse sequence column
Type sequenceColType = null;
try {
sequenceColType = PropertyAnalyzer.analyzeSequenceType(properties, olapTable.getKeysType());
if (sequenceColType != null) {
olapTable.setSequenceInfo(sequenceColType);
}
} catch (Exception e) {
throw new DdlException(e.getMessage());
}
// analyze version info
Pair<Long, Long> versionInfo = null;
try {

View File

@ -48,6 +48,7 @@ import java.util.List;
public class Column implements Writable {
private static final Logger LOG = LogManager.getLogger(Column.class);
public static final String DELETE_SIGN = "__DORIS_DELETE_SIGN__";
public static final String SEQUENCE_COL = "__DORIS_SEQUENCE_COL__";
@SerializedName(value = "name")
private String name;
@SerializedName(value = "type")
@ -185,10 +186,18 @@ public class Column implements Writable {
return visible;
}
public void setIsVisible(boolean isVisible) {
this.visible = isVisible;
}
public boolean isDeleteSignColumn() {
return !visible && aggregationType == AggregateType.REPLACE && nameEquals(DELETE_SIGN, true);
}
public boolean isSequenceColumn() {
return !visible && aggregationType == AggregateType.REPLACE && nameEquals(SEQUENCE_COL, true);
}
public PrimitiveType getDataType() { return type.getPrimitiveType(); }
public Type getType() { return ScalarType.createType(type.getPrimitiveType()); }

View File

@ -123,6 +123,9 @@ public class OlapTable extends Table {
private String colocateGroup;
private boolean hasSequenceCol;
private Type sequenceType;
private TableIndexes indexes;
// In former implementation, base index id is same as table id.
@ -147,6 +150,8 @@ public class OlapTable extends Table {
this.indexes = null;
this.tableProperty = null;
this.hasSequenceCol = false;
}
public OlapTable(long id, String tableName, List<Column> baseSchema, KeysType keysType,
@ -800,6 +805,42 @@ public class OlapTable extends Table {
this.bfFpp = bfFpp;
}
public void setSequenceInfo(Type type) {
this.hasSequenceCol = true;
this.sequenceType = type;
// sequence column is value column with REPLACE aggregate type
Column sequenceCol = new Column(Column.SEQUENCE_COL, type, false, AggregateType.REPLACE, true, null, "", false);
// add sequence column at last
fullSchema.add(sequenceCol);
nameToColumn.put(Column.SEQUENCE_COL, sequenceCol);
for (MaterializedIndexMeta indexMeta : indexIdToMeta.values()) {
List<Column> schema = indexMeta.getSchema();
schema.add(sequenceCol);
}
}
public Column getSequenceCol() {
for (Column column : getBaseSchema()) {
if (column.isSequenceColumn()) {
return column;
}
}
return null;
}
public Boolean hasSequenceCol() {
return getSequenceCol() != null;
}
public Type getSequenceType() {
if (getSequenceCol() == null) {
return null;
} else {
return getSequenceCol().getType();
}
}
public void setIndexes(List<Index> indexes) {
if (this.indexes == null) {
this.indexes = new TableIndexes(null);

View File

@ -129,6 +129,15 @@ public class IndexInfoProcDir implements ProcDirInterface {
throw new AnalysisException("Index " + idxId + " does not exist");
}
bfColumns = olapTable.getCopiedBfColumns();
// sequence col is the hidden column
if (olapTable.hasSequenceCol()) {
for (Column column : schema) {
if (column.isSequenceColumn()) {
schema.remove(column);
break;
}
}
}
} else {
schema = table.getBaseSchema();
}

View File

@ -21,8 +21,10 @@ import org.apache.doris.analysis.DateLiteral;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DataProperty;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
@ -86,6 +88,9 @@ public class PropertyAnalyzer {
public static final String PROPERTIES_USE_TEMP_PARTITION_NAME = "use_temp_partition_name";
public static final String PROPERTIES_TYPE = "type";
// This is common prefix for function column
public static final String PROPERTIES_FUNCTION_COLUMN = "function_column";
public static final String PROPERTIES_SEQUENCE_TYPE = "sequence_type";
public static DataProperty analyzeDataProperty(Map<String, String> properties, DataProperty oldDataProperty)
throws AnalysisException {
@ -433,4 +438,24 @@ public class PropertyAnalyzer {
}
return type;
}
public static Type analyzeSequenceType(Map<String, String> properties, KeysType keysType) throws AnalysisException{
String typeStr = null;
String propertyName = PROPERTIES_FUNCTION_COLUMN + "." + PROPERTIES_SEQUENCE_TYPE;
if (properties != null && properties.containsKey(propertyName)) {
typeStr = properties.get(propertyName);
properties.remove(propertyName);
}
if (typeStr == null) {
return null;
}
if (typeStr != null && keysType != KeysType.UNIQUE_KEYS) {
throw new AnalysisException("sequence column only support UNIQUE_KEYS");
}
PrimitiveType type = PrimitiveType.valueOf(typeStr.toUpperCase());
if (!type.isFixedPointType() && !type.isDateType()) {
throw new AnalysisException("sequence type only support integer types and date types");
}
return ScalarType.createType(type);
}
}

View File

@ -17,6 +17,7 @@
package org.apache.doris.load;
import com.google.common.base.Strings;
import org.apache.doris.analysis.ColumnSeparator;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.Expr;
@ -81,6 +82,8 @@ public class BrokerFileGroup implements Writable {
private Expr whereExpr;
private Expr deleteCondition;
private LoadTask.MergeType mergeType;
// sequence column name
private String sequenceCol;
// load from table
private long srcTableId = -1;
@ -108,6 +111,7 @@ public class BrokerFileGroup implements Writable {
this.whereExpr = dataDescription.getWhereExpr();
this.deleteCondition = dataDescription.getDeleteCondition();
this.mergeType = dataDescription.getMergeType();
this.sequenceCol = dataDescription.getSequenceCol();
}
// NOTE: DBLock will be held
@ -268,6 +272,14 @@ public class BrokerFileGroup implements Writable {
return mergeType;
}
public String getSequenceCol() {
return sequenceCol;
}
public boolean hasSequenceCol() {
return !Strings.isNullOrEmpty(sequenceCol);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();

View File

@ -735,7 +735,7 @@ public class Load {
// check mapping column exist in table
// check function
// convert mapping column and func arg columns to schema format
// When doing schema change, there may have some 'shadow' columns, with prefix '__doris_shadow_' in
// their names. These columns are invisible to user, but we need to generate data for these columns.
// So we add column mappings for these column.
@ -750,7 +750,7 @@ public class Load {
if (mappingExpr != null) {
/*
* eg:
* (A, C) SET (B = func(xx))
* (A, C) SET (B = func(xx))
* ->
* (A, C) SET (B = func(xx), __doris_shadow_B = func(xxx))
*/
@ -779,10 +779,10 @@ public class Load {
*/
// do nothing
}
}
}
LOG.debug("after add shadow column. parsedColumnExprList: {}, columnToHadoopFunction: {}",
parsedColumnExprList, columnToHadoopFunction);
@ -974,6 +974,11 @@ public class Load {
// We make a copy of the columnExprs so that our subsequent changes
// to the columnExprs will not affect the original columnExprs.
List<ImportColumnDesc> copiedColumnExprs = Lists.newArrayList(columnExprs);
// check whether the OlapTable has sequenceCol
boolean hasSequenceCol = false;
if (tbl instanceof OlapTable && ((OlapTable)tbl).hasSequenceCol()) {
hasSequenceCol = true;
}
// If user does not specify the file field names, generate it by using base schema of table.
// So that the following process can be unified
@ -981,6 +986,10 @@ public class Load {
if (!specifyFileFieldNames) {
List<Column> columns = tbl.getBaseSchema(false);
for (Column column : columns) {
// columnExprs has sequence column, don't need to generate the sequence column
if (hasSequenceCol && column.isSequenceColumn()) {
continue;
}
ImportColumnDesc columnDesc = new ImportColumnDesc(column.getName());
LOG.debug("add base column {} to stream load task", column.getName());
copiedColumnExprs.add(columnDesc);
@ -1129,7 +1138,7 @@ public class Load {
* The hadoop function includes: replace_value, strftime, time_format, alignment_timestamp, default_value, now.
* It rewrites those function with real function name and param.
* For the other function, the expr only go through this function and the origin expr is returned.
*
*
* @param columnName
* @param originExpr
* @return
@ -1158,7 +1167,7 @@ public class Load {
* We will convert this based on different cases:
* case 1: k1 = replace_value(null, anyval);
* to: k1 = if (k1 is not null, k1, anyval);
*
*
* case 2: k1 = replace_value(anyval1, anyval2);
* to: k1 = if (k1 is not null, if(k1 != anyval1, k1, anyval2), null);
*/
@ -1231,7 +1240,7 @@ public class Load {
/*
* change to:
* UNIX_TIMESTAMP(DATE_FORMAT(FROM_UNIXTIME(ts), "%Y-01-01 00:00:00"));
*
*
*/
// FROM_UNIXTIME

View File

@ -17,6 +17,7 @@
package org.apache.doris.load;
import com.google.common.base.Strings;
import org.apache.doris.analysis.ColumnSeparator;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportColumnsStmt;
@ -32,16 +33,19 @@ public class RoutineLoadDesc {
private LoadTask.MergeType mergeType;
// nullable
private final PartitionNames partitionNames;
private final String sequenceColName;
public RoutineLoadDesc(ColumnSeparator columnSeparator, ImportColumnsStmt columnsInfo,
ImportWhereStmt wherePredicate, PartitionNames partitionNames,
Expr deleteCondition, LoadTask.MergeType mergeType) {
Expr deleteCondition, LoadTask.MergeType mergeType,
String sequenceColName) {
this.columnSeparator = columnSeparator;
this.columnsInfo = columnsInfo;
this.wherePredicate = wherePredicate;
this.partitionNames = partitionNames;
this.deleteCondition = deleteCondition;
this.mergeType = mergeType;
this.sequenceColName = sequenceColName;
}
public ColumnSeparator getColumnSeparator() {
@ -68,4 +72,12 @@ public class RoutineLoadDesc {
public Expr getDeleteCondition() {
return deleteCondition;
}
public String getSequenceColName() {
return sequenceColName;
}
public boolean hasSequenceCol() {
return !Strings.isNullOrEmpty(sequenceColName);
}
}

View File

@ -255,14 +255,14 @@ public abstract class BulkLoadJob extends LoadJob {
Long.valueOf(sessionVariables.get(SessionVariable.SQL_MODE))));
LoadStmt stmt = null;
try {
stmt = (LoadStmt) SqlParserUtils.getStmt(parser, originStmt.idx);
for (DataDescription dataDescription : stmt.getDataDescriptions()) {
dataDescription.analyzeWithoutCheckPriv();
}
Database db = Catalog.getCurrentCatalog().getDb(dbId);
if (db == null) {
throw new DdlException("Database[" + dbId + "] does not exist");
}
stmt = (LoadStmt) SqlParserUtils.getStmt(parser, originStmt.idx);
for (DataDescription dataDescription : stmt.getDataDescriptions()) {
dataDescription.analyzeWithoutCheckPriv(db.getFullName());
}
checkAndSetDataSourceInfo(db, stmt.getDataDescriptions());
} catch (Exception e) {
LOG.info(new LogBuilder(LogKey.LOAD_JOB, id)

View File

@ -26,9 +26,11 @@ import org.apache.doris.analysis.ImportColumnsStmt;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
@ -181,6 +183,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
protected long maxBatchRows = DEFAULT_MAX_BATCH_ROWS;
protected long maxBatchSizeBytes = DEFAULT_MAX_BATCH_SIZE;
protected String sequenceCol;
/**
* RoutineLoad support json data.
* Require Params:
@ -324,10 +328,10 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
private void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) {
if (routineLoadDesc != null) {
columnDescs = Lists.newArrayList();
if (routineLoadDesc.getColumnsInfo() != null) {
ImportColumnsStmt columnsStmt = routineLoadDesc.getColumnsInfo();
if (columnsStmt.getColumns() != null || columnsStmt.getColumns().size() != 0) {
columnDescs = Lists.newArrayList();
for (ImportColumnDesc columnDesc : columnsStmt.getColumns()) {
columnDescs.add(columnDesc);
}
@ -351,6 +355,11 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
} else if (mergeType == LoadTask.MergeType.DELETE) {
columnDescs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1)));
}
if (routineLoadDesc.hasSequenceCol()) {
sequenceCol = routineLoadDesc.getSequenceColName();
// add expr for sequence column
columnDescs.add(new ImportColumnDesc(Column.SEQUENCE_COL, new SlotRef(null, sequenceCol)));
}
}
}
@ -566,6 +575,14 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
return value;
}
public String getSequenceCol() {
return sequenceCol;
}
public boolean hasSequenceCol() {
return !Strings.isNullOrEmpty(sequenceCol);
}
public int getSizeOfRoutineLoadTaskInfoList() {
readLock();
try {

View File

@ -23,9 +23,11 @@ import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.BrokerTable;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
@ -225,6 +227,11 @@ public class BrokerScanNode extends LoadScanNode {
} else if (mergeType == LoadTask.MergeType.DELETE) {
columnExprs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1)));
}
// add columnExpr for sequence column
if (context.fileGroup.hasSequenceCol()) {
columnExprs.add(new ImportColumnDesc(Column.SEQUENCE_COL,
new SlotRef(null, context.fileGroup.getSequenceCol())));
}
}
Load.initColumns(targetTable, columnExprs,

View File

@ -105,6 +105,13 @@ public class StreamLoadPlanner {
&& !destTable.hasDeleteSign() ) {
throw new AnalysisException("load by MERGE or DELETE need to upgrade table to support batch delete.");
}
if (destTable.hasSequenceCol() && !taskInfo.hasSequenceCol()) {
throw new UserException("Table " + destTable.getName() + " has sequence column, need to specify the sequence column");
}
if (!destTable.hasSequenceCol() && taskInfo.hasSequenceCol()) {
throw new UserException("There is no sequence column in the table " + destTable.getName());
}
resetAnalyzer();
// construct tuple descriptor, used for scanNode and dataSink
TupleDescriptor tupleDesc = descTable.createTupleDescriptor("DstTableTuple");

View File

@ -166,6 +166,7 @@ public class CreateReplicaTask extends AgentTask {
tSchema.setKeysType(keysType.toThrift());
tSchema.setStorageType(storageType);
int deleteSign = -1;
int sequenceCol = -1;
List<TColumn> tColumns = new ArrayList<TColumn>();
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
@ -184,9 +185,13 @@ public class CreateReplicaTask extends AgentTask {
if (column.isDeleteSignColumn()) {
deleteSign = i;
}
if (column.isSequenceColumn()) {
sequenceCol = i;
}
}
tSchema.setColumns(tColumns);
tSchema.setDeleteSignIdx(deleteSign);
tSchema.setSequenceColIdx(sequenceCol);
if (CollectionUtils.isNotEmpty(indexes)) {
List<TOlapTableIndex> tIndexes = new ArrayList<>();

View File

@ -36,6 +36,7 @@ public interface LoadTaskInfo {
public PartitionNames getPartitions();
public LoadTask.MergeType getMergeType();
public Expr getDeleteCondition();
public boolean hasSequenceCol();
public TFileType getFileType();
public TFileFormatType getFormatType();
public String getJsonPaths();

View File

@ -17,6 +17,7 @@
package org.apache.doris.task;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.doris.analysis.ColumnSeparator;
@ -26,8 +27,10 @@ import org.apache.doris.analysis.ImportColumnsStmt;
import org.apache.doris.analysis.ImportWhereStmt;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
@ -70,6 +73,7 @@ public class StreamLoadTask implements LoadTaskInfo {
private long execMemLimit = 2 * 1024 * 1024 * 1024L; // default is 2GB
private LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND; // default is all data is load no delete
private Expr deleteCondition;
private String sequenceCol;
public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType) {
this.id = id;
@ -164,6 +168,10 @@ public class StreamLoadTask implements LoadTaskInfo {
return deleteCondition;
}
public boolean hasSequenceCol() {
return !Strings.isNullOrEmpty(sequenceCol);
}
public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request, Database db) throws UserException {
StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(),
request.getFileType(), request.getFormatType());
@ -238,6 +246,11 @@ public class StreamLoadTask implements LoadTaskInfo {
} else if (mergeType == LoadTask.MergeType.DELETE) {
columnExprDescs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1)));
}
if (request.isSetSequenceCol()) {
sequenceCol = request.getSequenceCol();
// add expr for sequence column
columnExprDescs.add(new ImportColumnDesc(Column.SEQUENCE_COL, new SlotRef(null, sequenceCol)));
}
}
// used for stream load

View File

@ -18,6 +18,9 @@
package org.apache.doris.analysis;
import org.apache.doris.analysis.BinaryPredicate.Operator;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.load.loadv2.LoadTask;
@ -27,6 +30,7 @@ import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Lists;
import org.apache.doris.system.SystemInfoService;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -43,11 +47,47 @@ public class DataDescriptionTest {
private PaloAuth auth;
@Mocked
private ConnectContext ctx;
@Mocked
private Database db;
@Mocked
private OlapTable tbl;
@Mocked
private Analyzer analyzer;
@Mocked
private Catalog catalog;
@Before
public void setUp() {
public void setUp() throws AnalysisException {
MockedAuth.mockedAuth(auth);
MockedAuth.mockedConnectContext(ctx, "root", "192.168.1.1");
new Expectations() {
{
analyzer.getClusterName();
minTimes = 0;
result = SystemInfoService.DEFAULT_CLUSTER;
analyzer.getDefaultDb();
minTimes = 0;
result = "testCluster:testDb";
Catalog.getCurrentCatalog();
minTimes = 0;
result = catalog;
Catalog.getCurrentCatalog();
minTimes = 0;
result = catalog;
catalog.getDb(anyString);
minTimes = 0;
result = db;
db.getTable(anyString);
minTimes = 0;
result = tbl;
}
};
}
@Test
@ -79,7 +119,7 @@ public class DataDescriptionTest {
Expr whereExpr = new BinaryPredicate(BinaryPredicate.Operator.EQ, new IntLiteral(1), new IntLiteral(1));
desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
Lists.newArrayList("col1", "col2"), new ColumnSeparator(","), "csv", null, false, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr);
Lists.newArrayList("col1", "col2"), new ColumnSeparator(","), "csv", null, false, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null);
desc.analyze("testDb");
Assert.assertEquals("MERGE DATA INFILE ('abc.txt') INTO TABLE testTable COLUMNS TERMINATED BY ',' (col1, col2) WHERE 1 = 1 DELETE ON 1 = 1", desc.toString());
Assert.assertEquals("1 = 1", desc.getWhereExpr().toSql());
@ -180,7 +220,7 @@ public class DataDescriptionTest {
Expr whereExpr = new BinaryPredicate(BinaryPredicate.Operator.EQ, new IntLiteral(1), new IntLiteral(1));
DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
Lists.newArrayList("col1", "col2"), new ColumnSeparator(","), "csv", null, true, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr);
Lists.newArrayList("col1", "col2"), new ColumnSeparator(","), "csv", null, true, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null);
desc.analyze("testDb");
}
@ -267,4 +307,42 @@ public class DataDescriptionTest {
}
}
}
@Test
public void testAnalyzeSequenceColumnNormal() throws AnalysisException {
DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
Lists.newArrayList("k1", "k2", "source_sequence","v1"), new ColumnSeparator("\t"),
null, null,false, null, null, LoadTask.MergeType.APPEND, null, "source_sequence");
new Expectations() {
{
tbl.getName();
minTimes = 0;
result = "testTable";
tbl.hasSequenceCol();
minTimes = 0;
result =true;
}
};
desc.analyze("testDb");
}
@Test(expected = AnalysisException.class)
public void testAnalyzeSequenceColumnWithoutSourceSequence() throws AnalysisException {
DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
Lists.newArrayList("k1", "k2","v1"), new ColumnSeparator("\t"),
null, null,false, null, null, LoadTask.MergeType.APPEND, null, "source_sequence");
new Expectations() {
{
tbl.getName();
minTimes = 0;
result = "testTable";
tbl.hasSequenceCol();
minTimes = 0;
result =true;
}
};
desc.analyze("testDb");
}
}

View File

@ -102,6 +102,13 @@ public class CreateTableTest {
.expectThrowsNoException(() -> createTable("create table test.tb7(key1 int, key2 varchar(10)) \n"
+ "distributed by hash(key1) buckets 1 properties('replication_num' = '1', 'storage_medium' = 'ssd');"));
ExceptionChecker
.expectThrowsNoException(() -> createTable("create table test.tbl8\n" + "(k1 varchar(40), k2 int, v1 int)\n"
+ "unique key(k1, k2)\n"
+ "partition by range(k2)\n" + "(partition p1 values less than(\"10\"))\n"
+ "distributed by hash(k2) buckets 1\n" + "properties('replication_num' = '1',\n"
+ "'function_column.sequence_type' = 'int');"));
Database db = Catalog.getCurrentCatalog().getDb("default_cluster:test");
OlapTable tbl6 = (OlapTable) db.getTable("tbl6");
Assert.assertTrue(tbl6.getColumn("k1").isKey());
@ -112,6 +119,12 @@ public class CreateTableTest {
Assert.assertTrue(tbl7.getColumn("k1").isKey());
Assert.assertFalse(tbl7.getColumn("k2").isKey());
Assert.assertTrue(tbl7.getColumn("k2").getAggregationType() == AggregateType.NONE);
OlapTable tbl8 = (OlapTable) db.getTable("tbl8");
Assert.assertTrue(tbl8.getColumn("k1").isKey());
Assert.assertTrue(tbl8.getColumn("k2").isKey());
Assert.assertFalse(tbl8.getColumn("v1").isKey());
Assert.assertTrue(tbl8.getColumn(Column.SEQUENCE_COL).getAggregationType() == AggregateType.REPLACE);
}
@Test
@ -159,5 +172,21 @@ public class CreateTableTest {
.expectThrowsWithMsg(DdlException.class, "Failed to find enough host with storage medium is SSD in all backends. need: 1",
() -> createTable("create table test.tb7(key1 int, key2 varchar(10)) distributed by hash(key1) \n"
+ "buckets 1 properties('replication_num' = '1', 'storage_medium' = 'ssd');"));
ExceptionChecker
.expectThrowsWithMsg(DdlException.class, "sequence column only support UNIQUE_KEYS",
() -> createTable("create table test.atbl8\n" + "(k1 varchar(40), k2 int, v1 int sum)\n"
+ "aggregate key(k1, k2)\n"
+ "partition by range(k2)\n" + "(partition p1 values less than(\"10\"))\n"
+ "distributed by hash(k2) buckets 1\n" + "properties('replication_num' = '1',\n"
+ "'function_column.sequence_type' = 'int');"));
ExceptionChecker
.expectThrowsWithMsg(DdlException.class, "sequence type only support integer types and date types",
() -> createTable("create table test.atbl8\n" + "(k1 varchar(40), k2 int, v1 int)\n"
+ "unique key(k1, k2)\n"
+ "partition by range(k2)\n" + "(partition p1 values less than(\"10\"))\n"
+ "distributed by hash(k2) buckets 1\n" + "properties('replication_num' = '1',\n"
+ "'function_column.sequence_type' = 'double');"));
}
}

View File

@ -19,8 +19,10 @@ package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.LabelName;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
@ -36,7 +38,6 @@ import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.Load;
import org.apache.doris.load.Source;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.task.MasterTaskExecutor;
import org.apache.doris.transaction.TransactionState;
@ -45,7 +46,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

View File

@ -19,6 +19,7 @@ package org.apache.doris.load.routineload;
import org.apache.doris.analysis.ColumnSeparator;
import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.ImportSequenceStmt;
import org.apache.doris.analysis.LabelName;
import org.apache.doris.analysis.ParseNode;
import org.apache.doris.analysis.PartitionNames;
@ -81,6 +82,8 @@ public class KafkaRoutineLoadJobTest {
private ColumnSeparator columnSeparator = new ColumnSeparator(",");
private ImportSequenceStmt sequenceStmt = new ImportSequenceStmt("source_sequence");
@Mocked
ConnectContext connectContext;
@Mocked
@ -242,7 +245,7 @@ public class KafkaRoutineLoadJobTest {
@Injectable Database database) throws LoadException {
CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt();
RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null,
partitionNames, null, LoadTask.MergeType.APPEND);
partitionNames, null, LoadTask.MergeType.APPEND, null);
Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", routineLoadDesc);
new Expectations() {
@ -267,7 +270,7 @@ public class KafkaRoutineLoadJobTest {
@Injectable OlapTable table) throws UserException {
CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt();
RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, partitionNames, null,
LoadTask.MergeType.APPEND);
LoadTask.MergeType.APPEND, sequenceStmt.getSequenceColName());
Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", routineLoadDesc);
List<Pair<Integer, Long>> partitionIdToOffset = Lists.newArrayList();
List<PartitionInfo> kafkaPartitionInfoList = Lists.newArrayList();
@ -315,6 +318,7 @@ public class KafkaRoutineLoadJobTest {
Assert.assertEquals(topicName, Deencapsulation.getField(kafkaRoutineLoadJob, "topic"));
List<Integer> kafkaPartitionResult = Deencapsulation.getField(kafkaRoutineLoadJob, "customKafkaPartitions");
Assert.assertEquals(kafkaPartitionString, Joiner.on(",").join(kafkaPartitionResult));
Assert.assertEquals(sequenceStmt.getSequenceColName(), kafkaRoutineLoadJob.getSequenceCol());
}
private CreateRoutineLoadStmt initCreateRoutineLoadStmt() {

View File

@ -87,7 +87,7 @@ public class StreamLoadPlannerTest {
request.setLoadId(new TUniqueId(2, 3));
request.setFileType(TFileType.FILE_STREAM);
request.setFormatType(TFileFormatType.FORMAT_CSV_PLAIN);
StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request, null);
StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request, db);
StreamLoadPlanner planner = new StreamLoadPlanner(db, destTable, streamLoadTask);
planner.plan(streamLoadTask.getId());
}

View File

@ -28,6 +28,7 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Function;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.OlapTable;
@ -71,6 +72,9 @@ public class StreamLoadScanNodeTest {
@Injectable
ConnectContext connectContext;
@Injectable
Database db;
@Injectable
OlapTable dstTable;
@ -130,10 +134,47 @@ public class StreamLoadScanNodeTest {
return columns;
}
List<Column> getSequenceColSchema() {
List<Column> columns = Lists.newArrayList();
Column k1 = new Column("k1", PrimitiveType.BIGINT);
k1.setIsKey(true);
k1.setIsAllowNull(false);
columns.add(k1);
Column k2 = new Column("k2", ScalarType.createVarchar(25));
k2.setIsKey(true);
k2.setIsAllowNull(true);
columns.add(k2);
// sequence column, it's hidden column
Column sequenceCol = new Column(Column.SEQUENCE_COL, PrimitiveType.BIGINT);
sequenceCol.setIsKey(false);
sequenceCol.setAggregationType(AggregateType.REPLACE, false);
sequenceCol.setIsAllowNull(false);
sequenceCol.setIsVisible(false);
columns.add(sequenceCol);
// sequence column, it's visible column for user, it's equals to the hidden column
Column visibleSequenceCol = new Column("visible_sequence_col", PrimitiveType.BIGINT);
visibleSequenceCol.setIsKey(false);
visibleSequenceCol.setAggregationType(AggregateType.REPLACE, false);
visibleSequenceCol.setIsAllowNull(true);
columns.add(visibleSequenceCol);
Column v1 = new Column("v1", ScalarType.createVarchar(25));
v1.setIsKey(false);
v1.setAggregationType(AggregateType.REPLACE, false);
v1.setIsAllowNull(false);
columns.add(v1);
return columns;
}
private StreamLoadScanNode getStreamLoadScanNode(TupleDescriptor dstDesc, TStreamLoadPutRequest request)
throws UserException {
StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request, null);
StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request, db);
StreamLoadScanNode scanNode = new StreamLoadScanNode(streamLoadTask.getId(), new PlanNodeId(1), dstDesc, dstTable, streamLoadTask);
return scanNode;
}
@ -197,7 +238,7 @@ public class StreamLoadScanNodeTest {
TStreamLoadPutRequest request = getBaseRequest();
request.setColumns("k1, k2, v1");
StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request, null);
StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request, db);
StreamLoadScanNode scanNode = getStreamLoadScanNode(dstDesc, request);
scanNode.init(analyzer);
@ -273,7 +314,7 @@ public class StreamLoadScanNodeTest {
TStreamLoadPutRequest request = getBaseRequest();
request.setColumns("k1,k2,v1, v2=k2");
StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request, null);
StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request, db);
StreamLoadScanNode scanNode = getStreamLoadScanNode(dstDesc, request);
scanNode.init(analyzer);
scanNode.finalize(analyzer);
@ -321,7 +362,7 @@ public class StreamLoadScanNodeTest {
TStreamLoadPutRequest request = getBaseRequest();
request.setFileType(TFileType.FILE_STREAM);
request.setColumns("k1,k2, v1=" + FunctionSet.HLL_HASH + "(k2)");
StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request, null);
StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request, db);
StreamLoadScanNode scanNode = getStreamLoadScanNode(dstDesc, request);
scanNode.init(analyzer);
@ -643,4 +684,146 @@ public class StreamLoadScanNodeTest {
columnExprs.add(new ImportColumnDesc("c3", new FunctionCallExpr("func", Lists.newArrayList())));
Load.initColumns(table, columnExprs, null, null, null, null, null, null);
}
@Test
public void testSequenceColumnWithSetColumns() throws UserException {
Analyzer analyzer = new Analyzer(catalog, connectContext);
DescriptorTable descTbl = analyzer.getDescTbl();
List<Column> columns = getSequenceColSchema();
TupleDescriptor dstDesc = descTbl.createTupleDescriptor("DstTableDesc");
for (Column column : columns) {
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
System.out.println(column);
slot.setColumn(column);
slot.setIsMaterialized(true);
if (column.isAllowNull()) {
slot.setIsNullable(true);
} else {
slot.setIsNullable(false);
}
}
new Expectations() {
{
db.getTable(anyInt);
result = dstTable;
minTimes = 0;
dstTable.hasSequenceCol();
result = true;
}
};
new Expectations() {
{
dstTable.getColumn("k1");
result = columns.stream().filter(c -> c.getName().equals("k1")).findFirst().get();
minTimes = 0;
dstTable.getColumn("k2");
result = columns.stream().filter(c -> c.getName().equals("k2")).findFirst().get();
minTimes = 0;
dstTable.getColumn(Column.SEQUENCE_COL);
result = columns.stream().filter(c -> c.getName().equals(Column.SEQUENCE_COL)).findFirst().get();
minTimes = 0;
dstTable.getColumn("visible_sequence_col");
result = columns.stream().filter(c -> c.getName().equals("visible_sequence_col")).findFirst().get();
minTimes = 0;
dstTable.getColumn("v1");
result = columns.stream().filter(c -> c.getName().equals("v1")).findFirst().get();
minTimes = 0;
// there is no "source_sequence" column in the Table
dstTable.getColumn("source_sequence");
result = null;
minTimes = 0;
}
};
TStreamLoadPutRequest request = getBaseRequest();
request.setColumns("k1,k2,source_sequence,v1");
request.setFileType(TFileType.FILE_STREAM);
request.setSequenceCol("source_sequence");
StreamLoadScanNode scanNode = getStreamLoadScanNode(dstDesc, request);
scanNode.init(analyzer);
scanNode.finalize(analyzer);
scanNode.getNodeExplainString("", TExplainLevel.NORMAL);
TPlanNode planNode = new TPlanNode();
scanNode.toThrift(planNode);
}
@Test
public void testSequenceColumnWithoutSetColumns() throws UserException {
Analyzer analyzer = new Analyzer(catalog, connectContext);
DescriptorTable descTbl = analyzer.getDescTbl();
List<Column> columns = getSequenceColSchema();
TupleDescriptor dstDesc = descTbl.createTupleDescriptor("DstTableDesc");
for (Column column : columns) {
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
slot.setColumn(column);
slot.setIsMaterialized(true);
if (column.isAllowNull()) {
slot.setIsNullable(true);
} else {
slot.setIsNullable(false);
}
}
new Expectations() {
{
db.getTable(anyInt);
result = dstTable;
minTimes = 0;
dstTable.hasSequenceCol();
result = true;
}
};
new Expectations() {
{
dstTable.getBaseSchema(anyBoolean); result = columns;
dstTable.getFullSchema(); result = columns;
dstTable.getColumn("k1");
result = columns.stream().filter(c -> c.getName().equals("k1")).findFirst().get();
minTimes = 0;
dstTable.getColumn("k2");
result = columns.stream().filter(c -> c.getName().equals("k2")).findFirst().get();
minTimes = 0;
dstTable.getColumn(Column.SEQUENCE_COL);
result = columns.stream().filter(c -> c.getName().equals(Column.SEQUENCE_COL)).findFirst().get();
minTimes = 0;
dstTable.getColumn("visible_sequence_col");
result = columns.stream().filter(c -> c.getName().equals("visible_sequence_col")).findFirst().get();
minTimes = 0;
dstTable.getColumn("v1");
result = columns.stream().filter(c -> c.getName().equals("v1")).findFirst().get();
minTimes = 0;
dstTable.hasSequenceCol();
result = true;
minTimes = 0;
}
};
TStreamLoadPutRequest request = getBaseRequest();
request.setFileType(TFileType.FILE_STREAM);
request.setSequenceCol("visible_sequence_col");
StreamLoadScanNode scanNode = getStreamLoadScanNode(dstDesc, request);
scanNode.init(analyzer);
scanNode.finalize(analyzer);
scanNode.getNodeExplainString("", TExplainLevel.NORMAL);
TPlanNode planNode = new TPlanNode();
scanNode.toThrift(planNode);
}
}