diff --git a/src/logservice/libobcdc/src/ob_log_config.h b/src/logservice/libobcdc/src/ob_log_config.h index 2a4e8dc0a5..63b9d7ab0e 100644 --- a/src/logservice/libobcdc/src/ob_log_config.h +++ b/src/logservice/libobcdc/src/ob_log_config.h @@ -201,6 +201,7 @@ public: DEF_INT(log_clean_cycle_time_in_hours, OB_CLUSTER_PARAMETER, "24", "[0,]", "clean log cycle time in hours, 0 means not to clean log"); DEF_INT(max_log_file_count, OB_CLUSTER_PARAMETER, "40", "[0,]", "max log file count, 0 means no limit"); + T_DEF_BOOL(enable_log_limit, OB_CLUSTER_PARAMETER, 1, "0:disable log_limit, 1:enable log_limit"); T_DEF_BOOL(skip_dirty_data, OB_CLUSTER_PARAMETER, 0, "0:disabled, 1:enabled"); @@ -385,7 +386,8 @@ public: // Not on by default (participatn-by-participant output) T_DEF_BOOL(enable_output_trans_order_by_sql_operation, OB_CLUSTER_PARAMETER, 0, "0:disabled, 1:enabled"); // redo dispatcher memory limit - DEF_CAP(redo_dispatcher_memory_limit, OB_CLUSTER_PARAMETER, "1G", "[128M,]", "redo dispatcher memory limit"); + DEF_CAP(redo_dispatcher_memory_limit, OB_CLUSTER_PARAMETER, "512M", "[128M,]", "redo dispatcher memory limit"); + DEF_CAP(extra_redo_dispatch_memory_size, OB_CLUSTER_PARAMETER, "4M", "[0, 512M]", "extra redo dispatcher memory for data skew participant"); // redo diepatcher memory limit ratio for output br by sql operation(compare with redo_dispatcher_memory_limit) T_DEF_INT_INFT(redo_dispatched_memory_limit_exceed_ratio, OB_CLUSTER_PARAMETER, 2, 1, "redo_dispatcher_memory_limit ratio for output by sql operation order"); diff --git a/src/logservice/libobcdc/src/ob_log_dml_parser.cpp b/src/logservice/libobcdc/src/ob_log_dml_parser.cpp index 794878e3a1..5f94b08520 100644 --- a/src/logservice/libobcdc/src/ob_log_dml_parser.cpp +++ b/src/logservice/libobcdc/src/ob_log_dml_parser.cpp @@ -140,7 +140,7 @@ int ObLogDmlParser::push(ObLogEntryTask &task, const int64_t timeout) } } else { ATOMIC_INC(&log_entry_task_count_); - LOG_DEBUG("push task into DML parser", K(task)); 
+ LOG_DEBUG("push task into DML parser", KP(&task), K(task)); } } @@ -174,7 +174,7 @@ int ObLogDmlParser::handle(void *data, LOG_ERROR("part_trans_task is NULL", K(part_trans_task), KPC(task)); ret = OB_ERR_UNEXPECTED; } else { - LOG_DEBUG("DML parser handle task", K(thread_index), KPC(task)); + LOG_DEBUG("DML parser handle task", K(thread_index), KP(task), KPC(task)); const uint64_t tenant_id = part_trans_task->get_tenant_id(); lib::Worker::CompatMode compat_mode = lib::Worker::CompatMode::INVALID; diff --git a/src/logservice/libobcdc/src/ob_log_formatter.cpp b/src/logservice/libobcdc/src/ob_log_formatter.cpp index e09cbef9db..ea79d64912 100644 --- a/src/logservice/libobcdc/src/ob_log_formatter.cpp +++ b/src/logservice/libobcdc/src/ob_log_formatter.cpp @@ -816,7 +816,7 @@ int ObLogFormatter::finish_format_(PartTransTask &part_trans_task, LOG_ERROR("redo_log_entry_task link_row_list fail", KR(ret), K(redo_log_entry_task)); } } else { - LOG_DEBUG("[FORMATT]", K(tenant_id), K(stmt_num), K(redo_log_entry_task), K(part_trans_task)); + LOG_DEBUG("[FORMATT]", K(tenant_id), K(stmt_num), KP(&redo_log_entry_task), K(redo_log_entry_task), K(part_trans_task)); IObLogResourceCollector *resource_collector = TCTX.resource_collector_; if (0 == row_ref_cnt) { diff --git a/src/logservice/libobcdc/src/ob_log_instance.cpp b/src/logservice/libobcdc/src/ob_log_instance.cpp index 6ef1210a08..2d2db2df18 100644 --- a/src/logservice/libobcdc/src/ob_log_instance.cpp +++ b/src/logservice/libobcdc/src/ob_log_instance.cpp @@ -404,12 +404,14 @@ int ObLogInstance::init_logger_() LOG_ERROR("FileDirectoryUtils create_full_path fail", KR(ret), K(log_dir)); } else { const int64_t max_log_file_count = TCONF.max_log_file_count; + const bool enable_log_limit = (1 == TCONF.enable_log_limit); easy_log_level = EASY_LOG_INFO; OB_LOGGER.set_max_file_size(MAX_LOG_FILE_SIZE); OB_LOGGER.set_max_file_index(max_log_file_count); OB_LOGGER.set_file_name(log_file, disable_redirect_log_, false); 
OB_LOGGER.set_log_level("INFO"); OB_LOGGER.disable_thread_log_level(); + OB_LOGGER.set_enable_log_limit(enable_log_limit); if (! disable_redirect_log_) { // Open the stderr log file @@ -2118,9 +2120,11 @@ void ObLogInstance::reload_config_() LOG_ERROR("load_from_file fail", KR(ret), K(default_config_fpath)); } else { const int64_t max_log_file_count = config.max_log_file_count; + const bool enable_log_limit = (1 == config.enable_log_limit); LOG_INFO("reset log config", "log_level", config.log_level.str(), K(max_log_file_count)); OB_LOGGER.set_mod_log_levels(config.log_level.str()); OB_LOGGER.set_max_file_index(max_log_file_count); + OB_LOGGER.set_enable_log_limit(enable_log_limit); ATOMIC_STORE(&log_clean_cycle_time_us_, config.log_clean_cycle_time_in_hours * _HOUR_); diff --git a/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp b/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp index 0e4593b54b..f2f62cdf55 100644 --- a/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp +++ b/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp @@ -3734,7 +3734,10 @@ int PartTransTask::next_dml_stmt(DmlStmtTask *&dml_stmt_task) } else if (OB_ISNULL(dml_stmt_task = static_cast(dml_task))) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("dml_task get from SortedRedoList should not be null", KR(ret), K(dml_task)); - } else { /* succ */ } + } else { + sorted_redo_list_.set_sorted_row_seq_no(dml_stmt_task->get_row_seq_no()); + /* succ */ + } return ret; } diff --git a/src/logservice/libobcdc/src/ob_log_part_trans_task.h b/src/logservice/libobcdc/src/ob_log_part_trans_task.h index c4fd55bef1..ed056b46fc 100644 --- a/src/logservice/libobcdc/src/ob_log_part_trans_task.h +++ b/src/logservice/libobcdc/src/ob_log_part_trans_task.h @@ -1076,6 +1076,11 @@ public: SortedLogEntryInfo &get_sorted_log_entry_info() { return sorted_log_entry_info_; } int is_all_redo_log_entry_fetched(bool &is_all_redo_fetched) { return sorted_log_entry_info_.is_all_log_entry_fetched(is_all_redo_fetched); }; + 
// Whether every dispatched redo has been sorted (i.e. no dispatched-but-unsorted redo remains) + OB_INLINE bool is_dispatched_redo_be_sorted() const + { + return ! sorted_redo_list_.has_dispatched_but_unsorted_redo(); + } int push_multi_data_source_data( const palf::LSN &lsn, const transaction::ObTxBufferNodeArray &mds_data_arr, diff --git a/src/logservice/libobcdc/src/ob_log_reader.cpp b/src/logservice/libobcdc/src/ob_log_reader.cpp index 6aaeb6266f..13b37d853f 100644 --- a/src/logservice/libobcdc/src/ob_log_reader.cpp +++ b/src/logservice/libobcdc/src/ob_log_reader.cpp @@ -179,6 +179,7 @@ int ObLogReader::handle(void *data, const int64_t thread_index, volatile bool &s LOG_ERROR("handle_task_ fail", KR(ret), KPC(task), K(thread_index)); } } else { + LOG_DEBUG("ObLogEntryTask read succ", KP(task)); ATOMIC_DEC(&log_entry_task_count_); } diff --git a/src/logservice/libobcdc/src/ob_log_resource_collector.cpp b/src/logservice/libobcdc/src/ob_log_resource_collector.cpp index ccb5c46f82..cbc1b8cf92 100644 --- a/src/logservice/libobcdc/src/ob_log_resource_collector.cpp +++ b/src/logservice/libobcdc/src/ob_log_resource_collector.cpp @@ -292,7 +292,7 @@ int ObLogResourceCollector::revert_log_entry_task_(ObLogEntryTask *log_entry_tas const bool is_test_mode_on = TCONF.test_mode_on != 0; if (is_test_mode_on) { - LOG_INFO("LogEntryTask-free", "LogEntryTask", *log_entry_task, "addr", log_entry_task); + LOG_INFO("LogEntryTask-free", "LogEntryTask", *log_entry_task, "addr", log_entry_task, K(data_len)); } if (is_log_entry_stored) { @@ -569,7 +569,7 @@ int ObLogResourceCollector::handle(void *data, } } else {} } - (void)ATOMIC_AAF(&br_count_, -1); + ATOMIC_DEC(&br_count_); task = NULL; } } else { @@ -681,7 +681,7 @@ int ObLogResourceCollector::dec_ref_cnt_and_try_to_recycle_log_entry_task_(ObLog ret = OB_ERR_UNEXPECTED; } else { if (TCONF.test_mode_on) { - LOG_INFO("revert_dml_binlog_record", KPC(log_entry_task)); + LOG_INFO("revert_dml_binlog_record", KP(&br), K(br), KP(log_entry_task), KPC(log_entry_task)); } const bool 
need_revert_log_entry_task = (log_entry_task->dec_row_ref_cnt() == 0); diff --git a/src/logservice/libobcdc/src/ob_log_trans_dispatch_ctx.cpp b/src/logservice/libobcdc/src/ob_log_trans_dispatch_ctx.cpp index 424d5ba9be..dfc160f601 100644 --- a/src/logservice/libobcdc/src/ob_log_trans_dispatch_ctx.cpp +++ b/src/logservice/libobcdc/src/ob_log_trans_dispatch_ctx.cpp @@ -111,7 +111,26 @@ void TransDispatchCtx::set_normal_priority_budget_(const int64_t &average_budget { for(int64_t i = 0; i < normal_priority_part_budget_arr_.count(); i++) { PartTransDispatchBudget &budget = normal_priority_part_budget_arr_[i]; - budget.reset_budget(average_budget); + PartTransTask *part_trans_task = budget.part_trans_task_; + + if (average_budget <= 0 + && OB_NOT_NULL(part_trans_task) + && part_trans_task->is_dispatched_redo_be_sorted()) { + const int64_t extra_redo_dispatch_size = TCONF.extra_redo_dispatch_memory_size; + + if (REACH_TIME_INTERVAL(1 * _SEC_)) { + LOG_INFO("[NOTICE][REDO_DISPATCH] budget usedup but dispatched_redo all sorted, use extra_redo budget", + K(budget), + K(average_budget), + "extra_redo_dispatch_size", SIZE_TO_STR(extra_redo_dispatch_size), + "part_trans_task", part_trans_task->get_part_trans_info(), + "redo_sorted_progress", part_trans_task->get_sorted_redo_list().sorted_progress_); + } + + budget.reset_budget(extra_redo_dispatch_size); + } else { + budget.reset_budget(average_budget); + } } } diff --git a/src/logservice/libobcdc/src/ob_log_trans_log.cpp b/src/logservice/libobcdc/src/ob_log_trans_log.cpp index 443b22bd4c..7548cf796c 100644 --- a/src/logservice/libobcdc/src/ob_log_trans_log.cpp +++ b/src/logservice/libobcdc/src/ob_log_trans_log.cpp @@ -252,6 +252,15 @@ int DmlRedoLogNode::set_data_info(char *data, int64_t data_len) return ret; } +void RedoSortedProgress::set_sorted_row_seq_no(const int64_t row_seq_no) +{ + if (row_seq_no < ATOMIC_LOAD(&sorted_row_seq_no_)) { + // TODO PDML may cause row_seq_no rollback + LOG_WARN_RET(OB_STATE_NOT_MATCH, 
"row_seq_no rollbacked! check if PDML sence", K(row_seq_no), K_(sorted_row_seq_no)); + } + ATOMIC_STORE(&sorted_row_seq_no_, row_seq_no); +} + int SortedRedoLogList::push(const bool is_data_in_memory, RedoLogMetaNode *node) { @@ -320,6 +329,7 @@ void SortedRedoLogList::init_iterator() cur_dispatch_redo_ = head_; cur_sort_redo_ = head_; cur_sort_stmt_ = NULL; // row not format and stmt should be null + sorted_progress_.reset(); } int SortedRedoLogList::next_dml_redo(RedoLogMetaNode *&dml_redo_meta, bool &is_last_redo) @@ -337,8 +347,9 @@ int SortedRedoLogList::next_dml_redo(RedoLogMetaNode *&dml_redo_meta, bool &is_l RedoLogMetaNode *next_redo = cur_dispatch_redo_->get_next(); dml_redo_meta = cur_dispatch_redo_; cur_dispatch_redo_ = next_redo; - dispatched_redo_count_++; // Theoretically no concurrent call of this function - is_last_redo = (dispatched_redo_count_ == node_num_); + // Theoretically no concurrent call of this function + sorted_progress_.inc_dispatched_redo_count(); + is_last_redo = is_dispatch_finish(); } return ret; @@ -389,6 +400,7 @@ int SortedRedoLogList::next_dml_stmt(ObLink *&dml_stmt_task) // 1. found dml_stmt_task and it is the last stmt of cur_sort_redo // 2. 
cur_sort_redo doesn't has any row cur_sort_redo_ = cur_sort_redo_->get_next(); + sorted_progress_.inc_sorted_redo_count(); } } } diff --git a/src/logservice/libobcdc/src/ob_log_trans_log.h b/src/logservice/libobcdc/src/ob_log_trans_log.h index d0d433da32..5f0eb7840d 100644 --- a/src/logservice/libobcdc/src/ob_log_trans_log.h +++ b/src/logservice/libobcdc/src/ob_log_trans_log.h @@ -278,6 +278,37 @@ public: TO_STRING_KV("RedoLog", static_cast(*this)); }; +class RedoSortedProgress +{ +public: + RedoSortedProgress() { reset(); } + ~RedoSortedProgress() { reset(); } +public: + void reset() + { + ATOMIC_SET(&dispatched_redo_count_, 0); + ATOMIC_SET(&sorted_redo_count_, 0); + ATOMIC_SET(&sorted_row_seq_no_, 0); + } + OB_INLINE int64_t get_dispatched_redo_count() const { return ATOMIC_LOAD(&dispatched_redo_count_); } + OB_INLINE void inc_dispatched_redo_count() { ATOMIC_INC(&dispatched_redo_count_); } + OB_INLINE int64_t get_sorted_redo_count() const { return ATOMIC_LOAD(&sorted_redo_count_); } + OB_INLINE void inc_sorted_redo_count() { ATOMIC_INC(&sorted_redo_count_); } + OB_INLINE int64_t get_sorted_row_seq_no() const { return ATOMIC_LOAD(&sorted_row_seq_no_); } + void set_sorted_row_seq_no(const int64_t row_seq_no); + OB_INLINE int64_t get_dispatched_not_sort_redo_count() const + { return ATOMIC_LOAD(&dispatched_redo_count_) - ATOMIC_LOAD(&sorted_redo_count_); } +public: + TO_STRING_KV( + K_(dispatched_redo_count), + K_(sorted_redo_count), + K_(sorted_row_seq_no)); +private: + int64_t dispatched_redo_count_; + int64_t sorted_redo_count_; + int64_t sorted_row_seq_no_; +}; + class IStmtTask; class DmlStmtTask; // Ordered Redo log list @@ -288,7 +319,6 @@ struct SortedRedoLogList // Otherwise, we can increase ready_node_num directly int32_t ready_node_num_; int32_t log_num_; - int64_t dispatched_redo_count_; bool is_dml_stmt_iter_end_; RedoLogMetaNode *head_; @@ -297,9 +327,20 @@ struct SortedRedoLogList RedoLogMetaNode *cur_dispatch_redo_; RedoLogMetaNode *cur_sort_redo_; 
ObLink *cur_sort_stmt_; + RedoSortedProgress sorted_progress_; - SortedRedoLogList() : node_num_(0), ready_node_num_(0), log_num_(0), dispatched_redo_count_(0), is_dml_stmt_iter_end_(false), - head_(NULL), tail_(NULL), last_push_node_(NULL), cur_dispatch_redo_(NULL), cur_sort_redo_(NULL), cur_sort_stmt_(NULL) + SortedRedoLogList() : + node_num_(0), + ready_node_num_(0), + log_num_(0), + is_dml_stmt_iter_end_(false), + head_(NULL), + tail_(NULL), + last_push_node_(NULL), + cur_dispatch_redo_(NULL), + cur_sort_redo_(NULL), + cur_sort_stmt_(NULL), + sorted_progress_() {} ~SortedRedoLogList() { reset(); } @@ -314,7 +355,6 @@ struct SortedRedoLogList node_num_ = 0; ready_node_num_ = 0; log_num_ = 0; - dispatched_redo_count_ = 0; is_dml_stmt_iter_end_ = false; head_ = NULL; tail_ = NULL; @@ -322,15 +362,24 @@ struct SortedRedoLogList cur_dispatch_redo_ = NULL; cur_sort_redo_ = NULL; cur_sort_stmt_ = NULL; + sorted_progress_.reset(); } bool is_valid() const { - return node_num_ > 0 && log_num_ > 0 && NULL != head_ && NULL != tail_ + return node_num_ > 0 + && log_num_ > 0 + && NULL != head_ + && NULL != tail_ && NULL != last_push_node_; } - bool is_dispatch_finish() const { return node_num_ == ATOMIC_LOAD(&dispatched_redo_count_); } + OB_INLINE bool is_dispatch_finish() const { return node_num_ == sorted_progress_.get_dispatched_redo_count(); } + + OB_INLINE bool has_dispatched_but_unsorted_redo() const + { return sorted_progress_.get_dispatched_not_sort_redo_count() > 0; } + + OB_INLINE void set_sorted_row_seq_no(const int64_t row_seq_no) { sorted_progress_.set_sorted_row_seq_no(row_seq_no); } bool is_dml_stmt_iter_end() const { return is_dml_stmt_iter_end_; } @@ -373,11 +422,19 @@ struct SortedRedoLogList /// @retval Other error codes Fail int next_dml_stmt(ObLink *&dml_stmt_task); - TO_STRING_KV(K_(node_num), K_(log_num), K_(ready_node_num), - KP_(head), KP_(tail), KP_(last_push_node), - K_(dispatched_redo_count), KP_(cur_dispatch_redo), KP_(cur_sort_redo), - 
"cur_sort_redo", static_cast(cur_sort_redo_), - KP_(cur_sort_stmt), K_(is_dml_stmt_iter_end)); + TO_STRING_KV( + K_(node_num), + K_(log_num), + K_(ready_node_num), + KP_(head), + KP_(tail), + KP_(last_push_node), + "redo_sorted_progress", sorted_progress_, + KP_(cur_dispatch_redo), + KP_(cur_sort_redo), + "cur_sort_redo", static_cast(cur_sort_redo_), + KP_(cur_sort_stmt), + K_(is_dml_stmt_iter_end)); }; } // namespace libobcdc } // namespace oceanbase diff --git a/src/logservice/libobcdc/src/ob_log_trans_msg_sorter.cpp b/src/logservice/libobcdc/src/ob_log_trans_msg_sorter.cpp index 116d78fdc8..4e933ad372 100644 --- a/src/logservice/libobcdc/src/ob_log_trans_msg_sorter.cpp +++ b/src/logservice/libobcdc/src/ob_log_trans_msg_sorter.cpp @@ -69,11 +69,12 @@ ObLogTransMsgSorter::~ObLogTransMsgSorter() destroy(); } -int ObLogTransMsgSorter::init(const bool enable_sort_by_seq_no, - const int64_t thread_num, - const int64_t task_limit, - IObLogTransStatMgr &trans_stat_mgr, - IObLogErrHandler *err_handler) +int ObLogTransMsgSorter::init( + const bool enable_sort_by_seq_no, + const int64_t thread_num, + const int64_t task_limit, + IObLogTransStatMgr &trans_stat_mgr, + IObLogErrHandler *err_handler) { int ret = OB_SUCCESS;