Support multiple partition columns when creating a table (#1574)

When creating a table with the OLAP engine, users can specify multiple partition columns, e.g.:

PARTITION BY RANGE(`date`, `id`)
(
PARTITION `p201701_1000` VALUES LESS THAN ("2017-02-01", "1000"),
PARTITION `p201702_2000` VALUES LESS THAN ("2017-03-01", "2000"),
PARTITION `p201703_all` VALUES LESS THAN ("2017-04-01")
)

Note that load by Hadoop cluster does not support tables with multiple partition columns.
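For illustration only (not part of this change), a complete CREATE TABLE statement using this multi-column range partitioning could look like the sketch below; the database, table, and column names, the aggregation model, bucket count, and properties are all hypothetical:

CREATE TABLE example_db.sales_record
(
    `date` DATE NOT NULL,
    `id` INT NOT NULL,
    `cost` BIGINT SUM DEFAULT "0"
)
ENGINE = olap
AGGREGATE KEY(`date`, `id`)
PARTITION BY RANGE(`date`, `id`)
(
    PARTITION `p201701_1000` VALUES LESS THAN ("2017-02-01", "1000"),
    PARTITION `p201702_2000` VALUES LESS THAN ("2017-03-01", "2000"),
    PARTITION `p201703_all` VALUES LESS THAN ("2017-04-01")
)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES ("replication_num" = "1");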
@@ -150,7 +150,6 @@ OlapTablePartitionParam::OlapTablePartitionParam(
std::shared_ptr<OlapTableSchemaParam> schema,
const TOlapTablePartitionParam& t_param)
: _schema(schema), _t_param(t_param),
_partition_slot_desc(nullptr),
_mem_tracker(new MemTracker()),
_mem_pool(new MemPool(_mem_tracker.get())) {
}
@@ -170,11 +169,23 @@ Status OlapTablePartitionParam::init() {
ss << "partition column not found, column=" << _t_param.partition_column;
return Status::InternalError(ss.str());
}
_partition_slot_desc = it->second;
_partition_slot_descs.push_back(it->second);
} else {
DCHECK(_t_param.__isset.partition_columns);
for (auto& part_col : _t_param.partition_columns) {
auto it = slots_map.find(part_col);
if (it == std::end(slots_map)) {
std::stringstream ss;
ss << "partition column not found, column=" << part_col;
return Status::InternalError(ss.str());
}
_partition_slot_descs.push_back(it->second);
}
}

_partitions_map.reset(
new std::map<Tuple*, OlapTablePartition*, OlapTablePartKeyComparator>(
OlapTablePartKeyComparator(_partition_slot_desc)));
OlapTablePartKeyComparator(_partition_slot_descs)));
if (_t_param.__isset.distributed_columns) {
for (auto& col : _t_param.distributed_columns) {
auto it = slots_map.find(col);
@@ -191,12 +202,22 @@ Status OlapTablePartitionParam::init() {
const TOlapTablePartition& t_part = _t_param.partitions[i];
OlapTablePartition* part = _obj_pool.add(new OlapTablePartition());
part->id = t_part.id;

if (t_part.__isset.start_key) {
RETURN_IF_ERROR(_create_partition_key(t_part.start_key, &part->start_key));
// deprecated, use start_keys instead
std::vector<TExprNode> exprs = { t_part.start_key };
RETURN_IF_ERROR(_create_partition_keys(exprs, &part->start_key));
} else if (t_part.__isset.start_keys) {
RETURN_IF_ERROR(_create_partition_keys(t_part.start_keys, &part->start_key));
}
if (t_part.__isset.end_key) {
RETURN_IF_ERROR(_create_partition_key(t_part.end_key, &part->end_key));
// deprecated, use end_keys instead
std::vector<TExprNode> exprs = { t_part.end_key };
RETURN_IF_ERROR(_create_partition_keys(exprs, &part->end_key));
} else if (t_part.__isset.end_keys) {
RETURN_IF_ERROR(_create_partition_keys(t_part.end_keys, &part->end_key));
}

part->num_buckets = t_part.num_buckets;
auto num_indexes = _schema->indexes().size();
if (t_part.indexes.size() != num_indexes) {
@@ -242,10 +263,19 @@ bool OlapTablePartitionParam::find_tablet(Tuple* tuple,
return false;
}

Status OlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr, Tuple** part_key) {
Status OlapTablePartitionParam::_create_partition_keys(const std::vector<TExprNode>& t_exprs, Tuple** part_key) {
Tuple* tuple = (Tuple*)_mem_pool->allocate(_schema->tuple_desc()->byte_size());
void* slot = tuple->get_slot(_partition_slot_desc->tuple_offset());
tuple->set_not_null(_partition_slot_desc->null_indicator_offset());
for (int i = 0; i < t_exprs.size(); i++) {
const TExprNode& t_expr = t_exprs[i];
RETURN_IF_ERROR(_create_partition_key(t_expr, tuple, _partition_slot_descs[i]));
}
*part_key = tuple;
return Status::OK();
}

Status OlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr, Tuple* tuple, SlotDescriptor* slot_desc) {
void* slot = tuple->get_slot(slot_desc->tuple_offset());
tuple->set_not_null(slot_desc->null_indicator_offset());
switch (t_expr.node_type) {
case TExprNodeType::DATE_LITERAL: {
if (!reinterpret_cast<DateTimeValue*>(slot)->from_date_str(
@@ -293,7 +323,6 @@ Status OlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr, T
return Status::InternalError(ss.str());
}
}
*part_key = tuple;
return Status::OK();
}

@@ -104,7 +104,8 @@ struct OlapTablePartition {

class OlapTablePartKeyComparator {
public:
OlapTablePartKeyComparator(SlotDescriptor* slot_desc) : _slot_desc(slot_desc) { }
OlapTablePartKeyComparator(const std::vector<SlotDescriptor*>& slot_descs) :
_slot_descs(slot_descs) { }
// return true if lhs < rhs
// 'nullptr' is max value, but 'null' is min value
bool operator()(const Tuple* lhs, const Tuple* rhs) const {
@@ -114,16 +115,23 @@ public:
return true;
}

bool lhs_null = lhs->is_null(_slot_desc->null_indicator_offset());
bool rhs_null = rhs->is_null(_slot_desc->null_indicator_offset());
if (lhs_null || rhs_null) { return !rhs_null; }
for (auto slot_desc : _slot_descs) {
bool lhs_null = lhs->is_null(slot_desc->null_indicator_offset());
bool rhs_null = rhs->is_null(slot_desc->null_indicator_offset());
if (lhs_null && rhs_null) { continue; }
if (lhs_null || rhs_null) { return !rhs_null; }

auto lhs_value = lhs->get_slot(_slot_desc->tuple_offset());
auto rhs_value = rhs->get_slot(_slot_desc->tuple_offset());
return RawValue::lt(lhs_value, rhs_value, _slot_desc->type());
auto lhs_value = lhs->get_slot(slot_desc->tuple_offset());
auto rhs_value = rhs->get_slot(slot_desc->tuple_offset());

int res = RawValue::compare(lhs_value, rhs_value, slot_desc->type());
if (res != 0) { return res < 0; }
}
// equal, return false
return false;
}
private:
SlotDescriptor* _slot_desc;
std::vector<SlotDescriptor*> _slot_descs;
};

// store an olap table's tablet information
@@ -150,7 +158,9 @@ public:
}
std::string debug_string() const;
private:
Status _create_partition_key(const TExprNode& t_expr, Tuple** part_key);
Status _create_partition_keys(const std::vector<TExprNode>& t_exprs, Tuple** part_key);

Status _create_partition_key(const TExprNode& t_expr, Tuple* tuple, SlotDescriptor* slot_desc);

uint32_t _compute_dist_hash(Tuple* key) const;

@@ -160,7 +170,7 @@ private:
// start_key is nullptr means the lower bound is boundless
return true;
}
OlapTablePartKeyComparator comparator(_partition_slot_desc);
OlapTablePartKeyComparator comparator(_partition_slot_descs);
return !comparator(key, part->start_key);
}
private:
@@ -168,7 +178,7 @@ private:
std::shared_ptr<OlapTableSchemaParam> _schema;
TOlapTablePartitionParam _t_param;

SlotDescriptor* _partition_slot_desc;
std::vector<SlotDescriptor*> _partition_slot_descs;
std::vector<SlotDescriptor*> _distributed_slot_descs;

ObjectPool _obj_pool;

@@ -86,7 +86,7 @@ Doris 支持两层的数据划分。第一层是 Partition,仅支持 Range 的

1. Partition

* Partition 列只能指定一列。且必须为 KEY 列。
* Partition 列可以指定一列或多列。分区类必须为 KEY 列。多列分区的使用方式在后面 **多列分区** 小结介绍。
* Partition 的区间界限是左闭右开。比如如上示例,如果想在 p201702 存储所有2月份的数据,则分区值需要输入 "2017-03-01",即范围为:[2017-02-01, 2017-03-01)。
* 不论分区列是什么类型,在写分区值时,都需要加双引号。
* 分区列通常为时间列,以方便的管理新旧数据。
@@ -156,7 +156,7 @@ Doris 支持两层的数据划分。第一层是 Partition,仅支持 Range 的

* 如果使用了 Partition,则 `DISTRIBUTED ...` 语句描述的是数据在**各个分区内**的划分规则。如果不使用 Partition,则描述的是对整个表的数据的划分规则。
* 分桶列可以是多列,但必须为 Key 列。分桶列可以和 Partition 列相同或不同。
* 分桶列的选择,是在 **查询吞吐** 和 **查询延迟** 之间的一种权衡:
* 分桶列的选择,是在 **查询吞吐** 和 **查询并发** 之间的一种权衡:

1. 如果选择多个分桶列,则数据分布更均匀。但如果查询条件不包含所有分桶列的等值条件的话,一个查询会扫描所有分桶。这样查询的吞吐会增加,但是单个查询的延迟也会增加。这个方式适合大吞吐低并发的查询场景。
2. 如果仅选择一个或少数分桶列,则点查询可以仅查询一个分桶。这种方式适合高并发的点查询场景。
@@ -174,6 +174,42 @@ Doris 支持两层的数据划分。第一层是 Partition,仅支持 Range 的
* 举一些例子:假设在有10台BE,每台BE一块磁盘的情况下。如果一个表总大小为 500MB,则可以考虑4-8个分片。5GB:8-16个。50GB:32个。500GB:建议分区,每个分区大小在 50GB 左右,每个分区16-32个分片。5TB:建议分区,每个分区大小在 50GB 左右,每个分区16-32个分片。

> 注:表的数据量可以通过 `show data` 命令查看,结果除以副本数,即表的数据量。

#### 多列分区

Doris 支持指定多列作为分区列,示例如下:

```
PARTITION BY RANGE(`date`, `id`)
(
PARTITION `p201701_1000` VALUES LESS THAN ("2017-02-01", "1000"),
PARTITION `p201702_2000` VALUES LESS THAN ("2017-03-01", "2000"),
PARTITION `p201703_all` VALUES LESS THAN ("2017-04-01")
)
```

在以上示例中,我们指定 `date`(DATE 类型) 和 `id`(INT 类型) 作为分区列。以上示例最终得到的分区如下:

```
* p201701_1000: [(MIN_VALUE, MIN_VALUE), ("2017-02-01", "1000") )
* p201702_2000: [("2017-02-01", "1000"), ("2017-03-01", "2000") )
* p201703_all: [("2017-03-01", "2000"), ("2017-04-01", MIN_VALUE))
```

注意,最后一个分区用户缺省只指定了 `date` 列的分区值,所以 `id` 列的分区值会默认填充 `MIN_VALUE`。当用户插入数据时,分区列值会按照顺序依次比较,最终得到对应的分区。举例如下:

```
* 数据 --> 分区
* 2017-01-01, 200 --> p201701_1000
* 2017-01-01, 2000 --> p201701_1000
* 2017-02-01, 100 --> p201701_1000
* 2017-02-01, 2000 --> p201702_2000
* 2017-02-15, 5000 --> p201702_2000
* 2017-03-01, 2000 --> p201703_all
* 2017-03-10, 1 --> p201703_all
* 2017-04-01, 1000 --> 无法导入
* 2017-05-01, 1000 --> 无法导入
```

### PROPERTIES

@@ -139,10 +139,10 @@
4. partition_desc
1) Range 分区
语法:
PARTITION BY RANGE (k1)
PARTITION BY RANGE (k1, k2, ...)
(
PARTITION partition_name VALUES LESS THAN MAXVALUE|("value1")
PARTITION partition_name VALUES LESS THAN MAXVALUE|("value2")
PARTITION partition_name VALUES LESS THAN MAXVALUE|("value1", "value2", ...)
PARTITION partition_name VALUES LESS THAN MAXVALUE|("value1", "value2", ...)
...
)
说明:
@@ -152,6 +152,7 @@
TINYINT, SMALLINT, INT, BIGINT, LARGEINT, DATE, DATETIME
3) 分区为左闭右开区间,首个分区的左边界为做最小值
4) NULL 值只会存放在包含最小值的分区中。当包含最小值的分区被删除后,NULL 值将无法导入。
5) 可以指定一列或多列作为分区列。如果分区值缺省,则会默认填充最小值。

注意:
1) 分区一般用于时间维度的数据管理

@@ -287,10 +287,6 @@ public class CreateTableStmt extends DdlStmt {
}

RangePartitionDesc rangePartitionDesc = (RangePartitionDesc) partitionDesc;
if (rangePartitionDesc.getPartitionColNames().size() != 1) {
throw new AnalysisException("Only allow partitioned by one column");
}

rangePartitionDesc.analyze(columnDefs, properties);
}

@@ -173,9 +173,9 @@ public class RangePartitionDesc extends PartitionDesc {
* VALUE LESS THEN (80)
*
* key range is:
* ( {MIN, MIN, MIN}, {10, 100, 1000} )
* [ {10, 100, 500}, {50, 500, ? } )
* [ {50, 500, ? }, {80, ?, ? } )
* ( {MIN, MIN, MIN}, {10, 100, 1000} )
* [ {10, 100, 1000}, {50, 500, MIN } )
* [ {50, 500, MIN }, {80, MIN, MIN } )
*/
RangePartitionInfo rangePartitionInfo = new RangePartitionInfo(partitionColumns);
for (SingleRangePartitionDesc desc : singleRangePartitionDescs) {

@@ -38,6 +38,8 @@ public class PartitionInfo implements Writable {
protected Map<Long, DataProperty> idToDataProperty;
// partition id -> replication num
protected Map<Long, Short> idToReplicationNum;
// true if the partition has multi partition columns
protected boolean isMultiColumnPartition = false;

public PartitionInfo() {
// for persist
@@ -87,6 +89,10 @@ public class PartitionInfo implements Writable {
return partitionInfo;
}

public boolean isMultiColumnPartition() {
return isMultiColumnPartition;
}

public String toSql(OlapTable table, List<Long> partitionId) {
return "";
}

@@ -26,6 +26,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

@@ -75,8 +76,7 @@ public class PartitionKey implements Comparable<PartitionKey>, Writable {
// fill the vacancy with MIN
for (; i < columns.size(); ++i) {
Type type = Type.fromPrimitiveType(columns.get(i).getDataType());
partitionKey.keys.add(
LiteralExpr.createInfinity(type, false));
partitionKey.keys.add(LiteralExpr.createInfinity(type, false));
partitionKey.types.add(columns.get(i).getDataType());
}

@@ -127,8 +127,8 @@ public class PartitionKey implements Comparable<PartitionKey>, Writable {
return true;
}

@Override
// compare with other PartitionKey. used for partition prune
@Override
public int compareTo(PartitionKey other) {
int this_key_len = this.keys.size();
int other_key_len = other.keys.size();
@@ -184,10 +184,10 @@ public class PartitionKey implements Comparable<PartitionKey>, Writable {
value = dateLiteral.toSql();
}
}
if (keys.size() - 1 == i) {
strBuilder.append("(").append(value).append(")");
} else {
strBuilder.append("(").append(value).append("), ");
strBuilder.append(value);

if (keys.size() - 1 != i) {
strBuilder.append(", ");
}
i++;
}
@@ -198,9 +198,7 @@ public class PartitionKey implements Comparable<PartitionKey>, Writable {
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("types: [");
for (PrimitiveType type : types) {
builder.append(type.toString());
}
builder.append(Joiner.on(", ").join(types));
builder.append("]; ");

builder.append("keys: [");

@@ -73,6 +73,7 @@ public class RangePartitionInfo extends PartitionInfo {
super(PartitionType.RANGE);
this.partitionColumns = partitionColumns;
this.idToRange = new HashMap<Long, Range<PartitionKey>>();
this.isMultiColumnPartition = partitionColumns.size() > 1;
}

public List<Column> getPartitionColumns() {
@@ -367,6 +368,8 @@ public class RangePartitionInfo extends PartitionInfo {
partitionColumns.add(column);
}

this.isMultiColumnPartition = partitionColumns.size() > 1;

counter = in.readInt();
for (int i = 0; i < counter; i++) {
long partitionId = in.readLong();
@@ -405,20 +408,20 @@ public class RangePartitionInfo extends PartitionInfo {
// first partition
if (!range.lowerEndpoint().isMinValue()) {
sb.append("PARTITION ").append(FeNameFormat.FORBIDDEN_PARTITION_NAME).append(idx)
.append(" VALUES LESS THAN ").append(range.lowerEndpoint().toSql());
.append(" VALUES LESS THAN (").append(range.lowerEndpoint().toSql()).append(")");
sb.append(",\n");
}
} else {
Preconditions.checkNotNull(lastRange);
if (!lastRange.upperEndpoint().equals(range.lowerEndpoint())) {
sb.append("PARTITION ").append(FeNameFormat.FORBIDDEN_PARTITION_NAME).append(idx)
.append(" VALUES LESS THAN ").append(range.lowerEndpoint().toSql());
.append(" VALUES LESS THAN (").append(range.lowerEndpoint().toSql()).append(")");
sb.append(",\n");
}
}

sb.append("PARTITION ").append(partitionName).append(" VALUES LESS THAN ");
sb.append(range.upperEndpoint().toSql());
sb.append("PARTITION ").append(partitionName).append(" VALUES LESS THAN (");
sb.append(range.upperEndpoint().toSql()).append(")");

if (partitionId != null) {
partitionId.add(entry.getKey());

@@ -17,14 +17,6 @@

package org.apache.doris.load;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import org.apache.commons.lang.StringUtils;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.CancelLoadStmt;
import org.apache.doris.analysis.ColumnSeparator;
@@ -98,6 +90,16 @@ import org.apache.doris.transaction.TableCommitInfo;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import org.apache.doris.transaction.TransactionStatus;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.Gson;

import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -519,7 +521,7 @@ public class Load {
Map<Long, Map<Long, List<Source>>> tableToPartitionSources = Maps.newHashMap();
for (DataDescription dataDescription : dataDescriptions) {
// create source
checkAndCreateSource(db, dataDescription, tableToPartitionSources, job.getDeleteFlag());
checkAndCreateSource(db, dataDescription, tableToPartitionSources, job.getDeleteFlag(), etlJobType);
job.addTableName(dataDescription.getTableName());
}
for (Entry<Long, Map<Long, List<Source>>> tableEntry : tableToPartitionSources.entrySet()) {
@@ -648,7 +650,7 @@ public class Load {

public static void checkAndCreateSource(Database db, DataDescription dataDescription,
Map<Long, Map<Long, List<Source>>> tableToPartitionSources,
boolean deleteFlag)
boolean deleteFlag, EtlJobType jobType)
throws DdlException {
Source source = new Source(dataDescription.getFilePaths());
long tableId = -1;
@@ -668,6 +670,11 @@ public class Load {
throw new DdlException("Table [" + tableName + "] is not olap table");
}

if (((OlapTable) table).getPartitionInfo().isMultiColumnPartition() && jobType == EtlJobType.HADOOP) {
throw new DdlException("Load by hadoop cluster does not support table with multi partition columns."
+ " Table: " + table.getName() + ". Try using broker load. See 'help broker load;'");
}

// check partition
if (dataDescription.getPartitionNames() != null &&
!dataDescription.getPartitionNames().isEmpty() &&

@@ -34,6 +34,7 @@ import java.util.Map;
/**
* PullLoadSourceInfo
*/
@Deprecated
public class PullLoadSourceInfo implements Writable {
private static final Logger LOG = LogManager.getLogger(PullLoadSourceInfo.class);

@@ -101,7 +101,7 @@ public class BrokerLoadJob extends LoadJob {
throw new DdlException("Database[" + dbName + "] does not exist");
}
// check data source info
LoadJob.checkDataSourceInfo(db, stmt.getDataDescriptions());
LoadJob.checkDataSourceInfo(db, stmt.getDataDescriptions(), EtlJobType.BROKER);

// create job
try {

@@ -259,13 +259,14 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
}
}

protected static void checkDataSourceInfo(Database db, List<DataDescription> dataDescriptions) throws DdlException {
protected static void checkDataSourceInfo(Database db, List<DataDescription> dataDescriptions,
EtlJobType jobType) throws DdlException {
for (DataDescription dataDescription : dataDescriptions) {
// loadInfo is a temporary param for the method of checkAndCreateSource.
// <TableId,<PartitionId,<LoadInfoList>>>
Map<Long, Map<Long, List<Source>>> loadInfo = Maps.newHashMap();
// only support broker load now
Load.checkAndCreateSource(db, dataDescription, loadInfo, false);
Load.checkAndCreateSource(db, dataDescription, loadInfo, false, jobType);
}
}

@@ -31,8 +31,8 @@ import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.thrift.TAggregationType;
@@ -51,6 +51,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -59,6 +60,7 @@ import java.util.Map;
import java.util.Set;

// This class used to split data read from file to batch
@Deprecated
public class DataSplitSink extends DataSink {
private static final Logger LOG = LogManager.getLogger(Planner.class);

@@ -217,12 +217,11 @@ public class OlapTableSink extends DataSink {
switch (partType) {
case RANGE: {
RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) table.getPartitionInfo();
// range partition's only has one column
Preconditions.checkArgument(rangePartitionInfo.getPartitionColumns().size() == 1,
"number columns of range partition is not 1, number_columns="
+ rangePartitionInfo.getPartitionColumns().size());
partitionParam.setPartition_column(rangePartitionInfo.getPartitionColumns().get(0).getName());
for (Column partCol : rangePartitionInfo.getPartitionColumns()) {
partitionParam.addToPartition_columns(partCol.getName());
}

int partColNum = rangePartitionInfo.getPartitionColumns().size();
DistributionInfo selectedDistInfo = null;
for (Partition partition : table.getPartitions()) {
if (partitionSet != null && !partitionSet.contains(partition.getName())) {
@@ -231,12 +230,19 @@ public class OlapTableSink extends DataSink {
TOlapTablePartition tPartition = new TOlapTablePartition();
tPartition.setId(partition.getId());
Range<PartitionKey> range = rangePartitionInfo.getRange(partition.getId());
// set start keys
if (range.hasLowerBound() && !range.lowerEndpoint().isMinValue()) {
tPartition.setStart_key(range.lowerEndpoint().getKeys().get(0).treeToThrift().getNodes().get(0));
for (int i = 0; i < partColNum; i++) {
tPartition.addToStart_keys(range.lowerEndpoint().getKeys().get(i).treeToThrift().getNodes().get(0));
}
}
// set end keys
if (range.hasUpperBound() && !range.upperEndpoint().isMaxValue()) {
tPartition.setEnd_key(range.upperEndpoint().getKeys().get(0).treeToThrift().getNodes().get(0));
for (int i = 0; i < partColNum; i++) {
tPartition.addToEnd_keys(range.upperEndpoint().getKeys().get(i).treeToThrift().getNodes().get(0));
}
}

for (MaterializedIndex index : partition.getMaterializedIndices()) {
tPartition.addToIndexes(new TOlapTableIndexTablets(index.getId(), Lists.newArrayList(
index.getTablets().stream().map(Tablet::getId).collect(Collectors.toList()))));

@@ -94,6 +94,8 @@ public class RangePartitionPruner implements PartitionPruner {
if (filter.lowerBoundInclusive && filter.upperBoundInclusive
&& filter.lowerBound != null && filter.upperBound != null
&& 0 == filter.lowerBound.compareLiteral(filter.upperBound)) {

// eg: [10, 10], [null, null]
if (filter.lowerBound instanceof NullLiteral && filter.upperBound instanceof NullLiteral) {
// replace Null with min value
LiteralExpr minKeyValue = LiteralExpr.createInfinity(
@@ -109,6 +111,7 @@ public class RangePartitionPruner implements PartitionPruner {
maxKey.popColumn();
return result;
}

// no in predicate
BoundType lowerType = filter.lowerBoundInclusive ? BoundType.CLOSED : BoundType.OPEN;
BoundType upperType = filter.upperBoundInclusive ? BoundType.CLOSED : BoundType.OPEN;

@@ -111,7 +111,7 @@ public class DdlExecutor {
} else {
if (Config.disable_hadoop_load) {
throw new DdlException("Load job by hadoop cluster is disabled."
+ " Try use broker load. See 'help broker load;'");
+ " Try using broker load. See 'help broker load;'");
}
jobType = EtlJobType.HADOOP;
}

@@ -26,11 +26,13 @@ import org.apache.doris.thrift.TEtlState;

import com.google.common.collect.Maps;

import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;

// Used to process pull load etl task
@Deprecated
public class PullLoadEtlTask extends LoadEtlTask {
private static final Logger LOG = LogManager.getLogger(PullLoadEtlTask.class);
private PullLoadJobMgr mgr;

@@ -20,6 +20,7 @@ package org.apache.doris.task;
import org.apache.doris.load.LoadJob;

import com.google.common.collect.Sets;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -27,6 +28,7 @@ import java.util.List;
import java.util.Set;

// One pull load job
@Deprecated
public class PullLoadJob {
private static final Logger LOG = LogManager.getLogger(PullLoadTask.class);

@@ -33,6 +33,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

@Deprecated
public class PullLoadJobMgr {
private static final Logger LOG = LogManager.getLogger(PullLoadJobMgr.class);

@@ -40,6 +40,7 @@ import java.util.List;
import java.util.Map;

// Making a pull load job to some tasks
@Deprecated
public class PullLoadPendingTask extends LoadPendingTask {
private static final Logger LOG = LogManager.getLogger(PullLoadPendingTask.class);

@@ -42,6 +42,7 @@ import java.util.Map;
import java.util.UUID;

// A pull load task is used to process one table of this pull load job.
@Deprecated
public class PullLoadTask {
private static final Logger LOG = LogManager.getLogger(PullLoadTask.class);
// Input parameter

@@ -27,8 +27,8 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.BrokerScanNode;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.DataSplitSink;
@@ -49,6 +49,7 @@ import java.util.Collections;
import java.util.List;

// Planner used to generate a plan for pull load ETL work
@Deprecated
public class PullLoadTaskPlanner {
private static final Logger LOG = LogManager.getLogger(PullLoadTaskPlanner.class);

@@ -141,7 +141,7 @@ public class BrokerLoadJobTest {
@Mock
public void checkAndCreateSource(Database db, DataDescription dataDescription,
Map<Long, Map<Long, List<Source>>> tableToPartitionSources,
boolean deleteFlag) {
boolean deleteFlag, EtlJobType jobType) {
}
};

@@ -118,6 +118,7 @@ struct TOlapTableIndexTablets {
// its a closed-open range
struct TOlapTablePartition {
1: required i64 id
// deprecated, use 'start_keys' and 'end_keys' instead
2: optional Exprs.TExprNode start_key
3: optional Exprs.TExprNode end_key

@@ -125,6 +126,9 @@ struct TOlapTablePartition {
4: required i32 num_buckets

5: required list<TOlapTableIndexTablets> indexes

6: optional list<Exprs.TExprNode> start_keys
7: optional list<Exprs.TExprNode> end_keys
}

struct TOlapTablePartitionParam {
@@ -133,6 +137,7 @@ struct TOlapTablePartitionParam {
3: required i64 version

// used to split a logical table to multiple paritions
// deprecated, use 'partition_columns' instead
4: optional string partition_column

// used to split a partition to multiple tablets
@@ -140,6 +145,8 @@ struct TOlapTablePartitionParam {

// partitions
6: required list<TOlapTablePartition> partitions

7: optional list<string> partition_columns
}

struct TOlapTableIndexSchema {