From 17bea8074bb1e18be6f1f5a637a0d5b343e02a74 Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 3 Nov 2022 10:10:17 +0000 Subject: [PATCH] fix tablet report logic --- src/observer/CMakeLists.txt | 1 - src/observer/ob_service.cpp | 164 +----- src/observer/ob_service.h | 30 +- src/observer/ob_tablet_checksum_updater.cpp | 508 ------------------ src/observer/ob_tablet_checksum_updater.h | 141 ----- src/observer/report/ob_i_meta_report.h | 4 - .../report/ob_tablet_table_updater.cpp | 117 ++-- src/observer/report/ob_tablet_table_updater.h | 7 +- .../report/ob_tenant_meta_checker.cpp | 9 +- .../ob_tablet_replica_checksum_operator.cpp | 129 +++-- .../ob_tablet_replica_checksum_operator.h | 30 +- src/share/tablet/ob_tablet_table_operator.cpp | 106 ++-- src/share/tablet/ob_tablet_table_operator.h | 12 + .../compaction/ob_tablet_merge_task.cpp | 4 +- .../compaction/ob_tenant_tablet_scheduler.cpp | 4 +- src/storage/ddl/ob_ddl_merge_task.cpp | 2 - unittest/storage/mock_ob_meta_report.h | 3 - 17 files changed, 323 insertions(+), 948 deletions(-) delete mode 100644 src/observer/ob_tablet_checksum_updater.cpp delete mode 100644 src/observer/ob_tablet_checksum_updater.h diff --git a/src/observer/CMakeLists.txt b/src/observer/CMakeLists.txt index 0c2596a49..489236d91 100644 --- a/src/observer/CMakeLists.txt +++ b/src/observer/CMakeLists.txt @@ -35,7 +35,6 @@ ob_set_subtarget(ob_server common ob_srv_xlator_primary.cpp ob_srv_xlator_rootserver.cpp ob_srv_xlator_storage.cpp - ob_tablet_checksum_updater.cpp ob_tenant_duty_task.cpp ob_uniq_task_queue.cpp ) diff --git a/src/observer/ob_service.cpp b/src/observer/ob_service.cpp index eedf701c3..51eaa79b1 100644 --- a/src/observer/ob_service.cpp +++ b/src/observer/ob_service.cpp @@ -164,7 +164,7 @@ ObService::ObService(const ObGlobalContext &gctx) lease_state_mgr_(), heartbeat_process_(gctx, schema_updater_, lease_state_mgr_), gctx_(gctx), server_trace_task_(), schema_release_task_(), schema_status_task_(), remote_master_rs_update_task_(gctx), ls_table_updater_(), - tablet_table_updater_(), tablet_checksum_updater_(), meta_table_checker_() + tablet_table_updater_(), meta_table_checker_() { } @@ -205,8 +205,6 @@ int ObService::init(common::ObMySQLProxy &sql_proxy, FLOG_WARN("init tablet table updater failed", KR(ret)); } else if (OB_FAIL(ls_table_updater_.init())) { FLOG_WARN("init log stream table updater failed", KR(ret)); - } else if (OB_FAIL(tablet_checksum_updater_.init(*this))) { - FLOG_WARN("init tablet checksum updater failed", KR(ret)); } else if (OB_FAIL(meta_table_checker_.init( gctx_.lst_operator_, gctx_.tablet_operator_, @@ -300,10 +298,6 @@ void ObService::stop() tablet_table_updater_.stop(); FLOG_INFO("tablet table updater stopped"); - FLOG_INFO("begin to stop tablet checksum updater"); - tablet_checksum_updater_.stop(); - FLOG_INFO("tablet checksum updater stopped"); - FLOG_INFO("begin to stop meta table checker"); meta_table_checker_.stop(); FLOG_INFO("meta table checker stopped"); @@ -329,10 +323,6 @@ void ObService::wait() tablet_table_updater_.wait(); FLOG_INFO("wait tablet table updater success"); - FLOG_INFO("begin to wait tablet checksum updater"); - tablet_checksum_updater_.wait(); - FLOG_INFO("wait tablet checksum updater success"); - FLOG_INFO("begin to wait meta table checker"); meta_table_checker_.wait(); FLOG_INFO("wait meta table checker success"); @@ -430,23 +420,6 @@ int ObService::submit_tablet_update_task( return ret; } -int ObService::submit_tablet_checksums_task( - const uint64_t tenant_id, - const ObLSID &ls_id, - const ObTabletID &tablet_id) -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_)) { - ret = OB_NOT_INIT; - LOG_WARN("not init", KR(ret)); - } else if (!ls_id.is_valid_with_tenant(tenant_id) || !tablet_id.is_valid_with_tenant(tenant_id)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_id), K(tablet_id)); - } else if (OB_FAIL(tablet_checksum_updater_.async_update(tenant_id, ls_id, tablet_id))) { - LOG_WARN("fail to async update tablet checksum", KR(ret), K(tenant_id), K(ls_id), K(tablet_id)); - } - return ret; -} int ObService::submit_async_refresh_schema_task( const uint64_t tenant_id, @@ -1942,11 +1915,13 @@ int ObService::write_ddl_sstable_commit_log(const ObDDLWriteSSTableCommitLogArg return ret; } -int ObService::inner_fill_tablet_replica_( +int ObService::inner_fill_tablet_info_( const int64_t tenant_id, const ObTabletID &tablet_id, storage::ObLS *ls, - ObTabletReplica &tablet_replica) + ObTabletReplica &tablet_replica, + share::ObTabletReplicaChecksumItem &tablet_checksum, + const bool need_checksum) { ObLSHandle ls_handle; ObTabletHandle tablet_handle; @@ -1978,9 +1953,9 @@ int ObService::inner_fill_tablet_replica_( const ObLSID &ls_id = ls->get_ls_id(); int64_t data_size = 0; int64_t required_size = 0; - ObArray column_checksums; // param palceholder + ObArray column_checksums; if (OB_FAIL(tablet_handle.get_obj()->get_tablet_report_info(column_checksums, data_size, - required_size, false/*need_checksum*/))) { + required_size, need_checksum))) { LOG_WARN("fail to get tablet report info from tablet", KR(ret), K(tenant_id), K(tablet_id)); } else if (OB_FAIL(tablet_replica.init( tenant_id, @@ -1992,66 +1967,29 @@ int ObService::inner_fill_tablet_replica_( required_size))) { LOG_WARN("fail to init a tablet replica", KR(ret), K(tenant_id), K(tablet_id), K(tablet_replica)); - } - } - return ret; -} - -int ObService::inner_fill_tablet_replica_checksum_item_( - const int64_t tenant_id, - const ObTabletID &tablet_id, - storage::ObLS *ls, - share::ObTabletReplicaChecksumItem &item) -{ - int ret = OB_SUCCESS; - ObTabletHandle tablet_handle; - if (OB_UNLIKELY(!inited_)) { - ret = OB_NOT_INIT; - LOG_WARN("service not inited", KR(ret)); - } else if (OB_UNLIKELY(!tablet_id.is_valid()) - || OB_INVALID_TENANT_ID == tenant_id - || OB_ISNULL(ls)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument or nullptr", KR(ret), K(tablet_id), K(tenant_id)); - } else if (OB_ISNULL(ls->get_tablet_svr())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get_tablet_svr is null", KR(ret), K(tenant_id), K(tablet_id)); - } else if (OB_FAIL(ls->get_tablet_svr()->get_tablet( - tablet_id, - tablet_handle, - ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) { - if (OB_TABLET_NOT_EXIST != ret) { - LOG_WARN("get tablet failed", KR(ret), K(tenant_id), K(tablet_id)); - } - } else if (OB_ISNULL(gctx_.config_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("gctx_.config_ is null", KR(ret), K(tenant_id), K(tablet_id)); - } else { - item.tenant_id_ = tenant_id; - item.ls_id_ = ls->get_ls_id(); - item.tablet_id_ = tablet_id; - item.server_ = gctx_.self_addr(); - item.row_count_ = tablet_handle.get_obj()->get_tablet_meta().report_status_.row_count_; - item.snapshot_version_ = tablet_handle.get_obj()->get_tablet_meta().report_status_.merge_snapshot_version_; - item.data_checksum_ = tablet_handle.get_obj()->get_tablet_meta().report_status_.data_checksum_; - - int64_t data_size = 0; - int64_t required_size = 0; - ObArray column_checksums; - if (OB_FAIL(tablet_handle.get_obj()->get_tablet_report_info(column_checksums, data_size, required_size))) { - LOG_WARN("fail to get column checksums from tablet", KR(ret), K(tenant_id), K(tablet_id), K(item)); - } else if (OB_FAIL(item.column_meta_.init(column_checksums))) { + } else if (!need_checksum) { + } else if (OB_FAIL(tablet_checksum.column_meta_.init(column_checksums))) { LOG_WARN("fail to init report column meta with column_checksums", KR(ret), K(column_checksums)); + } else { + tablet_checksum.tenant_id_ = tenant_id; + tablet_checksum.ls_id_ = ls->get_ls_id(); + tablet_checksum.tablet_id_ = tablet_id; + tablet_checksum.server_ = gctx_.self_addr(); + tablet_checksum.snapshot_version_ = snapshot_version; + tablet_checksum.row_count_ = tablet_handle.get_obj()->get_tablet_meta().report_status_.row_count_; + tablet_checksum.data_checksum_ = tablet_handle.get_obj()->get_tablet_meta().report_status_.data_checksum_; } } return ret; } -int ObService::fill_tablet_replica( +int ObService::fill_tablet_report_info( const uint64_t tenant_id, const ObLSID &ls_id, const ObTabletID &tablet_id, - ObTabletReplica &tablet_replica) + ObTabletReplica &tablet_replica, + share::ObTabletReplicaChecksumItem &tablet_checksum, + const bool need_checksum) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!inited_)) { @@ -2077,62 +2015,18 @@ int ObService::fill_tablet_replica( LOG_TRACE("log stream not exist in this tenant", KR(ret), K(tenant_id), K(ls_id)); } } else if (FALSE_IT(ls = ls_handle.get_ls())) { - } else if (OB_FAIL(inner_fill_tablet_replica_(tenant_id, - tablet_id, - ls, - tablet_replica))) { + } else if (OB_FAIL(inner_fill_tablet_info_(tenant_id, + tablet_id, + ls, + tablet_replica, + tablet_checksum, + need_checksum))) { if (OB_TABLET_NOT_EXIST != ret) { LOG_WARN("fail to inner fill tenant's tablet replica", KR(ret), - K(tenant_id), K(tablet_id), K(ls), K(tablet_replica)); + K(tenant_id), K(tablet_id), K(ls), K(tablet_replica), K(tablet_checksum), K(need_checksum)); } else { LOG_TRACE("tablet not exist in this log stream", KR(ret), - K(tenant_id), K(tablet_id), K(ls), K(tablet_replica)); - } - } - } - } - return ret; -} - -int ObService::fill_tablet_replica_checksum_item( - const uint64_t tenant_id, - const share::ObLSID &ls_id, - const ObTabletID &tablet_id, - share::ObTabletReplicaChecksumItem &item) -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_)) { - ret = OB_NOT_INIT; - LOG_WARN("service not inited", KR(ret)); - } else if (!tablet_id.is_valid() || !ls_id.is_valid() || OB_INVALID_TENANT_ID == tenant_id) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", KR(ret), K(tablet_id), K(ls_id), K(tenant_id)); - } else { - MTL_SWITCH(tenant_id) { - ObTabletHandle tablet_handle; - ObLSHandle ls_handle; - storage::ObLS *ls = nullptr; - ObLSService* ls_svr = nullptr; - if (OB_ISNULL(ls_svr = MTL(ObLSService*))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("MTL ObLSService is null", KR(ret), K(tenant_id)); - } else if (OB_FAIL(ls_svr->get_ls(ls_id, ls_handle, ObLSGetMod::OBSERVER_MOD))) { - if (OB_LS_NOT_EXIST != ret) { - LOG_WARN("fail to get log_stream's ls_handle", KR(ret), K(tenant_id), K(ls_id)); - } else { - LOG_TRACE("log stream not exist in this tenant", KR(ret), K(tenant_id), K(ls_id)); - } - } else if (FALSE_IT(ls = ls_handle.get_ls())) { - } else if (OB_FAIL(inner_fill_tablet_replica_checksum_item_(tenant_id, - tablet_id, - ls, - item))) { - if (OB_TABLET_NOT_EXIST != ret) { - LOG_WARN("fail to inner fill tenant's tablet checksum item", KR(ret), - K(tenant_id), K(tablet_id), K(ls), K(item)); - } else { - LOG_TRACE("tablet not exist in this log stream", KR(ret), - K(tenant_id), K(tablet_id), K(ls), K(item)); + K(tenant_id), K(tablet_id), K(ls), K(tablet_replica), K(tablet_checksum), K(need_checksum)); } } } diff --git a/src/observer/ob_service.h b/src/observer/ob_service.h index d0dbd88d0..36d50c0e0 100644 --- a/src/observer/ob_service.h +++ b/src/observer/ob_service.h @@ -18,7 +18,6 @@ #include "share/ob_all_server_tracer.h" #include "observer/ob_lease_state_mgr.h" #include "observer/ob_heartbeat.h" -#include "observer/ob_tablet_checksum_updater.h" #include "observer/ob_server_schema_updater.h" #include "observer/ob_rpc_processor_simple.h" #include "observer/ob_uniq_task_queue.h" @@ -92,18 +91,17 @@ public: // @params[in] ls_id: tablet belongs to which log stream // @params[in] tablet_id: the tablet to build // @params[out] tablet_replica: infos about this tablet replica + // @params[out] tablet_checksum: infos about this tablet data/column checksum + // @params[in] need_checksum: whether to fill tablet_checksum // ATTENTION: If ls not exist, then OB_LS_NOT_EXIST // If tablet not exist on that ls, then OB_TABLET_NOT_EXIST - int fill_tablet_replica( + int fill_tablet_report_info( const uint64_t tenant_id, const share::ObLSID &ls_id, const ObTabletID &tablet_id, - share::ObTabletReplica &tablet_replica); - int fill_tablet_replica_checksum_item( - const uint64_t tenant_id, - const share::ObLSID &ls_id, - const ObTabletID &tablet_id, - share::ObTabletReplicaChecksumItem &item); + share::ObTabletReplica &tablet_replica, + share::ObTabletReplicaChecksumItem &tablet_checksum, + const bool need_checksum = true); int detect_master_rs_ls(const obrpc::ObDetectMasterRsArg &arg, obrpc::ObDetectMasterRsLSResult &result); @@ -120,10 +118,6 @@ public: const uint64_t tenant_id, const share::ObLSID &ls_id, const ObTabletID &tablet_id) override; - virtual int submit_tablet_checksums_task( - const uint64_t tenant_id, - const share::ObLSID &ls_id, - const ObTabletID &tablet_id) override; //////////////////////////////////////////////////////////////// int check_frozen_version(const obrpc::ObCheckFrozenVersionArg &arg); @@ -227,16 +221,13 @@ public: int renew_in_zone_hb(const share::ObInZoneHbRequest &arg, share::ObInZoneHbResponse &result); private: - int inner_fill_tablet_replica_( + int inner_fill_tablet_info_( const int64_t tenant_id, const ObTabletID &tablet_id, storage::ObLS *ls, - share::ObTabletReplica &tablet_replica); - int inner_fill_tablet_replica_checksum_item_( - const int64_t tenant_id, - const ObTabletID &tablet_id, - storage::ObLS *ls, - share::ObTabletReplicaChecksumItem &item); + share::ObTabletReplica &tablet_replica, + share::ObTabletReplicaChecksumItem &tablet_checksum, + const bool need_checksum); int register_self(); int check_server_empty(const obrpc::ObCheckServerEmptyArg &arg, const bool wait_log_scan, bool &server_empty); @@ -262,7 +253,6 @@ private: // report ObLSTableUpdater ls_table_updater_; ObTabletTableUpdater tablet_table_updater_; - ObTabletChecksumUpdater tablet_checksum_updater_; ObServerMetaTableChecker meta_table_checker_; }; diff --git a/src/observer/ob_tablet_checksum_updater.cpp b/src/observer/ob_tablet_checksum_updater.cpp deleted file mode 100644 index b98abe1c1..000000000 --- a/src/observer/ob_tablet_checksum_updater.cpp +++ /dev/null @@ -1,508 +0,0 @@ -/** - * Copyright (c) 2021 OceanBase - * OceanBase CE is licensed under Mulan PubL v2. - * You can use this software according to the terms and conditions of the Mulan PubL v2. - * You may obtain a copy of Mulan PubL v2 at: - * http://license.coscl.org.cn/MulanPubL-2.0 - * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, - * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, - * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. - * See the Mulan PubL v2 for more details. - */ - -#define USING_LOG_PREFIX SERVER - -#include "ob_tablet_checksum_updater.h" -#include "observer/ob_service.h" -#include "share/ob_tablet_replica_checksum_operator.h" - -namespace oceanbase -{ -using namespace common; -using namespace share; - -namespace observer -{ - -ObTabletChecksumUpdateTask::ObTabletChecksumUpdateTask() - : tenant_id_(OB_INVALID_TENANT_ID), - ls_id_(), - tablet_id_(), - add_timestamp_(OB_INVALID_TIMESTAMP) -{ -} - -ObTabletChecksumUpdateTask::ObTabletChecksumUpdateTask( - const uint64_t tenant_id, - const share::ObLSID &ls_id, - const common::ObTabletID &tablet_id, - const int64_t add_timestamp) - : tenant_id_(tenant_id), - ls_id_(ls_id), - tablet_id_(tablet_id), - add_timestamp_(add_timestamp) -{ -} - -ObTabletChecksumUpdateTask::~ObTabletChecksumUpdateTask() -{ - reset(); -} - -void ObTabletChecksumUpdateTask::reset() -{ - tenant_id_ = OB_INVALID_TENANT_ID; - ls_id_.reset(); - tablet_id_.reset(); - add_timestamp_ = OB_INVALID_TIMESTAMP; -} - -bool ObTabletChecksumUpdateTask::is_valid() const -{ - return OB_INVALID_TENANT_ID != tenant_id_ - && ls_id_.is_valid_with_tenant(tenant_id_) - && tablet_id_.is_valid_with_tenant(tenant_id_) - && 0 < add_timestamp_; -} - -int ObTabletChecksumUpdateTask::init( - const uint64_t tenant_id, - const ObLSID &ls_id, - const ObTabletID &tablet_id, - const int64_t add_timestamp) -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY(!ls_id.is_valid_with_tenant(tenant_id) - || !tablet_id.is_valid_with_tenant(tenant_id) - || 0 >= add_timestamp)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("get invalid arguments", KR(ret), K(tenant_id), K(ls_id), K(tablet_id), K(add_timestamp)); - } else { - tenant_id_ = tenant_id; - ls_id_ = ls_id; - tablet_id_ = tablet_id; - add_timestamp_ = add_timestamp; - } - return ret; -} - -int64_t ObTabletChecksumUpdateTask::hash() const -{ - uint64_t hash_val = 0; - hash_val = murmurhash(&tenant_id_, sizeof(tenant_id_), hash_val); - hash_val = murmurhash(&ls_id_, sizeof(ls_id_), hash_val); - hash_val = murmurhash(&tablet_id_, sizeof(tablet_id_), hash_val); - return hash_val; -} - -void ObTabletChecksumUpdateTask::check_task_status() const -{ - // TODO @danling.fjk ignore this error log temp - // int64_t now = ObTimeUtility::current_time(); - // const int64_t safe_interval = TABLET_CHECK_INTERVAL; - // // print an error log if this task is not executed correctly since two mins ago - // if (now - add_timestamp_ > safe_interval) { - // LOG_ERROR("tablet table update task cost too much time to execute", - // K(*this), K(safe_interval), "cost_time", now - add_timestamp_); - // } -} - -int ObTabletChecksumUpdateTask::assign(const ObTabletChecksumUpdateTask &other) -{ - int ret = OB_SUCCESS; - if (this != &other) { - tenant_id_ = other.tenant_id_; - ls_id_ = other.ls_id_; - tablet_id_ = other.tablet_id_; - add_timestamp_ = other.add_timestamp_; - } - return ret; -} - -bool ObTabletChecksumUpdateTask::operator==(const ObTabletChecksumUpdateTask &other) const -{ - bool bret = false; - if (this == &other) { - bret = true; - } else { - bret = (tenant_id_ == other.tenant_id_ - && ls_id_ == other.ls_id_ - && tablet_id_ == other.tablet_id_); - } - return bret; -} - -bool ObTabletChecksumUpdateTask::operator!=(const ObTabletChecksumUpdateTask &other) const -{ - return !(operator==(other)); -} - -bool ObTabletChecksumUpdateTask::compare_without_version( - const ObTabletChecksumUpdateTask &other) const -{ - return (operator==(other)); -} - - -ObTabletChecksumUpdater::ObTabletChecksumUpdater() - : inited_(false), - stopped_(true), - ob_service_(nullptr), - task_queue_() -{ -} - -ObTabletChecksumUpdater::~ObTabletChecksumUpdater() -{ - destroy(); -} - -int ObTabletChecksumUpdater::init(ObService &ob_service) -{ - int ret = OB_SUCCESS; - const int64_t task_queue_size = !lib::is_mini_mode() - ? TASK_QUEUE_SIZE - : MINI_MODE_TASK_QUEUE_SIZE; - const int64_t task_thread_cnt = !lib::is_mini_mode() - ? UPDATE_TASK_THREAD_CNT - : MINI_MODE_UPDATE_TASK_THREAD_CNT; - if (OB_UNLIKELY(inited_)) { - ret = OB_INIT_TWICE; - LOG_WARN("inited twice", KR(ret)); - } else if (OB_FAIL(task_queue_.init(this, - task_thread_cnt, task_queue_size, "TbltCksUp"))) { - LOG_WARN("failed to init tablet checksum updater task queue", KR(ret), - "thread_count", task_thread_cnt, - "queue_size", task_queue_size); - } else { - ob_service_ = &ob_service; - inited_ = true; - stopped_ = false; - LOG_INFO("success to init ObTabletChecksumUpdater"); - } - return ret; -} - -void ObTabletChecksumUpdater::stop() -{ - if (inited_) { - stopped_ = true; - task_queue_.stop(); - LOG_INFO("stop ObTabletChecksumUpdater success"); - } -} - -void ObTabletChecksumUpdater::wait() -{ - if (inited_) { - task_queue_.wait(); - LOG_INFO("wait ObTabletChecksumUpdater"); - } -} - -void ObTabletChecksumUpdater::destroy() -{ - stop(); - wait(); - inited_ = false; - stopped_ = true; - ob_service_ = nullptr; -} - -int ObTabletChecksumUpdater::async_update( - const uint64_t tenant_id, - const share::ObLSID &ls_id, - const common::ObTabletID &tablet_id) -{ - int ret = OB_SUCCESS; - int64_t add_timestamp = ObTimeUtility::current_time(); - ObTabletChecksumUpdateTask task; - if (OB_UNLIKELY(!inited_)) { - ret = OB_NOT_INIT; - LOG_WARN("ObTabletChecksumUpdater not inited", KR(ret)); - } else if (tablet_id.is_reserved_tablet() || is_virtual_tenant_id(tenant_id)) { - LOG_TRACE("no need to report virtual tenant's tablet and reserved tablet", - KR(ret), K(tablet_id), K(tenant_id)); - } else if (OB_INVALID_TENANT_ID == tenant_id - || !ls_id.is_valid() - || !ls_id.is_valid_with_tenant(tenant_id) - || !tablet_id.is_valid() - || !tablet_id.is_valid_with_tenant(tenant_id)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_id), K(tablet_id)); - } else if (OB_FAIL(task.init(tenant_id, - ls_id, - tablet_id, - add_timestamp))) { - LOG_WARN("set update task failed", KR(ret), K(tenant_id), K(ls_id), K(tablet_id), - K(add_timestamp)); - } else if (OB_FAIL(add_task_(task))){ - LOG_WARN("fail to add task", KR(ret), K(tenant_id), K(ls_id), K(tablet_id), - K(add_timestamp)); - } - return ret; -} - -int ObTabletChecksumUpdater::add_task_(const ObTabletChecksumUpdateTask &task) -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_)) { - ret = OB_NOT_INIT; - LOG_WARN("not inited", KR(ret)); - } else if (!task.is_valid()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid task", KR(ret), K(task)); - } else if (OB_FAIL(task_queue_.add(task))){ - // TODO: deal with barrier-tasks when execute - if (OB_EAGAIN == ret) { - LOG_TRACE("tablet table update task exist", K(task)); - ret = OB_SUCCESS; - } else { - LOG_WARN("add tablet table update task failed", KR(ret), K(task)); - } - } - return ret; -} - -int ObTabletChecksumUpdater::reput_to_queue_(const ObIArray &tasks) -{ - int ret = OB_SUCCESS; - // try to push task back to queue, ignore ret code - for (int64_t i = 0; i < tasks.count(); i++) { - const ObTabletChecksumUpdateTask &task = tasks.at(i); - if (!task.is_valid()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get invalid task", KR(ret), K(task)); - } else if (OB_FAIL(add_task_(task))) { - LOG_ERROR("fail to reput to queue", KR(ret), K(task)); - } - } - return ret; -} - -int ObTabletChecksumUpdater::process_barrier( - const ObTabletChecksumUpdateTask &task, - bool &stopped) -{ - int ret = OB_NOT_SUPPORTED; - UNUSEDx(task, stopped); - return ret; -} - -int ObTabletChecksumUpdater::batch_process_tasks( - const common::ObIArray &tasks, - bool &stopped) -{ - int ret = OB_SUCCESS; - UNUSED(stopped); - int tmp_ret = OB_SUCCESS; - const int64_t start_time = ObTimeUtility::current_time(); - ObArray update_tablet_items; - UpdateTabletChecksumTaskList update_tasks; - RemoveTabletChecksumTaskList remove_tasks; - ObCurTraceId::init(GCONF.self_addr_); - - if (OB_UNLIKELY(!inited_)) { - ret = OB_NOT_INIT; - LOG_WARN("ObTabletChecksumUpdater is not inited", KR(ret)); - } else if (OB_FAIL(generate_tasks_(tasks, update_tablet_items, update_tasks, remove_tasks))){ - if (OB_TENANT_NOT_IN_SERVER != ret && OB_NOT_RUNNING != ret) { - LOG_ERROR("generate_tasks failed", KR(ret), "tasks count", tasks.count(), - "update_tablet_items", update_tablet_items.count(), - "update_tasks", update_tasks.count(), - "remove_tasks", remove_tasks.count()); - } else { - LOG_WARN("Tenant/LogStream has been stopped, skip to process tasks", KR(ret)); - } - } else { - tmp_ret = do_batch_update_(start_time, update_tasks, update_tablet_items); - if (OB_SUCCESS != tmp_ret) { - ret = OB_SUCC(ret) ? tmp_ret : ret; - LOG_WARN("do_batch_update_ failed", KR(tmp_ret), K(start_time), K(update_tasks), - K(update_tablet_items)); - } else { - ObTaskController::get().allow_next_syslog(); - LOG_INFO("REPORT: success to update tablets", KR(tmp_ret), K(update_tablet_items)); - } - tmp_ret = do_batch_remove_(start_time, remove_tasks); - if (OB_SUCCESS != tmp_ret) { - ret = OB_SUCC(ret) ? tmp_ret : ret; - LOG_WARN("do_batch_remove_ failed", KR(tmp_ret), K(start_time), K(remove_tasks)); - } else { - ObTaskController::get().allow_next_syslog(); - LOG_INFO("REPORT: success to remove tablets", KR(tmp_ret), K(remove_tasks)); - } - } - return ret; -} - -int ObTabletChecksumUpdater::generate_tasks_( - const ObIArray &tasks, - ObIArray &update_tablet_items, - UpdateTabletChecksumTaskList &update_tasks, - RemoveTabletChecksumTaskList &remove_tasks) -{ - int ret = OB_SUCCESS; - int64_t tenant_id = OB_INVALID_TENANT_ID; - int64_t count = UNIQ_TASK_QUEUE_BATCH_EXECUTE_NUM; - if (OB_UNLIKELY(!inited_)) { - ret = OB_NOT_INIT; - LOG_WARN("ObTabletChecksumUpdater is not inited", KR(ret)); - } else if (OB_ISNULL(ob_service_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid argument", KR(ret), KP_(ob_service)); - } else if (OB_UNLIKELY(tasks.count() <= 0)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("tasks count <= 0", KR(ret), "tasks_count", tasks.count()); - } else if (FALSE_IT(tenant_id = tasks.at(0).get_tenant_id())) { - // shall never be here - } else { - ObTabletReplicaChecksumItem item; - FOREACH_CNT_X(task, tasks, OB_SUCC(ret)) { - // split tasks into remove and update - if (OB_ISNULL(task)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid task", KR(ret), K(task)); - } else if (tenant_id != task->get_tenant_id()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("tenant_id not the same", KR(ret), K(tenant_id), K(task)); - } else { - task->check_task_status(); - item.reset(); - if (OB_FAIL(ob_service_->fill_tablet_replica_checksum_item(task->get_tenant_id(), - task->get_ls_id(), task->get_tablet_id(), item))) { - if (OB_TABLET_NOT_EXIST == ret || OB_LS_NOT_EXIST == ret) { - LOG_INFO("try update a not exist or invalid tablet, turn to remove tablet table", - KR(ret), "tenant_id", task->get_tenant_id(), - "ls_id", task->get_ls_id(), - "tablet_id", task->get_tablet_id()); - ret = OB_SUCCESS; - if (OB_FAIL(remove_tasks.reserve(count))) { - // reserve() is reentrant, do not have to check whether first time - LOG_WARN("fail to reserver remove_tasks", KR(ret), K(count)); - } else if (OB_FAIL(remove_tasks.push_back(*task))) { - LOG_WARN("fail to push back remove task", KR(ret), KPC(task)); - } - } else { - LOG_WARN("fail to fill tablet item", KR(ret), "tenant_id", task->get_tenant_id(), - "ls_id", task->get_ls_id(), "tablet_id", task->get_tablet_id()); - } - } else { - LOG_TRACE("fill tablet checksum item success", K(task), K(item)); - if (OB_FAIL(update_tablet_items.reserve(count))) { - // reserve() is reentrant, do not have to check whether first time - LOG_WARN("fail to reserve update_tablet_items", KR(ret), K(count)); - } else if (OB_FAIL(update_tasks.reserve(count))) { - // reserve() is reentrant, do not have to check whether first time - LOG_WARN("fail to reserve update_tasks", KR(ret), K(count)); - } else if (OB_FAIL(update_tablet_items.push_back(item))) { - LOG_WARN("fail to push back item", KR(ret), K(item)); - } else if (OB_FAIL(update_tasks.push_back(*task))) { - LOG_WARN("fail to push back task", KR(ret), KPC(task)); - } - } - } - } //FOREACH - - if (OB_SUCC(ret) - && (update_tasks.count() != update_tablet_items.count() - || update_tasks.count() + remove_tasks.count() != tasks.count())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("tablet task count and item count not match", KR(ret), - "tablet_update_tasks count", update_tasks.count(), - "tablet_update_items count", update_tablet_items.count(), - "tablet_remove_tasks count", remove_tasks.count(), - "tasks count", tasks.count()); - } - } - return ret; -} - -int ObTabletChecksumUpdater::do_batch_update_( - const int64_t start_time, - const ObIArray &tasks, - const ObIArray &items) -{ - int ret = OB_SUCCESS; - int tmp_ret = OB_SUCCESS; - int64_t tenant_id = OB_INVALID_TENANT_ID; - if (tasks.count() != items.count() || 0 == tasks.count()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("tasks num not match", KR(ret), "task_cnt", tasks.count(), "item_cnt", items.count()); - } else if (FALSE_IT(tenant_id = tasks.at(0).get_tenant_id())) { - } else if (OB_FAIL(ObTabletReplicaChecksumOperator::batch_update(tenant_id, items, *GCTX.sql_proxy_))) { - LOG_WARN("do tablet table update failed, try to reput to queue", KR(ret), - "escape time", ObTimeUtility::current_time() - start_time); - (void) throttle_(ret, ObTimeUtility::current_time() - start_time); - if (OB_SUCCESS != (tmp_ret = reput_to_queue_(tasks))) { - LOG_ERROR("fail to reput update task to queue", KR(tmp_ret), K(tasks.count())); - } else { - LOG_INFO("reput update task to queue success", K(tasks.count())); - } - } else { - LOG_INFO("batch process update success", KR(ret), K(items.count()), - "use_time", ObTimeUtility::current_time() - start_time); - } - return ret; -} - -int ObTabletChecksumUpdater::do_batch_remove_( - const int64_t start_time, - const ObIArray &tasks) -{ - int ret = OB_SUCCESS; - int tmp_ret = OB_SUCCESS; - int64_t tenant_id = OB_INVALID_TENANT_ID; - if (0 == tasks.count()) { - LOG_INFO("no need to remove task", KR(ret), "task_cnt", tasks.count()); - } else if (FALSE_IT(tenant_id = tasks.at(0).get_tenant_id())) { - } else { - ObArray ls_ids; - ObArray tablet_ids; - for (int64_t i = 0; OB_SUCC(ret) && i < tasks.count(); ++i) { - if (OB_FAIL(ls_ids.push_back(tasks.at(i).ls_id_))) { - LOG_WARN("failed to add ls id", KR(ret)); - } else if (OB_FAIL(tablet_ids.push_back(tasks.at(i).tablet_id_))) { - LOG_WARN("failed to add tablet id", KR(ret)); - } - } - if (OB_SUCC(ret) && OB_FAIL(ObTabletReplicaChecksumOperator::batch_remove(tenant_id, ls_ids, tablet_ids, *GCTX.sql_proxy_))) { - LOG_WARN("do tablet checksum remove failed, try to reput to queue", KR(ret), - "escape time", ObTimeUtility::current_time() - start_time); - (void) throttle_(ret, ObTimeUtility::current_time() - start_time); - if (OB_SUCCESS != (tmp_ret = reput_to_queue_(tasks))) { - LOG_ERROR("fail to reput remove task to queue", KR(tmp_ret), K(tasks.count())); - } else { - LOG_INFO("reput remove task to queue success", K(tasks.count())); - } - } else { - LOG_INFO("batch process remove success", KR(ret), K(tasks.count()), - "use_time", ObTimeUtility::current_time() - start_time); - } - } - return ret; -} - -int ObTabletChecksumUpdater::throttle_( - const int return_code, - const int64_t execute_time_us) -{ - int ret = OB_SUCCESS; - int64_t sleep_us = 0; - if (OB_SUCCESS != return_code) { - sleep_us = 2l * 1000 * 1000; // 2s - } else if (execute_time_us > 20 * 1000 * 1000) { // 20s - sleep_us = MIN(1L * 1000 * 1000, (execute_time_us - 20 * 1000 * 1000)); - LOG_WARN("detected slow update, may be too many concurrent updating", K(sleep_us)); - } - const static int64_t sleep_step_us = 20 * 1000; // 20ms - for (; !stopped_ && sleep_us > 0; - sleep_us -= sleep_step_us) { - ob_usleep(static_cast(std::min(sleep_step_us, sleep_us))); - } - return ret; -} - - -} // observer -} // oceanbase diff --git a/src/observer/ob_tablet_checksum_updater.h b/src/observer/ob_tablet_checksum_updater.h deleted file mode 100644 index 2853461e0..000000000 --- a/src/observer/ob_tablet_checksum_updater.h +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Copyright (c) 2021 OceanBase - * OceanBase CE is licensed under Mulan PubL v2. - * You can use this software according to the terms and conditions of the Mulan PubL v2. - * You may obtain a copy of Mulan PubL v2 at: - * http://license.coscl.org.cn/MulanPubL-2.0 - * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, - * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, - * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. - * See the Mulan PubL v2 for more details. - */ - -#ifndef OCEANBASE_OBSERVER_OB_TABLET_CHECKSUM_UPDATER_H_ -#define OCEANBASE_OBSERVER_OB_TABLET_CHECKSUM_UPDATER_H_ - -#include "observer/ob_uniq_task_queue.h" -#include "common/ob_tablet_id.h" -#include "share/ob_ls_id.h" - -namespace oceanbase -{ -namespace common -{ -class ObAddr; -class ObTabletID; -} -namespace share -{ -class ObLSID; -struct ObTabletReplicaChecksumItem; -} - -namespace observer -{ -class ObService; -class ObTabletChecksumUpdater; -class ObTabletChecksumUpdateTask : public ObIUniqTaskQueueTask -{ -public: - friend class ObTabletChecksumUpdater; - - ObTabletChecksumUpdateTask(); - ObTabletChecksumUpdateTask( - const uint64_t tenant_id, - const share::ObLSID &ls_id, - const common::ObTabletID &tablet_id, - const int64_t add_timestamp); - virtual ~ObTabletChecksumUpdateTask(); - int init( - const uint64_t tenant_id, - const share::ObLSID &ls_id, - const common::ObTabletID &tablet_id, - const int64_t add_timestamp); - void reset(); - bool is_valid() const; - int64_t hash() const; - void check_task_status() const; - int assign(const ObTabletChecksumUpdateTask &other); - virtual bool operator==(const ObTabletChecksumUpdateTask &other) const; - virtual bool operator!=(const ObTabletChecksumUpdateTask &other) const; - inline int64_t get_tenant_id() const { return tenant_id_; } - inline const share::ObLSID &get_ls_id() const { return ls_id_; } - inline const common::ObTabletID &get_tablet_id() const { return tablet_id_; } - inline int64_t get_add_timestamp() const { return add_timestamp_; } - bool compare_without_version(const ObTabletChecksumUpdateTask& other) const; - uint64_t get_group_id() const { return tenant_id_; } - bool need_process_alone() const { return false; } - virtual bool need_assign_when_equal() const { return false; } - inline int assign_when_equal(const ObTabletChecksumUpdateTask &other) - { - UNUSED(other); - return common::OB_NOT_SUPPORTED; - } - bool is_barrier() const { return false; } - TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(tablet_id), K_(add_timestamp)); -private: - const int64_t TABLET_CHECK_INTERVAL = 120l * 1000 * 1000; //2 minutes - int64_t tenant_id_; - share::ObLSID ls_id_; - common::ObTabletID tablet_id_; - int64_t add_timestamp_; -}; - - -// TODO impl Checker to clean the abandoned task -typedef ObUniqTaskQueue ObTabletChecksumTaskQueue; -typedef ObArray UpdateTabletChecksumTaskList; -typedef ObArray RemoveTabletChecksumTaskList; - -// TODO impl MTL Module -class ObTabletChecksumUpdater -{ -public: - ObTabletChecksumUpdater(); - virtual ~ObTabletChecksumUpdater(); - int init(ObService &ob_service); - inline bool is_inited() const { return inited_; } - void stop(); - void wait(); - void destroy(); - int async_update( - const uint64_t tenant_id, - const share::ObLSID &ls_id, - const common::ObTabletID &tablet_id); - int batch_process_tasks( - const common::ObIArray &tasks, - bool &stopped); - int process_barrier(const ObTabletChecksumUpdateTask &task, bool &stopped); -private: - int add_task_(const ObTabletChecksumUpdateTask &task); - int reput_to_queue_(const ObIArray &tasks); - int generate_tasks_( - const ObIArray &tasks, - ObIArray &update_tablet_items, - UpdateTabletChecksumTaskList &update_tasks, - RemoveTabletChecksumTaskList &remove_tasks); - int do_batch_update_( - const int64_t start_time, - const ObIArray &tasks, - const ObIArray &items); - int do_batch_remove_( - const int64_t start_time, - const ObIArray &tasks); - - int throttle_(const int return_code, const int64_t execute_time_us); -private: - const int64_t MINI_MODE_UPDATE_TASK_THREAD_CNT = 1; - const int64_t UPDATE_TASK_THREAD_CNT = 7; - const int64_t MINI_MODE_TASK_QUEUE_SIZE = 20 * 10000; - const int64_t TASK_QUEUE_SIZE = 100 * 10000; - bool inited_; - bool stopped_; - ObService *ob_service_; - ObTabletChecksumTaskQueue task_queue_; - DISALLOW_COPY_AND_ASSIGN(ObTabletChecksumUpdater); -}; - - -} // observer -} // oceanbase -#endif // OCEANBASE_OBSERVER_OB_TABLET_CHECKSUM_UPDATER_H_ \ No newline at end of file diff --git a/src/observer/report/ob_i_meta_report.h b/src/observer/report/ob_i_meta_report.h index 258aedf12..01d9a906c 100644 --- a/src/observer/report/ob_i_meta_report.h +++ b/src/observer/report/ob_i_meta_report.h @@ -37,10 +37,6 @@ public: const uint64_t tenant_id, const share::ObLSID &ls_id, const common::ObTabletID &tablet_id) = 0; - virtual int submit_tablet_checksums_task( - const uint64_t tenant_id, - const share::ObLSID &ls_id, - const common::ObTabletID &tablet_id) = 0; }; } // end namespace observer diff --git a/src/observer/report/ob_tablet_table_updater.cpp b/src/observer/report/ob_tablet_table_updater.cpp index 8a1eeade6..ebaf2502d 100644 --- a/src/observer/report/ob_tablet_table_updater.cpp +++ b/src/observer/report/ob_tablet_table_updater.cpp @@ -17,6 +17,9 @@ #include "share/tablet/ob_tablet_info.h" // for ObTabletInfo #include "share/tablet/ob_tablet_table_operator.h" // for ObTabletOperator #include "observer/ob_service.h" // for is_mini_mode +#include "share/ob_tablet_replica_checksum_operator.h" // for ObTabletReplicaChecksumItem +#include "lib/mysqlclient/ob_mysql_transaction.h" // ObMySQLTransaction +#include "lib/mysqlclient/ob_mysql_proxy.h" namespace oceanbase { @@ -287,6 +290,7 @@ int ObTabletTableUpdater::generate_tasks_( const ObIArray &batch_tasks, ObArray &update_tablet_replicas, ObArray &remove_tablet_replicas, + ObArray &update_tablet_checksums, UpdateTaskList &update_tablet_tasks, RemoveTaskList &remove_tablet_tasks) { @@ -307,6 +311,7 @@ int ObTabletTableUpdater::generate_tasks_( // shall never be here } else { ObTabletReplica replica; + ObTabletReplicaChecksumItem checksum_item; FOREACH_CNT_X(task, batch_tasks, OB_SUCC(ret)) { // split tasks into remove and update if (OB_ISNULL(task)) { @@ -318,10 +323,12 @@ int ObTabletTableUpdater::generate_tasks_( } else { task->check_task_status(); replica.reset(); - if (OB_FAIL(ob_service_->fill_tablet_replica(task->get_tenant_id(), - task->get_ls_id(), - task->get_tablet_id(), - replica))) { + checksum_item.reset(); + if (OB_FAIL(ob_service_->fill_tablet_report_info(task->get_tenant_id(), + task->get_ls_id(), + task->get_tablet_id(), + replica, + checksum_item))) { if (OB_TABLET_NOT_EXIST == ret || OB_LS_NOT_EXIST == ret) { ret = OB_SUCCESS; // fill primary keys of the replica for removing @@ -353,11 +360,16 @@ int ObTabletTableUpdater::generate_tasks_( if (OB_FAIL(update_tablet_replicas.reserve(count))) { // reserve() is reentrant, do not have to check whether first time LOG_WARN("fail to reserve update_tablet_replicas", KR(ret), K(count)); + } else if (OB_FAIL(update_tablet_checksums.reserve(count))) { + // reserve() is reentrant, do not have to check whether first time + LOG_WARN("fail to reserve update_tablet_checksums", KR(ret), K(count)); } else if (OB_FAIL(update_tablet_tasks.reserve(count))) { // reserve() is reentrant, do not have to check whether first time LOG_WARN("fail to reserve update_tablet_tasks", KR(ret), K(count)); } else if (OB_FAIL(update_tablet_replicas.push_back(replica))) { LOG_WARN("fail to push back replica", KR(ret), K(replica)); + } else if (OB_FAIL(update_tablet_checksums.push_back(checksum_item))) { + LOG_WARN("fail to push back checksum item", KR(ret), K(checksum_item)); } else if (OB_FAIL(update_tablet_tasks.push_back(*task))) { LOG_WARN("fail to push back task", KR(ret), KPC(task)); } @@ -367,12 +379,14 @@ int ObTabletTableUpdater::generate_tasks_( if (OB_SUCC(ret) && (update_tablet_tasks.count() != update_tablet_replicas.count() + || update_tablet_tasks.count() != update_tablet_checksums.count() || update_tablet_tasks.count() + remove_tablet_tasks.count() != batch_tasks.count())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("tablet task count and replica count not match", KR(ret), "tablet_update_tasks count", update_tablet_tasks.count(), "tablet_update_replicas count", update_tablet_replicas.count(), + "tablet_update_checksums count", update_tablet_checksums.count(), "tablet_remove_tasks count", remove_tablet_tasks.count(), "batch_tasks count", batch_tasks.count()); } @@ -390,6 +404,7 @@ int ObTabletTableUpdater::batch_process_tasks( const int64_t start_time = ObTimeUtility::current_time(); ObArray update_tablet_replicas; ObArray remove_tablet_replicas; + ObArray update_tablet_checksums; UpdateTaskList update_tablet_tasks; RemoveTaskList remove_tablet_tasks; ObCurTraceId::init(GCONF.self_addr_); @@ -424,19 +439,21 @@ int ObTabletTableUpdater::batch_process_tasks( batch_tasks, update_tablet_replicas, remove_tablet_replicas, + update_tablet_checksums, update_tablet_tasks, remove_tablet_tasks))) { LOG_ERROR("generate_tasks failed", KR(ret), "batch_tasks count", batch_tasks.count(), "update_tablet_replicas", update_tablet_replicas.count(), "remove_tablet_replicas", remove_tablet_replicas.count(), + "update_tablet_checksums", update_tablet_checksums.count(), "update_tablet_tasks", update_tablet_tasks.count(), "remove_tablet_tasks", remove_tablet_tasks.count()); } else { - tmp_ret = do_batch_update_(start_time, update_tablet_tasks, update_tablet_replicas); + tmp_ret = do_batch_update_(start_time, update_tablet_tasks, update_tablet_replicas, update_tablet_checksums); if (OB_SUCCESS != tmp_ret) { ret = OB_SUCC(ret) ? tmp_ret : ret; LOG_WARN("do_batch_update_ failed", KR(tmp_ret), K(start_time), K(update_tablet_tasks), - K(update_tablet_replicas)); + K(update_tablet_replicas), K(update_tablet_checksums)); } else { ObTaskController::get().allow_next_syslog(); LOG_INFO("REPORT: success to update tablets", KR(tmp_ret), K(update_tablet_replicas)); @@ -472,18 +489,37 @@ int ObTabletTableUpdater::do_batch_remove_( ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid tasks count", KR(ret), K(tasks_count)); } else if (FALSE_IT(tenant_id = tasks.at(0).get_tenant_id())) { - } else if (OB_FAIL(tablet_operator_->batch_remove(tenant_id, replicas))) { - LOG_WARN("do tablet table remove failed, try to reput to queue", KR(ret), - "escape time", ObTimeUtility::current_time() - start_time); - (void) throttle_(ret, ObTimeUtility::current_time() - start_time); - if (OB_SUCCESS != (tmp_ret = reput_to_queue_(tasks))) { - LOG_ERROR("fail to reput remove task to queue", KR(tmp_ret), K(tasks_count)); - } else { - LOG_INFO("reput remove task to queue success", K(tasks_count)); - } } else { - LOG_INFO("batch process remove success", KR(ret), K(tasks_count), - "use_time", ObTimeUtility::current_time() - start_time); + common::ObMySQLTransaction trans; + const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id); + if (OB_FAIL(trans.start(GCTX.sql_proxy_, meta_tenant_id))) { + LOG_WARN("fail to start transaction", KR(ret), K(tenant_id), K(meta_tenant_id)); + } else if (OB_FAIL(tablet_operator_->batch_remove(trans, tenant_id, replicas))) { + LOG_WARN("do tablet table remove failed, try to reput to queue", KR(ret), + "escape time", ObTimeUtility::current_time() - start_time); + } else if (OB_FAIL(ObTabletReplicaChecksumOperator::batch_remove_with_trans(trans, tenant_id, replicas))) { + LOG_WARN("do tablet table checksum remove failed, try to reput to queue", KR(ret), + "escape time", ObTimeUtility::current_time() - start_time); + } + + if (trans.is_started()) { + int trans_ret = trans.end(OB_SUCCESS == ret); + if (OB_SUCCESS != trans_ret) { + LOG_WARN("fail to end transaction", KR(trans_ret)); + ret = ((OB_SUCCESS == ret) ? trans_ret : ret); + } + } + if (OB_FAIL(ret)) { + (void) throttle_(ret, ObTimeUtility::current_time() - start_time); + if (OB_SUCCESS != (tmp_ret = reput_to_queue_(tasks))) { + LOG_ERROR("fail to reput remove task to queue", KR(tmp_ret), K(tasks_count)); + } else { + LOG_INFO("reput remove task to queue success", K(tasks_count)); + } + } else { + LOG_INFO("batch process remove success", KR(ret), K(tasks_count), + "use_time", ObTimeUtility::current_time() - start_time); + } } return ret; } @@ -491,30 +527,51 @@ int ObTabletTableUpdater::do_batch_remove_( int ObTabletTableUpdater::do_batch_update_( const int64_t start_time, const ObIArray &tasks, - const ObIArray &replicas) + const ObIArray &replicas, + const ObIArray &checksums) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; int64_t tenant_id = OB_INVALID_TENANT_ID; if (tasks.count() != replicas.count() + || tasks.count() != checksums.count() || OB_ISNULL(tablet_operator_) || 0 == tasks.count()) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("tasks num not match or invalid tablet_operator", - KR(ret), "task_cnt", tasks.count(), "replica_cnt", replicas.count()); + LOG_WARN("tasks num not match or invalid tablet_operator", KR(ret), "task_cnt", tasks.count(), + "replica_cnt", replicas.count(), "checksum_cnt", checksums.count()); } else if (FALSE_IT(tenant_id = tasks.at(0).get_tenant_id())) { - } else if (OB_FAIL(tablet_operator_->batch_update(tenant_id, replicas))) { - LOG_WARN("do tablet table update failed, try to reput to queue", KR(ret), - "escape time", ObTimeUtility::current_time() - start_time); - (void) throttle_(ret, ObTimeUtility::current_time() - start_time); - if (OB_SUCCESS != (tmp_ret = reput_to_queue_(tasks))) { - LOG_ERROR("fail to reput update task to queue", KR(tmp_ret), K(tasks.count())); - } else { - LOG_INFO("reput update task to queue success", K(tasks.count())); - } } else { - LOG_INFO("batch process update success", KR(ret), K(replicas.count()), + common::ObMySQLTransaction trans; + const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id); + if (OB_FAIL(trans.start(GCTX.sql_proxy_, meta_tenant_id))) { + LOG_WARN("fail to start transaction", KR(ret), K(tenant_id), K(meta_tenant_id)); + } else if (OB_FAIL(tablet_operator_->batch_update(trans, tenant_id, replicas))) { + LOG_WARN("do tablet table update failed, try to reput to queue", KR(ret), + "escape time", ObTimeUtility::current_time() - start_time); + } else if (OB_FAIL(ObTabletReplicaChecksumOperator::batch_update_with_trans(trans, tenant_id, checksums))) { + LOG_WARN("do tablet table checksum update failed, try to reput to queue", KR(ret), + "escape time", ObTimeUtility::current_time() - start_time); + } + + if (trans.is_started()) { + int trans_ret = trans.end(OB_SUCCESS == ret); + if (OB_SUCCESS != trans_ret) { + LOG_WARN("fail to end transaction", KR(trans_ret)); + ret = ((OB_SUCCESS == ret) ? trans_ret : ret); + } + } + if (OB_FAIL(ret)) { + (void) throttle_(ret, ObTimeUtility::current_time() - start_time); + if (OB_SUCCESS != (tmp_ret = reput_to_queue_(tasks))) { + LOG_ERROR("fail to reput update task to queue", KR(tmp_ret), K(tasks.count())); + } else { + LOG_INFO("reput update task to queue success", K(tasks.count())); + } + } else { + LOG_INFO("batch process update success", KR(ret), K(replicas.count()), "use_time", ObTimeUtility::current_time() - start_time); + } } return ret; } diff --git a/src/observer/report/ob_tablet_table_updater.h b/src/observer/report/ob_tablet_table_updater.h index a39139377..8be00f37f 100644 --- a/src/observer/report/ob_tablet_table_updater.h +++ b/src/observer/report/ob_tablet_table_updater.h @@ -29,6 +29,7 @@ namespace share class ObLSID; class ObTabletReplica; class ObTabletTableOperator; +struct ObTabletReplicaChecksumItem; } namespace observer { @@ -143,12 +144,14 @@ private: // @parma [in] batch_tasks, input tasks // @parma [out] update_tablet_replicas, generated update replicas // @parma [out] remove_tablet_replicas, generated remove replicas + // @parma [out] update_tablet_checksums, generated update tablet checksums // @parma [out] update_tablet_tasks, generated update tasks // @parma [out] remove_tablet_tasks, generated remove tasks int generate_tasks_( const ObIArray &batch_tasks, ObArray &update_tablet_replicas, ObArray &remove_tablet_replicas, + ObArray &update_tablet_checksums, UpdateTaskList &update_tablet_tasks, RemoveTaskList &remove_tablet_tasks); @@ -156,10 +159,12 @@ private: // @parma [in] start_time, the time to start this execution // @parma [in] tasks, batch of tasks to execute // @parma [in] replicas, related replica to each task + // @parma [in] checksums, related checksum to each task int do_batch_update_( const int64_t start_time, const ObIArray &tasks, - const ObIArray &replicas); + const ObIArray &replicas, + const ObIArray &checksums); // do_batch_remove - the real action to remove a batch of tasks // @parma [in] start_time, the time to start this execution diff --git a/src/observer/report/ob_tenant_meta_checker.cpp b/src/observer/report/ob_tenant_meta_checker.cpp index 0ea73e7d7..55e9796bf 100644 --- a/src/observer/report/ob_tenant_meta_checker.cpp +++ b/src/observer/report/ob_tenant_meta_checker.cpp @@ -22,6 +22,7 @@ #include "share/tablet/ob_tablet_table_iterator.h" // ObTenantTabletTableIterator #include "storage/tx_storage/ob_ls_service.h" // ObLSService, ObLSIterator #include "storage/tx_storage/ob_ls_handle.h" // ObLSHandle +#include "share/ob_tablet_replica_checksum_operator.h" // ObTabletReplicaChecksumItem namespace oceanbase { @@ -621,6 +622,8 @@ int ObTenantMetaChecker::check_report_replicas_( ObTabletID tablet_id; ObTabletReplica local_replica; // replica from local ObTabletReplica table_replica; // replica from meta table + share::ObTabletReplicaChecksumItem tablet_checksum; // TODO(@donglou.zl) check tablet_replica_checksum + const bool need_checksum = false; const ObLSID &ls_id = ls->get_ls_id(); while (OB_SUCC(ret)) { if (OB_FAIL(tablet_iter.get_next_tablet(tablet_handle))) { @@ -650,11 +653,13 @@ int ObTenantMetaChecker::check_report_replicas_( LOG_WARN("get replica from hashmap failed", KR(ret), K_(tenant_id), K(ls_id), K(tablet_id)); } - } else if (OB_FAIL(GCTX.ob_service_->fill_tablet_replica( + } else if (OB_FAIL(GCTX.ob_service_->fill_tablet_report_info( tenant_id_, ls_id, tablet_id, - local_replica))) { + local_replica, + tablet_checksum, + need_checksum))) { LOG_WARN("fail to fill tablet replica", KR(ret), K_(tenant_id), K(ls_id), K(tablet_id)); } else if (table_replica.is_equal_for_report(local_replica)) { continue; diff --git a/src/share/ob_tablet_replica_checksum_operator.cpp b/src/share/ob_tablet_replica_checksum_operator.cpp index e9badb56d..76c67ed34 100644 --- a/src/share/ob_tablet_replica_checksum_operator.cpp +++ b/src/share/ob_tablet_replica_checksum_operator.cpp @@ -20,6 +20,8 @@ #include "lib/mysqlclient/ob_mysql_proxy.h" #include "share/inner_table/ob_inner_table_schema_constants.h" #include "share/schema/ob_column_schema.h" +#include "share/tablet/ob_tablet_info.h" +#include "share/config/ob_server_config.h" namespace oceanbase { @@ -346,14 +348,77 @@ ObTabletReplicaChecksumItem &ObTabletReplicaChecksumItem::operator=(const ObTabl /****************************** ObTabletReplicaChecksumOperator ******************************/ -int ObTabletReplicaChecksumOperator::batch_remove( +int ObTabletReplicaChecksumOperator::batch_remove_with_trans( + ObMySQLTransaction &trans, const uint64_t tenant_id, - const ObIArray &ls_ids, - const ObIArray &tablet_ids, - ObISQLClient &sql_proxy) + const common::ObIArray &tablet_replicas) { - int ret = OB_NOT_IMPLEMENT; - UNUSEDx(tenant_id, ls_ids, tablet_ids, sql_proxy); + int ret = OB_SUCCESS; + const int64_t replicas_count = tablet_replicas.count(); + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || replicas_count <= 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id), "tablet_replica cnt", replicas_count); + } else { + int64_t start_idx = 0; + int64_t end_idx = min(MAX_BATCH_COUNT, replicas_count); + while (OB_SUCC(ret) && (start_idx < end_idx)) { + if (OB_FAIL(inner_batch_remove_by_sql_(tenant_id, tablet_replicas, start_idx, end_idx, trans))) { + LOG_WARN("fail to inner batch remove", KR(ret), K(tenant_id), K(start_idx), K(end_idx)); + } else { + start_idx = end_idx; + end_idx = min(start_idx + MAX_BATCH_COUNT, replicas_count); + } + } + } + return ret; +} + +int ObTabletReplicaChecksumOperator::inner_batch_remove_by_sql_( + const uint64_t tenant_id, + const common::ObIArray &tablet_replicas, + const int64_t start_idx, + const int64_t end_idx, + ObMySQLTransaction &trans) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) + || (tablet_replicas.count() <= 0) + || (start_idx < 0) + || (start_idx > end_idx) + || (end_idx > tablet_replicas.count()))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(start_idx), K(end_idx), + "tablet_replica cnt", tablet_replicas.count()); + } else { + ObSqlString sql; + int64_t affected_rows = 0; + if (OB_FAIL(sql.assign_fmt("DELETE FROM %s WHERE tenant_id = '%lu' AND (tablet_id, svr_ip, svr_port, ls_id) IN(", + OB_ALL_TABLET_REPLICA_CHECKSUM_TNAME, tenant_id))) { + LOG_WARN("fail to assign sql", KR(ret), K(tenant_id)); + } else { + char ip[OB_MAX_SERVER_ADDR_SIZE] = ""; + for (int64_t idx = start_idx; OB_SUCC(ret) && (idx < end_idx); ++idx) { + if (OB_UNLIKELY(!tablet_replicas.at(idx).get_server().ip_to_string(ip, sizeof(ip)))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("convert server ip to string failed", KR(ret), "server", tablet_replicas.at(idx).get_server()); + } else if (OB_FAIL(sql.append_fmt("('%lu', '%s', %d, %ld)%s", + tablet_replicas.at(idx).get_tablet_id().id(), + ip, + tablet_replicas.at(idx).get_server().get_port(), + tablet_replicas.at(idx).get_ls_id().id(), + ((idx == end_idx - 1) ? ")" : ", ")))) { + LOG_WARN("fail to assign sql", KR(ret), K(tenant_id), K(idx), K(start_idx), K(end_idx)); + } + } + + const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id); + if (FAILEDx(trans.write(meta_tenant_id, sql.ptr(), affected_rows))) { + LOG_WARN("fail to execute sql", KR(ret), K(meta_tenant_id), K(sql)); + } else { + LOG_INFO("will batch delete tablet replica checksum", K(affected_rows)); + } + } + } return ret; } @@ -687,58 +752,38 @@ int ObTabletReplicaChecksumOperator::construct_tablet_replica_checksum_item_( return ret; } -int ObTabletReplicaChecksumOperator::batch_update( + +int ObTabletReplicaChecksumOperator::batch_update_with_trans( + common::ObMySQLTransaction &trans, const uint64_t tenant_id, - const ObIArray &items, - ObISQLClient &sql_proxy) + const common::ObIArray &items) { - return batch_insert_or_update_(tenant_id, items, sql_proxy, true); + return batch_insert_or_update_with_trans_(tenant_id, items, trans, true); } -int ObTabletReplicaChecksumOperator::batch_insert( +int ObTabletReplicaChecksumOperator::batch_insert_or_update_with_trans_( const uint64_t tenant_id, const ObIArray &items, - ObISQLClient &sql_proxy) -{ - return batch_insert_or_update_(tenant_id, items, sql_proxy, false); -} - -int ObTabletReplicaChecksumOperator::batch_insert_or_update_( - const uint64_t tenant_id, - const ObIArray &items, - ObISQLClient &sql_proxy, + common::ObMySQLTransaction &trans, const bool is_update) { int ret = OB_SUCCESS; - ObMySQLTransaction trans; if (OB_UNLIKELY((OB_INVALID_TENANT_ID == tenant_id) || (items.count() <= 0))) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(tenant_id), "items count", items.count()); } else { - const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id); - if (OB_FAIL(trans.start(&sql_proxy, meta_tenant_id))) { - LOG_WARN("fail to start transaction", KR(ret), K(tenant_id), K(meta_tenant_id)); - } else { - int64_t start_idx = 0; - int64_t end_idx = min(MAX_BATCH_COUNT, items.count()); - while (OB_SUCC(ret) && (start_idx < end_idx)) { - if (OB_FAIL(inner_batch_insert_or_update_by_sql_(tenant_id, items, start_idx, - end_idx, trans, is_update))) { - LOG_WARN("fail to inner batch insert", KR(ret), K(tenant_id), K(start_idx), K(is_update)); - } else { - start_idx = end_idx; - end_idx = min(start_idx + MAX_BATCH_COUNT, items.count()); - } + int64_t start_idx = 0; + int64_t end_idx = min(MAX_BATCH_COUNT, items.count()); + while (OB_SUCC(ret) && (start_idx < end_idx)) { + if (OB_FAIL(inner_batch_insert_or_update_by_sql_(tenant_id, items, start_idx, + end_idx, trans, is_update))) { + LOG_WARN("fail to inner batch insert", KR(ret), K(tenant_id), K(start_idx), K(is_update)); + } else { + start_idx = end_idx; + end_idx = min(start_idx + MAX_BATCH_COUNT, items.count()); } } } - if (trans.is_started()) { - int trans_ret = trans.end(OB_SUCCESS == ret); - if (OB_SUCCESS != trans_ret) { - LOG_WARN("fail to end transaction", KR(trans_ret)); - ret = ((OB_SUCCESS == ret) ? trans_ret : ret); - } - } return ret; } diff --git a/src/share/ob_tablet_replica_checksum_operator.h b/src/share/ob_tablet_replica_checksum_operator.h index ca64897dd..1a726ecf4 100644 --- a/src/share/ob_tablet_replica_checksum_operator.h +++ b/src/share/ob_tablet_replica_checksum_operator.h @@ -30,6 +30,7 @@ namespace common class ObISQLClient; class ObAddr; class ObTabletID; +class ObMySQLTransaction; namespace sqlclient { class ObMySQLResult; @@ -37,6 +38,7 @@ class ObMySQLResult; } namespace share { +class ObTabletReplica; struct ObTabletReplicaReportColumnMeta { @@ -117,19 +119,14 @@ public: const common::ObSqlString &sql, common::ObISQLClient &sql_proxy, common::ObIArray &items); - static int batch_insert( + static int batch_update_with_trans( + common::ObMySQLTransaction &trans, const uint64_t tenant_id, - const common::ObIArray &items, - common::ObISQLClient &sql_proxy); - static int batch_update( + const common::ObIArray &item); + static int batch_remove_with_trans( + common::ObMySQLTransaction &trans, const uint64_t tenant_id, - const common::ObIArray &items, - common::ObISQLClient &sql_proxy); - static int batch_remove( - const uint64_t tenant_id, - const common::ObIArray &ls_ids, - const common::ObIArray &tablet_ids, - common::ObISQLClient &sql_proxy); + const common::ObIArray &tablet_replicas); static int check_column_checksum( const uint64_t tenant_id, @@ -153,10 +150,10 @@ public: common::ObString &column_meta_hex_str); private: - static int batch_insert_or_update_( + static int batch_insert_or_update_with_trans_( const uint64_t tenant_id, const common::ObIArray &items, - common::ObISQLClient &sql_proxy, + common::ObMySQLTransaction &trans, const bool is_update); static int inner_batch_insert_or_update_by_sql_( @@ -167,6 +164,13 @@ private: common::ObISQLClient &sql_client, const bool is_update); + static int inner_batch_remove_by_sql_( + const uint64_t tenant_id, + const common::ObIArray &tablet_replicas, + const int64_t start_idx, + const int64_t end_idx, + common::ObMySQLTransaction &trans); + static int inner_batch_get_by_sql_( const uint64_t tenant_id, const common::ObSqlString &sql, diff --git a/src/share/tablet/ob_tablet_table_operator.cpp b/src/share/tablet/ob_tablet_table_operator.cpp index a212da1a8..7a4e005b8 100644 --- a/src/share/tablet/ob_tablet_table_operator.cpp +++ b/src/share/tablet/ob_tablet_table_operator.cpp @@ -396,35 +396,49 @@ int ObTabletTableOperator::batch_update( const ObIArray &replicas) { int ret = OB_SUCCESS; - common::ObMySQLTransaction trans; + if (OB_UNLIKELY(!inited_) || OB_ISNULL(sql_proxy_)) { ret = OB_NOT_INIT; LOG_WARN("not init", KR(ret)); - } else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || replicas.count() <= 0)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", KR(ret), K(tenant_id), "replicas count", replicas.count()); } else { + common::ObMySQLTransaction trans; const uint64_t sql_tenant_id = gen_meta_tenant_id(tenant_id); if (OB_FAIL(trans.start(sql_proxy_, sql_tenant_id))) { LOG_WARN("start transaction failed", KR(ret), K(sql_tenant_id)); - } else { - int64_t start_idx = 0; - int64_t end_idx = min(MAX_BATCH_COUNT, replicas.count()); - while (OB_SUCC(ret) && (start_idx < end_idx)) { - if (OB_FAIL(inner_batch_update_by_sql_(tenant_id, replicas, start_idx, end_idx, trans))) { - LOG_WARN("fail to inner batch update", KR(ret), K(tenant_id), K(replicas), K(start_idx)); - } else { - start_idx = end_idx; - end_idx = min(start_idx + MAX_BATCH_COUNT, replicas.count()); - } + } else if (OB_FAIL(batch_update(trans, tenant_id, replicas))) { + LOG_WARN("fail to batch update", KR(ret), K(tenant_id)); + } + + if (trans.is_started()) { + int trans_ret = trans.end(OB_SUCCESS == ret); + if (OB_SUCCESS != trans_ret) { + LOG_WARN("end transaction failed", KR(trans_ret)); + ret = OB_SUCCESS == ret ? trans_ret : ret; } } } - if (trans.is_started()) { - int trans_ret = trans.end(OB_SUCCESS == ret); - if (OB_SUCCESS != trans_ret) { - LOG_WARN("end transaction failed", KR(trans_ret)); - ret = OB_SUCCESS == ret ? trans_ret : ret; + return ret; +} + +int ObTabletTableOperator::batch_update( + ObISQLClient &sql_client, + const uint64_t tenant_id, + const ObIArray &replicas) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || replicas.count() <= 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id), "replicas count", replicas.count()); + } else { + int64_t start_idx = 0; + int64_t end_idx = min(MAX_BATCH_COUNT, replicas.count()); + while (OB_SUCC(ret) && (start_idx < end_idx)) { + if (OB_FAIL(inner_batch_update_by_sql_(tenant_id, replicas, start_idx, end_idx, sql_client))) { + LOG_WARN("fail to inner batch update", KR(ret), K(tenant_id), K(replicas), K(start_idx)); + } else { + start_idx = end_idx; + end_idx = min(start_idx + MAX_BATCH_COUNT, replicas.count()); + } } } return ret; @@ -549,40 +563,52 @@ int ObTabletTableOperator::batch_remove( const ObIArray &replicas) { int ret = OB_SUCCESS; - common::ObMySQLTransaction trans; if (OB_UNLIKELY(!inited_) || OB_ISNULL(sql_proxy_)) { ret = OB_NOT_INIT; LOG_WARN("not init", KR(ret)); - } else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || replicas.count() <= 0)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", KR(ret), K(tenant_id), "replicas count", replicas.count()); } else { + common::ObMySQLTransaction trans; const uint64_t sql_tenant_id = gen_meta_tenant_id(tenant_id); if (OB_FAIL(trans.start(sql_proxy_, sql_tenant_id))) { LOG_WARN("start transaction failed", KR(ret), K(sql_tenant_id)); - } else { - int64_t start_idx = 0; - int64_t end_idx = min(MAX_BATCH_COUNT, replicas.count()); - while (OB_SUCC(ret) && (start_idx < end_idx)) { - if (OB_FAIL(inner_batch_remove_by_sql_(tenant_id, replicas, start_idx, end_idx, trans))) { - LOG_WARN("fail to inner batch remove", KR(ret), K(tenant_id), K(replicas), K(start_idx)); - } else { - start_idx = end_idx; - end_idx = min(start_idx + MAX_BATCH_COUNT, replicas.count()); - } - } + } else if (OB_FAIL(batch_remove(trans, tenant_id, replicas))) { + LOG_WARN("fail to batch remove", KR(ret)); } - } - if (trans.is_started()) { - int trans_ret = trans.end(OB_SUCCESS == ret); - if (OB_SUCCESS != trans_ret) { - LOG_WARN("end transaction failed", KR(trans_ret)); - ret = OB_SUCCESS == ret ? trans_ret : ret; + + if (trans.is_started()) { + int trans_ret = trans.end(OB_SUCCESS == ret); + if (OB_SUCCESS != trans_ret) { + LOG_WARN("end transaction failed", KR(trans_ret)); + ret = OB_SUCCESS == ret ? trans_ret : ret; + } } } return ret; } +int ObTabletTableOperator::batch_remove( + ObISQLClient &sql_client, + const uint64_t tenant_id, + const ObIArray &replicas) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || replicas.count() <= 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id), "replicas count", replicas.count()); + } else { + int64_t start_idx = 0; + int64_t end_idx = min(MAX_BATCH_COUNT, replicas.count()); + while (OB_SUCC(ret) && (start_idx < end_idx)) { + if (OB_FAIL(inner_batch_remove_by_sql_(tenant_id, replicas, start_idx, end_idx, sql_client))) { + LOG_WARN("fail to inner batch remove", KR(ret), K(tenant_id), K(replicas), K(start_idx)); + } else { + start_idx = end_idx; + end_idx = min(start_idx + MAX_BATCH_COUNT, replicas.count()); + } + } + } + return ret; +} int ObTabletTableOperator::inner_batch_remove_by_sql_( const uint64_t tenant_id, diff --git a/src/share/tablet/ob_tablet_table_operator.h b/src/share/tablet/ob_tablet_table_operator.h index 120c907b1..4e5304760 100644 --- a/src/share/tablet/ob_tablet_table_operator.h +++ b/src/share/tablet/ob_tablet_table_operator.h @@ -102,6 +102,12 @@ public: int batch_update( const uint64_t tenant_id, const ObIArray &replicas); + // batch update replicas into __all_tablet_meta_table + // differ from above batch_update(), it will use @sql_client to commit, not inner sql_proxy_. + int batch_update( + common::ObISQLClient &sql_client, + const uint64_t tenant_id, + const ObIArray &replicas); // batch remove replicas from __all_tablet_meta_table // // @param [in] tenant_id, target tenant_id @@ -110,6 +116,12 @@ public: int batch_remove( const uint64_t tenant_id, const ObIArray &replicas); + // batch remove replicas from __all_tablet_meta_table + // differ from above batch_remove(), it will use @sql_client to commit, not inner sql_proxy_. + int batch_remove( + common::ObISQLClient &sql_client, + const uint64_t tenant_id, + const ObIArray &replicas); // remove residual tablet in __all_tablet_meta_table for ObServerMetaTableChecker // // @param [in] tenant_id, tenant for query diff --git a/src/storage/compaction/ob_tablet_merge_task.cpp b/src/storage/compaction/ob_tablet_merge_task.cpp index f95fbb9a0..7a95604f3 100644 --- a/src/storage/compaction/ob_tablet_merge_task.cpp +++ b/src/storage/compaction/ob_tablet_merge_task.cpp @@ -881,9 +881,7 @@ int ObTabletMergeFinishTask::process() } if (OB_SUCC(ret) && ctx.param_.is_major_merge() && NULL != ctx.param_.report_) { int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(ctx.param_.report_->submit_tablet_checksums_task(MTL_ID(), ctx.param_.ls_id_, tablet_id))) { - LOG_WARN("failed to submit tablet checksums task to report", K(tmp_ret), K(MTL_ID()), K(ctx.param_.ls_id_), K(tablet_id)); - } else if (OB_TMP_FAIL(ctx.param_.report_->submit_tablet_update_task(MTL_ID(), ctx.param_.ls_id_, tablet_id))) { + if (OB_TMP_FAIL(ctx.param_.report_->submit_tablet_update_task(MTL_ID(), ctx.param_.ls_id_, tablet_id))) { LOG_WARN("failed to submit tablet update task to report", K(tmp_ret), K(MTL_ID()), K(ctx.param_.ls_id_), K(tablet_id)); } else if (OB_TMP_FAIL(ctx.ls_handle_.get_ls()->get_tablet_svr()->update_tablet_report_status(tablet_id))) { LOG_WARN("failed to update tablet report status", K(tmp_ret), K(MTL_ID()), K(tablet_id)); diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp index 1411ce8d9..4a2d96744 100644 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp @@ -658,9 +658,7 @@ int ObTenantTabletScheduler::schedule_tablet_major_merge( } if (OB_SUCC(ret) && tablet_merge_finish && tablet.get_tablet_meta().report_status_.need_report()) { - if (OB_TMP_FAIL(GCTX.ob_service_->submit_tablet_checksums_task(MTL_ID(), ls_id, tablet_id))) { - LOG_WARN("failed to submit tablet checksums task to report", K(tmp_ret), K(MTL_ID()), K(tablet_id)); - } else if (OB_TMP_FAIL(GCTX.ob_service_->submit_tablet_update_task(MTL_ID(), ls_id, tablet_id))) { + if (OB_TMP_FAIL(GCTX.ob_service_->submit_tablet_update_task(MTL_ID(), ls_id, tablet_id))) { LOG_WARN("failed to submit tablet update task to report", K(tmp_ret), K(MTL_ID()), K(tablet_id)); } else if (OB_TMP_FAIL(ls.get_tablet_svr()->update_tablet_report_status(tablet_id))) { LOG_WARN("failed to update tablet report status", K(tmp_ret), K(MTL_ID()), K(tablet_id)); diff --git a/src/storage/ddl/ob_ddl_merge_task.cpp b/src/storage/ddl/ob_ddl_merge_task.cpp index 1de7924b8..9185beeed 100644 --- a/src/storage/ddl/ob_ddl_merge_task.cpp +++ b/src/storage/ddl/ob_ddl_merge_task.cpp @@ -381,8 +381,6 @@ int ObDDLTableMergeTask::process() merge_param_.ddl_task_id_, sstable->get_meta().get_col_checksum()))) { LOG_WARN("report ddl column checksum failed", K(ret), K(merge_param_)); - } else if (OB_FAIL(GCTX.ob_service_->submit_tablet_checksums_task(tenant_id, merge_param_.ls_id_, merge_param_.tablet_id_))) { - LOG_WARN("fail to submit tablet checksums task", K(ret), K(tenant_id), K(merge_param_)); } else if (OB_FAIL(GCTX.ob_service_->submit_tablet_update_task(tenant_id, merge_param_.ls_id_, merge_param_.tablet_id_))) { LOG_WARN("fail to submit tablet update task", K(ret), K(tenant_id), K(merge_param_)); } diff --git a/unittest/storage/mock_ob_meta_report.h b/unittest/storage/mock_ob_meta_report.h index 3127a214c..653017f63 100644 --- a/unittest/storage/mock_ob_meta_report.h +++ b/unittest/storage/mock_ob_meta_report.h @@ -26,9 +26,6 @@ public: MOCK_METHOD3(submit_tablet_update_task, int(const uint64_t tenant_id, const share::ObLSID &ls_id, const common::ObTabletID &tablet_id)); - MOCK_METHOD3(submit_tablet_checksums_task, int(const uint64_t tenant_id, - const share::ObLSID &ls_id, - const common::ObTabletID &tablet_id)); }; }