From b72a4a4bc65b7851db358fff2bee0d878d8ba546 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Thu, 10 Oct 2019 09:39:02 +0800 Subject: [PATCH] Add tablet meta checkpoint mechanism (#1936) --- be/src/common/config.h | 4 + be/src/olap/data_dir.cpp | 23 +--- be/src/olap/olap_server.cpp | 10 ++ be/src/olap/rowset/rowset.h | 4 + be/src/olap/rowset/rowset_meta.h | 4 + be/src/olap/schema_change.cpp | 7 +- be/src/olap/storage_engine.cpp | 63 ++++++++++ be/src/olap/storage_engine.h | 7 ++ be/src/olap/tablet.cpp | 116 +++++++++++++++++- be/src/olap/tablet.h | 13 +- be/src/olap/tablet_manager.cpp | 27 +++- be/src/olap/tablet_manager.h | 2 + .../olap/task/engine_publish_version_task.cpp | 2 - 13 files changed, 256 insertions(+), 26 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index b37e788f65..13792fd52c 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -467,6 +467,10 @@ namespace config { CONF_Int64(storage_flood_stage_left_capacity_bytes, "1073741824") // 1GB // number of thread for flushing memtable per store CONF_Int32(flush_thread_num_per_store, "2"); + + // config for tablet meta checkpoint + CONF_Int32(tablet_meta_checkpoint_min_new_rowsets_num, "10"); + CONF_Int32(tablet_meta_checkpoint_min_interval_secs, "600"); } // namespace config } // namespace doris diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index 1450548920..562a2c7aa7 100755 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -825,24 +825,13 @@ OLAPStatus DataDir::load() { } } else if (rowset_meta->rowset_state() == RowsetStatePB::VISIBLE && rowset_meta->tablet_uid() == tablet->tablet_uid()) { - // add visible rowset to tablet, it maybe use in the future - // there should be only preparing rowset in meta env because visible - // rowset is persist with tablet meta currently - OLAPStatus publish_status = tablet->add_inc_rowset(rowset); + OLAPStatus publish_status = tablet->add_rowset(rowset, false); if (publish_status != OLAP_SUCCESS && publish_status != OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) { - LOG(WARNING) << "add visilbe rowset to tablet failed rowset_id:" << rowset->rowset_id() - << " tablet id: " << rowset_meta->tablet_id() - << " txn id:" << rowset_meta->txn_id() - << " start_version: " << rowset_meta->version().first - << " end_version: " << rowset_meta->version().second; - } else { - // it is added into tablet meta, then remove it from meta - RowsetMetaManager::remove(tablet->data_dir()->get_meta(), rowset_meta->tablet_uid(), rowset->rowset_id()); - LOG(INFO) << "successfully to add visible rowset: " << rowset_meta->rowset_id() - << " to tablet: " << rowset_meta->tablet_id() - << " txn id:" << rowset_meta->txn_id() - << " start_version: " << rowset_meta->version().first - << " end_version: " << rowset_meta->version().second; + LOG(WARNING) << "add visible rowset to tablet failed rowset_id:" << rowset->rowset_id() + << " tablet id: " << rowset_meta->tablet_id() + << " txn id:" << rowset_meta->txn_id() + << " start_version: " << rowset_meta->version().first + << " end_version: " << rowset_meta->version().second; } } else { LOG(WARNING) << "find invalid rowset: " << rowset_meta->rowset_id() diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 71cdb0346d..c1bea06314 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -90,6 +90,16 @@ OLAPStatus StorageEngine::_start_bg_worker() { thread.detach(); } + for (auto data_dir : data_dirs) { + _tablet_checkpoint_threads.emplace_back( + [this, data_dir] { + _tablet_checkpoint_callback((void*)data_dir); + }); + } + for (auto& thread : _tablet_checkpoint_threads) { + thread.detach(); + } + _fd_cache_clean_thread = std::thread( [this] { _fd_cache_clean_callback(nullptr); diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index a6752e6f1c..ba4cb3e614 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -118,6 +118,10 @@ public: _need_delete_file = true; } + bool contains_version(Version version) { + return rowset_meta()->version().first <= version.first && rowset_meta()->version().second >= version.second; + } + static bool comparator(const RowsetSharedPtr& left, const RowsetSharedPtr& right) { return left->end_version() < right->end_version(); } diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index c0f494defa..d588d75295 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -293,6 +293,10 @@ public: *rs_meta_pb = _rowset_meta_pb; } + bool is_singleton_delta() { + return has_version() && _rowset_meta_pb.start_version() == _rowset_meta_pb.end_version(); + } + private: friend class AlphaRowsetMeta; bool _deserialize_from_pb(const std::string& value) { diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 43baf71a6f..17b06afd75 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -1944,7 +1944,7 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa sc_params.new_tablet->release_push_lock(); goto PROCESS_ALTER_EXIT; } - res = sc_params.new_tablet->add_rowset(new_rowset); + res = sc_params.new_tablet->add_rowset(new_rowset, false); if (res == OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) { LOG(WARNING) << "version already exist, version revert occured. " << "tablet=" << sc_params.new_tablet->full_name() @@ -1973,6 +1973,11 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa } // XXX: 此时应该不取消SchemaChange状态,因为新Delta还要转换成新旧Schema的版本 PROCESS_ALTER_EXIT: + { + // save tablet meta here because rowset meta is not saved during add rowset + WriteLock new_wlock(sc_params.new_tablet->get_header_lock_ptr()); + res = sc_params.new_tablet->save_meta(); + } if (res == OLAP_SUCCESS) { Version test_version(0, end_version); res = sc_params.new_tablet->check_version_integrity(test_version); diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 1f670d01cd..6c88a75529 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -648,9 +648,52 @@ OLAPStatus StorageEngine::start_trash_sweep(double* usage) { // clean rubbish transactions _clean_unused_txns(); + // clean unused rowset metas in OlapMeta + _clean_unused_rowset_metas(); + return res; } +void StorageEngine::_clean_unused_rowset_metas() { + std::vector invalid_rowset_metas; + auto clean_rowset_func = [this, &invalid_rowset_metas](TabletUid tablet_uid, RowsetId rowset_id, + const std::string& meta_str) -> bool { + + RowsetMetaSharedPtr rowset_meta(new AlphaRowsetMeta()); + bool parsed = rowset_meta->init(meta_str); + if (!parsed) { + LOG(WARNING) << "parse rowset meta string failed for rowset_id:" << rowset_id; + // return false will break meta iterator, return true to skip this error + return true; + } + if (rowset_meta->tablet_uid() != tablet_uid) { + LOG(WARNING) << "tablet uid is not equal, skip the rowset" + << ", rowset_id=" << rowset_meta->rowset_id() + << ", in_put_tablet_uid=" << tablet_uid + << ", tablet_uid in rowset meta=" << rowset_meta->tablet_uid(); + return true; + } + + TabletSharedPtr tablet = _tablet_manager->get_tablet(rowset_meta->tablet_id(), rowset_meta->tablet_schema_hash(), tablet_uid); + if (tablet == nullptr) { + return true; + } + if (rowset_meta->rowset_state() == RowsetStatePB::VISIBLE && (!tablet->rowset_meta_is_useful(rowset_meta))) { + LOG(INFO) << "rowset meta is useless any more, remote it. rowset_id=" << rowset_meta->rowset_id(); + invalid_rowset_metas.push_back(rowset_meta); + } + return true; + }; + auto data_dirs = get_stores(); + for (auto data_dir : data_dirs) { + RowsetMetaManager::traverse_rowset_metas(data_dir->get_meta(), clean_rowset_func); + for (auto& rowset_meta : invalid_rowset_metas) { + RowsetMetaManager::remove(data_dir->get_meta(), rowset_meta->tablet_uid(), rowset_meta->rowset_id()); + } + invalid_rowset_metas.clear(); + } +} + void StorageEngine::_clean_unused_txns() { std::set tablet_infos; _txn_manager->get_all_related_tablets(&tablet_infos); @@ -977,4 +1020,24 @@ void* StorageEngine::_path_scan_thread_callback(void* arg) { return nullptr; } +void* StorageEngine::_tablet_checkpoint_callback(void* arg) { +#ifdef GOOGLE_PROFILER + ProfilerRegisterThread(); +#endif + LOG(INFO) << "try to start tablet meta checkpoint thread!"; + while (true) { + LOG(INFO) << "begin to do tablet meta checkpoint"; + int64_t start_time = UnixMillis(); + _tablet_manager->do_tablet_meta_checkpoint((DataDir*)arg); + int64_t used_time = (UnixMillis() - start_time) / 1000; + if (used_time < config::tablet_meta_checkpoint_min_interval_secs) { + sleep(config::tablet_meta_checkpoint_min_interval_secs - used_time); + } else { + sleep(1); + } + } + + return nullptr; +} + } // namespace doris diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 7f96fa4bb0..d7a3b51688 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -227,6 +227,8 @@ private: OLAPStatus _start_bg_worker(); void _clean_unused_txns(); + + void _clean_unused_rowset_metas(); OLAPStatus _do_sweep( const std::string& scan_root, const time_t& local_tm_now, const int32_t expire); @@ -255,6 +257,8 @@ private: void* _path_scan_thread_callback(void* arg); + void* _tablet_checkpoint_callback(void* arg); + private: struct CompactionCandidate { @@ -333,6 +337,9 @@ private: // thread to scan disk paths std::vector _path_scan_threads; + // thread to run tablet checkpoint + std::vector _tablet_checkpoint_threads; + static atomic_t _s_request_number; // for tablet and disk report diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index cde5b89c7e..138d37a104 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -240,13 +240,48 @@ OLAPStatus Tablet::deregister_tablet_from_dir() { return _data_dir->deregister_tablet(this); } -OLAPStatus Tablet::add_rowset(RowsetSharedPtr rowset) { +OLAPStatus Tablet::add_rowset(RowsetSharedPtr rowset, bool need_persist) { WriteLock wrlock(&_meta_lock); + // if the rowset already exist, should not return version already exist + // should return OLAP_SUCCESS + if (contains_rowset(rowset->rowset_id())) { + return OLAP_SUCCESS; + } RETURN_NOT_OK(_check_added_rowset(rowset)); RETURN_NOT_OK(_tablet_meta->add_rs_meta(rowset->rowset_meta())); _rs_version_map[rowset->version()] = rowset; RETURN_NOT_OK(_rs_graph.add_version_to_graph(rowset->version())); - RETURN_NOT_OK(save_meta()); + + vector rowsets_to_delete; + // yiguolei: temp code, should remove the rowset contains by this rowset + // but it should be removed in multi path version + for (auto& it : _rs_version_map) { + if ((it.first.first >= rowset->start_version() && it.first.second < rowset->end_version()) + || (it.first.first > rowset->start_version() && it.first.second <= rowset->end_version())) { + if (it.second == nullptr) { + LOG(FATAL) << "there exist a version " + << " start_version=" << it.first.first + << " end_version=" << it.first.second + << " contains the input rs with version " + << " start_version=" << rowset->start_version() + << " end_version=" << rowset->end_version() + << " but the related rs is null"; + return OLAP_ERR_PUSH_ROWSET_NOT_FOUND; + } else { + rowsets_to_delete.push_back(it.second); + } + } + } + modify_rowsets(std::vector(), rowsets_to_delete); + + if (need_persist) { + OLAPStatus res = RowsetMetaManager::save(data_dir()->get_meta(), tablet_uid(), + rowset->rowset_id(), rowset->rowset_meta().get()); + if (res != OLAP_SUCCESS) { + LOG(FATAL) << "failed to save rowset to local meta store" << rowset->rowset_id(); + } + } + ++_newly_created_rowset_num; return OLAP_SUCCESS; } @@ -269,7 +304,8 @@ OLAPStatus Tablet::modify_rowsets(const vector& to_add, } for (auto& rs : to_add) { - _rs_version_map[rs->version()] = rs;; + _rs_version_map[rs->version()] = rs; + ++_newly_created_rowset_num; } _rs_graph.reconstruct_rowset_graph(_tablet_meta->all_rs_metas()); @@ -336,8 +372,12 @@ RowsetSharedPtr Tablet::rowset_with_largest_size() { return largest_rowset; } +// add inc rowset should not persist tablet meta, because the OLAPStatus Tablet::add_inc_rowset(const RowsetSharedPtr& rowset) { WriteLock wrlock(&_meta_lock); + if (contains_rowset(rowset->rowset_id())) { + return OLAP_SUCCESS; + } // check if the rowset id is valid RETURN_NOT_OK(_check_added_rowset(rowset)); RETURN_NOT_OK(_tablet_meta->add_rs_meta(rowset->rowset_meta())); @@ -345,7 +385,7 @@ OLAPStatus Tablet::add_inc_rowset(const RowsetSharedPtr& rowset) { _inc_rs_version_map[rowset->version()] = rowset; RETURN_NOT_OK(_rs_graph.add_version_to_graph(rowset->version())); RETURN_NOT_OK(_tablet_meta->add_inc_rs_meta(rowset->rowset_meta())); - RETURN_NOT_OK(_tablet_meta->save_meta(_data_dir)); + ++_newly_created_rowset_num; return OLAP_SUCCESS; } @@ -974,4 +1014,72 @@ void Tablet::pick_candicate_rowsets_to_base_compaction(std::vectorfull_name(); break; diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index 678b5f7a0c..3cf1fc83e8 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -135,6 +135,8 @@ public: void get_partition_related_tablets(int64_t partition_id, std::set* tablet_infos); + void do_tablet_meta_checkpoint(DataDir* data_dir); + private: // Add a tablet pointer to StorageEngine // If force, drop the existing tablet add this new one diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 68e2d42854..0531709e65 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -118,8 +118,6 @@ OLAPStatus EnginePublishVersionTask::finish() { LOG(INFO) << "publish version successfully on tablet. tablet=" << tablet->full_name() << ", transaction_id=" << transaction_id << ", version=" << version.first << ", res=" << publish_status; - // delete rowset from meta env, because add inc rowset alreay saved the rowset meta to tablet meta - RowsetMetaManager::remove(tablet->data_dir()->get_meta(), tablet->tablet_uid(), rowset->rowset_id()); } // check if the related tablet remained all have the version