diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h index ef8381b07..60e448573 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h @@ -569,6 +569,7 @@ PCODE_DEF(OB_UPDATE_STANDBY_CLUSTER_INFO, 0x913) PCODE_DEF(OB_CHECK_NEED_OFFLINE_REPLICA, 0x914) PCODE_DEF(OB_GET_MEMBER_LIST_AND_LEADER_V2, 0x915) PCODE_DEF(OB_CHECK_FLASHBACK_INFO_DUMP, 0x916) +PCODE_DEF(OB_BROADCAST_LOCATIONS, 0x917) PCODE_DEF(OB_RPC_ASSEMBLE, 0x1000) diff --git a/src/observer/CMakeLists.txt b/src/observer/CMakeLists.txt index 31e3644e3..e726e29f1 100644 --- a/src/observer/CMakeLists.txt +++ b/src/observer/CMakeLists.txt @@ -37,6 +37,7 @@ ob_set_subtarget(ob_server common ob_lease_state_mgr.cpp ob_partition_table_checker.cpp ob_partition_table_updater.cpp + ob_partition_location_updater.cpp ob_rebuild_flag_reporter.cpp ob_root_service_monitor.cpp ob_rpc_extra_payload.cpp diff --git a/src/observer/ob_index_status_reporter.h b/src/observer/ob_index_status_reporter.h index 542a2810c..4d6e49036 100644 --- a/src/observer/ob_index_status_reporter.h +++ b/src/observer/ob_index_status_reporter.h @@ -65,6 +65,16 @@ public: return false; } + inline bool need_assign_when_equal() const + { + return false; + } + inline int assign_when_equal(const ObIndexStatusReporter& other) + { + UNUSED(other); + return common::OB_NOT_SUPPORTED; + } + TO_STRING_KV(K_(part_key), K_(self), K_(index_table_id), K_(index_status), K_(ret_code)); private: diff --git a/src/observer/ob_partition_location_updater.cpp b/src/observer/ob_partition_location_updater.cpp new file mode 100644 index 000000000..8c6b15418 --- /dev/null +++ b/src/observer/ob_partition_location_updater.cpp @@ -0,0 +1,319 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX SERVER + +#include "common/ob_partition_key.h" +#include "common/ob_role.h" +#include "lib/thread_local/ob_tsi_factory.h" +#include "rootserver/ob_rs_async_rpc_proxy.h" +#include "share/config/ob_server_config.h" +#include "share/partition_table/ob_partition_location_cache.h" +#include "share/ob_alive_server_tracer.h" +#include "storage/ob_partition_service.h" +#include "observer/ob_service.h" +#include "observer/ob_partition_location_updater.h" + +namespace oceanbase { +namespace observer { + +using namespace common; +using namespace share; + +static const char* location_queue_type[] = {"LOCATION_SENDER", "LOCATION_RECEIVER", "LOCATION_INVALID"}; + +int ObPartitionLocationUpdater::init(observer::ObService& ob_service, storage::ObPartitionService*& partition_service, + obrpc::ObSrvRpcProxy*& srv_rpc_proxy, share::ObPartitionLocationCache*& location_cache, + share::ObIAliveServerTracer& server_tracer) +{ + int ret = OB_SUCCESS; + static_assert( + QueueType::MAX_TYPE == ARRAYSIZEOF(location_queue_type) - 1, "type str array size mismatch with type cnt"); + if (inited_) { + ret = OB_INIT_TWICE; + LOG_WARN("inited twice", KR(ret)); + } else if (OB_ISNULL(partition_service) || OB_ISNULL(srv_rpc_proxy) || OB_ISNULL(location_cache)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("ptr is null", KR(ret), KP(partition_service), KP(srv_rpc_proxy), KP(location_cache)); + } else { + queue_size_ = !lib::is_mini_mode() ? MAX_PARTITION_CNT : MINI_MODE_MAX_PARTITION_CNT; + thread_cnt_ = !lib::is_mini_mode() ? GCONF.location_refresh_thread_count : UPDATER_THREAD_CNT; + if (OB_FAIL(sender_.init(this, thread_cnt_, queue_size_, "PTSender"))) { + LOG_WARN("init sender updater queue failed", KR(ret), K_(queue_size), K_(thread_cnt)); + } else if (OB_FAIL(receiver_.init(this, thread_cnt_, queue_size_, "PTReceiver"))) { + LOG_WARN("init receiver updater queue failed", KR(ret), K_(queue_size), K_(thread_cnt)); + } else { + ob_service_ = &ob_service; + partition_service_ = partition_service; + srv_rpc_proxy_ = srv_rpc_proxy; + location_cache_ = location_cache; + server_tracer_ = &server_tracer; + inited_ = true; + } + } + return ret; +} + +int ObPartitionLocationUpdater::check_inner_stat() const +{ + int ret = OB_SUCCESS; + if (!inited_) { + ret = OB_NOT_INIT; + LOG_WARN("updater is not inited yet", KR(ret)); + } else if (stopped_) { + ret = OB_CANCELED; + LOG_WARN("updater is stopped now", KR(ret)); + } + return ret; +} + +void ObPartitionLocationUpdater::stop() +{ + stopped_ = true; + sender_.stop(); + receiver_.stop(); +} + +void ObPartitionLocationUpdater::wait() +{ + if (stopped_) { + sender_.wait(); + receiver_.wait(); + } +} + +void ObPartitionLocationUpdater::destroy() +{ + stop(); + wait(); +} + +int ObPartitionLocationUpdater::submit_broadcast_task(const ObPartitionBroadcastTask& task) +{ + int ret = OB_SUCCESS; + if (!GCONF.enable_auto_refresh_location_cache) { + // skip + } else if (OB_FAIL(check_inner_stat())) { + LOG_WARN("fail to check inner stat", KR(ret)); + } else if (OB_FAIL(sender_.add(task))) { + if (OB_EAGAIN == ret) { + ret = OB_SUCCESS; + } else { + LOG_WARN("fail to add task", KR(ret), K(task)); + } + } + return ret; +} + +int ObPartitionLocationUpdater::submit_update_task(const ObPartitionUpdateTask& task) +{ + int ret = OB_SUCCESS; + if (!GCONF.enable_auto_refresh_location_cache) { + // skip + } else if (OB_FAIL(check_inner_stat())) { + LOG_WARN("fail to check inner stat", KR(ret)); + } else if (OB_FAIL(receiver_.add(task))) { + if (OB_EAGAIN == ret) { + ret = OB_SUCCESS; + } else { + LOG_WARN("fail to add task", KR(ret), K(task)); + } + } + return ret; +} + +int ObPartitionLocationUpdater::process_barrier(const ObPartitionBroadcastTask& task, bool& stopped) +{ + UNUSED(task); + UNUSED(stopped); + return OB_NOT_SUPPORTED; +} + +int ObPartitionLocationUpdater::process_barrier(const ObPartitionUpdateTask& task, bool& stopped) +{ + UNUSED(task); + UNUSED(stopped); + return OB_NOT_SUPPORTED; +} + +int ObPartitionLocationUpdater::batch_process_tasks( + const ObIArray& batch_tasks, bool& stopped) +{ + int ret = OB_SUCCESS; + ObCurTraceId::init(GCONF.self_addr_); + obrpc::ObPartitionBroadcastArg arg; + if (OB_FAIL(check_inner_stat())) { + LOG_WARN("fail to check inner stat", KR(ret)); + } else if (stopped) { + ret = OB_CANCELED; + LOG_WARN("updater is is stopped", KR(ret), K(stopped)); + } else if (!GCONF.enable_auto_refresh_location_cache) { + // skip + } else if (OB_UNLIKELY(batch_tasks.count() <= 0)) { + // skip + } else { + int64_t start_ts = ObTimeUtility::current_time(); + for (int64_t i = 0; OB_SUCC(ret) && i < batch_tasks.count(); i++) { + const ObPartitionBroadcastTask& task = batch_tasks.at(i); + ObPartitionKey pkey; + ObRole role = FOLLOWER; + if (OB_FAIL(pkey.init(task.get_table_id(), task.get_partition_id(), task.get_partition_cnt()))) { + LOG_WARN("init pkey failed", KR(ret), K(task)); + } else if (OB_FAIL(partition_service_->get_role(pkey, role))) { + if (OB_ENTRY_NOT_EXIST == ret || OB_PARTITION_NOT_EXIST == ret) { + ret = OB_SUCCESS; + } else { + LOG_WARN("fail to get role", KR(ret), K(pkey)); + } + } else if (is_strong_leader(role)) { + if (OB_FAIL(arg.keys_.push_back(task))) { + LOG_WARN("fail to push back task", KR(ret), K(task)); + } + } + LOG_DEBUG("broadcast task is", KR(ret), K(task), K(pkey), K(role)); + } // end for + + if (OB_SUCC(ret) && arg.keys_.count() > 0) { + rootserver::ObBroadcastLocationProxy proxy(*srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::broadcast_locations); + ObArray alive_servers; + const int64_t timeout = GCONF.location_cache_refresh_rpc_timeout; + if (OB_FAIL(server_tracer_->get_active_server_list(alive_servers))) { + LOG_WARN("fail to get alive server list", KR(ret)); + } + + for (int64_t i = 0; OB_SUCC(ret) && i < alive_servers.count(); i++) { + ObAddr& addr = alive_servers.at(i); + if (OB_FAIL(proxy.call(addr, timeout, arg))) { + LOG_WARN("fail to call addr", KR(ret), K(addr), K(timeout), K(arg)); + } + } // end for + + int tmp_ret = OB_SUCCESS; + if (OB_SUCCESS != (tmp_ret = proxy.wait())) { + LOG_WARN("fail to wait rpc callback", KR(tmp_ret)); + ret = OB_SUCC(ret) ? tmp_ret : ret; + } + + LOG_DEBUG("try broadcast location", K(arg)); + + int64_t exec_ts = ObTimeUtility::current_time() - start_ts; + int64_t wait_ts = 0; + (void)control_rate_limit( + QueueType::SENDER, exec_ts, arg.keys_.count(), GCONF.auto_broadcast_location_cache_rate_limit, wait_ts); + (void)dump_statistic(QueueType::SENDER, ret, exec_ts, wait_ts, arg.keys_.count()); + } + } + return ret; +} + +int ObPartitionLocationUpdater::batch_process_tasks(const ObIArray& batch_tasks, bool& stopped) +{ + int ret = OB_SUCCESS; + ObCurTraceId::init(GCONF.self_addr_); + if (OB_FAIL(check_inner_stat())) { + LOG_WARN("fail to check inner stat", KR(ret)); + } else if (stopped) { + ret = OB_CANCELED; + LOG_WARN("updater is is stopped", KR(ret), K(stopped)); + } else if (!GCONF.enable_auto_refresh_location_cache) { + // skip + } else if (OB_UNLIKELY(batch_tasks.count() <= 0)) { + // skip + } else { + int64_t start_ts = ObTimeUtility::current_time(); + int64_t renew_cnt = 0; + LOG_DEBUG("try renew location", K(batch_tasks)); + for (int64_t i = 0; OB_SUCC(ret) && i < batch_tasks.count(); i++) { + const ObPartitionUpdateTask& task = batch_tasks.at(i); + ObPartitionKey pkey; + ObPartitionLocation location; + if (OB_FAIL(pkey.init(task.get_table_id(), task.get_partition_id(), 0 /*partition_cnt, no used here*/))) { + LOG_WARN("init pkey failed", KR(ret), K(task)); + } else if (OB_FAIL(location_cache_->nonblock_get(pkey, location))) { + if (OB_LOCATION_NOT_EXIST == ret) { + ret = OB_SUCCESS; + } else { + LOG_WARN("fail to get location", KR(ret), K(pkey)); + } + } else if (task.get_timestamp() > location.get_renew_time()) { + int tmp_ret = OB_SUCCESS; + if (OB_SUCCESS != (tmp_ret = location_cache_->nonblock_renew(pkey, 0 /*expire_renew_time*/))) { + LOG_WARN("nonblock renew failed", KR(tmp_ret), K(pkey)); + } else { + renew_cnt++; + LOG_DEBUG("try renew location", K(pkey), K(task), K(location)); + } + } + } + int64_t exec_ts = ObTimeUtility::current_time() - start_ts; + int64_t wait_ts = 0; + (void)control_rate_limit( + QueueType::RECEIVER, exec_ts, renew_cnt, GCONF.auto_refresh_location_cache_rate_limit, wait_ts); + (void)dump_statistic(QueueType::RECEIVER, ret, exec_ts, wait_ts, renew_cnt); + } + return ret; +} + +void ObPartitionLocationUpdater::dump_statistic( + const QueueType queue_type, const int exec_ret, const int64_t exec_ts, const int64_t wait_ts, const int64_t cnt) +{ + TSILocationStatistics* statistics = GET_TSI(TSILocationStatistics); + if (OB_ISNULL(statistics)) { + LOG_WARN("fail to get statistic", "ret", OB_ERR_UNEXPECTED); + } else { + // calc statistic + (void)statistics->calc(exec_ret, exec_ts, wait_ts, cnt); + int64_t total_cnt = statistics->get_total_cnt(); + if (TC_REACH_TIME_INTERVAL(CHECK_INTERVAL_US) && total_cnt > 0) { + QueueType type = + (QueueType::SENDER <= queue_type && queue_type <= QueueType::RECEIVER) ? queue_type : QueueType::MAX_TYPE; + ObTaskController::get().allow_next_syslog(); + LOG_INFO("[LOCATION_STATISTIC] auto refresh location statistics", + "queue_type", + location_queue_type[type], + KPC(statistics), + "avg_exec_us", + statistics->total_exec_us_ / total_cnt, + "avg_wait_us", + statistics->total_wait_us_ / total_cnt); + (void)statistics->reset(); + } + } +} + +void ObPartitionLocationUpdater::control_rate_limit( + const QueueType queue_type, const int64_t exec_ts, const int64_t cnt, int64_t rate_limit_conf, int64_t& wait_ts) +{ + wait_ts = 0; + TSILocationRateLimit* info = GET_TSI(TSILocationRateLimit); + if (OB_ISNULL(info)) { + LOG_WARN("fail to get info", "ret", OB_ERR_UNEXPECTED); + } else { + int64_t rate_limit = max(rate_limit_conf / thread_cnt_, 1); + wait_ts = info->calc_wait_ts(cnt, exec_ts, rate_limit); + if (wait_ts > 0) { + QueueType type = + (QueueType::SENDER <= queue_type && queue_type <= QueueType::RECEIVER) ? queue_type : QueueType::MAX_TYPE; + ObTaskController::get().allow_next_syslog(); + LOG_INFO("[LOCATION_STATISTIC] rate limit", + "queue_type", + location_queue_type[type], + KPC(info), + K(rate_limit), + K(wait_ts)); + usleep(wait_ts); + } + } +} + +} // end namespace observer +} // end namespace oceanbase diff --git a/src/observer/ob_partition_location_updater.h b/src/observer/ob_partition_location_updater.h new file mode 100644 index 000000000..d4fce74ed --- /dev/null +++ b/src/observer/ob_partition_location_updater.h @@ -0,0 +1,104 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_OBSERVER_OB_PARTITION_LOCATION_UPDATER_H_ +#define OCEANBASE_OBSERVER_OB_PARTITION_LOCATION_UPDATER_H_ + +#include "share/ob_srv_rpc_proxy.h" +#include "observer/ob_uniq_task_queue.h" +#include "share/partition_table/ob_partition_location_task.h" +namespace oceanbase { +namespace storage { +class ObIAliveServerTracer; +class ObPartitionService; +} // namespace storage +namespace share { +class ObPartitionLocationCache; +} +namespace obrpc { +class ObSrvRpcProxy; +} +namespace observer { +class ObPartitionLocationUpdater; +class ObService; + +typedef ObUniqTaskQueue ObPTSenderQueue; +typedef ObUniqTaskQueue ObPTReceiverQueue; + +class ObPartitionLocationUpdater { +public: + const static int64_t UPDATER_THREAD_CNT = 1; + const static int64_t MINI_MODE_MAX_PARTITION_CNT = 10000; + const static int64_t MAX_PARTITION_CNT = 1000000; + const static int64_t CHECK_INTERVAL_US = 1 * 1000 * 1000L; // 1s + + enum QueueType { SENDER = 0, RECEIVER = 1, MAX_TYPE = 2 }; + + ObPartitionLocationUpdater() + : inited_(false), + stopped_(false), + thread_cnt_(UPDATER_THREAD_CNT), + queue_size_(MINI_MODE_MAX_PARTITION_CNT), + ob_service_(NULL), + partition_service_(NULL), + srv_rpc_proxy_(NULL), + location_cache_(NULL), + server_tracer_(NULL), + sender_(), + receiver_() + {} + virtual ~ObPartitionLocationUpdater() + { + destroy(); + } + + int init(observer::ObService& ob_service, storage::ObPartitionService*& partition_service, + obrpc::ObSrvRpcProxy*& srv_rpc_proxy, share::ObPartitionLocationCache*& location_cache, + share::ObIAliveServerTracer& server_tracer); + + void stop(); + void wait(); + void destroy(); + + virtual int submit_broadcast_task(const share::ObPartitionBroadcastTask& task); + virtual int submit_update_task(const share::ObPartitionUpdateTask& task); + + virtual int process_barrier(const share::ObPartitionBroadcastTask& task, bool& stopped); + virtual int process_barrier(const share::ObPartitionUpdateTask& task, bool& stopped); + + virtual int batch_process_tasks(const common::ObIArray& tasks, bool& stopped); + virtual int batch_process_tasks(const common::ObIArray& tasks, bool& stopped); + +private: + int check_inner_stat() const; + void dump_statistic( + const QueueType queue_type, const int exec_ret, const int64_t exec_ts, const int64_t wait_ts, const int64_t cnt); + void control_rate_limit(const QueueType queue_type, const int64_t exec_ts, const int64_t cnt, + const int64_t rate_limit_conf, int64_t& wait_ts); + +private: + bool inited_; + bool stopped_; + int64_t thread_cnt_; + int64_t queue_size_; + observer::ObService* ob_service_; + storage::ObPartitionService* partition_service_; + obrpc::ObSrvRpcProxy* srv_rpc_proxy_; + share::ObPartitionLocationCache* location_cache_; + share::ObIAliveServerTracer* server_tracer_; + ObPTSenderQueue sender_; + ObPTReceiverQueue receiver_; +}; +} // end namespace observer +} // end namespace oceanbase + +#endif // OCEANBASE_OBSERVER_OB_PARTITION_LOCATION_UPDATER_H_ diff --git a/src/observer/ob_partition_table_updater.cpp b/src/observer/ob_partition_table_updater.cpp index 728cc32a5..01446a831 100644 --- a/src/observer/ob_partition_table_updater.cpp +++ b/src/observer/ob_partition_table_updater.cpp @@ -475,6 +475,11 @@ int ObPartitionTableUpdater::do_batch_execute(const int64_t start_time, if (OB_SUCC(ret)) { if (OB_FAIL(GCTX.pt_operator_->batch_report_partition_role(replicas, new_role))) { LOG_WARN("fail to batch report partition role", KR(ret), K(replicas)); + } else if (is_strong_leader(new_role)) { + int tmp_ret = submit_broadcast_tasks(replicas); + if (OB_SUCCESS != tmp_ret) { + LOG_WARN("submit broadcast_tasks failed", KR(ret), K(replicas)); + } } } if (OB_FAIL(ret)) { @@ -829,6 +834,12 @@ int ObPartitionTableUpdater::do_batch_execute(const int64_t start_time, const Ob "do partition table update failed", K(ret), "escape time", ObTimeUtility::current_time() - start_time); } else { success_idx++; + if (is_strong_leader(replicas.at(i).role_)) { + int tmp_ret = submit_broadcast_tasks(tmp_replicas); + if (OB_SUCCESS != tmp_ret) { + LOG_WARN("submit broadcast_tasks failed", KR(ret), K(tmp_replicas)); + } + } } } } else if (replicas.at(0).need_force_full_report() || with_role) { @@ -842,6 +853,12 @@ int ObPartitionTableUpdater::do_batch_execute(const int64_t start_time, const Ob ObTimeUtility::current_time() - start_time); } else { success_idx = replicas.count() - 1; + if (is_strong_leader(replicas.at(0).role_)) { + int tmp_ret = submit_broadcast_tasks(replicas); + if (OB_SUCCESS != tmp_ret) { + LOG_WARN("submit broadcast_tasks failed", KR(ret), K(replicas)); + } + } } } else { if (OB_FAIL(GCTX.pt_operator_->batch_report_with_optimization(replicas, false /*without role*/))) { @@ -1133,6 +1150,34 @@ int ObPartitionTableUpdater::do_sync_pt_finish(const int64_t version) return ret; } +int ObPartitionTableUpdater::submit_broadcast_tasks(const common::ObIArray& replicas) +{ + int ret = OB_SUCCESS; + if (replicas.count() <= 0 || !GCONF.enable_auto_refresh_location_cache) { + // skip + } else if (!inited_) { + ret = OB_NOT_INIT; + LOG_WARN("not init", KR(ret)); + } else if (OB_ISNULL(GCTX.ob_service_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("observer is null", KR(ret)); + } else { + const int64_t timestamp = ObTimeUtility::current_time(); + ObPartitionBroadcastTask task; + for (int64_t i = 0; OB_SUCC(ret) && i < replicas.count(); i++) { + task.reset(); + const ObPartitionReplica& replica = replicas.at(i); + if (OB_FAIL(task.init(replica.table_id_, replica.partition_id_, replica.partition_cnt_, timestamp))) { + LOG_WARN("fail to init task", KR(ret), K(replica)); + } else if (OB_FAIL(GCTX.ob_service_->submit_broadcast_task(task))) { + LOG_WARN("fail to submit broadcast task", KR(ret), K(task)); + } + } + LOG_DEBUG("submit broadcast task", KR(ret), K(replicas)); + } + return ret; +} + void ObPartitionTableUpdater::stop() { if (!inited_) { diff --git a/src/observer/ob_partition_table_updater.h b/src/observer/ob_partition_table_updater.h index c9ddd1543..cfa028ac4 100644 --- a/src/observer/ob_partition_table_updater.h +++ b/src/observer/ob_partition_table_updater.h @@ -83,6 +83,15 @@ public: { return false; } + inline bool need_assign_when_equal() const + { + return false; + } + inline int assign_when_equal(const ObPTUpdateRoleTask& other) + { + UNUSED(other); + return common::OB_NOT_SUPPORTED; + } TO_STRING_KV(K_(pkey), K_(data_version), K_(first_submit_time)); private: @@ -127,6 +136,15 @@ public: } bool is_barrier() const; static bool is_barrier(const common::ObPartitionKey& pkey); + inline bool need_assign_when_equal() const + { + return false; + } + inline int assign_when_equal(const ObPTUpdateTask& other) + { + UNUSED(other); + return common::OB_NOT_SUPPORTED; + } TO_STRING_KV(K_(part_key), K_(data_version), K_(first_submit_time), K_(is_remove), K_(with_role)); @@ -232,6 +250,7 @@ private: const common::ObIArray& replicas, const bool with_role); int do_batch_execute(const int64_t start_time, const common::ObIArray& tasks, const common::ObIArray& replicas, const common::ObRole new_role); + int submit_broadcast_tasks(const common::ObIArray& replicas); private: bool inited_; diff --git a/src/observer/ob_pg_partition_meta_table_updater.h b/src/observer/ob_pg_partition_meta_table_updater.h index ad7de2f1a..c5348ff3f 100644 --- a/src/observer/ob_pg_partition_meta_table_updater.h +++ b/src/observer/ob_pg_partition_meta_table_updater.h @@ -57,6 +57,15 @@ public: { return add_timestamp_; } + inline bool need_assign_when_equal() const + { + return false; + } + inline int assign_when_equal(const ObPGPartitionMTUpdateTask& other) + { + UNUSED(other); + return common::OB_NOT_SUPPORTED; + } TO_STRING_KV(K_(pkey), K_(add_timestamp), K_(update_type), K_(version)); private: diff --git a/src/observer/ob_rebuild_flag_reporter.h b/src/observer/ob_rebuild_flag_reporter.h index 9571d9c0e..12a2858e4 100644 --- a/src/observer/ob_rebuild_flag_reporter.h +++ b/src/observer/ob_rebuild_flag_reporter.h @@ -59,6 +59,15 @@ public: { return false; } + inline bool need_assign_when_equal() const + { + return false; + } + inline int assign_when_equal(const ObRebuildFlagReporter& other) + { + UNUSED(other); + return common::OB_NOT_SUPPORTED; + } TO_STRING_KV(K_(part_key), K_(server), K_(rebuild_flag)); diff --git a/src/observer/ob_rpc_processor_simple.cpp b/src/observer/ob_rpc_processor_simple.cpp index 461ac9082..b4032fccf 100644 --- a/src/observer/ob_rpc_processor_simple.cpp +++ b/src/observer/ob_rpc_processor_simple.cpp @@ -1235,6 +1235,18 @@ int ObRpcBatchGetRoleP::process() return ret; } +int ObRpcBroadcastLocationsP::process() +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(gctx_.ob_service_)) { + ret = OB_INVALID_ARGUMENT; + LOG_ERROR("invalid argument", K(gctx_.ob_service_), K(ret)); + } else { + ret = gctx_.ob_service_->broadcast_locations(arg_, result_); + } + return ret; +} + int ObSyncPGPartitionMTP::process() { int ret = OB_SUCCESS; diff --git a/src/observer/ob_rpc_processor_simple.h b/src/observer/ob_rpc_processor_simple.h index 4feda05b8..c53508937 100644 --- a/src/observer/ob_rpc_processor_simple.h +++ b/src/observer/ob_rpc_processor_simple.h @@ -174,6 +174,7 @@ OB_DEFINE_PROCESSOR_S(Srv, OB_GET_MEMBER_LIST_AND_LEADER, ObRpcGetMemberListAndL OB_DEFINE_PROCESSOR_S(Srv, OB_GET_MEMBER_LIST_AND_LEADER_V2, ObRpcGetMemberListAndLeaderV2P); OB_DEFINE_PROCESSOR_S(Srv, OB_BATCH_GET_MEMBER_LIST_AND_LEADER, ObRpcBatchGetMemberListAndLeaderP); OB_DEFINE_PROCESSOR_S(Srv, OB_BATCH_GET_ROLE, ObRpcBatchGetRoleP); +OB_DEFINE_PROCESSOR_S(Srv, OB_BROADCAST_LOCATIONS, ObRpcBroadcastLocationsP); OB_DEFINE_PROCESSOR_S(Srv, OB_BATCH_GET_PROTECTION_LEVEL, ObRpcBatchGetProtectionLevelP); OB_DEFINE_PROCESSOR_S(Srv, OB_CHECK_NEED_OFFLINE_REPLICA, ObRpcCheckNeedOffineReplicaP); OB_DEFINE_PROCESSOR_S(Srv, OB_CHECK_FLASHBACK_INFO_DUMP, ObRpcCheckFlashbackInfoDumpP); diff --git a/src/observer/ob_server.cpp b/src/observer/ob_server.cpp index 970ecd4d0..53038741a 100644 --- a/src/observer/ob_server.cpp +++ b/src/observer/ob_server.cpp @@ -235,6 +235,8 @@ int ObServer::init(const ObServerOptions& opts, const ObPLogWriterCfg& log_cfg) LOG_ERROR("init interrupt fail", K(ret)); } else if (OB_FAIL(rs_mgr_.init(&rs_rpc_proxy_, &config_, &sql_proxy_))) { LOG_ERROR("init rs_mgr_ failed", K(ret)); + } else if (OB_FAIL(server_tracer_.init(rs_rpc_proxy_, sql_proxy_))) { + LOG_WARN("init server tracer failed", K(ret)); } else if (OB_FAIL(init_ob_service())) { LOG_ERROR("init ob service fail", K(ret)); } else if (OB_FAIL(init_root_service())) { @@ -262,8 +264,6 @@ int ObServer::init(const ObServerOptions& opts, const ObPLogWriterCfg& log_cfg) ObPartitionService::get_instance().get_locality_manager(), config_.cluster_id))) { LOG_WARN("location fetcher init failed", K(ret)); - } else if (OB_FAIL(server_tracer_.init(rs_rpc_proxy_, sql_proxy_))) { - LOG_WARN("init server tracer failed", K(ret)); } else if (OB_FAIL(location_cache_.init(schema_service_, config_, server_tracer_, @@ -1310,7 +1310,7 @@ int ObServer::init_global_kvcache() int ObServer::init_ob_service() { int ret = OB_SUCCESS; - if (OB_FAIL(ob_service_.init(sql_proxy_))) { + if (OB_FAIL(ob_service_.init(sql_proxy_, server_tracer_))) { LOG_ERROR("oceanbase service init failed", K(ret)); } return ret; @@ -1679,7 +1679,7 @@ int ObServer::init_gc_partition_adapter() return ret; } -int ObServer::get_network_speed_from_sysfs(int64_t &network_speed) +int ObServer::get_network_speed_from_sysfs(int64_t& network_speed) { int ret = OB_SUCCESS; // sys_bkgd_net_percentage_ = config_.sys_bkgd_net_percentage; @@ -1715,9 +1715,9 @@ char* strtrim(char* str) return str; } -static int64_t nic_rate_parse(const char *str, bool &valid) +static int64_t nic_rate_parse(const char* str, bool& valid) { - char *p_unit = nullptr; + char* p_unit = nullptr; int64_t value = 0; if (OB_ISNULL(str) || '\0' == str[0]) { @@ -1731,22 +1731,15 @@ static int64_t nic_rate_parse(const char *str, bool &valid) valid = false; } else if (value <= 0) { valid = false; - } else if (0 == STRCASECMP("bit", p_unit) - || 0 == STRCASECMP("b", p_unit)) { + } else if (0 == STRCASECMP("bit", p_unit) || 0 == STRCASECMP("b", p_unit)) { // do nothing - } else if (0 == STRCASECMP("kbit", p_unit) - || 0 == STRCASECMP("kb", p_unit) - || 0 == STRCASECMP("k", p_unit)) { + } else if (0 == STRCASECMP("kbit", p_unit) || 0 == STRCASECMP("kb", p_unit) || 0 == STRCASECMP("k", p_unit)) { value <<= 10; - } else if ('\0' == *p_unit - || 0 == STRCASECMP("mbit", p_unit) - || 0 == STRCASECMP("mb", p_unit) - || 0 == STRCASECMP("m", p_unit)) { + } else if ('\0' == *p_unit || 0 == STRCASECMP("mbit", p_unit) || 0 == STRCASECMP("mb", p_unit) || + 0 == STRCASECMP("m", p_unit)) { // default is meta bit value <<= 20; - } else if (0 == STRCASECMP("gbit", p_unit) - || 0 == STRCASECMP("gb", p_unit) - || 0 == STRCASECMP("g", p_unit)) { + } else if (0 == STRCASECMP("gbit", p_unit) || 0 == STRCASECMP("gb", p_unit) || 0 == STRCASECMP("g", p_unit)) { value <<= 30; } else { valid = false; @@ -1756,17 +1749,16 @@ static int64_t nic_rate_parse(const char *str, bool &valid) return value; } -int ObServer::get_network_speed_from_config_file(int64_t &network_speed) +int ObServer::get_network_speed_from_config_file(int64_t& network_speed) { int ret = OB_SUCCESS; - const char *nic_rate_path = "etc/nic.rate.config"; - const int64_t MAX_NIC_CONFIG_FILE_SIZE = 1 << 10; // 1KB - FILE *fp = nullptr; - char *buf = nullptr; + const char* nic_rate_path = "etc/nic.rate.config"; + const int64_t MAX_NIC_CONFIG_FILE_SIZE = 1 << 10; // 1KB + FILE* fp = nullptr; + char* buf = nullptr; static int nic_rate_file_exist = 1; - if (OB_ISNULL(buf = static_cast(ob_malloc(MAX_NIC_CONFIG_FILE_SIZE + 1, - ObModIds::OB_BUFFER)))) { + if (OB_ISNULL(buf = static_cast(ob_malloc(MAX_NIC_CONFIG_FILE_SIZE + 1, ObModIds::OB_BUFFER)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_ERROR("alloc buffer failed", LITERAL_K(MAX_NIC_CONFIG_FILE_SIZE), K(ret)); } else if (OB_ISNULL(fp = fopen(nic_rate_path, "r"))) { @@ -1791,7 +1783,7 @@ int ObServer::get_network_speed_from_config_file(int64_t &network_speed) } memset(buf, 0, MAX_NIC_CONFIG_FILE_SIZE + 1); fread(buf, 1, MAX_NIC_CONFIG_FILE_SIZE, fp); - char *prate = nullptr; + char* prate = nullptr; if (OB_UNLIKELY(0 != ferror(fp))) { ret = OB_IO_ERROR; @@ -1815,13 +1807,13 @@ int ObServer::get_network_speed_from_config_file(int64_t &network_speed) ret = OB_INVALID_ARGUMENT; LOG_ERROR("invalid NIC Config file", K(ret)); } - } // else + } // else if (OB_UNLIKELY(0 != fclose(fp))) { ret = OB_IO_ERROR; LOG_ERROR("Close NIC Config file failed", K(ret)); } - } // else + } // else if (OB_LIKELY(nullptr != buf)) { ob_free(buf); buf = nullptr; @@ -1849,10 +1841,7 @@ int ObServer::init_bandwidth_throttle() if (OB_FAIL(bandwidth_throttle_.init(rate))) { LOG_WARN("failed to init bandwidth throttle", K(ret), K(rate), K(network_speed)); } else { - LOG_INFO("succeed to init_bandwidth_throttle", - K(sys_bkgd_net_percentage_), - K(network_speed), - K(rate)); + LOG_INFO("succeed to init_bandwidth_throttle", K(sys_bkgd_net_percentage_), K(network_speed), K(rate)); ethernet_speed_ = network_speed; } } @@ -1886,8 +1875,10 @@ int ObServer::reload_bandwidth_throttle_limit(int64_t network_speed) LOG_WARN("failed to reset bandwidth throttle", K(ret), K(rate), K(ethernet_speed_)); } else { LOG_INFO("succeed to reload_bandwidth_throttle_limit", - "old_percentage", sys_bkgd_net_percentage_, - "new_percentage", sys_bkgd_net_percentage, + "old_percentage", + sys_bkgd_net_percentage_, + "new_percentage", + sys_bkgd_net_percentage, K(network_speed), K(rate)); sys_bkgd_net_percentage_ = sys_bkgd_net_percentage; @@ -2134,11 +2125,10 @@ int ObServer::refresh_temp_table_sess_active_time() return ret; } -ObServer::ObRefreshNetworkSpeedTask::ObRefreshNetworkSpeedTask() -: obs_(nullptr), is_inited_(false) +ObServer::ObRefreshNetworkSpeedTask::ObRefreshNetworkSpeedTask() : obs_(nullptr), is_inited_(false) {} -int ObServer::ObRefreshNetworkSpeedTask::init(ObServer *obs, int tg_id) +int ObServer::ObRefreshNetworkSpeedTask::init(ObServer* obs, int tg_id) { int ret = OB_SUCCESS; if (OB_UNLIKELY(is_inited_)) { @@ -2221,11 +2211,11 @@ int ObServer::init_ctas_clean_up_task() int ObServer::init_refresh_network_speed_task() { - int ret = OB_SUCCESS; - if (OB_FAIL(refresh_network_speed_task_.init(this, lib::TGDefIDs::ServerGTimer))) { - LOG_WARN("fail to init refresh network speed task", K(ret)); - } - return ret; + int ret = OB_SUCCESS; + if (OB_FAIL(refresh_network_speed_task_.init(this, lib::TGDefIDs::ServerGTimer))) { + LOG_WARN("fail to init refresh network speed task", K(ret)); + } + return ret; } // @@Query cleanup rules for built tables and temporary tables: diff --git a/src/observer/ob_server_schema_updater.h b/src/observer/ob_server_schema_updater.h index 34f309fb4..37e5ff6fa 100644 --- a/src/observer/ob_server_schema_updater.h +++ b/src/observer/ob_server_schema_updater.h @@ -71,6 +71,15 @@ public: { return schema_info_.get_schema_version(); } + inline bool need_assign_when_equal() const + { + return false; + } + inline int assign_when_equal(const ObServerSchemaTask& other) + { + UNUSED(other); + return common::OB_NOT_SUPPORTED; + } TO_STRING_KV(K_(type), K_(did_retry), K_(schema_info)); private: diff --git a/src/observer/ob_service.cpp b/src/observer/ob_service.cpp index 0f58e3ee7..7bbe70520 100644 --- a/src/observer/ob_service.cpp +++ b/src/observer/ob_service.cpp @@ -55,6 +55,7 @@ #include "observer/ob_dump_task_generator.h" #include "observer/ob_server_schema_updater.h" #include "ob_server_event_history_table_operator.h" +#include "share/ob_alive_server_tracer.h" namespace oceanbase { @@ -118,6 +119,7 @@ ObService::ObService(const ObGlobalContext& gctx) stopped_(false), schema_updater_(), partition_table_updater_(), + partition_location_updater_(), index_status_report_queue_(), rebuild_flag_report_queue_(), pt_checker_(), @@ -129,7 +131,7 @@ ObService::ObService(const ObGlobalContext& gctx) ObService::~ObService() {} -int ObService::init(common::ObMySQLProxy& sql_proxy) +int ObService::init(common::ObMySQLProxy& sql_proxy, share::ObIAliveServerTracer& server_tracer) { int ret = OB_SUCCESS; @@ -148,6 +150,9 @@ int ObService::init(common::ObMySQLProxy& sql_proxy) LOG_ERROR("client_manager_.initialize failed", "self_addr", gctx_.self_addr_, K(ret)); } else if (OB_FAIL(partition_table_updater_.init())) { LOG_WARN("init partition table updater failed", K(ret)); + } else if (OB_FAIL(partition_location_updater_.init( + *this, GCTX.par_ser_, GCTX.srv_rpc_proxy_, GCTX.location_cache_, server_tracer))) { + LOG_WARN("init partition location updater failed", KR(ret)); } else if (OB_FAIL(checksum_updater_.init())) { LOG_WARN("fail to init checksum updater", K(ret)); } else if (OB_FAIL(ObPGPartitionMTUpdater::get_instance().init())) { @@ -269,6 +274,7 @@ void ObService::stop() stopped_ = true; schema_updater_.stop(); partition_table_updater_.stop(); + partition_location_updater_.stop(); checksum_updater_.stop(); ObPGPartitionMTUpdater::get_instance().stop(); index_status_report_queue_.stop(); @@ -285,6 +291,7 @@ void ObService::wait() } else { schema_updater_.wait(); partition_table_updater_.wait(); + partition_location_updater_.wait(); checksum_updater_.wait(); ObPGPartitionMTUpdater::get_instance().wait(); index_status_report_queue_.wait(); @@ -915,8 +922,7 @@ int ObService::submit_pt_update_role_task(const ObPartitionKey& pkey) return ret; } -int ObService::submit_pt_update_task( - const ObPartitionKey& part_key, const bool need_report_checksum) +int ObService::submit_pt_update_task(const ObPartitionKey& part_key, const bool need_report_checksum) { int ret = OB_SUCCESS; const bool is_remove = false; @@ -927,7 +933,7 @@ int ObService::submit_pt_update_task( } else if (!part_key.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(part_key), K(ret)); - } else if (OB_FAIL(partition_table_updater_.async_update(part_key, false/*with_role*/))) { + } else if (OB_FAIL(partition_table_updater_.async_update(part_key, false /*with_role*/))) { LOG_WARN("async_update failed", K(part_key), K(ret)); } else if (need_report_checksum) { if (part_key.is_pg()) { @@ -2904,8 +2910,7 @@ int ObService::report_replica() // The partition has been deleted. There is no need to trigger the report ret = OB_SUCCESS; } - } else if (OB_FAIL(submit_pt_update_task( - partition->get_partition_key(), true /*need report checksum*/))) { + } else if (OB_FAIL(submit_pt_update_task(partition->get_partition_key(), true /*need report checksum*/))) { if (OB_PARTITION_NOT_EXIST == ret) { // The GC thread is already working, // and deleted during traversal, the replica has been deleted needs to be avoided blocking the start process @@ -2915,10 +2920,8 @@ int ObService::report_replica() LOG_WARN( "submit partition table update task failed", K(ret), "partition_key", partition->get_partition_key()); } - } else if (OB_FAIL(submit_pt_update_role_task( - partition->get_partition_key()))) { - LOG_WARN("fail to submit pt update role task", K(ret), - "pkey", partition->get_partition_key()); + } else if (OB_FAIL(submit_pt_update_role_task(partition->get_partition_key()))) { + LOG_WARN("fail to submit pt update role task", K(ret), "pkey", partition->get_partition_key()); } else { // Update partition meta table without concern for error codes submit_pg_pt_update_task(pkeys); @@ -3584,5 +3587,45 @@ int ObService::broadcast_rs_list(const ObRsListArg& arg) } return ret; } +int ObService::submit_broadcast_task(const ObPartitionBroadcastTask& task) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("service do not init", KR(ret), K(task)); + } else if (!task.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(task)); + } else if (OB_FAIL(partition_location_updater_.submit_broadcast_task(task))) { + LOG_WARN("submit broadcast task failed", KR(ret), K(task)); + } + return ret; +} + +int ObService::broadcast_locations(const obrpc::ObPartitionBroadcastArg& arg, obrpc::ObPartitionBroadcastResult& result) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("service do not init", KR(ret), K(arg)); + } else if (!arg.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(arg)); + } else { + ObPartitionUpdateTask task; + for (int64_t i = 0; OB_SUCC(ret) && i < arg.keys_.count(); i++) { + const ObPartitionBroadcastTask& key = arg.keys_.at(i); + task.reset(); + if (OB_FAIL(task.init(key.get_table_id(), key.get_partition_id(), key.get_timestamp()))) { + LOG_WARN("fail to init task", KR(ret), K(key)); + } else if (OB_FAIL(partition_location_updater_.submit_update_task(task))) { + LOG_WARN("fail to submit update task", KR(ret), K(task)); + } + } + } + result.ret_ = ret; + LOG_DEBUG("receive broadcast locations", KR(ret), K(arg)); + return ret; +} } // end namespace observer } // end namespace oceanbase diff --git a/src/observer/ob_service.h b/src/observer/ob_service.h index 7d827261a..7d6017a8a 100644 --- a/src/observer/ob_service.h +++ b/src/observer/ob_service.h @@ -20,6 +20,7 @@ #include "observer/ob_lease_state_mgr.h" #include "observer/ob_heartbeat.h" #include "observer/ob_partition_table_updater.h" +#include "observer/ob_partition_location_updater.h" #include "observer/ob_sstable_checksum_updater.h" #include "observer/ob_server_schema_updater.h" #include "observer/ob_pg_partition_meta_table_updater.h" @@ -38,6 +39,7 @@ namespace share { class ObSSTableDataChecksumItem; class ObSSTableColumnChecksumItem; class ObPGPartitionMTUpdateItem; +class ObIAliveServerTracer; } // namespace share namespace storage { class ObFrozenStatus; @@ -70,7 +72,7 @@ public: explicit ObService(const ObGlobalContext& gctx); virtual ~ObService(); - int init(common::ObMySQLProxy& sql_proxy); + int init(common::ObMySQLProxy& sql_proxy, share::ObIAliveServerTracer& server_tracer); int start(); void set_stop(); void stop(); @@ -101,8 +103,8 @@ public: //////////////////////////////////////////////////////////////// // ObIPartitionReport interface - virtual int submit_pt_update_task(const common::ObPartitionKey& part_key, - const bool need_report_checksum = true) override; + virtual int submit_pt_update_task( + const common::ObPartitionKey& part_key, const bool need_report_checksum = true) override; virtual int submit_pt_update_role_task(const common::ObPartitionKey& part_key) override; virtual void submit_pg_pt_update_task(const common::ObPartitionArray& pg_partitions) override; virtual int submit_checksum_update_task(const common::ObPartitionKey& part_key, const uint64_t sstable_id, @@ -271,6 +273,8 @@ public: int cancel_sys_task(const share::ObTaskId& task_id); int refresh_memory_stat(); int broadcast_rs_list(const obrpc::ObRsListArg& arg); + int submit_broadcast_task(const share::ObPartitionBroadcastTask& task); + int broadcast_locations(const obrpc::ObPartitionBroadcastArg& arg, obrpc::ObPartitionBroadcastResult& result); //////////////////////////////////////////////////////////////// // misc functions int64_t get_partition_table_updater_user_queue_size() const; @@ -307,6 +311,7 @@ private: ObServerSchemaUpdater schema_updater_; ObPartitionTableUpdater partition_table_updater_; + ObPartitionLocationUpdater partition_location_updater_; ObIndexStatusUpdater index_updater_; ObSSTableChecksumUpdater checksum_updater_; ObUniqTaskQueue index_status_report_queue_; diff --git a/src/observer/ob_srv_xlator_primary.cpp b/src/observer/ob_srv_xlator_primary.cpp index 6629cc265..a9681f9f4 100644 --- a/src/observer/ob_srv_xlator_primary.cpp +++ b/src/observer/ob_srv_xlator_primary.cpp @@ -53,6 +53,7 @@ void oceanbase::observer::init_srv_xlator_for_sys(ObSrvRpcXlator* xlator) { RPC_PROCESSOR(ObRpcGetRoleP, gctx_); RPC_PROCESSOR(ObRpcBatchGetRoleP, gctx_); + RPC_PROCESSOR(ObRpcBroadcastLocationsP, gctx_); RPC_PROCESSOR(ObRpcGetMasterRSP, gctx_); RPC_PROCESSOR(ObRpcSetConfigP, gctx_); RPC_PROCESSOR(ObRpcGetConfigP, gctx_); diff --git a/src/observer/ob_sstable_checksum_updater.h b/src/observer/ob_sstable_checksum_updater.h index e02bd12e0..b2c8071f9 100644 --- a/src/observer/ob_sstable_checksum_updater.h +++ b/src/observer/ob_sstable_checksum_updater.h @@ -75,6 +75,15 @@ public: { return add_timestamp_; } + inline bool need_assign_when_equal() const + { + return false; + } + inline int assign_when_equal(const ObSSTableChecksumUpdateTask& other) + { + UNUSED(other); + return common::OB_NOT_SUPPORTED; + } TO_STRING_KV(K_(pkey), K_(sstable_id), K_(sstable_type), K_(is_remove)); private: diff --git a/src/observer/ob_uniq_task_queue.h b/src/observer/ob_uniq_task_queue.h index aa52c46ee..557f5a280 100644 --- a/src/observer/ob_uniq_task_queue.h +++ b/src/observer/ob_uniq_task_queue.h @@ -243,8 +243,17 @@ int ObUniqTaskQueue::add(const Task& task) const Task* stored_task = NULL; if (OB_FAIL(task_map_.set_refactored(task, task))) { if (common::OB_HASH_EXIST == ret) { - ret = common::OB_EAGAIN; - SERVER_LOG(TRACE, "same task exist", K(task)); + if (task.need_assign_when_equal()) { + if (NULL == (stored_task = task_map_.get(task))) { + ret = common::OB_ERR_SYS; + SERVER_LOG(WARN, "get inserted task failed", K(ret), K(task)); + } else if (OB_FAIL(const_cast(stored_task)->assign_when_equal(task))) { + SERVER_LOG(WARN, "assign task failed", K(ret), K(task)); + } + } else { + ret = common::OB_EAGAIN; + SERVER_LOG(TRACE, "same task exist", K(task)); + } } else { SERVER_LOG(WARN, "insert into hash failed", K(ret), K(task)); } diff --git a/src/rootserver/ob_rs_async_rpc_proxy.h b/src/rootserver/ob_rs_async_rpc_proxy.h index 18df4fea7..08ac324f5 100644 --- a/src/rootserver/ob_rs_async_rpc_proxy.h +++ b/src/rootserver/ob_rs_async_rpc_proxy.h @@ -459,6 +459,8 @@ RPC_F(obrpc::OB_GET_MIN_SSTABLE_SCHEMA_VERSION, obrpc::ObGetMinSSTableSchemaVers RPC_F(obrpc::OB_BATCH_GET_MEMBER_LIST_AND_LEADER, obrpc::ObLocationRpcRenewArg, obrpc::ObLocationRpcRenewResult, ObBatchRpcRenewLocProxy); RPC_F(obrpc::OB_BATCH_GET_ROLE, obrpc::ObBatchGetRoleArg, obrpc::ObBatchGetRoleResult, ObBatchGetRoleProxy); +RPC_F(obrpc::OB_BROADCAST_LOCATIONS, obrpc::ObPartitionBroadcastArg, obrpc::ObPartitionBroadcastResult, + ObBroadcastLocationProxy); RPC_F(obrpc::OB_BATCH_GET_PROTECTION_LEVEL, obrpc::ObBatchCheckLeaderArg, obrpc::ObBatchCheckRes, ObBatchGetProtectionLevelProxy); RPC_F(obrpc::OB_CHECK_NEED_OFFLINE_REPLICA, obrpc::ObTenantSchemaVersions, obrpc::ObGetPartitionCountResult, diff --git a/src/share/CMakeLists.txt b/src/share/CMakeLists.txt index d9afaa0bd..4e8b183a0 100644 --- a/src/share/CMakeLists.txt +++ b/src/share/CMakeLists.txt @@ -142,6 +142,7 @@ ob_set_subtarget(ob_share partition_table partition_table/ob_united_pt_operator.cpp partition_table/ob_inmemory_partition_table.cpp partition_table/ob_location_update_task.cpp + partition_table/ob_partition_location_task.cpp partition_table/ob_partition_info.cpp partition_table/ob_partition_location.cpp partition_table/ob_partition_location_cache.cpp diff --git a/src/share/ob_alive_server_tracer.cpp b/src/share/ob_alive_server_tracer.cpp index c17e9d022..938cba4c7 100644 --- a/src/share/ob_alive_server_tracer.cpp +++ b/src/share/ob_alive_server_tracer.cpp @@ -189,6 +189,25 @@ int ObAliveServerMap::refresh_server_list( return ret; } +int ObAliveServerMap::get_active_server_list(common::ObIArray& addrs) const +{ + int ret = OB_SUCCESS; + if (!is_inited_) { + ret = OB_NOT_INIT; + LOG_WARN("not init", KR(ret)); + } else { + addrs.reset(); + ObLatchRGuard guard(lock_, ObLatchIds::ALIVE_SERVER_TRACER_LOCK); + common::hash::ObHashSet::const_iterator iter; + for (iter = active_servers_.begin(); OB_SUCC(ret) && iter != active_servers_.end(); ++iter) { + if (OB_FAIL(addrs.push_back(iter->first))) { + LOG_WARN("fail to push back addr", KR(ret)); + } + } + } + return ret; +} + ObAliveServerRefreshTask::ObAliveServerRefreshTask(ObAliveServerTracer& tracer) : tracer_(tracer), is_inited_(false) {} @@ -371,5 +390,22 @@ int ObAliveServerTracer::get_primary_cluster_id(int64_t& cluster_id) const UNUSED(cluster_id); return OB_NOT_SUPPORTED; } +int ObAliveServerTracer::get_active_server_list(common::ObIArray& addrs) const +{ + int ret = OB_SUCCESS; + if (!is_inited_) { + ret = OB_NOT_INIT; + LOG_WARN("not init", KR(ret)); + } else { + const ObAliveServerMap* volatile map = cur_map_; + if (OB_ISNULL(map)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("null pointer", KR(ret)); + } else if (OB_FAIL(map->get_active_server_list(addrs))) { + LOG_WARN("check server alive failed", KR(ret), K(addrs)); + } + } + return ret; +} } // end namespace share } // end namespace oceanbase diff --git a/src/share/ob_alive_server_tracer.h b/src/share/ob_alive_server_tracer.h index 1d48bfe2a..20624623f 100644 --- a/src/share/ob_alive_server_tracer.h +++ b/src/share/ob_alive_server_tracer.h @@ -40,6 +40,7 @@ public: const common::ObAddr& addr, bool& alive, bool& is_server_exist, int64_t& trace_time) const = 0; virtual int get_primary_cluster_id(int64_t& cluster_id) const = 0; + virtual int get_active_server_list(common::ObIArray& addrs) const = 0; }; class ObAliveServerMap : public ObIAliveServerTracer { @@ -61,6 +62,7 @@ public: UNUSED(cluster_id); return common::OB_OP_NOT_ALLOW; } + virtual int get_active_server_list(common::ObIArray& addrs) const; private: virtual int refresh_server_list(const common::ObIArray& server_list, @@ -106,6 +108,7 @@ public: const common::ObAddr& addr, bool& alive, bool& is_server_exist, int64_t& trace_time) const; virtual int get_primary_cluster_id(int64_t& cluster_id) const; virtual int refresh(); + virtual int get_active_server_list(common::ObIArray& addrs) const; private: int refresh_primary_cluster_id(); diff --git a/src/share/ob_rpc_struct.cpp b/src/share/ob_rpc_struct.cpp index 58c9419ff..77238d03a 100644 --- a/src/share/ob_rpc_struct.cpp +++ b/src/share/ob_rpc_struct.cpp @@ -4530,5 +4530,35 @@ OB_SERIALIZE_MEMBER(ObDropRestorePointArg, tenant_id_, name_); OB_SERIALIZE_MEMBER(ObCheckBuildIndexTaskExistArg, tenant_id_, task_id_, scheduler_id_); +OB_SERIALIZE_MEMBER(ObPartitionBroadcastArg, keys_); +bool ObPartitionBroadcastArg::is_valid() const +{ + return keys_.count() > 0; +} +int ObPartitionBroadcastArg::assign(const ObPartitionBroadcastArg& other) +{ + int ret = OB_SUCCESS; + if (this == &other) { + } else if (OB_FAIL(keys_.assign(other.keys_))) { + LOG_WARN("fail to assign keys", KR(ret), K(other)); + } + return ret; +} + +OB_SERIALIZE_MEMBER(ObPartitionBroadcastResult, ret_); +bool ObPartitionBroadcastResult::is_valid() const +{ + return true; +} +int ObPartitionBroadcastResult::assign(const ObPartitionBroadcastResult& other) +{ + int ret = OB_SUCCESS; + if (this == &other) { + } else { + ret_ = other.ret_; + } + return ret; +} + } // end namespace obrpc } // namespace oceanbase diff --git a/src/share/ob_rpc_struct.h b/src/share/ob_rpc_struct.h index 29655770e..165e147be 100644 --- a/src/share/ob_rpc_struct.h +++ b/src/share/ob_rpc_struct.h @@ -47,7 +47,8 @@ #include "share/restore/ob_restore_args.h" // ObRestoreArgs #include "rootserver/ob_rs_job_table_operator.h" #include "sql/executor/ob_task_id.h" -#include "sql/parser/ob_item_type.h" // ObCacheType +#include "sql/parser/ob_item_type.h" // ObCacheType +#include "share/partition_table/ob_partition_location_task.h" // ObPartitionBroadcastTask namespace oceanbase { namespace rootserver { @@ -8044,6 +8045,44 @@ public: TO_STRING_KV(K_(tenant_id), K_(task_id), K_(scheduler_id)); }; +struct ObPartitionBroadcastArg { + OB_UNIS_VERSION(1); + +public: + ObPartitionBroadcastArg() : keys_() + {} + ~ObPartitionBroadcastArg() + {} + bool is_valid() const; + int assign(const ObPartitionBroadcastArg& other); + TO_STRING_KV(K_(keys)); + +private: + DISALLOW_COPY_AND_ASSIGN(ObPartitionBroadcastArg); + +public: + common::ObSEArray keys_; +}; + +struct ObPartitionBroadcastResult { + OB_UNIS_VERSION(1); + +public: + ObPartitionBroadcastResult() : ret_(common::OB_SUCCESS) + {} + ~ObPartitionBroadcastResult() + {} + bool is_valid() const; + int assign(const ObPartitionBroadcastResult& other); + TO_STRING_KV(K_(ret)); + +private: + DISALLOW_COPY_AND_ASSIGN(ObPartitionBroadcastResult); + +public: + int ret_; +}; + } // end namespace obrpc } // end namespace oceanbase #endif diff --git a/src/share/ob_srv_rpc_proxy.h b/src/share/ob_srv_rpc_proxy.h index f1a04b8b1..8fe9f85f7 100644 --- a/src/share/ob_srv_rpc_proxy.h +++ b/src/share/ob_srv_rpc_proxy.h @@ -139,6 +139,8 @@ public: RPC_AP(PR3 batch_get_member_list_and_leader, OB_BATCH_GET_MEMBER_LIST_AND_LEADER, (obrpc::ObLocationRpcRenewArg), obrpc::ObLocationRpcRenewResult); RPC_AP(PR3 batch_get_role, OB_BATCH_GET_ROLE, (obrpc::ObBatchGetRoleArg), obrpc::ObBatchGetRoleResult); + RPC_AP(PR5 broadcast_locations, OB_BROADCAST_LOCATIONS, (obrpc::ObPartitionBroadcastArg), + obrpc::ObPartitionBroadcastResult); RPC_AP(PR5 check_has_need_offline_replica, OB_CHECK_NEED_OFFLINE_REPLICA, (obrpc::ObTenantSchemaVersions), obrpc::ObGetPartitionCountResult); RPC_AP(PR5 check_flashback_info_dump, OB_CHECK_FLASHBACK_INFO_DUMP, (obrpc::ObCheckFlashbackInfoArg), diff --git a/src/share/parameter/ob_parameter_seed.ipp b/src/share/parameter/ob_parameter_seed.ipp index 4fb7fb6eb..ab394c249 100644 --- a/src/share/parameter/ob_parameter_seed.ipp +++ b/src/share/parameter/ob_parameter_seed.ipp @@ -69,7 +69,8 @@ DEF_INT(tenant_task_queue_size, OB_CLUSTER_PARAMETER, "65536", "[1024,]", "the size of the task queue for each tenant. Range: [1024,+∞)", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); DEF_CAP_WITH_CHECKER(memory_limit, OB_CLUSTER_PARAMETER, "0", common::ObConfigMemoryLimitChecker, "0, [8G,)", - "the size of the memory reserved for internal use(for testing purpose), 0 means follow memory_limit_percentage. Range: 0, [8G,)", + "the size of the memory reserved for internal use(for testing purpose), 0 means follow memory_limit_percentage. " + "Range: 0, [8G,)", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); DEF_CAP(rootservice_memory_limit, OB_CLUSTER_PARAMETER, "2G", "[2G,)", "max memory size which can be used by rs tenant The default value is 2G. Range: [2G,)", @@ -754,12 +755,11 @@ DEF_INT(clog_max_unconfirmed_log_count, OB_TENANT_PARAMETER, "1500", "[100, 5000 ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); DEF_TIME(_ob_clog_timeout_to_force_switch_leader, OB_CLUSTER_PARAMETER, "10s", "[0s, 60m]", - "When log sync is blocking, leader need wait this interval before revoke." - "The default value is 0s, use 0s to close this function. Range: [0s, 60m]", - ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); -DEF_INT(_ob_clog_disk_buffer_cnt, OB_CLUSTER_PARAMETER, "64", "[1, 2000]", - "clog disk buffer cnt. Range: [1, 2000]", - ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); + "When log sync is blocking, leader need wait this interval before revoke." + "The default value is 0s, use 0s to close this function. Range: [0s, 60m]", + ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); +DEF_INT(_ob_clog_disk_buffer_cnt, OB_CLUSTER_PARAMETER, "64", "[1, 2000]", "clog disk buffer cnt. Range: [1, 2000]", + ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); DEF_TIME(_ob_trans_rpc_timeout, OB_CLUSTER_PARAMETER, "3s", "[0s, 3600s]", "transaction rpc timeout(s). Range: [0s, 3600s]", ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); @@ -825,6 +825,14 @@ DEF_TIME(location_cache_refresh_sql_timeout, OB_CLUSTER_PARAMETER, "1s", "[1ms,) ObParameterAttr(Section::LOCATION_CACHE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); DEF_STR(all_server_list, OB_CLUSTER_PARAMETER, "", "all server addr in cluster", ObParameterAttr(Section::LOCATION_CACHE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); +DEF_BOOL(enable_auto_refresh_location_cache, OB_CLUSTER_PARAMETER, "False", "enable auto refresh location", + ObParameterAttr(Section::LOCATION_CACHE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); +DEF_INT(auto_refresh_location_cache_rate_limit, OB_CLUSTER_PARAMETER, "1000", "[1, 100000]", + "Maximum number of partitions to refresh location automatically per second", + ObParameterAttr(Section::LOCATION_CACHE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); +DEF_INT(auto_broadcast_location_cache_rate_limit, OB_CLUSTER_PARAMETER, "1000", "[1, 100000]", + "Maximum number of partitions to broadcast location per second", + ObParameterAttr(Section::LOCATION_CACHE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); //// cache config DEF_INT(clog_cache_priority, OB_CLUSTER_PARAMETER, "1", "[1,)", "clog cache priority. Range: [1, )", @@ -1454,6 +1462,6 @@ DEF_BOOL(_enable_block_file_punch_hole, OB_CLUSTER_PARAMETER, "False", "specifies whether to punch whole when free blocks in block_file", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); DEF_BOOL(_ob_enable_px_for_inner_sql, OB_CLUSTER_PARAMETER, "true", - "specifies whether inner sql uses px. " - "The default value is TRUE. Value: TRUE: turned on FALSE: turned off", - ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); + "specifies whether inner sql uses px. " + "The default value is TRUE. Value: TRUE: turned on FALSE: turned off", + ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); diff --git a/src/share/partition_table/ob_partition_location_cache.h b/src/share/partition_table/ob_partition_location_cache.h index 1a5e99597..1596e5f88 100644 --- a/src/share/partition_table/ob_partition_location_cache.h +++ b/src/share/partition_table/ob_partition_location_cache.h @@ -272,6 +272,15 @@ public: return type_; } bool need_discard() const; + inline bool need_assign_when_equal() const + { + return false; + } + inline int assign_when_equal(const ObLocationAsyncUpdateTask& other) + { + UNUSED(other); + return common::OB_NOT_SUPPORTED; + } TO_STRING_KV(KT_(table_id), K_(partition_id), K_(add_timestamp), K_(cluster_id), K_(type)); private: diff --git a/src/share/partition_table/ob_partition_location_task.cpp b/src/share/partition_table/ob_partition_location_task.cpp new file mode 100644 index 000000000..fc93a52cc --- /dev/null +++ b/src/share/partition_table/ob_partition_location_task.cpp @@ -0,0 +1,264 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX SHARE_PT + +#include "lib/time/ob_time_utility.h" +#include "share/ob_errno.h" +#include "share/ob_task_define.h" +#include "share/partition_table/ob_partition_location_task.h" + +namespace oceanbase { +using namespace common; +namespace share { + +void TSILocationRateLimit::reset() +{ + cnt_ = 0; + start_ts_ = OB_INVALID_TIMESTAMP; +} + +int64_t TSILocationRateLimit::calc_wait_ts(const int64_t cnt, const int64_t exec_ts, const int64_t frequency) +{ + int64_t wait_ts = 0; + int64_t current_ts = ObTimeUtility::current_time(); + if (current_ts - start_ts_ >= ONE_SECOND_US) { // init or >= 1s + cnt_ = cnt; + start_ts_ = current_ts - exec_ts; + } else { + cnt_ += cnt; + } + if (cnt_ > frequency) { + wait_ts = cnt_ / (double)frequency * ONE_SECOND_US - (current_ts - start_ts_); + } + return wait_ts > 0 ? wait_ts : 0; +} + +void TSILocationStatistics::reset() +{ + suc_cnt_ = 0; + fail_cnt_ = 0; + total_exec_us_ = 0; + total_wait_us_ = 0; +} + +int64_t TSILocationStatistics::get_total_cnt() const +{ + return suc_cnt_ + fail_cnt_; +} + +void TSILocationStatistics::calc(const int ret, const int64_t exec_us, const int64_t wait_us, const int64_t cnt) +{ + total_exec_us_ += static_cast(exec_us); + total_wait_us_ += static_cast(wait_us); + if (OB_SUCCESS == ret) { + suc_cnt_ += cnt; + } else { + fail_cnt_ += cnt; + } +} + +int ObPartitionBroadcastTask::init( + const uint64_t table_id, const int64_t partition_id, const int64_t partition_cnt, const int64_t timestamp) +{ + int ret = OB_SUCCESS; + table_id_ = table_id; + partition_id_ = partition_id; + partition_cnt_ = partition_cnt; + timestamp_ = timestamp; + return ret; +} + +void ObPartitionBroadcastTask::reset() +{ + table_id_ = OB_INVALID_ID; + partition_id_ = OB_INVALID_ID; + partition_cnt_ = OB_INVALID_ID; + timestamp_ = OB_INVALID_TIMESTAMP; +} + +bool ObPartitionBroadcastTask::is_valid() const +{ + return OB_INVALID_ID != table_id_ && OB_INVALID_ID != partition_id_ && OB_INVALID_ID != partition_cnt_ && + OB_INVALID_TIMESTAMP != timestamp_; +} + +int ObPartitionBroadcastTask::assign(const ObPartitionBroadcastTask& other) +{ + int ret = OB_SUCCESS; + if (this != &other) { + table_id_ = other.table_id_; + partition_id_ = other.partition_id_; + partition_cnt_ = other.partition_cnt_; + timestamp_ = other.timestamp_; + } + return ret; +} + +bool ObPartitionBroadcastTask::need_process_alone() const +{ + return false; +} + +int64_t ObPartitionBroadcastTask::hash() const +{ + uint64_t hash_val = 0; + hash_val = murmurhash(&table_id_, sizeof(table_id_), hash_val); + hash_val = murmurhash(&partition_id_, sizeof(partition_id_), hash_val); + hash_val = murmurhash(&partition_cnt_, sizeof(partition_cnt_), hash_val); + return hash_val; +} + +bool ObPartitionBroadcastTask::operator==(const ObPartitionBroadcastTask& other) const +{ + bool equal = false; + if (!is_valid() || !other.is_valid()) { + LOG_WARN("invalid argument", "self", *this, K(other)); + } else if (this == &other) { + equal = true; + } else { + equal = (table_id_ == other.table_id_ && partition_id_ == other.partition_id_ && + partition_cnt_ == other.partition_cnt_); + } + return equal; +} + +bool ObPartitionBroadcastTask::compare_without_version(const ObPartitionBroadcastTask& other) const +{ + return (*this == other); +} + +uint64_t ObPartitionBroadcastTask::get_group_id() const +{ + return extract_tenant_id(table_id_); +} + +bool ObPartitionBroadcastTask::is_barrier() const +{ + return false; +} + +bool ObPartitionBroadcastTask::need_assign_when_equal() const +{ + return true; +} + +int ObPartitionBroadcastTask::assign_when_equal(const ObPartitionBroadcastTask& other) +{ + int ret = OB_SUCCESS; + if (*this == other) { + if (other.timestamp_ > timestamp_) { + timestamp_ = other.timestamp_; + } + } else { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("task should be equal", KR(ret), KPC(this), K(other)); + } + return ret; +} + +OB_SERIALIZE_MEMBER(ObPartitionBroadcastTask, table_id_, partition_id_, timestamp_); + +int ObPartitionUpdateTask::init(const uint64_t table_id, const int64_t partition_id, const int64_t timestamp) +{ + int ret = OB_SUCCESS; + table_id_ = table_id; + partition_id_ = partition_id; + timestamp_ = timestamp; + return ret; +} + +void ObPartitionUpdateTask::reset() +{ + table_id_ = OB_INVALID_ID; + partition_id_ = OB_INVALID_ID; + timestamp_ = OB_INVALID_TIMESTAMP; +} + +bool ObPartitionUpdateTask::is_valid() const +{ + return OB_INVALID_ID != table_id_ && OB_INVALID_ID != partition_id_ && OB_INVALID_TIMESTAMP != timestamp_; +} + +int ObPartitionUpdateTask::assign(const ObPartitionUpdateTask& other) +{ + int ret = OB_SUCCESS; + if (this != &other) { + table_id_ = other.table_id_; + partition_id_ = other.partition_id_; + timestamp_ = other.timestamp_; + } + return ret; +} + +bool ObPartitionUpdateTask::need_process_alone() const +{ + return false; +} + +int64_t ObPartitionUpdateTask::hash() const +{ + uint64_t hash_val = 0; + hash_val = murmurhash(&table_id_, sizeof(table_id_), hash_val); + hash_val = murmurhash(&partition_id_, sizeof(partition_id_), hash_val); + return hash_val; +} + +bool ObPartitionUpdateTask::operator==(const ObPartitionUpdateTask& other) const +{ + bool equal = false; + if (!is_valid() || !other.is_valid()) { + LOG_WARN("invalid argument", "self", *this, K(other)); + } else if (this == &other) { + equal = true; + } else { + equal = (table_id_ == other.table_id_ && partition_id_ == other.partition_id_); + } + return equal; +} + +bool ObPartitionUpdateTask::compare_without_version(const ObPartitionUpdateTask& other) const +{ + return (*this == other); +} + +uint64_t ObPartitionUpdateTask::get_group_id() const +{ + return extract_tenant_id(table_id_); +} + +bool ObPartitionUpdateTask::is_barrier() const +{ + return false; +} + +bool ObPartitionUpdateTask::need_assign_when_equal() const +{ + return true; +} + +int ObPartitionUpdateTask::assign_when_equal(const ObPartitionUpdateTask& other) +{ + int ret = OB_SUCCESS; + if (*this == other) { + if (other.timestamp_ > timestamp_) { + timestamp_ = other.timestamp_; + } + } else { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("task should be equal", KR(ret), KPC(this), K(other)); + } + return ret; +} + +} // namespace share +} // namespace oceanbase diff --git a/src/share/partition_table/ob_partition_location_task.h b/src/share/partition_table/ob_partition_location_task.h new file mode 100644 index 000000000..ffebe3aa0 --- /dev/null +++ b/src/share/partition_table/ob_partition_location_task.h @@ -0,0 +1,174 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_SHARE_PARTITION_TABLE_OB_PARTITION_LOCATION_TASK_H_ +#define OCEANBASE_SHARE_PARTITION_TABLE_OB_PARTITION_LOCATION_TASK_H_ + +#include "lib/ob_define.h" +#include "lib/list/ob_dlink_node.h" +#include "lib/oblog/ob_log_module.h" +#include "lib/utility/ob_print_utils.h" +#include "lib/utility/ob_unify_serialize.h" + +namespace oceanbase { +namespace share { +struct TSILocationRateLimit { +public: + const int64_t ONE_SECOND_US = 1 * 1000 * 1000L; // 1s +public: + TSILocationRateLimit() : cnt_(0), start_ts_(common::OB_INVALID_TIMESTAMP) + {} + ~TSILocationRateLimit() + {} + void reset(); + int64_t calc_wait_ts(const int64_t cnt, const int64_t exec_ts, const int64_t frequency); + TO_STRING_KV(K_(cnt), K_(start_ts)); + +public: + int64_t cnt_; + int64_t start_ts_; +}; + +struct TSILocationStatistics { +public: + TSILocationStatistics() : suc_cnt_(0), fail_cnt_(0), total_exec_us_(0), total_wait_us_(0) + {} + ~TSILocationStatistics() + {} + void reset(); + void calc(const int ret, const int64_t exec_us, const int64_t wait_us, const int64_t cnt); + int64_t get_total_cnt() const; + TO_STRING_KV(K_(suc_cnt), K_(fail_cnt), K_(total_exec_us), K_(total_wait_us)); + +public: + int64_t suc_cnt_; + int64_t fail_cnt_; + uint64_t total_exec_us_; + uint64_t total_wait_us_; +}; + +// For Sender of ObPartitionLocationUpdater +class ObPartitionBroadcastTask : public common::ObDLinkBase { +public: + OB_UNIS_VERSION(1); + friend class ObPartitionLocationUpdater; + +public: + ObPartitionBroadcastTask() + : table_id_(common::OB_INVALID_ID), + partition_id_(common::OB_INVALID_ID), + partition_cnt_(common::OB_INVALID_ID), + timestamp_(common::OB_INVALID_TIMESTAMP) + {} + explicit ObPartitionBroadcastTask( + const int64_t table_id, const int64_t partition_id, const int64_t partition_cnt, const int64_t timestamp) + : table_id_(table_id), partition_id_(partition_id), partition_cnt_(partition_cnt), timestamp_(timestamp) + {} + virtual ~ObPartitionBroadcastTask() + {} + + int init(const uint64_t table_id, const int64_t partition_id, const int64_t partition_cnt, const int64_t timestamp); + int assign(const ObPartitionBroadcastTask& other); + void reset(); + bool is_valid() const; + + virtual int64_t hash() const; + virtual bool operator==(const ObPartitionBroadcastTask& other) const; + + uint64_t get_group_id() const; + bool is_barrier() const; + bool need_process_alone() const; + virtual bool compare_without_version(const ObPartitionBroadcastTask& other) const; + bool need_assign_when_equal() const; + int assign_when_equal(const ObPartitionBroadcastTask& other); + + uint64_t get_table_id() const + { + return table_id_; + } + int64_t get_partition_id() const + { + return partition_id_; + } + int64_t get_partition_cnt() const + { + return partition_cnt_; + } + int64_t get_timestamp() const + { + return timestamp_; + } + + TO_STRING_KV(K_(table_id), K_(partition_id), K_(partition_cnt), K_(timestamp)); + +private: + uint64_t table_id_; + int64_t partition_id_; + int64_t partition_cnt_; // won't serialize/deserialize + int64_t timestamp_; +}; + +// For Receiver of ObPartitionLocationUpdater +class ObPartitionUpdateTask : public common::ObDLinkBase { +public: + friend class ObPartitionLocationUpdater; + +public: + ObPartitionUpdateTask() + : table_id_(common::OB_INVALID_ID), partition_id_(common::OB_INVALID_ID), timestamp_(common::OB_INVALID_TIMESTAMP) + {} + explicit ObPartitionUpdateTask(const int64_t table_id, const int64_t partition_id, const int64_t timestamp) + : table_id_(table_id), partition_id_(partition_id), timestamp_(timestamp) + {} + virtual ~ObPartitionUpdateTask() + {} + + int init(const uint64_t table_id, const int64_t partition_id, const int64_t timestamp); + void reset(); + bool is_valid() const; + int assign(const ObPartitionUpdateTask& other); + + virtual int64_t hash() const; + virtual bool operator==(const ObPartitionUpdateTask& other) const; + + uint64_t get_group_id() const; + bool is_barrier() const; + bool need_process_alone() const; + virtual bool compare_without_version(const ObPartitionUpdateTask& other) const; + bool need_assign_when_equal() const; + int assign_when_equal(const ObPartitionUpdateTask& other); + + uint64_t get_table_id() const + { + return table_id_; + } + int64_t get_partition_id() const + { + return partition_id_; + } + int64_t get_timestamp() const + { + return timestamp_; + } + + TO_STRING_KV(K_(table_id), K_(partition_id), K_(timestamp)); + +private: + uint64_t table_id_; + int64_t partition_id_; + int64_t timestamp_; +}; + +} // namespace share +} // namespace oceanbase + +#endif