Implement of auto refresh location cache
This commit is contained in:
parent
12e315115d
commit
418f092462
@ -569,6 +569,7 @@ PCODE_DEF(OB_UPDATE_STANDBY_CLUSTER_INFO, 0x913)
|
||||
PCODE_DEF(OB_CHECK_NEED_OFFLINE_REPLICA, 0x914)
|
||||
PCODE_DEF(OB_GET_MEMBER_LIST_AND_LEADER_V2, 0x915)
|
||||
PCODE_DEF(OB_CHECK_FLASHBACK_INFO_DUMP, 0x916)
|
||||
PCODE_DEF(OB_BROADCAST_LOCATIONS, 0x917)
|
||||
|
||||
PCODE_DEF(OB_RPC_ASSEMBLE, 0x1000)
|
||||
|
||||
|
@ -37,6 +37,7 @@ ob_set_subtarget(ob_server common
|
||||
ob_lease_state_mgr.cpp
|
||||
ob_partition_table_checker.cpp
|
||||
ob_partition_table_updater.cpp
|
||||
ob_partition_location_updater.cpp
|
||||
ob_rebuild_flag_reporter.cpp
|
||||
ob_root_service_monitor.cpp
|
||||
ob_rpc_extra_payload.cpp
|
||||
|
@ -65,6 +65,16 @@ public:
|
||||
return false;
|
||||
}
|
||||
|
||||
inline bool need_assign_when_equal() const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
inline int assign_when_equal(const ObIndexStatusReporter& other)
|
||||
{
|
||||
UNUSED(other);
|
||||
return common::OB_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
TO_STRING_KV(K_(part_key), K_(self), K_(index_table_id), K_(index_status), K_(ret_code));
|
||||
|
||||
private:
|
||||
|
319
src/observer/ob_partition_location_updater.cpp
Normal file
319
src/observer/ob_partition_location_updater.cpp
Normal file
@ -0,0 +1,319 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#define USING_LOG_PREFIX SERVER
|
||||
|
||||
#include "common/ob_partition_key.h"
|
||||
#include "common/ob_role.h"
|
||||
#include "lib/thread_local/ob_tsi_factory.h"
|
||||
#include "rootserver/ob_rs_async_rpc_proxy.h"
|
||||
#include "share/config/ob_server_config.h"
|
||||
#include "share/partition_table/ob_partition_location_cache.h"
|
||||
#include "share/ob_alive_server_tracer.h"
|
||||
#include "storage/ob_partition_service.h"
|
||||
#include "observer/ob_service.h"
|
||||
#include "observer/ob_partition_location_updater.h"
|
||||
|
||||
namespace oceanbase {
|
||||
namespace observer {
|
||||
|
||||
using namespace common;
|
||||
using namespace share;
|
||||
|
||||
static const char* location_queue_type[] = {"LOCATION_SENDER", "LOCATION_RECEIVER", "LOCATION_INVALID"};
|
||||
|
||||
int ObPartitionLocationUpdater::init(observer::ObService& ob_service, storage::ObPartitionService*& partition_service,
|
||||
obrpc::ObSrvRpcProxy*& srv_rpc_proxy, share::ObPartitionLocationCache*& location_cache,
|
||||
share::ObIAliveServerTracer& server_tracer)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
static_assert(
|
||||
QueueType::MAX_TYPE == ARRAYSIZEOF(location_queue_type) - 1, "type str array size mismatch with type cnt");
|
||||
if (inited_) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("inited twice", KR(ret));
|
||||
} else if (OB_ISNULL(partition_service) || OB_ISNULL(srv_rpc_proxy) || OB_ISNULL(location_cache)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("ptr is null", KR(ret), KP(partition_service), KP(srv_rpc_proxy), KP(location_cache));
|
||||
} else {
|
||||
queue_size_ = !lib::is_mini_mode() ? MAX_PARTITION_CNT : MINI_MODE_MAX_PARTITION_CNT;
|
||||
thread_cnt_ = !lib::is_mini_mode() ? GCONF.location_refresh_thread_count : UPDATER_THREAD_CNT;
|
||||
if (OB_FAIL(sender_.init(this, thread_cnt_, queue_size_, "PTSender"))) {
|
||||
LOG_WARN("init sender updater queue failed", KR(ret), K_(queue_size), K_(thread_cnt));
|
||||
} else if (OB_FAIL(receiver_.init(this, thread_cnt_, queue_size_, "PTReceiver"))) {
|
||||
LOG_WARN("init receiver updater queue failed", KR(ret), K_(queue_size), K_(thread_cnt));
|
||||
} else {
|
||||
ob_service_ = &ob_service;
|
||||
partition_service_ = partition_service;
|
||||
srv_rpc_proxy_ = srv_rpc_proxy;
|
||||
location_cache_ = location_cache;
|
||||
server_tracer_ = &server_tracer;
|
||||
inited_ = true;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPartitionLocationUpdater::check_inner_stat() const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("updater is not inited yet", KR(ret));
|
||||
} else if (stopped_) {
|
||||
ret = OB_CANCELED;
|
||||
LOG_WARN("updater is stopped now", KR(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObPartitionLocationUpdater::stop()
|
||||
{
|
||||
stopped_ = true;
|
||||
sender_.stop();
|
||||
receiver_.stop();
|
||||
}
|
||||
|
||||
void ObPartitionLocationUpdater::wait()
|
||||
{
|
||||
if (stopped_) {
|
||||
sender_.wait();
|
||||
receiver_.wait();
|
||||
}
|
||||
}
|
||||
|
||||
void ObPartitionLocationUpdater::destroy()
|
||||
{
|
||||
stop();
|
||||
wait();
|
||||
}
|
||||
|
||||
int ObPartitionLocationUpdater::submit_broadcast_task(const ObPartitionBroadcastTask& task)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!GCONF.enable_auto_refresh_location_cache) {
|
||||
// skip
|
||||
} else if (OB_FAIL(check_inner_stat())) {
|
||||
LOG_WARN("fail to check inner stat", KR(ret));
|
||||
} else if (OB_FAIL(sender_.add(task))) {
|
||||
if (OB_EAGAIN == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("fail to add task", KR(ret), K(task));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPartitionLocationUpdater::submit_update_task(const ObPartitionUpdateTask& task)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!GCONF.enable_auto_refresh_location_cache) {
|
||||
// skip
|
||||
} else if (OB_FAIL(check_inner_stat())) {
|
||||
LOG_WARN("fail to check inner stat", KR(ret));
|
||||
} else if (OB_FAIL(receiver_.add(task))) {
|
||||
if (OB_EAGAIN == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("fail to add task", KR(ret), K(task));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPartitionLocationUpdater::process_barrier(const ObPartitionBroadcastTask& task, bool& stopped)
|
||||
{
|
||||
UNUSED(task);
|
||||
UNUSED(stopped);
|
||||
return OB_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
int ObPartitionLocationUpdater::process_barrier(const ObPartitionUpdateTask& task, bool& stopped)
|
||||
{
|
||||
UNUSED(task);
|
||||
UNUSED(stopped);
|
||||
return OB_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
int ObPartitionLocationUpdater::batch_process_tasks(
|
||||
const ObIArray<ObPartitionBroadcastTask>& batch_tasks, bool& stopped)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObCurTraceId::init(GCONF.self_addr_);
|
||||
obrpc::ObPartitionBroadcastArg arg;
|
||||
if (OB_FAIL(check_inner_stat())) {
|
||||
LOG_WARN("fail to check inner stat", KR(ret));
|
||||
} else if (stopped) {
|
||||
ret = OB_CANCELED;
|
||||
LOG_WARN("updater is is stopped", KR(ret), K(stopped));
|
||||
} else if (!GCONF.enable_auto_refresh_location_cache) {
|
||||
// skip
|
||||
} else if (OB_UNLIKELY(batch_tasks.count() <= 0)) {
|
||||
// skip
|
||||
} else {
|
||||
int64_t start_ts = ObTimeUtility::current_time();
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < batch_tasks.count(); i++) {
|
||||
const ObPartitionBroadcastTask& task = batch_tasks.at(i);
|
||||
ObPartitionKey pkey;
|
||||
ObRole role = FOLLOWER;
|
||||
if (OB_FAIL(pkey.init(task.get_table_id(), task.get_partition_id(), task.get_partition_cnt()))) {
|
||||
LOG_WARN("init pkey failed", KR(ret), K(task));
|
||||
} else if (OB_FAIL(partition_service_->get_role(pkey, role))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret || OB_PARTITION_NOT_EXIST == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("fail to get role", KR(ret), K(pkey));
|
||||
}
|
||||
} else if (is_strong_leader(role)) {
|
||||
if (OB_FAIL(arg.keys_.push_back(task))) {
|
||||
LOG_WARN("fail to push back task", KR(ret), K(task));
|
||||
}
|
||||
}
|
||||
LOG_DEBUG("broadcast task is", KR(ret), K(task), K(pkey), K(role));
|
||||
} // end for
|
||||
|
||||
if (OB_SUCC(ret) && arg.keys_.count() > 0) {
|
||||
rootserver::ObBroadcastLocationProxy proxy(*srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::broadcast_locations);
|
||||
ObArray<ObAddr> alive_servers;
|
||||
const int64_t timeout = GCONF.location_cache_refresh_rpc_timeout;
|
||||
if (OB_FAIL(server_tracer_->get_active_server_list(alive_servers))) {
|
||||
LOG_WARN("fail to get alive server list", KR(ret));
|
||||
}
|
||||
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < alive_servers.count(); i++) {
|
||||
ObAddr& addr = alive_servers.at(i);
|
||||
if (OB_FAIL(proxy.call(addr, timeout, arg))) {
|
||||
LOG_WARN("fail to call addr", KR(ret), K(addr), K(timeout), K(arg));
|
||||
}
|
||||
} // end for
|
||||
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_SUCCESS != (tmp_ret = proxy.wait())) {
|
||||
LOG_WARN("fail to wait rpc callback", KR(tmp_ret));
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
}
|
||||
|
||||
LOG_DEBUG("try broadcast location", K(arg));
|
||||
|
||||
int64_t exec_ts = ObTimeUtility::current_time() - start_ts;
|
||||
int64_t wait_ts = 0;
|
||||
(void)control_rate_limit(
|
||||
QueueType::SENDER, exec_ts, arg.keys_.count(), GCONF.auto_broadcast_location_cache_rate_limit, wait_ts);
|
||||
(void)dump_statistic(QueueType::SENDER, ret, exec_ts, wait_ts, arg.keys_.count());
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPartitionLocationUpdater::batch_process_tasks(const ObIArray<ObPartitionUpdateTask>& batch_tasks, bool& stopped)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObCurTraceId::init(GCONF.self_addr_);
|
||||
if (OB_FAIL(check_inner_stat())) {
|
||||
LOG_WARN("fail to check inner stat", KR(ret));
|
||||
} else if (stopped) {
|
||||
ret = OB_CANCELED;
|
||||
LOG_WARN("updater is is stopped", KR(ret), K(stopped));
|
||||
} else if (!GCONF.enable_auto_refresh_location_cache) {
|
||||
// skip
|
||||
} else if (OB_UNLIKELY(batch_tasks.count() <= 0)) {
|
||||
// skip
|
||||
} else {
|
||||
int64_t start_ts = ObTimeUtility::current_time();
|
||||
int64_t renew_cnt = 0;
|
||||
LOG_DEBUG("try renew location", K(batch_tasks));
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < batch_tasks.count(); i++) {
|
||||
const ObPartitionUpdateTask& task = batch_tasks.at(i);
|
||||
ObPartitionKey pkey;
|
||||
ObPartitionLocation location;
|
||||
if (OB_FAIL(pkey.init(task.get_table_id(), task.get_partition_id(), 0 /*partition_cnt, no used here*/))) {
|
||||
LOG_WARN("init pkey failed", KR(ret), K(task));
|
||||
} else if (OB_FAIL(location_cache_->nonblock_get(pkey, location))) {
|
||||
if (OB_LOCATION_NOT_EXIST == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("fail to get location", KR(ret), K(pkey));
|
||||
}
|
||||
} else if (task.get_timestamp() > location.get_renew_time()) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_SUCCESS != (tmp_ret = location_cache_->nonblock_renew(pkey, 0 /*expire_renew_time*/))) {
|
||||
LOG_WARN("nonblock renew failed", KR(tmp_ret), K(pkey));
|
||||
} else {
|
||||
renew_cnt++;
|
||||
LOG_DEBUG("try renew location", K(pkey), K(task), K(location));
|
||||
}
|
||||
}
|
||||
}
|
||||
int64_t exec_ts = ObTimeUtility::current_time() - start_ts;
|
||||
int64_t wait_ts = 0;
|
||||
(void)control_rate_limit(
|
||||
QueueType::RECEIVER, exec_ts, renew_cnt, GCONF.auto_refresh_location_cache_rate_limit, wait_ts);
|
||||
(void)dump_statistic(QueueType::RECEIVER, ret, exec_ts, wait_ts, renew_cnt);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObPartitionLocationUpdater::dump_statistic(
|
||||
const QueueType queue_type, const int exec_ret, const int64_t exec_ts, const int64_t wait_ts, const int64_t cnt)
|
||||
{
|
||||
TSILocationStatistics* statistics = GET_TSI(TSILocationStatistics);
|
||||
if (OB_ISNULL(statistics)) {
|
||||
LOG_WARN("fail to get statistic", "ret", OB_ERR_UNEXPECTED);
|
||||
} else {
|
||||
// calc statistic
|
||||
(void)statistics->calc(exec_ret, exec_ts, wait_ts, cnt);
|
||||
int64_t total_cnt = statistics->get_total_cnt();
|
||||
if (TC_REACH_TIME_INTERVAL(CHECK_INTERVAL_US) && total_cnt > 0) {
|
||||
QueueType type =
|
||||
(QueueType::SENDER <= queue_type && queue_type <= QueueType::RECEIVER) ? queue_type : QueueType::MAX_TYPE;
|
||||
ObTaskController::get().allow_next_syslog();
|
||||
LOG_INFO("[LOCATION_STATISTIC] auto refresh location statistics",
|
||||
"queue_type",
|
||||
location_queue_type[type],
|
||||
KPC(statistics),
|
||||
"avg_exec_us",
|
||||
statistics->total_exec_us_ / total_cnt,
|
||||
"avg_wait_us",
|
||||
statistics->total_wait_us_ / total_cnt);
|
||||
(void)statistics->reset();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ObPartitionLocationUpdater::control_rate_limit(
|
||||
const QueueType queue_type, const int64_t exec_ts, const int64_t cnt, int64_t rate_limit_conf, int64_t& wait_ts)
|
||||
{
|
||||
wait_ts = 0;
|
||||
TSILocationRateLimit* info = GET_TSI(TSILocationRateLimit);
|
||||
if (OB_ISNULL(info)) {
|
||||
LOG_WARN("fail to get info", "ret", OB_ERR_UNEXPECTED);
|
||||
} else {
|
||||
int64_t rate_limit = max(rate_limit_conf / thread_cnt_, 1);
|
||||
wait_ts = info->calc_wait_ts(cnt, exec_ts, rate_limit);
|
||||
if (wait_ts > 0) {
|
||||
QueueType type =
|
||||
(QueueType::SENDER <= queue_type && queue_type <= QueueType::RECEIVER) ? queue_type : QueueType::MAX_TYPE;
|
||||
ObTaskController::get().allow_next_syslog();
|
||||
LOG_INFO("[LOCATION_STATISTIC] rate limit",
|
||||
"queue_type",
|
||||
location_queue_type[type],
|
||||
KPC(info),
|
||||
K(rate_limit),
|
||||
K(wait_ts));
|
||||
usleep(wait_ts);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // end namespace observer
|
||||
} // end namespace oceanbase
|
104
src/observer/ob_partition_location_updater.h
Normal file
104
src/observer/ob_partition_location_updater.h
Normal file
@ -0,0 +1,104 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#ifndef OCEANBASE_OBSERVER_OB_PARTITION_LOCATION_UPDATER_H_
|
||||
#define OCEANBASE_OBSERVER_OB_PARTITION_LOCATION_UPDATER_H_
|
||||
|
||||
#include "share/ob_srv_rpc_proxy.h"
|
||||
#include "observer/ob_uniq_task_queue.h"
|
||||
#include "share/partition_table/ob_partition_location_task.h"
|
||||
namespace oceanbase {
|
||||
namespace storage {
|
||||
class ObIAliveServerTracer;
|
||||
class ObPartitionService;
|
||||
} // namespace storage
|
||||
namespace share {
|
||||
class ObPartitionLocationCache;
|
||||
}
|
||||
namespace obrpc {
|
||||
class ObSrvRpcProxy;
|
||||
}
|
||||
namespace observer {
|
||||
class ObPartitionLocationUpdater;
|
||||
class ObService;
|
||||
|
||||
typedef ObUniqTaskQueue<share::ObPartitionBroadcastTask, ObPartitionLocationUpdater> ObPTSenderQueue;
|
||||
typedef ObUniqTaskQueue<share::ObPartitionUpdateTask, ObPartitionLocationUpdater> ObPTReceiverQueue;
|
||||
|
||||
class ObPartitionLocationUpdater {
|
||||
public:
|
||||
const static int64_t UPDATER_THREAD_CNT = 1;
|
||||
const static int64_t MINI_MODE_MAX_PARTITION_CNT = 10000;
|
||||
const static int64_t MAX_PARTITION_CNT = 1000000;
|
||||
const static int64_t CHECK_INTERVAL_US = 1 * 1000 * 1000L; // 1s
|
||||
|
||||
enum QueueType { SENDER = 0, RECEIVER = 1, MAX_TYPE = 2 };
|
||||
|
||||
ObPartitionLocationUpdater()
|
||||
: inited_(false),
|
||||
stopped_(false),
|
||||
thread_cnt_(UPDATER_THREAD_CNT),
|
||||
queue_size_(MINI_MODE_MAX_PARTITION_CNT),
|
||||
ob_service_(NULL),
|
||||
partition_service_(NULL),
|
||||
srv_rpc_proxy_(NULL),
|
||||
location_cache_(NULL),
|
||||
server_tracer_(NULL),
|
||||
sender_(),
|
||||
receiver_()
|
||||
{}
|
||||
virtual ~ObPartitionLocationUpdater()
|
||||
{
|
||||
destroy();
|
||||
}
|
||||
|
||||
int init(observer::ObService& ob_service, storage::ObPartitionService*& partition_service,
|
||||
obrpc::ObSrvRpcProxy*& srv_rpc_proxy, share::ObPartitionLocationCache*& location_cache,
|
||||
share::ObIAliveServerTracer& server_tracer);
|
||||
|
||||
void stop();
|
||||
void wait();
|
||||
void destroy();
|
||||
|
||||
virtual int submit_broadcast_task(const share::ObPartitionBroadcastTask& task);
|
||||
virtual int submit_update_task(const share::ObPartitionUpdateTask& task);
|
||||
|
||||
virtual int process_barrier(const share::ObPartitionBroadcastTask& task, bool& stopped);
|
||||
virtual int process_barrier(const share::ObPartitionUpdateTask& task, bool& stopped);
|
||||
|
||||
virtual int batch_process_tasks(const common::ObIArray<share::ObPartitionBroadcastTask>& tasks, bool& stopped);
|
||||
virtual int batch_process_tasks(const common::ObIArray<share::ObPartitionUpdateTask>& tasks, bool& stopped);
|
||||
|
||||
private:
|
||||
int check_inner_stat() const;
|
||||
void dump_statistic(
|
||||
const QueueType queue_type, const int exec_ret, const int64_t exec_ts, const int64_t wait_ts, const int64_t cnt);
|
||||
void control_rate_limit(const QueueType queue_type, const int64_t exec_ts, const int64_t cnt,
|
||||
const int64_t rate_limit_conf, int64_t& wait_ts);
|
||||
|
||||
private:
|
||||
bool inited_;
|
||||
bool stopped_;
|
||||
int64_t thread_cnt_;
|
||||
int64_t queue_size_;
|
||||
observer::ObService* ob_service_;
|
||||
storage::ObPartitionService* partition_service_;
|
||||
obrpc::ObSrvRpcProxy* srv_rpc_proxy_;
|
||||
share::ObPartitionLocationCache* location_cache_;
|
||||
share::ObIAliveServerTracer* server_tracer_;
|
||||
ObPTSenderQueue sender_;
|
||||
ObPTReceiverQueue receiver_;
|
||||
};
|
||||
} // end namespace observer
|
||||
} // end namespace oceanbase
|
||||
|
||||
#endif // OCEANBASE_OBSERVER_OB_PARTITION_LOCATION_UPDATER_H_
|
@ -475,6 +475,11 @@ int ObPartitionTableUpdater::do_batch_execute(const int64_t start_time,
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(GCTX.pt_operator_->batch_report_partition_role(replicas, new_role))) {
|
||||
LOG_WARN("fail to batch report partition role", KR(ret), K(replicas));
|
||||
} else if (is_strong_leader(new_role)) {
|
||||
int tmp_ret = submit_broadcast_tasks(replicas);
|
||||
if (OB_SUCCESS != tmp_ret) {
|
||||
LOG_WARN("submit broadcast_tasks failed", KR(ret), K(replicas));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
@ -829,6 +834,12 @@ int ObPartitionTableUpdater::do_batch_execute(const int64_t start_time, const Ob
|
||||
"do partition table update failed", K(ret), "escape time", ObTimeUtility::current_time() - start_time);
|
||||
} else {
|
||||
success_idx++;
|
||||
if (is_strong_leader(replicas.at(i).role_)) {
|
||||
int tmp_ret = submit_broadcast_tasks(tmp_replicas);
|
||||
if (OB_SUCCESS != tmp_ret) {
|
||||
LOG_WARN("submit broadcast_tasks failed", KR(ret), K(tmp_replicas));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (replicas.at(0).need_force_full_report() || with_role) {
|
||||
@ -842,6 +853,12 @@ int ObPartitionTableUpdater::do_batch_execute(const int64_t start_time, const Ob
|
||||
ObTimeUtility::current_time() - start_time);
|
||||
} else {
|
||||
success_idx = replicas.count() - 1;
|
||||
if (is_strong_leader(replicas.at(0).role_)) {
|
||||
int tmp_ret = submit_broadcast_tasks(replicas);
|
||||
if (OB_SUCCESS != tmp_ret) {
|
||||
LOG_WARN("submit broadcast_tasks failed", KR(ret), K(replicas));
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(GCTX.pt_operator_->batch_report_with_optimization(replicas, false /*without role*/))) {
|
||||
@ -1133,6 +1150,34 @@ int ObPartitionTableUpdater::do_sync_pt_finish(const int64_t version)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPartitionTableUpdater::submit_broadcast_tasks(const common::ObIArray<ObPartitionReplica>& replicas)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (replicas.count() <= 0 || !GCONF.enable_auto_refresh_location_cache) {
|
||||
// skip
|
||||
} else if (!inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", KR(ret));
|
||||
} else if (OB_ISNULL(GCTX.ob_service_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("observer is null", KR(ret));
|
||||
} else {
|
||||
const int64_t timestamp = ObTimeUtility::current_time();
|
||||
ObPartitionBroadcastTask task;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < replicas.count(); i++) {
|
||||
task.reset();
|
||||
const ObPartitionReplica& replica = replicas.at(i);
|
||||
if (OB_FAIL(task.init(replica.table_id_, replica.partition_id_, replica.partition_cnt_, timestamp))) {
|
||||
LOG_WARN("fail to init task", KR(ret), K(replica));
|
||||
} else if (OB_FAIL(GCTX.ob_service_->submit_broadcast_task(task))) {
|
||||
LOG_WARN("fail to submit broadcast task", KR(ret), K(task));
|
||||
}
|
||||
}
|
||||
LOG_DEBUG("submit broadcast task", KR(ret), K(replicas));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObPartitionTableUpdater::stop()
|
||||
{
|
||||
if (!inited_) {
|
||||
|
@ -83,6 +83,15 @@ public:
|
||||
{
|
||||
return false;
|
||||
}
|
||||
inline bool need_assign_when_equal() const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
inline int assign_when_equal(const ObPTUpdateRoleTask& other)
|
||||
{
|
||||
UNUSED(other);
|
||||
return common::OB_NOT_SUPPORTED;
|
||||
}
|
||||
TO_STRING_KV(K_(pkey), K_(data_version), K_(first_submit_time));
|
||||
|
||||
private:
|
||||
@ -127,6 +136,15 @@ public:
|
||||
}
|
||||
bool is_barrier() const;
|
||||
static bool is_barrier(const common::ObPartitionKey& pkey);
|
||||
inline bool need_assign_when_equal() const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
inline int assign_when_equal(const ObPTUpdateTask& other)
|
||||
{
|
||||
UNUSED(other);
|
||||
return common::OB_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
TO_STRING_KV(K_(part_key), K_(data_version), K_(first_submit_time), K_(is_remove), K_(with_role));
|
||||
|
||||
@ -232,6 +250,7 @@ private:
|
||||
const common::ObIArray<share::ObPartitionReplica>& replicas, const bool with_role);
|
||||
int do_batch_execute(const int64_t start_time, const common::ObIArray<ObPTUpdateRoleTask>& tasks,
|
||||
const common::ObIArray<share::ObPartitionReplica>& replicas, const common::ObRole new_role);
|
||||
int submit_broadcast_tasks(const common::ObIArray<share::ObPartitionReplica>& replicas);
|
||||
|
||||
private:
|
||||
bool inited_;
|
||||
|
@ -57,6 +57,15 @@ public:
|
||||
{
|
||||
return add_timestamp_;
|
||||
}
|
||||
inline bool need_assign_when_equal() const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
inline int assign_when_equal(const ObPGPartitionMTUpdateTask& other)
|
||||
{
|
||||
UNUSED(other);
|
||||
return common::OB_NOT_SUPPORTED;
|
||||
}
|
||||
TO_STRING_KV(K_(pkey), K_(add_timestamp), K_(update_type), K_(version));
|
||||
|
||||
private:
|
||||
|
@ -59,6 +59,15 @@ public:
|
||||
{
|
||||
return false;
|
||||
}
|
||||
inline bool need_assign_when_equal() const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
inline int assign_when_equal(const ObRebuildFlagReporter& other)
|
||||
{
|
||||
UNUSED(other);
|
||||
return common::OB_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
TO_STRING_KV(K_(part_key), K_(server), K_(rebuild_flag));
|
||||
|
||||
|
@ -1235,6 +1235,18 @@ int ObRpcBatchGetRoleP::process()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRpcBroadcastLocationsP::process()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(gctx_.ob_service_)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_ERROR("invalid argument", K(gctx_.ob_service_), K(ret));
|
||||
} else {
|
||||
ret = gctx_.ob_service_->broadcast_locations(arg_, result_);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSyncPGPartitionMTP::process()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -174,6 +174,7 @@ OB_DEFINE_PROCESSOR_S(Srv, OB_GET_MEMBER_LIST_AND_LEADER, ObRpcGetMemberListAndL
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_GET_MEMBER_LIST_AND_LEADER_V2, ObRpcGetMemberListAndLeaderV2P);
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_BATCH_GET_MEMBER_LIST_AND_LEADER, ObRpcBatchGetMemberListAndLeaderP);
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_BATCH_GET_ROLE, ObRpcBatchGetRoleP);
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_BROADCAST_LOCATIONS, ObRpcBroadcastLocationsP);
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_BATCH_GET_PROTECTION_LEVEL, ObRpcBatchGetProtectionLevelP);
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_CHECK_NEED_OFFLINE_REPLICA, ObRpcCheckNeedOffineReplicaP);
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_CHECK_FLASHBACK_INFO_DUMP, ObRpcCheckFlashbackInfoDumpP);
|
||||
|
@ -235,6 +235,8 @@ int ObServer::init(const ObServerOptions& opts, const ObPLogWriterCfg& log_cfg)
|
||||
LOG_ERROR("init interrupt fail", K(ret));
|
||||
} else if (OB_FAIL(rs_mgr_.init(&rs_rpc_proxy_, &config_, &sql_proxy_))) {
|
||||
LOG_ERROR("init rs_mgr_ failed", K(ret));
|
||||
} else if (OB_FAIL(server_tracer_.init(rs_rpc_proxy_, sql_proxy_))) {
|
||||
LOG_WARN("init server tracer failed", K(ret));
|
||||
} else if (OB_FAIL(init_ob_service())) {
|
||||
LOG_ERROR("init ob service fail", K(ret));
|
||||
} else if (OB_FAIL(init_root_service())) {
|
||||
@ -262,8 +264,6 @@ int ObServer::init(const ObServerOptions& opts, const ObPLogWriterCfg& log_cfg)
|
||||
ObPartitionService::get_instance().get_locality_manager(),
|
||||
config_.cluster_id))) {
|
||||
LOG_WARN("location fetcher init failed", K(ret));
|
||||
} else if (OB_FAIL(server_tracer_.init(rs_rpc_proxy_, sql_proxy_))) {
|
||||
LOG_WARN("init server tracer failed", K(ret));
|
||||
} else if (OB_FAIL(location_cache_.init(schema_service_,
|
||||
config_,
|
||||
server_tracer_,
|
||||
@ -1310,7 +1310,7 @@ int ObServer::init_global_kvcache()
|
||||
int ObServer::init_ob_service()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(ob_service_.init(sql_proxy_))) {
|
||||
if (OB_FAIL(ob_service_.init(sql_proxy_, server_tracer_))) {
|
||||
LOG_ERROR("oceanbase service init failed", K(ret));
|
||||
}
|
||||
return ret;
|
||||
@ -1679,7 +1679,7 @@ int ObServer::init_gc_partition_adapter()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObServer::get_network_speed_from_sysfs(int64_t &network_speed)
|
||||
int ObServer::get_network_speed_from_sysfs(int64_t& network_speed)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
// sys_bkgd_net_percentage_ = config_.sys_bkgd_net_percentage;
|
||||
@ -1715,9 +1715,9 @@ char* strtrim(char* str)
|
||||
return str;
|
||||
}
|
||||
|
||||
static int64_t nic_rate_parse(const char *str, bool &valid)
|
||||
static int64_t nic_rate_parse(const char* str, bool& valid)
|
||||
{
|
||||
char *p_unit = nullptr;
|
||||
char* p_unit = nullptr;
|
||||
int64_t value = 0;
|
||||
|
||||
if (OB_ISNULL(str) || '\0' == str[0]) {
|
||||
@ -1731,22 +1731,15 @@ static int64_t nic_rate_parse(const char *str, bool &valid)
|
||||
valid = false;
|
||||
} else if (value <= 0) {
|
||||
valid = false;
|
||||
} else if (0 == STRCASECMP("bit", p_unit)
|
||||
|| 0 == STRCASECMP("b", p_unit)) {
|
||||
} else if (0 == STRCASECMP("bit", p_unit) || 0 == STRCASECMP("b", p_unit)) {
|
||||
// do nothing
|
||||
} else if (0 == STRCASECMP("kbit", p_unit)
|
||||
|| 0 == STRCASECMP("kb", p_unit)
|
||||
|| 0 == STRCASECMP("k", p_unit)) {
|
||||
} else if (0 == STRCASECMP("kbit", p_unit) || 0 == STRCASECMP("kb", p_unit) || 0 == STRCASECMP("k", p_unit)) {
|
||||
value <<= 10;
|
||||
} else if ('\0' == *p_unit
|
||||
|| 0 == STRCASECMP("mbit", p_unit)
|
||||
|| 0 == STRCASECMP("mb", p_unit)
|
||||
|| 0 == STRCASECMP("m", p_unit)) {
|
||||
} else if ('\0' == *p_unit || 0 == STRCASECMP("mbit", p_unit) || 0 == STRCASECMP("mb", p_unit) ||
|
||||
0 == STRCASECMP("m", p_unit)) {
|
||||
// default is meta bit
|
||||
value <<= 20;
|
||||
} else if (0 == STRCASECMP("gbit", p_unit)
|
||||
|| 0 == STRCASECMP("gb", p_unit)
|
||||
|| 0 == STRCASECMP("g", p_unit)) {
|
||||
} else if (0 == STRCASECMP("gbit", p_unit) || 0 == STRCASECMP("gb", p_unit) || 0 == STRCASECMP("g", p_unit)) {
|
||||
value <<= 30;
|
||||
} else {
|
||||
valid = false;
|
||||
@ -1756,17 +1749,16 @@ static int64_t nic_rate_parse(const char *str, bool &valid)
|
||||
return value;
|
||||
}
|
||||
|
||||
int ObServer::get_network_speed_from_config_file(int64_t &network_speed)
|
||||
int ObServer::get_network_speed_from_config_file(int64_t& network_speed)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const char *nic_rate_path = "etc/nic.rate.config";
|
||||
const int64_t MAX_NIC_CONFIG_FILE_SIZE = 1 << 10; // 1KB
|
||||
FILE *fp = nullptr;
|
||||
char *buf = nullptr;
|
||||
const char* nic_rate_path = "etc/nic.rate.config";
|
||||
const int64_t MAX_NIC_CONFIG_FILE_SIZE = 1 << 10; // 1KB
|
||||
FILE* fp = nullptr;
|
||||
char* buf = nullptr;
|
||||
static int nic_rate_file_exist = 1;
|
||||
|
||||
if (OB_ISNULL(buf = static_cast<char *>(ob_malloc(MAX_NIC_CONFIG_FILE_SIZE + 1,
|
||||
ObModIds::OB_BUFFER)))) {
|
||||
if (OB_ISNULL(buf = static_cast<char*>(ob_malloc(MAX_NIC_CONFIG_FILE_SIZE + 1, ObModIds::OB_BUFFER)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_ERROR("alloc buffer failed", LITERAL_K(MAX_NIC_CONFIG_FILE_SIZE), K(ret));
|
||||
} else if (OB_ISNULL(fp = fopen(nic_rate_path, "r"))) {
|
||||
@ -1791,7 +1783,7 @@ int ObServer::get_network_speed_from_config_file(int64_t &network_speed)
|
||||
}
|
||||
memset(buf, 0, MAX_NIC_CONFIG_FILE_SIZE + 1);
|
||||
fread(buf, 1, MAX_NIC_CONFIG_FILE_SIZE, fp);
|
||||
char *prate = nullptr;
|
||||
char* prate = nullptr;
|
||||
|
||||
if (OB_UNLIKELY(0 != ferror(fp))) {
|
||||
ret = OB_IO_ERROR;
|
||||
@ -1815,13 +1807,13 @@ int ObServer::get_network_speed_from_config_file(int64_t &network_speed)
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_ERROR("invalid NIC Config file", K(ret));
|
||||
}
|
||||
} // else
|
||||
} // else
|
||||
|
||||
if (OB_UNLIKELY(0 != fclose(fp))) {
|
||||
ret = OB_IO_ERROR;
|
||||
LOG_ERROR("Close NIC Config file failed", K(ret));
|
||||
}
|
||||
} // else
|
||||
} // else
|
||||
if (OB_LIKELY(nullptr != buf)) {
|
||||
ob_free(buf);
|
||||
buf = nullptr;
|
||||
@ -1849,10 +1841,7 @@ int ObServer::init_bandwidth_throttle()
|
||||
if (OB_FAIL(bandwidth_throttle_.init(rate))) {
|
||||
LOG_WARN("failed to init bandwidth throttle", K(ret), K(rate), K(network_speed));
|
||||
} else {
|
||||
LOG_INFO("succeed to init_bandwidth_throttle",
|
||||
K(sys_bkgd_net_percentage_),
|
||||
K(network_speed),
|
||||
K(rate));
|
||||
LOG_INFO("succeed to init_bandwidth_throttle", K(sys_bkgd_net_percentage_), K(network_speed), K(rate));
|
||||
ethernet_speed_ = network_speed;
|
||||
}
|
||||
}
|
||||
@ -1886,8 +1875,10 @@ int ObServer::reload_bandwidth_throttle_limit(int64_t network_speed)
|
||||
LOG_WARN("failed to reset bandwidth throttle", K(ret), K(rate), K(ethernet_speed_));
|
||||
} else {
|
||||
LOG_INFO("succeed to reload_bandwidth_throttle_limit",
|
||||
"old_percentage", sys_bkgd_net_percentage_,
|
||||
"new_percentage", sys_bkgd_net_percentage,
|
||||
"old_percentage",
|
||||
sys_bkgd_net_percentage_,
|
||||
"new_percentage",
|
||||
sys_bkgd_net_percentage,
|
||||
K(network_speed),
|
||||
K(rate));
|
||||
sys_bkgd_net_percentage_ = sys_bkgd_net_percentage;
|
||||
@ -2134,11 +2125,10 @@ int ObServer::refresh_temp_table_sess_active_time()
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObServer::ObRefreshNetworkSpeedTask::ObRefreshNetworkSpeedTask()
|
||||
: obs_(nullptr), is_inited_(false)
|
||||
ObServer::ObRefreshNetworkSpeedTask::ObRefreshNetworkSpeedTask() : obs_(nullptr), is_inited_(false)
|
||||
{}
|
||||
|
||||
int ObServer::ObRefreshNetworkSpeedTask::init(ObServer *obs, int tg_id)
|
||||
int ObServer::ObRefreshNetworkSpeedTask::init(ObServer* obs, int tg_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(is_inited_)) {
|
||||
@ -2221,11 +2211,11 @@ int ObServer::init_ctas_clean_up_task()
|
||||
|
||||
int ObServer::init_refresh_network_speed_task()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(refresh_network_speed_task_.init(this, lib::TGDefIDs::ServerGTimer))) {
|
||||
LOG_WARN("fail to init refresh network speed task", K(ret));
|
||||
}
|
||||
return ret;
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(refresh_network_speed_task_.init(this, lib::TGDefIDs::ServerGTimer))) {
|
||||
LOG_WARN("fail to init refresh network speed task", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// @@Query cleanup rules for built tables and temporary tables:
|
||||
|
@ -71,6 +71,15 @@ public:
|
||||
{
|
||||
return schema_info_.get_schema_version();
|
||||
}
|
||||
inline bool need_assign_when_equal() const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
inline int assign_when_equal(const ObServerSchemaTask& other)
|
||||
{
|
||||
UNUSED(other);
|
||||
return common::OB_NOT_SUPPORTED;
|
||||
}
|
||||
TO_STRING_KV(K_(type), K_(did_retry), K_(schema_info));
|
||||
|
||||
private:
|
||||
|
@ -55,6 +55,7 @@
|
||||
#include "observer/ob_dump_task_generator.h"
|
||||
#include "observer/ob_server_schema_updater.h"
|
||||
#include "ob_server_event_history_table_operator.h"
|
||||
#include "share/ob_alive_server_tracer.h"
|
||||
|
||||
namespace oceanbase {
|
||||
|
||||
@ -118,6 +119,7 @@ ObService::ObService(const ObGlobalContext& gctx)
|
||||
stopped_(false),
|
||||
schema_updater_(),
|
||||
partition_table_updater_(),
|
||||
partition_location_updater_(),
|
||||
index_status_report_queue_(),
|
||||
rebuild_flag_report_queue_(),
|
||||
pt_checker_(),
|
||||
@ -129,7 +131,7 @@ ObService::ObService(const ObGlobalContext& gctx)
|
||||
ObService::~ObService()
|
||||
{}
|
||||
|
||||
int ObService::init(common::ObMySQLProxy& sql_proxy)
|
||||
int ObService::init(common::ObMySQLProxy& sql_proxy, share::ObIAliveServerTracer& server_tracer)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -148,6 +150,9 @@ int ObService::init(common::ObMySQLProxy& sql_proxy)
|
||||
LOG_ERROR("client_manager_.initialize failed", "self_addr", gctx_.self_addr_, K(ret));
|
||||
} else if (OB_FAIL(partition_table_updater_.init())) {
|
||||
LOG_WARN("init partition table updater failed", K(ret));
|
||||
} else if (OB_FAIL(partition_location_updater_.init(
|
||||
*this, GCTX.par_ser_, GCTX.srv_rpc_proxy_, GCTX.location_cache_, server_tracer))) {
|
||||
LOG_WARN("init partition location updater failed", KR(ret));
|
||||
} else if (OB_FAIL(checksum_updater_.init())) {
|
||||
LOG_WARN("fail to init checksum updater", K(ret));
|
||||
} else if (OB_FAIL(ObPGPartitionMTUpdater::get_instance().init())) {
|
||||
@ -269,6 +274,7 @@ void ObService::stop()
|
||||
stopped_ = true;
|
||||
schema_updater_.stop();
|
||||
partition_table_updater_.stop();
|
||||
partition_location_updater_.stop();
|
||||
checksum_updater_.stop();
|
||||
ObPGPartitionMTUpdater::get_instance().stop();
|
||||
index_status_report_queue_.stop();
|
||||
@ -285,6 +291,7 @@ void ObService::wait()
|
||||
} else {
|
||||
schema_updater_.wait();
|
||||
partition_table_updater_.wait();
|
||||
partition_location_updater_.wait();
|
||||
checksum_updater_.wait();
|
||||
ObPGPartitionMTUpdater::get_instance().wait();
|
||||
index_status_report_queue_.wait();
|
||||
@ -915,8 +922,7 @@ int ObService::submit_pt_update_role_task(const ObPartitionKey& pkey)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObService::submit_pt_update_task(
|
||||
const ObPartitionKey& part_key, const bool need_report_checksum)
|
||||
int ObService::submit_pt_update_task(const ObPartitionKey& part_key, const bool need_report_checksum)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const bool is_remove = false;
|
||||
@ -927,7 +933,7 @@ int ObService::submit_pt_update_task(
|
||||
} else if (!part_key.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(part_key), K(ret));
|
||||
} else if (OB_FAIL(partition_table_updater_.async_update(part_key, false/*with_role*/))) {
|
||||
} else if (OB_FAIL(partition_table_updater_.async_update(part_key, false /*with_role*/))) {
|
||||
LOG_WARN("async_update failed", K(part_key), K(ret));
|
||||
} else if (need_report_checksum) {
|
||||
if (part_key.is_pg()) {
|
||||
@ -2904,8 +2910,7 @@ int ObService::report_replica()
|
||||
// The partition has been deleted. There is no need to trigger the report
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
} else if (OB_FAIL(submit_pt_update_task(
|
||||
partition->get_partition_key(), true /*need report checksum*/))) {
|
||||
} else if (OB_FAIL(submit_pt_update_task(partition->get_partition_key(), true /*need report checksum*/))) {
|
||||
if (OB_PARTITION_NOT_EXIST == ret) {
|
||||
// The GC thread is already working,
|
||||
// and deleted during traversal, the replica has been deleted needs to be avoided blocking the start process
|
||||
@ -2915,10 +2920,8 @@ int ObService::report_replica()
|
||||
LOG_WARN(
|
||||
"submit partition table update task failed", K(ret), "partition_key", partition->get_partition_key());
|
||||
}
|
||||
} else if (OB_FAIL(submit_pt_update_role_task(
|
||||
partition->get_partition_key()))) {
|
||||
LOG_WARN("fail to submit pt update role task", K(ret),
|
||||
"pkey", partition->get_partition_key());
|
||||
} else if (OB_FAIL(submit_pt_update_role_task(partition->get_partition_key()))) {
|
||||
LOG_WARN("fail to submit pt update role task", K(ret), "pkey", partition->get_partition_key());
|
||||
} else {
|
||||
// Update partition meta table without concern for error codes
|
||||
submit_pg_pt_update_task(pkeys);
|
||||
@ -3584,5 +3587,45 @@ int ObService::broadcast_rs_list(const ObRsListArg& arg)
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
int ObService::submit_broadcast_task(const ObPartitionBroadcastTask& task)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("service do not init", KR(ret), K(task));
|
||||
} else if (!task.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(task));
|
||||
} else if (OB_FAIL(partition_location_updater_.submit_broadcast_task(task))) {
|
||||
LOG_WARN("submit broadcast task failed", KR(ret), K(task));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObService::broadcast_locations(const obrpc::ObPartitionBroadcastArg& arg, obrpc::ObPartitionBroadcastResult& result)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("service do not init", KR(ret), K(arg));
|
||||
} else if (!arg.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(arg));
|
||||
} else {
|
||||
ObPartitionUpdateTask task;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < arg.keys_.count(); i++) {
|
||||
const ObPartitionBroadcastTask& key = arg.keys_.at(i);
|
||||
task.reset();
|
||||
if (OB_FAIL(task.init(key.get_table_id(), key.get_partition_id(), key.get_timestamp()))) {
|
||||
LOG_WARN("fail to init task", KR(ret), K(key));
|
||||
} else if (OB_FAIL(partition_location_updater_.submit_update_task(task))) {
|
||||
LOG_WARN("fail to submit update task", KR(ret), K(task));
|
||||
}
|
||||
}
|
||||
}
|
||||
result.ret_ = ret;
|
||||
LOG_DEBUG("receive broadcast locations", KR(ret), K(arg));
|
||||
return ret;
|
||||
}
|
||||
} // end namespace observer
|
||||
} // end namespace oceanbase
|
||||
|
@ -20,6 +20,7 @@
|
||||
#include "observer/ob_lease_state_mgr.h"
|
||||
#include "observer/ob_heartbeat.h"
|
||||
#include "observer/ob_partition_table_updater.h"
|
||||
#include "observer/ob_partition_location_updater.h"
|
||||
#include "observer/ob_sstable_checksum_updater.h"
|
||||
#include "observer/ob_server_schema_updater.h"
|
||||
#include "observer/ob_pg_partition_meta_table_updater.h"
|
||||
@ -38,6 +39,7 @@ namespace share {
|
||||
class ObSSTableDataChecksumItem;
|
||||
class ObSSTableColumnChecksumItem;
|
||||
class ObPGPartitionMTUpdateItem;
|
||||
class ObIAliveServerTracer;
|
||||
} // namespace share
|
||||
namespace storage {
|
||||
class ObFrozenStatus;
|
||||
@ -70,7 +72,7 @@ public:
|
||||
explicit ObService(const ObGlobalContext& gctx);
|
||||
virtual ~ObService();
|
||||
|
||||
int init(common::ObMySQLProxy& sql_proxy);
|
||||
int init(common::ObMySQLProxy& sql_proxy, share::ObIAliveServerTracer& server_tracer);
|
||||
int start();
|
||||
void set_stop();
|
||||
void stop();
|
||||
@ -101,8 +103,8 @@ public:
|
||||
|
||||
////////////////////////////////////////////////////////////////
|
||||
// ObIPartitionReport interface
|
||||
virtual int submit_pt_update_task(const common::ObPartitionKey& part_key,
|
||||
const bool need_report_checksum = true) override;
|
||||
virtual int submit_pt_update_task(
|
||||
const common::ObPartitionKey& part_key, const bool need_report_checksum = true) override;
|
||||
virtual int submit_pt_update_role_task(const common::ObPartitionKey& part_key) override;
|
||||
virtual void submit_pg_pt_update_task(const common::ObPartitionArray& pg_partitions) override;
|
||||
virtual int submit_checksum_update_task(const common::ObPartitionKey& part_key, const uint64_t sstable_id,
|
||||
@ -271,6 +273,8 @@ public:
|
||||
int cancel_sys_task(const share::ObTaskId& task_id);
|
||||
int refresh_memory_stat();
|
||||
int broadcast_rs_list(const obrpc::ObRsListArg& arg);
|
||||
int submit_broadcast_task(const share::ObPartitionBroadcastTask& task);
|
||||
int broadcast_locations(const obrpc::ObPartitionBroadcastArg& arg, obrpc::ObPartitionBroadcastResult& result);
|
||||
////////////////////////////////////////////////////////////////
|
||||
// misc functions
|
||||
int64_t get_partition_table_updater_user_queue_size() const;
|
||||
@ -307,6 +311,7 @@ private:
|
||||
ObServerSchemaUpdater schema_updater_;
|
||||
|
||||
ObPartitionTableUpdater partition_table_updater_;
|
||||
ObPartitionLocationUpdater partition_location_updater_;
|
||||
ObIndexStatusUpdater index_updater_;
|
||||
ObSSTableChecksumUpdater checksum_updater_;
|
||||
ObUniqTaskQueue<ObIndexStatusReporter, ObIndexStatusUpdater> index_status_report_queue_;
|
||||
|
@ -53,6 +53,7 @@ void oceanbase::observer::init_srv_xlator_for_sys(ObSrvRpcXlator* xlator)
|
||||
{
|
||||
RPC_PROCESSOR(ObRpcGetRoleP, gctx_);
|
||||
RPC_PROCESSOR(ObRpcBatchGetRoleP, gctx_);
|
||||
RPC_PROCESSOR(ObRpcBroadcastLocationsP, gctx_);
|
||||
RPC_PROCESSOR(ObRpcGetMasterRSP, gctx_);
|
||||
RPC_PROCESSOR(ObRpcSetConfigP, gctx_);
|
||||
RPC_PROCESSOR(ObRpcGetConfigP, gctx_);
|
||||
|
@ -75,6 +75,15 @@ public:
|
||||
{
|
||||
return add_timestamp_;
|
||||
}
|
||||
inline bool need_assign_when_equal() const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
inline int assign_when_equal(const ObSSTableChecksumUpdateTask& other)
|
||||
{
|
||||
UNUSED(other);
|
||||
return common::OB_NOT_SUPPORTED;
|
||||
}
|
||||
TO_STRING_KV(K_(pkey), K_(sstable_id), K_(sstable_type), K_(is_remove));
|
||||
|
||||
private:
|
||||
|
@ -243,8 +243,17 @@ int ObUniqTaskQueue<Task, Process>::add(const Task& task)
|
||||
const Task* stored_task = NULL;
|
||||
if (OB_FAIL(task_map_.set_refactored(task, task))) {
|
||||
if (common::OB_HASH_EXIST == ret) {
|
||||
ret = common::OB_EAGAIN;
|
||||
SERVER_LOG(TRACE, "same task exist", K(task));
|
||||
if (task.need_assign_when_equal()) {
|
||||
if (NULL == (stored_task = task_map_.get(task))) {
|
||||
ret = common::OB_ERR_SYS;
|
||||
SERVER_LOG(WARN, "get inserted task failed", K(ret), K(task));
|
||||
} else if (OB_FAIL(const_cast<Task*>(stored_task)->assign_when_equal(task))) {
|
||||
SERVER_LOG(WARN, "assign task failed", K(ret), K(task));
|
||||
}
|
||||
} else {
|
||||
ret = common::OB_EAGAIN;
|
||||
SERVER_LOG(TRACE, "same task exist", K(task));
|
||||
}
|
||||
} else {
|
||||
SERVER_LOG(WARN, "insert into hash failed", K(ret), K(task));
|
||||
}
|
||||
|
@ -459,6 +459,8 @@ RPC_F(obrpc::OB_GET_MIN_SSTABLE_SCHEMA_VERSION, obrpc::ObGetMinSSTableSchemaVers
|
||||
RPC_F(obrpc::OB_BATCH_GET_MEMBER_LIST_AND_LEADER, obrpc::ObLocationRpcRenewArg, obrpc::ObLocationRpcRenewResult,
|
||||
ObBatchRpcRenewLocProxy);
|
||||
RPC_F(obrpc::OB_BATCH_GET_ROLE, obrpc::ObBatchGetRoleArg, obrpc::ObBatchGetRoleResult, ObBatchGetRoleProxy);
|
||||
RPC_F(obrpc::OB_BROADCAST_LOCATIONS, obrpc::ObPartitionBroadcastArg, obrpc::ObPartitionBroadcastResult,
|
||||
ObBroadcastLocationProxy);
|
||||
RPC_F(obrpc::OB_BATCH_GET_PROTECTION_LEVEL, obrpc::ObBatchCheckLeaderArg, obrpc::ObBatchCheckRes,
|
||||
ObBatchGetProtectionLevelProxy);
|
||||
RPC_F(obrpc::OB_CHECK_NEED_OFFLINE_REPLICA, obrpc::ObTenantSchemaVersions, obrpc::ObGetPartitionCountResult,
|
||||
|
@ -142,6 +142,7 @@ ob_set_subtarget(ob_share partition_table
|
||||
partition_table/ob_united_pt_operator.cpp
|
||||
partition_table/ob_inmemory_partition_table.cpp
|
||||
partition_table/ob_location_update_task.cpp
|
||||
partition_table/ob_partition_location_task.cpp
|
||||
partition_table/ob_partition_info.cpp
|
||||
partition_table/ob_partition_location.cpp
|
||||
partition_table/ob_partition_location_cache.cpp
|
||||
|
@ -189,6 +189,25 @@ int ObAliveServerMap::refresh_server_list(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObAliveServerMap::get_active_server_list(common::ObIArray<common::ObAddr>& addrs) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", KR(ret));
|
||||
} else {
|
||||
addrs.reset();
|
||||
ObLatchRGuard guard(lock_, ObLatchIds::ALIVE_SERVER_TRACER_LOCK);
|
||||
common::hash::ObHashSet<common::ObAddr, common::hash::NoPthreadDefendMode>::const_iterator iter;
|
||||
for (iter = active_servers_.begin(); OB_SUCC(ret) && iter != active_servers_.end(); ++iter) {
|
||||
if (OB_FAIL(addrs.push_back(iter->first))) {
|
||||
LOG_WARN("fail to push back addr", KR(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObAliveServerRefreshTask::ObAliveServerRefreshTask(ObAliveServerTracer& tracer) : tracer_(tracer), is_inited_(false)
|
||||
{}
|
||||
|
||||
@ -371,5 +390,22 @@ int ObAliveServerTracer::get_primary_cluster_id(int64_t& cluster_id) const
|
||||
UNUSED(cluster_id);
|
||||
return OB_NOT_SUPPORTED;
|
||||
}
|
||||
int ObAliveServerTracer::get_active_server_list(common::ObIArray<common::ObAddr>& addrs) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", KR(ret));
|
||||
} else {
|
||||
const ObAliveServerMap* volatile map = cur_map_;
|
||||
if (OB_ISNULL(map)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("null pointer", KR(ret));
|
||||
} else if (OB_FAIL(map->get_active_server_list(addrs))) {
|
||||
LOG_WARN("check server alive failed", KR(ret), K(addrs));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
} // end namespace share
|
||||
} // end namespace oceanbase
|
||||
|
@ -40,6 +40,7 @@ public:
|
||||
const common::ObAddr& addr, bool& alive, bool& is_server_exist, int64_t& trace_time) const = 0;
|
||||
|
||||
virtual int get_primary_cluster_id(int64_t& cluster_id) const = 0;
|
||||
virtual int get_active_server_list(common::ObIArray<common::ObAddr>& addrs) const = 0;
|
||||
};
|
||||
|
||||
class ObAliveServerMap : public ObIAliveServerTracer {
|
||||
@ -61,6 +62,7 @@ public:
|
||||
UNUSED(cluster_id);
|
||||
return common::OB_OP_NOT_ALLOW;
|
||||
}
|
||||
virtual int get_active_server_list(common::ObIArray<common::ObAddr>& addrs) const;
|
||||
|
||||
private:
|
||||
virtual int refresh_server_list(const common::ObIArray<common::ObAddr>& server_list,
|
||||
@ -106,6 +108,7 @@ public:
|
||||
const common::ObAddr& addr, bool& alive, bool& is_server_exist, int64_t& trace_time) const;
|
||||
virtual int get_primary_cluster_id(int64_t& cluster_id) const;
|
||||
virtual int refresh();
|
||||
virtual int get_active_server_list(common::ObIArray<common::ObAddr>& addrs) const;
|
||||
|
||||
private:
|
||||
int refresh_primary_cluster_id();
|
||||
|
@ -4530,5 +4530,35 @@ OB_SERIALIZE_MEMBER(ObDropRestorePointArg, tenant_id_, name_);
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObCheckBuildIndexTaskExistArg, tenant_id_, task_id_, scheduler_id_);
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObPartitionBroadcastArg, keys_);
|
||||
bool ObPartitionBroadcastArg::is_valid() const
|
||||
{
|
||||
return keys_.count() > 0;
|
||||
}
|
||||
int ObPartitionBroadcastArg::assign(const ObPartitionBroadcastArg& other)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (this == &other) {
|
||||
} else if (OB_FAIL(keys_.assign(other.keys_))) {
|
||||
LOG_WARN("fail to assign keys", KR(ret), K(other));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObPartitionBroadcastResult, ret_);
|
||||
bool ObPartitionBroadcastResult::is_valid() const
|
||||
{
|
||||
return true;
|
||||
}
|
||||
int ObPartitionBroadcastResult::assign(const ObPartitionBroadcastResult& other)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (this == &other) {
|
||||
} else {
|
||||
ret_ = other.ret_;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // end namespace obrpc
|
||||
} // namespace oceanbase
|
||||
|
@ -47,7 +47,8 @@
|
||||
#include "share/restore/ob_restore_args.h" // ObRestoreArgs
|
||||
#include "rootserver/ob_rs_job_table_operator.h"
|
||||
#include "sql/executor/ob_task_id.h"
|
||||
#include "sql/parser/ob_item_type.h" // ObCacheType
|
||||
#include "sql/parser/ob_item_type.h" // ObCacheType
|
||||
#include "share/partition_table/ob_partition_location_task.h" // ObPartitionBroadcastTask
|
||||
|
||||
namespace oceanbase {
|
||||
namespace rootserver {
|
||||
@ -8044,6 +8045,44 @@ public:
|
||||
TO_STRING_KV(K_(tenant_id), K_(task_id), K_(scheduler_id));
|
||||
};
|
||||
|
||||
struct ObPartitionBroadcastArg {
|
||||
OB_UNIS_VERSION(1);
|
||||
|
||||
public:
|
||||
ObPartitionBroadcastArg() : keys_()
|
||||
{}
|
||||
~ObPartitionBroadcastArg()
|
||||
{}
|
||||
bool is_valid() const;
|
||||
int assign(const ObPartitionBroadcastArg& other);
|
||||
TO_STRING_KV(K_(keys));
|
||||
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObPartitionBroadcastArg);
|
||||
|
||||
public:
|
||||
common::ObSEArray<share::ObPartitionBroadcastTask, common::UNIQ_TASK_QUEUE_BATCH_EXECUTE_NUM> keys_;
|
||||
};
|
||||
|
||||
struct ObPartitionBroadcastResult {
|
||||
OB_UNIS_VERSION(1);
|
||||
|
||||
public:
|
||||
ObPartitionBroadcastResult() : ret_(common::OB_SUCCESS)
|
||||
{}
|
||||
~ObPartitionBroadcastResult()
|
||||
{}
|
||||
bool is_valid() const;
|
||||
int assign(const ObPartitionBroadcastResult& other);
|
||||
TO_STRING_KV(K_(ret));
|
||||
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObPartitionBroadcastResult);
|
||||
|
||||
public:
|
||||
int ret_;
|
||||
};
|
||||
|
||||
} // end namespace obrpc
|
||||
} // end namespace oceanbase
|
||||
#endif
|
||||
|
@ -139,6 +139,8 @@ public:
|
||||
RPC_AP(PR3 batch_get_member_list_and_leader, OB_BATCH_GET_MEMBER_LIST_AND_LEADER, (obrpc::ObLocationRpcRenewArg),
|
||||
obrpc::ObLocationRpcRenewResult);
|
||||
RPC_AP(PR3 batch_get_role, OB_BATCH_GET_ROLE, (obrpc::ObBatchGetRoleArg), obrpc::ObBatchGetRoleResult);
|
||||
RPC_AP(PR5 broadcast_locations, OB_BROADCAST_LOCATIONS, (obrpc::ObPartitionBroadcastArg),
|
||||
obrpc::ObPartitionBroadcastResult);
|
||||
RPC_AP(PR5 check_has_need_offline_replica, OB_CHECK_NEED_OFFLINE_REPLICA, (obrpc::ObTenantSchemaVersions),
|
||||
obrpc::ObGetPartitionCountResult);
|
||||
RPC_AP(PR5 check_flashback_info_dump, OB_CHECK_FLASHBACK_INFO_DUMP, (obrpc::ObCheckFlashbackInfoArg),
|
||||
|
@ -69,7 +69,8 @@ DEF_INT(tenant_task_queue_size, OB_CLUSTER_PARAMETER, "65536", "[1024,]",
|
||||
"the size of the task queue for each tenant. Range: [1024,+∞)",
|
||||
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
DEF_CAP_WITH_CHECKER(memory_limit, OB_CLUSTER_PARAMETER, "0", common::ObConfigMemoryLimitChecker, "0, [8G,)",
|
||||
"the size of the memory reserved for internal use(for testing purpose), 0 means follow memory_limit_percentage. Range: 0, [8G,)",
|
||||
"the size of the memory reserved for internal use(for testing purpose), 0 means follow memory_limit_percentage. "
|
||||
"Range: 0, [8G,)",
|
||||
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
DEF_CAP(rootservice_memory_limit, OB_CLUSTER_PARAMETER, "2G", "[2G,)",
|
||||
"max memory size which can be used by rs tenant The default value is 2G. Range: [2G,)",
|
||||
@ -754,12 +755,11 @@ DEF_INT(clog_max_unconfirmed_log_count, OB_TENANT_PARAMETER, "1500", "[100, 5000
|
||||
ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
|
||||
DEF_TIME(_ob_clog_timeout_to_force_switch_leader, OB_CLUSTER_PARAMETER, "10s", "[0s, 60m]",
|
||||
"When log sync is blocking, leader need wait this interval before revoke."
|
||||
"The default value is 0s, use 0s to close this function. Range: [0s, 60m]",
|
||||
ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
DEF_INT(_ob_clog_disk_buffer_cnt, OB_CLUSTER_PARAMETER, "64", "[1, 2000]",
|
||||
"clog disk buffer cnt. Range: [1, 2000]",
|
||||
ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
"When log sync is blocking, leader need wait this interval before revoke."
|
||||
"The default value is 0s, use 0s to close this function. Range: [0s, 60m]",
|
||||
ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
DEF_INT(_ob_clog_disk_buffer_cnt, OB_CLUSTER_PARAMETER, "64", "[1, 2000]", "clog disk buffer cnt. Range: [1, 2000]",
|
||||
ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
DEF_TIME(_ob_trans_rpc_timeout, OB_CLUSTER_PARAMETER, "3s", "[0s, 3600s]",
|
||||
"transaction rpc timeout(s). Range: [0s, 3600s]",
|
||||
ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
@ -825,6 +825,14 @@ DEF_TIME(location_cache_refresh_sql_timeout, OB_CLUSTER_PARAMETER, "1s", "[1ms,)
|
||||
ObParameterAttr(Section::LOCATION_CACHE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
DEF_STR(all_server_list, OB_CLUSTER_PARAMETER, "", "all server addr in cluster",
|
||||
ObParameterAttr(Section::LOCATION_CACHE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
DEF_BOOL(enable_auto_refresh_location_cache, OB_CLUSTER_PARAMETER, "False", "enable auto refresh location",
|
||||
ObParameterAttr(Section::LOCATION_CACHE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
DEF_INT(auto_refresh_location_cache_rate_limit, OB_CLUSTER_PARAMETER, "1000", "[1, 100000]",
|
||||
"Maximum number of partitions to refresh location automatically per second",
|
||||
ObParameterAttr(Section::LOCATION_CACHE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
DEF_INT(auto_broadcast_location_cache_rate_limit, OB_CLUSTER_PARAMETER, "1000", "[1, 100000]",
|
||||
"Maximum number of partitions to broadcast location per second",
|
||||
ObParameterAttr(Section::LOCATION_CACHE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
|
||||
//// cache config
|
||||
DEF_INT(clog_cache_priority, OB_CLUSTER_PARAMETER, "1", "[1,)", "clog cache priority. Range: [1, )",
|
||||
@ -1454,6 +1462,6 @@ DEF_BOOL(_enable_block_file_punch_hole, OB_CLUSTER_PARAMETER, "False",
|
||||
"specifies whether to punch whole when free blocks in block_file",
|
||||
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
DEF_BOOL(_ob_enable_px_for_inner_sql, OB_CLUSTER_PARAMETER, "true",
|
||||
"specifies whether inner sql uses px. "
|
||||
"The default value is TRUE. Value: TRUE: turned on FALSE: turned off",
|
||||
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
"specifies whether inner sql uses px. "
|
||||
"The default value is TRUE. Value: TRUE: turned on FALSE: turned off",
|
||||
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
|
@ -272,6 +272,15 @@ public:
|
||||
return type_;
|
||||
}
|
||||
bool need_discard() const;
|
||||
inline bool need_assign_when_equal() const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
inline int assign_when_equal(const ObLocationAsyncUpdateTask& other)
|
||||
{
|
||||
UNUSED(other);
|
||||
return common::OB_NOT_SUPPORTED;
|
||||
}
|
||||
TO_STRING_KV(KT_(table_id), K_(partition_id), K_(add_timestamp), K_(cluster_id), K_(type));
|
||||
|
||||
private:
|
||||
|
264
src/share/partition_table/ob_partition_location_task.cpp
Normal file
264
src/share/partition_table/ob_partition_location_task.cpp
Normal file
@ -0,0 +1,264 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#define USING_LOG_PREFIX SHARE_PT
|
||||
|
||||
#include "lib/time/ob_time_utility.h"
|
||||
#include "share/ob_errno.h"
|
||||
#include "share/ob_task_define.h"
|
||||
#include "share/partition_table/ob_partition_location_task.h"
|
||||
|
||||
namespace oceanbase {
|
||||
using namespace common;
|
||||
namespace share {
|
||||
|
||||
void TSILocationRateLimit::reset()
|
||||
{
|
||||
cnt_ = 0;
|
||||
start_ts_ = OB_INVALID_TIMESTAMP;
|
||||
}
|
||||
|
||||
int64_t TSILocationRateLimit::calc_wait_ts(const int64_t cnt, const int64_t exec_ts, const int64_t frequency)
|
||||
{
|
||||
int64_t wait_ts = 0;
|
||||
int64_t current_ts = ObTimeUtility::current_time();
|
||||
if (current_ts - start_ts_ >= ONE_SECOND_US) { // init or >= 1s
|
||||
cnt_ = cnt;
|
||||
start_ts_ = current_ts - exec_ts;
|
||||
} else {
|
||||
cnt_ += cnt;
|
||||
}
|
||||
if (cnt_ > frequency) {
|
||||
wait_ts = cnt_ / (double)frequency * ONE_SECOND_US - (current_ts - start_ts_);
|
||||
}
|
||||
return wait_ts > 0 ? wait_ts : 0;
|
||||
}
|
||||
|
||||
void TSILocationStatistics::reset()
|
||||
{
|
||||
suc_cnt_ = 0;
|
||||
fail_cnt_ = 0;
|
||||
total_exec_us_ = 0;
|
||||
total_wait_us_ = 0;
|
||||
}
|
||||
|
||||
int64_t TSILocationStatistics::get_total_cnt() const
|
||||
{
|
||||
return suc_cnt_ + fail_cnt_;
|
||||
}
|
||||
|
||||
void TSILocationStatistics::calc(const int ret, const int64_t exec_us, const int64_t wait_us, const int64_t cnt)
|
||||
{
|
||||
total_exec_us_ += static_cast<uint64_t>(exec_us);
|
||||
total_wait_us_ += static_cast<uint64_t>(wait_us);
|
||||
if (OB_SUCCESS == ret) {
|
||||
suc_cnt_ += cnt;
|
||||
} else {
|
||||
fail_cnt_ += cnt;
|
||||
}
|
||||
}
|
||||
|
||||
int ObPartitionBroadcastTask::init(
|
||||
const uint64_t table_id, const int64_t partition_id, const int64_t partition_cnt, const int64_t timestamp)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
table_id_ = table_id;
|
||||
partition_id_ = partition_id;
|
||||
partition_cnt_ = partition_cnt;
|
||||
timestamp_ = timestamp;
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObPartitionBroadcastTask::reset()
|
||||
{
|
||||
table_id_ = OB_INVALID_ID;
|
||||
partition_id_ = OB_INVALID_ID;
|
||||
partition_cnt_ = OB_INVALID_ID;
|
||||
timestamp_ = OB_INVALID_TIMESTAMP;
|
||||
}
|
||||
|
||||
bool ObPartitionBroadcastTask::is_valid() const
|
||||
{
|
||||
return OB_INVALID_ID != table_id_ && OB_INVALID_ID != partition_id_ && OB_INVALID_ID != partition_cnt_ &&
|
||||
OB_INVALID_TIMESTAMP != timestamp_;
|
||||
}
|
||||
|
||||
int ObPartitionBroadcastTask::assign(const ObPartitionBroadcastTask& other)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (this != &other) {
|
||||
table_id_ = other.table_id_;
|
||||
partition_id_ = other.partition_id_;
|
||||
partition_cnt_ = other.partition_cnt_;
|
||||
timestamp_ = other.timestamp_;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObPartitionBroadcastTask::need_process_alone() const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
int64_t ObPartitionBroadcastTask::hash() const
|
||||
{
|
||||
uint64_t hash_val = 0;
|
||||
hash_val = murmurhash(&table_id_, sizeof(table_id_), hash_val);
|
||||
hash_val = murmurhash(&partition_id_, sizeof(partition_id_), hash_val);
|
||||
hash_val = murmurhash(&partition_cnt_, sizeof(partition_cnt_), hash_val);
|
||||
return hash_val;
|
||||
}
|
||||
|
||||
bool ObPartitionBroadcastTask::operator==(const ObPartitionBroadcastTask& other) const
|
||||
{
|
||||
bool equal = false;
|
||||
if (!is_valid() || !other.is_valid()) {
|
||||
LOG_WARN("invalid argument", "self", *this, K(other));
|
||||
} else if (this == &other) {
|
||||
equal = true;
|
||||
} else {
|
||||
equal = (table_id_ == other.table_id_ && partition_id_ == other.partition_id_ &&
|
||||
partition_cnt_ == other.partition_cnt_);
|
||||
}
|
||||
return equal;
|
||||
}
|
||||
|
||||
bool ObPartitionBroadcastTask::compare_without_version(const ObPartitionBroadcastTask& other) const
|
||||
{
|
||||
return (*this == other);
|
||||
}
|
||||
|
||||
uint64_t ObPartitionBroadcastTask::get_group_id() const
|
||||
{
|
||||
return extract_tenant_id(table_id_);
|
||||
}
|
||||
|
||||
bool ObPartitionBroadcastTask::is_barrier() const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
bool ObPartitionBroadcastTask::need_assign_when_equal() const
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
int ObPartitionBroadcastTask::assign_when_equal(const ObPartitionBroadcastTask& other)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (*this == other) {
|
||||
if (other.timestamp_ > timestamp_) {
|
||||
timestamp_ = other.timestamp_;
|
||||
}
|
||||
} else {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("task should be equal", KR(ret), KPC(this), K(other));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObPartitionBroadcastTask, table_id_, partition_id_, timestamp_);
|
||||
|
||||
int ObPartitionUpdateTask::init(const uint64_t table_id, const int64_t partition_id, const int64_t timestamp)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
table_id_ = table_id;
|
||||
partition_id_ = partition_id;
|
||||
timestamp_ = timestamp;
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObPartitionUpdateTask::reset()
|
||||
{
|
||||
table_id_ = OB_INVALID_ID;
|
||||
partition_id_ = OB_INVALID_ID;
|
||||
timestamp_ = OB_INVALID_TIMESTAMP;
|
||||
}
|
||||
|
||||
bool ObPartitionUpdateTask::is_valid() const
|
||||
{
|
||||
return OB_INVALID_ID != table_id_ && OB_INVALID_ID != partition_id_ && OB_INVALID_TIMESTAMP != timestamp_;
|
||||
}
|
||||
|
||||
int ObPartitionUpdateTask::assign(const ObPartitionUpdateTask& other)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (this != &other) {
|
||||
table_id_ = other.table_id_;
|
||||
partition_id_ = other.partition_id_;
|
||||
timestamp_ = other.timestamp_;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObPartitionUpdateTask::need_process_alone() const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
int64_t ObPartitionUpdateTask::hash() const
|
||||
{
|
||||
uint64_t hash_val = 0;
|
||||
hash_val = murmurhash(&table_id_, sizeof(table_id_), hash_val);
|
||||
hash_val = murmurhash(&partition_id_, sizeof(partition_id_), hash_val);
|
||||
return hash_val;
|
||||
}
|
||||
|
||||
bool ObPartitionUpdateTask::operator==(const ObPartitionUpdateTask& other) const
|
||||
{
|
||||
bool equal = false;
|
||||
if (!is_valid() || !other.is_valid()) {
|
||||
LOG_WARN("invalid argument", "self", *this, K(other));
|
||||
} else if (this == &other) {
|
||||
equal = true;
|
||||
} else {
|
||||
equal = (table_id_ == other.table_id_ && partition_id_ == other.partition_id_);
|
||||
}
|
||||
return equal;
|
||||
}
|
||||
|
||||
bool ObPartitionUpdateTask::compare_without_version(const ObPartitionUpdateTask& other) const
|
||||
{
|
||||
return (*this == other);
|
||||
}
|
||||
|
||||
uint64_t ObPartitionUpdateTask::get_group_id() const
|
||||
{
|
||||
return extract_tenant_id(table_id_);
|
||||
}
|
||||
|
||||
bool ObPartitionUpdateTask::is_barrier() const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
bool ObPartitionUpdateTask::need_assign_when_equal() const
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
int ObPartitionUpdateTask::assign_when_equal(const ObPartitionUpdateTask& other)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (*this == other) {
|
||||
if (other.timestamp_ > timestamp_) {
|
||||
timestamp_ = other.timestamp_;
|
||||
}
|
||||
} else {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("task should be equal", KR(ret), KPC(this), K(other));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // namespace share
|
||||
} // namespace oceanbase
|
174
src/share/partition_table/ob_partition_location_task.h
Normal file
174
src/share/partition_table/ob_partition_location_task.h
Normal file
@ -0,0 +1,174 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#ifndef OCEANBASE_SHARE_PARTITION_TABLE_OB_PARTITION_LOCATION_TASK_H_
|
||||
#define OCEANBASE_SHARE_PARTITION_TABLE_OB_PARTITION_LOCATION_TASK_H_
|
||||
|
||||
#include "lib/ob_define.h"
|
||||
#include "lib/list/ob_dlink_node.h"
|
||||
#include "lib/oblog/ob_log_module.h"
|
||||
#include "lib/utility/ob_print_utils.h"
|
||||
#include "lib/utility/ob_unify_serialize.h"
|
||||
|
||||
namespace oceanbase {
|
||||
namespace share {
|
||||
struct TSILocationRateLimit {
|
||||
public:
|
||||
const int64_t ONE_SECOND_US = 1 * 1000 * 1000L; // 1s
|
||||
public:
|
||||
TSILocationRateLimit() : cnt_(0), start_ts_(common::OB_INVALID_TIMESTAMP)
|
||||
{}
|
||||
~TSILocationRateLimit()
|
||||
{}
|
||||
void reset();
|
||||
int64_t calc_wait_ts(const int64_t cnt, const int64_t exec_ts, const int64_t frequency);
|
||||
TO_STRING_KV(K_(cnt), K_(start_ts));
|
||||
|
||||
public:
|
||||
int64_t cnt_;
|
||||
int64_t start_ts_;
|
||||
};
|
||||
|
||||
struct TSILocationStatistics {
|
||||
public:
|
||||
TSILocationStatistics() : suc_cnt_(0), fail_cnt_(0), total_exec_us_(0), total_wait_us_(0)
|
||||
{}
|
||||
~TSILocationStatistics()
|
||||
{}
|
||||
void reset();
|
||||
void calc(const int ret, const int64_t exec_us, const int64_t wait_us, const int64_t cnt);
|
||||
int64_t get_total_cnt() const;
|
||||
TO_STRING_KV(K_(suc_cnt), K_(fail_cnt), K_(total_exec_us), K_(total_wait_us));
|
||||
|
||||
public:
|
||||
int64_t suc_cnt_;
|
||||
int64_t fail_cnt_;
|
||||
uint64_t total_exec_us_;
|
||||
uint64_t total_wait_us_;
|
||||
};
|
||||
|
||||
// For Sender of ObPartitionLocationUpdater
|
||||
class ObPartitionBroadcastTask : public common::ObDLinkBase<ObPartitionBroadcastTask> {
|
||||
public:
|
||||
OB_UNIS_VERSION(1);
|
||||
friend class ObPartitionLocationUpdater;
|
||||
|
||||
public:
|
||||
ObPartitionBroadcastTask()
|
||||
: table_id_(common::OB_INVALID_ID),
|
||||
partition_id_(common::OB_INVALID_ID),
|
||||
partition_cnt_(common::OB_INVALID_ID),
|
||||
timestamp_(common::OB_INVALID_TIMESTAMP)
|
||||
{}
|
||||
explicit ObPartitionBroadcastTask(
|
||||
const int64_t table_id, const int64_t partition_id, const int64_t partition_cnt, const int64_t timestamp)
|
||||
: table_id_(table_id), partition_id_(partition_id), partition_cnt_(partition_cnt), timestamp_(timestamp)
|
||||
{}
|
||||
virtual ~ObPartitionBroadcastTask()
|
||||
{}
|
||||
|
||||
int init(const uint64_t table_id, const int64_t partition_id, const int64_t partition_cnt, const int64_t timestamp);
|
||||
int assign(const ObPartitionBroadcastTask& other);
|
||||
void reset();
|
||||
bool is_valid() const;
|
||||
|
||||
virtual int64_t hash() const;
|
||||
virtual bool operator==(const ObPartitionBroadcastTask& other) const;
|
||||
|
||||
uint64_t get_group_id() const;
|
||||
bool is_barrier() const;
|
||||
bool need_process_alone() const;
|
||||
virtual bool compare_without_version(const ObPartitionBroadcastTask& other) const;
|
||||
bool need_assign_when_equal() const;
|
||||
int assign_when_equal(const ObPartitionBroadcastTask& other);
|
||||
|
||||
uint64_t get_table_id() const
|
||||
{
|
||||
return table_id_;
|
||||
}
|
||||
int64_t get_partition_id() const
|
||||
{
|
||||
return partition_id_;
|
||||
}
|
||||
int64_t get_partition_cnt() const
|
||||
{
|
||||
return partition_cnt_;
|
||||
}
|
||||
int64_t get_timestamp() const
|
||||
{
|
||||
return timestamp_;
|
||||
}
|
||||
|
||||
TO_STRING_KV(K_(table_id), K_(partition_id), K_(partition_cnt), K_(timestamp));
|
||||
|
||||
private:
|
||||
uint64_t table_id_;
|
||||
int64_t partition_id_;
|
||||
int64_t partition_cnt_; // won't serialize/deserialize
|
||||
int64_t timestamp_;
|
||||
};
|
||||
|
||||
// For Receiver of ObPartitionLocationUpdater
|
||||
class ObPartitionUpdateTask : public common::ObDLinkBase<ObPartitionUpdateTask> {
|
||||
public:
|
||||
friend class ObPartitionLocationUpdater;
|
||||
|
||||
public:
|
||||
ObPartitionUpdateTask()
|
||||
: table_id_(common::OB_INVALID_ID), partition_id_(common::OB_INVALID_ID), timestamp_(common::OB_INVALID_TIMESTAMP)
|
||||
{}
|
||||
explicit ObPartitionUpdateTask(const int64_t table_id, const int64_t partition_id, const int64_t timestamp)
|
||||
: table_id_(table_id), partition_id_(partition_id), timestamp_(timestamp)
|
||||
{}
|
||||
virtual ~ObPartitionUpdateTask()
|
||||
{}
|
||||
|
||||
int init(const uint64_t table_id, const int64_t partition_id, const int64_t timestamp);
|
||||
void reset();
|
||||
bool is_valid() const;
|
||||
int assign(const ObPartitionUpdateTask& other);
|
||||
|
||||
virtual int64_t hash() const;
|
||||
virtual bool operator==(const ObPartitionUpdateTask& other) const;
|
||||
|
||||
uint64_t get_group_id() const;
|
||||
bool is_barrier() const;
|
||||
bool need_process_alone() const;
|
||||
virtual bool compare_without_version(const ObPartitionUpdateTask& other) const;
|
||||
bool need_assign_when_equal() const;
|
||||
int assign_when_equal(const ObPartitionUpdateTask& other);
|
||||
|
||||
uint64_t get_table_id() const
|
||||
{
|
||||
return table_id_;
|
||||
}
|
||||
int64_t get_partition_id() const
|
||||
{
|
||||
return partition_id_;
|
||||
}
|
||||
int64_t get_timestamp() const
|
||||
{
|
||||
return timestamp_;
|
||||
}
|
||||
|
||||
TO_STRING_KV(K_(table_id), K_(partition_id), K_(timestamp));
|
||||
|
||||
private:
|
||||
uint64_t table_id_;
|
||||
int64_t partition_id_;
|
||||
int64_t timestamp_;
|
||||
};
|
||||
|
||||
} // namespace share
|
||||
} // namespace oceanbase
|
||||
|
||||
#endif
|
Loading…
x
Reference in New Issue
Block a user