add compaction trans cache

This commit is contained in:
a1iive
2023-03-16 00:10:54 +00:00
committed by ob-robot
parent b66dcf06a8
commit 01ad74c050
12 changed files with 327 additions and 24 deletions

View File

@ -457,6 +457,7 @@ ob_set_subtarget(ob_storage compaction
compaction/ob_medium_compaction_info.cpp compaction/ob_medium_compaction_info.cpp
compaction/ob_compaction_diagnose.cpp compaction/ob_compaction_diagnose.cpp
compaction/ob_compaction_suggestion.cpp compaction/ob_compaction_suggestion.cpp
compaction/ob_compaction_trans_cache.cpp
compaction/ob_sstable_merge_info_mgr.cpp compaction/ob_sstable_merge_info_mgr.cpp
compaction/ob_tenant_compaction_progress.cpp compaction/ob_tenant_compaction_progress.cpp
compaction/ob_server_compaction_event_history.cpp compaction/ob_server_compaction_event_history.cpp

View File

@ -72,7 +72,8 @@ ObTableAccessContext::ObTableAccessContext()
lob_locator_helper_(nullptr), lob_locator_helper_(nullptr),
iter_pool_(nullptr), iter_pool_(nullptr),
block_row_store_(nullptr), block_row_store_(nullptr),
io_callback_(nullptr) io_callback_(nullptr),
trans_state_mgr_(nullptr)
{ {
merge_scn_.set_max(); merge_scn_.set_max();
} }

View File

@ -20,6 +20,10 @@
namespace oceanbase namespace oceanbase
{ {
namespace compaction
{
struct ObCachedTransStateMgr;
}
namespace common namespace common
{ {
class ObIOCallback; class ObIOCallback;
@ -161,6 +165,7 @@ public:
ObStoreRowIterPool *iter_pool_; ObStoreRowIterPool *iter_pool_;
ObBlockRowStore *block_row_store_; ObBlockRowStore *block_row_store_;
common::ObIOCallback *io_callback_; common::ObIOCallback *io_callback_;
compaction::ObCachedTransStateMgr *trans_state_mgr_;
#ifdef ENABLE_DEBUG_LOG #ifdef ENABLE_DEBUG_LOG
transaction::ObDefensiveCheckRecordExtend defensive_check_record_; transaction::ObDefensiveCheckRecordExtend defensive_check_record_;
#endif #endif

View File

@ -21,6 +21,7 @@
#include "storage/blocksstable/ob_index_block_row_scanner.h" #include "storage/blocksstable/ob_index_block_row_scanner.h"
#include "storage/tx_table/ob_tx_table.h" #include "storage/tx_table/ob_tx_table.h"
#include "storage/tx/ob_tx_data_functor.h" #include "storage/tx/ob_tx_data_functor.h"
#include "storage/compaction/ob_compaction_trans_cache.h"
namespace oceanbase namespace oceanbase
{ {
@ -1010,6 +1011,7 @@ int ObMultiVersionMicroBlockRowScanner::inner_inner_get_next_row(
const ObDatumRow *&ret_row, bool &version_fit, bool &final_result, bool &have_uncommited_row) const ObDatumRow *&ret_row, bool &version_fit, bool &final_result, bool &have_uncommited_row)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
ret_row = nullptr; ret_row = nullptr;
version_fit = false; version_fit = false;
final_result = false; final_result = false;
@ -1045,16 +1047,25 @@ int ObMultiVersionMicroBlockRowScanner::inner_inner_get_next_row(
} else if (FALSE_IT(flag = row_header->get_row_multi_version_flag())) { } else if (FALSE_IT(flag = row_header->get_row_multi_version_flag())) {
} else if (flag.is_uncommitted_row()) { } else if (flag.is_uncommitted_row()) {
have_uncommited_row = true; // TODO @lvling check transaction status instead have_uncommited_row = true; // TODO @lvling check transaction status instead
transaction::ObLockForReadArg lock_for_read_arg(acc_ctx, compaction::ObMergeCachedTransState trans_state;
transaction::ObTransID(row_header->get_trans_id()), if (OB_NOT_NULL(context_->trans_state_mgr_) &&
sql_sequence, OB_SUCCESS == context_->trans_state_mgr_->get_trans_state(
context_->query_flag_.read_latest_); transaction::ObTransID(row_header->get_trans_id()), sql_sequence, trans_state)) {
can_read = trans_state.can_read_;
trans_version = trans_state.trans_version_;
is_determined_state = trans_state.is_determined_state_;
} else {
transaction::ObLockForReadArg lock_for_read_arg(acc_ctx,
transaction::ObTransID(row_header->get_trans_id()),
sql_sequence,
context_->query_flag_.read_latest_);
if (OB_FAIL(lock_for_read(lock_for_read_arg, if (OB_FAIL(lock_for_read(lock_for_read_arg,
can_read, can_read,
trans_version, trans_version,
is_determined_state))) { is_determined_state))) {
STORAGE_LOG(WARN, "fail to check transaction status", K(ret), KPC(row_header), K_(macro_id)); STORAGE_LOG(WARN, "fail to check transaction status", K(ret), KPC(row_header), K_(macro_id));
}
} }
} }
@ -1311,6 +1322,7 @@ int ObMultiVersionMicroBlockRowScanner::lock_for_read(
bool &is_determined_state) bool &is_determined_state)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
auto &tx_table_guard = context_->store_ctx_->mvcc_acc_ctx_.get_tx_table_guard(); auto &tx_table_guard = context_->store_ctx_->mvcc_acc_ctx_.get_tx_table_guard();
int64_t read_epoch = tx_table_guard.epoch(); int64_t read_epoch = tx_table_guard.epoch();
SCN scn_trans_version = SCN::invalid_scn(); SCN scn_trans_version = SCN::invalid_scn();
@ -1319,6 +1331,14 @@ int ObMultiVersionMicroBlockRowScanner::lock_for_read(
LOG_WARN("failed to check transaction status", K(ret)); LOG_WARN("failed to check transaction status", K(ret));
} else { } else {
trans_version = scn_trans_version.get_val_for_tx(); trans_version = scn_trans_version.get_val_for_tx();
if (OB_NOT_NULL(context_->trans_state_mgr_) &&
OB_TMP_FAIL(context_->trans_state_mgr_->add_trans_state(
lock_for_read_arg.data_trans_id_, lock_for_read_arg.data_sql_sequence_,
trans_version, ObTxData::MAX_STATE_CNT, can_read, is_determined_state))) {
LOG_WARN("failed to add trans state to cache", K(tmp_ret),
"trans_id", lock_for_read_arg.data_trans_id_,
"sql_seq", lock_for_read_arg.data_sql_sequence_);
}
} }
return ret; return ret;
} }
@ -2015,7 +2035,9 @@ int ObMultiVersionMicroBlockMinorMergeRowScanner::compact_last_row()
int ObMultiVersionMicroBlockMinorMergeRowScanner::find_uncommitted_row() int ObMultiVersionMicroBlockMinorMergeRowScanner::find_uncommitted_row()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
last_trans_id_.reset(); last_trans_id_.reset();
last_trans_state_ = INT64_MAX;
if (OB_UNLIKELY(OB_ISNULL(reader_) || SCAN_START != scan_state_)) { if (OB_UNLIKELY(OB_ISNULL(reader_) || SCAN_START != scan_state_)) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("reader is null OR scan state is wrong", K(ret), K(reader_), K(scan_state_)); LOG_WARN("reader is null OR scan state is wrong", K(ret), K(reader_), K(scan_state_));
@ -2046,9 +2068,16 @@ int ObMultiVersionMicroBlockMinorMergeRowScanner::find_uncommitted_row()
//get trans status & committed_trans_version_ //get trans status & committed_trans_version_
int64_t state; int64_t state;
int64_t commit_trans_version = INT64_MAX; int64_t commit_trans_version = INT64_MAX;
if (OB_FAIL(get_trans_state(last_trans_id_, state, commit_trans_version))) { compaction::ObMergeCachedTransState trans_state;
if (OB_NOT_NULL(context_->trans_state_mgr_) &&
OB_SUCCESS == context_->trans_state_mgr_->get_trans_state(last_trans_id_, sql_sequence, trans_state)) {
state = trans_state.trans_state_;
last_trans_state_ = trans_state.trans_state_;
commit_trans_version = trans_state.trans_version_;
} else if (OB_FAIL(get_trans_state(last_trans_id_, state, commit_trans_version))) {
LOG_WARN("get transaction status failed", K(ret), K(last_trans_id_), K(state)); LOG_WARN("get transaction status failed", K(ret), K(last_trans_id_), K(state));
} else if (OB_FAIL(judge_trans_state(state, commit_trans_version))) { }
if (OB_SUCC(ret) && OB_FAIL(judge_trans_state(state, commit_trans_version))) {
LOG_WARN("failed to judge transaction status", K(ret), K(last_trans_id_), LOG_WARN("failed to judge transaction status", K(ret), K(last_trans_id_),
K(state)); K(state));
} }
@ -2069,12 +2098,13 @@ int ObMultiVersionMicroBlockMinorMergeRowScanner::get_trans_state(
//get trans status & committed_trans_version_ //get trans status & committed_trans_version_
SCN scn_commit_trans_version = SCN::max_scn(); SCN scn_commit_trans_version = SCN::max_scn();
auto &tx_table_guard = context_->store_ctx_->mvcc_acc_ctx_.get_tx_table_guard(); auto &tx_table_guard = context_->store_ctx_->mvcc_acc_ctx_.get_tx_table_guard();
int64_t read_epoch = tx_table_guard.epoch();; int64_t read_epoch = tx_table_guard.epoch();
if (OB_FAIL(tx_table_guard.get_tx_table()->get_tx_state_with_scn( if (OB_FAIL(tx_table_guard.get_tx_table()->get_tx_state_with_scn(
trans_id, context_->merge_scn_, read_epoch, state, scn_commit_trans_version))) { trans_id, context_->merge_scn_, read_epoch, state, scn_commit_trans_version))) {
LOG_WARN("get transaction status failed", K(ret), K(trans_id), K(state)); LOG_WARN("get transaction status failed", K(ret), K(trans_id), K(state));
} else { } else {
commit_trans_version = scn_commit_trans_version.get_val_for_tx(); commit_trans_version = scn_commit_trans_version.get_val_for_tx();
last_trans_state_ = state;
} }
return ret; return ret;
} }
@ -2204,15 +2234,27 @@ int ObMultiVersionMicroBlockMinorMergeRowScanner::check_curr_row_can_read(
bool &can_read) bool &can_read)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
bool is_cached = false;
can_read = false; can_read = false;
auto &tx_table_guard = context_->store_ctx_->mvcc_acc_ctx_.get_tx_table_guard(); compaction::ObMergeCachedTransState trans_state;
int64_t read_epoch = tx_table_guard.epoch(); if (OB_NOT_NULL(context_->trans_state_mgr_) &&
if (OB_FAIL(tx_table_guard.get_tx_table()->check_sql_sequence_can_read( OB_SUCCESS == context_->trans_state_mgr_->get_trans_state(trans_id, sql_seq, trans_state)) {
trans_id, can_read = trans_state.can_read_;
sql_seq, } else {
read_epoch, auto &tx_table_guard = context_->store_ctx_->mvcc_acc_ctx_.get_tx_table_guard();
can_read))) { int64_t read_epoch = tx_table_guard.epoch();
LOG_WARN("check sql sequence can read failed", K(ret), K(can_read), K(trans_id), K(sql_seq)); if (OB_FAIL(tx_table_guard.get_tx_table()->check_sql_sequence_can_read(
trans_id,
sql_seq,
read_epoch,
can_read))) {
LOG_WARN("check sql sequence can read failed", K(ret), K(can_read), K(trans_id), K(sql_seq));
} else if (OB_NOT_NULL(context_->trans_state_mgr_) &&
OB_TMP_FAIL(context_->trans_state_mgr_->add_trans_state(trans_id, sql_seq,
committed_trans_version_, last_trans_state_, can_read, 0))) {
LOG_WARN("failed to add minor trans state", K(tmp_ret), K(trans_id), K(sql_seq), K(can_read));
}
} }
LOG_DEBUG("cxf debug check sql sequence can read", K(ret), K(can_read), K(trans_id), K(sql_seq)); LOG_DEBUG("cxf debug check sql sequence can read", K(ret), K(can_read), K(trans_id), K(sql_seq));
return ret; return ret;

View File

@ -269,6 +269,7 @@ public:
is_row_queue_ready_(false), is_row_queue_ready_(false),
scan_state_(SCAN_START), scan_state_(SCAN_START),
committed_trans_version_(INT64_MAX), committed_trans_version_(INT64_MAX),
last_trans_state_(INT64_MAX),
read_trans_id_(), read_trans_id_(),
last_trans_id_(), last_trans_id_(),
first_rowkey_flag_(true), first_rowkey_flag_(true),
@ -367,6 +368,7 @@ private:
bool is_row_queue_ready_; bool is_row_queue_ready_;
ScanState scan_state_; ScanState scan_state_;
int64_t committed_trans_version_; int64_t committed_trans_version_;
int64_t last_trans_state_;
transaction::ObTransID read_trans_id_; transaction::ObTransID read_trans_id_;
transaction::ObTransID last_trans_id_; transaction::ObTransID last_trans_id_;
bool first_rowkey_flag_; bool first_rowkey_flag_;

View File

@ -0,0 +1,106 @@
/**
* Copyright (c) 2022 OceanBase
* OceanBase is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX STORAGE_COMPACTION
#include "storage/compaction/ob_compaction_trans_cache.h"
namespace oceanbase
{
using namespace storage;
namespace compaction
{
/*
* ---------------------------------------------ObCachedTransStateMgr----------------------------------------------
*/
int ObCachedTransStateMgr::init(int64_t max_cnt)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("ObCachedTransStateMgr has already been initiated", K(ret));
} else if (max_cnt <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("max count is invalid", K(ret), K(max_cnt));
} else {
void *buf = nullptr;
if (OB_ISNULL(buf = allocator_.alloc(max_cnt * sizeof(ObMergeCachedTransState)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory", K(ret), K(max_cnt),
"alloc_size", max_cnt * sizeof(ObMergeCachedTransState));
} else {
array_ = new(buf) ObMergeCachedTransState[max_cnt]();
max_cnt_ = max_cnt;
is_inited_ = true;
}
}
return ret;
}
void ObCachedTransStateMgr::destroy()
{
if (OB_NOT_NULL(array_)) {
allocator_.free(array_);
array_ = nullptr;
}
is_inited_ = false;
}
int ObCachedTransStateMgr::get_trans_state(
const transaction::ObTransID &trans_id,
const int64_t sql_seq,
ObMergeCachedTransState &trans_state)
{
int ret = OB_SUCCESS;
ObMergeCachedTransKey key(trans_id, sql_seq);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObCachedTransStateMgr is not initialized", K(ret));
} else if (!key.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid key", K(ret), K(key));
} else {
uint64_t idx = cal_idx(key);
if (array_[idx].key_ == key && array_[idx].is_valid()) {
trans_state = array_[cal_idx(key)];
} else {
ret = OB_HASH_NOT_EXIST;
}
}
return ret;
}
int ObCachedTransStateMgr::add_trans_state(
const transaction::ObTransID &trans_id,
const int64_t sql_seq,
const int64_t commited_trans_version,
const int32_t trans_state,
const int16_t can_read,
const int16_t is_determined_state)
{
int ret = OB_SUCCESS;
ObMergeCachedTransKey key(trans_id, sql_seq);
ObMergeCachedTransState status(trans_id, sql_seq, commited_trans_version, trans_state, can_read, is_determined_state);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObCachedTransStateMgr is not initialized", K(ret));
} else if (!status.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid trans state", K(ret), K(status));
} else {
array_[cal_idx(key)] = status;
}
return ret;
}
} // namespace compaction
} // namespace oceanbase

View File

@ -0,0 +1,131 @@
/**
* Copyright (c) 2022 OceanBase
* OceanBase is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef SRC_STORAGE_COMPACTION_OB_TRANS_CACHE_H_
#define SRC_STORAGE_COMPACTION_OB_TRANS_CACHE_H_
#include "lib/ob_define.h"
#include "lib/utility/ob_print_utils.h"
#include "lib/hash/ob_hashutils.h"
#include "storage/tx/ob_trans_define.h"
#include "storage/tx/ob_tx_data_define.h"
namespace oceanbase
{
namespace storage
{
class ObTxData;
}
namespace compaction
{
struct ObMergeCachedTransKey {
ObMergeCachedTransKey()
: trans_id_(),
sql_sequence_(0)
{}
ObMergeCachedTransKey(
transaction::ObTransID trans_id,
int64_t sql_sequence)
: trans_id_(trans_id),
sql_sequence_(sql_sequence)
{}
~ObMergeCachedTransKey() {}
inline bool operator == (const ObMergeCachedTransKey &other) const
{
return trans_id_ == other.trans_id_ && sql_sequence_ == other.sql_sequence_;
}
inline uint64_t hash() const
{
uint64_t hash_value = trans_id_.hash();
hash_value = murmurhash(&sql_sequence_, sizeof(sql_sequence_), hash_value);
return hash_value;
}
inline bool is_valid() const
{
return trans_id_.is_valid() && 0 != sql_sequence_;
}
TO_STRING_KV(K_(trans_id), K_(sql_sequence));
transaction::ObTransID trans_id_;
int64_t sql_sequence_;
};
struct ObMergeCachedTransState {
ObMergeCachedTransState()
: key_(),
trans_version_(0),
trans_state_(INT32_MAX),
can_read_(INVALID_BOOL_VALUE),
is_determined_state_(INVALID_BOOL_VALUE)
{}
ObMergeCachedTransState(
transaction::ObTransID trans_id,
int64_t sql_sequence,
int64_t trans_version,
int32_t trans_state,
int16_t can_read,
int16_t is_determined_state)
: key_(trans_id, sql_sequence),
trans_version_(trans_version),
trans_state_(trans_state),
can_read_(can_read),
is_determined_state_(is_determined_state)
{}
virtual ~ObMergeCachedTransState() {}
inline bool is_valid() const
{
return key_.is_valid() && 0 != trans_version_ && INT32_MAX != trans_state_ &&
INVALID_BOOL_VALUE != can_read_ && INVALID_BOOL_VALUE != is_determined_state_;
}
TO_STRING_KV(K_(key), K_(trans_state), K_(trans_version), K_(can_read), K_(is_determined_state));
static const int16_t INVALID_BOOL_VALUE = -1;
ObMergeCachedTransKey key_;
int64_t trans_version_;
int32_t trans_state_;
int16_t can_read_; // 0 false; 1 true
int16_t is_determined_state_; // 0 false; 1 true
};
class ObCachedTransStateMgr {
public:
ObCachedTransStateMgr(common::ObIAllocator &allocator)
: is_inited_(false),
max_cnt_(0),
allocator_(allocator),
array_(nullptr)
{}
~ObCachedTransStateMgr() { destroy(); }
int init(int64_t max_cnt);
void destroy();
inline uint64_t cal_idx(const ObMergeCachedTransKey &key) { return key.hash() % max_cnt_; }
int get_trans_state(const transaction::ObTransID &trans_id, const int64_t sql_seq, ObMergeCachedTransState &trans_state);
int add_trans_state(
const transaction::ObTransID &trans_id,
const int64_t sql_seq,
const int64_t trans_version,
const int32_t trans_state,
const int16_t can_read,
const int16_t is_determined_state);
private:
bool is_inited_;
int64_t max_cnt_;
common::ObIAllocator &allocator_;
ObMergeCachedTransState *array_;
};
} // namespace compaction
} // namespace oceanbase
#endif

View File

@ -127,6 +127,7 @@ int ObPartitionMergeIter::init_query_base_params(const ObMergeParameter &merge_p
merge_param.version_range_))) { merge_param.version_range_))) {
LOG_WARN("Failed to init table access context", K(ret), K(query_flag)); LOG_WARN("Failed to init table access context", K(ret), K(query_flag));
} else { } else {
access_context_.trans_state_mgr_ = merge_param.trans_state_mgr_;
// always use end_scn for safety // always use end_scn for safety
access_context_.merge_scn_ = merge_param.scn_range_.end_scn_; access_context_.merge_scn_ = merge_param.scn_range_.end_scn_;
} }

View File

@ -46,6 +46,7 @@ ObPartitionMerger::ObPartitionMerger()
macro_writer_(nullptr), macro_writer_(nullptr),
minimum_iters_(DEFAULT_ITER_ARRAY_SIZE, ModulePageAllocator(allocator_)), minimum_iters_(DEFAULT_ITER_ARRAY_SIZE, ModulePageAllocator(allocator_)),
base_iter_(nullptr), base_iter_(nullptr),
trans_state_mgr_(allocator_),
task_idx_(0), task_idx_(0),
check_macro_need_merge_(false), check_macro_need_merge_(false),
is_inited_(false) is_inited_(false)
@ -78,6 +79,7 @@ void ObPartitionMerger::reset()
allocator_.free(macro_writer_); allocator_.free(macro_writer_);
macro_writer_ = nullptr; macro_writer_ = nullptr;
} }
trans_state_mgr_.destroy();
allocator_.reset(); allocator_.reset();
} }
@ -362,6 +364,7 @@ int ObPartitionMerger::prepare_merge_partition(ObMergeParameter &merge_param,
ObPartitionMergeHelper &merge_helper) ObPartitionMergeHelper &merge_helper)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
if (IS_NOT_INIT) { if (IS_NOT_INIT) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "ObPartitionMerger is not inited", K(ret)); STORAGE_LOG(WARN, "ObPartitionMerger is not inited", K(ret));
@ -374,10 +377,14 @@ int ObPartitionMerger::prepare_merge_partition(ObMergeParameter &merge_param,
STORAGE_LOG(WARN, "Failed to open macro writer", K(ret), K(merge_param)); STORAGE_LOG(WARN, "Failed to open macro writer", K(ret), K(merge_param));
} else if (OB_FAIL(init_partition_fuser(merge_param))) { } else if (OB_FAIL(init_partition_fuser(merge_param))) {
STORAGE_LOG(WARN, "Failed to init partition merge fuser", K(merge_param), K(ret)); STORAGE_LOG(WARN, "Failed to init partition merge fuser", K(merge_param), K(ret));
} else if (OB_FAIL(merge_helper.init(*partition_fuser_, merge_param, data_store_desc_.row_store_type_))) { } else if (OB_TMP_FAIL(trans_state_mgr_.init(CACHED_TRANS_STATE_MAX_CNT))) {
STORAGE_LOG(WARN, "failed to init merge trans state mgr", K(tmp_ret));
} else {
merge_param.trans_state_mgr_ = &trans_state_mgr_;
}
if (OB_SUCC(ret) && OB_FAIL(merge_helper.init(*partition_fuser_, merge_param, data_store_desc_.row_store_type_))) {
STORAGE_LOG(WARN, "Failed to init merge helper", K(ret)); STORAGE_LOG(WARN, "Failed to init merge helper", K(ret));
} }
return ret; return ret;
} }

View File

@ -26,6 +26,7 @@
#include "storage/blocksstable/ob_sstable.h" #include "storage/blocksstable/ob_sstable.h"
#include "lib/container/ob_loser_tree.h" #include "lib/container/ob_loser_tree.h"
#include "storage/compaction/ob_partition_rows_merger.h" #include "storage/compaction/ob_partition_rows_merger.h"
#include "storage/compaction/ob_compaction_trans_cache.h"
namespace oceanbase namespace oceanbase
{ {
@ -73,6 +74,7 @@ protected:
void set_base_iter(const MERGE_ITER_ARRAY &minimum_iters); void set_base_iter(const MERGE_ITER_ARRAY &minimum_iters);
protected: protected:
static const int64_t DEFAULT_ITER_ARRAY_SIZE = DEFAULT_ITER_COUNT * sizeof(ObPartitionMergeIter *); static const int64_t DEFAULT_ITER_ARRAY_SIZE = DEFAULT_ITER_COUNT * sizeof(ObPartitionMergeIter *);
static const int64_t CACHED_TRANS_STATE_MAX_CNT = 10 * 1024l;
protected: protected:
common::ObArenaAllocator allocator_; common::ObArenaAllocator allocator_;
ObTabletMergeCtx *merge_ctx_; ObTabletMergeCtx *merge_ctx_;
@ -83,6 +85,7 @@ protected:
blocksstable::ObMacroBlockWriter *macro_writer_; blocksstable::ObMacroBlockWriter *macro_writer_;
MERGE_ITER_ARRAY minimum_iters_; MERGE_ITER_ARRAY minimum_iters_;
ObPartitionMergeIter *base_iter_; ObPartitionMergeIter *base_iter_;
ObCachedTransStateMgr trans_state_mgr_;
int64_t task_idx_; int64_t task_idx_;
bool check_macro_need_merge_; bool check_macro_need_merge_;
bool is_inited_; bool is_inited_;

View File

@ -73,7 +73,8 @@ ObMergeParameter::ObMergeParameter()
version_range_(), version_range_(),
scn_range_(), scn_range_(),
full_read_info_(nullptr), full_read_info_(nullptr),
is_full_merge_(false) is_full_merge_(false),
trans_state_mgr_(nullptr)
{ {
} }
@ -102,6 +103,7 @@ void ObMergeParameter::reset()
version_range_.reset(); version_range_.reset();
scn_range_.reset(); scn_range_.reset();
is_full_merge_ = false; is_full_merge_ = false;
trans_state_mgr_ = nullptr;
} }
int ObMergeParameter::init(compaction::ObTabletMergeCtx &merge_ctx, const int64_t idx) int ObMergeParameter::init(compaction::ObTabletMergeCtx &merge_ctx, const int64_t idx)

View File

@ -50,6 +50,7 @@ class ObTabletMergeDag;
struct ObTabletMergeCtx; struct ObTabletMergeCtx;
class ObTabletMergeInfo; class ObTabletMergeInfo;
class ObPartitionMerger; class ObPartitionMerger;
struct ObCachedTransStateMgr;
struct ObMergeParameter { struct ObMergeParameter {
@ -73,6 +74,7 @@ struct ObMergeParameter {
share::ObScnRange scn_range_; share::ObScnRange scn_range_;
const ObTableReadInfo *full_read_info_; // full read info of old tablet const ObTableReadInfo *full_read_info_; // full read info of old tablet
bool is_full_merge_; // full merge or increment merge, duplicated with merge_level bool is_full_merge_; // full merge or increment merge, duplicated with merge_level
compaction::ObCachedTransStateMgr *trans_state_mgr_;
TO_STRING_KV(KPC_(tables_handle), K_(merge_type), K_(merge_level), KP_(merge_schema), TO_STRING_KV(KPC_(tables_handle), K_(merge_type), K_(merge_level), KP_(merge_schema),
K_(merge_range), K_(version_range), K_(scn_range), K_(is_full_merge)); K_(merge_range), K_(version_range), K_(scn_range), K_(is_full_merge));