Support to query rewrite by materialized view when join input has aggregate, the aggregate should be simple
For example as following:
The materialized view def is
> select
> l_linenumber,
> count(distinct l_orderkey),
> sum(case when l_orderkey in (1,2,3) then l_suppkey * l_linenumber else 0 end),
> max(case when l_orderkey in (4, 5) then (l_quantity *2 + part_supp_a.qty_max) * 0.88 else 100 end),
> avg(case when l_partkey in (2, 3, 4) then l_discount + o_totalprice + part_supp_a.qty_sum else 50 end)
> from lineitem
> left join orders on l_orderkey = o_orderkey
> left join
> (select ps_partkey, ps_suppkey, sum(ps_availqty) qty_sum, max(ps_availqty) qty_max,
> min(ps_availqty) qty_min,
> avg(ps_supplycost) cost_avg
> from partsupp
> group by ps_partkey,ps_suppkey) part_supp_a
> on l_partkey = part_supp_a.ps_partkey
> and l_suppkey = part_supp_a.ps_suppkey
> group by l_linenumber;
when query is like following, it can be rewritten by mv above
> select
> l_linenumber,
> sum(case when l_orderkey in (1,2,3) then l_suppkey * l_linenumber else 0 end),
> avg(case when l_partkey in (2, 3, 4) then l_discount + o_totalprice + part_supp_a.qty_sum else 50 end)
> from lineitem
> left join orders on l_orderkey = o_orderkey
> left join
> (select ps_partkey, ps_suppkey, sum(ps_availqty) qty_sum, max(ps_availqty) qty_max,
> min(ps_availqty) qty_min,
> avg(ps_supplycost) cost_avg
> from partsupp
> group by ps_partkey,ps_suppkey) part_supp_a
> on l_partkey = part_supp_a.ps_partkey
> and l_suppkey = part_supp_a.ps_suppkey
> group by l_linenumber;
In some scenarios, user has a huge amount of data and only a single replica was specified when creating the table, if one of the tablet is damaged, the table will not be able to be select. If the user does not care about the integrity of the data, they can use this variable to temporarily skip the bad tablet for querying and load the remaining data into a new table.
- IdToTask has no persistence, so the queried task will be lost once it is restarted.
- The cancel task does not update metadata after being removed from the running task.
- tvf displays an error when some fields in the query task result are empty
- cycle scheduling job should not be STOP when task fail
Before, drop stats operation need to call columns * followers times of isMaster() function and the same times of rpc to drop remote column stats. This pr is to reduce the rpc calls and use more efficient way to check master node instead of using isMaster()
1. Make sure instance when change params of StructInfo,Predicates.
2. Catch and record exception for every materialization context, this make sure that if throw exception when one materialization context rewrite, it will not influence others.
3. Support to mv rewrite when hava count function when aggregate without group by
1. do not change RuntimeFilter Type from IN-OR_BLOOM to BLOOM on broadcast join
tpcds1T, q48 improved from 4.x sec to 1.x sec
2. skip some redunant runtime filter
example: A join B on A.a1=B.b and A.a1 = A.a2
RF B.b->(A.a1, A.a2)
however, RF(B.b->A.a2) is implied by RF(B.a->A.a1) and A.a1=A.a2
we skip RF(B.b->A.a2)
Issue Number: close #xxx
## Proposed changes
The current implement will persist all catalogs/databases of external catalogs, and only the master FE can handle hms events and make all slave nodes replay these events, this will bring some problems:
- The hms event processor ( `MetastoreEventsProcessor` ) can not consume the events in time. (Add journal log is a synchronized method, we can not speed up the consume rate by using concurrent processing, and each add-journal-log operation costs about tens of milliseconds) So the meta info of hive maybe out of date.
- Slave FE nodes maybe crashed if FE replays the journal logs of hms events failed. (In fact we have fixed some issues about this, but we can not make sure all the issues have been resolved)
- There are many journal logs which are produced by hms events, but in fact these logs are not used anymore after FE restart. It makes the start time of all FE nodes very long.
Now doris try to persis all databases/tables of external catalogs just to make sure that the dbId/tableId of databases/tables are the same through all FE nodes, it will be used by analysis jobs.
In this pr, we use a meta id manager called `ExternalMetaIdMgr` to manage these meta ids. On every loop when master fetches a batch of hms events, it handles the meta ids first and produce only one meta id mappings log, slave FE nodes will replay this log to sync the changes about these meta ids. `MetastoreEventsProcessor` will start on every FE nodes and try to consume these hms events as soon as possible.
## Further comments
I've submitted two prs ( #22869#21589 ) to speed up the consume rate of hms events before, it works fine when there are many `AlterTableEvent` / `DropTableEvent` on hive cluster. But the improvement is not that significant when most of hms events are partition-events. Unfortunately, we performed a cluster upgrade (upgrade spark 2.x to spark 3.x), maybe this is the reason that resulting in the majority of Hive Metastore events became partition-events. This is also the reason for the existence of this pull request.
Based on our observation, after merging this pull request, Doris is now capable of processing thousands of Hive Metastore events per second, compared to the previous capability of handling only a few dozen events.
```java
2023-12-07 05:17:03,518 INFO (replayer|105) [Env.replayJournal():2614] replayed journal id is 18287902, replay to journal id is 18287903
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEventFactory.mergeEvents():188] Event size on catalog [xxx] before merge is [1947], after merge is [1849]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955309 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2022-05-27],partitionNameAfter:[partitions=2022-05-27]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955310 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[pday=20230318],partitionNameAfter:[pday=20230318]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955311 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[pday=20190826],partitionNameAfter:[pday=20190826]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955312 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2021-09-16],partitionNameAfter:[partitions=2021-09-16]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955314 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2020-04-26],partitionNameAfter:[partitions=2020-04-26]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955315 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[pday=20230702],partitionNameAfter:[pday=20230702]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955317 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[pday=20211019],partitionNameAfter:[pday=20211019]
...
2023-12-07 05:17:03,989 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357957252 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2021-08-27],partitionNameAfter:[partitions=2021-08-27]
2023-12-07 05:17:03,989 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357957253 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2022-02-05],partitionNameAfter:[partitions=2022-02-05]
2023-12-07 05:17:04,661 INFO (replayer|105) [Env.replayJournal():2614] replayed journal id is 18287903, replay to journal id is 18287904
2023-12-07 05:17:05,028 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEventsProcessor.realRun():116] Events size are 587 on catalog [xxx]
2023-12-07 05:17:05,662 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEventFactory.mergeEvents():188] Event size on catalog [xxx] before merge is [587], after merge is [587]
```
Sample analyze may write 0 result if getRowCount is not updated while analyzing. So we need to reanalyze the table if getRowCount > 0 and previous analyze row count is 0. Otherwise the stats for this table may stay 0 for ever before user load new data to this table.
eliminate redundant join when find hashing join condition
such as for plan:
```
T1 join T2 on T1.id = T2.id join T3 on T1.id = T3.id and T2.id = T3.id
```
we infer a new predicate T1.id = T2.id which is redundant. Therefore we need to eliminate it when find hash condition
This PR proposes mapping external catalog JSON types to String instead of JsonB in Apache Doris. This change is motivated by the realization that JDBC retrieves JSON data as a String JSON string, regardless of its storage format (Json(String) or Json(Binary)). Mapping to String streamlines data retrieval, simplifies write-backs, and ensures compatibility with all JSON(String) and JSON(Binary) functions, despite potentially misleading displays of JSON data as Strings in Doris. This approach avoids the performance overhead and complexity of converting each row of data from JsonB to String, making the process more efficient and elegant.
About Upgrade
To ensure query compatibility with existing Catalogs in the upgraded version,we currently still retain the capability to query external JSON types as JSONB. However, once you upgrade to the new version and either refresh the Catalog or create a new one, all external JSON types will be treated as Strings. To ensure consistent behavior,and possible future removal of support for JSON as JSONB query code, it is highly recommended that you manually refresh your Catalog as soon as possible after upgrading to the new version.