diff --git a/be/src/exprs/block_bloom_filter.hpp b/be/src/exprs/block_bloom_filter.hpp
index e8876ad135..539f5f80e0 100644
--- a/be/src/exprs/block_bloom_filter.hpp
+++ b/be/src/exprs/block_bloom_filter.hpp
@@ -30,6 +30,8 @@ namespace doris {
 // more friendly to CPU Cache, and it is easier to use SIMD instructions to
 // speed up the implementation.
 
+// BlockBloomFilter does not store null values, and always returns false if the input is null.
+
 class BlockBloomFilter {
 public:
     explicit BlockBloomFilter();
@@ -51,7 +53,9 @@ public:
     void insert(uint32_t hash) noexcept;
     // Same as above with convenience of hashing the key.
     void insert(const Slice& key) noexcept {
-        insert(HashUtil::murmur_hash3_32(key.data, key.size, _hash_seed));
+        if (key.data) {
+            insert(HashUtil::murmur_hash3_32(key.data, key.size, _hash_seed));
+        }
     }
 
     // Finds an element in the BloomFilter, returning true if it is found and false (with
@@ -59,7 +63,11 @@ public:
     bool find(uint32_t hash) const noexcept;
     // Same as above with convenience of hashing the key.
     bool find(const Slice& key) const noexcept {
-        return find(HashUtil::murmur_hash3_32(key.data, key.size, _hash_seed));
+        if (key.data) {
+            return find(HashUtil::murmur_hash3_32(key.data, key.size, _hash_seed));
+        } else {
+            return false;
+        }
     }
 
     // Computes the logical OR of this filter with 'other' and stores the result in this
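The guarded Slice overloads above short-circuit on a null key: insert() becomes a no-op and find() reports a miss without ever hashing. The following standalone sketch illustrates that contract; ToyFilter, hash_bytes, and std::hash are illustrative stand-ins, not the Doris BlockBloomFilter or HashUtil::murmur_hash3_32.

#include <cstddef>
#include <cstdint>
#include <functional>
#include <iostream>
#include <string_view>
#include <unordered_set>

struct Slice {
    const char* data = nullptr;
    size_t size = 0;
};

class ToyFilter {
public:
    void insert(uint32_t hash) { _hashes.insert(hash); }
    bool find(uint32_t hash) const { return _hashes.count(hash) > 0; }

    // Guarded overloads mirroring the patch: a null key is never hashed or
    // inserted, and a null probe short-circuits to "not found".
    void insert(const Slice& key) {
        if (key.data) {
            insert(hash_bytes(key));
        }
    }
    bool find(const Slice& key) const {
        if (key.data) {
            return find(hash_bytes(key));
        }
        return false;
    }

private:
    static uint32_t hash_bytes(const Slice& key) {
        return static_cast<uint32_t>(
                std::hash<std::string_view>{}(std::string_view(key.data, key.size)));
    }
    std::unordered_set<uint32_t> _hashes; // stand-in for the real bit array
};

int main() {
    ToyFilter filter;
    filter.insert(Slice{"doris", 5});
    filter.insert(Slice{}); // null key: no-op
    std::cout << filter.find(Slice{"doris", 5}) << std::endl; // 1
    std::cout << filter.find(Slice{}) << std::endl;           // 0, nothing was hashed
}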
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index a1958508ba..739b004cce 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -56,7 +56,13 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* mem_
     _tuple_ptrs_size = _capacity * _num_tuples_per_row * sizeof(Tuple*);
     DCHECK_GT(_tuple_ptrs_size, 0);
     // TODO: switch to Init() pattern so we can check memory limit and return Status.
-    _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
+    if (config::enable_partitioned_aggregation) {
+        _mem_tracker->Consume(_tuple_ptrs_size);
+        _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
+        DCHECK(_tuple_ptrs != nullptr);
+    } else {
+        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
+    }
 }
 
 // TODO: we want our input_batch's tuple_data to come from our (not yet implemented)
@@ -82,7 +88,13 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
     _tuple_ptrs_size = _num_rows * _num_tuples_per_row * sizeof(Tuple*);
     DCHECK_GT(_tuple_ptrs_size, 0);
     // TODO: switch to Init() pattern so we can check memory limit and return Status.
-    _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
+    if (config::enable_partitioned_aggregation) {
+        _mem_tracker->Consume(_tuple_ptrs_size);
+        _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
+        DCHECK(_tuple_ptrs != nullptr);
+    } else {
+        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
+    }
 
     uint8_t* tuple_data = nullptr;
     if (input_batch.is_compressed()) {
@@ -210,7 +222,13 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
     _tuple_ptrs_size = _num_rows * input_batch.row_tuples.size() * sizeof(Tuple*);
     DCHECK_GT(_tuple_ptrs_size, 0);
     // TODO: switch to Init() pattern so we can check memory limit and return Status.
-    _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
+    if (config::enable_partitioned_aggregation) {
+        _mem_tracker->Consume(_tuple_ptrs_size);
+        _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
+        DCHECK(_tuple_ptrs != nullptr);
+    } else {
+        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
+    }
 
     uint8_t* tuple_data = NULL;
     if (input_batch.is_compressed) {
@@ -338,6 +356,12 @@ void RowBatch::clear() {
     for (int i = 0; i < _blocks.size(); ++i) {
         _blocks[i]->del();
     }
+    if (config::enable_partitioned_aggregation) {
+        DCHECK(_tuple_ptrs != nullptr);
+        free(_tuple_ptrs);
+        _mem_tracker->Release(_tuple_ptrs_size);
+        _tuple_ptrs = nullptr;
+    }
     _cleared = true;
 }
 
@@ -521,6 +545,8 @@ void RowBatch::reset() {
     _capacity = _tuple_ptrs_size / (_num_tuples_per_row * sizeof(Tuple*));
     _has_in_flight_row = false;
 
+    // TODO: Change this to Clear() and investigate the repercussions.
+    _tuple_data_pool->free_all();
     _agg_object_pool.reset(new ObjectPool());
     for (int i = 0; i < _io_buffers.size(); ++i) {
         _io_buffers[i]->return_buffer();
@@ -538,6 +564,9 @@ void RowBatch::reset() {
     }
     _blocks.clear();
     _auxiliary_mem_usage = 0;
+    if (!config::enable_partitioned_aggregation) {
+        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
+    }
     _need_to_return = false;
     _flush = FlushMode::NO_FLUSH_RESOURCES;
     _needs_deep_copy = false;
@@ -635,7 +664,14 @@ void RowBatch::acquire_state(RowBatch* src) {
     _num_rows = src->_num_rows;
     _capacity = src->_capacity;
     _need_to_return = src->_need_to_return;
-    std::swap(_tuple_ptrs, src->_tuple_ptrs);
+    if (!config::enable_partitioned_aggregation) {
+        // Tuple pointers are allocated from _tuple_data_pool, so ownership is transferred.
+        _tuple_ptrs = src->_tuple_ptrs;
+        src->_tuple_ptrs = nullptr;
+    } else {
+        // _tuple_ptrs was allocated with malloc, so it can be swapped between batches.
+        std::swap(_tuple_ptrs, src->_tuple_ptrs);
+    }
     src->transfer_resource_ownership(this);
 }
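In the hunks above, every malloc of _tuple_ptrs is paired with MemTracker::Consume, and clear() pairs the free with MemTracker::Release, so the tracker always reflects live allocations. The sketch below mirrors that discipline under simplifying assumptions: MemTracker here is a toy counter and ToyBatch stands in for RowBatch; neither is the Doris class.

#include <cassert>
#include <cstdlib>
#include <iostream>

struct MemTracker {
    long consumed = 0;
    void Consume(long bytes) { consumed += bytes; }
    void Release(long bytes) { consumed -= bytes; }
};

class ToyBatch {
public:
    ToyBatch(MemTracker* tracker, int capacity, int tuples_per_row)
            : _tracker(tracker),
              _tuple_ptrs_size(capacity * tuples_per_row * sizeof(void*)) {
        // malloc'd mode: the batch owns the pointer array and accounts for it.
        _tracker->Consume(_tuple_ptrs_size);
        _tuple_ptrs = static_cast<void**>(malloc(_tuple_ptrs_size));
        assert(_tuple_ptrs != nullptr);
    }

    void clear() {
        // Mirror of RowBatch::clear(): free and release exactly once.
        if (_tuple_ptrs != nullptr) {
            free(_tuple_ptrs);
            _tracker->Release(_tuple_ptrs_size);
            _tuple_ptrs = nullptr;
        }
    }

    ~ToyBatch() { clear(); }

private:
    MemTracker* _tracker;
    long _tuple_ptrs_size;
    void** _tuple_ptrs = nullptr;
};

int main() {
    MemTracker tracker;
    {
        ToyBatch batch(&tracker, /*capacity=*/1024, /*tuples_per_row=*/2);
        std::cout << "tracked: " << tracker.consumed << " bytes" << std::endl;
    }
    std::cout << "after destruction: " << tracker.consumed << " bytes" << std::endl; // 0
}

Pairing Consume in the constructor with Release in clear() keeps the accounting symmetric, which is why clear() also nulls the pointer: it makes a second clear(), or the destructor, a safe no-op.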
diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h
index 4ff6624b83..66e2f6979e 100644
--- a/be/src/runtime/row_batch.h
+++ b/be/src/runtime/row_batch.h
@@ -70,6 +70,7 @@ class PRowBatch;
 //
 // A row batch is considered at capacity if all the rows are full or it has accumulated
 // auxiliary memory up to a soft cap. (See _at_capacity_mem_usage comment).
+// TODO: stick _tuple_ptrs into a pool?
 class RowBatch : public RowBatchInterface {
 public:
     /// Flag indicating whether the resources attached to a RowBatch need to be flushed.
@@ -413,14 +414,22 @@ private:
     int _num_tuples_per_row;
     RowDescriptor _row_desc;
 
-    // Memory is allocated from MemPool, need to investigate the repercussions.
-    //
-    // In the past, there were malloc'd and MemPool memory allocation methods.
-    // Malloc'd memory belongs to RowBatch itself, and the latter belongs to MemPool management.
+    // Array of pointers with _capacity * _num_tuples_per_row elements.
     // The memory ownership depends on whether legacy joins and aggs are enabled.
     //
-    // At present, it is allocated from MemPool uniformly, and tuple pointers are not transferred
-    // and do not have to be re-created in every Reset(), which has better performance.
+    // Memory is malloc'd and owned by RowBatch:
+    // If enable_partitioned_hash_join=true and enable_partitioned_aggregation=true,
+    // then the memory is owned by this RowBatch and is freed upon its destruction.
+    // This mode is more performant, especially with SubplanNodes in the ExecNode tree,
+    // because the tuple pointers are not transferred and do not have to be re-created
+    // in every Reset().
+    //
+    // Memory is allocated from MemPool:
+    // Otherwise, the memory is allocated from _tuple_data_pool. As a result, the
+    // pointer memory is transferred just like tuple data, and must be re-created
+    // in Reset(). This mode is required for the legacy joins and aggs, which rely on
+    // the tuple pointers being allocated from _tuple_data_pool so that they can
+    // acquire ownership of them.
     Tuple** _tuple_ptrs;
     int _tuple_ptrs_size;
 
diff --git a/be/test/exprs/bloom_filter_predicate_test.cpp b/be/test/exprs/bloom_filter_predicate_test.cpp
index 8b0f1ab6c0..3e4b2411c9 100644
--- a/be/test/exprs/bloom_filter_predicate_test.cpp
+++ b/be/test/exprs/bloom_filter_predicate_test.cpp
@@ -46,6 +46,9 @@ TEST_F(BloomFilterPredicateTest, bloom_filter_func_int_test) {
     // test not exist val
     int not_exist_val = 0x3355ff;
     ASSERT_FALSE(func->find((const void*)&not_exist_val));
+    // test null value: insert is a no-op and find must return false
+    func->insert(nullptr);
+    ASSERT_FALSE(func->find(nullptr));
 }
 
 TEST_F(BloomFilterPredicateTest, bloom_filter_func_stringval_test) {
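The acquire_state() hunk in row_batch.cpp chooses between transferring and swapping the pointer array, matching the ownership comment above: pool-allocated pointers travel with the pool, while malloc'd pointers are independently owned by each batch and can simply be swapped. A minimal sketch of the two strategies, where ToyBatch and the 'partitioned' flag are hypothetical stand-ins for RowBatch and config::enable_partitioned_aggregation:

#include <cstdlib>
#include <iostream>
#include <utility>

struct ToyBatch {
    void** tuple_ptrs = nullptr;
};

void acquire_state(ToyBatch* dst, ToyBatch* src, bool partitioned) {
    if (!partitioned) {
        // MemPool mode: the pointer memory travels with the pool, so the
        // source hands its array over and forgets it.
        dst->tuple_ptrs = src->tuple_ptrs;
        src->tuple_ptrs = nullptr;
    } else {
        // malloc mode: both arrays are independently owned, so they can be
        // swapped and the source keeps a usable array for reuse.
        std::swap(dst->tuple_ptrs, src->tuple_ptrs);
    }
}

int main() {
    ToyBatch src{static_cast<void**>(malloc(8 * sizeof(void*)))};
    ToyBatch dst{static_cast<void**>(malloc(8 * sizeof(void*)))};
    acquire_state(&dst, &src, /*partitioned=*/true);
    // After the swap, each batch still owns exactly one array.
    std::cout << (src.tuple_ptrs != nullptr) << " "
              << (dst.tuple_ptrs != nullptr) << std::endl; // 1 1
    free(src.tuple_ptrs);
    free(dst.tuple_ptrs);
}

The swap is the cheaper path: the destination acquires the source's populated array while the source retains an allocation it can refill on the next iteration, avoiding a re-allocation per Reset().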