[Bug][Vectorized] fix core dump with HLL and some refactor of Decompressor (#8668)

This commit is contained in:
Pxl
2022-03-31 17:05:08 +08:00
committed by GitHub
parent e684ffa6f5
commit 01cc0573aa
16 changed files with 102 additions and 43 deletions

View File

@ -52,6 +52,7 @@ header:
- 'be/src/util/sse2neon.h'
- 'be/src/util/utf8_check.cpp'
- 'build-support/run_clang_format.py'
- 'regression-test/common'
- 'regression-test/suites'
- 'regression-test/data'

View File

@ -55,8 +55,6 @@ Status Decompressor::create_decompressor(CompressType type, Decompressor** decom
return st;
}
Decompressor::~Decompressor() {}
// Returns the fixed label "Decompressor" describing the base class.
std::string Decompressor::debug_info() {
return "Decompressor";
}
@ -71,7 +69,7 @@ GzipDecompressor::~GzipDecompressor() {
}
Status GzipDecompressor::init() {
_z_strm = {0};
_z_strm = {nullptr};
_z_strm.zalloc = Z_NULL;
_z_strm.zfree = Z_NULL;
_z_strm.opaque = Z_NULL;

View File

@ -34,7 +34,7 @@ enum CompressType { UNCOMPRESSED, GZIP, DEFLATE, BZIP2, LZ4FRAME, LZOP };
class Decompressor {
public:
virtual ~Decompressor();
virtual ~Decompressor() = default;
// implement in derived class
// input(in): buf where decompress begin
@ -71,19 +71,18 @@ protected:
class GzipDecompressor : public Decompressor {
public:
virtual ~GzipDecompressor();
~GzipDecompressor() override;
virtual Status decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read,
uint8_t* output, size_t output_max_len, size_t* decompressed_len,
bool* stream_end, size_t* more_input_bytes,
size_t* more_output_bytes) override;
Status decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read, uint8_t* output,
size_t output_max_len, size_t* decompressed_len, bool* stream_end,
size_t* more_input_bytes, size_t* more_output_bytes) override;
virtual std::string debug_info() override;
std::string debug_info() override;
private:
friend class Decompressor;
GzipDecompressor(bool is_deflate);
virtual Status init() override;
Status init() override;
private:
bool _is_deflate;
@ -97,19 +96,18 @@ private:
class Bzip2Decompressor : public Decompressor {
public:
virtual ~Bzip2Decompressor();
~Bzip2Decompressor() override;
virtual Status decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read,
uint8_t* output, size_t output_max_len, size_t* decompressed_len,
bool* stream_end, size_t* more_input_bytes,
size_t* more_output_bytes) override;
Status decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read, uint8_t* output,
size_t output_max_len, size_t* decompressed_len, bool* stream_end,
size_t* more_input_bytes, size_t* more_output_bytes) override;
virtual std::string debug_info() override;
std::string debug_info() override;
private:
friend class Decompressor;
Bzip2Decompressor() : Decompressor(CompressType::BZIP2) {}
virtual Status init() override;
Status init() override;
private:
bz_stream _bz_strm;
@ -117,19 +115,18 @@ private:
class Lz4FrameDecompressor : public Decompressor {
public:
virtual ~Lz4FrameDecompressor();
~Lz4FrameDecompressor() override;
virtual Status decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read,
uint8_t* output, size_t output_max_len, size_t* decompressed_len,
bool* stream_end, size_t* more_input_bytes,
size_t* more_output_bytes) override;
Status decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read, uint8_t* output,
size_t output_max_len, size_t* decompressed_len, bool* stream_end,
size_t* more_input_bytes, size_t* more_output_bytes) override;
virtual std::string debug_info() override;
std::string debug_info() override;
private:
friend class Decompressor;
Lz4FrameDecompressor() : Decompressor(CompressType::LZ4FRAME) {}
virtual Status init() override;
Status init() override;
size_t get_block_size(const LZ4F_frameInfo_t* info);
@ -142,20 +139,19 @@ private:
#ifdef DORIS_WITH_LZO
class LzopDecompressor : public Decompressor {
public:
virtual ~LzopDecompressor();
~LzopDecompressor() override = default;
virtual Status decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read,
uint8_t* output, size_t output_max_len, size_t* decompressed_len,
bool* stream_end, size_t* more_input_bytes,
size_t* more_output_bytes) override;
Status decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read, uint8_t* output,
size_t output_max_len, size_t* decompressed_len, bool* stream_end,
size_t* more_input_bytes, size_t* more_output_bytes) override;
virtual std::string debug_info() override;
std::string debug_info() override;
private:
friend class Decompressor;
LzopDecompressor()
: Decompressor(CompressType::LZOP), _header_info({0}), _is_header_loaded(false) {}
virtual Status init() override;
Status init() override;
private:
enum LzoChecksum { CHECK_NONE, CHECK_CRC32, CHECK_ADLER };

View File

@ -24,7 +24,7 @@ namespace doris {
// This class is used for CSV scanner, to read content line by line
class LineReader {
public:
virtual ~LineReader() {}
virtual ~LineReader() = default;
virtual Status read_line(const uint8_t** ptr, size_t* size, bool* eof) = 0;
virtual void close() = 0;

View File

@ -48,8 +48,6 @@ const uint64_t LzopDecompressor::F_ADLER32_C = 0x00000002L;
const uint64_t LzopDecompressor::F_CRC32_D = 0x00000100L;
const uint64_t LzopDecompressor::F_ADLER32_D = 0x00000001L;
LzopDecompressor::~LzopDecompressor() {}
// Performs no setup work here and always reports Status::OK().
Status LzopDecompressor::init() {
return Status::OK();
}

View File

@ -32,18 +32,18 @@ public:
Decompressor* decompressor, size_t length,
const std::string& line_delimiter, size_t line_delimiter_length);
virtual ~PlainTextLineReader();
~PlainTextLineReader() override;
virtual Status read_line(const uint8_t** ptr, size_t* size, bool* eof) override;
Status read_line(const uint8_t** ptr, size_t* size, bool* eof) override;
virtual void close() override;
void close() override;
private:
bool update_eof();
inline size_t output_buf_read_remaining() { return _output_buf_limit - _output_buf_pos; }
inline size_t output_buf_read_remaining() const { return _output_buf_limit - _output_buf_pos; }
inline size_t input_buf_read_remaining() { return _input_buf_limit - _input_buf_pos; }
inline size_t input_buf_read_remaining() const { return _input_buf_limit - _input_buf_pos; }
inline bool done() { return _file_eof && output_buf_read_remaining() == 0; }

View File

@ -20,6 +20,8 @@
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_hll.h"
namespace doris::vectorized {
DataTypePtr DataTypeFactory::create_data_type(const doris::Field& col_desc) {
@ -239,6 +241,9 @@ DataTypePtr DataTypeFactory::create_data_type(const PColumnMeta& pcolumn) {
case PGenericType::BITMAP:
nested = std::make_shared<DataTypeBitMap>();
break;
case PGenericType::HLL:
nested = std::make_shared<DataTypeHLL>();
break;
case PGenericType::LIST:
DCHECK(pcolumn.children_size() == 1);
nested = std::make_shared<DataTypeArray>(create_data_type(pcolumn.children(0)));

View File

@ -0,0 +1,4 @@
-- Seed rows for bitmap_basic_agg:
--   key 1: one row with bitmap_empty()
--   key 2: two rows, both bitmap_hash(0)
--   key 3: two rows, bitmap_hash(0) and bitmap_hash(1)
insert into bitmap_basic_agg values
(1,bitmap_empty()),
(2,bitmap_hash(0)),(2,bitmap_hash(0)),
(3,bitmap_hash(0)),(3,bitmap_hash(1));

View File

@ -0,0 +1,4 @@
-- Seed rows for hll_basic_agg:
--   key 1: one row with hll_empty()
--   key 2: two rows, both hll_hash(0)
--   key 3: two rows, hll_hash(0) and hll_hash(1)
insert into hll_basic_agg values
(1,hll_empty()),
(2,hll_hash(0)),(2,hll_hash(0)),
(3,hll_hash(0)),(3,hll_hash(1));

View File

@ -0,0 +1,6 @@
-- Aggregate-key table: k1 is the key, k2 is a bitmap column declared with
-- BITMAP_UNION. Hash-distributed on k1 into a single bucket, with
-- replication_num set to 1.
create TABLE `bitmap_basic_agg` (
`k1` int(11) NULL,
`k2` bitmap BITMAP_UNION NULL
)AGGREGATE KEY(`k1`)
DISTRIBUTED BY HASH(`k1`) BUCKETS 1
PROPERTIES("replication_num" = "1");

View File

@ -0,0 +1,6 @@
-- Aggregate-key table: k1 is the key, k2 is an hll column declared with
-- HLL_UNION. Hash-distributed on k1 into a single bucket, with
-- replication_num set to 1.
create TABLE `hll_basic_agg` (
`k1` int(11) NULL,
`k2` hll HLL_UNION NULL
)AGGREGATE KEY(`k1`)
DISTRIBUTED BY HASH(`k1`) BUCKETS 1
PROPERTIES("replication_num" = "1");

View File

@ -73,3 +73,4 @@ TESTING AGAIN
-- !aggregate --
9223845.04 1607.2585798911111

View File

@ -8,3 +8,4 @@
0 abc
1 123
2 \N

View File

@ -1,7 +1,7 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !diffrent_tag1 --
100
-- !diffrent_tag2 --
100
-- !diffrent_tag1 --
100

View File

@ -0,0 +1,11 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql_bitmap --
1 \N
2 \N
3 \N
-- !sql_hll --
1 \N
2 \N
3 \N

View File

@ -0,0 +1,28 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Tables under test; each one has a table-definition file and a load file
// named after it.
def tables=["bitmap_basic_agg","hll_basic_agg"]
for (String table in tables) {
// Drop any existing copy of the table before recreating it.
sql """drop table if exists ${table};"""
// Run the SQL text read from the table-definition file, then from the load file.
// NOTE(review): both paths are relative, so this presumably depends on the
// process working directory being the repository root -- confirm.
sql new File("""regression-test/common/table/${table}.sql""").text
sql new File("""regression-test/common/load/${table}.sql""").text
}
// Select every row from both tables under the tags sql_bitmap and sql_hll.
// NOTE(review): the queries have no ORDER BY; confirm the returned row order
// is stable enough for a recorded-output comparison.
qt_sql_bitmap """select * from bitmap_basic_agg;"""
qt_sql_hll """select * from hll_basic_agg;"""