[OBCDC] Fix misjudge LS_OFFLINE LOG and OBCDC exit process
This commit is contained in:
		| @ -23,6 +23,7 @@ | ||||
| #include "ob_log_instance.h"                  // TCTX | ||||
| #include "ob_log_fetcher.h"                   // IObLogFetcher | ||||
| #include "logservice/restoreservice/ob_remote_log_source_allocator.h" | ||||
| #include "logservice/ob_garbage_collector.h"  // ObGCLSLog | ||||
|  | ||||
| #define STAT(level, fmt, args...) OBLOG_FETCHER_LOG(level, "[STAT] [FETCH_CTX] " fmt, ##args) | ||||
| #define _STAT(level, fmt, args...) _OBLOG_FETCHER_LOG(level, "[STAT] [FETCH_CTX] " fmt, ##args) | ||||
| @ -583,42 +584,57 @@ int LSFetchCtx::handle_offline_ls_log_(const palf::LogEntry &log_entry, | ||||
|     volatile bool &stop_flag) | ||||
| { | ||||
|   int ret = OB_SUCCESS; | ||||
|   // const uint64_t log_id = log_entry.get_header().get_log_id(); | ||||
|   // const int64_t tstamp = log_entry.get_header().get_submit_timestamp(); | ||||
|   const char *buf = log_entry.get_data_buf(); | ||||
|   const int64_t buf_len = log_entry.get_data_len(); | ||||
|   const int64_t submit_ts = log_entry.get_scn().get_val_for_logservice(); | ||||
|   int64_t pos = 0; | ||||
|   logservice::ObGCLSLog gc_log; | ||||
|  | ||||
|   ISTAT("[HANDLE_OFFLINE_LOG] begin", K_(tls_id), "state", print_state(state_)); | ||||
|   if (OB_ISNULL(buf) || OB_UNLIKELY(buf_len <= 0)) { | ||||
|     ret = OB_INVALID_ARGUMENT; | ||||
|     LOG_ERROR("invalid GCLSLogEntry", KR(ret), K(log_entry), K(buf_len)); | ||||
|   } else if (OB_FAIL(gc_log.deserialize(buf, buf_len, pos))) { | ||||
|     LOG_ERROR("deserialize GCLSLog failed", KR(ret), K(log_entry), K(buf_len), K(pos)); | ||||
|   } else { | ||||
|     LOG_INFO("[HANDLE_OFFLINE_LOG] detect GCLSLog", K(gc_log), K(log_entry)); | ||||
|     logservice::ObGCLSLOGType log_type = static_cast<logservice::ObGCLSLOGType>(gc_log.get_log_type()); | ||||
|  | ||||
|   // For OFFLINE logs, only tasks in NORMAL state are processed | ||||
|   // Tasks in other states will be deleted by other scenarios responsible for the ls | ||||
|   // | ||||
|   // Ensure that the discard recycling mechanism. | ||||
|   // | ||||
|   // 1. STATE_NORMAL: discard will be set when OFFLINE logging or ls deletion DDL is encountered | ||||
|   // | ||||
|   // Note: Mechanically, we have to take precautions in many ways and cannot rely on one mechanism to guarantee ls recovery. | ||||
|   // There are two scenarios in which partitions need to be reclaimed. | ||||
|   // 1. ls deletion by DDL: this includes deleting tables, deleting partitions, deleting DBs, deleting tenants, etc. This scenario relies on DDL deletion to be sufficient | ||||
|   // The observer ensures that the ls is not iterated over in the schema after the DDL is deleted | ||||
|   int64_t pending_trans_count = 0; | ||||
|   // First ensure that all tasks in the queue are dispatched | ||||
|   if (OB_FAIL(dispatch_(stop_flag, pending_trans_count))) { | ||||
|     if (OB_IN_STOP_STATE != ret) { | ||||
|       LOG_ERROR("dispatch task fail", KR(ret), K(tls_id_)); | ||||
|     if (logservice::ObGCLSLOGType::OFFLINE_LS == log_type) { | ||||
|       ISTAT("[HANDLE_OFFLINE_LOG] begin", K_(tls_id), "state", print_state(state_)); | ||||
|  | ||||
|       // For OFFLINE logs, only tasks in NORMAL state are processed | ||||
|       // Tasks in other states will be deleted by other scenarios responsible for the ls | ||||
|       // | ||||
|       // Ensure that the discard recycling mechanism. | ||||
|       // | ||||
|       // 1. STATE_NORMAL: discard will be set when OFFLINE logging or ls deletion DDL is encountered | ||||
|       // | ||||
|       // Note: Mechanically, we have to take precautions in many ways and cannot rely on one mechanism to guarantee ls recovery. | ||||
|       // There are two scenarios in which partitions need to be reclaimed. | ||||
|       // 1. ls deletion by DDL: this includes deleting tables, deleting partitions, deleting DBs, deleting tenants, etc. This scenario relies on DDL deletion to be sufficient | ||||
|       // The observer ensures that the ls is not iterated over in the schema after the DDL is deleted | ||||
|       int64_t pending_trans_count = 0; | ||||
|       // First ensure that all tasks in the queue are dispatched | ||||
|       if (OB_FAIL(dispatch_(stop_flag, pending_trans_count))) { | ||||
|         if (OB_IN_STOP_STATE != ret) { | ||||
|           LOG_ERROR("dispatch task fail", KR(ret), K(tls_id_)); | ||||
|         } | ||||
|       } | ||||
|       // Check if there are pending transactions to be output | ||||
|       else if (OB_UNLIKELY(pending_trans_count > 0)) { | ||||
|         ret = OB_INVALID_DATA; | ||||
|         LOG_ERROR("there are still pending trans after dispatch when processing offline log, unexcept error", | ||||
|             KR(ret), K(pending_trans_count), K(tls_id_), K(state_)); | ||||
|       } else { | ||||
|         // Finally mark the ls as ready for deletion | ||||
|         // Note: there is a concurrency situation here, after a successful setup, it may be dropped into the DEAD POOL for recycling by other threads immediately | ||||
|         // Since all data is already output here, it doesn't matter if it goes to the DEAD POOL | ||||
|         set_discarded(); | ||||
|       } | ||||
|  | ||||
|       ISTAT("[HANDLE_OFFLINE_LOG] end", KR(ret), K_(tls_id), "state", print_state(state_)); | ||||
|     } | ||||
|   } | ||||
|   // Check if there are pending transactions to be output | ||||
|   else if (OB_UNLIKELY(pending_trans_count > 0)) { | ||||
|     ret = OB_INVALID_DATA; | ||||
|     LOG_ERROR("there are still pending trans after dispatch when processing offline log, unexcept error", | ||||
|         KR(ret), K(pending_trans_count), K(tls_id_), K(state_)); | ||||
|   } else { | ||||
|     // Finally mark the ls as ready for deletion | ||||
|     // Note: there is a concurrency situation here, after a successful setup, it may be dropped into the DEAD POOL for recycling by other threads immediately | ||||
|     // Since all data is already output here, it doesn't matter if it goes to the DEAD POOL | ||||
|     set_discarded(); | ||||
|   } | ||||
|  | ||||
|   ISTAT("[HANDLE_OFFLINE_LOG] end", KR(ret), K_(tls_id), "state", print_state(state_)); | ||||
|  | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
		Reference in New Issue
	
	Block a user
	 SanmuWangZJU
					SanmuWangZJU