[Feature](group commit) move group_commit_interval_ms from be.conf to table property (#27116)
This commit is contained in:
@ -51,10 +51,10 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo
|
||||
*eos = false;
|
||||
std::unique_lock l(mutex);
|
||||
if (!need_commit) {
|
||||
auto left_milliseconds = config::group_commit_interval_ms -
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::steady_clock::now() - _start_time)
|
||||
.count();
|
||||
auto left_milliseconds =
|
||||
_group_commit_interval_ms - std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::steady_clock::now() - _start_time)
|
||||
.count();
|
||||
if (left_milliseconds <= 0) {
|
||||
need_commit = true;
|
||||
}
|
||||
@ -62,9 +62,9 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo
|
||||
while (_status.ok() && _block_queue.empty() &&
|
||||
(!need_commit || (need_commit && !_load_ids.empty()))) {
|
||||
CHECK(*_single_block_queue_bytes == 0);
|
||||
auto left_milliseconds = config::group_commit_interval_ms;
|
||||
auto left_milliseconds = _group_commit_interval_ms;
|
||||
if (!need_commit) {
|
||||
left_milliseconds = config::group_commit_interval_ms -
|
||||
left_milliseconds = _group_commit_interval_ms -
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::steady_clock::now() - _start_time)
|
||||
.count();
|
||||
@ -251,7 +251,7 @@ Status GroupCommitTable::_create_group_commit_load(
|
||||
{
|
||||
load_block_queue = std::make_shared<LoadBlockQueue>(
|
||||
instance_id, label, txn_id, schema_version, _all_block_queues_bytes,
|
||||
result.wait_internal_group_commit_finish);
|
||||
result.wait_internal_group_commit_finish, result.group_commit_interval_ms);
|
||||
std::unique_lock l(_lock);
|
||||
_load_block_queues.emplace(instance_id, load_block_queue);
|
||||
_need_plan_fragment = false;
|
||||
|
||||
Reference in New Issue
Block a user