diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 38e599180e..d5daf2af53 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -62,9 +62,27 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
     RETURN_IF_ERROR(status);
     if (block->rows() > 0) {
         if (!config::group_commit_wait_replay_wal_finish) {
-            _block_queue.push_back(block);
+            _block_queue.emplace_back(block);
             _data_bytes += block->bytes();
+            // snapshot before fetch_add so the debug log can show the delta;
+            // auto (not int) to avoid narrowing the atomic's value type
+            auto before_block_queues_bytes = _all_block_queues_bytes->load();
             _all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed);
+            std::stringstream ss;
+            ss << "[";
+            for (const auto& id : _load_ids) {
+                ss << id.to_string() << ", ";
+            }
+            ss << "]";
+            VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::add_block). "
+                       << "block queue size is " << _block_queue.size() << ", block rows is "
+                       << block->rows() << ", block bytes is " << block->bytes()
+                       << ", before add block, all block queues bytes is "
+                       << before_block_queues_bytes
+                       << ", after add block, all block queues bytes is "
+                       << _all_block_queues_bytes->load() << ", txn_id=" << txn_id
+                       << ", label=" << label << ", instance_id=" << load_instance_id
+                       << ", load_ids=" << ss.str() << ", runtime_state=" << runtime_state
+                       << ", the block is " << block->dump_data() << ", the block column size is "
+                       << block->columns_bytes();
         }
         if (write_wal || config::group_commit_wait_replay_wal_finish) {
             auto st = _v_wal_writer->write_wal(block.get());
@@ -132,11 +150,29 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
         return st;
     }
     if (!_block_queue.empty()) {
-        auto fblock = _block_queue.front();
-        block->swap(*fblock.get());
+        const BlockData block_data = _block_queue.front();
+        block->swap(*block_data.block);
         *find_block = true;
         _block_queue.pop_front();
-        _all_block_queues_bytes->fetch_sub(block->bytes(), std::memory_order_relaxed);
+        auto before_block_queues_bytes = _all_block_queues_bytes->load();
+        // subtract the byte count recorded at enqueue time, not block->bytes():
+        // the block contents were just swapped out above
+        _all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed);
+        std::stringstream ss;
+        ss << "[";
+        for (const auto& id : _load_ids) {
+            ss << id.to_string() << ", ";
+        }
+        ss << "]";
+        VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::get_block). "
+                   << "block queue size is " << _block_queue.size() << ", block rows is "
+                   << block->rows() << ", block bytes is " << block->bytes()
+                   << ", before remove block, all block queues bytes is "
+                   << before_block_queues_bytes
+                   << ", after remove block, all block queues bytes is "
+                   << _all_block_queues_bytes->load() << ", txn_id=" << txn_id
+                   << ", label=" << label << ", instance_id=" << load_instance_id
+                   << ", load_ids=" << ss.str() << ", runtime_state=" << runtime_state
+                   << ", the block is " << block->dump_data() << ", the block column size is "
+                   << block->columns_bytes();
     }
     if (_block_queue.empty() && _need_commit && _load_ids.empty()) {
         *eos = true;
@@ -176,10 +212,26 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) {
               << ", status=" << st.to_string();
     status = st;
     while (!_block_queue.empty()) {
-        {
-            auto& future_block = _block_queue.front();
-            _all_block_queues_bytes->fetch_sub(future_block->bytes(), std::memory_order_relaxed);
+        const BlockData& block_data = _block_queue.front();
+        auto before_block_queues_bytes = _all_block_queues_bytes->load();
+        _all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed);
+        std::stringstream ss;
+        ss << "[";
+        for (const auto& id : _load_ids) {
+            ss << id.to_string() << ", ";
         }
+        ss << "]";
+        VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::_cancel_without_lock). "
+                   << "block queue size is " << _block_queue.size() << ", block rows is "
+                   << block_data.block->rows() << ", block bytes is " << block_data.block->bytes()
+                   << ", before remove block, all block queues bytes is "
+                   << before_block_queues_bytes
+                   << ", after remove block, all block queues bytes is "
+                   << _all_block_queues_bytes->load() << ", txn_id=" << txn_id
+                   << ", label=" << label << ", instance_id=" << load_instance_id
+                   << ", load_ids=" << ss.str() << ", the block is "
+                   << block_data.block->dump_data() << ", the block column size is "
+                   << block_data.block->columns_bytes();
         _block_queue.pop_front();
     }
 }
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index 76a890f7a8..c41f6abd6f 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -41,6 +41,13 @@ class ExecEnv;
 class TUniqueId;
 class RuntimeState;
 
+// A queued block plus its byte size captured at enqueue time, so the
+// global byte counter can be decremented by exactly what was added even
+// if the block's contents are swapped out before removal.
+struct BlockData {
+    explicit BlockData(const std::shared_ptr<vectorized::Block>& block)
+            : block(block), block_bytes(block->bytes()) {}
+    std::shared_ptr<vectorized::Block> block;
+    size_t block_bytes;
+};
+
 class LoadBlockQueue {
 public:
     LoadBlockQueue(const UniqueId& load_instance_id, std::string& label, int64_t txn_id,
@@ -94,7 +101,7 @@ private:
 
     // the set of load ids of all blocks in this queue
     std::set<UniqueId> _load_ids;
-    std::list<std::shared_ptr<vectorized::Block>> _block_queue;
+    std::list<BlockData> _block_queue;
 
     // wal
     std::string _wal_base_path;
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index dd1a659ae1..d26d219e5d 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -417,6 +417,25 @@ size_t Block::bytes() const {
     return res;
 }
 
+// Debug helper: per-column byte sizes rendered as a string, e.g.
+// "column bytes: [8, 16, ]". Throws INTERNAL_ERROR if any column is null.
+std::string Block::columns_bytes() const {
+    std::stringstream res;
+    res << "column bytes: [";
+    for (const auto& elem : data) {
+        if (!elem.column) {
+            std::stringstream ss;
+            for (const auto& e : data) {
+                ss << e.name + " ";
+            }
+            throw Exception(ErrorCode::INTERNAL_ERROR,
+                            "Column {} in block is nullptr, in method columns_bytes. All Columns are {}",
+                            elem.name, ss.str());
+        }
+        res << elem.column->byte_size() << ", ";
+    }
+    res << "]";
+    return res.str();
+}
+
 size_t Block::allocated_bytes() const {
     size_t res = 0;
     for (const auto& elem : data) {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index c9b3f2d5b5..593d37f7ff 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -202,6 +202,8 @@ public:
     /// Approximate number of bytes in memory - for profiling and limits.
     size_t bytes() const;
 
+    std::string columns_bytes() const;
+
     /// Approximate number of allocated bytes in memory - for profiling and limits.
     size_t allocated_bytes() const;
 
diff --git a/regression-test/suites/insert_p0/test_group_commit_variant.groovy b/regression-test/suites/insert_p0/test_group_commit_variant.groovy
new file mode 100644
index 0000000000..15d8304bd4
--- /dev/null
+++ b/regression-test/suites/insert_p0/test_group_commit_variant.groovy
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_group_commit_variant") {
+
+    sql "set group_commit=async_mode"
+
+    def testTable = "test_group_commit_variant"
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${testTable} (
+            k bigint,
+            var variant
+        )
+        UNIQUE KEY(`k`)
+        DISTRIBUTED BY HASH (`k`) BUCKETS 5
+        properties("replication_num" = "1",
+        "disable_auto_compaction" = "false");
+    """
+
+    try {
+        sql "insert into ${testTable} (k) values (1),(2);"
+        sql "insert into ${testTable} (k) values (3),(4);"
+    } catch (Exception e) {
+        // should not throw exception
+        logger.info(e.getMessage())
+        assertTrue(false)
+    }
+}