add comment & func for minor merge

This commit is contained in:
yangqise7en
2024-04-28 14:50:01 +00:00
committed by ob-robot
parent db22f4f1b1
commit 5de7a177d4
13 changed files with 165 additions and 145 deletions

View File

@ -835,7 +835,6 @@ int ObGhostRowUtil::is_ghost_row(
int ObGhostRowUtil::make_ghost_row(
const int64_t sql_sequence_col_idx,
const ObQueryFlag &query_flag,
blocksstable::ObDatumRow &row)
{
int ret = OB_SUCCESS;
@ -859,6 +858,22 @@ int ObGhostRowUtil::make_ghost_row(
return ret;
}
int ObShadowRowUtil::make_shadow_row(const int64_t sql_sequence_col_idx,
blocksstable::ObDatumRow &row)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(((row.mvcc_row_flag_.is_uncommitted_row() || row.trans_id_.is_valid()))
|| row.get_column_count() < sql_sequence_col_idx)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(row), K(sql_sequence_col_idx));
} else {
row.storage_datums_[sql_sequence_col_idx].reuse();
row.storage_datums_[sql_sequence_col_idx].set_int(-INT64_MAX);
row.set_shadow_row();
}
return ret;
}
void format_dml_str(const int32_t flag, char *str, int len) {
OB_ASSERT(len >= 16);
int32_t bit;

View File

@ -820,12 +820,20 @@ public:
~ObGhostRowUtil() = delete;
static int make_ghost_row(
const int64_t sql_sequence_col_idx,
const common::ObQueryFlag &query_flag,
blocksstable::ObDatumRow &row);
static int is_ghost_row(const blocksstable::ObMultiVersionRowFlag &flag, bool &is_ghost_row);
static const int64_t GHOST_NUM = INT64_MAX;
};
struct ObShadowRowUtil {
public:
ObShadowRowUtil() = delete;
~ObShadowRowUtil() = delete;
static int make_shadow_row(
const int64_t sql_sequence_col_idx,
blocksstable::ObDatumRow &row);
};
struct ObSqlDatumInfo {
public:
ObSqlDatumInfo() :

View File

@ -1925,14 +1925,13 @@ int ObMultiVersionMicroBlockRowScanner::get_store_rowkey(ObStoreRowkey &store_ro
return ret;
}
////////////////////////////// ObMultiVersionMicroBlockMinorMergeRowScannerV2 //////////////////////////////
////////////////////////////// ObMultiVersionMicroBlockMinorMergeRowScanner //////////////////////////////
void ObMultiVersionMicroBlockMinorMergeRowScanner::reuse()
{
row_.row_flag_.set_flag(ObDmlFlag::DF_NOT_EXIST);
ObIMicroBlockRowScanner::reuse();
}
// The scanner of the same sstable is shared, the previous state needs to be kept, so clear_status cannot be called
int ObMultiVersionMicroBlockMinorMergeRowScanner::init(
const ObTableIterParam &param,
ObTableAccessContext &context,
@ -2000,7 +1999,7 @@ int ObMultiVersionMicroBlockMinorMergeRowScanner::inner_get_next_row(const ObDat
} else if (FALSE_IT(++current_)) {
} else if (skip_curr_row) {
if (row_.is_last_multi_version_row()) {
if (OB_FAIL(ObGhostRowUtil::make_ghost_row(sql_sequence_col_idx_, context_->query_flag_, row_))) {
if (OB_FAIL(ObGhostRowUtil::make_ghost_row(sql_sequence_col_idx_, row_))) {
LOG_WARN("failed to make ghost row", K(ret), K(row_));
} else {
break;
@ -2020,7 +2019,7 @@ int ObMultiVersionMicroBlockMinorMergeRowScanner::inner_get_next_row(const ObDat
LOG_ERROR("row is invalid", KPC(row));
} else if (FALSE_IT(row = &row_)) {
} else if (row_.row_flag_.is_delete() && !row_.mvcc_row_flag_.is_uncommitted_row()) {
// set delete/insert committed row compacted
// set delete committed row compacted
row_.set_compacted_multi_version_row();
}
return ret;
@ -2031,26 +2030,11 @@ int ObMultiVersionMicroBlockMinorMergeRowScanner::check_row_trans_state(bool &sk
int ret = OB_SUCCESS;
skip_curr_row = false;
if (row_.mvcc_row_flag_.is_uncommitted_row()) {
const transaction::ObTransID read_trans_id = row_.get_trans_id();
const int64_t sql_sequence = -row_.storage_datums_[sql_sequence_col_idx_].get_int();
bool can_read = false;
//get trans status & committed_trans_version_
int64_t state;
compaction::ObMergeCachedTransState trans_state;
const transaction::ObTxSEQ tx_sequence = transaction::ObTxSEQ::cast_from_int(sql_sequence);
if (OB_NOT_NULL(context_->trans_state_mgr_) &&
OB_SUCCESS == context_->trans_state_mgr_->get_trans_state(read_trans_id, tx_sequence, trans_state)) {
state = trans_state.trans_state_;
last_trans_state_ = trans_state.trans_state_;
committed_trans_version_ = trans_state.trans_version_;
} else if (OB_FAIL(get_trans_state(read_trans_id, state, committed_trans_version_))) {
LOG_WARN("get transaction status failed", K(ret), K(read_trans_id), K(state));
}
if (OB_FAIL(ret)) {
} else if (state != ObTxData::ABORT
&& OB_FAIL(check_curr_row_can_read(read_trans_id, tx_sequence, can_read))) {
LOG_WARN("micro block reader fail to get row.", K(ret), K_(macro_id));
int64_t state = ObTxData::MAX_STATE_CNT;
const transaction::ObTransID &read_trans_id = row_.trans_id_;
if (OB_FAIL(get_trans_state(read_trans_id, state, can_read))) { // will get committed_trans_version_ & last_trans_state_
LOG_WARN("get transaction status failed", K(ret), "trans_id", row_.get_trans_id(), K(state));
} else if (!can_read) {
skip_curr_row = true;
} else {
@ -2084,59 +2068,80 @@ int ObMultiVersionMicroBlockMinorMergeRowScanner::check_row_trans_state(bool &sk
return ret;
}
int ObMultiVersionMicroBlockMinorMergeRowScanner::get_trans_state(const transaction::ObTransID &trans_id,
int64_t &state,
int64_t &commit_trans_version)
int ObMultiVersionMicroBlockMinorMergeRowScanner::get_trans_state(
const transaction::ObTransID &read_trans_id,
int64_t &state,
bool &can_read)
{
int ret = OB_SUCCESS;
// get trans status & committed_trans_version_
SCN scn_commit_trans_version = SCN::max_scn();
auto &tx_table_guards = context_->store_ctx_->mvcc_acc_ctx_.get_tx_table_guards();
if (OB_FAIL(tx_table_guards.get_tx_state_with_scn(
trans_id, context_->merge_scn_, state, scn_commit_trans_version))) {
LOG_WARN("get transaction status failed", K(ret), K(trans_id), K(state));
} else {
commit_trans_version = scn_commit_trans_version.get_val_for_tx();
last_trans_state_ = state;
can_read = false;
const int64_t sql_sequence = -row_.storage_datums_[sql_sequence_col_idx_].get_int();
const transaction::ObTxSEQ tx_sequence = transaction::ObTxSEQ::cast_from_int(sql_sequence);
state = get_trans_state_from_cache(read_trans_id, tx_sequence, can_read);
if (ObTxCommitData::MAX_STATE_CNT == state
&& OB_FAIL(get_trans_state_from_tx_table(read_trans_id, tx_sequence, state, can_read))) {
LOG_WARN("failed to get trans state from tx table", KR(ret), "trans_id", row_.get_trans_id(), K(tx_sequence));
}
return ret;
}
int ObMultiVersionMicroBlockMinorMergeRowScanner::check_curr_row_can_read(
const transaction::ObTransID &trans_id,
const transaction::ObTxSEQ &sql_seq,
bool &can_read)
int64_t ObMultiVersionMicroBlockMinorMergeRowScanner::get_trans_state_from_cache(
const transaction::ObTransID &read_trans_id,
const transaction::ObTxSEQ &tx_sequence,
bool &can_read)
{
int64_t state = ObTxCommitData::MAX_STATE_CNT;
compaction::ObMergeCachedTransState trans_state;
if (OB_NOT_NULL(context_->trans_state_mgr_) &&
OB_SUCCESS == context_->trans_state_mgr_->get_trans_state(
read_trans_id, tx_sequence, trans_state)) {
state = trans_state.trans_state_;
last_trans_state_ = trans_state.trans_state_;
committed_trans_version_ = trans_state.trans_version_;
can_read = (ObTxData::ABORT == state ? false :trans_state.can_read_);
}
return state;
}
int ObMultiVersionMicroBlockMinorMergeRowScanner::get_trans_state_from_tx_table(
const transaction::ObTransID &read_trans_id,
const transaction::ObTxSEQ &sql_seq,
int64_t &state,
bool &can_read)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
bool is_cached = false;
can_read = false;
compaction::ObMergeCachedTransState trans_state;
if (OB_NOT_NULL(context_->trans_state_mgr_) &&
OB_SUCCESS == context_->trans_state_mgr_->get_trans_state(trans_id, sql_seq, trans_state)) {
can_read = trans_state.can_read_;
SCN scn_commit_trans_version = SCN::max_scn();
storage::ObTxTableGuards &tx_table_guards = context_->store_ctx_->mvcc_acc_ctx_.get_tx_table_guards();
if (OB_FAIL(tx_table_guards.get_tx_state_with_scn(
read_trans_id, context_->merge_scn_, state, scn_commit_trans_version))) {
LOG_WARN("get transaction status failed", K(ret), K(read_trans_id), K(state));
} else {
storage::ObTxTableGuards &tx_table_guards = context_->store_ctx_->mvcc_acc_ctx_.get_tx_table_guards();
int64_t cost_time = common::ObClockGenerator::getClock();
if (OB_FAIL(tx_table_guards.check_sql_sequence_can_read(
trans_id,
sql_seq,
sstable_->get_end_scn(),
can_read))) {
LOG_WARN("check sql sequence can read failed", K(ret), K(can_read), K(trans_id), K(sql_seq));
} else if (OB_NOT_NULL(context_->trans_state_mgr_) &&
OB_TMP_FAIL(context_->trans_state_mgr_->add_trans_state(trans_id, sql_seq,
committed_trans_version_, last_trans_state_, can_read))) {
LOG_WARN("failed to add minor trans state", K(tmp_ret), K(trans_id), K(sql_seq), K(can_read));
}
if (REACH_TENANT_TIME_INTERVAL(30 * 1000 * 1000 /*30s*/)) {
cost_time = common::ObClockGenerator::getClock() - cost_time;
if (cost_time > 10 * 1000 /*10ms*/) {
LOG_INFO("multi-ver minor row scanner check seq", K(ret), K(cost_time));
committed_trans_version_ = scn_commit_trans_version.get_val_for_tx();
last_trans_state_ = state;
if (ObTxData::ABORT != state) { // check sql seq can read for RUNNING/COMMIT trans
int64_t cost_time = common::ObClockGenerator::getClock();
if (OB_FAIL(tx_table_guards.check_sql_sequence_can_read(
read_trans_id,
sql_seq,
sstable_->get_end_scn(),
can_read))) {
LOG_WARN("check sql sequence can read failed", K(ret), K(can_read), K(read_trans_id), K(sql_seq));
} else if (OB_NOT_NULL(context_->trans_state_mgr_) &&
OB_TMP_FAIL(context_->trans_state_mgr_->add_trans_state(read_trans_id, sql_seq,
committed_trans_version_, last_trans_state_, can_read))) {
LOG_WARN("failed to add minor trans state", K(tmp_ret), K(read_trans_id), K(sql_seq), K(can_read));
}
if (REACH_TENANT_TIME_INTERVAL(30 * 1000 * 1000 /*30s*/)) {
cost_time = common::ObClockGenerator::getClock() - cost_time;
if (cost_time > 10 * 1000 /*10ms*/) {
LOG_INFO("multi-ver minor row scanner check seq", K(ret), K(cost_time));
}
}
}
}
LOG_DEBUG("cxf debug check sql sequence can read", K(ret), K(can_read), K(trans_id), K(sql_seq));
LOG_DEBUG("cxf debug check sql sequence can read", K(ret), K(can_read), K(read_trans_id), K(sql_seq));
return ret;
}

View File

@ -295,7 +295,11 @@ public:
const ObMicroBlockData &block_data,
const bool is_left_border,
const bool is_right_border) override final;
INHERIT_TO_STRING_KV("ObMultiVersionMicroBlockRowScanner", ObIMicroBlockRowScanner, K_(read_row_direct_flag), K_(version_range), K_(is_last_multi_version_row), K_(finish_scanning_cur_rowkey));
INHERIT_TO_STRING_KV("ObMultiVersionMicroBlockRowScanner",
ObIMicroBlockRowScanner, K_(read_row_direct_flag),
K_(version_range), K_(is_last_multi_version_row),
K_(finish_scanning_cur_rowkey));
protected:
virtual int inner_get_next_row(const ObDatumRow *&row) override;
virtual void inner_reset();
@ -339,7 +343,6 @@ private:
int64_t trans_version_col_idx_;
int64_t sql_sequence_col_idx_;
int64_t cell_cnt_;
transaction::ObTransID trans_id_;
common::ObVersionRange version_range_;
bool read_row_direct_flag_;
};
@ -382,27 +385,21 @@ public:
protected:
virtual int inner_get_next_row(const ObDatumRow *&row) override;
private:
enum ScanState{
SCAN_START = 0,
GET_RUNNING_TRANS_ROW = 1,
PREPARE_COMMITTED_ROW_QUEUE = 2,
FILTER_ABORT_TRANS_ROW = 3,
COMPACT_COMMIT_TRANS_ROW = 4,
GET_ROW_FROM_ROW_QUEUE = 5,
LOCATE_LAST_COMMITTED_ROW = 6,
};
private:
int get_trans_state(const transaction::ObTransID &trans_id,
int64_t &state,
int64_t &commit_trans_version);
int check_curr_row_can_read(const transaction::ObTransID &trans_id, const transaction::ObTxSEQ &sql_seq, bool &can_read);
int get_trans_state(
const transaction::ObTransID &read_trans_id,
int64_t &state,
bool &can_read);
int64_t get_trans_state_from_cache(
const transaction::ObTransID &read_trans_id,
const transaction::ObTxSEQ &sql_seq,
bool &can_read);
int get_trans_state_from_tx_table(
const transaction::ObTransID &read_trans_id,
const transaction::ObTxSEQ &sql_seq,
int64_t &state,
bool &can_read);
int check_row_trans_state(bool &skip_curr_row);
private:
enum RowCompactInfoIndex{
COMPACT_FIRST_ROW = 0,
COMPACT_LAST_ROW = 1,
COMPACT_MAX_ROW,
};
// multi version
int64_t trans_version_col_idx_;
int64_t sql_sequence_col_idx_;

View File

@ -155,7 +155,7 @@ int ObCOTabletMergeCtx::prepare_schema()
{
int ret = OB_SUCCESS;
if (is_meta_major_merge(static_param_.get_merge_type())) {
if (is_meta_major_merge(get_merge_type())) {
if (OB_FAIL(get_meta_compaction_info())) {
LOG_WARN("failed to get meta compaction info", K(ret), KPC(this));
}

View File

@ -1127,7 +1127,7 @@ int ObBasicTabletMergeCtx::get_meta_compaction_info()
int64_t schema_version = 0;
ObStorageSchema *storage_schema = nullptr;
if (OB_UNLIKELY(!is_meta_major_merge(static_param_.get_merge_type())
if (OB_UNLIKELY(!is_meta_major_merge(get_merge_type())
|| nullptr != static_param_.schema_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected static param", K(ret), K(static_param_), KPC(static_param_.schema_));

View File

@ -142,6 +142,11 @@ int ObMergeFuser::fuse_row(MERGE_ITER_ARRAY &macro_row_iters)
return ret;
}
int ObMergeFuser::make_result_row_shadow(const int64_t sql_sequence_col_idx)
{
return ObShadowRowUtil::make_shadow_row(sql_sequence_col_idx, result_row_);
}
// fuse delete row
int ObMergeFuser::fuse_delete_row(
const blocksstable::ObDatumRow &del_row,

View File

@ -63,6 +63,7 @@ public:
int fuse_rows(const T& row, const Args&... args);
int fuse_row(MERGE_ITER_ARRAY &macro_row_iters);
inline const blocksstable::ObDatumRow &get_result_row() const { return result_row_; }
int make_result_row_shadow(const int64_t sql_sequence_col_idx);
VIRTUAL_TO_STRING_KV(K_(column_cnt), K_(result_row), K_(is_inited));
protected:
int base_init(const bool is_fuse_row_flag = false);

View File

@ -1272,13 +1272,13 @@ bool ObPartitionMinorRowMergeIter::inner_check(const ObMergeParameter &merge_par
{
bool bret = true;
const ObStaticMergeParam &static_param = merge_param.static_param_;
if (!is_multi_version_merge(static_param.get_merge_type()) && !compaction::is_backfill_tx_merge(static_param.get_merge_type())) {
if (OB_UNLIKELY(!is_multi_version_merge(static_param.get_merge_type()) && !compaction::is_backfill_tx_merge(static_param.get_merge_type()))) {
bret = false;
LOG_WARN_RET(OB_ERR_UNEXPECTED, "Unexpected merge type for minor row merge iter", K(bret), K(merge_param));
} else if (static_param.merge_level_ != MACRO_BLOCK_MERGE_LEVEL) {
} else if (OB_UNLIKELY(static_param.merge_level_ != MACRO_BLOCK_MERGE_LEVEL)) {
bret = false;
LOG_WARN_RET(OB_ERR_UNEXPECTED, "Unexpected merge level for minor row merge iter", K(bret), K(merge_param));
} else if (!table_->is_multi_version_table()) {
} else if (OB_UNLIKELY(!table_->is_multi_version_table())) {
bret = false;
LOG_WARN_RET(OB_ERR_UNEXPECTED, "Unexpected table type for minor row merge iter", K(bret), KPC(table_));
}
@ -1291,25 +1291,25 @@ int ObPartitionMinorRowMergeIter::common_minor_inner_init(const ObMergeParameter
{
int ret = OB_SUCCESS;
int64_t row_column_cnt = 0;
void *buf = nullptr;
check_committing_trans_compacted_ = true;
if (OB_FAIL(merge_param.get_schema()->get_store_column_count(row_column_cnt, true))) {
if (OB_FAIL(merge_param.get_schema()->get_stored_column_count_in_sstable(row_column_cnt))) {
LOG_WARN("Failed to get full store column count", K(ret));
} else if (OB_FAIL(row_queue_.init(row_column_cnt + ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt()))) {
} else if (OB_FAIL(row_queue_.init(row_column_cnt))) {
LOG_WARN("failed to init row_queue", K(ret), K(row_column_cnt));
} else if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObNopPos) * CRI_MAX))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(ERROR, "Failed to alloc memory for noppos", K(ret));
} else { // read flat row
void *buf = nullptr;
char *buf_pos = (char *)buf;
for (int i = 0; OB_SUCC(ret) && i < CRI_MAX; ++i) { // init nop pos
if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObNopPos)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(ERROR, "Failed to alloc memory for noppos", K(ret));
nop_pos_[i] = new (buf_pos) ObNopPos();
if (OB_FAIL(nop_pos_[i]->init(allocator_, OB_ROW_MAX_COLUMNS_COUNT))) {
LOG_WARN("failed to init first row nop pos", K(ret));
} else {
nop_pos_[i] = new (buf) ObNopPos();
if (OB_FAIL(nop_pos_[i]->init(allocator_, OB_ROW_MAX_COLUMNS_COUNT))) {
LOG_WARN("failed to init first row nop pos", K(ret));
}
buf_pos += sizeof(ObNopPos);
}
}
} // end of for
}
return ret;
}
@ -1320,22 +1320,18 @@ int ObPartitionMinorRowMergeIter::inner_init(const ObMergeParameter &merge_param
if (OB_FAIL(common_minor_inner_init(merge_param))) {
LOG_WARN("Failed to do commont minor inner init", K(ret), K(merge_param));
} else if (table_->is_data_memtable()) {
if (OB_UNLIKELY(!is_mini_merge(merge_param.static_param_.get_merge_type()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected memtable for mini minor merge", K(ret), K(merge_param), KPC(table_));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(table_->scan(access_param_.iter_param_, access_context_, merge_range_,
row_iter_))) {
} else if (OB_UNLIKELY(NULL == table_
|| (table_->is_data_memtable() && !is_mini_merge(merge_param.static_param_.get_merge_type())))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected memtable for mini minor merge", K(ret), K(merge_param), KPC(table_));
} else if (OB_FAIL(table_->scan(access_param_.iter_param_, access_context_,
merge_range_, row_iter_))) {
LOG_WARN("Fail to init row iter for table", K(ret), KPC(table_),
K_(merge_range), K_(access_context), K_(access_param));
K_(merge_range), K_(access_context), K_(access_param));
} else if (OB_ISNULL(row_iter_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpceted null row iter for sstable", K(ret), K(*this));
}
return ret;
}
@ -1417,11 +1413,8 @@ int ObPartitionMinorRowMergeIter::check_meet_another_trans()
LOG_WARN("Unexpected row queue", K(ret), K(row_queue_.count()), KPC(row_queue_.get_first()), KPC(this));
} else if (OB_FAIL(row_queue_.add_row(*first_row, obj_copy_allocator_))) {
LOG_WARN("failed to add row queue", K(ret), KPC(first_row), K(row_queue_));
} else {
int64_t sql_sequence_col_idx = schema_rowkey_column_cnt_ + 1;
first_row->storage_datums_[sql_sequence_col_idx].reuse();
first_row->storage_datums_[sql_sequence_col_idx].set_int(-INT64_MAX);
first_row->set_shadow_row();
} else if (OB_FAIL(ObShadowRowUtil::make_shadow_row(schema_rowkey_column_cnt_ + 1/*sql_sequence_col_idx*/, *first_row))) {
LOG_WARN("failed to make shadow row", K(ret), KPC(first_row), K_(schema_rowkey_column_cnt));
}
}
@ -1448,7 +1441,7 @@ int ObPartitionMinorRowMergeIter::compact_old_row()
LOG_WARN("Failed to compact first row", K(ret));
}
if (OB_FAIL(ret)) {
} else if (curr_row_->is_last_multi_version_row()) {
} else if (curr_row_->is_last_multi_version_row()) { // meet L flag
row_queue_.get_last()->set_last_multi_version_row();
if (OB_FAIL(row_queue_.get_next_row(curr_row_))) {
LOG_WARN("Failed to get next row from row_queue", K(ret));
@ -1457,8 +1450,7 @@ int ObPartitionMinorRowMergeIter::compact_old_row()
} else if (OB_FAIL(inner_next(true /*open_macro*/))) {
LOG_WARN("Failed to inner next for compact first row", K(ret));
}
}
} // end of while
return ret;
}
@ -1521,13 +1513,15 @@ int ObPartitionMinorRowMergeIter::try_make_committing_trans_compacted()
} else if (OB_ISNULL(curr_row_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected null current row", K(ret), K(*this));
} else if (OB_FAIL(check_meet_another_trans())) {
} else if (OB_FAIL(check_meet_another_trans())) { // will add empty row for different trans
LOG_WARN("Fail to check meet another trans", K(ret), KPC_(curr_row), KPC(this));
} else if (OB_FAIL(compact_border_row(false/*last_row*/))) {
LOG_WARN("Failed to compact first row", K(ret));
} else if (curr_row_->is_shadow_row()) {
// continue
} else if (OB_UNLIKELY(2 == row_queue_.count())) {
// two trans row, row queue will have > 2 rows [shadow_row / trans_A row / empty row for trans_B]
// one trans row, row queue will have 1 row [trans_A row]
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected row queue", K(ret), K(row_queue_.count()), KPC(this));
} else if (row_queue_.count() > 1 && OB_FAIL(compact_border_row(true /*last_row */))) {
@ -1535,13 +1529,12 @@ int ObPartitionMinorRowMergeIter::try_make_committing_trans_compacted()
LOG_WARN("Failed to compact current row to last row", K(ret));
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(check_compact_finish(compact_finish))) {
if (FAILEDx(check_compact_finish(compact_finish))) {
LOG_WARN("Failed to check compact finish", K(ret));
} else if (curr_row_->is_last_multi_version_row()) {
check_committing_trans_compacted_ = true;
}
}
} // end of while
if (OB_SUCC(ret)) {
LOG_DEBUG("make committing trans compacted", K(ret), KPC(curr_row_),
@ -1587,10 +1580,10 @@ int ObPartitionMinorRowMergeIter::next()
ret = OB_ITER_END;
} else if (OB_LIKELY(curr_row_ != nullptr)) {
is_rowkey_first_row_already_output_ = !curr_row_->is_last_multi_version_row();
curr_row_ = nullptr;
}
if (OB_FAIL(ret)) {
} else if (FALSE_IT(curr_row_ = nullptr)) {
} else if (row_queue_.has_next()) { // get row from row_queue
if (OB_FAIL(row_queue_.get_next_row(curr_row_))) {
LOG_WARN("Failed to get next row from row_queue", K(ret));
@ -1615,7 +1608,6 @@ int ObPartitionMinorRowMergeIter::next()
} else if (OB_FAIL(try_make_committing_trans_compacted())) {
LOG_WARN("Failed to make committing trans compacted", K(ret), K(*this));
}
if (OB_SUCC(ret) && curr_row_ != nullptr && curr_row_->is_last_multi_version_row()) {
check_committing_trans_compacted_ = true;
}
@ -1654,8 +1646,8 @@ int ObPartitionMinorRowMergeIter::compare_multi_version_col(const ObPartitionMer
LOG_WARN("Unexpected column cnt to compare multi version col",
K(ret), KPC(curr_row_), KPC(other.get_curr_row()));
} else {
int64_t multi_value = curr_row_->storage_datums_[multi_version_col].get_int();
int64_t other_multi_value = other.get_curr_row()->storage_datums_[multi_version_col].get_int();
const int64_t multi_value = curr_row_->storage_datums_[multi_version_col].get_int();
const int64_t other_multi_value = other.get_curr_row()->storage_datums_[multi_version_col].get_int();
if (multi_value < other_multi_value) {
cmp_ret = -1;
} else if (multi_value > other_multi_value) {
@ -1796,8 +1788,8 @@ int ObPartitionMinorMacroMergeIter::inner_init(const ObMergeParameter &merge_par
} else if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObSSTableRowWholeScanner)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("Failed to alloc memory for minor merge row scanner", K(ret));
} else if (FALSE_IT(row_iter_ = new (buf) ObSSTableRowWholeScanner())) {
} else {
row_iter_ = new (buf) ObSSTableRowWholeScanner();
macro_block_opened_ = false;
last_macro_block_reused_ = -1;
last_macro_block_recycled_ = false;
@ -1812,7 +1804,7 @@ int ObPartitionMinorMacroMergeIter::inner_init(const ObMergeParameter &merge_par
false, /* reverse scan */
false, /* need micro info */
true /* need secondary meta */))) {
LOG_WARN("Fail to scan macro block", K(ret));
LOG_WARN("Fail to scan macro block", K(ret));
}
}

View File

@ -1287,17 +1287,13 @@ int ObPartitionMinorMerger::set_result_flag(MERGE_ITER_ARRAY &fuse_iters,
}
}
}
if (OB_SUCC(ret) && add_shadow_row) {
const ObDatumRow &result_row = partition_fuser_->get_result_row();
row_flag.set_shadow_row(true);
int64_t sql_sequence_col_idx = data_store_desc_.get_schema_rowkey_col_cnt() + 1;
result_row.storage_datums_[sql_sequence_col_idx].reuse();
result_row.storage_datums_[sql_sequence_col_idx].set_int(-INT64_MAX);
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(partition_fuser_->set_multi_version_flag(row_flag))) {
if (FAILEDx(partition_fuser_->set_multi_version_flag(row_flag))) {
STORAGE_LOG(WARN, "Failed to set multi version row flag and dml", K(ret));
} else if (add_shadow_row && OB_FAIL(partition_fuser_->make_result_row_shadow(
data_store_desc_.get_schema_rowkey_col_cnt() + 1 /*sql_sequence_col_idx*/))) {
LOG_WARN("failed to make shadow row", K(ret),
"result_row", partition_fuser_->get_result_row(),
"sql_seq_col_idx", data_store_desc_.get_schema_rowkey_col_cnt() + 1);
} else {
STORAGE_LOG(DEBUG, "succ to set multi version row flag and dml", K(partition_fuser_->get_result_row()),
K(row_flag), KPC(base_row));

View File

@ -176,6 +176,7 @@ int ObSSTableBuilder::build_sstable_merge_res(
{
int ret = OB_SUCCESS;
ObSEArray<blocksstable::MacroBlockId, DEFAULT_MACRO_ID_COUNT> macro_id_array;
macro_id_array.set_attr(ObMemAttr(MTL_ID(), "sstBuilder", ObCtxIds::MERGE_NORMAL_CTX_ID));
blocksstable::ObSSTableIndexBuilder::ObMacroMetaIter iter;
int64_t multiplexed_macro_block_count = 0;
@ -314,7 +315,7 @@ int ObSSTableBuilder::rebuild_macro_block(const ObIArray<blocksstable::MacroBloc
if (OB_FAIL(micro_iter.init())) {
STORAGE_LOG(WARN, "init SSTableRebuildMicroBlockIter failed", K(ret));
} else {
const blocksstable::ObDataMacroBlockMeta *macro_meta;
const blocksstable::ObDataMacroBlockMeta *macro_meta = NULL;
int64_t macro_id_idx = 0;
while (OB_SUCC(ret) && OB_SUCC(iter.get_next_macro_block(macro_meta))) {
if (OB_UNLIKELY(nullptr == macro_meta || !macro_meta->is_valid())) {

View File

@ -117,7 +117,6 @@ private:
blocksstable::ObMacroBlockWriter macro_writer_;
const ObITableReadInfo *index_read_info_;
static const int64_t REBUILD_MACRO_BLOCK_THRESOLD = 20;
static const int64_t MAX_TABLE_CNT = 4097;
static const int64_t DEFAULT_MACRO_ID_COUNT = 32;
};

View File

@ -995,8 +995,9 @@ int ObMemtableMultiVersionScanIterator::set_compacted_row_state(const bool add_s
row_.row_flag_.fuse_flag(value_iter_->get_row_first_dml_flag());
row_.mvcc_row_flag_.set_last_multi_version_row(value_iter_->is_multi_version_iter_end());
if (add_shadow_row) {
row_.set_shadow_row();
row_.storage_datums_[sql_sequence_col_idx_].set_int(-INT64_MAX);
if (OB_FAIL(ObShadowRowUtil::make_shadow_row(sql_sequence_col_idx_, row_))) {
LOG_WARN("failed to make shadow row", K(ret), K(row_), K_(sql_sequence_col_idx));
}
} else {
// sql_sequence of committed data is 0
row_.storage_datums_[sql_sequence_col_idx_].set_int(0);