dont inc progressive round when add column for column_store & collect co major info in dag_net

This commit is contained in:
yangqise7en 2024-09-24 12:45:28 +00:00 committed by ob-robot
parent 86258d2719
commit 2bbdfaad57
18 changed files with 158 additions and 34 deletions

View File

@ -13240,6 +13240,38 @@ int ObDDLService::update_tables_attribute(ObIArray<ObTableSchema*> &new_table_sc
return ret;
}
int ObDDLService::check_need_add_progressive_round(
const uint64_t tenant_data_version,
const ObTableSchema &table_schema,
const AlterTableSchema &alter_table_schema,
bool &need_add_progressive_round)
{
int ret = OB_SUCCESS;
need_add_progressive_round = true;
bool is_column_store_schema = false;
if (tenant_data_version < DATA_VERSION_4_3_3_0) {
// do nothing
} else if (OB_FAIL(table_schema.get_is_column_store(is_column_store_schema))) {
LOG_WARN("failed to get is column store", KR(ret));
} else if (is_column_store_schema) {
AlterColumnSchema *alter_column_schema = nullptr;
ObTableSchema::const_column_iterator it_begin = alter_table_schema.column_begin();
ObTableSchema::const_column_iterator it_end = alter_table_schema.column_end();
need_add_progressive_round = false;
for (;OB_SUCC(ret) && it_begin != it_end; it_begin++) {
if (OB_ISNULL(alter_column_schema = static_cast<AlterColumnSchema *>(*it_begin))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("*it_begin is NULL", K(ret));
} else if (OB_DDL_ADD_COLUMN != alter_column_schema->alter_type_) {
need_add_progressive_round = true;
break;
}
}
}
return ret;
}
//fix me :Check whether the newly added index column covers the partition column --by rongxuan.lc
// It can be repaired after the featrue that add index in alter_table statement
int ObDDLService::alter_table_in_trans(obrpc::ObAlterTableArg &alter_table_arg,
@ -13352,20 +13384,32 @@ int ObDDLService::alter_table_in_trans(obrpc::ObAlterTableArg &alter_table_arg,
} else {
bool need_update_index_table = false;
AlterLocalityOp alter_locality_op = ALTER_LOCALITY_OP_INVALID;
if (alter_table_arg.is_alter_columns_
|| (alter_table_arg.is_alter_options_
&& alter_table_arg.need_progressive_merge())) {
bool need_progressive_for_alter_column = false;
if (alter_table_arg.is_alter_columns_) {
if (OB_FAIL(check_need_add_progressive_round(
tenant_data_version,
*orig_table_schema,
alter_table_arg.alter_table_schema_,
need_progressive_for_alter_column))) {
LOG_WARN("failed to check need progressive round", KR(ret));
}
} else if (alter_table_arg.is_alter_options_
&& alter_table_arg.need_progressive_merge()) {
if (alter_table_arg.alter_table_schema_.alter_option_bitset_.
has_member(ObAlterTableArg::ENCRYPTION) &&
alter_table_arg.alter_table_schema_.is_equal_encryption(*orig_table_schema)) {
// If the values before and after changing the encryption algorithm in the table are the same,
// the merge is not marked
} else {
alter_table_arg.is_alter_options_ = true;
alter_table_arg.alter_table_schema_.set_progressive_merge_round(orig_table_schema->get_progressive_merge_round() + 1);
if (OB_FAIL(alter_table_arg.alter_table_schema_.alter_option_bitset_.add_member(ObAlterTableArg::PROGRESSIVE_MERGE_ROUND))) {
LOG_WARN("fail to add member progressive merge round", K(ret));
}
need_progressive_for_alter_column = true;
}
}
if (OB_SUCC(ret) && need_progressive_for_alter_column) {
alter_table_arg.is_alter_options_ = true;
alter_table_arg.alter_table_schema_.set_progressive_merge_round(orig_table_schema->get_progressive_merge_round() + 1);
if (OB_FAIL(alter_table_arg.alter_table_schema_.alter_option_bitset_.add_member(ObAlterTableArg::PROGRESSIVE_MERGE_ROUND))) {
LOG_WARN("fail to add member progressive merge round", K(ret));
}
}
if (OB_SUCC(ret)) {

View File

@ -1554,6 +1554,11 @@ private:
obrpc::ObAlterTableRes &res,
const uint64_t tenant_data_version);
int check_need_add_progressive_round(
const uint64_t tenant_data_version,
const ObTableSchema &table_schema,
const AlterTableSchema &alter_table_schema,
bool &need_add_progressive_round);
int need_modify_not_null_constraint_validate(const obrpc::ObAlterTableArg &alter_table_arg,
bool &is_add_not_null_col,
bool &need_modify) const;

View File

@ -321,6 +321,30 @@ int ObCOTabletMergeCtx::collect_running_info()
return ret;
}
int ObCOTabletMergeCtx::collect_running_info_in_batch(
const uint32_t start_cg_idx,
const uint32_t end_cg_idx,
const ObCompactionTimeGuard &time_guard)
{
int ret = OB_SUCCESS;
ObSSTableMergeHistory &merge_history = cg_merge_info_array_[start_cg_idx]->get_merge_history();
for (int64_t idx = start_cg_idx + 1; OB_SUCC(ret) && idx < end_cg_idx; ++idx) {
const ObSSTableMergeHistory &tmp = cg_merge_info_array_[idx]->get_merge_history();
if (OB_FAIL(merge_history.update_block_info(tmp.block_info_, true/*without_row_cnt*/))) {
LOG_WARN("failed to update block info", KR(ret), K(tmp));
}
}
if (FAILEDx(dag_net_merge_history_.update_block_info(
merge_history.block_info_,
0 == start_cg_idx ? false : true/*without_row_cnt*/))) {
LOG_WARN("failed to update block info", KR(ret), K(dag_net_merge_history_), K(merge_history));
} else {
dag_net_merge_history_.update_execute_time(merge_history.running_info_.execute_time_);
info_collector_.time_guard_.add_time_guard(time_guard);
}
return ret;
}
// for ObCOMergeBatchExeDag
int ObCOTabletMergeCtx::collect_running_info(
const uint32_t start_cg_idx,
@ -334,6 +358,8 @@ int ObCOTabletMergeCtx::collect_running_info(
if (OB_UNLIKELY(NULL != new_table && !new_table->is_sstable())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get new sstable", K(ret), KP(new_table));
} else if (OB_FAIL(collect_running_info_in_batch(start_cg_idx, end_cg_idx, time_guard))) {
LOG_WARN("failed to collect running info in batch");
} else {
const ObSSTable *new_sstable = static_cast<ObSSTable *>(new_table);
add_sstable_merge_info(cg_merge_info_array_[start_cg_idx]->get_merge_history(),

View File

@ -65,6 +65,10 @@ struct ObCOTabletMergeCtx : public ObBasicTabletMergeCtx
virtual int cal_merge_param() override;
virtual int prepare_index_tree() override { return OB_SUCCESS; }
virtual int collect_running_info() override;
int collect_running_info_in_batch(
const uint32_t start_cg_idx,
const uint32_t end_cg_idx,
const ObCompactionTimeGuard &time_guard);
virtual int build_ctx(bool &finish_flag) override;
virtual int check_merge_ctx_valid() override;
OB_INLINE bool all_cg_finish() const // locked by ObCODagNet ctx_lock_

View File

@ -180,7 +180,7 @@ int ObWriteHelper::end_write(ObTabletMergeInfo &merge_info)
STORAGE_LOG(WARN, "failed to close macro writer", K(ret), K(macro_writer_));
} else {
ObSSTableMergeHistory &merge_history = merge_info.get_merge_history();
if (OB_FAIL(merge_history.update_block_info(macro_writer_.get_merge_block_info()))) {
if (OB_FAIL(merge_history.update_block_info(macro_writer_.get_merge_block_info(), false/*without_row_cnt*/))) {
STORAGE_LOG(WARN, "Failed to add macro blocks", K(ret));
}
}

View File

@ -371,6 +371,9 @@ int ObCOMerger::close()
}
}
}
if (OB_SUCC(ret) && OB_NOT_NULL(merge_infos[start_cg_idx_])) {
merge_infos[start_cg_idx_]->get_merge_history().update_execute_time(ObTimeUtility::fast_current_time() - start_time_);
}
return ret;
}

View File

@ -742,11 +742,13 @@ void ObBasicTabletMergeCtx::add_sstable_merge_info(
if (exe_ts > 0 && block_info.new_micro_info_.get_data_micro_size() > 0) {
block_info.new_flush_data_rate_ = (int)(((float)block_info.new_micro_info_.get_data_micro_size()/ 1024) / ((float)exe_ts / 1_s));
int64_t io_percentage = block_info.block_io_us_ * 100 / (float)exe_ts;
ADD_COMMENT("block_io_us", block_info.block_io_us_);
if (io_percentage > 0) {
running_info.io_percentage_ = io_percentage;
}
}
if (running_info.execute_time_ > 30_s && (get_concurrent_cnt() > 1 || end_cg_idx > 0)) {
ADD_COMMENT("execute_time", running_info.execute_time_);
}
int64_t mem_peak_mb = mem_ctx_.get_total_mem_peak() >> 20;
if (mem_peak_mb > 0) {
ADD_COMMENT("cost_mb", mem_peak_mb);

View File

@ -253,7 +253,10 @@ public:
int build_update_table_store_param(
const blocksstable::ObSSTable *sstable,
ObUpdateTableStoreParam &param);
virtual int update_block_info(const ObMergeBlockInfo &block_info) { return OB_NOT_SUPPORTED; }
virtual int update_block_info(
const ObMergeBlockInfo &block_info,
const int64_t cost_time)
{ return OB_NOT_SUPPORTED; }
VIRTUAL_TO_STRING_KV(K_(static_param), K_(static_desc), K_(parallel_merge_ctx), K_(tablet_handle),
K_(info_collector), KP_(merge_dag));
protected:

View File

@ -708,12 +708,15 @@ int ObMediumCompactionScheduleFunc::check_if_schema_changed(
LOG_ERROR("stored col cnt in curr schema is less than old major sstable", K(ret),
"col_cnt_in_sstable", tablet_handle_.get_obj()->get_last_major_column_count(),
"col_cnt_in_schema", full_stored_col_cnt, KPC(this));
} else if (medium_info.data_version_ >= DATA_VERSION_4_3_3_0) {
// do not check schema changed
medium_info.is_schema_changed_ = false;
} else if (tablet_handle_.get_obj()->get_last_major_column_count() != full_stored_col_cnt
|| tablet_handle_.get_obj()->get_last_major_compressor_type() != schema.get_compressor_type()
|| (ObRowStoreType::DUMMY_ROW_STORE != tablet_handle_.get_obj()->get_last_major_latest_row_store_type()
&& tablet_handle_.get_obj()->get_last_major_latest_row_store_type() != schema.row_store_type_)) {
medium_info.is_schema_changed_ = true;
LOG_INFO("schema changed", KPC(this), K(schema),
LOG_INFO("schema changed", KPC(this), K(schema), K(full_stored_col_cnt),
"col_cnt_in_sstable", tablet_handle_.get_obj()->get_last_major_column_count(),
"compressor_type_in_sstable", tablet_handle_.get_obj()->get_last_major_compressor_type(),
"latest_row_store_type_in_sstable", tablet_handle_.get_obj()->get_last_major_latest_row_store_type());

View File

@ -74,12 +74,13 @@ ObMerger::ObMerger(
: merger_arena_(allocator),
merge_ctx_(nullptr),
task_idx_(0),
force_flat_format_(false),
merge_param_(static_param),
partition_fuser_(nullptr),
merge_helper_(nullptr),
base_iter_(nullptr),
trans_state_mgr_(merger_arena_)
trans_state_mgr_(merger_arena_),
start_time_(ObTimeUtility::current_time()),
force_flat_format_(false)
{
}
@ -287,10 +288,11 @@ int ObPartitionMerger::close()
int ret = OB_SUCCESS;
if (OB_FAIL(macro_writer_->close())) {
STORAGE_LOG(WARN, "Failed to close macro block writer", K(ret));
} else if (OB_FAIL(merge_ctx_->update_block_info(macro_writer_->get_merge_block_info()))) {
} else if (OB_FAIL(merge_ctx_->update_block_info(
macro_writer_->get_merge_block_info(),
ObTimeUtility::fast_current_time() - start_time_))) {
STORAGE_LOG(WARN, "Failed to add macro blocks", K(ret));
}
return ret;
}

View File

@ -112,12 +112,13 @@ protected:
compaction::ObLocalArena &merger_arena_;
ObBasicTabletMergeCtx *merge_ctx_;
int64_t task_idx_;
bool force_flat_format_;
ObMergeParameter merge_param_;
ObIPartitionMergeFuser *partition_fuser_;
ObPartitionMergeHelper *merge_helper_;
ObPartitionMergeIter *base_iter_;
ObCachedTransStateMgr trans_state_mgr_;
int64_t start_time_;
bool force_flat_format_;
};
class ObPartitionMerger : public ObMerger

View File

@ -178,6 +178,7 @@ void ObMergeStaticInfo::shallow_copy(const ObMergeStaticInfo &other)
ObMergeRunningInfo::ObMergeRunningInfo()
: merge_start_time_(0),
merge_finish_time_(0),
execute_time_(0),
start_cg_idx_(0),
end_cg_idx_(0),
io_percentage_(0),
@ -190,6 +191,7 @@ void ObMergeRunningInfo::reset()
{
merge_start_time_ = 0;
merge_finish_time_ = 0;
execute_time_ = 0;
start_cg_idx_ = 0;
end_cg_idx_ = 0;
io_percentage_ = 0;
@ -207,6 +209,7 @@ void ObMergeRunningInfo::shallow_copy(const ObMergeRunningInfo &other)
{
merge_start_time_ = other.merge_start_time_;
merge_finish_time_ = other.merge_finish_time_;
execute_time_ = other.execute_time_;
start_cg_idx_ = other.start_cg_idx_;
end_cg_idx_ = other.end_cg_idx_;
io_percentage_ = other.io_percentage_;
@ -275,14 +278,22 @@ void ObMergeBlockInfo::shallow_copy(const ObMergeBlockInfo &other)
}
void ObMergeBlockInfo::add(const ObMergeBlockInfo &other)
{
total_row_count_ += other.total_row_count_;
incremental_row_count_ += other.incremental_row_count_;
add_without_row_cnt(other);
}
/*
* for column store, each batch should have same row cnt, need skip when add
*/
void ObMergeBlockInfo::add_without_row_cnt(const ObMergeBlockInfo &other)
{
occupy_size_ += other.occupy_size_;
original_size_ += other.original_size_;
compressed_size_ += other.compressed_size_;
macro_block_count_ += other.macro_block_count_;
multiplexed_macro_block_count_ += other.multiplexed_macro_block_count_;
total_row_count_ += other.total_row_count_;
incremental_row_count_ += other.incremental_row_count_;
multiplexed_micro_count_in_new_macro_ += other.multiplexed_micro_count_in_new_macro_;
new_micro_count_in_new_macro_ += other.new_micro_count_in_new_macro_;
block_io_us_ += other.block_io_us_;
@ -372,10 +383,16 @@ void ObSSTableMergeHistory::shallow_copy(ObIDiagnoseInfo *other)
}
}
int ObSSTableMergeHistory::update_block_info(const ObMergeBlockInfo &block_info)
int ObSSTableMergeHistory::update_block_info(
const ObMergeBlockInfo &block_info,
const bool without_row_cnt)
{
int ret = OB_SUCCESS;
block_info_.add(block_info);
if (without_row_cnt) {
block_info_.add_without_row_cnt(block_info);
} else {
block_info_.add(block_info);
}
running_info_.merge_finish_time_ = ObTimeUtility::fast_current_time();
return ret;
}

View File

@ -145,12 +145,13 @@ struct ObMergeRunningInfo
void shallow_copy(const ObMergeRunningInfo &other);
static const int64_t MERGE_INFO_COMMENT_LENGTH = 256;
TO_STRING_KV(K_(merge_start_time), K_(merge_finish_time), K_(dag_id),
"merge_cost_time", merge_finish_time_ - merge_start_time_,
TO_STRING_KV(K_(merge_start_time), K_(merge_finish_time), K_(execute_time), K_(dag_id),
K_(start_cg_idx), K_(end_cg_idx), K_(io_percentage), K_(parallel_merge_info));
int64_t merge_start_time_;
int64_t merge_finish_time_;
// for parallel merge & column store, finish_time-start_time can't show the real execute time
int64_t execute_time_;
int64_t start_cg_idx_;
int64_t end_cg_idx_;
int64_t io_percentage_;
@ -168,6 +169,7 @@ public:
bool is_valid() const;
void shallow_copy(const ObMergeBlockInfo &other);
void add(const ObMergeBlockInfo &block_info);
void add_without_row_cnt(const ObMergeBlockInfo &block_info);
void add_index_block_info(const ObMergeBlockInfo &block_info);
TO_STRING_KV(K_(occupy_size), K_(original_size), K_(macro_block_count), K_(multiplexed_macro_block_count),
K_(new_micro_count_in_new_macro), K_(multiplexed_micro_count_in_new_macro),
@ -213,8 +215,8 @@ struct ObSSTableMergeHistory : public ObIDiagnoseInfo
bool is_valid() const;
void reset();
virtual void shallow_copy(ObIDiagnoseInfo *other) override;
int update_block_info(const ObMergeBlockInfo &block_info);
int update_block_info(const ObMergeBlockInfo &block_info, const bool without_row_cnt);
void update_execute_time(const int64_t cost_time) { running_info_.execute_time_ += cost_time; }
int64_t get_macro_block_count() const { return block_info_.macro_block_count_; }
int64_t get_multiplexed_macro_block_count() const { return block_info_.multiplexed_macro_block_count_; }
bool is_major_merge_type() const { return compaction::is_major_merge_type(static_info_.merge_type_); }

View File

@ -110,6 +110,19 @@ int ObTabletMergeCtx::collect_running_info()
return ret;
}
int ObTabletMergeCtx::update_block_info(
const ObMergeBlockInfo &block_info,
const int64_t cost_time)
{
int ret = OB_SUCCESS;
if (OB_FAIL(merge_info_.get_merge_history().update_block_info(block_info, false/*without_row_cnt*/))) {
LOG_WARN("failed to update block info", KR(ret), K(block_info));
} else {
merge_info_.get_merge_history().update_execute_time(cost_time);
}
return ret;
}
/*
* ----------------------------------------------ObTabletMiniMergeCtx--------------------------------------------------
*/

View File

@ -61,10 +61,9 @@ public:
virtual int create_sstable(const blocksstable::ObSSTable *&new_sstable) override;
virtual int collect_running_info() override;
const ObSSTableMergeHistory &get_merge_history() { return merge_info_.get_merge_history(); }
virtual int update_block_info(const ObMergeBlockInfo &block_info) override
{
return merge_info_.get_merge_history().update_block_info(block_info);
}
virtual int update_block_info(
const ObMergeBlockInfo &block_info,
const int64_t cost_time) override;
INHERIT_TO_STRING_KV("ObBasicTabletMergeCtx", ObBasicTabletMergeCtx, K_(merge_info));
storage::ObTableHandleV2 merged_table_handle_;
ObTabletMergeInfo merge_info_;

View File

@ -424,7 +424,7 @@ int ObMdsTableMiniMerger::generate_mds_mini_sstable(
SMART_VARS_2((ObSSTableMergeRes, res), (ObTabletCreateSSTableParam, param)) {
if (OB_FAIL(macro_writer_.close())) {
LOG_WARN("fail to close macro writer", K(ret), K(macro_writer_));
} else if (OB_FAIL(ctx_->update_block_info(macro_writer_.get_merge_block_info()))) {
} else if (OB_FAIL(ctx_->update_block_info(macro_writer_.get_merge_block_info(), 0/*cost_time*/))) {
STORAGE_LOG(WARN, "Failed to add macro blocks", K(ret));
} else if (OB_FAIL(sstable_builder_.close(res))) {
LOG_WARN("fail to close sstable builder", K(ret), K(sstable_builder_));

View File

@ -1,8 +1,8 @@
create table t1(a int, b varchar(3048), primary key (a)) with column group (all columns, each column);
create table t2(c0 int, c1 int, c2 int, c3 int, c4 int, c5 int, c6 int, c7 int, c8 int, c9 int, c10 int, c11 int, c12 int) with column group (all columns, each column);
create table t2(c0 int, c1 int, c2 int, c3 int, c4 int, c5 int, c6 int, c7 int, c8 int, c9 int, c10 int, c11 int, c12 int, c13 int, c14 int, c15 int, c16 int, c17 int, c18 int, c19 int, c20 int, c21 int, c22 int, c23 int, c24 int, c25 int) with column group (all columns, each column);
create table t3(a int, b varchar(3048), primary key (a)) with column group (all columns, each column);
insert into t1 values(1, 1);
insert into t2 values(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
insert into t2 values(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25);
insert into t3 values(1, 1);
select count(*) from t1;
count(*)

View File

@ -13,11 +13,11 @@ drop table if exists t3;
--enable_query_log
create table t1(a int, b varchar(3048), primary key (a)) with column group (all columns, each column);
create table t2(c0 int, c1 int, c2 int, c3 int, c4 int, c5 int, c6 int, c7 int, c8 int, c9 int, c10 int, c11 int, c12 int) with column group (all columns, each column);
create table t2(c0 int, c1 int, c2 int, c3 int, c4 int, c5 int, c6 int, c7 int, c8 int, c9 int, c10 int, c11 int, c12 int, c13 int, c14 int, c15 int, c16 int, c17 int, c18 int, c19 int, c20 int, c21 int, c22 int, c23 int, c24 int, c25 int) with column group (all columns, each column);
create table t3(a int, b varchar(3048), primary key (a)) with column group (all columns, each column);
sleep 10;
insert into t1 values(1, 1);
insert into t2 values(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
insert into t2 values(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25);
insert into t3 values(1, 1);
--disable_query_log