From 97996c9275ceffce1dd4a11bdfd8b1831d8e5624 Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Fri, 1 Jul 2022 15:29:26 +0800 Subject: [PATCH] [fix](Insert) fix 5 concurrent "insert...select..." OOM (#10501) * [hotfix](dev-1.0.1) 5 concurrent insert...select... OOM Co-authored-by: minghong Co-authored-by: yiguolei --- be/src/common/config.h | 2 +- be/src/exec/tablet_sink.h | 12 ++++++++++++ be/src/olap/delta_writer.h | 2 ++ be/src/runtime/tablets_channel.cpp | 1 + be/src/vec/core/block.h | 9 +++++++++ be/src/vec/exec/volap_scan_node.cpp | 8 ++++---- be/src/vec/exec/volap_scanner.cpp | 4 +--- be/src/vec/sink/vtablet_sink.cpp | 17 ++++++++++++++++- be/src/vec/sink/vtablet_sink.h | 2 ++ 9 files changed, 48 insertions(+), 9 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 8d655185c2..c1b91827a9 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -490,7 +490,7 @@ CONF_mInt64(memtable_max_buffer_size, "419430400"); // impact the load performance when user upgrading Doris. // user should set these configs properly if necessary. CONF_Int64(load_process_max_memory_limit_bytes, "107374182400"); // 100GB -CONF_Int32(load_process_max_memory_limit_percent, "80"); // 80% +CONF_Int32(load_process_max_memory_limit_percent, "50"); // 50% // result buffer cancelled time (unit: second) CONF_mInt32(result_buffer_cancelled_interval_time, "300"); diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index ad691cfc48..342b028524 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -233,8 +233,11 @@ public: _node_info.brpc_port); } + size_t get_pending_bytes() { return _pending_batches_bytes; } + protected: void _cancel_with_msg(const std::string& msg); + virtual void _close_check(); protected: @@ -343,9 +346,18 @@ public: size_t num_node_channels() const { return _node_channels.size(); } + size_t get_pending_bytes() const { + size_t mem_consumption = 0; + for (auto& kv : _node_channels) { + mem_consumption += kv.second->get_pending_bytes(); + } + return mem_consumption; + } + private: friend class NodeChannel; friend class VNodeChannel; + friend class VOlapTableSink; OlapTableSink* _parent; int64_t _index_id; diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index 3d49c302d3..1e2e9b6ed0 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -91,6 +91,8 @@ public: int32_t schema_hash() { return _tablet->schema_hash(); } + int64_t memtable_consumption() const; + int64_t save_mem_consumption_snapshot(); int64_t get_mem_consumption_snapshot() const; diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 566d90fb7f..2e3945bb80 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -179,6 +179,7 @@ Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) { // If we flush all the tablets at this time, each tablet will generate a lot of small files. // So here we only flush part of the tablet, and the next time the reduce memory operation is triggered, // the tablet that has not been flushed before will accumulate more data, thereby reducing the number of flushes. + int64_t mem_to_flushed = mem_limit / 3; int counter = 0; int64_t sum = 0; diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index c15a6728c4..1d51cbf733 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -449,6 +449,15 @@ public: void clear_column_data() noexcept; size_t allocated_bytes() const; + + size_t bytes() const { + size_t res = 0; + for (const auto& elem : _columns) { + res += elem->byte_size(); + } + + return res; + } }; } // namespace vectorized diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index 2933f3f821..1f7671675f 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ b/be/src/vec/exec/volap_scan_node.cpp @@ -495,9 +495,9 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { // Has to wait at least one full block, or it will cause a lot of schedule task in priority // queue, it will affect query latency and query concurrency for example ssb 3.3. - while (!eos && ((raw_rows_read < raw_rows_threshold && raw_bytes_read < raw_bytes_threshold && - get_free_block) || - num_rows_in_block < _runtime_state->batch_size())) { + while (!eos && raw_bytes_read < raw_bytes_threshold && + ((raw_rows_read < raw_rows_threshold && get_free_block) || + num_rows_in_block < _runtime_state->batch_size())) { if (UNLIKELY(_transfer_done)) { eos = true; status = Status::Cancelled("Cancelled"); @@ -516,7 +516,7 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { break; } - raw_bytes_read += block->allocated_bytes(); + raw_bytes_read += block->bytes(); num_rows_in_block += block->rows(); // 4. if status not ok, change status_. if (UNLIKELY(block->rows() == 0)) { diff --git a/be/src/vec/exec/volap_scanner.cpp b/be/src/vec/exec/volap_scanner.cpp index 3ef387b75b..bd2728f988 100644 --- a/be/src/vec/exec/volap_scanner.cpp +++ b/be/src/vec/exec/volap_scanner.cpp @@ -272,7 +272,6 @@ Status VOlapScanner::get_block(RuntimeState* state, vectorized::Block* block, bo SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(_mem_tracker); int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num; - int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; if (!block->mem_reuse()) { for (const auto slot_desc : _tuple_desc->slots()) { block->insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(), @@ -297,8 +296,7 @@ Status VOlapScanner::get_block(RuntimeState* state, vectorized::Block* block, bo _update_realtime_counter(); RETURN_IF_ERROR( VExprContext::filter_block(_vconjunct_ctx, block, _tuple_desc->slots().size())); - } while (block->rows() == 0 && !(*eof) && raw_rows_read() < raw_rows_threshold && - block->allocated_bytes() < raw_bytes_threshold); + } while (block->rows() == 0 && !(*eof) && raw_rows_read() < raw_rows_threshold); } // NOTE: // There is no need to check raw_bytes_threshold since block->rows() == 0 is checked first. diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 96a2b8c94b..ef334df09e 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -168,7 +168,8 @@ Status VNodeChannel::add_row(const BlockRow& block_row, int64_t tablet_id) { _cur_mutable_block->add_row(block_row.first, block_row.second); _cur_add_block_request.add_tablet_ids(tablet_id); - if (_cur_mutable_block->rows() == _batch_size) { + if (_cur_mutable_block->rows() == _batch_size || + _cur_mutable_block->bytes() > config::doris_scanner_row_bytes) { { SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns); std::lock_guard l(_pending_batches_lock); @@ -373,6 +374,13 @@ Status VOlapTableSink::open(RuntimeState* state) { return OlapTableSink::open(state); } +size_t VOlapTableSink::get_pending_bytes() const { + size_t mem_consumption = 0; + for (auto& indexChannel : _channels) { + mem_consumption += indexChannel->get_pending_bytes(); + } + return mem_consumption; +} Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block) { SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); Status status = Status::OK(); @@ -426,6 +434,13 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block) if (findTabletMode == FindTabletMode::FIND_TABLET_EVERY_BATCH) { _partition_to_tablet_map.clear(); } + + //if pending bytes is more than 500M, wait + //constexpr size_t MAX_PENDING_BYTES = 500 * 1024 * 1024; + //while (get_pending_bytes() > MAX_PENDING_BYTES) { + // std::this_thread::sleep_for(std::chrono::microseconds(500)); + //} + for (int i = 0; i < num_rows; ++i) { if (filtered_rows > 0 && _filter_bitmap.Get(i)) { continue; diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h index f0286bd4a5..36943473a8 100644 --- a/be/src/vec/sink/vtablet_sink.h +++ b/be/src/vec/sink/vtablet_sink.h @@ -89,6 +89,8 @@ public: using OlapTableSink::send; Status send(RuntimeState* state, vectorized::Block* block) override; + size_t get_pending_bytes() const; + private: // make input data valid for OLAP table // return number of invalid/filtered rows.