Reduce the number of partition info in BrokerScanNode param (#1675)
If the user has already specified target partitions to load, we should put only those partitions' info into the BrokerScanNode params instead of adding every partition's info; otherwise the RPC packet becomes too large.
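
For illustration only (none of the following code is part of this commit): when a load job names target partitions, the FE only needs to serialize partition info for those partitions rather than for every partition of the table. A minimal Java sketch of that filtering idea, using hypothetical names (PartitionInfo, buildTargetPartitionInfos) instead of the actual Doris classes:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;

    class PartitionInfoFilter {
        // Stand-in for DataSplitSink.EtlRangePartitionInfo; only the id matters here.
        static class PartitionInfo {
            final long partitionId;
            PartitionInfo(long partitionId) { this.partitionId = partitionId; }
        }

        // Keep only the partitions the user asked to load. A null target set means
        // "no restriction", mirroring the null check in createParts() further below.
        static List<PartitionInfo> buildTargetPartitionInfos(List<PartitionInfo> allParts,
                                                             Set<Long> targetPartitionIds) {
            List<PartitionInfo> kept = new ArrayList<>();
            for (PartitionInfo part : allParts) {
                if (targetPartitionIds != null && !targetPartitionIds.contains(part.partitionId)) {
                    continue; // skip partitions the load job does not touch
                }
                kept.add(part);
            }
            return kept;
        }
    }

Keeping the partition list filtered this way makes the thrift payload sent to the BE proportional to the number of partitions actually being loaded, not to the table's total partition count.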
@@ -342,26 +342,6 @@ Status BrokerScanNode::scanner_scan(
                 continue;
             }
 
-            // The reason we check if partition_expr_ctxs is empty is when loading data to
-            // a unpartitioned table who has no partition_expr_ctxs, user can specify
-            // a partition name. And we check here to avoid crash and make our
-            // process run as normal
-            if (scan_range.params.__isset.partition_ids && !partition_expr_ctxs.empty()) {
-                int64_t partition_id = get_partition_id(partition_expr_ctxs, row);
-                if (partition_id == -1 ||
-                        !std::binary_search(scan_range.params.partition_ids.begin(),
-                                            scan_range.params.partition_ids.end(),
-                                            partition_id)) {
-                    counter->num_rows_filtered++;
-
-                    std::stringstream error_msg;
-                    error_msg << "No corresponding partition, partition id: " << partition_id;
-                    _runtime_state->append_error_msg_to_file(Tuple::to_string(tuple, *_tuple_desc),
-                                                             error_msg.str());
-                    continue;
-                }
-            }
-
             // eval conjuncts of this row.
             if (eval_conjuncts(&conjunct_ctxs[0], conjunct_ctxs.size(), row)) {
                 row_batch->commit_last_row();
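
For context, the block removed above was the BE-side row filter: it computed each row's partition id and dropped the row when that id was not found in scan_range.params.partition_ids via std::binary_search, which presumes the id list is sorted. A rough Java-flavored sketch of just that membership test (illustrative only; the real code is the C++ shown above):

    import java.util.Collections;
    import java.util.List;

    class RowPartitionCheck {
        // True when the row should be counted as filtered: either no partition
        // matched (id == -1) or the id is absent from the allowed list.
        // Assumes allowedPartitionIds is sorted ascending, as binary search requires.
        static boolean shouldFilter(long partitionId, List<Long> allowedPartitionIds) {
            return partitionId == -1
                    || Collections.binarySearch(allowedPartitionIds, partitionId) < 0;
        }
    }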
@@ -38,7 +38,6 @@ import org.apache.doris.catalog.BrokerTable;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.FsBroker;
-import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Table;
@@ -124,8 +123,6 @@ public class BrokerScanNode extends ScanNode {
     private int nextBe = 0;
 
     private Analyzer analyzer;
 
-    private List<DataSplitSink.EtlRangePartitionInfo> partitionInfos;
-    private List<Expr> partitionExprs;
 
     private static class ParamCreateContext {
@@ -205,22 +202,6 @@ public class BrokerScanNode extends ScanNode {
         this.strictMode = strictMode;
     }
 
-    private void createPartitionInfos() throws AnalysisException {
-        if (partitionInfos != null) {
-            return;
-        }
-
-        Map<String, Expr> exprByName = Maps.newHashMap();
-
-        for (SlotDescriptor slotDesc : desc.getSlots()) {
-            exprByName.put(slotDesc.getColumn().getName(), new SlotRef(slotDesc));
-        }
-
-        partitionExprs = Lists.newArrayList();
-        partitionInfos = DataSplitSink.EtlRangePartitionInfo.createParts(
-                (OlapTable) targetTable, exprByName, null, partitionExprs);
-    }
-
     // Called from init, construct source tuple information
     private void initParams(ParamCreateContext context) throws AnalysisException, UserException {
         TBrokerScanRangeParams params = new TBrokerScanRangeParams();
@@ -230,14 +211,6 @@ public class BrokerScanNode extends ScanNode {
         params.setColumn_separator(fileGroup.getValueSeparator().getBytes(Charset.forName("UTF-8"))[0]);
         params.setLine_delimiter(fileGroup.getLineDelimiter().getBytes(Charset.forName("UTF-8"))[0]);
         params.setStrict_mode(strictMode);
-
-        // Parse partition information
-        List<Long> partitionIds = fileGroup.getPartitionIds();
-        if (partitionIds != null && partitionIds.size() > 0) {
-            params.setPartition_ids(partitionIds);
-            createPartitionInfos();
-        }
-
         params.setProperties(brokerDesc.getProperties());
         initColumns(context);
     }
@@ -742,10 +715,6 @@ public class BrokerScanNode extends ScanNode {
     protected void toThrift(TPlanNode msg) {
         msg.node_type = TPlanNodeType.BROKER_SCAN_NODE;
         TBrokerScanNode brokerScanNode = new TBrokerScanNode(desc.getId().asInt());
-        if (partitionInfos != null) {
-            brokerScanNode.setPartition_exprs(Expr.treesToThrift(partitionExprs));
-            brokerScanNode.setPartition_infos(DataSplitSink.EtlRangePartitionInfo.listToNonDistThrift(partitionInfos));
-        }
         msg.setBroker_scan_node(brokerScanNode);
     }
 
@@ -362,7 +362,7 @@ public class DataSplitSink extends DataSink {
     }
 
     public static List<EtlRangePartitionInfo> createParts(
-            OlapTable tbl, Map<String, Expr> exprByCol, Set<String> targetPartitions,
+            OlapTable tbl, Map<String, Expr> exprByCol, Set<Long> targetPartitionIds,
             List<Expr> partitionExprs) throws AnalysisException {
         List<EtlRangePartitionInfo> parts = Lists.newArrayList();
         PartitionInfo partInfo = tbl.getPartitionInfo();
@@ -372,7 +372,7 @@ public class DataSplitSink extends DataSink {
             partitionExprs.add(exprByCol.get(col.getName()));
         }
         for (Partition part : tbl.getPartitions()) {
-            if (targetPartitions != null && !targetPartitions.contains(part.getName())) {
+            if (targetPartitionIds != null && !targetPartitionIds.contains(part.getId())) {
                 continue;
             }
             DistributionInfo distInfo = part.getDistributionInfo();
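
With the signature change above, a caller of createParts() is expected to pass the load job's target partition ids (a Set<Long>) rather than partition names; a call would look roughly like DataSplitSink.EtlRangePartitionInfo.createParts(olapTable, exprByName, targetPartitionIds, partitionExprs), with targetPartitionIds left null when the job does not restrict partitions (the variable names here are illustrative, not taken from this commit).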