[Optimize] Reduce meaningless memory copies (#5748)
Reduce meaningless memory copies of rowset_meta pb
This commit is contained in:
@ -160,6 +160,7 @@ public:
|
||||
bool delete_flag() const { return rowset_meta()->delete_flag(); }
|
||||
int64_t num_segments() const { return rowset_meta()->num_segments(); }
|
||||
void to_rowset_pb(RowsetMetaPB* rs_meta) { return rowset_meta()->to_rowset_pb(rs_meta); }
|
||||
const RowsetMetaPB& get_rowset_pb() { return rowset_meta()->get_rowset_pb(); }
|
||||
inline KeysType keys_type() { return _schema->keys_type(); }
|
||||
|
||||
// remove all files in this rowset
|
||||
@ -192,8 +193,8 @@ public:
|
||||
return;
|
||||
}
|
||||
VLOG_NOTICE << "rowset is close. rowset state from:" << old_state << " to "
|
||||
<< _rowset_state_machine.rowset_state() << ", version:" << start_version() << "-"
|
||||
<< end_version() << ", tabletid:" << _rowset_meta->tablet_id();
|
||||
<< _rowset_state_machine.rowset_state() << ", version:" << start_version()
|
||||
<< "-" << end_version() << ", tabletid:" << _rowset_meta->tablet_id();
|
||||
}
|
||||
|
||||
// hard link all files in this rowset to `dir` to form a new rowset with id `new_rowset_id`.
|
||||
@ -238,7 +239,8 @@ public:
|
||||
}
|
||||
}
|
||||
if (_rowset_state_machine.rowset_state() == ROWSET_UNLOADED) {
|
||||
VLOG_NOTICE << "close the rowset. rowset state from ROWSET_UNLOADING to ROWSET_UNLOADED"
|
||||
VLOG_NOTICE
|
||||
<< "close the rowset. rowset state from ROWSET_UNLOADING to ROWSET_UNLOADED"
|
||||
<< ", version:" << start_version() << "-" << end_version()
|
||||
<< ", tabletid:" << _rowset_meta->tablet_id();
|
||||
}
|
||||
|
||||
@ -225,6 +225,7 @@ public:
|
||||
void set_num_segments(int64_t num_segments) { _rowset_meta_pb.set_num_segments(num_segments); }
|
||||
|
||||
void to_rowset_pb(RowsetMetaPB* rs_meta_pb) const { *rs_meta_pb = _rowset_meta_pb; }
|
||||
const RowsetMetaPB& get_rowset_pb() { return _rowset_meta_pb; }
|
||||
|
||||
bool is_singleton_delta() const {
|
||||
return has_version() && _rowset_meta_pb.start_version() == _rowset_meta_pb.end_version();
|
||||
|
||||
@ -38,7 +38,7 @@ bool RowsetMetaManager::check_rowset_meta(OlapMeta* meta, TabletUid tablet_uid,
|
||||
const RowsetId& rowset_id) {
|
||||
std::string key = ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string();
|
||||
std::string value;
|
||||
return meta->key_may_exist(META_COLUMN_FAMILY_INDEX, key, &value);;
|
||||
return meta->key_may_exist(META_COLUMN_FAMILY_INDEX, key, &value);
|
||||
}
|
||||
|
||||
OLAPStatus RowsetMetaManager::get_rowset_meta(OlapMeta* meta, TabletUid tablet_uid,
|
||||
@ -146,9 +146,7 @@ OLAPStatus RowsetMetaManager::load_json_rowset_meta(OlapMeta* meta,
|
||||
}
|
||||
RowsetId rowset_id = rowset_meta.rowset_id();
|
||||
TabletUid tablet_uid = rowset_meta.tablet_uid();
|
||||
RowsetMetaPB rowset_meta_pb;
|
||||
rowset_meta.to_rowset_pb(&rowset_meta_pb);
|
||||
OLAPStatus status = save(meta, tablet_uid, rowset_id, rowset_meta_pb);
|
||||
OLAPStatus status = save(meta, tablet_uid, rowset_id, rowset_meta.get_rowset_pb());
|
||||
return status;
|
||||
}
|
||||
|
||||
|
||||
@ -85,7 +85,7 @@ Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir,
|
||||
OLAPStatus Tablet::_init_once_action() {
|
||||
OLAPStatus res = OLAP_SUCCESS;
|
||||
VLOG_NOTICE << "begin to load tablet. tablet=" << full_name()
|
||||
<< ", version_size=" << _tablet_meta->version_count();
|
||||
<< ", version_size=" << _tablet_meta->version_count();
|
||||
|
||||
#ifdef BE_TEST
|
||||
// init cumulative compaction policy by type
|
||||
@ -165,7 +165,7 @@ OLAPStatus Tablet::revise_tablet_meta(const std::vector<RowsetMetaSharedPtr>& ro
|
||||
new_tablet_meta->add_rs_meta(rs_meta);
|
||||
}
|
||||
VLOG_NOTICE << "load rowsets successfully when clone. tablet=" << full_name()
|
||||
<< ", added rowset size=" << rowsets_to_clone.size();
|
||||
<< ", added rowset size=" << rowsets_to_clone.size();
|
||||
// save and reload tablet_meta
|
||||
res = new_tablet_meta->save_meta(_data_dir);
|
||||
if (res != OLAP_SUCCESS) {
|
||||
@ -238,10 +238,9 @@ OLAPStatus Tablet::add_rowset(RowsetSharedPtr rowset, bool need_persist) {
|
||||
modify_rowsets(empty_vec, rowsets_to_delete);
|
||||
|
||||
if (need_persist) {
|
||||
RowsetMetaPB rowset_meta_pb;
|
||||
rowset->rowset_meta()->to_rowset_pb(&rowset_meta_pb);
|
||||
OLAPStatus res = RowsetMetaManager::save(data_dir()->get_meta(), tablet_uid(),
|
||||
rowset->rowset_id(), rowset_meta_pb);
|
||||
OLAPStatus res =
|
||||
RowsetMetaManager::save(data_dir()->get_meta(), tablet_uid(), rowset->rowset_id(),
|
||||
rowset->rowset_meta()->get_rowset_pb());
|
||||
if (res != OLAP_SUCCESS) {
|
||||
LOG(FATAL) << "failed to save rowset to local meta store" << rowset->rowset_id();
|
||||
}
|
||||
@ -272,7 +271,7 @@ void Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
|
||||
if (to_add[i]->version() != to_delete[i]->version()) {
|
||||
same_version = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
same_version = false;
|
||||
@ -311,14 +310,15 @@ void Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
|
||||
// delete rowset in "to_delete" directly
|
||||
for (auto& rs : to_delete) {
|
||||
LOG(INFO) << "add unused rowset " << rs->rowset_id() << " because of same version";
|
||||
StorageEngine::instance()->add_unused_rowset(rs);
|
||||
StorageEngine::instance()->add_unused_rowset(rs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// snapshot manager may call this api to check if version exists, so that
|
||||
// the version maybe not exist
|
||||
const RowsetSharedPtr Tablet::get_rowset_by_version(const Version& version, bool find_in_stale) const {
|
||||
const RowsetSharedPtr Tablet::get_rowset_by_version(const Version& version,
|
||||
bool find_in_stale) const {
|
||||
auto iter = _rs_version_map.find(version);
|
||||
if (iter == _rs_version_map.end()) {
|
||||
if (find_in_stale) {
|
||||
@ -640,7 +640,7 @@ OLAPStatus Tablet::capture_rs_readers(const std::vector<Version>& version_path,
|
||||
auto it = _rs_version_map.find(version);
|
||||
if (it == _rs_version_map.end()) {
|
||||
VLOG_NOTICE << "fail to find Rowset in rs_version for version. tablet=" << full_name()
|
||||
<< ", version='" << version.first << "-" << version.second;
|
||||
<< ", version='" << version.first << "-" << version.second;
|
||||
|
||||
it = _stale_rs_version_map.find(version);
|
||||
if (it == _rs_version_map.end()) {
|
||||
@ -722,8 +722,9 @@ bool Tablet::can_do_compaction() {
|
||||
return true;
|
||||
}
|
||||
|
||||
uint32_t Tablet::calc_compaction_score(CompactionType compaction_type,
|
||||
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy) {
|
||||
uint32_t Tablet::calc_compaction_score(
|
||||
CompactionType compaction_type,
|
||||
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy) {
|
||||
// Need meta lock, because it will iterator "all_rs_metas" of tablet meta.
|
||||
ReadLock rdlock(&_meta_lock);
|
||||
if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
|
||||
@ -737,8 +738,8 @@ uint32_t Tablet::calc_compaction_score(CompactionType compaction_type,
|
||||
const uint32_t Tablet::_calc_cumulative_compaction_score(
|
||||
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy) {
|
||||
#ifndef BE_TEST
|
||||
if (_cumulative_compaction_policy == nullptr || _cumulative_compaction_policy->name() !=
|
||||
cumulative_compaction_policy->name()) {
|
||||
if (_cumulative_compaction_policy == nullptr ||
|
||||
_cumulative_compaction_policy->name() != cumulative_compaction_policy->name()) {
|
||||
_cumulative_compaction_policy = cumulative_compaction_policy;
|
||||
}
|
||||
#endif
|
||||
@ -1125,12 +1126,13 @@ void Tablet::get_compaction_status(std::string* json_result) {
|
||||
for (int i = 0; i < stale_rowsets.size(); ++i) {
|
||||
const Version& ver = stale_rowsets[i]->version();
|
||||
rapidjson::Value value;
|
||||
std::string disk_size =
|
||||
PrettyPrinter::print(stale_rowsets[i]->rowset_meta()->total_disk_size(), TUnit::BYTES);
|
||||
std::string disk_size = PrettyPrinter::print(
|
||||
stale_rowsets[i]->rowset_meta()->total_disk_size(), TUnit::BYTES);
|
||||
std::string version_str = strings::Substitute(
|
||||
"[$0-$1] $2 $3 $4", ver.first, ver.second, stale_rowsets[i]->num_segments(),
|
||||
stale_rowsets[i]->rowset_id().to_string(), disk_size);
|
||||
value.SetString(version_str.c_str(), version_str.length(), stale_versions_arr.GetAllocator());
|
||||
value.SetString(version_str.c_str(), version_str.length(),
|
||||
stale_versions_arr.GetAllocator());
|
||||
stale_versions_arr.PushBack(value, stale_versions_arr.GetAllocator());
|
||||
}
|
||||
root.AddMember("stale_rowsets", stale_versions_arr, root.GetAllocator());
|
||||
@ -1414,4 +1416,4 @@ void Tablet::reset_compaction(CompactionType compaction_type) {
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
} // namespace doris
|
||||
|
||||
@ -25,8 +25,8 @@
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include <boost/algorithm/string/classification.hpp>
|
||||
#include <boost/algorithm/string/split.hpp>
|
||||
#include <filesystem>
|
||||
#include <cstdio>
|
||||
#include <filesystem>
|
||||
#include <new>
|
||||
#include <queue>
|
||||
#include <random>
|
||||
@ -162,8 +162,8 @@ OLAPStatus TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId tra
|
||||
_insert_txn_partition_map_unlocked(transaction_id, partition_id);
|
||||
|
||||
VLOG_NOTICE << "add transaction to engine successfully."
|
||||
<< "partition_id: " << key.first << ", transaction_id: " << key.second
|
||||
<< ", tablet: " << tablet_info.to_string();
|
||||
<< "partition_id: " << key.first << ", transaction_id: " << key.second
|
||||
<< ", tablet: " << tablet_info.to_string();
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
@ -229,10 +229,9 @@ OLAPStatus TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
|
||||
// save meta need access disk, it maybe very slow, so that it is not in global txn lock
|
||||
// it is under a single txn lock
|
||||
if (!is_recovery) {
|
||||
RowsetMetaPB rowset_meta_pb;
|
||||
rowset_ptr->rowset_meta()->to_rowset_pb(&rowset_meta_pb);
|
||||
OLAPStatus save_status =
|
||||
RowsetMetaManager::save(meta, tablet_uid, rowset_ptr->rowset_id(), rowset_meta_pb);
|
||||
RowsetMetaManager::save(meta, tablet_uid, rowset_ptr->rowset_id(),
|
||||
rowset_ptr->rowset_meta()->get_rowset_pb());
|
||||
if (save_status != OLAP_SUCCESS) {
|
||||
LOG(WARNING) << "save committed rowset failed. when commit txn rowset_id:"
|
||||
<< rowset_ptr->rowset_id() << "tablet id: " << tablet_id
|
||||
@ -285,10 +284,9 @@ OLAPStatus TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
|
||||
// TODO(ygl): rowset is already set version here, memory is changed, if save failed
|
||||
// it maybe a fatal error
|
||||
rowset_ptr->make_visible(version, version_hash);
|
||||
RowsetMetaPB rowset_meta_pb;
|
||||
rowset_ptr->rowset_meta()->to_rowset_pb(&rowset_meta_pb);
|
||||
OLAPStatus save_status =
|
||||
RowsetMetaManager::save(meta, tablet_uid, rowset_ptr->rowset_id(), rowset_meta_pb);
|
||||
RowsetMetaManager::save(meta, tablet_uid, rowset_ptr->rowset_id(),
|
||||
rowset_ptr->rowset_meta()->get_rowset_pb());
|
||||
if (save_status != OLAP_SUCCESS) {
|
||||
LOG(WARNING) << "save committed rowset failed. when publish txn rowset_id:"
|
||||
<< rowset_ptr->rowset_id() << ", tablet id: " << tablet_id
|
||||
@ -388,10 +386,11 @@ OLAPStatus TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id,
|
||||
StorageEngine::instance()->add_unused_rowset(load_info.rowset);
|
||||
#endif
|
||||
VLOG_NOTICE << "delete transaction from engine successfully."
|
||||
<< " partition_id: " << key.first << ", transaction_id: " << key.second
|
||||
<< ", tablet: " << tablet_info.to_string() << ", rowset: "
|
||||
<< (load_info.rowset != nullptr ? load_info.rowset->rowset_id().to_string()
|
||||
: "0");
|
||||
<< " partition_id: " << key.first << ", transaction_id: " << key.second
|
||||
<< ", tablet: " << tablet_info.to_string() << ", rowset: "
|
||||
<< (load_info.rowset != nullptr
|
||||
? load_info.rowset->rowset_id().to_string()
|
||||
: "0");
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -420,9 +419,9 @@ void TxnManager::get_tablet_related_txns(TTabletId tablet_id, SchemaHash schema_
|
||||
*partition_id = it.first.first;
|
||||
transaction_ids->insert(it.first.second);
|
||||
VLOG_NOTICE << "find transaction on tablet."
|
||||
<< "partition_id: " << it.first.first
|
||||
<< ", transaction_id: " << it.first.second
|
||||
<< ", tablet: " << tablet_info.to_string();
|
||||
<< "partition_id: " << it.first.first
|
||||
<< ", transaction_id: " << it.first.second
|
||||
<< ", tablet: " << tablet_info.to_string();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -475,7 +474,7 @@ void TxnManager::get_txn_related_tablets(const TTransactionId transaction_id,
|
||||
auto it = txn_tablet_map.find(key);
|
||||
if (it == txn_tablet_map.end()) {
|
||||
VLOG_NOTICE << "could not find tablet for"
|
||||
<< " partition_id=" << partition_id << ", transaction_id=" << transaction_id;
|
||||
<< " partition_id=" << partition_id << ", transaction_id=" << transaction_id;
|
||||
return;
|
||||
}
|
||||
std::map<TabletInfo, TabletTxnInfo>& load_info_map = it->second;
|
||||
@ -525,8 +524,8 @@ void TxnManager::build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>
|
||||
(*expire_txn_map)[t_map.first].push_back(txn_id);
|
||||
if (VLOG_IS_ON(3)) {
|
||||
VLOG_NOTICE << "find expired txn."
|
||||
<< " tablet=" << t_map.first.to_string()
|
||||
<< " transaction_id=" << txn_id << " exist_sec=" << diff;
|
||||
<< " tablet=" << t_map.first.to_string()
|
||||
<< " transaction_id=" << txn_id << " exist_sec=" << diff;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user