[OBCDC] fix OBCDC stuck caused by data_skew

This commit is contained in:
SanmuWangZJU
2023-02-13 05:42:03 +00:00
committed by ob-robot
parent f5338e1cdd
commit df71a667df
12 changed files with 131 additions and 27 deletions

View File

@ -201,6 +201,7 @@ public:
DEF_INT(log_clean_cycle_time_in_hours, OB_CLUSTER_PARAMETER, "24", "[0,]",
"clean log cycle time in hours, 0 means not to clean log");
DEF_INT(max_log_file_count, OB_CLUSTER_PARAMETER, "40", "[0,]", "max log file count, 0 means no limit");
T_DEF_BOOL(enable_log_limit, OB_CLUSTER_PARAMETER, 1, "0:disable log_limit, 1:enable log_limit");
T_DEF_BOOL(skip_dirty_data, OB_CLUSTER_PARAMETER, 0, "0:disabled, 1:enabled");
@ -385,7 +386,8 @@ public:
// Not on by default (participant-by-participant output)
T_DEF_BOOL(enable_output_trans_order_by_sql_operation, OB_CLUSTER_PARAMETER, 0, "0:disabled, 1:enabled");
// redo dispatcher memory limit
DEF_CAP(redo_dispatcher_memory_limit, OB_CLUSTER_PARAMETER, "1G", "[128M,]", "redo dispatcher memory limit");
DEF_CAP(redo_dispatcher_memory_limit, OB_CLUSTER_PARAMETER, "512M", "[128M,]", "redo dispatcher memory limit");
DEF_CAP(extra_redo_dispatch_memory_size, OB_CLUSTER_PARAMETER, "4M", "[0, 512M]", "extra redo dispatcher memory for data skew participant");
// redo dispatcher memory limit ratio for output br by sql operation (compared with redo_dispatcher_memory_limit)
T_DEF_INT_INFT(redo_dispatched_memory_limit_exceed_ratio, OB_CLUSTER_PARAMETER, 2, 1,
"redo_dispatcher_memory_limit ratio for output by sql operation order");

View File

@ -140,7 +140,7 @@ int ObLogDmlParser::push(ObLogEntryTask &task, const int64_t timeout)
}
} else {
ATOMIC_INC(&log_entry_task_count_);
LOG_DEBUG("push task into DML parser", K(task));
LOG_DEBUG("push task into DML parser", KP(&task), K(task));
}
}
@ -174,7 +174,7 @@ int ObLogDmlParser::handle(void *data,
LOG_ERROR("part_trans_task is NULL", K(part_trans_task), KPC(task));
ret = OB_ERR_UNEXPECTED;
} else {
LOG_DEBUG("DML parser handle task", K(thread_index), KPC(task));
LOG_DEBUG("DML parser handle task", K(thread_index), KP(task), KPC(task));
const uint64_t tenant_id = part_trans_task->get_tenant_id();
lib::Worker::CompatMode compat_mode = lib::Worker::CompatMode::INVALID;

View File

@ -816,7 +816,7 @@ int ObLogFormatter::finish_format_(PartTransTask &part_trans_task,
LOG_ERROR("redo_log_entry_task link_row_list fail", KR(ret), K(redo_log_entry_task));
}
} else {
LOG_DEBUG("[FORMATT]", K(tenant_id), K(stmt_num), K(redo_log_entry_task), K(part_trans_task));
LOG_DEBUG("[FORMATT]", K(tenant_id), K(stmt_num), KP(&redo_log_entry_task), K(redo_log_entry_task), K(part_trans_task));
IObLogResourceCollector *resource_collector = TCTX.resource_collector_;
if (0 == row_ref_cnt) {

View File

@ -404,12 +404,14 @@ int ObLogInstance::init_logger_()
LOG_ERROR("FileDirectoryUtils create_full_path fail", KR(ret), K(log_dir));
} else {
const int64_t max_log_file_count = TCONF.max_log_file_count;
const bool enable_log_limit = (1 == TCONF.enable_log_limit);
easy_log_level = EASY_LOG_INFO;
OB_LOGGER.set_max_file_size(MAX_LOG_FILE_SIZE);
OB_LOGGER.set_max_file_index(max_log_file_count);
OB_LOGGER.set_file_name(log_file, disable_redirect_log_, false);
OB_LOGGER.set_log_level("INFO");
OB_LOGGER.disable_thread_log_level();
OB_LOGGER.set_enable_log_limit(enable_log_limit);
if (! disable_redirect_log_) {
// Open the stderr log file
@ -2118,9 +2120,11 @@ void ObLogInstance::reload_config_()
LOG_ERROR("load_from_file fail", KR(ret), K(default_config_fpath));
} else {
const int64_t max_log_file_count = config.max_log_file_count;
const bool enable_log_limit = (1 == config.enable_log_limit);
LOG_INFO("reset log config", "log_level", config.log_level.str(), K(max_log_file_count));
OB_LOGGER.set_mod_log_levels(config.log_level.str());
OB_LOGGER.set_max_file_index(max_log_file_count);
OB_LOGGER.set_enable_log_limit(enable_log_limit);
ATOMIC_STORE(&log_clean_cycle_time_us_, config.log_clean_cycle_time_in_hours * _HOUR_);

View File

@ -3734,7 +3734,10 @@ int PartTransTask::next_dml_stmt(DmlStmtTask *&dml_stmt_task)
} else if (OB_ISNULL(dml_stmt_task = static_cast<DmlStmtTask*>(dml_task))) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("dml_task get from SortedRedoList should not be null", KR(ret), K(dml_task));
} else { /* succ */ }
} else {
sorted_redo_list_.set_sorted_row_seq_no(dml_stmt_task->get_row_seq_no());
/* succ */
}
return ret;
}

View File

@ -1076,6 +1076,11 @@ public:
SortedLogEntryInfo &get_sorted_log_entry_info() { return sorted_log_entry_info_; }
// Delegates to sorted_log_entry_info_.is_all_log_entry_fetched(), which fills
// is_all_redo_fetched, and returns that call's result code unchanged.
int is_all_redo_log_entry_fetched(bool &is_all_redo_fetched)
{
  const int fetch_check_ret = sorted_log_entry_info_.is_all_log_entry_fetched(is_all_redo_fetched);
  return fetch_check_ret;
}
// Returns true when sorted_redo_list_ reports no redo that has been
// dispatched but not yet sorted.
OB_INLINE bool is_dispatched_redo_be_sorted() const
{
  const bool has_unsorted_redo = sorted_redo_list_.has_dispatched_but_unsorted_redo();
  return ! has_unsorted_redo;
}
int push_multi_data_source_data(
const palf::LSN &lsn,
const transaction::ObTxBufferNodeArray &mds_data_arr,

View File

@ -179,6 +179,7 @@ int ObLogReader::handle(void *data, const int64_t thread_index, volatile bool &s
LOG_ERROR("handle_task_ fail", KR(ret), KPC(task), K(thread_index));
}
} else {
LOG_DEBUG("ObLogEntryTask read succ", KP(task));
ATOMIC_DEC(&log_entry_task_count_);
}

View File

@ -292,7 +292,7 @@ int ObLogResourceCollector::revert_log_entry_task_(ObLogEntryTask *log_entry_tas
const bool is_test_mode_on = TCONF.test_mode_on != 0;
if (is_test_mode_on) {
LOG_INFO("LogEntryTask-free", "LogEntryTask", *log_entry_task, "addr", log_entry_task);
LOG_INFO("LogEntryTask-free", "LogEntryTask", *log_entry_task, "addr", log_entry_task, K(data_len));
}
if (is_log_entry_stored) {
@ -569,7 +569,7 @@ int ObLogResourceCollector::handle(void *data,
}
} else {}
}
(void)ATOMIC_AAF(&br_count_, -1);
ATOMIC_DEC(&br_count_);
task = NULL;
}
} else {
@ -681,7 +681,7 @@ int ObLogResourceCollector::dec_ref_cnt_and_try_to_recycle_log_entry_task_(ObLog
ret = OB_ERR_UNEXPECTED;
} else {
if (TCONF.test_mode_on) {
LOG_INFO("revert_dml_binlog_record", KPC(log_entry_task));
LOG_INFO("revert_dml_binlog_record", KP(&br), K(br), KP(log_entry_task), KPC(log_entry_task));
}
const bool need_revert_log_entry_task = (log_entry_task->dec_row_ref_cnt() == 0);

View File

@ -111,7 +111,26 @@ void TransDispatchCtx::set_normal_priority_budget_(const int64_t &average_budget
{
for(int64_t i = 0; i < normal_priority_part_budget_arr_.count(); i++) {
PartTransDispatchBudget &budget = normal_priority_part_budget_arr_[i];
budget.reset_budget(average_budget);
PartTransTask *part_trans_task = budget.part_trans_task_;
if (average_budget <= 0
&& OB_NOT_NULL(part_trans_task)
&& part_trans_task->is_dispatched_redo_be_sorted()) {
const int64_t extra_redo_dispatch_size = TCONF.extra_redo_dispatch_memory_size;
if (REACH_TIME_INTERVAL(1 * _SEC_)) {
LOG_INFO("[NOTICE][REDO_DISPATCH] budget usedup but dispatched_redo all sorted, use extra_redo budget",
K(budget),
K(average_budget),
"extra_redo_dispatch_size", SIZE_TO_STR(extra_redo_dispatch_size),
"part_trans_task", part_trans_task->get_part_trans_info(),
"redo_sorted_progress", part_trans_task->get_sorted_redo_list().sorted_progress_);
}
budget.reset_budget(extra_redo_dispatch_size);
} else {
budget.reset_budget(average_budget);
}
}
}

View File

@ -252,6 +252,15 @@ int DmlRedoLogNode::set_data_info(char *data, int64_t data_len)
return ret;
}
// Stores row_seq_no as the current sorted row sequence number.
// A value smaller than the one currently stored is only reported with a
// warning; the new value is stored in either case.
void RedoSortedProgress::set_sorted_row_seq_no(const int64_t row_seq_no)
{
  const bool is_seq_no_rollbacked = (ATOMIC_LOAD(&sorted_row_seq_no_) > row_seq_no);
  if (is_seq_no_rollbacked) {
    // TODO PDML may cause row_seq_no rollback
    LOG_WARN_RET(OB_STATE_NOT_MATCH, "row_seq_no rollbacked! check if PDML sence", K(row_seq_no), K_(sorted_row_seq_no));
  }
  ATOMIC_STORE(&sorted_row_seq_no_, row_seq_no);
}
int SortedRedoLogList::push(const bool is_data_in_memory,
RedoLogMetaNode *node)
{
@ -320,6 +329,7 @@ void SortedRedoLogList::init_iterator()
cur_dispatch_redo_ = head_;
cur_sort_redo_ = head_;
cur_sort_stmt_ = NULL; // row not format and stmt should be null
sorted_progress_.reset();
}
int SortedRedoLogList::next_dml_redo(RedoLogMetaNode *&dml_redo_meta, bool &is_last_redo)
@ -337,8 +347,9 @@ int SortedRedoLogList::next_dml_redo(RedoLogMetaNode *&dml_redo_meta, bool &is_l
RedoLogMetaNode *next_redo = cur_dispatch_redo_->get_next();
dml_redo_meta = cur_dispatch_redo_;
cur_dispatch_redo_ = next_redo;
dispatched_redo_count_++; // Theoretically no concurrent call of this function
is_last_redo = (dispatched_redo_count_ == node_num_);
// Theoretically no concurrent call of this function
sorted_progress_.inc_dispatched_redo_count();
is_last_redo = is_dispatch_finish();
}
return ret;
@ -389,6 +400,7 @@ int SortedRedoLogList::next_dml_stmt(ObLink *&dml_stmt_task)
// 1. found dml_stmt_task and it is the last stmt of cur_sort_redo
// 2. cur_sort_redo doesn't has any row
cur_sort_redo_ = cur_sort_redo_->get_next();
sorted_progress_.inc_sorted_redo_count();
}
}
}

View File

@ -278,6 +278,37 @@ public:
TO_STRING_KV("RedoLog", static_cast<const RedoLogMetaNode &>(*this));
};
// Progress counters for a redo list: the number of redo nodes dispatched,
// the number of redo nodes sorted, and the seq_no of the last sorted row.
// Every field is read and written through the ATOMIC_* macros.
class RedoSortedProgress
{
public:
  RedoSortedProgress() { reset(); }
  ~RedoSortedProgress() { reset(); }
public:
  // Sets all three counters back to zero.
  void reset()
  {
    ATOMIC_SET(&dispatched_redo_count_, 0);
    ATOMIC_SET(&sorted_redo_count_, 0);
    ATOMIC_SET(&sorted_row_seq_no_, 0);
  }
  OB_INLINE int64_t get_dispatched_redo_count() const { return ATOMIC_LOAD(&dispatched_redo_count_); }
  OB_INLINE void inc_dispatched_redo_count() { ATOMIC_INC(&dispatched_redo_count_); }
  OB_INLINE int64_t get_sorted_redo_count() const { return ATOMIC_LOAD(&sorted_redo_count_); }
  OB_INLINE void inc_sorted_redo_count() { ATOMIC_INC(&sorted_redo_count_); }
  OB_INLINE int64_t get_sorted_row_seq_no() const { return ATOMIC_LOAD(&sorted_row_seq_no_); }
  void set_sorted_row_seq_no(const int64_t row_seq_no);
  // Returns dispatched_redo_count_ - sorted_redo_count_.
  //
  // The two loads are done in separate statements, sorted count first.
  // Inside a single `a - b` expression the operand evaluation order is
  // unspecified; if the dispatched count were read first while both counters
  // keep advancing, the later-read sorted count could exceed it and the
  // result would become negative, which callers comparing against 0 would
  // misread as "nothing left unsorted".
  // NOTE(review): assumes a redo node is always counted as dispatched before
  // it is counted as sorted - confirm against the dispatcher/sorter flow.
  OB_INLINE int64_t get_dispatched_not_sort_redo_count() const
  {
    const int64_t sorted_cnt = ATOMIC_LOAD(&sorted_redo_count_);
    const int64_t dispatched_cnt = ATOMIC_LOAD(&dispatched_redo_count_);
    return dispatched_cnt - sorted_cnt;
  }
public:
  TO_STRING_KV(
      K_(dispatched_redo_count),
      K_(sorted_redo_count),
      K_(sorted_row_seq_no));
private:
  int64_t dispatched_redo_count_;
  int64_t sorted_redo_count_;
  int64_t sorted_row_seq_no_;
};
class IStmtTask;
class DmlStmtTask;
// Ordered Redo log list
@ -288,7 +319,6 @@ struct SortedRedoLogList
// Otherwise, we can increase ready_node_num directly
int32_t ready_node_num_;
int32_t log_num_;
int64_t dispatched_redo_count_;
bool is_dml_stmt_iter_end_;
RedoLogMetaNode *head_;
@ -297,9 +327,20 @@ struct SortedRedoLogList
RedoLogMetaNode *cur_dispatch_redo_;
RedoLogMetaNode *cur_sort_redo_;
ObLink *cur_sort_stmt_;
RedoSortedProgress sorted_progress_;
SortedRedoLogList() : node_num_(0), ready_node_num_(0), log_num_(0), dispatched_redo_count_(0), is_dml_stmt_iter_end_(false),
head_(NULL), tail_(NULL), last_push_node_(NULL), cur_dispatch_redo_(NULL), cur_sort_redo_(NULL), cur_sort_stmt_(NULL)
SortedRedoLogList() :
node_num_(0),
ready_node_num_(0),
log_num_(0),
is_dml_stmt_iter_end_(false),
head_(NULL),
tail_(NULL),
last_push_node_(NULL),
cur_dispatch_redo_(NULL),
cur_sort_redo_(NULL),
cur_sort_stmt_(NULL),
sorted_progress_()
{}
~SortedRedoLogList() { reset(); }
@ -314,7 +355,6 @@ struct SortedRedoLogList
node_num_ = 0;
ready_node_num_ = 0;
log_num_ = 0;
dispatched_redo_count_ = 0;
is_dml_stmt_iter_end_ = false;
head_ = NULL;
tail_ = NULL;
@ -322,15 +362,24 @@ struct SortedRedoLogList
cur_dispatch_redo_ = NULL;
cur_sort_redo_ = NULL;
cur_sort_stmt_ = NULL;
sorted_progress_.reset();
}
bool is_valid() const
{
return node_num_ > 0 && log_num_ > 0 && NULL != head_ && NULL != tail_
return node_num_ > 0
&& log_num_ > 0
&& NULL != head_
&& NULL != tail_
&& NULL != last_push_node_;
}
bool is_dispatch_finish() const { return node_num_ == ATOMIC_LOAD(&dispatched_redo_count_); }
OB_INLINE bool is_dispatch_finish() const { return node_num_ == sorted_progress_.get_dispatched_redo_count(); }
// True when sorted_progress_ reports a positive count of redo that has been
// dispatched but not sorted.
OB_INLINE bool has_dispatched_but_unsorted_redo() const
{
  const int64_t unsorted_redo_count = sorted_progress_.get_dispatched_not_sort_redo_count();
  return unsorted_redo_count > 0;
}
// Forwards row_seq_no to sorted_progress_.set_sorted_row_seq_no().
OB_INLINE void set_sorted_row_seq_no(const int64_t row_seq_no)
{
  sorted_progress_.set_sorted_row_seq_no(row_seq_no);
}
bool is_dml_stmt_iter_end() const { return is_dml_stmt_iter_end_; }
@ -373,11 +422,19 @@ struct SortedRedoLogList
/// @retval Other error codes Fail
int next_dml_stmt(ObLink *&dml_stmt_task);
TO_STRING_KV(K_(node_num), K_(log_num), K_(ready_node_num),
KP_(head), KP_(tail), KP_(last_push_node),
K_(dispatched_redo_count), KP_(cur_dispatch_redo), KP_(cur_sort_redo),
"cur_sort_redo", static_cast<DmlRedoLogNode*>(cur_sort_redo_),
KP_(cur_sort_stmt), K_(is_dml_stmt_iter_end));
TO_STRING_KV(
K_(node_num),
K_(log_num),
K_(ready_node_num),
KP_(head),
KP_(tail),
KP_(last_push_node),
"redo_sorted_progress", sorted_progress_,
KP_(cur_dispatch_redo),
KP_(cur_sort_redo),
"cur_sort_redo", static_cast<DmlRedoLogNode*>(cur_sort_redo_),
KP_(cur_sort_stmt),
K_(is_dml_stmt_iter_end));
};
} // namespace libobcdc
} // namespace oceanbase

View File

@ -69,11 +69,12 @@ ObLogTransMsgSorter::~ObLogTransMsgSorter()
destroy();
}
int ObLogTransMsgSorter::init(const bool enable_sort_by_seq_no,
const int64_t thread_num,
const int64_t task_limit,
IObLogTransStatMgr &trans_stat_mgr,
IObLogErrHandler *err_handler)
int ObLogTransMsgSorter::init(
const bool enable_sort_by_seq_no,
const int64_t thread_num,
const int64_t task_limit,
IObLogTransStatMgr &trans_stat_mgr,
IObLogErrHandler *err_handler)
{
int ret = OB_SUCCESS;