fix concurrency issue about co merge progress
This commit is contained in:
parent
583738a5ed
commit
73ab63df36
@ -413,6 +413,7 @@ ObCOMergeBatchExeDag::ObCOMergeBatchExeDag()
|
||||
start_cg_idx_(0),
|
||||
end_cg_idx_(0),
|
||||
retry_create_task_(false),
|
||||
progress_inited_(false),
|
||||
merge_progress_(nullptr)
|
||||
{
|
||||
}
|
||||
@ -618,10 +619,12 @@ int ObCOMergeBatchExeDag::init_merge_progress()
|
||||
if (OB_UNLIKELY(nullptr == dag_net || nullptr == (ctx = dag_net->get_merge_ctx()))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("dag_net or ctx is null", K(ret), KPC(dag_net));
|
||||
} else if (OB_ISNULL(merge_progress_)
|
||||
} else if (!progress_inited_
|
||||
&& OB_ISNULL(merge_progress_)
|
||||
&& OB_FAIL(ctx->prepare_merge_progress(merge_progress_, this, start_cg_idx_, end_cg_idx_))) {
|
||||
STORAGE_LOG(WARN, "fail to prepare merge_progress", K(ret));
|
||||
}
|
||||
progress_inited_ = true; // execute only once regardless of any failure
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -751,18 +754,18 @@ int ObCOMergeBatchExeTask::process()
|
||||
void ObCOMergeBatchExeTask::merge_start()
|
||||
{
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
ObIDag::ObDagGuard guard(*dag_);
|
||||
ObCOMergeBatchExeDag *execute_dag = static_cast<ObCOMergeBatchExeDag*>(dag_);
|
||||
ctx_->cg_merge_info_array_[execute_dag->get_start_cg_idx()]->get_sstable_merge_info().update_start_time();
|
||||
// execute init_progress only once
|
||||
if (OB_TMP_FAIL(execute_dag->init_merge_progress())) {
|
||||
LOG_WARN_RET(tmp_ret, "failed to init merge progress");
|
||||
}
|
||||
// each task has one merger, and all mergers share the progress
|
||||
ObCOMerger *co_merger = static_cast<ObCOMerger*>(merger_);
|
||||
co_merger->set_merge_progress(execute_dag->get_merge_progress());
|
||||
{
|
||||
ObIDag::ObDagGuard guard(*dag_);
|
||||
if (!execute_dag->get_time_guard().is_empty()) {
|
||||
execute_dag->get_time_guard().reuse();
|
||||
}
|
||||
// execute time click init only once
|
||||
if (execute_dag->get_time_guard().is_empty()) {
|
||||
ctx_->cg_merge_info_array_[execute_dag->get_start_cg_idx()]->get_sstable_merge_info().update_start_time();
|
||||
execute_dag->dag_time_guard_click(ObStorageCompactionTimeGuard::DAG_WAIT_TO_SCHEDULE);
|
||||
}
|
||||
}
|
||||
|
@ -164,6 +164,7 @@ private:
|
||||
uint32_t start_cg_idx_;
|
||||
uint32_t end_cg_idx_;
|
||||
bool retry_create_task_;
|
||||
bool progress_inited_; // inited = true & merge_progress_ = nullptr means init failed
|
||||
compaction::ObPartitionMergeProgress *merge_progress_;
|
||||
};
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user