fix complement data for same tablet repeatedly.
This commit is contained in:
parent
19ddc9dcf3
commit
c7666a7ad6
@ -258,12 +258,15 @@ int ObComplementDataContext::init(const ObComplementDataParam ¶m, const ObDa
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
void *builder_buf = nullptr;
|
||||
const ObSSTable *latest_major_sstable = nullptr;
|
||||
if (OB_UNLIKELY(is_inited_)) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("ObComplementDataContext has already been inited", K(ret));
|
||||
} else if (OB_UNLIKELY(!param.is_valid() || !desc.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), K(param), K(desc));
|
||||
} else if (OB_FAIL(ObTabletDDLUtil::check_and_get_major_sstable(param.ls_id_, param.dest_tablet_id_, latest_major_sstable))) {
|
||||
LOG_WARN("check if major sstable exist failed", K(ret), K(param));
|
||||
} else if (OB_FAIL(data_sstable_redo_writer_.init(param.ls_id_,
|
||||
param.dest_tablet_id_))) {
|
||||
LOG_WARN("fail to init data sstable redo writer", K(ret), K(param));
|
||||
@ -278,6 +281,7 @@ int ObComplementDataContext::init(const ObComplementDataParam ¶m, const ObDa
|
||||
} else if (OB_FAIL(index_builder_->init(desc))) {
|
||||
LOG_WARN("failed to init index builder", K(ret), K(desc));
|
||||
} else {
|
||||
is_major_sstable_exist_ = nullptr != latest_major_sstable ? true : false;
|
||||
concurrent_cnt_ = param.concurrent_cnt_;
|
||||
is_inited_ = true;
|
||||
}
|
||||
@ -320,6 +324,7 @@ int ObComplementDataContext::write_start_log(const ObComplementDataParam ¶m)
|
||||
void ObComplementDataContext::destroy()
|
||||
{
|
||||
is_inited_ = false;
|
||||
is_major_sstable_exist_ = false;
|
||||
complement_data_ret_ = OB_SUCCESS;
|
||||
concurrent_cnt_ = 0;
|
||||
if (OB_NOT_NULL(index_builder_)) {
|
||||
@ -612,6 +617,8 @@ int ObComplementPrepareTask::process()
|
||||
} else if (FALSE_IT(dag = static_cast<ObComplementDataDag *>(tmp_dag))) {
|
||||
} else if (OB_FAIL(dag->prepare_context())) {
|
||||
LOG_WARN("prepare complement context failed", K(ret));
|
||||
} else if (context_->is_major_sstable_exist_) {
|
||||
FLOG_INFO("major sstable exists, all task should finish", K(ret), K(*param_));
|
||||
} else if (OB_FAIL(context_->write_start_log(*param_))) {
|
||||
LOG_WARN("write start log failed", K(ret), KPC(param_));
|
||||
} else {
|
||||
@ -672,6 +679,7 @@ int ObComplementWriteTask::process()
|
||||
LOG_WARN("dag is invalid", K(ret), KP(tmp_dag));
|
||||
} else if (OB_SUCCESS != (context_->complement_data_ret_)) {
|
||||
LOG_WARN("complement data has already failed", "ret", context_->complement_data_ret_);
|
||||
} else if (context_->is_major_sstable_exist_) {
|
||||
} else if (OB_FAIL(guard.switch_to(param_->tenant_id_))) {
|
||||
LOG_WARN("switch to tenant failed", K(ret), K(param_->tenant_id_));
|
||||
} else if (OB_FAIL(local_scan_by_range())) {
|
||||
@ -1123,6 +1131,23 @@ int ObComplementMergeTask::process()
|
||||
LOG_WARN("complement data has already failed", "ret", context_->complement_data_ret_);
|
||||
} else if (OB_FAIL(guard.switch_to(param_->hidden_table_schema_->get_tenant_id()))) {
|
||||
LOG_WARN("switch to tenant failed", K(ret), K(param_->hidden_table_schema_->get_tenant_id()));
|
||||
} else if (context_->is_major_sstable_exist_) {
|
||||
const ObSSTable *latest_major_sstable = nullptr;
|
||||
if (OB_FAIL(ObTabletDDLUtil::check_and_get_major_sstable(param_->ls_id_, param_->dest_tablet_id_, latest_major_sstable))) {
|
||||
LOG_WARN("check if major sstable exist failed", K(ret), K(*param_));
|
||||
} else if (OB_ISNULL(latest_major_sstable)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected error, major sstable shoud not be null", K(ret), K(*param_));
|
||||
} else if (OB_FAIL(ObTabletDDLUtil::report_ddl_checksum(param_->ls_id_,
|
||||
param_->dest_tablet_id_,
|
||||
param_->hidden_table_schema_->get_table_id(),
|
||||
1 /* execution_id */,
|
||||
param_->task_id_,
|
||||
latest_major_sstable->get_meta().get_col_checksum()))) {
|
||||
LOG_WARN("report ddl column checksum failed", K(ret), K(*param_));
|
||||
} else if (OB_FAIL(GCTX.ob_service_->submit_tablet_update_task(param_->tenant_id_, param_->ls_id_, param_->dest_tablet_id_))) {
|
||||
LOG_WARN("fail to submit tablet update task", K(ret), K(*param_));
|
||||
}
|
||||
} else if (OB_FAIL(add_build_hidden_table_sstable())) {
|
||||
LOG_WARN("fail to build new sstable and write macro redo", K(ret));
|
||||
}
|
||||
|
@ -105,7 +105,7 @@ struct ObComplementDataContext final
|
||||
{
|
||||
public:
|
||||
ObComplementDataContext():
|
||||
is_inited_(false), complement_data_ret_(common::OB_SUCCESS),
|
||||
is_inited_(false), is_major_sstable_exist_(false), complement_data_ret_(common::OB_SUCCESS),
|
||||
allocator_("ComplementData"), lock_(), concurrent_cnt_(0), data_sstable_redo_writer_(), index_builder_(nullptr)
|
||||
{}
|
||||
~ObComplementDataContext() { destroy(); }
|
||||
@ -115,6 +115,7 @@ public:
|
||||
TO_STRING_KV(K_(is_inited), K_(complement_data_ret), K_(concurrent_cnt), KP_(index_builder));
|
||||
public:
|
||||
bool is_inited_;
|
||||
bool is_major_sstable_exist_;
|
||||
int complement_data_ret_;
|
||||
common::ObArenaAllocator allocator_;
|
||||
ObSpinLock lock_;
|
||||
|
@ -354,10 +354,10 @@ int ObDDLTableMergeTask::process()
|
||||
ObTabletDDLParam ddl_param;
|
||||
ObTableHandleV2 table_handle;
|
||||
bool is_data_complete = false;
|
||||
bool is_major_sstable_exist = false;
|
||||
if (OB_FAIL(ObTabletDDLUtil::check_if_major_sstable_exist(merge_param_.ls_id_, merge_param_.tablet_id_, is_major_sstable_exist))) {
|
||||
const ObSSTable *latest_major_sstable = nullptr;
|
||||
if (OB_FAIL(ObTabletDDLUtil::check_and_get_major_sstable(merge_param_.ls_id_, merge_param_.tablet_id_, latest_major_sstable))) {
|
||||
LOG_WARN("check if major sstable exist failed", K(ret));
|
||||
} else if (is_major_sstable_exist) {
|
||||
} else if (nullptr != latest_major_sstable) {
|
||||
LOG_INFO("major sstable has been created before", K(merge_param_), K(ddl_param.table_key_));
|
||||
sstable = static_cast<ObSSTable *>(tablet_handle.get_obj()->get_table_store().get_major_sstables().get_boundary_table(false/*first*/));
|
||||
} else if (tablet_handle.get_obj()->get_tablet_meta().table_store_flag_.with_major_sstable()) {
|
||||
@ -829,14 +829,14 @@ int ObTabletDDLUtil::report_ddl_checksum(const share::ObLSID &ls_id,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletDDLUtil::check_if_major_sstable_exist(const share::ObLSID &ls_id,
|
||||
const ObTabletID &tablet_id,
|
||||
bool &is_major_sstable_exist)
|
||||
int ObTabletDDLUtil::check_and_get_major_sstable(const share::ObLSID &ls_id,
|
||||
const ObTabletID &tablet_id,
|
||||
const ObSSTable *&latest_major_sstable)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLSHandle ls_handle;
|
||||
ObTabletHandle tablet_handle;
|
||||
ObSSTable *latest_major_sstable = nullptr;
|
||||
latest_major_sstable = nullptr;
|
||||
if (OB_UNLIKELY(!ls_id.is_valid() || !tablet_id.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(ls_id), K(tablet_id));
|
||||
@ -852,7 +852,6 @@ int ObTabletDDLUtil::check_if_major_sstable_exist(const share::ObLSID &ls_id,
|
||||
} else {
|
||||
latest_major_sstable = static_cast<ObSSTable *>(
|
||||
tablet_handle.get_obj()->get_table_store().get_major_sstables().get_boundary_table(true/*last*/));
|
||||
is_major_sstable_exist = nullptr != latest_major_sstable;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -170,9 +170,9 @@ public:
|
||||
const int64_t execution_id,
|
||||
const int64_t ddl_task_id,
|
||||
const ObIArray<int64_t> &column_checksums);
|
||||
static int check_if_major_sstable_exist(const share::ObLSID &ls_id,
|
||||
const ObTabletID &tablet_id,
|
||||
bool &is_major_sstable_exist);
|
||||
static int check_and_get_major_sstable(const share::ObLSID &ls_id,
|
||||
const ObTabletID &tablet_id,
|
||||
const blocksstable::ObSSTable *&latest_major_sstable);
|
||||
|
||||
};
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user