From 93a3577baa27c8ce08d4eb9c4a2d2ee0bb930a8e Mon Sep 17 00:00:00 2001
From: Mingyu Chen
Date: Mon, 5 Aug 2019 16:16:43 +0800
Subject: [PATCH] Support multi partition column when creating table (#1574)

When creating a table with the OLAP engine, users can specify multiple
partition columns, e.g.:

PARTITION BY RANGE(`date`, `id`)
(
    PARTITION `p201701_1000` VALUES LESS THAN ("2017-02-01", "1000"),
    PARTITION `p201702_2000` VALUES LESS THAN ("2017-03-01", "2000"),
    PARTITION `p201703_all` VALUES LESS THAN ("2017-04-01")
)

Note that load by Hadoop cluster does not support tables with multiple
partition columns.
---
 be/src/exec/tablet_info.cpp                   | 47 +++++++++++++++----
 be/src/exec/tablet_info.h                     | 32 ++++++++-----
 .../cn/getting-started/data-partition.md      | 40 +++++++++++++++-
 .../help/Contents/Data Definition/ddl_stmt.md |  7 +--
 .../doris/analysis/CreateTableStmt.java       |  4 --
 .../doris/analysis/RangePartitionDesc.java    |  6 +--
 .../apache/doris/catalog/PartitionInfo.java   |  6 +++
 .../apache/doris/catalog/PartitionKey.java    | 18 ++++---
 .../doris/catalog/RangePartitionInfo.java     | 11 +++--
 .../main/java/org/apache/doris/load/Load.java | 27 +++++++----
 .../apache/doris/load/PullLoadSourceInfo.java |  1 +
 .../doris/load/loadv2/BrokerLoadJob.java      |  2 +-
 .../org/apache/doris/load/loadv2/LoadJob.java |  5 +-
 .../apache/doris/planner/DataSplitSink.java   |  4 +-
 .../apache/doris/planner/OlapTableSink.java   | 20 +++++---
 .../doris/planner/RangePartitionPruner.java   |  3 ++
 .../java/org/apache/doris/qe/DdlExecutor.java |  2 +-
 .../apache/doris/task/PullLoadEtlTask.java    |  4 +-
 .../org/apache/doris/task/PullLoadJob.java    |  2 +
 .../org/apache/doris/task/PullLoadJobMgr.java |  1 +
 .../doris/task/PullLoadPendingTask.java       |  1 +
 .../org/apache/doris/task/PullLoadTask.java   |  1 +
 .../doris/task/PullLoadTaskPlanner.java       |  3 +-
 .../doris/load/loadv2/BrokerLoadJobTest.java  |  2 +-
 gensrc/thrift/Descriptors.thrift              |  7 +++
 25 files changed, 185 insertions(+), 71 deletions(-)

diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp
index c4fc5b3b5c..2ad37491f7 100644
--- a/be/src/exec/tablet_info.cpp
+++ b/be/src/exec/tablet_info.cpp
@@ -150,7 +150,6 @@ OlapTablePartitionParam::OlapTablePartitionParam(
         std::shared_ptr<OlapTableSchemaParam> schema, const TOlapTablePartitionParam& t_param)
     : _schema(schema), _t_param(t_param),
-      _partition_slot_desc(nullptr),
       _mem_tracker(new MemTracker()),
       _mem_pool(new MemPool(_mem_tracker.get())) {
 }
@@ -170,11 +169,23 @@ Status OlapTablePartitionParam::init() {
             ss << "partition column not found, column=" << _t_param.partition_column;
             return Status::InternalError(ss.str());
         }
-        _partition_slot_desc = it->second;
+        _partition_slot_descs.push_back(it->second);
+    } else {
+        DCHECK(_t_param.__isset.partition_columns);
+        for (auto& part_col : _t_param.partition_columns) {
+            auto it = slots_map.find(part_col);
+            if (it == std::end(slots_map)) {
+                std::stringstream ss;
+                ss << "partition column not found, column=" << part_col;
+                return Status::InternalError(ss.str());
+            }
+            _partition_slot_descs.push_back(it->second);
+        }
     }
+
     _partitions_map.reset(
         new std::map<Tuple*, OlapTablePartition*, OlapTablePartKeyComparator>(
-            OlapTablePartKeyComparator(_partition_slot_desc)));
+            OlapTablePartKeyComparator(_partition_slot_descs)));
     if (_t_param.__isset.distributed_columns) {
         for (auto& col : _t_param.distributed_columns) {
             auto it = slots_map.find(col);
@@ -191,12 +202,22 @@ Status OlapTablePartitionParam::init() {
         const TOlapTablePartition& t_part = _t_param.partitions[i];
         OlapTablePartition* part = _obj_pool.add(new OlapTablePartition());
         part->id = t_part.id;
+
         if (t_part.__isset.start_key) {
-            RETURN_IF_ERROR(_create_partition_key(t_part.start_key, &part->start_key));
+            // deprecated, use start_keys instead
+            std::vector<TExprNode> exprs = { t_part.start_key };
+            RETURN_IF_ERROR(_create_partition_keys(exprs, &part->start_key));
+        } else if (t_part.__isset.start_keys) {
+            RETURN_IF_ERROR(_create_partition_keys(t_part.start_keys, &part->start_key));
         }
         if (t_part.__isset.end_key) {
-            RETURN_IF_ERROR(_create_partition_key(t_part.end_key, &part->end_key));
+            // deprecated, use end_keys instead
+            std::vector<TExprNode> exprs = { t_part.end_key };
+            RETURN_IF_ERROR(_create_partition_keys(exprs, &part->end_key));
+        } else if (t_part.__isset.end_keys) {
+            RETURN_IF_ERROR(_create_partition_keys(t_part.end_keys, &part->end_key));
         }
+
         part->num_buckets = t_part.num_buckets;
         auto num_indexes = _schema->indexes().size();
         if (t_part.indexes.size() != num_indexes) {
@@ -242,10 +263,19 @@ bool OlapTablePartitionParam::find_tablet(Tuple* tuple,
     return false;
 }

-Status OlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr, Tuple** part_key) {
+Status OlapTablePartitionParam::_create_partition_keys(const std::vector<TExprNode>& t_exprs, Tuple** part_key) {
     Tuple* tuple = (Tuple*)_mem_pool->allocate(_schema->tuple_desc()->byte_size());
-    void* slot = tuple->get_slot(_partition_slot_desc->tuple_offset());
-    tuple->set_not_null(_partition_slot_desc->null_indicator_offset());
+    for (int i = 0; i < t_exprs.size(); i++) {
+        const TExprNode& t_expr = t_exprs[i];
+        RETURN_IF_ERROR(_create_partition_key(t_expr, tuple, _partition_slot_descs[i]));
+    }
+    *part_key = tuple;
+    return Status::OK();
+}
+
+Status OlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr, Tuple* tuple, SlotDescriptor* slot_desc) {
+    void* slot = tuple->get_slot(slot_desc->tuple_offset());
+    tuple->set_not_null(slot_desc->null_indicator_offset());
     switch (t_expr.node_type) {
     case TExprNodeType::DATE_LITERAL: {
         if (!reinterpret_cast<DateTimeValue*>(slot)->from_date_str(
@@ -293,7 +323,6 @@ Status OlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr, Tuple** part_key) {
             return Status::InternalError(ss.str());
         }
     }
-    *part_key = tuple;
     return Status::OK();
 }

diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index 6ba2b99e2e..ffef971f8b 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -104,7 +104,8 @@ struct OlapTablePartition {

 class OlapTablePartKeyComparator {
 public:
-    OlapTablePartKeyComparator(SlotDescriptor* slot_desc) : _slot_desc(slot_desc) { }
+    OlapTablePartKeyComparator(const std::vector<SlotDescriptor*>& slot_descs) :
+            _slot_descs(slot_descs) { }
     // return true if lhs < rhs
     // 'nullptr' is max value, but 'null' is min value
     bool operator()(const Tuple* lhs, const Tuple* rhs) const {
@@ -114,16 +115,23 @@ public:
             return true;
         }

-        bool lhs_null = lhs->is_null(_slot_desc->null_indicator_offset());
-        bool rhs_null = rhs->is_null(_slot_desc->null_indicator_offset());
-        if (lhs_null || rhs_null) { return !rhs_null; }
+        for (auto slot_desc : _slot_descs) {
+            bool lhs_null = lhs->is_null(slot_desc->null_indicator_offset());
+            bool rhs_null = rhs->is_null(slot_desc->null_indicator_offset());
+            if (lhs_null && rhs_null) { continue; }
+            if (lhs_null || rhs_null) { return !rhs_null; }

-        auto lhs_value = lhs->get_slot(_slot_desc->tuple_offset());
-        auto rhs_value = rhs->get_slot(_slot_desc->tuple_offset());
-        return RawValue::lt(lhs_value, rhs_value, _slot_desc->type());
+            auto lhs_value = lhs->get_slot(slot_desc->tuple_offset());
+            auto rhs_value = rhs->get_slot(slot_desc->tuple_offset());
+
+            int res = RawValue::compare(lhs_value, rhs_value, slot_desc->type());
+            if (res != 0) { return res < 0; }
+        }
+        // equal, return false
+        return false;
     }
 private:
-    SlotDescriptor* _slot_desc;
+    std::vector<SlotDescriptor*> _slot_descs;
 };

 // store an olap table's tablet information
@@ -150,7 +158,9 @@ public:
     }
     std::string debug_string() const;
 private:
-    Status _create_partition_key(const TExprNode& t_expr, Tuple** part_key);
+    Status _create_partition_keys(const std::vector<TExprNode>& t_exprs, Tuple** part_key);
+
+    Status _create_partition_key(const TExprNode& t_expr, Tuple* tuple, SlotDescriptor* slot_desc);

     uint32_t _compute_dist_hash(Tuple* key) const;

@@ -160,7 +170,7 @@ private:
             // start_key is nullptr means the lower bound is boundless
             return true;
         }
-        OlapTablePartKeyComparator comparator(_partition_slot_desc);
+        OlapTablePartKeyComparator comparator(_partition_slot_descs);
         return !comparator(key, part->start_key);
     }
 private:
@@ -168,7 +178,7 @@ private:
     std::shared_ptr<OlapTableSchemaParam> _schema;
     TOlapTablePartitionParam _t_param;

-    SlotDescriptor* _partition_slot_desc;
+    std::vector<SlotDescriptor*> _partition_slot_descs;
     std::vector<SlotDescriptor*> _distributed_slot_descs;

     ObjectPool _obj_pool;
diff --git a/docs/documentation/cn/getting-started/data-partition.md b/docs/documentation/cn/getting-started/data-partition.md
index bb4f103cda..0224af1625 100644
--- a/docs/documentation/cn/getting-started/data-partition.md
+++ b/docs/documentation/cn/getting-started/data-partition.md
@@ -86,7 +86,7 @@ Doris supports two levels of data partitioning. The first level is Partition, which only supports Range

 1. Partition

-    * Only one Partition column can be specified, and it must be a KEY column.
+    * One or more Partition columns can be specified. Partition columns must be KEY columns. The usage of multi-column partitioning is described in the **Multi-column partitioning** section below.
     * Partition ranges are left-closed, right-open. In the example above, to store all of February's data in p201702, the partition value must be "2017-03-01", i.e. the range is [2017-02-01, 2017-03-01).
     * Regardless of the partition column's type, partition values must be written with double quotes.
     * Partition columns are usually time columns, to make managing old and new data easier.
@@ -156,7 +156,7 @@ Doris supports two levels of data partitioning. The first level is Partition, which only supports Range

     * If Partition is used, the `DISTRIBUTED ...` clause describes how data is divided **within each partition**. If Partition is not used, it describes how the data of the whole table is divided.
     * Multiple bucketing columns may be specified, but they must be Key columns. Bucketing columns may be the same as or different from Partition columns.
-    * The choice of bucketing columns is a trade-off between **query throughput** and **query latency**:
+    * The choice of bucketing columns is a trade-off between **query throughput** and **query concurrency**:
         1. If multiple bucketing columns are chosen, the data is distributed more evenly. However, if the query condition does not include equality conditions on all bucketing columns, a query will scan every bucket, so query throughput rises while single-query latency rises as well. This suits high-throughput, low-concurrency query scenarios.
        2. If only one or a few bucketing columns are chosen, a point query can hit just a single bucket. This suits high-concurrency point-query scenarios.
@@ -174,6 +174,42 @@ Doris supports two levels of data partitioning. The first level is Partition, which only supports Range
     * Some examples: suppose there are 10 BEs, each with one disk. For a table of 500MB total, consider 4-8 tablets. 5GB: 8-16 tablets. 50GB: 32 tablets. 500GB: partition the table, keep each partition around 50GB, with 16-32 tablets per partition. 5TB: partition the table, keep each partition around 50GB, with 16-32 tablets per partition.

     > Note: a table's data volume can be checked with the `show data` command; divide the result by the number of replicas to get the table's data volume.
+
+#### Multi-column partitioning
+
+Doris supports specifying multiple columns as partition columns, for example:
+
+```
+PARTITION BY RANGE(`date`, `id`)
+(
+    PARTITION `p201701_1000` VALUES LESS THAN ("2017-02-01", "1000"),
+    PARTITION `p201702_2000` VALUES LESS THAN ("2017-03-01", "2000"),
+    PARTITION `p201703_all`  VALUES LESS THAN ("2017-04-01")
+)
+```
+
+In the example above, we specify `date` (DATE type) and `id` (INT type) as the partition columns. The resulting partitions are:
+
+```
+* p201701_1000: [(MIN_VALUE, MIN_VALUE), ("2017-02-01", "1000") )
+* p201702_2000: [("2017-02-01", "1000"), ("2017-03-01", "2000") )
+* p201703_all:  [("2017-03-01", "2000"), ("2017-04-01", MIN_VALUE))
+```
+
+Note that for the last partition the user only specified the partition value of the `date` column, so the partition value of the `id` column is filled with `MIN_VALUE` by default. When data is loaded, the partition column values are compared in order to determine the matching partition. For example:
+
+```
+* Data             --> Partition
+* 2017-01-01, 200  --> p201701_1000
+* 2017-01-01, 2000 --> p201701_1000
+* 2017-02-01, 100  --> p201701_1000
+* 2017-02-01, 2000 --> p201702_2000
+* 2017-02-15, 5000 --> p201702_2000
+* 2017-03-01, 2000 --> p201703_all
+* 2017-03-10, 1    --> p201703_all
+* 2017-04-01, 1000 --> cannot be loaded
+* 2017-05-01, 1000 --> cannot be loaded
+```

 ### PROPERTIES

diff --git a/docs/help/Contents/Data Definition/ddl_stmt.md b/docs/help/Contents/Data Definition/ddl_stmt.md
index e723d24348..147511f1e3 100644
--- a/docs/help/Contents/Data Definition/ddl_stmt.md
+++ b/docs/help/Contents/Data Definition/ddl_stmt.md
@@ -139,10 +139,10 @@
     4. partition_desc
         1) Range partitioning
         Syntax:
-            PARTITION BY RANGE (k1)
+            PARTITION BY RANGE (k1, k2, ...)
             (
-            PARTITION partition_name VALUES LESS THAN MAXVALUE|("value1")
-            PARTITION partition_name VALUES LESS THAN MAXVALUE|("value2")
+            PARTITION partition_name VALUES LESS THAN MAXVALUE|("value1", "value2", ...)
+            PARTITION partition_name VALUES LESS THAN MAXVALUE|("value1", "value2", ...)
             ...
             )
         Notes:
@@ -152,6 +152,7 @@
                TINYINT, SMALLINT, INT, BIGINT, LARGEINT, DATE, DATETIME
             3) Partitions are left-closed, right-open intervals; the left boundary of the first partition is the minimum value
             4) NULL values are stored only in the partition that contains the minimum value. Once the partition containing the minimum value is deleted, NULL values can no longer be loaded.
+            5) One or more columns can be specified as partition columns. Omitted partition values are filled with the minimum value by default.

         Note:
             1) Partitions are generally used for managing data along the time dimension
diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index 041f33df9a..bb43b9b482 100644
--- a/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -287,10 +287,6 @@ public class CreateTableStmt extends DdlStmt {
             }

             RangePartitionDesc rangePartitionDesc = (RangePartitionDesc) partitionDesc;
-            if (rangePartitionDesc.getPartitionColNames().size() != 1) {
-                throw new AnalysisException("Only allow partitioned by one column");
-            }
-
             rangePartitionDesc.analyze(columnDefs, properties);
         }
diff --git a/fe/src/main/java/org/apache/doris/analysis/RangePartitionDesc.java b/fe/src/main/java/org/apache/doris/analysis/RangePartitionDesc.java
index 5360f9a12f..3e539e4da9 100644
--- a/fe/src/main/java/org/apache/doris/analysis/RangePartitionDesc.java
+++ b/fe/src/main/java/org/apache/doris/analysis/RangePartitionDesc.java
@@ -173,9 +173,9 @@ public class RangePartitionDesc extends PartitionDesc {
      * VALUE LESS THEN (80)
      *
      * key range is:
-     * ( {MIN, MIN, MIN},     {10, 100, 1000} )
-     * [ {10, 100, 500},      {50, 500, ?   } )
-     * [ {50, 500, ?   },     {80, ?,   ?   } )
+     * ( {MIN, MIN, MIN},     {10, 100, 1000} )
+     * [ {10, 100, 1000},     {50, 500, MIN } )
+     * [ {50, 500, MIN },     {80, MIN, MIN } )
      */
     RangePartitionInfo rangePartitionInfo = new RangePartitionInfo(partitionColumns);
     for (SingleRangePartitionDesc desc : singleRangePartitionDescs) {
diff --git a/fe/src/main/java/org/apache/doris/catalog/PartitionInfo.java b/fe/src/main/java/org/apache/doris/catalog/PartitionInfo.java
index 3028ce8ade..b0d98b7e10 100644
--- a/fe/src/main/java/org/apache/doris/catalog/PartitionInfo.java
+++ b/fe/src/main/java/org/apache/doris/catalog/PartitionInfo.java
@@ -38,6 +38,8 @@ public class PartitionInfo implements Writable {
     protected Map<Long, DataProperty> idToDataProperty;
     // partition id -> replication num
     protected Map<Long, Short> idToReplicationNum;
+    // true if there are multiple partition columns
+    protected boolean isMultiColumnPartition = false;

     public PartitionInfo() {
         // for persist
@@ -87,6 +89,10 @@ public class PartitionInfo implements Writable {
         return partitionInfo;
     }

+    public boolean isMultiColumnPartition() {
+        return isMultiColumnPartition;
+    }
+
     public String toSql(OlapTable table, List<Long> partitionId) {
         return "";
     }
diff --git a/fe/src/main/java/org/apache/doris/catalog/PartitionKey.java b/fe/src/main/java/org/apache/doris/catalog/PartitionKey.java
index 0feb555411..f34633ec1c 100644
--- a/fe/src/main/java/org/apache/doris/catalog/PartitionKey.java
+++ b/fe/src/main/java/org/apache/doris/catalog/PartitionKey.java
@@ -26,6 +26,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;

+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;

@@ -75,8 +76,7 @@ public class PartitionKey implements Comparable<PartitionKey>, Writable {
         // fill the vacancy with MIN
         for (; i < columns.size(); ++i) {
             Type type = Type.fromPrimitiveType(columns.get(i).getDataType());
-            partitionKey.keys.add(
-                    LiteralExpr.createInfinity(type, false));
+            partitionKey.keys.add(LiteralExpr.createInfinity(type, false));
             partitionKey.types.add(columns.get(i).getDataType());
         }

@@ -127,8 +127,8 @@ public class PartitionKey implements Comparable<PartitionKey>, Writable {
         return true;
     }

-    @Override
     // compare with other PartitionKey. used for partition prune
+    @Override
     public int compareTo(PartitionKey other) {
         int this_key_len = this.keys.size();
         int other_key_len = other.keys.size();
@@ -184,10 +184,10 @@ public class PartitionKey implements Comparable<PartitionKey>, Writable {
                 value = dateLiteral.toSql();
             }
         }
-        if (keys.size() - 1 == i) {
-            strBuilder.append("(").append(value).append(")");
-        } else {
-            strBuilder.append("(").append(value).append("), ");
+        strBuilder.append(value);
+
+        if (keys.size() - 1 != i) {
+            strBuilder.append(", ");
         }
         i++;
     }
@@ -198,9 +198,7 @@ public class PartitionKey implements Comparable<PartitionKey>, Writable {
     public String toString() {
         StringBuilder builder = new StringBuilder();
         builder.append("types: [");
-        for (PrimitiveType type : types) {
-            builder.append(type.toString());
-        }
+        builder.append(Joiner.on(", ").join(types));
         builder.append("]; ");

         builder.append("keys: [");
diff --git a/fe/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java b/fe/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java
index 118cb22ea6..404ed8c84e 100644
--- a/fe/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java
+++ b/fe/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java
@@ -73,6 +73,7 @@ public class RangePartitionInfo extends PartitionInfo {
         super(PartitionType.RANGE);
         this.partitionColumns = partitionColumns;
         this.idToRange = new HashMap<Long, Range<PartitionKey>>();
+        this.isMultiColumnPartition = partitionColumns.size() > 1;
     }

     public List<Column> getPartitionColumns() {
@@ -367,6 +368,8 @@ public class RangePartitionInfo extends PartitionInfo {
             partitionColumns.add(column);
         }

+        this.isMultiColumnPartition = partitionColumns.size() > 1;
+
         counter = in.readInt();
         for (int i = 0; i < counter; i++) {
             long partitionId = in.readLong();
@@ -405,20 +408,20 @@ public class RangePartitionInfo extends PartitionInfo {
                 // first partition
                 if (!range.lowerEndpoint().isMinValue()) {
                     sb.append("PARTITION ").append(FeNameFormat.FORBIDDEN_PARTITION_NAME).append(idx)
-                            .append(" VALUES LESS THAN ").append(range.lowerEndpoint().toSql());
+                            .append(" VALUES LESS THAN (").append(range.lowerEndpoint().toSql()).append(")");
                     sb.append(",\n");
                 }
             } else {
                 Preconditions.checkNotNull(lastRange);
                 if (!lastRange.upperEndpoint().equals(range.lowerEndpoint())) {
                     sb.append("PARTITION ").append(FeNameFormat.FORBIDDEN_PARTITION_NAME).append(idx)
-                            .append(" VALUES LESS THAN ").append(range.lowerEndpoint().toSql());
+                            .append(" VALUES LESS THAN (").append(range.lowerEndpoint().toSql()).append(")");
                     sb.append(",\n");
                 }
             }

-            sb.append("PARTITION ").append(partitionName).append(" VALUES LESS THAN ");
-            sb.append(range.upperEndpoint().toSql());
+            sb.append("PARTITION ").append(partitionName).append(" VALUES LESS THAN (");
+            sb.append(range.upperEndpoint().toSql()).append(")");

             if (partitionId != null) {
                 partitionId.add(entry.getKey());
diff --git a/fe/src/main/java/org/apache/doris/load/Load.java b/fe/src/main/java/org/apache/doris/load/Load.java
index 4a290c117b..536e8dd14a 100644
--- a/fe/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/src/main/java/org/apache/doris/load/Load.java
@@ -17,14 +17,6 @@

 package org.apache.doris.load;

-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.gson.Gson;
-import org.apache.commons.lang.StringUtils;
 import org.apache.doris.analysis.BinaryPredicate;
 import org.apache.doris.analysis.CancelLoadStmt;
 import org.apache.doris.analysis.ColumnSeparator;
@@ -98,6 +90,16 @@ import org.apache.doris.transaction.TableCommitInfo;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
 import org.apache.doris.transaction.TransactionStatus;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+
+import org.apache.commons.lang.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

@@ -519,7 +521,7 @@ public class Load {
             Map<Long, Map<Long, List<Source>>> tableToPartitionSources = Maps.newHashMap();
             for (DataDescription dataDescription : dataDescriptions) {
                 // create source
-                checkAndCreateSource(db, dataDescription, tableToPartitionSources, job.getDeleteFlag());
+                checkAndCreateSource(db, dataDescription, tableToPartitionSources, job.getDeleteFlag(), etlJobType);
                 job.addTableName(dataDescription.getTableName());
             }
             for (Entry<Long, Map<Long, List<Source>>> tableEntry : tableToPartitionSources.entrySet()) {
@@ -648,7 +650,7 @@ public class Load {

     public static void checkAndCreateSource(Database db, DataDescription dataDescription,
                                             Map<Long, Map<Long, List<Source>>> tableToPartitionSources,
-                                            boolean deleteFlag)
+                                            boolean deleteFlag, EtlJobType jobType)
             throws DdlException {
         Source source = new Source(dataDescription.getFilePaths());
         long tableId = -1;
@@ -668,6 +670,11 @@ public class Load {
             throw new DdlException("Table [" + tableName + "] is not olap table");
         }

+        if (((OlapTable) table).getPartitionInfo().isMultiColumnPartition() && jobType == EtlJobType.HADOOP) {
+            throw new DdlException("Load by hadoop cluster does not support tables with multiple partition columns."
+                    + " Table: " + table.getName() + ". Try using broker load. See 'help broker load;'");
+        }
+
         // check partition
         if (dataDescription.getPartitionNames() != null &&
                 !dataDescription.getPartitionNames().isEmpty() &&
diff --git a/fe/src/main/java/org/apache/doris/load/PullLoadSourceInfo.java b/fe/src/main/java/org/apache/doris/load/PullLoadSourceInfo.java
index a211185502..b611467fb0 100644
--- a/fe/src/main/java/org/apache/doris/load/PullLoadSourceInfo.java
+++ b/fe/src/main/java/org/apache/doris/load/PullLoadSourceInfo.java
@@ -34,6 +34,7 @@ import java.util.Map;
 /**
  * PullLoadSourceInfo
  */
+@Deprecated
 public class PullLoadSourceInfo implements Writable {
     private static final Logger LOG = LogManager.getLogger(PullLoadSourceInfo.class);

diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index ca0e8c4b3c..3f7d7044ff 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -101,7 +101,7 @@ public class BrokerLoadJob extends LoadJob {
             throw new DdlException("Database[" + dbName + "] does not exist");
         }
         // check data source info
-        LoadJob.checkDataSourceInfo(db, stmt.getDataDescriptions());
+        LoadJob.checkDataSourceInfo(db, stmt.getDataDescriptions(), EtlJobType.BROKER);

         // create job
         try {
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 0792682d61..41f7f35ab0 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -259,13 +259,14 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         }
     }

-    protected static void checkDataSourceInfo(Database db, List<DataDescription> dataDescriptions) throws DdlException {
+    protected static void checkDataSourceInfo(Database db, List<DataDescription> dataDescriptions,
+                                              EtlJobType jobType) throws DdlException {
         for (DataDescription dataDescription : dataDescriptions) {
             // loadInfo is a temporary param for the method of checkAndCreateSource.
            // <TableId, <PartitionId, <LoadInfoList>>>
             Map<Long, Map<Long, List<Source>>> loadInfo = Maps.newHashMap();
             // only support broker load now
-            Load.checkAndCreateSource(db, dataDescription, loadInfo, false);
+            Load.checkAndCreateSource(db, dataDescription, loadInfo, false, jobType);
         }
     }

diff --git a/fe/src/main/java/org/apache/doris/planner/DataSplitSink.java b/fe/src/main/java/org/apache/doris/planner/DataSplitSink.java
index e817a28e7d..8e8f25ba8f 100644
--- a/fe/src/main/java/org/apache/doris/planner/DataSplitSink.java
+++ b/fe/src/main/java/org/apache/doris/planner/DataSplitSink.java
@@ -31,8 +31,8 @@ import org.apache.doris.catalog.PartitionInfo;
 import org.apache.doris.catalog.PartitionKey;
 import org.apache.doris.catalog.PartitionType;
 import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.catalog.Type;
 import org.apache.doris.catalog.RangePartitionInfo;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.thrift.TAggregationType;
@@ -51,6 +51,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

@@ -59,6 +60,7 @@ import java.util.Map;
 import java.util.Set;

 // This class used to split data read from file to batch
+@Deprecated
 public class DataSplitSink extends DataSink {
     private static final Logger LOG = LogManager.getLogger(Planner.class);

diff --git a/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java
index aa42f36525..f69754afe6 100644
--- a/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -217,12 +217,11 @@ public class OlapTableSink extends DataSink {
         switch (partType) {
         case RANGE: {
             RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) table.getPartitionInfo();
-            // range partition's only has one column
-            Preconditions.checkArgument(rangePartitionInfo.getPartitionColumns().size() == 1,
-                    "number columns of range partition is not 1, number_columns="
-                            + rangePartitionInfo.getPartitionColumns().size());
-            partitionParam.setPartition_column(rangePartitionInfo.getPartitionColumns().get(0).getName());
+            for (Column partCol : rangePartitionInfo.getPartitionColumns()) {
+                partitionParam.addToPartition_columns(partCol.getName());
+            }

+            int partColNum = rangePartitionInfo.getPartitionColumns().size();
             DistributionInfo selectedDistInfo = null;
             for (Partition partition : table.getPartitions()) {
                 if (partitionSet != null && !partitionSet.contains(partition.getName())) {
@@ -231,12 +230,19 @@ public class OlapTableSink extends DataSink {
                 TOlapTablePartition tPartition = new TOlapTablePartition();
                 tPartition.setId(partition.getId());
                 Range<PartitionKey> range = rangePartitionInfo.getRange(partition.getId());
+                // set start keys
                 if (range.hasLowerBound() && !range.lowerEndpoint().isMinValue()) {
-                    tPartition.setStart_key(range.lowerEndpoint().getKeys().get(0).treeToThrift().getNodes().get(0));
+                    for (int i = 0; i < partColNum; i++) {
+                        tPartition.addToStart_keys(range.lowerEndpoint().getKeys().get(i).treeToThrift().getNodes().get(0));
+                    }
                 }
+                // set end keys
                 if (range.hasUpperBound() && !range.upperEndpoint().isMaxValue()) {
-                    tPartition.setEnd_key(range.upperEndpoint().getKeys().get(0).treeToThrift().getNodes().get(0));
+                    for (int i = 0; i < partColNum; i++) {
+                        tPartition.addToEnd_keys(range.upperEndpoint().getKeys().get(i).treeToThrift().getNodes().get(0));
+                    }
                 }
+
                 for (MaterializedIndex index : partition.getMaterializedIndices()) {
                     tPartition.addToIndexes(new TOlapTableIndexTablets(index.getId(), Lists.newArrayList(
                             index.getTablets().stream().map(Tablet::getId).collect(Collectors.toList()))));
diff --git a/fe/src/main/java/org/apache/doris/planner/RangePartitionPruner.java b/fe/src/main/java/org/apache/doris/planner/RangePartitionPruner.java
index 245f1e6f0e..d91ae08868 100644
--- a/fe/src/main/java/org/apache/doris/planner/RangePartitionPruner.java
+++ b/fe/src/main/java/org/apache/doris/planner/RangePartitionPruner.java
@@ -94,6 +94,8 @@ public class RangePartitionPruner implements PartitionPruner {
         if (filter.lowerBoundInclusive && filter.upperBoundInclusive
                 && filter.lowerBound != null && filter.upperBound != null
                 && 0 == filter.lowerBound.compareLiteral(filter.upperBound)) {
+
+            // eg: [10, 10], [null, null]
             if (filter.lowerBound instanceof NullLiteral && filter.upperBound instanceof NullLiteral) {
                 // replace Null with min value
                 LiteralExpr minKeyValue = LiteralExpr.createInfinity(
@@ -109,6 +111,7 @@ public class RangePartitionPruner implements PartitionPruner {
             maxKey.popColumn();
             return result;
         }
+
         // no in predicate
         BoundType lowerType = filter.lowerBoundInclusive ? BoundType.CLOSED : BoundType.OPEN;
         BoundType upperType = filter.upperBoundInclusive ? BoundType.CLOSED : BoundType.OPEN;
diff --git a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java
index e1faaf089e..9ad683fcf9 100644
--- a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -111,7 +111,7 @@ public class DdlExecutor {
         } else {
             if (Config.disable_hadoop_load) {
                 throw new DdlException("Load job by hadoop cluster is disabled."
-                        + " Try use broker load. See 'help broker load;'");
+                        + " Try using broker load. See 'help broker load;'");
             }
             jobType = EtlJobType.HADOOP;
         }
diff --git a/fe/src/main/java/org/apache/doris/task/PullLoadEtlTask.java b/fe/src/main/java/org/apache/doris/task/PullLoadEtlTask.java
index 76d817bbb0..24648c61f7 100644
--- a/fe/src/main/java/org/apache/doris/task/PullLoadEtlTask.java
+++ b/fe/src/main/java/org/apache/doris/task/PullLoadEtlTask.java
@@ -26,11 +26,13 @@ import org.apache.doris.thrift.TEtlState;

 import com.google.common.collect.Maps;

-import java.util.Map;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

+import java.util.Map;
+
 // Used to process pull load etl task
+@Deprecated
 public class PullLoadEtlTask extends LoadEtlTask {
     private static final Logger LOG = LogManager.getLogger(PullLoadEtlTask.class);
     private PullLoadJobMgr mgr;
diff --git a/fe/src/main/java/org/apache/doris/task/PullLoadJob.java b/fe/src/main/java/org/apache/doris/task/PullLoadJob.java
index e77a77706d..fa2e520a9e 100644
--- a/fe/src/main/java/org/apache/doris/task/PullLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/task/PullLoadJob.java
@@ -20,6 +20,7 @@ package org.apache.doris.task;
 import org.apache.doris.load.LoadJob;

 import com.google.common.collect.Sets;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

@@ -27,6 +28,7 @@ import java.util.List;
 import java.util.Set;

 // One pull load job
+@Deprecated
 public class PullLoadJob {
     private static final Logger LOG = LogManager.getLogger(PullLoadTask.class);
diff --git a/fe/src/main/java/org/apache/doris/task/PullLoadJobMgr.java b/fe/src/main/java/org/apache/doris/task/PullLoadJobMgr.java
index f3e4865ef9..385007b2e9 100644
--- a/fe/src/main/java/org/apache/doris/task/PullLoadJobMgr.java
+++ b/fe/src/main/java/org/apache/doris/task/PullLoadJobMgr.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.locks.ReentrantLock;

+@Deprecated
 public class PullLoadJobMgr {
     private static final Logger LOG = LogManager.getLogger(PullLoadJobMgr.class);
diff --git a/fe/src/main/java/org/apache/doris/task/PullLoadPendingTask.java b/fe/src/main/java/org/apache/doris/task/PullLoadPendingTask.java
index f820169d46..66532127f4 100644
--- a/fe/src/main/java/org/apache/doris/task/PullLoadPendingTask.java
+++ b/fe/src/main/java/org/apache/doris/task/PullLoadPendingTask.java
@@ -40,6 +40,7 @@ import java.util.List;
 import java.util.Map;

 // Making a pull load job to some tasks
+@Deprecated
 public class PullLoadPendingTask extends LoadPendingTask {
     private static final Logger LOG = LogManager.getLogger(PullLoadPendingTask.class);
diff --git a/fe/src/main/java/org/apache/doris/task/PullLoadTask.java b/fe/src/main/java/org/apache/doris/task/PullLoadTask.java
index e312e5af0f..3ac0ebf96d 100644
--- a/fe/src/main/java/org/apache/doris/task/PullLoadTask.java
+++ b/fe/src/main/java/org/apache/doris/task/PullLoadTask.java
@@ -42,6 +42,7 @@ import java.util.Map;
 import java.util.UUID;

 // A pull load task is used to process one table of this pull load job.
+@Deprecated
 public class PullLoadTask {
     private static final Logger LOG = LogManager.getLogger(PullLoadTask.class);
     // Input parameter
diff --git a/fe/src/main/java/org/apache/doris/task/PullLoadTaskPlanner.java b/fe/src/main/java/org/apache/doris/task/PullLoadTaskPlanner.java
index d07b6a197c..c9b6573681 100644
--- a/fe/src/main/java/org/apache/doris/task/PullLoadTaskPlanner.java
+++ b/fe/src/main/java/org/apache/doris/task/PullLoadTaskPlanner.java
@@ -27,8 +27,8 @@ import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.UserException;
 import org.apache.doris.common.NotImplementedException;
+import org.apache.doris.common.UserException;
 import org.apache.doris.planner.BrokerScanNode;
 import org.apache.doris.planner.DataPartition;
 import org.apache.doris.planner.DataSplitSink;
@@ -49,6 +49,7 @@ import java.util.Collections;
 import java.util.List;

 // Planner used to generate a plan for pull load ETL work
+@Deprecated
 public class PullLoadTaskPlanner {
     private static final Logger LOG = LogManager.getLogger(PullLoadTaskPlanner.class);
diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
index d99cd86850..e114aef9fb 100644
--- a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
@@ -141,7 +141,7 @@ public class BrokerLoadJobTest {
             @Mock
             public void checkAndCreateSource(Database db, DataDescription dataDescription,
                                              Map<Long, Map<Long, List<Source>>> tableToPartitionSources,
-                                             boolean deleteFlag) {
+                                             boolean deleteFlag, EtlJobType jobType) {
             }
         };

diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index b8ffcb035b..2d10dbd877 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -118,6 +118,7 @@ struct TOlapTableIndexTablets {
 // its a closed-open range
 struct TOlapTablePartition {
     1: required i64 id
+    // deprecated, use 'start_keys' and 'end_keys' instead
     2: optional Exprs.TExprNode start_key
     3: optional Exprs.TExprNode end_key

@@ -125,6 +126,9 @@ struct TOlapTablePartition {
     4: required i32 num_buckets

     5: required list<TOlapTableIndexTablets> indexes
+
+    6: optional list<Exprs.TExprNode> start_keys
+    7: optional list<Exprs.TExprNode> end_keys
 }

 struct TOlapTablePartitionParam {
     1: required i64 db_id
     2: required i64 table_id
     3: required i64 version

     // used to split a logical table to multiple paritions
+    // deprecated, use 'partition_columns' instead
     4: optional string partition_column

     // used to split a partition to multiple tablets
@@ -140,6 +145,8 @@ struct TOlapTablePartitionParam {
     // partitions
     6: required list<TOlapTablePartition> partitions
+
+    7: optional list<string> partition_columns
 }

 struct TOlapTableIndexSchema {
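
---

For reference, the core of this change is the ordering that `OlapTablePartKeyComparator` now applies across all partition slots: columns are compared in declared order, the first non-equal column decides, and a NULL slot sorts as the minimum value. Below is a minimal, self-contained C++ sketch of that ordering. This is not code from the patch: `PartKey`, `part_key_less`, and the integer encoding of dates are illustrative assumptions, and the real comparator additionally treats a null tuple pointer as the maximum key and reads values through slot descriptors.

```
#include <iostream>
#include <optional>
#include <vector>

// A partition key modeled as a row of nullable column values.
// std::nullopt stands in for a NULL slot, which sorts as the minimum.
using PartKey = std::vector<std::optional<int>>;

// Returns true if lhs < rhs, comparing column by column in order;
// the first non-equal column decides, later columns are ignored.
bool part_key_less(const PartKey& lhs, const PartKey& rhs) {
    for (size_t i = 0; i < lhs.size() && i < rhs.size(); ++i) {
        const auto& l = lhs[i];
        const auto& r = rhs[i];
        if (!l.has_value() && !r.has_value()) continue;      // both NULL: equal, next column
        if (!l.has_value() || !r.has_value()) return r.has_value();  // NULL is the minimum
        if (*l != *r) return *l < *r;
    }
    return false;  // all compared columns equal
}

int main() {
    // Mirrors the doc example for PARTITION BY RANGE(`date`, `id`), with
    // dates encoded as ints (20170101 for 2017-01-01, and so on).
    // The upper bound of p201701_1000 is ("2017-02-01", "1000").
    PartKey upper = {20170201, 1000};
    PartKey row1 = {20170101, 2000};  // earlier date: falls in p201701_1000
    PartKey row2 = {20170201, 2000};  // same date, id >= 1000: next partition
    std::cout << part_key_less(row1, upper) << "\n";  // prints 1
    std::cout << part_key_less(row2, upper) << "\n";  // prints 0
    return 0;
}
```

This is why "2017-01-01, 2000" lands in `p201701_1000` while "2017-02-01, 2000" does not, and why a bound like `("2017-04-01")` with the trailing `id` value filled as `MIN_VALUE` excludes every row dated 2017-04-01.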