fix lob column checksum validation for cs replica

This commit is contained in:
Fengjingkun 2024-10-16 09:14:04 +00:00 committed by ob-robot
parent 06cce66ff4
commit 765b100b29
4 changed files with 141 additions and 5 deletions

View File

@ -339,11 +339,52 @@ int ObTabletReplicaChecksumItem::verify_checksum(const ObTabletReplicaChecksumIt
int ObTabletReplicaChecksumItem::verify_column_checksum(const ObTabletReplicaChecksumItem &other) const
{
int ret = OB_SUCCESS;
if (compaction_scn_ == other.compaction_scn_) {
bool column_meta_equal = false;
if (OB_FAIL(column_meta_.check_equal(other.column_meta_, column_meta_equal))) {
bool is_cs_replica = false;
if (OB_UNLIKELY(compaction_scn_ != other.compaction_scn_)) {
// do nothing
} else if (OB_FAIL(column_meta_.check_equal(other.column_meta_, column_meta_equal))) {
LOG_WARN("fail to check column meta equal", KR(ret), K(other), K(*this));
} else if (!column_meta_equal) {
} else if (column_meta_equal) {
// do nothing
} else if (OB_FAIL(check_data_checksum_type(is_cs_replica))) {
LOG_WARN("fail to check data checksum type", KR(ret), KPC(this));
} else if (is_cs_replica) {
// do nothing
} else if (OB_FAIL(other.check_data_checksum_type(is_cs_replica))) {
LOG_WARN("fail to check data checksum type", KR(ret), K(other));
} else if (is_cs_replica) {
// do nothing
} else {
ret = OB_CHECKSUM_ERROR;
}
bool is_large_text_column = false;
uint64_t compat_version = 0;
if (OB_FAIL(ret) || !is_cs_replica) {
} else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id_, compat_version))) {
LOG_WARN("failed to get min data version", K(ret), K(tenant_id_));
} else if (compat_version >= DATA_VERSION_4_3_4_0) {
// should not skip the validation of lob column between cs replica and row replica
} else {
ObSEArray<int64_t, 8> column_idxs;
for (int64_t idx = 0; OB_SUCC(ret) && idx < column_meta_.column_checksums_.count(); ++idx) {
if (column_meta_.column_checksums_.at(idx) == other.column_meta_.column_checksums_.at(idx)) {
// do nothing
} else if (OB_FAIL(column_idxs.push_back(idx))) {
LOG_WARN("failed to add column idx", K(ret), K(idx));
}
}
if (FAILEDx(compaction::ObCSReplicaChecksumHelper::check_column_type(tablet_id_,
compaction_scn_.get_val_for_tx(),
column_idxs,
is_large_text_column))) {
LOG_WARN("failed to check column type for cs replica", K(ret), KPC(this), K(other));
} else if (is_large_text_column) {
// do nothing
} else {
ret = OB_CHECKSUM_ERROR;
}
}

View File

@ -44,6 +44,7 @@
#include "storage/compaction/ob_tenant_medium_checker.h"
#include "storage/compaction/ob_batch_freeze_tablets_dag.h"
#include "storage/ob_gc_upper_trans_helper.h"
#include "share/schema/ob_tenant_schema_service.h"
namespace oceanbase
{
@ -241,6 +242,75 @@ void ObFastFreezeChecker::try_update_tablet_threshold(
}
/*******************************************ObCSReplicaChecksumHelper impl*****************************************/
int ObCSReplicaChecksumHelper::check_column_type(
const common::ObTabletID &tablet_id,
const int64_t compaction_scn,
const common::ObIArray<int64_t> &column_idxs,
bool &is_large_text_column)
{
int ret = OB_SUCCESS;
share::ObFreezeInfo freeze_info;
uint64_t table_id = 0;
schema::ObMultiVersionSchemaService *schema_service = nullptr;
schema::ObSchemaGetterGuard schema_guard;
ObSEArray<ObColDesc, 16> column_descs;
int64_t save_schema_version = 0;
const ObTableSchema *table_schema = nullptr;
is_large_text_column = true;
if (OB_UNLIKELY(!tablet_id.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid arguments", K(ret), K(tablet_id), K(column_idxs));
} else if (OB_UNLIKELY(column_idxs.empty())) {
// do nothing
} else if (OB_FAIL(MTL(ObTenantFreezeInfoMgr *)->get_freeze_info_by_snapshot_version(compaction_scn, freeze_info))) {
LOG_WARN("failed to get major freeze info", K(ret), K(compaction_scn));
} else if (FALSE_IT(save_schema_version = freeze_info.schema_version_)) {
} else if (OB_ISNULL(schema_service = MTL(ObTenantSchemaService *)->get_schema_service())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null schema service", K(ret));
} else if (OB_FAIL(ObMediumCompactionScheduleFunc::get_table_id(*schema_service,
tablet_id,
freeze_info.schema_version_,
table_id))) {
LOG_WARN("failed to get table id", K(ret), K(tablet_id));
} else if (OB_FAIL(MTL(ObTenantSchemaService *)->get_schema_service()->retry_get_schema_guard(MTL_ID(),
freeze_info.schema_version_,
table_id,
schema_guard,
save_schema_version))) {
LOG_WARN("failed to get schema guard", K(ret));
} else if (OB_UNLIKELY(save_schema_version < freeze_info.schema_version_)) {
ret = OB_SCHEMA_ERROR;
LOG_WARN("can not use older schema version", K(ret), K(freeze_info), K(save_schema_version), K(table_id));
} else if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), table_id, table_schema))) {
LOG_WARN("Fail to get table schema", K(ret), K(table_id));
} else if (NULL == table_schema) {
ret = OB_TABLE_IS_DELETED;
LOG_WARN("table is deleted", K(ret), K(table_id));
} else if (OB_FAIL(table_schema->get_multi_version_column_descs(column_descs))) {
LOG_WARN("failed to get multi version column descs", K(ret), K(tablet_id), KPC(table_schema));
} else {
for (int64_t idx = 0; is_large_text_column && OB_SUCC(ret) && idx < column_idxs.count(); ++idx) {
const int64_t cur_col_idx = column_idxs.at(idx);
const int64_t column_id = column_descs.at(cur_col_idx).col_id_;
const ObColumnSchemaV2 *col_schema = table_schema->get_column_schema(column_id);
if (OB_ISNULL(col_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null col schema", K(ret));
} else {
is_large_text_column = ob_is_large_text(col_schema->get_data_type());
LOG_DEBUG("check column type for cs replica", K(cur_col_idx), KPC(col_schema),
"rowkey_cnt", table_schema->get_rowkey_column_num(), KPC(table_schema));
}
}
}
return ret;
}
/********************************************ObTenantTabletScheduler impl******************************************/
constexpr ObMergeType ObTenantTabletScheduler::MERGE_TYPES[];

View File

@ -127,6 +127,18 @@ private:
common::hash::ObHashMap<ObTabletID, ProhibitFlag> tablet_id_map_; // tablet is used for transfer of medium compaction
};
struct ObCSReplicaChecksumHelper
{
public:
static int check_column_type(
const common::ObTabletID &tablet_id,
const int64_t compaction_scn,
const common::ObIArray<int64_t> &column_idxs,
bool &is_large_text_column);
};
class ObTenantTabletScheduler : public ObBasicMergeScheduler
{
public:

View File

@ -18,6 +18,7 @@
#include "share/schema/ob_schema_struct.h"
#include "storage/ob_storage_struct.h"
#include "storage/column_store/ob_column_store_replica_util.h"
#include "share/ob_cluster_version.h"
namespace oceanbase
{
@ -1530,6 +1531,7 @@ int ObStorageSchema::generate_column_array(const ObTableSchema &input_schema)
int ret = OB_SUCCESS;
// build column schema map
common::hash::ObHashMap<uint64_t, uint64_t> tmp_map; // column_id -> index
if (OB_FAIL(tmp_map.create(input_schema.get_column_count(), "StorageSchema"))) {
STORAGE_LOG(WARN, "failed to create map", K(ret));
} else if (OB_FAIL(input_schema.check_column_array_sorted_by_column_id(true/*skip_rowkey*/))) {
@ -1831,6 +1833,8 @@ int ObStorageSchema::init_column_meta_array(
{
int ret = OB_SUCCESS;
ObArray<ObColDesc> columns;
uint64_t compat_version = 0;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "not inited", K(ret), K_(is_inited));
@ -1839,6 +1843,8 @@ int ObStorageSchema::init_column_meta_array(
STORAGE_LOG(WARN, "not support get multi version column desc array when column simplified", K(ret), KPC(this));
} else if (OB_FAIL(get_multi_version_column_descs(columns))) {
STORAGE_LOG(WARN, "fail to get store column ids", K(ret));
} else if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), compat_version))) {
STORAGE_LOG(WARN, "failed to get min data version", K(ret));
} else {
// build column schema map
common::hash::ObHashMap<uint64_t, uint64_t> tmp_map; // column_id -> index
@ -1873,6 +1879,13 @@ int ObStorageSchema::init_column_meta_array(
if (!col_schema.is_column_stored_in_sstable_ && !is_storage_index_table()) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "virtual generated column should be filtered already", K(ret), K(col_schema));
} else if (ob_is_large_text(col_schema.get_data_type()) && compat_version < DATA_VERSION_4_3_4_0) {
blocksstable::ObStorageDatum datum;
if (OB_FAIL(datum.from_obj_enhance(col_schema.get_orig_default_value()))) {
STORAGE_LOG(WARN, "Failed to transfer obj to datum", K(ret));
} else {
col_meta.column_default_checksum_ = datum.checksum(0);
}
} else {
col_meta.column_default_checksum_ = col_schema.default_checksum_;
}