branch-2.1: [Improve](mtmv) skip the generation of invalid task for refresh mtmv #46280 (#46392)

Cherry-picked from #46280

Co-authored-by: shee <13843187+qzsee@users.noreply.github.com>
Co-authored-by: garenshi <garenshi@tencent.com>
This commit is contained in:
github-actions[bot]
2025-01-05 08:59:11 +08:00
committed by GitHub
parent a4c328c4cd
commit 12350ca867
3 changed files with 102 additions and 1 deletions

View File

@ -198,7 +198,7 @@ public class MTMVService implements EventListener {
try {
// check if mtmv should trigger by event
MTMV mtmv = (MTMV) MTMVUtil.getTable(baseTableInfo);
if (mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger().equals(RefreshTrigger.COMMIT)) {
if (canRefresh(mtmv, table)) {
jobManager.onCommit(mtmv);
}
} catch (Exception e) {
@ -206,4 +206,13 @@ public class MTMVService implements EventListener {
}
}
}
private boolean canRefresh(MTMV mtmv, TableIf table) {
if (mtmv.getExcludedTriggerTables().contains(table.getName())) {
LOG.info("skip refresh mtmv: {}, because exclude trigger table: {}",
mtmv.getName(), table.getName());
return false;
}
return mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger().equals(RefreshTrigger.COMMIT);
}
}

View File

@ -61,3 +61,23 @@
-- !mv1_replace --
3 2017-03-15 3
-- !mv_sag --
1 1 60
-- !task_sag --
{"triggerMode":"COMMIT","partitions":[],"isComplete":false}
-- !mv_sag1 --
1 1 60
-- !task_sag1 --
{"triggerMode":"COMMIT","partitions":[],"isComplete":false}
-- !mv_sag2 --
1 1 60
1 2 70
2 1 70
-- !task_sag2 --
{"triggerMode":"COMMIT","partitions":[],"isComplete":false}

View File

@ -149,4 +149,76 @@ suite("test_commit_mtmv") {
sql """drop materialized view if exists ${mvName2};"""
sql """drop table if exists `${tableName}`"""
//===========test excluded_trigger_tables===========
def tblStu = "test_commit_mtmv_tbl_stu"
def tblGrade = "test_commit_mtmv_tbl_grade"
def mvSag = "test_commit_mv_sag"
sql """drop materialized view if exists ${mvSag};"""
sql """drop table if exists `${tblStu}`"""
sql """drop table if exists `${tblGrade}`"""
sql """
CREATE TABLE `${tblStu}` (
`sid` int(32) NULL,
`sname` varchar(32) NULL,
) ENGINE=OLAP
DUPLICATE KEY(`sid`)
DISTRIBUTED BY HASH(`sid`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""
sql """
CREATE TABLE `${tblGrade}` (
`sid` int(32) NULL,
`cid` int(32) NULL,
`score` int NULL
) ENGINE=OLAP
DUPLICATE KEY(`sid`)
DISTRIBUTED BY HASH(`sid`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""
sql """
CREATE MATERIALIZED VIEW ${mvSag}
BUILD DEFERRED
REFRESH COMPLETE ON commit
DISTRIBUTED BY HASH(`sid`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"excluded_trigger_tables" = "${tblGrade}"
)
AS select a.sid,b.cid,b.score from ${tblStu} a join ${tblGrade} b on a.sid = b.sid;
"""
sql """
insert into ${tblGrade} values(1, 1, 60);
insert into ${tblStu} values(1, 'sam');
"""
def sagJobName = getJobName(dbName, mvSag);
waitingMTMVTaskFinished(sagJobName)
order_qt_mv_sag "SELECT * FROM ${mvSag} order by sid,cid"
order_qt_task_sag "SELECT TaskContext from tasks('type'='mv') where MvName='${mvSag}' order by CreateTime desc limit 1"
sql """
insert into ${tblGrade} values(1, 2, 70);
"""
waitingMTMVTaskFinished(sagJobName)
order_qt_mv_sag1 "SELECT * FROM ${mvSag} order by sid,cid"
order_qt_task_sag1 "SELECT TaskContext from tasks('type'='mv') where MvName='${mvSag}' order by CreateTime desc limit 1"
sql """
insert into ${tblGrade} values(2, 1, 70);
insert into ${tblStu} values(2, 'jack');
"""
waitingMTMVTaskFinished(sagJobName)
order_qt_mv_sag2 "SELECT * FROM ${mvSag} order by sid,cid"
order_qt_task_sag2 "SELECT TaskContext from tasks('type'='mv') where MvName='${mvSag}' order by CreateTime desc limit 1"
sql """drop materialized view if exists ${mvSag};"""
sql """drop table if exists `${tblStu}`"""
sql """drop table if exists `${tblGrade}`"""
}