[Optimiaze] Optimize HyperLogLog (#6625)
1. Replace std::max with a ternary expression, std::max is much heavier than the ternary operator 2. Replace std::set with arrays, std::set is based on red-black trees, traversal will follow the chain domain, and cache hits are not good 3. Optimize the serialize function, improve the calculation speed of num_non_zero_registers by reducing branches, and the serialization of _registers after optimization is faster 4. The test found that the performance improvement is more obvious
This commit is contained in:
@ -1112,7 +1112,7 @@ void AggregateFunctions::hll_update(FunctionContext* ctx, const T& src, StringVa
|
||||
if (hash_value != 0) {
|
||||
int idx = hash_value % dst->len;
|
||||
uint8_t first_one_bit = __builtin_ctzl(hash_value >> HLL_COLUMN_PRECISION) + 1;
|
||||
dst->ptr[idx] = std::max(dst->ptr[idx], first_one_bit);
|
||||
dst->ptr[idx] = (dst->ptr[idx] < first_one_bit ? first_one_bit : dst->ptr[idx]);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1122,8 +1122,10 @@ void AggregateFunctions::hll_merge(FunctionContext* ctx, const StringVal& src, S
|
||||
DCHECK_EQ(dst->len, std::pow(2, HLL_COLUMN_PRECISION));
|
||||
DCHECK_EQ(src.len, std::pow(2, HLL_COLUMN_PRECISION));
|
||||
|
||||
auto dp = dst->ptr;
|
||||
auto sp = src.ptr;
|
||||
for (int i = 0; i < src.len; ++i) {
|
||||
dst->ptr[i] = std::max(dst->ptr[i], src.ptr[i]);
|
||||
dp[i] = (dp[i] < sp[i] ? sp[i] : dp[i]);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -15,12 +15,11 @@
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "olap/hll.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <map>
|
||||
|
||||
#include "common/logging.h"
|
||||
#include "olap/hll.h"
|
||||
#include "runtime/string_value.h"
|
||||
#include "util/coding.h"
|
||||
|
||||
@ -43,13 +42,13 @@ HyperLogLog::HyperLogLog(const Slice& src) {
|
||||
void HyperLogLog::_convert_explicit_to_register() {
|
||||
DCHECK(_type == HLL_DATA_EXPLICIT)
|
||||
<< "_type(" << _type << ") should be explicit(" << HLL_DATA_EXPLICIT << ")";
|
||||
_registers = new uint8_t[HLL_REGISTERS_COUNT];
|
||||
memset(_registers, 0, HLL_REGISTERS_COUNT);
|
||||
for (auto value : _hash_set) {
|
||||
_update_registers(value);
|
||||
_registers = new uint8_t[HLL_REGISTERS_COUNT]();
|
||||
|
||||
for (uint32_t i = 0; i < _explicit_data_num; ++i) {
|
||||
_update_registers(_explicit_data[i]);
|
||||
}
|
||||
// clear _hash_set
|
||||
std::set<uint64_t>().swap(_hash_set);
|
||||
|
||||
_explicit_data_num = 0;
|
||||
}
|
||||
|
||||
// Change HLL_DATA_EXPLICIT to HLL_DATA_FULL directly, because HLL_DATA_SPARSE
|
||||
@ -57,12 +56,13 @@ void HyperLogLog::_convert_explicit_to_register() {
|
||||
void HyperLogLog::update(uint64_t hash_value) {
|
||||
switch (_type) {
|
||||
case HLL_DATA_EMPTY:
|
||||
_hash_set.insert(hash_value);
|
||||
_explicit_data[0] = hash_value;
|
||||
_explicit_data_num = 1;
|
||||
_type = HLL_DATA_EXPLICIT;
|
||||
break;
|
||||
case HLL_DATA_EXPLICIT:
|
||||
if (_hash_set.size() < HLL_EXPLICIT_INT64_NUM) {
|
||||
_hash_set.insert(hash_value);
|
||||
if (_explicit_data_num < HLL_EXPLICIT_INT64_NUM) {
|
||||
_explicit_data_insert(hash_value);
|
||||
break;
|
||||
}
|
||||
_convert_explicit_to_register();
|
||||
@ -86,7 +86,9 @@ void HyperLogLog::merge(const HyperLogLog& other) {
|
||||
_type = other._type;
|
||||
switch (other._type) {
|
||||
case HLL_DATA_EXPLICIT:
|
||||
_hash_set = other._hash_set;
|
||||
_explicit_data_num = other._explicit_data_num;
|
||||
memcpy(_explicit_data, other._explicit_data,
|
||||
sizeof(*_explicit_data) * _explicit_data_num);
|
||||
break;
|
||||
case HLL_DATA_SPARSE:
|
||||
case HLL_DATA_FULL:
|
||||
@ -100,15 +102,54 @@ void HyperLogLog::merge(const HyperLogLog& other) {
|
||||
}
|
||||
case HLL_DATA_EXPLICIT: {
|
||||
switch (other._type) {
|
||||
case HLL_DATA_EXPLICIT:
|
||||
case HLL_DATA_EXPLICIT: {
|
||||
// Merge other's explicit values first, then check if the number is exceed
|
||||
// HLL_EXPLICIT_INT64_NUM. This is OK because the max value is 2 * 160.
|
||||
_hash_set.insert(other._hash_set.begin(), other._hash_set.end());
|
||||
if (_hash_set.size() > HLL_EXPLICIT_INT64_NUM) {
|
||||
if (other._explicit_data_num > HLL_EXPLICIT_INT64_NUM / 2) { //merge
|
||||
uint64_t explicit_data[HLL_EXPLICIT_INT64_NUM * 2];
|
||||
memcpy(explicit_data, _explicit_data, sizeof(*_explicit_data) * _explicit_data_num);
|
||||
uint32_t explicit_data_num = _explicit_data_num;
|
||||
_explicit_data_num = 0;
|
||||
|
||||
// merge _explicit_data and other's _explicit_data to _explicit_data
|
||||
uint32_t i = 0, j = 0, k = 0;
|
||||
while (i < explicit_data_num || j < other._explicit_data_num) {
|
||||
if (i == explicit_data_num) {
|
||||
uint32_t n = other._explicit_data_num - j;
|
||||
memcpy(_explicit_data + k, other._explicit_data + j,
|
||||
n * sizeof(*_explicit_data));
|
||||
k += n;
|
||||
break;
|
||||
} else if (j == other._explicit_data_num) {
|
||||
uint32_t n = explicit_data_num - i;
|
||||
memcpy(_explicit_data + k, explicit_data + i, n * sizeof(*_explicit_data));
|
||||
k += n;
|
||||
break;
|
||||
} else {
|
||||
if (explicit_data[i] < other._explicit_data[j]) {
|
||||
_explicit_data[k++] = explicit_data[i++];
|
||||
} else if (explicit_data[i] > other._explicit_data[j]) {
|
||||
_explicit_data[k++] = other._explicit_data[j++];
|
||||
} else {
|
||||
_explicit_data[k++] = explicit_data[i++];
|
||||
j++;
|
||||
}
|
||||
}
|
||||
}
|
||||
_explicit_data_num = k;
|
||||
} else { //insert one by one
|
||||
int32_t n = other._explicit_data_num;
|
||||
const uint64_t* data = other._explicit_data;
|
||||
for (int32_t i = 0; i < n; ++i) {
|
||||
_explicit_data_insert(data[i]);
|
||||
}
|
||||
}
|
||||
|
||||
if (_explicit_data_num > HLL_EXPLICIT_INT64_NUM) {
|
||||
_convert_explicit_to_register();
|
||||
_type = HLL_DATA_FULL;
|
||||
}
|
||||
break;
|
||||
} break;
|
||||
case HLL_DATA_SPARSE:
|
||||
case HLL_DATA_FULL:
|
||||
_convert_explicit_to_register();
|
||||
@ -124,8 +165,8 @@ void HyperLogLog::merge(const HyperLogLog& other) {
|
||||
case HLL_DATA_FULL: {
|
||||
switch (other._type) {
|
||||
case HLL_DATA_EXPLICIT:
|
||||
for (auto hash_value : other._hash_set) {
|
||||
_update_registers(hash_value);
|
||||
for (int32_t i = 0; i < other._explicit_data_num; ++i) {
|
||||
_update_registers(other._explicit_data[i]);
|
||||
}
|
||||
break;
|
||||
case HLL_DATA_SPARSE:
|
||||
@ -146,7 +187,7 @@ size_t HyperLogLog::max_serialized_size() const {
|
||||
default:
|
||||
return 1;
|
||||
case HLL_DATA_EXPLICIT:
|
||||
return 2 + _hash_set.size() * 8;
|
||||
return 2 + _explicit_data_num * 8;
|
||||
case HLL_DATA_SPARSE:
|
||||
case HLL_DATA_FULL:
|
||||
return 1 + HLL_REGISTERS_COUNT;
|
||||
@ -155,34 +196,41 @@ size_t HyperLogLog::max_serialized_size() const {
|
||||
|
||||
size_t HyperLogLog::serialize(uint8_t* dst) const {
|
||||
uint8_t* ptr = dst;
|
||||
|
||||
switch (_type) {
|
||||
case HLL_DATA_EMPTY:
|
||||
default: {
|
||||
// When the _type is unknown, which may not happen, we encode it as
|
||||
// Empty HyperLogLog object.
|
||||
*ptr++ = HLL_DATA_EMPTY;
|
||||
|
||||
break;
|
||||
}
|
||||
case HLL_DATA_EXPLICIT: {
|
||||
DCHECK(_hash_set.size() <= HLL_EXPLICIT_INT64_NUM)
|
||||
<< "Number of explicit elements(" << _hash_set.size()
|
||||
DCHECK(_explicit_data_num < HLL_EXPLICIT_INT64_NUM)
|
||||
<< "Number of explicit elements(" << _explicit_data_num
|
||||
<< ") should be less or equal than " << HLL_EXPLICIT_INT64_NUM;
|
||||
*ptr++ = _type;
|
||||
*ptr++ = (uint8_t)_hash_set.size();
|
||||
for (auto hash_value : _hash_set) {
|
||||
encode_fixed64_le(ptr, hash_value);
|
||||
*ptr++ = (uint8_t)_explicit_data_num;
|
||||
|
||||
#if __BYTE_ORDER == __LITTLE_ENDIAN
|
||||
memcpy(ptr, _explicit_data, _explicit_data_num * sizeof(*_explicit_data));
|
||||
ptr += _explicit_data_num * sizeof(*_explicit_data);
|
||||
#else
|
||||
for (int32_t i = 0; i < _explicit_data_num; ++i) {
|
||||
*(uint64_t*)ptr = (uint64_t)gbswap_64(_explicit_data[i]);
|
||||
ptr += 8;
|
||||
}
|
||||
#endif
|
||||
break;
|
||||
}
|
||||
case HLL_DATA_SPARSE:
|
||||
case HLL_DATA_FULL: {
|
||||
uint32_t num_non_zero_registers = 0;
|
||||
for (int i = 0; i < HLL_REGISTERS_COUNT; i++) {
|
||||
if (_registers[i] != 0) {
|
||||
num_non_zero_registers++;
|
||||
}
|
||||
for (int i = 0; i < HLL_REGISTERS_COUNT; ++i) {
|
||||
num_non_zero_registers += (_registers[i] != 0);
|
||||
}
|
||||
|
||||
// each register in sparse format will occupy 3bytes, 2 for index and
|
||||
// 1 for register value. So if num_non_zero_registers is greater than
|
||||
// 4K we use full encode format.
|
||||
@ -196,15 +244,39 @@ size_t HyperLogLog::serialize(uint8_t* dst) const {
|
||||
encode_fixed32_le(ptr, num_non_zero_registers);
|
||||
ptr += 4;
|
||||
|
||||
for (uint32_t i = 0; i < HLL_REGISTERS_COUNT; ++i) {
|
||||
if (_registers[i] == 0) {
|
||||
for (uint32_t i = 0; i < HLL_REGISTERS_COUNT;) {
|
||||
if (*(uint32_t*)(&_registers[i]) == 0) {
|
||||
i += 4;
|
||||
continue;
|
||||
}
|
||||
// 2 bytes: register index
|
||||
// 1 byte: register value
|
||||
encode_fixed16_le(ptr, i);
|
||||
ptr += 2;
|
||||
*ptr++ = _registers[i];
|
||||
|
||||
if (UNLIKELY(_registers[i])) {
|
||||
encode_fixed16_le(ptr, i);
|
||||
ptr += 2; // 2 bytes: register index
|
||||
*ptr++ = _registers[i]; // 1 byte: register value
|
||||
}
|
||||
++i;
|
||||
|
||||
if (UNLIKELY(_registers[i])) {
|
||||
encode_fixed16_le(ptr, i);
|
||||
ptr += 2; // 2 bytes: register index
|
||||
*ptr++ = _registers[i]; // 1 byte: register value
|
||||
}
|
||||
++i;
|
||||
|
||||
if (UNLIKELY(_registers[i])) {
|
||||
encode_fixed16_le(ptr, i);
|
||||
ptr += 2; // 2 bytes: register index
|
||||
*ptr++ = _registers[i]; // 1 byte: register value
|
||||
}
|
||||
++i;
|
||||
|
||||
if (UNLIKELY(_registers[i])) {
|
||||
encode_fixed16_le(ptr, i);
|
||||
ptr += 2; // 2 bytes: register index
|
||||
*ptr++ = _registers[i]; // 1 byte: register value
|
||||
}
|
||||
++i;
|
||||
}
|
||||
}
|
||||
break;
|
||||
@ -280,22 +352,21 @@ bool HyperLogLog::deserialize(const Slice& slice) {
|
||||
uint8_t num_explicits = *ptr++;
|
||||
// 3+: 8 bytes hash value
|
||||
for (int i = 0; i < num_explicits; ++i) {
|
||||
_hash_set.insert(decode_fixed64_le(ptr));
|
||||
_explicit_data_insert(decode_fixed64_le(ptr));
|
||||
ptr += 8;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case HLL_DATA_SPARSE: {
|
||||
_registers = new uint8_t[HLL_REGISTERS_COUNT];
|
||||
memset(_registers, 0, HLL_REGISTERS_COUNT);
|
||||
|
||||
_registers = new uint8_t[HLL_REGISTERS_COUNT]();
|
||||
// 2-5(4 byte): number of registers
|
||||
uint32_t num_registers = decode_fixed32_le(ptr);
|
||||
uint16_t register_idx = 0;
|
||||
ptr += 4;
|
||||
for (uint32_t i = 0; i < num_registers; ++i) {
|
||||
// 2 bytes: register index
|
||||
// 1 byte: register value
|
||||
uint16_t register_idx = decode_fixed16_le(ptr);
|
||||
register_idx = decode_fixed16_le(ptr);
|
||||
ptr += 2;
|
||||
_registers[register_idx] = *ptr++;
|
||||
}
|
||||
@ -320,7 +391,7 @@ int64_t HyperLogLog::estimate_cardinality() const {
|
||||
return 0;
|
||||
}
|
||||
if (_type == HLL_DATA_EXPLICIT) {
|
||||
return _hash_set.size();
|
||||
return _explicit_data_num;
|
||||
}
|
||||
|
||||
const int num_streams = HLL_REGISTERS_COUNT;
|
||||
|
||||
@ -18,10 +18,9 @@
|
||||
#ifndef DORIS_BE_SRC_OLAP_HLL_H
|
||||
#define DORIS_BE_SRC_OLAP_HLL_H
|
||||
|
||||
#include <map>
|
||||
#include <math.h>
|
||||
#include <stdio.h>
|
||||
|
||||
#include <map>
|
||||
#include <set>
|
||||
#include <string>
|
||||
|
||||
@ -65,7 +64,7 @@ const static int HLL_EMPTY_SIZE = 1;
|
||||
// (1 + 4 + 3 * 4096) = 12293.
|
||||
//
|
||||
// HLL_DATA_FULL: most space-consuming, store all registers
|
||||
//
|
||||
//
|
||||
// A HLL value will change in the sequence empty -> explicit -> sparse -> full, and not
|
||||
// allow reverse.
|
||||
//
|
||||
@ -80,9 +79,11 @@ enum HllDataType {
|
||||
|
||||
class HyperLogLog {
|
||||
public:
|
||||
|
||||
HyperLogLog() = default;
|
||||
explicit HyperLogLog(uint64_t hash_value) : _type(HLL_DATA_EXPLICIT) {
|
||||
_hash_set.emplace(hash_value);
|
||||
explicit HyperLogLog(uint64_t hash_value): _type(HLL_DATA_EXPLICIT) {
|
||||
_explicit_data[0] = hash_value;
|
||||
_explicit_data_num = 1;
|
||||
}
|
||||
|
||||
explicit HyperLogLog(const Slice& src);
|
||||
@ -124,7 +125,7 @@ public:
|
||||
|
||||
// Check if input slice is a valid serialized binary of HyperLogLog.
|
||||
// This function only check the encoded type in slice, whose complex
|
||||
// function is O(1).
|
||||
// function is O(1).
|
||||
static bool is_valid(const Slice& slice);
|
||||
|
||||
// only for debug
|
||||
@ -134,15 +135,16 @@ public:
|
||||
return {};
|
||||
case HLL_DATA_EXPLICIT:
|
||||
case HLL_DATA_SPARSE:
|
||||
case HLL_DATA_FULL: {
|
||||
std::string str{"hash set size: "};
|
||||
str.append(std::to_string(_hash_set.size()));
|
||||
str.append("\ncardinality:\t");
|
||||
str.append(std::to_string(estimate_cardinality()));
|
||||
str.append("\ntype:\t");
|
||||
str.append(std::to_string(_type));
|
||||
return str;
|
||||
}
|
||||
case HLL_DATA_FULL:
|
||||
{
|
||||
std::string str {"hash set size: "};
|
||||
str.append(std::to_string((size_t)_explicit_data_num));
|
||||
str.append("\ncardinality:\t");
|
||||
str.append(std::to_string(estimate_cardinality()));
|
||||
str.append("\ntype:\t");
|
||||
str.append(std::to_string(_type));
|
||||
return str;
|
||||
}
|
||||
default:
|
||||
return {};
|
||||
}
|
||||
@ -150,7 +152,9 @@ public:
|
||||
|
||||
private:
|
||||
HllDataType _type = HLL_DATA_EMPTY;
|
||||
std::set<uint64_t> _hash_set;
|
||||
|
||||
uint32_t _explicit_data_num = 0;
|
||||
uint64_t _explicit_data[HLL_EXPLICIT_INT64_NUM * 2];
|
||||
|
||||
// This field is much space consuming(HLL_REGISTERS_COUNT), we create
|
||||
// it only when it is really needed.
|
||||
@ -162,7 +166,7 @@ private:
|
||||
void _convert_explicit_to_register();
|
||||
|
||||
// update one hash value into this registers
|
||||
void _update_registers(uint64_t hash_value) {
|
||||
inline void _update_registers(uint64_t hash_value) {
|
||||
// Use the lower bits to index into the number of streams and then
|
||||
// find the first 1 bit after the index bits.
|
||||
int idx = hash_value % HLL_REGISTERS_COUNT;
|
||||
@ -170,27 +174,52 @@ private:
|
||||
// make sure max first_one_bit is HLL_ZERO_COUNT_BITS + 1
|
||||
hash_value |= ((uint64_t)1 << HLL_ZERO_COUNT_BITS);
|
||||
uint8_t first_one_bit = __builtin_ctzl(hash_value) + 1;
|
||||
_registers[idx] = std::max((uint8_t)_registers[idx], first_one_bit);
|
||||
_registers[idx] = _registers[idx] > first_one_bit ? _registers[idx] : first_one_bit;
|
||||
}
|
||||
|
||||
// absorb other registers into this registers
|
||||
void _merge_registers(const uint8_t* other_registers) {
|
||||
void _merge_registers(const uint8_t* other) {
|
||||
for (int i = 0; i < HLL_REGISTERS_COUNT; ++i) {
|
||||
_registers[i] = std::max(_registers[i], other_registers[i]);
|
||||
_registers[i] = _registers[i] < other[i] ? other[i] : _registers[i];
|
||||
}
|
||||
}
|
||||
|
||||
bool _explicit_data_insert(uint64_t data) {
|
||||
//find insert pos
|
||||
int32_t i = (int32_t)_explicit_data_num - 1;
|
||||
while (i >= 0) {
|
||||
if (_explicit_data[i] == data) {
|
||||
return false;
|
||||
} else if (_explicit_data[i] < data) {
|
||||
break;
|
||||
} else {
|
||||
--i;
|
||||
}
|
||||
}
|
||||
|
||||
++i; //now, i is the insert position
|
||||
|
||||
size_t n = (_explicit_data_num - i) * sizeof(*_explicit_data);
|
||||
if (n) {
|
||||
memmove(_explicit_data+i+1, _explicit_data+i, n);
|
||||
}
|
||||
|
||||
//insert data
|
||||
_explicit_data[i] = data;
|
||||
_explicit_data_num++;
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
// todo(kks): remove this when dpp_sink class was removed
|
||||
class HllSetResolver {
|
||||
public:
|
||||
HllSetResolver()
|
||||
: _buf_ref(nullptr),
|
||||
_buf_len(0),
|
||||
_set_type(HLL_DATA_EMPTY),
|
||||
_full_value_position(nullptr),
|
||||
_explicit_value(nullptr),
|
||||
_explicit_num(0) {}
|
||||
HllSetResolver() : _buf_ref(nullptr),
|
||||
_buf_len(0),
|
||||
_set_type(HLL_DATA_EMPTY),
|
||||
_full_value_position(nullptr),
|
||||
_explicit_value(nullptr),
|
||||
_explicit_num(0) {}
|
||||
|
||||
~HllSetResolver() {}
|
||||
|
||||
@ -201,16 +230,16 @@ public:
|
||||
typedef uint8_t SparseValueType;
|
||||
|
||||
// only save pointer
|
||||
void init(char* buf, int len) {
|
||||
void init(char* buf, int len){
|
||||
this->_buf_ref = buf;
|
||||
this->_buf_len = len;
|
||||
}
|
||||
|
||||
// hll set type
|
||||
HllDataType get_hll_data_type() { return _set_type; };
|
||||
HllDataType get_hll_data_type() { return _set_type; }
|
||||
|
||||
// explicit value num
|
||||
int get_explicit_count() { return (int)_explicit_num; };
|
||||
int get_explicit_count() { return (int)_explicit_num; }
|
||||
|
||||
// get explicit index value 64bit
|
||||
uint64_t get_explicit_value(int index) {
|
||||
@ -218,21 +247,20 @@ public:
|
||||
return -1;
|
||||
}
|
||||
return _explicit_value[index];
|
||||
};
|
||||
}
|
||||
|
||||
// get full register value
|
||||
char* get_full_value() { return _full_value_position; };
|
||||
char* get_full_value() { return _full_value_position; }
|
||||
|
||||
// get (index, value) map
|
||||
std::map<SparseIndexType, SparseValueType>& get_sparse_map() { return _sparse_map; };
|
||||
std::map<SparseIndexType, SparseValueType>& get_sparse_map() { return _sparse_map; }
|
||||
|
||||
// parse set , call after copy() or init()
|
||||
void parse();
|
||||
|
||||
private:
|
||||
char* _buf_ref; // set
|
||||
int _buf_len; // set len
|
||||
HllDataType _set_type; //set type
|
||||
private :
|
||||
char* _buf_ref; // set
|
||||
int _buf_len; // set len
|
||||
HllDataType _set_type; //set type
|
||||
char* _full_value_position;
|
||||
uint64_t* _explicit_value;
|
||||
ExplicitLengthValueType _explicit_num;
|
||||
@ -249,6 +277,6 @@ public:
|
||||
const int set_len, int& len);
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
} // namespace doris
|
||||
|
||||
#endif // DORIS_BE_SRC_OLAP_HLL_H
|
||||
|
||||
Reference in New Issue
Block a user