Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 2254956

Browse filesBrowse files
qzseegarenshi
and
garenshi
authored
[Improve](mtmv) skip the generation of invalid task for refresh mtmv (#46280)
### What problem does this PR solve? We specified the `excluded_trigger_tables = 'a'` attribute when creating the materialized view. If table `a` is updated frequently, many invalid tasks will be generated, and these tasks do not really refresh the mv, which is unreasonable, too many invalid tasks will wash away useful task information Co-authored-by: garenshi <garenshi@tencent.com>
1 parent ba28c94 commit 2254956
Copy full SHA for 2254956

File tree

3 files changed

+102
-1
lines changed
Filter options

3 files changed

+102
-1
lines changed

‎fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java

Copy file name to clipboardExpand all lines: fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
+10-1Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,12 +198,21 @@ public void processEvent(Event event) throws EventException {
198198
try {
199199
// check if mtmv should trigger by event
200200
MTMV mtmv = (MTMV) MTMVUtil.getTable(baseTableInfo);
201-
if (mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger().equals(RefreshTrigger.COMMIT)) {
201+
if (canRefresh(mtmv, table)) {
202202
jobManager.onCommit(mtmv);
203203
}
204204
} catch (Exception e) {
205205
throw new EventException(e);
206206
}
207207
}
208208
}
209+
210+
private boolean canRefresh(MTMV mtmv, TableIf table) {
211+
if (mtmv.getExcludedTriggerTables().contains(table.getName())) {
212+
LOG.info("skip refresh mtmv: {}, because exclude trigger table: {}",
213+
mtmv.getName(), table.getName());
214+
return false;
215+
}
216+
return mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger().equals(RefreshTrigger.COMMIT);
217+
}
209218
}

‎regression-test/data/mtmv_p0/test_commit_mtmv.out

Copy file name to clipboardExpand all lines: regression-test/data/mtmv_p0/test_commit_mtmv.out
+20Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,23 @@
6161
-- !mv1_replace --
6262
3 2017-03-15 3
6363

64+
-- !mv_sag --
65+
1 1 60
66+
67+
-- !task_sag --
68+
{"triggerMode":"COMMIT","partitions":[],"isComplete":false}
69+
70+
-- !mv_sag1 --
71+
1 1 60
72+
73+
-- !task_sag1 --
74+
{"triggerMode":"COMMIT","partitions":[],"isComplete":false}
75+
76+
-- !mv_sag2 --
77+
1 1 60
78+
1 2 70
79+
2 1 70
80+
81+
-- !task_sag2 --
82+
{"triggerMode":"COMMIT","partitions":[],"isComplete":false}
83+

‎regression-test/suites/mtmv_p0/test_commit_mtmv.groovy

Copy file name to clipboardExpand all lines: regression-test/suites/mtmv_p0/test_commit_mtmv.groovy
+72Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,4 +149,76 @@ suite("test_commit_mtmv") {
149149
sql """drop materialized view if exists ${mvName2};"""
150150
sql """drop table if exists `${tableName}`"""
151151

152+
//===========test excluded_trigger_tables===========
153+
def tblStu = "test_commit_mtmv_tbl_stu"
154+
def tblGrade = "test_commit_mtmv_tbl_grade"
155+
def mvSag = "test_commit_mv_sag"
156+
sql """drop materialized view if exists ${mvSag};"""
157+
sql """drop table if exists `${tblStu}`"""
158+
sql """drop table if exists `${tblGrade}`"""
159+
sql """
160+
CREATE TABLE `${tblStu}` (
161+
`sid` int(32) NULL,
162+
`sname` varchar(32) NULL,
163+
) ENGINE=OLAP
164+
DUPLICATE KEY(`sid`)
165+
DISTRIBUTED BY HASH(`sid`) BUCKETS 1
166+
PROPERTIES (
167+
"replication_allocation" = "tag.location.default: 1"
168+
);
169+
"""
170+
171+
sql """
172+
CREATE TABLE `${tblGrade}` (
173+
`sid` int(32) NULL,
174+
`cid` int(32) NULL,
175+
`score` int NULL
176+
) ENGINE=OLAP
177+
DUPLICATE KEY(`sid`)
178+
DISTRIBUTED BY HASH(`sid`) BUCKETS 1
179+
PROPERTIES (
180+
"replication_allocation" = "tag.location.default: 1"
181+
);
182+
"""
183+
184+
sql """
185+
CREATE MATERIALIZED VIEW ${mvSag}
186+
BUILD DEFERRED
187+
REFRESH COMPLETE ON commit
188+
DISTRIBUTED BY HASH(`sid`) BUCKETS 1
189+
PROPERTIES (
190+
"replication_allocation" = "tag.location.default: 1",
191+
"excluded_trigger_tables" = "${tblGrade}"
192+
)
193+
AS select a.sid,b.cid,b.score from ${tblStu} a join ${tblGrade} b on a.sid = b.sid;
194+
"""
195+
196+
sql """
197+
insert into ${tblGrade} values(1, 1, 60);
198+
insert into ${tblStu} values(1, 'sam');
199+
"""
200+
def sagJobName = getJobName(dbName, mvSag);
201+
waitingMTMVTaskFinished(sagJobName)
202+
order_qt_mv_sag "SELECT * FROM ${mvSag} order by sid,cid"
203+
order_qt_task_sag "SELECT TaskContext from tasks('type'='mv') where MvName='${mvSag}' order by CreateTime desc limit 1"
204+
205+
sql """
206+
insert into ${tblGrade} values(1, 2, 70);
207+
"""
208+
waitingMTMVTaskFinished(sagJobName)
209+
order_qt_mv_sag1 "SELECT * FROM ${mvSag} order by sid,cid"
210+
order_qt_task_sag1 "SELECT TaskContext from tasks('type'='mv') where MvName='${mvSag}' order by CreateTime desc limit 1"
211+
212+
sql """
213+
insert into ${tblGrade} values(2, 1, 70);
214+
insert into ${tblStu} values(2, 'jack');
215+
"""
216+
217+
waitingMTMVTaskFinished(sagJobName)
218+
order_qt_mv_sag2 "SELECT * FROM ${mvSag} order by sid,cid"
219+
order_qt_task_sag2 "SELECT TaskContext from tasks('type'='mv') where MvName='${mvSag}' order by CreateTime desc limit 1"
220+
221+
sql """drop materialized view if exists ${mvSag};"""
222+
sql """drop table if exists `${tblStu}`"""
223+
sql """drop table if exists `${tblGrade}`"""
152224
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.