add defense for column checksum after convert co merge

This commit is contained in:
Tsunaou 2024-10-31 03:17:46 +00:00 committed by ob-robot
parent ddfbc8c07a
commit b822115873
3 changed files with 52 additions and 0 deletions

View File

@ -213,6 +213,40 @@ int ObCOTabletMergeCtx::prepare_cs_replica_param()
return ret;
}
int ObCOTabletMergeCtx::check_convert_co_checksum(const ObSSTable *new_sstable)
{
int ret = OB_SUCCESS;
if (is_convert_co_major_merge(get_merge_type())) {
const ObITable *base_table = get_tables_handle().get_table(0);
const ObCOSSTableV2 *co_sstable = nullptr;
const ObSSTable *row_sstable = nullptr;
ObSEArray<int64_t, 16> row_column_checksums;
ObSEArray<int64_t, 16> col_column_checksums;
if (OB_ISNULL(new_sstable) || OB_ISNULL(base_table)
|| OB_UNLIKELY(!new_sstable->is_co_sstable() || !base_table->is_sstable())
|| OB_ISNULL(co_sstable = static_cast<const ObCOSSTableV2 *>(new_sstable))
|| OB_ISNULL(row_sstable = static_cast<const ObSSTable *>(base_table))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid sstable", K(ret), KPC(new_sstable), KPC(co_sstable), KPC(base_table), KPC(row_sstable));
} else if (OB_FAIL(row_sstable->fill_column_ckm_array(row_column_checksums))) {
LOG_WARN("failed to fill column ckm array", K(ret), KPC(row_sstable));
} else if (OB_FAIL(co_sstable->fill_column_ckm_array(*get_schema(), col_column_checksums))) {
LOG_WARN("failed to fill column ckm array", K(ret), KPC(co_sstable));
} else if (row_column_checksums.count() != col_column_checksums.count()) {
ret = OB_CHECKSUM_ERROR;
LOG_WARN("column count not match", K(ret), K(row_column_checksums), K(col_column_checksums));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < row_column_checksums.count(); ++i) {
if (row_column_checksums.at(i) != col_column_checksums.at(i)) {
ret = OB_CHECKSUM_ERROR;
LOG_ERROR("column checksum error match", K(ret), K(i), K(row_column_checksums), K(col_column_checksums));
}
}
}
}
return ret;
}
int ObCOTabletMergeCtx::prepare_schema()
{
int ret = OB_SUCCESS;
@ -649,6 +683,10 @@ int ObCOTabletMergeCtx::create_sstable(const ObSSTable *&new_sstable)
new_sstable = static_cast<ObSSTable *>(base_co_table);
LOG_DEBUG("[RowColSwitch] FinsihTask with only one co sstable", KPC(new_sstable));
}
if (FAILEDx(check_convert_co_checksum(new_sstable))) {
LOG_WARN("failed to check convert co checksum", K(ret), KPC(new_sstable));
}
return ret;
}

View File

@ -147,6 +147,7 @@ struct ObCOTabletMergeCtx : public ObBasicTabletMergeCtx
int prepare_mocked_row_store_cg_schema();
bool should_mock_row_store_cg_schema();
int prepare_cs_replica_param();
int check_convert_co_checksum(const ObSSTable *new_sstable);
OB_INLINE bool is_build_row_store_from_rowkey_cg() const { return static_param_.is_build_row_store_from_rowkey_cg(); }
OB_INLINE bool is_build_row_store() const { return static_param_.is_build_row_store(); }
int get_cg_schema_for_merge(const int64_t idx, const ObStorageColumnGroupSchema *&cg_schema_ptr);

View File

@ -41,6 +41,7 @@ namespace oceanbase
{
namespace storage
{
ERRSIM_POINT_DEF(EN_CO_SSTABLE_COLUMN_CHECKSUM_ERROR);
ObSSTableWrapper::ObSSTableWrapper()
: meta_handle_(),
@ -973,6 +974,18 @@ int ObCOSSTableV2::fill_column_ckm_array(
}
}
} // for
#ifdef ERRSIM
if (OB_SUCC(ret)) {
int tmp_ret = EN_CO_SSTABLE_COLUMN_CHECKSUM_ERROR;
if (OB_SUCCESS != tmp_ret) {
const int64_t column_cnt = column_checksums.count();
for (int64_t j = column_cnt - 1; j - 1 >= 0; j--) {
column_checksums.at(j) = column_checksums.at(j-1);
}
LOG_INFO("ERRSIM EN_CO_SSTABLE_COLUMN_CHECKSUM_ERROR, after errsim", K(tmp_ret), K(column_checksums));
}
}
#endif
}
}
return ret;