[FEAT MERGE]adaptive cost model

This commit is contained in:
zzg19950727
2024-02-07 11:01:37 +00:00
committed by ob-robot
parent 1ee1ad37e4
commit 9a22f3ea88
85 changed files with 21899 additions and 18832 deletions

View File

@ -23,6 +23,8 @@
#include "share/stat/ob_index_stats_estimator.h"
#include "pl/sys_package/ob_dbms_stats.h"
#include "share/stat/ob_opt_stat_gather_stat.h"
#include "src/observer/ob_server.h"
namespace oceanbase {
using namespace pl;
namespace common {
@ -869,6 +871,79 @@ int ObDbmsStatsExecutor::update_online_stat(ObExecContext &ctx,
return ret;
}
int ObDbmsStatsExecutor::gather_system_stats(ObExecContext &ctx, int64_t tenant_id)
{
int ret = OB_SUCCESS;
UNUSED(ctx);
int64_t cpu_mhz = OBSERVER.get_cpu_frequency_khz()/1000;
int64_t network_speed = OBSERVER.get_network_speed() / 1024.0 / 1024.0;
int64_t disk_seq_read_speed = 0;
int64_t disk_rnd_read_speed = 0;
OptSystemIoBenchmark &io_benchmark = OptSystemIoBenchmark::get_instance();
if (io_benchmark.is_init()) {
disk_seq_read_speed = io_benchmark.get_disk_seq_read_speed();
disk_rnd_read_speed = io_benchmark.get_disk_rnd_read_speed();
} else if (OB_FAIL(io_benchmark.run_benchmark(ctx.get_allocator()))) {
LOG_WARN("failed to run io benchmark", K(ret));
} else {
disk_seq_read_speed = io_benchmark.get_disk_seq_read_speed();
disk_rnd_read_speed = io_benchmark.get_disk_rnd_read_speed();
}
if (OB_SUCC(ret)) {
ObOptSystemStat system_stat;
ObOptStatManager &mgr = ObOptStatManager::get_instance();
int64_t current_time = ObTimeUtility::current_time();
system_stat.set_last_analyzed(current_time);
system_stat.set_cpu_speed(cpu_mhz);
system_stat.set_disk_seq_read_speed(disk_seq_read_speed);
system_stat.set_disk_rnd_read_speed(disk_rnd_read_speed);
system_stat.set_network_speed(network_speed);
if (OB_FAIL(mgr.update_system_stats(tenant_id,
&system_stat))) {
LOG_WARN("failed to update system stats", K(ret));
}
}
return ret;
}
int ObDbmsStatsExecutor::delete_system_stats(ObExecContext &ctx, int64_t tenant_id)
{
int ret = OB_SUCCESS;
UNUSED(ctx);
ObOptStatManager &mgr = ObOptStatManager::get_instance();
if (OB_FAIL(mgr.delete_system_stats(tenant_id))) {
LOG_WARN("failed to delete system stats", K(ret));
}
return ret;
}
int ObDbmsStatsExecutor::set_system_stats(ObExecContext &ctx, const ObSetSystemStatParam &param)
{
int ret = OB_SUCCESS;
UNUSED(ctx);
ObOptSystemStat system_stat;
ObOptStatManager &mgr = ObOptStatManager::get_instance();
if (OB_FAIL(mgr.get_system_stat(param.tenant_id_, system_stat))) {
LOG_WARN("failed to get table stat", K(ret));
} else if (ObCharset::case_insensitive_equal(param.name_, "cpu_speed")) {
system_stat.set_cpu_speed(param.value_);
} else if (ObCharset::case_insensitive_equal(param.name_, "disk_seq_read_speed")) {
system_stat.set_disk_seq_read_speed(param.value_);
} else if (ObCharset::case_insensitive_equal(param.name_, "disk_rnd_read_speed")) {
system_stat.set_disk_rnd_read_speed(param.value_);
} else if (ObCharset::case_insensitive_equal(param.name_, "network_speed")) {
system_stat.set_network_speed(param.value_);
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(mgr.update_system_stats(param.tenant_id_,
&system_stat))) {
LOG_WARN("failed to update system stats", K(ret));
} else {
LOG_TRACE("end set system stats", K(param), K(system_stat));
}
return ret;
}
} // namespace common
} // namespace oceanbase

View File

@ -51,6 +51,13 @@ public:
share::schema::ObSchemaGetterGuard *schema_guard,
const TabStatIndMap &online_table_stats,
const ColStatIndMap &online_column_stats);
static int gather_system_stats(ObExecContext &ctx, int64_t tenant_id);
static int delete_system_stats(ObExecContext &ctx, int64_t tenant_id);
static int set_system_stats(ObExecContext &ctx, const ObSetSystemStatParam &param);
private:
static int do_gather_stats(ObExecContext &ctx,

View File

@ -109,7 +109,11 @@ void ObOptStatManager::destroy()
int ObOptStatManager::add_refresh_stat_task(const obrpc::ObUpdateStatCacheArg &analyze_arg)
{
int ret = OB_SUCCESS;
if (OB_FAIL(handle_refresh_stat_task(analyze_arg))) {
if (analyze_arg.update_system_stats_only_) {
if (OB_FAIL(handle_refresh_system_stat_task(analyze_arg))) {
LOG_WARN("failed to handle refresh system stat cache", K(ret));
}
} else if (OB_FAIL(handle_refresh_stat_task(analyze_arg))) {
LOG_WARN("failed to handld refresh stat task", K(ret));
}
return ret;
@ -419,7 +423,7 @@ int ObOptStatManager::handle_refresh_stat_task(const obrpc::ObUpdateStatCacheArg
{
int ret = OB_SUCCESS;
uint64_t table_id = arg.table_id_;
for (int64_t i = 0; OB_SUCC(ret) && i < arg.partition_ids_.count(); ++i) {
for (int64_t i = 0; OB_SUCC(ret) && i < arg.partition_ids_.count(); ++i) {
ObOptTableStat::Key table_key(arg.tenant_id_,
table_id,
arg.partition_ids_.at(i));
@ -444,6 +448,31 @@ int ObOptStatManager::handle_refresh_stat_task(const obrpc::ObUpdateStatCacheArg
return ret;
}
int ObOptStatManager::handle_refresh_system_stat_task(const obrpc::ObUpdateStatCacheArg &arg)
{
int ret = OB_SUCCESS;
ObOptSystemStat::Key key(arg.tenant_id_);
if (OB_FAIL(stat_service_.erase_system_stat(key))) {
if (OB_ENTRY_NOT_EXIST != ret) {
LOG_WARN("failed to erase system stat", K(ret));
} else {
ret = OB_SUCCESS;
LOG_TRACE("failed to erase system stat", K(key));
}
}
if (OB_SUCC(ret)) {
MTL_SWITCH(arg.tenant_id_) {
sql::ObPlanCache *pc = MTL(sql::ObPlanCache*);
if (OB_FAIL(pc->flush_plan_cache())) {
LOG_WARN("failed to evict plan", K(ret));
// use OB_SQL_PC_NOT_EXIST represent evict plan failed
ret = OB_SQL_PC_NOT_EXIST;
}
}
}
return ret;
}
int ObOptStatManager::invalidate_plan(const uint64_t tenant_id, const uint64_t table_id)
{
int ret = OB_SUCCESS;
@ -560,6 +589,30 @@ int ObOptStatManager::check_opt_stat_validity(sql::ObExecContext &ctx,
return ret;
}
int ObOptStatManager::check_system_stat_validity(sql::ObExecContext *ctx,
const uint64_t tenant_id,
bool &is_valid)
{
int ret = OB_SUCCESS;
const share::schema::ObTableSchema *table_schema = NULL;
is_valid = false;
if (OB_ISNULL(ctx) ||
OB_ISNULL(ctx->get_virtual_table_ctx().schema_guard_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (OB_FAIL(ctx->get_virtual_table_ctx().schema_guard_->get_table_schema(
tenant_id,
share::OB_ALL_AUX_STAT_TID,
table_schema))) {
LOG_WARN("failed to get table schema", K(ret), K(table_schema));
} else if (OB_ISNULL(table_schema)) {
//do nothing
} else {
is_valid = true;
}
return ret;
}
int ObOptStatManager::get_table_stat(const uint64_t tenant_id,
const uint64_t table_ref_id,
const int64_t part_id,
@ -773,5 +826,45 @@ int ObOptStatManager::update_opt_stat_task_stat(const ObOptStatTaskInfo &task_in
return stat_service_.get_sql_service().update_opt_stat_task_stat(task_info);
}
int ObOptStatManager::get_system_stat(const uint64_t tenant_id,
ObOptSystemStat &stat)
{
int ret = OB_SUCCESS;
ObOptSystemStat::Key key(tenant_id);
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("optimizer statistics manager has not been initialized.", K(ret), K(inited_));
} else if (OB_FAIL(stat_service_.get_system_stat(tenant_id, key, stat))) {
LOG_WARN("get system stat failed", K(ret));
}
return ret;
}
int ObOptStatManager::update_system_stats(const uint64_t tenant_id,
const ObOptSystemStat *system_stats)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (OB_FAIL(stat_service_.get_sql_service().update_system_stats(tenant_id,
system_stats))) {
LOG_WARN("failed to update system stats", K(ret));
}
return ret;
}
int ObOptStatManager::delete_system_stats(const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("optimizer statistics manager has not been initialized.", K(ret), K(inited_));
} else if (OB_FAIL(stat_service_.get_sql_service().delete_system_stats(tenant_id))) {
LOG_WARN("delete system stat failed", K(ret));
}
return ret;
}
}
}

View File

@ -23,6 +23,7 @@
#include "share/stat/ob_stat_define.h"
#include "share/stat/ob_opt_ds_stat.h"
#include "share/stat/ob_stat_item.h"
#include "share/stat/ob_opt_system_stat.h"
namespace oceanbase {
namespace common {
@ -50,6 +51,10 @@ public:
const ObIArray<int64_t> &part_ids,
bool &is_opt_stat_valid);
int check_system_stat_validity(sql::ObExecContext *ctx,
const uint64_t tenant_id,
bool &is_valid);
int check_opt_stat_validity(sql::ObExecContext &ctx,
const uint64_t tenant_id,
const uint64_t tab_ref_id,
@ -173,6 +178,8 @@ public:
int handle_refresh_stat_task(const obrpc::ObUpdateStatCacheArg &arg);
int handle_refresh_system_stat_task(const obrpc::ObUpdateStatCacheArg &arg);
int get_table_rowcnt(const uint64_t tenant_id,
const uint64_t table_id,
const ObIArray<ObTabletID> &all_tablet_ids,
@ -199,6 +206,11 @@ public:
ObOptDSStatHandle &ds_stat_handle);
int update_opt_stat_gather_stat(const ObOptStatGatherStat &gather_stat);
int update_opt_stat_task_stat(const ObOptStatTaskInfo &task_info);
int get_system_stat(const uint64_t tenant_id,
ObOptSystemStat &stat);
int update_system_stats(const uint64_t tenant_id,
const ObOptSystemStat *system_stats);
int delete_system_stats(const uint64_t tenant_id);
protected:
static const int64_t REFRESH_STAT_TASK_NUM = 5;
bool inited_;

View File

@ -247,6 +247,9 @@ int ObOptStatService::init(common::ObMySQLProxy *proxy, ObServerConfig *config)
} else if (OB_FAIL(ds_stat_cache_.init("opt_ds_stat_cache",
DEFAULT_DS_STAT_CACHE_PRIORITY))) {
LOG_WARN("fail to init table cache.", K(ret));
} else if (OB_FAIL(system_stat_cache_.init("opt_system_stat_cache",
DEFAULT_SYSTEM_STAT_CACHE_PRIORITY))) {
LOG_WARN("fail to init system stat cache.", K(ret));
} else {
inited_ = true;
}
@ -437,5 +440,64 @@ int ObOptStatService::erase_ds_stat(const ObOptDSStat::Key &key)
return ds_stat_cache_.erase(key);
}
int ObOptStatService::get_system_stat(const uint64_t tenant_id,
const ObOptSystemStat::Key &key,
ObOptSystemStat &stat)
{
int ret = OB_SUCCESS;
ObOptSystemStatHandle handle;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("statistics service is not initialized. ", K(ret), K(key));
} else if (OB_FAIL(system_stat_cache_.get_value(key, handle))) {
// we need to fetch statistics from inner table if it is not yet available from cache
if (OB_ENTRY_NOT_EXIST != ret) {
LOG_WARN("get system stat from cache failed", K(ret), K(key));
} else if (OB_FAIL(load_system_stat_and_put_cache(tenant_id, key, handle))) {
LOG_WARN("load and put cache system stat failed.", K(ret), K(key));
}
}
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_ISNULL(handle.stat_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("cache hit but value is NULL. BUG here.", K(ret), K(key));
} else {
stat = *handle.stat_;
}
return ret;
}
int ObOptStatService::load_system_stat_and_put_cache(const uint64_t tenant_id,
const ObOptSystemStat::Key &key,
ObOptSystemStatHandle &handle)
{
int ret = OB_SUCCESS;
ObOptSystemStat stat;
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("statistics service is not initialized. ", K(ret), K(key));
} else if (OB_FAIL(system_stat_cache_.put_and_fetch_value(key, stat, handle))) {
LOG_WARN("put and fetch table stat failed.", K(ret), K(key));
} else if (OB_FAIL(sql_service_.fetch_system_stat(tenant_id, key, stat))) {
system_stat_cache_.erase(key);
if (OB_ENTRY_NOT_EXIST != ret) {
LOG_WARN("fetch system stat failed. ", K(ret), K(key));
} else {
// it's not guaranteed that system stat exists.
stat.reset();
ret = OB_SUCCESS;
}
} else if (OB_FAIL(system_stat_cache_.put_and_fetch_value(key, stat, handle))) {
LOG_WARN("put and fetch table stat failed.", K(ret), K(key));
}
return ret;
}
int ObOptStatService::erase_system_stat(const ObOptSystemStat::Key &key)
{
return system_stat_cache_.erase(key);
}
}
}

View File

@ -19,6 +19,7 @@
#include "share/stat/ob_opt_stat_sql_service.h"
#include "share/stat/ob_opt_ds_stat.h"
#include "share/stat/ob_opt_ds_stat_cache.h"
#include "share/stat/ob_opt_system_stat_cache.h"
namespace oceanbase {
namespace common {
@ -60,6 +61,16 @@ public:
const ObIArray<share::ObLSID> &all_ls_ids,
int64_t &table_rowcnt);
int get_system_stat(const uint64_t tenant_id,
const ObOptSystemStat::Key &key,
ObOptSystemStat &stat);
int load_system_stat_and_put_cache(const uint64_t tenant_id,
const ObOptSystemStat::Key &key,
ObOptSystemStatHandle &handle);
int erase_system_stat(const ObOptSystemStat::Key &key);
private:
/**
* 接口load_and_put_cache(key, handle)的实现,外部不应该直接调用这个函数
@ -83,11 +94,13 @@ protected:
static const int64_t DEFAULT_TAB_STAT_CACHE_PRIORITY = 1;
static const int64_t DEFAULT_COL_STAT_CACHE_PRIORITY = 1;
static const int64_t DEFAULT_DS_STAT_CACHE_PRIORITY = 1;
static const int64_t DEFAULT_SYSTEM_STAT_CACHE_PRIORITY = 1;
ObOptStatSqlService sql_service_;
ObOptTableStatCache table_stat_cache_;
ObOptColumnStatCache column_stat_cache_;
ObOptDSStatCache ds_stat_cache_;
ObOptSystemStatCache system_stat_cache_;
};
}

View File

@ -299,6 +299,14 @@
"WHERE %.*s "\
"ORDER BY tenant_id, table_id, partition_id, column_id, endpoint_num;"
#define INSERT_SYSTEM_STAT_SQL "REPLACE INTO %s(tenant_id," \
"last_analyzed," \
"cpu_speed," \
"disk_seq_read_speed," \
"disk_rnd_read_speed," \
"network_speed) VALUES "
#define DELETE_SYSTEM_STAT_SQL "DELETE FROM %s WHERE TENANT_ID=%ld"
namespace oceanbase
{
@ -2255,6 +2263,155 @@ int ObOptStatSqlService::get_gather_stat_value(const ObOptStatGatherStat &gather
return ret;
}
int ObOptStatSqlService::update_system_stats(const uint64_t tenant_id,
const ObOptSystemStat *system_stat)
{
int ret = OB_SUCCESS;
ObSqlString system_stat_sql;
ObSqlString tmp;
int64_t current_time = ObTimeUtility::current_time();
int64_t affected_rows = 0;
if (OB_ISNULL(system_stat)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table stat is null", K(ret), K(system_stat));
} else if (OB_FAIL(system_stat_sql.append_fmt(INSERT_SYSTEM_STAT_SQL, OB_ALL_AUX_STAT_TNAME))) {
LOG_WARN("failed to append sql", K(ret));
} else if (OB_FAIL(get_system_stat_sql(tenant_id, *system_stat, current_time, tmp))) {
LOG_WARN("failed to get table stat sql", K(ret));
} else if (OB_FAIL(system_stat_sql.append_fmt("(%s);", tmp.ptr()))) {
LOG_WARN("failed to append system stat sql", K(ret));
} else {
ObMySQLTransaction trans;
LOG_TRACE("sql string of system stat update", K(system_stat_sql));
if (OB_FAIL(trans.start(mysql_proxy_, tenant_id))) {
LOG_WARN("fail to start transaction", K(ret), K(tenant_id));
} else if (OB_FAIL(trans.write(tenant_id, system_stat_sql.ptr(), affected_rows))) {
LOG_WARN("failed to exec sql", K(ret));
} else {/*do nothing*/}
if (OB_SUCC(ret)) {
if (OB_FAIL(trans.end(true))) {
LOG_WARN("fail to commit transaction", K(ret));
}
} else {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = trans.end(false))) {
LOG_WARN("fail to roll back transaction", K(tmp_ret));
}
}
}
return ret;
}
int ObOptStatSqlService::get_system_stat_sql(const uint64_t tenant_id,
const ObOptSystemStat &stat,
const int64_t current_time,
ObSqlString &sql_string)
{
int ret = OB_SUCCESS;
share::ObDMLSqlSplicer dml_splicer;
uint64_t ext_tenant_id = ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id);
if (OB_FAIL(dml_splicer.add_pk_column("tenant_id", ext_tenant_id)) ||
OB_FAIL(dml_splicer.add_time_column("last_analyzed", stat.get_last_analyzed() == 0 ?
current_time : stat.get_last_analyzed())) ||
OB_FAIL(dml_splicer.add_column("cpu_speed", stat.get_cpu_speed())) ||
OB_FAIL(dml_splicer.add_column("disk_seq_read_speed", stat.get_disk_seq_read_speed())) ||
OB_FAIL(dml_splicer.add_column("disk_rnd_read_speed", stat.get_disk_rnd_read_speed())) ||
OB_FAIL(dml_splicer.add_column("network_speed", stat.get_network_speed()))) {
LOG_WARN("failed to add dml splicer column", K(ret));
} else if (OB_FAIL(dml_splicer.splice_values(sql_string))) {
LOG_WARN("failed to get sql string", K(ret));
} else { /*do nothing*/ }
return ret;
}
int ObOptStatSqlService::fetch_system_stat(const uint64_t tenant_id,
const ObOptSystemStat::Key &key,
ObOptSystemStat &stat)
{
int ret = OB_SUCCESS;
uint64_t data_version = 0;
if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, data_version))) {
LOG_WARN("failed to get data version", K(ret));
} else if (data_version >= DATA_VERSION_4_3_0_0) {
ObSQLClientRetryWeak sql_client_retry_weak(mysql_proxy_, false, OB_INVALID_TIMESTAMP, false);
int64_t ext_tenant_id = ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id);
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
sqlclient::ObMySQLResult *result = NULL;
ObSqlString sql;
uint64_t exec_tenant_id = ObSchemaUtils::get_exec_tenant_id(tenant_id);
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("sql service has not been initialized.", K(ret));
} else if (OB_FAIL(sql.append_fmt("SELECT * FROM %s ", share::OB_ALL_AUX_STAT_TNAME))) {
LOG_WARN("fail to append SQL stmt string.", K(sql), K(ret));
} else if (OB_FAIL(sql.append_fmt(" WHERE TENANT_ID = %ld", ext_tenant_id))) {
LOG_WARN("fail to append SQL where string.", K(ret));
} else if (OB_FAIL(sql_client_retry_weak.read(res, exec_tenant_id, sql.ptr()))) {
LOG_WARN("execute sql failed", "sql", sql.ptr(), K(ret));
} else if (NULL == (result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to execute ", "sql", sql.ptr(), K(ret));
} else if (OB_FAIL(result->next())) {
if (OB_ITER_END != ret) {
LOG_WARN("get next row failed", K(ret));
} else {
ret = OB_SUCCESS;
}
} else if (OB_FAIL(fill_system_stat(*result, stat))) {
LOG_WARN("failed to fill system stat", K(ret));
}
}
}
return ret;
}
int ObOptStatSqlService::fill_system_stat(sqlclient::ObMySQLResult &result, ObOptSystemStat &stat)
{
int ret = OB_SUCCESS;
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("sql service has not been initialized.", K(ret));
} else {
EXTRACT_INT_FIELD_TO_CLASS_MYSQL_WITH_DEFAULT_VALUE(result, cpu_speed, stat, int64_t, true, true, 0);
EXTRACT_INT_FIELD_TO_CLASS_MYSQL_WITH_DEFAULT_VALUE(result, disk_seq_read_speed, stat, int64_t, true, true, 0);
EXTRACT_INT_FIELD_TO_CLASS_MYSQL_WITH_DEFAULT_VALUE(result, disk_rnd_read_speed, stat, int64_t, true, true, 0);
EXTRACT_INT_FIELD_TO_CLASS_MYSQL_WITH_DEFAULT_VALUE(result, network_speed, stat, int64_t, true, true, 0);
}
return ret;
}
int ObOptStatSqlService::delete_system_stats(const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
ObSqlString system_stat_sql;
int64_t current_time = ObTimeUtility::current_time();
int64_t affected_rows = 0;
int64_t ext_tenant_id = ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id);
if (OB_FAIL(system_stat_sql.append_fmt(DELETE_SYSTEM_STAT_SQL, OB_ALL_AUX_STAT_TNAME, ext_tenant_id))) {
LOG_WARN("failed to append sql", K(ret));
} else {
ObMySQLTransaction trans;
LOG_TRACE("sql string of system stat delete", K(system_stat_sql));
if (OB_FAIL(trans.start(mysql_proxy_, tenant_id))) {
LOG_WARN("fail to start transaction", K(ret), K(tenant_id));
} else if (OB_FAIL(trans.write(tenant_id, system_stat_sql.ptr(), affected_rows))) {
LOG_WARN("failed to exec sql", K(ret));
} else {/*do nothing*/}
if (OB_SUCC(ret)) {
if (OB_FAIL(trans.end(true))) {
LOG_WARN("fail to commit transaction", K(ret));
}
} else {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = trans.end(false))) {
LOG_WARN("fail to roll back transaction", K(tmp_ret));
}
}
}
return ret;
}
} // end of namespace common
} // end of namespace oceanbase
@ -2271,3 +2428,5 @@ int ObOptStatSqlService::get_gather_stat_value(const ObOptStatGatherStat &gather
#undef INSERT_ONLINE_TABLE_STAT_DUPLICATE
#undef INSERT_ONLINE_COL_STAT_SQL
#undef INSERT_ONLINE_COL_STAT_DUPLICATE
#undef INERT_SYSTEM_STAT_SQL
#undef DELETE_SYSTEM_STAT_SQL

View File

@ -15,6 +15,7 @@
#include "share/stat/ob_opt_column_stat.h"
#include "share/stat/ob_opt_table_stat.h"
#include "share/stat/ob_opt_system_stat.h"
#include "share/stat/ob_stat_define.h"
#include "share/stat/ob_opt_stat_gather_stat.h"
#include "lib/mysqlclient/ob_mysql_transaction.h"
@ -170,6 +171,14 @@ public:
int update_opt_stat_gather_stat(const ObOptStatGatherStat &gather_stat);
int update_opt_stat_task_stat(const ObOptStatTaskInfo &task_info);
int update_system_stats(const uint64_t tenant_id,
const ObOptSystemStat *system_stat);
int fetch_system_stat(const uint64_t tenant_id,
const ObOptSystemStat::Key &key,
ObOptSystemStat &stat);
int delete_system_stats(const uint64_t tenant_id);
private:
int get_table_stat_sql(const uint64_t tenant_id,
const ObOptTableStat &stat,
@ -273,6 +282,13 @@ private:
int get_gather_stat_task_value(const ObOptStatTaskInfo &task_info,
ObSqlString &values_str);
int get_system_stat_sql(const uint64_t tenant_id,
const ObOptSystemStat &stat,
const int64_t current_time,
ObSqlString &sql_string);
int fill_system_stat(sqlclient::ObMySQLResult &result, ObOptSystemStat &stat);
static const char *bitmap_compress_lib_name;
bool inited_;

View File

@ -0,0 +1,171 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SQL_OPT
#include "share/stat/ob_opt_system_stat.h"
#include "lib/utility/ob_unify_serialize.h"
#include "lib/utility/ob_macro_utils.h"
#include "src/storage/blocksstable/ob_block_manager.h"
#include "src/share/io/ob_io_manager.h"
namespace oceanbase {
namespace common {
using namespace sql;
OB_DEF_SERIALIZE(ObOptSystemStat) {
int ret = OB_SUCCESS;
LST_DO_CODE(OB_UNIS_ENCODE,
last_analyzed_,
cpu_speed_,
disk_seq_read_speed_,
disk_rnd_read_speed_,
network_speed_
);
return ret;
}
OB_DEF_SERIALIZE_SIZE(ObOptSystemStat) {
int64_t len = 0;
LST_DO_CODE(OB_UNIS_ADD_LEN,
last_analyzed_,
cpu_speed_,
disk_seq_read_speed_,
disk_rnd_read_speed_,
network_speed_
);
return len;
}
OB_DEF_DESERIALIZE(ObOptSystemStat) {
int ret = OB_SUCCESS;
LST_DO_CODE(OB_UNIS_DECODE,
last_analyzed_,
cpu_speed_,
disk_seq_read_speed_,
disk_rnd_read_speed_,
network_speed_
);
return ret;
}
void OptSystemIoBenchmark::reset()
{
disk_seq_read_speed_ = 0;
disk_rnd_read_speed_ = 0;
init_ = false;
}
OptSystemIoBenchmark& OptSystemIoBenchmark::get_instance()
{
static OptSystemIoBenchmark benchmark;
return benchmark;
}
int OptSystemIoBenchmark::run_benchmark(ObIAllocator &allocator)
{
int ret = OB_SUCCESS;
int64_t load_size = 16 * 1024; //16k
int64_t io_count = 0;
int64_t rt_us = 0;
int64_t data_size = 0;
char *read_buf = NULL;
ObIOInfo io_info;
io_info.tenant_id_ = OB_SERVER_TENANT_ID;
io_info.size_ = load_size;
io_info.buf_ = nullptr;
io_info.flag_.set_mode(ObIOMode::READ);
io_info.flag_.set_group_id(ObIOModule::CALIBRATION_IO);
io_info.flag_.set_wait_event(ObWaitEventIds::DB_FILE_DATA_READ);
io_info.flag_.set_unlimited(true);
io_info.timeout_us_ = MAX_IO_WAIT_TIME_MS;
ObIOHandle io_handle;
// prepare io bench
ObSEArray<blocksstable::ObMacroBlockHandle, 16> block_handles;
const double MIN_FREE_SPACE_PERCENTAGE = 0.1; //
const int64_t MIN_CALIBRATION_BLOCK_COUNT = 1024L * 1024L * 1024L / OB_DEFAULT_MACRO_BLOCK_SIZE;
const int64_t MAX_CALIBRATION_BLOCK_COUNT = 20L * 1024L * 1024L * 1024L / OB_DEFAULT_MACRO_BLOCK_SIZE;
const int64_t free_block_count = OB_SERVER_BLOCK_MGR.get_free_macro_block_count();
const int64_t total_block_count = OB_SERVER_BLOCK_MGR.get_total_macro_block_count();
int64_t benchmark_block_count = free_block_count * 0.2;
if (free_block_count <= MIN_CALIBRATION_BLOCK_COUNT ||
1.0 * free_block_count / total_block_count < MIN_FREE_SPACE_PERCENTAGE) {
ret = OB_SERVER_OUTOF_DISK_SPACE;
LOG_WARN("out of space", K(ret), K(free_block_count), K(total_block_count));
} else if (OB_ISNULL(read_buf = static_cast<char *>(allocator.alloc(OB_DEFAULT_MACRO_BLOCK_SIZE)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate read memory failed", K(ret));
} else {
benchmark_block_count = min(benchmark_block_count, MAX_CALIBRATION_BLOCK_COUNT);
benchmark_block_count = max(benchmark_block_count, MIN_CALIBRATION_BLOCK_COUNT);
io_info.user_data_buf_ = read_buf;
}
// prepare macro blocks
for (int64_t i = 0; OB_SUCC(ret) && i < benchmark_block_count; ++i) {
blocksstable::ObMacroBlockHandle block_handle;
if (OB_FAIL(OB_SERVER_BLOCK_MGR.alloc_block(block_handle))) {
LOG_WARN("alloc macro block failed", K(ret), K(i));
} else if (OB_FAIL(block_handles.push_back(block_handle))) {
LOG_WARN("push back block handle failed", K(ret), K(block_handle));
}
}
//test rnd io
while (OB_SUCC(ret) && io_count < 100) {
int64_t block_idx = ObRandom::rand(block_handles.count()/2, block_handles.count() - 1);
io_info.fd_.first_id_ = block_handles[block_idx].get_macro_id().first_id();
io_info.fd_.second_id_ = block_handles[block_idx].get_macro_id().second_id();
io_info.offset_ = ObRandom::rand(0, OB_DEFAULT_MACRO_BLOCK_SIZE - load_size);
io_info.size_ = load_size;
const int64_t begin_ts = ObTimeUtility::fast_current_time();
if (OB_FAIL(OB_IO_MANAGER.read(io_info, io_handle))) {
LOG_WARN("io benchmark read failed", K(ret), K(io_info));
} else {
++io_count;
rt_us += ObTimeUtility::fast_current_time() - begin_ts;
data_size += io_handle.get_data_size();
}
}
if (OB_SUCC(ret)) {
rt_us = rt_us >= 1 ? rt_us : 1;
disk_rnd_read_speed_ = data_size / rt_us;
init_ = true;
}
io_count = 0;
rt_us = 0;
data_size = 0;
//test seq io
while (OB_SUCC(ret) && io_count < 100 && io_count < block_handles.count()/2) {
int64_t block_idx = io_count;
io_info.fd_.first_id_ = block_handles[block_idx].get_macro_id().first_id();
io_info.fd_.second_id_ = block_handles[block_idx].get_macro_id().second_id();
io_info.offset_ = 0;
io_info.size_ = OB_DEFAULT_MACRO_BLOCK_SIZE;
const int64_t begin_ts = ObTimeUtility::fast_current_time();
if (OB_FAIL(OB_IO_MANAGER.read(io_info, io_handle))) {
LOG_WARN("io benchmark read failed", K(ret), K(io_info));
} else {
++io_count;
rt_us += ObTimeUtility::fast_current_time() - begin_ts;
data_size += io_handle.get_data_size();
}
}
if (OB_SUCC(ret)) {
rt_us = rt_us >= 1 ? rt_us : 1;
disk_seq_read_speed_ = data_size / rt_us;
allocator.free(read_buf);
}
return ret;
}
//TODO: collect system stat with workload
}
}

View File

@ -0,0 +1,223 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef _OB_OPT_SYSTEM_STAT_H_
#define _OB_OPT_SYSTEM_STAT_H_
#include <stdint.h>
#include <cstddef>
#include "lib/hash_func/murmur_hash.h"
#include "lib/utility/ob_print_utils.h"
#include "share/cache/ob_kvcache_struct.h"
namespace oceanbase {
namespace common {
/**
* Optimizer System Level Statistics
*/
class ObOptSystemStat : public common::ObIKVCacheValue
{
OB_UNIS_VERSION_V(1);
public:
struct Key : public common::ObIKVCacheKey
{
Key() : tenant_id_(0)
{
}
explicit Key(uint64_t tenant_id) :
tenant_id_(tenant_id)
{
}
void init(uint64_t tenant_id)
{
tenant_id_ = tenant_id;
}
uint64_t hash() const
{
return common::murmurhash(this, sizeof(Key), 0);
}
int hash(uint64_t &hash_val) const { hash_val = hash(); return OB_SUCCESS; }
bool operator==(const ObIKVCacheKey &other) const
{
const Key &other_key = reinterpret_cast<const Key&>(other);
return tenant_id_ == other_key.tenant_id_;
}
uint64_t get_tenant_id() const
{
return tenant_id_;
}
int64_t size() const
{
return sizeof(*this);
}
int deep_copy(char *buf, const int64_t buf_len, ObIKVCacheKey *&key) const
{
int ret = OB_SUCCESS;
Key *tmp = NULL;
if (NULL == buf || buf_len < size()) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(WARN, "invalid argument.",
K(ret), KP(buf), K(buf_len), K(size()));
} else {
tmp = new (buf) Key();
*tmp = *this;
key = tmp;
}
return ret;
}
bool is_valid() const
{
return tenant_id_ != 0;
}
void reset()
{
tenant_id_ = 0;
}
TO_STRING_KV(K_(tenant_id));
uint64_t tenant_id_;
};
ObOptSystemStat()
: last_analyzed_(0),
cpu_speed_(0),
disk_seq_read_speed_(0),
disk_rnd_read_speed_(0),
network_speed_(0) {}
ObOptSystemStat(int64_t last_analyzed,
int64_t cpu_speed,
int64_t disk_seq_read_speed,
int64_t disk_rnd_read_speed,
int64_t network_speed) :
last_analyzed_(last_analyzed),
cpu_speed_(cpu_speed),
disk_seq_read_speed_(disk_seq_read_speed),
disk_rnd_read_speed_(disk_rnd_read_speed),
network_speed_(network_speed) {}
virtual ~ObOptSystemStat() {}
int64_t get_last_analyzed() const { return last_analyzed_; }
void set_last_analyzed(int64_t last_analyzed) { last_analyzed_ = last_analyzed; }
int64_t get_cpu_speed() const { return cpu_speed_; }
void set_cpu_speed(int64_t cpu_speed) { cpu_speed_ = cpu_speed; }
int64_t get_disk_seq_read_speed() const { return disk_seq_read_speed_; }
void set_disk_seq_read_speed(int64_t disk_seq_read_speed) { disk_seq_read_speed_ = disk_seq_read_speed; }
int64_t get_disk_rnd_read_speed() const { return disk_rnd_read_speed_; }
void set_disk_rnd_read_speed(int64_t disk_rnd_read_speed) { disk_rnd_read_speed_ = disk_rnd_read_speed; }
int64_t get_network_speed() const { return network_speed_; }
void set_network_speed(int64_t network_speed) { network_speed_ = network_speed; }
virtual int64_t size() const
{
return sizeof(*this);
}
virtual int deep_copy(char *buf, const int64_t buf_len, ObIKVCacheValue *&value) const
{
int ret = OB_SUCCESS;
ObOptSystemStat *tstat = nullptr;
if (nullptr == buf || buf_len < size()) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(WARN, "invalid argument.", K(ret), KP(buf), K(buf_len), K(size()));
} else {
tstat = new (buf) ObOptSystemStat();
*tstat = *this;
value = tstat;
}
return ret;
}
virtual int deep_copy(char *buf, const int64_t buf_len, ObOptSystemStat *&stat) const
{
int ret = OB_SUCCESS;
ObOptSystemStat *tstat = nullptr;
if (nullptr == buf || buf_len < size()) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(WARN, "invalid argument.", K(ret), KP(buf), K(buf_len), K(size()));
} else {
tstat = new (buf) ObOptSystemStat();
*tstat = *this;
stat = tstat;
}
return ret;
}
bool is_valid() const
{
return cpu_speed_ > 0 &&
disk_seq_read_speed_ > 0 &&
disk_rnd_read_speed_ > 0 &&
network_speed_ > 0;
}
void reset() {
last_analyzed_ = 0;
cpu_speed_ = 0;
disk_seq_read_speed_ = 0;
disk_rnd_read_speed_ = 0;
network_speed_ = 0;
}
TO_STRING_KV(K(last_analyzed_),
K(cpu_speed_),
K(disk_seq_read_speed_),
K(disk_rnd_read_speed_),
K(network_speed_));
private:
int64_t last_analyzed_;
int64_t cpu_speed_;
int64_t disk_seq_read_speed_;
int64_t disk_rnd_read_speed_;
int64_t network_speed_;
};
class OptSystemIoBenchmark {
public:
OptSystemIoBenchmark()
:disk_seq_read_speed_(0),
disk_rnd_read_speed_(0),
init_(false)
{}
~OptSystemIoBenchmark()
{}
inline bool is_init() const { return init_; }
inline int64_t get_disk_seq_read_speed() const { return disk_seq_read_speed_; }
inline int64_t get_disk_rnd_read_speed() const { return disk_rnd_read_speed_; }
void reset();
static OptSystemIoBenchmark& get_instance();
int run_benchmark(ObIAllocator &allocator);
private:
int64_t disk_seq_read_speed_;
int64_t disk_rnd_read_speed_;
bool init_;
DISALLOW_COPY_AND_ASSIGN(OptSystemIoBenchmark);
};
}
}
#endif /* _OB_OPT_SYSTEM_STAT_H_ */

View File

@ -0,0 +1,52 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "share/stat/ob_opt_system_stat_cache.h"
#include "lib/stat/ob_diagnose_info.h"
namespace oceanbase {
namespace common {
/**
* @return OB_SUCCESS if value corresponding to the key is successfully fetched
* OB_ENTRY_NOT_EXIST if values is not available from the cache
* other error codes if unexpected errors occurred
*/
int ObOptSystemStatCache::get_value(const ObOptSystemStat::Key &key, ObOptSystemStatHandle &handle)
{
int ret = OB_SUCCESS;
if (OB_FAIL(get(key, handle.stat_, handle.handle_))) {
if (OB_ENTRY_NOT_EXIST != ret) {
COMMON_LOG(WARN, "fail to get value from cache", K(ret), K(key));
}
EVENT_INC(ObStatEventIds::OPT_SYSTEM_STAT_CACHE_MISS);
} else {
handle.cache_ = this;
EVENT_INC(ObStatEventIds::OPT_SYSTEM_STAT_CACHE_HIT);
}
return ret;
}
int ObOptSystemStatCache::put_value(const ObOptSystemStat::Key &key, const ObOptSystemStat &value)
{
return put(key, value, true /* overwrite */);
}
int ObOptSystemStatCache::put_and_fetch_value(const ObOptSystemStat::Key &key,
const ObOptSystemStat &value,
ObOptSystemStatHandle &handle)
{
return put_and_fetch(key, value, handle.stat_, handle.handle_, true /* overwrite */ );
}
}
}

View File

@ -0,0 +1,67 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef _OB_OPT_SYSTEM_STAT_CACHE_H_
#define _OB_OPT_SYSTEM_STAT_CACHE_H_
#include "share/cache/ob_kv_storecache.h"
#include "share/stat/ob_opt_system_stat.h"
namespace oceanbase {
namespace common {
struct ObOptSystemStatHandle;
class ObOptSystemStatCache : public common::ObKVCache<ObOptSystemStat::Key, ObOptSystemStat>
{
public:
int get_value(const ObOptSystemStat::Key &key, ObOptSystemStatHandle &handle);
int put_value(const ObOptSystemStat::Key &key, const ObOptSystemStat &value);
int put_and_fetch_value(const ObOptSystemStat::Key &key,
const ObOptSystemStat &value,
ObOptSystemStatHandle &handle);
};
struct ObOptSystemStatHandle
{
const ObOptSystemStat *stat_;
ObOptSystemStatCache *cache_;
ObKVCacheHandle handle_;
ObOptSystemStatHandle()
: stat_(nullptr), cache_(nullptr), handle_() {}
ObOptSystemStatHandle(const ObOptSystemStatHandle &other)
{
if (this != &other) {
*this = other;
}
}
~ObOptSystemStatHandle()
{
stat_ = nullptr;
cache_ = nullptr;
}
void reset()
{
stat_ = nullptr;
cache_ = nullptr;
handle_.reset();
}
TO_STRING_KV(K(stat_));
};
}
}
#endif /* _OB_OPT_SYSTEM_STAT_CACHE_H_ */

View File

@ -667,6 +667,24 @@ struct ObSetColumnStatParam
};
struct ObSetSystemStatParam
{
ObSetSystemStatParam()
:tenant_id_(OB_INVALID_ID),
name_(),
value_(0)
{ }
int64_t tenant_id_;
ObString name_;
int64_t value_;
TO_STRING_KV(K(tenant_id_),
K(name_),
K(value_)
);
};
struct ObPartitionStatInfo
{
ObPartitionStatInfo():