diff --git a/be/src/common/config.h b/be/src/common/config.h index 44749c095c..10aeeff6d5 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -840,6 +840,10 @@ CONF_Int32(doris_remote_scanner_thread_pool_thread_num, "16"); // number of s3 scanner thread pool queue size CONF_Int32(doris_remote_scanner_thread_pool_queue_size, "10240"); +// If set to true, the new scan node framework will be used. +// This config should be removed when the new scan node is ready. +CONF_Bool(enable_new_scan_node, "false"); + #ifdef BE_TEST // test s3 CONF_String(test_s3_resource, "resource"); diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 5c4981c739..66827019f9 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -62,6 +62,7 @@ #include "vec/core/block.h" #include "vec/exec/file_scan_node.h" #include "vec/exec/join/vhash_join_node.h" +#include "vec/exec/scan/new_olap_scan_node.h" #include "vec/exec/vaggregation_node.h" #include "vec/exec/vanalytic_eval_node.h" #include "vec/exec/vassert_num_rows_node.h" @@ -213,12 +214,15 @@ Status ExecNode::prepare(RuntimeState* state) { if (_vconjunct_ctx_ptr) { RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->prepare(state, _row_descriptor)); } - if (typeid(*this) != typeid(doris::vectorized::VOlapScanNode)) { + + // For vectorized olap scan node, the conjuncts is prepared in _vconjunct_ctx_ptr. + // And _conjunct_ctxs is useless. + // TODO: Should be removed when non-vec engine is removed. + if (typeid(*this) != typeid(doris::vectorized::VOlapScanNode) && + typeid(*this) != typeid(doris::vectorized::NewOlapScanNode)) { RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state, _row_descriptor)); } - // TODO(zc): - // AddExprCtxsToFree(_conjunct_ctxs); for (int i = 0; i < _children.size(); ++i) { RETURN_IF_ERROR(_children[i]->prepare(state)); } @@ -231,7 +235,8 @@ Status ExecNode::open(RuntimeState* state) { if (_vconjunct_ctx_ptr) { RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->open(state)); } - if (typeid(*this) != typeid(doris::vectorized::VOlapScanNode)) { + if (typeid(*this) != typeid(doris::vectorized::VOlapScanNode) && + typeid(*this) != typeid(doris::vectorized::NewOlapScanNode)) { return Expr::open(_conjunct_ctxs, state); } else { return Status::OK(); @@ -275,7 +280,8 @@ Status ExecNode::close(RuntimeState* state) { if (_vconjunct_ctx_ptr) { (*_vconjunct_ctx_ptr)->close(state); } - if (typeid(*this) != typeid(doris::vectorized::VOlapScanNode)) { + if (typeid(*this) != typeid(doris::vectorized::VOlapScanNode) && + typeid(*this) != typeid(doris::vectorized::NewOlapScanNode)) { Expr::close(_conjunct_ctxs, state); } @@ -453,7 +459,11 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN case TPlanNodeType::OLAP_SCAN_NODE: if (state->enable_vectorized_exec()) { - *node = pool->add(new vectorized::VOlapScanNode(pool, tnode, descs)); + if (config::enable_new_scan_node) { + *node = pool->add(new vectorized::NewOlapScanNode(pool, tnode, descs)); + } else { + *node = pool->add(new vectorized::VOlapScanNode(pool, tnode, descs)); + } } else { *node = pool->add(new OlapScanNode(pool, tnode, descs)); } @@ -682,8 +692,16 @@ void ExecNode::try_do_aggregate_serde_improve() { return; } - ScanNode* scan_node = static_cast(agg_node[0]->_children[0]); - scan_node->set_no_agg_finalize(); + // TODO(cmy): should be removed when NewOlapScanNode is ready + ExecNode* child0 = agg_node[0]->_children[0]; + if (typeid(*child0) == typeid(vectorized::NewOlapScanNode)) { + vectorized::VScanNode* scan_node = + static_cast(agg_node[0]->_children[0]); + scan_node->set_no_agg_finalize(); + } else { + ScanNode* scan_node = static_cast(agg_node[0]->_children[0]); + scan_node->set_no_agg_finalize(); + } } void ExecNode::init_runtime_profile(const std::string& name) { diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 99caa8afb9..c6759b2727 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -1491,13 +1491,6 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { } } - ThreadPoolToken* thread_token = nullptr; - if (limit() != -1 && limit() < 1024) { - thread_token = state->get_query_fragments_ctx()->get_serial_token(); - } else { - thread_token = state->get_query_fragments_ctx()->get_token(); - } - /********************************* * The basic strategy of priority scheduling: * 1. Determine the initial nice value by querying the number of split ranges @@ -1508,6 +1501,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { * The larger the nice value, the more preferentially obtained query resources * 4. Regularly increase the priority of the remaining tasks in the queue to avoid starvation for large queries *********************************/ + ThreadPoolToken* thread_token = state->get_query_fragments_ctx()->get_token(); PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool(); PriorityThreadPool* remote_thread_pool = state->exec_env()->remote_scan_thread_pool(); _total_assign_num = 0; diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 6a3b71ca4d..bb10b75aaa 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -24,7 +24,8 @@ namespace doris { namespace vectorized { class VDataStreamMgr; -} +class ScannerScheduler; +} // namespace vectorized class BfdParser; class BrokerMgr; @@ -155,6 +156,7 @@ public: StreamLoadExecutor* stream_load_executor() { return _stream_load_executor; } RoutineLoadTaskExecutor* routine_load_task_executor() { return _routine_load_task_executor; } HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; } + doris::vectorized::ScannerScheduler* scanner_scheduler() { return _scanner_scheduler; } private: Status _init(const std::vector& store_paths); @@ -232,6 +234,7 @@ private: SmallFileMgr* _small_file_mgr = nullptr; HeartbeatFlags* _heartbeat_flags = nullptr; StoragePolicyMgr* _storage_policy_mgr = nullptr; + doris::vectorized::ScannerScheduler* _scanner_scheduler = nullptr; }; template <> diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 360e2e7a54..3057681596 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -58,6 +58,7 @@ #include "util/pretty_printer.h" #include "util/priority_thread_pool.hpp" #include "util/priority_work_stealing_thread_pool.hpp" +#include "vec/exec/scan/scanner_scheduler.h" #include "vec/runtime/vdata_stream_mgr.h" #if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && \ @@ -129,6 +130,8 @@ Status ExecEnv::_init(const std::vector& store_paths) { .set_max_queue_size(config::send_batch_thread_pool_queue_size) .build(&_send_batch_thread_pool); + _scanner_scheduler = new doris::vectorized::ScannerScheduler(); + _cgroups_mgr = new CgroupsMgr(this, config::doris_cgroups); _fragment_mgr = new FragmentMgr(this); _result_cache = new ResultCache(config::query_cache_max_size_mb, @@ -160,6 +163,8 @@ Status ExecEnv::_init(const std::vector& store_paths) { } _broker_mgr->init(); _small_file_mgr->init(); + _scanner_scheduler->init(this); + _init_mem_tracker(); RETURN_IF_ERROR( @@ -360,6 +365,7 @@ void ExecEnv::_destroy() { SAFE_DELETE(_heartbeat_flags); SAFE_DELETE(_task_pool_mem_tracker_registry); SAFE_DELETE(_buffer_reservation); + SAFE_DELETE(_scanner_scheduler); DEREGISTER_HOOK_METRIC(query_mem_consumption); DEREGISTER_HOOK_METRIC(load_mem_consumption); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 7b60e9d406..60e4d2ffc4 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -245,6 +245,7 @@ Status FragmentExecState::execute() { CgroupsMgr::apply_system_cgroup(); WARN_IF_ERROR(_executor.open(), strings::Substitute("Got error while opening fragment $0", print_id(_fragment_instance_id))); + _executor.close(); } DorisMetrics::instance()->fragment_requests_total->increment(1); @@ -635,21 +636,33 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi fragments_ctx->set_rsc_info = true; } - if (params.__isset.query_options) { - fragments_ctx->timeout_second = params.query_options.query_timeout; - if (params.query_options.__isset.resource_limit) { - fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit); - } + fragments_ctx->timeout_second = params.query_options.query_timeout; + +#ifndef BE_TEST + // set thread token + // the thread token will be set if + // 1. the cpu_limit is set, or + // 2. the limit is very small ( < 1024) + int concurrency = 1; + bool is_serial = false; + if (params.query_options.__isset.resource_limit && + params.query_options.resource_limit.__isset.cpu_limit) { + concurrency = params.query_options.resource_limit.cpu_limit; + } else { + concurrency = config::doris_scanner_thread_pool_thread_num; } if (params.__isset.fragment && params.fragment.__isset.plan && params.fragment.plan.nodes.size() > 0) { for (auto& node : params.fragment.plan.nodes) { if (node.limit > 0 && node.limit < 1024) { - fragments_ctx->set_serial_thread_token(); + concurrency = 1; + is_serial = true; break; } } } + fragments_ctx->set_thread_token(concurrency, is_serial); +#endif { // Find _fragments_ctx_map again, in case some other request has already diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 6599785a41..90870b6a4e 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -47,6 +47,7 @@ #include "util/telemetry/telemetry.h" #include "util/uid_util.h" #include "vec/core/block.h" +#include "vec/exec/scan/new_olap_scan_node.h" #include "vec/exec/vexchange_node.h" #include "vec/runtime/vdata_stream_mgr.h" @@ -165,11 +166,20 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, _plan->try_do_aggregate_serde_improve(); for (int i = 0; i < scan_nodes.size(); ++i) { - ScanNode* scan_node = static_cast(scan_nodes[i]); - const std::vector& scan_ranges = - find_with_default(params.per_node_scan_ranges, scan_node->id(), no_scan_ranges); - scan_node->set_scan_ranges(scan_ranges); - VLOG_CRITICAL << "scan_node_Id=" << scan_node->id() << " size=" << scan_ranges.size(); + // TODO(cmy): this "if...else" should be removed once all ScanNode are derived from VScanNode. + ExecNode* node = scan_nodes[i]; + if (typeid(*node) == typeid(vectorized::NewOlapScanNode)) { + vectorized::VScanNode* scan_node = static_cast(scan_nodes[i]); + const std::vector& scan_ranges = + find_with_default(params.per_node_scan_ranges, scan_node->id(), no_scan_ranges); + scan_node->set_scan_ranges(scan_ranges); + } else { + ScanNode* scan_node = static_cast(scan_nodes[i]); + const std::vector& scan_ranges = + find_with_default(params.per_node_scan_ranges, scan_node->id(), no_scan_ranges); + scan_node->set_scan_ranges(scan_ranges); + VLOG_CRITICAL << "scan_node_Id=" << scan_node->id() << " size=" << scan_ranges.size(); + } } _runtime_state->set_per_fragment_instance_idx(params.sender_id); diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_fragments_ctx.h index 78d84cb4b2..80a07f47e4 100644 --- a/be/src/runtime/query_fragments_ctx.h +++ b/be/src/runtime/query_fragments_ctx.h @@ -52,22 +52,15 @@ public: return false; } - void set_thread_token(int cpu_limit) { - if (cpu_limit > 0) { - // For now, cpu_limit will be the max concurrency of the scan thread pool token. - _thread_token = _exec_env->limited_scan_thread_pool()->new_token( - ThreadPool::ExecutionMode::CONCURRENT, cpu_limit); - } - } - void set_serial_thread_token() { - _serial_thread_token = _exec_env->limited_scan_thread_pool()->new_token( - ThreadPool::ExecutionMode::SERIAL, 1); + void set_thread_token(int concurrency, bool is_serial) { + _thread_token = _exec_env->limited_scan_thread_pool()->new_token( + is_serial ? ThreadPool::ExecutionMode::SERIAL + : ThreadPool::ExecutionMode::CONCURRENT, + concurrency); } ThreadPoolToken* get_token() { return _thread_token.get(); } - ThreadPoolToken* get_serial_token() { return _serial_thread_token.get(); } - void set_ready_to_execute() { { std::lock_guard l(_start_lock); @@ -114,11 +107,6 @@ private: // If this token is not set, the scanner will be executed in "_scan_thread_pool" in exec env. std::unique_ptr _thread_token; - // A token used to submit olap scanner to the "_limited_scan_thread_pool" serially, it used for - // query like `select * limit 1`, this query used for limit the max scaner thread to 1 to avoid - // this query cost too much resource - std::unique_ptr _serial_thread_token; - std::mutex _start_lock; std::condition_variable _start_cond; // Only valid when _need_wait_execution_trigger is set to true in FragmentExecState. diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 8743013590..f1fc4b4ec2 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -232,8 +232,15 @@ set(VEC_FILES exec/format/parquet/schema_desc.cpp exec/format/parquet/vparquet_column_reader.cpp exec/format/parquet/level_decoder.cpp - exec/format/parquet/parquet_common.cpp) + exec/format/parquet/parquet_common.cpp + exec/scan/vscan_node.cpp + exec/scan/vscanner.cpp + exec/scan/scanner_context.cpp + exec/scan/scanner_scheduler.cpp + exec/scan/new_olap_scan_node.cpp + exec/scan/new_olap_scanner.cpp +) add_library(Vec STATIC - ${VEC_FILES} -) + ${VEC_FILES} + ) diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp new file mode 100644 index 0000000000..bd48e288ec --- /dev/null +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -0,0 +1,307 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/scan/new_olap_scan_node.h" + +#include "olap/storage_engine.h" +#include "olap/tablet.h" +#include "vec/columns/column_const.h" +#include "vec/exec/scan/new_olap_scanner.h" +#include "vec/functions/in.h" + +namespace doris::vectorized { + +NewOlapScanNode::NewOlapScanNode(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs) + : VScanNode(pool, tnode, descs), _olap_scan_node(tnode.olap_scan_node) { + _output_tuple_id = tnode.olap_scan_node.tuple_id; + if (_olap_scan_node.__isset.sort_info && _olap_scan_node.__isset.sort_limit) { + _limit_per_scanner = _olap_scan_node.sort_limit; + } +} + +Status NewOlapScanNode::prepare(RuntimeState* state) { + RETURN_IF_ERROR(VScanNode::prepare(state)); + _scanner_mem_tracker = std::make_unique("OlapScanners"); + return Status::OK(); +} + +Status NewOlapScanNode::_init_profile() { + return Status::OK(); +} + +Status NewOlapScanNode::_process_conjuncts() { + RETURN_IF_ERROR(VScanNode::_process_conjuncts()); + if (_eos) { + return Status::OK(); + } + RETURN_IF_ERROR(_build_key_ranges_and_filters()); + return Status::OK(); +} + +Status NewOlapScanNode::_build_key_ranges_and_filters() { + const std::vector& column_names = _olap_scan_node.key_column_name; + const std::vector& column_types = _olap_scan_node.key_column_type; + DCHECK(column_types.size() == column_names.size()); + + // 1. construct scan key except last olap engine short key + _scan_keys.set_is_convertible(limit() == -1); + + // we use `exact_range` to identify a key range is an exact range or not when we convert + // it to `_scan_keys`. If `exact_range` is true, we can just discard it from `_olap_filters`. + bool exact_range = true; + for (int column_index = 0; column_index < column_names.size() && !_scan_keys.has_range_value(); + ++column_index) { + auto iter = _colname_to_value_range.find(column_names[column_index]); + if (_colname_to_value_range.end() == iter) { + break; + } + + RETURN_IF_ERROR(std::visit( + [&](auto&& range) { + RETURN_IF_ERROR( + _scan_keys.extend_scan_key(range, _max_scan_key_num, &exact_range)); + if (exact_range) { + _colname_to_value_range.erase(iter->first); + } + return Status::OK(); + }, + iter->second)); + } + + for (auto& iter : _colname_to_value_range) { + std::vector filters; + std::visit([&](auto&& range) { range.to_olap_filter(filters); }, iter.second); + + for (const auto& filter : filters) { + _olap_filters.push_back(std::move(filter)); + } + } + + // _runtime_profile->add_info_string("PushDownPredicate", olap_filters_to_string(_olap_filters)); + // _runtime_profile->add_info_string("KeyRanges", _scan_keys.debug_string()); + VLOG_CRITICAL << _scan_keys.debug_string(); + + return Status::OK(); +} + +bool NewOlapScanNode::_should_push_down_binary_predicate( + VectorizedFnCall* fn_call, VExprContext* expr_ctx, StringRef* constant_val, + int* slot_ref_child, const std::function& fn_checker) { + if (!fn_checker(fn_call->fn().name.function_name)) { + return false; + } + + const auto& children = fn_call->children(); + DCHECK(children.size() == 2); + for (size_t i = 0; i < children.size(); i++) { + if (VExpr::expr_without_cast(children[i])->node_type() != TExprNodeType::SLOT_REF) { + // not a slot ref(column) + continue; + } + if (!children[1 - i]->is_constant()) { + // only handle constant value + return false; + } else { + if (const ColumnConst* const_column = check_and_get_column( + children[1 - i]->get_const_col(expr_ctx)->column_ptr)) { + *slot_ref_child = i; + *constant_val = const_column->get_data_at(0); + } else { + return false; + } + } + } + return true; +} + +bool NewOlapScanNode::_should_push_down_in_predicate(VInPredicate* pred, VExprContext* expr_ctx, + bool is_not_in) { + if (pred->is_not_in() != is_not_in) { + return false; + } + InState* state = reinterpret_cast( + expr_ctx->fn_context(pred->fn_context_index()) + ->get_function_state(FunctionContext::FRAGMENT_LOCAL)); + HybridSetBase* set = state->hybrid_set.get(); + + // if there are too many elements in InPredicate, exceed the limit, + // we will not push any condition of this column to storage engine. + // because too many conditions pushed down to storage engine may even + // slow down the query process. + // ATTN: This is just an experience value. You may need to try + // different thresholds to improve performance. + if (set->size() > _max_pushdown_conditions_per_column) { + VLOG_NOTICE << "Predicate value num " << set->size() << " exceed limit " + << _max_pushdown_conditions_per_column; + return false; + } + return true; +} + +bool NewOlapScanNode::_should_push_down_function_filter(VectorizedFnCall* fn_call, + VExprContext* expr_ctx, + StringVal* constant_str, + doris_udf::FunctionContext** fn_ctx) { + // Now only `like` function filters is supported to push down + if (fn_call->fn().name.function_name != "like") { + return false; + } + + const auto& children = fn_call->children(); + doris_udf::FunctionContext* func_cxt = expr_ctx->fn_context(fn_call->fn_context_index()); + DCHECK(func_cxt != nullptr); + DCHECK(children.size() == 2); + for (size_t i = 0; i < children.size(); i++) { + if (VExpr::expr_without_cast(children[i])->node_type() != TExprNodeType::SLOT_REF) { + // not a slot ref(column) + continue; + } + if (!children[1 - i]->is_constant()) { + // only handle constant value + return false; + } else { + DCHECK(children[1 - i]->type().is_string_type()); + if (const ColumnConst* const_column = check_and_get_column( + children[1 - i]->get_const_col(expr_ctx)->column_ptr)) { + *constant_str = const_column->get_data_at(0).to_string_val(); + } else { + return false; + } + } + } + *fn_ctx = func_cxt; + return true; +} + +// PlanFragmentExecutor will call this method to set scan range +// Doris scan range is defined in thrift file like this +// struct TPaloScanRange { +// 1: required list hosts +// 2: required string schema_hash +// 3: required string version +// 5: required Types.TTabletId tablet_id +// 6: required string db_name +// 7: optional list partition_column_ranges +// 8: optional string index_name +// 9: optional string table_name +//} +// every doris_scan_range is related with one tablet so that one olap scan node contains multiple tablet +void NewOlapScanNode::set_scan_ranges(const std::vector& scan_ranges) { + for (auto& scan_range : scan_ranges) { + DCHECK(scan_range.scan_range.__isset.palo_scan_range); + _scan_ranges.emplace_back(new TPaloScanRange(scan_range.scan_range.palo_scan_range)); + // COUNTER_UPDATE(_tablet_counter, 1); + } + // telemetry::set_current_span_attribute(_tablet_counter); + + return; +} + +Status NewOlapScanNode::_init_scanners(std::list* scanners) { + if (_scan_ranges.empty()) { + _eos = true; + return Status::OK(); + } + auto span = opentelemetry::trace::Tracer::GetCurrentSpan(); + + // ranges constructed from scan keys + std::vector> cond_ranges; + RETURN_IF_ERROR(_scan_keys.get_key_range(&cond_ranges)); + // if we can't get ranges from conditions, we give it a total range + if (cond_ranges.empty()) { + cond_ranges.emplace_back(new doris::OlapScanRange()); + } + int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size()); + + // std::unordered_set disk_set; + for (auto& scan_range : _scan_ranges) { + auto tablet_id = scan_range->tablet_id; + std::string err; + TabletSharedPtr tablet = + StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err); + if (tablet == nullptr) { + std::stringstream ss; + ss << "failed to get tablet: " << tablet_id << ", reason: " << err; + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } + + std::vector>* ranges = &cond_ranges; + int size_based_scanners_per_tablet = 1; + + if (config::doris_scan_range_max_mb > 0) { + size_based_scanners_per_tablet = std::max( + 1, (int)tablet->tablet_footprint() / config::doris_scan_range_max_mb << 20); + } + + int ranges_per_scanner = + std::max(1, (int)ranges->size() / + std::min(scanners_per_tablet, size_based_scanners_per_tablet)); + int num_ranges = ranges->size(); + for (int i = 0; i < num_ranges;) { + std::vector scanner_ranges; + scanner_ranges.push_back((*ranges)[i].get()); + ++i; + for (int j = 1; i < num_ranges && j < ranges_per_scanner && + (*ranges)[i]->end_include == (*ranges)[i - 1]->end_include; + ++j, ++i) { + scanner_ranges.push_back((*ranges)[i].get()); + } + + NewOlapScanner* scanner = new NewOlapScanner( + _state, this, _limit_per_scanner, _olap_scan_node.is_preaggregation, + _need_agg_finalize, *scan_range, _scanner_mem_tracker.get()); + // add scanner to pool before doing prepare. + // so that scanner can be automatically deconstructed if prepare failed. + _scanner_pool.add(scanner); + RETURN_IF_ERROR(scanner->prepare(*scan_range, scanner_ranges, _vconjunct_ctx_ptr.get(), + _olap_filters, _bloom_filters_push_down, + _push_down_functions)); + scanners->push_back((VScanner*)scanner); + // disk_set.insert(scanner->scan_disk()); + } + } + + // COUNTER_SET(_num_disks_accessed_counter, static_cast(disk_set.size())); + // COUNTER_SET(_num_scanners, static_cast(_volap_scanners.size())); + // telemetry::set_span_attribute(span, _num_disks_accessed_counter); + // telemetry::set_span_attribute(span, _num_scanners); + + // init progress + // std::stringstream ss; + // ss << "ScanThread complete (node=" << id() << "):"; + // _progress = ProgressUpdater(ss.str(), _volap_scanners.size(), 1); + return Status::OK(); +} + +bool NewOlapScanNode::_is_key_column(const std::string& key_name) { + // all column in dup_keys table or unique_keys with merge on write table olap scan node threat + // as key column + if (_olap_scan_node.keyType == TKeysType::DUP_KEYS || + (_olap_scan_node.keyType == TKeysType::UNIQUE_KEYS && + _olap_scan_node.__isset.enable_unique_key_merge_on_write && + _olap_scan_node.enable_unique_key_merge_on_write)) { + return true; + } + + auto res = std::find(_olap_scan_node.key_column_name.begin(), + _olap_scan_node.key_column_name.end(), key_name); + return res != _olap_scan_node.key_column_name.end(); +} + +}; // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h b/be/src/vec/exec/scan/new_olap_scan_node.h new file mode 100644 index 0000000000..508c8851d5 --- /dev/null +++ b/be/src/vec/exec/scan/new_olap_scan_node.h @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/exec/scan/vscan_node.h" + +namespace doris::vectorized { + +class NewOlapScanner; +class NewOlapScanNode : public VScanNode { +public: + NewOlapScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + friend class NewOlapScanner; + + Status prepare(RuntimeState* state) override; + + void set_scan_ranges(const std::vector& scan_ranges) override; + +protected: + Status _init_profile() override; + Status _process_conjuncts() override; + bool _is_key_column(const std::string& col_name) override; + + bool _should_push_down_binary_predicate( + VectorizedFnCall* fn_call, VExprContext* expr_ctx, StringRef* constant_val, + int* slot_ref_child, + const std::function& fn_checker) override; + + bool _should_push_down_in_predicate(VInPredicate* in_pred, VExprContext* expr_ctx, + bool is_not_in) override; + + bool _should_push_down_function_filter(VectorizedFnCall* fn_call, VExprContext* expr_ctx, + StringVal* constant_str, + doris_udf::FunctionContext** fn_ctx) override; + + Status _init_scanners(std::list* scanners) override; + +private: + Status _build_key_ranges_and_filters(); + +private: + TOlapScanNode _olap_scan_node; + std::vector> _scan_ranges; + OlapScanKeys _scan_keys; + + std::unique_ptr _scanner_mem_tracker; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp new file mode 100644 index 0000000000..3ae67a34be --- /dev/null +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -0,0 +1,325 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/scan/new_olap_scanner.h" + +#include "olap/storage_engine.h" +#include "vec/exec/scan/new_olap_scan_node.h" +#include "vec/olap/block_reader.h" + +namespace doris::vectorized { + +NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit, + bool aggregation, bool need_agg_finalize, + const TPaloScanRange& scan_range, MemTracker* tracker) + : VScanner(state, static_cast(parent), limit, tracker), + _aggregation(aggregation), + _need_agg_finalize(need_agg_finalize), + _version(-1) { + _tablet_schema = std::make_shared(); +} + +Status NewOlapScanner::prepare( + const TPaloScanRange& scan_range, const std::vector& key_ranges, + VExprContext** vconjunct_ctx_ptr, const std::vector& filters, + const std::vector>>& bloom_filters, + const std::vector& function_filters) { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); + + if (vconjunct_ctx_ptr != nullptr) { + // Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx. + RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx)); + } + + // set limit to reduce end of rowset and segment mem use + _tablet_reader = std::make_unique(); + _tablet_reader->set_batch_size( + _parent->limit() == -1 + ? _state->batch_size() + : std::min(static_cast(_state->batch_size()), _parent->limit())); + + // Get olap table + TTabletId tablet_id = scan_range.tablet_id; + _version = strtoul(scan_range.version.c_str(), nullptr, 10); + { + std::string err; + _tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err); + if (_tablet.get() == nullptr) { + std::stringstream ss; + ss << "failed to get tablet. tablet_id=" << tablet_id << ", reason=" << err; + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } + _tablet_schema->copy_from(*_tablet->tablet_schema()); + + TOlapScanNode& olap_scan_node = ((NewOlapScanNode*)_parent)->_olap_scan_node; + if (!olap_scan_node.columns_desc.empty() && + olap_scan_node.columns_desc[0].col_unique_id >= 0) { + // Originally scanner get TabletSchema from tablet object in BE. + // To support lightweight schema change for adding / dropping columns, + // tabletschema is bounded to rowset and tablet's schema maybe outdated, + // so we have to use schema from a query plan witch FE puts it in query plans. + _tablet_schema->clear_columns(); + for (const auto& column_desc : olap_scan_node.columns_desc) { + _tablet_schema->append_column(TabletColumn(column_desc)); + } + } + { + std::shared_lock rdlock(_tablet->get_header_lock()); + const RowsetSharedPtr rowset = _tablet->rowset_with_max_version(); + if (rowset == nullptr) { + std::stringstream ss; + ss << "fail to get latest version of tablet: " << tablet_id; + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } + + // acquire tablet rowset readers at the beginning of the scan node + // to prevent this case: when there are lots of olap scanners to run for example 10000 + // the rowsets maybe compacted when the last olap scanner starts + Version rd_version(0, _version); + Status acquire_reader_st = + _tablet->capture_rs_readers(rd_version, &_tablet_reader_params.rs_readers); + if (!acquire_reader_st.ok()) { + LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st; + std::stringstream ss; + ss << "failed to initialize storage reader. tablet=" << _tablet->full_name() + << ", res=" << acquire_reader_st + << ", backend=" << BackendOptions::get_localhost(); + return Status::InternalError(ss.str()); + } + + // Initialize tablet_reader_params + RETURN_IF_ERROR(_init_tablet_reader_params(key_ranges, filters, bloom_filters, + function_filters)); + } + } + + return Status::OK(); +} + +Status NewOlapScanner::open(RuntimeState* state) { + RETURN_IF_ERROR(VScanner::open(state)); + // SCOPED_TIMER(_parent->_reader_init_timer); + // SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); + + // _runtime_filter_marks.resize(_parent->runtime_filter_descs().size(), false); + + auto res = _tablet_reader->init(_tablet_reader_params); + if (!res.ok()) { + std::stringstream ss; + ss << "failed to initialize storage reader. tablet=" + << _tablet_reader_params.tablet->full_name() << ", res=" << res + << ", backend=" << BackendOptions::get_localhost(); + return Status::InternalError(ss.str()); + } + return Status::OK(); +} + +// it will be called under tablet read lock because capture rs readers need +Status NewOlapScanner::_init_tablet_reader_params( + const std::vector& key_ranges, const std::vector& filters, + const std::vector>>& bloom_filters, + const std::vector& function_filters) { + // if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty + bool single_version = + (_tablet_reader_params.rs_readers.size() == 1 && + _tablet_reader_params.rs_readers[0]->rowset()->start_version() == 0 && + !_tablet_reader_params.rs_readers[0] + ->rowset() + ->rowset_meta() + ->is_segments_overlapping()) || + (_tablet_reader_params.rs_readers.size() == 2 && + _tablet_reader_params.rs_readers[0]->rowset()->rowset_meta()->num_rows() == 0 && + _tablet_reader_params.rs_readers[1]->rowset()->start_version() == 2 && + !_tablet_reader_params.rs_readers[1] + ->rowset() + ->rowset_meta() + ->is_segments_overlapping()); + + _tablet_reader_params.direct_mode = _aggregation || single_version; + + RETURN_IF_ERROR(_init_return_columns(!_tablet_reader_params.direct_mode)); + + _tablet_reader_params.tablet = _tablet; + _tablet_reader_params.tablet_schema = _tablet_schema; + _tablet_reader_params.reader_type = READER_QUERY; + _tablet_reader_params.aggregation = _aggregation; + _tablet_reader_params.version = Version(0, _version); + + // Condition + for (auto& filter : filters) { + _tablet_reader_params.conditions.push_back(filter); + } + std::copy(bloom_filters.cbegin(), bloom_filters.cend(), + std::inserter(_tablet_reader_params.bloom_filters, + _tablet_reader_params.bloom_filters.begin())); + + std::copy(function_filters.cbegin(), function_filters.cend(), + std::inserter(_tablet_reader_params.function_filters, + _tablet_reader_params.function_filters.begin())); + + std::copy(_tablet->delete_predicates().cbegin(), _tablet->delete_predicates().cend(), + std::inserter(_tablet_reader_params.delete_predicates, + _tablet_reader_params.delete_predicates.begin())); + + // Range + for (auto key_range : key_ranges) { + if (key_range->begin_scan_range.size() == 1 && + key_range->begin_scan_range.get_value(0) == NEGATIVE_INFINITY) { + continue; + } + + _tablet_reader_params.start_key_include = key_range->begin_include; + _tablet_reader_params.end_key_include = key_range->end_include; + + _tablet_reader_params.start_key.push_back(key_range->begin_scan_range); + _tablet_reader_params.end_key.push_back(key_range->end_scan_range); + } + + _tablet_reader_params.profile = _parent->runtime_profile(); + _tablet_reader_params.runtime_state = _state; + + _tablet_reader_params.origin_return_columns = &_return_columns; + _tablet_reader_params.tablet_columns_convert_to_null_set = &_tablet_columns_convert_to_null_set; + + if (_tablet_reader_params.direct_mode) { + _tablet_reader_params.return_columns = _return_columns; + } else { + // we need to fetch all key columns to do the right aggregation on storage engine side. + for (size_t i = 0; i < _tablet_schema->num_key_columns(); ++i) { + _tablet_reader_params.return_columns.push_back(i); + } + for (auto index : _return_columns) { + if (_tablet_schema->column(index).is_key()) { + continue; + } else { + _tablet_reader_params.return_columns.push_back(index); + } + } + } + + // If a agg node is this scan node direct parent + // we will not call agg object finalize method in scan node, + // to avoid the unnecessary SerDe and improve query performance + _tablet_reader_params.need_agg_finalize = _need_agg_finalize; + + if (!config::disable_storage_page_cache) { + _tablet_reader_params.use_page_cache = true; + } + + if (_tablet->enable_unique_key_merge_on_write()) { + _tablet_reader_params.delete_bitmap = &_tablet->tablet_meta()->delete_bitmap(); + } + + TOlapScanNode& olap_scan_node = ((NewOlapScanNode*)_parent)->_olap_scan_node; + if (olap_scan_node.__isset.sort_info && olap_scan_node.sort_info.is_asc_order.size() > 0) { + _limit = _parent->_limit_per_scanner; + _tablet_reader_params.read_orderby_key = true; + if (!olap_scan_node.sort_info.is_asc_order[0]) { + _tablet_reader_params.read_orderby_key_reverse = true; + } + _tablet_reader_params.read_orderby_key_num_prefix_columns = + olap_scan_node.sort_info.is_asc_order.size(); + } + + return Status::OK(); +} + +Status NewOlapScanner::_init_return_columns(bool need_seq_col) { + for (auto slot : _output_tuple_desc->slots()) { + if (!slot->is_materialized()) { + continue; + } + + int32_t index = slot->col_unique_id() >= 0 + ? _tablet_schema->field_index(slot->col_unique_id()) + : _tablet_schema->field_index(slot->col_name()); + + if (index < 0) { + std::stringstream ss; + ss << "field name is invalid. field=" << slot->col_name(); + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } + _return_columns.push_back(index); + if (slot->is_nullable() && !_tablet_schema->column(index).is_nullable()) { + _tablet_columns_convert_to_null_set.emplace(index); + } + } + + // expand the sequence column + if (_tablet_schema->has_sequence_col() && need_seq_col) { + bool has_replace_col = false; + for (auto col : _return_columns) { + if (_tablet_schema->column(col).aggregation() == + FieldAggregationMethod::OLAP_FIELD_AGGREGATION_REPLACE) { + has_replace_col = true; + break; + } + } + if (auto sequence_col_idx = _tablet_schema->sequence_col_idx(); + has_replace_col && std::find(_return_columns.begin(), _return_columns.end(), + sequence_col_idx) == _return_columns.end()) { + _return_columns.push_back(sequence_col_idx); + } + } + + if (_return_columns.empty()) { + return Status::InternalError("failed to build storage scanner, no materialized slot!"); + } + return Status::OK(); +} + +Status NewOlapScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) { + // Read one block from block reader + // ATTN: Here we need to let the _get_block_impl method guarantee the semantics of the interface, + // that is, eof can be set to true only when the returned block is empty. + RETURN_IF_ERROR(_tablet_reader->next_block_with_aggregation(block, nullptr, nullptr, eof)); + if (block->rows() > 0) { + *eof = false; + } + return Status::OK(); +} + +Status NewOlapScanner::close(RuntimeState* state) { + if (_is_closed) { + return Status::OK(); + } + + // olap scan node will call scanner.close() when finished + // will release resources here + // if not clear rowset readers in read_params here + // readers will be release when runtime state deconstructed but + // deconstructor in reader references runtime state + // so that it will core + _tablet_reader_params.rs_readers.clear(); + _update_counter(); + _tablet_reader.reset(); + // Expr::close(_conjunct_ctxs, state); + _is_closed = true; + return Status::OK(); + + RETURN_IF_ERROR(VScanner::close(state)); + return Status::OK(); +} + +void NewOlapScanner::_update_counter() { + // TODO +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h new file mode 100644 index 0000000000..73c0330b20 --- /dev/null +++ b/be/src/vec/exec/scan/new_olap_scanner.h @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "exec/olap_utils.h" +#include "exprs/bloomfilter_predicate.h" +#include "exprs/function_filter.h" +#include "olap/reader.h" +#include "vec/exec/scan/vscanner.h" + +namespace doris { + +struct OlapScanRange; + +namespace vectorized { + +class NewOlapScanNode; + +class NewOlapScanner : public VScanner { +public: + NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit, bool aggregation, + bool need_agg_finalize, const TPaloScanRange& scan_range, MemTracker* tracker); + + Status open(RuntimeState* state) override; + + Status close(RuntimeState* state) override; + +public: + Status prepare(const TPaloScanRange& scan_range, const std::vector& key_ranges, + VExprContext** vconjunct_ctx_ptr, const std::vector& filters, + const std::vector>>& + bloom_filters, + const std::vector& function_filters); + +protected: + Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override; + +private: + void _update_counter(); + + Status _init_tablet_reader_params( + const std::vector& key_ranges, const std::vector& filters, + const std::vector>>& + bloom_filters, + const std::vector& function_filters); + + Status _init_return_columns(bool need_seq_col); + +private: + bool _aggregation; + bool _need_agg_finalize; + + TabletSchemaSPtr _tablet_schema; + TabletSharedPtr _tablet; + int64_t _version; + + TabletReader::ReaderParams _tablet_reader_params; + std::unique_ptr _tablet_reader; + + std::vector _return_columns; + std::unordered_set _tablet_columns_convert_to_null_set; +}; +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp new file mode 100644 index 0000000000..bef4fe8e66 --- /dev/null +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -0,0 +1,232 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "scanner_context.h" + +#include "common/config.h" +#include "runtime/runtime_state.h" +#include "util/threadpool.h" +#include "vec/core/block.h" +#include "vec/exec/scan/scanner_scheduler.h" +#include "vec/exec/scan/vscanner.h" + +namespace doris::vectorized { + +Status ScannerContext::init() { + _real_tuple_desc = _input_tuple_desc != nullptr ? _input_tuple_desc : _output_tuple_desc; + // 1. Calculate how many blocks need to be preallocated. + // The calculation logic is as follows: + // 1. Assuming that at most M rows can be scanned in one scan(config::doris_scanner_row_num), + // then figure out how many blocks are required for one scan(_block_per_scanner). + // 2. The maximum number of concurrency * the blocks required for one scan, + // that is, the number of blocks that need to be pre-allocated + auto doris_scanner_row_num = + limit == -1 ? config::doris_scanner_row_num + : std::min(static_cast(config::doris_scanner_row_num), limit); + int real_block_size = limit == -1 ? _state->batch_size() + : std::min(static_cast(_state->batch_size()), limit); + _block_per_scanner = (doris_scanner_row_num + (real_block_size - 1)) / real_block_size; + auto pre_alloc_block_count = + std::min((int32_t)_scanners.size(), config::doris_scanner_thread_pool_thread_num) * + _block_per_scanner; + + // The free blocks is used for final output block of scanners. + // So use _output_tuple_desc; + for (int i = 0; i < pre_alloc_block_count; ++i) { + auto block = new vectorized::Block(_output_tuple_desc->slots(), real_block_size); + _free_blocks.emplace_back(block); + } + + // 2. Calculate max concurrency + _max_thread_num = config::doris_scanner_thread_pool_thread_num; + if (config::doris_scanner_row_num > _state->batch_size()) { + _max_thread_num /= config::doris_scanner_row_num / _state->batch_size(); + if (_max_thread_num <= 0) { + _max_thread_num = 1; + } + } + + // 3. get thread token + thread_token = _state->get_query_fragments_ctx()->get_token(); + + // 4. This ctx will be submitted to the scanner scheduler right after init. + // So set _num_scheduling_ctx to 1 here. + _num_scheduling_ctx = 1; + + _num_unfinished_scanners = _scanners.size(); + + return Status::OK(); +} + +vectorized::Block* ScannerContext::get_free_block(bool* get_free_block) { + { + std::lock_guard l(_free_blocks_lock); + if (!_free_blocks.empty()) { + auto block = _free_blocks.back(); + _free_blocks.pop_back(); + return block; + } + } + *get_free_block = false; + + return new vectorized::Block(_real_tuple_desc->slots(), _state->batch_size()); +} + +void ScannerContext::return_free_block(vectorized::Block* block) { + block->clear_column_data(); + std::lock_guard l(_free_blocks_lock); + _free_blocks.emplace_back(block); +} + +void ScannerContext::append_blocks_to_queue(const std::vector& blocks) { + std::lock_guard l(_transfer_lock); + blocks_queue.insert(blocks_queue.end(), blocks.begin(), blocks.end()); + for (auto b : blocks) { + _cur_bytes_in_queue += b->allocated_bytes(); + } + _blocks_queue_added_cv.notify_one(); +} + +Status ScannerContext::get_block_from_queue(vectorized::Block** block, bool* eos) { + std::unique_lock l(_transfer_lock); + // Wait for block from queue + while (_process_status.ok() && !_is_finished && blocks_queue.empty()) { + _blocks_queue_added_cv.wait_for(l, std::chrono::seconds(1)); + } + + if (!_process_status.ok()) { + return _process_status; + } + + if (!blocks_queue.empty()) { + *block = blocks_queue.front(); + blocks_queue.pop_front(); + _cur_bytes_in_queue -= (*block)->allocated_bytes(); + return Status::OK(); + } else { + *eos = _is_finished; + } + return Status::OK(); +} + +bool ScannerContext::set_status_on_error(const Status& status) { + std::lock_guard l(_transfer_lock); + if (_process_status.ok()) { + _process_status = status; + _blocks_queue_added_cv.notify_one(); + return true; + } + return false; +} + +Status ScannerContext::_close_and_clear_scanners() { + std::unique_lock l(_scanners_lock); + for (auto scanner : _scanners) { + scanner->close(_state); + // Scanners are in ObjPool in ScanNode, + // so no need to delete them here. + } + _scanners.clear(); + return Status::OK(); +} + +void ScannerContext::clear_and_join() { + _close_and_clear_scanners(); + + std::unique_lock l(_transfer_lock); + do { + if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) { + break; + } else { + _ctx_finish_cv.wait( + l, [this] { return _num_running_scanners == 0 && _num_scheduling_ctx == 0; }); + break; + } + } while (false); + + std::for_each(blocks_queue.begin(), blocks_queue.end(), + std::default_delete()); + std::for_each(_free_blocks.begin(), _free_blocks.end(), + std::default_delete()); + return; +} + +std::string ScannerContext::debug_string() { + return fmt::format( + "id: {}, sacnners: {}, blocks in queue: {}," + " status: {}, _should_stop: {}, _is_finished: {}, free blocks: {}," + " limit: {}, _num_running_scanners: {}, _num_scheduling_ctx: {}, _max_thread_num: {}," + " _block_per_scanner: {}, _cur_bytes_in_queue: {}, _max_bytes_in_queue: {}", + ctx_id, _scanners.size(), blocks_queue.size(), _process_status.ok(), _should_stop, + _is_finished, _free_blocks.size(), limit, _num_running_scanners, _num_scheduling_ctx, + _max_thread_num, _block_per_scanner, _cur_bytes_in_queue, _max_bytes_in_queue); +} + +void ScannerContext::push_back_scanner_and_reschedule(ScannerScheduler* scheduler, + VScanner* scanner) { + { + std::unique_lock l(_scanners_lock); + _scanners.push_front(scanner); + } + + std::lock_guard l(_transfer_lock); + _num_running_scanners--; + _num_scheduling_ctx++; + scheduler->submit(this); + if (scanner->need_to_close() && (--_num_unfinished_scanners) == 0) { + _is_finished = true; + _blocks_queue_added_cv.notify_one(); + } + _ctx_finish_cv.notify_one(); +} + +void ScannerContext::get_next_batch_of_scanners(std::list* current_run) { + // 1. Calculate how many scanners should be scheduled at this run. + int thread_slot_num = 0; + { + std::unique_lock l(_transfer_lock); + if (_cur_bytes_in_queue < _max_bytes_in_queue / 2) { + // If there are enough space in blocks queue, + // the scanner number depends on the _free_blocks numbers + std::lock_guard l(_free_blocks_lock); + thread_slot_num = _free_blocks.size() / _block_per_scanner; + thread_slot_num += (_free_blocks.size() % _block_per_scanner != 0); + thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners); + if (thread_slot_num <= 0) { + thread_slot_num = 1; + } + } + } + + // 2. get #thread_slot_num scanners from ctx->scanners + // and put them into "this_run". + { + std::unique_lock l(_scanners_lock); + for (int i = 0; i < thread_slot_num && !_scanners.empty();) { + auto scanner = _scanners.front(); + _scanners.pop_front(); + if (scanner->need_to_close()) { + scanner->close(_state); + } else { + current_run->push_back(scanner); + i++; + } + } + } +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h new file mode 100644 index 0000000000..10728bc7df --- /dev/null +++ b/be/src/vec/exec/scan/scanner_context.h @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "common/status.h" +#include "runtime/descriptors.h" +#include "util/uid_util.h" +#include "vec/core/block.h" + +namespace doris { + +class PriorityThreadPool; +class ThreadPool; +class ThreadPoolToken; +class ScannerScheduler; + +namespace vectorized { + +class VScanner; + +// ScannerContext is responsible for recording the execution status +// of a group of Scanners corresponding to a ScanNode. +// Including how many scanners are being scheduled, and maintaining +// a producer-consumer blocks queue between scanners and scan nodes. +// +// ScannerContext is also the scheduling unit of ScannerScheduler. +// ScannerScheduler schedules a ScannerContext at a time, +// and submits the Scanners to the scanner thread pool for data scanning. +class ScannerContext { +public: + ScannerContext(RuntimeState* state_, const TupleDescriptor* input_tuple_desc, + const TupleDescriptor* output_tuple_desc, const std::list& scanners_, + int64_t limit_, int64_t max_bytes_in_blocks_queue_) + : _state(state_), + _input_tuple_desc(input_tuple_desc), + _output_tuple_desc(output_tuple_desc), + _process_status(Status::OK()), + limit(limit_), + _max_bytes_in_queue(max_bytes_in_blocks_queue_), + _scanners(scanners_) { + ctx_id = UniqueId::gen_uid().to_string(); + if (_scanners.empty()) { + _is_finished = true; + } + } + + Status init(); + + vectorized::Block* get_free_block(bool* get_free_block); + void return_free_block(vectorized::Block* block); + + // Append blocks from scanners to the blocks queue. + void append_blocks_to_queue(const std::vector& blocks); + + // Get next block from blocks queue. Called by ScanNode + // Set eos to true if there is no more data to read. + // And if eos is true, the block returned must be nullptr. + Status get_block_from_queue(vectorized::Block** block, bool* eos); + + // When a scanner complete a scan, this method will be called + // to return the scanner to the list for next scheduling. + void push_back_scanner_and_reschedule(ScannerScheduler* scheduler, VScanner* scanner); + + bool set_status_on_error(const Status& status); + + Status status() { + std::lock_guard l(_transfer_lock); + return _process_status; + } + + // Called by ScanNode. + // Used to notify the scheduler that this ScannerContext can stop working. + void set_should_stop() { + std::lock_guard l(_transfer_lock); + _should_stop = true; + _blocks_queue_added_cv.notify_one(); + } + + // Return true if this ScannerContext need no more process + bool done() { + std::lock_guard l(_transfer_lock); + return _is_finished || _should_stop || !_process_status.ok(); + } + + // Update the running num of scanners and contexts + void update_num_running(int32_t scanner_inc, int32_t sched_inc) { + std::lock_guard l(_transfer_lock); + _num_running_scanners += scanner_inc; + _num_scheduling_ctx += sched_inc; + _blocks_queue_added_cv.notify_one(); + _ctx_finish_cv.notify_one(); + } + + void get_next_batch_of_scanners(std::list* current_run); + + void clear_and_join(); + + std::string debug_string(); + + RuntimeState* state() { return _state; } + +public: + // the unique id of this context + std::string ctx_id; + int32_t queue_idx = -1; + ThreadPoolToken* thread_token; + +private: + Status _close_and_clear_scanners(); + +private: + RuntimeState* _state; + + // the comment of same fields in VScanNode + const TupleDescriptor* _input_tuple_desc; + const TupleDescriptor* _output_tuple_desc; + // If _input_tuple_desc is not null, _real_tuple_desc point to _input_tuple_desc, + // otherwise, _real_tuple_desc point to _output_tuple_desc + const TupleDescriptor* _real_tuple_desc; + + // _transfer_lock is used to protect the critical section + // where the ScanNode and ScannerScheduler interact. + // Including access to variables such as blocks_queue, _process_status, _is_finished, etc. + std::mutex _transfer_lock; + // The blocks got from scanners will be added to the "blocks_queue". + // And the upper scan node will be as a consumer to fetch blocks from this queue. + // Should be protected by "_transfer_lock" + std::list blocks_queue; + // Wait in get_block_from_queue(), by ScanNode. + std::condition_variable _blocks_queue_added_cv; + // Wait in clear_and_join(), by ScanNode. + std::condition_variable _ctx_finish_cv; + + // The following 3 variables control the process of the scanner scheduling. + // Use _transfer_lock to protect them. + // 1. _process_status + // indicates the global status of this scanner context. + // Set to non-ok if encounter errors. + // And if it is non-ok, the scanner process should stop. + // Set be set by either ScanNode or ScannerScheduler. + // 2. _should_stop + // Always be set by ScanNode. + // True means no more data need to be read(reach limit or closed) + // 3. _is_finished + // Always be set by ScannerScheduler. + // True means all scanners are finished to scan. + Status _process_status; + bool _should_stop = false; + bool _is_finished = false; + + // Pre-allocated blocks for all scanners to share, for memory reuse. + std::mutex _free_blocks_lock; + std::vector _free_blocks; + + // The limit from SQL's limit clause + int64_t limit; + + // Current number of running scanners. + int32_t _num_running_scanners = 0; + // Current number of ctx being scheduled. + // After each Scanner finishes a task, it will put the corresponding ctx + // back into the scheduling queue. + // Therefore, there will be multiple pointer of same ctx in the scheduling queue. + // Here we record the number of ctx in the scheduling queue to clean up at the end. + int32_t _num_scheduling_ctx = 0; + // Num of unfinished scanners. Should be set in init() + int32_t _num_unfinished_scanners = 0; + // Max number of scan thread for this scanner context. + int32_t _max_thread_num = 0; + // How many blocks a scanner can use in one task. + int32_t _block_per_scanner = 0; + + // The current bytes of blocks in blocks queue + int64_t _cur_bytes_in_queue = 0; + // The max limit bytes of blocks in blocks queue + int64_t _max_bytes_in_queue; + + // List "scanners" saves all "unfinished" scanners. + // The scanner scheduler will pop scanners from this list, run scanner, + // and then if the scanner is not finished, will be pushed back to this list. + // Not need to protect by lock, because only one scheduler thread will access to it. + std::mutex _scanners_lock; + std::list _scanners; + + // TODO: Add statistics of this scanner +}; +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp new file mode 100644 index 0000000000..7ddd5e304b --- /dev/null +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -0,0 +1,253 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "scanner_scheduler.h" + +#include "common/config.h" +#include "util/priority_thread_pool.hpp" +#include "util/priority_work_stealing_thread_pool.hpp" +#include "util/thread.h" +#include "util/threadpool.h" +#include "vec/core/block.h" +#include "vec/exec/scan/vscanner.h" +#include "vec/exprs/vexpr.h" + +namespace doris::vectorized { + +ScannerScheduler::ScannerScheduler() {} + +ScannerScheduler::~ScannerScheduler() { + _is_closed = true; + _scheduler_pool->shutdown(); + _local_scan_thread_pool->shutdown(); + _remote_scan_thread_pool->shutdown(); + // TODO: safely delete all objects and graceful exit +} + +Status ScannerScheduler::init(ExecEnv* env) { + // 1. scheduling thread pool and scheduling queues + ThreadPoolBuilder("SchedulingThreadPool") + .set_min_threads(QUEUE_NUM) + .set_max_threads(QUEUE_NUM) + .build(&_scheduler_pool); + + _pending_queues = new BlockingQueue*[QUEUE_NUM]; + for (int i = 0; i < QUEUE_NUM; i++) { + _pending_queues[i] = new BlockingQueue(INT32_MAX); + _scheduler_pool->submit_func([this, i] { this->_schedule_thread(i); }); + } + + // 2. local scan thread pool + _local_scan_thread_pool = new PriorityWorkStealingThreadPool( + config::doris_scanner_thread_pool_thread_num, env->store_paths().size(), + config::doris_scanner_thread_pool_queue_size); + + // 3. remote scan thread pool + _remote_scan_thread_pool = new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num, + config::doris_scanner_thread_pool_queue_size); + + return Status::OK(); +} + +Status ScannerScheduler::submit(ScannerContext* ctx) { + if (ctx->queue_idx == -1) { + ctx->queue_idx = (_queue_idx++ % QUEUE_NUM); + } + if (!_pending_queues[ctx->queue_idx]->blocking_put(ctx)) { + return Status::InternalError("failed to submit scanner context to scheduler"); + } + return Status::OK(); +} + +void ScannerScheduler::_schedule_thread(int queue_id) { + BlockingQueue* queue = _pending_queues[queue_id]; + while (!_is_closed) { + ScannerContext* ctx; + bool ok = queue->blocking_get(&ctx); + if (!ok) { + // maybe closed + continue; + } + + _schedule_scanners(ctx); + // If ctx is done, no need to schedule it again. + // But should notice that there may still scanners running in scanner pool. + } + return; +} + +void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) { + if (ctx->done()) { + ctx->update_num_running(0, -1); + return; + } + + std::list this_run; + ctx->get_next_batch_of_scanners(&this_run); + if (this_run.empty()) { + submit(ctx); + return; + } + + ctx->update_num_running(this_run.size(), -1); + // Submit scanners to thread pool + // TODO(cmy): How to handle this "nice"? + int nice = 1; + auto cur_span = opentelemetry::trace::Tracer::GetCurrentSpan(); + auto iter = this_run.begin(); + if (ctx->thread_token != nullptr) { + while (iter != this_run.end()) { + auto s = ctx->thread_token->submit_func( + [this, scanner = *iter, parent_span = cur_span, ctx] { + opentelemetry::trace::Scope scope {parent_span}; + this->_scanner_scan(this, ctx, scanner); + }); + if (s.ok()) { + (*iter)->start_wait_worker_timer(); + this_run.erase(iter++); + } else { + ctx->set_status_on_error(s); + break; + } + } + } else { + while (iter != this_run.end()) { + PriorityThreadPool::Task task; + task.work_function = [this, scanner = *iter, parent_span = cur_span, ctx] { + opentelemetry::trace::Scope scope {parent_span}; + this->_scanner_scan(this, ctx, scanner); + }; + task.priority = nice; + task.queue_id = (*iter)->queue_id(); + (*iter)->start_wait_worker_timer(); + + TabletStorageType type = (*iter)->get_storage_type(); + bool ret = false; + if (type == TabletStorageType::STORAGE_TYPE_LOCAL) { + ret = _local_scan_thread_pool->offer(task); + } else { + ret = _remote_scan_thread_pool->offer(task); + } + if (ret) { + this_run.erase(iter++); + } else { + ctx->set_status_on_error( + Status::InternalError("failed to submit scanner to scanner pool")); + break; + } + } + } +} + +void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx, + VScanner* scanner) { + // TODO: rethink mem tracker and span + // START_AND_SCOPE_SPAN(scanner->runtime_state()->get_tracer(), span, + // "ScannerScheduler::_scanner_scan"); + // SCOPED_ATTACH_TASK(scanner->runtime_state()); + + Thread::set_self_name("_scanner_scan"); + // int64_t wait_time = scanner->update_wait_worker_timer(); + // Do not use ScopedTimer. There is no guarantee that, the counter + // (_scan_cpu_timer, the class member) is not destroyed after `_running_thread==0`. + ThreadCpuStopWatch cpu_watch; + cpu_watch.start(); + Status status = Status::OK(); + bool eos = false; + RuntimeState* state = ctx->state(); + DCHECK(nullptr != state); + if (!scanner->is_open()) { + status = scanner->open(state); + if (!status.ok()) { + ctx->set_status_on_error(status); + eos = true; + } + scanner->set_opened(); + } + + scanner->try_append_late_arrival_runtime_filter(); + + // Because we use thread pool to scan data from storage. One scanner can't + // use this thread too long, this can starve other query's scanner. So, we + // need yield this thread when we do enough work. However, OlapStorage read + // data in pre-aggregate mode, then we can't use storage returned data to + // judge if we need to yield. So we record all raw data read in this round + // scan, if this exceeds row number or bytes threshold, we yield this thread. + std::vector blocks; + int64_t raw_rows_read = scanner->raw_rows_read(); + int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num; + int64_t raw_bytes_read = 0; + int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; + bool get_free_block = true; + int num_rows_in_block = 0; + + // Has to wait at least one full block, or it will cause a lot of schedule task in priority + // queue, it will affect query latency and query concurrency for example ssb 3.3. + while (!eos && raw_bytes_read < raw_bytes_threshold && + ((raw_rows_read < raw_rows_threshold && get_free_block) || + num_rows_in_block < state->batch_size())) { + if (UNLIKELY(ctx->done())) { + eos = true; + status = Status::Cancelled("Cancelled"); + LOG(INFO) << "Scan thread cancelled, cause query done, maybe reach limit."; + break; + } + + auto block = ctx->get_free_block(&get_free_block); + status = scanner->get_block(state, block, &eos); + VLOG_ROW << "VOlapScanNode input rows: " << block->rows(); + if (!status.ok()) { + LOG(WARNING) << "Scan thread read VOlapScanner failed: " << status.to_string(); + // Add block ptr in blocks, prevent mem leak in read failed + blocks.push_back(block); + eos = true; + break; + } + + raw_bytes_read += block->bytes(); + num_rows_in_block += block->rows(); + if (UNLIKELY(block->rows() == 0)) { + ctx->return_free_block(block); + } else { + if (!blocks.empty() && blocks.back()->rows() + block->rows() <= state->batch_size()) { + vectorized::MutableBlock(blocks.back()).merge(*block); + ctx->return_free_block(block); + } else { + blocks.push_back(block); + } + } + raw_rows_read = scanner->raw_rows_read(); + } // end for while + + // if we failed, check status. + if (UNLIKELY(!status.ok())) { + // _transfer_done = true; + ctx->set_status_on_error(status); + eos = true; + std::for_each(blocks.begin(), blocks.end(), std::default_delete()); + } else if (!blocks.empty()) { + ctx->append_blocks_to_queue(blocks); + } + + if (eos) { + scanner->mark_to_need_to_close(); + } + + ctx->push_back_scanner_and_reschedule(scheduler, scanner); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h new file mode 100644 index 0000000000..a72fd5021e --- /dev/null +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "common/status.h" +#include "util/blocking_queue.hpp" +#include "vec/exec/scan/scanner_context.h" + +namespace doris::vectorized { + +// Responsible for the scheduling and execution of all Scanners of a BE node. +// ScannerScheduler has two types of thread pools: +// 1. Scheduling thread pool +// Responsible for Scanner scheduling. +// A set of Scanners for a query will be encapsulated into a ScannerContext +// and submitted to the ScannerScheduler's scheduling queue. +// There are multiple scheduling queues in ScannerScheduler, and each scheduling queue +// is handled by a scheduling thread. +// The scheduling thread is scheduled in granularity of ScannerContext, +// that is, a group of Scanners in a ScannerContext are scheduled at a time. +// +//2. Execution thread pool +// The scheduling thread will submit the Scanners selected from the ScannerContext +// to the execution thread pool to do the actual scan task. +// Each Scanner will act as a producer, read a group of blocks and put them into +// the corresponding block queue. +// The corresponding ScanNode will act as a consumer to consume blocks from the block queue. + +using ContextMap = phmap::parallel_flat_hash_map< + std::string, std::shared_ptr, phmap::priv::hash_default_hash, + phmap::priv::hash_default_eq, + std::allocator>>, 12, + std::mutex>; + +class Env; +class ScannerScheduler { +public: + ScannerScheduler(); + ~ScannerScheduler(); + + Status init(ExecEnv* env); + + Status submit(ScannerContext* ctx); + +private: + // scheduling thread function + void _schedule_thread(int queue_id); + // schedule scanners in a certain ScannerContext + void _schedule_scanners(ScannerContext* ctx); + // execution thread function + void _scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx, VScanner* scanner); + +private: + // Scheduling queue number. + // TODO: make it configurable. + static const int QUEUE_NUM = 4; + // The ScannerContext will be submitted to the pending queue roundrobin. + // _queue_idx pointer to the current queue. + // The scheduler thread will take ctx from pending queue, schedule it, + // and put it to the _scheduling_map. + // If any scanner finish, it will take ctx from and put it to pending queue again. + std::atomic_int _queue_idx = {0}; + BlockingQueue** _pending_queues; + + // scheduling thread pool + std::unique_ptr _scheduler_pool; + // execution thread pool + // _local_scan_thread_pool is for local scan task(typically, olap scanner) + // _remote_scan_thread_pool is for remote scan task(cold data on s3, hdfs, etc.) + PriorityThreadPool* _local_scan_thread_pool; + PriorityThreadPool* _remote_scan_thread_pool; + + // true is the scheduler is closed. + std::atomic_bool _is_closed = {false}; + + ContextMap _context_map; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp new file mode 100644 index 0000000000..26f8847c1d --- /dev/null +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -0,0 +1,864 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/scan/vscan_node.h" + +#include "exprs/hybrid_set.h" +#include "runtime/runtime_filter_mgr.h" +#include "util/stack_util.h" +#include "util/threadpool.h" +#include "vec/columns/column_const.h" +#include "vec/exec/scan/scanner_scheduler.h" +#include "vec/exec/scan/vscanner.h" +#include "vec/exprs/vcompound_pred.h" +#include "vec/exprs/vslot_ref.h" +#include "vec/functions/in.h" + +namespace doris::vectorized { + +#define RETURN_IF_PUSH_DOWN(stmt) \ + if (!push_down) { \ + stmt; \ + } else { \ + return; \ + } + +static bool ignore_cast(SlotDescriptor* slot, VExpr* expr) { + if ((slot->type().is_date_type() || slot->type().is_date_v2_type() || + slot->type().is_datetime_v2_type()) && + (expr->type().is_date_type() || expr->type().is_date_v2_type() || + expr->type().is_datetime_v2_type())) { + return true; + } + if (slot->type().is_string_type() && expr->type().is_string_type()) { + return true; + } + return false; +} + +Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(ExecNode::init(tnode, state)); + _state = state; + + const TQueryOptions& query_options = state->query_options(); + if (query_options.__isset.max_scan_key_num) { + _max_scan_key_num = query_options.max_scan_key_num; + } else { + _max_scan_key_num = config::doris_max_scan_key_num; + } + if (query_options.__isset.max_pushdown_conditions_per_column) { + _max_pushdown_conditions_per_column = query_options.max_pushdown_conditions_per_column; + } else { + _max_pushdown_conditions_per_column = config::max_pushdown_conditions_per_column; + } + + RETURN_IF_ERROR(_register_runtime_filter()); + + return Status::OK(); +} + +Status VScanNode::prepare(RuntimeState* state) { + RETURN_IF_ERROR(ExecNode::prepare(state)); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + + RETURN_IF_ERROR(_init_profile()); + + _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_input_tuple_id); + _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); + + // init profile for runtime filter + for (auto& rf_ctx : _runtime_filter_ctxs) { + rf_ctx.runtime_filter->init_profile(_runtime_profile.get()); + } + return Status::OK(); +} + +Status VScanNode::open(RuntimeState* state) { + START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::open"); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(ExecNode::open(state)); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + + RETURN_IF_ERROR(_acquire_runtime_filter()); + RETURN_IF_ERROR(_process_conjuncts()); + + std::list scanners; + RETURN_IF_ERROR(_init_scanners(&scanners)); + if (scanners.empty()) { + _eos = true; + } else { + RETURN_IF_ERROR(_start_scanners(scanners)); + } + return Status::OK(); +} + +Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + if (state->is_cancelled()) { + _scanner_ctx->set_status_on_error(Status::Cancelled("query cancelled")); + return _scanner_ctx->status(); + } + + if (_eos) { + *eos = true; + return Status::OK(); + } + + vectorized::Block* scan_block = nullptr; + RETURN_IF_ERROR(_scanner_ctx->get_block_from_queue(&scan_block, eos)); + if (*eos) { + DCHECK(scan_block == nullptr); + return Status::OK(); + } + + // get scanner's block memory + block->swap(*scan_block); + _scanner_ctx->return_free_block(scan_block); + + reached_limit(block, eos); + if (*eos) { + // reach limit, stop the scanners. + _scanner_ctx->set_should_stop(); + } + + return Status::OK(); +} + +Status VScanNode::_start_scanners(const std::list& scanners) { + _scanner_ctx.reset(new ScannerContext(_state, _input_tuple_desc, _output_tuple_desc, scanners, + limit(), _state->query_options().mem_limit / 20)); + RETURN_IF_ERROR(_scanner_ctx->init()); + RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get())); + return Status::OK(); +} + +Status VScanNode::_register_runtime_filter() { + int filter_size = _runtime_filter_descs.size(); + _runtime_filter_ctxs.resize(filter_size); + _runtime_filter_ready_flag.resize(filter_size); + for (int i = 0; i < filter_size; ++i) { + IRuntimeFilter* runtime_filter = nullptr; + const auto& filter_desc = _runtime_filter_descs[i]; + RETURN_IF_ERROR(_state->runtime_filter_mgr()->regist_filter( + RuntimeFilterRole::CONSUMER, filter_desc, _state->query_options(), id())); + RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, + &runtime_filter)); + _runtime_filter_ctxs[i].runtime_filter = runtime_filter; + _runtime_filter_ready_flag[i] = false; + } + return Status::OK(); +} + +Status VScanNode::_acquire_runtime_filter() { + std::vector vexprs; + for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { + IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter; + bool ready = runtime_filter->is_ready(); + if (!ready) { + ready = runtime_filter->await(); + } + if (ready) { + RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs)); + _runtime_filter_ctxs[i].apply_mark = true; + } else { + _is_all_rf_applied = false; + } + } + RETURN_IF_ERROR(_append_rf_into_conjuncts(vexprs)); + + return Status::OK(); +} + +Status VScanNode::_append_rf_into_conjuncts(std::vector& vexprs) { + if (vexprs.empty()) { + return Status::OK(); + } + + auto last_expr = _vconjunct_ctx_ptr ? (*_vconjunct_ctx_ptr)->root() : vexprs[0]; + for (size_t j = _vconjunct_ctx_ptr ? 0 : 1; j < vexprs.size(); j++) { + if (_rf_vexpr_set.find(vexprs[j]) != _rf_vexpr_set.end()) { + continue; + } + TFunction fn; + TFunctionName fn_name; + fn_name.__set_db_name(""); + fn_name.__set_function_name("and"); + fn.__set_name(fn_name); + fn.__set_binary_type(TFunctionBinaryType::BUILTIN); + std::vector arg_types; + arg_types.push_back(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); + arg_types.push_back(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); + fn.__set_arg_types(arg_types); + fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); + fn.__set_has_var_args(false); + + TExprNode texpr_node; + texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); + texpr_node.__set_node_type(TExprNodeType::COMPOUND_PRED); + texpr_node.__set_opcode(TExprOpcode::COMPOUND_AND); + texpr_node.__set_fn(fn); + texpr_node.__set_is_nullable(last_expr->is_nullable() || vexprs[j]->is_nullable()); + VExpr* new_node = _pool->add(new VcompoundPred(texpr_node)); + new_node->add_child(last_expr); + DCHECK((vexprs[j])->get_impl() != nullptr); + new_node->add_child(vexprs[j]); + last_expr = new_node; + _rf_vexpr_set.insert(vexprs[j]); + } + auto new_vconjunct_ctx_ptr = _pool->add(new VExprContext(last_expr)); + if (_vconjunct_ctx_ptr) { + (*_vconjunct_ctx_ptr)->clone_fn_contexts(new_vconjunct_ctx_ptr); + } + RETURN_IF_ERROR(new_vconjunct_ctx_ptr->prepare(_state, row_desc())); + RETURN_IF_ERROR(new_vconjunct_ctx_ptr->open(_state)); + if (_vconjunct_ctx_ptr) { + (*(_vconjunct_ctx_ptr.get()))->mark_as_stale(); + _stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr)); + } + _vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*); + *(_vconjunct_ctx_ptr.get()) = new_vconjunct_ctx_ptr; + return Status::OK(); +} + +Status VScanNode::close(RuntimeState* state) { + if (is_closed()) { + return Status::OK(); + } + START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::close"); + if (_scanner_ctx.get()) { + // stop and wait the scanner scheduler to be done + // _scanner_ctx may not be created for some short circuit case. + _scanner_ctx->set_should_stop(); + _scanner_ctx->clear_and_join(); + } + + for (auto& ctx : _runtime_filter_ctxs) { + IRuntimeFilter* runtime_filter = ctx.runtime_filter; + runtime_filter->consumer_close(); + } + + for (auto& ctx : _stale_vexpr_ctxs) { + (*ctx)->close(state); + } + + RETURN_IF_ERROR(ExecNode::close(state)); + return Status::OK(); +} + +Status VScanNode::_normalize_conjuncts() { + // The conjuncts is always on output tuple, so use _output_tuple_desc; + std::vector slots = _output_tuple_desc->slots(); + + for (int slot_idx = 0; slot_idx < slots.size(); ++slot_idx) { + switch (slots[slot_idx]->type().type) { +#define M(NAME) \ + case TYPE_##NAME: { \ + ColumnValueRange range(slots[slot_idx]->col_name(), \ + slots[slot_idx]->type().precision, \ + slots[slot_idx]->type().scale); \ + _slot_id_to_value_range[slots[slot_idx]->id()] = std::pair {slots[slot_idx], range}; \ + break; \ + } +#define APPLY_FOR_PRIMITIVE_TYPE(M) \ + M(TINYINT) \ + M(SMALLINT) \ + M(INT) \ + M(BIGINT) \ + M(LARGEINT) \ + M(CHAR) \ + M(DATE) \ + M(DATETIME) \ + M(DATEV2) \ + M(DATETIMEV2) \ + M(VARCHAR) \ + M(STRING) \ + M(HLL) \ + M(DECIMAL32) \ + M(DECIMAL64) \ + M(DECIMAL128) \ + M(DECIMALV2) \ + M(BOOLEAN) + APPLY_FOR_PRIMITIVE_TYPE(M) +#undef M + default: { + VLOG_CRITICAL << "Unsupported Normalize Slot [ColName=" << slots[slot_idx]->col_name() + << "]"; + break; + } + } + } + if (_vconjunct_ctx_ptr) { + if ((*_vconjunct_ctx_ptr)->root()) { + VExpr* new_root = _normalize_predicate((*_vconjunct_ctx_ptr)->root()); + if (new_root) { + (*_vconjunct_ctx_ptr)->set_root(new_root); + } else { + (*(_vconjunct_ctx_ptr.get()))->mark_as_stale(); + _stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr)); + _vconjunct_ctx_ptr.reset(nullptr); + } + } + } + for (auto& it : _slot_id_to_value_range) { + std::visit( + [&](auto&& range) { + if (range.is_empty_value_range()) { + _eos = true; + } + }, + it.second.second); + _colname_to_value_range[it.second.first->col_name()] = it.second.second; + } + + return Status::OK(); +} + +VExpr* VScanNode::_normalize_predicate(VExpr* conjunct_expr_root) { + static constexpr auto is_leaf = [](VExpr* expr) { return !expr->is_and_expr(); }; + auto in_predicate_checker = [](const std::vector& children, const VSlotRef** slot, + VExpr** child_contains_slot) { + if (children.empty() || + VExpr::expr_without_cast(children[0])->node_type() != TExprNodeType::SLOT_REF) { + // not a slot ref(column) + return false; + } + *slot = reinterpret_cast(VExpr::expr_without_cast(children[0])); + *child_contains_slot = children[0]; + return true; + }; + auto eq_predicate_checker = [](const std::vector& children, const VSlotRef** slot, + VExpr** child_contains_slot) { + for (const VExpr* child : children) { + if (VExpr::expr_without_cast(child)->node_type() != TExprNodeType::SLOT_REF) { + // not a slot ref(column) + continue; + } + *slot = reinterpret_cast(VExpr::expr_without_cast(child)); + *child_contains_slot = const_cast(child); + return true; + } + return false; + }; + + if (conjunct_expr_root != nullptr) { + if (is_leaf(conjunct_expr_root)) { + auto impl = conjunct_expr_root->get_impl(); + VExpr* cur_expr = impl ? const_cast(impl) : conjunct_expr_root; + SlotDescriptor* slot; + ColumnValueRangeType* range = nullptr; + bool push_down = false; + _eval_const_conjuncts(cur_expr, *(_vconjunct_ctx_ptr.get()), &push_down); + if (!push_down && + (_is_predicate_acting_on_slot(cur_expr, in_predicate_checker, &slot, &range) || + _is_predicate_acting_on_slot(cur_expr, eq_predicate_checker, &slot, &range))) { + std::visit( + [&](auto& value_range) { + RETURN_IF_PUSH_DOWN(_normalize_in_and_eq_predicate( + cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range, + &push_down)); + RETURN_IF_PUSH_DOWN(_normalize_not_in_and_not_eq_predicate( + cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range, + &push_down)); + RETURN_IF_PUSH_DOWN(_normalize_is_null_predicate( + cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range, + &push_down)); + RETURN_IF_PUSH_DOWN(_normalize_noneq_binary_predicate( + cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range, + &push_down)); + if (_is_key_column(slot->col_name())) { + RETURN_IF_PUSH_DOWN(_normalize_bloom_filter( + cur_expr, *(_vconjunct_ctx_ptr.get()), slot, &push_down)); + if (_state->enable_function_pushdown()) { + RETURN_IF_PUSH_DOWN(_normalize_function_filters( + cur_expr, *(_vconjunct_ctx_ptr.get()), slot, + &push_down)); + } + } + }, + *range); + } + if (push_down && _is_key_column(slot->col_name())) { + return nullptr; + } else { + return conjunct_expr_root; + } + } else { + VExpr* left_child = _normalize_predicate(conjunct_expr_root->children()[0]); + VExpr* right_child = _normalize_predicate(conjunct_expr_root->children()[1]); + + if (left_child != nullptr && right_child != nullptr) { + conjunct_expr_root->set_children({left_child, right_child}); + return conjunct_expr_root; + } else { + // here only close the and expr self, do not close the child + conjunct_expr_root->set_children({}); + conjunct_expr_root->close(_state, *_vconjunct_ctx_ptr, + (*_vconjunct_ctx_ptr)->get_function_state_scope()); + } + + // here do not close Expr* now + return left_child != nullptr ? left_child : right_child; + } + } + return conjunct_expr_root; +} + +Status VScanNode::_normalize_bloom_filter(VExpr* expr, VExprContext* expr_ctx, SlotDescriptor* slot, + bool* push_down) { + if (TExprNodeType::BLOOM_PRED == expr->node_type()) { + DCHECK(expr->children().size() == 1); + _bloom_filters_push_down.emplace_back(slot->col_name(), expr->get_bloom_filter_func()); + *push_down = true; + } + return Status::OK(); +} + +Status VScanNode::_normalize_function_filters(VExpr* expr, VExprContext* expr_ctx, + SlotDescriptor* slot, bool* push_down) { + bool opposite = false; + VExpr* fn_expr = expr; + if (TExprNodeType::COMPOUND_PRED == expr->node_type() && + expr->fn().name.function_name == "not") { + fn_expr = fn_expr->children()[0]; + opposite = true; + } + + if (TExprNodeType::FUNCTION_CALL == fn_expr->node_type()) { + doris_udf::FunctionContext* fn_ctx = nullptr; + StringVal val; + if (_should_push_down_function_filter(reinterpret_cast(fn_expr), + expr_ctx, &val, &fn_ctx)) { + std::string col = slot->col_name(); + _push_down_functions.emplace_back(opposite, col, fn_ctx, val); + *push_down = true; + } + } + return Status::OK(); +} + +bool VScanNode::_is_predicate_acting_on_slot( + VExpr* expr, + const std::function&, const VSlotRef**, VExpr**)>& checker, + SlotDescriptor** slot_desc, ColumnValueRangeType** range) { + const VSlotRef* slot_ref = nullptr; + VExpr* child_contains_slot = nullptr; + if (!checker(expr->children(), &slot_ref, &child_contains_slot)) { + // not a slot ref(column) + return false; + } + + auto entry = _slot_id_to_value_range.find(slot_ref->slot_id()); + if (_slot_id_to_value_range.end() == entry) { + return false; + } + *slot_desc = entry->second.first; + DCHECK(child_contains_slot != nullptr); + if (child_contains_slot->type().type != (*slot_desc)->type().type) { + if (!ignore_cast(*slot_desc, child_contains_slot)) { + // the type of predicate not match the slot's type + return false; + } + } + *range = &(entry->second.second); + return true; +} + +void VScanNode::_eval_const_conjuncts(VExpr* vexpr, VExprContext* expr_ctx, bool* push_down) { + char* constant_val = nullptr; + if (vexpr->is_constant()) { + if (const ColumnConst* const_column = + check_and_get_column(vexpr->get_const_col(expr_ctx)->column_ptr)) { + constant_val = const_cast(const_column->get_data_at(0).data); + if (constant_val == nullptr || *reinterpret_cast(constant_val) == false) { + *push_down = true; + _eos = true; + } + } else { + LOG(WARNING) << "Expr[" << vexpr->debug_string() + << "] is a constant but doesn't contain a const column!"; + } + } +} + +template +Status VScanNode::_normalize_in_and_eq_predicate(VExpr* expr, VExprContext* expr_ctx, + SlotDescriptor* slot, ColumnValueRange& range, + bool* push_down) { + auto temp_range = ColumnValueRange::create_empty_column_value_range(slot->type().precision, + slot->type().scale); + bool effect = false; + // 1. Normalize in conjuncts like 'where col in (v1, v2, v3)' + if (TExprNodeType::IN_PRED == expr->node_type()) { + VInPredicate* pred = static_cast(expr); + if (!_should_push_down_in_predicate(pred, expr_ctx, false)) { + return Status::OK(); + } + + // begin to push InPredicate value into ColumnValueRange + InState* state = reinterpret_cast( + expr_ctx->fn_context(pred->fn_context_index()) + ->get_function_state(FunctionContext::FRAGMENT_LOCAL)); + HybridSetBase::IteratorBase* iter = state->hybrid_set->begin(); + auto fn_name = std::string(""); + while (iter->has_next()) { + // column in (nullptr) is always false so continue to + // dispose next item + if (nullptr == iter->get_value()) { + iter->next(); + continue; + } + auto value = const_cast(iter->get_value()); + RETURN_IF_ERROR(_change_value_range(temp_range, value, + ColumnValueRange::add_fixed_value_range, + fn_name, !state->hybrid_set->is_date_v2())); + iter->next(); + } + + range.intersection(temp_range); + effect = true; + } else if (TExprNodeType::BINARY_PRED == expr->node_type()) { + DCHECK(expr->children().size() == 2); + auto eq_checker = [](const std::string& fn_name) { return fn_name == "eq"; }; + + StringRef value; + int slot_ref_child = -1; + if (_should_push_down_binary_predicate(reinterpret_cast(expr), expr_ctx, + &value, &slot_ref_child, eq_checker)) { + DCHECK(slot_ref_child >= 0); + // where A = nullptr should return empty result set + auto fn_name = std::string(""); + if (value.data != nullptr) { + if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T == TYPE_STRING || + T == TYPE_HLL) { + auto val = StringValue(value.data, value.size); + RETURN_IF_ERROR(_change_value_range( + temp_range, reinterpret_cast(&val), + ColumnValueRange::add_fixed_value_range, fn_name)); + } else { + RETURN_IF_ERROR(_change_value_range( + temp_range, reinterpret_cast(const_cast(value.data)), + ColumnValueRange::add_fixed_value_range, fn_name)); + } + range.intersection(temp_range); + effect = true; + } + } + } + + // exceed limit, no conditions will be pushed down to storage engine. + if (range.get_fixed_value_size() > _max_pushdown_conditions_per_column) { + range.set_whole_value_range(); + } else { + *push_down = effect; + } + return Status::OK(); +} + +template +Status VScanNode::_normalize_not_in_and_not_eq_predicate(VExpr* expr, VExprContext* expr_ctx, + SlotDescriptor* slot, + ColumnValueRange& range, + bool* push_down) { + bool is_fixed_range = range.is_fixed_value_range(); + auto not_in_range = ColumnValueRange::create_empty_column_value_range(range.column_name()); + bool effect = false; + // 1. Normalize in conjuncts like 'where col in (v1, v2, v3)' + if (TExprNodeType::IN_PRED == expr->node_type()) { + VInPredicate* pred = static_cast(expr); + if (!_should_push_down_in_predicate(pred, expr_ctx, true)) { + return Status::OK(); + } + + // begin to push InPredicate value into ColumnValueRange + InState* state = reinterpret_cast( + expr_ctx->fn_context(pred->fn_context_index()) + ->get_function_state(FunctionContext::FRAGMENT_LOCAL)); + HybridSetBase::IteratorBase* iter = state->hybrid_set->begin(); + auto fn_name = std::string(""); + while (iter->has_next()) { + // column not in (nullptr) is always true + if (nullptr == iter->get_value()) { + continue; + } + auto value = const_cast(iter->get_value()); + if (is_fixed_range) { + RETURN_IF_ERROR(_change_value_range( + range, value, ColumnValueRange::remove_fixed_value_range, fn_name, + !state->hybrid_set->is_date_v2())); + } else { + RETURN_IF_ERROR(_change_value_range( + not_in_range, value, ColumnValueRange::add_fixed_value_range, fn_name, + !state->hybrid_set->is_date_v2())); + } + iter->next(); + } + effect = true; + } else if (TExprNodeType::BINARY_PRED == expr->node_type()) { + DCHECK(expr->children().size() == 2); + + auto ne_checker = [](const std::string& fn_name) { return fn_name == "ne"; }; + StringRef value; + int slot_ref_child = -1; + if (_should_push_down_binary_predicate(reinterpret_cast(expr), expr_ctx, + &value, &slot_ref_child, ne_checker)) { + DCHECK(slot_ref_child >= 0); + // where A = nullptr should return empty result set + if (value.data != nullptr) { + auto fn_name = std::string(""); + if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T == TYPE_STRING || + T == TYPE_HLL) { + auto val = StringValue(value.data, value.size); + if (is_fixed_range) { + RETURN_IF_ERROR(_change_value_range( + range, reinterpret_cast(&val), + ColumnValueRange::remove_fixed_value_range, fn_name)); + } else { + RETURN_IF_ERROR(_change_value_range( + not_in_range, reinterpret_cast(&val), + ColumnValueRange::add_fixed_value_range, fn_name)); + } + } else { + if (is_fixed_range) { + RETURN_IF_ERROR(_change_value_range( + range, reinterpret_cast(const_cast(value.data)), + ColumnValueRange::remove_fixed_value_range, fn_name)); + } else { + RETURN_IF_ERROR(_change_value_range( + not_in_range, + reinterpret_cast(const_cast(value.data)), + ColumnValueRange::add_fixed_value_range, fn_name)); + } + } + effect = true; + } + } + } + + if (is_fixed_range || + not_in_range.get_fixed_value_size() <= _max_pushdown_conditions_per_column) { + if (!is_fixed_range) { + // push down not in condition to storage engine + not_in_range.to_in_condition(_olap_filters, false); + } + *push_down = effect; + } + return Status::OK(); +} + +template +Status VScanNode::_normalize_is_null_predicate(VExpr* expr, VExprContext* expr_ctx, + SlotDescriptor* slot, ColumnValueRange& range, + bool* push_down) { + if (TExprNodeType::FUNCTION_CALL == expr->node_type()) { + if (reinterpret_cast(expr)->fn().name.function_name == "is_null_pred") { + auto temp_range = ColumnValueRange::create_empty_column_value_range( + slot->type().precision, slot->type().scale); + temp_range.set_contain_null(true); + range.intersection(temp_range); + *push_down = true; + } else if (reinterpret_cast(expr)->fn().name.function_name == + "is_not_null_pred") { + auto temp_range = ColumnValueRange::create_empty_column_value_range( + slot->type().precision, slot->type().scale); + temp_range.set_contain_null(false); + range.intersection(temp_range); + *push_down = true; + } + } + return Status::OK(); +} + +template +Status VScanNode::_normalize_noneq_binary_predicate(VExpr* expr, VExprContext* expr_ctx, + SlotDescriptor* slot, + ColumnValueRange& range, bool* push_down) { + if (TExprNodeType::BINARY_PRED == expr->node_type()) { + DCHECK(expr->children().size() == 2); + + auto noneq_checker = [](const std::string& fn_name) { + return fn_name != "ne" && fn_name != "eq"; + }; + StringRef value; + int slot_ref_child = -1; + if (_should_push_down_binary_predicate(reinterpret_cast(expr), expr_ctx, + &value, &slot_ref_child, noneq_checker)) { + DCHECK(slot_ref_child >= 0); + const std::string& fn_name = + reinterpret_cast(expr)->fn().name.function_name; + + // where A = nullptr should return empty result set + if (value.data != nullptr) { + *push_down = true; + if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T == TYPE_STRING || + T == TYPE_HLL) { + auto val = StringValue(value.data, value.size); + RETURN_IF_ERROR(_change_value_range(range, reinterpret_cast(&val), + ColumnValueRange::add_value_range, + fn_name, true, slot_ref_child)); + } else { + RETURN_IF_ERROR(_change_value_range( + range, reinterpret_cast(const_cast(value.data)), + ColumnValueRange::add_value_range, fn_name, true, slot_ref_child)); + } + } + } + } + return Status::OK(); +} + +template +Status VScanNode::_change_value_range(ColumnValueRange& temp_range, void* value, + const ChangeFixedValueRangeFunc& func, + const std::string& fn_name, bool cast_date_to_datetime, + int slot_ref_child) { + if constexpr (PrimitiveType == TYPE_DATE) { + DateTimeValue date_value; + reinterpret_cast(value)->convert_vec_dt_to_dt(&date_value); + if constexpr (IsFixed) { + if (!date_value.check_loss_accuracy_cast_to_date()) { + func(temp_range, + reinterpret_cast::CppType*>( + &date_value)); + } + } else { + if (date_value.check_loss_accuracy_cast_to_date()) { + if (fn_name == "lt" || fn_name == "ge") { + ++date_value; + } + } + func(temp_range, to_olap_filter_type(fn_name, slot_ref_child), + reinterpret_cast::CppType*>( + &date_value)); + } + } else if constexpr (PrimitiveType == TYPE_DATETIME) { + DateTimeValue date_value; + reinterpret_cast(value)->convert_vec_dt_to_dt(&date_value); + if constexpr (IsFixed) { + func(temp_range, + reinterpret_cast::CppType*>( + &date_value)); + } else { + func(temp_range, to_olap_filter_type(fn_name, slot_ref_child), + reinterpret_cast::CppType*>( + reinterpret_cast(&date_value))); + } + } else if constexpr (PrimitiveType == TYPE_DATEV2) { + if (cast_date_to_datetime) { + DateV2Value datetimev2_value = + *reinterpret_cast*>(value); + if constexpr (IsFixed) { + if (datetimev2_value.can_cast_to_date_without_loss_accuracy()) { + DateV2Value date_v2; + date_v2.set_date_uint32(binary_cast, uint64_t>( + datetimev2_value) >> + TIME_PART_LENGTH); + func(temp_range, &date_v2); + } + } else { + doris::vectorized::DateV2Value date_v2; + date_v2.set_date_uint32( + binary_cast, uint64_t>(datetimev2_value) >> + TIME_PART_LENGTH); + if (!datetimev2_value.can_cast_to_date_without_loss_accuracy()) { + if (fn_name == "lt" || fn_name == "ge") { + ++date_v2; + } + } + func(temp_range, to_olap_filter_type(fn_name, slot_ref_child), &date_v2); + } + } else { + if constexpr (IsFixed) { + func(temp_range, + reinterpret_cast::CppType*>( + value)); + } else { + func(temp_range, to_olap_filter_type(fn_name, slot_ref_child), + reinterpret_cast::CppType*>( + value)); + } + } + } else if constexpr ((PrimitiveType == TYPE_DECIMALV2) || (PrimitiveType == TYPE_CHAR) || + (PrimitiveType == TYPE_VARCHAR) || (PrimitiveType == TYPE_HLL) || + (PrimitiveType == TYPE_DATETIMEV2) || (PrimitiveType == TYPE_TINYINT) || + (PrimitiveType == TYPE_SMALLINT) || (PrimitiveType == TYPE_INT) || + (PrimitiveType == TYPE_BIGINT) || (PrimitiveType == TYPE_LARGEINT) || + (PrimitiveType == TYPE_DECIMAL32) || (PrimitiveType == TYPE_DECIMAL64) || + (PrimitiveType == TYPE_DECIMAL128) || (PrimitiveType == TYPE_STRING) || + (PrimitiveType == TYPE_BOOLEAN)) { + if constexpr (IsFixed) { + func(temp_range, + reinterpret_cast::CppType*>(value)); + } else { + func(temp_range, to_olap_filter_type(fn_name, slot_ref_child), + reinterpret_cast::CppType*>(value)); + } + } else { + static_assert(always_false_v); + } + + return Status::OK(); +} + +Status VScanNode::try_append_late_arrival_runtime_filter(int* arrived_rf_num) { + if (_is_all_rf_applied) { + *arrived_rf_num = _runtime_filter_descs.size(); + return Status::OK(); + } + + // This method will be called in scanner thread. + // So need to add lock + std::unique_lock l(_rf_locks); + if (_is_all_rf_applied) { + *arrived_rf_num = _runtime_filter_descs.size(); + return Status::OK(); + } + + // 1. Check if are runtime filter ready but not applied. + std::vector vexprs; + int current_arrived_rf_num = 0; + for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { + if (_runtime_filter_ctxs[i].apply_mark) { + ++current_arrived_rf_num; + continue; + } else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) { + _runtime_filter_ctxs[i].runtime_filter->get_prepared_vexprs(&vexprs, row_desc()); + ++current_arrived_rf_num; + _runtime_filter_ctxs[i].apply_mark = true; + } + } + // 2. Append unapplied runtime filters to vconjunct_ctx_ptr + if (!vexprs.empty()) { + RETURN_IF_ERROR(_append_rf_into_conjuncts(vexprs)); + } + if (current_arrived_rf_num == _runtime_filter_descs.size()) { + _is_all_rf_applied = true; + } + + *arrived_rf_num = current_arrived_rf_num; + return Status::OK(); +} + +Status VScanNode::clone_vconjunct_ctx(VExprContext** _vconjunct_ctx) { + if (_vconjunct_ctx_ptr) { + std::unique_lock l(_rf_locks); + return (*_vconjunct_ctx_ptr)->clone(_state, _vconjunct_ctx); + } + return Status::OK(); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h new file mode 100644 index 0000000000..292234162e --- /dev/null +++ b/be/src/vec/exec/scan/vscan_node.h @@ -0,0 +1,246 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "exec/exec_node.h" +#include "exec/olap_common.h" +#include "exprs/function_filter.h" +#include "exprs/runtime_filter.h" +#include "vec/exec/scan/scanner_context.h" +#include "vec/exprs/vectorized_fn_call.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vin_predicate.h" + +namespace doris::vectorized { + +class VScanner; +class VSlotRef; + +class VScanNode : public ExecNode { +public: + VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) + : ExecNode(pool, tnode, descs) {} + friend class NewOlapScanner; + + Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; + + Status prepare(RuntimeState* state) override; + + Status open(RuntimeState* state) override; + + virtual void set_scan_ranges(const std::vector& scan_ranges) {} + + Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override { + return Status::NotSupported("Not implement"); + } + + // Get next block. + // If eos is true, no more data will be read and block should be empty. + Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override; + + Status close(RuntimeState* state) override; + + void set_no_agg_finalize() { _need_agg_finalize = false; } + + // Try append late arrived runtime filters. + // Return num of filters which are applied already. + Status try_append_late_arrival_runtime_filter(int* arrived_rf_num); + + // Clone current vconjunct_ctx to _vconjunct_ctx, if exists. + Status clone_vconjunct_ctx(VExprContext** _vconjunct_ctx); + + int runtime_filter_num() const { return (int)_runtime_filter_ctxs.size(); } + + TupleId input_tuple_id() const { return _input_tuple_id; } + TupleId output_tuple_id() const { return _output_tuple_id; } + const TupleDescriptor* input_tuple_desc() const { return _input_tuple_desc; } + const TupleDescriptor* output_tuple_desc() const { return _output_tuple_desc; } + +protected: + // Different data sources register different profiles by implementing this method + virtual Status _init_profile() { return Status::OK(); } + + // Process predicates, extract the predicates in the conjuncts that can be pushed down + // to the data source, and convert them into common expressions structure ColumnPredicate. + // There are currently 3 types of predicates that can be pushed down to data sources: + // + // 1. Simple predicate, with column on left and constant on right, such as "a=1", "b in (1,2,3)" etc. + // 2. Bloom Filter, predicate condition generated by runtime filter + // 3. Function Filter, some data sources can accept function conditions, such as "a like 'abc%'" + // + // Predicates that can be fully processed by the data source will be removed from conjuncts + virtual Status _process_conjuncts() { + RETURN_IF_ERROR(_normalize_conjuncts()); + return Status::OK(); + } + + // Create a list of scanners. + // The number of scanners is related to the implementation of the data source, + // predicate conditions, and scheduling strategy. + // So this method needs to be implemented separately by the subclass of ScanNode. + // Finally, a set of scanners that have been prepared are returned. + virtual Status _init_scanners(std::list* scanners) { return Status::OK(); } + + // Different data sources can implement the following 3 methods to determine whether a predicate + // can be pushed down to the data source. + // 3 types: + // 1. binary predicate + // 2. in/not in predicate + // 3. function predicate + // TODO: these interfaces should be change to become more common. + virtual bool _should_push_down_binary_predicate( + VectorizedFnCall* fn_call, VExprContext* expr_ctx, StringRef* constant_val, + int* slot_ref_child, const std::function& fn_checker) { + return false; + } + + virtual bool _should_push_down_in_predicate(VInPredicate* in_pred, VExprContext* expr_ctx, + bool is_not_in) { + return false; + } + + virtual bool _should_push_down_function_filter(VectorizedFnCall* fn_call, + VExprContext* expr_ctx, StringVal* constant_str, + doris_udf::FunctionContext** fn_ctx) { + return false; + } + + // Return true if it is a key column. + // Only predicate on key column can be pushed down. + virtual bool _is_key_column(const std::string& col_name) { return false; } + +protected: + RuntimeState* _state; + // For load scan node, there should be both input and output tuple descriptor. + // For query scan node, there is only output_tuple_desc. + TupleId _input_tuple_id = -1; + TupleId _output_tuple_id = -1; + const TupleDescriptor* _input_tuple_desc; + const TupleDescriptor* _output_tuple_desc; + + // These two values are from query_options + int _max_scan_key_num; + int _max_pushdown_conditions_per_column; + + // For runtime filters + struct RuntimeFilterContext { + RuntimeFilterContext() : apply_mark(false), runtime_filter(nullptr) {} + // set to true if this runtime filter is already applied to vconjunct_ctx_ptr + bool apply_mark; + IRuntimeFilter* runtime_filter; + }; + std::vector _runtime_filter_ctxs; + + std::vector _runtime_filter_descs; + // Set to true if the runtime filter is ready. + std::vector _runtime_filter_ready_flag; + std::mutex _rf_locks; + std::map _conjunct_id_to_runtime_filter_ctxs; + phmap::flat_hash_set _rf_vexpr_set; + // True means all runtime filters are applied to scanners + bool _is_all_rf_applied = true; + + // Each scan node will generates a ScannerContext to manage all Scanners. + // See comments of ScannerContext for more details + std::shared_ptr _scanner_ctx; + // Save all scanner objects. + ObjectPool _scanner_pool; + + // indicate this scan node has no more data to return + bool _eos = false; + + // Save all bloom filter predicates which may be pushed down to data source. + // column name -> bloom filter function + std::vector>> + _bloom_filters_push_down; + + // Save all function predicates which may be pushed down to data source. + std::vector _push_down_functions; + + // slot id -> ColumnValueRange + // Parsed from conjunts + phmap::flat_hash_map> + _slot_id_to_value_range; + // column -> ColumnValueRange + std::map _colname_to_value_range; + + bool _need_agg_finalize = true; + + // TODO: should be moved to olap scan node? + std::vector _olap_filters; + + // Every time vconjunct_ctx_ptr is updated, the old ctx will be stored in this vector + // so that it will be destroyed uniformly at the end of the query. + std::vector> _stale_vexpr_ctxs; + + // If sort info is set, push limit to each scanner; + int64_t _limit_per_scanner = -1; + +private: + // Register and get all runtime filters at Init phase. + Status _register_runtime_filter(); + // Get all arrived runtime filters at Open phase. + Status _acquire_runtime_filter(); + // Append late-arrival runtime filters to the vconjunct_ctx. + Status _append_rf_into_conjuncts(std::vector& vexprs); + + Status _normalize_conjuncts(); + VExpr* _normalize_predicate(VExpr* conjunct_expr_root); + void _eval_const_conjuncts(VExpr* vexpr, VExprContext* expr_ctx, bool* push_down); + + Status _normalize_bloom_filter(VExpr* expr, VExprContext* expr_ctx, SlotDescriptor* slot, + bool* push_down); + + Status _normalize_function_filters(VExpr* expr, VExprContext* expr_ctx, SlotDescriptor* slot, + bool* push_down); + + bool _is_predicate_acting_on_slot(VExpr* expr, + const std::function&, + const VSlotRef**, VExpr**)>& checker, + SlotDescriptor** slot_desc, ColumnValueRangeType** range); + + template + Status _normalize_in_and_eq_predicate(vectorized::VExpr* expr, VExprContext* expr_ctx, + SlotDescriptor* slot, ColumnValueRange& range, + bool* push_down); + template + Status _normalize_not_in_and_not_eq_predicate(vectorized::VExpr* expr, VExprContext* expr_ctx, + SlotDescriptor* slot, ColumnValueRange& range, + bool* push_down); + + template + Status _normalize_noneq_binary_predicate(vectorized::VExpr* expr, VExprContext* expr_ctx, + SlotDescriptor* slot, ColumnValueRange& range, + bool* push_down); + + template + Status _normalize_is_null_predicate(vectorized::VExpr* expr, VExprContext* expr_ctx, + SlotDescriptor* slot, ColumnValueRange& range, + bool* push_down); + + template + static Status _change_value_range(ColumnValueRange& range, void* value, + const ChangeFixedValueRangeFunc& func, + const std::string& fn_name, bool cast_date_to_datetime = true, + int slot_ref_child = -1); + + // Submit the scanner to the thread pool and start execution + Status _start_scanners(const std::list& scanners); +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp new file mode 100644 index 0000000000..e154e0e6ba --- /dev/null +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -0,0 +1,155 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/scan/vscanner.h" + +#include "vec/exec/scan/vscan_node.h" + +namespace doris::vectorized { + +VScanner::VScanner(RuntimeState* state, VScanNode* parent, int64_t limit, MemTracker* mem_tracker) + : _state(state), + _parent(parent), + _limit(limit), + _mem_tracker(mem_tracker), + _input_tuple_desc(parent->input_tuple_desc()), + _output_tuple_desc(parent->output_tuple_desc()) { + _real_tuple_desc = _input_tuple_desc != nullptr ? _input_tuple_desc : _output_tuple_desc; + _total_rf_num = _parent->runtime_filter_num(); +} + +Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { + // only empty block should be here + DCHECK(block->rows() == 0); + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); + + int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num; + if (!block->mem_reuse()) { + for (const auto slot_desc : _output_tuple_desc->slots()) { + block->insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + } + + _init_input_block(block); + { + // SCOPED_TIMER(_parent->_scan_timer); + do { + // 1. Get input block from scanner + RETURN_IF_ERROR(_get_block_impl(state, _input_block_ptr, eof)); + if (*eof) { + DCHECK(_input_block_ptr->rows() == 0); + break; + } + _num_rows_read += _input_block_ptr->rows(); + // _update_realtime_counter(); + + // 2. For load, use prefilter to filter the input block first. + RETURN_IF_ERROR(_filter_input_block(_input_block_ptr)); + + // 3. For load, convert input block to output block + RETURN_IF_ERROR(_convert_to_output_block(block)); + + // 4. Filter the output block finally. + // NOTE that step 2/3 may be skipped, for Query. + RETURN_IF_ERROR(_filter_output_block(block)); + // record rows return (after filter) for _limit check + _num_rows_return += block->rows(); + } while (block->rows() == 0 && !(*eof) && raw_rows_read() < raw_rows_threshold); + } + + return Status::OK(); +} + +void VScanner::_init_input_block(Block* output_block) { + if (_input_tuple_desc == nullptr) { + _input_block_ptr = output_block; + return; + } + + // init the input block used for scanner. + _input_block.clear(); + _input_block_ptr = &_input_block; + DCHECK(_input_block.columns() == 0); + + for (auto& slot_desc : _input_tuple_desc->slots()) { + auto data_type = slot_desc->get_data_type_ptr(); + _input_block.insert(vectorized::ColumnWithTypeAndName( + data_type->create_column(), slot_desc->get_data_type_ptr(), slot_desc->col_name())); + } +} + +Status VScanner::_filter_input_block(Block* block) { + // TODO: implement + return Status::OK(); +} + +Status VScanner::_convert_to_output_block(Block* output_block) { + if (_input_block_ptr == output_block) { + return Status::OK(); + } + // TODO: implement + + return Status::OK(); +} + +Status VScanner::_filter_output_block(Block* block) { + return VExprContext::filter_block(_vconjunct_ctx, block, _output_tuple_desc->slots().size()); +} + +Status VScanner::try_append_late_arrival_runtime_filter() { + if (_applied_rf_num == _total_rf_num) { + return Status::OK(); + } + DCHECK(_applied_rf_num < _total_rf_num); + + int arrived_rf_num = 0; + RETURN_IF_ERROR(_parent->try_append_late_arrival_runtime_filter(&arrived_rf_num)); + + if (arrived_rf_num == _applied_rf_num) { + // No newly arrived runtime filters, just return; + return Status::OK(); + } + + // There are newly arrived runtime filters, + // renew the vconjunct_ctx_ptr + if (_vconjunct_ctx) { + _discard_conjuncts(); + } + // Notice that the number of runtiem filters may be larger than _applied_rf_num. + // But it is ok because it will be updated at next time. + RETURN_IF_ERROR(_parent->clone_vconjunct_ctx(&_vconjunct_ctx)); + _applied_rf_num = arrived_rf_num; + return Status::OK(); +} + +Status VScanner::close(RuntimeState* state) { + if (_is_closed) { + return Status::OK(); + } + for (auto& ctx : _stale_vexpr_ctxs) { + ctx->close(state); + } + if (_vconjunct_ctx) { + _vconjunct_ctx->close(state); + } + _is_closed = true; + return Status::OK(); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h new file mode 100644 index 0000000000..aff3c60393 --- /dev/null +++ b/be/src/vec/exec/scan/vscanner.h @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "common/status.h" +#include "olap/tablet.h" +#include "runtime/runtime_state.h" +#include "vec/exprs/vexpr_context.h" + +namespace doris::vectorized { + +class Block; +class VScanNode; + +class VScanner { +public: + VScanner(RuntimeState* state, VScanNode* parent, int64_t limit, MemTracker* mem_tracker); + + virtual ~VScanner() {} + + virtual Status open(RuntimeState* state) { return Status::OK(); } + + Status get_block(RuntimeState* state, Block* block, bool* eos); + + virtual Status close(RuntimeState* state); + + // Subclass must implement this to return the current rows read + virtual int64_t raw_rows_read() { return 0; } + +protected: + // Subclass should implement this to return data. + virtual Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) = 0; + + // Init the input block if _input_tuple_desc is set. + // Otherwise, use output_block directly. + void _init_input_block(Block* output_block); + + // Use prefilters to filter input block + Status _filter_input_block(Block* block); + + // Convert input block to output block, if needed. + Status _convert_to_output_block(Block* output_block); + + // Filter the output block finally. + Status _filter_output_block(Block* block); + +public: + VScanNode* get_parent() { return _parent; } + + Status try_append_late_arrival_runtime_filter(); + + // Call start_wait_worker_timer() when submit the scanner to the thread pool. + // And call update_wait_worker_timer() when it is actually being executed. + void start_wait_worker_timer() {} + int64_t update_wait_worker_timer() { return 0; } + + RuntimeState* runtime_state() { return _state; } + + bool is_open() { return _is_open; } + void set_opened() { _is_open = true; } + + int queue_id() { return _state->exec_env()->store_path_to_index("xxx"); } + + doris::TabletStorageType get_storage_type() { + return doris::TabletStorageType::STORAGE_TYPE_LOCAL; + } + + bool need_to_close() { return _need_to_close; } + + void mark_to_need_to_close() { _need_to_close = true; } + + void set_status_on_failure(const Status& st) { _status = st; } + + VExprContext** vconjunct_ctx_ptr() { return &_vconjunct_ctx; } + +protected: + void _discard_conjuncts() { + if (_vconjunct_ctx) { + _vconjunct_ctx->mark_as_stale(); + _stale_vexpr_ctxs.push_back(_vconjunct_ctx); + _vconjunct_ctx = nullptr; + } + } + +protected: + RuntimeState* _state; + VScanNode* _parent; + // Set if scan node has sort limit info + int64_t _limit = -1; + MemTracker* _mem_tracker; + + const TupleDescriptor* _input_tuple_desc; + const TupleDescriptor* _output_tuple_desc; + const TupleDescriptor* _real_tuple_desc; + + // If _input_tuple_desc is set, the scanner will read data into + // this _input_block first, then convert to the output block. + Block _input_block; + // If _input_tuple_desc is set, this will point to _input_block, + // otherwise, it will point to the output block. + Block* _input_block_ptr; + + bool _is_open = false; + bool _is_closed = false; + bool _need_to_close = false; + Status _status; + + // If _applied_rf_num == _total_rf_num + // means all runtime filters are arrived and applied. + int _applied_rf_num = 0; + int _total_rf_num = 0; + // Cloned from _vconjunct_ctx of scan node. + // It includes predicate in SQL and runtime filters. + VExprContext* _vconjunct_ctx = nullptr; + // Late arriving runtime filters will update _vconjunct_ctx. + // The old _vconjunct_ctx will be temporarily placed in _stale_vexpr_ctxs + // and will be destroyed at the end. + std::vector _stale_vexpr_ctxs; + + int64_t _num_rows_read = 0; + int64_t _raw_rows_read = 0; + int64_t _num_rows_return = 0; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index 5542eefeb0..f0c667f8f4 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ b/be/src/vec/exec/volap_scan_node.cpp @@ -1171,12 +1171,7 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per } // post volap scanners to thread-pool - ThreadPoolToken* thread_token = nullptr; - if (_limit > -1 && _limit < 1024) { - thread_token = state->get_query_fragments_ctx()->get_serial_token(); - } else { - thread_token = state->get_query_fragments_ctx()->get_token(); - } + ThreadPoolToken* thread_token = state->get_query_fragments_ctx()->get_token(); auto iter = olap_scanners.begin(); if (thread_token != nullptr) { while (iter != olap_scanners.end()) { diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index cb1c1aa263..19e8c28cad 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -175,6 +175,7 @@ Status BlockReader::_direct_next_block(Block* block, MemPool* mem_pool, ObjectPo return res; } *eof = res.precise_code() == OLAP_ERR_DATA_EOF; + _eof = *eof; if (UNLIKELY(_reader_context.record_rowids)) { res = _vcollect_iter.current_block_row_locations(&_block_row_locations); if (UNLIKELY(!res.ok() && res != Status::OLAPInternalError(OLAP_ERR_DATA_EOF))) { @@ -208,6 +209,7 @@ Status BlockReader::_agg_key_next_block(Block* block, MemPool* mem_pool, ObjectP while (true) { auto res = _vcollect_iter.next(&_next_row); if (UNLIKELY(res.precise_code() == OLAP_ERR_DATA_EOF)) { + _eof = true; *eof = true; break; } @@ -265,6 +267,7 @@ Status BlockReader::_unique_key_next_block(Block* block, MemPool* mem_pool, Obje // merge the lower versions auto res = _vcollect_iter.next(&_next_row); if (UNLIKELY(res.precise_code() == OLAP_ERR_DATA_EOF)) { + _eof = true; *eof = true; if (UNLIKELY(_reader_context.record_rowids)) { _block_row_locations.resize(target_block_row); diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 1c8bcf4f87..be9861caf5 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -179,7 +179,7 @@ public: _is_local = (_brpc_dest_addr.hostname == localhost) && (_brpc_dest_addr.port == config::brpc_port); if (_is_local) { - LOG(INFO) << "will use local Exchange, dest_node_id is : " << _dest_node_id; + VLOG_NOTICE << "will use local Exchange, dest_node_id is : " << _dest_node_id; } } diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy index 10f2bed777..bf50eafdf4 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy @@ -375,7 +375,7 @@ class Config { String getDbNameByFile(File suiteFile) { String dir = new File(suitePath).relativePath(suiteFile.parentFile) // We put sql files under sql dir, so dbs and tables used by cases - // under sql directory should be prepared by load.groovy unbder the + // under sql directory should be prepared by load.groovy under the // parent. // // e.g.