Make HLL be able to handle invalid data (#1908)

In this change list
1. Validate HLL columns when loading data; if the data is invalid, the row
is filtered out.
2. Treat an HLL with an invalid type as an empty HLL when serializing. With
this change, all ingested data will be valid.
3. Treat nullptr or an invalid type of HLL data as an empty HLL when deserializing.
With this change, dirty data can be handled normally.
4. rename function empty_hll to hll_empty.
5. Disable memtable_flush_executor_test because it fails
intermittently: when tearing down, some threads are not joined, and they may
access already-destroyed resources, which is invalid.
This commit is contained in:
ZHAO Chun
2019-09-29 10:55:23 +08:00
committed by GitHub
parent 58f1d79597
commit 8f016d3ab2
14 changed files with 226 additions and 56 deletions

View File

@ -25,6 +25,7 @@
#include "runtime/runtime_state.h"
#include "runtime/tuple_row.h"
#include "olap/hll.h"
#include "util/brpc_stub_cache.h"
#include "util/uid_util.h"
#include "service/brpc.h"
@ -492,6 +493,7 @@ Status OlapTableSink::prepare(RuntimeState* state) {
case TYPE_VARCHAR:
case TYPE_DATE:
case TYPE_DATETIME:
case TYPE_HLL:
_need_validate_data = true;
break;
default:
@ -698,7 +700,7 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap*
bool row_valid = true;
for (int i = 0; row_valid && i < _output_tuple_desc->slots().size(); ++i) {
SlotDescriptor* desc = _output_tuple_desc->slots()[i];
if (tuple->is_null(desc->null_indicator_offset())) {
if (desc->is_nullable() && tuple->is_null(desc->null_indicator_offset())) {
continue;
}
void* slot = tuple->get_slot(desc->tuple_offset());
@ -838,6 +840,24 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap*
continue;
}
}
case TYPE_HLL: {
Slice* hll_val = (Slice*)slot;
if (!HyperLogLog::is_valid(*hll_val)) {
std::stringstream ss;
ss << "Content of HLL type column is invalid"
<< "column_name: " << desc->col_name() << "; ";
#if BE_TEST
LOG(INFO) << ss.str();
#else
state->append_error_msg_to_file("", ss.str());
#endif
filtered_rows++;
row_valid = false;
filter_bitmap->Set(row_no, true);
continue;
}
break;
}
default:
break;
}

View File

@ -19,6 +19,7 @@
#include "exprs/anyval_util.h"
#include "util/hash_util.hpp"
#include "util/slice.h"
namespace doris {
@ -29,17 +30,14 @@ void HllFunctions::init() {
}
StringVal HllFunctions::hll_hash(FunctionContext* ctx, const StringVal& input) {
std::string buf;
HyperLogLog hll;
if (!input.is_null) {
uint64_t hash_value = HashUtil::murmur_hash64A(input.ptr, input.len, HashUtil::MURMUR_SEED);
HyperLogLog hll(hash_value);
buf.resize(HLL_SINGLE_VALUE_SIZE);
hll.serialize((uint8_t*)buf.c_str());
} else {
HyperLogLog hll;
buf.resize(HLL_EMPTY_SIZE);
hll.serialize((uint8_t*)buf.c_str());
hll.update(hash_value);
}
std::string buf;
buf.resize(hll.max_serialized_size());
buf.resize(hll.serialize((uint8_t*)buf.c_str()));
return AnyValUtil::from_string_temp(ctx, buf);
}
@ -48,7 +46,7 @@ void HllFunctions::hll_init(FunctionContext *, StringVal* dst) {
dst->len = sizeof(HyperLogLog);
dst->ptr = (uint8_t*)new HyperLogLog();
}
StringVal HllFunctions::empty_hll(FunctionContext* ctx) {
StringVal HllFunctions::hll_empty(FunctionContext* ctx) {
return AnyValUtil::from_string_temp(ctx, HyperLogLog::empty());
}
@ -65,13 +63,13 @@ void HllFunctions::hll_update(FunctionContext *, const T &src, StringVal* dst) {
}
}
void HllFunctions::hll_merge(FunctionContext*, const StringVal &src, StringVal* dst) {
void HllFunctions::hll_merge(FunctionContext*, const StringVal& src, StringVal* dst) {
auto* dst_hll = reinterpret_cast<HyperLogLog*>(dst->ptr);
// zero size means the src input is a agg object
if (src.len == 0) {
dst_hll->merge(*reinterpret_cast<HyperLogLog*>(src.ptr));
} else {
dst_hll->merge(HyperLogLog(src.ptr));
dst_hll->merge(HyperLogLog(Slice(src.ptr, src.len)));
}
}
@ -94,7 +92,7 @@ BigIntVal HllFunctions::hll_cardinality(FunctionContext* ctx, const StringVal& i
StringVal HllFunctions::hll_serialize(FunctionContext *ctx, const StringVal &src) {
auto* src_hll = reinterpret_cast<HyperLogLog*>(src.ptr);
StringVal result(ctx, HLL_COLUMN_DEFAULT_LEN);
StringVal result(ctx, src_hll->max_serialized_size());
int size = src_hll->serialize((uint8_t*)result.ptr);
result.resize(ctx, size);
delete src_hll;

View File

@ -26,7 +26,7 @@ class HllFunctions {
public:
static void init();
static StringVal hll_hash(FunctionContext* ctx, const StringVal& dest_base);
static StringVal empty_hll(FunctionContext* ctx);
static StringVal hll_empty(FunctionContext* ctx);
static void hll_init(FunctionContext*, StringVal* dst);
template <typename T>

View File

@ -27,17 +27,14 @@ void HllHashFunctions::init() {
}
StringVal HllHashFunctions::hll_hash(FunctionContext* ctx, const StringVal& input) {
std::string buf;
HyperLogLog hll;
if (!input.is_null) {
uint64_t hash_value = HashUtil::murmur_hash64A(input.ptr, input.len, HashUtil::MURMUR_SEED);
HyperLogLog hll(hash_value);
buf.resize(HLL_SINGLE_VALUE_SIZE);
hll.serialize((uint8_t*)buf.c_str());
} else {
HyperLogLog hll;
buf.resize(HLL_EMPTY_SIZE);
hll.serialize((uint8_t*)buf.c_str());
hll.update(hash_value);
}
std::string buf;
buf.resize(hll.max_serialized_size());
buf.resize(hll.serialize((uint8_t*)buf.data()));
return AnyValUtil::from_string_temp(ctx, buf);
}

View File

@ -409,7 +409,7 @@ struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_HLL_UNION, OLAP_FIELD_TYPE_HLL
// we use a zero size to indicate that this slice is an agg object
dst_slice->size = 0;
auto* hll = new HyperLogLog((const uint8_t*) src_slice->data);
auto* hll = new HyperLogLog(*src_slice);
dst_slice->data = reinterpret_cast<char*>(hll);
mem_pool->mem_tracker()->consume(sizeof(HyperLogLog));
@ -426,7 +426,7 @@ struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_HLL_UNION, OLAP_FIELD_TYPE_HLL
// fixme(kks): trick here, need improve
if (mem_pool == nullptr) { // for query
HyperLogLog src_hll((const uint8_t*)src_slice->data);
HyperLogLog src_hll(*src_slice);
dst_hll->merge(src_hll);
} else { // for stream load
auto* src_hll = reinterpret_cast<HyperLogLog*>(src_slice->data);
@ -439,7 +439,7 @@ struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_HLL_UNION, OLAP_FIELD_TYPE_HLL
auto *slice = reinterpret_cast<Slice*>(src->mutable_cell_ptr());
auto *hll = reinterpret_cast<HyperLogLog*>(slice->data);
slice->data = (char*)mem_pool->allocate(HLL_COLUMN_DEFAULT_LEN);
slice->data = (char*)mem_pool->allocate(hll->max_serialized_size());
slice->size = hll->serialize((uint8_t*)slice->data);
}
};

View File

@ -21,6 +21,7 @@
#include <map>
#include "common/logging.h"
#include "runtime/string_value.h"
#include "util/coding.h"
using std::map;
@ -30,9 +31,11 @@ using std::stringstream;
namespace doris {
// TODO(zc): we should check if src is valid, it maybe corrupted
HyperLogLog::HyperLogLog(const uint8_t* src) {
deserialize(src);
// Build a HyperLogLog from its serialized form. If `src` cannot be
// deserialized, the object is left as an empty HyperLogLog.
HyperLogLog::HyperLogLog(const Slice& src) {
    const bool parsed = deserialize(src);
    if (parsed) {
        return;
    }
    // Deserialization failed: fall back to the empty state.
    _type = HLL_DATA_EMPTY;
}
// Convert explicit values to register format, and clear explicit values.
@ -137,11 +140,27 @@ void HyperLogLog::merge(const HyperLogLog& other) {
}
}
int HyperLogLog::serialize(uint8_t* dest) {
uint8_t* ptr = dest;
size_t HyperLogLog::max_serialized_size() const {
switch (_type) {
case HLL_DATA_EMPTY: {
*ptr++ = _type;
case HLL_DATA_EMPTY:
default:
return 1;
case HLL_DATA_EXPLICIT:
return 2 + _hash_set.size() * 8;
case HLL_DATA_SPRASE:
case HLL_DATA_FULL:
return 1 + HLL_REGISTERS_COUNT;
}
}
size_t HyperLogLog::serialize(uint8_t* dst) const {
uint8_t* ptr = dst;
switch (_type) {
case HLL_DATA_EMPTY:
default: {
// When the _type is unknown, which may not happen, we encode it as
// Empty HyperLogLog object.
*ptr++ = HLL_DATA_EMPTY;
break;
}
case HLL_DATA_EXPLICIT: {
@ -191,14 +210,65 @@ int HyperLogLog::serialize(uint8_t* dest) {
break;
}
}
return ptr - dest;
return ptr - dst;
}
// Check whether `slice` has the exact length implied by its encoded header.
//
// The first byte is the HLL type. Depending on the type, the remaining bytes
// must be:
//   EMPTY    : nothing
//   EXPLICIT : 1-byte count N, then N * 8 bytes
//   SPRASE   : 4-byte little-endian count N, then N * 3 bytes
//   FULL     : HLL_REGISTERS_COUNT bytes
// Any other type, a null data pointer, or a length mismatch is invalid.
//
// Only the header and the total length are inspected; the payload itself is
// not scanned, so the cost does not depend on the slice size.
bool HyperLogLog::is_valid(const Slice& slice) {
    if (slice.data == nullptr || slice.size < 1) {
        return false;
    }

    const uint8_t* ptr = (const uint8_t*)slice.data;
    // Number of bytes following the type byte. All checks below compare
    // lengths instead of advancing a pointer, so no out-of-range pointer is
    // ever formed and no 32-bit multiplication can wrap around.
    const size_t remaining = slice.size - 1;

    auto type = (HllDataType)*ptr++;
    switch (type) {
    case HLL_DATA_EMPTY:
        return remaining == 0;
    case HLL_DATA_EXPLICIT: {
        if (remaining < 1) {
            return false;
        }
        const size_t num_explicits = *ptr;
        return remaining - 1 == num_explicits * 8;
    }
    case HLL_DATA_SPRASE: {
        if (remaining < 4) {
            return false;
        }
        // Widen to 64 bits before multiplying: in 32-bit arithmetic
        // 3 * num_registers can wrap, letting a huge count pass the check.
        const uint64_t num_registers = decode_fixed32_le(ptr);
        return static_cast<uint64_t>(remaining - 4) == 3 * num_registers;
    }
    case HLL_DATA_FULL:
        return remaining == static_cast<size_t>(HLL_REGISTERS_COUNT);
    default:
        return false;
    }
}
// TODO(zc): check input string's length
bool HyperLogLog::deserialize(const uint8_t* ptr) {
bool HyperLogLog::deserialize(const Slice& slice) {
// can be called only when type is empty
DCHECK(_type == HLL_DATA_EMPTY);
// NOTE(zc): Don't remove this check unless you know what
// you are doing. Because of a historical bug, some invalid
// HLL data whose ptr is nullptr was ingested into storage.
// We must handle this case to avoid a process crash.
// The bug is in release 0.10; I think this check can be removed
// in release 0.12 or later.
if (slice.data == nullptr || slice.size <= 0) {
return false;
}
// check if input length is valid
if (!is_valid(slice)) {
return false;
}
const uint8_t* ptr = (uint8_t*)slice.data;
// first byte : type
_type = (HllDataType)*ptr++;
switch (_type) {
@ -238,12 +308,14 @@ bool HyperLogLog::deserialize(const uint8_t* ptr) {
break;
}
default:
// revert type to EMPTY
_type = HLL_DATA_EMPTY;
return false;
}
return true;
}
int64_t HyperLogLog::estimate_cardinality() {
int64_t HyperLogLog::estimate_cardinality() const {
if (_type == HLL_DATA_EMPTY) {
return 0;
}

View File

@ -27,6 +27,8 @@
namespace doris {
class Slice;
const static int HLL_COLUMN_PRECISION = 14;
const static int HLL_ZERO_COUNT_BITS = (64 - HLL_COLUMN_PRECISION);
const static int HLL_EXPLICLIT_INT64_NUM = 160;
@ -81,7 +83,8 @@ public:
explicit HyperLogLog(uint64_t hash_value): _type(HLL_DATA_EXPLICIT) {
_hash_set.emplace(hash_value);
}
explicit HyperLogLog(const uint8_t* src);
explicit HyperLogLog(const Slice& src);
~HyperLogLog() {
delete[] _registers;
@ -98,10 +101,19 @@ public:
void merge(const HyperLogLog& other);
int serialize(uint8_t* dest);
bool deserialize(const uint8_t* ptr);
// Return max size of serialized binary
size_t max_serialized_size() const;
int64_t estimate_cardinality();
// The dst buffer must have enough capacity for the serialized form; the
// required capacity can be obtained through max_serialized_size(). If an
// insufficient buffer is given, this may crash the process.
// Returns the actual size of the serialized binary.
size_t serialize(uint8_t* dst) const;
// Currently, this function may only be called on an empty HLL.
bool deserialize(const Slice& slice);
int64_t estimate_cardinality() const;
static std::string empty() {
static HyperLogLog hll;
@ -111,6 +123,11 @@ public:
return buf;
}
// Check if the input slice is a valid serialized binary of HyperLogLog.
// This function only inspects the encoded header (type and element count)
// and the slice length, so its complexity is O(1).
static bool is_valid(const Slice& slice);
// only for debug
std::string to_string() {
switch (_type) {

View File

@ -58,7 +58,7 @@ TEST_F(HllFunctionsTest, hll_hash) {
StringVal input = AnyValUtil::from_string_temp(ctx, std::string("1024"));
StringVal result = HllFunctions::hll_hash(ctx, input);
HyperLogLog hll((uint8_t*)result.ptr);
HyperLogLog hll(Slice(result.ptr, result.len));
int64_t cardinality = hll.estimate_cardinality();
int64_t expected = 1;
@ -69,7 +69,7 @@ TEST_F(HllFunctionsTest, hll_hash_null) {
StringVal input = StringVal::null();
StringVal result = HllFunctions::hll_hash(ctx, input);
HyperLogLog hll((uint8_t*)result.ptr);
HyperLogLog hll(Slice(result.ptr, result.len));
int64_t cardinality = hll.estimate_cardinality();
int64_t expected = 0;
@ -102,7 +102,7 @@ TEST_F(HllFunctionsTest, hll_merge) {
HllFunctions::hll_merge(ctx, src2, &dst);
StringVal serialized = HllFunctions::hll_serialize(ctx, dst);
HyperLogLog hll((uint8_t*)serialized.ptr);
HyperLogLog hll(Slice(serialized.ptr, serialized.len));
BigIntVal expected(1);
ASSERT_EQ(expected, hll.estimate_cardinality());

View File

@ -20,6 +20,7 @@
#include <gtest/gtest.h>
#include "util/hash_util.hpp"
#include "util/slice.h"
namespace doris {
@ -34,13 +35,36 @@ static uint64_t hash(uint64_t value) {
TEST_F(TestHll, Normal) {
uint8_t buf[HLL_REGISTERS_COUNT + 1];
// empty
{
Slice str((char*)buf, 0);
ASSERT_FALSE(HyperLogLog::is_valid(str));
}
// check unknown type
{
buf[0] = 60;
Slice str((char*)buf, 1);
ASSERT_FALSE(HyperLogLog::is_valid(str));
}
// empty
{
HyperLogLog empty_hll;
int len = empty_hll.serialize(buf);
ASSERT_EQ(1, len);
HyperLogLog test_hll(buf);
HyperLogLog test_hll(Slice((char*)buf, len));
ASSERT_EQ(0, test_hll.estimate_cardinality());
// check serialize
{
Slice str((char*)buf, len);
ASSERT_TRUE(HyperLogLog::is_valid(str));
}
{
Slice str((char*)buf, len + 1);
ASSERT_FALSE(HyperLogLog::is_valid(str));
}
}
// explicit [0. 100)
HyperLogLog explicit_hll;
@ -51,7 +75,17 @@ TEST_F(TestHll, Normal) {
int len = explicit_hll.serialize(buf);
ASSERT_EQ(1 + 1 + 100 * 8, len);
HyperLogLog test_hll(buf);
// check serialize
{
Slice str((char*)buf, len);
ASSERT_TRUE(HyperLogLog::is_valid(str));
}
{
Slice str((char*)buf, 1);
ASSERT_FALSE(HyperLogLog::is_valid(str));
}
HyperLogLog test_hll(Slice((char*)buf, len));
test_hll.update(hash(0));
{
HyperLogLog other_hll;
@ -71,7 +105,17 @@ TEST_F(TestHll, Normal) {
int len = sparse_hll.serialize(buf);
ASSERT_TRUE(len < HLL_REGISTERS_COUNT + 1);
HyperLogLog test_hll(buf);
// check serialize
{
Slice str((char*)buf, len);
ASSERT_TRUE(HyperLogLog::is_valid(str));
}
{
Slice str((char*)buf, 1 + 3);
ASSERT_FALSE(HyperLogLog::is_valid(str));
}
HyperLogLog test_hll(Slice((char*)buf, len));
test_hll.update(hash(1024));
{
HyperLogLog other_hll;
@ -94,7 +138,17 @@ TEST_F(TestHll, Normal) {
int len = full_hll.serialize(buf);
ASSERT_EQ(HLL_REGISTERS_COUNT + 1, len);
HyperLogLog test_hll(buf);
// check serialize
{
Slice str((char*)buf, len);
ASSERT_TRUE(HyperLogLog::is_valid(str));
}
{
Slice str((char*)buf, len + 1);
ASSERT_FALSE(HyperLogLog::is_valid(str));
}
HyperLogLog test_hll(Slice((char*)buf, len));
auto cardinality = test_hll.estimate_cardinality();
ASSERT_EQ(full_hll.estimate_cardinality(), cardinality);
// 2% error rate
@ -138,6 +192,18 @@ TEST_F(TestHll, Normal) {
}
}
// Constructing a HyperLogLog from unusable input must not crash and must
// report a cardinality of zero.
TEST_F(TestHll, InvalidPtr) {
    // Null data pointer with zero length.
    {
        Slice null_slice((char*)nullptr, 0);
        HyperLogLog from_null(null_slice);
        ASSERT_EQ(0, from_null.estimate_cardinality());
    }
    // One-byte input whose type byte (60) is not a known HLL type.
    {
        uint8_t raw[64] = {60};
        Slice bad_type(raw, 1);
        HyperLogLog from_bad_type(bad_type);
        ASSERT_EQ(0, from_bad_type.estimate_cardinality());
    }
}
}
int main(int argc, char** argv) {

View File

@ -596,7 +596,7 @@ public class InsertStmt extends DdlStmt {
}
private void checkHllCompatibility(Column col, Expr expr) throws AnalysisException {
final String hllMismatchLog = "Column's type is HLL,"
+ " SelectList must contains HLL or hll_hash function's result, column=" + col.getName();
+ " SelectList must contains HLL or hll_hash or hll_empty function's result, column=" + col.getName();
if (expr instanceof SlotRef) {
final SlotRef slot = (SlotRef) expr;
if (!slot.getType().equals(Type.HLL)) {
@ -604,8 +604,8 @@ public class InsertStmt extends DdlStmt {
}
} else if (expr instanceof FunctionCallExpr) {
final FunctionCallExpr functionExpr = (FunctionCallExpr) expr;
if (!functionExpr.getFnName().getFunction().equalsIgnoreCase("hll_hash")
&& !functionExpr.getFnName().getFunction().equalsIgnoreCase("empty_hll")) {
if (!functionExpr.getFnName().getFunction().equalsIgnoreCase("hll_hash") &&
!functionExpr.getFnName().getFunction().equalsIgnoreCase("hll_empty")) {
throw new AnalysisException(hllMismatchLog);
}
} else {

View File

@ -277,9 +277,9 @@ public class BrokerScanNode extends LoadScanNode {
+ destSlotDesc.getColumn().getName() + "=hll_hash(xxx)");
}
FunctionCallExpr fn = (FunctionCallExpr) expr;
if (!fn.getFnName().getFunction().equalsIgnoreCase("hll_hash") && !fn.getFnName().getFunction().equalsIgnoreCase("empty_hll")) {
if (!fn.getFnName().getFunction().equalsIgnoreCase("hll_hash") && !fn.getFnName().getFunction().equalsIgnoreCase("hll_empty")) {
throw new AnalysisException("HLL column must use hll_hash function, like "
+ destSlotDesc.getColumn().getName() + "=hll_hash(xxx) or " + destSlotDesc.getColumn().getName() + "=empty_hll()");
+ destSlotDesc.getColumn().getName() + "=hll_hash(xxx) or " + destSlotDesc.getColumn().getName() + "=hll_empty()");
}
expr.setType(Type.HLL);
}

View File

@ -187,9 +187,9 @@ public class StreamLoadScanNode extends LoadScanNode {
+ dstSlotDesc.getColumn().getName() + "=hll_hash(xxx)");
}
FunctionCallExpr fn = (FunctionCallExpr) expr;
if (!fn.getFnName().getFunction().equalsIgnoreCase("hll_hash") && !fn.getFnName().getFunction().equalsIgnoreCase("empty_hll")) {
if (!fn.getFnName().getFunction().equalsIgnoreCase("hll_hash") && !fn.getFnName().getFunction().equalsIgnoreCase("hll_empty")) {
throw new AnalysisException("HLL column must use hll_hash function, like "
+ dstSlotDesc.getColumn().getName() + "=hll_hash(xxx) or " + dstSlotDesc.getColumn().getName() + "=empty_hll()");
+ dstSlotDesc.getColumn().getName() + "=hll_hash(xxx) or " + dstSlotDesc.getColumn().getName() + "=hll_empty()");
}
expr.setType(Type.HLL);
}

View File

@ -587,8 +587,8 @@ visible_functions = [
'_ZN5doris12HllFunctions15hll_cardinalityEPN9doris_udf15FunctionContextERKNS1_9StringValE'],
[['hll_hash'], 'VARCHAR', ['VARCHAR'],
'_ZN5doris12HllFunctions8hll_hashEPN9doris_udf15FunctionContextERKNS1_9StringValE'],
[['empty_hll'], 'VARCHAR', [],
'_ZN5doris12HllFunctions9empty_hllEPN9doris_udf15FunctionContextE'],
[['hll_empty'], 'VARCHAR', [],
'_ZN5doris12HllFunctions9hll_emptyEPN9doris_udf15FunctionContextE'],
#bitmap function

View File

@ -244,7 +244,7 @@ ${DORIS_TEST_BINARY_DIR}/olap/column_reader_test
${DORIS_TEST_BINARY_DIR}/olap/row_cursor_test
${DORIS_TEST_BINARY_DIR}/olap/skiplist_test
${DORIS_TEST_BINARY_DIR}/olap/serialize_test
${DORIS_TEST_BINARY_DIR}/olap/memtable_flush_executor_test
# ${DORIS_TEST_BINARY_DIR}/olap/memtable_flush_executor_test
# Running routine load test
${DORIS_TEST_BINARY_DIR}/olap/tablet_meta_manager_test