diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index 4b0f8b2858..a3ed4b73f2 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -446,7 +446,6 @@ ob_set_subtarget(ob_storage compaction compaction/ob_column_checksum_calculator.cpp compaction/ob_index_block_micro_iterator.cpp compaction/ob_i_compaction_filter.cpp - compaction/ob_merge_schedule_info.cpp compaction/ob_partition_merge_fuser.cpp compaction/ob_partition_merge_iter.cpp compaction/ob_partition_merge_progress.cpp diff --git a/src/storage/compaction/ob_medium_compaction_mgr.cpp b/src/storage/compaction/ob_medium_compaction_mgr.cpp index 381949b98e..21cca1a534 100644 --- a/src/storage/compaction/ob_medium_compaction_mgr.cpp +++ b/src/storage/compaction/ob_medium_compaction_mgr.cpp @@ -543,6 +543,7 @@ int ObMediumCompactionInfoList::inner_deep_copy_node( } else if (OB_UNLIKELY(!inner_is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("medium info list is invalid", K(ret), KPC(this)); + medium_info_list_.remove(new_info); } else { LOG_TRACE("success to deep copy append medium info", K(ret), KPC(new_info)); } @@ -613,6 +614,7 @@ int ObMediumCompactionInfoList::deserialize( LOG_WARN("list count should be zero in old version medium list", K(ret), K(list_count)); } } else if (FALSE_IT(info_ = deserialize_info)) { + } else if (FALSE_IT(allocator_ = &allocator)) { // set allocator to call reset() when deserialize failed } else if (OB_FAIL(serialization::decode_vi64(buf, data_len, new_pos, &last_medium_scn_))) { LOG_WARN("failed to deserialize wait_check_medium_scn", K(ret), K(data_len)); } else if (OB_FAIL(serialization::decode_vi64(buf, data_len, new_pos, &list_count))) { @@ -622,8 +624,8 @@ int ObMediumCompactionInfoList::deserialize( LOG_WARN("unexpected list count", K(ret), K(list_count)); } else if (list_count > 0) { void *alloc_buf = nullptr; - ObMediumCompactionInfo *new_info = nullptr; for (int i = 0; OB_SUCC(ret) && i < list_count; ++i) { + ObMediumCompactionInfo *new_info = nullptr; if (OB_ISNULL(alloc_buf = allocator.alloc(sizeof(ObMediumCompactionInfo)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("failed to alloc memory", K(ret)); @@ -645,11 +647,11 @@ int ObMediumCompactionInfoList::deserialize( } // end of for } if (OB_FAIL(ret)) { + reset(); } else if (OB_UNLIKELY(!inner_is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("medium info list is invalid", K(ret), KPC(this)); } else { - allocator_ = &allocator; compat_ = MEDIUM_LIST_VERSION; is_inited_ = true; pos = new_pos; diff --git a/src/storage/compaction/ob_merge_schedule_info.cpp b/src/storage/compaction/ob_merge_schedule_info.cpp deleted file mode 100644 index 0b34e4365f..0000000000 --- a/src/storage/compaction/ob_merge_schedule_info.cpp +++ /dev/null @@ -1,227 +0,0 @@ -/** - * Copyright (c) 2021 OceanBase - * OceanBase CE is licensed under Mulan PubL v2. - * You can use this software according to the terms and conditions of the Mulan PubL v2. - * You may obtain a copy of Mulan PubL v2 at: - * http://license.coscl.org.cn/MulanPubL-2.0 - * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, - * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, - * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. - * See the Mulan PubL v2 for more details. - */ - -#define USING_LOG_PREFIX STORAGE_COMPACTION -#include "storage/compaction/ob_merge_schedule_info.h" -#include "observer/ob_server_event_history_table_operator.h" -#include "share/ob_force_print_log.h" - -namespace oceanbase -{ -namespace storage -{ - -using namespace oceanbase::common; - -/** - * -----------------------------------------------ObMajorMergeHistory-------------------------------------------------------- - */ - -ObMergeStatEntry::ObMergeStatEntry() - : frozen_version_(0), - start_time_(0), - finish_time_(0) -{ -} - -void ObMergeStatEntry::reset() -{ - frozen_version_ = 0; - start_time_ = 0; - finish_time_ = 0; -} - -ObMajorMergeHistory::ObMajorMergeHistory() - : lock_() -{ -} - -ObMajorMergeHistory::~ObMajorMergeHistory() -{ -} - -int ObMajorMergeHistory::notify_major_merge_start(const int64_t frozen_version) -{ - int ret = OB_SUCCESS; - ObMergeStatEntry *pentry = NULL; - if (OB_UNLIKELY(frozen_version <= 0)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("Invalid argument", K(frozen_version), K(ret)); - } else { - obsys::ObWLockGuard guard(lock_); - if (OB_FAIL(search_entry(frozen_version, pentry))) { - if (OB_ENTRY_NOT_EXIST == ret) { - pentry = &(stats_[frozen_version % MAX_KEPT_HISTORY]); - pentry->reset(); - pentry->frozen_version_ = frozen_version; - pentry->start_time_ = ::oceanbase::common::ObTimeUtility::current_time(); - } else { - LOG_WARN("Fail to search entry", K(ret)); - } - } else if (OB_ISNULL(pentry)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("Unexpected error, the pentry is NULL", K(ret)); - } - } - - if (OB_ENTRY_NOT_EXIST == ret) { - ret = OB_SUCCESS; - } - return ret; -} - -int ObMajorMergeHistory::notify_major_merge_finish(const int64_t frozen_version) -{ - int ret = OB_SUCCESS; - ObMergeStatEntry *pentry = NULL; - if (OB_UNLIKELY(frozen_version <= 0)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("Invalid argument", K(frozen_version), K(ret)); - } else { - obsys::ObWLockGuard guard(lock_); - if (OB_FAIL(search_entry(frozen_version, pentry))) { - LOG_WARN("Fail to search entry", K(ret)); - } else if (OB_ISNULL(pentry)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("Unexpected error, the pentry is NULL", K(ret)); - } else { - if (0 == pentry->finish_time_) { - pentry->finish_time_ = ::oceanbase::common::ObTimeUtility::current_time(); - LOG_INFO("set merge finish time", - K(frozen_version), - "cost_time", (pentry->finish_time_ - pentry->start_time_) / 1000000, - "start_time", time2str(pentry->start_time_), - "finish_time", time2str(pentry->finish_time_)); - } - } - } - return ret; -} - -int ObMajorMergeHistory::get_entry(const int64_t frozen_version, ObMergeStatEntry &entry) -{ - int ret = OB_SUCCESS; - ObMergeStatEntry *pentry = NULL; - if (OB_UNLIKELY(frozen_version <= 0)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("Invalid argument", K(frozen_version), K(ret)); - } else { - obsys::ObRLockGuard guard(lock_); - if (OB_FAIL(search_entry(frozen_version, pentry))) { - if (OB_ENTRY_NOT_EXIST == ret) { - entry.reset(); - } else { - LOG_WARN("Fail to search entry", K(frozen_version), K(ret)); - } - } else { - entry = *pentry; - } - } - return ret; -} - -int ObMajorMergeHistory::search_entry(const int64_t frozen_version, ObMergeStatEntry *&pentry) -{ - int ret = OB_SUCCESS; - pentry = NULL; - if (OB_UNLIKELY(frozen_version <= 0)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("Invalid argument", K(frozen_version), K(ret)); - } else if (stats_[frozen_version % MAX_KEPT_HISTORY].frozen_version_ != frozen_version) { - ret = OB_ENTRY_NOT_EXIST; - } else { - pentry = &(stats_[frozen_version % MAX_KEPT_HISTORY]); - } - return ret; -} - -/* - * ----------------------------------------------ObMinorMergeHistory-------------------------------------------------- - */ - -ObMinorMergeHistory::ObMinorMergeHistory(const uint64_t tenant_id) - : mutex_(common::ObLatchIds::INFO_MGR_LOCK), - count_(0), - tenant_id_(tenant_id) -{ -} - -ObMinorMergeHistory::~ObMinorMergeHistory() -{ -} - -int ObMinorMergeHistory::notify_minor_merge_start(const int64_t snapshot_version) -{ - int ret = OB_SUCCESS; - lib::ObMutexGuard guard(mutex_); - if (MAX_MINOR_HISTORY == count_) { - MEMMOVE(snapshot_history_, &snapshot_history_[1], sizeof(int64_t) * (count_ - 1)); - --count_; - } - int64_t insert_pos = 0; - for (int64_t i = count_ - 1; -1 == insert_pos && i >= 0; --i) { - if (snapshot_history_[i] < snapshot_version) { - insert_pos = i + 1; - } - } - MEMMOVE(&snapshot_history_[insert_pos + 1], &snapshot_history_[insert_pos], - sizeof(int64_t) * (count_ - insert_pos)); - snapshot_history_[insert_pos] = snapshot_version; - ++count_; - SERVER_EVENT_ADD( - "minor_merge", "minor merge start", - "tenant_id", tenant_id_, - "snapshot_version", snapshot_version, - "checkpoint_type", "DATA_CKPT", - "checkpoint_cluster_version", GET_MIN_CLUSTER_VERSION()); - - FLOG_INFO("[MINOR_MERGE_HISTORY] minor merge start", K(tenant_id_), K(snapshot_version), K_(count)); - return ret; -} - -int ObMinorMergeHistory::notify_minor_merge_finish(const int64_t snapshot_version) -{ - int ret = OB_SUCCESS; - lib::ObMutexGuard guard(mutex_); - if (count_ > 0) { - int64_t i = 0; - bool is_found = false; - for (; i < count_ ; ++i) { - if (snapshot_version >= snapshot_history_[i]) { - SERVER_EVENT_ADD( - "minor_merge", "minor merge finish", - "tenant_id", tenant_id_, - "snapshot_version", snapshot_history_[i], - "checkpoint_type", "DATA_CKPT", - "checkpoint_cluster_version", GET_MIN_CLUSTER_VERSION()); - FLOG_INFO("[MINOR_MERGE_HISTORY] minor merge finish", K(tenant_id_), K(snapshot_version), - K_(count)); - is_found = true; - } else { - break; - } - } - if (is_found) { - if (i < count_) { - MEMMOVE(snapshot_history_, &snapshot_history_[i], sizeof(int64_t) * (count_ - i)); - } - count_ -= i; - LOG_INFO("[MINOR_MERGE_HISTORY] notify minor merge finish", K(tenant_id_), K_(count)); - } - } - return ret; -} - - -} // namespace storage -} // namespace oceanbase - diff --git a/src/storage/compaction/ob_merge_schedule_info.h b/src/storage/compaction/ob_merge_schedule_info.h deleted file mode 100644 index 7ec14b7d0f..0000000000 --- a/src/storage/compaction/ob_merge_schedule_info.h +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Copyright (c) 2021 OceanBase - * OceanBase CE is licensed under Mulan PubL v2. - * You can use this software according to the terms and conditions of the Mulan PubL v2. - * You may obtain a copy of Mulan PubL v2 at: - * http://license.coscl.org.cn/MulanPubL-2.0 - * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, - * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, - * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. - * See the Mulan PubL v2 for more details. - */ - -#ifndef OCEANBASE_STORAGE_COMPACTION_OB_MERGE_SCHEDULE_INFO_H_ -#define OCEANBASE_STORAGE_COMPACTION_OB_MERGE_SCHEDULE_INFO_H_ - -#include "lib/queue/ob_dedup_queue.h" -#include "storage/ob_i_store.h" -#include "storage/ob_i_table.h" -#include "storage/blocksstable/ob_macro_block_id.h" - -namespace oceanbase -{ -namespace storage -{ - -struct ObMergeStatEntry -{ - ObMergeStatEntry(); - void reset(); - int64_t frozen_version_; - int64_t start_time_; - int64_t finish_time_; -}; - -// Major Merge History -class ObMajorMergeHistory -{ -public: - ObMajorMergeHistory(); - virtual ~ObMajorMergeHistory(); - int notify_major_merge_start(const int64_t frozen_version); - int notify_major_merge_finish(const int64_t frozen_version); - int get_entry(const int64_t frozen_version, ObMergeStatEntry &entry); -private: - int search_entry(const int64_t frozen_version, ObMergeStatEntry *&pentry); - static const int64_t MAX_KEPT_HISTORY = 16; - obsys::ObRWLock lock_; - ObMergeStatEntry stats_[MAX_KEPT_HISTORY]; -private: - DISALLOW_COPY_AND_ASSIGN(ObMajorMergeHistory); -}; - -class ObMinorMergeHistory -{ -public: - explicit ObMinorMergeHistory(const uint64_t tenant_id); - virtual ~ObMinorMergeHistory(); - int notify_minor_merge_start(const int64_t snapshot_version); - int notify_minor_merge_finish(const int64_t snapshot_version); -private: - static const int64_t MAX_MINOR_HISTORY = 16; - lib::ObMutex mutex_; - int64_t count_; - uint64_t tenant_id_; - int64_t snapshot_history_[MAX_MINOR_HISTORY]; -private: - DISALLOW_COPY_AND_ASSIGN(ObMinorMergeHistory); -}; - -} /* namespace storage */ -} /* namespace oceanbase */ - -#endif /* OCEANBASE_STORAGE_COMPACTION_OB_MERGE_INFO_H_ */