clear ddl checksum of source table and dest table before retry execution.

This commit is contained in:
obdev
2023-09-11 10:44:10 +00:00
committed by ob-robot
parent 79c21dd9c5
commit 3f04abacab
4 changed files with 31 additions and 9 deletions

View File

@ -390,27 +390,39 @@ int ObDDLChecksumOperator::check_column_checksum(
return ret;
}
/**
* The request of complement data task is to clean up the scan checksum record of the specified tablet.
* The request of insert into select is to clean up all tablets' checksum record.
* And the input argument (tablet_task_id) is to classify the above two scenarios, default value (OB_INVALID_INDEX)
* means to clear all checksum records.
*/
int ObDDLChecksumOperator::delete_checksum(
const uint64_t tenant_id,
const int64_t execution_id,
const uint64_t source_table_id,
const uint64_t dest_table_id,
const int64_t ddl_task_id,
common::ObMySQLProxy &sql_proxy)
common::ObMySQLProxy &sql_proxy,
const int64_t tablet_task_id)
{
int ret = OB_SUCCESS;
ObSqlString sql;
ObSqlString remove_tablet_chksum_sql;
int64_t affected_rows = 0;
const uint64_t exec_tenant_id = ObSchemaUtils::get_exec_tenant_id(tenant_id);
if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || execution_id < 0 || OB_INVALID_ID == ddl_task_id
|| OB_INVALID_ID == source_table_id || OB_INVALID_ID == dest_table_id)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(execution_id), K(source_table_id), K(dest_table_id));
} else if (OB_INVALID_INDEX != tablet_task_id
&& OB_FAIL(remove_tablet_chksum_sql.assign_fmt("AND (task_id >> %ld) = %ld ", ObDDLChecksumItem::PX_SQC_ID_OFFSET, tablet_task_id))) {
LOG_WARN("assign fmt failed", K(ret), K(tablet_task_id), K(remove_tablet_chksum_sql));
} else if (OB_FAIL(sql.assign_fmt(
"DELETE /*+ use_plan_cache(none) */ FROM %s "
"WHERE tenant_id = 0 AND execution_id = %ld AND ddl_task_id = %ld AND table_id IN (%ld, %ld)",
OB_ALL_DDL_CHECKSUM_TNAME, execution_id, ddl_task_id, source_table_id, dest_table_id))) {
LOG_WARN("fail to assign fmt", K(ret));
"WHERE tenant_id = 0 AND execution_id = %ld AND ddl_task_id = %ld AND table_id IN (%ld, %ld) %.*s",
OB_ALL_DDL_CHECKSUM_TNAME, execution_id, ddl_task_id, source_table_id, dest_table_id,
static_cast<int>(remove_tablet_chksum_sql.length()), remove_tablet_chksum_sql.ptr()))) {
LOG_WARN("fail to assign fmt", K(ret), K(remove_tablet_chksum_sql));
} else if (OB_FAIL(sql_proxy.write(tenant_id, sql.ptr(), affected_rows))) {
LOG_WARN("fail to execute sql", KR(ret), K(sql));
} else if (OB_UNLIKELY(affected_rows < 0)) {

View File

@ -42,6 +42,8 @@ struct ObDDLChecksumItem
}
TO_STRING_KV(K_(execution_id), K_(tenant_id), K_(table_id),
K_(ddl_task_id), K_(column_id), K_(task_id), K_(checksum));
static const int64_t PX_SQC_ID_OFFSET = 48;
static const int64_t PX_TASK_ID_OFFSET = 32;
int64_t execution_id_;
uint64_t tenant_id_;
uint64_t table_id_;
@ -95,7 +97,8 @@ public:
const uint64_t source_table_id,
const uint64_t dest_table_id,
const int64_t ddl_task_id,
common::ObMySQLProxy &sql_proxy);
common::ObMySQLProxy &sql_proxy,
const int64_t tablet_task_id = OB_INVALID_INDEX);
private:
static int fill_one_item(const ObDDLChecksumItem &item,
share::ObDMLSqlSplicer &dml);

View File

@ -2678,7 +2678,7 @@ int ObTableScanOp::report_ddl_column_checksum()
item.table_id_ = table_id;
item.ddl_task_id_ = MY_SPEC.plan_->get_ddl_task_id();
item.column_id_ = MY_SPEC.ddl_output_cids_.at(i) & VIRTUAL_GEN_FIXED_LEN_MASK;
item.task_id_ = ctx_.get_px_sqc_id() << 48 | ctx_.get_px_task_id() << 32 | curr_scan_task_id;
item.task_id_ = ctx_.get_px_sqc_id() << ObDDLChecksumItem::PX_SQC_ID_OFFSET | ctx_.get_px_task_id() << ObDDLChecksumItem::PX_TASK_ID_OFFSET | curr_scan_task_id;
item.checksum_ = i < column_checksum_.count() ? column_checksum_[i] : 0;
#ifdef ERRSIM
if (OB_SUCC(ret)) {

View File

@ -616,7 +616,6 @@ int ObComplementPrepareTask::process()
ObComplementDataDag *dag = nullptr;
ObComplementWriteTask *write_task = nullptr;
ObComplementMergeTask *merge_task = nullptr;
LOG_WARN("start to process ObComplementPrepareTask", K(ret));
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObComplementPrepareTask has not been inited", K(ret));
@ -630,8 +629,16 @@ int ObComplementPrepareTask::process()
FLOG_INFO("major sstable exists, all task should finish", K(ret), K(*param_));
} else if (OB_FAIL(context_->write_start_log(*param_))) {
LOG_WARN("write start log failed", K(ret), KPC(param_));
} else if (OB_FAIL(ObDDLChecksumOperator::delete_checksum(param_->dest_tenant_id_,
param_->execution_id_,
param_->orig_table_id_,
0/*use 0 just to avoid clearing target table chksum*/,
param_->task_id_,
*GCTX.sql_proxy_,
param_->tablet_task_id_))) {
LOG_WARN("failed to delete checksum", K(ret), KPC(param_));
} else {
LOG_INFO("finish the complement prepare task", K(ret));
LOG_INFO("finish the complement prepare task", K(ret), KPC(param_));
}
if (OB_FAIL(ret)) {
@ -1241,7 +1248,7 @@ int ObComplementWriteTask::append_row(ObScan *scan)
report_col_checksums,
report_col_ids,
1/*execution_id*/,
param_->tablet_task_id_ << 48 | task_id_,
param_->tablet_task_id_ << ObDDLChecksumItem::PX_SQC_ID_OFFSET | task_id_,
*GCTX.sql_proxy_))) {
LOG_WARN("fail to report origin table checksum", K(ret));
} else {