simply pdml code, remove compat logic on leader

This commit is contained in:
chinaxing 2024-06-17 09:49:23 +00:00 committed by ob-robot
parent 89e696f309
commit 156e89bb28
16 changed files with 102 additions and 292 deletions

View File

@ -65,7 +65,7 @@ TestTableScanPureDataTable::TestTableScanPureDataTable()
void TestTableScanPureDataTable::SetUpTestCase()
{
uint64_t version = cal_version(4, 2, 0, 0);
uint64_t version = cal_version(4, 3, 0, 0);
ASSERT_EQ(OB_SUCCESS, ObClusterVersion::get_instance().init(version));
ASSERT_EQ(OB_SUCCESS, omt::ObTenantConfigMgr::get_instance().add_tenant_config(MTL_ID()));
ObClusterVersion::get_instance().tenant_config_mgr_ = &omt::ObTenantConfigMgr::get_instance();

View File

@ -100,7 +100,7 @@ public:
{
LOG_INFO("SetUpTestCase");
ASSERT_EQ(OB_SUCCESS, omt::ObTenantConfigMgr::get_instance().add_tenant_config(MTL_ID()));
uint64_t version = cal_version(4, 2, 0, 0);
uint64_t version = cal_version(4, 3, 0, 0);
ASSERT_EQ(OB_SUCCESS, ObClusterVersion::get_instance().init(version));
ObClusterVersion::get_instance().tenant_config_mgr_ = &omt::ObTenantConfigMgr::get_instance();

View File

@ -59,7 +59,7 @@ ObTableLockOp DEFAULT_CONFLICT_OUT_TRANS_LOCK_OP;
void init_default_lock_test_value()
{
int ret = OB_SUCCESS;
const ObTxSEQ seq_no;
const ObTxSEQ seq_no(100,0);
const int64_t create_timestamp = 1;
const int64_t create_schema_version = 1;

View File

@ -90,7 +90,7 @@ void MockObTxCtx::init(const uint64_t tenant_id,
tenant_id_ = tenant_id;
// mock trans ctx end
cluster_version_ = CLUSTER_VERSION_4_0_0_0;
cluster_version_ = CLUSTER_VERSION_4_3_0_0;
timer_ = NULL;
// trans part ctx
ls_id_ = ls_id;
@ -138,6 +138,7 @@ void MockTxEnv::start_tx(const ObTransID &tx_id,
{
my_ctx.tx_desc_.tx_id_ = tx_id;
my_ctx.tx_desc_.state_ = ObTxDesc::State::ACTIVE;
my_ctx.tx_desc_.seq_base_ = ObTimeUtility::current_time();
my_ctx.tx_ctx_.ctx_tx_data_.test_tx_data_reset();
my_ctx.tx_ctx_.ctx_tx_data_.test_set_tx_id(tx_id);
my_ctx.tx_ctx_.init(tenant_id_,

View File

@ -394,7 +394,7 @@ int ObTableApiProcessorBase::init_read_trans(const ObTableConsistencyLevel consi
bool strong_read = ObTableConsistencyLevel::STRONG == consistency_level;
transaction::ObTransService *txs = MTL(transaction::ObTransService*);
if (OB_FAIL(txs->acquire_tx(trans_desc_, session().get_sessid(), session().get_data_version()))) {
if (OB_FAIL(txs->acquire_tx(trans_desc_, session().get_sessid()))) {
LOG_WARN("failed to acquire tx desc", K(ret));
} else if (OB_FAIL(setup_tx_snapshot_(*trans_desc_, tx_snapshot_, strong_read, ls_id, timeout_ts))) {
LOG_WARN("setup txn snapshot fail", K(ret), KPC_(trans_desc), K(strong_read), K(ls_id), K(timeout_ts));
@ -485,7 +485,7 @@ int ObTableApiProcessorBase::start_trans_(bool is_readonly,
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("start_trans is executed", K(ret));
} else {
if (OB_FAIL(txs->acquire_tx(trans_desc, session().get_sessid(), session().get_data_version()))) {
if (OB_FAIL(txs->acquire_tx(trans_desc, session().get_sessid()))) {
LOG_WARN("failed to acquire tx desc", K(ret));
} else if (OB_FAIL(txs->start_tx(*trans_desc, tx_param))) {
LOG_WARN("failed to start trans", K(ret), KPC(trans_desc));

View File

@ -184,7 +184,7 @@ ObTransCallbackMgr::RDLockGuard::RDLockGuard(const SpinRWLock &rwlock)
#define CALLBACK_LISTS_FOREACH_(idx, list, CONST) \
CONST ObTxCallbackList *list = &callback_list_; \
const int list_cnt = (!need_merge_ && callback_lists_) ? MAX_CALLBACK_LIST_COUNT : 1; \
const int list_cnt = callback_lists_ ? MAX_CALLBACK_LIST_COUNT : 1; \
for (int idx = 0; OB_SUCC(ret) && idx < list_cnt; \
list = (list_cnt > 1 ? callback_lists_ + idx : NULL), ++idx)
@ -197,7 +197,7 @@ void ObTransCallbackMgr::reset()
skip_checksum_ = false;
callback_list_.reset();
if (callback_lists_) {
int cnt = need_merge_ ? MAX_CALLBACK_LIST_COUNT : MAX_CALLBACK_LIST_COUNT - 1;
int cnt = MAX_CALLBACK_LIST_COUNT - 1;
for (int i = 0; i < cnt; ++i) {
if (!callback_lists_[i].empty()) {
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "txn callback list is broken", K(stat), K(i), K(this));
@ -223,15 +223,12 @@ void ObTransCallbackMgr::reset()
parallel_stat_ = 0;
write_epoch_ = 0;
write_epoch_start_tid_ = 0;
need_merge_ = false;
for_replay_ = false;
has_branch_replayed_into_first_list_ = false;
serial_final_scn_.set_max();
serial_final_seq_no_.reset();
serial_sync_scn_.set_min();
callback_main_list_append_count_ = 0;
callback_slave_list_append_count_ = 0;
callback_slave_list_merge_count_ = 0;
callback_remove_for_trans_end_count_ = 0;
callback_remove_for_remove_memtable_count_ = 0;
callback_remove_for_fast_commit_count_ = 0;
@ -354,7 +351,6 @@ int ObTransCallbackMgr::append(ObITransCallback *node)
}
const transaction::ObTxSEQ seq_no = node->get_seq_no();
if (seq_no.support_branch()) {
// NEW since version 4.3, select by branch
int slot = seq_no.get_branch() % MAX_CALLBACK_LIST_COUNT;
if (slot > 0
&& for_replay_
@ -405,24 +401,16 @@ int ObTransCallbackMgr::append(ObITransCallback *node)
} else {
ret = callback_lists_[slot - 1].append_callback(node, for_replay_, parallel_replay_, is_serial_final_());
}
} else if (!for_replay_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "write by older version", K(ret), K(seq_no), KPC(this));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
} else {
// OLD before version 4.3
// if has parallel select from callback_lists_
// don't select main, and merge into main finally
const int64_t tid = get_itid() + 1;
int slot = tid % MAX_CALLBACK_LIST_COUNT;
int64_t stat = ATOMIC_LOAD(&parallel_stat_);
if (PARALLEL_STMT == stat) {
if (OB_FAIL(!callback_lists_ && extend_callback_lists_(MAX_CALLBACK_LIST_COUNT))) {
TRANS_LOG(WARN, "extend callback lists failed", K(ret));
} else {
ret = callback_lists_[slot].append_callback(node, for_replay_, parallel_replay_, true);
add_slave_list_append_cnt();
}
} else {
ret = callback_list_.append_callback(node, for_replay_, parallel_replay_, true);
add_main_list_append_cnt();
}
// for replay, before version 4.2.4
ret = callback_list_.append_callback(node, for_replay_, parallel_replay_, true);
add_main_list_append_cnt();
}
after_append(node, ret);
return ret;
@ -431,9 +419,7 @@ int ObTransCallbackMgr::append(ObITransCallback *node)
void ObTransCallbackMgr::before_append(ObITransCallback *node)
{
int64_t size = node->get_data_size();
int64_t new_size = inc_pending_log_size(size);
try_merge_multi_callback_lists(new_size, size, node->is_logging_blocked());
if (for_replay_) {
inc_flushed_log_size(size);
}
@ -458,7 +444,7 @@ int ObTransCallbackMgr::rollback_to(const ObTxSEQ to_seq_no,
int ret = OB_SUCCESS;
int slot = -1;
remove_cnt = callback_remove_for_rollback_to_count_;
if (OB_LIKELY(to_seq_no.support_branch())) { // since 4.3
if (OB_LIKELY(to_seq_no.support_branch())) { // since 4.2.4
// it is a global savepoint, rollback on all list
if (to_seq_no.get_branch() == 0) {
CALLBACK_LISTS_FOREACH(idx, list) {
@ -487,7 +473,13 @@ int ObTransCallbackMgr::rollback_to(const ObTxSEQ to_seq_no,
KPC(this), KPC(get_trans_ctx()), K(replay_scn), K(to_seq_no), K(from_seq_no));
}
}
} else { // before 4.3
} else if (!for_replay_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "leader rollback to with old version", K(ret), K(to_seq_no), KPC(this));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
} else { // for replay, before 4.2.4
ret = callback_list_.remove_callbacks_for_rollback_to(to_seq_no, from_seq_no, replay_scn);
}
if (OB_FAIL(ret)) {
@ -497,40 +489,6 @@ int ObTransCallbackMgr::rollback_to(const ObTxSEQ to_seq_no,
return ret;
}
// merge `callback_lists_` into `callback_list_`
void ObTransCallbackMgr::merge_multi_callback_lists()
{
int64_t stat = ATOMIC_LOAD(&parallel_stat_);
int64_t cnt = 0;
if (OB_UNLIKELY(ATOMIC_LOAD(&need_merge_)) && PARALLEL_STMT == stat) {
WRLockGuard guard(rwlock_);
if (OB_NOT_NULL(callback_lists_)) {
for (int64_t i = 0; i < MAX_CALLBACK_LIST_COUNT; ++i) {
cnt = callback_list_.concat_callbacks(callback_lists_[i]);
add_slave_list_merge_cnt(cnt);
}
}
#ifndef NDEBUG
TRANS_LOG(INFO, "merge callback lists to callback list", K(stat), K(host_.get_tx_id()));
#endif
}
}
void ObTransCallbackMgr::force_merge_multi_callback_lists()
{
int64_t cnt = 0;
if (OB_UNLIKELY(ATOMIC_LOAD(&need_merge_))) {
WRLockGuard guard(rwlock_);
if (OB_NOT_NULL(callback_lists_)) {
for (int64_t i = 0; i < MAX_CALLBACK_LIST_COUNT; ++i) {
cnt = callback_list_.concat_callbacks(callback_lists_[i]);
add_slave_list_merge_cnt(cnt);
}
}
}
TRANS_LOG(DEBUG, "force merge callback lists to callback list", K(host_.get_tx_id()));
}
transaction::ObPartTransCtx *ObTransCallbackMgr::get_trans_ctx() const
{
return host_.get_trans_ctx();
@ -549,8 +507,6 @@ void ObTransCallbackMgr::reset_pdml_stat()
need_retry = false;
}
}
force_merge_multi_callback_lists();
}
// only for replay
@ -563,9 +519,6 @@ int ObTransCallbackMgr::remove_callbacks_for_fast_commit(const int16_t callback_
{
int ret = OB_SUCCESS;
RDLockGuard guard(rwlock_);
// NOTE:
// this can handle both NEW(since 4.3) and compatible with OLD version:
// because before 4.3, replay only append to main (the `callback_list_`)
if (OB_UNLIKELY(callback_list_idx != 0 || is_serial_final_())) {
ObTxCallbackList *list = get_callback_list_(callback_list_idx, true);
if (OB_ISNULL(list)) {
@ -601,8 +554,6 @@ int ObTransCallbackMgr::remove_callbacks_for_fast_commit(const int16_t callback_
// @scopes: the log's callback-list scopes
int ObTransCallbackMgr::remove_callbacks_for_fast_commit(const ObCallbackScopeArray &scopes)
{
// this can handle both NEW (since 4.3) and OLD (before 4.3):
// before 4.3: scopes must be single and came from the main list
const share::SCN stop_scn = is_serial_final_() ? share::SCN::invalid_scn() : serial_sync_scn_;
int ret = OB_SUCCESS;
ARRAY_FOREACH(scopes, i) {
@ -623,11 +574,7 @@ int ObTransCallbackMgr::remove_callback_for_uncommited_txn(const memtable::ObMem
if (OB_ISNULL(memtable_set)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "memtable is null", K(ret));
} else if (need_merge_) { // OLD (before 4.3)
if (OB_FAIL(callback_list_.remove_callbacks_for_remove_memtable(memtable_set, stop_scn))) {
TRANS_LOG(WARN, "fifo remove callback fail", K(ret), KPC(memtable_set));
}
} else { // NEW (since 4.3)
} else {
CALLBACK_LISTS_FOREACH(idx, list) {
if (OB_FAIL(list->remove_callbacks_for_remove_memtable(memtable_set, stop_scn))) {
TRANS_LOG(WARN, "fifo remove callback fail", K(ret), K(idx), KPC(memtable_set));
@ -643,18 +590,12 @@ int ObTransCallbackMgr::remove_callback_for_uncommited_txn(const memtable::ObMem
int ObTransCallbackMgr::clean_unlog_callbacks(int64_t &removed_cnt, common::ObFunction<void()> &before_remove)
{
int ret = OB_SUCCESS;
if (need_merge_) { // OLD (before 4.3)
if (OB_FAIL(callback_list_.clean_unlog_callbacks(removed_cnt, before_remove))) {
TRANS_LOG(WARN, "clean unlog callbacks failed", K(ret));
}
} else { // NEW (since 4.3)
CALLBACK_LISTS_FOREACH(idx, list) {
int64_t rm_cnt = 0;
if (OB_FAIL(list->clean_unlog_callbacks(rm_cnt, before_remove))) {
TRANS_LOG(WARN, "clean unlog callbacks failed", K(ret), K(idx));
} else {
removed_cnt += rm_cnt;
}
CALLBACK_LISTS_FOREACH(idx, list) {
int64_t rm_cnt = 0;
if (OB_FAIL(list->clean_unlog_callbacks(rm_cnt, before_remove))) {
TRANS_LOG(WARN, "clean unlog callbacks failed", K(ret), K(idx));
} else {
removed_cnt += rm_cnt;
}
}
TRANS_LOG(TRACE, "clean callbacks", K(ret), K(removed_cnt));
@ -668,13 +609,13 @@ int ObTransCallbackMgr::calc_checksum_before_scn(const SCN scn,
int ret = OB_SUCCESS;
const share::SCN stop_scn = is_serial_final_() ? share::SCN::max_scn() : serial_sync_scn_;
const bool is_single_callback_list = ATOMIC_LOAD(&callback_lists_) == NULL;
if (need_merge_ || is_single_callback_list) { // OLD (before 4.3) or only single callback_list
if (is_single_callback_list) { // only single callback_list
if (OB_FAIL(callback_list_.tx_calc_checksum_before_scn(stop_scn))) {
TRANS_LOG(WARN, "calc checksum fail", K(ret));
} else {
callback_list_.get_checksum_and_scn(checksum.at(0), checksum_scn.at(0));
}
} else { // new (since 4.3) and multiple callback_list
} else { // multiple callback_list
// reserve space
if (checksum.count() < MAX_CALLBACK_LIST_COUNT) {
if (OB_FAIL(checksum.reserve(MAX_CALLBACK_LIST_COUNT))) {
@ -884,7 +825,7 @@ int ObTransCallbackMgr::get_log_guard(const transaction::ObTxSEQ &write_seq,
int ObTransCallbackMgr::fill_log(ObTxFillRedoCtx &ctx, ObITxFillRedoFunctor &func)
{
int ret = OB_SUCCESS;
if (!ATOMIC_LOAD(&callback_lists_) || need_merge_) {
if (!ATOMIC_LOAD(&callback_lists_)) {
if (OB_UNLIKELY(ctx.list_idx_ > 0)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "list_idx is unexpected", K(ret), K(ctx));
@ -912,17 +853,17 @@ int ObTransCallbackMgr::fill_from_one_list(ObTxFillRedoCtx &ctx,
ObITxFillRedoFunctor &func)
{
int ret = OB_SUCCESS;
FILL_LOG_TRACE("from one list", K(ctx), K(need_merge_));
FILL_LOG_TRACE("from one list", K(ctx));
RDLockGuard guard(rwlock_);
int64_t epoch_from = 0, epoch_to = 0;
if (OB_LIKELY(callback_lists_ == NULL) || need_merge_) {
if (OB_LIKELY(callback_lists_ == NULL)) {
epoch_from = callback_list_.get_log_epoch();
epoch_to = INT64_MAX;
} else {
calc_list_fill_log_epoch_(list_idx, epoch_from, epoch_to);
}
FILL_LOG_TRACE("start fill list", K(list_idx), K(epoch_from), K(epoch_to), K(ctx));
if (!need_merge_ && epoch_from == 0) {
if (epoch_from == 0) {
ret = OB_ITER_END; // can not fill any callback, because of other list has min write epoch
} else if (epoch_from == INT64_MAX) {
ret = OB_SUCCESS; // no callback to fill
@ -1241,7 +1182,6 @@ int ObTransCallbackMgr::update_checksum(const ObIArray<uint64_t> &checksum,
const ObIArray<SCN> &checksum_scn)
{
int ret = OB_SUCCESS;
// extend callback list if need, this can only happened since 4.3
if (checksum.count() > 1) {
OB_ASSERT(checksum.count() == MAX_CALLBACK_LIST_COUNT);
if (OB_ISNULL(callback_lists_) &&
@ -1279,34 +1219,14 @@ int64_t ObTransCallbackMgr::inc_pending_log_size(const int64_t size)
return new_size;
}
void ObTransCallbackMgr::try_merge_multi_callback_lists(const int64_t new_size, const int64_t size, const bool is_logging_blocked)
{
if (OB_UNLIKELY(need_merge_) && !for_replay_) {
int64_t old_size = new_size - size;
if (size < 0 || new_size < 0 || old_size < 0) {
} else if ((0 != GCONF._private_buffer_size
&& old_size < GCONF._private_buffer_size
&& new_size >= GCONF._private_buffer_size)
|| is_logging_blocked) {
// merge the multi callback lists once the immediate logging is satisfied.
merge_multi_callback_lists();
}
}
}
int ObTransCallbackMgr::get_memtable_key_arr(ObMemtableKeyArray &memtable_key_arr)
{
int ret = OB_SUCCESS;
int fail_at = 0;
if (need_merge_) { // OLD (before 4.3)
ret = callback_list_.get_memtable_key_arr_w_timeout(memtable_key_arr);
CALLBACK_LISTS_FOREACH(idx, list) {
fail_at = idx;
ret = list->get_memtable_key_arr_w_timeout(memtable_key_arr);
if (OB_ITER_STOP == ret) { ret = OB_SUCCESS; }
} else { // NEW (since 4.3)
CALLBACK_LISTS_FOREACH(idx, list) {
fail_at = idx;
ret = list->get_memtable_key_arr_w_timeout(memtable_key_arr);
if (OB_ITER_STOP == ret) { ret = OB_SUCCESS; }
}
}
if (OB_FAIL(ret)) {
TRANS_LOG(WARN, "get memtablekey fail", K(ret), K(fail_at), K(memtable_key_arr));
@ -1314,7 +1234,7 @@ int ObTransCallbackMgr::get_memtable_key_arr(ObMemtableKeyArray &memtable_key_ar
return ret;
}
int ObTransCallbackMgr::acquire_callback_list(const bool new_epoch, const bool need_merge)
int ObTransCallbackMgr::acquire_callback_list(const bool new_epoch)
{
int64_t stat = ATOMIC_LOAD(&parallel_stat_);
int64_t tid = get_itid() + 1;
@ -1328,11 +1248,6 @@ int ObTransCallbackMgr::acquire_callback_list(const bool new_epoch, const bool n
//
ATOMIC_STORE(&parallel_stat_, PARALLEL_STMT);
}
// mark callback_list need merge into main
// this is compatible with version before 4.3
if (ATOMIC_LOAD(&need_merge_) != need_merge) {
ATOMIC_STORE(&need_merge_, need_merge);
}
int slot = 0;
// inc write_epoch
// for each write epoch the first thread always stay in slot 0
@ -1353,7 +1268,6 @@ int ObTransCallbackMgr::acquire_callback_list(const bool new_epoch, const bool n
void ObTransCallbackMgr::revert_callback_list()
{
int64_t stat = ATOMIC_LOAD(&parallel_stat_);
bool need_merge = ATOMIC_LOAD(&need_merge_);
const int64_t tid = get_itid() + 1;
const int slot = tid % MAX_CALLBACK_LIST_COUNT;
// if no parallel til now, all callbacks in main list, no need merge
@ -1363,15 +1277,6 @@ void ObTransCallbackMgr::revert_callback_list()
} else {
UNUSED(ATOMIC_BCAS(&parallel_stat_, stat, stat - 1));
}
need_merge = false;
}
// compatible with before version 4.3, merge to main list
if (need_merge) {
WRLockGuard guard(rwlock_);
if (OB_NOT_NULL(callback_lists_)) {
int64_t cnt = callback_list_.concat_callbacks(callback_lists_[slot]);
add_slave_list_merge_cnt(cnt);
}
}
}
@ -1445,21 +1350,11 @@ int ObTransCallbackMgr::trans_end(const bool commit)
if (!commit) {
set_skip_checksum_calc();
}
if (OB_UNLIKELY(ATOMIC_LOAD(&need_merge_))) { // OLD (before 4.3)
// If the txn ends abnormally, there may still be tasks in execution. Our
// solution is that before the txn resets, all callback_lists need be
// cleaned up after blocking new writes (through end_code). So if PDML
// exists and some data is cached in callback_lists, we need merge them into
// main callback_list
merge_multi_callback_lists();
if (OB_LIKELY(ATOMIC_LOAD(&callback_lists_) == NULL)) {
ret = commit ? callback_list_.tx_commit() : callback_list_.tx_abort();
} else { // New (since 4.3)
if (OB_LIKELY(ATOMIC_LOAD(&callback_lists_) == NULL)) {
ret = commit ? callback_list_.tx_commit() : callback_list_.tx_abort();
} else {
CALLBACK_LISTS_FOREACH(idx, list) {
ret = commit ? list->tx_commit() : list->tx_abort();
}
} else {
CALLBACK_LISTS_FOREACH(idx, list) {
ret = commit ? list->tx_commit() : list->tx_abort();
}
}
if (OB_SUCC(ret)) {
@ -1472,7 +1367,7 @@ int ObTransCallbackMgr::calc_checksum_all(ObIArray<uint64_t> &checksum)
{
RDLockGuard guard(rwlock_);
int ret = OB_SUCCESS;
if (OB_UNLIKELY(ATOMIC_LOAD(&need_merge_)) || OB_LIKELY(callback_lists_ == NULL)) {
if (OB_LIKELY(callback_lists_ == NULL)) {
callback_list_.tx_calc_checksum_all();
ret = checksum.push_back(callback_list_.get_checksum());
} else {
@ -1487,14 +1382,10 @@ int ObTransCallbackMgr::calc_checksum_all(ObIArray<uint64_t> &checksum)
void ObTransCallbackMgr::print_callbacks()
{
RDLockGuard guard(rwlock_);
if (need_merge_) {
callback_list_.tx_print_callback();
} else {
int ret = OB_SUCCESS;
CALLBACK_LISTS_FOREACH(idx, list) {
_TRANS_LOG(INFO, "print callback at CallbackList[%d]:", idx);
list->tx_print_callback();
}
int ret = OB_SUCCESS;
CALLBACK_LISTS_FOREACH(idx, list) {
_TRANS_LOG(INFO, "print callback at CallbackList[%d]:", idx);
list->tx_print_callback();
}
}
@ -1503,11 +1394,7 @@ int ObTransCallbackMgr::get_callback_list_stat(ObIArray<ObTxCallbackListStat> &s
RDLockGuard guard(rwlock_);
int ret = OB_SUCCESS;
if (rwlock_.try_rdlock()) {
if (need_merge_) {
if (OB_SUCC(stats.prepare_allocate(1))) {
ret = callback_list_.get_stat_for_display(stats.at(0));
}
} else if (OB_SUCC(stats.prepare_allocate(get_callback_list_count()))) {
if (OB_SUCC(stats.prepare_allocate(get_callback_list_count()))) {
CALLBACK_LISTS_FOREACH(idx, list) {
if (list->get_appended() > 0) {
ret = list->get_stat_for_display(stats.at(idx));
@ -1523,13 +1410,9 @@ int ObTransCallbackMgr::get_callback_list_stat(ObIArray<ObTxCallbackListStat> &s
void ObTransCallbackMgr::elr_trans_preparing()
{
if (ATOMIC_LOAD(&need_merge_)) {
callback_list_.tx_elr_preparing();
} else {
int ret = OB_SUCCESS;
CALLBACK_LISTS_FOREACH(idx, list) {
list->tx_elr_preparing();
}
int ret = OB_SUCCESS;
CALLBACK_LISTS_FOREACH(idx, list) {
list->tx_elr_preparing();
}
}
@ -2163,46 +2046,33 @@ void ObTransCallbackMgr::print_statistics(char *buf, const int64_t buf_len, int6
{
common::databuff_printf(buf, buf_len, pos,
"callback_list:{"
"cnt=%d need_merge=%d "
"stat:[",
get_callback_list_count(),
need_merge_);
if (need_merge_) {
common::databuff_printf(buf, buf_len, pos,
"main=%ld, slave=%ld, merge=%ld, ",
get_callback_main_list_append_count(),
get_callback_slave_list_append_count(),
get_callback_slave_list_merge_count());
}
common::databuff_printf(buf, buf_len, pos,
"cnt=%d stat:["
"tx_end=%ld, rollback_to=%ld, "
"fast_commit=%ld, remove_memtable=%ld, "
"ext_info_log=%ld]",
"ext_info_log=%ld] "
"detail:[(log_epoch,length,logged,synced,appended,removed,unlog_removed,branch_removed)|",
get_callback_list_count(),
get_callback_remove_for_trans_end_count(),
get_callback_remove_for_rollback_to_count(),
get_callback_remove_for_fast_commit_count(),
get_callback_remove_for_remove_memtable_count(),
get_callback_ext_info_log_count());
if (!need_merge_) {
common::databuff_printf(buf, buf_len, pos,
" detail:[(log_epoch,length,logged,synced,appended,removed,unlog_removed,branch_removed)|");
int ret = OB_SUCCESS;
CALLBACK_LISTS_FOREACH_CONST(idx, list) {
int64_t a = list->get_length(),
b = list->get_logged(),
c = list->get_synced(),
d = list->get_appended(),
e = list->get_removed(),
f = list->get_unlog_removed(),
g = list->get_branch_removed();
if (a || b || c || d || e || f || g) {
int64_t log_epoch = list->get_log_epoch();
log_epoch = log_epoch == INT64_MAX ? -1 : log_epoch;
common::databuff_printf(buf, buf_len, pos, "%d:(%ld,%ld,%ld,%ld,%ld,%ld,%ld,%ld)|", idx, log_epoch, a, b, c, d, e, f, g);
}
int ret = OB_SUCCESS;
CALLBACK_LISTS_FOREACH_CONST(idx, list) {
int64_t a = list->get_length(),
b = list->get_logged(),
c = list->get_synced(),
d = list->get_appended(),
e = list->get_removed(),
f = list->get_unlog_removed(),
g = list->get_branch_removed();
if (a || b || c || d || e || f || g) {
int64_t log_epoch = list->get_log_epoch();
log_epoch = log_epoch == INT64_MAX ? -1 : log_epoch;
common::databuff_printf(buf, buf_len, pos, "%d:(%ld,%ld,%ld,%ld,%ld,%ld,%ld,%ld)|", idx, log_epoch, a, b, c, d, e, f, g);
}
common::databuff_printf(buf, buf_len, pos, "]}");
}
common::databuff_printf(buf, buf_len, pos, "]}");
}
bool ObTransCallbackMgr::find(ObITxCallbackFinder &func)
@ -2252,7 +2122,7 @@ void ObTransCallbackMgr::check_all_redo_flushed()
__attribute__((noinline))
int ObTransCallbackMgr::get_logging_list_count() const
{
return (!need_merge_ && callback_lists_) ? MAX_CALLBACK_LIST_COUNT : 1;
return callback_lists_ ? MAX_CALLBACK_LIST_COUNT : 1;
}
bool ObTransCallbackMgr::pending_log_size_too_large(const transaction::ObTxSEQ &write_seq_no,

View File

@ -205,15 +205,12 @@ public:
parallel_stat_(0),
write_epoch_(0),
write_epoch_start_tid_(0),
need_merge_(false),
for_replay_(false),
has_branch_replayed_into_first_list_(false),
serial_final_scn_(share::SCN::max_scn()),
serial_final_seq_no_(),
serial_sync_scn_(share::SCN::min_scn()),
callback_main_list_append_count_(0),
callback_slave_list_append_count_(0),
callback_slave_list_merge_count_(0),
callback_remove_for_trans_end_count_(0),
callback_remove_for_remove_memtable_count_(0),
callback_remove_for_fast_commit_count_(0),
@ -253,7 +250,7 @@ public:
int remove_callbacks_for_fast_commit(const ObCallbackScopeArray &callbacks_arr);
int remove_callback_for_uncommited_txn(const memtable::ObMemtableSet *memtable_set);
int get_memtable_key_arr(transaction::ObMemtableKeyArray &memtable_key_arr);
int acquire_callback_list(const bool new_epoch, const bool need_merge);
int acquire_callback_list(const bool new_epoch);
void revert_callback_list();
int get_tx_seq_replay_idx(const transaction::ObTxSEQ seq) const;
common::SpinRWLock& get_rwlock() { return rwlock_; }
@ -275,7 +272,6 @@ public:
int clean_unlog_callbacks(int64_t &removed_cnt, common::ObFunction<void()> &before_remove);
// when not inc, return -1
int64_t inc_pending_log_size(const int64_t size);
void try_merge_multi_callback_lists(const int64_t new_size, const int64_t size, const bool is_logging_blocked);
void inc_flushed_log_size(const int64_t size) {
if (!serial_final_scn_.is_valid()) {
UNUSED(ATOMIC_FAA(&flushed_log_size_, size));
@ -292,17 +288,12 @@ public:
const transaction::ObTxSEQ serial_final_seq_no);
void set_skip_checksum_calc();
bool skip_checksum_calc() const { return ATOMIC_LOAD(&skip_checksum_); }
void merge_multi_callback_lists();
void reset_pdml_stat();
bool find(ObITxCallbackFinder &func);
uint64_t get_main_list_length() const
{ return callback_list_.get_length(); }
int64_t get_callback_main_list_append_count() const
{ return callback_main_list_append_count_; }
int64_t get_callback_slave_list_append_count() const
{ return callback_slave_list_append_count_; }
int64_t get_callback_slave_list_merge_count() const
{ return callback_slave_list_merge_count_; }
int64_t get_callback_remove_for_trans_end_count() const
{ return callback_remove_for_trans_end_count_; }
int64_t get_callback_remove_for_remove_memtable_count() const
@ -315,10 +306,6 @@ public:
{ return callback_ext_info_log_count_; }
void add_main_list_append_cnt(int64_t cnt = 1)
{ ATOMIC_AAF(&callback_main_list_append_count_, cnt); }
void add_slave_list_append_cnt(int64_t cnt = 1)
{ ATOMIC_AAF(&callback_slave_list_append_count_, cnt); }
void add_slave_list_merge_cnt(int64_t cnt = 1)
{ ATOMIC_AAF(&callback_slave_list_merge_count_, cnt); }
void add_tx_end_callback_remove_cnt(int64_t cnt = 1)
{ ATOMIC_AAF(&callback_remove_for_trans_end_count_, cnt); }
void add_release_memtable_callback_remove_cnt(int64_t cnt = 1)
@ -330,11 +317,9 @@ public:
void add_callback_ext_info_log_count(int64_t cnt = 1)
{ ATOMIC_AAF(&callback_ext_info_log_count_, cnt); }
int get_callback_list_count() const
{ return callback_lists_ ? (MAX_CALLBACK_LIST_COUNT + (need_merge_ ? 1 : 0)) : 1; }
{ return callback_lists_ ? MAX_CALLBACK_LIST_COUNT : 1; }
int get_logging_list_count() const;
ObTxCallbackList *get_callback_list_(const int16_t index, const bool nullable);
int is_callback_list_need_merge() const
{ return need_merge_; }
bool is_serial_final() const { return is_serial_final_(); }
bool is_callback_list_append_only(const int idx) const
{
@ -347,13 +332,11 @@ public:
K_(serial_final_seq_no),
K_(serial_sync_scn),
KP_(callback_lists),
K_(need_merge),
K_(pending_log_size),
K_(flushed_log_size),
K_(for_replay),
K_(parallel_stat));
private:
void force_merge_multi_callback_lists();
void update_serial_sync_scn_(const share::SCN scn);
bool is_serial_final_() const
{
@ -404,13 +387,6 @@ private:
// the first thread is always assigned to first callback-list
int64_t write_epoch_start_tid_;
RLOCAL_STATIC(bool, parallel_replay_);
// since 4.3, support multi-callback-list logging, extended callback-list(s)
// won't do merge, use this to indicate the OLD version.
// it was set in write path.
// NOTE: this value is default set to false is required
// On Follower, in the OLD version, data is replay into single CallbackList
// On Leader, if no write after takeover, merge is also not required
bool need_merge_;
bool for_replay_;
// used to mark that some branch callback replayed in first callback list
// actually, by default they were replayed into its own callback list by
@ -429,8 +405,6 @@ private:
share::SCN serial_sync_scn_;
// statistics for callback remove
int64_t callback_main_list_append_count_;
int64_t callback_slave_list_append_count_;
int64_t callback_slave_list_merge_count_;
int64_t callback_remove_for_trans_end_count_;
int64_t callback_remove_for_remove_memtable_count_;
int64_t callback_remove_for_fast_commit_count_;

View File

@ -767,7 +767,7 @@ void ObTxCallbackList::update_checksum(const uint64_t checksum, const SCN checks
LockGuard guard(*this, LOCK_MODE::LOCK_ITERATE);
if (checksum_scn.is_max()) {
if (checksum == 0 && id_ > 0) {
// only check extends list, because version before 4.3 with 0 may happen
// only check extends list, because version before 4.2.4 with 0 may happen
// and they will be replayed into first list (id_ equals to 0)
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "checksum should not be 0 if checksum_scn is max", KPC(this));
}

View File

@ -640,7 +640,6 @@ int ObMemtableCtx::commit_to_replay()
ATOMIC_STORE(&is_master_, false);
WRLockGuard wrguard(rwlock_);
trans_mgr_.set_for_replay(true);
trans_mgr_.merge_multi_callback_lists();
return OB_SUCCESS;
}
@ -969,18 +968,6 @@ bool ObMemtableCtx::pending_log_size_too_large(const ObTxSEQ &write_seq_no)
return ret;
}
// NB: We also donot hold the memtable context latch because it changes only
// callback list and multi callback lists which is protected by itself
void ObMemtableCtx::merge_multi_callback_lists_for_changing_leader()
{
trans_mgr_.merge_multi_callback_lists();
}
void ObMemtableCtx::merge_multi_callback_lists_for_immediate_logging()
{
trans_mgr_.merge_multi_callback_lists();
}
int ObMemtableCtx::get_table_lock_store_info(ObTableLockInfo &table_lock_info)
{
int ret = OB_SUCCESS;

View File

@ -424,8 +424,6 @@ public:
void add_lock_for_read_elapse(const int64_t elapse) { lock_for_read_elapse_ += elapse; }
int64_t get_lock_for_read_elapse() const { return lock_for_read_elapse_; }
bool pending_log_size_too_large(const transaction::ObTxSEQ &write_seq_no);
void merge_multi_callback_lists_for_changing_leader();
void merge_multi_callback_lists_for_immediate_logging();
void reset_pdml_stat();
int clean_unlog_callbacks();
int check_tx_mem_size_overflow(bool &is_overflow);
@ -465,8 +463,8 @@ public: // callback
int append_callback(ObITransCallback *cb) { return trans_mgr_.append(cb); }
int64_t get_pending_log_size() { return trans_mgr_.get_pending_log_size(); }
int64_t get_flushed_log_size() { return trans_mgr_.get_flushed_log_size(); }
int acquire_callback_list(const bool new_epoch, const bool need_merge)
{ return trans_mgr_.acquire_callback_list(new_epoch, need_merge); }
int acquire_callback_list(const bool new_epoch)
{ return trans_mgr_.acquire_callback_list(new_epoch); }
void revert_callback_list() { trans_mgr_.revert_callback_list(); }
void set_for_replay(const bool for_replay) { trans_mgr_.set_for_replay(for_replay); }
void inc_pending_log_size(const int64_t size) { trans_mgr_.inc_pending_log_size(size); }

View File

@ -181,7 +181,7 @@ int ObMemtableRowCompactor::try_cleanout_tx_node_during_compact_(ObTxTableGuard
// cleanout and filled back through commit callback. So we add the error
// log back
//
// since 4.3 multiple callback list will not merged, in commiting phase
// since 4.2.4 multiple callback list will not merged, in commiting phase
// if two TransNode of same row stay in different callback list, their
// fill back order is undefined, and this situation can happened
// ret = OB_ERR_UNEXPECTED;

View File

@ -5250,7 +5250,7 @@ void ObPartTransCtx::check_no_need_replay_checksum(const SCN &log_ts, const int
}
/*
* since 4.3, redo_lsns can not be maintained on follower with order
* since 4.2.4, redo_lsns can not be maintained on follower with order
* because the redo were replay parallelly, instead, redo_lsns only
* maintained on leader and when switch to follower, it will persistent
* redo_lsns with `RecordLog`
@ -5273,7 +5273,7 @@ int ObPartTransCtx::check_and_merge_redo_lsns_(const palf::LSN &offset)
/*
* replay redo in tx ctx
*
* since 4.3, support parallel replay redo, and the design principle is
* since 4.2.4, support parallel replay redo, and the design principle is
* seperate redo and other logs(named as Txn's Log), redo is belongs to
* memtable (and locktable), and only Txn's Log will replay into Tx ctx
* and affect the Tx ctx's state
@ -5293,10 +5293,10 @@ int ObPartTransCtx::replay_redo_in_ctx(const ObTxRedoLog &redo_log,
common::ObTimeGuard timeguard("replay_redo_in_ctx", 10 * 1000);
{
CtxLockGuard guard(lock_);
// before 4.3, cluster_version is in RedoLog, and
// before 4.2.4, cluster_version is in RedoLog, and
// the cluster_version_ is initialized to CURRENT_CLUSTER_VERSION when ctx created
// it should be correct with in redo
// since 4.3, cluster_version is in LogBlockHeader, so the cluster_version is correct
// since 4.2.4, cluster_version is in LogBlockHeader, so the cluster_version is correct
// when created, and cluster_version in RedoLog is 0
if (redo_log.get_cluster_version() > 0) {
ret = correct_cluster_version_(redo_log.get_cluster_version());
@ -5515,7 +5515,7 @@ int ObPartTransCtx::replay_rollback_to(const ObTxRollbackToLog &log,
KPC(this));
}
// this is compatible code, since 4.3, redo_lsn not collect during replay
// this is compatible code, since 4.2.4, redo_lsn not collect during replay
if (OB_SUCC(ret) && OB_FAIL(check_and_merge_redo_lsns_(offset))) {
TRANS_LOG(WARN, "check and merge redo lsns failed", K(ret), K(trans_id_), K(timestamp), K(offset));
}
@ -6516,8 +6516,6 @@ int ObPartTransCtx::switch_to_follower_forcedly(ObTxCommitCallback *&cb_list_hea
// do nothing
} else if (OB_FALSE_IT(mt_ctx_.commit_to_replay())) {
// do nothing
} else if (OB_FALSE_IT(mt_ctx_.merge_multi_callback_lists_for_changing_leader())) {
// do nothing
} else if (OB_FAIL(mt_ctx_.clean_unlog_callbacks())) {
TRANS_LOG(WARN, "clear unlog callbacks", KR(ret), K(*this));
}
@ -6633,8 +6631,6 @@ int ObPartTransCtx::switch_to_follower_gracefully(ObTxCommitCallback *&cb_list_h
}
timeguard.click();
if (OB_SUCC(ret) && need_submit_log && !need_force_abort_()) {
// We need merge all callbacklists before submitting active info
(void)mt_ctx_.merge_multi_callback_lists_for_changing_leader();
if (ObTxLogType::TX_COMMIT_INFO_LOG == log_type) {
if (OB_FAIL(submit_redo_commit_info_log_())) {
// currently, if there is no log callback, switch leader would fail,
@ -8440,23 +8436,23 @@ int ObPartTransCtx::start_access(const ObTxDesc &tx_desc, ObTxSEQ &data_scn, con
// others must wait the first thread of parallel open the write epoch
// hence this must be done in lock
if (data_scn.support_branch() && pending_write == 1) {
callback_list_idx = mt_ctx_.acquire_callback_list(true, false);
callback_list_idx = mt_ctx_.acquire_callback_list(true);
}
}
}
// other operations are allowed to out of lock
if (OB_SUCC(ret)) {
mt_ctx_.inc_ref();
if (data_scn.support_branch()) { // NEW version >= 4.3
if (data_scn.support_branch()) {
if (pending_write != 1) {
callback_list_idx = mt_ctx_.acquire_callback_list(false, false);
callback_list_idx = mt_ctx_.acquire_callback_list(false);
}
// remember selected callback_list idx into seq_no
if (data_scn.get_branch() == 0 && alloc && callback_list_idx != 0) {
data_scn.set_branch(callback_list_idx);
}
} else { // OLD version < 4.3
mt_ctx_.acquire_callback_list(false, true /* need merge to main */);
} else { // OLD version < 4.2.4
ret = OB_ERR_UNEXPECTED;
}
}
last_request_ts_ = ObClockGenerator::getClock();
@ -10449,7 +10445,7 @@ int ObPartTransCtx::set_replay_completeness_(const bool complete, const SCN repl
inline bool ObPartTransCtx::is_support_parallel_replay_() const
{
return cluster_version_accurate_ && cluster_version_ >= CLUSTER_VERSION_4_3_0_0;
return cluster_version_accurate_ && cluster_version_ >= MOCK_CLUSTER_VERSION_4_2_4_0;
}
inline bool ObPartTransCtx::is_support_tx_op_() const

View File

@ -77,9 +77,11 @@ inline int ObTransService::init_tx_(ObTxDesc &tx,
// cluster_version is invalid, need to get it
if (0 == cluster_version && OB_FAIL(GET_MIN_DATA_VERSION(tenant_id_, tx.cluster_version_))) {
TRANS_LOG(WARN, "get min data version fail", K(ret), K(tx));
} else if (tx.cluster_version_ >= DATA_VERSION_4_3_0_0) {
tx.seq_base_ = common::ObSequence::get_max_seq_no() - 1;
} else if (tx.cluster_version_ < MOCK_DATA_VERSION_4_2_4_0) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "data version should >= 4_2_4_0", K(ret), K(tx));
}
tx.seq_base_ = common::ObSequence::get_max_seq_no() - 1;
return ret;
}

View File

@ -196,8 +196,8 @@ int ObCtxRedoInfo::before_serialize()
if (OB_FAIL(compat_bytes_.set_all_member_need_ser())) {
TRANS_LOG(WARN, "reset all compat_bytes_ valid failed", K(ret));
} else {
// skip serialize cluster_version, since 4.3, cluster_version put in LogBlockHeader
TX_NO_NEED_SER(cluster_version_ >= DATA_VERSION_4_3_0_0, 1, compat_bytes_);
// skip serialize cluster_version, since 4.2.4, cluster_version put in LogBlockHeader
TX_NO_NEED_SER(true, 1, compat_bytes_);
}
} else {
if (OB_FAIL(compat_bytes_.init(1))) {
@ -383,7 +383,7 @@ int ObTxActiveInfoLog::before_serialize()
TX_NO_NEED_SER(last_op_sn_ == 0, 13, compat_bytes_);
TX_NO_NEED_SER(!first_seq_no_.is_valid(), 14, compat_bytes_);
TX_NO_NEED_SER(!last_seq_no_.is_valid(), 15, compat_bytes_);
TX_NO_NEED_SER((cluster_version_ == 0 || cluster_version_ >= DATA_VERSION_4_3_0_0), 16, compat_bytes_);
TX_NO_NEED_SER(true, 16, compat_bytes_);
TX_NO_NEED_SER(!max_submitted_seq_no_.is_valid(), 17, compat_bytes_);
TX_NO_NEED_SER(xid_.empty(), 18, compat_bytes_);
TX_NO_NEED_SER(!serial_final_seq_no_.is_valid(), 19, compat_bytes_);
@ -414,7 +414,7 @@ int ObTxCommitInfoLog::before_serialize()
TX_NO_NEED_SER(is_dup_tx_ == false, 5, compat_bytes_);
TX_NO_NEED_SER(can_elr_ == false, 6, compat_bytes_);
TX_NO_NEED_SER(incremental_participants_.empty(), 7, compat_bytes_);
TX_NO_NEED_SER((cluster_version_ == 0 || cluster_version_ >= DATA_VERSION_4_3_0_0), 8, compat_bytes_);
TX_NO_NEED_SER(true, 8, compat_bytes_);
TX_NO_NEED_SER(app_trace_id_str_.empty(), 9, compat_bytes_);
TX_NO_NEED_SER(app_trace_info_.empty(), 10, compat_bytes_);
TX_NO_NEED_SER(prev_record_lsn_.is_valid() == false, 11, compat_bytes_);
@ -1294,9 +1294,7 @@ int ObTxLogBlockHeader::before_serialize()
if (cluster_version_ == 0) {
ob_abort();
}
if (cluster_version_ >= DATA_VERSION_4_3_0_0) {
TX_NO_NEED_SER(true, 2, compat_bytes_);
}
TX_NO_NEED_SER(true, 2, compat_bytes_);
if (serialize_size_ == 0) {
calc_serialize_size_();
}

View File

@ -286,7 +286,7 @@ int ObTxReplayExecutor::try_get_tx_ctx_()
ret = OB_SUCCESS;
bool tx_ctx_existed = false;
common::ObAddr scheduler = log_block_.get_header().get_scheduler();
// since 4.3, cluster version in log_block_header
// since 4.2.4, cluster version in log_block_header
const uint64_t cluster_version = log_block_.get_header().get_cluster_version();
ObTxCreateArg arg(true, /* for_replay */
PartCtxSource::REPLAY,

View File

@ -156,7 +156,6 @@ public:
TEST_F(ObTestRedoFill, serial_single_list_fill_all_BLOCK_FROZEN)
{
set_parallel_logging(true);
callback_mgr_.need_merge_ = false;
// single list
callback_mgr_.callback_lists_ = NULL;
EXPECT_CALL(mdo_, get_logging_list_count()).Times(AtLeast(1)).WillRepeatedly(Return(1));
@ -182,7 +181,6 @@ TEST_F(ObTestRedoFill, serial_single_list_fill_all_BLOCK_FROZEN)
TEST_F(ObTestRedoFill, serial_multi_list_fill_all_ALL_FILLED)
{
set_parallel_logging(false);
callback_mgr_.need_merge_ = false;
// 4 list
extend_callback_lists_(3);
EXPECT_CALL(mdo_, get_logging_list_count()).Times(AtLeast(1)).WillRepeatedly(Return(4));
@ -278,7 +276,6 @@ TEST_F(ObTestRedoFill, serial_multi_list_fill_all_ALL_FILLED)
TEST_F(ObTestRedoFill, serial_multi_list_fill_all_BLOCK_FROZEN)
{
set_parallel_logging(false);
callback_mgr_.need_merge_ = false;
// 4 list
extend_callback_lists_(3);
EXPECT_CALL(mdo_, get_logging_list_count()).Times(AtLeast(1)).WillRepeatedly(Return(4));
@ -327,7 +324,6 @@ TEST_F(ObTestRedoFill, serial_multi_list_fill_all_BLOCK_FROZEN)
TEST_F(ObTestRedoFill, serial_single_list_fill_all_BUF_NOT_ENOUGH)
{
set_parallel_logging(false);
callback_mgr_.need_merge_ = false;
// single list
callback_mgr_.callback_lists_ = NULL;
EXPECT_CALL(mdo_, get_logging_list_count()).Times(AtLeast(1)).WillRepeatedly(Return(1));
@ -354,7 +350,6 @@ TEST_F(ObTestRedoFill, serial_single_list_fill_all_BUF_NOT_ENOUGH)
TEST_F(ObTestRedoFill, serial_single_list_fill_all_list_BIG_ROW)
{
set_parallel_logging(false);
callback_mgr_.need_merge_ = false;
// single list
callback_mgr_.callback_lists_ = NULL;
EXPECT_CALL(mdo_, get_logging_list_count()).Times(AtLeast(1)).WillRepeatedly(Return(1));
@ -381,7 +376,6 @@ TEST_F(ObTestRedoFill, serial_single_list_fill_all_list_BIG_ROW)
TEST_F(ObTestRedoFill, serial_multi_list_fill_all_list_BIG_ROW)
{
set_parallel_logging(false);
callback_mgr_.need_merge_ = false;
// 4 list
extend_callback_lists_(3);
EXPECT_CALL(mdo_, get_logging_list_count()).Times(AtLeast(1)).WillRepeatedly(Return(4));
@ -410,7 +404,6 @@ TEST_F(ObTestRedoFill, serial_multi_list_fill_all_list_BIG_ROW)
TEST_F(ObTestRedoFill, serial_multi_list_fill_all_OTHER_ERROR_WHEN_FILL_OTHERS)
{
set_parallel_logging(false);
callback_mgr_.need_merge_ = false;
// 4 list
extend_callback_lists_(3);
EXPECT_CALL(mdo_, get_logging_list_count()).Times(AtLeast(1)).WillRepeatedly(Return(4));
@ -458,7 +451,6 @@ TEST_F(ObTestRedoFill, serial_multi_list_fill_all_OTHER_ERROR_WHEN_FILL_OTHERS)
TEST_F(ObTestRedoFill, serial_logging_fill_from_all_list_ALL_OTHERS_BLOCK_FROZEN_EMPTY)
{
set_parallel_logging(false);
callback_mgr_.need_merge_ = false;
// 4 list
extend_callback_lists_(3);
EXPECT_CALL(mdo_, get_logging_list_count()).Times(AtLeast(1)).WillRepeatedly(Return(4));
@ -500,7 +492,6 @@ TEST_F(ObTestRedoFill, serial_logging_fill_from_all_list_ALL_OTHERS_BLOCK_FROZEN
TEST_F(ObTestRedoFill, serial_logging_fill_from_all_list_FIRST_ITER_END_OTHERS_BLOCK_FROZEN_EMPTY)
{
set_parallel_logging(false);
callback_mgr_.need_merge_ = false;
// 4 list
extend_callback_lists_(3);
EXPECT_CALL(mdo_, get_logging_list_count()).Times(AtLeast(1)).WillRepeatedly(Return(4));
@ -545,7 +536,6 @@ TEST_F(ObTestRedoFill, serial_logging_fill_from_all_list_FIRST_ITER_END_OTHERS_B
TEST_F(ObTestRedoFill, parallel_multi_list_fill_all_BLOCK_FROZEN)
{
set_parallel_logging(true);
callback_mgr_.need_merge_ = false;
// 4 list
extend_callback_lists_(3);
EXPECT_CALL(mdo_, get_logging_list_count()).Times(AtLeast(1)).WillRepeatedly(Return(4));
@ -574,7 +564,6 @@ TEST_F(ObTestRedoFill, parallel_multi_list_fill_all_BLOCK_FROZEN)
TEST_F(ObTestRedoFill, parallel_multi_list_fill_all_BLOCK_FROZEN_EMPTY)
{
set_parallel_logging(true);
callback_mgr_.need_merge_ = false;
// 4 list
extend_callback_lists_(3);
EXPECT_CALL(mdo_, get_logging_list_count()).Times(AtLeast(1)).WillRepeatedly(Return(4));
@ -603,7 +592,6 @@ TEST_F(ObTestRedoFill, parallel_multi_list_fill_all_BLOCK_FROZEN_EMPTY)
TEST_F(ObTestRedoFill, parallel_multi_list_fill_all_BLOCK_FROZEN_FILL_FROM_OTHERS)
{
set_parallel_logging(true);
callback_mgr_.need_merge_ = false;
// 4 list
extend_callback_lists_(3);
EXPECT_CALL(mdo_, get_logging_list_count()).Times(AtLeast(1)).WillRepeatedly(Return(4));
@ -639,7 +627,6 @@ TEST_F(ObTestRedoFill, parallel_multi_list_fill_all_BLOCK_FROZEN_FILL_FROM_OTHER
TEST_F(ObTestRedoFill, parallel_multi_list_fill_all_BLOCK_BY_OTHERS)
{
set_parallel_logging(true);
callback_mgr_.need_merge_ = false;
// 4 list
extend_callback_lists_(3);
EXPECT_CALL(mdo_, get_logging_list_count()).Times(AtLeast(1)).WillRepeatedly(Return(4));
@ -669,7 +656,6 @@ TEST_F(ObTestRedoFill, parallel_multi_list_fill_all_BLOCK_BY_OTHERS)
TEST_F(ObTestRedoFill, parallel_logging_fill_from_all_list_ALL_FILLED_OTHERS_REMAIN)
{
set_parallel_logging(true);
callback_mgr_.need_merge_ = false;
// 4 list
extend_callback_lists_(3);
EXPECT_CALL(mdo_, get_logging_list_count()).Times(AtLeast(1)).WillRepeatedly(Return(4));
@ -698,7 +684,6 @@ TEST_F(ObTestRedoFill, parallel_logging_fill_from_all_list_ALL_FILLED_OTHERS_REM
TEST_F(ObTestRedoFill, parallel_logging_fill_from_one_list_OTHERS_IS_EMPTY)
{
set_parallel_logging(true);
callback_mgr_.need_merge_ = false;
// 4 list
extend_callback_lists_(3);
EXPECT_CALL(mdo_, get_logging_list_count()).Times(AtLeast(1)).WillRepeatedly(Return(4));
@ -729,7 +714,6 @@ TEST_F(ObTestRedoFill, parallel_logging_fill_from_one_list_OTHERS_IS_EMPTY)
TEST_F(ObTestRedoFill, serial_logging_fill_from_one_list_OTHERS_IS_EMPTY)
{
set_parallel_logging(false);
callback_mgr_.need_merge_ = false;
// 4 list
extend_callback_lists_(3);
EXPECT_CALL(mdo_, get_logging_list_count()).Times(AtLeast(1)).WillRepeatedly(Return(4));