fix cs replica lob column checksum

This commit is contained in:
Tsunaou 2024-12-12 09:18:37 +00:00 committed by ob-robot
parent 781cdf1f31
commit 172336d992
8 changed files with 168 additions and 47 deletions

View File

@ -246,12 +246,15 @@ int ObCOTabletMergeCtx::check_convert_co_checksum(const ObSSTable *new_sstable)
LOG_WARN("failed to fill column ckm array", K(ret), KPC(co_sstable));
} else if (row_column_checksums.count() != col_column_checksums.count()) {
ret = OB_CHECKSUM_ERROR;
LOG_WARN("column count not match", K(ret), K(row_column_checksums), K(col_column_checksums));
LOG_WARN("column count not match", K(ret), "row_column_ckm_cnt", row_column_checksums.count(), "col_column_ckm_cnt", col_column_checksums.count(),
K(row_column_checksums), K(col_column_checksums), KPC(row_sstable)); // only print partia ObIArray; co_sstable will be printed before.
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < row_column_checksums.count(); ++i) {
if (row_column_checksums.at(i) != col_column_checksums.at(i)) {
ret = OB_CHECKSUM_ERROR;
LOG_ERROR("column checksum error match", K(ret), K(i), K(row_column_checksums), K(col_column_checksums));
LOG_ERROR("column checksum error match", K(ret), "row_ckm", row_column_checksums.at(i) , "col_ckm", col_column_checksums.at(i),
K(i), "row_column_ckm_cnt", row_column_checksums.count(), "col_column_ckm_cnt", col_column_checksums.count(),
K(row_column_checksums), K(col_column_checksums), KPC(row_sstable));
}
}
}

View File

@ -1447,9 +1447,14 @@ int ObBasicTabletMergeCtx::get_convert_compaction_info()
ObStorageSchema *schema_for_merge = nullptr;
ObUpdateCSReplicaSchemaParam param;
bool generate_cs_replica_cg_array = false;
uint64_t min_data_version = 0;
int64_t schema_stored_column_cnt = 0;
int64_t base_major_column_cnt = 0; // include 2 multi version column
if (OB_FAIL(OB_UNLIKELY(EN_COMPACTION_DISABLE_CONVERT_CO))) {
LOG_INFO("EN_COMPACTION_DISABLE_CONVERT_CO: disable convert co merge", K(ret));
} else if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), min_data_version))) {
LOG_WARN("failed to get min data version", K(ret));
} else if (OB_FAIL(static_param_.tablet_schema_guard_.load(schema_on_tablet))) {
LOG_WARN("failed to load schema on tablet", K(ret), KPC(tablet));
} else if (OB_UNLIKELY(!is_convert_co_major_merge(get_merge_type()) || OB_ISNULL(schema_on_tablet) || OB_ISNULL(tablet))) {
@ -1457,22 +1462,39 @@ int ObBasicTabletMergeCtx::get_convert_compaction_info()
LOG_WARN("get unexpected static param", K(ret), KPC(schema_on_tablet), K_(static_param), KPC(tablet));
} else if (OB_FAIL(ObStorageSchemaUtil::alloc_storage_schema(mem_ctx_.get_allocator(), schema_for_merge))) {
LOG_WARN("failed to alloc storage schema", K(ret));
} else if (schema_on_tablet->is_column_info_simplified() && OB_FAIL(param.init(*tablet))) {
LOG_WARN("failed to init param", K(ret), KPC(tablet));
} else if (FALSE_IT(generate_cs_replica_cg_array = (schema_on_tablet->is_row_store() || schema_on_tablet->is_column_info_simplified() || schema_on_tablet->need_generate_cg_array()))) {
} else if (OB_FAIL(tablet->get_valid_last_major_column_count(base_major_column_cnt))) {
LOG_WARN("failed to get valid last major column count", K(ret), KPC(tablet));
} else if (schema_on_tablet->is_column_info_simplified()) {
if (OB_FAIL(param.init(tablet->get_tablet_id(), base_major_column_cnt, ObUpdateCSReplicaSchemaParam::REFRESH_TABLE_SCHEMA))) {
LOG_WARN("failed to init param", K(ret), KPC(tablet));
} else {
generate_cs_replica_cg_array = true;
}
} else if (OB_FAIL(schema_on_tablet->get_stored_column_count_in_sstable(schema_stored_column_cnt))) {
LOG_WARN("failed to get stored column count in sstable", K(ret), KPC(schema_on_tablet));
} else if (schema_stored_column_cnt > base_major_column_cnt) {
if (OB_FAIL(param.init(tablet->get_tablet_id(), base_major_column_cnt, ObUpdateCSReplicaSchemaParam::TRUNCATE_COLUMN_ARRAY))) {
LOG_WARN("failed to init param", K(ret), KPC(tablet));
} else {
generate_cs_replica_cg_array = true;
}
}
if (OB_FAIL(ret)) {
} else if (FALSE_IT(generate_cs_replica_cg_array |= (schema_on_tablet->is_row_store() || schema_on_tablet->need_generate_cg_array()))) {
// 1. storage schema is column store but simplifed, it should become not simplified before it can be used for merge
// 2. if need generate cg array (column group cnt <= column cnt), need generate cg array from the latest column array
} else if (OB_FAIL(schema_for_merge->init(mem_ctx_.get_allocator(), *schema_on_tablet,
false /*skip_column_info*/, nullptr /*column_group_schema*/, generate_cs_replica_cg_array,
schema_on_tablet->is_column_info_simplified() ? &param : nullptr))) {
param.is_valid() ? &param : nullptr))) {
LOG_WARN("failed to init storage schema for convert co major merge", K(ret), K(tablet), KPC(schema_on_tablet));
} else {
static_param_.schema_ = schema_for_merge;
static_param_.data_version_ = DATA_CURRENT_VERSION;
static_param_.data_version_ = min_data_version;
static_param_.is_rebuild_column_store_ = true;
static_param_.is_schema_changed_ = true; // use MACRO_BLOCK_MERGE_LEVEL
static_param_.merge_reason_ = ObAdaptiveMergePolicy::REBUILD_COLUMN_GROUP;
FLOG_INFO("[CS-Replica] get storage schema to convert co merge", "param", get_dag_param(), KPC_(static_param_.schema));
FLOG_INFO("[CS-Replica] get storage schema to convert co merge", K(param), K(generate_cs_replica_cg_array), "dag_param", get_dag_param(), KPC_(static_param_.schema));
}
if (OB_FAIL(ret) && OB_NOT_NULL(schema_for_merge)) {

View File

@ -186,6 +186,25 @@ int ObLobManager::fill_lob_header(ObIAllocator &allocator, ObString &data, ObStr
return ret;
}
// Only use for default lob col val.
int ObLobManager::fill_lob_header(
ObIAllocator &allocator,
ObStorageDatum &datum)
{
int ret = OB_SUCCESS;
if (datum.is_null() || datum.is_nop_value()) {
} else {
ObString data = datum.get_string();
ObString out;
if (OB_FAIL(ObLobManager::fill_lob_header(allocator, data, out))) {
LOG_WARN("failed to fill lob header for column.", K(data));
} else {
datum.set_string(out);
}
}
return ret;
}
// Only use for default lob col val
int ObLobManager::fill_lob_header(ObIAllocator &allocator,
const ObIArray<share::schema::ObColDesc> &column_ids,
@ -194,15 +213,8 @@ int ObLobManager::fill_lob_header(ObIAllocator &allocator,
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < column_ids.count(); ++i) {
if (column_ids.at(i).col_type_.is_lob_storage()) {
if (datum_row.storage_datums_[i].is_null() || datum_row.storage_datums_[i].is_nop_value()) {
} else {
ObString data = datum_row.storage_datums_[i].get_string();
ObString out;
if (OB_FAIL(ObLobManager::fill_lob_header(allocator, data, out))) {
LOG_WARN("failed to fill lob header for column.", K(i), K(column_ids), K(data));
} else {
datum_row.storage_datums_[i].set_string(out);
}
if (OB_FAIL(fill_lob_header(allocator, datum_row.storage_datums_[i]))) {
LOG_WARN("failed to fill lob header for column.", K(i), K(column_ids), K(datum_row));
}
}
}

View File

@ -71,6 +71,7 @@ public:
// Only use for default lob col val
static int fill_lob_header(ObIAllocator &allocator, ObString &data, ObString &out);
static int fill_lob_header(ObIAllocator &allocator, ObStorageDatum &datum);
static int fill_lob_header(ObIAllocator &allocator,
const ObIArray<share::schema::ObColDesc> &column_ids,
blocksstable::ObDatumRow &datum_row);

View File

@ -400,6 +400,7 @@ int ObStorageColumnGroupSchema::copy_from(ObIArray<ObColDesc> &column_ids,
ObUpdateCSReplicaSchemaParam::ObUpdateCSReplicaSchemaParam()
: tablet_id_(),
major_column_cnt_(0),
update_type_(UpdateType::MAX_TYPE),
is_inited_(false)
{
@ -410,25 +411,23 @@ ObUpdateCSReplicaSchemaParam::~ObUpdateCSReplicaSchemaParam()
}
int ObUpdateCSReplicaSchemaParam::init(const ObTablet &tablet)
int ObUpdateCSReplicaSchemaParam::init(
const ObTabletID &tablet_id,
const int64_t major_column_cnt,
const UpdateType update_type)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
STORAGE_LOG(WARN, "ObUpdateCSReplicaSchemaParam has been inited", K(ret), KPC(this));
} else if (tablet.get_last_major_column_count() < 0) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "invalid last major column count", K(ret), K(tablet));
} else if (tablet.get_last_major_column_count() == 0) {
if (tablet.get_tablet_meta().table_store_flag_.with_major_sstable()) {
ret = OB_EAGAIN;
} else {
ret = OB_ERR_UNEXPECTED;
}
STORAGE_LOG(WARN, "tablet has no major sstable", K(ret), K(tablet));
} else if (OB_UNLIKELY(!tablet_id.is_valid() || update_type >= UpdateType::MAX_TYPE
|| major_column_cnt <= ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt())) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(tablet_id), K(major_column_cnt), K(update_type));
} else {
tablet_id_ = tablet.get_tablet_meta().tablet_id_;
major_column_cnt_ = tablet.get_last_major_column_count();
tablet_id_ = tablet_id;
major_column_cnt_ = major_column_cnt;
update_type_ = update_type;
is_inited_ = true;
}
return ret;
@ -438,7 +437,9 @@ bool ObUpdateCSReplicaSchemaParam::is_valid() const
{
return is_inited_
&& tablet_id_.is_valid()
&& major_column_cnt_ > 0;
&& major_column_cnt_ > ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt()
&& update_type_ >= UpdateType::REFRESH_TABLE_SCHEMA
&& update_type_ < UpdateType::MAX_TYPE;
}
/*
@ -582,11 +583,21 @@ int ObStorageSchema::init(
} else if (OB_FAIL(copy_from(old_schema))) {
STORAGE_LOG(WARN, "failed to copy from old schema", K(ret), K(old_schema));
} else if (FALSE_IT(column_info_simplified_ = (skip_column_info || old_schema.column_info_simplified_))) {
} else if (OB_UNLIKELY((generate_cs_replica_cg_array && column_group_schema != nullptr)
|| (nullptr != update_param && !column_info_simplified_)
|| (nullptr != update_param && nullptr != column_group_schema))) {
} else if (OB_UNLIKELY(generate_cs_replica_cg_array && column_group_schema != nullptr)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument to init storage schema", K(ret), K(column_info_simplified_), K(column_group_schema), KPC(update_param));
STORAGE_LOG(WARN, "invalid args to init storage schema", K(ret), KPC(column_group_schema));
} else if (nullptr != update_param) {
if (OB_UNLIKELY(!update_param->is_valid() || nullptr != column_group_schema)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid args", K(ret), K(update_param), KPC(column_group_schema));
} else if (OB_UNLIKELY(update_param->need_refresh_schema() && !column_info_simplified_
|| update_param->need_truncate_column_array() && column_info_simplified_)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid args", K(ret), K(update_param), K(skip_column_info), K(old_schema));
}
}
if (OB_FAIL(ret)) {
} else {
allocator_ = &allocator;
rowkey_array_.set_allocator(&allocator);
@ -680,16 +691,46 @@ int ObStorageSchema::rebuild_column_array(
const ObUpdateCSReplicaSchemaParam &update_param)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!update_param.is_valid() || !src_schema.is_column_info_simplified())) {
int64_t expected_stored_column_cnt = 0;
if (OB_UNLIKELY(!update_param.is_valid())) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(update_param), K(src_schema));
} else if (OB_FAIL(ObCSReplicaUtil::get_full_column_array_from_table_schema(allocator, update_param, src_schema, column_array_))) {
STORAGE_LOG(WARN, "failed to get full column array from table schema", K(ret), K(update_param));
} else {
column_info_simplified_ = false;
column_cnt_ = column_array_.count();
store_column_cnt_ = update_param.major_column_cnt_ - ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt();
STORAGE_LOG(INFO, "rebuild column array from table schema", K(ret), K(update_param), K(src_schema), K_(column_array));
STORAGE_LOG(WARN, "invalid argument", K(ret), K(update_param));
} else if (FALSE_IT(expected_stored_column_cnt = update_param.major_column_cnt_ - ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt())) {
} else if (update_param.need_refresh_schema()) {
if (OB_UNLIKELY(!src_schema.is_column_info_simplified())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "src schema is not simplified", K(ret), K(src_schema));
} else if (OB_FAIL(ObCSReplicaUtil::get_full_column_array_from_table_schema(allocator, update_param, src_schema, column_array_))) {
STORAGE_LOG(WARN, "failed to get full column array from table schema", K(ret), K(update_param));
} else {
column_info_simplified_ = false;
column_cnt_ = column_array_.count();
store_column_cnt_ = expected_stored_column_cnt;
STORAGE_LOG(INFO, "rebuild column array from table schema", K(ret), K(update_param), K(src_schema), K_(column_array));
}
} else if (update_param.need_truncate_column_array()) {
const int64_t original_column_cnt = column_array_.count();
int64_t stored_column_cnt = 0;
bool finish_truncate = false;
for (int64_t i = 0; i < original_column_cnt; ++i) {
if (column_array_.at(i).is_column_stored_in_sstable()) {
stored_column_cnt++;
if (stored_column_cnt == expected_stored_column_cnt) {
for (int64_t j = i + 1; j < original_column_cnt; ++j) {
column_array_.pop_back();
}
finish_truncate = true;
column_cnt_ = column_array_.count();
store_column_cnt_ = expected_stored_column_cnt;
STORAGE_LOG(INFO, "truncate column array for convert co merge", K(ret), K(update_param), K_(column_array));
break;
}
}
}
if (!finish_truncate) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "failed to truncate column array", K(ret), K(update_param), K_(column_array));
}
}
return ret;
}
@ -1593,6 +1634,9 @@ int ObStorageSchema::generate_column_array(const ObTableSchema &input_schema)
col_schema.meta_type_ = meta_type;
if (OB_FAIL(datum.from_obj_enhance(col->get_orig_default_value()))) {
STORAGE_LOG(WARN, "Failed to transfer obj to datum", K(ret));
} else if (ob_is_large_text(col->get_data_type()) && !datum.has_lob_header()
&& OB_FAIL(ObLobManager::fill_lob_header(*allocator_, datum))) {
STORAGE_LOG(WARN, "failed to fill lob header", K(ret), K(datum));
} else {
col_schema.default_checksum_ = datum.checksum(0);
#ifdef ERRSIM

View File

@ -174,15 +174,27 @@ public:
struct ObUpdateCSReplicaSchemaParam
{
public:
enum UpdateType : uint8_t {
REFRESH_TABLE_SCHEMA = 0, // storage schema is simplified, need get storage schema from table schema
TRUNCATE_COLUMN_ARRAY = 1, // column array in tablet storage schema is newer than last row store major, need truncate
MAX_TYPE
};
public:
ObUpdateCSReplicaSchemaParam();
~ObUpdateCSReplicaSchemaParam();
int init(const ObTablet &tablet);
int init(
const ObTabletID &tablet_id,
const int64_t major_column_cnt,
const UpdateType update_type);
bool is_valid() const;
TO_STRING_KV(K_(tablet_id), K_(major_column_cnt), K_(is_inited));
inline bool need_refresh_schema() const { return REFRESH_TABLE_SCHEMA == update_type_; }
inline bool need_truncate_column_array() const { return TRUNCATE_COLUMN_ARRAY == update_type_; }
TO_STRING_KV(K_(tablet_id), K_(major_column_cnt), K_(update_type), K_(is_inited));
public:
ObTabletID tablet_id_;
int64_t major_column_cnt_;
UpdateType update_type_;
bool is_inited_;
};

View File

@ -1451,7 +1451,10 @@ int ObTablet::init_with_replace_members(
} else if (old_storage_schema->is_column_info_simplified()) {
ObUpdateCSReplicaSchemaParam param;
ObStorageSchema *full_storage_schema = nullptr;
if (OB_FAIL(param.init(*this))) {
int64_t base_major_column_cnt = 0;
if (OB_FAIL(get_valid_last_major_column_count(base_major_column_cnt))) {
LOG_WARN("failed to get valid last major column count", K(ret), KPC(this));
} else if (OB_FAIL(param.init(get_tablet_id(), base_major_column_cnt, ObUpdateCSReplicaSchemaParam::REFRESH_TABLE_SCHEMA))) {
LOG_WARN("fail to init param", K(ret), KPC(this));
} else if (OB_FAIL(ObCSReplicaUtil::get_rebuild_storage_schema(allocator, param, *old_storage_schema, full_storage_schema))) {
LOG_WARN("failed to get rebuild storage schema", K(ret), K(param), KPC(old_storage_schema));
@ -6906,6 +6909,29 @@ int ObTablet::scan_mds_table_with_op(
return ret;
}
int ObTablet::get_valid_last_major_column_count(int64_t &last_major_column_cnt) const
{
int ret = OB_SUCCESS;
last_major_column_cnt = 0;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", KR(ret), K_(is_inited));
} else if (OB_UNLIKELY(get_last_major_column_count() < 0)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "invalid last major column count", K(ret), KPC(this));
} else if (get_last_major_column_count() == 0) {
if (tablet_meta_.table_store_flag_.with_major_sstable()) {
ret = OB_EAGAIN;
} else {
ret = OB_ERR_UNEXPECTED;
}
STORAGE_LOG(WARN, "tablet has no major sstable", K(ret), KPC(this));
} else {
last_major_column_cnt = get_last_major_column_count();
}
return ret;
}
int ObTablet::get_storage_schema_for_transfer_in(
common::ObArenaAllocator &allocator,
ObStorageSchema &storage_schema) const

View File

@ -184,6 +184,7 @@ public:
int scan_mds_table_with_op(
const int64_t mds_construct_sequence,
ObMdsMiniMergeOperator &op) const;
int get_valid_last_major_column_count(int64_t &last_major_column_cnt) const;
public:
// first time create tablet