* [feature](planner): push limit to olapscan when meet sort.
* if olap_scan_node's sort_info is set, push sort_limit, read_orderby_key
and read_orderby_key_reverse for olap scanner
* There is a common query pattern to find latest time serials data.
eg. SELECT * from t_log WHERE t>t1 AND t<t2 ORDER BY t DESC LIMIT 100
If the ORDER BY columns is the prefix of the sort key of table, it can
be greatly optimized to read much fewer data instead of read all data
between t1 and t2.
By leveraging the same order of ORDER BY columns and sort key of table,
just read the LIMIT N rows for each related segment and merge N rows.
1. set read_orderby_key to true for read_params and _reader_context
if olap_scan_node's sort info is set.
2. set read_orderby_key_reverse to true for read_params and _reader_context
if is_asc_order is false.
3. rowset reader force merge read segments if read_orderby_key is true.
4. block reader and tablet reader force merge read rowsets if read_orderby_key is true.
5. for ORDER BY DESC, read and compare in reverse order
5.1 segment iterator read backward using a new BackwardBitmapRangeIterator and
reverse the result block before return to caller.
5.2 VCollectIterator::LevelIteratorComparator, VMergeIteratorContext return
opposite result for _is_reverse order in its compare function.
Co-authored-by: jackwener <jakevingoo@gmail.com>
1879 lines
79 KiB
C++
1879 lines
79 KiB
C++
// Licensed to the Apache Software Foundation (ASF) under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing,
|
|
// software distributed under the License is distributed on an
|
|
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
// KIND, either express or implied. See the License for the
|
|
// specific language governing permissions and limitations
|
|
// under the License.
|
|
|
|
#include "vec/exec/volap_scan_node.h"
|
|
|
|
#include "common/resource_tls.h"
|
|
#include "exec/scan_node.h"
|
|
#include "gen_cpp/PlanNodes_types.h"
|
|
#include "olap/storage_engine.h"
|
|
#include "runtime/descriptors.h"
|
|
#include "runtime/exec_env.h"
|
|
#include "runtime/large_int_value.h"
|
|
#include "runtime/runtime_filter_mgr.h"
|
|
#include "util/priority_thread_pool.hpp"
|
|
#include "util/to_string.h"
|
|
#include "vec/core/block.h"
|
|
#include "vec/data_types/data_type_decimal.h"
|
|
#include "vec/exec/volap_scanner.h"
|
|
#include "vec/exprs/vbloom_predicate.h"
|
|
#include "vec/exprs/vcompound_pred.h"
|
|
#include "vec/exprs/vexpr.h"
|
|
#include "vec/exprs/vruntimefilter_wrapper.h"
|
|
#include "vec/functions/in.h"
|
|
|
|
namespace doris::vectorized {
|
|
using doris::operator<<;
|
|
|
|
#define RETURN_IF_PUSH_DOWN(stmt) \
|
|
if (!push_down) { \
|
|
stmt; \
|
|
} else { \
|
|
return; \
|
|
}
|
|
|
|
VOlapScanNode::VOlapScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
|
|
: ScanNode(pool, tnode, descs),
|
|
_tuple_id(tnode.olap_scan_node.tuple_id),
|
|
_olap_scan_node(tnode.olap_scan_node),
|
|
_tuple_desc(nullptr),
|
|
_tuple_idx(0),
|
|
_eos(false),
|
|
_max_materialized_row_batches(config::doris_scanner_queue_size),
|
|
_start(false),
|
|
_scanner_done(false),
|
|
_transfer_done(false),
|
|
_status(Status::OK()),
|
|
_resource_info(nullptr),
|
|
_buffered_bytes(0),
|
|
_eval_conjuncts_fn(nullptr),
|
|
_runtime_filter_descs(tnode.runtime_filters),
|
|
_max_materialized_blocks(config::doris_scanner_queue_size) {
|
|
_materialized_blocks.reserve(_max_materialized_blocks);
|
|
_free_blocks.reserve(_max_materialized_blocks);
|
|
// if sort_info is set, push _limit to each olap scanner
|
|
if (_olap_scan_node.__isset.sort_info && _olap_scan_node.__isset.sort_limit) {
|
|
_limit_per_scanner = _olap_scan_node.sort_limit;
|
|
}
|
|
}
|
|
|
|
Status VOlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
|
|
RETURN_IF_ERROR(ExecNode::init(tnode, state));
|
|
|
|
const TQueryOptions& query_options = state->query_options();
|
|
if (query_options.__isset.max_scan_key_num) {
|
|
_max_scan_key_num = query_options.max_scan_key_num;
|
|
} else {
|
|
_max_scan_key_num = config::doris_max_scan_key_num;
|
|
}
|
|
|
|
if (query_options.__isset.max_pushdown_conditions_per_column) {
|
|
_max_pushdown_conditions_per_column = query_options.max_pushdown_conditions_per_column;
|
|
} else {
|
|
_max_pushdown_conditions_per_column = config::max_pushdown_conditions_per_column;
|
|
}
|
|
|
|
_max_scanner_queue_size_bytes = query_options.mem_limit / 20; //TODO: session variable percent
|
|
|
|
/// TODO: could one filter used in the different scan_node ?
|
|
int filter_size = _runtime_filter_descs.size();
|
|
_runtime_filter_ctxs.resize(filter_size);
|
|
_runtime_filter_ready_flag.resize(filter_size);
|
|
for (int i = 0; i < filter_size; ++i) {
|
|
IRuntimeFilter* runtime_filter = nullptr;
|
|
const auto& filter_desc = _runtime_filter_descs[i];
|
|
RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(
|
|
RuntimeFilterRole::CONSUMER, filter_desc, state->query_options(), id()));
|
|
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
|
|
&runtime_filter));
|
|
|
|
_runtime_filter_ctxs[i].runtimefilter = runtime_filter;
|
|
_runtime_filter_ready_flag[i] = false;
|
|
_rf_locks.push_back(std::make_unique<std::mutex>());
|
|
}
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
void VOlapScanNode::init_scan_profile() {
|
|
std::string scanner_profile_name = "VOlapScanner";
|
|
if (_olap_scan_node.__isset.table_name) {
|
|
scanner_profile_name = fmt::format("VOlapScanner({0})", _olap_scan_node.table_name);
|
|
}
|
|
_scanner_profile.reset(new RuntimeProfile(scanner_profile_name));
|
|
runtime_profile()->add_child(_scanner_profile.get(), true, nullptr);
|
|
|
|
_segment_profile.reset(new RuntimeProfile("SegmentIterator"));
|
|
_scanner_profile->add_child(_segment_profile.get(), true, nullptr);
|
|
}
|
|
|
|
void VOlapScanNode::_init_counter(RuntimeState* state) {
|
|
ADD_TIMER(_scanner_profile, "ShowHintsTime_V1");
|
|
|
|
_reader_init_timer = ADD_TIMER(_scanner_profile, "ReaderInitTime");
|
|
_read_compressed_counter = ADD_COUNTER(_segment_profile, "CompressedBytesRead", TUnit::BYTES);
|
|
_read_uncompressed_counter =
|
|
ADD_COUNTER(_segment_profile, "UncompressedBytesRead", TUnit::BYTES);
|
|
_block_load_timer = ADD_TIMER(_segment_profile, "BlockLoadTime");
|
|
_block_load_counter = ADD_COUNTER(_segment_profile, "BlocksLoad", TUnit::UNIT);
|
|
_block_fetch_timer = ADD_TIMER(_scanner_profile, "BlockFetchTime");
|
|
_raw_rows_counter = ADD_COUNTER(_segment_profile, "RawRowsRead", TUnit::UNIT);
|
|
_block_convert_timer = ADD_TIMER(_scanner_profile, "BlockConvertTime");
|
|
// Will be delete after non-vectorized code is removed
|
|
_block_seek_timer = ADD_TIMER(_segment_profile, "BlockSeekTime");
|
|
_block_seek_counter = ADD_COUNTER(_segment_profile, "BlockSeekCount", TUnit::UNIT);
|
|
_block_init_timer = ADD_TIMER(_segment_profile, "BlockInitTime");
|
|
_block_init_seek_timer = ADD_TIMER(_segment_profile, "BlockInitSeekTime");
|
|
_block_init_seek_counter = ADD_COUNTER(_segment_profile, "BlockInitSeekCount", TUnit::UNIT);
|
|
|
|
_rows_vec_cond_counter = ADD_COUNTER(_segment_profile, "RowsVectorPredFiltered", TUnit::UNIT);
|
|
_vec_cond_timer = ADD_TIMER(_segment_profile, "VectorPredEvalTime");
|
|
_short_cond_timer = ADD_TIMER(_segment_profile, "ShortPredEvalTime");
|
|
_first_read_timer = ADD_TIMER(_segment_profile, "FirstReadTime");
|
|
_first_read_seek_timer = ADD_TIMER(_segment_profile, "FirstReadSeekTime");
|
|
_first_read_seek_counter = ADD_COUNTER(_segment_profile, "FirstReadSeekCount", TUnit::UNIT);
|
|
|
|
_lazy_read_timer = ADD_TIMER(_segment_profile, "LazyReadTime");
|
|
_lazy_read_seek_timer = ADD_TIMER(_segment_profile, "LazyReadSeekTime");
|
|
_lazy_read_seek_counter = ADD_COUNTER(_segment_profile, "LazyReadSeekCount", TUnit::UNIT);
|
|
|
|
_output_col_timer = ADD_TIMER(_segment_profile, "OutputColumnTime");
|
|
|
|
_stats_filtered_counter = ADD_COUNTER(_segment_profile, "RowsStatsFiltered", TUnit::UNIT);
|
|
_bf_filtered_counter = ADD_COUNTER(_segment_profile, "RowsBloomFilterFiltered", TUnit::UNIT);
|
|
_del_filtered_counter = ADD_COUNTER(_scanner_profile, "RowsDelFiltered", TUnit::UNIT);
|
|
_conditions_filtered_counter =
|
|
ADD_COUNTER(_segment_profile, "RowsConditionsFiltered", TUnit::UNIT);
|
|
_key_range_filtered_counter =
|
|
ADD_COUNTER(_segment_profile, "RowsKeyRangeFiltered", TUnit::UNIT);
|
|
|
|
_io_timer = ADD_TIMER(_segment_profile, "IOTimer");
|
|
_decompressor_timer = ADD_TIMER(_segment_profile, "DecompressorTimer");
|
|
_index_load_timer = ADD_TIMER(_segment_profile, "IndexLoadTime_V1");
|
|
|
|
_scan_timer = ADD_TIMER(_scanner_profile, "ScanTime");
|
|
_scan_cpu_timer = ADD_TIMER(_scanner_profile, "ScanCpuTime");
|
|
|
|
_total_pages_num_counter = ADD_COUNTER(_segment_profile, "TotalPagesNum", TUnit::UNIT);
|
|
_cached_pages_num_counter = ADD_COUNTER(_segment_profile, "CachedPagesNum", TUnit::UNIT);
|
|
|
|
_bitmap_index_filter_counter =
|
|
ADD_COUNTER(_segment_profile, "RowsBitmapIndexFiltered", TUnit::UNIT);
|
|
_bitmap_index_filter_timer = ADD_TIMER(_segment_profile, "BitmapIndexFilterTimer");
|
|
|
|
_num_scanners = ADD_COUNTER(_runtime_profile, "NumScanners", TUnit::UNIT);
|
|
|
|
_filtered_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentFiltered", TUnit::UNIT);
|
|
_total_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentTotal", TUnit::UNIT);
|
|
|
|
// time of transfer thread to wait for row batch from scan thread
|
|
_scanner_wait_batch_timer = ADD_TIMER(_runtime_profile, "ScannerBatchWaitTime");
|
|
// time of scan thread to wait for worker thread of the thread pool
|
|
_scanner_wait_worker_timer = ADD_TIMER(_runtime_profile, "ScannerWorkerWaitTime");
|
|
|
|
// time of node to wait for batch/block queue
|
|
_olap_wait_batch_queue_timer = ADD_TIMER(_runtime_profile, "BatchQueueWaitTime");
|
|
|
|
// for the purpose of debugging or profiling
|
|
for (int i = 0; i < GENERAL_DEBUG_COUNT; ++i) {
|
|
char name[64];
|
|
snprintf(name, sizeof(name), "GeneralDebugTimer%d", i);
|
|
_general_debug_timer[i] = ADD_TIMER(_segment_profile, name);
|
|
}
|
|
}
|
|
|
|
Status VOlapScanNode::prepare(RuntimeState* state) {
|
|
init_scan_profile();
|
|
RETURN_IF_ERROR(ScanNode::prepare(state));
|
|
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
|
|
// create scanner profile
|
|
// create timer
|
|
_tablet_counter = ADD_COUNTER(runtime_profile(), "TabletCount ", TUnit::UNIT);
|
|
_scanner_sched_counter = ADD_COUNTER(runtime_profile(), "ScannerSchedCount ", TUnit::UNIT);
|
|
|
|
_rows_pushed_cond_filtered_counter =
|
|
ADD_COUNTER(_scanner_profile, "RowsPushedCondFiltered", TUnit::UNIT);
|
|
_init_counter(state);
|
|
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
|
|
|
|
_scanner_mem_tracker = std::make_unique<MemTracker>("OlapScanners");
|
|
|
|
if (_tuple_desc == nullptr) {
|
|
// TODO: make sure we print all available diagnostic output to our error log
|
|
return Status::InternalError("Failed to get tuple descriptor.");
|
|
}
|
|
|
|
const std::vector<SlotDescriptor*>& slots = _tuple_desc->slots();
|
|
|
|
for (int i = 0; i < slots.size(); ++i) {
|
|
if (!slots[i]->is_materialized()) {
|
|
continue;
|
|
}
|
|
|
|
if (slots[i]->type().is_collection_type()) {
|
|
_collection_slots.push_back(slots[i]);
|
|
}
|
|
|
|
if (slots[i]->type().is_string_type()) {
|
|
_string_slots.push_back(slots[i]);
|
|
}
|
|
}
|
|
|
|
_runtime_state = state;
|
|
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
|
|
IRuntimeFilter* runtime_filter = nullptr;
|
|
state->runtime_filter_mgr()->get_consume_filter(_runtime_filter_descs[i].filter_id,
|
|
&runtime_filter);
|
|
DCHECK(runtime_filter != nullptr);
|
|
runtime_filter->init_profile(_runtime_profile.get());
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
Status VOlapScanNode::open(RuntimeState* state) {
|
|
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VOlapScanNode::open");
|
|
VLOG_CRITICAL << "VOlapScanNode::Open";
|
|
SCOPED_TIMER(_runtime_profile->total_time_counter());
|
|
RETURN_IF_CANCELLED(state);
|
|
RETURN_IF_ERROR(ExecNode::open(state));
|
|
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
|
|
|
|
_resource_info = ResourceTls::get_resource_tls();
|
|
|
|
// acquire runtime filter
|
|
_runtime_filter_ctxs.resize(_runtime_filter_descs.size());
|
|
|
|
std::vector<VExpr*> vexprs;
|
|
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
|
|
auto& filter_desc = _runtime_filter_descs[i];
|
|
IRuntimeFilter* runtime_filter = nullptr;
|
|
state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter);
|
|
DCHECK(runtime_filter != nullptr);
|
|
if (runtime_filter == nullptr) {
|
|
continue;
|
|
}
|
|
bool ready = runtime_filter->is_ready();
|
|
if (!ready) {
|
|
ready = runtime_filter->await();
|
|
}
|
|
if (ready) {
|
|
RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs));
|
|
_runtime_filter_ctxs[i].apply_mark = true;
|
|
_runtime_filter_ctxs[i].runtimefilter = runtime_filter;
|
|
}
|
|
}
|
|
RETURN_IF_ERROR(_append_rf_into_conjuncts(state, vexprs));
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
void VOlapScanNode::transfer_thread(RuntimeState* state) {
|
|
// scanner open pushdown to scanThread
|
|
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VOlapScanNode::transfer_thread");
|
|
SCOPED_ATTACH_TASK(state);
|
|
Status status = Status::OK();
|
|
|
|
if (_vconjunct_ctx_ptr) {
|
|
for (auto scanner : _volap_scanners) {
|
|
status = (*_vconjunct_ctx_ptr)->clone(state, scanner->vconjunct_ctx_ptr());
|
|
if (!status.ok()) {
|
|
std::lock_guard<SpinLock> guard(_status_mutex);
|
|
_status = status;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
/*********************************
|
|
* 优先级调度基本策略:
|
|
* 1. 通过查询拆分的Range个数来确定初始nice值
|
|
* Range个数越多,越倾向于认定为大查询,nice值越小
|
|
* 2. 通过查询累计读取的数据量来调整nice值
|
|
* 读取的数据越多,越倾向于认定为大查询,nice值越小
|
|
* 3. 通过nice值来判断查询的优先级
|
|
* nice值越大的,越优先获得的查询资源
|
|
* 4. 定期提高队列内残留任务的优先级,避免大查询完全饿死
|
|
*********************************/
|
|
_total_assign_num = 0;
|
|
_nice = 18 + std::max(0, 2 - (int)_volap_scanners.size() / 5);
|
|
|
|
auto doris_scanner_row_num =
|
|
_limit == -1 ? config::doris_scanner_row_num
|
|
: std::min(static_cast<int64_t>(config::doris_scanner_row_num), _limit);
|
|
_block_size = _limit == -1 ? state->batch_size()
|
|
: std::min(static_cast<int64_t>(state->batch_size()), _limit);
|
|
auto block_per_scanner = (doris_scanner_row_num + (_block_size - 1)) / _block_size;
|
|
auto pre_block_count =
|
|
std::min(_volap_scanners.size(),
|
|
static_cast<size_t>(config::doris_scanner_thread_pool_thread_num)) *
|
|
block_per_scanner;
|
|
|
|
for (int i = 0; i < pre_block_count; ++i) {
|
|
auto block = new Block(_tuple_desc->slots(), _block_size);
|
|
_free_blocks.emplace_back(block);
|
|
_buffered_bytes += block->allocated_bytes();
|
|
}
|
|
|
|
// read from scanner
|
|
while (LIKELY(status.ok())) {
|
|
int assigned_thread_num = _start_scanner_thread_task(state, block_per_scanner);
|
|
|
|
std::vector<Block*> blocks;
|
|
{
|
|
// 1 scanner idle task not empty, assign new scanner task
|
|
std::unique_lock<std::mutex> l(_scan_blocks_lock);
|
|
|
|
// scanner_row_num = 16k
|
|
// 16k * 10 * 12 * 8 = 15M(>2s) --> nice=10
|
|
// 16k * 20 * 22 * 8 = 55M(>6s) --> nice=0
|
|
while (_nice > 0 && _total_assign_num > (22 - _nice) * (20 - _nice) * 6) {
|
|
--_nice;
|
|
}
|
|
|
|
// 2 wait when all scanner are running & no result in queue
|
|
while (UNLIKELY(_running_thread == assigned_thread_num && _scan_blocks.empty() &&
|
|
!_scanner_done)) {
|
|
SCOPED_TIMER(_scanner_wait_batch_timer);
|
|
_scan_block_added_cv.wait(l);
|
|
}
|
|
|
|
// 3 transfer result block when queue is not empty
|
|
if (LIKELY(!_scan_blocks.empty())) {
|
|
blocks.swap(_scan_blocks);
|
|
for (auto b : blocks) {
|
|
_scan_row_batches_bytes -= b->allocated_bytes();
|
|
}
|
|
// delete scan_block if transfer thread should be stopped
|
|
// because scan_block wouldn't be useful anymore
|
|
if (UNLIKELY(_transfer_done)) {
|
|
std::for_each(blocks.begin(), blocks.end(), std::default_delete<Block>());
|
|
blocks.clear();
|
|
}
|
|
} else {
|
|
if (_scanner_done) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!blocks.empty()) {
|
|
_add_blocks(blocks);
|
|
}
|
|
}
|
|
telemetry::set_span_attribute(span, _scanner_sched_counter);
|
|
|
|
VLOG_CRITICAL << "TransferThread finish.";
|
|
_transfer_done = true;
|
|
_block_added_cv.notify_all();
|
|
{
|
|
std::unique_lock<std::mutex> l(_scan_blocks_lock);
|
|
_scan_thread_exit_cv.wait(l, [this] { return _running_thread == 0; });
|
|
}
|
|
VLOG_CRITICAL << "Scanner threads have been exited. TransferThread exit.";
|
|
}
|
|
|
|
void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
|
|
START_AND_SCOPE_SPAN(scanner->runtime_state()->get_tracer(), span,
|
|
"VOlapScanNode::scanner_thread");
|
|
SCOPED_ATTACH_TASK(_runtime_state);
|
|
Thread::set_self_name("volap_scanner");
|
|
int64_t wait_time = scanner->update_wait_worker_timer();
|
|
// Do not use ScopedTimer. There is no guarantee that, the counter
|
|
// (_scan_cpu_timer, the class member) is not destroyed after `_running_thread==0`.
|
|
ThreadCpuStopWatch cpu_watch;
|
|
cpu_watch.start();
|
|
Status status = Status::OK();
|
|
bool eos = false;
|
|
RuntimeState* state = scanner->runtime_state();
|
|
DCHECK(nullptr != state);
|
|
if (!scanner->is_open()) {
|
|
status = scanner->open();
|
|
if (!status.ok()) {
|
|
std::lock_guard<SpinLock> guard(_status_mutex);
|
|
_status = status;
|
|
eos = true;
|
|
}
|
|
scanner->set_opened();
|
|
}
|
|
|
|
/*
|
|
// the following code will cause coredump when running tpcds_sf1 sqls,
|
|
// disable temporariy to avoid it, SHOULD BE FIX LATER
|
|
std::vector<VExpr*> vexprs;
|
|
auto& scanner_filter_apply_marks = *scanner->mutable_runtime_filter_marks();
|
|
DCHECK(scanner_filter_apply_marks.size() == _runtime_filter_descs.size());
|
|
for (size_t i = 0; i < scanner_filter_apply_marks.size(); i++) {
|
|
if (!scanner_filter_apply_marks[i] && !_runtime_filter_ctxs[i].apply_mark) {
|
|
/// When runtime filters are ready during running, we should use them to filter data
|
|
/// in VOlapScanner.
|
|
/// New arrival rf will be processed as below:
|
|
/// 1. convert these runtime filters to vectorized expressions
|
|
/// 2. if this is the first scanner thread to receive this rf, construct a new
|
|
/// VExprContext and update `_vconjunct_ctx_ptr` in scan node. Notice that we use
|
|
/// `_runtime_filter_ready_flag` to ensure `_vconjunct_ctx_ptr` will be updated only
|
|
/// once after any runtime_filters are ready.
|
|
/// 3. finally, just copy this new VExprContext to scanner and use it to filter data.
|
|
IRuntimeFilter* runtime_filter = nullptr;
|
|
state->runtime_filter_mgr()->get_consume_filter(_runtime_filter_descs[i].filter_id,
|
|
&runtime_filter);
|
|
DCHECK(runtime_filter != nullptr);
|
|
bool ready = runtime_filter->is_ready();
|
|
if (ready) {
|
|
runtime_filter->get_prepared_vexprs(&vexprs, row_desc());
|
|
scanner_filter_apply_marks[i] = true;
|
|
if (!_runtime_filter_ready_flag[i] && !vexprs.empty()) {
|
|
std::unique_lock<std::mutex> l(*(_rf_locks[i]));
|
|
if (!_runtime_filter_ready_flag[i]) {
|
|
// Use all conjuncts and new arrival runtime filters to construct a new
|
|
// expression tree here.
|
|
auto last_expr =
|
|
_vconjunct_ctx_ptr ? (*_vconjunct_ctx_ptr)->root() : vexprs[0];
|
|
for (size_t j = _vconjunct_ctx_ptr ? 0 : 1; j < vexprs.size(); j++) {
|
|
if (_rf_vexpr_set.find(vexprs[j]) != _rf_vexpr_set.end()) {
|
|
continue;
|
|
}
|
|
TExprNode texpr_node;
|
|
texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
|
|
texpr_node.__set_node_type(TExprNodeType::COMPOUND_PRED);
|
|
texpr_node.__set_opcode(TExprOpcode::COMPOUND_AND);
|
|
VExpr* new_node = _pool->add(new VcompoundPred(texpr_node));
|
|
new_node->add_child(last_expr);
|
|
new_node->add_child(vexprs[j]);
|
|
last_expr = new_node;
|
|
_rf_vexpr_set.insert(vexprs[j]);
|
|
}
|
|
auto new_vconjunct_ctx_ptr = _pool->add(new VExprContext(last_expr));
|
|
auto expr_status = new_vconjunct_ctx_ptr->prepare(state, row_desc());
|
|
// If error occurs in `prepare` or `open` phase, discard these runtime
|
|
// filters directly.
|
|
if (UNLIKELY(!expr_status.OK())) {
|
|
LOG(WARNING) << "Something wrong for runtime filters: " << expr_status;
|
|
vexprs.clear();
|
|
break;
|
|
}
|
|
expr_status = new_vconjunct_ctx_ptr->open(state);
|
|
if (UNLIKELY(!expr_status.OK())) {
|
|
LOG(WARNING) << "Something wrong for runtime filters: " << expr_status;
|
|
vexprs.clear();
|
|
break;
|
|
}
|
|
if (_vconjunct_ctx_ptr) {
|
|
_stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr));
|
|
}
|
|
_vconjunct_ctx_ptr.reset(new VExprContext*);
|
|
*(_vconjunct_ctx_ptr.get()) = new_vconjunct_ctx_ptr;
|
|
_runtime_filter_ready_flag[i] = true;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!vexprs.empty()) {
|
|
if (*scanner->vconjunct_ctx_ptr()) {
|
|
(*scanner->vconjunct_ctx_ptr())->close(state);
|
|
*scanner->vconjunct_ctx_ptr() = nullptr;
|
|
}
|
|
WARN_IF_ERROR((*_vconjunct_ctx_ptr)->clone(state, scanner->vconjunct_ctx_ptr()),
|
|
"Something wrong for runtime filters: ");
|
|
scanner->set_use_pushdown_conjuncts(true);
|
|
}
|
|
*/
|
|
|
|
std::vector<Block*> blocks;
|
|
|
|
// Because we use thread pool to scan data from storage. One scanner can't
|
|
// use this thread too long, this can starve other query's scanner. So, we
|
|
// need yield this thread when we do enough work. However, OlapStorage read
|
|
// data in pre-aggregate mode, then we can't use storage returned data to
|
|
// judge if we need to yield. So we record all raw data read in this round
|
|
// scan, if this exceed row number or bytes threshold, we yield this thread.
|
|
int64_t raw_rows_read = scanner->raw_rows_read();
|
|
int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num;
|
|
int64_t raw_bytes_read = 0;
|
|
int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
|
|
bool get_free_block = true;
|
|
int num_rows_in_block = 0;
|
|
|
|
// Has to wait at least one full block, or it will cause a lot of schedule task in priority
|
|
// queue, it will affect query latency and query concurrency for example ssb 3.3.
|
|
while (!eos && raw_bytes_read < raw_bytes_threshold &&
|
|
((raw_rows_read < raw_rows_threshold && get_free_block) ||
|
|
num_rows_in_block < _runtime_state->batch_size())) {
|
|
if (UNLIKELY(_transfer_done)) {
|
|
eos = true;
|
|
status = Status::Cancelled("Cancelled");
|
|
LOG(INFO) << "Scan thread cancelled, cause query done, maybe reach limit.";
|
|
break;
|
|
}
|
|
|
|
auto block = _alloc_block(get_free_block);
|
|
status = scanner->get_block(_runtime_state, block, &eos);
|
|
VLOG_ROW << "VOlapScanNode input rows: " << block->rows();
|
|
if (!status.ok()) {
|
|
LOG(WARNING) << "Scan thread read VOlapScanner failed: " << status.to_string();
|
|
// Add block ptr in blocks, prevent mem leak in read failed
|
|
blocks.push_back(block);
|
|
eos = true;
|
|
break;
|
|
}
|
|
|
|
raw_bytes_read += block->bytes();
|
|
num_rows_in_block += block->rows();
|
|
// 4. if status not ok, change status_.
|
|
if (UNLIKELY(block->rows() == 0)) {
|
|
std::lock_guard<std::mutex> l(_free_blocks_lock);
|
|
_free_blocks.emplace_back(block);
|
|
} else {
|
|
if (!blocks.empty() &&
|
|
blocks.back()->rows() + block->rows() <= _runtime_state->batch_size()) {
|
|
MutableBlock(blocks.back()).merge(*block);
|
|
block->clear_column_data();
|
|
std::lock_guard<std::mutex> l(_free_blocks_lock);
|
|
_free_blocks.emplace_back(block);
|
|
} else {
|
|
blocks.push_back(block);
|
|
}
|
|
}
|
|
raw_rows_read = scanner->raw_rows_read();
|
|
}
|
|
|
|
{
|
|
// if we failed, check status.
|
|
if (UNLIKELY(!status.ok())) {
|
|
_transfer_done = true;
|
|
std::lock_guard<SpinLock> guard(_status_mutex);
|
|
if (LIKELY(_status.ok())) {
|
|
_status = status;
|
|
}
|
|
}
|
|
|
|
bool global_status_ok = false;
|
|
{
|
|
std::lock_guard<SpinLock> guard(_status_mutex);
|
|
global_status_ok = _status.ok();
|
|
}
|
|
if (UNLIKELY(!global_status_ok)) {
|
|
eos = true;
|
|
std::for_each(blocks.begin(), blocks.end(), std::default_delete<Block>());
|
|
} else {
|
|
std::lock_guard<std::mutex> l(_scan_blocks_lock);
|
|
_scan_blocks.insert(_scan_blocks.end(), blocks.begin(), blocks.end());
|
|
for (auto b : blocks) {
|
|
_scan_row_batches_bytes += b->allocated_bytes();
|
|
}
|
|
}
|
|
// If eos is true, we will process out of this lock block.
|
|
if (eos) {
|
|
scanner->mark_to_need_to_close();
|
|
}
|
|
std::lock_guard<std::mutex> l(_volap_scanners_lock);
|
|
_volap_scanners.push_front(scanner);
|
|
}
|
|
if (eos) {
|
|
std::lock_guard<std::mutex> l(_scan_blocks_lock);
|
|
_progress.update(1);
|
|
if (_progress.done()) {
|
|
// this is the right out
|
|
_scanner_done = true;
|
|
}
|
|
}
|
|
_scan_cpu_timer->update(cpu_watch.elapsed_time());
|
|
_scanner_wait_worker_timer->update(wait_time);
|
|
|
|
std::unique_lock<std::mutex> l(_scan_blocks_lock);
|
|
_running_thread--;
|
|
|
|
// The transfer thead will wait for `_running_thread==0`, to make sure all scanner threads won't access class members.
|
|
// Do not access class members after this code.
|
|
_scan_block_added_cv.notify_one();
|
|
_scan_thread_exit_cv.notify_one();
|
|
}
|
|
|
|
Status VOlapScanNode::_add_blocks(std::vector<Block*>& block) {
|
|
{
|
|
std::unique_lock<std::mutex> l(_blocks_lock);
|
|
|
|
// check queue limit for both block queue size and bytes
|
|
while (UNLIKELY((_materialized_blocks.size() >= _max_materialized_blocks ||
|
|
_materialized_row_batches_bytes >= _max_scanner_queue_size_bytes / 2) &&
|
|
!_transfer_done)) {
|
|
_block_consumed_cv.wait(l);
|
|
}
|
|
|
|
VLOG_CRITICAL << "Push block to materialized_blocks";
|
|
_materialized_blocks.insert(_materialized_blocks.end(), block.cbegin(), block.cend());
|
|
for (auto b : block) {
|
|
_materialized_row_batches_bytes += b->allocated_bytes();
|
|
}
|
|
}
|
|
// remove one block, notify main thread
|
|
_block_added_cv.notify_one();
|
|
return Status::OK();
|
|
}
|
|
|
|
Status VOlapScanNode::normalize_conjuncts() {
|
|
std::vector<SlotDescriptor*> slots = _tuple_desc->slots();
|
|
|
|
for (int slot_idx = 0; slot_idx < slots.size(); ++slot_idx) {
|
|
switch (slots[slot_idx]->type().type) {
|
|
#define M(NAME) \
|
|
case TYPE_##NAME: { \
|
|
ColumnValueRange<TYPE_##NAME> range(slots[slot_idx]->col_name(), \
|
|
slots[slot_idx]->type().precision, \
|
|
slots[slot_idx]->type().scale); \
|
|
_id_to_slot_column_value_range[slots[slot_idx]->id()] = \
|
|
std::pair {slots[slot_idx], range}; \
|
|
break; \
|
|
}
|
|
#define APPLY_FOR_PRIMITIVE_TYPE(M) \
|
|
M(TINYINT) \
|
|
M(SMALLINT) \
|
|
M(INT) \
|
|
M(BIGINT) \
|
|
M(LARGEINT) \
|
|
M(CHAR) \
|
|
M(DATE) \
|
|
M(DATETIME) \
|
|
M(DATEV2) \
|
|
M(DATETIMEV2) \
|
|
M(VARCHAR) \
|
|
M(STRING) \
|
|
M(HLL) \
|
|
M(DECIMAL32) \
|
|
M(DECIMAL64) \
|
|
M(DECIMAL128) \
|
|
M(DECIMALV2) \
|
|
M(BOOLEAN)
|
|
APPLY_FOR_PRIMITIVE_TYPE(M)
|
|
#undef M
|
|
default: {
|
|
VLOG_CRITICAL << "Unsupported Normalize Slot [ColName=" << slots[slot_idx]->col_name()
|
|
<< "]";
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
if (_vconjunct_ctx_ptr) {
|
|
if ((*_vconjunct_ctx_ptr)->root()) {
|
|
VExpr* new_root = _normalize_predicate(_runtime_state, (*_vconjunct_ctx_ptr)->root());
|
|
if (new_root) {
|
|
(*_vconjunct_ctx_ptr)->set_root(new_root);
|
|
} else {
|
|
(*(_vconjunct_ctx_ptr.get()))->mark_as_stale();
|
|
_stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr));
|
|
_vconjunct_ctx_ptr.reset(nullptr);
|
|
}
|
|
}
|
|
}
|
|
for (auto& it : _id_to_slot_column_value_range) {
|
|
std::visit(
|
|
[&](auto&& range) {
|
|
if (range.is_empty_value_range()) {
|
|
_eos = true;
|
|
}
|
|
},
|
|
it.second.second);
|
|
_column_value_ranges[it.second.first->col_name()] = it.second.second;
|
|
}
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
static std::string olap_filter_to_string(const doris::TCondition& condition) {
|
|
auto op_name = condition.condition_op;
|
|
if (condition.condition_op == "*=") {
|
|
op_name = "IN";
|
|
} else if (condition.condition_op == "!*=") {
|
|
op_name = "NOT IN";
|
|
}
|
|
return fmt::format("{{{} {} {}}}", condition.column_name, op_name,
|
|
to_string(condition.condition_values));
|
|
}
|
|
|
|
static std::string olap_filters_to_string(const std::vector<doris::TCondition>& filters) {
|
|
std::string filters_string;
|
|
filters_string += "[";
|
|
for (auto it = filters.cbegin(); it != filters.cend(); it++) {
|
|
if (it != filters.cbegin()) {
|
|
filters_string += ",";
|
|
}
|
|
filters_string += olap_filter_to_string(*it);
|
|
}
|
|
filters_string += "]";
|
|
return filters_string;
|
|
}
|
|
|
|
Status VOlapScanNode::build_key_ranges_and_filters() {
|
|
const std::vector<std::string>& column_names = _olap_scan_node.key_column_name;
|
|
const std::vector<TPrimitiveType::type>& column_types = _olap_scan_node.key_column_type;
|
|
DCHECK(column_types.size() == column_names.size());
|
|
|
|
// 1. construct scan key except last olap engine short key
|
|
_scan_keys.set_is_convertible(limit() == -1);
|
|
|
|
// we use `exact_range` to identify a key range is an exact range or not when we convert
|
|
// it to `_scan_keys`. If `exact_range` is true, we can just discard it from `_olap_filter`.
|
|
bool exact_range = true;
|
|
for (int column_index = 0; column_index < column_names.size() && !_scan_keys.has_range_value();
|
|
++column_index) {
|
|
auto iter = _column_value_ranges.find(column_names[column_index]);
|
|
if (_column_value_ranges.end() == iter) {
|
|
break;
|
|
}
|
|
|
|
RETURN_IF_ERROR(std::visit(
|
|
[&](auto&& range) {
|
|
RETURN_IF_ERROR(
|
|
_scan_keys.extend_scan_key(range, _max_scan_key_num, &exact_range));
|
|
if (exact_range) {
|
|
_column_value_ranges.erase(iter->first);
|
|
}
|
|
return Status::OK();
|
|
},
|
|
iter->second));
|
|
}
|
|
|
|
for (auto& iter : _column_value_ranges) {
|
|
std::vector<TCondition> filters;
|
|
std::visit([&](auto&& range) { range.to_olap_filter(filters); }, iter.second);
|
|
|
|
for (const auto& filter : filters) {
|
|
_olap_filter.push_back(std::move(filter));
|
|
}
|
|
}
|
|
|
|
_runtime_profile->add_info_string("PushdownPredicate", olap_filters_to_string(_olap_filter));
|
|
|
|
_runtime_profile->add_info_string("KeyRanges", _scan_keys.debug_string());
|
|
|
|
VLOG_CRITICAL << _scan_keys.debug_string();
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
Status VOlapScanNode::start_scan(RuntimeState* state) {
|
|
RETURN_IF_CANCELLED(state);
|
|
|
|
VLOG_CRITICAL << "NormalizeConjuncts";
|
|
RETURN_IF_ERROR(normalize_conjuncts());
|
|
|
|
if (_eos) {
|
|
return Status::OK();
|
|
}
|
|
|
|
VLOG_CRITICAL << "BuildKeyRangesAndFilters";
|
|
RETURN_IF_ERROR(build_key_ranges_and_filters());
|
|
|
|
VLOG_CRITICAL << "StartScanThread";
|
|
RETURN_IF_ERROR(start_scan_thread(state));
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
static bool ignore_cast(SlotDescriptor* slot, VExpr* expr) {
|
|
if ((slot->type().is_date_type() || slot->type().is_date_v2_type() ||
|
|
slot->type().is_datetime_v2_type()) &&
|
|
(expr->type().is_date_type() || expr->type().is_date_v2_type() ||
|
|
expr->type().is_datetime_v2_type())) {
|
|
return true;
|
|
}
|
|
if (slot->type().is_string_type() && expr->type().is_string_type()) {
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
template <bool IsFixed, PrimitiveType PrimitiveType, typename ChangeFixedValueRangeFunc>
|
|
Status VOlapScanNode::change_value_range(ColumnValueRange<PrimitiveType>& temp_range, void* value,
|
|
const ChangeFixedValueRangeFunc& func,
|
|
const std::string& fn_name, int slot_ref_child) {
|
|
if constexpr (PrimitiveType == TYPE_DATE) {
|
|
DateTimeValue date_value;
|
|
reinterpret_cast<VecDateTimeValue*>(value)->convert_vec_dt_to_dt(&date_value);
|
|
if constexpr (IsFixed) {
|
|
if (!date_value.check_loss_accuracy_cast_to_date()) {
|
|
func(temp_range,
|
|
reinterpret_cast<typename PrimitiveTypeTraits<PrimitiveType>::CppType*>(
|
|
&date_value));
|
|
}
|
|
} else {
|
|
if (date_value.check_loss_accuracy_cast_to_date()) {
|
|
if (fn_name == "lt" || fn_name == "ge") {
|
|
++date_value;
|
|
}
|
|
}
|
|
func(temp_range, to_olap_filter_type(fn_name, slot_ref_child),
|
|
reinterpret_cast<typename PrimitiveTypeTraits<PrimitiveType>::CppType*>(
|
|
&date_value));
|
|
}
|
|
} else if constexpr (PrimitiveType == TYPE_DATETIME) {
|
|
DateTimeValue date_value;
|
|
reinterpret_cast<VecDateTimeValue*>(value)->convert_vec_dt_to_dt(&date_value);
|
|
if constexpr (IsFixed) {
|
|
func(temp_range,
|
|
reinterpret_cast<typename PrimitiveTypeTraits<PrimitiveType>::CppType*>(
|
|
&date_value));
|
|
} else {
|
|
func(temp_range, to_olap_filter_type(fn_name, slot_ref_child),
|
|
reinterpret_cast<typename PrimitiveTypeTraits<PrimitiveType>::CppType*>(
|
|
reinterpret_cast<char*>(&date_value)));
|
|
}
|
|
} else if constexpr (PrimitiveType == TYPE_DATEV2) {
|
|
DateV2Value<DateTimeV2ValueType> datetimev2_value =
|
|
*reinterpret_cast<DateV2Value<DateTimeV2ValueType>*>(value);
|
|
if constexpr (IsFixed) {
|
|
if (datetimev2_value.can_cast_to_date_without_loss_accuracy()) {
|
|
DateV2Value<DateV2ValueType> date_v2;
|
|
date_v2.set_date_uint32(
|
|
binary_cast<DateV2Value<DateTimeV2ValueType>, uint64_t>(datetimev2_value) >>
|
|
TIME_PART_LENGTH);
|
|
func(temp_range, &date_v2);
|
|
}
|
|
} else {
|
|
doris::vectorized::DateV2Value<DateV2ValueType> date_v2;
|
|
date_v2.set_date_uint32(
|
|
binary_cast<DateV2Value<DateTimeV2ValueType>, uint64_t>(datetimev2_value) >>
|
|
TIME_PART_LENGTH);
|
|
if (!datetimev2_value.can_cast_to_date_without_loss_accuracy()) {
|
|
if (fn_name == "lt" || fn_name == "ge") {
|
|
++date_v2;
|
|
}
|
|
}
|
|
func(temp_range, to_olap_filter_type(fn_name, slot_ref_child), &date_v2);
|
|
}
|
|
} else if constexpr ((PrimitiveType == TYPE_DECIMALV2) || (PrimitiveType == TYPE_CHAR) ||
|
|
(PrimitiveType == TYPE_VARCHAR) || (PrimitiveType == TYPE_HLL) ||
|
|
(PrimitiveType == TYPE_DATETIMEV2) || (PrimitiveType == TYPE_TINYINT) ||
|
|
(PrimitiveType == TYPE_SMALLINT) || (PrimitiveType == TYPE_INT) ||
|
|
(PrimitiveType == TYPE_BIGINT) || (PrimitiveType == TYPE_LARGEINT) ||
|
|
(PrimitiveType == TYPE_DECIMAL32) || (PrimitiveType == TYPE_DECIMAL64) ||
|
|
(PrimitiveType == TYPE_DECIMAL128) || (PrimitiveType == TYPE_STRING) ||
|
|
(PrimitiveType == TYPE_BOOLEAN)) {
|
|
if constexpr (IsFixed) {
|
|
func(temp_range,
|
|
reinterpret_cast<typename PrimitiveTypeTraits<PrimitiveType>::CppType*>(value));
|
|
} else {
|
|
func(temp_range, to_olap_filter_type(fn_name, slot_ref_child),
|
|
reinterpret_cast<typename PrimitiveTypeTraits<PrimitiveType>::CppType*>(value));
|
|
}
|
|
} else {
|
|
static_assert(always_false_v<PrimitiveType>);
|
|
}
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
bool VOlapScanNode::is_key_column(const std::string& key_name) {
|
|
// all column in dup_keys table olap scan node threat
|
|
// as key column
|
|
if (_olap_scan_node.keyType == TKeysType::DUP_KEYS) {
|
|
return true;
|
|
}
|
|
|
|
auto res = std::find(_olap_scan_node.key_column_name.begin(),
|
|
_olap_scan_node.key_column_name.end(), key_name);
|
|
return res != _olap_scan_node.key_column_name.end();
|
|
}
|
|
|
|
Status VOlapScanNode::start_scan_thread(RuntimeState* state) {
|
|
if (_scan_ranges.empty()) {
|
|
_transfer_done = true;
|
|
return Status::OK();
|
|
}
|
|
auto span = opentelemetry::trace::Tracer::GetCurrentSpan();
|
|
|
|
// ranges constructed from scan keys
|
|
std::vector<std::unique_ptr<OlapScanRange>> cond_ranges;
|
|
RETURN_IF_ERROR(_scan_keys.get_key_range(&cond_ranges));
|
|
// if we can't get ranges from conditions, we give it a total range
|
|
if (cond_ranges.empty()) {
|
|
cond_ranges.emplace_back(new OlapScanRange());
|
|
}
|
|
bool need_split = true;
|
|
// If we have ranges more than 64, there is no need to call
|
|
// ShowHint to split ranges
|
|
if (limit() != -1 || cond_ranges.size() > 64) {
|
|
need_split = false;
|
|
}
|
|
int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
|
|
|
|
std::unordered_set<std::string> disk_set;
|
|
for (auto& scan_range : _scan_ranges) {
|
|
auto tablet_id = scan_range->tablet_id;
|
|
std::string err;
|
|
TabletSharedPtr tablet =
|
|
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err);
|
|
if (tablet == nullptr) {
|
|
std::stringstream ss;
|
|
ss << "failed to get tablet: " << tablet_id << ", reason: " << err;
|
|
LOG(WARNING) << ss.str();
|
|
return Status::InternalError(ss.str());
|
|
}
|
|
|
|
std::vector<std::unique_ptr<OlapScanRange>>* ranges = &cond_ranges;
|
|
std::vector<std::unique_ptr<OlapScanRange>> split_ranges;
|
|
if (need_split && !tablet->all_beta()) {
|
|
auto st = get_hints(tablet, *scan_range, config::doris_scan_range_row_count,
|
|
_scan_keys.begin_include(), _scan_keys.end_include(), cond_ranges,
|
|
&split_ranges, _runtime_profile.get());
|
|
if (st.ok()) {
|
|
ranges = &split_ranges;
|
|
}
|
|
}
|
|
int size_based_scanners_per_tablet = 1;
|
|
|
|
if (config::doris_scan_range_max_mb > 0) {
|
|
size_based_scanners_per_tablet = std::max(
|
|
1, (int)tablet->tablet_footprint() / config::doris_scan_range_max_mb << 20);
|
|
}
|
|
|
|
int ranges_per_scanner =
|
|
std::max(1, (int)ranges->size() /
|
|
std::min(scanners_per_tablet, size_based_scanners_per_tablet));
|
|
int num_ranges = ranges->size();
|
|
for (int i = 0; i < num_ranges;) {
|
|
std::vector<OlapScanRange*> scanner_ranges;
|
|
scanner_ranges.push_back((*ranges)[i].get());
|
|
++i;
|
|
for (int j = 1; i < num_ranges && j < ranges_per_scanner &&
|
|
(*ranges)[i]->end_include == (*ranges)[i - 1]->end_include;
|
|
++j, ++i) {
|
|
scanner_ranges.push_back((*ranges)[i].get());
|
|
}
|
|
VOlapScanner* scanner =
|
|
new VOlapScanner(state, this, _olap_scan_node.is_preaggregation,
|
|
_need_agg_finalize, *scan_range, _scanner_mem_tracker.get());
|
|
// add scanner to pool before doing prepare.
|
|
// so that scanner can be automatically deconstructed if prepare failed.
|
|
_scanner_pool.add(scanner);
|
|
RETURN_IF_ERROR(scanner->prepare(*scan_range, scanner_ranges, _olap_filter,
|
|
_bloom_filters_push_down, _push_down_functions));
|
|
|
|
_volap_scanners.push_back(scanner);
|
|
disk_set.insert(scanner->scan_disk());
|
|
}
|
|
}
|
|
COUNTER_SET(_num_disks_accessed_counter, static_cast<int64_t>(disk_set.size()));
|
|
COUNTER_SET(_num_scanners, static_cast<int64_t>(_volap_scanners.size()));
|
|
telemetry::set_span_attribute(span, _num_disks_accessed_counter);
|
|
telemetry::set_span_attribute(span, _num_scanners);
|
|
|
|
// init progress
|
|
std::stringstream ss;
|
|
ss << "ScanThread complete (node=" << id() << "):";
|
|
_progress = ProgressUpdater(ss.str(), _volap_scanners.size(), 1);
|
|
|
|
_transfer_thread.reset(new std::thread(
|
|
[this, state, parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
|
|
opentelemetry::trace::Scope scope {parent_span};
|
|
transfer_thread(state);
|
|
}));
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
Status VOlapScanNode::close(RuntimeState* state) {
|
|
if (is_closed()) {
|
|
return Status::OK();
|
|
}
|
|
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VOlapScanNode::close");
|
|
// change done status
|
|
{
|
|
std::unique_lock<std::mutex> l(_blocks_lock);
|
|
_transfer_done = true;
|
|
}
|
|
// notify all scanner thread
|
|
_block_consumed_cv.notify_all();
|
|
_block_added_cv.notify_all();
|
|
_scan_block_added_cv.notify_all();
|
|
|
|
// join transfer thread
|
|
if (_transfer_thread) {
|
|
_transfer_thread->join();
|
|
}
|
|
|
|
// clear some block in queue
|
|
// TODO: The presence of transfer_thread here may cause Block's memory alloc and be released not in a thread,
|
|
// which may lead to potential performance problems. we should rethink whether to delete the transfer thread
|
|
std::for_each(_materialized_blocks.begin(), _materialized_blocks.end(),
|
|
std::default_delete<Block>());
|
|
_materialized_row_batches_bytes = 0;
|
|
std::for_each(_scan_blocks.begin(), _scan_blocks.end(), std::default_delete<Block>());
|
|
_scan_row_batches_bytes = 0;
|
|
std::for_each(_free_blocks.begin(), _free_blocks.end(), std::default_delete<Block>());
|
|
|
|
// OlapScanNode terminate by exception
|
|
// so that initiative close the Scanner
|
|
for (auto scanner : _volap_scanners) {
|
|
scanner->close(state);
|
|
}
|
|
|
|
for (auto& filter_desc : _runtime_filter_descs) {
|
|
IRuntimeFilter* runtime_filter = nullptr;
|
|
state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter);
|
|
DCHECK(runtime_filter != nullptr);
|
|
runtime_filter->consumer_close();
|
|
}
|
|
|
|
for (auto& ctx : _stale_vexpr_ctxs) {
|
|
(*ctx)->close(state);
|
|
}
|
|
|
|
VLOG_CRITICAL << "VOlapScanNode::close()";
|
|
return ScanNode::close(state);
|
|
}
|
|
|
|
Status VOlapScanNode::get_next(RuntimeState* state, Block* block, bool* eos) {
|
|
INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VOlapScanNode::get_next");
|
|
SCOPED_TIMER(_runtime_profile->total_time_counter());
|
|
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
|
|
|
|
// check if Canceled.
|
|
if (state->is_cancelled()) {
|
|
std::unique_lock<std::mutex> l(_blocks_lock);
|
|
_transfer_done = true;
|
|
std::lock_guard<SpinLock> guard(_status_mutex);
|
|
if (LIKELY(_status.ok())) {
|
|
_status = Status::Cancelled("Cancelled");
|
|
}
|
|
return _status;
|
|
}
|
|
|
|
// check if started.
|
|
if (!_start) {
|
|
Status status = start_scan(state);
|
|
|
|
if (!status.ok()) {
|
|
LOG(ERROR) << "StartScan Failed cause " << status.get_error_msg();
|
|
*eos = true;
|
|
return status;
|
|
}
|
|
|
|
_start = true;
|
|
}
|
|
|
|
// some conjuncts will be disposed in start_scan function, so
|
|
// we should check _eos after call start_scan
|
|
if (_eos) {
|
|
*eos = true;
|
|
return Status::OK();
|
|
}
|
|
|
|
// wait for block from queue
|
|
Block* materialized_block = nullptr;
|
|
{
|
|
std::unique_lock<std::mutex> l(_blocks_lock);
|
|
SCOPED_TIMER(_olap_wait_batch_queue_timer);
|
|
while (_materialized_blocks.empty() && !_transfer_done) {
|
|
if (state->is_cancelled()) {
|
|
_transfer_done = true;
|
|
}
|
|
|
|
// use wait_for, not wait, in case to capture the state->is_cancelled()
|
|
_block_added_cv.wait_for(l, std::chrono::seconds(1));
|
|
}
|
|
|
|
if (!_materialized_blocks.empty()) {
|
|
materialized_block = _materialized_blocks.back();
|
|
DCHECK(materialized_block != nullptr);
|
|
_materialized_blocks.pop_back();
|
|
_materialized_row_batches_bytes -= materialized_block->allocated_bytes();
|
|
}
|
|
}
|
|
|
|
// return block
|
|
if (nullptr != materialized_block) {
|
|
// notify scanner
|
|
_block_consumed_cv.notify_one();
|
|
// get scanner's block memory
|
|
block->swap(*materialized_block);
|
|
VLOG_ROW << "VOlapScanNode output rows: " << block->rows();
|
|
reached_limit(block, eos);
|
|
|
|
// reach scan node limit
|
|
if (*eos) {
|
|
{
|
|
std::unique_lock<std::mutex> l(_blocks_lock);
|
|
_transfer_done = true;
|
|
}
|
|
|
|
_block_consumed_cv.notify_all();
|
|
*eos = true;
|
|
LOG(INFO) << "VOlapScanNode ReachedLimit.";
|
|
} else {
|
|
*eos = false;
|
|
}
|
|
|
|
{
|
|
// ReThink whether the SpinLock Better
|
|
std::lock_guard<std::mutex> l(_free_blocks_lock);
|
|
_free_blocks.emplace_back(materialized_block);
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
// all scanner done, change *eos to true
|
|
*eos = true;
|
|
std::lock_guard<SpinLock> guard(_status_mutex);
|
|
return _status;
|
|
}
|
|
|
|
Block* VOlapScanNode::_alloc_block(bool& get_free_block) {
|
|
{
|
|
std::lock_guard<std::mutex> l(_free_blocks_lock);
|
|
if (!_free_blocks.empty()) {
|
|
auto block = _free_blocks.back();
|
|
_free_blocks.pop_back();
|
|
return block;
|
|
}
|
|
}
|
|
|
|
get_free_block = false;
|
|
|
|
auto block = new Block(_tuple_desc->slots(), _block_size);
|
|
_buffered_bytes += block->allocated_bytes();
|
|
return block;
|
|
}
|
|
|
|
int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per_scanner) {
|
|
std::list<VOlapScanner*> olap_scanners;
|
|
int assigned_thread_num = _running_thread;
|
|
size_t max_thread = config::doris_scanner_queue_size;
|
|
if (config::doris_scanner_row_num > state->batch_size()) {
|
|
max_thread /= config::doris_scanner_row_num / state->batch_size();
|
|
if (max_thread <= 0) max_thread = 1;
|
|
}
|
|
// copy to local
|
|
{
|
|
// How many thread can apply to this query
|
|
size_t thread_slot_num = 0;
|
|
{
|
|
if (_scan_row_batches_bytes < _max_scanner_queue_size_bytes / 2) {
|
|
std::lock_guard<std::mutex> l(_free_blocks_lock);
|
|
thread_slot_num = _free_blocks.size() / block_per_scanner;
|
|
thread_slot_num += (_free_blocks.size() % block_per_scanner != 0);
|
|
thread_slot_num = std::min(thread_slot_num, max_thread - assigned_thread_num);
|
|
if (thread_slot_num <= 0) {
|
|
thread_slot_num = 1;
|
|
}
|
|
} else {
|
|
std::lock_guard<std::mutex> l(_scan_blocks_lock);
|
|
if (_scan_blocks.empty()) {
|
|
// Just for notify if _scan_blocks is empty and no running thread
|
|
if (assigned_thread_num == 0) {
|
|
thread_slot_num = 1;
|
|
// NOTE: if olap_scanners_ is empty, scanner_done_ should be true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
{
|
|
std::lock_guard<std::mutex> l(_volap_scanners_lock);
|
|
thread_slot_num = std::min(thread_slot_num, _volap_scanners.size());
|
|
for (int i = 0; i < thread_slot_num && !_volap_scanners.empty();) {
|
|
auto scanner = _volap_scanners.front();
|
|
_volap_scanners.pop_front();
|
|
|
|
if (scanner->need_to_close()) {
|
|
scanner->close(state);
|
|
} else {
|
|
olap_scanners.push_back(scanner);
|
|
_running_thread++;
|
|
assigned_thread_num++;
|
|
i++;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// post volap scanners to thread-pool
|
|
auto cur_span = opentelemetry::trace::Tracer::GetCurrentSpan();
|
|
ThreadPoolToken* thread_token = nullptr;
|
|
if (_limit > -1 && _limit < 1024) {
|
|
thread_token = state->get_query_fragments_ctx()->get_serial_token();
|
|
} else {
|
|
thread_token = state->get_query_fragments_ctx()->get_token();
|
|
}
|
|
auto iter = olap_scanners.begin();
|
|
if (thread_token != nullptr) {
|
|
while (iter != olap_scanners.end()) {
|
|
auto s = thread_token->submit_func([this, scanner = *iter, parent_span = cur_span] {
|
|
opentelemetry::trace::Scope scope {parent_span};
|
|
this->scanner_thread(scanner);
|
|
});
|
|
if (s.ok()) {
|
|
(*iter)->start_wait_worker_timer();
|
|
COUNTER_UPDATE(_scanner_sched_counter, 1);
|
|
olap_scanners.erase(iter++);
|
|
} else {
|
|
LOG(FATAL) << "Failed to assign scanner task to thread pool! " << s.get_error_msg();
|
|
}
|
|
++_total_assign_num;
|
|
}
|
|
} else {
|
|
PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool();
|
|
PriorityThreadPool* remote_thread_pool = state->exec_env()->remote_scan_thread_pool();
|
|
while (iter != olap_scanners.end()) {
|
|
PriorityThreadPool::Task task;
|
|
task.work_function = [this, scanner = *iter, parent_span = cur_span] {
|
|
opentelemetry::trace::Scope scope {parent_span};
|
|
this->scanner_thread(scanner);
|
|
};
|
|
task.priority = _nice;
|
|
task.queue_id = state->exec_env()->store_path_to_index((*iter)->scan_disk());
|
|
(*iter)->start_wait_worker_timer();
|
|
|
|
TabletStorageType type = (*iter)->get_storage_type();
|
|
bool ret = false;
|
|
COUNTER_UPDATE(_scanner_sched_counter, 1);
|
|
if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
|
|
ret = thread_pool->offer(task);
|
|
} else {
|
|
ret = remote_thread_pool->offer(task);
|
|
}
|
|
|
|
if (ret) {
|
|
olap_scanners.erase(iter++);
|
|
} else {
|
|
LOG(FATAL) << "Failed to assign scanner task to thread pool!";
|
|
}
|
|
++_total_assign_num;
|
|
}
|
|
}
|
|
|
|
return assigned_thread_num;
|
|
}
|
|
|
|
// PlanFragmentExecutor will call this method to set scan range
|
|
// Doris scan range is defined in thrift file like this
|
|
// struct TPaloScanRange {
|
|
// 1: required list<Types.TNetworkAddress> hosts
|
|
// 2: required string schema_hash
|
|
// 3: required string version
|
|
// 5: required Types.TTabletId tablet_id
|
|
// 6: required string db_name
|
|
// 7: optional list<TKeyRange> partition_column_ranges
|
|
// 8: optional string index_name
|
|
// 9: optional string table_name
|
|
//}
|
|
// every doris_scan_range is related with one tablet so that one olap scan node contains multiple tablet
|
|
Status VOlapScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
|
|
for (auto& scan_range : scan_ranges) {
|
|
DCHECK(scan_range.scan_range.__isset.palo_scan_range);
|
|
_scan_ranges.emplace_back(new TPaloScanRange(scan_range.scan_range.palo_scan_range));
|
|
COUNTER_UPDATE(_tablet_counter, 1);
|
|
}
|
|
telemetry::set_current_span_attribute(_tablet_counter);
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
Status VOlapScanNode::get_hints(TabletSharedPtr table, const TPaloScanRange& scan_range,
|
|
int block_row_count, bool is_begin_include, bool is_end_include,
|
|
const std::vector<std::unique_ptr<OlapScanRange>>& scan_key_range,
|
|
std::vector<std::unique_ptr<OlapScanRange>>* sub_scan_range,
|
|
RuntimeProfile* profile) {
|
|
RuntimeProfile::Counter* show_hints_timer = profile->get_counter("ShowHintsTime_V1");
|
|
std::vector<std::vector<OlapTuple>> ranges;
|
|
bool have_valid_range = false;
|
|
for (auto& key_range : scan_key_range) {
|
|
if (key_range->begin_scan_range.size() == 1 &&
|
|
key_range->begin_scan_range.get_value(0) == NEGATIVE_INFINITY) {
|
|
continue;
|
|
}
|
|
SCOPED_TIMER(show_hints_timer);
|
|
|
|
Status res = Status::OK();
|
|
std::vector<OlapTuple> range;
|
|
res = table->split_range(key_range->begin_scan_range, key_range->end_scan_range,
|
|
block_row_count, &range);
|
|
if (!res.ok()) {
|
|
return Status::InternalError("fail to show hints");
|
|
}
|
|
ranges.emplace_back(std::move(range));
|
|
have_valid_range = true;
|
|
}
|
|
|
|
if (!have_valid_range) {
|
|
std::vector<OlapTuple> range;
|
|
auto res = table->split_range({}, {}, block_row_count, &range);
|
|
if (!res.ok()) {
|
|
return Status::InternalError("fail to show hints");
|
|
}
|
|
ranges.emplace_back(std::move(range));
|
|
}
|
|
|
|
for (int i = 0; i < ranges.size(); ++i) {
|
|
for (int j = 0; j < ranges[i].size(); j += 2) {
|
|
std::unique_ptr<OlapScanRange> range(new OlapScanRange);
|
|
range->begin_scan_range.reset();
|
|
range->begin_scan_range = ranges[i][j];
|
|
range->end_scan_range.reset();
|
|
range->end_scan_range = ranges[i][j + 1];
|
|
|
|
if (0 == j) {
|
|
range->begin_include = is_begin_include;
|
|
} else {
|
|
range->begin_include = true;
|
|
}
|
|
|
|
if (j + 2 == ranges[i].size()) {
|
|
range->end_include = is_end_include;
|
|
} else {
|
|
range->end_include = false;
|
|
}
|
|
|
|
sub_scan_range->emplace_back(std::move(range));
|
|
}
|
|
}
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
template <bool IsNotIn>
|
|
bool VOlapScanNode::_should_push_down_in_predicate(VInPredicate* pred, VExprContext* expr_ctx) {
|
|
if (pred->is_not_in() != IsNotIn) {
|
|
return false;
|
|
}
|
|
InState* state = reinterpret_cast<InState*>(
|
|
expr_ctx->fn_context(pred->fn_context_index())
|
|
->get_function_state(FunctionContext::FRAGMENT_LOCAL));
|
|
HybridSetBase* set = state->hybrid_set.get();
|
|
|
|
// if there are too many elements in InPredicate, exceed the limit,
|
|
// we will not push any condition of this column to storage engine.
|
|
// because too many conditions pushed down to storage engine may even
|
|
// slow down the query process.
|
|
// ATTN: This is just an experience value. You may need to try
|
|
// different thresholds to improve performance.
|
|
if (set->size() > _max_pushdown_conditions_per_column) {
|
|
VLOG_NOTICE << "Predicate value num " << set->size() << " exceed limit "
|
|
<< _max_pushdown_conditions_per_column;
|
|
return false;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
bool VOlapScanNode::_should_push_down_function_filter(VectorizedFnCall* fn_call,
|
|
VExprContext* expr_ctx,
|
|
std::string* constant_str,
|
|
doris_udf::FunctionContext** fn_ctx) {
|
|
// Now only `like` function filters is supported to push down
|
|
if (fn_call->fn().name.function_name != "like") {
|
|
return false;
|
|
}
|
|
|
|
const auto& children = fn_call->children();
|
|
doris_udf::FunctionContext* func_cxt = expr_ctx->fn_context(fn_call->fn_context_index());
|
|
DCHECK(func_cxt != nullptr);
|
|
DCHECK(children.size() == 2);
|
|
for (size_t i = 0; i < children.size(); i++) {
|
|
if (VExpr::expr_without_cast(children[i])->node_type() != TExprNodeType::SLOT_REF) {
|
|
// not a slot ref(column)
|
|
continue;
|
|
}
|
|
if (!children[1 - i]->is_constant()) {
|
|
// only handle constant value
|
|
return false;
|
|
} else {
|
|
DCHECK(children[1 - i]->type().is_string_type());
|
|
if (const ColumnConst* const_column = check_and_get_column<ColumnConst>(
|
|
children[1 - i]->get_const_col(expr_ctx)->column_ptr)) {
|
|
*constant_str = const_column->get_data_at(0).to_string();
|
|
} else {
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
*fn_ctx = func_cxt;
|
|
return true;
|
|
}
|
|
|
|
bool VOlapScanNode::_should_push_down_binary_predicate(
|
|
VectorizedFnCall* fn_call, VExprContext* expr_ctx, StringRef* constant_val,
|
|
int* slot_ref_child, const std::function<bool(const std::string&)>& fn_checker) {
|
|
if (!fn_checker(fn_call->fn().name.function_name)) {
|
|
return false;
|
|
}
|
|
|
|
const auto& children = fn_call->children();
|
|
DCHECK(children.size() == 2);
|
|
for (size_t i = 0; i < children.size(); i++) {
|
|
if (VExpr::expr_without_cast(children[i])->node_type() != TExprNodeType::SLOT_REF) {
|
|
// not a slot ref(column)
|
|
continue;
|
|
}
|
|
if (!children[1 - i]->is_constant()) {
|
|
// only handle constant value
|
|
return false;
|
|
} else {
|
|
if (const ColumnConst* const_column = check_and_get_column<ColumnConst>(
|
|
children[1 - i]->get_const_col(expr_ctx)->column_ptr)) {
|
|
*slot_ref_child = i;
|
|
*constant_val = const_column->get_data_at(0);
|
|
} else {
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
bool VOlapScanNode::_is_predicate_acting_on_slot(
|
|
VExpr* expr,
|
|
const std::function<bool(const std::vector<VExpr*>&, const VSlotRef**, VExpr**)>& checker,
|
|
SlotDescriptor** slot_desc, ColumnValueRangeType** range) {
|
|
const VSlotRef* slot_ref = nullptr;
|
|
VExpr* child_contains_slot = nullptr;
|
|
if (!checker(expr->children(), &slot_ref, &child_contains_slot)) {
|
|
// not a slot ref(column)
|
|
return false;
|
|
}
|
|
|
|
auto entry = _id_to_slot_column_value_range.find(slot_ref->slot_id());
|
|
if (_id_to_slot_column_value_range.end() == entry) {
|
|
return false;
|
|
}
|
|
*slot_desc = entry->second.first;
|
|
DCHECK(child_contains_slot != nullptr);
|
|
if (child_contains_slot->type().type != (*slot_desc)->type().type) {
|
|
if (!ignore_cast(*slot_desc, child_contains_slot)) {
|
|
// the type of predicate not match the slot's type
|
|
return false;
|
|
}
|
|
}
|
|
*range = &(entry->second.second);
|
|
return true;
|
|
}
|
|
|
|
template <PrimitiveType T>
|
|
Status VOlapScanNode::_normalize_in_and_eq_predicate(VExpr* expr, VExprContext* expr_ctx,
|
|
SlotDescriptor* slot,
|
|
ColumnValueRange<T>& range, bool* push_down) {
|
|
auto temp_range = ColumnValueRange<T>::create_empty_column_value_range(slot->type().precision,
|
|
slot->type().scale);
|
|
bool effect = false;
|
|
// 1. Normalize in conjuncts like 'where col in (v1, v2, v3)'
|
|
if (TExprNodeType::IN_PRED == expr->node_type()) {
|
|
VInPredicate* pred = static_cast<VInPredicate*>(expr);
|
|
if (!_should_push_down_in_predicate<false>(pred, expr_ctx)) {
|
|
return Status::OK();
|
|
}
|
|
|
|
// begin to push InPredicate value into ColumnValueRange
|
|
InState* state = reinterpret_cast<InState*>(
|
|
expr_ctx->fn_context(pred->fn_context_index())
|
|
->get_function_state(FunctionContext::FRAGMENT_LOCAL));
|
|
HybridSetBase::IteratorBase* iter = state->hybrid_set->begin();
|
|
auto fn_name = std::string("");
|
|
while (iter->has_next()) {
|
|
// column in (nullptr) is always false so continue to
|
|
// dispose next item
|
|
if (nullptr == iter->get_value()) {
|
|
iter->next();
|
|
continue;
|
|
}
|
|
auto value = const_cast<void*>(iter->get_value());
|
|
RETURN_IF_ERROR(change_value_range<true>(
|
|
temp_range, value, ColumnValueRange<T>::add_fixed_value_range, fn_name));
|
|
iter->next();
|
|
}
|
|
|
|
range.intersection(temp_range);
|
|
effect = true;
|
|
} else if (TExprNodeType::BINARY_PRED == expr->node_type()) {
|
|
DCHECK(expr->children().size() == 2);
|
|
auto eq_checker = [](const std::string& fn_name) { return fn_name == "eq"; };
|
|
|
|
StringRef value;
|
|
int slot_ref_child = -1;
|
|
if (_should_push_down_binary_predicate(reinterpret_cast<VectorizedFnCall*>(expr), expr_ctx,
|
|
&value, &slot_ref_child, eq_checker)) {
|
|
DCHECK(slot_ref_child >= 0);
|
|
// where A = nullptr should return empty result set
|
|
auto fn_name = std::string("");
|
|
if (value.data != nullptr) {
|
|
if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T == TYPE_STRING ||
|
|
T == TYPE_HLL) {
|
|
auto val = StringValue(value.data, value.size);
|
|
RETURN_IF_ERROR(change_value_range<true>(
|
|
temp_range, reinterpret_cast<void*>(&val),
|
|
ColumnValueRange<T>::add_fixed_value_range, fn_name));
|
|
} else {
|
|
RETURN_IF_ERROR(change_value_range<true>(
|
|
temp_range, reinterpret_cast<void*>(const_cast<char*>(value.data)),
|
|
ColumnValueRange<T>::add_fixed_value_range, fn_name));
|
|
}
|
|
range.intersection(temp_range);
|
|
effect = true;
|
|
}
|
|
}
|
|
}
|
|
|
|
// exceed limit, no conditions will be pushed down to storage engine.
|
|
if (range.get_fixed_value_size() > _max_pushdown_conditions_per_column) {
|
|
range.set_whole_value_range();
|
|
} else {
|
|
*push_down = effect;
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
template <PrimitiveType T>
|
|
Status VOlapScanNode::_normalize_not_in_and_not_eq_predicate(VExpr* expr, VExprContext* expr_ctx,
|
|
SlotDescriptor* slot,
|
|
ColumnValueRange<T>& range,
|
|
bool* push_down) {
|
|
bool is_fixed_range = range.is_fixed_value_range();
|
|
auto not_in_range = ColumnValueRange<T>::create_empty_column_value_range(range.column_name());
|
|
bool effect = false;
|
|
// 1. Normalize in conjuncts like 'where col in (v1, v2, v3)'
|
|
if (TExprNodeType::IN_PRED == expr->node_type()) {
|
|
VInPredicate* pred = static_cast<VInPredicate*>(expr);
|
|
if (!_should_push_down_in_predicate<true>(pred, expr_ctx)) {
|
|
return Status::OK();
|
|
}
|
|
|
|
// begin to push InPredicate value into ColumnValueRange
|
|
InState* state = reinterpret_cast<InState*>(
|
|
expr_ctx->fn_context(pred->fn_context_index())
|
|
->get_function_state(FunctionContext::FRAGMENT_LOCAL));
|
|
HybridSetBase::IteratorBase* iter = state->hybrid_set->begin();
|
|
auto fn_name = std::string("");
|
|
while (iter->has_next()) {
|
|
// column not in (nullptr) is always true
|
|
if (nullptr == iter->get_value()) {
|
|
continue;
|
|
}
|
|
auto value = const_cast<void*>(iter->get_value());
|
|
if (is_fixed_range) {
|
|
RETURN_IF_ERROR(change_value_range<true>(
|
|
range, value, ColumnValueRange<T>::remove_fixed_value_range, fn_name));
|
|
} else {
|
|
RETURN_IF_ERROR(change_value_range<true>(
|
|
not_in_range, value, ColumnValueRange<T>::add_fixed_value_range, fn_name));
|
|
}
|
|
iter->next();
|
|
}
|
|
effect = true;
|
|
} else if (TExprNodeType::BINARY_PRED == expr->node_type()) {
|
|
DCHECK(expr->children().size() == 2);
|
|
|
|
auto ne_checker = [](const std::string& fn_name) { return fn_name == "ne"; };
|
|
StringRef value;
|
|
int slot_ref_child = -1;
|
|
if (_should_push_down_binary_predicate(reinterpret_cast<VectorizedFnCall*>(expr), expr_ctx,
|
|
&value, &slot_ref_child, ne_checker)) {
|
|
DCHECK(slot_ref_child >= 0);
|
|
// where A = nullptr should return empty result set
|
|
if (value.data != nullptr) {
|
|
auto fn_name = std::string("");
|
|
if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T == TYPE_STRING ||
|
|
T == TYPE_HLL) {
|
|
auto val = StringValue(value.data, value.size);
|
|
if (is_fixed_range) {
|
|
RETURN_IF_ERROR(change_value_range<true>(
|
|
range, reinterpret_cast<void*>(&val),
|
|
ColumnValueRange<T>::remove_fixed_value_range, fn_name));
|
|
} else {
|
|
RETURN_IF_ERROR(change_value_range<true>(
|
|
not_in_range, reinterpret_cast<void*>(&val),
|
|
ColumnValueRange<T>::add_fixed_value_range, fn_name));
|
|
}
|
|
} else {
|
|
if (is_fixed_range) {
|
|
RETURN_IF_ERROR(change_value_range<true>(
|
|
range, reinterpret_cast<void*>(const_cast<char*>(value.data)),
|
|
ColumnValueRange<T>::remove_fixed_value_range, fn_name));
|
|
} else {
|
|
RETURN_IF_ERROR(change_value_range<true>(
|
|
not_in_range,
|
|
reinterpret_cast<void*>(const_cast<char*>(value.data)),
|
|
ColumnValueRange<T>::add_fixed_value_range, fn_name));
|
|
}
|
|
}
|
|
effect = true;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (is_fixed_range ||
|
|
not_in_range.get_fixed_value_size() <= _max_pushdown_conditions_per_column) {
|
|
if (!is_fixed_range) {
|
|
// push down not in condition to storage engine
|
|
not_in_range.to_in_condition(_olap_filter, false);
|
|
}
|
|
*push_down = effect;
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
template <PrimitiveType T>
|
|
Status VOlapScanNode::_normalize_is_null_predicate(VExpr* expr, VExprContext* expr_ctx,
|
|
SlotDescriptor* slot, ColumnValueRange<T>& range,
|
|
bool* push_down) {
|
|
if (TExprNodeType::FUNCTION_CALL == expr->node_type()) {
|
|
if (reinterpret_cast<VectorizedFnCall*>(expr)->fn().name.function_name == "is_null_pred") {
|
|
auto temp_range = ColumnValueRange<T>::create_empty_column_value_range(
|
|
slot->type().precision, slot->type().scale);
|
|
temp_range.set_contain_null(true);
|
|
range.intersection(temp_range);
|
|
*push_down = true;
|
|
} else if (reinterpret_cast<VectorizedFnCall*>(expr)->fn().name.function_name ==
|
|
"is_not_null_pred") {
|
|
auto temp_range = ColumnValueRange<T>::create_empty_column_value_range(
|
|
slot->type().precision, slot->type().scale);
|
|
temp_range.set_contain_null(false);
|
|
range.intersection(temp_range);
|
|
*push_down = true;
|
|
}
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
template <PrimitiveType T>
|
|
Status VOlapScanNode::_normalize_noneq_binary_predicate(VExpr* expr, VExprContext* expr_ctx,
|
|
SlotDescriptor* slot,
|
|
ColumnValueRange<T>& range,
|
|
bool* push_down) {
|
|
if (TExprNodeType::BINARY_PRED == expr->node_type()) {
|
|
DCHECK(expr->children().size() == 2);
|
|
|
|
auto noneq_checker = [](const std::string& fn_name) {
|
|
return fn_name != "ne" && fn_name != "eq";
|
|
};
|
|
StringRef value;
|
|
int slot_ref_child = -1;
|
|
if (_should_push_down_binary_predicate(reinterpret_cast<VectorizedFnCall*>(expr), expr_ctx,
|
|
&value, &slot_ref_child, noneq_checker)) {
|
|
DCHECK(slot_ref_child >= 0);
|
|
const std::string& fn_name =
|
|
reinterpret_cast<VectorizedFnCall*>(expr)->fn().name.function_name;
|
|
|
|
// where A = nullptr should return empty result set
|
|
if (value.data != nullptr) {
|
|
*push_down = true;
|
|
if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T == TYPE_STRING ||
|
|
T == TYPE_HLL) {
|
|
auto val = StringValue(value.data, value.size);
|
|
RETURN_IF_ERROR(change_value_range<false>(range, reinterpret_cast<void*>(&val),
|
|
ColumnValueRange<T>::add_value_range,
|
|
fn_name, slot_ref_child));
|
|
} else {
|
|
RETURN_IF_ERROR(change_value_range<false>(
|
|
range, reinterpret_cast<void*>(const_cast<char*>(value.data)),
|
|
ColumnValueRange<T>::add_value_range, fn_name, slot_ref_child));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
Status VOlapScanNode::_normalize_bloom_filter(VExpr* expr, VExprContext* expr_ctx,
|
|
SlotDescriptor* slot, bool* push_down) {
|
|
if (TExprNodeType::BLOOM_PRED == expr->node_type()) {
|
|
DCHECK(expr->children().size() == 1);
|
|
_bloom_filters_push_down.emplace_back(slot->col_name(), expr->get_bloom_filter_func());
|
|
*push_down = true;
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
Status VOlapScanNode::_normalize_function_filters(VExpr* expr, VExprContext* expr_ctx,
|
|
SlotDescriptor* slot, bool* push_down) {
|
|
if (!config::enable_function_pushdown) {
|
|
return Status::OK();
|
|
}
|
|
bool opposite = false;
|
|
VExpr* fn_expr = expr;
|
|
if (TExprNodeType::COMPOUND_PRED == expr->node_type() &&
|
|
expr->fn().name.function_name == "not") {
|
|
fn_expr = fn_expr->children()[0];
|
|
opposite = true;
|
|
}
|
|
|
|
if (TExprNodeType::FUNCTION_CALL == fn_expr->node_type()) {
|
|
doris_udf::FunctionContext* fn_ctx = nullptr;
|
|
std::string str;
|
|
if (_should_push_down_function_filter(reinterpret_cast<VectorizedFnCall*>(fn_expr),
|
|
expr_ctx, &str, &fn_ctx)) {
|
|
std::string col = slot->col_name();
|
|
StringVal val(reinterpret_cast<uint8_t*>(str.data()), str.size());
|
|
_push_down_functions.emplace_back(opposite, col, fn_ctx, val);
|
|
*push_down = true;
|
|
}
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
void VOlapScanNode::eval_const_conjuncts(VExpr* vexpr, VExprContext* expr_ctx, bool* push_down) {
|
|
char* constant_val = nullptr;
|
|
if (vexpr->is_constant()) {
|
|
if (const ColumnConst* const_column =
|
|
check_and_get_column<ColumnConst>(vexpr->get_const_col(expr_ctx)->column_ptr)) {
|
|
constant_val = const_cast<char*>(const_column->get_data_at(0).data);
|
|
if (constant_val == nullptr || *reinterpret_cast<bool*>(constant_val) == false) {
|
|
*push_down = true;
|
|
_eos = true;
|
|
}
|
|
} else {
|
|
LOG(WARNING) << "Expr[" << vexpr->debug_string()
|
|
<< "] is a constant but doesn't contain a const column!";
|
|
}
|
|
}
|
|
}
|
|
|
|
VExpr* VOlapScanNode::_normalize_predicate(RuntimeState* state, VExpr* conjunct_expr_root) {
|
|
static constexpr auto is_leaf = [](VExpr* expr) { return !expr->is_and_expr(); };
|
|
auto in_predicate_checker = [](const std::vector<VExpr*>& children, const VSlotRef** slot,
|
|
VExpr** child_contains_slot) {
|
|
if (children.empty() ||
|
|
VExpr::expr_without_cast(children[0])->node_type() != TExprNodeType::SLOT_REF) {
|
|
// not a slot ref(column)
|
|
return false;
|
|
}
|
|
*slot = reinterpret_cast<const VSlotRef*>(VExpr::expr_without_cast(children[0]));
|
|
*child_contains_slot = children[0];
|
|
return true;
|
|
};
|
|
auto eq_predicate_checker = [](const std::vector<VExpr*>& children, const VSlotRef** slot,
|
|
VExpr** child_contains_slot) {
|
|
for (const VExpr* child : children) {
|
|
if (VExpr::expr_without_cast(child)->node_type() != TExprNodeType::SLOT_REF) {
|
|
// not a slot ref(column)
|
|
continue;
|
|
}
|
|
*slot = reinterpret_cast<const VSlotRef*>(VExpr::expr_without_cast(child));
|
|
*child_contains_slot = const_cast<VExpr*>(child);
|
|
return true;
|
|
}
|
|
return false;
|
|
};
|
|
|
|
if (conjunct_expr_root != nullptr) {
|
|
if (is_leaf(conjunct_expr_root)) {
|
|
auto impl = conjunct_expr_root->get_impl();
|
|
VExpr* cur_expr = impl ? const_cast<VExpr*>(impl) : conjunct_expr_root;
|
|
SlotDescriptor* slot;
|
|
ColumnValueRangeType* range = nullptr;
|
|
bool push_down = false;
|
|
eval_const_conjuncts(cur_expr, *(_vconjunct_ctx_ptr.get()), &push_down);
|
|
if (!push_down &&
|
|
(_is_predicate_acting_on_slot(cur_expr, in_predicate_checker, &slot, &range) ||
|
|
_is_predicate_acting_on_slot(cur_expr, eq_predicate_checker, &slot, &range))) {
|
|
std::visit(
|
|
[&](auto& value_range) {
|
|
RETURN_IF_PUSH_DOWN(_normalize_in_and_eq_predicate(
|
|
cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range,
|
|
&push_down));
|
|
RETURN_IF_PUSH_DOWN(_normalize_not_in_and_not_eq_predicate(
|
|
cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range,
|
|
&push_down));
|
|
RETURN_IF_PUSH_DOWN(_normalize_is_null_predicate(
|
|
cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range,
|
|
&push_down));
|
|
RETURN_IF_PUSH_DOWN(_normalize_noneq_binary_predicate(
|
|
cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range,
|
|
&push_down));
|
|
if (is_key_column(slot->col_name())) {
|
|
RETURN_IF_PUSH_DOWN(_normalize_bloom_filter(
|
|
cur_expr, *(_vconjunct_ctx_ptr.get()), slot, &push_down));
|
|
RETURN_IF_PUSH_DOWN(_normalize_function_filters(
|
|
cur_expr, *(_vconjunct_ctx_ptr.get()), slot, &push_down));
|
|
}
|
|
},
|
|
*range);
|
|
}
|
|
if (push_down && is_key_column(slot->col_name())) {
|
|
return nullptr;
|
|
} else {
|
|
return conjunct_expr_root;
|
|
}
|
|
} else {
|
|
VExpr* left_child = _normalize_predicate(state, conjunct_expr_root->children()[0]);
|
|
VExpr* right_child = _normalize_predicate(state, conjunct_expr_root->children()[1]);
|
|
|
|
if (left_child != nullptr && right_child != nullptr) {
|
|
conjunct_expr_root->set_children({left_child, right_child});
|
|
return conjunct_expr_root;
|
|
} else {
|
|
// here only close the and expr self, do not close the child
|
|
conjunct_expr_root->set_children({});
|
|
conjunct_expr_root->close(state, *_vconjunct_ctx_ptr,
|
|
(*_vconjunct_ctx_ptr)->get_function_state_scope());
|
|
}
|
|
|
|
// here do not close Expr* now
|
|
return left_child != nullptr ? left_child : right_child;
|
|
}
|
|
}
|
|
return conjunct_expr_root;
|
|
}
|
|
|
|
Status VOlapScanNode::_append_rf_into_conjuncts(RuntimeState* state, std::vector<VExpr*>& vexprs) {
|
|
if (!vexprs.empty()) {
|
|
auto last_expr = _vconjunct_ctx_ptr ? (*_vconjunct_ctx_ptr)->root() : vexprs[0];
|
|
for (size_t j = _vconjunct_ctx_ptr ? 0 : 1; j < vexprs.size(); j++) {
|
|
if (_rf_vexpr_set.find(vexprs[j]) != _rf_vexpr_set.end()) {
|
|
continue;
|
|
}
|
|
TFunction fn;
|
|
TFunctionName fn_name;
|
|
fn_name.__set_db_name("");
|
|
fn_name.__set_function_name("and");
|
|
fn.__set_name(fn_name);
|
|
fn.__set_binary_type(TFunctionBinaryType::BUILTIN);
|
|
std::vector<TTypeDesc> arg_types;
|
|
arg_types.push_back(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
|
|
arg_types.push_back(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
|
|
fn.__set_arg_types(arg_types);
|
|
fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
|
|
fn.__set_has_var_args(false);
|
|
|
|
TExprNode texpr_node;
|
|
texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
|
|
texpr_node.__set_node_type(TExprNodeType::COMPOUND_PRED);
|
|
texpr_node.__set_opcode(TExprOpcode::COMPOUND_AND);
|
|
texpr_node.__set_fn(fn);
|
|
texpr_node.__set_is_nullable(last_expr->is_nullable() || vexprs[j]->is_nullable());
|
|
VExpr* new_node = _pool->add(new VcompoundPred(texpr_node));
|
|
new_node->add_child(last_expr);
|
|
DCHECK((vexprs[j])->get_impl() != nullptr);
|
|
new_node->add_child(vexprs[j]);
|
|
last_expr = new_node;
|
|
_rf_vexpr_set.insert(vexprs[j]);
|
|
}
|
|
auto new_vconjunct_ctx_ptr = _pool->add(new VExprContext(last_expr));
|
|
if (_vconjunct_ctx_ptr) {
|
|
(*_vconjunct_ctx_ptr)->clone_fn_contexts(new_vconjunct_ctx_ptr);
|
|
}
|
|
RETURN_IF_ERROR(new_vconjunct_ctx_ptr->prepare(state, row_desc()));
|
|
RETURN_IF_ERROR(new_vconjunct_ctx_ptr->open(state));
|
|
if (_vconjunct_ctx_ptr) {
|
|
(*(_vconjunct_ctx_ptr.get()))->mark_as_stale();
|
|
_stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr));
|
|
}
|
|
_vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*);
|
|
*(_vconjunct_ctx_ptr.get()) = new_vconjunct_ctx_ptr;
|
|
}
|
|
return Status::OK();
|
|
}
|
|
} // namespace doris::vectorized
|