[Tablet] A small refactor on class Tablet (#3339)

There is no functional changes in this patch.
Key refactor points are:
- Remove meaningless return value of functions in class Tablet, and
  also some related functions in other classes
- Allow RowsetGraph::capture_consistent_versions to pass a nullptr
  to the output parameter
- Use CHECK instead of LOG(FATAL) to simplify code
This commit is contained in:
Yingchun Lai
2020-04-24 22:22:26 +08:00
committed by GitHub
parent 0e66385235
commit 37fccd53c4
9 changed files with 125 additions and 241 deletions

View File

@ -56,10 +56,7 @@ Status CompactionAction::_handle_show_compaction(HttpRequest *req, std::string*
return Status::NotFound("Tablet not found");
}
OLAPStatus s = tablet->get_compaction_status(json_result);
if (s != OLAP_SUCCESS) {
return Status::InternalError(strings::Substitute("failed to get tablet compaction status. res $0", s));
}
tablet->get_compaction_status(json_result);
return Status::OK();
}

View File

@ -90,7 +90,7 @@ OLAPStatus Compaction::do_compaction_impl() {
RETURN_NOT_OK(check_correctness(stats));
// 4. modify rowsets in memory
RETURN_NOT_OK(modify_rowsets());
modify_rowsets();
// 5. update last success compaction time
int64_t now = UnixMillis();
@ -140,30 +140,13 @@ OLAPStatus Compaction::construct_input_rowset_readers() {
return OLAP_SUCCESS;
}
OLAPStatus Compaction::modify_rowsets() {
void Compaction::modify_rowsets() {
std::vector<RowsetSharedPtr> output_rowsets;
output_rowsets.push_back(_output_rowset);
WriteLock wrlock(_tablet->get_header_lock_ptr());
OLAPStatus res = _tablet->modify_rowsets(output_rowsets, _input_rowsets);
if (res != OLAP_SUCCESS) {
LOG(FATAL) << "fail to replace data sources. res" << res
<< ", tablet=" << _tablet->full_name()
<< ", compaction__version=" << _output_version.first
<< "-" << _output_version.second;
return res;
}
res = _tablet->save_meta();
if (res != OLAP_SUCCESS) {
LOG(FATAL) << "fail to save tablet meta. res=" << res
<< ", tablet=" << _tablet->full_name()
<< ", compaction_version=" << _output_version.first
<< "-" << _output_version.second;
return OLAP_ERR_BE_SAVE_HEADER_ERROR;
}
return OLAP_SUCCESS;
_tablet->modify_rowsets(output_rowsets, _input_rowsets);
_tablet->save_meta();
}
OLAPStatus Compaction::gc_unused_rowsets() {

View File

@ -59,7 +59,7 @@ protected:
OLAPStatus do_compaction();
OLAPStatus do_compaction_impl();
OLAPStatus modify_rowsets();
void modify_rowsets();
OLAPStatus gc_unused_rowsets();
OLAPStatus construct_output_rowset_writer();

View File

@ -40,7 +40,7 @@ OLAPStatus CumulativeCompaction::compact() {
}
// 1.calculate cumulative point
RETURN_NOT_OK(_tablet->calculate_cumulative_point());
_tablet->calculate_cumulative_point();
// 2. pick rowsets to compact
RETURN_NOT_OK(pick_rowsets_to_compact());

View File

@ -150,11 +150,6 @@ OLAPStatus RowsetGraph::capture_consistent_versions(const Version& spec_version,
return OLAP_ERR_INPUT_PARAMETER_ERROR;
}
if (version_path == nullptr) {
LOG(WARNING) << "param version_path is nullptr.";
return OLAP_ERR_INPUT_PARAMETER_ERROR;
}
// bfs_queue's element is vertex_index.
std::queue<int64_t> bfs_queue;
// predecessor[i] means the predecessor of vertex_index 'i'.
@ -227,25 +222,26 @@ OLAPStatus RowsetGraph::capture_consistent_versions(const Version& spec_version,
reversed_path.push_back(tmp_vertex_index);
}
// Make version_path from reversed_path.
std::stringstream shortest_path_for_debug;
for (size_t path_id = reversed_path.size() - 1; path_id > 0; --path_id) {
int64_t tmp_start_vertex_value = _version_graph[reversed_path[path_id]].value;
int64_t tmp_end_vertex_value = _version_graph[reversed_path[path_id - 1]].value;
if (version_path != nullptr) {
// Make version_path from reversed_path.
std::stringstream shortest_path_for_debug;
for (size_t path_id = reversed_path.size() - 1; path_id > 0; --path_id) {
int64_t tmp_start_vertex_value = _version_graph[reversed_path[path_id]].value;
int64_t tmp_end_vertex_value = _version_graph[reversed_path[path_id - 1]].value;
// tmp_start_vertex_value mustn't be equal to tmp_end_vertex_value
if (tmp_start_vertex_value <= tmp_end_vertex_value) {
version_path->emplace_back(tmp_start_vertex_value, tmp_end_vertex_value - 1);
} else {
version_path->emplace_back(tmp_end_vertex_value, tmp_start_vertex_value - 1);
// tmp_start_vertex_value mustn't be equal to tmp_end_vertex_value
if (tmp_start_vertex_value <= tmp_end_vertex_value) {
version_path->emplace_back(tmp_start_vertex_value, tmp_end_vertex_value - 1);
} else {
version_path->emplace_back(tmp_end_vertex_value, tmp_start_vertex_value - 1);
}
shortest_path_for_debug << (*version_path)[version_path->size() - 1] << ' ';
}
shortest_path_for_debug << (*version_path)[version_path->size() - 1] << ' ';
VLOG(10) << "success to find path for spec_version. spec_version=" << spec_version
<< ", path=" << shortest_path_for_debug.str();
}
VLOG(10) << "success to find path for spec_version. spec_version=" << spec_version
<< ", path=" << shortest_path_for_debug.str();
return OLAP_SUCCESS;
}

View File

@ -1322,11 +1322,7 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
// check if new_tablet.ce_point > base_tablet.ce_point?
new_tablet->set_cumulative_layer_point(-1);
// save tablet meta
res = new_tablet->save_meta();
if (res != OLAP_SUCCESS) {
LOG(FATAL) << "fail to save tablet meta after remove rowset from new tablet"
<< new_tablet->full_name();
}
new_tablet->save_meta();
for (auto& rowset : rowsets_to_delete) {
// do not call rowset.remove directly, using gc thread to delete it
StorageEngine::instance()->add_unused_rowset(rowset);
@ -1399,10 +1395,7 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
if (res != OLAP_SUCCESS) {
break;
}
res = new_tablet->save_meta();
if (res != OLAP_SUCCESS) {
break;
}
new_tablet->save_meta();
} while(0);
if (res == OLAP_SUCCESS) {
@ -1588,25 +1581,14 @@ OLAPStatus SchemaChangeHandler::_add_alter_task(
new_tablet->schema_hash(),
versions_to_be_changed,
alter_tablet_type);
OLAPStatus res = base_tablet->save_meta();
if (res != OLAP_SUCCESS) {
LOG(FATAL) << "fail to save base tablet meta. res=" << res
<< ", tablet=" << base_tablet->full_name();
return res;
}
base_tablet->save_meta();
new_tablet->add_alter_task(base_tablet->tablet_id(),
base_tablet->schema_hash(),
vector<Version>(), // empty versions
alter_tablet_type);
res = new_tablet->save_meta();
if (res != OLAP_SUCCESS) {
LOG(FATAL) << "fail to save new tablet meta. res=" << res
<< ", tablet=" << new_tablet->full_name();
return res;
}
new_tablet->save_meta();
LOG(INFO) << "successfully add alter task to both base and new";
return res;
return OLAP_SUCCESS;
}
OLAPStatus SchemaChangeHandler::_save_alter_state(
@ -1627,13 +1609,7 @@ OLAPStatus SchemaChangeHandler::_save_alter_state(
<< " res=" << res;
return res;
}
res = base_tablet->save_meta();
if (res != OLAP_SUCCESS) {
LOG(FATAL) << "fail to save base tablet meta. res=" << res
<< ", base_tablet=" << base_tablet->full_name();
return res;
}
base_tablet->save_meta();
AlterTabletTaskSharedPtr new_alter_task = new_tablet->alter_task();
if (new_alter_task == nullptr) {
LOG(INFO) << "could not find alter task info from new tablet " << new_tablet->full_name();
@ -1646,14 +1622,9 @@ OLAPStatus SchemaChangeHandler::_save_alter_state(
<< " res" << res;
return res;
}
res = new_tablet->save_meta();
if (res != OLAP_SUCCESS) {
LOG(FATAL) << "fail to save new tablet meta. res=" << res
<< ", new_tablet=" << base_tablet->full_name();
return res;
}
new_tablet->save_meta();
return res;
return OLAP_SUCCESS;
}
OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams& sc_params) {
@ -1794,7 +1765,7 @@ PROCESS_ALTER_EXIT:
{
// save tablet meta here because rowset meta is not saved during add rowset
WriteLock new_wlock(sc_params.new_tablet->get_header_lock_ptr());
res = sc_params.new_tablet->save_meta();
sc_params.new_tablet->save_meta();
}
if (res == OLAP_SUCCESS) {
Version test_version(0, end_version);

View File

@ -70,21 +70,17 @@ Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) :
_last_cumu_compaction_failure_millis(0),
_last_base_compaction_failure_millis(0),
_last_cumu_compaction_success_millis(0),
_last_base_compaction_success_millis(0) {
_last_base_compaction_success_millis(0),
_cumulative_point(kInvalidCumulativePoint) {
_gen_tablet_path();
_rs_graph.construct_rowset_graph(_tablet_meta->all_rs_metas());
}
Tablet::~Tablet() {
_rs_version_map.clear();
_inc_rs_version_map.clear();
}
OLAPStatus Tablet::_init_once_action() {
OLAPStatus res = OLAP_SUCCESS;
VLOG(3) << "begin to load tablet. tablet=" << full_name()
<< ", version_size=" << _tablet_meta->version_count();
for (auto& rs_meta : _tablet_meta->all_rs_metas()) {
for (const auto& rs_meta : _tablet_meta->all_rs_metas()) {
Version version = rs_meta->version();
RowsetSharedPtr rowset;
res = RowsetFactory::create_rowset(&_schema, _tablet_path, rs_meta, &rowset);
@ -115,7 +111,6 @@ OLAPStatus Tablet::_init_once_action() {
_inc_rs_version_map[version] = std::move(rowset);
}
_cumulative_point = -1;
return res;
}
@ -135,16 +130,12 @@ OLAPStatus Tablet::set_tablet_state(TabletState state) {
// should save tablet meta to remote meta store
// if it's a primary replica
OLAPStatus Tablet::save_meta() {
OLAPStatus res = _tablet_meta->save_meta(_data_dir);
if (res != OLAP_SUCCESS) {
LOG(FATAL) << "fail to save tablet_meta. res=" << res << ", root=" << _data_dir->path();
}
void Tablet::save_meta() {
auto res = _tablet_meta->save_meta(_data_dir);
CHECK_EQ(res, OLAP_SUCCESS) << "fail to save tablet_meta. res=" << res << ", root=" << _data_dir->path();
// User could directly update tablet schema by _tablet_meta,
// So we need to refetch schema again
_schema = _tablet_meta->tablet_schema();
return res;
}
OLAPStatus Tablet::revise_tablet_meta(
@ -169,19 +160,10 @@ OLAPStatus Tablet::revise_tablet_meta(
<< full_name() << ", version=" << version << "]";
}
if (res != OLAP_SUCCESS) {
break;
}
for (auto& rs_meta : rowsets_to_clone) {
new_tablet_meta->add_rs_meta(rs_meta);
}
if (res != OLAP_SUCCESS) {
break;
}
VLOG(3) << "load rowsets successfully when clone. tablet=" << full_name()
VLOG(3) << "load rowsets successfully when clone. tablet=" << full_name()
<< ", added rowset size=" << rowsets_to_clone.size();
// save and reload tablet_meta
res = new_tablet_meta->save_meta(_data_dir);
@ -194,6 +176,7 @@ OLAPStatus Tablet::revise_tablet_meta(
for (auto& version : versions_to_delete) {
auto it = _rs_version_map.find(version);
DCHECK(it != _rs_version_map.end());
StorageEngine::instance()->add_unused_rowset(it->second);
_rs_version_map.erase(it);
}
@ -207,7 +190,7 @@ OLAPStatus Tablet::revise_tablet_meta(
RowsetSharedPtr rowset;
res = RowsetFactory::create_rowset(&_schema, _tablet_path, rs_meta, &rowset);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to init rowset. version=" << version.first << "-" << version.second;
LOG(WARNING) << "fail to init rowset. version=" << version;
return res;
}
_rs_version_map[version] = std::move(rowset);
@ -241,14 +224,10 @@ OLAPStatus Tablet::add_rowset(RowsetSharedPtr rowset, bool need_persist) {
// but it should be removed in multi path version
for (auto& it : _rs_version_map) {
if (rowset->version().contains(it.first) && rowset->version() != it.first) {
if (it.second == nullptr) {
LOG(FATAL) << "there exist a version=" << it.first
<< " contains the input rs with version=" << rowset->version()
<< ", but the related rs is null";
return OLAP_ERR_PUSH_ROWSET_NOT_FOUND;
} else {
rowsets_to_delete.push_back(it.second);
}
CHECK(it.second != nullptr) << "there exist a version=" << it.first
<< " contains the input rs with version=" << rowset->version()
<< ", but the related rs is null";
rowsets_to_delete.push_back(it.second);
}
}
modify_rowsets(std::vector<RowsetSharedPtr>(), rowsets_to_delete);
@ -266,31 +245,23 @@ OLAPStatus Tablet::add_rowset(RowsetSharedPtr rowset, bool need_persist) {
return OLAP_SUCCESS;
}
OLAPStatus Tablet::modify_rowsets(const vector<RowsetSharedPtr>& to_add,
const vector<RowsetSharedPtr>& to_delete) {
void Tablet::modify_rowsets(const vector<RowsetSharedPtr>& to_add,
const vector<RowsetSharedPtr>& to_delete) {
vector<RowsetMetaSharedPtr> rs_metas_to_add;
for (auto& rs : to_add) {
rs_metas_to_add.push_back(rs->rowset_meta());
_rs_version_map[rs->version()] = rs;
++_newly_created_rowset_num;
}
vector<RowsetMetaSharedPtr> rs_metas_to_delete;
for (auto& rs : to_delete) {
rs_metas_to_delete.push_back(rs->rowset_meta());
_rs_version_map.erase(rs->version());
}
_tablet_meta->modify_rs_metas(rs_metas_to_add, rs_metas_to_delete);
for (auto& rs : to_delete) {
auto it = _rs_version_map.find(rs->version());
_rs_version_map.erase(it);
}
for (auto& rs : to_add) {
_rs_version_map[rs->version()] = rs;
++_newly_created_rowset_num;
}
_rs_graph.reconstruct_rowset_graph(_tablet_meta->all_rs_metas());
return OLAP_SUCCESS;
}
// snapshot manager may call this api to check if version exists, so that
@ -298,16 +269,14 @@ OLAPStatus Tablet::modify_rowsets(const vector<RowsetSharedPtr>& to_add,
const RowsetSharedPtr Tablet::get_rowset_by_version(const Version& version) const {
auto iter = _rs_version_map.find(version);
if (iter == _rs_version_map.end()) {
VLOG(3) << "no rowset for version:" << version.first << "-" << version.second
<< ", tablet: " << full_name();
VLOG(3) << "no rowset for version:" << version << ", tablet: " << full_name();
return nullptr;
}
RowsetSharedPtr rowset = iter->second;
return rowset;
return iter->second;
}
// This function only be called by SnapshotManager to perform incremental clone.
// It will be called under protected of _metalock(SnapshotManager will fetch it manually),
// It will be called under protected of _meta_lock(SnapshotManager will fetch it manually),
// so it is no need to lock here.
const RowsetSharedPtr Tablet::get_inc_rowset_by_version(const Version& version) const {
auto iter = _inc_rs_version_map.find(version);
@ -315,8 +284,7 @@ const RowsetSharedPtr Tablet::get_inc_rowset_by_version(const Version& version)
VLOG(3) << "no rowset for version:" << version << ", tablet: " << full_name();
return nullptr;
}
RowsetSharedPtr rowset = iter->second;
return rowset;
return iter->second;
}
// Already under _meta_lock
@ -326,15 +294,10 @@ const RowsetSharedPtr Tablet::rowset_with_max_version() const {
return nullptr;
}
DCHECK(_rs_version_map.find(max_version) != _rs_version_map.end())
<< "invalid version:" << max_version;
auto iter = _rs_version_map.find(max_version);
if (iter == _rs_version_map.end()) {
LOG(WARNING) << "no rowset for version:" << max_version;
return nullptr;
}
RowsetSharedPtr rowset = iter->second;
return rowset;
DCHECK(_rs_version_map.find(max_version) != _rs_version_map.end())
<< "invalid version:" << max_version;
return iter->second;
}
RowsetSharedPtr Tablet::_rowset_with_largest_size() {
@ -373,15 +336,12 @@ OLAPStatus Tablet::add_inc_rowset(const RowsetSharedPtr& rowset) {
void Tablet::_delete_inc_rowset_by_version(const Version& version,
const VersionHash& version_hash) {
// delete incremental rowset from map
auto it = _inc_rs_version_map.find(version);
if (it != _inc_rs_version_map.end()) {
_inc_rs_version_map.erase(it);
}
_inc_rs_version_map.erase(version);
RowsetMetaSharedPtr rowset_meta = _tablet_meta->acquire_inc_rs_meta_by_version(version);
if (rowset_meta == nullptr) {
return;
}
_tablet_meta->delete_inc_rs_meta_by_version(version);
VLOG(3) << "delete incremental rowset. tablet=" << full_name() << ", version=" << version;
}
@ -412,10 +372,7 @@ void Tablet::delete_expired_inc_rowsets() {
<< ", version=" << pair.first;
}
if (save_meta() != OLAP_SUCCESS) {
LOG(FATAL) << "fail to save tablet_meta when delete expire incremental data."
<< "tablet=" << full_name();
}
save_meta();
}
OLAPStatus Tablet::capture_consistent_versions(const Version& spec_version,
@ -433,15 +390,13 @@ OLAPStatus Tablet::capture_consistent_versions(const Version& spec_version,
<< ", missed version for version:" << spec_version;
_print_missed_versions(missed_versions);
}
return status;
}
return status;
}
OLAPStatus Tablet::check_version_integrity(const Version& version) {
vector<Version> span_versions;
ReadLock rdlock(&_meta_lock);
return capture_consistent_versions(version, &span_versions);
return capture_consistent_versions(version, nullptr);
}
// If any rowset contains the specific version, it means the version already exist
@ -457,8 +412,9 @@ bool Tablet::check_version_exist(const Version& version) const {
void Tablet::list_versions(vector<Version>* versions) const {
DCHECK(versions != nullptr && versions->empty());
versions->reserve(_rs_version_map.size());
// versions vector is not sorted.
for (auto& it : _rs_version_map) {
for (const auto& it : _rs_version_map) {
versions->push_back(it.first);
}
}
@ -474,6 +430,7 @@ OLAPStatus Tablet::capture_consistent_rowsets(const Version& spec_version,
OLAPStatus Tablet::_capture_consistent_rowsets_unlocked(const vector<Version>& version_path,
vector<RowsetSharedPtr>* rowsets) const {
DCHECK(rowsets != nullptr && rowsets->empty());
rowsets->reserve(version_path.size());
for (auto& version : version_path) {
auto it = _rs_version_map.find(version);
if (it == _rs_version_map.end()) {
@ -496,7 +453,7 @@ OLAPStatus Tablet::capture_rs_readers(const Version& spec_version,
OLAPStatus Tablet::capture_rs_readers(const vector<Version>& version_path,
vector<RowsetReaderSharedPtr>* rs_readers) const {
DCHECK(rs_readers != NULL && rs_readers->empty());
DCHECK(rs_readers != nullptr && rs_readers->empty());
for (auto version : version_path) {
auto it = _rs_version_map.find(version);
if (it == _rs_version_map.end()) {
@ -535,10 +492,10 @@ AlterTabletTaskSharedPtr Tablet::alter_task() {
return _tablet_meta->alter_task();
}
OLAPStatus Tablet::add_alter_task(int64_t related_tablet_id,
int32_t related_schema_hash,
const vector<Version>& versions_to_alter,
const AlterTabletType alter_type) {
void Tablet::add_alter_task(int64_t related_tablet_id,
int32_t related_schema_hash,
const vector<Version>& versions_to_alter,
const AlterTabletType alter_type) {
AlterTabletTask alter_task;
alter_task.set_alter_state(ALTER_RUNNING);
alter_task.set_related_tablet_id(related_tablet_id);
@ -550,7 +507,6 @@ OLAPStatus Tablet::add_alter_task(int64_t related_tablet_id,
<< ", related_tablet_id " << related_tablet_id
<< ", related_schema_hash " << related_schema_hash
<< ", alter_type " << alter_type;
return OLAP_SUCCESS;
}
void Tablet::delete_alter_task() {
@ -569,17 +525,16 @@ OLAPStatus Tablet::recover_tablet_until_specfic_version(const int64_t& spec_vers
bool Tablet::can_do_compaction() {
// 如果table正在做schema change,则通过选路判断数据是否转换完成
// 如果选路成功,则转换完成,可以进行BE
// 如果选路失败,则转换未完成,不能进行BE
// 如果选路成功,则转换完成,可以进行compaction
// 如果选路失败,则转换未完成,不能进行compaction
ReadLock rdlock(&_meta_lock);
const RowsetSharedPtr lastest_delta = rowset_with_max_version();
if (lastest_delta == NULL) {
if (lastest_delta == nullptr) {
return false;
}
Version test_version = Version(0, lastest_delta->end_version());
vector<Version> path_versions;
if (OLAP_SUCCESS != capture_consistent_versions(test_version, &path_versions)) {
if (OLAP_SUCCESS != capture_consistent_versions(test_version, nullptr)) {
return false;
}
@ -627,7 +582,7 @@ const uint32_t Tablet::calc_base_compaction_score() const {
}
void Tablet::compute_version_hash_from_rowsets(
const std::vector<RowsetSharedPtr>& rowsets, VersionHash* version_hash) const {
const std::vector<RowsetSharedPtr>& rowsets, VersionHash* version_hash) {
DCHECK(version_hash != nullptr) << "invalid parameter, version_hash is nullptr";
int64_t v_hash = 0;
// version hash is useless since Doris version 0.11
@ -637,7 +592,6 @@ void Tablet::compute_version_hash_from_rowsets(
*version_hash = v_hash;
}
void Tablet::calc_missed_versions(int64_t spec_version, vector<Version>* missed_versions) {
ReadLock rdlock(&_meta_lock);
calc_missed_versions_unlocked(spec_version, missed_versions);
@ -666,7 +620,7 @@ void Tablet::calc_missed_versions_unlocked(int64_t spec_version,
for (const Version& version : existing_versions) {
if (version.first > last_version + 1) {
for (int64_t i = last_version + 1; i < version.first; ++i) {
missed_versions->emplace_back(i, i);
missed_versions->emplace_back(Version(i, i));
}
}
last_version = version.second;
@ -675,18 +629,18 @@ void Tablet::calc_missed_versions_unlocked(int64_t spec_version,
}
}
for (int64_t i = last_version + 1; i <= spec_version; ++i) {
missed_versions->emplace_back(i, i);
missed_versions->emplace_back(Version(i, i));
}
}
OLAPStatus Tablet::max_continuous_version_from_begining(Version* version,
VersionHash* v_hash) {
void Tablet::max_continuous_version_from_begining(Version* version,
VersionHash* v_hash) {
ReadLock rdlock(&_meta_lock);
return _max_continuous_version_from_begining_unlocked(version, v_hash);
_max_continuous_version_from_begining_unlocked(version, v_hash);
}
OLAPStatus Tablet::_max_continuous_version_from_begining_unlocked(Version* version,
VersionHash* v_hash) const {
void Tablet::_max_continuous_version_from_begining_unlocked(Version* version,
VersionHash* v_hash) const {
vector<pair<Version, VersionHash>> existing_versions;
for (auto& rs : _tablet_meta->all_rs_metas()) {
existing_versions.emplace_back(rs->version() , rs->version_hash());
@ -710,15 +664,14 @@ OLAPStatus Tablet::_max_continuous_version_from_begining_unlocked(Version* versi
}
*version = max_continuous_version;
*v_hash = max_continuous_version_hash;
return OLAP_SUCCESS;
}
OLAPStatus Tablet::calculate_cumulative_point() {
void Tablet::calculate_cumulative_point() {
WriteLock wrlock(&_meta_lock);
if (_cumulative_point != -1) {
if (_cumulative_point != kInvalidCumulativePoint) {
// only calculate the point once.
// after that, cumulative point will be updated along with compaction process.
return OLAP_SUCCESS;
return;
}
std::list<RowsetMetaSharedPtr> existing_rss;
@ -755,21 +708,15 @@ OLAPStatus Tablet::calculate_cumulative_point() {
prev_version = rs->version().second;
_cumulative_point = prev_version + 1;
}
return OLAP_SUCCESS;
}
OLAPStatus Tablet::split_range(const OlapTuple& start_key_strings,
const OlapTuple& end_key_strings,
uint64_t request_block_row_count,
vector<OlapTuple>* ranges) {
if (ranges == nullptr) {
LOG(WARNING) << "parameter end_row is null.";
return OLAP_ERR_INPUT_PARAMETER_ERROR;
}
DCHECK(ranges != nullptr);
RowCursor start_key;
RowCursor end_key;
// 如果有startkey,用startkey初始化;反之则用minkey初始化
if (start_key_strings.size() > 0) {
if (start_key.init_scan_key(_schema, start_key_strings.values()) != OLAP_SUCCESS) {
@ -791,6 +738,7 @@ OLAPStatus Tablet::split_range(const OlapTuple& start_key_strings,
start_key.build_min_key();
}
RowCursor end_key;
// 和startkey一样处理,没有则用maxkey初始化
if (end_key_strings.size() > 0) {
if (OLAP_SUCCESS != end_key.init_scan_key(_schema, end_key_strings.values())) {
@ -832,17 +780,17 @@ void Tablet::delete_all_files() {
// we have to call list_versions first, or else error occurs when
// removing hash_map item and iterating hash_map concurrently.
ReadLock rdlock(&_meta_lock);
for (auto it = _rs_version_map.begin(); it != _rs_version_map.end(); ++it) {
it->second->remove();
for (auto it : _rs_version_map) {
it.second->remove();
}
_rs_version_map.clear();
for (auto it = _inc_rs_version_map.begin(); it != _inc_rs_version_map.end(); ++it) {
it->second->remove();
for (auto it : _inc_rs_version_map) {
it.second->remove();
}
_inc_rs_version_map.clear();
}
bool Tablet::check_path(const std::string& path_to_check) {
bool Tablet::check_path(const std::string& path_to_check) const {
ReadLock rdlock(&_meta_lock);
if (path_to_check == _tablet_path) {
return true;
@ -911,14 +859,10 @@ OLAPStatus Tablet::_contains_version(const Version& version) {
// because the value type is std::shared_ptr, when will it be nullptr?
// In addition, in this class, there are many places that do not make this judgment
// when access _rs_version_map's value.
if (it.second == nullptr) {
LOG(FATAL) << "there exist a version=" << it.first
<< " contains the input rs with version=" << version
<< ", but the related rs is null";
return OLAP_ERR_PUSH_ROWSET_NOT_FOUND;
} else {
return OLAP_ERR_PUSH_VERSION_ALREADY_EXIST;
}
CHECK(it.second != nullptr) << "there exist a version=" << it.first
<< " contains the input rs with version=" << version
<< ", but the related rs is null";
return OLAP_ERR_PUSH_VERSION_ALREADY_EXIST;
}
}
@ -954,7 +898,7 @@ void Tablet::pick_candicate_rowsets_to_base_compaction(vector<RowsetSharedPtr>*
}
// For http compaction action
OLAPStatus Tablet::get_compaction_status(std::string* json_result) {
void Tablet::get_compaction_status(std::string* json_result) {
rapidjson::Document root;
root.SetObject();
@ -962,10 +906,13 @@ OLAPStatus Tablet::get_compaction_status(std::string* json_result) {
std::vector<bool> delete_flags;
{
ReadLock rdlock(&_meta_lock);
rowsets.reserve(_rs_version_map.size());
for (auto& it : _rs_version_map) {
rowsets.push_back(it.second);
}
std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator);
delete_flags.reserve(rowsets.size());
for (auto& rs : rowsets) {
delete_flags.push_back(version_for_delete_predicate(rs->version()));
}
@ -1008,29 +955,28 @@ OLAPStatus Tablet::get_compaction_status(std::string* json_result) {
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
root.Accept(writer);
*json_result = std::string(strbuf.GetString());
return OLAP_SUCCESS;
}
OLAPStatus Tablet::do_tablet_meta_checkpoint() {
void Tablet::do_tablet_meta_checkpoint() {
WriteLock store_lock(&_meta_store_lock);
if (_newly_created_rowset_num == 0) {
return OLAP_SUCCESS;
return;
}
if (UnixMillis() - _last_checkpoint_time < config::tablet_meta_checkpoint_min_interval_secs * 1000
&& _newly_created_rowset_num < config::tablet_meta_checkpoint_min_new_rowsets_num) {
return OLAP_SUCCESS;
return;
}
// hold read-lock other than write-lock, because it will not modify meta structure
ReadLock rdlock(&_meta_lock);
if (tablet_state() != TABLET_RUNNING) {
LOG(INFO) << "tablet is under state=" << tablet_state()
<< ", not running, skip do checkpoint"
<< ", tablet=" << full_name();
return OLAP_SUCCESS;
return;
}
LOG(INFO) << "start to do tablet meta checkpoint, tablet=" << full_name();
RETURN_NOT_OK(save_meta());
save_meta();
// if save meta successfully, then should remove the rowset meta existing in tablet
// meta from rowset meta store
for (auto& rs_meta : _tablet_meta->all_rs_metas()) {
@ -1048,7 +994,6 @@ OLAPStatus Tablet::do_tablet_meta_checkpoint() {
}
_newly_created_rowset_num = 0;
_last_checkpoint_time = UnixMillis();
return OLAP_SUCCESS;
}
bool Tablet::rowset_meta_is_useful(RowsetMetaSharedPtr rowset_meta) {
@ -1131,7 +1076,7 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) {
// there are some rowset meta in local meta store and in in-memory tablet meta
// but not in tablet meta in local meta store
// TODO(lingbin): do we need _meta_lock?
void Tablet::generate_tablet_meta_copy(TabletMetaSharedPtr new_tablet_meta) {
void Tablet::generate_tablet_meta_copy(TabletMetaSharedPtr new_tablet_meta) const {
TabletMetaPB tablet_meta_pb;
_tablet_meta->to_meta_pb(&tablet_meta_pb);
new_tablet_meta->init_from_pb(tablet_meta_pb);

View File

@ -51,7 +51,6 @@ public:
DataDir* data_dir = nullptr);
Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir);
~Tablet();
OLAPStatus init();
inline bool init_succeeded();
@ -69,12 +68,11 @@ public:
// Property encapsulated in TabletMeta
inline const TabletMetaSharedPtr tablet_meta();
OLAPStatus save_meta();
void save_meta();
// Used in clone task, to update local meta when finishing a clone job
OLAPStatus revise_tablet_meta(const std::vector<RowsetMetaSharedPtr>& rowsets_to_clone,
const std::vector<Version>& versions_to_delete);
inline TabletUid tablet_uid() const;
inline int64_t table_id() const;
// Returns a string can be used to uniquely identify a tablet.
@ -111,8 +109,8 @@ public:
// operation in rowsets
OLAPStatus add_rowset(RowsetSharedPtr rowset, bool need_persist = true);
OLAPStatus modify_rowsets(const vector<RowsetSharedPtr>& to_add,
const vector<RowsetSharedPtr>& to_delete);
void modify_rowsets(const vector<RowsetSharedPtr>& to_add,
const vector<RowsetSharedPtr>& to_delete);
// _rs_version_map and _inc_rs_version_map should be protected by _meta_lock
// The caller must call hold _meta_lock when call this two function.
@ -144,7 +142,7 @@ public:
// message for alter task
AlterTabletTaskSharedPtr alter_task();
OLAPStatus add_alter_task(int64_t related_tablet_id, int32_t related_schema_hash,
void add_alter_task(int64_t related_tablet_id, int32_t related_schema_hash,
const vector<Version>& versions_to_alter,
const AlterTabletType alter_type);
void delete_alter_task();
@ -177,8 +175,8 @@ public:
bool can_do_compaction();
const uint32_t calc_cumulative_compaction_score() const;
const uint32_t calc_base_compaction_score() const;
void compute_version_hash_from_rowsets(const std::vector<RowsetSharedPtr>& rowsets,
VersionHash* version_hash) const;
static void compute_version_hash_from_rowsets(const std::vector<RowsetSharedPtr>& rowsets,
VersionHash* version_hash);
// operation for clone
void calc_missed_versions(int64_t spec_version, vector<Version>* missed_versions);
@ -187,7 +185,7 @@ public:
// This function to find max continous version from the beginning.
// For example: If there are 1, 2, 3, 5, 6, 7 versions belongs tablet, then 3 is target.
OLAPStatus max_continuous_version_from_begining(Version* version, VersionHash* v_hash);
void max_continuous_version_from_begining(Version* version, VersionHash* v_hash);
// operation for query
OLAPStatus split_range(
@ -217,7 +215,7 @@ public:
void delete_all_files();
bool check_path(const std::string& check_path);
bool check_path(const std::string& check_path) const;
bool check_rowset_id(const RowsetId& rowset_id);
OLAPStatus set_partition_id(int64_t partition_id);
@ -228,7 +226,7 @@ public:
std::vector<RowsetSharedPtr>* candidate_rowsets);
void pick_candicate_rowsets_to_base_compaction(std::vector<RowsetSharedPtr>* candidate_rowsets);
OLAPStatus calculate_cumulative_point();
void calculate_cumulative_point();
// TODO(ygl):
inline bool is_primary_replica() { return false; }
@ -237,24 +235,24 @@ public:
// eco mode also means save money in palo
inline bool in_eco_mode() { return false; }
OLAPStatus do_tablet_meta_checkpoint();
void do_tablet_meta_checkpoint();
bool rowset_meta_is_useful(RowsetMetaSharedPtr rowset_meta);
void build_tablet_report_info(TTabletInfo* tablet_info);
void generate_tablet_meta_copy(TabletMetaSharedPtr new_tablet_meta);
void generate_tablet_meta_copy(TabletMetaSharedPtr new_tablet_meta) const;
// return a json string to show the compaction status of this tablet
OLAPStatus get_compaction_status(std::string* json_result);
void get_compaction_status(std::string* json_result);
private:
OLAPStatus _init_once_action();
void _print_missed_versions(const std::vector<Version>& missed_versions) const;
bool _contains_rowset(const RowsetId rowset_id);
OLAPStatus _contains_version(const Version& version);
OLAPStatus _max_continuous_version_from_begining_unlocked(Version* version,
VersionHash* v_hash) const ;
void _max_continuous_version_from_begining_unlocked(Version* version,
VersionHash* v_hash) const ;
void _gen_tablet_path();
RowsetSharedPtr _rowset_with_largest_size();
void _delete_inc_rowset_by_version(const Version& version, const VersionHash& version_hash);
@ -262,6 +260,7 @@ private:
vector<RowsetSharedPtr>* rowsets) const;
private:
static const int64_t kInvalidCumulativePoint = -1;
TabletState _state;
TabletMetaSharedPtr _tablet_meta;
TabletSchema _schema;
@ -281,7 +280,7 @@ private:
// TODO(lingbin): There is a _meta_lock TabletMeta too, there should be a comment to
// explain how these two locks work together.
RWMutex _meta_lock;
mutable RWMutex _meta_lock;
// A new load job will produce a new rowset, which will be inserted into both _rs_version_map
// and _inc_rs_version_map. Only the most recent rowsets are kept in _inc_rs_version_map to
// reduce the amount of data that needs to be copied during the clone task.

View File

@ -170,9 +170,7 @@ OLAPStatus TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id, Schem
OLAPStatus res = OLAP_SUCCESS;
if (update_meta) {
// call tablet save meta in order to valid the meta
RETURN_NOT_OK_LOG(tablet->save_meta(), Substitute(
"failed to save new tablet's meta. tablet_id=$0, schema_hash=$1",
tablet_id, schema_hash));
tablet->save_meta();
}
if (drop_old) {
// If the new tablet is fresher than the existing one, then replace
@ -529,11 +527,7 @@ OLAPStatus TabletManager::_drop_tablet_unlocked(
&& related_alter_task->related_tablet_id() == tablet_id
&& related_alter_task->related_schema_hash() == schema_hash) {
related_tablet->delete_alter_task();
res = related_tablet->save_meta();
if (res != OLAP_SUCCESS) {
LOG(FATAL) << "fail to save tablet header. res=" << res
<< ", tablet=" << related_tablet->full_name();
}
related_tablet->save_meta();
}
related_tablet->release_header_lock();
res = _drop_tablet_directly_unlocked(tablet_id, schema_hash, keep_files);
@ -1340,8 +1334,7 @@ OLAPStatus TabletManager::_drop_tablet_directly_unlocked(
// and the tablet will be loaded at restart time.
// To avoid this exception, we first set the state of the tablet to `SHUTDOWN`.
tablet->set_tablet_state(TABLET_SHUTDOWN);
RETURN_NOT_OK_LOG(tablet->save_meta(), Substitute(
"fail to drop tablet.tablet_id=$0, schema_hash=$1", tablet_id, schema_hash));
tablet->save_meta();
{
WriteLock wlock(&_shutdown_tablets_lock);
_shutdown_tablets.push_back(tablet);