fix major freeze about updating report_scn in case of backup-restore tenant
This commit is contained in:
@ -128,8 +128,8 @@ void ObFreezeInfoDetector::run3()
|
|||||||
|
|
||||||
bool need_broadcast = false;
|
bool need_broadcast = false;
|
||||||
ret = OB_SUCCESS; // ignore ret
|
ret = OB_SUCCESS; // ignore ret
|
||||||
if (OB_FAIL(check_need_broadcast(need_broadcast))) {
|
if (OB_FAIL(check_need_broadcast(need_broadcast, proposal_id))) {
|
||||||
LOG_WARN("fail to check need broadcast", KR(ret), K_(tenant_id));
|
LOG_WARN("fail to check need broadcast", KR(ret), K_(tenant_id), K(proposal_id));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (need_broadcast) {
|
if (need_broadcast) {
|
||||||
@ -166,13 +166,13 @@ void ObFreezeInfoDetector::run3()
|
|||||||
LOG_INFO("stop freeze_info_detector", K_(tenant_id));
|
LOG_INFO("stop freeze_info_detector", K_(tenant_id));
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObFreezeInfoDetector::check_need_broadcast(bool &need_broadcast)
|
int ObFreezeInfoDetector::check_need_broadcast(bool &need_broadcast, const int64_t expected_epoch)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
LOG_WARN("not init", KR(ret), K_(tenant_id));
|
LOG_WARN("not init", KR(ret), K_(tenant_id));
|
||||||
} else if (OB_FAIL(try_adjust_global_merge_info())) {
|
} else if (OB_FAIL(try_adjust_global_merge_info(expected_epoch))) {
|
||||||
LOG_WARN("fail to try adjust global merge info", KR(ret), K_(tenant_id));
|
LOG_WARN("fail to try adjust global merge info", KR(ret), K_(tenant_id));
|
||||||
} else if (OB_FAIL(freeze_info_mgr_->check_need_broadcast(need_broadcast))) {
|
} else if (OB_FAIL(freeze_info_mgr_->check_need_broadcast(need_broadcast))) {
|
||||||
LOG_WARN("fail to check need broadcast", KR(ret), K_(tenant_id));
|
LOG_WARN("fail to check need broadcast", KR(ret), K_(tenant_id));
|
||||||
@ -348,7 +348,7 @@ int ObFreezeInfoDetector::try_reload_freeze_info(const int64_t expected_epoch)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObFreezeInfoDetector::try_adjust_global_merge_info()
|
int ObFreezeInfoDetector::try_adjust_global_merge_info(const int64_t expected_epoch)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
bool is_initial = false;
|
bool is_initial = false;
|
||||||
@ -368,7 +368,7 @@ int ObFreezeInfoDetector::try_adjust_global_merge_info()
|
|||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("fail to try adjust global merge info, freeze info manager is null", KR(ret),
|
LOG_WARN("fail to try adjust global merge info, freeze info manager is null", KR(ret),
|
||||||
K_(tenant_id), K_(is_primary_service));
|
K_(tenant_id), K_(is_primary_service));
|
||||||
} else if (OB_FAIL(freeze_info_mgr_->adjust_global_merge_info())) {
|
} else if (OB_FAIL(freeze_info_mgr_->adjust_global_merge_info(expected_epoch))) {
|
||||||
LOG_WARN("fail to adjust global merge info", KR(ret), K_(tenant_id), K_(is_primary_service));
|
LOG_WARN("fail to adjust global merge info", KR(ret), K_(tenant_id), K_(is_primary_service));
|
||||||
} else {
|
} else {
|
||||||
is_global_merge_info_adjusted_ = true;
|
is_global_merge_info_adjusted_ = true;
|
||||||
|
|||||||
@ -46,7 +46,7 @@ public:
|
|||||||
int signal();
|
int signal();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
int check_need_broadcast(bool &need_broadcast);
|
int check_need_broadcast(bool &need_broadcast, const int64_t expected_epoch);
|
||||||
int try_broadcast_freeze_info(const int64_t expected_epoch);
|
int try_broadcast_freeze_info(const int64_t expected_epoch);
|
||||||
int try_renew_snapshot_gc_scn(const bool renew_on_start);
|
int try_renew_snapshot_gc_scn(const bool renew_on_start);
|
||||||
int try_minor_freeze();
|
int try_minor_freeze();
|
||||||
@ -57,7 +57,7 @@ private:
|
|||||||
int check_tenant_is_restore(const uint64_t tenant_id, bool &is_restore);
|
int check_tenant_is_restore(const uint64_t tenant_id, bool &is_restore);
|
||||||
int try_reload_freeze_info(const int64_t expected_epoch);
|
int try_reload_freeze_info(const int64_t expected_epoch);
|
||||||
// adjust global_merge_info in memory to avoid useless major freezes on restore major_freeze_service
|
// adjust global_merge_info in memory to avoid useless major freezes on restore major_freeze_service
|
||||||
int try_adjust_global_merge_info();
|
int try_adjust_global_merge_info(const int64_t expected_epoch);
|
||||||
int check_global_merge_info(bool &is_initial) const;
|
int check_global_merge_info(bool &is_initial) const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|||||||
@ -211,13 +211,13 @@ int ObFreezeInfoManager::get_global_broadcast_scn(SCN &global_broadcast_scn) con
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObFreezeInfoManager::adjust_global_merge_info()
|
int ObFreezeInfoManager::adjust_global_merge_info(const int64_t expected_epoch)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_ISNULL(merge_info_mgr_)) {
|
if (OB_ISNULL(merge_info_mgr_)) {
|
||||||
ret = OB_INNER_STAT_ERROR;
|
ret = OB_INNER_STAT_ERROR;
|
||||||
LOG_WARN("merge info mgr is null", KR(ret));
|
LOG_WARN("merge info mgr is null", KR(ret));
|
||||||
} else if (OB_FAIL(merge_info_mgr_->adjust_global_merge_info())) {
|
} else if (OB_FAIL(merge_info_mgr_->adjust_global_merge_info(expected_epoch))) {
|
||||||
LOG_WARN("fail to adjust global merge info", KR(ret), K_(tenant_id));
|
LOG_WARN("fail to adjust global merge info", KR(ret), K_(tenant_id));
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
@ -105,7 +105,7 @@ public:
|
|||||||
int get_global_last_merged_scn(share::SCN &global_last_merged_scn) const;
|
int get_global_last_merged_scn(share::SCN &global_last_merged_scn) const;
|
||||||
int get_global_broadcast_scn(share::SCN &global_broadcast_scn) const;
|
int get_global_broadcast_scn(share::SCN &global_broadcast_scn) const;
|
||||||
int get_local_latest_frozen_scn(share::SCN &frozen_scn);
|
int get_local_latest_frozen_scn(share::SCN &frozen_scn);
|
||||||
int adjust_global_merge_info();
|
int adjust_global_merge_info(const int64_t expected_epoch);
|
||||||
|
|
||||||
void reset_freeze_info();
|
void reset_freeze_info();
|
||||||
|
|
||||||
|
|||||||
@ -17,6 +17,7 @@
|
|||||||
#include "share/ob_freeze_info_proxy.h"
|
#include "share/ob_freeze_info_proxy.h"
|
||||||
#include "share/ob_zone_merge_table_operator.h"
|
#include "share/ob_zone_merge_table_operator.h"
|
||||||
#include "share/ob_global_merge_table_operator.h"
|
#include "share/ob_global_merge_table_operator.h"
|
||||||
|
#include "share/ob_tablet_meta_table_compaction_operator.h"
|
||||||
#include "observer/ob_server_struct.h"
|
#include "observer/ob_server_struct.h"
|
||||||
#include "share/ob_cluster_version.h"
|
#include "share/ob_cluster_version.h"
|
||||||
#include "share/inner_table/ob_inner_table_schema_constants.h"
|
#include "share/inner_table/ob_inner_table_schema_constants.h"
|
||||||
@ -900,28 +901,45 @@ int ObZoneMergeManagerBase::try_update_zone_merge_info(const int64_t expected_ep
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObZoneMergeManagerBase::adjust_global_merge_info()
|
int ObZoneMergeManagerBase::adjust_global_merge_info(const int64_t expected_epoch)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObSimpleFrozenStatus max_frozen_status;
|
ObSimpleFrozenStatus max_frozen_status;
|
||||||
ObFreezeInfoProxy freeze_info_proxy(tenant_id_);
|
ObFreezeInfoProxy freeze_info_proxy(tenant_id_);
|
||||||
|
SCN min_compaction_scn;
|
||||||
|
SCN max_frozen_scn;
|
||||||
|
// 1. get min{compaction_scn} of all tablets in __all_tablet_meta_table
|
||||||
if (OB_FAIL(check_inner_stat())) {
|
if (OB_FAIL(check_inner_stat())) {
|
||||||
LOG_WARN("fail to check inner stat", KR(ret), K_(tenant_id));
|
LOG_WARN("fail to check inner stat", KR(ret), K_(tenant_id));
|
||||||
} else if (OB_FAIL(freeze_info_proxy.get_max_freeze_info(*proxy_, max_frozen_status))) {
|
} else if (OB_FAIL(ObTabletMetaTableCompactionOperator::get_min_compaction_scn(tenant_id_, min_compaction_scn))) {
|
||||||
LOG_WARN("fail to get freeze info with max frozen_scn", KR(ret), K_(tenant_id));
|
LOG_WARN("fail to get min_compaction_scn", KR(ret), K_(tenant_id));
|
||||||
} else if (OB_UNLIKELY(max_frozen_status.frozen_scn_ < SCN::base_scn())) {
|
} else if (min_compaction_scn < SCN::base_scn()) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("unexpected frozen_scn", KR(ret), K(max_frozen_status), K_(tenant_id));
|
LOG_WARN("unexpected min_compaction_scn", KR(ret), K_(tenant_id), K(min_compaction_scn));
|
||||||
} else {
|
} else if (min_compaction_scn == SCN::base_scn()) {
|
||||||
// Adjust global_merge_info in memory with (max_frozen_status.frozen_scn_ - 1).
|
// do nothing. no need to adjust global_merge_info
|
||||||
// So as to launch one major freeze with max_frozen_status.frozen_scn_.
|
} else if (min_compaction_scn > SCN::base_scn()) {
|
||||||
// Note: only launch major freezes with frozen_scn > 1
|
// 2. if min{compaction_scn} > 1, get max{frozen_scn} <= min{compaction_scn} from __all_freeze_info
|
||||||
if (max_frozen_status.frozen_scn_ > SCN::base_scn()) {
|
// case 1: if min{compaction_scn} is medium_compaction_scn, return the max frozen_scn which is
|
||||||
inner_adjust_global_merge_info(SCN::minus(max_frozen_status.frozen_scn_, 1));
|
// smaller than this medium_compaction_scn from __all_freeze_info
|
||||||
LOG_INFO("succ to adjust global merge info", K_(tenant_id), "max_frozen_scn",
|
// case 2: if min{compaction_scn} is major_compaction_scn, return this major_compaction_scn
|
||||||
max_frozen_status.frozen_scn_, K_(global_merge_info));
|
if (OB_FAIL(freeze_info_proxy.get_max_frozen_scn_smaller_or_equal_than(*proxy_,
|
||||||
|
min_compaction_scn, max_frozen_scn))) {
|
||||||
|
LOG_WARN("fail to get max frozen_scn smaller than or equal to min_compaction_scn", KR(ret),
|
||||||
|
K_(tenant_id), K(min_compaction_scn));
|
||||||
|
} else if (max_frozen_scn < SCN::base_scn()) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("unexpected max_frozen_scn", KR(ret), K_(tenant_id), K(max_frozen_scn));
|
||||||
|
} else if (max_frozen_scn == SCN::base_scn()) {
|
||||||
|
// do nothing. no need to adjust global_merge_info
|
||||||
|
} else if (max_frozen_scn > SCN::base_scn()) {
|
||||||
|
// 3. if max{frozen_scn} > 1, update __all_merge_info and global_merge_info with max{frozen_scn}
|
||||||
|
if (OB_FAIL(inner_adjust_global_merge_info(max_frozen_scn, expected_epoch))) {
|
||||||
|
LOG_WARN("fail to inner adjust global merge info", KR(ret), K_(tenant_id), K(max_frozen_scn));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
FLOG_INFO("finish to adjust global merge info", K_(tenant_id), K(max_frozen_scn), K_(global_merge_info));
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1127,16 +1145,48 @@ int ObZoneMergeManagerBase::handle_zone_merge_info_to_insert(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObZoneMergeManagerBase::inner_adjust_global_merge_info(const SCN &min_compaction_scn)
|
int ObZoneMergeManagerBase::inner_adjust_global_merge_info(
|
||||||
|
const SCN &frozen_scn,
|
||||||
|
const int64_t expected_epoch)
|
||||||
{
|
{
|
||||||
// Here, only adjust global_merge_info in memory, not adjust global_merge_info in table.
|
int ret = OB_SUCCESS;
|
||||||
// The next round of major freeze would update global_merge_info in table.
|
if (OB_UNLIKELY(!frozen_scn.is_valid() || expected_epoch < 0)) {
|
||||||
// Note that, here not only adjust last_merged_scn, but also adjust global_broadcast_scn and
|
ret = OB_INVALID_ARGUMENT;
|
||||||
// frozen_scn. So as to avoid error in ObMajorMergeScheduler::do_work(), which works based on
|
LOG_WARN("invalid argument", KR(ret), K(frozen_scn), K(expected_epoch));
|
||||||
// these global_merge_info in memory.
|
} else {
|
||||||
global_merge_info_.last_merged_scn_.set_scn(min_compaction_scn, false);
|
// 1. adjust global_merge_info in memory to control the frozen_scn of the next major compaction.
|
||||||
global_merge_info_.global_broadcast_scn_.set_scn(min_compaction_scn, false);
|
// 2. adjust global_merge_info in table for background thread to update report_scn.
|
||||||
global_merge_info_.frozen_scn_.set_scn(min_compaction_scn, false);
|
// https://work.aone.alibaba-inc.com/issue/47967612
|
||||||
|
// Note that, here not only adjust last_merged_scn, but also adjust global_broadcast_scn and
|
||||||
|
// frozen_scn. So as to avoid error in ObMajorMergeScheduler::do_work(), which works based on
|
||||||
|
// these global_merge_info in memory.
|
||||||
|
ObMySQLTransaction trans;
|
||||||
|
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id_);
|
||||||
|
if (OB_FAIL(trans.start(proxy_, meta_tenant_id))) {
|
||||||
|
LOG_WARN("fail to start transaction", KR(ret), K_(tenant_id), K(meta_tenant_id));
|
||||||
|
} else if (OB_FAIL(check_freeze_service_epoch(trans, expected_epoch))) {
|
||||||
|
LOG_WARN("fail to check freeze_service_epoch", KR(ret), K(expected_epoch));
|
||||||
|
} else {
|
||||||
|
ObGlobalMergeInfo tmp_global_info;
|
||||||
|
if (OB_FAIL(tmp_global_info.assign_value(global_merge_info_))) {
|
||||||
|
LOG_WARN("fail to assign global merge info", KR(ret), K_(global_merge_info));
|
||||||
|
} else {
|
||||||
|
tmp_global_info.frozen_scn_.set_scn(frozen_scn, true);
|
||||||
|
tmp_global_info.global_broadcast_scn_.set_scn(frozen_scn, true);
|
||||||
|
tmp_global_info.last_merged_scn_.set_scn(frozen_scn, true);
|
||||||
|
if (OB_FAIL(ObGlobalMergeTableOperator::update_partial_global_merge_info(trans, tenant_id_, tmp_global_info))) {
|
||||||
|
LOG_WARN("fail to update partial global merge info", KR(ret), K(tmp_global_info));
|
||||||
|
}
|
||||||
|
handle_trans_stat(trans, ret);
|
||||||
|
if (FAILEDx(global_merge_info_.assign_value(tmp_global_info))) {
|
||||||
|
LOG_WARN("fail to assign global_merge_info", KR(ret), K(tmp_global_info), K_(global_merge_info));
|
||||||
|
} else {
|
||||||
|
LOG_INFO("succ to update global_merge_info", K_(tenant_id), K(tmp_global_info), K_(global_merge_info));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
// only used for copying data to/from shadow_
|
// only used for copying data to/from shadow_
|
||||||
@ -1458,7 +1508,7 @@ int ObZoneMergeManager::try_update_zone_merge_info(const int64_t expected_epoch)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObZoneMergeManager::adjust_global_merge_info()
|
int ObZoneMergeManager::adjust_global_merge_info(const int64_t expected_epoch)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
SpinWLockGuard guard(write_lock_);
|
SpinWLockGuard guard(write_lock_);
|
||||||
@ -1468,7 +1518,7 @@ int ObZoneMergeManager::adjust_global_merge_info()
|
|||||||
ObZoneMergeMgrGuard shadow_guard(lock_,
|
ObZoneMergeMgrGuard shadow_guard(lock_,
|
||||||
*(static_cast<ObZoneMergeManagerBase *> (this)), shadow_, ret);
|
*(static_cast<ObZoneMergeManagerBase *> (this)), shadow_, ret);
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
ret = shadow_.adjust_global_merge_info();
|
ret = shadow_.adjust_global_merge_info(expected_epoch);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
@ -69,7 +69,7 @@ public:
|
|||||||
virtual int try_update_global_last_merged_scn(const int64_t expected_epoch);
|
virtual int try_update_global_last_merged_scn(const int64_t expected_epoch);
|
||||||
virtual int update_global_merge_info_after_merge(const int64_t expected_epoch);
|
virtual int update_global_merge_info_after_merge(const int64_t expected_epoch);
|
||||||
virtual int try_update_zone_merge_info(const int64_t expected_epoch);
|
virtual int try_update_zone_merge_info(const int64_t expected_epoch);
|
||||||
virtual int adjust_global_merge_info();
|
virtual int adjust_global_merge_info(const int64_t expected_epoch);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
int check_valid(const common::ObZone &zone, int64_t &idx) const;
|
int check_valid(const common::ObZone &zone, int64_t &idx) const;
|
||||||
@ -90,7 +90,8 @@ private:
|
|||||||
const common::ObIArray<share::ObZoneMergeInfo> &ori_merge_infos,
|
const common::ObIArray<share::ObZoneMergeInfo> &ori_merge_infos,
|
||||||
const common::ObIArray<common::ObZone> &zone_list,
|
const common::ObIArray<common::ObZone> &zone_list,
|
||||||
common::ObIArray<share::ObZoneMergeInfo> &to_insert_infos);
|
common::ObIArray<share::ObZoneMergeInfo> &to_insert_infos);
|
||||||
void inner_adjust_global_merge_info(const share::SCN &min_compaction_scn);
|
int inner_adjust_global_merge_info(const share::SCN &frozen_scn,
|
||||||
|
const int64_t expected_epoch);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
common::SpinRWLock lock_;
|
common::SpinRWLock lock_;
|
||||||
@ -137,7 +138,7 @@ public:
|
|||||||
virtual int try_update_global_last_merged_scn(const int64_t expected_epoch);
|
virtual int try_update_global_last_merged_scn(const int64_t expected_epoch);
|
||||||
virtual int update_global_merge_info_after_merge(const int64_t expected_epoch);
|
virtual int update_global_merge_info_after_merge(const int64_t expected_epoch);
|
||||||
virtual int try_update_zone_merge_info(const int64_t expected_epoch);
|
virtual int try_update_zone_merge_info(const int64_t expected_epoch);
|
||||||
virtual int adjust_global_merge_info();
|
virtual int adjust_global_merge_info(const int64_t expected_epoch);
|
||||||
|
|
||||||
public:
|
public:
|
||||||
class ObZoneMergeMgrGuard
|
class ObZoneMergeMgrGuard
|
||||||
|
|||||||
@ -169,6 +169,42 @@ int ObFreezeInfoProxy::get_freeze_info_larger_or_equal_than(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObFreezeInfoProxy::get_max_frozen_scn_smaller_or_equal_than(
|
||||||
|
ObISQLClient &sql_proxy,
|
||||||
|
const SCN &compaction_scn,
|
||||||
|
SCN &max_frozen_scn)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
ObSqlString sql;
|
||||||
|
if (OB_UNLIKELY(!compaction_scn.is_valid() || (compaction_scn < SCN::base_scn()))) {
|
||||||
|
LOG_WARN("invalid argument", KR(ret), K(compaction_scn));
|
||||||
|
} else {
|
||||||
|
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||||
|
ObMySQLResult *result = nullptr;
|
||||||
|
const uint64_t compaction_scn_val = compaction_scn.get_val_for_inner_table_field();
|
||||||
|
if (OB_FAIL(sql.assign_fmt("SELECT MAX(frozen_scn) as value FROM %s WHERE frozen_scn <= %lu",
|
||||||
|
OB_ALL_FREEZE_INFO_TNAME, compaction_scn_val))) {
|
||||||
|
LOG_WARN("fail to append sql", KR(ret), K_(tenant_id), K(compaction_scn));
|
||||||
|
} else if (OB_FAIL(sql_proxy.read(res, tenant_id_, sql.ptr()))) {
|
||||||
|
LOG_WARN("fail to execute sql", KR(ret), K(sql), K_(tenant_id));
|
||||||
|
} else if (OB_ISNULL(result = res.get_result())) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("fail to get sql result", KR(ret), K(sql), K_(tenant_id));
|
||||||
|
} else if (OB_FAIL(result->next())) {
|
||||||
|
LOG_WARN("get next result failed", KR(ret), K_(tenant_id), K(sql));
|
||||||
|
} else {
|
||||||
|
uint64_t max_frozen_scn_val = UINT64_MAX;
|
||||||
|
EXTRACT_UINT_FIELD_MYSQL(*result, "value", max_frozen_scn_val, uint64_t);
|
||||||
|
if (FAILEDx(max_frozen_scn.convert_for_inner_table_field(max_frozen_scn_val))) {
|
||||||
|
LOG_WARN("fail to convert uint64_t to SCN", KR(ret), K(max_frozen_scn_val));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG_INFO("finish to get freeze_info", KR(ret), K_(tenant_id), K(sql));
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int ObFreezeInfoProxy::set_freeze_info(
|
int ObFreezeInfoProxy::set_freeze_info(
|
||||||
ObISQLClient &sql_proxy,
|
ObISQLClient &sql_proxy,
|
||||||
const ObSimpleFrozenStatus &frozen_status)
|
const ObSimpleFrozenStatus &frozen_status)
|
||||||
|
|||||||
@ -133,6 +133,12 @@ public:
|
|||||||
const SCN &frozen_scn,
|
const SCN &frozen_scn,
|
||||||
common::ObIArray<ObSimpleFrozenStatus> &frozen_statuses);
|
common::ObIArray<ObSimpleFrozenStatus> &frozen_statuses);
|
||||||
|
|
||||||
|
// get the maximum frozen_scn which is smaller than or equal to the given @compaction_scn
|
||||||
|
int get_max_frozen_scn_smaller_or_equal_than(
|
||||||
|
common::ObISQLClient &sql_proxy,
|
||||||
|
const SCN &compaction_scn,
|
||||||
|
SCN &max_frozen_scn);
|
||||||
|
|
||||||
int set_freeze_info(common::ObISQLClient &sql_proxy,
|
int set_freeze_info(common::ObISQLClient &sql_proxy,
|
||||||
const ObSimpleFrozenStatus &frozen_status);
|
const ObSimpleFrozenStatus &frozen_status);
|
||||||
|
|
||||||
|
|||||||
@ -17,6 +17,7 @@
|
|||||||
#include "share/ob_dml_sql_splicer.h"
|
#include "share/ob_dml_sql_splicer.h"
|
||||||
#include "share/tablet/ob_tablet_filter.h"
|
#include "share/tablet/ob_tablet_filter.h"
|
||||||
#include "share/ob_service_epoch_proxy.h"
|
#include "share/ob_service_epoch_proxy.h"
|
||||||
|
#include "share/scn.h"
|
||||||
#include "observer/ob_server_struct.h"
|
#include "observer/ob_server_struct.h"
|
||||||
|
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
@ -243,6 +244,56 @@ int ObTabletMetaTableCompactionOperator::batch_update_unequal_report_scn_tablet(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObTabletMetaTableCompactionOperator::get_min_compaction_scn(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
SCN &min_compaction_scn)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
const int64_t start_time_us = ObTimeUtil::current_time();
|
||||||
|
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
|
||||||
|
} else {
|
||||||
|
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id);
|
||||||
|
int64_t estimated_timeout_us = 0;
|
||||||
|
ObTimeoutCtx timeout_ctx;
|
||||||
|
// set trx_timeout and query_timeout based on tablet_replica_cnt
|
||||||
|
if (OB_FAIL(ObTabletMetaTableCompactionOperator::get_estimated_timeout_us(tenant_id,
|
||||||
|
estimated_timeout_us))) {
|
||||||
|
LOG_WARN("fail to get estimated_timeout_us", KR(ret), K(tenant_id));
|
||||||
|
} else if (OB_FAIL(timeout_ctx.set_trx_timeout_us(estimated_timeout_us))) {
|
||||||
|
LOG_WARN("fail to set trx timeout", KR(ret), K(estimated_timeout_us));
|
||||||
|
} else if (OB_FAIL(timeout_ctx.set_timeout(estimated_timeout_us))) {
|
||||||
|
LOG_WARN("fail to set abs timeout", KR(ret), K(estimated_timeout_us));
|
||||||
|
} else {
|
||||||
|
ObSqlString sql;
|
||||||
|
SMART_VAR(ObISQLClient::ReadResult, res) {
|
||||||
|
ObMySQLResult *result = nullptr;
|
||||||
|
if (OB_FAIL(sql.assign_fmt("SELECT MIN(compaction_scn) as value FROM %s WHERE tenant_id ="
|
||||||
|
" '%ld' ", OB_ALL_TABLET_META_TABLE_TNAME, tenant_id))) {
|
||||||
|
LOG_WARN("failed to append fmt", K(ret), K(tenant_id));
|
||||||
|
} else if (OB_FAIL(GCTX.sql_proxy_->read(res, meta_tenant_id, sql.ptr()))) {
|
||||||
|
LOG_WARN("fail to execute sql", KR(ret), K(tenant_id), K(meta_tenant_id), K(sql));
|
||||||
|
} else if (OB_ISNULL(result = res.get_result())) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("fail to get mysql result", KR(ret), K(tenant_id), K(sql));
|
||||||
|
} else if (OB_FAIL(result->next())) {
|
||||||
|
LOG_WARN("get next result failed", KR(ret), K(tenant_id), K(sql));
|
||||||
|
} else {
|
||||||
|
uint64_t min_compaction_scn_val = UINT64_MAX;
|
||||||
|
EXTRACT_UINT_FIELD_MYSQL(*result, "value", min_compaction_scn_val, uint64_t);
|
||||||
|
if (FAILEDx(min_compaction_scn.convert_for_inner_table_field(min_compaction_scn_val))) {
|
||||||
|
LOG_WARN("fail to convert uint64_t to SCN", KR(ret), K(min_compaction_scn_val));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG_INFO("finish to get min_compaction_scn", KR(ret), K(tenant_id), K(min_compaction_scn),
|
||||||
|
"cost_time_us", ObTimeUtil::current_time() - start_time_us, K(estimated_timeout_us));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int ObTabletMetaTableCompactionOperator::construct_unequal_tablet_id_array(
|
int ObTabletMetaTableCompactionOperator::construct_unequal_tablet_id_array(
|
||||||
sqlclient::ObMySQLResult &result,
|
sqlclient::ObMySQLResult &result,
|
||||||
common::ObIArray<ObTabletID> &unequal_tablet_id_array)
|
common::ObIArray<ObTabletID> &unequal_tablet_id_array)
|
||||||
|
|||||||
@ -28,6 +28,7 @@ class ObMySQLTransaction;
|
|||||||
namespace share
|
namespace share
|
||||||
{
|
{
|
||||||
class ObTabletReplicaFilter;
|
class ObTabletReplicaFilter;
|
||||||
|
class SCN;
|
||||||
|
|
||||||
// part compaction related member from __all_tablet_meta_table
|
// part compaction related member from __all_tablet_meta_table
|
||||||
struct ObTabletCompactionScnInfo
|
struct ObTabletCompactionScnInfo
|
||||||
@ -116,6 +117,9 @@ public:
|
|||||||
const share::ObLSID &ls_id,
|
const share::ObLSID &ls_id,
|
||||||
const int64_t major_frozen_scn,
|
const int64_t major_frozen_scn,
|
||||||
const common::ObIArray<ObTabletID> &input_tablet_id_array);
|
const common::ObIArray<ObTabletID> &input_tablet_id_array);
|
||||||
|
static int get_min_compaction_scn(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
SCN &min_compaction_scn);
|
||||||
private:
|
private:
|
||||||
// is_update_finish_scn = TRUE: update finish_scn
|
// is_update_finish_scn = TRUE: update finish_scn
|
||||||
// is_update_finish_scn = FALSE: delete rows
|
// is_update_finish_scn = FALSE: delete rows
|
||||||
|
|||||||
Reference in New Issue
Block a user