[New Featrue] Support Vectorization Execution Engine Interface For Doris (#6329)

1. FE vectorized plan code
2. Function register vec function
3. Diff function nullable type
4. New thirdparty code and new thrift struct
This commit is contained in:
HappenLee
2021-08-11 01:54:06 -05:00
committed by GitHub
parent 1a5b03167a
commit 9216735cfa
120 changed files with 2765 additions and 1007 deletions

View File

@ -127,9 +127,7 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
_rows_returned_counter(NULL),
_rows_returned_rate(NULL),
_memory_used_counter(NULL),
_is_closed(false) {
init_runtime_profile(print_plan_node_type(tnode.node_type));
}
_is_closed(false) {}
ExecNode::~ExecNode() {}
@ -159,7 +157,18 @@ void ExecNode::push_down_predicate(RuntimeState* state, std::list<ExprContext*>*
}
Status ExecNode::init(const TPlanNode& tnode, RuntimeState* state) {
std::string profile;
if (state && state->enable_vectorized_exec()) {
profile = "V" + print_plan_node_type(tnode.node_type);
} else {
profile = print_plan_node_type(tnode.node_type);
}
init_runtime_profile(profile);
if (tnode.__isset.vconjunct) {
}
RETURN_IF_ERROR(Expr::create_expr_trees(_pool, tnode.conjuncts, &_conjunct_ctxs));
return Status::OK();
}
@ -178,11 +187,11 @@ Status ExecNode::prepare(RuntimeState* state) {
_expr_mem_tracker = MemTracker::CreateTracker(-1, "ExecNode:Exprs:" + _runtime_profile->name(),
_mem_tracker);
_expr_mem_pool.reset(new MemPool(_expr_mem_tracker.get()));
// TODO chenhao
RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state, row_desc(), expr_mem_tracker()));
// TODO(zc):
// AddExprCtxsToFree(_conjunct_ctxs);
for (int i = 0; i < _children.size(); ++i) {
RETURN_IF_ERROR(_children[i]->prepare(state));
}
@ -362,22 +371,32 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
return Status::OK();
case TPlanNodeType::OLAP_SCAN_NODE:
*node = pool->add(new OlapScanNode(pool, tnode, descs));
if (state->enable_vectorized_exec()) {
} else {
*node = pool->add(new OlapScanNode(pool, tnode, descs));
}
return Status::OK();
case TPlanNodeType::AGGREGATION_NODE:
if (config::enable_partitioned_aggregation) {
*node = pool->add(new PartitionedAggregationNode(pool, tnode, descs));
if (state->enable_vectorized_exec()) {
} else {
*node = pool->add(new AggregationNode(pool, tnode, descs));
if (config::enable_partitioned_aggregation) {
*node = pool->add(new PartitionedAggregationNode(pool, tnode, descs));
} else {
*node = pool->add(new AggregationNode(pool, tnode, descs));
}
}
return Status::OK();
case TPlanNodeType::HASH_JOIN_NODE:
*node = pool->add(new HashJoinNode(pool, tnode, descs));
return Status::OK();
case TPlanNodeType::CROSS_JOIN_NODE:
*node = pool->add(new CrossJoinNode(pool, tnode, descs));
if (state->enable_vectorized_exec()) {
} else {
*node = pool->add(new CrossJoinNode(pool, tnode, descs));
}
return Status::OK();
case TPlanNodeType::MERGE_JOIN_NODE:
@ -389,7 +408,10 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
return Status::OK();
case TPlanNodeType::EXCHANGE_NODE:
*node = pool->add(new ExchangeNode(pool, tnode, descs));
if (state->enable_vectorized_exec()) {
} else {
*node = pool->add(new ExchangeNode(pool, tnode, descs));
}
return Status::OK();
case TPlanNodeType::SELECT_NODE:
@ -401,10 +423,13 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
return Status::OK();
case TPlanNodeType::SORT_NODE:
if (tnode.sort_node.use_top_n) {
*node = pool->add(new TopNNode(pool, tnode, descs));
if (state->enable_vectorized_exec()) {
} else {
*node = pool->add(new SpillSortNode(pool, tnode, descs));
if (tnode.sort_node.use_top_n) {
*node = pool->add(new TopNNode(pool, tnode, descs));
} else {
*node = pool->add(new SpillSortNode(pool, tnode, descs));
}
}
return Status::OK();
@ -417,7 +442,10 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
return Status::OK();
case TPlanNodeType::UNION_NODE:
*node = pool->add(new UnionNode(pool, tnode, descs));
if (state->enable_vectorized_exec()) {
} else {
*node = pool->add(new UnionNode(pool, tnode, descs));
}
return Status::OK();
case TPlanNodeType::INTERSECT_NODE:
@ -624,4 +652,8 @@ Status ExecNode::QueryMaintenance(RuntimeState* state, const std::string& msg) {
return state->check_query_state(msg);
}
Status ExecNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
return Status::NotSupported("Not Implemented get block");
}
} // namespace doris