fix checksum error of drop column
This commit is contained in:
@ -327,6 +327,53 @@ int ObComplementDataContext::write_start_log(const ObComplementDataParam ¶m)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObComplementDataContext::add_column_checksum(const ObIArray<int64_t> &report_col_checksums,
|
||||
const ObIArray<int64_t> &report_col_ids)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSpinLockGuard guard(lock_);
|
||||
if (0 == report_col_checksums_.count()) {
|
||||
if (OB_FAIL(report_col_checksums_.prepare_allocate(report_col_checksums.count()))) {
|
||||
LOG_WARN("prepare allocate report column checksum array failed", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && 0 == report_col_ids_.count()) {
|
||||
if (OB_FAIL(report_col_ids_.prepare_allocate(report_col_ids.count()))) {
|
||||
LOG_WARN("prepare allocate report col checksum array failed", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (report_col_checksums_.count() != report_col_checksums.count()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("error unexpected, report col checksum array count is not equal", K(ret), K(report_col_checksums.count()), K(report_col_checksums_.count()));
|
||||
} else if (report_col_ids_.count() != report_col_ids.count()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("error unexpected, report col ids array count is not equal", K(ret), K(report_col_ids.count()), K(report_col_ids_.count()));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < report_col_checksums.count(); ++i) {
|
||||
report_col_checksums_.at(i) += report_col_checksums.at(i);
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < report_col_ids.count(); ++i) {
|
||||
report_col_ids_.at(i) = report_col_ids.at(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObComplementDataContext::get_column_checksum(ObIArray<int64_t> &report_col_checksums,
|
||||
ObIArray<int64_t> &report_col_ids)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSpinLockGuard guard(lock_);
|
||||
if (OB_FAIL(report_col_checksums.assign(report_col_checksums_))) {
|
||||
LOG_WARN("assign column checksum failed", K(ret));
|
||||
} else if (OB_FAIL(report_col_ids.assign(report_col_ids_))) {
|
||||
LOG_WARN("assign column ids failed", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObComplementDataContext::destroy()
|
||||
{
|
||||
is_inited_ = false;
|
||||
@ -629,7 +676,7 @@ int ObComplementPrepareTask::process()
|
||||
FLOG_INFO("major sstable exists, all task should finish", K(ret), K(*param_));
|
||||
} else if (OB_FAIL(context_->write_start_log(*param_))) {
|
||||
LOG_WARN("write start log failed", K(ret), KPC(param_));
|
||||
} else if (OB_FAIL(ObDDLChecksumOperator::delete_checksum(param_->dest_tenant_id_,
|
||||
} else if (!param_->use_new_checksum() && OB_FAIL(ObDDLChecksumOperator::delete_checksum(param_->dest_tenant_id_,
|
||||
param_->execution_id_,
|
||||
param_->orig_table_id_,
|
||||
0/*use 0 just to avoid clearing target table chksum*/,
|
||||
@ -1242,17 +1289,28 @@ int ObComplementWriteTask::append_row(ObScan *scan)
|
||||
* Meanwhile, the original tenant is a backup tenant, can not support write operation,
|
||||
* report its' checksum under the dest tenant, and origin_table_id + ddl_task_id will aviod the conflict.
|
||||
*/
|
||||
else if (OB_FAIL(ObDDLChecksumOperator::update_checksum(param_->dest_tenant_id_,
|
||||
param_->orig_table_id_,
|
||||
param_->task_id_,
|
||||
report_col_checksums,
|
||||
report_col_ids,
|
||||
1/*execution_id*/,
|
||||
param_->tablet_task_id_ << ObDDLChecksumItem::PX_SQC_ID_OFFSET | task_id_,
|
||||
*GCTX.sql_proxy_))) {
|
||||
LOG_WARN("fail to report origin table checksum", K(ret));
|
||||
} else {
|
||||
LOG_INFO("update checksum successfully", K(param_->orig_table_id_), K(report_col_checksums), K(param_->orig_tablet_id_));
|
||||
else {
|
||||
if (param_->use_new_checksum()) {
|
||||
// add checksum to context and report checksum in merge task
|
||||
if (OB_FAIL(context_->add_column_checksum(report_col_checksums, report_col_ids))) {
|
||||
LOG_WARN("add column checksum failed", K(ret));
|
||||
} else {
|
||||
LOG_INFO("use new checksum", K(param_->orig_table_id_), K(report_col_checksums), K(param_->orig_tablet_id_));
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(ObDDLChecksumOperator::update_checksum(param_->dest_tenant_id_,
|
||||
param_->orig_table_id_,
|
||||
param_->task_id_,
|
||||
report_col_checksums,
|
||||
report_col_ids,
|
||||
1/*execution_id*/,
|
||||
param_->tablet_task_id_ << ObDDLChecksumItem::PX_SQC_ID_OFFSET | task_id_,
|
||||
*GCTX.sql_proxy_))) {
|
||||
LOG_WARN("fail to report origin table checksum", K(ret));
|
||||
} else {
|
||||
LOG_INFO("update checksum successfully", K(param_->orig_table_id_), K(report_col_checksums), K(param_->orig_tablet_id_));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -1294,6 +1352,8 @@ int ObComplementMergeTask::process()
|
||||
ObLSHandle ls_handle;
|
||||
ObTabletHandle tablet_handle;
|
||||
ObTablet *tablet = nullptr;
|
||||
ObArray<int64_t> report_col_checksums;
|
||||
ObArray<int64_t> report_col_ids;
|
||||
if (OB_ISNULL(tmp_dag) || ObDagType::DAG_TYPE_DDL != tmp_dag->get_type()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("dag is invalid", K(ret), KP(tmp_dag));
|
||||
@ -1302,6 +1362,17 @@ int ObComplementMergeTask::process()
|
||||
LOG_WARN("complement data has already failed", "ret", context_->complement_data_ret_);
|
||||
} else if (OB_FAIL(guard.switch_to(param_->dest_tenant_id_, false))) {
|
||||
LOG_WARN("switch to tenant failed", K(ret), K(param_->dest_tenant_id_));
|
||||
} else if (param_->use_new_checksum() && OB_FAIL(context_->get_column_checksum(report_col_checksums, report_col_ids))) {
|
||||
LOG_WARN("get column checksum failed", K(ret));
|
||||
} else if (param_->use_new_checksum() && OB_FAIL(ObDDLChecksumOperator::update_checksum(param_->dest_tenant_id_,
|
||||
param_->orig_table_id_,
|
||||
param_->task_id_,
|
||||
report_col_checksums,
|
||||
report_col_ids,
|
||||
1/*execution_id*/,
|
||||
param_->orig_tablet_id_.id(),
|
||||
*GCTX.sql_proxy_))) {
|
||||
LOG_WARN("fail to report origin table checksum", K(ret));
|
||||
} else if (context_->is_major_sstable_exist_) {
|
||||
ObTabletMemberWrapper<ObTabletTableStore> table_store_wrapper;
|
||||
const ObSSTable *first_major_sstable = nullptr;
|
||||
|
@ -60,6 +60,7 @@ public:
|
||||
}
|
||||
|
||||
int get_hidden_table_key(ObITable::TableKey &table_key) const;
|
||||
bool use_new_checksum() const { return data_format_version_ >= DATA_VERSION_4_2_1_1; }
|
||||
void destroy()
|
||||
{
|
||||
is_inited_ = false;
|
||||
@ -124,6 +125,8 @@ public:
|
||||
int init(const ObComplementDataParam ¶m, const ObDataStoreDesc &desc);
|
||||
void destroy();
|
||||
int write_start_log(const ObComplementDataParam ¶m);
|
||||
int add_column_checksum(const ObIArray<int64_t> &report_col_checksums, const ObIArray<int64_t> &report_col_ids);
|
||||
int get_column_checksum(ObIArray<int64_t> &report_col_checksums, ObIArray<int64_t> &report_col_ids);
|
||||
TO_STRING_KV(K_(is_inited), K_(complement_data_ret), K_(concurrent_cnt), KP_(index_builder), K_(ddl_kv_mgr_handle), K_(row_scanned), K_(row_inserted));
|
||||
public:
|
||||
bool is_inited_;
|
||||
@ -137,6 +140,8 @@ public:
|
||||
ObDDLKvMgrHandle ddl_kv_mgr_handle_; // for keeping ddl kv mgr alive
|
||||
int64_t row_scanned_;
|
||||
int64_t row_inserted_;
|
||||
ObArray<int64_t> report_col_checksums_;
|
||||
ObArray<int64_t> report_col_ids_;
|
||||
};
|
||||
|
||||
class ObComplementPrepareTask;
|
||||
|
Reference in New Issue
Block a user