diff --git a/be/src/exprs/aggregate_functions.cpp b/be/src/exprs/aggregate_functions.cpp index 09d9b1b68f..9e442b3081 100644 --- a/be/src/exprs/aggregate_functions.cpp +++ b/be/src/exprs/aggregate_functions.cpp @@ -1108,11 +1108,9 @@ void AggregateFunctions::hll_merge(FunctionContext* ctx, const StringVal& src, S DCHECK(!src.is_null); DCHECK_EQ(dst->len, std::pow(2, HLL_COLUMN_PRECISION)); DCHECK_EQ(src.len, std::pow(2, HLL_COLUMN_PRECISION)); - - auto dp = dst->ptr; - auto sp = src.ptr; + for (int i = 0; i < src.len; ++i) { - dp[i] = (dp[i] < sp[i] ? sp[i] : dp[i]); + dst->ptr[i] = (dst->ptr[i] < src.ptr[i] ? src.ptr[i] : dst->ptr[i]); } } diff --git a/be/src/olap/hll.cpp b/be/src/olap/hll.cpp index 12fb912b3a..a2ec6e39a0 100644 --- a/be/src/olap/hll.cpp +++ b/be/src/olap/hll.cpp @@ -43,15 +43,13 @@ HyperLogLog::HyperLogLog(const Slice& src) { void HyperLogLog::_convert_explicit_to_register() { DCHECK(_type == HLL_DATA_EXPLICIT) << "_type(" << _type << ") should be explicit(" << HLL_DATA_EXPLICIT << ")"; - _registers = new uint8_t[HLL_REGISTERS_COUNT](); - - for (uint32_t i = 0; i < _explicit_data_num; ++i) { - _update_registers(_explicit_data[i]); + _registers = new uint8_t[HLL_REGISTERS_COUNT]; + memset(_registers, 0, HLL_REGISTERS_COUNT); + for (auto value : _hash_set) { + _update_registers(value); } - - delete [] _explicit_data; - _explicit_data = nullptr; - _explicit_data_num = 0; + // clear _hash_set + phmap::flat_hash_set().swap(_hash_set); } // Change HLL_DATA_EXPLICIT to HLL_DATA_FULL directly, because HLL_DATA_SPARSE @@ -59,14 +57,12 @@ void HyperLogLog::_convert_explicit_to_register() { void HyperLogLog::update(uint64_t hash_value) { switch (_type) { case HLL_DATA_EMPTY: - _explicit_data = new uint64_t[HLL_EXPLICIT_INT64_NUM_DOUBLE]; - _explicit_data[0] = hash_value; - _explicit_data_num = 1; + _hash_set.insert(hash_value); _type = HLL_DATA_EXPLICIT; break; case HLL_DATA_EXPLICIT: - if (_explicit_data_num < HLL_EXPLICIT_INT64_NUM) { - _explicit_data_insert(hash_value); + if (_hash_set.size() < HLL_EXPLICIT_INT64_NUM) { + _hash_set.insert(hash_value); break; } _convert_explicit_to_register(); @@ -90,10 +86,7 @@ void HyperLogLog::merge(const HyperLogLog& other) { _type = other._type; switch (other._type) { case HLL_DATA_EXPLICIT: - _explicit_data_num = other._explicit_data_num; - _explicit_data = new uint64_t[HLL_EXPLICIT_INT64_NUM_DOUBLE]; - memcpy(_explicit_data, other._explicit_data, - sizeof(*_explicit_data) * _explicit_data_num); + _hash_set = other._hash_set; break; case HLL_DATA_SPARSE: case HLL_DATA_FULL: @@ -110,47 +103,8 @@ void HyperLogLog::merge(const HyperLogLog& other) { case HLL_DATA_EXPLICIT: { // Merge other's explicit values first, then check if the number is exceed // HLL_EXPLICIT_INT64_NUM. This is OK because the max value is 2 * 160. - if (other._explicit_data_num > HLL_EXPLICIT_INT64_NUM / 2) { //merge - uint64_t explicit_data[HLL_EXPLICIT_INT64_NUM * 2]; - memcpy(explicit_data, _explicit_data, sizeof(*_explicit_data) * _explicit_data_num); - uint32_t explicit_data_num = _explicit_data_num; - _explicit_data_num = 0; - - // merge _explicit_data and other's _explicit_data to _explicit_data - uint32_t i = 0, j = 0, k = 0; - while (i < explicit_data_num || j < other._explicit_data_num) { - if (i == explicit_data_num) { - uint32_t n = other._explicit_data_num - j; - memcpy(_explicit_data + k, other._explicit_data + j, - n * sizeof(*_explicit_data)); - k += n; - break; - } else if (j == other._explicit_data_num) { - uint32_t n = explicit_data_num - i; - memcpy(_explicit_data + k, explicit_data + i, n * sizeof(*_explicit_data)); - k += n; - break; - } else { - if (explicit_data[i] < other._explicit_data[j]) { - _explicit_data[k++] = explicit_data[i++]; - } else if (explicit_data[i] > other._explicit_data[j]) { - _explicit_data[k++] = other._explicit_data[j++]; - } else { - _explicit_data[k++] = explicit_data[i++]; - j++; - } - } - } - _explicit_data_num = k; - } else { //insert one by one - int32_t n = other._explicit_data_num; - const uint64_t* data = other._explicit_data; - for (int32_t i = 0; i < n; ++i) { - _explicit_data_insert(data[i]); - } - } - - if (_explicit_data_num > HLL_EXPLICIT_INT64_NUM) { + _hash_set.insert(other._hash_set.begin(), other._hash_set.end()); + if (_hash_set.size() > HLL_EXPLICIT_INT64_NUM) { _convert_explicit_to_register(); _type = HLL_DATA_FULL; } @@ -170,8 +124,8 @@ void HyperLogLog::merge(const HyperLogLog& other) { case HLL_DATA_FULL: { switch (other._type) { case HLL_DATA_EXPLICIT: - for (int32_t i = 0; i < other._explicit_data_num; ++i) { - _update_registers(other._explicit_data[i]); + for (auto hash_value : other._hash_set) { + _update_registers(hash_value); } break; case HLL_DATA_SPARSE: @@ -192,7 +146,7 @@ size_t HyperLogLog::max_serialized_size() const { default: return 1; case HLL_DATA_EXPLICIT: - return 2 + _explicit_data_num * 8; + return 2 + _hash_set.size() * 8; case HLL_DATA_SPARSE: case HLL_DATA_FULL: return 1 + HLL_REGISTERS_COUNT; @@ -201,32 +155,24 @@ size_t HyperLogLog::max_serialized_size() const { size_t HyperLogLog::serialize(uint8_t* dst) const { uint8_t* ptr = dst; - switch (_type) { case HLL_DATA_EMPTY: default: { // When the _type is unknown, which may not happen, we encode it as // Empty HyperLogLog object. *ptr++ = HLL_DATA_EMPTY; - break; } case HLL_DATA_EXPLICIT: { - DCHECK(_explicit_data_num < HLL_EXPLICIT_INT64_NUM) - << "Number of explicit elements(" << _explicit_data_num + DCHECK(_hash_set.size() <= HLL_EXPLICIT_INT64_NUM) + << "Number of explicit elements(" << _hash_set.size() << ") should be less or equal than " << HLL_EXPLICIT_INT64_NUM; *ptr++ = _type; - *ptr++ = (uint8_t)_explicit_data_num; - -#if __BYTE_ORDER == __LITTLE_ENDIAN - memcpy(ptr, _explicit_data, _explicit_data_num * sizeof(*_explicit_data)); - ptr += _explicit_data_num * sizeof(*_explicit_data); -#else - for (int32_t i = 0; i < _explicit_data_num; ++i) { - *(uint64_t*)ptr = (uint64_t)gbswap_64(_explicit_data[i]); + *ptr++ = (uint8_t)_hash_set.size(); + for (auto hash_value : _hash_set) { + encode_fixed64_le(ptr, hash_value); ptr += 8; } -#endif break; } case HLL_DATA_SPARSE: @@ -249,39 +195,15 @@ size_t HyperLogLog::serialize(uint8_t* dst) const { encode_fixed32_le(ptr, num_non_zero_registers); ptr += 4; - for (uint32_t i = 0; i < HLL_REGISTERS_COUNT;) { - if (*(uint32_t*)(&_registers[i]) == 0) { - i += 4; + for (uint32_t i = 0; i < HLL_REGISTERS_COUNT; ++i) { + if (_registers[i] == 0) { continue; } - - if (UNLIKELY(_registers[i])) { - encode_fixed16_le(ptr, i); - ptr += 2; // 2 bytes: register index - *ptr++ = _registers[i]; // 1 byte: register value - } - ++i; - - if (UNLIKELY(_registers[i])) { - encode_fixed16_le(ptr, i); - ptr += 2; // 2 bytes: register index - *ptr++ = _registers[i]; // 1 byte: register value - } - ++i; - - if (UNLIKELY(_registers[i])) { - encode_fixed16_le(ptr, i); - ptr += 2; // 2 bytes: register index - *ptr++ = _registers[i]; // 1 byte: register value - } - ++i; - - if (UNLIKELY(_registers[i])) { - encode_fixed16_le(ptr, i); - ptr += 2; // 2 bytes: register index - *ptr++ = _registers[i]; // 1 byte: register value - } - ++i; + // 2 bytes: register index + // 1 byte: register value + encode_fixed16_le(ptr, i); + ptr += 2; + *ptr++ = _registers[i]; } } break; @@ -355,24 +277,23 @@ bool HyperLogLog::deserialize(const Slice& slice) { // 2: number of explicit values // make sure that num_explicit is positive uint8_t num_explicits = *ptr++; - _explicit_data = new uint64_t[HLL_EXPLICIT_INT64_NUM_DOUBLE]; // 3+: 8 bytes hash value for (int i = 0; i < num_explicits; ++i) { - _explicit_data_insert(decode_fixed64_le(ptr)); + _hash_set.insert(decode_fixed64_le(ptr)); ptr += 8; } break; } case HLL_DATA_SPARSE: { - _registers = new uint8_t[HLL_REGISTERS_COUNT](); + _registers = new uint8_t[HLL_REGISTERS_COUNT]; + memset(_registers, 0, HLL_REGISTERS_COUNT); // 2-5(4 byte): number of registers uint32_t num_registers = decode_fixed32_le(ptr); - uint16_t register_idx = 0; ptr += 4; for (uint32_t i = 0; i < num_registers; ++i) { // 2 bytes: register index // 1 byte: register value - register_idx = decode_fixed16_le(ptr); + uint16_t register_idx = decode_fixed16_le(ptr); ptr += 2; _registers[register_idx] = *ptr++; } @@ -397,7 +318,7 @@ int64_t HyperLogLog::estimate_cardinality() const { return 0; } if (_type == HLL_DATA_EXPLICIT) { - return _explicit_data_num; + return _hash_set.size(); } const int num_streams = HLL_REGISTERS_COUNT; diff --git a/be/src/olap/hll.h b/be/src/olap/hll.h index 1ae56e2254..dc157f886e 100644 --- a/be/src/olap/hll.h +++ b/be/src/olap/hll.h @@ -24,6 +24,11 @@ #include #include #include +#include + +#ifdef __x86_64__ +#include +#endif #include "gutil/macros.h" @@ -34,7 +39,6 @@ struct Slice; const static int HLL_COLUMN_PRECISION = 14; const static int HLL_ZERO_COUNT_BITS = (64 - HLL_COLUMN_PRECISION); const static int HLL_EXPLICIT_INT64_NUM = 160; -const static int HLL_EXPLICIT_INT64_NUM_DOUBLE = HLL_EXPLICIT_INT64_NUM * 2; const static int HLL_SPARSE_THRESHOLD = 4096; const static int HLL_REGISTERS_COUNT = 16 * 1024; // maximum size in byte of serialized HLL: type(1) + registers (2^14) @@ -83,10 +87,9 @@ class HyperLogLog { public: HyperLogLog() = default; explicit HyperLogLog(uint64_t hash_value) : _type(HLL_DATA_EXPLICIT) { - _explicit_data = new uint64_t[HLL_EXPLICIT_INT64_NUM_DOUBLE]; - _explicit_data[0] = hash_value; - _explicit_data_num = 1; + _hash_set.emplace(hash_value); } + explicit HyperLogLog(const Slice& src); HyperLogLog(const HyperLogLog& other) { this->_type = other._type; @@ -94,10 +97,7 @@ public: case HLL_DATA_EMPTY: break; case HLL_DATA_EXPLICIT: { - this->_explicit_data_num = other._explicit_data_num; - _explicit_data = new uint64_t[HLL_EXPLICIT_INT64_NUM_DOUBLE]; - memcpy(_explicit_data, other._explicit_data, - sizeof(*_explicit_data) * _explicit_data_num); + this->_hash_set = other._hash_set; break; } case HLL_DATA_SPARSE: @@ -105,10 +105,10 @@ public: _registers = new uint8_t[HLL_REGISTERS_COUNT]; memcpy(_registers, other._registers, HLL_REGISTERS_COUNT); break; + } default: break; } - } } HyperLogLog(HyperLogLog&& other) { @@ -117,10 +117,7 @@ public: case HLL_DATA_EMPTY: break; case HLL_DATA_EXPLICIT: { - this->_explicit_data_num = other._explicit_data_num; - this->_explicit_data = other._explicit_data; - other._explicit_data_num = 0; - other._explicit_data = nullptr; + this->_hash_set = std::move(other._hash_set); other._type = HLL_DATA_EMPTY; break; } @@ -130,33 +127,25 @@ public: other._registers = nullptr; other._type = HLL_DATA_EMPTY; break; + } default: break; } - } } HyperLogLog& operator=(HyperLogLog&& other) { if (this != &other) { - if (_registers) { + if (_registers != nullptr) { delete[] _registers; _registers = nullptr; } - if (_explicit_data) { - delete[] _explicit_data; - _explicit_data = nullptr; - } - _explicit_data_num = 0; this->_type = other._type; switch (other._type) { case HLL_DATA_EMPTY: break; case HLL_DATA_EXPLICIT: { - this->_explicit_data_num = other._explicit_data_num; - this->_explicit_data = other._explicit_data; - other._explicit_data_num = 0; - other._explicit_data = nullptr; + this->_hash_set = std::move(other._hash_set); other._type = HLL_DATA_EMPTY; break; } @@ -166,35 +155,27 @@ public: other._registers = nullptr; other._type = HLL_DATA_EMPTY; break; + } default: break; } - } } return *this; } HyperLogLog& operator=(const HyperLogLog& other) { if (this != &other) { - if (_registers) { + if (_registers != nullptr) { delete[] _registers; _registers = nullptr; } - if (_explicit_data) { - delete[] _explicit_data; - _explicit_data = nullptr; - } - _explicit_data_num = 0; this->_type = other._type; switch (other._type) { case HLL_DATA_EMPTY: break; case HLL_DATA_EXPLICIT: { - this->_explicit_data_num = other._explicit_data_num; - _explicit_data = new uint64_t[HLL_EXPLICIT_INT64_NUM_DOUBLE]; - memcpy(_explicit_data, other._explicit_data, - sizeof(*_explicit_data) * _explicit_data_num); + this->_hash_set = other._hash_set; break; } case HLL_DATA_SPARSE: @@ -202,25 +183,20 @@ public: _registers = new uint8_t[HLL_REGISTERS_COUNT]; memcpy(_registers, other._registers, HLL_REGISTERS_COUNT); break; + } default: break; } - } } return *this; } - explicit HyperLogLog(const Slice& src); - ~HyperLogLog() { clear(); } - void clear() { _type = HLL_DATA_EMPTY; + _hash_set.clear(); delete[] _registers; _registers = nullptr; - delete[] _explicit_data; - _explicit_data = nullptr; - _explicit_data_num = 0; } typedef uint8_t SetTypeValueType; @@ -239,8 +215,10 @@ public: size_t memory_consumed() const { size_t size = sizeof(*this); - if (_explicit_data) size += HLL_EXPLICIT_INT64_NUM_DOUBLE; - if (_registers) size += HLL_REGISTERS_COUNT; + if (_type == HLL_DATA_EXPLICIT) + size += _hash_set.size() * sizeof(uint64_t); + else if (_type == HLL_DATA_SPARSE || _type == HLL_DATA_FULL) + size += HLL_REGISTERS_COUNT; return size; } @@ -277,7 +255,7 @@ public: case HLL_DATA_SPARSE: case HLL_DATA_FULL: { std::string str {"hash set size: "}; - str.append(std::to_string((size_t)_explicit_data_num)); + str.append(std::to_string(_hash_set.size())); str.append("\ncardinality:\t"); str.append(std::to_string(estimate_cardinality())); str.append("\ntype:\t"); @@ -291,9 +269,7 @@ public: private: HllDataType _type = HLL_DATA_EMPTY; - - uint32_t _explicit_data_num = 0; - uint64_t* _explicit_data = nullptr; + phmap::flat_hash_set _hash_set; // This field is much space consuming(HLL_REGISTERS_COUNT), we create // it only when it is really needed. @@ -312,40 +288,28 @@ private: // make sure max first_one_bit is HLL_ZERO_COUNT_BITS + 1 hash_value |= ((uint64_t)1 << HLL_ZERO_COUNT_BITS); uint8_t first_one_bit = __builtin_ctzl(hash_value) + 1; - _registers[idx] = _registers[idx] > first_one_bit ? _registers[idx] : first_one_bit; + _registers[idx] = (_registers[idx] < first_one_bit ? first_one_bit : _registers[idx]); } // absorb other registers into this registers - void _merge_registers(const uint8_t* other) { + void _merge_registers(const uint8_t* other_registers) { +#ifdef __AVX2__ + int loop = HLL_REGISTERS_COUNT / 32; // 32 = 256/8 + uint8_t* dst = _registers; + const uint8_t* src = other_registers; + for (int i = 0; i < loop; i++) { + __m256i xa = _mm256_loadu_si256((const __m256i*)dst); + __m256i xb = _mm256_loadu_si256((const __m256i*)src); + _mm256_storeu_si256((__m256i*)dst, _mm256_max_epu8(xa, xb)); + src += 32; + dst += 32; + } +#else for (int i = 0; i < HLL_REGISTERS_COUNT; ++i) { - _registers[i] = _registers[i] < other[i] ? other[i] : _registers[i]; + _registers[i] = + (_registers[i] < other_registers[i] ? other_registers[i] : _registers[i]); } - } - - bool _explicit_data_insert(uint64_t data) { - //find insert pos - int32_t i = (int32_t)_explicit_data_num - 1; - while (i >= 0) { - if (_explicit_data[i] == data) { - return false; - } else if (_explicit_data[i] < data) { - break; - } else { - --i; - } - } - - ++i; //now, i is the insert position - - size_t n = (_explicit_data_num - i) * sizeof(*_explicit_data); - if (n) { - memmove(_explicit_data + i + 1, _explicit_data + i, n); - } - - //insert data - _explicit_data[i] = data; - _explicit_data_num++; - return true; +#endif } };