Add ddl_commit_scn to guarantee single ddl commit log
This commit is contained in:
@ -981,6 +981,7 @@ int ObComplementWriteTask::append_row(ObLocalScan &local_scan)
|
||||
ObDatumRow datum_row;
|
||||
int64_t rowkey_column_cnt = 0;
|
||||
const int64_t extra_rowkey_cnt = storage::ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt();
|
||||
bool ddl_committed = false;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObComplementWriteTask is not inited", K(ret));
|
||||
@ -1020,7 +1021,7 @@ int ObComplementWriteTask::append_row(ObLocalScan &local_scan)
|
||||
LOG_WARN("the dag of this task is null", K(ret));
|
||||
} else if (FALSE_IT(sstable_redo_writer.set_start_scn(
|
||||
static_cast<ObComplementDataDag *>(get_dag())->get_context().data_sstable_redo_writer_.get_start_scn()))) {
|
||||
} else if (OB_FAIL(callback.init(DDL_MB_DATA_TYPE, hidden_table_key, &sstable_redo_writer))) {
|
||||
} else if (OB_FAIL(callback.init(DDL_MB_DATA_TYPE, hidden_table_key, &sstable_redo_writer, context_->ddl_kv_mgr_handle_))) {
|
||||
LOG_WARN("fail to init data callback", K(ret), K(hidden_table_key));
|
||||
} else if (OB_FAIL(writer.open(data_desc, macro_start_seq, &callback))) {
|
||||
LOG_WARN("fail to open macro block writer", K(ret), K(data_desc));
|
||||
@ -1092,8 +1093,14 @@ int ObComplementWriteTask::append_row(ObLocalScan &local_scan)
|
||||
t2 = ObTimeUtility::current_time();
|
||||
get_next_row_time += t2 - t1;
|
||||
context_->row_scanned_++;
|
||||
if (OB_FAIL(writer.append_row(datum_row))) {
|
||||
if (!ddl_committed && OB_FAIL(writer.append_row(datum_row))) {
|
||||
LOG_WARN("fail to append row to macro block", K(ret), K(datum_row));
|
||||
if (OB_TRANS_COMMITED == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
ddl_committed = true;
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_ISNULL(checksum_calculator = local_scan.get_checksum_calculator())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("checksum calculator is nullptr", K(ret), KP(checksum_calculator));
|
||||
@ -1116,8 +1123,15 @@ int ObComplementWriteTask::append_row(ObLocalScan &local_scan)
|
||||
K(get_next_row_time), K(append_row_time));
|
||||
ObRowReshapeUtil::free_row_reshape(allocator, reshape_ptr, 1);
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(writer.close())) {
|
||||
LOG_WARN("fail to close writer", K(ret));
|
||||
} else if (!ddl_committed && OB_FAIL(writer.close())) {
|
||||
if (OB_TRANS_COMMITED == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
ddl_committed = true;
|
||||
} else {
|
||||
LOG_WARN("fail to close writer", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(local_scan.get_origin_table_checksum(report_col_checksums, report_col_ids))) {
|
||||
LOG_WARN("fail to get origin table columns checksum", K(ret));
|
||||
} else if (OB_FAIL(ObDDLChecksumOperator::update_checksum(param_->tenant_id_,
|
||||
@ -1214,8 +1228,6 @@ int ObComplementMergeTask::add_build_hidden_table_sstable()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLSHandle ls_handle;
|
||||
ObTablet *tablet = nullptr;
|
||||
ObTabletHandle tablet_handle;
|
||||
ObITable::TableKey hidden_table_key;
|
||||
SCN commit_scn;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
@ -1228,49 +1240,11 @@ int ObComplementMergeTask::add_build_hidden_table_sstable()
|
||||
LOG_WARN("error unexpected", K(ret), KP(param_), KP(context_));
|
||||
} else if (OB_FAIL(MTL(ObLSService *)->get_ls(param_->ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) {
|
||||
LOG_WARN("failed to get log stream", K(ret), K(param_->ls_id_));
|
||||
} else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle, param_->dest_tablet_id_, tablet_handle))) {
|
||||
LOG_WARN("failed to get tablet", K(ret), K(param_->ls_id_), K(param_->dest_tablet_id_));
|
||||
} else if (OB_ISNULL(tablet = tablet_handle.get_obj())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("tablet is null", K(ret), K(param_->dest_tablet_id_));
|
||||
} else if (OB_FAIL(param_->get_hidden_table_key(hidden_table_key))) {
|
||||
LOG_WARN("fail to get hidden table key", K(ret), K(hidden_table_key));
|
||||
} else if (OB_UNLIKELY(!hidden_table_key.is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("hidden table key is invalid", K(ret), K(hidden_table_key));
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(context_->data_sstable_redo_writer_.write_commit_log(hidden_table_key,
|
||||
param_->dest_table_id_,
|
||||
param_->execution_id_,
|
||||
param_->task_id_,
|
||||
commit_scn))) {
|
||||
if (OB_TASK_EXPIRED == ret) {
|
||||
LOG_INFO("ddl task expired", K(ret), K(hidden_table_key), KPC(param_));
|
||||
} else {
|
||||
LOG_WARN("fail write ddl commit log", K(ret), K(hidden_table_key));
|
||||
}
|
||||
} else {
|
||||
ObTabletHandle new_tablet_handle; // no use here
|
||||
ObDDLKvMgrHandle ddl_kv_mgr_handle;
|
||||
const ObLSID &ls_id = param_->ls_id_;
|
||||
const ObTabletID &tablet_id = tablet->get_tablet_meta().tablet_id_;
|
||||
const SCN &ddl_start_scn = static_cast<ObComplementDataDag *>(get_dag())->get_context().data_sstable_redo_writer_.get_start_scn();
|
||||
const uint64_t table_id = param_->dest_table_id_;
|
||||
const int64_t ddl_task_id = param_->task_id_;
|
||||
if (OB_FAIL(tablet->get_ddl_kv_mgr(ddl_kv_mgr_handle))) {
|
||||
LOG_WARN("get ddl kv manager failed", K(ret));
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_commit(ddl_start_scn,
|
||||
commit_scn,
|
||||
table_id,
|
||||
ddl_task_id))) {
|
||||
LOG_WARN("commit ddl log failed", K(ret), K(ls_id), K(tablet_id), K(commit_scn), K(hidden_table_key),
|
||||
K(ddl_start_scn), "new_ddl_start_scn", ddl_kv_mgr_handle.get_obj()->get_start_scn());
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->wait_ddl_merge_success(ddl_start_scn, commit_scn))) {
|
||||
LOG_WARN("wait ddl merge failed", K(ret), K(ls_id), K(tablet_id), K(hidden_table_key),
|
||||
K(ddl_start_scn), "new_ddl_start_scn", ddl_kv_mgr_handle.get_obj()->get_start_scn());
|
||||
}
|
||||
} else if (OB_FAIL(context_->data_sstable_redo_writer_.end_ddl_redo_and_create_ddl_sstable(
|
||||
ls_handle, hidden_table_key, param_->dest_table_id_, param_->execution_id_, param_->task_id_))) {
|
||||
LOG_WARN("failed to end ddl redo", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user