Filling the tablet_id field in table __all_ddl_checksum
This commit is contained in:
@ -25,7 +25,9 @@ using namespace oceanbase::common::hash;
|
||||
using namespace oceanbase::share;
|
||||
using namespace oceanbase::share::schema;
|
||||
|
||||
int ObDDLChecksumOperator::fill_one_item(const ObDDLChecksumItem &item,
|
||||
int ObDDLChecksumOperator::fill_one_item(
|
||||
const uint64_t data_format_version,
|
||||
const ObDDLChecksumItem &item,
|
||||
ObDMLSqlSplicer &dml)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -44,6 +46,10 @@ int ObDDLChecksumOperator::fill_one_item(const ObDDLChecksumItem &item,
|
||||
|| OB_FAIL(dml.add_pk_column("task_id", item.task_id_))
|
||||
|| OB_FAIL(dml.add_column("checksum", item.checksum_))) {
|
||||
LOG_WARN("fail to add column", K(ret));
|
||||
} else if (data_format_version >= DATA_VERSION_4_2_2_0) {
|
||||
if (OB_FAIL(dml.add_column("tablet_id", item.tablet_id_))) {
|
||||
LOG_WARN("fail to add tablet id column", K(ret), K(item.tablet_id_), K(data_format_version));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -51,18 +57,20 @@ int ObDDLChecksumOperator::fill_one_item(const ObDDLChecksumItem &item,
|
||||
int ObDDLChecksumOperator::update_checksum(
|
||||
const uint64_t tenant_id,
|
||||
const int64_t table_id,
|
||||
const int64_t tablet_id,
|
||||
const int64_t ddl_task_id,
|
||||
const common::ObIArray<int64_t> &main_table_checksum,
|
||||
const common::ObIArray<int64_t> &col_ids,
|
||||
const int64_t schema_version,
|
||||
const int64_t task_idx,
|
||||
const uint64_t data_format_version,
|
||||
common::ObMySQLProxy &sql_proxy)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || OB_INVALID_ID == table_id || OB_INVALID_ID == ddl_task_id
|
||||
if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || OB_INVALID_ID == table_id || OB_INVALID_ID == tablet_id || OB_INVALID_ID == ddl_task_id
|
||||
|| main_table_checksum.count() <= 0 || col_ids.count() <= 0 || schema_version <= 0 || task_idx < 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), K(tenant_id), K(table_id), K(task_idx),
|
||||
LOG_WARN("invalid arguments", K(ret), K(tenant_id), K(table_id), K(tablet_id), K(task_idx),
|
||||
K(main_table_checksum.count()), K(col_ids.count()), K(schema_version), K(task_idx));
|
||||
} else {
|
||||
const int64_t column_cnt = col_ids.count();
|
||||
@ -72,6 +80,7 @@ int ObDDLChecksumOperator::update_checksum(
|
||||
item.execution_id_ = schema_version;
|
||||
item.tenant_id_ = tenant_id;
|
||||
item.table_id_ = table_id;
|
||||
item.tablet_id_ = tablet_id;
|
||||
item.ddl_task_id_ = ddl_task_id;
|
||||
item.column_id_ = col_ids.at(i);
|
||||
item.task_id_ = task_idx;
|
||||
@ -84,7 +93,7 @@ int ObDDLChecksumOperator::update_checksum(
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(ObDDLChecksumOperator::update_checksum(checksum_items, sql_proxy))) {
|
||||
if (OB_FAIL(ObDDLChecksumOperator::update_checksum(data_format_version, checksum_items, sql_proxy))) {
|
||||
LOG_WARN("fail to update checksum items", K(ret));
|
||||
}
|
||||
}
|
||||
@ -92,7 +101,7 @@ int ObDDLChecksumOperator::update_checksum(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLChecksumOperator::update_checksum(const ObIArray<ObDDLChecksumItem> &checksum_items,
|
||||
int ObDDLChecksumOperator::update_checksum(const uint64_t data_format_version, const ObIArray<ObDDLChecksumItem> &checksum_items,
|
||||
ObMySQLProxy &sql_proxy)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -111,7 +120,7 @@ int ObDDLChecksumOperator::update_checksum(const ObIArray<ObDDLChecksumItem> &ch
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < checksum_items.count(); ++i) {
|
||||
const ObDDLChecksumItem &item = checksum_items.at(i);
|
||||
dml.reuse();
|
||||
if (OB_FAIL(fill_one_item(item, dml))) {
|
||||
if (OB_FAIL(fill_one_item(data_format_version, item, dml))) {
|
||||
LOG_WARN("fail to fill one item", K(ret), K(item));
|
||||
} else {
|
||||
if (0 == i) {
|
||||
|
||||
@ -28,7 +28,7 @@ struct ObDDLChecksumItem
|
||||
{
|
||||
ObDDLChecksumItem()
|
||||
: execution_id_(-1), tenant_id_(common::OB_INVALID_ID),
|
||||
table_id_(common::OB_INVALID_ID), ddl_task_id_(0),
|
||||
table_id_(common::OB_INVALID_ID), tablet_id_(common::OB_INVALID_ID), ddl_task_id_(0),
|
||||
column_id_(common::OB_INVALID_ID), task_id_(common::OB_INVALID_ID), checksum_(0)
|
||||
{}
|
||||
~ObDDLChecksumItem() {};
|
||||
@ -37,16 +37,18 @@ struct ObDDLChecksumItem
|
||||
return 0 <= execution_id_
|
||||
&& common::OB_INVALID_ID != tenant_id_
|
||||
&& common::OB_INVALID_ID != table_id_
|
||||
&& common::OB_INVALID_ID != tablet_id_
|
||||
&& 0 < ddl_task_id_
|
||||
&& common::OB_INVALID_ID != column_id_;
|
||||
}
|
||||
TO_STRING_KV(K_(execution_id), K_(tenant_id), K_(table_id),
|
||||
TO_STRING_KV(K_(execution_id), K_(tenant_id), K_(table_id), K_(tablet_id),
|
||||
K_(ddl_task_id), K_(column_id), K_(task_id), K_(checksum));
|
||||
static const int64_t PX_SQC_ID_OFFSET = 48;
|
||||
static const int64_t PX_TASK_ID_OFFSET = 32;
|
||||
int64_t execution_id_;
|
||||
uint64_t tenant_id_;
|
||||
uint64_t table_id_;
|
||||
uint64_t tablet_id_;
|
||||
int64_t ddl_task_id_;
|
||||
int64_t column_id_;
|
||||
uint64_t task_id_;
|
||||
@ -59,13 +61,17 @@ public:
|
||||
static int update_checksum(
|
||||
const uint64_t tenant_id,
|
||||
const int64_t table_id,
|
||||
const int64_t tablet_id,
|
||||
const int64_t ddl_task_id,
|
||||
const common::ObIArray<int64_t> &main_table_checksum,
|
||||
const common::ObIArray<int64_t> &col_ids,
|
||||
const int64_t schema_version,
|
||||
const int64_t task_idx,
|
||||
const uint64_t data_format_version,
|
||||
common::ObMySQLProxy &sql_proxy);
|
||||
static int update_checksum(const common::ObIArray<ObDDLChecksumItem> &checksum_items,
|
||||
static int update_checksum(
|
||||
const uint64_t data_format_version,
|
||||
const common::ObIArray<ObDDLChecksumItem> &checksum_items,
|
||||
common::ObMySQLProxy &sql_proxy);
|
||||
static int get_table_column_checksum(
|
||||
const uint64_t tenant_id,
|
||||
@ -100,7 +106,9 @@ public:
|
||||
common::ObMySQLProxy &sql_proxy,
|
||||
const int64_t tablet_task_id = OB_INVALID_INDEX);
|
||||
private:
|
||||
static int fill_one_item(const ObDDLChecksumItem &item,
|
||||
static int fill_one_item(
|
||||
const uint64_t data_format_version,
|
||||
const ObDDLChecksumItem &item,
|
||||
share::ObDMLSqlSplicer &dml);
|
||||
static int get_column_checksum(
|
||||
const common::ObSqlString &sql,
|
||||
|
||||
@ -2708,6 +2708,7 @@ int ObTableScanOp::report_ddl_column_checksum()
|
||||
item.execution_id_ = MY_SPEC.plan_->get_ddl_execution_id();
|
||||
item.tenant_id_ = MTL_ID();
|
||||
item.table_id_ = table_id;
|
||||
item.tablet_id_ = tablet_id.id();
|
||||
item.ddl_task_id_ = MY_SPEC.plan_->get_ddl_task_id();
|
||||
item.column_id_ = MY_SPEC.ddl_output_cids_.at(i) & VIRTUAL_GEN_FIXED_LEN_MASK;
|
||||
item.task_id_ = ctx_.get_px_sqc_id() << ObDDLChecksumItem::PX_SQC_ID_OFFSET | ctx_.get_px_task_id() << ObDDLChecksumItem::PX_TASK_ID_OFFSET | curr_scan_task_id;
|
||||
@ -2728,7 +2729,12 @@ int ObTableScanOp::report_ddl_column_checksum()
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
LOG_INFO("report ddl checksum table scan", K(tablet_id), K(checksum_items));
|
||||
if (OB_FAIL(ObDDLChecksumOperator::update_checksum(checksum_items, *GCTX.sql_proxy_))) {
|
||||
uint64_t data_format_version = 0;
|
||||
int64_t snapshot_version = 0;
|
||||
share::ObDDLTaskStatus unused_task_status = share::ObDDLTaskStatus::PREPARE;
|
||||
if (OB_FAIL(ObDDLUtil::get_data_information(MTL_ID(), MY_SPEC.plan_->get_ddl_task_id(), data_format_version, snapshot_version, unused_task_status))) {
|
||||
LOG_WARN("get ddl cluster version failed", K(ret));
|
||||
} else if (OB_FAIL(ObDDLChecksumOperator::update_checksum(data_format_version, checksum_items, *GCTX.sql_proxy_))) {
|
||||
LOG_WARN("fail to update checksum", K(ret));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.ddl_output_cids_.count(); ++i) {
|
||||
|
||||
@ -498,6 +498,7 @@ int ObUniqueIndexChecker::report_column_checksum(
|
||||
item.execution_id_ = execution_id_;
|
||||
item.tenant_id_ = tenant_id_;
|
||||
item.table_id_ = report_table_id;
|
||||
item.tablet_id_ = tablet_id_.id();
|
||||
item.ddl_task_id_ = task_id_;
|
||||
item.column_id_ = column_ids.at(i).col_id_;
|
||||
item.task_id_ = -tablet_id_.id();
|
||||
@ -509,7 +510,12 @@ int ObUniqueIndexChecker::report_column_checksum(
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(ObDDLChecksumOperator::update_checksum(checksum_items, *GCTX.sql_proxy_))) {
|
||||
uint64_t data_format_version;
|
||||
int64_t snapshot_version = 0;
|
||||
share::ObDDLTaskStatus unused_task_status = share::ObDDLTaskStatus::PREPARE;
|
||||
if (OB_FAIL(ObDDLUtil::get_data_information(tenant_id_, task_id_, data_format_version, snapshot_version, unused_task_status))) {
|
||||
LOG_WARN("get ddl cluster version failed", K(ret));
|
||||
} else if (OB_FAIL(ObDDLChecksumOperator::update_checksum(data_format_version, checksum_items, *GCTX.sql_proxy_))) {
|
||||
LOG_WARN("fail to update checksum", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1508,11 +1508,13 @@ int ObComplementWriteTask::append_row(ObScan *scan)
|
||||
} else {
|
||||
if (OB_FAIL(ObDDLChecksumOperator::update_checksum(param_->dest_tenant_id_,
|
||||
param_->orig_table_id_,
|
||||
param_->orig_tablet_id_.id(),
|
||||
param_->task_id_,
|
||||
report_col_checksums,
|
||||
report_col_ids,
|
||||
1/*execution_id*/,
|
||||
param_->tablet_task_id_ << ObDDLChecksumItem::PX_SQC_ID_OFFSET | task_id_,
|
||||
param_->data_format_version_,
|
||||
*GCTX.sql_proxy_))) {
|
||||
LOG_WARN("fail to report origin table checksum", K(ret));
|
||||
} else {
|
||||
@ -1587,7 +1589,8 @@ int ObComplementMergeTask::process()
|
||||
1 /* execution_id */,
|
||||
param_->task_id_,
|
||||
sst_meta_hdl.get_sstable_meta().get_col_checksum(),
|
||||
sst_meta_hdl.get_sstable_meta().get_col_checksum_cnt()))) {
|
||||
sst_meta_hdl.get_sstable_meta().get_col_checksum_cnt(),
|
||||
param_->data_format_version_))) {
|
||||
LOG_WARN("report ddl column checksum failed", K(ret), K(*param_));
|
||||
} else if (OB_FAIL(MTL(ObTabletTableUpdater*)->submit_tablet_update_task(param_->dest_ls_id_, param_->dest_tablet_id_))) {
|
||||
LOG_WARN("fail to submit tablet update task", K(ret), K(*param_));
|
||||
@ -1596,11 +1599,13 @@ int ObComplementMergeTask::process()
|
||||
LOG_WARN("get column checksum failed", K(ret));
|
||||
} else if (param_->use_new_checksum() && OB_FAIL(ObDDLChecksumOperator::update_checksum(param_->dest_tenant_id_,
|
||||
param_->orig_table_id_,
|
||||
param_->orig_tablet_id_.id(),
|
||||
param_->task_id_,
|
||||
report_col_checksums,
|
||||
report_col_ids,
|
||||
1/*execution_id*/,
|
||||
param_->orig_tablet_id_.id(),
|
||||
param_->data_format_version_,
|
||||
*GCTX.sql_proxy_))) {
|
||||
LOG_WARN("fail to report origin table checksum", K(ret));
|
||||
} else if (OB_FAIL(add_build_hidden_table_sstable())) {
|
||||
|
||||
@ -1138,7 +1138,8 @@ int ObTabletDDLUtil::report_ddl_checksum(
|
||||
const int64_t execution_id,
|
||||
const int64_t ddl_task_id,
|
||||
const int64_t *column_checksums,
|
||||
const int64_t column_count)
|
||||
const int64_t column_count,
|
||||
const uint64_t data_format_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObMySQLProxy *sql_proxy = GCTX.sql_proxy_;
|
||||
@ -1147,9 +1148,9 @@ int ObTabletDDLUtil::report_ddl_checksum(
|
||||
const ObTableSchema *table_schema = nullptr;
|
||||
const uint64_t tenant_id = MTL_ID();
|
||||
if (OB_UNLIKELY(!tablet_id.is_valid() || OB_INVALID_ID == ddl_task_id
|
||||
|| !is_valid_id(table_id) || 0 == table_id || execution_id < 0 || nullptr == column_checksums || column_count <= 0)) {
|
||||
|| !is_valid_id(table_id) || 0 == table_id || execution_id < 0 || nullptr == column_checksums || column_count <= 0 || data_format_version < 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(tablet_id), K(table_id), K(execution_id), KP(column_checksums), K(column_count));
|
||||
LOG_WARN("invalid argument", K(ret), K(tablet_id), K(table_id), K(execution_id), KP(column_checksums), K(column_count), K(data_format_version));
|
||||
} else if (!is_valid_tenant_id(tenant_id) || OB_ISNULL(sql_proxy) || OB_ISNULL(schema_service)) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("ls service or sql proxy is null", K(ret), K(tenant_id), KP(sql_proxy), KP(schema_service));
|
||||
@ -1178,6 +1179,7 @@ int ObTabletDDLUtil::report_ddl_checksum(
|
||||
item.execution_id_ = execution_id;
|
||||
item.tenant_id_ = tenant_id;
|
||||
item.table_id_ = table_id;
|
||||
item.tablet_id_ = tablet_id.id();
|
||||
item.ddl_task_id_ = ddl_task_id;
|
||||
item.column_id_ = column_ids.at(i).col_id_;
|
||||
item.task_id_ = tablet_id.id();
|
||||
@ -1208,7 +1210,7 @@ int ObTabletDDLUtil::report_ddl_checksum(
|
||||
}
|
||||
#endif
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(ObDDLChecksumOperator::update_checksum(ddl_checksum_items, *sql_proxy))) {
|
||||
} else if (OB_FAIL(ObDDLChecksumOperator::update_checksum(data_format_version, ddl_checksum_items, *sql_proxy))) {
|
||||
LOG_WARN("fail to update checksum", K(ret), K(tablet_id), K(table_id), K(ddl_checksum_items));
|
||||
} else {
|
||||
LOG_INFO("report ddl checkum success", K(tablet_id), K(table_id), K(execution_id), K(ddl_checksum_items));
|
||||
|
||||
@ -130,7 +130,8 @@ public:
|
||||
const int64_t execution_id,
|
||||
const int64_t ddl_task_id,
|
||||
const int64_t *column_checksums,
|
||||
const int64_t column_count);
|
||||
const int64_t column_count,
|
||||
const uint64_t data_format_version);
|
||||
|
||||
static int check_and_get_major_sstable(
|
||||
const share::ObLSID &ls_id,
|
||||
|
||||
@ -757,6 +757,10 @@ int ObTenantDirectLoadMgr::check_and_process_finished_tablet(
|
||||
ObLSHandle ls_handle;
|
||||
ObTabletHandle tablet_handle;
|
||||
ObSSTableMetaHandle sst_meta_hdl;
|
||||
const uint64_t tenant_id = MTL_ID();
|
||||
uint64_t data_format_version = 0;
|
||||
int64_t snapshot_version = 0;
|
||||
share::ObDDLTaskStatus unused_task_status = share::ObDDLTaskStatus::PREPARE;
|
||||
const ObSSTable *first_major_sstable = nullptr;
|
||||
ObTabletMemberWrapper<ObTabletTableStore> table_store_wrapper;
|
||||
const int64_t max_wait_timeout_us = 30L * 1000L * 1000L; // 30s
|
||||
@ -797,6 +801,8 @@ int ObTenantDirectLoadMgr::check_and_process_finished_tablet(
|
||||
LOG_WARN("check if major sstable exist failed", K(ret), K(ls_id), K(tablet_id));
|
||||
} else if (OB_FAIL(first_major_sstable->get_meta(sst_meta_hdl))) {
|
||||
LOG_WARN("fail to get sstable meta handle", K(ret));
|
||||
} else if (OB_FAIL(ObDDLUtil::get_data_information(tenant_id, task_id, data_format_version, snapshot_version, unused_task_status))) {
|
||||
LOG_WARN("get ddl cluster version failed", K(ret), K(tenant_id), K(task_id));
|
||||
} else {
|
||||
const int64_t *column_checksums = sst_meta_hdl.get_sstable_meta().get_col_checksum();
|
||||
int64_t column_count = sst_meta_hdl.get_sstable_meta().get_col_checksum_cnt();
|
||||
@ -811,7 +817,8 @@ int ObTenantDirectLoadMgr::check_and_process_finished_tablet(
|
||||
execution_id,
|
||||
task_id,
|
||||
co_column_checksums.empty() ? column_checksums : co_column_checksums.get_data(),
|
||||
co_column_checksums.empty() ? column_count : co_column_checksums.count()))) {
|
||||
co_column_checksums.empty() ? column_count : co_column_checksums.count(),
|
||||
data_format_version))) {
|
||||
LOG_WARN("report ddl column checksum failed", K(ret), K(ls_id), K(tablet_id), K(execution_id));
|
||||
} else {
|
||||
break;
|
||||
@ -2217,7 +2224,8 @@ int ObTabletFullDirectLoadMgr::close(const int64_t execution_id, const SCN &star
|
||||
execution_id,
|
||||
sqc_build_ctx_.build_param_.runtime_only_param_.task_id_,
|
||||
co_column_checksums.empty() ? column_checksums : co_column_checksums.get_data(),
|
||||
co_column_checksums.empty() ? column_count : co_column_checksums.count()))) {
|
||||
co_column_checksums.empty() ? column_count : co_column_checksums.count(),
|
||||
data_format_version_))) {
|
||||
LOG_WARN("report ddl column checksum failed", K(ret), K(ls_id_), K(tablet_id_), K(execution_id), K(sqc_build_ctx_));
|
||||
} else {
|
||||
break;
|
||||
|
||||
Reference in New Issue
Block a user