From f161cc3bcf4b4da9a002783780e9becdc1c92f7f Mon Sep 17 00:00:00 2001 From: SanmuWangZJU Date: Wed, 15 Feb 2023 11:12:34 +0000 Subject: [PATCH] [OBCDC] Support SYS_LS DIST TRANS(including DDL/DML) --- .../src/ob_cdc_multi_data_source_info.cpp | 8 ++- .../libobcdc/src/ob_log_committer.cpp | 4 +- .../src/ob_log_fetcher_dispatcher.cpp | 52 +++++++++++-------- .../libobcdc/src/ob_log_fetcher_dispatcher.h | 1 + .../libobcdc/src/ob_log_part_trans_parser.cpp | 2 +- .../libobcdc/src/ob_log_part_trans_task.cpp | 11 +++- .../libobcdc/src/ob_log_part_trans_task.h | 2 +- .../src/ob_log_resource_collector.cpp | 1 + .../libobcdc/src/ob_log_sequencer1.cpp | 42 +++++++++++---- .../libobcdc/src/ob_log_sequencer1.h | 6 ++- .../libobcdc/src/ob_log_trans_log.h | 15 ++++++ .../libobcdc/src/ob_log_trans_msg_sorter.cpp | 2 +- .../src/ob_log_trans_redo_dispatcher.cpp | 2 +- 13 files changed, 102 insertions(+), 46 deletions(-) diff --git a/src/logservice/libobcdc/src/ob_cdc_multi_data_source_info.cpp b/src/logservice/libobcdc/src/ob_cdc_multi_data_source_info.cpp index 4d8ca41f59..4d2688276b 100644 --- a/src/logservice/libobcdc/src/ob_cdc_multi_data_source_info.cpp +++ b/src/logservice/libobcdc/src/ob_cdc_multi_data_source_info.cpp @@ -107,18 +107,16 @@ int64_t MultiDataSourceInfo::to_string(char *buf, const int64_t buf_len) const if (NULL != buf && buf_len > 0) { if (has_ls_table_op_) { - (void)common::databuff_printf(buf, buf_len, pos, "ls_table_op: "); - pos += ls_attr_.to_string(buf, buf_len); + (void)common::databuff_printf(buf, buf_len, pos, "{ls_table_op: %s", to_cstring(ls_attr_)); } else { (void)common::databuff_printf(buf, buf_len, pos, "has_ls_table_op: false"); } (void)common::databuff_printf(buf, buf_len, pos, ", is_ddl_trans: %d", has_ddl_trans_op_); if (has_tablet_change_op()) { - (void)common::databuff_printf(buf, buf_len, pos, ", tablet_change_info: "); - pos += tablet_change_info_arr_.to_string(buf, buf_len); + (void)common::databuff_printf(buf, buf_len, pos, ", tablet_change_info: %s}", to_cstring(tablet_change_info_arr_)); } else { - (void)common::databuff_printf(buf, buf_len, pos, ", tablet_change_info: None"); + (void)common::databuff_printf(buf, buf_len, pos, ", tablet_change_info: None}"); } } diff --git a/src/logservice/libobcdc/src/ob_log_committer.cpp b/src/logservice/libobcdc/src/ob_log_committer.cpp index 75768caeeb..411683a91e 100644 --- a/src/logservice/libobcdc/src/ob_log_committer.cpp +++ b/src/logservice/libobcdc/src/ob_log_committer.cpp @@ -417,7 +417,7 @@ int ObLogCommitter::recycle_task_directly_(PartTransTask &task, const bool can_a if (OB_NOT_NULL(resource_collector_) && OB_SUCCESS != (revert_ret = resource_collector_->revert(&task))) { if (OB_IN_STOP_STATE != revert_ret) { - LOG_ERROR("revert HEARTBEAT task fail", K(revert_ret), K(task)); + LOG_ERROR("revert PartTransTask fail", K(revert_ret), K(task)); } ret = OB_SUCCESS == ret ? revert_ret : ret; } @@ -1090,7 +1090,7 @@ int ObLogCommitter::handle_dml_task_(PartTransTask *participants) // Place the Binlog Record chain in the user queue // Binlog Record may be recycled at any time - if (OB_SUCCESS == ret) { + if (OB_SUCC(ret)) { if (OB_FAIL(commit_binlog_record_list_(*trans_ctx, cluster_id, valid_part_trans_task_count, tenant_id, trans_commit_version))) { if (OB_IN_STOP_STATE != ret) { diff --git a/src/logservice/libobcdc/src/ob_log_fetcher_dispatcher.cpp b/src/logservice/libobcdc/src/ob_log_fetcher_dispatcher.cpp index dba151c7be..6064350ba2 100644 --- a/src/logservice/libobcdc/src/ob_log_fetcher_dispatcher.cpp +++ b/src/logservice/libobcdc/src/ob_log_fetcher_dispatcher.cpp @@ -155,12 +155,28 @@ int ObLogFetcherDispatcher::dispatch_ddl_trans_task_(PartTransTask &task, volati { int ret = OB_SUCCESS; + if (OB_FAIL(dispatch_to_sys_ls_handler_(task, stop_flag))) { + if (OB_IN_STOP_STATE != ret) { + LOG_ERROR("sys_ls_handler push fail", KR(ret), K(task)); + } + } else { + // succ + } + + return ret; +} + +// dispatch sys_ls task into sys_ls_handler. +// Including following trans_type: DDL/SYS_LS_HB/SYS_LS_OFFLINE +int ObLogFetcherDispatcher::dispatch_to_sys_ls_handler_(PartTransTask &task, volatile bool &stop_flag) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(sys_ls_handler_)) { ret = OB_INVALID_ERROR; LOG_ERROR("invalid sys_ls_handler", KR(ret), K(sys_ls_handler_)); } else { - // DDL transaction push into DDLHandler - LOG_DEBUG("dispatch ddl_trans to sys_ls_handler", K(task)); + LOG_DEBUG("dispatch sys_ls_trans to sys_ls_handler", K(task)); RETRY_FUNC(stop_flag, *sys_ls_handler_, push, &task, DATA_OP_TIMEOUT); } @@ -189,12 +205,10 @@ int ObLogFetcherDispatcher::dispatch_ls_heartbeat_(PartTransTask &task, volatile // Heartbeat of the DDL partition is distributed to the DDL processor if (task.is_sys_ls_heartbeat()) { - if (OB_ISNULL(sys_ls_handler_)) { - ret = OB_INVALID_ERROR; - LOG_ERROR("invalid sys_ls_handler", KR(ret), K(sys_ls_handler_)); - } else { - // Push into DDL Handler - RETRY_FUNC(stop_flag, *sys_ls_handler_, push, &task, DATA_OP_TIMEOUT); + if (OB_FAIL(dispatch_to_sys_ls_handler_(task, stop_flag))) { + if (OB_IN_STOP_STATE != ret) { + LOG_ERROR("dispatch sys_ls_heartbeat task into sys_ls_handler failed", KR(ret), K(task)); + } } } else { ret = dispatch_to_committer_(task, stop_flag); @@ -210,12 +224,10 @@ int ObLogFetcherDispatcher::dispatch_offline_ls_task_(PartTransTask &task, // DDL partition's offline tasks are distributed to DDL processors if (task.is_sys_ls_offline_task()) { - if (OB_ISNULL(sys_ls_handler_)) { - ret = OB_INVALID_ERROR; - LOG_ERROR("invalid sys_ls_handler", KR(ret), K(sys_ls_handler_)); - } else { - // Push into DDL Handler - RETRY_FUNC(stop_flag, *sys_ls_handler_, push, &task, DATA_OP_TIMEOUT); + if (OB_FAIL(dispatch_to_sys_ls_handler_(task, stop_flag))) { + if (OB_IN_STOP_STATE != ret) { + LOG_ERROR("dispatch sys_ls_offline task into sys_ls_handler failed", KR(ret), K(task)); + } } } else { ret = dispatch_to_committer_(task, stop_flag); @@ -255,13 +267,11 @@ int ObLogFetcherDispatcher::dispatch_ls_table_op_(PartTransTask &task, volatile { int ret = OB_SUCCESS; - if (OB_ISNULL(sys_ls_handler_)) { - ret = OB_INVALID_ERROR; - LOG_ERROR("invalid sys_ls_handler", KR(ret), K(sys_ls_handler_)); - } else { - // DDL transaction push into DDLHandler - LOG_DEBUG("dispatch ls_table_op to sys_ls_handler", K(task)); - RETRY_FUNC(stop_flag, *sys_ls_handler_, push, &task, DATA_OP_TIMEOUT); + // ls table op task expected only in sys_ls. + if (OB_FAIL(dispatch_to_sys_ls_handler_(task, stop_flag))) { + if (OB_IN_STOP_STATE != ret) { + LOG_ERROR("dispatch sys_ls_heartbeat task into sys_ls_handler failed", KR(ret), K(task)); + } } return ret; diff --git a/src/logservice/libobcdc/src/ob_log_fetcher_dispatcher.h b/src/logservice/libobcdc/src/ob_log_fetcher_dispatcher.h index d79461b38c..073d616536 100644 --- a/src/logservice/libobcdc/src/ob_log_fetcher_dispatcher.h +++ b/src/logservice/libobcdc/src/ob_log_fetcher_dispatcher.h @@ -56,6 +56,7 @@ private: int dispatch_offline_ls_task_(PartTransTask &task, volatile bool &stop_flag); int dispatch_global_ls_heartbeat_(PartTransTask &task, volatile bool &stop_flag); int dispatch_ls_table_op_(PartTransTask &task, volatile bool &stop_flag); + int dispatch_to_sys_ls_handler_(PartTransTask &task, volatile bool &stop_flag); private: bool inited_; diff --git a/src/logservice/libobcdc/src/ob_log_part_trans_parser.cpp b/src/logservice/libobcdc/src/ob_log_part_trans_parser.cpp index 01caf69617..1b99bab971 100644 --- a/src/logservice/libobcdc/src/ob_log_part_trans_parser.cpp +++ b/src/logservice/libobcdc/src/ob_log_part_trans_parser.cpp @@ -150,8 +150,8 @@ int ObLogPartTransParser::parse(ObLogEntryTask &task, volatile bool &stop_flag) ret = OB_INVALID_DATA; // Calibrate data for completeness } else if (OB_UNLIKELY(! redo_node->check_data_integrity())) { - LOG_ERROR("redo data is not valid", KPC(redo_node)); ret = OB_INVALID_DATA; + LOG_ERROR("redo data is not valid", KR(ret), KPC(redo_node)); } else if (OB_FAIL(parse_stmts_(tenant, *redo_node, task, *part_trans_task, row_index, stop_flag))) { LOG_ERROR("parse_stmts_ fail", KR(ret), K(tenant), KPC(redo_node), K(task), K(row_index)); diff --git a/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp b/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp index f2f62cdf55..4f369ebf05 100644 --- a/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp +++ b/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp @@ -3004,6 +3004,9 @@ int PartTransTask::commit( if (! tls_id_.is_sys_log_stream()) { set_ref_cnt(sorted_redo_list_.get_node_number() + 1); + } else if (is_sys_ls_dml_trans()) { + // set ref for DML in SYS_LS + set_ref_cnt(1); } } } @@ -3023,6 +3026,8 @@ int PartTransTask::try_to_set_data_ready_status() ret = OB_STATE_NOT_MATCH; } else if (is_data_ready()) { // do nothing + } else if (is_sys_ls_dml_trans()) { + set_data_ready(); } else if (is_contain_empty_redo_log()) { set_data_ready(); } else { @@ -3507,7 +3512,11 @@ int PartTransTask::wait_formatted(const int64_t timeout, ObCond &cond) void PartTransTask::set_data_ready() { LOG_DEBUG("[STAT] [TRANS_TASK] SET_DATA_READY", K_(is_data_ready), "task", *this); - sorted_redo_list_.init_iterator(); + if (is_sys_ls_dml_trans()) { + sorted_redo_list_.mark_sys_ls_dml_trans_dispatched(); + } else { + sorted_redo_list_.init_iterator(); + } (void)ATOMIC_SET(&is_data_ready_, true); wait_data_ready_cond_.signal(); } diff --git a/src/logservice/libobcdc/src/ob_log_part_trans_task.h b/src/logservice/libobcdc/src/ob_log_part_trans_task.h index ed056b46fc..03c50996d4 100644 --- a/src/logservice/libobcdc/src/ob_log_part_trans_task.h +++ b/src/logservice/libobcdc/src/ob_log_part_trans_task.h @@ -919,6 +919,7 @@ public: return is_offline_ls_task() && is_sys_ls_part_trans(); } bool is_not_served_trans() const { return TASK_TYPE_NOT_SERVED_TRANS == type_; } + bool is_sys_ls_dml_trans() const { return is_dml_trans() && is_sys_ls_part_trans(); } void set_task_info(const TenantLSID &tls_id, const char *info); @@ -1030,7 +1031,6 @@ public: bool is_served() const { return SERVED == serve_state_; } bool is_single_ls_trans() const { return transaction::TransType::SP_TRANS == trans_type_; } bool is_dist_trans() const { return transaction::TransType::DIST_TRANS == trans_type_; } - void is_part_trans_sort_finish() const { sorted_redo_list_.is_dml_stmt_iter_end(); } bool is_part_dispatch_finish() const { return sorted_redo_list_.is_dispatch_finish(); } void inc_sorted_br() { ATOMIC_INC(&output_br_count_by_turn_); } // get and reset sorted br count diff --git a/src/logservice/libobcdc/src/ob_log_resource_collector.cpp b/src/logservice/libobcdc/src/ob_log_resource_collector.cpp index cbc1b8cf92..e03e894c75 100644 --- a/src/logservice/libobcdc/src/ob_log_resource_collector.cpp +++ b/src/logservice/libobcdc/src/ob_log_resource_collector.cpp @@ -842,6 +842,7 @@ void ObLogResourceCollector::do_stat_(PartTransTask &task, (void)ATOMIC_AAF(&total_part_trans_task_count_, cnt); if (task.is_ddl_trans()) { + LOG_DEBUG("do_stat_ for ddl_trans", K_(ddl_part_trans_task_count), K(cnt), K(task), "lbt", lbt_oblog()); (void)ATOMIC_AAF(&ddl_part_trans_task_count_, cnt); } else if (task.is_dml_trans()) { (void)ATOMIC_AAF(&dml_part_trans_task_count_, cnt); diff --git a/src/logservice/libobcdc/src/ob_log_sequencer1.cpp b/src/logservice/libobcdc/src/ob_log_sequencer1.cpp index 112d3a8d7f..da0893d2e9 100644 --- a/src/logservice/libobcdc/src/ob_log_sequencer1.cpp +++ b/src/logservice/libobcdc/src/ob_log_sequencer1.cpp @@ -224,7 +224,7 @@ int ObLogSequencer::push(PartTransTask *part_trans_task, volatile bool &stop_fla if (OB_SUCC(ret)) { (void)ATOMIC_AAF(&queue_part_trans_task_count_, 1); - do_stat_for_part_trans_task_count_(*part_trans_task, 1); + do_stat_for_part_trans_task_count_(*part_trans_task, 1, false/*is_sub_stat*/); } if (OB_FAIL(ret)) { @@ -709,7 +709,7 @@ int ObLogSequencer::push_task_into_committer_(PartTransTask *task, LOG_ERROR("sequencer has not been initialized", KR(ret), K(tenant)); } else { // Counting the number of partitioned tasks - do_stat_for_part_trans_task_count_(*task, -task_count); + do_stat_for_part_trans_task_count_(*task, task_count, true/*is_sub_stat*/); RETRY_FUNC(stop_flag, (*trans_committer_), push, task, task_count, DATA_OP_TIMEOUT, tenant); } @@ -753,7 +753,8 @@ int ObLogSequencer::handle_participants_ready_trans_(const bool is_dml_trans, ObByteLockGuard guard(trans_queue_lock_); trans_queue_.push(trx_sort_elem); - _DSTAT("[TRANS_QUEUE] TRANS_ID=%s QUEUE_SIZE=%lu IS_DML=%d", + _DSTAT("[TRANS_QUEUE] TENANT_ID=%lu TRANS_ID=%s QUEUE_SIZE=%lu IS_DML=%d", + tenant_id, to_cstring(trx_sort_elem), trans_queue_.size(), is_dml_trans); @@ -842,6 +843,11 @@ int ObLogSequencer::handle_multi_data_source_info_( IObLogPartMgr &part_mgr = tenant.get_part_mgr(); while (OB_SUCC(ret) && OB_NOT_NULL(part_trans_task)) { + if (! part_trans_task->is_sys_ls_part_trans()) { + // USER_LS part_trans_task in DIST_DDL_TRANS won't into dispatcher, set_ref_cnt to 1 to + // recycle the part_trans_task. + part_trans_task->set_ref_cnt(1); + } if (part_trans_task->get_multi_data_source_info().has_tablet_change_op()) { const CDCTabletChangeInfoArray &tablet_change_info_arr = part_trans_task->get_multi_data_source_info().get_tablet_change_info_arr(); @@ -1083,31 +1089,45 @@ int ObLogSequencer::recycle_resources_after_trans_ready_(TransCtx &trans_ctx, Ob return ret; } -void ObLogSequencer::do_stat_for_part_trans_task_count_(PartTransTask &part_trans_task, - const int64_t task_count) +void ObLogSequencer::do_stat_for_part_trans_task_count_( + PartTransTask &part_trans_task, + const int64_t task_count, + const bool is_sub_stat) { bool is_hb_sub_stat = false; int64_t hb_dec_task_count = 0; + int64_t op_task_count = task_count; + if (is_sub_stat) { + op_task_count = -1 * task_count; + } if (part_trans_task.is_ddl_trans()) { - (void)ATOMIC_AAF(&ddl_part_trans_task_count_, task_count); + if (is_sub_stat) { + (void)ATOMIC_AAF(&ddl_part_trans_task_count_, -1); + // dist ddl_task contains dml part_trans_task, should do_stat seperately + if (task_count > 1) { + (void)ATOMIC_AAF(&dml_part_trans_task_count_, 1 - task_count); + } + } else { + (void)ATOMIC_AAF(&ddl_part_trans_task_count_, op_task_count); + } } else if (part_trans_task.is_dml_trans()) { - (void)ATOMIC_AAF(&dml_part_trans_task_count_, task_count); + (void)ATOMIC_AAF(&dml_part_trans_task_count_, op_task_count); } else { // heartbeat - if (task_count < 0) { + if (is_sub_stat) { is_hb_sub_stat = true; - hb_dec_task_count = task_count * SequencerThread::get_thread_num(); + hb_dec_task_count = op_task_count * SequencerThread::get_thread_num(); (void)ATOMIC_AAF(&hb_part_trans_task_count_, hb_dec_task_count); } else { - (void)ATOMIC_AAF(&hb_part_trans_task_count_, task_count); + (void)ATOMIC_AAF(&hb_part_trans_task_count_, op_task_count); } } if (is_hb_sub_stat) { (void)ATOMIC_AAF(&total_part_trans_task_count_, hb_dec_task_count); } else { - (void)ATOMIC_AAF(&total_part_trans_task_count_, task_count); + (void)ATOMIC_AAF(&total_part_trans_task_count_, op_task_count); } } diff --git a/src/logservice/libobcdc/src/ob_log_sequencer1.h b/src/logservice/libobcdc/src/ob_log_sequencer1.h index 8bfbfd6295..64d4035ddc 100644 --- a/src/logservice/libobcdc/src/ob_log_sequencer1.h +++ b/src/logservice/libobcdc/src/ob_log_sequencer1.h @@ -182,8 +182,10 @@ private: const int64_t task_count, volatile bool &stop_flag, ObLogTenant *tenant); - void do_stat_for_part_trans_task_count_(PartTransTask &part_trans_task, - const int64_t task_count); + void do_stat_for_part_trans_task_count_( + PartTransTask &part_trans_task, + const int64_t task_count, + const bool is_sub_stat); // TODO add // 1. statistics on transaction tps and rps (rps before and after Formatter filtering) diff --git a/src/logservice/libobcdc/src/ob_log_trans_log.h b/src/logservice/libobcdc/src/ob_log_trans_log.h index 5f0eb7840d..a2110d4d80 100644 --- a/src/logservice/libobcdc/src/ob_log_trans_log.h +++ b/src/logservice/libobcdc/src/ob_log_trans_log.h @@ -290,6 +290,12 @@ public: ATOMIC_SET(&sorted_redo_count_, 0); ATOMIC_SET(&sorted_row_seq_no_, 0); } + void reset_for_sys_ls_dml_trans(const int64_t redo_node_count) + { + ATOMIC_SET(&dispatched_redo_count_, redo_node_count); + ATOMIC_SET(&sorted_redo_count_, redo_node_count); + ATOMIC_SET(&sorted_row_seq_no_, 0); + } OB_INLINE int64_t get_dispatched_redo_count() const { return ATOMIC_LOAD(&dispatched_redo_count_); } OB_INLINE void inc_dispatched_redo_count() { ATOMIC_INC(&dispatched_redo_count_); } OB_INLINE int64_t get_sorted_redo_count() const { return ATOMIC_LOAD(&sorted_redo_count_); } @@ -435,6 +441,15 @@ struct SortedRedoLogList "cur_sort_redo", static_cast(cur_sort_redo_), KP_(cur_sort_stmt), K_(is_dml_stmt_iter_end)); + + void mark_sys_ls_dml_trans_dispatched() + { + cur_dispatch_redo_ = NULL; + cur_sort_redo_ = NULL; + cur_sort_stmt_ = NULL; + sorted_progress_.reset_for_sys_ls_dml_trans(node_num_); + } + }; } // namespace libobcdc } // namespace oceanbase diff --git a/src/logservice/libobcdc/src/ob_log_trans_msg_sorter.cpp b/src/logservice/libobcdc/src/ob_log_trans_msg_sorter.cpp index 4e933ad372..e02f0394f8 100644 --- a/src/logservice/libobcdc/src/ob_log_trans_msg_sorter.cpp +++ b/src/logservice/libobcdc/src/ob_log_trans_msg_sorter.cpp @@ -34,7 +34,7 @@ if (err_no == ret) { \ retry_cnt ++; \ if (0 == retry_cnt % 200) { \ - LOG_WARN(#func "retry for too many times", KP(&var), K(var)); \ + LOG_WARN(#func " retry for too many times", KP(&var), K(var)); \ } \ /* sleep 5 ms*/ \ ob_usleep(5 * 1000); \ diff --git a/src/logservice/libobcdc/src/ob_log_trans_redo_dispatcher.cpp b/src/logservice/libobcdc/src/ob_log_trans_redo_dispatcher.cpp index 4ae2bad85a..7f4bf6dbdc 100644 --- a/src/logservice/libobcdc/src/ob_log_trans_redo_dispatcher.cpp +++ b/src/logservice/libobcdc/src/ob_log_trans_redo_dispatcher.cpp @@ -88,6 +88,7 @@ int ObLogTransRedoDispatcher::dispatch_trans_redo(TransCtx &trans, volatile bool LOG_ERROR("failed to dispatch redo of trans", KR(ret), K(trans), K(stop_flag)); } } else { + trans.set_trans_redo_dispatched(); trans_stat_mgr_->do_dispatch_trans_stat(); } @@ -175,7 +176,6 @@ int ObLogTransRedoDispatcher::dispatch_by_turn_(TransCtx &trans, volatile bool & LOG_WARN("trans dispatch_by_turn for too many times", KR(ret), K(retry_cnt), K(trans), K_(trans_dispatch_ctx)); } } else { - trans.set_trans_redo_dispatched(); } }