diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index 647ee8b1f0..ace7935926 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -31,6 +31,7 @@ #include "runtime/bufferpool/buffer_pool.h" #include "runtime/query_statistics.h" #include "service/backend_options.h" +#include "util/uid_util.h" // for print_id namespace llvm { class Function; diff --git a/be/src/exprs/hll_function.cpp b/be/src/exprs/hll_function.cpp index d8c759bd57..5894d0c057 100644 --- a/be/src/exprs/hll_function.cpp +++ b/be/src/exprs/hll_function.cpp @@ -29,19 +29,17 @@ void HllFunctions::init() { } StringVal HllFunctions::hll_hash(FunctionContext* ctx, const StringVal& input) { - const int HLL_SINGLE_VALUE_SIZE = 10; - const int HLL_EMPTY_SIZE = 1; std::string buf; - std::unique_ptr hll; if (!input.is_null) { uint64_t hash_value = HashUtil::murmur_hash64A(input.ptr, input.len, HashUtil::MURMUR_SEED); - hll.reset(new HyperLogLog(hash_value)); + HyperLogLog hll(hash_value); buf.resize(HLL_SINGLE_VALUE_SIZE); + hll.serialize((uint8_t*)buf.c_str()); } else { - hll.reset(new HyperLogLog()); + HyperLogLog hll; buf.resize(HLL_EMPTY_SIZE); + hll.serialize((uint8_t*)buf.c_str()); } - hll->serialize((char*)buf.c_str()); return AnyValUtil::from_string_temp(ctx, buf); } @@ -64,7 +62,7 @@ void HllFunctions::hll_update(FunctionContext *, const T &src, StringVal* dst) { } } void HllFunctions::hll_merge(FunctionContext*, const StringVal &src, StringVal* dst) { - HyperLogLog src_hll = HyperLogLog((char*)src.ptr); + HyperLogLog src_hll((uint8_t*)src.ptr); auto* dst_hll = reinterpret_cast(dst->ptr); dst_hll->merge(src_hll); } @@ -89,7 +87,7 @@ BigIntVal HllFunctions::hll_cardinality(FunctionContext* ctx, const StringVal& i StringVal HllFunctions::hll_serialize(FunctionContext *ctx, const StringVal &src) { auto* src_hll = reinterpret_cast(src.ptr); StringVal result(ctx, HLL_COLUMN_DEFAULT_LEN); - int size = src_hll->serialize((char*)result.ptr); + int size = src_hll->serialize((uint8_t*)result.ptr); result.resize(ctx, size); delete src_hll; return result; diff --git a/be/src/exprs/hll_hash_function.cpp b/be/src/exprs/hll_hash_function.cpp index 1534f27f91..07d61dd30e 100644 --- a/be/src/exprs/hll_hash_function.cpp +++ b/be/src/exprs/hll_hash_function.cpp @@ -27,18 +27,17 @@ void HllHashFunctions::init() { } StringVal HllHashFunctions::hll_hash(FunctionContext* ctx, const StringVal& input) { - const int HLL_SINGLE_VALUE_SIZE = 10; - const int HLL_EMPTY_SIZE = 1; std::string buf; - std::unique_ptr hll {new HyperLogLog()}; if (!input.is_null) { uint64_t hash_value = HashUtil::murmur_hash64A(input.ptr, input.len, HashUtil::MURMUR_SEED); - hll.reset(new HyperLogLog(hash_value)); + HyperLogLog hll(hash_value); buf.resize(HLL_SINGLE_VALUE_SIZE); + hll.serialize((uint8_t*)buf.c_str()); } else { + HyperLogLog hll; buf.resize(HLL_EMPTY_SIZE); + hll.serialize((uint8_t*)buf.c_str()); } - hll->serialize((char*)buf.c_str()); return AnyValUtil::from_string_temp(ctx, buf); } diff --git a/be/src/olap/aggregate_func.h b/be/src/olap/aggregate_func.h index 25ffdd0390..8381dca094 100644 --- a/be/src/olap/aggregate_func.h +++ b/be/src/olap/aggregate_func.h @@ -408,7 +408,7 @@ struct AggregateFuncTraitssize = sizeof(HyperLogLog); // use 'placement new' to allocate HyperLogLog on arena, so that we can control the memory usage. char* mem = arena->Allocate(dst_slice->size); - dst_slice->data = (char*) new (mem) HyperLogLog(src_slice->data); + dst_slice->data = (char*) new (mem) HyperLogLog((const uint8_t*)src_slice->data); } static void update(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) { @@ -420,7 +420,7 @@ struct AggregateFuncTraitsdata); + HyperLogLog src_hll((const uint8_t*)src_slice->data); dst_hll->merge(src_hll); } else { // for stream load auto* src_hll = reinterpret_cast(src_slice->data); @@ -435,7 +435,7 @@ struct AggregateFuncTraits(slice->data); slice->data = arena->Allocate(HLL_COLUMN_DEFAULT_LEN); - slice->size = hll->serialize(slice->data); + slice->size = hll->serialize((uint8_t*)slice->data); // NOT using 'delete hll' because the memory is managed by arena hll->~HyperLogLog(); } diff --git a/be/src/olap/hll.cpp b/be/src/olap/hll.cpp index 192bbc02f8..2e5a904e9c 100644 --- a/be/src/olap/hll.cpp +++ b/be/src/olap/hll.cpp @@ -20,6 +20,9 @@ #include #include +#include "common/logging.h" +#include "util/coding.h" + using std::map; using std::nothrow; using std::string; @@ -27,165 +30,226 @@ using std::stringstream; namespace doris { -HyperLogLog::HyperLogLog(char* src) { - _type = (HllDataType)src[0]; - memset(_registers, 0, HLL_REGISTERS_COUNT); - char* sparse_data = nullptr; - switch (_type) { - case HLL_DATA_EXPLICIT: - // first byte : type - // second~five byte : hash values's number - // five byte later : hash value - { - auto _explicit_num = (uint8_t) (src[sizeof(SetTypeValueType)]); - auto *_explicit_value = (uint64_t *) (src + sizeof(SetTypeValueType) + sizeof(uint8_t)); - for (int i = 0; i < _explicit_num; ++i) { - _hash_set.insert(_explicit_value[i]); - } - } - break; - case HLL_DATA_SPRASE: - // first byte : type - // second ~(2^HLL_COLUMN_PRECISION)/8 byte : bitmap mark which is not zero - // 2^HLL_COLUMN_PRECISION)/8 + 1以后value - { - auto* _sparse_count = (SparseLengthValueType*)(src + sizeof (SetTypeValueType)); - sparse_data = src + sizeof(SetTypeValueType) + sizeof(SparseLengthValueType); - std::map _sparse_map; - for (int i = 0; i < *_sparse_count; i++) { - auto* index = (SparseIndexType*)sparse_data; - sparse_data += sizeof(SparseIndexType); - auto* value = (SparseValueType*)sparse_data; - _sparse_map[*index] = *value; - sparse_data += sizeof(SetTypeValueType); - } +// TODO(zc): we should check if src is valid, it maybe corrupted +HyperLogLog::HyperLogLog(const uint8_t* src) { + deserialize(src); +} - for (auto iter: _sparse_map) { - _registers[iter.first] = (uint8_t)iter.second; - } +// Convert explicit values to register format, and clear explicit values. +// NOTE: this function won't modify _type. +void HyperLogLog::_convert_explicit_to_register() { + DCHECK(_type == HLL_DATA_EXPLICIT) << "_type(" << _type << ") should be explicit(" + << HLL_DATA_EXPLICIT << ")"; + _registers = new uint8_t[HLL_REGISTERS_COUNT]; + memset(_registers, 0, HLL_REGISTERS_COUNT); + for (auto value : _hash_set) { + _update_registers(value); + } + // clear _hash_set + std::set().swap(_hash_set); +} + +// Change HLL_DATA_EXPLICIT to HLL_DATA_FULL directly, because HLL_DATA_SPRASE +// is implemented in the same way in memory with HLL_DATA_FULL. +void HyperLogLog::update(uint64_t hash_value) { + switch (_type) { + case HLL_DATA_EMPTY: + _hash_set.insert(hash_value); + _type = HLL_DATA_EXPLICIT; + break; + case HLL_DATA_EXPLICIT: + if (_hash_set.size() < HLL_EXPLICLIT_INT64_NUM) { + _hash_set.insert(hash_value); + break; } - break; - case HLL_DATA_FULL: - // first byte : type - // second byte later : hll register value - { - char* _full_value_position = src + sizeof (SetTypeValueType); - memcpy(_registers, _full_value_position, HLL_REGISTERS_COUNT); - } - break; - case HLL_DATA_EMPTY: - break; - default: - break; + _convert_explicit_to_register(); + _type = HLL_DATA_FULL; + // fall through + case HLL_DATA_SPRASE: + case HLL_DATA_FULL: + _update_registers(hash_value); + break; } } void HyperLogLog::merge(const HyperLogLog& other) { + // fast path if (other._type == HLL_DATA_EMPTY) { return; } - - // _type must change - if (_type == HLL_DATA_EMPTY) { + switch (_type) { + case HLL_DATA_EMPTY: { + // _type must change _type = other._type; switch (other._type) { - case HLL_DATA_EXPLICIT: - _hash_set = other._hash_set; - return; - case HLL_DATA_SPRASE: - case HLL_DATA_FULL: - memcpy(_registers, other._registers, HLL_REGISTERS_COUNT); - return; - default: - return; + case HLL_DATA_EXPLICIT: + _hash_set = other._hash_set; + break; + case HLL_DATA_SPRASE: + case HLL_DATA_FULL: + _registers = new uint8_t[HLL_REGISTERS_COUNT]; + memcpy(_registers, other._registers, HLL_REGISTERS_COUNT); + break; + default: + break; } + break; } - - // _type maybe change - if (_type == HLL_DATA_EXPLICIT) { + case HLL_DATA_EXPLICIT: { switch (other._type) { - case HLL_DATA_EXPLICIT: - _hash_set.insert(other._hash_set.begin(), other._hash_set.end()); - return; - case HLL_DATA_SPRASE: - case HLL_DATA_FULL: - memcpy(_registers, other._registers, HLL_REGISTERS_COUNT); - _type = other._type; - return; - default: - return; + case HLL_DATA_EXPLICIT: + // Merge other's explicit values first, then check if the number is exccede + // HLL_EXPLICLIT_INT64_NUM. This is OK because the max value is 2 * 160. + _hash_set.insert(other._hash_set.begin(), other._hash_set.end()); + if (_hash_set.size() > HLL_EXPLICLIT_INT64_NUM) { + _convert_explicit_to_register(); + _type = HLL_DATA_FULL; + } + break; + case HLL_DATA_SPRASE: + case HLL_DATA_FULL: + _convert_explicit_to_register(); + _merge_registers(other._registers); + _type = HLL_DATA_FULL; + break; + default: + break; } + break; } - - // _type maybe change - if (_type == HLL_DATA_SPRASE) { + case HLL_DATA_SPRASE: + case HLL_DATA_FULL: { switch (other._type) { - case HLL_DATA_EXPLICIT: - _hash_set.insert(other._hash_set.begin(), other._hash_set.end()); - return; - case HLL_DATA_SPRASE: - case HLL_DATA_FULL: - merge_registers(_registers, other._registers); - _type = other._type; - return; - default: - return; + case HLL_DATA_EXPLICIT: + for (auto hash_value : other._hash_set) { + _update_registers(hash_value); + } + break; + case HLL_DATA_SPRASE: + case HLL_DATA_FULL: + _merge_registers(other._registers); + break; + default: + break; } + break; } - - // _type not change - if (_type == HLL_DATA_FULL) { - switch (other._type) { - case HLL_DATA_EXPLICIT: - _hash_set.insert(other._hash_set.begin(), other._hash_set.end()); - return; - case HLL_DATA_SPRASE: - case HLL_DATA_FULL: - merge_registers(_registers, other._registers); - return; - default: - return; - } } } -int HyperLogLog::serialize(char* dest) { - if (_type == HLL_DATA_EMPTY) { - dest[0] = _type; - return 1; +int HyperLogLog::serialize(uint8_t* dest) { + uint8_t* ptr = dest; + switch (_type) { + case HLL_DATA_EMPTY: { + *ptr++ = _type; + break; } - - std::map index_to_value; - if (_type == HLL_DATA_SPRASE || _type == HLL_DATA_FULL || - _hash_set.size() > HLL_EXPLICLIT_INT64_NUM) { - merge_hash_set_to_registers(_registers, _hash_set); + case HLL_DATA_EXPLICIT: { + DCHECK(_hash_set.size() < HLL_EXPLICLIT_INT64_NUM) + << "Number of explicit elements(" << _hash_set.size() + << ") should be less or equal than " << HLL_EXPLICLIT_INT64_NUM; + *ptr++ = _type; + *ptr++ = (uint8_t)_hash_set.size(); + for (auto hash_value : _hash_set) { + encode_fixed64_le(ptr, hash_value); + ptr += 8; + } + break; + } + case HLL_DATA_SPRASE: + case HLL_DATA_FULL: { + uint32_t num_non_zero_registers = 0; for (int i = 0; i < HLL_REGISTERS_COUNT; i++) { if (_registers[i] != 0) { - index_to_value[i] = _registers[i]; + num_non_zero_registers++; } } - } + // each register in sparse format will occupy 3bytes, 2 for index and + // 1 for register value. So if num_non_zero_registers is greater than + // 4K we use full encode format. + if (num_non_zero_registers > HLL_SPARSE_THRESHOLD) { + *ptr++ = HLL_DATA_FULL; + memcpy(ptr, _registers, HLL_REGISTERS_COUNT); + ptr += HLL_REGISTERS_COUNT; + } else { + *ptr++ = HLL_DATA_SPRASE; + // 2-5(4 byte): number of registers + encode_fixed32_le(ptr, num_non_zero_registers); + ptr += 4; - int sparse_set_len = index_to_value.size() * (sizeof(SparseIndexType) - + sizeof(SparseValueType) + sizeof(SparseLengthValueType)); - int result_len = 0; - if (sparse_set_len >= HLL_COLUMN_DEFAULT_LEN) { - result_len = serialize_full(dest, _registers); - } else if (index_to_value.size() > 0) { - result_len = serialize_sparse(dest, index_to_value); - } else if (_hash_set.size() > 0) { - result_len = serialize_explicit(dest, _hash_set); + for (uint32_t i = 0; i < HLL_REGISTERS_COUNT; ++i) { + if (_registers[i] == 0) { + continue; + } + // 2 bytes: register index + // 1 byte: register value + encode_fixed16_le(ptr, i); + ptr += 2; + *ptr++ = _registers[i]; + } + } + break; } + } + return ptr - dest; +} - return result_len & 0xffff; +// TODO(zc): check input string's length +bool HyperLogLog::deserialize(const uint8_t* ptr) { + // can be called only when type is empty + DCHECK(_type == HLL_DATA_EMPTY); + + // first byte : type + _type = (HllDataType)*ptr++; + switch (_type) { + case HLL_DATA_EMPTY: + break; + case HLL_DATA_EXPLICIT: { + // 2: number of explicit values + // make sure that num_explicit is positive + uint8_t num_explicits = *ptr++; + // 3+: 8 bytes hash value + for (int i = 0; i < num_explicits; ++i) { + _hash_set.insert(decode_fixed64_le(ptr)); + ptr += 8; + } + break; + } + case HLL_DATA_SPRASE: { + _registers = new uint8_t[HLL_REGISTERS_COUNT]; + memset(_registers, 0, HLL_REGISTERS_COUNT); + + // 2-5(4 byte): number of registers + uint32_t num_registers = decode_fixed32_le(ptr); + ptr += 4; + for (uint32_t i = 0; i < num_registers; ++i) { + // 2 bytes: register index + // 1 byte: register value + uint16_t register_idx = decode_fixed16_le(ptr); + ptr += 2; + _registers[register_idx] = *ptr++; + } + break; + } + case HLL_DATA_FULL: { + _registers = new uint8_t[HLL_REGISTERS_COUNT]; + // 2+ : hll register value + memcpy(_registers, ptr, HLL_REGISTERS_COUNT); + break; + } + default: + return false; + } + return true; } int64_t HyperLogLog::estimate_cardinality() { if (_type == HLL_DATA_EMPTY) { return 0; } - - merge_hash_set_to_registers(_registers, _hash_set); + if (_type == HLL_DATA_EXPLICIT) { + return _hash_set.size(); + } const int num_streams = HLL_REGISTERS_COUNT; // Empirical constants for the algorithm. diff --git a/be/src/olap/hll.h b/be/src/olap/hll.h index e6506bd848..21807f4f07 100644 --- a/be/src/olap/hll.h +++ b/be/src/olap/hll.h @@ -23,15 +23,21 @@ #include #include -#include "olap/olap_common.h" +#include "gutil/macros.h" namespace doris { const static int HLL_COLUMN_PRECISION = 14; +const static int HLL_ZERO_COUNT_BITS = (64 - HLL_COLUMN_PRECISION); const static int HLL_EXPLICLIT_INT64_NUM = 160; -const static int HLL_REGISTERS_COUNT = 16384; +const static int HLL_SPARSE_THRESHOLD = 4096; +const static int HLL_REGISTERS_COUNT = 16 * 1024; // maximum size in byte of serialized HLL: type(1) + registers (2^14) -const static int HLL_COLUMN_DEFAULT_LEN = 16385; +const static int HLL_COLUMN_DEFAULT_LEN = HLL_REGISTERS_COUNT + 1; + +// 1 for type; 1 for hash values count; 8 for hash value +const static int HLL_SINGLE_VALUE_SIZE = 10; +const static int HLL_EMPTY_SIZE = 1; // Hyperloglog distinct estimate algorithm. // See these papers for more details. @@ -39,43 +45,61 @@ const static int HLL_COLUMN_DEFAULT_LEN = 16385; // algorithm (2007) // 2) HyperLogLog in Practice (paper from google with some improvements) -// 通过varchar的变长编码方式实现hll集合 -// 实现hll列中间计算结果的处理 -// empty 空集合 -// explicit 存储64位hash值的集合 -// sparse 存储hll非0的register -// full 存储全部的hll register -// empty -> explicit -> sparse -> full 四种类型的转换方向不可逆 -// 第一个字节存放hll集合的类型 0:empty 1:explicit 2:sparse 3:full -// 已决定后面的数据怎么解析 +// Each HLL value is a set of values. To save space, Doris store HLL value +// in different format according to its cardinality. +// +// HLL_DATA_EMPTY: when set is empty. +// +// HLL_DATA_EXPLICIT: when there is only few values in set, store these values explicit. +// If the number of hash values is not greater than 160, set is encoded in this format. +// The max space occupied is (1 + 1 + 160 * 8) = 1282. I don't know why 160 is choosed, +// maybe can be other number. If you are interested, you can try other number and see +// if it will be better. +// +// HLL_DATA_SPRASE: only store non-zero registers. If the number of non-zero registers +// is not greater than 4096, set is encoded in this format. The max space occupied is +// (1 + 4 + 3 * 4096) = 12293. +// +// HLL_DATA_FULL: most space-consuming, store all registers +// +// A HLL value will change in the sequence empty -> explicit -> sparse -> full, and not +// allow reverse. +// +// NOTE: This values are persisted in storage devices, so don't change exist +// enum values. +enum HllDataType { + HLL_DATA_EMPTY = 0, + HLL_DATA_EXPLICIT = 1, + HLL_DATA_SPRASE = 2, + HLL_DATA_FULL = 3, +}; + class HyperLogLog { public: - HyperLogLog(): _type(HLL_DATA_EMPTY){ - memset(_registers, 0, HLL_REGISTERS_COUNT); - } + HyperLogLog() = default; explicit HyperLogLog(uint64_t hash_value): _type(HLL_DATA_EXPLICIT) { _hash_set.emplace(hash_value); } + explicit HyperLogLog(const uint8_t* src); - explicit HyperLogLog(char* src); + ~HyperLogLog() { + delete[] _registers; + } typedef uint8_t SetTypeValueType; typedef int32_t SparseLengthValueType; typedef uint16_t SparseIndexType; typedef uint8_t SparseValueType; - // change the _type to HLL_DATA_FULL directly has two reasons: - // 1. keep behavior consistent with before - // 2. make the code logic is simple - void update(const uint64_t hash_value) { - _type = HLL_DATA_FULL; - update_registers(_registers, hash_value); - } + // Add a hash value to this HLL value + // NOTE: input must be a hash_value + void update(uint64_t hash_value); void merge(const HyperLogLog& other); - int serialize(char* dest); + int serialize(uint8_t* dest); + bool deserialize(const uint8_t* ptr); int64_t estimate_cardinality(); @@ -102,67 +126,36 @@ public: } private: - HllDataType _type; - char _registers[HLL_REGISTERS_COUNT]; + HllDataType _type = HLL_DATA_EMPTY; std::set _hash_set; - static void update_registers(char* registers, uint64_t hash_value) { + // This field is much space consumming(HLL_REGISTERS_COUNT), we craete + // it only when it is really needed. + uint8_t* _registers = nullptr; + +private: + DISALLOW_COPY_AND_ASSIGN(HyperLogLog); + + void _convert_explicit_to_register(); + + // update one hash value into this registers + void _update_registers(uint64_t hash_value) { // Use the lower bits to index into the number of streams and then // find the first 1 bit after the index bits. int idx = hash_value % HLL_REGISTERS_COUNT; - uint8_t first_one_bit = __builtin_ctzl(hash_value >> HLL_COLUMN_PRECISION) + 1; - registers[idx] = std::max((uint8_t)registers[idx], first_one_bit); + hash_value >>= HLL_COLUMN_PRECISION; + // make sure max first_one_bit is HLL_ZERO_COUNT_BITS + 1 + hash_value |= ((uint64_t)1 << HLL_ZERO_COUNT_BITS); + uint8_t first_one_bit = __builtin_ctzl(hash_value) + 1; + _registers[idx] = std::max((uint8_t)_registers[idx], first_one_bit); } - static void merge_hash_set_to_registers(char* registers, const std::set& hash_set) { - for (auto hash_value: hash_set) { - update_registers(registers, hash_value); + // absorb other registers into this registers + void _merge_registers(const uint8_t* other_registers) { + for (int i = 0; i < HLL_REGISTERS_COUNT; ++i) { + _registers[i] = std::max(_registers[i], other_registers[i]); } } - - static void merge_registers(char* registers, const char* other_registers) { - for (int i = 0; i < doris::HLL_REGISTERS_COUNT; ++i) { - registers[i] = std::max(registers[i], other_registers[i]); - } - } - - static int serialize_full(char* result, char* registers) { - result[0] = HLL_DATA_FULL; - memcpy(result + 1, registers, HLL_REGISTERS_COUNT); - return HLL_COLUMN_DEFAULT_LEN; - } - - static int serialize_sparse(char *result, const std::map& index_to_value) { - result[0] = HLL_DATA_SPRASE; - int len = sizeof(SetTypeValueType) + sizeof(SparseLengthValueType); - char* write_value_pos = result + len; - for (auto iter = index_to_value.begin(); - iter != index_to_value.end(); iter++) { - write_value_pos[0] = (char)(iter->first & 0xff); - write_value_pos[1] = (char)(iter->first >> 8 & 0xff); - write_value_pos[2] = iter->second; - write_value_pos += 3; - } - int registers_count = index_to_value.size(); - len += registers_count * (sizeof(SparseIndexType)+ sizeof(SparseValueType)); - *(int*)(result + 1) = registers_count; - return len; - } - - static int serialize_explicit(char* result, const std::set& hash_value_set) { - result[0] = HLL_DATA_EXPLICIT; - result[1] = (uint8_t)(hash_value_set.size()); - int len = sizeof(SetTypeValueType) + sizeof(uint8_t); - char* write_pos = result + len; - for (auto iter = hash_value_set.begin(); - iter != hash_value_set.end(); iter++) { - uint64_t hash_value = *iter; - *(uint64_t*)write_pos = hash_value; - write_pos += 8; - } - len += sizeof(uint64_t) * hash_value_set.size(); - return len; - } }; // todo(kks): remove this when dpp_sink class was removed diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index 18d31e8cba..3ab899d1c1 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -164,15 +164,6 @@ enum OLAPCompressionType { OLAP_COMP_LZ4 = 3, // 用于储存的压缩算法,压缩率低,cpu开销低 }; -// hll数据存储格式,优化存储结构减少多余空间的占用 -enum HllDataType { - HLL_DATA_EMPTY = 0, // 用于记录空的hll集合 - HLL_DATA_EXPLICIT, // 直接存储hash后结果的集合类型 - HLL_DATA_SPRASE, // 记录register不为空的集合类型 - HLL_DATA_FULL, // 记录完整的hll集合 - HLL_DATA_NONE -}; - enum PushType { PUSH_NORMAL = 1, PUSH_FOR_DELETE = 2, diff --git a/be/test/exprs/hll_function_test.cpp b/be/test/exprs/hll_function_test.cpp index 68a6664b06..64ece680f5 100644 --- a/be/test/exprs/hll_function_test.cpp +++ b/be/test/exprs/hll_function_test.cpp @@ -32,7 +32,7 @@ namespace doris { StringVal convert_hll_to_string(FunctionContext* ctx, HyperLogLog& hll) { std::string buf; buf.resize(HLL_COLUMN_DEFAULT_LEN); - int size = hll.serialize((char*)buf.c_str()); + int size = hll.serialize((uint8_t*)buf.c_str()); buf.resize(size); return AnyValUtil::from_string_temp(ctx, buf); } @@ -58,7 +58,7 @@ TEST_F(HllFunctionsTest, hll_hash) { StringVal input = AnyValUtil::from_string_temp(ctx, std::string("1024")); StringVal result = HllFunctions::hll_hash(ctx, input); - HyperLogLog hll((char*)result.ptr); + HyperLogLog hll((uint8_t*)result.ptr); int64_t cardinality = hll.estimate_cardinality(); int64_t expected = 1; @@ -69,7 +69,7 @@ TEST_F(HllFunctionsTest, hll_hash_null) { StringVal input = StringVal::null(); StringVal result = HllFunctions::hll_hash(ctx, input); - HyperLogLog hll((char*)result.ptr); + HyperLogLog hll((uint8_t*)result.ptr); int64_t cardinality = hll.estimate_cardinality(); int64_t expected = 0; @@ -102,7 +102,7 @@ TEST_F(HllFunctionsTest, hll_merge) { HllFunctions::hll_merge(ctx, src2, &dst); StringVal serialized = HllFunctions::hll_serialize(ctx, dst); - HyperLogLog hll((char*)serialized.ptr); + HyperLogLog hll((uint8_t*)serialized.ptr); BigIntVal expected(1); ASSERT_EQ(expected, hll.estimate_cardinality()); diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt index e97de0bebe..9d671d6f30 100644 --- a/be/test/olap/CMakeLists.txt +++ b/be/test/olap/CMakeLists.txt @@ -70,3 +70,4 @@ ADD_BE_TEST(generic_iterators_test) ADD_BE_TEST(key_coder_test) ADD_BE_TEST(short_key_index_test) ADD_BE_TEST(page_cache_test) +ADD_BE_TEST(hll_test) diff --git a/be/test/olap/hll_test.cpp b/be/test/olap/hll_test.cpp new file mode 100644 index 0000000000..d28d4709d1 --- /dev/null +++ b/be/test/olap/hll_test.cpp @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/hll.h" + +#include + +#include "util/hash_util.hpp" + +namespace doris { + +class TestHll : public testing::Test { +public: + virtual ~TestHll() { } +}; + +static uint64_t hash(uint64_t value) { + return HashUtil::murmur_hash64A(&value, 8, 0); +} + +TEST_F(TestHll, Normal) { + uint8_t buf[HLL_REGISTERS_COUNT + 1]; + // empty + { + HyperLogLog empty_hll; + int len = empty_hll.serialize(buf); + ASSERT_EQ(1, len); + HyperLogLog test_hll(buf); + ASSERT_EQ(0, test_hll.estimate_cardinality()); + } + // explicit [0. 100) + HyperLogLog explicit_hll; + { + for (int i = 0; i < 100; ++i) { + explicit_hll.update(hash(i)); + } + int len = explicit_hll.serialize(buf); + ASSERT_EQ(1 + 1 + 100 * 8, len); + + HyperLogLog test_hll(buf); + test_hll.update(hash(0)); + { + HyperLogLog other_hll; + for (int i = 0; i < 100; ++i) { + other_hll.update(hash(i)); + } + test_hll.merge(other_hll); + } + ASSERT_EQ(100, test_hll.estimate_cardinality()); + } + // sparse [1024, 2048) + HyperLogLog sparse_hll; + { + for (int i = 0; i < 1024; ++i) { + sparse_hll.update(hash(i + 1024)); + } + int len = sparse_hll.serialize(buf); + ASSERT_TRUE(len < HLL_REGISTERS_COUNT + 1); + + HyperLogLog test_hll(buf); + test_hll.update(hash(1024)); + { + HyperLogLog other_hll; + for (int i = 0; i < 1024; ++i) { + other_hll.update(hash(i + 1024)); + } + test_hll.merge(other_hll); + } + auto cardinality = test_hll.estimate_cardinality(); + ASSERT_EQ(sparse_hll.estimate_cardinality(), cardinality); + // 2% error rate + ASSERT_TRUE(cardinality > 1000 && cardinality < 1045); + } + // full [64 * 1024, 128 * 1024) + HyperLogLog full_hll; + { + for (int i = 0; i < 64 * 1024; ++i) { + full_hll.update(hash(64 * 1024 + i)); + } + int len = full_hll.serialize(buf); + ASSERT_EQ(HLL_REGISTERS_COUNT + 1, len); + + HyperLogLog test_hll(buf); + auto cardinality = test_hll.estimate_cardinality(); + ASSERT_EQ(full_hll.estimate_cardinality(), cardinality); + // 2% error rate + ASSERT_TRUE(cardinality > 62 * 1024 && cardinality < 66 * 1024); + } + // merge explicit to empty_hll + { + HyperLogLog new_explicit_hll; + new_explicit_hll.merge(explicit_hll); + ASSERT_EQ(100, new_explicit_hll.estimate_cardinality()); + + // merge another explicit + { + HyperLogLog other_hll; + for (int i = 100; i < 200; ++i) { + other_hll.update(hash(i)); + } + // this is converted to full + other_hll.merge(new_explicit_hll); + ASSERT_TRUE(other_hll.estimate_cardinality() > 190); + } + // merge full + { + new_explicit_hll.merge(full_hll); + ASSERT_TRUE(new_explicit_hll.estimate_cardinality() > full_hll.estimate_cardinality()); + } + } + // merge sparse to empty_hll + { + HyperLogLog new_sparse_hll; + new_sparse_hll.merge(sparse_hll); + ASSERT_EQ(sparse_hll.estimate_cardinality(), new_sparse_hll.estimate_cardinality()); + + // merge explicit + new_sparse_hll.merge(explicit_hll); + ASSERT_TRUE(new_sparse_hll.estimate_cardinality() > sparse_hll.estimate_cardinality()); + + // merge full + new_sparse_hll.merge(full_hll); + ASSERT_TRUE(new_sparse_hll.estimate_cardinality() > full_hll.estimate_cardinality()); + } +} + +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/run-ut.sh b/run-ut.sh index 8b16d47abb..c816e0a19e 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -272,6 +272,7 @@ ${DORIS_TEST_BINARY_DIR}/olap/aggregate_func_test ${DORIS_TEST_BINARY_DIR}/olap/short_key_index_test ${DORIS_TEST_BINARY_DIR}/olap/key_coder_test ${DORIS_TEST_BINARY_DIR}/olap/page_cache_test +${DORIS_TEST_BINARY_DIR}/olap/hll_test # Running routine load test ${DORIS_TEST_BINARY_DIR}/runtime/kafka_consumer_pipe_test