diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 2ca158bd08..66dd6e0883 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -25,6 +25,7 @@ #include "runtime/runtime_state.h" #include "runtime/tuple_row.h" +#include "olap/hll.h" #include "util/brpc_stub_cache.h" #include "util/uid_util.h" #include "service/brpc.h" @@ -492,6 +493,7 @@ Status OlapTableSink::prepare(RuntimeState* state) { case TYPE_VARCHAR: case TYPE_DATE: case TYPE_DATETIME: + case TYPE_HLL: _need_validate_data = true; break; default: @@ -698,7 +700,7 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* bool row_valid = true; for (int i = 0; row_valid && i < _output_tuple_desc->slots().size(); ++i) { SlotDescriptor* desc = _output_tuple_desc->slots()[i]; - if (tuple->is_null(desc->null_indicator_offset())) { + if (desc->is_nullable() && tuple->is_null(desc->null_indicator_offset())) { continue; } void* slot = tuple->get_slot(desc->tuple_offset()); @@ -838,6 +840,24 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* continue; } } + case TYPE_HLL: { + Slice* hll_val = (Slice*)slot; + if (!HyperLogLog::is_valid(*hll_val)) { + std::stringstream ss; + ss << "Content of HLL type column is invalid" + << "column_name: " << desc->col_name() << "; "; +#if BE_TEST + LOG(INFO) << ss.str(); +#else + state->append_error_msg_to_file("", ss.str()); +#endif + filtered_rows++; + row_valid = false; + filter_bitmap->Set(row_no, true); + continue; + } + break; + } default: break; } diff --git a/be/src/exprs/hll_function.cpp b/be/src/exprs/hll_function.cpp index 246f60906a..7731c60a4f 100644 --- a/be/src/exprs/hll_function.cpp +++ b/be/src/exprs/hll_function.cpp @@ -19,6 +19,7 @@ #include "exprs/anyval_util.h" #include "util/hash_util.hpp" +#include "util/slice.h" namespace doris { @@ -29,17 +30,14 @@ void HllFunctions::init() { } StringVal HllFunctions::hll_hash(FunctionContext* ctx, const StringVal& input) { - std::string buf; + HyperLogLog hll; if (!input.is_null) { uint64_t hash_value = HashUtil::murmur_hash64A(input.ptr, input.len, HashUtil::MURMUR_SEED); - HyperLogLog hll(hash_value); - buf.resize(HLL_SINGLE_VALUE_SIZE); - hll.serialize((uint8_t*)buf.c_str()); - } else { - HyperLogLog hll; - buf.resize(HLL_EMPTY_SIZE); - hll.serialize((uint8_t*)buf.c_str()); + hll.update(hash_value); } + std::string buf; + buf.resize(hll.max_serialized_size()); + buf.resize(hll.serialize((uint8_t*)buf.c_str())); return AnyValUtil::from_string_temp(ctx, buf); } @@ -48,7 +46,7 @@ void HllFunctions::hll_init(FunctionContext *, StringVal* dst) { dst->len = sizeof(HyperLogLog); dst->ptr = (uint8_t*)new HyperLogLog(); } -StringVal HllFunctions::empty_hll(FunctionContext* ctx) { +StringVal HllFunctions::hll_empty(FunctionContext* ctx) { return AnyValUtil::from_string_temp(ctx, HyperLogLog::empty()); } @@ -65,13 +63,13 @@ void HllFunctions::hll_update(FunctionContext *, const T &src, StringVal* dst) { } } -void HllFunctions::hll_merge(FunctionContext*, const StringVal &src, StringVal* dst) { +void HllFunctions::hll_merge(FunctionContext*, const StringVal& src, StringVal* dst) { auto* dst_hll = reinterpret_cast(dst->ptr); // zero size means the src input is a agg object if (src.len == 0) { dst_hll->merge(*reinterpret_cast(src.ptr)); } else { - dst_hll->merge(HyperLogLog(src.ptr)); + dst_hll->merge(HyperLogLog(Slice(src.ptr, src.len))); } } @@ -94,7 +92,7 @@ BigIntVal HllFunctions::hll_cardinality(FunctionContext* ctx, const StringVal& i StringVal HllFunctions::hll_serialize(FunctionContext *ctx, const StringVal &src) { auto* src_hll = reinterpret_cast(src.ptr); - StringVal result(ctx, HLL_COLUMN_DEFAULT_LEN); + StringVal result(ctx, src_hll->max_serialized_size()); int size = src_hll->serialize((uint8_t*)result.ptr); result.resize(ctx, size); delete src_hll; diff --git a/be/src/exprs/hll_function.h b/be/src/exprs/hll_function.h index da0e5e9d2b..9b25531762 100644 --- a/be/src/exprs/hll_function.h +++ b/be/src/exprs/hll_function.h @@ -26,7 +26,7 @@ class HllFunctions { public: static void init(); static StringVal hll_hash(FunctionContext* ctx, const StringVal& dest_base); - static StringVal empty_hll(FunctionContext* ctx); + static StringVal hll_empty(FunctionContext* ctx); static void hll_init(FunctionContext*, StringVal* dst); template diff --git a/be/src/exprs/hll_hash_function.cpp b/be/src/exprs/hll_hash_function.cpp index 07d61dd30e..b16a19746f 100644 --- a/be/src/exprs/hll_hash_function.cpp +++ b/be/src/exprs/hll_hash_function.cpp @@ -27,17 +27,14 @@ void HllHashFunctions::init() { } StringVal HllHashFunctions::hll_hash(FunctionContext* ctx, const StringVal& input) { - std::string buf; + HyperLogLog hll; if (!input.is_null) { uint64_t hash_value = HashUtil::murmur_hash64A(input.ptr, input.len, HashUtil::MURMUR_SEED); - HyperLogLog hll(hash_value); - buf.resize(HLL_SINGLE_VALUE_SIZE); - hll.serialize((uint8_t*)buf.c_str()); - } else { - HyperLogLog hll; - buf.resize(HLL_EMPTY_SIZE); - hll.serialize((uint8_t*)buf.c_str()); + hll.update(hash_value); } + std::string buf; + buf.resize(hll.max_serialized_size()); + buf.resize(hll.serialize((uint8_t*)buf.data())); return AnyValUtil::from_string_temp(ctx, buf); } diff --git a/be/src/olap/aggregate_func.h b/be/src/olap/aggregate_func.h index 360d983e4a..17485baf1b 100644 --- a/be/src/olap/aggregate_func.h +++ b/be/src/olap/aggregate_func.h @@ -409,7 +409,7 @@ struct AggregateFuncTraitssize = 0; - auto* hll = new HyperLogLog((const uint8_t*) src_slice->data); + auto* hll = new HyperLogLog(*src_slice); dst_slice->data = reinterpret_cast(hll); mem_pool->mem_tracker()->consume(sizeof(HyperLogLog)); @@ -426,7 +426,7 @@ struct AggregateFuncTraitsdata); + HyperLogLog src_hll(*src_slice); dst_hll->merge(src_hll); } else { // for stream load auto* src_hll = reinterpret_cast(src_slice->data); @@ -439,7 +439,7 @@ struct AggregateFuncTraits(src->mutable_cell_ptr()); auto *hll = reinterpret_cast(slice->data); - slice->data = (char*)mem_pool->allocate(HLL_COLUMN_DEFAULT_LEN); + slice->data = (char*)mem_pool->allocate(hll->max_serialized_size()); slice->size = hll->serialize((uint8_t*)slice->data); } }; diff --git a/be/src/olap/hll.cpp b/be/src/olap/hll.cpp index 2e5a904e9c..fd3e507b5d 100644 --- a/be/src/olap/hll.cpp +++ b/be/src/olap/hll.cpp @@ -21,6 +21,7 @@ #include #include "common/logging.h" +#include "runtime/string_value.h" #include "util/coding.h" using std::map; @@ -30,9 +31,11 @@ using std::stringstream; namespace doris { -// TODO(zc): we should check if src is valid, it maybe corrupted -HyperLogLog::HyperLogLog(const uint8_t* src) { - deserialize(src); +HyperLogLog::HyperLogLog(const Slice& src) { + // When deserialize return false, we make this object a empty + if (!deserialize(src)) { + _type = HLL_DATA_EMPTY; + } } // Convert explicit values to register format, and clear explicit values. @@ -137,11 +140,27 @@ void HyperLogLog::merge(const HyperLogLog& other) { } } -int HyperLogLog::serialize(uint8_t* dest) { - uint8_t* ptr = dest; +size_t HyperLogLog::max_serialized_size() const { switch (_type) { - case HLL_DATA_EMPTY: { - *ptr++ = _type; + case HLL_DATA_EMPTY: + default: + return 1; + case HLL_DATA_EXPLICIT: + return 2 + _hash_set.size() * 8; + case HLL_DATA_SPRASE: + case HLL_DATA_FULL: + return 1 + HLL_REGISTERS_COUNT; + } +} + +size_t HyperLogLog::serialize(uint8_t* dst) const { + uint8_t* ptr = dst; + switch (_type) { + case HLL_DATA_EMPTY: + default: { + // When the _type is unknown, which may not happen, we encode it as + // Empty HyperLogLog object. + *ptr++ = HLL_DATA_EMPTY; break; } case HLL_DATA_EXPLICIT: { @@ -191,14 +210,65 @@ int HyperLogLog::serialize(uint8_t* dest) { break; } } - return ptr - dest; + return ptr - dst; +} + +bool HyperLogLog::is_valid(const Slice& slice) { + if (slice.size < 1) { + return false; + } + const uint8_t* ptr = (uint8_t*)slice.data; + const uint8_t* end = (uint8_t*)slice.data + slice.size; + auto type = (HllDataType)*ptr++; + switch (type) { + case HLL_DATA_EMPTY: + break; + case HLL_DATA_EXPLICIT: { + if ((ptr + 1) > end) { + return false; + } + uint8_t num_explicits = *ptr++; + ptr += num_explicits * 8; + break; + } + case HLL_DATA_SPRASE: { + if ((ptr + 4) > end) { + return false; + } + uint32_t num_registers = decode_fixed32_le(ptr); + ptr += 4 + 3 * num_registers; + break; + } + case HLL_DATA_FULL: { + ptr += HLL_REGISTERS_COUNT; + break; + } + default: + return false; + } + return ptr == end; } // TODO(zc): check input string's length -bool HyperLogLog::deserialize(const uint8_t* ptr) { +bool HyperLogLog::deserialize(const Slice& slice) { // can be called only when type is empty DCHECK(_type == HLL_DATA_EMPTY); + // NOTE(zc): Don't remove this check unless you known what + // you are doing. Because of history bug, we ingest some + // invalid HLL data in storge, which ptr is nullptr. + // we must handle this case to avoid process crash. + // This bug is in release 0.10, I think we can remove this + // in release 0.12 or later. + if (slice.data == nullptr || slice.size <= 0) { + return false; + } + // check if input length is valid + if (!is_valid(slice)) { + return false; + } + + const uint8_t* ptr = (uint8_t*)slice.data; // first byte : type _type = (HllDataType)*ptr++; switch (_type) { @@ -238,12 +308,14 @@ bool HyperLogLog::deserialize(const uint8_t* ptr) { break; } default: + // revert type to EMPTY + _type = HLL_DATA_EMPTY; return false; } return true; } -int64_t HyperLogLog::estimate_cardinality() { +int64_t HyperLogLog::estimate_cardinality() const { if (_type == HLL_DATA_EMPTY) { return 0; } diff --git a/be/src/olap/hll.h b/be/src/olap/hll.h index e6c88d5e9c..20eb834e5d 100644 --- a/be/src/olap/hll.h +++ b/be/src/olap/hll.h @@ -27,6 +27,8 @@ namespace doris { +class Slice; + const static int HLL_COLUMN_PRECISION = 14; const static int HLL_ZERO_COUNT_BITS = (64 - HLL_COLUMN_PRECISION); const static int HLL_EXPLICLIT_INT64_NUM = 160; @@ -81,7 +83,8 @@ public: explicit HyperLogLog(uint64_t hash_value): _type(HLL_DATA_EXPLICIT) { _hash_set.emplace(hash_value); } - explicit HyperLogLog(const uint8_t* src); + + explicit HyperLogLog(const Slice& src); ~HyperLogLog() { delete[] _registers; @@ -98,10 +101,19 @@ public: void merge(const HyperLogLog& other); - int serialize(uint8_t* dest); - bool deserialize(const uint8_t* ptr); + // Return max size of serialized binary + size_t max_serialized_size() const; - int64_t estimate_cardinality(); + // Input slice should has enough capacity for serialize, which + // can be get through max_serialized_size(). If insufficient buffer + // is given, this will cause process crash. + // Return actual size of serialized binary. + size_t serialize(uint8_t* dst) const; + + // Now, only empty HLL support this funciton. + bool deserialize(const Slice& slice); + + int64_t estimate_cardinality() const; static std::string empty() { static HyperLogLog hll; @@ -111,6 +123,11 @@ public: return buf; } + // Check if input slice is a valid serialized binary of HyperLogLog. + // This function only check the encoded type in slice, whose complex + // function is O(1). + static bool is_valid(const Slice& slice); + // only for debug std::string to_string() { switch (_type) { diff --git a/be/test/exprs/hll_function_test.cpp b/be/test/exprs/hll_function_test.cpp index 64ece680f5..4b756c8283 100644 --- a/be/test/exprs/hll_function_test.cpp +++ b/be/test/exprs/hll_function_test.cpp @@ -58,7 +58,7 @@ TEST_F(HllFunctionsTest, hll_hash) { StringVal input = AnyValUtil::from_string_temp(ctx, std::string("1024")); StringVal result = HllFunctions::hll_hash(ctx, input); - HyperLogLog hll((uint8_t*)result.ptr); + HyperLogLog hll(Slice(result.ptr, result.len)); int64_t cardinality = hll.estimate_cardinality(); int64_t expected = 1; @@ -69,7 +69,7 @@ TEST_F(HllFunctionsTest, hll_hash_null) { StringVal input = StringVal::null(); StringVal result = HllFunctions::hll_hash(ctx, input); - HyperLogLog hll((uint8_t*)result.ptr); + HyperLogLog hll(Slice(result.ptr, result.len)); int64_t cardinality = hll.estimate_cardinality(); int64_t expected = 0; @@ -102,7 +102,7 @@ TEST_F(HllFunctionsTest, hll_merge) { HllFunctions::hll_merge(ctx, src2, &dst); StringVal serialized = HllFunctions::hll_serialize(ctx, dst); - HyperLogLog hll((uint8_t*)serialized.ptr); + HyperLogLog hll(Slice(serialized.ptr, serialized.len)); BigIntVal expected(1); ASSERT_EQ(expected, hll.estimate_cardinality()); diff --git a/be/test/olap/hll_test.cpp b/be/test/olap/hll_test.cpp index d28d4709d1..0ec1c4601b 100644 --- a/be/test/olap/hll_test.cpp +++ b/be/test/olap/hll_test.cpp @@ -20,6 +20,7 @@ #include #include "util/hash_util.hpp" +#include "util/slice.h" namespace doris { @@ -34,13 +35,36 @@ static uint64_t hash(uint64_t value) { TEST_F(TestHll, Normal) { uint8_t buf[HLL_REGISTERS_COUNT + 1]; + + // empty + { + Slice str((char*)buf, 0); + ASSERT_FALSE(HyperLogLog::is_valid(str)); + } + // check unknown type + { + buf[0] = 60; + Slice str((char*)buf, 1); + ASSERT_FALSE(HyperLogLog::is_valid(str)); + } + // empty { HyperLogLog empty_hll; int len = empty_hll.serialize(buf); ASSERT_EQ(1, len); - HyperLogLog test_hll(buf); + HyperLogLog test_hll(Slice((char*)buf, len)); ASSERT_EQ(0, test_hll.estimate_cardinality()); + + // check serialize + { + Slice str((char*)buf, len); + ASSERT_TRUE(HyperLogLog::is_valid(str)); + } + { + Slice str((char*)buf, len + 1); + ASSERT_FALSE(HyperLogLog::is_valid(str)); + } } // explicit [0. 100) HyperLogLog explicit_hll; @@ -51,7 +75,17 @@ TEST_F(TestHll, Normal) { int len = explicit_hll.serialize(buf); ASSERT_EQ(1 + 1 + 100 * 8, len); - HyperLogLog test_hll(buf); + // check serialize + { + Slice str((char*)buf, len); + ASSERT_TRUE(HyperLogLog::is_valid(str)); + } + { + Slice str((char*)buf, 1); + ASSERT_FALSE(HyperLogLog::is_valid(str)); + } + + HyperLogLog test_hll(Slice((char*)buf, len)); test_hll.update(hash(0)); { HyperLogLog other_hll; @@ -71,7 +105,17 @@ TEST_F(TestHll, Normal) { int len = sparse_hll.serialize(buf); ASSERT_TRUE(len < HLL_REGISTERS_COUNT + 1); - HyperLogLog test_hll(buf); + // check serialize + { + Slice str((char*)buf, len); + ASSERT_TRUE(HyperLogLog::is_valid(str)); + } + { + Slice str((char*)buf, 1 + 3); + ASSERT_FALSE(HyperLogLog::is_valid(str)); + } + + HyperLogLog test_hll(Slice((char*)buf, len)); test_hll.update(hash(1024)); { HyperLogLog other_hll; @@ -94,7 +138,17 @@ TEST_F(TestHll, Normal) { int len = full_hll.serialize(buf); ASSERT_EQ(HLL_REGISTERS_COUNT + 1, len); - HyperLogLog test_hll(buf); + // check serialize + { + Slice str((char*)buf, len); + ASSERT_TRUE(HyperLogLog::is_valid(str)); + } + { + Slice str((char*)buf, len + 1); + ASSERT_FALSE(HyperLogLog::is_valid(str)); + } + + HyperLogLog test_hll(Slice((char*)buf, len)); auto cardinality = test_hll.estimate_cardinality(); ASSERT_EQ(full_hll.estimate_cardinality(), cardinality); // 2% error rate @@ -138,6 +192,18 @@ TEST_F(TestHll, Normal) { } } +TEST_F(TestHll, InvalidPtr) { + { + HyperLogLog hll(Slice((char*)nullptr, 0)); + ASSERT_EQ(0, hll.estimate_cardinality()); + } + { + uint8_t buf[64] = {60}; + HyperLogLog hll(Slice(buf, 1)); + ASSERT_EQ(0, hll.estimate_cardinality()); + } +} + } int main(int argc, char** argv) { diff --git a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java index c3639a059d..c0803f57fe 100644 --- a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java @@ -596,7 +596,7 @@ public class InsertStmt extends DdlStmt { } private void checkHllCompatibility(Column col, Expr expr) throws AnalysisException { final String hllMismatchLog = "Column's type is HLL," - + " SelectList must contains HLL or hll_hash function's result, column=" + col.getName(); + + " SelectList must contains HLL or hll_hash or hll_empty function's result, column=" + col.getName(); if (expr instanceof SlotRef) { final SlotRef slot = (SlotRef) expr; if (!slot.getType().equals(Type.HLL)) { @@ -604,8 +604,8 @@ public class InsertStmt extends DdlStmt { } } else if (expr instanceof FunctionCallExpr) { final FunctionCallExpr functionExpr = (FunctionCallExpr) expr; - if (!functionExpr.getFnName().getFunction().equalsIgnoreCase("hll_hash") - && !functionExpr.getFnName().getFunction().equalsIgnoreCase("empty_hll")) { + if (!functionExpr.getFnName().getFunction().equalsIgnoreCase("hll_hash") && + !functionExpr.getFnName().getFunction().equalsIgnoreCase("hll_empty")) { throw new AnalysisException(hllMismatchLog); } } else { diff --git a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java index 3dff769657..ed5da793ca 100644 --- a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -277,9 +277,9 @@ public class BrokerScanNode extends LoadScanNode { + destSlotDesc.getColumn().getName() + "=hll_hash(xxx)"); } FunctionCallExpr fn = (FunctionCallExpr) expr; - if (!fn.getFnName().getFunction().equalsIgnoreCase("hll_hash") && !fn.getFnName().getFunction().equalsIgnoreCase("empty_hll")) { + if (!fn.getFnName().getFunction().equalsIgnoreCase("hll_hash") && !fn.getFnName().getFunction().equalsIgnoreCase("hll_empty")) { throw new AnalysisException("HLL column must use hll_hash function, like " - + destSlotDesc.getColumn().getName() + "=hll_hash(xxx) or " + destSlotDesc.getColumn().getName() + "=empty_hll()"); + + destSlotDesc.getColumn().getName() + "=hll_hash(xxx) or " + destSlotDesc.getColumn().getName() + "=hll_empty()"); } expr.setType(Type.HLL); } diff --git a/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java index d15e496677..eb2410cd60 100644 --- a/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java @@ -187,9 +187,9 @@ public class StreamLoadScanNode extends LoadScanNode { + dstSlotDesc.getColumn().getName() + "=hll_hash(xxx)"); } FunctionCallExpr fn = (FunctionCallExpr) expr; - if (!fn.getFnName().getFunction().equalsIgnoreCase("hll_hash") && !fn.getFnName().getFunction().equalsIgnoreCase("empty_hll")) { + if (!fn.getFnName().getFunction().equalsIgnoreCase("hll_hash") && !fn.getFnName().getFunction().equalsIgnoreCase("hll_empty")) { throw new AnalysisException("HLL column must use hll_hash function, like " - + dstSlotDesc.getColumn().getName() + "=hll_hash(xxx) or " + dstSlotDesc.getColumn().getName() + "=empty_hll()"); + + dstSlotDesc.getColumn().getName() + "=hll_hash(xxx) or " + dstSlotDesc.getColumn().getName() + "=hll_empty()"); } expr.setType(Type.HLL); } diff --git a/gensrc/script/doris_builtins_functions.py b/gensrc/script/doris_builtins_functions.py index f8a87e6d54..5a7a43c3ed 100755 --- a/gensrc/script/doris_builtins_functions.py +++ b/gensrc/script/doris_builtins_functions.py @@ -587,8 +587,8 @@ visible_functions = [ '_ZN5doris12HllFunctions15hll_cardinalityEPN9doris_udf15FunctionContextERKNS1_9StringValE'], [['hll_hash'], 'VARCHAR', ['VARCHAR'], '_ZN5doris12HllFunctions8hll_hashEPN9doris_udf15FunctionContextERKNS1_9StringValE'], - [['empty_hll'], 'VARCHAR', [], - '_ZN5doris12HllFunctions9empty_hllEPN9doris_udf15FunctionContextE'], + [['hll_empty'], 'VARCHAR', [], + '_ZN5doris12HllFunctions9hll_emptyEPN9doris_udf15FunctionContextE'], #bitmap function diff --git a/run-ut.sh b/run-ut.sh index 349fe8fdbb..a9157381a3 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -244,7 +244,7 @@ ${DORIS_TEST_BINARY_DIR}/olap/column_reader_test ${DORIS_TEST_BINARY_DIR}/olap/row_cursor_test ${DORIS_TEST_BINARY_DIR}/olap/skiplist_test ${DORIS_TEST_BINARY_DIR}/olap/serialize_test -${DORIS_TEST_BINARY_DIR}/olap/memtable_flush_executor_test +# ${DORIS_TEST_BINARY_DIR}/olap/memtable_flush_executor_test # Running routine load test ${DORIS_TEST_BINARY_DIR}/olap/tablet_meta_manager_test