diff --git a/be/src/http/action/compaction_action.cpp b/be/src/http/action/compaction_action.cpp index 11f5f091f4..489ba6da23 100644 --- a/be/src/http/action/compaction_action.cpp +++ b/be/src/http/action/compaction_action.cpp @@ -56,10 +56,7 @@ Status CompactionAction::_handle_show_compaction(HttpRequest *req, std::string* return Status::NotFound("Tablet not found"); } - OLAPStatus s = tablet->get_compaction_status(json_result); - if (s != OLAP_SUCCESS) { - return Status::InternalError(strings::Substitute("failed to get tablet compaction status. res $0", s)); - } + tablet->get_compaction_status(json_result); return Status::OK(); } diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 461de413ac..d203b6363d 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -90,7 +90,7 @@ OLAPStatus Compaction::do_compaction_impl() { RETURN_NOT_OK(check_correctness(stats)); // 4. modify rowsets in memory - RETURN_NOT_OK(modify_rowsets()); + modify_rowsets(); // 5. update last success compaction time int64_t now = UnixMillis(); @@ -140,30 +140,13 @@ OLAPStatus Compaction::construct_input_rowset_readers() { return OLAP_SUCCESS; } -OLAPStatus Compaction::modify_rowsets() { +void Compaction::modify_rowsets() { std::vector output_rowsets; output_rowsets.push_back(_output_rowset); WriteLock wrlock(_tablet->get_header_lock_ptr()); - OLAPStatus res = _tablet->modify_rowsets(output_rowsets, _input_rowsets); - if (res != OLAP_SUCCESS) { - LOG(FATAL) << "fail to replace data sources. res" << res - << ", tablet=" << _tablet->full_name() - << ", compaction__version=" << _output_version.first - << "-" << _output_version.second; - return res; - } - - res = _tablet->save_meta(); - if (res != OLAP_SUCCESS) { - LOG(FATAL) << "fail to save tablet meta. res=" << res - << ", tablet=" << _tablet->full_name() - << ", compaction_version=" << _output_version.first - << "-" << _output_version.second; - return OLAP_ERR_BE_SAVE_HEADER_ERROR; - } - - return OLAP_SUCCESS; + _tablet->modify_rowsets(output_rowsets, _input_rowsets); + _tablet->save_meta(); } OLAPStatus Compaction::gc_unused_rowsets() { diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index aeccc8da19..9837ac5ea6 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -59,7 +59,7 @@ protected: OLAPStatus do_compaction(); OLAPStatus do_compaction_impl(); - OLAPStatus modify_rowsets(); + void modify_rowsets(); OLAPStatus gc_unused_rowsets(); OLAPStatus construct_output_rowset_writer(); diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 4a32553b48..5aa6a8dd5c 100755 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -40,7 +40,7 @@ OLAPStatus CumulativeCompaction::compact() { } // 1.calculate cumulative point - RETURN_NOT_OK(_tablet->calculate_cumulative_point()); + _tablet->calculate_cumulative_point(); // 2. pick rowsets to compact RETURN_NOT_OK(pick_rowsets_to_compact()); diff --git a/be/src/olap/rowset_graph.cpp b/be/src/olap/rowset_graph.cpp index cb59ea0fca..4b56ba96ca 100644 --- a/be/src/olap/rowset_graph.cpp +++ b/be/src/olap/rowset_graph.cpp @@ -150,11 +150,6 @@ OLAPStatus RowsetGraph::capture_consistent_versions(const Version& spec_version, return OLAP_ERR_INPUT_PARAMETER_ERROR; } - if (version_path == nullptr) { - LOG(WARNING) << "param version_path is nullptr."; - return OLAP_ERR_INPUT_PARAMETER_ERROR; - } - // bfs_queue's element is vertex_index. std::queue bfs_queue; // predecessor[i] means the predecessor of vertex_index 'i'. @@ -227,25 +222,26 @@ OLAPStatus RowsetGraph::capture_consistent_versions(const Version& spec_version, reversed_path.push_back(tmp_vertex_index); } - // Make version_path from reversed_path. - std::stringstream shortest_path_for_debug; - for (size_t path_id = reversed_path.size() - 1; path_id > 0; --path_id) { - int64_t tmp_start_vertex_value = _version_graph[reversed_path[path_id]].value; - int64_t tmp_end_vertex_value = _version_graph[reversed_path[path_id - 1]].value; + if (version_path != nullptr) { + // Make version_path from reversed_path. + std::stringstream shortest_path_for_debug; + for (size_t path_id = reversed_path.size() - 1; path_id > 0; --path_id) { + int64_t tmp_start_vertex_value = _version_graph[reversed_path[path_id]].value; + int64_t tmp_end_vertex_value = _version_graph[reversed_path[path_id - 1]].value; - // tmp_start_vertex_value mustn't be equal to tmp_end_vertex_value - if (tmp_start_vertex_value <= tmp_end_vertex_value) { - version_path->emplace_back(tmp_start_vertex_value, tmp_end_vertex_value - 1); - } else { - version_path->emplace_back(tmp_end_vertex_value, tmp_start_vertex_value - 1); + // tmp_start_vertex_value mustn't be equal to tmp_end_vertex_value + if (tmp_start_vertex_value <= tmp_end_vertex_value) { + version_path->emplace_back(tmp_start_vertex_value, tmp_end_vertex_value - 1); + } else { + version_path->emplace_back(tmp_end_vertex_value, tmp_start_vertex_value - 1); + } + + shortest_path_for_debug << (*version_path)[version_path->size() - 1] << ' '; } - - shortest_path_for_debug << (*version_path)[version_path->size() - 1] << ' '; + VLOG(10) << "success to find path for spec_version. spec_version=" << spec_version + << ", path=" << shortest_path_for_debug.str(); } - VLOG(10) << "success to find path for spec_version. spec_version=" << spec_version - << ", path=" << shortest_path_for_debug.str(); - return OLAP_SUCCESS; } diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index e69d040c30..65725b7533 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -1322,11 +1322,7 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe // check if new_tablet.ce_point > base_tablet.ce_point? new_tablet->set_cumulative_layer_point(-1); // save tablet meta - res = new_tablet->save_meta(); - if (res != OLAP_SUCCESS) { - LOG(FATAL) << "fail to save tablet meta after remove rowset from new tablet" - << new_tablet->full_name(); - } + new_tablet->save_meta(); for (auto& rowset : rowsets_to_delete) { // do not call rowset.remove directly, using gc thread to delete it StorageEngine::instance()->add_unused_rowset(rowset); @@ -1399,10 +1395,7 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe if (res != OLAP_SUCCESS) { break; } - res = new_tablet->save_meta(); - if (res != OLAP_SUCCESS) { - break; - } + new_tablet->save_meta(); } while(0); if (res == OLAP_SUCCESS) { @@ -1588,25 +1581,14 @@ OLAPStatus SchemaChangeHandler::_add_alter_task( new_tablet->schema_hash(), versions_to_be_changed, alter_tablet_type); - OLAPStatus res = base_tablet->save_meta(); - if (res != OLAP_SUCCESS) { - LOG(FATAL) << "fail to save base tablet meta. res=" << res - << ", tablet=" << base_tablet->full_name(); - return res; - } - + base_tablet->save_meta(); new_tablet->add_alter_task(base_tablet->tablet_id(), base_tablet->schema_hash(), vector(), // empty versions alter_tablet_type); - res = new_tablet->save_meta(); - if (res != OLAP_SUCCESS) { - LOG(FATAL) << "fail to save new tablet meta. res=" << res - << ", tablet=" << new_tablet->full_name(); - return res; - } + new_tablet->save_meta(); LOG(INFO) << "successfully add alter task to both base and new"; - return res; + return OLAP_SUCCESS; } OLAPStatus SchemaChangeHandler::_save_alter_state( @@ -1627,13 +1609,7 @@ OLAPStatus SchemaChangeHandler::_save_alter_state( << " res=" << res; return res; } - res = base_tablet->save_meta(); - if (res != OLAP_SUCCESS) { - LOG(FATAL) << "fail to save base tablet meta. res=" << res - << ", base_tablet=" << base_tablet->full_name(); - return res; - } - + base_tablet->save_meta(); AlterTabletTaskSharedPtr new_alter_task = new_tablet->alter_task(); if (new_alter_task == nullptr) { LOG(INFO) << "could not find alter task info from new tablet " << new_tablet->full_name(); @@ -1646,14 +1622,9 @@ OLAPStatus SchemaChangeHandler::_save_alter_state( << " res" << res; return res; } - res = new_tablet->save_meta(); - if (res != OLAP_SUCCESS) { - LOG(FATAL) << "fail to save new tablet meta. res=" << res - << ", new_tablet=" << base_tablet->full_name(); - return res; - } + new_tablet->save_meta(); - return res; + return OLAP_SUCCESS; } OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams& sc_params) { @@ -1794,7 +1765,7 @@ PROCESS_ALTER_EXIT: { // save tablet meta here because rowset meta is not saved during add rowset WriteLock new_wlock(sc_params.new_tablet->get_header_lock_ptr()); - res = sc_params.new_tablet->save_meta(); + sc_params.new_tablet->save_meta(); } if (res == OLAP_SUCCESS) { Version test_version(0, end_version); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 8d96b9b64b..5119be06ff 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -70,21 +70,17 @@ Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) : _last_cumu_compaction_failure_millis(0), _last_base_compaction_failure_millis(0), _last_cumu_compaction_success_millis(0), - _last_base_compaction_success_millis(0) { + _last_base_compaction_success_millis(0), + _cumulative_point(kInvalidCumulativePoint) { _gen_tablet_path(); _rs_graph.construct_rowset_graph(_tablet_meta->all_rs_metas()); } -Tablet::~Tablet() { - _rs_version_map.clear(); - _inc_rs_version_map.clear(); -} - OLAPStatus Tablet::_init_once_action() { OLAPStatus res = OLAP_SUCCESS; VLOG(3) << "begin to load tablet. tablet=" << full_name() << ", version_size=" << _tablet_meta->version_count(); - for (auto& rs_meta : _tablet_meta->all_rs_metas()) { + for (const auto& rs_meta : _tablet_meta->all_rs_metas()) { Version version = rs_meta->version(); RowsetSharedPtr rowset; res = RowsetFactory::create_rowset(&_schema, _tablet_path, rs_meta, &rowset); @@ -115,7 +111,6 @@ OLAPStatus Tablet::_init_once_action() { _inc_rs_version_map[version] = std::move(rowset); } - _cumulative_point = -1; return res; } @@ -135,16 +130,12 @@ OLAPStatus Tablet::set_tablet_state(TabletState state) { // should save tablet meta to remote meta store // if it's a primary replica -OLAPStatus Tablet::save_meta() { - OLAPStatus res = _tablet_meta->save_meta(_data_dir); - if (res != OLAP_SUCCESS) { - LOG(FATAL) << "fail to save tablet_meta. res=" << res << ", root=" << _data_dir->path(); - } +void Tablet::save_meta() { + auto res = _tablet_meta->save_meta(_data_dir); + CHECK_EQ(res, OLAP_SUCCESS) << "fail to save tablet_meta. res=" << res << ", root=" << _data_dir->path(); // User could directly update tablet schema by _tablet_meta, // So we need to refetch schema again _schema = _tablet_meta->tablet_schema(); - - return res; } OLAPStatus Tablet::revise_tablet_meta( @@ -169,19 +160,10 @@ OLAPStatus Tablet::revise_tablet_meta( << full_name() << ", version=" << version << "]"; } - if (res != OLAP_SUCCESS) { - break; - } - for (auto& rs_meta : rowsets_to_clone) { new_tablet_meta->add_rs_meta(rs_meta); } - - if (res != OLAP_SUCCESS) { - break; - } - - VLOG(3) << "load rowsets successfully when clone. tablet=" << full_name() + VLOG(3) << "load rowsets successfully when clone. tablet=" << full_name() << ", added rowset size=" << rowsets_to_clone.size(); // save and reload tablet_meta res = new_tablet_meta->save_meta(_data_dir); @@ -194,6 +176,7 @@ OLAPStatus Tablet::revise_tablet_meta( for (auto& version : versions_to_delete) { auto it = _rs_version_map.find(version); + DCHECK(it != _rs_version_map.end()); StorageEngine::instance()->add_unused_rowset(it->second); _rs_version_map.erase(it); } @@ -207,7 +190,7 @@ OLAPStatus Tablet::revise_tablet_meta( RowsetSharedPtr rowset; res = RowsetFactory::create_rowset(&_schema, _tablet_path, rs_meta, &rowset); if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to init rowset. version=" << version.first << "-" << version.second; + LOG(WARNING) << "fail to init rowset. version=" << version; return res; } _rs_version_map[version] = std::move(rowset); @@ -241,14 +224,10 @@ OLAPStatus Tablet::add_rowset(RowsetSharedPtr rowset, bool need_persist) { // but it should be removed in multi path version for (auto& it : _rs_version_map) { if (rowset->version().contains(it.first) && rowset->version() != it.first) { - if (it.second == nullptr) { - LOG(FATAL) << "there exist a version=" << it.first - << " contains the input rs with version=" << rowset->version() - << ", but the related rs is null"; - return OLAP_ERR_PUSH_ROWSET_NOT_FOUND; - } else { - rowsets_to_delete.push_back(it.second); - } + CHECK(it.second != nullptr) << "there exist a version=" << it.first + << " contains the input rs with version=" << rowset->version() + << ", but the related rs is null"; + rowsets_to_delete.push_back(it.second); } } modify_rowsets(std::vector(), rowsets_to_delete); @@ -266,31 +245,23 @@ OLAPStatus Tablet::add_rowset(RowsetSharedPtr rowset, bool need_persist) { return OLAP_SUCCESS; } -OLAPStatus Tablet::modify_rowsets(const vector& to_add, - const vector& to_delete) { +void Tablet::modify_rowsets(const vector& to_add, + const vector& to_delete) { vector rs_metas_to_add; for (auto& rs : to_add) { rs_metas_to_add.push_back(rs->rowset_meta()); + _rs_version_map[rs->version()] = rs; + ++_newly_created_rowset_num; } vector rs_metas_to_delete; for (auto& rs : to_delete) { rs_metas_to_delete.push_back(rs->rowset_meta()); + _rs_version_map.erase(rs->version()); } _tablet_meta->modify_rs_metas(rs_metas_to_add, rs_metas_to_delete); - for (auto& rs : to_delete) { - auto it = _rs_version_map.find(rs->version()); - _rs_version_map.erase(it); - } - - for (auto& rs : to_add) { - _rs_version_map[rs->version()] = rs; - ++_newly_created_rowset_num; - } - _rs_graph.reconstruct_rowset_graph(_tablet_meta->all_rs_metas()); - return OLAP_SUCCESS; } // snapshot manager may call this api to check if version exists, so that @@ -298,16 +269,14 @@ OLAPStatus Tablet::modify_rowsets(const vector& to_add, const RowsetSharedPtr Tablet::get_rowset_by_version(const Version& version) const { auto iter = _rs_version_map.find(version); if (iter == _rs_version_map.end()) { - VLOG(3) << "no rowset for version:" << version.first << "-" << version.second - << ", tablet: " << full_name(); + VLOG(3) << "no rowset for version:" << version << ", tablet: " << full_name(); return nullptr; } - RowsetSharedPtr rowset = iter->second; - return rowset; + return iter->second; } // This function only be called by SnapshotManager to perform incremental clone. -// It will be called under protected of _metalock(SnapshotManager will fetch it manually), +// It will be called under protected of _meta_lock(SnapshotManager will fetch it manually), // so it is no need to lock here. const RowsetSharedPtr Tablet::get_inc_rowset_by_version(const Version& version) const { auto iter = _inc_rs_version_map.find(version); @@ -315,8 +284,7 @@ const RowsetSharedPtr Tablet::get_inc_rowset_by_version(const Version& version) VLOG(3) << "no rowset for version:" << version << ", tablet: " << full_name(); return nullptr; } - RowsetSharedPtr rowset = iter->second; - return rowset; + return iter->second; } // Already under _meta_lock @@ -326,15 +294,10 @@ const RowsetSharedPtr Tablet::rowset_with_max_version() const { return nullptr; } - DCHECK(_rs_version_map.find(max_version) != _rs_version_map.end()) - << "invalid version:" << max_version; auto iter = _rs_version_map.find(max_version); - if (iter == _rs_version_map.end()) { - LOG(WARNING) << "no rowset for version:" << max_version; - return nullptr; - } - RowsetSharedPtr rowset = iter->second; - return rowset; + DCHECK(_rs_version_map.find(max_version) != _rs_version_map.end()) + << "invalid version:" << max_version; + return iter->second; } RowsetSharedPtr Tablet::_rowset_with_largest_size() { @@ -373,15 +336,12 @@ OLAPStatus Tablet::add_inc_rowset(const RowsetSharedPtr& rowset) { void Tablet::_delete_inc_rowset_by_version(const Version& version, const VersionHash& version_hash) { // delete incremental rowset from map - auto it = _inc_rs_version_map.find(version); - if (it != _inc_rs_version_map.end()) { - _inc_rs_version_map.erase(it); - } + _inc_rs_version_map.erase(version); + RowsetMetaSharedPtr rowset_meta = _tablet_meta->acquire_inc_rs_meta_by_version(version); if (rowset_meta == nullptr) { return; } - _tablet_meta->delete_inc_rs_meta_by_version(version); VLOG(3) << "delete incremental rowset. tablet=" << full_name() << ", version=" << version; } @@ -412,10 +372,7 @@ void Tablet::delete_expired_inc_rowsets() { << ", version=" << pair.first; } - if (save_meta() != OLAP_SUCCESS) { - LOG(FATAL) << "fail to save tablet_meta when delete expire incremental data." - << "tablet=" << full_name(); - } + save_meta(); } OLAPStatus Tablet::capture_consistent_versions(const Version& spec_version, @@ -433,15 +390,13 @@ OLAPStatus Tablet::capture_consistent_versions(const Version& spec_version, << ", missed version for version:" << spec_version; _print_missed_versions(missed_versions); } - return status; } return status; } OLAPStatus Tablet::check_version_integrity(const Version& version) { - vector span_versions; ReadLock rdlock(&_meta_lock); - return capture_consistent_versions(version, &span_versions); + return capture_consistent_versions(version, nullptr); } // If any rowset contains the specific version, it means the version already exist @@ -457,8 +412,9 @@ bool Tablet::check_version_exist(const Version& version) const { void Tablet::list_versions(vector* versions) const { DCHECK(versions != nullptr && versions->empty()); + versions->reserve(_rs_version_map.size()); // versions vector is not sorted. - for (auto& it : _rs_version_map) { + for (const auto& it : _rs_version_map) { versions->push_back(it.first); } } @@ -474,6 +430,7 @@ OLAPStatus Tablet::capture_consistent_rowsets(const Version& spec_version, OLAPStatus Tablet::_capture_consistent_rowsets_unlocked(const vector& version_path, vector* rowsets) const { DCHECK(rowsets != nullptr && rowsets->empty()); + rowsets->reserve(version_path.size()); for (auto& version : version_path) { auto it = _rs_version_map.find(version); if (it == _rs_version_map.end()) { @@ -496,7 +453,7 @@ OLAPStatus Tablet::capture_rs_readers(const Version& spec_version, OLAPStatus Tablet::capture_rs_readers(const vector& version_path, vector* rs_readers) const { - DCHECK(rs_readers != NULL && rs_readers->empty()); + DCHECK(rs_readers != nullptr && rs_readers->empty()); for (auto version : version_path) { auto it = _rs_version_map.find(version); if (it == _rs_version_map.end()) { @@ -535,10 +492,10 @@ AlterTabletTaskSharedPtr Tablet::alter_task() { return _tablet_meta->alter_task(); } -OLAPStatus Tablet::add_alter_task(int64_t related_tablet_id, - int32_t related_schema_hash, - const vector& versions_to_alter, - const AlterTabletType alter_type) { +void Tablet::add_alter_task(int64_t related_tablet_id, + int32_t related_schema_hash, + const vector& versions_to_alter, + const AlterTabletType alter_type) { AlterTabletTask alter_task; alter_task.set_alter_state(ALTER_RUNNING); alter_task.set_related_tablet_id(related_tablet_id); @@ -550,7 +507,6 @@ OLAPStatus Tablet::add_alter_task(int64_t related_tablet_id, << ", related_tablet_id " << related_tablet_id << ", related_schema_hash " << related_schema_hash << ", alter_type " << alter_type; - return OLAP_SUCCESS; } void Tablet::delete_alter_task() { @@ -569,17 +525,16 @@ OLAPStatus Tablet::recover_tablet_until_specfic_version(const int64_t& spec_vers bool Tablet::can_do_compaction() { // 如果table正在做schema change,则通过选路判断数据是否转换完成 - // 如果选路成功,则转换完成,可以进行BE - // 如果选路失败,则转换未完成,不能进行BE + // 如果选路成功,则转换完成,可以进行compaction + // 如果选路失败,则转换未完成,不能进行compaction ReadLock rdlock(&_meta_lock); const RowsetSharedPtr lastest_delta = rowset_with_max_version(); - if (lastest_delta == NULL) { + if (lastest_delta == nullptr) { return false; } Version test_version = Version(0, lastest_delta->end_version()); - vector path_versions; - if (OLAP_SUCCESS != capture_consistent_versions(test_version, &path_versions)) { + if (OLAP_SUCCESS != capture_consistent_versions(test_version, nullptr)) { return false; } @@ -627,7 +582,7 @@ const uint32_t Tablet::calc_base_compaction_score() const { } void Tablet::compute_version_hash_from_rowsets( - const std::vector& rowsets, VersionHash* version_hash) const { + const std::vector& rowsets, VersionHash* version_hash) { DCHECK(version_hash != nullptr) << "invalid parameter, version_hash is nullptr"; int64_t v_hash = 0; // version hash is useless since Doris version 0.11 @@ -637,7 +592,6 @@ void Tablet::compute_version_hash_from_rowsets( *version_hash = v_hash; } - void Tablet::calc_missed_versions(int64_t spec_version, vector* missed_versions) { ReadLock rdlock(&_meta_lock); calc_missed_versions_unlocked(spec_version, missed_versions); @@ -666,7 +620,7 @@ void Tablet::calc_missed_versions_unlocked(int64_t spec_version, for (const Version& version : existing_versions) { if (version.first > last_version + 1) { for (int64_t i = last_version + 1; i < version.first; ++i) { - missed_versions->emplace_back(i, i); + missed_versions->emplace_back(Version(i, i)); } } last_version = version.second; @@ -675,18 +629,18 @@ void Tablet::calc_missed_versions_unlocked(int64_t spec_version, } } for (int64_t i = last_version + 1; i <= spec_version; ++i) { - missed_versions->emplace_back(i, i); + missed_versions->emplace_back(Version(i, i)); } } -OLAPStatus Tablet::max_continuous_version_from_begining(Version* version, - VersionHash* v_hash) { +void Tablet::max_continuous_version_from_begining(Version* version, + VersionHash* v_hash) { ReadLock rdlock(&_meta_lock); - return _max_continuous_version_from_begining_unlocked(version, v_hash); + _max_continuous_version_from_begining_unlocked(version, v_hash); } -OLAPStatus Tablet::_max_continuous_version_from_begining_unlocked(Version* version, - VersionHash* v_hash) const { +void Tablet::_max_continuous_version_from_begining_unlocked(Version* version, + VersionHash* v_hash) const { vector> existing_versions; for (auto& rs : _tablet_meta->all_rs_metas()) { existing_versions.emplace_back(rs->version() , rs->version_hash()); @@ -710,15 +664,14 @@ OLAPStatus Tablet::_max_continuous_version_from_begining_unlocked(Version* versi } *version = max_continuous_version; *v_hash = max_continuous_version_hash; - return OLAP_SUCCESS; } -OLAPStatus Tablet::calculate_cumulative_point() { +void Tablet::calculate_cumulative_point() { WriteLock wrlock(&_meta_lock); - if (_cumulative_point != -1) { + if (_cumulative_point != kInvalidCumulativePoint) { // only calculate the point once. // after that, cumulative point will be updated along with compaction process. - return OLAP_SUCCESS; + return; } std::list existing_rss; @@ -755,21 +708,15 @@ OLAPStatus Tablet::calculate_cumulative_point() { prev_version = rs->version().second; _cumulative_point = prev_version + 1; } - return OLAP_SUCCESS; } OLAPStatus Tablet::split_range(const OlapTuple& start_key_strings, const OlapTuple& end_key_strings, uint64_t request_block_row_count, vector* ranges) { - if (ranges == nullptr) { - LOG(WARNING) << "parameter end_row is null."; - return OLAP_ERR_INPUT_PARAMETER_ERROR; - } + DCHECK(ranges != nullptr); RowCursor start_key; - RowCursor end_key; - // 如果有startkey,用startkey初始化;反之则用minkey初始化 if (start_key_strings.size() > 0) { if (start_key.init_scan_key(_schema, start_key_strings.values()) != OLAP_SUCCESS) { @@ -791,6 +738,7 @@ OLAPStatus Tablet::split_range(const OlapTuple& start_key_strings, start_key.build_min_key(); } + RowCursor end_key; // 和startkey一样处理,没有则用maxkey初始化 if (end_key_strings.size() > 0) { if (OLAP_SUCCESS != end_key.init_scan_key(_schema, end_key_strings.values())) { @@ -832,17 +780,17 @@ void Tablet::delete_all_files() { // we have to call list_versions first, or else error occurs when // removing hash_map item and iterating hash_map concurrently. ReadLock rdlock(&_meta_lock); - for (auto it = _rs_version_map.begin(); it != _rs_version_map.end(); ++it) { - it->second->remove(); + for (auto it : _rs_version_map) { + it.second->remove(); } _rs_version_map.clear(); - for (auto it = _inc_rs_version_map.begin(); it != _inc_rs_version_map.end(); ++it) { - it->second->remove(); + for (auto it : _inc_rs_version_map) { + it.second->remove(); } _inc_rs_version_map.clear(); } -bool Tablet::check_path(const std::string& path_to_check) { +bool Tablet::check_path(const std::string& path_to_check) const { ReadLock rdlock(&_meta_lock); if (path_to_check == _tablet_path) { return true; @@ -911,14 +859,10 @@ OLAPStatus Tablet::_contains_version(const Version& version) { // because the value type is std::shared_ptr, when will it be nullptr? // In addition, in this class, there are many places that do not make this judgment // when access _rs_version_map's value. - if (it.second == nullptr) { - LOG(FATAL) << "there exist a version=" << it.first - << " contains the input rs with version=" << version - << ", but the related rs is null"; - return OLAP_ERR_PUSH_ROWSET_NOT_FOUND; - } else { - return OLAP_ERR_PUSH_VERSION_ALREADY_EXIST; - } + CHECK(it.second != nullptr) << "there exist a version=" << it.first + << " contains the input rs with version=" << version + << ", but the related rs is null"; + return OLAP_ERR_PUSH_VERSION_ALREADY_EXIST; } } @@ -954,7 +898,7 @@ void Tablet::pick_candicate_rowsets_to_base_compaction(vector* } // For http compaction action -OLAPStatus Tablet::get_compaction_status(std::string* json_result) { +void Tablet::get_compaction_status(std::string* json_result) { rapidjson::Document root; root.SetObject(); @@ -962,10 +906,13 @@ OLAPStatus Tablet::get_compaction_status(std::string* json_result) { std::vector delete_flags; { ReadLock rdlock(&_meta_lock); + rowsets.reserve(_rs_version_map.size()); for (auto& it : _rs_version_map) { rowsets.push_back(it.second); } std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator); + + delete_flags.reserve(rowsets.size()); for (auto& rs : rowsets) { delete_flags.push_back(version_for_delete_predicate(rs->version())); } @@ -1008,29 +955,28 @@ OLAPStatus Tablet::get_compaction_status(std::string* json_result) { rapidjson::PrettyWriter writer(strbuf); root.Accept(writer); *json_result = std::string(strbuf.GetString()); - - return OLAP_SUCCESS; } -OLAPStatus Tablet::do_tablet_meta_checkpoint() { +void Tablet::do_tablet_meta_checkpoint() { WriteLock store_lock(&_meta_store_lock); if (_newly_created_rowset_num == 0) { - return OLAP_SUCCESS; + return; } if (UnixMillis() - _last_checkpoint_time < config::tablet_meta_checkpoint_min_interval_secs * 1000 && _newly_created_rowset_num < config::tablet_meta_checkpoint_min_new_rowsets_num) { - return OLAP_SUCCESS; + return; } + // hold read-lock other than write-lock, because it will not modify meta structure ReadLock rdlock(&_meta_lock); if (tablet_state() != TABLET_RUNNING) { LOG(INFO) << "tablet is under state=" << tablet_state() << ", not running, skip do checkpoint" << ", tablet=" << full_name(); - return OLAP_SUCCESS; + return; } LOG(INFO) << "start to do tablet meta checkpoint, tablet=" << full_name(); - RETURN_NOT_OK(save_meta()); + save_meta(); // if save meta successfully, then should remove the rowset meta existing in tablet // meta from rowset meta store for (auto& rs_meta : _tablet_meta->all_rs_metas()) { @@ -1048,7 +994,6 @@ OLAPStatus Tablet::do_tablet_meta_checkpoint() { } _newly_created_rowset_num = 0; _last_checkpoint_time = UnixMillis(); - return OLAP_SUCCESS; } bool Tablet::rowset_meta_is_useful(RowsetMetaSharedPtr rowset_meta) { @@ -1131,7 +1076,7 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) { // there are some rowset meta in local meta store and in in-memory tablet meta // but not in tablet meta in local meta store // TODO(lingbin): do we need _meta_lock? -void Tablet::generate_tablet_meta_copy(TabletMetaSharedPtr new_tablet_meta) { +void Tablet::generate_tablet_meta_copy(TabletMetaSharedPtr new_tablet_meta) const { TabletMetaPB tablet_meta_pb; _tablet_meta->to_meta_pb(&tablet_meta_pb); new_tablet_meta->init_from_pb(tablet_meta_pb); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index bb64f0f3a5..8cb65654d8 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -51,7 +51,6 @@ public: DataDir* data_dir = nullptr); Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir); - ~Tablet(); OLAPStatus init(); inline bool init_succeeded(); @@ -69,12 +68,11 @@ public: // Property encapsulated in TabletMeta inline const TabletMetaSharedPtr tablet_meta(); - OLAPStatus save_meta(); + void save_meta(); // Used in clone task, to update local meta when finishing a clone job OLAPStatus revise_tablet_meta(const std::vector& rowsets_to_clone, const std::vector& versions_to_delete); - inline TabletUid tablet_uid() const; inline int64_t table_id() const; // Returns a string can be used to uniquely identify a tablet. @@ -111,8 +109,8 @@ public: // operation in rowsets OLAPStatus add_rowset(RowsetSharedPtr rowset, bool need_persist = true); - OLAPStatus modify_rowsets(const vector& to_add, - const vector& to_delete); + void modify_rowsets(const vector& to_add, + const vector& to_delete); // _rs_version_map and _inc_rs_version_map should be protected by _meta_lock // The caller must call hold _meta_lock when call this two function. @@ -144,7 +142,7 @@ public: // message for alter task AlterTabletTaskSharedPtr alter_task(); - OLAPStatus add_alter_task(int64_t related_tablet_id, int32_t related_schema_hash, + void add_alter_task(int64_t related_tablet_id, int32_t related_schema_hash, const vector& versions_to_alter, const AlterTabletType alter_type); void delete_alter_task(); @@ -177,8 +175,8 @@ public: bool can_do_compaction(); const uint32_t calc_cumulative_compaction_score() const; const uint32_t calc_base_compaction_score() const; - void compute_version_hash_from_rowsets(const std::vector& rowsets, - VersionHash* version_hash) const; + static void compute_version_hash_from_rowsets(const std::vector& rowsets, + VersionHash* version_hash); // operation for clone void calc_missed_versions(int64_t spec_version, vector* missed_versions); @@ -187,7 +185,7 @@ public: // This function to find max continous version from the beginning. // For example: If there are 1, 2, 3, 5, 6, 7 versions belongs tablet, then 3 is target. - OLAPStatus max_continuous_version_from_begining(Version* version, VersionHash* v_hash); + void max_continuous_version_from_begining(Version* version, VersionHash* v_hash); // operation for query OLAPStatus split_range( @@ -217,7 +215,7 @@ public: void delete_all_files(); - bool check_path(const std::string& check_path); + bool check_path(const std::string& check_path) const; bool check_rowset_id(const RowsetId& rowset_id); OLAPStatus set_partition_id(int64_t partition_id); @@ -228,7 +226,7 @@ public: std::vector* candidate_rowsets); void pick_candicate_rowsets_to_base_compaction(std::vector* candidate_rowsets); - OLAPStatus calculate_cumulative_point(); + void calculate_cumulative_point(); // TODO(ygl): inline bool is_primary_replica() { return false; } @@ -237,24 +235,24 @@ public: // eco mode also means save money in palo inline bool in_eco_mode() { return false; } - OLAPStatus do_tablet_meta_checkpoint(); + void do_tablet_meta_checkpoint(); bool rowset_meta_is_useful(RowsetMetaSharedPtr rowset_meta); void build_tablet_report_info(TTabletInfo* tablet_info); - void generate_tablet_meta_copy(TabletMetaSharedPtr new_tablet_meta); + void generate_tablet_meta_copy(TabletMetaSharedPtr new_tablet_meta) const; // return a json string to show the compaction status of this tablet - OLAPStatus get_compaction_status(std::string* json_result); + void get_compaction_status(std::string* json_result); private: OLAPStatus _init_once_action(); void _print_missed_versions(const std::vector& missed_versions) const; bool _contains_rowset(const RowsetId rowset_id); OLAPStatus _contains_version(const Version& version); - OLAPStatus _max_continuous_version_from_begining_unlocked(Version* version, - VersionHash* v_hash) const ; + void _max_continuous_version_from_begining_unlocked(Version* version, + VersionHash* v_hash) const ; void _gen_tablet_path(); RowsetSharedPtr _rowset_with_largest_size(); void _delete_inc_rowset_by_version(const Version& version, const VersionHash& version_hash); @@ -262,6 +260,7 @@ private: vector* rowsets) const; private: + static const int64_t kInvalidCumulativePoint = -1; TabletState _state; TabletMetaSharedPtr _tablet_meta; TabletSchema _schema; @@ -281,7 +280,7 @@ private: // TODO(lingbin): There is a _meta_lock TabletMeta too, there should be a comment to // explain how these two locks work together. - RWMutex _meta_lock; + mutable RWMutex _meta_lock; // A new load job will produce a new rowset, which will be inserted into both _rs_version_map // and _inc_rs_version_map. Only the most recent rowsets are kept in _inc_rs_version_map to // reduce the amount of data that needs to be copied during the clone task. diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index ca78d05549..5239e1c066 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -170,9 +170,7 @@ OLAPStatus TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id, Schem OLAPStatus res = OLAP_SUCCESS; if (update_meta) { // call tablet save meta in order to valid the meta - RETURN_NOT_OK_LOG(tablet->save_meta(), Substitute( - "failed to save new tablet's meta. tablet_id=$0, schema_hash=$1", - tablet_id, schema_hash)); + tablet->save_meta(); } if (drop_old) { // If the new tablet is fresher than the existing one, then replace @@ -529,11 +527,7 @@ OLAPStatus TabletManager::_drop_tablet_unlocked( && related_alter_task->related_tablet_id() == tablet_id && related_alter_task->related_schema_hash() == schema_hash) { related_tablet->delete_alter_task(); - res = related_tablet->save_meta(); - if (res != OLAP_SUCCESS) { - LOG(FATAL) << "fail to save tablet header. res=" << res - << ", tablet=" << related_tablet->full_name(); - } + related_tablet->save_meta(); } related_tablet->release_header_lock(); res = _drop_tablet_directly_unlocked(tablet_id, schema_hash, keep_files); @@ -1340,8 +1334,7 @@ OLAPStatus TabletManager::_drop_tablet_directly_unlocked( // and the tablet will be loaded at restart time. // To avoid this exception, we first set the state of the tablet to `SHUTDOWN`. tablet->set_tablet_state(TABLET_SHUTDOWN); - RETURN_NOT_OK_LOG(tablet->save_meta(), Substitute( - "fail to drop tablet.tablet_id=$0, schema_hash=$1", tablet_id, schema_hash)); + tablet->save_meta(); { WriteLock wlock(&_shutdown_tablets_lock); _shutdown_tablets.push_back(tablet);