[OBCDC] fix not set read_first_record while resolving misslog
This commit is contained in:

committed by
wangzelin.wzl

parent
0036ef55b1
commit
12390ca2da
@ -512,7 +512,9 @@ int ObCDCPartTransResolver::handle_record_(
|
|||||||
if (OB_FAIL(missing_info.push_back_missing_log_lsn_arr(prev_redo_lsns))) {
|
if (OB_FAIL(missing_info.push_back_missing_log_lsn_arr(prev_redo_lsns))) {
|
||||||
LOG_ERROR("push_back_missing_log_lsn_arr failed", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(record_log), K(is_resolving_miss_log),
|
LOG_ERROR("push_back_missing_log_lsn_arr failed", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(record_log), K(is_resolving_miss_log),
|
||||||
K(missing_info), KPC(part_trans_task));
|
K(missing_info), KPC(part_trans_task));
|
||||||
} else if (! is_first_record && OB_FAIL(missing_info.set_miss_record_log_lsn(prev_record_lsn))) {
|
} else if (is_first_record) {
|
||||||
|
part_trans_task->mark_read_first_record();
|
||||||
|
} else if (OB_FAIL(missing_info.set_miss_record_log_lsn(prev_record_lsn))) {
|
||||||
LOG_ERROR("push prev_record_lsn into missing_info failed", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(record_log),
|
LOG_ERROR("push prev_record_lsn into missing_info failed", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(record_log),
|
||||||
K(is_first_record), K(missing_info), KPC(part_trans_task));
|
K(is_first_record), K(missing_info), KPC(part_trans_task));
|
||||||
} else {
|
} else {
|
||||||
|
@ -63,6 +63,7 @@ public:
|
|||||||
block_builder_(tx_id, cluster_id),
|
block_builder_(tx_id, cluster_id),
|
||||||
lsn_arr_(),
|
lsn_arr_(),
|
||||||
last_record_lsn_(),
|
last_record_lsn_(),
|
||||||
|
last_normal_lsn_(),
|
||||||
trans_type_(transaction::TransType::SP_TRANS),
|
trans_type_(transaction::TransType::SP_TRANS),
|
||||||
addr_(),
|
addr_(),
|
||||||
cluster_version_(1),
|
cluster_version_(1),
|
||||||
@ -73,6 +74,7 @@ public:
|
|||||||
epoch_(1024),
|
epoch_(1024),
|
||||||
last_op_scn_(2048),
|
last_op_scn_(2048),
|
||||||
checksum_(29890209),
|
checksum_(29890209),
|
||||||
|
has_record_log_(false),
|
||||||
allocator_()
|
allocator_()
|
||||||
{
|
{
|
||||||
addr_.set_ip_addr("127.0.0.1", 2881);
|
addr_.set_ip_addr("127.0.0.1", 2881);
|
||||||
@ -115,8 +117,14 @@ public:
|
|||||||
LOG_ERROR("lsn is not valid", KR(ret));
|
LOG_ERROR("lsn is not valid", KR(ret));
|
||||||
} else if (OB_FAIL(block_builder_.next_log_block())) {
|
} else if (OB_FAIL(block_builder_.next_log_block())) {
|
||||||
LOG_ERROR("next_log_block failed", "log_entry_no", block_builder_.get_log_entry_no());
|
LOG_ERROR("next_log_block failed", "log_entry_no", block_builder_.get_log_entry_no());
|
||||||
|
} else if (has_record_log_) {
|
||||||
|
last_record_lsn_ = lsn;
|
||||||
|
last_normal_lsn_ = lsn;
|
||||||
|
has_record_log_ = false;
|
||||||
} else if (OB_FAIL(lsn_arr_.push_back(lsn))) {
|
} else if (OB_FAIL(lsn_arr_.push_back(lsn))) {
|
||||||
LOG_ERROR("push_back lsn to lsn_arr failed", KR(ret), K(lsn));
|
LOG_ERROR("push_back lsn to lsn_arr failed", KR(ret), K(lsn));
|
||||||
|
} else {
|
||||||
|
last_normal_lsn_ = lsn;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -167,6 +175,7 @@ private:
|
|||||||
ObTxLogBlockBuilder block_builder_;
|
ObTxLogBlockBuilder block_builder_;
|
||||||
ObLogLSNArray lsn_arr_;
|
ObLogLSNArray lsn_arr_;
|
||||||
LSN last_record_lsn_;
|
LSN last_record_lsn_;
|
||||||
|
LSN last_normal_lsn_;
|
||||||
int32_t trans_type_;
|
int32_t trans_type_;
|
||||||
ObAddr addr_;
|
ObAddr addr_;
|
||||||
int64_t cluster_version_;
|
int64_t cluster_version_;
|
||||||
@ -177,6 +186,7 @@ private:
|
|||||||
int64_t epoch_;
|
int64_t epoch_;
|
||||||
int64_t last_op_scn_;
|
int64_t last_op_scn_;
|
||||||
int64_t checksum_;
|
int64_t checksum_;
|
||||||
|
bool has_record_log_;
|
||||||
common::ObArenaAllocator allocator_;
|
common::ObArenaAllocator allocator_;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -253,8 +263,9 @@ void ObTxLogGenerator::gen_record_log()
|
|||||||
}
|
}
|
||||||
ObTxRecordLog record_log(record_log_ref);
|
ObTxRecordLog record_log(record_log_ref);
|
||||||
LOG_DEBUG("gen record", K(record_log));
|
LOG_DEBUG("gen record", K(record_log));
|
||||||
lsn_arr_.reset();
|
has_record_log_ = true;
|
||||||
EXPECT_EQ(OB_SUCCESS, block_builder_.fill_tx_log_except_redo(record_log));
|
EXPECT_EQ(OB_SUCCESS, block_builder_.fill_tx_log_except_redo(record_log));
|
||||||
|
lsn_arr_.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObTxLogGenerator::gen_commit_info_log()
|
void ObTxLogGenerator::gen_commit_info_log()
|
||||||
@ -308,9 +319,14 @@ void ObTxLogGenerator::gen_commit_log()
|
|||||||
palf::LSN ObTxLogGenerator::last_lsn_()
|
palf::LSN ObTxLogGenerator::last_lsn_()
|
||||||
{
|
{
|
||||||
palf::LSN lsn;
|
palf::LSN lsn;
|
||||||
const int64_t cnt = lsn_arr_.count();
|
if (last_record_lsn_.is_valid()) {
|
||||||
if (cnt > 0) {
|
if (last_normal_lsn_.is_valid()) {
|
||||||
lsn = lsn_arr_.at(cnt - 1);
|
lsn = std::max(last_record_lsn_, last_normal_lsn_);
|
||||||
|
} else {
|
||||||
|
lsn = last_record_lsn_;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
lsn = last_normal_lsn_;
|
||||||
}
|
}
|
||||||
return lsn;
|
return lsn;
|
||||||
}
|
}
|
||||||
|
@ -806,6 +806,93 @@ TEST(ObCDCPartTransResolver, DISABLED_test_sp_tx_dist4)
|
|||||||
DESTROY_OBLOG_INSTANCE();
|
DESTROY_OBLOG_INSTANCE();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(ObCDCPartTransResolver, test_sp_tx_record)
|
||||||
|
{
|
||||||
|
PREPARE_LS_FETCH_CTX();
|
||||||
|
LogEntry log_entry;
|
||||||
|
palf::LSN lsn;
|
||||||
|
LogEntry log_entry_rc0;
|
||||||
|
palf::LSN lsn_rc0;
|
||||||
|
LogEntry log_entry1;
|
||||||
|
palf::LSN lsn1;
|
||||||
|
LogEntry log_entry_rc1;
|
||||||
|
palf::LSN lsn_rc1;
|
||||||
|
LogEntry log_entry2;
|
||||||
|
palf::LSN lsn2;
|
||||||
|
LogEntry log_entry_rc2;
|
||||||
|
palf::LSN lsn_rc2;
|
||||||
|
// generate and log_entry below
|
||||||
|
log_generator.gen_redo_log();
|
||||||
|
log_generator.gen_log_entry(log_entry, lsn);
|
||||||
|
log_generator.gen_record_log();
|
||||||
|
log_generator.gen_log_entry(log_entry_rc0, lsn_rc0);
|
||||||
|
log_generator.gen_redo_log();
|
||||||
|
log_generator.gen_log_entry(log_entry1, lsn1);
|
||||||
|
log_generator.gen_record_log();
|
||||||
|
log_generator.gen_log_entry(log_entry_rc1, lsn_rc1);
|
||||||
|
log_generator.gen_redo_log();
|
||||||
|
log_generator.gen_log_entry(log_entry2, lsn2);
|
||||||
|
log_generator.gen_record_log();
|
||||||
|
log_generator.gen_log_entry(log_entry_rc2, lsn_rc2);
|
||||||
|
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry, lsn, missing_info, tsi, stop_flag));
|
||||||
|
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry_rc0, lsn_rc0, missing_info, tsi, stop_flag));
|
||||||
|
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry1, lsn1, missing_info, tsi, stop_flag));
|
||||||
|
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry_rc1, lsn_rc1, missing_info, tsi, stop_flag));
|
||||||
|
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry2, lsn2, missing_info, tsi, stop_flag));
|
||||||
|
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry_rc2, lsn_rc2, missing_info, tsi, stop_flag));
|
||||||
|
|
||||||
|
DESTROY_OBLOG_INSTANCE();
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(ObCDCPartTransResolver, test_sp_tx_record_miss)
|
||||||
|
{
|
||||||
|
PREPARE_LS_FETCH_CTX();
|
||||||
|
LogEntry log_entry;
|
||||||
|
palf::LSN lsn;
|
||||||
|
LogEntry log_entry_rc0;
|
||||||
|
palf::LSN lsn_rc0;
|
||||||
|
LogEntry log_entry1;
|
||||||
|
palf::LSN lsn1;
|
||||||
|
LogEntry log_entry_rc1;
|
||||||
|
palf::LSN lsn_rc1;
|
||||||
|
LogEntry log_entry2;
|
||||||
|
palf::LSN lsn2;
|
||||||
|
LogEntry log_entry_rc2;
|
||||||
|
palf::LSN lsn_rc2;
|
||||||
|
// generate and log_entry below
|
||||||
|
log_generator.gen_redo_log();
|
||||||
|
log_generator.gen_log_entry(log_entry, lsn);
|
||||||
|
log_generator.gen_record_log();
|
||||||
|
log_generator.gen_log_entry(log_entry_rc0, lsn_rc0);
|
||||||
|
log_generator.gen_redo_log();
|
||||||
|
log_generator.gen_log_entry(log_entry1, lsn1);
|
||||||
|
log_generator.gen_record_log();
|
||||||
|
log_generator.gen_log_entry(log_entry_rc1, lsn_rc1);
|
||||||
|
log_generator.gen_redo_log();
|
||||||
|
log_generator.gen_log_entry(log_entry2, lsn2);
|
||||||
|
log_generator.gen_record_log();
|
||||||
|
log_generator.gen_log_entry(log_entry_rc2, lsn_rc2);
|
||||||
|
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry1, lsn1, missing_info, tsi, stop_flag));
|
||||||
|
EXPECT_EQ(OB_ITEM_NOT_SETTED, ls_fetch_ctx->read_log(log_entry_rc1, lsn_rc1, missing_info, tsi, stop_flag));
|
||||||
|
EXPECT_TRUE(missing_info.miss_record_log_lsn_.is_valid());
|
||||||
|
EXPECT_EQ(1, missing_info.get_total_misslog_cnt());
|
||||||
|
LOG_INFO("", K(lsn), K(lsn_rc0), K(lsn1), K(lsn_rc1), K(lsn2), K(lsn_rc2), K(missing_info));
|
||||||
|
// EXPECT_EQ(lsn, missing_info.miss_redo_or_state_lsn_arr_.at(0));
|
||||||
|
IObCDCPartTransResolver::MissingLogInfo missing_info1;
|
||||||
|
missing_info1.set_resolving_miss_log();
|
||||||
|
EXPECT_EQ(OB_ITEM_NOT_SETTED, ls_fetch_ctx->read_log(log_entry_rc0, lsn_rc0, missing_info1, tsi, stop_flag));
|
||||||
|
EXPECT_EQ(lsn, missing_info1.get_miss_redo_or_state_log_arr().at(0));
|
||||||
|
IObCDCPartTransResolver::MissingLogInfo missing_info2;
|
||||||
|
missing_info2.set_resolving_miss_log();
|
||||||
|
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry, lsn, missing_info2, tsi, stop_flag));
|
||||||
|
IObCDCPartTransResolver::MissingLogInfo missing_info3;
|
||||||
|
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry2, lsn2, missing_info3, tsi, stop_flag));
|
||||||
|
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry_rc2, lsn_rc2, missing_info3, tsi, stop_flag));
|
||||||
|
LOG_INFO("missing_infos", K(missing_info), K(missing_info1), K(missing_info2), K(missing_info3));
|
||||||
|
|
||||||
|
DESTROY_OBLOG_INSTANCE();
|
||||||
|
}
|
||||||
|
|
||||||
TEST(ObCDCPartTransResolver, test_sp_tx_seq_example)
|
TEST(ObCDCPartTransResolver, test_sp_tx_seq_example)
|
||||||
{
|
{
|
||||||
PREPARE_LS_FETCH_CTX();
|
PREPARE_LS_FETCH_CTX();
|
||||||
@ -828,7 +915,7 @@ int main(int argc, char **argv)
|
|||||||
bool not_output_obcdc_log = true;
|
bool not_output_obcdc_log = true;
|
||||||
logger.set_file_name("test_ob_cdc_part_trans_resolver.log", not_output_obcdc_log, false);
|
logger.set_file_name("test_ob_cdc_part_trans_resolver.log", not_output_obcdc_log, false);
|
||||||
logger.set_log_level(OB_LOG_LEVEL_DEBUG);
|
logger.set_log_level(OB_LOG_LEVEL_DEBUG);
|
||||||
logger.set_mod_log_levels("ALL.*:DEBUG, TLOG.*:DEBUG");
|
logger.set_mod_log_levels("ALL.*:DEBUG;TLOG.*:DEBUG");
|
||||||
logger.set_enable_async_log(false);
|
logger.set_enable_async_log(false);
|
||||||
testing::InitGoogleTest(&argc,argv);
|
testing::InitGoogleTest(&argc,argv);
|
||||||
return RUN_ALL_TESTS();
|
return RUN_ALL_TESTS();
|
||||||
|
Reference in New Issue
Block a user