From feef077520ba9dc843fa900febd43b8ed03c2b66 Mon Sep 17 00:00:00 2001 From: LingBin Date: Mon, 17 Feb 2020 14:50:29 +0800 Subject: [PATCH] Some refactors on `TabletManager` (#2918) 1. Add some comments to make the code easier to understand; 2. Make the metric `create_tablet_requests_failed` to be accurate; 3. Some internal methods use naked pointers directly instead of `shared_ptr`; 4. The `using` in `.h` files are contagious when included by other files, so we should only use it in `.cpp` files; 5. Some formatting changes: such as wrapping lines that are too long 6. Parameters that need to be modified, use pointers instead of references No functional changes in this patch. --- be/src/olap/tablet.cpp | 15 +- be/src/olap/tablet.h | 6 +- be/src/olap/tablet_manager.cpp | 1037 +++++++++++++--------------- be/src/olap/tablet_manager.h | 61 +- be/src/olap/tablet_meta.cpp | 34 +- be/src/olap/tablet_meta.h | 37 +- be/src/service/backend_service.cpp | 11 +- be/src/service/backend_service.h | 2 +- be/test/olap/tablet_mgr_test.cpp | 1 - 9 files changed, 573 insertions(+), 631 deletions(-) diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index dafcdfcfff..874b5316ed 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -48,16 +48,9 @@ using std::sort; using std::string; using std::vector; -TabletSharedPtr Tablet::create_tablet_from_meta( - TabletMetaSharedPtr tablet_meta, - DataDir* data_dir) { - TabletSharedPtr tablet = std::make_shared(tablet_meta, data_dir); - if (tablet == nullptr) { - LOG(WARNING) << "fail to malloc a table."; - return nullptr; - } - - return tablet; +TabletSharedPtr Tablet::create_tablet_from_meta(TabletMetaSharedPtr tablet_meta, + DataDir* data_dir) { + return std::make_shared(tablet_meta, data_dir); } void Tablet::_gen_tablet_path() { @@ -535,6 +528,8 @@ OLAPStatus Tablet::add_delete_predicate( return _tablet_meta->add_delete_predicate(delete_predicate, version); } +// TODO(lingbin): what is the difference between 
version_for_delete_predicate() and +// version_for_load_deletion()? should at least leave a comment bool Tablet::version_for_delete_predicate(const Version& version) { return _tablet_meta->version_for_delete_predicate(version); } diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 9f94637d25..a668a84eec 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -47,9 +47,8 @@ using TabletSharedPtr = std::shared_ptr; class Tablet : public std::enable_shared_from_this { public: - static TabletSharedPtr create_tablet_from_meta( - TabletMetaSharedPtr tablet_meta, - DataDir* data_dir = nullptr); + static TabletSharedPtr create_tablet_from_meta(TabletMetaSharedPtr tablet_meta, + DataDir* data_dir = nullptr); Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir); ~Tablet(); @@ -241,7 +240,6 @@ public: bool rowset_meta_is_useful(RowsetMetaSharedPtr rowset_meta); - void build_tablet_report_info(TTabletInfo* tablet_info); OLAPStatus generate_tablet_meta_copy(TabletMetaSharedPtr new_tablet_meta); diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 024904cf9b..efc1129f42 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -29,6 +29,7 @@ #include #include +#include "gutil/strings/strcat.h" #include "olap/base_compaction.h" #include "olap/cumulative_compaction.h" #include "olap/data_dir.h" @@ -39,6 +40,7 @@ #include "olap/rowset/rowset_factory.h" #include "olap/rowset/rowset_id_generator.h" #include "olap/schema_change.h" +#include "olap/tablet.h" #include "olap/tablet_meta.h" #include "olap/tablet_meta_manager.h" #include "olap/utils.h" @@ -46,35 +48,23 @@ #include "util/file_utils.h" #include "util/pretty_printer.h" #include "util/path_util.h" +#include "util/scoped_cleanup.h" #include "util/time.h" -using apache::thrift::ThriftDebugString; -using boost::filesystem::canonical; -using boost::filesystem::directory_iterator; -using boost::filesystem::path; -using 
boost::filesystem::recursive_directory_iterator; -using std::back_inserter; -using std::copy; -using std::inserter; using std::list; using std::map; -using std::nothrow; -using std::pair; -using std::priority_queue; using std::set; -using std::set_difference; using std::string; using std::vector; +using strings::Substitute; namespace doris { -bool _sort_tablet_by_creation_time(const TabletSharedPtr& a, const TabletSharedPtr& b) { +static bool _cmp_tablet_by_create_time(const TabletSharedPtr& a, const TabletSharedPtr& b) { return a->creation_time() < b->creation_time(); } -TabletManager::TabletManager() : - _tablet_stat_cache_update_time_ms(0), - _available_storage_medium_type_count(0) { } +TabletManager::TabletManager() : _last_update_stat_ms(0) { } OLAPStatus TabletManager::_add_tablet_unlocked(TTabletId tablet_id, SchemaHash schema_hash, const TabletSharedPtr& tablet, @@ -83,42 +73,42 @@ OLAPStatus TabletManager::_add_tablet_unlocked(TTabletId tablet_id, SchemaHash s VLOG(3) << "begin to add tablet to TabletManager. " << "tablet_id=" << tablet_id << ", schema_hash=" << schema_hash << ", force=" << force; - TabletSharedPtr tablet_item = nullptr; + TabletSharedPtr existed_tablet = nullptr; for (TabletSharedPtr item : _tablet_map[tablet_id].table_arr) { if (item->equal(tablet_id, schema_hash)) { - tablet_item = item; + existed_tablet = item; break; } } - if (tablet_item == nullptr) { - VLOG(3) << "not find exist tablet just add it to map" - << " tablet_id = " << tablet_id - << " schema_hash = " << schema_hash; - return _add_tablet_to_map_unlocked(tablet_id, schema_hash, tablet, update_meta, false, false); + if (existed_tablet == nullptr) { + return _add_tablet_to_map_unlocked(tablet_id, schema_hash, + tablet, update_meta, + false /*keep_files*/, false /*drop_old*/); } if (!force) { - if (tablet_item->tablet_path() == tablet->tablet_path()) { - LOG(WARNING) << "add the same tablet twice! 
tablet_id=" - << tablet_id << " schema_hash=" << schema_hash; + if (existed_tablet->tablet_path() == tablet->tablet_path()) { + LOG(WARNING) << "add the same tablet twice! tablet_id=" << tablet_id + << ", schema_hash=" << schema_hash + << ", tablet_path=" << tablet->tablet_path(); return OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE; } - if (tablet_item->data_dir() == tablet->data_dir()) { - LOG(WARNING) << "add tablet with same data dir twice! tablet_id=" - << tablet_id << " schema_hash=" << schema_hash; + if (existed_tablet->data_dir() == tablet->data_dir()) { + LOG(WARNING) << "add tablet with same data dir twice! tablet_id=" << tablet_id + << ", schema_hash=" << schema_hash; return OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE; } } - tablet_item->obtain_header_rdlock(); - const RowsetSharedPtr old_rowset = tablet_item->rowset_with_max_version(); + existed_tablet->obtain_header_rdlock(); + const RowsetSharedPtr old_rowset = existed_tablet->rowset_with_max_version(); const RowsetSharedPtr new_rowset = tablet->rowset_with_max_version(); - // if new tablet is empty, it is a newly created schema change tablet + // If new tablet is empty, it is a newly created schema change tablet. // the old tablet is dropped before add tablet. it should not exist old tablet if (new_rowset == nullptr) { - tablet_item->release_header_lock(); + existed_tablet->release_header_lock(); // it seems useless to call unlock and return here. // it could prevent error when log level is changed in the future. LOG(FATAL) << "new tablet is empty and old tablet exists. it should not happen." @@ -129,7 +119,7 @@ OLAPStatus TabletManager::_add_tablet_unlocked(TTabletId tablet_id, SchemaHash s int64_t new_time = new_rowset->creation_time(); int32_t old_version = old_rowset == nullptr ? 
-1 : old_rowset->end_version(); int32_t new_version = new_rowset->end_version(); - tablet_item->release_header_lock(); + existed_tablet->release_header_lock(); // In restore process, we replace all origin files in tablet dir with // the downloaded snapshot files. Then we try to reload tablet header. @@ -142,7 +132,8 @@ OLAPStatus TabletManager::_add_tablet_unlocked(TTabletId tablet_id, SchemaHash s if (force || (new_version > old_version || (new_version == old_version && new_time > old_time))) { // check if new tablet's meta is in store and add new tablet's meta to meta store - res = _add_tablet_to_map_unlocked(tablet_id, schema_hash, tablet, update_meta, keep_files, true); + res = _add_tablet_to_map_unlocked(tablet_id, schema_hash, tablet, + update_meta, keep_files, true /*drop_old*/); } else { res = OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE; } @@ -150,55 +141,44 @@ OLAPStatus TabletManager::_add_tablet_unlocked(TTabletId tablet_id, SchemaHash s << ", tablet_id=" << tablet_id << ", schema_hash=" << schema_hash << ", old_version=" << old_version << ", new_version=" << new_version << ", old_time=" << old_time << ", new_time=" << new_time - << ", old_tablet_path=" << tablet_item->tablet_path() + << ", old_tablet_path=" << existed_tablet->tablet_path() << ", new_tablet_path=" << tablet->tablet_path(); return res; -} // add_tablet +} OLAPStatus TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id, SchemaHash schema_hash, - const TabletSharedPtr& tablet, bool update_meta, - bool keep_files, bool drop_old) { + const TabletSharedPtr& tablet, + bool update_meta, bool keep_files, + bool drop_old) { // check if new tablet's meta is in store and add new tablet's meta to meta store OLAPStatus res = OLAP_SUCCESS; if (update_meta) { // call tablet save meta in order to valid the meta - res = tablet->save_meta(); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "failed to save new tablet's meta to meta store" - << " tablet_id = " << tablet_id - << " schema_hash = " << 
schema_hash; - return res; - } + RETURN_NOT_OK_LOG(tablet->save_meta(), Substitute( + "failed to save new tablet's meta. tablet_id=$0, schema_hash=$1", + tablet_id, schema_hash)); } if (drop_old) { - // if the new tablet is fresher than current one - // then delete current one and add new one - res = _drop_tablet_unlocked(tablet_id, schema_hash, keep_files); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "failed to drop old tablet when add new tablet" - << " tablet_id = " << tablet_id - << " schema_hash = " << schema_hash; - return res; - } + // If the new tablet is fresher than the existing one, then replace + // the existing tablet with the new one. + RETURN_NOT_OK_LOG(_drop_tablet_unlocked(tablet_id, schema_hash, keep_files), Substitute( + "failed to drop old tablet when add new tablet. tablet_id=$0, schema_hash=$1", + tablet_id, schema_hash)); } - // Register tablet into StorageEngine, so that we can manage tablet from + // Register tablet into DataDir, so that we can manage tablet from // the perspective of root path. // Example: unregister all tables when a bad disk found. - res = tablet->register_tablet_into_dir(); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to register tablet into StorageEngine. res=" << res - << ", data_dir=" << tablet->data_dir()->path(); - return res; - } - _tablet_map[tablet_id].table_arr.push_back(tablet); - _tablet_map[tablet_id].table_arr.sort(_sort_tablet_by_creation_time); + RETURN_NOT_OK_LOG(tablet->register_tablet_into_dir(), Substitute( + "fail to register tablet into StorageEngine. data_dir=$0", + tablet->data_dir()->path())); - // add the tablet id to partition map + _tablet_map[tablet_id].table_arr.push_back(tablet); + _tablet_map[tablet_id].table_arr.sort(_cmp_tablet_by_create_time); _partition_tablet_map[tablet->partition_id()].insert(tablet->get_tablet_info()); - VLOG(3) << "add tablet to map successfully." 
<< " tablet_id=" << tablet_id - << ", schema_hash=" << schema_hash; + VLOG(3) << "add tablet to map successfully." + << " tablet_id=" << tablet_id << ", schema_hash=" << schema_hash; return res; } @@ -212,120 +192,133 @@ bool TabletManager::_check_tablet_id_exist_unlocked(TTabletId tablet_id) { return it != _tablet_map.end() && !it->second.table_arr.empty(); } -void TabletManager::clear() { - _tablet_map.clear(); - _shutdown_tablets.clear(); -} - OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request, std::vector stores) { - LOG(INFO) << "begin to process create tablet. tablet=" << request.tablet_id - << ", schema_hash=" << request.tablet_schema.schema_hash; - WriteLock wrlock(&_tablet_map_lock); - OLAPStatus res = OLAP_SUCCESS; DorisMetrics::create_tablet_requests_total.increment(1); - // Make sure create_tablet operation is idempotent: - // return true if tablet with same tablet_id and schema_hash exist, - // false if tablet with same tablet_id but different schema_hash exist - // during alter, if the tablet(same tabletid and schema hash) already exist - // then just return true, if tablet id with different schema hash exist, wait report - // task to delete the tablet - if (_check_tablet_id_exist_unlocked(request.tablet_id)) { - TabletSharedPtr tablet = _get_tablet_unlocked( - request.tablet_id, request.tablet_schema.schema_hash); + + int64_t tablet_id = request.tablet_id; + int32_t schema_hash = request.tablet_schema.schema_hash; + LOG(INFO) << "begin to create tablet. tablet_id=" << tablet_id + << ", schema_hash=" << schema_hash; + + WriteLock wrlock(&_tablet_map_lock); + // Make create_tablet operation to be idempotent: + // 1. Return true if tablet with same tablet_id and schema_hash exist; + // false if tablet with same tablet_id but different schema_hash exist. + // 2. When this is an alter task, if the tablet(both tablet_id and schema_hash are + // same) already exist, then just return true(an duplicate request). 
But if + // tablet_id exist but with different schema_hash, return an error(report task will + // eventually trigger its deletion). + if (_check_tablet_id_exist_unlocked(tablet_id)) { + TabletSharedPtr tablet = _get_tablet_unlocked(tablet_id, schema_hash); if (tablet != nullptr) { - LOG(INFO) << "create tablet success because tablet already exist. tablet_id=" - << request.tablet_id; + LOG(INFO) << "success to create tablet. tablet already exist. tablet_id=" << tablet_id; return OLAP_SUCCESS; } else { - LOG(WARNING) << "tablet with different schema hash already exists. tablet_id=" - << request.tablet_id; + LOG(WARNING) << "fail to create tablet. tablet exist but with different schema_hash. " + << "tablet_id=" << tablet_id << ", schema_hash=" << schema_hash; + DorisMetrics::create_tablet_requests_failed.increment(1); return OLAP_ERR_CE_TABLET_ID_EXIST; } } - TabletSharedPtr ref_tablet = nullptr; - bool is_schema_change_tablet = false; - // if the CreateTabletReq has base_tablet_id then it is a alter tablet request + TabletSharedPtr base_tablet = nullptr; + bool is_schema_change = false; + // If the CreateTabletReq has base_tablet_id then it is a alter-tablet request if (request.__isset.base_tablet_id && request.base_tablet_id > 0) { - is_schema_change_tablet = true; - ref_tablet = _get_tablet_unlocked(request.base_tablet_id, request.base_schema_hash); - if (ref_tablet == nullptr) { - LOG(WARNING) << "fail to create new tablet. new_tablet_id=" << request.tablet_id - << ", new_schema_hash=" << request.tablet_schema.schema_hash - << ", because could not find base tablet, base_tablet_id=" << request.base_tablet_id + is_schema_change = true; + base_tablet = _get_tablet_unlocked(request.base_tablet_id, request.base_schema_hash); + if (base_tablet == nullptr) { + LOG(WARNING) << "fail to create tablet(change schema), base tablet does not exist. 
" + << "new_tablet_id=" << tablet_id << ", new_schema_hash=" << schema_hash + << ", base_tablet_id=" << request.base_tablet_id << ", base_schema_hash=" << request.base_schema_hash; + DorisMetrics::create_tablet_requests_failed.increment(1); return OLAP_ERR_TABLE_CREATE_META_ERROR; } - // schema change should use the same data dir + // If we are doing schema-change, we should use the same data dir + // TODO(lingbin): A litter trick here, the directory should be determined before + // entering this method stores.clear(); - stores.push_back(ref_tablet->data_dir()); + stores.push_back(base_tablet->data_dir()); } - // set alter type to schema change. it is useless - TabletSharedPtr tablet = _internal_create_tablet_unlocked(AlterTabletType::SCHEMA_CHANGE, request, - is_schema_change_tablet, ref_tablet, stores); + // set alter type to schema-change. it is useless + TabletSharedPtr tablet = _internal_create_tablet_unlocked( + AlterTabletType::SCHEMA_CHANGE, request, is_schema_change, base_tablet.get(), stores); if (tablet == nullptr) { - res = OLAP_ERR_CE_CMD_PARAMS_ERROR; - LOG(WARNING) << "fail to create tablet. res=" << res; + LOG(WARNING) << "fail to create tablet. tablet_id=" << request.tablet_id; + DorisMetrics::create_tablet_requests_failed.increment(1); + return OLAP_ERR_CE_CMD_PARAMS_ERROR; } - LOG(INFO) << "finish to process create tablet. res=" << res; - return res; -} // create_tablet + LOG(INFO) << "success to create tablet. 
tablet_id=" << tablet_id + << ", schema_hash=" << schema_hash; + return OLAP_SUCCESS; +} TabletSharedPtr TabletManager::_internal_create_tablet_unlocked( const AlterTabletType alter_type, const TCreateTabletReq& request, - const bool is_schema_change_tablet, const TabletSharedPtr ref_tablet, - std::vector data_dirs) { - DCHECK((is_schema_change_tablet && ref_tablet != nullptr) - || (!is_schema_change_tablet && ref_tablet == nullptr)); - // check if the tablet with specified tablet id and schema hash already exists - auto checked_tablet = _get_tablet_unlocked(request.tablet_id, request.tablet_schema.schema_hash); - if (checked_tablet != nullptr) { - LOG(WARNING) << "failed to create tablet because tablet already exist." - << " tablet id = " << request.tablet_id - << " schema hash = " << request.tablet_schema.schema_hash; - return nullptr; - } - bool is_tablet_added = false; - auto tablet = _create_tablet_meta_and_dir_unlocked(request, is_schema_change_tablet, ref_tablet, data_dirs); + const bool is_schema_change, const Tablet* base_tablet, + const std::vector& data_dirs) { + // If in schema-change state, base_tablet must also be provided. + // i.e., is_schema_change and base_tablet are either assigned or not assigned + DCHECK((is_schema_change && base_tablet) || (!is_schema_change && !base_tablet)); + + // NOTE: The existence of tablet_id and schema_hash has already been checked, + // no need check again here. 
+ + auto tablet = _create_tablet_meta_and_dir_unlocked(request, is_schema_change, + base_tablet, data_dirs); if (tablet == nullptr) { return nullptr; } + int64_t new_tablet_id = request.tablet_id; + int32_t new_schema_hash = request.tablet_schema.schema_hash; + + // should remove the tablet's pending_id no matter create-tablet success or not + DataDir* data_dir = tablet->data_dir(); + SCOPED_CLEANUP({ + data_dir->remove_pending_ids(StrCat(TABLET_ID_PREFIX, new_tablet_id)); + }); + // TODO(yiguolei) - // the following code is very difficult to understand because it mixed alter tablet v2 and alter tablet v1 - // should remove alter tablet v1 code after v0.12 + // the following code is very difficult to understand because it mixed alter tablet v2 + // and alter tablet v1 should remove alter tablet v1 code after v0.12 OLAPStatus res = OLAP_SUCCESS; + bool is_tablet_added = false; do { res = tablet->init(); if (res != OLAP_SUCCESS) { LOG(WARNING) << "tablet init failed. tablet:" << tablet->full_name(); break; } - if (!is_schema_change_tablet || (request.__isset.base_tablet_id && request.base_tablet_id > 0)) { + // TODO(lingbin): is it needed? because all type of create_tablet will be true. + // 1. !is_schema_change: not in schema-change state; + // 2. request.base_tablet_id > 0: in schema-change state; + if (!is_schema_change || (request.__isset.base_tablet_id && request.base_tablet_id > 0)) { // Create init version if this is not a restore mode replica and request.version is set // bool in_restore_mode = request.__isset.in_restore_mode && request.in_restore_mode; // if (!in_restore_mode && request.__isset.version) { // create inital rowset before add it to storage engine could omit many locks - res = _create_inital_rowset_unlocked(tablet, request); + res = _create_inital_rowset_unlocked(request, tablet.get()); if (res != OLAP_SUCCESS) { LOG(WARNING) << "fail to create initial version for tablet. 
res=" << res; break; } } - if (is_schema_change_tablet) { + if (is_schema_change) { if (request.__isset.base_tablet_id && request.base_tablet_id > 0) { - LOG(INFO) << "this request is for alter tablet request v2, so that not add alter task to tablet"; + LOG(INFO) << "request for alter-tablet v2, do not add alter task to tablet"; // if this is a new alter tablet, has to set its state to not ready // because schema change hanlder depends on it to check whether history data // convert finished tablet->set_tablet_state(TabletState::TABLET_NOTREADY); } else { // add alter task to new tablet if it is a new tablet during schema change - tablet->add_alter_task(ref_tablet->tablet_id(), ref_tablet->schema_hash(), - vector(), alter_type); + tablet->add_alter_task(base_tablet->tablet_id(), base_tablet->schema_hash(), + vector(), alter_type); } // 有可能出现以下2种特殊情况: // 1. 因为操作系统时间跳变,导致新生成的表的creation_time小于旧表的creation_time时间 @@ -334,24 +327,25 @@ TabletSharedPtr TabletManager::_internal_create_tablet_unlocked( // // 当出现以上2种情况时,为了能够区分alter得到的新表和旧表,这里把新表的creation_time设置为 // 旧表的creation_time加1 - if (tablet->creation_time() <= ref_tablet->creation_time()) { - LOG(WARNING) << "new tablet's creation time is less than or equal to old tablet" - << "new_tablet_creation_time=" << tablet->creation_time() - << ", ref_tablet_creation_time=" << ref_tablet->creation_time(); - int64_t new_creation_time = ref_tablet->creation_time() + 1; + if (tablet->creation_time() <= base_tablet->creation_time()) { + LOG(WARNING) << "new tablet's create time is less than or equal to old tablet" + << "new_tablet_create_time=" << tablet->creation_time() + << ", base_tablet_create_time=" << base_tablet->creation_time(); + int64_t new_creation_time = base_tablet->creation_time() + 1; tablet->set_creation_time(new_creation_time); } } // Add tablet to StorageEngine will make it visiable to user - res = _add_tablet_unlocked(request.tablet_id, request.tablet_schema.schema_hash, tablet, true, false); + res = 
_add_tablet_unlocked(new_tablet_id, new_schema_hash, tablet, true, false); if (res != OLAP_SUCCESS) { LOG(WARNING) << "fail to add tablet to StorageEngine. res=" << res; break; } + is_tablet_added = true; + // TODO(lingbin): The following logic seems useless, can be removed? // Because if _add_tablet_unlocked() return OK, we must can get it from map. - is_tablet_added = true; - TabletSharedPtr tablet_ptr = _get_tablet_unlocked(request.tablet_id, request.tablet_schema.schema_hash); + TabletSharedPtr tablet_ptr = _get_tablet_unlocked(new_tablet_id, new_schema_hash); if (tablet_ptr == nullptr) { res = OLAP_ERR_TABLE_NOT_FOUND; LOG(WARNING) << "fail to get tablet. res=" << res; @@ -359,28 +353,21 @@ TabletSharedPtr TabletManager::_internal_create_tablet_unlocked( } } while (0); - // should remove the pending path of tablet id no matter create tablet success or not - tablet->data_dir()->remove_pending_ids(TABLET_ID_PREFIX + std::to_string(request.tablet_id)); - - // clear environment - if (res != OLAP_SUCCESS) { - DorisMetrics::create_tablet_requests_failed.increment(1); - if (is_tablet_added) { - OLAPStatus status = _drop_tablet_unlocked( - request.tablet_id, request.tablet_schema.schema_hash, false); - if (status != OLAP_SUCCESS) { - LOG(WARNING) << "fail to drop tablet when create tablet failed. res=" << res; - } - } else { - tablet->delete_all_files(); - TabletMetaManager::remove(tablet->data_dir(), request.tablet_id, request.tablet_schema.schema_hash); - } - return nullptr; - } else { - LOG(INFO) << "finish to process create tablet. res=" << res; + if (res == OLAP_SUCCESS) { return tablet; } -} // _internal_create_tablet_unlocked + // something is wrong, we need clear environment + if (is_tablet_added) { + OLAPStatus status = _drop_tablet_unlocked(new_tablet_id, new_schema_hash, false); + if (status != OLAP_SUCCESS) { + LOG(WARNING) << "fail to drop tablet when create tablet failed. 
res=" << res; + } + } else { + tablet->delete_all_files(); + TabletMetaManager::remove(data_dir, new_tablet_id, new_schema_hash); + } + return nullptr; +} static string _gen_tablet_dir(const string& dir, int16_t shard_id, int64_t tablet_id) { string path = dir; @@ -391,21 +378,22 @@ static string _gen_tablet_dir(const string& dir, int16_t shard_id, int64_t table } TabletSharedPtr TabletManager::_create_tablet_meta_and_dir_unlocked( - const TCreateTabletReq& request, const bool is_schema_change_tablet, - const TabletSharedPtr ref_tablet, std::vector data_dirs) { - TabletSharedPtr tablet; - // Try to create tablet on each of all_available_root_path, util success + const TCreateTabletReq& request, const bool is_schema_change, + const Tablet* base_tablet, const std::vector& data_dirs) { + string pending_id = StrCat(TABLET_ID_PREFIX, request.tablet_id); + // Many attempts are made here in the hope that even if a disk fails, it can still continue. DataDir* last_dir = nullptr; for (auto& data_dir : data_dirs) { if (last_dir != nullptr) { - // if last dir != null, it means preivous create tablet retry failed - last_dir->remove_pending_ids(TABLET_ID_PREFIX + std::to_string(request.tablet_id)); + // If last_dir != null, it means the last attempt to create a tablet failed + last_dir->remove_pending_ids(pending_id); } last_dir = data_dir; + TabletMetaSharedPtr tablet_meta; // if create meta faild, do not need to clean dir, because it is only in memory OLAPStatus res = _create_tablet_meta_unlocked( - request, data_dir, is_schema_change_tablet, ref_tablet, &tablet_meta); + request, data_dir, is_schema_change, base_tablet, &tablet_meta); if (res != OLAP_SUCCESS) { LOG(WARNING) << "fail to create tablet meta. 
res=" << res << ", root=" << data_dir->path(); @@ -423,39 +411,22 @@ TabletSharedPtr TabletManager::_create_tablet_meta_and_dir_unlocked( LOG(WARNING) << "skip this dir because tablet path exist, path="<< schema_hash_dir; continue; } else { - data_dir->add_pending_ids(TABLET_ID_PREFIX + std::to_string(request.tablet_id)); - Status ret = FileUtils::create_dir(schema_hash_dir); - if(!ret.ok()) { - LOG(WARNING) << "create dir fail. [res=" << res << " path:" << schema_hash_dir - << " error: " << ret.to_string(); - res = OLAP_ERR_CANNOT_CREATE_DIR; + data_dir->add_pending_ids(pending_id); + Status st = FileUtils::create_dir(schema_hash_dir); + if(!st.ok()) { + LOG(WARNING) << "create dir fail. path=" << schema_hash_dir + << " error=" << st.to_string(); continue; } } - tablet = Tablet::create_tablet_from_meta(tablet_meta, data_dir); - if (tablet == nullptr) { - LOG(WARNING) << "fail to load tablet from tablet_meta. root_path:" << data_dir->path(); - Status ret = FileUtils::remove_all(tablet_dir); - if (!ret.ok()) { - LOG(WARNING) << "remove tablet dir:" << tablet_dir << ", err: " << ret.to_string(); - res = OLAP_ERR_IO_ERROR; - } - continue; - } - break; + TabletSharedPtr new_tablet = Tablet::create_tablet_from_meta(tablet_meta, data_dir); + DCHECK(new_tablet != nullptr); + return new_tablet; } - return tablet; + return nullptr; } -// Drop tablet specified, the main logical is as follows: -// 1. tablet not in schema change: -// drop specified tablet directly; -// 2. tablet in schema change: -// a. schema change not finished && dropped tablet is base : -// base tablet cannot be dropped; -// b. other cases: -// drop specified tablet and clear schema change info. OLAPStatus TabletManager::drop_tablet( TTabletId tablet_id, SchemaHash schema_hash, bool keep_files) { WriteLock wlock(&_tablet_map_lock); @@ -472,25 +443,20 @@ OLAPStatus TabletManager::drop_tablet( // drop specified tablet directly and clear schema change info. 
OLAPStatus TabletManager::_drop_tablet_unlocked( TTabletId tablet_id, SchemaHash schema_hash, bool keep_files) { - LOG(INFO) << "begin to process drop tablet." - << "tablet=" << tablet_id << ", schema_hash=" << schema_hash; + LOG(INFO) << "begin drop tablet. tablet_id=" << tablet_id << ", schema_hash=" << schema_hash; DorisMetrics::drop_tablet_requests_total.increment(1); - OLAPStatus res = OLAP_SUCCESS; - - // Get tablet which need to be droped - TabletSharedPtr dropped_tablet = _get_tablet_unlocked(tablet_id, schema_hash); - if (dropped_tablet == nullptr) { - LOG(WARNING) << "tablet to drop does not exist already." - << " tablet_id=" << tablet_id - << ", schema_hash=" << schema_hash; + // Fetch the tablet which needs to be dropped + TabletSharedPtr to_drop_tablet = _get_tablet_unlocked(tablet_id, schema_hash); + if (to_drop_tablet == nullptr) { + LOG(WARNING) << "fail to drop tablet because it does not exist. " + << "tablet_id=" << tablet_id << ", schema_hash=" << schema_hash; return OLAP_SUCCESS; } - // Try to get schema change info - AlterTabletTaskSharedPtr alter_task = dropped_tablet->alter_task(); - - // Drop tablet directly when not in schema change + // Try to get schema change info, we can drop tablet directly if it is not + // in schema-change state. 
+ AlterTabletTaskSharedPtr alter_task = to_drop_tablet->alter_task(); if (alter_task == nullptr) { return _drop_tablet_directly_unlocked(tablet_id, schema_hash, keep_files); } @@ -499,40 +465,52 @@ OLAPStatus TabletManager::_drop_tablet_unlocked( TTabletId related_tablet_id = alter_task->related_tablet_id(); TSchemaHash related_schema_hash = alter_task->related_schema_hash();; - // Check tablet is in schema change or not, is base tablet or not - bool is_schema_change_finished = (alter_state == ALTER_FINISHED || alter_state == ALTER_FAILED); - - bool is_drop_base_tablet = false; - TabletSharedPtr related_tablet = _get_tablet_unlocked( - related_tablet_id, related_schema_hash); + TabletSharedPtr related_tablet = _get_tablet_unlocked(related_tablet_id, related_schema_hash); if (related_tablet == nullptr) { + // TODO(lingbin): in what case, can this happen? LOG(WARNING) << "drop tablet directly when related tablet not found. " << " tablet_id=" << related_tablet_id << " schema_hash=" << related_schema_hash; return _drop_tablet_directly_unlocked(tablet_id, schema_hash, keep_files); } - if (dropped_tablet->creation_time() < related_tablet->creation_time()) { - is_drop_base_tablet = true; + // Check whether the tablet we want to delete is in schema-change state + bool is_schema_change_finished = (alter_state == ALTER_FINISHED || alter_state == ALTER_FAILED); + + // Check whether the tablet we want to delete is base-tablet + bool is_dropping_base_tablet = false; + if (to_drop_tablet->creation_time() < related_tablet->creation_time()) { + is_dropping_base_tablet = true; } - if (is_drop_base_tablet && !is_schema_change_finished) { - LOG(WARNING) << "base tablet in schema change cannot be droped. tablet=" - << dropped_tablet->full_name(); + if (is_dropping_base_tablet && !is_schema_change_finished) { + LOG(WARNING) << "fail to drop tablet. it is in schema-change state. 
tablet=" + << to_drop_tablet->full_name(); return OLAP_ERR_PREVIOUS_SCHEMA_CHANGE_NOT_FINISHED; } - // Drop specified tablet and clear schema change info - // must first break the link and then drop the tablet - // if drop tablet, then break link. the link maybe exists but the tablet - // not exist when be restarts + // When the code gets here, there are two possibilities: + // 1. The tablet currently being deleted is a base-tablet, and the corresponding + // schema-change process has finished; + // 2. The tablet we are currently trying to drop is not base-tablet(i.e. a tablet + // generated from its base-tablet due to schema-change). For example, the current + // request is triggered by cancel alter. In this scenario, the corresponding + // schema-change task may still be in process. + + // Drop specified tablet and clear schema-change info + // NOTE: must first break the hard-link and then drop the tablet. + // Otherwise, if first drop tablet, then break link. If BE restarts during execution, + // after BE restarts, the tablet is no longer in metadata, but because the hard-link + // is still there, the corresponding file may never be deleted from disk. related_tablet->obtain_header_wrlock(); - // should check the related tablet id in alter task is current tablet to be dropped - // A related to B, BUT B related to C - // if drop A, should not clear B's alter task + // should check the related tablet_id in alter task is current tablet to be dropped + // For example: A related to B, BUT B related to C. 
+ // If drop A, should not clear B's alter task + OLAPStatus res = OLAP_SUCCESS; AlterTabletTaskSharedPtr related_alter_task = related_tablet->alter_task(); - if (related_alter_task != nullptr && related_alter_task->related_tablet_id() == tablet_id - && related_alter_task->related_schema_hash() == schema_hash) { + if (related_alter_task != nullptr + && related_alter_task->related_tablet_id() == tablet_id + && related_alter_task->related_schema_hash() == schema_hash) { related_tablet->delete_alter_task(); res = related_tablet->save_meta(); if (res != OLAP_SUCCESS) { @@ -544,13 +522,13 @@ OLAPStatus TabletManager::_drop_tablet_unlocked( res = _drop_tablet_directly_unlocked(tablet_id, schema_hash, keep_files); if (res != OLAP_SUCCESS) { LOG(WARNING) << "fail to drop tablet which in schema change. tablet=" - << dropped_tablet->full_name(); + << to_drop_tablet->full_name(); return res; } LOG(INFO) << "finish to drop tablet. res=" << res; return res; -} // drop_tablet_unlock +} OLAPStatus TabletManager::drop_tablets_on_error_root_path( const vector& tablet_info_vec) { @@ -583,54 +561,60 @@ OLAPStatus TabletManager::drop_tablets_on_error_root_path( } return res; -} // drop_tablets_on_error_root_path +} TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, SchemaHash schema_hash, - bool include_deleted, std::string* err) { + bool include_deleted, string* err) { ReadLock rlock(&_tablet_map_lock); return _get_tablet_unlocked(tablet_id, schema_hash, include_deleted, err); } TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, SchemaHash schema_hash, - bool include_deleted, std::string* err) { + bool include_deleted, string* err) { TabletSharedPtr tablet; tablet = _get_tablet_unlocked(tablet_id, schema_hash); if (tablet == nullptr && include_deleted) { for (auto& deleted_tablet : _shutdown_tablets) { - CHECK(deleted_tablet != nullptr) << "deleted tablet in nullptr"; - if (deleted_tablet->tablet_id() == tablet_id && deleted_tablet->schema_hash() == 
schema_hash) { + CHECK(deleted_tablet != nullptr) << "deleted tablet is nullptr"; + if (deleted_tablet->tablet_id() == tablet_id + && deleted_tablet->schema_hash() == schema_hash) { tablet = deleted_tablet; break; } } } - if (tablet != nullptr) { - if (!tablet->is_used()) { - LOG(WARNING) << "tablet cannot be used. tablet=" << tablet_id; - if (err != nullptr) { *err = "tablet cannot be used"; } - tablet.reset(); + if (tablet == nullptr) { + if (err != nullptr) { + *err = "tablet does not exist"; } - } else if (err != nullptr) { - *err = "tablet does not exist"; + return nullptr; + } + + if (!tablet->is_used()) { + LOG(WARNING) << "tablet cannot be used. tablet=" << tablet_id; + if (err != nullptr) { + *err = "tablet cannot be used"; + } + return nullptr; } return tablet; -} // get_tablet +} TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid, bool include_deleted, - std::string* err) { + string* err) { ReadLock rlock(&_tablet_map_lock); TabletSharedPtr tablet = _get_tablet_unlocked(tablet_id, schema_hash, include_deleted, err); if (tablet != nullptr && tablet->tablet_uid() == tablet_uid) { return tablet; } return nullptr; -} // get_tablet +} bool TabletManager::get_tablet_id_and_schema_hash_from_path( - const std::string& path, TTabletId* tablet_id, TSchemaHash* schema_hash) { + const string& path, TTabletId* tablet_id, TSchemaHash* schema_hash) { static re2::RE2 normal_re("/data/\\d+/(\\d+)/(\\d+)($|/)"); if (RE2::PartialMatch(path, normal_re, tablet_id, schema_hash)) { return true; @@ -647,9 +631,9 @@ bool TabletManager::get_tablet_id_and_schema_hash_from_path( return true; } -bool TabletManager::get_rowset_id_from_path(const std::string& path, RowsetId* rowset_id) { +bool TabletManager::get_rowset_id_from_path(const string& path, RowsetId* rowset_id) { static re2::RE2 re("/data/\\d+/\\d+/\\d+/([A-Fa-f0-9]+)_.*"); - std::string id_str; + string id_str; bool ret = RE2::PartialMatch(path, re, &id_str); if 
(ret) { rowset_id->init(id_str); @@ -658,179 +642,175 @@ bool TabletManager::get_rowset_id_from_path(const std::string& path, RowsetId* r return false; } -void TabletManager::get_tablet_stat(TTabletStatResult& result) { - VLOG(3) << "begin to get all tablet stat."; - - // get current time - int64_t current_time = UnixMillis(); - - // update cache if too old +void TabletManager::get_tablet_stat(TTabletStatResult* result) { + int64_t curr_ms = UnixMillis(); + // Update cache if it is too old { + int interval_sec = config::tablet_stat_cache_update_interval_second; std::lock_guard l(_tablet_stat_mutex); - if (current_time - _tablet_stat_cache_update_time_ms > - config::tablet_stat_cache_update_interval_second * 1000) { + if (curr_ms - _last_update_stat_ms > interval_sec * 1000) { VLOG(3) << "update tablet stat."; _build_tablet_stat(); - _tablet_stat_cache_update_time_ms = UnixMillis(); + _last_update_stat_ms = UnixMillis(); } } - result.__set_tablets_stats(_tablet_stat_cache); -} // get_tablet_stat + result->__set_tablets_stats(_tablet_stat_cache); +} -TabletSharedPtr TabletManager::find_best_tablet_to_compaction( - CompactionType compaction_type, DataDir* data_dir) { - int64_t now = UnixMillis(); +TabletSharedPtr TabletManager::find_best_tablet_to_compaction(CompactionType compaction_type, + DataDir* data_dir) { + int64_t now_ms = UnixMillis(); + const string& compaction_type_str = CompactionType::BASE_COMPACTION ? 
"base" : "cumulative"; ReadLock tablet_map_rdlock(&_tablet_map_lock); uint32_t highest_score = 0; TabletSharedPtr best_tablet; for (tablet_map_t::value_type& table_ins : _tablet_map){ - for (TabletSharedPtr& table_ptr : table_ins.second.table_arr) { - AlterTabletTaskSharedPtr cur_alter_task = table_ptr->alter_task(); - if (cur_alter_task != nullptr && cur_alter_task->alter_state() != ALTER_FINISHED - && cur_alter_task->alter_state() != ALTER_FAILED) { - TabletSharedPtr related_tablet = _get_tablet_unlocked( - cur_alter_task->related_tablet_id(), cur_alter_task->related_schema_hash()); - if (related_tablet != nullptr - && table_ptr->creation_time() > related_tablet->creation_time()) { - // it means cur tablet is a new tablet during schema change or rollup, skip compaction - continue; - } + for (TabletSharedPtr& tablet_ptr : table_ins.second.table_arr) { + AlterTabletTaskSharedPtr cur_alter_task = tablet_ptr->alter_task(); + if (cur_alter_task != nullptr + && cur_alter_task->alter_state() != ALTER_FINISHED + && cur_alter_task->alter_state() != ALTER_FAILED) { + TabletSharedPtr related_tablet = _get_tablet_unlocked( + cur_alter_task->related_tablet_id(), cur_alter_task->related_schema_hash()); + if (related_tablet != nullptr + && tablet_ptr->creation_time() > related_tablet->creation_time()) { + // Current tablet is newly created during schema-change or rollup, skip it + continue; + } } - // if tablet is not ready, it maybe a new tablet under schema change, not do compaction - if (table_ptr->tablet_state() == TABLET_NOTREADY) { + // A not-ready tablet maybe a newly created tablet under schema-change, skip it + if (tablet_ptr->tablet_state() == TABLET_NOTREADY) { continue; } - if (table_ptr->data_dir()->path_hash() != data_dir->path_hash() - || !table_ptr->is_used() - || !table_ptr->init_succeeded() - || !table_ptr->can_do_compaction()) { + if (tablet_ptr->data_dir()->path_hash() != data_dir->path_hash() + || !tablet_ptr->is_used() + || !tablet_ptr->init_succeeded() + 
|| !tablet_ptr->can_do_compaction()) { continue; } - int64_t last_failure_time = table_ptr->last_cumu_compaction_failure_time(); + int64_t last_failure_ms = tablet_ptr->last_cumu_compaction_failure_time(); if (compaction_type == CompactionType::BASE_COMPACTION) { - last_failure_time = table_ptr->last_base_compaction_failure_time(); + last_failure_ms = tablet_ptr->last_base_compaction_failure_time(); } - if (now - last_failure_time <= config::min_compaction_failure_interval_sec * 1000) { - VLOG(1) << "last " << (compaction_type == CompactionType::BASE_COMPACTION ? "base" : "cumulative") - << " compaction failure time is: " << last_failure_time << ", tablet: " << table_ptr->tablet_id(); + if (now_ms - last_failure_ms <= config::min_compaction_failure_interval_sec * 1000) { + VLOG(1) << "Too often to check compaction, skip it." + << "compaction_type=" << compaction_type_str + << ", last_failure_time_ms=" << last_failure_ms + << ", tablet_id=" << tablet_ptr->tablet_id(); continue; } - if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { - MutexLock lock(table_ptr->get_cumulative_lock(), TRY_LOCK); + if (compaction_type == CompactionType::BASE_COMPACTION) { + MutexLock lock(tablet_ptr->get_base_lock(), TRY_LOCK); + if (!lock.own_lock()) { + continue; + } + } else { + MutexLock lock(tablet_ptr->get_cumulative_lock(), TRY_LOCK); if (!lock.own_lock()) { continue; } } - if (compaction_type == CompactionType::BASE_COMPACTION) { - MutexLock lock(table_ptr->get_base_lock(), TRY_LOCK); - if (!lock.own_lock()) { - continue; - } - } - ReadLock rdlock(table_ptr->get_header_lock_ptr()); uint32_t table_score = 0; - if (compaction_type == CompactionType::BASE_COMPACTION) { - table_score = table_ptr->calc_base_compaction_score(); - } else if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { - table_score = table_ptr->calc_cumulative_compaction_score(); + { + ReadLock rdlock(tablet_ptr->get_header_lock_ptr()); + if (compaction_type == 
CompactionType::BASE_COMPACTION) { + table_score = tablet_ptr->calc_base_compaction_score(); + } else if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { + table_score = tablet_ptr->calc_cumulative_compaction_score(); + } } if (table_score > highest_score) { highest_score = table_score; - best_tablet = table_ptr; + best_tablet = tablet_ptr; } } } if (best_tablet != nullptr) { - LOG(INFO) << "find best tablet to do compaction." - << " type: " << (compaction_type == CompactionType::CUMULATIVE_COMPACTION ? "cumulative" : "base") - << ", tablet id: " << best_tablet->tablet_id() << ", score: " << highest_score; - if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { - DorisMetrics::tablet_cumulative_max_compaction_score.set_value(highest_score); - } else { + LOG(INFO) << "Found the best tablet for compaction. " + << "compaction_type=" << compaction_type_str + << ", tablet_id=" << best_tablet->tablet_id() + << ", highest_score=" << highest_score; + // TODO(lingbin): Remove 'max' from metric name, it would be misunderstood as the + // biggest in history(like peak), but it is really just the value at current moment. 
+ if (compaction_type == CompactionType::BASE_COMPACTION) { DorisMetrics::tablet_base_max_compaction_score.set_value(highest_score); + } else { + DorisMetrics::tablet_cumulative_max_compaction_score.set_value(highest_score); } } return best_tablet; } OLAPStatus TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tablet_id, - TSchemaHash schema_hash, const std::string& meta_binary, bool update_meta, bool force) { + TSchemaHash schema_hash, const string& meta_binary, bool update_meta, bool force) { WriteLock wlock(&_tablet_map_lock); TabletMetaSharedPtr tablet_meta(new TabletMeta()); OLAPStatus status = tablet_meta->deserialize(meta_binary); if (status != OLAP_SUCCESS) { - LOG(WARNING) << "parse meta_binary string failed for tablet_id:" << tablet_id << ", schema_hash:" << schema_hash; + LOG(WARNING) << "fail to load tablet because can not parse meta_binary string. " + << "tablet_id=" << tablet_id + << ", schema_hash=" << schema_hash; return OLAP_ERR_HEADER_PB_PARSE_FAILED; } // check if tablet meta is valid if (tablet_meta->tablet_id() != tablet_id || tablet_meta->schema_hash() != schema_hash) { - LOG(WARNING) << "tablet meta load from meta is invalid" - << " input tablet id=" << tablet_id - << " input tablet schema_hash=" << schema_hash - << " meta tablet=" << tablet_meta->full_name(); + LOG(WARNING) << "fail to load tablet because meet invalid tablet meta. " + << "trying to load tablet(tablet_id=" << tablet_id + << ", schema_hash=" << schema_hash << ")" + << ", but meet tablet=" << tablet_meta->full_name(); return OLAP_ERR_HEADER_PB_PARSE_FAILED; } if (tablet_meta->tablet_uid().hi == 0 && tablet_meta->tablet_uid().lo == 0) { - LOG(WARNING) << "not load this tablet because uid == 0" - << " tablet=" << tablet_meta->full_name(); + LOG(WARNING) << "fail to load tablet because its uid == 0. 
" + << "tablet=" << tablet_meta->full_name(); return OLAP_ERR_HEADER_PB_PARSE_FAILED; } - // init must be called TabletSharedPtr tablet = Tablet::create_tablet_from_meta(tablet_meta, data_dir); if (tablet == nullptr) { - LOG(WARNING) << "fail to new tablet. tablet_id=" << tablet_id << ", schema_hash:" << schema_hash; + LOG(WARNING) << "fail to load tablet. tablet_id=" << tablet_id + << ", schema_hash:" << schema_hash; return OLAP_ERR_TABLE_CREATE_FROM_HEADER_ERROR; } if (tablet_meta->tablet_state() == TABLET_SHUTDOWN) { - LOG(INFO) << "tablet is to be deleted, skip load it. tablet_id=" << tablet_meta->tablet_id() - << " schema_hash=" << tablet_meta->schema_hash(); + LOG(INFO) << "fail to load tablet because it is to be deleted. tablet_id=" << tablet_id + << " schema_hash=" << schema_hash; _shutdown_tablets.push_back(tablet); return OLAP_ERR_TABLE_ALREADY_DELETED_ERROR; } - // not check tablet init version because when be restarts during alter task the new tablet may be empty + // NOTE: We do not check tablet's initial version here, because if BE restarts when + // one tablet is doing schema-change, we may meet empty tablet. if (tablet->max_version().first == -1 && tablet->tablet_state() == TABLET_RUNNING) { - LOG(WARNING) << "tablet is in running state without delta is invalid." + LOG(WARNING) << "fail to load tablet. it is in running state but without delta. " << "tablet=" << tablet->full_name(); // tablet state is invalid, drop tablet return OLAP_ERR_TABLE_INDEX_VALIDATE_ERROR; } - OLAPStatus res = tablet->init(); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "tablet init failed. tablet:" << tablet->full_name(); - return res; - } - res = _add_tablet_unlocked(tablet_id, schema_hash, tablet, update_meta, force); - if (res != OLAP_SUCCESS) { - // insert existed tablet return OLAP_SUCCESS - if (res == OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE) { - LOG(WARNING) << "add duplicate tablet. tablet=" << tablet->full_name(); - } - - LOG(WARNING) << "failed to add tablet. 
tablet=" << tablet->full_name(); - return res; - } - + RETURN_NOT_OK_LOG(tablet->init(), Substitute("tablet init failed. tablet=$0", + tablet->full_name())); + RETURN_NOT_OK_LOG(_add_tablet_unlocked(tablet_id, schema_hash, tablet, update_meta, force), + Substitute("fail to add tablet. tablet=$0", tablet->full_name())); return OLAP_SUCCESS; -} // load_tablet_from_meta +} OLAPStatus TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id, - SchemaHash schema_hash, const string& schema_hash_path, + SchemaHash schema_hash, + const string& schema_hash_path, bool force) { LOG(INFO) << "begin to load tablet from dir. " - << " tablet_id=" << tablet_id - << " schema_hash=" << schema_hash - << " path = " << schema_hash_path; + << " tablet_id=" << tablet_id + << " schema_hash=" << schema_hash + << " path = " << schema_hash_path; // not add lock here, because load_tablet_from_meta already add lock string header_path = TabletMeta::construct_header_file_path(schema_hash_path, tablet_id); // should change shard id before load tablet @@ -839,44 +819,30 @@ OLAPStatus TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_ int32_t shard = stol(shard_str); // load dir is called by clone, restore, storage migration // should change tablet uid when tablet object changed - OLAPStatus res = TabletMeta::reset_tablet_uid(header_path); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "failed to set tablet uid when copied tablet meta file" - << " header_path=" << header_path; - return res; - } - TabletMetaSharedPtr tablet_meta(new(nothrow) TabletMeta()); - do { - if (access(header_path.c_str(), F_OK) != 0) { - LOG(WARNING) << "fail to find header file. 
[header_path=" << header_path << "]"; - res = OLAP_ERR_FILE_NOT_EXIST; - break; - } - if (tablet_meta == nullptr) { - LOG(WARNING) << "fail to malloc TabletMeta."; - res = OLAP_ERR_ENGINE_LOAD_INDEX_TABLE_ERROR; - break; - } + RETURN_NOT_OK_LOG(TabletMeta::reset_tablet_uid(header_path), Substitute( + "failed to set tablet uid when copied meta file. header_path=%0", header_path));; - if (tablet_meta->create_from_file(header_path) != OLAP_SUCCESS) { - LOG(WARNING) << "fail to load tablet_meta. file_path=" << header_path; - res = OLAP_ERR_ENGINE_LOAD_INDEX_TABLE_ERROR; - break; - } - // has to change shard id here, because meta file maybe copyed from other source - // its shard is different from local shard - tablet_meta->set_shard_id(shard); - std::string meta_binary; - tablet_meta->serialize(&meta_binary); - res = load_tablet_from_meta(store, tablet_id, schema_hash, meta_binary, true, force); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to load tablet. [header_path=" << header_path << "]"; - res = OLAP_ERR_ENGINE_LOAD_INDEX_TABLE_ERROR; - break; - } - } while (0); - return res; -} // load_tablet_from_dir + if (!Env::Default()->path_exists(header_path).ok()) { + LOG(WARNING) << "fail to find header file. [header_path=" << header_path << "]"; + return OLAP_ERR_FILE_NOT_EXIST; + } + + TabletMetaSharedPtr tablet_meta(new TabletMeta()); + if (tablet_meta->create_from_file(header_path) != OLAP_SUCCESS) { + LOG(WARNING) << "fail to load tablet_meta. file_path=" << header_path; + return OLAP_ERR_ENGINE_LOAD_INDEX_TABLE_ERROR; + } + // has to change shard id here, because meta file maybe copyed from other source + // its shard is different from local shard + tablet_meta->set_shard_id(shard); + string meta_binary; + tablet_meta->serialize(&meta_binary); + RETURN_NOT_OK_LOG(load_tablet_from_meta(store, tablet_id, schema_hash, + meta_binary, true, force), + Substitute("fail to load tablet. 
header_path=$0", header_path)); + + return OLAP_SUCCESS; +} void TabletManager::release_schema_change_lock(TTabletId tablet_id) { VLOG(3) << "release_schema_change_lock begin. tablet_id=" << tablet_id; @@ -889,7 +855,7 @@ void TabletManager::release_schema_change_lock(TTabletId tablet_id) { it->second.schema_change_lock.unlock(); } VLOG(3) << "release_schema_change_lock end. tablet_id=" << tablet_id; -} // release_schema_change_lock +} OLAPStatus TabletManager::report_tablet_info(TTabletInfo* tablet_info) { DorisMetrics::report_tablet_requests_total.increment(1); @@ -899,11 +865,9 @@ OLAPStatus TabletManager::report_tablet_info(TTabletInfo* tablet_info) { OLAPStatus res = OLAP_SUCCESS; - TabletSharedPtr tablet = get_tablet( - tablet_info->tablet_id, tablet_info->schema_hash); + TabletSharedPtr tablet = get_tablet(tablet_info->tablet_id, tablet_info->schema_hash); if (tablet == nullptr) { - LOG(WARNING) << "can't find tablet. " - << " tablet=" << tablet_info->tablet_id + LOG(WARNING) << "can't find tablet. 
" << " tablet=" << tablet_info->tablet_id << " schema_hash=" << tablet_info->schema_hash; return OLAP_ERR_TABLE_NOT_FOUND; } @@ -911,39 +875,32 @@ OLAPStatus TabletManager::report_tablet_info(TTabletInfo* tablet_info) { tablet->build_tablet_report_info(tablet_info); VLOG(10) << "success to process report tablet info."; return res; -} // report_tablet_info +} OLAPStatus TabletManager::report_all_tablets_info(std::map* tablets_info) { - LOG(INFO) << "begin to process report all tablets info."; + DCHECK(tablets_info != nullptr); + LOG(INFO) << "begin to report all tablets info"; // build the expired txn map first, outside the tablet map lock std::map> expire_txn_map; StorageEngine::instance()->txn_manager()->build_expire_txn_map(&expire_txn_map); - ReadLock rlock(&_tablet_map_lock); DorisMetrics::report_all_tablets_requests_total.increment(1); - - if (tablets_info == nullptr) { - return OLAP_ERR_INPUT_PARAMETER_ERROR; - } + ReadLock rlock(&_tablet_map_lock); for (const auto& item : _tablet_map) { if (item.second.table_arr.size() == 0) { continue; } - TTablet tablet; + uint64_t tablet_id = item.first; + TTablet t_tablet; for (TabletSharedPtr tablet_ptr : item.second.table_arr) { - if (tablet_ptr == nullptr) { - continue; - } - TTabletInfo tablet_info; tablet_ptr->build_tablet_report_info(&tablet_info); // find expire transaction corresponding to this tablet - TabletInfo tinfo = TabletInfo( - tablet_ptr->tablet_id(), tablet_ptr->schema_hash(), tablet_ptr->tablet_uid()); + TabletInfo tinfo(tablet_id, tablet_ptr->schema_hash(), tablet_ptr->tablet_uid()); vector transaction_ids; auto find = expire_txn_map.find(tinfo); if (find != expire_txn_map.end()) { @@ -952,18 +909,17 @@ OLAPStatus TabletManager::report_all_tablets_info(std::map* } } tablet_info.__set_transaction_ids(transaction_ids); - - tablet.tablet_infos.push_back(tablet_info); + t_tablet.tablet_infos.push_back(tablet_info); } - if (tablet.tablet_infos.size() != 0) { - 
tablets_info->insert(pair(tablet.tablet_infos[0].tablet_id, tablet)); + if (!t_tablet.tablet_infos.empty()) { + tablets_info->emplace(tablet_id, t_tablet); } } - LOG(INFO) << "success to process report all tablets info. tablet_num=" << tablets_info->size(); + LOG(INFO) << "success to report all tablets info. tablet_count=" << tablets_info->size(); return OLAP_SUCCESS; -} // report_all_tablets_info +} OLAPStatus TabletManager::start_trash_sweep() { { @@ -981,9 +937,6 @@ OLAPStatus TabletManager::start_trash_sweep() { } } for (TabletSharedPtr tablet : item.second.table_arr) { - if (tablet == nullptr) { - continue; - } tablet->delete_expired_inc_rowsets(); } } @@ -1003,71 +956,70 @@ OLAPStatus TabletManager::start_trash_sweep() { // and get tablet will access shut_down_tablets WriteLock wlock(&_tablet_map_lock); auto it = _shutdown_tablets.begin(); - for (; it != _shutdown_tablets.end();) { + while (it != _shutdown_tablets.end()) { // check if the meta has the tablet info and its state is shutdown if (it->use_count() > 1) { - // it means current tablet is referenced in other thread + // it means current tablet is referenced by other thread ++it; continue; } - TabletMetaSharedPtr new_tablet_meta(new(nothrow) TabletMeta()); - if (new_tablet_meta == nullptr) { - LOG(WARNING) << "fail to malloc TabletMeta."; - ++it; - continue; - } - OLAPStatus check_st = TabletMetaManager::get_meta((*it)->data_dir(), - (*it)->tablet_id(), (*it)->schema_hash(), new_tablet_meta); + TabletMetaSharedPtr tablet_meta(new TabletMeta()); + OLAPStatus check_st = TabletMetaManager::get_meta( + (*it)->data_dir(), (*it)->tablet_id(), (*it)->schema_hash(), tablet_meta); if (check_st == OLAP_SUCCESS) { - if (new_tablet_meta->tablet_state() != TABLET_SHUTDOWN - || new_tablet_meta->tablet_uid() != (*it)->tablet_uid()) { + if (tablet_meta->tablet_state() != TABLET_SHUTDOWN + || tablet_meta->tablet_uid() != (*it)->tablet_uid()) { LOG(WARNING) << "tablet's state changed to normal, skip remove dirs" - << " 
tablet id = " << new_tablet_meta->tablet_id() - << " schema hash = " << new_tablet_meta->schema_hash() + << " tablet id = " << tablet_meta->tablet_id() + << " schema hash = " << tablet_meta->schema_hash() << " old tablet_uid=" << (*it)->tablet_uid() - << " cur tablet_uid=" << new_tablet_meta->tablet_uid(); + << " cur tablet_uid=" << tablet_meta->tablet_uid(); // remove it from list it = _shutdown_tablets.erase(it); continue; } - if (FileUtils::check_exist((*it)->tablet_path())) { + // move data to trash + string tablet_path = (*it)->tablet_path(); + if (Env::Default()->path_exists(tablet_path).ok()) { // take snapshot of tablet meta - std::string meta_file = (*it)->tablet_path() + "/" + std::to_string((*it)->tablet_id()) + ".hdr"; - (*it)->tablet_meta()->save(meta_file); - LOG(INFO) << "start to move path to trash" - << " tablet path = " << (*it)->tablet_path(); - OLAPStatus rm_st = move_to_trash((*it)->tablet_path(), (*it)->tablet_path()); + string meta_file_path = path_util::join_path_segments( + (*it)->tablet_path(), std::to_string((*it)->tablet_id()) + ".hdr"); + (*it)->tablet_meta()->save(meta_file_path); + LOG(INFO) << "start to move tablet to trash. tablet_path = " << tablet_path; + OLAPStatus rm_st = move_to_trash(tablet_path, tablet_path); if (rm_st != OLAP_SUCCESS) { - LOG(WARNING) << "failed to move dir to trash" - << " dir = " << (*it)->tablet_path(); + LOG(WARNING) << "fail to move dir to trash. dir=" << tablet_path; ++it; continue; } } - TabletMetaManager::remove((*it)->data_dir(), (*it)->tablet_id(), (*it)->schema_hash()); - LOG(INFO) << "successfully move tablet to trash." - << " tablet id " << (*it)->tablet_id() - << " schema hash " << (*it)->schema_hash() - << " tablet path " << (*it)->tablet_path(); + // remove tablet meta + TabletMetaManager::remove( + (*it)->data_dir(), (*it)->tablet_id(), (*it)->schema_hash()); + LOG(INFO) << "successfully move tablet to trash. 
" + << "tablet_id=" << (*it)->tablet_id() + << ", schema_hash=" << (*it)->schema_hash() + << ", tablet_path=" << tablet_path; it = _shutdown_tablets.erase(it); ++ clean_num; } else { // if could not find tablet info in meta store, then check if dir existed - if (FileUtils::check_exist((*it)->tablet_path())) { - LOG(WARNING) << "errors while load meta from store, skip this tablet" - << " tablet id " << (*it)->tablet_id() - << " schema hash " << (*it)->schema_hash(); + string tablet_path = (*it)->tablet_path(); + if (Env::Default()->path_exists(tablet_path).ok()) { + LOG(WARNING) << "errors while load meta from store, skip this tablet. " + << "tablet_id=" << (*it)->tablet_id() + << ", schema_hash=" << (*it)->schema_hash(); ++it; } else { - LOG(INFO) << "could not find tablet dir, skip move to trash, remove it from gc queue." - << " tablet id " << (*it)->tablet_id() - << " schema hash " << (*it)->schema_hash() - << " tablet path " << (*it)->tablet_path(); + LOG(INFO) << "could not find tablet dir, skip it and remove it from gc-queue. 
" + << "tablet_id=" << (*it)->tablet_id() + << ", schema_hash=" << (*it)->schema_hash() + << ", tablet_path=" << tablet_path; it = _shutdown_tablets.erase(it); } } - // if clean 100 tablets, should yield + // yield to avoid hoding _tablet_map_lock for too long if (clean_num >= 200) { break; } @@ -1091,13 +1043,15 @@ bool TabletManager::try_schema_change_lock(TTabletId tablet_id) { return res; } -void TabletManager::update_root_path_info(std::map* path_map, - size_t* tablet_counter) { +void TabletManager::update_root_path_info(std::map* path_map, + size_t* tablet_count) { + DCHECK(tablet_count != 0); + *tablet_count = 0; ReadLock rlock(&_tablet_map_lock); for (auto& entry : _tablet_map) { const TableInstances& instance = entry.second; for (auto& tablet : instance.table_arr) { - ++(*tablet_counter); + ++(*tablet_count); int64_t data_size = tablet->tablet_footprint(); auto iter = path_map->find(tablet->data_dir()->path()); if (iter == path_map->end()) { @@ -1108,15 +1062,13 @@ void TabletManager::update_root_path_info(std::map* pa } } } -} // update_root_path_info +} void TabletManager::get_partition_related_tablets(int64_t partition_id, std::set* tablet_infos) { ReadLock rlock(&_tablet_map_lock); if (_partition_tablet_map.find(partition_id) != _partition_tablet_map.end()) { - for (auto& tablet_info : _partition_tablet_map[partition_id]) { - tablet_infos->insert(tablet_info); - } + *tablet_infos = _partition_tablet_map[partition_id]; } } @@ -1125,17 +1077,17 @@ void TabletManager::do_tablet_meta_checkpoint(DataDir* data_dir) { { ReadLock tablet_map_rdlock(&_tablet_map_lock); for (tablet_map_t::value_type& table_ins : _tablet_map){ - for (TabletSharedPtr& table_ptr : table_ins.second.table_arr) { - // if tablet is not ready, it maybe a new tablet under schema change, not do compaction - if (table_ptr->tablet_state() != TABLET_RUNNING) { + for (TabletSharedPtr& tablet_ptr : table_ins.second.table_arr) { + if (tablet_ptr->tablet_state() != TABLET_RUNNING) { continue; } - 
if (table_ptr->data_dir()->path_hash() != data_dir->path_hash() - || !table_ptr->is_used() || !table_ptr->init_succeeded()) { + if (tablet_ptr->data_dir()->path_hash() != data_dir->path_hash() + || !tablet_ptr->is_used() + || !tablet_ptr->init_succeeded()) { continue; } - related_tablets.push_back(table_ptr); + related_tablets.push_back(tablet_ptr); } } } @@ -1157,15 +1109,15 @@ void TabletManager::_build_tablet_stat() { TTabletStat stat; stat.tablet_id = item.first; for (TabletSharedPtr tablet : item.second.table_arr) { + // TODO(lingbin): if it is nullptr, why is it not deleted? if (tablet == nullptr) { continue; } - // we only get base tablet's stat stat.__set_data_size(tablet->tablet_footprint()); stat.__set_row_num(tablet->num_rows()); - VLOG(3) << "tablet_id=" << item.first + VLOG(3) << "building tablet stat. tablet_id=" << item.first << ", data_size=" << tablet->tablet_footprint() - << ", row_num:" << tablet->num_rows(); + << ", row_num=" << tablet->num_rows(); break; } @@ -1173,26 +1125,17 @@ void TabletManager::_build_tablet_stat() { } } -OLAPStatus TabletManager::_create_inital_rowset_unlocked( - TabletSharedPtr tablet, const TCreateTabletReq& request) { +OLAPStatus TabletManager::_create_inital_rowset_unlocked(const TCreateTabletReq& request, + Tablet* tablet) { OLAPStatus res = OLAP_SUCCESS; - if (request.version < 1) { - LOG(WARNING) << "init version of tablet should at least 1."; + LOG(WARNING) << "init version of tablet should at least 1. req.ver=" << request.version; return OLAP_ERR_CE_CMD_PARAMS_ERROR; } else { Version version(0, request.version); - VLOG(3) << "begin to create init version. " - << "begin=" << version.first << ", end=" << version.second; + VLOG(3) << "begin to create init version. version=" << version; RowsetSharedPtr new_rowset; do { - if (version.first > version.second) { - LOG(WARNING) << "begin should not larger than end." 
- << " begin=" << version.first - << " end=" << version.second; - res = OLAP_ERR_INPUT_PARAMETER_ERROR; - break; - } RowsetWriterContext context; context.rowset_id = StorageEngine::instance()->next_rowset_id(); context.tablet_uid = tablet->tablet_uid(); @@ -1240,77 +1183,65 @@ OLAPStatus TabletManager::_create_inital_rowset_unlocked( // Unregister index and delete files(index and data) if failed if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to create init base version. " - << " res=" << res - << " version=" << request.version; + LOG(WARNING) << "fail to create initial rowset. res=" << res << " version=" << version; StorageEngine::instance()->add_unused_rowset(new_rowset); return res; } } tablet->set_cumulative_layer_point(request.version + 1); - // should not save tablet meta here, because it will be saved if add to map successfully - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to save header. [tablet=" << tablet->full_name() << "]"; - } + // NOTE: should not save tablet meta here, because it will be saved if add to map successfully return res; } OLAPStatus TabletManager::_create_tablet_meta_unlocked(const TCreateTabletReq& request, DataDir* store, - const bool is_schema_change_tablet, - const TabletSharedPtr ref_tablet, + const bool is_schema_change, + const Tablet* base_tablet, TabletMetaSharedPtr* tablet_meta) { - uint64_t shard_id = 0; - OLAPStatus res = store->get_shard(&shard_id); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to get root path shard. 
res=" << res; - return res; - } - uint32_t next_unique_id = 0; - uint32_t col_ordinal = 0; - std::unordered_map col_ordinal_to_unique_id; - if (!is_schema_change_tablet) { - for (TColumn column : request.tablet_schema.columns) { - col_ordinal_to_unique_id[col_ordinal] = col_ordinal; - col_ordinal++; + std::unordered_map col_idx_to_unique_id; + if (!is_schema_change) { + for (uint32_t col_idx = 0; col_idx < request.tablet_schema.columns.size(); ++col_idx) { + col_idx_to_unique_id[col_idx] = col_idx; } - next_unique_id = col_ordinal; + next_unique_id = request.tablet_schema.columns.size(); } else { - next_unique_id = ref_tablet->next_unique_id(); - size_t num_columns = ref_tablet->num_columns(); - size_t field = 0; - for (TColumn column : request.tablet_schema.columns) { + next_unique_id = base_tablet->next_unique_id(); + size_t old_num_columns = base_tablet->num_columns(); + auto& new_columns = request.tablet_schema.columns; + for (uint32_t new_col_idx = 0; new_col_idx < new_columns.size(); ++new_col_idx) { + const TColumn& column = new_columns[new_col_idx]; // For schema change, compare old_tablet and new_tablet: - // 1. if column exist in both new_tablet and old_tablet, assign unique_id of old_tablet - // to the column ordinal number of new_tablet + // 1. if column exist in both new_tablet and old_tablet, choose the column's + // unique_id in old_tablet to be the column's ordinal number in new_tablet // 2. 
if column exists only in new_tablet, assign next_unique_id of old_tablet // to the new column - for (field = 0 ; field < num_columns; ++field) { - if (ref_tablet->tablet_schema().column(field).name() == column.column_name) { - uint32_t unique_id = ref_tablet->tablet_schema().column(field).unique_id(); - col_ordinal_to_unique_id[col_ordinal] = unique_id; + size_t old_col_idx = 0; + for (old_col_idx = 0 ; old_col_idx < old_num_columns; ++old_col_idx) { + const string& old_name = base_tablet->tablet_schema().column(old_col_idx).name(); + if (old_name == column.column_name) { + uint32_t old_unique_id + = base_tablet->tablet_schema().column(old_col_idx).unique_id(); + col_idx_to_unique_id[new_col_idx] = old_unique_id; break; } } // Not exist in old tablet, it is a new added column - if (field == num_columns) { - col_ordinal_to_unique_id[col_ordinal] = next_unique_id; - next_unique_id++; + if (old_col_idx == old_num_columns) { + col_idx_to_unique_id[new_col_idx] = next_unique_id++; } - col_ordinal++; } } + LOG(INFO) << "creating tablet meta. next_unique_id=" << next_unique_id; - LOG(INFO) << "in _create_tablet_meta_unlocked(): next_unique_id:" << next_unique_id; - // it is a new tablet meta obviously, should generate a new tablet id - TabletUid tablet_uid = TabletUid::gen_uid(); - res = TabletMeta::create(request.table_id, request.partition_id, - request.tablet_id, request.tablet_schema.schema_hash, - shard_id, request.tablet_schema, - next_unique_id, col_ordinal_to_unique_id, - tablet_meta, tablet_uid); + // We generate a new tablet_uid for this new tablet. 
+ uint64_t shard_id = 0; + RETURN_NOT_OK_LOG(store->get_shard(&shard_id), "fail to get root path shard"); + OLAPStatus res = TabletMeta::create(request, TabletUid::gen_uid(), shard_id, + next_unique_id, col_idx_to_unique_id, tablet_meta); + + // TODO(lingbin): when beta-rowset is default, should remove it if (request.__isset.storage_format && request.storage_format == TStorageFormat::V2) { (*tablet_meta)->set_preferred_rowset_type(BETA_ROWSET); } @@ -1319,61 +1250,52 @@ OLAPStatus TabletManager::_create_tablet_meta_unlocked(const TCreateTabletReq& r OLAPStatus TabletManager::_drop_tablet_directly_unlocked( TTabletId tablet_id, SchemaHash schema_hash, bool keep_files) { - OLAPStatus res = OLAP_SUCCESS; - TabletSharedPtr dropped_tablet = _get_tablet_unlocked(tablet_id, schema_hash); if (dropped_tablet == nullptr) { - LOG(WARNING) << "fail to drop not existed tablet. " + LOG(WARNING) << "fail to drop tablet because it does not exist. " << " tablet_id=" << tablet_id - << " schema_hash=" << schema_hash; + << ", schema_hash=" << schema_hash; return OLAP_ERR_TABLE_NOT_FOUND; } - for (list::iterator it = _tablet_map[tablet_id].table_arr.begin(); - it != _tablet_map[tablet_id].table_arr.end();) { - if ((*it)->equal(tablet_id, schema_hash)) { - TabletSharedPtr tablet = *it; - _remove_tablet_from_partition_unlocked(*(*it)); - it = _tablet_map[tablet_id].table_arr.erase(it); - if (!keep_files) { - // drop tablet will update tablet meta, should lock - WriteLock wrlock(tablet->get_header_lock_ptr()); - LOG(INFO) << "set tablet to shutdown state and remove it from memory" - << " tablet_id=" << tablet_id - << " schema_hash=" << schema_hash - << " tablet path=" << dropped_tablet->tablet_path(); - // has to update tablet here, must not update tablet meta directly - // because other thread may hold the tablet object, they may save meta too - // if update meta directly here, other thread may override the meta - // and the tablet will be loaded at restart time. 
- tablet->set_tablet_state(TABLET_SHUTDOWN); - res = tablet->save_meta(); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to drop tablet. " - << " tablet_id=" << tablet_id - << " schema_hash=" << schema_hash; - return res; - } - _shutdown_tablets.push_back(tablet); - } - } else { + list& candidate_tablets = _tablet_map[tablet_id].table_arr; + list::iterator it = candidate_tablets.begin(); + while (it != candidate_tablets.end()) { + if (!(*it)->equal(tablet_id, schema_hash)) { ++it; + continue; + } + + TabletSharedPtr tablet = *it; + _remove_tablet_from_partition_unlocked(*(*it)); + it = candidate_tablets.erase(it); + if (!keep_files) { + // drop tablet will update tablet meta, should lock + WriteLock wrlock(tablet->get_header_lock_ptr()); + LOG(INFO) << "set tablet to shutdown state and remove it from memory. " + << "tablet_id=" << tablet_id + << ", schema_hash=" << schema_hash + << ", tablet_path=" << dropped_tablet->tablet_path(); + // NOTE: has to update tablet here, but must not update tablet meta directly. + // because other thread may hold the tablet object, they may save meta too. + // If update meta directly here, other thread may override the meta + // and the tablet will be loaded at restart time. + // To avoid this exception, we first set the state of the tablet to `SHUTDOWN`. + tablet->set_tablet_state(TABLET_SHUTDOWN); + RETURN_NOT_OK_LOG(tablet->save_meta(), Substitute( + "fail to drop tablet.tablet_id=$0, schema_hash=$1", tablet_id, schema_hash)); + _shutdown_tablets.push_back(tablet); } } - res = dropped_tablet->deregister_tablet_from_dir(); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to unregister from root path. " - << " res= " << res - << " tablet=" << tablet_id; - } - - return res; -} // _drop_tablet_directly_unlocked + RETURN_NOT_OK_LOG(dropped_tablet->deregister_tablet_from_dir(), Substitute( + "fail to unregister from root path. 
tablet=$0, schema_hash=$1", + tablet_id, schema_hash)); + return OLAP_SUCCESS; +} TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, SchemaHash schema_hash) { - VLOG(3) << "begin to get tablet. tablet_id=" << tablet_id - << ", schema_hash=" << schema_hash; + VLOG(3) << "begin to get tablet. tablet_id=" << tablet_id << ", schema_hash=" << schema_hash; tablet_map_t::iterator it = _tablet_map.find(tablet_id); if (it != _tablet_map.end()) { for (TabletSharedPtr tablet : it->second.table_arr) { @@ -1386,8 +1308,7 @@ TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, SchemaH } } - VLOG(3) << "fail to get tablet. tablet_id=" << tablet_id - << ", schema_hash=" << schema_hash; + VLOG(3) << "fail to get tablet. tablet_id=" << tablet_id << ", schema_hash=" << schema_hash; // Return nullptr tablet if fail TabletSharedPtr tablet; return tablet; diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index 93b7d8a363..f45df1073e 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -24,6 +24,7 @@ #include #include #include +#include #include "agent/status.h" #include "common/status.h" @@ -50,11 +51,14 @@ public: bool check_tablet_id_exist(TTabletId tablet_id); - void clear(); - - // Create new tablet for StorageEngine - OLAPStatus create_tablet(const TCreateTabletReq& request, - std::vector stores); + // The param stores holds all candidate data_dirs for this tablet. + // NOTE: If the request is from a schema-changing tablet, The directory selected by the + // new tablet should be the same as the directory of origin tablet. Because the + // linked-schema-change type requires Linux hard-link, which does not support cross disk. + // TODO(lingbin): Other schema-change type do not need to be on the same disk. 
Because + // there may be insufficient space on the current disk, which will lead the schema-change + // task to be fail, even if there is enough space on other disks + OLAPStatus create_tablet(const TCreateTabletReq& request, std::vector stores); // Drop a tablet by description // If set keep_files == true, files will NOT be deleted when deconstruction. @@ -89,7 +93,7 @@ public: static bool get_rowset_id_from_path(const std::string& path, RowsetId* rowset_id); - void get_tablet_stat(TTabletStatResult& result); + void get_tablet_stat(TTabletStatResult* result); // parse tablet header msg to generate tablet object OLAPStatus load_tablet_from_meta(DataDir* data_dir, TTabletId tablet_id, @@ -138,7 +142,9 @@ private: bool keep_files, bool drop_old); bool _check_tablet_id_exist_unlocked(TTabletId tablet_id); - OLAPStatus _create_inital_rowset_unlocked(TabletSharedPtr tablet, const TCreateTabletReq& request); + OLAPStatus _create_inital_rowset_unlocked(const TCreateTabletReq& request, + Tablet* tablet); + OLAPStatus _drop_tablet_directly_unlocked(TTabletId tablet_id, TSchemaHash schema_hash, bool keep_files = false); @@ -151,17 +157,17 @@ private: TabletSharedPtr _internal_create_tablet_unlocked(const AlterTabletType alter_type, const TCreateTabletReq& request, - const bool is_schema_change_tablet, - const TabletSharedPtr ref_tablet, - std::vector data_dirs); + const bool is_schema_change, + const Tablet* base_tablet, + const std::vector& data_dirs); TabletSharedPtr _create_tablet_meta_and_dir_unlocked(const TCreateTabletReq& request, - const bool is_schema_change_tablet, - const TabletSharedPtr ref_tablet, - std::vector data_dirs); + const bool is_schema_change, + const Tablet* base_tablet, + const std::vector& data_dirs); OLAPStatus _create_tablet_meta_unlocked(const TCreateTabletReq& request, DataDir* store, const bool is_schema_change_tablet, - const TabletSharedPtr ref_tablet, + const Tablet* base_tablet, TabletMetaSharedPtr* tablet_meta); void _build_tablet_stat(); 
@@ -169,32 +175,33 @@ private: void _remove_tablet_from_partition_unlocked(const Tablet& tablet); private: + DISALLOW_COPY_AND_ASSIGN(TabletManager); + // TODO(lingbin): should be TabletInstances? // should be removed after schema_hash be removed struct TableInstances { Mutex schema_change_lock; + // The first element(i.e. tablet_arr[0]) is the base tablet. When we add new tablet + // to tablet_arr, we will sort all the elements in create-time ascending order, + // which will ensure the first one is base-tablet std::list table_arr; }; - typedef std::map tablet_map_t; + // tablet_id -> TabletInstances + typedef std::unordered_map tablet_map_t; + + // Protect _tablet_map, _partition_tablet_map, _shutdown_tablets RWMutex _tablet_map_lock; tablet_map_t _tablet_map; - std::map _store_map; + // partition_id => tablet_info + std::map> _partition_tablet_map; + std::vector _shutdown_tablets; std::mutex _tablet_stat_mutex; - // cache to save tablets' statistics, such as data size and row + // cache to save tablets' statistics, such as data-size and row-count // TODO(cmy): for now, this is a naive implementation std::map _tablet_stat_cache; // last update time of tablet stat cache - int64_t _tablet_stat_cache_update_time_ms; - - uint32_t _available_storage_medium_type_count; - - std::vector _shutdown_tablets; - - // map from partition id to tablet_id - std::map> _partition_tablet_map; - - DISALLOW_COPY_AND_ASSIGN(TabletManager); + int64_t _last_update_stat_ms; }; } // namespace doris diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 1e2bf54ca9..8001bc3d93 100755 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -28,6 +28,10 @@ #include "util/uid_util.h" #include "util/url_coding.h" +using std::string; +using std::unordered_map; +using std::vector; + namespace doris { OLAPStatus AlterTabletTask::init_from_pb(const AlterTabletPB& alter_task) { @@ -60,12 +64,23 @@ OLAPStatus TabletMeta::create(int64_t table_id, int64_t 
partition_id, int64_t tablet_id, int32_t schema_hash, uint64_t shard_id, const TTabletSchema& tablet_schema, uint32_t next_unique_id, - const std::unordered_map& col_ordinal_to_unique_id, + const unordered_map& col_ordinal_to_unique_id, TabletMetaSharedPtr* tablet_meta, TabletUid& tablet_uid) { tablet_meta->reset(new TabletMeta(table_id, partition_id, - tablet_id, schema_hash, - shard_id, tablet_schema, - next_unique_id, col_ordinal_to_unique_id, tablet_uid)); + tablet_id, schema_hash, + shard_id, tablet_schema, + next_unique_id, col_ordinal_to_unique_id, tablet_uid)); + return OLAP_SUCCESS; +} + +OLAPStatus TabletMeta::create(const TCreateTabletReq& request, const TabletUid& tablet_uid, + uint64_t shard_id, uint32_t next_unique_id, + const unordered_map& col_ordinal_to_unique_id, + TabletMetaSharedPtr* tablet_meta) { + tablet_meta->reset(new TabletMeta(request.table_id, request.partition_id, + request.tablet_id, request.tablet_schema.schema_hash, + shard_id, request.tablet_schema, + next_unique_id, col_ordinal_to_unique_id, tablet_uid)); return OLAP_SUCCESS; } @@ -175,7 +190,7 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, init_from_pb(tablet_meta_pb); } -OLAPStatus TabletMeta::create_from_file(const std::string& file_path) { +OLAPStatus TabletMeta::create_from_file(const string& file_path) { FileHeader file_header; FileHandler file_handler; @@ -201,7 +216,7 @@ OLAPStatus TabletMeta::create_from_file(const std::string& file_path) { return init_from_pb(tablet_meta_pb); } -OLAPStatus TabletMeta::reset_tablet_uid(const std::string& file_path) { +OLAPStatus TabletMeta::reset_tablet_uid(const string& file_path) { OLAPStatus res = OLAP_SUCCESS; TabletMeta tmp_tablet_meta; if ((res = tmp_tablet_meta.create_from_file(file_path)) != OLAP_SUCCESS) { @@ -226,7 +241,8 @@ OLAPStatus TabletMeta::reset_tablet_uid(const std::string& file_path) { return res; } -std::string TabletMeta::construct_header_file_path(const std::string& schema_hash_path, const int64_t 
tablet_id) { +string TabletMeta::construct_header_file_path(const string& schema_hash_path, + const int64_t tablet_id) { std::stringstream header_name_stream; header_name_stream << schema_hash_path << "/" << tablet_id << ".hdr"; return header_name_stream.str(); @@ -429,7 +445,7 @@ OLAPStatus TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) { return OLAP_SUCCESS; } -OLAPStatus TabletMeta::to_json(std::string* json_string, json2pb::Pb2JsonOptions& options) { +OLAPStatus TabletMeta::to_json(string* json_string, json2pb::Pb2JsonOptions& options) { TabletMetaPB tablet_meta_pb; RETURN_NOT_OK(to_meta_pb(&tablet_meta_pb)); json2pb::ProtoMessageToJson(tablet_meta_pb, json_string, options); @@ -705,7 +721,7 @@ OLAPStatus TabletMeta::set_alter_state(AlterTabletState alter_state) { } } -std::string TabletMeta::full_name() const { +string TabletMeta::full_name() const { std::stringstream ss; ss << _tablet_id << "." << _schema_hash diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index ef2c4f6390..7973f47ae6 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -33,9 +33,6 @@ #include "util/mutex.h" #include "util/uid_util.h" -using std::string; -using std::vector; - namespace doris { // Lifecycle states that a Tablet can be in. Legal state transitions for a @@ -107,6 +104,12 @@ public: uint32_t next_unique_id, const std::unordered_map& col_ordinal_to_unique_id, TabletMetaSharedPtr* tablet_meta, TabletUid& tablet_uid); + + static OLAPStatus create(const TCreateTabletReq& request, const TabletUid& tablet_uid, + uint64_t shard_id, uint32_t next_unique_id, + const std::unordered_map& col_ordinal_to_unique_id, + TabletMetaSharedPtr* tablet_meta); + TabletMeta(); TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id, int32_t schema_hash, @@ -119,13 +122,14 @@ public: // Previous tablet_meta is a physical file in tablet dir, which is not stored in rocksdb. 
OLAPStatus create_from_file(const std::string& file_path); OLAPStatus save(const std::string& file_path); - static OLAPStatus save(const string& file_path, TabletMetaPB& tablet_meta_pb); + static OLAPStatus save(const std::string& file_path, TabletMetaPB& tablet_meta_pb); static OLAPStatus reset_tablet_uid(const std::string& file_path); - static std::string construct_header_file_path(const std::string& schema_hash_path, const int64_t tablet_id); + static std::string construct_header_file_path(const std::string& schema_hash_path, + const int64_t tablet_id); OLAPStatus save_meta(DataDir* data_dir); - OLAPStatus serialize(string* meta_binary); - OLAPStatus deserialize(const string& meta_binary); + OLAPStatus serialize(std::string* meta_binary); + OLAPStatus deserialize(const std::string& meta_binary); OLAPStatus init_from_pb(const TabletMetaPB& tablet_meta_pb); OLAPStatus to_meta_pb(TabletMetaPB* tablet_meta_pb); @@ -157,16 +161,17 @@ public: inline const TabletSchema& tablet_schema() const; - inline const vector& all_rs_metas() const; + inline const std::vector& all_rs_metas() const; OLAPStatus add_rs_meta(const RowsetMetaSharedPtr& rs_meta); RowsetMetaSharedPtr acquire_rs_meta_by_version(const Version& version) const; - OLAPStatus delete_rs_meta_by_version(const Version& version, vector* deleted_rs_metas); - OLAPStatus modify_rs_metas(const vector& to_add, - const vector& to_delete); + OLAPStatus delete_rs_meta_by_version(const Version& version, + std::vector* deleted_rs_metas); + OLAPStatus modify_rs_metas(const std::vector& to_add, + const std::vector& to_delete); OLAPStatus revise_rs_metas(const std::vector& rs_metas); OLAPStatus revise_inc_rs_metas(const std::vector& rs_metas); - inline const vector& all_inc_rs_metas() const; + inline const std::vector& all_inc_rs_metas() const; OLAPStatus add_inc_rs_meta(const RowsetMetaSharedPtr& rs_meta); OLAPStatus delete_inc_rs_meta_by_version(const Version& version); RowsetMetaSharedPtr acquire_inc_rs_meta_by_version(const 
Version& version) const; @@ -207,8 +212,8 @@ private: TabletState _tablet_state; TabletSchema _schema; - vector _rs_metas; - vector _inc_rs_metas; + std::vector _rs_metas; + std::vector _inc_rs_metas; DelPredicateArray _del_pred_array; AlterTabletTaskSharedPtr _alter_task; bool _in_restore_mode = false; @@ -303,11 +308,11 @@ inline const TabletSchema& TabletMeta::tablet_schema() const { return _schema; } -inline const vector& TabletMeta::all_rs_metas() const { +inline const std::vector& TabletMeta::all_rs_metas() const { return _rs_metas; } -inline const vector& TabletMeta::all_inc_rs_metas() const { +inline const std::vector& TabletMeta::all_inc_rs_metas() const { return _inc_rs_metas; } diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 8ef958ebe7..2698163fe9 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -32,6 +32,7 @@ #include "gen_cpp/PaloInternalService_types.h" #include "gen_cpp/DorisExternalService_types.h" #include "gen_cpp/Types_types.h" +#include "gutil/strings/substitute.h" #include "olap/storage_engine.h" #include "runtime/external_scan_context_mgr.h" @@ -219,12 +220,12 @@ void BackendService::erase_export_task(TStatus& t_status, const TUniqueId& task_ } void BackendService::get_tablet_stat(TTabletStatResult& result) { - StorageEngine::instance()->tablet_manager()->get_tablet_stat(result); + StorageEngine::instance()->tablet_manager()->get_tablet_stat(&result); } void BackendService::submit_routine_load_task( TStatus& t_status, const std::vector& tasks) { - + for (auto& task : tasks) { Status st = _exec_env->routine_load_task_executor()->submit_task(task); if (!st.ok()) { @@ -281,9 +282,9 @@ void BackendService::get_next(TScanBatchResult& result_, const TScanNextBatchPar LOG(ERROR) << "getNext error: context offset [" << context->offset<<" ]" << " ,client offset [ " << offset << " ]"; // invalid offset t_status.status_code = TStatusCode::NOT_FOUND; - std::stringstream 
msg; - msg << "context_id: " << context_id << " send offset: " << offset << "diff with context offset: " << context->offset; - t_status.error_msgs.push_back(msg.str()); + t_status.error_msgs.push_back(strings::Substitute( + "context_id=$0, send_offset=$1, context_offset=$2", + context_id, offset, context->offset)); result_.status = t_status; } else { // during accessing, should disabled last_access_time diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h index 6a4f6b44fb..a2b9f3a60b 100644 --- a/be/src/service/backend_service.h +++ b/be/src/service/backend_service.h @@ -100,7 +100,7 @@ public: virtual void submit_etl_task(TAgentResult& result, const TMiniLoadEtlTaskRequest& request) { - VLOG_ROW << "submit_etl_task. request is " + VLOG_RPC << "submit_etl_task. request is " << apache::thrift::ThriftDebugString(request).c_str(); _agent_server->submit_etl_task(result, request); } diff --git a/be/test/olap/tablet_mgr_test.cpp b/be/test/olap/tablet_mgr_test.cpp index 835d48a3f8..ddf71f78e3 100644 --- a/be/test/olap/tablet_mgr_test.cpp +++ b/be/test/olap/tablet_mgr_test.cpp @@ -85,7 +85,6 @@ public: if (boost::filesystem::exists(_engine_data_path)) { ASSERT_TRUE(boost::filesystem::remove_all(_engine_data_path)); } - _tablet_mgr.clear(); } private: