[enhancement](group commit)Add group commit block queues memory back pressure (#26045)
This commit is contained in:
@ -1116,6 +1116,9 @@ DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
|
||||
// Dir of default timezone files
|
||||
DEFINE_String(default_tzfiles_path, "${DORIS_HOME}/zoneinfo");
|
||||
|
||||
// Max size(bytes) of group commit queues, used for mem back pressure.
|
||||
DEFINE_Int32(group_commit_max_queue_size, "65536");
|
||||
|
||||
// clang-format off
|
||||
#ifdef BE_TEST
|
||||
// test s3
|
||||
|
||||
@ -1186,6 +1186,9 @@ DECLARE_Bool(ignore_always_true_predicate_for_segment);
|
||||
// Dir of default timezone files
|
||||
DECLARE_String(default_tzfiles_path);
|
||||
|
||||
// Max size(bytes) of group commit queues, used for mem back pressure.
|
||||
DECLARE_Int32(group_commit_max_queue_size);
|
||||
|
||||
#ifdef BE_TEST
|
||||
// test s3
|
||||
DECLARE_String(test_s3_resource);
|
||||
|
||||
@ -23,7 +23,11 @@
|
||||
#include <gen_cpp/PaloInternalService_types.h>
|
||||
#include <gen_cpp/Types_types.h>
|
||||
|
||||
#include <memory>
|
||||
#include <numeric>
|
||||
|
||||
#include "client_cache.h"
|
||||
#include "common/config.h"
|
||||
#include "common/object_pool.h"
|
||||
#include "exec/data_sink.h"
|
||||
#include "io/fs/stream_load_pipe.h"
|
||||
@ -34,6 +38,7 @@
|
||||
#include "runtime/stream_load/new_load_stream_mgr.h"
|
||||
#include "runtime/stream_load/stream_load_context.h"
|
||||
#include "util/thrift_rpc_helper.h"
|
||||
#include "vec/core/future_block.h"
|
||||
#include "vec/exec/scan/new_file_scan_node.h"
|
||||
#include "vec/sink/group_commit_block_sink.h"
|
||||
|
||||
@ -45,10 +50,16 @@ Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::FutureBlock> block)
|
||||
DCHECK(block->get_schema_version() == schema_version);
|
||||
std::unique_lock l(*_mutex);
|
||||
RETURN_IF_ERROR(_status);
|
||||
while (*_all_block_queues_bytes > config::group_commit_max_queue_size) {
|
||||
_put_cond.wait_for(
|
||||
l, std::chrono::milliseconds(LoadBlockQueue::MAX_BLOCK_QUEUE_ADD_WAIT_TIME));
|
||||
}
|
||||
if (block->rows() > 0) {
|
||||
_block_queue.push_back(block);
|
||||
*_all_block_queues_bytes += block->bytes();
|
||||
*_single_block_queue_bytes += block->bytes();
|
||||
}
|
||||
_cv->notify_one();
|
||||
_get_cond.notify_all();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -67,6 +78,7 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo
|
||||
}
|
||||
while (_status.ok() && _block_queue.empty() &&
|
||||
(!need_commit || (need_commit && !_load_ids.empty()))) {
|
||||
CHECK(*_single_block_queue_bytes == 0);
|
||||
auto left_milliseconds = config::group_commit_interval_ms;
|
||||
if (!need_commit) {
|
||||
left_milliseconds = config::group_commit_interval_ms -
|
||||
@ -79,9 +91,9 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo
|
||||
}
|
||||
}
|
||||
#if !defined(USE_BTHREAD_SCANNER)
|
||||
_cv->wait_for(l, std::chrono::milliseconds(left_milliseconds));
|
||||
_get_cond.wait_for(l, std::chrono::milliseconds(left_milliseconds));
|
||||
#else
|
||||
_cv->wait_for(l, left_milliseconds * 1000);
|
||||
_get_cond.wait_for(l, left_milliseconds * 1000);
|
||||
#endif
|
||||
}
|
||||
if (!_block_queue.empty()) {
|
||||
@ -90,12 +102,16 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo
|
||||
fblock->swap_future_block(future_block);
|
||||
*find_block = true;
|
||||
_block_queue.pop_front();
|
||||
*_all_block_queues_bytes -= fblock->bytes();
|
||||
*_single_block_queue_bytes -= block->bytes();
|
||||
}
|
||||
if (_block_queue.empty() && need_commit && _load_ids.empty()) {
|
||||
CHECK(*_single_block_queue_bytes == 0);
|
||||
*eos = true;
|
||||
} else {
|
||||
*eos = false;
|
||||
}
|
||||
_put_cond.notify_all();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -103,7 +119,7 @@ void LoadBlockQueue::remove_load_id(const UniqueId& load_id) {
|
||||
std::unique_lock l(*_mutex);
|
||||
if (_load_ids.find(load_id) != _load_ids.end()) {
|
||||
_load_ids.erase(load_id);
|
||||
_cv->notify_one();
|
||||
_get_cond.notify_all();
|
||||
}
|
||||
}
|
||||
|
||||
@ -126,6 +142,8 @@ void LoadBlockQueue::cancel(const Status& st) {
|
||||
auto& future_block = _block_queue.front();
|
||||
std::unique_lock<doris::Mutex> l0(*(future_block->lock));
|
||||
future_block->set_result(st, future_block->rows(), 0);
|
||||
*_all_block_queues_bytes -= future_block->bytes();
|
||||
*_single_block_queue_bytes -= future_block->bytes();
|
||||
future_block->cv->notify_all();
|
||||
}
|
||||
_block_queue.pop_front();
|
||||
@ -248,8 +266,8 @@ Status GroupCommitTable::_create_group_commit_load(
|
||||
<< ", txn_id=" << txn_id << ", instance_id=" << print_id(instance_id)
|
||||
<< ", is_pipeline=" << is_pipeline;
|
||||
{
|
||||
load_block_queue =
|
||||
std::make_shared<LoadBlockQueue>(instance_id, label, txn_id, schema_version);
|
||||
load_block_queue = std::make_shared<LoadBlockQueue>(
|
||||
instance_id, label, txn_id, schema_version, _all_block_queues_bytes);
|
||||
std::unique_lock l(_lock);
|
||||
_load_block_queues.emplace(instance_id, load_block_queue);
|
||||
_need_plan_fragment = false;
|
||||
@ -398,6 +416,7 @@ GroupCommitMgr::GroupCommitMgr(ExecEnv* exec_env) : _exec_env(exec_env) {
|
||||
.set_min_threads(1)
|
||||
.set_max_threads(config::group_commit_insert_threads)
|
||||
.build(&_thread_pool));
|
||||
_all_block_queues_bytes = std::make_shared<std::atomic_size_t>(0);
|
||||
}
|
||||
|
||||
GroupCommitMgr::~GroupCommitMgr() {
|
||||
@ -536,7 +555,8 @@ Status GroupCommitMgr::get_first_block_load_queue(
|
||||
std::lock_guard wlock(_lock);
|
||||
if (_table_map.find(table_id) == _table_map.end()) {
|
||||
_table_map.emplace(table_id, std::make_shared<GroupCommitTable>(
|
||||
_exec_env, _thread_pool.get(), db_id, table_id));
|
||||
_exec_env, _thread_pool.get(), db_id, table_id,
|
||||
_all_block_queues_bytes));
|
||||
}
|
||||
group_commit_table = _table_map[table_id];
|
||||
}
|
||||
|
||||
@ -19,6 +19,9 @@
|
||||
|
||||
#include <gen_cpp/PaloInternalService_types.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <memory>
|
||||
|
||||
#include "common/status.h"
|
||||
#include "io/fs/stream_load_pipe.h"
|
||||
#include "util/lock.h"
|
||||
@ -41,14 +44,16 @@ class StreamLoadPipe;
|
||||
class LoadBlockQueue {
|
||||
public:
|
||||
LoadBlockQueue(const UniqueId& load_instance_id, std::string& label, int64_t txn_id,
|
||||
int64_t schema_version)
|
||||
int64_t schema_version,
|
||||
std::shared_ptr<std::atomic_size_t> all_block_queues_bytes)
|
||||
: load_instance_id(load_instance_id),
|
||||
label(label),
|
||||
txn_id(txn_id),
|
||||
schema_version(schema_version),
|
||||
_start_time(std::chrono::steady_clock::now()) {
|
||||
_start_time(std::chrono::steady_clock::now()),
|
||||
_all_block_queues_bytes(all_block_queues_bytes) {
|
||||
_mutex = std::make_shared<doris::Mutex>();
|
||||
_cv = std::make_shared<doris::ConditionVariable>();
|
||||
_single_block_queue_bytes = std::make_shared<std::atomic_size_t>(0);
|
||||
};
|
||||
|
||||
Status add_block(std::shared_ptr<vectorized::FutureBlock> block);
|
||||
@ -57,6 +62,7 @@ public:
|
||||
void remove_load_id(const UniqueId& load_id);
|
||||
void cancel(const Status& st);
|
||||
|
||||
static constexpr size_t MAX_BLOCK_QUEUE_ADD_WAIT_TIME = 1000;
|
||||
UniqueId load_instance_id;
|
||||
std::string label;
|
||||
int64_t txn_id;
|
||||
@ -67,19 +73,28 @@ private:
|
||||
std::chrono::steady_clock::time_point _start_time;
|
||||
|
||||
std::shared_ptr<doris::Mutex> _mutex;
|
||||
std::shared_ptr<doris::ConditionVariable> _cv;
|
||||
doris::ConditionVariable _put_cond;
|
||||
doris::ConditionVariable _get_cond;
|
||||
// the set of load ids of all blocks in this queue
|
||||
std::set<UniqueId> _load_ids;
|
||||
std::list<std::shared_ptr<vectorized::FutureBlock>> _block_queue;
|
||||
|
||||
Status _status = Status::OK();
|
||||
// memory consumption of all tables' load block queues, used for back pressure.
|
||||
std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
|
||||
// memory consumption of one load block queue, used for correctness check.
|
||||
std::shared_ptr<std::atomic_size_t> _single_block_queue_bytes;
|
||||
};
|
||||
|
||||
class GroupCommitTable {
|
||||
public:
|
||||
GroupCommitTable(ExecEnv* exec_env, doris::ThreadPool* thread_pool, int64_t db_id,
|
||||
int64_t table_id)
|
||||
: _exec_env(exec_env), _thread_pool(thread_pool), _db_id(db_id), _table_id(table_id) {};
|
||||
int64_t table_id, std::shared_ptr<std::atomic_size_t> all_block_queue_bytes)
|
||||
: _exec_env(exec_env),
|
||||
_thread_pool(thread_pool),
|
||||
_db_id(db_id),
|
||||
_table_id(table_id),
|
||||
_all_block_queues_bytes(all_block_queue_bytes) {};
|
||||
Status get_first_block_load_queue(int64_t table_id,
|
||||
std::shared_ptr<vectorized::FutureBlock> block,
|
||||
std::shared_ptr<LoadBlockQueue>& load_block_queue);
|
||||
@ -105,6 +120,8 @@ private:
|
||||
// fragment_instance_id to load_block_queue
|
||||
std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>> _load_block_queues;
|
||||
bool _need_plan_fragment = false;
|
||||
// memory consumption of all tables' load block queues, used for back pressure.
|
||||
std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
|
||||
};
|
||||
|
||||
class GroupCommitMgr {
|
||||
@ -142,6 +159,8 @@ private:
|
||||
// thread pool to handle insert into: append data to pipe
|
||||
std::unique_ptr<doris::ThreadPool> _insert_into_thread_pool;
|
||||
std::unique_ptr<doris::ThreadPool> _thread_pool;
|
||||
// memory consumption of all tables' load block queues, used for back pressure.
|
||||
std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
Reference in New Issue
Block a user