Adaptively adjust query timeout for refresh full simple table schemas
This commit is contained in:
@ -4408,47 +4408,59 @@ int ObSchemaServiceSQLImpl::fetch_tables(
|
||||
const int64_t snapshot_timestamp = schema_status.snapshot_timestamp_;
|
||||
const uint64_t exec_tenant_id = fill_exec_tenant_id(schema_status);
|
||||
const char *table_name = NULL;
|
||||
int64_t start_time = ObTimeUtility::current_time();
|
||||
if (!check_inner_stat()) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("check inner stat fail", K(ret));
|
||||
} else if (OB_FAIL(ObSchemaUtils::get_all_table_history_name(exec_tenant_id,
|
||||
table_name,
|
||||
schema_service_))) {
|
||||
LOG_WARN("fail to get all table name", K(ret), K(exec_tenant_id));
|
||||
} else if (!is_increase_schema) {
|
||||
if (OB_FAIL(sql.append_fmt(FETCH_ALL_TABLE_HISTORY_FULL_SCHEMA,
|
||||
table_name, table_name,
|
||||
fill_extract_tenant_id(schema_status, tenant_id),
|
||||
schema_version,
|
||||
fill_extract_schema_id(schema_status, OB_ALL_CORE_TABLE_TID)))) {
|
||||
LOG_WARN("append sql failed", K(ret));
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(sql.append_fmt(FETCH_ALL_TABLE_HISTORY_SQL3,
|
||||
table_name,
|
||||
fill_extract_tenant_id(schema_status, tenant_id)))) {
|
||||
LOG_WARN("append sql failed", K(ret));
|
||||
} else if (OB_FAIL(sql.append_fmt(" AND SCHEMA_VERSION <= %ld", schema_version))) {
|
||||
LOG_WARN("append sql failed", K(ret));
|
||||
} else if (OB_FAIL(sql.append_fmt(" AND table_id in"))) {
|
||||
LOG_WARN("append failed", K(ret));
|
||||
} else if (OB_FAIL(SQL_APPEND_SCHEMA_ID(table, schema_keys, schema_key_size, sql))) {
|
||||
LOG_WARN("sql append table id failed", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||
DEFINE_SQL_CLIENT_RETRY_WEAK_WITH_SNAPSHOT(sql_client, snapshot_timestamp);
|
||||
if (OB_FAIL(sql.append(" ORDER BY tenant_id desc, table_id desc, schema_version desc"))) {
|
||||
LOG_WARN("sql append failed", K(ret));
|
||||
} else if (OB_FAIL(sql_client_retry_weak.read(res, exec_tenant_id, sql.ptr()))) {
|
||||
LOG_WARN("execute sql failed", K(ret), K(tenant_id), K(sql));
|
||||
} else if (OB_UNLIKELY(NULL == (result = res.get_result()))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("fail to get result. ", K(ret));
|
||||
} else if (OB_FAIL(ObSchemaRetrieveUtils::retrieve_table_schema(tenant_id, *result, schema_array))) {
|
||||
LOG_WARN("failed to retrieve table schema", K(ret));
|
||||
ObTimeoutCtx ctx;
|
||||
if (OB_FAIL(ObSchemaUtils::get_all_table_history_name(exec_tenant_id,
|
||||
table_name,
|
||||
schema_service_))) {
|
||||
LOG_WARN("fail to get all table name", K(ret), K(exec_tenant_id));
|
||||
} else if (!is_increase_schema) {
|
||||
const char *tname = OB_ALL_TABLE_HISTORY_TNAME;
|
||||
if (OB_FAIL(set_refresh_full_schema_timeout_ctx_(sql_client, tenant_id, tname, ctx))) {
|
||||
LOG_WARN("fail to set refresh full schema timeout ctx", KR(ret), K(tenant_id), "tname", tname);
|
||||
} else if (OB_FAIL(sql.append_fmt(FETCH_ALL_TABLE_HISTORY_FULL_SCHEMA,
|
||||
table_name, table_name,
|
||||
fill_extract_tenant_id(schema_status, tenant_id),
|
||||
schema_version,
|
||||
fill_extract_schema_id(schema_status, OB_ALL_CORE_TABLE_TID)))) {
|
||||
LOG_WARN("append sql failed", K(ret));
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(sql.append_fmt(FETCH_ALL_TABLE_HISTORY_SQL3,
|
||||
table_name,
|
||||
fill_extract_tenant_id(schema_status, tenant_id)))) {
|
||||
LOG_WARN("append sql failed", K(ret));
|
||||
} else if (OB_FAIL(sql.append_fmt(" AND SCHEMA_VERSION <= %ld", schema_version))) {
|
||||
LOG_WARN("append sql failed", K(ret));
|
||||
} else if (OB_FAIL(sql.append_fmt(" AND table_id in"))) {
|
||||
LOG_WARN("append failed", K(ret));
|
||||
} else if (OB_FAIL(SQL_APPEND_SCHEMA_ID(table, schema_keys, schema_key_size, sql))) {
|
||||
LOG_WARN("sql append table id failed", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||
DEFINE_SQL_CLIENT_RETRY_WEAK_WITH_SNAPSHOT(sql_client, snapshot_timestamp);
|
||||
if (OB_FAIL(sql.append(" ORDER BY tenant_id desc, table_id desc, schema_version desc"))) {
|
||||
LOG_WARN("sql append failed", K(ret));
|
||||
} else if (OB_FAIL(sql_client_retry_weak.read(res, exec_tenant_id, sql.ptr()))) {
|
||||
LOG_WARN("execute sql failed", K(ret), K(tenant_id), K(sql));
|
||||
} else if (OB_UNLIKELY(NULL == (result = res.get_result()))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("fail to get result. ", K(ret));
|
||||
} else if (OB_FAIL(ObSchemaRetrieveUtils::retrieve_table_schema(tenant_id, *result, schema_array))) {
|
||||
LOG_WARN("failed to retrieve table schema", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!is_increase_schema) {
|
||||
FLOG_INFO("[REFRESH_SCHEMA] fetch all tables cost",
|
||||
KR(ret), K(tenant_id),
|
||||
"cost", ObTimeUtility::current_time() - start_time);
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
@ -4496,6 +4508,11 @@ int ObSchemaServiceSQLImpl::fetch_tables(
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!is_increase_schema) {
|
||||
FLOG_INFO("[REFRESH_SCHEMA] fetch all simple tables cost",
|
||||
KR(ret), K(tenant_id),
|
||||
"cost", ObTimeUtility::current_time() - start_time);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -8716,6 +8733,89 @@ int ObSchemaServiceSQLImpl::try_mock_link_table_column(ObTableSchema &table_sche
|
||||
return ret;
|
||||
}
|
||||
|
||||
// this timeout context will take effective when ObTimeoutCtx/THIS_WORKER.timeout is not set.
|
||||
int ObSchemaServiceSQLImpl::set_refresh_full_schema_timeout_ctx_(
|
||||
ObISQLClient &sql_client,
|
||||
const uint64_t tenant_id,
|
||||
const char* tname,
|
||||
ObTimeoutCtx &ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t timeout = 0;
|
||||
if (OB_UNLIKELY(
|
||||
OB_INVALID_TENANT_ID == tenant_id
|
||||
|| OB_ISNULL(tname))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid tenant_id or tname is empty", KR(ret), K(tenant_id), KP(tname));
|
||||
} else if (OB_FAIL(calc_refresh_full_schema_timeout_ctx_(sql_client, tenant_id, tname, timeout))) {
|
||||
LOG_WARN("fail to calc refresh full schema timeout", KR(ret), K(tenant_id), "tname", tname);
|
||||
} else {
|
||||
const int64_t ori_ctx_timeout = ctx.get_timeout();
|
||||
const int64_t ori_worker_timeout = THIS_WORKER.get_timeout_ts();
|
||||
if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, timeout))) {
|
||||
LOG_WARN("fail to set timeout ctx", KR(ret), K(timeout));
|
||||
}
|
||||
FLOG_INFO("[REFRESH_SCHEMA] try set refresh schema timeout ctx",
|
||||
KR(ret), K(tenant_id),
|
||||
"tname", tname,
|
||||
K(ori_ctx_timeout),
|
||||
K(ori_worker_timeout),
|
||||
"calc_timeout", timeout,
|
||||
"actual_timeout", ctx.get_timeout());
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSchemaServiceSQLImpl::calc_refresh_full_schema_timeout_ctx_(
|
||||
ObISQLClient &sql_client,
|
||||
const uint64_t tenant_id,
|
||||
const char* tname,
|
||||
int64_t &timeout)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTimeoutCtx ctx;
|
||||
timeout = 0;
|
||||
int64_t start_time = ObTimeUtility::current_time();
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||
ObMySQLResult *result = NULL;
|
||||
ObSqlString sql;
|
||||
// consider that sql scan 10w records per scecond, and normally history table has 1000w records at most.
|
||||
int64_t default_timeout = 4 * GCONF.internal_sql_execute_timeout;
|
||||
if (OB_UNLIKELY(
|
||||
OB_INVALID_TENANT_ID == tenant_id
|
||||
|| OB_ISNULL(tname))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid tenant_id or tname is empty", KR(ret), K(tenant_id), KP(tname));
|
||||
} else if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, default_timeout))) {
|
||||
LOG_WARN("fail to set default timeout", KR(ret), K(default_timeout));
|
||||
} else if (OB_FAIL(sql.assign_fmt("SELECT count(*) as count FROM %s", tname))) {
|
||||
LOG_WARN("append sql failed", KR(ret), K(sql), "tname", tname);
|
||||
} else if (OB_FAIL(sql_client.read(res, tenant_id, sql.ptr()))) {
|
||||
LOG_WARN("execute sql failed", KR(ret), K(tenant_id), K(sql));
|
||||
} else if (OB_ISNULL(result = res.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("fail to get result. ", KR(ret));
|
||||
} else if (OB_FAIL(result->next())) {
|
||||
LOG_WARN("fail to get next row", KR(ret));
|
||||
} else {
|
||||
int64_t row_cnt = 0;
|
||||
EXTRACT_INT_FIELD_MYSQL(*result, "count", row_cnt, int64_t);
|
||||
if (OB_SUCC(ret)) {
|
||||
// each 100w history may cost almost 400s (almost each 7w history may cost `internal_sql_execute_timeout`)
|
||||
// for more details: ob/qa/gqk9w2
|
||||
timeout = ((row_cnt / (70 * 1000L)) + 1) * GCONF.internal_sql_execute_timeout;
|
||||
FLOG_INFO("[REFRESH_SCHEMA] calc refresh schema timeout",
|
||||
KR(ret), K(tenant_id),
|
||||
"tname", tname,
|
||||
K(row_cnt), K(timeout),
|
||||
"cost", ObTimeUtility::current_time() - start_time);
|
||||
}
|
||||
}
|
||||
} // end SMTART_VAR
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
}//namespace schema
|
||||
}//namespace share
|
||||
}//namespace oceanbase
|
||||
|
||||
@ -975,6 +975,18 @@ private:
|
||||
int gen_leader_normal_schema_version(const uint64_t tenant_id,
|
||||
const int64_t refreshed_schema_version,
|
||||
int64_t &schema_version);
|
||||
|
||||
int set_refresh_full_schema_timeout_ctx_(
|
||||
ObISQLClient &sql_client,
|
||||
const uint64_t tenant_id,
|
||||
const char* tname,
|
||||
ObTimeoutCtx &ctx);
|
||||
|
||||
int calc_refresh_full_schema_timeout_ctx_(
|
||||
ObISQLClient &sql_client,
|
||||
const uint64_t tenant_id,
|
||||
const char* tname,
|
||||
int64_t &timeout);
|
||||
private:
|
||||
common::ObMySQLProxy *mysql_proxy_;
|
||||
common::ObDbLinkProxy *dblink_proxy_;
|
||||
|
||||
Reference in New Issue
Block a user