[WIP](row store) two phase opt read row store (#18654)
This commit is contained in:
@ -331,7 +331,6 @@ public class CreateTableStmt extends DdlStmt {
|
||||
}
|
||||
|
||||
boolean enableUniqueKeyMergeOnWrite = false;
|
||||
boolean enableStoreRowColumn = false;
|
||||
// analyze key desc
|
||||
if (engineName.equalsIgnoreCase("olap")) {
|
||||
// olap table
|
||||
@ -401,7 +400,6 @@ public class CreateTableStmt extends DdlStmt {
|
||||
throw new AnalysisException(
|
||||
PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE + " property only support unique key table");
|
||||
}
|
||||
|
||||
if (keysDesc.getKeysType() == KeysType.UNIQUE_KEYS) {
|
||||
enableUniqueKeyMergeOnWrite = true;
|
||||
if (properties != null) {
|
||||
@ -409,7 +407,6 @@ public class CreateTableStmt extends DdlStmt {
|
||||
// so we just clone a properties map here.
|
||||
enableUniqueKeyMergeOnWrite = PropertyAnalyzer.analyzeUniqueKeyMergeOnWrite(
|
||||
new HashMap<>(properties));
|
||||
enableStoreRowColumn = PropertyAnalyzer.analyzeStoreRowColumn(new HashMap<>(properties));
|
||||
}
|
||||
}
|
||||
|
||||
@ -455,7 +452,8 @@ public class CreateTableStmt extends DdlStmt {
|
||||
columnDefs.add(ColumnDef.newDeleteSignColumnDef(AggregateType.REPLACE));
|
||||
}
|
||||
}
|
||||
if (enableStoreRowColumn) {
|
||||
// add a hidden column as row store
|
||||
if (properties != null && PropertyAnalyzer.analyzeStoreRowColumn(new HashMap<>(properties))) {
|
||||
columnDefs.add(ColumnDef.newRowStoreColumnDef());
|
||||
}
|
||||
if (Config.enable_hidden_version_column_by_default && keysDesc != null
|
||||
|
||||
@ -658,7 +658,10 @@ public class SelectStmt extends QueryStmt {
|
||||
Set<SlotRef> orderingSlots = Sets.newHashSet();
|
||||
Set<SlotRef> conjuntSlots = Sets.newHashSet();
|
||||
TreeNode.collect(resultExprs, Predicates.instanceOf(SlotRef.class), resultSlots);
|
||||
TreeNode.collect(sortInfo.getOrderingExprs(), Predicates.instanceOf(SlotRef.class), orderingSlots);
|
||||
if (sortInfo != null) {
|
||||
TreeNode.collect(sortInfo.getOrderingExprs(),
|
||||
Predicates.instanceOf(SlotRef.class), orderingSlots);
|
||||
}
|
||||
if (whereClause != null) {
|
||||
whereClause.collect(SlotRef.class, conjuntSlots);
|
||||
}
|
||||
@ -714,20 +717,23 @@ public class SelectStmt extends QueryStmt {
|
||||
|| !ConnectContext.get().getSessionVariable().enableTwoPhaseReadOpt) {
|
||||
return false;
|
||||
}
|
||||
if (!evaluateOrderBy) {
|
||||
// Need evaluate orderby, if sort node was eliminated then this optmization
|
||||
// could be useless
|
||||
return false;
|
||||
}
|
||||
// Only handle the simplest `SELECT ... FROM <tbl> WHERE ... ORDER BY ... LIMIT ...` query
|
||||
// Only handle the simplest `SELECT ... FROM <tbl> WHERE ... [ORDER BY ...] [LIMIT ...]` query
|
||||
if (getAggInfo() != null
|
||||
|| getHavingPred() != null
|
||||
|| getWithClause() != null
|
||||
|| getAnalyticInfo() != null) {
|
||||
return false;
|
||||
}
|
||||
// ignore short circuit query
|
||||
if (isPointQueryShortCircuit()) {
|
||||
return false;
|
||||
}
|
||||
// ignore insert into select
|
||||
if (fromInsert) {
|
||||
return false;
|
||||
}
|
||||
// ensure no sub query
|
||||
if (!analyzer.isRootAnalyzer()) {
|
||||
// ensure no sub query
|
||||
return false;
|
||||
}
|
||||
// If select stmt has inline view or this is an inline view query stmt analyze call
|
||||
@ -750,24 +756,35 @@ public class SelectStmt extends QueryStmt {
|
||||
if (!olapTable.getEnableLightSchemaChange()) {
|
||||
return false;
|
||||
}
|
||||
// Only TOPN query at present
|
||||
if (getOrderByElements() == null
|
||||
|| !hasLimit()
|
||||
|| getLimit() <= 0
|
||||
|| getLimit() > ConnectContext.get().getSessionVariable().topnOptLimitThreshold) {
|
||||
return false;
|
||||
}
|
||||
// Check order by exprs are all slot refs
|
||||
// Rethink? implement more generic to support all exprs
|
||||
LOG.debug("getOrderingExprs {}", sortInfo.getOrderingExprs());
|
||||
LOG.debug("getOrderByElements {}", getOrderByElements());
|
||||
for (Expr sortExpr : sortInfo.getOrderingExprs()) {
|
||||
if (!(sortExpr instanceof SlotRef)) {
|
||||
if (getOrderByElements() != null) {
|
||||
if (!evaluateOrderBy) {
|
||||
// Need evaluate orderby, if sort node was eliminated then this optmization
|
||||
// could be useless
|
||||
return false;
|
||||
}
|
||||
// case1: general topn query, like: select * from tbl where xxx order by yyy limit n
|
||||
if (!hasLimit()
|
||||
|| getLimit() <= 0
|
||||
|| getLimit() > ConnectContext.get().getSessionVariable().topnOptLimitThreshold) {
|
||||
return false;
|
||||
}
|
||||
// Check order by exprs are all slot refs
|
||||
// Rethink? implement more generic to support all exprs
|
||||
LOG.debug("getOrderingExprs {}", sortInfo.getOrderingExprs());
|
||||
LOG.debug("getOrderByElements {}", getOrderByElements());
|
||||
for (Expr sortExpr : sortInfo.getOrderingExprs()) {
|
||||
if (!(sortExpr instanceof SlotRef)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
isTwoPhaseOptEnabled = true;
|
||||
return true;
|
||||
} else {
|
||||
// case2: optimize scan utilize row store column, query like select * from tbl where xxx [limit xxx]
|
||||
// TODO: we only optimize query with select * at present
|
||||
return olapTable.storeRowColumn() && selectList.getItems().stream().anyMatch(e -> e.isStar());
|
||||
}
|
||||
isTwoPhaseOptEnabled = true;
|
||||
return true;
|
||||
// return false;
|
||||
}
|
||||
|
||||
public List<TupleId> getTableRefIds() {
|
||||
|
||||
@ -38,6 +38,7 @@ import org.apache.doris.analysis.TableRef;
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.analysis.TupleId;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.Function.NullableMode;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Table;
|
||||
@ -134,6 +135,7 @@ import org.apache.doris.planner.OlapScanNode;
|
||||
import org.apache.doris.planner.PlanFragment;
|
||||
import org.apache.doris.planner.PlanNode;
|
||||
import org.apache.doris.planner.RepeatNode;
|
||||
import org.apache.doris.planner.ResultSink;
|
||||
import org.apache.doris.planner.ScanNode;
|
||||
import org.apache.doris.planner.SchemaScanNode;
|
||||
import org.apache.doris.planner.SelectNode;
|
||||
@ -146,6 +148,7 @@ import org.apache.doris.planner.external.HudiScanNode;
|
||||
import org.apache.doris.planner.external.iceberg.IcebergScanNode;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.tablefunction.TableValuedFunctionIf;
|
||||
import org.apache.doris.thrift.TFetchOption;
|
||||
import org.apache.doris.thrift.TPartitionType;
|
||||
import org.apache.doris.thrift.TPushAggOp;
|
||||
|
||||
@ -200,6 +203,47 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
this.statsErrorEstimator = statsErrorEstimator;
|
||||
}
|
||||
|
||||
// We use two phase read to optimize sql like: select * from tbl [where xxx = ???] [order by column1] [limit n]
|
||||
// in the first phase, we add an extra column `RowId` to Block, and sort blocks in TopN nodes
|
||||
// in the second phase, we have n rows, we do a fetch rpc to get all rowids data for the n rows
|
||||
// and reconconstruct the final block
|
||||
private void setResultSinkFetchOptionIfNeed() {
|
||||
boolean needFetch = false;
|
||||
// Only single olap table should be fetched
|
||||
OlapTable fetchOlapTable = null;
|
||||
for (PlanFragment fragment : context.getPlanFragments()) {
|
||||
PlanNode node = fragment.getPlanRoot();
|
||||
PlanNode parent = null;
|
||||
// OlapScanNode is the last node.
|
||||
// So, just get the last two node and check if they are SortNode and OlapScan.
|
||||
while (node.getChildren().size() != 0) {
|
||||
parent = node;
|
||||
node = node.getChildren().get(0);
|
||||
}
|
||||
|
||||
// case1: general topn optimized query
|
||||
if ((node instanceof OlapScanNode) && (parent instanceof SortNode)) {
|
||||
SortNode sortNode = (SortNode) parent;
|
||||
OlapScanNode scanNode = (OlapScanNode) node;
|
||||
if (sortNode.getUseTwoPhaseReadOpt()) {
|
||||
needFetch = true;
|
||||
fetchOlapTable = scanNode.getOlapTable();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
for (PlanFragment fragment : context.getPlanFragments()) {
|
||||
if (needFetch && fragment.getSink() instanceof ResultSink) {
|
||||
TFetchOption fetchOption = new TFetchOption();
|
||||
fetchOption.setFetchRowStore(fetchOlapTable.storeRowColumn());
|
||||
fetchOption.setUseTwoPhaseFetch(true);
|
||||
fetchOption.setNodesInfo(Env.getCurrentSystemInfo().createAliveNodesInfo());
|
||||
((ResultSink) fragment.getSink()).setFetchOption(fetchOption);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Translate Nereids Physical Plan tree to Stale Planner PlanFragment tree.
|
||||
*
|
||||
@ -236,6 +280,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
for (PlanFragment fragment : context.getPlanFragments()) {
|
||||
fragment.finalize(null);
|
||||
}
|
||||
setResultSinkFetchOptionIfNeed();
|
||||
Collections.reverse(context.getPlanFragments());
|
||||
context.getDescTable().computeMemLayout();
|
||||
return rootFragment;
|
||||
|
||||
@ -24,17 +24,12 @@ import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.SortInfo;
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.analysis.TupleId;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.VectorizedUtil;
|
||||
import org.apache.doris.statistics.StatisticalType;
|
||||
import org.apache.doris.statistics.StatsRecursiveDerive;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.thrift.TExchangeNode;
|
||||
import org.apache.doris.thrift.TExplainLevel;
|
||||
import org.apache.doris.thrift.TNodeInfo;
|
||||
import org.apache.doris.thrift.TPaloNodesInfo;
|
||||
import org.apache.doris.thrift.TPlanNode;
|
||||
import org.apache.doris.thrift.TPlanNodeType;
|
||||
|
||||
@ -150,9 +145,6 @@ public class ExchangeNode extends PlanNode {
|
||||
}
|
||||
if (mergeInfo != null) {
|
||||
msg.exchange_node.setSortInfo(mergeInfo.toThrift());
|
||||
if (mergeInfo.useTwoPhaseRead()) {
|
||||
msg.exchange_node.setNodesInfo(createNodesInfo());
|
||||
}
|
||||
}
|
||||
msg.exchange_node.setOffset(offset);
|
||||
}
|
||||
@ -174,18 +166,4 @@ public class ExchangeNode extends PlanNode {
|
||||
public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
|
||||
return prefix + "offset: " + offset + "\n";
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the parameters used to fetch data by rowid column
|
||||
* after init().
|
||||
*/
|
||||
private TPaloNodesInfo createNodesInfo() {
|
||||
TPaloNodesInfo nodesInfo = new TPaloNodesInfo();
|
||||
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
|
||||
for (Long id : systemInfoService.getBackendIds(true /*need alive*/)) {
|
||||
Backend backend = systemInfoService.getBackend(id);
|
||||
nodesInfo.addToNodes(new TNodeInfo(backend.getId(), 0, backend.getHost(), backend.getBrpcPort()));
|
||||
}
|
||||
return nodesInfo;
|
||||
}
|
||||
}
|
||||
|
||||
@ -35,6 +35,7 @@ import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.PrimitiveType;
|
||||
import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.catalog.Table;
|
||||
@ -51,6 +52,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
|
||||
import org.apache.doris.statistics.query.StatsDelta;
|
||||
import org.apache.doris.thrift.TFetchOption;
|
||||
import org.apache.doris.thrift.TQueryOptions;
|
||||
import org.apache.doris.thrift.TRuntimeFilterMode;
|
||||
|
||||
@ -305,6 +307,10 @@ public class OriginalPlanner extends Planner {
|
||||
// Double check this plan to ensure it's a general topn query
|
||||
injectRowIdColumnSlot();
|
||||
((SortNode) singleNodePlan).setUseTwoPhaseReadOpt(true);
|
||||
} else if (singleNodePlan instanceof OlapScanNode && singleNodePlan.getChildren().size() == 0) {
|
||||
// Optimize query like `SELECT ... FROM <tbl> WHERE ... LIMIT ...`.
|
||||
// This typically used when row store enabled, to reduce scan cost
|
||||
injectRowIdColumnSlot();
|
||||
} else {
|
||||
// This is not a general topn query, rollback needMaterialize flag
|
||||
for (SlotDescriptor slot : analyzer.getDescTbl().getSlotDescs().values()) {
|
||||
@ -463,11 +469,13 @@ public class OriginalPlanner extends Planner {
|
||||
return slotDesc;
|
||||
}
|
||||
|
||||
// We use two phase read to optimize sql like: select * from tbl [where xxx = ???] order by column1 limit n
|
||||
// We use two phase read to optimize sql like: select * from tbl [where xxx = ???] [order by column1] [limit n]
|
||||
// in the first phase, we add an extra column `RowId` to Block, and sort blocks in TopN nodes
|
||||
// in the second phase, we have n rows, we do a fetch rpc to get all rowids date for the n rows
|
||||
// and reconconstruct the final block
|
||||
private void injectRowIdColumnSlot() {
|
||||
boolean injected = false;
|
||||
OlapTable olapTable = null;
|
||||
for (PlanFragment fragment : fragments) {
|
||||
PlanNode node = fragment.getPlanRoot();
|
||||
PlanNode parent = null;
|
||||
@ -478,17 +486,37 @@ public class OriginalPlanner extends Planner {
|
||||
node = node.getChildren().get(0);
|
||||
}
|
||||
|
||||
if (!(node instanceof OlapScanNode) || !(parent instanceof SortNode)) {
|
||||
continue;
|
||||
// case1
|
||||
if ((node instanceof OlapScanNode) && (parent instanceof SortNode)) {
|
||||
SortNode sortNode = (SortNode) parent;
|
||||
OlapScanNode scanNode = (OlapScanNode) node;
|
||||
SlotDescriptor slot = injectRowIdColumnSlot(analyzer, scanNode.getTupleDesc());
|
||||
injectRowIdColumnSlot(analyzer, sortNode.getSortInfo().getSortTupleDescriptor());
|
||||
SlotRef extSlot = new SlotRef(slot);
|
||||
sortNode.getResolvedTupleExprs().add(extSlot);
|
||||
sortNode.getSortInfo().setUseTwoPhaseRead();
|
||||
injected = true;
|
||||
olapTable = scanNode.getOlapTable();
|
||||
break;
|
||||
}
|
||||
// case2
|
||||
if ((node instanceof OlapScanNode) && parent == null) {
|
||||
OlapScanNode scanNode = (OlapScanNode) node;
|
||||
injectRowIdColumnSlot(analyzer, scanNode.getTupleDesc());
|
||||
injected = true;
|
||||
olapTable = scanNode.getOlapTable();
|
||||
break;
|
||||
}
|
||||
}
|
||||
for (PlanFragment fragment : fragments) {
|
||||
if (injected && fragment.getSink() instanceof ResultSink) {
|
||||
TFetchOption fetchOption = new TFetchOption();
|
||||
fetchOption.setFetchRowStore(olapTable.storeRowColumn());
|
||||
fetchOption.setUseTwoPhaseFetch(true);
|
||||
fetchOption.setNodesInfo(Env.getCurrentSystemInfo().createAliveNodesInfo());
|
||||
((ResultSink) fragment.getSink()).setFetchOption(fetchOption);
|
||||
break;
|
||||
}
|
||||
SortNode sortNode = (SortNode) parent;
|
||||
OlapScanNode scanNode = (OlapScanNode) node;
|
||||
SlotDescriptor slot = injectRowIdColumnSlot(analyzer, scanNode.getTupleDesc());
|
||||
injectRowIdColumnSlot(analyzer, sortNode.getSortInfo().getSortTupleDescriptor());
|
||||
SlotRef extSlot = new SlotRef(slot);
|
||||
sortNode.getResolvedTupleExprs().add(extSlot);
|
||||
sortNode.getSortInfo().setUseTwoPhaseRead();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -21,6 +21,7 @@ import org.apache.doris.common.util.VectorizedUtil;
|
||||
import org.apache.doris.thrift.TDataSink;
|
||||
import org.apache.doris.thrift.TDataSinkType;
|
||||
import org.apache.doris.thrift.TExplainLevel;
|
||||
import org.apache.doris.thrift.TFetchOption;
|
||||
import org.apache.doris.thrift.TResultSink;
|
||||
|
||||
/**
|
||||
@ -30,6 +31,8 @@ import org.apache.doris.thrift.TResultSink;
|
||||
*/
|
||||
public class ResultSink extends DataSink {
|
||||
private final PlanNodeId exchNodeId;
|
||||
// Two phase fetch option
|
||||
private TFetchOption fetchOption;
|
||||
|
||||
public ResultSink(PlanNodeId exchNodeId) {
|
||||
this.exchNodeId = exchNodeId;
|
||||
@ -43,13 +46,26 @@ public class ResultSink extends DataSink {
|
||||
strBuilder.append("V");
|
||||
}
|
||||
strBuilder.append("RESULT SINK\n");
|
||||
if (fetchOption != null) {
|
||||
strBuilder.append(prefix).append(" ").append("OPT TWO PHASE\n");
|
||||
if (fetchOption.isFetchRowStore()) {
|
||||
strBuilder.append(prefix).append(" ").append("FETCH ROW STORE\n");
|
||||
}
|
||||
}
|
||||
return strBuilder.toString();
|
||||
}
|
||||
|
||||
public void setFetchOption(TFetchOption fetchOption) {
|
||||
this.fetchOption = fetchOption;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TDataSink toThrift() {
|
||||
TDataSink result = new TDataSink(TDataSinkType.RESULT_SINK);
|
||||
TResultSink tResultSink = new TResultSink();
|
||||
if (fetchOption != null) {
|
||||
tResultSink.setFetchOption(fetchOption);
|
||||
}
|
||||
result.setResultSink(tResultSink);
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -144,7 +144,7 @@ public class SortNode extends PlanNode {
|
||||
}
|
||||
|
||||
public boolean getUseTwoPhaseReadOpt() {
|
||||
return useTopnOpt;
|
||||
return this.useTwoPhaseReadOpt;
|
||||
}
|
||||
|
||||
public void setUseTwoPhaseReadOpt(boolean useTwoPhaseReadOpt) {
|
||||
|
||||
@ -35,6 +35,8 @@ import org.apache.doris.common.util.NetUtils;
|
||||
import org.apache.doris.metric.MetricRepo;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.system.Backend.BackendState;
|
||||
import org.apache.doris.thrift.TNodeInfo;
|
||||
import org.apache.doris.thrift.TPaloNodesInfo;
|
||||
import org.apache.doris.thrift.TStatusCode;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
|
||||
@ -156,6 +158,16 @@ public class SystemInfoService {
|
||||
}
|
||||
};
|
||||
|
||||
public static TPaloNodesInfo createAliveNodesInfo() {
|
||||
TPaloNodesInfo nodesInfo = new TPaloNodesInfo();
|
||||
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
|
||||
for (Long id : systemInfoService.getBackendIds(true /*need alive*/)) {
|
||||
Backend backend = systemInfoService.getBackend(id);
|
||||
nodesInfo.addToNodes(new TNodeInfo(backend.getId(), 0, backend.getHost(), backend.getBrpcPort()));
|
||||
}
|
||||
return nodesInfo;
|
||||
}
|
||||
|
||||
// for deploy manager
|
||||
public void addBackends(List<HostInfo> hostInfos, boolean isFree)
|
||||
throws UserException {
|
||||
|
||||
Reference in New Issue
Block a user