Make LZO support optional (#1263)
This commit is contained in:
@ -115,11 +115,16 @@ sh build.sh
|
||||
|
||||
After successfully building, it will install binary files in the directory `output/`.
|
||||
|
||||
## 5. Reporting Issues
|
||||
## 5. Licence Notice
|
||||
|
||||
License of some of third-party dependencies are not compatible with Apache 2.0 License. So you may have to disable
|
||||
some features of Doris to be complied with Apache 2.0 License. Details can be found in `thirdparty/LICENSE.txt`
|
||||
|
||||
## 6. Reporting Issues
|
||||
|
||||
If you find any bugs, please file a [GitHub issue](https://github.com/apache/incubator-doris/issues).
|
||||
|
||||
## 6. Links
|
||||
## 7. Links
|
||||
|
||||
* Doris official site - <http://doris.incubator.apache.org>
|
||||
* User Manual (GitHub Wiki) - <https://github.com/apache/incubator-doris/wiki>
|
||||
|
||||
@ -152,8 +152,10 @@ set_target_properties(thrift PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/
|
||||
add_library(thriftnb STATIC IMPORTED)
|
||||
set_target_properties(thriftnb PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libthriftnb.a)
|
||||
|
||||
add_library(lzo STATIC IMPORTED)
|
||||
set_target_properties(lzo PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/liblzo2.a)
|
||||
if(WITH_LZO)
|
||||
add_library(lzo STATIC IMPORTED)
|
||||
set_target_properties(lzo PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/liblzo2.a)
|
||||
endif()
|
||||
|
||||
if (WITH_MYSQL)
|
||||
add_library(mysql STATIC IMPORTED)
|
||||
@ -281,6 +283,10 @@ if (WITH_MYSQL)
|
||||
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DDORIS_WITH_MYSQL")
|
||||
endif()
|
||||
|
||||
if (WITH_LZO)
|
||||
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DDORIS_WITH_LZO")
|
||||
endif()
|
||||
|
||||
if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0)
|
||||
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -faligned-new")
|
||||
endif()
|
||||
@ -464,7 +470,6 @@ set(DORIS_DEPENDENCIES
|
||||
librdkafka_cpp
|
||||
librdkafka
|
||||
libs2
|
||||
lzo
|
||||
snappy
|
||||
${Boost_LIBRARIES}
|
||||
${LLVM_MODULE_LIBS}
|
||||
@ -487,6 +492,12 @@ set(DORIS_DEPENDENCIES
|
||||
${WL_END_GROUP}
|
||||
)
|
||||
|
||||
if(WITH_LZO)
|
||||
set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES}
|
||||
lzo
|
||||
)
|
||||
endif()
|
||||
|
||||
# Add all external dependencies. They should come after the palo libs.
|
||||
# static link gcc's lib
|
||||
set(DORIS_LINK_LIBS ${DORIS_LINK_LIBS}
|
||||
|
||||
@ -101,6 +101,12 @@ if (WITH_MYSQL)
|
||||
)
|
||||
endif()
|
||||
|
||||
if (WITH_LZO)
|
||||
set(EXEC_FILES ${EXEC_FILES}
|
||||
lzo_decompressor.cpp
|
||||
)
|
||||
endif()
|
||||
|
||||
if(EXISTS "${BASE_DIR}/src/exec/kudu_util.cpp")
|
||||
set(EXEC_FILES ${EXEC_FILES}
|
||||
#kudu_scan_node.cpp
|
||||
|
||||
@ -37,9 +37,11 @@ Status Decompressor::create_decompressor(CompressType type,
|
||||
case CompressType::LZ4FRAME:
|
||||
*decompressor = new Lz4FrameDecompressor();
|
||||
break;
|
||||
#ifdef DORIS_WITH_LZO
|
||||
case CompressType::LZOP:
|
||||
*decompressor = new LzopDecompressor();
|
||||
break;
|
||||
#endif
|
||||
default:
|
||||
std::stringstream ss;
|
||||
ss << "Unknown compress type: " << type;
|
||||
@ -341,383 +343,4 @@ size_t Lz4FrameDecompressor::get_block_size(const LZ4F_frameInfo_t* info) {
|
||||
}
|
||||
}
|
||||
|
||||
// Lzop
|
||||
const uint8_t LzopDecompressor::LZOP_MAGIC[9] =
|
||||
{ 0x89, 0x4c, 0x5a, 0x4f, 0x00, 0x0d, 0x0a, 0x1a, 0x0a };
|
||||
|
||||
const uint64_t LzopDecompressor::LZOP_VERSION = 0x1030;
|
||||
const uint64_t LzopDecompressor::MIN_LZO_VERSION = 0x0100;
|
||||
// magic(9) + ver(2) + lib_ver(2) + ver_needed(2) + method(1)
|
||||
// + lvl(1) + flags(4) + mode/mtime(12) + filename_len(1)
|
||||
// without the real file name, extra field and checksum
|
||||
const uint32_t LzopDecompressor::MIN_HEADER_SIZE = 34;
|
||||
const uint32_t LzopDecompressor::LZO_MAX_BLOCK_SIZE = (64*1024l*1024l);
|
||||
|
||||
const uint32_t LzopDecompressor::CRC32_INIT_VALUE = 0;
|
||||
const uint32_t LzopDecompressor::ADLER32_INIT_VALUE = 1;
|
||||
|
||||
const uint64_t LzopDecompressor::F_H_CRC32 = 0x00001000L;
|
||||
const uint64_t LzopDecompressor::F_MASK = 0x00003FFFL;
|
||||
const uint64_t LzopDecompressor::F_OS_MASK = 0xff000000L;
|
||||
const uint64_t LzopDecompressor::F_CS_MASK = 0x00f00000L;
|
||||
const uint64_t LzopDecompressor::F_RESERVED = ((F_MASK | F_OS_MASK | F_CS_MASK) ^ 0xffffffffL);
|
||||
const uint64_t LzopDecompressor::F_MULTIPART = 0x00000400L;
|
||||
const uint64_t LzopDecompressor::F_H_FILTER = 0x00000800L;
|
||||
const uint64_t LzopDecompressor::F_H_EXTRA_FIELD = 0x00000040L;
|
||||
const uint64_t LzopDecompressor::F_CRC32_C = 0x00000200L;
|
||||
const uint64_t LzopDecompressor::F_ADLER32_C = 0x00000002L;
|
||||
const uint64_t LzopDecompressor::F_CRC32_D = 0x00000100L;
|
||||
const uint64_t LzopDecompressor::F_ADLER32_D = 0x00000001L;
|
||||
|
||||
LzopDecompressor::~LzopDecompressor() {
|
||||
}
|
||||
|
||||
Status LzopDecompressor::init() {
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
Status LzopDecompressor::decompress(
|
||||
uint8_t* input, size_t input_len, size_t* input_bytes_read,
|
||||
uint8_t* output, size_t output_max_len,
|
||||
size_t* decompressed_len, bool* stream_end,
|
||||
size_t* more_input_bytes, size_t* more_output_bytes) {
|
||||
|
||||
if (!_is_header_loaded) {
|
||||
// this is the first time to call lzo decompress, parse the header info first
|
||||
RETURN_IF_ERROR(parse_header_info(input, input_len, input_bytes_read, more_input_bytes));
|
||||
if (*more_input_bytes > 0) {
|
||||
return Status::OK;
|
||||
}
|
||||
}
|
||||
|
||||
// LOG(INFO) << "after load header: " << *input_bytes_read;
|
||||
|
||||
// read compressed block
|
||||
// compressed-block ::=
|
||||
// <uncompressed-size>
|
||||
// <compressed-size>
|
||||
// <uncompressed-checksums>
|
||||
// <compressed-checksums>
|
||||
// <compressed-data>
|
||||
int left_input_len = input_len - *input_bytes_read;
|
||||
if (left_input_len < sizeof(uint32_t)) {
|
||||
// block is at least have uncompressed_size
|
||||
*more_input_bytes = sizeof(uint32_t) - left_input_len;
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
uint8_t* block_start = input + *input_bytes_read;
|
||||
uint8_t* ptr = block_start;
|
||||
// 1. uncompressed size
|
||||
uint32_t uncompressed_size;
|
||||
ptr = get_uint32(ptr, &uncompressed_size);
|
||||
left_input_len -= sizeof(uint32_t);
|
||||
if (uncompressed_size == 0) {
|
||||
*stream_end = true;
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
// 2. compressed size
|
||||
if (left_input_len < sizeof(uint32_t)) {
|
||||
*more_input_bytes = sizeof(uint32_t) - left_input_len;
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
uint32_t compressed_size;
|
||||
ptr = get_uint32(ptr, &compressed_size);
|
||||
left_input_len -= sizeof(uint32_t);
|
||||
if (compressed_size > LZO_MAX_BLOCK_SIZE) {
|
||||
std::stringstream ss;
|
||||
ss << "lzo block size: " << compressed_size << " is greater than LZO_MAX_BLOCK_SIZE: "
|
||||
<< LZO_MAX_BLOCK_SIZE;
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
// 3. out checksum
|
||||
uint32_t out_checksum = 0;
|
||||
if (_header_info.output_checksum_type != CHECK_NONE) {
|
||||
if (left_input_len < sizeof(uint32_t)) {
|
||||
*more_input_bytes = sizeof(uint32_t) - left_input_len;
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
ptr = get_uint32(ptr, &out_checksum);
|
||||
left_input_len -= sizeof(uint32_t);
|
||||
}
|
||||
|
||||
// 4. in checksum
|
||||
uint32_t in_checksum = 0;
|
||||
if (compressed_size < uncompressed_size && _header_info.input_checksum_type != CHECK_NONE) {
|
||||
if (left_input_len < sizeof(uint32_t)) {
|
||||
*more_input_bytes = sizeof(uint32_t) - left_input_len;
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
ptr = get_uint32(ptr, &out_checksum);
|
||||
left_input_len -= sizeof(uint32_t);
|
||||
} else {
|
||||
// If the compressed data size is equal to the uncompressed data size, then
|
||||
// the uncompressed data is stored and there is no compressed checksum.
|
||||
in_checksum = out_checksum;
|
||||
}
|
||||
|
||||
// 5. checksum compressed data
|
||||
if (left_input_len < compressed_size) {
|
||||
*more_input_bytes = compressed_size - left_input_len;
|
||||
return Status::OK;
|
||||
}
|
||||
RETURN_IF_ERROR(checksum(_header_info.input_checksum_type,
|
||||
"compressed", in_checksum, ptr, compressed_size));
|
||||
|
||||
// 6. decompress
|
||||
if (output_max_len < uncompressed_size) {
|
||||
*more_output_bytes = uncompressed_size - output_max_len;
|
||||
return Status::OK;
|
||||
}
|
||||
if (compressed_size == uncompressed_size) {
|
||||
// the data is uncompressed, just copy to the output buf
|
||||
memmove(output, ptr, compressed_size);
|
||||
ptr += compressed_size;
|
||||
} else {
|
||||
// decompress
|
||||
*decompressed_len = uncompressed_size;
|
||||
int ret = lzo1x_decompress_safe(ptr, compressed_size,
|
||||
output, reinterpret_cast<lzo_uint*>(&uncompressed_size), nullptr);
|
||||
if (ret != LZO_E_OK || uncompressed_size != *decompressed_len) {
|
||||
std::stringstream ss;
|
||||
ss << "Lzo decompression failed with ret: " << ret
|
||||
<< " decompressed len: " << uncompressed_size
|
||||
<< " expected: " << *decompressed_len;
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
RETURN_IF_ERROR(checksum(_header_info.output_checksum_type, "decompressed",
|
||||
out_checksum, output, uncompressed_size));
|
||||
ptr += compressed_size;
|
||||
}
|
||||
|
||||
// 7. peek next block's uncompressed size
|
||||
uint32_t next_uncompressed_size;
|
||||
get_uint32(ptr, &next_uncompressed_size);
|
||||
if (next_uncompressed_size == 0) {
|
||||
// 0 means current block is the last block.
|
||||
// consume this uncompressed_size to finish reading.
|
||||
ptr += sizeof(uint32_t);
|
||||
}
|
||||
|
||||
// 8. done
|
||||
*stream_end = true;
|
||||
*decompressed_len = uncompressed_size;
|
||||
*input_bytes_read += ptr - block_start;
|
||||
|
||||
LOG(INFO) << "finished decompress lzo block."
|
||||
<< " compressed_size: " << compressed_size
|
||||
<< " decompressed_len: " << *decompressed_len
|
||||
<< " input_bytes_read: " << *input_bytes_read
|
||||
<< " next_uncompressed_size: " << next_uncompressed_size;
|
||||
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
// file-header ::= -- most of this information is not used.
|
||||
// <magic>
|
||||
// <version>
|
||||
// <lib-version>
|
||||
// [<version-needed>] -- present for all modern files.
|
||||
// <method>
|
||||
// <level>
|
||||
// <flags>
|
||||
// <mode>
|
||||
// <mtime>
|
||||
// <file-name>
|
||||
// <header-checksum>
|
||||
// <extra-field> -- presence indicated in flags, not currently used.
|
||||
Status LzopDecompressor::parse_header_info(uint8_t* input, size_t input_len,
|
||||
size_t* input_bytes_read,
|
||||
size_t* more_input_bytes) {
|
||||
if (input_len < MIN_HEADER_SIZE) {
|
||||
LOG(INFO) << "highly recommanded that Lzo header size is larger than " << MIN_HEADER_SIZE
|
||||
<< ", or parsing header info may failed."
|
||||
<< " only given: " << input_len;
|
||||
*more_input_bytes = MIN_HEADER_SIZE - input_len;
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
uint8_t* ptr = input;
|
||||
// 1. magic
|
||||
if (memcmp(ptr, LZOP_MAGIC, sizeof(LZOP_MAGIC))) {
|
||||
std::stringstream ss;
|
||||
ss << "invalid lzo magic number";
|
||||
return Status(ss.str());
|
||||
}
|
||||
ptr += sizeof(LZOP_MAGIC);
|
||||
uint8_t* header = ptr;
|
||||
|
||||
// 2. version
|
||||
ptr = get_uint16(ptr, &_header_info.version);
|
||||
if (_header_info.version > LZOP_VERSION) {
|
||||
std::stringstream ss;
|
||||
ss << "compressed with later version of lzop: " << &_header_info.version
|
||||
<< " must be less than: " << LZOP_VERSION;
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
// 3. lib version
|
||||
ptr = get_uint16(ptr, &_header_info.lib_version);
|
||||
if (_header_info.lib_version < MIN_LZO_VERSION) {
|
||||
std::stringstream ss;
|
||||
ss << "compressed with incompatible lzo version: " << &_header_info.lib_version
|
||||
<< "must be at least: " << MIN_LZO_VERSION;
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
// 4. version needed
|
||||
ptr = get_uint16(ptr, &_header_info.version_needed);
|
||||
if (_header_info.version_needed > LZOP_VERSION) {
|
||||
std::stringstream ss;
|
||||
ss << "compressed with imp incompatible lzo version: " << &_header_info.version
|
||||
<< " must be at no more than: " << LZOP_VERSION;
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
// 5. method
|
||||
ptr = get_uint8(ptr, &_header_info.method);
|
||||
if (_header_info.method < 1 || _header_info.method > 3) {
|
||||
std::stringstream ss;
|
||||
ss << "invalid compression method: " << _header_info.method;
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
// 6. skip level
|
||||
++ptr;
|
||||
|
||||
// 7. flags
|
||||
uint32_t flags;
|
||||
ptr = get_uint32(ptr, &flags);
|
||||
if (flags & (F_RESERVED | F_MULTIPART | F_H_FILTER)) {
|
||||
std::stringstream ss;
|
||||
ss << "unsupported lzo flags: " << flags;
|
||||
return Status(ss.str());
|
||||
}
|
||||
_header_info.header_checksum_type = header_type(flags);
|
||||
_header_info.input_checksum_type = input_type(flags);
|
||||
_header_info.output_checksum_type = output_type(flags);
|
||||
|
||||
// 8. skip mode and mtime
|
||||
ptr += 3 * sizeof(int32_t);
|
||||
|
||||
// 9. filename
|
||||
uint8_t filename_len;
|
||||
ptr = get_uint8(ptr, &filename_len);
|
||||
|
||||
// here we already consume (MIN_HEADER_SIZE)
|
||||
// from now we have to check left input is enough for each step
|
||||
size_t left = input_len - (ptr - input);
|
||||
if (left < filename_len) {
|
||||
*more_input_bytes = filename_len - left;
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
_header_info.filename = std::string((char*) ptr, (size_t) filename_len);
|
||||
ptr += filename_len;
|
||||
left -= filename_len;
|
||||
|
||||
// 10. checksum
|
||||
if (left < sizeof(uint32_t)) {
|
||||
*more_input_bytes = sizeof(uint32_t) - left;
|
||||
return Status::OK;
|
||||
}
|
||||
uint32_t expected_checksum;
|
||||
uint8_t* cur = ptr;
|
||||
ptr = get_uint32(ptr, &expected_checksum);
|
||||
uint32_t computed_checksum;
|
||||
if (_header_info.header_checksum_type == CHECK_CRC32) {
|
||||
computed_checksum = CRC32_INIT_VALUE;
|
||||
computed_checksum = lzo_crc32(computed_checksum, header, cur - header);
|
||||
} else {
|
||||
computed_checksum = ADLER32_INIT_VALUE;
|
||||
computed_checksum = lzo_adler32(computed_checksum, header, cur - header);
|
||||
}
|
||||
|
||||
if (computed_checksum != expected_checksum) {
|
||||
std::stringstream ss;
|
||||
ss << "invalid header checksum: " << computed_checksum
|
||||
<< " expected: " << expected_checksum;
|
||||
return Status(ss.str());
|
||||
}
|
||||
left -= sizeof(uint32_t);
|
||||
|
||||
// 11. skip extra
|
||||
if (flags & F_H_EXTRA_FIELD) {
|
||||
if (left < sizeof(uint32_t)) {
|
||||
*more_input_bytes = sizeof(uint32_t) - left;
|
||||
return Status::OK;
|
||||
}
|
||||
uint32_t extra_len;
|
||||
ptr = get_uint32(ptr, &extra_len);
|
||||
left -= sizeof(uint32_t);
|
||||
|
||||
// add the checksum and the len to the total ptr size.
|
||||
if (left < sizeof(int32_t) + extra_len) {
|
||||
*more_input_bytes = sizeof(int32_t) + extra_len - left;
|
||||
return Status::OK;
|
||||
}
|
||||
left -= sizeof(int32_t) + extra_len;
|
||||
ptr += sizeof(int32_t) + extra_len;
|
||||
}
|
||||
|
||||
_header_info.header_size = ptr - input;
|
||||
*input_bytes_read = _header_info.header_size;
|
||||
|
||||
_is_header_loaded = true;
|
||||
LOG(INFO) << debug_info();
|
||||
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
Status LzopDecompressor::checksum(LzoChecksum type, const std::string& source,
|
||||
uint32_t expected,
|
||||
uint8_t* ptr, size_t len) {
|
||||
uint32_t computed_checksum;
|
||||
switch (type) {
|
||||
case CHECK_NONE:
|
||||
return Status::OK;
|
||||
case CHECK_CRC32:
|
||||
computed_checksum = lzo_crc32(CRC32_INIT_VALUE, ptr, len);
|
||||
break;
|
||||
case CHECK_ADLER:
|
||||
computed_checksum = lzo_adler32(ADLER32_INIT_VALUE, ptr, len);
|
||||
break;
|
||||
default:
|
||||
std::stringstream ss;
|
||||
ss << "Invalid checksum type: " << type;
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
if (computed_checksum != expected) {
|
||||
std::stringstream ss;
|
||||
ss << "checksum of " << source << " block failed."
|
||||
<< " computed checksum: " << computed_checksum
|
||||
<< " expected: " << expected;
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
std::string LzopDecompressor::debug_info() {
|
||||
std::stringstream ss;
|
||||
ss << "LzopDecompressor."
|
||||
<< " version: " << _header_info.version
|
||||
<< " lib version: " << _header_info.lib_version
|
||||
<< " version needed: " << _header_info.version_needed
|
||||
<< " method: " << (uint16_t) _header_info.method
|
||||
<< " filename: " << _header_info.filename
|
||||
<< " header size: " << _header_info.header_size
|
||||
<< " header checksum type: " << _header_info.header_checksum_type
|
||||
<< " input checksum type: " << _header_info.input_checksum_type
|
||||
<< " ouput checksum type: " << _header_info.output_checksum_type;
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
@ -20,8 +20,11 @@
|
||||
#include <zlib.h>
|
||||
#include <bzlib.h>
|
||||
#include <lz4/lz4frame.h>
|
||||
|
||||
#ifdef DORIS_WITH_LZO
|
||||
#include <lzo/lzoconf.h>
|
||||
#include <lzo/lzo1x.h>
|
||||
#endif
|
||||
|
||||
#include "common/status.h"
|
||||
|
||||
@ -148,6 +151,7 @@ private:
|
||||
const static unsigned DORIS_LZ4F_VERSION;
|
||||
};
|
||||
|
||||
#ifdef DORIS_WITH_LZO
|
||||
class LzopDecompressor : public Decompressor {
|
||||
public:
|
||||
virtual ~LzopDecompressor();
|
||||
@ -255,5 +259,6 @@ private:
|
||||
const static uint64_t F_CRC32_D;
|
||||
const static uint64_t F_ADLER32_D;
|
||||
};
|
||||
#endif // DORIS_WITH_LZO
|
||||
|
||||
} // namespace
|
||||
|
||||
403
be/src/exec/lzo_decompressor.cpp
Normal file
403
be/src/exec/lzo_decompressor.cpp
Normal file
@ -0,0 +1,403 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "exec/decompressor.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
#ifdef DORIS_WITH_LZO
|
||||
// Lzop
|
||||
const uint8_t LzopDecompressor::LZOP_MAGIC[9] =
|
||||
{ 0x89, 0x4c, 0x5a, 0x4f, 0x00, 0x0d, 0x0a, 0x1a, 0x0a };
|
||||
|
||||
const uint64_t LzopDecompressor::LZOP_VERSION = 0x1030;
|
||||
const uint64_t LzopDecompressor::MIN_LZO_VERSION = 0x0100;
|
||||
// magic(9) + ver(2) + lib_ver(2) + ver_needed(2) + method(1)
|
||||
// + lvl(1) + flags(4) + mode/mtime(12) + filename_len(1)
|
||||
// without the real file name, extra field and checksum
|
||||
const uint32_t LzopDecompressor::MIN_HEADER_SIZE = 34;
|
||||
const uint32_t LzopDecompressor::LZO_MAX_BLOCK_SIZE = (64*1024l*1024l);
|
||||
|
||||
const uint32_t LzopDecompressor::CRC32_INIT_VALUE = 0;
|
||||
const uint32_t LzopDecompressor::ADLER32_INIT_VALUE = 1;
|
||||
|
||||
const uint64_t LzopDecompressor::F_H_CRC32 = 0x00001000L;
|
||||
const uint64_t LzopDecompressor::F_MASK = 0x00003FFFL;
|
||||
const uint64_t LzopDecompressor::F_OS_MASK = 0xff000000L;
|
||||
const uint64_t LzopDecompressor::F_CS_MASK = 0x00f00000L;
|
||||
const uint64_t LzopDecompressor::F_RESERVED = ((F_MASK | F_OS_MASK | F_CS_MASK) ^ 0xffffffffL);
|
||||
const uint64_t LzopDecompressor::F_MULTIPART = 0x00000400L;
|
||||
const uint64_t LzopDecompressor::F_H_FILTER = 0x00000800L;
|
||||
const uint64_t LzopDecompressor::F_H_EXTRA_FIELD = 0x00000040L;
|
||||
const uint64_t LzopDecompressor::F_CRC32_C = 0x00000200L;
|
||||
const uint64_t LzopDecompressor::F_ADLER32_C = 0x00000002L;
|
||||
const uint64_t LzopDecompressor::F_CRC32_D = 0x00000100L;
|
||||
const uint64_t LzopDecompressor::F_ADLER32_D = 0x00000001L;
|
||||
|
||||
LzopDecompressor::~LzopDecompressor() {
|
||||
}
|
||||
|
||||
Status LzopDecompressor::init() {
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
Status LzopDecompressor::decompress(
|
||||
uint8_t* input, size_t input_len, size_t* input_bytes_read,
|
||||
uint8_t* output, size_t output_max_len,
|
||||
size_t* decompressed_len, bool* stream_end,
|
||||
size_t* more_input_bytes, size_t* more_output_bytes) {
|
||||
|
||||
if (!_is_header_loaded) {
|
||||
// this is the first time to call lzo decompress, parse the header info first
|
||||
RETURN_IF_ERROR(parse_header_info(input, input_len, input_bytes_read, more_input_bytes));
|
||||
if (*more_input_bytes > 0) {
|
||||
return Status::OK;
|
||||
}
|
||||
}
|
||||
|
||||
// LOG(INFO) << "after load header: " << *input_bytes_read;
|
||||
|
||||
// read compressed block
|
||||
// compressed-block ::=
|
||||
// <uncompressed-size>
|
||||
// <compressed-size>
|
||||
// <uncompressed-checksums>
|
||||
// <compressed-checksums>
|
||||
// <compressed-data>
|
||||
int left_input_len = input_len - *input_bytes_read;
|
||||
if (left_input_len < sizeof(uint32_t)) {
|
||||
// block is at least have uncompressed_size
|
||||
*more_input_bytes = sizeof(uint32_t) - left_input_len;
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
uint8_t* block_start = input + *input_bytes_read;
|
||||
uint8_t* ptr = block_start;
|
||||
// 1. uncompressed size
|
||||
uint32_t uncompressed_size;
|
||||
ptr = get_uint32(ptr, &uncompressed_size);
|
||||
left_input_len -= sizeof(uint32_t);
|
||||
if (uncompressed_size == 0) {
|
||||
*stream_end = true;
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
// 2. compressed size
|
||||
if (left_input_len < sizeof(uint32_t)) {
|
||||
*more_input_bytes = sizeof(uint32_t) - left_input_len;
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
uint32_t compressed_size;
|
||||
ptr = get_uint32(ptr, &compressed_size);
|
||||
left_input_len -= sizeof(uint32_t);
|
||||
if (compressed_size > LZO_MAX_BLOCK_SIZE) {
|
||||
std::stringstream ss;
|
||||
ss << "lzo block size: " << compressed_size << " is greater than LZO_MAX_BLOCK_SIZE: "
|
||||
<< LZO_MAX_BLOCK_SIZE;
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
// 3. out checksum
|
||||
uint32_t out_checksum = 0;
|
||||
if (_header_info.output_checksum_type != CHECK_NONE) {
|
||||
if (left_input_len < sizeof(uint32_t)) {
|
||||
*more_input_bytes = sizeof(uint32_t) - left_input_len;
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
ptr = get_uint32(ptr, &out_checksum);
|
||||
left_input_len -= sizeof(uint32_t);
|
||||
}
|
||||
|
||||
// 4. in checksum
|
||||
uint32_t in_checksum = 0;
|
||||
if (compressed_size < uncompressed_size && _header_info.input_checksum_type != CHECK_NONE) {
|
||||
if (left_input_len < sizeof(uint32_t)) {
|
||||
*more_input_bytes = sizeof(uint32_t) - left_input_len;
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
ptr = get_uint32(ptr, &out_checksum);
|
||||
left_input_len -= sizeof(uint32_t);
|
||||
} else {
|
||||
// If the compressed data size is equal to the uncompressed data size, then
|
||||
// the uncompressed data is stored and there is no compressed checksum.
|
||||
in_checksum = out_checksum;
|
||||
}
|
||||
|
||||
// 5. checksum compressed data
|
||||
if (left_input_len < compressed_size) {
|
||||
*more_input_bytes = compressed_size - left_input_len;
|
||||
return Status::OK;
|
||||
}
|
||||
RETURN_IF_ERROR(checksum(_header_info.input_checksum_type,
|
||||
"compressed", in_checksum, ptr, compressed_size));
|
||||
|
||||
// 6. decompress
|
||||
if (output_max_len < uncompressed_size) {
|
||||
*more_output_bytes = uncompressed_size - output_max_len;
|
||||
return Status::OK;
|
||||
}
|
||||
if (compressed_size == uncompressed_size) {
|
||||
// the data is uncompressed, just copy to the output buf
|
||||
memmove(output, ptr, compressed_size);
|
||||
ptr += compressed_size;
|
||||
} else {
|
||||
// decompress
|
||||
*decompressed_len = uncompressed_size;
|
||||
int ret = lzo1x_decompress_safe(ptr, compressed_size,
|
||||
output, reinterpret_cast<lzo_uint*>(&uncompressed_size), nullptr);
|
||||
if (ret != LZO_E_OK || uncompressed_size != *decompressed_len) {
|
||||
std::stringstream ss;
|
||||
ss << "Lzo decompression failed with ret: " << ret
|
||||
<< " decompressed len: " << uncompressed_size
|
||||
<< " expected: " << *decompressed_len;
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
RETURN_IF_ERROR(checksum(_header_info.output_checksum_type, "decompressed",
|
||||
out_checksum, output, uncompressed_size));
|
||||
ptr += compressed_size;
|
||||
}
|
||||
|
||||
// 7. peek next block's uncompressed size
|
||||
uint32_t next_uncompressed_size;
|
||||
get_uint32(ptr, &next_uncompressed_size);
|
||||
if (next_uncompressed_size == 0) {
|
||||
// 0 means current block is the last block.
|
||||
// consume this uncompressed_size to finish reading.
|
||||
ptr += sizeof(uint32_t);
|
||||
}
|
||||
|
||||
// 8. done
|
||||
*stream_end = true;
|
||||
*decompressed_len = uncompressed_size;
|
||||
*input_bytes_read += ptr - block_start;
|
||||
|
||||
LOG(INFO) << "finished decompress lzo block."
|
||||
<< " compressed_size: " << compressed_size
|
||||
<< " decompressed_len: " << *decompressed_len
|
||||
<< " input_bytes_read: " << *input_bytes_read
|
||||
<< " next_uncompressed_size: " << next_uncompressed_size;
|
||||
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
// file-header ::= -- most of this information is not used.
|
||||
// <magic>
|
||||
// <version>
|
||||
// <lib-version>
|
||||
// [<version-needed>] -- present for all modern files.
|
||||
// <method>
|
||||
// <level>
|
||||
// <flags>
|
||||
// <mode>
|
||||
// <mtime>
|
||||
// <file-name>
|
||||
// <header-checksum>
|
||||
// <extra-field> -- presence indicated in flags, not currently used.
|
||||
Status LzopDecompressor::parse_header_info(uint8_t* input, size_t input_len,
|
||||
size_t* input_bytes_read,
|
||||
size_t* more_input_bytes) {
|
||||
if (input_len < MIN_HEADER_SIZE) {
|
||||
LOG(INFO) << "highly recommanded that Lzo header size is larger than " << MIN_HEADER_SIZE
|
||||
<< ", or parsing header info may failed."
|
||||
<< " only given: " << input_len;
|
||||
*more_input_bytes = MIN_HEADER_SIZE - input_len;
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
uint8_t* ptr = input;
|
||||
// 1. magic
|
||||
if (memcmp(ptr, LZOP_MAGIC, sizeof(LZOP_MAGIC))) {
|
||||
std::stringstream ss;
|
||||
ss << "invalid lzo magic number";
|
||||
return Status(ss.str());
|
||||
}
|
||||
ptr += sizeof(LZOP_MAGIC);
|
||||
uint8_t* header = ptr;
|
||||
|
||||
// 2. version
|
||||
ptr = get_uint16(ptr, &_header_info.version);
|
||||
if (_header_info.version > LZOP_VERSION) {
|
||||
std::stringstream ss;
|
||||
ss << "compressed with later version of lzop: " << &_header_info.version
|
||||
<< " must be less than: " << LZOP_VERSION;
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
// 3. lib version
|
||||
ptr = get_uint16(ptr, &_header_info.lib_version);
|
||||
if (_header_info.lib_version < MIN_LZO_VERSION) {
|
||||
std::stringstream ss;
|
||||
ss << "compressed with incompatible lzo version: " << &_header_info.lib_version
|
||||
<< "must be at least: " << MIN_LZO_VERSION;
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
// 4. version needed
|
||||
ptr = get_uint16(ptr, &_header_info.version_needed);
|
||||
if (_header_info.version_needed > LZOP_VERSION) {
|
||||
std::stringstream ss;
|
||||
ss << "compressed with imp incompatible lzo version: " << &_header_info.version
|
||||
<< " must be at no more than: " << LZOP_VERSION;
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
// 5. method
|
||||
ptr = get_uint8(ptr, &_header_info.method);
|
||||
if (_header_info.method < 1 || _header_info.method > 3) {
|
||||
std::stringstream ss;
|
||||
ss << "invalid compression method: " << _header_info.method;
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
// 6. skip level
|
||||
++ptr;
|
||||
|
||||
// 7. flags
|
||||
uint32_t flags;
|
||||
ptr = get_uint32(ptr, &flags);
|
||||
if (flags & (F_RESERVED | F_MULTIPART | F_H_FILTER)) {
|
||||
std::stringstream ss;
|
||||
ss << "unsupported lzo flags: " << flags;
|
||||
return Status(ss.str());
|
||||
}
|
||||
_header_info.header_checksum_type = header_type(flags);
|
||||
_header_info.input_checksum_type = input_type(flags);
|
||||
_header_info.output_checksum_type = output_type(flags);
|
||||
|
||||
// 8. skip mode and mtime
|
||||
ptr += 3 * sizeof(int32_t);
|
||||
|
||||
// 9. filename
|
||||
uint8_t filename_len;
|
||||
ptr = get_uint8(ptr, &filename_len);
|
||||
|
||||
// here we already consume (MIN_HEADER_SIZE)
|
||||
// from now we have to check left input is enough for each step
|
||||
size_t left = input_len - (ptr - input);
|
||||
if (left < filename_len) {
|
||||
*more_input_bytes = filename_len - left;
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
_header_info.filename = std::string((char*) ptr, (size_t) filename_len);
|
||||
ptr += filename_len;
|
||||
left -= filename_len;
|
||||
|
||||
// 10. checksum
|
||||
if (left < sizeof(uint32_t)) {
|
||||
*more_input_bytes = sizeof(uint32_t) - left;
|
||||
return Status::OK;
|
||||
}
|
||||
uint32_t expected_checksum;
|
||||
uint8_t* cur = ptr;
|
||||
ptr = get_uint32(ptr, &expected_checksum);
|
||||
uint32_t computed_checksum;
|
||||
if (_header_info.header_checksum_type == CHECK_CRC32) {
|
||||
computed_checksum = CRC32_INIT_VALUE;
|
||||
computed_checksum = lzo_crc32(computed_checksum, header, cur - header);
|
||||
} else {
|
||||
computed_checksum = ADLER32_INIT_VALUE;
|
||||
computed_checksum = lzo_adler32(computed_checksum, header, cur - header);
|
||||
}
|
||||
|
||||
if (computed_checksum != expected_checksum) {
|
||||
std::stringstream ss;
|
||||
ss << "invalid header checksum: " << computed_checksum
|
||||
<< " expected: " << expected_checksum;
|
||||
return Status(ss.str());
|
||||
}
|
||||
left -= sizeof(uint32_t);
|
||||
|
||||
// 11. skip extra
|
||||
if (flags & F_H_EXTRA_FIELD) {
|
||||
if (left < sizeof(uint32_t)) {
|
||||
*more_input_bytes = sizeof(uint32_t) - left;
|
||||
return Status::OK;
|
||||
}
|
||||
uint32_t extra_len;
|
||||
ptr = get_uint32(ptr, &extra_len);
|
||||
left -= sizeof(uint32_t);
|
||||
|
||||
// add the checksum and the len to the total ptr size.
|
||||
if (left < sizeof(int32_t) + extra_len) {
|
||||
*more_input_bytes = sizeof(int32_t) + extra_len - left;
|
||||
return Status::OK;
|
||||
}
|
||||
left -= sizeof(int32_t) + extra_len;
|
||||
ptr += sizeof(int32_t) + extra_len;
|
||||
}
|
||||
|
||||
_header_info.header_size = ptr - input;
|
||||
*input_bytes_read = _header_info.header_size;
|
||||
|
||||
_is_header_loaded = true;
|
||||
LOG(INFO) << debug_info();
|
||||
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
Status LzopDecompressor::checksum(LzoChecksum type, const std::string& source,
|
||||
uint32_t expected,
|
||||
uint8_t* ptr, size_t len) {
|
||||
uint32_t computed_checksum;
|
||||
switch (type) {
|
||||
case CHECK_NONE:
|
||||
return Status::OK;
|
||||
case CHECK_CRC32:
|
||||
computed_checksum = lzo_crc32(CRC32_INIT_VALUE, ptr, len);
|
||||
break;
|
||||
case CHECK_ADLER:
|
||||
computed_checksum = lzo_adler32(ADLER32_INIT_VALUE, ptr, len);
|
||||
break;
|
||||
default:
|
||||
std::stringstream ss;
|
||||
ss << "Invalid checksum type: " << type;
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
if (computed_checksum != expected) {
|
||||
std::stringstream ss;
|
||||
ss << "checksum of " << source << " block failed."
|
||||
<< " computed checksum: " << computed_checksum
|
||||
<< " expected: " << expected;
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
std::string LzopDecompressor::debug_info() {
|
||||
std::stringstream ss;
|
||||
ss << "LzopDecompressor."
|
||||
<< " version: " << _header_info.version
|
||||
<< " lib version: " << _header_info.lib_version
|
||||
<< " version needed: " << _header_info.version_needed
|
||||
<< " method: " << (uint16_t) _header_info.method
|
||||
<< " filename: " << _header_info.filename
|
||||
<< " header size: " << _header_info.header_size
|
||||
<< " header checksum type: " << _header_info.header_checksum_type
|
||||
<< " input checksum type: " << _header_info.input_checksum_type
|
||||
<< " ouput checksum type: " << _header_info.output_checksum_type;
|
||||
return ss.str();
|
||||
}
|
||||
#endif // DORIS_WITH_LZO
|
||||
|
||||
} // namespace
|
||||
@ -21,7 +21,6 @@
|
||||
#include <string>
|
||||
#include <errno.h>
|
||||
#include <algorithm>
|
||||
#include <lzo/lzo1x.h>
|
||||
|
||||
#include "gen_cpp/PaloInternalService_types.h"
|
||||
#include "olap_scanner.h"
|
||||
|
||||
@ -22,6 +22,7 @@
|
||||
|
||||
namespace doris {
|
||||
|
||||
#ifdef DORIS_WITH_LZO
|
||||
OLAPStatus lzo_compress(StorageByteBuffer* in, StorageByteBuffer* out, bool* smaller) {
|
||||
size_t out_length = 0;
|
||||
OLAPStatus res = OLAP_SUCCESS;
|
||||
@ -59,6 +60,7 @@ OLAPStatus lzo_decompress(StorageByteBuffer* in, StorageByteBuffer* out) {
|
||||
|
||||
return res;
|
||||
}
|
||||
#endif
|
||||
|
||||
OLAPStatus lz4_compress(StorageByteBuffer* in, StorageByteBuffer* out, bool* smaller) {
|
||||
size_t out_length = 0;
|
||||
|
||||
@ -43,8 +43,10 @@ typedef OLAPStatus(*Compressor)(StorageByteBuffer* in, StorageByteBuffer* out, b
|
||||
// OLAP_ERR_DECOMPRESS_ERROR - 解压缩错误
|
||||
typedef OLAPStatus(*Decompressor)(StorageByteBuffer* in, StorageByteBuffer* out);
|
||||
|
||||
#ifdef DORIS_WITH_LZO
|
||||
OLAPStatus lzo_compress(StorageByteBuffer* in, StorageByteBuffer* out, bool* smaller);
|
||||
OLAPStatus lzo_decompress(StorageByteBuffer* in, StorageByteBuffer* out);
|
||||
#endif
|
||||
|
||||
OLAPStatus lz4_compress(StorageByteBuffer* in, StorageByteBuffer* out, bool* smaller);
|
||||
OLAPStatus lz4_decompress(StorageByteBuffer* in, StorageByteBuffer* out);
|
||||
|
||||
@ -155,7 +155,7 @@ OLAPStatus InStream::_assure_data() {
|
||||
_compressed->get((char*)&head, sizeof(head));
|
||||
|
||||
if (head.length > _compress_buffer_size) {
|
||||
OLAP_LOG_WARNING("chunk size if larger than buffer size. [chunk=%u buffer_size=%u]",
|
||||
OLAP_LOG_WARNING("chunk size is larger than buffer size. [chunk=%u buffer_size=%u]",
|
||||
head.length, _compress_buffer_size);
|
||||
return OLAP_ERR_COLUMN_READ_STREAM;
|
||||
}
|
||||
@ -189,7 +189,7 @@ OLAPStatus InStream::_assure_data() {
|
||||
}
|
||||
} else {
|
||||
OLAP_LOG_WARNING("compressed remaining size less than stream head size. "
|
||||
"[compressed_remaining_size=%lu stream_head_size.=%lu]",
|
||||
"[compressed_remaining_size=%lu stream_head_size=%lu]",
|
||||
_compressed->remaining(), sizeof(StreamHead));
|
||||
return OLAP_ERR_COLUMN_READ_STREAM;
|
||||
}
|
||||
|
||||
@ -159,6 +159,7 @@ enum OLAPStatus {
|
||||
OLAP_ERR_INVALID_CLUSTER_INFO = -225,
|
||||
OLAP_ERR_TRANSACTION_NOT_EXIST = -226,
|
||||
OLAP_ERR_DISK_FAILURE = -227,
|
||||
OLAP_ERR_LZO_DISABLED = -228,
|
||||
|
||||
// CommandExecutor
|
||||
// [-300, -400)
|
||||
|
||||
@ -125,9 +125,6 @@ public:
|
||||
}
|
||||
void change_file_version_to_delta();
|
||||
private:
|
||||
// Compute schema hash(all fields name and type, index name and its field
|
||||
// names) using lzo_adler32 function.
|
||||
OLAPStatus _compute_schema_hash(SchemaHash* schema_hash);
|
||||
void _convert_file_version_to_delta(const FileVersionMessage& version, PDelta* delta);
|
||||
|
||||
// full path of olap header file
|
||||
|
||||
@ -32,9 +32,11 @@ OutStreamFactory::OutStreamFactory(CompressKind compress_kind, uint32_t stream_b
|
||||
_compressor = NULL;
|
||||
break;
|
||||
|
||||
#ifdef DORIS_WITH_LZO
|
||||
case COMPRESS_LZO:
|
||||
_compressor = lzo_compress;
|
||||
break;
|
||||
#endif
|
||||
|
||||
case COMPRESS_LZ4:
|
||||
_compressor = lz4_compress;
|
||||
|
||||
@ -562,6 +562,15 @@ OLAPStatus PushHandler::_convert(
|
||||
if (_request.__isset.need_decompress && _request.need_decompress) {
|
||||
need_decompress = true;
|
||||
}
|
||||
|
||||
#ifndef DORIS_WITH_LZO
|
||||
if (need_decompress) {
|
||||
// if lzo is diabled, compressed data is not allowed here
|
||||
res = OLAP_ERR_LZO_DISABLED;
|
||||
break;
|
||||
}
|
||||
#endif
|
||||
|
||||
if (NULL == (reader = IBinaryReader::create(need_decompress))) {
|
||||
OLAP_LOG_WARNING("fail to create reader. [table='%s' file='%s']",
|
||||
curr_olap_table->full_name().c_str(),
|
||||
@ -948,7 +957,9 @@ OLAPStatus BinaryFile::init(const char* path) {
|
||||
IBinaryReader* IBinaryReader::create(bool need_decompress) {
|
||||
IBinaryReader* reader = NULL;
|
||||
if (need_decompress) {
|
||||
#ifdef DORIS_WITH_LZO
|
||||
reader = new(std::nothrow) LzoBinaryReader();
|
||||
#endif
|
||||
} else {
|
||||
reader = new(std::nothrow) BinaryReader();
|
||||
}
|
||||
|
||||
@ -168,10 +168,12 @@ OLAPStatus SegmentReader::_set_decompressor() {
|
||||
_decompressor = NULL;
|
||||
break;
|
||||
}
|
||||
#ifdef DORIS_WITH_LZO
|
||||
case COMPRESS_LZO: {
|
||||
_decompressor = lzo_decompress;
|
||||
break;
|
||||
}
|
||||
#endif
|
||||
case COMPRESS_LZ4: {
|
||||
_decompressor = lz4_decompress;
|
||||
break;
|
||||
|
||||
@ -29,8 +29,12 @@
|
||||
#include <boost/regex.hpp>
|
||||
#include <errno.h>
|
||||
#include <lz4/lz4.h>
|
||||
|
||||
#ifdef DORIS_WITH_LZO
|
||||
#include <lzo/lzo1c.h>
|
||||
#include <lzo/lzo1x.h>
|
||||
#endif
|
||||
|
||||
#include <stdarg.h>
|
||||
|
||||
#include "common/logging.h"
|
||||
@ -266,6 +270,8 @@ OLAPStatus olap_compress(const char* src_buf,
|
||||
|
||||
*written_len = dest_len;
|
||||
switch (compression_type) {
|
||||
|
||||
#ifdef DORIS_WITH_LZO
|
||||
case OLAP_COMP_TRANSPORT: {
|
||||
// A small buffer(hundreds of bytes) for LZO1X
|
||||
unsigned char mem[LZO1X_1_MEM_COMPRESS];
|
||||
@ -318,6 +324,8 @@ OLAPStatus olap_compress(const char* src_buf,
|
||||
}
|
||||
break;
|
||||
}
|
||||
#endif
|
||||
|
||||
case OLAP_COMP_LZ4: {
|
||||
// int lz4_res = LZ4_compress_limitedOutput(src_buf, dest_buf, src_len, dest_len);
|
||||
int lz4_res = LZ4_compress_default(src_buf, dest_buf, src_len, dest_len);
|
||||
@ -355,6 +363,8 @@ OLAPStatus olap_decompress(const char* src_buf,
|
||||
|
||||
*written_len = dest_len;
|
||||
switch (compression_type) {
|
||||
|
||||
#ifdef DORIS_WITH_LZO
|
||||
case OLAP_COMP_TRANSPORT: {
|
||||
int lzo_res = lzo1x_decompress_safe(reinterpret_cast<const lzo_byte*>(src_buf),
|
||||
src_len,
|
||||
@ -403,6 +413,8 @@ OLAPStatus olap_decompress(const char* src_buf,
|
||||
}
|
||||
break;
|
||||
}
|
||||
#endif
|
||||
|
||||
case OLAP_COMP_LZ4: {
|
||||
int lz4_res = LZ4_decompress_safe(src_buf, dest_buf, src_len, dest_len);
|
||||
*written_len = lz4_res;
|
||||
@ -419,6 +431,7 @@ OLAPStatus olap_decompress(const char* src_buf,
|
||||
break;
|
||||
}
|
||||
default:
|
||||
LOG(FATAL) << "unknown compress kind. kind=" << compression_type;
|
||||
break;
|
||||
}
|
||||
return OLAP_SUCCESS;
|
||||
|
||||
@ -39,7 +39,9 @@ ADD_BE_TEST(plain_text_line_reader_uncompressed_test)
|
||||
ADD_BE_TEST(plain_text_line_reader_gzip_test)
|
||||
ADD_BE_TEST(plain_text_line_reader_bzip_test)
|
||||
ADD_BE_TEST(plain_text_line_reader_lz4frame_test)
|
||||
if(DEFINED DORIS_WITH_LZO)
|
||||
ADD_BE_TEST(plain_text_line_reader_lzop_test)
|
||||
endif()
|
||||
ADD_BE_TEST(broker_reader_test)
|
||||
ADD_BE_TEST(broker_scanner_test)
|
||||
ADD_BE_TEST(broker_scan_node_test)
|
||||
|
||||
@ -67,7 +67,7 @@ public:
|
||||
_offsets.push_back(0);
|
||||
|
||||
_stream_factory =
|
||||
new(std::nothrow) OutStreamFactory(COMPRESS_LZO,
|
||||
new(std::nothrow) OutStreamFactory(COMPRESS_LZ4,
|
||||
OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE);
|
||||
ASSERT_TRUE(_stream_factory != NULL);
|
||||
config::column_dictionary_key_ration_threshold = 30;
|
||||
@ -191,7 +191,7 @@ public:
|
||||
&_shared_buffer,
|
||||
off[i],
|
||||
length[i],
|
||||
lzo_decompress,
|
||||
lz4_decompress,
|
||||
buffer_size[i],
|
||||
&_stats);
|
||||
ASSERT_EQ(OLAP_SUCCESS, in_stream->init());
|
||||
|
||||
@ -204,7 +204,7 @@ TEST(TestStream, UncompressInStream) {
|
||||
TEST(TestStream, CompressOutStream) {
|
||||
// write data
|
||||
OutStream *out_stream =
|
||||
new(std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lzo_compress);
|
||||
new(std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lz4_compress);
|
||||
ASSERT_TRUE(out_stream != NULL);
|
||||
ASSERT_TRUE(out_stream->_compressor != NULL);
|
||||
|
||||
@ -223,7 +223,8 @@ TEST(TestStream, CompressOutStream) {
|
||||
StreamHead head;
|
||||
(*it)->get((char *)&head, sizeof(head));
|
||||
ASSERT_EQ(head.type, StreamHead::COMPRESSED);
|
||||
ASSERT_EQ(head.length, 49);
|
||||
// if lzo, this should be 49
|
||||
ASSERT_EQ(51, head.length);
|
||||
|
||||
SAFE_DELETE_ARRAY(write_data);
|
||||
SAFE_DELETE(out_stream);
|
||||
@ -232,7 +233,7 @@ TEST(TestStream, CompressOutStream) {
|
||||
TEST(TestStream, CompressOutStream2) {
|
||||
// write data
|
||||
OutStream *out_stream =
|
||||
new(std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lzo_compress);
|
||||
new(std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lz4_compress);
|
||||
ASSERT_TRUE(out_stream != NULL);
|
||||
ASSERT_TRUE(out_stream->_compressor != NULL);
|
||||
|
||||
@ -250,12 +251,12 @@ TEST(TestStream, CompressOutStream2) {
|
||||
}
|
||||
std::vector<uint64_t> offsets;
|
||||
offsets.push_back(0);
|
||||
offsets.push_back(57);
|
||||
offsets.push_back(59); // if lzo, this shoudl be 57
|
||||
InStream *in_stream =
|
||||
new (std::nothrow) InStream(&inputs,
|
||||
offsets,
|
||||
out_stream->get_stream_length(),
|
||||
lzo_decompress,
|
||||
lz4_decompress,
|
||||
out_stream->get_total_buffer_size());
|
||||
|
||||
char data;
|
||||
@ -275,7 +276,7 @@ TEST(TestStream, CompressOutStream2) {
|
||||
TEST(TestStream, CompressOutStream3) {
|
||||
// write data
|
||||
OutStream *out_stream =
|
||||
new(std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lzo_compress);
|
||||
new(std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lz4_compress);
|
||||
ASSERT_TRUE(out_stream != NULL);
|
||||
ASSERT_TRUE(out_stream->_compressor != NULL);
|
||||
|
||||
@ -302,7 +303,7 @@ TEST(TestStream, CompressOutStream3) {
|
||||
new (std::nothrow) InStream(&inputs,
|
||||
offsets,
|
||||
out_stream->get_stream_length(),
|
||||
lzo_decompress,
|
||||
lz4_decompress,
|
||||
out_stream->get_total_buffer_size());
|
||||
|
||||
char data;
|
||||
@ -324,7 +325,7 @@ TEST(TestStream, CompressOutStream3) {
|
||||
TEST(TestStream, CompressOutStream4) {
|
||||
// write data
|
||||
OutStream *out_stream =
|
||||
new(std::nothrow) OutStream(18, lzo_compress);
|
||||
new(std::nothrow) OutStream(18, lz4_compress);
|
||||
ASSERT_TRUE(out_stream != NULL);
|
||||
ASSERT_TRUE(out_stream->_compressor != NULL);
|
||||
|
||||
@ -354,7 +355,7 @@ TEST(TestStream, CompressOutStream4) {
|
||||
new (std::nothrow) InStream(&inputs,
|
||||
offsets,
|
||||
out_stream->get_stream_length(),
|
||||
lzo_decompress,
|
||||
lz4_decompress,
|
||||
out_stream->get_total_buffer_size());
|
||||
|
||||
char data;
|
||||
@ -382,7 +383,7 @@ TEST(TestStream, CompressOutStream4) {
|
||||
TEST(TestStream, CompressMassOutStream) {
|
||||
// write data
|
||||
OutStream *out_stream =
|
||||
new(std::nothrow) OutStream(100, lzo_compress);
|
||||
new(std::nothrow) OutStream(100, lz4_compress);
|
||||
ASSERT_TRUE(out_stream != NULL);
|
||||
ASSERT_TRUE(out_stream->_compressor != NULL);
|
||||
|
||||
@ -405,12 +406,12 @@ TEST(TestStream, CompressMassOutStream) {
|
||||
}
|
||||
std::vector<uint64_t> offsets;
|
||||
offsets.push_back(0);
|
||||
offsets.push_back(17);
|
||||
offsets.push_back(19); // if lzo, this should be 17
|
||||
InStream *in_stream =
|
||||
new (std::nothrow) InStream(&inputs,
|
||||
offsets,
|
||||
out_stream->get_stream_length(),
|
||||
lzo_decompress,
|
||||
lz4_decompress,
|
||||
out_stream->get_total_buffer_size());
|
||||
SAFE_DELETE(out_stream);
|
||||
|
||||
@ -432,7 +433,7 @@ TEST(TestStream, CompressMassOutStream) {
|
||||
TEST(TestStream, CompressInStream) {
|
||||
// write data
|
||||
OutStream *out_stream =
|
||||
new(std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lzo_compress);
|
||||
new(std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lz4_compress);
|
||||
ASSERT_TRUE(out_stream != NULL);
|
||||
ASSERT_TRUE(out_stream->_compressor != NULL);
|
||||
|
||||
@ -454,7 +455,7 @@ TEST(TestStream, CompressInStream) {
|
||||
InStream *in_stream = new (std::nothrow) InStream(&inputs,
|
||||
offsets,
|
||||
out_stream->get_stream_length(),
|
||||
lzo_decompress,
|
||||
lz4_decompress,
|
||||
out_stream->get_total_buffer_size());
|
||||
ASSERT_EQ(in_stream->available(), OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE);
|
||||
char data;
|
||||
@ -568,7 +569,7 @@ TEST(TestStream, SkipUncompress) {
|
||||
TEST(TestStream, SeekCompress) {
|
||||
// write data
|
||||
OutStream *out_stream =
|
||||
new(std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lzo_compress);
|
||||
new(std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lz4_compress);
|
||||
ASSERT_TRUE(out_stream != NULL);
|
||||
|
||||
for (int32_t i = 0; i < 10; i++) {
|
||||
@ -596,7 +597,7 @@ TEST(TestStream, SeekCompress) {
|
||||
new (std::nothrow) InStream(&inputs,
|
||||
offsets,
|
||||
out_stream->get_stream_length(),
|
||||
lzo_decompress,
|
||||
lz4_decompress,
|
||||
out_stream->get_total_buffer_size());
|
||||
//ASSERT_EQ(in_stream->available(), 2);
|
||||
char buffer[256];
|
||||
@ -620,7 +621,7 @@ TEST(TestStream, SeekCompress) {
|
||||
TEST(TestStream, SkipCompress) {
|
||||
// write data
|
||||
OutStream *out_stream =
|
||||
new(std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lzo_compress);
|
||||
new(std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lz4_compress);
|
||||
ASSERT_TRUE(out_stream != NULL);
|
||||
|
||||
for (int32_t i = 0; i < 10; i++) {
|
||||
@ -642,7 +643,7 @@ TEST(TestStream, SkipCompress) {
|
||||
new (std::nothrow) InStream(&inputs,
|
||||
offsets,
|
||||
out_stream->get_stream_length(),
|
||||
lzo_decompress,
|
||||
lz4_decompress,
|
||||
out_stream->get_total_buffer_size());
|
||||
|
||||
in_stream->skip(10);
|
||||
|
||||
34
build.sh
34
build.sh
@ -51,15 +51,18 @@ Usage: $0 <options>
|
||||
--be build Backend
|
||||
--fe build Frontend
|
||||
--clean clean and build target
|
||||
--with-mysql enable MySQL support
|
||||
--without-mysql disable MySQL support
|
||||
--with-mysql enable MySQL support(default)
|
||||
--without-mysql disable MySQL support
|
||||
--with-lzo enable LZO compress support(default)
|
||||
--without-lzo disable LZO compress support
|
||||
|
||||
Eg.
|
||||
$0 build Backend and Frontend without clean
|
||||
$0 --be build Backend without clean
|
||||
$0 --be --without-mysql build Backend with MySQL disable
|
||||
$0 --fe --clean clean and build Frontend
|
||||
$0 --fe --be --clean clean and build both Frontend and Backend
|
||||
$0 build Backend and Frontend without clean
|
||||
$0 --be build Backend without clean
|
||||
$0 --be --without-mysql build Backend with MySQL disable
|
||||
$0 --be --without-mysql --without-lzo build Backend with both MySQL and LZO disable
|
||||
$0 --fe --clean clean and build Frontend
|
||||
$0 --fe --be --clean clean and build both Frontend and Backend
|
||||
"
|
||||
exit 1
|
||||
}
|
||||
@ -73,6 +76,8 @@ OPTS=$(getopt \
|
||||
-l 'clean' \
|
||||
-l 'with-mysql' \
|
||||
-l 'without-mysql' \
|
||||
-l 'with-lzo' \
|
||||
-l 'without-lzo' \
|
||||
-l 'help' \
|
||||
-- "$@")
|
||||
|
||||
@ -87,6 +92,7 @@ BUILD_FE=
|
||||
CLEAN=
|
||||
RUN_UT=
|
||||
WITH_MYSQL=ON
|
||||
WITH_LZO=ON
|
||||
HELP=0
|
||||
if [ $# == 1 ] ; then
|
||||
# defuat
|
||||
@ -107,6 +113,8 @@ else
|
||||
--ut) RUN_UT=1 ; shift ;;
|
||||
--with-mysql) WITH_MYSQL=ON; shift ;;
|
||||
--without-mysql) WITH_MYSQL=OFF; shift ;;
|
||||
--with-lzo) WITH_LZO=ON; shift ;;
|
||||
--without-lzo) WITH_LZO=OFF; shift ;;
|
||||
-h) HELP=1; shift ;;
|
||||
--help) HELP=1; shift ;;
|
||||
--) shift ; break ;;
|
||||
@ -126,10 +134,12 @@ if [ ${CLEAN} -eq 1 -a ${BUILD_BE} -eq 0 -a ${BUILD_FE} -eq 0 ]; then
|
||||
fi
|
||||
|
||||
echo "Get params:
|
||||
BUILD_BE -- $BUILD_BE
|
||||
BUILD_FE -- $BUILD_FE
|
||||
CLEAN -- $CLEAN
|
||||
RUN_UT -- $RUN_UT
|
||||
BUILD_BE -- $BUILD_BE
|
||||
BUILD_FE -- $BUILD_FE
|
||||
CLEAN -- $CLEAN
|
||||
RUN_UT -- $RUN_UT
|
||||
WITH_MYSQL -- $WITH_MYSQL
|
||||
WITH_LZO -- $WITH_LZO
|
||||
"
|
||||
|
||||
# Clean and build generated code
|
||||
@ -150,7 +160,7 @@ if [ ${BUILD_BE} -eq 1 ] ; then
|
||||
fi
|
||||
mkdir -p ${DORIS_HOME}/be/build/
|
||||
cd ${DORIS_HOME}/be/build/
|
||||
cmake -DWITH_MYSQL=${WITH_MYSQL} ../
|
||||
cmake -DWITH_MYSQL=${WITH_MYSQL} -DWITH_LZO=${WITH_LZO} ../
|
||||
make -j${PARALLEL}
|
||||
make install
|
||||
cd ${DORIS_HOME}
|
||||
|
||||
@ -93,6 +93,7 @@ public class SetConfigAction extends RestBaseAction {
|
||||
try {
|
||||
ConfigBase.setConfigField(f, confVals.get(0));
|
||||
} catch (Exception e) {
|
||||
LOG.warn("failed to set config {}:{}", confKey, confVals.get(0), e);
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -127,4 +128,4 @@ public class SetConfigAction extends RestBaseAction {
|
||||
public static void print(String msg) {
|
||||
System.out.println(System.currentTimeMillis() + " " + msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -168,7 +168,9 @@ ${DORIS_TEST_BINARY_DIR}/exec/plain_text_line_reader_uncompressed_test
|
||||
${DORIS_TEST_BINARY_DIR}/exec/plain_text_line_reader_gzip_test
|
||||
${DORIS_TEST_BINARY_DIR}/exec/plain_text_line_reader_bzip_test
|
||||
${DORIS_TEST_BINARY_DIR}/exec/plain_text_line_reader_lz4frame_test
|
||||
${DORIS_TEST_BINARY_DIR}/exec/plain_text_line_reader_lzop_test
|
||||
if [ -f ${DORIS_TEST_BINARY_DIR}/exec/plain_text_line_reader_lzop_test ];then
|
||||
${DORIS_TEST_BINARY_DIR}/exec/plain_text_line_reader_lzop_test
|
||||
fi
|
||||
${DORIS_TEST_BINARY_DIR}/exec/broker_scanner_test
|
||||
${DORIS_TEST_BINARY_DIR}/exec/broker_scan_node_test
|
||||
${DORIS_TEST_BINARY_DIR}/exec/es_scan_node_test
|
||||
|
||||
66
thirdparty/LICENSE.txt
vendored
Normal file
66
thirdparty/LICENSE.txt
vendored
Normal file
@ -0,0 +1,66 @@
|
||||
==========================================================
|
||||
License information for Doris third-party dependencies
|
||||
==========================================================
|
||||
|
||||
This directory contains scripts which download and install several third-party
|
||||
dependencies of Doris. And for producation builds, most of these dependencies
|
||||
are statically linked into the Doris binaries. Some of dependencies are optional
|
||||
for Doris build.
|
||||
|
||||
Most of these dependencies' licenses are compatible with Apache License Version 2.0.
|
||||
But serverl dependencies are under imcompatible licenses.
|
||||
|
||||
This file will give an interpretation of these licenses.
|
||||
|
||||
============================================================================================
|
||||
LZ4
|
||||
source: https://github.com/lz4/lz4
|
||||
|
||||
This repository uses 2 different licenses :
|
||||
- all files in the `lib` directory use a BSD 2-Clause license
|
||||
- all other files use a GPLv2 license, unless explicitly stated otherwise
|
||||
|
||||
Doris only dependents on features implemented in the `lib` directory, which are under BSD 2-Clause license.
|
||||
|
||||
============================================================================================
|
||||
RapidJSON
|
||||
source: https://github.com/Tencent/rapidjson/
|
||||
|
||||
RapidJSON source code is licensed under the MIT License, except for the third-party components listed below
|
||||
which are subject to different license terms. Your integration of RapidJSON into your own projects may require
|
||||
compliance with the MIT License, as well as the other licenses applicable to the third-party components included
|
||||
within RapidJSON. To avoid the problematic JSON license in your own projects, it's sufficient to exclude the
|
||||
bin/jsonchecker/ directory, as it's the only code under the JSON license.
|
||||
|
||||
Doris only dependents on the source files in `include/rapidjson` directory, which are under MIT License.
|
||||
|
||||
============================================================================================
|
||||
RocksDB
|
||||
source: https://github.com/facebook/rocksdb
|
||||
|
||||
RocksDB is dual-licensed under both the GPLv2 (found in the COPYING file in the root directory) and
|
||||
Apache 2.0 License (found in the LICENSE.Apache file in the root directory)
|
||||
|
||||
Doris selects Apache 2.0 License of RocksDB.
|
||||
|
||||
============================================================================================
|
||||
LZO
|
||||
source: https://github.com/nemequ/lzo
|
||||
|
||||
LZO is under GNU General Public License Version 2(GPL-2), which is not compatible with Apache 2.0 License.
|
||||
|
||||
So LZO is an optional feature when building Doris from source. You can disable the LZO support by adding
|
||||
`--without-lzo` argument in `build.sh` script.
|
||||
|
||||
Disable LZO support does not affect normal use of Doris.
|
||||
|
||||
============================================================================================
|
||||
MySQL
|
||||
source: https://github.com/mysql/mysql-server
|
||||
|
||||
MySQL is under GNU General Public License Version 2(GPL-2), which is not compatible with Apache 2.0 License.
|
||||
|
||||
So MySQL is an optional feature when building Doris from source. You can disable the MySQL support by adding
|
||||
`--without-mysql` argument in `build.sh` script.
|
||||
|
||||
Disable MySQL support with disable the feature of visiting a MySQL table as an external table in Doris.
|
||||
Reference in New Issue
Block a user