diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index cabf14ae62..f8d53a9e43 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -95,7 +95,8 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) { _runtime_filter_ctxs[i].runtimefilter = runtime_filter; } - + _batch_size = _limit == -1 ? state->batch_size() + : std::min(static_cast(state->batch_size()), _limit); return Status::OK(); } @@ -273,7 +274,6 @@ Status OlapScanNode::open(RuntimeState* state) { Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); - // check if Canceled. if (state->is_cancelled()) { std::unique_lock l(_row_batches_lock); @@ -939,6 +939,7 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) { OlapScanner* scanner = new OlapScanner(state, this, _olap_scan_node.is_preaggregation, _need_agg_finalize, *scan_range, _scanner_mem_tracker.get()); + scanner->set_batch_size(_batch_size); // add scanner to pool before doing prepare. // so that scanner can be automatically deconstructed if prepare failed. _scanner_pool.add(scanner); @@ -1489,7 +1490,12 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { } } - ThreadPoolToken* thread_token = state->get_query_fragments_ctx()->get_token(); + ThreadPoolToken* thread_token = nullptr; + if (limit() != -1 && limit() < 1024) { + thread_token = state->get_query_fragments_ctx()->get_serial_token(); + } else { + thread_token = state->get_query_fragments_ctx()->get_token(); + } /********************************* * The basic strategy of priority scheduling: @@ -1739,7 +1745,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) { << ", fragment id=" << print_id(_runtime_state->fragment_instance_id()); break; } - RowBatch* row_batch = new RowBatch(this->row_desc(), state->batch_size()); + RowBatch* row_batch = new RowBatch(this->row_desc(), _batch_size); row_batch->set_scanner_id(scanner->id()); status = scanner->get_batch(_runtime_state, row_batch, &eos); if (!status.ok()) { @@ -1757,6 +1763,10 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) { raw_bytes_read += row_batch->tuple_data_pool()->total_reserved_bytes(); } raw_rows_read = scanner->raw_rows_read(); + if (limit() != -1 && raw_rows_read >= limit()) { + eos = true; + break; + } } { @@ -1775,6 +1785,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) { std::lock_guard guard(_status_mutex); global_status_ok = _status.ok(); } + if (UNLIKELY(!global_status_ok)) { eos = true; for (auto rb : row_batchs) { diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h index 5d5b700356..ea2b0dc70d 100644 --- a/be/src/exec/olap_scan_node.h +++ b/be/src/exec/olap_scan_node.h @@ -206,6 +206,8 @@ protected: // object is. ObjectPool _scanner_pool; + size_t _batch_size = 0; + std::shared_ptr _transfer_thread; // Keeps track of total splits and the number finished. diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index c725aa1458..a2dac28d30 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -63,11 +63,7 @@ Status OlapScanner::prepare( SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); set_tablet_reader(); // set limit to reduce end of rowset and segment mem use - _tablet_reader->set_batch_size( - _parent->limit() == -1 - ? _parent->_runtime_state->batch_size() - : std::min(static_cast(_parent->_runtime_state->batch_size()), - _parent->limit())); + _tablet_reader->set_batch_size(_parent->_batch_size); // Get olap table TTabletId tablet_id = scan_range.tablet_id; @@ -314,9 +310,15 @@ Status OlapScanner::_init_return_columns(bool need_seq_col) { Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); // 2. Allocate Row's Tuple buf + Status st = Status::OK(); uint8_t* tuple_buf = - batch->tuple_data_pool()->allocate(state->batch_size() * _tuple_desc->byte_size()); - bzero(tuple_buf, state->batch_size() * _tuple_desc->byte_size()); + batch->tuple_data_pool()->allocate(_batch_size * _tuple_desc->byte_size(), &st); + RETURN_NOT_OK_STATUS_WITH_WARN(st, "Allocate mem for row batch failed"); + if (tuple_buf == nullptr) { + LOG(WARNING) << "Allocate mem for row batch failed."; + return Status::RuntimeError("Allocate mem for row batch failed."); + } + bzero(tuple_buf, _batch_size * _tuple_desc->byte_size()); Tuple* tuple = reinterpret_cast(tuple_buf); std::unique_ptr mem_pool(new MemPool(_mem_tracker)); @@ -329,6 +331,11 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { ObjectPool tmp_object_pool; // release the memory of the object which can't pass the conjuncts. ObjectPool unused_object_pool; + if (batch->tuple_data_pool()->total_reserved_bytes() >= raw_bytes_threshold) { + return Status::RuntimeError( + "Scanner row bytes buffer is too small, please try to increase be config " + "'doris_scanner_row_bytes'."); + } while (true) { // Batch is full or reach raw_rows_threshold or raw_bytes_threshold, break if (batch->is_full() || @@ -631,6 +638,7 @@ void OlapScanner::_update_realtime_counter() { COUNTER_UPDATE(_parent->_raw_rows_counter, stats.raw_rows_read); // if raw_rows_read is reset, scanNode will scan all table rows which may cause BE crash _raw_rows_read += stats.raw_rows_read; + _tablet_reader->mutable_stats()->raw_rows_read = 0; } diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h index e95e31d106..ea128dc9dd 100644 --- a/be/src/exec/olap_scanner.h +++ b/be/src/exec/olap_scanner.h @@ -90,6 +90,8 @@ public: TabletStorageType get_storage_type(); + void set_batch_size(size_t batch_size) { _batch_size = batch_size; } + protected: Status _init_tablet_reader_params( const std::vector& key_ranges, const std::vector& filters, @@ -140,6 +142,8 @@ protected: int64_t _raw_rows_read = 0; int64_t _compressed_bytes_read = 0; + size_t _batch_size = 0; + // number rows filtered by pushed condition int64_t _num_rows_pushed_cond_filtered = 0; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 7b16f853ce..b2f00c862d 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -640,6 +640,15 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit); } } + if (params.__isset.fragment && params.fragment.__isset.plan && + params.fragment.plan.nodes.size() > 0) { + for (auto& node : params.fragment.plan.nodes) { + if (node.limit > 0 && node.limit < 1024) { + fragments_ctx->set_serial_thread_token(); + break; + } + } + } { // Find _fragments_ctx_map again, in case some other request has already diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_fragments_ctx.h index 0c91f1b305..78d84cb4b2 100644 --- a/be/src/runtime/query_fragments_ctx.h +++ b/be/src/runtime/query_fragments_ctx.h @@ -59,9 +59,15 @@ public: ThreadPool::ExecutionMode::CONCURRENT, cpu_limit); } } + void set_serial_thread_token() { + _serial_thread_token = _exec_env->limited_scan_thread_pool()->new_token( + ThreadPool::ExecutionMode::SERIAL, 1); + } ThreadPoolToken* get_token() { return _thread_token.get(); } + ThreadPoolToken* get_serial_token() { return _serial_thread_token.get(); } + void set_ready_to_execute() { { std::lock_guard l(_start_lock); @@ -108,6 +114,11 @@ private: // If this token is not set, the scanner will be executed in "_scan_thread_pool" in exec env. std::unique_ptr _thread_token; + // A token used to submit olap scanner to the "_limited_scan_thread_pool" serially, it used for + // query like `select * limit 1`, this query used for limit the max scaner thread to 1 to avoid + // this query cost too much resource + std::unique_ptr _serial_thread_token; + std::mutex _start_lock; std::condition_variable _start_cond; // Only valid when _need_wait_execution_trigger is set to true in FragmentExecState. diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp index 2b1f0a09c1..9c8ac6d3aa 100644 --- a/be/src/runtime/row_batch.cpp +++ b/be/src/runtime/row_batch.cpp @@ -499,7 +499,7 @@ size_t RowBatch::get_batch_size(const PRowBatch& batch) { void RowBatch::acquire_state(RowBatch* src) { // DCHECK(_row_desc.equals(src->_row_desc)); DCHECK_EQ(_num_tuples_per_row, src->_num_tuples_per_row); - DCHECK_EQ(_tuple_ptrs_size, src->_tuple_ptrs_size); + // DCHECK_EQ(_tuple_ptrs_size, src->_tuple_ptrs_size); DCHECK_EQ(_auxiliary_mem_usage, 0); // The destination row batch should be empty. diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index 993475cfec..4b4ddf313f 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ b/be/src/vec/exec/volap_scan_node.cpp @@ -1876,35 +1876,58 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per } // post volap scanners to thread-pool - PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool(); auto cur_span = opentelemetry::trace::Tracer::GetCurrentSpan(); - PriorityThreadPool* remote_thread_pool = state->exec_env()->remote_scan_thread_pool(); + ThreadPoolToken* thread_token = nullptr; + if (_limit > -1 && _limit < 1024) { + thread_token = state->get_query_fragments_ctx()->get_serial_token(); + } else { + thread_token = state->get_query_fragments_ctx()->get_token(); + } auto iter = olap_scanners.begin(); - while (iter != olap_scanners.end()) { - PriorityThreadPool::Task task; - task.work_function = [this, scanner = *iter, parent_span = cur_span] { - opentelemetry::trace::Scope scope {parent_span}; - this->scanner_thread(scanner); - }; - task.priority = _nice; - task.queue_id = state->exec_env()->store_path_to_index((*iter)->scan_disk()); - (*iter)->start_wait_worker_timer(); - - TabletStorageType type = (*iter)->get_storage_type(); - bool ret = false; - COUNTER_UPDATE(_scanner_sched_counter, 1); - if (type == TabletStorageType::STORAGE_TYPE_LOCAL) { - ret = thread_pool->offer(task); - } else { - ret = remote_thread_pool->offer(task); + if (thread_token != nullptr) { + while (iter != olap_scanners.end()) { + auto s = thread_token->submit_func([this, scanner = *iter, parent_span = cur_span] { + opentelemetry::trace::Scope scope {parent_span}; + this->scanner_thread(scanner); + }); + if (s.ok()) { + (*iter)->start_wait_worker_timer(); + COUNTER_UPDATE(_scanner_sched_counter, 1); + olap_scanners.erase(iter++); + } else { + LOG(FATAL) << "Failed to assign scanner task to thread pool! " << s.get_error_msg(); + } + ++_total_assign_num; } + } else { + PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool(); + PriorityThreadPool* remote_thread_pool = state->exec_env()->remote_scan_thread_pool(); + while (iter != olap_scanners.end()) { + PriorityThreadPool::Task task; + task.work_function = [this, scanner = *iter, parent_span = cur_span] { + opentelemetry::trace::Scope scope {parent_span}; + this->scanner_thread(scanner); + }; + task.priority = _nice; + task.queue_id = state->exec_env()->store_path_to_index((*iter)->scan_disk()); + (*iter)->start_wait_worker_timer(); - if (ret) { - olap_scanners.erase(iter++); - } else { - LOG(FATAL) << "Failed to assign scanner task to thread pool!"; + TabletStorageType type = (*iter)->get_storage_type(); + bool ret = false; + COUNTER_UPDATE(_scanner_sched_counter, 1); + if (type == TabletStorageType::STORAGE_TYPE_LOCAL) { + ret = thread_pool->offer(task); + } else { + ret = remote_thread_pool->offer(task); + } + + if (ret) { + olap_scanners.erase(iter++); + } else { + LOG(FATAL) << "Failed to assign scanner task to thread pool!"; + } + ++_total_assign_num; } - ++_total_assign_num; } return assigned_thread_num;