[Enhancement] Add column prune support for VOlapScanNode (#10615)
This commit is contained in:
@ -51,7 +51,8 @@ VOlapScanNode::VOlapScanNode(ObjectPool* pool, const TPlanNode& tnode, const Des
|
||||
_buffered_bytes(0),
|
||||
_eval_conjuncts_fn(nullptr),
|
||||
_runtime_filter_descs(tnode.runtime_filters),
|
||||
_max_materialized_blocks(config::doris_scanner_queue_size) {
|
||||
_max_materialized_blocks(config::doris_scanner_queue_size),
|
||||
_output_slot_ids(tnode.output_slot_ids) {
|
||||
_materialized_blocks.reserve(_max_materialized_blocks);
|
||||
_free_blocks.reserve(_max_materialized_blocks);
|
||||
}
|
||||
@ -228,6 +229,7 @@ Status VOlapScanNode::prepare(RuntimeState* state) {
|
||||
DCHECK(runtime_filter != nullptr);
|
||||
runtime_filter->init_profile(_runtime_profile.get());
|
||||
}
|
||||
init_output_slots();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -1649,6 +1651,14 @@ Status VOlapScanNode::get_next(RuntimeState* state, Block* block, bool* eos) {
|
||||
std::lock_guard<std::mutex> l(_free_blocks_lock);
|
||||
_free_blocks.emplace_back(materialized_block);
|
||||
}
|
||||
|
||||
auto columns = block->get_columns();
|
||||
auto slots = _tuple_desc->slots();
|
||||
for (int i = 0; i < slots.size(); i++) {
|
||||
if (!_output_slot_flags[i]) {
|
||||
std::move(columns[i])->assume_mutable()->clear();
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -1833,4 +1843,12 @@ Status VOlapScanNode::get_hints(TabletSharedPtr table, const TPaloScanRange& sca
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void VOlapScanNode::init_output_slots() {
|
||||
for (const auto& slot_desc : _tuple_desc->slots()) {
|
||||
_output_slot_flags.emplace_back(_output_slot_ids.empty() ||
|
||||
std::find(_output_slot_ids.begin(), _output_slot_ids.end(),
|
||||
slot_desc->id()) != _output_slot_ids.end());
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -108,6 +108,9 @@ private:
|
||||
// OLAP_SCAN_NODE profile layering: OLAP_SCAN_NODE, OlapScanner, and SegmentIterator
|
||||
// according to the calling relationship
|
||||
void init_scan_profile();
|
||||
|
||||
void init_output_slots();
|
||||
|
||||
const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() const {
|
||||
return _runtime_filter_descs;
|
||||
}
|
||||
@ -326,6 +329,9 @@ private:
|
||||
|
||||
size_t _block_size = 0;
|
||||
|
||||
std::vector<SlotId> _output_slot_ids;
|
||||
|
||||
std::vector<bool> _output_slot_flags;
|
||||
phmap::flat_hash_set<VExpr*> _rf_vexpr_set;
|
||||
std::vector<std::unique_ptr<VExprContext*>> _stale_vexpr_ctxs;
|
||||
};
|
||||
|
||||
@ -33,7 +33,6 @@ VOlapScanner::VOlapScanner(RuntimeState* runtime_state, VOlapScanNode* parent, b
|
||||
: _runtime_state(runtime_state),
|
||||
_parent(parent),
|
||||
_tuple_desc(parent->_tuple_desc),
|
||||
_id(-1),
|
||||
_is_open(false),
|
||||
_aggregation(aggregation),
|
||||
_need_agg_finalize(need_agg_finalize),
|
||||
|
||||
@ -65,8 +65,6 @@ public:
|
||||
|
||||
// Returns the current value of _need_to_close.
bool need_to_close() { return _need_to_close; }
|
||||
|
||||
// Returns the value stored in _id.
int id() const { return _id; }
|
||||
// Stores the given id in _id.
void set_id(int id) { _id = id; }
|
||||
// Returns the _is_open flag.
bool is_open() const { return _is_open; }
|
||||
// Sets _is_open to true; there is no visible counterpart here that resets it.
void set_opened() { _is_open = true; }
|
||||
|
||||
@ -111,7 +109,6 @@ private:
|
||||
// to record which runtime filters have been used
|
||||
std::vector<bool> _runtime_filter_marks;
|
||||
|
||||
int _id;
|
||||
bool _is_open;
|
||||
bool _aggregation;
|
||||
bool _need_agg_finalize = true;
|
||||
|
||||
@ -788,6 +788,7 @@ public class HashJoinNode extends PlanNode {
|
||||
}
|
||||
output.append("\n");
|
||||
}
|
||||
|
||||
if (hashOutputSlotIds != null) {
|
||||
output.append(detailPrefix).append("hash output slot ids: ");
|
||||
for (SlotId slotId : hashOutputSlotIds) {
|
||||
@ -795,6 +796,7 @@ public class HashJoinNode extends PlanNode {
|
||||
}
|
||||
output.append("\n");
|
||||
}
|
||||
appendCommonExplainString(detailPrefix, output);
|
||||
return output.toString();
|
||||
}
|
||||
|
||||
|
||||
@ -26,6 +26,7 @@ import org.apache.doris.analysis.InPredicate;
|
||||
import org.apache.doris.analysis.IntLiteral;
|
||||
import org.apache.doris.analysis.PartitionNames;
|
||||
import org.apache.doris.analysis.SlotDescriptor;
|
||||
import org.apache.doris.analysis.SlotId;
|
||||
import org.apache.doris.analysis.SlotRef;
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.analysis.TupleId;
|
||||
@ -48,6 +49,7 @@ import org.apache.doris.catalog.Tablet;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.NotImplementedException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
@ -744,7 +746,7 @@ public class OlapScanNode extends ScanNode {
|
||||
output.append(prefix).append(String.format("cardinality=%s", cardinality))
|
||||
.append(String.format(", avgRowSize=%s", avgRowSize)).append(String.format(", numNodes=%s", numNodes));
|
||||
output.append("\n");
|
||||
|
||||
appendCommonExplainString(prefix, output);
|
||||
return output.toString();
|
||||
}
|
||||
|
||||
@ -940,4 +942,17 @@ public class OlapScanNode extends ScanNode {
|
||||
return DataPartition.RANDOM;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initOutputSlotIds(Set<SlotId> requiredSlotIdSet, Analyzer analyzer) throws NotImplementedException {
|
||||
outputSlotIds = Lists.newArrayList();
|
||||
for (TupleId tupleId : tupleIds) {
|
||||
for (SlotDescriptor slotDescriptor : analyzer.getTupleDesc(tupleId).getSlots()) {
|
||||
if (slotDescriptor.isMaterialized() && (requiredSlotIdSet == null || requiredSlotIdSet.contains(
|
||||
slotDescriptor.getId())) || slotDescriptor.getColumn().getName().equals(Column.DELETE_SIGN)) {
|
||||
outputSlotIds.add(slotDescriptor.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -986,4 +986,17 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
|
||||
sb.append("\n").append(getNodeExplainString("", TExplainLevel.BRIEF));
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to append some common explains to output
|
||||
*/
|
||||
protected void appendCommonExplainString(String detailPrefix, StringBuilder output) {
|
||||
if (outputSlotIds != null) {
|
||||
output.append(detailPrefix).append("output slot ids: ");
|
||||
for (SlotId slotId : outputSlotIds) {
|
||||
output.append(slotId).append(" ");
|
||||
}
|
||||
output.append("\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -108,4 +108,35 @@ public class ProjectPlannerFunctionTest {
|
||||
Assert.assertTrue(explainString.contains("output slot ids: 1"));
|
||||
Assert.assertTrue(explainString.contains("hash output slot ids: 1 2 3"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void projectOlap() throws Exception {
|
||||
String createOrdersTbl = "CREATE TABLE test.`orders` (\n" + " `o_orderkey` integer NOT NULL,\n"
|
||||
+ " `o_custkey` integer NOT NULL,\n" + " `o_orderstatus` char(1) NOT NULL,\n"
|
||||
+ " `o_totalprice` decimal(12, 2) NOT NULL,\n" + " `o_orderdate` date NOT NULL,\n"
|
||||
+ " `o_orderpriority` char(15) NOT NULL,\n" + " `o_clerk` char(15) NOT NULL,\n"
|
||||
+ " `o_shippriority` integer NOT NULL,\n" + " `o_comment` varchar(79) NOT NULL\n"
|
||||
+ ") DISTRIBUTED BY HASH(`o_orderkey`) BUCKETS 32 PROPERTIES (\"replication_num\" = \"1\");";
|
||||
CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createOrdersTbl, connectContext);
|
||||
Catalog.getCurrentCatalog().createTable(createTableStmt);
|
||||
|
||||
String createCustomerTbl = "CREATE TABLE test.`customer` (\n" + " `c_custkey` integer NOT NULL,\n"
|
||||
+ " `c_name` varchar(25) NOT NULL,\n" + " `c_address` varchar(40) NOT NULL,\n"
|
||||
+ " `c_nationkey` integer NOT NULL,\n" + " `c_phone` char(15) NOT NULL,\n"
|
||||
+ " `c_acctbal` decimal(12, 2) NOT NULL,\n" + " `c_mktsegment` char(10) NOT NULL,\n"
|
||||
+ " `c_comment` varchar(117) NOT NULL\n"
|
||||
+ ") DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 32 PROPERTIES (\"replication_num\" = \"1\");";
|
||||
createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createCustomerTbl, connectContext);
|
||||
Catalog.getCurrentCatalog().createTable(createTableStmt);
|
||||
String tpcH13 = "select\n" + " c_count,\n" + " count(*) as custdist\n" + "from\n" + " (\n"
|
||||
+ " select\n" + " c_custkey,\n" + " count(o_orderkey) as c_count\n"
|
||||
+ " from\n" + " test.customer left outer join test.orders on\n"
|
||||
+ " c_custkey = o_custkey\n"
|
||||
+ " and o_comment not like '%special%requests%'\n" + " group by\n"
|
||||
+ " c_custkey\n" + " ) as c_orders\n" + "group by\n" + " c_count\n" + "order by\n"
|
||||
+ " custdist desc,\n" + " c_count desc;";
|
||||
String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, tpcH13);
|
||||
Assert.assertTrue(explainString.contains("output slot ids: 1 3"));
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -11,6 +11,7 @@ PLAN FRAGMENT 0
|
||||
PREDICATES: `k1` = 1
|
||||
partitions=0/1, tablets=0/0, tabletList=
|
||||
cardinality=0, avgRowSize=8.0, numNodes=1
|
||||
output slot ids: 0
|
||||
|
||||
-- !redundant_conjuncts_gnerated_by_extract_common_filter --
|
||||
PLAN FRAGMENT 0
|
||||
@ -24,4 +25,5 @@ PLAN FRAGMENT 0
|
||||
PREDICATES: (`k1` = 1 OR `k1` = 2)
|
||||
partitions=0/1, tablets=0/0, tabletList=
|
||||
cardinality=0, avgRowSize=8.0, numNodes=1
|
||||
output slot ids: 0
|
||||
|
||||
|
||||
Reference in New Issue
Block a user