opt the display of co merge history info

This commit is contained in:
Fengjingkun 2024-12-06 08:49:20 +00:00 committed by ob-robot
parent adf639e461
commit 5358c920cf
14 changed files with 104 additions and 44 deletions

View File

@ -320,9 +320,9 @@ int ObLLVMDIHelper::create_struct_type(
LOG_WARN("name or jc or file or scope is NULL",
K(ret), K(name), K(jc_), K(file), K(scope), K(line));
} else {
SmallVector<Metadata *, 8> element_types;
for (int i = 0; OB_SUCC(ret) && i < member_types.count(); i++) {
if (OB_ISNULL(member_types.at(i).get_v())) {
SmallVector<Metadata *, 8> element_types;
for (int i = 0; OB_SUCC(ret) && i < member_types.count(); i++) {
if (OB_ISNULL(member_types.at(i).get_v())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("member type is NULL", K(ret), K(i), K(member_types.count()));
} else {

View File

@ -441,7 +441,7 @@ int ObMicroBlockCSEncoder::append_row(const ObDatumRow &row)
if (OB_UNLIKELY(appended_row_count_ >= ObCSEncodingUtil::MAX_MICRO_BLOCK_ROW_CNT
|| appended_row_count_ > ctx_.encoding_granularity_)) {
ret = OB_BUF_NOT_ENOUGH;
LOG_INFO("Try to encode more rows than maximum of row cnt in header, force to build a block",
LOG_DEBUG("Try to encode more rows than maximum of row cnt in header, force to build a block",
K_(appended_row_count), K(row), K(ctx_.encoding_granularity_));
} else if (OB_FAIL(copy_and_append_row_(row, store_size))) {
if (OB_UNLIKELY(OB_BUF_NOT_ENOUGH != ret)) {

View File

@ -609,7 +609,6 @@ int ObMacroBlockWriter::append_row(const ObDatumRow &row, const ObMacroBlockDesc
} else if (OB_FAIL(try_active_flush_macro_block())) {
STORAGE_LOG(WARN, "Fail to try_active_flush_macro_block", K(ret));
} else {
++merge_block_info_.incremental_row_count_;
STORAGE_LOG(DEBUG, "Success to append row, ", "tablet_id", data_store_desc_->get_tablet_id(), K(row));
}
return ret;

View File

@ -192,6 +192,7 @@ public:
const int64_t verify_level = MICRO_BLOCK_MERGE_VERIFY_LEVEL::ENCODING_AND_COMPRESSION);
inline int64_t get_macro_data_size() const { return macro_blocks_[current_index_].get_data_size() + micro_writer_->get_block_size(); }
const compaction::ObMergeBlockInfo& get_merge_block_info() const { return merge_block_info_; }
void inc_incremental_row_count() { ++merge_block_info_.incremental_row_count_; }
protected:
virtual int build_micro_block();
virtual int try_switch_macro_block();

View File

@ -376,14 +376,25 @@ int ObCOTabletMergeCtx::cal_merge_param()
int ObCOTabletMergeCtx::collect_running_info()
{
int ret = OB_SUCCESS;
dag_net_merge_history_.static_info_.shallow_copy(static_history_);
dag_net_merge_history_.static_info_.is_fake_ = true;
dag_net_merge_history_.running_info_.merge_start_time_ = dag_net_.get_start_time();
// add a fake merge info into history with only dag_net time_guard
(void) ObBasicTabletMergeCtx::add_sstable_merge_info(dag_net_merge_history_, dag_net_.get_dag_id(), dag_net_.hash(),
info_collector_.time_guard_, nullptr/*sstable*/,
&static_param_.snapshot_info_,
0/*start_cg_idx*/, array_count_/*end_cg_idx*/);
const int64_t batch_exe_dag_cnt = dag_net_.get_batch_dag_count();
if (is_build_row_store() || 1 >= batch_exe_dag_cnt) {
// no need to report dag net merge history
} else {
dag_net_merge_history_.static_info_.shallow_copy(static_history_);
dag_net_merge_history_.static_info_.is_fake_ = true;
dag_net_merge_history_.running_info_.merge_start_time_ = dag_net_.get_start_time();
// add a fake merge info into history with only dag_net time_guard
(void) ObBasicTabletMergeCtx::add_sstable_merge_info(dag_net_merge_history_,
dag_net_.get_dag_id(),
dag_net_.hash(),
info_collector_.time_guard_,
nullptr/*sstable*/,
&static_param_.snapshot_info_,
0/*start_cg_idx*/,
array_count_/*end_cg_idx*/,
batch_exe_dag_cnt);
}
return ret;
}
@ -428,7 +439,7 @@ int ObCOTabletMergeCtx::collect_running_info(
LOG_WARN("failed to collect running info in batch");
} else {
const ObSSTable *new_sstable = static_cast<ObSSTable *>(new_table);
add_sstable_merge_info(cg_merge_info_array_[start_cg_idx]->get_merge_history(),
ObBasicTabletMergeCtx::add_sstable_merge_info(cg_merge_info_array_[start_cg_idx]->get_merge_history(),
dag_id, hash, time_guard, new_sstable, &static_param_.snapshot_info_, start_cg_idx, end_cg_idx);
}
return ret;

View File

@ -500,7 +500,6 @@ int ObCOMergeBatchExeDag::init_by_param(const share::ObIDagInitParam *param)
int ObCOMergeBatchExeDag::create_first_task()
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
ObCOMergeBatchExeTask *execute_task = nullptr;
ObCOMergeBatchFinishTask *finish_task = nullptr;
ObCOMergeDagNet *dag_net = static_cast<ObCOMergeDagNet*>(get_dag_net());
@ -941,6 +940,7 @@ int ObCOMergeBatchFinishTask::process()
} else if (OB_FAIL(execute_dag->create_sstable_after_merge())) {
LOG_WARN("failed to create sstable after merge", K(ret), KPC(execute_dag));
} else {
dag_net_->inc_batch_dag_count();
FLOG_INFO("co batch sstable merge finish", K(ret),
"start_cg sstable_merge_info", ctx_->cg_merge_info_array_[execute_dag->get_start_cg_idx()]->get_merge_history(),
"time_guard", execute_dag->get_time_guard(),
@ -1089,6 +1089,7 @@ ObCOMergeDagNet::ObCOMergeDagNet()
batch_reduced_(false),
ctx_lock_(),
merge_batch_size_(ObCOTabletMergeCtx::DEFAULT_CG_MERGE_BATCH_SIZE),
batch_dag_cnt_(0),
merge_status_(COMergeStatus::NOT_INIT),
basic_param_(),
tmp_allocator_("CoDagNet", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID(), ObCtxIds::MERGE_NORMAL_CTX_ID),

View File

@ -294,6 +294,8 @@ public:
int swap_tablet_after_minor();
ObCOTabletMergeCtx *get_merge_ctx() const { return co_merge_ctx_; }
const ObCOMergeDagParam& get_dag_param() const { return basic_param_; }
int64_t get_batch_dag_count() const { return ATOMIC_LOAD(&batch_dag_cnt_); }
void inc_batch_dag_count() { ATOMIC_INC(&batch_dag_cnt_); }
void collect_running_info(const uint32_t start_cg_idx, const uint32_t end_cg_idx, const int64_t hash,
const share::ObDagId &dag_id, const ObCompactionTimeGuard &time_guard);
template<class T>
@ -304,7 +306,7 @@ public:
share::ObIDag *parent = nullptr,
const bool add_scheduler_flag = true);
INHERIT_TO_STRING_KV("ObIDagNet", ObIDagNet, K_(is_inited), K_(merge_status), K_(finish_added),
K_(merge_batch_size), K_(basic_param), KP_(finish_dag));
K_(merge_batch_size), K_(batch_dag_cnt), K_(basic_param), KP_(finish_dag));
private:
static const int64_t DELAY_SCHEDULE_FINISH_DAG_CG_CNT = 150;
static const int64_t DEFAULT_MAX_RETRY_TIMES = 2;
@ -347,6 +349,7 @@ private:
bool batch_reduced_; // only reduce batch_size one time in a round // locked by ctx_lock_
lib::ObMutex ctx_lock_;
int64_t merge_batch_size_; // will decrease when meet memory allocate failed
int64_t batch_dag_cnt_; // record the batch exec dag cnt
COMergeStatus merge_status_;
ObCOMergeDagParam basic_param_;
common::ObArenaAllocator tmp_allocator_; // TODO(@lixia.yq) temp solution, use allocator on ObIDagNet later

View File

@ -769,7 +769,8 @@ void ObBasicTabletMergeCtx::add_sstable_merge_info(
const ObSSTable *sstable,
const ObStorageSnapshotInfo *snapshot_info,
const int64_t start_cg_idx,
const int64_t end_cg_idx)
const int64_t end_cg_idx,
const int64_t batch_exec_dag_cnt)
{
int tmp_ret = OB_SUCCESS;
ObDagWarningInfo warning_info;
@ -802,6 +803,9 @@ void ObBasicTabletMergeCtx::add_sstable_merge_info(
running_info.io_percentage_ = io_percentage;
}
}
if (batch_exec_dag_cnt > 0) {
ADD_COMMENT("CO_DAG_NET batch_cnt", batch_exec_dag_cnt);
}
if (running_info.execute_time_ > 30_s && (get_concurrent_cnt() > 1 || end_cg_idx > 0)) {
ADD_COMMENT("execute_time", running_info.execute_time_);
}
@ -1332,6 +1336,7 @@ int ObBasicTabletMergeCtx::get_meta_compaction_info()
int64_t schema_version = 0;
ObStorageSchema *storage_schema = nullptr;
bool is_building_index = false; // placeholder
uint64_t min_data_version = 0;
if (OB_UNLIKELY(!is_meta_major_merge(get_merge_type())
|| nullptr != static_param_.schema_)) {
@ -1344,10 +1349,12 @@ int ObBasicTabletMergeCtx::get_meta_compaction_info()
LOG_WARN("failed to get schema service from MTL", K(ret));
} else if (OB_FAIL(tablet->get_schema_version_from_storage_schema(schema_version))){
LOG_WARN("failed to get schema version from tablet", KR(ret), KPC(tablet));
} else if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), min_data_version))) {
LOG_WARN("failed to get min data version", K(ret));
} else if (OB_FAIL(ObMediumCompactionScheduleFunc::get_table_schema_to_merge(*schema_service,
*tablet,
schema_version,
DATA_CURRENT_VERSION,
min_data_version,
mem_ctx_.get_allocator(),
*storage_schema,
is_building_index))) {
@ -1366,7 +1373,7 @@ int ObBasicTabletMergeCtx::get_meta_compaction_info()
}
if (OB_SUCC(ret)) {
static_param_.data_version_ = DATA_CURRENT_VERSION;
static_param_.data_version_ = min_data_version;
static_param_.is_rebuild_column_store_ = false;
static_param_.is_schema_changed_ = true; // use MACRO_BLOCK_MERGE_LEVEL
static_param_.merge_reason_ = ObAdaptiveMergePolicy::TOMBSTONE_SCENE;

View File

@ -204,7 +204,8 @@ public:
const blocksstable::ObSSTable *sstable = nullptr,
const ObStorageSnapshotInfo *snapshot_info = nullptr,
const int64_t start_cg_idx = 0,
const int64_t end_cg_idx = 0);
const int64_t end_cg_idx = 0,
const int64_t batch_exec_dag_cnt = 0);
int generate_participant_table_info(PartTableInfo &info) const;
int generate_macro_id_list(char *buf, const int64_t buf_len, const blocksstable::ObSSTable *&sstable) const;
/* GET FUNC */

View File

@ -742,10 +742,10 @@ int ObMediumCompactionScheduleFunc::check_if_schema_changed(
|| (ObRowStoreType::DUMMY_ROW_STORE != tablet.get_last_major_latest_row_store_type()
&& tablet.get_last_major_latest_row_store_type() != schema.row_store_type_)) {
is_schema_changed = true;
LOG_INFO("schema changed", K(schema), K(schema), K(full_stored_col_cnt),
"col_cnt_in_sstable", tablet.get_last_major_column_count(),
"compressor_type_in_sstable", tablet.get_last_major_compressor_type(),
"latest_row_store_type_in_sstable", tablet.get_last_major_latest_row_store_type());
LOG_INFO("schema changed", K(schema), K(full_stored_col_cnt),
"col_cnt_in_sstable", tablet.get_last_major_column_count(),
"compressor_type_in_sstable", tablet.get_last_major_compressor_type(),
"latest_row_store_type_in_sstable", tablet.get_last_major_latest_row_store_type());
} else {
is_schema_changed = false;
}

View File

@ -431,10 +431,23 @@ int ObCOMajorMergeProgress::finish_merge_progress()
} else if (OB_UNLIKELY(OB_ISNULL(merge_dag_) || typeid(*merge_dag_) != typeid(ObCOMergeBatchExeDag))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("merge_dag has unexpected type", K(ret), KPC_(merge_dag));
} else if (OB_UNLIKELY(OB_ISNULL(ctx_) || typeid(*ctx_) != typeid(ObCOTabletMergeCtx))) {
} else if (OB_ISNULL(ctx_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ctx has unexpected type", K(ret), KPC_(ctx));
} else {
LOG_WARN("get unexpected null ctx", K(ret), KPC_(ctx));
} else if (typeid(*ctx_) != typeid(ObCOTabletMergeCtx)) {
if (!GCTX.is_shared_storage_mode()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ctx has unexpected type", K(ret), KPC_(ctx));
#ifdef OB_BUILD_SHARED_STORAGE
} else if (typeid(*ctx_) != typeid(ObCOTabletOutputMergeCtx)
&& typeid(*ctx_) != typeid(ObCOTabletValidateMergeCtx)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ctx has unexpected type", K(ret), KPC_(ctx));
#endif
}
}
if (OB_SUCC(ret)) {
ObCOMergeBatchExeDag *merge_dag = static_cast<ObCOMergeBatchExeDag*>(merge_dag_);
ObCOTabletMergeCtx *ctx = static_cast<ObCOTabletMergeCtx*>(ctx_);
if (OB_FAIL(finish_progress(ctx->get_merge_version(),

View File

@ -356,7 +356,9 @@ int ObPartitionMerger::process(const ObMicroBlock &micro_block)
return ret;
}
int ObPartitionMerger::process(const ObDatumRow &row)
int ObPartitionMerger::process(
const ObDatumRow &row,
bool is_incremental_row)
{
int ret = OB_SUCCESS;
ObICompactionFilter::ObFilterRet filter_ret = ObICompactionFilter::FILTER_RET_MAX;
@ -385,7 +387,7 @@ int ObPartitionMerger::process(const ObDatumRow &row)
// drop this row
} else if (OB_FAIL(check_row_columns(row))) {
STORAGE_LOG(WARN, "Failed to check row columns", K(ret), K(row));
} else if (OB_FAIL(inner_process(row))) {
} else if (OB_FAIL(inner_process(row, is_incremental_row))) {
STORAGE_LOG(WARN, "Failed to inner append row", K(ret));
} else {
LOG_DEBUG("append row", K(ret), K(row));
@ -531,7 +533,9 @@ void write_wrong_row(const ObTabletID &tablet_id, const ObDatumRow &row)
}
#endif
int ObPartitionMajorMerger::inner_process(const ObDatumRow &row)
int ObPartitionMajorMerger::inner_process(
const ObDatumRow &row,
bool is_incremental_row)
{
int ret = OB_SUCCESS;
const bool is_delete = row.row_flag_.is_delete();
@ -548,6 +552,8 @@ int ObPartitionMajorMerger::inner_process(const ObDatumRow &row)
STORAGE_LOG(WARN, "Failed to get base iter macro", K(ret));
} else if (OB_FAIL(macro_writer_->append_row(row, macro_desc))) {
STORAGE_LOG(WARN, "Failed to append row to macro writer", K(ret));
} else if (is_incremental_row) {
macro_writer_->inc_incremental_row_count();
}
}
@ -624,7 +630,7 @@ int ObPartitionMajorMerger::merge_partition(
STORAGE_LOG(WARN, "cur row is null, but block opened", K(ret), KPC(iter));
}
} else if (OB_FAIL(merge_same_rowkey_iters(minimum_iters_))) {
STORAGE_LOG(WARN, "failed to merge_same_rowkey_iters", K(ret), K(minimum_iters_));
STORAGE_LOG(WARN, "failed to merge same rowkey iters", K(ret), K(minimum_iters_));
}
if (OB_FAIL(ret)) {
@ -681,12 +687,22 @@ int ObPartitionMajorMerger::init_progressive_merge_helper()
return ret;
}
int ObPartitionMajorMerger::merge_same_rowkey_iters(MERGE_ITER_ARRAY &merge_iters)
int ObPartitionMajorMerger::merge_same_rowkey_iters(
MERGE_ITER_ARRAY &merge_iters,
bool is_incremental_row)
{
int ret = OB_SUCCESS;
if (is_incremental_row &&
1 == merge_iters.count() &&
OB_NOT_NULL(merge_iters.at(0)) &&
merge_iters.at(0)->is_major_sstable_iter()) {
is_incremental_row = false;
}
if (OB_FAIL(partition_fuser_->fuse_row(merge_iters))) {
STORAGE_LOG(WARN, "Failed to fuse row", KPC_(partition_fuser), K(ret));
} else if (OB_FAIL(process(partition_fuser_->get_result_row()))) {
} else if (OB_FAIL(process(partition_fuser_->get_result_row(), is_incremental_row))) {
STORAGE_LOG(WARN, "Failed to process row", K(ret), K(partition_fuser_->get_result_row()));
} else if (OB_FAIL(merge_helper_->move_iters_next(merge_iters))) {
STORAGE_LOG(WARN, "failed to move iters", K(ret), K(merge_iters));
@ -743,7 +759,7 @@ int ObPartitionMajorMerger::rewrite_macro_block(MERGE_ITER_ARRAY &minimum_iters)
curr_macro_id = curr_macro->macro_block_id_;
// TODO maybe we need use macro_block_ctx to decide whether the result row came from the same macro block
while (OB_SUCC(ret) && !iter->is_iter_end() && iter->is_macro_block_opened()) {
if (OB_FAIL(merge_same_rowkey_iters(minimum_iters))) {
if (OB_FAIL(merge_same_rowkey_iters(minimum_iters, false))) {
STORAGE_LOG(WARN, "failed to merge_same_rowkey_iters", K(ret), K(minimum_iters));
} else if (OB_FAIL(iter->get_curr_macro_block(tmp_macro))) {
STORAGE_LOG(WARN, "failed to get curr macro block", K(ret), KPC(tmp_macro));
@ -893,15 +909,20 @@ int ObPartitionMinorMerger::rewrite_macro_block(MERGE_ITER_ARRAY &minimum_iters)
return ret;
}
int ObPartitionMinorMerger::inner_process(const ObDatumRow &row)
int ObPartitionMinorMerger::inner_process(
const ObDatumRow &row,
bool is_incremental_row)
{
int ret = OB_SUCCESS;
UNUSED(is_incremental_row);
const blocksstable::ObMacroBlockDesc *macro_desc;
if (OB_FAIL(get_base_iter_curr_macro_block(macro_desc))) {
STORAGE_LOG(WARN, "Failed to get base iter macro", K(ret));
} else if (OB_FAIL(macro_writer_->append_row(row, macro_desc))) {
STORAGE_LOG(WARN, "Failed to append row to macro writer", K(ret));
} else {
macro_writer_->inc_incremental_row_count();
STORAGE_LOG(DEBUG, "Success to append row to minor macro writer", K(ret), K(row));
}
@ -1250,9 +1271,12 @@ int ObPartitionMinorMerger::try_remove_ghost_iters(MERGE_ITER_ARRAY &merge_iters
return ret;
}
int ObPartitionMinorMerger::merge_same_rowkey_iters(MERGE_ITER_ARRAY &merge_iters)
int ObPartitionMinorMerger::merge_same_rowkey_iters(
MERGE_ITER_ARRAY &merge_iters,
bool is_incremental_row)
{
int ret = OB_SUCCESS;
UNUSED(is_incremental_row);
if (OB_UNLIKELY(merge_iters.empty())) {
ret = OB_INVALID_ARGUMENT;

View File

@ -132,17 +132,17 @@ public:
INHERIT_TO_STRING_KV("ObPartitionMerger", ObMerger, KPC_(merge_progress), K_(data_store_desc),
K_(minimum_iters), KP_(validator));
protected:
virtual int inner_process(const blocksstable::ObDatumRow &row) = 0;
virtual int inner_process(const blocksstable::ObDatumRow &row, bool is_incremental_row = true) = 0;
virtual int close() override;
virtual int process(const blocksstable::ObMicroBlock &micro_block);
virtual int process(
const blocksstable::ObMacroBlockDesc &macro_meta,
const ObMicroBlockData *micro_block_data);
virtual int process(const blocksstable::ObDatumRow &row);
virtual int process(const blocksstable::ObDatumRow &row, bool is_incremental_row = true);
virtual int rewrite_macro_block(MERGE_ITER_ARRAY &minimum_iters) = 0;
virtual int merge_macro_block_iter(MERGE_ITER_ARRAY &minimum_iters, int64_t &reuse_row_cnt);
virtual int check_macro_block_op(const ObMacroBlockDesc &macro_desc, ObMacroBlockOp &block_op);
virtual int merge_same_rowkey_iters(MERGE_ITER_ARRAY &merge_iters) = 0;
virtual int merge_same_rowkey_iters(MERGE_ITER_ARRAY &merge_iters, bool is_incremental_row = true) = 0;
int check_row_columns(const blocksstable::ObDatumRow &row);
int try_filter_row(const blocksstable::ObDatumRow &row, ObICompactionFilter::ObFilterRet &filter_ret);
@ -174,12 +174,12 @@ public:
const int64_t idx) override;
INHERIT_TO_STRING_KV("ObPartitionMajorMerger", ObPartitionMerger, "curr merger", "major merger");
protected:
virtual int inner_process(const blocksstable::ObDatumRow &row) override;
virtual int inner_process(const blocksstable::ObDatumRow &row, bool is_incremental_row = true) override;
private:
virtual int inner_init() override;
int init_progressive_merge_helper();
virtual int rewrite_macro_block(MERGE_ITER_ARRAY &minimum_iters) override;
virtual int merge_same_rowkey_iters(MERGE_ITER_ARRAY &merge_iters) override;
virtual int merge_same_rowkey_iters(MERGE_ITER_ARRAY &merge_iters, bool is_incremental_row = true) override;
int merge_micro_block_iter(ObPartitionMergeIter &iter, int64_t &reuse_row_cnt);
int reuse_base_sstable(ObPartitionMergeHelper &merge_helper);
};
@ -197,11 +197,11 @@ public:
const int64_t idx) override;
INHERIT_TO_STRING_KV("ObPartitionMinorMerger", ObPartitionMerger, K_(minimum_iter_idxs));
protected:
virtual int inner_process(const blocksstable::ObDatumRow &row) override;
virtual int inner_process(const blocksstable::ObDatumRow &row, bool is_incremental_row = true) override;
int find_minimum_iters_with_same_rowkey(MERGE_ITER_ARRAY &merge_iters,
MERGE_ITER_ARRAY &minimum_iters,
common::ObIArray<int64_t> &iter_idxs);
virtual int merge_same_rowkey_iters(MERGE_ITER_ARRAY &merge_iters) override;
virtual int merge_same_rowkey_iters(MERGE_ITER_ARRAY &merge_iters, bool is_incremental_row = true) override;
int try_remove_ghost_iters(MERGE_ITER_ARRAY &merge_iters,
const bool shadow_already_output,
MERGE_ITER_ARRAY &minimum_iters,

View File

@ -173,7 +173,7 @@ int ObProgressiveMergeHelper::init(
&& sstable.is_normal_cg_sstable() && rewrite_macro_cnt < CG_TABLE_CHECK_REWRITE_CNT_) {
check_macro_need_merge_ = true;
}
LOG_INFO("finish macro block need merge check", "tablet_id", static_param.get_tablet_id(), K(check_macro_need_merge_), K(rewrite_macro_cnt), K(reduce_macro_cnt), K(table_idx_));
FLOG_INFO("finish macro block need merge check", "tablet_id", static_param.get_tablet_id(), K(check_macro_need_merge_), K(rewrite_macro_cnt), K(reduce_macro_cnt), K(table_idx_));
}
}