[Chore](runtime filter) remove runtime filter ready_for_publish/publish_finally (#20593)

This commit is contained in:
Pxl
2023-06-13 11:20:49 +08:00
committed by GitHub
parent ee0e2b40da
commit e010fa8d4f
12 changed files with 79 additions and 96 deletions

View File

@ -732,10 +732,6 @@ DEFINE_mInt32(mem_tracker_consume_min_size_bytes, "1048576");
// In most cases, it does not need to be modified.
DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1");
// if set runtime_filter_use_async_rpc true, publish runtime filter will be a async method
// else we will call sync method
DEFINE_mBool(runtime_filter_use_async_rpc, "true");
// max send batch parallelism for OlapTableSink
// The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job,
// if exceed, the value of send_batch_parallelism would be max_send_batch_parallelism_per_job

View File

@ -768,10 +768,6 @@ DECLARE_mInt32(mem_tracker_consume_min_size_bytes);
// In most cases, it does not need to be modified.
DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio);
// if set runtime_filter_use_async_rpc true, publish runtime filter will be a async method
// else we will call sync method
DECLARE_mBool(runtime_filter_use_async_rpc);
// max send batch parallelism for OlapTableSink
// The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job,
// if exceed, the value of send_batch_parallelism would be max_send_batch_parallelism_per_job

View File

@ -1019,26 +1019,6 @@ public:
PrimitiveType column_type() { return _column_return_type; }
void ready_for_publish() {
if (_filter_type == RuntimeFilterType::MINMAX_FILTER) {
switch (_column_return_type) {
case TYPE_VARCHAR:
case TYPE_CHAR:
case TYPE_STRING: {
StringRef* min_value = static_cast<StringRef*>(_context.minmax_func->get_min());
StringRef* max_value = static_cast<StringRef*>(_context.minmax_func->get_max());
auto min_val_ptr = _pool->add(new std::string(min_value->data));
auto max_val_ptr = _pool->add(new std::string(max_value->data));
StringRef min_val(min_val_ptr->c_str(), min_val_ptr->length());
StringRef max_val(max_val_ptr->c_str(), max_val_ptr->length());
_context.minmax_func->assign(&min_val, &max_val);
}
default:
break;
}
}
}
bool is_bloomfilter() const { return _is_bloomfilter; }
bool is_ignored_in_filter() const { return _is_ignored_in_filter; }
@ -1164,11 +1144,6 @@ Status IRuntimeFilter::publish() {
}
}
void IRuntimeFilter::publish_finally() {
DCHECK(is_producer());
join_rpc();
}
Status IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* push_exprs) {
DCHECK(is_consumer());
if (!_is_ignored) {
@ -1531,10 +1506,6 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile() {
}
}
void IRuntimeFilter::ready_for_publish() {
_wrapper->ready_for_publish();
}
Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) {
if (!_is_ignored && wrapper->is_ignored_in_filter()) {
set_ignored();

View File

@ -217,8 +217,6 @@ public:
// push filter to remote node or push down it to scan_node
Status publish();
void publish_finally();
RuntimeFilterType type() const { return _runtime_filter_type; }
Status get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* push_exprs);
@ -293,9 +291,12 @@ public:
// consumer should call before released
Status consumer_close();
bool is_finish_rpc();
Status join_rpc();
// async push runtimefilter to remote node
Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr, bool opt_remote_rf);
Status join_rpc();
void init_profile(RuntimeProfile* parent_profile);
@ -303,8 +304,6 @@ public:
void update_runtime_filter_type_to_profile();
void ready_for_publish();
static bool enable_use_batch(bool use_batch, PrimitiveType type) {
return use_batch && (is_int_or_bool(type) || is_float_or_double(type));
}
@ -389,9 +388,9 @@ protected:
std::vector<doris::vectorized::VExprSPtr> _push_down_vexprs;
struct rpc_context;
struct RPCContext;
std::shared_ptr<rpc_context> _rpc_context;
std::shared_ptr<RPCContext> _rpc_context;
// parent profile
// only effect on consumer
@ -403,7 +402,7 @@ protected:
const bool _enable_pipeline_exec;
bool _profile_init = false;
doris::Mutex _profile_mutex;
std::mutex _profile_mutex;
std::string _name;
bool _opt_remote_rf;
};

View File

@ -38,11 +38,14 @@
namespace doris {
struct IRuntimeFilter::rpc_context {
struct IRuntimeFilter::RPCContext {
PMergeFilterRequest request;
PMergeFilterResponse response;
brpc::Controller cntl;
brpc::CallId cid;
bool is_finished = false;
static void finish(std::shared_ptr<RPCContext> ctx) { ctx->is_finished = true; }
};
Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress* addr,
@ -54,10 +57,9 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress
if (!stub) {
std::string msg =
fmt::format("Get rpc stub failed, host={}, port=", addr->hostname, addr->port);
LOG(WARNING) << msg;
return Status::InternalError(msg);
}
_rpc_context = std::make_shared<IRuntimeFilter::rpc_context>();
_rpc_context = std::make_shared<IRuntimeFilter::RPCContext>();
void* data = nullptr;
int len = 0;
@ -72,7 +74,7 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress
_rpc_context->request.set_filter_id(_filter_id);
_rpc_context->request.set_opt_remote_rf(opt_remote_rf);
_rpc_context->request.set_is_pipeline(state->enable_pipeline_exec());
_rpc_context->cntl.set_timeout_ms(1000);
_rpc_context->cntl.set_timeout_ms(state->runtime_filter_wait_time_ms());
_rpc_context->cid = _rpc_context->cntl.call_id();
Status serialize_status = serialize(&_rpc_context->request, &data, &len);
@ -83,14 +85,9 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress
DCHECK(data != nullptr);
_rpc_context->cntl.request_attachment().append(data, len);
}
if (config::runtime_filter_use_async_rpc) {
stub->merge_filter(&_rpc_context->cntl, &_rpc_context->request, &_rpc_context->response,
brpc::DoNothing());
} else {
stub->merge_filter(&_rpc_context->cntl, &_rpc_context->request, &_rpc_context->response,
nullptr);
_rpc_context.reset();
}
stub->merge_filter(&_rpc_context->cntl, &_rpc_context->request, &_rpc_context->response,
brpc::NewCallback(RPCContext::finish, _rpc_context));
} else {
// we should reset context
@ -99,15 +96,25 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress
return serialize_status;
}
bool IRuntimeFilter::is_finish_rpc() {
if (_rpc_context == nullptr) {
return true;
}
return _rpc_context->is_finished;
}
Status IRuntimeFilter::join_rpc() {
DCHECK(is_producer());
if (!is_producer()) {
return Status::InternalError("RuntimeFilter::join_rpc only called when rf is producer.");
}
if (_rpc_context != nullptr) {
brpc::Join(_rpc_context->cid);
if (_rpc_context->cntl.Failed()) {
LOG(WARNING) << "runtimefilter rpc err:" << _rpc_context->cntl.ErrorText();
// reset stub cache
ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
_rpc_context->cntl.remote_side());
return Status::InternalError("RuntimeFilter::join_rpc meet rpc error, msg={}.",
_rpc_context->cntl.ErrorText());
}
}
return Status::OK();

View File

@ -17,6 +17,7 @@
#pragma once
#include "common/status.h"
#include "exprs/runtime_filter.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_state.h"
@ -60,8 +61,8 @@ public:
auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter, std::string& msg) {
runtime_filter->set_ignored();
runtime_filter->set_ignored_msg(msg);
runtime_filter->publish();
runtime_filter->publish_finally();
RETURN_IF_ERROR(runtime_filter->publish());
return Status::OK();
};
// ordered vector: IN, IN_OR_BLOOM, others.
@ -142,9 +143,9 @@ public:
"in_num({}) >= max_in_num({})",
print_id(state->fragment_instance_id()), filter_desc.filter_id,
hash_table_size, max_in_num);
ignore_remote_filter(runtime_filter, msg);
RETURN_IF_ERROR(ignore_remote_filter(runtime_filter, msg));
#else
ignore_remote_filter(runtime_filter, "ignored");
RETURN_IF_ERROR(ignore_remote_filter(runtime_filter, "ignored"));
#endif
continue;
}
@ -196,31 +197,35 @@ public:
}
}
// should call this method after insert
void ready_for_publish() {
bool ready_finish_publish() {
for (auto& pair : _runtime_filters) {
for (auto filter : pair.second) {
filter->ready_for_publish();
}
}
}
// publish runtime filter
void publish() {
for (int i = 0; i < _probe_expr_context.size(); ++i) {
auto iter = _runtime_filters.find(i);
if (iter != _runtime_filters.end()) {
for (auto filter : iter->second) {
filter->publish();
if (!filter->is_finish_rpc()) {
return false;
}
}
}
return true;
}
void finish_publish() {
for (auto& pair : _runtime_filters) {
for (auto filter : pair.second) {
filter->publish_finally();
filter->join_rpc();
}
}
}
// publish runtime filter
Status publish() {
for (auto& pair : _runtime_filters) {
for (auto filter : pair.second) {
RETURN_IF_ERROR(filter->publish());
}
}
return Status::OK();
}
void copy_to_shared_context(vectorized::SharedHashTableContextPtr& context) {
for (auto& it : _runtime_filters) {
for (auto& filter : it.second) {

View File

@ -19,6 +19,7 @@
#include <vector>
#include "common/status.h"
#include "exprs/runtime_filter.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_state.h"
@ -88,14 +89,11 @@ public:
return Status::OK();
}
void publish() {
Status publish() {
for (auto& filter : _runtime_filters) {
filter->publish();
}
for (auto& filter : _runtime_filters) {
// todo: cross join may not need publish_finally()
filter->publish_finally();
RETURN_IF_ERROR(filter->publish());
}
return Status::OK();
}
bool empty() { return !_runtime_filters.size(); }

View File

@ -39,6 +39,7 @@ class HashJoinBuildSink final : public StreamingOperator<HashJoinBuildSinkBuilde
public:
HashJoinBuildSink(OperatorBuilderBase* operator_builder, ExecNode* node);
bool can_write() override { return _node->can_sink_write(); }
bool is_pending_finish() const override { return !_node->ready_for_finish(); }
};
} // namespace pipeline

View File

@ -43,7 +43,7 @@
namespace doris {
template <class RPCRequest, class RPCResponse>
struct async_rpc_context {
struct AsyncRPCContext {
RPCRequest request;
RPCResponse response;
brpc::Controller cntl;
@ -55,7 +55,7 @@ RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state
RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx)
: _query_ctx(query_ctx) {}
RuntimeFilterMgr::~RuntimeFilterMgr() {}
RuntimeFilterMgr::~RuntimeFilterMgr() = default;
Status RuntimeFilterMgr::init() {
_tracker = std::make_unique<MemTracker>("RuntimeFilterMgr",
@ -315,7 +315,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
// 2. FE has been upgraded (e.g. cntVal->targetv2_info.size() > 0)
// 3. This filter is bloom filter (only bloom filter should be used for merging)
using PPublishFilterRpcContext =
async_rpc_context<PPublishFilterRequestV2, PPublishFilterResponse>;
AsyncRPCContext<PPublishFilterRequestV2, PPublishFilterResponse>;
std::vector<std::unique_ptr<PPublishFilterRpcContext>> rpc_contexts;
rpc_contexts.reserve(cntVal->targetv2_info.size());
@ -380,7 +380,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
} else {
// prepare rpc context
using PPublishFilterRpcContext =
async_rpc_context<PPublishFilterRequest, PPublishFilterResponse>;
AsyncRPCContext<PPublishFilterRequest, PPublishFilterResponse>;
std::vector<std::unique_ptr<PPublishFilterRpcContext>> rpc_contexts;
rpc_contexts.reserve(cntVal->target_info.size());

View File

@ -285,9 +285,9 @@ struct ProcessRuntimeFilterBuild {
if (_join_node->_runtime_filter_descs.empty()) {
return Status::OK();
}
_join_node->_runtime_filter_slots = _join_node->_pool->add(
new VRuntimeFilterSlots(_join_node->_probe_expr_ctxs, _join_node->_build_expr_ctxs,
_join_node->_runtime_filter_descs));
_join_node->_runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>(
_join_node->_probe_expr_ctxs, _join_node->_build_expr_ctxs,
_join_node->_runtime_filter_descs);
RETURN_IF_ERROR(_join_node->_runtime_filter_slots->init(
state, hash_table_ctx.hash_table.get_size(), _join_node->_build_bf_cardinality));
@ -300,7 +300,7 @@ struct ProcessRuntimeFilterBuild {
}
{
SCOPED_TIMER(_join_node->_push_down_timer);
_join_node->_runtime_filter_slots->publish();
RETURN_IF_ERROR(_join_node->_runtime_filter_slots->publish());
}
return Status::OK();
@ -917,15 +917,15 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
if (_runtime_filter_descs.empty()) {
return Status::OK();
}
_runtime_filter_slots = _pool->add(new VRuntimeFilterSlots(
_runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>(
_probe_expr_ctxs, _build_expr_ctxs,
_runtime_filter_descs));
_runtime_filter_descs);
RETURN_IF_ERROR(_runtime_filter_slots->init(
state, arg.hash_table.get_size(), 0));
RETURN_IF_ERROR(_runtime_filter_slots->copy_from_shared_context(
_shared_hash_table_context));
_runtime_filter_slots->publish();
RETURN_IF_ERROR(_runtime_filter_slots->publish());
return Status::OK();
}},
*_hash_table_variants);
@ -1261,6 +1261,9 @@ HashJoinNode::~HashJoinNode() {
// signal at here is abnormal
_shared_hashtable_controller->signal(id(), Status::Cancelled("signaled in destructor"));
}
if (_runtime_filter_slots != nullptr) {
_runtime_filter_slots->finish_publish();
}
}
void HashJoinNode::_release_mem() {

View File

@ -252,6 +252,13 @@ public:
bool should_build_hash_table() const { return _should_build_hash_table; }
bool ready_for_finish() {
if (_runtime_filter_slots == nullptr) {
return true;
}
return _runtime_filter_slots->ready_finish_publish();
}
private:
void _init_short_circuit_for_probe() override {
_short_circuit_for_probe =
@ -341,7 +348,7 @@ private:
bool _is_broadcast_join = false;
bool _should_build_hash_table = true;
std::shared_ptr<SharedHashTableController> _shared_hashtable_controller = nullptr;
VRuntimeFilterSlots* _runtime_filter_slots = nullptr;
std::shared_ptr<VRuntimeFilterSlots> _runtime_filter_slots = nullptr;
std::vector<SlotId> _hash_output_slot_ids;
std::vector<bool> _left_output_slot_flags;

View File

@ -84,7 +84,7 @@ struct RuntimeFilterBuild {
}
{
SCOPED_TIMER(_join_node->_push_down_timer);
runtime_filter_slots.publish();
RETURN_IF_ERROR(runtime_filter_slots.publish());
}
return Status::OK();