diff --git a/src/gausskernel/storage/replication/logical/origin.cpp b/src/gausskernel/storage/replication/logical/origin.cpp index 4bed2d34f..c2f306824 100644 --- a/src/gausskernel/storage/replication/logical/origin.cpp +++ b/src/gausskernel/storage/replication/logical/origin.cpp @@ -962,6 +962,28 @@ static void ReplicationOriginExitCleanup(int code, Datum arg) cv = &u_sess->reporigin_cxt.curRepState->orginCV; mutex = &u_sess->reporigin_cxt.curRepState->originMutex; u_sess->reporigin_cxt.curRepState->acquired_by = 0; + + /* + * For table sync worker, clean the origin status when thread exit. + * Otherwise it will remain if worker exit abnormally. + */ + if (t_thrd.applyworker_cxt.curWorker && AM_TABLESYNC_WORKER) { + /* first WAL log */ + { + xl_replorigin_drop xlrec; + + xlrec.node_id = u_sess->reporigin_cxt.originId; + XLogBeginInsert(); + XLogRegisterData((char *)(&xlrec), sizeof(xlrec)); + XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP); + } + + /* then reset the in-memory entry */ + u_sess->reporigin_cxt.curRepState->roident = InvalidRepOriginId; + u_sess->reporigin_cxt.curRepState->remote_lsn = InvalidXLogRecPtr; + u_sess->reporigin_cxt.curRepState->local_lsn = InvalidXLogRecPtr; + } + u_sess->reporigin_cxt.curRepState = NULL; } diff --git a/src/test/subscription/testcase/bugs.sh b/src/test/subscription/testcase/bugs.sh index 5eaa09491..6590a92b0 100644 --- a/src/test/subscription/testcase/bugs.sh +++ b/src/test/subscription/testcase/bugs.sh @@ -133,6 +133,27 @@ create table t_pubsub_0349( fi exec_sql $case_db $sub_node1_port "ALTER SYSTEM SET subscription_conflict_resolution = error" + + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub;DROP TABLE t_pubsub_0349" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub;DROP TABLE t_pubsub_0349" + + # BUG4: fix pg_replication_origin_status remain + exec_sql $case_db $pub_node1_port "create table tab_rep (a int primary key, b int); insert into tab_rep values (1,1)" + exec_sql $case_db $sub_node1_port "create table tab_rep (a int primary key, b int); insert into tab_rep values (1,1)" + + echo "create publication and subscription." + publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES" + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" + + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub;" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM pg_replication_origin_status")" = "0" ]; then + echo "check if pg_replication_origin_status is empty success" + else + echo "$failed_keyword when check if pg_replication_origin_status is empty" + exit 1 + fi } function tear_down() {