diff --git a/src/share/ob_ddl_checksum.cpp b/src/share/ob_ddl_checksum.cpp index 5f445eb967..188d67dd04 100644 --- a/src/share/ob_ddl_checksum.cpp +++ b/src/share/ob_ddl_checksum.cpp @@ -25,7 +25,9 @@ using namespace oceanbase::common::hash; using namespace oceanbase::share; using namespace oceanbase::share::schema; -int ObDDLChecksumOperator::fill_one_item(const ObDDLChecksumItem &item, +int ObDDLChecksumOperator::fill_one_item( + const uint64_t data_format_version, + const ObDDLChecksumItem &item, ObDMLSqlSplicer &dml) { int ret = OB_SUCCESS; @@ -44,6 +46,10 @@ int ObDDLChecksumOperator::fill_one_item(const ObDDLChecksumItem &item, || OB_FAIL(dml.add_pk_column("task_id", item.task_id_)) || OB_FAIL(dml.add_column("checksum", item.checksum_))) { LOG_WARN("fail to add column", K(ret)); + } else if (data_format_version >= DATA_VERSION_4_2_2_0) { + if (OB_FAIL(dml.add_column("tablet_id", item.tablet_id_))) { + LOG_WARN("fail to add tablet id column", K(ret), K(item.tablet_id_), K(data_format_version)); + } } return ret; } @@ -51,18 +57,20 @@ int ObDDLChecksumOperator::fill_one_item(const ObDDLChecksumItem &item, int ObDDLChecksumOperator::update_checksum( const uint64_t tenant_id, const int64_t table_id, + const int64_t tablet_id, const int64_t ddl_task_id, const common::ObIArray &main_table_checksum, const common::ObIArray &col_ids, const int64_t schema_version, const int64_t task_idx, + const uint64_t data_format_version, common::ObMySQLProxy &sql_proxy) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || OB_INVALID_ID == table_id || OB_INVALID_ID == ddl_task_id + if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || OB_INVALID_ID == table_id || OB_INVALID_ID == tablet_id || OB_INVALID_ID == ddl_task_id || main_table_checksum.count() <= 0 || col_ids.count() <= 0 || schema_version <= 0 || task_idx < 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret), K(tenant_id), K(table_id), K(task_idx), + LOG_WARN("invalid arguments", K(ret), K(tenant_id), K(table_id), K(tablet_id), K(task_idx), K(main_table_checksum.count()), K(col_ids.count()), K(schema_version), K(task_idx)); } else { const int64_t column_cnt = col_ids.count(); @@ -72,6 +80,7 @@ int ObDDLChecksumOperator::update_checksum( item.execution_id_ = schema_version; item.tenant_id_ = tenant_id; item.table_id_ = table_id; + item.tablet_id_ = tablet_id; item.ddl_task_id_ = ddl_task_id; item.column_id_ = col_ids.at(i); item.task_id_ = task_idx; @@ -84,7 +93,7 @@ int ObDDLChecksumOperator::update_checksum( } } if (OB_SUCC(ret)) { - if (OB_FAIL(ObDDLChecksumOperator::update_checksum(checksum_items, sql_proxy))) { + if (OB_FAIL(ObDDLChecksumOperator::update_checksum(data_format_version, checksum_items, sql_proxy))) { LOG_WARN("fail to update checksum items", K(ret)); } } @@ -92,7 +101,7 @@ int ObDDLChecksumOperator::update_checksum( return ret; } -int ObDDLChecksumOperator::update_checksum(const ObIArray &checksum_items, +int ObDDLChecksumOperator::update_checksum(const uint64_t data_format_version, const ObIArray &checksum_items, ObMySQLProxy &sql_proxy) { int ret = OB_SUCCESS; @@ -111,7 +120,7 @@ int ObDDLChecksumOperator::update_checksum(const ObIArray &ch for (int64_t i = 0; OB_SUCC(ret) && i < checksum_items.count(); ++i) { const ObDDLChecksumItem &item = checksum_items.at(i); dml.reuse(); - if (OB_FAIL(fill_one_item(item, dml))) { + if (OB_FAIL(fill_one_item(data_format_version, item, dml))) { LOG_WARN("fail to fill one item", K(ret), K(item)); } else { if (0 == i) { diff --git a/src/share/ob_ddl_checksum.h b/src/share/ob_ddl_checksum.h index 738ed1444f..ab794ded30 100644 --- a/src/share/ob_ddl_checksum.h +++ b/src/share/ob_ddl_checksum.h @@ -28,7 +28,7 @@ struct ObDDLChecksumItem { ObDDLChecksumItem() : execution_id_(-1), tenant_id_(common::OB_INVALID_ID), - table_id_(common::OB_INVALID_ID), ddl_task_id_(0), + table_id_(common::OB_INVALID_ID), tablet_id_(common::OB_INVALID_ID), ddl_task_id_(0), column_id_(common::OB_INVALID_ID), task_id_(common::OB_INVALID_ID), checksum_(0) {} ~ObDDLChecksumItem() {}; @@ -37,16 +37,18 @@ struct ObDDLChecksumItem return 0 <= execution_id_ && common::OB_INVALID_ID != tenant_id_ && common::OB_INVALID_ID != table_id_ + && common::OB_INVALID_ID != tablet_id_ && 0 < ddl_task_id_ && common::OB_INVALID_ID != column_id_; } - TO_STRING_KV(K_(execution_id), K_(tenant_id), K_(table_id), + TO_STRING_KV(K_(execution_id), K_(tenant_id), K_(table_id), K_(tablet_id), K_(ddl_task_id), K_(column_id), K_(task_id), K_(checksum)); static const int64_t PX_SQC_ID_OFFSET = 48; static const int64_t PX_TASK_ID_OFFSET = 32; int64_t execution_id_; uint64_t tenant_id_; uint64_t table_id_; + uint64_t tablet_id_; int64_t ddl_task_id_; int64_t column_id_; uint64_t task_id_; @@ -59,13 +61,17 @@ public: static int update_checksum( const uint64_t tenant_id, const int64_t table_id, + const int64_t tablet_id, const int64_t ddl_task_id, const common::ObIArray &main_table_checksum, const common::ObIArray &col_ids, const int64_t schema_version, const int64_t task_idx, + const uint64_t data_format_version, common::ObMySQLProxy &sql_proxy); - static int update_checksum(const common::ObIArray &checksum_items, + static int update_checksum( + const uint64_t data_format_version, + const common::ObIArray &checksum_items, common::ObMySQLProxy &sql_proxy); static int get_table_column_checksum( const uint64_t tenant_id, @@ -100,7 +106,9 @@ public: common::ObMySQLProxy &sql_proxy, const int64_t tablet_task_id = OB_INVALID_INDEX); private: - static int fill_one_item(const ObDDLChecksumItem &item, + static int fill_one_item( + const uint64_t data_format_version, + const ObDDLChecksumItem &item, share::ObDMLSqlSplicer &dml); static int get_column_checksum( const common::ObSqlString &sql, diff --git a/src/sql/engine/table/ob_table_scan_op.cpp b/src/sql/engine/table/ob_table_scan_op.cpp index f1243cb634..3bcf93fb5a 100644 --- a/src/sql/engine/table/ob_table_scan_op.cpp +++ b/src/sql/engine/table/ob_table_scan_op.cpp @@ -2708,6 +2708,7 @@ int ObTableScanOp::report_ddl_column_checksum() item.execution_id_ = MY_SPEC.plan_->get_ddl_execution_id(); item.tenant_id_ = MTL_ID(); item.table_id_ = table_id; + item.tablet_id_ = tablet_id.id(); item.ddl_task_id_ = MY_SPEC.plan_->get_ddl_task_id(); item.column_id_ = MY_SPEC.ddl_output_cids_.at(i) & VIRTUAL_GEN_FIXED_LEN_MASK; item.task_id_ = ctx_.get_px_sqc_id() << ObDDLChecksumItem::PX_SQC_ID_OFFSET | ctx_.get_px_task_id() << ObDDLChecksumItem::PX_TASK_ID_OFFSET | curr_scan_task_id; @@ -2728,7 +2729,12 @@ int ObTableScanOp::report_ddl_column_checksum() if (OB_SUCC(ret)) { LOG_INFO("report ddl checksum table scan", K(tablet_id), K(checksum_items)); - if (OB_FAIL(ObDDLChecksumOperator::update_checksum(checksum_items, *GCTX.sql_proxy_))) { + uint64_t data_format_version = 0; + int64_t snapshot_version = 0; + share::ObDDLTaskStatus unused_task_status = share::ObDDLTaskStatus::PREPARE; + if (OB_FAIL(ObDDLUtil::get_data_information(MTL_ID(), MY_SPEC.plan_->get_ddl_task_id(), data_format_version, snapshot_version, unused_task_status))) { + LOG_WARN("get ddl cluster version failed", K(ret)); + } else if (OB_FAIL(ObDDLChecksumOperator::update_checksum(data_format_version, checksum_items, *GCTX.sql_proxy_))) { LOG_WARN("fail to update checksum", K(ret)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.ddl_output_cids_.count(); ++i) { diff --git a/src/storage/ddl/ob_build_index_task.cpp b/src/storage/ddl/ob_build_index_task.cpp index 916ab6b92a..579b9d68d4 100644 --- a/src/storage/ddl/ob_build_index_task.cpp +++ b/src/storage/ddl/ob_build_index_task.cpp @@ -498,6 +498,7 @@ int ObUniqueIndexChecker::report_column_checksum( item.execution_id_ = execution_id_; item.tenant_id_ = tenant_id_; item.table_id_ = report_table_id; + item.tablet_id_ = tablet_id_.id(); item.ddl_task_id_ = task_id_; item.column_id_ = column_ids.at(i).col_id_; item.task_id_ = -tablet_id_.id(); @@ -509,7 +510,12 @@ int ObUniqueIndexChecker::report_column_checksum( } if (OB_SUCC(ret)) { - if (OB_FAIL(ObDDLChecksumOperator::update_checksum(checksum_items, *GCTX.sql_proxy_))) { + uint64_t data_format_version; + int64_t snapshot_version = 0; + share::ObDDLTaskStatus unused_task_status = share::ObDDLTaskStatus::PREPARE; + if (OB_FAIL(ObDDLUtil::get_data_information(tenant_id_, task_id_, data_format_version, snapshot_version, unused_task_status))) { + LOG_WARN("get ddl cluster version failed", K(ret)); + } else if (OB_FAIL(ObDDLChecksumOperator::update_checksum(data_format_version, checksum_items, *GCTX.sql_proxy_))) { LOG_WARN("fail to update checksum", K(ret)); } } diff --git a/src/storage/ddl/ob_complement_data_task.cpp b/src/storage/ddl/ob_complement_data_task.cpp index 43e8ce68a1..19a3eca48d 100644 --- a/src/storage/ddl/ob_complement_data_task.cpp +++ b/src/storage/ddl/ob_complement_data_task.cpp @@ -1508,11 +1508,13 @@ int ObComplementWriteTask::append_row(ObScan *scan) } else { if (OB_FAIL(ObDDLChecksumOperator::update_checksum(param_->dest_tenant_id_, param_->orig_table_id_, + param_->orig_tablet_id_.id(), param_->task_id_, report_col_checksums, report_col_ids, 1/*execution_id*/, param_->tablet_task_id_ << ObDDLChecksumItem::PX_SQC_ID_OFFSET | task_id_, + param_->data_format_version_, *GCTX.sql_proxy_))) { LOG_WARN("fail to report origin table checksum", K(ret)); } else { @@ -1587,7 +1589,8 @@ int ObComplementMergeTask::process() 1 /* execution_id */, param_->task_id_, sst_meta_hdl.get_sstable_meta().get_col_checksum(), - sst_meta_hdl.get_sstable_meta().get_col_checksum_cnt()))) { + sst_meta_hdl.get_sstable_meta().get_col_checksum_cnt(), + param_->data_format_version_))) { LOG_WARN("report ddl column checksum failed", K(ret), K(*param_)); } else if (OB_FAIL(MTL(ObTabletTableUpdater*)->submit_tablet_update_task(param_->dest_ls_id_, param_->dest_tablet_id_))) { LOG_WARN("fail to submit tablet update task", K(ret), K(*param_)); @@ -1596,11 +1599,13 @@ int ObComplementMergeTask::process() LOG_WARN("get column checksum failed", K(ret)); } else if (param_->use_new_checksum() && OB_FAIL(ObDDLChecksumOperator::update_checksum(param_->dest_tenant_id_, param_->orig_table_id_, + param_->orig_tablet_id_.id(), param_->task_id_, report_col_checksums, report_col_ids, 1/*execution_id*/, param_->orig_tablet_id_.id(), + param_->data_format_version_, *GCTX.sql_proxy_))) { LOG_WARN("fail to report origin table checksum", K(ret)); } else if (OB_FAIL(add_build_hidden_table_sstable())) { diff --git a/src/storage/ddl/ob_ddl_merge_task.cpp b/src/storage/ddl/ob_ddl_merge_task.cpp index d592fe31c3..9512468f52 100644 --- a/src/storage/ddl/ob_ddl_merge_task.cpp +++ b/src/storage/ddl/ob_ddl_merge_task.cpp @@ -1138,7 +1138,8 @@ int ObTabletDDLUtil::report_ddl_checksum( const int64_t execution_id, const int64_t ddl_task_id, const int64_t *column_checksums, - const int64_t column_count) + const int64_t column_count, + const uint64_t data_format_version) { int ret = OB_SUCCESS; ObMySQLProxy *sql_proxy = GCTX.sql_proxy_; @@ -1147,9 +1148,9 @@ int ObTabletDDLUtil::report_ddl_checksum( const ObTableSchema *table_schema = nullptr; const uint64_t tenant_id = MTL_ID(); if (OB_UNLIKELY(!tablet_id.is_valid() || OB_INVALID_ID == ddl_task_id - || !is_valid_id(table_id) || 0 == table_id || execution_id < 0 || nullptr == column_checksums || column_count <= 0)) { + || !is_valid_id(table_id) || 0 == table_id || execution_id < 0 || nullptr == column_checksums || column_count <= 0 || data_format_version < 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(tablet_id), K(table_id), K(execution_id), KP(column_checksums), K(column_count)); + LOG_WARN("invalid argument", K(ret), K(tablet_id), K(table_id), K(execution_id), KP(column_checksums), K(column_count), K(data_format_version)); } else if (!is_valid_tenant_id(tenant_id) || OB_ISNULL(sql_proxy) || OB_ISNULL(schema_service)) { ret = OB_ERR_SYS; LOG_WARN("ls service or sql proxy is null", K(ret), K(tenant_id), KP(sql_proxy), KP(schema_service)); @@ -1178,6 +1179,7 @@ int ObTabletDDLUtil::report_ddl_checksum( item.execution_id_ = execution_id; item.tenant_id_ = tenant_id; item.table_id_ = table_id; + item.tablet_id_ = tablet_id.id(); item.ddl_task_id_ = ddl_task_id; item.column_id_ = column_ids.at(i).col_id_; item.task_id_ = tablet_id.id(); @@ -1208,7 +1210,7 @@ int ObTabletDDLUtil::report_ddl_checksum( } #endif if (OB_FAIL(ret)) { - } else if (OB_FAIL(ObDDLChecksumOperator::update_checksum(ddl_checksum_items, *sql_proxy))) { + } else if (OB_FAIL(ObDDLChecksumOperator::update_checksum(data_format_version, ddl_checksum_items, *sql_proxy))) { LOG_WARN("fail to update checksum", K(ret), K(tablet_id), K(table_id), K(ddl_checksum_items)); } else { LOG_INFO("report ddl checkum success", K(tablet_id), K(table_id), K(execution_id), K(ddl_checksum_items)); diff --git a/src/storage/ddl/ob_ddl_merge_task.h b/src/storage/ddl/ob_ddl_merge_task.h index b9e72c91be..15ec08b1ac 100644 --- a/src/storage/ddl/ob_ddl_merge_task.h +++ b/src/storage/ddl/ob_ddl_merge_task.h @@ -130,7 +130,8 @@ public: const int64_t execution_id, const int64_t ddl_task_id, const int64_t *column_checksums, - const int64_t column_count); + const int64_t column_count, + const uint64_t data_format_version); static int check_and_get_major_sstable( const share::ObLSID &ls_id, diff --git a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp index b26dee89b0..8b0b43d3ed 100644 --- a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp +++ b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp @@ -757,6 +757,10 @@ int ObTenantDirectLoadMgr::check_and_process_finished_tablet( ObLSHandle ls_handle; ObTabletHandle tablet_handle; ObSSTableMetaHandle sst_meta_hdl; + const uint64_t tenant_id = MTL_ID(); + uint64_t data_format_version = 0; + int64_t snapshot_version = 0; + share::ObDDLTaskStatus unused_task_status = share::ObDDLTaskStatus::PREPARE; const ObSSTable *first_major_sstable = nullptr; ObTabletMemberWrapper table_store_wrapper; const int64_t max_wait_timeout_us = 30L * 1000L * 1000L; // 30s @@ -797,6 +801,8 @@ int ObTenantDirectLoadMgr::check_and_process_finished_tablet( LOG_WARN("check if major sstable exist failed", K(ret), K(ls_id), K(tablet_id)); } else if (OB_FAIL(first_major_sstable->get_meta(sst_meta_hdl))) { LOG_WARN("fail to get sstable meta handle", K(ret)); + } else if (OB_FAIL(ObDDLUtil::get_data_information(tenant_id, task_id, data_format_version, snapshot_version, unused_task_status))) { + LOG_WARN("get ddl cluster version failed", K(ret), K(tenant_id), K(task_id)); } else { const int64_t *column_checksums = sst_meta_hdl.get_sstable_meta().get_col_checksum(); int64_t column_count = sst_meta_hdl.get_sstable_meta().get_col_checksum_cnt(); @@ -811,7 +817,8 @@ int ObTenantDirectLoadMgr::check_and_process_finished_tablet( execution_id, task_id, co_column_checksums.empty() ? column_checksums : co_column_checksums.get_data(), - co_column_checksums.empty() ? column_count : co_column_checksums.count()))) { + co_column_checksums.empty() ? column_count : co_column_checksums.count(), + data_format_version))) { LOG_WARN("report ddl column checksum failed", K(ret), K(ls_id), K(tablet_id), K(execution_id)); } else { break; @@ -2217,7 +2224,8 @@ int ObTabletFullDirectLoadMgr::close(const int64_t execution_id, const SCN &star execution_id, sqc_build_ctx_.build_param_.runtime_only_param_.task_id_, co_column_checksums.empty() ? column_checksums : co_column_checksums.get_data(), - co_column_checksums.empty() ? column_count : co_column_checksums.count()))) { + co_column_checksums.empty() ? column_count : co_column_checksums.count(), + data_format_version_))) { LOG_WARN("report ddl column checksum failed", K(ret), K(ls_id_), K(tablet_id_), K(execution_id), K(sqc_build_ctx_)); } else { break;