Fix bug that null value is not correctly handled when loading data (#1070)
When partition column's value is NULL, it should be loaded into
the partition which include MIN VALUE
This commit is contained in:
@ -105,12 +105,25 @@ struct OlapTablePartition {
|
||||
class OlapTablePartKeyComparator {
|
||||
public:
|
||||
OlapTablePartKeyComparator(SlotDescriptor* slot_desc) : _slot_desc(slot_desc) { }
|
||||
// return true if lhs < rhs
|
||||
// 'nullptr' is max value, but 'null' is min value
|
||||
bool operator()(const Tuple* lhs, const Tuple* rhs) const {
|
||||
if (lhs == nullptr) {
|
||||
return false;
|
||||
} else if (rhs == nullptr) {
|
||||
return true;
|
||||
}
|
||||
|
||||
bool lhs_null = lhs->is_null(_slot_desc->null_indicator_offset());
|
||||
bool rhs_null = rhs->is_null(_slot_desc->null_indicator_offset());
|
||||
if (lhs_null && rhs_null) {
|
||||
return false;
|
||||
} else if (lhs_null) {
|
||||
return true;
|
||||
} else if (rhs_null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
auto lhs_value = lhs->get_slot(_slot_desc->tuple_offset());
|
||||
auto rhs_value = rhs->get_slot(_slot_desc->tuple_offset());
|
||||
return RawValue::lt(lhs_value, rhs_value, _slot_desc->type());
|
||||
@ -150,6 +163,7 @@ private:
|
||||
// check if this partition contain this key
|
||||
bool _part_contains(OlapTablePartition* part, Tuple* key) const {
|
||||
if (part->start_key == nullptr) {
|
||||
// start_key is nullptr means the lower bound is boundless
|
||||
return true;
|
||||
}
|
||||
OlapTablePartKeyComparator comparator(_partition_slot_desc);
|
||||
|
||||
@ -150,6 +150,7 @@
|
||||
2) 目前仅支持以下类型的列作为 Range 分区列,且只能指定一个分区列
|
||||
TINYINT, SMALLINT, INT, BIGINT, LARGEINT, DATE, DATETIME
|
||||
3) 分区为左闭右开区间,首个分区的左边界为做最小值
|
||||
4) NULL 值只会存放在包含最小值的分区中。当包含最小值的分区被删除后,NULL 值将无法导入。
|
||||
|
||||
注意:
|
||||
1) 分区一般用于时间维度的数据管理
|
||||
|
||||
@ -29,8 +29,8 @@ import org.apache.doris.common.io.Writable;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
@ -44,7 +44,7 @@ public class PartitionKey implements Comparable<PartitionKey>, Writable {
|
||||
private List<LiteralExpr> keys;
|
||||
private List<PrimitiveType> types;
|
||||
|
||||
// constuct for partition prune
|
||||
// constructor for partition prune
|
||||
public PartitionKey() {
|
||||
keys = Lists.newArrayList();
|
||||
types = Lists.newArrayList();
|
||||
|
||||
@ -227,7 +227,7 @@ public class OlapTableSink extends DataSink {
|
||||
TOlapTablePartition tPartition = new TOlapTablePartition();
|
||||
tPartition.setId(partition.getId());
|
||||
Range<PartitionKey> range = rangePartitionInfo.getRange(partition.getId());
|
||||
if (range.hasLowerBound()) {
|
||||
if (range.hasLowerBound() && !range.lowerEndpoint().isMinValue()) {
|
||||
tPartition.setStart_key(range.lowerEndpoint().getKeys().get(0).treeToThrift().getNodes().get(0));
|
||||
}
|
||||
if (range.hasUpperBound() && !range.upperEndpoint().isMaxValue()) {
|
||||
@ -311,3 +311,4 @@ public class OlapTableSink extends DataSink {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -49,7 +49,7 @@ public class PartitionColumnFilter {
|
||||
this.inPredicate = inPredicate;
|
||||
}
|
||||
|
||||
// selete the bigger bound
|
||||
// select the bigger bound
|
||||
public void setLowerBound(LiteralExpr newLowerBound, boolean newLowerBoundInclusive) {
|
||||
if (null == lowerBound) {
|
||||
lowerBound = newLowerBound;
|
||||
|
||||
@ -56,14 +56,14 @@ public class RangePartitionPruner implements PartitionPruner {
|
||||
}
|
||||
|
||||
private Collection<Long> prune(RangeMap<PartitionKey, Long> rangeMap,
|
||||
int columnId,
|
||||
int columnIdx,
|
||||
PartitionKey minKey,
|
||||
PartitionKey maxKey,
|
||||
int complex)
|
||||
throws AnalysisException {
|
||||
LOG.debug("column id {}, column filters {}", columnId, partitionColumnFilters);
|
||||
LOG.debug("column idx {}, column filters {}", columnIdx, partitionColumnFilters);
|
||||
// the last column in partition Key
|
||||
if (columnId == partitionColumns.size()) {
|
||||
if (columnIdx == partitionColumns.size()) {
|
||||
try {
|
||||
return Lists.newArrayList(rangeMap.subRangeMap(Range.closed(minKey, maxKey)).asMapOfRanges().values());
|
||||
} catch (IllegalArgumentException e) {
|
||||
@ -71,7 +71,7 @@ public class RangePartitionPruner implements PartitionPruner {
|
||||
}
|
||||
}
|
||||
// no filter in this column
|
||||
Column keyColumn = partitionColumns.get(columnId);
|
||||
Column keyColumn = partitionColumns.get(columnIdx);
|
||||
PartitionColumnFilter filter = partitionColumnFilters.get(keyColumn.getName());
|
||||
if (null == filter) {
|
||||
minKey.pushColumn(LiteralExpr.createInfinity(Type.fromPrimitiveType(keyColumn.getDataType()), false),
|
||||
@ -104,7 +104,7 @@ public class RangePartitionPruner implements PartitionPruner {
|
||||
minKey.pushColumn(filter.lowerBound, keyColumn.getDataType());
|
||||
maxKey.pushColumn(filter.upperBound, keyColumn.getDataType());
|
||||
}
|
||||
Collection<Long> result = prune(rangeMap, columnId + 1, minKey, maxKey, complex);
|
||||
Collection<Long> result = prune(rangeMap, columnIdx + 1, minKey, maxKey, complex);
|
||||
minKey.popColumn();
|
||||
maxKey.popColumn();
|
||||
return result;
|
||||
@ -118,8 +118,8 @@ public class RangePartitionPruner implements PartitionPruner {
|
||||
if (filter.lowerBound != null) {
|
||||
minKey.pushColumn(filter.lowerBound, keyColumn.getDataType());
|
||||
pushMinCount++;
|
||||
if (filter.lowerBoundInclusive && columnId != lastColumnId) {
|
||||
Column column = partitionColumns.get(columnId + 1);
|
||||
if (filter.lowerBoundInclusive && columnIdx != lastColumnId) {
|
||||
Column column = partitionColumns.get(columnIdx + 1);
|
||||
Type type = Type.fromPrimitiveType(column.getDataType());
|
||||
minKey.pushColumn(LiteralExpr.createInfinity(type, false), column.getDataType());
|
||||
pushMinCount++;
|
||||
@ -132,8 +132,8 @@ public class RangePartitionPruner implements PartitionPruner {
|
||||
if (filter.upperBound != null) {
|
||||
maxKey.pushColumn(filter.upperBound, keyColumn.getDataType());
|
||||
pushMaxCount++;
|
||||
if (filter.upperBoundInclusive && columnId != lastColumnId) {
|
||||
Column column = partitionColumns.get(columnId + 1);
|
||||
if (filter.upperBoundInclusive && columnIdx != lastColumnId) {
|
||||
Column column = partitionColumns.get(columnIdx + 1);
|
||||
maxKey.pushColumn(LiteralExpr.createInfinity(Type.fromPrimitiveType(column.getDataType()), true),
|
||||
column.getDataType());
|
||||
pushMaxCount++;
|
||||
@ -167,7 +167,7 @@ public class RangePartitionPruner implements PartitionPruner {
|
||||
LiteralExpr expr = (LiteralExpr) inPredicate.getChild(i);
|
||||
minKey.pushColumn(expr, keyColumn.getDataType());
|
||||
maxKey.pushColumn(expr, keyColumn.getDataType());
|
||||
Collection<Long> subList = prune(rangeMap, columnId + 1, minKey, maxKey, newComplex);
|
||||
Collection<Long> subList = prune(rangeMap, columnIdx + 1, minKey, maxKey, newComplex);
|
||||
for (long partId : subList) {
|
||||
resultSet.add(partId);
|
||||
}
|
||||
|
||||
@ -17,9 +17,6 @@
|
||||
|
||||
package org.apache.doris.planner;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.SlotDescriptor;
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
@ -27,8 +24,12 @@ import org.apache.doris.catalog.PrimitiveType;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.thrift.TScanRangeLocations;
|
||||
|
||||
import com.google.common.base.Objects;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Representation of the common elements of all scan nodes.
|
||||
*/
|
||||
@ -54,10 +55,6 @@ abstract public class ScanNode extends PlanNode {
|
||||
return result;
|
||||
}
|
||||
|
||||
public Map<String, PartitionColumnFilter> getColumnFilters() {
|
||||
return this.columnFilters;
|
||||
}
|
||||
|
||||
public void setColumnFilters(Map<String, PartitionColumnFilter> columnFilters) {
|
||||
this.columnFilters = columnFilters;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user