fix medium list may have invalid info

This commit is contained in:
yangqise7en
2023-05-25 12:16:54 +00:00
committed by ob-robot
parent 88513f9ed3
commit 85bc7d4d0e
4 changed files with 4 additions and 303 deletions

View File

@ -446,7 +446,6 @@ ob_set_subtarget(ob_storage compaction
compaction/ob_column_checksum_calculator.cpp compaction/ob_column_checksum_calculator.cpp
compaction/ob_index_block_micro_iterator.cpp compaction/ob_index_block_micro_iterator.cpp
compaction/ob_i_compaction_filter.cpp compaction/ob_i_compaction_filter.cpp
compaction/ob_merge_schedule_info.cpp
compaction/ob_partition_merge_fuser.cpp compaction/ob_partition_merge_fuser.cpp
compaction/ob_partition_merge_iter.cpp compaction/ob_partition_merge_iter.cpp
compaction/ob_partition_merge_progress.cpp compaction/ob_partition_merge_progress.cpp

View File

@ -543,6 +543,7 @@ int ObMediumCompactionInfoList::inner_deep_copy_node(
} else if (OB_UNLIKELY(!inner_is_valid())) { } else if (OB_UNLIKELY(!inner_is_valid())) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("medium info list is invalid", K(ret), KPC(this)); LOG_WARN("medium info list is invalid", K(ret), KPC(this));
medium_info_list_.remove(new_info);
} else { } else {
LOG_TRACE("success to deep copy append medium info", K(ret), KPC(new_info)); LOG_TRACE("success to deep copy append medium info", K(ret), KPC(new_info));
} }
@ -613,6 +614,7 @@ int ObMediumCompactionInfoList::deserialize(
LOG_WARN("list count should be zero in old version medium list", K(ret), K(list_count)); LOG_WARN("list count should be zero in old version medium list", K(ret), K(list_count));
} }
} else if (FALSE_IT(info_ = deserialize_info)) { } else if (FALSE_IT(info_ = deserialize_info)) {
} else if (FALSE_IT(allocator_ = &allocator)) { // set allocator to call reset() when deserialize failed
} else if (OB_FAIL(serialization::decode_vi64(buf, data_len, new_pos, &last_medium_scn_))) { } else if (OB_FAIL(serialization::decode_vi64(buf, data_len, new_pos, &last_medium_scn_))) {
LOG_WARN("failed to deserialize wait_check_medium_scn", K(ret), K(data_len)); LOG_WARN("failed to deserialize wait_check_medium_scn", K(ret), K(data_len));
} else if (OB_FAIL(serialization::decode_vi64(buf, data_len, new_pos, &list_count))) { } else if (OB_FAIL(serialization::decode_vi64(buf, data_len, new_pos, &list_count))) {
@ -622,8 +624,8 @@ int ObMediumCompactionInfoList::deserialize(
LOG_WARN("unexpected list count", K(ret), K(list_count)); LOG_WARN("unexpected list count", K(ret), K(list_count));
} else if (list_count > 0) { } else if (list_count > 0) {
void *alloc_buf = nullptr; void *alloc_buf = nullptr;
ObMediumCompactionInfo *new_info = nullptr;
for (int i = 0; OB_SUCC(ret) && i < list_count; ++i) { for (int i = 0; OB_SUCC(ret) && i < list_count; ++i) {
ObMediumCompactionInfo *new_info = nullptr;
if (OB_ISNULL(alloc_buf = allocator.alloc(sizeof(ObMediumCompactionInfo)))) { if (OB_ISNULL(alloc_buf = allocator.alloc(sizeof(ObMediumCompactionInfo)))) {
ret = OB_ALLOCATE_MEMORY_FAILED; ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc memory", K(ret)); LOG_WARN("failed to alloc memory", K(ret));
@ -645,11 +647,11 @@ int ObMediumCompactionInfoList::deserialize(
} // end of for } // end of for
} }
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
reset();
} else if (OB_UNLIKELY(!inner_is_valid())) { } else if (OB_UNLIKELY(!inner_is_valid())) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("medium info list is invalid", K(ret), KPC(this)); LOG_WARN("medium info list is invalid", K(ret), KPC(this));
} else { } else {
allocator_ = &allocator;
compat_ = MEDIUM_LIST_VERSION; compat_ = MEDIUM_LIST_VERSION;
is_inited_ = true; is_inited_ = true;
pos = new_pos; pos = new_pos;

View File

@ -1,227 +0,0 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX STORAGE_COMPACTION
#include "storage/compaction/ob_merge_schedule_info.h"
#include "observer/ob_server_event_history_table_operator.h"
#include "share/ob_force_print_log.h"
namespace oceanbase
{
namespace storage
{
using namespace oceanbase::common;
/**
* -----------------------------------------------ObMajorMergeHistory--------------------------------------------------------
*/
ObMergeStatEntry::ObMergeStatEntry()
: frozen_version_(0),
start_time_(0),
finish_time_(0)
{
}
void ObMergeStatEntry::reset()
{
frozen_version_ = 0;
start_time_ = 0;
finish_time_ = 0;
}
ObMajorMergeHistory::ObMajorMergeHistory()
: lock_()
{
}
ObMajorMergeHistory::~ObMajorMergeHistory()
{
}
int ObMajorMergeHistory::notify_major_merge_start(const int64_t frozen_version)
{
int ret = OB_SUCCESS;
ObMergeStatEntry *pentry = NULL;
if (OB_UNLIKELY(frozen_version <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Invalid argument", K(frozen_version), K(ret));
} else {
obsys::ObWLockGuard guard(lock_);
if (OB_FAIL(search_entry(frozen_version, pentry))) {
if (OB_ENTRY_NOT_EXIST == ret) {
pentry = &(stats_[frozen_version % MAX_KEPT_HISTORY]);
pentry->reset();
pentry->frozen_version_ = frozen_version;
pentry->start_time_ = ::oceanbase::common::ObTimeUtility::current_time();
} else {
LOG_WARN("Fail to search entry", K(ret));
}
} else if (OB_ISNULL(pentry)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected error, the pentry is NULL", K(ret));
}
}
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
}
return ret;
}
int ObMajorMergeHistory::notify_major_merge_finish(const int64_t frozen_version)
{
int ret = OB_SUCCESS;
ObMergeStatEntry *pentry = NULL;
if (OB_UNLIKELY(frozen_version <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Invalid argument", K(frozen_version), K(ret));
} else {
obsys::ObWLockGuard guard(lock_);
if (OB_FAIL(search_entry(frozen_version, pentry))) {
LOG_WARN("Fail to search entry", K(ret));
} else if (OB_ISNULL(pentry)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected error, the pentry is NULL", K(ret));
} else {
if (0 == pentry->finish_time_) {
pentry->finish_time_ = ::oceanbase::common::ObTimeUtility::current_time();
LOG_INFO("set merge finish time",
K(frozen_version),
"cost_time", (pentry->finish_time_ - pentry->start_time_) / 1000000,
"start_time", time2str(pentry->start_time_),
"finish_time", time2str(pentry->finish_time_));
}
}
}
return ret;
}
int ObMajorMergeHistory::get_entry(const int64_t frozen_version, ObMergeStatEntry &entry)
{
int ret = OB_SUCCESS;
ObMergeStatEntry *pentry = NULL;
if (OB_UNLIKELY(frozen_version <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Invalid argument", K(frozen_version), K(ret));
} else {
obsys::ObRLockGuard guard(lock_);
if (OB_FAIL(search_entry(frozen_version, pentry))) {
if (OB_ENTRY_NOT_EXIST == ret) {
entry.reset();
} else {
LOG_WARN("Fail to search entry", K(frozen_version), K(ret));
}
} else {
entry = *pentry;
}
}
return ret;
}
int ObMajorMergeHistory::search_entry(const int64_t frozen_version, ObMergeStatEntry *&pentry)
{
int ret = OB_SUCCESS;
pentry = NULL;
if (OB_UNLIKELY(frozen_version <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Invalid argument", K(frozen_version), K(ret));
} else if (stats_[frozen_version % MAX_KEPT_HISTORY].frozen_version_ != frozen_version) {
ret = OB_ENTRY_NOT_EXIST;
} else {
pentry = &(stats_[frozen_version % MAX_KEPT_HISTORY]);
}
return ret;
}
/*
* ----------------------------------------------ObMinorMergeHistory--------------------------------------------------
*/
ObMinorMergeHistory::ObMinorMergeHistory(const uint64_t tenant_id)
: mutex_(common::ObLatchIds::INFO_MGR_LOCK),
count_(0),
tenant_id_(tenant_id)
{
}
ObMinorMergeHistory::~ObMinorMergeHistory()
{
}
int ObMinorMergeHistory::notify_minor_merge_start(const int64_t snapshot_version)
{
int ret = OB_SUCCESS;
lib::ObMutexGuard guard(mutex_);
if (MAX_MINOR_HISTORY == count_) {
MEMMOVE(snapshot_history_, &snapshot_history_[1], sizeof(int64_t) * (count_ - 1));
--count_;
}
int64_t insert_pos = 0;
for (int64_t i = count_ - 1; -1 == insert_pos && i >= 0; --i) {
if (snapshot_history_[i] < snapshot_version) {
insert_pos = i + 1;
}
}
MEMMOVE(&snapshot_history_[insert_pos + 1], &snapshot_history_[insert_pos],
sizeof(int64_t) * (count_ - insert_pos));
snapshot_history_[insert_pos] = snapshot_version;
++count_;
SERVER_EVENT_ADD(
"minor_merge", "minor merge start",
"tenant_id", tenant_id_,
"snapshot_version", snapshot_version,
"checkpoint_type", "DATA_CKPT",
"checkpoint_cluster_version", GET_MIN_CLUSTER_VERSION());
FLOG_INFO("[MINOR_MERGE_HISTORY] minor merge start", K(tenant_id_), K(snapshot_version), K_(count));
return ret;
}
int ObMinorMergeHistory::notify_minor_merge_finish(const int64_t snapshot_version)
{
int ret = OB_SUCCESS;
lib::ObMutexGuard guard(mutex_);
if (count_ > 0) {
int64_t i = 0;
bool is_found = false;
for (; i < count_ ; ++i) {
if (snapshot_version >= snapshot_history_[i]) {
SERVER_EVENT_ADD(
"minor_merge", "minor merge finish",
"tenant_id", tenant_id_,
"snapshot_version", snapshot_history_[i],
"checkpoint_type", "DATA_CKPT",
"checkpoint_cluster_version", GET_MIN_CLUSTER_VERSION());
FLOG_INFO("[MINOR_MERGE_HISTORY] minor merge finish", K(tenant_id_), K(snapshot_version),
K_(count));
is_found = true;
} else {
break;
}
}
if (is_found) {
if (i < count_) {
MEMMOVE(snapshot_history_, &snapshot_history_[i], sizeof(int64_t) * (count_ - i));
}
count_ -= i;
LOG_INFO("[MINOR_MERGE_HISTORY] notify minor merge finish", K(tenant_id_), K_(count));
}
}
return ret;
}
} // namespace storage
} // namespace oceanbase

View File

@ -1,73 +0,0 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_STORAGE_COMPACTION_OB_MERGE_SCHEDULE_INFO_H_
#define OCEANBASE_STORAGE_COMPACTION_OB_MERGE_SCHEDULE_INFO_H_
#include "lib/queue/ob_dedup_queue.h"
#include "storage/ob_i_store.h"
#include "storage/ob_i_table.h"
#include "storage/blocksstable/ob_macro_block_id.h"
namespace oceanbase
{
namespace storage
{
struct ObMergeStatEntry
{
ObMergeStatEntry();
void reset();
int64_t frozen_version_;
int64_t start_time_;
int64_t finish_time_;
};
// Major Merge History
class ObMajorMergeHistory
{
public:
ObMajorMergeHistory();
virtual ~ObMajorMergeHistory();
int notify_major_merge_start(const int64_t frozen_version);
int notify_major_merge_finish(const int64_t frozen_version);
int get_entry(const int64_t frozen_version, ObMergeStatEntry &entry);
private:
int search_entry(const int64_t frozen_version, ObMergeStatEntry *&pentry);
static const int64_t MAX_KEPT_HISTORY = 16;
obsys::ObRWLock lock_;
ObMergeStatEntry stats_[MAX_KEPT_HISTORY];
private:
DISALLOW_COPY_AND_ASSIGN(ObMajorMergeHistory);
};
class ObMinorMergeHistory
{
public:
explicit ObMinorMergeHistory(const uint64_t tenant_id);
virtual ~ObMinorMergeHistory();
int notify_minor_merge_start(const int64_t snapshot_version);
int notify_minor_merge_finish(const int64_t snapshot_version);
private:
static const int64_t MAX_MINOR_HISTORY = 16;
lib::ObMutex mutex_;
int64_t count_;
uint64_t tenant_id_;
int64_t snapshot_history_[MAX_MINOR_HISTORY];
private:
DISALLOW_COPY_AND_ASSIGN(ObMinorMergeHistory);
};
} /* namespace storage */
} /* namespace oceanbase */
#endif /* OCEANBASE_STORAGE_COMPACTION_OB_MERGE_INFO_H_ */