From 074c52f174cd813fc6b85440649db82589385f65 Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 27 Oct 2022 03:05:36 +0000 Subject: [PATCH] [OBCDC] Modify participant structure --- .../libobcdc/src/ob_log_part_trans_task.cpp | 55 +++++++------------ .../libobcdc/src/ob_log_part_trans_task.h | 11 ++-- .../src/ob_log_resource_collector.cpp | 7 ++- .../src/ob_log_rocksdb_store_service.cpp | 4 +- .../libobcdc/src/ob_log_sequencer1.cpp | 4 +- src/logservice/libobcdc/src/ob_log_tenant.cpp | 3 + .../libobcdc/src/ob_log_trans_ctx.cpp | 4 +- .../logrouteservice/ob_log_route_service.cpp | 26 +++++---- 8 files changed, 56 insertions(+), 58 deletions(-) diff --git a/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp b/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp index 46ac0be05..078664cbf 100644 --- a/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp +++ b/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp @@ -403,7 +403,7 @@ int MutatorRow::parse_columns_( LOG_DEBUG("handle_lob_v2_data", K(column_stored_idx), K(lob_common), K(obj)); if (! is_out_row) { - LOG_INFO("is_lob_v2 in row", K(column_id), K(is_lob_v2), K(lob_common), K(obj)); + LOG_DEBUG("is_lob_v2 in row", K(column_id), K(is_lob_v2), K(lob_common), K(obj)); obj.set_string(obj.get_type(), lob_common.get_inrow_data_ptr(), lob_common.get_byte_size(datum.len_)); } else { const ObLobData &lob_data = *(reinterpret_cast(lob_common.buffer_)); @@ -2030,8 +2030,7 @@ PartTransTask::PartTransTask() : commit_log_lsn_(), trans_type_(transaction::TransType::UNKNOWN_TRANS), is_xa_or_dup_(false), - participant_count_(0), - participants_(NULL), + participants_(), trace_id_(), trace_info_(), sorted_log_entry_info_(), @@ -2164,8 +2163,7 @@ void PartTransTask::reset() commit_log_lsn_.reset(); trans_type_ = transaction::TransType::UNKNOWN_TRANS; is_xa_or_dup_ = false; - participant_count_ = 0; - participants_ = NULL; + participants_.reset(); // The trace_id memory does not need to be freed separately, the allocator frees it all together trace_id_.reset(); trace_info_.reset(); @@ -2261,7 +2259,9 @@ int PartTransTask::push_redo_log( if (OB_SUCC(ret) && need_store_data && is_row_completed) { if (OB_FAIL(get_and_submit_store_task_(tls_id_.get_tenant_id(), row_flags, store_log_lsn, data_buf, data_len))) { - LOG_ERROR("get_and_submit_store_task_ fail", KR(ret), K_(tls_id), K(row_flags)); + if (OB_IN_STOP_STATE != ret) { + LOG_ERROR("get_and_submit_store_task_ fail", KR(ret), K_(tls_id), K(row_flags)); + } } } // need_store_data } @@ -3363,15 +3363,14 @@ int PartTransTask::init_participant_array_( const palf::LSN &commit_log_lsn) { int ret = OB_SUCCESS; - transaction::ObLSLogInfo *part_array = NULL; const int64_t part_count = is_single_ls_trans() ? 1 : participants.count(); if (OB_UNLIKELY(! tls_id_.is_valid() || ! commit_log_lsn.is_valid())) { ret = OB_INVALID_ARGUMENT; - LOG_ERROR("invalid tls_id or commit_log_lsn", K_(tls_id), K(commit_log_lsn)); - } else if (OB_UNLIKELY(NULL != participants_) || OB_UNLIKELY(participant_count_ > 0)) { - LOG_ERROR("participant has been initialized", K(participants_), K(participant_count_)); + LOG_ERROR("invalid tls_id or commit_log_lsn", KR(ret), K_(tls_id), K(commit_log_lsn)); + } else if (OB_UNLIKELY(participants_.count() > 0)) { ret = OB_INIT_TWICE; + LOG_ERROR("participant has been initialized", KR(ret), K(participants_)); // participants record prepared ls info, should be empty if is single_ls_trans. } else if (OB_UNLIKELY(is_single_ls_trans() && part_count != 1) || OB_UNLIKELY(is_dist_trans() && ! is_xa_or_dup_ && part_count <= 1)) { @@ -3379,19 +3378,13 @@ int PartTransTask::init_participant_array_( LOG_ERROR("trans_type is not consistent with participant_count", KR(ret), K_(tls_id), K_(trans_id), K_(trans_type), K(participants)); } else { - int64_t alloc_size = part_count * sizeof(transaction::ObLSLogInfo); - part_array = static_cast(allocator_.alloc(alloc_size)); - - if (OB_ISNULL(part_array)) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_ERROR("allocate memory for participant array fail", KR(ret), K_(tls_id), K_(trans_id), - K_(trans_type), K(part_count), K(alloc_size), K(participants)); - } else if (is_single_ls_trans()) { - new (part_array) transaction::ObLSLogInfo(tls_id_.get_ls_id(), commit_log_lsn); - if (OB_UNLIKELY(! part_array->is_valid())) { + if (is_single_ls_trans()) { + if (OB_FAIL(participants_.push_back(transaction::ObLSLogInfo(tls_id_.get_ls_id(), commit_log_lsn)))) { + LOG_ERROR("participants_ push_back failed", KR(ret), KPC(this)); + } else if (OB_UNLIKELY(! participants_[0].is_valid())) { ret = OB_ERR_UNEXPECTED; - LOG_ERROR("unexcepted invalid part_array", KR(ret), KPC(part_array), KPC(this)); - } + LOG_ERROR("unexcepted invalid part_array", KR(ret), K(participants_), KPC(this)); + } else {} } else { for (int64_t index = 0; OB_SUCC(ret) && index < part_count; index++) { const transaction::ObLSLogInfo &part_log_info = participants.at(index); @@ -3400,28 +3393,19 @@ int PartTransTask::init_participant_array_( ret = OB_ERR_UNEXPECTED; LOG_ERROR("part_log_info recorded in TransCommitLog is invalid", KR(ret), K_(tls_id), K_(trans_id), K_(trans_type), K(participants), K(part_log_info)); - } else { - new(part_array + index) transaction::ObLSLogInfo(part_log_info.get_ls_id(), part_log_info.get_lsn()); - } + } else if (OB_FAIL(participants_.push_back(part_log_info))) { + LOG_ERROR("participants_ push_back failed", KR(ret), KPC(this)); + } else {} } } } - if (OB_SUCC(ret)) { - participants_ = part_array; - participant_count_ = part_count; - } else { - if (NULL != part_array) { - allocator_.free(part_array); - part_array = NULL; - } - } - return ret; } void PartTransTask::destroy_participant_array_() { + /* if (NULL != participants_ && participant_count_ > 0) { for (int64_t index = 0; index < participant_count_; index++) { participants_[index].~ObLSLogInfo(); @@ -3431,6 +3415,7 @@ void PartTransTask::destroy_participant_array_() participants_ = NULL; participant_count_ = 0; } + */ } int PartTransTask::set_participants( diff --git a/src/logservice/libobcdc/src/ob_log_part_trans_task.h b/src/logservice/libobcdc/src/ob_log_part_trans_task.h index 6a1ab3342..2a408abd1 100644 --- a/src/logservice/libobcdc/src/ob_log_part_trans_task.h +++ b/src/logservice/libobcdc/src/ob_log_part_trans_task.h @@ -982,14 +982,14 @@ public: common::ObIAllocator &get_allocator() { return allocator_; } - const transaction::ObLSLogInfo *get_participants() const + const transaction::ObLSLogInfoArray &get_participants() const { return participants_; } int64_t get_participant_count() const { - return participant_count_; + return participants_.count(); } // for unittest start @@ -1090,8 +1090,8 @@ public: K_(prepare_log_lsn), K_(commit_ts), K_(commit_log_lsn), - K_(participant_count), - KP_(participants), + "participant_count", participants_.count(), + K_(participants), K_(trace_id), K_(trace_info), K_(sorted_log_entry_info), @@ -1184,9 +1184,8 @@ private: transaction::TransType trans_type_; bool is_xa_or_dup_; // true if xa dist trans or duplicate table trans. - int64_t participant_count_; // participants info, used for determine the sequence of trans at sequencer moudle. - transaction::ObLSLogInfo *participants_; + transaction::ObLSLogInfoArray participants_; // App Trace ID (get from commit_info log) ObString trace_id_; // App Trace Info (get from commit_info log) diff --git a/src/logservice/libobcdc/src/ob_log_resource_collector.cpp b/src/logservice/libobcdc/src/ob_log_resource_collector.cpp index cd7ddc735..ccb5c46f8 100644 --- a/src/logservice/libobcdc/src/ob_log_resource_collector.cpp +++ b/src/logservice/libobcdc/src/ob_log_resource_collector.cpp @@ -406,6 +406,9 @@ int ObLogResourceCollector::push_task_into_queue_(ObLogResourceRecycleTask &task if (OB_TIMEOUT != ret) { break; + } else { + // When timeout, need to retry + ret = OB_SUCCESS; } } // Note: After a task is pushed to the queue, it may be recycled quickly and the task cannot be accessed later @@ -613,7 +616,9 @@ int ObLogResourceCollector::revert_dml_binlog_record_(ObLogBR &br, volatile bool if (OB_SUCC(ret)) { if (OB_FAIL(dec_ref_cnt_and_try_to_recycle_log_entry_task_(br))) { - LOG_ERROR("dec_ref_cnt_and_try_to_recycle_log_entry_task_ fail", KR(ret)); + if (OB_IN_STOP_STATE != ret) { + LOG_ERROR("dec_ref_cnt_and_try_to_recycle_log_entry_task_ fail", KR(ret)); + } } } } diff --git a/src/logservice/libobcdc/src/ob_log_rocksdb_store_service.cpp b/src/logservice/libobcdc/src/ob_log_rocksdb_store_service.cpp index 66a2a2254..60a159b0f 100644 --- a/src/logservice/libobcdc/src/ob_log_rocksdb_store_service.cpp +++ b/src/logservice/libobcdc/src/ob_log_rocksdb_store_service.cpp @@ -261,12 +261,14 @@ int RocksDbStoreService::del(void *cf_handle, const std::string &key) { int ret = OB_SUCCESS; rocksdb::ColumnFamilyHandle *column_family_handle = static_cast(cf_handle); + rocksdb::WriteOptions writer_options; + writer_options.disableWAL = true; if (OB_ISNULL(column_family_handle)) { LOG_ERROR("column_family_handle is NULL"); ret = OB_ERR_UNEXPECTED; } else { - rocksdb::Status s = m_db_->Delete(rocksdb::WriteOptions(), column_family_handle, key); + rocksdb::Status s = m_db_->Delete(writer_options, column_family_handle, key); if (!s.ok()) { LOG_ERROR("delete %s from rocksdb failed, error %s", key.c_str(), s.ToString().c_str()); diff --git a/src/logservice/libobcdc/src/ob_log_sequencer1.cpp b/src/logservice/libobcdc/src/ob_log_sequencer1.cpp index 038529c2e..9974c7ceb 100644 --- a/src/logservice/libobcdc/src/ob_log_sequencer1.cpp +++ b/src/logservice/libobcdc/src/ob_log_sequencer1.cpp @@ -678,7 +678,9 @@ int ObLogSequencer::push_task_into_redo_dispatcher_(TransCtx &trans_ctx, volatil int ret = OB_SUCCESS; if (OB_FAIL(redo_dispatcher_->dispatch_trans_redo(trans_ctx, stop_flag))) { - LOG_ERROR("failed to dispatch trans redo", KR(ret), K(trans_ctx), K(stop_flag)); + if (OB_IN_STOP_STATE != ret) { + LOG_ERROR("failed to dispatch trans redo", KR(ret), K(trans_ctx), K(stop_flag)); + } } return ret; diff --git a/src/logservice/libobcdc/src/ob_log_tenant.cpp b/src/logservice/libobcdc/src/ob_log_tenant.cpp index 639415690..49226d6de 100644 --- a/src/logservice/libobcdc/src/ob_log_tenant.cpp +++ b/src/logservice/libobcdc/src/ob_log_tenant.cpp @@ -746,6 +746,7 @@ void ObLogTenant::print_stat_info() "SEQ(GB=%ld,CMT=%ld) " "SCHEMA(GB=%ld,CUR=%ld) " "CMT_SCHEMA(CUR=%ld,NEXT=%ld) " + "CHECKPOINT(TX=%ld(%s), GHB=%ld(%s)) " "DROP_TS=%s " "DDL_TABLE=%lu", tenant_id_, tenant_name_, print_state(get_tenant_state()), get_tenant_state(), @@ -755,6 +756,8 @@ void ObLogTenant::print_stat_info() get_global_seq(), NULL == task_queue_ ? 0 : task_queue_->get_next_task_seq(), get_global_schema_version(), get_schema_version(), committer_cur_schema_version_, committer_next_trans_schema_version_, + committer_trans_commit_version_, NTS_TO_STR(committer_trans_commit_version_), + committer_global_heartbeat_, NTS_TO_STR(committer_global_heartbeat_), NTS_TO_STR(drop_tenant_tstamp_), ddl_table_id); } diff --git a/src/logservice/libobcdc/src/ob_log_trans_ctx.cpp b/src/logservice/libobcdc/src/ob_log_trans_ctx.cpp index 31e687938..62ba672ae 100644 --- a/src/logservice/libobcdc/src/ob_log_trans_ctx.cpp +++ b/src/logservice/libobcdc/src/ob_log_trans_ctx.cpp @@ -306,7 +306,7 @@ int TransCtx::prepare_( const uint64_t tenant_id = host_tls_id.get_tenant_id(); int64_t host_commit_log_timestamp = host.get_commit_ts(); const palf::LSN &host_commit_log_lsn = host.get_commit_log_lsn(); - const ObLSLogInfo *part_array = host.get_participants(); + const transaction::ObLSLogInfoArray &part_array = host.get_participants(); int64_t part_count = host.get_participant_count(); const int64_t trans_commit_version = host.get_trans_commit_version(); tenant_id_ = tenant_id; @@ -314,7 +314,7 @@ int TransCtx::prepare_( // default serve bool is_serving_host_part = true; - if (OB_UNLIKELY(part_count <= 0) || OB_ISNULL(part_array)) { + if (OB_UNLIKELY(part_count <= 0)) { LOG_ERROR("invalid participant array", K(part_count), K(part_array), K(host)); ret = OB_ERR_UNEXPECTED; } diff --git a/src/logservice/logrouteservice/ob_log_route_service.cpp b/src/logservice/logrouteservice/ob_log_route_service.cpp index 5b7935323..6be2204ce 100644 --- a/src/logservice/logrouteservice/ob_log_route_service.cpp +++ b/src/logservice/logrouteservice/ob_log_route_service.cpp @@ -145,19 +145,21 @@ void ObLogRouteService::stop() void ObLogRouteService::wait() { - LOG_INFO("ObLogRouteService wait begin"); - TG_WAIT(lib::TGDefIDs::LogRouterTimer); - int64_t num = 0; - int ret = OB_SUCCESS; - while (OB_SUCC(TG_GET_QUEUE_NUM(tg_id_, num)) && num > 0) { - PAUSE(); + if (IS_INIT) { + LOG_INFO("ObLogRouteService wait begin"); + TG_WAIT(lib::TGDefIDs::LogRouterTimer); + int64_t num = 0; + int ret = OB_SUCCESS; + while (OB_SUCC(TG_GET_QUEUE_NUM(tg_id_, num)) && num > 0) { + PAUSE(); + } + if (OB_FAIL(ret)) { + CLOG_LOG(WARN, "ObLogRouteService failed to get queue number"); + } + TG_STOP(tg_id_); + TG_WAIT(tg_id_); + LOG_INFO("ObLogRouteService wait finish"); } - if (OB_FAIL(ret)) { - CLOG_LOG(WARN, "ObLogApplyService failed to get queue number"); - } - TG_STOP(tg_id_); - TG_WAIT(tg_id_); - LOG_INFO("ObLogRouteService wait finish"); } void ObLogRouteService::destroy()