[CP] [CP] fix worker thread deadlock bug caused by purge history stats

This commit is contained in:
wangt1xiuyi
2024-02-23 07:46:11 +00:00
committed by ob-robot
parent 0b038c5677
commit f5464425a4
2 changed files with 196 additions and 93 deletions

View File

@ -60,7 +60,7 @@ namespace common {
#define UPDATE_STATS_HISTROY_RETENTION "UPDATE %s SET sval1 = %ld, \
sval2 = CURRENT_TIMESTAMP where sname = 'STATS_RETENTION';"
#define DELETE_STAT_HISTORY "DELETE FROM %s %s;"
#define DELETE_STAT_HISTORY "DELETE /*+QUERY_TIMEOUT(%ld)*/FROM %s %s LIMIT %ld;"
#define INSERT_TABLE_STAT_HISTORY "INSERT INTO %s(tenant_id, \
table_id, \
@ -868,6 +868,7 @@ int ObDbmsStatsHistoryManager::purge_stats(ObExecContext &ctx, const int64_t spe
ObSQLSessionInfo *session = ctx.get_my_session();
ObSqlString time_str;
ObSqlString gather_time_str;
bool only_delete_one_batch = false;
if (OB_ISNULL(session)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(session), K(ret));
@ -889,85 +890,99 @@ int ObDbmsStatsHistoryManager::purge_stats(ObExecContext &ctx, const int64_t spe
retention_val))) {
LOG_WARN("failed to append fmt", K(ret));
} else {/*do nothing*/}
} else if (specify_time == 0) {//delete all statistics history
//Attempt to delete all opt stats at once. Since this is a synchronous operation,
//to avoid impacting user feedback, a batch of data will be synchronously deleted first,
//with the remaining data being cleaned up by an asynchronous task.
} else if (specify_time == 0) {//try delete all statistics history
if (OB_FAIL(time_str.append(" "))) {
LOG_WARN("failed to append", K(ret));
} else if (OB_FAIL(gather_time_str.append(" "))) {
LOG_WARN("failed to append", K(ret));
} else {/*do nothing*/}
} else {
only_delete_one_batch = true;
}
} else if (OB_FAIL(time_str.append_fmt("WHERE savtime < usec_to_time(%ld)", specify_time))) {
LOG_WARN("failed to append fmt", K(ret));
} else if (OB_FAIL(gather_time_str.append_fmt("WHERE start_time < usec_to_time(%ld)", specify_time))) {
LOG_WARN("failed to append fmt", K(ret));
} else {/*do nothing*/}
if (OB_FAIL(ret)) {
} else {
int64_t affected_rows = 0;
if (OB_SUCC(ret)) {
uint64_t tenant_id = session->get_effective_tenant_id();
ObMySQLProxy *mysql_proxy = ctx.get_sql_proxy();
ObSqlString del_tab_history;
ObSqlString del_col_history;
ObSqlString del_hist_history;
ObSqlString del_task_opt_stat_gather_history;
ObSqlString del_table_opt_stat_gather_history;
ObMySQLTransaction trans;
ObMySQLTransaction trans1;
if (OB_ISNULL(mysql_proxy)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected error", K(ret), K(mysql_proxy));
} else if (OB_FAIL(del_tab_history.append_fmt(DELETE_STAT_HISTORY,
share::OB_ALL_TABLE_STAT_HISTORY_TNAME,
time_str.ptr()))) {
LOG_WARN("failed to append sql stmt", K(ret), K(del_tab_history));
} else if (OB_FAIL(del_col_history.append_fmt(DELETE_STAT_HISTORY,
share::OB_ALL_COLUMN_STAT_HISTORY_TNAME,
time_str.ptr()))) {
LOG_WARN("failed to append sql stmt", K(ret), K(del_col_history));
} else if (OB_FAIL(del_hist_history.append_fmt(DELETE_STAT_HISTORY,
share::OB_ALL_HISTOGRAM_STAT_HISTORY_TNAME,
time_str.ptr()))) {
LOG_WARN("failed to append sql stmt", K(ret), K(del_hist_history));
} else if (OB_FAIL(del_task_opt_stat_gather_history.append_fmt(DELETE_STAT_HISTORY,
share::OB_ALL_TASK_OPT_STAT_GATHER_HISTORY_TNAME,
gather_time_str.ptr()))) {
LOG_WARN("failed to append sql stmt", K(ret), K(del_hist_history));
} else if (OB_FAIL(del_table_opt_stat_gather_history.append_fmt(DELETE_STAT_HISTORY,
share::OB_ALL_TABLE_OPT_STAT_GATHER_HISTORY_TNAME,
gather_time_str.ptr()))) {
LOG_WARN("failed to append sql stmt", K(ret), K(del_hist_history));
} else if (OB_FAIL(trans.start(mysql_proxy, tenant_id))) {
LOG_WARN("fail to start transaction", K(ret));
} else if (OB_FAIL(trans1.start(mysql_proxy, gen_meta_tenant_id(tenant_id)))) {
LOG_WARN("fail to start transaction", K(ret));
} else if (OB_FAIL(trans.write(tenant_id, del_tab_history.ptr(), affected_rows))) {
LOG_WARN("fail to exec sql", K(del_tab_history), K(ret));
} else if (OB_FAIL(trans.write(tenant_id, del_col_history.ptr(), affected_rows))) {
LOG_WARN("fail to exec sql", K(del_col_history), K(ret));
} else if (OB_FAIL(trans.write(tenant_id, del_hist_history.ptr(), affected_rows))) {
LOG_WARN("fail to exec sql", K(del_hist_history), K(ret));
} else if (OB_FAIL(trans1.write(gen_meta_tenant_id(tenant_id), del_task_opt_stat_gather_history.ptr(), affected_rows))) {
LOG_WARN("fail to exec sql", K(del_task_opt_stat_gather_history), K(ret));
} else if (OB_FAIL(trans1.write(gen_meta_tenant_id(tenant_id), del_table_opt_stat_gather_history.ptr(), affected_rows))) {
LOG_WARN("fail to exec sql", K(del_table_opt_stat_gather_history), K(ret));
} else if (OB_FAIL(remove_useless_column_stats(trans, tenant_id))) {
LOG_WARN("failed to remove useless column stats", K(ret));
} else {
LOG_TRACE("Succeed to do execute sql", K(del_tab_history), K(del_col_history),
K(del_hist_history), K(del_task_opt_stat_gather_history),
K(del_table_opt_stat_gather_history));
int64_t start_time = ObTimeUtility::current_time();
int64_t max_duration_time = BATCH_DELETE_MAX_QUERY_TIMEOUT;
if (THIS_WORKER.is_timeout_ts_valid()) {
max_duration_time = std::min(max_duration_time, THIS_WORKER.get_timeout_remain());
}
if (OB_SUCC(ret)) {
if (OB_FAIL(trans.end(true)) || OB_FAIL(trans1.end(true))) {
LOG_WARN("fail to commit transaction", K(ret));
int64_t delete_flags = ObOptStatsDeleteFlags::DELETE_ALL;
do {
ObMySQLTransaction trans;
ObMySQLTransaction trans1;
if (OB_FAIL(THIS_WORKER.check_status())) {
LOG_WARN("check status failed", KR(ret));
} else if (OB_FAIL(trans.start(ctx.get_sql_proxy(), tenant_id))) {
LOG_WARN("fail to start transaction", K(ret));
} else if (OB_FAIL(trans1.start(ctx.get_sql_proxy(), gen_meta_tenant_id(tenant_id)))) {
LOG_WARN("fail to start transaction", K(ret));
} else if ((delete_flags & ObOptStatsDeleteFlags::DELETE_TAB_STAT_HISTORY) &&
OB_FAIL(do_delete_expired_stat_history(trans, tenant_id, start_time,
max_duration_time, time_str.ptr(),
share::OB_ALL_TABLE_STAT_HISTORY_TNAME,
ObOptStatsDeleteFlags::DELETE_TAB_STAT_HISTORY,
delete_flags))) {
LOG_WARN("failed to do delete expired stat history", K(ret));
} else if ((delete_flags & ObOptStatsDeleteFlags::DELETE_COL_STAT_HISTORY) &&
OB_FAIL(do_delete_expired_stat_history(trans, tenant_id, start_time,
max_duration_time, time_str.ptr(),
share::OB_ALL_COLUMN_STAT_HISTORY_TNAME,
ObOptStatsDeleteFlags::DELETE_COL_STAT_HISTORY,
delete_flags))) {
LOG_WARN("failed to do delete expired stat history", K(ret));
} else if ((delete_flags & ObOptStatsDeleteFlags::DELETE_HIST_STAT_HISTORY) &&
OB_FAIL(do_delete_expired_stat_history(trans, tenant_id, start_time,
max_duration_time, time_str.ptr(),
share::OB_ALL_HISTOGRAM_STAT_HISTORY_TNAME,
ObOptStatsDeleteFlags::DELETE_HIST_STAT_HISTORY,
delete_flags))) {
LOG_WARN("failed to do delete expired stat history", K(ret));
} else if ((delete_flags & ObOptStatsDeleteFlags::DELETE_TASK_GATHER_HISTORY) &&
OB_FAIL(do_delete_expired_stat_history(trans1, gen_meta_tenant_id(tenant_id), start_time,
max_duration_time, gather_time_str.ptr(),
share::OB_ALL_TASK_OPT_STAT_GATHER_HISTORY_TNAME,
ObOptStatsDeleteFlags::DELETE_TASK_GATHER_HISTORY,
delete_flags))) {
LOG_WARN("failed to do delete expired stat history", K(ret));
} else if ((delete_flags & ObOptStatsDeleteFlags::DELETE_TAB_GATHER_HISTORY) &&
OB_FAIL(do_delete_expired_stat_history(trans1, gen_meta_tenant_id(tenant_id), start_time,
max_duration_time, gather_time_str.ptr(),
share::OB_ALL_TABLE_OPT_STAT_GATHER_HISTORY_TNAME,
ObOptStatsDeleteFlags::DELETE_TAB_GATHER_HISTORY,
delete_flags))) {
LOG_WARN("failed to do delete expired stat history", K(ret));
} else if ((delete_flags & ObOptStatsDeleteFlags::DELETE_USELESS_COL_STAT ||
delete_flags & ObOptStatsDeleteFlags::DELETE_USELESS_HIST_STAT) &&
OB_FAIL(remove_useless_column_stats(trans, tenant_id, start_time, max_duration_time, delete_flags))) {
LOG_WARN("failed to remove useless column stats", K(ret));
}
} else {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = trans.end(false))) {
LOG_WARN("fail to roll back transaction", K(tmp_ret));
} else if (OB_SUCCESS != (tmp_ret = trans1.end(false))) {
LOG_WARN("fail to roll back transaction", K(tmp_ret));
if (OB_SUCC(ret)) {
int tmp_ret1 = OB_SUCCESS;
int tmp_ret2 = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret1 = trans.end(true))) {
LOG_WARN("fail to commit transaction", K(tmp_ret1));
}
if (OB_SUCCESS != (tmp_ret2 = trans1.end(true))) {
LOG_WARN("fail to commit transaction", K(tmp_ret2));
}
ret = tmp_ret1 != OB_SUCCESS ? tmp_ret1 : tmp_ret2;
} else {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = trans.end(false))) {
LOG_WARN("fail to roll back transaction", K(tmp_ret));
}
if (OB_SUCCESS != (tmp_ret = trans1.end(false))) {
LOG_WARN("fail to roll back transaction", K(tmp_ret));
}
}
}
} while(OB_SUCC(ret) && !only_delete_one_batch && delete_flags != ObOptStatsDeleteFlags::DELETE_NONE);
}
return ret;
}
@ -1623,35 +1638,96 @@ int ObDbmsStatsHistoryManager::gen_column_list(const ObIArray<uint64_t> &column_
//here we remove useless column stats, because drop table won't delete column stats.
int ObDbmsStatsHistoryManager::remove_useless_column_stats(ObMySQLTransaction &trans,
uint64_t tenant_id)
uint64_t tenant_id,
const uint64_t start_time,
const uint64_t max_duration_time,
int64_t &delete_flags)
{
int ret = OB_SUCCESS;
ObSqlString delete_col_stat_sql;
ObSqlString delete_hist_stat_sql;
int64_t affected_rows1 = 0;
int64_t affected_rows2 = 0;
if (OB_FAIL(delete_col_stat_sql.append_fmt("DELETE FROM %s c WHERE (NOT EXISTS (SELECT 1 " \
"FROM %s t, %s db WHERE t.tenant_id = db.tenant_id AND t.database_id = db.database_id "\
"AND t.table_id = c.table_id AND t.tenant_id = c.tenant_id AND db.database_name != '__recyclebin')) "\
"AND table_id > %ld;",
share::OB_ALL_COLUMN_STAT_TNAME,
share::OB_ALL_TABLE_TNAME,
share::OB_ALL_DATABASE_TNAME,
OB_MAX_INNER_TABLE_ID))) {
} else if (OB_FAIL(delete_hist_stat_sql.append_fmt("DELETE FROM %s hist WHERE (NOT EXISTS (SELECT 1 " \
"FROM %s t, %s db WHERE t.tenant_id = db.tenant_id AND t.database_id = db.database_id "\
"AND t.table_id = hist.table_id AND t.tenant_id = hist.tenant_id AND db.database_name != '__recyclebin')) "\
"AND table_id > %ld;",
share::OB_ALL_HISTOGRAM_STAT_TNAME,
share::OB_ALL_TABLE_TNAME,
share::OB_ALL_DATABASE_TNAME,
OB_MAX_INNER_TABLE_ID))) {
} else if (OB_FAIL(trans.write(tenant_id, delete_col_stat_sql.ptr(), affected_rows1))) {
LOG_WARN("fail to exec sql", K(delete_col_stat_sql), K(ret));
} else if (OB_FAIL(trans.write(tenant_id, delete_hist_stat_sql.ptr(), affected_rows2))) {
LOG_WARN("fail to exec sql", K(delete_hist_stat_sql), K(ret));
int64_t affected_rows = 0;
int64_t query_timeout = 0;
if (delete_flags & ObOptStatsDeleteFlags::DELETE_USELESS_COL_STAT) {
if (OB_FAIL(ObDbmsStatsUtils::get_valid_duration_time(start_time,
max_duration_time,
query_timeout))) {
LOG_WARN("failed to get valid duration time", K(ret));
} else if (OB_FAIL(delete_col_stat_sql.append_fmt("DELETE /*+QUERY_TIMEOUT(%ld)*/ FROM %s c WHERE (NOT EXISTS (SELECT 1 " \
"FROM %s t, %s db WHERE t.tenant_id = db.tenant_id AND t.database_id = db.database_id "\
"AND t.table_id = c.table_id AND t.tenant_id = c.tenant_id AND db.database_name != '__recyclebin')) "\
"AND table_id > %ld limit %ld;",
query_timeout,
share::OB_ALL_COLUMN_STAT_TNAME,
share::OB_ALL_TABLE_TNAME,
share::OB_ALL_DATABASE_TNAME,
OB_MAX_INNER_TABLE_ID,
BATCH_DELETE_MAX_ROWCNT))) {
LOG_WARN("fail to append fmt", K(ret));
} else if (OB_FAIL(trans.write(tenant_id, delete_col_stat_sql.ptr(), affected_rows))) {
LOG_WARN("fail to exec sql", K(delete_col_stat_sql), K(ret));
} else {
delete_flags = affected_rows >= BATCH_DELETE_MAX_ROWCNT ? delete_flags : (delete_flags & ~ObOptStatsDeleteFlags::DELETE_USELESS_COL_STAT);
LOG_TRACE("succeed to clean useless col stat", K(tenant_id), K(delete_col_stat_sql),
K(affected_rows),K(delete_flags));
}
}
if (OB_SUCC(ret) && delete_flags & ObOptStatsDeleteFlags::DELETE_USELESS_HIST_STAT) {
if (OB_FAIL(ObDbmsStatsUtils::get_valid_duration_time(start_time,
max_duration_time,
query_timeout))) {
LOG_WARN("failed to get valid duration time", K(ret));
} else if (OB_FAIL(delete_hist_stat_sql.append_fmt("DELETE /*+QUERY_TIMEOUT(%ld)*/ FROM %s hist WHERE (NOT EXISTS (SELECT 1 " \
"FROM %s t, %s db WHERE t.tenant_id = db.tenant_id AND t.database_id = db.database_id "\
"AND t.table_id = hist.table_id AND t.tenant_id = hist.tenant_id AND db.database_name != '__recyclebin')) "\
"AND table_id > %ld limit %ld;",
query_timeout,
share::OB_ALL_HISTOGRAM_STAT_TNAME,
share::OB_ALL_TABLE_TNAME,
share::OB_ALL_DATABASE_TNAME,
OB_MAX_INNER_TABLE_ID,
BATCH_DELETE_MAX_ROWCNT))) {
LOG_WARN("fail to append fmt", K(ret));
} else if (OB_FAIL(trans.write(tenant_id, delete_hist_stat_sql.ptr(), affected_rows))) {
LOG_WARN("fail to exec sql", K(delete_hist_stat_sql), K(ret));
} else {
delete_flags = affected_rows >= BATCH_DELETE_MAX_ROWCNT ? delete_flags : (delete_flags & ~ObOptStatsDeleteFlags::DELETE_USELESS_HIST_STAT);
LOG_TRACE("succeed to clean useless hist stat", K(tenant_id), K(delete_hist_stat_sql),
K(affected_rows),K(delete_flags));
}
}
return ret;
}
int ObDbmsStatsHistoryManager::do_delete_expired_stat_history(ObMySQLTransaction &trans,
const uint64_t tenant_id,
const uint64_t start_time,
const uint64_t max_duration_time,
const char* specify_time_str,
const char* process_table_name,
ObOptStatsDeleteFlags cur_delete_flag,
int64_t &delete_flags)
{
int ret = OB_SUCCESS;
int64_t query_timeout = 0;
ObSqlString delete_sql;
int64_t affected_rows;
if (OB_FAIL(ObDbmsStatsUtils::get_valid_duration_time(start_time,
max_duration_time,
query_timeout))) {
LOG_WARN("failed to get valid duration time", K(ret));
} else if (OB_FAIL(delete_sql.append_fmt(DELETE_STAT_HISTORY,
query_timeout,
process_table_name,
specify_time_str,
BATCH_DELETE_MAX_ROWCNT))) {
LOG_WARN("failed to append sql stmt", K(ret), K(delete_sql));
} else if (OB_FAIL(trans.write(tenant_id, delete_sql.ptr(), affected_rows))) {
LOG_WARN("fail to exec sql", K(delete_sql), K(ret));
} else {
LOG_TRACE("Succeed to do execute sql", K(delete_col_stat_sql), K(delete_hist_stat_sql), K(affected_rows1), K(affected_rows2));
delete_flags = affected_rows >= BATCH_DELETE_MAX_ROWCNT ? delete_flags : (delete_flags & ~cur_delete_flag);
LOG_TRACE("Succeed to do delete expired stat history", K(tenant_id), K(delete_sql), K(affected_rows),
K(cur_delete_flag), K(delete_flags));
}
return ret;
}