Fix the issue of copy inconsistency caused by concurrent clog writing in the transfer start phase and medium compaction

This commit is contained in:
WenJinyu
2023-07-21 10:12:50 +00:00
committed by ob-robot
parent 227bef6aed
commit 52d353303f
7 changed files with 197 additions and 28 deletions

View File

@ -826,8 +826,11 @@ int ObCompactionDiagnoseMgr::diagnose_tenant_tablet()
if (OB_TMP_FAIL(get_suspect_info_and_print(MEDIUM_MERGE, share::ObLSID(INT64_MAX), ObTabletID(INT64_MAX)))) {
LOG_WARN("failed get tenant merge suspect info", K(tmp_ret));
}
if ((!scheduler->could_major_merge_start() || !scheduler->could_schedule_medium())
if ((!scheduler->could_major_merge_start())
&& can_add_diagnose_info()) {
const int64_t OB_MAX_BUF_LENGTH = 1024;
char ls_info_buf[OB_MAX_BUF_LENGTH] = {0};
scheduler->print_prohibit_medium_ls_info(ls_info_buf, OB_MAX_BUF_LENGTH);
SET_DIAGNOSE_INFO(
info_array_[idx_++],
!scheduler->could_major_merge_start() ? MAJOR_MERGE : MEDIUM_MERGE,
@ -838,7 +841,7 @@ int ObCompactionDiagnoseMgr::diagnose_tenant_tablet()
ObTimeUtility::fast_current_time(),
"info", "major or medium may be suspended",
"could_major_merge", scheduler->could_major_merge_start(),
"could_schedule_medium", scheduler->could_schedule_medium());
"prohibit_medium_ls_info", ls_info_buf);
}
}

View File

@ -216,7 +216,7 @@ ObTenantTabletScheduler::ObTenantTabletScheduler()
medium_ls_tablet_iter_(true/*is_major*/),
error_tablet_cnt_(0),
allow_schedule_medium_lock_(),
allow_schedule_medium_flag_(true),
prohibit_medium_ls_id_map_(),
ls_locality_cache_()
{
STATIC_ASSERT(static_cast<int64_t>(NO_MAJOR_MERGE_TYPE_CNT) == ARRAYSIZEOF(MERGE_TYPES), "merge type array len is mismatch");
@ -248,6 +248,7 @@ void ObTenantTabletScheduler::destroy()
minor_ls_tablet_iter_.reset();
medium_ls_tablet_iter_.reset();
ls_locality_cache_.reset();
prohibit_medium_ls_id_map_.destroy();
is_inited_ = false;
LOG_INFO("The ObTenantTabletScheduler destroy");
}
@ -280,6 +281,8 @@ int ObTenantTabletScheduler::init()
LOG_WARN("Fail to init bloom filter queue", K(ret));
} else if (OB_FAIL(ls_locality_cache_.init(MTL_ID(), GCTX.sql_proxy_))) {
LOG_WARN("failed to init ls locality cache", K(ret), KP(GCTX.sql_proxy_));
} else if (OB_FAIL(prohibit_medium_ls_id_map_.create(OB_MAX_LS_NUM_PER_TENANT_PER_SERVER, "Medium", "Medium"))) {
LOG_WARN("Fail to create prohibit medium ls id map", K(ret));
} else {
schedule_interval_ = schedule_interval;
is_inited_ = true;
@ -673,18 +676,110 @@ void ObTenantTabletScheduler::resume_major_merge()
}
}
void ObTenantTabletScheduler::stop_schedule_medium()
// The transfer task sets the flag that prohibits the scheduling of medium when the log stream is src_ls of transfer
int ObTenantTabletScheduler::stop_ls_schedule_medium(const share::ObLSID &ls_id)
{
int ret = OB_SUCCESS;
ProhibitMediumTask tmp_task;
ProhibitMediumTask task = ProhibitMediumTask::TRANSFER;
obsys::ObWLockGuard lock_guard(allow_schedule_medium_lock_);
allow_schedule_medium_flag_ = false;
if (!ls_id.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(ls_id));
} else if (OB_FAIL(prohibit_medium_ls_id_map_.get_refactored(ls_id, tmp_task))) {
if (OB_HASH_NOT_EXIST == ret) {
if (OB_FAIL(prohibit_medium_ls_id_map_.set_refactored(ls_id, task))) {
LOG_WARN("failed to stop ls schedule medium", K(ret), K(ls_id), K(task));
}
} else {
LOG_WARN("failed to get map", K(ret), K(ls_id), K(tmp_task));
}
} else if (ProhibitMediumTask::MEDIUM == tmp_task) {
ret = OB_EAGAIN;
LOG_WARN("need wait ls already schedule medium end", K(ret), K(ls_id), K(tmp_task));
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("task is already exist", K(ret), K(ls_id), K(tmp_task), K(task));
}
return ret;
}
void ObTenantTabletScheduler::resume_schedule_medium()
// When executing the medium task, set the flag in the normal task process of the log stream
int ObTenantTabletScheduler::ls_start_schedule_medium(const share::ObLSID &ls_id, bool &ls_could_schedule_medium)
{
int ret = OB_SUCCESS;
ProhibitMediumTask tmp_task;
ProhibitMediumTask task = ProhibitMediumTask::MEDIUM;
ls_could_schedule_medium = false;
obsys::ObWLockGuard lock_guard(allow_schedule_medium_lock_);
allow_schedule_medium_flag_ = true;
if (!ls_id.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(ls_id));
} else if (OB_FAIL(prohibit_medium_ls_id_map_.get_refactored(ls_id, tmp_task))) {
if (OB_HASH_NOT_EXIST == ret) {
if (OB_FAIL(prohibit_medium_ls_id_map_.set_refactored(ls_id, task))) {
LOG_WARN("failed to stop ls schedule medium", K(ret), K(ls_id), K(task));
} else {
ls_could_schedule_medium = true;
}
} else {
LOG_WARN("failed to get map", K(ret), K(ls_id), K(tmp_task));
}
} else if (ProhibitMediumTask::TRANSFER == tmp_task) {
// do nothing
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("task is already exist", K(ret), K(ls_id), K(tmp_task), K(task));
}
return ret;
}
// The transfer start transaction ends or the execution of the medium task ends and the flag is cleared
int ObTenantTabletScheduler::clear_prohibit_medium_flag(const share::ObLSID &ls_id, const ProhibitMediumTask &task)
{
int ret = OB_SUCCESS;
ProhibitMediumTask tmp_task;
obsys::ObWLockGuard lock_guard(allow_schedule_medium_lock_);
if (!ls_id.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(ls_id));
} else if (OB_FAIL(prohibit_medium_ls_id_map_.get_refactored(ls_id, tmp_task))) {
LOG_WARN("failed to get task from map", K(ret), K(ls_id), K(task));
} else if (tmp_task != task) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("task do not match", K(ret), K(ls_id), K(task), K(tmp_task));
} else if (OB_FAIL(prohibit_medium_ls_id_map_.erase_refactored(ls_id))) {
LOG_WARN("failed to resume ls schedule medium", K(ret), K(ls_id), K(task));
}
return ret;
}
void ObTenantTabletScheduler::print_prohibit_medium_ls_info(
char *buf,
const int64_t buf_len)
{
int ret = OB_SUCCESS;
static const char *task_strs[] = {
"TRANSFER",
"MEDIUM",
};
if (OB_ISNULL(buf) || buf_len <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), KP(buf), K(buf_len));
} else if (0 == prohibit_medium_ls_id_map_.size()) {
// do nothing
} else {
FOREACH_X(it, prohibit_medium_ls_id_map_, OB_SUCC(ret)) {
const ObLSID &ls_id = it->first;
if (ProhibitMediumTask::TRANSFER > it->second || ProhibitMediumTask::MEDIUM < it->second) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("prihibit medium task is not expected", K(ret), "ProhibitMediumTask", it->second);
} else if (OB_FAIL(databuff_printf(buf, buf_len, "%ld,%s;", ls_id.id(), task_strs[it->second]))) {
LOG_WARN("fail to print str", K(ret), K(buf), K(buf_len));
}
}
}
}
int64_t ObTenantTabletScheduler::get_frozen_version() const
{
obsys::ObRLockGuard frozen_version_guard(frozen_version_lock_);
@ -1064,6 +1159,24 @@ int ObTenantTabletScheduler::schedule_ls_minor_merge(
return ret;
}
int ObTenantTabletScheduler::check_tablet_could_schedule_by_status(const ObTablet &tablet, bool &could_schedule_merge)
{
int ret = OB_SUCCESS;
ObTabletCreateDeleteMdsUserData user_data;
bool committed_flag = false;
could_schedule_merge = true;
if (OB_FAIL(tablet.ObITabletMdsInterface::get_latest_tablet_status(user_data, committed_flag))) {
LOG_WARN("failed to get tablet status", K(ret), K(tablet), K(user_data));
} else if (ObTabletStatus::TRANSFER_OUT == user_data.tablet_status_
|| ObTabletStatus::TRANSFER_OUT_DELETED == user_data.tablet_status_) {
could_schedule_merge = false;
if (REACH_TENANT_TIME_INTERVAL(PRINT_LOG_INVERVAL)) {
LOG_INFO("tablet status is TRANSFER_OUT or TRANSFER_OUT_DELETED, merging is not allowed", K(user_data), K(tablet));
}
}
return ret;
}
int ObTenantTabletScheduler::schedule_ls_medium_merge(
int64_t &merge_version,
ObLSHandle &ls_handle,
@ -1074,6 +1187,7 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge(
bool need_merge = false;
ObLS &ls = *ls_handle.get_ls();
const ObLSID &ls_id = ls.get_ls_id();
bool ls_could_schedule_medium = false;
if (OB_FAIL(check_ls_state_in_major(ls, need_merge))) {
LOG_WARN("failed to check ls state", K(ret), K(ls));
} else if (!need_merge) {
@ -1127,12 +1241,15 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge(
if (!locality_bad_case) {
DEL_SUSPECT_INFO(MEDIUM_MERGE, ls_id, ObTabletID(INT64_MAX));
}
{ // start of RLock
// RLock to avoid changing allow_schedule_medium_flag_ when schedule current tablet
obsys::ObRLockGuard lock_guard(allow_schedule_medium_lock_);
if (!allow_schedule_medium_flag_) { // not allow schedule medium
if(OB_FAIL(ret)) {
} else if (is_leader && could_major_merge && OB_TMP_FAIL(ls_start_schedule_medium(ls_id, ls_could_schedule_medium))) {
LOG_WARN("failed to set start schedule medium", K(ret), K(tmp_ret), K(ls_id));
} else {
if (!ls_could_schedule_medium) { // not allow schedule medium
if (REACH_TENANT_TIME_INTERVAL(PRINT_LOG_INVERVAL)) {
LOG_INFO("tenant is blocking schedule medium", KR(ret), K(MTL_ID()), K(ls_id), K_(allow_schedule_medium_flag));
LOG_INFO("tenant is blocking schedule medium", KR(ret), K(MTL_ID()), K(ls_id));
}
}
}
@ -1174,13 +1291,17 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge(
} else if (FALSE_IT(++schedule_tablet_cnt)) { // inc tablet cnt
} else if (OB_TMP_FAIL(schedule_tablet_medium(
ls_handle, tablet_handle, major_frozen_scn, weak_read_ts,
could_major_merge, enable_adaptive_compaction, ls_locality,
could_major_merge, enable_adaptive_compaction, ls_could_schedule_medium, ls_locality,
merge_version, is_leader, tablet_merge_finish))) {
LOG_WARN("fail to schedule tablet medium", K(tmp_ret));
}
medium_ls_tablet_iter_.update_merge_finish(tablet_merge_finish);
} // end of while
} // end of RLock allow_schedule_medium_lock_
// clear flags set by ls_start_schedule_medium
if (ls_could_schedule_medium
&& OB_TMP_FAIL(clear_prohibit_medium_flag(ls_id, ProhibitMediumTask::MEDIUM))) {
LOG_WARN("failed to clear prohibit schedule medium flag", K(tmp_ret), K(ret), K(ls_id));
}
} // else
return ret;
}
@ -1192,6 +1313,7 @@ int ObTenantTabletScheduler::schedule_tablet_medium(
const share::SCN &weak_read_ts,
const bool could_major_merge,
const bool enable_adaptive_compaction,
const bool ls_could_schedule_medium,
ObLSLocality ls_locality,
int64_t &merge_version,
bool &is_leader,
@ -1200,7 +1322,7 @@ int ObTenantTabletScheduler::schedule_tablet_medium(
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
tablet_merge_finish = false;
bool tablet_could_schedule_merge = false;
ObLS &ls = *ls_handle.get_ls();
const ObLSID &ls_id = ls.get_ls_id();
ObTablet &tablet = *tablet_handle.get_obj();
@ -1210,6 +1332,9 @@ int ObTenantTabletScheduler::schedule_tablet_medium(
if (OB_FAIL(tablet.fetch_table_store(table_store_wrapper))) {
LOG_WARN("fail to fetch table store", K(ret));
} else {
if (ls_could_schedule_medium && OB_TMP_FAIL(check_tablet_could_schedule_by_status(tablet, tablet_could_schedule_merge))) {
LOG_WARN("failed to check tablet counld schedule merge", K(tmp_ret), K(tablet_id));
}
ObMediumCompactionScheduleFunc func(ls, tablet_handle, weak_read_ts);
ObITable *latest_major = table_store_wrapper.get_member()->get_major_sstables().get_boundary_table(true/*last*/);
if (OB_NOT_NULL(latest_major) && latest_major->get_snapshot_version() >= merge_version) {
@ -1254,7 +1379,7 @@ int ObTenantTabletScheduler::schedule_tablet_medium(
}
if (could_schedule_next_medium && could_major_merge
&& (!tablet_merge_finish || enable_adaptive_compaction || check_medium_finish)
&& allow_schedule_medium_flag_) {
&& tablet_could_schedule_merge) {
if (OB_TMP_FAIL(func.schedule_next_medium_for_leader(
tablet_merge_finish ? 0 : merge_version, schedule_stats_))) { // schedule another round
if (OB_NOT_MASTER == tmp_ret) {
@ -1269,8 +1394,8 @@ int ObTenantTabletScheduler::schedule_tablet_medium(
if (could_major_merge && OB_TMP_FAIL(ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge(
ls, tablet, major_frozen_scn, true/*scheduler_called*/))) {
if (OB_EAGAIN != ret) {
LOG_WARN("failed to schedule medium", K(tmp_ret), K(ls_id), K(tablet_id));
if (OB_EAGAIN != tmp_ret) {
LOG_WARN("failed to schedule medium", K(tmp_ret), K(ret), K(ls_id), K(tablet_id));
}
}
}

View File

@ -37,7 +37,11 @@ class ObLS;
class ObTablet;
class ObITable;
class ObTabletDDLKvMgr;
enum ProhibitMediumTask
{
TRANSFER = 0,
MEDIUM = 1
};
class ObFastFreezeChecker
{
public:
@ -147,7 +151,6 @@ public:
int64_t finish_cnt_;
int64_t wait_rs_validate_cnt_;
};
public:
ObTenantTabletScheduler();
~ObTenantTabletScheduler();
@ -177,9 +180,11 @@ public:
void stop_major_merge();
void resume_major_merge();
OB_INLINE bool could_major_merge_start() const { return major_merge_status_; }
void stop_schedule_medium(); // may block for waiting lock
void resume_schedule_medium();
OB_INLINE bool could_schedule_medium() const { return allow_schedule_medium_flag_; }
int check_tablet_could_schedule_by_status(const ObTablet &tablet, bool &could_schedule_merge);
int stop_ls_schedule_medium(const share::ObLSID &ls_id); // may block for waiting lock
int clear_prohibit_medium_flag(const share::ObLSID &ls_id, const ProhibitMediumTask &task);
int ls_start_schedule_medium(const share::ObLSID &ls_id, bool &ls_could_schedule_medium);
void print_prohibit_medium_ls_info(char *buf, const int64_t buf_len);
int64_t get_frozen_version() const;
int64_t get_merged_version() const { return merged_version_; }
int64_t get_inner_table_merged_scn() const { return ATOMIC_LOAD(&inner_table_merged_scn_); }
@ -247,6 +252,7 @@ private:
const share::SCN &weak_read_ts,
const bool could_major_merge,
const bool enable_adaptive_compaction,
const bool ls_could_schedule_medium,
compaction::ObLSLocality ls_locality,
int64_t &merge_version,
bool &is_leader,
@ -342,7 +348,7 @@ private:
ObCompactionScheduleIterator medium_ls_tablet_iter_;
int64_t error_tablet_cnt_; // for diagnose
mutable obsys::ObRWLock allow_schedule_medium_lock_;
mutable bool allow_schedule_medium_flag_;
common::hash::ObHashMap<share::ObLSID, ProhibitMediumTask> prohibit_medium_ls_id_map_;
compaction::ObStorageLocalityCache ls_locality_cache_;
};

View File

@ -644,7 +644,7 @@ int ObTabletBackfillTXTask::get_backfill_tx_minor_sstables_(
LOG_WARN("tablet backfill tx task do not init", K(ret));
} else if (OB_ISNULL(tablet)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get backfll tx memtables get invalid argument", K(ret), KP(tablet));
LOG_WARN("get backfll tx minor sstable get invalid argument", K(ret), KP(tablet));
} else if (OB_FAIL(tablet->get_mini_minor_sstables(minor_table_iter))) {
LOG_WARN("failed to get mini minor sstables", K(ret));
} else {

View File

@ -24,6 +24,7 @@
#include "lib/utility/ob_tracepoint.h"
#include "observer/ob_server_event_history_table_operator.h"
#include "ob_storage_ha_utils.h"
#include "storage/compaction/ob_tenant_tablet_scheduler.h"
using namespace oceanbase::transaction;
using namespace oceanbase::share;
@ -418,6 +419,7 @@ int ObTransferHandler::do_with_start_status_(const share::ObTransferTaskInfo &ta
ObTimeoutCtx timeout_ctx;
ObMySQLTransaction trans;
bool enable_kill_trx = false;
bool succ_stop_medium = false;
if (!is_inited_) {
ret = OB_NOT_INIT;
@ -451,6 +453,8 @@ int ObTransferHandler::do_with_start_status_(const share::ObTransferTaskInfo &ta
enable_kill_trx = tenant_config->_enable_balance_kill_transaction;
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(stop_ls_schedule_medium_(task_info.src_ls_id_, succ_stop_medium))) {
LOG_WARN("failed to stop ls schedule medium", K(ret), K(task_info));
} else if (OB_FAIL(lock_src_and_dest_ls_member_list_(task_info, task_info.src_ls_id_, task_info.dest_ls_id_))) {
LOG_WARN("failed to lock src and dest ls member list", K(ret), K(task_info));
} else if (!enable_kill_trx && OB_FAIL(check_src_ls_has_active_trans_(task_info.src_ls_id_))) {
@ -466,12 +470,19 @@ int ObTransferHandler::do_with_start_status_(const share::ObTransferTaskInfo &ta
} else {
DEBUG_SYNC(BEFORE_TRANSFER_START_COMMIT);
}
if (OB_TMP_FAIL(commit_trans_(ret, trans))) {
LOG_WARN("failed to commit trans", K(tmp_ret), K(ret));
if (OB_SUCCESS == ret) {
ret = tmp_ret;
}
}
if (succ_stop_medium && OB_TMP_FAIL(clear_prohibit_medium_flag_(task_info.src_ls_id_))) {
LOG_WARN("failed to clear prohibit schedule medium flag", K(tmp_ret), K(ret), K(task_info));
if (OB_SUCCESS == ret) {
ret = tmp_ret;
}
}
}
if (OB_FAIL(ret)) {
@ -1932,6 +1943,27 @@ void ObTransferHandler::online()
{
transfer_worker_mgr_.reset_task_id();
}
int ObTransferHandler::stop_ls_schedule_medium_(const share::ObLSID &ls_id, bool &succ_stop)
{
int ret = OB_SUCCESS;
succ_stop = false;
if (OB_FAIL(MTL(ObTenantTabletScheduler*)->stop_ls_schedule_medium(ls_id))) {
LOG_WARN("failed to resume ls schedule medium", K(ret), K(ls_id));
} else {
succ_stop = true;
}
return ret;
}
int ObTransferHandler::clear_prohibit_medium_flag_(const share::ObLSID &ls_id)
{
int ret = OB_SUCCESS;
if (OB_FAIL(MTL(ObTenantTabletScheduler*)->clear_prohibit_medium_flag(ls_id, ProhibitMediumTask::TRANSFER))) {
LOG_WARN("failed to clear prohibit schedule medium flag", K(ret), K(ls_id));
}
return ret;
}
}
}

View File

@ -211,6 +211,9 @@ private:
const uint64_t tenant_id,
const share::ObLSID &ls_id);
int record_server_event_(const int32_t ret, const int64_t round, const share::ObTransferTaskInfo &task_info) const;
int clear_prohibit_medium_flag_(const share::ObLSID &ls_id);
int stop_ls_schedule_medium_(const share::ObLSID &ls_id, bool &succ_stop);
private:
static const int64_t INTERVAL_US = 1 * 1000 * 1000; //1s
static const int64_t KILL_TX_MAX_RETRY_TIMES = 3;

View File

@ -316,11 +316,11 @@ int ObTXTransferUtils::get_tablet_status_(
bool unused_committed_flag = false;
if (get_commit) {
if (OB_FAIL(tablet->ObITabletMdsInterface::get_tablet_status(share::SCN::max_scn(), user_data, ObTabletCommon::DEFAULT_GET_TABLET_DURATION_US))) {
LOG_WARN("failed to get tx data", K(ret), KPC(tablet), K(user_data));
LOG_WARN("failed to get committed tablet status", K(ret), KPC(tablet), K(user_data));
}
} else {
if (OB_FAIL(tablet->ObITabletMdsInterface::get_latest_tablet_status(user_data, unused_committed_flag))) {
LOG_WARN("failed to get tx data", K(ret), KPC(tablet), K(user_data));
LOG_WARN("failed to get latest tablet status", K(ret), KPC(tablet), K(user_data));
}
}
return ret;