diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index a53885bd13..fab834639e 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1125,6 +1125,7 @@ DEFINE_mInt32(group_commit_queue_mem_limit, "67108864"); // Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space. // group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified. DEFINE_String(group_commit_wal_max_disk_limit, "10%"); +DEFINE_Bool(group_commit_wait_replay_wal_finish, "false"); DEFINE_mInt32(scan_thread_nice_value, "0"); DEFINE_mInt32(tablet_schema_cache_recycle_interval, "3600"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 9a287a0c24..9065baae53 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1191,6 +1191,7 @@ DECLARE_mInt32(group_commit_queue_mem_limit); // Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space. // group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified. DECLARE_mString(group_commit_wal_max_disk_limit); +DECLARE_Bool(group_commit_wait_replay_wal_finish); // The configuration item is used to lower the priority of the scanner thread, // typically employed to ensure CPU scheduling for write operations. diff --git a/be/src/olap/wal/wal_manager.cpp b/be/src/olap/wal/wal_manager.cpp index 9d92f5c9e6..19b92c3859 100644 --- a/be/src/olap/wal/wal_manager.cpp +++ b/be/src/olap/wal/wal_manager.cpp @@ -338,6 +338,15 @@ Status WalManager::_scan_wals(const std::string& wal_path) { int64_t db_id = std::strtoll(database_id.file_name.c_str(), NULL, 10); int64_t tb_id = std::strtoll(table_id.file_name.c_str(), NULL, 10); add_wal_status_queue(tb_id, wal_id, WalManager::WalStatus::REPLAY); + if (config::group_commit_wait_replay_wal_finish) { + std::shared_ptr lock = std::make_shared(); + std::shared_ptr cv = + std::make_shared(); + auto add_st = add_wal_cv_map(wal_id, lock, cv); + if (!add_st.ok()) { + LOG(WARNING) << "fail to add wal_id " << wal_id << " to wal_cv_map"; + } + } RETURN_IF_ERROR(add_recover_wal(db_id, tb_id, wal_id, wal_file)); } catch (const std::invalid_argument& e) { return Status::InvalidArgument("Invalid format, {}", e.what()); @@ -507,4 +516,68 @@ std::string WalManager::_get_base_wal_path(const std::string& wal_path_str) { return wal_path.string(); } +Status WalManager::add_wal_cv_map(int64_t wal_id, std::shared_ptr lock, + std::shared_ptr cv) { + std::lock_guard wrlock(_wal_cv_lock); + auto it = _wal_cv_map.find(wal_id); + if (it != _wal_cv_map.end()) { + return Status::InternalError("wal {} is already in _wal_cv_map ", wal_id); + } + auto pair = std::make_pair(lock, cv); + _wal_cv_map.emplace(wal_id, pair); + LOG(INFO) << "add " << wal_id << " to _wal_cv_map"; + return Status::OK(); +} + +Status WalManager::erase_wal_cv_map(int64_t wal_id) { + std::lock_guard wrlock(_wal_cv_lock); + if (_wal_cv_map.erase(wal_id)) { + LOG(INFO) << "erase " << wal_id << " from _wal_cv_map"; + } else { + return Status::InternalError("fail to erase wal {} from wal_cv_map", wal_id); + } + return Status::OK(); +} + +Status WalManager::wait_replay_wal_finish(int64_t wal_id) { + std::shared_ptr lock = nullptr; + std::shared_ptr cv = nullptr; + auto st = get_lock_and_cv(wal_id, lock, cv); + if (st.ok()) { + std::unique_lock l(*(lock)); + LOG(INFO) << "start wait " << wal_id; + if (cv->wait_for(l, std::chrono::seconds(180)) == std::cv_status::timeout) { + LOG(WARNING) << "wait for " << wal_id << " is time out"; + } + LOG(INFO) << "get wal " << wal_id << ",finish wait"; + RETURN_IF_ERROR(erase_wal_cv_map(wal_id)); + LOG(INFO) << "erase wal " << wal_id; + } + return Status::OK(); +} + +Status WalManager::notify_relay_wal(int64_t wal_id) { + std::shared_ptr lock = nullptr; + std::shared_ptr cv = nullptr; + auto st = get_lock_and_cv(wal_id, lock, cv); + if (st.ok()) { + std::unique_lock l(*(lock)); + cv->notify_all(); + LOG(INFO) << "get wal " << wal_id << ",notify all"; + } + return Status::OK(); +} + +Status WalManager::get_lock_and_cv(int64_t wal_id, std::shared_ptr& lock, + std::shared_ptr& cv) { + std::lock_guard wrlock(_wal_cv_lock); + auto it = _wal_cv_map.find(wal_id); + if (it == _wal_cv_map.end()) { + return Status::InternalError("cannot find txn {} in _wal_cv_map", wal_id); + } + lock = it->second.first; + cv = it->second.second; + return Status::OK(); +} + } // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal/wal_manager.h b/be/src/olap/wal/wal_manager.h index f6a3bfef79..b0e4d1d196 100644 --- a/be/src/olap/wal/wal_manager.h +++ b/be/src/olap/wal/wal_manager.h @@ -92,6 +92,15 @@ public: void erase_wal_column_index(int64_t wal_id); Status get_wal_column_index(int64_t wal_id, std::vector& column_index); + //for test relay + Status add_wal_cv_map(int64_t wal_id, std::shared_ptr lock, + std::shared_ptr cv); + Status erase_wal_cv_map(int64_t wal_id); + Status get_lock_and_cv(int64_t wal_id, std::shared_ptr& lock, + std::shared_ptr& cv); + Status wait_replay_wal_finish(int64_t wal_id); + Status notify_relay_wal(int64_t wal_id); + private: // wal back pressure Status _init_wal_dirs_conf(); @@ -138,5 +147,12 @@ private: // TODO should remove std::shared_mutex _wal_column_id_map_lock; std::unordered_map&> _wal_column_id_map; + + // for test relay + // + using WalCvInfo = + std::pair, std::shared_ptr>; + std::shared_mutex _wal_cv_lock; + std::unordered_map _wal_cv_map; }; } // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp index b3c8f96541..5187e594ad 100644 --- a/be/src/olap/wal/wal_table.cpp +++ b/be/src/olap/wal/wal_table.cpp @@ -68,6 +68,12 @@ void WalTable::_pick_relay_wals() { LOG(WARNING) << "rename " << it->first << " fail" << ",st:" << st.to_string(); } + if (config::group_commit_wait_replay_wal_finish) { + auto notify_st = _exec_env->wal_mgr()->notify_relay_wal(it->second->get_wal_id()); + if (!notify_st.ok()) { + LOG(WARNING) << "notify wal " << it->second->get_wal_id() << " fail"; + } + } need_erase_wals.push_back(it->first); continue; } @@ -122,6 +128,9 @@ Status WalTable::_relay_wal_one_by_one() { if (!st.ok()) { LOG(WARNING) << "fail to delete wal " << delete_wal_info->get_wal_path(); } + if (config::group_commit_wait_replay_wal_finish) { + RETURN_IF_ERROR(_exec_env->wal_mgr()->notify_relay_wal(delete_wal_info->get_wal_id())); + } } return Status::OK(); } @@ -173,6 +182,9 @@ Status WalTable::_rename_to_tmp_path(const std::string wal) { } bool WalTable::_need_replay(std::shared_ptr wal_info) { + if (config::group_commit_wait_replay_wal_finish) { + return true; + } #ifndef BE_TEST auto replay_interval = pow(2, wal_info->get_retry_num()) * config::group_commit_replay_wal_retry_interval_seconds * 1000; @@ -210,9 +222,11 @@ Status WalTable::_replay_wal_internal(const std::string& wal) { auto wal_id = pair->first; auto label = pair->second; #ifndef BE_TEST - auto st = _try_abort_txn(_db_id, wal_id); - if (!st.ok()) { - LOG(WARNING) << "abort txn " << wal_id << " fail"; + if (!config::group_commit_wait_replay_wal_finish) { + auto st = _try_abort_txn(_db_id, wal_id); + if (!st.ok()) { + LOG(WARNING) << "abort txn " << wal_id << " fail"; + } } RETURN_IF_ERROR(_get_column_info(_db_id, _table_id)); #endif @@ -281,6 +295,7 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal, ctx->label = label; ctx->auth.token = "relay_wal"; // this is a fake, fe not check it now ctx->auth.user = "admin"; + ctx->group_commit = false; auto st = _http_stream_action->process_put(nullptr, ctx); if (st.ok()) { // wait stream load finish diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 8a81388ecd..1bf189c384 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -52,8 +52,12 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state, } RETURN_IF_ERROR(status); if (block->rows() > 0) { - _block_queue.push_back(block); - if (write_wal) { + if (!config::group_commit_wait_replay_wal_finish) { + _block_queue.push_back(block); + } else { + LOG(INFO) << "skip adding block to queue on txn " << txn_id; + } + if (write_wal || config::group_commit_wait_replay_wal_finish) { auto st = _v_wal_writer->write_wal(block.get()); if (!st.ok()) { _cancel_without_lock(st); @@ -383,9 +387,11 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ // result_status: commit txn result if (status.ok() && st.ok() && (result_status.ok() || result_status.is())) { - RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal( - txn_id, load_block_queue->block_queue_pre_allocated())); - RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(table_id, txn_id)); + if (!config::group_commit_wait_replay_wal_finish) { + RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal( + txn_id, load_block_queue->block_queue_pre_allocated())); + RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(table_id, txn_id)); + } } else { std::string wal_path; RETURN_IF_ERROR(_exec_env->wal_mgr()->get_wal_path(txn_id, wal_path)); @@ -491,10 +497,13 @@ Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& i Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, const std::string& import_label, WalManager* wal_manager, std::vector& slot_desc, int be_exe_version) { - RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->create_wal_path( - db_id, tb_id, wal_id, import_label, _wal_base_path)); + std::string real_label = config::group_commit_wait_replay_wal_finish + ? import_label + "_test_wait" + : import_label; + RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->create_wal_path(db_id, tb_id, wal_id, + real_label, _wal_base_path)); _v_wal_writer = std::make_shared( - tb_id, wal_id, import_label, wal_manager, slot_desc, be_exe_version); + db_id, tb_id, wal_id, real_label, wal_manager, slot_desc, be_exe_version); return _v_wal_writer->init(); } diff --git a/be/src/vec/sink/writer/vwal_writer.cpp b/be/src/vec/sink/writer/vwal_writer.cpp index c697d373f1..2dfd32c14f 100644 --- a/be/src/vec/sink/writer/vwal_writer.cpp +++ b/be/src/vec/sink/writer/vwal_writer.cpp @@ -24,10 +24,11 @@ namespace doris { namespace vectorized { -VWalWriter::VWalWriter(int64_t tb_id, int64_t wal_id, const std::string& import_label, - WalManager* wal_manager, std::vector& slot_desc, - int be_exe_version) - : _tb_id(tb_id), +VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, + const std::string& import_label, WalManager* wal_manager, + std::vector& slot_desc, int be_exe_version) + : _db_id(db_id), + _tb_id(tb_id), _wal_id(wal_id), _label(import_label), _wal_manager(wal_manager), @@ -39,6 +40,16 @@ VWalWriter::~VWalWriter() {} Status VWalWriter::init() { RETURN_IF_ERROR(_wal_manager->create_wal_writer(_wal_id, _wal_writer)); _wal_manager->add_wal_status_queue(_tb_id, _wal_id, WalManager::WalStatus::CREATE); +#ifndef BE_TEST + if (config::group_commit_wait_replay_wal_finish) { + std::shared_ptr lock = std::make_shared(); + std::shared_ptr cv = std::make_shared(); + auto add_st = _wal_manager->add_wal_cv_map(_wal_id, lock, cv); + if (!add_st.ok()) { + LOG(WARNING) << "fail to add wal_id " << _wal_id << " to wal_cv_map"; + } + } +#endif std::stringstream ss; for (auto slot_desc : _slot_descs) { if (slot_desc.col_unique_id < 0) { @@ -61,6 +72,13 @@ Status VWalWriter::write_wal(vectorized::Block* block) { } Status VWalWriter::close() { + if (config::group_commit_wait_replay_wal_finish) { + std::string wal_path; + RETURN_IF_ERROR(_wal_manager->get_wal_path(_wal_id, wal_path)); + LOG(INFO) << "close file " << wal_path; + RETURN_IF_ERROR(_wal_manager->add_recover_wal(_db_id, _tb_id, _wal_id, wal_path)); + RETURN_IF_ERROR(_wal_manager->wait_replay_wal_finish(_wal_id)); + } if (_wal_writer != nullptr) { RETURN_IF_ERROR(_wal_writer->finalize()); } diff --git a/be/src/vec/sink/writer/vwal_writer.h b/be/src/vec/sink/writer/vwal_writer.h index 6593837373..cbd369f7cb 100644 --- a/be/src/vec/sink/writer/vwal_writer.h +++ b/be/src/vec/sink/writer/vwal_writer.h @@ -29,7 +29,7 @@ namespace vectorized { class VWalWriter { public: - VWalWriter(int64_t tb_id, int64_t wal_id, const std::string& import_label, + VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, const std::string& import_label, WalManager* wal_manager, std::vector& slot_desc, int be_exe_version); ~VWalWriter(); @@ -38,6 +38,7 @@ public: Status close(); private: + int64_t _db_id; int64_t _tb_id; int64_t _wal_id; // TODO version should in olap/wal_writer