[CP] move is_cs_replica_compat from storage schema to rowkey_read_info in ObTablet

This commit is contained in:
Tsunaou 2024-09-30 10:46:01 +00:00 committed by ob-robot
parent 3d555f8db1
commit bc498ed2a8
17 changed files with 113 additions and 80 deletions

View File

@ -218,7 +218,7 @@ TEST_F(TestLSMigrationParam, test_placeholder_storage_schema)
placeholder_tablet.tablet_meta_.compat_mode_ = lib::Worker::get_compatibility_mode();
ASSERT_NE(nullptr, ptr = allocator.alloc(sizeof(ObRowkeyReadInfo)));
placeholder_tablet.rowkey_read_info_ = new (ptr) ObRowkeyReadInfo();
placeholder_tablet.build_read_info(allocator);
placeholder_tablet.build_read_info(allocator, nullptr /*tablet*/, false /*is_cs_replica_compat*/);
ASSERT_EQ(OB_SUCCESS, ret);
}

View File

@ -969,10 +969,11 @@ int ObTableParam::construct_columns_and_projector(
} else if (!is_cs && query_cs_replica) {
is_cs = true;
is_column_replica_table_ = true;
has_all_column_group = false;
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(table_schema.has_all_column_group(has_all_column_group))) {
} else if (!is_column_replica_table_ && OB_FAIL(table_schema.has_all_column_group(has_all_column_group))) {
LOG_WARN("Failed to check if has all column group", K(ret));
} else {
// column array

View File

@ -270,6 +270,8 @@ int ObTableAccessParam::init(
} else if (OB_UNLIKELY(nullptr == rowkey_read_info && nullptr == tablet_handle)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", K(ret), KP(rowkey_read_info), KP(tablet_handle));
} else if (OB_NOT_NULL(tablet_handle) && OB_FAIL(check_valid_before_query_init(*scan_param.table_param_, *tablet_handle))) {
LOG_WARN("failed to check cs replica compat schema", K(ret), KPC(tablet_handle));
} else {
const share::schema::ObTableParam &table_param = *scan_param.table_param_;
iter_param_.table_id_ = table_param.get_table_id();
@ -366,6 +368,22 @@ int ObTableAccessParam::init(
return ret;
}
int ObTableAccessParam::check_valid_before_query_init(
const ObTableParam &table_param,
const ObTabletHandle &tablet_handle)
{
int ret = OB_SUCCESS;
ObTablet *tablet = nullptr;
if (OB_UNLIKELY(!tablet_handle.is_valid() || OB_ISNULL(tablet = tablet_handle.get_obj()))) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid table handle", K(ret), K(tablet_handle), KPC(tablet));
} else if (OB_UNLIKELY(tablet->is_cs_replica_compat() && !table_param.is_column_replica_table())) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid table param for cs replica tablet", K(ret), K(table_param), KPC(tablet));
}
return ret;
}
int ObTableAccessParam::get_prefix_cnt_for_skip_scan(const ObTableScanParam &scan_param, ObTableIterParam &iter_param)
{
int ret = OB_SUCCESS;

View File

@ -268,6 +268,8 @@ public:
OB_INLINE void set_use_global_iter_pool() { iter_param_.set_use_global_iter_pool(); }
OB_INLINE void diable_use_global_iter_pool() { iter_param_.diable_use_global_iter_pool(); }
OB_INLINE bool is_use_global_iter_pool() const { return iter_param_.is_use_global_iter_pool(); }
private:
int check_valid_before_query_init(const ObTableParam &table_param, const ObTabletHandle &tablet_handle);
public:
DECLARE_TO_STRING;
public:

View File

@ -227,6 +227,7 @@ void ObReadInfoStruct::reset()
allocator_ = nullptr;
schema_column_count_ = 0;
compat_version_ = READ_INFO_VERSION_V3;
is_cs_replica_compat_ = false;
reserved_ = 0;
schema_rowkey_cnt_ = 0;
rowkey_cnt_ = 0;
@ -239,12 +240,14 @@ void ObReadInfoStruct::reset()
void ObReadInfoStruct::init_basic_info(const int64_t schema_column_count,
const int64_t schema_rowkey_cnt,
const bool is_oracle_mode,
const bool is_cg_sstable) {
const bool is_cg_sstable,
const bool is_cs_replica_compat) {
const int64_t extra_rowkey_cnt = is_cg_sstable ? 0 : storage::ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt();
schema_column_count_ = schema_column_count;
schema_rowkey_cnt_ = schema_rowkey_cnt;
rowkey_cnt_ = schema_rowkey_cnt + extra_rowkey_cnt;
is_oracle_mode_ = is_oracle_mode;
is_cs_replica_compat_ = is_cs_replica_compat;
}
int ObReadInfoStruct::generate_for_column_store(ObIAllocator &allocator,
@ -282,6 +285,7 @@ int64_t ObReadInfoStruct::to_string(char *buf, const int64_t buf_len) const
} else {
J_OBJ_START();
J_KV(K_(is_inited), K_(compat_version), K_(is_oracle_mode),
K_(is_cs_replica_compat),
K_(schema_column_count),
K_(schema_rowkey_cnt),
K_(rowkey_cnt),
@ -349,7 +353,7 @@ int ObTableReadInfo::mock_for_sstable_query(
LOG_WARN("failed to pre check", K(ret));
} else if (OB_FAIL(init_compat_version())) { // init compat verion
LOG_WARN("failed to init compat version", KR(ret));
} else if (FALSE_IT(init_basic_info(schema_column_count, schema_rowkey_cnt, is_oracle_mode, is_cg_sstable))) { // init basic info
} else if (FALSE_IT(init_basic_info(schema_column_count, schema_rowkey_cnt, is_oracle_mode, is_cg_sstable, false /*is_cs_replica_compat*/))) { // init basic info
} else if (OB_FAIL(cols_desc_.init_and_assign(cols_desc, allocator))) {
LOG_WARN("Fail to assign cols_desc", K(ret));
} else if (OB_FAIL(cols_index_.init_and_assign(storage_cols_index, allocator))) {
@ -420,7 +424,7 @@ int ObTableReadInfo::init(
LOG_WARN("failed to pre check", K(ret));
} else if (OB_FAIL(init_compat_version())) { // init compat verion
LOG_WARN("failed to init compat version", KR(ret));
} else if (FALSE_IT(init_basic_info(schema_column_count, schema_rowkey_cnt, is_oracle_mode, is_cg_sstable))) { // init basic info
} else if (FALSE_IT(init_basic_info(schema_column_count, schema_rowkey_cnt, is_oracle_mode, is_cg_sstable, false /*is_cs_replica_compat*/))) { // init basic info
} else if (OB_FAIL(ObReadInfoStruct::prepare_arrays(allocator, cols_desc, cols_desc.count()))) {
LOG_WARN("failed to prepare arrays", K(ret), K(cols_desc.count()));
} else if (nullptr != cols_param && OB_FAIL(cols_param_.init_and_assign(*cols_param, allocator))) {
@ -784,7 +788,8 @@ int ObRowkeyReadInfo::init(
const bool is_oracle_mode,
const common::ObIArray<ObColDesc> &rowkey_col_descs,
const bool is_cg_sstable,
const bool use_default_compat_version)
const bool use_default_compat_version,
const bool is_cs_replica_compat)
{
int ret = OB_SUCCESS;
const int64_t extra_rowkey_cnt = is_cg_sstable ? 0: storage::ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt();
@ -803,7 +808,7 @@ int ObRowkeyReadInfo::init(
LOG_WARN("failed to init compat version", KR(ret));
}
if (OB_SUCC(ret)) {
init_basic_info(schema_column_count, schema_rowkey_cnt, is_oracle_mode, is_cg_sstable); // init basic info
init_basic_info(schema_column_count, schema_rowkey_cnt, is_oracle_mode, is_cg_sstable, is_cs_replica_compat); // init basic info
if (OB_FAIL(prepare_arrays(allocator, rowkey_col_descs, out_cols_cnt))) {
LOG_WARN("failed to prepare arrays", K(ret), K(out_cols_cnt));
} else if (OB_FAIL(datum_utils_.init(cols_desc_, schema_rowkey_cnt_, is_oracle_mode_, allocator, is_cg_sstable))) {
@ -1170,7 +1175,8 @@ int ObTenantCGReadInfoMgr::construct_index_read_info(ObIAllocator &allocator, Ob
lib::is_oracle_mode(),
idx_cols_desc,
true, /* is_cg_sstable */
true /* use_default_compat_version */))) {
true /* use_default_compat_version */,
false /* is_cs_replica_compat */))) {
STORAGE_LOG(WARN, "Fail to init mtl index read info", K(ret));
}

View File

@ -149,6 +149,7 @@ public:
allocator_(nullptr),
schema_column_count_(0),
compat_version_(READ_INFO_VERSION_V3),
is_cs_replica_compat_(false),
reserved_(0),
schema_rowkey_cnt_(0),
rowkey_cnt_(0),
@ -230,6 +231,7 @@ public:
OB_ASSERT_MSG(false, "ObReadInfoStruct dose not promise all column group");
return false;
}
OB_INLINE bool is_cs_replica_compat() const { return is_cs_replica_compat_; }
DECLARE_VIRTUAL_TO_STRING;
int generate_for_column_store(ObIAllocator &allocator,
const ObColDesc &desc,
@ -237,7 +239,8 @@ public:
void init_basic_info(const int64_t schema_column_count,
const int64_t schema_rowkey_cnt,
const bool is_oracle_mode,
const bool is_cg_sstable);
const bool is_cg_sstable,
const bool is_cs_replica_compat);
int prepare_arrays(common::ObIAllocator &allocator,
const common::ObIArray<ObColDesc> &cols_desc,
const int64_t col_cnt);
@ -247,6 +250,9 @@ protected:
static const int64_t READ_INFO_VERSION_V1 = 1;
static const int64_t READ_INFO_VERSION_V2 = 2;
static const int64_t READ_INFO_VERSION_V3 = 3;
static const int32_t READ_INFO_ONE_BIT = 1;
static const int32_t READ_INFO_RESERVED_BITS = 15;
bool is_inited_;
bool is_oracle_mode_;
ObIAllocator *allocator_;
@ -256,7 +262,8 @@ protected:
struct {
uint32_t schema_column_count_;
uint16_t compat_version_;
uint16_t reserved_;
uint16_t is_cs_replica_compat_ : READ_INFO_ONE_BIT; // only used for rowkey_read_info in ObTablet
uint16_t reserved_ : READ_INFO_RESERVED_BITS;
};
};
int64_t schema_rowkey_cnt_;
@ -396,7 +403,8 @@ public:
const bool is_oracle_mode,
const common::ObIArray<ObColDesc> &rowkey_col_descs,
const bool is_cg_sstable = false,
const bool use_default_compat_version = false);
const bool use_default_compat_version = false,
const bool is_cs_replica_compat = false);
OB_INLINE virtual int64_t get_seq_read_column_count() const override
{ return get_request_count(); }
OB_INLINE virtual int64_t get_trans_col_index() const override

View File

@ -172,11 +172,14 @@ int ObCOTabletMergeCtx::prepare_cs_replica_param()
ObStorageSchema *schema_on_tablet = nullptr;
ObSSTable *sstable = nullptr;
if (static_param_.ls_handle_.get_ls()->is_cs_replica()) {
if (OB_FAIL(static_param_.tablet_schema_guard_.init(tablet_handle_, mem_ctx_))) {
if (OB_UNLIKELY(!tablet_handle_.is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tablet handle is invalid", K(ret), K_(tablet_handle));
} else if (OB_FAIL(static_param_.tablet_schema_guard_.init(tablet_handle_, mem_ctx_))) {
LOG_WARN("failed to init cs replica schema guard", K(ret), KPC(this));
} else if (OB_FAIL(static_param_.tablet_schema_guard_.load(schema_on_tablet))) {
LOG_WARN("failed to load schema on tablet", K(ret));
} else if (schema_on_tablet->is_cs_replica_compat()) {
} else if (tablet_handle_.get_obj()->is_cs_replica_compat()) {
static_param_.is_cs_replica_ = true;
} else if (is_convert_co_major_merge(get_merge_type())) {
static_param_.is_cs_replica_ = true;

View File

@ -106,7 +106,7 @@ int ObCSReplicaUtil::check_need_process_cs_replica(
int ret = OB_SUCCESS;
need_process_cs_replica = ls.is_cs_replica()
&& tablet_id.is_user_tablet()
&& (schema.is_row_store() || schema.is_cs_replica_compat())
&& schema.is_row_store()
&& schema.is_user_data_table();
return ret;
}
@ -118,7 +118,7 @@ int ObCSReplicaUtil::check_need_wait_major_convert(
bool &need_wait_major_convert)
{
int ret = OB_SUCCESS;
bool need_process_cs_replica = false;
bool need_process_cs_replica = tablet.is_cs_replica_compat();
ObStorageSchema *storage_schema = nullptr;
ObArenaAllocator arena_allocator(common::ObMemAttr(MTL_ID(), "CkMjrCvrt"));
ObTabletMemberWrapper<ObTabletTableStore> wrapper;
@ -130,7 +130,7 @@ int ObCSReplicaUtil::check_need_wait_major_convert(
} else if (OB_ISNULL(storage_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("storage schema is nullptr", K(ret), K(tablet));
} else if (OB_FAIL(check_need_process_cs_replica(ls, tablet_id, *storage_schema, need_process_cs_replica))) {
} else if (!need_process_cs_replica && OB_FAIL(check_need_process_cs_replica(ls, tablet_id, *storage_schema, need_process_cs_replica))) {
LOG_WARN("fail to check need process cs replica", K(ret), K(ls), K(tablet_id), KPC(storage_schema));
} else if (need_process_cs_replica) {
if (tablet.is_row_store()) {

View File

@ -521,7 +521,6 @@ int ObStorageSchema::init(
storage_schema_version_ = old_schema.storage_schema_version_;
compat_mode_ = old_schema.compat_mode_;
is_cs_replica_compat_ = old_schema.is_cs_replica_compat_;
compressor_type_ = old_schema.compressor_type_;
column_cnt_ = old_schema.column_cnt_;
store_column_cnt_ = old_schema.store_column_cnt_;
@ -544,15 +543,9 @@ int ObStorageSchema::init(
if (OB_FAIL(ObStorageSchema::generate_cs_replica_cg_array())) {
STORAGE_LOG(WARN, "failed to generate_cs_replica_cg_array", K(ret));
}
} else if (NULL != column_group_schema) {
if (OB_FAIL(deep_copy_column_group_array(allocator, *column_group_schema))) {
STORAGE_LOG(WARN, "failed to deep copy column array from column group schema", K(ret), K(old_schema), KPC(column_group_schema));
} else if (!old_schema.is_cs_replica_compat() && column_group_schema->is_cs_replica_compat()) {
// use row store storage schema from src when ls rebuild, but use column group from old tablet to init cg schemas
is_cs_replica_compat_ = true;
STORAGE_LOG(INFO, "[CS-Replica] take old schema from param and column group from old tablet in cs replica", K(ret), K(old_schema), KPC(column_group_schema));
}
} else if (OB_FAIL(deep_copy_column_group_array(allocator, old_schema))) {
} else if (NULL != column_group_schema && OB_FAIL(deep_copy_column_group_array(allocator, *column_group_schema))) {
STORAGE_LOG(WARN, "failed to deep copy column array from column group schema", K(ret), K(old_schema), KPC(column_group_schema));
} else if (NULL == column_group_schema && OB_FAIL(deep_copy_column_group_array(allocator, old_schema))) {
STORAGE_LOG(WARN, "failed to deep copy column array", K(ret), K(old_schema));
}
@ -1155,7 +1148,6 @@ int ObStorageSchema::generate_cs_replica_cg_array()
if (OB_FAIL(generate_cs_replica_cg_array(*allocator_, column_group_array_))) {
STORAGE_LOG(WARN, "Failed to generate column store cg array", K(ret), KPC(this));
} else {
is_cs_replica_compat_ = true;
STORAGE_LOG(INFO, "[CS-Replica] Success to generate cs replica cg array", K(ret), KPC(this));
}
return ret;

View File

@ -309,11 +309,10 @@ public:
inline bool is_aux_lob_meta_table() const { return share::schema::is_aux_lob_meta_table(table_type_); }
inline bool is_aux_lob_piece_table() const { return share::schema::is_aux_lob_piece_table(table_type_); }
OB_INLINE bool is_user_hidden_table() const { return share::schema::TABLE_STATE_IS_HIDDEN_MASK & table_mode_.state_flag_; }
OB_INLINE bool is_cs_replica_compat() const { return is_cs_replica_compat_; }
VIRTUAL_TO_STRING_KV(KP(this), K_(storage_schema_version), K_(version),
K_(is_use_bloomfilter), K_(column_info_simplified), K_(compat_mode), K_(table_type), K_(index_type),
K_(row_store_type), K_(schema_version), K_(is_cs_replica_compat),
K_(row_store_type), K_(schema_version),
K_(column_cnt), K_(store_column_cnt), K_(tablet_size), K_(pctfree), K_(block_size), K_(progressive_merge_round),
K_(master_key_id), K_(compressor_type), K_(encryption), K_(encrypt_key),
"rowkey_cnt", rowkey_array_.count(), K_(rowkey_array), "column_cnt", column_array_.count(), K_(column_array),
@ -362,7 +361,7 @@ public:
static const int32_t SS_ONE_BIT = 1;
static const int32_t SS_HALF_BYTE = 4;
static const int32_t SS_ONE_BYTE = 8;
static const int32_t SS_RESERVED_BITS = 17;
static const int32_t SS_RESERVED_BITS = 18;
// STORAGE_SCHEMA_VERSION is for serde compatibility.
// Currently we do not use "standard" serde function macro,
@ -386,7 +385,6 @@ public:
uint32_t compat_mode_ :SS_HALF_BYTE;
uint32_t is_use_bloomfilter_ :SS_ONE_BIT;
uint32_t column_info_simplified_ :SS_ONE_BIT;
uint32_t is_cs_replica_compat_ :SS_ONE_BIT; // for storage schema on tablet
uint32_t reserved_ :SS_RESERVED_BITS;
};
};

View File

@ -279,7 +279,8 @@ int ObMdsSchemaHelper::build_rowkey_read_info(
storage_schema.is_oracle_mode(),
cols_desc,
false/*is_cg_sstable*/,
true/*use_default_compat_version*/))) {
true/*use_default_compat_version*/,
false/*is_cs_replica_compat*/))) {
LOG_WARN("fail to init rowkey read info", K(ret));
}

View File

@ -405,7 +405,7 @@ int ObTablet::init_for_first_time_creation(
LOG_WARN("failed to init table store cache", K(ret), KPC(this));
} else if (OB_FAIL(check_sstable_column_checksum())) {
LOG_WARN("failed to check sstable column checksum", K(ret), KPC(this));
} else if (OB_FAIL(build_read_info(allocator))) {
} else if (OB_FAIL(build_read_info(allocator, nullptr /*tablet*/, need_generate_cs_replica_cg_array))) {
LOG_WARN("failed to build read info", K(ret));
} else if (OB_FAIL(init_aggregated_info(allocator, nullptr/* link_writer, tmp_tablet do no write */))) {
LOG_WARN("fail to init aggregated info", K(ret));
@ -441,6 +441,7 @@ int ObTablet::init_for_merge(
const ObTabletTableStore *old_table_store = nullptr;
ObStorageSchema *old_storage_schema = nullptr;
const bool need_report_major = param.need_report_major();
const bool is_convert_co_merge = is_convert_co_major_merge(param.compaction_info_.merge_type_);
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
@ -503,7 +504,7 @@ int ObTablet::init_for_merge(
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(build_read_info(allocator))) {
} else if (OB_FAIL(build_read_info(allocator, nullptr /*tablet*/, is_convert_co_merge ? true : old_tablet.is_cs_replica_compat()))) {
LOG_WARN("failed to build read info", K(ret));
} else if (OB_FAIL(check_medium_list())) {
LOG_WARN("failed to check medium list", K(ret), K(param), K(old_tablet));
@ -514,7 +515,7 @@ int ObTablet::init_for_merge(
} else if (FALSE_IT(set_initial_addr())) {
} else if (OB_FAIL(inner_inc_macro_ref_cnt())) {
LOG_WARN("failed to increase macro ref cnt", K(ret));
} else if (OB_FAIL(check_tablet_schema_mismatch(old_tablet, *param.storage_schema_, is_convert_co_major_merge(param.compaction_info_.merge_type_)))) {
} else if (OB_FAIL(check_tablet_schema_mismatch(old_tablet, *param.storage_schema_, is_convert_co_merge))) {
LOG_ERROR("find error while checking tablet schema mismatch", K(ret), KPC(param.storage_schema_), K(old_tablet), K(param.compaction_info_));
} else if (OB_FAIL(check_table_store_flag_match_with_table_store_(table_store_addr_.get_ptr()))) {
LOG_WARN("failed to check table store flag match with table store", K(ret), K(old_tablet), K_(table_store_addr));
@ -531,7 +532,7 @@ int ObTablet::init_for_merge(
old_tablet.tablet_meta_.report_status_.cur_report_version_, tablet_meta_.report_status_))) {
LOG_WARN("failed to init report info", K(tmp_ret));
}
} else if (is_convert_co_major_merge(param.compaction_info_.merge_type_) && FALSE_IT(tablet_meta_.report_status_.reset())) {
} else if (is_convert_co_merge && FALSE_IT(tablet_meta_.report_status_.reset())) {
// force update data checksum for cs replica migration
} else if (OB_TMP_FAIL(ObTabletMeta::init_report_info(major_table,
old_tablet.tablet_meta_.report_status_.cur_report_version_, tablet_meta_.report_status_))) {
@ -620,7 +621,7 @@ int ObTablet::init_for_shared_merge(
LOG_WARN("failed to update start scn", K(ret), K(param), K(table_store_addr_));
} else if (OB_FAIL(try_update_table_store_flag(param.get_update_with_major_flag()))) {
LOG_WARN("failed to update table store flag", K(ret), K(param), K(table_store_addr_));
} else if (OB_FAIL(build_read_info(allocator))) {
} else if (OB_FAIL(build_read_info(allocator, nullptr /*tablet*/, false /*is_cs_replica_compat*/))) {
LOG_WARN("failed to build read info", K(ret));
} else if (OB_FAIL(check_sstable_column_checksum())) {
LOG_WARN("failed to check sstable column checksum", K(ret), KPC(this));
@ -756,7 +757,7 @@ int ObTablet::init_with_migrate_param(
table_store_addr_.get_ptr()->get_minor_sstables(),
param.storage_schema_.is_row_store() && !need_process_cs_replica))) {
LOG_WARN("failed to init table store cache", K(ret), KPC(this));
} else if (OB_FAIL(build_read_info(allocator))) {
} else if (OB_FAIL(build_read_info(allocator, nullptr /*tablet*/, need_process_cs_replica || param.is_storage_schema_cs_replica_))) {
LOG_WARN("fail to build read info", K(ret));
} else if (OB_FAIL(check_medium_list())) {
LOG_WARN("failed to check medium list", K(ret), K(param));
@ -830,7 +831,7 @@ int ObTablet::init_for_defragment(
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(build_read_info(*allocator_))) {
} else if (OB_FAIL(build_read_info(*allocator_, nullptr /*tablet*/, old_tablet.is_cs_replica_compat()))) {
LOG_WARN("fail to build read info", K(ret));
} else if (OB_FAIL(check_medium_list())) {
LOG_WARN("failed to check medium list", K(ret), KPC(this));
@ -903,6 +904,7 @@ int ObTablet::init_for_sstable_replace(
int64_t finish_medium_scn = 0;
int64_t max_sync_schema_version = 0;
const bool is_tablet_split = param.tablet_split_param_.is_valid();
const bool param_is_storage_schema_cs_replica = OB_ISNULL(param.tablet_meta_) ? false : param.tablet_meta_->is_storage_schema_cs_replica_;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
@ -959,7 +961,7 @@ int ObTablet::init_for_sstable_replace(
old_tablet.tablet_meta_.extra_medium_info_, param.tablet_meta_->extra_medium_info_, finish_medium_scn))) {
} else if (!is_ls_inner_tablet() && !param.is_transfer_replace_ && !is_tablet_split && OB_FAIL(update_tablet_status_from_sstable(false/*expect_persist_status*/))) {
LOG_WARN("fail to update tablet status from sstable", K(ret));
} else if (OB_FAIL(build_read_info(*allocator_))) {
} else if (OB_FAIL(build_read_info(*allocator_, nullptr /*tablet*/, old_tablet.is_cs_replica_compat() || param_is_storage_schema_cs_replica))) {
LOG_WARN("failed to build read info", K(ret));
} else if (OB_FAIL(check_medium_list())) {
LOG_WARN("failed to check medium list", K(ret), K(param), K(old_tablet));
@ -1227,7 +1229,7 @@ int ObTablet::init_with_update_medium_info(
} else {
ALLOC_AND_INIT(allocator, storage_schema_addr_, *old_storage_schema);
if (OB_FAIL(ret)) {
} else if (OB_FAIL(build_read_info(allocator))) {
} else if (OB_FAIL(build_read_info(allocator, nullptr /*tablet*/, old_tablet.is_cs_replica_compat()))) {
LOG_WARN("failed to build read info", K(ret));
} else {
if (old_tablet.get_tablet_meta().has_next_tablet_) {
@ -1413,7 +1415,7 @@ int ObTablet::init_with_replace_members(
ALLOC_AND_INIT(allocator, storage_schema_addr_, *old_storage_schema);
}
if (FAILEDx(build_read_info(allocator))) {
if (FAILEDx(build_read_info(allocator, nullptr /*tablet*/, need_generate_cs_replica_cg_array ? true : old_tablet.is_cs_replica_compat()))) {
LOG_WARN("failed to build read info", K(ret));
} else {
if (old_tablet.get_tablet_meta().has_next_tablet_) {
@ -1488,7 +1490,7 @@ int ObTablet::init_with_mds_sstable(
table_store_addr_.get_ptr()->get_minor_sstables(),
old_tablet.table_store_cache_.is_row_store_))) {
LOG_WARN("failed to init table store cache", K(ret), KPC(this));
} else if (CLICK() && FAILEDx(build_read_info(*allocator_))) {
} else if (CLICK() && FAILEDx(build_read_info(*allocator_, nullptr /*tablet*/, old_tablet.is_cs_replica_compat()))) {
LOG_WARN("failed to build read info", K(ret));
} else if (CLICK_FAIL(check_medium_list())) {
LOG_WARN("failed to check medium list", K(ret), KPC(this));
@ -1669,7 +1671,7 @@ int ObTablet::inner_init_compat_normal_tablet(
table_store_addr_.get_ptr()->get_minor_sstables(),
old_tablet.table_store_cache_.is_row_store_))) {
LOG_WARN("failed to init table store cache", K(ret), KPC(this));
} else if (CLICK_FAIL(build_read_info(*allocator_))) {
} else if (CLICK_FAIL(build_read_info(*allocator_, nullptr /*tablet*/, old_tablet.is_cs_replica_compat()))) {
LOG_WARN("failed to build read info", K(ret));
} else if (CLICK_FAIL(check_medium_list())) {
LOG_WARN("failed to check medium list", K(ret), KPC(this));
@ -2256,7 +2258,7 @@ int ObTablet::load_deserialize_v1(
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(build_read_info(allocator))) {
} else if (OB_FAIL(build_read_info(allocator, nullptr /*tablet*/, false /*is_cs_replica_compat*/))) {
LOG_WARN("failed to build read info", K(ret));
} else if (OB_FAIL(pull_memtables(allocator))) {
LOG_WARN("fail to pull memtable", K(ret), K(len), K(pos));
@ -5349,7 +5351,10 @@ int ObTablet::init_shared_params(
return ret;
}
int ObTablet::build_read_info(common::ObArenaAllocator &allocator, const ObTablet *tablet)
int ObTablet::build_read_info(
common::ObArenaAllocator &allocator,
const ObTablet *tablet,
const bool is_cs_replica_compat)
{
int ret = OB_SUCCESS;
int64_t full_stored_col_cnt = 0;
@ -5358,6 +5363,9 @@ int ObTablet::build_read_info(common::ObArenaAllocator &allocator, const ObTable
tablet = (tablet == nullptr) ? this : tablet;
if (OB_FAIL(tablet->load_storage_schema(allocator, storage_schema))) {
LOG_WARN("fail to load storage schema", K(ret));
} else if (OB_UNLIKELY(storage_schema->is_row_store() && is_cs_replica_compat)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("mismatch storage schema and cs replica param", K(ret), KPC(storage_schema), K(is_cs_replica_compat));
} else if (OB_FAIL(storage_schema->get_mulit_version_rowkey_column_ids(cols_desc))) {
LOG_WARN("fail to get rowkey column ids", K(ret), KPC(storage_schema));
} else if (OB_FAIL(ObTabletObjLoadHelper::alloc_and_new(allocator, rowkey_read_info_))) {
@ -5368,7 +5376,10 @@ int ObTablet::build_read_info(common::ObArenaAllocator &allocator, const ObTable
full_stored_col_cnt,
storage_schema->get_rowkey_column_num(),
storage_schema->is_oracle_mode(),
cols_desc))) {
cols_desc,
false /*is_cg_sstable*/,
false /*use_default_compat_version*/,
is_cs_replica_compat))) {
LOG_WARN("fail to init rowkey read info", K(ret), KPC(storage_schema));
}
ObTabletObjLoadHelper::free(allocator, storage_schema);
@ -5506,6 +5517,8 @@ int ObTablet::build_migration_tablet_param_storage_schema(
LOG_WARN("fail to load storage schema", K(ret));
} else if (OB_FAIL(mig_tablet_param.storage_schema_.init(mig_tablet_param.allocator_, *storage_schema))) {
LOG_WARN("failed to copy storage schema", K(ret), KPC(storage_schema));
} else {
mig_tablet_param.is_storage_schema_cs_replica_ = is_cs_replica_compat();
}
ObTabletObjLoadHelper::free(arena_allocator, storage_schema);
return ret;
@ -5997,25 +6010,6 @@ int ObTablet::update_tablet_autoinc_seq(const uint64_t autoinc_seq, const bool i
return ret;
}
int ObTablet::check_cs_replica_compat_schema(bool &is_cs_replica_compat) const
{
int ret = OB_SUCCESS;
is_cs_replica_compat = false;
ObStorageSchema *storage_schema = nullptr;
ObArenaAllocator arena_allocator(common::ObMemAttr(MTL_ID(), "CSReplSchema"));
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (OB_FAIL(load_storage_schema(arena_allocator, storage_schema))) {
LOG_WARN("fail to load storage schema", K(ret), K_(storage_schema_addr));
} else {
// column storage schema
is_cs_replica_compat = storage_schema->is_cs_replica_compat_;
}
ObTabletObjLoadHelper::free(arena_allocator, storage_schema);
return ret;
}
int ObTablet::check_row_store_with_co_major(bool &is_row_store_with_co_major) const
{
int ret = OB_SUCCESS;
@ -6075,12 +6069,10 @@ int ObTablet::inner_pre_process_cs_replica(
bool &replay_normal_in_cs_replica)
{
int ret = OB_SUCCESS;
bool is_cs_replica_compat = false;
bool is_cs_replica_compat = this->is_cs_replica_compat();
replay_normal_in_cs_replica = false;
if (is_row_store() || !is_ddl_direct_load(direct_load_type)) {
// do not need to process cs replica
} else if (OB_FAIL(check_cs_replica_compat_schema(is_cs_replica_compat))) {
LOG_WARN("failed to check cs replica compat", K(ret), KPC(this));
} else if (is_cs_replica_compat) {
if (tablet_meta_.is_cs_replica_global_visable_when_ddl()) {
table_key.table_type_ = ObITable::COLUMN_ORIENTED_SSTABLE; // for passing defence
@ -6459,7 +6451,6 @@ int ObTablet::get_tablet_report_info_by_sstable(
ObArray<int64_t> column_checksums;
column_checksums.set_attr(ObMemAttr(MTL_ID(), "tmpCkmArr"));
ObSSTable *table = nullptr;
bool is_cs_replica_compat = false;
if (OB_UNLIKELY(nullptr == main_major || report_major_snapshot != main_major->get_snapshot_version())) {
if (GCTX.is_shared_storage_mode()) {
ret = OB_EAGAIN;
@ -6503,8 +6494,6 @@ int ObTablet::get_tablet_report_info_by_sstable(
LOG_WARN("fail to init a tablet replica", KR(ret), "tablet_id", get_tablet_id(), K(tablet_replica));
} else if (!need_checksums) {
// do nothing
} else if (OB_FAIL(check_cs_replica_compat_schema(is_cs_replica_compat))) {
LOG_WARN("fail to check cs replica compat", K(ret), K(is_cs_replica_compat));
} else if (OB_FAIL(get_sstable_column_checksum(*main_major, column_checksums))) {
LOG_WARN("fail to get sstable column checksum", K(ret), KPC(main_major));
} else if (OB_FAIL(tablet_checksum.set_tenant_id(MTL_ID()))) {
@ -6519,7 +6508,7 @@ int ObTablet::get_tablet_report_info_by_sstable(
tablet_checksum.server_ = addr;
tablet_checksum.row_count_ = get_tablet_meta().report_status_.row_count_;
tablet_checksum.data_checksum_ = get_tablet_meta().report_status_.data_checksum_;
tablet_checksum.data_checksum_type_ = is_cs_replica_compat ? ObDataChecksumType::DATA_CHECKSUM_COLUMN_STORE : ObDataChecksumType::DATA_CHECKSUM_NORMAL;
tablet_checksum.data_checksum_type_ = is_cs_replica_compat() ? ObDataChecksumType::DATA_CHECKSUM_COLUMN_STORE : ObDataChecksumType::DATA_CHECKSUM_NORMAL;
LOG_INFO("success to get tablet report info", KR(ret), "tablet_id", get_tablet_id(), "report_status",
tablet_meta_.report_status_, K(tablet_checksum));
}
@ -7117,6 +7106,7 @@ int ObTablet::build_transfer_tablet_param_current_(
mig_tablet_param.ddl_table_type_ = tablet_meta_.ddl_table_type_;
mig_tablet_param.mds_checkpoint_scn_ = user_data.transfer_scn_;
mig_tablet_param.report_status_.reset();
mig_tablet_param.is_storage_schema_cs_replica_ = is_cs_replica_compat();
if (OB_FAIL(mig_tablet_param.last_persisted_committed_tablet_status_.assign(user_data))) {
LOG_WARN("fail to assign mig tablet param from tablet meta", K(ret), K(user_data));
@ -7165,7 +7155,9 @@ int64_t ObTablet::to_string(char *buf, const int64_t buf_len) const
KP_(ddl_kvs),
K_(ddl_kv_count),
K_(is_external_tablet),
K_(table_store_cache));
K_(table_store_cache),
KP_(rowkey_read_info),
"is_cs_replica_compat", is_cs_replica_compat());
J_COMMA();
BUF_PRINTF("memtables:");
J_ARRAY_START();

View File

@ -483,7 +483,7 @@ public:
// column store replica
public:
int check_cs_replica_compat_schema(bool &is_cs_replica_compat) const;
bool is_cs_replica_compat() const { return nullptr == rowkey_read_info_ ? false : rowkey_read_info_->is_cs_replica_compat(); }
int check_row_store_with_co_major(bool &is_row_store_with_co_major) const;
int pre_process_cs_replica(
const ObDirectLoadType direct_load_type,
@ -693,7 +693,10 @@ private:
const share::ObLSID &ls_id,
const common::ObTabletID &tablet_id,
const lib::Worker::CompatMode compat_mode);
int build_read_info(common::ObArenaAllocator &allocator, const ObTablet *tablet = nullptr);
int build_read_info(
common::ObArenaAllocator &allocator,
const ObTablet *tablet,
const bool is_cs_replica_compat);
int create_memtable(const int64_t schema_version,
const share::SCN clog_checkpoint_scn,
const bool for_direct_load,

View File

@ -1065,6 +1065,7 @@ ObMigrationTabletParam::ObMigrationTabletParam()
micro_index_clustered_(false),
major_ckm_info_(),
ddl_table_type_(ObITable::MAX_TABLE_TYPE),
is_storage_schema_cs_replica_(false),
allocator_("MigTblParam", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID(), ObCtxIds::DEFAULT_CTX_ID)
{
}
@ -1249,6 +1250,8 @@ int ObMigrationTabletParam::serialize(char *buf, const int64_t len, int64_t &pos
LOG_WARN("failed to serialize micro_index_clustered", K(ret), K(len), K(new_pos), K_(micro_index_clustered));
} else if (PARAM_VERSION_V3 <= version_ && new_pos - pos < length && OB_FAIL(major_ckm_info_.serialize(buf, len, new_pos))) {
LOG_WARN("failed to serialize major ckm info", K(ret), K(len), K(new_pos), K_(major_ckm_info));
} else if (PARAM_VERSION_V3 <= version_ && new_pos - pos < length && OB_FAIL(serialization::encode_bool(buf, len, new_pos, is_storage_schema_cs_replica_))) {
LOG_WARN("failed to serialize is_storage_schema_cs_replica_", K(ret), K(len), K(new_pos), K_(is_storage_schema_cs_replica));
} else if (OB_UNLIKELY(length != new_pos - pos)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("length doesn't match standard length", K(ret), K(new_pos), K(pos), K(length));
@ -1342,6 +1345,8 @@ int ObMigrationTabletParam::deserialize_v2_v3(const char *buf, const int64_t len
LOG_WARN("failed to deserialize micro_index_clustered", K(ret), K(len));
} else if (PARAM_VERSION_V3 <= version_ && new_pos - pos < length && OB_FAIL(major_ckm_info_.deserialize(allocator_, buf, len, new_pos))) {
LOG_WARN("failed to deserialize major ckm info", K(ret), K(len));
} else if (PARAM_VERSION_V3 <= version_ && new_pos - pos < length && OB_FAIL(serialization::decode_bool(buf, len, new_pos, &is_storage_schema_cs_replica_))) {
LOG_WARN("failed to deserialize is_storage_schema_cs_replica", K(ret), K(len));
} else if (OB_UNLIKELY(length != new_pos - pos)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tablet's length doesn't match standard length", K(ret), K(new_pos), K(pos), K(length), KPC(this));
@ -1545,6 +1550,7 @@ int64_t ObMigrationTabletParam::get_serialize_size() const
size += serialization::encoded_length_bool(micro_index_clustered_);
size += major_ckm_info_.get_serialize_size();
size += serialization::encoded_length(ddl_table_type_);
size += serialization::encoded_length_bool(is_storage_schema_cs_replica_);
}
return size;
}
@ -1587,6 +1593,7 @@ void ObMigrationTabletParam::reset()
micro_index_clustered_ = false;
major_ckm_info_.reset();
ddl_table_type_ = ObITable::MAX_TABLE_TYPE;
is_storage_schema_cs_replica_ = false;
allocator_.reset();
}
@ -1630,6 +1637,7 @@ int ObMigrationTabletParam::assign(const ObMigrationTabletParam &param)
mds_checkpoint_scn_ = param.mds_checkpoint_scn_;
transfer_info_ = param.transfer_info_;
extra_medium_info_ = param.extra_medium_info_;
is_storage_schema_cs_replica_ = param.is_storage_schema_cs_replica_;
if (OB_FAIL(mds_data_.assign(param.mds_data_, allocator_))) {
LOG_WARN("failed to assign mds data", K(ret), K(param));
} else if (OB_FAIL(last_persisted_committed_tablet_status_.assign(
@ -1723,7 +1731,6 @@ int ObMigrationTabletParam::construct_placeholder_storage_schema_and_medium(
storage_schema.progressive_merge_num_ = 0;
storage_schema.master_key_id_ = OB_INVALID_ID;
storage_schema.compat_mode_ = static_cast<uint32_t>(lib::Worker::get_compatibility_mode());
storage_schema.is_cs_replica_compat_ = false;
ObStorageRowkeyColumnSchema rowkey_schema;
rowkey_schema.meta_type_.set_tinyint();

View File

@ -293,7 +293,8 @@ public:
K_(transfer_info),
K_(create_schema_version),
K_(micro_index_clustered),
K_(major_ckm_info));
K_(major_ckm_info),
K_(is_storage_schema_cs_replica));
private:
int deserialize_v2_v3(const char *buf, const int64_t len, int64_t &pos);
int deserialize_v1(const char *buf, const int64_t len, int64_t &pos);
@ -347,6 +348,7 @@ public:
bool micro_index_clustered_;
blocksstable::ObMajorChecksumInfo major_ckm_info_; // from table store
ObITable::TableType ddl_table_type_;
bool is_storage_schema_cs_replica_;
// Add new serialization member before this line, below members won't serialize
common::ObArenaAllocator allocator_; // for storage schema

View File

@ -402,7 +402,7 @@ void ObMultiVersionSSTableTest::init_tablet()
tablet->storage_schema_addr_.get_ptr()->init(allocator_, table_schema_, lib::Worker::CompatMode::MYSQL);
ASSERT_NE(nullptr, ptr = allocator_.alloc(sizeof(ObRowkeyReadInfo)));
tablet->rowkey_read_info_ = new (ptr) ObRowkeyReadInfo();
tablet->build_read_info(allocator_);
tablet->build_read_info(allocator_, nullptr /*tablet*/, false /*is_cs_replica_compat*/);
}
void ObMultiVersionSSTableTest::reset_writer(

View File

@ -55,7 +55,7 @@ int MockObTableReadInfo::init(common::ObIAllocator &allocator,
const int64_t extra_rowkey_col_cnt = storage::ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt();
const bool is_cg_sstable = (schema_rowkey_cnt == 0 && schema_column_count == 1);
init_basic_info(schema_column_count, schema_rowkey_cnt, is_oracle_mode, is_cg_sstable); // init basic info
init_basic_info(schema_column_count, schema_rowkey_cnt, is_oracle_mode, is_cg_sstable, false /*is_cs_replica_compat*/); // init basic info
if (OB_FAIL(prepare_arrays(allocator, cols_desc, out_cols_cnt))) {
STORAGE_LOG(WARN, "failed to prepare arrays", K(ret), K(out_cols_cnt));
} else if (nullptr != cols_param && OB_FAIL(cols_param_.init_and_assign(*cols_param, allocator))) {