[Enhancement](sequence-column) optimize the use of sequence column (#13872)
When you create a Uniq table, you can specify the mapping of the sequence column to another column; the mapping column then no longer needs to be specified when importing.
@@ -57,20 +57,38 @@ The principle is the same as that of the reading process during Cumulative Compaction
The principle is the same as the reading process during Base Compaction.

### Syntax

When a table with a sequence column is created, an attribute is added to the table properties to identify the type of `__DORIS_SEQUENCE_COL__`. On the import side, the syntax mainly adds a mapping from the sequence column to another column. The settings for each import method are described separately below.
#### Create Table

When you create a Uniq table, you can specify the sequence column type.

There are two ways to create a table with a sequence column: one is to set the `sequence_col` property when creating the table, and the other is to set the `sequence_type` property when creating the table.
#### Set `sequence_col` (recommended)

When you create a Uniq table, you can specify the mapping of the sequence column to another column.

```text
PROPERTIES (
    "function_column.sequence_col" = 'column_name'
);
```

The sequence_col is used to specify the mapping of the sequence column to a column in the table, which can be of an integer or time type (DATE, DATETIME). The type of this column cannot be changed after creation.

The import method is the same as that without a sequence column; it is relatively simple and is the recommended way.
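A minimal sketch of a table created this way (the table and column names are illustrative, not from this commit):

```sql
CREATE TABLE example_db.uniq_tbl
(
    k1 INT,
    v1 INT,
    modify_date DATE
)
UNIQUE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1
PROPERTIES (
    "replication_num" = "1",
    "function_column.sequence_col" = "modify_date"
);
```

Loads into such a table need no extra sequence header or column mapping; the value of `modify_date` is used as the sequence value automatically.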
#### Set `sequence_type`

When you create a Uniq table, you can specify the sequence column type.

```text
PROPERTIES (
    "function_column.sequence_type" = 'Date'
);
```

The sequence_type is used to specify the type of the sequence column, which can be of an integer or time type (DATE, DATETIME).

The mapping column needs to be specified when importing.
#### Stream Load

The syntax of Stream Load is to add, in the 'function_column.sequence_col' field of the header, the mapping of the hidden column to source_sequence, for example:

The syntax of Stream Load is to add, in the `function_column.sequence_col` field of the header, the mapping of the hidden column to source_sequence, for example:
```shell
curl --location-trusted -u root -H "columns: k1,k2,source_sequence,v1,v2" -H "function_column.sequence_col: source_sequence" -T testData http://host:port/api/testDb/testTbl/_stream_load
```
@@ -128,7 +146,7 @@ The mapping method is the same as above, as shown below
```
## Enable sequence column support

If `function_column.sequence_type` is set when creating a new table, the new table will support sequence column. For a table that does not support sequence column, if you want to use this feature, you can enable it with the following statement: `ALTER TABLE example_db.my_table ENABLE FEATURE "SEQUENCE_LOAD" WITH PROPERTIES ("function_column.sequence_type" = "Date")`.

If `function_column.sequence_col` or `function_column.sequence_type` is set when creating a new table, the new table will support sequence column. For a table that does not support sequence column, if you want to use this feature, you can enable it with the following statement: `ALTER TABLE example_db.my_table ENABLE FEATURE "SEQUENCE_LOAD" WITH PROPERTIES ("function_column.sequence_type" = "Date")`.

If you are not sure whether a table supports sequence column, you can display hidden columns by setting the session variable `SET show_hidden_columns=true` and then running `desc tablename`; if there is a `__DORIS_SEQUENCE_COL__` column in the output, it is supported, otherwise it is not.
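Put together as a runnable sketch (reusing the statements from the paragraphs above; the table name is the doc's placeholder):

```sql
-- Enable sequence column support on an existing table:
ALTER TABLE example_db.my_table ENABLE FEATURE "SEQUENCE_LOAD"
WITH PROPERTIES ("function_column.sequence_type" = "Date");

-- Check whether a table supports it:
SET show_hidden_columns = true;
DESC example_db.my_table; -- look for __DORIS_SEQUENCE_COL__ in the output
```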
@@ -136,7 +154,7 @@ If `function_column.sequence_type` is set when creating a new table, the new table
Let's take Stream Load as an example to show how to use it.

1. Create a table that supports sequence column.

Create a test_table of the unique model and specify that the type of the sequence column is Date.

Create a test_table of the unique model and specify that the sequence column maps to the `modify_date` column in the table.
```sql
CREATE TABLE test.test_table
@@ -150,7 +168,7 @@ CREATE TABLE test.test_table
UNIQUE KEY(user_id, date, group_id)
DISTRIBUTED BY HASH (user_id) BUCKETS 32
PROPERTIES(
    "function_column.sequence_type" = 'Date',
    "function_column.sequence_col" = 'modify_date',
    "replication_num" = "1",
    "in_memory" = "false"
);
@@ -183,7 +201,7 @@ Import the following data
```
Take Stream Load as an example here and map the sequence column to the modify_date column:

```shell
curl --location-trusted -u root: -H "function_column.sequence_col: modify_date" -T testData http://host:port/api/test/test_table/_stream_load
curl --location-trusted -u root: -T testData http://host:port/api/test/test_table/_stream_load
```
The result is:

```sql
@@ -307,6 +307,14 @@ distribution_desc

`"in_memory" = "true"`

* `function_column.sequence_col`

When using the UNIQUE KEY model, you can specify a sequence column. When the KEY columns are the same, REPLACE will be performed according to the sequence column (a larger value replaces a smaller one; otherwise no replacement occurs).

The `function_column.sequence_col` is used to specify the mapping of the sequence column to a column in the table, which can be of an integer or time type (DATE, DATETIME). The type of this column cannot be changed after creation. If `function_column.sequence_col` is set, `function_column.sequence_type` is ignored.

`"function_column.sequence_col" = 'column_name'`

* `function_column.sequence_type`

When using the UNIQUE KEY model, you can specify a sequence column. When the KEY columns are the same, REPLACE will be performed according to the sequence column (a larger value replaces a smaller one; otherwise no replacement occurs).
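To make the REPLACE semantics concrete, here is an illustrative sketch against the test_table from the example above (the values are made up; the schema follows that example):

```sql
-- Same key (user_id, date, group_id), different sequence values:
INSERT INTO test.test_table VALUES (1, '2020-02-22', 1, '2020-02-21', 'a');
INSERT INTO test.test_table VALUES (1, '2020-02-22', 1, '2020-03-05', 'b');
-- The 'b' row wins because its modify_date (the sequence value) is larger;
-- a later load with modify_date '2020-02-23' would not replace it.
```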
@@ -60,11 +60,24 @@ The principle is the same as the reading process during Base Compaction.
## Usage Syntax

When a table with a sequence column is created, an attribute is added to the table properties to identify the type of `__DORIS_SEQUENCE_COL__`. On the import side, the syntax mainly adds a mapping from the sequence column to another column. The settings for each import method are described separately below.

There are two ways to create a table with a sequence column: one is to set the `sequence_col` property when creating the table, and the other is to set the `sequence_type` property when creating the table.

**Create Table**

### Set `sequence_col` (recommended)

When creating a Uniq table, you can specify the sequence column type.

When creating a Uniq table, specify the mapping of the sequence column to another column in the table.
```text
PROPERTIES (
    "function_column.sequence_col" = 'column_name'
);
```

The sequence_col is used to specify the mapping of the sequence column to a column in the table, which can be of an integer or time type (DATE, DATETIME). The type of this column cannot be changed after creation.

The import method is the same as when there is no sequence column; it is relatively simple and is the recommended way.
### Set `sequence_type`

When creating a Uniq table, specify the sequence column type.

```text
PROPERTIES (
@@ -74,6 +87,8 @@ PROPERTIES (

The sequence_type is used to specify the type of the sequence column, which can be of an integer or time type (DATE, DATETIME).

The mapping of the sequence column to another column needs to be specified when importing.
**Stream Load**

The syntax of Stream Load is to add, in the `function_column.sequence_col` field of the header, the mapping of the hidden column to source_sequence, for example:
@@ -135,7 +150,7 @@ PROPERTIES
## Enable sequence column support

If `function_column.sequence_type` is set when creating a new table, the new table will support sequence column. For a table that does not support sequence column, if you want to use this feature, you can enable it with the following statement: `ALTER TABLE example_db.my_table ENABLE FEATURE "SEQUENCE_LOAD" WITH PROPERTIES ("function_column.sequence_type" = "Date")`. If you are not sure whether a table supports sequence column, you can display hidden columns by setting the session variable `SET show_hidden_columns=true` and then running `desc tablename`; if the output contains a `__DORIS_SEQUENCE_COL__` column, it is supported, otherwise it is not.

If `function_column.sequence_col` or `function_column.sequence_type` is set when creating a new table, the new table will support sequence column. For a table that does not support sequence column, if you want to use this feature, you can enable it with the following statement: `ALTER TABLE example_db.my_table ENABLE FEATURE "SEQUENCE_LOAD" WITH PROPERTIES ("function_column.sequence_type" = "Date")`. If you are not sure whether a table supports sequence column, you can display hidden columns by setting the session variable `SET show_hidden_columns=true` and then running `desc tablename`; if the output contains a `__DORIS_SEQUENCE_COL__` column, it is supported, otherwise it is not.
## Usage Example

@@ -143,7 +158,7 @@ PROPERTIES
1. Create a table that supports sequence column

Create a test_table of the unique model and specify that the type of the sequence column is Date.

Create a test_table of the unique model and specify that the sequence column maps to the modify_date column in the table.
```sql
CREATE TABLE test.test_table
@@ -157,7 +172,7 @@ CREATE TABLE test.test_table
UNIQUE KEY(user_id, date, group_id)
DISTRIBUTED BY HASH (user_id) BUCKETS 32
PROPERTIES(
    "function_column.sequence_type" = 'Date',
    "function_column.sequence_col" = 'modify_date',
    "replication_num" = "1",
    "in_memory" = "false"
);
@@ -191,10 +206,10 @@ MySQL > desc test_table;
1    2020-02-22    1    2020-02-24    b
```
Here we take Stream Load as an example and map the sequence column to the modify_date column:

Here we take Stream Load as an example:

```bash
curl --location-trusted -u root: -H "function_column.sequence_col: modify_date" -T testData http://host:port/api/test/test_table/_stream_load
curl --location-trusted -u root: -T testData http://host:port/api/test/test_table/_stream_load
```
The result is:
@@ -314,6 +314,14 @@ distribution_desc

`"compression"="zstd"`

* `function_column.sequence_col`

When using the UNIQUE KEY model, you can specify a sequence column. When the KEY columns are the same, REPLACE is performed according to the sequence column (a larger value replaces a smaller one; otherwise no replacement occurs).

`function_column.sequence_col` is used to specify the mapping of the sequence column to a column in the table, which can be of an integer or time type (DATE, DATETIME). The type of this column cannot be changed after creation. If `function_column.sequence_col` is set, `function_column.sequence_type` is ignored.

`"function_column.sequence_col" = 'column_name'`

* `function_column.sequence_type`

When using the UNIQUE KEY model, you can specify a sequence column. When the KEY columns are the same, REPLACE is performed according to the sequence column (a larger value replaces a smaller one; otherwise no replacement occurs).
@@ -574,6 +574,12 @@ public class SchemaChangeHandler extends AlterHandler {
    throw new DdlException("Column[" + columnPos.getLastCol() + "] does not exists");
}

// sequence col can not change type
if (KeysType.UNIQUE_KEYS == olapTable.getKeysType() && typeChanged
        && modColumn.getName().equalsIgnoreCase(olapTable.getSequenceMapCol())) {
    throw new DdlException("Can not alter sequence column[" + modColumn.getName() + "]");
}

// check if add to first
if (columnPos != null && columnPos.isFirst()) {
    lastColIndex = -1;
@@ -782,6 +782,10 @@ public class DataDescription {
if (!hasSequenceCol() && !olapTable.hasSequenceCol()) {
    return;
}
// table has sequence map col
if (olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() != null) {
    return;
}
// check olapTable schema and sequenceCol
if (olapTable.hasSequenceCol() && !hasSequenceCol()) {
    throw new AnalysisException("Table " + olapTable.getName()
@@ -681,7 +681,12 @@ public class InsertStmt extends DdlStmt {
if (exprByName.containsKey(col.getName())) {
    resultExprs.add(exprByName.get(col.getName()));
} else {
    if (col.getDefaultValue() == null) {
    // process sequence col, map sequence column to other column
    if (targetTable instanceof OlapTable && ((OlapTable) targetTable).hasSequenceCol()
            && col.getName().equals(Column.SEQUENCE_COL)
            && ((OlapTable) targetTable).getSequenceMapCol() != null) {
        resultExprs.add(exprByName.get(((OlapTable) targetTable).getSequenceMapCol()));
    } else if (col.getDefaultValue() == null) {
        /*
         The import stmt has been filtered in function checkColumnCoverage when
         the default value of column is null and column is not nullable.
@@ -3004,9 +3004,15 @@ public class Env {

// sequence type
if (olapTable.hasSequenceCol()) {
    sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_FUNCTION_COLUMN + "."
            + PropertyAnalyzer.PROPERTIES_SEQUENCE_TYPE).append("\" = \"");
    sb.append(olapTable.getSequenceType().toString()).append("\"");
    if (olapTable.getSequenceMapCol() != null) {
        sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_FUNCTION_COLUMN + "."
                + PropertyAnalyzer.PROPERTIES_SEQUENCE_COL).append("\" = \"");
        sb.append(olapTable.getSequenceMapCol()).append("\"");
    } else {
        sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_FUNCTION_COLUMN + "."
                + PropertyAnalyzer.PROPERTIES_SEQUENCE_TYPE).append("\" = \"");
        sb.append(olapTable.getSequenceType().toString()).append("\"");
    }
}

// disable auto compaction
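The effect of this change is visible in `SHOW CREATE TABLE`: a table created with a mapped sequence column now prints the `sequence_col` property rather than the `sequence_type` one. A rough sketch of the relevant output fragment (illustrative, not verbatim output):

```sql
SHOW CREATE TABLE test.test_table;
-- ... PROPERTIES (
--     ...
--     "function_column.sequence_col" = "modify_date"
-- );
```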
@@ -4149,6 +4155,11 @@ public class Env {
    }
}

// 5. modify sequence map col
// guard against tables whose sequence column has no mapped column
if (table.hasSequenceCol() && table.getSequenceMapCol() != null
        && table.getSequenceMapCol().equalsIgnoreCase(colName)) {
    table.setSequenceMapCol(newColName);
}

table.rebuildFullSchema();

if (!isReplay) {
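This keeps the stored mapping consistent when the mapped column is renamed; for example (mirroring the regression test later in this commit):

```sql
ALTER TABLE test_uniq_new_sequence RENAME COLUMN v2 vv2;
-- Subsequent loads automatically use vv2 as the sequence source column.
```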
@@ -896,6 +896,21 @@ public class OlapTable extends Table {
    this.bfFpp = bfFpp;
}

public String getSequenceMapCol() {
    if (tableProperty == null) {
        return null;
    }
    return tableProperty.getSequenceMapCol();
}

// map the sequence column to other column
public void setSequenceMapCol(String colName) {
    if (tableProperty == null) {
        tableProperty = new TableProperty(new HashMap<>());
    }
    tableProperty.setSequenceMapCol(colName);
}

public void setSequenceInfo(Type type) {
    this.hasSequenceCol = true;
    this.sequenceType = type;
@@ -290,6 +290,16 @@ public class TableProperty implements Writable {
            PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE, "false"));
}

public void setSequenceMapCol(String colName) {
    properties.put(PropertyAnalyzer.PROPERTIES_FUNCTION_COLUMN + "."
            + PropertyAnalyzer.PROPERTIES_SEQUENCE_COL, colName);
}

public String getSequenceMapCol() {
    return properties.get(PropertyAnalyzer.PROPERTIES_FUNCTION_COLUMN + "."
            + PropertyAnalyzer.PROPERTIES_SEQUENCE_COL);
}

public void buildReplicaAllocation() {
    try {
        // Must copy the properties because "analyzeReplicaAllocation" will remove the property
@@ -105,6 +105,7 @@ public class PropertyAnalyzer {
// This is common prefix for function column
public static final String PROPERTIES_FUNCTION_COLUMN = "function_column";
public static final String PROPERTIES_SEQUENCE_TYPE = "sequence_type";
public static final String PROPERTIES_SEQUENCE_COL = "sequence_col";

public static final String PROPERTIES_SWAP_TABLE = "swap";
@@ -620,6 +621,20 @@ public class PropertyAnalyzer {
    return ScalarType.createType(type);
}

public static String analyzeSequenceMapCol(Map<String, String> properties, KeysType keysType)
        throws AnalysisException {
    String sequenceCol = null;
    String propertyName = PROPERTIES_FUNCTION_COLUMN + "." + PROPERTIES_SEQUENCE_COL;
    if (properties != null && properties.containsKey(propertyName)) {
        sequenceCol = properties.get(propertyName);
        properties.remove(propertyName);
    }
    if (sequenceCol != null && keysType != KeysType.UNIQUE_KEYS) {
        throw new AnalysisException("sequence column only support UNIQUE_KEYS");
    }
    return sequenceCol;
}

public static Boolean analyzeBackendDisableProperties(Map<String, String> properties, String key,
        Boolean defaultValue) {
    if (properties.containsKey(key)) {
@@ -2026,10 +2026,32 @@ public class InternalCatalog implements CatalogIf<Database> {
        rollupSchemaHash, rollupShortKeyColumnCount, rollupIndexStorageType, keysType);
}

// analyse sequence column
// analyse sequence map column
String sequenceMapCol = null;
try {
    sequenceMapCol = PropertyAnalyzer.analyzeSequenceMapCol(properties, olapTable.getKeysType());
    if (sequenceMapCol != null) {
        Column col = olapTable.getColumn(sequenceMapCol);
        if (col == null) {
            throw new DdlException("The specified sequence column[" + sequenceMapCol + "] not exists");
        }
        if (!col.getType().isFixedPointType() && !col.getType().isDateType()) {
            throw new DdlException("Sequence type only support integer types and date types");
        }
        olapTable.setSequenceMapCol(sequenceMapCol);
        olapTable.setSequenceInfo(col.getType());
    }
} catch (Exception e) {
    throw new DdlException(e.getMessage());
}

// analyse sequence type
Type sequenceColType = null;
try {
    sequenceColType = PropertyAnalyzer.analyzeSequenceType(properties, olapTable.getKeysType());
    if (sequenceMapCol != null && sequenceColType != null) {
        throw new DdlException("The sequence_col and sequence_type cannot be set at the same time");
    }
    if (sequenceColType != null) {
        olapTable.setSequenceInfo(sequenceColType);
    }
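The checks above surface as DDL errors at table creation. An illustrative sketch of statements they reject (table and column names are made up; the error strings are the ones thrown above):

```sql
-- Rejected: sequence_col and sequence_type are mutually exclusive
-- ERROR: The sequence_col and sequence_type cannot be set at the same time
CREATE TABLE test.t1 (k1 INT, v1 INT) UNIQUE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1
PROPERTIES ('replication_num' = '1',
            'function_column.sequence_type' = 'int',
            'function_column.sequence_col' = 'v1');

-- Rejected: the mapped column must exist in the table
-- ERROR: The specified sequence column[v3] not exists
CREATE TABLE test.t2 (k1 INT, v1 INT) UNIQUE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1
PROPERTIES ('replication_num' = '1', 'function_column.sequence_col' = 'v3');

-- Rejected: the mapped column must be an integer or date type
-- ERROR: Sequence type only support integer types and date types
CREATE TABLE test.t3 (k1 VARCHAR(40), v1 INT) UNIQUE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1
PROPERTIES ('replication_num' = '1', 'function_column.sequence_col' = 'k1');
```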
@@ -31,6 +31,7 @@ import org.apache.doris.catalog.BrokerTable;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
@@ -277,9 +278,13 @@ public class BrokerScanNode extends LoadScanNode {
    columnDescs.descs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1)));
}
// add columnExpr for sequence column
if (context.fileGroup.hasSequenceCol()) {
if (targetTable instanceof OlapTable && ((OlapTable) targetTable).hasSequenceCol()) {
    String sequenceCol = ((OlapTable) targetTable).getSequenceMapCol();
    if (sequenceCol == null) {
        sequenceCol = context.fileGroup.getSequenceCol();
    }
    columnDescs.descs.add(new ImportColumnDesc(Column.SEQUENCE_COL,
            new SlotRef(null, context.fileGroup.getSequenceCol())));
            new SlotRef(null, sequenceCol)));
}
}
@@ -123,7 +123,7 @@ public class StreamLoadPlanner {
    throw new AnalysisException("load by MERGE or DELETE need to upgrade table to support batch delete.");
}

if (destTable.hasSequenceCol() && !taskInfo.hasSequenceCol()) {
if (destTable.hasSequenceCol() && !taskInfo.hasSequenceCol() && destTable.getSequenceMapCol() == null) {
    throw new UserException("Table " + destTable.getName()
            + " has sequence column, need to specify the sequence column");
}
@@ -25,6 +25,7 @@ import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.VectorizedUtil;
@@ -136,9 +137,13 @@ public class StreamLoadScanNode extends LoadScanNode {
} else if (mergeType == LoadTask.MergeType.DELETE) {
    columnExprDescs.descs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1)));
}
if (taskInfo.hasSequenceCol()) {
if (dstTable instanceof OlapTable && ((OlapTable) dstTable).hasSequenceCol()) {
    String sequenceCol = ((OlapTable) dstTable).getSequenceMapCol();
    if (sequenceCol == null) {
        sequenceCol = taskInfo.getSequenceCol();
    }
    columnExprDescs.descs.add(new ImportColumnDesc(Column.SEQUENCE_COL,
            new SlotRef(null, taskInfo.getSequenceCol())));
            new SlotRef(null, sequenceCol)));
}
}
@@ -24,6 +24,7 @@ import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
@@ -191,9 +192,14 @@ public class LoadScanProvider implements FileScanProviderIf {
    columnDescs.descs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1)));
}
// add columnExpr for sequence column
if (context.fileGroup.hasSequenceCol()) {
    columnDescs.descs.add(
            new ImportColumnDesc(Column.SEQUENCE_COL, new SlotRef(null, context.fileGroup.getSequenceCol())));
TableIf targetTable = getTargetTable();
if (targetTable instanceof OlapTable && ((OlapTable) targetTable).hasSequenceCol()) {
    String sequenceCol = ((OlapTable) targetTable).getSequenceMapCol();
    if (sequenceCol == null) {
        sequenceCol = context.fileGroup.getSequenceCol();
    }
    columnDescs.descs.add(new ImportColumnDesc(Column.SEQUENCE_COL,
            new SlotRef(null, sequenceCol)));
}
List<Integer> srcSlotIds = Lists.newArrayList();
Load.initColumns(fileGroupInfo.getTargetTable(), columnDescs, context.fileGroup.getColumnToHadoopFunction(),
@@ -169,6 +169,10 @@ public class AlterTest {

createTable("create table test.show_test (k1 int, k2 int) distributed by hash(k1) "
        + "buckets 1 properties(\"replication_num\" = \"1\");");

createTable("create table test.unique_sequence_col (k1 int, v1 int, v2 date) ENGINE=OLAP "
        + " UNIQUE KEY(`k1`) DISTRIBUTED BY HASH(`k1`) BUCKETS 1"
        + " PROPERTIES (\"replication_num\" = \"1\", \"function_column.sequence_col\" = \"v1\");");
}

@AfterClass
@@ -1140,4 +1144,10 @@ public class AlterTest {
    ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "Unknown table 'table1_error'",
            executor::execute);
}

@Test
public void testModifySequenceCol() {
    String stmt = "alter table test.unique_sequence_col modify column v1 Date";
    alterTable(stmt, true);
}
}
@@ -208,6 +208,15 @@ public class CreateTableTest {
        + "distributed by hash(k2) buckets 1\n"
        + "properties('replication_num' = '1');"));

// table with sequence col
ExceptionChecker
        .expectThrowsNoException(() -> createTable("create table test.tbl13\n"
                + "(k1 varchar(40), k2 int, v1 int)\n"
                + "unique key(k1, k2)\n"
                + "partition by range(k2)\n" + "(partition p1 values less than(\"10\"))\n"
                + "distributed by hash(k2) buckets 1\n" + "properties('replication_num' = '1',\n"
                + "'function_column.sequence_col' = 'v1');"));

Database db = Env.getCurrentInternalCatalog().getDbOrDdlException("default_cluster:test");
OlapTable tbl6 = (OlapTable) db.getTableOrDdlException("tbl6");
Assert.assertTrue(tbl6.getColumn("k1").isKey());
@@ -224,6 +233,11 @@ public class CreateTableTest {
Assert.assertTrue(tbl8.getColumn("k2").isKey());
Assert.assertFalse(tbl8.getColumn("v1").isKey());
Assert.assertTrue(tbl8.getColumn(Column.SEQUENCE_COL).getAggregationType() == AggregateType.REPLACE);

OlapTable tbl13 = (OlapTable) db.getTableOrDdlException("tbl13");
Assert.assertTrue(tbl13.getColumn(Column.SEQUENCE_COL).getAggregationType() == AggregateType.REPLACE);
Assert.assertTrue(tbl13.getColumn(Column.SEQUENCE_COL).getType() == Type.INT);
Assert.assertEquals(tbl13.getSequenceMapCol(), "v1");
}

@Test
@@ -288,6 +302,30 @@ public class CreateTableTest {
        + "distributed by hash(k2) buckets 1\n" + "properties('replication_num' = '1',\n"
        + "'function_column.sequence_type' = 'double');"));

ExceptionChecker
        .expectThrowsWithMsg(DdlException.class, "The sequence_col and sequence_type cannot be set at the same time",
                () -> createTable("create table test.atbl8\n" + "(k1 varchar(40), k2 int, v1 int)\n"
                        + "unique key(k1, k2)\n"
                        + "partition by range(k2)\n" + "(partition p1 values less than(\"10\"))\n"
                        + "distributed by hash(k2) buckets 1\n" + "properties('replication_num' = '1',\n"
                        + "'function_column.sequence_type' = 'int', 'function_column.sequence_col' = 'v1');"));

ExceptionChecker
        .expectThrowsWithMsg(DdlException.class, "The specified sequence column[v3] not exists",
                () -> createTable("create table test.atbl8\n" + "(k1 varchar(40), k2 int, v1 int)\n"
                        + "unique key(k1, k2)\n"
                        + "partition by range(k2)\n" + "(partition p1 values less than(\"10\"))\n"
                        + "distributed by hash(k2) buckets 1\n" + "properties('replication_num' = '1',\n"
                        + "'function_column.sequence_col' = 'v3');"));

ExceptionChecker
        .expectThrowsWithMsg(DdlException.class, "Sequence type only support integer types and date types",
                () -> createTable("create table test.atbl8\n" + "(k1 varchar(40), k2 int, v1 int)\n"
                        + "unique key(k1, k2)\n"
                        + "partition by range(k2)\n" + "(partition p1 values less than(\"10\"))\n"
                        + "distributed by hash(k2) buckets 1\n" + "properties('replication_num' = '1',\n"
                        + "'function_column.sequence_col' = 'k1');"));

/**
 * create table with list partition
 */
@@ -0,0 +1,45 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !all --
1    4    11
2    5    12
3    6    13

-- !all --
1    2    14
2    5    12
3    6    13

-- !all --
1    2    14
15    8    19
2    5    12
3    6    13

-- !all --
1    10    14    0    14
15    8    19    0    19
2    5    14    0    12
3    6    11    0    13

-- !desc --
k1    INT    Yes    true    \N
v1    TINYINT    Yes    false    \N    REPLACE
v2    INT    Yes    false    \N    REPLACE
__DORIS_DELETE_SIGN__    TINYINT    No    false    0    REPLACE
__DORIS_SEQUENCE_COL__    INT    Yes    false    \N    REPLACE

-- !desc --
k1    INT    Yes    true    \N
v1    TINYINT    Yes    false    \N    REPLACE
vv2    INT    Yes    false    \N    REPLACE
__DORIS_DELETE_SIGN__    TINYINT    No    false    0    REPLACE
__DORIS_SEQUENCE_COL__    INT    Yes    false    \N    REPLACE

-- !all --
1    10    14    0    14
15    8    19    0    19
2    5    14    0    12
21    8    22    0    22
23    9    24    0    24
3    6    11    0    13
@@ -0,0 +1,124 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_unique_table_new_sequence") {
    def tableName = "test_uniq_new_sequence"
    sql """ DROP TABLE IF EXISTS ${tableName} """
    sql """
        CREATE TABLE IF NOT EXISTS ${tableName} (
            `k1` int NULL,
            `v1` tinyint NULL,
            `v2` int
        ) ENGINE=OLAP
        UNIQUE KEY(k1)
        DISTRIBUTED BY HASH(`k1`) BUCKETS 3
        PROPERTIES (
            "function_column.sequence_col" = "v2",
            "replication_allocation" = "tag.location.default: 1",
            "light_schema_change" = "true"
        );
    """
    // load unique key
    streamLoad {
        table "${tableName}"

        set 'column_separator', ','
        set 'columns', 'k1,v1,v2'

        file 'unique_key_data1.csv'
        time 10000 // limit inflight 10s

        check { result, exception, startTime, endTime ->
            if (exception != null) {
                throw exception
            }
            log.info("Stream load result: ${result}".toString())
            def json = parseJson(result)
            assertEquals("success", json.Status.toLowerCase())
            assertEquals(3, json.NumberTotalRows)
            assertEquals(3, json.NumberLoadedRows)
            assertEquals(0, json.NumberFilteredRows)
            assertEquals(0, json.NumberUnselectedRows)
        }
    }
    sql "sync"
    order_qt_all "SELECT * from ${tableName}"

    // load unique key
    streamLoad {
        table "${tableName}"

        set 'column_separator', ','
        set 'columns', 'k1,v1,v2'

        file 'unique_key_data2.csv'
        time 10000 // limit inflight 10s

        check { result, exception, startTime, endTime ->
            if (exception != null) {
                throw exception
            }
            log.info("Stream load result: ${result}".toString())
            def json = parseJson(result)
            assertEquals("success", json.Status.toLowerCase())
            assertEquals(3, json.NumberTotalRows)
            assertEquals(3, json.NumberLoadedRows)
            assertEquals(0, json.NumberFilteredRows)
            assertEquals(0, json.NumberUnselectedRows)
        }
    }
    sql "sync"

    order_qt_all "SELECT * from ${tableName}"

    sql "INSERT INTO ${tableName} values(15, 8, 19)"

    sql "INSERT INTO ${tableName} values(15, 9, 18)"

    sql "sync"

    order_qt_all "SELECT * from ${tableName}"

    sql "UPDATE ${tableName} SET v1 = 10 WHERE k1 = 1"

    sql "UPDATE ${tableName} SET v2 = 14 WHERE k1 = 2"

    sql "UPDATE ${tableName} SET v2 = 11 WHERE k1 = 3"

    sql "SET show_hidden_columns=true"

    sql "sync"

    order_qt_all "SELECT * from ${tableName}"

    qt_desc "desc ${tableName}"

    sql "ALTER TABLE ${tableName} RENAME COLUMN v2 vv2"

    qt_desc "desc ${tableName}"

    sql "INSERT INTO ${tableName} values(21, 8, 22)"

    sql "INSERT INTO ${tableName} values(23, 9, 24)"

    sql "sync"

    order_qt_all "SELECT * from ${tableName}"

    sql "DROP TABLE ${tableName}"
}