From aaa4776a6ac07ee5aa1af4d19142ad5f60754e5a Mon Sep 17 00:00:00 2001 From: SanmuWangZJU Date: Fri, 9 Feb 2024 10:40:13 +0000 Subject: [PATCH] [OBCDC] Fix misjudge LS_OFFLINE LOG and OBCDC exit process --- .../libobcdc/src/ob_log_ls_fetch_ctx.cpp | 80 +++++++++++-------- 1 file changed, 48 insertions(+), 32 deletions(-) diff --git a/src/logservice/libobcdc/src/ob_log_ls_fetch_ctx.cpp b/src/logservice/libobcdc/src/ob_log_ls_fetch_ctx.cpp index 38912c1c0c..c3caf183eb 100644 --- a/src/logservice/libobcdc/src/ob_log_ls_fetch_ctx.cpp +++ b/src/logservice/libobcdc/src/ob_log_ls_fetch_ctx.cpp @@ -23,6 +23,7 @@ #include "ob_log_instance.h" // TCTX #include "ob_log_fetcher.h" // IObLogFetcher #include "logservice/restoreservice/ob_remote_log_source_allocator.h" +#include "logservice/ob_garbage_collector.h" // ObGCLSLog #define STAT(level, fmt, args...) OBLOG_FETCHER_LOG(level, "[STAT] [FETCH_CTX] " fmt, ##args) #define _STAT(level, fmt, args...) _OBLOG_FETCHER_LOG(level, "[STAT] [FETCH_CTX] " fmt, ##args) @@ -583,42 +584,57 @@ int LSFetchCtx::handle_offline_ls_log_(const palf::LogEntry &log_entry, volatile bool &stop_flag) { int ret = OB_SUCCESS; - // const uint64_t log_id = log_entry.get_header().get_log_id(); - // const int64_t tstamp = log_entry.get_header().get_submit_timestamp(); + const char *buf = log_entry.get_data_buf(); + const int64_t buf_len = log_entry.get_data_len(); + const int64_t submit_ts = log_entry.get_scn().get_val_for_logservice(); + int64_t pos = 0; + logservice::ObGCLSLog gc_log; - ISTAT("[HANDLE_OFFLINE_LOG] begin", K_(tls_id), "state", print_state(state_)); + if (OB_ISNULL(buf) || OB_UNLIKELY(buf_len <= 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_ERROR("invalid GCLSLogEntry", KR(ret), K(log_entry), K(buf_len)); + } else if (OB_FAIL(gc_log.deserialize(buf, buf_len, pos))) { + LOG_ERROR("deserialize GCLSLog failed", KR(ret), K(log_entry), K(buf_len), K(pos)); + } else { + LOG_INFO("[HANDLE_OFFLINE_LOG] detect GCLSLog", K(gc_log), K(log_entry)); + 
logservice::ObGCLSLOGType log_type = static_cast<logservice::ObGCLSLOGType>(gc_log.get_log_type()); - // For OFFLINE logs, only tasks in NORMAL state are processed - // Tasks in other states will be deleted by other scenarios responsible for the ls - // - // Ensure that the discard recycling mechanism. - // - // 1. STATE_NORMAL: discard will be set when OFFLINE logging or ls deletion DDL is encountered - // - // Note: Mechanically, we have to take precautions in many ways and cannot rely on one mechanism to guarantee ls recovery. - // There are two scenarios in which partitions need to be reclaimed. - // 1. ls deletion by DDL: this includes deleting tables, deleting partitions, deleting DBs, deleting tenants, etc. This scenario relies on DDL deletion to be sufficient - // The observer ensures that the ls is not iterated over in the schema after the DDL is deleted - int64_t pending_trans_count = 0; - // First ensure that all tasks in the queue are dispatched - if (OB_FAIL(dispatch_(stop_flag, pending_trans_count))) { - if (OB_IN_STOP_STATE != ret) { - LOG_ERROR("dispatch task fail", KR(ret), K(tls_id_)); + if (logservice::ObGCLSLOGType::OFFLINE_LS == log_type) { + ISTAT("[HANDLE_OFFLINE_LOG] begin", K_(tls_id), "state", print_state(state_)); + + // For OFFLINE logs, only tasks in NORMAL state are processed + // Tasks in other states will be deleted by other scenarios responsible for the ls + // + // Ensure that the discard recycling mechanism. + // + // 1. STATE_NORMAL: discard will be set when OFFLINE logging or ls deletion DDL is encountered + // + // Note: Mechanically, we have to take precautions in many ways and cannot rely on one mechanism to guarantee ls recovery. + // There are two scenarios in which partitions need to be reclaimed. + // 1. ls deletion by DDL: this includes deleting tables, deleting partitions, deleting DBs, deleting tenants, etc. 
This scenario relies on DDL deletion to be sufficient + // The observer ensures that the ls is not iterated over in the schema after the DDL is deleted + int64_t pending_trans_count = 0; + // First ensure that all tasks in the queue are dispatched + if (OB_FAIL(dispatch_(stop_flag, pending_trans_count))) { + if (OB_IN_STOP_STATE != ret) { + LOG_ERROR("dispatch task fail", KR(ret), K(tls_id_)); + } + } + // Check if there are pending transactions to be output + else if (OB_UNLIKELY(pending_trans_count > 0)) { + ret = OB_INVALID_DATA; + LOG_ERROR("there are still pending trans after dispatch when processing offline log, unexcept error", + KR(ret), K(pending_trans_count), K(tls_id_), K(state_)); + } else { + // Finally mark the ls as ready for deletion + // Note: there is a concurrency situation here, after a successful setup, it may be dropped into the DEAD POOL for recycling by other threads immediately + // Since all data is already output here, it doesn't matter if it goes to the DEAD POOL + set_discarded(); + } + + ISTAT("[HANDLE_OFFLINE_LOG] end", KR(ret), K_(tls_id), "state", print_state(state_)); } } - // Check if there are pending transactions to be output - else if (OB_UNLIKELY(pending_trans_count > 0)) { - ret = OB_INVALID_DATA; - LOG_ERROR("there are still pending trans after dispatch when processing offline log, unexcept error", - KR(ret), K(pending_trans_count), K(tls_id_), K(state_)); - } else { - // Finally mark the ls as ready for deletion - // Note: there is a concurrency situation here, after a successful setup, it may be dropped into the DEAD POOL for recycling by other threads immediately - // Since all data is already output here, it doesn't matter if it goes to the DEAD POOL - set_discarded(); - } - - ISTAT("[HANDLE_OFFLINE_LOG] end", KR(ret), K_(tls_id), "state", print_state(state_)); return ret; }