[OBCDC] Support BigSegmentBuf in TxLog

This commit is contained in:
SanmuWangZJU
2023-06-28 07:17:59 +00:00
committed by ob-robot
parent 033dc7668c
commit 121fd2457f
25 changed files with 426 additions and 233 deletions

View File

@ -16,6 +16,7 @@
#include "ob_cdc_lob_aux_meta_storager.h" // ObCDCLobAuxMetaStorager
#include "ob_log_instance.h" // TCTX
#include "ob_log_formatter.h" // IObLogFormatter
#include "ob_log_trace_id.h" // ObLogTraceIdGuard
using namespace oceanbase::common;
@ -130,6 +131,7 @@ int ObCDCLobDataMerger::handle(void *data, const int64_t thread_index, volatile
{
int ret = OB_SUCCESS;
set_cdc_thread_name("LobDtMerger", thread_index);
ObLogTraceIdGuard trace_guard;
LobColumnFragmentCtx *task = static_cast<LobColumnFragmentCtx *>(data);
if (IS_NOT_INIT) {

View File

@ -42,38 +42,38 @@ IObCDCPartTransResolver::MissingLogInfo::~MissingLogInfo()
IObCDCPartTransResolver::MissingLogInfo
&IObCDCPartTransResolver::MissingLogInfo::operator=(const IObCDCPartTransResolver::MissingLogInfo &miss_log_info)
{
this->miss_redo_or_state_lsn_arr_ = miss_log_info.miss_redo_or_state_lsn_arr_;
this->miss_record_log_lsn_ = miss_log_info.miss_record_log_lsn_;
this->miss_redo_lsn_arr_ = miss_log_info.miss_redo_lsn_arr_;
this->miss_record_or_state_log_lsn_ = miss_log_info.miss_record_or_state_log_lsn_;
this->need_reconsume_commit_log_entry_ = miss_log_info.need_reconsume_commit_log_entry_;
this->is_resolving_miss_log_ = miss_log_info.is_resolving_miss_log_;
return *this;
}
int IObCDCPartTransResolver::MissingLogInfo::set_miss_record_log_lsn(const palf::LSN &record_log_lsn)
int IObCDCPartTransResolver::MissingLogInfo::set_miss_record_or_state_log_lsn(const palf::LSN &record_log_lsn)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!record_log_lsn.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("set_miss_record_log_lsn invalid record_log_lsn", KR(ret), K(record_log_lsn));
} else if (OB_UNLIKELY(miss_record_log_lsn_.is_valid())) {
LOG_ERROR("set_miss_record_or_state_log_lsn invalid record_log_lsn", KR(ret), K(record_log_lsn));
} else if (OB_UNLIKELY(miss_record_or_state_log_lsn_.is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("miss_record_log_lsn already set, should not set again!", KR(ret), K(record_log_lsn), KPC(this));
LOG_ERROR("miss_record_or_state_log_lsn already set, should not set again!", KR(ret), K(record_log_lsn), KPC(this));
} else {
miss_record_log_lsn_ = record_log_lsn;
miss_record_or_state_log_lsn_ = record_log_lsn;
}
return ret;
}
int IObCDCPartTransResolver::MissingLogInfo::get_miss_record_log_lsn(palf::LSN &miss_record_lsn) const
int IObCDCPartTransResolver::MissingLogInfo::get_miss_record_or_state_log_lsn(palf::LSN &miss_record_lsn) const
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!miss_record_log_lsn_.is_valid())) {
if (OB_UNLIKELY(!miss_record_or_state_log_lsn_.is_valid())) {
ret = OB_ENTRY_NOT_EXIST;
} else {
miss_record_lsn = miss_record_log_lsn_;
miss_record_lsn = miss_record_or_state_log_lsn_;
}
return ret;
@ -86,9 +86,9 @@ int IObCDCPartTransResolver::MissingLogInfo::push_back_single_miss_log_lsn(const
if (OB_UNLIKELY(!misslog_lsn.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("misslog lsn invalid", KR(ret), K(misslog_lsn));
} else if (OB_FAIL(miss_redo_or_state_lsn_arr_.push_back(misslog_lsn))) {
} else if (OB_FAIL(miss_redo_lsn_arr_.push_back(misslog_lsn))) {
LOG_ERROR("push_back misslog lsn to missinglog_lsn_array fail", KR(ret),
K(misslog_lsn), K_(miss_redo_or_state_lsn_arr));
K(misslog_lsn), K_(miss_redo_lsn_arr));
}
return ret;
@ -102,9 +102,9 @@ int IObCDCPartTransResolver::MissingLogInfo::push_back_missing_log_lsn_arr(const
for(int64_t idx = 0; OB_SUCC(ret) && idx < misslog_lsn_arr.count(); idx++) {
const palf::LSN &lsn = misslog_lsn_arr.at(idx);
if (OB_FAIL(miss_redo_or_state_lsn_arr_.push_back(misslog_lsn_arr.at(idx)))) {
if (OB_FAIL(miss_redo_lsn_arr_.push_back(misslog_lsn_arr.at(idx)))) {
LOG_ERROR("push_back_missing_log_lsn_arr failed", KR(ret),
K(misslog_lsn_arr), K(idx), K(miss_redo_or_state_lsn_arr_), K(lsn));
K(misslog_lsn_arr), K(idx), K(miss_redo_lsn_arr_), K(lsn));
}
}
@ -113,9 +113,9 @@ int IObCDCPartTransResolver::MissingLogInfo::push_back_missing_log_lsn_arr(const
int64_t IObCDCPartTransResolver::MissingLogInfo::get_total_misslog_cnt() const
{
int64_t cnt_ret = miss_redo_or_state_lsn_arr_.count();
int64_t cnt_ret = miss_redo_lsn_arr_.count();
if (miss_record_log_lsn_.is_valid()) {
if (miss_record_or_state_log_lsn_.is_valid()) {
cnt_ret +=1;
}
@ -125,7 +125,7 @@ int64_t IObCDCPartTransResolver::MissingLogInfo::get_total_misslog_cnt() const
int IObCDCPartTransResolver::MissingLogInfo::sort_and_unique_missing_log_lsn()
{
auto fn = [](palf::LSN &lsn1, palf::LSN &lsn2) { return lsn1 < lsn2; };
return sort_and_unique_array(miss_redo_or_state_lsn_arr_, fn);
return sort_and_unique_array(miss_redo_lsn_arr_, fn);
}
// *************** ObCDCPartTransResolver public functions ***************** //
@ -189,11 +189,22 @@ int ObCDCPartTransResolver::read(
while (OB_SUCC(ret)) {
transaction::ObTxLogHeader tx_header;
if (OB_FAIL(tx_log_block.get_next_log(tx_header))) {
//
if (OB_FAIL(read_trans_header_(
lsn,
tx_log_block_header.get_tx_id(),
missing_info.is_resolving_miss_log(),
tx_log_block,
tx_header))) {
if (OB_ITER_END != ret) {
LOG_ERROR("get_next_log from tx_log_block failed", KR(ret), K_(tls_id), K(lsn),
LOG_ERROR("read_trans_header_ from tx_log_block failed", KR(ret), K_(tls_id), K(lsn),
K(tx_log_block_header), K(tx_log_block), K(tx_header), K(has_redo_in_cur_entry));
}
} else if (OB_UNLIKELY(transaction::ObTxLogType::TX_BIG_SEGMENT_LOG == tx_header.get_tx_log_type())) {
// ignore.
LOG_DEBUG("ignore tx_big_segment_log which is not collect complete",
K_(tls_id), K(lsn), K(tx_log_block_header), K(tx_header));
ret = OB_ITER_END;
} else if (missing_info.need_reconsume_commit_log_entry()
&& ! (transaction::ObTxLogType::TX_COMMIT_LOG == tx_header.get_tx_log_type())) {
// ignore tx_log which is not commit_log if is_reconsuming_commit_log_entry.
@ -270,6 +281,57 @@ int ObCDCPartTransResolver::offline(volatile bool &stop_flag)
// *************** ObCDCPartTransResolver private functions ***************** //
int ObCDCPartTransResolver::read_trans_header_(
const palf::LSN &lsn,
const transaction::ObTransID &tx_id,
const bool is_resolving_miss_log,
transaction::ObTxLogBlock &tx_log_block,
transaction::ObTxLogHeader &tx_header)
{
int ret = OB_SUCCESS;
if (OB_FAIL(tx_log_block.get_next_log(tx_header))) {
if (OB_LOG_ALREADY_SPLIT == ret) {
// need use big_segment_buf
PartTransTask *part_trans_task = NULL;
if (OB_UNLIKELY(transaction::ObTxLogType::TX_BIG_SEGMENT_LOG != tx_header.get_tx_log_type())) {
ret = OB_STATE_NOT_MATCH;
LOG_ERROR("expected TX_BIG_SEGMENT_LOG but not", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(tx_header), K(is_resolving_miss_log));
} else if (OB_FAIL(obtain_task_(tx_id, part_trans_task, is_resolving_miss_log))) {
LOG_ERROR("obtain_task_ failed", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(is_resolving_miss_log));
} else if (OB_FAIL(tx_log_block.get_next_log(tx_header, part_trans_task->get_segment_buf()))) {
if (OB_LOG_TOO_LARGE == ret) {
// note: will change ret to OB_SUCCESS if push_fetched_log_entry success.
if (OB_FAIL(part_trans_task->push_fetched_log_entry(lsn))) {
LOG_ERROR("push_fetched_log_entry of BigSegmentBuf Log failed", KR(ret));
} else {
LOG_DEBUG("handle_big_segment_buf part done", K_(tls_id), K(tx_id), K(lsn), K(tx_header));
}
} else if (OB_NO_NEED_UPDATE == ret) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("consume tx_log_block while segment_buf is collected done and not reseted", KR(ret),
K_(tls_id), K(lsn), K(tx_id), K(tx_header));
} else if (OB_START_LOG_CURSOR_INVALID == ret) {
// consume tx_big_segment_log in middle of parts log.
// 1. reset segment buf and reset ret to OB_SUCCESS
// 2. not mark fetched_logentry_list.
// 3. wait misslog to find all log_entry of the TX_BIG_SEGMENT_LOG
part_trans_task->get_segment_buf()->reset();
ret = OB_SUCCESS;
LOG_DEBUG("found half_part of big_segment_buf tx_log, should ignore and fetch by misslog later",
K_(tls_id), K(tx_id), K(lsn), K(tx_header));
}
}
} else if (OB_ITER_END != ret) {
LOG_ERROR("get_next_log from tx_log_block failed", KR(ret), K_(tls_id), K(lsn), K(tx_id), K(is_resolving_miss_log), K(tx_header));
}
}
return ret;
}
int ObCDCPartTransResolver::read_trans_log_(
const transaction::ObTxLogBlockHeader &tx_log_block_header,
transaction::ObTxLogBlock &tx_log_block,
@ -424,7 +486,13 @@ int ObCDCPartTransResolver::handle_redo_(
LOG_ERROR("obtain_task_ fail", KR(ret), K_(tls_id), K(tx_id), K(lsn),
K(handling_miss_log));
} else if (OB_FAIL(push_fetched_log_entry_(lsn, *task))) {
LOG_ERROR("push_fetched_log_entry failed", KR(ret), K_(tls_id), K(tx_id), K(lsn), KPC(task));
if (OB_ENTRY_EXIST == ret) {
LOG_WARN("redo already fetched, ignore", KR(ret), K_(tls_id), K(tx_id), K(lsn),
"task_sorted_log_entry_info", task->get_sorted_log_entry_info());
ret = OB_SUCCESS;
} else {
LOG_ERROR("push_fetched_log_entry failed", KR(ret), K_(tls_id), K(tx_id), K(lsn), KPC(task));
}
} else if (OB_FAIL(task->push_redo_log(
tx_id,
lsn,
@ -504,9 +572,9 @@ int ObCDCPartTransResolver::handle_record_(
LOG_ERROR("obtain PartTransTask failed", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(record_log), K(missing_info));
} else if (OB_FAIL(part_trans_task->push_back_recored_redo_lsn_arr(prev_redo_lsns, lsn, false/*has_redo_in_cur_entry*/))) {
LOG_ERROR("push_back_recored_redo_lsn_arr failed", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(record_log), K(prev_redo_lsns), KPC(part_trans_task));
} else if (OB_UNLIKELY(! missing_info.is_empty())) {
} else if (OB_UNLIKELY(missing_info.has_miss_record_or_state_log())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("expect empty missing_info while resolving record_log", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(record_log),
LOG_ERROR("expect prev miss_record_or_state_log handled while resolving record_log", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(record_log),
K(missing_info), KPC(part_trans_task));
} else if (is_resolving_miss_log) {
// push back all prev_log_lsns into missing_info
@ -515,7 +583,7 @@ int ObCDCPartTransResolver::handle_record_(
K(missing_info), KPC(part_trans_task));
} else if (is_first_record) {
part_trans_task->mark_read_first_record();
} else if (OB_FAIL(missing_info.set_miss_record_log_lsn(prev_record_lsn))) {
} else if (OB_FAIL(missing_info.set_miss_record_or_state_log_lsn(prev_record_lsn))) {
LOG_ERROR("push prev_record_lsn into missing_info failed", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(record_log),
K(is_first_record), K(missing_info), KPC(part_trans_task));
} else {
@ -528,7 +596,7 @@ int ObCDCPartTransResolver::handle_record_(
} else if (is_first_record) {
part_trans_task->mark_read_first_record();
LOG_DEBUG("mark_read_first_record", K_(tls_id), K(tx_id), K(lsn), K(record_log));
} else if (OB_FAIL(missing_info.set_miss_record_log_lsn(prev_record_lsn))) {
} else if (OB_FAIL(missing_info.set_miss_record_or_state_log_lsn(prev_record_lsn))) {
LOG_ERROR("push prev_record_lsn into missing_info failed", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(record_log),
K(is_first_record), K(missing_info), KPC(part_trans_task));
}
@ -621,6 +689,7 @@ int ObCDCPartTransResolver::handle_commit_info_(
const transaction::ObXATransID &xid = commit_info_log.get_xid();
const transaction::ObRedoLSNArray &prev_redo_lsns = commit_info_log.get_redo_lsns();
const palf::LSN &prev_record_lsn = commit_info_log.get_prev_record_lsn();
const bool has_record_log = prev_record_lsn.is_valid();
if (OB_FAIL(part_trans_task->set_commit_info(trace_id, trace_info, is_dup_tx, xid))) {
LOG_ERROR("set_commit_info failed", KR(ret), K_(tls_id), K(lsn), K(commit_info_log), KPC(part_trans_task));
@ -631,19 +700,21 @@ int ObCDCPartTransResolver::handle_commit_info_(
} else if (OB_FAIL(part_trans_task->push_back_recored_redo_lsn_arr(prev_redo_lsns, lsn, has_redo_in_cur_entry))) {
LOG_ERROR("push_back_recored_redo_lsn_arr failed", KR(ret), K(prev_redo_lsns), KPC(part_trans_task));
} else if (is_resolving_miss_log) {
// TODO pushback prev_redo_lsns to missing_info;
if (OB_FAIL(missing_info.push_back_missing_log_lsn_arr(prev_redo_lsns))) {
LOG_ERROR("push_back_missing_log_lsn_arr fail", KR(ret), K(commit_info_log), K(missing_info), KPC(part_trans_task));
} else if (prev_record_lsn.is_valid() && OB_FAIL(missing_info.set_miss_record_log_lsn(prev_record_lsn))) {
LOG_ERROR("set_miss_record_log_lsn failed", KR(ret), K(commit_info_log), K(missing_info), KPC(part_trans_task));
} else if (prev_record_lsn.is_valid() && OB_FAIL(missing_info.set_miss_record_or_state_log_lsn(prev_record_lsn))) {
LOG_ERROR("set_miss_record_or_state_log_lsn failed", KR(ret), K(commit_info_log), K(missing_info), KPC(part_trans_task));
}
} else if (! part_trans_task->has_find_first_record()) {
// check if (1) trans doesn't have record_log; (2) trans has record_log but found log_miss
if (OB_FAIL(check_redo_log_list_(prev_redo_lsns, *part_trans_task, missing_info))) {
if (OB_UNLIKELY(has_record_log && missing_info.need_reconsume_commit_log_entry())) {
ret = OB_STATE_NOT_MATCH;
LOG_ERROR("all record_log should already fetched while reconsume log_entry", KR(ret), K_(tls_id), K(commit_info_log));
} else if (OB_FAIL(check_redo_log_list_(prev_redo_lsns, *part_trans_task, missing_info))) {
LOG_ERROR("check_redo_log_list_ failed", KR(ret), K(commit_info_log), K(missing_info), KPC(part_trans_task));
// To handle log seq like: record redo redo redo commit_info, obcdc start after last record.
// prev_record_log_lsn is valid && not find first record
} else if (prev_record_lsn.is_valid() && OB_FAIL(missing_info.set_miss_record_log_lsn(prev_record_lsn))) {
} else if (has_record_log && OB_FAIL(missing_info.set_miss_record_or_state_log_lsn(prev_record_lsn))) {
LOG_ERROR("push prev_record_lsn failed", KR(ret), K(prev_record_lsn), K(missing_info), KPC(part_trans_task));
}
} else {
@ -683,7 +754,7 @@ int ObCDCPartTransResolver::handle_prepare_(
const palf::LSN &commit_info_lsn = prepare_log.get_prev_lsn();
if (commit_info_lsn.is_valid()) {
if (OB_FAIL(missing_info.push_back_single_miss_log_lsn(commit_info_lsn))) {
if (OB_FAIL(missing_info.set_miss_record_or_state_log_lsn(commit_info_lsn))) {
LOG_ERROR("push_back_missing_log_lsn fail", KR(ret), K_(tls_id), K(tx_id),
K(prepare_log), K(commit_info_lsn), K(missing_info), KPC(part_trans_task));
} else {
@ -743,7 +814,8 @@ int ObCDCPartTransResolver::handle_commit_(
} else if (OB_FAIL(obtain_task_(tx_id, part_trans_task, is_resolving_miss_log))) {
LOG_ERROR("obtain_part_trans_task fail while reading commit log", KR(ret), K_(tls_id), K(tx_id), K(lsn),
K(commit_log), K(missing_info));
// TODO 下面是否检查sys日志流里非DDL/非LS_TABLE的事务?
} else if (OB_FAIL(part_trans_task->push_multi_data_source_data(lsn, commit_log.get_multi_source_data(), true/*is_commit_log*/))) {
LOG_ERROR("push_multi_data_source_data failed", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(commit_log), KPC(part_trans_task));
} else if (!part_trans_task->has_read_commit_info()) {
if (is_resolving_miss_log) {
// commit info is miss log and handled done, reconsumeing commit_log
@ -770,7 +842,7 @@ int ObCDCPartTransResolver::handle_commit_(
// LOG_ERROR("handle unserverd single CommitLog(commit_log with invalid prev_log_lsn in dist_trans) failed",
// KR(ret), K_(tls_id), K(tx_id), K(commit_log), K(lsn));
// }
} else if (OB_FAIL(missing_info.push_back_single_miss_log_lsn(prev_log_lsn))) {
} else if (OB_FAIL(missing_info.set_miss_record_or_state_log_lsn(prev_log_lsn))) {
LOG_ERROR("push_back_single_miss_log_lsn failed", KR(ret), K_(tls_id), K(tx_id), K(commit_log), K(missing_info));
} else {
missing_info.set_need_reconsume_commit_log_entry();

View File

@ -76,15 +76,15 @@ public:
void reset()
{
miss_redo_or_state_lsn_arr_.reset();
miss_record_log_lsn_.reset();
miss_redo_lsn_arr_.reset();
miss_record_or_state_log_lsn_.reset();
need_reconsume_commit_log_entry_ = false;
is_resolving_miss_log_ = false;
}
public:
/// has misslog or not
/// @retval bool ture if has miss_log(including redo/commit_info/prepare/commit and record_log)
bool is_empty() const { return miss_redo_or_state_lsn_arr_.count() <= 0 && !miss_record_log_lsn_.is_valid(); }
bool is_empty() const { return miss_redo_lsn_arr_.count() <= 0 && !miss_record_or_state_log_lsn_.is_valid(); }
/// set need reconsume the state log(currently need reconsume commit_info(currently enable reentrant)/commit log)
void set_need_reconsume_commit_log_entry() { need_reconsume_commit_log_entry_ = true; }
bool need_reconsume_commit_log_entry() const { return need_reconsume_commit_log_entry_; }
@ -92,39 +92,40 @@ public:
void set_resolving_miss_log() { is_resolving_miss_log_ = true; }
bool is_resolving_miss_log() const { return is_resolving_miss_log_; }
int set_miss_record_log_lsn(const palf::LSN &record_log_lsn);
int get_miss_record_log_lsn(palf::LSN &miss_record_lsn) const;
ObLogLSNArray &get_miss_redo_or_state_log_arr() { return miss_redo_or_state_lsn_arr_; }
int set_miss_record_or_state_log_lsn(const palf::LSN &record_log_lsn);
bool has_miss_record_or_state_log() const { return miss_record_or_state_log_lsn_.is_valid(); }
int get_miss_record_or_state_log_lsn(palf::LSN &miss_record_lsn) const;
ObLogLSNArray &get_miss_redo_lsn_arr() { return miss_redo_lsn_arr_; }
void reset_miss_record_or_state_log_lsn() { miss_record_or_state_log_lsn_.reset(); }
int push_back_single_miss_log_lsn(const palf::LSN &misslog_lsn);
template<typename LSN_ARRAY>
int push_back_missing_log_lsn_arr(const LSN_ARRAY &miss_log_lsn_arr);
int64_t get_total_misslog_cnt() const;
// 由于record日志要放到所有misslog的最后面去获取,不能排序或者排序时排除record日志的LSN
// 这里只处理miss_redo_or_state_log_lsn_arr
int sort_and_unique_missing_log_lsn();
TO_STRING_KV(
K_(miss_redo_or_state_lsn_arr),
K_(miss_record_log_lsn),
"miss_redo_count", miss_redo_lsn_arr_.count(),
K_(miss_redo_lsn_arr),
K_(miss_record_or_state_log_lsn),
K_(need_reconsume_commit_log_entry),
K_(is_resolving_miss_log));
private:
// miss log lsn array: redo log and state_log(commit_info/prepare)
ObLogLSNArray miss_redo_or_state_lsn_arr_;
// record log lsn
palf::LSN miss_record_log_lsn_;
// if reconsume the log_entry or not after handling miss_log
// miss redo log lsn array
ObLogLSNArray miss_redo_lsn_arr_;
// miss record log or state log(commit_info/prepare) lsn
palf::LSN miss_record_or_state_log_lsn_;
// need reconsume the log_entry or not after handling miss_log or not.
// reconsume if:
// (1) find miss_log while resolving commit_log to submit the part_trans_task
// (2) find miss_log while resolving commit_info_log in case of commit_log
// is the the same log_entry. NOTE: won't reconsume if commit_log is a miss_log.
// (1) find miss_log by check redo is complete or not while resolving commit_log
// (2) find miss_log not empty while resolving commit_log(miss_log found while resolving prepare/commit_info log
// with the the same log_entry with commit_log).
bool need_reconsume_commit_log_entry_;
// resolving miss log
// directly append miss log lsn if found miss log while resolving
// will directly append prev_log lsn while resolving miss_log
bool is_resolving_miss_log_;
// TODO use a int8_t instead the two bool variable, may add is_reconsuming var for handle commit_info and commit log
};
@ -241,6 +242,12 @@ private:
private:
// ******* tx log handler ******** //
int read_trans_header_(
const palf::LSN &lsn,
const transaction::ObTransID &tx_id,
const bool is_resolving_miss_log,
transaction::ObTxLogBlock &tx_log_block,
transaction::ObTxLogHeader &tx_header);
// read trans log from tx_log_block as ObTxxxxLog and resolve the tx log.
int read_trans_log_(
const transaction::ObTxLogBlockHeader &tx_log_block_header,

View File

@ -29,6 +29,7 @@
#include "ob_log_binlog_record_pool.h" // IObLogBRPool
#include "ob_log_config.h" // ObLogConfig
#include "ob_log_tenant_mgr.h" // IObLogTenantMgr
#include "ob_log_trace_id.h" // ObLogTraceIdGuard
#define _STAT(level, fmt, args...) _OBLOG_COMMITTER_LOG(level, "[STAT] [COMMITTER] " fmt, ##args)
#define STAT(level, fmt, args...) OBLOG_COMMITTER_LOG(level, "[STAT] [COMMITTER] " fmt, ##args)
@ -704,6 +705,7 @@ int ObLogCommitter::dispatch_heartbeat_binlog_record_(const int64_t heartbeat_ti
void ObLogCommitter::heartbeat_routine()
{
int ret = OB_SUCCESS;
ObLogTraceIdGuard trace_guard;
if (OB_UNLIKELY(! inited_)) {
LOG_ERROR("committer has not been initialized");
@ -777,6 +779,7 @@ void ObLogCommitter::heartbeat_routine()
void ObLogCommitter::commit_routine()
{
int ret = OB_SUCCESS;
ObLogTraceIdGuard trace_guard;
if (OB_UNLIKELY(! inited_)) {
LOG_ERROR("committer has not been initialized");

View File

@ -17,6 +17,7 @@
#include "ob_log_instance.h" // IObLogErrHandler
#include "ob_log_part_trans_parser.h" // IObLogPartTransParser
#include "ob_log_part_trans_task.h" // PartTransTask
#include "ob_log_trace_id.h"
using namespace oceanbase::common;
@ -143,6 +144,7 @@ int ObLogDdlParser::handle(void *data,
volatile bool &stop_flag)
{
int ret = OB_SUCCESS;
ObLogTraceIdGuard trace_guard;
PartTransTask *task = (PartTransTask *)data;
if (OB_UNLIKELY(! inited_) || OB_ISNULL(part_trans_parser_)) {

View File

@ -21,6 +21,7 @@
#include "ob_log_part_trans_parser.h" // IObLogPartTransParser
#include "ob_ms_queue_thread.h" // BitSet
#include "ob_log_resource_collector.h" // IObLogResourceCollector
#include "ob_log_trace_id.h"
using namespace oceanbase::common;
@ -161,6 +162,7 @@ int ObLogDmlParser::handle(void *data,
volatile bool &stop_flag)
{
int ret = OB_SUCCESS;
ObLogTraceIdGuard trace_guard;
ObLogEntryTask *task = (ObLogEntryTask *)(data);
PartTransTask *part_trans_task = NULL;

View File

@ -22,6 +22,7 @@
#include "ob_log_instance.h" // IObLogErrHandler
#include "ob_log_ls_fetch_mgr.h" // IObLogLSFetchMgr
#include "ob_log_fetcher.h" // IObLogFetcher
#include "ob_log_trace_id.h" // ObLogTraceIdGuard
namespace oceanbase
{
@ -140,6 +141,7 @@ void ObLogFetcherDeadPool::mark_stop_flag()
void ObLogFetcherDeadPool::run(const int64_t thread_index)
{
int ret = OB_SUCCESS;
ObLogTraceIdGuard trace_guard;
if (OB_UNLIKELY(! inited_)) {
LOG_ERROR("not inited");

View File

@ -21,6 +21,7 @@
#include "ob_log_instance.h" // IObLogErrHandler
#include "ob_ls_worker.h" // IObLSWorker
#include "ob_log_trace_id.h" // ObLogTraceIdGuard
using namespace oceanbase::common;
@ -143,6 +144,7 @@ void ObLogFetcherIdlePool::mark_stop_flag()
void ObLogFetcherIdlePool::run(const int64_t thread_index)
{
int ret = OB_SUCCESS;
ObLogTraceIdGuard trace_guard;
if (OB_UNLIKELY(! inited_)) {
LOG_ERROR("not inited");

View File

@ -35,7 +35,8 @@
#include "ob_cdc_lob_data_merger.h" // IObCDCLobDataMerger
#include "ob_cdc_lob_aux_meta_storager.h" // ObCDCLobAuxMetaStorager
#include "ob_cdc_lob_aux_table_parse.h" // ObCDCLobAuxMetaStorager
#include "ob_cdc_udt.h" // ObCDCUdtValueBuilder
#include "ob_cdc_udt.h" // ObCDCUdtValueBuilder
#include "ob_log_trace_id.h" // ObLogTraceIdGuard
using namespace oceanbase::common;
using namespace oceanbase::storage;
@ -312,6 +313,7 @@ int ObLogFormatter::handle(void *data, const int64_t thread_index, volatile bool
{
int ret = OB_SUCCESS;
set_cdc_thread_name("Formatter", thread_index);
ObLogTraceIdGuard trace_guard;
bool cur_stmt_need_callback = false;
IStmtTask *stmt_task = static_cast<IStmtTask *>(data);
DmlStmtTask *dml_stmt_task = dynamic_cast<DmlStmtTask *>(stmt_task);

View File

@ -527,11 +527,7 @@ int LSFetchCtx::read_miss_tx_log(
} else {
if (OB_FAIL(part_trans_resolver_->read(buf, buf_len, pos, lsn, submit_ts, serve_info_, missing, tsi))) {
if (OB_ITEM_NOT_SETTED == ret) {
// if found new generated miss log while resolving misslog, FetchStream will
// found new_generated_missing_info not empty and goon with misslog process
ret = OB_SUCCESS;
} else {
if (OB_ITEM_NOT_SETTED != ret) {
LOG_ERROR("resolve miss_log fail", KR(ret), K(log_entry), K(log_base_header), K(lsn), K(missing));
}
}

View File

@ -832,6 +832,7 @@ int FetchStream::read_group_entry_(palf::LogGroupEntry &group_entry,
stop_flag))) {
if (OB_ITEM_NOT_SETTED == ret) {
// handle missing_log_info
missing_info.set_resolving_miss_log();
const bool need_reconsume = missing_info.need_reconsume_commit_log_entry();
KickOutReason fail_reason = NONE;
@ -852,7 +853,6 @@ int FetchStream::read_group_entry_(palf::LogGroupEntry &group_entry,
}
} else if (need_reconsume) {
IObCDCPartTransResolver::MissingLogInfo reconsume_miss_info;
reconsume_miss_info.set_resolving_miss_log();
if (missing_info.need_reconsume_commit_log_entry()) {
reconsume_miss_info.set_need_reconsume_commit_log_entry();
}
@ -1466,7 +1466,7 @@ int FetchStream::fetch_miss_log_direct_(
resp->set_next_miss_lsn(miss_log_array.at(0).miss_lsn_);
while (OB_SUCC(ret) && !stop_fetch) {
bool retry_on_err = false;
while(OB_SUCC(ret) && fetched_cnt < arr_cnt && !is_timeout) {
while (OB_SUCC(ret) && fetched_cnt < arr_cnt && !is_timeout) {
const int64_t start_fetch_entry_ts = get_timestamp();
const ObCdcLSFetchMissLogReq::MissLogParam &param = miss_log_array.at(fetched_cnt);
const LSN &missing_lsn = param.miss_lsn_;
@ -1602,126 +1602,215 @@ int FetchStream::fetch_miss_log_(
int FetchStream::handle_log_miss_(
palf::LogEntry &log_entry,
IObCDCPartTransResolver::MissingLogInfo &org_missing_info,
IObCDCPartTransResolver::MissingLogInfo &missing_info,
logfetcher::TransStatInfo &tsi,
volatile bool &stop_flag,
KickOutReason &fail_reason)
{
int ret = OB_SUCCESS;
bool misslog_handle_done = false;
if (OB_UNLIKELY(org_missing_info.is_empty())) {
if (OB_UNLIKELY(missing_info.is_empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("empty missing_info", KR(ret), K(org_missing_info), K(log_entry));
} else if (OB_FAIL(org_missing_info.sort_and_unique_missing_log_lsn())) {
LOG_ERROR("sort_and_unique_missing_log_lsn failed", KR(ret), K(org_missing_info), K_(ls_fetch_ctx));
LOG_ERROR("empty missing_info", KR(ret), K(missing_info), K(log_entry));
} else {
IObCDCPartTransResolver::MissingLogInfo handling_misslog_info = org_missing_info;
int64_t fetched_missing_log_cnt = 0;
FetchLogSRpc *fetch_log_srpc = NULL;
int64_t rpc_timeout = ATOMIC_LOAD(&g_rpc_timeout);
if (OB_FAIL(alloc_fetch_log_srpc_(fetch_log_srpc))) {
LOG_ERROR("alloc fetch_log_srpc fail", KR(ret));
} else if (OB_ISNULL(fetch_log_srpc)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid fetch_log_srpc", KR(ret));
} else if (OB_FAIL(handle_miss_record_or_state_log_(
*fetch_log_srpc,
missing_info,
tsi,
stop_flag,
fail_reason))) {
LOG_ERROR("handle_miss_record_or_state_log_ failed", KR(ret));
} else if (OB_FAIL(handle_miss_redo_log_(
*fetch_log_srpc,
missing_info,
tsi,
stop_flag,
fail_reason))) {
LOG_ERROR("handle_miss_redo_log_ failed", KR(ret), KR(ret));
}
if (stop_flag) {
ret = OB_IN_STOP_STATE;
} else if (OB_NEED_RETRY == ret) {
fail_reason = KickOutReason::MISSING_LOG_FETCH_FAIL;
} else {
// for new generated miss log while handling current misslog
IObCDCPartTransResolver::MissingLogInfo new_generated_miss_info;
new_generated_miss_info.reset();
// may found new misslog while resolving misslog, here should handle all found misslog
while (OB_SUCC(ret) && !misslog_handle_done) {
const int64_t total_misslog_cnt = handling_misslog_info.get_total_misslog_cnt();
// handle current missing_info(handle by batch)
while (OB_SUCC(ret) && fetched_missing_log_cnt < total_misslog_cnt) {
ObArrayImpl<obrpc::ObCdcLSFetchMissLogReq::MissLogParam> batched_misslog_lsn_arr;
if (OB_FAIL(build_batch_misslog_lsn_arr_(
fetched_missing_log_cnt,
handling_misslog_info,
batched_misslog_lsn_arr))) {
LOG_ERROR("build_batch_misslog_lsn_arr_ failed", KR(ret),
K(handling_misslog_info), K(fetched_missing_log_cnt));
} else if (OB_FAIL(fetch_miss_log_(*fetch_log_srpc, *rpc_,
*ls_fetch_ctx_, batched_misslog_lsn_arr, svr_,
rpc_timeout))) {
LOG_ERROR("fetch_miss_log_ failed", KR(ret), K_(ls_fetch_ctx), K_(svr), K(batched_misslog_lsn_arr));
} else {
const obrpc::ObRpcResultCode &rcode = fetch_log_srpc->get_result_code();
const obrpc::ObCdcLSFetchLogResp &resp = fetch_log_srpc->get_resp(); // TODO change to fetch_miss_log RPC
if (OB_FAIL(rcode.rcode_) || OB_FAIL(resp.get_err())) {
ret = OB_NEED_RETRY;
LOG_ERROR("fetch log fail on rpc", K(rcode), K(resp), K(batched_misslog_lsn_arr));
} else {
// check next_miss_lsn
bool is_next_miss_lsn_match = false;
palf::LSN next_miss_lsn = resp.get_next_miss_lsn();
const int64_t batch_cnt = batched_misslog_lsn_arr.count();
const int64_t resp_log_cnt = resp.get_log_num();
if (batch_cnt == resp_log_cnt) {
is_next_miss_lsn_match = (batched_misslog_lsn_arr.at(batch_cnt-1).miss_lsn_ == next_miss_lsn);
} else if (batch_cnt > resp_log_cnt) {
is_next_miss_lsn_match = (batched_misslog_lsn_arr.at(resp_log_cnt).miss_lsn_ == next_miss_lsn);
} else {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("too many misslog fetched", KR(ret), K(next_miss_lsn), K(batch_cnt),
K(resp_log_cnt),K(resp), K_(ls_fetch_ctx));
}
if (OB_SUCC(ret)) {
if (!is_next_miss_lsn_match) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("misslog fetched is not match batched_misslog_lsn_arr requested", KR(ret),
K(next_miss_lsn), K(batch_cnt), K(resp_log_cnt), K(batched_misslog_lsn_arr), K(resp), K_(ls_fetch_ctx));
} else if (OB_FAIL(read_batch_misslog_(
resp,
fetched_missing_log_cnt,
tsi,
handling_misslog_info,
new_generated_miss_info))) {
LOG_ERROR("read_batch_misslog_ fail", KR(ret), K_(ls_fetch_ctx),
K(fetched_missing_log_cnt), K(handling_misslog_info), K(new_generated_miss_info));
}
}
}
}
if (OB_NEED_RETRY == ret) {
fail_reason = KickOutReason::MISSING_LOG_FETCH_FAIL;
}
}
if (OB_SUCC(ret)) {
// check fetched_missing_log_cnt == total_misslog_cnt
if (OB_UNLIKELY(handling_misslog_info.get_total_misslog_cnt() != fetched_missing_log_cnt)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("misslog not all fetched", KR(ret), K(fetched_missing_log_cnt), K(handling_misslog_info));
} else if (!new_generated_miss_info.is_empty()) {
// continue handle misslog in new_generated_miss_info
if (OB_FAIL(new_generated_miss_info.sort_and_unique_missing_log_lsn())) {
LOG_ERROR("sort_and_unique_missing_log_lsn failed", KR(ret), K(new_generated_miss_info));
} else {
fetched_missing_log_cnt = 0;
handling_misslog_info = new_generated_miss_info; // copy_assign
new_generated_miss_info.reset(); // can safely reset
}
} else {
misslog_handle_done = true;
}
}
LOG_INFO("handle miss_log", KR(ret), K(handling_misslog_info), K(misslog_handle_done), K(new_generated_miss_info));
}
LOG_INFO("handle miss_log done", KR(ret), "tls_id", ls_fetch_ctx_->get_tls_id(), K(missing_info));
}
if (OB_NOT_NULL(fetch_log_srpc)) {
free_fetch_log_srpc_(fetch_log_srpc);
fetch_log_srpc = NULL;
}
}
return ret;
}
int FetchStream::handle_miss_record_or_state_log_(
FetchLogSRpc &fetch_log_srpc,
IObCDCPartTransResolver::MissingLogInfo &missing_info,
logfetcher::TransStatInfo &tsi,
volatile bool &stop_flag,
KickOutReason &fail_reason)
{
int ret = OB_SUCCESS;
if (missing_info.has_miss_record_or_state_log()) {
int64_t rpc_timeout = ATOMIC_LOAD(&g_rpc_timeout);
ObArrayImpl<obrpc::ObCdcLSFetchMissLogReq::MissLogParam> batched_misslog_lsn_arr;
palf::LSN misslog_lsn;
while (OB_SUCC(ret) && ! stop_flag && missing_info.has_miss_record_or_state_log()) {
misslog_lsn.reset();
batched_misslog_lsn_arr.reset();
ObCdcLSFetchMissLogReq::MissLogParam param;
if (OB_FAIL(missing_info.get_miss_record_or_state_log_lsn(misslog_lsn))) {
LOG_ERROR("get_miss_record_or_state_log_lsn failed", K(missing_info), K(misslog_lsn));
} else {
param.miss_lsn_ = misslog_lsn;
if (OB_FAIL(batched_misslog_lsn_arr.push_back(param))) {
LOG_ERROR("push_back miss_record_or_state_log_lsn into batched_misslog_lsn_arr failed", KR(ret), K(param));
} else if (OB_FAIL(fetch_miss_log_(
fetch_log_srpc,
*rpc_,
*ls_fetch_ctx_,
batched_misslog_lsn_arr,
svr_,
rpc_timeout))) {
LOG_ERROR("fetch_miss_log_ failed", KR(ret), K_(ls_fetch_ctx), K_(svr), K(batched_misslog_lsn_arr));
} else {
const obrpc::ObRpcResultCode &rcode = fetch_log_srpc.get_result_code();
const obrpc::ObCdcLSFetchLogResp &resp = fetch_log_srpc.get_resp();
if (OB_FAIL(rcode.rcode_) || OB_FAIL(resp.get_err())) {
ret = OB_NEED_RETRY;
LOG_ERROR("fetch log fail on rpc", K(rcode), K(resp), K(batched_misslog_lsn_arr));
} else if (resp.get_log_num() < 1) {
LOG_INFO("fetch_miss_log_rpc doesn't fetch log, retry", K(misslog_lsn), K_(svr));
} else if (OB_UNLIKELY(resp.get_log_num() > 1)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("expect only one misslog while fetching miss_record_or_state_log", K(resp));
} else if (OB_UNLIKELY(resp.get_next_miss_lsn() != misslog_lsn)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("fetched log not match miss_log_lsn", KR(ret), K(misslog_lsn), K(resp));
} else {
missing_info.reset_miss_record_or_state_log_lsn();
palf::LogEntry miss_log_entry;
miss_log_entry.reset();
const char *buf = resp.get_log_entry_buf();
const int64_t len = resp.get_pos();
int64_t pos = 0;
if (OB_FAIL(miss_log_entry.deserialize(buf, len, pos))) {
LOG_ERROR("deserialize log_entry of miss_record_or_state_log failed", KR(ret), K(misslog_lsn), KP(buf), K(len), K(pos));
} else if (OB_FAIL(ls_fetch_ctx_->read_miss_tx_log(miss_log_entry, misslog_lsn, tsi, missing_info))) {
if (OB_ITEM_NOT_SETTED == ret) {
ret = OB_SUCCESS;
LOG_INFO("found new miss_record_or_state_log while resolving current miss_record_or_state_log",
"tls_id", ls_fetch_ctx_->get_tls_id(), K(misslog_lsn), K(missing_info));
} else {
LOG_ERROR("read miss_log failed", KR(ret), K(miss_log_entry), K(misslog_lsn), K(missing_info));
}
}
}
}
}
}
if (OB_SUCC(ret)) {
LOG_INFO("fetch record and state misslog done and collect all miss normal misslog",
"tls_id", ls_fetch_ctx_->get_tls_id(), K(missing_info));
}
}
return ret;
}
int FetchStream::handle_miss_redo_log_(
FetchLogSRpc &fetch_log_srpc,
IObCDCPartTransResolver::MissingLogInfo &missing_info,
logfetcher::TransStatInfo &tsi,
volatile bool &stop_flag,
KickOutReason &fail_reason)
{
int ret = OB_SUCCESS;
if (OB_FAIL(missing_info.sort_and_unique_missing_log_lsn())) {
LOG_ERROR("sort_and_unique_missing_log_lsn failed", KR(ret), K(missing_info), K_(ls_fetch_ctx));
} else {
const int64_t total_misslog_cnt = missing_info.get_total_misslog_cnt();
int64_t fetched_missing_log_cnt = 0;
int64_t rpc_timeout = ATOMIC_LOAD(&g_rpc_timeout);
ObArrayImpl<obrpc::ObCdcLSFetchMissLogReq::MissLogParam> batched_misslog_lsn_arr;
while (OB_SUCC(ret) && ! stop_flag && fetched_missing_log_cnt < total_misslog_cnt) {
batched_misslog_lsn_arr.reset();
if (OB_FAIL(build_batch_misslog_lsn_arr_(
fetched_missing_log_cnt,
missing_info,
batched_misslog_lsn_arr))) {
LOG_ERROR("build_batch_misslog_lsn_arr_ failed", KR(ret),
K(missing_info), K(fetched_missing_log_cnt));
} else if (OB_FAIL(fetch_miss_log_(
fetch_log_srpc,
*rpc_,
*ls_fetch_ctx_,
batched_misslog_lsn_arr,
svr_,
rpc_timeout))) {
LOG_ERROR("fetch_miss_log_ failed", KR(ret), K_(ls_fetch_ctx), K_(svr), K(batched_misslog_lsn_arr));
} else {
const obrpc::ObRpcResultCode &rcode = fetch_log_srpc.get_result_code();
const obrpc::ObCdcLSFetchLogResp &resp = fetch_log_srpc.get_resp();
if (OB_FAIL(rcode.rcode_) || OB_FAIL(resp.get_err())) {
ret = OB_NEED_RETRY;
LOG_ERROR("fetch log fail on rpc", K(rcode), K(resp), K(batched_misslog_lsn_arr));
} else {
// check next_miss_lsn
bool is_next_miss_lsn_match = false;
palf::LSN next_miss_lsn = resp.get_next_miss_lsn();
const int64_t batch_cnt = batched_misslog_lsn_arr.count();
const int64_t resp_log_cnt = resp.get_log_num();
if (batch_cnt == resp_log_cnt) {
is_next_miss_lsn_match = (batched_misslog_lsn_arr.at(batch_cnt - 1).miss_lsn_ == next_miss_lsn);
} else if (batch_cnt > resp_log_cnt) {
is_next_miss_lsn_match = (batched_misslog_lsn_arr.at(resp_log_cnt).miss_lsn_ == next_miss_lsn);
} else {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("too many misslog fetched", KR(ret), K(next_miss_lsn), K(batch_cnt),
K(resp_log_cnt),K(resp), K_(ls_fetch_ctx));
}
if (OB_SUCC(ret)) {
if (!is_next_miss_lsn_match) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("misslog fetched is not match batched_misslog_lsn_arr requested", KR(ret),
K(next_miss_lsn), K(batch_cnt), K(resp_log_cnt), K(batched_misslog_lsn_arr), K(resp), K_(ls_fetch_ctx));
} else if (OB_FAIL(read_batch_misslog_(
resp,
fetched_missing_log_cnt,
tsi,
missing_info))) {
// expected no misslog found while resolving normal log.
LOG_ERROR("read_batch_misslog failed", KR(ret), K_(ls_fetch_ctx),
K(fetched_missing_log_cnt), K(missing_info));
}
}
}
}
}
}
return ret;
@ -1734,13 +1823,13 @@ int FetchStream::build_batch_misslog_lsn_arr_(
{
int ret = OB_SUCCESS;
int64_t batched_cnt = 0;
static int64_t MAX_MISSLOG_CNT_PER_RPC= 100;
static int64_t MAX_MISSLOG_CNT_PER_RPC= 1024;
if (OB_UNLIKELY(0 < batched_misslog_lsn_arr.count())) {
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("invalid batched_misslog_lsn_arr", KR(ret), K(batched_misslog_lsn_arr));
} else {
const ObLogLSNArray &miss_redo_or_state_log_arr = missing_log_info.get_miss_redo_or_state_log_arr();
const ObLogLSNArray &miss_redo_or_state_log_arr = missing_log_info.get_miss_redo_lsn_arr();
int miss_log_cnt = miss_redo_or_state_log_arr.count();
batched_misslog_lsn_arr.reset();
@ -1759,31 +1848,13 @@ int FetchStream::build_batch_misslog_lsn_arr_(
batched_cnt++;
}
}
if (OB_SUCC(ret) && (fetched_log_idx + batched_cnt == miss_log_cnt)) {
// e.g.: already fetched log cnt is 91, current batch_cnt is 9, and total miss_log_cnt is just 100,
// then try to fetch record_log_lsn.
palf::LSN miss_record_lsn;
if (OB_FAIL(missing_log_info.get_miss_record_log_lsn(miss_record_lsn))) {
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
} else {
LOG_ERROR("get_miss_record_log_lsn from missing_log failed", KR(ret), K(missing_log_info));
}
} else {
ObCdcLSFetchMissLogReq::MissLogParam param;
param.miss_lsn_ = miss_record_lsn;
if (OB_FAIL(batched_misslog_lsn_arr.push_back(param))) {
LOG_ERROR("push_back miss_record_log_lsn into batched_misslog_lsn_arr arr failed",
KR(ret), K(missing_log_info), K(param));
}
}
}
}
LOG_DEBUG("build_batch_misslog_lsn_arr_", KR(ret), K(missing_log_info), K(batched_misslog_lsn_arr), K(fetched_log_idx));
LOG_INFO("build_batch_misslog_lsn_arr_", KR(ret),
"tls_id", ls_fetch_ctx_->get_tls_id(),
K(missing_log_info),
"batched_misslog_lsn_count", batched_misslog_lsn_arr.count(),
K(fetched_log_idx));
return ret;
}
@ -1792,19 +1863,17 @@ int FetchStream::read_batch_misslog_(
const obrpc::ObCdcLSFetchLogResp &resp,
int64_t &fetched_missing_log_cnt,
logfetcher::TransStatInfo &tsi,
IObCDCPartTransResolver::MissingLogInfo &org_missing_info,
IObCDCPartTransResolver::MissingLogInfo &new_generated_miss_info)
IObCDCPartTransResolver::MissingLogInfo &missing_info)
{
int ret = OB_SUCCESS;
LOG_INFO("read_batch_misslog_ begin", K(resp), K(fetched_missing_log_cnt));
LOG_INFO("read_batch_misslog_ begin", "tls_id", ls_fetch_ctx_->get_tls_id(), K(resp), K(fetched_missing_log_cnt));
const int64_t total_misslog_cnt = org_missing_info.get_total_misslog_cnt();
const int64_t total_misslog_cnt = missing_info.get_total_misslog_cnt();
const char *buf = resp.get_log_entry_buf();
const int64_t len = resp.get_pos();
int64_t pos = 0;
const int64_t log_cnt = resp.get_log_num();
const ObLogLSNArray &org_misslog_arr = org_missing_info.get_miss_redo_or_state_log_arr();
new_generated_miss_info.set_resolving_miss_log();
const ObLogLSNArray &org_misslog_arr = missing_info.get_miss_redo_lsn_arr();
int64_t start_ts = get_timestamp();
if (OB_UNLIKELY(log_cnt <= 0)) {
@ -1815,21 +1884,23 @@ int FetchStream::read_batch_misslog_(
if (fetched_missing_log_cnt >= total_misslog_cnt) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("fetched_missing_log_cnt is more than total_misslog_cnt", KR(ret),
K(fetched_missing_log_cnt), K(org_missing_info), K(idx), K(resp));
K(fetched_missing_log_cnt), K(missing_info), K(idx), K(resp));
} else {
palf::LSN misslog_lsn;
palf::LogEntry miss_log_entry;
misslog_lsn.reset();
miss_log_entry.reset();
IObCDCPartTransResolver::MissingLogInfo tmp_miss_info;
tmp_miss_info.set_resolving_miss_log();
if (org_misslog_arr.count() == fetched_missing_log_cnt) {
// already consume the all miss_redo_log, but still exist one miss_record_log.
// lsn record_log is the last miss_log to fetch.
if (OB_FAIL(org_missing_info.get_miss_record_log_lsn(misslog_lsn))) {
if (OB_FAIL(missing_info.get_miss_record_or_state_log_lsn(misslog_lsn))) {
if (OB_ENTRY_NOT_EXIST == ret) {
LOG_ERROR("expect valid miss-record_log_lsn", KR(ret), K(org_missing_info), K(fetched_missing_log_cnt), K_(ls_fetch_ctx));
LOG_ERROR("expect valid miss-record_log_lsn", KR(ret), K(missing_info), K(fetched_missing_log_cnt), K_(ls_fetch_ctx));
} else {
LOG_ERROR("get_miss_record_log_lsn failed", KR(ret), K(org_missing_info), K(fetched_missing_log_cnt), K_(ls_fetch_ctx));
LOG_ERROR("get_miss_record_or_state_log_lsn failed", KR(ret), K(missing_info), K(fetched_missing_log_cnt), K_(ls_fetch_ctx));
}
}
} else if (OB_FAIL(org_misslog_arr.at(fetched_missing_log_cnt, misslog_lsn))) {
@ -1840,9 +1911,9 @@ int FetchStream::read_batch_misslog_(
if (OB_FAIL(ret)) {
} else if (OB_FAIL(miss_log_entry.deserialize(buf, len, pos))) {
LOG_ERROR("deserialize miss_log_entry fail", KR(ret), K(len), K(pos));
} else if (OB_FAIL(ls_fetch_ctx_->read_miss_tx_log(miss_log_entry, misslog_lsn, tsi, new_generated_miss_info))) {
LOG_ERROR("read_miss_log fail", KR(ret), K(miss_log_entry), K(new_generated_miss_info),
K(misslog_lsn), K(fetched_missing_log_cnt), K(idx));
} else if (OB_FAIL(ls_fetch_ctx_->read_miss_tx_log(miss_log_entry, misslog_lsn, tsi, tmp_miss_info))) {
LOG_ERROR("read_miss_log fail", KR(ret), K(miss_log_entry),
K(misslog_lsn), K(fetched_missing_log_cnt), K(idx), K(tmp_miss_info));
} else {
fetched_missing_log_cnt++;
}
@ -1851,7 +1922,7 @@ int FetchStream::read_batch_misslog_(
}
int64_t read_batch_missing_cost = get_timestamp() - start_ts;
LOG_INFO("read_batch_misslog_ end", KR(ret), K(read_batch_missing_cost),
LOG_INFO("read_batch_misslog_ end", KR(ret), "tls_id", ls_fetch_ctx_->get_tls_id(), K(read_batch_missing_cost),
K(fetched_missing_log_cnt), K(resp), K(start_ts));
return ret;

View File

@ -251,8 +251,8 @@ private:
// handle if found misslog while read_log_
//
// @param [in] log_entry LogEntry
// @param [in] org_missing_info MissingLogInfo
// @param [in] tsi logfetcher::TransStatInfo
// @param [in] missing_info MissingLogInfo
// @param [in] tsi TransStatInfo
// @param [out] fail_reason KickOutReason
//
// @retval OB_SUCCESS success
@ -260,7 +260,19 @@ private:
// @retval other error code fail
int handle_log_miss_(
palf::LogEntry &log_entry,
IObCDCPartTransResolver::MissingLogInfo &org_missing_info,
IObCDCPartTransResolver::MissingLogInfo &missing_info,
logfetcher::TransStatInfo &tsi,
volatile bool &stop_flag,
KickOutReason &fail_reason);
int handle_miss_record_or_state_log_(
FetchLogSRpc &fetch_log_srpc,
IObCDCPartTransResolver::MissingLogInfo &missing_info,
logfetcher::TransStatInfo &tsi,
volatile bool &stop_flag,
KickOutReason &fail_reason);
int handle_miss_redo_log_(
FetchLogSRpc &fetch_log_srpc,
IObCDCPartTransResolver::MissingLogInfo &missing_info,
logfetcher::TransStatInfo &tsi,
volatile bool &stop_flag,
KickOutReason &fail_reason);
@ -274,8 +286,7 @@ private:
const obrpc::ObCdcLSFetchLogResp &resp,
int64_t &fetched_missing_log_cnt,
logfetcher::TransStatInfo &tsi,
IObCDCPartTransResolver::MissingLogInfo &org_missing_info,
IObCDCPartTransResolver::MissingLogInfo &new_generated_miss_info);
IObCDCPartTransResolver::MissingLogInfo &missing_info);
int alloc_fetch_log_srpc_(FetchLogSRpc *&fetch_log_srpc);
void free_fetch_log_srpc_(FetchLogSRpc *fetch_log_srpc);
// TODO @bohou handle missing log end

View File

@ -2141,6 +2141,7 @@ PartTransTask::PartTransTask() :
ref_cnt_(0),
multi_data_source_node_arr_(),
multi_data_source_info_(),
segment_buf_(),
checkpoint_seq_(0),
global_trans_seq_(0),
global_schema_version_(OB_INVALID_VERSION),
@ -2275,6 +2276,7 @@ void PartTransTask::reset()
ref_cnt_ = 0;
multi_data_source_node_arr_.reset();
multi_data_source_info_.reset();
segment_buf_.reset();
checkpoint_seq_ = 0;
global_trans_seq_ = 0;
global_schema_version_ = OB_INVALID_VERSION;

View File

@ -22,6 +22,7 @@
#include "common/ob_queue_thread.h" // ObCond
#include "ob_cdc_tablet_to_table_info.h" // ObCDCTabletChangeInfo
#include "storage/tx/ob_trans_define.h" // ObTransID, ObLSLogInfoArray
#include "storage/tx/ob_tx_big_segment_buf.h" // ObTxBigSegmentBuf
#include "storage/memtable/ob_memtable_mutator.h" // ObMemtableMutatorRow, ObMemtableMutatorMeta
#include "storage/blocksstable/ob_datum_row.h" // ObRowDml
#include "logservice/data_dictionary/ob_data_dict_storager.h" // ObDataDictStorage
@ -1091,6 +1092,7 @@ public:
{
return ! sorted_redo_list_.has_dispatched_but_unsorted_redo();
}
transaction::ObTxBigSegmentBuf *get_segment_buf() { return &segment_buf_; }
int push_multi_data_source_data(
const palf::LSN &lsn,
const transaction::ObTxBufferNodeArray &mds_data_arr,
@ -1276,6 +1278,7 @@ private:
// For MultiDataSource
MultiDataSourceNodeArray multi_data_source_node_arr_; // array record MultiDataSourceNode
MultiDataSourceInfo multi_data_source_info_; // MultiDataSourceInfo
transaction::ObTxBigSegmentBuf segment_buf_; // ObTxBigSegmentBuf for Big Tx Log
// checkpoint seq number
//

View File

@ -21,6 +21,7 @@
#include "ob_log_store_service.h" // IObStoreService
#include "ob_log_utils.h" // get_timestamp
#include "ob_log_factory.h" // ReadLogBuf, ReadLogBufFactory
#include "ob_log_trace_id.h" // ObLogTraceIdGuard
using namespace oceanbase::common;
@ -168,6 +169,7 @@ void ObLogReader::print_stat_info()
int ObLogReader::handle(void *data, const int64_t thread_index, volatile bool &stop_flag)
{
int ret = OB_SUCCESS;
ObLogTraceIdGuard trace_guard;
ObLogEntryTask *task = static_cast<ObLogEntryTask *>(data);
if (OB_UNLIKELY(! inited_)) {

View File

@ -18,6 +18,7 @@
#include "storage/tx/ob_trans_define.h" // ObTransID
#include "ob_log_trace_id.h" // ObLogTraceIdGuard
#include "ob_log_part_trans_task.h" // PartTransTask
#include "ob_log_task_pool.h" // ObLogTransTaskPool
#include "ob_log_binlog_record_pool.h" // ObLogBRPool
@ -470,6 +471,7 @@ int ObLogResourceCollector::handle(void *data,
volatile bool &stop_flag)
{
int ret = OB_SUCCESS;
ObLogTraceIdGuard trace_guard;
ObLogResourceRecycleTask *recycle_task = NULL;
if (OB_UNLIKELY(! inited_)) {

View File

@ -29,6 +29,7 @@
#include "ob_log_meta_data_struct.h" // ObDictTenantInfo
#include "ob_log_ddl_processor.h" // ObLogDDLProcessor
#include "ob_log_meta_data_service.h" // GLOGMETADATASERVICE
#include "ob_log_trace_id.h" // ObLogTraceIdGuard
#define _STAT(level, tag_str, args...) _OBLOG_SEQUENCER_LOG(level, "[STAT] [SEQ] " tag_str, ##args)
#define STAT(level, tag_str, args...) OBLOG_SEQUENCER_LOG(level, "[STAT] [SEQ] " tag_str, ##args)
@ -250,6 +251,7 @@ void ObLogSequencer::get_task_count(SeqStatInfo &stat_info)
// A thread is responsible for continually rotating the sequence of transactions that need sequence
void ObLogSequencer::run1()
{
ObLogTraceIdGuard trace_guard;
const int64_t SLEEP_US = 1000;
lib::set_thread_name("ObLogSequencerTrans");
int ret = OB_SUCCESS;
@ -389,6 +391,7 @@ int ObLogSequencer::handle_to_be_sequenced_trans_(TrxSortElem &trx_sort_elem,
int ObLogSequencer::handle(void *data, const int64_t thread_index, volatile bool &stop_flag)
{
int ret = OB_SUCCESS;
ObLogTraceIdGuard trace_guard;
PartTransTask *part_trans_task = static_cast<PartTransTask *>(data);
(void)ATOMIC_AAF(&queue_part_trans_task_count_, -1);

View File

@ -21,6 +21,7 @@
#include "ob_log_store_key.h"
#include "ob_log_store_task.h"
#include "ob_log_factory.h" // ObLogStoreTaskFactory
#include "ob_log_trace_id.h" // ObLogTraceIdGuard
using namespace oceanbase::common;
@ -177,6 +178,7 @@ void ObLogStorager::thread_end()
int ObLogStorager::handle(void *data, const int64_t thread_index, volatile bool &stop_flag)
{
int ret = OB_SUCCESS;
ObLogTraceIdGuard trace_guard;
IObLogBatchBufTask *task = static_cast<IObLogBatchBufTask *>(data);
Block *block = static_cast<Block *>(task);
LOG_DEBUG("Storager handle succ", "addr", &task, KPC(task), KPC(block));

View File

@ -21,6 +21,7 @@
#include "ob_log_schema_getter.h" // IObLogSchemaGetter
#include "ob_log_tenant_mgr.h" // IObLogTenantMgr
#include "ob_log_config.h" // TCONF
#include "ob_log_trace_id.h"
#define _STAT(level, fmt, args...) _OBLOG_LOG(level, "[STAT] [SYS_LS_HANDLER] " fmt, ##args)
#define STAT(level, fmt, args...) OBLOG_LOG(level, "[STAT] [SYS_LS_HANDLER] " fmt, ##args)
@ -426,6 +427,7 @@ int ObLogSysLsTaskHandler::dispatch_task_(
void ObLogSysLsTaskHandler::handle_task_routine()
{
int ret = OB_SUCCESS;
ObLogTraceIdGuard trace_guard;
while (! stop_flag_ && OB_SUCCESS == ret) {
PartTransTask *task = NULL;

View File

@ -74,7 +74,10 @@ typedef SortedLightyList<LogEntryNode> SortedLogEntryArray;
class SortedLogEntryInfo
{
public:
SortedLogEntryInfo() : last_fetched_redo_log_entry_(NULL), fetched_log_entry_arr_(true), recorded_lsn_arr_() {}
SortedLogEntryInfo() :
last_fetched_redo_log_entry_(NULL),
fetched_log_entry_arr_(true), /*is_unique*/
recorded_lsn_arr_() {}
~SortedLogEntryInfo() { reset(); }
void reset()
{
@ -99,7 +102,11 @@ public:
SortedLogEntryArray &get_fetched_log_entry_node_arr() { return fetched_log_entry_arr_; }
TO_STRING_KV(K_(fetched_log_entry_arr), K_(recorded_lsn_arr));
TO_STRING_KV(
"fetched_log_entry_count", fetched_log_entry_arr_.count(),
"recorded_lsn_count", recorded_lsn_arr_.count(),
K_(fetched_log_entry_arr),
K_(recorded_lsn_arr));
private:
LogEntryNode *last_fetched_redo_log_entry_;
// hold all fetched log_entry_info.(include lsn of log_entry which contains redo_log and rollback_to log)

View File

@ -20,6 +20,7 @@
#include "ob_log_trans_msg_sorter.h"
#include "ob_log_instance.h" // IObLogErrHandler
#include "ob_log_trace_id.h" // ObLogTraceIdGuard
#define RETRY_FUNC_ON_ERROR_WITH_USLEEP(err_no, var, func, args...) \
do {\
@ -186,6 +187,7 @@ void ObLogTransMsgSorter::mark_stop_flag()
void ObLogTransMsgSorter::handle(void *data)
{
int ret = OB_SUCCESS;
ObLogTraceIdGuard trace_guard;
TransCtx *trans = NULL;
if (OB_UNLIKELY(IS_NOT_INIT)) {

View File

@ -17,9 +17,9 @@ cluster_db_name=oceanbase
cluster_id_black_list=|
cluster_id_black_value_max=2147483647
cluster_id_black_value_min=2147473648
cluster_password=admin
cluster_url=***
cluster_user=admin@sys
cluster_password=default
cluster_url=|
cluster_user=default
cluster_version_refresh_interval_sec=600
config_fpath=etc/libobcdc.conf
data_start_schema_version=|
@ -35,11 +35,13 @@ enable_filter_sys_tenant=0
enable_formatter_print_log=0
enable_global_unique_index_belong_to_multi_instance=0
enable_hbase_mode=0
enable_log_limit=1
enable_oracle_mode_match_case_sensitive=0
enable_output_hidden_primary_key=1
enable_output_invisible_column=0
enable_output_trans_order_by_sql_operation=0
enable_verify_mode=1
extra_redo_dispatch_memory_size=4M
fetch_log_rpc_timeout_sec=15
fetch_stream_cached_count=16
fetching_log_mode=integrated
@ -47,21 +49,21 @@ formatter_thread_num=10
global_data_start_schema_version=0
history_schema_version_count=16
idle_pool_thread_num=4
init_log_level=ALL.*:INFO;SHARE.SCHEMA:INFO
init_log_level=ALL.*:INFO;PALF.*:WARN;SHARE.SCHEMA:INFO
instance_index=0
instance_num=1
io_thread_num=4
lob_data_merger_thread_num=2
log_clean_cycle_time_in_hours=24
log_entry_task_prealloc_count=100000
log_level=ALL.*:INFO;PALF.*:WARN;SHARE.SCHEMA:WARN
log_level=ALL.*:WARN;STORAGE.TRANS:DEBUG;TLOG.*:INFO;TLOG.FETCHER:DEBUG
log_router_background_refresh_interval_sec=10
ls_count_upper_limit=2000000
ls_fetch_progress_update_timeout_sec=15
ls_fetch_progress_update_timeout_sec_for_lagged_replica=3
max_log_file_count=40
memory_limit=20G
meta_data_refresh_mode=data_dict
meta_data_refresh_mode=online
msg_sorter_task_count_upper_limit=200000
msg_sorter_thread_num=1
need_verify_ob_trace_id=0
@ -88,7 +90,7 @@ reader_queue_length=102400
reader_thread_num=10
ready_to_seq_task_upper_bound=20000
redo_dispatched_memory_limit_exceed_ratio=2
redo_dispatcher_memory_limit=1G
redo_dispatcher_memory_limit=512M
region=default_region
resource_collector_thread_num=10
resource_collector_thread_num_for_br=7
@ -116,7 +118,7 @@ sql_server_change_interval_sec=60
ssl_client_authentication=0
ssl_external_kms_info=|
start_lsn_locator_batch_count=5
start_lsn_locator_locate_count=3
start_lsn_locator_locate_count=1
start_lsn_locator_rpc_timeout_sec=60
start_lsn_locator_thread_num=4
storager_mem_percentage=2
@ -131,7 +133,7 @@ system_memory_avail_percentage_lower_bound=10
tablegroup_black_list=|
tablegroup_white_list=*.*
tb_black_list=|
tb_white_list=oracle.*.*
tb_white_list=*cdc*.*.*
tenant_manager_memory_upper_limit=5G
tenant_sql_connect_timeout_sec=40
tenant_sql_query_timeout_sec=30

View File

@ -24668,7 +24668,6 @@ int ObDDLService::drop_database(const ObDropDatabaseArg &arg,
actual_trans, schema_guard, &arg.ddl_stmt_str_))) {
LOG_WARN("drop database to recyclebin failed", K(arg), K(ret));
}
(void) actual_trans.disable_serialize_inc_schemas();
} else {
if (OB_FAIL(ret)) {
// FAIL

View File

@ -28,7 +28,7 @@
#include "share/scn.h"
//#include <cstdint>
// #define OB_TX_MDS_LOG_USE_BIT_SEGMENT_BUF
#define OB_TX_MDS_LOG_USE_BIT_SEGMENT_BUF
namespace oceanbase
{

View File

@ -152,7 +152,7 @@ TEST(ObCDCPartTransResolver, test_misslog_info_basic)
{
int ret = OB_SUCCESS;
IObCDCPartTransResolver::MissingLogInfo missing_info;
ObLogLSNArray &missing_log_id = missing_info.get_miss_redo_or_state_log_arr();
ObLogLSNArray &missing_log_id = missing_info.get_miss_redo_lsn_arr();
// prepare data
palf::LSN lsn_1(1);
@ -366,7 +366,7 @@ TEST(ObCDCPartTransResolver, test_sp_tx_seq2_miss)
LOG_DEBUG("read log2", K(lsn), K(lsn2), K(missing_info));
EXPECT_TRUE(missing_info.need_reconsume_commit_log_entry());
EXPECT_EQ(1, missing_info.get_total_misslog_cnt());
EXPECT_EQ(lsn, missing_info.get_miss_redo_or_state_log_arr().at(0));
EXPECT_EQ(lsn, missing_info.get_miss_redo_lsn_arr().at(0));
missing_info.reset();
missing_info.set_resolving_miss_log();
IObCDCPartTransResolver::MissingLogInfo new_miss_log;
@ -425,7 +425,6 @@ TEST(ObCDCPartTransResolver, test_sp_tx_seq3_miss)
EXPECT_EQ(0, new_miss_log.get_total_misslog_cnt());
EXPECT_TRUE(missing_info.need_reconsume_commit_log_entry());
IObCDCPartTransResolver::MissingLogInfo reconsume_miss_info;
reconsume_miss_info.set_resolving_miss_log();
reconsume_miss_info.set_need_reconsume_commit_log_entry();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry2, lsn2, reconsume_miss_info, tsi, stop_flag));
EXPECT_EQ(0, reconsume_miss_info.get_total_misslog_cnt());
@ -478,13 +477,12 @@ TEST(ObCDCPartTransResolver, test_sp_tx_seq4_miss_1)
EXPECT_EQ(OB_ITEM_NOT_SETTED, ls_fetch_ctx->read_log(log_entry2, lsn2, missing_info, tsi, stop_flag));
EXPECT_EQ(1, missing_info.get_total_misslog_cnt());
EXPECT_EQ(lsn, missing_info.get_miss_redo_or_state_log_arr().at(0));
EXPECT_EQ(lsn, missing_info.get_miss_redo_lsn_arr().at(0));
EXPECT_FALSE(missing_info.need_reconsume_commit_log_entry()); // commit_info entry is not miss_log, need reconsume
IObCDCPartTransResolver::MissingLogInfo new_miss_log;
new_miss_log.set_resolving_miss_log();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_miss_tx_log(log_entry, lsn, tsi, new_miss_log));
IObCDCPartTransResolver::MissingLogInfo reconsume_miss_info;
reconsume_miss_info.set_resolving_miss_log();
reconsume_miss_info.set_need_reconsume_commit_log_entry();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry2, lsn2, reconsume_miss_info, tsi, stop_flag));
missing_info.reset();
@ -513,20 +511,20 @@ TEST(ObCDCPartTransResolver, test_sp_tx_seq4_miss_2)
EXPECT_EQ(OB_ITEM_NOT_SETTED, ls_fetch_ctx->read_log(log_entry3, lsn3, missing_info, tsi, stop_flag));
EXPECT_EQ(1, missing_info.get_total_misslog_cnt());
EXPECT_EQ(lsn2, missing_info.get_miss_redo_or_state_log_arr().at(0));
EXPECT_EQ(lsn2, missing_info.miss_record_or_state_log_lsn_);
EXPECT_TRUE(missing_info.need_reconsume_commit_log_entry());
IObCDCPartTransResolver::MissingLogInfo new_miss_log;
new_miss_log.set_resolving_miss_log();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_miss_tx_log(log_entry2, lsn2, tsi, new_miss_log));
EXPECT_EQ(OB_ITEM_NOT_SETTED, ls_fetch_ctx->read_miss_tx_log(log_entry2, lsn2, tsi, new_miss_log));
EXPECT_EQ(1, new_miss_log.get_total_misslog_cnt());
EXPECT_EQ(lsn, new_miss_log.get_miss_redo_or_state_log_arr().at(0));
EXPECT_EQ(lsn, new_miss_log.get_miss_redo_lsn_arr().at(0));
EXPECT_FALSE(new_miss_log.need_reconsume_commit_log_entry());
IObCDCPartTransResolver::MissingLogInfo new_miss_log_2;
new_miss_log_2.set_resolving_miss_log();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_miss_tx_log(log_entry, lsn, tsi, new_miss_log_2));
IObCDCPartTransResolver::MissingLogInfo reconsume_miss_info;
reconsume_miss_info.set_resolving_miss_log();
reconsume_miss_info.set_need_reconsume_commit_log_entry();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry3, lsn3, reconsume_miss_info, tsi, stop_flag));
@ -579,13 +577,12 @@ TEST(ObCDCPartTransResolver, test_sp_tx_seq5_miss)
EXPECT_EQ(OB_ITEM_NOT_SETTED, ls_fetch_ctx->read_log(log_entry2, lsn2, missing_info, tsi, stop_flag));
EXPECT_EQ(1, missing_info.get_total_misslog_cnt());
EXPECT_EQ(lsn, missing_info.get_miss_redo_or_state_log_arr().at(0));
EXPECT_EQ(lsn, missing_info.get_miss_redo_lsn_arr().at(0));
EXPECT_FALSE(missing_info.need_reconsume_commit_log_entry()); // commit_info entry is not miss_log, need reconsume
IObCDCPartTransResolver::MissingLogInfo new_miss_log;
new_miss_log.set_resolving_miss_log();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_miss_tx_log(log_entry, lsn, tsi, new_miss_log));
IObCDCPartTransResolver::MissingLogInfo reconsume_miss_info;
reconsume_miss_info.set_resolving_miss_log();
reconsume_miss_info.set_need_reconsume_commit_log_entry();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry2, lsn2, reconsume_miss_info, tsi, stop_flag));
missing_info.reset();
@ -644,8 +641,8 @@ TEST(ObCDCPartTransResolver, test_sp_tx_seq6_miss)
EXPECT_EQ(OB_ITEM_NOT_SETTED, ls_fetch_ctx->read_log(log_entry3, lsn3, missing_info, tsi, stop_flag));
EXPECT_EQ(2, missing_info.get_total_misslog_cnt());
EXPECT_EQ(lsn, missing_info.get_miss_redo_or_state_log_arr().at(0));
EXPECT_EQ(lsn2, missing_info.get_miss_redo_or_state_log_arr().at(1));
EXPECT_EQ(lsn, missing_info.get_miss_redo_lsn_arr().at(0));
EXPECT_EQ(lsn2, missing_info.get_miss_redo_lsn_arr().at(1));
EXPECT_TRUE(missing_info.need_reconsume_commit_log_entry());
IObCDCPartTransResolver::MissingLogInfo new_miss_log;
new_miss_log.set_resolving_miss_log();
@ -654,7 +651,6 @@ TEST(ObCDCPartTransResolver, test_sp_tx_seq6_miss)
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_miss_tx_log(log_entry2, lsn2, tsi, new_miss_log));
IObCDCPartTransResolver::MissingLogInfo reconsume_miss_info;
reconsume_miss_info.set_resolving_miss_log();
reconsume_miss_info.set_need_reconsume_commit_log_entry();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry3, lsn3, reconsume_miss_info, tsi, stop_flag));
@ -738,19 +734,18 @@ TEST(ObCDCPartTransResolver, test_sp_tx_dist_miss2)
EXPECT_EQ(OB_ITEM_NOT_SETTED, ls_fetch_ctx->read_log(log_entry3, lsn3, missing_info, tsi, stop_flag));
EXPECT_EQ(1, missing_info.get_total_misslog_cnt());
EXPECT_EQ(lsn2, missing_info.get_miss_redo_or_state_log_arr().at(0));
EXPECT_EQ(lsn2, missing_info.miss_record_or_state_log_lsn_);
EXPECT_TRUE(missing_info.need_reconsume_commit_log_entry());
IObCDCPartTransResolver::MissingLogInfo new_miss_log;
new_miss_log.set_resolving_miss_log();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_miss_tx_log(log_entry2, lsn2, tsi, new_miss_log));
EXPECT_EQ(OB_ITEM_NOT_SETTED, ls_fetch_ctx->read_miss_tx_log(log_entry2, lsn2, tsi, new_miss_log));
EXPECT_EQ(1, new_miss_log.get_total_misslog_cnt());
EXPECT_EQ(lsn, new_miss_log.get_miss_redo_or_state_log_arr().at(0));
EXPECT_EQ(lsn, new_miss_log.miss_record_or_state_log_lsn_);
new_miss_log.reset();
new_miss_log.set_resolving_miss_log();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_miss_tx_log(log_entry, lsn, tsi, new_miss_log));
EXPECT_EQ(0, new_miss_log.get_total_misslog_cnt());
missing_info.reset();
missing_info.set_resolving_miss_log();
missing_info.set_need_reconsume_commit_log_entry();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry3, lsn3, missing_info, tsi, stop_flag));
@ -880,14 +875,14 @@ TEST(ObCDCPartTransResolver, test_sp_tx_record_miss)
log_generator.gen_log_entry(log_entry_rc2, lsn_rc2);
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry1, lsn1, missing_info, tsi, stop_flag));
EXPECT_EQ(OB_ITEM_NOT_SETTED, ls_fetch_ctx->read_log(log_entry_rc1, lsn_rc1, missing_info, tsi, stop_flag));
EXPECT_TRUE(missing_info.miss_record_log_lsn_.is_valid());
EXPECT_TRUE(missing_info.miss_record_or_state_log_lsn_.is_valid());
EXPECT_EQ(1, missing_info.get_total_misslog_cnt());
LOG_INFO("", K(lsn), K(lsn_rc0), K(lsn1), K(lsn_rc1), K(lsn2), K(lsn_rc2), K(missing_info));
// EXPECT_EQ(lsn, missing_info.miss_redo_or_state_lsn_arr_.at(0));
IObCDCPartTransResolver::MissingLogInfo missing_info1;
missing_info1.set_resolving_miss_log();
EXPECT_EQ(OB_ITEM_NOT_SETTED, ls_fetch_ctx->read_log(log_entry_rc0, lsn_rc0, missing_info1, tsi, stop_flag));
EXPECT_EQ(lsn, missing_info1.get_miss_redo_or_state_log_arr().at(0));
EXPECT_EQ(lsn, missing_info1.get_miss_redo_lsn_arr().at(0));
IObCDCPartTransResolver::MissingLogInfo missing_info2;
missing_info2.set_resolving_miss_log();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry, lsn, missing_info2, tsi, stop_flag));