[Improvement](runtime filter) Reduce merging time for bloom filter (#13668)

This commit is contained in:
Gabriel
2022-10-27 00:02:05 +08:00
committed by GitHub
parent 06e433e14a
commit 0134e9d2f4
11 changed files with 87 additions and 44 deletions

View File

@ -21,10 +21,14 @@
#pragma once
#include "common/status.h"
#include "gutil/macros.h"
#include "fmt/format.h"
#include "util/hash_util.hpp"
#include "util/slice.h"
namespace butil {
class IOBufAsZeroCopyInputStream;
}
namespace doris {
// https://github.com/apache/kudu/blob/master/src/kudu/util/block_bloom_filter.h
@ -40,11 +44,14 @@ public:
// Construction and teardown. Instances are non-copyable: the copy constructor and the copy
// assignment operator are both explicitly deleted.
explicit BlockBloomFilter();
~BlockBloomFilter();
BlockBloomFilter(const BlockBloomFilter&) = delete;
BlockBloomFilter& operator=(const BlockBloomFilter&) = delete;
// Initializes the filter from a size exponent and a hash seed.
// NOTE(review): log_space_bytes is presumably log2 of the directory size in bytes
// (directory_size() is computed as 1 << log_space_bytes()) -- confirm against the definition.
Status init(int log_space_bytes, uint32_t hash_seed);
// Initialize the BlockBloomFilter from a populated "directory" structure.
// Useful for initializing the BlockBloomFilter by de-serializing a custom protobuf message.
// Two forms are declared here: one takes the directory as a single contiguous Slice, the
// other reads it chunk by chunk from a zero-copy input stream. In both forms the supplied
// size must equal directory_size(), otherwise InvalidArgument is returned; `always_false`
// is stored into the filter's always-false flag.
Status init_from_directory(int log_space_bytes, const Slice& directory, bool always_false,
uint32_t hash_seed);
Status init_from_directory(int log_space_bytes, butil::IOBufAsZeroCopyInputStream* data,
const size_t data_size, bool always_false, uint32_t hash_seed);
// NOTE(review): presumably releases the directory allocated by init() -- the definition is
// not visible here, confirm before relying on it.
void close();
@ -176,7 +183,7 @@ private:
#endif
// Size of the internal directory structure in bytes, computed as 2^log_space_bytes().
// The size_t-returning form compares against a size_t byte count (as init_from_directory
// does) without a signed/unsigned conversion.
int64_t directory_size() const { return 1ULL << log_space_bytes(); }
size_t directory_size() const { return 1ULL << log_space_bytes(); }
// Some constants used in hashing. #defined for efficiency reasons.
#define BLOOM_HASH_CONSTANTS \
@ -200,8 +207,6 @@ private:
// Rehash32to32(hash2) is minimal.
return (static_cast<uint64_t>(hash) * m + a) >> 32U;
}
DISALLOW_COPY_AND_ASSIGN(BlockBloomFilter);
};
} // namespace doris

View File

@ -23,6 +23,7 @@
#include <immintrin.h>
#include "exprs/block_bloom_filter.hpp"
#include "gutil/macros.h"
namespace doris {
static inline ATTRIBUTE_ALWAYS_INLINE __attribute__((__target__("avx2"))) __m256i make_mark(

View File

@ -26,6 +26,8 @@
#include <mm_malloc.h>
#endif
#include <butil/iobuf.h>
#include <algorithm>
#include <climits>
#include <cmath>
@ -87,17 +89,26 @@ Status BlockBloomFilter::init(const int log_space_bytes, uint32_t hash_seed) {
return Status::OK();
}
// Initializes this filter from a serialized directory that arrives as a zero-copy stream
// (e.g. an RPC attachment), copying it chunk by chunk into _directory.
//
// log_space_bytes: size exponent, forwarded to init_internal().
// data:            stream that yields the serialized directory bytes.
// data_size:       number of directory bytes the sender declared; must equal
//                  directory_size().
// always_false:    stored verbatim into _always_false.
// hash_seed:       forwarded to init_internal().
//
// Returns the error from init_internal() if it fails, and InvalidArgument when data_size
// differs from directory_size() or when the stream yields more or fewer bytes than
// data_size. On success returns Status::OK().
Status BlockBloomFilter::init_from_directory(int log_space_bytes,
                                             butil::IOBufAsZeroCopyInputStream* data,
                                             const size_t data_size, bool always_false,
                                             uint32_t hash_seed) {
    RETURN_IF_ERROR(init_internal(log_space_bytes, hash_seed));
    DCHECK(_directory);

    if (directory_size() != data_size) {
        return Status::InvalidArgument(fmt::format(
                "Mismatch in BlockBloomFilter source directory size {} and expected size {}",
                data_size, directory_size()));
    }

    // data_size is only what the sender *declared*; the stream itself may hold a different
    // number of bytes. Track how much room is left so a longer-than-declared attachment can
    // never write past the end of _directory.
    const void* chunk = nullptr;
    int chunk_size = 0;
    size_t remaining = data_size;
    char* dst = reinterpret_cast<char*>(_directory);
    while (data->Next(&chunk, &chunk_size)) {
        const size_t chunk_len = static_cast<size_t>(chunk_size);
        if (chunk_len > remaining) {
            return Status::InvalidArgument(fmt::format(
                    "BlockBloomFilter source directory stream yields more than the expected {} "
                    "bytes",
                    data_size));
        }
        memcpy(dst, chunk, chunk_len);
        dst += chunk_len;
        remaining -= chunk_len;
    }

    // A short stream would leave the tail of the directory without the sender's bits, so
    // report it instead of accepting a partially populated filter.
    if (remaining != 0) {
        return Status::InvalidArgument(fmt::format(
                "BlockBloomFilter source directory stream ended {} bytes short of the expected "
                "{} bytes",
                remaining, data_size));
    }

    _always_false = always_false;
    return Status::OK();
}
@ -240,8 +251,8 @@ Status BlockBloomFilter::merge(const BlockBloomFilter& other) {
return Status::OK();
}
or_equal_array_internal(directory_size(), reinterpret_cast<const uint8*>(other._directory),
reinterpret_cast<uint8*>(_directory));
or_equal_array_internal(directory_size(), reinterpret_cast<const uint8_t*>(other._directory),
reinterpret_cast<uint8_t*>(_directory));
_always_false = false;
return Status::OK();

View File

@ -33,6 +33,10 @@
#include "olap/uint24.h"
#include "util/hash_util.hpp"
namespace butil {
class IOBufAsZeroCopyInputStream;
}
namespace doris {
class BloomFilterAdaptor {
public:
@ -50,9 +54,9 @@ public:
return _bloom_filter->init(log_space, /*hash_seed*/ 0);
}
// Initializes the wrapped block bloom filter from `data_size` serialized bytes read from
// `data`.
//
// The size exponent passed down is log2(data_size) truncated to an int. A size that is not
// a power of two therefore yields a smaller directory and is expected to be rejected by the
// size check in init_from_directory().
//
// Returns InvalidArgument for an empty payload, otherwise whatever init_from_directory()
// returns.
Status init(butil::IOBufAsZeroCopyInputStream* data, const size_t data_size) {
    if (data_size == 0) {
        // log2(0) is -infinity, and converting that to int is undefined behaviour, so an
        // empty payload is rejected before the exponent is computed.
        return Status::InvalidArgument("bloom filter data size must be greater than zero");
    }
    int log_space = log2(data_size);
    return _bloom_filter->init_from_directory(log_space, data, data_size, false, 0);
}
// Returns the wrapped filter's directory().data viewed as a mutable char*. The C-style cast
// discards whatever type/constness directory().data has, so callers get write access.
char* data() { return (char*)_bloom_filter->directory().data; }
@ -161,13 +165,13 @@ public:
}
}
// Initializes this bloom filter from serialized filter data received from a remote node.
// Creates the BloomFilterAdaptor on first use, records the payload byte count in
// _bloom_filter_alloced, and returns the Status produced by BloomFilterAdaptor::init().
Status assign(const char* data, int len) {
Status assign(butil::IOBufAsZeroCopyInputStream* data, const size_t data_size) {
// Lazily allocate the adaptor; an existing one is reused and re-initialized below.
if (_bloom_filter == nullptr) {
_bloom_filter.reset(BloomFilterAdaptor::create());
}
_bloom_filter_alloced = len;
return _bloom_filter->init(data, len);
_bloom_filter_alloced = data_size;
return _bloom_filter->init(data, data_size);
}
Status get_data(char** data, int* len) {

View File

@ -862,7 +862,7 @@ public:
// used by shuffle runtime filter
// assign this filter by protobuf
Status assign(const PBloomFilter* bloom_filter, const char* data) {
Status assign(const PBloomFilter* bloom_filter, butil::IOBufAsZeroCopyInputStream* data) {
_is_bloomfilter = true;
// we won't use this class to insert or find any data
// so any type is ok

View File

@ -27,6 +27,10 @@
#include "util/time.h"
#include "util/uid_util.h"
namespace butil {
class IOBufAsZeroCopyInputStream;
}
namespace doris {
class Predicate;
class ObjectPool;
@ -97,14 +101,20 @@ struct RuntimeFilterParams {
};
// Bundle of arguments for applying a published runtime filter: the protobuf request, the
// stream carrying the attached filter payload, and an object pool.
// RuntimeFilterMgr::update_filter builds one from its own arguments plus its member pool.
// Only raw pointers are stored; nothing in this struct takes ownership of them.
struct UpdateRuntimeFilterParams {
// Stores the three pointers as given; none of them is checked for null here.
UpdateRuntimeFilterParams(const PPublishFilterRequest* req,
butil::IOBufAsZeroCopyInputStream* data_stream, ObjectPool* obj_pool)
: request(req), data(data_stream), pool(obj_pool) {}
const PPublishFilterRequest* request;
const char* data;
butil::IOBufAsZeroCopyInputStream* data;
ObjectPool* pool;
};
// Bundle of arguments for merging a runtime filter: the protobuf merge request and the
// stream carrying the attached filter payload.
// RuntimeFilterMergeControllerEntity::merge builds one and hands it to
// IRuntimeFilter::create_wrapper. Only raw pointers are stored; nothing in this struct takes
// ownership of them.
struct MergeRuntimeFilterParams {
// Stores both pointers as given; neither is checked for null here.
MergeRuntimeFilterParams(const PMergeFilterRequest* req,
butil::IOBufAsZeroCopyInputStream* data_stream)
: request(req), data(data_stream) {}
const PMergeFilterRequest* request;
const char* data;
butil::IOBufAsZeroCopyInputStream* data;
};
/// The runtimefilter is built in the join node.

View File

@ -921,7 +921,8 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
return exec_plan_fragment(exec_fragment_params);
}
Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, const char* data) {
Status FragmentMgr::apply_filter(const PPublishFilterRequest* request,
butil::IOBufAsZeroCopyInputStream* attach_data) {
UniqueId fragment_instance_id = request->fragment_id();
TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
std::shared_ptr<FragmentExecState> fragment_state;
@ -946,11 +947,12 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, const cha
RuntimeFilterMgr* runtime_filter_mgr =
fragment_state->executor()->runtime_state()->runtime_filter_mgr();
Status st = runtime_filter_mgr->update_filter(request, data);
Status st = runtime_filter_mgr->update_filter(request, attach_data);
return st;
}
Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, const char* attach_data) {
Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
butil::IOBufAsZeroCopyInputStream* attach_data) {
UniqueId queryid = request->query_id();
std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller));

View File

@ -36,6 +36,10 @@
#include "util/metrics.h"
#include "util/thread.h"
namespace butil {
class IOBufAsZeroCopyInputStream;
}
namespace doris {
class QueryFragmentsCtx;
@ -85,9 +89,11 @@ public:
const TUniqueId& fragment_instance_id,
std::vector<TScanColumnDesc>* selected_columns);
Status apply_filter(const PPublishFilterRequest* request, const char* attach_data);
Status apply_filter(const PPublishFilterRequest* request,
butil::IOBufAsZeroCopyInputStream* attach_data);
Status merge_filter(const PMergeFilterRequest* request, const char* attach_data);
Status merge_filter(const PMergeFilterRequest* request,
butil::IOBufAsZeroCopyInputStream* attach_data);
void set_pipe(const TUniqueId& fragment_instance_id, std::shared_ptr<StreamLoadPipe> pipe);

View File

@ -110,13 +110,10 @@ Status RuntimeFilterMgr::regist_filter(const RuntimeFilterRole role, const TRunt
return Status::OK();
}
Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request, const char* data) {
Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request,
butil::IOBufAsZeroCopyInputStream* data) {
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
UpdateRuntimeFilterParams params;
params.request = request;
params.data = data;
params.pool = &_pool;
UpdateRuntimeFilterParams params(request, data, &_pool);
int filter_id = request->filter_id();
IRuntimeFilter* real_filter = nullptr;
RETURN_IF_ERROR(get_consume_filter(filter_id, &real_filter));
@ -185,7 +182,7 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, UniqueId frag
// merge data
Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* request,
const char* data) {
butil::IOBufAsZeroCopyInputStream* attach_data) {
// SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
std::shared_ptr<RuntimeFilterCntlVal> cntVal;
int merged_size = 0;
@ -201,9 +198,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
if (auto bf = cntVal->filter->get_bloomfilter()) {
RETURN_IF_ERROR(bf->init_with_fixed_length());
}
MergeRuntimeFilterParams params;
params.data = data;
params.request = request;
MergeRuntimeFilterParams params(request, attach_data);
ObjectPool* pool = iter->second->pool.get();
RuntimeFilterWrapperHolder holder;
RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, pool, holder.getHandle()));

View File

@ -32,6 +32,10 @@
#include "util/time.h"
#include "util/uid_util.h"
namespace butil {
class IOBufAsZeroCopyInputStream;
}
namespace doris {
class TUniqueId;
class RuntimeFilter;
@ -69,7 +73,8 @@ public:
const TQueryOptions& options, int node_id = -1);
// update filter by remote
Status update_filter(const PPublishFilterRequest* request, const char* data);
Status update_filter(const PPublishFilterRequest* request,
butil::IOBufAsZeroCopyInputStream* data);
void set_runtime_filter_params(const TRuntimeFilterParams& runtime_filter_params);
@ -113,7 +118,8 @@ public:
const TQueryOptions& query_options);
// handle merge rpc
Status merge(const PMergeFilterRequest* request, const char* data);
Status merge(const PMergeFilterRequest* request,
butil::IOBufAsZeroCopyInputStream* attach_data);
UniqueId query_id() { return _query_id; }

View File

@ -17,6 +17,8 @@
#include "service/internal_service.h"
#include <butil/iobuf.h>
#include <string>
#include "common/config.h"
@ -493,8 +495,9 @@ void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* contr
::doris::PMergeFilterResponse* response,
::google::protobuf::Closure* done) {
brpc::ClosureGuard closure_guard(done);
auto buf = static_cast<brpc::Controller*>(controller)->request_attachment();
Status st = _exec_env->fragment_mgr()->merge_filter(request, buf.to_string().data());
auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
Status st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream);
if (!st.ok()) {
LOG(WARNING) << "merge meet error" << st.to_string();
}
@ -507,10 +510,10 @@ void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* contr
::google::protobuf::Closure* done) {
brpc::ClosureGuard closure_guard(done);
auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
UniqueId unique_id(request->query_id());
// TODO: avoid copy attachment copy
VLOG_NOTICE << "rpc apply_filter recv";
Status st = _exec_env->fragment_mgr()->apply_filter(request, attachment.to_string().data());
Status st = _exec_env->fragment_mgr()->apply_filter(request, &zero_copy_input_stream);
if (!st.ok()) {
LOG(WARNING) << "apply filter meet error: " << st.to_string();
}