diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 0b0c356367..a2b1afbe48 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -206,9 +206,6 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re if (request.__isset.load_job_id) { _runtime_state->set_load_job_id(request.load_job_id); } - if (request.__isset.shared_scan_opt) { - _runtime_state->set_shared_scan_opt(request.shared_scan_opt); - } if (request.query_options.__isset.is_report_success) { fragment_context->set_is_report_success(request.query_options.is_report_success); @@ -258,7 +255,10 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re auto* scan_node = static_cast(scan_nodes[i]); const std::vector& scan_ranges = find_with_default( local_params.per_node_scan_ranges, scan_node->id(), no_scan_ranges); + const bool shared_scan = + find_with_default(local_params.per_node_shared_scans, scan_node->id(), false); scan_node->set_scan_ranges(scan_ranges); + scan_node->set_shared_scan(_runtime_state.get(), shared_scan); } else { ScanNode* scan_node = static_cast(node); const std::vector& scan_ranges = find_with_default( diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 02b9f0f5c3..70e771c5c3 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -201,10 +201,6 @@ public: int64_t load_job_id() const { return _load_job_id; } - void set_shared_scan_opt(bool shared_scan_opt) { _shared_scan_opt = shared_scan_opt; } - - bool shared_scan_opt() const { return _shared_scan_opt; } - const std::string get_error_log_file_path() const { return _error_log_file_path; } // append error msg and error line to file when loading data. @@ -458,7 +454,6 @@ private: std::string _db_name; std::string _load_dir; int64_t _load_job_id; - bool _shared_scan_opt = false; // mini load int64_t _normal_row_number; diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 1c1cd3f73f..8f5d7aac91 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -55,7 +55,7 @@ Status ScannerContext::init() { // 1. Calculate max concurrency // TODO: now the max thread num <= config::doris_scanner_thread_pool_thread_num / 4 // should find a more reasonable value. - _max_thread_num = _state->shared_scan_opt() ? config::doris_scanner_thread_pool_thread_num + _max_thread_num = _parent->_shared_scan_opt ? config::doris_scanner_thread_pool_thread_num : config::doris_scanner_thread_pool_thread_num / 4; _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num; DCHECK(_max_thread_num > 0); diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 22378d2744..a1d07b1f15 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -67,7 +67,6 @@ Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode, state)); _state = state; _is_pipeline_scan = state->enable_pipeline_exec(); - _shared_scan_opt = state->shared_scan_opt(); const TQueryOptions& query_options = state->query_options(); if (query_options.__isset.max_scan_key_num) { diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index da25b63024..996a8ea6d7 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -74,6 +74,23 @@ public: virtual void set_scan_ranges(const std::vector& scan_ranges) {} + void set_shared_scan(RuntimeState* state, bool shared_scan) { + _shared_scan_opt = shared_scan; + if (_is_pipeline_scan) { + if (_shared_scan_opt) { + _shared_scanner_controller = + state->get_query_fragments_ctx()->get_shared_scanner_controller(); + auto [should_create_scanner, queue_id] = + _shared_scanner_controller->should_build_scanner_and_queue_id(id()); + _should_create_scanner = should_create_scanner; + _context_queue_id = queue_id; + } else { + _should_create_scanner = true; + _context_queue_id = 0; + } + } + } + // Get next block. // If eos is true, no more data will be read and block should be empty. Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 9cc7e14451..cee6ff90d0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -1222,6 +1222,11 @@ public class OlapScanNode extends ScanNode { shouldColoScan = true; } + @Override + public boolean getShouldColoScan() { + return shouldColoScan; + } + @Override protected void toThrift(TPlanNode msg) { List keyColumnNames = new ArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index 7ab8963de3..4a807a6452 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -850,6 +850,10 @@ public abstract class PlanNode extends TreeNode implements PlanStats { public void setShouldColoScan() {} + public boolean getShouldColoScan() { + return false; + } + public void setNumInstances(int numInstances) { this.numInstances = numInstances; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 64c26dd753..597978eef4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -139,6 +139,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -1631,7 +1632,6 @@ public class Coordinator { bucketShuffleJoinController.computeInstanceParam(fragment.getFragmentId(), parallelExecInstanceNum, params); } else { - params.sharedScanOpt = true; // case A for (Entry>> entry : fragmentExecParamsMap.get( fragment.getFragmentId()).scanRangeAssignment.entrySet()) { @@ -1641,7 +1641,14 @@ public class Coordinator { for (Integer planNodeId : value.keySet()) { List perNodeScanRanges = value.get(planNodeId); List> perInstanceScanRanges = Lists.newArrayList(); - if (!enablePipelineEngine) { + List sharedScanOpts = Lists.newArrayList(); + + Optional node = scanNodes.stream().filter(scanNode -> { + return scanNode.getId().asInt() == planNodeId; + }).findFirst(); + + if (!enablePipelineEngine || perNodeScanRanges.size() > parallelExecInstanceNum + || (node.isPresent() && node.get().getShouldColoScan())) { int expectedInstanceNum = 1; if (parallelExecInstanceNum > 1) { //the scan instance num should not larger than the tablets num @@ -1649,19 +1656,24 @@ public class Coordinator { } perInstanceScanRanges = ListUtil.splitBySize(perNodeScanRanges, expectedInstanceNum); + sharedScanOpts = Collections.nCopies(perInstanceScanRanges.size(), false); } else { int expectedInstanceNum = Math.min(parallelExecInstanceNum, leftMostNode.getNumInstances()); - for (int j = 0; j < Math.max(expectedInstanceNum, 1); j++) { - perInstanceScanRanges.add(perNodeScanRanges); - } + expectedInstanceNum = Math.max(expectedInstanceNum, 1); + perInstanceScanRanges = Collections.nCopies(expectedInstanceNum, perNodeScanRanges); + sharedScanOpts = Collections.nCopies(perInstanceScanRanges.size(), true); } LOG.debug("scan range number per instance is: {}", perInstanceScanRanges.size()); - for (List scanRangeParams : perInstanceScanRanges) { + for (int j = 0; j < perInstanceScanRanges.size(); j++) { + List scanRangeParams = perInstanceScanRanges.get(j); + boolean sharedScan = sharedScanOpts.get(j); + FInstanceExecParam instanceParam = new FInstanceExecParam(null, key, 0, params); instanceParam.perNodeScanRanges.put(planNodeId, scanRangeParams); + instanceParam.perNodeSharedScans.put(planNodeId, sharedScan); params.instanceExecParams.add(instanceParam); } } @@ -3059,8 +3071,6 @@ public class Coordinator { public List instanceExecParams = Lists.newArrayList(); public FragmentScanRangeAssignment scanRangeAssignment = new FragmentScanRangeAssignment(); - public boolean sharedScanOpt = false; - public FragmentExecParams(PlanFragment fragment) { this.fragment = fragment; } @@ -3152,7 +3162,6 @@ public class Coordinator { fragment.isTransferQueryStatisticsWithEveryBatch()); params.setFragment(fragment.toThrift()); params.setLocalParams(Lists.newArrayList()); - params.setSharedScanOpt(sharedScanOpt); if (tResourceGroups != null) { params.setResourceGroups(tResourceGroups); } @@ -3164,10 +3173,13 @@ public class Coordinator { localParams.setBuildHashTableForBroadcastJoin(instanceExecParam.buildHashTableForBroadcastJoin); localParams.setFragmentInstanceId(instanceExecParam.instanceId); Map> scanRanges = instanceExecParam.perNodeScanRanges; + Map perNodeSharedScans = instanceExecParam.perNodeSharedScans; if (scanRanges == null) { scanRanges = Maps.newHashMap(); + perNodeSharedScans = Maps.newHashMap(); } localParams.setPerNodeScanRanges(scanRanges); + localParams.setPerNodeSharedScans(perNodeSharedScans); localParams.setSenderId(i); localParams.setBackendNum(backendNum++); localParams.setRuntimeFilterParams(new TRuntimeFilterParams()); @@ -3265,6 +3277,7 @@ public class Coordinator { TUniqueId instanceId; TNetworkAddress host; Map> perNodeScanRanges = Maps.newHashMap(); + Map perNodeSharedScans = Maps.newHashMap(); int perFragmentInstanceIdx; diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 617ea8c478..1d3e636425 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -281,6 +281,7 @@ struct TPlanFragmentExecParams { 11: optional bool send_query_statistics_with_every_batch // Used to merge and send runtime filter 12: optional TRuntimeFilterParams runtime_filter_params + } // Global query parameters assigned by the coordinator. @@ -572,6 +573,7 @@ struct TPipelineInstanceParams { 4: optional i32 sender_id 5: optional TRuntimeFilterParams runtime_filter_params 6: optional i32 backend_num + 7: optional map per_node_shared_scans } struct TPipelineResourceGroup { @@ -608,7 +610,6 @@ struct TPipelineFragmentParams { 22: optional TGlobalDict global_dict // scan node could use the global dict to encode the string value to an integer 23: optional Planner.TPlanFragment fragment 24: list local_params - 25: optional bool shared_scan_opt = false; 26: optional list resource_groups }