add ls locality cache in compaction[发版后合]

This commit is contained in:
obdev 2023-04-04 00:41:21 +00:00 committed by ob-robot
parent 93cf8e3057
commit 2c86db0c46
7 changed files with 544 additions and 9 deletions

View File

@ -463,6 +463,7 @@ ob_set_subtarget(ob_storage compaction
compaction/ob_server_compaction_event_history.cpp
compaction/ob_compaction_util.cpp
compaction/ob_partition_rows_merger.cpp
compaction/ob_storage_locality_cache.cpp
)
ob_set_subtarget(ob_storage concurrency_control

View File

@ -683,6 +683,7 @@ int ObMediumCompactionScheduleFunc::check_medium_meta_table(
const int64_t check_medium_snapshot,
const ObLSID &ls_id,
const ObTabletID &tablet_id,
const ObLSLocality &ls_locality,
bool &merge_finish)
{
int ret = OB_SUCCESS;
@ -702,6 +703,7 @@ int ObMediumCompactionScheduleFunc::check_medium_meta_table(
} else {
const ObArray<ObTabletReplica> &replica_array = tablet_info.get_replicas();
int64_t unfinish_cnt = 0;
int64_t filter_cnt = 0;
bool pass = true;
for (int i = 0; OB_SUCC(ret) && i < replica_array.count(); ++i) {
const ObTabletReplica &replica = replica_array.at(i);
@ -712,15 +714,23 @@ int ObMediumCompactionScheduleFunc::check_medium_meta_table(
LOG_WARN("filter replica failed", K(ret), K(replica), K(filters_));
} else if (!pass) {
// do nothing
filter_cnt++;
} else if (replica.get_snapshot_version() >= check_medium_snapshot) {
// replica may have check_medium_snapshot = 2, but have received medium info of 3,
// when this replica is elected as leader, this will happened
} else if (ls_locality.is_valid()) {
if (ls_locality.check_exist(replica.get_server())) {
unfinish_cnt++;
} else {
filter_cnt++;
LOG_TRACE("filter by ls locality", K(ret), K(replica));
}
} else {
unfinish_cnt++;
}
} // end of for
FLOG_INFO("check_medium_compaction_finish", K(ret), K(ls_id), K(tablet_id), K(check_medium_snapshot),
K(unfinish_cnt), "total_cnt", replica_array.count());
LOG_INFO("check_medium_compaction_finish", K(ret), K(ls_id), K(tablet_id), K(check_medium_snapshot),
K(unfinish_cnt), K(filter_cnt), "total_cnt", replica_array.count());
if (0 == unfinish_cnt) { // merge finish
merge_finish = true;
@ -795,7 +805,7 @@ int ObMediumCompactionScheduleFunc::check_medium_checksum_table(
}
// for Leader, clean wait_check_medium_scn
int ObMediumCompactionScheduleFunc::check_medium_finish()
int ObMediumCompactionScheduleFunc::check_medium_finish(const ObLSLocality &ls_locality)
{
int ret = OB_SUCCESS;
const ObLSID &ls_id = ls_.get_ls_id();
@ -805,7 +815,7 @@ int ObMediumCompactionScheduleFunc::check_medium_finish()
if (0 == wait_check_medium_scn) {
// do nothing
} else if (OB_FAIL(check_medium_meta_table(wait_check_medium_scn, ls_id, tablet_id, merge_finish))) {
} else if (OB_FAIL(check_medium_meta_table(wait_check_medium_scn, ls_id, tablet_id, ls_locality, merge_finish))) {
LOG_WARN("failed to check inner table", K(ret), KPC(this));
} else if (!merge_finish) {
// do nothing

View File

@ -60,7 +60,7 @@ public:
const bool is_major,
const ObAdaptiveMergePolicy::AdaptiveMergeReason merge_reason = ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE);
int check_medium_finish();
int check_medium_finish(const ObLSLocality &ls_locality);
int64_t to_string(char* buf, const int64_t buf_len) const;
@ -85,6 +85,7 @@ protected:
const int64_t medium_snapshot,
const share::ObLSID &ls_id,
const ObTabletID &tablet_id,
const ObLSLocality &ls_locality,
bool &merge_finish);
int init_tablet_filters();
static int check_medium_checksum_table(

View File

@ -0,0 +1,395 @@
//Copyright (c) 2021 OceanBase
// OceanBase is licensed under Mulan PubL v2.
// You can use this software according to the terms and conditions of the Mulan PubL v2.
// You may obtain a copy of Mulan PubL v2 at:
// http://license.coscl.org.cn/MulanPubL-2.0
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
// See the Mulan PubL v2 for more details.
#define USING_LOG_PREFIX STORAGE_COMPACTION
#include "storage/compaction/ob_storage_locality_cache.h"
#include "src/share/ob_zone_merge_info.h"
#include "observer/ob_server_struct.h"
#include "src/share/ob_zone_merge_table_operator.h"
#include "lib/mysqlclient/ob_mysql_proxy.h"
#include "lib/mysqlclient/ob_mysql_result.h"
namespace oceanbase
{
namespace compaction
{
using namespace share;
int ObLSLocalityInCache::assgin(ObLSLocalityInCache &ls_locality)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!ls_locality.is_valid())) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "input ls locality is invalid", K(ret), K(ls_locality));
} else {
reset();
if (OB_FAIL(svr_addr_list_.assign(ls_locality.svr_addr_list_))) {
STORAGE_LOG(WARN, "failed to copy svr addr list", K(ret), K(ls_locality));
} else {
ls_id_ = ls_locality.ls_id_;
}
}
return ret;
}
// locality will return exist when ls locality is invalid
bool ObLSLocality::check_exist(const common::ObAddr &addr) const
{
bool exist = true;
if (is_valid()) {
exist = false;
for (int64_t j = 0; j < svr_addr_list_.count(); ++j) {
if (addr == svr_addr_list_.at(j)) {
exist = true;
break;
}
} // end of for
}
return exist;
}
ObStorageLocalityCache::ObStorageLocalityCache()
: is_inited_(false),
lock_(),
tenant_id_(OB_INVALID_TENANT_ID),
sql_proxy_(nullptr),
alloc_buf_(nullptr),
allocator_("StoLocCache"),
ls_locality_array_(OB_MALLOC_NORMAL_BLOCK_SIZE, allocator_)
{}
ObStorageLocalityCache::~ObStorageLocalityCache()
{
reset();
}
int ObStorageLocalityCache::init(
uint64_t tenant_id,
ObMySQLProxy *sql_proxy)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", KR(ret));
} else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == sql_proxy)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(tenant_id), KP(sql_proxy));
} else {
tenant_id_ = tenant_id;
sql_proxy_ = sql_proxy;
is_inited_ = true;
}
return ret;
}
void ObStorageLocalityCache::reset()
{
is_inited_ = false;
tenant_id_ = OB_INVALID_TENANT_ID;
sql_proxy_ = nullptr;
ls_locality_array_.reset();
if (OB_NOT_NULL(alloc_buf_)) {
allocator_.free(alloc_buf_);
alloc_buf_ = nullptr;
}
allocator_.reset();
}
int ObStorageLocalityCache::refresh_ls_locality()
{
int ret = OB_SUCCESS;
int64_t cost_ts = ObTimeUtility::fast_current_time();
ObSEArray<ObZone, 10> zone_list;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObStorageLocalityCache is not inited", KR(ret));
} else if (OB_FAIL(get_zone_list(zone_list))) {
LOG_WARN("failed to get zone list", K(ret));
} else if (OB_FAIL(get_ls_locality_by_zone(zone_list))) {
LOG_WARN("failed to get ls locality by zone", K(ret));
}
cost_ts = ObTimeUtility::fast_current_time() - cost_ts;
LOG_INFO("refresh ls locality cache", K(ret), K(cost_ts), "ls_count", ls_locality_array_.count(),
K_(ls_locality_array));
return ret;
}
int ObStorageLocalityCache::get_zone_list(ObIArray<ObZone> &zone_list)
{
int ret = OB_SUCCESS;
zone_list.reuse();
ObSqlString sql;
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
sqlclient::ObMySQLResult *result = nullptr;
if (OB_FAIL(sql.assign_fmt("SELECT * FROM %s WHERE tenant_id = '%lu' AND previous_locality = ''",
OB_ALL_TENANT_TNAME, tenant_id_))) {
LOG_WARN("fail to append sql", KR(ret), K_(tenant_id));
} else if (OB_FAIL(sql_proxy_->read(res, OB_SYS_TENANT_ID, sql.ptr()))) {
LOG_WARN("fail to execute sql", KR(ret), K_(tenant_id), K(sql));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to get sql result", KR(ret), K_(tenant_id), K(sql));
} else if (OB_FAIL(result->next())) {
if (OB_ITER_END != ret) {
LOG_WARN("fail to get next", KR(ret), K_(tenant_id), K(sql));
} else {
ret = OB_SUCCESS;
}
} else {
int64_t tmp_real_str_len = 0; // used to fill output argument
SMART_VAR(char[MAX_ZONE_LIST_LENGTH], zone_list_str) {
zone_list_str[0] = '\0';
EXTRACT_STRBUF_FIELD_MYSQL(*result, "zone_list", zone_list_str,
MAX_ZONE_LIST_LENGTH, tmp_real_str_len);
if (FAILEDx(str2zone_list(zone_list_str, zone_list))) {
LOG_WARN("fail to str2zone_list", KR(ret), K(zone_list_str));
}
}
}
}
return ret;
}
// TODO(@lixia.yq) replace ls_locality cache in RS::ObZoneMergeManagerBase
int ObStorageLocalityCache::str2zone_list(
const char *str,
ObIArray<ObZone> &zone_list)
{
int ret = OB_SUCCESS;
char *item_str = NULL;
char *save_ptr = NULL;
zone_list.reuse();
if (NULL == str) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("str is null", KP(str), K(ret));
} else {
while (OB_SUCC(ret)) {
item_str = strtok_r((NULL == item_str ? const_cast<char *>(str) : NULL), ";", &save_ptr);
if (NULL != item_str) {
if (OB_FAIL(zone_list.push_back(ObZone(item_str)))) {
LOG_WARN("fail to push_back", KR(ret));
}
} else {
break;
}
}
}
return ret;
}
int ObStorageLocalityCache::get_ls_locality_cnt(
ObMySQLTransaction &trans,
const ObIArray<ObZone> &zone_list,
int64_t &ls_locality_cnt)
{
int ret = OB_SUCCESS;
ls_locality_cnt = 0;
const uint64_t meta_tenant_id = get_private_table_exec_tenant_id(tenant_id_);
SMART_VAR(common::ObMySQLProxy::MySQLResult, res) {
ObSqlString sql;
common::sqlclient::ObMySQLResult *result = nullptr;
if (OB_FAIL(sql.assign_fmt("select count(*) ls_count from %s WHERE tenant_id = '%lu' AND zone IN ( ",
OB_ALL_LS_META_TABLE_TNAME, tenant_id_))) {
LOG_WARN("fail to assign sql", K(ret));
} else if (OB_FAIL(append_zone_info(zone_list, sql))) {
LOG_WARN("failed to append zone info", K(ret), K(zone_list));
} else if (OB_FAIL(sql.append_fmt(")"))) {
LOG_WARN("failed to append sql", KR(ret), K_(tenant_id));
} else if (OB_FAIL(trans.read(res, meta_tenant_id, sql.ptr()))) {
LOG_WARN("execute sql failed", K(ret), K(sql));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("result is nullptr", K(ret), K(sql));
} else if (OB_FAIL(result->next())) {
LOG_WARN("no result", K(ret), K(sql));
} else {
int64_t count = 0;
EXTRACT_INT_FIELD_MYSQL(*result, OB_STR_LS_COUNT, count, int64_t);
if (OB_SUCC(ret)) {
ls_locality_cnt = count;
}
}
}
return ret;
}
int ObStorageLocalityCache::get_ls_locality_by_zone(const ObIArray<ObZone> &zone_list)
{
int ret = OB_SUCCESS;
const uint64_t meta_tenant_id = get_private_table_exec_tenant_id(tenant_id_);
int64_t ls_locality_cnt = 0;
ObMySQLTransaction trans; // read trans
if (OB_UNLIKELY(zone_list.empty())) {
LOG_TRACE("zone list is empty, skip get ls locality", K(ret), K_(tenant_id));
} else if (OB_FAIL(trans.start(sql_proxy_, meta_tenant_id))) {
LOG_WARN("fail to start transaction", KR(ret), K_(tenant_id), K(meta_tenant_id));
} else if (OB_FAIL(get_ls_locality_cnt(trans, zone_list, ls_locality_cnt))) {
LOG_WARN("failed to get ls locality cnt", K(ret));
} else if (OB_UNLIKELY(0 == ls_locality_cnt)) {
LOG_WARN("empty ls locality", K(ls_locality_cnt));
} else {
ObSqlString sql;
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
sqlclient::ObMySQLResult *result = nullptr;
if (OB_FAIL(sql.assign_fmt("SELECT ls_id,svr_ip,svr_port FROM %s WHERE tenant_id = '%lu' AND zone IN ( ",
OB_ALL_LS_META_TABLE_TNAME, tenant_id_))) {
LOG_WARN("failed to append sql", KR(ret), K_(tenant_id));
} else if (OB_FAIL(append_zone_info(zone_list, sql))) {
LOG_WARN("failed to append zone info", K(ret), K(zone_list));
} else if (OB_FAIL(sql.append_fmt(") ORDER BY ls_id"))) {
LOG_WARN("failed to append sql", KR(ret), K_(tenant_id));
} else if (OB_FAIL(trans.read(res, meta_tenant_id, sql.ptr()))) {
LOG_WARN("failed to execute sql", KR(ret), K_(tenant_id), K(sql));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get sql result", KR(ret), K_(tenant_id), K(sql));
} else if (OB_FAIL(generate_ls_locality(ls_locality_cnt, *result))) {
LOG_WARN("failed to generate ls locality", K(ret), K(ls_locality_cnt));
}
}
}
return ret;
}
int ObStorageLocalityCache::generate_ls_locality(
const int64_t ls_locality_cnt,
sqlclient::ObMySQLResult &result)
{
int ret = OB_SUCCESS;
void *buf = nullptr;
ObAddr *svr_addr_list = nullptr;
if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObAddr) * ls_locality_cnt))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc ls svr addr list", K(ret));
} else {
svr_addr_list = new (buf) ObAddr[ls_locality_cnt];
}
common::ObArray<ObLSLocalityInCache> tmp_ls_locality_array;
ObLSLocalityInCache ls_locality;
int64_t ls_id = 0;
ObString svr_ip;
int64_t svr_port = 0;
int64_t idx = 0;
while (OB_SUCC(ret)) {
if (OB_FAIL(result.next())) {
if (OB_ITER_END != ret) {
LOG_WARN("fail to get next result", KR(ret));
} else {
ret = OB_SUCCESS;
break;
}
} else if (OB_UNLIKELY(idx >= ls_locality_cnt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("idx is unexpected invalid", K(ret), K(idx), K(ls_locality_cnt));
} else {
ObAddr &svr_addr = svr_addr_list[idx++];
EXTRACT_INT_FIELD_MYSQL(result, "ls_id", ls_id, int64_t);
EXTRACT_VARCHAR_FIELD_MYSQL(result, "svr_ip", svr_ip);
EXTRACT_INT_FIELD_MYSQL(result, "svr_port", svr_port, int64_t);
if (OB_FAIL(ret)) {
} else if (!ls_locality.ls_id_.is_valid()) {
ls_locality.ls_id_ = ObLSID(ls_id);
} else if (ls_locality.ls_id_.id() != ls_id) { // not same ls
if (OB_FAIL(tmp_ls_locality_array.push_back(ls_locality))) {
LOG_WARN("failed to push ls_locality into array", K(ret), K(ls_locality));
} else {
ls_locality.reset();
ls_locality.ls_id_ = ObLSID(ls_id);
}
}
if (OB_FAIL(ret)) {
} else if (false == svr_addr.set_ip_addr(svr_ip, svr_port)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to set svr addr", K(ret));
} else if (OB_FAIL(ls_locality.svr_addr_list_.push_back(&svr_addr))) {
LOG_WARN("failed to add svr_addr to list", K(ret), K(svr_addr));
}
}
} // end of while
if (OB_SUCC(ret)) {
if (ls_locality.is_valid() && OB_FAIL(tmp_ls_locality_array.push_back(ls_locality))) {
LOG_WARN("failed to push ls_locality into array", K(ret), K(ls_locality));
} else {
lib::ObMutexGuard guard(lock_);
if (OB_FAIL(ls_locality_array_.assign(tmp_ls_locality_array))) {
LOG_WARN("failed to assign ls_locality array", K(ret), K(tmp_ls_locality_array));
} else {
if (OB_NOT_NULL(alloc_buf_)) {
allocator_.free(alloc_buf_);
}
alloc_buf_ = buf;
LOG_DEBUG("success to refresh ls locality", K(ret), K(ls_locality_array_));
}
} // end of lock
}
if (OB_FAIL(ret) && nullptr != buf) {
ls_locality.reset();
tmp_ls_locality_array.reset();
allocator_.free(buf);
}
return ret;
}
int ObStorageLocalityCache::append_zone_info(
const ObIArray<ObZone> &zone_list,
ObSqlString &sql)
{
int ret = OB_SUCCESS;
for (int64_t idx = 0; OB_SUCC(ret) && (idx < zone_list.count()); ++idx) {
const ObZone &zone = zone_list.at(idx);
if (OB_UNLIKELY(zone.is_empty())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("zone info is unexpected empty", K(ret), K(zone));
} else if (OB_FAIL(sql.append_fmt(
"%s '%s'",
0 == idx ? "" : ",",
zone.str().ptr()))) {
LOG_WARN("fail to assign sql", KR(ret), K(idx), K(zone));
}
}
return ret;
}
// if locality not exist in cache, will return invalid ls_locality(not filter any addr)
int ObStorageLocalityCache::get_ls_locality(
const share::ObLSID &ls_id,
ObLSLocality &ls_locality)
{
int ret = OB_SUCCESS;
ls_locality.reset();
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObStorageLocalityCache is not inited", KR(ret));
} else if (OB_UNLIKELY(!ls_id.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret));
} else {
lib::ObMutexGuard guard(lock_);
for (int64_t i = 0; OB_SUCC(ret) && i < ls_locality_array_.count(); ++i) {
ObLSLocalityInCache &locality = ls_locality_array_[i];
if (locality.ls_id_ == ls_id) {
ls_locality.ls_id_ = ls_id;
for (int64_t j = 0; OB_SUCC(ret) && j < locality.svr_addr_list_.count(); ++j) {
if (OB_ISNULL(locality.svr_addr_list_[j])) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("svr addr in locality cache is unexpected null", K(ret), K(j), K(locality));
} else if (OB_FAIL(ls_locality.svr_addr_list_.push_back(*locality.svr_addr_list_[j]))) {
LOG_WARN("failed to push back addr", K(ret), K(locality));
}
}
break;
}
} // end of for
} // end of lock
return ret;
}
} // namespace compaction
} // namespace oceanbase

View File

@ -0,0 +1,111 @@
//Copyright (c) 2021 OceanBase
// OceanBase is licensed under Mulan PubL v2.
// You can use this software according to the terms and conditions of the Mulan PubL v2.
// You may obtain a copy of Mulan PubL v2 at:
// http://license.coscl.org.cn/MulanPubL-2.0
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
// See the Mulan PubL v2 for more details.
#ifndef OB_STORAGE_COMPACTION_LOCALITY_CACHE_H_
#define OB_STORAGE_COMPACTION_LOCALITY_CACHE_H_
#include "share/ob_ls_id.h"
#include "deps/oblib/src/lib/net/ob_addr.h"
#include "deps/oblib/src/common/ob_zone.h"
namespace oceanbase
{
namespace common
{
class ObMySQLProxy;
class ObMySQLTransaction;
namespace sqlclient
{
class ObMySQLResult;
}
}
namespace compaction
{
template<typename T>
struct ObLSLocalityStruct
{
public:
ObLSLocalityStruct()
: ls_id_(),
svr_addr_list_()
{}
~ObLSLocalityStruct()
{
}
void reset()
{
ls_id_.reset();
svr_addr_list_.reset();
}
bool is_valid() const
{
return ls_id_.is_valid() && svr_addr_list_.count() > 0;
}
TO_STRING_KV(K_(ls_id), K_(svr_addr_list));
share::ObLSID ls_id_;
common::ObArray<T> svr_addr_list_;
};
struct ObLSLocalityInCache : public ObLSLocalityStruct<ObAddr *>
{
public:
int assgin(ObLSLocalityInCache &locality);
};
struct ObLSLocality : public ObLSLocalityStruct<ObAddr>
{
public:
bool check_exist(const common::ObAddr &addr) const;
};
class ObStorageLocalityCache
{
public:
ObStorageLocalityCache();
~ObStorageLocalityCache();
int init(uint64_t tenant_id, ObMySQLProxy *sql_proxy);
void reset();
int refresh_ls_locality();
int get_ls_locality(const share::ObLSID &ls_id, ObLSLocality &ls_locality);
private:
int get_zone_list(ObIArray<common::ObZone> &zone_list);
static int str2zone_list(
const char *str,
ObIArray<common::ObZone> &zone_list);
int get_ls_locality_by_zone(const ObIArray<common::ObZone> &zone_list);
static int append_zone_info(
const ObIArray<common::ObZone> &zone_list,
ObSqlString &sql);
int get_ls_locality_cnt(
ObMySQLTransaction &trans,
const ObIArray<common::ObZone> &zone_list,
int64_t &ls_locality_cnt);
int generate_ls_locality(
const int64_t ls_locality_cnt,
sqlclient::ObMySQLResult &result);
TO_STRING_KV(K_(is_inited), K_(tenant_id), K_(ls_locality_array));
private:
bool is_inited_;
lib::ObMutex lock_;
uint64_t tenant_id_;
ObMySQLProxy *sql_proxy_;
void *alloc_buf_;
common::DefaultPageAllocator allocator_;
common::ObArray<ObLSLocalityInCache, DefaultPageAllocator> ls_locality_array_;
};
} // namespace compaction
} // namespace oceanbase
#endif // OB_STORAGE_COMPACTION_LOCALITY_CACHE_H_

View File

@ -168,7 +168,8 @@ ObTenantTabletScheduler::ObTenantTabletScheduler()
sstable_gc_task_(),
fast_freeze_checker_(),
enable_adaptive_compaction_(false),
error_tablet_cnt_(0)
error_tablet_cnt_(0),
ls_locality_cache_()
{
STATIC_ASSERT(static_cast<int64_t>(NO_MAJOR_MERGE_TYPE_CNT) == ARRAYSIZEOF(MERGE_TYPES), "merge type array len is mismatch");
}
@ -194,6 +195,7 @@ void ObTenantTabletScheduler::destroy()
medium_loop_tg_id_ = 0;
sstable_gc_tg_id_ = 0;
schedule_interval_ = 0;
ls_locality_cache_.reset();
is_inited_ = false;
LOG_INFO("The ObTenantTabletScheduler destroy");
}
@ -224,6 +226,8 @@ int ObTenantTabletScheduler::init()
MTL_ID(),
"bf_queue"))) {
LOG_WARN("Fail to init bloom filter queue", K(ret));
} else if (OB_FAIL(ls_locality_cache_.init(MTL_ID(), GCTX.sql_proxy_))) {
LOG_WARN("failed to init ls locality cache", K(ret), KP(GCTX.sql_proxy_));
} else {
schedule_interval_ = schedule_interval;
is_inited_ = true;
@ -936,7 +940,7 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge(
bool is_leader = false;
bool could_major_merge = false;
const int64_t major_frozen_scn = get_frozen_version();
ObLSLocality ls_locality;
if (MTL(ObTenantTabletScheduler *)->could_major_merge_start()) {
could_major_merge = true;
} else if (REACH_TENANT_TIME_INTERVAL(PRINT_LOG_INVERVAL)) {
@ -952,6 +956,9 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge(
}
} else if (is_leader_by_election(role)) {
is_leader = true;
if (OB_FAIL(ls_locality_cache_.get_ls_locality(ls_id, ls_locality))) {
LOG_WARN("failed to get ls locality", K(ret), K(ls_id));
}
}
} else {
all_ls_weak_read_ts_ready = false;
@ -994,7 +1001,7 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge(
if (!is_leader || OB_ISNULL(latest_major)) {
// follower or no major: do nothing
} else if (tablet->get_medium_compaction_info_list().need_check_finish()) { // need check finished
if (OB_TMP_FAIL(func.check_medium_finish())) {
if (OB_TMP_FAIL(func.check_medium_finish(ls_locality))) {
LOG_WARN("failed to check medium finish", K(tmp_ret), K(ls_id), K(tablet_id));
} else if (ObTimeUtility::fast_current_time() <
tablet->get_medium_compaction_info_list().get_wait_check_medium_scn() + WAIT_MEDIUM_CHECK_THRESHOLD) {
@ -1070,6 +1077,13 @@ int ObTenantTabletScheduler::schedule_all_tablets_medium()
if (REACH_TENANT_TIME_INTERVAL(CHECK_REPORT_SCN_INTERVAL)) {
check_report_scn_flag = true;
}
if (REACH_TENANT_TIME_INTERVAL(CHECK_LS_LOCALITY_INTERVAL)) {
if (OB_TMP_FAIL(ls_locality_cache_.refresh_ls_locality())) {
LOG_WARN("failed to refresh ls locality", K(tmp_ret));
ADD_SUSPECT_INFO(MEDIUM_MERGE, share::ObLSID(INT64_MAX), ObTabletID(INT64_MAX),
"refresh ls locality cache failed", "errno", tmp_ret);
}
}
#ifdef ERRSIM
check_report_scn_flag = true;
#endif

View File

@ -19,6 +19,7 @@
#include "storage/ob_i_store.h"
#include "storage/compaction/ob_tablet_merge_task.h"
#include "storage/compaction/ob_partition_merge_policy.h"
#include "storage/compaction/ob_storage_locality_cache.h"
namespace oceanbase
{
@ -225,10 +226,11 @@ private:
static const int64_t DEFAULT_HASH_MAP_BUCKET_CNT = 1009;
static const int64_t DEFAULT_COMPACTION_SCHEDULE_INTERVAL = 30 * 1000 * 1000L; // 30s
static const int64_t CHECK_WEAK_READ_TS_SCHEDULE_INTERVAL = 10 * 1000 * 1000L; // 10s
static const int64_t CHECK_REPORT_SCN_INTERVAL = 2 * 60 * 1000 * 1000L; // 2m, temp solution, change to 10m later
static const int64_t CHECK_REPORT_SCN_INTERVAL = 5 * 60 * 1000 * 1000L; // 5m
static const int64_t ADD_LOOP_EVENT_INTERVAL = 120 * 1000 * 1000L; // 120s
static const int64_t WAIT_MEDIUM_CHECK_THRESHOLD = 10 * 60 * 1000 * 1000L; // 10m
static const int64_t PRINT_LOG_INVERVAL = 2 * 60 * 1000 * 1000L; // 2m
static const int64_t CHECK_LS_LOCALITY_INTERVAL = 5 * 60 * 1000 * 1000L; // 5m
private:
bool is_inited_;
bool major_merge_status_;
@ -250,6 +252,7 @@ private:
ObFastFreezeChecker fast_freeze_checker_;
bool enable_adaptive_compaction_;
int64_t error_tablet_cnt_; // for diagnose
compaction::ObStorageLocalityCache ls_locality_cache_;
};
} // namespace storage