[improve](group commit) Group commit support commit by data size (#29428)
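In short, this change lets a group commit load block queue trigger a commit not only after group_commit_interval_ms elapses, but also as soon as the bytes buffered in the queue reach group_commit_data_bytes. Below is a minimal standalone C++ sketch of that dual trigger, kept outside the real LoadBlockQueue class: the member names are taken from the diff, but the class itself and its constructor parameters are simplified assumptions for illustration only.

// Toy illustration (not the actual Doris class): a queue that asks for a
// commit either when enough data has accumulated or when the interval elapses.
#include <chrono>
#include <cstddef>
#include <cstdint>

class ToyBlockQueue {
public:
    ToyBlockQueue(int64_t max_bytes, int64_t interval_ms)
            : _group_commit_data_bytes(max_bytes),
              _group_commit_interval_ms(interval_ms),
              _start_time(std::chrono::steady_clock::now()) {}

    // Called for every block appended to the queue.
    void add_bytes(size_t bytes) {
        _data_bytes += static_cast<int64_t>(bytes);
        if (_data_bytes >= _group_commit_data_bytes) {
            _need_commit = true; // size-based trigger introduced by this commit
        }
    }

    // Time-based trigger that existed before this commit.
    bool need_commit() {
        auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
                               std::chrono::steady_clock::now() - _start_time)
                               .count();
        if (elapsed >= _group_commit_interval_ms) {
            _need_commit = true;
        }
        return _need_commit;
    }

private:
    int64_t _data_bytes = 0;
    bool _need_commit = false;
    int64_t _group_commit_data_bytes;
    int64_t _group_commit_interval_ms;
    std::chrono::steady_clock::time_point _start_time;
};

In the actual patch the size check sits at the end of LoadBlockQueue::add_block (second hunk below) and the interval check stays in LoadBlockQueue::get_block; the threshold itself arrives from the frontend plan as result.group_commit_data_bytes.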
@@ -36,11 +36,11 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
     while (!runtime_state->is_cancelled() && status.ok() &&
            _all_block_queues_bytes->load(std::memory_order_relaxed) >
                    config::group_commit_queue_mem_limit) {
-        _put_cond.wait_for(
-                l, std::chrono::milliseconds(LoadBlockQueue::MAX_BLOCK_QUEUE_ADD_WAIT_TIME));
+        _put_cond.wait_for(l,
+                           std::chrono::milliseconds(LoadBlockQueue::MEM_BACK_PRESSURE_WAIT_TIME));
         auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
                 std::chrono::steady_clock::now() - start);
-        if (duration.count() > LoadBlockQueue::WAL_MEM_BACK_PRESSURE_TIME_OUT) {
+        if (duration.count() > LoadBlockQueue::MEM_BACK_PRESSURE_WAIT_TIMEOUT) {
             return Status::TimedOut(
                     "Wal memory back pressure wait too much time! Load block queue txn id: {}, "
                     "label: {}, instance id: {}",
@@ -60,8 +60,14 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
                 return st;
             }
         }
+        _data_bytes += block->bytes();
         _all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed);
     }
+    if (_data_bytes >= _group_commit_data_bytes) {
+        VLOG_DEBUG << "group commit meets commit condition for data size, label=" << label
+                   << ", instance_id=" << load_instance_id << ", data_bytes=" << _data_bytes;
+        _need_commit = true;
+    }
     _get_cond.notify_all();
     return Status::OK();
 }
@@ -71,25 +77,25 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
     *find_block = false;
     *eos = false;
     std::unique_lock l(mutex);
-    if (!need_commit) {
+    if (!_need_commit) {
         auto left_milliseconds =
                 _group_commit_interval_ms - std::chrono::duration_cast<std::chrono::milliseconds>(
                                                     std::chrono::steady_clock::now() - _start_time)
                                                     .count();
         if (left_milliseconds <= 0) {
-            need_commit = true;
+            _need_commit = true;
         }
     }
     while (!runtime_state->is_cancelled() && status.ok() && _block_queue.empty() &&
-           (!need_commit || (need_commit && !_load_ids.empty()))) {
+           (!_need_commit || (_need_commit && !_load_ids.empty()))) {
         auto left_milliseconds = _group_commit_interval_ms;
         auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
                                 std::chrono::steady_clock::now() - _start_time)
                                 .count();
-        if (!need_commit) {
+        if (!_need_commit) {
             left_milliseconds = _group_commit_interval_ms - duration;
             if (left_milliseconds <= 0) {
-                need_commit = true;
+                _need_commit = true;
                 break;
             }
         } else {
@@ -120,7 +126,7 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
         _block_queue.pop_front();
         _all_block_queues_bytes->fetch_sub(block->bytes(), std::memory_order_relaxed);
     }
-    if (_block_queue.empty() && need_commit && _load_ids.empty()) {
+    if (_block_queue.empty() && _need_commit && _load_ids.empty()) {
         *eos = true;
     } else {
         *eos = false;
@@ -139,7 +145,7 @@ void LoadBlockQueue::remove_load_id(const UniqueId& load_id) {
 
 Status LoadBlockQueue::add_load_id(const UniqueId& load_id) {
     std::unique_lock l(mutex);
-    if (need_commit) {
+    if (_need_commit) {
         return Status::InternalError("block queue is set need commit, id=" +
                                      load_instance_id.to_string());
     }
@@ -175,7 +181,7 @@ Status GroupCommitTable::get_first_block_load_queue(
     for (int i = 0; i < 3; i++) {
         bool is_schema_version_match = true;
         for (auto it = _load_block_queues.begin(); it != _load_block_queues.end(); ++it) {
-            if (!it->second->need_commit) {
+            if (!it->second->need_commit()) {
                 if (base_schema_version == it->second->schema_version) {
                     if (it->second->add_load_id(load_id).ok()) {
                         load_block_queue = it->second;
@ -282,7 +288,8 @@ Status GroupCommitTable::_create_group_commit_load(
|
||||
{
|
||||
load_block_queue = std::make_shared<LoadBlockQueue>(
|
||||
instance_id, label, txn_id, schema_version, _all_block_queues_bytes,
|
||||
result.wait_internal_group_commit_finish, result.group_commit_interval_ms);
|
||||
result.wait_internal_group_commit_finish, result.group_commit_interval_ms,
|
||||
result.group_commit_data_bytes);
|
||||
std::unique_lock l(_lock);
|
||||
_load_block_queues.emplace(instance_id, load_block_queue);
|
||||
_need_plan_fragment = false;
|
||||
@@ -377,7 +384,7 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
     if (status.ok() && st.ok() &&
         (result_status.ok() || result_status.is<ErrorCode::PUBLISH_TIMEOUT>())) {
         RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(
-                txn_id, load_block_queue->block_queue_pre_allocated.load()));
+                txn_id, load_block_queue->block_queue_pre_allocated()));
         RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(table_id, txn_id));
     } else {
         std::string wal_path;
@@ -485,7 +492,7 @@ Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id,
                                   const std::string& import_label, WalManager* wal_manager,
                                   std::vector<TSlotDescriptor>& slot_desc, int be_exe_version) {
     RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->add_wal_path(db_id, tb_id, wal_id,
-                                                                    import_label, wal_base_path));
+                                                                    import_label, _wal_base_path));
     _v_wal_writer = std::make_shared<vectorized::VWalWriter>(
             tb_id, wal_id, import_label, wal_manager, slot_desc, be_exe_version);
     return _v_wal_writer->init();
@@ -502,17 +509,17 @@ bool LoadBlockQueue::has_enough_wal_disk_space(size_t pre_allocated) {
     auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr();
     size_t available_bytes = 0;
     {
-        Status st = wal_mgr->get_wal_dir_available_size(wal_base_path, &available_bytes);
+        Status st = wal_mgr->get_wal_dir_available_size(_wal_base_path, &available_bytes);
         if (!st.ok()) {
            LOG(WARNING) << "get wal disk available size filed!";
         }
     }
     if (pre_allocated < available_bytes) {
-        Status st = wal_mgr->update_wal_dir_pre_allocated(wal_base_path, pre_allocated, true);
+        Status st = wal_mgr->update_wal_dir_pre_allocated(_wal_base_path, pre_allocated, true);
         if (!st.ok()) {
             LOG(WARNING) << "update wal dir pre_allocated failed, reason: " << st.to_string();
         }
-        block_queue_pre_allocated.fetch_add(pre_allocated);
+        _block_queue_pre_allocated.fetch_add(pre_allocated);
         return true;
     } else {
         return false;