Support to query rewrite by materialized view when join input has aggregate, the aggregate should be simple
For example as following:
The materialized view def is
> select
> l_linenumber,
> count(distinct l_orderkey),
> sum(case when l_orderkey in (1,2,3) then l_suppkey * l_linenumber else 0 end),
> max(case when l_orderkey in (4, 5) then (l_quantity *2 + part_supp_a.qty_max) * 0.88 else 100 end),
> avg(case when l_partkey in (2, 3, 4) then l_discount + o_totalprice + part_supp_a.qty_sum else 50 end)
> from lineitem
> left join orders on l_orderkey = o_orderkey
> left join
> (select ps_partkey, ps_suppkey, sum(ps_availqty) qty_sum, max(ps_availqty) qty_max,
> min(ps_availqty) qty_min,
> avg(ps_supplycost) cost_avg
> from partsupp
> group by ps_partkey,ps_suppkey) part_supp_a
> on l_partkey = part_supp_a.ps_partkey
> and l_suppkey = part_supp_a.ps_suppkey
> group by l_linenumber;
when query is like following, it can be rewritten by mv above
> select
> l_linenumber,
> sum(case when l_orderkey in (1,2,3) then l_suppkey * l_linenumber else 0 end),
> avg(case when l_partkey in (2, 3, 4) then l_discount + o_totalprice + part_supp_a.qty_sum else 50 end)
> from lineitem
> left join orders on l_orderkey = o_orderkey
> left join
> (select ps_partkey, ps_suppkey, sum(ps_availqty) qty_sum, max(ps_availqty) qty_max,
> min(ps_availqty) qty_min,
> avg(ps_supplycost) cost_avg
> from partsupp
> group by ps_partkey,ps_suppkey) part_supp_a
> on l_partkey = part_supp_a.ps_partkey
> and l_suppkey = part_supp_a.ps_suppkey
> group by l_linenumber;
In some scenarios, user has a huge amount of data and only a single replica was specified when creating the table, if one of the tablet is damaged, the table will not be able to be select. If the user does not care about the integrity of the data, they can use this variable to temporarily skip the bad tablet for querying and load the remaining data into a new table.
- IdToTask has no persistence, so the queried task will be lost once it is restarted.
- The cancel task does not update metadata after being removed from the running task.
- tvf displays an error when some fields in the query task result are empty
- cycle scheduling job should not be STOP when task fail
Before, drop stats operation need to call columns * followers times of isMaster() function and the same times of rpc to drop remote column stats. This pr is to reduce the rpc calls and use more efficient way to check master node instead of using isMaster()
1. Make sure instance when change params of StructInfo,Predicates.
2. Catch and record exception for every materialization context, this make sure that if throw exception when one materialization context rewrite, it will not influence others.
3. Support to mv rewrite when hava count function when aggregate without group by
1. do not change RuntimeFilter Type from IN-OR_BLOOM to BLOOM on broadcast join
tpcds1T, q48 improved from 4.x sec to 1.x sec
2. skip some redunant runtime filter
example: A join B on A.a1=B.b and A.a1 = A.a2
RF B.b->(A.a1, A.a2)
however, RF(B.b->A.a2) is implied by RF(B.a->A.a1) and A.a1=A.a2
we skip RF(B.b->A.a2)
Issue Number: close #xxx
## Proposed changes
The current implement will persist all catalogs/databases of external catalogs, and only the master FE can handle hms events and make all slave nodes replay these events, this will bring some problems:
- The hms event processor ( `MetastoreEventsProcessor` ) can not consume the events in time. (Add journal log is a synchronized method, we can not speed up the consume rate by using concurrent processing, and each add-journal-log operation costs about tens of milliseconds) So the meta info of hive maybe out of date.
- Slave FE nodes maybe crashed if FE replays the journal logs of hms events failed. (In fact we have fixed some issues about this, but we can not make sure all the issues have been resolved)
- There are many journal logs which are produced by hms events, but in fact these logs are not used anymore after FE restart. It makes the start time of all FE nodes very long.
Now doris try to persis all databases/tables of external catalogs just to make sure that the dbId/tableId of databases/tables are the same through all FE nodes, it will be used by analysis jobs.
In this pr, we use a meta id manager called `ExternalMetaIdMgr` to manage these meta ids. On every loop when master fetches a batch of hms events, it handles the meta ids first and produce only one meta id mappings log, slave FE nodes will replay this log to sync the changes about these meta ids. `MetastoreEventsProcessor` will start on every FE nodes and try to consume these hms events as soon as possible.
## Further comments
I've submitted two prs ( #22869#21589 ) to speed up the consume rate of hms events before, it works fine when there are many `AlterTableEvent` / `DropTableEvent` on hive cluster. But the improvement is not that significant when most of hms events are partition-events. Unfortunately, we performed a cluster upgrade (upgrade spark 2.x to spark 3.x), maybe this is the reason that resulting in the majority of Hive Metastore events became partition-events. This is also the reason for the existence of this pull request.
Based on our observation, after merging this pull request, Doris is now capable of processing thousands of Hive Metastore events per second, compared to the previous capability of handling only a few dozen events.
```java
2023-12-07 05:17:03,518 INFO (replayer|105) [Env.replayJournal():2614] replayed journal id is 18287902, replay to journal id is 18287903
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEventFactory.mergeEvents():188] Event size on catalog [xxx] before merge is [1947], after merge is [1849]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955309 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2022-05-27],partitionNameAfter:[partitions=2022-05-27]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955310 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[pday=20230318],partitionNameAfter:[pday=20230318]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955311 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[pday=20190826],partitionNameAfter:[pday=20190826]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955312 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2021-09-16],partitionNameAfter:[partitions=2021-09-16]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955314 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2020-04-26],partitionNameAfter:[partitions=2020-04-26]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955315 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[pday=20230702],partitionNameAfter:[pday=20230702]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955317 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[pday=20211019],partitionNameAfter:[pday=20211019]
...
2023-12-07 05:17:03,989 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357957252 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2021-08-27],partitionNameAfter:[partitions=2021-08-27]
2023-12-07 05:17:03,989 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357957253 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2022-02-05],partitionNameAfter:[partitions=2022-02-05]
2023-12-07 05:17:04,661 INFO (replayer|105) [Env.replayJournal():2614] replayed journal id is 18287903, replay to journal id is 18287904
2023-12-07 05:17:05,028 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEventsProcessor.realRun():116] Events size are 587 on catalog [xxx]
2023-12-07 05:17:05,662 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEventFactory.mergeEvents():188] Event size on catalog [xxx] before merge is [587], after merge is [587]
```
Sample analyze may write 0 result if getRowCount is not updated while analyzing. So we need to reanalyze the table if getRowCount > 0 and previous analyze row count is 0. Otherwise the stats for this table may stay 0 for ever before user load new data to this table.