diff --git a/be/src/olap/wal_manager.cpp b/be/src/olap/wal_manager.cpp index 10af149445..abb7d8a324 100644 --- a/be/src/olap/wal_manager.cpp +++ b/be/src/olap/wal_manager.cpp @@ -41,6 +41,7 @@ WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list) : _exec_env(exec_env), _stop_background_threads_latch(1), _stop(false) { doris::vectorized::WalReader::string_split(wal_dir_list, ",", _wal_dirs); _all_wal_disk_bytes = std::make_shared(0); + _cv = std::make_shared(); } WalManager::~WalManager() { @@ -199,7 +200,7 @@ Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr& RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(base_path)); } LOG(INFO) << "create wal " << wal_path; - wal_writer = std::make_shared(wal_path, _all_wal_disk_bytes); + wal_writer = std::make_shared(wal_path, _all_wal_disk_bytes, _cv); RETURN_IF_ERROR(wal_writer->init()); { std::lock_guard wrlock(_wal_lock); @@ -207,6 +208,7 @@ Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr& } return Status::OK(); } + Status WalManager::scan_wals(const std::string& wal_path) { size_t count = 0; bool exists = true; @@ -336,13 +338,11 @@ Status WalManager::delete_wal(int64_t wal_id) { { std::lock_guard wrlock(_wal_lock); if (_wal_id_to_writer_map.find(wal_id) != _wal_id_to_writer_map.end()) { - _all_wal_disk_bytes->store( - _all_wal_disk_bytes->fetch_sub(_wal_id_to_writer_map[wal_id]->disk_bytes(), - std::memory_order_relaxed), - std::memory_order_relaxed); - _wal_id_to_writer_map[wal_id]->cv.notify_one(); + _all_wal_disk_bytes->fetch_sub(_wal_id_to_writer_map[wal_id]->disk_bytes(), + std::memory_order_relaxed); + _cv->notify_one(); std::string wal_path = _wal_path_map[wal_id]; - LOG(INFO) << "wal delete file=" << wal_path << ", this file disk usage is" + LOG(INFO) << "wal delete file=" << wal_path << ", this file disk usage is " << _wal_id_to_writer_map[wal_id]->disk_bytes() << " ,after deleting it, all wals disk usage is " << _all_wal_disk_bytes->load(std::memory_order_relaxed); diff --git a/be/src/olap/wal_manager.h b/be/src/olap/wal_manager.h index cf4589fbf0..2cfaaa4ec5 100644 --- a/be/src/olap/wal_manager.h +++ b/be/src/olap/wal_manager.h @@ -17,6 +17,7 @@ #include +#include #include #include "common/config.h" @@ -85,5 +86,6 @@ private: std::unordered_map> _wal_status_queues; std::atomic _stop; std::unordered_map&> _wal_column_id_map; + std::shared_ptr _cv; }; } // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_writer.cpp b/be/src/olap/wal_writer.cpp index 52202b35c3..9d3da90d88 100644 --- a/be/src/olap/wal_writer.cpp +++ b/be/src/olap/wal_writer.cpp @@ -18,6 +18,8 @@ #include "olap/wal_writer.h" #include +#include +#include #include "common/config.h" #include "io/fs/file_writer.h" @@ -32,8 +34,13 @@ const char* k_wal_magic = "WAL1"; const uint32_t k_wal_magic_length = 4; WalWriter::WalWriter(const std::string& file_name, - const std::shared_ptr& all_wal_disk_bytes) - : _file_name(file_name), _disk_bytes(0), _all_wal_disk_bytes(all_wal_disk_bytes) {} + const std::shared_ptr& all_wal_disk_bytes, + const std::shared_ptr& cv) + : cv(cv), + _file_name(file_name), + _disk_bytes(0), + _all_wal_disk_bytes(all_wal_disk_bytes), + _is_first_append_blocks(true) {} WalWriter::~WalWriter() {} @@ -52,9 +59,19 @@ Status WalWriter::finalize() { Status WalWriter::append_blocks(const PBlockArray& blocks) { { - std::unique_lock l(_mutex); - while (_all_wal_disk_bytes->load(std::memory_order_relaxed) > config::wal_max_disk_size) { - cv.wait_for(l, std::chrono::milliseconds(WalWriter::MAX_WAL_WRITE_WAIT_TIME)); + if (_is_first_append_blocks) { + _is_first_append_blocks = false; + std::unique_lock l(_mutex); + while (_all_wal_disk_bytes->load(std::memory_order_relaxed) > + config::wal_max_disk_size) { + LOG(INFO) << "First time to append blocks to wal file " << _file_name + << ". Currently, all wal disk space usage is " + << _all_wal_disk_bytes->load(std::memory_order_relaxed) + << ", larger than the maximum limit " << config::wal_max_disk_size + << ", so we need to wait. When any other load finished, that wal will be " + "removed, the space used by that wal will be free."; + cv->wait_for(l, std::chrono::milliseconds(WalWriter::MAX_WAL_WRITE_WAIT_TIME)); + } } } size_t total_size = 0; @@ -82,11 +99,8 @@ Status WalWriter::append_blocks(const PBlockArray& blocks) { "failed to write block to wal expected= " + std::to_string(total_size) + ",actually=" + std::to_string(offset)); } - _disk_bytes.store(_disk_bytes.fetch_add(total_size, std::memory_order_relaxed), - std::memory_order_relaxed); - _all_wal_disk_bytes->store( - _all_wal_disk_bytes->fetch_add(total_size, std::memory_order_relaxed), - std::memory_order_relaxed); + _disk_bytes.fetch_add(total_size, std::memory_order_relaxed); + _all_wal_disk_bytes->fetch_add(total_size, std::memory_order_relaxed); return Status::OK(); } diff --git a/be/src/olap/wal_writer.h b/be/src/olap/wal_writer.h index 058902783e..88ff465976 100644 --- a/be/src/olap/wal_writer.h +++ b/be/src/olap/wal_writer.h @@ -35,7 +35,8 @@ extern const uint32_t k_wal_magic_length; class WalWriter { public: explicit WalWriter(const std::string& file_name, - const std::shared_ptr& all_wal_disk_bytes); + const std::shared_ptr& all_wal_disk_bytes, + const std::shared_ptr& cv); ~WalWriter(); Status init(); @@ -50,7 +51,7 @@ public: public: static const int64_t LENGTH_SIZE = 8; static const int64_t CHECKSUM_SIZE = 4; - std::condition_variable cv; + std::shared_ptr cv; static const int64_t VERSION_SIZE = 4; private: @@ -60,6 +61,7 @@ private: std::atomic_size_t _disk_bytes; std::shared_ptr _all_wal_disk_bytes; std::mutex _mutex; + bool _is_first_append_blocks; }; } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 375af4db6c..c044ccca3b 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -18,6 +18,9 @@ #include "runtime/group_commit_mgr.h" #include +#include + +#include #include "client_cache.h" #include "common/config.h" @@ -33,14 +36,15 @@ Status LoadBlockQueue::add_block(std::shared_ptr block) DCHECK(block->get_schema_version() == schema_version); std::unique_lock l(mutex); RETURN_IF_ERROR(_status); - while (*_all_block_queues_bytes > config::group_commit_max_queue_size) { + while (_all_block_queues_bytes->load(std::memory_order_relaxed) > + config::group_commit_max_queue_size) { _put_cond.wait_for( l, std::chrono::milliseconds(LoadBlockQueue::MAX_BLOCK_QUEUE_ADD_WAIT_TIME)); } if (block->rows() > 0) { _block_queue.push_back(block); - *_all_block_queues_bytes += block->bytes(); - *_single_block_queue_bytes += block->bytes(); + _all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed); + _single_block_queue_bytes->fetch_add(block->bytes(), std::memory_order_relaxed); } _get_cond.notify_all(); return Status::OK(); @@ -81,11 +85,11 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo fblock->swap_future_block(future_block); *find_block = true; _block_queue.pop_front(); - *_all_block_queues_bytes -= fblock->bytes(); - *_single_block_queue_bytes -= block->bytes(); + _all_block_queues_bytes->fetch_sub(fblock->bytes(), std::memory_order_relaxed); + _single_block_queue_bytes->fetch_sub(block->bytes(), std::memory_order_relaxed); } if (_block_queue.empty() && need_commit && _load_ids.empty()) { - CHECK(*_single_block_queue_bytes == 0); + CHECK_EQ(_single_block_queue_bytes->load(), 0); *eos = true; } else { *eos = false; @@ -121,8 +125,8 @@ void LoadBlockQueue::cancel(const Status& st) { auto& future_block = _block_queue.front(); std::unique_lock l0(*(future_block->lock)); future_block->set_result(st, future_block->rows(), 0); - *_all_block_queues_bytes -= future_block->bytes(); - *_single_block_queue_bytes -= future_block->bytes(); + _all_block_queues_bytes->fetch_sub(future_block->bytes(), std::memory_order_relaxed); + _single_block_queue_bytes->fetch_sub(future_block->bytes(), std::memory_order_relaxed); future_block->cv->notify_all(); } _block_queue.pop_front(); diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index 0730cb2a53..266fdf72f0 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -47,6 +47,7 @@ Status GroupCommitBlockSink::init(const TDataSink& t_sink) { _db_id = table_sink.db_id; _table_id = table_sink.table_id; _base_schema_version = table_sink.base_schema_version; + _load_id = table_sink.load_id; return Status::OK(); } @@ -155,8 +156,8 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state, std::make_shared(); future_block->swap(*(output_block.get())); TUniqueId load_id; - load_id.__set_hi(load_id.hi); - load_id.__set_lo(load_id.lo); + load_id.__set_hi(_load_id.hi); + load_id.__set_lo(_load_id.lo); future_block->set_info(_base_schema_version, load_id); if (_load_block_queue == nullptr) { RETURN_IF_ERROR(state->exec_env()->group_commit_mgr()->get_first_block_load_queue( diff --git a/be/test/olap/wal_manager_test.cpp b/be/test/olap/wal_manager_test.cpp index ec387680a6..64588dc5d1 100644 --- a/be/test/olap/wal_manager_test.cpp +++ b/be/test/olap/wal_manager_test.cpp @@ -80,7 +80,8 @@ public: void createWal(const std::string& wal_path) { std::shared_ptr _all_wal_disk_bytes = std::make_shared(0); - auto wal_writer = WalWriter(wal_path, _all_wal_disk_bytes); + std::shared_ptr cv = std::make_shared(); + auto wal_writer = WalWriter(wal_path, _all_wal_disk_bytes, cv); static_cast(wal_writer.init()); static_cast(wal_writer.finalize()); } diff --git a/be/test/olap/wal_reader_writer_test.cpp b/be/test/olap/wal_reader_writer_test.cpp index 09460477e3..1d1102f350 100644 --- a/be/test/olap/wal_reader_writer_test.cpp +++ b/be/test/olap/wal_reader_writer_test.cpp @@ -92,7 +92,8 @@ TEST_F(WalReaderWriterTest, TestWriteAndRead1) { std::string file_name = _s_test_data_path + "/abcd123.txt"; std::shared_ptr _all_wal_disk_bytes = std::make_shared(0); - auto wal_writer = WalWriter(file_name, _all_wal_disk_bytes); + std::shared_ptr cv = std::make_shared(); + auto wal_writer = WalWriter(file_name, _all_wal_disk_bytes, cv); static_cast(wal_writer.init()); size_t file_len = 0; int64_t file_size = -1; diff --git a/regression-test/data/load_p0/stream_load/test_group_commit_and_wal_back_pressure.csv.gz b/regression-test/data/load_p0/stream_load/test_group_commit_and_wal_back_pressure.csv.gz new file mode 100644 index 0000000000..539fb01b41 Binary files /dev/null and b/regression-test/data/load_p0/stream_load/test_group_commit_and_wal_back_pressure.csv.gz differ diff --git a/regression-test/data/load_p0/stream_load/test_group_commit_and_wal_back_pressure.out b/regression-test/data/load_p0/stream_load/test_group_commit_and_wal_back_pressure.out new file mode 100644 index 0000000000..4b064bec44 --- /dev/null +++ b/regression-test/data/load_p0/stream_load/test_group_commit_and_wal_back_pressure.out @@ -0,0 +1,10 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !1 -- +1 + +-- !2 -- +1 + +-- !3 -- +1 + diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy new file mode 100644 index 0000000000..910589df11 --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_group_commit_and_wal_back_pressure", "p2") { + + def tableName = "test_group_commit_and_wal_back_pressure" + sql """ DROP TABLE IF EXISTS ${tableName}1 """ + sql """ + CREATE TABLE ${tableName}1 ( + k bigint, + v string + ) + UNIQUE KEY(k) + DISTRIBUTED BY HASH (k) BUCKETS 32 + PROPERTIES( + "replication_num" = "1" + ); + """ + + sql """ DROP TABLE IF EXISTS ${tableName}2 """ + sql """ + CREATE TABLE ${tableName}2 ( + k bigint, + v string + ) + UNIQUE KEY(k) + DISTRIBUTED BY HASH (k) BUCKETS 32 + PROPERTIES( + "replication_num" = "1" + ); + """ + + sql """ DROP TABLE IF EXISTS ${tableName}3 """ + sql """ + CREATE TABLE ${tableName}3 ( + k bigint, + v string + ) + UNIQUE KEY(k) + DISTRIBUTED BY HASH (k) BUCKETS 32 + PROPERTIES( + "replication_num" = "1" + ); + """ + + def t1 = [] + for (int i = 0; i < 20; i++) { + t1.add(Thread.startDaemon { + streamLoad { + table "${tableName}1" + + set 'column_separator', ',' + set 'compress_type', 'GZ' + set 'format', 'csv' + set 'group_commit', 'true' + unset 'label' + + file 'test_group_commit_and_wal_back_pressure.csv.gz' + time 100000 + } + }) + } + + def t2 = [] + for (int i = 0; i < 20; i++) { + t2.add(Thread.startDaemon { + streamLoad { + table "${tableName}2" + + set 'column_separator', ',' + set 'compress_type', 'GZ' + set 'format', 'csv' + set 'group_commit', 'true' + unset 'label' + + file 'test_group_commit_and_wal_back_pressure.csv.gz' + time 100000 + } + }) + } + + def t3 = [] + for (int i = 0; i < 20; i++) { + t3.add(Thread.startDaemon { + streamLoad { + table "${tableName}3" + + set 'column_separator', ',' + set 'compress_type', 'GZ' + set 'format', 'csv' + set 'group_commit', 'true' + unset 'label' + + file 'test_group_commit_and_wal_back_pressure.csv.gz' + time 100000 + } + }) + } + + for (Thread th in t1) { + th.join() + } + + for (Thread th in t2) { + th.join() + } + + for (Thread th in t3) { + th.join() + } + + sql "sync" + + qt_1 """ select count(*) from ${tableName}1;""" + + qt_2 """ select count(*) from ${tableName}2;""" + + qt_3 """ select count(*) from ${tableName}3;""" + +} \ No newline at end of file