fix update partition merge progress
This commit is contained in:
parent
38278be8b0
commit
1de9266dd0
@ -134,7 +134,6 @@ int ObPartitionMergeProgress::estimate(ObTabletMergeCtx *ctx)
|
||||
LOG_WARN("get invalid arguments", K(ret), K(ctx));
|
||||
} else {
|
||||
const ObIArray<ObITable*> &tables = ctx->tables_handle_.get_tables();
|
||||
ObITable *table = nullptr;
|
||||
int64_t old_major_data_size = 0;
|
||||
if (OB_UNLIKELY(0 == tables.count() || NULL == tables.at(0))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -162,26 +161,30 @@ int ObPartitionMergeProgress::estimate(ObTabletMergeCtx *ctx)
|
||||
} else {
|
||||
estimate_row_cnt_ = MAX(1, part_estimate.physical_row_count_);
|
||||
for (int64_t i = tables.count() - 1; i >= 0; --i) {
|
||||
if (OB_ISNULL(table = tables.at(i))) {
|
||||
if (OB_UNLIKELY(nullptr == tables.at(i) || !tables.at(i)->is_memtable())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get unexpected null table", K(ret), K(i), K(table));
|
||||
} else if (table->is_memtable()) {
|
||||
estimate_occupy_size_ = static_cast<ObMemtable *>(table)->get_occupied_size();
|
||||
LOG_WARN("get unexpected null table", K(ret), K(i), K(tables.at(i)));
|
||||
} else {
|
||||
estimate_occupy_size_ += static_cast<ObMemtable *>(tables.at(i))->get_occupied_size();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
int64_t total_macro_block_cnt = 0;
|
||||
ObSSTable *table = nullptr;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < tables.count(); ++i) {
|
||||
if (OB_ISNULL(table = tables.at(i))) {
|
||||
if (OB_UNLIKELY(nullptr == tables.at(i) || !tables.at(i)->is_sstable())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get unexpected null table", K(ret), K(i), K(table));
|
||||
} else if (table->is_sstable()) {
|
||||
const ObSSTableBasicMeta &meta = static_cast<ObSSTable *>(table)->get_meta().get_basic_meta();
|
||||
LOG_WARN("get unexpected table", K(ret), K(i), KPC(tables.at(i)));
|
||||
} else {
|
||||
table = static_cast<ObSSTable *>(tables.at(i));
|
||||
const ObSSTableBasicMeta &meta = table->get_meta().get_basic_meta();
|
||||
if (meta.get_total_macro_block_count() <= 0) {
|
||||
LOG_DEBUG("table is empty, skip it", K(i), KPC(static_cast<ObSSTable *>(table)));
|
||||
continue;
|
||||
} else {
|
||||
total_macro_block_cnt += meta.get_total_macro_block_count() - meta.get_total_use_old_macro_block_count();
|
||||
estimate_row_cnt_ += meta.row_count_;
|
||||
estimate_occupy_size_ += meta.occupy_size_;
|
||||
if (table->is_major_sstable()) {
|
||||
@ -190,6 +193,10 @@ int ObPartitionMergeProgress::estimate(ObTabletMergeCtx *ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
if (0 != total_macro_block_cnt) {
|
||||
estimate_row_cnt_ = (0 == estimate_row_cnt_) ? DEFAULT_ROW_CNT_PER_MACRO_BLOCK * total_macro_block_cnt : estimate_row_cnt_;
|
||||
estimate_occupy_size_ = (0 == estimate_occupy_size_) ? common::OB_DEFAULT_MACRO_BLOCK_SIZE * total_macro_block_cnt : estimate_occupy_size_;
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
@ -241,9 +248,6 @@ int ObPartitionMergeProgress::update_merge_progress(
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObPartitionMergeProgress not inited", K(ret));
|
||||
} else if (OB_UNLIKELY(0 == estimate_row_cnt_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected zero estimate_total_units_", K(ret));
|
||||
} else if (OB_UNLIKELY(idx < 0 || idx >= concurrent_cnt_ || scanned_row_cnt < 0 || output_block_cnt < 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("get invalid arguments", K(ret), K(idx), K(concurrent_cnt_), K(scanned_row_cnt), K(output_block_cnt));
|
||||
@ -264,7 +268,7 @@ int ObPartitionMergeProgress::update_merge_progress(
|
||||
}
|
||||
|
||||
if (scanned_row_cnt >= estimate_row_cnt_) {
|
||||
estimate_row_cnt_ += scanned_row_cnt - pre_scanned_row_cnt_;
|
||||
estimate_row_cnt_ += MAX(scanned_row_cnt / DEFAULT_INCREMENT_ROW_FACTOR, 1);
|
||||
avg_row_length_ = estimate_occupy_size_ * 1.0 / estimate_row_cnt_;
|
||||
}
|
||||
|
||||
@ -295,14 +299,14 @@ int ObPartitionMergeProgress::update_merge_info(ObSSTableMergeInfo &merge_info)
|
||||
void ObPartitionMergeProgress::update_estimated_finish_time_()
|
||||
{
|
||||
int64_t current_time = ObTimeUtility::fast_current_time();
|
||||
if (0 == pre_scanned_row_cnt_) {
|
||||
if (0 == pre_scanned_row_cnt_) { // first time to init merge_progress
|
||||
int64_t spend_time = estimate_occupy_size_ / common::OB_DEFAULT_MACRO_BLOCK_SIZE * ObCompactionProgress::MERGE_SPEED
|
||||
+ ObCompactionProgress::EXTRA_TIME;
|
||||
estimated_finish_time_ = spend_time + current_time + UPDATE_INTERVAL;
|
||||
} else {
|
||||
int64_t rest_time = (estimate_row_cnt_ - pre_scanned_row_cnt_)
|
||||
* (current_time - merge_dag_->get_start_time()) / pre_scanned_row_cnt_;
|
||||
estimated_finish_time_ = current_time + rest_time + UPDATE_INTERVAL;
|
||||
int64_t delta_row_cnt = estimate_row_cnt_ - pre_scanned_row_cnt_;
|
||||
int64_t rest_time = MAX(1, delta_row_cnt) * (current_time - merge_dag_->get_start_time()) / pre_scanned_row_cnt_;
|
||||
estimated_finish_time_ = MAX(estimated_finish_time_, current_time + rest_time + UPDATE_INTERVAL);
|
||||
}
|
||||
}
|
||||
|
||||
@ -378,7 +382,7 @@ int ObPartitionMajorMergeProgress::update_merge_progress(
|
||||
}
|
||||
|
||||
if (scanned_row_cnt >= estimate_row_cnt_) {
|
||||
estimate_row_cnt_ += scanned_row_cnt - pre_scanned_row_cnt_;
|
||||
estimate_row_cnt_ += MAX(scanned_row_cnt / DEFAULT_INCREMENT_ROW_FACTOR, 1);
|
||||
avg_row_length_ = estimate_occupy_size_ * 1.0 / estimate_row_cnt_;
|
||||
}
|
||||
|
||||
|
@ -50,6 +50,8 @@ public:
|
||||
public:
|
||||
static const int32_t UPDATE_INTERVAL = 2 * 1000 * 1000; // 2 second
|
||||
static const int32_t NORMAL_UPDATE_PARAM = 300;
|
||||
static const int32_t DEFAULT_ROW_CNT_PER_MACRO_BLOCK = 1000;
|
||||
static const int32_t DEFAULT_INCREMENT_ROW_FACTOR = 10;
|
||||
protected:
|
||||
int estimate(ObTabletMergeCtx *ctx);
|
||||
void update_estimated_finish_time_();
|
||||
|
Loading…
x
Reference in New Issue
Block a user