diff --git a/src/logservice/data_dictionary/ob_data_dict_struct.cpp b/src/logservice/data_dictionary/ob_data_dict_struct.cpp index ed3ef615c..73f30dc72 100644 --- a/src/logservice/data_dictionary/ob_data_dict_struct.cpp +++ b/src/logservice/data_dictionary/ob_data_dict_struct.cpp @@ -870,8 +870,10 @@ void ObDictTableMeta::reset() database_id_ = OB_INVALID_ID; table_id_ = OB_INVALID_ID; schema_version_ = OB_INVALID_TIMESTAMP; - allocator_->free(table_name_.ptr()); - table_name_.reset(); + if (table_name_.ptr() != nullptr) { + allocator_->free(table_name_.ptr()); + table_name_.reset(); + } name_case_mode_ = common::OB_NAME_CASE_INVALID; table_type_ = schema::ObTableType::USER_TABLE; table_mode_.reset(); diff --git a/src/logservice/data_dictionary/ob_data_dict_struct.h b/src/logservice/data_dictionary/ob_data_dict_struct.h index 7dd417fce..4e4c11559 100644 --- a/src/logservice/data_dictionary/ob_data_dict_struct.h +++ b/src/logservice/data_dictionary/ob_data_dict_struct.h @@ -115,7 +115,7 @@ private: ObDictMetaType meta_type_; ObDictMetaStorageType storage_type_; int64_t dict_serialized_length_; -}; +}; // end of ObDictMetaHeader class ObDictTenantMeta { @@ -189,7 +189,7 @@ private: int64_t drop_tenant_time_; bool in_recyclebin_; share::ObLSArray ls_arr_; -}; +}; // end of ObDictTenantMeta class ObDictDatabaseMeta { @@ -250,7 +250,7 @@ private: common::ObCollationType collation_type_;//default:utf8mb4_general_ci common::ObNameCaseMode name_case_mode_; bool in_recyclebin_; -}; +}; // end of ObDictDatabaseMeta class ObDictColumnMeta { @@ -361,7 +361,7 @@ private: common::ObSEArray column_ref_ids_; uint64_t udt_set_id_; uint64_t sub_type_; -}; +}; // end of ObDictColumnMeta class ObDictTableMeta { @@ -527,7 +527,7 @@ private: common::ObIndexColumn *index_cols_; uint64_t data_table_id_; uint64_t association_table_id_; -}; +}; // end of ObDictTableMeta } // namespace datadict } // namespace oceanbase diff --git a/src/logservice/libobcdc/src/ob_cdc_lob_data_merger.cpp b/src/logservice/libobcdc/src/ob_cdc_lob_data_merger.cpp index 015afb478..f9b660139 100644 --- a/src/logservice/libobcdc/src/ob_cdc_lob_data_merger.cpp +++ b/src/logservice/libobcdc/src/ob_cdc_lob_data_merger.cpp @@ -215,51 +215,27 @@ int ObCDCLobDataMerger::push_lob_column_( const auto seq_no_st = transaction::ObTxSEQ::cast_from_int(lob_data_out_row_ctx->seq_no_st_); const uint32_t seq_no_cnt = lob_data_out_row_ctx->seq_no_cnt_; const uint32_t del_seq_no_cnt = lob_data_out_row_ctx->del_seq_no_cnt_; + const uint32_t insert_seq_no_cnt = seq_no_cnt - del_seq_no_cnt; LobColumnFragmentCtxList new_lob_col_fra_ctx_list; LobColumnFragmentCtxList old_lob_col_fra_ctx_list; if (is_empty_sql) { // do nothing } else if (lob_data_get_ctx.is_insert()) { - if (OB_FAIL(lob_data_get_ctx.new_lob_col_ctx_.init(seq_no_cnt, allocator))) { + if (OB_FAIL(check_empty_outrow_lob_col_(lob_data_get_ctx, seq_no_cnt, del_seq_no_cnt, is_update_outrow_lob_from_empty_to_empty))) { + LOG_ERROR("check_empty_outrow_lob_col_ failed", K(lob_data_get_ctx), K(seq_no_cnt), K(del_seq_no_cnt), K(is_update_outrow_lob_from_empty_to_empty)); + } else if (OB_FAIL(lob_data_get_ctx.new_lob_col_ctx_.init(insert_seq_no_cnt, allocator))) { LOG_ERROR("lob_data_get_ctx new_lob_col_ctx_ init failed", KR(ret), K(seq_no_cnt), K(lob_data_get_ctx)); } else if (OB_FAIL(get_lob_col_fra_ctx_list_(true/*is_new_col*/, seq_no_st, seq_no_cnt, allocator, - lob_data_get_ctx, new_lob_col_fra_ctx_list))) { + lob_data_get_ctx, new_lob_col_fra_ctx_list))) { LOG_ERROR("get_lob_col_fra_ctx_list_ failed", KR(ret), K(seq_no_st), K(seq_no_cnt), K(new_lob_col_fra_ctx_list)); } } else if (lob_data_get_ctx.is_update()) { - const int64_t insert_seq_no_cnt = seq_no_cnt - del_seq_no_cnt; - // NOTICE: - // 1. Update LOB column data from in_row to out_row, the del_seq_no_cnt is 0 - // 2. Update LOB column data from out_row to empty string, the insert_seq_no_cnt is 0 - // - // 3. Currently, LOB column data is stored in out_row in these cases: - // 3.1 Length of column data is larger than 4K - // 3.2. Length of column data is less than 4K(even if column data is empty string), - // but was larger than 4K(stored out_row) and not update to NULL until this trans. - - if (del_seq_no_cnt == 0 && insert_seq_no_cnt == del_seq_no_cnt) { - // empty out_row update to empty out_row - // Under normal circumstances, this scenario should not occur; - // Abnormaly circumstances in OBServer version less than 4.2.1 BP2 and 4.1.0 BP4, please refer case t/libobcdc/lob_empty_outrow_udpate.test - const uint64_t cluster_version = TCTX.global_info_.get_min_cluster_version(); - const bool skip_ob_version_exist_known_issues = (cluster_version == 0) // can't get cluster_version, may in direct mode - || (cluster_version < CLUSTER_VERSION_4_2_1_2) // ob version less than 4213 and 4102 has known issues will result in this scenario. - || (cluster_version <= CLUSTER_VERSION_4_1_0_2); - const bool can_ignore_empty_outrow_update = (1 == TCONF.skip_empty_outrow_lob_update) || skip_ob_version_exist_known_issues; - - if (can_ignore_empty_outrow_update) { - is_update_outrow_lob_from_empty_to_empty = true; - } else { - ret = OB_NOT_SUPPORTED; - LOG_ERROR("[FATAL] [OUTROW_LOB] unexpected update outrow lob from empty to empty, config skip_empty_outrow_lob_update = 1 if necessary", - KR(ret), K(can_ignore_empty_outrow_update), K(cluster_version)); - } - } - - if (FAILEDx(lob_data_get_ctx.old_lob_col_ctx_.init(del_seq_no_cnt, allocator))) { + if (OB_FAIL(check_empty_outrow_lob_col_(lob_data_get_ctx, seq_no_cnt, del_seq_no_cnt, is_update_outrow_lob_from_empty_to_empty))) { + LOG_ERROR("check_empty_outrow_lob_col_ failed", K(lob_data_get_ctx), K(seq_no_cnt), K(del_seq_no_cnt), K(is_update_outrow_lob_from_empty_to_empty)); + } else if (OB_FAIL(lob_data_get_ctx.old_lob_col_ctx_.init(del_seq_no_cnt, allocator))) { LOG_ERROR("lob_data_get_ctx old_lob_col_ctx_ init failed", KR(ret), K(del_seq_no_cnt), K(lob_data_get_ctx)); } else if (OB_FAIL(get_lob_col_fra_ctx_list_(false/*is_new_col*/, seq_no_st, del_seq_no_cnt, @@ -312,6 +288,46 @@ int ObCDCLobDataMerger::push_lob_column_( return ret; } +int ObCDCLobDataMerger::check_empty_outrow_lob_col_( + ObLobDataGetCtx &lob_data_get_ctx, + uint32_t seq_no_cnt, + uint32_t del_seq_no_cnt, + bool &is_update_outrow_lob_from_empty_to_empty) +{ + int ret = OB_SUCCESS; + const int64_t insert_seq_no_cnt = seq_no_cnt - del_seq_no_cnt; + const bool is_empty_lob = lob_data_get_ctx.is_insert() ? insert_seq_no_cnt == 0 : (insert_seq_no_cnt == 0 && del_seq_no_cnt == 0); + // NOTICE: + // 1. Update LOB column data from in_row to out_row, the del_seq_no_cnt is 0 + // 2. Update LOB column data from out_row to empty string, the insert_seq_no_cnt is 0 + // + // 3. Currently, LOB column data is stored in out_row in these cases: + // 3.1 Length of column data is larger than 4K + // 3.2. Length of column data is less than 4K(even if column data is empty string), + // but was larger than 4K(stored out_row) and not update to NULL until this trans. + + if (is_empty_lob) { + // empty out_row update to empty out_row + // Under normal circumstances, this scenario should not occur; + // Abnormaly circumstances in OBServer version less than 4.2.1 BP2 and 4.1.0 BP4, please refer case t/libobcdc/lob_empty_outrow_udpate.test + const uint64_t cluster_version = TCTX.global_info_.get_min_cluster_version(); + const bool skip_ob_version_exist_known_issues = (cluster_version == 0) // can't get cluster_version, may in direct mode + || (cluster_version < CLUSTER_VERSION_4_2_1_2) // ob version less than 4213 and 4102 has known issues will result in this scenario. + || (cluster_version <= CLUSTER_VERSION_4_1_0_2); + const bool can_ignore_empty_outrow_update = (1 == TCONF.skip_empty_outrow_lob_update) || skip_ob_version_exist_known_issues; + + if (can_ignore_empty_outrow_update) { + is_update_outrow_lob_from_empty_to_empty = true; + } else { + ret = OB_NOT_SUPPORTED; + LOG_ERROR("[FATAL] [OUTROW_LOB] unexpected update outrow lob from empty to empty, config skip_empty_outrow_lob_update = 1 if necessary", + KR(ret), K(can_ignore_empty_outrow_update), K(cluster_version)); + } + } + + return ret; +} + int ObCDCLobDataMerger::get_lob_col_fra_ctx_list_( const bool is_new_col, const transaction::ObTxSEQ &seq_no_start, diff --git a/src/logservice/libobcdc/src/ob_cdc_lob_data_merger.h b/src/logservice/libobcdc/src/ob_cdc_lob_data_merger.h index 490393b3e..dedee3136 100644 --- a/src/logservice/libobcdc/src/ob_cdc_lob_data_merger.h +++ b/src/logservice/libobcdc/src/ob_cdc_lob_data_merger.h @@ -81,6 +81,11 @@ private: ObLobDataOutRowCtxList &task, ObLobDataGetCtx &lob_data_get_ctx, volatile bool &stop_flag); + int check_empty_outrow_lob_col_( + ObLobDataGetCtx &lob_data_get_ctx, + uint32_t seq_no_cnt, + uint32_t del_seq_no_cnt, + bool &is_update_outrow_lob_from_empty_to_empty); int get_lob_col_fra_ctx_list_( const bool is_new_col, const transaction::ObTxSEQ &seq_no_start, diff --git a/src/logservice/libobcdc/src/ob_cdc_tenant_sql_server_provider.cpp b/src/logservice/libobcdc/src/ob_cdc_tenant_sql_server_provider.cpp index 5ef6d4993..6654245ae 100644 --- a/src/logservice/libobcdc/src/ob_cdc_tenant_sql_server_provider.cpp +++ b/src/logservice/libobcdc/src/ob_cdc_tenant_sql_server_provider.cpp @@ -106,6 +106,7 @@ int ObCDCTenantSQLServerProvider::prepare_refresh() } else if (OB_FAIL(query_tenant_server_())) { LOG_WARN("query_tenant_server_ failed", KR(ret)); } + LOG_INFO("prepare_refresh tenant sql_server_provider done", KR(ret)); return ret; } @@ -120,6 +121,7 @@ int ObCDCTenantSQLServerProvider::end_refresh() } else if (OB_FAIL(refresh_server_lock_.unlock())) { LOG_ERROR("release refresh_server_lock_ failed", KR(ret), K(this)); } + LOG_INFO("refresh tenant sql_server_provider end", KR(ret)); return ret; } @@ -296,7 +298,7 @@ int ObCDCTenantSQLServerProvider::query_tenant_server_() LOG_WARN("tenant_server_list is empty but tenant is not dropped", K(tenant_id), K(tenant_server_list)); } } else { - LOG_DEBUG("find tenant servers", K(tenant_id), K(tenant_server_list)); + LOG_INFO("find tenant servers", K(tenant_id), K(tenant_server_list)); } if (OB_NOT_NULL(tenant_server_list)) { diff --git a/src/logservice/libobcdc/src/ob_log_ls_getter.cpp b/src/logservice/libobcdc/src/ob_log_ls_getter.cpp index 03e95fb65..9f0782209 100644 --- a/src/logservice/libobcdc/src/ob_log_ls_getter.cpp +++ b/src/logservice/libobcdc/src/ob_log_ls_getter.cpp @@ -133,9 +133,10 @@ int ObLogLsGetter::get_ls_ids( // query and set if (OB_FAIL(query_and_set_tenant_ls_info_(tenant_id, snapshot_ts))) { LOG_ERROR("query_and_set_tenant_ls_info_ failed", KR(ret), K(tenant_id)); + } else if (OB_FAIL(tenant_ls_ids_cache_.get(tenant_id, ls_ids))) { + LOG_ERROR("get tenant_ls_ids from cache failed", KR(ret), K(tenant_id), K(snapshot_ts)); } } - } else { } if (OB_SUCC(ret)) { @@ -144,6 +145,10 @@ int ObLogLsGetter::get_ls_ids( LOG_ERROR("ls_id_array push_back failed", KR(ret), K(ls_id_array)); } } + + if (OB_SUCC(ret)) { + LOG_INFO("get_ls_ids succ", K(tenant_id), K(ls_id_array), K(snapshot_ts)); + } } return ret; diff --git a/src/logservice/libobcdc/src/ob_log_sequencer1.cpp b/src/logservice/libobcdc/src/ob_log_sequencer1.cpp index 0d6d8e31f..670f10f59 100644 --- a/src/logservice/libobcdc/src/ob_log_sequencer1.cpp +++ b/src/logservice/libobcdc/src/ob_log_sequencer1.cpp @@ -14,6 +14,7 @@ #include "ob_log_sequencer1.h" +#include #include "lib/string/ob_string.h" // ObString #include "lib/atomic/ob_atomic.h" #include "lib/thread/ob_thread_name.h" @@ -228,7 +229,10 @@ int ObLogSequencer::push(PartTransTask *part_trans_task, volatile bool &stop_fla hash_value = ATOMIC_FAA(&round_value_, 1); } void *push_task = static_cast(part_trans_task); - RETRY_FUNC(stop_flag, *(static_cast(this)), push, push_task, hash_value, DATA_OP_TIMEOUT); + + if (is_global_heartbeat || OB_SUCC(wait_until_ready_queue_not_busy_(stop_flag))) { + RETRY_FUNC(stop_flag, *(static_cast(this)), push, push_task, hash_value, DATA_OP_TIMEOUT); + } if (OB_SUCC(ret)) { (void)ATOMIC_AAF(&queue_part_trans_task_count_, 1); @@ -807,20 +811,19 @@ int ObLogSequencer::push_task_into_committer_(PartTransTask *task, return ret; } -int ObLogSequencer::handle_participants_ready_trans_(const bool is_dml_trans, +int ObLogSequencer::handle_participants_ready_trans_( + const bool is_dml_trans, TransCtx *trans_ctx, volatile bool &stop_flag) { int ret = OB_SUCCESS; - - stop_flag = stop_flag; + uint64_t tenant_id = OB_INVALID_TENANT_ID; if (OB_ISNULL(trans_ctx)) { LOG_ERROR("invalid argument", K(trans_ctx)); ret = OB_INVALID_ARGUMENT; } else { // Avoiding TransCtx recycling - uint64_t tenant_id = OB_INVALID_TENANT_ID; ObLogTenantGuard guard; ObLogTenant *tenant = NULL; @@ -839,23 +842,67 @@ int ObLogSequencer::handle_participants_ready_trans_(const bool is_dml_trans, } } } + } - if (OB_SUCC(ret)) { - TrxSortElem &trx_sort_elem = trans_ctx->get_trx_sort_elem(); - { - ObByteLockGuard guard(trans_queue_lock_); - ready_trans_queue_.push(trx_sort_elem); - } - // signal push_ready_trans_to_seq_queue_ - ready_queue_cond_.signal(); - - _DSTAT("[TRANS_QUEUE] TENANT_ID=%lu TRANS_ID=%s QUEUE_SIZE=(%lu/%ld) IS_DML=%d", - tenant_id, - to_cstring(trx_sort_elem), - ready_trans_queue_.size(), - seq_trans_queue_.get_curr_total(), - is_dml_trans); + if (OB_SUCC(ret)) { + TrxSortElem &trx_sort_elem = trans_ctx->get_trx_sort_elem(); + { + ObByteLockGuard guard(trans_queue_lock_); + ready_trans_queue_.push(trx_sort_elem); } + // signal push_ready_trans_to_seq_queue_ + ready_queue_cond_.signal(); + + _DSTAT("[TRANS_QUEUE] TENANT_ID=%lu TRANS_ID=%s QUEUE_SIZE=(%lu/%ld) IS_DML=%d", + tenant_id, + to_cstring(trx_sort_elem), + ready_trans_queue_.size(), + seq_trans_queue_.get_curr_total(), + is_dml_trans); + } + + return ret; +} + +int ObLogSequencer::wait_until_ready_queue_not_busy_(volatile bool &stop_flag) +{ + int ret = OB_SUCCESS; + const int64_t seq_trans_queue_cap = seq_trans_queue_.capacity(); + const int64_t seq_trans_threshold = seq_trans_queue_cap * TCONF.pause_redo_dispatch_task_count_threshold / 100; + bool ready_queue_busy = true; + const int64_t start_ts = get_timestamp(); + const int64_t base_sleep_us_if_busy = 100; + + while (ready_queue_busy && !stop_flag) { + // ready_queue_busy = true if: seq_trans_queue is not empty and memory hold touch warn threshold + const int64_t mem_hold = lib::get_memory_hold(); + const int64_t mem_limit = TCONF.memory_limit; + const double mem_warn_threshold = mem_limit * TCONF.memory_usage_warn_threshold / 100.0; + const int64_t seq_trans_cnt = seq_trans_queue_.get_curr_total(); + + if (mem_hold < mem_warn_threshold) { + ready_queue_busy = (seq_trans_cnt >= seq_trans_queue_cap); + } else if (mem_hold < mem_limit) { + ready_queue_busy = seq_trans_cnt > seq_trans_threshold; + } else { + // should not disaptch if exist trans not handled + ready_queue_busy = seq_trans_cnt > 0; + } + + if (ready_queue_busy) { + if (REACH_TIME_INTERVAL_THREAD_LOCAL(PRINT_SEQ_INFO_INTERVAL)) { + LOG_INFO("[FLOW_CONTROL][PAUSE_DISPATCH_TO_SEQUENCER]", + K(mem_hold), + "sequencer_queue_size", queue_part_trans_task_count_, + "ready_queue_size", ready_trans_queue_.size(), + K(seq_trans_cnt)); + } + ob_usleep(100); + } + } + + if (stop_flag) { + ret = OB_IN_STOP_STATE; } return ret; diff --git a/src/logservice/libobcdc/src/ob_log_sequencer1.h b/src/logservice/libobcdc/src/ob_log_sequencer1.h index d8c5a6bd4..cb8a394d1 100644 --- a/src/logservice/libobcdc/src/ob_log_sequencer1.h +++ b/src/logservice/libobcdc/src/ob_log_sequencer1.h @@ -169,6 +169,7 @@ private: int handle_participants_ready_trans_(const bool is_dml_trans, TransCtx *trans_ctx, volatile bool &stop_flag); + int wait_until_ready_queue_not_busy_(volatile bool &stop_flag); // Once the participants are gathered, the entire DML transaction is processed // TODO int handle_dml_trans_(ObLogTenant &tenant, TransCtx &trans_ctx, volatile bool &stop_flag); diff --git a/src/logservice/libobcdc/tests/scripts/watch_flow_control.sh b/src/logservice/libobcdc/tests/scripts/watch_flow_control.sh new file mode 100755 index 000000000..1b16645dc --- /dev/null +++ b/src/logservice/libobcdc/tests/scripts/watch_flow_control.sh @@ -0,0 +1,3 @@ +LOG_FILE=log/libobcdc.log + +watch -n 1 "if [ -f $LOG_FILE ]; then grep NEED_PAUSE log/libobcdc.log | awk '{print \$10, \$11, \$12, \$13, \$15}' | tail -n 5; fi" diff --git a/unittest/data_dictionary/data_dict_test_utils.h b/unittest/data_dictionary/data_dict_test_utils.h index a13cc810a..51b2ab46d 100644 --- a/unittest/data_dictionary/data_dict_test_utils.h +++ b/unittest/data_dictionary/data_dict_test_utils.h @@ -73,7 +73,7 @@ public: int build_table_schema(ObTableSchema &table_schema) { int ret = OB_SUCCESS; - int column_cnt = 5; + int column_cnt = 500; table_schema.set_tenant_id(get_random_uint64_()); table_schema.set_database_id(get_random_uint64_()); table_schema.set_table_id(get_random_uint64_()); @@ -101,6 +101,10 @@ public: column_schema.schema_version_ = 2341235; ObObjMeta meta; meta.set_collation_type(ObCollationType::CS_TYPE_GB18030_BIN); + common::ObString ext_info("enum {a, b, c}"); + ObArray extend_info; + extend_info.push_back(ext_info); + column_schema.set_extended_type_info(extend_info); column_schema.meta_type_ = meta; return ret; }