[CP] Check rs status while process async task

This commit is contained in:
tino247
2023-05-17 11:11:34 +00:00
committed by ob-robot
parent a47570d7f6
commit 0a76ae999f
7 changed files with 46 additions and 340 deletions

View File

@ -39,7 +39,6 @@ ob_set_subtarget(ob_rootserver common
ob_ddl_help.cpp
ob_ddl_operator.cpp
ob_ddl_sql_generator.cpp
ob_inner_table_monitor.cpp
ob_index_builder.cpp
ob_locality_util.cpp
ob_resource_weight_parser.cpp

View File

@ -1,193 +0,0 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SERVER
#include "ob_inner_table_monitor.h"
#include "lib/oblog/ob_log_module.h"
#include "lib/string/ob_sql_string.h"
#include "lib/stat/ob_diagnose_info.h"
#include "lib/mysqlclient/ob_mysql_proxy.h"
#include "common/ob_role_mgr.h"
#include "share/config/ob_server_config.h"
#include "share/ob_common_rpc_proxy.h"
#include "share/inner_table/ob_inner_table_schema.h"
#include "rootserver/ob_root_service.h"
namespace oceanbase
{
using namespace common;
using namespace share;
namespace rootserver
{
ObInnerTableMonitor::ObInnerTableMonitor()
: inited_(false),
rs_proxy_(NULL),
sql_proxy_(NULL),
root_service_(NULL)
{
}
int ObInnerTableMonitor::init(ObMySQLProxy &mysql_proxy,
obrpc::ObCommonRpcProxy &rs_proxy,
ObRootService &root_service) {
int ret = OB_SUCCESS;
if (inited_) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice");
} else {
sql_proxy_ = &mysql_proxy;
rs_proxy_ = &rs_proxy;
root_service_ = &root_service;
inited_ = true;
}
return ret;
}
int ObInnerTableMonitor::check_inner_stat() const
{
int ret = OB_SUCCESS;
if (OB_ISNULL(sql_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sql_proxy_ is null", K(ret));
} else if (OB_ISNULL(rs_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("common_proxy_ is null", K(ret));
} else if (OB_ISNULL(root_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("root_service is null", K(ret));
}
return ret;
}
int ObInnerTableMonitor::get_all_tenants_from_stats(
ObIArray<uint64_t> &tenant_ids)
{
int ret = OB_SUCCESS;
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
sqlclient::ObMySQLResult *result = NULL;
ObSqlString sql;
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("check inner stat failed", K(ret));
} else if (OB_FAIL(sql.append_fmt("SELECT distinct tenant_id FROM %s",
OB_ALL_TENANT_HISTORY_TNAME))) {
LOG_WARN("assign sql string failed", K(ret));
} else if (OB_FAIL(sql_proxy_->read(res, sql.ptr()))) {
LOG_WARN("execute sql failed", K(sql), K(ret));
} else if (NULL == (result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get result failed", K(ret));
}
while (OB_SUCC(ret)) {
if (OB_FAIL(result->next())) {
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
break;
} else {
LOG_WARN("get next result failed", K(ret));
}
} else {
int64_t tenant_id = 0;
if (OB_FAIL(result->get_int("tenant_id", tenant_id))) {
LOG_WARN("get tenant id failed", K(ret));
} else if (OB_FAIL(tenant_ids.push_back(tenant_id))) {
LOG_WARN("push back tenant id failed", K(ret));
}
}
}
}
return ret;
}
int ObInnerTableMonitor::purge_inner_table_history()
{
bool ret = OB_SUCCESS;
ObSEArray<uint64_t, 16> tenant_ids;
if (OB_FAIL(get_all_tenants_from_stats(tenant_ids))) {
LOG_WARN("get_all_tenants_from_stats failed", K(ret));
} else if (OB_FAIL(purge_recyclebin_objects(tenant_ids))) {
LOG_WARN("purge recyclebin objects failed", K(ret));
}
return ret;
}
int ObInnerTableMonitor::check_cancel() const
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("check inner stat failed", K(ret));
} else if (!root_service_->in_service()) {
ret = OB_RS_SHUTDOWN;
LOG_WARN("root service is shutdown", K(ret));
}
return ret;
}
int ObInnerTableMonitor::purge_recyclebin_objects(ObIArray<uint64_t> &tenant_ids)
{
int ret = OB_SUCCESS;
const int64_t current_time = ObTimeUtility::current_time();
obrpc::Int64 expire_time = current_time - GCONF.schema_history_expire_time;
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("check inner stat failed", K(ret));
} else {
const int64_t SLEEP_INTERVAL = 100 * 1000; //100ms
const int64_t SLEEP_TIMES = 100;
const int64_t PURGE_EACH_TIME = 10;
obrpc::Int64 affected_rows = 0;
bool is_tenant_finish = false;
obrpc::ObPurgeRecycleBinArg arg;
//ignore ret
for (int i = 0; i < tenant_ids.count() && OB_SUCCESS == check_cancel(); ++i) {
is_tenant_finish = false;
affected_rows = 0;
arg.tenant_id_ = tenant_ids.at(i);
arg.purge_num_ = PURGE_EACH_TIME;
arg.expire_time_ = expire_time;
LOG_INFO("start purge recycle objects of tenant", K(arg));
int retry_cnt = 0;
while (!is_tenant_finish && OB_SUCCESS == check_cancel()) {
// In case of holding DDL thread in long time, Each tenant only purge 10 recycle objects in one round.
int64_t start_time = ObTimeUtility::current_time();
if (OB_FAIL(rs_proxy_->purge_expire_recycle_objects(arg, affected_rows))) {
LOG_WARN("purge reyclebin objects failed", K(ret),
K(current_time), K(expire_time), K(affected_rows), K(arg), K(retry_cnt));
if (retry_cnt < 3) {
is_tenant_finish = false;
++retry_cnt;
} else {
LOG_WARN("retry purge recyclebin object of tenant failed, ignore it", K(retry_cnt), K(ret), K(arg));
is_tenant_finish = true;
}
} else {
retry_cnt = 0;
is_tenant_finish = PURGE_EACH_TIME == affected_rows ? false : true;
}
int64_t cost_time = ObTimeUtility::current_time() - start_time;
LOG_INFO("purge recycle objects", K(ret), K(cost_time),
K(expire_time), K(current_time), K(affected_rows), K(is_tenant_finish));
//sleep 10s so that will not block the rs DDL thread
int i = 0;
while (OB_SUCCESS == check_cancel() && i < SLEEP_TIMES) {
ob_usleep(SLEEP_INTERVAL);
++i;
}
}
}
}
return ret;
}
} //namespace rootserver
} //namespace oceanbase

View File

@ -1,61 +0,0 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OB_SERVER_INNER_TABLE_MONITOR_H_
#define OB_SERVER_INNER_TABLE_MONITOR_H_
#include "lib/net/ob_addr.h" // tbsys
#include "lib/mysqlclient/ob_mysql_transaction.h" // common::ObMySQLTransaction
namespace oceanbase
{
namespace common
{
class ObServerConfig;
class ObMySQLProxy;
}
namespace obrpc
{
class ObCommonRpcProxy;
}
namespace rootserver
{
class ObRootService;
class ObInnerTableMonitor
{
public:
ObInnerTableMonitor();
~ObInnerTableMonitor() = default;
int init(common::ObMySQLProxy &sql_proxy,
obrpc::ObCommonRpcProxy &rpc_proxy,
ObRootService &root_service);
int purge_inner_table_history();
private:
int check_inner_stat() const;
int get_all_tenants_from_stats(
common::ObIArray<uint64_t> &tenant_ids);
int purge_recyclebin_objects(common::ObIArray<uint64_t> &tenant_id);
int check_cancel() const;
private:
bool inited_;
obrpc::ObCommonRpcProxy *rs_proxy_;
common::ObMySQLProxy *sql_proxy_;
ObRootService *root_service_;
};
}
}
#endif

View File

@ -132,13 +132,20 @@ int ObTenantChecker::check_create_tenant_end_()
// skip
} else if (OB_FAIL(schema_service_->get_tenant_ids(tenant_ids))) {
LOG_WARN("get_tenant_ids failed", K(ret));
} else if (OB_ISNULL(GCTX.root_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("rootservice is null", KR(ret));
} else {
const ObSimpleTenantSchema *tenant_schema = NULL;
int64_t schema_version = OB_INVALID_VERSION;
int64_t baseline_schema_version = OB_INVALID_VERSION;
FOREACH_CNT(tenant_id, tenant_ids) {
// overwrite ret
if (OB_FAIL(schema_service_->get_tenant_schema_guard(*tenant_id, schema_guard))) {
if (!GCTX.root_service_->is_full_service()) {
ret = OB_CANCELED;
LOG_WARN("rs is not in full service", KR(ret));
break;
} else if (OB_FAIL(schema_service_->get_tenant_schema_guard(*tenant_id, schema_guard))) {
LOG_WARN("get_schema_guard failed", KR(ret), K(*tenant_id));
} else if (OB_FAIL(schema_guard.get_schema_version(*tenant_id, schema_version))) {
LOG_WARN("fail to get tenant schema version", KR(ret), K(*tenant_id));
@ -295,13 +302,22 @@ int ObTableGroupChecker::inspect_(
LOG_WARN("get schema guard failed", K(ret), K(tenant_id));
} else if (OB_FAIL(schema_guard.get_table_ids_in_tenant(tenant_id, table_ids))) {
LOG_WARN("fail to get table_ids", KR(ret), K(tenant_id));
} else if (OB_ISNULL(GCTX.root_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("rootservice is null", KR(ret));
} else if (!GCTX.root_service_->is_full_service()) {
ret = OB_CANCELED;
LOG_WARN("rs is not in full service", KR(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < table_ids.count(); i++) {
const uint64_t table_id = table_ids.at(i);
const ObTableSchema *table = NULL;
// schema guard cannot be used repeatedly in iterative logic,
// otherwise it will cause a memory hike in schema cache
if (OB_FAIL(schema_service_.get_tenant_schema_guard(tenant_id, schema_guard))) {
if (!GCTX.root_service_->is_full_service()) {
ret = OB_CANCELED;
LOG_WARN("rs is not in full service", KR(ret));
} else if (OB_FAIL(schema_service_.get_tenant_schema_guard(tenant_id, schema_guard))) {
LOG_WARN("get schema guard failed", K(ret), K(tenant_id));
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table))) {
LOG_WARN("get table schema failed", K(ret), KT(table_id));
@ -766,7 +782,11 @@ int ObRootInspection::check_all()
tmp = OB_SUCCESS;
ObRsJobType job_type = upgrade_job_type_array[i];
if (job_type > JOB_TYPE_INVALID && job_type < JOB_TYPE_MAX) {
if (OB_SUCCESS != (tmp = ObUpgradeUtils::check_upgrade_job_passed(job_type))) {
if (OB_SUCCESS != (tmp = check_cancel())) {
LOG_WARN("check_cancel failed", KR(ret), K(tmp));
ret = (OB_SUCCESS == ret) ? tmp : ret;
break;
} else if (OB_SUCCESS != (tmp = ObUpgradeUtils::check_upgrade_job_passed(job_type))) {
LOG_WARN("fail to check upgrade job passed", K(tmp), K(job_type));
if (OB_RUN_JOB_NOT_SUCCESS != tmp) {
ret = (OB_SUCCESS == ret) ? tmp : ret;
@ -1923,6 +1943,11 @@ int ObRootInspection::check_cancel()
int ret = OB_SUCCESS;
if (stopped_) {
ret = OB_CANCELED;
} else if (OB_ISNULL(GCTX.root_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("rootservice is null", KR(ret));
} else if (!GCTX.root_service_->is_full_service()) {
ret = OB_CANCELED;
}
return ret;
}

View File

@ -381,31 +381,6 @@ ObAsyncTask *ObRootService::ObMinorFreezeTask::deep_copy(char *buf, const int64_
return task;
}
ObRootService::ObInnerTableMonitorTask::ObInnerTableMonitorTask(ObRootService &rs)
:ObAsyncTimerTask(rs.task_queue_),
rs_(rs)
{}
int ObRootService::ObInnerTableMonitorTask::process()
{
int ret = OB_SUCCESS;
if (OB_FAIL(rs_.inner_table_monitor_.purge_inner_table_history())) {
LOG_WARN("failed to purge inner table history", K(ret));
}
return ret;
}
ObAsyncTask *ObRootService::ObInnerTableMonitorTask::deep_copy(char *buf, const int64_t buf_size) const
{
ObInnerTableMonitorTask *task = NULL;
if (NULL == buf || buf_size < static_cast<int64_t>(sizeof(*this))) {
LOG_WARN_RET(OB_BUF_NOT_ENOUGH, "buffer not large enough", K(buf_size));
} else {
task = new(buf) ObInnerTableMonitorTask(rs_);
}
return task;
}
////////////////////////////////////////////////////////////////
bool ObRsStatus::can_start_service() const
@ -655,7 +630,6 @@ ObRootService::ObRootService()
upgrade_executor_(),
upgrade_storage_format_executor_(), create_inner_schema_executor_(),
bootstrap_lock_(), broadcast_rs_list_lock_(ObLatchIds::RS_BROADCAST_LOCK),
inner_table_monitor_(),
task_queue_(),
inspect_task_queue_(),
restart_task_(*this),
@ -668,7 +642,6 @@ ObRootService::ObRootService()
SERVER_EVENT_INSTANCE,
DEALOCK_EVENT_INSTANCE,
task_queue_),
inner_table_monitor_task_(*this),
inspector_task_(*this),
purge_recyclebin_task_(*this),
ddl_scheduler_(),
@ -919,8 +892,6 @@ int ObRootService::init(ObServerConfig &config,
server_manager_, zone_manager_, rpc_proxy_,
self_addr_, sql_proxy, disaster_recovery_task_mgr_))) {
FLOG_WARN("init root balancer failed", KR(ret));
} else if (OB_FAIL(inner_table_monitor_.init(sql_proxy, common_proxy_, *this))) {
FLOG_WARN("init inner table monitor failed", KR(ret));
} else if (OB_FAIL(ROOTSERVICE_EVENT_INSTANCE.init(sql_proxy, self_addr_))) {
FLOG_WARN("init rootservice event history failed", KR(ret));
} else if (OB_FAIL(THE_RS_JOB_TABLE.init(&sql_proxy, self_addr_))) {
@ -1346,6 +1317,10 @@ void ObRootService::wait()
int64_t cost = ObTimeUtility::current_time() - start_time;
ROOTSERVICE_EVENT_ADD("root_service", "finish_wait_stop", K(cost));
FLOG_INFO("[ROOTSERVICE_NOTICE] rootservice wait finished", K(start_time), K(cost));
if (cost > 10 * 60 * 1000 * 1000L) { // 10min
int ret = OB_ERROR;
LOG_ERROR("cost too much time to wait rs stop", KR(ret), K(start_time), K(cost));
}
}
int ObRootService::reload_config()
@ -1552,21 +1527,6 @@ int ObRootService::schedule_check_server_timer_task()
return ret;
}
int ObRootService::schedule_inner_table_monitor_task()
{
int ret = OB_SUCCESS;
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if (OB_FAIL(task_queue_.add_timer_task(inner_table_monitor_task_,
ObInnerTableMonitorTask::PURGE_INTERVAL, true))) {
LOG_WARN("failed to add task", K(ret));
} else {
LOG_INFO("schedule inner_table_monitor task");
}
return ret;
}
int ObRootService::schedule_recyclebin_task(int64_t delay)
{
int ret = OB_SUCCESS;
@ -5241,15 +5201,6 @@ int ObRootService::start_timer_tasks()
}
}
if (OB_SUCC(ret) && !task_queue_.exist_timer_task(inner_table_monitor_task_)) {
// remove purge inner table task, we may fail to get history schema after purge.
// if (OB_FAIL(schedule_inner_table_monitor_task())) {
// LOG_WARN("start inner table monitor service fail", K(ret));
// } else {
// LOG_INFO("start inner table monitor success");
// }
}
if (OB_SUCC(ret)) {
if (OB_FAIL(schedule_inspector_task())) {
LOG_WARN("start inspector fail", K(ret));
@ -5292,7 +5243,6 @@ int ObRootService::stop_timer_tasks()
task_queue_.cancel_timer_task(restart_task_);
task_queue_.cancel_timer_task(check_server_task_);
task_queue_.cancel_timer_task(event_table_clear_task_);
task_queue_.cancel_timer_task(inner_table_monitor_task_);
task_queue_.cancel_timer_task(self_check_task_);
task_queue_.cancel_timer_task(update_rs_list_timer_task_);
inspect_task_queue_.cancel_timer_task(inspector_task_);

View File

@ -41,7 +41,6 @@
#include "rootserver/ob_root_inspection.h"
#include "rootserver/ob_rs_event_history_table_operator.h"
#include "rootserver/ob_rs_thread_checker.h"
#include "rootserver/ob_inner_table_monitor.h"
#include "rootserver/ob_snapshot_info_manager.h"
#include "rootserver/ob_upgrade_storage_format_version_executor.h"
#include "rootserver/ob_upgrade_executor.h"
@ -335,21 +334,6 @@ public:
DISALLOW_COPY_AND_ASSIGN(RsListChangeCb);
};
class ObInnerTableMonitorTask: public common::ObAsyncTimerTask
{
public:
const static int64_t PURGE_INTERVAL = 3600L * 1000L * 1000L;//1h
ObInnerTableMonitorTask(ObRootService &rs);
virtual ~ObInnerTableMonitorTask() {}
// interface of AsyncTask
virtual int process() override;
virtual int64_t get_deep_copy_size() const override { return sizeof(*this); }
virtual ObAsyncTask *deep_copy(char *buf, const int64_t buf_size) const override;
private:
ObRootService &rs_;
DISALLOW_COPY_AND_ASSIGN(ObInnerTableMonitorTask);
};
class ObMinorFreezeTask : public share::ObAsyncTask
{
public:
@ -754,8 +738,6 @@ public:
int schedule_check_server_timer_task();
// @see ObRefreshServerTask
int schedule_refresh_server_timer_task(const int64_t delay);
// @see ObInnerTableMonitorTask
int schedule_inner_table_monitor_task();
int schedule_primary_cluster_inspection_task();
int schedule_recyclebin_task(int64_t delay);
// @see ObInspector
@ -944,9 +926,6 @@ private:
common::SpinRWLock broadcast_rs_list_lock_;
// Inner table mointor
ObInnerTableMonitor inner_table_monitor_;
// the single task queue for all async tasks and timer tasks
common::ObWorkQueue task_queue_;
common::ObWorkQueue inspect_task_queue_;
@ -959,7 +938,6 @@ private:
ObLoadDDLTask load_ddl_task_; // repeat to succeed & no retry
ObRefreshIOCalibrationTask refresh_io_calibration_task_; // retry to succeed & no repeat
share::ObEventTableClearTask event_table_clear_task_; // repeat & no retry
ObInnerTableMonitorTask inner_table_monitor_task_; // repeat & no retry
ObInspector inspector_task_; // repeat & no retry
ObPurgeRecyclebinTask purge_recyclebin_task_; // not repeat & no retry

View File

@ -21,6 +21,7 @@
#include "ob_deadlock_parameters.h"
#include "share/schema/ob_schema_utils.h"
#include "share/schema/ob_multi_version_schema_service.h"
#include "rootserver/ob_root_service.h"
namespace oceanbase
{
@ -121,6 +122,7 @@ int ObDeadLockInnerTableService::insert_all(const ObIArray<ObDetectorInnerReport
return ret;
}
// called by rs
int ObDeadLockInnerTableService::ObDeadLockEventHistoryTableOperator::async_delete()
{
int ret = OB_SUCCESS;
@ -131,7 +133,11 @@ int ObDeadLockInnerTableService::ObDeadLockEventHistoryTableOperator::async_dele
const int64_t rs_delete_timestap = now - REMAIN_RECORD_DURATION;
DETECT_TIME_GUARD(3_s);
if (CLICK() && OB_FAIL(schema::ObMultiVersionSchemaService::
rootserver::ObRootService *root_service = GCTX.root_service_;
if (OB_ISNULL(root_service)) {
ret = OB_ERR_UNEXPECTED;
DETECT_LOG(WARN, "ptr is null", KR(ret), KP(root_service));
} else if (CLICK() && OB_FAIL(schema::ObMultiVersionSchemaService::
get_instance().get_tenant_schema_guard(OB_SYS_TENANT_ID,
schema_guard))) {
DETECT_LOG(WARN, "get schema guard failed", KR(ret));
@ -143,16 +149,18 @@ int ObDeadLockInnerTableService::ObDeadLockEventHistoryTableOperator::async_dele
DETECT_LOG(WARN, "assign_fmt failed", KR(ret));
} else {
CLICK();
for (int64_t idx = 0; idx < tenant_ids.count(); ++idx) {
for (int64_t idx = 0; OB_SUCC(ret) && idx < tenant_ids.count(); ++idx) {
int64_t affected_rows = 0;
uint64_t tenant_id = tenant_ids.at(idx);
int temp_ret = OB_SUCCESS;
if (is_user_tenant(tenant_id)) {
if (!root_service->is_full_service()) {
ret = OB_CANCELED;
DETECT_LOG(WARN, "rs exit", KR(ret));
} else if (is_user_tenant(tenant_id)) {
// skip
} else if (OB_SUCCESS !=
(temp_ret = GCTX.sql_proxy_->write(tenant_id,
sql.ptr(),
affected_rows))) {
(temp_ret = root_service->get_sql_proxy().write(
tenant_id, sql.ptr(), affected_rows))) {
DETECT_LOG(WARN, "execute delete sql failed", K(sql), K(tenant_id), KR(temp_ret));
} else {
DETECT_LOG(INFO, "delete old history record event", K(sql), K(tenant_id), K(affected_rows));