change the update stat cache way, use the current worker thread instead of RS worker thread

This commit is contained in:
wangt1xiuyi
2023-02-21 05:41:51 +00:00
committed by ob-robot
parent ed67ac652e
commit 7afa91237f
6 changed files with 140 additions and 165 deletions

View File

@ -28,6 +28,7 @@
#include "lib/timezone/ob_time_convert.h"
#include "sql/das/ob_das_location_router.h"
#include "sql/ob_sql_utils.h"
#include "storage/ob_locality_manager.h"
namespace oceanbase
{
@ -64,7 +65,6 @@ int ObDbmsStats::gather_table_stats(ObExecContext &ctx, ParamStore &params, ObOb
{
int ret = OB_SUCCESS;
UNUSED(result);
obrpc::ObCommonRpcProxy *proxy = NULL;
ObTableStatParam stat_param;
bool is_all_fast_gather = false;
ObSEArray<int64_t, 4> no_gather_index_ids;
@ -73,8 +73,6 @@ int ObDbmsStats::gather_table_stats(ObExecContext &ctx, ParamStore &params, ObOb
} else if (OB_ISNULL(ctx.get_my_session()) || OB_ISNULL(ctx.get_task_executor_ctx())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected error", K(ret), K(ctx.get_my_session()), K(ctx.get_task_executor_ctx()));
} else if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("failed to get common rpc", K(ret));
} else if (OB_FAIL(parse_table_part_info(ctx,
params.at(0),
params.at(1),
@ -102,7 +100,7 @@ int ObDbmsStats::gather_table_stats(ObExecContext &ctx, ParamStore &params, ObOb
LOG_WARN("failed to do flush database monitoring info", K(ret));
} else if (OB_FAIL(ObDbmsStatsExecutor::gather_table_stats(ctx, stat_param))) {
LOG_WARN("failed to gather table stats", K(ret));
} else if (OB_FAIL(update_stat_cache(proxy, stat_param))) {
} else if (OB_FAIL(update_stat_cache(ctx.get_my_session()->get_rpc_tenant_id(), stat_param))) {
LOG_WARN("failed to update stat cache", K(ret));
} else if (stat_param.cascade_ &&
OB_FAIL(fast_gather_index_stats(ctx, stat_param,
@ -143,14 +141,11 @@ int ObDbmsStats::gather_schema_stats(ObExecContext &ctx, ParamStore &params, ObO
UNUSED(result);
ObTableStatParam global_param;
ObSEArray<uint64_t, 4> table_ids;
obrpc::ObCommonRpcProxy *proxy = NULL;
if (OB_FAIL(check_statistic_table_writeable(ctx))) {
LOG_WARN("failed to check tenant is restore", K(ret));
} else if (OB_ISNULL(ctx.get_my_session()) || OB_ISNULL(ctx.get_task_executor_ctx())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected error", K(ret), K(ctx.get_my_session()), K(ctx.get_task_executor_ctx()));
} else if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("failed to get common rpc", K(ret));
} else if (OB_FAIL(ObOptStatMonitorManager::flush_database_monitoring_info(ctx, false, true))) {
LOG_WARN("failed to do flush database monitoring info", K(ret));
} else if (OB_FAIL(get_all_table_ids_in_database(ctx, params.at(0), global_param, table_ids))) {
@ -190,7 +185,7 @@ int ObDbmsStats::gather_schema_stats(ObExecContext &ctx, ParamStore &params, ObO
}
} else if (OB_FAIL(ObDbmsStatsExecutor::gather_table_stats(ctx, stat_param))) {
LOG_WARN("failed to gather table stats", K(ret));
} else if (OB_FAIL(update_stat_cache(proxy, stat_param))) {
} else if (OB_FAIL(update_stat_cache(ctx.get_my_session()->get_rpc_tenant_id(), stat_param))) {
LOG_WARN("failed to update stat cache", K(ret));
} else if (stat_param.cascade_ &&
OB_FAIL(fast_gather_index_stats(ctx, stat_param,
@ -230,7 +225,6 @@ int ObDbmsStats::gather_index_stats(ObExecContext &ctx, ParamStore &params, ObOb
{
int ret = OB_SUCCESS;
UNUSED(result);
obrpc::ObCommonRpcProxy *proxy = NULL;
ObTableStatParam ind_stat_param;
ind_stat_param.is_index_stat_ = true;
ObObjParam empty_sample;
@ -241,8 +235,6 @@ int ObDbmsStats::gather_index_stats(ObExecContext &ctx, ParamStore &params, ObOb
empty_cascade.set_null();
if (OB_FAIL(check_statistic_table_writeable(ctx))) {
LOG_WARN("failed to check tenant is restore", K(ret));
} else if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("failed to get common rpc", K(ret));
} else if (lib::is_oracle_mode() && !params.at(11).is_null()) {
ret = OB_ERR_DBMS_STATS_PL;
LOG_WARN("table name shouldn't be specified in gather index stats", K(ret));
@ -279,7 +271,7 @@ int ObDbmsStats::gather_index_stats(ObExecContext &ctx, ParamStore &params, ObOb
LOG_WARN("failed check stat locked", K(ret));
} else if (OB_FAIL(ObDbmsStatsExecutor::gather_index_stats(ctx, ind_stat_param))) {
LOG_WARN("failed to gather table stats", K(ret));
} else if (OB_FAIL(update_stat_cache(proxy, ind_stat_param))) {
} else if (OB_FAIL(update_stat_cache(ctx.get_my_session()->get_rpc_tenant_id(), ind_stat_param))) {
LOG_WARN("failed to update stat cache", K(ret));
} else {
LOG_TRACE("succeed to gather index stats", K(ind_stat_param));
@ -292,12 +284,8 @@ int ObDbmsStats::gather_table_index_stats(ObExecContext &ctx,
ObIArray<int64_t> &no_gather_index_ids)
{
int ret = OB_SUCCESS;
obrpc::ObCommonRpcProxy *proxy = NULL;
share::schema::ObSchemaGetterGuard *schema_guard = ctx.get_virtual_table_ctx().schema_guard_;
int64_t start_time = ObTimeUtility::current_time();
if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("failed to get common rpc", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < no_gather_index_ids.count(); ++i) {
StatTable stat_table;
stat_table.database_id_ = data_param.db_id_;
@ -337,14 +325,13 @@ int ObDbmsStats::gather_table_index_stats(ObExecContext &ctx,
LOG_WARN("failed to get valid duration time", K(ret));
} else if (OB_FAIL(ObDbmsStatsExecutor::gather_index_stats(ctx, index_param))) {
LOG_WARN("failed to gather table stats", K(ret));
} else if (OB_FAIL(update_stat_cache(proxy, index_param))) {
} else if (OB_FAIL(update_stat_cache(ctx.get_my_session()->get_rpc_tenant_id(), index_param))) {
LOG_WARN("failed to update stat cache", K(ret));
} else {
LOG_TRACE("Succeed to gather index stats", K(data_param), K(index_param));
}
}
}
}
return ret;
}
@ -358,10 +345,7 @@ int ObDbmsStats::fast_gather_index_stats(ObExecContext &ctx,
is_all_fast_gather = true;
ObSEArray<ObAuxTableMetaInfo, 4> simple_index_infos;
share::schema::ObSchemaGetterGuard *schema_guard = ctx.get_virtual_table_ctx().schema_guard_;
obrpc::ObCommonRpcProxy *proxy = NULL;
if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("failed to get common rpc", K(ret));
} else if (OB_FAIL(get_table_index_infos(ctx, data_param.table_id_, simple_index_infos))) {
if (OB_FAIL(get_table_index_infos(ctx, data_param.table_id_, simple_index_infos))) {
LOG_WARN("failed to get table index infos", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < simple_index_infos.count(); ++i) {
@ -418,7 +402,7 @@ int ObDbmsStats::fast_gather_index_stats(ObExecContext &ctx,
is_all_fast_gather &= is_fast_gather;
LOG_TRACE("can't fast gather index stats", K(data_param), K(index_param));
}
} else if (OB_FAIL(update_stat_cache(proxy, index_param))) {
} else if (OB_FAIL(update_stat_cache(ctx.get_my_session()->get_rpc_tenant_id(), index_param))) {
LOG_WARN("failed to update stat cache", K(ret));
} else {
is_all_fast_gather &= is_fast_gather;
@ -457,11 +441,8 @@ int ObDbmsStats::set_table_stats(ObExecContext &ctx, ParamStore &params, ObObj &
int ret = OB_SUCCESS;
UNUSED(result);
ObSetTableStatParam param;
obrpc::ObCommonRpcProxy *proxy = NULL;
if (OB_FAIL(check_statistic_table_writeable(ctx))) {
LOG_WARN("failed to check tenant is restore", K(ret));
} else if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("failed to get common rpc", K(ret));
} else if (OB_FAIL(parse_set_table_info(ctx,
params.at(0),
params.at(1),
@ -492,7 +473,7 @@ int ObDbmsStats::set_table_stats(ObExecContext &ctx, ParamStore &params, ObObj &
LOG_WARN("failed check stat locked", K(ret));
} else if (OB_FAIL(ObDbmsStatsExecutor::set_table_stats(ctx, param))) {
LOG_WARN("failed to set table stats", K(ret));
} else if (OB_FAIL(update_stat_cache(proxy, param.table_param_))) {
} else if (OB_FAIL(update_stat_cache(ctx.get_my_session()->get_rpc_tenant_id(), param.table_param_))) {
LOG_WARN("failed to update stat cache", K(ret));
} else {
LOG_TRACE("succeed to set table stat", K(param));
@ -537,11 +518,8 @@ int ObDbmsStats::set_column_stats(sql::ObExecContext &ctx,
int ret = OB_SUCCESS;
UNUSED(result);
ObSetColumnStatParam param;
obrpc::ObCommonRpcProxy *proxy = NULL;
if (OB_FAIL(check_statistic_table_writeable(ctx))) {
LOG_WARN("failed to check tenant is restore", K(ret));
} else if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("failed to get common rpc", K(ret));
} else if (params.at(2).is_null() && !params.at(1).is_null()) {
//do nothing
} else if (OB_FAIL(parse_set_column_stats(ctx,
@ -584,7 +562,7 @@ int ObDbmsStats::set_column_stats(sql::ObExecContext &ctx,
LOG_WARN("failed check stat locked", K(ret));
} else if (OB_FAIL(ObDbmsStatsExecutor::set_column_stats(ctx, param))) {
LOG_WARN("failed to set column stats", K(ret));
} else if (OB_FAIL(update_stat_cache(proxy, param.table_param_))) {
} else if (OB_FAIL(update_stat_cache(ctx.get_my_session()->get_rpc_tenant_id(), param.table_param_))) {
LOG_WARN("failed to update stat cache", K(ret));
} else {
LOG_TRACE("succeed to set column stats", K(param));
@ -629,15 +607,12 @@ int ObDbmsStats::set_index_stats(ObExecContext &ctx, ParamStore &params, ObObj &
ObSetTableStatParam set_index_param;
ObTableStatParam index_stat_param;
index_stat_param.is_index_stat_ = true;
obrpc::ObCommonRpcProxy *proxy = NULL;
number::ObNumber num_numrows;
number::ObNumber num_avgrlen;
number::ObNumber num_nummacroblks;
number::ObNumber num_nummicroblks;
if (OB_FAIL(check_statistic_table_writeable(ctx))) {
LOG_WARN("failed to check tenant is restore", K(ret));
} else if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("failed to get common rpc", K(ret));
} else if (lib::is_oracle_mode() && !params.at(22).is_null()) {
ret = OB_ERR_DBMS_STATS_PL;
LOG_WARN("table name shouldn't be specified in gather index stats", K(ret));
@ -702,7 +677,8 @@ int ObDbmsStats::set_index_stats(ObExecContext &ctx, ParamStore &params, ObObj &
LOG_WARN("failed check stat locked", K(ret));
} else if (OB_FAIL(ObDbmsStatsExecutor::set_table_stats(ctx, set_index_param))) {
LOG_WARN("failed to set table stats", K(ret));
} else if (OB_FAIL(update_stat_cache(proxy, set_index_param.table_param_))) {
} else if (OB_FAIL(update_stat_cache(ctx.get_my_session()->get_rpc_tenant_id(),
set_index_param.table_param_))) {
LOG_WARN("failed to update stat cache", K(ret));
} else {
LOG_TRACE("succeed to set index stat", K(set_index_param));
@ -732,15 +708,12 @@ int ObDbmsStats::delete_table_stats(ObExecContext &ctx, ParamStore &params, ObOb
{
int ret = OB_SUCCESS;
UNUSED(result);
obrpc::ObCommonRpcProxy *proxy = NULL;
ObTableStatParam stat_param;
bool cascade_parts = false;
bool cascade_columns = false;
bool cascade_indexes = false;
if (OB_FAIL(check_statistic_table_writeable(ctx))) {
LOG_WARN("failed to check tenant is restore", K(ret));
} else if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("failed to get common rpc", K(ret));
} else if (OB_FAIL(parse_table_part_info(ctx,
params.at(0),
params.at(1),
@ -794,7 +767,7 @@ int ObDbmsStats::delete_table_stats(ObExecContext &ctx, ParamStore &params, ObOb
stat_param,
cascade_columns))) {
LOG_WARN("failed to delete table stats", K(ret));
} else if (OB_FAIL(update_stat_cache(proxy, stat_param))) {
} else if (OB_FAIL(update_stat_cache(ctx.get_my_session()->get_rpc_tenant_id(), stat_param))) {
LOG_WARN("failed to update stat cache", K(ret));
} else if (cascade_indexes && stat_param.part_name_.empty()) {
if (OB_FAIL(delete_table_index_stats(ctx, stat_param))) {
@ -827,15 +800,12 @@ int ObDbmsStats::delete_column_stats(ObExecContext &ctx, ParamStore &params, ObO
{
int ret = OB_SUCCESS;
UNUSED(result);
obrpc::ObCommonRpcProxy *proxy = NULL;
ObTableStatParam stat_param;
ObString col_stat_type("ALL");
bool cascade_parts = false;
bool only_histogram = false;
if (OB_FAIL(check_statistic_table_writeable(ctx))) {
LOG_WARN("failed to check tenant is restore", K(ret));
} else if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("failed to get common rpc", K(ret));
} else if (OB_FAIL(parse_table_part_info(ctx,
params.at(0),
params.at(1),
@ -899,7 +869,7 @@ int ObDbmsStats::delete_column_stats(ObExecContext &ctx, ParamStore &params, ObO
stat_param,
only_histogram))) {
LOG_WARN("failed to delete table stats", K(ret));
} else if (OB_FAIL(update_stat_cache(proxy, stat_param))) {
} else if (OB_FAIL(update_stat_cache(ctx.get_my_session()->get_rpc_tenant_id(), stat_param))) {
LOG_WARN("failed to update stat cache", K(ret));
}
}
@ -925,11 +895,8 @@ int ObDbmsStats::delete_schema_stats(ObExecContext &ctx, ParamStore &params, ObO
UNUSED(result);
ObTableStatParam global_param;
ObSEArray<uint64_t, 4> table_ids;
obrpc::ObCommonRpcProxy *proxy = NULL;
if (OB_FAIL(check_statistic_table_writeable(ctx))) {
LOG_WARN("failed to check tenant is restore", K(ret));
} else if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("failed to get common rpc", K(ret));
} else if (OB_FAIL(get_all_table_ids_in_database(ctx, params.at(0), global_param, table_ids))) {
LOG_WARN("failed to get all table ids in database", K(ret));
} else {
@ -962,7 +929,7 @@ int ObDbmsStats::delete_schema_stats(ObExecContext &ctx, ParamStore &params, ObO
}
} else if (OB_FAIL(ObDbmsStatsExecutor::delete_table_stats(ctx, stat_param, true))) {
LOG_WARN("failed to delete table stats", K(ret));
} else if (OB_FAIL(update_stat_cache(proxy, stat_param))) {
} else if (OB_FAIL(update_stat_cache(ctx.get_my_session()->get_rpc_tenant_id(), stat_param))) {
LOG_WARN("failed to update stat cache", K(ret));
} else {
LOG_TRACE("Succeed to delete table stats", K(stat_param));
@ -995,15 +962,12 @@ int ObDbmsStats::delete_index_stats(ObExecContext &ctx, ParamStore &params, ObOb
{
int ret = OB_SUCCESS;
UNUSED(result);
obrpc::ObCommonRpcProxy *proxy = NULL;
ObTableStatParam index_stat_param;
index_stat_param.is_index_stat_ = true;
bool cascade_parts = false;
bool only_histogram = false;
if (OB_FAIL(check_statistic_table_writeable(ctx))) {
LOG_WARN("failed to check tenant is restore", K(ret));
} else if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("failed to get common rpc", K(ret));
} else if (lib::is_oracle_mode() && !params.at(10).is_null()) {
ret = OB_ERR_DBMS_STATS_PL;
LOG_WARN("table name shouldn't be specified in gather index stats", K(ret));
@ -1056,7 +1020,7 @@ int ObDbmsStats::delete_index_stats(ObExecContext &ctx, ParamStore &params, ObOb
LOG_WARN("failed check stat locked", K(ret));
} else if (OB_FAIL(ObDbmsStatsExecutor::delete_table_stats(ctx, index_stat_param, false))) {
LOG_WARN("failed to delete table stats", K(ret));
} else if (OB_FAIL(update_stat_cache(proxy, index_stat_param))) {
} else if (OB_FAIL(update_stat_cache(ctx.get_my_session()->get_rpc_tenant_id(), index_stat_param))) {
LOG_WARN("failed to update stat cache", K(ret));
}
}
@ -1068,10 +1032,7 @@ int ObDbmsStats::delete_table_index_stats(sql::ObExecContext &ctx,
{
int ret = OB_SUCCESS;
ObSEArray<ObAuxTableMetaInfo, 4> simple_index_infos;
obrpc::ObCommonRpcProxy *proxy = NULL;
if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("failed to get common rpc", K(ret));
} else if (OB_FAIL(get_table_index_infos(ctx, data_param.table_id_, simple_index_infos))) {
if (OB_FAIL(get_table_index_infos(ctx, data_param.table_id_, simple_index_infos))) {
LOG_WARN("failed to get table index infos", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < simple_index_infos.count(); ++i) {
@ -1094,7 +1055,7 @@ int ObDbmsStats::delete_table_index_stats(sql::ObExecContext &ctx,
LOG_WARN("failed check stat locked", K(ret));
} else if (OB_FAIL(ObDbmsStatsExecutor::delete_table_stats(ctx, index_param, false))) {
LOG_WARN("failed to delete table stats", K(ret));
} else if (OB_FAIL(update_stat_cache(proxy, index_param))) {
} else if (OB_FAIL(update_stat_cache(ctx.get_my_session()->get_rpc_tenant_id(), index_param))) {
LOG_WARN("failed to update stat cache", K(ret));
} else {/*do nothing*/}
}
@ -1556,12 +1517,9 @@ int ObDbmsStats::import_table_stats(ObExecContext &ctx, ParamStore &params, ObOb
SMART_VAR(ObTableStatParam, stat_table_param) {
ObTableStatParam stat_param;
bool cascade_index = false;
obrpc::ObCommonRpcProxy *proxy = NULL;
const share::schema::ObTableSchema *table_schema = NULL;
if (OB_FAIL(check_statistic_table_writeable(ctx))) {
LOG_WARN("failed to check tenant is restore", K(ret));
} else if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("failed to get common rpc", K(ret));
} else if (OB_FAIL(parse_table_part_info(ctx,
params.at(0),
params.at(1),
@ -1634,7 +1592,7 @@ int ObDbmsStats::import_table_stats(ObExecContext &ctx, ParamStore &params, ObOb
LOG_WARN("failed check stat locked", K(ret));
} else if (OB_FAIL(ObDbmsStatsExportImport::import_table_stats(ctx, stat_param))) {
LOG_WARN("failed to import table stats", K(ret));
} else if (OB_FAIL(update_stat_cache(proxy, stat_param))) {
} else if (OB_FAIL(update_stat_cache(ctx.get_my_session()->get_rpc_tenant_id(), stat_param))) {
LOG_WARN("failed to update stat cache", K(ret));
} else if (cascade_index && stat_param.part_name_.empty() &&
OB_FAIL(import_table_index_stats(ctx, stat_param))) {
@ -1672,11 +1630,8 @@ int ObDbmsStats::import_column_stats(sql::ObExecContext &ctx,
ObTableStatParam stat_table_param;
const share::schema::ObTableSchema *table_schema = NULL;
stat_param.cascade_ = true;
obrpc::ObCommonRpcProxy *proxy = NULL;
if (OB_FAIL(check_statistic_table_writeable(ctx))) {
LOG_WARN("failed to check tenant is restore", K(ret));
} else if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("failed to get common rpc", K(ret));
} else if (OB_FAIL(parse_table_part_info(ctx,
params.at(0),
params.at(1),
@ -1733,7 +1688,7 @@ int ObDbmsStats::import_column_stats(sql::ObExecContext &ctx,
LOG_WARN("failed check stat locked", K(ret));
} else if (OB_FAIL(ObDbmsStatsExportImport::import_column_stats(ctx, stat_param))) {
LOG_WARN("failed to import column stats", K(ret));
} else if (OB_FAIL(update_stat_cache(proxy, stat_param))) {
} else if (OB_FAIL(update_stat_cache(ctx.get_my_session()->get_rpc_tenant_id(), stat_param))) {
LOG_WARN("failed to update stat cache", K(ret));
} else {
LOG_TRACE("succeed to import column stats", K(stat_param));
@ -1762,11 +1717,8 @@ int ObDbmsStats::import_schema_stats(ObExecContext &ctx, ParamStore &params, ObO
ObTableStatParam stat_table_param;
const share::schema::ObTableSchema *table_schema = NULL;
ObSEArray<uint64_t, 4> table_ids;
obrpc::ObCommonRpcProxy *proxy = NULL;
if (OB_FAIL(check_statistic_table_writeable(ctx))) {
LOG_WARN("failed to check tenant is restore", K(ret));
} else if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("failed to get common rpc", K(ret));
} else if (OB_FAIL(get_all_table_ids_in_database(ctx, params.at(0), global_param, table_ids))) {
LOG_WARN("failed to get all table ids in database", K(ret));
} else if (OB_FAIL(parse_table_info(ctx,
@ -1818,7 +1770,7 @@ int ObDbmsStats::import_schema_stats(ObExecContext &ctx, ParamStore &params, ObO
}
} else if (OB_FAIL(ObDbmsStatsExportImport::import_table_stats(ctx, stat_param))) {
LOG_WARN("failed to import table stats", K(ret));
} else if (OB_FAIL(update_stat_cache(proxy, stat_param))) {
} else if (OB_FAIL(update_stat_cache(ctx.get_my_session()->get_rpc_tenant_id(), stat_param))) {
LOG_WARN("failed to update stat cache", K(ret));
} else if (OB_FAIL(import_table_index_stats(ctx, stat_param))) {
LOG_WARN("failed to import table index stats", K(ret));
@ -1855,12 +1807,9 @@ int ObDbmsStats::import_index_stats(ObExecContext &ctx, ParamStore &params, ObOb
ObTableStatParam stat_table_param;
ObTableStatParam index_stat_param;
index_stat_param.is_index_stat_ = true;
obrpc::ObCommonRpcProxy *proxy = NULL;
const share::schema::ObTableSchema *table_schema = NULL;
if (OB_FAIL(check_statistic_table_writeable(ctx))) {
LOG_WARN("failed to check tenant is restore", K(ret));
} else if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("failed to get common rpc", K(ret));
} else if (lib::is_oracle_mode() && !params.at(8).is_null()) {
ret = OB_ERR_DBMS_STATS_PL;
LOG_WARN("table name shouldn't be specified in gather index stats", K(ret));
@ -1927,7 +1876,7 @@ int ObDbmsStats::import_index_stats(ObExecContext &ctx, ParamStore &params, ObOb
LOG_WARN("failed check stat locked", K(ret));
} else if (OB_FAIL(ObDbmsStatsExportImport::import_table_stats(ctx, index_stat_param))) {
LOG_WARN("failed to import table stats", K(ret));
} else if (OB_FAIL(update_stat_cache(proxy, index_stat_param))) {
} else if (OB_FAIL(update_stat_cache(ctx.get_my_session()->get_rpc_tenant_id(), index_stat_param))) {
LOG_WARN("failed to update stat cache", K(ret));
} else {
LOG_TRACE("succeed to import index stats", K(index_stat_param));
@ -1940,10 +1889,7 @@ int ObDbmsStats::import_table_index_stats(sql::ObExecContext &ctx,
{
int ret = OB_SUCCESS;
ObSEArray<ObAuxTableMetaInfo, 4> simple_index_infos;
obrpc::ObCommonRpcProxy *proxy = NULL;
if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("failed to get common rpc", K(ret));
} else if (OB_FAIL(get_table_index_infos(ctx, data_param.table_id_, simple_index_infos))) {
if (OB_FAIL(get_table_index_infos(ctx, data_param.table_id_, simple_index_infos))) {
LOG_WARN("failed to get table index infos", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < simple_index_infos.count(); ++i) {
@ -1967,7 +1913,7 @@ int ObDbmsStats::import_table_index_stats(sql::ObExecContext &ctx,
LOG_WARN("failed check stat locked", K(ret));
} else if (OB_FAIL(ObDbmsStatsExportImport::import_table_stats(ctx, index_param))) {
LOG_WARN("failed to import table stats", K(ret));
} else if (OB_FAIL(update_stat_cache(proxy, index_param))) {
} else if (OB_FAIL(update_stat_cache(ctx.get_my_session()->get_rpc_tenant_id(), index_param))) {
LOG_WARN("failed to update stat cache", K(ret));
} else {
LOG_TRACE("succeed to import table index stats", K(index_param));
@ -2339,11 +2285,8 @@ int ObDbmsStats::restore_table_stats(sql::ObExecContext &ctx,
ObString stat_type_str;
bool restore_cluster_index = false;
int64_t specify_time = 0;
obrpc::ObCommonRpcProxy *proxy = NULL;
if (OB_FAIL(check_statistic_table_writeable(ctx))) {
LOG_WARN("failed to check tenant is restore", K(ret));
} else if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("failed to get common rpc", K(ret));
} else if (OB_FAIL(parse_table_part_info(ctx,
params.at(0),
params.at(1),
@ -2421,7 +2364,7 @@ int ObDbmsStats::restore_table_stats(sql::ObExecContext &ctx,
stat_param,
specify_time))) {
LOG_WARN("failed restore table stats", K(ret));
} else if (OB_FAIL(update_stat_cache(proxy, stat_param))) {
} else if (OB_FAIL(update_stat_cache(ctx.get_my_session()->get_rpc_tenant_id(), stat_param))) {
LOG_WARN("failed to update stat cache", K(ret));
} else {/*do nothing*/}
return ret;
@ -2446,12 +2389,9 @@ int ObDbmsStats::restore_schema_stats(sql::ObExecContext &ctx,
UNUSED(result);
ObTableStatParam global_param;
ObSEArray<uint64_t, 4> table_ids;
obrpc::ObCommonRpcProxy *proxy = NULL;
int64_t specify_time = 0;
if (OB_FAIL(check_statistic_table_writeable(ctx))) {
LOG_WARN("failed to check tenant is restore", K(ret));
} else if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("failed to get common rpc", K(ret));
} else if (OB_FAIL(get_all_table_ids_in_database(ctx, params.at(0), global_param, table_ids))) {
LOG_WARN("failed to get all table ids in database", K(ret));
} else if (lib::is_oracle_mode()) {
@ -2501,7 +2441,7 @@ int ObDbmsStats::restore_schema_stats(sql::ObExecContext &ctx,
stat_param,
specify_time))) {
LOG_WARN("failed restore table stats", K(ret));
} else if (OB_FAIL(update_stat_cache(proxy, stat_param))) {
} else if (OB_FAIL(update_stat_cache(ctx.get_my_session()->get_rpc_tenant_id(), stat_param))) {
LOG_WARN("failed to update stat cache", K(ret));
} else {
LOG_TRACE("Succeed to restore table stats", K(stat_param), K(specify_time));
@ -3020,7 +2960,7 @@ int ObDbmsStats::delete_table_prefs(sql::ObExecContext &ctx,
return ret;
}
int ObDbmsStats::update_stat_cache(obrpc::ObCommonRpcProxy *proxy,
int ObDbmsStats::update_stat_cache(const uint64_t rpc_tenant_id,
const ObTableStatParam &param)
{
int ret = OB_SUCCESS;
@ -3051,19 +2991,47 @@ int ObDbmsStats::update_stat_cache(obrpc::ObCommonRpcProxy *proxy,
}
if (OB_SUCC(ret)) {
LOG_TRACE("update stat cache", K(stat_arg));
if (OB_ISNULL(proxy)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("proxy is null", K(ret), K(proxy));
} else if (OB_FAIL(proxy->update_stat_cache(stat_arg))) {
bool evict_plan_failed = false;
int64_t timeout = -1;
ObSEArray<ObServerLocality, 8> all_server_arr;
bool has_read_only_zone = false; // UNUSED;
if (OB_ISNULL(GCTX.srv_rpc_proxy_) || OB_ISNULL(GCTX.locality_manager_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("rpc_proxy or session is null", K(ret), K(GCTX.srv_rpc_proxy_), K(GCTX.locality_manager_));
} else if (OB_FAIL(GCTX.locality_manager_->get_server_locality_array(all_server_arr,
has_read_only_zone))) {
LOG_WARN("fail to get server locality", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < all_server_arr.count(); i++) {
if (!all_server_arr.at(i).is_active()
|| ObServerStatus::OB_SERVER_ACTIVE != all_server_arr.at(i).get_server_status()
|| 0 == all_server_arr.at(i).get_start_service_time()
|| 0 != all_server_arr.at(i).get_server_stop_time()) {
//server may not serving
} else if (0 >= (timeout = THIS_WORKER.get_timeout_remain())) {
ret = OB_TIMEOUT;
LOG_WARN("query timeout is reached", K(ret), K(timeout));
} else if (OB_FAIL(GCTX.srv_rpc_proxy_->to(all_server_arr.at(i).get_addr())
.timeout(timeout)
.by(rpc_tenant_id)
.update_local_stat_cache(stat_arg))) {
// OB_SQL_PC_NOT_EXIST represent evict plan failed
if (OB_SQL_PC_NOT_EXIST == ret) {
ret = OB_SUCCESS;
evict_plan_failed = true;
} else {
LOG_WARN("failed to update local stat cache", K(ret));
}
}
}
if (OB_SUCC(ret) && evict_plan_failed) {
ret = OB_ERR_DBMS_STATS_PL;
LOG_USER_ERROR(OB_ERR_DBMS_STATS_PL,
"Evict plan cache failed. SQL execution will use origin plan. " \
"Try flush plan cache manually if you want to generate plan with " \
"latest statistics.");
} else {
LOG_WARN("failed to update stat cache", K(ret));
LOG_TRACE("Succeed to update stat cache", K(param), K(stat_arg), K(all_server_arr));
}
}
}
@ -5681,7 +5649,6 @@ int ObDbmsStats::gather_table_stats_with_default_param(ObExecContext &ctx,
{
int ret = OB_SUCCESS;
ObTableStatParam stat_param;
obrpc::ObCommonRpcProxy *proxy = NULL;
stat_param.db_id_ = stat_table.database_id_;
bool is_all_fast_gather = false;
ObSEArray<int64_t, 4> no_gather_index_ids;
@ -5689,8 +5656,6 @@ int ObDbmsStats::gather_table_stats_with_default_param(ObExecContext &ctx,
duration_time,
stat_param.duration_time_))) {
LOG_WARN("failed to get valid duration time", K(ret));
} else if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("failed to get common rpc", K(ret));
} else if (OB_FAIL(parse_table_part_info(ctx, stat_table, stat_param))) {
LOG_WARN("failed to parse owner", K(ret));
} else if (OB_FAIL(use_default_gather_stat_options(ctx, stat_table, stat_param))) {
@ -5704,7 +5669,7 @@ int ObDbmsStats::gather_table_stats_with_default_param(ObExecContext &ctx,
LOG_WARN("failed to adjust table stat param", K(ret));
} else if (OB_FAIL(ObDbmsStatsExecutor::gather_table_stats(ctx, stat_param))) {
LOG_WARN("failed to gather table stats", K(ret));
} else if (OB_FAIL(update_stat_cache(proxy, stat_param))) {
} else if (OB_FAIL(update_stat_cache(ctx.get_my_session()->get_rpc_tenant_id(), stat_param))) {
LOG_WARN("failed to update stat cache", K(ret));
//refresh duration time
} else if (OB_FAIL(ObDbmsStatsUtils::get_valid_duration_time(start_time,

View File

@ -369,7 +369,7 @@ public:
static int get_part_ids_from_schema(const share::schema::ObTableSchema *table_schema,
common::ObIArray<ObObjectID> &target_part_ids);
static int update_stat_cache(obrpc::ObCommonRpcProxy *proxy,
static int update_stat_cache(const uint64_t rpc_tenant_id,
const ObTableStatParam &param);
static int parse_set_table_stat_options(ObExecContext &ctx,

View File

@ -783,7 +783,6 @@ int ObDbmsStatsExecutor::update_stat_online(ObExecContext &ctx,
int ret = OB_SUCCESS;
int64_t affected_rows = 0;
obrpc::ObCommonRpcProxy *proxy = NULL;
//before write, we need record history stats.
ObSEArray<ObOptTableStatHandle, 4> history_tab_handles;
@ -802,8 +801,6 @@ int ObDbmsStatsExecutor::update_stat_online(ObExecContext &ctx,
LOG_WARN("fail to check lock stat", K(ret));
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("fail to get proxy", K(ret));
} else if (OB_FAIL(ObDbmsStatsUtils::get_part_ids_from_param(param, part_ids))) {
//part id should generated after check stat locked, since check_stat_locked will change part_info
LOG_WARN("fail to get part_ids");
@ -832,7 +829,7 @@ int ObDbmsStatsExecutor::update_stat_online(ObExecContext &ctx,
} else if (OB_FAIL(ObBasicStatsEstimator::update_last_modified_count(ctx, param))) {
//update history
LOG_WARN("failed to update last modified count", K(ret));
} else if (OB_FAIL(pl::ObDbmsStats::update_stat_cache(proxy, param))) {
} else if (OB_FAIL(pl::ObDbmsStats::update_stat_cache(ctx.get_my_session()->get_rpc_tenant_id(), param))) {
LOG_WARN("fail to update stat cache", K(ret));
} else {
// should reuse stats out-side this function.

View File

@ -142,13 +142,10 @@ int ObIncrementalStatEstimator::write_all_opt_stats_by_dircet_load(
ObIArray<ObOptColumnStat *> &all_cstats)
{
int ret = OB_SUCCESS;
obrpc::ObCommonRpcProxy *proxy = NULL;
ObSEArray<ObOptTableStatHandle, 4> history_tab_handles;
ObSEArray<ObOptColumnStatHandle, 4> history_col_handles;
//before write, we need record history stats.
if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("fail to get proxy", K(ret));
} else if (OB_FAIL(ObDbmsStatsHistoryManager::get_history_stat_handles(ctx, param,
if (OB_FAIL(ObDbmsStatsHistoryManager::get_history_stat_handles(ctx, param,
history_tab_handles,
history_col_handles))) {
LOG_WARN("failed to get history stat handles", K(ret));
@ -160,7 +157,7 @@ int ObIncrementalStatEstimator::write_all_opt_stats_by_dircet_load(
LOG_WARN("failed to batch write history stats", K(ret));
} else if (OB_FAIL(ObBasicStatsEstimator::update_last_modified_count(ctx, param))) {
LOG_WARN("failed to update last modified count", K(ret));
} else if (OB_FAIL(pl::ObDbmsStats::update_stat_cache(proxy, param))) {
} else if (OB_FAIL(pl::ObDbmsStats::update_stat_cache(ctx.get_my_session()->get_rpc_tenant_id(), param))) {
LOG_WARN("fail to update stat cache", K(ret));
} else {/*do nothing*/}
return ret;

View File

@ -23,6 +23,7 @@
#include "share/schema/ob_schema_utils.h"
#include "lib/mysqlclient/ob_mysql_proxy.h"
#include "observer/ob_sql_client_decorator.h"
#include "storage/ob_locality_manager.h"
namespace oceanbase
{
@ -171,22 +172,40 @@ int ObOptStatMonitorManager::flush_database_monitoring_info(sql::ObExecContext &
const bool is_flush_dml_stat)
{
int ret = OB_SUCCESS;
obrpc::ObCommonRpcProxy *proxy = NULL;
if (OB_ISNULL(ctx.get_my_session())) {
int64_t timeout = -1;
ObSEArray<ObServerLocality, 8> all_server_arr;
bool has_read_only_zone = false; // UNUSED;
if (OB_ISNULL(ctx.get_my_session()) ||
OB_ISNULL(GCTX.srv_rpc_proxy_) ||
OB_ISNULL(GCTX.locality_manager_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("failed to get common rpc", K(ret));
} else if (OB_ISNULL(proxy)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("proxy is null", K(ret), K(proxy));
LOG_WARN("get unexpected null", K(ret), K(GCTX.srv_rpc_proxy_),
K(GCTX.locality_manager_), K(ctx.get_my_session()));
} else {
obrpc::ObFlushOptStatArg arg(ctx.get_my_session()->get_effective_tenant_id(),
is_flush_col_usage,
is_flush_dml_stat);
if (OB_FAIL(proxy->flush_opt_stat_monitoring_info(arg))) {
if (OB_FAIL(GCTX.locality_manager_->get_server_locality_array(all_server_arr,
has_read_only_zone))) {
LOG_WARN("fail to get server locality", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < all_server_arr.count(); i++) {
if (!all_server_arr.at(i).is_active()
|| ObServerStatus::OB_SERVER_ACTIVE != all_server_arr.at(i).get_server_status()
|| 0 == all_server_arr.at(i).get_start_service_time()
|| 0 != all_server_arr.at(i).get_server_stop_time()) {
//server may not serving
} else if (0 >= (timeout = THIS_WORKER.get_timeout_remain())) {
ret = OB_TIMEOUT;
LOG_WARN("query timeout is reached", K(ret), K(timeout));
} else if (OB_FAIL(GCTX.srv_rpc_proxy_->to(all_server_arr.at(i).get_addr())
.timeout(timeout)
.by(ctx.get_my_session()->get_rpc_tenant_id())
.flush_local_opt_stat_monitoring_info(arg))) {
LOG_WARN("failed to flush opt stat monitoring info", K(ret));
} else {/*do nothing*/}
}
}
}
}
return ret;
}

View File

@ -41,7 +41,6 @@ namespace sql
int ObAnalyzeExecutor::execute(ObExecContext &ctx, ObAnalyzeStmt &stmt)
{
int ret = OB_SUCCESS;
obrpc::ObCommonRpcProxy *proxy = NULL;
ObTableStatParam param;
share::schema::ObSchemaGetterGuard *schema_guard = ctx.get_virtual_table_ctx().schema_guard_;
ObSQLSessionInfo *session = ctx.get_my_session();
@ -59,13 +58,11 @@ int ObAnalyzeExecutor::execute(ObExecContext &ctx, ObAnalyzeStmt &stmt)
} else if (OB_FAIL(stmt.fill_table_stat_param(ctx, param))) {
LOG_WARN("failed to fill table stat param", K(ret));
} else if (!stmt.is_delete_histogram()) {
if (OB_FAIL(ctx.get_task_executor_ctx()->get_common_rpc(proxy))) {
LOG_WARN("failed to get common rpc", K(ret));
} else if (OB_FAIL(ObDbmsStatsLockUnlock::check_stat_locked(ctx, param))) {
if (OB_FAIL(ObDbmsStatsLockUnlock::check_stat_locked(ctx, param))) {
LOG_WARN("failed check stat locked", K(ret));
} else if (OB_FAIL(ObDbmsStatsExecutor::gather_table_stats(ctx, param))) {
LOG_WARN("failed to gather table stats", K(ret));
} else if (OB_FAIL(pl::ObDbmsStats::update_stat_cache(proxy, param))) {
} else if (OB_FAIL(pl::ObDbmsStats::update_stat_cache(session->get_rpc_tenant_id(), param))) {
LOG_WARN("failed to update stat cache", K(ret));
} else {
LOG_TRACE("succeed to gather table stats", K(param));