From 0a27ef030bca71897baaee11c3cea9905e44233d Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Tue, 20 Aug 2019 19:30:57 +0800 Subject: [PATCH] Reduce the number of partition info in BrokerScanNode param (#1675) We should reduce the amount of partition info in the BrokerScanNode param when the user has already set target partitions to load, instead of adding all partitions' info. Adding all of it makes the RPC packet too large. --- be/src/exec/broker_scan_node.cpp | 20 ------------ .../apache/doris/planner/BrokerScanNode.java | 31 ------------------- .../apache/doris/planner/DataSplitSink.java | 4 +-- 3 files changed, 2 insertions(+), 53 deletions(-) diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp index e1623dae35..5e9fccf4dd 100644 --- a/be/src/exec/broker_scan_node.cpp +++ b/be/src/exec/broker_scan_node.cpp @@ -342,26 +342,6 @@ Status BrokerScanNode::scanner_scan( continue; } - // The reason we check if partition_expr_ctxs is empty is when loading data to - // a unpartitioned table who has no partition_expr_ctxs, user can specify - // a partition name. And we check here to avoid crash and make our - // process run as normal - if (scan_range.params.__isset.partition_ids && !partition_expr_ctxs.empty()) { - int64_t partition_id = get_partition_id(partition_expr_ctxs, row); - if (partition_id == -1 || - !std::binary_search(scan_range.params.partition_ids.begin(), - scan_range.params.partition_ids.end(), - partition_id)) { - counter->num_rows_filtered++; - - std::stringstream error_msg; - error_msg << "No corresponding partition, partition id: " << partition_id; - _runtime_state->append_error_msg_to_file(Tuple::to_string(tuple, *_tuple_desc), - error_msg.str()); - continue; - } - } - // eval conjuncts of this row. 
if (eval_conjuncts(&conjunct_ctxs[0], conjunct_ctxs.size(), row)) { row_batch->commit_last_row(); diff --git a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java index d3a7c45d5b..b1742753a8 100644 --- a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -38,7 +38,6 @@ import org.apache.doris.catalog.BrokerTable; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.FsBroker; -import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Table; @@ -124,8 +123,6 @@ public class BrokerScanNode extends ScanNode { private int nextBe = 0; private Analyzer analyzer; - - private List partitionInfos; private List partitionExprs; private static class ParamCreateContext { @@ -205,22 +202,6 @@ public class BrokerScanNode extends ScanNode { this.strictMode = strictMode; } - private void createPartitionInfos() throws AnalysisException { - if (partitionInfos != null) { - return; - } - - Map exprByName = Maps.newHashMap(); - - for (SlotDescriptor slotDesc : desc.getSlots()) { - exprByName.put(slotDesc.getColumn().getName(), new SlotRef(slotDesc)); - } - - partitionExprs = Lists.newArrayList(); - partitionInfos = DataSplitSink.EtlRangePartitionInfo.createParts( - (OlapTable) targetTable, exprByName, null, partitionExprs); - } - // Called from init, construct source tuple information private void initParams(ParamCreateContext context) throws AnalysisException, UserException { TBrokerScanRangeParams params = new TBrokerScanRangeParams(); @@ -230,14 +211,6 @@ public class BrokerScanNode extends ScanNode { params.setColumn_separator(fileGroup.getValueSeparator().getBytes(Charset.forName("UTF-8"))[0]); 
params.setLine_delimiter(fileGroup.getLineDelimiter().getBytes(Charset.forName("UTF-8"))[0]); params.setStrict_mode(strictMode); - - // Parse partition information - List partitionIds = fileGroup.getPartitionIds(); - if (partitionIds != null && partitionIds.size() > 0) { - params.setPartition_ids(partitionIds); - createPartitionInfos(); - } - params.setProperties(brokerDesc.getProperties()); initColumns(context); } @@ -742,10 +715,6 @@ public class BrokerScanNode extends ScanNode { protected void toThrift(TPlanNode msg) { msg.node_type = TPlanNodeType.BROKER_SCAN_NODE; TBrokerScanNode brokerScanNode = new TBrokerScanNode(desc.getId().asInt()); - if (partitionInfos != null) { - brokerScanNode.setPartition_exprs(Expr.treesToThrift(partitionExprs)); - brokerScanNode.setPartition_infos(DataSplitSink.EtlRangePartitionInfo.listToNonDistThrift(partitionInfos)); - } msg.setBroker_scan_node(brokerScanNode); } diff --git a/fe/src/main/java/org/apache/doris/planner/DataSplitSink.java b/fe/src/main/java/org/apache/doris/planner/DataSplitSink.java index 8e8f25ba8f..6bdfb2e81d 100644 --- a/fe/src/main/java/org/apache/doris/planner/DataSplitSink.java +++ b/fe/src/main/java/org/apache/doris/planner/DataSplitSink.java @@ -362,7 +362,7 @@ public class DataSplitSink extends DataSink { } public static List createParts( - OlapTable tbl, Map exprByCol, Set targetPartitions, + OlapTable tbl, Map exprByCol, Set targetPartitionIds, List partitionExprs) throws AnalysisException { List parts = Lists.newArrayList(); PartitionInfo partInfo = tbl.getPartitionInfo(); @@ -372,7 +372,7 @@ public class DataSplitSink extends DataSink { partitionExprs.add(exprByCol.get(col.getName())); } for (Partition part : tbl.getPartitions()) { - if (targetPartitions != null && !targetPartitions.contains(part.getName())) { + if (targetPartitionIds != null && !targetPartitionIds.contains(part.getId())) { continue; } DistributionInfo distInfo = part.getDistributionInfo();