diff --git a/src/logservice/libobcdc/src/ob_cdc_lob_data_merger.cpp b/src/logservice/libobcdc/src/ob_cdc_lob_data_merger.cpp index ead9da7dcb..23d733f51d 100644 --- a/src/logservice/libobcdc/src/ob_cdc_lob_data_merger.cpp +++ b/src/logservice/libobcdc/src/ob_cdc_lob_data_merger.cpp @@ -16,6 +16,7 @@ #include "ob_cdc_lob_aux_meta_storager.h" // ObCDCLobAuxMetaStorager #include "ob_log_instance.h" // TCTX #include "ob_log_formatter.h" // IObLogFormatter +#include "ob_log_trace_id.h" // ObLogTraceIdGuard using namespace oceanbase::common; @@ -130,6 +131,7 @@ int ObCDCLobDataMerger::handle(void *data, const int64_t thread_index, volatile { int ret = OB_SUCCESS; set_cdc_thread_name("LobDtMerger", thread_index); + ObLogTraceIdGuard trace_guard; LobColumnFragmentCtx *task = static_cast(data); if (IS_NOT_INIT) { diff --git a/src/logservice/libobcdc/src/ob_cdc_part_trans_resolver.cpp b/src/logservice/libobcdc/src/ob_cdc_part_trans_resolver.cpp index fbeaede64d..d663b77e69 100644 --- a/src/logservice/libobcdc/src/ob_cdc_part_trans_resolver.cpp +++ b/src/logservice/libobcdc/src/ob_cdc_part_trans_resolver.cpp @@ -42,38 +42,38 @@ IObCDCPartTransResolver::MissingLogInfo::~MissingLogInfo() IObCDCPartTransResolver::MissingLogInfo &IObCDCPartTransResolver::MissingLogInfo::operator=(const IObCDCPartTransResolver::MissingLogInfo &miss_log_info) { - this->miss_redo_or_state_lsn_arr_ = miss_log_info.miss_redo_or_state_lsn_arr_; - this->miss_record_log_lsn_ = miss_log_info.miss_record_log_lsn_; + this->miss_redo_lsn_arr_ = miss_log_info.miss_redo_lsn_arr_; + this->miss_record_or_state_log_lsn_ = miss_log_info.miss_record_or_state_log_lsn_; this->need_reconsume_commit_log_entry_ = miss_log_info.need_reconsume_commit_log_entry_; this->is_resolving_miss_log_ = miss_log_info.is_resolving_miss_log_; return *this; } -int IObCDCPartTransResolver::MissingLogInfo::set_miss_record_log_lsn(const palf::LSN &record_log_lsn) +int IObCDCPartTransResolver::MissingLogInfo::set_miss_record_or_state_log_lsn(const palf::LSN &record_log_lsn) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!record_log_lsn.is_valid())) { ret = OB_INVALID_ARGUMENT; - LOG_ERROR("set_miss_record_log_lsn invalid record_log_lsn", KR(ret), K(record_log_lsn)); - } else if (OB_UNLIKELY(miss_record_log_lsn_.is_valid())) { + LOG_ERROR("set_miss_record_or_state_log_lsn invalid record_log_lsn", KR(ret), K(record_log_lsn)); + } else if (OB_UNLIKELY(miss_record_or_state_log_lsn_.is_valid())) { ret = OB_ERR_UNEXPECTED; - LOG_ERROR("miss_record_log_lsn already set, should not set again!", KR(ret), K(record_log_lsn), KPC(this)); + LOG_ERROR("miss_record_or_state_log_lsn already set, should not set again!", KR(ret), K(record_log_lsn), KPC(this)); } else { - miss_record_log_lsn_ = record_log_lsn; + miss_record_or_state_log_lsn_ = record_log_lsn; } return ret; } -int IObCDCPartTransResolver::MissingLogInfo::get_miss_record_log_lsn(palf::LSN &miss_record_lsn) const +int IObCDCPartTransResolver::MissingLogInfo::get_miss_record_or_state_log_lsn(palf::LSN &miss_record_lsn) const { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!miss_record_log_lsn_.is_valid())) { + if (OB_UNLIKELY(!miss_record_or_state_log_lsn_.is_valid())) { ret = OB_ENTRY_NOT_EXIST; } else { - miss_record_lsn = miss_record_log_lsn_; + miss_record_lsn = miss_record_or_state_log_lsn_; } return ret; @@ -86,9 +86,9 @@ int IObCDCPartTransResolver::MissingLogInfo::push_back_single_miss_log_lsn(const if (OB_UNLIKELY(!misslog_lsn.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("misslog lsn invalid", KR(ret), K(misslog_lsn)); - } else if (OB_FAIL(miss_redo_or_state_lsn_arr_.push_back(misslog_lsn))) { + } else if (OB_FAIL(miss_redo_lsn_arr_.push_back(misslog_lsn))) { LOG_ERROR("push_back misslog lsn to missinglog_lsn_array fail", KR(ret), - K(misslog_lsn), K_(miss_redo_or_state_lsn_arr)); + K(misslog_lsn), K_(miss_redo_lsn_arr)); } return ret; @@ -102,9 +102,9 @@ int IObCDCPartTransResolver::MissingLogInfo::push_back_missing_log_lsn_arr(const for(int64_t idx = 0; OB_SUCC(ret) && idx < misslog_lsn_arr.count(); idx++) { const palf::LSN &lsn = misslog_lsn_arr.at(idx); - if (OB_FAIL(miss_redo_or_state_lsn_arr_.push_back(misslog_lsn_arr.at(idx)))) { + if (OB_FAIL(miss_redo_lsn_arr_.push_back(misslog_lsn_arr.at(idx)))) { LOG_ERROR("push_back_missing_log_lsn_arr failed", KR(ret), - K(misslog_lsn_arr), K(idx), K(miss_redo_or_state_lsn_arr_), K(lsn)); + K(misslog_lsn_arr), K(idx), K(miss_redo_lsn_arr_), K(lsn)); } } @@ -113,9 +113,9 @@ int IObCDCPartTransResolver::MissingLogInfo::push_back_missing_log_lsn_arr(const int64_t IObCDCPartTransResolver::MissingLogInfo::get_total_misslog_cnt() const { - int64_t cnt_ret = miss_redo_or_state_lsn_arr_.count(); + int64_t cnt_ret = miss_redo_lsn_arr_.count(); - if (miss_record_log_lsn_.is_valid()) { + if (miss_record_or_state_log_lsn_.is_valid()) { cnt_ret +=1; } @@ -125,7 +125,7 @@ int64_t IObCDCPartTransResolver::MissingLogInfo::get_total_misslog_cnt() const int IObCDCPartTransResolver::MissingLogInfo::sort_and_unique_missing_log_lsn() { auto fn = [](palf::LSN &lsn1, palf::LSN &lsn2) { return lsn1 < lsn2; }; - return sort_and_unique_array(miss_redo_or_state_lsn_arr_, fn); + return sort_and_unique_array(miss_redo_lsn_arr_, fn); } // *************** ObCDCPartTransResolver public functions ***************** // @@ -189,11 +189,22 @@ int ObCDCPartTransResolver::read( while (OB_SUCC(ret)) { transaction::ObTxLogHeader tx_header; - if (OB_FAIL(tx_log_block.get_next_log(tx_header))) { + // + if (OB_FAIL(read_trans_header_( + lsn, + tx_log_block_header.get_tx_id(), + missing_info.is_resolving_miss_log(), + tx_log_block, + tx_header))) { if (OB_ITER_END != ret) { - LOG_ERROR("get_next_log from tx_log_block failed", KR(ret), K_(tls_id), K(lsn), + LOG_ERROR("read_trans_header_ from tx_log_block failed", KR(ret), K_(tls_id), K(lsn), K(tx_log_block_header), K(tx_log_block), K(tx_header), K(has_redo_in_cur_entry)); } + } else if (OB_UNLIKELY(transaction::ObTxLogType::TX_BIG_SEGMENT_LOG == tx_header.get_tx_log_type())) { + // ignore. + LOG_DEBUG("ignore tx_big_segment_log which is not collect complete", + K_(tls_id), K(lsn), K(tx_log_block_header), K(tx_header)); + ret = OB_ITER_END; } else if (missing_info.need_reconsume_commit_log_entry() && ! (transaction::ObTxLogType::TX_COMMIT_LOG == tx_header.get_tx_log_type())) { // ignore tx_log which is not commit_log if is_reconsuming_commit_log_entry. @@ -270,6 +281,57 @@ int ObCDCPartTransResolver::offline(volatile bool &stop_flag) // *************** ObCDCPartTransResolver private functions ***************** // + +int ObCDCPartTransResolver::read_trans_header_( + const palf::LSN &lsn, + const transaction::ObTransID &tx_id, + const bool is_resolving_miss_log, + transaction::ObTxLogBlock &tx_log_block, + transaction::ObTxLogHeader &tx_header) +{ + int ret = OB_SUCCESS; + + if (OB_FAIL(tx_log_block.get_next_log(tx_header))) { + if (OB_LOG_ALREADY_SPLIT == ret) { + // need use big_segment_buf + PartTransTask *part_trans_task = NULL; + + if (OB_UNLIKELY(transaction::ObTxLogType::TX_BIG_SEGMENT_LOG != tx_header.get_tx_log_type())) { + ret = OB_STATE_NOT_MATCH; + LOG_ERROR("expected TX_BIG_SEGMENT_LOG but not", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(tx_header), K(is_resolving_miss_log)); + } else if (OB_FAIL(obtain_task_(tx_id, part_trans_task, is_resolving_miss_log))) { + LOG_ERROR("obtain_task_ failed", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(is_resolving_miss_log)); + } else if (OB_FAIL(tx_log_block.get_next_log(tx_header, part_trans_task->get_segment_buf()))) { + if (OB_LOG_TOO_LARGE == ret) { + // note: will change ret to OB_SUCCESS if push_fetched_log_entry success. + if (OB_FAIL(part_trans_task->push_fetched_log_entry(lsn))) { + LOG_ERROR("push_fetched_log_entry of BigSegmentBuf Log failed", KR(ret)); + } else { + LOG_DEBUG("handle_big_segment_buf part done", K_(tls_id), K(tx_id), K(lsn), K(tx_header)); + } + } else if (OB_NO_NEED_UPDATE == ret) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("consume tx_log_block while segment_buf is collected done and not reseted", KR(ret), + K_(tls_id), K(lsn), K(tx_id), K(tx_header)); + } else if (OB_START_LOG_CURSOR_INVALID == ret) { + // consume tx_big_segment_log in middle of parts log. + // 1. reset segment buf and reset ret to OB_SUCCESS + // 2. not mark fetched_logentry_list. + // 3. wait misslog to find all log_entry of the TX_BIG_SEGMENT_LOG + part_trans_task->get_segment_buf()->reset(); + ret = OB_SUCCESS; + LOG_DEBUG("found half_part of big_segment_buf tx_log, should ignore and fetch by misslog later", + K_(tls_id), K(tx_id), K(lsn), K(tx_header)); + } + } + } else if (OB_ITER_END != ret) { + LOG_ERROR("get_next_log from tx_log_block failed", KR(ret), K_(tls_id), K(lsn), K(tx_id), K(is_resolving_miss_log), K(tx_header)); + } + } + + return ret; +} + int ObCDCPartTransResolver::read_trans_log_( const transaction::ObTxLogBlockHeader &tx_log_block_header, transaction::ObTxLogBlock &tx_log_block, @@ -424,7 +486,13 @@ int ObCDCPartTransResolver::handle_redo_( LOG_ERROR("obtain_task_ fail", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(handling_miss_log)); } else if (OB_FAIL(push_fetched_log_entry_(lsn, *task))) { - LOG_ERROR("push_fetched_log_entry failed", KR(ret), K_(tls_id), K(tx_id), K(lsn), KPC(task)); + if (OB_ENTRY_EXIST == ret) { + LOG_WARN("redo already fetched, ignore", KR(ret), K_(tls_id), K(tx_id), K(lsn), + "task_sorted_log_entry_info", task->get_sorted_log_entry_info()); + ret = OB_SUCCESS; + } else { + LOG_ERROR("push_fetched_log_entry failed", KR(ret), K_(tls_id), K(tx_id), K(lsn), KPC(task)); + } } else if (OB_FAIL(task->push_redo_log( tx_id, lsn, @@ -504,9 +572,9 @@ int ObCDCPartTransResolver::handle_record_( LOG_ERROR("obtain PartTransTask failed", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(record_log), K(missing_info)); } else if (OB_FAIL(part_trans_task->push_back_recored_redo_lsn_arr(prev_redo_lsns, lsn, false/*has_redo_in_cur_entry*/))) { LOG_ERROR("push_back_recored_redo_lsn_arr failed", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(record_log), K(prev_redo_lsns), KPC(part_trans_task)); - } else if (OB_UNLIKELY(! missing_info.is_empty())) { + } else if (OB_UNLIKELY(missing_info.has_miss_record_or_state_log())) { ret = OB_ERR_UNEXPECTED; - LOG_ERROR("expect empty missing_info while resolving record_log", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(record_log), + LOG_ERROR("expect prev miss_record_or_state_log handled while resolving record_log", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(record_log), K(missing_info), KPC(part_trans_task)); } else if (is_resolving_miss_log) { // push back all prev_log_lsns into missing_info @@ -515,7 +583,7 @@ int ObCDCPartTransResolver::handle_record_( K(missing_info), KPC(part_trans_task)); } else if (is_first_record) { part_trans_task->mark_read_first_record(); - } else if (OB_FAIL(missing_info.set_miss_record_log_lsn(prev_record_lsn))) { + } else if (OB_FAIL(missing_info.set_miss_record_or_state_log_lsn(prev_record_lsn))) { LOG_ERROR("push prev_record_lsn into missing_info failed", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(record_log), K(is_first_record), K(missing_info), KPC(part_trans_task)); } else { @@ -528,7 +596,7 @@ int ObCDCPartTransResolver::handle_record_( } else if (is_first_record) { part_trans_task->mark_read_first_record(); LOG_DEBUG("mark_read_first_record", K_(tls_id), K(tx_id), K(lsn), K(record_log)); - } else if (OB_FAIL(missing_info.set_miss_record_log_lsn(prev_record_lsn))) { + } else if (OB_FAIL(missing_info.set_miss_record_or_state_log_lsn(prev_record_lsn))) { LOG_ERROR("push prev_record_lsn into missing_info failed", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(record_log), K(is_first_record), K(missing_info), KPC(part_trans_task)); } @@ -621,6 +689,7 @@ int ObCDCPartTransResolver::handle_commit_info_( const transaction::ObXATransID &xid = commit_info_log.get_xid(); const transaction::ObRedoLSNArray &prev_redo_lsns = commit_info_log.get_redo_lsns(); const palf::LSN &prev_record_lsn = commit_info_log.get_prev_record_lsn(); + const bool has_record_log = prev_record_lsn.is_valid(); if (OB_FAIL(part_trans_task->set_commit_info(trace_id, trace_info, is_dup_tx, xid))) { LOG_ERROR("set_commit_info failed", KR(ret), K_(tls_id), K(lsn), K(commit_info_log), KPC(part_trans_task)); @@ -631,19 +700,21 @@ int ObCDCPartTransResolver::handle_commit_info_( } else if (OB_FAIL(part_trans_task->push_back_recored_redo_lsn_arr(prev_redo_lsns, lsn, has_redo_in_cur_entry))) { LOG_ERROR("push_back_recored_redo_lsn_arr failed", KR(ret), K(prev_redo_lsns), KPC(part_trans_task)); } else if (is_resolving_miss_log) { - // TODO pushback prev_redo_lsns to missing_info; if (OB_FAIL(missing_info.push_back_missing_log_lsn_arr(prev_redo_lsns))) { LOG_ERROR("push_back_missing_log_lsn_arr fail", KR(ret), K(commit_info_log), K(missing_info), KPC(part_trans_task)); - } else if (prev_record_lsn.is_valid() && OB_FAIL(missing_info.set_miss_record_log_lsn(prev_record_lsn))) { - LOG_ERROR("set_miss_record_log_lsn failed", KR(ret), K(commit_info_log), K(missing_info), KPC(part_trans_task)); + } else if (prev_record_lsn.is_valid() && OB_FAIL(missing_info.set_miss_record_or_state_log_lsn(prev_record_lsn))) { + LOG_ERROR("set_miss_record_or_state_log_lsn failed", KR(ret), K(commit_info_log), K(missing_info), KPC(part_trans_task)); } } else if (! part_trans_task->has_find_first_record()) { // check if (1) trans doesn't have record_log; (2) trans has record_log but found log_miss - if (OB_FAIL(check_redo_log_list_(prev_redo_lsns, *part_trans_task, missing_info))) { + if (OB_UNLIKELY(has_record_log && missing_info.need_reconsume_commit_log_entry())) { + ret = OB_STATE_NOT_MATCH; + LOG_ERROR("all record_log should already fetched while reconsume log_entry", KR(ret), K_(tls_id), K(commit_info_log)); + } else if (OB_FAIL(check_redo_log_list_(prev_redo_lsns, *part_trans_task, missing_info))) { LOG_ERROR("check_redo_log_list_ failed", KR(ret), K(commit_info_log), K(missing_info), KPC(part_trans_task)); // To handle log seq like: record redo redo redo commit_info, obcdc start after last record. // prev_record_log_lsn is valid && not find first record - } else if (prev_record_lsn.is_valid() && OB_FAIL(missing_info.set_miss_record_log_lsn(prev_record_lsn))) { + } else if (has_record_log && OB_FAIL(missing_info.set_miss_record_or_state_log_lsn(prev_record_lsn))) { LOG_ERROR("push prev_record_lsn failed", KR(ret), K(prev_record_lsn), K(missing_info), KPC(part_trans_task)); } } else { @@ -683,7 +754,7 @@ int ObCDCPartTransResolver::handle_prepare_( const palf::LSN &commit_info_lsn = prepare_log.get_prev_lsn(); if (commit_info_lsn.is_valid()) { - if (OB_FAIL(missing_info.push_back_single_miss_log_lsn(commit_info_lsn))) { + if (OB_FAIL(missing_info.set_miss_record_or_state_log_lsn(commit_info_lsn))) { LOG_ERROR("push_back_missing_log_lsn fail", KR(ret), K_(tls_id), K(tx_id), K(prepare_log), K(commit_info_lsn), K(missing_info), KPC(part_trans_task)); } else { @@ -743,7 +814,8 @@ int ObCDCPartTransResolver::handle_commit_( } else if (OB_FAIL(obtain_task_(tx_id, part_trans_task, is_resolving_miss_log))) { LOG_ERROR("obtain_part_trans_task fail while reading commit log", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(commit_log), K(missing_info)); - // TODO 下面是否检查sys日志流里非DDL/非LS_TABLE的事务? + } else if (OB_FAIL(part_trans_task->push_multi_data_source_data(lsn, commit_log.get_multi_source_data(), true/*is_commit_log*/))) { + LOG_ERROR("push_multi_data_source_data failed", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(commit_log), KPC(part_trans_task)); } else if (!part_trans_task->has_read_commit_info()) { if (is_resolving_miss_log) { // commit info is miss log and handled done, reconsumeing commit_log @@ -770,7 +842,7 @@ int ObCDCPartTransResolver::handle_commit_( // LOG_ERROR("handle unserverd single CommitLog(commit_log with invalid prev_log_lsn in dist_trans) failed", // KR(ret), K_(tls_id), K(tx_id), K(commit_log), K(lsn)); // } - } else if (OB_FAIL(missing_info.push_back_single_miss_log_lsn(prev_log_lsn))) { + } else if (OB_FAIL(missing_info.set_miss_record_or_state_log_lsn(prev_log_lsn))) { LOG_ERROR("push_back_single_miss_log_lsn failed", KR(ret), K_(tls_id), K(tx_id), K(commit_log), K(missing_info)); } else { missing_info.set_need_reconsume_commit_log_entry(); diff --git a/src/logservice/libobcdc/src/ob_cdc_part_trans_resolver.h b/src/logservice/libobcdc/src/ob_cdc_part_trans_resolver.h index 537a032b5c..780b235334 100644 --- a/src/logservice/libobcdc/src/ob_cdc_part_trans_resolver.h +++ b/src/logservice/libobcdc/src/ob_cdc_part_trans_resolver.h @@ -76,15 +76,15 @@ public: void reset() { - miss_redo_or_state_lsn_arr_.reset(); - miss_record_log_lsn_.reset(); + miss_redo_lsn_arr_.reset(); + miss_record_or_state_log_lsn_.reset(); need_reconsume_commit_log_entry_ = false; is_resolving_miss_log_ = false; } public: /// has misslog or not /// @retval bool ture if has miss_log(including redo/commit_info/prepare/commit and record_log) - bool is_empty() const { return miss_redo_or_state_lsn_arr_.count() <= 0 && !miss_record_log_lsn_.is_valid(); } + bool is_empty() const { return miss_redo_lsn_arr_.count() <= 0 && !miss_record_or_state_log_lsn_.is_valid(); } /// set need reconsume the state log(currently need reconsume commit_info(currently enable reentrant)/commit log) void set_need_reconsume_commit_log_entry() { need_reconsume_commit_log_entry_ = true; } bool need_reconsume_commit_log_entry() const { return need_reconsume_commit_log_entry_; } @@ -92,39 +92,40 @@ public: void set_resolving_miss_log() { is_resolving_miss_log_ = true; } bool is_resolving_miss_log() const { return is_resolving_miss_log_; } - int set_miss_record_log_lsn(const palf::LSN &record_log_lsn); - int get_miss_record_log_lsn(palf::LSN &miss_record_lsn) const; - ObLogLSNArray &get_miss_redo_or_state_log_arr() { return miss_redo_or_state_lsn_arr_; } + int set_miss_record_or_state_log_lsn(const palf::LSN &record_log_lsn); + bool has_miss_record_or_state_log() const { return miss_record_or_state_log_lsn_.is_valid(); } + int get_miss_record_or_state_log_lsn(palf::LSN &miss_record_lsn) const; + ObLogLSNArray &get_miss_redo_lsn_arr() { return miss_redo_lsn_arr_; } + void reset_miss_record_or_state_log_lsn() { miss_record_or_state_log_lsn_.reset(); } int push_back_single_miss_log_lsn(const palf::LSN &misslog_lsn); template int push_back_missing_log_lsn_arr(const LSN_ARRAY &miss_log_lsn_arr); int64_t get_total_misslog_cnt() const; - // 由于record日志要放到所有misslog的最后面去获取,不能排序或者排序时排除record日志的LSN - // 这里只处理miss_redo_or_state_log_lsn_arr int sort_and_unique_missing_log_lsn(); TO_STRING_KV( - K_(miss_redo_or_state_lsn_arr), - K_(miss_record_log_lsn), + "miss_redo_count", miss_redo_lsn_arr_.count(), + K_(miss_redo_lsn_arr), + K_(miss_record_or_state_log_lsn), K_(need_reconsume_commit_log_entry), K_(is_resolving_miss_log)); private: - // miss log lsn array: redo log and state_log(commit_info/prepare) - ObLogLSNArray miss_redo_or_state_lsn_arr_; - // record log lsn - palf::LSN miss_record_log_lsn_; - // if reconsume the log_entry or not after handling miss_log + // miss redo log lsn array + ObLogLSNArray miss_redo_lsn_arr_; + // miss record log or state log(commit_info/prepare) lsn + palf::LSN miss_record_or_state_log_lsn_; + // need reconsume the log_entry or not after handling miss_log or not. // reconsume if: - // (1) find miss_log while resolving commit_log to submit the part_trans_task - // (2) find miss_log while resolving commit_info_log in case of commit_log - // is the the same log_entry. NOTE: won't reconsume if commit_log is a miss_log. + // (1) find miss_log by check redo is complete or not while resolving commit_log + // (2) find miss_log not empty while resolving commit_log(miss_log found while resolving prepare/commit_info log + // with the the same log_entry with commit_log). bool need_reconsume_commit_log_entry_; // resolving miss log - // directly append miss log lsn if found miss log while resolving + // will directly append prev_log lsn while resolving miss_log bool is_resolving_miss_log_; // TODO use a int8_t instead the two bool variable, may add is_reconsuming var for handle commit_info and commit log }; @@ -241,6 +242,12 @@ private: private: // ******* tx log handler ******** // + int read_trans_header_( + const palf::LSN &lsn, + const transaction::ObTransID &tx_id, + const bool is_resolving_miss_log, + transaction::ObTxLogBlock &tx_log_block, + transaction::ObTxLogHeader &tx_header); // read trans log from tx_log_block as ObTxxxxLog and resolve the tx log. int read_trans_log_( const transaction::ObTxLogBlockHeader &tx_log_block_header, diff --git a/src/logservice/libobcdc/src/ob_log_committer.cpp b/src/logservice/libobcdc/src/ob_log_committer.cpp index 0a53038914..28dde0af66 100644 --- a/src/logservice/libobcdc/src/ob_log_committer.cpp +++ b/src/logservice/libobcdc/src/ob_log_committer.cpp @@ -29,6 +29,7 @@ #include "ob_log_binlog_record_pool.h" // IObLogBRPool #include "ob_log_config.h" // ObLogConfig #include "ob_log_tenant_mgr.h" // IObLogTenantMgr +#include "ob_log_trace_id.h" // ObLogTraceIdGuard #define _STAT(level, fmt, args...) _OBLOG_COMMITTER_LOG(level, "[STAT] [COMMITTER] " fmt, ##args) #define STAT(level, fmt, args...) OBLOG_COMMITTER_LOG(level, "[STAT] [COMMITTER] " fmt, ##args) @@ -704,6 +705,7 @@ int ObLogCommitter::dispatch_heartbeat_binlog_record_(const int64_t heartbeat_ti void ObLogCommitter::heartbeat_routine() { int ret = OB_SUCCESS; + ObLogTraceIdGuard trace_guard; if (OB_UNLIKELY(! inited_)) { LOG_ERROR("committer has not been initialized"); @@ -777,6 +779,7 @@ void ObLogCommitter::heartbeat_routine() void ObLogCommitter::commit_routine() { int ret = OB_SUCCESS; + ObLogTraceIdGuard trace_guard; if (OB_UNLIKELY(! inited_)) { LOG_ERROR("committer has not been initialized"); diff --git a/src/logservice/libobcdc/src/ob_log_ddl_parser.cpp b/src/logservice/libobcdc/src/ob_log_ddl_parser.cpp index f17fcefa68..71b4809bc6 100644 --- a/src/logservice/libobcdc/src/ob_log_ddl_parser.cpp +++ b/src/logservice/libobcdc/src/ob_log_ddl_parser.cpp @@ -17,6 +17,7 @@ #include "ob_log_instance.h" // IObLogErrHandler #include "ob_log_part_trans_parser.h" // IObLogPartTransParser #include "ob_log_part_trans_task.h" // PartTransTask +#include "ob_log_trace_id.h" using namespace oceanbase::common; @@ -143,6 +144,7 @@ int ObLogDdlParser::handle(void *data, volatile bool &stop_flag) { int ret = OB_SUCCESS; + ObLogTraceIdGuard trace_guard; PartTransTask *task = (PartTransTask *)data; if (OB_UNLIKELY(! inited_) || OB_ISNULL(part_trans_parser_)) { diff --git a/src/logservice/libobcdc/src/ob_log_dml_parser.cpp b/src/logservice/libobcdc/src/ob_log_dml_parser.cpp index 5f94b08520..da5978fd58 100644 --- a/src/logservice/libobcdc/src/ob_log_dml_parser.cpp +++ b/src/logservice/libobcdc/src/ob_log_dml_parser.cpp @@ -21,6 +21,7 @@ #include "ob_log_part_trans_parser.h" // IObLogPartTransParser #include "ob_ms_queue_thread.h" // BitSet #include "ob_log_resource_collector.h" // IObLogResourceCollector +#include "ob_log_trace_id.h" using namespace oceanbase::common; @@ -161,6 +162,7 @@ int ObLogDmlParser::handle(void *data, volatile bool &stop_flag) { int ret = OB_SUCCESS; + ObLogTraceIdGuard trace_guard; ObLogEntryTask *task = (ObLogEntryTask *)(data); PartTransTask *part_trans_task = NULL; diff --git a/src/logservice/libobcdc/src/ob_log_fetcher_dead_pool.cpp b/src/logservice/libobcdc/src/ob_log_fetcher_dead_pool.cpp index f3f3f4174b..40caed4375 100644 --- a/src/logservice/libobcdc/src/ob_log_fetcher_dead_pool.cpp +++ b/src/logservice/libobcdc/src/ob_log_fetcher_dead_pool.cpp @@ -22,6 +22,7 @@ #include "ob_log_instance.h" // IObLogErrHandler #include "ob_log_ls_fetch_mgr.h" // IObLogLSFetchMgr #include "ob_log_fetcher.h" // IObLogFetcher +#include "ob_log_trace_id.h" // ObLogTraceIdGuard namespace oceanbase { @@ -140,6 +141,7 @@ void ObLogFetcherDeadPool::mark_stop_flag() void ObLogFetcherDeadPool::run(const int64_t thread_index) { int ret = OB_SUCCESS; + ObLogTraceIdGuard trace_guard; if (OB_UNLIKELY(! inited_)) { LOG_ERROR("not inited"); diff --git a/src/logservice/libobcdc/src/ob_log_fetcher_idle_pool.cpp b/src/logservice/libobcdc/src/ob_log_fetcher_idle_pool.cpp index 6330fb0b6a..5d96242c4d 100644 --- a/src/logservice/libobcdc/src/ob_log_fetcher_idle_pool.cpp +++ b/src/logservice/libobcdc/src/ob_log_fetcher_idle_pool.cpp @@ -21,6 +21,7 @@ #include "ob_log_instance.h" // IObLogErrHandler #include "ob_ls_worker.h" // IObLSWorker +#include "ob_log_trace_id.h" // ObLogTraceIdGuard using namespace oceanbase::common; @@ -143,6 +144,7 @@ void ObLogFetcherIdlePool::mark_stop_flag() void ObLogFetcherIdlePool::run(const int64_t thread_index) { int ret = OB_SUCCESS; + ObLogTraceIdGuard trace_guard; if (OB_UNLIKELY(! inited_)) { LOG_ERROR("not inited"); diff --git a/src/logservice/libobcdc/src/ob_log_formatter.cpp b/src/logservice/libobcdc/src/ob_log_formatter.cpp index 44b970633a..00904ff7eb 100644 --- a/src/logservice/libobcdc/src/ob_log_formatter.cpp +++ b/src/logservice/libobcdc/src/ob_log_formatter.cpp @@ -35,7 +35,8 @@ #include "ob_cdc_lob_data_merger.h" // IObCDCLobDataMerger #include "ob_cdc_lob_aux_meta_storager.h" // ObCDCLobAuxMetaStorager #include "ob_cdc_lob_aux_table_parse.h" // ObCDCLobAuxMetaStorager -#include "ob_cdc_udt.h" // ObCDCUdtValueBuilder +#include "ob_cdc_udt.h" // ObCDCUdtValueBuilder +#include "ob_log_trace_id.h" // ObLogTraceIdGuard using namespace oceanbase::common; using namespace oceanbase::storage; @@ -312,6 +313,7 @@ int ObLogFormatter::handle(void *data, const int64_t thread_index, volatile bool { int ret = OB_SUCCESS; set_cdc_thread_name("Formatter", thread_index); + ObLogTraceIdGuard trace_guard; bool cur_stmt_need_callback = false; IStmtTask *stmt_task = static_cast(data); DmlStmtTask *dml_stmt_task = dynamic_cast(stmt_task); diff --git a/src/logservice/libobcdc/src/ob_log_ls_fetch_ctx.cpp b/src/logservice/libobcdc/src/ob_log_ls_fetch_ctx.cpp index f32d027091..9b7c51bf48 100644 --- a/src/logservice/libobcdc/src/ob_log_ls_fetch_ctx.cpp +++ b/src/logservice/libobcdc/src/ob_log_ls_fetch_ctx.cpp @@ -527,11 +527,7 @@ int LSFetchCtx::read_miss_tx_log( } else { if (OB_FAIL(part_trans_resolver_->read(buf, buf_len, pos, lsn, submit_ts, serve_info_, missing, tsi))) { - if (OB_ITEM_NOT_SETTED == ret) { - // if found new generated miss log while resolving misslog, FetchStream will - // found new_generated_missing_info not empty and goon with misslog process - ret = OB_SUCCESS; - } else { + if (OB_ITEM_NOT_SETTED != ret) { LOG_ERROR("resolve miss_log fail", KR(ret), K(log_entry), K(log_base_header), K(lsn), K(missing)); } } diff --git a/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.cpp b/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.cpp index 0e6c8d1ea7..810643c050 100644 --- a/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.cpp +++ b/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.cpp @@ -832,6 +832,7 @@ int FetchStream::read_group_entry_(palf::LogGroupEntry &group_entry, stop_flag))) { if (OB_ITEM_NOT_SETTED == ret) { // handle missing_log_info + missing_info.set_resolving_miss_log(); const bool need_reconsume = missing_info.need_reconsume_commit_log_entry(); KickOutReason fail_reason = NONE; @@ -852,7 +853,6 @@ int FetchStream::read_group_entry_(palf::LogGroupEntry &group_entry, } } else if (need_reconsume) { IObCDCPartTransResolver::MissingLogInfo reconsume_miss_info; - reconsume_miss_info.set_resolving_miss_log(); if (missing_info.need_reconsume_commit_log_entry()) { reconsume_miss_info.set_need_reconsume_commit_log_entry(); } @@ -1466,7 +1466,7 @@ int FetchStream::fetch_miss_log_direct_( resp->set_next_miss_lsn(miss_log_array.at(0).miss_lsn_); while (OB_SUCC(ret) && !stop_fetch) { bool retry_on_err = false; - while(OB_SUCC(ret) && fetched_cnt < arr_cnt && !is_timeout) { + while (OB_SUCC(ret) && fetched_cnt < arr_cnt && !is_timeout) { const int64_t start_fetch_entry_ts = get_timestamp(); const ObCdcLSFetchMissLogReq::MissLogParam ¶m = miss_log_array.at(fetched_cnt); const LSN &missing_lsn = param.miss_lsn_; @@ -1602,126 +1602,215 @@ int FetchStream::fetch_miss_log_( int FetchStream::handle_log_miss_( palf::LogEntry &log_entry, - IObCDCPartTransResolver::MissingLogInfo &org_missing_info, + IObCDCPartTransResolver::MissingLogInfo &missing_info, logfetcher::TransStatInfo &tsi, volatile bool &stop_flag, KickOutReason &fail_reason) { int ret = OB_SUCCESS; - bool misslog_handle_done = false; - if (OB_UNLIKELY(org_missing_info.is_empty())) { + if (OB_UNLIKELY(missing_info.is_empty())) { ret = OB_INVALID_ARGUMENT; - LOG_ERROR("empty missing_info", KR(ret), K(org_missing_info), K(log_entry)); - } else if (OB_FAIL(org_missing_info.sort_and_unique_missing_log_lsn())) { - LOG_ERROR("sort_and_unique_missing_log_lsn failed", KR(ret), K(org_missing_info), K_(ls_fetch_ctx)); + LOG_ERROR("empty missing_info", KR(ret), K(missing_info), K(log_entry)); } else { - IObCDCPartTransResolver::MissingLogInfo handling_misslog_info = org_missing_info; - int64_t fetched_missing_log_cnt = 0; FetchLogSRpc *fetch_log_srpc = NULL; - int64_t rpc_timeout = ATOMIC_LOAD(&g_rpc_timeout); if (OB_FAIL(alloc_fetch_log_srpc_(fetch_log_srpc))) { LOG_ERROR("alloc fetch_log_srpc fail", KR(ret)); } else if (OB_ISNULL(fetch_log_srpc)) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("invalid fetch_log_srpc", KR(ret)); + } else if (OB_FAIL(handle_miss_record_or_state_log_( + *fetch_log_srpc, + missing_info, + tsi, + stop_flag, + fail_reason))) { + LOG_ERROR("handle_miss_record_or_state_log_ failed", KR(ret)); + } else if (OB_FAIL(handle_miss_redo_log_( + *fetch_log_srpc, + missing_info, + tsi, + stop_flag, + fail_reason))) { + LOG_ERROR("handle_miss_redo_log_ failed", KR(ret), KR(ret)); + } + + if (stop_flag) { + ret = OB_IN_STOP_STATE; + } else if (OB_NEED_RETRY == ret) { + fail_reason = KickOutReason::MISSING_LOG_FETCH_FAIL; } else { - // for new generated miss log while handling current misslog - IObCDCPartTransResolver::MissingLogInfo new_generated_miss_info; - new_generated_miss_info.reset(); - - // may found new misslog while resolving misslog, here should handle all found misslog - while (OB_SUCC(ret) && !misslog_handle_done) { - const int64_t total_misslog_cnt = handling_misslog_info.get_total_misslog_cnt(); - - // handle current missing_info(handle by batch) - while (OB_SUCC(ret) && fetched_missing_log_cnt < total_misslog_cnt) { - ObArrayImpl batched_misslog_lsn_arr; - - if (OB_FAIL(build_batch_misslog_lsn_arr_( - fetched_missing_log_cnt, - handling_misslog_info, - batched_misslog_lsn_arr))) { - LOG_ERROR("build_batch_misslog_lsn_arr_ failed", KR(ret), - K(handling_misslog_info), K(fetched_missing_log_cnt)); - } else if (OB_FAIL(fetch_miss_log_(*fetch_log_srpc, *rpc_, - *ls_fetch_ctx_, batched_misslog_lsn_arr, svr_, - rpc_timeout))) { - LOG_ERROR("fetch_miss_log_ failed", KR(ret), K_(ls_fetch_ctx), K_(svr), K(batched_misslog_lsn_arr)); - } else { - const obrpc::ObRpcResultCode &rcode = fetch_log_srpc->get_result_code(); - const obrpc::ObCdcLSFetchLogResp &resp = fetch_log_srpc->get_resp(); // TODO change to fetch_miss_log RPC - - if (OB_FAIL(rcode.rcode_) || OB_FAIL(resp.get_err())) { - ret = OB_NEED_RETRY; - LOG_ERROR("fetch log fail on rpc", K(rcode), K(resp), K(batched_misslog_lsn_arr)); - } else { - // check next_miss_lsn - bool is_next_miss_lsn_match = false; - palf::LSN next_miss_lsn = resp.get_next_miss_lsn(); - const int64_t batch_cnt = batched_misslog_lsn_arr.count(); - const int64_t resp_log_cnt = resp.get_log_num(); - - if (batch_cnt == resp_log_cnt) { - is_next_miss_lsn_match = (batched_misslog_lsn_arr.at(batch_cnt-1).miss_lsn_ == next_miss_lsn); - } else if (batch_cnt > resp_log_cnt) { - is_next_miss_lsn_match = (batched_misslog_lsn_arr.at(resp_log_cnt).miss_lsn_ == next_miss_lsn); - } else { - ret = OB_ERR_UNEXPECTED; - LOG_ERROR("too many misslog fetched", KR(ret), K(next_miss_lsn), K(batch_cnt), - K(resp_log_cnt),K(resp), K_(ls_fetch_ctx)); - } - if (OB_SUCC(ret)) { - if (!is_next_miss_lsn_match) { - ret = OB_ERR_UNEXPECTED; - LOG_ERROR("misslog fetched is not match batched_misslog_lsn_arr requested", KR(ret), - K(next_miss_lsn), K(batch_cnt), K(resp_log_cnt), K(batched_misslog_lsn_arr), K(resp), K_(ls_fetch_ctx)); - } else if (OB_FAIL(read_batch_misslog_( - resp, - fetched_missing_log_cnt, - tsi, - handling_misslog_info, - new_generated_miss_info))) { - LOG_ERROR("read_batch_misslog_ fail", KR(ret), K_(ls_fetch_ctx), - K(fetched_missing_log_cnt), K(handling_misslog_info), K(new_generated_miss_info)); - } - } - } - } - - if (OB_NEED_RETRY == ret) { - fail_reason = KickOutReason::MISSING_LOG_FETCH_FAIL; - } - } - - if (OB_SUCC(ret)) { - // check fetched_missing_log_cnt == total_misslog_cnt - if (OB_UNLIKELY(handling_misslog_info.get_total_misslog_cnt() != fetched_missing_log_cnt)) { - ret = OB_ERR_UNEXPECTED; - LOG_ERROR("misslog not all fetched", KR(ret), K(fetched_missing_log_cnt), K(handling_misslog_info)); - } else if (!new_generated_miss_info.is_empty()) { - // continue handle misslog in new_generated_miss_info - if (OB_FAIL(new_generated_miss_info.sort_and_unique_missing_log_lsn())) { - LOG_ERROR("sort_and_unique_missing_log_lsn failed", KR(ret), K(new_generated_miss_info)); - } else { - fetched_missing_log_cnt = 0; - handling_misslog_info = new_generated_miss_info; // copy_assign - new_generated_miss_info.reset(); // can safely reset - } - } else { - misslog_handle_done = true; - } - } - LOG_INFO("handle miss_log", KR(ret), K(handling_misslog_info), K(misslog_handle_done), K(new_generated_miss_info)); - } + LOG_INFO("handle miss_log done", KR(ret), "tls_id", ls_fetch_ctx_->get_tls_id(), K(missing_info)); } if (OB_NOT_NULL(fetch_log_srpc)) { free_fetch_log_srpc_(fetch_log_srpc); fetch_log_srpc = NULL; } + } + return ret; +} + +int FetchStream::handle_miss_record_or_state_log_( + FetchLogSRpc &fetch_log_srpc, + IObCDCPartTransResolver::MissingLogInfo &missing_info, + logfetcher::TransStatInfo &tsi, + volatile bool &stop_flag, + KickOutReason &fail_reason) +{ + int ret = OB_SUCCESS; + + if (missing_info.has_miss_record_or_state_log()) { + int64_t rpc_timeout = ATOMIC_LOAD(&g_rpc_timeout); + ObArrayImpl batched_misslog_lsn_arr; + palf::LSN misslog_lsn; + + while (OB_SUCC(ret) && ! stop_flag && missing_info.has_miss_record_or_state_log()) { + misslog_lsn.reset(); + batched_misslog_lsn_arr.reset(); + ObCdcLSFetchMissLogReq::MissLogParam param; + + if (OB_FAIL(missing_info.get_miss_record_or_state_log_lsn(misslog_lsn))) { + LOG_ERROR("get_miss_record_or_state_log_lsn failed", K(missing_info), K(misslog_lsn)); + } else { + param.miss_lsn_ = misslog_lsn; + + if (OB_FAIL(batched_misslog_lsn_arr.push_back(param))) { + LOG_ERROR("push_back miss_record_or_state_log_lsn into batched_misslog_lsn_arr failed", KR(ret), K(param)); + } else if (OB_FAIL(fetch_miss_log_( + fetch_log_srpc, + *rpc_, + *ls_fetch_ctx_, + batched_misslog_lsn_arr, + svr_, + rpc_timeout))) { + LOG_ERROR("fetch_miss_log_ failed", KR(ret), K_(ls_fetch_ctx), K_(svr), K(batched_misslog_lsn_arr)); + } else { + const obrpc::ObRpcResultCode &rcode = fetch_log_srpc.get_result_code(); + const obrpc::ObCdcLSFetchLogResp &resp = fetch_log_srpc.get_resp(); + + if (OB_FAIL(rcode.rcode_) || OB_FAIL(resp.get_err())) { + ret = OB_NEED_RETRY; + LOG_ERROR("fetch log fail on rpc", K(rcode), K(resp), K(batched_misslog_lsn_arr)); + } else if (resp.get_log_num() < 1) { + LOG_INFO("fetch_miss_log_rpc doesn't fetch log, retry", K(misslog_lsn), K_(svr)); + } else if (OB_UNLIKELY(resp.get_log_num() > 1)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("expect only one misslog while fetching miss_record_or_state_log", K(resp)); + } else if (OB_UNLIKELY(resp.get_next_miss_lsn() != misslog_lsn)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("fetched log not match miss_log_lsn", KR(ret), K(misslog_lsn), K(resp)); + } else { + missing_info.reset_miss_record_or_state_log_lsn(); + palf::LogEntry miss_log_entry; + miss_log_entry.reset(); + const char *buf = resp.get_log_entry_buf(); + const int64_t len = resp.get_pos(); + int64_t pos = 0; + + if (OB_FAIL(miss_log_entry.deserialize(buf, len, pos))) { + LOG_ERROR("deserialize log_entry of miss_record_or_state_log failed", KR(ret), K(misslog_lsn), KP(buf), K(len), K(pos)); + } else if (OB_FAIL(ls_fetch_ctx_->read_miss_tx_log(miss_log_entry, misslog_lsn, tsi, missing_info))) { + if (OB_ITEM_NOT_SETTED == ret) { + ret = OB_SUCCESS; + LOG_INFO("found new miss_record_or_state_log while resolving current miss_record_or_state_log", + "tls_id", ls_fetch_ctx_->get_tls_id(), K(misslog_lsn), K(missing_info)); + } else { + LOG_ERROR("read miss_log failed", KR(ret), K(miss_log_entry), K(misslog_lsn), K(missing_info)); + } + } + } + } + } + } + + if (OB_SUCC(ret)) { + LOG_INFO("fetch record and state misslog done and collect all miss normal misslog", + "tls_id", ls_fetch_ctx_->get_tls_id(), K(missing_info)); + } + } + + return ret; +} + +int FetchStream::handle_miss_redo_log_( + FetchLogSRpc &fetch_log_srpc, + IObCDCPartTransResolver::MissingLogInfo &missing_info, + logfetcher::TransStatInfo &tsi, + volatile bool &stop_flag, + KickOutReason &fail_reason) +{ + int ret = OB_SUCCESS; + + if (OB_FAIL(missing_info.sort_and_unique_missing_log_lsn())) { + LOG_ERROR("sort_and_unique_missing_log_lsn failed", KR(ret), K(missing_info), K_(ls_fetch_ctx)); + } else { + const int64_t total_misslog_cnt = missing_info.get_total_misslog_cnt(); + int64_t fetched_missing_log_cnt = 0; + int64_t rpc_timeout = ATOMIC_LOAD(&g_rpc_timeout); + ObArrayImpl batched_misslog_lsn_arr; + + while (OB_SUCC(ret) && ! stop_flag && fetched_missing_log_cnt < total_misslog_cnt) { + batched_misslog_lsn_arr.reset(); + + if (OB_FAIL(build_batch_misslog_lsn_arr_( + fetched_missing_log_cnt, + missing_info, + batched_misslog_lsn_arr))) { + LOG_ERROR("build_batch_misslog_lsn_arr_ failed", KR(ret), + K(missing_info), K(fetched_missing_log_cnt)); + } else if (OB_FAIL(fetch_miss_log_( + fetch_log_srpc, + *rpc_, + *ls_fetch_ctx_, + batched_misslog_lsn_arr, + svr_, + rpc_timeout))) { + LOG_ERROR("fetch_miss_log_ failed", KR(ret), K_(ls_fetch_ctx), K_(svr), K(batched_misslog_lsn_arr)); + } else { + const obrpc::ObRpcResultCode &rcode = fetch_log_srpc.get_result_code(); + const obrpc::ObCdcLSFetchLogResp &resp = fetch_log_srpc.get_resp(); + + if (OB_FAIL(rcode.rcode_) || OB_FAIL(resp.get_err())) { + ret = OB_NEED_RETRY; + LOG_ERROR("fetch log fail on rpc", K(rcode), K(resp), K(batched_misslog_lsn_arr)); + } else { + // check next_miss_lsn + bool is_next_miss_lsn_match = false; + palf::LSN next_miss_lsn = resp.get_next_miss_lsn(); + const int64_t batch_cnt = batched_misslog_lsn_arr.count(); + const int64_t resp_log_cnt = resp.get_log_num(); + + if (batch_cnt == resp_log_cnt) { + is_next_miss_lsn_match = (batched_misslog_lsn_arr.at(batch_cnt - 1).miss_lsn_ == next_miss_lsn); + } else if (batch_cnt > resp_log_cnt) { + is_next_miss_lsn_match = (batched_misslog_lsn_arr.at(resp_log_cnt).miss_lsn_ == next_miss_lsn); + } else { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("too many misslog fetched", KR(ret), K(next_miss_lsn), K(batch_cnt), + K(resp_log_cnt),K(resp), K_(ls_fetch_ctx)); + } + + if (OB_SUCC(ret)) { + if (!is_next_miss_lsn_match) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("misslog fetched is not match batched_misslog_lsn_arr requested", KR(ret), + K(next_miss_lsn), K(batch_cnt), K(resp_log_cnt), K(batched_misslog_lsn_arr), K(resp), K_(ls_fetch_ctx)); + } else if (OB_FAIL(read_batch_misslog_( + resp, + fetched_missing_log_cnt, + tsi, + missing_info))) { + // expected no misslog found while resolving normal log. + LOG_ERROR("read_batch_misslog failed", KR(ret), K_(ls_fetch_ctx), + K(fetched_missing_log_cnt), K(missing_info)); + } + } + } + } + } } return ret; @@ -1734,13 +1823,13 @@ int FetchStream::build_batch_misslog_lsn_arr_( { int ret = OB_SUCCESS; int64_t batched_cnt = 0; - static int64_t MAX_MISSLOG_CNT_PER_RPC= 100; + static int64_t MAX_MISSLOG_CNT_PER_RPC= 1024; if (OB_UNLIKELY(0 < batched_misslog_lsn_arr.count())) { ret = OB_INVALID_ARGUMENT; LOG_ERROR("invalid batched_misslog_lsn_arr", KR(ret), K(batched_misslog_lsn_arr)); } else { - const ObLogLSNArray &miss_redo_or_state_log_arr = missing_log_info.get_miss_redo_or_state_log_arr(); + const ObLogLSNArray &miss_redo_or_state_log_arr = missing_log_info.get_miss_redo_lsn_arr(); int miss_log_cnt = miss_redo_or_state_log_arr.count(); batched_misslog_lsn_arr.reset(); @@ -1759,31 +1848,13 @@ int FetchStream::build_batch_misslog_lsn_arr_( batched_cnt++; } } - - if (OB_SUCC(ret) && (fetched_log_idx + batched_cnt == miss_log_cnt)) { - // e.g.: already fetched log cnt is 91, current batch_cnt is 9, and total miss_log_cnt is just 100, - // then try to fetch record_log_lsn. - palf::LSN miss_record_lsn; - - if (OB_FAIL(missing_log_info.get_miss_record_log_lsn(miss_record_lsn))) { - if (OB_ENTRY_NOT_EXIST == ret) { - ret = OB_SUCCESS; - } else { - LOG_ERROR("get_miss_record_log_lsn from missing_log failed", KR(ret), K(missing_log_info)); - } - } else { - ObCdcLSFetchMissLogReq::MissLogParam param; - param.miss_lsn_ = miss_record_lsn; - - if (OB_FAIL(batched_misslog_lsn_arr.push_back(param))) { - LOG_ERROR("push_back miss_record_log_lsn into batched_misslog_lsn_arr arr failed", - KR(ret), K(missing_log_info), K(param)); - } - } - } } - LOG_DEBUG("build_batch_misslog_lsn_arr_", KR(ret), K(missing_log_info), K(batched_misslog_lsn_arr), K(fetched_log_idx)); + LOG_INFO("build_batch_misslog_lsn_arr_", KR(ret), + "tls_id", ls_fetch_ctx_->get_tls_id(), + K(missing_log_info), + "batched_misslog_lsn_count", batched_misslog_lsn_arr.count(), + K(fetched_log_idx)); return ret; } @@ -1792,19 +1863,17 @@ int FetchStream::read_batch_misslog_( const obrpc::ObCdcLSFetchLogResp &resp, int64_t &fetched_missing_log_cnt, logfetcher::TransStatInfo &tsi, - IObCDCPartTransResolver::MissingLogInfo &org_missing_info, - IObCDCPartTransResolver::MissingLogInfo &new_generated_miss_info) + IObCDCPartTransResolver::MissingLogInfo &missing_info) { int ret = OB_SUCCESS; - LOG_INFO("read_batch_misslog_ begin", K(resp), K(fetched_missing_log_cnt)); + LOG_INFO("read_batch_misslog_ begin", "tls_id", ls_fetch_ctx_->get_tls_id(), K(resp), K(fetched_missing_log_cnt)); - const int64_t total_misslog_cnt = org_missing_info.get_total_misslog_cnt(); + const int64_t total_misslog_cnt = missing_info.get_total_misslog_cnt(); const char *buf = resp.get_log_entry_buf(); const int64_t len = resp.get_pos(); int64_t pos = 0; const int64_t log_cnt = resp.get_log_num(); - const ObLogLSNArray &org_misslog_arr = org_missing_info.get_miss_redo_or_state_log_arr(); - new_generated_miss_info.set_resolving_miss_log(); + const ObLogLSNArray &org_misslog_arr = missing_info.get_miss_redo_lsn_arr(); int64_t start_ts = get_timestamp(); if (OB_UNLIKELY(log_cnt <= 0)) { @@ -1815,21 +1884,23 @@ int FetchStream::read_batch_misslog_( if (fetched_missing_log_cnt >= total_misslog_cnt) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("fetched_missing_log_cnt is more than total_misslog_cnt", KR(ret), - K(fetched_missing_log_cnt), K(org_missing_info), K(idx), K(resp)); + K(fetched_missing_log_cnt), K(missing_info), K(idx), K(resp)); } else { palf::LSN misslog_lsn; palf::LogEntry miss_log_entry; misslog_lsn.reset(); miss_log_entry.reset(); + IObCDCPartTransResolver::MissingLogInfo tmp_miss_info; + tmp_miss_info.set_resolving_miss_log(); if (org_misslog_arr.count() == fetched_missing_log_cnt) { // already consume the all miss_redo_log, but still exist one miss_record_log. // lsn record_log is the last miss_log to fetch. - if (OB_FAIL(org_missing_info.get_miss_record_log_lsn(misslog_lsn))) { + if (OB_FAIL(missing_info.get_miss_record_or_state_log_lsn(misslog_lsn))) { if (OB_ENTRY_NOT_EXIST == ret) { - LOG_ERROR("expect valid miss-record_log_lsn", KR(ret), K(org_missing_info), K(fetched_missing_log_cnt), K_(ls_fetch_ctx)); + LOG_ERROR("expect valid miss-record_log_lsn", KR(ret), K(missing_info), K(fetched_missing_log_cnt), K_(ls_fetch_ctx)); } else { - LOG_ERROR("get_miss_record_log_lsn failed", KR(ret), K(org_missing_info), K(fetched_missing_log_cnt), K_(ls_fetch_ctx)); + LOG_ERROR("get_miss_record_or_state_log_lsn failed", KR(ret), K(missing_info), K(fetched_missing_log_cnt), K_(ls_fetch_ctx)); } } } else if (OB_FAIL(org_misslog_arr.at(fetched_missing_log_cnt, misslog_lsn))) { @@ -1840,9 +1911,9 @@ int FetchStream::read_batch_misslog_( if (OB_FAIL(ret)) { } else if (OB_FAIL(miss_log_entry.deserialize(buf, len, pos))) { LOG_ERROR("deserialize miss_log_entry fail", KR(ret), K(len), K(pos)); - } else if (OB_FAIL(ls_fetch_ctx_->read_miss_tx_log(miss_log_entry, misslog_lsn, tsi, new_generated_miss_info))) { - LOG_ERROR("read_miss_log fail", KR(ret), K(miss_log_entry), K(new_generated_miss_info), - K(misslog_lsn), K(fetched_missing_log_cnt), K(idx)); + } else if (OB_FAIL(ls_fetch_ctx_->read_miss_tx_log(miss_log_entry, misslog_lsn, tsi, tmp_miss_info))) { + LOG_ERROR("read_miss_log fail", KR(ret), K(miss_log_entry), + K(misslog_lsn), K(fetched_missing_log_cnt), K(idx), K(tmp_miss_info)); } else { fetched_missing_log_cnt++; } @@ -1851,7 +1922,7 @@ int FetchStream::read_batch_misslog_( } int64_t read_batch_missing_cost = get_timestamp() - start_ts; - LOG_INFO("read_batch_misslog_ end", KR(ret), K(read_batch_missing_cost), + LOG_INFO("read_batch_misslog_ end", KR(ret), "tls_id", ls_fetch_ctx_->get_tls_id(), K(read_batch_missing_cost), K(fetched_missing_log_cnt), K(resp), K(start_ts)); return ret; diff --git a/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.h b/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.h index 1efcd3767e..8709f98e57 100644 --- a/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.h +++ b/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.h @@ -251,8 +251,8 @@ private: // handle if found misslog while read_log_ // // @param [in] log_entry LogEntry - // @param [in] org_missing_info MissingLogInfo - // @param [in] tsi logfetcher::TransStatInfo + // @param [in] missing_info MissingLogInfo + // @param [in] tsi TransStatInfo // @param [out] fail_reason KickOutReason // // @retval OB_SUCCESS success @@ -260,7 +260,19 @@ private: // @retval other error code fail int handle_log_miss_( palf::LogEntry &log_entry, - IObCDCPartTransResolver::MissingLogInfo &org_missing_info, + IObCDCPartTransResolver::MissingLogInfo &missing_info, + logfetcher::TransStatInfo &tsi, + volatile bool &stop_flag, + KickOutReason &fail_reason); + int handle_miss_record_or_state_log_( + FetchLogSRpc &fetch_log_srpc, + IObCDCPartTransResolver::MissingLogInfo &missing_info, + logfetcher::TransStatInfo &tsi, + volatile bool &stop_flag, + KickOutReason &fail_reason); + int handle_miss_redo_log_( + FetchLogSRpc &fetch_log_srpc, + IObCDCPartTransResolver::MissingLogInfo &missing_info, logfetcher::TransStatInfo &tsi, volatile bool &stop_flag, KickOutReason &fail_reason); @@ -274,8 +286,7 @@ private: const obrpc::ObCdcLSFetchLogResp &resp, int64_t &fetched_missing_log_cnt, logfetcher::TransStatInfo &tsi, - IObCDCPartTransResolver::MissingLogInfo &org_missing_info, - IObCDCPartTransResolver::MissingLogInfo &new_generated_miss_info); + IObCDCPartTransResolver::MissingLogInfo &missing_info); int alloc_fetch_log_srpc_(FetchLogSRpc *&fetch_log_srpc); void free_fetch_log_srpc_(FetchLogSRpc *fetch_log_srpc); // TODO @bohou handle missing log end diff --git a/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp b/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp index 60d320b9d7..cc5e0de83e 100644 --- a/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp +++ b/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp @@ -2141,6 +2141,7 @@ PartTransTask::PartTransTask() : ref_cnt_(0), multi_data_source_node_arr_(), multi_data_source_info_(), + segment_buf_(), checkpoint_seq_(0), global_trans_seq_(0), global_schema_version_(OB_INVALID_VERSION), @@ -2275,6 +2276,7 @@ void PartTransTask::reset() ref_cnt_ = 0; multi_data_source_node_arr_.reset(); multi_data_source_info_.reset(); + segment_buf_.reset(); checkpoint_seq_ = 0; global_trans_seq_ = 0; global_schema_version_ = OB_INVALID_VERSION; diff --git a/src/logservice/libobcdc/src/ob_log_part_trans_task.h b/src/logservice/libobcdc/src/ob_log_part_trans_task.h index 61af2b8f7a..641e999010 100644 --- a/src/logservice/libobcdc/src/ob_log_part_trans_task.h +++ b/src/logservice/libobcdc/src/ob_log_part_trans_task.h @@ -22,6 +22,7 @@ #include "common/ob_queue_thread.h" // ObCond #include "ob_cdc_tablet_to_table_info.h" // ObCDCTabletChangeInfo #include "storage/tx/ob_trans_define.h" // ObTransID, ObLSLogInfoArray +#include "storage/tx/ob_tx_big_segment_buf.h" // ObTxBigSegmentBuf #include "storage/memtable/ob_memtable_mutator.h" // ObMemtableMutatorRow, ObMemtableMutatorMeta #include "storage/blocksstable/ob_datum_row.h" // ObRowDml #include "logservice/data_dictionary/ob_data_dict_storager.h" // ObDataDictStorage @@ -1091,6 +1092,7 @@ public: { return ! sorted_redo_list_.has_dispatched_but_unsorted_redo(); } + transaction::ObTxBigSegmentBuf *get_segment_buf() { return &segment_buf_; } int push_multi_data_source_data( const palf::LSN &lsn, const transaction::ObTxBufferNodeArray &mds_data_arr, @@ -1276,6 +1278,7 @@ private: // For MultiDataSource MultiDataSourceNodeArray multi_data_source_node_arr_; // array record MultiDataSourceNode MultiDataSourceInfo multi_data_source_info_; // MultiDataSourceInfo + transaction::ObTxBigSegmentBuf segment_buf_; // ObTxBigSegmentBuf for Big Tx Log // checkpoint seq number // diff --git a/src/logservice/libobcdc/src/ob_log_reader.cpp b/src/logservice/libobcdc/src/ob_log_reader.cpp index cc0b8f24f5..2814e5b75d 100644 --- a/src/logservice/libobcdc/src/ob_log_reader.cpp +++ b/src/logservice/libobcdc/src/ob_log_reader.cpp @@ -21,6 +21,7 @@ #include "ob_log_store_service.h" // IObStoreService #include "ob_log_utils.h" // get_timestamp #include "ob_log_factory.h" // ReadLogBuf, ReadLogBufFactory +#include "ob_log_trace_id.h" // ObLogTraceIdGuard using namespace oceanbase::common; @@ -168,6 +169,7 @@ void ObLogReader::print_stat_info() int ObLogReader::handle(void *data, const int64_t thread_index, volatile bool &stop_flag) { int ret = OB_SUCCESS; + ObLogTraceIdGuard trace_guard; ObLogEntryTask *task = static_cast(data); if (OB_UNLIKELY(! inited_)) { diff --git a/src/logservice/libobcdc/src/ob_log_resource_collector.cpp b/src/logservice/libobcdc/src/ob_log_resource_collector.cpp index 1831ece3ed..afa0bdad32 100644 --- a/src/logservice/libobcdc/src/ob_log_resource_collector.cpp +++ b/src/logservice/libobcdc/src/ob_log_resource_collector.cpp @@ -18,6 +18,7 @@ #include "storage/tx/ob_trans_define.h" // ObTransID +#include "ob_log_trace_id.h" // ObLogTraceIdGuard #include "ob_log_part_trans_task.h" // PartTransTask #include "ob_log_task_pool.h" // ObLogTransTaskPool #include "ob_log_binlog_record_pool.h" // ObLogBRPool @@ -470,6 +471,7 @@ int ObLogResourceCollector::handle(void *data, volatile bool &stop_flag) { int ret = OB_SUCCESS; + ObLogTraceIdGuard trace_guard; ObLogResourceRecycleTask *recycle_task = NULL; if (OB_UNLIKELY(! inited_)) { diff --git a/src/logservice/libobcdc/src/ob_log_sequencer1.cpp b/src/logservice/libobcdc/src/ob_log_sequencer1.cpp index 648681e4ff..dcb23159d5 100644 --- a/src/logservice/libobcdc/src/ob_log_sequencer1.cpp +++ b/src/logservice/libobcdc/src/ob_log_sequencer1.cpp @@ -29,6 +29,7 @@ #include "ob_log_meta_data_struct.h" // ObDictTenantInfo #include "ob_log_ddl_processor.h" // ObLogDDLProcessor #include "ob_log_meta_data_service.h" // GLOGMETADATASERVICE +#include "ob_log_trace_id.h" // ObLogTraceIdGuard #define _STAT(level, tag_str, args...) _OBLOG_SEQUENCER_LOG(level, "[STAT] [SEQ] " tag_str, ##args) #define STAT(level, tag_str, args...) OBLOG_SEQUENCER_LOG(level, "[STAT] [SEQ] " tag_str, ##args) @@ -250,6 +251,7 @@ void ObLogSequencer::get_task_count(SeqStatInfo &stat_info) // A thread is responsible for continually rotating the sequence of transactions that need sequence void ObLogSequencer::run1() { + ObLogTraceIdGuard trace_guard; const int64_t SLEEP_US = 1000; lib::set_thread_name("ObLogSequencerTrans"); int ret = OB_SUCCESS; @@ -389,6 +391,7 @@ int ObLogSequencer::handle_to_be_sequenced_trans_(TrxSortElem &trx_sort_elem, int ObLogSequencer::handle(void *data, const int64_t thread_index, volatile bool &stop_flag) { int ret = OB_SUCCESS; + ObLogTraceIdGuard trace_guard; PartTransTask *part_trans_task = static_cast(data); (void)ATOMIC_AAF(&queue_part_trans_task_count_, -1); diff --git a/src/logservice/libobcdc/src/ob_log_storager.cpp b/src/logservice/libobcdc/src/ob_log_storager.cpp index b0d402aee7..8b0f9c6544 100644 --- a/src/logservice/libobcdc/src/ob_log_storager.cpp +++ b/src/logservice/libobcdc/src/ob_log_storager.cpp @@ -21,6 +21,7 @@ #include "ob_log_store_key.h" #include "ob_log_store_task.h" #include "ob_log_factory.h" // ObLogStoreTaskFactory +#include "ob_log_trace_id.h" // ObLogTraceIdGuard using namespace oceanbase::common; @@ -177,6 +178,7 @@ void ObLogStorager::thread_end() int ObLogStorager::handle(void *data, const int64_t thread_index, volatile bool &stop_flag) { int ret = OB_SUCCESS; + ObLogTraceIdGuard trace_guard; IObLogBatchBufTask *task = static_cast(data); Block *block = static_cast(task); LOG_DEBUG("Storager handle succ", "addr", &task, KPC(task), KPC(block)); diff --git a/src/logservice/libobcdc/src/ob_log_sys_ls_task_handler.cpp b/src/logservice/libobcdc/src/ob_log_sys_ls_task_handler.cpp index f3b58faeb0..3d6ed6067d 100644 --- a/src/logservice/libobcdc/src/ob_log_sys_ls_task_handler.cpp +++ b/src/logservice/libobcdc/src/ob_log_sys_ls_task_handler.cpp @@ -21,6 +21,7 @@ #include "ob_log_schema_getter.h" // IObLogSchemaGetter #include "ob_log_tenant_mgr.h" // IObLogTenantMgr #include "ob_log_config.h" // TCONF +#include "ob_log_trace_id.h" #define _STAT(level, fmt, args...) _OBLOG_LOG(level, "[STAT] [SYS_LS_HANDLER] " fmt, ##args) #define STAT(level, fmt, args...) OBLOG_LOG(level, "[STAT] [SYS_LS_HANDLER] " fmt, ##args) @@ -426,6 +427,7 @@ int ObLogSysLsTaskHandler::dispatch_task_( void ObLogSysLsTaskHandler::handle_task_routine() { int ret = OB_SUCCESS; + ObLogTraceIdGuard trace_guard; while (! stop_flag_ && OB_SUCCESS == ret) { PartTransTask *task = NULL; diff --git a/src/logservice/libobcdc/src/ob_log_trans_log.h b/src/logservice/libobcdc/src/ob_log_trans_log.h index a2110d4d80..b59398657c 100644 --- a/src/logservice/libobcdc/src/ob_log_trans_log.h +++ b/src/logservice/libobcdc/src/ob_log_trans_log.h @@ -74,7 +74,10 @@ typedef SortedLightyList SortedLogEntryArray; class SortedLogEntryInfo { public: - SortedLogEntryInfo() : last_fetched_redo_log_entry_(NULL), fetched_log_entry_arr_(true), recorded_lsn_arr_() {} + SortedLogEntryInfo() : + last_fetched_redo_log_entry_(NULL), + fetched_log_entry_arr_(true), /*is_unique*/ + recorded_lsn_arr_() {} ~SortedLogEntryInfo() { reset(); } void reset() { @@ -99,7 +102,11 @@ public: SortedLogEntryArray &get_fetched_log_entry_node_arr() { return fetched_log_entry_arr_; } - TO_STRING_KV(K_(fetched_log_entry_arr), K_(recorded_lsn_arr)); + TO_STRING_KV( + "fetched_log_entry_count", fetched_log_entry_arr_.count(), + "recorded_lsn_count", recorded_lsn_arr_.count(), + K_(fetched_log_entry_arr), + K_(recorded_lsn_arr)); private: LogEntryNode *last_fetched_redo_log_entry_; // hold all fetched log_entry_info.(include lsn of log_entry which contains redo_log and rollback_to log) diff --git a/src/logservice/libobcdc/src/ob_log_trans_msg_sorter.cpp b/src/logservice/libobcdc/src/ob_log_trans_msg_sorter.cpp index a9a1e5f07b..1d11226f79 100644 --- a/src/logservice/libobcdc/src/ob_log_trans_msg_sorter.cpp +++ b/src/logservice/libobcdc/src/ob_log_trans_msg_sorter.cpp @@ -20,6 +20,7 @@ #include "ob_log_trans_msg_sorter.h" #include "ob_log_instance.h" // IObLogErrHandler +#include "ob_log_trace_id.h" // ObLogTraceIdGuard #define RETRY_FUNC_ON_ERROR_WITH_USLEEP(err_no, var, func, args...) \ do {\ @@ -186,6 +187,7 @@ void ObLogTransMsgSorter::mark_stop_flag() void ObLogTransMsgSorter::handle(void *data) { int ret = OB_SUCCESS; + ObLogTraceIdGuard trace_guard; TransCtx *trans = NULL; if (OB_UNLIKELY(IS_NOT_INIT)) { diff --git a/src/logservice/libobcdc/tests/libobcdc.conf b/src/logservice/libobcdc/tests/libobcdc.conf index 2a3475c8cd..33b7766bde 100644 --- a/src/logservice/libobcdc/tests/libobcdc.conf +++ b/src/logservice/libobcdc/tests/libobcdc.conf @@ -17,9 +17,9 @@ cluster_db_name=oceanbase cluster_id_black_list=| cluster_id_black_value_max=2147483647 cluster_id_black_value_min=2147473648 -cluster_password=admin -cluster_url=*** -cluster_user=admin@sys +cluster_password=default +cluster_url=| +cluster_user=default cluster_version_refresh_interval_sec=600 config_fpath=etc/libobcdc.conf data_start_schema_version=| @@ -35,11 +35,13 @@ enable_filter_sys_tenant=0 enable_formatter_print_log=0 enable_global_unique_index_belong_to_multi_instance=0 enable_hbase_mode=0 +enable_log_limit=1 enable_oracle_mode_match_case_sensitive=0 enable_output_hidden_primary_key=1 enable_output_invisible_column=0 enable_output_trans_order_by_sql_operation=0 enable_verify_mode=1 +extra_redo_dispatch_memory_size=4M fetch_log_rpc_timeout_sec=15 fetch_stream_cached_count=16 fetching_log_mode=integrated @@ -47,21 +49,21 @@ formatter_thread_num=10 global_data_start_schema_version=0 history_schema_version_count=16 idle_pool_thread_num=4 -init_log_level=ALL.*:INFO;SHARE.SCHEMA:INFO +init_log_level=ALL.*:INFO;PALF.*:WARN;SHARE.SCHEMA:INFO instance_index=0 instance_num=1 io_thread_num=4 lob_data_merger_thread_num=2 log_clean_cycle_time_in_hours=24 log_entry_task_prealloc_count=100000 -log_level=ALL.*:INFO;PALF.*:WARN;SHARE.SCHEMA:WARN +log_level=ALL.*:WARN;STORAGE.TRANS:DEBUG;TLOG.*:INFO;TLOG.FETCHER:DEBUG log_router_background_refresh_interval_sec=10 ls_count_upper_limit=2000000 ls_fetch_progress_update_timeout_sec=15 ls_fetch_progress_update_timeout_sec_for_lagged_replica=3 max_log_file_count=40 memory_limit=20G -meta_data_refresh_mode=data_dict +meta_data_refresh_mode=online msg_sorter_task_count_upper_limit=200000 msg_sorter_thread_num=1 need_verify_ob_trace_id=0 @@ -88,7 +90,7 @@ reader_queue_length=102400 reader_thread_num=10 ready_to_seq_task_upper_bound=20000 redo_dispatched_memory_limit_exceed_ratio=2 -redo_dispatcher_memory_limit=1G +redo_dispatcher_memory_limit=512M region=default_region resource_collector_thread_num=10 resource_collector_thread_num_for_br=7 @@ -116,7 +118,7 @@ sql_server_change_interval_sec=60 ssl_client_authentication=0 ssl_external_kms_info=| start_lsn_locator_batch_count=5 -start_lsn_locator_locate_count=3 +start_lsn_locator_locate_count=1 start_lsn_locator_rpc_timeout_sec=60 start_lsn_locator_thread_num=4 storager_mem_percentage=2 @@ -131,7 +133,7 @@ system_memory_avail_percentage_lower_bound=10 tablegroup_black_list=| tablegroup_white_list=*.* tb_black_list=| -tb_white_list=oracle.*.* +tb_white_list=*cdc*.*.* tenant_manager_memory_upper_limit=5G tenant_sql_connect_timeout_sec=40 tenant_sql_query_timeout_sec=30 diff --git a/src/rootserver/ob_ddl_service.cpp b/src/rootserver/ob_ddl_service.cpp index 00e6d97eb6..df548db1f3 100755 --- a/src/rootserver/ob_ddl_service.cpp +++ b/src/rootserver/ob_ddl_service.cpp @@ -24668,7 +24668,6 @@ int ObDDLService::drop_database(const ObDropDatabaseArg &arg, actual_trans, schema_guard, &arg.ddl_stmt_str_))) { LOG_WARN("drop database to recyclebin failed", K(arg), K(ret)); } - (void) actual_trans.disable_serialize_inc_schemas(); } else { if (OB_FAIL(ret)) { // FAIL diff --git a/src/storage/tx/ob_tx_log.h b/src/storage/tx/ob_tx_log.h index 4f7b94cf0a..4f219969b5 100755 --- a/src/storage/tx/ob_tx_log.h +++ b/src/storage/tx/ob_tx_log.h @@ -28,7 +28,7 @@ #include "share/scn.h" //#include -// #define OB_TX_MDS_LOG_USE_BIT_SEGMENT_BUF +#define OB_TX_MDS_LOG_USE_BIT_SEGMENT_BUF namespace oceanbase { diff --git a/unittest/libobcdc/test_ob_cdc_part_trans_resolver.cpp b/unittest/libobcdc/test_ob_cdc_part_trans_resolver.cpp index 5845025cd8..4b621825be 100644 --- a/unittest/libobcdc/test_ob_cdc_part_trans_resolver.cpp +++ b/unittest/libobcdc/test_ob_cdc_part_trans_resolver.cpp @@ -152,7 +152,7 @@ TEST(ObCDCPartTransResolver, test_misslog_info_basic) { int ret = OB_SUCCESS; IObCDCPartTransResolver::MissingLogInfo missing_info; - ObLogLSNArray &missing_log_id = missing_info.get_miss_redo_or_state_log_arr(); + ObLogLSNArray &missing_log_id = missing_info.get_miss_redo_lsn_arr(); // prepare data palf::LSN lsn_1(1); @@ -366,7 +366,7 @@ TEST(ObCDCPartTransResolver, test_sp_tx_seq2_miss) LOG_DEBUG("read log2", K(lsn), K(lsn2), K(missing_info)); EXPECT_TRUE(missing_info.need_reconsume_commit_log_entry()); EXPECT_EQ(1, missing_info.get_total_misslog_cnt()); - EXPECT_EQ(lsn, missing_info.get_miss_redo_or_state_log_arr().at(0)); + EXPECT_EQ(lsn, missing_info.get_miss_redo_lsn_arr().at(0)); missing_info.reset(); missing_info.set_resolving_miss_log(); IObCDCPartTransResolver::MissingLogInfo new_miss_log; @@ -425,7 +425,6 @@ TEST(ObCDCPartTransResolver, test_sp_tx_seq3_miss) EXPECT_EQ(0, new_miss_log.get_total_misslog_cnt()); EXPECT_TRUE(missing_info.need_reconsume_commit_log_entry()); IObCDCPartTransResolver::MissingLogInfo reconsume_miss_info; - reconsume_miss_info.set_resolving_miss_log(); reconsume_miss_info.set_need_reconsume_commit_log_entry(); EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry2, lsn2, reconsume_miss_info, tsi, stop_flag)); EXPECT_EQ(0, reconsume_miss_info.get_total_misslog_cnt()); @@ -478,13 +477,12 @@ TEST(ObCDCPartTransResolver, test_sp_tx_seq4_miss_1) EXPECT_EQ(OB_ITEM_NOT_SETTED, ls_fetch_ctx->read_log(log_entry2, lsn2, missing_info, tsi, stop_flag)); EXPECT_EQ(1, missing_info.get_total_misslog_cnt()); - EXPECT_EQ(lsn, missing_info.get_miss_redo_or_state_log_arr().at(0)); + EXPECT_EQ(lsn, missing_info.get_miss_redo_lsn_arr().at(0)); EXPECT_FALSE(missing_info.need_reconsume_commit_log_entry()); // commit_info entry is not miss_log, need reconsume IObCDCPartTransResolver::MissingLogInfo new_miss_log; new_miss_log.set_resolving_miss_log(); EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_miss_tx_log(log_entry, lsn, tsi, new_miss_log)); IObCDCPartTransResolver::MissingLogInfo reconsume_miss_info; - reconsume_miss_info.set_resolving_miss_log(); reconsume_miss_info.set_need_reconsume_commit_log_entry(); EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry2, lsn2, reconsume_miss_info, tsi, stop_flag)); missing_info.reset(); @@ -513,20 +511,20 @@ TEST(ObCDCPartTransResolver, test_sp_tx_seq4_miss_2) EXPECT_EQ(OB_ITEM_NOT_SETTED, ls_fetch_ctx->read_log(log_entry3, lsn3, missing_info, tsi, stop_flag)); EXPECT_EQ(1, missing_info.get_total_misslog_cnt()); - EXPECT_EQ(lsn2, missing_info.get_miss_redo_or_state_log_arr().at(0)); + EXPECT_EQ(lsn2, missing_info.miss_record_or_state_log_lsn_); EXPECT_TRUE(missing_info.need_reconsume_commit_log_entry()); IObCDCPartTransResolver::MissingLogInfo new_miss_log; new_miss_log.set_resolving_miss_log(); - EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_miss_tx_log(log_entry2, lsn2, tsi, new_miss_log)); + EXPECT_EQ(OB_ITEM_NOT_SETTED, ls_fetch_ctx->read_miss_tx_log(log_entry2, lsn2, tsi, new_miss_log)); EXPECT_EQ(1, new_miss_log.get_total_misslog_cnt()); - EXPECT_EQ(lsn, new_miss_log.get_miss_redo_or_state_log_arr().at(0)); + EXPECT_EQ(lsn, new_miss_log.get_miss_redo_lsn_arr().at(0)); EXPECT_FALSE(new_miss_log.need_reconsume_commit_log_entry()); IObCDCPartTransResolver::MissingLogInfo new_miss_log_2; new_miss_log_2.set_resolving_miss_log(); EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_miss_tx_log(log_entry, lsn, tsi, new_miss_log_2)); IObCDCPartTransResolver::MissingLogInfo reconsume_miss_info; - reconsume_miss_info.set_resolving_miss_log(); + reconsume_miss_info.set_need_reconsume_commit_log_entry(); EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry3, lsn3, reconsume_miss_info, tsi, stop_flag)); @@ -579,13 +577,12 @@ TEST(ObCDCPartTransResolver, test_sp_tx_seq5_miss) EXPECT_EQ(OB_ITEM_NOT_SETTED, ls_fetch_ctx->read_log(log_entry2, lsn2, missing_info, tsi, stop_flag)); EXPECT_EQ(1, missing_info.get_total_misslog_cnt()); - EXPECT_EQ(lsn, missing_info.get_miss_redo_or_state_log_arr().at(0)); + EXPECT_EQ(lsn, missing_info.get_miss_redo_lsn_arr().at(0)); EXPECT_FALSE(missing_info.need_reconsume_commit_log_entry()); // commit_info entry is not miss_log, need reconsume IObCDCPartTransResolver::MissingLogInfo new_miss_log; new_miss_log.set_resolving_miss_log(); EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_miss_tx_log(log_entry, lsn, tsi, new_miss_log)); IObCDCPartTransResolver::MissingLogInfo reconsume_miss_info; - reconsume_miss_info.set_resolving_miss_log(); reconsume_miss_info.set_need_reconsume_commit_log_entry(); EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry2, lsn2, reconsume_miss_info, tsi, stop_flag)); missing_info.reset(); @@ -644,8 +641,8 @@ TEST(ObCDCPartTransResolver, test_sp_tx_seq6_miss) EXPECT_EQ(OB_ITEM_NOT_SETTED, ls_fetch_ctx->read_log(log_entry3, lsn3, missing_info, tsi, stop_flag)); EXPECT_EQ(2, missing_info.get_total_misslog_cnt()); - EXPECT_EQ(lsn, missing_info.get_miss_redo_or_state_log_arr().at(0)); - EXPECT_EQ(lsn2, missing_info.get_miss_redo_or_state_log_arr().at(1)); + EXPECT_EQ(lsn, missing_info.get_miss_redo_lsn_arr().at(0)); + EXPECT_EQ(lsn2, missing_info.get_miss_redo_lsn_arr().at(1)); EXPECT_TRUE(missing_info.need_reconsume_commit_log_entry()); IObCDCPartTransResolver::MissingLogInfo new_miss_log; new_miss_log.set_resolving_miss_log(); @@ -654,7 +651,6 @@ TEST(ObCDCPartTransResolver, test_sp_tx_seq6_miss) EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_miss_tx_log(log_entry2, lsn2, tsi, new_miss_log)); IObCDCPartTransResolver::MissingLogInfo reconsume_miss_info; - reconsume_miss_info.set_resolving_miss_log(); reconsume_miss_info.set_need_reconsume_commit_log_entry(); EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry3, lsn3, reconsume_miss_info, tsi, stop_flag)); @@ -738,19 +734,18 @@ TEST(ObCDCPartTransResolver, test_sp_tx_dist_miss2) EXPECT_EQ(OB_ITEM_NOT_SETTED, ls_fetch_ctx->read_log(log_entry3, lsn3, missing_info, tsi, stop_flag)); EXPECT_EQ(1, missing_info.get_total_misslog_cnt()); - EXPECT_EQ(lsn2, missing_info.get_miss_redo_or_state_log_arr().at(0)); + EXPECT_EQ(lsn2, missing_info.miss_record_or_state_log_lsn_); EXPECT_TRUE(missing_info.need_reconsume_commit_log_entry()); IObCDCPartTransResolver::MissingLogInfo new_miss_log; new_miss_log.set_resolving_miss_log(); - EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_miss_tx_log(log_entry2, lsn2, tsi, new_miss_log)); + EXPECT_EQ(OB_ITEM_NOT_SETTED, ls_fetch_ctx->read_miss_tx_log(log_entry2, lsn2, tsi, new_miss_log)); EXPECT_EQ(1, new_miss_log.get_total_misslog_cnt()); - EXPECT_EQ(lsn, new_miss_log.get_miss_redo_or_state_log_arr().at(0)); + EXPECT_EQ(lsn, new_miss_log.miss_record_or_state_log_lsn_); new_miss_log.reset(); new_miss_log.set_resolving_miss_log(); EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_miss_tx_log(log_entry, lsn, tsi, new_miss_log)); EXPECT_EQ(0, new_miss_log.get_total_misslog_cnt()); missing_info.reset(); - missing_info.set_resolving_miss_log(); missing_info.set_need_reconsume_commit_log_entry(); EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry3, lsn3, missing_info, tsi, stop_flag)); @@ -880,14 +875,14 @@ TEST(ObCDCPartTransResolver, test_sp_tx_record_miss) log_generator.gen_log_entry(log_entry_rc2, lsn_rc2); EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry1, lsn1, missing_info, tsi, stop_flag)); EXPECT_EQ(OB_ITEM_NOT_SETTED, ls_fetch_ctx->read_log(log_entry_rc1, lsn_rc1, missing_info, tsi, stop_flag)); - EXPECT_TRUE(missing_info.miss_record_log_lsn_.is_valid()); + EXPECT_TRUE(missing_info.miss_record_or_state_log_lsn_.is_valid()); EXPECT_EQ(1, missing_info.get_total_misslog_cnt()); LOG_INFO("", K(lsn), K(lsn_rc0), K(lsn1), K(lsn_rc1), K(lsn2), K(lsn_rc2), K(missing_info)); // EXPECT_EQ(lsn, missing_info.miss_redo_or_state_lsn_arr_.at(0)); IObCDCPartTransResolver::MissingLogInfo missing_info1; missing_info1.set_resolving_miss_log(); EXPECT_EQ(OB_ITEM_NOT_SETTED, ls_fetch_ctx->read_log(log_entry_rc0, lsn_rc0, missing_info1, tsi, stop_flag)); - EXPECT_EQ(lsn, missing_info1.get_miss_redo_or_state_log_arr().at(0)); + EXPECT_EQ(lsn, missing_info1.get_miss_redo_lsn_arr().at(0)); IObCDCPartTransResolver::MissingLogInfo missing_info2; missing_info2.set_resolving_miss_log(); EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry, lsn, missing_info2, tsi, stop_flag));