split calc data_size from write lock
This commit is contained in:
@ -73,6 +73,12 @@ int ObAllVirtualServerCompactionProgress::fill_cells()
|
||||
int64_t compression_ratio = 0;
|
||||
int64_t estimate_finish_time = 0;
|
||||
compaction::ObServerCompactionEvent tmp_event;
|
||||
if (!progress_.is_inited_) {
|
||||
progress_.total_tablet_cnt_ = -1;
|
||||
progress_.unfinished_tablet_cnt_ = -1;
|
||||
progress_.data_size_ = -1;
|
||||
progress_.unfinished_data_size_ = -1;
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < col_count; ++i) {
|
||||
uint64_t col_id = output_column_ids_.at(i);
|
||||
switch (col_id) {
|
||||
@ -147,8 +153,8 @@ int ObAllVirtualServerCompactionProgress::fill_cells()
|
||||
cells[i].set_timestamp(progress_.estimated_finish_time_);
|
||||
break;
|
||||
case COMMENTS:
|
||||
cells[i].set_varchar("");
|
||||
cells[i].set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset()));
|
||||
MEMSET(event_buf_, '\0', sizeof(event_buf_));
|
||||
tmp_event.reset();
|
||||
if (share::ObIDag::DAG_STATUS_FINISH != progress_.status_) {
|
||||
MTL_SWITCH(progress_.tenant_id_) {
|
||||
MTL(compaction::ObServerCompactionEventHistory *)->get_last_event(tmp_event);
|
||||
@ -158,13 +164,15 @@ int ObAllVirtualServerCompactionProgress::fill_cells()
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
cells[i].set_varchar(event_buf_);
|
||||
}
|
||||
} else {
|
||||
progress_.sum_time_guard_.to_string(event_buf_, sizeof(event_buf_));
|
||||
cells[i].set_varchar(event_buf_);
|
||||
}
|
||||
{
|
||||
int64_t pos = strlen(event_buf_);
|
||||
databuff_printf(event_buf_, sizeof(event_buf_), pos, "is_inited:%d", progress_.is_inited_);
|
||||
}
|
||||
cells[i].set_varchar(event_buf_);
|
||||
cells[i].set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset()));
|
||||
break;
|
||||
default:
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
|
||||
@ -67,6 +67,7 @@ bool ObTenantCompactionProgress::is_valid() const
|
||||
|
||||
ObTenantCompactionProgress & ObTenantCompactionProgress::operator=(const ObTenantCompactionProgress &other)
|
||||
{
|
||||
is_inited_ = other.is_inited_;
|
||||
tenant_id_ = other.tenant_id_;
|
||||
merge_type_ = other.merge_type_;
|
||||
merge_version_ = other.merge_version_;
|
||||
@ -151,7 +152,6 @@ void ObTenantCompactionProgressMgr::destroy()
|
||||
|
||||
int ObTenantCompactionProgressMgr::loop_major_sstable_(
|
||||
const int64_t merge_snapshot_version,
|
||||
const bool equal_flag,
|
||||
int64_t &cnt,
|
||||
int64_t &size)
|
||||
{
|
||||
@ -202,8 +202,11 @@ int ObTenantCompactionProgressMgr::loop_major_sstable_(
|
||||
} else if (OB_ISNULL(sstable = static_cast<ObSSTable *>(
|
||||
table_store_wrapper.get_member()->get_major_sstables().get_boundary_table(true/*last*/)))) {
|
||||
// do nothing
|
||||
} else if ((equal_flag && sstable->get_snapshot_version() == merge_snapshot_version)
|
||||
|| (!equal_flag && sstable->get_snapshot_version() < merge_snapshot_version)) {
|
||||
} else if (sstable->get_snapshot_version() <= merge_snapshot_version) {
|
||||
// ATTENTION:
|
||||
// 1. it is hard to distinguish whether this major was generated by this compaction or whether it existed before the compaction,
|
||||
// so maybe some more tablets will be calculated.
|
||||
// 2. for the major sstable generated by this compaction, the size will be calculated larger than the old major sstable.
|
||||
++cnt;
|
||||
size += sstable->get_total_macro_block_count() * DEFAULT_MACRO_BLOCK_SIZE;
|
||||
}
|
||||
@ -216,33 +219,6 @@ int ObTenantCompactionProgressMgr::loop_major_sstable_(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantCompactionProgressMgr::init_progress_(ObTenantCompactionProgress &progress)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t occupy_size = 0;
|
||||
|
||||
if (OB_FAIL(loop_major_sstable_(
|
||||
progress.merge_version_,
|
||||
false/*equal_flag*/,
|
||||
progress.total_tablet_cnt_,
|
||||
occupy_size))) {
|
||||
LOG_WARN("failed to get sstable info", K(ret));
|
||||
} else {
|
||||
progress.tenant_id_ = MTL_ID();
|
||||
progress.data_size_ = occupy_size;
|
||||
progress.merge_type_ = MAJOR_MERGE;
|
||||
progress.unfinished_tablet_cnt_ = progress.total_tablet_cnt_;
|
||||
progress.start_time_ = ObTimeUtility::fast_current_time();
|
||||
progress.unfinished_data_size_ = occupy_size;
|
||||
progress.status_ = share::ObIDag::DAG_STATUS_INITING;
|
||||
|
||||
progress.estimated_finish_time_ = ObTimeUtility::fast_current_time()
|
||||
+ progress.data_size_ * ObCompactionProgress::MERGE_SPEED + ObCompactionProgress::EXTRA_TIME;
|
||||
LOG_DEBUG("success to gene_compaction_info", K(progress));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantCompactionProgressMgr::add_progress(const int64_t major_snapshot_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -256,12 +232,17 @@ int ObTenantCompactionProgressMgr::add_progress(const int64_t major_snapshot_ver
|
||||
LOG_WARN("pos is invalid", K(ret), K(pos), K(major_snapshot_version));
|
||||
} else {
|
||||
ret = OB_SUCCESS; // clear OB_ENTRY_NOT_EXIST
|
||||
(void)finish_progress_(array_[ObInfoRingArray::get_last_pos()]);
|
||||
// force to finish previous progress
|
||||
for (int64_t i = 0; i < size(); ++i) {
|
||||
(void)finish_progress_(array_[i]);
|
||||
}
|
||||
ObTenantCompactionProgress progress;
|
||||
progress.merge_version_ = major_snapshot_version;
|
||||
if (OB_FAIL(init_progress_(progress))) {
|
||||
LOG_WARN("failed to init progress", K(ret), K(major_snapshot_version));
|
||||
} else if (OB_FAIL(ObInfoRingArray::add_no_lock(progress))) {
|
||||
progress.tenant_id_ = MTL_ID();
|
||||
progress.merge_type_ = MAJOR_MERGE;
|
||||
progress.start_time_ = ObTimeUtility::fast_current_time();
|
||||
progress.status_ = share::ObIDag::DAG_STATUS_INITING;
|
||||
if (OB_FAIL(ObInfoRingArray::add_no_lock(progress))) {
|
||||
LOG_WARN("failed to add progress", K(ret));
|
||||
} else {
|
||||
LOG_INFO("add_progress", K(ret), K(major_snapshot_version), K(progress), K(size()));
|
||||
@ -271,6 +252,39 @@ int ObTenantCompactionProgressMgr::add_progress(const int64_t major_snapshot_ver
|
||||
return ret;
|
||||
}
|
||||
|
||||
// init data size
|
||||
int ObTenantCompactionProgressMgr::init_progress(const int64_t major_snapshot_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t total_tablet_cnt = 0;
|
||||
int64_t occupy_size = 0;
|
||||
if (OB_UNLIKELY(major_snapshot_version <= 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(major_snapshot_version));
|
||||
} else if (OB_FAIL(loop_major_sstable_(
|
||||
major_snapshot_version,
|
||||
total_tablet_cnt,
|
||||
occupy_size))) {
|
||||
LOG_WARN("failed to get sstable info", K(ret));
|
||||
} else {
|
||||
int64_t pos = -1;
|
||||
SpinWLockGuard guard(lock_);
|
||||
if (OB_FAIL(get_pos_(major_snapshot_version, pos))) {
|
||||
LOG_WARN("pos is invalid", K(ret), K(pos), K(major_snapshot_version));
|
||||
} else if (share::ObIDag::DAG_STATUS_FINISH != array_[pos].status_) { // before init, major probably already finished
|
||||
array_[pos].is_inited_ = true;
|
||||
array_[pos].total_tablet_cnt_ = total_tablet_cnt;
|
||||
array_[pos].data_size_ += occupy_size;
|
||||
array_[pos].unfinished_tablet_cnt_ += array_[pos].total_tablet_cnt_;
|
||||
array_[pos].unfinished_data_size_ += array_[pos].data_size_;
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
LOG_INFO("success to init progress", K(ret), K(major_snapshot_version), K(array_[pos]));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantCompactionProgressMgr::finish_progress_(ObTenantCompactionProgress &progress)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -360,7 +374,7 @@ int ObTenantCompactionProgressMgr::update_progress(
|
||||
LOG_WARN("pos is invalid", K(ret), K(pos), K(major_snapshot_version));
|
||||
} else if (share::ObIDag::DAG_STATUS_FINISH != array_[pos].status_) {
|
||||
if (finish_flag && !co_merge) {
|
||||
if (OB_UNLIKELY(0 == array_[pos].unfinished_tablet_cnt_)) {
|
||||
if (array_[pos].is_inited_ && OB_UNLIKELY(0 == array_[pos].unfinished_tablet_cnt_)) {
|
||||
if (REACH_TIME_INTERVAL(1000 * 1000)) {
|
||||
LOG_WARN("unfinished partition count is invalid", K(ret), K(array_[pos].unfinished_tablet_cnt_));
|
||||
}
|
||||
@ -373,30 +387,33 @@ int ObTenantCompactionProgressMgr::update_progress(
|
||||
array_[pos].unfinished_data_size_ += total_data_size_delta;
|
||||
array_[pos].unfinished_data_size_ -= scanned_data_size_delta;
|
||||
if (nullptr != time_guard) {
|
||||
if (array_[pos].sum_time_guard_.is_empty()) {
|
||||
array_[pos].sum_time_guard_ = *time_guard;
|
||||
} else {
|
||||
array_[pos].sum_time_guard_.add_time_guard(*time_guard);
|
||||
// ObCompactionTimeGuard don't have to_string
|
||||
const ObStorageCompactionTimeGuard *storage_time_guard = static_cast<const ObStorageCompactionTimeGuard *>(time_guard);
|
||||
array_[pos].sum_time_guard_.add_time_guard(*storage_time_guard);
|
||||
}
|
||||
|
||||
if (array_[pos].is_inited_) {
|
||||
if (OB_UNLIKELY(array_[pos].data_size_ < 0)) {
|
||||
LOG_WARN("data size is invalid", K(ret), K(array_[pos].data_size_));
|
||||
array_[pos].data_size_ = 0;
|
||||
}
|
||||
if (OB_UNLIKELY(array_[pos].unfinished_data_size_ < 0)) {
|
||||
LOG_WARN("unfinished data size is invalid", K(ret), K(array_[pos].unfinished_data_size_));
|
||||
array_[pos].unfinished_data_size_ = 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_UNLIKELY(array_[pos].data_size_ < 0)) {
|
||||
LOG_WARN("data size is invalid", K(ret), K(array_[pos].data_size_));
|
||||
array_[pos].data_size_ = 0;
|
||||
}
|
||||
if (OB_UNLIKELY(array_[pos].unfinished_data_size_ < 0)) {
|
||||
LOG_WARN("unfinished data size is invalid", K(ret), K(array_[pos].unfinished_data_size_));
|
||||
array_[pos].unfinished_data_size_ = 0;
|
||||
}
|
||||
array_[pos].estimated_finish_time_ = MAX(array_[pos].estimated_finish_time_, estimate_finish_time);
|
||||
if (REACH_TIME_INTERVAL(FINISH_TIME_UPDATE_FROM_SCHEDULER_INTERVAL)) {
|
||||
const int64_t current_time = ObTimeUtility::fast_current_time();
|
||||
int64_t rest_time = 0;
|
||||
int64_t data_size = array_[pos].data_size_ < 0 ? 0 : array_[pos].data_size_;
|
||||
int64_t unfinished_data_size = array_[pos].unfinished_data_size_ < 0 ? 0 : array_[pos].unfinished_data_size_;
|
||||
const int64_t used_time = current_time - array_[pos].start_time_;
|
||||
if (0 != used_time) {
|
||||
const float work_ratio = (float)(array_[pos].data_size_ - array_[pos].unfinished_data_size_) / used_time;
|
||||
const float work_ratio = (float)(data_size - unfinished_data_size) / used_time;
|
||||
if (fabs(work_ratio) > 1e-6) {
|
||||
rest_time = (int64_t)(array_[pos].unfinished_data_size_ / work_ratio);
|
||||
rest_time = (int64_t)(unfinished_data_size / work_ratio);
|
||||
}
|
||||
}
|
||||
array_[pos].estimated_finish_time_ = MAX(array_[pos].estimated_finish_time_, current_time + rest_time);
|
||||
|
||||
@ -70,6 +70,7 @@ struct ObTenantCompactionProgress : public ObCompactionProgress
|
||||
{
|
||||
ObTenantCompactionProgress()
|
||||
: ObCompactionProgress(),
|
||||
is_inited_(false),
|
||||
total_tablet_cnt_(0),
|
||||
unfinished_tablet_cnt_(0),
|
||||
sum_time_guard_()
|
||||
@ -77,12 +78,13 @@ struct ObTenantCompactionProgress : public ObCompactionProgress
|
||||
}
|
||||
bool is_valid() const;
|
||||
ObTenantCompactionProgress & operator=(const ObTenantCompactionProgress &other);
|
||||
INHERIT_TO_STRING_KV("ObCompactionProgress", ObCompactionProgress, K_(total_tablet_cnt),
|
||||
INHERIT_TO_STRING_KV("ObCompactionProgress", ObCompactionProgress, K_(is_inited), K_(total_tablet_cnt),
|
||||
K_(unfinished_tablet_cnt), K_(sum_time_guard));
|
||||
|
||||
bool is_inited_;
|
||||
int64_t total_tablet_cnt_;
|
||||
int64_t unfinished_tablet_cnt_;
|
||||
ObCompactionTimeGuard sum_time_guard_;
|
||||
ObStorageCompactionTimeGuard sum_time_guard_;
|
||||
};
|
||||
|
||||
/*
|
||||
@ -103,6 +105,7 @@ public:
|
||||
void destroy();
|
||||
|
||||
int add_progress(const int64_t major_snapshot_version);
|
||||
int init_progress(const int64_t major_snapshot_version);
|
||||
int update_progress_status(const int64_t major_snapshot_version, share::ObIDag::ObDagStatus status);
|
||||
int update_progress(
|
||||
const int64_t major_snapshot_version,
|
||||
@ -116,8 +119,7 @@ public:
|
||||
int update_compression_ratio(const int64_t major_snapshot_version, storage::ObSSTableMergeInfo &info);
|
||||
|
||||
private:
|
||||
int init_progress_(ObTenantCompactionProgress &progress);
|
||||
int loop_major_sstable_(int64_t version, const bool equal_flag, int64_t &cnt, int64_t &size);
|
||||
int loop_major_sstable_(int64_t version, int64_t &cnt, int64_t &size);
|
||||
int finish_progress_(ObTenantCompactionProgress &progress);
|
||||
OB_INLINE int get_pos_(const int64_t major_snapshot_version, int64_t &pos) const;
|
||||
|
||||
|
||||
@ -546,15 +546,19 @@ int ObTenantTabletScheduler::schedule_merge(const int64_t broadcast_version)
|
||||
LOG_WARN("Invalid argument, ", K(broadcast_version), K(ret));
|
||||
} else if (broadcast_version <= get_frozen_version()) {
|
||||
} else {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
// add progress first before change frozen_version_
|
||||
if (OB_TMP_FAIL(MTL(ObTenantCompactionProgressMgr *)->add_progress(broadcast_version))) {
|
||||
LOG_WARN("failed to add progress", K(tmp_ret), K(broadcast_version));
|
||||
}
|
||||
{
|
||||
obsys::ObRLockGuard frozen_version_guard(frozen_version_lock_);
|
||||
frozen_version_ = broadcast_version;
|
||||
}
|
||||
|
||||
LOG_INFO("schedule merge major version", K(broadcast_version));
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(MTL(ObTenantCompactionProgressMgr *)->add_progress(broadcast_version))) {
|
||||
LOG_WARN("failed to add progress", K(tmp_ret), K(broadcast_version));
|
||||
if (OB_TMP_FAIL(MTL(ObTenantCompactionProgressMgr *)->init_progress(broadcast_version))) {
|
||||
LOG_WARN("failed to init progress", K(tmp_ret), K(broadcast_version));
|
||||
}
|
||||
loop_cnt_ = 0;
|
||||
clear_error_tablet_cnt();
|
||||
@ -1624,11 +1628,8 @@ int ObTenantTabletScheduler::update_major_progress(const int64_t merge_version)
|
||||
if (major_merged_scn > merged_version_) {
|
||||
FLOG_INFO("last major merge finish", K(merge_version), K(major_merged_scn), K(merged_version_));
|
||||
merged_version_ = major_merged_scn;
|
||||
if (OB_FAIL(MTL(ObTenantCompactionProgressMgr *)->update_progress_status(
|
||||
merged_version_, share::ObIDag::DAG_STATUS_FINISH))) {
|
||||
LOG_WARN("failed to finish progress", KR(ret), K(merge_version));
|
||||
}
|
||||
} else if (OB_FAIL(MTL(ObTenantCompactionProgressMgr *)->update_progress_status(
|
||||
}
|
||||
if (OB_FAIL(MTL(ObTenantCompactionProgressMgr *)->update_progress_status(
|
||||
merge_version, share::ObIDag::DAG_STATUS_NODE_RUNNING))) {
|
||||
LOG_WARN("failed to update progress", KR(ret), K(merge_version));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user