From 93fe10a268ba64ec26377503313d85be2412712a Mon Sep 17 00:00:00 2001 From: ZHAO Chun Date: Sat, 21 Sep 2019 14:38:58 +0800 Subject: [PATCH] Reduce size of HyperLogLog struct (#1845) Now size of HyperLogLog struct is so large that it lead the rowset is too small when ingesting data. In this CL, registers in HyperLogLog are only created when it is needed. When ingesting data, it's normal case that there are only few values in one HyperLogLog. --- be/src/exec/exec_node.h | 1 + be/src/exprs/hll_function.cpp | 14 +- be/src/exprs/hll_hash_function.cpp | 9 +- be/src/olap/aggregate_func.h | 6 +- be/src/olap/hll.cpp | 310 +++++++++++++++++----------- be/src/olap/hll.h | 145 +++++++------ be/src/olap/olap_common.h | 9 - be/test/exprs/hll_function_test.cpp | 8 +- be/test/olap/CMakeLists.txt | 1 + be/test/olap/hll_test.cpp | 146 +++++++++++++ run-ut.sh | 1 + 11 files changed, 422 insertions(+), 228 deletions(-) create mode 100644 be/test/olap/hll_test.cpp diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index 647ee8b1f0..ace7935926 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -31,6 +31,7 @@ #include "runtime/bufferpool/buffer_pool.h" #include "runtime/query_statistics.h" #include "service/backend_options.h" +#include "util/uid_util.h" // for print_id namespace llvm { class Function; diff --git a/be/src/exprs/hll_function.cpp b/be/src/exprs/hll_function.cpp index d8c759bd57..5894d0c057 100644 --- a/be/src/exprs/hll_function.cpp +++ b/be/src/exprs/hll_function.cpp @@ -29,19 +29,17 @@ void HllFunctions::init() { } StringVal HllFunctions::hll_hash(FunctionContext* ctx, const StringVal& input) { - const int HLL_SINGLE_VALUE_SIZE = 10; - const int HLL_EMPTY_SIZE = 1; std::string buf; - std::unique_ptr hll; if (!input.is_null) { uint64_t hash_value = HashUtil::murmur_hash64A(input.ptr, input.len, HashUtil::MURMUR_SEED); - hll.reset(new HyperLogLog(hash_value)); + HyperLogLog hll(hash_value); buf.resize(HLL_SINGLE_VALUE_SIZE); + hll.serialize((uint8_t*)buf.c_str()); } else { - hll.reset(new HyperLogLog()); + HyperLogLog hll; buf.resize(HLL_EMPTY_SIZE); + hll.serialize((uint8_t*)buf.c_str()); } - hll->serialize((char*)buf.c_str()); return AnyValUtil::from_string_temp(ctx, buf); } @@ -64,7 +62,7 @@ void HllFunctions::hll_update(FunctionContext *, const T &src, StringVal* dst) { } } void HllFunctions::hll_merge(FunctionContext*, const StringVal &src, StringVal* dst) { - HyperLogLog src_hll = HyperLogLog((char*)src.ptr); + HyperLogLog src_hll((uint8_t*)src.ptr); auto* dst_hll = reinterpret_cast(dst->ptr); dst_hll->merge(src_hll); } @@ -89,7 +87,7 @@ BigIntVal HllFunctions::hll_cardinality(FunctionContext* ctx, const StringVal& i StringVal HllFunctions::hll_serialize(FunctionContext *ctx, const StringVal &src) { auto* src_hll = reinterpret_cast(src.ptr); StringVal result(ctx, HLL_COLUMN_DEFAULT_LEN); - int size = src_hll->serialize((char*)result.ptr); + int size = src_hll->serialize((uint8_t*)result.ptr); result.resize(ctx, size); delete src_hll; return result; diff --git a/be/src/exprs/hll_hash_function.cpp b/be/src/exprs/hll_hash_function.cpp index 1534f27f91..07d61dd30e 100644 --- a/be/src/exprs/hll_hash_function.cpp +++ b/be/src/exprs/hll_hash_function.cpp @@ -27,18 +27,17 @@ void HllHashFunctions::init() { } StringVal HllHashFunctions::hll_hash(FunctionContext* ctx, const StringVal& input) { - const int HLL_SINGLE_VALUE_SIZE = 10; - const int HLL_EMPTY_SIZE = 1; std::string buf; - std::unique_ptr hll {new HyperLogLog()}; if (!input.is_null) { uint64_t hash_value = HashUtil::murmur_hash64A(input.ptr, input.len, HashUtil::MURMUR_SEED); - hll.reset(new HyperLogLog(hash_value)); + HyperLogLog hll(hash_value); buf.resize(HLL_SINGLE_VALUE_SIZE); + hll.serialize((uint8_t*)buf.c_str()); } else { + HyperLogLog hll; buf.resize(HLL_EMPTY_SIZE); + hll.serialize((uint8_t*)buf.c_str()); } - hll->serialize((char*)buf.c_str()); return AnyValUtil::from_string_temp(ctx, buf); } diff --git a/be/src/olap/aggregate_func.h b/be/src/olap/aggregate_func.h index 25ffdd0390..8381dca094 100644 --- a/be/src/olap/aggregate_func.h +++ b/be/src/olap/aggregate_func.h @@ -408,7 +408,7 @@ struct AggregateFuncTraitssize = sizeof(HyperLogLog); // use 'placement new' to allocate HyperLogLog on arena, so that we can control the memory usage. char* mem = arena->Allocate(dst_slice->size); - dst_slice->data = (char*) new (mem) HyperLogLog(src_slice->data); + dst_slice->data = (char*) new (mem) HyperLogLog((const uint8_t*)src_slice->data); } static void update(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) { @@ -420,7 +420,7 @@ struct AggregateFuncTraitsdata); + HyperLogLog src_hll((const uint8_t*)src_slice->data); dst_hll->merge(src_hll); } else { // for stream load auto* src_hll = reinterpret_cast(src_slice->data); @@ -435,7 +435,7 @@ struct AggregateFuncTraits(slice->data); slice->data = arena->Allocate(HLL_COLUMN_DEFAULT_LEN); - slice->size = hll->serialize(slice->data); + slice->size = hll->serialize((uint8_t*)slice->data); // NOT using 'delete hll' because the memory is managed by arena hll->~HyperLogLog(); } diff --git a/be/src/olap/hll.cpp b/be/src/olap/hll.cpp index 192bbc02f8..2e5a904e9c 100644 --- a/be/src/olap/hll.cpp +++ b/be/src/olap/hll.cpp @@ -20,6 +20,9 @@ #include #include +#include "common/logging.h" +#include "util/coding.h" + using std::map; using std::nothrow; using std::string; @@ -27,165 +30,226 @@ using std::stringstream; namespace doris { -HyperLogLog::HyperLogLog(char* src) { - _type = (HllDataType)src[0]; - memset(_registers, 0, HLL_REGISTERS_COUNT); - char* sparse_data = nullptr; - switch (_type) { - case HLL_DATA_EXPLICIT: - // first byte : type - // second~five byte : hash values's number - // five byte later : hash value - { - auto _explicit_num = (uint8_t) (src[sizeof(SetTypeValueType)]); - auto *_explicit_value = (uint64_t *) (src + sizeof(SetTypeValueType) + sizeof(uint8_t)); - for (int i = 0; i < _explicit_num; ++i) { - _hash_set.insert(_explicit_value[i]); - } - } - break; - case HLL_DATA_SPRASE: - // first byte : type - // second ~(2^HLL_COLUMN_PRECISION)/8 byte : bitmap mark which is not zero - // 2^HLL_COLUMN_PRECISION)/8 + 1以后value - { - auto* _sparse_count = (SparseLengthValueType*)(src + sizeof (SetTypeValueType)); - sparse_data = src + sizeof(SetTypeValueType) + sizeof(SparseLengthValueType); - std::map _sparse_map; - for (int i = 0; i < *_sparse_count; i++) { - auto* index = (SparseIndexType*)sparse_data; - sparse_data += sizeof(SparseIndexType); - auto* value = (SparseValueType*)sparse_data; - _sparse_map[*index] = *value; - sparse_data += sizeof(SetTypeValueType); - } +// TODO(zc): we should check if src is valid, it maybe corrupted +HyperLogLog::HyperLogLog(const uint8_t* src) { + deserialize(src); +} - for (auto iter: _sparse_map) { - _registers[iter.first] = (uint8_t)iter.second; - } +// Convert explicit values to register format, and clear explicit values. +// NOTE: this function won't modify _type. +void HyperLogLog::_convert_explicit_to_register() { + DCHECK(_type == HLL_DATA_EXPLICIT) << "_type(" << _type << ") should be explicit(" + << HLL_DATA_EXPLICIT << ")"; + _registers = new uint8_t[HLL_REGISTERS_COUNT]; + memset(_registers, 0, HLL_REGISTERS_COUNT); + for (auto value : _hash_set) { + _update_registers(value); + } + // clear _hash_set + std::set().swap(_hash_set); +} + +// Change HLL_DATA_EXPLICIT to HLL_DATA_FULL directly, because HLL_DATA_SPRASE +// is implemented in the same way in memory with HLL_DATA_FULL. +void HyperLogLog::update(uint64_t hash_value) { + switch (_type) { + case HLL_DATA_EMPTY: + _hash_set.insert(hash_value); + _type = HLL_DATA_EXPLICIT; + break; + case HLL_DATA_EXPLICIT: + if (_hash_set.size() < HLL_EXPLICLIT_INT64_NUM) { + _hash_set.insert(hash_value); + break; } - break; - case HLL_DATA_FULL: - // first byte : type - // second byte later : hll register value - { - char* _full_value_position = src + sizeof (SetTypeValueType); - memcpy(_registers, _full_value_position, HLL_REGISTERS_COUNT); - } - break; - case HLL_DATA_EMPTY: - break; - default: - break; + _convert_explicit_to_register(); + _type = HLL_DATA_FULL; + // fall through + case HLL_DATA_SPRASE: + case HLL_DATA_FULL: + _update_registers(hash_value); + break; } } void HyperLogLog::merge(const HyperLogLog& other) { + // fast path if (other._type == HLL_DATA_EMPTY) { return; } - - // _type must change - if (_type == HLL_DATA_EMPTY) { + switch (_type) { + case HLL_DATA_EMPTY: { + // _type must change _type = other._type; switch (other._type) { - case HLL_DATA_EXPLICIT: - _hash_set = other._hash_set; - return; - case HLL_DATA_SPRASE: - case HLL_DATA_FULL: - memcpy(_registers, other._registers, HLL_REGISTERS_COUNT); - return; - default: - return; + case HLL_DATA_EXPLICIT: + _hash_set = other._hash_set; + break; + case HLL_DATA_SPRASE: + case HLL_DATA_FULL: + _registers = new uint8_t[HLL_REGISTERS_COUNT]; + memcpy(_registers, other._registers, HLL_REGISTERS_COUNT); + break; + default: + break; } + break; } - - // _type maybe change - if (_type == HLL_DATA_EXPLICIT) { + case HLL_DATA_EXPLICIT: { switch (other._type) { - case HLL_DATA_EXPLICIT: - _hash_set.insert(other._hash_set.begin(), other._hash_set.end()); - return; - case HLL_DATA_SPRASE: - case HLL_DATA_FULL: - memcpy(_registers, other._registers, HLL_REGISTERS_COUNT); - _type = other._type; - return; - default: - return; + case HLL_DATA_EXPLICIT: + // Merge other's explicit values first, then check if the number is exccede + // HLL_EXPLICLIT_INT64_NUM. This is OK because the max value is 2 * 160. + _hash_set.insert(other._hash_set.begin(), other._hash_set.end()); + if (_hash_set.size() > HLL_EXPLICLIT_INT64_NUM) { + _convert_explicit_to_register(); + _type = HLL_DATA_FULL; + } + break; + case HLL_DATA_SPRASE: + case HLL_DATA_FULL: + _convert_explicit_to_register(); + _merge_registers(other._registers); + _type = HLL_DATA_FULL; + break; + default: + break; } + break; } - - // _type maybe change - if (_type == HLL_DATA_SPRASE) { + case HLL_DATA_SPRASE: + case HLL_DATA_FULL: { switch (other._type) { - case HLL_DATA_EXPLICIT: - _hash_set.insert(other._hash_set.begin(), other._hash_set.end()); - return; - case HLL_DATA_SPRASE: - case HLL_DATA_FULL: - merge_registers(_registers, other._registers); - _type = other._type; - return; - default: - return; + case HLL_DATA_EXPLICIT: + for (auto hash_value : other._hash_set) { + _update_registers(hash_value); + } + break; + case HLL_DATA_SPRASE: + case HLL_DATA_FULL: + _merge_registers(other._registers); + break; + default: + break; } + break; } - - // _type not change - if (_type == HLL_DATA_FULL) { - switch (other._type) { - case HLL_DATA_EXPLICIT: - _hash_set.insert(other._hash_set.begin(), other._hash_set.end()); - return; - case HLL_DATA_SPRASE: - case HLL_DATA_FULL: - merge_registers(_registers, other._registers); - return; - default: - return; - } } } -int HyperLogLog::serialize(char* dest) { - if (_type == HLL_DATA_EMPTY) { - dest[0] = _type; - return 1; +int HyperLogLog::serialize(uint8_t* dest) { + uint8_t* ptr = dest; + switch (_type) { + case HLL_DATA_EMPTY: { + *ptr++ = _type; + break; } - - std::map index_to_value; - if (_type == HLL_DATA_SPRASE || _type == HLL_DATA_FULL || - _hash_set.size() > HLL_EXPLICLIT_INT64_NUM) { - merge_hash_set_to_registers(_registers, _hash_set); + case HLL_DATA_EXPLICIT: { + DCHECK(_hash_set.size() < HLL_EXPLICLIT_INT64_NUM) + << "Number of explicit elements(" << _hash_set.size() + << ") should be less or equal than " << HLL_EXPLICLIT_INT64_NUM; + *ptr++ = _type; + *ptr++ = (uint8_t)_hash_set.size(); + for (auto hash_value : _hash_set) { + encode_fixed64_le(ptr, hash_value); + ptr += 8; + } + break; + } + case HLL_DATA_SPRASE: + case HLL_DATA_FULL: { + uint32_t num_non_zero_registers = 0; for (int i = 0; i < HLL_REGISTERS_COUNT; i++) { if (_registers[i] != 0) { - index_to_value[i] = _registers[i]; + num_non_zero_registers++; } } - } + // each register in sparse format will occupy 3bytes, 2 for index and + // 1 for register value. So if num_non_zero_registers is greater than + // 4K we use full encode format. + if (num_non_zero_registers > HLL_SPARSE_THRESHOLD) { + *ptr++ = HLL_DATA_FULL; + memcpy(ptr, _registers, HLL_REGISTERS_COUNT); + ptr += HLL_REGISTERS_COUNT; + } else { + *ptr++ = HLL_DATA_SPRASE; + // 2-5(4 byte): number of registers + encode_fixed32_le(ptr, num_non_zero_registers); + ptr += 4; - int sparse_set_len = index_to_value.size() * (sizeof(SparseIndexType) - + sizeof(SparseValueType) + sizeof(SparseLengthValueType)); - int result_len = 0; - if (sparse_set_len >= HLL_COLUMN_DEFAULT_LEN) { - result_len = serialize_full(dest, _registers); - } else if (index_to_value.size() > 0) { - result_len = serialize_sparse(dest, index_to_value); - } else if (_hash_set.size() > 0) { - result_len = serialize_explicit(dest, _hash_set); + for (uint32_t i = 0; i < HLL_REGISTERS_COUNT; ++i) { + if (_registers[i] == 0) { + continue; + } + // 2 bytes: register index + // 1 byte: register value + encode_fixed16_le(ptr, i); + ptr += 2; + *ptr++ = _registers[i]; + } + } + break; } + } + return ptr - dest; +} - return result_len & 0xffff; +// TODO(zc): check input string's length +bool HyperLogLog::deserialize(const uint8_t* ptr) { + // can be called only when type is empty + DCHECK(_type == HLL_DATA_EMPTY); + + // first byte : type + _type = (HllDataType)*ptr++; + switch (_type) { + case HLL_DATA_EMPTY: + break; + case HLL_DATA_EXPLICIT: { + // 2: number of explicit values + // make sure that num_explicit is positive + uint8_t num_explicits = *ptr++; + // 3+: 8 bytes hash value + for (int i = 0; i < num_explicits; ++i) { + _hash_set.insert(decode_fixed64_le(ptr)); + ptr += 8; + } + break; + } + case HLL_DATA_SPRASE: { + _registers = new uint8_t[HLL_REGISTERS_COUNT]; + memset(_registers, 0, HLL_REGISTERS_COUNT); + + // 2-5(4 byte): number of registers + uint32_t num_registers = decode_fixed32_le(ptr); + ptr += 4; + for (uint32_t i = 0; i < num_registers; ++i) { + // 2 bytes: register index + // 1 byte: register value + uint16_t register_idx = decode_fixed16_le(ptr); + ptr += 2; + _registers[register_idx] = *ptr++; + } + break; + } + case HLL_DATA_FULL: { + _registers = new uint8_t[HLL_REGISTERS_COUNT]; + // 2+ : hll register value + memcpy(_registers, ptr, HLL_REGISTERS_COUNT); + break; + } + default: + return false; + } + return true; } int64_t HyperLogLog::estimate_cardinality() { if (_type == HLL_DATA_EMPTY) { return 0; } - - merge_hash_set_to_registers(_registers, _hash_set); + if (_type == HLL_DATA_EXPLICIT) { + return _hash_set.size(); + } const int num_streams = HLL_REGISTERS_COUNT; // Empirical constants for the algorithm. diff --git a/be/src/olap/hll.h b/be/src/olap/hll.h index e6506bd848..21807f4f07 100644 --- a/be/src/olap/hll.h +++ b/be/src/olap/hll.h @@ -23,15 +23,21 @@ #include #include -#include "olap/olap_common.h" +#include "gutil/macros.h" namespace doris { const static int HLL_COLUMN_PRECISION = 14; +const static int HLL_ZERO_COUNT_BITS = (64 - HLL_COLUMN_PRECISION); const static int HLL_EXPLICLIT_INT64_NUM = 160; -const static int HLL_REGISTERS_COUNT = 16384; +const static int HLL_SPARSE_THRESHOLD = 4096; +const static int HLL_REGISTERS_COUNT = 16 * 1024; // maximum size in byte of serialized HLL: type(1) + registers (2^14) -const static int HLL_COLUMN_DEFAULT_LEN = 16385; +const static int HLL_COLUMN_DEFAULT_LEN = HLL_REGISTERS_COUNT + 1; + +// 1 for type; 1 for hash values count; 8 for hash value +const static int HLL_SINGLE_VALUE_SIZE = 10; +const static int HLL_EMPTY_SIZE = 1; // Hyperloglog distinct estimate algorithm. // See these papers for more details. @@ -39,43 +45,61 @@ const static int HLL_COLUMN_DEFAULT_LEN = 16385; // algorithm (2007) // 2) HyperLogLog in Practice (paper from google with some improvements) -// 通过varchar的变长编码方式实现hll集合 -// 实现hll列中间计算结果的处理 -// empty 空集合 -// explicit 存储64位hash值的集合 -// sparse 存储hll非0的register -// full 存储全部的hll register -// empty -> explicit -> sparse -> full 四种类型的转换方向不可逆 -// 第一个字节存放hll集合的类型 0:empty 1:explicit 2:sparse 3:full -// 已决定后面的数据怎么解析 +// Each HLL value is a set of values. To save space, Doris store HLL value +// in different format according to its cardinality. +// +// HLL_DATA_EMPTY: when set is empty. +// +// HLL_DATA_EXPLICIT: when there is only few values in set, store these values explicit. +// If the number of hash values is not greater than 160, set is encoded in this format. +// The max space occupied is (1 + 1 + 160 * 8) = 1282. I don't know why 160 is choosed, +// maybe can be other number. If you are interested, you can try other number and see +// if it will be better. +// +// HLL_DATA_SPRASE: only store non-zero registers. If the number of non-zero registers +// is not greater than 4096, set is encoded in this format. The max space occupied is +// (1 + 4 + 3 * 4096) = 12293. +// +// HLL_DATA_FULL: most space-consuming, store all registers +// +// A HLL value will change in the sequence empty -> explicit -> sparse -> full, and not +// allow reverse. +// +// NOTE: This values are persisted in storage devices, so don't change exist +// enum values. +enum HllDataType { + HLL_DATA_EMPTY = 0, + HLL_DATA_EXPLICIT = 1, + HLL_DATA_SPRASE = 2, + HLL_DATA_FULL = 3, +}; + class HyperLogLog { public: - HyperLogLog(): _type(HLL_DATA_EMPTY){ - memset(_registers, 0, HLL_REGISTERS_COUNT); - } + HyperLogLog() = default; explicit HyperLogLog(uint64_t hash_value): _type(HLL_DATA_EXPLICIT) { _hash_set.emplace(hash_value); } + explicit HyperLogLog(const uint8_t* src); - explicit HyperLogLog(char* src); + ~HyperLogLog() { + delete[] _registers; + } typedef uint8_t SetTypeValueType; typedef int32_t SparseLengthValueType; typedef uint16_t SparseIndexType; typedef uint8_t SparseValueType; - // change the _type to HLL_DATA_FULL directly has two reasons: - // 1. keep behavior consistent with before - // 2. make the code logic is simple - void update(const uint64_t hash_value) { - _type = HLL_DATA_FULL; - update_registers(_registers, hash_value); - } + // Add a hash value to this HLL value + // NOTE: input must be a hash_value + void update(uint64_t hash_value); void merge(const HyperLogLog& other); - int serialize(char* dest); + int serialize(uint8_t* dest); + bool deserialize(const uint8_t* ptr); int64_t estimate_cardinality(); @@ -102,67 +126,36 @@ public: } private: - HllDataType _type; - char _registers[HLL_REGISTERS_COUNT]; + HllDataType _type = HLL_DATA_EMPTY; std::set _hash_set; - static void update_registers(char* registers, uint64_t hash_value) { + // This field is much space consumming(HLL_REGISTERS_COUNT), we craete + // it only when it is really needed. + uint8_t* _registers = nullptr; + +private: + DISALLOW_COPY_AND_ASSIGN(HyperLogLog); + + void _convert_explicit_to_register(); + + // update one hash value into this registers + void _update_registers(uint64_t hash_value) { // Use the lower bits to index into the number of streams and then // find the first 1 bit after the index bits. int idx = hash_value % HLL_REGISTERS_COUNT; - uint8_t first_one_bit = __builtin_ctzl(hash_value >> HLL_COLUMN_PRECISION) + 1; - registers[idx] = std::max((uint8_t)registers[idx], first_one_bit); + hash_value >>= HLL_COLUMN_PRECISION; + // make sure max first_one_bit is HLL_ZERO_COUNT_BITS + 1 + hash_value |= ((uint64_t)1 << HLL_ZERO_COUNT_BITS); + uint8_t first_one_bit = __builtin_ctzl(hash_value) + 1; + _registers[idx] = std::max((uint8_t)_registers[idx], first_one_bit); } - static void merge_hash_set_to_registers(char* registers, const std::set& hash_set) { - for (auto hash_value: hash_set) { - update_registers(registers, hash_value); + // absorb other registers into this registers + void _merge_registers(const uint8_t* other_registers) { + for (int i = 0; i < HLL_REGISTERS_COUNT; ++i) { + _registers[i] = std::max(_registers[i], other_registers[i]); } } - - static void merge_registers(char* registers, const char* other_registers) { - for (int i = 0; i < doris::HLL_REGISTERS_COUNT; ++i) { - registers[i] = std::max(registers[i], other_registers[i]); - } - } - - static int serialize_full(char* result, char* registers) { - result[0] = HLL_DATA_FULL; - memcpy(result + 1, registers, HLL_REGISTERS_COUNT); - return HLL_COLUMN_DEFAULT_LEN; - } - - static int serialize_sparse(char *result, const std::map& index_to_value) { - result[0] = HLL_DATA_SPRASE; - int len = sizeof(SetTypeValueType) + sizeof(SparseLengthValueType); - char* write_value_pos = result + len; - for (auto iter = index_to_value.begin(); - iter != index_to_value.end(); iter++) { - write_value_pos[0] = (char)(iter->first & 0xff); - write_value_pos[1] = (char)(iter->first >> 8 & 0xff); - write_value_pos[2] = iter->second; - write_value_pos += 3; - } - int registers_count = index_to_value.size(); - len += registers_count * (sizeof(SparseIndexType)+ sizeof(SparseValueType)); - *(int*)(result + 1) = registers_count; - return len; - } - - static int serialize_explicit(char* result, const std::set& hash_value_set) { - result[0] = HLL_DATA_EXPLICIT; - result[1] = (uint8_t)(hash_value_set.size()); - int len = sizeof(SetTypeValueType) + sizeof(uint8_t); - char* write_pos = result + len; - for (auto iter = hash_value_set.begin(); - iter != hash_value_set.end(); iter++) { - uint64_t hash_value = *iter; - *(uint64_t*)write_pos = hash_value; - write_pos += 8; - } - len += sizeof(uint64_t) * hash_value_set.size(); - return len; - } }; // todo(kks): remove this when dpp_sink class was removed diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index 18d31e8cba..3ab899d1c1 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -164,15 +164,6 @@ enum OLAPCompressionType { OLAP_COMP_LZ4 = 3, // 用于储存的压缩算法,压缩率低,cpu开销低 }; -// hll数据存储格式,优化存储结构减少多余空间的占用 -enum HllDataType { - HLL_DATA_EMPTY = 0, // 用于记录空的hll集合 - HLL_DATA_EXPLICIT, // 直接存储hash后结果的集合类型 - HLL_DATA_SPRASE, // 记录register不为空的集合类型 - HLL_DATA_FULL, // 记录完整的hll集合 - HLL_DATA_NONE -}; - enum PushType { PUSH_NORMAL = 1, PUSH_FOR_DELETE = 2, diff --git a/be/test/exprs/hll_function_test.cpp b/be/test/exprs/hll_function_test.cpp index 68a6664b06..64ece680f5 100644 --- a/be/test/exprs/hll_function_test.cpp +++ b/be/test/exprs/hll_function_test.cpp @@ -32,7 +32,7 @@ namespace doris { StringVal convert_hll_to_string(FunctionContext* ctx, HyperLogLog& hll) { std::string buf; buf.resize(HLL_COLUMN_DEFAULT_LEN); - int size = hll.serialize((char*)buf.c_str()); + int size = hll.serialize((uint8_t*)buf.c_str()); buf.resize(size); return AnyValUtil::from_string_temp(ctx, buf); } @@ -58,7 +58,7 @@ TEST_F(HllFunctionsTest, hll_hash) { StringVal input = AnyValUtil::from_string_temp(ctx, std::string("1024")); StringVal result = HllFunctions::hll_hash(ctx, input); - HyperLogLog hll((char*)result.ptr); + HyperLogLog hll((uint8_t*)result.ptr); int64_t cardinality = hll.estimate_cardinality(); int64_t expected = 1; @@ -69,7 +69,7 @@ TEST_F(HllFunctionsTest, hll_hash_null) { StringVal input = StringVal::null(); StringVal result = HllFunctions::hll_hash(ctx, input); - HyperLogLog hll((char*)result.ptr); + HyperLogLog hll((uint8_t*)result.ptr); int64_t cardinality = hll.estimate_cardinality(); int64_t expected = 0; @@ -102,7 +102,7 @@ TEST_F(HllFunctionsTest, hll_merge) { HllFunctions::hll_merge(ctx, src2, &dst); StringVal serialized = HllFunctions::hll_serialize(ctx, dst); - HyperLogLog hll((char*)serialized.ptr); + HyperLogLog hll((uint8_t*)serialized.ptr); BigIntVal expected(1); ASSERT_EQ(expected, hll.estimate_cardinality()); diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt index e97de0bebe..9d671d6f30 100644 --- a/be/test/olap/CMakeLists.txt +++ b/be/test/olap/CMakeLists.txt @@ -70,3 +70,4 @@ ADD_BE_TEST(generic_iterators_test) ADD_BE_TEST(key_coder_test) ADD_BE_TEST(short_key_index_test) ADD_BE_TEST(page_cache_test) +ADD_BE_TEST(hll_test) diff --git a/be/test/olap/hll_test.cpp b/be/test/olap/hll_test.cpp new file mode 100644 index 0000000000..d28d4709d1 --- /dev/null +++ b/be/test/olap/hll_test.cpp @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/hll.h" + +#include + +#include "util/hash_util.hpp" + +namespace doris { + +class TestHll : public testing::Test { +public: + virtual ~TestHll() { } +}; + +static uint64_t hash(uint64_t value) { + return HashUtil::murmur_hash64A(&value, 8, 0); +} + +TEST_F(TestHll, Normal) { + uint8_t buf[HLL_REGISTERS_COUNT + 1]; + // empty + { + HyperLogLog empty_hll; + int len = empty_hll.serialize(buf); + ASSERT_EQ(1, len); + HyperLogLog test_hll(buf); + ASSERT_EQ(0, test_hll.estimate_cardinality()); + } + // explicit [0. 100) + HyperLogLog explicit_hll; + { + for (int i = 0; i < 100; ++i) { + explicit_hll.update(hash(i)); + } + int len = explicit_hll.serialize(buf); + ASSERT_EQ(1 + 1 + 100 * 8, len); + + HyperLogLog test_hll(buf); + test_hll.update(hash(0)); + { + HyperLogLog other_hll; + for (int i = 0; i < 100; ++i) { + other_hll.update(hash(i)); + } + test_hll.merge(other_hll); + } + ASSERT_EQ(100, test_hll.estimate_cardinality()); + } + // sparse [1024, 2048) + HyperLogLog sparse_hll; + { + for (int i = 0; i < 1024; ++i) { + sparse_hll.update(hash(i + 1024)); + } + int len = sparse_hll.serialize(buf); + ASSERT_TRUE(len < HLL_REGISTERS_COUNT + 1); + + HyperLogLog test_hll(buf); + test_hll.update(hash(1024)); + { + HyperLogLog other_hll; + for (int i = 0; i < 1024; ++i) { + other_hll.update(hash(i + 1024)); + } + test_hll.merge(other_hll); + } + auto cardinality = test_hll.estimate_cardinality(); + ASSERT_EQ(sparse_hll.estimate_cardinality(), cardinality); + // 2% error rate + ASSERT_TRUE(cardinality > 1000 && cardinality < 1045); + } + // full [64 * 1024, 128 * 1024) + HyperLogLog full_hll; + { + for (int i = 0; i < 64 * 1024; ++i) { + full_hll.update(hash(64 * 1024 + i)); + } + int len = full_hll.serialize(buf); + ASSERT_EQ(HLL_REGISTERS_COUNT + 1, len); + + HyperLogLog test_hll(buf); + auto cardinality = test_hll.estimate_cardinality(); + ASSERT_EQ(full_hll.estimate_cardinality(), cardinality); + // 2% error rate + ASSERT_TRUE(cardinality > 62 * 1024 && cardinality < 66 * 1024); + } + // merge explicit to empty_hll + { + HyperLogLog new_explicit_hll; + new_explicit_hll.merge(explicit_hll); + ASSERT_EQ(100, new_explicit_hll.estimate_cardinality()); + + // merge another explicit + { + HyperLogLog other_hll; + for (int i = 100; i < 200; ++i) { + other_hll.update(hash(i)); + } + // this is converted to full + other_hll.merge(new_explicit_hll); + ASSERT_TRUE(other_hll.estimate_cardinality() > 190); + } + // merge full + { + new_explicit_hll.merge(full_hll); + ASSERT_TRUE(new_explicit_hll.estimate_cardinality() > full_hll.estimate_cardinality()); + } + } + // merge sparse to empty_hll + { + HyperLogLog new_sparse_hll; + new_sparse_hll.merge(sparse_hll); + ASSERT_EQ(sparse_hll.estimate_cardinality(), new_sparse_hll.estimate_cardinality()); + + // merge explicit + new_sparse_hll.merge(explicit_hll); + ASSERT_TRUE(new_sparse_hll.estimate_cardinality() > sparse_hll.estimate_cardinality()); + + // merge full + new_sparse_hll.merge(full_hll); + ASSERT_TRUE(new_sparse_hll.estimate_cardinality() > full_hll.estimate_cardinality()); + } +} + +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/run-ut.sh b/run-ut.sh index 8b16d47abb..c816e0a19e 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -272,6 +272,7 @@ ${DORIS_TEST_BINARY_DIR}/olap/aggregate_func_test ${DORIS_TEST_BINARY_DIR}/olap/short_key_index_test ${DORIS_TEST_BINARY_DIR}/olap/key_coder_test ${DORIS_TEST_BINARY_DIR}/olap/page_cache_test +${DORIS_TEST_BINARY_DIR}/olap/hll_test # Running routine load test ${DORIS_TEST_BINARY_DIR}/runtime/kafka_consumer_pipe_test