diff --git a/be/src/exprs/aggregate_functions.cpp b/be/src/exprs/aggregate_functions.cpp index 9d1d140df8..09295a07bd 100644 --- a/be/src/exprs/aggregate_functions.cpp +++ b/be/src/exprs/aggregate_functions.cpp @@ -1112,7 +1112,7 @@ void AggregateFunctions::hll_update(FunctionContext* ctx, const T& src, StringVa if (hash_value != 0) { int idx = hash_value % dst->len; uint8_t first_one_bit = __builtin_ctzl(hash_value >> HLL_COLUMN_PRECISION) + 1; - dst->ptr[idx] = std::max(dst->ptr[idx], first_one_bit); + dst->ptr[idx] = (dst->ptr[idx] < first_one_bit ? first_one_bit : dst->ptr[idx]); } } @@ -1122,8 +1122,10 @@ void AggregateFunctions::hll_merge(FunctionContext* ctx, const StringVal& src, S DCHECK_EQ(dst->len, std::pow(2, HLL_COLUMN_PRECISION)); DCHECK_EQ(src.len, std::pow(2, HLL_COLUMN_PRECISION)); + auto dp = dst->ptr; + auto sp = src.ptr; for (int i = 0; i < src.len; ++i) { - dst->ptr[i] = std::max(dst->ptr[i], src.ptr[i]); + dp[i] = (dp[i] < sp[i] ? sp[i] : dp[i]); } } diff --git a/be/src/olap/hll.cpp b/be/src/olap/hll.cpp index 885373a6f0..e1293f251a 100644 --- a/be/src/olap/hll.cpp +++ b/be/src/olap/hll.cpp @@ -15,12 +15,11 @@ // specific language governing permissions and limitations // under the License. -#include "olap/hll.h" - #include #include #include "common/logging.h" +#include "olap/hll.h" #include "runtime/string_value.h" #include "util/coding.h" @@ -43,13 +42,13 @@ HyperLogLog::HyperLogLog(const Slice& src) { void HyperLogLog::_convert_explicit_to_register() { DCHECK(_type == HLL_DATA_EXPLICIT) << "_type(" << _type << ") should be explicit(" << HLL_DATA_EXPLICIT << ")"; - _registers = new uint8_t[HLL_REGISTERS_COUNT]; - memset(_registers, 0, HLL_REGISTERS_COUNT); - for (auto value : _hash_set) { - _update_registers(value); + _registers = new uint8_t[HLL_REGISTERS_COUNT](); + + for (uint32_t i = 0; i < _explicit_data_num; ++i) { + _update_registers(_explicit_data[i]); } - // clear _hash_set - std::set().swap(_hash_set); + + _explicit_data_num = 0; } // Change HLL_DATA_EXPLICIT to HLL_DATA_FULL directly, because HLL_DATA_SPARSE @@ -57,12 +56,13 @@ void HyperLogLog::_convert_explicit_to_register() { void HyperLogLog::update(uint64_t hash_value) { switch (_type) { case HLL_DATA_EMPTY: - _hash_set.insert(hash_value); + _explicit_data[0] = hash_value; + _explicit_data_num = 1; _type = HLL_DATA_EXPLICIT; break; case HLL_DATA_EXPLICIT: - if (_hash_set.size() < HLL_EXPLICIT_INT64_NUM) { - _hash_set.insert(hash_value); + if (_explicit_data_num < HLL_EXPLICIT_INT64_NUM) { + _explicit_data_insert(hash_value); break; } _convert_explicit_to_register(); @@ -86,7 +86,9 @@ void HyperLogLog::merge(const HyperLogLog& other) { _type = other._type; switch (other._type) { case HLL_DATA_EXPLICIT: - _hash_set = other._hash_set; + _explicit_data_num = other._explicit_data_num; + memcpy(_explicit_data, other._explicit_data, + sizeof(*_explicit_data) * _explicit_data_num); break; case HLL_DATA_SPARSE: case HLL_DATA_FULL: @@ -100,15 +102,54 @@ void HyperLogLog::merge(const HyperLogLog& other) { } case HLL_DATA_EXPLICIT: { switch (other._type) { - case HLL_DATA_EXPLICIT: + case HLL_DATA_EXPLICIT: { // Merge other's explicit values first, then check if the number is exceed // HLL_EXPLICIT_INT64_NUM. This is OK because the max value is 2 * 160. - _hash_set.insert(other._hash_set.begin(), other._hash_set.end()); - if (_hash_set.size() > HLL_EXPLICIT_INT64_NUM) { + if (other._explicit_data_num > HLL_EXPLICIT_INT64_NUM / 2) { //merge + uint64_t explicit_data[HLL_EXPLICIT_INT64_NUM * 2]; + memcpy(explicit_data, _explicit_data, sizeof(*_explicit_data) * _explicit_data_num); + uint32_t explicit_data_num = _explicit_data_num; + _explicit_data_num = 0; + + // merge _explicit_data and other's _explicit_data to _explicit_data + uint32_t i = 0, j = 0, k = 0; + while (i < explicit_data_num || j < other._explicit_data_num) { + if (i == explicit_data_num) { + uint32_t n = other._explicit_data_num - j; + memcpy(_explicit_data + k, other._explicit_data + j, + n * sizeof(*_explicit_data)); + k += n; + break; + } else if (j == other._explicit_data_num) { + uint32_t n = explicit_data_num - i; + memcpy(_explicit_data + k, explicit_data + i, n * sizeof(*_explicit_data)); + k += n; + break; + } else { + if (explicit_data[i] < other._explicit_data[j]) { + _explicit_data[k++] = explicit_data[i++]; + } else if (explicit_data[i] > other._explicit_data[j]) { + _explicit_data[k++] = other._explicit_data[j++]; + } else { + _explicit_data[k++] = explicit_data[i++]; + j++; + } + } + } + _explicit_data_num = k; + } else { //insert one by one + int32_t n = other._explicit_data_num; + const uint64_t* data = other._explicit_data; + for (int32_t i = 0; i < n; ++i) { + _explicit_data_insert(data[i]); + } + } + + if (_explicit_data_num > HLL_EXPLICIT_INT64_NUM) { _convert_explicit_to_register(); _type = HLL_DATA_FULL; } - break; + } break; case HLL_DATA_SPARSE: case HLL_DATA_FULL: _convert_explicit_to_register(); @@ -124,8 +165,8 @@ void HyperLogLog::merge(const HyperLogLog& other) { case HLL_DATA_FULL: { switch (other._type) { case HLL_DATA_EXPLICIT: - for (auto hash_value : other._hash_set) { - _update_registers(hash_value); + for (int32_t i = 0; i < other._explicit_data_num; ++i) { + _update_registers(other._explicit_data[i]); } break; case HLL_DATA_SPARSE: @@ -146,7 +187,7 @@ size_t HyperLogLog::max_serialized_size() const { default: return 1; case HLL_DATA_EXPLICIT: - return 2 + _hash_set.size() * 8; + return 2 + _explicit_data_num * 8; case HLL_DATA_SPARSE: case HLL_DATA_FULL: return 1 + HLL_REGISTERS_COUNT; @@ -155,34 +196,41 @@ size_t HyperLogLog::max_serialized_size() const { size_t HyperLogLog::serialize(uint8_t* dst) const { uint8_t* ptr = dst; + switch (_type) { case HLL_DATA_EMPTY: default: { // When the _type is unknown, which may not happen, we encode it as // Empty HyperLogLog object. *ptr++ = HLL_DATA_EMPTY; + break; } case HLL_DATA_EXPLICIT: { - DCHECK(_hash_set.size() <= HLL_EXPLICIT_INT64_NUM) - << "Number of explicit elements(" << _hash_set.size() + DCHECK(_explicit_data_num < HLL_EXPLICIT_INT64_NUM) + << "Number of explicit elements(" << _explicit_data_num << ") should be less or equal than " << HLL_EXPLICIT_INT64_NUM; *ptr++ = _type; - *ptr++ = (uint8_t)_hash_set.size(); - for (auto hash_value : _hash_set) { - encode_fixed64_le(ptr, hash_value); + *ptr++ = (uint8_t)_explicit_data_num; + +#if __BYTE_ORDER == __LITTLE_ENDIAN + memcpy(ptr, _explicit_data, _explicit_data_num * sizeof(*_explicit_data)); + ptr += _explicit_data_num * sizeof(*_explicit_data); +#else + for (int32_t i = 0; i < _explicit_data_num; ++i) { + *(uint64_t*)ptr = (uint64_t)gbswap_64(_explicit_data[i]); ptr += 8; } +#endif break; } case HLL_DATA_SPARSE: case HLL_DATA_FULL: { uint32_t num_non_zero_registers = 0; - for (int i = 0; i < HLL_REGISTERS_COUNT; i++) { - if (_registers[i] != 0) { - num_non_zero_registers++; - } + for (int i = 0; i < HLL_REGISTERS_COUNT; ++i) { + num_non_zero_registers += (_registers[i] != 0); } + // each register in sparse format will occupy 3bytes, 2 for index and // 1 for register value. So if num_non_zero_registers is greater than // 4K we use full encode format. @@ -196,15 +244,39 @@ size_t HyperLogLog::serialize(uint8_t* dst) const { encode_fixed32_le(ptr, num_non_zero_registers); ptr += 4; - for (uint32_t i = 0; i < HLL_REGISTERS_COUNT; ++i) { - if (_registers[i] == 0) { + for (uint32_t i = 0; i < HLL_REGISTERS_COUNT;) { + if (*(uint32_t*)(&_registers[i]) == 0) { + i += 4; continue; } - // 2 bytes: register index - // 1 byte: register value - encode_fixed16_le(ptr, i); - ptr += 2; - *ptr++ = _registers[i]; + + if (UNLIKELY(_registers[i])) { + encode_fixed16_le(ptr, i); + ptr += 2; // 2 bytes: register index + *ptr++ = _registers[i]; // 1 byte: register value + } + ++i; + + if (UNLIKELY(_registers[i])) { + encode_fixed16_le(ptr, i); + ptr += 2; // 2 bytes: register index + *ptr++ = _registers[i]; // 1 byte: register value + } + ++i; + + if (UNLIKELY(_registers[i])) { + encode_fixed16_le(ptr, i); + ptr += 2; // 2 bytes: register index + *ptr++ = _registers[i]; // 1 byte: register value + } + ++i; + + if (UNLIKELY(_registers[i])) { + encode_fixed16_le(ptr, i); + ptr += 2; // 2 bytes: register index + *ptr++ = _registers[i]; // 1 byte: register value + } + ++i; } } break; @@ -280,22 +352,21 @@ bool HyperLogLog::deserialize(const Slice& slice) { uint8_t num_explicits = *ptr++; // 3+: 8 bytes hash value for (int i = 0; i < num_explicits; ++i) { - _hash_set.insert(decode_fixed64_le(ptr)); + _explicit_data_insert(decode_fixed64_le(ptr)); ptr += 8; } break; } case HLL_DATA_SPARSE: { - _registers = new uint8_t[HLL_REGISTERS_COUNT]; - memset(_registers, 0, HLL_REGISTERS_COUNT); - + _registers = new uint8_t[HLL_REGISTERS_COUNT](); // 2-5(4 byte): number of registers uint32_t num_registers = decode_fixed32_le(ptr); + uint16_t register_idx = 0; ptr += 4; for (uint32_t i = 0; i < num_registers; ++i) { // 2 bytes: register index // 1 byte: register value - uint16_t register_idx = decode_fixed16_le(ptr); + register_idx = decode_fixed16_le(ptr); ptr += 2; _registers[register_idx] = *ptr++; } @@ -320,7 +391,7 @@ int64_t HyperLogLog::estimate_cardinality() const { return 0; } if (_type == HLL_DATA_EXPLICIT) { - return _hash_set.size(); + return _explicit_data_num; } const int num_streams = HLL_REGISTERS_COUNT; diff --git a/be/src/olap/hll.h b/be/src/olap/hll.h index 850eb71423..78b14ec5db 100644 --- a/be/src/olap/hll.h +++ b/be/src/olap/hll.h @@ -18,10 +18,9 @@ #ifndef DORIS_BE_SRC_OLAP_HLL_H #define DORIS_BE_SRC_OLAP_HLL_H +#include #include #include - -#include #include #include @@ -65,7 +64,7 @@ const static int HLL_EMPTY_SIZE = 1; // (1 + 4 + 3 * 4096) = 12293. // // HLL_DATA_FULL: most space-consuming, store all registers -// +// // A HLL value will change in the sequence empty -> explicit -> sparse -> full, and not // allow reverse. // @@ -80,9 +79,11 @@ enum HllDataType { class HyperLogLog { public: + HyperLogLog() = default; - explicit HyperLogLog(uint64_t hash_value) : _type(HLL_DATA_EXPLICIT) { - _hash_set.emplace(hash_value); + explicit HyperLogLog(uint64_t hash_value): _type(HLL_DATA_EXPLICIT) { + _explicit_data[0] = hash_value; + _explicit_data_num = 1; } explicit HyperLogLog(const Slice& src); @@ -124,7 +125,7 @@ public: // Check if input slice is a valid serialized binary of HyperLogLog. // This function only check the encoded type in slice, whose complex - // function is O(1). + // function is O(1). static bool is_valid(const Slice& slice); // only for debug @@ -134,15 +135,16 @@ public: return {}; case HLL_DATA_EXPLICIT: case HLL_DATA_SPARSE: - case HLL_DATA_FULL: { - std::string str{"hash set size: "}; - str.append(std::to_string(_hash_set.size())); - str.append("\ncardinality:\t"); - str.append(std::to_string(estimate_cardinality())); - str.append("\ntype:\t"); - str.append(std::to_string(_type)); - return str; - } + case HLL_DATA_FULL: + { + std::string str {"hash set size: "}; + str.append(std::to_string((size_t)_explicit_data_num)); + str.append("\ncardinality:\t"); + str.append(std::to_string(estimate_cardinality())); + str.append("\ntype:\t"); + str.append(std::to_string(_type)); + return str; + } default: return {}; } @@ -150,7 +152,9 @@ public: private: HllDataType _type = HLL_DATA_EMPTY; - std::set _hash_set; + + uint32_t _explicit_data_num = 0; + uint64_t _explicit_data[HLL_EXPLICIT_INT64_NUM * 2]; // This field is much space consuming(HLL_REGISTERS_COUNT), we create // it only when it is really needed. @@ -162,7 +166,7 @@ private: void _convert_explicit_to_register(); // update one hash value into this registers - void _update_registers(uint64_t hash_value) { + inline void _update_registers(uint64_t hash_value) { // Use the lower bits to index into the number of streams and then // find the first 1 bit after the index bits. int idx = hash_value % HLL_REGISTERS_COUNT; @@ -170,27 +174,52 @@ private: // make sure max first_one_bit is HLL_ZERO_COUNT_BITS + 1 hash_value |= ((uint64_t)1 << HLL_ZERO_COUNT_BITS); uint8_t first_one_bit = __builtin_ctzl(hash_value) + 1; - _registers[idx] = std::max((uint8_t)_registers[idx], first_one_bit); + _registers[idx] = _registers[idx] > first_one_bit ? _registers[idx] : first_one_bit; } // absorb other registers into this registers - void _merge_registers(const uint8_t* other_registers) { + void _merge_registers(const uint8_t* other) { for (int i = 0; i < HLL_REGISTERS_COUNT; ++i) { - _registers[i] = std::max(_registers[i], other_registers[i]); + _registers[i] = _registers[i] < other[i] ? other[i] : _registers[i]; } } + + bool _explicit_data_insert(uint64_t data) { + //find insert pos + int32_t i = (int32_t)_explicit_data_num - 1; + while (i >= 0) { + if (_explicit_data[i] == data) { + return false; + } else if (_explicit_data[i] < data) { + break; + } else { + --i; + } + } + + ++i; //now, i is the insert position + + size_t n = (_explicit_data_num - i) * sizeof(*_explicit_data); + if (n) { + memmove(_explicit_data+i+1, _explicit_data+i, n); + } + + //insert data + _explicit_data[i] = data; + _explicit_data_num++; + return true; + } }; // todo(kks): remove this when dpp_sink class was removed class HllSetResolver { public: - HllSetResolver() - : _buf_ref(nullptr), - _buf_len(0), - _set_type(HLL_DATA_EMPTY), - _full_value_position(nullptr), - _explicit_value(nullptr), - _explicit_num(0) {} + HllSetResolver() : _buf_ref(nullptr), + _buf_len(0), + _set_type(HLL_DATA_EMPTY), + _full_value_position(nullptr), + _explicit_value(nullptr), + _explicit_num(0) {} ~HllSetResolver() {} @@ -201,16 +230,16 @@ public: typedef uint8_t SparseValueType; // only save pointer - void init(char* buf, int len) { + void init(char* buf, int len){ this->_buf_ref = buf; this->_buf_len = len; } // hll set type - HllDataType get_hll_data_type() { return _set_type; }; + HllDataType get_hll_data_type() { return _set_type; } // explicit value num - int get_explicit_count() { return (int)_explicit_num; }; + int get_explicit_count() { return (int)_explicit_num; } // get explicit index value 64bit uint64_t get_explicit_value(int index) { @@ -218,21 +247,20 @@ public: return -1; } return _explicit_value[index]; - }; + } // get full register value - char* get_full_value() { return _full_value_position; }; + char* get_full_value() { return _full_value_position; } // get (index, value) map - std::map& get_sparse_map() { return _sparse_map; }; + std::map& get_sparse_map() { return _sparse_map; } // parse set , call after copy() or init() void parse(); - -private: - char* _buf_ref; // set - int _buf_len; // set len - HllDataType _set_type; //set type +private : + char* _buf_ref; // set + int _buf_len; // set len + HllDataType _set_type; //set type char* _full_value_position; uint64_t* _explicit_value; ExplicitLengthValueType _explicit_num; @@ -249,6 +277,6 @@ public: const int set_len, int& len); }; -} // namespace doris +} // namespace doris #endif // DORIS_BE_SRC_OLAP_HLL_H