[feature-wip](merge-on-write) MOW table support different primary keys and sort keys (#24788)
@@ -310,24 +310,19 @@ public class SchemaChangeHandler extends AlterHandler {
          */
         if (KeysType.UNIQUE_KEYS == olapTable.getKeysType()) {
             List<Column> baseSchema = indexSchemaMap.get(baseIndexId);
-            boolean isKey = false;
             for (Column column : baseSchema) {
-                if (column.isKey() && column.getName().equalsIgnoreCase(dropColName)) {
-                    lightSchemaChange = false;
-                    isKey = true;
-                    break;
+                if (column.getName().equalsIgnoreCase(dropColName)) {
+                    if (column.isKey()) {
+                        throw new DdlException("Can not drop key column in Unique data model table");
+                    } else if (column.isClusterKey()) {
+                        throw new DdlException("Can not drop cluster key column in Unique data model table");
+                    }
                 }
             }
-
-            if (isKey) {
-                throw new DdlException("Can not drop key column in Unique data model table");
-            }
-
             if (olapTable.hasSequenceCol() && dropColName.equalsIgnoreCase(olapTable.getSequenceMapCol())) {
                 throw new DdlException("Can not drop sequence mapping column[" + dropColName
                         + "] in Unique data model table[" + olapTable.getName() + "]");
             }
-
         } else if (KeysType.AGG_KEYS == olapTable.getKeysType()) {
             if (null == targetIndexName) {
                 // drop column in base table
@@ -595,6 +590,9 @@ public class SchemaChangeHandler extends AlterHandler {
                 col.checkSchemaChangeAllowed(modColumn);
                 lightSchemaChange = olapTable.getEnableLightSchemaChange();
             }
+            if (col.isClusterKey()) {
+                throw new DdlException("Can not modify cluster key column: " + col.getName());
+            }
         }
     }
     if (hasColPos) {
@@ -808,6 +806,9 @@ public class SchemaChangeHandler extends AlterHandler {
             if (!column.isVisible()) {
                 newSchema.add(column);
             }
+            if (column.isClusterKey()) {
+                throw new DdlException("Can not modify column order in Unique data model table");
+            }
         }
     }
     if (newSchema.size() != targetIndexSchema.size()) {
@@ -181,6 +181,7 @@ public class ColumnDef {
     private DefaultValue defaultValue;
     private String comment;
     private boolean visible;
+    private int clusterKeyId = -1;

     public ColumnDef(String name, TypeDef typeDef) {
         this(name, typeDef, false, null, false, false, DefaultValue.NOT_SET, "");
@@ -306,6 +307,10 @@ public class ColumnDef {
         return visible;
     }

+    public void setClusterKeyId(int clusterKeyId) {
+        this.clusterKeyId = clusterKeyId;
+    }
+
     public void analyze(boolean isOlap) throws AnalysisException {
         if (name == null || typeDef == null) {
             throw new AnalysisException("No column name or column type in column definition.");
@@ -578,7 +583,8 @@ public class ColumnDef {
         }

         return new Column(name, type, isKey, aggregateType, isAllowNull, isAutoInc, defaultValue.value, comment,
-                visible, defaultValue.defaultValueExprDef, Column.COLUMN_UNIQUE_ID_INIT_VALUE, defaultValue.getValue());
+                visible, defaultValue.defaultValueExprDef, Column.COLUMN_UNIQUE_ID_INIT_VALUE, defaultValue.getValue(),
+                clusterKeyId);
     }

     @Override
@@ -450,6 +450,10 @@ public class CreateTableStmt extends DdlStmt {
         }

         keysDesc.analyze(columnDefs);
+        if (!CollectionUtils.isEmpty(keysDesc.getClusterKeysColumnNames()) && !enableUniqueKeyMergeOnWrite) {
+            throw new AnalysisException("Cluster keys only support unique keys table which enabled "
+                    + PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE);
+        }
        for (int i = 0; i < keysDesc.keysColumnSize(); ++i) {
            columnDefs.get(i).setIsKey(true);
        }
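Note: this check ties CLUSTER BY to merge-on-write. A hedged illustration of the DDL shape it guards, with a hypothetical table and column names; the property key is assumed to match PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE:

// Hypothetical DDL accepted after this change: cluster keys on a
// unique-keys table, valid only when merge-on-write is enabled.
String ddl = "CREATE TABLE t (k1 INT, k2 INT, v1 VARCHAR(10)) "
        + "UNIQUE KEY(`k1`) "
        + "CLUSTER BY (`k2`, `v1`) "            // sort keys differ from the unique key
        + "DISTRIBUTED BY HASH(`k1`) BUCKETS 1 "
        + "PROPERTIES ('enable_unique_key_merge_on_write' = 'true')";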
@@ -32,6 +32,8 @@ import java.util.List;
 public class KeysDesc implements Writable {
     private KeysType type;
     private List<String> keysColumnNames;
+    private List<String> clusterKeysColumnNames;
+    private List<Integer> clusterKeysColumnIds = null;

     public KeysDesc() {
         this.type = KeysType.AGG_KEYS;
@@ -43,6 +45,11 @@ public class KeysDesc implements Writable {
         this.keysColumnNames = keysColumnNames;
     }

+    public KeysDesc(KeysType type, List<String> keysColumnNames, List<String> clusterKeyColumnNames) {
+        this(type, keysColumnNames);
+        this.clusterKeysColumnNames = clusterKeyColumnNames;
+    }
+
     public KeysType getKeysType() {
         return type;
     }
@@ -51,6 +58,14 @@ public class KeysDesc implements Writable {
         return keysColumnNames.size();
     }

+    public List<String> getClusterKeysColumnNames() {
+        return clusterKeysColumnNames;
+    }
+
+    public List<Integer> getClusterKeysColumnIds() {
+        return clusterKeysColumnIds;
+    }
+
     public boolean containsCol(String colName) {
         return keysColumnNames.contains(colName);
     }
@@ -68,6 +83,14 @@ public class KeysDesc implements Writable {
             throw new AnalysisException("The number of key columns should be less than the number of columns.");
         }

+        if (clusterKeysColumnNames != null) {
+            if (type != KeysType.UNIQUE_KEYS) {
+                throw new AnalysisException("Cluster keys only support unique keys table.");
+            }
+            clusterKeysColumnIds = Lists.newArrayList();
+            analyzeClusterKeys(cols);
+        }
+
         for (int i = 0; i < keysColumnNames.size(); ++i) {
             String name = cols.get(i).getName();
             if (!keysColumnNames.get(i).equalsIgnoreCase(name)) {
@@ -100,6 +123,44 @@ public class KeysDesc implements Writable {
             }
         }
+
+        if (clusterKeysColumnNames != null) {
+            int minKeySize = keysColumnNames.size() < clusterKeysColumnNames.size() ? keysColumnNames.size()
+                    : clusterKeysColumnNames.size();
+            boolean sameKey = true;
+            for (int i = 0; i < minKeySize; ++i) {
+                if (!keysColumnNames.get(i).equalsIgnoreCase(clusterKeysColumnNames.get(i))) {
+                    sameKey = false;
+                    break;
+                }
+            }
+            if (sameKey) {
+                throw new AnalysisException("Unique keys and cluster keys should be different.");
+            }
+        }
     }

+    private void analyzeClusterKeys(List<ColumnDef> cols) throws AnalysisException {
+        for (int i = 0; i < clusterKeysColumnNames.size(); ++i) {
+            String name = clusterKeysColumnNames.get(i);
+            // check if key is duplicate
+            for (int j = 0; j < i; j++) {
+                if (clusterKeysColumnNames.get(j).equalsIgnoreCase(name)) {
+                    throw new AnalysisException("Duplicate cluster key column[" + name + "].");
+                }
+            }
+            // check if key exists and generate key column ids
+            for (int j = 0; j < cols.size(); j++) {
+                if (cols.get(j).getName().equalsIgnoreCase(name)) {
+                    cols.get(j).setClusterKeyId(clusterKeysColumnIds.size());
+                    clusterKeysColumnIds.add(j);
+                    break;
+                }
+                if (j == cols.size() - 1) {
+                    throw new AnalysisException("Key cluster column[" + name + "] doesn't exist.");
+                }
+            }
+        }
+    }
+
     public String toSql() {
@@ -114,6 +175,18 @@ public class KeysDesc implements Writable {
             i++;
         }
         stringBuilder.append(")");
+        if (clusterKeysColumnNames != null) {
+            stringBuilder.append("\nCLUSTER BY (");
+            i = 0;
+            for (String columnName : clusterKeysColumnNames) {
+                if (i != 0) {
+                    stringBuilder.append(", ");
+                }
+                stringBuilder.append("`").append(columnName).append("`");
+                i++;
+            }
+            stringBuilder.append(")");
+        }
         return stringBuilder.toString();
     }
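For orientation, a hedged sketch of how the new constructor, analyze(), and toSql() fit together; the column names and the columnDefs list are hypothetical:

// Hypothetical usage: unique key k1, cluster (sort) keys k2, v1.
List<String> uniqueKeys = Lists.newArrayList("k1");
List<String> clusterKeys = Lists.newArrayList("k2", "v1");
KeysDesc keysDesc = new KeysDesc(KeysType.UNIQUE_KEYS, uniqueKeys, clusterKeys);
// analyze() rejects non-unique-keys tables, duplicate or unknown cluster columns,
// and cluster keys identical to the unique keys; it also fills clusterKeysColumnIds
// and stamps each matching ColumnDef with a clusterKeyId.
keysDesc.analyze(columnDefs);
// Per toSql() above, the generated DDL should end with lines like:
//   ...(`k1`)
//   CLUSTER BY (`k2`, `v1`)
System.out.println(keysDesc.toSql());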
@@ -132,6 +205,14 @@ public class KeysDesc implements Writable {
         for (String colName : keysColumnNames) {
             Text.writeString(out, colName);
         }
+        if (clusterKeysColumnNames == null) {
+            out.writeInt(0);
+        } else {
+            out.writeInt(clusterKeysColumnNames.size());
+            for (String colName : clusterKeysColumnNames) {
+                Text.writeString(out, colName);
+            }
+        }
     }

     public void readFields(DataInput in) throws IOException {
@@ -141,5 +222,12 @@ public class KeysDesc implements Writable {
         for (int i = 0; i < count; i++) {
             keysColumnNames.add(Text.readString(in));
         }
+        count = in.readInt();
+        if (count > 0) {
+            clusterKeysColumnNames = Lists.newArrayList();
+            for (int i = 0; i < count; i++) {
+                clusterKeysColumnNames.add(Text.readString(in));
+            }
+        }
     }
 }
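A minimal round-trip sketch of the extended Writable format; the java.io stream plumbing is hypothetical. The writer always emits a cluster-key count (zero when absent), which keeps the reader above in sync with images written before this change was deployed:

// Hypothetical round-trip: serialize, then restore into a fresh KeysDesc.
ByteArrayOutputStream bos = new ByteArrayOutputStream();
keysDesc.write(new DataOutputStream(bos)); // key names, then cluster-key count + names

KeysDesc restored = new KeysDesc();
restored.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
// getClusterKeysColumnNames() stays null when the stored count was 0, mirroring the writer.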
@@ -128,6 +128,9 @@ public class Column implements Writable, GsonPostProcessable {
     @SerializedName(value = "genericAggregationName")
     private String genericAggregationName;

+    @SerializedName(value = "clusterKeyId")
+    private int clusterKeyId = -1;
+
     private boolean isCompoundKey = false;

     @SerializedName(value = "hasOnUpdateDefaultValue")
@@ -242,6 +245,14 @@ public class Column implements Writable, GsonPostProcessable {
         }
     }

+    public Column(String name, Type type, boolean isKey, AggregateType aggregateType, boolean isAllowNull,
+            boolean isAutoInc, String defaultValue, String comment, boolean visible,
+            DefaultValueExprDef defaultValueExprDef, int colUniqueId, String realDefaultValue, int clusterKeyId) {
+        this(name, type, isKey, aggregateType, isAllowNull, isAutoInc, defaultValue, comment, visible,
+                defaultValueExprDef, colUniqueId, realDefaultValue);
+        this.clusterKeyId = clusterKeyId;
+    }
+
     public Column(Column column) {
         this.name = column.getName();
         this.type = column.type;
@@ -263,6 +274,7 @@ public class Column implements Writable, GsonPostProcessable {
         this.defineName = column.getDefineName();
         this.hasOnUpdateDefaultValue = column.hasOnUpdateDefaultValue;
         this.onUpdateDefaultValueExprDef = column.onUpdateDefaultValueExprDef;
+        this.clusterKeyId = column.getClusterKeyId();
     }

     public void createChildrenColumn(Type type, Column column) {
@@ -551,6 +563,7 @@ public class Column implements Writable, GsonPostProcessable {
                 tColumn.addToChildrenColumn(column.toThrift());
             }
         }
+        tColumn.setClusterKeyId(this.clusterKeyId);
         // ATTN:
         // Currently, this `toThrift()` method is only used from CreateReplicaTask.
         // And CreateReplicaTask does not need `defineExpr` field.
@@ -580,6 +593,7 @@ public class Column implements Writable, GsonPostProcessable {
             if (tColumn.getAggregationType() != null) {
                 childrenTColumn.setAggregationType(tColumn.getAggregationType());
             }
+            childrenTColumn.setClusterKeyId(children.clusterKeyId);

             tColumn.children_column.add(childrenTColumn);
             toChildrenThrift(children, childrenTColumn);
@@ -660,6 +674,7 @@ public class Column implements Writable, GsonPostProcessable {
                 && (other.getDataType() == PrimitiveType.VARCHAR || other.getDataType() == PrimitiveType.STRING)) {
             return;
         }
+        // TODO check cluster key
     }

     public boolean nameEquals(String otherColName, boolean ignorePrefix) {
@@ -718,6 +733,14 @@ public class Column implements Writable, GsonPostProcessable {
         }
     }

+    public boolean isClusterKey() {
+        return clusterKeyId != -1;
+    }
+
+    public int getClusterKeyId() {
+        return clusterKeyId;
+    }
+
     public String toSql() {
         return toSql(false, false);
     }
@@ -811,7 +834,7 @@ public class Column implements Writable, GsonPostProcessable {
     public int hashCode() {
         return Objects.hash(name, getDataType(), getStrLen(), getPrecision(), getScale(), aggregationType,
                 isAggregationTypeImplicit, isKey, isAllowNull, isAutoInc, defaultValue, comment, children, visible,
-                realDefaultValue);
+                realDefaultValue, clusterKeyId);
     }

     @Override
@@ -839,7 +862,8 @@ public class Column implements Writable, GsonPostProcessable {
                 && Objects.equals(comment, other.comment)
                 && visible == other.visible
                 && Objects.equals(children, other.children)
-                && Objects.equals(realDefaultValue, other.realDefaultValue);
+                && Objects.equals(realDefaultValue, other.realDefaultValue)
+                && clusterKeyId == other.clusterKeyId;
     }

     // distribution column compare only care about attrs which affect data,
@@ -861,20 +885,22 @@ public class Column implements Writable, GsonPostProcessable {
                 && getScale() == other.getScale()
                 && visible == other.visible
                 && Objects.equals(children, other.children)
-                && Objects.equals(realDefaultValue, other.realDefaultValue);
+                && Objects.equals(realDefaultValue, other.realDefaultValue)
+                && clusterKeyId == other.clusterKeyId;

         if (!ok) {
             LOG.info("this column: name {} default value {} aggregationType {} isAggregationTypeImplicit {} "
-                    + "isKey {}, isAllowNull {}, datatype {}, strlen {}, precision {}, scale {}, visible {} "
-                    + "children {} realDefaultValue {}",
-                    name, getDefaultValue(), aggregationType, isAggregationTypeImplicit, isKey, isAllowNull,
-                    getDataType(), getStrLen(), getPrecision(), getScale(), visible, children, realDefaultValue);
+                    + "isKey {}, isAllowNull {}, datatype {}, strlen {}, precision {}, scale {}, visible {} "
+                    + "children {}, realDefaultValue {}, clusterKeyId {}",
+                    name, getDefaultValue(), aggregationType, isAggregationTypeImplicit, isKey, isAllowNull,
+                    getDataType(), getStrLen(), getPrecision(), getScale(), visible, children, realDefaultValue,
+                    clusterKeyId);
             LOG.info("other column: name {} default value {} aggregationType {} isAggregationTypeImplicit {} "
-                    + "isKey {}, isAllowNull {}, datatype {}, strlen {}, precision {}, scale {}, visible {} "
-                    + "children {} realDefaultValue {}",
-                    other.name, other.getDefaultValue(), other.aggregationType, other.isAggregationTypeImplicit,
-                    other.isKey, other.isAllowNull, other.getDataType(), other.getStrLen(), other.getPrecision(),
-                    other.getScale(), other.visible, other.children, other.realDefaultValue);
+                    + "isKey {}, isAllowNull {}, datatype {}, strlen {}, precision {}, scale {}, visible {}, "
+                    + "children {}, realDefaultValue {}, clusterKeyId {}",
+                    other.name, other.getDefaultValue(), other.aggregationType, other.isAggregationTypeImplicit,
+                    other.isKey, other.isAllowNull, other.getDataType(), other.getStrLen(), other.getPrecision(),
+                    other.getScale(), other.visible, other.children, other.realDefaultValue, other.clusterKeyId);
         }
         return ok;
     }
@@ -943,6 +969,7 @@ public class Column implements Writable, GsonPostProcessable {
         sb.append(isKey);
         sb.append(isAllowNull);
         sb.append(aggregationType);
+        sb.append(clusterKeyId);
         sb.append(defaultValue == null ? "" : defaultValue);
         return sb.toString();
     }
@@ -295,6 +295,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -3021,12 +3022,21 @@ public class Env {
                 : keySql.substring("DUPLICATE ".length()))
                 .append("(");
         List<String> keysColumnNames = Lists.newArrayList();
+        Map<Integer, String> clusterKeysColumnNamesToId = new TreeMap<>();
         for (Column column : olapTable.getBaseSchema()) {
             if (column.isKey()) {
                 keysColumnNames.add("`" + column.getName() + "`");
             }
+            if (column.isClusterKey()) {
+                clusterKeysColumnNamesToId.put(column.getClusterKeyId(), column.getName());
+            }
         }
         sb.append(Joiner.on(", ").join(keysColumnNames)).append(")");
+        // show cluster keys
+        if (!clusterKeysColumnNamesToId.isEmpty()) {
+            sb.append("\n").append("CLUSTER BY (`");
+            sb.append(Joiner.on("`, `").join(clusterKeysColumnNamesToId.values())).append("`)");
+        }
     }

     if (specificVersion != -1) {
@@ -3952,15 +3962,22 @@ public class Env {
     public static short calcShortKeyColumnCount(List<Column> columns, Map<String, String> properties,
             boolean isKeysRequired) throws DdlException {
         List<Column> indexColumns = new ArrayList<Column>();
+        Map<Integer, Column> clusterColumns = new TreeMap<>();
         for (Column column : columns) {
             if (column.isKey()) {
                 indexColumns.add(column);
             }
+            if (column.isClusterKey()) {
+                clusterColumns.put(column.getClusterKeyId(), column);
+            }
         }
-        LOG.debug("index column size: {}", indexColumns.size());
+        LOG.debug("index column size: {}, cluster column size: {}", indexColumns.size(), clusterColumns.size());
         if (isKeysRequired) {
             Preconditions.checkArgument(indexColumns.size() > 0);
         }
+        // sort by cluster keys for mow if set, otherwise by index columns
+        List<Column> sortKeyColumns = clusterColumns.isEmpty() ? indexColumns
+                : clusterColumns.values().stream().collect(Collectors.toList());

         // figure out shortKeyColumnCount
         short shortKeyColumnCount = (short) -1;
@@ -3975,12 +3992,12 @@ public class Env {
             throw new DdlException("Invalid short key: " + shortKeyColumnCount);
         }

-        if (shortKeyColumnCount > indexColumns.size()) {
-            throw new DdlException("Short key is too large. should less than: " + indexColumns.size());
+        if (shortKeyColumnCount > sortKeyColumns.size()) {
+            throw new DdlException("Short key is too large. should less than: " + sortKeyColumns.size());
         }

         for (int pos = 0; pos < shortKeyColumnCount; pos++) {
-            if (indexColumns.get(pos).getDataType() == PrimitiveType.VARCHAR && pos != shortKeyColumnCount - 1) {
+            if (sortKeyColumns.get(pos).getDataType() == PrimitiveType.VARCHAR && pos != shortKeyColumnCount - 1) {
                 throw new DdlException("Varchar should not in the middle of short keys.");
             }
         }
@@ -3995,9 +4012,9 @@ public class Env {
                 */
                 shortKeyColumnCount = 0;
                 int shortKeySizeByte = 0;
-                int maxShortKeyColumnCount = Math.min(indexColumns.size(), FeConstants.shortkey_max_column_count);
+                int maxShortKeyColumnCount = Math.min(sortKeyColumns.size(), FeConstants.shortkey_max_column_count);
                 for (int i = 0; i < maxShortKeyColumnCount; i++) {
-                    Column column = indexColumns.get(i);
+                    Column column = sortKeyColumns.get(i);
                     shortKeySizeByte += column.getOlapColumnIndexSize();
                     if (shortKeySizeByte > FeConstants.shortkey_maxsize_bytes) {
                         if (column.getDataType().isCharFamily()) {
@@ -4020,6 +4037,18 @@ public class Env {

         } // end calc shortKeyColumnCount

+        if (clusterColumns.size() > 0 && shortKeyColumnCount < clusterColumns.size()) {
+            boolean sameKey = true;
+            for (int i = 0; i < shortKeyColumnCount; i++) {
+                if (!clusterColumns.get(i).getName().equals(indexColumns.get(i).getName())) {
+                    sameKey = false;
+                    break;
+                }
+            }
+            if (sameKey) {
+                throw new DdlException(shortKeyColumnCount + " short keys is a part of unique keys");
+            }
+        }
         return shortKeyColumnCount;
     }
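To make the new short-key source concrete, a self-contained sketch with hypothetical column names; plain strings stand in for Column, and the TreeMap key plays the role of clusterKeyId so iteration follows CLUSTER BY order:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShortKeySourceExample {
    public static void main(String[] args) {
        List<String> indexColumns = List.of("k1", "k2"); // unique keys in schema order
        Map<Integer, String> clusterColumns = new TreeMap<>();
        clusterColumns.put(0, "v1"); // CLUSTER BY (v1, k2)
        clusterColumns.put(1, "k2");

        // Mirrors the selection above: cluster keys, if any, replace the unique
        // keys as the list that short keys are carved from.
        List<String> sortKeyColumns = clusterColumns.isEmpty()
                ? indexColumns
                : new ArrayList<>(clusterColumns.values());
        System.out.println(sortKeyColumns); // [v1, k2]
    }
}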
@@ -188,6 +188,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -1565,7 +1566,8 @@ public class InternalCatalog implements CatalogIf<Database> {
                 olapTable.getTimeSeriesCompactionFileCountThreshold(),
                 olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
                 olapTable.storeRowColumn(),
-                binlogConfig, dataProperty.isStorageMediumSpecified());
+                binlogConfig, dataProperty.isStorageMediumSpecified(), null);
+        // TODO cluster key ids

         // check again
         olapTable = db.getOlapTableOrDdlException(tableName);
@@ -1819,7 +1821,7 @@ public class InternalCatalog implements CatalogIf<Database> {
             String compactionPolicy, Long timeSeriesCompactionGoalSizeMbytes,
             Long timeSeriesCompactionFileCountThreshold, Long timeSeriesCompactionTimeThresholdSeconds,
             boolean storeRowColumn, BinlogConfig binlogConfig,
-            boolean isStorageMediumSpecified) throws DdlException {
+            boolean isStorageMediumSpecified, List<Integer> clusterKeyIndexes) throws DdlException {
         // create base index first.
         Preconditions.checkArgument(baseIndexId != -1);
         MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId, IndexState.NORMAL);
@@ -1887,6 +1889,7 @@ public class InternalCatalog implements CatalogIf<Database> {
                 storeRowColumn, binlogConfig);

         task.setStorageFormat(storageFormat);
+        task.setClusterKeyIndexes(clusterKeyIndexes);
         batchTask.addTask(task);
         // add to AgentTaskQueue for handling finish report.
         // not for resending task
@@ -2154,8 +2157,10 @@ public class InternalCatalog implements CatalogIf<Database> {
         olapTable.setCompressionType(compressionType);

         // check data sort properties
+        int keyColumnSize = CollectionUtils.isEmpty(keysDesc.getClusterKeysColumnIds()) ? keysDesc.keysColumnSize() :
+                keysDesc.getClusterKeysColumnIds().size();
         DataSortInfo dataSortInfo = PropertyAnalyzer.analyzeDataSortInfo(properties, keysType,
-                keysDesc.keysColumnSize(), storageFormat);
+                keyColumnSize, storageFormat);
         olapTable.setDataSortInfo(dataSortInfo);

         boolean enableUniqueKeyMergeOnWrite = false;
@@ -2484,7 +2489,8 @@ public class InternalCatalog implements CatalogIf<Database> {
                     olapTable.getTimeSeriesCompactionFileCountThreshold(),
                     olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
                     storeRowColumn, binlogConfigForTask,
-                    partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified());
+                    partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified(),
+                    keysDesc.getClusterKeysColumnIds());
             olapTable.addPartition(partition);
         } else if (partitionInfo.getType() == PartitionType.RANGE
                 || partitionInfo.getType() == PartitionType.LIST) {
@@ -2559,7 +2565,7 @@ public class InternalCatalog implements CatalogIf<Database> {
                     olapTable.getTimeSeriesCompactionFileCountThreshold(),
                     olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
                     storeRowColumn, binlogConfigForTask,
-                    dataProperty.isStorageMediumSpecified());
+                    dataProperty.isStorageMediumSpecified(), keysDesc.getClusterKeysColumnIds());
             olapTable.addPartition(partition);
             olapTable.getPartitionInfo().getDataProperty(partition.getId())
                     .setStoragePolicy(partionStoragePolicy);
@@ -2961,6 +2967,14 @@ public class InternalCatalog implements CatalogIf<Database> {
         Set<Long> tabletIdSet = Sets.newHashSet();
         long bufferSize = IdGeneratorUtil.getBufferSizeForTruncateTable(copiedTbl, origPartitions.values());
         IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize);
+        Map<Integer, Integer> clusterKeyMap = new TreeMap<>();
+        for (int i = 0; i < olapTable.getBaseSchema().size(); i++) {
+            Column column = olapTable.getBaseSchema().get(i);
+            if (column.getClusterKeyId() != -1) {
+                clusterKeyMap.put(column.getClusterKeyId(), i);
+            }
+        }
+        List<Integer> clusterKeyIdxes = clusterKeyMap.values().stream().collect(Collectors.toList());
         try {
             for (Map.Entry<String, Long> entry : origPartitions.entrySet()) {
                 // the new partition must use new id
@@ -2986,7 +3000,8 @@ public class InternalCatalog implements CatalogIf<Database> {
                         olapTable.getTimeSeriesCompactionFileCountThreshold(),
                         olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
                         olapTable.storeRowColumn(), binlogConfig,
-                        copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).isStorageMediumSpecified());
+                        copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).isStorageMediumSpecified(),
+                        clusterKeyIdxes);
                 newPartitions.add(newPartition);
             }
         } catch (DdlException e) {
@@ -202,6 +202,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -2333,17 +2334,30 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
         if (sortExprs.size() > olapTable.getDataSortInfo().getColNum()) {
             return false;
         }
+        List<Column> sortKeyColumns = olapTable.getFullSchema();
+        if (olapTable.getEnableUniqueKeyMergeOnWrite()) {
+            Map<Integer, Column> clusterKeyMap = new TreeMap<>();
+            for (Column column : olapTable.getFullSchema()) {
+                if (column.getClusterKeyId() != -1) {
+                    clusterKeyMap.put(column.getClusterKeyId(), column);
+                }
+            }
+            if (!clusterKeyMap.isEmpty()) {
+                sortKeyColumns.clear();
+                sortKeyColumns.addAll(clusterKeyMap.values());
+            }
+        }
         for (int i = 0; i < sortExprs.size(); i++) {
-            // table key.
-            Column tableKey = olapTable.getFullSchema().get(i);
+            // sort key.
+            Column sortColumn = sortKeyColumns.get(i);
             // sort slot.
             Expr sortExpr = sortExprs.get(i);
             if (sortExpr instanceof SlotRef) {
                 SlotRef slotRef = (SlotRef) sortExpr;
-                if (tableKey.equals(slotRef.getColumn())) {
+                if (sortColumn.equals(slotRef.getColumn())) {
                     // ORDER BY DESC NULLS FIRST can not be optimized to only read file tail,
                     // since NULLS is at file head but data is at tail
-                    if (tableKey.isAllowNull() && nullsFirsts.get(i) && !isAscOrders.get(i)) {
+                    if (sortColumn.isAllowNull() && nullsFirsts.get(i) && !isAscOrders.get(i)) {
                         return false;
                     }
                 } else {
@@ -1391,6 +1391,10 @@ public class OlapScanNode extends ScanNode {
         }

         msg.node_type = TPlanNodeType.OLAP_SCAN_NODE;
+        if (olapTable.getBaseSchema().stream().anyMatch(Column::isClusterKey)) {
+            keyColumnNames.clear();
+            keyColumnTypes.clear();
+        }
         msg.olap_scan_node = new TOlapScanNode(desc.getId().asInt(), keyColumnNames, keyColumnTypes, isPreAggregation);
         msg.olap_scan_node.setColumnsDesc(columnsDesc);
         msg.olap_scan_node.setIndexesDesc(indexDesc);
@@ -114,6 +114,7 @@ public class CreateReplicaTask extends AgentTask {
     private boolean storeRowColumn;

     private BinlogConfig binlogConfig;
+    private List<Integer> clusterKeyIndexes;

     public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId,
             long replicaId, short shortKeyColumnCount, int schemaHash, long version,
@@ -220,6 +221,10 @@ public class CreateReplicaTask extends AgentTask {
         this.storageFormat = storageFormat;
     }

+    public void setClusterKeyIndexes(List<Integer> clusterKeyIndexes) {
+        this.clusterKeyIndexes = clusterKeyIndexes;
+    }
+
     public TCreateTabletReq toThrift() {
         TCreateTabletReq createTabletReq = new TCreateTabletReq();
         createTabletReq.setTabletId(tabletId);
@@ -265,7 +270,10 @@ public class CreateReplicaTask extends AgentTask {
         tSchema.setDeleteSignIdx(deleteSign);
         tSchema.setSequenceColIdx(sequenceCol);
         tSchema.setVersionColIdx(versionCol);
-
+        if (!CollectionUtils.isEmpty(clusterKeyIndexes)) {
+            tSchema.setClusterKeyIdxes(clusterKeyIndexes);
+            LOG.debug("cluster key index={}, table_id={}, tablet_id={}", clusterKeyIndexes, tableId, tabletId);
+        }
         if (CollectionUtils.isNotEmpty(indexes)) {
             List<TOlapTableIndex> tIndexes = new ArrayList<>();
             for (Index index : indexes) {