Add tablet meta checkpoint mechanism (#1936)

This commit is contained in:
yiguolei
2019-10-10 09:39:02 +08:00
committed by lichaoyong
parent d46fc59cc3
commit b72a4a4bc6
13 changed files with 256 additions and 26 deletions

View File

@ -467,6 +467,10 @@ namespace config {
CONF_Int64(storage_flood_stage_left_capacity_bytes, "1073741824") // 1GB
// number of thread for flushing memtable per store
CONF_Int32(flush_thread_num_per_store, "2");
// config for tablet meta checkpoint
CONF_Int32(tablet_meta_checkpoint_min_new_rowsets_num, "10");
CONF_Int32(tablet_meta_checkpoint_min_interval_secs, "600");
} // namespace config
} // namespace doris

View File

@ -825,24 +825,13 @@ OLAPStatus DataDir::load() {
}
} else if (rowset_meta->rowset_state() == RowsetStatePB::VISIBLE
&& rowset_meta->tablet_uid() == tablet->tablet_uid()) {
// add visible rowset to tablet, it maybe use in the future
// there should be only preparing rowset in meta env because visible
// rowset is persist with tablet meta currently
OLAPStatus publish_status = tablet->add_inc_rowset(rowset);
OLAPStatus publish_status = tablet->add_rowset(rowset, false);
if (publish_status != OLAP_SUCCESS && publish_status != OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) {
LOG(WARNING) << "add visilbe rowset to tablet failed rowset_id:" << rowset->rowset_id()
<< " tablet id: " << rowset_meta->tablet_id()
<< " txn id:" << rowset_meta->txn_id()
<< " start_version: " << rowset_meta->version().first
<< " end_version: " << rowset_meta->version().second;
} else {
// it is added into tablet meta, then remove it from meta
RowsetMetaManager::remove(tablet->data_dir()->get_meta(), rowset_meta->tablet_uid(), rowset->rowset_id());
LOG(INFO) << "successfully to add visible rowset: " << rowset_meta->rowset_id()
<< " to tablet: " << rowset_meta->tablet_id()
<< " txn id:" << rowset_meta->txn_id()
<< " start_version: " << rowset_meta->version().first
<< " end_version: " << rowset_meta->version().second;
LOG(WARNING) << "add visible rowset to tablet failed rowset_id:" << rowset->rowset_id()
<< " tablet id: " << rowset_meta->tablet_id()
<< " txn id:" << rowset_meta->txn_id()
<< " start_version: " << rowset_meta->version().first
<< " end_version: " << rowset_meta->version().second;
}
} else {
LOG(WARNING) << "find invalid rowset: " << rowset_meta->rowset_id()

View File

@ -90,6 +90,16 @@ OLAPStatus StorageEngine::_start_bg_worker() {
thread.detach();
}
for (auto data_dir : data_dirs) {
_tablet_checkpoint_threads.emplace_back(
[this, data_dir] {
_tablet_checkpoint_callback((void*)data_dir);
});
}
for (auto& thread : _tablet_checkpoint_threads) {
thread.detach();
}
_fd_cache_clean_thread = std::thread(
[this] {
_fd_cache_clean_callback(nullptr);

View File

@ -118,6 +118,10 @@ public:
_need_delete_file = true;
}
bool contains_version(Version version) {
return rowset_meta()->version().first <= version.first && rowset_meta()->version().second >= version.second;
}
static bool comparator(const RowsetSharedPtr& left, const RowsetSharedPtr& right) {
return left->end_version() < right->end_version();
}

View File

@ -293,6 +293,10 @@ public:
*rs_meta_pb = _rowset_meta_pb;
}
bool is_singleton_delta() {
return has_version() && _rowset_meta_pb.start_version() == _rowset_meta_pb.end_version();
}
private:
friend class AlphaRowsetMeta;
bool _deserialize_from_pb(const std::string& value) {

View File

@ -1944,7 +1944,7 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
sc_params.new_tablet->release_push_lock();
goto PROCESS_ALTER_EXIT;
}
res = sc_params.new_tablet->add_rowset(new_rowset);
res = sc_params.new_tablet->add_rowset(new_rowset, false);
if (res == OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) {
LOG(WARNING) << "version already exist, version revert occured. "
<< "tablet=" << sc_params.new_tablet->full_name()
@ -1973,6 +1973,11 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
}
// XXX: 此时应该不取消SchemaChange状态,因为新Delta还要转换成新旧Schema的版本
PROCESS_ALTER_EXIT:
{
// save tablet meta here because rowset meta is not saved during add rowset
WriteLock new_wlock(sc_params.new_tablet->get_header_lock_ptr());
res = sc_params.new_tablet->save_meta();
}
if (res == OLAP_SUCCESS) {
Version test_version(0, end_version);
res = sc_params.new_tablet->check_version_integrity(test_version);

View File

@ -648,9 +648,52 @@ OLAPStatus StorageEngine::start_trash_sweep(double* usage) {
// clean rubbish transactions
_clean_unused_txns();
// clean unused rowset metas in OlapMeta
_clean_unused_rowset_metas();
return res;
}
void StorageEngine::_clean_unused_rowset_metas() {
std::vector<RowsetMetaSharedPtr> invalid_rowset_metas;
auto clean_rowset_func = [this, &invalid_rowset_metas](TabletUid tablet_uid, RowsetId rowset_id,
const std::string& meta_str) -> bool {
RowsetMetaSharedPtr rowset_meta(new AlphaRowsetMeta());
bool parsed = rowset_meta->init(meta_str);
if (!parsed) {
LOG(WARNING) << "parse rowset meta string failed for rowset_id:" << rowset_id;
// return false will break meta iterator, return true to skip this error
return true;
}
if (rowset_meta->tablet_uid() != tablet_uid) {
LOG(WARNING) << "tablet uid is not equal, skip the rowset"
<< ", rowset_id=" << rowset_meta->rowset_id()
<< ", in_put_tablet_uid=" << tablet_uid
<< ", tablet_uid in rowset meta=" << rowset_meta->tablet_uid();
return true;
}
TabletSharedPtr tablet = _tablet_manager->get_tablet(rowset_meta->tablet_id(), rowset_meta->tablet_schema_hash(), tablet_uid);
if (tablet == nullptr) {
return true;
}
if (rowset_meta->rowset_state() == RowsetStatePB::VISIBLE && (!tablet->rowset_meta_is_useful(rowset_meta))) {
LOG(INFO) << "rowset meta is useless any more, remote it. rowset_id=" << rowset_meta->rowset_id();
invalid_rowset_metas.push_back(rowset_meta);
}
return true;
};
auto data_dirs = get_stores();
for (auto data_dir : data_dirs) {
RowsetMetaManager::traverse_rowset_metas(data_dir->get_meta(), clean_rowset_func);
for (auto& rowset_meta : invalid_rowset_metas) {
RowsetMetaManager::remove(data_dir->get_meta(), rowset_meta->tablet_uid(), rowset_meta->rowset_id());
}
invalid_rowset_metas.clear();
}
}
void StorageEngine::_clean_unused_txns() {
std::set<TabletInfo> tablet_infos;
_txn_manager->get_all_related_tablets(&tablet_infos);
@ -977,4 +1020,24 @@ void* StorageEngine::_path_scan_thread_callback(void* arg) {
return nullptr;
}
void* StorageEngine::_tablet_checkpoint_callback(void* arg) {
#ifdef GOOGLE_PROFILER
ProfilerRegisterThread();
#endif
LOG(INFO) << "try to start tablet meta checkpoint thread!";
while (true) {
LOG(INFO) << "begin to do tablet meta checkpoint";
int64_t start_time = UnixMillis();
_tablet_manager->do_tablet_meta_checkpoint((DataDir*)arg);
int64_t used_time = (UnixMillis() - start_time) / 1000;
if (used_time < config::tablet_meta_checkpoint_min_interval_secs) {
sleep(config::tablet_meta_checkpoint_min_interval_secs - used_time);
} else {
sleep(1);
}
}
return nullptr;
}
} // namespace doris

View File

@ -227,6 +227,8 @@ private:
OLAPStatus _start_bg_worker();
void _clean_unused_txns();
void _clean_unused_rowset_metas();
OLAPStatus _do_sweep(
const std::string& scan_root, const time_t& local_tm_now, const int32_t expire);
@ -255,6 +257,8 @@ private:
void* _path_scan_thread_callback(void* arg);
void* _tablet_checkpoint_callback(void* arg);
private:
struct CompactionCandidate {
@ -333,6 +337,9 @@ private:
// thread to scan disk paths
std::vector<std::thread> _path_scan_threads;
// thread to run tablet checkpoint
std::vector<std::thread> _tablet_checkpoint_threads;
static atomic_t _s_request_number;
// for tablet and disk report

View File

@ -240,13 +240,48 @@ OLAPStatus Tablet::deregister_tablet_from_dir() {
return _data_dir->deregister_tablet(this);
}
OLAPStatus Tablet::add_rowset(RowsetSharedPtr rowset) {
OLAPStatus Tablet::add_rowset(RowsetSharedPtr rowset, bool need_persist) {
WriteLock wrlock(&_meta_lock);
// if the rowset already exist, should not return version already exist
// should return OLAP_SUCCESS
if (contains_rowset(rowset->rowset_id())) {
return OLAP_SUCCESS;
}
RETURN_NOT_OK(_check_added_rowset(rowset));
RETURN_NOT_OK(_tablet_meta->add_rs_meta(rowset->rowset_meta()));
_rs_version_map[rowset->version()] = rowset;
RETURN_NOT_OK(_rs_graph.add_version_to_graph(rowset->version()));
RETURN_NOT_OK(save_meta());
vector<RowsetSharedPtr> rowsets_to_delete;
// yiguolei: temp code, should remove the rowset contains by this rowset
// but it should be removed in multi path version
for (auto& it : _rs_version_map) {
if ((it.first.first >= rowset->start_version() && it.first.second < rowset->end_version())
|| (it.first.first > rowset->start_version() && it.first.second <= rowset->end_version())) {
if (it.second == nullptr) {
LOG(FATAL) << "there exist a version "
<< " start_version=" << it.first.first
<< " end_version=" << it.first.second
<< " contains the input rs with version "
<< " start_version=" << rowset->start_version()
<< " end_version=" << rowset->end_version()
<< " but the related rs is null";
return OLAP_ERR_PUSH_ROWSET_NOT_FOUND;
} else {
rowsets_to_delete.push_back(it.second);
}
}
}
modify_rowsets(std::vector<RowsetSharedPtr>(), rowsets_to_delete);
if (need_persist) {
OLAPStatus res = RowsetMetaManager::save(data_dir()->get_meta(), tablet_uid(),
rowset->rowset_id(), rowset->rowset_meta().get());
if (res != OLAP_SUCCESS) {
LOG(FATAL) << "failed to save rowset to local meta store" << rowset->rowset_id();
}
}
++_newly_created_rowset_num;
return OLAP_SUCCESS;
}
@ -269,7 +304,8 @@ OLAPStatus Tablet::modify_rowsets(const vector<RowsetSharedPtr>& to_add,
}
for (auto& rs : to_add) {
_rs_version_map[rs->version()] = rs;;
_rs_version_map[rs->version()] = rs;
++_newly_created_rowset_num;
}
_rs_graph.reconstruct_rowset_graph(_tablet_meta->all_rs_metas());
@ -336,8 +372,12 @@ RowsetSharedPtr Tablet::rowset_with_largest_size() {
return largest_rowset;
}
// add inc rowset should not persist tablet meta, because the
OLAPStatus Tablet::add_inc_rowset(const RowsetSharedPtr& rowset) {
WriteLock wrlock(&_meta_lock);
if (contains_rowset(rowset->rowset_id())) {
return OLAP_SUCCESS;
}
// check if the rowset id is valid
RETURN_NOT_OK(_check_added_rowset(rowset));
RETURN_NOT_OK(_tablet_meta->add_rs_meta(rowset->rowset_meta()));
@ -345,7 +385,7 @@ OLAPStatus Tablet::add_inc_rowset(const RowsetSharedPtr& rowset) {
_inc_rs_version_map[rowset->version()] = rowset;
RETURN_NOT_OK(_rs_graph.add_version_to_graph(rowset->version()));
RETURN_NOT_OK(_tablet_meta->add_inc_rs_meta(rowset->rowset_meta()));
RETURN_NOT_OK(_tablet_meta->save_meta(_data_dir));
++_newly_created_rowset_num;
return OLAP_SUCCESS;
}
@ -974,4 +1014,72 @@ void Tablet::pick_candicate_rowsets_to_base_compaction(std::vector<RowsetSharedP
}
}
OLAPStatus Tablet::do_tablet_meta_checkpoint() {
WriteLock store_lock(&_meta_store_lock);
if (_newly_created_rowset_num == 0) {
return OLAP_SUCCESS;
}
if (UnixMillis() - _last_checkpoint_time < config::tablet_meta_checkpoint_min_interval_secs * 1000
&& _newly_created_rowset_num < config::tablet_meta_checkpoint_min_new_rowsets_num) {
return OLAP_SUCCESS;
}
// hold read lock not write lock, because it will not modify meta structure
ReadLock rdlock(&_meta_lock);
LOG(INFO) << "start to do tablet meta checkpoint, tablet=" << full_name();
RETURN_NOT_OK(save_meta());
// if save meta successfully, then should remove the rowset meta existing in tablet
// meta from rowset meta store
for (auto& rs_meta : _tablet_meta->all_rs_metas()) {
if (RowsetMetaManager::check_rowset_meta(_data_dir->get_meta(), tablet_uid(), rs_meta->rowset_id())) {
RowsetMetaManager::remove(_data_dir->get_meta(), tablet_uid(), rs_meta->rowset_id());
LOG(INFO) << "remove rowset id from meta store because it is already persistent with tablet meta"
<< ", rowset_id=" << rs_meta->rowset_id();
}
}
_newly_created_rowset_num = 0;
_last_checkpoint_time = UnixMillis();
return OLAP_SUCCESS;
}
bool Tablet::rowset_meta_is_useful(RowsetMetaSharedPtr rowset_meta) {
ReadLock rdlock(&_meta_lock);
bool find_rowset_id = false;
bool find_version = false;
for (auto& version_rowset : _rs_version_map) {
if (version_rowset.second->rowset_id() == rowset_meta->rowset_id()) {
find_rowset_id = true;
}
if (version_rowset.second->contains_version(rowset_meta->version())) {
find_version = true;
}
}
for (auto& inc_version_rowset : _inc_rs_version_map) {
if (inc_version_rowset.second->rowset_id() == rowset_meta->rowset_id()) {
find_rowset_id = true;
}
if (inc_version_rowset.second->contains_version(rowset_meta->version())) {
find_version = true;
}
}
if (find_rowset_id || !find_version) {
return true;
} else {
return false;
}
}
bool Tablet::contains_rowset(const RowsetId rowset_id) {
for (auto& version_rowset : _rs_version_map) {
if (version_rowset.second->rowset_id() == rowset_id) {
return true;
}
}
for (auto& inc_version_rowset : _inc_rs_version_map) {
if (inc_version_rowset.second->rowset_id() == rowset_id) {
return true;
}
}
return false;
}
} // namespace doris

View File

@ -107,7 +107,7 @@ public:
inline size_t field_index(const string& field_name) const;
// operation in rowsets
OLAPStatus add_rowset(RowsetSharedPtr rowset);
OLAPStatus add_rowset(RowsetSharedPtr rowset, bool need_persist = true);
OLAPStatus modify_rowsets(const vector<RowsetSharedPtr>& to_add,
const vector<RowsetSharedPtr>& to_delete);
const RowsetSharedPtr get_rowset_by_version(const Version& version) const;
@ -233,6 +233,12 @@ public:
// eco mode also means save money in palo
inline bool in_eco_mode() { return false; }
OLAPStatus do_tablet_meta_checkpoint();
bool rowset_meta_is_useful(RowsetMetaSharedPtr rowset_meta);
bool contains_rowset(const RowsetId rowset_id);
private:
OLAPStatus _init_once_action();
void _print_missed_versions(const std::vector<Version>& missed_versions) const;
@ -249,6 +255,9 @@ private:
DorisInitOnce _init_once;
RWMutex _meta_lock;
// meta store lock is used for prevent 2 threads do checkpoint concurrently
// it will be used in econ-mode in the future
RWMutex _meta_store_lock;
Mutex _ingest_lock;
Mutex _base_lock;
Mutex _cumulative_lock;
@ -260,6 +269,8 @@ private:
std::atomic<int64_t> _last_compaction_failure_time; // timestamp of last compaction failure
std::atomic<int64_t> _cumulative_point;
std::atomic<int32_t> _newly_created_rowset_num;
std::atomic<int64_t> _last_checkpoint_time;
DISALLOW_COPY_AND_ASSIGN(Tablet);
};

View File

@ -1164,6 +1164,31 @@ void TabletManager::get_partition_related_tablets(int64_t partition_id, std::set
}
}
void TabletManager::do_tablet_meta_checkpoint(DataDir* data_dir) {
vector<TabletSharedPtr> related_tablets;
{
ReadLock tablet_map_rdlock(&_tablet_map_lock);
for (tablet_map_t::value_type& table_ins : _tablet_map){
for (TabletSharedPtr& table_ptr : table_ins.second.table_arr) {
// if tablet is not ready, it maybe a new tablet under schema change, not do compaction
if (table_ptr->tablet_state() != TABLET_RUNNING) {
continue;
}
if (table_ptr->data_dir()->path_hash() != data_dir->path_hash()
|| !table_ptr->is_used() || !table_ptr->init_succeeded()) {
continue;
}
related_tablets.push_back(table_ptr);
}
}
}
for (TabletSharedPtr tablet : related_tablets) {
tablet->do_tablet_meta_checkpoint();
}
return;
}
void TabletManager::_build_tablet_info(TabletSharedPtr tablet, TTabletInfo* tablet_info) {
tablet_info->tablet_id = tablet->tablet_id();
tablet_info->schema_hash = tablet->schema_hash();
@ -1259,7 +1284,7 @@ OLAPStatus TabletManager::_create_inital_rowset(
}
new_rowset = builder->build();
res = tablet->add_rowset(new_rowset);
res = tablet->add_rowset(new_rowset, false);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to add rowset for tablet " << tablet->full_name();
break;

View File

@ -135,6 +135,8 @@ public:
void get_partition_related_tablets(int64_t partition_id, std::set<TabletInfo>* tablet_infos);
void do_tablet_meta_checkpoint(DataDir* data_dir);
private:
// Add a tablet pointer to StorageEngine
// If force, drop the existing tablet add this new one

View File

@ -118,8 +118,6 @@ OLAPStatus EnginePublishVersionTask::finish() {
LOG(INFO) << "publish version successfully on tablet. tablet=" << tablet->full_name()
<< ", transaction_id=" << transaction_id << ", version=" << version.first
<< ", res=" << publish_status;
// delete rowset from meta env, because add inc rowset alreay saved the rowset meta to tablet meta
RowsetMetaManager::remove(tablet->data_dir()->get_meta(), tablet->tablet_uid(), rowset->rowset_id());
}
// check if the related tablet remained all have the version