[Enhancement](topn) support two phase read for topn query (#15642)

This PR optimize topn query like `SELECT * FROM tableX ORDER BY columnA ASC/DESC LIMIT N`.

TopN is composed of a SortNode and a ScanNode. When the user table is wide (e.g. 100+ columns), the ORDER BY clause usually references only a few columns, but the ScanNode still needs to scan all data from the storage engine even if the limit is very small. This may lead to a lot of read amplification. So in this PR I divide the TopN query into two phases:
1. In the first phase we just need to read `columnA`'s data from the storage engine along with an extra RowId column called `__DORIS_ROWID_COL__`. The other columns are pruned from the ScanNode.
2. The second phase is placed in the ExchangeNode because it is the central node for the TopN nodes in the cluster. The ExchangeNode will spawn an RPC to other nodes using the RowIds (sorted and limited by the SortNode) read from the first phase, and read row by row from the storage engine.

After the second phase read, the Block will contain all the data needed by the query.
This commit is contained in:
lihangyu
2023-01-19 10:01:33 +08:00
committed by GitHub
parent c7a72436e6
commit 3894de49d2
53 changed files with 829 additions and 33 deletions

View File

@ -1955,5 +1955,8 @@ public class Config extends ConfigBase {
*/
@ConfField(masterOnly = true)
public static int hms_events_polling_interval_ms = 10000;
@ConfField(mutable = false)
public static int topn_two_phase_limit_threshold = 512;
}

View File

@ -551,6 +551,10 @@ public class Analyzer {
isInlineView = inlineView;
}
// Whether this analyzer instance was created to analyze an inline view.
public boolean isInlineViewAnalyzer() {
    return this.isInlineView;
}
// Record the alias that was explicitly given to a view in the query text.
public void setExplicitViewAlias(String alias) {
    this.explicitViewAlias = alias;
}
@ -997,6 +1001,9 @@ public class Analyzer {
result.setStats(srcSlotDesc.getStats());
result.setType(srcSlotDesc.getType());
result.setIsNullable(srcSlotDesc.getIsNullable());
if (srcSlotDesc.getColumn() != null) {
result.setColumn(srcSlotDesc.getColumn());
}
// result.setItemTupleDesc(srcSlotDesc.getItemTupleDesc());
return result;
}

View File

@ -27,6 +27,7 @@ import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Type;
@ -557,7 +558,6 @@ public class SelectStmt extends QueryStmt {
"cannot combine SELECT DISTINCT with analytic functions");
}
}
whereClauseRewrite();
if (whereClause != null) {
if (checkGroupingFn(whereClause)) {
@ -576,7 +576,6 @@ public class SelectStmt extends QueryStmt {
}
analyzer.registerConjuncts(whereClause, false, getTableRefIds());
}
createSortInfo(analyzer);
if (sortInfo != null && CollectionUtils.isNotEmpty(sortInfo.getOrderingExprs())) {
if (groupingInfo != null) {
@ -591,6 +590,33 @@ public class SelectStmt extends QueryStmt {
analyzeAggregation(analyzer);
createAnalyticInfo(analyzer);
eliminatingSortNode();
if (checkEnableTwoPhaseRead(analyzer)) {
// If the optimization is enabled, we try our best to read fewer columns from the ScanNode.
// Here we analyze conjunct exprs and ordering exprs before resultExprs; the
// rest of the resultExprs will be marked as `INVALID`, and such columns will
// be prevented from being read by the ScanNode. Those columns will finally be
// read in the second fetch phase.
LOG.debug("two phase read optimize enabled");
// Expr.analyze(resultExprs, analyzer);
Set<SlotRef> resultSlots = Sets.newHashSet();
Set<SlotRef> orderingSlots = Sets.newHashSet();
Set<SlotRef> conjuntSlots = Sets.newHashSet();
TreeNode.collect(resultExprs, Predicates.instanceOf(SlotRef.class), resultSlots);
TreeNode.collect(sortInfo.getOrderingExprs(), Predicates.instanceOf(SlotRef.class), orderingSlots);
if (whereClause != null) {
whereClause.collect(SlotRef.class, conjuntSlots);
}
resultSlots.removeAll(orderingSlots);
resultSlots.removeAll(conjuntSlots);
// reset slots need to do fetch column
for (SlotRef slot : resultSlots) {
// invalid slots will be pruned from reading from ScanNode
slot.setInvalid();
}
LOG.debug("resultsSlots {}", resultSlots);
LOG.debug("orderingSlots {}", orderingSlots);
LOG.debug("conjuntSlots {}", conjuntSlots);
}
if (evaluateOrderBy) {
createSortTupleInfo(analyzer);
}
@ -615,6 +641,72 @@ public class SelectStmt extends QueryStmt {
}
}
// Check whether the two phase read optimization can be enabled. If enabled, the query is
// divided into two read phases:
// 1. read conjunct columns and order by columns along with an extra RowId column from ScanNode
// 2. sort and filter the data, obtain the final RowId column, then spawn an RPC to other BEs
//    to fetch the final data
// 3. finally materialize all data
public boolean checkEnableTwoPhaseRead(Analyzer analyzer) {
    // Only applies in vectorized mode with the session variable enabled.
    // Hoist ConnectContext.get() once instead of repeating the static lookup.
    ConnectContext ctx = ConnectContext.get();
    if (ctx == null
            || ctx.getSessionVariable() == null
            || !ctx.getSessionVariable().enableVectorizedEngine
            || !ctx.getSessionVariable().enableTwoPhaseReadOpt) {
        return false;
    }
    if (!evaluateOrderBy) {
        // The order by must actually be evaluated; if the sort node was eliminated
        // then this optimization would be useless.
        return false;
    }
    // Only handle the simplest `SELECT ... FROM <tbl> WHERE ... ORDER BY ... LIMIT ...` query
    if (getAggInfo() != null
            || getHavingPred() != null
            || getWithClause() != null) {
        return false;
    }
    if (!analyzer.isRootAnalyzer()) {
        // ensure no sub query
        return false;
    }
    // If this select stmt has an inline view, or this analyze call is for an inline view
    if (hasInlineView() || analyzer.isInlineViewAnalyzer()) {
        return false;
    }
    // single olap table only
    List<TableRef> tblRefs = getTableRefs();
    if (tblRefs.size() != 1 || !(tblRefs.get(0) instanceof BaseTableRef)) {
        return false;
    }
    TableRef tbl = tblRefs.get(0);
    if (tbl.getTable().getType() != Table.TableType.OLAP) {
        return false;
    }
    LOG.debug("table ref {}", tbl);
    // Light schema change must be enabled, since the optimization relies on the
    // column_unique_id of each slot.
    OlapTable olapTable = (OlapTable) tbl.getTable();
    if (!olapTable.getEnableLightSchemaChange()) {
        return false;
    }
    // Only TOPN query at present: an ORDER BY with a small non-zero LIMIT.
    if (getOrderByElements() == null
            || !hasLimit()
            || getLimit() == 0
            || getLimit() > ctx.getSessionVariable().twoPhaseReadLimitThreshold) {
        return false;
    }
    // Check that the order by exprs are all plain slot refs.
    // Rethink? implement something more generic to support all exprs.
    LOG.debug("getOrderingExprs {}", sortInfo.getOrderingExprs());
    LOG.debug("getOrderByElements {}", getOrderByElements());
    for (OrderByElement orderby : getOrderByElements()) {
        if (!(orderby.getExpr() instanceof SlotRef)) {
            return false;
        }
    }
    return true;
}
public List<TupleId> getTableRefIds() {
List<TupleId> result = Lists.newArrayList();

View File

@ -69,6 +69,9 @@ public class SlotDescriptor {
private boolean isMultiRef;
// used for load to get more information of varchar and decimal
private Type originType;
// If set to false, such slots will be skipped during materialization.
// Used to optimize reads: less data scanned and lower memory usage.
private boolean needMaterialize = true;
public SlotDescriptor(SlotId id, TupleDescriptor parent) {
this.id = id;
@ -108,6 +111,14 @@ public class SlotDescriptor {
return isAgg;
}
// Mark this slot as not needing materialization; such slots are pruned from
// the first-phase scan of the two phase read optimization.
public void setInvalid() {
this.needMaterialize = false;
}
// True when this slot was marked as not needing materialization.
public boolean isInvalid() {
return !this.needMaterialize;
}
// Flag this slot as belonging to an aggregation output.
public void setIsAgg(boolean agg) {
isAgg = agg;
}
@ -255,6 +266,12 @@ public class SlotDescriptor {
return sourceExprs;
}
// Returns the unique id of the column backing this slot, or
// Column.COLUMN_UNIQUE_ID_INIT_VALUE (-1) when no column is attached
// (e.g. the slot was materialized from an expression). Using the named
// constant keeps this consistent with Column's sentinel value.
public int getUniqueId() {
    if (column == null) {
        return Column.COLUMN_UNIQUE_ID_INIT_VALUE;
    }
    return column.getUniqueId();
}
/**
* Initializes a slot by setting its source expression information
@ -301,10 +318,11 @@ public class SlotDescriptor {
TSlotDescriptor tSlotDescriptor = new TSlotDescriptor(id.asInt(), parent.getId().asInt(),
(originType != null ? originType.toThrift() : type.toThrift()), -1, byteOffset, nullIndicatorByte,
nullIndicatorBit, ((column != null) ? column.getName() : ""), slotIdx, isMaterialized);
tSlotDescriptor.setNeedMaterialize(needMaterialize);
if (column != null) {
LOG.debug("column name:{}, column unique id:{}", column.getName(), column.getUniqueId());
tSlotDescriptor.setColUniqueId(column.getUniqueId());
tSlotDescriptor.setIsKey(column.isKey());
}
return tSlotDescriptor;
}

View File

@ -122,6 +122,14 @@ public class SlotRef extends Expr {
return desc.getId();
}
// Mark the slot backing this ref as not needing materialization; such slots are
// pruned from the first-phase scan of the two phase read optimization.
// NOTE(review): assumes desc is already bound (non-null), unlike getColumn()
// below which guards against a null desc — confirm all callers analyze first.
public void setInvalid() {
this.desc.setInvalid();
}
// True when the underlying slot was marked as not needing materialization.
public boolean isInvalid() {
return this.desc.isInvalid();
}
public Column getColumn() {
if (desc == null) {
return null;
@ -289,6 +297,7 @@ public class SlotRef extends Expr {
// Serialize this slot reference to thrift, carrying the slot/tuple ids and the
// column unique id needed by the BE for the two phase read optimization.
protected void toThrift(TExprNode msg) {
    TSlotRef slotRef = new TSlotRef(desc.getId().asInt(), desc.getParent().getId().asInt());
    slotRef.setColUniqueId(desc.getUniqueId());
    msg.node_type = TExprNodeType.SLOT_REF;
    msg.slot_ref = slotRef;
    msg.setOutputColumn(outputColumn);
}
@ -437,6 +446,10 @@ public class SlotRef extends Expr {
this.label = label;
}
// Whether this ref carries an unresolved column name (set at parse time).
public boolean hasCol() {
return this.col != null;
}
// The raw column name from the query text; may be null for synthetic refs.
public String getColumnName() {
return col;
}

View File

@ -63,6 +63,7 @@ public class SortInfo {
// Input expressions materialized into sortTupleDesc_. One expr per slot in
// sortTupleDesc_.
private List<Expr> sortTupleSlotExprs;
private boolean useTwoPhaseRead = false;
public SortInfo(List<Expr> orderingExprs, List<Boolean> isAscOrder,
List<Boolean> nullsFirstParams) {
@ -145,6 +146,14 @@ public class SortInfo {
sortTupleDesc = tupleDesc;
}
// Flag this sort as participating in the two phase read optimization;
// serialized to the BE via toThrift().
public void setUseTwoPhaseRead() {
useTwoPhaseRead = true;
}
// Whether the two phase read optimization is enabled for this sort.
public boolean useTwoPhaseRead() {
return useTwoPhaseRead;
}
// The tuple descriptor that materializes the sort input.
public TupleDescriptor getSortTupleDescriptor() {
return sortTupleDesc;
}
@ -258,6 +267,7 @@ public class SortInfo {
// Update the tuple descriptor used to materialize the input of the sort.
setMaterializedTupleInfo(sortTupleDesc, sortTupleExprs);
LOG.debug("sortTupleDesc {}", sortTupleDesc);
return substOrderBy;
}
@ -285,6 +295,11 @@ public class SortInfo {
SlotDescriptor materializedDesc = analyzer.addSlotDescriptor(sortTupleDesc);
materializedDesc.initFromExpr(origOrderingExpr);
materializedDesc.setIsMaterialized(true);
SlotRef origSlotRef = origOrderingExpr.getSrcSlotRef();
LOG.debug("origOrderingExpr {}", origOrderingExpr);
if (origSlotRef != null) {
materializedDesc.setColumn(origSlotRef.getColumn());
}
SlotRef materializedRef = new SlotRef(materializedDesc);
substOrderBy.put(origOrderingExpr, materializedRef);
materializedOrderingExprs.add(origOrderingExpr);
@ -301,6 +316,9 @@ public class SortInfo {
Expr.treesToThrift(orderingExprs),
isAscOrder,
nullsFirstParams);
if (useTwoPhaseRead) {
sortInfo.setUseTwoPhaseRead(true);
}
return sortInfo;
}
}

View File

@ -58,6 +58,7 @@ public class Column implements Writable, GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(Column.class);
public static final String DELETE_SIGN = "__DORIS_DELETE_SIGN__";
public static final String SEQUENCE_COL = "__DORIS_SEQUENCE_COL__";
public static final String ROWID_COL = "__DORIS_ROWID_COL__";
private static final String COLUMN_ARRAY_CHILDREN = "item";
public static final int COLUMN_UNIQUE_ID_INIT_VALUE = -1;

View File

@ -21,19 +21,22 @@
package org.apache.doris.planner;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SortInfo;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.VectorizedUtil;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.StatsRecursiveDerive;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TExchangeNode;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TNodeInfo;
import org.apache.doris.thrift.TPaloNodesInfo;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import org.apache.doris.thrift.TSortInfo;
import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
@ -161,10 +164,10 @@ public class ExchangeNode extends PlanNode {
msg.exchange_node.addToInputRowTuples(tid.asInt());
}
if (mergeInfo != null) {
TSortInfo sortInfo = new TSortInfo(
Expr.treesToThrift(mergeInfo.getOrderingExprs()),
mergeInfo.getIsAscOrder(), mergeInfo.getNullsFirst());
msg.exchange_node.setSortInfo(sortInfo);
msg.exchange_node.setSortInfo(mergeInfo.toThrift());
if (mergeInfo.useTwoPhaseRead()) {
msg.exchange_node.setNodesInfo(createNodesInfo());
}
}
msg.exchange_node.setOffset(offset);
}
@ -187,4 +190,17 @@ public class ExchangeNode extends PlanNode {
return prefix + "offset: " + offset + "\n";
}
/**
 * Build the list of alive backend nodes (id, host, brpc port) that the BE uses
 * to fetch rows by rowid in the second phase of the two phase read optimization.
 * Must be called after init().
 */
private TPaloNodesInfo createNodesInfo() {
    TPaloNodesInfo nodesInfo = new TPaloNodesInfo();
    SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
    for (Long id : systemInfoService.getBackendIds(true /*need alive*/)) {
        Backend backend = systemInfoService.getBackend(id);
        if (backend == null) {
            // Backend may have been dropped between listing the ids and this lookup.
            continue;
        }
        nodesInfo.addToNodes(new TNodeInfo(backend.getId(), 0, backend.getHost(), backend.getBrpcPort()));
    }
    return nodesInfo;
}
}

View File

@ -966,6 +966,9 @@ public class OlapScanNode extends ScanNode {
sortInfo.getMaterializedOrderingExprs().forEach(expr -> {
output.append(prefix).append(prefix).append(expr.toSql()).append("\n");
});
if (sortInfo.useTwoPhaseRead()) {
output.append(prefix).append("OPT TWO PHASE\n");
}
}
if (sortLimit != -1) {
output.append(prefix).append("SORT LIMIT: ").append(sortLimit).append("\n");

View File

@ -32,8 +32,10 @@ import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.VectorizedUtil;
import org.apache.doris.qe.ConnectContext;
@ -239,6 +241,8 @@ public class OriginalPlanner extends Planner {
} else {
List<Expr> resExprs = Expr.substituteList(queryStmt.getResultExprs(),
rootFragment.getPlanRoot().getOutputSmap(), analyzer, false);
LOG.debug("result Exprs {}", queryStmt.getResultExprs());
LOG.debug("substitute result Exprs {}", resExprs);
rootFragment.setOutputExprs(resExprs);
}
LOG.debug("finalize plan fragments");
@ -259,6 +263,9 @@ public class OriginalPlanner extends Planner {
isBlockQuery = false;
LOG.debug("this isn't block query");
}
if (selectStmt.checkEnableTwoPhaseRead(analyzer)) {
injectRowIdColumnSlot();
}
}
}
@ -334,6 +341,52 @@ public class OriginalPlanner extends Planner {
topPlanFragment.getPlanRoot().resetTupleIds(Lists.newArrayList(fileStatusDesc.getId()));
}
// Create and attach a hidden rowid slot (__DORIS_ROWID_COL__) to the given tuple.
// The slot is used by the two phase read optimization to locate rows during the
// second (fetch) phase. Returns the newly added descriptor.
private SlotDescriptor injectRowIdColumnSlot(Analyzer analyzer, TupleDescriptor tupleDesc) {
    SlotDescriptor rowIdSlot = analyzer.getDescTbl().addSlotDescriptor(tupleDesc);
    LOG.debug("inject slot {}", rowIdSlot);
    Column rowIdCol = new Column(Column.ROWID_COL, Type.STRING, false, null, false, "",
            "rowid column");
    rowIdSlot.setColumn(rowIdCol);
    rowIdSlot.setType(Type.STRING);
    rowIdSlot.setIsNullable(false);
    rowIdSlot.setIsMaterialized(true);
    // Non-nullable slots use byte offset 0 and bit mask -1 as null indicators.
    rowIdSlot.setNullIndicatorBit(-1);
    rowIdSlot.setNullIndicatorByte(0);
    return rowIdSlot;
}
// We use two phase read to optimize sql like: select * from tbl [where xxx = ???] order by column1 limit n
// In the first phase, we add an extra `RowId` column to the Block and sort blocks in the TopN nodes.
// In the second phase, we have n rows left; we issue a fetch RPC to get the data for those n
// rowids and reconstruct the final block.
private void injectRowIdColumnSlot() {
// Only the single fragment shaped SortNode -> ... -> OlapScanNode is eligible.
for (PlanFragment fragment : fragments) {
PlanNode node = fragment.getPlanRoot();
PlanNode parent = null;
// OlapScanNode is the last node.
// So, just get the last two node and check if they are SortNode and OlapScan.
while (node.getChildren().size() != 0) {
parent = node;
node = node.getChildren().get(0);
}
// instanceof is null-safe: a null parent (single-node plan) also bails out here.
if (!(node instanceof OlapScanNode) || !(parent instanceof SortNode)) {
continue;
}
SortNode sortNode = (SortNode) parent;
OlapScanNode scanNode = (OlapScanNode) node;
// Add the hidden rowid slot to both the scan tuple and the sort output tuple,
// then expose it through the sort's materialized exprs.
SlotDescriptor slot = injectRowIdColumnSlot(analyzer, scanNode.getTupleDesc());
injectRowIdColumnSlot(analyzer, sortNode.getSortInfo().getSortTupleDescriptor());
SlotRef extSlot = new SlotRef(slot);
sortNode.getResolvedTupleExprs().add(extSlot);
sortNode.getSortInfo().setUseTwoPhaseRead();
// Only one TopN fragment is handled; stop after the first match.
break;
}
}
/**
* Push sort down to olap scan.
*/
@ -354,6 +407,7 @@ public class OriginalPlanner extends Planner {
}
SortNode sortNode = (SortNode) parent;
OlapScanNode scanNode = (OlapScanNode) node;
if (!scanNode.checkPushSort(sortNode)) {
continue;
}

View File

@ -466,9 +466,6 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
}
}
// Convenience overload: explain this node with no prefixes at VERBOSE level.
public String getExplainString() {
return getExplainString("", "", TExplainLevel.VERBOSE);
}

View File

@ -142,6 +142,10 @@ public class SortNode extends PlanNode {
this.useTopnOpt = useTopnOpt;
}
// Exposes the materialized sort tuple exprs so the planner can append the
// hidden rowid slot for the two phase read optimization.
public List<Expr> getResolvedTupleExprs() {
return resolvedTupleExprs;
}
@Override
public void setCompactData(boolean on) {
this.compactData = on;

View File

@ -254,6 +254,9 @@ public class SessionVariable implements Serializable, Writable {
public static final String EXTERNAL_SORT_BYTES_THRESHOLD = "external_sort_bytes_threshold";
public static final String ENABLE_TWO_PHASE_READ_OPT = "enable_two_phase_read_opt";
public static final String TWO_PHASE_READ_OPT_LIMIT_THRESHOLD = "two_phase_read_opt_limit_threshold";
// session origin value
public Map<Field, String> sessionOriginValue = new HashMap<Field, String>();
// check stmt is or not [select /*+ SET_VAR(...)*/ ...]
@ -659,6 +662,14 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = EXTERNAL_SORT_BYTES_THRESHOLD, checker = "checkExternalSortBytesThreshold")
public long externalSortBytesThreshold = 0;
// Whether enable two phase read optimization
// 1. read related rowids along with necessary column data
// 2. spawn fetch RPC to other nodes to get related data by sorted rowids
@VariableMgr.VarAttr(name = ENABLE_TWO_PHASE_READ_OPT)
public boolean enableTwoPhaseReadOpt = true;
@VariableMgr.VarAttr(name = TWO_PHASE_READ_OPT_LIMIT_THRESHOLD)
public long twoPhaseReadLimitThreshold = 512;
// If this fe is in fuzzy mode, then will use initFuzzyModeVariables to generate some variables,
// not the default value set in the code.
public void initFuzzyModeVariables() {