[Feature] Support RuntimeFilter in Doris (BE Implement) (#6077)

1. support in / bloom filter / min-max runtime filters (see the sketch below)
2. support broadcast / shuffle / bucket shuffle / colocate join
3. optimize memory use and CPU cache misses while building runtime filters
4. optimize memory use in left semi join (works well on TPC-DS query 95)
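
For item 1, a minimal sketch of the min-max variant, for intuition only; the class and method names below are illustrative, not the Doris API:

```cpp
#include <algorithm>
#include <limits>

// Illustrative min-max runtime filter (names are hypothetical, not Doris API).
// The join build side inserts every build key; the probe-side scan then skips
// any row whose key falls outside the observed [min, max] range.
template <typename T>
class MinMaxFilter {
public:
    void insert(const T& v) {
        _min = std::min(_min, v);
        _max = std::max(_max, v);
    }
    bool may_match(const T& v) const { return v >= _min && v <= _max; }

private:
    T _min = std::numeric_limits<T>::max();
    T _max = std::numeric_limits<T>::lowest();
};
```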
stdpain
2021-07-04 20:59:05 +08:00
committed by GitHub
parent 4dd2617bd1
commit 149def9e42
56 changed files with 4039 additions and 283 deletions

@@ -14,19 +14,22 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/hash_join_node.h"
#include <memory>
#include <sstream>
#include "exec/hash_table.hpp"
#include "exprs/expr.h"
#include "exprs/expr_context.h"
#include "exprs/in_predicate.h"
#include "exprs/runtime_filter.h"
#include "exprs/slot_ref.h"
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_state.h"
#include "util/defer_op.h"
#include "util/runtime_profile.h"
namespace doris {
@@ -36,7 +39,6 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Descr
_join_op(tnode.hash_join_node.join_op),
_probe_counter(0),
_probe_eos(false),
_process_build_batch_fn(NULL),
_process_probe_batch_fn(NULL),
_anti_join_last_pos(NULL) {
_match_all_probe =
@@ -44,8 +46,9 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Descr
_match_one_build = (_join_op == TJoinOp::LEFT_SEMI_JOIN);
_match_all_build =
(_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN);
_is_push_down = tnode.hash_join_node.is_push_down;
_build_unique = _join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == TJoinOp::LEFT_SEMI_JOIN;
_runtime_filter_descs = tnode.runtime_filters;
}
HashJoinNode::~HashJoinNode() {
@@ -81,6 +84,11 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
_build_unique = false;
}
for (const auto& filter_desc : _runtime_filter_descs) {
RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(RuntimeFilterRole::PRODUCER,
filter_desc));
}
return Status::OK();
}
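
The registration above pairs with the build-and-publish steps in open(), shown in a later hunk. A hedged outline of the producer-side lifecycle, stitched together from the calls as they appear in this diff; a sketch, not standalone runnable code:

```cpp
// Producer-side runtime filter lifecycle as reflected in this diff (sketch):
// 1. init(): register each filter descriptor with the manager as a PRODUCER.
for (const auto& filter_desc : _runtime_filter_descs) {
    RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(
            RuntimeFilterRole::PRODUCER, filter_desc));
}
// 2. open(): once the hash table is built, fill the filter slots from the
//    build rows, then publish so probe-side consumers can apply the filters.
RuntimeFilterSlots runtime_filter_slots(_probe_expr_ctxs, _build_expr_ctxs,
                                        _runtime_filter_descs);
RETURN_IF_ERROR(runtime_filter_slots.init(state, _pool, expr_mem_tracker().get(),
                                          _hash_tbl->size()));
_hash_tbl->for_each_row([&](TupleRow* row) { runtime_filter_slots.insert(row); });
runtime_filter_slots.publish(this);
```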
@@ -97,7 +105,8 @@ Status HashJoinNode::prepare(RuntimeState* state) {
_probe_rows_counter = ADD_COUNTER(runtime_profile(), "ProbeRows", TUnit::UNIT);
_hash_tbl_load_factor_counter =
ADD_COUNTER(runtime_profile(), "LoadFactor", TUnit::DOUBLE_VALUE);
_hash_table_list_min_size = ADD_COUNTER(runtime_profile(), "HashTableMinList", TUnit::UNIT);
_hash_table_list_max_size = ADD_COUNTER(runtime_profile(), "HashTableMaxList", TUnit::UNIT);
// build and probe exprs are evaluated in the context of the rows produced by our
// right and left children, respectively
RETURN_IF_ERROR(
@@ -186,21 +195,22 @@ Status HashJoinNode::construct_hash_table(RuntimeState* state) {
RowBatch build_batch(child(1)->row_desc(), state->batch_size(), mem_tracker().get());
RETURN_IF_ERROR(child(1)->open(state));
SCOPED_TIMER(_build_timer);
Defer defer {[&] {
COUNTER_SET(_build_rows_counter, _hash_tbl->size());
COUNTER_SET(_build_buckets_counter, _hash_tbl->num_buckets());
COUNTER_SET(_hash_tbl_load_factor_counter, _hash_tbl->load_factor());
auto node = _hash_tbl->minmax_node();
COUNTER_SET(_hash_table_list_min_size, node.first);
COUNTER_SET(_hash_table_list_max_size, node.second);
}};
while (true) {
RETURN_IF_CANCELLED(state);
bool eos = true;
RETURN_IF_ERROR(child(1)->get_next(state, &build_batch, &eos));
SCOPED_TIMER(_build_timer);
// take ownership of tuple data of build_batch
_build_pool->acquire_data(build_batch.tuple_data_pool(), false);
RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table.");
process_build_batch(&build_batch);
RETURN_IF_ERROR(process_build_batch(state, &build_batch));
VLOG_ROW << _hash_tbl->debug_string(true, &child(1)->row_desc());
COUNTER_SET(_build_rows_counter, _hash_tbl->size());
COUNTER_SET(_build_buckets_counter, _hash_tbl->num_buckets());
COUNTER_SET(_hash_tbl_load_factor_counter, _hash_tbl->load_factor());
build_batch.reset();
if (eos) {
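
The hunk above publishes the build-side counters through a `Defer` guard from util/defer_op.h. For readers unfamiliar with the idiom, a minimal sketch of such a scope-exit helper; illustrative only, the real Doris class may differ:

```cpp
#include <utility>

// Minimal scope-exit helper in the spirit of util/defer_op.h (illustrative,
// not the actual Doris class). The stored callable runs in the destructor,
// so the counters are published on every exit path, including early returns.
template <typename Fn>
class Defer {
public:
    explicit Defer(Fn fn) : _fn(std::move(fn)) {}
    ~Defer() { _fn(); }
    Defer(const Defer&) = delete;
    Defer& operator=(const Defer&) = delete;

private:
    Fn _fn;
};

// Usage mirroring the hunk above (C++17 class template argument deduction):
//   Defer defer {[&] { COUNTER_SET(_build_rows_counter, _hash_tbl->size()); }};
```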
@@ -236,91 +246,23 @@ Status HashJoinNode::open(RuntimeState* state) {
thread_status.set_value(construct_hash_table(state));
}
if (_children[0]->type() == TPlanNodeType::EXCHANGE_NODE &&
_children[1]->type() == TPlanNodeType::EXCHANGE_NODE) {
_is_push_down = false;
}
if (!_runtime_filter_descs.empty()) {
RuntimeFilterSlots runtime_filter_slots(_probe_expr_ctxs, _build_expr_ctxs,
_runtime_filter_descs);
// The predicate cannot be pushed down when there is a null-safe equal operator.
// The IN predicate would filter out the null values in child[0], but they are needed for a null-safe equal join.
// For example: select * from a join b where a.id<=>b.id
// the null values in table a should be returned by the scan node instead of being filtered out by the IN predicate.
if (std::find(_is_null_safe_eq_join.begin(), _is_null_safe_eq_join.end(), true) !=
_is_null_safe_eq_join.end()) {
_is_push_down = false;
}
if (_is_push_down) {
// Blocks until ConstructHashTable has returned, after which
// the hash table is fully constructed and we can start the probe
// phase.
RETURN_IF_ERROR(thread_status.get_future().get());
if (_hash_tbl->size() == 0 && _join_op == TJoinOp::INNER_JOIN) {
// Hash table size is zero
LOG(INFO) << "No elements to push down; no need to read the probe table";
RETURN_IF_ERROR(child(0)->open(state));
_probe_batch_pos = 0;
_hash_tbl_iterator = _hash_tbl->begin();
_eos = true;
return Status::OK();
RETURN_IF_ERROR(runtime_filter_slots.init(state, _pool, expr_mem_tracker().get(),
_hash_tbl->size()));
{
SCOPED_TIMER(_push_compute_timer);
auto func = [&](TupleRow* row) { runtime_filter_slots.insert(row); };
_hash_tbl->for_each_row(func);
}
if (_hash_tbl->size() > 1024) {
_is_push_down = false;
}
// TODO: this is used for Code Check, Remove this later
if (_is_push_down || 0 != child(1)->conjunct_ctxs().size()) {
for (int i = 0; i < _probe_expr_ctxs.size(); ++i) {
TExprNode node;
node.__set_node_type(TExprNodeType::IN_PRED);
TScalarType tscalar_type;
tscalar_type.__set_type(TPrimitiveType::BOOLEAN);
TTypeNode ttype_node;
ttype_node.__set_type(TTypeNodeType::SCALAR);
ttype_node.__set_scalar_type(tscalar_type);
TTypeDesc t_type_desc;
t_type_desc.types.push_back(ttype_node);
node.__set_type(t_type_desc);
node.in_predicate.__set_is_not_in(false);
node.__set_opcode(TExprOpcode::FILTER_IN);
node.__isset.vector_opcode = true;
node.__set_vector_opcode(to_in_opcode(_probe_expr_ctxs[i]->root()->type().type));
// NOTE(zc): in predicate only used here, no need prepare.
InPredicate* in_pred = _pool->add(new InPredicate(node));
RETURN_IF_ERROR(in_pred->prepare(state, _probe_expr_ctxs[i]->root()->type()));
in_pred->add_child(Expr::copy(_pool, _probe_expr_ctxs[i]->root()));
ExprContext* ctx = _pool->add(new ExprContext(in_pred));
_push_down_expr_ctxs.push_back(ctx);
}
{
SCOPED_TIMER(_push_compute_timer);
HashTable::Iterator iter = _hash_tbl->begin();
while (iter.has_next()) {
TupleRow* row = iter.get_row();
std::list<ExprContext*>::iterator ctx_iter = _push_down_expr_ctxs.begin();
for (int i = 0; i < _build_expr_ctxs.size(); ++i, ++ctx_iter) {
void* val = _build_expr_ctxs[i]->get_value(row);
InPredicate* in_pre = (InPredicate*)((*ctx_iter)->root());
in_pre->insert(val);
}
SCOPED_TIMER(_build_timer);
iter.next<false>();
}
}
COUNTER_UPDATE(_build_timer, _push_compute_timer->value());
{
SCOPED_TIMER(_push_down_timer);
push_down_predicate(state, &_push_down_expr_ctxs);
runtime_filter_slots.publish(this);
}
// Open the probe-side child so that it may perform any initialisation in parallel.
// Don't exit even if we see an error, we still need to wait for the build thread
// to finish.
Status open_status = child(0)->open(state);
RETURN_IF_ERROR(open_status);
} else {
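
As the comment in the hunk above notes, the push-down is disabled for null-safe equality. A toy illustration of the `<=>` semantics that motivates this; the helper below is hypothetical, not part of Doris:

```cpp
#include <optional>

// Hypothetical helper illustrating null-safe equality (SQL's <=>): NULL
// matches NULL, so a probe-side IN filter that drops NULLs would also drop
// rows that a null-safe equal join still needs to match.
template <typename T>
bool null_safe_eq(const std::optional<T>& a, const std::optional<T>& b) {
    if (!a.has_value() || !b.has_value()) {
        return a.has_value() == b.has_value(); // NULL <=> NULL evaluates to true
    }
    return *a == *b;
}
```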
@@ -370,8 +312,8 @@ Status HashJoinNode::open(RuntimeState* state) {
Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) {
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
// In most cases, no additional memory is allocated at this stage,
// but if expression evaluation in this node needs to allocate additional memory,
// it may cause the memory limit to be exceeded.
RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while execute get_next.");
SCOPED_TIMER(_runtime_profile->total_time_counter());
@@ -618,6 +560,7 @@ Status HashJoinNode::left_join_get_next(RuntimeState* state, RowBatch* out_batch
*eos = _eos;
ScopedTimer<MonotonicStopWatch> probe_timer(_probe_timer);
Defer defer {[&] { COUNTER_SET(_rows_returned_counter, _num_rows_returned); }};
while (!_eos) {
// Compute max rows that should be added to out_batch
@@ -628,16 +571,7 @@ Status HashJoinNode::left_join_get_next(RuntimeState* state, RowBatch* out_batch
}
// Continue processing this row batch
if (_process_probe_batch_fn == NULL) {
_num_rows_returned +=
process_probe_batch(out_batch, _probe_batch.get(), max_added_rows);
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
} else {
// Use codegen'd function
_num_rows_returned +=
_process_probe_batch_fn(this, out_batch, _probe_batch.get(), max_added_rows);
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}
_num_rows_returned += process_probe_batch(out_batch, _probe_batch.get(), max_added_rows);
if (reached_limit() || out_batch->is_full()) {
*eos = reached_limit();