diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index d884845ab1..57f8d52e20 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -34,6 +34,7 @@ NewOlapScanNode::NewOlapScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : VScanNode(pool, tnode, descs), _olap_scan_node(tnode.olap_scan_node) { _output_tuple_id = tnode.olap_scan_node.tuple_id; + _col_distribute_ids = tnode.olap_scan_node.distribute_column_ids; if (_olap_scan_node.__isset.sort_info && _olap_scan_node.__isset.sort_limit) { _limit_per_scanner = _olap_scan_node.sort_limit; } diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index cacc61df1d..d90aa9a911 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -29,9 +29,11 @@ public: const TupleDescriptor* input_tuple_desc, const TupleDescriptor* output_tuple_desc, const std::list& scanners, int64_t limit, - int64_t max_bytes_in_blocks_queue) + int64_t max_bytes_in_blocks_queue, const std::vector& col_distribute_ids) : vectorized::ScannerContext(state, parent, input_tuple_desc, output_tuple_desc, - scanners, limit, max_bytes_in_blocks_queue) {} + scanners, limit, max_bytes_in_blocks_queue), + _col_distribute_ids(col_distribute_ids), + _need_colocate_distribute(!_col_distribute_ids.empty()) {} Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos, int id, bool wait = false) override { @@ -67,19 +69,54 @@ public: const int queue_size = _queue_mutexs.size(); const int block_size = blocks.size(); int64_t local_bytes = 0; - for (const auto& block : blocks) { - local_bytes += block->allocated_bytes(); - } - for (int i = 0; i < queue_size && i < block_size; ++i) { - int queue = _next_queue_to_feed; - { - std::lock_guard l(*_queue_mutexs[queue]); - for (int j = i; j < block_size; j += queue_size) { - _blocks_queues[queue].emplace_back(std::move(blocks[j])); + if (_need_colocate_distribute) { + std::vector hash_vals; + for (const auto& block : blocks) { + // vectorized calculate hash + int rows = block->rows(); + const auto element_size = _max_queue_size; + hash_vals.resize(rows); + std::fill(hash_vals.begin(), hash_vals.end(), 0); + auto* __restrict hashes = hash_vals.data(); + + for (int j = 0; j < _col_distribute_ids.size(); ++j) { + block->get_by_position(_col_distribute_ids[j]) + .column->update_crcs_with_value( + hash_vals, _output_tuple_desc->slots()[_col_distribute_ids[j]] + ->type() + .type); + } + for (int i = 0; i < rows; i++) { + hashes[i] = hashes[i] % element_size; + } + + std::vector channel2rows[element_size]; + for (int i = 0; i < rows; i++) { + channel2rows[hashes[i]].emplace_back(i); + } + + for (int i = 0; i < element_size; ++i) { + if (!channel2rows[i].empty()) { + _add_rows_colocate_blocks(block.get(), i, channel2rows[i]); + } } } - _next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0; + } else { + for (const auto& block : blocks) { + local_bytes += block->allocated_bytes(); + } + + for (int i = 0; i < queue_size && i < block_size; ++i) { + int queue = _next_queue_to_feed; + { + std::lock_guard l(*_queue_mutexs[queue]); + for (int j = i; j < block_size; j += queue_size) { + _blocks_queues[queue].emplace_back(std::move(blocks[j])); + } + } + _next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0; + } } _current_used_bytes += local_bytes; } @@ -95,18 +132,86 @@ public: _queue_mutexs.emplace_back(new std::mutex); _blocks_queues.emplace_back(std::list()); } + if (_need_colocate_distribute) { + int real_block_size = + limit == -1 ? _batch_size : std::min(static_cast(_batch_size), limit); + int64_t free_blocks_memory_usage = 0; + for (int i = 0; i < _max_queue_size; ++i) { + auto block = std::make_unique(_output_tuple_desc->slots(), + real_block_size, + true /*ignore invalid slots*/); + free_blocks_memory_usage += block->allocated_bytes(); + _colocate_mutable_blocks.emplace_back(new vectorized::MutableBlock(block.get())); + _colocate_blocks.emplace_back(std::move(block)); + _colocate_block_mutexs.emplace_back(new std::mutex); + } + _free_blocks_memory_usage->add(free_blocks_memory_usage); + } } bool has_enough_space_in_blocks_queue() const override { return _current_used_bytes < _max_bytes_in_queue / 2 * _max_queue_size; } + virtual void _dispose_coloate_blocks_not_in_queue() override { + if (_need_colocate_distribute) { + for (int i = 0; i < _max_queue_size; ++i) { + std::scoped_lock s(*_colocate_block_mutexs[i], *_queue_mutexs[i]); + if (_colocate_blocks[i] && !_colocate_blocks[i]->empty()) { + _current_used_bytes += _colocate_blocks[i]->allocated_bytes(); + _blocks_queues[i].emplace_back(std::move(_colocate_blocks[i])); + _colocate_mutable_blocks[i]->clear(); + } + } + } + } + private: int _max_queue_size = 1; int _next_queue_to_feed = 0; std::vector> _queue_mutexs; std::vector> _blocks_queues; std::atomic_int64_t _current_used_bytes = 0; + + const std::vector& _col_distribute_ids; + const bool _need_colocate_distribute; + std::vector _colocate_blocks; + std::vector> _colocate_mutable_blocks; + std::vector> _colocate_block_mutexs; + + void _add_rows_colocate_blocks(vectorized::Block* block, int loc, + const std::vector& rows) { + int row_wait_add = rows.size(); + const int batch_size = _batch_size; + const int* begin = &rows[0]; + std::lock_guard l(*_colocate_block_mutexs[loc]); + + while (row_wait_add > 0) { + int row_add = 0; + int max_add = batch_size - _colocate_mutable_blocks[loc]->rows(); + if (row_wait_add >= max_add) { + row_add = max_add; + } else { + row_add = row_wait_add; + } + + _colocate_mutable_blocks[loc]->add_rows(block, begin, begin + row_add); + row_wait_add -= row_add; + begin += row_add; + + if (row_add == max_add) { + _current_used_bytes += _colocate_blocks[loc]->allocated_bytes(); + { + std::lock_guard queue_l(*_queue_mutexs[loc]); + _blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc])); + } + bool get_block_not_empty = true; + _colocate_blocks[loc] = get_free_block(&get_block_not_empty, get_block_not_empty); + _colocate_mutable_blocks[loc]->set_muatable_columns( + _colocate_blocks[loc]->mutate_columns()); + } + } + } }; } // namespace pipeline } // namespace doris diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 0b3cbcd3d7..f9b6ad4831 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -114,14 +114,17 @@ Status ScannerContext::init() { return Status::OK(); } -vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block) { +vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block, + bool get_block_not_empty) { { std::lock_guard l(_free_blocks_lock); if (!_free_blocks.empty()) { - auto block = std::move(_free_blocks.back()); - _free_blocks.pop_back(); - _free_blocks_memory_usage->add(-block->allocated_bytes()); - return block; + if (!get_block_not_empty || _free_blocks.back()->mem_reuse()) { + auto block = std::move(_free_blocks.back()); + _free_blocks.pop_back(); + _free_blocks_memory_usage->add(-block->allocated_bytes()); + return block; + } } } *has_free_block = false; @@ -319,6 +322,7 @@ void ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) { // same scanner. if (scanner->need_to_close() && scanner->set_counted_down() && (--_num_unfinished_scanners) == 0) { + _dispose_coloate_blocks_not_in_queue(); _is_finished = true; _blocks_queue_added_cv.notify_one(); } diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 1d937c5fb8..ea887e9ad3 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -55,7 +55,7 @@ public: virtual ~ScannerContext() = default; Status init(); - vectorized::BlockUPtr get_free_block(bool* has_free_block); + vectorized::BlockUPtr get_free_block(bool* has_free_block, bool get_not_empty_block = false); void return_free_block(std::unique_ptr block); // Append blocks from scanners to the blocks queue. @@ -140,6 +140,8 @@ private: Status _close_and_clear_scanners(VScanNode* node, RuntimeState* state); protected: + virtual void _dispose_coloate_blocks_not_in_queue() {} + RuntimeState* _state; VScanNode* _parent; diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 1286baa573..191091d6ed 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -261,9 +261,9 @@ Status VScanNode::_init_profile() { Status VScanNode::_start_scanners(const std::list& scanners) { if (_is_pipeline_scan) { - _scanner_ctx.reset(new pipeline::PipScannerContext(_state, this, _input_tuple_desc, - _output_tuple_desc, scanners, limit(), - _state->query_options().mem_limit / 20)); + _scanner_ctx.reset(new pipeline::PipScannerContext( + _state, this, _input_tuple_desc, _output_tuple_desc, scanners, limit(), + _state->query_options().mem_limit / 20, _col_distribute_ids)); } else { _scanner_ctx.reset(new ScannerContext(_state, this, _input_tuple_desc, _output_tuple_desc, scanners, limit(), diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index de641e6055..da25b63024 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -312,6 +312,7 @@ protected: RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage; std::unordered_map _colname_to_slot_id; + std::vector _col_distribute_ids; private: // Register and get all runtime filters at Init phase. diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 43e911674a..9519b1e6fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -932,9 +932,11 @@ public class DistributedPlanner { return createPhase2DistinctAggregationFragment(node, childFragment, fragments); } else { if (canColocateAgg(node.getAggInfo(), childFragment.getDataPartition()) - && childFragment.getPlanRoot().shouldColoAgg()) { + || childFragment.getPlanRoot().shouldColoAgg(node.getAggInfo())) { + childFragment.getPlanRoot().setShouldColoScan(); childFragment.addPlanRoot(node); - childFragment.setHasColocatePlanNode(true); + // pipeline here should use shared scan to improve performance + childFragment.setHasColocatePlanNode(!ConnectContext.get().getSessionVariable().enablePipelineEngine()); return childFragment; } else { return createMergeAggregationFragment(node, childFragment); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 59a8e3283d..9cc7e14451 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -17,6 +17,7 @@ package org.apache.doris.planner; +import org.apache.doris.analysis.AggregateInfo; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.BaseTableRef; import org.apache.doris.analysis.BinaryPredicate; @@ -185,15 +186,28 @@ public class OlapScanNode extends ScanNode { private Map pointQueryEqualPredicats; private DescriptorTable descTable; + private Set distributionColumnIds; + + private boolean shouldColoScan = false; + // Constructs node to scan given data files of table 'tbl'. public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) { super(id, desc, planNodeName, StatisticalType.OLAP_SCAN_NODE); olapTable = (OlapTable) desc.getTable(); + distributionColumnIds = Sets.newTreeSet(); + + Set distColumnName = olapTable != null + ? olapTable.getDistributionColumnNames() : Sets.newTreeSet(); + int columnId = 0; // use for Nereids to generate uniqueId set for inverted index to avoid scan unnecessary big size column for (SlotDescriptor slotDescriptor : desc.getSlots()) { if (slotDescriptor.getColumn() != null) { outputColumnUniqueIds.add(slotDescriptor.getColumn().getUniqueId()); + if (distColumnName.contains(slotDescriptor.getColumn().getName().toLowerCase())) { + distributionColumnIds.add(columnId); + } } + columnId++; } } @@ -1166,17 +1180,48 @@ public class OlapScanNode extends ScanNode { } @Override - public boolean shouldColoAgg() { - // In pipeline exec engine, the instance num is parallel instance. we should disable colo agg - // in parallelInstance >= tablet_num * 2 to use more thread to speed up the query - if (ConnectContext.get().getSessionVariable().enablePipelineEngine()) { - int parallelInstance = ConnectContext.get().getSessionVariable().getParallelExecInstanceNum(); - return parallelInstance < result.size() * 2; + public boolean shouldColoAgg(AggregateInfo aggregateInfo) { + distributionColumnIds.clear(); + if (ConnectContext.get().getSessionVariable().enablePipelineEngine() + && ConnectContext.get().getSessionVariable().enableColocateScan()) { + List aggPartitionExprs = aggregateInfo.getInputPartitionExprs(); + List slots = desc.getSlots(); + for (Expr aggExpr : aggPartitionExprs) { + if (aggExpr instanceof SlotRef) { + SlotDescriptor slotDesc = ((SlotRef) aggExpr).getDesc(); + int columnId = 0; + for (SlotDescriptor slotDescriptor : slots) { + if (slotDescriptor.equals(slotDesc)) { + if (slotDescriptor.getType().isFixedLengthType() + || slotDescriptor.getType().isStringType()) { + distributionColumnIds.add(columnId); + } else { + return false; + } + } + columnId++; + } + } + } + + for (int i = 0; i < slots.size(); i++) { + if (!distributionColumnIds.contains(i) && (!slots.get(i).getType().isFixedLengthType() + || slots.get(i).getType().isStringType())) { + return false; + } + } + + return !distributionColumnIds.isEmpty(); } else { - return true; + return false; } } + @Override + public void setShouldColoScan() { + shouldColoScan = true; + } + @Override protected void toThrift(TPlanNode msg) { List keyColumnNames = new ArrayList(); @@ -1233,6 +1278,10 @@ public class OlapScanNode extends ScanNode { if (outputColumnUniqueIds != null) { msg.olap_scan_node.setOutputColumnUniqueIds(outputColumnUniqueIds); } + + if (shouldColoScan) { + msg.olap_scan_node.setDistributeColumnIds(new ArrayList<>(distributionColumnIds)); + } } // export some tablets diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index 3d6ad9f57b..4e3412b760 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -20,6 +20,7 @@ package org.apache.doris.planner; +import org.apache.doris.analysis.AggregateInfo; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.BitmapFilterPredicate; import org.apache.doris.analysis.CompoundPredicate; @@ -835,10 +836,12 @@ public abstract class PlanNode extends TreeNode implements PlanStats { return numInstances; } - public boolean shouldColoAgg() { - return true; + public boolean shouldColoAgg(AggregateInfo aggregateInfo) { + return false; } + public void setShouldColoScan() {} + public void setNumInstances(int numInstances) { this.numInstances = numInstances; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 34764de514..9a12e3a168 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -96,6 +96,7 @@ public class SessionVariable implements Serializable, Writable { public static final String BATCH_SIZE = "batch_size"; public static final String DISABLE_STREAMING_PREAGGREGATIONS = "disable_streaming_preaggregations"; public static final String DISABLE_COLOCATE_PLAN = "disable_colocate_plan"; + public static final String ENABLE_COLOCATE_SCAN = "enable_colocate_scan"; public static final String ENABLE_BUCKET_SHUFFLE_JOIN = "enable_bucket_shuffle_join"; public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num"; public static final String ENABLE_INSERT_STRICT = "enable_insert_strict"; @@ -454,6 +455,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = DISABLE_COLOCATE_PLAN) public boolean disableColocatePlan = false; + @VariableMgr.VarAttr(name = ENABLE_COLOCATE_SCAN) + public boolean enableColocateScan = false; + @VariableMgr.VarAttr(name = ENABLE_BUCKET_SHUFFLE_JOIN) public boolean enableBucketShuffleJoin = true; @@ -1097,6 +1101,10 @@ public class SessionVariable implements Serializable, Writable { return disableColocatePlan; } + public boolean enableColocateScan() { + return enableColocateScan; + } + public boolean isEnableBucketShuffleJoin() { return enableBucketShuffleJoin; } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index b5dda4e6d9..0eaf1d61a0 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -581,6 +581,7 @@ struct TOlapScanNode { 13: optional bool use_topn_opt 14: optional list indexes_desc 15: optional set output_column_unique_ids + 16: optional list distribute_column_ids } struct TEqJoinCondition {