diff --git a/mittest/simple_server/test_location_service.cpp b/mittest/simple_server/test_location_service.cpp index a01a491719..14285e9068 100644 --- a/mittest/simple_server/test_location_service.cpp +++ b/mittest/simple_server/test_location_service.cpp @@ -362,10 +362,11 @@ TEST_F(TestLocationService, test_clear_tablet_ls_cache) ASSERT_TRUE(cache_size == cache_size_before_renew); // test 1 million cache clear + const bool update_only = false; for (int64_t i = 0; i < 1000000; ++i) { ObTabletLSCache cache; ASSERT_EQ(OB_SUCCESS, cache.init(tenant_id, ObTabletID(i+300000), ObLSID(1002), ObClockGenerator::getClock(), 1)); - ASSERT_EQ(OB_SUCCESS, tablet_ls_service->inner_cache_.update(cache)); + ASSERT_EQ(OB_SUCCESS, tablet_ls_service->inner_cache_.update(cache, update_only)); } cache_size = tablet_ls_service->inner_cache_.size(); ASSERT_TRUE(1000000 == cache_size - cache_size_before_renew); diff --git a/mittest/simple_server/test_tablet_autoinc_mgr.cpp b/mittest/simple_server/test_tablet_autoinc_mgr.cpp index 6aa3438928..e92f427ce9 100644 --- a/mittest/simple_server/test_tablet_autoinc_mgr.cpp +++ b/mittest/simple_server/test_tablet_autoinc_mgr.cpp @@ -192,8 +192,9 @@ TEST_F(TestTabletAutoincMgr, test_lob_tablet_autoinc_location_cache) ASSERT_EQ(OB_SUCCESS, task.result_); // restore old tablet ls cache + const bool update_only = false; for (int64_t i = 0; i < old_tablet_ls_cache.count(); i++) { - ASSERT_EQ(OB_SUCCESS, tablet_ls_service->update_cache_(old_tablet_ls_cache.at(i))); + ASSERT_EQ(OB_SUCCESS, tablet_ls_service->update_cache(old_tablet_ls_cache.at(i), update_only)); } // remove source ls and clear src ls cache diff --git a/src/observer/ob_rpc_processor_simple.cpp b/src/observer/ob_rpc_processor_simple.cpp index 0771b5bdc6..b3d19e74a4 100644 --- a/src/observer/ob_rpc_processor_simple.cpp +++ b/src/observer/ob_rpc_processor_simple.cpp @@ -2856,6 +2856,22 @@ int ObAdminUnlockMemberListP::process() return ret; } +int ObTabletLocationReceiveP::process() +{ + int 
ret = OB_SUCCESS; + if (OB_ISNULL(GCTX.location_service_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("GCTX.location_service_ is nullptr", KR(ret), KP(GCTX.location_service_)); + } else { + FOREACH_CNT_X(it, arg_.get_tasks(), OB_SUCC(ret)) { + if (OB_FAIL(GCTX.location_service_->submit_tablet_update_task(*it))) { + LOG_WARN("failed to submit_tablet_update_tasks", KR(ret)); + } + } + } + result_.set_ret(ret); + return OB_SUCCESS; +} } // end of namespace observer } // end of namespace oceanbase diff --git a/src/observer/ob_rpc_processor_simple.h b/src/observer/ob_rpc_processor_simple.h index 68d8ec9b1e..04dc22f5b6 100644 --- a/src/observer/ob_rpc_processor_simple.h +++ b/src/observer/ob_rpc_processor_simple.h @@ -258,6 +258,7 @@ OB_DEFINE_PROCESSOR_S(Srv, OB_HA_UNLOCK_MEMBER_LIST, ObAdminUnlockMemberListP); OB_DEFINE_PROCESSOR_S(Srv, OB_TABLET_MAJOR_FREEZE, ObRpcTabletMajorFreezeP); // OB_DEFINE_PROCESSOR_S(Srv, OB_KILL_CLIENT_SESSION, ObKillClientSessionP); // OB_DEFINE_PROCESSOR_S(Srv, OB_CLIENT_SESSION_CONNECT_TIME, ObClientSessionConnectTimeP); +OB_DEFINE_PROCESSOR_S(Srv, OB_TABLET_LOCATION_BROADCAST, ObTabletLocationReceiveP); } // end of namespace observer } // end of namespace oceanbase diff --git a/src/observer/ob_srv_xlator_partition.cpp b/src/observer/ob_srv_xlator_partition.cpp index 6604027691..79603421cd 100644 --- a/src/observer/ob_srv_xlator_partition.cpp +++ b/src/observer/ob_srv_xlator_partition.cpp @@ -117,6 +117,7 @@ void oceanbase::observer::init_srv_xlator_for_partition(ObSrvRpcXlator *xlator) RPC_PROCESSOR(ObRegisterTxDataP, gctx_); RPC_PROCESSOR(ObRpcGetLSAccessModeP, gctx_); RPC_PROCESSOR(ObRpcChangeLSAccessModeP, gctx_); + RPC_PROCESSOR(ObTabletLocationReceiveP, gctx_); } void oceanbase::observer::init_srv_xlator_for_migrator(ObSrvRpcXlator *xlator) { diff --git a/src/rootserver/ob_root_utils.h b/src/rootserver/ob_root_utils.h index e8ff9c79bc..b3efd8d31f 100644 --- a/src/rootserver/ob_root_utils.h +++ b/src/rootserver/ob_root_utils.h @@ -685,6 
+685,11 @@ public: const obrpc::ObNotifySwitchLeaderArg::SwitchLeaderComment &comment); static int check_tenant_ls_balance(uint64_t tenant_id, int &check_ret); + template + static int copy_array(const common::ObIArray &src_array, + const int64_t start_pos, + const int64_t end_pos, + common::ObIArray &dst_array); }; template @@ -702,6 +707,33 @@ bool ObRootUtils::is_subset(const common::ObIArray &superset_array, return bret; } +template +int ObRootUtils::copy_array( + const common::ObIArray &src_array, + const int64_t start_pos, + const int64_t end_pos, + common::ObIArray &dst_array) +{ + int ret = common::OB_SUCCESS; + dst_array.reset(); + if (OB_UNLIKELY(start_pos < 0 || start_pos > end_pos || end_pos > src_array.count())) { + ret = common::OB_INVALID_ARGUMENT; + COMMON_LOG(WARN, "invalid start_pos/end_pos", KR(ret), + K(start_pos), K(end_pos), "src_array_cnt", src_array.count()); + } else if (start_pos == end_pos) { + // do nothing + } else if (OB_FAIL(dst_array.reserve(end_pos - start_pos))) { + COMMON_LOG(WARN, "fail to reserve array", KR(ret), "cnt", end_pos - start_pos); + } else { + for (int64_t i = start_pos; OB_SUCC(ret) && i < end_pos; i++) { + if (OB_FAIL(dst_array.push_back(src_array.at(i)))) { + COMMON_LOG(WARN, "fail to push back", KR(ret), K(i)); + } + } // end for + } + return ret; +} + class ObClusterInfoGetter { public: diff --git a/src/rootserver/ob_tenant_transfer_service.cpp b/src/rootserver/ob_tenant_transfer_service.cpp index 9ce98d01dc..c88119483b 100644 --- a/src/rootserver/ob_tenant_transfer_service.cpp +++ b/src/rootserver/ob_tenant_transfer_service.cpp @@ -1095,8 +1095,8 @@ int ObTenantTransferService::generate_transfer_task( const int64_t part_count = min(get_tablet_count_threshold_(), part_list.count()); if (OB_FAIL(transfer_part_list.reserve(part_count))) { LOG_WARN("reserve failed", KR(ret), K(part_count)); - } else if (OB_FAIL(ObCommonIDUtils::gen_unique_id(tenant_id_, task_id))) { - LOG_WARN("gen_unique_id failed", KR(ret), 
K(task_id), K_(tenant_id)); + } else if (OB_FAIL(ObTransferTaskOperator::generate_transfer_task_id(trans, tenant_id_, task_id))) { + LOG_WARN("fail to generate transfer task id", KR(ret), K_(tenant_id)); } else { // process from the back of part_list makes it easier to remove when task is done for (int64_t i = part_list.count() - 1; OB_SUCC(ret) && (i >= part_list.count() - part_count); --i) { diff --git a/src/share/CMakeLists.txt b/src/share/CMakeLists.txt index c599ab42a5..46b6257080 100644 --- a/src/share/CMakeLists.txt +++ b/src/share/CMakeLists.txt @@ -210,7 +210,9 @@ ob_set_subtarget(ob_share common_mixed location_cache/ob_ls_location_map.cpp location_cache/ob_tablet_ls_service.cpp location_cache/ob_tablet_ls_map.cpp + location_cache/ob_tablet_location_broadcast.cpp location_cache/ob_vtable_location_service.cpp + location_cache/ob_tablet_location_refresh_service.cpp inner_table/ob_inner_table_schema.vt.cpp inner_table/ob_inner_table_schema.lob.cpp inner_table/ob_inner_table_schema_misc.ipp diff --git a/src/share/location_cache/ob_location_service.cpp b/src/share/location_cache/ob_location_service.cpp index 05c3682086..9d45256439 100644 --- a/src/share/location_cache/ob_location_service.cpp +++ b/src/share/location_cache/ob_location_service.cpp @@ -297,7 +297,7 @@ int ObLocationService::init( LOG_WARN("location service init twice", KR(ret)); } else if (OB_FAIL(ls_location_service_.init(ls_pt, schema_service, rs_mgr, srv_rpc_proxy))) { LOG_WARN("ls_location_service init failed", KR(ret)); - } else if (OB_FAIL(tablet_ls_service_.init(sql_proxy))) { + } else if (OB_FAIL(tablet_ls_service_.init(schema_service, sql_proxy, srv_rpc_proxy))) { LOG_WARN("tablet_ls_service init failed", KR(ret)); } else if (OB_FAIL(vtable_location_service_.init(server_tracer, rs_mgr, rpc_proxy))) { LOG_WARN("vtable_location_service init failed", KR(ret)); @@ -315,8 +315,9 @@ int ObLocationService::start() LOG_WARN("location service not init", KR(ret)); } else if 
(OB_FAIL(ls_location_service_.start())) { LOG_WARN("ls_location_service start failed", KR(ret)); + } else if (OB_FAIL(tablet_ls_service_.start())) { + LOG_WARN("tablet_ls_service start failed", KR(ret)); } - // tablet_ls_service_ and vtable_location_service_ have no threads need to be started return ret; } @@ -490,6 +491,30 @@ int ObLocationService::renew_tablet_location( return ret; } +int ObLocationService::submit_tablet_broadcast_task(const ObTabletLocationBroadcastTask &task) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", KR(ret)); + } else if (OB_FAIL(tablet_ls_service_.submit_broadcast_task(task))) { + LOG_WARN("failed to submit_broadcast_task by tablet_ls_service_", KR(ret)); + } + return ret; +} + +int ObLocationService::submit_tablet_update_task(const ObTabletLocationBroadcastTask &task) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", KR(ret)); + } else if (OB_FAIL(tablet_ls_service_.submit_update_task(task))) { + LOG_WARN("failed to submit_update_task by tablet_ls_service_", KR(ret)); + } + return ret; +} + ERRSIM_POINT_DEF(EN_CHECK_LS_EXIST_WITH_TENANT_NOT_NORMAL); int ObLocationService::check_ls_exist( diff --git a/src/share/location_cache/ob_location_service.h b/src/share/location_cache/ob_location_service.h index 26c804f962..9b2963e205 100644 --- a/src/share/location_cache/ob_location_service.h +++ b/src/share/location_cache/ob_location_service.h @@ -169,6 +169,9 @@ public: const common::ObTabletID &tablet_id, const int error_code, const bool is_nonblock); + + int submit_tablet_broadcast_task(const ObTabletLocationBroadcastTask &task); + int submit_tablet_update_task(const ObTabletLocationBroadcastTask &task); // ----------------------- End interfaces for tablet to log stream ----------------------- // ----------------------- Interfaces for virtual table location ------------------------- diff --git
a/src/share/location_cache/ob_location_struct.cpp b/src/share/location_cache/ob_location_struct.cpp index 4fe36ed21b..1fa369628a 100644 --- a/src/share/location_cache/ob_location_struct.cpp +++ b/src/share/location_cache/ob_location_struct.cpp @@ -18,6 +18,7 @@ #include "lib/oblog/ob_log_module.h" #include "lib/net/ob_addr.h" #include "lib/stat/ob_diagnose_info.h" +#include "share/transfer/ob_transfer_info.h" namespace oceanbase { @@ -53,11 +54,6 @@ OB_SERIALIZE_MEMBER(ObTabletLSKey, tenant_id_, tablet_id_); -OB_SERIALIZE_MEMBER(ObTabletLSCache, - cache_key_, - ls_id_, - renew_time_); - ObLSReplicaLocation::ObLSReplicaLocation() : server_(), role_(FOLLOWER), @@ -702,7 +698,8 @@ int ObTabletLocation::deep_copy( ObTabletLSCache::ObTabletLSCache() : cache_key_(), ls_id_(), - renew_time_(0) + renew_time_(0), + transfer_seq_(OB_INVALID_TRANSFER_SEQ) { } @@ -716,6 +713,7 @@ void ObTabletLSCache::reset() cache_key_.reset(); ls_id_.reset(); renew_time_ = 0; + transfer_seq_ = OB_INVALID_TRANSFER_SEQ; } int ObTabletLSCache::assign(const ObTabletLSCache &other) @@ -725,6 +723,7 @@ int ObTabletLSCache::assign(const ObTabletLSCache &other) cache_key_ = other.cache_key_; ls_id_ = other.ls_id_; renew_time_ = other.renew_time_; + transfer_seq_ = other.transfer_seq_; } return ret; } @@ -733,19 +732,16 @@ bool ObTabletLSCache::is_valid() const { return cache_key_.is_valid() && ls_id_.is_valid() - && renew_time_ > 0; -} - -bool ObTabletLSCache::mapping_is_same_with(const ObTabletLSCache &other) const -{ - return cache_key_ == other.cache_key_ - && ls_id_ == other.ls_id_; + && renew_time_ > 0 + && transfer_seq_ > OB_INVALID_TRANSFER_SEQ; } bool ObTabletLSCache::operator==(const ObTabletLSCache &other) const { - return mapping_is_same_with(other) - && renew_time_ == other.renew_time_; + return cache_key_ == other.cache_key_ + && ls_id_ == other.ls_id_ + && renew_time_ == other.renew_time_ + && transfer_seq_ == other.transfer_seq_; } bool ObTabletLSCache::operator!=(const 
ObTabletLSCache &other) const @@ -758,7 +754,7 @@ int ObTabletLSCache::init( const ObTabletID &tablet_id, const ObLSID &ls_id, const int64_t renew_time, - const int64_t row_scn) + const int64_t transfer_seq) { int ret = OB_SUCCESS; if (OB_FAIL(cache_key_.init(tenant_id, tablet_id))) { @@ -766,6 +762,7 @@ int ObTabletLSCache::init( } else { ls_id_ = ls_id; renew_time_ = renew_time; + transfer_seq_ = transfer_seq; ObLink::reset(); } return ret; diff --git a/src/share/location_cache/ob_location_struct.h b/src/share/location_cache/ob_location_struct.h index 0d423598ee..de5d448aa9 100644 --- a/src/share/location_cache/ob_location_struct.h +++ b/src/share/location_cache/ob_location_struct.h @@ -286,31 +286,26 @@ public: void reset(); int assign(const ObTabletLSCache &other); bool is_valid() const; - // mapping is same with other, ignoring timestamp - bool mapping_is_same_with(const ObTabletLSCache &other) const; bool operator==(const ObTabletLSCache &other) const; bool operator!=(const ObTabletLSCache &other) const; inline uint64_t get_tenant_id() const { return cache_key_.get_tenant_id(); } inline ObTabletID get_tablet_id() const { return cache_key_.get_tablet_id(); } inline ObLSID get_ls_id() const { return ls_id_; } inline int64_t get_renew_time() const { return renew_time_; } - //inline int64_t get_row_scn() const { return row_scn_; } - //void set_last_access_ts(const int64_t ts) { last_access_ts_ = ts; } - //int64_t get_last_access_ts() const { return last_access_ts_; } const ObTabletLSKey &get_cache_key() const { return cache_key_; } + inline int64_t get_transfer_seq() const { return transfer_seq_; } int init( const uint64_t tenant_id, const ObTabletID &tablet_id, const ObLSID &ls_id, const int64_t renew_time, - const int64_t row_scn); - TO_STRING_KV(K_(cache_key), K_(ls_id), K_(renew_time)); + const int64_t transfer_seq); + TO_STRING_KV(K_(cache_key), K_(ls_id), K_(renew_time), K_(transfer_seq)); private: ObTabletLSKey cache_key_; ObLSID ls_id_; int64_t 
renew_time_; // renew by sql - //int64_t row_scn_; // used for auto refresh location - //int64_t last_access_ts_; // used for ObTabletLSMap + int64_t transfer_seq_; }; //TODO: Reserved for tableapi. Need remove. diff --git a/src/share/location_cache/ob_location_update_task.cpp b/src/share/location_cache/ob_location_update_task.cpp index 5a910c9a5a..7d0571c76e 100644 --- a/src/share/location_cache/ob_location_update_task.cpp +++ b/src/share/location_cache/ob_location_update_task.cpp @@ -339,5 +339,106 @@ void ObClearTabletLSCacheTimerTask::runTimerTask() } } +ObTabletLocationBroadcastTask::ObTabletLocationBroadcastTask() + : tenant_id_(OB_INVALID_TENANT_ID), task_id_() , + ls_id_(), tablet_list_() +{ + tablet_list_.set_attr(SET_USE_500("BroTabletList")); +} + +int ObTabletLocationBroadcastTask::init( + const uint64_t tenant_id, + const ObTransferTaskID &task_id, + const ObLSID &ls_id, + const ObIArray &tablet_list) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(tablet_list_.assign(tablet_list))) { + LOG_WARN("fail to assign tablet_list_", KR(ret)); + } else { + tenant_id_ = tenant_id; + task_id_ = task_id; + ls_id_ = ls_id; + } + return ret; +} + +int ObTabletLocationBroadcastTask::assign(const ObTabletLocationBroadcastTask &other) +{ + int ret = OB_SUCCESS; + if (this != &other) { + if (OB_FAIL(tablet_list_.assign(other.tablet_list_))) { + LOG_WARN("failed to assign tablet_list_", KR(ret)); + } else { + tenant_id_ = other.tenant_id_; + task_id_ = other.task_id_; + ls_id_ = other.ls_id_; + } + } + return ret; +} + +void ObTabletLocationBroadcastTask::reset() +{ + tenant_id_ = OB_INVALID_TENANT_ID; + task_id_.reset(); + ls_id_.reset(); + tablet_list_.reset(); +} + +bool ObTabletLocationBroadcastTask::is_valid() const +{ + return OB_INVALID_TENANT_ID != tenant_id_ + && task_id_.is_valid() + && ls_id_.is_valid() + && !tablet_list_.empty(); +} + +int64_t ObTabletLocationBroadcastTask::hash() const +{ + uint64_t hash_val = 0; + hash_val = murmurhash(&tenant_id_, 
sizeof(tenant_id_), hash_val); + hash_val = murmurhash(&task_id_, sizeof(task_id_), hash_val); + return hash_val; +} + +bool ObTabletLocationBroadcastTask::operator ==(const ObTabletLocationBroadcastTask &other) const +{ + bool equal = false; + if (!is_valid() || !other.is_valid()) { + LOG_WARN_RET(OB_INVALID_ARGUMENT, "invalid argument", "self", *this, K(other)); + } else if (this == &other) { // same pointer + equal = true; + } else { + equal = tenant_id_ == other.tenant_id_ + && task_id_ == other.task_id_; + } + return equal; +} + +bool ObTabletLocationBroadcastTask::operator!=(const ObTabletLocationBroadcastTask &other) const +{ + return !(*this == other); +} + +bool ObTabletLocationBroadcastTask::compare_without_version + (const ObTabletLocationBroadcastTask &other) const +{ + return (*this == other); +} + +int ObTabletLocationBroadcastTask::assign_when_equal( + const ObTabletLocationBroadcastTask &other) +{ + UNUSED(other); + return OB_NOT_SUPPORTED; +} + +OB_SERIALIZE_MEMBER(ObTabletLocationBroadcastTask, + tenant_id_, + task_id_, + ls_id_, + tablet_list_) + } // end namespace share } // end namespace oceanbase diff --git a/src/share/location_cache/ob_location_update_task.h b/src/share/location_cache/ob_location_update_task.h index f5f6115289..2a66f5e6a0 100644 --- a/src/share/location_cache/ob_location_update_task.h +++ b/src/share/location_cache/ob_location_update_task.h @@ -15,6 +15,8 @@ #include "share/location_cache/ob_location_struct.h" #include "observer/ob_uniq_task_queue.h" +#include "share/ob_balance_define.h" // ObTransferTaskID +#include "share/transfer/ob_transfer_info.h" // ObTransferTabletInfo namespace oceanbase { @@ -210,6 +212,46 @@ private: ObTabletLSService &tablet_ls_service_; }; +class ObTabletLocationBroadcastTask + : public observer::ObIUniqTaskQueueTask +{ + OB_UNIS_VERSION(1); +public: + typedef ObSArray TabletInfoList; + ObTabletLocationBroadcastTask(); + virtual ~ObTabletLocationBroadcastTask() {} + int init( + const uint64_t 
tenant_id, + const ObTransferTaskID &task_id, + const ObLSID &ls_id, + const ObIArray &tablet_list); + int assign(const ObTabletLocationBroadcastTask &other); + virtual void reset(); + virtual bool is_barrier() const { return false; } + virtual bool need_process_alone() const { return true; } // process 1 task each time + virtual bool need_assign_when_equal() const { return false; } + virtual bool is_valid() const; + virtual int64_t hash() const; + virtual int hash(uint64_t &hash_val) const { hash_val = hash(); return OB_SUCCESS; } + virtual bool operator==(const ObTabletLocationBroadcastTask &other) const; + virtual bool operator!=(const ObTabletLocationBroadcastTask &other) const; + virtual bool compare_without_version(const ObTabletLocationBroadcastTask &other) const; + virtual uint64_t get_group_id() const { return tenant_id_; } + virtual int assign_when_equal(const ObTabletLocationBroadcastTask &other); + + inline uint64_t get_tenant_id() const { return tenant_id_; } + inline const ObTransferTaskID &get_task_id() const { return task_id_; } + inline const ObLSID &get_ls_id() const { return ls_id_; } + inline const TabletInfoList &get_tablet_list() const { return tablet_list_; } + inline int64_t get_tablet_cnt() const { return tablet_list_.count(); } + + TO_STRING_KV(K_(tenant_id), K_(task_id), K_(ls_id), K_(tablet_list)); +private: + uint64_t tenant_id_; + ObTransferTaskID task_id_; + ObLSID ls_id_; + TabletInfoList tablet_list_; +}; } // end namespace share } // end namespace oceanbase #endif diff --git a/src/share/location_cache/ob_tablet_location_broadcast.cpp b/src/share/location_cache/ob_tablet_location_broadcast.cpp new file mode 100644 index 0000000000..ee81bf5e75 --- /dev/null +++ b/src/share/location_cache/ob_tablet_location_broadcast.cpp @@ -0,0 +1,392 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. 
+ * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX SERVER + +#include "ob_tablet_location_broadcast.h" + +#include "lib/oblog/ob_log.h" // LOG_* +#include "share/ob_all_server_tracer.h" // SVR_TRACER +#include "share/location_cache/ob_tablet_ls_service.h" // ObTabletLSService +#include "share/ob_cluster_version.h" // GET_MIN_CLUSTER_VERSION +#include "observer/omt/ob_multi_tenant.h" // omt + +namespace oceanbase +{ +using namespace common; +using namespace observer; +using namespace obrpc; +namespace share +{ +void TabletLocationStatistics::reset_() +{ + task_cnt_ = 0; + tablet_suc_cnt_ = 0; + tablet_fail_cnt_ = 0; + total_exec_us_ = 0; + total_wait_us_ = 0; +} + +int64_t TabletLocationStatistics::get_total_cnt_() const +{ + return tablet_suc_cnt_ + tablet_fail_cnt_; +} + +void TabletLocationStatistics::calc_( + const int ret, + const int64_t exec_us, + const int64_t wait_us, + const int64_t tablet_cnt) +{ + total_exec_us_ += static_cast(exec_us); + total_wait_us_ += static_cast(wait_us); + task_cnt_++; + if (OB_SUCCESS == ret) { + tablet_suc_cnt_ += tablet_cnt; + } else { + tablet_fail_cnt_ += tablet_cnt; + } +} + +void TabletLocationStatistics::dump_statistics( + const int exec_ret, + const int64_t exec_ts, + const int64_t wait_ts, + const int64_t tablet_cnt, + const int64_t rate_limit) +{ + ObSpinLockGuard guard(lock_); + // calc statistic + if (tablet_cnt > 0) { + (void) calc_(exec_ret, max(exec_ts, 0), max(wait_ts, 0), tablet_cnt); + } + // print statistics if reach interval + int64_t tablet_total_cnt = get_total_cnt_(); + if (TC_REACH_TIME_INTERVAL(CHECK_INTERVAL_US) && tablet_total_cnt > 0) { + FLOG_INFO("[LOCATION_STATISTIC] 
tablet location statistics", + K_(task_cnt), K(tablet_total_cnt), K_(tablet_suc_cnt), K_(tablet_fail_cnt), + K_(total_exec_us), K_(total_wait_us), + "avg_rate", total_exec_us_ <= 0 ? -1 : (tablet_total_cnt * ONE_SECOND_US / total_exec_us_), + "rate_limit", rate_limit < 0 ? "no_limit" : to_cstring(rate_limit), + "avg_exec_us", total_exec_us_ / tablet_total_cnt, + "avg_wait_us", total_wait_us_ / tablet_total_cnt); + (void) reset_(); + } +} + +void TabletLocationRateLimit::reset_() +{ + tablet_cnt_ = 0; + start_ts_ = OB_INVALID_TIMESTAMP; +} + +int64_t TabletLocationRateLimit::calc_wait_ts_( + const int64_t tablet_cnt, + const int64_t exec_ts, + const int64_t frequency) +{ + int64_t wait_ts = 0; + int64_t current_ts = ObTimeUtility::current_time(); + // init or >= 1s, reset tablet_cnt and start_ts_ + if (current_ts - start_ts_ >= ONE_SECOND_US) { + tablet_cnt_ = tablet_cnt; + start_ts_ = current_ts - exec_ts; + } else { + tablet_cnt_ += tablet_cnt; + } + // calc wait_ts if tablet_cnt_ exceed frequency + if (frequency > 0 && tablet_cnt_ > frequency) { + wait_ts = tablet_cnt_ / (double) frequency * ONE_SECOND_US - (current_ts - start_ts_); + } + return wait_ts > 0 ? 
wait_ts : 0; +} + +void TabletLocationRateLimit::control_rate_limit( + const int64_t tablet_cnt, + const int64_t exec_ts, + int64_t rate_limit_conf, + int64_t &wait_ts) +{ + ObSpinLockGuard guard(lock_); + if (rate_limit_conf <= 0 || tablet_cnt <= 0) { + // invalid argument, no need to calc + wait_ts = 0; + } else { + wait_ts = calc_wait_ts_(tablet_cnt, max(exec_ts, 0), rate_limit_conf); + if (wait_ts > 0) { + FLOG_INFO("[LOCATION_STATISTIC] rate limit", + K_(tablet_cnt), K_(start_ts), K(exec_ts), + K(rate_limit_conf), K(wait_ts)); + ob_usleep(static_cast(wait_ts)); + } + } +} + +int ObTabletLocationSender::init(ObSrvRpcProxy *srv_rpc_proxy) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(inited_)) { + ret = OB_INIT_TWICE; + LOG_WARN("tablet_location_sender inited twice", KR(ret)); + } else if (OB_ISNULL(srv_rpc_proxy)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("srv_rpc_proxy is nullptr", KR(ret), KP(srv_rpc_proxy)); + } else if (OB_FAIL(send_queue_.init(this, + THREAD_CNT, + TASK_QUEUE_SIZE, + "TabletLocSender"))) { + LOG_WARN("fail to init send_queue", KR(ret)); + } else { + srv_rpc_proxy_ = srv_rpc_proxy; + inited_ = true; + stopped_ = false; + } + return ret; +} + +int ObTabletLocationSender::check_inner_stat() const +{ + int ret = OB_SUCCESS; + if (!inited_) { + ret = OB_NOT_INIT; + LOG_WARN("sender is not inited yet", KR(ret)); + } else if (stopped_) { + ret = OB_CANCELED; + LOG_WARN("sender is stopped now", KR(ret)); + } + return ret; +} + +int ObTabletLocationSender::process_barrier(const ObTabletLocationBroadcastTask &task, bool &stopped) +{ + UNUSEDx(task, stopped); + return OB_NOT_SUPPORTED; +} + +void ObTabletLocationSender::stop() { + stopped_ = true; + send_queue_.stop(); +} + +void ObTabletLocationSender::wait() +{ + if (stopped_) { + send_queue_.wait(); + } +} + +void ObTabletLocationSender::destroy() +{ + stop(); + wait(); + inited_ = false; + stopped_ = true; + srv_rpc_proxy_ = nullptr; +} + +int ObTabletLocationSender::submit_broadcast_task(const
ObTabletLocationBroadcastTask &task) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(check_inner_stat())) { + LOG_WARN("fail to check_inner_stat", KR(ret), K(task)); + } else if (GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_2_1_2) { + // skip + } else if (0 == get_rate_limit()) { + // skip + } else if (OB_UNLIKELY(!task.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid task_info", KR(ret), K(task)); + } else if (OB_FAIL(send_queue_.add(task))) { + LOG_WARN("fail to add task to send_queue_", KR(ret), K(task)); + } + return ret; +} + +int ObTabletLocationSender::batch_process_tasks( + const common::ObIArray &tasks, + bool &stopped) +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + ObArray alive_servers; + int64_t rate_limit_conf = 0; + if (OB_FAIL(check_inner_stat())) { + LOG_WARN("fail to check_inner_stat", KR(ret), K(tasks)); + } else if (GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_2_1_2) { + // skip + } else if (0 == (rate_limit_conf = get_rate_limit())) { + // skip + } else if (OB_UNLIKELY(1 != tasks.count())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("tasks count is not 1", KR(ret), K(tasks)); + } else if (OB_ISNULL(srv_rpc_proxy_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("srv_rpc_proxy_ is nullptr", KR(ret)); + } else if (OB_FAIL(SVR_TRACER.get_alive_servers(ObZone(), alive_servers))) { + LOG_WARN("fail to get_alive_servers", KR(ret)); + } else { + ObTabletLocationSendProxy rpc_proxy(*srv_rpc_proxy_, + &ObSrvRpcProxy::tablet_location_send); + // count of tasks is expected to be 1 + int64_t tablet_cnt = tasks.at(0).get_tablet_cnt(); + ObTabletLocationSendArg rpc_arg; + int64_t timeout = GCONF.location_cache_refresh_rpc_timeout; // 500ms by default + int64_t start_ts = ObTimeUtility::current_time(); + if (OB_FAIL(rpc_arg.set(tasks))) { + LOG_WARN("fail to set tablet_location_send rpc_arg", KR(ret)); + } else { + FOREACH_CNT(addr, alive_servers) { + if (OB_TMP_FAIL(rpc_proxy.call(*addr, timeout, rpc_arg))) { + ret = OB_SUCC(ret) ? 
tmp_ret : ret; + LOG_WARN("fail to send rpc", KR(tmp_ret), KR(ret), KPC(addr), K(rpc_arg)); + } + } + ObArray rc_array; + if (OB_TMP_FAIL(rpc_proxy.wait_all(rc_array))) { + ret = OB_SUCC(ret) ? tmp_ret : ret; + LOG_WARN("fail to wait all results", KR(ret), KR(tmp_ret)); + } + } + // flow control and dump statistics + int64_t exec_ts = ObTimeUtility::current_time() - start_ts; + int64_t wait_ts = 0; + if (OB_SUCC(ret)) { + (void) rate_limit_.control_rate_limit(tablet_cnt, exec_ts, + rate_limit_conf, wait_ts); + } + (void) statistics_.dump_statistics(ret, exec_ts, wait_ts, tablet_cnt, rate_limit_conf); + } + return ret; +} + +int ObTabletLocationUpdater::init(ObTabletLSService *tablet_ls_service) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(inited_)) { + ret = OB_INIT_TWICE; + LOG_WARN("tablet_location_updater inited twice", KR(ret)); + } else if (OB_ISNULL(tablet_ls_service)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("tablet_ls_service is nullptr", KR(ret), KP(tablet_ls_service)); + } else if (OB_FAIL(update_queue_.init(this, + THREAD_CNT, + TASK_QUEUE_SIZE, + "TabletLocUpdater"))) { + LOG_WARN("fail to init update_queue_", KR(ret)); + } else { + tablet_ls_service_ = tablet_ls_service; + inited_ = true; + stopped_ = false; + } + return ret; +} + +int ObTabletLocationUpdater::check_inner_stat() const +{ + int ret = OB_SUCCESS; + if (!inited_) { + ret = OB_NOT_INIT; + LOG_WARN("updater is not inited yet", KR(ret)); + } else if (stopped_) { + ret = OB_CANCELED; + LOG_WARN("updater is stopped now", KR(ret)); + } + return ret; +} + +int ObTabletLocationUpdater::process_barrier(const ObTabletLocationBroadcastTask &task, bool &stopped) +{ + UNUSEDx(task, stopped); + return OB_NOT_SUPPORTED; +} + +void ObTabletLocationUpdater::stop() { + stopped_ = true; + update_queue_.stop(); +} + +void ObTabletLocationUpdater::wait() +{ + if (stopped_) { + update_queue_.wait(); + } +} + +void ObTabletLocationUpdater::destroy() +{ + stop(); + wait(); + inited_ = false; + stopped_ = true; +
tablet_ls_service_ = nullptr; +} + +int ObTabletLocationUpdater::submit_update_task(const ObTabletLocationBroadcastTask &received_task) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(check_inner_stat())) { + LOG_WARN("fail to check_inner_stat", KR(ret), K(received_task)); + } else if (OB_UNLIKELY(!received_task.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid received_task", KR(ret), K(received_task)); + } else if (OB_FAIL(update_queue_.add(received_task))) { + LOG_WARN("fail to add task to update_queue_", KR(ret), K(received_task)); + } + return ret; +} + + +int ObTabletLocationUpdater::batch_process_tasks( + const ObIArray &tasks, + bool &stopped) +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + if (OB_FAIL(check_inner_stat())) { + LOG_WARN("fail to check_inner_stat", KR(ret), K(tasks)); + } else if (OB_UNLIKELY(1 != tasks.count())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("tasks count is not 1", KR(ret), K(tasks)); + } else if (OB_ISNULL(tablet_ls_service_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("tablet_ls_service_ is nullptr", KR(ret)); + } else { + // tasks count is expected to be 1 + const ObTabletLocationBroadcastTask &task = tasks.at(0); + int64_t start_ts = ObTimeUtility::current_time(); + ObTabletLSCache tablet_ls_cache; + const bool update_only = true; + FOREACH_CNT_X(tablet_info, task.get_tablet_list(), OB_SUCC(ret)) { + tablet_ls_cache.reset(); + int64_t now = ObTimeUtility::current_time(); + if (OB_UNLIKELY(!tablet_info->is_valid())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid tablet_info", KR(ret), K(task), KPC(tablet_info)); + } else if (OB_FAIL(tablet_ls_cache.init(task.get_tenant_id(), + tablet_info->tablet_id(), + task.get_ls_id(), + now, + tablet_info->transfer_seq()))) { + LOG_WARN("fail to init tablet_ls_cache", KR(ret), K(task), KPC(tablet_info)); + } else if (OB_FAIL(tablet_ls_service_->update_cache(tablet_ls_cache, update_only))) { + LOG_WARN("fail to update_cache", KR(ret), K(tablet_ls_cache), K(update_only)); + } + 
} + int64_t exec_ts = ObTimeUtility::current_time() - start_ts; + int64_t wait_ts = 0; + (void) statistics_.dump_statistics(ret, exec_ts, wait_ts, task.get_tablet_cnt(), -1/*rate_limit*/); + } + return ret; +} + +} // end namespace share +} // end namespace oceanbase diff --git a/src/share/location_cache/ob_tablet_location_broadcast.h b/src/share/location_cache/ob_tablet_location_broadcast.h new file mode 100644 index 0000000000..511bee210f --- /dev/null +++ b/src/share/location_cache/ob_tablet_location_broadcast.h @@ -0,0 +1,154 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. 
+ */ + +#ifndef OCEANBASE_OBSERVER_TABLET_LOCATION_BROADCAST_ +#define OCEANBASE_OBSERVER_TABLET_LOCATION_BROADCAST_ + +#include "observer/ob_uniq_task_queue.h" +#include "share/transfer/ob_transfer_info.h" // ObTransferTaskInfo +#include "share/location_cache/ob_location_update_task.h" // ObTabletLocationSendTask +#include "share/rpc/ob_async_rpc_proxy.h" // ObSrvRpcProxy, RPC_F + + +namespace oceanbase +{ +namespace share +{ +class ObTabletLocationSender; +class ObTabletLocationUpdater; +class ObTabletLSService; +typedef observer::ObUniqTaskQueue ObTLSenderQueue; +typedef observer::ObUniqTaskQueue ObTLUpdaterQueue; +RPC_F(obrpc::OB_TABLET_LOCATION_BROADCAST, obrpc::ObTabletLocationSendArg, + obrpc::ObTabletLocationSendResult, ObTabletLocationSendProxy); + +class TabletLocationStatistics +{ +public: + const int64_t ONE_SECOND_US = 1 * 1000 * 1000L; + const static int64_t CHECK_INTERVAL_US = 10 * 1000 * 1000L; // 10s + TabletLocationStatistics() : + lock_(), task_cnt_(0), tablet_suc_cnt_(0), tablet_fail_cnt_(0), + total_exec_us_(0), total_wait_us_(0) {} + ~TabletLocationStatistics() {} + void dump_statistics(const int exec_ret, + const int64_t exec_ts, + const int64_t wait_ts, + const int64_t tablet_cnt, + const int64_t rate_limit = -1); + TO_STRING_KV(K_(tablet_suc_cnt), K_(tablet_fail_cnt), K_(total_exec_us), K_(total_wait_us)); +private: + void reset_(); + int64_t get_total_cnt_() const; + void calc_(const int ret, + const int64_t exec_us, + const int64_t wait_us, + const int64_t tablet_cnt); +private: + common::ObSpinLock lock_; + int64_t task_cnt_; + int64_t tablet_suc_cnt_; + int64_t tablet_fail_cnt_; + uint64_t total_exec_us_; + uint64_t total_wait_us_; +}; + +class TabletLocationRateLimit +{ +public: + const int64_t ONE_SECOND_US = 1 * 1000 * 1000L; +public: + TabletLocationRateLimit() : + lock_(), tablet_cnt_(0), start_ts_(common::OB_INVALID_TIMESTAMP) {} + ~TabletLocationRateLimit() {} + void control_rate_limit(const int64_t tablet_cnt, + const int64_t 
exec_ts, + int64_t rate_limit_conf, + int64_t &wait_ts); + TO_STRING_KV(K_(tablet_cnt), K_(start_ts)); +private: + void reset_(); + int64_t calc_wait_ts_(const int64_t tablet_cnt, + const int64_t exec_ts, + const int64_t frequency); +private: + common::ObSpinLock lock_; + int64_t tablet_cnt_; + int64_t start_ts_; +}; + +class ObTabletLocationSender +{ +public: + ObTabletLocationSender() + : inited_(false), stopped_(true), srv_rpc_proxy_(nullptr), + send_queue_(), statistics_(), rate_limit_() {} + virtual ~ObTabletLocationSender() { destroy(); } + int init(obrpc::ObSrvRpcProxy * srv_rpc_proxy); + int check_inner_stat() const; + // Unused. + int process_barrier(const ObTabletLocationBroadcastTask &task, bool &stopped); + void stop(); + void wait(); + void destroy(); + + int submit_broadcast_task(const ObTabletLocationBroadcastTask &task_info); + int batch_process_tasks(const common::ObIArray &tasks, + bool &stopped); + inline int64_t get_rate_limit() + { + return GCONF._auto_broadcast_tablet_location_rate_limit; + } +private: + static const int64_t THREAD_CNT = 1; + static const int64_t TASK_QUEUE_SIZE = 100000; + bool inited_; + bool stopped_; + obrpc::ObSrvRpcProxy *srv_rpc_proxy_; + ObTLSenderQueue send_queue_; + TabletLocationStatistics statistics_; + TabletLocationRateLimit rate_limit_; +}; + +class ObTabletLocationUpdater +{ +public: + ObTabletLocationUpdater() + : inited_(false), stopped_(true), + update_queue_(), statistics_() {} + virtual ~ObTabletLocationUpdater() { destroy(); } + int init(share::ObTabletLSService *tablet_ls_service); + int check_inner_stat() const; + // Unused. 
+ int process_barrier(const ObTabletLocationBroadcastTask &task, bool &stopped); + void stop(); + void wait(); + void destroy(); + + int submit_update_task(const share::ObTabletLocationBroadcastTask &received_task); + int batch_process_tasks(const common::ObIArray &tasks, + bool &stopped); +private: + static const int64_t THREAD_CNT = 1; + static const int64_t TASK_QUEUE_SIZE = 100000; + bool inited_; + bool stopped_; + share::ObTabletLSService *tablet_ls_service_; + ObTLUpdaterQueue update_queue_; + TabletLocationStatistics statistics_; +}; + +} // end namespace share +} // end namespace oceanbase +#endif // OCEANBASE_OBSERVER_TABLET_LOCATION_BROADCAST_ \ No newline at end of file diff --git a/src/share/location_cache/ob_tablet_location_refresh_service.cpp b/src/share/location_cache/ob_tablet_location_refresh_service.cpp new file mode 100644 index 0000000000..50fb61497e --- /dev/null +++ b/src/share/location_cache/ob_tablet_location_refresh_service.cpp @@ -0,0 +1,963 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. 
+ */ + +#define USING_LOG_PREFIX SHARE_LOCATION +#include "share/location_cache/ob_tablet_location_refresh_service.h" +#include "share/location_cache/ob_tablet_ls_service.h" +#include "share/schema/ob_multi_version_schema_service.h" +#include "share/transfer/ob_transfer_task_operator.h" +#include "rootserver/ob_root_utils.h" + +namespace oceanbase +{ +namespace share +{ + +ObTabletLocationRefreshMgr::ObTabletLocationRefreshMgr( + const uint64_t tenant_id, + const ObTransferTaskID &base_task_id) + : mutex_(), + tenant_id_(tenant_id), + base_task_id_(base_task_id), + tablet_ids_(), + inc_task_infos_() +{ + tablet_ids_.set_attr(SET_USE_500("TbltRefIDS")); + inc_task_infos_.set_attr(SET_USE_500("TbltRefTasks")); +} + +ObTabletLocationRefreshMgr::~ObTabletLocationRefreshMgr() +{ +} + +void ObTabletLocationRefreshMgr::set_base_task_id( + const ObTransferTaskID &base_task_id) +{ + lib::ObMutexGuard guard(mutex_); + base_task_id_ = base_task_id; +} + +void ObTabletLocationRefreshMgr::get_base_task_id( + ObTransferTaskID &base_task_id) +{ + lib::ObMutexGuard guard(mutex_); + base_task_id = base_task_id_; +} + +int ObTabletLocationRefreshMgr::set_tablet_ids( + const common::ObIArray &tablet_ids) +{ + int ret = OB_SUCCESS; + lib::ObMutexGuard guard(mutex_); + if (OB_FAIL(tablet_ids_.assign(tablet_ids))) { + LOG_WARN("fail to assign tablet_ids", KR(ret), K_(tenant_id)); + } + return ret; +} + +int ObTabletLocationRefreshMgr::get_tablet_ids( + common::ObIArray &tablet_ids) +{ + int ret = OB_SUCCESS; + lib::ObMutexGuard guard(mutex_); + if (OB_FAIL(tablet_ids.assign(tablet_ids_))) { + LOG_WARN("fail to get tablet_ids", KR(ret), K_(tenant_id)); + } + return ret; +} + +// update & add inc_task_infos_ +int ObTabletLocationRefreshMgr::merge_inc_task_infos( + common::ObArray &inc_task_infos_to_merge) +{ + int ret = OB_SUCCESS; + if (inc_task_infos_to_merge.count() <= 0) { + // do nothing + } else { + std::sort(inc_task_infos_to_merge.begin(), inc_task_infos_to_merge.end(), 
ObTransferRefreshInfo::less_than); + ObArray new_tasks; + ObArray changed_tasks; + + lib::ObMutexGuard guard(mutex_); + int64_t local_pos = 0; + int64_t inc_pos = 0; + for ( ; + OB_SUCC(ret) + && local_pos < inc_task_infos_.count() + && inc_pos < inc_task_infos_to_merge.count() + ;) { + ObTransferRefreshInfo &local_task = inc_task_infos_.at(local_pos); + const ObTransferRefreshInfo &inc_task = inc_task_infos_to_merge.at(inc_pos); + if (local_task.get_task_id() == inc_task.get_task_id()) { + bool changed = false; + (void) local_task.get_status().update(inc_task.get_status(), changed); + if (changed && OB_FAIL(changed_tasks.push_back(inc_task))) { + LOG_WARN("fail to push back task", KR(ret), K(inc_task)); + } + local_pos++; + inc_pos++; + } else if (local_task.get_task_id() < inc_task.get_task_id()) { + local_pos++; + } else if (local_task.get_task_id() > inc_task.get_task_id()) { + if (OB_FAIL(new_tasks.push_back(inc_task))) { + LOG_WARN("fail to push back task", KR(ret), K(inc_task)); + } + inc_pos++; + } + } // end for + + if (changed_tasks.count() > 0) { + FLOG_INFO("[REFRESH_TABLET_LOCATION] change tasks", + KR(ret), K_(tenant_id), K(changed_tasks.count()), K(changed_tasks)); + } + + for (int64_t i = inc_pos; OB_SUCC(ret) && i < inc_task_infos_to_merge.count(); i++) { + const ObTransferRefreshInfo &inc_task = inc_task_infos_to_merge.at(i); + if (OB_FAIL(new_tasks.push_back(inc_task))) { + LOG_WARN("fail to push back task", KR(ret), K(inc_task)); + } + } // end dor + + if (OB_SUCC(ret) && new_tasks.count() > 0) { + int64_t new_count = inc_task_infos_.count() + new_tasks.count(); + if (OB_FAIL(inc_task_infos_.reserve(new_count))) { + LOG_WARN("fail to reserve array", KR(ret), K(new_count)); + } else if (OB_FAIL(append(inc_task_infos_, new_tasks))) { + LOG_WARN("fail to append array", KR(ret), K(new_count)); + } else { + std::sort(inc_task_infos_.begin(), inc_task_infos_.end(), ObTransferRefreshInfo::less_than); + FLOG_INFO("[REFRESH_TABLET_LOCATION] add tasks", + 
KR(ret), K_(tenant_id), K(new_tasks.count()), K(new_tasks)); + } + } + } + return ret; +} + +// return 128 task ids at most in one time +int ObTabletLocationRefreshMgr::get_doing_task_ids( + common::ObIArray &task_ids) +{ + int ret = OB_SUCCESS; + task_ids.reset(); + lib::ObMutexGuard guard(mutex_); + for (int64_t i = 0; + OB_SUCC(ret) + && i < inc_task_infos_.count() + && task_ids.count() < BATCH_TASK_COUNT + ; i++) { + const ObTransferRefreshInfo &inc_task = inc_task_infos_.at(i); + if (inc_task.get_status().is_doing_status()) { + if (OB_FAIL(task_ids.push_back(inc_task.get_task_id()))) { + LOG_WARN("fail to push back task", KR(ret), K(inc_task)); + } + } + } // end for + return ret; +} + +// clear inc_task_infos_ & update base_task_id_ +int ObTabletLocationRefreshMgr::clear_task_infos(bool &has_doing_task) +{ + int ret = OB_SUCCESS; + has_doing_task = false; + lib::ObMutexGuard guard(mutex_); + + int64_t continuous_done_status_pos = OB_INVALID_INDEX; + for (int64_t i = 0; OB_SUCC(ret) && i < inc_task_infos_.count(); i++) { + if (inc_task_infos_.at(i).get_status().is_done_status()) { + continuous_done_status_pos = i; + } else { + break; + } + } // end for + + if (OB_SUCC(ret) + && continuous_done_status_pos >= 0 + && continuous_done_status_pos < inc_task_infos_.count()) { + ObTransferTaskID from_task_id = inc_task_infos_.at(0).get_task_id(); + ObTransferTaskID to_task_id = inc_task_infos_.at(continuous_done_status_pos).get_task_id(); + int64_t clear_tasks_cnt = continuous_done_status_pos + 1; + int64_t remain_tasks_cnt = inc_task_infos_.count() - 1 - continuous_done_status_pos; + if (continuous_done_status_pos == inc_task_infos_.count() - 1) { + inc_task_infos_.reset(); + } else { + ObArray tmp_task_infos; + if (OB_FAIL(rootserver::ObRootUtils::copy_array( + inc_task_infos_, continuous_done_status_pos + 1, + inc_task_infos_.count(), tmp_task_infos))) { + LOG_WARN("fail to copy array", KR(ret), K(continuous_done_status_pos), K(inc_task_infos_.count())); + } else 
if (OB_FAIL(inc_task_infos_.assign(tmp_task_infos))) { + LOG_WARN("fail to assign array", KR(ret), K(tmp_task_infos.count())); + } + } + + if (OB_SUCC(ret) && base_task_id_ < to_task_id) { + FLOG_INFO("[REFRESH_TABLET_LOCATION] update base_task_id when fetch inc tasks", + KR(ret), K_(tenant_id), "from_base_task_id", base_task_id_, "to_base_task_id", to_task_id); + base_task_id_ = to_task_id; + } + FLOG_INFO("[REFRESH_TABLET_LOCATION] clear tasks", KR(ret), K_(tenant_id), + K(from_task_id), K(to_task_id), K(clear_tasks_cnt), K(remain_tasks_cnt)); + } + + for (int64_t i = 0; OB_SUCC(ret) && !has_doing_task && i < inc_task_infos_.count(); i++) { + if (inc_task_infos_.at(i).get_status().is_doing_status()) { + has_doing_task = true; + } + } // end for + return ret; +} + +void ObTabletLocationRefreshMgr::dump_statistic() +{ + lib::ObMutexGuard guard(mutex_); + int64_t unknown_task_cnt = 0; + int64_t doing_task_cnt = 0; + int64_t done_task_cnt = 0; + for (int64_t i = 0; i < inc_task_infos_.count(); i++) { + const ObTransferRefreshInfo &transfer_task = inc_task_infos_.at(i); + if (transfer_task.get_status().is_unknown_status()) { + unknown_task_cnt++; + } else if (transfer_task.get_status().is_done_status()) { + doing_task_cnt++; + } else if (transfer_task.get_status().is_done_status()) { + done_task_cnt++; + } else {} + } // end for + FLOG_INFO("[REFRESH_TABLET_LOCATION] dump statistic", + K_(tenant_id), K_(base_task_id), + "tablet_list_cnt", tablet_ids_.count(), + "total_task_cnt", inc_task_infos_.count(), + K(unknown_task_cnt), K(doing_task_cnt), K(done_task_cnt)); +} + +int64_t ObTabletLocationRefreshServiceIdling::get_idle_interval_us() +{ + int64_t idle_time = DEFAULT_TIMEOUT_US; + if (0 != GCONF._auto_refresh_tablet_location_interval) { + idle_time = GCONF._auto_refresh_tablet_location_interval; + } + return idle_time; +} + +int ObTabletLocationRefreshServiceIdling::fast_idle() +{ + return idle(FAST_TIMEOUT_US); +} + 
+ObTabletLocationRefreshService::ObTabletLocationRefreshService() + : ObRsReentrantThread(true), + inited_(false), + has_task_(false), + idling_(stop_), + tablet_ls_service_(NULL), + schema_service_(NULL), + sql_proxy_(NULL), + allocator_(SET_USE_500("TbltReSrv")), + rwlock_(), + tenant_mgr_map_() +{ +} + +ObTabletLocationRefreshService::~ObTabletLocationRefreshService() +{ + destroy(); +} + +int ObTabletLocationRefreshService::init( + ObTabletLSService &tablet_ls_service, + share::schema::ObMultiVersionSchemaService &schema_service, + common::ObMySQLProxy &sql_proxy) +{ + int ret = OB_SUCCESS; + const int64_t THREAD_CNT = 1; + const int64_t BUCKET_NUM = 1024; + if (OB_UNLIKELY(inited_)) { + ret = OB_INIT_TWICE; + LOG_WARN("already inited", KR(ret)); + } else if (OB_FAIL(create(THREAD_CNT, "TbltRefreshSer"))) { + LOG_WARN("create thread failed", KR(ret)); + } else if (OB_FAIL(tenant_mgr_map_.create(BUCKET_NUM, + SET_USE_500("TbltRefreshMap"), SET_USE_500("TbltRefreshMap")))) { + LOG_WARN("fail to create hash map", KR(ret)); + } else { + tablet_ls_service_ = &tablet_ls_service; + schema_service_ = &schema_service; + sql_proxy_ = &sql_proxy; + has_task_ = false; + inited_ = true; + } + FLOG_INFO("[REFRESH_TABLET_LOCATION] init service", KR(ret)); + return ret; +} + +int ObTabletLocationRefreshService::start() +{ + int ret = OB_SUCCESS; + FLOG_INFO("[REFRESH_TABLET_LOCATION] start service begin"); + if (OB_FAIL(ObRsReentrantThread::start())) { + LOG_WARN("fail to start thread", KR(ret)); + } + FLOG_INFO("[REFRESH_TABLET_LOCATION] start service end", KR(ret)); + return ret; +} + +void ObTabletLocationRefreshService::stop() +{ + FLOG_INFO("[REFRESH_TABLET_LOCATION] stop service begin"); + ObReentrantThread::stop(); + idling_.wakeup(); + FLOG_INFO("[REFRESH_TABLET_LOCATION] stop service end"); +} + +void ObTabletLocationRefreshService::wait() +{ + FLOG_INFO("[REFRESH_TABLET_LOCATION] wait service begin"); + ObRsReentrantThread::wait(); + ObReentrantThread::wait(); + 
FLOG_INFO("[REFRESH_TABLET_LOCATION] wait service end"); +} + +void ObTabletLocationRefreshService::destroy() +{ + FLOG_INFO("[REFRESH_TABLET_LOCATION] destroy service begin"); + (void) stop(); + (void) wait(); + SpinWLockGuard guard(rwlock_); + if (inited_) { + FOREACH(it, tenant_mgr_map_) { + if (OB_NOT_NULL(it->second)) { + (it->second)->~ObTabletLocationRefreshMgr(); + it->second = NULL; + } + } + tablet_ls_service_ = NULL; + schema_service_ = NULL; + sql_proxy_ = NULL; + tenant_mgr_map_.destroy(); + allocator_.reset(); + has_task_ = false; + inited_ = false; + } + FLOG_INFO("[REFRESH_TABLET_LOCATION] destroy service end"); +} + +void ObTabletLocationRefreshService::run3() +{ + FLOG_INFO("[REFRESH_TABLET_LOCATION] run service begin"); + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not inited yet", KR(ret)); + } else { + while(!stop_) { + ObCurTraceId::init(GCTX.self_addr()); + if (OB_FAIL(refresh_cache_())) { + LOG_WARN("fail to refresh tablet location", KR(ret)); + } + // retry until stopped, reset ret to OB_SUCCESS + ret = OB_SUCCESS; + idle_(); + } + } + FLOG_INFO("[REFRESH_TABLET_LOCATION] run service end"); +} + +void ObTabletLocationRefreshService::idle_() +{ + if (OB_UNLIKELY(stop_)) { + // skip + } else if (OB_UNLIKELY(has_task_ && 0 != GCONF._auto_refresh_tablet_location_interval)) { + (void) idling_.fast_idle(); + } else { + (void) idling_.idle(); + } + has_task_ = false; +} + +int ObTabletLocationRefreshService::check_stop_() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not inited yet", KR(ret)); + } else if (OB_UNLIKELY(stop_)) { + ret = OB_CANCELED; + LOG_WARN("thread has been stopped", KR(ret)); + } else if (OB_UNLIKELY(0 == GCONF._auto_refresh_tablet_location_interval)) { + ret = OB_CANCELED; + LOG_WARN("service is shut down by config", KR(ret)); + } + return ret; +} + +int ObTabletLocationRefreshService::check_tenant_can_refresh_(const uint64_t tenant_id) +{ + 
int ret = OB_SUCCESS; + uint64_t data_version = 0; + if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, data_version))) { + LOG_WARN("fail to check data version", KR(ret), K(tenant_id)); + } else if (data_version < DATA_VERSION_4_2_1_2) { + ret = OB_EAGAIN; + LOG_WARN("tenant's data_version is less than 4.2.1.2, try later", + KR(ret), K(tenant_id), K(data_version)); + } + return ret; +} + +int ObTabletLocationRefreshService::get_base_task_id_( + const uint64_t tenant_id, + ObTransferTaskID &base_task_id) +{ + int ret = OB_SUCCESS; + base_task_id.reset(); + SpinRLockGuard guard(rwlock_); + ObTabletLocationRefreshMgr *mgr = NULL; + if (OB_FAIL(inner_get_mgr_(tenant_id, mgr))) { + LOG_WARN("fail to get mgr", KR(ret), K(tenant_id)); + } else if (OB_ISNULL(mgr)) { + ret = OB_TENANT_NOT_EXIST; + LOG_WARN("mgr is null, tenant has been dropped", KR(ret), K(tenant_id)); + } else { + (void) mgr->get_base_task_id(base_task_id); + } + return ret; +} + +int ObTabletLocationRefreshService::try_init_base_point(const int64_t tenant_id) +{ + int ret = OB_SUCCESS; + if (is_user_tenant(tenant_id)) { + bool should_init = false; + { + SpinRLockGuard guard(rwlock_); + ObTabletLocationRefreshMgr *mgr = NULL; + if (OB_FAIL(inner_get_mgr_(tenant_id, mgr))) { + if (OB_HASH_NOT_EXIST == ret) { + ret = OB_SUCCESS; + should_init = true; + } else { + LOG_WARN("fail to get mgr", KR(ret), K(tenant_id)); + } + } + } + if (OB_SUCC(ret) && should_init) { + if (OB_FAIL(try_init_base_point_(tenant_id))) { + LOG_WARN("fail to init base point", KR(ret), K(tenant_id)); + } + } + } + return ret; +} + +// 1. mgr is null means tenant has been dropped. +// 2. OB_HASH_NOT_EXIST means tenant has not been inited. 
+int ObTabletLocationRefreshService::inner_get_mgr_( + const int64_t tenant_id, + ObTabletLocationRefreshMgr *&mgr) +{ + int ret = OB_SUCCESS; + mgr = NULL; + int hash_ret = tenant_mgr_map_.get_refactored(tenant_id, mgr); + if (OB_SUCCESS == hash_ret) { + // success + } else { + mgr = NULL; + ret = hash_ret; + LOG_WARN("fail to get mgr from map", KR(ret), K(tenant_id)); + } + return ret; +} + +int ObTabletLocationRefreshService::get_tenant_ids_( + common::ObIArray &tenant_ids) +{ + int ret = OB_SUCCESS; + SpinRLockGuard guard(rwlock_); + if (OB_FAIL(tenant_ids.reserve(tenant_mgr_map_.size()))) { + LOG_WARN("fail to reserved array", KR(ret)); + } else { + FOREACH_X(it, tenant_mgr_map_, OB_SUCC(ret)) { + const uint64_t tenant_id = it->first; + if (OB_FAIL(tenant_ids.push_back(tenant_id))) { + LOG_WARN("fail to push back tenant_id", KR(ret), K(tenant_id)); + } + } + } + return ret; +} + +int ObTabletLocationRefreshService::try_clear_mgr_(const uint64_t tenant_id, bool &clear) +{ + int ret = OB_SUCCESS; + bool has_been_dropped = false; + clear = false; + if (OB_ISNULL(schema_service_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init yet", KR(ret)); + } else if (OB_FAIL(schema_service_->check_if_tenant_has_been_dropped(tenant_id, has_been_dropped))) { + LOG_WARN("fail to check if tenant has been dropped", KR(ret), K(tenant_id)); + } else if (!has_been_dropped) { + // skip + } else { + bool should_clear = false; + { + SpinRLockGuard guard(rwlock_); + ObTabletLocationRefreshMgr *mgr = NULL; + if (OB_FAIL(inner_get_mgr_(tenant_id, mgr))) { + LOG_WARN("fail to get mgr", KR(ret), K(tenant_id)); + } else if (OB_NOT_NULL(mgr)) { + should_clear = true; + } else { + clear = true; + } + } + + if (OB_SUCC(ret) && should_clear) { + SpinWLockGuard guard(rwlock_); + ObTabletLocationRefreshMgr *mgr = NULL; + if (OB_FAIL(inner_get_mgr_(tenant_id, mgr))) { + LOG_WARN("fail to get mgr", KR(ret), K(tenant_id)); + } else if (OB_NOT_NULL(mgr)) { + ObTabletLocationRefreshMgr *tmp_mgr = NULL; + 
int overwrite = 1; + if (OB_FAIL(tenant_mgr_map_.set_refactored(tenant_id, tmp_mgr, overwrite))) { + LOG_WARN("fail to overwrite mgr", KR(ret), K(tenant_id)); + } else { + FLOG_INFO("[REFRESH_TABLET_LOCATION] destroy struct because tenant has been dropped", K(tenant_id)); + mgr->~ObTabletLocationRefreshMgr(); + mgr = NULL; + } + } + if (OB_SUCC(ret)) { + clear = true; + } + } + } + return ret; +} + +int ObTabletLocationRefreshService::try_init_base_point_(const int64_t tenant_id) +{ + int ret = OB_SUCCESS; + uint64_t data_version = 0; + ObTransferTaskID base_task_id; + + // try get data_version + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(GET_MIN_DATA_VERSION(tenant_id, data_version))) { + // tenant's compat version may be not persisted in local in the following cases: + // 1. unit has been dropped + // 2. unit is added/migrated + data_version = LAST_BARRIER_DATA_VERSION; + LOG_WARN("fail to get min data version, use last barrier version instead", + KR(tmp_ret), K(tenant_id), K(data_version)); + } + + // try get base_task_id + if (data_version >= DATA_VERSION_4_2_1_2) { + if (OB_ISNULL(sql_proxy_)) { + ret = OB_NOT_INIT; + LOG_WARN("sql_proxy_ is null", KR(ret)); + } else if (OB_FAIL(ObTransferTaskOperator::fetch_initial_base_task_id( + *sql_proxy_, tenant_id, base_task_id))) { + LOG_WARN("fail to fetch initial base task id", KR(ret), K(tenant_id)); + } + } else { + base_task_id.reset(); + } + + // try init struct + if (OB_SUCC(ret)) { + SpinWLockGuard guard(rwlock_); + ObTabletLocationRefreshMgr *mgr = NULL; + if (OB_FAIL(inner_get_mgr_(tenant_id, mgr))) { + if (OB_HASH_NOT_EXIST == ret) { + ret = OB_SUCCESS; + void *buf = NULL; + if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObTabletLocationRefreshMgr)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to alloc memory", KR(ret)); + } else if (FALSE_IT(mgr = new (buf) ObTabletLocationRefreshMgr(tenant_id, base_task_id))) { + LOG_WARN("fail to new ObTabletLocationRefreshMgr", + KR(ret), K(tenant_id), 
K(data_version), K(base_task_id)); + } else if (OB_FAIL(tenant_mgr_map_.set_refactored(tenant_id, mgr))) { + LOG_WARN("fail to set ObTabletLocationRefreshMgr", + KR(ret), K(tenant_id), K(data_version), K(base_task_id)); + } + FLOG_INFO("[REFRESH_TABLET_LOCATION] init struct", + KR(ret), K(tenant_id), K(data_version), K(base_task_id)); + } else { + LOG_WARN("fail to get mgr", KR(ret), K(tenant_id)); + } + } + } + return ret; +} + +int ObTabletLocationRefreshService::refresh_cache_() +{ + FLOG_INFO("[REFRESH_TABLET_LOCATION] refresh cache start"); + int64_t start_time = ObTimeUtility::current_time(); + int ret = OB_SUCCESS; + ObArray tenant_ids; + if (OB_FAIL(get_tenant_ids_(tenant_ids))) { + LOG_WARN("fail to get tenant_ids", KR(ret)); + } else { + int tmp_ret = OB_SUCCESS; + + // dump statistic + const int64_t DUMP_INTERVAL = 10 * 60 * 1000 * 1000L; // 10min + if (TC_REACH_TIME_INTERVAL(DUMP_INTERVAL)) { + for (int64_t i = 0; i < tenant_ids.count(); i++) { // ignore different tenant's failure + const uint64_t tenant_id = tenant_ids.at(i); + SpinRLockGuard guard(rwlock_); + ObTabletLocationRefreshMgr *mgr = NULL; + if (OB_TMP_FAIL(inner_get_mgr_(tenant_id, mgr))) { + LOG_WARN("fail to get mgr", KR(tmp_ret), K(tenant_id)); + } else if (OB_NOT_NULL(mgr)) { + (void) mgr->dump_statistic(); + } + } // end for + } + + for (int64_t i = 0; OB_SUCC(ret) && i < tenant_ids.count(); i++) { + const uint64_t tenant_id = tenant_ids.at(i); + if (OB_FAIL(check_stop_())) { + LOG_WARN("fail to check stop", KR(ret)); + } else { // ignore different tenant's failure + bool clear = false; // will be true when tenant has been dropped + if (OB_TMP_FAIL(try_clear_mgr_(tenant_id, clear))) { + LOG_WARN("fail to clear mgr", KR(tmp_ret), K(tenant_id)); + } + + if (!clear && OB_TMP_FAIL(refresh_cache_(tenant_id))) { + LOG_WARN("fail to refresh cache", KR(tmp_ret), K(tenant_id)); + } + } + } // end for + } + FLOG_INFO("[REFRESH_TABLET_LOCATION] refresh cache end", + KR(ret), "cost_us", 
ObTimeUtility::current_time() - start_time, + "tenant_cnt", tenant_mgr_map_.size()); + return ret; +} + +int ObTabletLocationRefreshService::refresh_cache_(const uint64_t tenant_id) +{ + FLOG_INFO("[REFRESH_TABLET_LOCATION] refresh cache start", K(tenant_id)); + int64_t start_time = ObTimeUtility::current_time(); + int ret = OB_SUCCESS; + if (OB_FAIL(check_stop_())) { + LOG_WARN("fail to check stop", KR(ret)); + } else if (OB_FAIL(check_tenant_can_refresh_(tenant_id))) { + LOG_WARN("fail to check tenant", KR(ret), K(tenant_id)); + } else if (OB_FAIL(try_runs_for_compatibility_(tenant_id))) { + LOG_WARN("fail to runs for compatibility", KR(ret), K(tenant_id)); + } else if (OB_FAIL(fetch_inc_task_infos_and_update_(tenant_id))) { + LOG_WARN("fail to fetch inc task infos", KR(ret), K(tenant_id)); + } else if (OB_FAIL(process_doing_task_infos_(tenant_id))) { + LOG_WARN("fail to process tasks", KR(ret), K(tenant_id)); + } else if (OB_FAIL(clear_task_infos_(tenant_id))) { + LOG_WARN("fail to clear tasks", KR(ret), K(tenant_id)); + } + FLOG_INFO("[REFRESH_TABLET_LOCATION] refresh cache end", + KR(ret), K(tenant_id), "cost_us", ObTimeUtility::current_time() - start_time); + return ret; +} + +// try init base_task_id_ & get tablet_ids from local cache +int ObTabletLocationRefreshService::try_runs_for_compatibility_( + const uint64_t tenant_id) +{ + int ret = OB_SUCCESS; + ObTransferTaskID base_task_id; + if (OB_FAIL(get_base_task_id_(tenant_id, base_task_id))) { + LOG_WARN("fail to get base_task_id", KR(ret), K(tenant_id)); + } else if (base_task_id.is_valid()) { + // non compatibility scene, do nothing + } else { + ObArray tablet_ids; + tablet_ids.set_attr(SET_USE_500("TbltRefIDS")); + if (OB_FAIL(check_stop_())) { + LOG_WARN("fail to check stop", KR(ret)); + } else if (OB_ISNULL(sql_proxy_) || OB_ISNULL(tablet_ls_service_)) { + ret = OB_NOT_INIT; + LOG_WARN("sql_proxy_ or tablet_ls_service_ is null", + KR(ret), KP_(sql_proxy), KP_(tablet_ls_service)); + } else if 
(OB_FAIL(ObTransferTaskOperator::fetch_initial_base_task_id( + *sql_proxy_, tenant_id, base_task_id))) { + LOG_WARN("fail to fetch initial base task id", KR(ret), K(tenant_id)); + } else if (OB_FAIL(tablet_ls_service_->get_tablet_ids_from_cache(tenant_id, tablet_ids))) { + LOG_WARN("fail to get tablet_ids", KR(ret), K(tenant_id)); + } else { + SpinWLockGuard guard(rwlock_); + ObTabletLocationRefreshMgr *mgr = NULL; + ObTransferTaskID existed_base_task_id; + if (OB_FAIL(inner_get_mgr_(tenant_id, mgr))) { + LOG_WARN("fail to get mgr", KR(ret), K(tenant_id)); + } else if (OB_ISNULL(mgr)) { + ret = OB_TENANT_NOT_EXIST; + LOG_WARN("mgr is null, tenant has been dropped", KR(ret), K(tenant_id)); + } else if (FALSE_IT(mgr->get_base_task_id(existed_base_task_id))) { + } else if (existed_base_task_id.is_valid()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("base_task_id should be invalid", + KR(ret), K(tenant_id), K(existed_base_task_id)); + } else if (OB_FAIL(mgr->set_tablet_ids(tablet_ids))) { + LOG_WARN("fail to set tablet_ids", KR(ret)); + } else { + (void) mgr->set_base_task_id(base_task_id); + FLOG_INFO("[REFRESH_TABLET_LOCATION] update base_task_id for compatibility", + K(tenant_id), K(base_task_id), "tablet_ids_cnt", tablet_ids.count()); + } + } + } + + // try reload tablet-ls caches according to tablet_ids_ + if (FAILEDx(try_reload_tablet_cache_(tenant_id))) { + LOG_WARN("fail to reload tablet cache", KR(ret), K(tenant_id)); + } + return ret; +} + +int ObTabletLocationRefreshService::try_reload_tablet_cache_( + const uint64_t tenant_id) +{ + int ret = OB_SUCCESS; + ObArray store_tablet_ids; + store_tablet_ids.set_attr(SET_USE_500("TbltRefIDS")); + ObTransferTaskID base_task_id; + { + SpinRLockGuard guard(rwlock_); + ObTabletLocationRefreshMgr *mgr = NULL; + if (OB_FAIL(inner_get_mgr_(tenant_id, mgr))) { + LOG_WARN("fail to get mgr", KR(ret), K(tenant_id)); + } else if (OB_ISNULL(mgr)) { + ret = OB_TENANT_NOT_EXIST; + LOG_WARN("mgr is null, tenant has been dropped", 
KR(ret), K(tenant_id)); + } else if (OB_FAIL(mgr->get_tablet_ids(store_tablet_ids))) { + LOG_WARN("fail to get tablet_ids", KR(ret), K(tenant_id)); + } else { + (void) mgr->get_base_task_id(base_task_id); + } + } + + if (OB_SUCC(ret) && base_task_id.is_valid() && store_tablet_ids.count() > 0) { + + const int64_t MAX_RELOAD_TABLET_NUM_IN_BATCH = 128; + int64_t end_pos = min(store_tablet_ids.count(), MAX_RELOAD_TABLET_NUM_IN_BATCH); + ObArenaAllocator allocator; + ObList process_tablet_ids(allocator); + for (int64_t i = 0; OB_SUCC(ret) && i < end_pos; i++) { + if (OB_FAIL(check_stop_())) { + LOG_WARN("fail to check stop", KR(ret)); + } else if (OB_FAIL(process_tablet_ids.push_back(store_tablet_ids.at(i)))) { + LOG_WARN("fail to push back", KR(ret), K(store_tablet_ids.at(i))); + } + } // end for + + if (OB_SUCC(ret)) { + ObArray tablet_ls_caches; // not used + if (OB_FAIL(check_stop_())) { + LOG_WARN("fail to check stop", KR(ret)); + } else if (OB_ISNULL(tablet_ls_service_)) { + ret = OB_NOT_INIT; + LOG_WARN("tablet_ls_service_ is null", KR(ret)); + } else if (OB_FAIL(tablet_ls_service_->batch_renew_tablet_ls_cache( + tenant_id, process_tablet_ids, tablet_ls_caches))) { + LOG_WARN("fail to batch renew tablet ls cache", + KR(ret), K(tenant_id), "tablet_ids_cnt", process_tablet_ids.size()); + } + } + + if (OB_SUCC(ret)) { + ObArray remain_tablet_ids; + remain_tablet_ids.set_attr(SET_USE_500("TbltRefIDS")); + if (OB_FAIL(rootserver::ObRootUtils::copy_array(store_tablet_ids, + end_pos, store_tablet_ids.count(), remain_tablet_ids))) { + LOG_WARN("fail to copy array", KR(ret), K(end_pos), K(store_tablet_ids.count())); + } else { + SpinRLockGuard guard(rwlock_); + ObTabletLocationRefreshMgr *mgr = NULL; + if (OB_FAIL(inner_get_mgr_(tenant_id, mgr))) { + LOG_WARN("fail to get mgr", KR(ret), K(tenant_id)); + } else if (OB_ISNULL(mgr)) { + ret = OB_TENANT_NOT_EXIST; + LOG_WARN("mgr is null, tenant has been dropped", KR(ret), K(tenant_id)); + } else if 
(OB_FAIL(mgr->set_tablet_ids(remain_tablet_ids))) { + LOG_WARN("fail to set tablet_ids", KR(ret), K(tenant_id), K(remain_tablet_ids.count())); + } else { + has_task_ = (remain_tablet_ids.count() > 0); + } + } + } + + FLOG_INFO("[REFRESH_TABLET_LOCATION] update tablet-ls caches for compatibility", KR(ret), + K(tenant_id), "process_tablet_cnt", process_tablet_ids.size(), + "remain_tablet_cnt", store_tablet_ids.count() - process_tablet_ids.size()); + } + return ret; +} + +int ObTabletLocationRefreshService::fetch_inc_task_infos_and_update_( + const uint64_t tenant_id) +{ + int ret = OB_SUCCESS; + ObTransferTaskID base_task_id; + if (OB_FAIL(get_base_task_id_(tenant_id, base_task_id))) { + LOG_WARN("fail to get base_task_id", KR(ret), K(tenant_id)); + } else if (!base_task_id.is_valid()) { + // skip + } else { + ObArray inc_task_infos; + if (OB_FAIL(check_stop_())) { + LOG_WARN("fail to check stop", KR(ret)); + } else if (OB_ISNULL(sql_proxy_)) { + ret = OB_NOT_INIT; + LOG_WARN("sql_proxy_ is null", KR(ret), KP_(sql_proxy)); + } else if (OB_FAIL(ObTransferTaskOperator::fetch_inc_task_infos( + *sql_proxy_, tenant_id, base_task_id, inc_task_infos))) { + LOG_WARN("fail to fetch inc task infos", KR(ret), K(tenant_id), K(base_task_id)); + } else { + SpinRLockGuard guard(rwlock_); + ObTabletLocationRefreshMgr *mgr = NULL; + if (OB_FAIL(inner_get_mgr_(tenant_id, mgr))) { + LOG_WARN("fail to get mgr", KR(ret), K(tenant_id)); + } else if (OB_ISNULL(mgr)) { + ret = OB_TENANT_NOT_EXIST; + LOG_WARN("mgr is null, tenant has been dropped", KR(ret), K(tenant_id)); + } else if (OB_FAIL(mgr->merge_inc_task_infos(inc_task_infos))) { + LOG_WARN("fail to merge inc task infos", KR(ret), K(tenant_id), K(inc_task_infos.count())); + } + } + } + return ret; +} + +int ObTabletLocationRefreshService::process_doing_task_infos_( + const uint64_t tenant_id) +{ + int ret = OB_SUCCESS; + ObTransferTaskID base_task_id; + if (OB_FAIL(get_base_task_id_(tenant_id, base_task_id))) { + LOG_WARN("fail to 
get base_task_id", KR(ret), K(tenant_id)); + } else if (!base_task_id.is_valid()) { + // skip + } else { + ObArray task_ids; + { + SpinRLockGuard guard(rwlock_); + ObTabletLocationRefreshMgr *mgr = NULL; + if (OB_FAIL(inner_get_mgr_(tenant_id, mgr))) { + LOG_WARN("fail to get mgr", KR(ret), K(tenant_id)); + } else if (OB_ISNULL(mgr)) { + ret = OB_TENANT_NOT_EXIST; + LOG_WARN("mgr is null, tenant has been dropped", KR(ret), K(tenant_id)); + } else if (OB_FAIL(mgr->get_doing_task_ids(task_ids))) { + LOG_WARN("fail to get doing task ids", KR(ret), K(tenant_id)); + } + } + + if (OB_SUCC(ret) && task_ids.count() > 0) { + common::ObArray tablet_ls_caches; + tablet_ls_caches.set_attr(SET_USE_500("TbltRefCaches")); + if (OB_FAIL(check_stop_())) { + LOG_WARN("fail to check stop", KR(ret)); + } else if (OB_ISNULL(sql_proxy_) || OB_ISNULL(tablet_ls_service_)) { + ret = OB_NOT_INIT; + LOG_WARN("sql_proxy_ or tablet_ls_service_ is null", + KR(ret), KP_(sql_proxy), KP_(tablet_ls_service)); + } else if (OB_FAIL(ObTransferTaskOperator::batch_get_tablet_ls_cache( + *sql_proxy_, tenant_id, task_ids, tablet_ls_caches))) { + LOG_WARN("fail to get tablet ls cache", KR(ret), K(tenant_id), K(task_ids)); + } else { + int64_t start_time = ObTimeUtility::current_time(); + const bool update_only = true; + for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ls_caches.count(); i++) { + const ObTabletLSCache &tablet_ls = tablet_ls_caches.at(i); + if (OB_FAIL(check_stop_())) { + LOG_WARN("fail to check stop", KR(ret)); + } else if (OB_FAIL(tablet_ls_service_->update_cache(tablet_ls, update_only))) { + LOG_WARN("update cache failed", KR(ret), K(tablet_ls)); + } + } // end for + FLOG_INFO("[REFRESH_TABLET_LOCATION] update tablet-ls caches when process tasks", + KR(ret), K(tenant_id), K(tablet_ls_caches.count()), + "cost_us", ObTimeUtility::current_time() - start_time, + K(task_ids)); + } + + if (OB_SUCC(ret)) { + ObArray done_task_infos; + ObTransferRefreshStatus 
done_status(ObTransferRefreshStatus::DONE); + if (OB_FAIL(done_task_infos.reserve(task_ids.count()))) { + LOG_WARN("fail to reserve array", KR(ret), K(task_ids.count())); + } + for (int64_t i = 0; OB_SUCC(ret) && i < task_ids.count(); i++) { + ObTransferRefreshInfo task_info; + if (OB_FAIL(check_stop_())) { + LOG_WARN("fail to check stop", KR(ret)); + } else if (OB_FAIL(task_info.init(task_ids.at(i), done_status))) { + LOG_WARN("fail to init refresh info", KR(ret), K(tenant_id), K(task_ids.at(i))); + } else if (OB_FAIL(done_task_infos.push_back(task_info))) { + LOG_WARN("fail to push back task info", KR(ret), K(tenant_id), K(task_info)); + } + } // end for + if (OB_SUCC(ret)) { + SpinRLockGuard guard(rwlock_); + ObTabletLocationRefreshMgr *mgr = NULL; + if (OB_FAIL(inner_get_mgr_(tenant_id, mgr))) { + LOG_WARN("fail to get mgr", KR(ret), K(tenant_id)); + } else if (OB_ISNULL(mgr)) { + ret = OB_TENANT_NOT_EXIST; + LOG_WARN("mgr is null, tenant has been dropped", KR(ret), K(tenant_id)); + } else if (OB_FAIL(mgr->merge_inc_task_infos(done_task_infos))) { + LOG_WARN("fail to merge inc task infos", KR(ret), K(tenant_id), K(done_task_infos)); + } + } + } + } + } + return ret; +} + +int ObTabletLocationRefreshService::clear_task_infos_( + const uint64_t tenant_id) +{ + int ret = OB_SUCCESS; + ObTransferTaskID base_task_id; + if (OB_FAIL(get_base_task_id_(tenant_id, base_task_id))) { + LOG_WARN("fail to get base_task_id", KR(ret), K(tenant_id)); + } else if (!base_task_id.is_valid()) { + // skip + } else { + SpinRLockGuard guard(rwlock_); + ObTabletLocationRefreshMgr *mgr = NULL; + bool has_doing_task = false; + if (OB_FAIL(inner_get_mgr_(tenant_id, mgr))) { + LOG_WARN("fail to get mgr", KR(ret), K(tenant_id)); + } else if (OB_ISNULL(mgr)) { + ret = OB_TENANT_NOT_EXIST; + LOG_WARN("mgr is null, tenant has been dropped", KR(ret), K(tenant_id)); + } else if (OB_FAIL(mgr->clear_task_infos(has_doing_task))) { + LOG_WARN("fail to clear task", KR(ret), K(tenant_id)); + } else if 
(has_doing_task) { + has_task_ = true; + } + } + return ret; +} + +} // end namespace share +} // end namespace oceanbase diff --git a/src/share/location_cache/ob_tablet_location_refresh_service.h b/src/share/location_cache/ob_tablet_location_refresh_service.h new file mode 100644 index 0000000000..21f626345c --- /dev/null +++ b/src/share/location_cache/ob_tablet_location_refresh_service.h @@ -0,0 +1,144 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_SHARE_OB_TABLET_LOCATION_REFRESH_SERVICE_H +#define OCEANBASE_SHARE_OB_TABLET_LOCATION_REFRESH_SERVICE_H + +#include "lib/hash/ob_hashmap.h" +#include "lib/lock/ob_mutex.h" +#include "rootserver/ob_rs_reentrant_thread.h" +#include "rootserver/ob_thread_idling.h" +#include "share/transfer/ob_transfer_info.h" +namespace oceanbase +{ +namespace share +{ +namespace schema +{ +class ObMultiVersionSchemaService; +} +class ObTabletLSService; + +class ObTabletLocationRefreshMgr +{ +public: + ObTabletLocationRefreshMgr() = delete; + ObTabletLocationRefreshMgr(const uint64_t tenant_id, + const ObTransferTaskID &base_task_id); + ~ObTabletLocationRefreshMgr(); + + void set_base_task_id(const ObTransferTaskID &base_task_id); + void get_base_task_id(ObTransferTaskID &base_task_id); + + int set_tablet_ids(const common::ObIArray &tablet_ids); + int get_tablet_ids(common::ObIArray &tablet_ids); + + // ensure inc_task_infos_ order by task_id asc + int merge_inc_task_infos(common::ObArray &inc_task_infos_to_merge); + int 
get_doing_task_ids(common::ObIArray &doing_task_ids); + int clear_task_infos(bool &has_doing_task); + + void dump_statistic(); +public: + static const int64_t BATCH_TASK_COUNT = 128; +private: + lib::ObMutex mutex_; + uint64_t tenant_id_; + // base_task_id_ = 0 : valid, means transfer never occur. + // base_task_id_ = OB_INVALID_ID, invalid, means should be run in compat mode + ObTransferTaskID base_task_id_; + // tablet_ids to reload cache for compatibility + common::ObArray tablet_ids_; + // transfer tasks to process, order by task_id asc + common::ObArray inc_task_infos_; + DISALLOW_COPY_AND_ASSIGN(ObTabletLocationRefreshMgr); +}; + +class ObTabletLocationRefreshServiceIdling : public rootserver::ObThreadIdling +{ +public: + explicit ObTabletLocationRefreshServiceIdling(volatile bool &stop) + : ObThreadIdling(stop) {} + virtual int64_t get_idle_interval_us() override; + int fast_idle(); +private: + const static int64_t DEFAULT_TIMEOUT_US = 10 * 60 * 1000 * 1000L; // 10m + const static int64_t FAST_TIMEOUT_US = 1 * 60 * 1000 * 1000L; // 1m +}; + +// refresh all tenant's cached tablet-ls locations automatically +// design doc : ob/rootservice/di76sdhof1h97har#p34dp +class ObTabletLocationRefreshService : public rootserver::ObRsReentrantThread +{ +public: + ObTabletLocationRefreshService(); + virtual ~ObTabletLocationRefreshService(); + + int init(ObTabletLSService &tablet_ls_service, + share::schema::ObMultiVersionSchemaService &schema_service, + common::ObMySQLProxy &sql_proxy); + + int try_init_base_point(const int64_t tenant_id); + + void destroy(); + + virtual int start() override; + virtual void stop() override; + virtual void wait() override; + virtual void run3() override; + virtual int blocking_run() override { BLOCKING_RUN_IMPLEMENT(); } + // don't use common::ObThreadFlags::set_rs_flag() + virtual int before_blocking_run() override { return common::OB_SUCCESS; } + virtual int after_blocking_run() override { return common::OB_SUCCESS; } +private: + void 
idle_(); + int check_stop_(); + int check_tenant_can_refresh_(const uint64_t tenant_id); + int get_base_task_id_(const uint64_t tenant_id, ObTransferTaskID &base_task_id); + + int inner_get_mgr_(const int64_t tenant_id, + ObTabletLocationRefreshMgr *&mgr); + int get_tenant_ids_(common::ObIArray &tenant_ids); + int try_clear_mgr_(const uint64_t tenant_id, bool &clear); + + int try_init_base_point_(const int64_t tenant_id); + + int refresh_cache_(); + int refresh_cache_(const uint64_t tenant_id); + + int try_runs_for_compatibility_(const uint64_t tenant_id); + int try_reload_tablet_cache_(const uint64_t tenant_id); + + int fetch_inc_task_infos_and_update_(const uint64_t tenant_id); + int process_doing_task_infos_(const uint64_t tenant_id); + int clear_task_infos_(const uint64_t tenant_id); +private: + bool inited_; + bool has_task_; + mutable ObTabletLocationRefreshServiceIdling idling_; + ObTabletLSService *tablet_ls_service_; + share::schema::ObMultiVersionSchemaService *schema_service_; + common::ObMySQLProxy *sql_proxy_; + common::ObArenaAllocator allocator_; + // Wlock will be held in the following scenarios: + // - Init/Destroy tenant's management struct. + // - Init `base_task_id_` for compatibility scenario. + // - Destroy tenant's management struct. 
+ common::SpinRWLock rwlock_; + // tenant_mgr_map_ won't be erased + common::hash::ObHashMap tenant_mgr_map_; + DISALLOW_COPY_AND_ASSIGN(ObTabletLocationRefreshService); +}; + +} // end namespace share +} // end namespace oceanbase +#endif // OCEANBASE_SHARE_OB_TABLET_LOCATION_REFRESH_SERVICE_H diff --git a/src/share/location_cache/ob_tablet_ls_map.cpp b/src/share/location_cache/ob_tablet_ls_map.cpp index 088b88153b..c7ff881930 100644 --- a/src/share/location_cache/ob_tablet_ls_map.cpp +++ b/src/share/location_cache/ob_tablet_ls_map.cpp @@ -96,7 +96,9 @@ void ObTabletLSMap::destroy() size_ = 0; } -int ObTabletLSMap::update(const ObTabletLSCache &tablet_ls_cache) +int ObTabletLSMap::update( + const ObTabletLSCache &tablet_ls_cache, + const bool update_only) { int ret = OB_SUCCESS; ObTabletLSCache *curr = NULL; @@ -121,21 +123,25 @@ int ObTabletLSMap::update(const ObTabletLSCache &tablet_ls_cache) if (OB_ISNULL(curr)) { // insert - ObTabletLSCache *tmp = op_alloc(ObTabletLSCache); - if (OB_ISNULL(tmp)) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_ERROR("ls location memory alloc error", KR(ret), K(key), K(tablet_ls_cache)); - } else if (OB_FAIL(tmp->assign(tablet_ls_cache))) { - LOG_WARN("fail to assign tablet_ls_cache", KR(ret), K(tablet_ls_cache)); - } else { - // try_update_access_ts_(tmp); // always update for insert - tmp->next_ = ls_buckets_[pos]; - ls_buckets_[pos] = tmp; - ATOMIC_INC(&size_); + if (!update_only) { + ObTabletLSCache *tmp = op_alloc(ObTabletLSCache); + if (OB_ISNULL(tmp)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_ERROR("ls location memory alloc error", KR(ret), K(key), K(tablet_ls_cache)); + } else if (OB_FAIL(tmp->assign(tablet_ls_cache))) { + LOG_WARN("fail to assign tablet_ls_cache", KR(ret), K(tablet_ls_cache)); + } else { + // try_update_access_ts_(tmp); // always update for insert + tmp->next_ = ls_buckets_[pos]; + ls_buckets_[pos] = tmp; + ATOMIC_INC(&size_); + } } } else { // update - if (OB_FAIL(curr->assign(tablet_ls_cache))) { + if 
(curr->get_transfer_seq() >= tablet_ls_cache.get_transfer_seq()) { + LOG_TRACE("current tablet-ls is new enough, just skip", KPC(curr), K(tablet_ls_cache)); + } else if (OB_FAIL(curr->assign(tablet_ls_cache))) { LOG_WARN("fail to assign tablet_ls_cache", KR(ret), K(tablet_ls_cache)); } else { // try_update_access_ts_(curr); // always update for update @@ -200,6 +206,32 @@ int ObTabletLSMap::get_all(common::ObIArray &cache_array) return ret; } +int ObTabletLSMap::get_tablet_ids( + const uint64_t tenant_id, + common::ObIArray &tablet_ids) +{ + int ret = OB_SUCCESS; + int64_t start_time = ObTimeUtility::current_time(); + tablet_ids.reset(); + ObTabletLSCache *tablet_ls_cache = NULL; + for (int64_t i = 0; OB_SUCC(ret) && i < BUCKETS_CNT; ++i) { + ObQSyncLockReadGuard guard(buckets_lock_[i % LOCK_SLOT_CNT]); + tablet_ls_cache = ls_buckets_[i]; + while (OB_NOT_NULL(tablet_ls_cache) && OB_SUCC(ret)) { + if (tablet_ls_cache->get_tenant_id() == tenant_id) { + if (OB_FAIL(tablet_ids.push_back(tablet_ls_cache->get_tablet_id()))) { + LOG_WARN("fail to push back tablet_id", KR(ret), KPC(tablet_ls_cache)); + } + } + tablet_ls_cache = static_cast(tablet_ls_cache->next_); + } + } + FLOG_INFO("get tablet_ids cost", KR(ret), K(tenant_id), + "map_size", size_, "tablet_ids_cnt", tablet_ids.count(), + "cost_us", ObTimeUtility::current_time() - start_time); + return ret; +} + int ObTabletLSMap::del(const ObTabletLSKey &key) { int ret = OB_SUCCESS; diff --git a/src/share/location_cache/ob_tablet_ls_map.h b/src/share/location_cache/ob_tablet_ls_map.h index 7a609c15a5..358093431f 100644 --- a/src/share/location_cache/ob_tablet_ls_map.h +++ b/src/share/location_cache/ob_tablet_ls_map.h @@ -13,7 +13,7 @@ #ifndef OCEANBASE_SHARE_OB_TABLET_LS_MAP #define OCEANBASE_SHARE_OB_TABLET_LS_MAP -#include "lib/lock/ob_qsync_lock.h" +#include "lib/lock/ob_qsync_lock.h" // ObQSyncLockWriteGuard #include "share/location_cache/ob_location_struct.h" // ObTabletLSKey, ObTabletLSCache namespace oceanbase @@ 
-40,13 +40,15 @@ public: ~ObTabletLSMap() { destroy(); } void destroy(); int init(); - int update(const ObTabletLSCache &tablet_ls_cache); + int update(const ObTabletLSCache &tablet_ls_cache, + const bool update_only); int update_limit_by_threshold( const int64_t threshold, const ObTabletLSKey &key, const ObTabletLSCache &tablet_ls_cache); int get(const ObTabletLSKey &key, ObTabletLSCache &tablet_ls_cache); int get_all(common::ObIArray &cache_array); + int get_tablet_ids(const uint64_t tenant_id, common::ObIArray &tablet_ids); int del(const ObTabletLSKey &key); int64_t size() const { return size_; } diff --git a/src/share/location_cache/ob_tablet_ls_service.cpp b/src/share/location_cache/ob_tablet_ls_service.cpp index 55b12cf457..fd98cea6ae 100644 --- a/src/share/location_cache/ob_tablet_ls_service.cpp +++ b/src/share/location_cache/ob_tablet_ls_service.cpp @@ -31,7 +31,10 @@ using namespace common::hash; namespace share { -int ObTabletLSService::init(common::ObMySQLProxy &sql_proxy) +int ObTabletLSService::init( + share::schema::ObMultiVersionSchemaService &schema_service, + common::ObMySQLProxy &sql_proxy, + obrpc::ObSrvRpcProxy &srv_rpc_proxy) { int ret = OB_SUCCESS; const int64_t user_thread_cnt = @@ -56,6 +59,12 @@ int ObTabletLSService::init(common::ObMySQLProxy &sql_proxy) CLEAR_EXPIRED_CACHE_INTERVAL_US, true/*repeat*/))) { LOG_WARN("schedule clear expired cache timer task failed", KR(ret)); + } else if (OB_FAIL(auto_refresh_service_.init(*this, schema_service, sql_proxy))) { + LOG_WARN("fail to init auto refresh service", KR(ret)); + } else if (OB_FAIL(broadcast_sender_.init(&srv_rpc_proxy))) { + LOG_WARN("broadcast_sender init failed", KR(ret)); + } else if (OB_FAIL(broadcast_updater_.init(this))) { + LOG_WARN("broadcast_updater init failed", KR(ret)); } else { sql_proxy_ = &sql_proxy; inited_ = true; @@ -250,14 +259,29 @@ int ObTabletLSService::process_barrier( return OB_NOT_SUPPORTED; } +int ObTabletLSService::start() +{ + int ret = OB_SUCCESS; + if 
(OB_FAIL(auto_refresh_service_.start())) { + LOG_WARN("fail to start auto refresh service", KR(ret)); + } + return ret; +} + void ObTabletLSService::stop() { async_queue_.stop(); + auto_refresh_service_.stop(); + broadcast_sender_.stop(); + broadcast_updater_.stop(); } void ObTabletLSService::wait() { async_queue_.wait(); + auto_refresh_service_.wait(); + broadcast_sender_.wait(); + broadcast_updater_.wait(); } int ObTabletLSService::destroy() @@ -267,6 +291,9 @@ int ObTabletLSService::destroy() inited_ = false; inner_cache_.destroy(); async_queue_.destroy(); + auto_refresh_service_.destroy(); + broadcast_sender_.destroy(); + broadcast_updater_.destroy(); return ret; } @@ -303,7 +330,7 @@ int ObTabletLSService::get_from_cache_( tablet_id, SYS_LS, now, - INT64_MAX))) { + 0 /*transfer_seq*/))) { LOG_WARN("init tablet_cache failed", KR(ret), K(cache_key), K(SYS_LS), K(now)); } @@ -347,7 +374,7 @@ int ObTabletLSService::renew_cache_( tablet_id, SYS_LS, now, - INT64_MAX))) { + 0 /*transfer_seq*/))) { LOG_WARN("init tablet_cache failed", KR(ret), K(tenant_id), K(tablet_id), K(SYS_LS), K(now)); } @@ -366,7 +393,9 @@ int ObTabletLSService::renew_cache_( return ret; } -int ObTabletLSService::update_cache_(const ObTabletLSCache &tablet_cache) +int ObTabletLSService::update_cache( + const ObTabletLSCache &tablet_cache, + const bool update_only) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!inited_)) { @@ -375,10 +404,25 @@ int ObTabletLSService::update_cache_(const ObTabletLSCache &tablet_cache) } else if (!tablet_cache.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid tablet_cache", KR(ret), K(tablet_cache)); - } else if (OB_FAIL(inner_cache_.update(tablet_cache))) { - LOG_WARN("put tablet_cache to user inner_cache failed", KR(ret), K(tablet_cache)); + } else if (OB_FAIL(inner_cache_.update(tablet_cache, update_only))) { + LOG_WARN("put tablet_cache to user inner_cache failed", KR(ret), K(tablet_cache), K(update_only)); } else { - LOG_TRACE("renew tablet_cache in 
inner_cache succeed", KR(ret), K(tablet_cache)); + LOG_TRACE("renew tablet_cache in inner_cache succeed", KR(ret), K(tablet_cache), K(update_only)); + } + return ret; +} + +int ObTabletLSService::get_tablet_ids_from_cache( + const uint64_t tenant_id, + common::ObIArray &tablet_ids) +{ + int ret = OB_SUCCESS; + tablet_ids.reset(); + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("service not init", KR(ret)); + } else if (OB_FAIL(inner_cache_.get_tablet_ids(tenant_id, tablet_ids))) { + LOG_WARN("fail to get tablet_ids", KR(ret), K(tenant_id)); } return ret; } @@ -427,7 +471,7 @@ int ObTabletLSService::batch_renew_tablet_ls_cache( FOREACH_X(tablet_id, tablet_list, OB_SUCC(ret)) { if (belong_to_sys_ls_(tenant_id, *tablet_id)) { ObTabletLSCache cache; - if (OB_FAIL(cache.init(tenant_id, *tablet_id, SYS_LS, now, INT64_MAX))) { + if (OB_FAIL(cache.init(tenant_id, *tablet_id, SYS_LS, now, 0 /*transfer_seq*/))) { LOG_WARN("init cache failed", KR(ret), K(tenant_id), K(*tablet_id), K(now)); } else if (OB_FAIL(tablet_ls_caches.push_back(cache))) { LOG_WARN("push back failed", KR(ret), K(cache)); @@ -443,7 +487,9 @@ int ObTabletLSService::batch_renew_tablet_ls_cache( const int64_t single_get_timeout = GCONF.location_cache_refresh_sql_timeout; // calculate timeout by count of inner_sql const int64_t batch_get_timeout = (user_tablet_ids.count() / ObTabletToLSTableOperator::MAX_BATCH_COUNT + 1) * single_get_timeout; - if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, batch_get_timeout))) { + if (OB_FAIL(auto_refresh_service_.try_init_base_point(tenant_id))) { + LOG_WARN("fail to init base point", KR(ret), K(tenant_id)); + } else if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, batch_get_timeout))) { LOG_WARN("fail to set default_timeout_ctx", KR(ret)); } else if (OB_FAIL(ObTabletToLSTableOperator::batch_get_tablet_ls_cache( *sql_proxy_, @@ -461,9 +507,10 @@ int ObTabletLSService::batch_renew_tablet_ls_cache( } } // update user tablet ls cache + bool 
update_only = false; ARRAY_FOREACH(user_tablet_ls_caches, idx) { const ObTabletLSCache &tablet_ls = user_tablet_ls_caches.at(idx); - if (OB_FAIL(update_cache_(tablet_ls))) { + if (OB_FAIL(update_cache(tablet_ls, update_only))) { LOG_WARN("update cache failed", KR(ret), K(tablet_ls)); } else if (OB_FAIL(tablet_ls_caches.push_back(tablet_ls))) { LOG_WARN("push back faled", KR(ret), K(tablet_ls)); @@ -599,5 +646,29 @@ int ObTabletLSService::clear_expired_cache() return ret; } +int ObTabletLSService::submit_broadcast_task(const ObTabletLocationBroadcastTask &task) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("service not init", KR(ret)); + } else if (OB_FAIL(broadcast_sender_.submit_broadcast_task(task))) { + LOG_WARN("failed to submit broadcast task by sender", KR(ret), K(task)); + } + return ret; +} + +int ObTabletLSService::submit_update_task(const ObTabletLocationBroadcastTask &task) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("service not init", KR(ret)); + } else if (OB_FAIL(broadcast_updater_.submit_update_task(task))) { + LOG_WARN("failed to submit broadcast task by sender", KR(ret)); + } + return ret; +} + } // end namespace share } // end namespace oceanbase diff --git a/src/share/location_cache/ob_tablet_ls_service.h b/src/share/location_cache/ob_tablet_ls_service.h index 50afa853e9..abbef8bf1a 100644 --- a/src/share/location_cache/ob_tablet_ls_service.h +++ b/src/share/location_cache/ob_tablet_ls_service.h @@ -16,6 +16,8 @@ #include "share/location_cache/ob_location_struct.h" #include "share/location_cache/ob_location_update_task.h" #include "share/location_cache/ob_tablet_ls_map.h" // ObTabletLSMap +#include "share/location_cache/ob_tablet_location_refresh_service.h" +#include "share/location_cache/ob_tablet_location_broadcast.h" // ObTabletLocationSender, ObTabletLocationUpdater namespace oceanbase { @@ -39,9 +41,14 @@ public: sql_proxy_(NULL), inner_cache_(), 
async_queue_(), - clear_expired_cache_task_(*this) {} + clear_expired_cache_task_(*this), + broadcast_sender_(), + broadcast_updater_(), + auto_refresh_service_() {} virtual ~ObTabletLSService() {} - int init(common::ObMySQLProxy &sql_proxy); + int init(share::schema::ObMultiVersionSchemaService &schema_service, + common::ObMySQLProxy &sql_proxy, + obrpc::ObSrvRpcProxy &srv_rpc_proxy); // Gets the mapping between the tablet and log stream synchronously. // // @param [in] tenant_id: target tenant which the tablet belongs to @@ -86,13 +93,21 @@ public: bool &stopped); // Unused. int process_barrier(const ObTabletLSUpdateTask &task, bool &stopped); + int start(); void stop(); void wait(); int destroy(); int reload_config(); int clear_expired_cache(); + int submit_broadcast_task(const ObTabletLocationBroadcastTask &task); + int submit_update_task(const ObTabletLocationBroadcastTask &task); + int update_cache(const ObTabletLSCache &tablet_cache, const bool update_only); + + int get_tablet_ids_from_cache( + const uint64_t tenant_id, + common::ObIArray &tablet_ids); private: int get_from_cache_( const uint64_t tenant_id, @@ -102,7 +117,6 @@ private: const uint64_t tenant_id, const ObTabletID &tablet_id, ObTabletLSCache &tablet_cache); - int update_cache_(const ObTabletLSCache &tablet_cache); int set_timeout_ctx_(common::ObTimeoutCtx &ctx); bool is_valid_key_(const uint64_t tenant_id, const ObTabletID &tablet_id) const; int erase_cache_(const uint64_t tenant_id, const ObTabletID &tablet_id); @@ -122,7 +136,10 @@ private: ObTabletLSMap inner_cache_; // Store the mapping between tablet and log stream in user tenant. 
ObTabletLSUpdateQueue async_queue_; ObClearTabletLSCacheTimerTask clear_expired_cache_task_; // timer task used to clear the expired cache + ObTabletLocationSender broadcast_sender_; // broadcast updated tablet location to every server + ObTabletLocationUpdater broadcast_updater_; // process received broadcast task //TODO: need more queue later + ObTabletLocationRefreshService auto_refresh_service_; }; } // end namespace share diff --git a/src/share/ob_rpc_struct.cpp b/src/share/ob_rpc_struct.cpp index c86b2b3f72..24f0506b32 100755 --- a/src/share/ob_rpc_struct.cpp +++ b/src/share/ob_rpc_struct.cpp @@ -9833,5 +9833,83 @@ int ObAdminUnlockMemberListOpArg::set( OB_SERIALIZE_MEMBER(ObAdminUnlockMemberListOpArg, tenant_id_, ls_id_, lock_id_); +ObTabletLocationSendArg::ObTabletLocationSendArg() + : tasks_() +{ +} + +ObTabletLocationSendArg::~ObTabletLocationSendArg() +{ +} + +int ObTabletLocationSendArg::assign(const ObTabletLocationSendArg &other) +{ + int ret = OB_SUCCESS; + if (this != &other) { + if (OB_FAIL(tasks_.assign(other.tasks_))) { + LOG_WARN("fail to assign tasks_", KR(ret)); + } + } + return ret; +} + +int ObTabletLocationSendArg::set( + const ObIArray &tasks) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(tasks.empty())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("tasks array is empty", KR(ret)); + } else if (OB_FAIL(tasks_.assign(tasks))) { + LOG_WARN("fail to assign tasks", KR(ret)); + } + return ret; +} + +bool ObTabletLocationSendArg::is_valid() const +{ + return !tasks_.empty(); +} + +void ObTabletLocationSendArg::reset() +{ + tasks_.reset(); +} + +OB_SERIALIZE_MEMBER(ObTabletLocationSendArg, tasks_); + +ObTabletLocationSendResult::ObTabletLocationSendResult() + : ret_(common::OB_ERROR) +{} + +ObTabletLocationSendResult::~ObTabletLocationSendResult() +{} + +int ObTabletLocationSendResult::assign(const ObTabletLocationSendResult &other) +{ + int ret = OB_SUCCESS; + if (this != &other) { + ret_ = other.ret_; + } + return ret; +} + +void 
ObTabletLocationSendResult::reset() +{ + ret_ = common::OB_ERROR; +} + +void ObTabletLocationSendResult::set_ret(int ret) +{ + ret_ = ret; +} + +int ObTabletLocationSendResult::get_ret() const +{ + return ret_; +} + +OB_SERIALIZE_MEMBER(ObTabletLocationSendResult, ret_); + }//end namespace obrpc }//end namepsace oceanbase diff --git a/src/share/ob_rpc_struct.h b/src/share/ob_rpc_struct.h index 285b7fdf19..6477b692aa 100755 --- a/src/share/ob_rpc_struct.h +++ b/src/share/ob_rpc_struct.h @@ -72,6 +72,7 @@ #include "share/scn.h"//SCN #include "share/ob_server_table_operator.h" #include "share/restore/ob_import_arg.h" +#include "share/location_cache/ob_location_update_task.h" namespace oceanbase { @@ -10235,6 +10236,36 @@ public: int64_t lock_id_; }; +struct ObTabletLocationSendArg final +{ + OB_UNIS_VERSION(1); +public: + ObTabletLocationSendArg(); + ~ObTabletLocationSendArg(); + int assign(const ObTabletLocationSendArg &other); + int set(const ObIArray &tasks); + const ObSArray &get_tasks() const { return tasks_; } + bool is_valid() const; + void reset(); + TO_STRING_KV(K_(tasks)); +public: + ObSArray tasks_; +}; + +struct ObTabletLocationSendResult final +{ + OB_UNIS_VERSION(1); +public: + ObTabletLocationSendResult(); + ~ObTabletLocationSendResult(); + int assign(const ObTabletLocationSendResult &other); + void reset(); + void set_ret(int ret); + int get_ret() const; + TO_STRING_KV(K_(ret)); +private: + int ret_; +}; }//end namespace obrpc }//end namespace oceanbase #endif diff --git a/src/share/ob_srv_rpc_proxy.h b/src/share/ob_srv_rpc_proxy.h index a3ebef794a..247a127a4d 100755 --- a/src/share/ob_srv_rpc_proxy.h +++ b/src/share/ob_srv_rpc_proxy.h @@ -240,6 +240,7 @@ public: RPC_AP(PR5 tablet_major_freeze, OB_TABLET_MAJOR_FREEZE, (ObTabletMajorFreezeArg), obrpc::Int64); // RPC_AP(PR5 kill_client_session, OB_KILL_CLIENT_SESSION, (ObKillClientSessionArg), ObKillClientSessionRes); // RPC_S(PR5 client_session_create_time, OB_CLIENT_SESSION_CONNECT_TIME, 
(ObClientSessionCreateTimeArg), ObClientSessionCreateTimeRes); + RPC_AP(PR5 tablet_location_send, OB_TABLET_LOCATION_BROADCAST, (obrpc::ObTabletLocationSendArg), obrpc::ObTabletLocationSendResult); }; // end of class ObSrvRpcProxy } // end of namespace rpc diff --git a/src/share/parameter/ob_parameter_seed.ipp b/src/share/parameter/ob_parameter_seed.ipp index 14d5a9c79a..f4aa0ae69b 100755 --- a/src/share/parameter/ob_parameter_seed.ipp +++ b/src/share/parameter/ob_parameter_seed.ipp @@ -814,6 +814,13 @@ DEF_TIME(location_cache_refresh_rpc_timeout, OB_CLUSTER_PARAMETER, "500ms", "[1m DEF_TIME(location_cache_refresh_sql_timeout, OB_CLUSTER_PARAMETER, "1s", "[1ms,)", "The timeout used for refreshing location cache by SQL. Range: [1ms, +∞)", ObParameterAttr(Section::LOCATION_CACHE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); +DEF_TIME(_auto_refresh_tablet_location_interval, OB_CLUSTER_PARAMETER, "10m", "[0s,)", + "Polling period of auto refresh tablet location service. " + "When the value is 0, it means shutting down related service. Range: [0s, +∞)", + ObParameterAttr(Section::LOCATION_CACHE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); +DEF_INT(_auto_broadcast_tablet_location_rate_limit, OB_CLUSTER_PARAMETER, "10000", "[0, 100000]", + "Maximum number of tablets broadcasted per second by a single observer. When the value is 0, it means shutting down related logic.", + ObParameterAttr(Section::LOCATION_CACHE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); //// cache config DEF_INT(tablet_ls_cache_priority, OB_CLUSTER_PARAMETER, "1000", "[1,)", "tablet ls cache priority. 
Range:[1, )", diff --git a/src/share/restore/ob_recover_table_util.cpp b/src/share/restore/ob_recover_table_util.cpp index 26a6eb033b..f4f5d70b71 100644 --- a/src/share/restore/ob_recover_table_util.cpp +++ b/src/share/restore/ob_recover_table_util.cpp @@ -51,4 +51,4 @@ int ObRecoverTableUtil::check_compatible(const uint64_t target_tenant_id) LOG_USER_ERROR(OB_NOT_SUPPORTED, "Tenant COMPATIBLE is below 4.2.1.0, recover table is"); } return ret; -} \ No newline at end of file +} diff --git a/src/share/tablet/ob_tablet_to_ls_operator.cpp b/src/share/tablet/ob_tablet_to_ls_operator.cpp index 6eba25eeb5..e566378bb0 100644 --- a/src/share/tablet/ob_tablet_to_ls_operator.cpp +++ b/src/share/tablet/ob_tablet_to_ls_operator.cpp @@ -19,6 +19,7 @@ #include "lib/string/ob_sql_string.h" // ObSqlString #include "lib/mysqlclient/ob_mysql_proxy.h" // ObISqlClient, SMART_VAR #include "observer/ob_sql_client_decorator.h" // ObSQLClientRetryWeak +#include "share/transfer/ob_transfer_info.h" namespace oceanbase { @@ -633,7 +634,7 @@ int ObTabletToLSTableOperator::inner_batch_get_( common::ObIArray &tablet_ls_caches) { int ret = OB_SUCCESS; - const char *query_column_str = "tablet_id, ls_id, ORA_ROWSCN"; + const char *query_column_str = "*"; const bool keep_order = false; INNER_BATCH_GET(sql_proxy, tenant_id, tablet_ids, start_idx, end_idx, query_column_str, keep_order, tablet_ls_caches); @@ -651,19 +652,21 @@ int ObTabletToLSTableOperator::construct_results_( tablet_ls_cache.reset(); uint64_t tablet_id = ObTabletID::INVALID_TABLET_ID; int64_t ls_id = ObLSID::INVALID_LS_ID; - int64_t row_scn = OB_MIN_SCN_TS_NS; + int64_t transfer_seq = OB_INVALID_TRANSFER_SEQ; EXTRACT_INT_FIELD_MYSQL(res, "tablet_id", tablet_id, uint64_t); EXTRACT_INT_FIELD_MYSQL(res, "ls_id", ls_id, int64_t); - EXTRACT_INT_FIELD_MYSQL(res, "ORA_ROWSCN", row_scn, int64_t); + EXTRACT_INT_FIELD_MYSQL_WITH_DEFAULT_VALUE( + res, "transfer_seq", transfer_seq, int64_t, + false/*skip null error*/, true/*skip column error*/, 
0 /*default value*/); const int64_t now = ObTimeUtility::fast_current_time(); if (FAILEDx(tablet_ls_cache.init( tenant_id, ObTabletID(tablet_id), ObLSID(ls_id), now, - row_scn))) { + transfer_seq))) { LOG_WARN("init tablet_ls_cache failed", KR(ret), K(tenant_id), - K(tablet_id), K(ls_id), K(now), K(row_scn)); + K(tablet_id), K(ls_id), K(now), K(transfer_seq)); } else if (OB_FAIL(tablet_ls_caches.push_back(tablet_ls_cache))) { LOG_WARN("fail to push back", KR(ret), K(tablet_ls_cache)); } diff --git a/src/share/transfer/ob_transfer_info.cpp b/src/share/transfer/ob_transfer_info.cpp index e5c2b1e519..fa87dc2ef4 100644 --- a/src/share/transfer/ob_transfer_info.cpp +++ b/src/share/transfer/ob_transfer_info.cpp @@ -161,6 +161,115 @@ int ObTransferStatusHelper::check_can_change_status( return ret; } +/////////////// ObTransferRefreshStatus/////////////// +ObTransferRefreshStatus &ObTransferRefreshStatus::operator=(const ObTransferRefreshStatus &other) +{ + status_ = other.status_; + return *this; +} + +ObTransferRefreshStatus &ObTransferRefreshStatus::operator=(const ObTransferRefreshStatus::STATUS &status) +{ + status_ = status; + return *this; +} + +const char *ObTransferRefreshStatus::str() const +{ + const char *str = "INVALID"; + switch (status_) { + case UNKNOWN: { + str = "UNKNOWN"; + break; + } + case DOING: { + str = "DOING"; + break; + } + case DONE: { + str = "DONE"; + break; + } + default: { + break; + } + } + return str; +} + +// change ObTransferStatus to ObTransferRefreshStatus +void ObTransferRefreshStatus::convert_from(const ObTransferStatus &status) +{ + if (status.is_init_status() + || status.is_start_status()) { + status_ = UNKNOWN; + } else if (status.is_doing_status() + || status.is_completed_status()) { + status_ = DOING; + } else if (status.is_aborted_status() + || status.is_canceled_status() + || status.is_failed_status()) { + status_ = DONE; + } else { + status_ = INVALID; + } +} + +// update status according to state machine +void 
ObTransferRefreshStatus::update( + const ObTransferRefreshStatus &other, + bool &changed) +{ + changed = false; + const ObTransferRefreshStatus::STATUS &new_status = other.status_; + switch (status_) { + case INVALID : { + if (INVALID != new_status) { + changed = true; + status_ = new_status; + } + break; + } + case UNKNOWN: { + if (DOING == new_status + || DONE == new_status) { + changed = true; + status_ = other.status_; + } + break; + } + case DOING: { + if (DONE == new_status) { + changed = true; + status_ = new_status; + } + break; + } + case DONE: { + break; + } + default: { + break; + } + } +} + +int ObTransferRefreshInfo::init( + const ObTransferTaskID &task_id, + const ObTransferRefreshStatus &status) +{ + int ret = OB_SUCCESS; + task_id_ = task_id; + status_ = status; + return ret; +} + +void ObTransferRefreshInfo::reset() +{ + task_id_.reset(); + status_.reset(); +} + ObTransferTabletInfo::ObTransferTabletInfo() : tablet_id_(), transfer_seq_(OB_INVALID_TRANSFER_SEQ) diff --git a/src/share/transfer/ob_transfer_info.h b/src/share/transfer/ob_transfer_info.h index 454b03551f..5210d7344d 100644 --- a/src/share/transfer/ob_transfer_info.h +++ b/src/share/transfer/ob_transfer_info.h @@ -91,6 +91,65 @@ struct ObTransferStatusHelper bool &can_change); }; +/////////////// ObTransferRefreshStatus/////////////// +// For auto refresh tablet location +class ObTransferRefreshStatus final +{ +public: + enum STATUS : uint8_t + { + UNKNOWN = 0, + DOING = 1, + DONE = 2, + INVALID + }; + ObTransferRefreshStatus() : status_(INVALID) {} + ~ObTransferRefreshStatus() {} + explicit ObTransferRefreshStatus(const ObTransferRefreshStatus &other) : status_(other.status_) {} + explicit ObTransferRefreshStatus(const ObTransferRefreshStatus::STATUS &status) : status_(status) {} + + ObTransferRefreshStatus &operator=(const ObTransferRefreshStatus &other); + ObTransferRefreshStatus &operator=(const ObTransferRefreshStatus::STATUS &status); + + bool is_valid() const { return UNKNOWN <= 
status_ && status_ < INVALID; } + void reset() { status_ = INVALID; } + const char *str() const; + + bool is_unknown_status() const { return UNKNOWN == status_; } + bool is_doing_status() const { return DOING == status_; } + bool is_done_status() const { return DONE == status_; } + + void convert_from(const ObTransferStatus &status); + void update(const ObTransferRefreshStatus &other, bool &changed); + + TO_STRING_KV(K_(status), "status", str()); +private: + STATUS status_; +}; + +class ObTransferRefreshInfo final +{ +public: + ObTransferRefreshInfo() : task_id_(), status_() {} + ~ObTransferRefreshInfo() {} + + int init(const ObTransferTaskID &task_id, + const ObTransferRefreshStatus &status); + void reset(); + + const ObTransferTaskID &get_task_id() const { return task_id_; } + ObTransferRefreshStatus &get_status() { return status_; } + const ObTransferRefreshStatus &get_status() const { return status_; } + + static bool less_than(const ObTransferRefreshInfo &left, const ObTransferRefreshInfo &right) + { return left.task_id_ < right.task_id_; } + + TO_STRING_KV(K_(task_id), K_(status)); +private: + ObTransferTaskID task_id_; + ObTransferRefreshStatus status_; +}; + /////////////// ObTransferTabletInfo /////////////// // // Represents a Tablet for Transfer diff --git a/src/share/transfer/ob_transfer_task_operator.cpp b/src/share/transfer/ob_transfer_task_operator.cpp index 4eabd062f8..81213839bb 100644 --- a/src/share/transfer/ob_transfer_task_operator.cpp +++ b/src/share/transfer/ob_transfer_task_operator.cpp @@ -15,7 +15,9 @@ #include "share/transfer/ob_transfer_task_operator.h" #include "share/ob_dml_sql_splicer.h" // ObDMLSqlSplicer #include "lib/mysqlclient/ob_mysql_proxy.h" // ObISqlClient, SMART_VAR +#include "lib/mysqlclient/ob_mysql_transaction.h" // ObMySQLTransaction #include "share/inner_table/ob_inner_table_schema.h" // OB_ALL_TRANSFER_TASK_TNAME +#include "share/location_cache/ob_location_struct.h" // ObTabletLSCache namespace oceanbase { @@ -1082,5 
+1084,321 @@ int ObTransferTaskOperator::update_comment( return ret; } +int ObTransferTaskOperator::generate_transfer_task_id( + common::ObMySQLTransaction &trans, + const uint64_t tenant_id, + ObTransferTaskID &new_task_id) +{ + int ret = OB_SUCCESS; + new_task_id.reset(); + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id)); + } else { + SMART_VAR(ObISQLClient::ReadResult, res) { + ObSqlString sql; + common::sqlclient::ObMySQLResult *result = NULL; + if (OB_FAIL(sql.assign_fmt( + "SELECT MAX(task_id) AS task_id, 0 AS is_history FROM %s " + "UNION SELECT MAX(task_id) AS task_id, 1 AS is_history FROM %s", + OB_ALL_TRANSFER_TASK_TNAME, OB_ALL_TRANSFER_TASK_HISTORY_TNAME))) { + LOG_WARN("fail to assign fmt", KR(ret)); + } else if (OB_FAIL(trans.read(res, tenant_id, sql.ptr()))) { + LOG_WARN("execute sql failed", KR(ret), K(tenant_id), K(sql)); + } else if (OB_ISNULL(result = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get mysql result failed", KR(ret), K(tenant_id), K(sql)); + } else { + int64_t row_count = 0; + int64_t max_task_id = 0; + while (OB_SUCC(ret)) { + if (OB_FAIL(result->next())) { + if (OB_ITER_END == ret) { + ret = OB_SUCCESS; + break; + } else { + LOG_WARN("fail to get next", KR(ret)); + } + } else { + row_count++; + int64_t task_id = OB_INVALID_ID; // NULL means empty, then convert to 0 + EXTRACT_INT_FIELD_MYSQL_WITH_DEFAULT_VALUE( + *result, "task_id", task_id, int64_t, + true /*skip_null_error*/, false /*skip_column_error*/, 0 /*default_value*/); + if (OB_SUCC(ret)) { + max_task_id = max(max_task_id, task_id); + } + } + } // end while + + if (OB_SUCC(ret)) { + if (OB_UNLIKELY(2 != row_count)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("row_count not match", KR(ret), K(tenant_id), K(row_count)); + } else { + new_task_id = ObTransferTaskID(max_task_id + 1); + } + } + } + } // end SMART_VAR + } + return ret; +} + +int 
ObTransferTaskOperator::fetch_initial_base_task_id( + common::ObISQLClient &sql_proxy, + const uint64_t tenant_id, + ObTransferTaskID &base_task_id) +{ + int ret = OB_SUCCESS; + base_task_id.reset(); + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id)); + } else { + SMART_VAR(ObISQLClient::ReadResult, res) { + ObSqlString sql; + common::sqlclient::ObMySQLResult *result = NULL; + if (OB_FAIL(sql.assign_fmt( + "SELECT MIN(task_id) AS task_id, 0 AS is_history FROM %s " + "UNION SELECT MAX(task_id) AS task_id, 1 AS is_history FROM %s", + OB_ALL_TRANSFER_TASK_TNAME, OB_ALL_TRANSFER_TASK_HISTORY_TNAME))) { + LOG_WARN("fail to assign fmt", KR(ret)); + } else if (OB_FAIL(sql_proxy.read(res, tenant_id, sql.ptr()))) { + LOG_WARN("execute sql failed", KR(ret), K(tenant_id), K(sql)); + } else if (OB_ISNULL(result = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get mysql result failed", KR(ret), K(tenant_id), K(sql)); + } else { + int64_t min_process_task_id = OB_INVALID_ID; + int64_t max_history_task_id = OB_INVALID_ID; + while (OB_SUCC(ret)) { + if (OB_FAIL(result->next())) { + if (OB_ITER_END == ret) { + ret = OB_SUCCESS; + break; + } else { + LOG_WARN("fail to get next", KR(ret)); + } + } else { + int64_t task_id = OB_INVALID_ID; // NULL means empty, then convert to 0 + int64_t is_history = 0; + EXTRACT_INT_FIELD_MYSQL_WITH_DEFAULT_VALUE( + *result, "task_id", task_id, int64_t, + true /*skip_null_error*/, false /*skip_column_error*/, 0 /*default_value*/); + EXTRACT_INT_FIELD_MYSQL(*result, "is_history", is_history, int64_t); + + if (OB_FAIL(ret)) { + } else if (0 == is_history) { + min_process_task_id = task_id; + } else { + max_history_task_id = task_id; + } + } + } // end while + + if (OB_SUCC(ret)) { + if (OB_UNLIKELY(min_process_task_id < 0 || max_history_task_id < 0)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid result", KR(ret), K(tenant_id), 
K(min_process_task_id), K(max_history_task_id)); + } else if (0 == min_process_task_id && 0 == max_history_task_id) { + // __all_transfer_task/__all_transfer_task_history are empty + base_task_id = ObTransferTaskID(0); + } else if (0 == min_process_task_id && 0 < max_history_task_id) { + // only __all_transfer_task is empty + base_task_id = ObTransferTaskID(max_history_task_id); + } else if (0 < min_process_task_id && 0 == max_history_task_id) { + // only __all_transfer_task_history is empty + base_task_id = ObTransferTaskID(min_process_task_id - 1); + } else { + // __all_transfer_task/__all_transfer_task_history are not empty + int64_t min_task_id = min(min_process_task_id - 1, max_history_task_id); + base_task_id = ObTransferTaskID(min_task_id); + } + } + } + } // end SMART_VAR + } + return ret; +} + +int ObTransferTaskOperator::fetch_inc_task_infos( + common::ObISQLClient &sql_proxy, + const uint64_t tenant_id, + const ObTransferTaskID &base_task_id, + common::ObIArray &inc_task_infos) +{ + int ret = OB_SUCCESS; + inc_task_infos.reset(); + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id + || !base_task_id.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid tenant_id/base_task_id", KR(ret), K(tenant_id), K(base_task_id)); + } else { + SMART_VAR(ObISQLClient::ReadResult, res) { + ObSqlString sql; + common::sqlclient::ObMySQLResult *result = NULL; + // transfer task which status is `completed` but tablet_list is invalid actually do nothing. 
+ ObTransferStatus complete_status(ObTransferStatus::COMPLETED); + ObTransferStatus fail_status(ObTransferStatus::FAILED); + +#define FETCH_INC_TRANSFER_TASKS_SQL "SELECT task_id, " \ + "(CASE WHEN status = '%s' AND (tablet_list is null OR tablet_list = '') THEN '%s' ELSE status END) AS STATUS " \ + "FROM %s WHERE task_id > %ld " + + if (OB_FAIL(sql.assign_fmt( + FETCH_INC_TRANSFER_TASKS_SQL " UNION " FETCH_INC_TRANSFER_TASKS_SQL, + complete_status.str(), fail_status.str(), OB_ALL_TRANSFER_TASK_TNAME, base_task_id.id(), + complete_status.str(), fail_status.str(), OB_ALL_TRANSFER_TASK_HISTORY_TNAME, base_task_id.id() + ))) { + LOG_WARN("fail to assign sql", KR(ret), K(tenant_id)); + } else if (OB_FAIL(sql_proxy.read(res, tenant_id, sql.ptr()))) { + LOG_WARN("execute sql failed", KR(ret), K(tenant_id), K(sql)); + } else if (OB_ISNULL(result = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get mysql result failed", KR(ret), K(tenant_id), K(sql)); + } else { + int64_t task_id_val = ObTransferTaskID::INVALID_ID; + ObString status_str; + while (OB_SUCC(ret)) { + if (OB_FAIL(result->next())) { + if (OB_ITER_END == ret) { + ret = OB_SUCCESS; + break; + } else { + LOG_WARN("fail to get next", KR(ret)); + } + } else { + EXTRACT_INT_FIELD_MYSQL(*result, "task_id", task_id_val, int64_t); + EXTRACT_VARCHAR_FIELD_MYSQL(*result, "STATUS", status_str); + + ObTransferStatus status; + if (FAILEDx(status.parse_from_str(status_str))) { + LOG_WARN("fail to parse status", KR(ret), K(status_str)); + } else { + ObTransferTaskID task_id(task_id_val); + ObTransferRefreshStatus refresh_status; + ObTransferRefreshInfo refresh_info; + (void) refresh_status.convert_from(status); + if (OB_UNLIKELY(!task_id.is_valid() || !refresh_status.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invaild task_id/refresh_status", + KR(ret), K(tenant_id), K(task_id), K(refresh_status), K(status)); + } else if (OB_FAIL(refresh_info.init(task_id, refresh_status))) { + LOG_WARN("fail to init 
refresh info", + KR(ret), K(tenant_id), K(task_id), K(refresh_status)); + } else if (OB_FAIL(inc_task_infos.push_back(refresh_info))) { + LOG_WARN("fail to push back refresh info", + KR(ret), K(tenant_id), K(refresh_info)); + } + } + } + } // end while + } + } // end SMART_VAR +#undef FETCH_INC_TRANSFER_TASKS_SQL + } + return ret; +} + +int ObTransferTaskOperator::batch_get_tablet_ls_cache( + common::ObISQLClient &sql_proxy, + const uint64_t tenant_id, + const common::ObIArray &task_ids, + common::ObIArray &tablet_ls_caches) +{ + int ret = OB_SUCCESS; + tablet_ls_caches.reset(); + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id + || task_ids.count() <= 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid tenant_id/task_ids_cnt", + KR(ret), K(tenant_id), "task_ids_cnt", task_ids.count()); + } else { + SMART_VAR(ObISQLClient::ReadResult, res) { + ObSqlString sql; + common::sqlclient::ObMySQLResult *result = NULL; + + ObSqlString task_ids_str; + for (int64_t i = 0; OB_SUCC(ret) && i < task_ids.count(); i++) { + if (OB_FAIL(task_ids_str.append_fmt("%s%ld", + 0 == i ? 
"" : ", ", task_ids.at(i).id()))) { + LOG_WARN("fail to append fmt", KR(ret), K(i), K(task_ids.at(i))); + } + } // end for + + if (FAILEDx(sql.assign_fmt( + "SELECT tablet_list, dest_ls FROM %s WHERE task_id in (%s) " + "UNION SELECT tablet_list, dest_ls FROM %s WHERE task_id in (%s)", + OB_ALL_TRANSFER_TASK_TNAME, task_ids_str.ptr(), + OB_ALL_TRANSFER_TASK_HISTORY_TNAME, task_ids_str.ptr()))) { + LOG_WARN("fail to assign fmt", KR(ret), K(tenant_id)); + } else if (OB_FAIL(sql_proxy.read(res, tenant_id, sql.ptr()))) { + LOG_WARN("execute sql failed", KR(ret), K(tenant_id), K(sql)); + } else if (OB_ISNULL(result = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get mysql result failed", KR(ret), K(tenant_id), K(sql)); + } else { + const int64_t now = ObTimeUtility::fast_current_time(); + int64_t row_cnt = 0; + while (OB_SUCC(ret)) { + if (OB_FAIL(result->next())) { + if (OB_ITER_END == ret) { + ret = OB_SUCCESS; + break; + } else { + LOG_WARN("fail to get next", KR(ret)); + } + } else { + row_cnt++; + ObString tablet_list_str; + int64_t dest_ls_id = ObLSID::INVALID_LS_ID; + EXTRACT_INT_FIELD_MYSQL(*result, "dest_ls", dest_ls_id, int64_t); + EXTRACT_VARCHAR_FIELD_MYSQL(*result, "tablet_list", tablet_list_str); + + ObTransferTabletList tablet_list; + ObLSID ls_id(dest_ls_id); + if (FAILEDx(tablet_list.parse_from_display_str(tablet_list_str))) { + LOG_WARN("fail to parse from str", KR(ret), K(tablet_list_str)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < tablet_list.count(); i++) { + const ObTransferTabletInfo &tablet_info = tablet_list.at(i); + ObTabletLSCache tablet_ls_cache; + // `transfer_seq` in __all_transfer_task/__all_transfer_task_history means + // the original `transfer_seq` before transfer task execute. + // We should use `transfer_seq` + 1 as the result of related transfer task. 
+ int64_t transfer_seq = tablet_info.transfer_seq() + 1; + if (OB_FAIL(tablet_ls_cache.init( + tenant_id, + tablet_info.tablet_id(), + ls_id, + now, + transfer_seq))) { + LOG_WARN("fail to init tablet-ls cache", + KR(ret), K(tenant_id), K(ls_id), K(tablet_info), K(transfer_seq)); + } else if (OB_FAIL(tablet_ls_caches.push_back(tablet_ls_cache))) { + LOG_WARN("fail to push back tablet-ls cache", KR(ret), K(tablet_ls_cache)); + } + } // end for + } + + } + } // end while + + if (OB_SUCC(ret)) { + if (OB_UNLIKELY(task_ids.count() != row_cnt + || tablet_ls_caches.count() <= 0)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("result not match", KR(ret), K(tenant_id), K(row_cnt), + K(task_ids.count()), K(tablet_ls_caches.count()), K(task_ids)); + } + } + } + + } // end SMART_VAR + } + return ret; +} + } // end namespace share } // end namespace oceanbase diff --git a/src/share/transfer/ob_transfer_task_operator.h b/src/share/transfer/ob_transfer_task_operator.h index de17c19936..09ae3735dd 100644 --- a/src/share/transfer/ob_transfer_task_operator.h +++ b/src/share/transfer/ob_transfer_task_operator.h @@ -30,6 +30,7 @@ class ObMySQLResult; namespace share { class ObDMLSqlSplicer; +class ObTabletLSCache; // operator for __all_transfer_task class ObTransferTaskOperator final @@ -423,6 +424,36 @@ public: const ObTransferTaskID task_id, const ObTransferTaskComment &comment); + /* + * generate new task_id for transfer task + * + * should be protected by trans to ensure task_id is unique + */ + static int generate_transfer_task_id( + common::ObMySQLTransaction &trans, + const uint64_t tenant_id, + ObTransferTaskID &new_task_id); + + /*-----For auto refresh tablet location----*/ + + static int fetch_initial_base_task_id( + common::ObISQLClient &sql_proxy, + const uint64_t tenant_id, + ObTransferTaskID &base_task_id); + + static int fetch_inc_task_infos( + common::ObISQLClient &sql_proxy, + const uint64_t tenant_id, + const ObTransferTaskID &base_task_id, + common::ObIArray 
&inc_task_infos); + + static int batch_get_tablet_ls_cache( + common::ObISQLClient &sql_proxy, + const uint64_t tenant_id, + const common::ObIArray &task_ids, + common::ObIArray &tablet_ls_caches); + /*-----------------------------------------*/ + private: static int get_by_ls_id_( common::ObISQLClient &sql_proxy, diff --git a/src/storage/high_availability/ob_transfer_handler.cpp b/src/storage/high_availability/ob_transfer_handler.cpp index a6fdd53b76..1ee7fd85fc 100644 --- a/src/storage/high_availability/ob_transfer_handler.cpp +++ b/src/storage/high_availability/ob_transfer_handler.cpp @@ -419,6 +419,7 @@ int ObTransferHandler::do_with_start_status_(const share::ObTransferTaskInfo &ta palf::LogConfigVersion config_version; bool is_leader = true; bool succ_block_tx = false; + bool commit_succ = false; if (!is_inited_) { ret = OB_NOT_INIT; @@ -494,16 +495,19 @@ int ObTransferHandler::do_with_start_status_(const share::ObTransferTaskInfo &ta DEBUG_SYNC(BEFORE_TRANSFER_START_COMMIT); } + commit_succ = OB_SUCC(ret); if (OB_TMP_FAIL(commit_trans_(ret, trans))) { LOG_WARN("failed to commit trans", K(tmp_ret), K(ret)); if (OB_SUCCESS == ret) { ret = tmp_ret; } + commit_succ = false; } clear_prohibit_(task_info, succ_block_tx, succ_stop_medium); } + if (OB_FAIL(ret)) { if (!is_leader) { } else if (can_retry_(task_info, ret)) { @@ -521,6 +525,10 @@ int ObTransferHandler::do_with_start_status_(const share::ObTransferTaskInfo &ta } } + if (commit_succ && OB_TMP_FAIL(broadcast_tablet_location_(task_info))) { + LOG_WARN("failed to submit submit tablet_broadcast task", KR(tmp_ret), K(task_info)); + } + if (OB_SUCCESS != (tmp_ret = record_server_event_(ret, round_, task_info))) { LOG_WARN("failed to record server event", K(tmp_ret), K(ret), K(retry_count_), K(task_info)); } @@ -2309,6 +2317,47 @@ int ObTransferHandler::get_src_ls_member_list_( return ret; } +int ObTransferHandler::broadcast_tablet_location_(const ObTransferTaskInfo &task_info) +{ + int ret = OB_SUCCESS; + 
share::ObLocationService *location_service = nullptr; + if (!is_inited_) { + ret = OB_NOT_INIT; + LOG_WARN("transfer handler do not init", K(ret)); + } else if (OB_UNLIKELY(!task_info.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("task_info not valid", KR(ret), K(task_info)); + } else if (OB_ISNULL(location_service = GCTX.location_service_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("location service should not be NULL", K(ret), KP(location_service)); + } else { + ObTabletLocationBroadcastTask broadcast_task; + ObArray tablet_info_list; + if (OB_FAIL(tablet_info_list.reserve(task_info.tablet_list_.count()))) { + LOG_WARN("failed to reserve for tablet_info_list", KR(ret), "count", task_info.tablet_list_.count()); + } + // increment transfer_seq + FOREACH_CNT_X(tablet_info, task_info.tablet_list_, OB_SUCC(ret)) { + ObTransferTabletInfo new_tablet_info; + if (OB_FAIL(new_tablet_info.init(tablet_info->tablet_id(), tablet_info->transfer_seq() + 1))) { + LOG_WARN("failed to init new_table_info", KR(ret), KPC(tablet_info)); + } else if (OB_FAIL(tablet_info_list.push_back(new_tablet_info))) { + LOG_WARN("failed to push_back", KR(ret), K(new_tablet_info)); + } + } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(broadcast_task.init(task_info.tenant_id_, + task_info.task_id_, + task_info.dest_ls_id_, + tablet_info_list))) { + LOG_WARN("failed to init broadcast_task", KR(ret), K(task_info), K(tablet_info_list)); + } else if (OB_FAIL(location_service->submit_tablet_broadcast_task(broadcast_task))) { + LOG_WARN("failed to submit tablet location broadcast task", KR(ret), K(broadcast_task)); + } + } + return ret; +} + } } diff --git a/src/storage/high_availability/ob_transfer_handler.h b/src/storage/high_availability/ob_transfer_handler.h index 620d46ad4e..b58e6bfb05 100644 --- a/src/storage/high_availability/ob_transfer_handler.h +++ b/src/storage/high_availability/ob_transfer_handler.h @@ -257,6 +257,7 @@ private: bool &task_exist) const; int get_src_ls_member_list_( 
common::ObMemberList &member_list); + int broadcast_tablet_location_(const share::ObTransferTaskInfo &task_info); private: static const int64_t INTERVAL_US = 1 * 1000 * 1000; //1s diff --git a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result index c6c5232a49..65e18f59e4 100644 --- a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result +++ b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result @@ -254,7 +254,9 @@ writing_throttling_trigger_percentage zone _advance_checkpoint_timeout _audit_mode +_auto_broadcast_tablet_location_rate_limit _auto_drop_recovering_auxiliary_tenant +_auto_refresh_tablet_location_interval _backup_idle_time _backup_task_keep_alive_interval _backup_task_keep_alive_timeout