1. Fix a bug that query large column table may cause infinite loop 2. Optimize the query logic with limit, for the case where the limit value is relatively small, reduce the parallelism of the scanner, reduce unnecessary resource consumption, and increase the number of similar queries that the system can carry at the same time, and increase the query speed by more than 60%
2023 lines
82 KiB
C++
2023 lines
82 KiB
C++
// Licensed to the Apache Software Foundation (ASF) under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing,
|
|
// software distributed under the License is distributed on an
|
|
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
// KIND, either express or implied. See the License for the
|
|
// specific language governing permissions and limitations
|
|
// under the License.
|
|
|
|
#include "vec/exec/volap_scan_node.h"
|
|
|
|
#include "common/resource_tls.h"
|
|
#include "exec/scan_node.h"
|
|
#include "gen_cpp/PlanNodes_types.h"
|
|
#include "olap/storage_engine.h"
|
|
#include "runtime/descriptors.h"
|
|
#include "runtime/exec_env.h"
|
|
#include "runtime/large_int_value.h"
|
|
#include "runtime/runtime_filter_mgr.h"
|
|
#include "util/priority_thread_pool.hpp"
|
|
#include "util/to_string.h"
|
|
#include "vec/core/block.h"
|
|
#include "vec/data_types/data_type_decimal.h"
|
|
#include "vec/exec/volap_scanner.h"
|
|
#include "vec/exprs/vcompound_pred.h"
|
|
#include "vec/exprs/vexpr.h"
|
|
|
|
namespace doris::vectorized {
|
|
using doris::operator<<;
|
|
|
|
VOlapScanNode::VOlapScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
|
|
: ScanNode(pool, tnode, descs),
|
|
_tuple_id(tnode.olap_scan_node.tuple_id),
|
|
_olap_scan_node(tnode.olap_scan_node),
|
|
_tuple_desc(nullptr),
|
|
_tuple_idx(0),
|
|
_eos(false),
|
|
_max_materialized_row_batches(config::doris_scanner_queue_size),
|
|
_start(false),
|
|
_scanner_done(false),
|
|
_transfer_done(false),
|
|
_status(Status::OK()),
|
|
_resource_info(nullptr),
|
|
_buffered_bytes(0),
|
|
_eval_conjuncts_fn(nullptr),
|
|
_runtime_filter_descs(tnode.runtime_filters),
|
|
_max_materialized_blocks(config::doris_scanner_queue_size) {
|
|
_materialized_blocks.reserve(_max_materialized_blocks);
|
|
_free_blocks.reserve(_max_materialized_blocks);
|
|
}
|
|
|
|
Status VOlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
|
|
RETURN_IF_ERROR(ExecNode::init(tnode, state));
|
|
_direct_conjunct_size = _conjunct_ctxs.size();
|
|
|
|
const TQueryOptions& query_options = state->query_options();
|
|
if (query_options.__isset.max_scan_key_num) {
|
|
_max_scan_key_num = query_options.max_scan_key_num;
|
|
} else {
|
|
_max_scan_key_num = config::doris_max_scan_key_num;
|
|
}
|
|
|
|
if (query_options.__isset.max_pushdown_conditions_per_column) {
|
|
_max_pushdown_conditions_per_column = query_options.max_pushdown_conditions_per_column;
|
|
} else {
|
|
_max_pushdown_conditions_per_column = config::max_pushdown_conditions_per_column;
|
|
}
|
|
|
|
_max_scanner_queue_size_bytes = query_options.mem_limit / 20; //TODO: session variable percent
|
|
|
|
/// TODO: could one filter used in the different scan_node ?
|
|
int filter_size = _runtime_filter_descs.size();
|
|
_runtime_filter_ctxs.resize(filter_size);
|
|
_runtime_filter_ready_flag.resize(filter_size);
|
|
for (int i = 0; i < filter_size; ++i) {
|
|
IRuntimeFilter* runtime_filter = nullptr;
|
|
const auto& filter_desc = _runtime_filter_descs[i];
|
|
RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(
|
|
RuntimeFilterRole::CONSUMER, filter_desc, state->query_options(), id()));
|
|
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
|
|
&runtime_filter));
|
|
|
|
_runtime_filter_ctxs[i].runtimefilter = runtime_filter;
|
|
_runtime_filter_ready_flag[i] = false;
|
|
_rf_locks.push_back(std::make_unique<std::mutex>());
|
|
}
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
void VOlapScanNode::init_scan_profile() {
|
|
std::string scanner_profile_name = "VOlapScanner";
|
|
if (_olap_scan_node.__isset.table_name) {
|
|
scanner_profile_name = fmt::format("VOlapScanner({0})", _olap_scan_node.table_name);
|
|
}
|
|
_scanner_profile.reset(new RuntimeProfile(scanner_profile_name));
|
|
runtime_profile()->add_child(_scanner_profile.get(), true, nullptr);
|
|
|
|
_segment_profile.reset(new RuntimeProfile("SegmentIterator"));
|
|
_scanner_profile->add_child(_segment_profile.get(), true, nullptr);
|
|
}
|
|
|
|
void VOlapScanNode::_init_counter(RuntimeState* state) {
|
|
ADD_TIMER(_scanner_profile, "ShowHintsTime_V1");
|
|
|
|
_reader_init_timer = ADD_TIMER(_scanner_profile, "ReaderInitTime");
|
|
_read_compressed_counter = ADD_COUNTER(_segment_profile, "CompressedBytesRead", TUnit::BYTES);
|
|
_read_uncompressed_counter =
|
|
ADD_COUNTER(_segment_profile, "UncompressedBytesRead", TUnit::BYTES);
|
|
_block_load_timer = ADD_TIMER(_segment_profile, "BlockLoadTime");
|
|
_block_load_counter = ADD_COUNTER(_segment_profile, "BlocksLoad", TUnit::UNIT);
|
|
_block_fetch_timer = ADD_TIMER(_scanner_profile, "BlockFetchTime");
|
|
_raw_rows_counter = ADD_COUNTER(_segment_profile, "RawRowsRead", TUnit::UNIT);
|
|
_block_convert_timer = ADD_TIMER(_scanner_profile, "BlockConvertTime");
|
|
// Will be delete after non-vectorized code is removed
|
|
_block_seek_timer = ADD_TIMER(_segment_profile, "BlockSeekTime");
|
|
_block_seek_counter = ADD_COUNTER(_segment_profile, "BlockSeekCount", TUnit::UNIT);
|
|
_block_init_timer = ADD_TIMER(_segment_profile, "BlockInitTime");
|
|
_block_init_seek_timer = ADD_TIMER(_segment_profile, "BlockInitSeekTime");
|
|
_block_init_seek_counter = ADD_COUNTER(_segment_profile, "BlockInitSeekCount", TUnit::UNIT);
|
|
|
|
_rows_vec_cond_counter = ADD_COUNTER(_segment_profile, "RowsVectorPredFiltered", TUnit::UNIT);
|
|
_vec_cond_timer = ADD_TIMER(_segment_profile, "VectorPredEvalTime");
|
|
_short_cond_timer = ADD_TIMER(_segment_profile, "ShortPredEvalTime");
|
|
_first_read_timer = ADD_TIMER(_segment_profile, "FirstReadTime");
|
|
_first_read_seek_timer = ADD_TIMER(_segment_profile, "FirstReadSeekTime");
|
|
_first_read_seek_counter = ADD_COUNTER(_segment_profile, "FirstReadSeekCount", TUnit::UNIT);
|
|
|
|
_lazy_read_timer = ADD_TIMER(_segment_profile, "LazyReadTime");
|
|
_lazy_read_seek_timer = ADD_TIMER(_segment_profile, "LazyReadSeekTime");
|
|
_lazy_read_seek_counter = ADD_COUNTER(_segment_profile, "LazyReadSeekCount", TUnit::UNIT);
|
|
|
|
_output_col_timer = ADD_TIMER(_segment_profile, "OutputColumnTime");
|
|
|
|
_stats_filtered_counter = ADD_COUNTER(_segment_profile, "RowsStatsFiltered", TUnit::UNIT);
|
|
_bf_filtered_counter = ADD_COUNTER(_segment_profile, "RowsBloomFilterFiltered", TUnit::UNIT);
|
|
_del_filtered_counter = ADD_COUNTER(_scanner_profile, "RowsDelFiltered", TUnit::UNIT);
|
|
_conditions_filtered_counter =
|
|
ADD_COUNTER(_segment_profile, "RowsConditionsFiltered", TUnit::UNIT);
|
|
_key_range_filtered_counter =
|
|
ADD_COUNTER(_segment_profile, "RowsKeyRangeFiltered", TUnit::UNIT);
|
|
|
|
_io_timer = ADD_TIMER(_segment_profile, "IOTimer");
|
|
_decompressor_timer = ADD_TIMER(_segment_profile, "DecompressorTimer");
|
|
_index_load_timer = ADD_TIMER(_segment_profile, "IndexLoadTime_V1");
|
|
|
|
_scan_timer = ADD_TIMER(_scanner_profile, "ScanTime");
|
|
_scan_cpu_timer = ADD_TIMER(_scanner_profile, "ScanCpuTime");
|
|
|
|
_total_pages_num_counter = ADD_COUNTER(_segment_profile, "TotalPagesNum", TUnit::UNIT);
|
|
_cached_pages_num_counter = ADD_COUNTER(_segment_profile, "CachedPagesNum", TUnit::UNIT);
|
|
|
|
_bitmap_index_filter_counter =
|
|
ADD_COUNTER(_segment_profile, "RowsBitmapIndexFiltered", TUnit::UNIT);
|
|
_bitmap_index_filter_timer = ADD_TIMER(_segment_profile, "BitmapIndexFilterTimer");
|
|
|
|
_num_scanners = ADD_COUNTER(_runtime_profile, "NumScanners", TUnit::UNIT);
|
|
|
|
_filtered_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentFiltered", TUnit::UNIT);
|
|
_total_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentTotal", TUnit::UNIT);
|
|
|
|
// time of transfer thread to wait for row batch from scan thread
|
|
_scanner_wait_batch_timer = ADD_TIMER(_runtime_profile, "ScannerBatchWaitTime");
|
|
// time of scan thread to wait for worker thread of the thread pool
|
|
_scanner_wait_worker_timer = ADD_TIMER(_runtime_profile, "ScannerWorkerWaitTime");
|
|
|
|
// time of node to wait for batch/block queue
|
|
_olap_wait_batch_queue_timer = ADD_TIMER(_runtime_profile, "BatchQueueWaitTime");
|
|
|
|
// for the purpose of debugging or profiling
|
|
for (int i = 0; i < GENERAL_DEBUG_COUNT; ++i) {
|
|
char name[64];
|
|
snprintf(name, sizeof(name), "GeneralDebugTimer%d", i);
|
|
_general_debug_timer[i] = ADD_TIMER(_segment_profile, name);
|
|
}
|
|
}
|
|
|
|
Status VOlapScanNode::prepare(RuntimeState* state) {
|
|
init_scan_profile();
|
|
RETURN_IF_ERROR(ScanNode::prepare(state));
|
|
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
|
|
// create scanner profile
|
|
// create timer
|
|
_tablet_counter = ADD_COUNTER(runtime_profile(), "TabletCount ", TUnit::UNIT);
|
|
_scanner_sched_counter = ADD_COUNTER(runtime_profile(), "ScannerSchedCount ", TUnit::UNIT);
|
|
|
|
_rows_pushed_cond_filtered_counter =
|
|
ADD_COUNTER(_scanner_profile, "RowsPushedCondFiltered", TUnit::UNIT);
|
|
_init_counter(state);
|
|
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
|
|
|
|
_scanner_mem_tracker = std::make_unique<MemTracker>("OlapScanners");
|
|
|
|
if (_tuple_desc == nullptr) {
|
|
// TODO: make sure we print all available diagnostic output to our error log
|
|
return Status::InternalError("Failed to get tuple descriptor.");
|
|
}
|
|
|
|
const std::vector<SlotDescriptor*>& slots = _tuple_desc->slots();
|
|
|
|
for (int i = 0; i < slots.size(); ++i) {
|
|
if (!slots[i]->is_materialized()) {
|
|
continue;
|
|
}
|
|
|
|
if (slots[i]->type().is_collection_type()) {
|
|
_collection_slots.push_back(slots[i]);
|
|
}
|
|
|
|
if (slots[i]->type().is_string_type()) {
|
|
_string_slots.push_back(slots[i]);
|
|
}
|
|
}
|
|
|
|
_runtime_state = state;
|
|
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
|
|
IRuntimeFilter* runtime_filter = nullptr;
|
|
state->runtime_filter_mgr()->get_consume_filter(_runtime_filter_descs[i].filter_id,
|
|
&runtime_filter);
|
|
DCHECK(runtime_filter != nullptr);
|
|
runtime_filter->init_profile(_runtime_profile.get());
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
Status VOlapScanNode::open(RuntimeState* state) {
|
|
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VOlapScanNode::open");
|
|
VLOG_CRITICAL << "VOlapScanNode::Open";
|
|
SCOPED_TIMER(_runtime_profile->total_time_counter());
|
|
RETURN_IF_CANCELLED(state);
|
|
RETURN_IF_ERROR(ExecNode::open(state));
|
|
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
|
|
|
|
_resource_info = ResourceTls::get_resource_tls();
|
|
|
|
// acquire runtime filter
|
|
_runtime_filter_ctxs.resize(_runtime_filter_descs.size());
|
|
|
|
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
|
|
auto& filter_desc = _runtime_filter_descs[i];
|
|
IRuntimeFilter* runtime_filter = nullptr;
|
|
state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter);
|
|
DCHECK(runtime_filter != nullptr);
|
|
if (runtime_filter == nullptr) {
|
|
continue;
|
|
}
|
|
bool ready = runtime_filter->is_ready();
|
|
if (!ready) {
|
|
ready = runtime_filter->await();
|
|
}
|
|
if (ready) {
|
|
std::list<ExprContext*> expr_context;
|
|
RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&expr_context));
|
|
_runtime_filter_ctxs[i].apply_mark = true;
|
|
_runtime_filter_ctxs[i].runtimefilter = runtime_filter;
|
|
|
|
for (auto ctx : expr_context) {
|
|
ctx->prepare(state, row_desc());
|
|
ctx->open(state);
|
|
int index = _conjunct_ctxs.size();
|
|
_conjunct_ctxs.push_back(ctx);
|
|
// it's safe to store address from a fix-resized vector
|
|
_conjunctid_to_runtime_filter_ctxs[index] = &_runtime_filter_ctxs[i];
|
|
}
|
|
}
|
|
}
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
void VOlapScanNode::transfer_thread(RuntimeState* state) {
|
|
// scanner open pushdown to scanThread
|
|
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VOlapScanNode::transfer_thread");
|
|
SCOPED_ATTACH_TASK(state);
|
|
Status status = Status::OK();
|
|
|
|
if (_vconjunct_ctx_ptr) {
|
|
for (auto scanner : _volap_scanners) {
|
|
status = (*_vconjunct_ctx_ptr)->clone(state, scanner->vconjunct_ctx_ptr());
|
|
if (!status.ok()) {
|
|
std::lock_guard<SpinLock> guard(_status_mutex);
|
|
_status = status;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
/*********************************
|
|
* 优先级调度基本策略:
|
|
* 1. 通过查询拆分的Range个数来确定初始nice值
|
|
* Range个数越多,越倾向于认定为大查询,nice值越小
|
|
* 2. 通过查询累计读取的数据量来调整nice值
|
|
* 读取的数据越多,越倾向于认定为大查询,nice值越小
|
|
* 3. 通过nice值来判断查询的优先级
|
|
* nice值越大的,越优先获得的查询资源
|
|
* 4. 定期提高队列内残留任务的优先级,避免大查询完全饿死
|
|
*********************************/
|
|
_total_assign_num = 0;
|
|
_nice = 18 + std::max(0, 2 - (int)_volap_scanners.size() / 5);
|
|
|
|
auto doris_scanner_row_num =
|
|
_limit == -1 ? config::doris_scanner_row_num
|
|
: std::min(static_cast<int64_t>(config::doris_scanner_row_num), _limit);
|
|
_block_size = _limit == -1 ? state->batch_size()
|
|
: std::min(static_cast<int64_t>(state->batch_size()), _limit);
|
|
auto block_per_scanner = (doris_scanner_row_num + (_block_size - 1)) / _block_size;
|
|
auto pre_block_count =
|
|
std::min(_volap_scanners.size(),
|
|
static_cast<size_t>(config::doris_scanner_thread_pool_thread_num)) *
|
|
block_per_scanner;
|
|
|
|
for (int i = 0; i < pre_block_count; ++i) {
|
|
auto block = new Block(_tuple_desc->slots(), _block_size);
|
|
_free_blocks.emplace_back(block);
|
|
_buffered_bytes += block->allocated_bytes();
|
|
}
|
|
|
|
// read from scanner
|
|
while (LIKELY(status.ok())) {
|
|
int assigned_thread_num = _start_scanner_thread_task(state, block_per_scanner);
|
|
|
|
std::vector<Block*> blocks;
|
|
{
|
|
// 1 scanner idle task not empty, assign new scanner task
|
|
std::unique_lock<std::mutex> l(_scan_blocks_lock);
|
|
|
|
// scanner_row_num = 16k
|
|
// 16k * 10 * 12 * 8 = 15M(>2s) --> nice=10
|
|
// 16k * 20 * 22 * 8 = 55M(>6s) --> nice=0
|
|
while (_nice > 0 && _total_assign_num > (22 - _nice) * (20 - _nice) * 6) {
|
|
--_nice;
|
|
}
|
|
|
|
// 2 wait when all scanner are running & no result in queue
|
|
while (UNLIKELY(_running_thread == assigned_thread_num && _scan_blocks.empty() &&
|
|
!_scanner_done)) {
|
|
SCOPED_TIMER(_scanner_wait_batch_timer);
|
|
_scan_block_added_cv.wait(l);
|
|
}
|
|
|
|
// 3 transfer result block when queue is not empty
|
|
if (LIKELY(!_scan_blocks.empty())) {
|
|
blocks.swap(_scan_blocks);
|
|
for (auto b : blocks) {
|
|
_scan_row_batches_bytes -= b->allocated_bytes();
|
|
}
|
|
// delete scan_block if transfer thread should be stopped
|
|
// because scan_block wouldn't be useful anymore
|
|
if (UNLIKELY(_transfer_done)) {
|
|
std::for_each(blocks.begin(), blocks.end(), std::default_delete<Block>());
|
|
blocks.clear();
|
|
}
|
|
} else {
|
|
if (_scanner_done) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!blocks.empty()) {
|
|
_add_blocks(blocks);
|
|
}
|
|
}
|
|
telemetry::set_span_attribute(span, _scanner_sched_counter);
|
|
|
|
VLOG_CRITICAL << "TransferThread finish.";
|
|
_transfer_done = true;
|
|
_block_added_cv.notify_all();
|
|
{
|
|
std::unique_lock<std::mutex> l(_scan_blocks_lock);
|
|
_scan_thread_exit_cv.wait(l, [this] { return _running_thread == 0; });
|
|
}
|
|
VLOG_CRITICAL << "Scanner threads have been exited. TransferThread exit.";
|
|
}
|
|
|
|
void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
|
|
START_AND_SCOPE_SPAN(scanner->runtime_state()->get_tracer(), span,
|
|
"VOlapScanNode::scanner_thread");
|
|
SCOPED_ATTACH_TASK(_runtime_state);
|
|
Thread::set_self_name("volap_scanner");
|
|
int64_t wait_time = scanner->update_wait_worker_timer();
|
|
// Do not use ScopedTimer. There is no guarantee that, the counter
|
|
// (_scan_cpu_timer, the class member) is not destroyed after `_running_thread==0`.
|
|
ThreadCpuStopWatch cpu_watch;
|
|
cpu_watch.start();
|
|
Status status = Status::OK();
|
|
bool eos = false;
|
|
RuntimeState* state = scanner->runtime_state();
|
|
DCHECK(nullptr != state);
|
|
if (!scanner->is_open()) {
|
|
status = scanner->open();
|
|
if (!status.ok()) {
|
|
std::lock_guard<SpinLock> guard(_status_mutex);
|
|
_status = status;
|
|
eos = true;
|
|
}
|
|
scanner->set_opened();
|
|
}
|
|
|
|
/*
|
|
// the following code will cause coredump when running tpcds_sf1 sqls,
|
|
// disable temporariy to avoid it, SHOULD BE FIX LATER
|
|
std::vector<VExpr*> vexprs;
|
|
auto& scanner_filter_apply_marks = *scanner->mutable_runtime_filter_marks();
|
|
DCHECK(scanner_filter_apply_marks.size() == _runtime_filter_descs.size());
|
|
for (size_t i = 0; i < scanner_filter_apply_marks.size(); i++) {
|
|
if (!scanner_filter_apply_marks[i] && !_runtime_filter_ctxs[i].apply_mark) {
|
|
/// When runtime filters are ready during running, we should use them to filter data
|
|
/// in VOlapScanner.
|
|
/// New arrival rf will be processed as below:
|
|
/// 1. convert these runtime filters to vectorized expressions
|
|
/// 2. if this is the first scanner thread to receive this rf, construct a new
|
|
/// VExprContext and update `_vconjunct_ctx_ptr` in scan node. Notice that we use
|
|
/// `_runtime_filter_ready_flag` to ensure `_vconjunct_ctx_ptr` will be updated only
|
|
/// once after any runtime_filters are ready.
|
|
/// 3. finally, just copy this new VExprContext to scanner and use it to filter data.
|
|
IRuntimeFilter* runtime_filter = nullptr;
|
|
state->runtime_filter_mgr()->get_consume_filter(_runtime_filter_descs[i].filter_id,
|
|
&runtime_filter);
|
|
DCHECK(runtime_filter != nullptr);
|
|
bool ready = runtime_filter->is_ready();
|
|
if (ready) {
|
|
runtime_filter->get_prepared_vexprs(&vexprs, row_desc());
|
|
scanner_filter_apply_marks[i] = true;
|
|
if (!_runtime_filter_ready_flag[i] && !vexprs.empty()) {
|
|
std::unique_lock<std::mutex> l(*(_rf_locks[i]));
|
|
if (!_runtime_filter_ready_flag[i]) {
|
|
// Use all conjuncts and new arrival runtime filters to construct a new
|
|
// expression tree here.
|
|
auto last_expr =
|
|
_vconjunct_ctx_ptr ? (*_vconjunct_ctx_ptr)->root() : vexprs[0];
|
|
for (size_t j = _vconjunct_ctx_ptr ? 0 : 1; j < vexprs.size(); j++) {
|
|
if (_rf_vexpr_set.find(vexprs[j]) != _rf_vexpr_set.end()) {
|
|
continue;
|
|
}
|
|
TExprNode texpr_node;
|
|
texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
|
|
texpr_node.__set_node_type(TExprNodeType::COMPOUND_PRED);
|
|
texpr_node.__set_opcode(TExprOpcode::COMPOUND_AND);
|
|
VExpr* new_node = _pool->add(new VcompoundPred(texpr_node));
|
|
new_node->add_child(last_expr);
|
|
new_node->add_child(vexprs[j]);
|
|
last_expr = new_node;
|
|
_rf_vexpr_set.insert(vexprs[j]);
|
|
}
|
|
auto new_vconjunct_ctx_ptr = _pool->add(new VExprContext(last_expr));
|
|
auto expr_status = new_vconjunct_ctx_ptr->prepare(state, row_desc());
|
|
// If error occurs in `prepare` or `open` phase, discard these runtime
|
|
// filters directly.
|
|
if (UNLIKELY(!expr_status.OK())) {
|
|
LOG(WARNING) << "Something wrong for runtime filters: " << expr_status;
|
|
vexprs.clear();
|
|
break;
|
|
}
|
|
expr_status = new_vconjunct_ctx_ptr->open(state);
|
|
if (UNLIKELY(!expr_status.OK())) {
|
|
LOG(WARNING) << "Something wrong for runtime filters: " << expr_status;
|
|
vexprs.clear();
|
|
break;
|
|
}
|
|
if (_vconjunct_ctx_ptr) {
|
|
_stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr));
|
|
}
|
|
_vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*);
|
|
*(_vconjunct_ctx_ptr.get()) = new_vconjunct_ctx_ptr;
|
|
_runtime_filter_ready_flag[i] = true;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!vexprs.empty()) {
|
|
if (*scanner->vconjunct_ctx_ptr()) {
|
|
(*scanner->vconjunct_ctx_ptr())->close(state);
|
|
*scanner->vconjunct_ctx_ptr() = nullptr;
|
|
}
|
|
WARN_IF_ERROR((*_vconjunct_ctx_ptr)->clone(state, scanner->vconjunct_ctx_ptr()),
|
|
"Something wrong for runtime filters: ");
|
|
scanner->set_use_pushdown_conjuncts(true);
|
|
}
|
|
*/
|
|
|
|
std::vector<Block*> blocks;
|
|
|
|
// Because we use thread pool to scan data from storage. One scanner can't
|
|
// use this thread too long, this can starve other query's scanner. So, we
|
|
// need yield this thread when we do enough work. However, OlapStorage read
|
|
// data in pre-aggregate mode, then we can't use storage returned data to
|
|
// judge if we need to yield. So we record all raw data read in this round
|
|
// scan, if this exceed row number or bytes threshold, we yield this thread.
|
|
int64_t raw_rows_read = scanner->raw_rows_read();
|
|
int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num;
|
|
int64_t raw_bytes_read = 0;
|
|
int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
|
|
bool get_free_block = true;
|
|
int num_rows_in_block = 0;
|
|
|
|
// Has to wait at least one full block, or it will cause a lot of schedule task in priority
|
|
// queue, it will affect query latency and query concurrency for example ssb 3.3.
|
|
while (!eos && raw_bytes_read < raw_bytes_threshold &&
|
|
((raw_rows_read < raw_rows_threshold && get_free_block) ||
|
|
num_rows_in_block < _runtime_state->batch_size())) {
|
|
if (UNLIKELY(_transfer_done)) {
|
|
eos = true;
|
|
status = Status::Cancelled("Cancelled");
|
|
LOG(INFO) << "Scan thread cancelled, cause query done, maybe reach limit.";
|
|
break;
|
|
}
|
|
|
|
auto block = _alloc_block(get_free_block);
|
|
status = scanner->get_block(_runtime_state, block, &eos);
|
|
VLOG_ROW << "VOlapScanNode input rows: " << block->rows();
|
|
if (!status.ok()) {
|
|
LOG(WARNING) << "Scan thread read VOlapScanner failed: " << status.to_string();
|
|
// Add block ptr in blocks, prevent mem leak in read failed
|
|
blocks.push_back(block);
|
|
eos = true;
|
|
break;
|
|
}
|
|
|
|
raw_bytes_read += block->bytes();
|
|
num_rows_in_block += block->rows();
|
|
// 4. if status not ok, change status_.
|
|
if (UNLIKELY(block->rows() == 0)) {
|
|
std::lock_guard<std::mutex> l(_free_blocks_lock);
|
|
_free_blocks.emplace_back(block);
|
|
} else {
|
|
if (!blocks.empty() &&
|
|
blocks.back()->rows() + block->rows() <= _runtime_state->batch_size()) {
|
|
MutableBlock(blocks.back()).merge(*block);
|
|
block->clear_column_data();
|
|
std::lock_guard<std::mutex> l(_free_blocks_lock);
|
|
_free_blocks.emplace_back(block);
|
|
} else {
|
|
blocks.push_back(block);
|
|
}
|
|
}
|
|
raw_rows_read = scanner->raw_rows_read();
|
|
}
|
|
|
|
{
|
|
// if we failed, check status.
|
|
if (UNLIKELY(!status.ok())) {
|
|
_transfer_done = true;
|
|
std::lock_guard<SpinLock> guard(_status_mutex);
|
|
if (LIKELY(_status.ok())) {
|
|
_status = status;
|
|
}
|
|
}
|
|
|
|
bool global_status_ok = false;
|
|
{
|
|
std::lock_guard<SpinLock> guard(_status_mutex);
|
|
global_status_ok = _status.ok();
|
|
}
|
|
if (UNLIKELY(!global_status_ok)) {
|
|
eos = true;
|
|
std::for_each(blocks.begin(), blocks.end(), std::default_delete<Block>());
|
|
} else {
|
|
std::lock_guard<std::mutex> l(_scan_blocks_lock);
|
|
_scan_blocks.insert(_scan_blocks.end(), blocks.begin(), blocks.end());
|
|
for (auto b : blocks) {
|
|
_scan_row_batches_bytes += b->allocated_bytes();
|
|
}
|
|
}
|
|
// If eos is true, we will process out of this lock block.
|
|
if (eos) {
|
|
scanner->mark_to_need_to_close();
|
|
}
|
|
std::lock_guard<std::mutex> l(_volap_scanners_lock);
|
|
_volap_scanners.push_front(scanner);
|
|
}
|
|
if (eos) {
|
|
std::lock_guard<std::mutex> l(_scan_blocks_lock);
|
|
_progress.update(1);
|
|
if (_progress.done()) {
|
|
// this is the right out
|
|
_scanner_done = true;
|
|
}
|
|
}
|
|
_scan_cpu_timer->update(cpu_watch.elapsed_time());
|
|
_scanner_wait_worker_timer->update(wait_time);
|
|
|
|
std::unique_lock<std::mutex> l(_scan_blocks_lock);
|
|
_running_thread--;
|
|
|
|
// The transfer thead will wait for `_running_thread==0`, to make sure all scanner threads won't access class members.
|
|
// Do not access class members after this code.
|
|
_scan_block_added_cv.notify_one();
|
|
_scan_thread_exit_cv.notify_one();
|
|
}
|
|
|
|
Status VOlapScanNode::_add_blocks(std::vector<Block*>& block) {
|
|
{
|
|
std::unique_lock<std::mutex> l(_blocks_lock);
|
|
|
|
// check queue limit for both block queue size and bytes
|
|
while (UNLIKELY((_materialized_blocks.size() >= _max_materialized_blocks ||
|
|
_materialized_row_batches_bytes >= _max_scanner_queue_size_bytes / 2) &&
|
|
!_transfer_done)) {
|
|
_block_consumed_cv.wait(l);
|
|
}
|
|
|
|
VLOG_CRITICAL << "Push block to materialized_blocks";
|
|
_materialized_blocks.insert(_materialized_blocks.end(), block.cbegin(), block.cend());
|
|
for (auto b : block) {
|
|
_materialized_row_batches_bytes += b->allocated_bytes();
|
|
}
|
|
}
|
|
// remove one block, notify main thread
|
|
_block_added_cv.notify_one();
|
|
return Status::OK();
|
|
}
|
|
|
|
void VOlapScanNode::eval_const_conjuncts() {
|
|
for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
|
|
// if conjunct is constant, compute direct and set eos = true
|
|
if (_conjunct_ctxs[conj_idx]->root()->is_constant()) {
|
|
void* value = _conjunct_ctxs[conj_idx]->get_value(nullptr);
|
|
if (value == nullptr || *reinterpret_cast<bool*>(value) == false) {
|
|
_eos = true;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
Status VOlapScanNode::normalize_conjuncts() {
|
|
std::vector<SlotDescriptor*> slots = _tuple_desc->slots();
|
|
|
|
for (int slot_idx = 0; slot_idx < slots.size(); ++slot_idx) {
|
|
switch (slots[slot_idx]->type().type) {
|
|
case TYPE_TINYINT: {
|
|
ColumnValueRange<TYPE_TINYINT> range(slots[slot_idx]->col_name());
|
|
normalize_predicate(range, slots[slot_idx]);
|
|
break;
|
|
}
|
|
|
|
case TYPE_SMALLINT: {
|
|
ColumnValueRange<TYPE_SMALLINT> range(slots[slot_idx]->col_name());
|
|
normalize_predicate(range, slots[slot_idx]);
|
|
break;
|
|
}
|
|
|
|
case TYPE_INT: {
|
|
ColumnValueRange<TYPE_INT> range(slots[slot_idx]->col_name());
|
|
normalize_predicate(range, slots[slot_idx]);
|
|
break;
|
|
}
|
|
|
|
case TYPE_BIGINT: {
|
|
ColumnValueRange<TYPE_BIGINT> range(slots[slot_idx]->col_name());
|
|
normalize_predicate(range, slots[slot_idx]);
|
|
break;
|
|
}
|
|
|
|
case TYPE_LARGEINT: {
|
|
ColumnValueRange<TYPE_LARGEINT> range(slots[slot_idx]->col_name());
|
|
normalize_predicate(range, slots[slot_idx]);
|
|
break;
|
|
}
|
|
|
|
case TYPE_CHAR: {
|
|
ColumnValueRange<TYPE_CHAR> range(slots[slot_idx]->col_name());
|
|
normalize_predicate(range, slots[slot_idx]);
|
|
break;
|
|
}
|
|
case TYPE_VARCHAR: {
|
|
ColumnValueRange<TYPE_VARCHAR> range(slots[slot_idx]->col_name());
|
|
normalize_predicate(range, slots[slot_idx]);
|
|
break;
|
|
}
|
|
case TYPE_HLL: {
|
|
ColumnValueRange<TYPE_HLL> range(slots[slot_idx]->col_name());
|
|
normalize_predicate(range, slots[slot_idx]);
|
|
break;
|
|
}
|
|
case TYPE_STRING: {
|
|
ColumnValueRange<TYPE_STRING> range(slots[slot_idx]->col_name());
|
|
normalize_predicate(range, slots[slot_idx]);
|
|
break;
|
|
}
|
|
|
|
case TYPE_DATE: {
|
|
ColumnValueRange<TYPE_DATE> range(slots[slot_idx]->col_name());
|
|
normalize_predicate(range, slots[slot_idx]);
|
|
break;
|
|
}
|
|
case TYPE_DATETIME: {
|
|
ColumnValueRange<TYPE_DATETIME> range(slots[slot_idx]->col_name());
|
|
normalize_predicate(range, slots[slot_idx]);
|
|
break;
|
|
}
|
|
|
|
case TYPE_DATEV2: {
|
|
ColumnValueRange<TYPE_DATEV2> range(slots[slot_idx]->col_name());
|
|
normalize_predicate(range, slots[slot_idx]);
|
|
break;
|
|
}
|
|
|
|
case TYPE_DATETIMEV2: {
|
|
ColumnValueRange<TYPE_DATETIMEV2> range(slots[slot_idx]->col_name(),
|
|
slots[slot_idx]->type().precision,
|
|
slots[slot_idx]->type().scale);
|
|
normalize_predicate(range, slots[slot_idx]);
|
|
break;
|
|
}
|
|
|
|
case TYPE_DECIMALV2: {
|
|
ColumnValueRange<TYPE_DECIMALV2> range(slots[slot_idx]->col_name());
|
|
normalize_predicate(range, slots[slot_idx]);
|
|
break;
|
|
}
|
|
|
|
case TYPE_DECIMAL32: {
|
|
ColumnValueRange<TYPE_DECIMAL32> range(slots[slot_idx]->col_name(),
|
|
slots[slot_idx]->type().precision,
|
|
slots[slot_idx]->type().scale);
|
|
normalize_predicate(range, slots[slot_idx]);
|
|
break;
|
|
}
|
|
|
|
case TYPE_DECIMAL64: {
|
|
ColumnValueRange<TYPE_DECIMAL64> range(slots[slot_idx]->col_name(),
|
|
slots[slot_idx]->type().precision,
|
|
slots[slot_idx]->type().scale);
|
|
normalize_predicate(range, slots[slot_idx]);
|
|
break;
|
|
}
|
|
|
|
case TYPE_DECIMAL128: {
|
|
ColumnValueRange<TYPE_DECIMAL128> range(slots[slot_idx]->col_name(),
|
|
slots[slot_idx]->type().precision,
|
|
slots[slot_idx]->type().scale);
|
|
normalize_predicate(range, slots[slot_idx]);
|
|
break;
|
|
}
|
|
|
|
case TYPE_BOOLEAN: {
|
|
ColumnValueRange<TYPE_BOOLEAN> range(slots[slot_idx]->col_name());
|
|
normalize_predicate(range, slots[slot_idx]);
|
|
break;
|
|
}
|
|
|
|
default: {
|
|
VLOG_CRITICAL << "Unsupported Normalize Slot [ColName=" << slots[slot_idx]->col_name()
|
|
<< "]";
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
static std::string olap_filter_to_string(const doris::TCondition& condition) {
|
|
auto op_name = condition.condition_op;
|
|
if (condition.condition_op == "*=") {
|
|
op_name = "IN";
|
|
} else if (condition.condition_op == "!*=") {
|
|
op_name = "NOT IN";
|
|
}
|
|
return fmt::format("{{{} {} {}}}", condition.column_name, op_name,
|
|
to_string(condition.condition_values));
|
|
}
|
|
|
|
static std::string olap_filters_to_string(const std::vector<doris::TCondition>& filters) {
|
|
std::string filters_string;
|
|
filters_string += "[";
|
|
for (auto it = filters.cbegin(); it != filters.cend(); it++) {
|
|
if (it != filters.cbegin()) {
|
|
filters_string += ",";
|
|
}
|
|
filters_string += olap_filter_to_string(*it);
|
|
}
|
|
filters_string += "]";
|
|
return filters_string;
|
|
}
|
|
|
|
Status VOlapScanNode::build_function_filters() {
|
|
for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
|
|
ExprContext* ex_ctx = _conjunct_ctxs[conj_idx];
|
|
Expr* fn_expr = ex_ctx->root();
|
|
bool opposite = false;
|
|
|
|
if (TExprNodeType::COMPOUND_PRED == fn_expr->node_type() &&
|
|
TExprOpcode::COMPOUND_NOT == fn_expr->op()) {
|
|
fn_expr = fn_expr->get_child(0);
|
|
opposite = true;
|
|
}
|
|
|
|
// currently only support like / not like
|
|
if (TExprNodeType::FUNCTION_CALL == fn_expr->node_type() &&
|
|
"like" == fn_expr->fn().name.function_name) {
|
|
doris_udf::FunctionContext* func_cxt =
|
|
ex_ctx->fn_context(fn_expr->get_fn_context_index());
|
|
|
|
if (!func_cxt) {
|
|
continue;
|
|
}
|
|
if (fn_expr->children().size() != 2) {
|
|
continue;
|
|
}
|
|
SlotRef* slot_ref = nullptr;
|
|
Expr* literal_expr = nullptr;
|
|
|
|
if (TExprNodeType::SLOT_REF == fn_expr->get_child(0)->node_type()) {
|
|
literal_expr = fn_expr->get_child(1);
|
|
slot_ref = (SlotRef*)(fn_expr->get_child(0));
|
|
} else if (TExprNodeType::SLOT_REF == fn_expr->get_child(1)->node_type()) {
|
|
literal_expr = fn_expr->get_child(0);
|
|
slot_ref = (SlotRef*)(fn_expr->get_child(1));
|
|
} else {
|
|
continue;
|
|
}
|
|
|
|
if (TExprNodeType::STRING_LITERAL != literal_expr->node_type()) continue;
|
|
|
|
const SlotDescriptor* slot_desc = nullptr;
|
|
std::vector<SlotId> slot_ids;
|
|
slot_ref->get_slot_ids(&slot_ids);
|
|
for (SlotDescriptor* slot : _tuple_desc->slots()) {
|
|
if (slot->id() == slot_ids[0]) {
|
|
slot_desc = slot;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (!slot_desc) {
|
|
continue;
|
|
}
|
|
std::string col = slot_desc->col_name();
|
|
StringVal val = literal_expr->get_string_val(ex_ctx, nullptr);
|
|
_push_down_functions.emplace_back(opposite, col, func_cxt, val);
|
|
_pushed_func_conjuncts_index.insert(conj_idx);
|
|
}
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
Status VOlapScanNode::build_key_ranges_and_filters() {
|
|
const std::vector<std::string>& column_names = _olap_scan_node.key_column_name;
|
|
const std::vector<TPrimitiveType::type>& column_types = _olap_scan_node.key_column_type;
|
|
DCHECK(column_types.size() == column_names.size());
|
|
|
|
// 1. construct scan key except last olap engine short key
|
|
_scan_keys.set_is_convertible(limit() == -1);
|
|
|
|
// we use `exact_range` to identify a key range is an exact range or not when we convert
|
|
// it to `_scan_keys`. If `exact_range` is true, we can just discard it from `_olap_filter`.
|
|
bool exact_range = true;
|
|
for (int column_index = 0; column_index < column_names.size() && !_scan_keys.has_range_value();
|
|
++column_index) {
|
|
auto iter = _column_value_ranges.find(column_names[column_index]);
|
|
if (_column_value_ranges.end() == iter) {
|
|
break;
|
|
}
|
|
|
|
RETURN_IF_ERROR(std::visit(
|
|
[&](auto&& range) {
|
|
RETURN_IF_ERROR(
|
|
_scan_keys.extend_scan_key(range, _max_scan_key_num, &exact_range));
|
|
if (exact_range) {
|
|
_column_value_ranges.erase(iter->first);
|
|
}
|
|
return Status::OK();
|
|
},
|
|
iter->second));
|
|
}
|
|
|
|
for (auto& iter : _column_value_ranges) {
|
|
std::vector<TCondition> filters;
|
|
std::visit([&](auto&& range) { range.to_olap_filter(filters); }, iter.second);
|
|
|
|
for (const auto& filter : filters) {
|
|
_olap_filter.push_back(std::move(filter));
|
|
}
|
|
}
|
|
|
|
_runtime_profile->add_info_string("PushdownPredicate", olap_filters_to_string(_olap_filter));
|
|
|
|
_runtime_profile->add_info_string("KeyRanges", _scan_keys.debug_string());
|
|
|
|
VLOG_CRITICAL << _scan_keys.debug_string();
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
Status VOlapScanNode::start_scan(RuntimeState* state) {
|
|
RETURN_IF_CANCELLED(state);
|
|
|
|
VLOG_CRITICAL << "Eval Const Conjuncts";
|
|
// 1. Eval const conjuncts to find whether eos = true
|
|
eval_const_conjuncts();
|
|
|
|
VLOG_CRITICAL << "NormalizeConjuncts";
|
|
// 2. Convert conjuncts to ColumnValueRange in each column, some conjuncts may
|
|
// set eos = true
|
|
RETURN_IF_ERROR(normalize_conjuncts());
|
|
|
|
// 1 and 2 step dispose find conjuncts set eos = true, return directly
|
|
if (_eos) {
|
|
return Status::OK();
|
|
}
|
|
|
|
VLOG_CRITICAL << "BuildKeyRangesAndFilters";
|
|
// 3.1 Using `Key Column`'s ColumnValueRange to split ScanRange to several `Sub ScanRange`
|
|
RETURN_IF_ERROR(build_key_ranges_and_filters());
|
|
// 3.2 Function pushdown
|
|
if (config::enable_function_pushdown) RETURN_IF_ERROR(build_function_filters());
|
|
|
|
VLOG_CRITICAL << "Filter idle conjuncts";
|
|
// 4.1 Filter idle conjunct which already trans to olap filters
|
|
// this must be after build_scan_key, it will free the StringValue memory
|
|
remove_pushed_conjuncts(state);
|
|
|
|
VLOG_CRITICAL << "StartScanThread";
|
|
// 5. Start multi thread to read several `Sub Sub ScanRange`
|
|
RETURN_IF_ERROR(start_scan_thread(state));
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
template <PrimitiveType T>
|
|
Status VOlapScanNode::normalize_predicate(ColumnValueRange<T>& range, SlotDescriptor* slot) {
|
|
// 1. Normalize InPredicate, add to ColumnValueRange
|
|
RETURN_IF_ERROR(normalize_in_and_eq_predicate(slot, &range));
|
|
|
|
// 2. Normalize NotInPredicate, add to ColumnValueRange
|
|
RETURN_IF_ERROR(normalize_not_in_and_not_eq_predicate(slot, &range));
|
|
|
|
// 3. Normalize BinaryPredicate , add to ColumnValueRange
|
|
RETURN_IF_ERROR(normalize_noneq_binary_predicate(slot, &range));
|
|
|
|
// 3. Normalize BloomFilterPredicate, push down by hash join node
|
|
RETURN_IF_ERROR(normalize_bloom_filter_predicate(slot));
|
|
|
|
// 4. Check whether range is empty, set _eos
|
|
if (range.is_empty_value_range()) _eos = true;
|
|
|
|
// 5. Add range to Column->ColumnValueRange map
|
|
_column_value_ranges[slot->col_name()] = range;
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
static bool ignore_cast(SlotDescriptor* slot, Expr* expr) {
|
|
if ((slot->type().is_date_type() || slot->type().is_date_v2_type() ||
|
|
slot->type().is_datetime_v2_type()) &&
|
|
(expr->type().is_date_type() || expr->type().is_date_v2_type() ||
|
|
expr->type().is_datetime_v2_type())) {
|
|
return true;
|
|
}
|
|
if (slot->type().is_string_type() && expr->type().is_string_type()) {
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
bool VOlapScanNode::should_push_down_in_predicate(doris::SlotDescriptor* slot,
|
|
doris::InPredicate* pred) {
|
|
if (Expr::type_without_cast(pred->get_child(0)) != TExprNodeType::SLOT_REF) {
|
|
// not a slot ref(column)
|
|
return false;
|
|
}
|
|
|
|
std::vector<SlotId> slot_ids;
|
|
if (pred->get_child(0)->get_slot_ids(&slot_ids) != 1) {
|
|
// not a single column predicate
|
|
return false;
|
|
}
|
|
|
|
if (slot_ids[0] != slot->id()) {
|
|
// predicate not related to current column
|
|
return false;
|
|
}
|
|
|
|
if (pred->get_child(0)->type().type != slot->type().type) {
|
|
if (!ignore_cast(slot, pred->get_child(0))) {
|
|
// the type of predicate not match the slot's type
|
|
return false;
|
|
}
|
|
}
|
|
|
|
VLOG_CRITICAL << slot->col_name() << " fixed_values add num: " << pred->hybrid_set()->size();
|
|
|
|
// if there are too many elements in InPredicate, exceed the limit,
|
|
// we will not push any condition of this column to storage engine.
|
|
// because too many conditions pushed down to storage engine may even
|
|
// slow down the query process.
|
|
// ATTN: This is just an experience value. You may need to try
|
|
// different thresholds to improve performance.
|
|
if (pred->hybrid_set()->size() > _max_pushdown_conditions_per_column) {
|
|
VLOG_NOTICE << "Predicate value num " << pred->hybrid_set()->size() << " exceed limit "
|
|
<< _max_pushdown_conditions_per_column;
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
std::pair<bool, void*> VOlapScanNode::should_push_down_eq_predicate(doris::SlotDescriptor* slot,
|
|
doris::Expr* pred, int conj_idx,
|
|
int child_idx) {
|
|
auto result_pair = std::make_pair<bool, void*>(false, nullptr);
|
|
|
|
// Do not get slot_ref of column, should not push_down to Storage Engine
|
|
if (Expr::type_without_cast(pred->get_child(child_idx)) != TExprNodeType::SLOT_REF) {
|
|
return result_pair;
|
|
}
|
|
|
|
std::vector<SlotId> slot_ids;
|
|
if (pred->get_child(child_idx)->get_slot_ids(&slot_ids) != 1) {
|
|
// not a single column predicate
|
|
return result_pair;
|
|
}
|
|
|
|
if (slot_ids[0] != slot->id()) {
|
|
// predicate not related to current column
|
|
return result_pair;
|
|
}
|
|
|
|
if (pred->get_child(child_idx)->type().type != slot->type().type) {
|
|
if (!ignore_cast(slot, pred->get_child(child_idx))) {
|
|
// the type of predicate not match the slot's type
|
|
return result_pair;
|
|
}
|
|
}
|
|
|
|
Expr* expr = pred->get_child(1 - child_idx);
|
|
if (!expr->is_constant()) {
|
|
// only handle constant value
|
|
return result_pair;
|
|
}
|
|
|
|
// get value in result pair
|
|
result_pair = std::make_pair(
|
|
true, _conjunct_ctxs[conj_idx]->get_value(expr, nullptr, slot->type().precision,
|
|
slot->type().scale));
|
|
|
|
return result_pair;
|
|
}
|
|
|
|
template <PrimitiveType primitive_type, typename ChangeFixedValueRangeFunc>
|
|
Status VOlapScanNode::change_fixed_value_range(ColumnValueRange<primitive_type>& temp_range,
|
|
void* value, const ChangeFixedValueRangeFunc& func) {
|
|
switch (primitive_type) {
|
|
case TYPE_DATE: {
|
|
DateTimeValue date_value = *reinterpret_cast<DateTimeValue*>(value);
|
|
// There is must return empty data in olap_scan_node,
|
|
// Because data value loss accuracy
|
|
if (!date_value.check_loss_accuracy_cast_to_date()) {
|
|
func(temp_range,
|
|
reinterpret_cast<typename PrimitiveTypeTraits<primitive_type>::CppType*>(
|
|
&date_value));
|
|
}
|
|
break;
|
|
}
|
|
case TYPE_DECIMALV2:
|
|
case TYPE_CHAR:
|
|
case TYPE_VARCHAR:
|
|
case TYPE_HLL:
|
|
case TYPE_DATETIME:
|
|
case TYPE_DATETIMEV2:
|
|
case TYPE_TINYINT:
|
|
case TYPE_SMALLINT:
|
|
case TYPE_INT:
|
|
case TYPE_BIGINT:
|
|
case TYPE_LARGEINT:
|
|
case TYPE_DECIMAL32:
|
|
case TYPE_DECIMAL64:
|
|
case TYPE_DECIMAL128:
|
|
case TYPE_STRING: {
|
|
func(temp_range,
|
|
reinterpret_cast<typename PrimitiveTypeTraits<primitive_type>::CppType*>(value));
|
|
break;
|
|
}
|
|
case TYPE_BOOLEAN: {
|
|
bool v = *reinterpret_cast<bool*>(value);
|
|
func(temp_range,
|
|
reinterpret_cast<typename PrimitiveTypeTraits<primitive_type>::CppType*>(&v));
|
|
break;
|
|
}
|
|
case TYPE_DATEV2: {
|
|
DateV2Value<DateTimeV2ValueType> datetimev2_value =
|
|
*reinterpret_cast<DateV2Value<DateTimeV2ValueType>*>(value);
|
|
if (datetimev2_value.can_cast_to_date_without_loss_accuracy()) {
|
|
DateV2Value<DateV2ValueType> date_v2;
|
|
date_v2.set_date_uint32(
|
|
binary_cast<DateV2Value<DateTimeV2ValueType>, uint64_t>(datetimev2_value) >>
|
|
TIME_PART_LENGTH);
|
|
if constexpr (primitive_type == PrimitiveType::TYPE_DATEV2) {
|
|
func(temp_range, &date_v2);
|
|
} else {
|
|
__builtin_unreachable();
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
default: {
|
|
LOG(WARNING) << "Normalize filter fail, Unsupported Primitive type. [type="
|
|
<< primitive_type << "]";
|
|
return Status::InternalError("Normalize filter fail, Unsupported Primitive type");
|
|
}
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
bool VOlapScanNode::is_key_column(const std::string& key_name) {
|
|
// all column in dup_keys table olap scan node threat
|
|
// as key column
|
|
if (_olap_scan_node.keyType == TKeysType::DUP_KEYS) {
|
|
return true;
|
|
}
|
|
|
|
auto res = std::find(_olap_scan_node.key_column_name.begin(),
|
|
_olap_scan_node.key_column_name.end(), key_name);
|
|
return res != _olap_scan_node.key_column_name.end();
|
|
}
|
|
|
|
void VOlapScanNode::remove_pushed_conjuncts(RuntimeState* state) {
|
|
if (_pushed_conjuncts_index.empty() && _pushed_func_conjuncts_index.empty()) {
|
|
return;
|
|
}
|
|
|
|
// dispose direct conjunct first
|
|
std::vector<ExprContext*> new_conjunct_ctxs;
|
|
for (int i = 0; i < _direct_conjunct_size; ++i) {
|
|
if (!_pushed_conjuncts_index.empty() && _pushed_conjuncts_index.count(i)) {
|
|
_conjunct_ctxs[i]->close(state); // pushed condition, just close
|
|
} else if (!_pushed_func_conjuncts_index.empty() && _pushed_func_conjuncts_index.count(i)) {
|
|
_pushed_func_conjunct_ctxs.emplace_back(
|
|
_conjunct_ctxs[i]); // pushed functions, need keep ctxs
|
|
} else {
|
|
new_conjunct_ctxs.emplace_back(_conjunct_ctxs[i]);
|
|
}
|
|
}
|
|
auto new_direct_conjunct_size = new_conjunct_ctxs.size();
|
|
|
|
// dispose hash join push down conjunct second
|
|
for (int i = _direct_conjunct_size; i < _conjunct_ctxs.size(); ++i) {
|
|
if (!_pushed_conjuncts_index.empty() && _pushed_conjuncts_index.count(i)) {
|
|
_conjunct_ctxs[i]->close(state); // pushed condition, just close
|
|
} else if (!_pushed_func_conjuncts_index.empty() && _pushed_func_conjuncts_index.count(i)) {
|
|
_pushed_func_conjunct_ctxs.emplace_back(
|
|
_conjunct_ctxs[i]); // pushed functions, need keep ctxs
|
|
} else {
|
|
new_conjunct_ctxs.emplace_back(_conjunct_ctxs[i]);
|
|
}
|
|
}
|
|
|
|
_conjunct_ctxs = std::move(new_conjunct_ctxs);
|
|
_direct_conjunct_size = new_direct_conjunct_size;
|
|
|
|
// TODO: support vbloom_filter_predicate/vbinary_predicate and merge unpushed predicate to _vconjunct_ctx
|
|
for (auto push_down_ctx : _pushed_conjuncts_index) {
|
|
auto iter = _conjunctid_to_runtime_filter_ctxs.find(push_down_ctx);
|
|
if (iter != _conjunctid_to_runtime_filter_ctxs.end()) {
|
|
iter->second->runtimefilter->set_push_down_profile();
|
|
}
|
|
}
|
|
|
|
// set vconjunct_ctx is empty, if all conjunct
|
|
if (_direct_conjunct_size == 0) {
|
|
if (_vconjunct_ctx_ptr != nullptr) {
|
|
(*_vconjunct_ctx_ptr)->close(state);
|
|
_vconjunct_ctx_ptr = nullptr;
|
|
}
|
|
}
|
|
|
|
// filter idle conjunct in vexpr_contexts
|
|
auto checker = [&](int index) { return _pushed_conjuncts_index.count(index); };
|
|
_peel_pushed_vconjunct(state, checker);
|
|
}
|
|
|
|
// Construct the ColumnValueRange for one specified column
|
|
// It will only handle the InPredicate and eq BinaryPredicate in conjunct_ctxs.
|
|
// It will try to push down conditions of that column as much as possible,
|
|
// But if the number of conditions exceeds the limit, none of conditions will be pushed down.
|
|
template <PrimitiveType T>
|
|
Status VOlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot,
|
|
ColumnValueRange<T>* range) {
|
|
std::vector<uint32_t> filter_conjuncts_index;
|
|
for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
|
|
// create empty range as temp range, temp range should do intersection on range
|
|
auto temp_range = ColumnValueRange<T>::create_empty_column_value_range(
|
|
slot->type().precision, slot->type().scale);
|
|
|
|
// 1. Normalize in conjuncts like 'where col in (v1, v2, v3)'
|
|
if (TExprOpcode::FILTER_IN == _conjunct_ctxs[conj_idx]->root()->op()) {
|
|
InPredicate* pred = static_cast<InPredicate*>(_conjunct_ctxs[conj_idx]->root());
|
|
if (!should_push_down_in_predicate(slot, pred)) {
|
|
continue;
|
|
}
|
|
|
|
// begin to push InPredicate value into ColumnValueRange
|
|
HybridSetBase::IteratorBase* iter = pred->hybrid_set()->begin();
|
|
while (iter->has_next()) {
|
|
// column in (nullptr) is always false so continue to
|
|
// dispose next item
|
|
if (nullptr == iter->get_value()) {
|
|
iter->next();
|
|
continue;
|
|
}
|
|
auto value = const_cast<void*>(iter->get_value());
|
|
RETURN_IF_ERROR(change_fixed_value_range(
|
|
temp_range, value, ColumnValueRange<T>::add_fixed_value_range));
|
|
iter->next();
|
|
}
|
|
|
|
if (is_key_column(slot->col_name())) {
|
|
filter_conjuncts_index.emplace_back(conj_idx);
|
|
}
|
|
range->intersection(temp_range);
|
|
} // end of handle in predicate
|
|
// 2. Normalize eq conjuncts like 'where col = value'
|
|
else if (TExprNodeType::BINARY_PRED == _conjunct_ctxs[conj_idx]->root()->node_type() &&
|
|
FILTER_IN == to_olap_filter_type(_conjunct_ctxs[conj_idx]->root()->op(), false)) {
|
|
Expr* pred = _conjunct_ctxs[conj_idx]->root();
|
|
DCHECK(pred->get_num_children() == 2);
|
|
|
|
for (int child_idx = 0; child_idx < 2; ++child_idx) {
|
|
// TODO: should use C++17 structured bindlings to refactor this code in the future:
|
|
// 'auto [should_push_down, value] = should_push_down_eq_predicate(slot, pred, conj_idx, child_idx);'
|
|
// make code tidier and readabler
|
|
auto result_pair = should_push_down_eq_predicate(slot, pred, conj_idx, child_idx);
|
|
if (!result_pair.first) {
|
|
continue;
|
|
}
|
|
|
|
auto value = result_pair.second;
|
|
// where A = nullptr should return empty result set
|
|
if (value != nullptr) {
|
|
RETURN_IF_ERROR(change_fixed_value_range(
|
|
temp_range, value, ColumnValueRange<T>::add_fixed_value_range));
|
|
}
|
|
|
|
if (is_key_column(slot->col_name())) {
|
|
filter_conjuncts_index.emplace_back(conj_idx);
|
|
}
|
|
range->intersection(temp_range);
|
|
} // end for each binary predicate child
|
|
} // end of handling eq binary predicate
|
|
}
|
|
|
|
// exceed limit, no conditions will be pushed down to storage engine.
|
|
if (range->get_fixed_value_size() > _max_pushdown_conditions_per_column) {
|
|
range->set_whole_value_range();
|
|
} else {
|
|
std::copy(filter_conjuncts_index.cbegin(), filter_conjuncts_index.cend(),
|
|
std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin()));
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
// Construct the ColumnValueRange for one specified column
|
|
// It will only handle the NotInPredicate and not eq BinaryPredicate in conjunct_ctxs.
|
|
// It will try to push down conditions of that column as much as possible,
|
|
// But if the number of conditions exceeds the limit, none of conditions will be pushed down.
|
|
template <PrimitiveType T>
|
|
Status VOlapScanNode::normalize_not_in_and_not_eq_predicate(SlotDescriptor* slot,
|
|
ColumnValueRange<T>* range) {
|
|
// If the conjunct of slot is fixed value, will change the fixed value set of column value range
|
|
// else add value to not in range and push down predicate directly
|
|
bool is_fixed_range = range->is_fixed_value_range();
|
|
auto not_in_range = ColumnValueRange<T>::create_empty_column_value_range(range->column_name());
|
|
|
|
std::vector<uint32_t> filter_conjuncts_index;
|
|
for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
|
|
// 1. Normalize in conjuncts like 'where col not in (v1, v2, v3)'
|
|
if (TExprOpcode::FILTER_NOT_IN == _conjunct_ctxs[conj_idx]->root()->op()) {
|
|
InPredicate* pred = static_cast<InPredicate*>(_conjunct_ctxs[conj_idx]->root());
|
|
if (!should_push_down_in_predicate(slot, pred)) {
|
|
continue;
|
|
}
|
|
|
|
// begin to push InPredicate value into ColumnValueRange
|
|
auto iter = pred->hybrid_set()->begin();
|
|
while (iter->has_next()) {
|
|
// column not in (nullptr) is always true
|
|
if (nullptr == iter->get_value()) {
|
|
continue;
|
|
}
|
|
auto value = const_cast<void*>(iter->get_value());
|
|
if (is_fixed_range) {
|
|
RETURN_IF_ERROR(change_fixed_value_range(
|
|
*range, value, ColumnValueRange<T>::remove_fixed_value_range));
|
|
} else {
|
|
RETURN_IF_ERROR(change_fixed_value_range(
|
|
not_in_range, value, ColumnValueRange<T>::add_fixed_value_range));
|
|
}
|
|
iter->next();
|
|
}
|
|
|
|
// only where a in ('a', 'b', nullptr) contain nullptr will
|
|
// clear temp_range to whole range, no need do intersection
|
|
if (is_key_column(slot->col_name())) {
|
|
filter_conjuncts_index.emplace_back(conj_idx);
|
|
}
|
|
} // end of handle not in predicate
|
|
|
|
// 2. Normalize eq conjuncts like 'where col != value'
|
|
if (TExprNodeType::BINARY_PRED == _conjunct_ctxs[conj_idx]->root()->node_type() &&
|
|
FILTER_NOT_IN == to_olap_filter_type(_conjunct_ctxs[conj_idx]->root()->op(), false)) {
|
|
Expr* pred = _conjunct_ctxs[conj_idx]->root();
|
|
DCHECK(pred->get_num_children() == 2);
|
|
|
|
for (int child_idx = 0; child_idx < 2; ++child_idx) {
|
|
// TODO: should use C++17 structured bindlings to refactor this code in the future:
|
|
// 'auto [should_push_down, value] = should_push_down_eq_predicate(slot, pred, conj_idx, child_idx);'
|
|
// make code tidier and readabler
|
|
auto result_pair = should_push_down_eq_predicate(slot, pred, conj_idx, child_idx);
|
|
if (!result_pair.first) {
|
|
continue;
|
|
}
|
|
auto value = result_pair.second;
|
|
|
|
if (is_fixed_range) {
|
|
RETURN_IF_ERROR(change_fixed_value_range(
|
|
*range, value, ColumnValueRange<T>::remove_fixed_value_range));
|
|
} else {
|
|
RETURN_IF_ERROR(change_fixed_value_range(
|
|
not_in_range, value, ColumnValueRange<T>::add_fixed_value_range));
|
|
}
|
|
|
|
if (is_key_column(slot->col_name())) {
|
|
filter_conjuncts_index.emplace_back(conj_idx);
|
|
}
|
|
} // end for each binary predicate child
|
|
} // end of handling eq binary predicate
|
|
}
|
|
|
|
// exceed limit, no conditions will be pushed down to storage engine.
|
|
if (is_fixed_range ||
|
|
not_in_range.get_fixed_value_size() <= _max_pushdown_conditions_per_column) {
|
|
if (!is_fixed_range) {
|
|
// push down not in condition to storage engine
|
|
not_in_range.to_in_condition(_olap_filter, false);
|
|
}
|
|
std::copy(filter_conjuncts_index.cbegin(), filter_conjuncts_index.cend(),
|
|
std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin()));
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
template <PrimitiveType T>
|
|
bool VOlapScanNode::normalize_is_null_predicate(Expr* expr, SlotDescriptor* slot,
|
|
const std::string& is_null_str,
|
|
ColumnValueRange<T>* range) {
|
|
if (expr->node_type() != TExprNodeType::SLOT_REF) {
|
|
return false;
|
|
}
|
|
|
|
std::vector<SlotId> slot_ids;
|
|
if (1 != expr->get_slot_ids(&slot_ids)) {
|
|
return false;
|
|
}
|
|
|
|
if (slot_ids[0] != slot->id()) {
|
|
return false;
|
|
}
|
|
|
|
auto temp_range = ColumnValueRange<T>::create_empty_column_value_range(slot->type().precision,
|
|
slot->type().scale);
|
|
temp_range.set_contain_null(is_null_str == "null");
|
|
range->intersection(temp_range);
|
|
|
|
return true;
|
|
}
|
|
|
|
template <PrimitiveType T>
|
|
Status VOlapScanNode::normalize_noneq_binary_predicate(SlotDescriptor* slot,
|
|
ColumnValueRange<T>* range) {
|
|
std::vector<uint32_t> filter_conjuncts_index;
|
|
|
|
for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
|
|
Expr* root_expr = _conjunct_ctxs[conj_idx]->root();
|
|
if (TExprNodeType::BINARY_PRED != root_expr->node_type() ||
|
|
FILTER_IN == to_olap_filter_type(root_expr->op(), false) ||
|
|
FILTER_NOT_IN == to_olap_filter_type(root_expr->op(), false)) {
|
|
if (TExprNodeType::FUNCTION_CALL == root_expr->node_type()) {
|
|
std::string is_null_str;
|
|
// 1. dispose the where pred "A is null" and "A is not null"
|
|
if (root_expr->is_null_scalar_function(is_null_str) &&
|
|
normalize_is_null_predicate(root_expr->get_child(0), slot, is_null_str,
|
|
range)) {
|
|
// if column is key column should push down conjunct storage engine
|
|
if (is_key_column(slot->col_name())) {
|
|
filter_conjuncts_index.emplace_back(conj_idx);
|
|
}
|
|
}
|
|
}
|
|
continue;
|
|
}
|
|
|
|
// 2. dispose the where pred "A <,<=" and "A >,>="
|
|
Expr* pred = _conjunct_ctxs[conj_idx]->root();
|
|
DCHECK(pred->get_num_children() == 2);
|
|
|
|
for (int child_idx = 0; child_idx < 2; ++child_idx) {
|
|
if (Expr::type_without_cast(pred->get_child(child_idx)) != TExprNodeType::SLOT_REF) {
|
|
continue;
|
|
}
|
|
if (pred->get_child(child_idx)->type().type != slot->type().type) {
|
|
if (!ignore_cast(slot, pred->get_child(child_idx))) {
|
|
continue;
|
|
}
|
|
}
|
|
|
|
std::vector<SlotId> slot_ids;
|
|
|
|
if (1 == pred->get_child(child_idx)->get_slot_ids(&slot_ids)) {
|
|
if (slot_ids[0] != slot->id()) {
|
|
continue;
|
|
}
|
|
|
|
Expr* expr = pred->get_child(1 - child_idx);
|
|
|
|
// for case: where col_a > col_b
|
|
if (!expr->is_constant()) {
|
|
continue;
|
|
}
|
|
|
|
void* value = _conjunct_ctxs[conj_idx]->get_value(
|
|
expr, nullptr, slot->type().precision, slot->type().scale);
|
|
// for case: where col > null
|
|
if (value == nullptr) {
|
|
continue;
|
|
}
|
|
|
|
switch (slot->type().type) {
|
|
case TYPE_DATE: {
|
|
DateTimeValue date_value = *reinterpret_cast<DateTimeValue*>(value);
|
|
// NOTE: Datetime may be truncated to a date column, so we call ++operator for date_value
|
|
// for example: '2010-01-01 00:00:01' will be truncate to '2010-01-01'
|
|
if (date_value.check_loss_accuracy_cast_to_date()) {
|
|
if (pred->op() == TExprOpcode::LT || pred->op() == TExprOpcode::GE) {
|
|
++date_value;
|
|
}
|
|
}
|
|
range->add_range(to_olap_filter_type(pred->op(), child_idx),
|
|
*reinterpret_cast<typename PrimitiveTypeTraits<T>::CppType*>(
|
|
&date_value));
|
|
break;
|
|
}
|
|
case TYPE_DATEV2: {
|
|
DateV2Value<DateTimeV2ValueType> datetimev2_value =
|
|
*reinterpret_cast<DateV2Value<DateTimeV2ValueType>*>(value);
|
|
doris::vectorized::DateV2Value<DateV2ValueType> date_v2;
|
|
date_v2.set_date_uint32(binary_cast<DateV2Value<DateTimeV2ValueType>, uint64_t>(
|
|
datetimev2_value) >>
|
|
TIME_PART_LENGTH);
|
|
if (!datetimev2_value.can_cast_to_date_without_loss_accuracy()) {
|
|
if (pred->op() == TExprOpcode::LT || pred->op() == TExprOpcode::GE) {
|
|
++date_v2;
|
|
}
|
|
}
|
|
if constexpr (T == PrimitiveType::TYPE_DATEV2) {
|
|
range->add_range(to_olap_filter_type(pred->op(), child_idx), date_v2);
|
|
break;
|
|
} else {
|
|
__builtin_unreachable();
|
|
}
|
|
}
|
|
case TYPE_TINYINT:
|
|
case TYPE_DECIMALV2:
|
|
case TYPE_DECIMAL32:
|
|
case TYPE_DECIMAL64:
|
|
case TYPE_DECIMAL128:
|
|
case TYPE_CHAR:
|
|
case TYPE_VARCHAR:
|
|
case TYPE_HLL:
|
|
case TYPE_DATETIME:
|
|
case TYPE_DATETIMEV2:
|
|
case TYPE_SMALLINT:
|
|
case TYPE_INT:
|
|
case TYPE_BIGINT:
|
|
case TYPE_LARGEINT:
|
|
case TYPE_BOOLEAN:
|
|
case TYPE_STRING: {
|
|
range->add_range(
|
|
to_olap_filter_type(pred->op(), child_idx),
|
|
*reinterpret_cast<typename PrimitiveTypeTraits<T>::CppType*>(value));
|
|
break;
|
|
}
|
|
|
|
default: {
|
|
LOG(WARNING) << "Normalize filter fail, Unsupported Primitive type. [type="
|
|
<< expr->type() << "]";
|
|
return Status::InternalError(
|
|
"Normalize filter fail, Unsupported Primitive type");
|
|
}
|
|
}
|
|
|
|
if (is_key_column(slot->col_name())) {
|
|
filter_conjuncts_index.emplace_back(conj_idx);
|
|
}
|
|
|
|
VLOG_CRITICAL << slot->col_name() << " op: "
|
|
<< static_cast<int>(to_olap_filter_type(pred->op(), child_idx))
|
|
<< " value: "
|
|
<< *reinterpret_cast<typename PrimitiveTypeTraits<T>::CppType*>(
|
|
value);
|
|
}
|
|
}
|
|
}
|
|
|
|
std::copy(filter_conjuncts_index.cbegin(), filter_conjuncts_index.cend(),
|
|
std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin()));
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
Status VOlapScanNode::normalize_bloom_filter_predicate(SlotDescriptor* slot) {
|
|
std::vector<uint32_t> filter_conjuncts_index;
|
|
|
|
for (int conj_idx = _direct_conjunct_size; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
|
|
Expr* root_expr = _conjunct_ctxs[conj_idx]->root();
|
|
if (TExprNodeType::BLOOM_PRED != root_expr->node_type()) continue;
|
|
|
|
Expr* pred = _conjunct_ctxs[conj_idx]->root();
|
|
DCHECK(pred->get_num_children() == 1);
|
|
|
|
if (Expr::type_without_cast(pred->get_child(0)) != TExprNodeType::SLOT_REF) {
|
|
continue;
|
|
}
|
|
if (pred->get_child(0)->type().type != slot->type().type) {
|
|
if (!ignore_cast(slot, pred->get_child(0))) {
|
|
continue;
|
|
}
|
|
}
|
|
|
|
std::vector<SlotId> slot_ids;
|
|
|
|
if (1 == pred->get_child(0)->get_slot_ids(&slot_ids)) {
|
|
if (slot_ids[0] != slot->id()) {
|
|
continue;
|
|
}
|
|
// only key column of bloom filter will push down to storage engine
|
|
if (is_key_column(slot->col_name())) {
|
|
filter_conjuncts_index.emplace_back(conj_idx);
|
|
_bloom_filters_push_down.emplace_back(
|
|
slot->col_name(),
|
|
(reinterpret_cast<BloomFilterPredicate*>(pred))->get_bloom_filter_func());
|
|
}
|
|
}
|
|
}
|
|
|
|
std::copy(filter_conjuncts_index.cbegin(), filter_conjuncts_index.cend(),
|
|
std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin()));
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
Status VOlapScanNode::start_scan_thread(RuntimeState* state) {
|
|
if (_scan_ranges.empty()) {
|
|
_transfer_done = true;
|
|
return Status::OK();
|
|
}
|
|
auto span = opentelemetry::trace::Tracer::GetCurrentSpan();
|
|
|
|
// ranges constructed from scan keys
|
|
std::vector<std::unique_ptr<OlapScanRange>> cond_ranges;
|
|
RETURN_IF_ERROR(_scan_keys.get_key_range(&cond_ranges));
|
|
// if we can't get ranges from conditions, we give it a total range
|
|
if (cond_ranges.empty()) {
|
|
cond_ranges.emplace_back(new OlapScanRange());
|
|
}
|
|
bool need_split = true;
|
|
// If we have ranges more than 64, there is no need to call
|
|
// ShowHint to split ranges
|
|
if (limit() != -1 || cond_ranges.size() > 64) {
|
|
need_split = false;
|
|
}
|
|
int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
|
|
|
|
std::unordered_set<std::string> disk_set;
|
|
for (auto& scan_range : _scan_ranges) {
|
|
auto tablet_id = scan_range->tablet_id;
|
|
std::string err;
|
|
TabletSharedPtr tablet =
|
|
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err);
|
|
if (tablet == nullptr) {
|
|
std::stringstream ss;
|
|
ss << "failed to get tablet: " << tablet_id << ", reason: " << err;
|
|
LOG(WARNING) << ss.str();
|
|
return Status::InternalError(ss.str());
|
|
}
|
|
|
|
std::vector<std::unique_ptr<OlapScanRange>>* ranges = &cond_ranges;
|
|
std::vector<std::unique_ptr<OlapScanRange>> split_ranges;
|
|
if (need_split && !tablet->all_beta()) {
|
|
auto st = get_hints(tablet, *scan_range, config::doris_scan_range_row_count,
|
|
_scan_keys.begin_include(), _scan_keys.end_include(), cond_ranges,
|
|
&split_ranges, _runtime_profile.get());
|
|
if (st.ok()) {
|
|
ranges = &split_ranges;
|
|
}
|
|
}
|
|
int size_based_scanners_per_tablet = 1;
|
|
|
|
if (config::doris_scan_range_max_mb > 0) {
|
|
size_based_scanners_per_tablet = std::max(
|
|
1, (int)tablet->tablet_footprint() / config::doris_scan_range_max_mb << 20);
|
|
}
|
|
|
|
int ranges_per_scanner =
|
|
std::max(1, (int)ranges->size() /
|
|
std::min(scanners_per_tablet, size_based_scanners_per_tablet));
|
|
int num_ranges = ranges->size();
|
|
for (int i = 0; i < num_ranges;) {
|
|
std::vector<OlapScanRange*> scanner_ranges;
|
|
scanner_ranges.push_back((*ranges)[i].get());
|
|
++i;
|
|
for (int j = 1; i < num_ranges && j < ranges_per_scanner &&
|
|
(*ranges)[i]->end_include == (*ranges)[i - 1]->end_include;
|
|
++j, ++i) {
|
|
scanner_ranges.push_back((*ranges)[i].get());
|
|
}
|
|
VOlapScanner* scanner =
|
|
new VOlapScanner(state, this, _olap_scan_node.is_preaggregation,
|
|
_need_agg_finalize, *scan_range, _scanner_mem_tracker.get());
|
|
// add scanner to pool before doing prepare.
|
|
// so that scanner can be automatically deconstructed if prepare failed.
|
|
_scanner_pool.add(scanner);
|
|
RETURN_IF_ERROR(scanner->prepare(*scan_range, scanner_ranges, _olap_filter,
|
|
_bloom_filters_push_down, _push_down_functions));
|
|
|
|
_volap_scanners.push_back(scanner);
|
|
disk_set.insert(scanner->scan_disk());
|
|
}
|
|
}
|
|
COUNTER_SET(_num_disks_accessed_counter, static_cast<int64_t>(disk_set.size()));
|
|
COUNTER_SET(_num_scanners, static_cast<int64_t>(_volap_scanners.size()));
|
|
telemetry::set_span_attribute(span, _num_disks_accessed_counter);
|
|
telemetry::set_span_attribute(span, _num_scanners);
|
|
|
|
// init progress
|
|
std::stringstream ss;
|
|
ss << "ScanThread complete (node=" << id() << "):";
|
|
_progress = ProgressUpdater(ss.str(), _volap_scanners.size(), 1);
|
|
|
|
_transfer_thread.reset(new std::thread(
|
|
[this, state, parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
|
|
opentelemetry::trace::Scope scope {parent_span};
|
|
transfer_thread(state);
|
|
}));
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
Status VOlapScanNode::close(RuntimeState* state) {
|
|
if (is_closed()) {
|
|
return Status::OK();
|
|
}
|
|
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VOlapScanNode::close");
|
|
// change done status
|
|
{
|
|
std::unique_lock<std::mutex> l(_blocks_lock);
|
|
_transfer_done = true;
|
|
}
|
|
// notify all scanner thread
|
|
_block_consumed_cv.notify_all();
|
|
_block_added_cv.notify_all();
|
|
_scan_block_added_cv.notify_all();
|
|
|
|
// join transfer thread
|
|
if (_transfer_thread) {
|
|
_transfer_thread->join();
|
|
}
|
|
|
|
// clear some block in queue
|
|
// TODO: The presence of transfer_thread here may cause Block's memory alloc and be released not in a thread,
|
|
// which may lead to potential performance problems. we should rethink whether to delete the transfer thread
|
|
std::for_each(_materialized_blocks.begin(), _materialized_blocks.end(),
|
|
std::default_delete<Block>());
|
|
_materialized_row_batches_bytes = 0;
|
|
std::for_each(_scan_blocks.begin(), _scan_blocks.end(), std::default_delete<Block>());
|
|
_scan_row_batches_bytes = 0;
|
|
std::for_each(_free_blocks.begin(), _free_blocks.end(), std::default_delete<Block>());
|
|
|
|
// OlapScanNode terminate by exception
|
|
// so that initiative close the Scanner
|
|
for (auto scanner : _volap_scanners) {
|
|
scanner->close(state);
|
|
}
|
|
|
|
for (auto& filter_desc : _runtime_filter_descs) {
|
|
IRuntimeFilter* runtime_filter = nullptr;
|
|
state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter);
|
|
DCHECK(runtime_filter != nullptr);
|
|
runtime_filter->consumer_close();
|
|
}
|
|
// pushed functions close
|
|
Expr::close(_pushed_func_conjunct_ctxs, state);
|
|
|
|
for (auto& ctx : _stale_vexpr_ctxs) {
|
|
(*ctx)->close(state);
|
|
}
|
|
|
|
VLOG_CRITICAL << "VOlapScanNode::close()";
|
|
return ScanNode::close(state);
|
|
}
|
|
|
|
Status VOlapScanNode::get_next(RuntimeState* state, Block* block, bool* eos) {
|
|
INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VOlapScanNode::get_next");
|
|
SCOPED_TIMER(_runtime_profile->total_time_counter());
|
|
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
|
|
|
|
// check if Canceled.
|
|
if (state->is_cancelled()) {
|
|
std::unique_lock<std::mutex> l(_blocks_lock);
|
|
_transfer_done = true;
|
|
std::lock_guard<SpinLock> guard(_status_mutex);
|
|
if (LIKELY(_status.ok())) {
|
|
_status = Status::Cancelled("Cancelled");
|
|
}
|
|
return _status;
|
|
}
|
|
|
|
// check if started.
|
|
if (!_start) {
|
|
Status status = start_scan(state);
|
|
|
|
if (!status.ok()) {
|
|
LOG(ERROR) << "StartScan Failed cause " << status.get_error_msg();
|
|
*eos = true;
|
|
return status;
|
|
}
|
|
|
|
_start = true;
|
|
}
|
|
|
|
// some conjuncts will be disposed in start_scan function, so
|
|
// we should check _eos after call start_scan
|
|
if (_eos) {
|
|
*eos = true;
|
|
return Status::OK();
|
|
}
|
|
|
|
// wait for block from queue
|
|
Block* materialized_block = nullptr;
|
|
{
|
|
std::unique_lock<std::mutex> l(_blocks_lock);
|
|
SCOPED_TIMER(_olap_wait_batch_queue_timer);
|
|
while (_materialized_blocks.empty() && !_transfer_done) {
|
|
if (state->is_cancelled()) {
|
|
_transfer_done = true;
|
|
}
|
|
|
|
// use wait_for, not wait, in case to capture the state->is_cancelled()
|
|
_block_added_cv.wait_for(l, std::chrono::seconds(1));
|
|
}
|
|
|
|
if (!_materialized_blocks.empty()) {
|
|
materialized_block = _materialized_blocks.back();
|
|
DCHECK(materialized_block != nullptr);
|
|
_materialized_blocks.pop_back();
|
|
_materialized_row_batches_bytes -= materialized_block->allocated_bytes();
|
|
}
|
|
}
|
|
|
|
// return block
|
|
if (nullptr != materialized_block) {
|
|
// notify scanner
|
|
_block_consumed_cv.notify_one();
|
|
// get scanner's block memory
|
|
block->swap(*materialized_block);
|
|
VLOG_ROW << "VOlapScanNode output rows: " << block->rows();
|
|
reached_limit(block, eos);
|
|
|
|
// reach scan node limit
|
|
if (*eos) {
|
|
{
|
|
std::unique_lock<std::mutex> l(_blocks_lock);
|
|
_transfer_done = true;
|
|
}
|
|
|
|
_block_consumed_cv.notify_all();
|
|
*eos = true;
|
|
LOG(INFO) << "VOlapScanNode ReachedLimit.";
|
|
} else {
|
|
*eos = false;
|
|
}
|
|
|
|
{
|
|
// ReThink whether the SpinLock Better
|
|
std::lock_guard<std::mutex> l(_free_blocks_lock);
|
|
_free_blocks.emplace_back(materialized_block);
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
// all scanner done, change *eos to true
|
|
*eos = true;
|
|
std::lock_guard<SpinLock> guard(_status_mutex);
|
|
return _status;
|
|
}
|
|
|
|
Block* VOlapScanNode::_alloc_block(bool& get_free_block) {
|
|
{
|
|
std::lock_guard<std::mutex> l(_free_blocks_lock);
|
|
if (!_free_blocks.empty()) {
|
|
auto block = _free_blocks.back();
|
|
_free_blocks.pop_back();
|
|
return block;
|
|
}
|
|
}
|
|
|
|
get_free_block = false;
|
|
|
|
auto block = new Block(_tuple_desc->slots(), _block_size);
|
|
_buffered_bytes += block->allocated_bytes();
|
|
return block;
|
|
}
|
|
|
|
int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per_scanner) {
|
|
std::list<VOlapScanner*> olap_scanners;
|
|
int assigned_thread_num = _running_thread;
|
|
size_t max_thread = config::doris_scanner_queue_size;
|
|
if (config::doris_scanner_row_num > state->batch_size()) {
|
|
max_thread /= config::doris_scanner_row_num / state->batch_size();
|
|
if (max_thread <= 0) max_thread = 1;
|
|
}
|
|
// copy to local
|
|
{
|
|
// How many thread can apply to this query
|
|
size_t thread_slot_num = 0;
|
|
{
|
|
if (_scan_row_batches_bytes < _max_scanner_queue_size_bytes / 2) {
|
|
std::lock_guard<std::mutex> l(_free_blocks_lock);
|
|
thread_slot_num = _free_blocks.size() / block_per_scanner;
|
|
thread_slot_num += (_free_blocks.size() % block_per_scanner != 0);
|
|
thread_slot_num = std::min(thread_slot_num, max_thread - assigned_thread_num);
|
|
if (thread_slot_num <= 0) {
|
|
thread_slot_num = 1;
|
|
}
|
|
} else {
|
|
std::lock_guard<std::mutex> l(_scan_blocks_lock);
|
|
if (_scan_blocks.empty()) {
|
|
// Just for notify if _scan_blocks is empty and no running thread
|
|
if (assigned_thread_num == 0) {
|
|
thread_slot_num = 1;
|
|
// NOTE: if olap_scanners_ is empty, scanner_done_ should be true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
{
|
|
std::lock_guard<std::mutex> l(_volap_scanners_lock);
|
|
thread_slot_num = std::min(thread_slot_num, _volap_scanners.size());
|
|
for (int i = 0; i < thread_slot_num && !_volap_scanners.empty();) {
|
|
auto scanner = _volap_scanners.front();
|
|
_volap_scanners.pop_front();
|
|
|
|
if (scanner->need_to_close()) {
|
|
scanner->close(state);
|
|
} else {
|
|
olap_scanners.push_back(scanner);
|
|
_running_thread++;
|
|
assigned_thread_num++;
|
|
i++;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// post volap scanners to thread-pool
|
|
auto cur_span = opentelemetry::trace::Tracer::GetCurrentSpan();
|
|
ThreadPoolToken* thread_token = nullptr;
|
|
if (_limit > -1 && _limit < 1024) {
|
|
thread_token = state->get_query_fragments_ctx()->get_serial_token();
|
|
} else {
|
|
thread_token = state->get_query_fragments_ctx()->get_token();
|
|
}
|
|
auto iter = olap_scanners.begin();
|
|
if (thread_token != nullptr) {
|
|
while (iter != olap_scanners.end()) {
|
|
auto s = thread_token->submit_func([this, scanner = *iter, parent_span = cur_span] {
|
|
opentelemetry::trace::Scope scope {parent_span};
|
|
this->scanner_thread(scanner);
|
|
});
|
|
if (s.ok()) {
|
|
(*iter)->start_wait_worker_timer();
|
|
COUNTER_UPDATE(_scanner_sched_counter, 1);
|
|
olap_scanners.erase(iter++);
|
|
} else {
|
|
LOG(FATAL) << "Failed to assign scanner task to thread pool! " << s.get_error_msg();
|
|
}
|
|
++_total_assign_num;
|
|
}
|
|
} else {
|
|
PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool();
|
|
PriorityThreadPool* remote_thread_pool = state->exec_env()->remote_scan_thread_pool();
|
|
while (iter != olap_scanners.end()) {
|
|
PriorityThreadPool::Task task;
|
|
task.work_function = [this, scanner = *iter, parent_span = cur_span] {
|
|
opentelemetry::trace::Scope scope {parent_span};
|
|
this->scanner_thread(scanner);
|
|
};
|
|
task.priority = _nice;
|
|
task.queue_id = state->exec_env()->store_path_to_index((*iter)->scan_disk());
|
|
(*iter)->start_wait_worker_timer();
|
|
|
|
TabletStorageType type = (*iter)->get_storage_type();
|
|
bool ret = false;
|
|
COUNTER_UPDATE(_scanner_sched_counter, 1);
|
|
if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
|
|
ret = thread_pool->offer(task);
|
|
} else {
|
|
ret = remote_thread_pool->offer(task);
|
|
}
|
|
|
|
if (ret) {
|
|
olap_scanners.erase(iter++);
|
|
} else {
|
|
LOG(FATAL) << "Failed to assign scanner task to thread pool!";
|
|
}
|
|
++_total_assign_num;
|
|
}
|
|
}
|
|
|
|
return assigned_thread_num;
|
|
}
|
|
|
|
// PlanFragmentExecutor will call this method to set scan range
|
|
// Doris scan range is defined in thrift file like this
|
|
// struct TPaloScanRange {
|
|
// 1: required list<Types.TNetworkAddress> hosts
|
|
// 2: required string schema_hash
|
|
// 3: required string version
|
|
// 5: required Types.TTabletId tablet_id
|
|
// 6: required string db_name
|
|
// 7: optional list<TKeyRange> partition_column_ranges
|
|
// 8: optional string index_name
|
|
// 9: optional string table_name
|
|
//}
|
|
// every doris_scan_range is related with one tablet so that one olap scan node contains multiple tablet
|
|
Status VOlapScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
|
|
for (auto& scan_range : scan_ranges) {
|
|
DCHECK(scan_range.scan_range.__isset.palo_scan_range);
|
|
_scan_ranges.emplace_back(new TPaloScanRange(scan_range.scan_range.palo_scan_range));
|
|
COUNTER_UPDATE(_tablet_counter, 1);
|
|
}
|
|
telemetry::set_current_span_attribute(_tablet_counter);
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
Status VOlapScanNode::get_hints(TabletSharedPtr table, const TPaloScanRange& scan_range,
|
|
int block_row_count, bool is_begin_include, bool is_end_include,
|
|
const std::vector<std::unique_ptr<OlapScanRange>>& scan_key_range,
|
|
std::vector<std::unique_ptr<OlapScanRange>>* sub_scan_range,
|
|
RuntimeProfile* profile) {
|
|
RuntimeProfile::Counter* show_hints_timer = profile->get_counter("ShowHintsTime_V1");
|
|
std::vector<std::vector<OlapTuple>> ranges;
|
|
bool have_valid_range = false;
|
|
for (auto& key_range : scan_key_range) {
|
|
if (key_range->begin_scan_range.size() == 1 &&
|
|
key_range->begin_scan_range.get_value(0) == NEGATIVE_INFINITY) {
|
|
continue;
|
|
}
|
|
SCOPED_TIMER(show_hints_timer);
|
|
|
|
Status res = Status::OK();
|
|
std::vector<OlapTuple> range;
|
|
res = table->split_range(key_range->begin_scan_range, key_range->end_scan_range,
|
|
block_row_count, &range);
|
|
if (!res.ok()) {
|
|
return Status::InternalError("fail to show hints");
|
|
}
|
|
ranges.emplace_back(std::move(range));
|
|
have_valid_range = true;
|
|
}
|
|
|
|
if (!have_valid_range) {
|
|
std::vector<OlapTuple> range;
|
|
auto res = table->split_range({}, {}, block_row_count, &range);
|
|
if (!res.ok()) {
|
|
return Status::InternalError("fail to show hints");
|
|
}
|
|
ranges.emplace_back(std::move(range));
|
|
}
|
|
|
|
for (int i = 0; i < ranges.size(); ++i) {
|
|
for (int j = 0; j < ranges[i].size(); j += 2) {
|
|
std::unique_ptr<OlapScanRange> range(new OlapScanRange);
|
|
range->begin_scan_range.reset();
|
|
range->begin_scan_range = ranges[i][j];
|
|
range->end_scan_range.reset();
|
|
range->end_scan_range = ranges[i][j + 1];
|
|
|
|
if (0 == j) {
|
|
range->begin_include = is_begin_include;
|
|
} else {
|
|
range->begin_include = true;
|
|
}
|
|
|
|
if (j + 2 == ranges[i].size()) {
|
|
range->end_include = is_end_include;
|
|
} else {
|
|
range->end_include = false;
|
|
}
|
|
|
|
sub_scan_range->emplace_back(std::move(range));
|
|
}
|
|
}
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
} // namespace doris::vectorized
|