1. Analyze which cache mode can be used by a query.
2. Probe the cache in StmtExecutor before executing the query.
3. Two cache modes, SqlCache and PartitionCache, are implemented.
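For context, a minimal sketch of how the two caches introduced here would be switched on, based on the Config flags and session-variable getters that appear in this diff (the SET variable names are an assumption inferred from isEnableSqlCache()/isEnablePartitionCache(); they are not spelled out in this commit):

    # fe.conf -- global switches read by CacheAnalyzer.checkCacheConfig()
    cache_enable_sql_mode = true
    cache_enable_partition_mode = true
    # a table qualifies for SqlCache only if its newest partition is older than this
    cache_last_version_interval_second = 900

    -- per-session switches (assumed names)
    SET enable_sql_cache = true;
    SET enable_partition_cache = true;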
@@ -47,6 +47,12 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
    public ArrayList<NodeType> getChildren() { return children; }

    public void clearChildren() { children.clear(); }

    public void removeNode(int i) {
        if (children != null && i >= 0 && i < children.size()) {
            children.remove(i);
        }
    }

    /**
     * Count the total number of nodes in this tree. Leaf node will return 1.
     * Non-leaf node will include all its children.
@@ -59,6 +59,7 @@ public class ProfileManager {
    public static final String SQL_STATEMENT = "Sql Statement";
    public static final String USER = "User";
    public static final String DEFAULT_DB = "Default Db";
    public static final String IS_CACHED = "IsCached";

    public static final ArrayList<String> PROFILE_HEADERS = new ArrayList(
            Arrays.asList(QUERY_ID, USER, DEFAULT_DB, SQL_STATEMENT, QUERY_TYPE,
@@ -65,6 +65,11 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.Planner;
import org.apache.doris.proto.PQueryStatistics;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.cache.Cache;
import org.apache.doris.qe.cache.CacheAnalyzer;
import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode;
import org.apache.doris.qe.cache.CacheBeProxy;
import org.apache.doris.qe.cache.CacheProxy;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.rpc.RpcException;
@@ -114,6 +119,7 @@ public class StmtExecutor {
    private boolean isProxy;
    private ShowResultSet proxyResultSet = null;
    private PQueryStatistics statisticsForAuditLog;
    private boolean isCached;

    // this constructor is mainly for proxy
    public StmtExecutor(ConnectContext context, OriginStatement originStmt, boolean isProxy) {
@@ -155,6 +161,8 @@ public class StmtExecutor {
        summaryProfile.addInfoString(ProfileManager.USER, context.getQualifiedUser());
        summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, context.getDatabase());
        summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, originStmt.originStmt);
        summaryProfile.addInfoString(ProfileManager.IS_CACHED, isCached ? "Yes" : "No");

        profile.addChild(summaryProfile);
        if (coord != null) {
            coord.getQueryProfile().getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(beginTimeInNanoSecond));
@@ -581,6 +589,78 @@ public class StmtExecutor {
        context.getState().setOk();
    }

    private void sendChannel(MysqlChannel channel, List<CacheProxy.CacheValue> cacheValues, boolean isEos)
            throws Exception {
        RowBatch batch = null;
        for (CacheBeProxy.CacheValue value : cacheValues) {
            batch = value.getRowBatch();
            for (ByteBuffer row : batch.getBatch().getRows()) {
                channel.sendOnePacket(row);
            }
            context.updateReturnRows(batch.getBatch().getRows().size());
        }
        if (isEos) {
            if (batch != null) {
                statisticsForAuditLog = batch.getQueryStatistics();
            }
            context.getState().setEof();
            return;
        }
    }

    private boolean handleCacheStmt(CacheAnalyzer cacheAnalyzer, MysqlChannel channel) throws Exception {
        RowBatch batch = null;
        CacheBeProxy.FetchCacheResult cacheResult = cacheAnalyzer.getCacheData();
        CacheMode mode = cacheAnalyzer.getCacheMode();
        if (cacheResult != null) {
            isCached = true;
            if (cacheAnalyzer.getHitRange() == Cache.HitRange.Full) {
                sendChannel(channel, cacheResult.getValueList(), true);
                return true;
            }
            // rewrite sql
            if (mode == CacheMode.Partition) {
                if (cacheAnalyzer.getHitRange() == Cache.HitRange.Left) {
                    sendChannel(channel, cacheResult.getValueList(), false);
                }
                SelectStmt newSelectStmt = cacheAnalyzer.getRewriteStmt();
                newSelectStmt.reset();
                analyzer = new Analyzer(context.getCatalog(), context);
                newSelectStmt.analyze(analyzer);
                planner = new Planner();
                planner.plan(newSelectStmt, analyzer, context.getSessionVariable().toThrift());
            }
        }

        coord = new Coordinator(context, analyzer, planner);
        QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
                new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
        coord.exec();

        while (true) {
            batch = coord.getNext();
            if (batch.getBatch() != null) {
                cacheAnalyzer.copyRowBatch(batch);
                for (ByteBuffer row : batch.getBatch().getRows()) {
                    channel.sendOnePacket(row);
                }
                context.updateReturnRows(batch.getBatch().getRows().size());
            }
            if (batch.isEos()) {
                break;
            }
        }

        if (cacheResult != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Right) {
            sendChannel(channel, cacheResult.getValueList(), false);
        }

        cacheAnalyzer.updateCache();
        statisticsForAuditLog = batch.getQueryStatistics();
        context.getState().setEof();
        return false;
    }

    // Process a select statement.
    private void handleQueryStmt() throws Exception {
        // Every time set no send flag and clean all data in buffer
@@ -601,12 +681,6 @@ public class StmtExecutor {
            handleExplainStmt(explainString);
            return;
        }
        coord = new Coordinator(context, analyzer, planner);

        QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
                new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));

        coord.exec();

        // send result
        // 1. If this is a query with OUTFILE clause, eg: select * from tbl1 into outfile xxx,
@@ -620,6 +694,21 @@ public class StmtExecutor {
        MysqlChannel channel = context.getMysqlChannel();
        boolean isOutfileQuery = queryStmt.hasOutFileClause();
        boolean isSendFields = false;
        if (!isOutfileQuery) {
            sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs());
        }

        // Sql and PartitionCache
        CacheAnalyzer cacheAnalyzer = new CacheAnalyzer(context, parsedStmt, planner);
        if (cacheAnalyzer.enableCache()) {
            handleCacheStmt(cacheAnalyzer, channel);
            return;
        }

        coord = new Coordinator(context, analyzer, planner);
        QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
                new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
        coord.exec();
        while (true) {
            batch = coord.getNext();
            // for outfile query, there will be only one empty batch send back with eos flag
@@ -632,8 +721,8 @@ public class StmtExecutor {
                }
                for (ByteBuffer row : batch.getBatch().getRows()) {
                    channel.sendOnePacket(row);
                }
                context.updateReturnRows(batch.getBatch().getRows().size());
            }
            context.updateReturnRows(batch.getBatch().getRows().size());
        }
        if (batch.isEos()) {
            break;
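To make the control flow above concrete, here is a worked partial-hit scenario (a sketch, using the HitRange semantics from this commit): suppose the cache holds results for event_date 20191211-20191213 and the query asks for 20191211-20191215. getCacheData() marks partitions 11-13 as cached, buildDiskPartitionRange() finds the uncached region at the right end and reports HitRange.Left, so handleCacheStmt() first streams the cached rows via sendChannel(channel, values, false), then plans and executes the rewritten statement (roughly WHERE event_date >= 20191214 AND event_date <= 20191215) through a new Coordinator, copying the fresh row batches into the cache via copyRowBatch() and finally calling updateCache(). A HitRange.Right hit is symmetric: the query runs first and the cached tail is sent afterwards.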
@@ -18,7 +18,7 @@
package org.apache.doris.qe.cache;

import org.apache.doris.analysis.SelectStmt;
//import org.apache.doris.common.Config;
import org.apache.doris.common.Config;
import org.apache.doris.common.Status;
import org.apache.doris.qe.RowBatch;
import org.apache.doris.thrift.TUniqueId;
@@ -38,8 +38,8 @@ public abstract class Cache {

    protected TUniqueId queryId;
    protected SelectStmt selectStmt;
    //protected RowBatchBuilder rowBatchBuilder;
    //protected CacheAnalyzer.CacheTable latestTable;
    protected RowBatchBuilder rowBatchBuilder;
    protected CacheAnalyzer.CacheTable latestTable;
    protected CacheProxy proxy;
    protected HitRange hitRange;
@@ -72,7 +72,6 @@ public abstract class Cache {
    public abstract void updateCache();

    protected boolean checkRowLimit() {
        /*
        if (rowBatchBuilder == null) {
            return false;
        }
@@ -82,7 +81,6 @@ public abstract class Cache {
            return false;
        } else {
            return true;
        }*/
        return false;
    }
}
}
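The two hunks above activate the previously commented-out Config import and promote rowBatchBuilder and latestTable from comments to real fields of the abstract Cache, so both concrete caches share them; checkRowLimit() keeps its row-count logic commented out for now and is reduced to a stub. SqlCache (keyed by the full SQL text of a query against slowly-updated tables) and PartitionCache (keyed per value of the single range-partition column) are the two subclasses this commit wires up.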
fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java (new file, 451 lines)
@@ -0,0 +1,451 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe.cache;

import org.apache.doris.analysis.AggregateInfo;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.InlineViewRef;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Column;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.RowBatch;
import org.apache.doris.common.Config;
import org.apache.doris.common.Status;

import com.google.common.collect.Lists;
import org.apache.doris.thrift.TUniqueId;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Analyze which caching mode a SQL statement is suitable for:
 * 1. T + 1 updates suit Sql mode
 * 2. Partitioned by date, with the data of the current day updated in near real time, suits Partition mode
 */
public class CacheAnalyzer {
    private static final Logger LOG = LogManager.getLogger(CacheAnalyzer.class);

    /**
     * NoNeed : cache disabled by config or session variable, not a query, no table scanned, etc.
     */
    public enum CacheMode {
        NoNeed,
        None,
        TTL,
        Sql,
        Partition
    }

    private ConnectContext context;
    private boolean enableSqlCache = false;
    private boolean enablePartitionCache = false;
    private TUniqueId queryId;
    private CacheMode cacheMode;
    private CacheTable latestTable;
    private StatementBase parsedStmt;
    private SelectStmt selectStmt;
    private List<ScanNode> scanNodes;
    private OlapTable olapTable;
    private RangePartitionInfo partitionInfo;
    private Column partColumn;
    private CompoundPredicate partitionPredicate;
    private Cache cache;

    public Cache getCache() {
        return cache;
    }

    public CacheAnalyzer(ConnectContext context, StatementBase parsedStmt, Planner planner) {
        this.context = context;
        this.queryId = context.queryId();
        this.parsedStmt = parsedStmt;
        scanNodes = planner.getScanNodes();
        latestTable = new CacheTable();
        checkCacheConfig();
    }

    // for unit test
    public CacheAnalyzer(ConnectContext context, StatementBase parsedStmt, List<ScanNode> scanNodes) {
        this.context = context;
        this.parsedStmt = parsedStmt;
        this.scanNodes = scanNodes;
        checkCacheConfig();
    }

    private void checkCacheConfig() {
        if (Config.cache_enable_sql_mode) {
            if (context.getSessionVariable().isEnableSqlCache()) {
                enableSqlCache = true;
            }
        }
        if (Config.cache_enable_partition_mode) {
            if (context.getSessionVariable().isEnablePartitionCache()) {
                enablePartitionCache = true;
            }
        }
    }

    public CacheMode getCacheMode() {
        return cacheMode;
    }

    public class CacheTable implements Comparable<CacheTable> {
        public OlapTable olapTable;
        public long latestPartitionId;
        public long latestVersion;
        public long latestTime;

        public CacheTable() {
            olapTable = null;
            latestPartitionId = 0;
            latestVersion = 0;
            latestTime = 0;
        }

        @Override
        public int compareTo(CacheTable table) {
            return (int) (table.latestTime - this.latestTime);
        }

        public void Debug() {
            LOG.debug("table {}, partition id {}, ver {}, time {}", olapTable.getName(), latestPartitionId, latestVersion, latestTime);
        }
    }

    public boolean enableCache() {
        return enableSqlCache || enablePartitionCache;
    }

    public boolean enableSqlCache() {
        return enableSqlCache;
    }

    public boolean enablePartitionCache() {
        return enablePartitionCache;
    }

    /**
     * Check cache mode with SQL and table:
     * 1. Only OLAP tables
     * 2. The update time of the table is before Config.cache_last_version_interval_second
     * 3. PartitionType is PartitionType.RANGE, and the partition key has only one column
     * 4. The partition key must be included in the group by clause
     * 5. The where clause must contain only one partition key predicate
     * CacheMode.Sql
     *     SELECT xxx FROM user_profile, updated before Config.cache_last_version_interval_second
     * CacheMode.Partition, partition by event_date, only the partition of today will be updated:
     *     SELECT xxx FROM app_event WHERE event_date >= 20191201 AND event_date <= 20191207 GROUP BY event_date
     *     SELECT xxx FROM app_event INNER JOIN user_profile ON app_event.user_id = user_profile.user_id xxx
     *     SELECT xxx FROM app_event INNER JOIN user_profile ON xxx INNER JOIN site_channel ON xxx
     */
    public void checkCacheMode(long now) {
        cacheMode = innerCheckCacheMode(now);
    }

    private CacheMode innerCheckCacheMode(long now) {
        if (!enableCache()) {
            return CacheMode.NoNeed;
        }
        if (!(parsedStmt instanceof SelectStmt) || scanNodes.size() == 0) {
            return CacheMode.NoNeed;
        }
        MetricRepo.COUNTER_QUERY_TABLE.increase(1L);

        this.selectStmt = (SelectStmt) parsedStmt;
        // Check the last version time of the table
        List<CacheTable> tblTimeList = Lists.newArrayList();
        for (int i = 0; i < scanNodes.size(); i++) {
            ScanNode node = scanNodes.get(i);
            if (!(node instanceof OlapScanNode)) {
                return CacheMode.None;
            }
            OlapScanNode oNode = (OlapScanNode) node;
            OlapTable oTable = oNode.getOlapTable();
            CacheTable cTable = getLastUpdateTime(oTable);
            tblTimeList.add(cTable);
        }
        MetricRepo.COUNTER_QUERY_OLAP_TABLE.increase(1L);
        Collections.sort(tblTimeList);
        latestTable = tblTimeList.get(0);
        latestTable.Debug();

        if (now == 0) {
            now = nowtime();
        }
        if (enableSqlCache() &&
                (now - latestTable.latestTime) >= Config.cache_last_version_interval_second * 1000) {
            LOG.debug("TIME:{},{},{}", now, latestTable.latestTime, Config.cache_last_version_interval_second * 1000);
            cache = new SqlCache(this.queryId, this.selectStmt);
            ((SqlCache) cache).setCacheInfo(this.latestTable);
            MetricRepo.COUNTER_CACHE_MODE_SQL.increase(1L);
            return CacheMode.Sql;
        }

        if (!enablePartitionCache()) {
            return CacheMode.None;
        }

        // Check if selectStmt matches the partition key
        // Only one table may have been updated within the Config.cache_last_version_interval_second range
        for (int i = 1; i < tblTimeList.size(); i++) {
            if ((now - tblTimeList.get(i).latestTime) < Config.cache_last_version_interval_second * 1000) {
                LOG.info("the time of other tables is newer than {}", Config.cache_last_version_interval_second);
                return CacheMode.None;
            }
        }
        olapTable = latestTable.olapTable;
        if (olapTable.getPartitionInfo().getType() != PartitionType.RANGE) {
            LOG.debug("the partition of the OlapTable is not RANGE type");
            return CacheMode.None;
        }
        partitionInfo = (RangePartitionInfo) olapTable.getPartitionInfo();
        List<Column> columns = partitionInfo.getPartitionColumns();
        // Partition key has only one column
        if (columns.size() != 1) {
            LOG.info("the size of columns for partition key is {}", columns.size());
            return CacheMode.None;
        }
        partColumn = columns.get(0);
        // Check if the group exprs contain the partition column
        if (!checkGroupByPartitionKey(this.selectStmt, partColumn)) {
            LOG.info("not group by partition key, key {}", partColumn.getName());
            return CacheMode.None;
        }
        // Check if the where clause has exactly one CompoundPredicate on the partition column
        List<CompoundPredicate> compoundPredicates = Lists.newArrayList();
        getPartitionKeyFromSelectStmt(this.selectStmt, partColumn, compoundPredicates);
        if (compoundPredicates.size() != 1) {
            LOG.info("the predicate size including the partition key is {}", compoundPredicates.size());
            return CacheMode.None;
        }
        partitionPredicate = compoundPredicates.get(0);
        cache = new PartitionCache(this.queryId, this.selectStmt);
        ((PartitionCache) cache).setCacheInfo(this.latestTable, this.partitionInfo, this.partColumn,
                this.partitionPredicate);
        MetricRepo.COUNTER_CACHE_MODE_PARTITION.increase(1L);
        return CacheMode.Partition;
    }

    public CacheBeProxy.FetchCacheResult getCacheData() {
        CacheProxy.FetchCacheResult cacheResult = null;
        cacheMode = innerCheckCacheMode(0);
        if (cacheMode == CacheMode.NoNeed) {
            return cacheResult;
        }
        if (cacheMode == CacheMode.None) {
            LOG.info("check cache mode {}, queryid {}", cacheMode, DebugUtil.printId(queryId));
            return cacheResult;
        }
        Status status = new Status();
        cacheResult = cache.getCacheData(status);

        if (status.ok() && cacheResult != null) {
            LOG.info("hit cache, mode {}, queryid {}, all count {}, value count {}, row count {}, data size {}",
                    cacheMode, DebugUtil.printId(queryId),
                    cacheResult.all_count, cacheResult.value_count,
                    cacheResult.row_count, cacheResult.data_size);
        } else {
            LOG.info("miss cache, mode {}, queryid {}, code {}, msg {}", cacheMode,
                    DebugUtil.printId(queryId), status.getErrorCode(), status.getErrorMsg());
            cacheResult = null;
        }
        return cacheResult;
    }

    public long nowtime() {
        return System.currentTimeMillis();
    }

    private void getPartitionKeyFromSelectStmt(SelectStmt stmt, Column partColumn,
                                               List<CompoundPredicate> compoundPredicates) {
        getPartitionKeyFromWhereClause(stmt.getWhereClause(), partColumn, compoundPredicates);
        List<TableRef> tableRefs = stmt.getTableRefs();
        for (TableRef tblRef : tableRefs) {
            if (tblRef instanceof InlineViewRef) {
                InlineViewRef viewRef = (InlineViewRef) tblRef;
                QueryStmt queryStmt = viewRef.getViewStmt();
                if (queryStmt instanceof SelectStmt) {
                    getPartitionKeyFromSelectStmt((SelectStmt) queryStmt, partColumn, compoundPredicates);
                }
            }
        }
    }

    /**
     * Only case 1 is supported:
     * 1. key >= a and key <= b
     * 2. key = a or key = b
     * 3. key in (a, b, c)
     */
    private void getPartitionKeyFromWhereClause(Expr expr, Column partColumn,
                                                List<CompoundPredicate> compoundPredicates) {
        if (expr == null) {
            return;
        }
        if (expr instanceof CompoundPredicate) {
            CompoundPredicate cp = (CompoundPredicate) expr;
            if (cp.getOp() == CompoundPredicate.Operator.AND) {
                if (cp.getChildren().size() == 2 && cp.getChild(0) instanceof BinaryPredicate &&
                        cp.getChild(1) instanceof BinaryPredicate) {
                    BinaryPredicate leftPre = (BinaryPredicate) cp.getChild(0);
                    BinaryPredicate rightPre = (BinaryPredicate) cp.getChild(1);
                    String leftColumn = getColumnName(leftPre);
                    String rightColumn = getColumnName(rightPre);
                    if (leftColumn.equalsIgnoreCase(partColumn.getName()) &&
                            rightColumn.equalsIgnoreCase(partColumn.getName())) {
                        compoundPredicates.add(cp);
                    }
                }
            }
            for (Expr subExpr : expr.getChildren()) {
                getPartitionKeyFromWhereClause(subExpr, partColumn, compoundPredicates);
            }
        }
    }

    private String getColumnName(BinaryPredicate predicate) {
        SlotRef slot = null;
        if (predicate.getChild(0) instanceof SlotRef) {
            slot = (SlotRef) predicate.getChild(0);
        } else if (predicate.getChild(0) instanceof CastExpr) {
            CastExpr expr = (CastExpr) predicate.getChild(0);
            if (expr.getChild(0) instanceof SlotRef) {
                slot = (SlotRef) expr.getChild(0);
            }
        }

        if (slot != null) {
            return slot.getColumnName();
        }
        return "";
    }

    /**
     * Check that the selectStmt and tableRefs all group by the partition key:
     * 1. At least one group by
     * 2. The group by must contain the partition key
     */
    private boolean checkGroupByPartitionKey(SelectStmt stmt, Column partColumn) {
        List<AggregateInfo> aggInfoList = Lists.newArrayList();
        getAggInfoList(stmt, aggInfoList);
        int groupbyCount = 0;
        for (AggregateInfo aggInfo : aggInfoList) {
            /*
            COUNT(DISTINCT xxx) is supported now; the next version will remove this code
            if (aggInfo.isDistinctAgg()) {
                return false;
            }*/
            ArrayList<Expr> groupExprs = aggInfo.getGroupingExprs();
            if (groupExprs == null) {
                continue;
            }
            groupbyCount += 1;
            boolean matched = false;
            for (Expr groupExpr : groupExprs) {
                SlotRef slot = (SlotRef) groupExpr;
                if (partColumn.getName().equals(slot.getColumnName())) {
                    matched = true;
                    break;
                }
            }
            if (!matched) {
                return false;
            }
        }
        return groupbyCount > 0;
    }

    private void getAggInfoList(SelectStmt stmt, List<AggregateInfo> aggInfoList) {
        AggregateInfo aggInfo = stmt.getAggInfo();
        if (aggInfo != null) {
            aggInfoList.add(aggInfo);
        }
        List<TableRef> tableRefs = stmt.getTableRefs();
        for (TableRef tblRef : tableRefs) {
            if (tblRef instanceof InlineViewRef) {
                InlineViewRef viewRef = (InlineViewRef) tblRef;
                QueryStmt queryStmt = viewRef.getViewStmt();
                if (queryStmt instanceof SelectStmt) {
                    getAggInfoList((SelectStmt) queryStmt, aggInfoList);
                }
            }
        }
    }

    private CacheTable getLastUpdateTime(OlapTable olapTable) {
        CacheTable table = new CacheTable();
        table.olapTable = olapTable;
        for (Partition partition : olapTable.getPartitions()) {
            if (partition.getVisibleVersionTime() >= table.latestTime &&
                    partition.getVisibleVersion() > table.latestVersion) {
                table.latestPartitionId = partition.getId();
                table.latestTime = partition.getVisibleVersionTime();
                table.latestVersion = partition.getVisibleVersion();
            }
        }
        return table;
    }

    public Cache.HitRange getHitRange() {
        if (cacheMode == CacheMode.None) {
            return Cache.HitRange.None;
        }
        return cache.getHitRange();
    }

    public SelectStmt getRewriteStmt() {
        if (cacheMode != CacheMode.Partition) {
            return null;
        }
        return cache.getRewriteStmt();
    }

    public void copyRowBatch(RowBatch rowBatch) {
        if (cacheMode == CacheMode.None || cacheMode == CacheMode.NoNeed) {
            return;
        }
        cache.copyRowBatch(rowBatch);
    }

    public void updateCache() {
        if (cacheMode == CacheMode.None || cacheMode == CacheMode.NoNeed) {
            return;
        }
        cache.updateCache();
    }
}
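Taken together with the StmtExecutor changes earlier in this commit, the intended call sequence for this class is compact. A condensed sketch of the read-through path, using only methods that appear in this diff (error handling and row streaming omitted):

    // sketch: how StmtExecutor drives CacheAnalyzer in this commit
    CacheAnalyzer cacheAnalyzer = new CacheAnalyzer(context, parsedStmt, planner);
    if (cacheAnalyzer.enableCache()) {
        CacheBeProxy.FetchCacheResult result = cacheAnalyzer.getCacheData(); // also settles the CacheMode
        if (result != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Full) {
            // all rows come from the cache; nothing to execute
        } else if (cacheAnalyzer.getCacheMode() == CacheAnalyzer.CacheMode.Partition) {
            SelectStmt newStmt = cacheAnalyzer.getRewriteStmt(); // narrowed to the uncached partitions
            // re-analyze and re-plan newStmt, execute it through a new Coordinator,
            // call cacheAnalyzer.copyRowBatch(batch) on every batch,
            // and cacheAnalyzer.updateCache() once EOS is reached
        }
    }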
fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java (new file, 217 lines)
@@ -0,0 +1,217 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe.cache;

import com.google.common.collect.Lists;
import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.InlineViewRef;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.qe.RowBatch;
import org.apache.doris.thrift.TUniqueId;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;

public class PartitionCache extends Cache {
    private static final Logger LOG = LogManager.getLogger(PartitionCache.class);
    private SelectStmt nokeyStmt;
    private SelectStmt rewriteStmt;
    private CompoundPredicate partitionPredicate;
    private OlapTable olapTable;
    private RangePartitionInfo partitionInfo;
    private Column partColumn;

    private PartitionRange range;
    private List<PartitionRange.PartitionSingle> newRangeList;

    public SelectStmt getRewriteStmt() {
        return rewriteStmt;
    }

    public SelectStmt getNokeyStmt() {
        return nokeyStmt;
    }

    public PartitionCache(TUniqueId queryId, SelectStmt selectStmt) {
        super(queryId, selectStmt);
    }

    public void setCacheInfo(CacheAnalyzer.CacheTable latestTable, RangePartitionInfo partitionInfo, Column partColumn,
                             CompoundPredicate partitionPredicate) {
        this.latestTable = latestTable;
        this.olapTable = latestTable.olapTable;
        this.partitionInfo = partitionInfo;
        this.partColumn = partColumn;
        this.partitionPredicate = partitionPredicate;
        this.newRangeList = Lists.newArrayList();
    }

    public CacheProxy.FetchCacheResult getCacheData(Status status) {
        CacheProxy.FetchCacheRequest request;
        rewriteSelectStmt(null);
        request = new CacheBeProxy.FetchCacheRequest(nokeyStmt.toSql());
        range = new PartitionRange(this.partitionPredicate, this.olapTable,
                this.partitionInfo);
        if (!range.analytics()) {
            status.setStatus("analytics range error");
            return null;
        }

        for (PartitionRange.PartitionSingle single : range.getPartitionSingleList()) {
            request.addParam(single.getCacheKey().realValue(),
                    single.getPartition().getVisibleVersion(),
                    single.getPartition().getVisibleVersionTime()
            );
        }

        CacheProxy.FetchCacheResult cacheResult = proxy.fetchCache(request, 10000, status);
        if (status.ok() && cacheResult != null) {
            cacheResult.all_count = range.getPartitionSingleList().size();
            for (CacheBeProxy.CacheValue value : cacheResult.getValueList()) {
                range.setCacheFlag(value.param.partition_key);
            }
            MetricRepo.COUNTER_CACHE_HIT_PARTITION.increase(1L);
            MetricRepo.COUNTER_CACHE_PARTITION_ALL.increase((long) range.getPartitionSingleList().size());
            MetricRepo.COUNTER_CACHE_PARTITION_HIT.increase((long) cacheResult.getValueList().size());
        }

        range.setTooNewByID(latestTable.latestPartitionId);
        // build rewrite sql
        this.hitRange = range.buildDiskPartitionRange(newRangeList);
        if (newRangeList != null && newRangeList.size() > 0) {
            rewriteSelectStmt(newRangeList);
        }
        return cacheResult;
    }

    public void copyRowBatch(RowBatch rowBatch) {
        if (rowBatchBuilder == null) {
            rowBatchBuilder = new RowBatchBuilder(CacheAnalyzer.CacheMode.Partition);
            rowBatchBuilder.buildPartitionIndex(selectStmt.getResultExprs(), selectStmt.getColLabels(),
                    partColumn, range.buildUpdatePartitionRange());
        }
        rowBatchBuilder.copyRowData(rowBatch);
    }

    public void updateCache() {
        if (!super.checkRowLimit()) {
            return;
        }

        CacheBeProxy.UpdateCacheRequest updateRequest = rowBatchBuilder.buildPartitionUpdateRequest(nokeyStmt.toSql());
        if (updateRequest.value_count > 0) {
            CacheBeProxy proxy = new CacheBeProxy();
            Status status = new Status();
            proxy.updateCache(updateRequest, CacheProxy.UPDATE_TIMEOUT, status);
            LOG.info("update cache model {}, queryid {}, sqlkey {}, value count {}, row count {}, data size {}",
                    CacheAnalyzer.CacheMode.Partition, DebugUtil.printId(queryId),
                    DebugUtil.printId(updateRequest.sql_key),
                    updateRequest.value_count, updateRequest.row_count, updateRequest.data_size);
        }
    }

    /**
     * Set the predicate containing the partition key to null
     */
    public void rewriteSelectStmt(List<PartitionRange.PartitionSingle> newRangeList) {
        if (newRangeList == null || newRangeList.size() == 0) {
            this.nokeyStmt = (SelectStmt) this.selectStmt.clone();
            rewriteSelectStmt(nokeyStmt, this.partitionPredicate, null);
        } else {
            this.rewriteStmt = (SelectStmt) this.selectStmt.clone();
            rewriteSelectStmt(rewriteStmt, this.partitionPredicate, newRangeList);
        }
    }

    private void rewriteSelectStmt(SelectStmt newStmt, CompoundPredicate predicate,
                                   List<PartitionRange.PartitionSingle> newRangeList) {
        newStmt.setWhereClause(
                rewriteWhereClause(newStmt.getWhereClause(), predicate, newRangeList)
        );
        List<TableRef> tableRefs = newStmt.getTableRefs();
        for (TableRef tblRef : tableRefs) {
            if (tblRef instanceof InlineViewRef) {
                InlineViewRef viewRef = (InlineViewRef) tblRef;
                QueryStmt queryStmt = viewRef.getViewStmt();
                if (queryStmt instanceof SelectStmt) {
                    rewriteSelectStmt((SelectStmt) queryStmt, predicate, newRangeList);
                }
            }
        }
    }

    /**
     * Rewrite the query scope of the partition key in the where condition
     * origin expr : where eventdate >= "2020-01-12" and eventdate <= "2020-01-15"
     * rewrite expr : where eventdate >= "2020-01-14" and eventdate <= "2020-01-15"
     */
    private Expr rewriteWhereClause(Expr expr, CompoundPredicate predicate,
                                    List<PartitionRange.PartitionSingle> newRangeList) {
        if (expr == null) {
            return null;
        }
        if (!(expr instanceof CompoundPredicate)) {
            return expr;
        }
        if (expr.equals(predicate)) {
            if (newRangeList == null) {
                return null;
            } else {
                getPartitionRange().rewritePredicate((CompoundPredicate) expr, newRangeList);
                return expr;
            }
        }

        for (int i = 0; i < expr.getChildren().size(); i++) {
            Expr child = rewriteWhereClause(expr.getChild(i), predicate, newRangeList);
            if (child == null) {
                expr.removeNode(i);
                i--;
            } else {
                expr.setChild(i, child);
            }
        }
        if (expr.getChildren().size() == 0) {
            return null;
        } else if (expr.getChildren().size() == 1) {
            return expr.getChild(0);
        } else {
            return expr;
        }
    }

    public PartitionRange getPartitionRange() {
        if (range == null) {
            range = new PartitionRange(this.partitionPredicate,
                    this.olapTable, this.partitionInfo);
        }
        return range;
    }
}
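One detail worth calling out from the file above: rewriteSelectStmt() produces two different statements from the same query. Called with a null range list it builds nokeyStmt, the query with the partition predicate stripped out entirely, whose SQL text serves as the cache key; called with a non-empty range list it builds rewriteStmt, the query narrowed to the partitions that must still be read from the BE. For the doc-comment example, WHERE eventdate >= "2020-01-12" AND eventdate <= "2020-01-15" becomes no partition predicate at all in nokeyStmt, and WHERE eventdate >= "2020-01-14" AND eventdate <= "2020-01-15" in rewriteStmt.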
fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionRange.java (new file, 604 lines)
@@ -0,0 +1,604 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe.cache;

import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.DateLiteral;
import org.apache.doris.analysis.InPredicate;
import org.apache.doris.analysis.PartitionValue;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.Config;
import org.apache.doris.planner.PartitionColumnFilter;

import org.apache.doris.common.AnalysisException;

import com.google.common.collect.Lists;
import com.google.common.collect.Range;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;

/**
 * Convert the range of the partition to a list:
 * partitions by day/week/month are all split into a day list
 */
public class PartitionRange {
    private static final Logger LOG = LogManager.getLogger(PartitionRange.class);

    public class PartitionSingle {
        private Partition partition;
        private PartitionKey partitionKey;
        private long partitionId;
        private PartitionKeyType cacheKey;
        private boolean fromCache;
        private boolean tooNew;

        public Partition getPartition() {
            return partition;
        }

        public void setPartition(Partition partition) {
            this.partition = partition;
        }

        public PartitionKey getPartitionKey() {
            return partitionKey;
        }

        public void setPartitionKey(PartitionKey key) {
            this.partitionKey = key;
        }

        public long getPartitionId() {
            return partitionId;
        }

        public void setPartitionId(long partitionId) {
            this.partitionId = partitionId;
        }

        public PartitionKeyType getCacheKey() {
            return cacheKey;
        }

        public void setCacheKey(PartitionKeyType cacheKey) {
            this.cacheKey.clone(cacheKey);
        }

        public boolean isFromCache() {
            return fromCache;
        }

        public void setFromCache(boolean fromCache) {
            this.fromCache = fromCache;
        }

        public boolean isTooNew() {
            return tooNew;
        }

        public void setTooNew(boolean tooNew) {
            this.tooNew = tooNew;
        }

        public PartitionSingle() {
            this.partitionId = 0;
            this.cacheKey = new PartitionKeyType();
            this.fromCache = false;
            this.tooNew = false;
        }

        public void Debug() {
            if (partition != null) {
                LOG.info("partition id {}, cacheKey {}, version {}, time {}, fromCache {}, tooNew {} ",
                        partitionId, cacheKey.realValue(),
                        partition.getVisibleVersion(), partition.getVisibleVersionTime(),
                        fromCache, tooNew);
            } else {
                LOG.info("partition id {}, cacheKey {}, fromCache {}, tooNew {} ", partitionId,
                        cacheKey.realValue(), fromCache, tooNew);
            }
        }
    }

    public enum KeyType {
        DEFAULT,
        LONG,
        DATE,
        DATETIME,
        TIME
    }

    public static class PartitionKeyType {
        private SimpleDateFormat df8 = new SimpleDateFormat("yyyyMMdd");
        private SimpleDateFormat df10 = new SimpleDateFormat("yyyy-MM-dd");

        public KeyType keyType = KeyType.DEFAULT;
        public long value;
        public Date date;

        public boolean init(Type type, String str) {
            if (type.getPrimitiveType() == PrimitiveType.DATE) {
                try {
                    date = df10.parse(str);
                } catch (Exception e) {
                    LOG.warn("parse error str {}.", str);
                    return false;
                }
                keyType = KeyType.DATE;
            } else {
                value = Long.valueOf(str);
                keyType = KeyType.LONG;
            }
            return true;
        }

        public boolean init(Type type, LiteralExpr expr) {
            switch (type.getPrimitiveType()) {
                case BOOLEAN:
                case TIME:
                case DATETIME:
                case FLOAT:
                case DOUBLE:
                case DECIMAL:
                case DECIMALV2:
                case CHAR:
                case VARCHAR:
                case LARGEINT:
                    LOG.info("PartitionCache does not support such key type {}", type.toSql());
                    return false;
                case DATE:
                    date = getDateValue(expr);
                    keyType = KeyType.DATE;
                    break;
                case TINYINT:
                case SMALLINT:
                case INT:
                case BIGINT:
                    value = expr.getLongValue();
                    keyType = KeyType.LONG;
                    break;
            }
            return true;
        }

        public void clone(PartitionKeyType key) {
            keyType = key.keyType;
            value = key.value;
            date = key.date;
        }

        public boolean equals(PartitionKeyType key) {
            return realValue() == key.realValue();
        }

        public void add(int num) {
            if (keyType == KeyType.DATE) {
                date = new Date(date.getTime() + num * 3600 * 24 * 1000);
            } else {
                value += num;
            }
        }

        public String toString() {
            if (keyType == KeyType.DEFAULT) {
                return "";
            } else if (keyType == KeyType.DATE) {
                return df10.format(date);
            } else {
                return String.valueOf(value);
            }
        }

        public long realValue() {
            if (keyType == KeyType.DATE) {
                return Long.parseLong(df8.format(date));
            } else {
                return value;
            }
        }

        private Date getDateValue(LiteralExpr expr) {
            value = expr.getLongValue() / 1000000;
            Date dt = null;
            try {
                dt = df8.parse(String.valueOf(value));
            } catch (Exception e) {
            }
            return dt;
        }
    }

    private CompoundPredicate partitionKeyPredicate;
    private OlapTable olapTable;
    private RangePartitionInfo rangePartitionInfo;
    private Column partitionColumn;
    private List<PartitionSingle> partitionSingleList;

    public CompoundPredicate getPartitionKeyPredicate() {
        return partitionKeyPredicate;
    }

    public void setPartitionKeyPredicate(CompoundPredicate partitionKeyPredicate) {
        this.partitionKeyPredicate = partitionKeyPredicate;
    }

    public RangePartitionInfo getRangePartitionInfo() {
        return rangePartitionInfo;
    }

    public void setRangePartitionInfo(RangePartitionInfo rangePartitionInfo) {
        this.rangePartitionInfo = rangePartitionInfo;
    }

    public Column getPartitionColumn() {
        return partitionColumn;
    }

    public void setPartitionColumn(Column partitionColumn) {
        this.partitionColumn = partitionColumn;
    }

    public List<PartitionSingle> getPartitionSingleList() {
        return partitionSingleList;
    }

    public PartitionRange() {
    }

    public PartitionRange(CompoundPredicate partitionKeyPredicate, OlapTable olapTable,
                          RangePartitionInfo rangePartitionInfo) {
        this.partitionKeyPredicate = partitionKeyPredicate;
        this.olapTable = olapTable;
        this.rangePartitionInfo = rangePartitionInfo;
        this.partitionSingleList = Lists.newArrayList();
    }

    /**
     * Analyze the PartitionKey and PartitionInfo
     */
    public boolean analytics() {
        if (rangePartitionInfo.getPartitionColumns().size() != 1) {
            return false;
        }
        partitionColumn = rangePartitionInfo.getPartitionColumns().get(0);
        PartitionColumnFilter filter = createPartitionFilter(this.partitionKeyPredicate, partitionColumn);
        try {
            if (!buildPartitionKeyRange(filter, partitionColumn)) {
                return false;
            }
            getTablePartitionList(olapTable);
        } catch (AnalysisException e) {
            LOG.warn("get partition range failed, because:", e);
            return false;
        }
        return true;
    }

    public boolean setCacheFlag(long cacheKey) {
        boolean find = false;
        for (PartitionSingle single : partitionSingleList) {
            if (single.getCacheKey().realValue() == cacheKey) {
                single.setFromCache(true);
                find = true;
                break;
            }
        }
        return find;
    }

    public boolean setTooNewByID(long partitionId) {
        boolean find = false;
        for (PartitionSingle single : partitionSingleList) {
            if (single.getPartition().getId() == partitionId) {
                single.setTooNew(true);
                find = true;
                break;
            }
        }
        return find;
    }

    public boolean setTooNewByKey(long cacheKey) {
        boolean find = false;
        for (PartitionSingle single : partitionSingleList) {
            if (single.getCacheKey().realValue() == cacheKey) {
                single.setTooNew(true);
                find = true;
                break;
            }
        }
        return find;
    }

    /**
     * Only range queries on the partition key are supported; disjoint partition key queries are not,
     * because a query can only be divided into two parts: part1 gets data from the cache, part2 fetches data
     * through a scan node from the BE.
     * Partition cache : 20191211-20191215
     * Hit cache parameter : [20191211 - 20191215], [20191212 - 20191214], [20191212 - 20191216], [20191210 - 20191215]
     * Miss cache parameter: [20191210 - 20191216]
     * So the hit range is Full, Left or Right; Middle is not supported yet.
     */
    public Cache.HitRange buildDiskPartitionRange(List<PartitionSingle> rangeList) {
        Cache.HitRange hitRange = Cache.HitRange.None;
        if (partitionSingleList.size() == 0) {
            return hitRange;
        }
        int begin = partitionSingleList.size() - 1;
        int end = 0;
        for (int i = 0; i < partitionSingleList.size(); i++) {
            if (!partitionSingleList.get(i).isFromCache()) {
                if (begin > i) {
                    begin = i;
                }
                if (end < i) {
                    end = i;
                }
            }
        }
        if (end < begin) {
            hitRange = Cache.HitRange.Full;
            return hitRange;
        }

        if (end == partitionSingleList.size() - 1) {
            hitRange = Cache.HitRange.Left;
        }
        if (begin == 0) {
            hitRange = Cache.HitRange.Right;
        }

        rangeList.add(partitionSingleList.get(begin));
        rangeList.add(partitionSingleList.get(end));
        LOG.info("the new range for scan be is [{},{}], hit range {}", rangeList.get(0).getCacheKey().realValue(),
                rangeList.get(1).getCacheKey().realValue(), hitRange);
        return hitRange;
    }

    /**
     * Get the partition range that needs to be updated
     */
    public List<PartitionSingle> buildUpdatePartitionRange() {
        List<PartitionSingle> updateList = Lists.newArrayList();
        for (PartitionSingle single : partitionSingleList) {
            if (!single.isFromCache() && !single.isTooNew()) {
                updateList.add(single);
            }
        }
        return updateList;
    }

    public boolean rewritePredicate(CompoundPredicate predicate, List<PartitionSingle> rangeList) {
        if (predicate.getOp() != CompoundPredicate.Operator.AND) {
            LOG.debug("predicate op {}", predicate.getOp().toString());
            return false;
        }
        for (Expr expr : predicate.getChildren()) {
            if (expr instanceof BinaryPredicate) {
                BinaryPredicate binPredicate = (BinaryPredicate) expr;
                BinaryPredicate.Operator op = binPredicate.getOp();
                if (binPredicate.getChildren().size() != 2) {
                    LOG.info("binary predicate children size {}", binPredicate.getChildren().size());
                    continue;
                }
                if (op == BinaryPredicate.Operator.NE) {
                    LOG.info("binary predicate op {}", op.toString());
                    continue;
                }
                PartitionKeyType key = new PartitionKeyType();
                switch (op) {
                    case LE: //<=
                        key.clone(rangeList.get(1).getCacheKey());
                        break;
                    case LT: //<
                        key.clone(rangeList.get(1).getCacheKey());
                        key.add(1);
                        break;
                    case GE: //>=
                        key.clone(rangeList.get(0).getCacheKey());
                        break;
                    case GT: //>
                        key.clone(rangeList.get(0).getCacheKey());
                        key.add(-1);
                        break;
                    default:
                        break;
                }
                LiteralExpr newLiteral;
                if (key.keyType == KeyType.DATE) {
                    try {
                        newLiteral = new DateLiteral(key.toString(), Type.DATE);
                    } catch (Exception e) {
                        LOG.warn("Date's format is error {},{}", key.toString(), e);
                        continue;
                    }
                } else if (key.keyType == KeyType.LONG) {
                    newLiteral = new IntLiteral(key.realValue());
                } else {
                    LOG.warn("Partition cache does not support type {}", key.keyType);
                    continue;
                }

                if (binPredicate.getChild(1) instanceof LiteralExpr) {
                    binPredicate.removeNode(1);
                    binPredicate.addChild(newLiteral);
                } else if (binPredicate.getChild(0) instanceof LiteralExpr) {
                    binPredicate.removeNode(0);
                    binPredicate.setChild(0, newLiteral);
                } else {
                    continue;
                }
            } else if (expr instanceof InPredicate) {
                InPredicate inPredicate = (InPredicate) expr;
                if (!inPredicate.isLiteralChildren() || inPredicate.isNotIn()) {
                    continue;
                }
            }
        }
        return true;
    }

    /**
     * Get partition info from the SQL predicate and the OlapTable
     * Pair<PartitionID, PartitionKey>
     * PARTITION BY RANGE(`olap_date`)
     * ( PARTITION p20200101 VALUES [("20200101"), ("20200102")),
     *   PARTITION p20200102 VALUES [("20200102"), ("20200103")) )
     */
    private void getTablePartitionList(OlapTable table) {
        Map<Long, Range<PartitionKey>> range = rangePartitionInfo.getIdToRange(false);
        for (Map.Entry<Long, Range<PartitionKey>> entry : range.entrySet()) {
            Long partId = entry.getKey();
            for (PartitionSingle single : partitionSingleList) {
                if (entry.getValue().contains(single.getPartitionKey())) {
                    if (single.getPartitionId() == 0) {
                        single.setPartitionId(partId);
                    }
                }
            }
        }

        for (PartitionSingle single : partitionSingleList) {
            single.setPartition(table.getPartition(single.getPartitionId()));
        }
    }

    /**
     * Get the value range of the partition column from the predicate
     */
    private boolean buildPartitionKeyRange(PartitionColumnFilter partitionColumnFilter,
                                           Column partitionColumn) throws AnalysisException {
        if (partitionColumnFilter.lowerBound == null || partitionColumnFilter.upperBound == null) {
            LOG.info("filter is null");
            return false;
        }
        PartitionKeyType begin = new PartitionKeyType();
        PartitionKeyType end = new PartitionKeyType();
        begin.init(partitionColumn.getType(), partitionColumnFilter.lowerBound);
        end.init(partitionColumn.getType(), partitionColumnFilter.upperBound);

        if (!partitionColumnFilter.lowerBoundInclusive) {
            begin.add(1);
        }
        if (!partitionColumnFilter.upperBoundInclusive) {
            end.add(-1);
        }
        if (begin.realValue() > end.realValue()) {
            LOG.info("partition range begin {}, end {}", begin, end);
            return false;
        }

        if (end.realValue() - begin.realValue() > Config.cache_result_max_row_count) {
            LOG.info("partition key range is too large, begin {}, end {}", begin.realValue(), end.realValue());
            return false;
        }

        while (begin.realValue() <= end.realValue()) {
            PartitionKey key = PartitionKey.createPartitionKey(
                    Lists.newArrayList(new PartitionValue(begin.toString())),
                    Lists.newArrayList(partitionColumn));
            PartitionSingle single = new PartitionSingle();
            single.setCacheKey(begin);
            single.setPartitionKey(key);
            partitionSingleList.add(single);
            begin.add(1);
        }
        return true;
    }

    private PartitionColumnFilter createPartitionFilter(CompoundPredicate partitionKeyPredicate,
                                                        Column partitionColumn) {
        if (partitionKeyPredicate.getOp() != CompoundPredicate.Operator.AND) {
            LOG.debug("not an AND op");
            return null;
        }
        PartitionColumnFilter partitionColumnFilter = new PartitionColumnFilter();
        for (Expr expr : partitionKeyPredicate.getChildren()) {
            if (expr instanceof BinaryPredicate) {
                BinaryPredicate binPredicate = (BinaryPredicate) expr;
                BinaryPredicate.Operator op = binPredicate.getOp();
                if (binPredicate.getChildren().size() != 2) {
                    LOG.warn("child size {}", binPredicate.getChildren().size());
                    continue;
                }
                if (binPredicate.getOp() == BinaryPredicate.Operator.NE) {
                    LOG.debug("NE operator is not supported");
                    continue;
                }
                Expr slotBinding;
                if (binPredicate.getChild(1) instanceof LiteralExpr) {
                    slotBinding = binPredicate.getChild(1);
                } else if (binPredicate.getChild(0) instanceof LiteralExpr) {
                    slotBinding = binPredicate.getChild(0);
                } else {
                    LOG.debug("no LiteralExpr found");
                    continue;
                }

                LiteralExpr literal = (LiteralExpr) slotBinding;
                switch (op) {
                    case EQ: //=
                        partitionColumnFilter.setLowerBound(literal, true);
                        partitionColumnFilter.setUpperBound(literal, true);
                        break;
                    case LE: //<=
                        partitionColumnFilter.setUpperBound(literal, true);
                        break;
                    case LT: //<
                        partitionColumnFilter.setUpperBound(literal, false);
                        break;
                    case GE: //>=
                        partitionColumnFilter.setLowerBound(literal, true);
                        break;
                    case GT: //>
                        partitionColumnFilter.setLowerBound(literal, false);
                        break;
                    default:
                        break;
                }
            } else if (expr instanceof InPredicate) {
                InPredicate inPredicate = (InPredicate) expr;
                if (!inPredicate.isLiteralChildren() || inPredicate.isNotIn()) {
                    continue;
                }
                partitionColumnFilter.setInPredicate(inPredicate);
            }
        }
        return partitionColumnFilter;
    }
}
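A quick trace of buildDiskPartitionRange() above, under the doc-comment's scenario: for partitions keyed 20191211..20191215 with 20191211-20191213 flagged fromCache, the scan finds the uncached indexes begin = 3 and end = 4; end equals size - 1, so the hit range is Left, and rangeList comes back as [20191214, 20191215], which PartitionCache turns into the rewritten predicate. If every partition was served from cache, the loop never updates begin or end, the initial values leave end < begin, and the method returns HitRange.Full through the early-return branch.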
fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java (new file, 158 lines)
@ -0,0 +1,158 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe.cache;

import com.google.common.collect.Lists;
import org.apache.doris.analysis.Expr;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Type;
import org.apache.doris.qe.RowBatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

/**
 * Builds the row batches that are written back to the cache, according to the
 * query's partition range and which partitions were cache hits.
 */
public class RowBatchBuilder {
    private static final Logger LOG = LogManager.getLogger(RowBatchBuilder.class);

    private CacheBeProxy.UpdateCacheRequest updateRequest;
    private CacheAnalyzer.CacheMode cacheMode;
    private int keyIndex;
    private Type keyType;
    private HashMap<Long, PartitionRange.PartitionSingle> cachePartMap;
    private List<byte[]> rowList;
    private int batchSize;
    private int rowSize;
    private int dataSize;

    public int getRowSize() {
        return rowSize;
    }

    public RowBatchBuilder(CacheAnalyzer.CacheMode model) {
        cacheMode = model;
        keyIndex = 0;
        keyType = Type.INVALID;
        rowList = Lists.newArrayList();
        cachePartMap = new HashMap<>();
        batchSize = 0;
        rowSize = 0;
        dataSize = 0;
    }

    public void buildPartitionIndex(ArrayList<Expr> resultExpr,
                                    List<String> columnLabel, Column partColumn,
                                    List<PartitionRange.PartitionSingle> newSingleList) {
        if (cacheMode != CacheAnalyzer.CacheMode.Partition) {
            return;
        }
        // Locate the partition column in the result set, so rows can later be
        // dispatched to their partition by value.
        for (int i = 0; i < columnLabel.size(); i++) {
            if (columnLabel.get(i).equalsIgnoreCase(partColumn.getName())) {
                keyType = resultExpr.get(i).getType();
                keyIndex = i;
                break;
            }
        }
        if (newSingleList != null) {
            for (PartitionRange.PartitionSingle single : newSingleList) {
                cachePartMap.put(single.getCacheKey().realValue(), single);
            }
        } else {
            LOG.info("no new partition single list");
        }
    }

    public void copyRowData(RowBatch rowBatch) {
        batchSize++;
        rowSize += rowBatch.getBatch().getRowsSize();
        for (ByteBuffer buf : rowBatch.getBatch().getRows()) {
            byte[] bytes = Arrays.copyOfRange(buf.array(), buf.position(), buf.limit());
            dataSize += bytes.length;
            rowList.add(bytes);
        }
    }

    public CacheBeProxy.UpdateCacheRequest buildSqlUpdateRequest(String sql, long partitionKey,
                                                                 long lastVersion, long latestTime) {
        if (updateRequest == null) {
            updateRequest = new CacheBeProxy.UpdateCacheRequest(sql);
        }
        updateRequest.addValue(partitionKey, lastVersion, latestTime, rowList);
        return updateRequest;
    }

    public PartitionRange.PartitionKeyType getKeyFromRow(byte[] row, int index, Type type) {
        PartitionRange.PartitionKeyType key = new PartitionRange.PartitionKeyType();
        ByteBuffer buf = ByteBuffer.wrap(row);
        int len;
        // Each field carries a one-byte length prefix; skip the fields before
        // the partition column, then decode the field at `index`.
        for (int i = 0; i <= index; i++) {
            len = buf.get();
            if (i < index) {
                buf.position(buf.position() + len);
            }
            if (i == index) {
                byte[] content = Arrays.copyOfRange(buf.array(), buf.position(), buf.position() + len);
                key.init(type, new String(content));
            }
        }
        return key;
    }

    /**
     * Split the accumulated rows by partition key and build the per-partition
     * update request.
     */
    public CacheBeProxy.UpdateCacheRequest buildPartitionUpdateRequest(String sql) {
        if (updateRequest == null) {
            updateRequest = new CacheBeProxy.UpdateCacheRequest(sql);
        }
        HashMap<Long, List<byte[]>> partRowMap = new HashMap<>();
        List<byte[]> partitionRowList;
        PartitionRange.PartitionKeyType cacheKey;
        for (byte[] row : rowList) {
            cacheKey = getKeyFromRow(row, keyIndex, keyType);
            if (!cachePartMap.containsKey(cacheKey.realValue())) {
                LOG.info("can't find partition key {}", cacheKey.realValue());
                continue;
            }
            if (!partRowMap.containsKey(cacheKey.realValue())) {
                partitionRowList = Lists.newArrayList();
                partitionRowList.add(row);
                partRowMap.put(cacheKey.realValue(), partitionRowList);
            } else {
                // the map is keyed by the partition key's real value
                partRowMap.get(cacheKey.realValue()).add(row);
            }
        }

        for (HashMap.Entry<Long, List<byte[]>> entry : partRowMap.entrySet()) {
            Long key = entry.getKey();
            PartitionRange.PartitionSingle partition = cachePartMap.get(key);
            partitionRowList = entry.getValue();
            updateRequest.addValue(key, partition.getPartition().getVisibleVersion(),
                    partition.getPartition().getVisibleVersionTime(), partitionRowList);
        }
        return updateRequest;
    }
}
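The row layout that getKeyFromRow() walks is the one the BE returns on the MySQL channel: every column value carries a one-byte length prefix. A minimal decoding sketch, with names as in this diff (the buffer is the one used by testParseByte further down):

    // The buffer encodes three length-prefixed fields: [10]"2020-03-10" [1]"3" [2]"CN"
    byte[] row = new byte[]{10, 50, 48, 50, 48, 45, 48, 51, 45, 49, 48, 1, 51, 2, 67, 78};
    RowBatchBuilder builder = new RowBatchBuilder(CacheAnalyzer.CacheMode.Partition);
    long dateKey = builder.getKeyFromRow(row, 0, Type.DATE).realValue(); // 20200310
    long intKey  = builder.getKeyFromRow(row, 1, Type.INT).realValue();  // 3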
fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java (new file, 80 lines)
@ -0,0 +1,80 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe.cache;

import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.qe.RowBatch;
import org.apache.doris.thrift.TUniqueId;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class SqlCache extends Cache {
    private static final Logger LOG = LogManager.getLogger(SqlCache.class);

    public SqlCache(TUniqueId queryId, SelectStmt selectStmt) {
        super(queryId, selectStmt);
    }

    public void setCacheInfo(CacheAnalyzer.CacheTable latestTable) {
        this.latestTable = latestTable;
    }

    public CacheProxy.FetchCacheResult getCacheData(Status status) {
        CacheProxy.FetchCacheRequest request = new CacheProxy.FetchCacheRequest(selectStmt.toSql());
        request.addParam(latestTable.latestPartitionId, latestTable.latestVersion,
                latestTable.latestTime);
        CacheProxy.FetchCacheResult cacheResult = proxy.fetchCache(request, 10000, status);
        if (status.ok() && cacheResult != null) {
            cacheResult.all_count = 1;
            MetricRepo.COUNTER_CACHE_HIT_SQL.increase(1L);
            hitRange = HitRange.Full;
        }
        return cacheResult;
    }

    // The SQL cache is all-or-nothing, so a hit never needs a rewritten statement.
    public SelectStmt getRewriteStmt() {
        return null;
    }

    public void copyRowBatch(RowBatch rowBatch) {
        if (rowBatchBuilder == null) {
            rowBatchBuilder = new RowBatchBuilder(CacheAnalyzer.CacheMode.Sql);
        }
        rowBatchBuilder.copyRowData(rowBatch);
    }

    public void updateCache() {
        if (!super.checkRowLimit()) {
            return;
        }

        CacheBeProxy.UpdateCacheRequest updateRequest = rowBatchBuilder.buildSqlUpdateRequest(selectStmt.toSql(),
                latestTable.latestPartitionId, latestTable.latestVersion, latestTable.latestTime);
        if (updateRequest.value_count > 0) {
            CacheBeProxy proxy = new CacheBeProxy();
            Status status = new Status();
            proxy.updateCache(updateRequest, CacheProxy.UPDATE_TIMEOUT, status);
            LOG.info("update cache mode {}, queryid {}, sqlkey {}, value count {}, row count {}, data size {}",
                    CacheAnalyzer.CacheMode.Sql, DebugUtil.printId(queryId), DebugUtil.printId(updateRequest.sql_key),
                    updateRequest.value_count, updateRequest.row_count, updateRequest.data_size);
        }
    }
}
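Taken together with the StmtExecutor changes at the top of this commit, the intended SQL-cache flow is roughly the following. This is only a sketch of the call order inferred from the method names above; `cache`, `status`, and `batch` are assumed to be in scope:

    CacheProxy.FetchCacheResult result = cache.getCacheData(status);
    if (status.ok() && result != null) {
        // full hit: cached rows are sent straight to the MySQL channel
    } else {
        // miss: execute normally, mirror each batch, then write back once
        cache.copyRowBatch(batch);  // per RowBatch from the coordinator
        cache.updateCache();        // after EOS; no-op if checkRowLimit() fails
    }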
fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java (new file, 851 lines)
@ -0,0 +1,851 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe;

import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexState;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.RandomDistributionInfo;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.mysql.privilege.MockedAuth;
import org.apache.doris.mysql.privilege.PaloAuth;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.proto.PUniqueId;
import org.apache.doris.qe.cache.Cache;
import org.apache.doris.qe.cache.CacheAnalyzer;
import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode;
import org.apache.doris.qe.cache.CacheCoordinator;
import org.apache.doris.qe.cache.PartitionCache;
import org.apache.doris.qe.cache.PartitionRange;
import org.apache.doris.qe.cache.RowBatchBuilder;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TStorageType;
import org.apache.doris.thrift.TUniqueId;

import mockit.Expectations;
import mockit.Mocked;

import com.google.common.collect.Lists;

import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.StringReader;
import java.net.UnknownHostException;
import java.util.List;

public class PartitionCacheTest {
    private static final Logger LOG = LogManager.getLogger(PartitionCacheTest.class);
    public static String clusterName = "testCluster";
    public static String dbName = "testDb";
    public static String fullDbName = "testCluster:testDb";
    public static String tableName = "testTbl";
    public static String userName = "testUser";

    private static ConnectContext context;

    private List<PartitionRange.PartitionSingle> newRangeList;
    private Cache.HitRange hitRange;
    private Analyzer analyzer;
    private Database db;

    @Mocked
    private PaloAuth auth;
    @Mocked
    private SystemInfoService service;
    @Mocked
    private Catalog catalog;
    @Mocked
    private ConnectContext ctx;
    @Mocked
    MysqlChannel channel;
    @Mocked
    ConnectScheduler scheduler;

    @BeforeClass
    public static void start() {
        MetricRepo.init();
        try {
            FrontendOptions.init();
            context = new ConnectContext(null);
            Config.cache_enable_sql_mode = true;
            Config.cache_enable_partition_mode = true;
            context.getSessionVariable().setEnableSqlCache(true);
            context.getSessionVariable().setEnablePartitionCache(true);
            Config.cache_last_version_interval_second = 7200;
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
    }

    @Before
    public void setUp() {
        MockedAuth.mockedAuth(auth);
        MockedAuth.mockedConnectContext(ctx, "root", "192.168.1.1");

        db = new Database(1L, fullDbName);

        OlapTable tbl1 = createOrderTable();
        OlapTable tbl2 = createProfileTable();
        OlapTable tbl3 = createEventTable();
        db.createTable(tbl1);
        db.createTable(tbl2);
        db.createTable(tbl3);

        new Expectations(catalog) {
            {
                catalog.getAuth();
                minTimes = 0;
                result = auth;

                Deencapsulation.invoke(Catalog.class, "getCurrentSystemInfo");
                minTimes = 0;
                result = service;

                catalog.getDb(fullDbName);
                minTimes = 0;
                result = db;

                catalog.getDb(dbName);
                minTimes = 0;
                result = db;

                catalog.getDb(db.getId());
                minTimes = 0;
                result = db;

                catalog.getDbNames();
                minTimes = 0;
                result = Lists.newArrayList(fullDbName);
            }
        };

        QueryState state = new QueryState();
        channel.reset();

        new Expectations(ctx) {
            {
                ctx.getMysqlChannel();
                minTimes = 0;
                result = channel;

                ctx.getClusterName();
                minTimes = 0;
                result = clusterName;

                ctx.getSerializer();
                minTimes = 0;
                result = MysqlSerializer.newInstance();

                ctx.getCatalog();
                minTimes = 0;
                result = catalog;

                ctx.getState();
                minTimes = 0;
                result = state;

                ctx.getConnectScheduler();
                minTimes = 0;
                result = scheduler;

                ctx.getConnectionId();
                minTimes = 0;
                result = 1;

                ctx.getQualifiedUser();
                minTimes = 0;
                result = userName;

                ctx.getForwardedStmtId();
                minTimes = 0;
                result = 123L;

                ctx.setKilled();
                minTimes = 0;

                ctx.updateReturnRows(anyInt);
                minTimes = 0;

                ctx.setQueryId((TUniqueId) any);
                minTimes = 0;

                ctx.queryId();
                minTimes = 0;
                result = new TUniqueId();

                ctx.getStartTime();
                minTimes = 0;
                result = 0L;

                ctx.getDatabase();
                minTimes = 0;
                result = dbName;

                SessionVariable sessionVariable = new SessionVariable();
                ctx.getSessionVariable();
                minTimes = 0;
                result = sessionVariable;

                ctx.setStmtId(anyLong);
                minTimes = 0;

                ctx.getStmtId();
                minTimes = 0;
                result = 1L;
            }
        };

        analyzer = new Analyzer(catalog, ctx);
        newRangeList = Lists.newArrayList();
    }

    private void test1() {
        new Expectations(catalog) {
            {
                catalog.getAuth();
                result = auth;
            }
        };
    }

    private OlapTable createOrderTable() {
        Column column1 = new Column("date", ScalarType.INT);
        Column column2 = new Column("id", ScalarType.INT);
        Column column3 = new Column("value", ScalarType.INT);
        List<Column> columns = Lists.newArrayList(column1, column2, column3);

        MaterializedIndex baseIndex = new MaterializedIndex(10001, IndexState.NORMAL);
        RandomDistributionInfo distInfo = new RandomDistributionInfo(10);

        PartitionInfo partInfo = new RangePartitionInfo(Lists.newArrayList(column1));

        Partition part12 = new Partition(20200112, "p20200112", baseIndex, distInfo);
        part12.setVisibleVersion(1, 1578762000000L, 1); //2020-01-12 1:00:00
        Partition part13 = new Partition(20200113, "p20200113", baseIndex, distInfo);
        part13.setVisibleVersion(1, 1578848400000L, 1); //2020-01-13 1:00:00
        Partition part14 = new Partition(20200114, "p20200114", baseIndex, distInfo);
        part14.setVisibleVersion(1, 1578934800000L, 1); //2020-01-14 1:00:00
        Partition part15 = new Partition(20200115, "p20200115", baseIndex, distInfo);
        part15.setVisibleVersion(2, 1579053661000L, 2); //2020-01-15 10:01:01

        OlapTable table = new OlapTable(10000L, "order", columns, KeysType.DUP_KEYS, partInfo, distInfo);

        short shortKeyColumnCount = 1;
        table.setIndexMeta(10001, "group1", columns, 1, 1, shortKeyColumnCount, TStorageType.COLUMN, KeysType.DUP_KEYS);

        List<Column> idx_columns = Lists.newArrayList();
        idx_columns.add(column1);
        table.setIndexMeta(1L, "test", idx_columns, 1, 1, shortKeyColumnCount, TStorageType.COLUMN, KeysType.DUP_KEYS);
        Deencapsulation.setField(table, "baseIndexId", 1000);

        table.addPartition(part12);
        table.addPartition(part13);
        table.addPartition(part14);
        table.addPartition(part15);

        return table;
    }

    private ScanNode createOrderScanNode() {
        OlapTable table = createOrderTable();
        TupleDescriptor desc = new TupleDescriptor(new TupleId(10004));
        desc.setTable(table);
        ScanNode node = new OlapScanNode(new PlanNodeId(10008), desc, "ordernode");
        return node;
    }

    private OlapTable createProfileTable() {
        Column column2 = new Column("eventdate", ScalarType.DATE);
        Column column3 = new Column("userid", ScalarType.INT);
        Column column4 = new Column("country", ScalarType.INT);
        List<Column> columns = Lists.newArrayList(column2, column3, column4);

        MaterializedIndex baseIndex = new MaterializedIndex(20001, IndexState.NORMAL);
        RandomDistributionInfo distInfo = new RandomDistributionInfo(10);

        PartitionInfo partInfo = new RangePartitionInfo(Lists.newArrayList(column2));

        Partition part12 = new Partition(2020112, "p20200112", baseIndex, distInfo);
        part12.setVisibleVersion(1, 1578762000000L, 1); //2020-01-12 1:00:00
        Partition part13 = new Partition(2020113, "p20200113", baseIndex, distInfo);
        part13.setVisibleVersion(1, 1578848400000L, 1); //2020-01-13 1:00:00
        Partition part14 = new Partition(2020114, "p20200114", baseIndex, distInfo);
        part14.setVisibleVersion(1, 1578934800000L, 1); //2020-01-14 1:00:00
        Partition part15 = new Partition(2020115, "p20200115", baseIndex, distInfo);
        part15.setVisibleVersion(2, 1579021200000L, 2); //2020-01-15 1:00:00

        OlapTable table = new OlapTable(20000L, "userprofile", columns, KeysType.AGG_KEYS, partInfo, distInfo);

        short shortKeyColumnCount = 1;
        table.setIndexMeta(20001, "group1", columns, 1, 1, shortKeyColumnCount, TStorageType.COLUMN, KeysType.AGG_KEYS);

        List<Column> idx_columns = Lists.newArrayList();
        idx_columns.add(column2);
        table.setIndexMeta(2L, "test", idx_columns, 1, 1, shortKeyColumnCount, TStorageType.COLUMN, KeysType.AGG_KEYS);

        Deencapsulation.setField(table, "baseIndexId", 1000);

        table.addPartition(part12);
        table.addPartition(part13);
        table.addPartition(part14);
        table.addPartition(part15);

        return table;
    }

    private ScanNode createProfileScanNode() {
        OlapTable table = createProfileTable();
        TupleDescriptor desc = new TupleDescriptor(new TupleId(20004));
        desc.setTable(table);
        ScanNode node = new OlapScanNode(new PlanNodeId(20008), desc, "userprofilenode");
        return node;
    }

    /**
     * table appevent(eventdate(pk), userid, eventid, eventtime), stream loaded every 5 minutes
     */
    private OlapTable createEventTable() {
        Column column1 = new Column("eventdate", ScalarType.DATE);
        Column column2 = new Column("userid", ScalarType.INT);
        Column column3 = new Column("eventid", ScalarType.INT);
        Column column4 = new Column("eventtime", ScalarType.DATETIME);
        List<Column> columns = Lists.newArrayList(column1, column2, column3, column4);
        PartitionInfo partInfo = new RangePartitionInfo(Lists.newArrayList(column1));
        MaterializedIndex baseIndex = new MaterializedIndex(30001, IndexState.NORMAL);
        RandomDistributionInfo distInfo = new RandomDistributionInfo(10);

        Partition part12 = new Partition(20200112, "p20200112", baseIndex, distInfo);
        part12.setVisibleVersion(1, 1578762000000L, 1); //2020-01-12 1:00:00
        Partition part13 = new Partition(20200113, "p20200113", baseIndex, distInfo);
        part13.setVisibleVersion(1, 1578848400000L, 1); //2020-01-13 1:00:00
        Partition part14 = new Partition(20200114, "p20200114", baseIndex, distInfo);
        part14.setVisibleVersion(1, 1578934800000L, 1); //2020-01-14 1:00:00
        Partition part15 = new Partition(20200115, "p20200115", baseIndex, distInfo);
        part15.setVisibleVersion(2, 1579053661000L, 2); //2020-01-15 10:01:01

        OlapTable table = new OlapTable(30000L, "appevent", columns, KeysType.DUP_KEYS, partInfo, distInfo);

        short shortKeyColumnCount = 1;
        table.setIndexMeta(30001, "group1", columns, 1, 1, shortKeyColumnCount, TStorageType.COLUMN, KeysType.AGG_KEYS);

        List<Column> column = Lists.newArrayList();
        column.add(column1);

        table.setIndexMeta(2L, "test", column, 1, 1, shortKeyColumnCount, TStorageType.COLUMN, KeysType.AGG_KEYS);
        Deencapsulation.setField(table, "baseIndexId", 1000);

        table.addPartition(part12);
        table.addPartition(part13);
        table.addPartition(part14);
        table.addPartition(part15);

        return table;
    }

    private ScanNode createEventScanNode() {
        OlapTable table = createEventTable();
        TupleDescriptor desc = new TupleDescriptor(new TupleId(30002));
        desc.setTable(table);
        ScanNode node = new OlapScanNode(new PlanNodeId(30004), desc, "appeventnode");
        return node;
    }

    private StatementBase parseSql(String sql) {
        SqlParser parser = new SqlParser(new SqlScanner(new StringReader(sql)));
        StatementBase parseStmt = null;
        try {
            parseStmt = SqlParserUtils.getFirstStmt(parser);
            parseStmt.analyze(analyzer);
        } catch (AnalysisException e) {
            LOG.warn("Part,an_ex={}", e);
            Assert.fail(e.getMessage());
        } catch (UserException e) {
            LOG.warn("Part,ue_ex={}", e);
            Assert.fail(e.getMessage());
        } catch (Exception e) {
            LOG.warn("Part,cm_ex={}", e);
            Assert.fail(e.getMessage());
        }
        return parseStmt;
    }

    @Test
    public void testCacheNode() throws Exception {
        CacheCoordinator cp = CacheCoordinator.getInstance();
        cp.DebugModel = true;
        Backend bd1 = new Backend(1, "", 1000);
        bd1.updateOnce(0, 0, 0);
        Backend bd2 = new Backend(2, "", 2000);
        bd2.updateOnce(0, 0, 0);
        Backend bd3 = new Backend(3, "", 3000);
        bd3.updateOnce(0, 0, 0);
        cp.addBackend(bd1);
        cp.addBackend(bd2);
        cp.addBackend(bd3);

        PUniqueId key1 = new PUniqueId();
        key1.hi = 1L;
        key1.lo = 1L;
        Backend bk = cp.findBackend(key1);
        Assert.assertNotNull(bk);
        Assert.assertEquals(bk.getId(), 3);

        key1.hi = 669560558156283345L;
        key1.lo = 1L;
        bk = cp.findBackend(key1);
        Assert.assertNotNull(bk);
        Assert.assertEquals(bk.getId(), 1);
    }

    @Test
    public void testCacheModeNone() throws Exception {
        StatementBase parseStmt = parseSql("select @@version_comment limit 1");
        List<ScanNode> scanNodes = Lists.newArrayList();
        CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
        ca.checkCacheMode(0);
        Assert.assertEquals(ca.getCacheMode(), CacheMode.NoNeed);
    }

    @Test
    public void testCacheModeTable() throws Exception {
        StatementBase parseStmt = parseSql(
                "SELECT country, COUNT(userid) FROM userprofile GROUP BY country"
        );
        List<ScanNode> scanNodes = Lists.newArrayList(createProfileScanNode());
        CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
        ca.checkCacheMode(0);
        Assert.assertEquals(ca.getCacheMode(), CacheMode.Sql);
    }

    @Test
    public void testWithinMinTime() throws Exception {
        StatementBase parseStmt = parseSql(
                "SELECT country, COUNT(userid) FROM userprofile GROUP BY country"
        );
        List<ScanNode> scanNodes = Lists.newArrayList(createProfileScanNode());
        CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
        ca.checkCacheMode(1579024800000L); //2020-1-15 02:00:00
        Assert.assertEquals(ca.getCacheMode(), CacheMode.None);
    }

    @Test
    public void testPartitionModel() throws Exception {
        StatementBase parseStmt = parseSql(
                "SELECT eventdate, COUNT(DISTINCT userid) FROM appevent WHERE eventdate>=\"2020-01-12\" and " +
                        "eventdate<=\"2020-01-15\" GROUP BY eventdate"
        );
        List<ScanNode> scanNodes = Lists.newArrayList(createEventScanNode());
        CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
        ca.checkCacheMode(1579053661000L); //2020-1-15 10:01:01
        Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition);
    }

    @Test
    public void testParseByte() throws Exception {
        RowBatchBuilder sb = new RowBatchBuilder(CacheMode.Partition);
        byte[] buffer = new byte[]{10, 50, 48, 50, 48, 45, 48, 51, 45, 49, 48, 1, 51, 2, 67, 78};
        PartitionRange.PartitionKeyType key1 = sb.getKeyFromRow(buffer, 0, Type.DATE);
        LOG.info("real value key1 {}", key1.realValue());
        Assert.assertEquals(key1.realValue(), 20200310);
        PartitionRange.PartitionKeyType key2 = sb.getKeyFromRow(buffer, 1, Type.INT);
        LOG.info("real value key2 {}", key2.realValue());
        Assert.assertEquals(key2.realValue(), 3);
    }

    @Test
    public void testPartitionIntTypeSql() throws Exception {
        StatementBase parseStmt = parseSql(
                "SELECT `date`, COUNT(id) FROM `order` WHERE `date`>=20200112 and `date`<=20200115 GROUP BY date"
        );
        List<ScanNode> scanNodes = Lists.newArrayList(createOrderScanNode());
        CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
        ca.checkCacheMode(1579053661000L); //2020-1-15 10:01:01
        Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); //assert cache mode first
        try {
            PartitionCache cache = (PartitionCache) ca.getCache();
            cache.rewriteSelectStmt(null);
            Assert.assertEquals(cache.getNokeyStmt().getWhereClause(), null);

            PartitionRange range = cache.getPartitionRange();
            boolean flag = range.analytics();
            Assert.assertEquals(flag, true);

            int size = range.getPartitionSingleList().size();
            LOG.warn("Rewrite partition range size={}", size);
            Assert.assertEquals(size, 4);

            String sql;
            range.setCacheFlag(20200112L); //get data from cache
            range.setCacheFlag(20200113L); //get data from cache

            hitRange = range.buildDiskPartitionRange(newRangeList);
            Assert.assertEquals(hitRange, Cache.HitRange.Left);
            Assert.assertEquals(newRangeList.size(), 2);
            Assert.assertEquals(newRangeList.get(0).getCacheKey().realValue(), 20200114);
            Assert.assertEquals(newRangeList.get(1).getCacheKey().realValue(), 20200115);

            cache.rewriteSelectStmt(newRangeList);
            sql = ca.getRewriteStmt().getWhereClause().toSql();
            Assert.assertEquals(sql, "(`date` >= 20200114) AND (`date` <= 20200115)");
        } catch (Exception e) {
            LOG.warn("ex={}", e);
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testSimpleCacheSql() throws Exception {
        StatementBase parseStmt = parseSql(
                "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>=\"2020-01-12\" and eventdate<=\"2020-01-15\" GROUP BY eventdate"
        );
        List<ScanNode> scanNodes = Lists.newArrayList(createEventScanNode());
        CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
        ca.checkCacheMode(1579053661000L); //2020-1-15 10:01:01
        Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); //assert cache mode first

        try {
            PartitionCache cache = (PartitionCache) ca.getCache();
            cache.rewriteSelectStmt(null);
            Assert.assertEquals(cache.getNokeyStmt().getWhereClause(), null);

            PartitionRange range = cache.getPartitionRange();
            boolean flag = range.analytics();
            Assert.assertEquals(flag, true);

            int size = range.getPartitionSingleList().size();
            LOG.warn("Rewrite partition range size={}", size);
            Assert.assertEquals(size, 4);

            String sql;
            range.setCacheFlag(20200112L); //get data from cache
            range.setCacheFlag(20200113L); //get data from cache

            hitRange = range.buildDiskPartitionRange(newRangeList);
            cache.rewriteSelectStmt(newRangeList);
            sql = ca.getRewriteStmt().getWhereClause().toSql();
            Assert.assertEquals(sql, "(`eventdate` >= '2020-01-14') AND (`eventdate` <= '2020-01-15')");
        } catch (Exception e) {
            LOG.warn("ex={}", e);
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testHitPartPartition() throws Exception {
        StatementBase parseStmt = parseSql(
                "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>=\"2020-01-12\" and eventdate<=\"2020-01-14\" GROUP BY eventdate"
        );
        List<ScanNode> scanNodes = Lists.newArrayList(createEventScanNode());
        CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
        ca.checkCacheMode(1579053661000L); //2020-1-15 10:01:01
        Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); //assert cache mode first

        try {
            PartitionCache cache = (PartitionCache) ca.getCache();

            cache.rewriteSelectStmt(null);
            Assert.assertEquals(cache.getNokeyStmt().getWhereClause(), null);

            PartitionRange range = cache.getPartitionRange();
            boolean flag = range.analytics();
            Assert.assertEquals(flag, true);

            int size = range.getPartitionSingleList().size();
            LOG.warn("Rewrite partition range size={}", size);
            Assert.assertEquals(size, 3);

            range.setCacheFlag(20200113);
            range.setCacheFlag(20200114);

            hitRange = range.buildDiskPartitionRange(newRangeList);
            Assert.assertEquals(hitRange, Cache.HitRange.Right);
            // the missed disk range collapses to [20200112, 20200112]
            Assert.assertEquals(newRangeList.size(), 2);
            Assert.assertEquals(newRangeList.get(0).getCacheKey().realValue(), 20200112);
            Assert.assertEquals(newRangeList.get(1).getCacheKey().realValue(), 20200112);

            List<PartitionRange.PartitionSingle> updateRangeList = range.buildUpdatePartitionRange();
            Assert.assertEquals(updateRangeList.size(), 1);
            Assert.assertEquals(updateRangeList.get(0).getCacheKey().realValue(), 20200112);
        } catch (Exception e) {
            LOG.warn("ex={}", e);
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testNoUpdatePartition() throws Exception {
        StatementBase parseStmt = parseSql(
                "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>=\"2020-01-12\" and eventdate<=\"2020-01-14\" GROUP BY eventdate"
        );
        List<ScanNode> scanNodes = Lists.newArrayList(createEventScanNode());
        CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
        ca.checkCacheMode(1579053661000L); //2020-1-15 10:01:01
        Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); //assert cache mode first

        try {
            PartitionCache cache = (PartitionCache) ca.getCache();

            cache.rewriteSelectStmt(null);
            Assert.assertEquals(cache.getNokeyStmt().getWhereClause(), null);

            PartitionRange range = cache.getPartitionRange();
            boolean flag = range.analytics();
            Assert.assertEquals(flag, true);

            int size = range.getPartitionSingleList().size();
            LOG.warn("Rewrite partition range size={}", size);
            Assert.assertEquals(size, 3);

            range.setCacheFlag(20200112); //get data from cache
            range.setCacheFlag(20200113);
            range.setCacheFlag(20200114);

            hitRange = range.buildDiskPartitionRange(newRangeList);
            Assert.assertEquals(hitRange, Cache.HitRange.Full);
            Assert.assertEquals(newRangeList.size(), 0);
        } catch (Exception e) {
            LOG.warn("ex={}", e);
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testUpdatePartition() throws Exception {
        StatementBase parseStmt = parseSql(
                "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>=\"2020-01-12\" and eventdate<=\"2020-01-15\" GROUP BY eventdate"
        );
        List<ScanNode> scanNodes = Lists.newArrayList(createEventScanNode());
        CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
        ca.checkCacheMode(1579053661000L); //2020-1-15 10:01:01
        Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); //assert cache mode first

        try {
            PartitionCache cache = (PartitionCache) ca.getCache();

            cache.rewriteSelectStmt(null);
            Assert.assertEquals(cache.getNokeyStmt().getWhereClause(), null);

            PartitionRange range = cache.getPartitionRange();
            boolean flag = range.analytics();
            Assert.assertEquals(flag, true);

            int size = range.getPartitionSingleList().size();
            LOG.warn("Rewrite partition range size={}", size);
            Assert.assertEquals(size, 4);

            String sql;
            range.setCacheFlag(20200112L); //get data from cache
            range.setTooNewByKey(20200115);

            range.buildDiskPartitionRange(newRangeList);
            Assert.assertEquals(newRangeList.size(), 2);
            cache.rewriteSelectStmt(newRangeList);

            sql = ca.getRewriteStmt().getWhereClause().toSql();
            Assert.assertEquals(sql, "(`eventdate` >= '2020-01-13') AND (`eventdate` <= '2020-01-15')");

            List<PartitionRange.PartitionSingle> updateRangeList = range.buildUpdatePartitionRange();
            Assert.assertEquals(updateRangeList.size(), 2);
            Assert.assertEquals(updateRangeList.get(0).getCacheKey().realValue(), 20200113);
            Assert.assertEquals(updateRangeList.get(1).getCacheKey().realValue(), 20200114);
        } catch (Exception e) {
            LOG.warn("ex={}", e);
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testRewriteMultiPredicate1() throws Exception {
        StatementBase parseStmt = parseSql(
                "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>\"2020-01-11\" and eventdate<\"2020-01-16\"" +
                        " and eventid=1 GROUP BY eventdate"
        );
        List<ScanNode> scanNodes = Lists.newArrayList(createEventScanNode());
        CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
        ca.checkCacheMode(1579053661000L); //2020-1-15 10:01:01
        Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); //assert cache mode first
        try {
            PartitionCache cache = (PartitionCache) ca.getCache();

            cache.rewriteSelectStmt(null);
            LOG.warn("Nokey multi={}", cache.getNokeyStmt().getWhereClause().toSql());
            Assert.assertEquals(cache.getNokeyStmt().getWhereClause().toSql(), "`eventid` = 1");

            PartitionRange range = cache.getPartitionRange();
            boolean flag = range.analytics();
            Assert.assertEquals(flag, true);

            int size = range.getPartitionSingleList().size();
            Assert.assertEquals(size, 4);

            String sql;
            range.setCacheFlag(20200112L); //get data from cache
            range.setCacheFlag(20200113L); //get data from cache

            range.buildDiskPartitionRange(newRangeList);

            cache.rewriteSelectStmt(newRangeList);
            sql = ca.getRewriteStmt().getWhereClause().toSql();
            LOG.warn("MultiPredicate={}", sql);
            Assert.assertEquals(sql, "((`eventdate` > '2020-01-13') AND (`eventdate` < '2020-01-16')) AND (`eventid` = 1)");
        } catch (Exception e) {
            LOG.warn("multi ex={}", e);
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testRewriteJoin() throws Exception {
        StatementBase parseStmt = parseSql(
                "SELECT appevent.eventdate, country, COUNT(appevent.userid) FROM appevent" +
                        " INNER JOIN userprofile ON appevent.userid = userprofile.userid" +
                        " WHERE appevent.eventdate>=\"2020-01-12\" and appevent.eventdate<=\"2020-01-15\"" +
                        " and eventid=1 GROUP BY appevent.eventdate, country"
        );
        List<ScanNode> scanNodes = Lists.newArrayList(createEventScanNode());
        CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
        ca.checkCacheMode(1579053661000L); //2020-1-15 10:01:01
        Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); //assert cache mode first
        try {
            PartitionCache cache = (PartitionCache) ca.getCache();
            cache.rewriteSelectStmt(null);
            LOG.warn("Join nokey={}", cache.getNokeyStmt().getWhereClause().toSql());
            Assert.assertEquals(cache.getNokeyStmt().getWhereClause().toSql(), "`eventid` = 1");

            PartitionRange range = cache.getPartitionRange();
            boolean flag = range.analytics();
            Assert.assertEquals(flag, true);

            int size = range.getPartitionSingleList().size();
            Assert.assertEquals(size, 4);

            String sql;
            range.setCacheFlag(20200112L); //get data from cache
            range.setCacheFlag(20200113L); //get data from cache

            range.buildDiskPartitionRange(newRangeList);

            cache.rewriteSelectStmt(newRangeList);
            sql = ca.getRewriteStmt().getWhereClause().toSql();
            LOG.warn("Join rewrite={}", sql);
            Assert.assertEquals(sql, "((`appevent`.`eventdate` >= '2020-01-14')" +
                    " AND (`appevent`.`eventdate` <= '2020-01-15')) AND (`eventid` = 1)");
        } catch (Exception e) {
            LOG.warn("Join ex={}", e);
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testSubSelect() throws Exception {
        StatementBase parseStmt = parseSql(
                "SELECT eventdate, sum(pv) FROM (SELECT eventdate, COUNT(userid) AS pv FROM appevent WHERE eventdate>\"2020-01-11\" AND eventdate<\"2020-01-16\"" +
                        " AND eventid=1 GROUP BY eventdate) tbl GROUP BY eventdate"
        );
        List<ScanNode> scanNodes = Lists.newArrayList(createEventScanNode());
        CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
        ca.checkCacheMode(1579053661000L); //2020-1-15 10:01:01
        Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); //assert cache mode first
        try {
            PartitionCache cache = (PartitionCache) ca.getCache();

            cache.rewriteSelectStmt(null);
            LOG.warn("Sub nokey={}", cache.getNokeyStmt().toSql());
            Assert.assertEquals(cache.getNokeyStmt().toSql(), "SELECT <slot 7> `eventdate` AS `eventdate`, <slot 8> sum(`pv`) AS `sum(``pv``)` FROM (" +
                    "SELECT <slot 3> `eventdate` AS `eventdate`, <slot 4> count(`userid`) AS `pv` FROM `testCluster:testDb`.`appevent` WHERE `eventid` = 1" +
                    " GROUP BY `eventdate`) tbl GROUP BY `eventdate`");

            PartitionRange range = cache.getPartitionRange();
            boolean flag = range.analytics();
            Assert.assertEquals(flag, true);

            int size = range.getPartitionSingleList().size();
            Assert.assertEquals(size, 4);

            String sql;
            range.setCacheFlag(20200112L); //get data from cache
            range.setCacheFlag(20200113L); //get data from cache

            range.buildDiskPartitionRange(newRangeList);

            cache.rewriteSelectStmt(newRangeList);
            sql = ca.getRewriteStmt().toSql();
            LOG.warn("Sub rewrite={}", sql);
            Assert.assertEquals(sql, "SELECT <slot 7> `eventdate` AS `eventdate`, <slot 8> sum(`pv`) AS `sum(``pv``)` FROM (" +
                    "SELECT <slot 3> `eventdate` AS `eventdate`, <slot 4> count(`userid`) AS `pv` FROM `testCluster:testDb`.`appevent` WHERE " +
                    "((`eventdate` > '2020-01-13') AND (`eventdate` < '2020-01-16')) AND (`eventid` = 1) GROUP BY `eventdate`) tbl GROUP BY `eventdate`");
        } catch (Exception e) {
            LOG.warn("sub ex={}", e);
            Assert.fail(e.getMessage());
        }
    }
}
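For reference, the switches the fixture flips in start() are the ones that gate both cache modes; a hedged summary, based only on what start() and testWithinMinTime exercise (the meaning of the interval is inferred from testWithinMinTime, where a partition updated within the window yields CacheMode.None):

    Config.cache_enable_sql_mode = true;              // FE config: allow the SQL-mode cache
    Config.cache_enable_partition_mode = true;        // FE config: allow the partition-mode cache
    Config.cache_last_version_interval_second = 7200; // data updated within this window is not cached
    ctx.getSessionVariable().setEnableSqlCache(true);       // per-session switches
    ctx.getSessionVariable().setEnablePartitionCache(true);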