diff --git a/src/observer/omt/ob_multi_tenant.cpp b/src/observer/omt/ob_multi_tenant.cpp index 54d0d249f8..6da27ac56a 100644 --- a/src/observer/omt/ob_multi_tenant.cpp +++ b/src/observer/omt/ob_multi_tenant.cpp @@ -157,6 +157,7 @@ #include "observer/ob_server_event_history_table_operator.h" #include "storage/tenant_snapshot/ob_tenant_snapshot_service.h" #include "share/index_usage/ob_index_usage_info_mgr.h" +#include "rootserver/mview/ob_mview_maintenance_service.h" using namespace oceanbase; using namespace oceanbase::lib; @@ -569,6 +570,7 @@ int ObMultiTenant::init(ObAddr myaddr, MTL_BIND2(mtl_new_default, ObTenantSnapshotService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); MTL_BIND2(mtl_new_default, ObIndexUsageInfoMgr::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); MTL_BIND2(mtl_new_default, storage::ObTabletMemtableMgrPool::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default); + MTL_BIND2(mtl_new_default, rootserver::ObMViewMaintenanceService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); } if (OB_SUCC(ret)) { diff --git a/src/rootserver/CMakeLists.txt b/src/rootserver/CMakeLists.txt index c116b39104..106a7b2101 100644 --- a/src/rootserver/CMakeLists.txt +++ b/src/rootserver/CMakeLists.txt @@ -177,4 +177,11 @@ ob_set_subtarget(ob_rootserver tenant_snapshot tenant_snapshot/ob_tenant_snapshot_util.cpp ) +ob_set_subtarget(ob_rootserver mview + mview/ob_mview_maintenance_service.cpp + mview/ob_mlog_maintenance_task.cpp + mview/ob_mview_maintenance_task.cpp + mview/ob_mview_refresh_stats_maintenance_task.cpp +) + ob_server_add_target(ob_rootserver) diff --git a/src/rootserver/mview/ob_mlog_maintenance_task.cpp b/src/rootserver/mview/ob_mlog_maintenance_task.cpp new file mode 100644 index 0000000000..e0a099932d --- /dev/null +++ b/src/rootserver/mview/ob_mlog_maintenance_task.cpp @@ -0,0 +1,316 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX RS + +#include "rootserver/mview/ob_mlog_maintenance_task.h" +#include "observer/dbms_scheduler/ob_dbms_sched_job_utils.h" +#include "observer/ob_server_struct.h" +#include "observer/omt/ob_multi_tenant.h" +#include "share/ob_errno.h" +#include "share/schema/ob_mlog_info.h" + +namespace oceanbase +{ +namespace rootserver +{ +using namespace common; +using namespace share::schema; +using namespace dbms_scheduler; + +ObMLogMaintenanceTask::ObMLogMaintenanceTask() + : tenant_id_(OB_INVALID_TENANT_ID), + round_(0), + status_(StatusType::PREPARE), + error_code_(OB_SUCCESS), + last_fetch_mlog_id_(OB_INVALID_ID), + fetch_mlog_num_(0), + gc_mlog_num_(0), + start_time_(-1), + start_gc_mlog_time_(-1), + cost_us_(-1), + gc_mlog_cost_us_(-1), + fetch_finish_(false), + in_sched_(false), + is_stop_(true), + is_inited_(false) +{ +} + +ObMLogMaintenanceTask::~ObMLogMaintenanceTask() {} + +int ObMLogMaintenanceTask::init() +{ + int ret = OB_SUCCESS; + if (IS_INIT) { + ret = OB_INIT_TWICE; + LOG_WARN("ObMLogMaintenanceTask init twice", KR(ret), KP(this)); + } else { + const uint64_t tenant_id = MTL_ID(); + tenant_id_ = tenant_id; + mlog_ids_.set_attr(ObMemAttr(tenant_id, "MLogIds")); + is_inited_ = true; + } + return ret; +} + +int ObMLogMaintenanceTask::start() +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObMLogMaintenanceTask not init", KR(ret), KP(this)); + } else { + is_stop_ = false; + if (!in_sched_ && OB_FAIL(TG_SCHEDULE(MTL(omt::ObSharedTimer *)->get_tg_id(), *this, + MLOG_MAINTENANCE_SCHED_INTERVAL, false /*repeat*/))) { + LOG_WARN("fail to schedule mlog maintenance task", KR(ret)); + } else { + in_sched_ = true; + } + } + return ret; +} + +void ObMLogMaintenanceTask::stop() +{ + is_stop_ = true; + in_sched_ = false; + TG_CANCEL_TASK(MTL(omt::ObSharedTimer *)->get_tg_id(), *this); +} + +void ObMLogMaintenanceTask::wait() { TG_WAIT_TASK(MTL(omt::ObSharedTimer *)->get_tg_id(), *this); } + +void ObMLogMaintenanceTask::destroy() +{ + is_inited_ = false; + is_stop_ = true; + in_sched_ = false; + TG_CANCEL_TASK(MTL(omt::ObSharedTimer *)->get_tg_id(), *this); + TG_WAIT_TASK(MTL(omt::ObSharedTimer *)->get_tg_id(), *this); + cleanup(); + tenant_id_ = OB_INVALID_TENANT_ID; + mlog_ids_.destroy(); +} + +void ObMLogMaintenanceTask::runTimerTask() +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObMLogMaintenanceTask not init", KR(ret), KP(this)); + } else if (OB_UNLIKELY(is_stop_)) { + // do nothing + } else { + switch (status_) { + case StatusType::PREPARE: + if (OB_FAIL(prepare())) { + LOG_WARN("fail to prepare", KR(ret)); + } + break; + case StatusType::GC_MLOG: + if (OB_FAIL(gc_mlog())) { + LOG_WARN("fail to gc mlog", KR(ret)); + } + break; + case StatusType::SUCCESS: + case StatusType::FAIL: + if (OB_FAIL(finish())) { + LOG_WARN("fail to finish", KR(ret)); + } + break; + default: + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected status", KR(ret), K(status_)); + break; + } + } +} + +bool ObMLogMaintenanceTask::is_retry_ret_code(int ret_code) { return OB_EAGAIN == ret_code; } + +void ObMLogMaintenanceTask::switch_status(StatusType new_status, int ret_code) +{ + int ret = OB_SUCCESS; + if (OB_LIKELY(OB_SUCCESS == ret_code)) { + status_ = new_status; + } else if (is_retry_ret_code(ret_code)) { + // do nothing + } else { + status_ = StatusType::FAIL; + error_code_ = ret_code; + } + if (in_sched_ && OB_FAIL(TG_SCHEDULE(MTL(omt::ObSharedTimer *)->get_tg_id(), *this, + MLOG_MAINTENANCE_SCHED_INTERVAL, false /*repeat*/))) { + LOG_WARN("fail to schedule mlog maintenance task", KR(ret)); + } +} + +int ObMLogMaintenanceTask::prepare() +{ + int ret = OB_SUCCESS; + uint64_t compat_version = 0; + if (start_time_ == -1) { + start_time_ = ObTimeUtil::current_time(); + } + if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id_, compat_version))) { + LOG_WARN("fail to get data version", KR(ret), K_(tenant_id)); + } else if (OB_UNLIKELY(compat_version < DATA_VERSION_4_3_0_0)) { + ret = OB_EAGAIN; + LOG_WARN("version lower than 4.3, try again", KR(ret), K_(tenant_id), K(compat_version)); + } else { + ++round_; + prepare_cost_us_ = ObTimeUtil::current_time() - start_time_; + LOG_INFO("mlog maintenance task prepare success", K(tenant_id_), K(round_), + K(prepare_cost_us_)); + } + switch_status(StatusType::GC_MLOG, ret); + return ret; +} + +int ObMLogMaintenanceTask::gc_mlog() +{ + int ret = OB_SUCCESS; + StatusType new_status = StatusType::GC_MLOG; + if (start_gc_mlog_time_ == -1) { + start_gc_mlog_time_ = ObTimeUtil::current_time(); + } + if (mlog_ids_.empty()) { // fetch next batch + if (OB_FAIL(ObMLogInfo::batch_fetch_mlog_ids(*GCTX.sql_proxy_, tenant_id_, last_fetch_mlog_id_, + mlog_ids_, MLOG_NUM_FETCH_PER_SCHED))) { + LOG_WARN("fail to batch fetch mlog ids", KR(ret), K(tenant_id_), K(last_fetch_mlog_id_)); + } else { + fetch_mlog_num_ += mlog_ids_.count(); + fetch_finish_ = mlog_ids_.count() < MLOG_NUM_FETCH_PER_SCHED; + if (!mlog_ids_.empty()) { + last_fetch_mlog_id_ = mlog_ids_.at(mlog_ids_.count() - 1); + } + } + } else { // gc current batch + ObSchemaGetterGuard schema_guard; + int64_t tenant_schema_version = OB_INVALID_VERSION; + if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(tenant_id_, schema_guard))) { + LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id_)); + } else if (OB_FAIL(schema_guard.get_schema_version(tenant_id_, tenant_schema_version))) { + LOG_WARN("fail to get schema version", KR(ret), K(tenant_id_)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < mlog_ids_.count(); ++i) { + const uint64_t mlog_id = mlog_ids_.at(i); + ObMLogInfo mlog_info; + const ObTableSchema *table_schema = nullptr; + bool is_exist = false; + if (OB_FAIL(ObMLogInfo::fetch_mlog_info(*GCTX.sql_proxy_, tenant_id_, mlog_id, mlog_info))) { + LOG_WARN("fail to fetch mlog info", KR(ret), K(tenant_id_), K(mlog_id)); + } else if (mlog_info.get_schema_version() > tenant_schema_version) { + is_exist = true; // skip, wait next round + } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id_, mlog_id, table_schema))) { + LOG_WARN("fail to get table schema", KR(ret), K(tenant_id_), K(mlog_id)); + } else { + is_exist = (nullptr != table_schema); + } + if (OB_SUCC(ret) && !is_exist) { + LOG_INFO("gc one mlog", K_(tenant_id), K(mlog_id)); + if (OB_FAIL(drop_mlog(mlog_id))) { + LOG_WARN("fail to drop mlog", KR(ret), K(mlog_id)); + } else { + ++gc_mlog_num_; + } + } + } + mlog_ids_.reset(); + } + if (OB_SUCC(ret) && fetch_finish_ && mlog_ids_.empty()) { // goto next status + gc_mlog_cost_us_ = ObTimeUtility::current_time() - start_gc_mlog_time_; + LOG_INFO("mlog maintenance task gc mlog success", K(tenant_id_), K(round_), K(gc_mlog_cost_us_), + K(fetch_mlog_num_), K(gc_mlog_num_)); + new_status = StatusType::SUCCESS; + } + switch_status(new_status, ret); + return ret; +} + +int ObMLogMaintenanceTask::finish() +{ + int ret = OB_SUCCESS; + cost_us_ = ObTimeUtility::current_time() - start_time_; + LOG_INFO("mlog maintenace task finish", K(tenant_id_), K(round_), K(status_), K(error_code_), + K(cost_us_), K(prepare_cost_us_), K(gc_mlog_cost_us_), K(fetch_mlog_num_), + K(gc_mlog_num_)); + // cleanup + cleanup(); + // schedule next round + if (in_sched_ && OB_FAIL(TG_SCHEDULE(MTL(omt::ObSharedTimer *)->get_tg_id(), *this, + MLOG_MAINTENANCE_INTERVAL, false /*repeat*/))) { + LOG_WARN("fail to schedule mlog maintenance task", KR(ret)); + } + return ret; +} + +void ObMLogMaintenanceTask::cleanup() +{ + status_ = StatusType::PREPARE; + error_code_ = OB_SUCCESS; + last_fetch_mlog_id_ = OB_INVALID_ID; + mlog_ids_.reset(); + fetch_mlog_num_ = 0; + gc_mlog_num_ = 0; + start_time_ = -1; + start_gc_mlog_time_ = -1; + cost_us_ = -1; + prepare_cost_us_ = -1; + gc_mlog_cost_us_ = -1; + fetch_finish_ = false; +} + +int ObMLogMaintenanceTask::drop_mlog(uint64_t mlog_id) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(OB_INVALID_ID == mlog_id)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), K(mlog_id)); + } else { + ObMySQLTransaction trans; + ObMLogInfo mlog_info; + if (OB_FAIL(trans.start(GCTX.sql_proxy_, tenant_id_))) { + LOG_WARN("fail to start trans", KR(ret), K(tenant_id_)); + } else if (OB_FAIL(ObMLogInfo::fetch_mlog_info(trans, tenant_id_, mlog_id, mlog_info, + true /*for_update*/, true /*nowait*/))) { + if (OB_LIKELY(OB_ERR_EXCLUSIVE_LOCK_CONFLICT_NOWAIT == ret)) { + LOG_WARN("can not lock mlog info", KR(ret), K_(tenant_id), K(mlog_id)); + ret = OB_SUCCESS; // skip, wait next round + } else if (OB_LIKELY(OB_ENTRY_NOT_EXIST == ret)) { + LOG_WARN("mlog info not exist", KR(ret), K_(tenant_id), K(mlog_id)); + ret = OB_SUCCESS; + } else { + LOG_WARN("fail to fetch mlog info", KR(ret), K(tenant_id_), K(mlog_id)); + } + } else if (!mlog_info.get_purge_job().empty() && + OB_FAIL(ObDBMSSchedJobUtils::remove_dbms_sched_job( + trans, tenant_id_, mlog_info.get_purge_job(), true /*if_exists*/))) { + LOG_WARN("fail to remove dbms sched job", KR(ret), K(tenant_id_), "job_name", + mlog_info.get_purge_job()); + } else if (OB_FAIL(ObMLogInfo::drop_mlog_info(trans, mlog_info))) { + LOG_WARN("fail to drop mlog info", KR(ret), K(mlog_info)); + } + if (trans.is_started()) { + int tmp_ret = OB_SUCCESS; + if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) { + LOG_WARN("failed to commit trans", KR(ret), KR(tmp_ret)); + ret = COVER_SUCC(tmp_ret); + } + } + } + return ret; +} + +} // namespace rootserver +} // namespace oceanbase diff --git a/src/rootserver/mview/ob_mlog_maintenance_task.h b/src/rootserver/mview/ob_mlog_maintenance_task.h new file mode 100644 index 0000000000..41b43fe5a3 --- /dev/null +++ b/src/rootserver/mview/ob_mlog_maintenance_task.h @@ -0,0 +1,84 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#pragma once + +#include "lib/container/ob_array.h" +#include "lib/task/ob_timer.h" + +namespace oceanbase +{ +namespace rootserver +{ +class ObMLogMaintenanceTask : public common::ObTimerTask +{ +public: + ObMLogMaintenanceTask(); + virtual ~ObMLogMaintenanceTask(); + DISABLE_COPY_ASSIGN(ObMLogMaintenanceTask); + + // for Service + int init(); + int start(); + void stop(); + void wait(); + void destroy(); + + // for TimerTask + void runTimerTask() override; + +private: + static const int64_t MLOG_MAINTENANCE_INTERVAL = 24LL * 3600 * 1000 * 1000; // 1day + static const int64_t MLOG_MAINTENANCE_SCHED_INTERVAL = 10LL * 1000 * 1000; // 10s + static const int64_t MLOG_NUM_FETCH_PER_SCHED = 1000; + + enum class StatusType + { + PREPARE = 0, + GC_MLOG = 1, + SUCCESS = 100, + FAIL = 101, + }; + +private: + static bool is_retry_ret_code(int ret_code); + void switch_status(StatusType new_status, int ret_code); + + int prepare(); + int gc_mlog(); + int finish(); + + void cleanup(); + int drop_mlog(uint64_t mlog_id); + +private: + uint64_t tenant_id_; + int64_t round_; + StatusType status_; + int error_code_; + uint64_t last_fetch_mlog_id_; + ObArray mlog_ids_; + int64_t fetch_mlog_num_; + int64_t gc_mlog_num_; + int64_t start_time_; + int64_t start_gc_mlog_time_; + int64_t cost_us_; + int64_t prepare_cost_us_; + int64_t gc_mlog_cost_us_; + bool fetch_finish_; + bool in_sched_; + bool is_stop_; + bool is_inited_; +}; + +} // namespace rootserver +} // namespace oceanbase diff --git a/src/rootserver/mview/ob_mview_maintenance_service.cpp b/src/rootserver/mview/ob_mview_maintenance_service.cpp new file mode 100644 index 0000000000..7721ff149e --- /dev/null +++ b/src/rootserver/mview/ob_mview_maintenance_service.cpp @@ -0,0 +1,174 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX RS + +#include "rootserver/mview/ob_mview_maintenance_service.h" +#include "observer/omt/ob_multi_tenant.h" +#include "share/ob_errno.h" +#include "share/rc/ob_tenant_base.h" + +namespace oceanbase +{ +namespace rootserver +{ +using namespace common; + +/** + * ObMViewMaintenanceService + */ + +ObMViewMaintenanceService::ObMViewMaintenanceService() : is_inited_(false) {} + +ObMViewMaintenanceService::~ObMViewMaintenanceService() {} + +int ObMViewMaintenanceService::mtl_init(ObMViewMaintenanceService *&service) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(service)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), KP(service)); + } else if (OB_FAIL(service->init())) { + LOG_WARN("fail to init mview maintenance service", KR(ret)); + } + return ret; +} + +int ObMViewMaintenanceService::init() +{ + int ret = OB_SUCCESS; + if (IS_INIT) { + ret = OB_INIT_TWICE; + LOG_WARN("ObMViewMaintenanceService init twice", KR(ret), KP(this)); + } else { + if (OB_FAIL(mlog_maintenance_task_.init())) { + LOG_WARN("fail to init mlog maintenance task", KR(ret)); + } else if (OB_FAIL(mview_maintenance_task_.init())) { + LOG_WARN("fail to init mview maintenance task", KR(ret)); + } else if (OB_FAIL(mvref_stats_maintenance_task_.init())) { + LOG_WARN("fail to init mvref stats maintenance task", KR(ret)); + } else { + is_inited_ = true; + } + } + return ret; +} + +int ObMViewMaintenanceService::start() +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObMViewMaintenanceService not init", KR(ret), KP(this)); + } else { + // do nothing + } + return ret; +} + +void ObMViewMaintenanceService::stop() +{ + mlog_maintenance_task_.stop(); + mview_maintenance_task_.stop(); + mvref_stats_maintenance_task_.stop(); +} + +void ObMViewMaintenanceService::wait() +{ + mlog_maintenance_task_.wait(); + mview_maintenance_task_.wait(); + mvref_stats_maintenance_task_.wait(); +} + +void ObMViewMaintenanceService::destroy() +{ + is_inited_ = false; + mlog_maintenance_task_.destroy(); + mview_maintenance_task_.destroy(); + mvref_stats_maintenance_task_.destroy(); +} + +int ObMViewMaintenanceService::inner_switch_to_leader() +{ + int ret = OB_SUCCESS; + const uint64_t tenant_id = MTL_ID(); + int64_t start_time_us = ObTimeUtility::current_time(); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObMViewMaintenanceService not init", KR(ret), KP(this)); + } else { + if (OB_FAIL(mlog_maintenance_task_.start())) { + LOG_WARN("fail to start mlog maintenance task", KR(ret)); + } else if (OB_FAIL(mview_maintenance_task_.start())) { + LOG_WARN("fail to start mview maintenance task", KR(ret)); + } else if (OB_FAIL(mvref_stats_maintenance_task_.start())) { + LOG_WARN("fail to start mvref stats maintenance task", KR(ret)); + } + } + const int64_t cost_us = ObTimeUtility::current_time() - start_time_us; + FLOG_INFO("mview_maintenance: switch_to_leader", KR(ret), K(tenant_id), K(cost_us)); + return ret; +} + +int ObMViewMaintenanceService::inner_switch_to_follower() +{ + int ret = OB_SUCCESS; + const uint64_t tenant_id = MTL_ID(); + int64_t start_time_us = ObTimeUtility::current_time(); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObMViewMaintenanceService not init", KR(ret), KP(this)); + } else { + mvref_stats_maintenance_task_.stop(); + } + const int64_t cost_us = ObTimeUtility::current_time() - start_time_us; + FLOG_INFO("mview_maintenance: switch_to_follower", KR(ret), K(tenant_id), K(cost_us)); + return ret; +} + +void ObMViewMaintenanceService::switch_to_follower_forcedly() +{ + int ret = OB_SUCCESS; + if (OB_FAIL(inner_switch_to_follower())) { + LOG_WARN("failed to switch leader", KR(ret)); + } +} + +int ObMViewMaintenanceService::switch_to_leader() +{ + int ret = OB_SUCCESS; + if (OB_FAIL(inner_switch_to_leader())) { + LOG_WARN("failed to switch leader", KR(ret)); + } + return ret; +} + +int ObMViewMaintenanceService::switch_to_follower_gracefully() +{ + int ret = OB_SUCCESS; + if (OB_FAIL(inner_switch_to_follower())) { + LOG_WARN("failed to switch leader", KR(ret)); + } + return ret; +} + +int ObMViewMaintenanceService::resume_leader() +{ + int ret = OB_SUCCESS; + if (OB_FAIL(inner_switch_to_leader())) { + LOG_WARN("failed to switch leader", KR(ret)); + } + return ret; +} + +} // namespace rootserver +} // namespace oceanbase diff --git a/src/rootserver/mview/ob_mview_maintenance_service.h b/src/rootserver/mview/ob_mview_maintenance_service.h new file mode 100644 index 0000000000..c5a1ec864a --- /dev/null +++ b/src/rootserver/mview/ob_mview_maintenance_service.h @@ -0,0 +1,79 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#pragma once + +#include "logservice/ob_log_base_type.h" +#include "rootserver/mview/ob_mlog_maintenance_task.h" +#include "rootserver/mview/ob_mview_maintenance_task.h" +#include "rootserver/mview/ob_mview_refresh_stats_maintenance_task.h" +#include "share/scn.h" + +namespace oceanbase +{ +namespace rootserver +{ +class ObMViewMaintenanceService : public logservice::ObIReplaySubHandler, + public logservice::ObICheckpointSubHandler, + public logservice::ObIRoleChangeSubHandler +{ +public: + ObMViewMaintenanceService(); + virtual ~ObMViewMaintenanceService(); + DISABLE_COPY_ASSIGN(ObMViewMaintenanceService); + + // for MTL + static int mtl_init(ObMViewMaintenanceService *&service); + int init(); + int start(); + void stop(); + void wait(); + void destroy(); + + // for replay, do nothing + int replay(const void *buffer, const int64_t nbytes, const palf::LSN &lsn, + const share::SCN &scn) override final + { + UNUSED(buffer); + UNUSED(nbytes); + UNUSED(lsn); + UNUSED(scn); + return OB_SUCCESS; + } + + // for checkpoint, do nothing + share::SCN get_rec_scn() override final { return share::SCN::max_scn(); } + int flush(share::SCN &rec_scn) override final + { + UNUSED(rec_scn); + return OB_SUCCESS; + } + + // for role change + void switch_to_follower_forcedly() override final; + int switch_to_leader() override final; + int switch_to_follower_gracefully() override final; + int resume_leader() override final; + +private: + int inner_switch_to_leader(); + int inner_switch_to_follower(); + +private: + ObMLogMaintenanceTask mlog_maintenance_task_; + ObMViewMaintenanceTask mview_maintenance_task_; + ObMViewRefreshStatsMaintenanceTask mvref_stats_maintenance_task_; + bool is_inited_; +}; + +} // namespace rootserver +} // namespace oceanbase diff --git a/src/rootserver/mview/ob_mview_maintenance_task.cpp b/src/rootserver/mview/ob_mview_maintenance_task.cpp new file mode 100644 index 0000000000..9c7d93e075 --- /dev/null +++ b/src/rootserver/mview/ob_mview_maintenance_task.cpp @@ -0,0 +1,358 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX RS + +#include "rootserver/mview/ob_mview_maintenance_task.h" +#include "observer/dbms_scheduler/ob_dbms_sched_job_utils.h" +#include "observer/ob_server_struct.h" +#include "observer/omt/ob_multi_tenant.h" +#include "share/ob_errno.h" +#include "share/schema/ob_mview_info.h" +#include "share/schema/ob_mview_refresh_stats_params.h" +#include "storage/mview/ob_mview_refresh_stats_purge.h" + +namespace oceanbase +{ +namespace rootserver +{ +using namespace common; +using namespace share::schema; +using namespace dbms_scheduler; + +ObMViewMaintenanceTask::ObMViewMaintenanceTask() + : tenant_id_(OB_INVALID_TENANT_ID), + round_(0), + status_(StatusType::PREPARE), + error_code_(OB_SUCCESS), + last_fetch_mview_id_(OB_INVALID_ID), + mview_idx_(0), + gc_mview_id_(OB_INVALID_ID), + fetch_mview_num_(0), + gc_mview_num_(0), + gc_stats_num_(0), + start_time_(-1), + start_gc_mview_time_(-1), + cost_us_(-1), + gc_mview_cost_us_(-1), + fetch_finish_(false), + in_sched_(false), + is_stop_(true), + is_inited_(false) +{ +} + +ObMViewMaintenanceTask::~ObMViewMaintenanceTask() {} + +int ObMViewMaintenanceTask::init() +{ + int ret = OB_SUCCESS; + if (IS_INIT) { + ret = OB_INIT_TWICE; + LOG_WARN("ObMViewMaintenanceTask init twice", KR(ret), KP(this)); + } else { + const uint64_t tenant_id = MTL_ID(); + tenant_id_ = tenant_id; + mview_ids_.set_attr(ObMemAttr(tenant_id, "MVIds")); + is_inited_ = true; + } + return ret; +} + +int ObMViewMaintenanceTask::start() +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObMViewMaintenanceTask not init", KR(ret), KP(this)); + } else { + is_stop_ = false; + if (!in_sched_ && OB_FAIL(TG_SCHEDULE(MTL(omt::ObSharedTimer *)->get_tg_id(), *this, + MVIEW_MAINTENANCE_SCHED_INTERVAL, false /*repeat*/))) { + LOG_WARN("fail to schedule mview maintenance task", KR(ret)); + } else { + in_sched_ = true; + } + } + return ret; +} + +void ObMViewMaintenanceTask::stop() +{ + is_stop_ = true; + in_sched_ = false; + TG_CANCEL_TASK(MTL(omt::ObSharedTimer *)->get_tg_id(), *this); +} + +void ObMViewMaintenanceTask::wait() { TG_WAIT_TASK(MTL(omt::ObSharedTimer *)->get_tg_id(), *this); } + +void ObMViewMaintenanceTask::destroy() +{ + is_inited_ = false; + is_stop_ = true; + in_sched_ = false; + TG_CANCEL_TASK(MTL(omt::ObSharedTimer *)->get_tg_id(), *this); + TG_WAIT_TASK(MTL(omt::ObSharedTimer *)->get_tg_id(), *this); + cleanup(); + tenant_id_ = OB_INVALID_TENANT_ID; + mview_ids_.destroy(); +} + +void ObMViewMaintenanceTask::runTimerTask() +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObMViewMaintenanceTask not init", KR(ret), KP(this)); + } else if (OB_UNLIKELY(is_stop_)) { + // do nothing + } else { + switch (status_) { + case StatusType::PREPARE: + if (OB_FAIL(prepare())) { + LOG_WARN("fail to prepare", KR(ret)); + } + break; + case StatusType::GC_MVIEW: + if (OB_FAIL(gc_mview())) { + LOG_WARN("fail to gc mview", KR(ret)); + } + break; + case StatusType::SUCCESS: + case StatusType::FAIL: + if (OB_FAIL(finish())) { + LOG_WARN("fail to finish", KR(ret)); + } + break; + default: + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected status", KR(ret), K(status_)); + break; + } + } +} + +bool ObMViewMaintenanceTask::is_retry_ret_code(int ret_code) { return OB_EAGAIN == ret_code; } + +void ObMViewMaintenanceTask::switch_status(StatusType new_status, int ret_code) +{ + int ret = OB_SUCCESS; + if (OB_LIKELY(OB_SUCCESS == ret_code)) { + status_ = new_status; + } else if (is_retry_ret_code(ret_code)) { + // do nothing + } else { + status_ = StatusType::FAIL; + error_code_ = ret_code; + } + if (in_sched_ && OB_FAIL(TG_SCHEDULE(MTL(omt::ObSharedTimer *)->get_tg_id(), *this, + MVIEW_MAINTENANCE_SCHED_INTERVAL, false /*repeat*/))) { + LOG_WARN("fail to schedule mview maintenance task", KR(ret)); + } +} + +int ObMViewMaintenanceTask::prepare() +{ + int ret = OB_SUCCESS; + uint64_t compat_version = 0; + if (start_time_ == -1) { + start_time_ = ObTimeUtil::current_time(); + } + if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id_, compat_version))) { + LOG_WARN("fail to get data version", KR(ret), K_(tenant_id)); + } else if (OB_UNLIKELY(compat_version < DATA_VERSION_4_3_0_0)) { + ret = OB_EAGAIN; + LOG_WARN("version lower than 4.3, try again", KR(ret), K_(tenant_id), K(compat_version)); + } else { + ++round_; + prepare_cost_us_ = ObTimeUtil::current_time() - start_time_; + LOG_INFO("mview maintenance task prepare success", K(tenant_id_), K(round_), + K(prepare_cost_us_)); + } + switch_status(StatusType::GC_MVIEW, ret); + return ret; +} + +int ObMViewMaintenanceTask::gc_mview() +{ + int ret = OB_SUCCESS; + StatusType new_status = StatusType::GC_MVIEW; + if (start_gc_mview_time_ == -1) { + start_gc_mview_time_ = ObTimeUtil::current_time(); + } + if (mview_idx_ >= mview_ids_.count() && OB_INVALID_ID == gc_mview_id_) { // fetch next batch + mview_ids_.reset(); + mview_idx_ = 0; + if (OB_FAIL(ObMViewInfo::batch_fetch_mview_ids(*GCTX.sql_proxy_, tenant_id_, + last_fetch_mview_id_, mview_ids_, + MVIEW_NUM_FETCH_PER_SCHED))) { + LOG_WARN("fail to batch fetch mview ids", KR(ret), K(tenant_id_), K(last_fetch_mview_id_)); + } else { + fetch_mview_num_ += mview_ids_.count(); + fetch_finish_ = mview_ids_.count() < MVIEW_NUM_FETCH_PER_SCHED; + if (!mview_ids_.empty()) { + last_fetch_mview_id_ = mview_ids_.at(mview_ids_.count() - 1); + } + } + } else { // gc current batch + ObSchemaGetterGuard schema_guard; + int64_t tenant_schema_version = OB_INVALID_VERSION; + int64_t gc_mview_num = 0; + int64_t gc_stats_num = 0; + int64_t affected_rows = 0; + if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(tenant_id_, schema_guard))) { + LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id_)); + } else if (OB_FAIL(schema_guard.get_schema_version(tenant_id_, tenant_schema_version))) { + LOG_WARN("fail to get schema version", KR(ret), K(tenant_id_)); + } + while (OB_SUCC(ret) && (mview_idx_ < mview_ids_.count() || OB_INVALID_ID != gc_mview_id_) && + gc_stats_num < MVREF_STATS_NUM_PURGE_PER_SCHED) { + const int64_t limit = MVREF_STATS_NUM_PURGE_PER_SCHED - gc_stats_num; + if (OB_INVALID_ID == gc_mview_id_) { // check next mview id + const uint64_t mview_id = mview_ids_.at(mview_idx_); + ObMViewInfo mview_info; + const ObTableSchema *table_schema = nullptr; + bool is_exist = false; + if (OB_FAIL( + ObMViewInfo::fetch_mview_info(*GCTX.sql_proxy_, tenant_id_, mview_id, mview_info))) { + LOG_WARN("fail to fetch mview info", KR(ret), K(tenant_id_), K(mview_id)); + } else if (mview_info.get_schema_version() > tenant_schema_version) { + is_exist = true; // skip, wait next round + } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id_, mview_id, table_schema))) { + LOG_WARN("fail to get table schema", KR(ret), K(tenant_id_), K(mview_id)); + } else { + is_exist = (nullptr != table_schema); + } + if (OB_SUCC(ret)) { + ++mview_idx_; + if (!is_exist) { + gc_mview_id_ = mview_id; + LOG_INFO("gc one mview", K_(tenant_id), K(mview_id)); + } + } + } else { + const uint64_t mview_id = gc_mview_id_; + ObMViewRefreshStats::FilterParam filter_param; + filter_param.set_mview_id(mview_id); + if (OB_FAIL(ObMViewRefreshStatsPurgeUtil::purge_refresh_stats( + *GCTX.sql_proxy_, tenant_id_, filter_param, affected_rows, limit))) { + LOG_WARN("fail to purge refresh stats", KR(ret), K(tenant_id_), K(filter_param), + K(limit)); + } else { + gc_stats_num += affected_rows; + } + if (OB_SUCC(ret) && affected_rows < limit) { + if (OB_FAIL(drop_mview(mview_id))) { + LOG_WARN("fail to drop mview", KR(ret), K(mview_id)); + } else { + gc_mview_id_ = OB_INVALID_ID; + ++gc_mview_num; + } + } + } + } + gc_mview_num_ += gc_mview_num; + gc_stats_num_ += gc_stats_num; + } + if (OB_SUCC(ret) && fetch_finish_ && mview_idx_ >= mview_ids_.count() && + OB_INVALID_ID == gc_mview_id_) { // goto next status + gc_mview_cost_us_ = ObTimeUtility::current_time() - start_gc_mview_time_; + LOG_INFO("mview maintenance task gc mview success", K(tenant_id_), K(round_), + K(gc_mview_cost_us_), K(fetch_mview_num_), K(gc_mview_num_), K(gc_stats_num_)); + new_status = StatusType::SUCCESS; + } + switch_status(new_status, ret); + return ret; +} + +int ObMViewMaintenanceTask::finish() +{ + int ret = OB_SUCCESS; + cost_us_ = ObTimeUtility::current_time() - start_time_; + LOG_INFO("mview maintenace task finish", K(tenant_id_), K(round_), K(status_), K(error_code_), + K(cost_us_), K(prepare_cost_us_), K(gc_mview_cost_us_), K(fetch_mview_num_), + K(gc_mview_num_), K(gc_stats_num_)); + // cleanup + cleanup(); + // schedule next round + if (in_sched_ && OB_FAIL(TG_SCHEDULE(MTL(omt::ObSharedTimer *)->get_tg_id(), *this, + MVIEW_MAINTENANCE_INTERVAL, false /*repeat*/))) { + LOG_WARN("fail to schedule mview maintenance task", KR(ret)); + } + return ret; +} + +void ObMViewMaintenanceTask::cleanup() +{ + status_ = StatusType::PREPARE; + error_code_ = OB_SUCCESS; + last_fetch_mview_id_ = OB_INVALID_ID; + mview_ids_.reset(); + mview_idx_ = 0; + gc_mview_id_ = OB_INVALID_ID; + fetch_mview_num_ = 0; + gc_mview_num_ = 0; + gc_stats_num_ = 0; + start_time_ = -1; + start_gc_mview_time_ = -1; + cost_us_ = -1; + prepare_cost_us_ = -1; + gc_mview_cost_us_ = -1; + fetch_finish_ = false; +} + +int ObMViewMaintenanceTask::drop_mview(uint64_t mview_id) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(OB_INVALID_ID == mview_id)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), K(mview_id)); + } else { + ObMySQLTransaction trans; + ObMViewInfo mview_info; + if (OB_FAIL(trans.start(GCTX.sql_proxy_, tenant_id_))) { + LOG_WARN("fail to start trans", KR(ret), K(tenant_id_)); + } else if (OB_FAIL(ObMViewInfo::fetch_mview_info(trans, tenant_id_, mview_id, mview_info, + true /*for_update*/, true /*nowait*/))) { + if (OB_LIKELY(OB_ERR_EXCLUSIVE_LOCK_CONFLICT_NOWAIT == ret)) { + LOG_WARN("can not lock mview info", KR(ret), K_(tenant_id), K(mview_id)); + ret = OB_SUCCESS; // skip, wait next round + } else if (OB_LIKELY(OB_ENTRY_NOT_EXIST == ret)) { + LOG_WARN("mview info not exist", KR(ret), K_(tenant_id), K(mview_id)); + ret = OB_SUCCESS; + } else { + LOG_WARN("fail to fetch mview info", KR(ret), K(tenant_id_), K(mview_id)); + } + } else if (OB_FAIL(ObMViewRefreshStatsParams::drop_mview_refresh_stats_params( + trans, tenant_id_, mview_id, true /*if_exists*/))) { + LOG_WARN("fail to drop mview refresh stats params", KR(ret), K(tenant_id_), K(mview_id)); + } else if (!mview_info.get_refresh_job().empty() && + OB_FAIL(ObDBMSSchedJobUtils::remove_dbms_sched_job( + trans, tenant_id_, mview_info.get_refresh_job(), true /*if_exists*/))) { + LOG_WARN("fail to remove dbms sched job", KR(ret), K(tenant_id_), "job_name", + mview_info.get_refresh_job()); + } else if (OB_FAIL(ObMViewInfo::drop_mview_info(trans, mview_info))) { + LOG_WARN("fail to drop mview info", KR(ret), K(mview_info)); + } + if (trans.is_started()) { + int tmp_ret = OB_SUCCESS; + if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) { + LOG_WARN("failed to commit trans", KR(ret), KR(tmp_ret)); + ret = COVER_SUCC(tmp_ret); + } + } + } + return ret; +} + +} // namespace rootserver +} // namespace oceanbase diff --git a/src/rootserver/mview/ob_mview_maintenance_task.h b/src/rootserver/mview/ob_mview_maintenance_task.h new file mode 100644 index 0000000000..46e6675325 --- /dev/null +++ b/src/rootserver/mview/ob_mview_maintenance_task.h @@ -0,0 +1,88 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#pragma once + +#include "lib/container/ob_array.h" +#include "lib/task/ob_timer.h" + +namespace oceanbase +{ +namespace rootserver +{ +class ObMViewMaintenanceTask : public common::ObTimerTask +{ +public: + ObMViewMaintenanceTask(); + virtual ~ObMViewMaintenanceTask(); + DISABLE_COPY_ASSIGN(ObMViewMaintenanceTask); + + // for Service + int init(); + int start(); + void stop(); + void wait(); + void destroy(); + + // for TimerTask + void runTimerTask() override; + +private: + static const int64_t MVIEW_MAINTENANCE_INTERVAL = 24LL * 3600 * 1000 * 1000; // 1day + static const int64_t MVIEW_MAINTENANCE_SCHED_INTERVAL = 10LL * 1000 * 1000; // 10s + static const int64_t MVIEW_NUM_FETCH_PER_SCHED = 1000; + static const int64_t MVREF_STATS_NUM_PURGE_PER_SCHED = 1000; + + enum class StatusType + { + PREPARE = 0, + GC_MVIEW = 1, + SUCCESS = 100, + FAIL = 101, + }; + +private: + static bool is_retry_ret_code(int ret_code); + void switch_status(StatusType new_status, int ret_code); + + int prepare(); + int gc_mview(); + int finish(); + + void cleanup(); + int drop_mview(uint64_t mview_id); + +private: + uint64_t tenant_id_; + int64_t round_; + StatusType status_; + int error_code_; + uint64_t last_fetch_mview_id_; + ObArray mview_ids_; + int64_t mview_idx_; + uint64_t gc_mview_id_; + int64_t fetch_mview_num_; + int64_t gc_mview_num_; + int64_t gc_stats_num_; + int64_t start_time_; + int64_t start_gc_mview_time_; + int64_t cost_us_; + int64_t prepare_cost_us_; + int64_t gc_mview_cost_us_; + bool fetch_finish_; + bool in_sched_; + bool is_stop_; + bool is_inited_; +}; + +} // namespace rootserver +} // namespace oceanbase diff --git a/src/rootserver/mview/ob_mview_refresh_stats_maintenance_task.cpp b/src/rootserver/mview/ob_mview_refresh_stats_maintenance_task.cpp new file mode 100644 index 0000000000..6314a66364 --- /dev/null +++ b/src/rootserver/mview/ob_mview_refresh_stats_maintenance_task.cpp @@ -0,0 +1,299 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX RS + +#include "rootserver/mview/ob_mview_refresh_stats_maintenance_task.h" +#include "observer/ob_server_struct.h" +#include "observer/omt/ob_multi_tenant.h" +#include "share/ob_errno.h" +#include "share/schema/ob_mview_info.h" +#include "share/schema/ob_mview_refresh_stats.h" +#include "share/schema/ob_mview_refresh_stats_params.h" +#include "storage/mview/ob_mview_refresh_stats_purge.h" + +namespace oceanbase +{ +namespace rootserver +{ +using namespace common; +using namespace share::schema; + +/** + * ObMViewRefreshStatsMaintenanceTask + */ + +ObMViewRefreshStatsMaintenanceTask::ObMViewRefreshStatsMaintenanceTask() + : tenant_id_(OB_INVALID_TENANT_ID), + round_(0), + status_(StatusType::PREPARE), + error_code_(OB_SUCCESS), + last_fetch_mview_id_(OB_INVALID_ID), + mview_idx_(0), + fetch_mview_num_(0), + purge_mview_num_(0), + purge_stats_num_(0), + start_time_(-1), + start_purge_time_(-1), + cost_us_(-1), + prepare_cost_us_(-1), + purge_cost_us_(-1), + fetch_finish_(false), + in_sched_(false), + is_stop_(true), + is_inited_(false) +{ +} + +ObMViewRefreshStatsMaintenanceTask::~ObMViewRefreshStatsMaintenanceTask() {} + +int ObMViewRefreshStatsMaintenanceTask::init() +{ + int ret = OB_SUCCESS; + if (IS_INIT) { + ret = OB_INIT_TWICE; + LOG_WARN("ObMViewRefreshStatsMaintenanceTask init twice", KR(ret), KP(this)); + } else { + const uint64_t tenant_id = MTL_ID(); + tenant_id_ = tenant_id; + mview_ids_.set_attr(ObMemAttr(tenant_id, "MVIds")); + is_inited_ = true; + } + return ret; +} + +int ObMViewRefreshStatsMaintenanceTask::start() +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObMViewRefreshStatsMaintenanceTask not init", KR(ret), KP(this)); + } else { + is_stop_ = false; + if (!in_sched_ && + OB_FAIL(TG_SCHEDULE(MTL(omt::ObSharedTimer *)->get_tg_id(), *this, + MVREF_STATS_MAINTENANCE_SCHED_INTERVAL, false /*repeat*/))) { + LOG_WARN("fail to schedule mvref stats maintenance task", KR(ret)); + } else { + in_sched_ = true; + } + } + return ret; +} + +void ObMViewRefreshStatsMaintenanceTask::stop() +{ + is_stop_ = true; + in_sched_ = false; + TG_CANCEL_TASK(MTL(omt::ObSharedTimer *)->get_tg_id(), *this); +} + +void ObMViewRefreshStatsMaintenanceTask::wait() +{ + TG_WAIT_TASK(MTL(omt::ObSharedTimer *)->get_tg_id(), *this); +} + +void ObMViewRefreshStatsMaintenanceTask::destroy() +{ + is_inited_ = false; + is_stop_ = true; + in_sched_ = false; + TG_CANCEL_TASK(MTL(omt::ObSharedTimer *)->get_tg_id(), *this); + TG_WAIT_TASK(MTL(omt::ObSharedTimer *)->get_tg_id(), *this); + cleanup(); + tenant_id_ = OB_INVALID_TENANT_ID; + mview_ids_.destroy(); +} + +void ObMViewRefreshStatsMaintenanceTask::runTimerTask() +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObMViewRefreshStatsMaintenanceTask not init", KR(ret), KP(this)); + } else if (OB_UNLIKELY(is_stop_)) { + // do nothing + } else { + switch (status_) { + case StatusType::PREPARE: + if (OB_FAIL(prepare())) { + LOG_WARN("fail to prepare", KR(ret)); + } + break; + case StatusType::PURGE: + if (OB_FAIL(purge())) { + LOG_WARN("fail to purge", KR(ret)); + } + break; + case StatusType::SUCCESS: + case StatusType::FAIL: + if (OB_FAIL(finish())) { + LOG_WARN("fail to finish", KR(ret)); + } + break; + default: + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected status", KR(ret), K(status_)); + break; + } + } +} + +bool ObMViewRefreshStatsMaintenanceTask::is_retry_ret_code(int ret_code) +{ + return OB_EAGAIN == ret_code; +} + +void ObMViewRefreshStatsMaintenanceTask::switch_status(StatusType new_status, int ret_code) +{ + int ret = OB_SUCCESS; + if (OB_LIKELY(OB_SUCCESS == ret_code)) { + status_ = new_status; + } else if (is_retry_ret_code(ret_code)) { + // do nothing + } else { + status_ = StatusType::FAIL; + error_code_ = ret_code; + } + if (in_sched_ && OB_FAIL(TG_SCHEDULE(MTL(omt::ObSharedTimer *)->get_tg_id(), *this, + MVREF_STATS_MAINTENANCE_SCHED_INTERVAL, false /*repeat*/))) { + LOG_WARN("fail to schedule mvref stats maintenance task", KR(ret)); + } +} + +int ObMViewRefreshStatsMaintenanceTask::prepare() +{ + int ret = OB_SUCCESS; + uint64_t compat_version = 0; + if (start_time_ == -1) { + start_time_ = ObTimeUtil::current_time(); + } + if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id_, compat_version))) { + LOG_WARN("fail to get data version", KR(ret), K_(tenant_id)); + } else if (OB_UNLIKELY(compat_version < DATA_VERSION_4_3_0_0)) { + ret = OB_EAGAIN; + LOG_WARN("version lower than 4.3, try again", KR(ret), K_(tenant_id), K(compat_version)); + } else { + ++round_; + prepare_cost_us_ = ObTimeUtil::current_time() - start_time_; + LOG_INFO("mvref stats maintenance task prepare success", K(tenant_id_), K(round_), + K(prepare_cost_us_)); + } + switch_status(StatusType::PURGE, ret); + return ret; +} + +int ObMViewRefreshStatsMaintenanceTask::purge() +{ + int ret = OB_SUCCESS; + StatusType new_status = StatusType::PURGE; + if (start_purge_time_ == -1) { + start_purge_time_ = ObTimeUtil::current_time(); + } + if (mview_idx_ >= mview_ids_.count()) { // fetch next batch + mview_ids_.reset(); + mview_idx_ = 0; + if (OB_FAIL(ObMViewInfo::batch_fetch_mview_ids(*GCTX.sql_proxy_, tenant_id_, + last_fetch_mview_id_, mview_ids_, + MVIEW_NUM_FETCH_PER_SCHED))) { + LOG_WARN("fail to batch fetch mview ids", KR(ret), K(tenant_id_), K(last_fetch_mview_id_)); + } else { + fetch_mview_num_ += mview_ids_.count(); + fetch_finish_ = mview_ids_.count() < MVIEW_NUM_FETCH_PER_SCHED; + if (!mview_ids_.empty()) { + last_fetch_mview_id_ = mview_ids_.at(mview_ids_.count() - 1); + } + } + } else { // purge current batch + int64_t purge_mview_num = 0; + int64_t purge_stats_num = 0; + int64_t affected_rows = 0; + while (OB_SUCC(ret) && mview_idx_ < mview_ids_.count() && + purge_stats_num < MVREF_STATS_NUM_PURGE_PER_SCHED) { + const uint64_t mview_id = mview_ids_.at(mview_idx_); + const int64_t limit = MVREF_STATS_NUM_PURGE_PER_SCHED - purge_stats_num; + ObMViewRefreshStatsParams refresh_stats_params; + ObMViewRefreshStats::FilterParam filter_param; + if (OB_FAIL(ObMViewRefreshStatsParams::fetch_mview_refresh_stats_params( + *GCTX.sql_proxy_, tenant_id_, mview_id, refresh_stats_params, + true /*with_sys_defaults*/))) { + LOG_WARN("fail to fetch mview refresh stats params", KR(ret), K(tenant_id_), K(mview_id)); + } else if (refresh_stats_params.get_retention_period() == -1) { + // never be purged, skip + affected_rows = 0; + } else { + filter_param.set_mview_id(mview_id); + filter_param.set_retention_period(refresh_stats_params.get_retention_period()); + if (OB_FAIL(ObMViewRefreshStatsPurgeUtil::purge_refresh_stats( + *GCTX.sql_proxy_, tenant_id_, filter_param, affected_rows, limit))) { + LOG_WARN("fail to purge refresh stats", KR(ret), K(tenant_id_), K(filter_param), + K(limit)); + } + } + if (OB_SUCC(ret)) { + purge_stats_num += affected_rows; + if (affected_rows < limit) { + ++purge_mview_num; + ++mview_idx_; + } + } + } + purge_mview_num_ += purge_mview_num; + purge_stats_num_ += purge_stats_num; + } + if (OB_SUCC(ret) && fetch_finish_ && mview_idx_ >= mview_ids_.count()) { // goto next status + purge_cost_us_ = ObTimeUtility::current_time() - start_purge_time_; + LOG_INFO("mvref stats maintenance task purge success", K(tenant_id_), K(round_), + K(purge_cost_us_), K(fetch_mview_num_), K(purge_mview_num_), K(purge_stats_num_)); + new_status = StatusType::SUCCESS; + } + switch_status(new_status, ret); + return ret; +} + +int ObMViewRefreshStatsMaintenanceTask::finish() +{ + int ret = OB_SUCCESS; + cost_us_ = ObTimeUtility::current_time() - start_time_; + LOG_INFO("mvref stats maintenace task finish", K(tenant_id_), K(round_), K(status_), + K(error_code_), K(cost_us_), K(prepare_cost_us_), K(purge_cost_us_), K(fetch_mview_num_), + K(purge_mview_num_), K(purge_stats_num_)); + // cleanup + cleanup(); + // schedule next round + if (in_sched_ && OB_FAIL(TG_SCHEDULE(MTL(omt::ObSharedTimer *)->get_tg_id(), *this, + MVREF_STATS_MAINTENANCE_INTERVAL, false /*repeat*/))) { + LOG_WARN("fail to schedule mvref stats maintenance task", KR(ret)); + } + return ret; +} + +void ObMViewRefreshStatsMaintenanceTask::cleanup() +{ + status_ = StatusType::PREPARE; + error_code_ = OB_SUCCESS; + last_fetch_mview_id_ = OB_INVALID_ID; + mview_ids_.reset(); + mview_idx_ = 0; + fetch_mview_num_ = 0; + purge_mview_num_ = 0; + purge_stats_num_ = 0; + start_time_ = -1; + start_purge_time_ = -1; + cost_us_ = -1; + prepare_cost_us_ = -1; + purge_cost_us_ = -1; + fetch_finish_ = false; +} + +} // namespace rootserver +} // namespace oceanbase diff --git a/src/rootserver/mview/ob_mview_refresh_stats_maintenance_task.h b/src/rootserver/mview/ob_mview_refresh_stats_maintenance_task.h new file mode 100644 index 0000000000..f0caa3ce80 --- /dev/null +++ b/src/rootserver/mview/ob_mview_refresh_stats_maintenance_task.h @@ -0,0 +1,86 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#pragma once + +#include "lib/container/ob_array.h" +#include "lib/task/ob_timer.h" + +namespace oceanbase +{ +namespace rootserver +{ +class ObMViewRefreshStatsMaintenanceTask : public common::ObTimerTask +{ +public: + ObMViewRefreshStatsMaintenanceTask(); + virtual ~ObMViewRefreshStatsMaintenanceTask(); + DISABLE_COPY_ASSIGN(ObMViewRefreshStatsMaintenanceTask); + + // for Service + int init(); + int start(); + void stop(); + void wait(); + void destroy(); + + // for TimerTask + void runTimerTask() override; + +private: + static const int64_t MVREF_STATS_MAINTENANCE_INTERVAL = 24LL * 3600 * 1000 * 1000; // 1day + static const int64_t MVREF_STATS_MAINTENANCE_SCHED_INTERVAL = 10LL * 1000 * 1000; // 10s + static const int64_t MVIEW_NUM_FETCH_PER_SCHED = 1000; + static const int64_t MVREF_STATS_NUM_PURGE_PER_SCHED = 1000; + + enum class StatusType + { + PREPARE = 0, + PURGE = 1, + SUCCESS = 100, + FAIL = 101, + }; + +private: + static bool is_retry_ret_code(int ret_code); + void switch_status(StatusType new_status, int ret_code); + + int prepare(); + int purge(); + int finish(); + + void cleanup(); + +private: + uint64_t tenant_id_; + int64_t round_; + StatusType status_; + int error_code_; + uint64_t last_fetch_mview_id_; + ObArray mview_ids_; + int64_t mview_idx_; + int64_t fetch_mview_num_; + int64_t purge_mview_num_; + int64_t purge_stats_num_; + int64_t start_time_; + int64_t start_purge_time_; + int64_t cost_us_; + int64_t prepare_cost_us_; + int64_t purge_cost_us_; + bool fetch_finish_; + bool in_sched_; + bool is_stop_; + bool is_inited_; +}; + +} // namespace rootserver +} // namespace oceanbase diff --git a/src/share/rc/ob_tenant_base.h b/src/share/rc/ob_tenant_base.h index 261bee816a..47f737f3ee 100755 --- a/src/share/rc/ob_tenant_base.h +++ b/src/share/rc/ob_tenant_base.h @@ -174,6 +174,7 @@ namespace rootserver class ObStandbySchemaRefreshTrigger; class ObTenantSnapshotScheduler; class ObCloneScheduler; + class ObMViewMaintenanceService; } namespace observer { @@ -350,7 +351,8 @@ using ObTableScanIteratorObjPool = common::ObServerObjectPool &mlog_ids, + int64_t limit) +{ + int ret = OB_SUCCESS; + mlog_ids.reset(); + const uint64_t exec_tenant_id = ObSchemaUtils::get_exec_tenant_id(tenant_id); + SMART_VAR(ObMySQLProxy::MySQLResult, res) + { + ObMySQLResult *result = nullptr; + ObSqlString sql; + uint64_t mlog_id = OB_INVALID_ID; + if (OB_FAIL(sql.assign_fmt("SELECT mlog_id FROM %s WHERE tenant_id = 0", OB_ALL_MLOG_TNAME))) { + LOG_WARN("fail to assign sql", KR(ret)); + } else if (OB_INVALID_ID != last_mlog_id && + OB_FAIL(sql.append_fmt(" and mlog_id > %ld", last_mlog_id))) { + LOG_WARN("fail to append sql", KR(ret)); + } else if (OB_FAIL(sql.append(" order by mlog_id"))) { + LOG_WARN("fail to append sql", KR(ret)); + } else if (limit > 0 && OB_FAIL(sql.append_fmt(" limit %ld", limit))) { + LOG_WARN("fail to append sql", KR(ret)); + } else if (OB_FAIL(sql_client.read(res, tenant_id, sql.ptr()))) { + LOG_WARN("execute sql failed", KR(ret), K(sql)); + } else if (OB_ISNULL(result = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("result is null", KR(ret)); + } + while (OB_SUCC(ret)) { + if (OB_FAIL(result->next())) { + if (OB_UNLIKELY(OB_ITER_END != ret)) { + LOG_WARN("fail to get next", KR(ret)); + } else { + ret = OB_SUCCESS; + break; + } + } + EXTRACT_INT_FIELD_MYSQL(*result, "mlog_id", mlog_id, uint64_t); + OZ(mlog_ids.push_back(mlog_id)); + } + } + return ret; +} + } // namespace schema } // namespace share } // namespace oceanbase diff --git a/src/share/schema/ob_mlog_info.h b/src/share/schema/ob_mlog_info.h index 9797f66dcc..6d43284cc2 100644 --- a/src/share/schema/ob_mlog_info.h +++ b/src/share/schema/ob_mlog_info.h @@ -76,6 +76,9 @@ public: const uint64_t mlog_id); static int fetch_mlog_info(ObISQLClient &sql_client, uint64_t tenant_id, uint64_t mlog_id, ObMLogInfo &mlog_info, bool for_update = false, bool nowait = false); + static int batch_fetch_mlog_ids(ObISQLClient &sql_client, uint64_t tenant_id, + uint64_t last_mlog_id, ObIArray &mlog_ids, + int64_t limit = -1); TO_STRING_KV(K_(tenant_id), K_(mlog_id), diff --git a/src/share/schema/ob_mview_info.cpp b/src/share/schema/ob_mview_info.cpp index 71cd826b30..0e3a0df269 100644 --- a/src/share/schema/ob_mview_info.cpp +++ b/src/share/schema/ob_mview_info.cpp @@ -415,6 +415,50 @@ int ObMViewInfo::fetch_mview_info(ObISQLClient &sql_client, uint64_t tenant_id, return ret; } +int ObMViewInfo::batch_fetch_mview_ids(ObISQLClient &sql_client, uint64_t tenant_id, + uint64_t last_mview_id, ObIArray &mview_ids, + int64_t limit) +{ + int ret = OB_SUCCESS; + mview_ids.reset(); + const uint64_t exec_tenant_id = ObSchemaUtils::get_exec_tenant_id(tenant_id); + SMART_VAR(ObMySQLProxy::MySQLResult, res) + { + ObMySQLResult *result = nullptr; + ObSqlString sql; + uint64_t mview_id = OB_INVALID_ID; + if (OB_FAIL( + sql.assign_fmt("SELECT mview_id FROM %s WHERE tenant_id = 0", OB_ALL_MVIEW_TNAME))) { + LOG_WARN("fail to assign sql", KR(ret)); + } else if (OB_INVALID_ID != last_mview_id && + OB_FAIL(sql.append_fmt(" and mview_id > %ld", last_mview_id))) { + LOG_WARN("fail to append sql", KR(ret)); + } else if (OB_FAIL(sql.append(" order by mview_id"))) { + LOG_WARN("fail to append sql", KR(ret)); + } else if (limit > 0 && OB_FAIL(sql.append_fmt(" limit %ld", limit))) { + LOG_WARN("fail to append sql", KR(ret)); + } else if (OB_FAIL(sql_client.read(res, tenant_id, sql.ptr()))) { + LOG_WARN("execute sql failed", KR(ret), K(sql)); + } else if (OB_ISNULL(result = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("result is null", KR(ret)); + } + while (OB_SUCC(ret)) { + if (OB_FAIL(result->next())) { + if (OB_UNLIKELY(OB_ITER_END != ret)) { + LOG_WARN("fail to get next", KR(ret)); + } else { + ret = OB_SUCCESS; + break; + } + } + EXTRACT_INT_FIELD_MYSQL(*result, "mview_id", mview_id, uint64_t); + OZ(mview_ids.push_back(mview_id)); + } + } + return ret; +} + } // namespace schema } // namespace share } // namespace oceanbase diff --git a/src/share/schema/ob_mview_info.h b/src/share/schema/ob_mview_info.h index 1def5edb4a..d9ee9d808c 100644 --- a/src/share/schema/ob_mview_info.h +++ b/src/share/schema/ob_mview_info.h @@ -80,6 +80,9 @@ public: static int fetch_mview_info(ObISQLClient &sql_client, uint64_t tenant_id, uint64_t mview_id, ObMViewInfo &mview_info, bool for_update = false, bool nowait = false); + static int batch_fetch_mview_ids(ObISQLClient &sql_client, uint64_t tenant_id, + uint64_t last_mview_id, ObIArray &mview_ids, + int64_t limit = -1); TO_STRING_KV(K_(tenant_id), K_(mview_id), diff --git a/src/storage/ls/ob_ls.cpp b/src/storage/ls/ob_ls.cpp index 521b3a476b..6527a7f9c9 100644 --- a/src/storage/ls/ob_ls.cpp +++ b/src/storage/ls/ob_ls.cpp @@ -70,6 +70,7 @@ #include "observer/table/ttl/ob_ttl_service.h" #include "observer/table/ttl/ob_tenant_tablet_ttl_mgr.h" #include "share/wr/ob_wr_service.h" +#include "rootserver/mview/ob_mview_maintenance_service.h" namespace oceanbase { @@ -337,6 +338,10 @@ int ObLS::init(const share::ObLSID &ls_id, } } + if (OB_SUCC(ret) && ls_id.is_sys_ls() && (is_user_tenant(tenant_id) || is_sys_tenant(tenant_id))) { + REGISTER_TO_LOGSERVICE(logservice::MVIEW_MAINTENANCE_SERVICE_LOG_BASE_TYPE, MTL(rootserver::ObMViewMaintenanceService *)); + LOG_INFO("mview maintenance service is registered successfully", KR(ret)); + } if (OB_SUCC(ret)) { // don't delete it election_priority_.set_ls_id(ls_id); @@ -934,6 +939,10 @@ void ObLS::destroy() } } + if (ls_meta_.ls_id_.is_sys_ls() && (is_user_tenant(MTL_ID()) || is_sys_tenant(MTL_ID()))) { + UNREGISTER_FROM_LOGSERVICE(logservice::MVIEW_MAINTENANCE_SERVICE_LOG_BASE_TYPE, MTL(rootserver::ObMViewMaintenanceService *)); + } + tx_table_.destroy(); lock_table_.destroy(); ls_tablet_svr_.destroy();