fix drop column idempotence
This commit is contained in:
@ -379,7 +379,6 @@ int ObComplementDataContext::write_start_log(const ObComplementDataParam ¶m)
|
||||
int ret = OB_SUCCESS;
|
||||
ObITable::TableKey hidden_table_key;
|
||||
SCN start_scn;
|
||||
ObTabletDirectLoadMgrHandle handle;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObComplementDataContext not init", K(ret));
|
||||
@ -397,7 +396,7 @@ int ObComplementDataContext::write_start_log(const ObComplementDataParam ¶m)
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected err", K(ret), K(MTL_ID()));
|
||||
} else if (OB_FAIL(tenant_direct_load_mgr->open_tablet_direct_load(true, /*is_full_direct_load*/
|
||||
param.dest_ls_id_, param.dest_tablet_id_, context_id_, start_scn, handle))) {
|
||||
param.dest_ls_id_, param.dest_tablet_id_, context_id_, start_scn, tablet_direct_load_mgr_handle_))) {
|
||||
LOG_WARN("write ddl start log failed", K(ret));
|
||||
}
|
||||
LOG_INFO("complement task start ddl redo success", K(ret), K(param));
|
||||
@ -405,6 +404,41 @@ int ObComplementDataContext::write_start_log(const ObComplementDataParam ¶m)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObComplementDataContext::check_already_committed(
|
||||
const ObLSID &ls_id,
|
||||
const ObTabletID &tablet_id,
|
||||
bool &is_commited)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLSHandle ls_handle;
|
||||
ObTabletHandle tablet_handle;
|
||||
is_commited = false;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObComplementDataContext has not been inited", K(ret));
|
||||
} else if (OB_UNLIKELY(!ls_id.is_valid() || !tablet_id.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), K(ls_id), K(tablet_id));
|
||||
} else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id, ls_handle, ObLSGetMod::DDL_MOD))) {
|
||||
LOG_WARN("failed to get log stream", K(ret), K(ls_id));
|
||||
} else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle,
|
||||
tablet_id,
|
||||
tablet_handle,
|
||||
ObMDSGetTabletMode::READ_ALL_COMMITED))) {
|
||||
LOG_WARN("get tablet handle failed", K(ret), K(param));
|
||||
} else if (OB_UNLIKELY(nullptr == tablet_handle.get_obj())) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("tablet handle is null", K(ret), K(param));
|
||||
} else {
|
||||
is_commited = !tablet_direct_load_mgr_handle_.is_valid();
|
||||
if (!is_commited) {
|
||||
SCN commit_scn = tablet_direct_load_mgr_handle_.get_obj()->get_commit_scn(tablet_handle.get_obj()->get_tablet_meta());
|
||||
is_commited = commit_scn.is_valid_and_not_min();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObComplementDataContext::add_column_checksum(const ObIArray<int64_t> &report_col_checksums,
|
||||
const ObIArray<int64_t> &report_col_ids)
|
||||
{
|
||||
@ -1382,6 +1416,8 @@ int ObComplementWriteTask::append_row(ObScan *scan)
|
||||
LOG_WARN("fail to yield dag", KR(ret));
|
||||
} else if (OB_FAIL(DDL_SIM(param_->dest_tenant_id_, param_->task_id_, DDL_INSERT_SSTABLE_GET_NEXT_ROW_FAILED))) {
|
||||
LOG_WARN("ddl sim failure", K(ret), KPC(param_));
|
||||
} else if ((0 == (context_->row_inserted_ % 100000)) && OB_FAIL(context_->check_already_committed(param_->dest_ls_id_, param_->dest_tablet_id_, ddl_committed))) {
|
||||
LOG_WARN("check tablet already committed failed", K(ret));
|
||||
} else if (OB_FAIL(scan->get_next_row(tmp_row, reshape_row_only_for_remote_scan))) {
|
||||
if (OB_UNLIKELY(OB_ITER_END != ret)) {
|
||||
LOG_WARN("fail to get next row", K(ret));
|
||||
@ -1405,10 +1441,6 @@ int ObComplementWriteTask::append_row(ObScan *scan)
|
||||
context_->row_scanned_++;
|
||||
if (!ddl_committed && OB_FAIL(writer.append_row(datum_row))) {
|
||||
LOG_WARN("fail to append row to macro block", K(ret), K(datum_row));
|
||||
if (OB_TRANS_COMMITED == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
ddl_committed = true;
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_ISNULL(checksum_calculator = scan->get_checksum_calculator())) {
|
||||
@ -1448,12 +1480,7 @@ int ObComplementWriteTask::append_row(ObScan *scan)
|
||||
ObRowReshapeUtil::free_row_reshape(allocator, reshape_ptr, 1);
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (!ddl_committed && OB_FAIL(writer.close())) {
|
||||
if (OB_TRANS_COMMITED == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
ddl_committed = true;
|
||||
} else {
|
||||
LOG_WARN("fail to close writer", K(ret));
|
||||
}
|
||||
LOG_WARN("fail to close writer", K(ret));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(scan->get_origin_table_checksum(report_col_checksums, report_col_ids))) {
|
||||
|
@ -129,6 +129,10 @@ public:
|
||||
int write_start_log(const ObComplementDataParam ¶m);
|
||||
int add_column_checksum(const ObIArray<int64_t> &report_col_checksums, const ObIArray<int64_t> &report_col_ids);
|
||||
int get_column_checksum(ObIArray<int64_t> &report_col_checksums, ObIArray<int64_t> &report_col_ids);
|
||||
int check_already_committed(
|
||||
const share::ObLSID &ls_id,
|
||||
const common::ObTabletID &tablet_id,
|
||||
bool &is_commited);
|
||||
TO_STRING_KV(K_(is_inited), K_(complement_data_ret), K_(concurrent_cnt), KP_(index_builder), K_(tablet_direct_load_mgr_handle), K_(row_scanned), K_(row_inserted));
|
||||
public:
|
||||
bool is_inited_;
|
||||
|
Reference in New Issue
Block a user