[refactor](group commit) remove future block (#27720)
Co-authored-by: huanghaibin <284824253@qq.com>
This commit is contained in:
@ -32,8 +32,7 @@
|
||||
|
||||
namespace doris {
|
||||
|
||||
Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::FutureBlock> block) {
|
||||
DCHECK(block->get_schema_version() == schema_version);
|
||||
Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::Block> block) {
|
||||
std::unique_lock l(mutex);
|
||||
RETURN_IF_ERROR(_status);
|
||||
while (_all_block_queues_bytes->load(std::memory_order_relaxed) >
|
||||
@ -43,6 +42,8 @@ Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::FutureBlock> block)
|
||||
}
|
||||
if (block->rows() > 0) {
|
||||
_block_queue.push_back(block);
|
||||
//write wal
|
||||
RETURN_IF_ERROR(_v_wal_writer->write_wal(block.get()));
|
||||
_all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed);
|
||||
_single_block_queue_bytes->fetch_add(block->bytes(), std::memory_order_relaxed);
|
||||
}
|
||||
@ -80,9 +81,8 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo
|
||||
_get_cond.wait_for(l, std::chrono::milliseconds(left_milliseconds));
|
||||
}
|
||||
if (!_block_queue.empty()) {
|
||||
auto& future_block = _block_queue.front();
|
||||
auto* fblock = static_cast<vectorized::FutureBlock*>(block);
|
||||
fblock->swap_future_block(future_block);
|
||||
auto fblock = _block_queue.front();
|
||||
block->swap(*fblock.get());
|
||||
*find_block = true;
|
||||
_block_queue.pop_front();
|
||||
_all_block_queues_bytes->fetch_sub(fblock->bytes(), std::memory_order_relaxed);
|
||||
@ -123,21 +123,18 @@ void LoadBlockQueue::cancel(const Status& st) {
|
||||
while (!_block_queue.empty()) {
|
||||
{
|
||||
auto& future_block = _block_queue.front();
|
||||
std::unique_lock<std::mutex> l0(*(future_block->lock));
|
||||
future_block->set_result(st, future_block->rows(), 0);
|
||||
_all_block_queues_bytes->fetch_sub(future_block->bytes(), std::memory_order_relaxed);
|
||||
_single_block_queue_bytes->fetch_sub(future_block->bytes(), std::memory_order_relaxed);
|
||||
future_block->cv->notify_all();
|
||||
}
|
||||
_block_queue.pop_front();
|
||||
}
|
||||
}
|
||||
|
||||
Status GroupCommitTable::get_first_block_load_queue(
|
||||
int64_t table_id, std::shared_ptr<vectorized::FutureBlock> block,
|
||||
std::shared_ptr<LoadBlockQueue>& load_block_queue) {
|
||||
int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
|
||||
std::shared_ptr<vectorized::Block> block, std::shared_ptr<LoadBlockQueue>& load_block_queue,
|
||||
int be_exe_version) {
|
||||
DCHECK(table_id == _table_id);
|
||||
auto base_schema_version = block->get_schema_version();
|
||||
{
|
||||
std::unique_lock l(_lock);
|
||||
for (int i = 0; i < 3; i++) {
|
||||
@ -145,7 +142,7 @@ Status GroupCommitTable::get_first_block_load_queue(
|
||||
for (auto it = _load_block_queues.begin(); it != _load_block_queues.end(); ++it) {
|
||||
if (!it->second->need_commit) {
|
||||
if (base_schema_version == it->second->schema_version) {
|
||||
if (it->second->add_load_id(block->get_load_id()).ok()) {
|
||||
if (it->second->add_load_id(load_id).ok()) {
|
||||
load_block_queue = it->second;
|
||||
return Status::OK();
|
||||
}
|
||||
@ -160,13 +157,14 @@ Status GroupCommitTable::get_first_block_load_queue(
|
||||
if (!_need_plan_fragment) {
|
||||
_need_plan_fragment = true;
|
||||
RETURN_IF_ERROR(_thread_pool->submit_func([&] {
|
||||
[[maybe_unused]] auto st = _create_group_commit_load(load_block_queue);
|
||||
[[maybe_unused]] auto st =
|
||||
_create_group_commit_load(load_block_queue, be_exe_version);
|
||||
}));
|
||||
}
|
||||
_cv.wait_for(l, std::chrono::seconds(4));
|
||||
if (load_block_queue != nullptr) {
|
||||
if (load_block_queue->schema_version == base_schema_version) {
|
||||
if (load_block_queue->add_load_id(block->get_load_id()).ok()) {
|
||||
if (load_block_queue->add_load_id(load_id).ok()) {
|
||||
return Status::OK();
|
||||
}
|
||||
} else if (base_schema_version < load_block_queue->schema_version) {
|
||||
@ -180,7 +178,7 @@ Status GroupCommitTable::get_first_block_load_queue(
|
||||
}
|
||||
|
||||
Status GroupCommitTable::_create_group_commit_load(
|
||||
std::shared_ptr<LoadBlockQueue>& load_block_queue) {
|
||||
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version) {
|
||||
Status st = Status::OK();
|
||||
std::unique_ptr<int, std::function<void(int*)>> finish_plan_func((int*)0x01, [&](int*) {
|
||||
if (!st.ok()) {
|
||||
@ -251,16 +249,16 @@ Status GroupCommitTable::_create_group_commit_load(
|
||||
std::unique_lock l(_lock);
|
||||
_load_block_queues.emplace(instance_id, load_block_queue);
|
||||
_need_plan_fragment = false;
|
||||
_cv.notify_all();
|
||||
}
|
||||
if (_exec_env->wal_mgr()->is_running()) {
|
||||
_exec_env->wal_mgr()->add_wal_status_queue(_table_id, txn_id,
|
||||
WalManager::WAL_STATUS::PREPARE);
|
||||
st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, params,
|
||||
pipeline_params);
|
||||
} else {
|
||||
st = Status::InternalError("be is stopping");
|
||||
//create wal
|
||||
RETURN_IF_ERROR(
|
||||
load_block_queue->create_wal(_db_id, _table_id, txn_id, label, _exec_env->wal_mgr(),
|
||||
params.desc_tbl.slotDescriptors, be_exe_version));
|
||||
_cv.notify_all();
|
||||
}
|
||||
st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, params,
|
||||
pipeline_params);
|
||||
if (!st.ok()) {
|
||||
static_cast<void>(_finish_group_commit_load(_db_id, _table_id, label, txn_id, instance_id,
|
||||
st, true, nullptr));
|
||||
@ -315,6 +313,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
|
||||
auto it = _load_block_queues.find(instance_id);
|
||||
if (it != _load_block_queues.end()) {
|
||||
auto& load_block_queue = it->second;
|
||||
//close wal
|
||||
RETURN_IF_ERROR(load_block_queue->close_wal());
|
||||
if (prepare_failed || !status.ok()) {
|
||||
load_block_queue->cancel(status);
|
||||
}
|
||||
@ -420,9 +420,12 @@ void GroupCommitMgr::stop() {
|
||||
LOG(INFO) << "GroupCommitMgr is stopped";
|
||||
}
|
||||
|
||||
Status GroupCommitMgr::get_first_block_load_queue(
|
||||
int64_t db_id, int64_t table_id, std::shared_ptr<vectorized::FutureBlock> block,
|
||||
std::shared_ptr<LoadBlockQueue>& load_block_queue) {
|
||||
Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t table_id,
|
||||
int64_t base_schema_version,
|
||||
const UniqueId& load_id,
|
||||
std::shared_ptr<vectorized::Block> block,
|
||||
std::shared_ptr<LoadBlockQueue>& load_block_queue,
|
||||
int be_exe_version) {
|
||||
std::shared_ptr<GroupCommitTable> group_commit_table;
|
||||
{
|
||||
std::lock_guard wlock(_lock);
|
||||
@ -433,7 +436,8 @@ Status GroupCommitMgr::get_first_block_load_queue(
|
||||
}
|
||||
group_commit_table = _table_map[table_id];
|
||||
}
|
||||
return group_commit_table->get_first_block_load_queue(table_id, block, load_block_queue);
|
||||
return group_commit_table->get_first_block_load_queue(table_id, base_schema_version, load_id,
|
||||
block, load_block_queue, be_exe_version);
|
||||
}
|
||||
|
||||
Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& instance_id,
|
||||
@ -450,4 +454,18 @@ Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& i
|
||||
}
|
||||
return group_commit_table->get_load_block_queue(instance_id, load_block_queue);
|
||||
}
|
||||
Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id,
|
||||
const std::string& import_label, WalManager* wal_manager,
|
||||
std::vector<TSlotDescriptor>& slot_desc, int be_exe_version) {
|
||||
_v_wal_writer = std::make_shared<vectorized::VWalWriter>(
|
||||
db_id, tb_id, txn_id, label, wal_manager, slot_desc, be_exe_version);
|
||||
return _v_wal_writer->init();
|
||||
}
|
||||
|
||||
Status LoadBlockQueue::close_wal() {
|
||||
if (_v_wal_writer != nullptr) {
|
||||
RETURN_IF_ERROR(_v_wal_writer->close());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
} // namespace doris
|
||||
|
||||
Reference in New Issue
Block a user