// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "vec/exec/volap_scan_node.h" #include "common/resource_tls.h" #include "exec/scan_node.h" #include "gen_cpp/PlanNodes_types.h" #include "olap/storage_engine.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "runtime/large_int_value.h" #include "runtime/runtime_filter_mgr.h" #include "util/priority_thread_pool.hpp" #include "util/to_string.h" #include "vec/core/block.h" #include "vec/data_types/data_type_decimal.h" #include "vec/exec/volap_scanner.h" #include "vec/exprs/vcompound_pred.h" #include "vec/exprs/vexpr.h" namespace doris::vectorized { using doris::operator<<; VOlapScanNode::VOlapScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ScanNode(pool, tnode, descs), _tuple_id(tnode.olap_scan_node.tuple_id), _olap_scan_node(tnode.olap_scan_node), _tuple_desc(nullptr), _tuple_idx(0), _eos(false), _max_materialized_row_batches(config::doris_scanner_queue_size), _start(false), _scanner_done(false), _transfer_done(false), _status(Status::OK()), _resource_info(nullptr), _buffered_bytes(0), _eval_conjuncts_fn(nullptr), _runtime_filter_descs(tnode.runtime_filters), _max_materialized_blocks(config::doris_scanner_queue_size) { _materialized_blocks.reserve(_max_materialized_blocks); _free_blocks.reserve(_max_materialized_blocks); } Status VOlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode, state)); _direct_conjunct_size = _conjunct_ctxs.size(); const TQueryOptions& query_options = state->query_options(); if (query_options.__isset.max_scan_key_num) { _max_scan_key_num = query_options.max_scan_key_num; } else { _max_scan_key_num = config::doris_max_scan_key_num; } if (query_options.__isset.max_pushdown_conditions_per_column) { _max_pushdown_conditions_per_column = query_options.max_pushdown_conditions_per_column; } else { _max_pushdown_conditions_per_column = config::max_pushdown_conditions_per_column; } _max_scanner_queue_size_bytes = query_options.mem_limit / 20; //TODO: session variable percent /// TODO: could one filter used in the different scan_node ? int filter_size = _runtime_filter_descs.size(); _runtime_filter_ctxs.resize(filter_size); _runtime_filter_ready_flag.resize(filter_size); for (int i = 0; i < filter_size; ++i) { IRuntimeFilter* runtime_filter = nullptr; const auto& filter_desc = _runtime_filter_descs[i]; RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter( RuntimeFilterRole::CONSUMER, filter_desc, state->query_options(), id())); RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter)); _runtime_filter_ctxs[i].runtimefilter = runtime_filter; _runtime_filter_ready_flag[i] = false; _rf_locks.push_back(std::make_unique()); } return Status::OK(); } void VOlapScanNode::init_scan_profile() { std::string scanner_profile_name = "VOlapScanner"; if (_olap_scan_node.__isset.table_name) { scanner_profile_name = fmt::format("VOlapScanner({0})", _olap_scan_node.table_name); } _scanner_profile.reset(new RuntimeProfile(scanner_profile_name)); runtime_profile()->add_child(_scanner_profile.get(), true, nullptr); _segment_profile.reset(new RuntimeProfile("SegmentIterator")); _scanner_profile->add_child(_segment_profile.get(), true, nullptr); } void VOlapScanNode::_init_counter(RuntimeState* state) { ADD_TIMER(_scanner_profile, "ShowHintsTime_V1"); _reader_init_timer = ADD_TIMER(_scanner_profile, "ReaderInitTime"); _read_compressed_counter = ADD_COUNTER(_segment_profile, "CompressedBytesRead", TUnit::BYTES); _read_uncompressed_counter = ADD_COUNTER(_segment_profile, "UncompressedBytesRead", TUnit::BYTES); _block_load_timer = ADD_TIMER(_segment_profile, "BlockLoadTime"); _block_load_counter = ADD_COUNTER(_segment_profile, "BlocksLoad", TUnit::UNIT); _block_fetch_timer = ADD_TIMER(_scanner_profile, "BlockFetchTime"); _raw_rows_counter = ADD_COUNTER(_segment_profile, "RawRowsRead", TUnit::UNIT); _block_convert_timer = ADD_TIMER(_scanner_profile, "BlockConvertTime"); // Will be delete after non-vectorized code is removed _block_seek_timer = ADD_TIMER(_segment_profile, "BlockSeekTime"); _block_seek_counter = ADD_COUNTER(_segment_profile, "BlockSeekCount", TUnit::UNIT); _block_init_timer = ADD_TIMER(_segment_profile, "BlockInitTime"); _block_init_seek_timer = ADD_TIMER(_segment_profile, "BlockInitSeekTime"); _block_init_seek_counter = ADD_COUNTER(_segment_profile, "BlockInitSeekCount", TUnit::UNIT); _rows_vec_cond_counter = ADD_COUNTER(_segment_profile, "RowsVectorPredFiltered", TUnit::UNIT); _vec_cond_timer = ADD_TIMER(_segment_profile, "VectorPredEvalTime"); _short_cond_timer = ADD_TIMER(_segment_profile, "ShortPredEvalTime"); _first_read_timer = ADD_TIMER(_segment_profile, "FirstReadTime"); _first_read_seek_timer = ADD_TIMER(_segment_profile, "FirstReadSeekTime"); _first_read_seek_counter = ADD_COUNTER(_segment_profile, "FirstReadSeekCount", TUnit::UNIT); _lazy_read_timer = ADD_TIMER(_segment_profile, "LazyReadTime"); _lazy_read_seek_timer = ADD_TIMER(_segment_profile, "LazyReadSeekTime"); _lazy_read_seek_counter = ADD_COUNTER(_segment_profile, "LazyReadSeekCount", TUnit::UNIT); _output_col_timer = ADD_TIMER(_segment_profile, "OutputColumnTime"); _stats_filtered_counter = ADD_COUNTER(_segment_profile, "RowsStatsFiltered", TUnit::UNIT); _bf_filtered_counter = ADD_COUNTER(_segment_profile, "RowsBloomFilterFiltered", TUnit::UNIT); _del_filtered_counter = ADD_COUNTER(_scanner_profile, "RowsDelFiltered", TUnit::UNIT); _conditions_filtered_counter = ADD_COUNTER(_segment_profile, "RowsConditionsFiltered", TUnit::UNIT); _key_range_filtered_counter = ADD_COUNTER(_segment_profile, "RowsKeyRangeFiltered", TUnit::UNIT); _io_timer = ADD_TIMER(_segment_profile, "IOTimer"); _decompressor_timer = ADD_TIMER(_segment_profile, "DecompressorTimer"); _index_load_timer = ADD_TIMER(_segment_profile, "IndexLoadTime_V1"); _scan_timer = ADD_TIMER(_scanner_profile, "ScanTime"); _scan_cpu_timer = ADD_TIMER(_scanner_profile, "ScanCpuTime"); _total_pages_num_counter = ADD_COUNTER(_segment_profile, "TotalPagesNum", TUnit::UNIT); _cached_pages_num_counter = ADD_COUNTER(_segment_profile, "CachedPagesNum", TUnit::UNIT); _bitmap_index_filter_counter = ADD_COUNTER(_segment_profile, "RowsBitmapIndexFiltered", TUnit::UNIT); _bitmap_index_filter_timer = ADD_TIMER(_segment_profile, "BitmapIndexFilterTimer"); _num_scanners = ADD_COUNTER(_runtime_profile, "NumScanners", TUnit::UNIT); _filtered_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentFiltered", TUnit::UNIT); _total_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentTotal", TUnit::UNIT); // time of transfer thread to wait for row batch from scan thread _scanner_wait_batch_timer = ADD_TIMER(_runtime_profile, "ScannerBatchWaitTime"); // time of scan thread to wait for worker thread of the thread pool _scanner_wait_worker_timer = ADD_TIMER(_runtime_profile, "ScannerWorkerWaitTime"); // time of node to wait for batch/block queue _olap_wait_batch_queue_timer = ADD_TIMER(_runtime_profile, "BatchQueueWaitTime"); // for the purpose of debugging or profiling for (int i = 0; i < GENERAL_DEBUG_COUNT; ++i) { char name[64]; snprintf(name, sizeof(name), "GeneralDebugTimer%d", i); _general_debug_timer[i] = ADD_TIMER(_segment_profile, name); } } Status VOlapScanNode::prepare(RuntimeState* state) { init_scan_profile(); RETURN_IF_ERROR(ScanNode::prepare(state)); SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); // create scanner profile // create timer _tablet_counter = ADD_COUNTER(runtime_profile(), "TabletCount ", TUnit::UNIT); _scanner_sched_counter = ADD_COUNTER(runtime_profile(), "ScannerSchedCount ", TUnit::UNIT); _rows_pushed_cond_filtered_counter = ADD_COUNTER(_scanner_profile, "RowsPushedCondFiltered", TUnit::UNIT); _init_counter(state); _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); _scanner_mem_tracker = MemTracker::create_tracker(state->instance_mem_tracker()->limit(), "Scanners", mem_tracker()); if (_tuple_desc == nullptr) { // TODO: make sure we print all available diagnostic output to our error log return Status::InternalError("Failed to get tuple descriptor."); } const std::vector& slots = _tuple_desc->slots(); for (int i = 0; i < slots.size(); ++i) { if (!slots[i]->is_materialized()) { continue; } if (slots[i]->type().is_collection_type()) { _collection_slots.push_back(slots[i]); } if (slots[i]->type().is_string_type()) { _string_slots.push_back(slots[i]); } } _runtime_state = state; for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { IRuntimeFilter* runtime_filter = nullptr; state->runtime_filter_mgr()->get_consume_filter(_runtime_filter_descs[i].filter_id, &runtime_filter); DCHECK(runtime_filter != nullptr); runtime_filter->init_profile(_runtime_profile.get()); } return Status::OK(); } Status VOlapScanNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VOlapScanNode::open"); VLOG_CRITICAL << "VOlapScanNode::Open"; SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(ExecNode::open(state)); _resource_info = ResourceTls::get_resource_tls(); // acquire runtime filter _runtime_filter_ctxs.resize(_runtime_filter_descs.size()); for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { auto& filter_desc = _runtime_filter_descs[i]; IRuntimeFilter* runtime_filter = nullptr; state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter); DCHECK(runtime_filter != nullptr); if (runtime_filter == nullptr) { continue; } bool ready = runtime_filter->is_ready(); if (!ready) { ready = runtime_filter->await(); } if (ready) { std::list expr_context; RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&expr_context)); _runtime_filter_ctxs[i].apply_mark = true; _runtime_filter_ctxs[i].runtimefilter = runtime_filter; for (auto ctx : expr_context) { ctx->prepare(state, row_desc(), _expr_mem_tracker); ctx->open(state); int index = _conjunct_ctxs.size(); _conjunct_ctxs.push_back(ctx); // it's safe to store address from a fix-resized vector _conjunctid_to_runtime_filter_ctxs[index] = &_runtime_filter_ctxs[i]; } } } return Status::OK(); } void VOlapScanNode::transfer_thread(RuntimeState* state) { // scanner open pushdown to scanThread START_AND_SCOPE_SPAN(state->get_tracer(), span, "VOlapScanNode::transfer_thread"); SCOPED_ATTACH_TASK_THREAD(state, mem_tracker()); Status status = Status::OK(); if (_vconjunct_ctx_ptr) { for (auto scanner : _volap_scanners) { status = (*_vconjunct_ctx_ptr)->clone(state, scanner->vconjunct_ctx_ptr()); if (!status.ok()) { std::lock_guard guard(_status_mutex); _status = status; break; } } } /********************************* * 优先级调度基本策略: * 1. 通过查询拆分的Range个数来确定初始nice值 * Range个数越多,越倾向于认定为大查询,nice值越小 * 2. 通过查询累计读取的数据量来调整nice值 * 读取的数据越多,越倾向于认定为大查询,nice值越小 * 3. 通过nice值来判断查询的优先级 * nice值越大的,越优先获得的查询资源 * 4. 定期提高队列内残留任务的优先级,避免大查询完全饿死 *********************************/ _total_assign_num = 0; _nice = 18 + std::max(0, 2 - (int)_volap_scanners.size() / 5); auto doris_scanner_row_num = _limit == -1 ? config::doris_scanner_row_num : std::min(static_cast(config::doris_scanner_row_num), _limit); _block_size = _limit == -1 ? state->batch_size() : std::min(static_cast(state->batch_size()), _limit); auto block_per_scanner = (doris_scanner_row_num + (_block_size - 1)) / _block_size; auto pre_block_count = std::min(_volap_scanners.size(), static_cast(config::doris_scanner_thread_pool_thread_num)) * block_per_scanner; for (int i = 0; i < pre_block_count; ++i) { auto block = new Block(_tuple_desc->slots(), _block_size); _free_blocks.emplace_back(block); _buffered_bytes += block->allocated_bytes(); } _block_mem_tracker->consume(_buffered_bytes); // read from scanner while (LIKELY(status.ok())) { int assigned_thread_num = _start_scanner_thread_task(state, block_per_scanner); std::vector blocks; { // 1 scanner idle task not empty, assign new scanner task std::unique_lock l(_scan_blocks_lock); // scanner_row_num = 16k // 16k * 10 * 12 * 8 = 15M(>2s) --> nice=10 // 16k * 20 * 22 * 8 = 55M(>6s) --> nice=0 while (_nice > 0 && _total_assign_num > (22 - _nice) * (20 - _nice) * 6) { --_nice; } // 2 wait when all scanner are running & no result in queue while (UNLIKELY(_running_thread == assigned_thread_num && _scan_blocks.empty() && !_scanner_done)) { SCOPED_TIMER(_scanner_wait_batch_timer); _scan_block_added_cv.wait(l); } // 3 transfer result block when queue is not empty if (LIKELY(!_scan_blocks.empty())) { blocks.swap(_scan_blocks); for (auto b : blocks) { _scan_row_batches_bytes -= b->allocated_bytes(); } // delete scan_block if transfer thread should be stopped // because scan_block wouldn't be useful anymore if (UNLIKELY(_transfer_done)) { std::for_each(blocks.begin(), blocks.end(), std::default_delete()); blocks.clear(); } } else { if (_scanner_done) { break; } } } if (!blocks.empty()) { _add_blocks(blocks); } } telemetry::set_span_attribute(span, _scanner_sched_counter); VLOG_CRITICAL << "TransferThread finish."; _transfer_done = true; _block_added_cv.notify_all(); { std::unique_lock l(_scan_blocks_lock); _scan_thread_exit_cv.wait(l, [this] { return _running_thread == 0; }); } VLOG_CRITICAL << "Scanner threads have been exited. TransferThread exit."; } void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { START_AND_SCOPE_SPAN(scanner->runtime_state()->get_tracer(), span, "VOlapScanNode::scanner_thread"); SCOPED_ATTACH_TASK_THREAD(_runtime_state, mem_tracker()); ADD_THREAD_LOCAL_MEM_TRACKER(scanner->mem_tracker()); Thread::set_self_name("volap_scanner"); int64_t wait_time = scanner->update_wait_worker_timer(); // Do not use ScopedTimer. There is no guarantee that, the counter // (_scan_cpu_timer, the class member) is not destroyed after `_running_thread==0`. ThreadCpuStopWatch cpu_watch; cpu_watch.start(); Status status = Status::OK(); bool eos = false; RuntimeState* state = scanner->runtime_state(); DCHECK(nullptr != state); if (!scanner->is_open()) { status = scanner->open(); if (!status.ok()) { std::lock_guard guard(_status_mutex); _status = status; eos = true; } scanner->set_opened(); } /* // the following code will cause coredump when running tpcds_sf1 sqls, // disable temporariy to avoid it, SHOULD BE FIX LATER std::vector vexprs; auto& scanner_filter_apply_marks = *scanner->mutable_runtime_filter_marks(); DCHECK(scanner_filter_apply_marks.size() == _runtime_filter_descs.size()); for (size_t i = 0; i < scanner_filter_apply_marks.size(); i++) { if (!scanner_filter_apply_marks[i] && !_runtime_filter_ctxs[i].apply_mark) { /// When runtime filters are ready during running, we should use them to filter data /// in VOlapScanner. /// New arrival rf will be processed as below: /// 1. convert these runtime filters to vectorized expressions /// 2. if this is the first scanner thread to receive this rf, construct a new /// VExprContext and update `_vconjunct_ctx_ptr` in scan node. Notice that we use /// `_runtime_filter_ready_flag` to ensure `_vconjunct_ctx_ptr` will be updated only /// once after any runtime_filters are ready. /// 3. finally, just copy this new VExprContext to scanner and use it to filter data. IRuntimeFilter* runtime_filter = nullptr; state->runtime_filter_mgr()->get_consume_filter(_runtime_filter_descs[i].filter_id, &runtime_filter); DCHECK(runtime_filter != nullptr); bool ready = runtime_filter->is_ready(); if (ready) { runtime_filter->get_prepared_vexprs(&vexprs, row_desc(), _expr_mem_tracker); scanner_filter_apply_marks[i] = true; if (!_runtime_filter_ready_flag[i] && !vexprs.empty()) { std::unique_lock l(*(_rf_locks[i])); if (!_runtime_filter_ready_flag[i]) { // Use all conjuncts and new arrival runtime filters to construct a new // expression tree here. auto last_expr = _vconjunct_ctx_ptr ? (*_vconjunct_ctx_ptr)->root() : vexprs[0]; for (size_t j = _vconjunct_ctx_ptr ? 0 : 1; j < vexprs.size(); j++) { if (_rf_vexpr_set.find(vexprs[j]) != _rf_vexpr_set.end()) { continue; } TExprNode texpr_node; texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); texpr_node.__set_node_type(TExprNodeType::COMPOUND_PRED); texpr_node.__set_opcode(TExprOpcode::COMPOUND_AND); VExpr* new_node = _pool->add(new VcompoundPred(texpr_node)); new_node->add_child(last_expr); new_node->add_child(vexprs[j]); last_expr = new_node; _rf_vexpr_set.insert(vexprs[j]); } auto new_vconjunct_ctx_ptr = _pool->add(new VExprContext(last_expr)); auto expr_status = new_vconjunct_ctx_ptr->prepare(state, row_desc(), expr_mem_tracker()); // If error occurs in `prepare` or `open` phase, discard these runtime // filters directly. if (UNLIKELY(!expr_status.OK())) { LOG(WARNING) << "Something wrong for runtime filters: " << expr_status; vexprs.clear(); break; } expr_status = new_vconjunct_ctx_ptr->open(state); if (UNLIKELY(!expr_status.OK())) { LOG(WARNING) << "Something wrong for runtime filters: " << expr_status; vexprs.clear(); break; } if (_vconjunct_ctx_ptr) { _stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr)); } _vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*); *(_vconjunct_ctx_ptr.get()) = new_vconjunct_ctx_ptr; _runtime_filter_ready_flag[i] = true; } } } } } if (!vexprs.empty()) { if (*scanner->vconjunct_ctx_ptr()) { (*scanner->vconjunct_ctx_ptr())->close(state); *scanner->vconjunct_ctx_ptr() = nullptr; } WARN_IF_ERROR((*_vconjunct_ctx_ptr)->clone(state, scanner->vconjunct_ctx_ptr()), "Something wrong for runtime filters: "); scanner->set_use_pushdown_conjuncts(true); } */ std::vector blocks; // Because we use thread pool to scan data from storage. One scanner can't // use this thread too long, this can starve other query's scanner. So, we // need yield this thread when we do enough work. However, OlapStorage read // data in pre-aggregate mode, then we can't use storage returned data to // judge if we need to yield. So we record all raw data read in this round // scan, if this exceed row number or bytes threshold, we yield this thread. int64_t raw_rows_read = scanner->raw_rows_read(); int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num; int64_t raw_bytes_read = 0; int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; bool get_free_block = true; int num_rows_in_block = 0; // Has to wait at least one full block, or it will cause a lot of schedule task in priority // queue, it will affect query latency and query concurrency for example ssb 3.3. while (!eos && raw_bytes_read < raw_bytes_threshold && ((raw_rows_read < raw_rows_threshold && get_free_block) || num_rows_in_block < _runtime_state->batch_size())) { if (UNLIKELY(_transfer_done)) { eos = true; status = Status::Cancelled("Cancelled"); LOG(INFO) << "Scan thread cancelled, cause query done, maybe reach limit."; break; } auto block = _alloc_block(get_free_block); status = scanner->get_block(_runtime_state, block, &eos); VLOG_ROW << "VOlapScanNode input rows: " << block->rows(); if (!status.ok()) { LOG(WARNING) << "Scan thread read VOlapScanner failed: " << status.to_string(); // Add block ptr in blocks, prevent mem leak in read failed blocks.push_back(block); eos = true; break; } raw_bytes_read += block->bytes(); num_rows_in_block += block->rows(); // 4. if status not ok, change status_. if (UNLIKELY(block->rows() == 0)) { std::lock_guard l(_free_blocks_lock); _free_blocks.emplace_back(block); } else { if (!blocks.empty() && blocks.back()->rows() + block->rows() <= _runtime_state->batch_size()) { MutableBlock(blocks.back()).merge(*block); block->clear_column_data(); std::lock_guard l(_free_blocks_lock); _free_blocks.emplace_back(block); } else { blocks.push_back(block); } } raw_rows_read = scanner->raw_rows_read(); } { // if we failed, check status. if (UNLIKELY(!status.ok())) { _transfer_done = true; std::lock_guard guard(_status_mutex); if (LIKELY(_status.ok())) { _status = status; } } bool global_status_ok = false; { std::lock_guard guard(_status_mutex); global_status_ok = _status.ok(); } if (UNLIKELY(!global_status_ok)) { eos = true; std::for_each(blocks.begin(), blocks.end(), std::default_delete()); } else { std::lock_guard l(_scan_blocks_lock); _scan_blocks.insert(_scan_blocks.end(), blocks.begin(), blocks.end()); for (auto b : blocks) { _scan_row_batches_bytes += b->allocated_bytes(); } } // If eos is true, we will process out of this lock block. if (eos) { scanner->mark_to_need_to_close(); } std::lock_guard l(_volap_scanners_lock); _volap_scanners.push_front(scanner); } if (eos) { std::lock_guard l(_scan_blocks_lock); _progress.update(1); if (_progress.done()) { // this is the right out _scanner_done = true; } } _scan_cpu_timer->update(cpu_watch.elapsed_time()); _scanner_wait_worker_timer->update(wait_time); std::unique_lock l(_scan_blocks_lock); _running_thread--; // The transfer thead will wait for `_running_thread==0`, to make sure all scanner threads won't access class members. // Do not access class members after this code. _scan_block_added_cv.notify_one(); _scan_thread_exit_cv.notify_one(); } Status VOlapScanNode::_add_blocks(std::vector& block) { { std::unique_lock l(_blocks_lock); // check queue limit for both block queue size and bytes while (UNLIKELY((_materialized_blocks.size() >= _max_materialized_blocks || _materialized_row_batches_bytes >= _max_scanner_queue_size_bytes / 2) && !_transfer_done)) { _block_consumed_cv.wait(l); } VLOG_CRITICAL << "Push block to materialized_blocks"; _materialized_blocks.insert(_materialized_blocks.end(), block.cbegin(), block.cend()); for (auto b : block) { _materialized_row_batches_bytes += b->allocated_bytes(); } } // remove one block, notify main thread _block_added_cv.notify_one(); return Status::OK(); } void VOlapScanNode::eval_const_conjuncts() { for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) { // if conjunct is constant, compute direct and set eos = true if (_conjunct_ctxs[conj_idx]->root()->is_constant()) { void* value = _conjunct_ctxs[conj_idx]->get_value(nullptr); if (value == nullptr || *reinterpret_cast(value) == false) { _eos = true; break; } } } } Status VOlapScanNode::normalize_conjuncts() { std::vector slots = _tuple_desc->slots(); for (int slot_idx = 0; slot_idx < slots.size(); ++slot_idx) { switch (slots[slot_idx]->type().type) { case TYPE_TINYINT: { ColumnValueRange range(slots[slot_idx]->col_name()); normalize_predicate(range, slots[slot_idx]); break; } case TYPE_SMALLINT: { ColumnValueRange range(slots[slot_idx]->col_name()); normalize_predicate(range, slots[slot_idx]); break; } case TYPE_INT: { ColumnValueRange range(slots[slot_idx]->col_name()); normalize_predicate(range, slots[slot_idx]); break; } case TYPE_BIGINT: { ColumnValueRange range(slots[slot_idx]->col_name()); normalize_predicate(range, slots[slot_idx]); break; } case TYPE_LARGEINT: { ColumnValueRange range(slots[slot_idx]->col_name()); normalize_predicate(range, slots[slot_idx]); break; } case TYPE_CHAR: { ColumnValueRange range(slots[slot_idx]->col_name()); normalize_predicate(range, slots[slot_idx]); break; } case TYPE_VARCHAR: { ColumnValueRange range(slots[slot_idx]->col_name()); normalize_predicate(range, slots[slot_idx]); break; } case TYPE_HLL: { ColumnValueRange range(slots[slot_idx]->col_name()); normalize_predicate(range, slots[slot_idx]); break; } case TYPE_STRING: { ColumnValueRange range(slots[slot_idx]->col_name()); normalize_predicate(range, slots[slot_idx]); break; } case TYPE_DATE: { ColumnValueRange range(slots[slot_idx]->col_name()); normalize_predicate(range, slots[slot_idx]); break; } case TYPE_DATETIME: { ColumnValueRange range(slots[slot_idx]->col_name()); normalize_predicate(range, slots[slot_idx]); break; } case TYPE_DATEV2: { ColumnValueRange range(slots[slot_idx]->col_name()); normalize_predicate(range, slots[slot_idx]); break; } case TYPE_DECIMALV2: { ColumnValueRange range(slots[slot_idx]->col_name()); normalize_predicate(range, slots[slot_idx]); break; } case TYPE_DECIMAL32: { ColumnValueRange range(slots[slot_idx]->col_name(), slots[slot_idx]->type().precision, slots[slot_idx]->type().scale); normalize_predicate(range, slots[slot_idx]); break; } case TYPE_DECIMAL64: { ColumnValueRange range(slots[slot_idx]->col_name(), slots[slot_idx]->type().precision, slots[slot_idx]->type().scale); normalize_predicate(range, slots[slot_idx]); break; } case TYPE_DECIMAL128: { ColumnValueRange range(slots[slot_idx]->col_name(), slots[slot_idx]->type().precision, slots[slot_idx]->type().scale); normalize_predicate(range, slots[slot_idx]); break; } case TYPE_BOOLEAN: { ColumnValueRange range(slots[slot_idx]->col_name()); normalize_predicate(range, slots[slot_idx]); break; } default: { VLOG_CRITICAL << "Unsupported Normalize Slot [ColName=" << slots[slot_idx]->col_name() << "]"; break; } } } return Status::OK(); } static std::string olap_filter_to_string(const doris::TCondition& condition) { auto op_name = condition.condition_op; if (condition.condition_op == "*=") { op_name = "IN"; } else if (condition.condition_op == "!*=") { op_name = "NOT IN"; } return fmt::format("{{{} {} {}}}", condition.column_name, op_name, to_string(condition.condition_values)); } static std::string olap_filters_to_string(const std::vector& filters) { std::string filters_string; filters_string += "["; for (auto it = filters.cbegin(); it != filters.cend(); it++) { if (it != filters.cbegin()) { filters_string += ","; } filters_string += olap_filter_to_string(*it); } filters_string += "]"; return filters_string; } Status VOlapScanNode::build_key_ranges_and_filters() { const std::vector& column_names = _olap_scan_node.key_column_name; const std::vector& column_types = _olap_scan_node.key_column_type; DCHECK(column_types.size() == column_names.size()); // 1. construct scan key except last olap engine short key _scan_keys.set_is_convertible(limit() == -1); // we use `exact_range` to identify a key range is an exact range or not when we convert // it to `_scan_keys`. If `exact_range` is true, we can just discard it from `_olap_filter`. bool exact_range = true; for (int column_index = 0; column_index < column_names.size() && !_scan_keys.has_range_value(); ++column_index) { auto iter = _column_value_ranges.find(column_names[column_index]); if (_column_value_ranges.end() == iter) { break; } RETURN_IF_ERROR(std::visit( [&](auto&& range) { RETURN_IF_ERROR( _scan_keys.extend_scan_key(range, _max_scan_key_num, &exact_range)); if (exact_range) { _column_value_ranges.erase(iter->first); } return Status::OK(); }, iter->second)); } for (auto& iter : _column_value_ranges) { std::vector filters; std::visit([&](auto&& range) { range.to_olap_filter(filters); }, iter.second); for (const auto& filter : filters) { _olap_filter.push_back(std::move(filter)); } } _runtime_profile->add_info_string("PushdownPredicate", olap_filters_to_string(_olap_filter)); _runtime_profile->add_info_string("KeyRanges", _scan_keys.debug_string()); VLOG_CRITICAL << _scan_keys.debug_string(); return Status::OK(); } Status VOlapScanNode::start_scan(RuntimeState* state) { RETURN_IF_CANCELLED(state); VLOG_CRITICAL << "Eval Const Conjuncts"; // 1. Eval const conjuncts to find whether eos = true eval_const_conjuncts(); VLOG_CRITICAL << "NormalizeConjuncts"; // 2. Convert conjuncts to ColumnValueRange in each column, some conjuncts may // set eos = true RETURN_IF_ERROR(normalize_conjuncts()); // 1 and 2 step dispose find conjuncts set eos = true, return directly if (_eos) { return Status::OK(); } VLOG_CRITICAL << "BuildKeyRangesAndFilters"; // 3. Using `Key Column`'s ColumnValueRange to split ScanRange to several `Sub ScanRange` RETURN_IF_ERROR(build_key_ranges_and_filters()); VLOG_CRITICAL << "Filter idle conjuncts"; // 4. Filter idle conjunct which already trans to olap filters // this must be after build_scan_key, it will free the StringValue memory remove_pushed_conjuncts(state); VLOG_CRITICAL << "StartScanThread"; // 5. Start multi thread to read several `Sub Sub ScanRange` RETURN_IF_ERROR(start_scan_thread(state)); return Status::OK(); } template Status VOlapScanNode::normalize_predicate(ColumnValueRange& range, SlotDescriptor* slot) { // 1. Normalize InPredicate, add to ColumnValueRange RETURN_IF_ERROR(normalize_in_and_eq_predicate(slot, &range)); // 2. Normalize NotInPredicate, add to ColumnValueRange RETURN_IF_ERROR(normalize_not_in_and_not_eq_predicate(slot, &range)); // 3. Normalize BinaryPredicate , add to ColumnValueRange RETURN_IF_ERROR(normalize_noneq_binary_predicate(slot, &range)); // 3. Normalize BloomFilterPredicate, push down by hash join node RETURN_IF_ERROR(normalize_bloom_filter_predicate(slot)); // 4. Check whether range is empty, set _eos if (range.is_empty_value_range()) _eos = true; // 5. Add range to Column->ColumnValueRange map _column_value_ranges[slot->col_name()] = range; return Status::OK(); } static bool ignore_cast(SlotDescriptor* slot, Expr* expr) { if ((slot->type().is_date_type() || slot->type().is_date_v2_type()) && (expr->type().is_date_type() || expr->type().is_date_v2_type())) { return true; } if (slot->type().is_string_type() && expr->type().is_string_type()) { return true; } return false; } bool VOlapScanNode::should_push_down_in_predicate(doris::SlotDescriptor* slot, doris::InPredicate* pred) { if (Expr::type_without_cast(pred->get_child(0)) != TExprNodeType::SLOT_REF) { // not a slot ref(column) return false; } std::vector slot_ids; if (pred->get_child(0)->get_slot_ids(&slot_ids) != 1) { // not a single column predicate return false; } if (slot_ids[0] != slot->id()) { // predicate not related to current column return false; } if (pred->get_child(0)->type().type != slot->type().type) { if (!ignore_cast(slot, pred->get_child(0))) { // the type of predicate not match the slot's type return false; } } VLOG_CRITICAL << slot->col_name() << " fixed_values add num: " << pred->hybrid_set()->size(); // if there are too many elements in InPredicate, exceed the limit, // we will not push any condition of this column to storage engine. // because too many conditions pushed down to storage engine may even // slow down the query process. // ATTN: This is just an experience value. You may need to try // different thresholds to improve performance. if (pred->hybrid_set()->size() > _max_pushdown_conditions_per_column) { VLOG_NOTICE << "Predicate value num " << pred->hybrid_set()->size() << " exceed limit " << _max_pushdown_conditions_per_column; return false; } return true; } std::pair VOlapScanNode::should_push_down_eq_predicate(doris::SlotDescriptor* slot, doris::Expr* pred, int conj_idx, int child_idx) { auto result_pair = std::make_pair(false, nullptr); // Do not get slot_ref of column, should not push_down to Storage Engine if (Expr::type_without_cast(pred->get_child(child_idx)) != TExprNodeType::SLOT_REF) { return result_pair; } std::vector slot_ids; if (pred->get_child(child_idx)->get_slot_ids(&slot_ids) != 1) { // not a single column predicate return result_pair; } if (slot_ids[0] != slot->id()) { // predicate not related to current column return result_pair; } if (pred->get_child(child_idx)->type().type != slot->type().type) { if (!ignore_cast(slot, pred->get_child(child_idx))) { // the type of predicate not match the slot's type return result_pair; } } Expr* expr = pred->get_child(1 - child_idx); if (!expr->is_constant()) { // only handle constant value return result_pair; } // get value in result pair result_pair = std::make_pair( true, _conjunct_ctxs[conj_idx]->get_value(expr, nullptr, slot->type().precision, slot->type().scale)); return result_pair; } template Status VOlapScanNode::change_fixed_value_range(ColumnValueRange& temp_range, void* value, const ChangeFixedValueRangeFunc& func) { switch (primitive_type) { case TYPE_DATE: { DateTimeValue date_value = *reinterpret_cast(value); // There is must return empty data in olap_scan_node, // Because data value loss accuracy if (!date_value.check_loss_accuracy_cast_to_date()) { func(temp_range, reinterpret_cast::CppType*>( &date_value)); } break; } case TYPE_DECIMALV2: case TYPE_CHAR: case TYPE_VARCHAR: case TYPE_HLL: case TYPE_DATETIME: case TYPE_TINYINT: case TYPE_SMALLINT: case TYPE_INT: case TYPE_BIGINT: case TYPE_LARGEINT: case TYPE_DECIMAL32: case TYPE_DECIMAL64: case TYPE_DECIMAL128: case TYPE_STRING: { func(temp_range, reinterpret_cast::CppType*>(value)); break; } case TYPE_BOOLEAN: { bool v = *reinterpret_cast(value); func(temp_range, reinterpret_cast::CppType*>(&v)); break; } case TYPE_DATEV2: { DateTimeValue date_value = *reinterpret_cast(value); if (!date_value.check_loss_accuracy_cast_to_date()) { doris::vectorized::DateV2Value date_v2; date_v2.convert_dt_to_date_v2(&date_value); if constexpr (primitive_type == PrimitiveType::TYPE_DATEV2) { func(temp_range, &date_v2); } else { __builtin_unreachable(); } } break; } default: { LOG(WARNING) << "Normalize filter fail, Unsupported Primitive type. [type=" << primitive_type << "]"; return Status::InternalError("Normalize filter fail, Unsupported Primitive type"); } } return Status::OK(); } bool VOlapScanNode::is_key_column(const std::string& key_name) { // all column in dup_keys table olap scan node threat // as key column if (_olap_scan_node.keyType == TKeysType::DUP_KEYS) { return true; } auto res = std::find(_olap_scan_node.key_column_name.begin(), _olap_scan_node.key_column_name.end(), key_name); return res != _olap_scan_node.key_column_name.end(); } void VOlapScanNode::remove_pushed_conjuncts(RuntimeState* state) { if (_pushed_conjuncts_index.empty()) { return; } // dispose direct conjunct first std::vector new_conjunct_ctxs; for (int i = 0; i < _direct_conjunct_size; ++i) { if (std::find(_pushed_conjuncts_index.cbegin(), _pushed_conjuncts_index.cend(), i) == _pushed_conjuncts_index.cend()) { new_conjunct_ctxs.emplace_back(_conjunct_ctxs[i]); } else { _conjunct_ctxs[i]->close(state); } } auto new_direct_conjunct_size = new_conjunct_ctxs.size(); // dispose hash join push down conjunct second for (int i = _direct_conjunct_size; i < _conjunct_ctxs.size(); ++i) { if (std::find(_pushed_conjuncts_index.cbegin(), _pushed_conjuncts_index.cend(), i) == _pushed_conjuncts_index.cend()) { new_conjunct_ctxs.emplace_back(_conjunct_ctxs[i]); } else { _conjunct_ctxs[i]->close(state); } } _conjunct_ctxs = std::move(new_conjunct_ctxs); _direct_conjunct_size = new_direct_conjunct_size; // TODO: support vbloom_filter_predicate/vbinary_predicate and merge unpushed predicate to _vconjunct_ctx for (auto push_down_ctx : _pushed_conjuncts_index) { auto iter = _conjunctid_to_runtime_filter_ctxs.find(push_down_ctx); if (iter != _conjunctid_to_runtime_filter_ctxs.end()) { iter->second->runtimefilter->set_push_down_profile(); } } // set vconjunct_ctx is empty, if all conjunct if (_direct_conjunct_size == 0) { if (_vconjunct_ctx_ptr != nullptr) { (*_vconjunct_ctx_ptr)->close(state); _vconjunct_ctx_ptr = nullptr; } } // filter idle conjunct in vexpr_contexts auto checker = [&](int index) { return _pushed_conjuncts_index.count(index); }; _peel_pushed_vconjunct(state, checker); } // Construct the ColumnValueRange for one specified column // It will only handle the InPredicate and eq BinaryPredicate in conjunct_ctxs. // It will try to push down conditions of that column as much as possible, // But if the number of conditions exceeds the limit, none of conditions will be pushed down. template Status VOlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot, ColumnValueRange* range) { std::vector filter_conjuncts_index; for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) { // create empty range as temp range, temp range should do intersection on range auto temp_range = ColumnValueRange::create_empty_column_value_range( slot->type().precision, slot->type().scale); // 1. Normalize in conjuncts like 'where col in (v1, v2, v3)' if (TExprOpcode::FILTER_IN == _conjunct_ctxs[conj_idx]->root()->op()) { InPredicate* pred = static_cast(_conjunct_ctxs[conj_idx]->root()); if (!should_push_down_in_predicate(slot, pred)) { continue; } // begin to push InPredicate value into ColumnValueRange HybridSetBase::IteratorBase* iter = pred->hybrid_set()->begin(); while (iter->has_next()) { // column in (nullptr) is always false so continue to // dispose next item if (nullptr == iter->get_value()) { iter->next(); continue; } auto value = const_cast(iter->get_value()); RETURN_IF_ERROR(change_fixed_value_range( temp_range, value, ColumnValueRange::add_fixed_value_range)); iter->next(); } if (is_key_column(slot->col_name())) { filter_conjuncts_index.emplace_back(conj_idx); } range->intersection(temp_range); } // end of handle in predicate // 2. Normalize eq conjuncts like 'where col = value' else if (TExprNodeType::BINARY_PRED == _conjunct_ctxs[conj_idx]->root()->node_type() && FILTER_IN == to_olap_filter_type(_conjunct_ctxs[conj_idx]->root()->op(), false)) { Expr* pred = _conjunct_ctxs[conj_idx]->root(); DCHECK(pred->get_num_children() == 2); for (int child_idx = 0; child_idx < 2; ++child_idx) { // TODO: should use C++17 structured bindlings to refactor this code in the future: // 'auto [should_push_down, value] = should_push_down_eq_predicate(slot, pred, conj_idx, child_idx);' // make code tidier and readabler auto result_pair = should_push_down_eq_predicate(slot, pred, conj_idx, child_idx); if (!result_pair.first) { continue; } auto value = result_pair.second; // where A = nullptr should return empty result set if (value != nullptr) { RETURN_IF_ERROR(change_fixed_value_range( temp_range, value, ColumnValueRange::add_fixed_value_range)); } if (is_key_column(slot->col_name())) { filter_conjuncts_index.emplace_back(conj_idx); } range->intersection(temp_range); } // end for each binary predicate child } // end of handling eq binary predicate } // exceed limit, no conditions will be pushed down to storage engine. if (range->get_fixed_value_size() > _max_pushdown_conditions_per_column) { range->set_whole_value_range(); } else { std::copy(filter_conjuncts_index.cbegin(), filter_conjuncts_index.cend(), std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin())); } return Status::OK(); } // Construct the ColumnValueRange for one specified column // It will only handle the NotInPredicate and not eq BinaryPredicate in conjunct_ctxs. // It will try to push down conditions of that column as much as possible, // But if the number of conditions exceeds the limit, none of conditions will be pushed down. template Status VOlapScanNode::normalize_not_in_and_not_eq_predicate(SlotDescriptor* slot, ColumnValueRange* range) { // If the conjunct of slot is fixed value, will change the fixed value set of column value range // else add value to not in range and push down predicate directly bool is_fixed_range = range->is_fixed_value_range(); auto not_in_range = ColumnValueRange::create_empty_column_value_range(range->column_name()); std::vector filter_conjuncts_index; for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) { // 1. Normalize in conjuncts like 'where col not in (v1, v2, v3)' if (TExprOpcode::FILTER_NOT_IN == _conjunct_ctxs[conj_idx]->root()->op()) { InPredicate* pred = static_cast(_conjunct_ctxs[conj_idx]->root()); if (!should_push_down_in_predicate(slot, pred)) { continue; } // begin to push InPredicate value into ColumnValueRange auto iter = pred->hybrid_set()->begin(); while (iter->has_next()) { // column not in (nullptr) is always true if (nullptr == iter->get_value()) { continue; } auto value = const_cast(iter->get_value()); if (is_fixed_range) { RETURN_IF_ERROR(change_fixed_value_range( *range, value, ColumnValueRange::remove_fixed_value_range)); } else { RETURN_IF_ERROR(change_fixed_value_range( not_in_range, value, ColumnValueRange::add_fixed_value_range)); } iter->next(); } // only where a in ('a', 'b', nullptr) contain nullptr will // clear temp_range to whole range, no need do intersection if (is_key_column(slot->col_name())) { filter_conjuncts_index.emplace_back(conj_idx); } } // end of handle not in predicate // 2. Normalize eq conjuncts like 'where col != value' if (TExprNodeType::BINARY_PRED == _conjunct_ctxs[conj_idx]->root()->node_type() && FILTER_NOT_IN == to_olap_filter_type(_conjunct_ctxs[conj_idx]->root()->op(), false)) { Expr* pred = _conjunct_ctxs[conj_idx]->root(); DCHECK(pred->get_num_children() == 2); for (int child_idx = 0; child_idx < 2; ++child_idx) { // TODO: should use C++17 structured bindlings to refactor this code in the future: // 'auto [should_push_down, value] = should_push_down_eq_predicate(slot, pred, conj_idx, child_idx);' // make code tidier and readabler auto result_pair = should_push_down_eq_predicate(slot, pred, conj_idx, child_idx); if (!result_pair.first) { continue; } auto value = result_pair.second; if (is_fixed_range) { RETURN_IF_ERROR(change_fixed_value_range( *range, value, ColumnValueRange::remove_fixed_value_range)); } else { RETURN_IF_ERROR(change_fixed_value_range( not_in_range, value, ColumnValueRange::add_fixed_value_range)); } if (is_key_column(slot->col_name())) { filter_conjuncts_index.emplace_back(conj_idx); } } // end for each binary predicate child } // end of handling eq binary predicate } // exceed limit, no conditions will be pushed down to storage engine. if (is_fixed_range || not_in_range.get_fixed_value_size() <= _max_pushdown_conditions_per_column) { if (!is_fixed_range) { // push down not in condition to storage engine not_in_range.to_in_condition(_olap_filter, false); } std::copy(filter_conjuncts_index.cbegin(), filter_conjuncts_index.cend(), std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin())); } return Status::OK(); } template bool VOlapScanNode::normalize_is_null_predicate(Expr* expr, SlotDescriptor* slot, const std::string& is_null_str, ColumnValueRange* range) { if (expr->node_type() != TExprNodeType::SLOT_REF) { return false; } std::vector slot_ids; if (1 != expr->get_slot_ids(&slot_ids)) { return false; } if (slot_ids[0] != slot->id()) { return false; } auto temp_range = ColumnValueRange::create_empty_column_value_range(slot->type().precision, slot->type().scale); temp_range.set_contain_null(is_null_str == "null"); range->intersection(temp_range); return true; } template Status VOlapScanNode::normalize_noneq_binary_predicate(SlotDescriptor* slot, ColumnValueRange* range) { std::vector filter_conjuncts_index; for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) { Expr* root_expr = _conjunct_ctxs[conj_idx]->root(); if (TExprNodeType::BINARY_PRED != root_expr->node_type() || FILTER_IN == to_olap_filter_type(root_expr->op(), false) || FILTER_NOT_IN == to_olap_filter_type(root_expr->op(), false)) { if (TExprNodeType::FUNCTION_CALL == root_expr->node_type()) { std::string is_null_str; // 1. dispose the where pred "A is null" and "A is not null" if (root_expr->is_null_scalar_function(is_null_str) && normalize_is_null_predicate(root_expr->get_child(0), slot, is_null_str, range)) { // if column is key column should push down conjunct storage engine if (is_key_column(slot->col_name())) { filter_conjuncts_index.emplace_back(conj_idx); } } } continue; } // 2. dispose the where pred "A <,<=" and "A >,>=" Expr* pred = _conjunct_ctxs[conj_idx]->root(); DCHECK(pred->get_num_children() == 2); for (int child_idx = 0; child_idx < 2; ++child_idx) { if (Expr::type_without_cast(pred->get_child(child_idx)) != TExprNodeType::SLOT_REF) { continue; } if (pred->get_child(child_idx)->type().type != slot->type().type) { if (!ignore_cast(slot, pred->get_child(child_idx))) { continue; } } std::vector slot_ids; if (1 == pred->get_child(child_idx)->get_slot_ids(&slot_ids)) { if (slot_ids[0] != slot->id()) { continue; } Expr* expr = pred->get_child(1 - child_idx); // for case: where col_a > col_b if (!expr->is_constant()) { continue; } void* value = _conjunct_ctxs[conj_idx]->get_value( expr, nullptr, slot->type().precision, slot->type().scale); // for case: where col > null if (value == nullptr) { continue; } switch (slot->type().type) { case TYPE_DATE: { DateTimeValue date_value = *reinterpret_cast(value); // NOTE: Datetime may be truncated to a date column, so we call ++operator for date_value // for example: '2010-01-01 00:00:01' will be truncate to '2010-01-01' if (date_value.check_loss_accuracy_cast_to_date()) { if (pred->op() == TExprOpcode::LT || pred->op() == TExprOpcode::GE) { ++date_value; } } range->add_range(to_olap_filter_type(pred->op(), child_idx), *reinterpret_cast::CppType*>( &date_value)); break; } case TYPE_DATEV2: { DateTimeValue date_value = *reinterpret_cast(value); if (date_value.check_loss_accuracy_cast_to_date()) { if (pred->op() == TExprOpcode::LT || pred->op() == TExprOpcode::GE) { ++date_value; } } doris::vectorized::DateV2Value date_v2; date_v2.convert_dt_to_date_v2(&date_value); if constexpr (T == PrimitiveType::TYPE_DATEV2) { range->add_range(to_olap_filter_type(pred->op(), child_idx), date_v2); break; } else { __builtin_unreachable(); } } case TYPE_TINYINT: case TYPE_DECIMALV2: case TYPE_DECIMAL32: case TYPE_DECIMAL64: case TYPE_DECIMAL128: case TYPE_CHAR: case TYPE_VARCHAR: case TYPE_HLL: case TYPE_DATETIME: case TYPE_SMALLINT: case TYPE_INT: case TYPE_BIGINT: case TYPE_LARGEINT: case TYPE_BOOLEAN: case TYPE_STRING: { range->add_range( to_olap_filter_type(pred->op(), child_idx), *reinterpret_cast::CppType*>(value)); break; } default: { LOG(WARNING) << "Normalize filter fail, Unsupported Primitive type. [type=" << expr->type() << "]"; return Status::InternalError( "Normalize filter fail, Unsupported Primitive type"); } } if (is_key_column(slot->col_name())) { filter_conjuncts_index.emplace_back(conj_idx); } VLOG_CRITICAL << slot->col_name() << " op: " << static_cast(to_olap_filter_type(pred->op(), child_idx)) << " value: " << *reinterpret_cast::CppType*>( value); } } } std::copy(filter_conjuncts_index.cbegin(), filter_conjuncts_index.cend(), std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin())); return Status::OK(); } Status VOlapScanNode::normalize_bloom_filter_predicate(SlotDescriptor* slot) { std::vector filter_conjuncts_index; for (int conj_idx = _direct_conjunct_size; conj_idx < _conjunct_ctxs.size(); ++conj_idx) { Expr* root_expr = _conjunct_ctxs[conj_idx]->root(); if (TExprNodeType::BLOOM_PRED != root_expr->node_type()) continue; Expr* pred = _conjunct_ctxs[conj_idx]->root(); DCHECK(pred->get_num_children() == 1); if (Expr::type_without_cast(pred->get_child(0)) != TExprNodeType::SLOT_REF) { continue; } if (pred->get_child(0)->type().type != slot->type().type) { if (!ignore_cast(slot, pred->get_child(0))) { continue; } } std::vector slot_ids; if (1 == pred->get_child(0)->get_slot_ids(&slot_ids)) { if (slot_ids[0] != slot->id()) { continue; } // only key column of bloom filter will push down to storage engine if (is_key_column(slot->col_name())) { filter_conjuncts_index.emplace_back(conj_idx); _bloom_filters_push_down.emplace_back( slot->col_name(), (reinterpret_cast(pred))->get_bloom_filter_func()); } } } std::copy(filter_conjuncts_index.cbegin(), filter_conjuncts_index.cend(), std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin())); return Status::OK(); } Status VOlapScanNode::start_scan_thread(RuntimeState* state) { if (_scan_ranges.empty()) { _transfer_done = true; return Status::OK(); } auto span = opentelemetry::trace::Tracer::GetCurrentSpan(); _block_mem_tracker = MemTracker::create_virtual_tracker(-1, "VOlapScanNode:Block"); // ranges constructed from scan keys std::vector> cond_ranges; RETURN_IF_ERROR(_scan_keys.get_key_range(&cond_ranges)); // if we can't get ranges from conditions, we give it a total range if (cond_ranges.empty()) { cond_ranges.emplace_back(new OlapScanRange()); } bool need_split = true; // If we have ranges more than 64, there is no need to call // ShowHint to split ranges if (limit() != -1 || cond_ranges.size() > 64) { need_split = false; } int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size()); std::unordered_set disk_set; for (auto& scan_range : _scan_ranges) { auto tablet_id = scan_range->tablet_id; std::string err; TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err); if (tablet == nullptr) { std::stringstream ss; ss << "failed to get tablet: " << tablet_id << ", reason: " << err; LOG(WARNING) << ss.str(); return Status::InternalError(ss.str()); } std::vector>* ranges = &cond_ranges; std::vector> split_ranges; if (need_split && !tablet->all_beta()) { auto st = get_hints(tablet, *scan_range, config::doris_scan_range_row_count, _scan_keys.begin_include(), _scan_keys.end_include(), cond_ranges, &split_ranges, _runtime_profile.get()); if (st.ok()) { ranges = &split_ranges; } } int size_based_scanners_per_tablet = 1; if (config::doris_scan_range_max_mb > 0) { size_based_scanners_per_tablet = std::max( 1, (int)tablet->tablet_footprint() / config::doris_scan_range_max_mb << 20); } int ranges_per_scanner = std::max(1, (int)ranges->size() / std::min(scanners_per_tablet, size_based_scanners_per_tablet)); int num_ranges = ranges->size(); for (int i = 0; i < num_ranges;) { std::vector scanner_ranges; scanner_ranges.push_back((*ranges)[i].get()); ++i; for (int j = 1; i < num_ranges && j < ranges_per_scanner && (*ranges)[i]->end_include == (*ranges)[i - 1]->end_include; ++j, ++i) { scanner_ranges.push_back((*ranges)[i].get()); } VOlapScanner* scanner = new VOlapScanner(state, this, _olap_scan_node.is_preaggregation, _need_agg_finalize, *scan_range, _scanner_mem_tracker); // add scanner to pool before doing prepare. // so that scanner can be automatically deconstructed if prepare failed. _scanner_pool.add(scanner); RETURN_IF_ERROR(scanner->prepare(*scan_range, scanner_ranges, _olap_filter, _bloom_filters_push_down)); _volap_scanners.push_back(scanner); disk_set.insert(scanner->scan_disk()); } } COUNTER_SET(_num_disks_accessed_counter, static_cast(disk_set.size())); COUNTER_SET(_num_scanners, static_cast(_volap_scanners.size())); telemetry::set_span_attribute(span, _num_disks_accessed_counter); telemetry::set_span_attribute(span, _num_scanners); // init progress std::stringstream ss; ss << "ScanThread complete (node=" << id() << "):"; _progress = ProgressUpdater(ss.str(), _volap_scanners.size(), 1); _transfer_thread.reset(new std::thread( [this, state, parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] { opentelemetry::trace::Scope scope {parent_span}; transfer_thread(state); })); return Status::OK(); } Status VOlapScanNode::close(RuntimeState* state) { if (is_closed()) { return Status::OK(); } START_AND_SCOPE_SPAN(state->get_tracer(), span, "VOlapScanNode::close"); // change done status { std::unique_lock l(_blocks_lock); _transfer_done = true; } // notify all scanner thread _block_consumed_cv.notify_all(); _block_added_cv.notify_all(); _scan_block_added_cv.notify_all(); // join transfer thread if (_transfer_thread) { _transfer_thread->join(); } // clear some block in queue // TODO: The presence of transfer_thread here may cause Block's memory alloc and be released not in a thread, // which may lead to potential performance problems. we should rethink whether to delete the transfer thread std::for_each(_materialized_blocks.begin(), _materialized_blocks.end(), std::default_delete()); _materialized_row_batches_bytes = 0; std::for_each(_scan_blocks.begin(), _scan_blocks.end(), std::default_delete()); _scan_row_batches_bytes = 0; std::for_each(_free_blocks.begin(), _free_blocks.end(), std::default_delete()); _block_mem_tracker->release(_buffered_bytes); // OlapScanNode terminate by exception // so that initiative close the Scanner for (auto scanner : _volap_scanners) { scanner->close(state); } for (auto& filter_desc : _runtime_filter_descs) { IRuntimeFilter* runtime_filter = nullptr; state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter); DCHECK(runtime_filter != nullptr); runtime_filter->consumer_close(); } for (auto& ctx : _stale_vexpr_ctxs) { (*ctx)->close(state); } VLOG_CRITICAL << "VOlapScanNode::close()"; return ScanNode::close(state); } Status VOlapScanNode::get_next(RuntimeState* state, Block* block, bool* eos) { INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VOlapScanNode::get_next"); SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker()); // check if Canceled. if (state->is_cancelled()) { std::unique_lock l(_blocks_lock); _transfer_done = true; std::lock_guard guard(_status_mutex); if (LIKELY(_status.ok())) { _status = Status::Cancelled("Cancelled"); } return _status; } // check if started. if (!_start) { Status status = start_scan(state); if (!status.ok()) { LOG(ERROR) << "StartScan Failed cause " << status.get_error_msg(); *eos = true; return status; } _start = true; } // some conjuncts will be disposed in start_scan function, so // we should check _eos after call start_scan if (_eos) { *eos = true; return Status::OK(); } // wait for block from queue Block* materialized_block = nullptr; { std::unique_lock l(_blocks_lock); SCOPED_TIMER(_olap_wait_batch_queue_timer); while (_materialized_blocks.empty() && !_transfer_done) { if (state->is_cancelled()) { _transfer_done = true; } // use wait_for, not wait, in case to capture the state->is_cancelled() _block_added_cv.wait_for(l, std::chrono::seconds(1)); } if (!_materialized_blocks.empty()) { materialized_block = _materialized_blocks.back(); DCHECK(materialized_block != nullptr); _materialized_blocks.pop_back(); _materialized_row_batches_bytes -= materialized_block->allocated_bytes(); } } // return block if (nullptr != materialized_block) { // notify scanner _block_consumed_cv.notify_one(); // get scanner's block memory block->swap(*materialized_block); VLOG_ROW << "VOlapScanNode output rows: " << block->rows(); reached_limit(block, eos); // reach scan node limit if (*eos) { { std::unique_lock l(_blocks_lock); _transfer_done = true; } _block_consumed_cv.notify_all(); *eos = true; LOG(INFO) << "VOlapScanNode ReachedLimit."; } else { *eos = false; } { // ReThink whether the SpinLock Better std::lock_guard l(_free_blocks_lock); _free_blocks.emplace_back(materialized_block); } return Status::OK(); } // all scanner done, change *eos to true *eos = true; std::lock_guard guard(_status_mutex); return _status; } Block* VOlapScanNode::_alloc_block(bool& get_free_block) { { std::lock_guard l(_free_blocks_lock); if (!_free_blocks.empty()) { auto block = _free_blocks.back(); _free_blocks.pop_back(); return block; } } get_free_block = false; auto block = new Block(_tuple_desc->slots(), _block_size); _buffered_bytes += block->allocated_bytes(); return block; } int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per_scanner) { std::list olap_scanners; int assigned_thread_num = _running_thread; size_t max_thread = config::doris_scanner_queue_size; if (config::doris_scanner_row_num > state->batch_size()) { max_thread /= config::doris_scanner_row_num / state->batch_size(); if (max_thread <= 0) max_thread = 1; } // copy to local { // How many thread can apply to this query size_t thread_slot_num = 0; { if (_scan_row_batches_bytes < _max_scanner_queue_size_bytes / 2) { std::lock_guard l(_free_blocks_lock); thread_slot_num = _free_blocks.size() / block_per_scanner; thread_slot_num += (_free_blocks.size() % block_per_scanner != 0); thread_slot_num = std::min(thread_slot_num, max_thread - assigned_thread_num); if (thread_slot_num <= 0) { thread_slot_num = 1; } } else { std::lock_guard l(_scan_blocks_lock); if (_scan_blocks.empty()) { // Just for notify if _scan_blocks is empty and no running thread if (assigned_thread_num == 0) { thread_slot_num = 1; // NOTE: if olap_scanners_ is empty, scanner_done_ should be true } } } } { std::lock_guard l(_volap_scanners_lock); thread_slot_num = std::min(thread_slot_num, _volap_scanners.size()); for (int i = 0; i < thread_slot_num && !_volap_scanners.empty();) { auto scanner = _volap_scanners.front(); _volap_scanners.pop_front(); if (scanner->need_to_close()) { scanner->close(state); } else { olap_scanners.push_back(scanner); _running_thread++; assigned_thread_num++; i++; } } } } // post volap scanners to thread-pool PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool(); auto cur_span = opentelemetry::trace::Tracer::GetCurrentSpan(); auto iter = olap_scanners.begin(); while (iter != olap_scanners.end()) { PriorityThreadPool::Task task; task.work_function = [this, scanner = *iter, parent_span = cur_span] { opentelemetry::trace::Scope scope {parent_span}; this->scanner_thread(scanner); }; task.priority = _nice; task.queue_id = state->exec_env()->store_path_to_index((*iter)->scan_disk()); (*iter)->start_wait_worker_timer(); COUNTER_UPDATE(_scanner_sched_counter, 1); if (thread_pool->offer(task)) { olap_scanners.erase(iter++); } else { LOG(FATAL) << "Failed to assign scanner task to thread pool!"; } ++_total_assign_num; } return assigned_thread_num; } // PlanFragmentExecutor will call this method to set scan range // Doris scan range is defined in thrift file like this // struct TPaloScanRange { // 1: required list hosts // 2: required string schema_hash // 3: required string version // 5: required Types.TTabletId tablet_id // 6: required string db_name // 7: optional list partition_column_ranges // 8: optional string index_name // 9: optional string table_name //} // every doris_scan_range is related with one tablet so that one olap scan node contains multiple tablet Status VOlapScanNode::set_scan_ranges(const std::vector& scan_ranges) { for (auto& scan_range : scan_ranges) { DCHECK(scan_range.scan_range.__isset.palo_scan_range); _scan_ranges.emplace_back(new TPaloScanRange(scan_range.scan_range.palo_scan_range)); COUNTER_UPDATE(_tablet_counter, 1); } telemetry::set_current_span_attribute(_tablet_counter); return Status::OK(); } Status VOlapScanNode::get_hints(TabletSharedPtr table, const TPaloScanRange& scan_range, int block_row_count, bool is_begin_include, bool is_end_include, const std::vector>& scan_key_range, std::vector>* sub_scan_range, RuntimeProfile* profile) { RuntimeProfile::Counter* show_hints_timer = profile->get_counter("ShowHintsTime_V1"); std::vector> ranges; bool have_valid_range = false; for (auto& key_range : scan_key_range) { if (key_range->begin_scan_range.size() == 1 && key_range->begin_scan_range.get_value(0) == NEGATIVE_INFINITY) { continue; } SCOPED_TIMER(show_hints_timer); Status res = Status::OK(); std::vector range; res = table->split_range(key_range->begin_scan_range, key_range->end_scan_range, block_row_count, &range); if (!res.ok()) { return Status::InternalError("fail to show hints"); } ranges.emplace_back(std::move(range)); have_valid_range = true; } if (!have_valid_range) { std::vector range; auto res = table->split_range({}, {}, block_row_count, &range); if (!res.ok()) { return Status::InternalError("fail to show hints"); } ranges.emplace_back(std::move(range)); } for (int i = 0; i < ranges.size(); ++i) { for (int j = 0; j < ranges[i].size(); j += 2) { std::unique_ptr range(new OlapScanRange); range->begin_scan_range.reset(); range->begin_scan_range = ranges[i][j]; range->end_scan_range.reset(); range->end_scan_range = ranges[i][j + 1]; if (0 == j) { range->begin_include = is_begin_include; } else { range->begin_include = true; } if (j + 2 == ranges[i].size()) { range->end_include = is_end_include; } else { range->end_include = false; } sub_scan_range->emplace_back(std::move(range)); } } return Status::OK(); } } // namespace doris::vectorized