add storage schema case in farm & add compaction history obtest & fix progressive merge

This commit is contained in:
yangqise7en 2024-07-23 13:32:05 +00:00 committed by ob-robot
parent f7dbcb80b7
commit 0f9dcf8cd9
26 changed files with 124 additions and 100 deletions

View File

@ -610,7 +610,7 @@ void TestIndexBlockDataPrepare::prepare_data(const int64_t micro_block_size)
share::SCN scn;
scn.convert_for_tx(SNAPSHOT_VERSION);
ObWholeDataStoreDesc desc;
ASSERT_EQ(OB_SUCCESS, desc.init(table_schema_, ObLSID(ls_id_), ObTabletID(tablet_id_), merge_type_, SNAPSHOT_VERSION,
ASSERT_EQ(OB_SUCCESS, desc.init(false/*is_ddl*/, table_schema_, ObLSID(ls_id_), ObTabletID(tablet_id_), merge_type_, SNAPSHOT_VERSION,
DATA_CURRENT_VERSION, scn));
desc.get_desc().static_desc_->schema_version_ = 10;
void *builder_buf = allocator_.alloc(sizeof(ObSSTableIndexBuilder));
@ -676,7 +676,7 @@ void TestIndexBlockDataPrepare::prepare_cg_data()
ObWholeDataStoreDesc desc;
share::SCN scn;
scn.convert_for_tx(SNAPSHOT_VERSION);
ASSERT_EQ(OB_SUCCESS, desc.init(table_schema_, ObLSID(ls_id_), ObTabletID(tablet_id_), merge_type_, SNAPSHOT_VERSION, DATA_CURRENT_VERSION, scn));
ASSERT_EQ(OB_SUCCESS, desc.init(false/*is_ddl*/, table_schema_, ObLSID(ls_id_), ObTabletID(tablet_id_), merge_type_, SNAPSHOT_VERSION, DATA_CURRENT_VERSION, scn));
ObIArray<ObColDesc> &col_descs = desc.get_desc().col_desc_->col_desc_array_;
for (int64_t i = 0; i < col_descs.count(); ++i) {
if (col_descs.at(i).col_type_.type_ == ObIntType) {
@ -695,7 +695,7 @@ void TestIndexBlockDataPrepare::prepare_cg_data()
ASSERT_EQ(merge_type_, ObMergeType::MAJOR_MERGE);
ObWholeDataStoreDesc data_desc;
OK(data_desc.init(table_schema_, ObLSID(ls_id_), ObTabletID(tablet_id_),
OK(data_desc.init(false/*is_ddl*/, table_schema_, ObLSID(ls_id_), ObTabletID(tablet_id_),
merge_type_, SNAPSHOT_VERSION, DATA_CURRENT_VERSION,
scn, &cg_schema, 0));
data_desc.get_desc().static_desc_->schema_version_ = 10;
@ -870,10 +870,10 @@ void TestIndexBlockDataPrepare::prepare_partial_ddl_data()
ObMacroDataSeq start_seq(0);
start_seq.set_data_block();
row_generate_.reset();
ObWholeDataStoreDesc desc(true/*is ddl*/);
ObWholeDataStoreDesc desc;
share::SCN end_scn;
end_scn.convert_from_ts(ObTimeUtility::current_time());
ASSERT_EQ(OB_SUCCESS, desc.init(table_schema_, ObLSID(ls_id_), ObTabletID(tablet_id_), merge_type_, SNAPSHOT_VERSION, CLUSTER_CURRENT_VERSION, end_scn));
ASSERT_EQ(OB_SUCCESS, desc.init(true/*is ddl*/, table_schema_, ObLSID(ls_id_), ObTabletID(tablet_id_), merge_type_, SNAPSHOT_VERSION, CLUSTER_CURRENT_VERSION, end_scn));
void *builder_buf = allocator_.alloc(sizeof(ObSSTableIndexBuilder));
merge_root_index_builder_ = new (builder_buf) ObSSTableIndexBuilder();
ASSERT_NE(nullptr, merge_root_index_builder_);
@ -945,10 +945,10 @@ void TestIndexBlockDataPrepare::prepare_partial_cg_data()
ObMacroDataSeq start_seq(0);
start_seq.set_data_block();
row_generate_.reset();
ObWholeDataStoreDesc desc(true/*is ddl*/);
ObWholeDataStoreDesc desc;
share::SCN end_scn;
end_scn.convert_from_ts(ObTimeUtility::current_time());
ASSERT_EQ(OB_SUCCESS, desc.init(table_schema_, ObLSID(ls_id_), ObTabletID(tablet_id_), merge_type_, SNAPSHOT_VERSION, CLUSTER_CURRENT_VERSION, end_scn));
ASSERT_EQ(OB_SUCCESS, desc.init(true/*is ddl*/, table_schema_, ObLSID(ls_id_), ObTabletID(tablet_id_), merge_type_, SNAPSHOT_VERSION, CLUSTER_CURRENT_VERSION, end_scn));
void *builder_buf = allocator_.alloc(sizeof(ObSSTableIndexBuilder));
merge_root_index_builder_ = new (builder_buf) ObSSTableIndexBuilder();
ASSERT_NE(nullptr, merge_root_index_builder_);
@ -1173,7 +1173,7 @@ void TestIndexBlockDataPrepare::prepare_contrastive_sstable()
ObWholeDataStoreDesc desc;
share::SCN end_scn;
end_scn.convert_from_ts(ObTimeUtility::current_time());
ASSERT_EQ(OB_SUCCESS, desc.init(table_schema_, ObLSID(ls_id_), ObTabletID(tablet_id_), merge_type_, SNAPSHOT_VERSION, CLUSTER_CURRENT_VERSION, end_scn));
ASSERT_EQ(OB_SUCCESS, desc.init(false/*is_ddl*/, table_schema_, ObLSID(ls_id_), ObTabletID(tablet_id_), merge_type_, SNAPSHOT_VERSION, CLUSTER_CURRENT_VERSION, end_scn));
void *builder_buf = allocator_.alloc(sizeof(ObSSTableIndexBuilder));
root_index_builder_ = new (builder_buf) ObSSTableIndexBuilder();
ASSERT_NE(nullptr, root_index_builder_);

View File

@ -417,7 +417,7 @@ void TestIndexTree::prepare_data()
ASSERT_EQ(OB_SUCCESS, multi_row.init(allocator_, MAX_TEST_COLUMN_CNT));
ObDmlFlag dml = DF_INSERT;
ret = data_desc.init(table_schema_, ObLSID(1), ObTabletID(1), MAJOR_MERGE, 1, DATA_CURRENT_VERSION);
ret = data_desc.init(false/*is_ddl*/, table_schema_, ObLSID(1), ObTabletID(1), MAJOR_MERGE, 1, DATA_CURRENT_VERSION);
ASSERT_EQ(OB_SUCCESS, ret);
ret = writer.open(data_desc.get_desc(), start_seq);
ASSERT_EQ(OB_SUCCESS, ret);
@ -576,7 +576,7 @@ void TestIndexTree::prepare_data_desc(ObWholeDataStoreDesc &data_desc,
ObSSTableIndexBuilder *sstable_builder)
{
int ret = OB_SUCCESS;
ret = data_desc.init(table_schema_, ObLSID(1), ObTabletID(1), MAJOR_MERGE, 1, DATA_CURRENT_VERSION);
ret = data_desc.init(false/*is_ddl*/, table_schema_, ObLSID(1), ObTabletID(1), MAJOR_MERGE, 1, DATA_CURRENT_VERSION);
data_desc.get_desc().sstable_index_builder_ = sstable_builder;
ASSERT_EQ(OB_SUCCESS, ret);
}
@ -1704,7 +1704,7 @@ TEST_F(TestIndexTree, test_close_with_old_schema)
// mock old schema with fewer columns
ObWholeDataStoreDesc index_desc;
OK(index_desc.init(table_schema_, ObLSID(1), ObTabletID(1), MAJOR_MERGE, 1/*snapshot*/, 0/*cluster_version*/));
OK(index_desc.init(false/*is_ddl*/, table_schema_, ObLSID(1), ObTabletID(1), MAJOR_MERGE, 1/*snapshot*/, 0/*cluster_version*/));
index_desc.static_desc_.major_working_cluster_version_ = DATA_VERSION_4_0_0_0;
--index_desc.get_desc().col_desc_->full_stored_col_cnt_;
index_desc.get_desc().col_desc_->col_default_checksum_array_.pop_back();

View File

@ -386,7 +386,7 @@ void TestCOMerge::prepare_co_sstable(
ObCOMergeProjector *row_project = nullptr;
ObTableHandleV2 *table_handle = nullptr;
OK(data_store_desc.init(table_schema,
OK(data_store_desc.init(false/*is_ddl*/, table_schema,
ObLSID(ls_id_),
ObTabletID(tablet_id_),
merge_type,

View File

@ -20,25 +20,9 @@ namespace blocksstable
* -------------------------------------------------------------------ObStaticDataStoreDesc-------------------------------------------------------------------
*/
const ObCompressorType ObStaticDataStoreDesc::DEFAULT_MINOR_COMPRESSOR_TYPE;
ObStaticDataStoreDesc::ObStaticDataStoreDesc(const bool is_ddl)
: is_ddl_(is_ddl),
merge_type_(compaction::INVALID_MERGE_TYPE),
compressor_type_(ObCompressorType::INVALID_COMPRESSOR),
ls_id_(),
tablet_id_(),
macro_block_size_(0),
macro_store_size_(0),
micro_block_size_limit_(0),
schema_version_(0),
snapshot_version_(0),
end_scn_(),
progressive_merge_round_(0),
major_working_cluster_version_(0),
encrypt_id_(0),
master_key_id_(0)
ObStaticDataStoreDesc::ObStaticDataStoreDesc()
{
end_scn_.set_min();
MEMSET(encrypt_key_, 0, sizeof(encrypt_key_));
reset();
}
bool ObStaticDataStoreDesc::is_valid() const
@ -52,21 +36,7 @@ bool ObStaticDataStoreDesc::is_valid() const
void ObStaticDataStoreDesc::reset()
{
merge_type_ = compaction::INVALID_MERGE_TYPE;
compressor_type_ = ObCompressorType::INVALID_COMPRESSOR;
ls_id_.reset();
tablet_id_.reset();
macro_block_size_ = 0;
macro_store_size_ = 0;
micro_block_size_limit_ = 0;
schema_version_ = 0;
snapshot_version_ = 0;
end_scn_.set_min();
progressive_merge_round_ = 0;
major_working_cluster_version_ = 0;
encrypt_id_ = 0;
master_key_id_ = 0;
MEMSET(encrypt_key_, 0, sizeof(encrypt_key_));
MEMSET(this, 0, sizeof(*this));
}
int ObStaticDataStoreDesc::assign(const ObStaticDataStoreDesc &desc)
@ -83,6 +53,7 @@ int ObStaticDataStoreDesc::assign(const ObStaticDataStoreDesc &desc)
schema_version_ = desc.schema_version_;
snapshot_version_ = desc.snapshot_version_;
end_scn_ = desc.end_scn_;
progressive_merge_round_ = desc.progressive_merge_round_;
major_working_cluster_version_ = desc.major_working_cluster_version_;
encrypt_id_ = desc.encrypt_id_;
master_key_id_ = desc.master_key_id_;
@ -126,6 +97,7 @@ void ObStaticDataStoreDesc::init_block_size(const ObMergeSchema &merge_schema)
}
int ObStaticDataStoreDesc::init(
const bool is_ddl,
const ObMergeSchema &merge_schema,
const share::ObLSID &ls_id,
const common::ObTabletID tablet_id,
@ -142,6 +114,7 @@ int ObStaticDataStoreDesc::init(
STORAGE_LOG(WARN, "arguments is invalid", K(ret), K(merge_schema), K(snapshot_version), K(end_scn));
} else {
reset();
is_ddl_ = is_ddl;
merge_type_ = merge_type;
ls_id_ = ls_id;
tablet_id_ = tablet_id;
@ -180,6 +153,11 @@ int ObStaticDataStoreDesc::init(
return ret;
}
bool ObStaticDataStoreDesc::operator==(const ObStaticDataStoreDesc &other) const
{
return (0 == MEMCMP(this, &other, sizeof(*this)));
}
/**
* -------------------------------------------------------------------ObColDataStoreDesc-------------------------------------------------------------------
*/
@ -876,6 +854,7 @@ int ObWholeDataStoreDesc::init(
}
int ObWholeDataStoreDesc::init(
const bool is_ddl,
const ObMergeSchema &merge_schema,
const share::ObLSID &ls_id,
const common::ObTabletID tablet_id,
@ -888,7 +867,7 @@ int ObWholeDataStoreDesc::init(
{
int ret = OB_SUCCESS;
reset();
if (OB_FAIL(static_desc_.init(merge_schema, ls_id, tablet_id, merge_type, snapshot_version, end_scn, cluster_version))) {
if (OB_FAIL(static_desc_.init(is_ddl, merge_schema, ls_id, tablet_id, merge_type, snapshot_version, end_scn, cluster_version))) {
STORAGE_LOG(WARN, "failed to init static desc", KR(ret));
} else if (OB_FAIL(inner_init(merge_schema, cg_schema, table_cg_idx))) {
STORAGE_LOG(WARN, "failed to init", KR(ret), K(merge_schema), K(cg_schema), K(table_cg_idx));

View File

@ -23,7 +23,6 @@ namespace oceanbase
namespace storage {
struct ObSSTableMergeInfo;
struct ObStorageColumnGroupSchema;
struct ObSSTableMergeInfo;
}
namespace share
{
@ -49,9 +48,10 @@ struct ObSSTableBasicMeta;
struct ObStaticDataStoreDesc
{
public:
ObStaticDataStoreDesc(const bool is_ddl = false);
ObStaticDataStoreDesc();
~ObStaticDataStoreDesc() { reset(); }
int init(
const bool is_ddl,
const share::schema::ObMergeSchema &merge_schema,
const share::ObLSID &ls_id,
const common::ObTabletID tablet_id,
@ -77,15 +77,17 @@ public:
K_(encrypt_id),
K_(master_key_id),
KPHEX_(encrypt_key, sizeof(encrypt_key_)),
K_(major_working_cluster_version));
K_(major_working_cluster_version),
K_(progressive_merge_round));
private:
OB_INLINE int init_encryption_info(const share::schema::ObMergeSchema &merge_schema);
OB_INLINE void init_block_size(const share::schema::ObMergeSchema &merge_schema);
static const int64_t DEFAULT_RESERVE_PERCENT = 90;
static const int64_t MIN_RESERVED_SIZE = 1024; //1KB;
static const ObCompressorType DEFAULT_MINOR_COMPRESSOR_TYPE = ObCompressorType::LZ4_COMPRESSOR;
bool operator==(const ObStaticDataStoreDesc &other) const; // for unittest
public:
bool is_ddl_;
bool is_ddl_; // only used to print ERROR or WARN log
compaction::ObMergeType merge_type_;
ObCompressorType compressor_type_;
share::ObLSID ls_id_;
@ -306,8 +308,8 @@ private:
struct ObWholeDataStoreDesc
{
ObWholeDataStoreDesc(bool is_ddl = false)
: static_desc_(is_ddl),
ObWholeDataStoreDesc()
: static_desc_(),
col_desc_(),
desc_()
{}
@ -324,6 +326,7 @@ struct ObWholeDataStoreDesc
const storage::ObStorageColumnGroupSchema *cg_schema = nullptr,
const uint16_t table_cg_idx = 0);
int init(
const bool is_ddl,
const share::schema::ObMergeSchema &merge_schema,
const share::ObLSID &ls_id,
const common::ObTabletID tablet_id,

View File

@ -780,6 +780,7 @@ int ObSharedMacroBlockMgr::prepare_data_desc(
ret = OB_ERR_UNEXPECTED;
LOG_WARN("mds storage schema is invalid", K(ret), KP(storage_schema), KPC(storage_schema));
} else if (OB_FAIL(data_desc.init(
false/*is_ddl*/,
*storage_schema,
tablet.get_tablet_meta().ls_id_,
tablet.get_tablet_meta().tablet_id_,
@ -809,6 +810,7 @@ int ObSharedMacroBlockMgr::prepare_data_desc(
}
if (FAILEDx(data_desc.init(
false/*is_ddl*/,
*storage_schema,
tablet.get_tablet_meta().ls_id_,
tablet.get_tablet_meta().tablet_id_,

View File

@ -292,8 +292,12 @@ int ObStaticMergeParam::cal_major_merge_param()
}
if (OB_SUCC(ret)) {
if (is_full_merge_ || is_meta_major_merge(get_merge_type()) || (merge_level_ != MACRO_BLOCK_MERGE_LEVEL && is_schema_changed_)) {
if (is_full_merge_
|| is_meta_major_merge(get_merge_type())
|| (merge_level_ != MACRO_BLOCK_MERGE_LEVEL && is_schema_changed_)
|| (data_version_ >= DATA_VERSION_4_3_3_0 && need_calc_progressive_merge())) {
merge_level_ = MACRO_BLOCK_MERGE_LEVEL;
// for progressive merge, if all macro have larger progressive_merge_round, no need progressive merge any more TODO;
}
}
return ret;
@ -790,7 +794,7 @@ int ObBasicTabletMergeCtx::init_static_param_and_desc()
int ret = OB_SUCCESS;
if (OB_FAIL(static_param_.init_static_info(get_concurrent_cnt(), tablet_handle_))) {
LOG_WARN("failed to init basic info", KR(ret));
} else if (OB_FAIL(static_desc_.init(*get_schema(), get_ls_id(), get_tablet_id(),
} else if (OB_FAIL(static_desc_.init(false/*is_ddl*/, *get_schema(), get_ls_id(), get_tablet_id(),
get_merge_type(), get_snapshot(),
static_param_.scn_range_.end_scn_,
static_param_.data_version_))) {

View File

@ -49,6 +49,11 @@ struct ObStaticMergeParam final
is_full_merge_ = is_full_merge;
merge_level_ = MACRO_BLOCK_MERGE_LEVEL;
}
bool need_calc_progressive_merge() const
{
return is_major_merge_type(get_merge_type())
&& progressive_merge_step_ < progressive_merge_num_;
}
private:
int init_multi_version_column_descs();

View File

@ -21,7 +21,6 @@ namespace oceanbase
namespace compaction
{
enum ObMergeType;
struct ObCompactionParam;
class ObTabletMergeDag;

View File

@ -18,6 +18,7 @@ namespace compaction
{
const static char * ObMergeTypeStr[] = {
"INVALID_MERGE_TYPE",
"MINOR_MERGE",
"HISTORY_MINOR_MERGE",
"META_MAJOR_MERGE",

View File

@ -17,19 +17,19 @@ namespace oceanbase
{
namespace compaction
{
enum ObMergeType
enum ObMergeType : uint8_t
{
INVALID_MERGE_TYPE = -1,
MINOR_MERGE = 0, // minor merge, compaction several mini sstable into one larger mini sstable
HISTORY_MINOR_MERGE = 1,
META_MAJOR_MERGE = 2,
MINI_MERGE = 3, // mini merge, only flush memtable
MAJOR_MERGE = 4,
MEDIUM_MERGE = 5,
DDL_KV_MERGE = 6, // only use for ddl dag
BACKFILL_TX_MERGE = 7,
MDS_MINI_MERGE = 8,
MDS_MINOR_MERGE = 9,
INVALID_MERGE_TYPE = 0,
MINOR_MERGE, // minor merge, compaction several mini sstable into one larger mini sstable
HISTORY_MINOR_MERGE,
META_MAJOR_MERGE,
MINI_MERGE, // mini merge, only flush memtable
MAJOR_MERGE,
MEDIUM_MERGE,
DDL_KV_MERGE, // only use for ddl dag
BACKFILL_TX_MERGE,
MDS_MINI_MERGE,
MDS_MINOR_MERGE,
// add new merge type here
// fix merge_type_to_str & ObPartitionMergePolicy::get_merge_tables
MERGE_TYPE_MAX

View File

@ -50,8 +50,10 @@ namespace compaction
ERRSIM_POINT_DEF(EN_COMPACTION_DISABLE_ROW_COL_SWITCH);
// keep order with ObMergeType
ObPartitionMergePolicy::GetMergeTables ObPartitionMergePolicy::get_merge_tables[MERGE_TYPE_MAX]
= { ObPartitionMergePolicy::get_minor_merge_tables,
ObPartitionMergePolicy::GetMergeTables ObPartitionMergePolicy::get_merge_tables[]
= {
ObPartitionMergePolicy::not_support_merge_type,
ObPartitionMergePolicy::get_minor_merge_tables,
ObPartitionMergePolicy::get_hist_minor_merge_tables,
ObAdaptiveMergePolicy::get_meta_merge_tables,
ObPartitionMergePolicy::get_mini_merge_tables,

View File

@ -206,7 +206,7 @@ public:
storage::ObLS &ls,
const storage::ObTablet &,
storage::ObGetMergeTablesResult&);
static GetMergeTables get_merge_tables[compaction::ObMergeType::MERGE_TYPE_MAX];
static GetMergeTables get_merge_tables[];
};
struct ObMinorExecuteRangeMgr

View File

@ -105,8 +105,7 @@ int ObProgressiveMergeHelper::init(const ObSSTable &sstable, const ObMergeParame
} else {
int64_t rewrite_macro_cnt = 0, reduce_macro_cnt = 0, rewrite_block_cnt_for_progressive = 0;
bool last_is_small_data_macro = false;
const bool is_major = is_major_merge_type(static_param.get_merge_type());
const bool need_calc_progressive_merge = is_major && static_param.progressive_merge_step_ < static_param.progressive_merge_num_;
const bool need_calc_progressive_merge = static_param.need_calc_progressive_merge();
progressive_merge_round_ = static_param.progressive_merge_round_;

View File

@ -196,6 +196,11 @@ int ObSSTableBuilder::build_sstable_merge_res(
} else if (OB_FAIL(rebuild_index_builder_.close(res))) {
STORAGE_LOG(WARN, "fail to close", K(ret), K(rebuild_index_builder_));
} else { //update merge info
STORAGE_LOG(INFO, "after rebuild sstable", K(ret), "cg_idx", data_store_desc_.get_desc().get_table_cg_idx(),
"old_multiplexed_macro_block_count", sstable_merge_info.multiplexed_macro_block_count_,
"old_total_macro_count", sstable_merge_info.macro_block_count_,
"new_multiplexed_macro_block_count", multiplexed_macro_block_count,
"new_total_macro_count", res.data_blocks_cnt_);
sstable_merge_info.multiplexed_macro_block_count_ = multiplexed_macro_block_count;
sstable_merge_info.macro_block_count_ = res.data_blocks_cnt_;
}
@ -254,6 +259,7 @@ int ObSSTableBuilder::check_need_rebuild(const ObStaticMergeParam &merge_param,
if (OB_FAIL(pre_check_rebuild(merge_param, iter, need_check_rebuild))) {
STORAGE_LOG(WARN, "Fail to pre check need rebuild", K(ret));
} else if (need_check_rebuild) {
// find continues macro to rewrite
while (OB_SUCC(ret) && OB_SUCC(iter.get_next_macro_block(macro_meta))) {
if (OB_ISNULL(macro_meta)) {
ret = OB_ERR_UNEXPECTED;
@ -261,10 +267,10 @@ int ObSSTableBuilder::check_need_rebuild(const ObStaticMergeParam &merge_param,
} else if (check_macro_block_could_merge(*macro_meta)) {
const int64_t macro_block_sum = macro_meta->val_.occupy_size_ + macro_meta->val_.block_size_;
bool need_merge = false;
// check last_macro_block_sum + cur_macro can be merged into one
if (OB_FAIL(check_cur_macro_need_merge(last_macro_block_sum, *macro_meta, need_merge))) {
STORAGE_LOG(WARN, "fail to check_cur_macro_need_merge", K(ret), K(macro_meta));
} else if (!need_merge) {
} else if (!need_merge) { // found first can't merge macro, reset collect info
last_macro_id = macro_meta->get_macro_id();
last_macro_is_first = true;
last_macro_block_sum = macro_block_sum;

View File

@ -590,7 +590,7 @@ bool ObComplementDataDag::ignore_warning()
int ObComplementDataDag::prepare_context()
{
int ret = OB_SUCCESS;
ObWholeDataStoreDesc data_desc(true/*is_ddl*/);
ObWholeDataStoreDesc data_desc;
ObSchemaGetterGuard schema_guard;
const ObTableSchema *hidden_table_schema = nullptr;
if (OB_UNLIKELY(!is_inited_)) {
@ -608,7 +608,8 @@ int ObComplementDataDag::prepare_context()
} else if (OB_ISNULL(hidden_table_schema)) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("hidden table schema not exist", K(ret), K(param_));
} else if (OB_FAIL(data_desc.init(*hidden_table_schema,
} else if (OB_FAIL(data_desc.init(true/*is_ddl*/,
*hidden_table_schema,
param_.dest_ls_id_,
param_.dest_tablet_id_,
MAJOR_MERGE,
@ -1376,12 +1377,11 @@ int ObComplementWriteTask::append_row(ObScan *scan)
int ret = OB_SUCCESS;
ObComplementDataDag *current_dag = nullptr;
const int64_t CHECK_DAG_NEED_EXIT_INTERVAL = 10000; // 1w rows.
ObDataStoreDesc data_desc;
HEAP_VARS_4((ObMacroBlockWriter, writer),
(ObSchemaGetterGuard, schema_guard),
(ObRelativeTable, relative_table),
(blocksstable::ObNewRowBuilder, new_row_builder)) {
HEAP_VAR(ObWholeDataStoreDesc, data_desc, true) {
HEAP_VAR(ObWholeDataStoreDesc, data_desc) {
ObArray<int64_t> report_col_checksums;
ObArray<int64_t> report_col_ids;
ObDDLRedoLogWriterCallback callback;
@ -1444,7 +1444,7 @@ int ObComplementWriteTask::append_row(ObScan *scan)
} else if (OB_ISNULL(hidden_table_schema)) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("table not exist", K(ret), K(param_->dest_tenant_id_), K(param_->dest_table_id_));
} else if (OB_FAIL(data_desc.init(*hidden_table_schema,
} else if (OB_FAIL(data_desc.init(true/*is_ddl*/, *hidden_table_schema,
param_->dest_ls_id_,
param_->dest_tablet_id_,
MAJOR_MERGE,

View File

@ -734,14 +734,15 @@ int ObTabletDDLUtil::prepare_index_data_desc(ObTablet &tablet,
LOG_WARN("unexpected table key is minor sstable", K(ret), K(table_key));
} else {
const ObStorageColumnGroupSchema &cur_cg_schema = cg_schemas.at(cg_idx);
if (OB_FAIL(data_desc.init(*storage_schema, ls_id, tablet_id,
if (OB_FAIL(data_desc.init(true/*is_ddl*/, *storage_schema, ls_id, tablet_id,
compaction::ObMergeType::MAJOR_MERGE, snapshot_version, data_format_version, end_scn, &cur_cg_schema, cg_idx))) {
LOG_WARN("init data desc for cg failed", K(ret));
} else {
LOG_DEBUG("get data desc from column group schema", K(ret), K(tablet_id), K(cg_idx), K(data_desc), K(cur_cg_schema));
}
}
} else if (OB_FAIL(data_desc.init(*storage_schema,
} else if (OB_FAIL(data_desc.init(true/*is_ddl*/,
*storage_schema,
ls_id,
tablet_id,
table_key.is_minor_sstable() ? compaction::MINOR_MERGE : compaction::MAJOR_MERGE,
@ -780,7 +781,7 @@ int ObTabletDDLUtil::create_ddl_sstable(ObTablet &tablet,
int ret = OB_SUCCESS;
HEAP_VAR(ObSSTableIndexBuilder, sstable_index_builder) {
ObIndexBlockRebuilder index_block_rebuilder;
ObWholeDataStoreDesc data_desc(true/*is_ddl*/);
ObWholeDataStoreDesc data_desc;
int64_t macro_block_column_count = 0;
if (OB_UNLIKELY(!ddl_param.is_valid() || OB_ISNULL(storage_schema))) {
ret = OB_INVALID_ARGUMENT;

View File

@ -1170,7 +1170,7 @@ private:
};
ObTabletDirectLoadBuildCtx::ObTabletDirectLoadBuildCtx()
: allocator_(), slice_writer_allocator_(), build_param_(), slice_mgr_map_(), data_block_desc_(true/*is ddl*/), index_builder_(nullptr),
: allocator_(), slice_writer_allocator_(), build_param_(), slice_mgr_map_(), data_block_desc_(), index_builder_(nullptr),
column_stat_array_(), sorted_slice_writers_(), sorted_slices_idx_(), is_task_end_(false), task_finish_count_(0), fill_column_group_finish_count_(0),
commit_scn_(), schema_allocator_("TDL_schema", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), storage_schema_(nullptr)
{
@ -2260,10 +2260,10 @@ int ObTabletDirectLoadMgr::fill_aggregated_column_group(
int ObTabletDirectLoadMgr::prepare_index_builder_if_need(const ObTableSchema &table_schema)
{
int ret = OB_SUCCESS;
ObWholeDataStoreDesc index_block_desc(true/*is ddl*/);
ObWholeDataStoreDesc index_block_desc;
if (sqc_build_ctx_.index_builder_ != nullptr) {
LOG_INFO("index builder is already prepared");
} else if (OB_FAIL(index_block_desc.init(table_schema, ls_id_, tablet_id_,
} else if (OB_FAIL(index_block_desc.init(true/*is ddl*/, table_schema, ls_id_, tablet_id_,
is_full_direct_load(direct_load_type_) ? compaction::ObMergeType::MAJOR_MERGE : compaction::ObMergeType::MINOR_MERGE,
is_full_direct_load(direct_load_type_) ? table_key_.get_snapshot_version() : 1L,
data_format_version_,
@ -2283,7 +2283,7 @@ int ObTabletDirectLoadMgr::prepare_index_builder_if_need(const ObTableSchema &ta
nullptr, // macro block flush callback
ObSSTableIndexBuilder::DISABLE))) {
LOG_WARN("failed to init index builder", K(ret), K(index_block_desc));
} else if (OB_FAIL(sqc_build_ctx_.data_block_desc_.init(table_schema, ls_id_, tablet_id_,
} else if (OB_FAIL(sqc_build_ctx_.data_block_desc_.init(true/*is ddl*/, table_schema, ls_id_, tablet_id_,
is_full_direct_load(direct_load_type_) ? compaction::ObMergeType::MAJOR_MERGE : compaction::ObMergeType::MINOR_MERGE,
is_full_direct_load(direct_load_type_) ? table_key_.get_snapshot_version() : 1L,
data_format_version_,

View File

@ -1423,7 +1423,7 @@ int ObCOSliceWriter::init(const ObStorageSchema *storage_schema, const int64_t c
const uint64_t data_format_version = tablet_direct_load_mgr->get_data_format_version();
ObLSID ls_id = tablet_direct_load_mgr->get_ls_id();
if (OB_FAIL(data_desc_.init(*storage_schema,
if (OB_FAIL(data_desc_.init(true/*is ddl*/, *storage_schema,
ls_id,
table_key.get_tablet_id(),
compaction::ObMergeType::MAJOR_MERGE,

View File

@ -617,7 +617,7 @@ private:
class ObCOSliceWriter
{
public:
ObCOSliceWriter() : is_inited_(false), cg_idx_(-1), cg_schema_(nullptr), data_desc_(true /*is ddl*/) {}
ObCOSliceWriter() : is_inited_(false), cg_idx_(-1), cg_schema_(nullptr), data_desc_() {}
~ObCOSliceWriter() {}
int init(
const ObStorageSchema *storage_schema,

View File

@ -833,6 +833,7 @@ int ObSSTableCopyFinishTask::prepare_data_store_desc_(
}
}
if (FAILEDx(desc.init(
false/*is ddl*/,
*storage_schema,
ls_id,
tablet_id,

View File

@ -307,6 +307,7 @@ int ObGetMergeTablesResult::copy_basic_info(const ObGetMergeTablesResult &src)
is_simplified_ = src.is_simplified_;
is_backfill_ = src.is_backfill_;
backfill_scn_ = src.backfill_scn_;
snapshot_info_ = src.snapshot_info_;
}
return ret;
}

View File

@ -377,7 +377,7 @@ int ObMdsTableMiniMerger::init(compaction::ObTabletMergeCtx &ctx, ObMdsMiniMerge
} else if (OB_UNLIKELY(!storage_schema->is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("mds storage schema is invalid", K(ret), KP(storage_schema), KPC(storage_schema));
} else if (OB_FAIL(data_desc_.init(*storage_schema, ls_id, tablet_id,
} else if (OB_FAIL(data_desc_.init(false/*is ddl*/, *storage_schema, ls_id, tablet_id,
ctx.get_merge_type(), ctx.get_snapshot(), data_version,
ctx.static_param_.scn_range_.end_scn_))) {
LOG_WARN("fail to init whole desc", KR(ret), K(ctx), K(ls_id), K(tablet_id));

View File

@ -420,7 +420,7 @@ void ObMultiVersionSSTableTest::reset_writer(const int64_t snapshot_version)
ObTabletID tablet_id(tablet_id_);
SCN scn;
scn.convert_for_tx(snapshot_version);
ASSERT_EQ(OB_SUCCESS, data_desc_.init(table_schema_, ls_id, tablet_id, merge_type_, snapshot_version, DATA_VERSION_4_1_0_0, scn));
ASSERT_EQ(OB_SUCCESS, data_desc_.init(false/*is_ddl*/, table_schema_, ls_id, tablet_id, merge_type_, snapshot_version, DATA_VERSION_4_1_0_0, scn));
void *builder_buf = allocator_.alloc(sizeof(ObSSTableIndexBuilder));
root_index_builder_ = new (builder_buf) ObSSTableIndexBuilder();
ASSERT_NE(nullptr, root_index_builder_);

View File

@ -49,14 +49,15 @@ TEST_F(TestObDataStoreDesc, test_static_desc)
ObTableSchema table_schema;
TestSchemaPrepare::prepare_schema(table_schema, 5);
table_schema.compressor_type_ = ObCompressorType::ZSTD_1_3_8_COMPRESSOR;
table_schema.set_encrypt_key(ObString("ObEncry"));
const int64_t snapshot = 10000;
share::SCN scn;
scn.convert_for_tx(100);
ASSERT_EQ(OB_INVALID_ARGUMENT,
static_desc.init(table_schema, mock_ls_id_, mock_tablet_id_,
static_desc.init(false/*is_ddl*/, table_schema, mock_ls_id_, mock_tablet_id_,
MINI_MERGE, snapshot, share::SCN::invalid_scn(), 1/*cluster_version*/));
ASSERT_EQ(OB_SUCCESS, static_desc.init(table_schema, mock_ls_id_, mock_tablet_id_, MINI_MERGE, snapshot, scn, 1/*cluster_version*/));
ASSERT_EQ(OB_SUCCESS, static_desc.init(false/*is_ddl*/, table_schema, mock_ls_id_, mock_tablet_id_, MINI_MERGE, snapshot, scn, 1/*cluster_version*/));
ASSERT_TRUE(static_desc.is_valid());
ASSERT_EQ(static_desc.is_ddl_, false);
@ -71,9 +72,9 @@ TEST_F(TestObDataStoreDesc, test_static_desc)
static_desc.reset();
ASSERT_FALSE(static_desc.is_valid());
ObStaticDataStoreDesc static_desc2(true/*is_ddl*/);
ObStaticDataStoreDesc static_desc2;
ASSERT_EQ(OB_SUCCESS,
static_desc2.init(table_schema, mock_ls_id_, mock_tablet_id_,
static_desc2.init(true/*is_ddl*/, table_schema, mock_ls_id_, mock_tablet_id_,
MAJOR_MERGE, snapshot, scn, DATA_VERSION_4_2_0_0));
ASSERT_TRUE(static_desc2.is_valid());
@ -85,10 +86,18 @@ TEST_F(TestObDataStoreDesc, test_static_desc)
ASSERT_EQ(static_desc2.schema_version_, table_schema.schema_version_);
ASSERT_EQ(static_desc2.snapshot_version_, snapshot);
ASSERT_EQ(static_desc2.end_scn_.val_, snapshot);
static_desc2.progressive_merge_round_ = 1;
static_desc2.macro_block_size_ = 100;
static_desc2.macro_store_size_ = 100;
static_desc2.micro_block_size_limit_ = 100;
static_desc2.encrypt_id_ = 100;
static_desc2.master_key_id_ = 100;
ObStaticDataStoreDesc static_desc3(true/*is_ddl*/);
ObStaticDataStoreDesc static_desc3;
ASSERT_EQ(OB_SUCCESS, static_desc3.assign(static_desc2));
ASSERT_TRUE(static_desc3.is_valid());
STORAGE_LOG(INFO, "cmp", K(static_desc2), K(static_desc3));
ASSERT_TRUE(static_desc3 == static_desc2);
}
TEST_F(TestObDataStoreDesc, test_col_desc)
@ -122,7 +131,7 @@ TEST_F(TestObDataStoreDesc, test_whole_data_desc)
ObTableSchema table_schema;
TestSchemaPrepare::prepare_schema(table_schema, 5);
ASSERT_EQ(OB_SUCCESS,
whole_desc.init(table_schema, mock_ls_id_, mock_tablet_id_,
whole_desc.init(false/*is_ddl*/, table_schema, mock_ls_id_, mock_tablet_id_,
MAJOR_MERGE, snapshot, DATA_VERSION_4_2_0_0,
share::SCN::invalid_scn()));
ASSERT_TRUE(whole_desc.is_valid());
@ -130,11 +139,11 @@ TEST_F(TestObDataStoreDesc, test_whole_data_desc)
// point to other static desc member
ObStaticDataStoreDesc static_desc;
ASSERT_EQ(OB_INVALID_ARGUMENT,
static_desc.init(table_schema, mock_ls_id_, mock_tablet_id_,
static_desc.init(false/*is_ddl*/, table_schema, mock_ls_id_, mock_tablet_id_,
MINI_MERGE, snapshot,
share::SCN::invalid_scn(), 0/*cluster_version*/));
ASSERT_EQ(OB_SUCCESS,
static_desc.init(table_schema, mock_ls_id_, mock_tablet_id_,
static_desc.init(false/*is_ddl*/, table_schema, mock_ls_id_, mock_tablet_id_,
MAJOR_MERGE, snapshot,
share::SCN::invalid_scn(), DATA_VERSION_4_2_0_0));
whole_desc.desc_.static_desc_ = &static_desc;
@ -152,7 +161,7 @@ TEST_F(TestObDataStoreDesc, gen_index_desc)
share::SCN scn;
scn.convert_for_tx(100);
ASSERT_EQ(OB_SUCCESS,
data_desc.init(table_schema, mock_ls_id_, mock_tablet_id_,
data_desc.init(false/*is_ddl*/, table_schema, mock_ls_id_, mock_tablet_id_,
MAJOR_MERGE, snapshot, 1/*clsuter_version*/));
ASSERT_TRUE(data_desc.is_valid());
const ObDataStoreDesc &data_store_desc = data_desc.get_desc();
@ -184,7 +193,7 @@ TEST_F(TestObDataStoreDesc, test_cg)
share::SCN scn;
scn.convert_for_tx(100);
ASSERT_EQ(OB_SUCCESS,
static_desc.init(table_schema, mock_ls_id_, mock_tablet_id_,
static_desc.init(false/*is_ddl*/, table_schema, mock_ls_id_, mock_tablet_id_,
MAJOR_MERGE, snapshot, share::SCN::invalid_scn(), DATA_VERSION_4_3_2_0/*cluster_version*/));
ASSERT_TRUE(static_desc.is_valid());

View File

@ -303,6 +303,18 @@ TEST_F(TestStorageSchema, test_update_tablet_store_schema)
ASSERT_EQ(result_storage_schema->is_column_info_simplified(), true);
ObStorageSchemaUtil::free_storage_schema(allocator_, result_storage_schema);
// mock schema with virtual column, same column_cnt & store_column_cnt, simplified = false
storage_schema2.reset();
ASSERT_EQ(OB_SUCCESS, storage_schema2.init(allocator_, table_schema, lib::Worker::CompatMode::MYSQL));
storage_schema1.store_column_cnt_ -= 1;
storage_schema2.store_column_cnt_ -= 1;
ret = ObStorageSchemaUtil::update_tablet_storage_schema(ObTabletID(1), allocator_, storage_schema1, storage_schema2, result_storage_schema);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(result_storage_schema->schema_version_, storage_schema2.schema_version_);
ASSERT_EQ(result_storage_schema->store_column_cnt_, storage_schema2.store_column_cnt_);
ASSERT_EQ(result_storage_schema->is_column_info_simplified(), false);
ObStorageSchemaUtil::free_storage_schema(allocator_, result_storage_schema);
// schema_on_tablet and schema1 have same store column cnt, but storage_schema1 have full column info
ObStorageSchema schema_on_tablet;
ASSERT_EQ(OB_SUCCESS, schema_on_tablet.init(allocator_, storage_schema1, true/*skip_column_info*/));