diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index 05cc4983a6..68e3e0b747 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -25,14 +25,12 @@ set(UTIL_FILES arena.cpp bfd_parser.cpp bitmap.cpp - codec.cpp + block_compression.cpp coding.cpp - compress.cpp cpu_info.cpp date_func.cpp dynamic_util.cpp debug_util.cpp - decompress.cpp disk_info.cpp errno.cpp hash_util.hpp diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp new file mode 100644 index 0000000000..460e8fa7f4 --- /dev/null +++ b/be/src/util/block_compression.cpp @@ -0,0 +1,387 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/block_compression.h" + +#include +#include +#include +#include +#include + +#include "util/faststring.h" +#include "gutil/strings/substitute.h" + +namespace doris { + +using strings::Substitute; + +Status BlockCompressionCodec::compress(const std::vector& inputs, Slice* output) const { + faststring buf; + // we compute total size to avoid more memory copy + size_t total_size = Slice::compute_total_size(inputs); + buf.reserve(total_size); + for (auto& input : inputs) { + buf.append(input.data, input.size); + } + return compress(buf, output); +} + +class Lz4BlockCompression : public BlockCompressionCodec { +public: + static Lz4BlockCompression* instance() { + static Lz4BlockCompression s_instance; + return &s_instance; + } + ~Lz4BlockCompression() override { } + + Status compress(const Slice& input, Slice* output) const override { + auto compressed_len = + LZ4_compress_default(input.data, output->data, input.size, output->size); + if (compressed_len == 0) { + return Status::InvalidArgument( + Substitute("Output buffer's capacity is not enough, size=$0", output->size)); + } + output->size = compressed_len; + return Status::OK(); + } + + Status decompress(const Slice& input, Slice* output) const override { + auto decompressed_len = + LZ4_decompress_safe(input.data, output->data, input.size, output->size); + if (decompressed_len < 0) { + return Status::InvalidArgument( + Substitute("fail to do LZ4 decompress, error=$0", decompressed_len)); + } + output->size = decompressed_len; + return Status::OK(); + } + + size_t max_compressed_len(size_t len) const override { + return LZ4_compressBound(len); + } +}; + +// Used for LZ4 frame format, decompress speed is two times faster than LZ4. +class Lz4fBlockCompression : public BlockCompressionCodec { +public: + static Lz4fBlockCompression* instance() { + static Lz4fBlockCompression s_instance; + return &s_instance; + } + + ~Lz4fBlockCompression() override { } + + Status compress(const Slice& input, Slice* output) const override { + auto compressed_len = + LZ4F_compressFrame(output->data, output->size, input.data, input.size, &_s_preferences); + if (LZ4F_isError(compressed_len)) { + return Status::InvalidArgument( + Substitute("Fail to do LZ4F compress frame, msg=$0", + LZ4F_getErrorName(compressed_len))); + } + output->size = compressed_len; + return Status::OK(); + } + + Status compress(const std::vector& inputs, Slice* output) const override { + LZ4F_compressionContext_t ctx = nullptr; + auto lres = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION); + if (lres != 0) { + return Status::InvalidArgument( + Substitute("Fail to do LZ4F compress, res=$0", LZ4F_getErrorName(lres))); + } + auto st = _compress(ctx, inputs, output); + LZ4F_freeCompressionContext(ctx); + return st; + } + + Status decompress(const Slice& input, Slice* output) const override { + LZ4F_decompressionContext_t ctx; + auto lres = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION); + if (LZ4F_isError(lres)) { + return Status::InvalidArgument( + Substitute("Fail to do LZ4F decompress, res=$0", LZ4F_getErrorName(lres))); + } + auto st = _decompress(ctx, input, output); + LZ4F_freeDecompressionContext(ctx); + return st; + } + + size_t max_compressed_len(size_t len) const override { + return std::max(LZ4F_compressBound(len, &_s_preferences), + LZ4F_compressFrameBound(len, &_s_preferences)); + } +private: + Status _compress(LZ4F_compressionContext_t ctx, + const std::vector& inputs, Slice* output) const { + auto wbytes = LZ4F_compressBegin(ctx, output->data, output->size, &_s_preferences); + if (LZ4F_isError(wbytes)) { + return Status::InvalidArgument( + Substitute("Fail to do LZ4F compress begin, res=$0", LZ4F_getErrorName(wbytes))); + } + size_t offset = wbytes; + for (auto input : inputs) { + wbytes = LZ4F_compressUpdate(ctx, + output->data + offset, output->size - offset, + input.data, input.size, + nullptr); + if (LZ4F_isError(wbytes)) { + return Status::InvalidArgument( + Substitute("Fail to do LZ4F compress update, res=$0", LZ4F_getErrorName(wbytes))); + } + offset += wbytes; + } + wbytes = LZ4F_compressEnd(ctx, + output->data + offset, output->size - offset, + nullptr); + if (LZ4F_isError(wbytes)) { + return Status::InvalidArgument( + Substitute("Fail to do LZ4F compress end, res=$0", LZ4F_getErrorName(wbytes))); + } + offset += wbytes; + output->size = wbytes; + return Status::OK(); + } + + Status _decompress(LZ4F_decompressionContext_t ctx, + const Slice& input, Slice* output) const { + size_t input_size = input.size; + auto lres = + LZ4F_decompress(ctx, output->data, &output->size, input.data, &input_size, nullptr); + if (LZ4F_isError(lres)) { + return Status::InvalidArgument( + Substitute("Fail to do LZ4F decompress, res=$0", LZ4F_getErrorName(lres))); + } else if (input_size != input.size) { + return Status::InvalidArgument( + Substitute("Fail to do LZ4F decompress: trailing data left in compressed data, read=$0 vs given=$1", + input_size, input.size)); + } else if (lres != 0) { + return Status::InvalidArgument( + Substitute("Fail to do LZ4F decompress: expect more compressed data, expect=$0", lres)); + } + return Status::OK(); + } + +private: + static LZ4F_preferences_t _s_preferences; +}; + +LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = { + { + LZ4F_max256KB, LZ4F_blockLinked, + LZ4F_noContentChecksum, LZ4F_frame, + 0, // unknown content size, + {0, 0} // reserved, must be set to 0 + }, + 0, // compression level; 0 == default + 0, // autoflush + { 0, 0, 0, 0 }, // reserved, must be set to 0 +}; + + +class SnappySlicesSource : public snappy::Source { +public: + SnappySlicesSource(const std::vector& slices) : _available(0), _cur_slice(0), _slice_off(0) { + for (auto& slice : slices) { + // We filter empty slice here to avoid complicated process + if (slice.size == 0) { + continue; + } + _available += slice.size; + _slices.push_back(slice); + } + } + ~SnappySlicesSource() override { } + + // Return the number of bytes left to read from the source + size_t Available() const override { return _available; } + + // Peek at the next flat region of the source. Does not reposition + // the source. The returned region is empty iff Available()==0. + // + // Returns a pointer to the beginning of the region and store its + // length in *len. + // + // The returned region is valid until the next call to Skip() or + // until this object is destroyed, whichever occurs first. + // + // The returned region may be larger than Available() (for example + // if this ByteSource is a view on a substring of a larger source). + // The caller is responsible for ensuring that it only reads the + // Available() bytes. + const char* Peek(size_t* len) override { + if (_available == 0) { + *len = 0; + return nullptr; + } + // we should assure that *len is not 0 + *len = _slices[_cur_slice].size - _slice_off; + DCHECK(*len != 0); + return _slices[_cur_slice].data; + } + + // Skip the next n bytes. Invalidates any buffer returned by + // a previous call to Peek(). + // REQUIRES: Available() >= n + void Skip(size_t n) override { + _available -= n; + do { + auto left = _slices[_cur_slice].size - _slice_off; + if (left > n) { + // n can be digest in current slice + _slice_off += n; + return; + } + _slice_off = 0; + _cur_slice++; + n -= left; + } while (n > 0); + } + +private: + std::vector _slices; + size_t _available; + size_t _cur_slice; + size_t _slice_off; +}; + +class SnappyBlockCompression : public BlockCompressionCodec { +public: + static SnappyBlockCompression* instance() { + static SnappyBlockCompression s_instance; + return &s_instance; + } + ~SnappyBlockCompression() override { } + + Status compress(const Slice& input, Slice* output) const override { + snappy::RawCompress(input.data, input.size, output->data, &output->size); + return Status::OK(); + } + + Status decompress(const Slice& input, Slice* output) const override { + if (!snappy::RawUncompress(input.data, input.size, output->data)) { + return Status::InvalidArgument("Fail to do Snappy decompress"); + } + // NOTE: GetUncompressedLength only takes O(1) time + snappy::GetUncompressedLength(input.data, input.size, &output->size); + return Status::OK(); + } + + Status compress(const std::vector& inputs, Slice* output) const override { + SnappySlicesSource source(inputs); + snappy::UncheckedByteArraySink sink(output->data); + output->size = snappy::Compress(&source, &sink); + return Status::OK(); + } + + size_t max_compressed_len(size_t len) const override { + return snappy::MaxCompressedLength(len); + } +}; + +class ZlibBlockCompression : public BlockCompressionCodec { +public: + static ZlibBlockCompression* instance() { + static ZlibBlockCompression s_instance; + return &s_instance; + } + ~ZlibBlockCompression() { } + + Status compress(const Slice& input, Slice* output) const override { + auto zres = ::compress((Bytef*)output->data, &output->size, (Bytef*)input.data, input.size); + if (zres != Z_OK) { + return Status::InvalidArgument( + Substitute("Fail to do ZLib compress, error=$0", zError(zres))); + } + return Status::OK(); + } + + Status compress(const std::vector& inputs, Slice* output) const override { + z_stream zstrm; + zstrm.zalloc = Z_NULL; + zstrm.zfree = Z_NULL; + zstrm.opaque = Z_NULL; + auto zres = deflateInit(&zstrm, Z_DEFAULT_COMPRESSION); + if (zres != Z_OK) { + return Status::InvalidArgument( + Substitute("Fail to do ZLib stream compress, error=$0", zError(zres))); + } + // we assume that output is e + zstrm.next_out = (Bytef*)output->data; + zstrm.avail_out = output->size; + for (int i = 0; i < inputs.size(); ++i) { + if (inputs[i].size == 0) { + continue; + } + zstrm.next_in = (Bytef*)inputs[i].data; + zstrm.avail_in = inputs[i].size; + int flush = (i == (inputs.size() - 1)) ? Z_FINISH : Z_NO_FLUSH; + + zres = deflate(&zstrm, flush); + if (zres != Z_OK || zres != Z_STREAM_END) { + return Status::InvalidArgument( + Substitute("Fail to do ZLib stream compress, error=$0", zError(zres))); + } + } + + output->size = zstrm.total_out; + + return Status::OK(); + } + + Status decompress(const Slice& input, Slice* output) const override { + size_t input_size = input.size; + auto zres = ::uncompress2((Bytef*)output->data, &output->size, (Bytef*)input.data, &input_size); + if (zres != Z_OK) { + return Status::InvalidArgument( + Substitute("Fail to do ZLib decompress, error=$0", zError(zres))); + } + return Status::OK(); + } + + size_t max_compressed_len(size_t len) const override { + // one-time overhead of six bytes for the entire stream plus five bytes per 16 KB block + return len + 6 + 5 * ((len >> 14) + 1); + } +}; + +Status get_block_compression_codec( + segment_v2::CompressionTypePB type, BlockCompressionCodec** codec) { + switch (type) { + case segment_v2::CompressionTypePB::NO_COMPRESSION: + *codec = nullptr; + break; + case segment_v2::CompressionTypePB::SNAPPY: + *codec = SnappyBlockCompression::instance(); + break; + case segment_v2::CompressionTypePB::LZ4: + *codec = Lz4BlockCompression::instance(); + break; + case segment_v2::CompressionTypePB::LZ4F: + *codec = Lz4fBlockCompression::instance(); + break; + case segment_v2::CompressionTypePB::ZLIB: + *codec = ZlibBlockCompression::instance(); + break; + default: + return Status::NotFound(Substitute("unknown compression type($0)", type)); + } + return Status::OK(); +} + +} diff --git a/be/src/util/block_compression.h b/be/src/util/block_compression.h new file mode 100644 index 0000000000..dc0ddfc5de --- /dev/null +++ b/be/src/util/block_compression.h @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "common/status.h" +#include "gen_cpp/segment_v2.pb.h" +#include "util/slice.h" + +namespace doris { + +// This class is used to encapsulate Compression/Decompression algorithm. +// This class only used to compress a block data, which means all data +// should given when call compress or decompress. This class don't handle +// stream compression. +class BlockCompressionCodec { +public: + virtual ~BlockCompressionCodec() { } + + // This function will compress input data into output. + // output should be preallocated, and its capacity must be large enough + // for compressed input, which can be get through max_compressed_len function. + // Size of compressed data will be set in output's size. + virtual Status compress(const Slice& input, Slice* output) const = 0; + + // Default implementation will merge input list into a big buffer and call + // compress(Slice) to finish compression. If compression type support digesting + // slice one by one, it should reimplement this function. + virtual Status compress(const std::vector& input, Slice* output) const; + + // Decompress input data into output, output's capacity should be large enough + // for decompressed data. + // Size of decompressed data will be set in output's size. + virtual Status decompress(const Slice& input, Slice* output) const = 0; + + // Returns an upper bound on the max compressed length. + virtual size_t max_compressed_len(size_t len) const = 0; +}; + +// Get a BlockCompressionCodec through type. +// Return Status::OK if a valid codec is found. If codec is null, it means it is +// NO_COMPRESSION. If codec is not null, user can use it to compress/decompress +// data. And client doesn't have to release the codec. +// +// Return not OK, if error happens. +Status get_block_compression_codec( + segment_v2::CompressionTypePB type, BlockCompressionCodec** codec); + +} diff --git a/be/src/util/codec.cpp b/be/src/util/codec.cpp deleted file mode 100644 index b18c790d61..0000000000 --- a/be/src/util/codec.cpp +++ /dev/null @@ -1,200 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "util/codec.h" -#include - -#include "util/compress.h" -#include "util/decompress.h" - -#include "gen_cpp/Descriptors_types.h" -#include "gen_cpp/Descriptors_constants.h" - -namespace doris { - -const char* const Codec::DEFAULT_COMPRESSION = - "org.apache.hadoop.io.compress.DefaultCodec"; - -const char* const Codec::GZIP_COMPRESSION = - "org.apache.hadoop.io.compress.GzipCodec"; - -const char* const Codec::BZIP2_COMPRESSION = - "org.apache.hadoop.io.compress.BZip2Codec"; - -const char* const Codec::SNAPPY_COMPRESSION = - "org.apache.hadoop.io.compress.SnappyCodec"; - -const char* const UNKNOWN_CODEC_ERROR = - "This compression codec is currently unsupported: "; - -const Codec::CodecMap Codec::CODEC_MAP = boost::assign::map_list_of - ("", THdfsCompression::NONE) - (Codec::DEFAULT_COMPRESSION, THdfsCompression::DEFAULT) - (Codec::GZIP_COMPRESSION, THdfsCompression::GZIP) - (Codec::BZIP2_COMPRESSION, THdfsCompression::BZIP2) - (Codec::SNAPPY_COMPRESSION, THdfsCompression::SNAPPY_BLOCKED); - -std::string Codec::get_codec_name(THdfsCompression::type type) { - std::map::const_iterator im; - - for (im = g_Descriptors_constants.COMPRESSION_MAP.begin(); - im != g_Descriptors_constants.COMPRESSION_MAP.end(); ++im) { - if (im->second == type) { - return im->first; - } - } - - DCHECK(im != g_Descriptors_constants.COMPRESSION_MAP.end()); - return "INVALID"; -} - -Status Codec::create_compressor(RuntimeState* runtime_state, MemPool* mem_pool, - bool reuse, const std::string& codec, - boost::scoped_ptr* compressor) { - std::map::const_iterator - type = CODEC_MAP.find(codec); - - if (type == CODEC_MAP.end()) { - std::stringstream ss; - ss << UNKNOWN_CODEC_ERROR << codec; - return Status::InternalError(ss.str()); - } - - Codec* comp = NULL; - RETURN_IF_ERROR( - create_compressor(runtime_state, mem_pool, reuse, type->second, &comp)); - compressor->reset(comp); - return Status::OK(); -} - -Status Codec::create_compressor(RuntimeState* runtime_state, MemPool* mem_pool, - bool reuse, THdfsCompression::type format, - boost::scoped_ptr* compressor) { - Codec* comp = NULL; - RETURN_IF_ERROR( - create_compressor(runtime_state, mem_pool, reuse, format, &comp)); - compressor->reset(comp); - return Status::OK(); -} - -Status Codec::create_compressor(RuntimeState* runtime_state, MemPool* mem_pool, - bool reuse, THdfsCompression::type format, - Codec** compressor) { - switch (format) { - case THdfsCompression::NONE: - *compressor = NULL; - return Status::OK(); - - case THdfsCompression::GZIP: - *compressor = new GzipCompressor(GzipCompressor::GZIP, mem_pool, reuse); - break; - - case THdfsCompression::DEFAULT: - *compressor = new GzipCompressor(GzipCompressor::ZLIB, mem_pool, reuse); - break; - - case THdfsCompression::DEFLATE: - *compressor = new GzipCompressor(GzipCompressor::DEFLATE, mem_pool, reuse); - break; - - case THdfsCompression::BZIP2: - *compressor = new BzipCompressor(mem_pool, reuse); - break; - - case THdfsCompression::SNAPPY_BLOCKED: - *compressor = new SnappyBlockCompressor(mem_pool, reuse); - break; - - case THdfsCompression::SNAPPY: - *compressor = new SnappyCompressor(mem_pool, reuse); - break; - } - - return (*compressor)->init(); -} - -Status Codec::create_decompressor(RuntimeState* runtime_state, MemPool* mem_pool, - bool reuse, const std::string& codec, - boost::scoped_ptr* decompressor) { - std::map::const_iterator - type = CODEC_MAP.find(codec); - - if (type == CODEC_MAP.end()) { - std::stringstream ss; - ss << UNKNOWN_CODEC_ERROR << codec; - return Status::InternalError(ss.str()); - } - - Codec* decom = NULL; - RETURN_IF_ERROR( - create_decompressor(runtime_state, mem_pool, reuse, type->second, &decom)); - decompressor->reset(decom); - return Status::OK(); -} - -Status Codec::create_decompressor(RuntimeState* runtime_state, MemPool* mem_pool, - bool reuse, THdfsCompression::type format, - boost::scoped_ptr* decompressor) { - Codec* decom = NULL; - RETURN_IF_ERROR( - create_decompressor(runtime_state, mem_pool, reuse, format, &decom)); - decompressor->reset(decom); - return Status::OK(); -} - -Status Codec::create_decompressor(RuntimeState* runtime_state, MemPool* mem_pool, - bool reuse, THdfsCompression::type format, - Codec** decompressor) { - switch (format) { - case THdfsCompression::NONE: - *decompressor = NULL; - return Status::OK(); - - case THdfsCompression::DEFAULT: - case THdfsCompression::GZIP: - *decompressor = new GzipDecompressor(mem_pool, reuse, false); - break; - - case THdfsCompression::DEFLATE: - *decompressor = new GzipDecompressor(mem_pool, reuse, true); - break; - - case THdfsCompression::BZIP2: - *decompressor = new BzipDecompressor(mem_pool, reuse); - break; - - case THdfsCompression::SNAPPY_BLOCKED: - *decompressor = new SnappyBlockDecompressor(mem_pool, reuse); - break; - - case THdfsCompression::SNAPPY: - *decompressor = new SnappyDecompressor(mem_pool, reuse); - break; - } - - return (*decompressor)->init(); -} - -Codec::Codec(MemPool* mem_pool, bool reuse_buffer) : - _memory_pool(mem_pool), - _temp_memory_pool(_memory_pool->mem_tracker()), - _reuse_buffer(reuse_buffer), - _out_buffer(NULL), - _buffer_length(0) { -} - -} diff --git a/be/src/util/codec.h b/be/src/util/codec.h deleted file mode 100644 index ff53c0eb51..0000000000 --- a/be/src/util/codec.h +++ /dev/null @@ -1,163 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef DORIS_BE_SRC_COMMON_UTIL_CODEC_H -#define DORIS_BE_SRC_COMMON_UTIL_CODEC_H - -#include - -#include "common/status.h" -#include "runtime/mem_pool.h" -#include "util/runtime_profile.h" -#include "gen_cpp/Descriptors_types.h" - -namespace doris { - -class MemPool; -class RuntimeState; - -// Create a compression object. This is the base class for all compression -// algorithms. A compression algorithm is either a compressor or a decompressor. -// To add a new algorithm, generally, both a compressor and a decompressor -// will be added. Each of these objects inherits from this class. The objects -// are instantiated in the Create static methods defined here. The type of -// compression is defined in the Thrift interface THdfsCompression. -class Codec { -public: - // These are the codec string representation used in Hadoop. - static const char* const DEFAULT_COMPRESSION; - static const char* const GZIP_COMPRESSION; - static const char* const BZIP2_COMPRESSION; - static const char* const SNAPPY_COMPRESSION; - - // Map from codec string to compression format - typedef std::map CodecMap; - static const CodecMap CODEC_MAP; - - // Create a decompressor. - // Input: - // runtime_state: the current runtime state. - // mem_pool: the memory pool used to store the decompressed data. - // reuse: if true the allocated buffer can be reused. - // format: the type of decompressor to create. - // Output: - // decompressor: pointer to the decompressor class to use. - static Status create_decompressor(RuntimeState* runtime_state, MemPool* mem_pool, - bool reuse, THdfsCompression::type format, - Codec** decompressor); - - // Alternate creator: returns a scoped pointer. - static Status create_decompressor(RuntimeState* runtime_state, MemPool* mem_pool, - bool reuse, THdfsCompression::type format, - boost::scoped_ptr* decompressor); - - // Alternate creator: takes a codec string and returns a scoped pointer. - static Status create_decompressor(RuntimeState* runtime_state, MemPool* mem_pool, - bool reuse, const std::string& codec, - boost::scoped_ptr* decompressor); - - // Create the compressor. - // Input: - // runtime_state: the current runtime state. - // mem_pool: the memory pool used to store the compressed data. - // format: The type of compressor to create. - // reuse: if true the allocated buffer can be reused. - // Output: - // compressor: pointer to the compressor class to use. - static Status create_compressor(RuntimeState* runtime_state, MemPool* mem_pool, - bool reuse, THdfsCompression::type format, - Codec** decompressor); - - // Alternate creator: returns a scoped pointer. - static Status create_compressor(RuntimeState* runtime_state, MemPool* mem_pool, - bool reuse, THdfsCompression::type format, - boost::scoped_ptr* compressor); - - // Alternate creator: takes a codec string and returns a scoped pointer. - // Input, as above except: - // codec: the string representing the codec of the current file. - static Status create_compressor(RuntimeState* runtime_state, MemPool* mem_pool, - bool reuse, const std::string& codec, - boost::scoped_ptr* compressor); - - virtual ~Codec() {} - - // Process a block of data, either compressing or decompressing it. - // If *output_length is 0, the function will allocate from its mempool. - // If *output_length is non-zero, it should be the length of *output and must - // be exactly the size of the transformed output. - // Inputs: - // input_length: length of the data to process - // input: data to process - // In/Out: - // output_length: Length of the output, if known, 0 otherwise. - // Output: - // output: Pointer to processed data - // If this needs to allocate memory, a mempool must be passed into the c'tor. - virtual Status process_block(int input_length, uint8_t* input, - int* output_length, uint8_t** output) = 0; - - // Return the name of a compression algorithm. - static std::string get_codec_name(THdfsCompression::type); - - // Largest block we will compress/decompress: 2GB. - // We are dealing with compressed blocks that are never this big but we - // want to guard against a corrupt file that has the block length as some - // large number. - static const int MAX_BLOCK_SIZE = (2L * 1024 * 1024 * 1024) - 1; - -protected: - // Create a compression operator - // Inputs: - // mem_pool: memory pool to allocate the output buffer, this implies that the - // caller is responsible for the memory allocated by the operator. - // reuse_buffer: if false always allocate a new buffer rather than reuse. - Codec(MemPool* mem_pool, bool reuse_buffer); - - // Initialize the operation. - virtual Status init() = 0; - -private: - friend class GzipCompressor; - friend class GzipDecompressor; - friend class BzipCompressor; - friend class BzipDecompressor; - friend class SnappyBlockCompressor; - friend class SnappyBlockDecompressor; - friend class SnappyCompressor; - friend class SnappyDecompressor; - - // Pool to allocate the buffer to hold transformed data. - MemPool* _memory_pool; - - // Temporary memory pool: in case we get the output size too small we can - // use this to free unused buffers. - MemPool _temp_memory_pool; - - // Can we reuse the output buffer or do we need to allocate on each call? - bool _reuse_buffer; - - // Buffer to hold transformed data. - // Either passed from the caller or allocated from _memory_pool. - uint8_t* _out_buffer; - - // Length of the output buffer. - int _buffer_length; -}; - -} -#endif diff --git a/be/src/util/compress.cpp b/be/src/util/compress.cpp deleted file mode 100644 index fca5f44c47..0000000000 --- a/be/src/util/compress.cpp +++ /dev/null @@ -1,271 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "util/compress.h" -#include "exec/read_write_util.h" -#include "runtime/runtime_state.h" - -// Codec libraries -#include -#include -#include - -namespace doris { - -GzipCompressor::GzipCompressor(Format format, MemPool* mem_pool, bool reuse_buffer) : - Codec(mem_pool, reuse_buffer), - _format(format) { - bzero(&_stream, sizeof(_stream)); - init(); -} - -GzipCompressor::~GzipCompressor() { - (void)deflateEnd(&_stream); -} - -Status GzipCompressor::init() { - int ret = 0; - // Initialize to run specified format - int window_bits = WINDOW_BITS; - - if (_format == DEFLATE) { - window_bits = -window_bits; - } else if (_format == GZIP) { - window_bits += GZIP_CODEC; - } - - if ((ret = deflateInit2(&_stream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, - window_bits, 9, Z_DEFAULT_STRATEGY)) != Z_OK) { - return Status::InternalError("zlib deflateInit failed: " + std::string(_stream.msg)); - } - - return Status::OK(); -} - -int GzipCompressor::max_compressed_len(int input_length) { - return deflateBound(&_stream, input_length); -} - -Status GzipCompressor::compress( - int input_length, - uint8_t* input, - int* output_length, - uint8_t* output) { - DCHECK_GE(*output_length, max_compressed_len(input_length)); - _stream.next_in = reinterpret_cast(input); - _stream.avail_in = input_length; - _stream.next_out = reinterpret_cast(output); - _stream.avail_out = *output_length; - - int ret = 0; - - if ((ret = deflate(&_stream, Z_FINISH)) != Z_STREAM_END) { - std::stringstream ss; - ss << "zlib deflate failed: " << _stream.msg; - return Status::InternalError(ss.str()); - } - - *output_length = *output_length - _stream.avail_out; - - if (deflateReset(&_stream) != Z_OK) { - return Status::InternalError("zlib deflateReset failed: " + std::string(_stream.msg)); - } - - return Status::OK(); -} - -Status GzipCompressor::process_block( - int input_length, - uint8_t* input, - int* output_length, - uint8_t** output) { - // If length is set then the output has been allocated. - if (*output_length != 0) { - _buffer_length = *output_length; - _out_buffer = *output; - } else { - int len = max_compressed_len(input_length); - - if (!_reuse_buffer || _buffer_length < len || _out_buffer == NULL) { - DCHECK(_memory_pool != NULL) << "Can't allocate without passing in a mem pool"; - _buffer_length = len; - _out_buffer = _memory_pool->allocate(_buffer_length); - *output_length = _buffer_length; - } - } - - RETURN_IF_ERROR(compress(input_length, input, output_length, _out_buffer)); - *output = _out_buffer; - return Status::OK(); -} - -BzipCompressor::BzipCompressor(MemPool* mem_pool, bool reuse_buffer) : - Codec(mem_pool, reuse_buffer) { -} - -Status BzipCompressor::process_block( - int input_length, - uint8_t* input, - int* output_length, - uint8_t** output) { - // If length is set then the output has been allocated. - if (*output_length != 0) { - _buffer_length = *output_length; - _out_buffer = *output; - } else if (!_reuse_buffer || _out_buffer == NULL) { - // guess that we will need no more the input length. - _buffer_length = input_length; - _out_buffer = _temp_memory_pool.allocate(_buffer_length); - } - - unsigned int outlen = 0; - int ret = BZ_OUTBUFF_FULL; - - while (ret == BZ_OUTBUFF_FULL) { - if (_out_buffer == NULL) { - DCHECK_EQ(*output_length, 0); - _temp_memory_pool.clear(); - _buffer_length = _buffer_length * 2; - _out_buffer = _temp_memory_pool.allocate(_buffer_length); - } - - outlen = static_cast(_buffer_length); - - if ((ret = BZ2_bzBuffToBuffCompress( - reinterpret_cast(_out_buffer), &outlen, - reinterpret_cast(input), - static_cast(input_length), 5, 2, 0)) == BZ_OUTBUFF_FULL) { - // If the output_length was passed we must have enough room. - DCHECK_EQ(*output_length, 0); - - if (*output_length != 0) { - return Status::InternalError("Too small buffer passed to BzipCompressor"); - } - - _out_buffer = NULL; - } - } - - if (ret != BZ_OK) { - std::stringstream ss; - ss << "bzlib BZ2_bzBuffToBuffCompressor failed: " << ret; - return Status::InternalError(ss.str()); - - } - - *output = _out_buffer; - *output_length = outlen; - _memory_pool->acquire_data(&_temp_memory_pool, false); - return Status::OK(); -} - -// Currently this is only use for testing of the decompressor. -SnappyBlockCompressor::SnappyBlockCompressor(MemPool* mem_pool, bool reuse_buffer) : - Codec(mem_pool, reuse_buffer) { -} - -Status SnappyBlockCompressor::process_block( - int input_length, - uint8_t* input, - int* output_length, - uint8_t** output) { - - // Hadoop uses a block compression scheme on top of snappy. First there is - // an integer which is the size of the decompressed data followed by a - // sequence of compressed blocks each preceded with an integer size. - // For testing purposes we are going to generate two blocks. - int block_size = input_length / 2; - size_t length = snappy::MaxCompressedLength(block_size) * 2; - length += 3 * sizeof(int32_t); - DCHECK(*output_length == 0 || length <= *output_length); - - // If length is non-zero then the output has been allocated. - if (*output_length != 0) { - _buffer_length = *output_length; - _out_buffer = *output; - } else if (!_reuse_buffer || _out_buffer == NULL || _buffer_length < length) { - _buffer_length = length; - _out_buffer = _memory_pool->allocate(_buffer_length); - } - - uint8_t* outp = _out_buffer; - uint8_t* sizep = NULL; - ReadWriteUtil::put_int(outp, input_length); - outp += sizeof(int32_t); - - do { - // Point at the spot to store the compressed size. - sizep = outp; - outp += sizeof(int32_t); - size_t size = 0; - snappy::RawCompress(reinterpret_cast(input), - static_cast(block_size), reinterpret_cast(outp), &size); - - ReadWriteUtil::put_int(sizep, size); - input += block_size; - input_length -= block_size; - outp += size; - } while (input_length > 0); - - *output = _out_buffer; - *output_length = outp - _out_buffer; - return Status::OK(); -} - -SnappyCompressor::SnappyCompressor(MemPool* mem_pool, bool reuse_buffer) : - Codec(mem_pool, reuse_buffer) { -} - -int SnappyCompressor::max_compressed_len(int input_length) { - return snappy::MaxCompressedLength(input_length); -} - -Status SnappyCompressor::compress(int input_len, uint8_t* input, - int* output_len, uint8_t* output) { - DCHECK_GE(*output_len, max_compressed_len(input_len)); - size_t out_len = 0; - snappy::RawCompress(reinterpret_cast(input), - static_cast(input_len), - reinterpret_cast(output), &out_len); - *output_len = out_len; - return Status::OK(); -} - -Status SnappyCompressor::process_block(int input_length, uint8_t* input, - int* output_length, uint8_t** output) { - int max_compressed_len = this->max_compressed_len(input_length); - - if (*output_length != 0 && *output_length < max_compressed_len) { - return Status::InternalError("process_block: output length too small"); - } - - if (*output_length != 0) { - _buffer_length = *output_length; - _out_buffer = *output; - } else if (!_reuse_buffer || - _out_buffer == NULL || _buffer_length < max_compressed_len) { - DCHECK(_memory_pool != NULL) << "Can't allocate without passing in a mem pool"; - _buffer_length = max_compressed_len; - _out_buffer = _memory_pool->allocate(_buffer_length); - *output = _out_buffer; - *output_length = max_compressed_len; - } - - return compress(input_length, input, output_length, _out_buffer); -} - -} diff --git a/be/src/util/compress.h b/be/src/util/compress.h deleted file mode 100644 index 5a1527eab7..0000000000 --- a/be/src/util/compress.h +++ /dev/null @@ -1,133 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef DORIS_BE_SRC_COMMON_UTIL_COMPRESS_H -#define DORIS_BE_SRC_COMMON_UTIL_COMPRESS_H - -// We need zlib.h here to declare _stream below. -#include - -#include "util/codec.h" -#include "runtime/mem_pool.h" - -namespace doris { - -// Different compression classes. The classes all expose the same API and -// abstracts the underlying calls to the compression libraries. -// TODO: reconsider the abstracted API - -class GzipCompressor : public Codec { -public: - // Compression formats supported by the zlib library - enum Format { - ZLIB, - DEFLATE, - GZIP, - }; - - // If gzip is set then we create gzip otherwise lzip. - GzipCompressor(Format format, MemPool* mem_pool, bool reuse_buffer); - - virtual ~GzipCompressor(); - - // Returns an upper bound on the max compressed length. - int max_compressed_len(int input_len); - - // Compresses 'input' into 'output'. Output must be preallocated and - // at least big enough. - // *output_length should be called with the length of the output buffer and on return - // is the length of the output. - Status compress(int input_length, uint8_t* input, - int* output_length, uint8_t* output); - - // Process a block of data. - virtual Status process_block(int input_length, uint8_t* input, - int* output_length, uint8_t** output); - -protected: - // Initialize the compressor. - virtual Status init(); - -private: - Format _format; - - // Structure used to communicate with the library. - z_stream _stream; - - // These are magic numbers from zlib.h. Not clear why they are not defined there. - const static int WINDOW_BITS = 15; // Maximum window size - const static int GZIP_CODEC = 16; // Output Gzip. -}; - -class BzipCompressor : public Codec { -public: - BzipCompressor(MemPool* mem_pool, bool reuse_buffer); - virtual ~BzipCompressor() { } - - // Process a block of data. - virtual Status process_block(int input_length, uint8_t* input, - int* output_length, uint8_t** output); - // Initialize the compressor. - virtual Status init() { - return Status::OK(); - } -}; - -class SnappyBlockCompressor : public Codec { -public: - SnappyBlockCompressor(MemPool* mem_pool, bool reuse_buffer); - virtual ~SnappyBlockCompressor() { } - - // Process a block of data. - virtual Status process_block(int input_length, uint8_t* input, - int* output_length, uint8_t** output); - -protected: - // Snappy does not need initialization - virtual Status init() { - return Status::OK(); - } -}; - -class SnappyCompressor : public Codec { -public: - SnappyCompressor(MemPool* mem_pool, bool reuse_buffer); - virtual ~SnappyCompressor() { } - - // Process a block of data. - virtual Status process_block(int input_length, uint8_t* input, - int* output_length, uint8_t** output); - - // Returns an upper bound on the max compressed length. - int max_compressed_len(int input_len); - - // Compresses 'input' into 'output'. Output must be preallocated and - // at least big enough. - // *output_length should be called with the length of the output buffer and on return - // is the length of the output. - Status compress(int input_length, uint8_t* input, - int* output_length, uint8_t* output); - -protected: - // Snappy does not need initialization - virtual Status init() { - return Status::OK(); - } -}; - -} -#endif diff --git a/be/src/util/decompress.cpp b/be/src/util/decompress.cpp deleted file mode 100644 index 49b0600dc7..0000000000 --- a/be/src/util/decompress.cpp +++ /dev/null @@ -1,392 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include -#include "util/decompress.h" -#include "exec/read_write_util.h" -#include "runtime/runtime_state.h" -#include "gen_cpp/Descriptors_types.h" - -// Codec libraries -#include -#include -#include - -namespace doris { - -GzipDecompressor::GzipDecompressor(MemPool* mem_pool, bool reuse_buffer, bool is_deflate) : - Codec(mem_pool, reuse_buffer), - _is_deflate(is_deflate) { - bzero(&_stream, sizeof(_stream)); -} - -GzipDecompressor::~GzipDecompressor() { - (void)inflateEnd(&_stream); -} - -Status GzipDecompressor::init() { - int ret = 0; - // Initialize to run either deflate or zlib/gzip format - int window_bits = _is_deflate ? -WINDOW_BITS : WINDOW_BITS | DETECT_CODEC; - - if ((ret = inflateInit2(&_stream, window_bits)) != Z_OK) { - return Status::InternalError("zlib inflateInit failed: " + std::string(_stream.msg)); - } - - return Status::OK(); -} - -Status GzipDecompressor::process_block(int input_length, uint8_t* input, - int* output_length, uint8_t** output) { - bool use_temp = false; - - // If length is set then the output has been allocated. - if (*output_length != 0) { - _buffer_length = *output_length; - _out_buffer = *output; - } else if (!_reuse_buffer || _out_buffer == NULL) { - // guess that we will need 2x the input length. - _buffer_length = input_length * 2; - - if (_buffer_length > MAX_BLOCK_SIZE) { - return Status::InternalError("Decompressor: block size is too big"); - } - - _out_buffer = _temp_memory_pool.allocate(_buffer_length); - use_temp = true; - } - - int ret = 0; - - while (ret != Z_STREAM_END) { - _stream.next_in = reinterpret_cast(input); - _stream.avail_in = input_length; - _stream.next_out = reinterpret_cast(_out_buffer); - _stream.avail_out = _buffer_length; - - ret = inflate(&_stream, 1); - - if (ret != Z_STREAM_END) { - if (ret == Z_OK) { - // Not enough output space. - DCHECK_EQ(*output_length, 0); - - if (*output_length != 0) { - return Status::InternalError("Too small a buffer passed to GzipDecompressor"); - } - - _temp_memory_pool.clear(); - _buffer_length *= 2; - - if (_buffer_length > MAX_BLOCK_SIZE) { - return Status::InternalError("Decompressor: block size is too big"); - } - - _out_buffer = _temp_memory_pool.allocate(_buffer_length); - - if (inflateReset(&_stream) != Z_OK) { - return Status::InternalError("zlib inflateEnd failed: " + std::string(_stream.msg)); - } - - continue; - } - - return Status::InternalError("zlib inflate failed: " + std::string(_stream.msg)); - } - } - - if (inflateReset(&_stream) != Z_OK) { - return Status::InternalError("zlib inflateEnd failed: " + std::string(_stream.msg)); - } - - *output = _out_buffer; - - // _stream.avail_out is the number of bytes *left* in the out buffer, but - // we're interested in the number of bytes used. - if (*output_length == 0) { - *output_length = _buffer_length - _stream.avail_out; - } - - if (use_temp) { - _memory_pool->acquire_data(&_temp_memory_pool, _reuse_buffer); - } - - return Status::OK(); -} - -BzipDecompressor::BzipDecompressor(MemPool* mem_pool, bool reuse_buffer) : - Codec(mem_pool, reuse_buffer) { -} - -Status BzipDecompressor::process_block(int input_length, uint8_t* input, - int* output_length, uint8_t** output) { - bool use_temp = false; - - // If length is set then the output has been allocated. - if (*output_length != 0) { - _buffer_length = *output_length; - _out_buffer = *output; - } else if (!_reuse_buffer || _out_buffer == NULL) { - // guess that we will need 2x the input length. - _buffer_length = input_length * 2; - - if (_buffer_length > MAX_BLOCK_SIZE) { - return Status::InternalError("Decompressor: block size is too big"); - } - - _out_buffer = _temp_memory_pool.allocate(_buffer_length); - use_temp = true; - } - - int ret = BZ_OUTBUFF_FULL; - unsigned int outlen = 0; - - while (ret == BZ_OUTBUFF_FULL) { - if (_out_buffer == NULL) { - DCHECK_EQ(*output_length, 0); - _temp_memory_pool.clear(); - _buffer_length = _buffer_length * 2; - - if (_buffer_length > MAX_BLOCK_SIZE) { - return Status::InternalError("Decompressor: block size is too big"); - } - - _out_buffer = _temp_memory_pool.allocate(_buffer_length); - } - - outlen = static_cast(_buffer_length); - - if ((ret = BZ2_bzBuffToBuffDecompress( - reinterpret_cast(_out_buffer), &outlen, - reinterpret_cast(input), - static_cast(input_length), 0, 0)) == BZ_OUTBUFF_FULL) { - // If the output_length was passed we must have enough room. - DCHECK_EQ(*output_length, 0); - - if (*output_length != 0) { - return Status::InternalError("Too small a buffer passed to BzipDecompressor"); - } - - _out_buffer = NULL; - } - } - - if (ret != BZ_OK) { - std::stringstream ss; - ss << "bzlib BZ2_bzBuffToBuffDecompressor failed: " << ret; - return Status::InternalError(ss.str()); - - } - - *output = _out_buffer; - - if (*output_length == 0) { - *output_length = outlen; - } - - if (use_temp) { - _memory_pool->acquire_data(&_temp_memory_pool, _reuse_buffer); - } - - return Status::OK(); -} - -SnappyDecompressor::SnappyDecompressor(MemPool* mem_pool, bool reuse_buffer) - : Codec(mem_pool, reuse_buffer) { -} - -Status SnappyDecompressor::process_block(int input_length, uint8_t* input, - int* output_length, uint8_t** output) { - // If length is set then the output has been allocated. - size_t uncompressed_length = 0; - - if (*output_length != 0) { - _buffer_length = *output_length; - _out_buffer = *output; - } else { - // Snappy saves the uncompressed length so we never have to retry. - if (!snappy::GetUncompressedLength(reinterpret_cast(input), - input_length, &uncompressed_length)) { - return Status::InternalError("Snappy: GetUncompressedLength failed"); - } - - if (!_reuse_buffer || _out_buffer == NULL || _buffer_length < uncompressed_length) { - _buffer_length = uncompressed_length; - - if (_buffer_length > MAX_BLOCK_SIZE) { - return Status::InternalError("Decompressor: block size is too big"); - } - - _out_buffer = _memory_pool->allocate(_buffer_length); - } - } - - if (!snappy::RawUncompress( - reinterpret_cast(input), - static_cast(input_length), reinterpret_cast(_out_buffer))) { - return Status::InternalError("Snappy: RawUncompress failed"); - } - - if (*output_length == 0) { - *output_length = uncompressed_length; - *output = _out_buffer; - } - - return Status::OK(); -} - -SnappyBlockDecompressor::SnappyBlockDecompressor(MemPool* mem_pool, bool reuse_buffer) : - Codec(mem_pool, reuse_buffer) { -} - -// Hadoop uses a block compression scheme on top of snappy. As per the hadoop docs -// the input is split into blocks. Each block "contains the uncompressed length for -// the block followed by one of more length-prefixed blocks of compressed data." -// This is essentially blocks of blocks. -// The outer block consists of: -// - 4 byte little endian uncompressed_size -// < inner blocks > -// ... repeated until input_len is consumed .. -// The inner blocks have: -// - 4-byte little endian compressed_size -// < snappy compressed block > -// - 4-byte little endian compressed_size -// < snappy compressed block > -// ... repeated until uncompressed_size from outer block is consumed ... - -// Utility function to decompress snappy block compressed data. If size_only is true, -// this function does not decompress but only computes the output size and writes -// the result to *output_len. -// If size_only is false, output must be preallocated to output_len and this needs to -// be exactly big enough to hold the decompressed output. -// size_only is a O(1) operations (just reads a single varint for each snappy block). -static Status snappy_block_decompress(int input_len, uint8_t* input, bool size_only, - int* output_len, char* output) { - - int uncompressed_total_len = 0; - - while (input_len > 0) { - size_t uncompressed_block_len = ReadWriteUtil::get_int(input); - input += sizeof(int32_t); - input_len -= sizeof(int32_t); - - if (uncompressed_block_len > Codec::MAX_BLOCK_SIZE || uncompressed_block_len == 0) { - if (uncompressed_total_len == 0) { - // TODO: is this check really robust? - std::stringstream ss; - ss << "Decompressor: block size is too big. Data is likely corrupt. " - << "Size: " << uncompressed_block_len; - return Status::InternalError(ss.str()); - } - - break; - } - - if (!size_only) { - int remaining_output_size = *output_len - uncompressed_total_len; - DCHECK_GE(remaining_output_size, uncompressed_block_len); - } - - while (uncompressed_block_len > 0) { - // Read the length of the next snappy compressed block. - size_t compressed_len = ReadWriteUtil::get_int(input); - input += sizeof(int32_t); - input_len -= sizeof(int32_t); - - if (compressed_len == 0 || compressed_len > input_len) { - if (uncompressed_total_len == 0) { - return Status::InternalError( - "Decompressor: invalid compressed length. Data is likely corrupt."); - } - - input_len = 0; - break; - } - - // Read how big the output will be. - size_t uncompressed_len = 0; - - if (!snappy::GetUncompressedLength(reinterpret_cast(input), - input_len, &uncompressed_len)) { - if (uncompressed_total_len == 0) { - return Status::InternalError("Snappy: GetUncompressedLength failed"); - } - - input_len = 0; - break; - } - - DCHECK_GT(uncompressed_len, 0); - - if (!size_only) { - // Decompress this snappy block - if (!snappy::RawUncompress(reinterpret_cast(input), - compressed_len, output)) { - return Status::InternalError("Snappy: RawUncompress failed"); - } - - output += uncompressed_len; - } - - input += compressed_len; - input_len -= compressed_len; - uncompressed_block_len -= uncompressed_len; - uncompressed_total_len += uncompressed_len; - } - } - - if (size_only) { - *output_len = uncompressed_total_len; - } else if (*output_len != uncompressed_total_len) { - return Status::InternalError("Snappy: Decompressed size is not correct."); - } - - return Status::OK(); -} - -Status SnappyBlockDecompressor::process_block(int input_len, uint8_t* input, - int* output_len, uint8_t** output) { - if (*output_len == 0) { - // If we don't know the size beforehand, compute it. - RETURN_IF_ERROR(snappy_block_decompress(input_len, input, true, output_len, NULL)); - DCHECK_NE(*output_len, 0); - - if (!_reuse_buffer || _out_buffer == NULL || _buffer_length < *output_len) { - // Need to allocate a new buffer - _buffer_length = *output_len; - _out_buffer = _memory_pool->allocate(_buffer_length); - } - - *output = _out_buffer; - } - - DCHECK(*output != NULL); - - if (*output_len > MAX_BLOCK_SIZE) { - // TODO: is this check really robust? - std::stringstream ss; - ss << "Decompressor: block size is too big. Data is likely corrupt. " - << "Size: " << *output_len; - return Status::InternalError(ss.str()); - } - - char* out_ptr = reinterpret_cast(*output); - RETURN_IF_ERROR(snappy_block_decompress(input_len, input, false, output_len, out_ptr)); - return Status::OK(); -} - -} diff --git a/be/src/util/decompress.h b/be/src/util/decompress.h deleted file mode 100644 index 03514d0758..0000000000 --- a/be/src/util/decompress.h +++ /dev/null @@ -1,102 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef DORIS_BE_SRC_COMMON_UTIL_DECOMPRESS_H -#define DORIS_BE_SRC_COMMON_UTIL_DECOMPRESS_H - -// We need zlib.h here to declare _stream below. -#include - -#include "util/codec.h" -#include "runtime/mem_pool.h" - -namespace doris { - -class GzipDecompressor : public Codec { -public: - GzipDecompressor(MemPool* mem_pool, bool reuse_buffer, bool is_deflate); - virtual ~GzipDecompressor(); - - // Process a block of data. - virtual Status process_block(int input_length, uint8_t* input, - int* output_length, uint8_t** output); - -protected: - // Initialize the decompressor. - virtual Status init(); - -private: - // If set assume deflate format, otherwise zlib or gzip - bool _is_deflate; - - z_stream _stream; - - // These are magic numbers from zlib.h. Not clear why they are not defined there. - const static int WINDOW_BITS = 15; // Maximum window size - const static int DETECT_CODEC = 32; // Determine if this is libz or gzip from header. -}; - -class BzipDecompressor : public Codec { -public: - BzipDecompressor(MemPool* mem_pool, bool reuse_buffer); - virtual ~BzipDecompressor() { } - - // Process a block of data. - virtual Status process_block(int input_length, uint8_t* input, - int* output_length, uint8_t** output); -protected: - // Bzip does not need initialization - virtual Status init() { - return Status::OK(); - } -}; - -class SnappyDecompressor : public Codec { -public: - SnappyDecompressor(MemPool* mem_pool, bool reuse_buffer); - virtual ~SnappyDecompressor() { } - - // Process a block of data. - virtual Status process_block(int input_length, uint8_t* input, - int* output_length, uint8_t** output); - -protected: - // Snappy does not need initialization - virtual Status init() { - return Status::OK(); - } - -}; - -class SnappyBlockDecompressor : public Codec { -public: - SnappyBlockDecompressor(MemPool* mem_pool, bool reuse_buffer); - virtual ~SnappyBlockDecompressor() { } - - //Process a block of data. - virtual Status process_block(int input_length, uint8_t* input, - int* output_length, uint8_t** output); - -protected: - // Snappy does not need initialization - virtual Status init() { - return Status::OK(); - } -}; - -} -#endif diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp index a5457227f3..9a2c6aacf9 100644 --- a/be/src/util/runtime_profile.cpp +++ b/be/src/util/runtime_profile.cpp @@ -24,7 +24,6 @@ #include #include "common/object_pool.h" -#include "util/compress.h" #include "util/cpu_info.h" #include "util/debug_util.h" #include "util/pretty_printer.h" diff --git a/be/src/util/slice.h b/be/src/util/slice.h index 1eebe3a5f2..d0485e3a5a 100644 --- a/be/src/util/slice.h +++ b/be/src/util/slice.h @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -177,6 +178,14 @@ public: return memcmp(a, b, n); } + static size_t compute_total_size(const std::vector& slices) { + size_t total_size = 0; + for (auto& slice : slices) { + total_size += slice.size; + } + return total_size; + } + }; /// Check whether two slices are identical. diff --git a/be/test/util/CMakeLists.txt b/be/test/util/CMakeLists.txt index 36cdc5b72d..4d37abbc1c 100644 --- a/be/test/util/CMakeLists.txt +++ b/be/test/util/CMakeLists.txt @@ -43,3 +43,4 @@ ADD_BE_TEST(bitmap_test) ADD_BE_TEST(faststring_test) ADD_BE_TEST(rle_encoding_test) ADD_BE_TEST(tdigest_test) +ADD_BE_TEST(block_compression_test) diff --git a/be/test/util/block_compression_test.cpp b/be/test/util/block_compression_test.cpp new file mode 100644 index 0000000000..707892d85b --- /dev/null +++ b/be/test/util/block_compression_test.cpp @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/block_compression.h" + +#include +#include + +namespace doris { +class BlockCompressionTest : public testing::Test { +public: + BlockCompressionTest() { } + virtual ~BlockCompressionTest() { + } +}; + +static std::string generate_str(size_t len) { + static char charset[] = "0123456789" + "abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + std::string result; + result.resize(len); + for (int i = 0; i < len; ++i) { + result[i] = charset[rand() % sizeof(charset)]; + } + return result; +} + +void test_single_slice(segment_v2::CompressionTypePB type) { + BlockCompressionCodec* codec = nullptr; + auto st = get_block_compression_codec(type, &codec); + ASSERT_TRUE(st.ok()); + + size_t test_sizes[] = {0, 1, 10, 1000, 1000000}; + for (auto size : test_sizes) { + auto orig = generate_str(size); + size_t max_len = codec->max_compressed_len(size); + std::string compressed; + compressed.resize(max_len); + { + Slice compressed_slice(compressed); + st = codec->compress(orig, &compressed_slice); + ASSERT_TRUE(st.ok()); + + std::string uncompressed; + uncompressed.resize(size); + { + Slice uncompressed_slice(uncompressed); + st = codec->decompress(compressed_slice, &uncompressed_slice); + ASSERT_TRUE(st.ok()); + + ASSERT_STREQ(orig.c_str(), uncompressed.c_str()); + } + // buffer not enough for decompress + // snappy has no return value if given buffer is not enough + // NOTE: For ZLIB, we even get OK with a insufficient output + // when uncompressed size is 1 + if ((type == segment_v2::CompressionTypePB::ZLIB && uncompressed.size() > 1) && + type != segment_v2::CompressionTypePB::SNAPPY && + uncompressed.size() > 0) { + + Slice uncompressed_slice(uncompressed); + uncompressed_slice.size -= 1; + st = codec->decompress(compressed_slice, &uncompressed_slice); + ASSERT_FALSE(st.ok()); + } + // corrupt compressed data + if (type != segment_v2::CompressionTypePB::SNAPPY) { + Slice uncompressed_slice(uncompressed); + compressed_slice.size -= 1; + st = codec->decompress(compressed_slice, &uncompressed_slice); + ASSERT_FALSE(st.ok()); + compressed_slice.size += 1; + } + } + // buffer not enough for compress + if (type != segment_v2::CompressionTypePB::SNAPPY && size > 0) { + Slice compressed_slice(compressed); + compressed_slice.size = 1; + st = codec->compress(orig, &compressed_slice); + ASSERT_FALSE(st.ok()); + } + } +} + +TEST_F(BlockCompressionTest, single) { + test_single_slice(segment_v2::CompressionTypePB::SNAPPY); + test_single_slice(segment_v2::CompressionTypePB::ZLIB); + test_single_slice(segment_v2::CompressionTypePB::LZ4); + test_single_slice(segment_v2::CompressionTypePB::LZ4F); +} + +void test_multi_slices(segment_v2::CompressionTypePB type) { + BlockCompressionCodec* codec = nullptr; + auto st = get_block_compression_codec(type, &codec); + ASSERT_TRUE(st.ok()); + + size_t test_sizes[] = {0, 1, 10, 1000, 1000000}; + std::vector orig_strs; + for (auto size : test_sizes) { + orig_strs.emplace_back(generate_str(size)); + } + std::vector orig_slices; + std::string orig; + for (auto& str : orig_strs) { + orig_slices.emplace_back(str); + orig.append(str); + } + + size_t total_size = orig.size(); + size_t max_len = codec->max_compressed_len(total_size); + + std::string compressed; + compressed.resize(max_len); + { + Slice compressed_slice(compressed); + st = codec->compress(orig, &compressed_slice); + ASSERT_TRUE(st.ok()); + + std::string uncompressed; + uncompressed.resize(total_size); + // normal case + { + Slice uncompressed_slice(uncompressed); + st = codec->decompress(compressed_slice, &uncompressed_slice); + ASSERT_TRUE(st.ok()); + + ASSERT_STREQ(orig.c_str(), uncompressed.c_str()); + } + } + + // buffer not enough failed + if (type != segment_v2::CompressionTypePB::SNAPPY) { + Slice compressed_slice(compressed); + compressed_slice.size = 10; + st = codec->compress(orig, &compressed_slice); + ASSERT_FALSE(st.ok()); + } +} + +TEST_F(BlockCompressionTest, multi) { + test_multi_slices(segment_v2::CompressionTypePB::SNAPPY); + test_multi_slices(segment_v2::CompressionTypePB::ZLIB); + test_multi_slices(segment_v2::CompressionTypePB::LZ4); + test_multi_slices(segment_v2::CompressionTypePB::LZ4F); +} + +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto index edc7b23ff8..8153b64daa 100644 --- a/gensrc/proto/segment_v2.proto +++ b/gensrc/proto/segment_v2.proto @@ -61,8 +61,9 @@ enum CompressionTypePB { NO_COMPRESSION = 2; SNAPPY = 3; LZ4 = 4; - ZLIB = 5; - ZSTB = 6; + LZ4F = 5; + ZLIB = 6; + ZSTD = 7; } message ZoneMapPB { @@ -117,7 +118,7 @@ message FileFooterPB { optional uint64 data_footprint = 5; // total data footprint of all columns optional uint64 raw_data_footprint = 6; // raw data footprint - optional CompressionTypePB compress_type = 7 [default = ZSTB]; // default compression type for file columns + optional CompressionTypePB compress_type = 7 [default = LZ4F]; // default compression type for file columns repeated MetadataPairPB file_meta_datas = 8; // meta data of file optional PagePointerPB key_index_page = 9; // short key index page } @@ -147,7 +148,7 @@ message SegmentFooterPB { optional uint64 data_footprint = 5; // total data footprint of all columns optional uint64 raw_data_footprint = 6; // raw data footprint - optional CompressionTypePB compress_type = 7 [default = LZ4]; // default compression type for file columns + optional CompressionTypePB compress_type = 7 [default = LZ4F]; // default compression type for file columns repeated MetadataPairPB file_meta_datas = 8; // meta data of file // Short key index's page diff --git a/run-ut.sh b/run-ut.sh index 646b5556b0..05e2e355a4 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -154,6 +154,7 @@ ${DORIS_TEST_BINARY_DIR}/util/string_util_test ${DORIS_TEST_BINARY_DIR}/util/coding_test ${DORIS_TEST_BINARY_DIR}/util/faststring_test ${DORIS_TEST_BINARY_DIR}/util/tdigest_test +${DORIS_TEST_BINARY_DIR}/util/block_compression_test # Running common Unittest ${DORIS_TEST_BINARY_DIR}/common/resource_tls_test