diff --git a/be/src/olap/wal/wal_manager.cpp b/be/src/olap/wal/wal_manager.cpp index 19b92c3859..b1931f62a6 100644 --- a/be/src/olap/wal/wal_manager.cpp +++ b/be/src/olap/wal/wal_manager.cpp @@ -28,9 +28,9 @@ #include "common/config.h" #include "common/status.h" +#include "gutil/strings/split.h" #include "io/fs/local_file_system.h" #include "olap/wal/wal_dirs_info.h" -#include "olap/wal/wal_writer.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" #include "runtime/plan_fragment_executor.h" @@ -40,7 +40,7 @@ namespace doris { WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list) : _exec_env(exec_env), _stop(false), _stop_background_threads_latch(1) { - doris::vectorized::WalReader::string_split(wal_dir_list, ";", _wal_dirs); + _wal_dirs = strings::Split(wal_dir_list, ";", strings::SkipWhitespace()); static_cast(ThreadPoolBuilder("GroupCommitReplayWalThreadPool") .set_min_threads(1) .set_max_threads(config::group_commit_relay_wal_threads) @@ -109,7 +109,7 @@ Status WalManager::_init_wal_dirs_conf() { Status WalManager::_init_wal_dirs() { bool exists = false; for (auto wal_dir : _wal_dirs) { - std::string tmp_dir = wal_dir + "/" + tmp; + std::string tmp_dir = wal_dir + "/" + _tmp; LOG(INFO) << "wal_dir:" << wal_dir << ",tmp_dir:" << tmp_dir; RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_dir, &exists)); if (!exists) { @@ -160,76 +160,50 @@ Status WalManager::_init_wal_dirs_info() { &_update_wal_dirs_info_thread); } -void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status) { - std::lock_guard wrlock(_wal_status_lock); +void WalManager::add_wal_queue(int64_t table_id, int64_t wal_id) { + std::lock_guard wrlock(_wal_queue_lock); LOG(INFO) << "add wal queue " - << ",table_id:" << table_id << ",wal_id:" << wal_id << ",status:" << wal_status; - auto it = _wal_status_queues.find(table_id); - if (it == _wal_status_queues.end()) { - std::unordered_map tmp_map; - tmp_map.emplace(wal_id, wal_status); - _wal_status_queues.emplace(table_id, tmp_map); + << ",table_id:" << table_id << ",wal_id:" << wal_id; + auto it = _wal_queues.find(table_id); + if (it == _wal_queues.end()) { + std::set tmp_set; + tmp_set.insert(wal_id); + _wal_queues.emplace(table_id, tmp_set); } else { - it->second.emplace(wal_id, wal_status); + it->second.insert(wal_id); } } -Status WalManager::erase_wal_status_queue(int64_t table_id, int64_t wal_id) { - std::lock_guard wrlock(_wal_status_lock); - auto it = _wal_status_queues.find(table_id); - LOG(INFO) << "remove wal queue " - << ",table_id:" << table_id << ",wal_id:" << wal_id; - if (it == _wal_status_queues.end()) { - return Status::InternalError("table_id " + std::to_string(table_id) + - " not found in wal status queue"); - } else { +void WalManager::erase_wal_queue(int64_t table_id, int64_t wal_id) { + std::lock_guard wrlock(_wal_queue_lock); + auto it = _wal_queues.find(table_id); + if (it != _wal_queues.end()) { + LOG(INFO) << "remove wal queue " + << ",table_id:" << table_id << ",wal_id:" << wal_id; it->second.erase(wal_id); if (it->second.empty()) { - _wal_status_queues.erase(table_id); + _wal_queues.erase(table_id); } } - return Status::OK(); } -Status WalManager::get_wal_status_queue_size(const PGetWalQueueSizeRequest* request, - PGetWalQueueSizeResponse* response) { - std::lock_guard wrlock(_wal_status_lock); +size_t WalManager::get_wal_queue_size(int64_t table_id) { + std::lock_guard wrlock(_wal_queue_lock); size_t count = 0; - auto table_id = request->table_id(); - auto txn_id = request->txn_id(); - if (table_id > 0 && txn_id > 0) { - auto it = _wal_status_queues.find(table_id); - if (it == _wal_status_queues.end()) { - LOG(INFO) << ("table_id " + std::to_string(table_id) + - " not found in wal status queue"); + if (table_id > 0) { + auto it = _wal_queues.find(table_id); + if (it != _wal_queues.end()) { + return it->second.size(); } else { - for (auto wal_it = it->second.begin(); wal_it != it->second.end(); ++wal_it) { - if (wal_it->first <= txn_id) { - count += 1; - } - } + return 0; } } else { - for (auto it = _wal_status_queues.begin(); it != _wal_status_queues.end(); it++) { + //table_id is -1 meaning get all table wal size + for (auto it = _wal_queues.begin(); it != _wal_queues.end(); it++) { count += it->second.size(); } } - response->set_size(count); - if (count > 0) { - print_wal_status_queue(); - } - return Status::OK(); -} - -void WalManager::print_wal_status_queue() { - std::stringstream ss; - for (auto it = _wal_status_queues.begin(); it != _wal_status_queues.end(); ++it) { - ss << "table_id:" << it->first << std::endl; - for (auto wal_it = it->second.begin(); wal_it != it->second.end(); ++wal_it) { - ss << "wal_id:" << wal_it->first << ",status:" << wal_it->second << std::endl; - } - } - LOG(INFO) << ss.str(); + return count; } Status WalManager::create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, @@ -239,7 +213,7 @@ Status WalManager::create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_ ss << base_path << "/" << std::to_string(db_id) << "/" << std::to_string(table_id) << "/" << std::to_string(wal_id) << "_" << label; { - std::lock_guard wrlock(_wal_lock); + std::lock_guard wrlock(_wal_path_lock); auto it = _wal_path_map.find(wal_id); if (it != _wal_path_map.end()) { return Status::InternalError("wal_id {} already in wal_path_map", wal_id); @@ -250,7 +224,7 @@ Status WalManager::create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_ } Status WalManager::get_wal_path(int64_t wal_id, std::string& wal_path) { - std::shared_lock rdlock(_wal_lock); + std::shared_lock rdlock(_wal_path_lock); auto it = _wal_path_map.find(wal_id); if (it != _wal_path_map.end()) { wal_path = _wal_path_map[wal_id]; @@ -260,35 +234,6 @@ Status WalManager::get_wal_path(int64_t wal_id, std::string& wal_path) { return Status::OK(); } -Status WalManager::create_wal_reader(const std::string& wal_path, - std::shared_ptr& wal_reader) { - wal_reader = std::make_shared(wal_path); - RETURN_IF_ERROR(wal_reader->init()); - return Status::OK(); -} - -Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr& wal_writer) { - std::string wal_path; - RETURN_IF_ERROR(get_wal_path(wal_id, wal_path)); - // TODO move the create_dir into wal_writer::init - std::vector path_element; - doris::vectorized::WalReader::string_split(wal_path, "/", path_element); - std::stringstream ss; - for (int i = 0; i < path_element.size() - 1; i++) { - ss << path_element[i] << "/"; - } - std::string base_path = ss.str(); - bool exists = false; - RETURN_IF_ERROR(io::global_local_filesystem()->exists(base_path, &exists)); - if (!exists) { - RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(base_path)); - } - LOG(INFO) << "create wal " << wal_path; - wal_writer = std::make_shared(wal_path); - RETURN_IF_ERROR(wal_writer->init()); - return Status::OK(); -} - Status WalManager::_scan_wals(const std::string& wal_path) { size_t count = 0; bool exists = true; @@ -299,7 +244,7 @@ Status WalManager::_scan_wals(const std::string& wal_path) { return st; } for (const auto& database_id : dbs) { - if (database_id.is_file || database_id.file_name == tmp) { + if (database_id.is_file || database_id.file_name == _tmp) { continue; } std::vector tables; @@ -329,7 +274,7 @@ Status WalManager::_scan_wals(const std::string& wal_path) { auto wal_file = table_path + "/" + wal.file_name; res.emplace_back(wal_file); { - std::lock_guard wrlock(_wal_lock); + std::lock_guard wrlock(_wal_path_lock); auto pos = wal.file_name.find("_"); try { int64_t wal_id = @@ -337,7 +282,6 @@ Status WalManager::_scan_wals(const std::string& wal_path) { _wal_path_map.emplace(wal_id, wal_file); int64_t db_id = std::strtoll(database_id.file_name.c_str(), NULL, 10); int64_t tb_id = std::strtoll(table_id.file_name.c_str(), NULL, 10); - add_wal_status_queue(tb_id, wal_id, WalManager::WalStatus::REPLAY); if (config::group_commit_wait_replay_wal_finish) { std::shared_ptr lock = std::make_shared(); std::shared_ptr cv = @@ -396,6 +340,7 @@ Status WalManager::_replay() { Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id, std::string wal) { + add_wal_queue(table_id, wal_id); std::lock_guard wrlock(_table_lock); std::shared_ptr table_ptr; auto it = _table_map.find(table_id); @@ -423,23 +368,6 @@ size_t WalManager::get_wal_table_size(int64_t table_id) { } } -Status WalManager::delete_wal(int64_t wal_id, size_t block_queue_pre_allocated) { - std::string wal_path; - { - std::lock_guard wrlock(_wal_lock); - auto it = _wal_path_map.find(wal_id); - if (it != _wal_path_map.end()) { - wal_path = it->second; - RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(wal_path)); - LOG(INFO) << "delete file=" << wal_path; - _wal_path_map.erase(wal_id); - } - } - RETURN_IF_ERROR(update_wal_dir_pre_allocated(_get_base_wal_path(wal_path), 0, - block_queue_pre_allocated)); - return Status::OK(); -} - void WalManager::_stop_relay_wal() { std::lock_guard wrlock(_table_lock); for (auto it = _table_map.begin(); it != _table_map.end(); it++) { @@ -447,32 +375,6 @@ void WalManager::_stop_relay_wal() { } } -void WalManager::add_wal_column_index(int64_t wal_id, std::vector& column_index) { - std::lock_guard wrlock(_wal_column_id_map_lock); - _wal_column_id_map.emplace(wal_id, column_index); - LOG(INFO) << "add " << wal_id << " to wal_column_id_map"; -} - -void WalManager::erase_wal_column_index(int64_t wal_id) { - std::lock_guard wrlock(_wal_column_id_map_lock); - if (_wal_column_id_map.erase(wal_id)) { - LOG(INFO) << "erase " << wal_id << " from wal_column_id_map"; - } else { - LOG(WARNING) << "fail to erase wal " << wal_id << " from wal_column_id_map"; - } -} - -Status WalManager::get_wal_column_index(int64_t wal_id, std::vector& column_index) { - std::lock_guard wrlock(_wal_column_id_map_lock); - auto it = _wal_column_id_map.find(wal_id); - if (it != _wal_column_id_map.end()) { - column_index = it->second; - } else { - return Status::InternalError("cannot find wal {} in wal_column_id_map", wal_id); - } - return Status::OK(); -} - size_t WalManager::get_max_available_size() { return _wal_dirs_info->get_max_available_size(); } @@ -580,4 +482,54 @@ Status WalManager::get_lock_and_cv(int64_t wal_id, std::shared_ptr& return Status::OK(); } +Status WalManager::delete_wal(int64_t table_id, int64_t wal_id, size_t block_queue_pre_allocated) { + std::string wal_path; + { + std::lock_guard wrlock(_wal_path_lock); + auto it = _wal_path_map.find(wal_id); + if (it != _wal_path_map.end()) { + wal_path = it->second; + auto st = io::global_local_filesystem()->delete_file(wal_path); + if (st.ok()) { + LOG(INFO) << "delete file=" << wal_path; + } else { + LOG(WARNING) << "fail to delete file=" << wal_path; + } + _wal_path_map.erase(wal_id); + } + } + erase_wal_queue(table_id, wal_id); + RETURN_IF_ERROR(update_wal_dir_pre_allocated(_get_base_wal_path(wal_path), 0, + block_queue_pre_allocated)); + return Status::OK(); +} + +Status WalManager::rename_to_tmp_path(const std::string wal, int64_t table_id, int64_t wal_id) { + io::Path wal_path = wal; + std::list path_element; + for (int i = 0; i < 3; ++i) { + if (!wal_path.has_parent_path()) { + return Status::InternalError("parent path is not enough when rename " + wal); + } + path_element.push_front(wal_path.filename().string()); + wal_path = wal_path.parent_path(); + } + wal_path.append(_tmp); + for (auto path : path_element) { + wal_path.append(path); + } + bool exists = false; + RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_path.parent_path(), &exists)); + if (!exists) { + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_path.parent_path())); + } + auto res = std::rename(wal.c_str(), wal_path.string().c_str()); + if (res < 0) { + return Status::InternalError("rename fail on path " + wal); + } + LOG(INFO) << "rename wal from " << wal << " to " << wal_path.string(); + erase_wal_queue(table_id, wal_id); + return Status::OK(); +} + } // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal/wal_manager.h b/be/src/olap/wal/wal_manager.h index b0e4d1d196..44fdef2e6c 100644 --- a/be/src/olap/wal/wal_manager.h +++ b/be/src/olap/wal/wal_manager.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -46,13 +47,6 @@ namespace doris { class WalManager { ENABLE_FACTORY_CREATOR(WalManager); -public: - enum WalStatus { - PREPARE = 0, - REPLAY = 1, - CREATE = 2, - }; - public: WalManager(ExecEnv* exec_env, const std::string& wal_dir); ~WalManager(); @@ -72,25 +66,14 @@ public: Status create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, const std::string& label, std::string& base_path); Status get_wal_path(int64_t wal_id, std::string& wal_path); - Status delete_wal(int64_t wal_id, size_t block_queue_pre_allocated = 0); + Status delete_wal(int64_t table_id, int64_t wal_id, size_t block_queue_pre_allocated = 0); + Status rename_to_tmp_path(const std::string wal, int64_t table_id, int64_t wal_id); Status add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id, std::string wal); - // used for ut + void add_wal_queue(int64_t table_id, int64_t wal_id); + void erase_wal_queue(int64_t table_id, int64_t wal_id); + size_t get_wal_queue_size(int64_t table_id); + // fot ut size_t get_wal_table_size(int64_t table_id); - // TODO util function, should remove - Status create_wal_reader(const std::string& wal_path, std::shared_ptr& wal_reader); - Status create_wal_writer(int64_t wal_id, std::shared_ptr& wal_writer); - - // for wal status, can be removed - Status get_wal_status_queue_size(const PGetWalQueueSizeRequest* request, - PGetWalQueueSizeResponse* response); - void add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status); - Status erase_wal_status_queue(int64_t table_id, int64_t wal_id); - void print_wal_status_queue(); - - // for _wal_column_id_map, can be removed - void add_wal_column_index(int64_t wal_id, std::vector& column_index); - void erase_wal_column_index(int64_t wal_id); - Status get_wal_column_index(int64_t wal_id, std::vector& column_index); //for test relay Status add_wal_cv_map(int64_t wal_id, std::shared_ptr lock, @@ -118,12 +101,11 @@ public: // used for be ut size_t wal_limit_test_bytes; - const std::string tmp = "tmp"; - private: ExecEnv* _exec_env = nullptr; std::atomic _stop; CountDownLatch _stop_background_threads_latch; + const std::string _tmp = "tmp"; // wal back pressure std::vector _wal_dirs; @@ -137,16 +119,11 @@ private: std::shared_mutex _table_lock; std::map> _table_map; - std::shared_mutex _wal_lock; + std::shared_mutex _wal_path_lock; std::unordered_map _wal_path_map; - // TODO Now only used for debug wal status, consider remove it - std::shared_mutex _wal_status_lock; - std::unordered_map> _wal_status_queues; - - // TODO should remove - std::shared_mutex _wal_column_id_map_lock; - std::unordered_map&> _wal_column_id_map; + std::shared_mutex _wal_queue_lock; + std::unordered_map> _wal_queues; // for test relay // diff --git a/be/src/olap/wal/wal_reader.cpp b/be/src/olap/wal/wal_reader.cpp index 75bbaf1094..36a4c15aa2 100644 --- a/be/src/olap/wal/wal_reader.cpp +++ b/be/src/olap/wal/wal_reader.cpp @@ -70,7 +70,7 @@ Status WalReader::read_block(PBlock& block) { return Status::OK(); } -Status WalReader::read_header(uint32_t& version, std::string& col_ids) { +Status WalReader::read_header(std::string& col_ids) { size_t bytes_read = 0; std::string magic_str; magic_str.resize(k_wal_magic_length); @@ -83,7 +83,7 @@ Status WalReader::read_header(uint32_t& version, std::string& col_ids) { RETURN_IF_ERROR( file_reader->read_at(_offset, {version_buf, WalWriter::VERSION_SIZE}, &bytes_read)); _offset += WalWriter::VERSION_SIZE; - version = decode_fixed32_le(version_buf); + _version = decode_fixed32_le(version_buf); uint8_t len_buf[WalWriter::LENGTH_SIZE]; RETURN_IF_ERROR(file_reader->read_at(_offset, {len_buf, WalWriter::LENGTH_SIZE}, &bytes_read)); _offset += WalWriter::LENGTH_SIZE; diff --git a/be/src/olap/wal/wal_reader.h b/be/src/olap/wal/wal_reader.h index f68a031c09..7c852ee18f 100644 --- a/be/src/olap/wal/wal_reader.h +++ b/be/src/olap/wal/wal_reader.h @@ -32,7 +32,7 @@ public: Status finalize(); Status read_block(PBlock& block); - Status read_header(uint32_t& version, std::string& col_ids); + Status read_header(std::string& col_ids); private: Status _deserialize(PBlock& block, std::string& buf); @@ -40,6 +40,7 @@ private: private: std::string _file_name; + uint32_t _version = 0; size_t _offset; io::FileReaderSPtr file_reader; }; diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp index 5187e594ad..54500273da 100644 --- a/be/src/olap/wal/wal_table.cpp +++ b/be/src/olap/wal/wal_table.cpp @@ -19,6 +19,7 @@ #include +#include "gutil/strings/split.h" #include "http/action/http_stream.h" #include "http/action/stream_load.h" #include "http/ev_http_server.h" @@ -33,12 +34,11 @@ #include "runtime/plan_fragment_executor.h" #include "util/path_util.h" #include "util/thrift_rpc_helper.h" -#include "vec/exec/format/wal/wal_reader.h" namespace doris { WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id) - : _exec_env(exec_env), _db_id(db_id), _table_id(table_id), _stop(false) { + : _exec_env(exec_env), _db_id(db_id), _table_id(table_id) { _http_stream_action = std::make_shared(exec_env); } WalTable::~WalTable() {} @@ -63,7 +63,8 @@ void WalTable::_pick_relay_wals() { if (wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) { LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", table=" << _table_id << ", wal=" << it->first << ", retry_num=" << wal_info->get_retry_num(); - auto st = _rename_to_tmp_path(it->first); + auto st = _exec_env->wal_mgr()->rename_to_tmp_path(it->first, _table_id, + wal_info->get_wal_id()); if (!st.ok()) { LOG(WARNING) << "rename " << it->first << " fail" << ",st:" << st.to_string(); @@ -124,7 +125,7 @@ Status WalTable::_relay_wal_one_by_one() { } } for (auto delete_wal_info : need_delete_wals) { - auto st = _delete_wal(delete_wal_info->get_wal_id()); + auto st = _exec_env->wal_mgr()->delete_wal(_table_id, delete_wal_info->get_wal_id()); if (!st.ok()) { LOG(WARNING) << "fail to delete wal " << delete_wal_info->get_wal_path(); } @@ -154,33 +155,6 @@ Status WalTable::replay_wals() { return Status::OK(); } -Status WalTable::_rename_to_tmp_path(const std::string wal) { - io::Path wal_path = wal; - std::list path_element; - for (int i = 0; i < 3; ++i) { - if (!wal_path.has_parent_path()) { - return Status::InternalError("parent path is not enough when rename " + wal); - } - path_element.push_front(wal_path.filename().string()); - wal_path = wal_path.parent_path(); - } - wal_path.append(_exec_env->wal_mgr()->tmp); - for (auto path : path_element) { - wal_path.append(path); - } - bool exists = false; - RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_path.parent_path(), &exists)); - if (!exists) { - RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_path.parent_path())); - } - auto res = std::rename(wal.c_str(), wal_path.string().c_str()); - if (res < 0) { - return Status::InternalError("rename fail on path " + wal); - } - LOG(INFO) << "rename wal from " << wal << " to " << wal_path.string(); - return Status::OK(); -} - bool WalTable::_need_replay(std::shared_ptr wal_info) { if (config::group_commit_wait_replay_wal_finish) { return true; @@ -217,10 +191,9 @@ Status WalTable::_try_abort_txn(int64_t db_id, int64_t wal_id) { Status WalTable::_replay_wal_internal(const std::string& wal) { LOG(INFO) << "Start replay wal for db=" << _db_id << ", table=" << _table_id << ", wal=" << wal; - std::shared_ptr> pair = nullptr; - RETURN_IF_ERROR(_parse_wal_path(wal, pair)); - auto wal_id = pair->first; - auto label = pair->second; + int64_t wal_id = 0; + std::string label = ""; + RETURN_IF_ERROR(_parse_wal_path(wal, wal_id, label)); #ifndef BE_TEST if (!config::group_commit_wait_replay_wal_finish) { auto st = _try_abort_txn(_db_id, wal_id); @@ -228,22 +201,18 @@ Status WalTable::_replay_wal_internal(const std::string& wal) { LOG(WARNING) << "abort txn " << wal_id << " fail"; } } - RETURN_IF_ERROR(_get_column_info(_db_id, _table_id)); #endif RETURN_IF_ERROR(_replay_one_txn_with_stremaload(wal_id, wal, label)); return Status::OK(); } -Status WalTable::_parse_wal_path(const std::string& wal, - std::shared_ptr>& pair) { - std::vector path_element; - doris::vectorized::WalReader::string_split(wal, "/", path_element); - auto pos = path_element[path_element.size() - 1].find("_"); +Status WalTable::_parse_wal_path(const std::string& wal, int64_t& wal_id, std::string& label) { + io::Path wal_path = wal; + auto file_name = wal_path.filename().string(); + auto pos = file_name.find("_"); try { - int64_t wal_id = std::strtoll(path_element[path_element.size() - 1].substr(0, pos).c_str(), - NULL, 10); - auto label = path_element[path_element.size() - 1].substr(pos + 1); - pair = std::make_shared>(std::make_pair(wal_id, label)); + wal_id = std::strtoll(file_name.substr(0, pos).c_str(), NULL, 10); + label = file_name.substr(pos + 1); } catch (const std::invalid_argument& e) { return Status::InvalidArgument("Invalid format, {}", e.what()); } @@ -251,33 +220,29 @@ Status WalTable::_parse_wal_path(const std::string& wal, } Status WalTable::_construct_sql_str(const std::string& wal, const std::string& label, - std::string& sql_str, std::vector& index_vector) { + std::string& sql_str) { std::string columns; RETURN_IF_ERROR(_read_wal_header(wal, columns)); - std::vector column_id_element; - doris::vectorized::WalReader::string_split(columns, ",", column_id_element); + std::vector column_id_vector = + strings::Split(columns, ",", strings::SkipWhitespace()); + std::map column_info_map; + RETURN_IF_ERROR(_get_column_info(_db_id, _table_id, column_info_map)); std::stringstream ss_name; - std::stringstream ss_id; - int index_raw = 0; - for (auto column_id_str : column_id_element) { + for (auto column_id_str : column_id_vector) { try { int64_t column_id = std::strtoll(column_id_str.c_str(), NULL, 10); - auto it = _column_id_info_map.find(column_id); - if (it != _column_id_info_map.end()) { - ss_name << "`" << it->second->first << "`,"; - ss_id << "c" << std::to_string(it->second->second) << ","; - index_vector.emplace_back(index_raw); + auto it = column_info_map.find(column_id); + if (it != column_info_map.end()) { + ss_name << "`" << it->second << "`,"; } - index_raw++; } catch (const std::invalid_argument& e) { return Status::InvalidArgument("Invalid format, {}", e.what()); } } auto name = ss_name.str().substr(0, ss_name.str().size() - 1); - auto id = ss_id.str().substr(0, ss_id.str().size() - 1); std::stringstream ss; ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL " << label << " (" - << name << ") select " << id << " from http_stream(\"format\" = \"wal\", \"table_id\" = \"" + << name << ") select " << name << " from http_stream(\"format\" = \"wal\", \"table_id\" = \"" << std::to_string(_table_id) << "\")"; sql_str = ss.str().data(); return Status::OK(); @@ -286,9 +251,7 @@ Status WalTable::_construct_sql_str(const std::string& wal, const std::string& l Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal, const std::string& label) { std::string sql_str; - std::vector index_vector; - RETURN_IF_ERROR(_construct_sql_str(wal, label, sql_str, index_vector)); - _exec_env->wal_mgr()->add_wal_column_index(wal_id, index_vector); + RETURN_IF_ERROR(_construct_sql_str(wal, label, sql_str)); std::shared_ptr ctx = std::make_shared(_exec_env); ctx->sql_str = sql_str; ctx->wal_id = wal_id; @@ -304,13 +267,13 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal, auto commit_st = _exec_env->stream_load_executor()->commit_txn(ctx.get()); st = commit_st; } else if (!ctx->status.ok()) { - LOG(WARNING) << "handle streaming load failed, id=" << ctx->id - << ", errmsg=" << ctx->status; - _exec_env->stream_load_executor()->rollback_txn(ctx.get()); st = ctx->status; } } - _exec_env->wal_mgr()->erase_wal_column_index(wal_id); + if (!st.ok()) { + LOG(WARNING) << "handle streaming load failed, id=" << ctx->id << ", errmsg=" << st; + _exec_env->stream_load_executor()->rollback_txn(ctx.get()); + } LOG(INFO) << "relay wal id=" << wal_id << ",st=" << st.to_string(); return st; } @@ -339,9 +302,6 @@ void WalTable::stop() { do { { std::lock_guard lock(_replay_wal_lock); - if (!this->_stop.load()) { - this->_stop.store(true); - } if (_replay_wal_map.empty() && _replaying_queue.empty()) { break; } @@ -355,10 +315,11 @@ void WalTable::stop() { size_t WalTable::size() { std::lock_guard lock(_replay_wal_lock); - return _replay_wal_map.size(); + return _replay_wal_map.size() + _replaying_queue.size(); } -Status WalTable::_get_column_info(int64_t db_id, int64_t tb_id) { +Status WalTable::_get_column_info(int64_t db_id, int64_t tb_id, + std::map& column_info_map) { TGetColumnInfoRequest request; request.__set_db_id(db_id); request.__set_table_id(tb_id); @@ -378,33 +339,21 @@ Status WalTable::_get_column_info(int64_t db_id, int64_t tb_id) { return status; } std::vector column_element = result.columns; - int64_t column_index = 1; - _column_id_info_map.clear(); for (auto column : column_element) { auto column_name = column.column_name; auto column_id = column.column_id; - std::shared_ptr column_pair = - std::make_shared(std::make_pair(column_name, column_index)); - _column_id_info_map.emplace(column_id, column_pair); - column_index++; + column_info_map.emplace(column_id, column_name); } } return status; } Status WalTable::_read_wal_header(const std::string& wal_path, std::string& columns) { - std::shared_ptr wal_reader; - RETURN_IF_ERROR(_exec_env->wal_mgr()->create_wal_reader(wal_path, wal_reader)); - uint32_t version = 0; - RETURN_IF_ERROR(wal_reader->read_header(version, columns)); + std::shared_ptr wal_reader = std::make_shared(wal_path); + RETURN_IF_ERROR(wal_reader->init()); + RETURN_IF_ERROR(wal_reader->read_header(columns)); RETURN_IF_ERROR(wal_reader->finalize()); return Status::OK(); } -Status WalTable::_delete_wal(int64_t wal_id) { - RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(wal_id)); - RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(_table_id, wal_id)); - return Status::OK(); -} - } // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal/wal_table.h b/be/src/olap/wal/wal_table.h index 66ee4fd372..07287d8f7e 100644 --- a/be/src/olap/wal/wal_table.h +++ b/be/src/olap/wal/wal_table.h @@ -41,41 +41,31 @@ public: void stop(); private: - // - using ColumnInfo = std::pair; - void _pick_relay_wals(); bool _need_replay(std::shared_ptr); Status _relay_wal_one_by_one(); - Status _delete_wal(int64_t wal_id); - Status _rename_to_tmp_path(const std::string wal); Status _replay_wal_internal(const std::string& wal); - // TODO change the param: (wal, int64_t* wal_id, std::string* label) - Status _parse_wal_path(const std::string& wal, - std::shared_ptr>&); + Status _parse_wal_path(const std::string& wal, int64_t& wal_id, std::string& label); Status _try_abort_txn(int64_t db_id, int64_t wal_id); - Status _get_column_info(int64_t db_id, int64_t tb_id); + Status _get_column_info(int64_t db_id, int64_t tb_id, + std::map& column_info_map); Status _replay_one_txn_with_stremaload(int64_t wal_id, const std::string& wal, const std::string& label); Status _handle_stream_load(int64_t wal_id, const std::string& wal, const std::string& label); Status _construct_sql_str(const std::string& wal, const std::string& label, - std::string& sql_str, std::vector& index_vector); + std::string& sql_str); Status _read_wal_header(const std::string& wal, std::string& columns); private: ExecEnv* _exec_env; int64_t _db_id; int64_t _table_id; - // TODO the stop is not used? - std::atomic _stop; std::shared_ptr _http_stream_action; mutable std::mutex _replay_wal_lock; // key is wal_path std::map> _replay_wal_map; std::list> _replaying_queue; - // TODO should not use this map - std::map> _column_id_info_map; }; } // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal/wal_writer.cpp b/be/src/olap/wal/wal_writer.cpp index 25f7cc0870..8edc6b3b86 100644 --- a/be/src/olap/wal/wal_writer.cpp +++ b/be/src/olap/wal/wal_writer.cpp @@ -35,7 +35,15 @@ WalWriter::WalWriter(const std::string& file_name) : _file_name(file_name) {} WalWriter::~WalWriter() {} Status WalWriter::init() { + io::Path wal_path = _file_name; + auto parent_path = wal_path.parent_path(); + bool exists = false; + RETURN_IF_ERROR(io::global_local_filesystem()->exists(parent_path, &exists)); + if (!exists) { + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(parent_path)); + } RETURN_IF_ERROR(io::global_local_filesystem()->create_file(_file_name, &_file_writer)); + LOG(INFO) << "create wal " << _file_name; return Status::OK(); } diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 2838ebbed5..fc4a2df427 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -297,8 +297,6 @@ Status GroupCommitTable::_create_group_commit_load( std::unique_lock l(_lock); _load_block_queues.emplace(instance_id, load_block_queue); _need_plan_fragment = false; - _exec_env->wal_mgr()->add_wal_status_queue(_table_id, txn_id, - WalManager::WalStatus::PREPARE); //create wal if (!is_pipeline) { RETURN_IF_ERROR(load_block_queue->create_wal( @@ -388,15 +386,16 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ if (status.ok() && st.ok() && (result_status.ok() || result_status.is())) { if (!config::group_commit_wait_replay_wal_finish) { - RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal( - txn_id, load_block_queue->block_queue_pre_allocated())); - RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(table_id, txn_id)); + auto delete_st = _exec_env->wal_mgr()->delete_wal( + table_id, txn_id, load_block_queue->block_queue_pre_allocated()); + if (!delete_st.ok()) { + LOG(WARNING) << "fail to delete wal " << txn_id; + } } } else { std::string wal_path; RETURN_IF_ERROR(_exec_env->wal_mgr()->get_wal_path(txn_id, wal_path)); RETURN_IF_ERROR(_exec_env->wal_mgr()->add_recover_wal(db_id, table_id, txn_id, wal_path)); - _exec_env->wal_mgr()->add_wal_status_queue(table_id, txn_id, WalManager::WalStatus::REPLAY); } std::stringstream ss; ss << "finish group commit, db_id=" << db_id << ", table_id=" << table_id << ", label=" << label diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 4943c0177e..18a8325e4c 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -2047,7 +2047,9 @@ void PInternalServiceImpl::get_wal_queue_size(google::protobuf::RpcController* c bool ret = _light_work_pool.try_offer([this, request, response, done]() { brpc::ClosureGuard closure_guard(done); Status st = Status::OK(); - st = _exec_env->wal_mgr()->get_wal_status_queue_size(request, response); + auto table_id = request->table_id(); + auto count = _exec_env->wal_mgr()->get_wal_queue_size(table_id); + response->set_size(count); response->mutable_status()->set_status_code(st.code()); }); if (!ret) { diff --git a/be/src/vec/exec/format/wal/wal_reader.cpp b/be/src/vec/exec/format/wal/wal_reader.cpp index 7344b4cbd9..98bb35c4bc 100644 --- a/be/src/vec/exec/format/wal/wal_reader.cpp +++ b/be/src/vec/exec/format/wal/wal_reader.cpp @@ -18,6 +18,7 @@ #include "wal_reader.h" #include "common/logging.h" +#include "gutil/strings/split.h" #include "olap/wal/wal_manager.h" #include "runtime/runtime_state.h" #include "vec/data_types/data_type_string.h" @@ -33,9 +34,11 @@ WalReader::~WalReader() { } } -Status WalReader::init_reader() { +Status WalReader::init_reader(const TupleDescriptor* tuple_descriptor) { + _tuple_descriptor = tuple_descriptor; RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_path(_wal_id, _wal_path)); - RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_reader(_wal_path, _wal_reader)); + _wal_reader = std::make_shared(_wal_path); + RETURN_IF_ERROR(_wal_reader->init()); return Status::OK(); } @@ -59,14 +62,16 @@ Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { vectorized::Block dst_block; int index = 0; auto columns = block->get_columns_with_type_and_name(); - for (auto column : columns) { - auto pos = _column_index[index]; + CHECK(columns.size() == _tuple_descriptor->slots().size()); + for (auto slot_desc : _tuple_descriptor->slots()) { + auto pos = _column_pos_map[slot_desc->col_unique_id()]; vectorized::ColumnPtr column_ptr = src_block.get_by_position(pos).column; - if (column_ptr != nullptr && column.column->is_nullable()) { + if (column_ptr != nullptr && slot_desc->is_nullable()) { column_ptr = make_nullable(column_ptr); } - dst_block.insert(index, vectorized::ColumnWithTypeAndName(std::move(column_ptr), - column.type, column.name)); + dst_block.insert( + index, vectorized::ColumnWithTypeAndName(std::move(column_ptr), columns[index].type, + columns[index].name)); index++; } block->swap(dst_block); @@ -75,24 +80,22 @@ Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { return Status::OK(); } -void WalReader::string_split(const std::string& str, const std::string& splits, - std::vector& res) { - if (str == "") return; - std::string strs = str + splits; - size_t pos = strs.find(splits); - int step = splits.size(); - while (pos != strs.npos) { - std::string temp = strs.substr(0, pos); - res.push_back(temp); - strs = strs.substr(pos + step, strs.size()); - pos = strs.find(splits); - } -} - Status WalReader::get_columns(std::unordered_map* name_to_type, std::unordered_set* missing_cols) { - RETURN_IF_ERROR(_wal_reader->read_header(_version, _col_ids)); - RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_column_index(_wal_id, _column_index)); + std::string col_ids; + RETURN_IF_ERROR(_wal_reader->read_header(col_ids)); + std::vector column_id_vector = + strings::Split(col_ids, ",", strings::SkipWhitespace()); + try { + int64_t pos = 0; + for (auto col_id_str : column_id_vector) { + auto col_id = std::strtoll(col_id_str.c_str(), NULL, 10); + _column_pos_map.emplace(col_id, pos); + pos++; + } + } catch (const std::invalid_argument& e) { + return Status::InvalidArgument("Invalid format, {}", e.what()); + } return Status::OK(); } diff --git a/be/src/vec/exec/format/wal/wal_reader.h b/be/src/vec/exec/format/wal/wal_reader.h index 78f2c31d92..e65af10016 100644 --- a/be/src/vec/exec/format/wal/wal_reader.h +++ b/be/src/vec/exec/format/wal/wal_reader.h @@ -17,6 +17,7 @@ #pragma once #include "olap/wal/wal_reader.h" +#include "runtime/descriptors.h" #include "vec/exec/format/generic_reader.h" namespace doris { @@ -26,23 +27,19 @@ class WalReader : public GenericReader { public: WalReader(RuntimeState* state); ~WalReader() override; - Status init_reader(); + Status init_reader(const TupleDescriptor* tuple_descriptor); Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; Status get_columns(std::unordered_map* name_to_type, std::unordered_set* missing_cols) override; - // TODO move it - static void string_split(const std::string& str, const std::string& splits, - std::vector& res); private: RuntimeState* _state = nullptr; int64_t _wal_id; std::string _wal_path; - std::shared_ptr _wal_reader; - // TODO version should in olap/wal_reader - uint32_t _version = 0; - std::string _col_ids; - std::vector _column_index; + std::shared_ptr _wal_reader = nullptr; + const TupleDescriptor* _tuple_descriptor = nullptr; + // column_id, column_pos + std::map _column_pos_map; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index c70295e131..57f079d77a 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -882,7 +882,7 @@ Status VFileScanner::_get_next_reader() { } case TFileFormatType::FORMAT_WAL: { _cur_reader.reset(new WalReader(_state)); - init_status = ((WalReader*)(_cur_reader.get()))->init_reader(); + init_status = ((WalReader*)(_cur_reader.get()))->init_reader(_output_tuple_desc); break; } case TFileFormatType::FORMAT_ARROW: { diff --git a/be/src/vec/sink/writer/vwal_writer.cpp b/be/src/vec/sink/writer/vwal_writer.cpp index 2dfd32c14f..c19f56fd07 100644 --- a/be/src/vec/sink/writer/vwal_writer.cpp +++ b/be/src/vec/sink/writer/vwal_writer.cpp @@ -38,8 +38,6 @@ VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, VWalWriter::~VWalWriter() {} Status VWalWriter::init() { - RETURN_IF_ERROR(_wal_manager->create_wal_writer(_wal_id, _wal_writer)); - _wal_manager->add_wal_status_queue(_tb_id, _wal_id, WalManager::WalStatus::CREATE); #ifndef BE_TEST if (config::group_commit_wait_replay_wal_finish) { std::shared_ptr lock = std::make_shared(); @@ -50,6 +48,8 @@ Status VWalWriter::init() { } } #endif + RETURN_IF_ERROR(_create_wal_writer(_wal_id, _wal_writer)); + _wal_manager->add_wal_queue(_tb_id, _wal_id); std::stringstream ss; for (auto slot_desc : _slot_descs) { if (slot_desc.col_unique_id < 0) { @@ -84,5 +84,13 @@ Status VWalWriter::close() { } return Status::OK(); } + +Status VWalWriter::_create_wal_writer(int64_t wal_id, std::shared_ptr& wal_writer) { + std::string wal_path; + RETURN_IF_ERROR(_wal_manager->get_wal_path(wal_id, wal_path)); + wal_writer = std::make_shared(wal_path); + RETURN_IF_ERROR(wal_writer->init()); + return Status::OK(); +} } // namespace vectorized } // namespace doris \ No newline at end of file diff --git a/be/src/vec/sink/writer/vwal_writer.h b/be/src/vec/sink/writer/vwal_writer.h index cbd369f7cb..f22250cb5d 100644 --- a/be/src/vec/sink/writer/vwal_writer.h +++ b/be/src/vec/sink/writer/vwal_writer.h @@ -37,6 +37,9 @@ public: Status write_wal(vectorized::Block* block); Status close(); +private: + Status _create_wal_writer(int64_t wal_id, std::shared_ptr& wal_writer); + private: int64_t _db_id; int64_t _tb_id; diff --git a/be/test/vec/exec/vwal_scanner_test.cpp b/be/test/vec/exec/vwal_scanner_test.cpp index cf9e173364..6f983ca5bf 100644 --- a/be/test/vec/exec/vwal_scanner_test.cpp +++ b/be/test/vec/exec/vwal_scanner_test.cpp @@ -226,7 +226,6 @@ TEST_F(VWalScannerTest, normal) { index_vector.emplace_back(0); index_vector.emplace_back(1); index_vector.emplace_back(2); - _env->_wal_manager->add_wal_column_index(txn_id, index_vector); // config::group_commit_replay_wal_dir = wal_dir; NewFileScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl); scan_node._output_tuple_desc = _runtime_state.desc_tbl().get_tuple_descriptor(_dst_tuple_id); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 9e7174752a..d40020fd57 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -537,9 +537,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { } return; } - long maxWalId = Env.getCurrentGlobalTransactionMgr() - .getTransactionIDGenerator().getNextTransactionId(); - waitWalFinished(maxWalId); + waitWalFinished(); /* * all tasks are finished. check the integrity. * we just check whether all new replicas are healthy. @@ -602,20 +600,22 @@ public class SchemaChangeJobV2 extends AlterJobV2 { LOG.info("set table's state to NORMAL, table id: {}, job id: {}", tableId, jobId); } - private void waitWalFinished(long maxWalId) { + private void waitWalFinished() { // wait wal done here Env.getCurrentEnv().getGroupCommitManager().setStatus(tableId, SchemaChangeStatus.BLOCK); LOG.info("block table {}", tableId); List aliveBeIds = Env.getCurrentSystemInfo().getAllBackendIds(true); long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold; - boolean walFinished = false; - while (System.currentTimeMillis() < expireTime) { + while (true) { LOG.info("wai for wal queue size to be empty"); - walFinished = Env.getCurrentEnv().getGroupCommitManager() - .isPreviousWalFinished(tableId, maxWalId, aliveBeIds); + boolean walFinished = Env.getCurrentEnv().getGroupCommitManager() + .isPreviousWalFinished(tableId, aliveBeIds); if (walFinished) { LOG.info("all wal is finished"); break; + } else if (System.currentTimeMillis() > expireTime) { + LOG.warn("waitWalFinished time out"); + break; } else { try { Thread.sleep(100); @@ -624,9 +624,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 { } } } - if (!walFinished) { - LOG.warn("waitWalFinished time out"); - } Env.getCurrentEnv().getGroupCommitManager().setStatus(tableId, SchemaChangeStatus.NORMAL); LOG.info("release table {}", tableId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java index ec4e01a0b4..3b9719b259 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -60,7 +60,7 @@ public class GroupCommitManager { /** * Check the wal before the endTransactionId is finished or not. */ - public boolean isPreviousWalFinished(long tableId, long endTransactionId, List aliveBeIds) { + public boolean isPreviousWalFinished(long tableId, List aliveBeIds) { boolean empty = true; for (int i = 0; i < aliveBeIds.size(); i++) { Backend backend = Env.getCurrentSystemInfo().getBackend(aliveBeIds.get(i)); @@ -70,9 +70,8 @@ public class GroupCommitManager { } PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder() .setTableId(tableId) - .setTxnId(endTransactionId) .build(); - long size = getWallQueueSize(backend, request); + long size = getWalQueueSize(backend, request); if (size > 0) { LOG.info("backend id:" + backend.getId() + ",wal size:" + size); empty = false; @@ -84,16 +83,15 @@ public class GroupCommitManager { public long getAllWalQueueSize(Backend backend) { PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder() .setTableId(-1) - .setTxnId(-1) .build(); - long size = getWallQueueSize(backend, request); + long size = getWalQueueSize(backend, request); if (size > 0) { LOG.info("backend id:" + backend.getId() + ",all wal size:" + size); } return size; } - public long getWallQueueSize(Backend backend, PGetWalQueueSizeRequest request) { + public long getWalQueueSize(Backend backend, PGetWalQueueSizeRequest request) { PGetWalQueueSizeResponse response = null; long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold; long size = 0; diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java index 5fe89a07f1..09de78a5e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java @@ -330,8 +330,10 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio List fileColumns = new ArrayList<>(); Table table = Env.getCurrentInternalCatalog().getTableByTableId(tableId); List tableColumns = table.getBaseSchema(true); - for (int i = 1; i <= tableColumns.size(); i++) { - fileColumns.add(new Column("c" + i, tableColumns.get(i - 1).getType(), true)); + for (int i = 0; i < tableColumns.size(); i++) { + Column column = new Column(tableColumns.get(i).getName(), tableColumns.get(i).getType(), true); + column.setUniqueId(tableColumns.get(i).getUniqueId()); + fileColumns.add(column); } return fileColumns; } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 351bf5caf8..c91a4865ca 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -804,7 +804,6 @@ message PStreamHeader { message PGetWalQueueSizeRequest{ optional int64 table_id = 1; - optional int64 txn_id = 2; } message PGetWalQueueSizeResponse{