clean residue temp table after gaussdb crash and restart

This commit is contained in:
LiHeng
2021-03-16 21:13:54 +08:00
parent 4285d38520
commit 08b8f9cb9a
10 changed files with 75 additions and 3 deletions

View File

@ -13,6 +13,7 @@
./bin/gs_initdb
./bin/gs_guc
./bin/encrypt
./bin/gs_clean
./bin/openssl
./bin/gs_restore
./bin/gs_cgroup

View File

@ -13,6 +13,7 @@
./bin/gs_initdb
./bin/gs_guc
./bin/encrypt
./bin/gs_clean
./bin/openssl
./bin/gs_restore
./bin/gs_cgroup

View File

@ -13,6 +13,7 @@
./bin/gs_initdb
./bin/gs_guc
./bin/encrypt
./bin/gs_clean
./bin/openssl
./bin/gs_restore
./bin/gs_cgroup

View File

@ -16,6 +16,7 @@
./bin/gs_basebackup
./bin/gs_probackup
./bin/encrypt
./bin/gs_clean
./bin/openssl
./bin/gs_restore
./bin/gs_cgroup

View File

@ -13,6 +13,7 @@
./bin/gs_initdb
./bin/gs_guc
./bin/encrypt
./bin/gs_clean
./bin/openssl
./bin/gs_restore
./bin/gs_cgroup

View File

@ -13,6 +13,7 @@
./bin/gs_initdb
./bin/gs_guc
./bin/encrypt
./bin/gs_clean
./bin/openssl
./bin/gs_restore
./bin/gs_cgroup

View File

@ -52,6 +52,7 @@ SUBDIRS = \
pg_controldata \
pg_resetxlog \
gs_guc \
pgxc_clean \
gsqlerr \
pg_basebackup \
pg_probackup

View File

@ -145,12 +145,16 @@ static void usage(void);
static void showVersion(void);
static PGconn* loginDatabase(char* host, int port, char* user, char* password, char* dbname, const char* progname,
char* encoding, const char* password_prompt);
#ifdef ENABLE_MULTIPLE_NODES
static void getMyNodename(PGconn* conn);
static void getMyGtmMode(PGconn* conn);
#endif
static void recover2PCForDatabase(database_info* db_info, CleanWorkerInfo* wkinfo);
static void recover2PC(PGconn* conn, txn_info* txn, int num);
static void getDatabaseList(PGconn* conn);
#ifdef ENABLE_MULTIPLE_NODES
static void getNodeList(PGconn* conn);
#endif
static void getPreparedTxnList(PGconn* conn);
static void getTxnInfoOnOtherNodesAll(CleanWorkerInfo* wkinfo);
static void do_commit(PGconn* conn, txn_info* txn, int num);
@ -166,8 +170,10 @@ static void dropTempSchemas(PGconn* conn, bool missing_ok);
static void dropTempSchema(PGconn* conn, char* nspname, bool missing_ok);
static void getTempSchemaListOnCN(PGconn* conn);
#ifdef ENABLE_MULTIPLE_NODES
static void getTempSchemaOnOneDN(PGconn* conn, int no);
static int getTempSchemaListOnDN(PGconn* conn);
#endif
static void checkAllocMem(void* p);
static void cleanTempSchemaList();
static void cleanBackendList();
@ -664,6 +670,7 @@ int main(int argc, char* argv[])
exit(1);
}
#ifdef ENABLE_MULTIPLE_NODES
/*
* Get my nodename (connected Coordinator) and gtm_mode
*/
@ -677,6 +684,7 @@ int main(int argc, char* argv[])
transfor_gtm_optoin(gtm_option, gtm_mode),
gtm_option_num);
}
#endif
/*
* Get available databases
@ -704,6 +712,7 @@ int main(int argc, char* argv[])
}
}
#ifdef ENABLE_MULTIPLE_NODES
/*
* Get list of Nodes
*
@ -728,6 +737,7 @@ int main(int argc, char* argv[])
pgxc_clean_node_info[ii].type == NODE_TYPE_COORD ? "coordinator" : "datanode");
}
}
#endif
/*
* Only clean temp schema if clean_temp_schema_only == true,
@ -741,6 +751,15 @@ int main(int argc, char* argv[])
}
exit(0);
}
#ifndef ENABLE_MULTIPLE_NODES
else {
PQfinish(coord_conn);
coord_conn = NULL;
write_stderr("%s %s: We only clean temp schema in openGauss, please specify '-e' option.\n",
formatLogTime(), progname);
exit(0);
}
#endif
/*
* Only clean plan_table if clean_plan_table_only == true,
@ -1114,7 +1133,9 @@ static void cleanPlanTableSessidList()
static void dropTempNamespace()
{
database_info* cur_database = NULL;
#ifdef ENABLE_MULTIPLE_NODES
int datanode_no = -1;
#endif
if (head_database_info != NULL) {
for (cur_database = head_database_info; cur_database != NULL; cur_database = cur_database->next) {
@ -1166,10 +1187,12 @@ static void dropTempNamespace()
(void)dropTempSchemas(coord_conn, true);
(void)cleanBackendList();
#ifdef ENABLE_MULTIPLE_NODES
datanode_no = getTempSchemaListOnDN(coord_conn);
(void)getActiveBackendListOnCN(coord_conn, datanode_no);
(void)dropTempSchemas(coord_conn, true);
(void)cleanBackendList();
#endif
if (verbose_opt)
write_stderr("%s %s: drop temp namespace for database \"%s\" finished\n",
@ -1186,10 +1209,12 @@ static void dropTempSchemas(PGconn* conn, bool missing_ok)
{
tempschema_info* cell1 = temp_schema_info;
activebackend_info* cell2 = NULL;
char node_name[NAMEDATALEN];
char temp_buffer[NAMEDATALEN] = {0};
char temp_buffer2[NAMEDATALEN] = {0};
#ifdef ENABLE_MULTIPLE_NODES
char node_name[NAMEDATALEN];
bool onOtherCNNode = false;
#endif
errno_t rc;
if (temp_schema_info == NULL || active_backend_info == NULL)
@ -1229,6 +1254,7 @@ static void dropTempSchemas(PGconn* conn, bool missing_ok)
}
if (cell2 == NULL) {
#ifdef ENABLE_MULTIPLE_NODES
/*
* now it is not the active backend, furthermore, we need to
* check whether the schema on the other node:
@ -1303,6 +1329,9 @@ static void dropTempSchemas(PGconn* conn, bool missing_ok)
if (!onOtherCNNode)
dropTempSchema(conn, cell1->tempschema_name, missing_ok);
}
#else
dropTempSchema(conn, cell1->tempschema_name, missing_ok);
#endif
}
cell1 = cell1->next;
@ -1414,6 +1443,7 @@ static char* pg_strdup(const char* string)
return tmp;
}
#ifdef ENABLE_MULTIPLE_NODES
static void getMyNodename(PGconn* conn)
{
static const char* stmt = "SELECT pgxc_node_str()";
@ -1457,6 +1487,7 @@ static void getMyGtmMode(PGconn* conn)
}
PQclear(res);
}
#endif
static void recover2PCForDatabase(database_info* db_info, CleanWorkerInfo* wkinfo)
{
@ -2321,6 +2352,7 @@ static void add_active_backend_info(int64 sessionID, uint32 tempID, uint32 timeL
}
}
#ifdef ENABLE_MULTIPLE_NODES
/*
* @Description: get temp schema list of a random DN.
* @param[IN] conn: node connection handle
@ -2414,6 +2446,7 @@ static void getTempSchemaOnOneDN(PGconn* conn, int no)
PQclear(res);
}
#endif
static void getTempSchemaListOnCN(PGconn* conn)
{
@ -2425,11 +2458,18 @@ static void getTempSchemaListOnCN(PGconn* conn)
char STMT_GET_TEMP_SCHEMA_LIST[NAMEDATALEN + 128] = {0};
#ifdef ENABLE_MULTIPLE_NODES
rc = snprintf_s(STMT_GET_TEMP_SCHEMA_LIST,
sizeof(STMT_GET_TEMP_SCHEMA_LIST),
sizeof(STMT_GET_TEMP_SCHEMA_LIST) - 1,
"SELECT NSPNAME FROM PG_NAMESPACE WHERE NSPNAME LIKE 'pg_temp_%s_%%'",
my_nodename);
#else
rc = snprintf_s(STMT_GET_TEMP_SCHEMA_LIST,
sizeof(STMT_GET_TEMP_SCHEMA_LIST),
sizeof(STMT_GET_TEMP_SCHEMA_LIST) - 1,
"SELECT NSPNAME FROM PG_NAMESPACE WHERE NSPNAME LIKE 'pg_temp_%%'");
#endif
securec_check_ss_c(rc, "\0", "\0");
if (verbose_opt)
@ -2549,6 +2589,7 @@ static void checkAllocMem(void* p)
}
}
#ifdef ENABLE_MULTIPLE_NODES
static void getNodeList(PGconn* conn)
{
int ii;
@ -2631,6 +2672,7 @@ error_exit:
PQclear(res);
exit(1);
}
#endif
static char* transfor_gtm_optoin(const char* option, const char* gtm_free_mode)
{

View File

@ -2836,7 +2836,14 @@ static int ServerLoop(void)
g_instance.pid_cxt.CPMonitorPID = initialize_util_thread(WLM_CPMONITOR);
/* If we have lost the twophase cleaner, try to start a new one */
if (IS_PGXC_COORDINATOR && u_sess->attr.attr_common.upgrade_mode != 1 &&
if (
#ifdef ENABLE_MULTIPLE_NODES
IS_PGXC_COORDINATOR &&
#else
(t_thrd.postmaster_cxt.HaShmData->current_mode == NORMAL_MODE ||
t_thrd.postmaster_cxt.HaShmData->current_mode == PRIMARY_MODE) &&
#endif
u_sess->attr.attr_common.upgrade_mode != 1 &&
g_instance.pid_cxt.TwoPhaseCleanerPID == 0 && pmState == PM_RUN)
g_instance.pid_cxt.TwoPhaseCleanerPID = initialize_util_thread(TWOPASECLEANER);
@ -5065,7 +5072,14 @@ static void reaper(SIGNAL_ARGS)
t_thrd.postmaster_cxt.audit_primary_start = false;
}
if (IS_PGXC_COORDINATOR && u_sess->attr.attr_common.upgrade_mode != 1 &&
if (
#ifdef ENABLE_MULTIPLE_NODES
IS_PGXC_COORDINATOR &&
#else
(t_thrd.postmaster_cxt.HaShmData->current_mode == NORMAL_MODE ||
t_thrd.postmaster_cxt.HaShmData->current_mode == PRIMARY_MODE) &&
#endif
u_sess->attr.attr_common.upgrade_mode != 1 &&
g_instance.pid_cxt.TwoPhaseCleanerPID == 0)
g_instance.pid_cxt.TwoPhaseCleanerPID = initialize_util_thread(TWOPASECLEANER);

View File

@ -154,6 +154,7 @@ NON_EXEC_STATIC void TwoPhaseCleanerMain()
ereport(DEBUG5, (errmsg("failed to invoke get_prog_path()")));
} else {
/* if we find explicit cn listen address, we use tcp connection instead of unix socket */
#ifdef ENABLE_MULTIPLE_NODES
#ifdef USE_ASSERT_CHECKING
rc = sprintf_s(cmd,
sizeof(cmd),
@ -169,6 +170,14 @@ NON_EXEC_STATIC void TwoPhaseCleanerMain()
g_instance.attr.attr_network.PoolerPort,
u_sess->attr.attr_storage.twophase_clean_workers);
securec_check_ss(rc, "\0", "\0");
#endif
#else
rc = sprintf_s(cmd,
sizeof(cmd),
"gs_clean -a -p %d -e -v -r -j %d > /dev/null 2>&1",
g_instance.attr.attr_network.PoolerPort,
u_sess->attr.attr_storage.twophase_clean_workers);
securec_check_ss(rc, "\0", "\0");
#endif
socket_close_on_exec();