diff --git a/src/storage/blocksstable/ob_datum_row.cpp b/src/storage/blocksstable/ob_datum_row.cpp index 1d85a5d092..f50464fc2c 100644 --- a/src/storage/blocksstable/ob_datum_row.cpp +++ b/src/storage/blocksstable/ob_datum_row.cpp @@ -835,7 +835,6 @@ int ObGhostRowUtil::is_ghost_row( int ObGhostRowUtil::make_ghost_row( const int64_t sql_sequence_col_idx, - const ObQueryFlag &query_flag, blocksstable::ObDatumRow &row) { int ret = OB_SUCCESS; @@ -859,6 +858,22 @@ int ObGhostRowUtil::make_ghost_row( return ret; } +int ObShadowRowUtil::make_shadow_row(const int64_t sql_sequence_col_idx, + blocksstable::ObDatumRow &row) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(((row.mvcc_row_flag_.is_uncommitted_row() || row.trans_id_.is_valid())) + || row.get_column_count() < sql_sequence_col_idx)) { + ret = OB_INVALID_ARGUMENT; + STORAGE_LOG(WARN, "invalid argument", K(ret), K(row), K(sql_sequence_col_idx)); + } else { + row.storage_datums_[sql_sequence_col_idx].reuse(); + row.storage_datums_[sql_sequence_col_idx].set_int(-INT64_MAX); + row.set_shadow_row(); + } + return ret; +} + void format_dml_str(const int32_t flag, char *str, int len) { OB_ASSERT(len >= 16); int32_t bit; diff --git a/src/storage/blocksstable/ob_datum_row.h b/src/storage/blocksstable/ob_datum_row.h index 17cb7655bd..7bb5deda90 100644 --- a/src/storage/blocksstable/ob_datum_row.h +++ b/src/storage/blocksstable/ob_datum_row.h @@ -820,12 +820,20 @@ public: ~ObGhostRowUtil() = delete; static int make_ghost_row( const int64_t sql_sequence_col_idx, - const common::ObQueryFlag &query_flag, blocksstable::ObDatumRow &row); static int is_ghost_row(const blocksstable::ObMultiVersionRowFlag &flag, bool &is_ghost_row); static const int64_t GHOST_NUM = INT64_MAX; }; +struct ObShadowRowUtil { +public: + ObShadowRowUtil() = delete; + ~ObShadowRowUtil() = delete; + static int make_shadow_row( + const int64_t sql_sequence_col_idx, + blocksstable::ObDatumRow &row); +}; + struct ObSqlDatumInfo { public: ObSqlDatumInfo() : diff --git a/src/storage/blocksstable/ob_micro_block_row_scanner.cpp b/src/storage/blocksstable/ob_micro_block_row_scanner.cpp index 461af2c888..89bf433dbd 100644 --- a/src/storage/blocksstable/ob_micro_block_row_scanner.cpp +++ b/src/storage/blocksstable/ob_micro_block_row_scanner.cpp @@ -1925,14 +1925,13 @@ int ObMultiVersionMicroBlockRowScanner::get_store_rowkey(ObStoreRowkey &store_ro return ret; } -////////////////////////////// ObMultiVersionMicroBlockMinorMergeRowScannerV2 ////////////////////////////// +////////////////////////////// ObMultiVersionMicroBlockMinorMergeRowScanner ////////////////////////////// void ObMultiVersionMicroBlockMinorMergeRowScanner::reuse() { row_.row_flag_.set_flag(ObDmlFlag::DF_NOT_EXIST); ObIMicroBlockRowScanner::reuse(); } -// The scanner of the same sstable is shared, the previous state needs to be kept, so clear_status cannot be called int ObMultiVersionMicroBlockMinorMergeRowScanner::init( const ObTableIterParam ¶m, ObTableAccessContext &context, @@ -2000,7 +1999,7 @@ int ObMultiVersionMicroBlockMinorMergeRowScanner::inner_get_next_row(const ObDat } else if (FALSE_IT(++current_)) { } else if (skip_curr_row) { if (row_.is_last_multi_version_row()) { - if (OB_FAIL(ObGhostRowUtil::make_ghost_row(sql_sequence_col_idx_, context_->query_flag_, row_))) { + if (OB_FAIL(ObGhostRowUtil::make_ghost_row(sql_sequence_col_idx_, row_))) { LOG_WARN("failed to make ghost row", K(ret), K(row_)); } else { break; @@ -2020,7 +2019,7 @@ int ObMultiVersionMicroBlockMinorMergeRowScanner::inner_get_next_row(const ObDat LOG_ERROR("row is invalid", KPC(row)); } else if (FALSE_IT(row = &row_)) { } else if (row_.row_flag_.is_delete() && !row_.mvcc_row_flag_.is_uncommitted_row()) { - // set delete/insert committed row compacted + // set delete committed row compacted row_.set_compacted_multi_version_row(); } return ret; @@ -2031,26 +2030,11 @@ int ObMultiVersionMicroBlockMinorMergeRowScanner::check_row_trans_state(bool &sk int ret = OB_SUCCESS; skip_curr_row = false; if (row_.mvcc_row_flag_.is_uncommitted_row()) { - const transaction::ObTransID read_trans_id = row_.get_trans_id(); - const int64_t sql_sequence = -row_.storage_datums_[sql_sequence_col_idx_].get_int(); bool can_read = false; - //get trans status & committed_trans_version_ - int64_t state; - compaction::ObMergeCachedTransState trans_state; - const transaction::ObTxSEQ tx_sequence = transaction::ObTxSEQ::cast_from_int(sql_sequence); - if (OB_NOT_NULL(context_->trans_state_mgr_) && - OB_SUCCESS == context_->trans_state_mgr_->get_trans_state(read_trans_id, tx_sequence, trans_state)) { - state = trans_state.trans_state_; - last_trans_state_ = trans_state.trans_state_; - committed_trans_version_ = trans_state.trans_version_; - } else if (OB_FAIL(get_trans_state(read_trans_id, state, committed_trans_version_))) { - LOG_WARN("get transaction status failed", K(ret), K(read_trans_id), K(state)); - } - - if (OB_FAIL(ret)) { - } else if (state != ObTxData::ABORT - && OB_FAIL(check_curr_row_can_read(read_trans_id, tx_sequence, can_read))) { - LOG_WARN("micro block reader fail to get row.", K(ret), K_(macro_id)); + int64_t state = ObTxData::MAX_STATE_CNT; + const transaction::ObTransID &read_trans_id = row_.trans_id_; + if (OB_FAIL(get_trans_state(read_trans_id, state, can_read))) { // will get committed_trans_version_ & last_trans_state_ + LOG_WARN("get transaction status failed", K(ret), "trans_id", row_.get_trans_id(), K(state)); } else if (!can_read) { skip_curr_row = true; } else { @@ -2084,59 +2068,80 @@ int ObMultiVersionMicroBlockMinorMergeRowScanner::check_row_trans_state(bool &sk return ret; } -int ObMultiVersionMicroBlockMinorMergeRowScanner::get_trans_state(const transaction::ObTransID &trans_id, - int64_t &state, - int64_t &commit_trans_version) +int ObMultiVersionMicroBlockMinorMergeRowScanner::get_trans_state( + const transaction::ObTransID &read_trans_id, + int64_t &state, + bool &can_read) { int ret = OB_SUCCESS; - // get trans status & committed_trans_version_ - SCN scn_commit_trans_version = SCN::max_scn(); - auto &tx_table_guards = context_->store_ctx_->mvcc_acc_ctx_.get_tx_table_guards(); - if (OB_FAIL(tx_table_guards.get_tx_state_with_scn( - trans_id, context_->merge_scn_, state, scn_commit_trans_version))) { - LOG_WARN("get transaction status failed", K(ret), K(trans_id), K(state)); - } else { - commit_trans_version = scn_commit_trans_version.get_val_for_tx(); - last_trans_state_ = state; + can_read = false; + const int64_t sql_sequence = -row_.storage_datums_[sql_sequence_col_idx_].get_int(); + const transaction::ObTxSEQ tx_sequence = transaction::ObTxSEQ::cast_from_int(sql_sequence); + state = get_trans_state_from_cache(read_trans_id, tx_sequence, can_read); + if (ObTxCommitData::MAX_STATE_CNT == state + && OB_FAIL(get_trans_state_from_tx_table(read_trans_id, tx_sequence, state, can_read))) { + LOG_WARN("failed to get trans state from tx table", KR(ret), "trans_id", row_.get_trans_id(), K(tx_sequence)); } return ret; } -int ObMultiVersionMicroBlockMinorMergeRowScanner::check_curr_row_can_read( - const transaction::ObTransID &trans_id, - const transaction::ObTxSEQ &sql_seq, - bool &can_read) +int64_t ObMultiVersionMicroBlockMinorMergeRowScanner::get_trans_state_from_cache( + const transaction::ObTransID &read_trans_id, + const transaction::ObTxSEQ &tx_sequence, + bool &can_read) +{ + int64_t state = ObTxCommitData::MAX_STATE_CNT; + compaction::ObMergeCachedTransState trans_state; + if (OB_NOT_NULL(context_->trans_state_mgr_) && + OB_SUCCESS == context_->trans_state_mgr_->get_trans_state( + read_trans_id, tx_sequence, trans_state)) { + state = trans_state.trans_state_; + last_trans_state_ = trans_state.trans_state_; + committed_trans_version_ = trans_state.trans_version_; + can_read = (ObTxData::ABORT == state ? false :trans_state.can_read_); + } + return state; +} + +int ObMultiVersionMicroBlockMinorMergeRowScanner::get_trans_state_from_tx_table( + const transaction::ObTransID &read_trans_id, + const transaction::ObTxSEQ &sql_seq, + int64_t &state, + bool &can_read) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; - bool is_cached = false; can_read = false; - compaction::ObMergeCachedTransState trans_state; - if (OB_NOT_NULL(context_->trans_state_mgr_) && - OB_SUCCESS == context_->trans_state_mgr_->get_trans_state(trans_id, sql_seq, trans_state)) { - can_read = trans_state.can_read_; + SCN scn_commit_trans_version = SCN::max_scn(); + storage::ObTxTableGuards &tx_table_guards = context_->store_ctx_->mvcc_acc_ctx_.get_tx_table_guards(); + if (OB_FAIL(tx_table_guards.get_tx_state_with_scn( + read_trans_id, context_->merge_scn_, state, scn_commit_trans_version))) { + LOG_WARN("get transaction status failed", K(ret), K(read_trans_id), K(state)); } else { - storage::ObTxTableGuards &tx_table_guards = context_->store_ctx_->mvcc_acc_ctx_.get_tx_table_guards(); - int64_t cost_time = common::ObClockGenerator::getClock(); - if (OB_FAIL(tx_table_guards.check_sql_sequence_can_read( - trans_id, - sql_seq, - sstable_->get_end_scn(), - can_read))) { - LOG_WARN("check sql sequence can read failed", K(ret), K(can_read), K(trans_id), K(sql_seq)); - } else if (OB_NOT_NULL(context_->trans_state_mgr_) && - OB_TMP_FAIL(context_->trans_state_mgr_->add_trans_state(trans_id, sql_seq, - committed_trans_version_, last_trans_state_, can_read))) { - LOG_WARN("failed to add minor trans state", K(tmp_ret), K(trans_id), K(sql_seq), K(can_read)); - } - if (REACH_TENANT_TIME_INTERVAL(30 * 1000 * 1000 /*30s*/)) { - cost_time = common::ObClockGenerator::getClock() - cost_time; - if (cost_time > 10 * 1000 /*10ms*/) { - LOG_INFO("multi-ver minor row scanner check seq", K(ret), K(cost_time)); + committed_trans_version_ = scn_commit_trans_version.get_val_for_tx(); + last_trans_state_ = state; + if (ObTxData::ABORT != state) { // check sql seq can read for RUNNING/COMMIT trans + int64_t cost_time = common::ObClockGenerator::getClock(); + if (OB_FAIL(tx_table_guards.check_sql_sequence_can_read( + read_trans_id, + sql_seq, + sstable_->get_end_scn(), + can_read))) { + LOG_WARN("check sql sequence can read failed", K(ret), K(can_read), K(read_trans_id), K(sql_seq)); + } else if (OB_NOT_NULL(context_->trans_state_mgr_) && + OB_TMP_FAIL(context_->trans_state_mgr_->add_trans_state(read_trans_id, sql_seq, + committed_trans_version_, last_trans_state_, can_read))) { + LOG_WARN("failed to add minor trans state", K(tmp_ret), K(read_trans_id), K(sql_seq), K(can_read)); + } + if (REACH_TENANT_TIME_INTERVAL(30 * 1000 * 1000 /*30s*/)) { + cost_time = common::ObClockGenerator::getClock() - cost_time; + if (cost_time > 10 * 1000 /*10ms*/) { + LOG_INFO("multi-ver minor row scanner check seq", K(ret), K(cost_time)); + } } } } - LOG_DEBUG("cxf debug check sql sequence can read", K(ret), K(can_read), K(trans_id), K(sql_seq)); + LOG_DEBUG("cxf debug check sql sequence can read", K(ret), K(can_read), K(read_trans_id), K(sql_seq)); return ret; } diff --git a/src/storage/blocksstable/ob_micro_block_row_scanner.h b/src/storage/blocksstable/ob_micro_block_row_scanner.h index 31f5a46b58..8a497ba7b2 100644 --- a/src/storage/blocksstable/ob_micro_block_row_scanner.h +++ b/src/storage/blocksstable/ob_micro_block_row_scanner.h @@ -295,7 +295,11 @@ public: const ObMicroBlockData &block_data, const bool is_left_border, const bool is_right_border) override final; - INHERIT_TO_STRING_KV("ObMultiVersionMicroBlockRowScanner", ObIMicroBlockRowScanner, K_(read_row_direct_flag), K_(version_range), K_(is_last_multi_version_row), K_(finish_scanning_cur_rowkey)); + INHERIT_TO_STRING_KV("ObMultiVersionMicroBlockRowScanner", + ObIMicroBlockRowScanner, K_(read_row_direct_flag), + K_(version_range), K_(is_last_multi_version_row), + K_(finish_scanning_cur_rowkey)); + protected: virtual int inner_get_next_row(const ObDatumRow *&row) override; virtual void inner_reset(); @@ -339,7 +343,6 @@ private: int64_t trans_version_col_idx_; int64_t sql_sequence_col_idx_; int64_t cell_cnt_; - transaction::ObTransID trans_id_; common::ObVersionRange version_range_; bool read_row_direct_flag_; }; @@ -382,27 +385,21 @@ public: protected: virtual int inner_get_next_row(const ObDatumRow *&row) override; private: - enum ScanState{ - SCAN_START = 0, - GET_RUNNING_TRANS_ROW = 1, - PREPARE_COMMITTED_ROW_QUEUE = 2, - FILTER_ABORT_TRANS_ROW = 3, - COMPACT_COMMIT_TRANS_ROW = 4, - GET_ROW_FROM_ROW_QUEUE = 5, - LOCATE_LAST_COMMITTED_ROW = 6, - }; -private: - int get_trans_state(const transaction::ObTransID &trans_id, - int64_t &state, - int64_t &commit_trans_version); - int check_curr_row_can_read(const transaction::ObTransID &trans_id, const transaction::ObTxSEQ &sql_seq, bool &can_read); + int get_trans_state( + const transaction::ObTransID &read_trans_id, + int64_t &state, + bool &can_read); + int64_t get_trans_state_from_cache( + const transaction::ObTransID &read_trans_id, + const transaction::ObTxSEQ &sql_seq, + bool &can_read); + int get_trans_state_from_tx_table( + const transaction::ObTransID &read_trans_id, + const transaction::ObTxSEQ &sql_seq, + int64_t &state, + bool &can_read); int check_row_trans_state(bool &skip_curr_row); private: - enum RowCompactInfoIndex{ - COMPACT_FIRST_ROW = 0, - COMPACT_LAST_ROW = 1, - COMPACT_MAX_ROW, - }; // multi version int64_t trans_version_col_idx_; int64_t sql_sequence_col_idx_; diff --git a/src/storage/column_store/ob_co_merge_ctx.cpp b/src/storage/column_store/ob_co_merge_ctx.cpp index 313317481c..1f55fe029d 100644 --- a/src/storage/column_store/ob_co_merge_ctx.cpp +++ b/src/storage/column_store/ob_co_merge_ctx.cpp @@ -155,7 +155,7 @@ int ObCOTabletMergeCtx::prepare_schema() { int ret = OB_SUCCESS; - if (is_meta_major_merge(static_param_.get_merge_type())) { + if (is_meta_major_merge(get_merge_type())) { if (OB_FAIL(get_meta_compaction_info())) { LOG_WARN("failed to get meta compaction info", K(ret), KPC(this)); } diff --git a/src/storage/compaction/ob_basic_tablet_merge_ctx.cpp b/src/storage/compaction/ob_basic_tablet_merge_ctx.cpp index 5f12eb8830..71efee7c29 100644 --- a/src/storage/compaction/ob_basic_tablet_merge_ctx.cpp +++ b/src/storage/compaction/ob_basic_tablet_merge_ctx.cpp @@ -1127,7 +1127,7 @@ int ObBasicTabletMergeCtx::get_meta_compaction_info() int64_t schema_version = 0; ObStorageSchema *storage_schema = nullptr; - if (OB_UNLIKELY(!is_meta_major_merge(static_param_.get_merge_type()) + if (OB_UNLIKELY(!is_meta_major_merge(get_merge_type()) || nullptr != static_param_.schema_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get unexpected static param", K(ret), K(static_param_), KPC(static_param_.schema_)); diff --git a/src/storage/compaction/ob_partition_merge_fuser.cpp b/src/storage/compaction/ob_partition_merge_fuser.cpp index 8ee1dfe71c..b9408562d8 100644 --- a/src/storage/compaction/ob_partition_merge_fuser.cpp +++ b/src/storage/compaction/ob_partition_merge_fuser.cpp @@ -142,6 +142,11 @@ int ObMergeFuser::fuse_row(MERGE_ITER_ARRAY ¯o_row_iters) return ret; } +int ObMergeFuser::make_result_row_shadow(const int64_t sql_sequence_col_idx) +{ + return ObShadowRowUtil::make_shadow_row(sql_sequence_col_idx, result_row_); +} + // fuse delete row int ObMergeFuser::fuse_delete_row( const blocksstable::ObDatumRow &del_row, diff --git a/src/storage/compaction/ob_partition_merge_fuser.h b/src/storage/compaction/ob_partition_merge_fuser.h index 093980ceab..c63d4822fe 100644 --- a/src/storage/compaction/ob_partition_merge_fuser.h +++ b/src/storage/compaction/ob_partition_merge_fuser.h @@ -63,6 +63,7 @@ public: int fuse_rows(const T& row, const Args&... args); int fuse_row(MERGE_ITER_ARRAY ¯o_row_iters); inline const blocksstable::ObDatumRow &get_result_row() const { return result_row_; } + int make_result_row_shadow(const int64_t sql_sequence_col_idx); VIRTUAL_TO_STRING_KV(K_(column_cnt), K_(result_row), K_(is_inited)); protected: int base_init(const bool is_fuse_row_flag = false); diff --git a/src/storage/compaction/ob_partition_merge_iter.cpp b/src/storage/compaction/ob_partition_merge_iter.cpp index 2e8af02dd9..a3b1a84dc6 100644 --- a/src/storage/compaction/ob_partition_merge_iter.cpp +++ b/src/storage/compaction/ob_partition_merge_iter.cpp @@ -1272,13 +1272,13 @@ bool ObPartitionMinorRowMergeIter::inner_check(const ObMergeParameter &merge_par { bool bret = true; const ObStaticMergeParam &static_param = merge_param.static_param_; - if (!is_multi_version_merge(static_param.get_merge_type()) && !compaction::is_backfill_tx_merge(static_param.get_merge_type())) { + if (OB_UNLIKELY(!is_multi_version_merge(static_param.get_merge_type()) && !compaction::is_backfill_tx_merge(static_param.get_merge_type()))) { bret = false; LOG_WARN_RET(OB_ERR_UNEXPECTED, "Unexpected merge type for minor row merge iter", K(bret), K(merge_param)); - } else if (static_param.merge_level_ != MACRO_BLOCK_MERGE_LEVEL) { + } else if (OB_UNLIKELY(static_param.merge_level_ != MACRO_BLOCK_MERGE_LEVEL)) { bret = false; LOG_WARN_RET(OB_ERR_UNEXPECTED, "Unexpected merge level for minor row merge iter", K(bret), K(merge_param)); - } else if (!table_->is_multi_version_table()) { + } else if (OB_UNLIKELY(!table_->is_multi_version_table())) { bret = false; LOG_WARN_RET(OB_ERR_UNEXPECTED, "Unexpected table type for minor row merge iter", K(bret), KPC(table_)); } @@ -1291,25 +1291,25 @@ int ObPartitionMinorRowMergeIter::common_minor_inner_init(const ObMergeParameter { int ret = OB_SUCCESS; int64_t row_column_cnt = 0; - + void *buf = nullptr; check_committing_trans_compacted_ = true; - if (OB_FAIL(merge_param.get_schema()->get_store_column_count(row_column_cnt, true))) { + if (OB_FAIL(merge_param.get_schema()->get_stored_column_count_in_sstable(row_column_cnt))) { LOG_WARN("Failed to get full store column count", K(ret)); - } else if (OB_FAIL(row_queue_.init(row_column_cnt + ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt()))) { + } else if (OB_FAIL(row_queue_.init(row_column_cnt))) { LOG_WARN("failed to init row_queue", K(ret), K(row_column_cnt)); + } else if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObNopPos) * CRI_MAX))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + STORAGE_LOG(ERROR, "Failed to alloc memory for noppos", K(ret)); } else { // read flat row - void *buf = nullptr; + char *buf_pos = (char *)buf; for (int i = 0; OB_SUCC(ret) && i < CRI_MAX; ++i) { // init nop pos - if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObNopPos)))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - STORAGE_LOG(ERROR, "Failed to alloc memory for noppos", K(ret)); + nop_pos_[i] = new (buf_pos) ObNopPos(); + if (OB_FAIL(nop_pos_[i]->init(allocator_, OB_ROW_MAX_COLUMNS_COUNT))) { + LOG_WARN("failed to init first row nop pos", K(ret)); } else { - nop_pos_[i] = new (buf) ObNopPos(); - if (OB_FAIL(nop_pos_[i]->init(allocator_, OB_ROW_MAX_COLUMNS_COUNT))) { - LOG_WARN("failed to init first row nop pos", K(ret)); - } + buf_pos += sizeof(ObNopPos); } - } + } // end of for } return ret; } @@ -1320,22 +1320,18 @@ int ObPartitionMinorRowMergeIter::inner_init(const ObMergeParameter &merge_param if (OB_FAIL(common_minor_inner_init(merge_param))) { LOG_WARN("Failed to do commont minor inner init", K(ret), K(merge_param)); - } else if (table_->is_data_memtable()) { - if (OB_UNLIKELY(!is_mini_merge(merge_param.static_param_.get_merge_type()))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("Unexpected memtable for mini minor merge", K(ret), K(merge_param), KPC(table_)); - } - } - if (OB_FAIL(ret)) { - } else if (OB_FAIL(table_->scan(access_param_.iter_param_, access_context_, merge_range_, - row_iter_))) { + } else if (OB_UNLIKELY(NULL == table_ + || (table_->is_data_memtable() && !is_mini_merge(merge_param.static_param_.get_merge_type())))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("Unexpected memtable for mini minor merge", K(ret), K(merge_param), KPC(table_)); + } else if (OB_FAIL(table_->scan(access_param_.iter_param_, access_context_, + merge_range_, row_iter_))) { LOG_WARN("Fail to init row iter for table", K(ret), KPC(table_), - K_(merge_range), K_(access_context), K_(access_param)); + K_(merge_range), K_(access_context), K_(access_param)); } else if (OB_ISNULL(row_iter_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("Unexpceted null row iter for sstable", K(ret), K(*this)); } - return ret; } @@ -1417,11 +1413,8 @@ int ObPartitionMinorRowMergeIter::check_meet_another_trans() LOG_WARN("Unexpected row queue", K(ret), K(row_queue_.count()), KPC(row_queue_.get_first()), KPC(this)); } else if (OB_FAIL(row_queue_.add_row(*first_row, obj_copy_allocator_))) { LOG_WARN("failed to add row queue", K(ret), KPC(first_row), K(row_queue_)); - } else { - int64_t sql_sequence_col_idx = schema_rowkey_column_cnt_ + 1; - first_row->storage_datums_[sql_sequence_col_idx].reuse(); - first_row->storage_datums_[sql_sequence_col_idx].set_int(-INT64_MAX); - first_row->set_shadow_row(); + } else if (OB_FAIL(ObShadowRowUtil::make_shadow_row(schema_rowkey_column_cnt_ + 1/*sql_sequence_col_idx*/, *first_row))) { + LOG_WARN("failed to make shadow row", K(ret), KPC(first_row), K_(schema_rowkey_column_cnt)); } } @@ -1448,7 +1441,7 @@ int ObPartitionMinorRowMergeIter::compact_old_row() LOG_WARN("Failed to compact first row", K(ret)); } if (OB_FAIL(ret)) { - } else if (curr_row_->is_last_multi_version_row()) { + } else if (curr_row_->is_last_multi_version_row()) { // meet L flag row_queue_.get_last()->set_last_multi_version_row(); if (OB_FAIL(row_queue_.get_next_row(curr_row_))) { LOG_WARN("Failed to get next row from row_queue", K(ret)); @@ -1457,8 +1450,7 @@ int ObPartitionMinorRowMergeIter::compact_old_row() } else if (OB_FAIL(inner_next(true /*open_macro*/))) { LOG_WARN("Failed to inner next for compact first row", K(ret)); } - } - + } // end of while return ret; } @@ -1521,13 +1513,15 @@ int ObPartitionMinorRowMergeIter::try_make_committing_trans_compacted() } else if (OB_ISNULL(curr_row_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("Unexpected null current row", K(ret), K(*this)); - } else if (OB_FAIL(check_meet_another_trans())) { + } else if (OB_FAIL(check_meet_another_trans())) { // will add empty row for different trans LOG_WARN("Fail to check meet another trans", K(ret), KPC_(curr_row), KPC(this)); } else if (OB_FAIL(compact_border_row(false/*last_row*/))) { LOG_WARN("Failed to compact first row", K(ret)); } else if (curr_row_->is_shadow_row()) { // continue } else if (OB_UNLIKELY(2 == row_queue_.count())) { + // two trans row, row queue will have > 2 rows [shadow_row / trans_A row / empty row for trans_B] + // one trans row, row queue will have 1 row [trans_A row] ret = OB_ERR_UNEXPECTED; LOG_WARN("Unexpected row queue", K(ret), K(row_queue_.count()), KPC(this)); } else if (row_queue_.count() > 1 && OB_FAIL(compact_border_row(true /*last_row */))) { @@ -1535,13 +1529,12 @@ int ObPartitionMinorRowMergeIter::try_make_committing_trans_compacted() LOG_WARN("Failed to compact current row to last row", K(ret)); } - if (OB_FAIL(ret)) { - } else if (OB_FAIL(check_compact_finish(compact_finish))) { + if (FAILEDx(check_compact_finish(compact_finish))) { LOG_WARN("Failed to check compact finish", K(ret)); } else if (curr_row_->is_last_multi_version_row()) { check_committing_trans_compacted_ = true; } - } + } // end of while if (OB_SUCC(ret)) { LOG_DEBUG("make committing trans compacted", K(ret), KPC(curr_row_), @@ -1587,10 +1580,10 @@ int ObPartitionMinorRowMergeIter::next() ret = OB_ITER_END; } else if (OB_LIKELY(curr_row_ != nullptr)) { is_rowkey_first_row_already_output_ = !curr_row_->is_last_multi_version_row(); + curr_row_ = nullptr; } if (OB_FAIL(ret)) { - } else if (FALSE_IT(curr_row_ = nullptr)) { } else if (row_queue_.has_next()) { // get row from row_queue if (OB_FAIL(row_queue_.get_next_row(curr_row_))) { LOG_WARN("Failed to get next row from row_queue", K(ret)); @@ -1615,7 +1608,6 @@ int ObPartitionMinorRowMergeIter::next() } else if (OB_FAIL(try_make_committing_trans_compacted())) { LOG_WARN("Failed to make committing trans compacted", K(ret), K(*this)); } - if (OB_SUCC(ret) && curr_row_ != nullptr && curr_row_->is_last_multi_version_row()) { check_committing_trans_compacted_ = true; } @@ -1654,8 +1646,8 @@ int ObPartitionMinorRowMergeIter::compare_multi_version_col(const ObPartitionMer LOG_WARN("Unexpected column cnt to compare multi version col", K(ret), KPC(curr_row_), KPC(other.get_curr_row())); } else { - int64_t multi_value = curr_row_->storage_datums_[multi_version_col].get_int(); - int64_t other_multi_value = other.get_curr_row()->storage_datums_[multi_version_col].get_int(); + const int64_t multi_value = curr_row_->storage_datums_[multi_version_col].get_int(); + const int64_t other_multi_value = other.get_curr_row()->storage_datums_[multi_version_col].get_int(); if (multi_value < other_multi_value) { cmp_ret = -1; } else if (multi_value > other_multi_value) { @@ -1796,8 +1788,8 @@ int ObPartitionMinorMacroMergeIter::inner_init(const ObMergeParameter &merge_par } else if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObSSTableRowWholeScanner)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("Failed to alloc memory for minor merge row scanner", K(ret)); - } else if (FALSE_IT(row_iter_ = new (buf) ObSSTableRowWholeScanner())) { } else { + row_iter_ = new (buf) ObSSTableRowWholeScanner(); macro_block_opened_ = false; last_macro_block_reused_ = -1; last_macro_block_recycled_ = false; @@ -1812,7 +1804,7 @@ int ObPartitionMinorMacroMergeIter::inner_init(const ObMergeParameter &merge_par false, /* reverse scan */ false, /* need micro info */ true /* need secondary meta */))) { - LOG_WARN("Fail to scan macro block", K(ret)); + LOG_WARN("Fail to scan macro block", K(ret)); } } diff --git a/src/storage/compaction/ob_partition_merger.cpp b/src/storage/compaction/ob_partition_merger.cpp index 1d6ef7ce1b..2ba8e0bee1 100644 --- a/src/storage/compaction/ob_partition_merger.cpp +++ b/src/storage/compaction/ob_partition_merger.cpp @@ -1287,17 +1287,13 @@ int ObPartitionMinorMerger::set_result_flag(MERGE_ITER_ARRAY &fuse_iters, } } } - if (OB_SUCC(ret) && add_shadow_row) { - const ObDatumRow &result_row = partition_fuser_->get_result_row(); - row_flag.set_shadow_row(true); - int64_t sql_sequence_col_idx = data_store_desc_.get_schema_rowkey_col_cnt() + 1; - result_row.storage_datums_[sql_sequence_col_idx].reuse(); - result_row.storage_datums_[sql_sequence_col_idx].set_int(-INT64_MAX); - } - - if (OB_FAIL(ret)) { - } else if (OB_FAIL(partition_fuser_->set_multi_version_flag(row_flag))) { + if (FAILEDx(partition_fuser_->set_multi_version_flag(row_flag))) { STORAGE_LOG(WARN, "Failed to set multi version row flag and dml", K(ret)); + } else if (add_shadow_row && OB_FAIL(partition_fuser_->make_result_row_shadow( + data_store_desc_.get_schema_rowkey_col_cnt() + 1 /*sql_sequence_col_idx*/))) { + LOG_WARN("failed to make shadow row", K(ret), + "result_row", partition_fuser_->get_result_row(), + "sql_seq_col_idx", data_store_desc_.get_schema_rowkey_col_cnt() + 1); } else { STORAGE_LOG(DEBUG, "succ to set multi version row flag and dml", K(partition_fuser_->get_result_row()), K(row_flag), KPC(base_row)); diff --git a/src/storage/compaction/ob_sstable_builder.cpp b/src/storage/compaction/ob_sstable_builder.cpp index a0f7ac0382..64af5d4a22 100644 --- a/src/storage/compaction/ob_sstable_builder.cpp +++ b/src/storage/compaction/ob_sstable_builder.cpp @@ -176,6 +176,7 @@ int ObSSTableBuilder::build_sstable_merge_res( { int ret = OB_SUCCESS; ObSEArray macro_id_array; + macro_id_array.set_attr(ObMemAttr(MTL_ID(), "sstBuilder", ObCtxIds::MERGE_NORMAL_CTX_ID)); blocksstable::ObSSTableIndexBuilder::ObMacroMetaIter iter; int64_t multiplexed_macro_block_count = 0; @@ -314,7 +315,7 @@ int ObSSTableBuilder::rebuild_macro_block(const ObIArrayis_valid())) { diff --git a/src/storage/compaction/ob_sstable_builder.h b/src/storage/compaction/ob_sstable_builder.h index e76b38dc33..a38f6ff5c0 100644 --- a/src/storage/compaction/ob_sstable_builder.h +++ b/src/storage/compaction/ob_sstable_builder.h @@ -117,7 +117,6 @@ private: blocksstable::ObMacroBlockWriter macro_writer_; const ObITableReadInfo *index_read_info_; static const int64_t REBUILD_MACRO_BLOCK_THRESOLD = 20; - static const int64_t MAX_TABLE_CNT = 4097; static const int64_t DEFAULT_MACRO_ID_COUNT = 32; }; diff --git a/src/storage/memtable/ob_memtable_iterator.cpp b/src/storage/memtable/ob_memtable_iterator.cpp index 7d31f77737..dd19902b99 100644 --- a/src/storage/memtable/ob_memtable_iterator.cpp +++ b/src/storage/memtable/ob_memtable_iterator.cpp @@ -995,8 +995,9 @@ int ObMemtableMultiVersionScanIterator::set_compacted_row_state(const bool add_s row_.row_flag_.fuse_flag(value_iter_->get_row_first_dml_flag()); row_.mvcc_row_flag_.set_last_multi_version_row(value_iter_->is_multi_version_iter_end()); if (add_shadow_row) { - row_.set_shadow_row(); - row_.storage_datums_[sql_sequence_col_idx_].set_int(-INT64_MAX); + if (OB_FAIL(ObShadowRowUtil::make_shadow_row(sql_sequence_col_idx_, row_))) { + LOG_WARN("failed to make shadow row", K(ret), K(row_), K_(sql_sequence_col_idx)); + } } else { // sql_sequence of committed data is 0 row_.storage_datums_[sql_sequence_col_idx_].set_int(0);