From e4e04e8203480a1157f607ef263ce7744e13d0df Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Fri, 7 Jun 2019 22:26:54 +0800 Subject: [PATCH] Make LZO support optional (#1263) --- README.md | 9 +- be/CMakeLists.txt | 17 +- be/src/exec/CMakeLists.txt | 6 + be/src/exec/decompressor.cpp | 381 +---------------- be/src/exec/decompressor.h | 5 + be/src/exec/lzo_decompressor.cpp | 403 ++++++++++++++++++ be/src/exec/olap_meta_reader.cpp | 1 - be/src/olap/compress.cpp | 2 + be/src/olap/compress.h | 2 + be/src/olap/in_stream.cpp | 4 +- be/src/olap/olap_define.h | 1 + be/src/olap/olap_header.h | 3 - be/src/olap/out_stream.cpp | 2 + be/src/olap/push_handler.cpp | 11 + be/src/olap/segment_reader.cpp | 2 + be/src/olap/utils.cpp | 13 + be/test/exec/CMakeLists.txt | 2 + be/test/olap/column_reader_test.cpp | 4 +- be/test/olap/run_length_byte_test.cpp | 37 +- build.sh | 34 +- .../doris/http/rest/SetConfigAction.java | 3 +- run-ut.sh | 4 +- thirdparty/LICENSE.txt | 66 +++ 23 files changed, 588 insertions(+), 424 deletions(-) create mode 100644 be/src/exec/lzo_decompressor.cpp create mode 100644 thirdparty/LICENSE.txt diff --git a/README.md b/README.md index 909dc74afd..ab9c3d4a41 100644 --- a/README.md +++ b/README.md @@ -115,11 +115,16 @@ sh build.sh After successfully building, it will install binary files in the directory `output/`. -## 5. Reporting Issues +## 5. License Notice + +The licenses of some third-party dependencies are not compatible with the Apache 2.0 License, so you may have to disable +some features of Doris to comply with the Apache 2.0 License. Details can be found in `thirdparty/LICENSE.txt`. + +## 6. Reporting Issues If you find any bugs, please file a [GitHub issue](https://github.com/apache/incubator-doris/issues). -## 6. Links +## 7. 
Links * Doris official site - * User Manual (GitHub Wiki) - diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index 0c0ae3ae7e..b9886571dc 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -152,8 +152,10 @@ set_target_properties(thrift PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/ add_library(thriftnb STATIC IMPORTED) set_target_properties(thriftnb PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libthriftnb.a) -add_library(lzo STATIC IMPORTED) -set_target_properties(lzo PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/liblzo2.a) +if(WITH_LZO) + add_library(lzo STATIC IMPORTED) + set_target_properties(lzo PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/liblzo2.a) +endif() if (WITH_MYSQL) add_library(mysql STATIC IMPORTED) @@ -281,6 +283,10 @@ if (WITH_MYSQL) set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DDORIS_WITH_MYSQL") endif() +if (WITH_LZO) + set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DDORIS_WITH_LZO") +endif() + if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0) set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -faligned-new") endif() @@ -464,7 +470,6 @@ set(DORIS_DEPENDENCIES librdkafka_cpp librdkafka libs2 - lzo snappy ${Boost_LIBRARIES} ${LLVM_MODULE_LIBS} @@ -487,6 +492,12 @@ set(DORIS_DEPENDENCIES ${WL_END_GROUP} ) +if(WITH_LZO) + set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} + lzo + ) +endif() + # Add all external dependencies. They should come after the palo libs. 
# static link gcc's lib set(DORIS_LINK_LIBS ${DORIS_LINK_LIBS} diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 387a61550c..b5ad4d4f1a 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -101,6 +101,12 @@ if (WITH_MYSQL) ) endif() +if (WITH_LZO) + set(EXEC_FILES ${EXEC_FILES} + lzo_decompressor.cpp + ) +endif() + if(EXISTS "${BASE_DIR}/src/exec/kudu_util.cpp") set(EXEC_FILES ${EXEC_FILES} #kudu_scan_node.cpp diff --git a/be/src/exec/decompressor.cpp b/be/src/exec/decompressor.cpp index 7f8eb4283a..0104285417 100644 --- a/be/src/exec/decompressor.cpp +++ b/be/src/exec/decompressor.cpp @@ -37,9 +37,11 @@ Status Decompressor::create_decompressor(CompressType type, case CompressType::LZ4FRAME: *decompressor = new Lz4FrameDecompressor(); break; +#ifdef DORIS_WITH_LZO case CompressType::LZOP: *decompressor = new LzopDecompressor(); break; +#endif default: std::stringstream ss; ss << "Unknown compress type: " << type; @@ -341,383 +343,4 @@ size_t Lz4FrameDecompressor::get_block_size(const LZ4F_frameInfo_t* info) { } } -// Lzop -const uint8_t LzopDecompressor::LZOP_MAGIC[9] = - { 0x89, 0x4c, 0x5a, 0x4f, 0x00, 0x0d, 0x0a, 0x1a, 0x0a }; - -const uint64_t LzopDecompressor::LZOP_VERSION = 0x1030; -const uint64_t LzopDecompressor::MIN_LZO_VERSION = 0x0100; -// magic(9) + ver(2) + lib_ver(2) + ver_needed(2) + method(1) -// + lvl(1) + flags(4) + mode/mtime(12) + filename_len(1) -// without the real file name, extra field and checksum -const uint32_t LzopDecompressor::MIN_HEADER_SIZE = 34; -const uint32_t LzopDecompressor::LZO_MAX_BLOCK_SIZE = (64*1024l*1024l); - -const uint32_t LzopDecompressor::CRC32_INIT_VALUE = 0; -const uint32_t LzopDecompressor::ADLER32_INIT_VALUE = 1; - -const uint64_t LzopDecompressor::F_H_CRC32 = 0x00001000L; -const uint64_t LzopDecompressor::F_MASK = 0x00003FFFL; -const uint64_t LzopDecompressor::F_OS_MASK = 0xff000000L; -const uint64_t LzopDecompressor::F_CS_MASK = 0x00f00000L; -const uint64_t 
LzopDecompressor::F_RESERVED = ((F_MASK | F_OS_MASK | F_CS_MASK) ^ 0xffffffffL); -const uint64_t LzopDecompressor::F_MULTIPART = 0x00000400L; -const uint64_t LzopDecompressor::F_H_FILTER = 0x00000800L; -const uint64_t LzopDecompressor::F_H_EXTRA_FIELD = 0x00000040L; -const uint64_t LzopDecompressor::F_CRC32_C = 0x00000200L; -const uint64_t LzopDecompressor::F_ADLER32_C = 0x00000002L; -const uint64_t LzopDecompressor::F_CRC32_D = 0x00000100L; -const uint64_t LzopDecompressor::F_ADLER32_D = 0x00000001L; - -LzopDecompressor::~LzopDecompressor() { -} - -Status LzopDecompressor::init() { - return Status::OK; -} - -Status LzopDecompressor::decompress( - uint8_t* input, size_t input_len, size_t* input_bytes_read, - uint8_t* output, size_t output_max_len, - size_t* decompressed_len, bool* stream_end, - size_t* more_input_bytes, size_t* more_output_bytes) { - - if (!_is_header_loaded) { - // this is the first time to call lzo decompress, parse the header info first - RETURN_IF_ERROR(parse_header_info(input, input_len, input_bytes_read, more_input_bytes)); - if (*more_input_bytes > 0) { - return Status::OK; - } - } - - // LOG(INFO) << "after load header: " << *input_bytes_read; - - // read compressed block - // compressed-block ::= - // - // - // - // - // - int left_input_len = input_len - *input_bytes_read; - if (left_input_len < sizeof(uint32_t)) { - // block is at least have uncompressed_size - *more_input_bytes = sizeof(uint32_t) - left_input_len; - return Status::OK; - } - - uint8_t* block_start = input + *input_bytes_read; - uint8_t* ptr = block_start; - // 1. uncompressed size - uint32_t uncompressed_size; - ptr = get_uint32(ptr, &uncompressed_size); - left_input_len -= sizeof(uint32_t); - if (uncompressed_size == 0) { - *stream_end = true; - return Status::OK; - } - - // 2. 
compressed size - if (left_input_len < sizeof(uint32_t)) { - *more_input_bytes = sizeof(uint32_t) - left_input_len; - return Status::OK; - } - - uint32_t compressed_size; - ptr = get_uint32(ptr, &compressed_size); - left_input_len -= sizeof(uint32_t); - if (compressed_size > LZO_MAX_BLOCK_SIZE) { - std::stringstream ss; - ss << "lzo block size: " << compressed_size << " is greater than LZO_MAX_BLOCK_SIZE: " - << LZO_MAX_BLOCK_SIZE; - return Status(ss.str()); - } - - // 3. out checksum - uint32_t out_checksum = 0; - if (_header_info.output_checksum_type != CHECK_NONE) { - if (left_input_len < sizeof(uint32_t)) { - *more_input_bytes = sizeof(uint32_t) - left_input_len; - return Status::OK; - } - - ptr = get_uint32(ptr, &out_checksum); - left_input_len -= sizeof(uint32_t); - } - - // 4. in checksum - uint32_t in_checksum = 0; - if (compressed_size < uncompressed_size && _header_info.input_checksum_type != CHECK_NONE) { - if (left_input_len < sizeof(uint32_t)) { - *more_input_bytes = sizeof(uint32_t) - left_input_len; - return Status::OK; - } - - ptr = get_uint32(ptr, &out_checksum); - left_input_len -= sizeof(uint32_t); - } else { - // If the compressed data size is equal to the uncompressed data size, then - // the uncompressed data is stored and there is no compressed checksum. - in_checksum = out_checksum; - } - - // 5. checksum compressed data - if (left_input_len < compressed_size) { - *more_input_bytes = compressed_size - left_input_len; - return Status::OK; - } - RETURN_IF_ERROR(checksum(_header_info.input_checksum_type, - "compressed", in_checksum, ptr, compressed_size)); - - // 6. 
decompress - if (output_max_len < uncompressed_size) { - *more_output_bytes = uncompressed_size - output_max_len; - return Status::OK; - } - if (compressed_size == uncompressed_size) { - // the data is uncompressed, just copy to the output buf - memmove(output, ptr, compressed_size); - ptr += compressed_size; - } else { - // decompress - *decompressed_len = uncompressed_size; - int ret = lzo1x_decompress_safe(ptr, compressed_size, - output, reinterpret_cast(&uncompressed_size), nullptr); - if (ret != LZO_E_OK || uncompressed_size != *decompressed_len) { - std::stringstream ss; - ss << "Lzo decompression failed with ret: " << ret - << " decompressed len: " << uncompressed_size - << " expected: " << *decompressed_len; - return Status(ss.str()); - } - - RETURN_IF_ERROR(checksum(_header_info.output_checksum_type, "decompressed", - out_checksum, output, uncompressed_size)); - ptr += compressed_size; - } - - // 7. peek next block's uncompressed size - uint32_t next_uncompressed_size; - get_uint32(ptr, &next_uncompressed_size); - if (next_uncompressed_size == 0) { - // 0 means current block is the last block. - // consume this uncompressed_size to finish reading. - ptr += sizeof(uint32_t); - } - - // 8. done - *stream_end = true; - *decompressed_len = uncompressed_size; - *input_bytes_read += ptr - block_start; - - LOG(INFO) << "finished decompress lzo block." - << " compressed_size: " << compressed_size - << " decompressed_len: " << *decompressed_len - << " input_bytes_read: " << *input_bytes_read - << " next_uncompressed_size: " << next_uncompressed_size; - - return Status::OK; -} - -// file-header ::= -- most of this information is not used. -// -// -// -// [] -- present for all modern files. -// -// -// -// -// -// -// -// -- presence indicated in flags, not currently used. 
-Status LzopDecompressor::parse_header_info(uint8_t* input, size_t input_len, - size_t* input_bytes_read, - size_t* more_input_bytes) { - if (input_len < MIN_HEADER_SIZE) { - LOG(INFO) << "highly recommanded that Lzo header size is larger than " << MIN_HEADER_SIZE - << ", or parsing header info may failed." - << " only given: " << input_len; - *more_input_bytes = MIN_HEADER_SIZE - input_len; - return Status::OK; - } - - uint8_t* ptr = input; - // 1. magic - if (memcmp(ptr, LZOP_MAGIC, sizeof(LZOP_MAGIC))) { - std::stringstream ss; - ss << "invalid lzo magic number"; - return Status(ss.str()); - } - ptr += sizeof(LZOP_MAGIC); - uint8_t* header = ptr; - - // 2. version - ptr = get_uint16(ptr, &_header_info.version); - if (_header_info.version > LZOP_VERSION) { - std::stringstream ss; - ss << "compressed with later version of lzop: " << &_header_info.version - << " must be less than: " << LZOP_VERSION; - return Status(ss.str()); - } - - // 3. lib version - ptr = get_uint16(ptr, &_header_info.lib_version); - if (_header_info.lib_version < MIN_LZO_VERSION) { - std::stringstream ss; - ss << "compressed with incompatible lzo version: " << &_header_info.lib_version - << "must be at least: " << MIN_LZO_VERSION; - return Status(ss.str()); - } - - // 4. version needed - ptr = get_uint16(ptr, &_header_info.version_needed); - if (_header_info.version_needed > LZOP_VERSION) { - std::stringstream ss; - ss << "compressed with imp incompatible lzo version: " << &_header_info.version - << " must be at no more than: " << LZOP_VERSION; - return Status(ss.str()); - } - - // 5. method - ptr = get_uint8(ptr, &_header_info.method); - if (_header_info.method < 1 || _header_info.method > 3) { - std::stringstream ss; - ss << "invalid compression method: " << _header_info.method; - return Status(ss.str()); - } - - // 6. skip level - ++ptr; - - // 7. 
flags - uint32_t flags; - ptr = get_uint32(ptr, &flags); - if (flags & (F_RESERVED | F_MULTIPART | F_H_FILTER)) { - std::stringstream ss; - ss << "unsupported lzo flags: " << flags; - return Status(ss.str()); - } - _header_info.header_checksum_type = header_type(flags); - _header_info.input_checksum_type = input_type(flags); - _header_info.output_checksum_type = output_type(flags); - - // 8. skip mode and mtime - ptr += 3 * sizeof(int32_t); - - // 9. filename - uint8_t filename_len; - ptr = get_uint8(ptr, &filename_len); - - // here we already consume (MIN_HEADER_SIZE) - // from now we have to check left input is enough for each step - size_t left = input_len - (ptr - input); - if (left < filename_len) { - *more_input_bytes = filename_len - left; - return Status::OK; - } - - _header_info.filename = std::string((char*) ptr, (size_t) filename_len); - ptr += filename_len; - left -= filename_len; - - // 10. checksum - if (left < sizeof(uint32_t)) { - *more_input_bytes = sizeof(uint32_t) - left; - return Status::OK; - } - uint32_t expected_checksum; - uint8_t* cur = ptr; - ptr = get_uint32(ptr, &expected_checksum); - uint32_t computed_checksum; - if (_header_info.header_checksum_type == CHECK_CRC32) { - computed_checksum = CRC32_INIT_VALUE; - computed_checksum = lzo_crc32(computed_checksum, header, cur - header); - } else { - computed_checksum = ADLER32_INIT_VALUE; - computed_checksum = lzo_adler32(computed_checksum, header, cur - header); - } - - if (computed_checksum != expected_checksum) { - std::stringstream ss; - ss << "invalid header checksum: " << computed_checksum - << " expected: " << expected_checksum; - return Status(ss.str()); - } - left -= sizeof(uint32_t); - - // 11. skip extra - if (flags & F_H_EXTRA_FIELD) { - if (left < sizeof(uint32_t)) { - *more_input_bytes = sizeof(uint32_t) - left; - return Status::OK; - } - uint32_t extra_len; - ptr = get_uint32(ptr, &extra_len); - left -= sizeof(uint32_t); - - // add the checksum and the len to the total ptr size. 
- if (left < sizeof(int32_t) + extra_len) { - *more_input_bytes = sizeof(int32_t) + extra_len - left; - return Status::OK; - } - left -= sizeof(int32_t) + extra_len; - ptr += sizeof(int32_t) + extra_len; - } - - _header_info.header_size = ptr - input; - *input_bytes_read = _header_info.header_size; - - _is_header_loaded = true; - LOG(INFO) << debug_info(); - - return Status::OK; -} - -Status LzopDecompressor::checksum(LzoChecksum type, const std::string& source, - uint32_t expected, - uint8_t* ptr, size_t len) { - uint32_t computed_checksum; - switch (type) { - case CHECK_NONE: - return Status::OK; - case CHECK_CRC32: - computed_checksum = lzo_crc32(CRC32_INIT_VALUE, ptr, len); - break; - case CHECK_ADLER: - computed_checksum = lzo_adler32(ADLER32_INIT_VALUE, ptr, len); - break; - default: - std::stringstream ss; - ss << "Invalid checksum type: " << type; - return Status(ss.str()); - } - - if (computed_checksum != expected) { - std::stringstream ss; - ss << "checksum of " << source << " block failed." - << " computed checksum: " << computed_checksum - << " expected: " << expected; - return Status(ss.str()); - } - - return Status::OK; -} - -std::string LzopDecompressor::debug_info() { - std::stringstream ss; - ss << "LzopDecompressor." 
- << " version: " << _header_info.version - << " lib version: " << _header_info.lib_version - << " version needed: " << _header_info.version_needed - << " method: " << (uint16_t) _header_info.method - << " filename: " << _header_info.filename - << " header size: " << _header_info.header_size - << " header checksum type: " << _header_info.header_checksum_type - << " input checksum type: " << _header_info.input_checksum_type - << " ouput checksum type: " << _header_info.output_checksum_type; - return ss.str(); -} - } // namespace diff --git a/be/src/exec/decompressor.h b/be/src/exec/decompressor.h index 88ee50e1b2..9f81c187fa 100644 --- a/be/src/exec/decompressor.h +++ b/be/src/exec/decompressor.h @@ -20,8 +20,11 @@ #include #include #include + +#ifdef DORIS_WITH_LZO #include #include +#endif #include "common/status.h" @@ -148,6 +151,7 @@ private: const static unsigned DORIS_LZ4F_VERSION; }; +#ifdef DORIS_WITH_LZO class LzopDecompressor : public Decompressor { public: virtual ~LzopDecompressor(); @@ -255,5 +259,6 @@ private: const static uint64_t F_CRC32_D; const static uint64_t F_ADLER32_D; }; +#endif // DORIS_WITH_LZO } // namespace diff --git a/be/src/exec/lzo_decompressor.cpp b/be/src/exec/lzo_decompressor.cpp new file mode 100644 index 0000000000..c82d0644c0 --- /dev/null +++ b/be/src/exec/lzo_decompressor.cpp @@ -0,0 +1,403 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/decompressor.h" + +namespace doris { + +#ifdef DORIS_WITH_LZO +// Lzop +const uint8_t LzopDecompressor::LZOP_MAGIC[9] = + { 0x89, 0x4c, 0x5a, 0x4f, 0x00, 0x0d, 0x0a, 0x1a, 0x0a }; + +const uint64_t LzopDecompressor::LZOP_VERSION = 0x1030; +const uint64_t LzopDecompressor::MIN_LZO_VERSION = 0x0100; +// magic(9) + ver(2) + lib_ver(2) + ver_needed(2) + method(1) +// + lvl(1) + flags(4) + mode/mtime(12) + filename_len(1) +// without the real file name, extra field and checksum +const uint32_t LzopDecompressor::MIN_HEADER_SIZE = 34; +const uint32_t LzopDecompressor::LZO_MAX_BLOCK_SIZE = (64*1024l*1024l); + +const uint32_t LzopDecompressor::CRC32_INIT_VALUE = 0; +const uint32_t LzopDecompressor::ADLER32_INIT_VALUE = 1; + +const uint64_t LzopDecompressor::F_H_CRC32 = 0x00001000L; +const uint64_t LzopDecompressor::F_MASK = 0x00003FFFL; +const uint64_t LzopDecompressor::F_OS_MASK = 0xff000000L; +const uint64_t LzopDecompressor::F_CS_MASK = 0x00f00000L; +const uint64_t LzopDecompressor::F_RESERVED = ((F_MASK | F_OS_MASK | F_CS_MASK) ^ 0xffffffffL); +const uint64_t LzopDecompressor::F_MULTIPART = 0x00000400L; +const uint64_t LzopDecompressor::F_H_FILTER = 0x00000800L; +const uint64_t LzopDecompressor::F_H_EXTRA_FIELD = 0x00000040L; +const uint64_t LzopDecompressor::F_CRC32_C = 0x00000200L; +const uint64_t LzopDecompressor::F_ADLER32_C = 0x00000002L; +const uint64_t LzopDecompressor::F_CRC32_D = 0x00000100L; +const uint64_t LzopDecompressor::F_ADLER32_D = 0x00000001L; + +LzopDecompressor::~LzopDecompressor() { +} + +Status 
LzopDecompressor::init() {
+    return Status::OK;
+}
+// Decodes at most one lzop block per call. When a buffer is too small it sets *more_input_bytes / *more_output_bytes and returns OK.
+Status LzopDecompressor::decompress(
+        uint8_t* input, size_t input_len, size_t* input_bytes_read,
+        uint8_t* output, size_t output_max_len,
+        size_t* decompressed_len, bool* stream_end,
+        size_t* more_input_bytes, size_t* more_output_bytes) {
+
+    if (!_is_header_loaded) {
+        // this is the first time to call lzo decompress, parse the header info first
+        RETURN_IF_ERROR(parse_header_info(input, input_len, input_bytes_read, more_input_bytes));
+        if (*more_input_bytes > 0) {
+            return Status::OK;
+        }
+    }
+
+    // LOG(INFO) << "after load header: " << *input_bytes_read;
+
+    // read compressed block
+    // compressed-block ::=
+    //   <uncompressed-size>
+    //   <compressed-size>
+    //   <uncompressed-checksum>  -- only if the header enables an output checksum
+    //   <compressed-checksum>    -- only if enabled and compressed-size < uncompressed-size
+    //   <compressed-data>
+    int left_input_len = input_len - *input_bytes_read;
+    if (left_input_len < sizeof(uint32_t)) {
+        // block is at least have uncompressed_size
+        *more_input_bytes = sizeof(uint32_t) - left_input_len;
+        return Status::OK;
+    }
+
+    uint8_t* block_start = input + *input_bytes_read;
+    uint8_t* ptr = block_start;
+    // 1. uncompressed size (0 marks the end of the stream)
+    uint32_t uncompressed_size;
+    ptr = get_uint32(ptr, &uncompressed_size);
+    left_input_len -= sizeof(uint32_t);
+    if (uncompressed_size == 0) {
+        *stream_end = true;
+        return Status::OK;
+    }
+
+    // 2. compressed size
+    if (left_input_len < sizeof(uint32_t)) {
+        *more_input_bytes = sizeof(uint32_t) - left_input_len;
+        return Status::OK;
+    }
+
+    uint32_t compressed_size;
+    ptr = get_uint32(ptr, &compressed_size);
+    left_input_len -= sizeof(uint32_t);
+    if (compressed_size > LZO_MAX_BLOCK_SIZE) {
+        std::stringstream ss;
+        ss << "lzo block size: " << compressed_size << " is greater than LZO_MAX_BLOCK_SIZE: "
+            << LZO_MAX_BLOCK_SIZE;
+        return Status(ss.str());
+    }
+
+    // 3. out checksum (checksum of the decompressed bytes)
+    uint32_t out_checksum = 0;
+    if (_header_info.output_checksum_type != CHECK_NONE) {
+        if (left_input_len < sizeof(uint32_t)) {
+            *more_input_bytes = sizeof(uint32_t) - left_input_len;
+            return Status::OK;
+        }
+
+        ptr = get_uint32(ptr, &out_checksum);
+        left_input_len -= sizeof(uint32_t);
+    }
+
+    // 4. in checksum (checksum of the compressed bytes; must not overwrite out_checksum)
+    uint32_t in_checksum = 0;
+    if (compressed_size < uncompressed_size && _header_info.input_checksum_type != CHECK_NONE) {
+        if (left_input_len < sizeof(uint32_t)) {
+            *more_input_bytes = sizeof(uint32_t) - left_input_len;
+            return Status::OK;
+        }
+
+        ptr = get_uint32(ptr, &in_checksum);
+        left_input_len -= sizeof(uint32_t);
+    } else {
+        // If the compressed data size is equal to the uncompressed data size, then
+        // the uncompressed data is stored and there is no compressed checksum.
+        in_checksum = out_checksum;
+    }
+
+    // 5. checksum compressed data
+    if (left_input_len < compressed_size) {
+        *more_input_bytes = compressed_size - left_input_len;
+        return Status::OK;
+    }
+    RETURN_IF_ERROR(checksum(_header_info.input_checksum_type,
+                "compressed", in_checksum, ptr, compressed_size));
+
+    // 6. decompress
+    if (output_max_len < uncompressed_size) {
+        *more_output_bytes = uncompressed_size - output_max_len;
+        return Status::OK;
+    }
+    if (compressed_size == uncompressed_size) {
+        // the data is uncompressed, just copy to the output buf
+        memmove(output, ptr, compressed_size);
+        ptr += compressed_size;
+    } else {
+        // decompress; LZO's in/out length is an lzo_uint, so pass a local of exactly that type
+        lzo_uint actual_len = uncompressed_size;
+        int ret = lzo1x_decompress_safe(ptr, compressed_size,
+                output, &actual_len, nullptr);
+        if (ret != LZO_E_OK || actual_len != uncompressed_size) {
+            std::stringstream ss;
+            ss << "Lzo decompression failed with ret: " << ret
+                << " decompressed len: " << actual_len
+                << " expected: " << uncompressed_size;
+            return Status(ss.str());
+        }
+
+        RETURN_IF_ERROR(checksum(_header_info.output_checksum_type, "decompressed",
+                    out_checksum, output, uncompressed_size));
+        ptr += compressed_size;
+    }
+
+    // 7. peek next block's uncompressed size. NOTE(review): reads 4 bytes past this block without checking they were supplied -- confirm callers guarantee it
+    uint32_t next_uncompressed_size;
+    get_uint32(ptr, &next_uncompressed_size);
+    if (next_uncompressed_size == 0) {
+        // 0 means current block is the last block.
+        // consume this uncompressed_size to finish reading.
+        ptr += sizeof(uint32_t);
+    }
+
+    // 8. done
+    *stream_end = true;
+    *decompressed_len = uncompressed_size;
+    *input_bytes_read += ptr - block_start;
+
+    LOG(INFO) << "finished decompress lzo block."
+        << " compressed_size: " << compressed_size
+        << " decompressed_len: " << *decompressed_len
+        << " input_bytes_read: " << *input_bytes_read
+        << " next_uncompressed_size: " << next_uncompressed_size;
+
+    return Status::OK;
+}
+
+// file-header ::= -- most of this information is not used.
+//   <magic>
+//   <version>
+//   <lib-version>
+//   [<version-needed>] -- present for all modern files.
+//   <method>
+//   <level>
+//   <flags>
+//   <mode>
+//   <mtime>
+//   <file-name>
+//   <header-checksum>
+//   <extra-field> -- presence indicated in flags, not currently used.
+Status LzopDecompressor::parse_header_info(uint8_t* input, size_t input_len,
+        size_t* input_bytes_read,
+        size_t* more_input_bytes) {  // on success sets *input_bytes_read to the header size
+    if (input_len < MIN_HEADER_SIZE) {
+        LOG(INFO) << "highly recommanded that Lzo header size is larger than " << MIN_HEADER_SIZE
+            << ", or parsing header info may failed."
+            << " only given: " << input_len;
+        *more_input_bytes = MIN_HEADER_SIZE - input_len;
+        return Status::OK;
+    }
+
+    uint8_t* ptr = input;
+    // 1. magic
+    if (memcmp(ptr, LZOP_MAGIC, sizeof(LZOP_MAGIC))) {
+        std::stringstream ss;
+        ss << "invalid lzo magic number";
+        return Status(ss.str());
+    }
+    ptr += sizeof(LZOP_MAGIC);
+    uint8_t* header = ptr;  // the header checksum in step 10 starts here, right after the magic
+
+    // 2. version
+    ptr = get_uint16(ptr, &_header_info.version);
+    if (_header_info.version > LZOP_VERSION) {
+        std::stringstream ss;
+        ss << "compressed with later version of lzop: " << _header_info.version
+            << " must be less than: " << LZOP_VERSION;
+        return Status(ss.str());
+    }
+
+    // 3. lib version
+    ptr = get_uint16(ptr, &_header_info.lib_version);
+    if (_header_info.lib_version < MIN_LZO_VERSION) {
+        std::stringstream ss;
+        ss << "compressed with incompatible lzo version: " << _header_info.lib_version
+            << " must be at least: " << MIN_LZO_VERSION;
+        return Status(ss.str());
+    }
+
+    // 4. version needed
+    ptr = get_uint16(ptr, &_header_info.version_needed);
+    if (_header_info.version_needed > LZOP_VERSION) {
+        std::stringstream ss;
+        ss << "compressed with imp incompatible lzo version: " << _header_info.version_needed
+            << " must be at no more than: " << LZOP_VERSION;
+        return Status(ss.str());
+    }
+
+    // 5. method (cast before streaming so it prints as a number, as debug_info() does)
+    ptr = get_uint8(ptr, &_header_info.method);
+    if (_header_info.method < 1 || _header_info.method > 3) {
+        std::stringstream ss;
+        ss << "invalid compression method: " << static_cast<uint16_t>(_header_info.method);
+        return Status(ss.str());
+    }
+
+    // 6. skip level
+    ++ptr;
+
+    // 7. flags
+    uint32_t flags;
+    ptr = get_uint32(ptr, &flags);
+    if (flags & (F_RESERVED | F_MULTIPART | F_H_FILTER)) {
+        std::stringstream ss;
+        ss << "unsupported lzo flags: " << flags;
+        return Status(ss.str());
+    }
+    _header_info.header_checksum_type = header_type(flags);
+    _header_info.input_checksum_type = input_type(flags);
+    _header_info.output_checksum_type = output_type(flags);
+
+    // 8. skip mode and mtime
+    ptr += 3 * sizeof(int32_t);
+
+    // 9. filename
+    uint8_t filename_len;
+    ptr = get_uint8(ptr, &filename_len);
+
+    // here we already consume (MIN_HEADER_SIZE)
+    // from now we have to check left input is enough for each step
+    size_t left = input_len - (ptr - input);
+    if (left < filename_len) {
+        *more_input_bytes = filename_len - left;
+        return Status::OK;
+    }
+
+    _header_info.filename = std::string((char*) ptr, (size_t) filename_len);
+    ptr += filename_len;
+    left -= filename_len;
+
+    // 10. checksum
+    if (left < sizeof(uint32_t)) {
+        *more_input_bytes = sizeof(uint32_t) - left;
+        return Status::OK;
+    }
+    uint32_t expected_checksum;
+    uint8_t* cur = ptr;
+    ptr = get_uint32(ptr, &expected_checksum);
+    uint32_t computed_checksum;
+    if (_header_info.header_checksum_type == CHECK_CRC32) {
+        computed_checksum = CRC32_INIT_VALUE;
+        computed_checksum = lzo_crc32(computed_checksum, header, cur - header);
+    } else {
+        computed_checksum = ADLER32_INIT_VALUE;
+        computed_checksum = lzo_adler32(computed_checksum, header, cur - header);
+    }
+
+    if (computed_checksum != expected_checksum) {
+        std::stringstream ss;
+        ss << "invalid header checksum: " << computed_checksum
+            << " expected: " << expected_checksum;
+        return Status(ss.str());
+    }
+    left -= sizeof(uint32_t);
+
+    // 11. skip extra
+    if (flags & F_H_EXTRA_FIELD) {
+        if (left < sizeof(uint32_t)) {
+            *more_input_bytes = sizeof(uint32_t) - left;
+            return Status::OK;
+        }
+        uint32_t extra_len;
+        ptr = get_uint32(ptr, &extra_len);
+        left -= sizeof(uint32_t);
+
+        // add the checksum and the len to the total ptr size.
+        if (left < sizeof(int32_t) + extra_len) {
+            *more_input_bytes = sizeof(int32_t) + extra_len - left;
+            return Status::OK;
+        }
+        left -= sizeof(int32_t) + extra_len;
+        ptr += sizeof(int32_t) + extra_len;
+    }
+
+    _header_info.header_size = ptr - input;
+    *input_bytes_read = _header_info.header_size;
+
+    _is_header_loaded = true;
+    LOG(INFO) << debug_info();
+
+    return Status::OK;
+}
+// Verifies `len` bytes at `ptr` against `expected` using checksum `type`; CHECK_NONE always passes.
+Status LzopDecompressor::checksum(LzoChecksum type, const std::string& source,
+        uint32_t expected,
+        uint8_t* ptr, size_t len) {
+    uint32_t computed_checksum;
+    switch (type) {
+        case CHECK_NONE:
+            return Status::OK;
+        case CHECK_CRC32:
+            computed_checksum = lzo_crc32(CRC32_INIT_VALUE, ptr, len);
+            break;
+        case CHECK_ADLER:
+            computed_checksum = lzo_adler32(ADLER32_INIT_VALUE, ptr, len);
+            break;
+        default:
+            std::stringstream ss;
+            ss << "Invalid checksum type: " << type;
+            return Status(ss.str());
+    }
+
+    if (computed_checksum != expected) {
+        std::stringstream ss;
+        ss << "checksum of " << source << " block failed."
+            << " computed checksum: " << computed_checksum
+            << " expected: " << expected;
+        return Status(ss.str());
+    }
+
+    return Status::OK;
+}
+
+std::string LzopDecompressor::debug_info() {
+    std::stringstream ss;
+    ss << "LzopDecompressor."
+ << " version: " << _header_info.version + << " lib version: " << _header_info.lib_version + << " version needed: " << _header_info.version_needed + << " method: " << (uint16_t) _header_info.method + << " filename: " << _header_info.filename + << " header size: " << _header_info.header_size + << " header checksum type: " << _header_info.header_checksum_type + << " input checksum type: " << _header_info.input_checksum_type + << " ouput checksum type: " << _header_info.output_checksum_type; + return ss.str(); +} +#endif // DORIS_WITH_LZO + +} // namespace diff --git a/be/src/exec/olap_meta_reader.cpp b/be/src/exec/olap_meta_reader.cpp index 5a2bdd5119..fb5b65cbae 100644 --- a/be/src/exec/olap_meta_reader.cpp +++ b/be/src/exec/olap_meta_reader.cpp @@ -21,7 +21,6 @@ #include #include #include -#include #include "gen_cpp/PaloInternalService_types.h" #include "olap_scanner.h" diff --git a/be/src/olap/compress.cpp b/be/src/olap/compress.cpp index c5b6ef7f55..7c2fb20cd4 100644 --- a/be/src/olap/compress.cpp +++ b/be/src/olap/compress.cpp @@ -22,6 +22,7 @@ namespace doris { +#ifdef DORIS_WITH_LZO OLAPStatus lzo_compress(StorageByteBuffer* in, StorageByteBuffer* out, bool* smaller) { size_t out_length = 0; OLAPStatus res = OLAP_SUCCESS; @@ -59,6 +60,7 @@ OLAPStatus lzo_decompress(StorageByteBuffer* in, StorageByteBuffer* out) { return res; } +#endif OLAPStatus lz4_compress(StorageByteBuffer* in, StorageByteBuffer* out, bool* smaller) { size_t out_length = 0; diff --git a/be/src/olap/compress.h b/be/src/olap/compress.h index 832f0e4cd1..9506f19e99 100644 --- a/be/src/olap/compress.h +++ b/be/src/olap/compress.h @@ -43,8 +43,10 @@ typedef OLAPStatus(*Compressor)(StorageByteBuffer* in, StorageByteBuffer* out, b // OLAP_ERR_DECOMPRESS_ERROR - 解压缩错误 typedef OLAPStatus(*Decompressor)(StorageByteBuffer* in, StorageByteBuffer* out); +#ifdef DORIS_WITH_LZO OLAPStatus lzo_compress(StorageByteBuffer* in, StorageByteBuffer* out, bool* smaller); OLAPStatus 
lzo_decompress(StorageByteBuffer* in, StorageByteBuffer* out); +#endif OLAPStatus lz4_compress(StorageByteBuffer* in, StorageByteBuffer* out, bool* smaller); OLAPStatus lz4_decompress(StorageByteBuffer* in, StorageByteBuffer* out); diff --git a/be/src/olap/in_stream.cpp b/be/src/olap/in_stream.cpp index 20bf6b59a8..eaacff9b12 100644 --- a/be/src/olap/in_stream.cpp +++ b/be/src/olap/in_stream.cpp @@ -155,7 +155,7 @@ OLAPStatus InStream::_assure_data() { _compressed->get((char*)&head, sizeof(head)); if (head.length > _compress_buffer_size) { - OLAP_LOG_WARNING("chunk size if larger than buffer size. [chunk=%u buffer_size=%u]", + OLAP_LOG_WARNING("chunk size is larger than buffer size. [chunk=%u buffer_size=%u]", head.length, _compress_buffer_size); return OLAP_ERR_COLUMN_READ_STREAM; } @@ -189,7 +189,7 @@ OLAPStatus InStream::_assure_data() { } } else { OLAP_LOG_WARNING("compressed remaining size less than stream head size. " - "[compressed_remaining_size=%lu stream_head_size.=%lu]", + "[compressed_remaining_size=%lu stream_head_size=%lu]", _compressed->remaining(), sizeof(StreamHead)); return OLAP_ERR_COLUMN_READ_STREAM; } diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h index 18ec838cb4..932ac5f675 100644 --- a/be/src/olap/olap_define.h +++ b/be/src/olap/olap_define.h @@ -159,6 +159,7 @@ enum OLAPStatus { OLAP_ERR_INVALID_CLUSTER_INFO = -225, OLAP_ERR_TRANSACTION_NOT_EXIST = -226, OLAP_ERR_DISK_FAILURE = -227, + OLAP_ERR_LZO_DISABLED = -228, // CommandExecutor // [-300, -400) diff --git a/be/src/olap/olap_header.h b/be/src/olap/olap_header.h index 23f97efab5..daaa9a0d41 100644 --- a/be/src/olap/olap_header.h +++ b/be/src/olap/olap_header.h @@ -125,9 +125,6 @@ public: } void change_file_version_to_delta(); private: - // Compute schema hash(all fields name and type, index name and its field - // names) using lzo_adler32 function. 
- OLAPStatus _compute_schema_hash(SchemaHash* schema_hash); void _convert_file_version_to_delta(const FileVersionMessage& version, PDelta* delta); // full path of olap header file diff --git a/be/src/olap/out_stream.cpp b/be/src/olap/out_stream.cpp index 7442a56371..3b9002956c 100644 --- a/be/src/olap/out_stream.cpp +++ b/be/src/olap/out_stream.cpp @@ -32,9 +32,11 @@ OutStreamFactory::OutStreamFactory(CompressKind compress_kind, uint32_t stream_b _compressor = NULL; break; +#ifdef DORIS_WITH_LZO case COMPRESS_LZO: _compressor = lzo_compress; break; +#endif case COMPRESS_LZ4: _compressor = lz4_compress; diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 9bfaa26659..e45a75bfe8 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -562,6 +562,15 @@ OLAPStatus PushHandler::_convert( if (_request.__isset.need_decompress && _request.need_decompress) { need_decompress = true; } + +#ifndef DORIS_WITH_LZO + if (need_decompress) { + // if lzo is diabled, compressed data is not allowed here + res = OLAP_ERR_LZO_DISABLED; + break; + } +#endif + if (NULL == (reader = IBinaryReader::create(need_decompress))) { OLAP_LOG_WARNING("fail to create reader. 
[table='%s' file='%s']", curr_olap_table->full_name().c_str(), @@ -948,7 +957,9 @@ OLAPStatus BinaryFile::init(const char* path) { IBinaryReader* IBinaryReader::create(bool need_decompress) { IBinaryReader* reader = NULL; if (need_decompress) { +#ifdef DORIS_WITH_LZO reader = new(std::nothrow) LzoBinaryReader(); +#endif } else { reader = new(std::nothrow) BinaryReader(); } diff --git a/be/src/olap/segment_reader.cpp b/be/src/olap/segment_reader.cpp index c05455c1d7..2c74199a2b 100644 --- a/be/src/olap/segment_reader.cpp +++ b/be/src/olap/segment_reader.cpp @@ -168,10 +168,12 @@ OLAPStatus SegmentReader::_set_decompressor() { _decompressor = NULL; break; } +#ifdef DORIS_WITH_LZO case COMPRESS_LZO: { _decompressor = lzo_decompress; break; } +#endif case COMPRESS_LZ4: { _decompressor = lz4_decompress; break; diff --git a/be/src/olap/utils.cpp b/be/src/olap/utils.cpp index 3cefd9024a..d992e3f21c 100644 --- a/be/src/olap/utils.cpp +++ b/be/src/olap/utils.cpp @@ -29,8 +29,12 @@ #include #include #include + +#ifdef DORIS_WITH_LZO #include #include +#endif + #include #include "common/logging.h" @@ -266,6 +270,8 @@ OLAPStatus olap_compress(const char* src_buf, *written_len = dest_len; switch (compression_type) { + +#ifdef DORIS_WITH_LZO case OLAP_COMP_TRANSPORT: { // A small buffer(hundreds of bytes) for LZO1X unsigned char mem[LZO1X_1_MEM_COMPRESS]; @@ -318,6 +324,8 @@ OLAPStatus olap_compress(const char* src_buf, } break; } +#endif + case OLAP_COMP_LZ4: { // int lz4_res = LZ4_compress_limitedOutput(src_buf, dest_buf, src_len, dest_len); int lz4_res = LZ4_compress_default(src_buf, dest_buf, src_len, dest_len); @@ -355,6 +363,8 @@ OLAPStatus olap_decompress(const char* src_buf, *written_len = dest_len; switch (compression_type) { + +#ifdef DORIS_WITH_LZO case OLAP_COMP_TRANSPORT: { int lzo_res = lzo1x_decompress_safe(reinterpret_cast(src_buf), src_len, @@ -403,6 +413,8 @@ OLAPStatus olap_decompress(const char* src_buf, } break; } +#endif + case OLAP_COMP_LZ4: { int lz4_res 
= LZ4_decompress_safe(src_buf, dest_buf, src_len, dest_len); *written_len = lz4_res; @@ -419,6 +431,7 @@ OLAPStatus olap_decompress(const char* src_buf, break; } default: + LOG(FATAL) << "unknown compress kind. kind=" << compression_type; break; } return OLAP_SUCCESS; diff --git a/be/test/exec/CMakeLists.txt b/be/test/exec/CMakeLists.txt index 79e33f5b8f..3dd76e3df5 100644 --- a/be/test/exec/CMakeLists.txt +++ b/be/test/exec/CMakeLists.txt @@ -39,7 +39,9 @@ ADD_BE_TEST(plain_text_line_reader_uncompressed_test) ADD_BE_TEST(plain_text_line_reader_gzip_test) ADD_BE_TEST(plain_text_line_reader_bzip_test) ADD_BE_TEST(plain_text_line_reader_lz4frame_test) +if(DEFINED DORIS_WITH_LZO) ADD_BE_TEST(plain_text_line_reader_lzop_test) +endif() ADD_BE_TEST(broker_reader_test) ADD_BE_TEST(broker_scanner_test) ADD_BE_TEST(broker_scan_node_test) diff --git a/be/test/olap/column_reader_test.cpp b/be/test/olap/column_reader_test.cpp index e1f68eb104..3574e6f794 100644 --- a/be/test/olap/column_reader_test.cpp +++ b/be/test/olap/column_reader_test.cpp @@ -67,7 +67,7 @@ public: _offsets.push_back(0); _stream_factory = - new(std::nothrow) OutStreamFactory(COMPRESS_LZO, + new(std::nothrow) OutStreamFactory(COMPRESS_LZ4, OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE); ASSERT_TRUE(_stream_factory != NULL); config::column_dictionary_key_ration_threshold = 30; @@ -191,7 +191,7 @@ public: &_shared_buffer, off[i], length[i], - lzo_decompress, + lz4_decompress, buffer_size[i], &_stats); ASSERT_EQ(OLAP_SUCCESS, in_stream->init()); diff --git a/be/test/olap/run_length_byte_test.cpp b/be/test/olap/run_length_byte_test.cpp index 2702e67b7d..416037947b 100755 --- a/be/test/olap/run_length_byte_test.cpp +++ b/be/test/olap/run_length_byte_test.cpp @@ -204,7 +204,7 @@ TEST(TestStream, UncompressInStream) { TEST(TestStream, CompressOutStream) { // write data OutStream *out_stream = - new(std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lzo_compress); + new(std::nothrow) 
OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lz4_compress); ASSERT_TRUE(out_stream != NULL); ASSERT_TRUE(out_stream->_compressor != NULL); @@ -223,7 +223,8 @@ TEST(TestStream, CompressOutStream) { StreamHead head; (*it)->get((char *)&head, sizeof(head)); ASSERT_EQ(head.type, StreamHead::COMPRESSED); - ASSERT_EQ(head.length, 49); + // if lzo, this should be 49 + ASSERT_EQ(51, head.length); SAFE_DELETE_ARRAY(write_data); SAFE_DELETE(out_stream); @@ -232,7 +233,7 @@ TEST(TestStream, CompressOutStream) { TEST(TestStream, CompressOutStream2) { // write data OutStream *out_stream = - new(std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lzo_compress); + new(std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lz4_compress); ASSERT_TRUE(out_stream != NULL); ASSERT_TRUE(out_stream->_compressor != NULL); @@ -250,12 +251,12 @@ TEST(TestStream, CompressOutStream2) { } std::vector offsets; offsets.push_back(0); - offsets.push_back(57); + offsets.push_back(59); // if lzo, this shoudl be 57 InStream *in_stream = new (std::nothrow) InStream(&inputs, offsets, out_stream->get_stream_length(), - lzo_decompress, + lz4_decompress, out_stream->get_total_buffer_size()); char data; @@ -275,7 +276,7 @@ TEST(TestStream, CompressOutStream2) { TEST(TestStream, CompressOutStream3) { // write data OutStream *out_stream = - new(std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lzo_compress); + new(std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lz4_compress); ASSERT_TRUE(out_stream != NULL); ASSERT_TRUE(out_stream->_compressor != NULL); @@ -302,7 +303,7 @@ TEST(TestStream, CompressOutStream3) { new (std::nothrow) InStream(&inputs, offsets, out_stream->get_stream_length(), - lzo_decompress, + lz4_decompress, out_stream->get_total_buffer_size()); char data; @@ -324,7 +325,7 @@ TEST(TestStream, CompressOutStream3) { TEST(TestStream, CompressOutStream4) { // write data OutStream *out_stream = - new(std::nothrow) OutStream(18, 
lzo_compress); + new(std::nothrow) OutStream(18, lz4_compress); ASSERT_TRUE(out_stream != NULL); ASSERT_TRUE(out_stream->_compressor != NULL); @@ -354,7 +355,7 @@ TEST(TestStream, CompressOutStream4) { new (std::nothrow) InStream(&inputs, offsets, out_stream->get_stream_length(), - lzo_decompress, + lz4_decompress, out_stream->get_total_buffer_size()); char data; @@ -382,7 +383,7 @@ TEST(TestStream, CompressOutStream4) { TEST(TestStream, CompressMassOutStream) { // write data OutStream *out_stream = - new(std::nothrow) OutStream(100, lzo_compress); + new(std::nothrow) OutStream(100, lz4_compress); ASSERT_TRUE(out_stream != NULL); ASSERT_TRUE(out_stream->_compressor != NULL); @@ -405,12 +406,12 @@ TEST(TestStream, CompressMassOutStream) { } std::vector offsets; offsets.push_back(0); - offsets.push_back(17); + offsets.push_back(19); // if lzo, this should be 17 InStream *in_stream = new (std::nothrow) InStream(&inputs, offsets, out_stream->get_stream_length(), - lzo_decompress, + lz4_decompress, out_stream->get_total_buffer_size()); SAFE_DELETE(out_stream); @@ -432,7 +433,7 @@ TEST(TestStream, CompressMassOutStream) { TEST(TestStream, CompressInStream) { // write data OutStream *out_stream = - new(std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lzo_compress); + new(std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lz4_compress); ASSERT_TRUE(out_stream != NULL); ASSERT_TRUE(out_stream->_compressor != NULL); @@ -454,7 +455,7 @@ TEST(TestStream, CompressInStream) { InStream *in_stream = new (std::nothrow) InStream(&inputs, offsets, out_stream->get_stream_length(), - lzo_decompress, + lz4_decompress, out_stream->get_total_buffer_size()); ASSERT_EQ(in_stream->available(), OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE); char data; @@ -568,7 +569,7 @@ TEST(TestStream, SkipUncompress) { TEST(TestStream, SeekCompress) { // write data OutStream *out_stream = - new(std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lzo_compress); + 
new(std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lz4_compress); ASSERT_TRUE(out_stream != NULL); for (int32_t i = 0; i < 10; i++) { @@ -596,7 +597,7 @@ TEST(TestStream, SeekCompress) { new (std::nothrow) InStream(&inputs, offsets, out_stream->get_stream_length(), - lzo_decompress, + lz4_decompress, out_stream->get_total_buffer_size()); //ASSERT_EQ(in_stream->available(), 2); char buffer[256]; @@ -620,7 +621,7 @@ TEST(TestStream, SeekCompress) { TEST(TestStream, SkipCompress) { // write data OutStream *out_stream = - new(std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lzo_compress); + new(std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lz4_compress); ASSERT_TRUE(out_stream != NULL); for (int32_t i = 0; i < 10; i++) { @@ -642,7 +643,7 @@ TEST(TestStream, SkipCompress) { new (std::nothrow) InStream(&inputs, offsets, out_stream->get_stream_length(), - lzo_decompress, + lz4_decompress, out_stream->get_total_buffer_size()); in_stream->skip(10); diff --git a/build.sh b/build.sh index fc8575d208..a27f6fdf23 100755 --- a/build.sh +++ b/build.sh @@ -51,15 +51,18 @@ Usage: $0 --be build Backend --fe build Frontend --clean clean and build target - --with-mysql enable MySQL support - --without-mysql disable MySQL support + --with-mysql enable MySQL support(default) + --without-mysql disable MySQL support + --with-lzo enable LZO compress support(default) + --without-lzo disable LZO compress support Eg. 
- $0 build Backend and Frontend without clean - $0 --be build Backend without clean - $0 --be --without-mysql build Backend with MySQL disable - $0 --fe --clean clean and build Frontend - $0 --fe --be --clean clean and build both Frontend and Backend + $0 build Backend and Frontend without clean + $0 --be build Backend without clean + $0 --be --without-mysql build Backend with MySQL disable + $0 --be --without-mysql --without-lzo build Backend with both MySQL and LZO disable + $0 --fe --clean clean and build Frontend + $0 --fe --be --clean clean and build both Frontend and Backend " exit 1 } @@ -73,6 +76,8 @@ OPTS=$(getopt \ -l 'clean' \ -l 'with-mysql' \ -l 'without-mysql' \ + -l 'with-lzo' \ + -l 'without-lzo' \ -l 'help' \ -- "$@") @@ -87,6 +92,7 @@ BUILD_FE= CLEAN= RUN_UT= WITH_MYSQL=ON +WITH_LZO=ON HELP=0 if [ $# == 1 ] ; then # defuat @@ -107,6 +113,8 @@ else --ut) RUN_UT=1 ; shift ;; --with-mysql) WITH_MYSQL=ON; shift ;; --without-mysql) WITH_MYSQL=OFF; shift ;; + --with-lzo) WITH_LZO=ON; shift ;; + --without-lzo) WITH_LZO=OFF; shift ;; -h) HELP=1; shift ;; --help) HELP=1; shift ;; --) shift ; break ;; @@ -126,10 +134,12 @@ if [ ${CLEAN} -eq 1 -a ${BUILD_BE} -eq 0 -a ${BUILD_FE} -eq 0 ]; then fi echo "Get params: - BUILD_BE -- $BUILD_BE - BUILD_FE -- $BUILD_FE - CLEAN -- $CLEAN - RUN_UT -- $RUN_UT + BUILD_BE -- $BUILD_BE + BUILD_FE -- $BUILD_FE + CLEAN -- $CLEAN + RUN_UT -- $RUN_UT + WITH_MYSQL -- $WITH_MYSQL + WITH_LZO -- $WITH_LZO " # Clean and build generated code @@ -150,7 +160,7 @@ if [ ${BUILD_BE} -eq 1 ] ; then fi mkdir -p ${DORIS_HOME}/be/build/ cd ${DORIS_HOME}/be/build/ - cmake -DWITH_MYSQL=${WITH_MYSQL} ../ + cmake -DWITH_MYSQL=${WITH_MYSQL} -DWITH_LZO=${WITH_LZO} ../ make -j${PARALLEL} make install cd ${DORIS_HOME} diff --git a/fe/src/main/java/org/apache/doris/http/rest/SetConfigAction.java b/fe/src/main/java/org/apache/doris/http/rest/SetConfigAction.java index 4641a609b6..96e900de09 100644 --- 
a/fe/src/main/java/org/apache/doris/http/rest/SetConfigAction.java +++ b/fe/src/main/java/org/apache/doris/http/rest/SetConfigAction.java @@ -93,6 +93,7 @@ public class SetConfigAction extends RestBaseAction { try { ConfigBase.setConfigField(f, confVals.get(0)); } catch (Exception e) { + LOG.warn("failed to set config {}:{}", confKey, confVals.get(0), e); continue; } @@ -127,4 +128,4 @@ public class SetConfigAction extends RestBaseAction { public static void print(String msg) { System.out.println(System.currentTimeMillis() + " " + msg); } -} \ No newline at end of file +} diff --git a/run-ut.sh b/run-ut.sh index 1502917539..69c7dc20d9 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -168,7 +168,9 @@ ${DORIS_TEST_BINARY_DIR}/exec/plain_text_line_reader_uncompressed_test ${DORIS_TEST_BINARY_DIR}/exec/plain_text_line_reader_gzip_test ${DORIS_TEST_BINARY_DIR}/exec/plain_text_line_reader_bzip_test ${DORIS_TEST_BINARY_DIR}/exec/plain_text_line_reader_lz4frame_test -${DORIS_TEST_BINARY_DIR}/exec/plain_text_line_reader_lzop_test +if [ -f ${DORIS_TEST_BINARY_DIR}/exec/plain_text_line_reader_lzop_test ];then + ${DORIS_TEST_BINARY_DIR}/exec/plain_text_line_reader_lzop_test +fi ${DORIS_TEST_BINARY_DIR}/exec/broker_scanner_test ${DORIS_TEST_BINARY_DIR}/exec/broker_scan_node_test ${DORIS_TEST_BINARY_DIR}/exec/es_scan_node_test diff --git a/thirdparty/LICENSE.txt b/thirdparty/LICENSE.txt new file mode 100644 index 0000000000..f3b399b8bf --- /dev/null +++ b/thirdparty/LICENSE.txt @@ -0,0 +1,66 @@ +========================================================== +License information for Doris third-party dependencies +========================================================== + +This directory contains scripts which download and install several third-party +dependencies of Doris. For production builds, most of these dependencies +are statically linked into the Doris binaries. Some of the dependencies are optional +for the Doris build.
+ +Most of these dependencies' licenses are compatible with Apache License Version 2.0. +But several dependencies are under incompatible licenses. + +This file will give an interpretation of these licenses. + +============================================================================================ +LZ4 +source: https://github.com/lz4/lz4 + +This repository uses 2 different licenses : +- all files in the `lib` directory use a BSD 2-Clause license +- all other files use a GPLv2 license, unless explicitly stated otherwise + +Doris only depends on features implemented in the `lib` directory, which are under BSD 2-Clause license. + +============================================================================================ +RapidJSON +source: https://github.com/Tencent/rapidjson/ + +RapidJSON source code is licensed under the MIT License, except for the third-party components listed below +which are subject to different license terms. Your integration of RapidJSON into your own projects may require +compliance with the MIT License, as well as the other licenses applicable to the third-party components included +within RapidJSON. To avoid the problematic JSON license in your own projects, it's sufficient to exclude the +bin/jsonchecker/ directory, as it's the only code under the JSON license. + +Doris only depends on the source files in `include/rapidjson` directory, which are under MIT License. + +============================================================================================ +RocksDB +source: https://github.com/facebook/rocksdb + +RocksDB is dual-licensed under both the GPLv2 (found in the COPYING file in the root directory) and +Apache 2.0 License (found in the LICENSE.Apache file in the root directory) + +Doris selects the Apache 2.0 License of RocksDB.
+ +============================================================================================ +LZO +source: https://github.com/nemequ/lzo + +LZO is under GNU General Public License Version 2 (GPL-2), which is not compatible with Apache 2.0 License. + +So LZO is an optional feature when building Doris from source. You can disable the LZO support by adding the +`--without-lzo` argument to the `build.sh` script. + +Disabling LZO support does not affect normal use of Doris. + +============================================================================================ +MySQL +source: https://github.com/mysql/mysql-server + +MySQL is under GNU General Public License Version 2 (GPL-2), which is not compatible with Apache 2.0 License. + +So MySQL is an optional feature when building Doris from source. You can disable the MySQL support by adding the +`--without-mysql` argument to the `build.sh` script. + +Disabling MySQL support will disable the feature of visiting a MySQL table as an external table in Doris.