Add block compression codec and remove not used codec (#1622)

This commit is contained in:
ZHAO Chun
2019-08-12 20:47:16 +08:00
committed by GitHub
parent af8256be2a
commit c0253a17fc
15 changed files with 638 additions and 1269 deletions

View File

@ -25,14 +25,12 @@ set(UTIL_FILES
arena.cpp
bfd_parser.cpp
bitmap.cpp
codec.cpp
block_compression.cpp
coding.cpp
compress.cpp
cpu_info.cpp
date_func.cpp
dynamic_util.cpp
debug_util.cpp
decompress.cpp
disk_info.cpp
errno.cpp
hash_util.hpp

View File

@ -0,0 +1,387 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "util/block_compression.h"
#include <lz4/lz4.h>
#include <lz4/lz4frame.h>
#include <snappy/snappy-sinksource.h>
#include <snappy/snappy.h>
#include <zlib.h>
#include "util/faststring.h"
#include "gutil/strings/substitute.h"
namespace doris {
using strings::Substitute;
// Default multi-slice compression: concatenate every input slice into one
// contiguous scratch buffer, then delegate to the single-slice compress()
// overload. Codecs that can digest slices incrementally override this.
Status BlockCompressionCodec::compress(const std::vector<Slice>& inputs, Slice* output) const {
    faststring merged;
    // Reserve the exact total up front so appending never reallocates.
    merged.reserve(Slice::compute_total_size(inputs));
    for (const auto& piece : inputs) {
        merged.append(piece.data, piece.size);
    }
    return compress(merged, output);
}
// LZ4 "raw block" codec (no frame header). The compressed bytes carry no
// length information, so callers must persist sizes themselves.
class Lz4BlockCompression : public BlockCompressionCodec {
public:
    // Stateless codec: one shared singleton is enough.
    static Lz4BlockCompression* instance() {
        static Lz4BlockCompression s_instance;
        return &s_instance;
    }

    ~Lz4BlockCompression() override { }

    Status compress(const Slice& input, Slice* output) const override {
        auto written =
                LZ4_compress_default(input.data, output->data, input.size, output->size);
        if (written == 0) {
            // LZ4 reports "destination too small" (or any failure) as 0.
            return Status::InvalidArgument(
                    Substitute("Output buffer's capacity is not enough, size=$0", output->size));
        }
        output->size = written;
        return Status::OK();
    }

    Status decompress(const Slice& input, Slice* output) const override {
        auto produced =
                LZ4_decompress_safe(input.data, output->data, input.size, output->size);
        if (produced < 0) {
            // Negative return codes mean malformed input or too-small output.
            return Status::InvalidArgument(
                    Substitute("fail to do LZ4 decompress, error=$0", produced));
        }
        output->size = produced;
        return Status::OK();
    }

    size_t max_compressed_len(size_t len) const override { return LZ4_compressBound(len); }
};
// Used for LZ4 frame format, decompress speed is two times faster than LZ4.
// The frame format embeds block sizes and an end marker, which allows the
// multi-slice compress() below to stream slices through one context.
class Lz4fBlockCompression : public BlockCompressionCodec {
public:
    static Lz4fBlockCompression* instance() {
        static Lz4fBlockCompression s_instance;
        return &s_instance;
    }
    ~Lz4fBlockCompression() override { }

    // Compress one slice into a single complete LZ4 frame.
    Status compress(const Slice& input, Slice* output) const override {
        auto compressed_len =
            LZ4F_compressFrame(output->data, output->size, input.data, input.size, &_s_preferences);
        if (LZ4F_isError(compressed_len)) {
            return Status::InvalidArgument(
                    Substitute("Fail to do LZ4F compress frame, msg=$0",
                               LZ4F_getErrorName(compressed_len)));
        }
        output->size = compressed_len;
        return Status::OK();
    }

    // Stream several slices into one frame. The context is created/freed here
    // so _compress() can return early on any error without leaking it.
    Status compress(const std::vector<Slice>& inputs, Slice* output) const override {
        LZ4F_compressionContext_t ctx = nullptr;
        auto lres = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
        if (lres != 0) {
            return Status::InvalidArgument(
                    Substitute("Fail to do LZ4F compress, res=$0", LZ4F_getErrorName(lres)));
        }
        auto st = _compress(ctx, inputs, output);
        LZ4F_freeCompressionContext(ctx);
        return st;
    }

    Status decompress(const Slice& input, Slice* output) const override {
        LZ4F_decompressionContext_t ctx;
        auto lres = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
        if (LZ4F_isError(lres)) {
            return Status::InvalidArgument(
                    Substitute("Fail to do LZ4F decompress, res=$0", LZ4F_getErrorName(lres)));
        }
        auto st = _decompress(ctx, input, output);
        LZ4F_freeDecompressionContext(ctx);
        return st;
    }

    size_t max_compressed_len(size_t len) const override {
        // Cover both the streaming path (compressBound) and the one-shot
        // frame path (compressFrameBound); take the larger of the two.
        return std::max(LZ4F_compressBound(len, &_s_preferences),
                        LZ4F_compressFrameBound(len, &_s_preferences));
    }

private:
    Status _compress(LZ4F_compressionContext_t ctx,
                     const std::vector<Slice>& inputs, Slice* output) const {
        // Frame header first; 'offset' accumulates total bytes written.
        auto wbytes = LZ4F_compressBegin(ctx, output->data, output->size, &_s_preferences);
        if (LZ4F_isError(wbytes)) {
            return Status::InvalidArgument(
                    Substitute("Fail to do LZ4F compress begin, res=$0", LZ4F_getErrorName(wbytes)));
        }
        size_t offset = wbytes;
        for (auto input : inputs) {
            wbytes = LZ4F_compressUpdate(ctx,
                                         output->data + offset, output->size - offset,
                                         input.data, input.size,
                                         nullptr);
            if (LZ4F_isError(wbytes)) {
                return Status::InvalidArgument(
                        Substitute("Fail to do LZ4F compress update, res=$0", LZ4F_getErrorName(wbytes)));
            }
            offset += wbytes;
        }
        wbytes = LZ4F_compressEnd(ctx,
                                  output->data + offset, output->size - offset,
                                  nullptr);
        if (LZ4F_isError(wbytes)) {
            return Status::InvalidArgument(
                    Substitute("Fail to do LZ4F compress end, res=$0", LZ4F_getErrorName(wbytes)));
        }
        offset += wbytes;
        // BUGFIX: the frame's total size is the accumulated offset (header +
        // all updates + end marker). The original assigned 'wbytes', i.e. only
        // the size of the end marker, truncating the reported output.
        output->size = offset;
        return Status::OK();
    }

    Status _decompress(LZ4F_decompressionContext_t ctx,
                       const Slice& input, Slice* output) const {
        // On return LZ4F_decompress updates both sizes: bytes consumed from
        // input and bytes produced into output.
        size_t input_size = input.size;
        auto lres =
            LZ4F_decompress(ctx, output->data, &output->size, input.data, &input_size, nullptr);
        if (LZ4F_isError(lres)) {
            return Status::InvalidArgument(
                    Substitute("Fail to do LZ4F decompress, res=$0", LZ4F_getErrorName(lres)));
        } else if (input_size != input.size) {
            return Status::InvalidArgument(
                    Substitute("Fail to do LZ4F decompress: trailing data left in compressed data, read=$0 vs given=$1",
                               input_size, input.size));
        } else if (lres != 0) {
            // Non-zero means the frame is incomplete: more input was expected.
            return Status::InvalidArgument(
                    Substitute("Fail to do LZ4F decompress: expect more compressed data, expect=$0", lres));
        }
        return Status::OK();
    }

private:
    static LZ4F_preferences_t _s_preferences;
};
// Shared LZ4F preferences used by every compression call of this codec:
// 256KB linked blocks, no checksums, default compression level.
LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = {
    {
        LZ4F_max256KB, LZ4F_blockLinked,
        LZ4F_noContentChecksum, LZ4F_frame,
        0, // unknown content size,
        {0, 0} // reserved, must be set to 0
    },
    0,              // compression level; 0 == default
    0,              // autoflush
    { 0, 0, 0, 0 }, // reserved, must be set to 0
};
// Adapts a list of Slices to snappy's pull-style Source interface, so the
// compressor can consume the slices directly without first merging them into
// one contiguous buffer. Empty slices are dropped at construction time so
// Peek() can always return a non-empty region while data remains.
class SnappySlicesSource : public snappy::Source {
public:
    SnappySlicesSource(const std::vector<Slice>& slices)
            : _available(0), _cur_slice(0), _slice_off(0) {
        for (auto& slice : slices) {
            // We filter empty slice here to avoid complicated process
            if (slice.size == 0) {
                continue;
            }
            _available += slice.size;
            _slices.push_back(slice);
        }
    }
    ~SnappySlicesSource() override { }

    // Return the number of bytes left to read from the source
    size_t Available() const override { return _available; }

    // Peek at the next flat region of the source. Does not reposition
    // the source. The returned region is empty iff Available()==0.
    //
    // Returns a pointer to the beginning of the region and store its
    // length in *len.
    //
    // The returned region is valid until the next call to Skip() or
    // until this object is destroyed, whichever occurs first.
    const char* Peek(size_t* len) override {
        if (_available == 0) {
            *len = 0;
            return nullptr;
        }
        // we should assure that *len is not 0
        *len = _slices[_cur_slice].size - _slice_off;
        DCHECK(*len != 0);
        // BUGFIX: honor the bytes already consumed by previous Skip() calls.
        // Returning the slice head (without _slice_off) would replay data that
        // was already read whenever Skip() stopped mid-slice, corrupting the
        // compressed stream.
        return _slices[_cur_slice].data + _slice_off;
    }

    // Skip the next n bytes. Invalidates any buffer returned by
    // a previous call to Peek().
    // REQUIRES: Available() >= n
    void Skip(size_t n) override {
        _available -= n;
        do {
            auto left = _slices[_cur_slice].size - _slice_off;
            if (left > n) {
                // n can be digested within the current slice
                _slice_off += n;
                return;
            }
            // Current slice exhausted exactly or over-consumed: move on.
            _slice_off = 0;
            _cur_slice++;
            n -= left;
        } while (n > 0);
    }

private:
    std::vector<Slice> _slices;   // non-empty slices only
    size_t _available;            // total unread bytes across all slices
    size_t _cur_slice;            // index of the slice Peek() reads from
    size_t _slice_off;            // bytes already consumed in _cur_slice
};
// Snappy raw-format codec. The compressed stream embeds the uncompressed
// length, which is why decompress() can recover the output size cheaply.
class SnappyBlockCompression : public BlockCompressionCodec {
public:
    // Stateless codec: hand out one shared singleton.
    static SnappyBlockCompression* instance() {
        static SnappyBlockCompression s_instance;
        return &s_instance;
    }

    ~SnappyBlockCompression() override { }

    Status compress(const Slice& input, Slice* output) const override {
        snappy::RawCompress(input.data, input.size, output->data, &output->size);
        return Status::OK();
    }

    // Stream the slices into snappy one by one instead of merging them first.
    Status compress(const std::vector<Slice>& inputs, Slice* output) const override {
        SnappySlicesSource source(inputs);
        snappy::UncheckedByteArraySink sink(output->data);
        output->size = snappy::Compress(&source, &sink);
        return Status::OK();
    }

    Status decompress(const Slice& input, Slice* output) const override {
        if (!snappy::RawUncompress(input.data, input.size, output->data)) {
            return Status::InvalidArgument("Fail to do Snappy decompress");
        }
        // NOTE: GetUncompressedLength only takes O(1) time
        snappy::GetUncompressedLength(input.data, input.size, &output->size);
        return Status::OK();
    }

    size_t max_compressed_len(size_t len) const override {
        return snappy::MaxCompressedLength(len);
    }
};
class ZlibBlockCompression : public BlockCompressionCodec {
public:
static ZlibBlockCompression* instance() {
static ZlibBlockCompression s_instance;
return &s_instance;
}
~ZlibBlockCompression() { }
Status compress(const Slice& input, Slice* output) const override {
auto zres = ::compress((Bytef*)output->data, &output->size, (Bytef*)input.data, input.size);
if (zres != Z_OK) {
return Status::InvalidArgument(
Substitute("Fail to do ZLib compress, error=$0", zError(zres)));
}
return Status::OK();
}
Status compress(const std::vector<Slice>& inputs, Slice* output) const override {
z_stream zstrm;
zstrm.zalloc = Z_NULL;
zstrm.zfree = Z_NULL;
zstrm.opaque = Z_NULL;
auto zres = deflateInit(&zstrm, Z_DEFAULT_COMPRESSION);
if (zres != Z_OK) {
return Status::InvalidArgument(
Substitute("Fail to do ZLib stream compress, error=$0", zError(zres)));
}
// we assume that output is e
zstrm.next_out = (Bytef*)output->data;
zstrm.avail_out = output->size;
for (int i = 0; i < inputs.size(); ++i) {
if (inputs[i].size == 0) {
continue;
}
zstrm.next_in = (Bytef*)inputs[i].data;
zstrm.avail_in = inputs[i].size;
int flush = (i == (inputs.size() - 1)) ? Z_FINISH : Z_NO_FLUSH;
zres = deflate(&zstrm, flush);
if (zres != Z_OK || zres != Z_STREAM_END) {
return Status::InvalidArgument(
Substitute("Fail to do ZLib stream compress, error=$0", zError(zres)));
}
}
output->size = zstrm.total_out;
return Status::OK();
}
Status decompress(const Slice& input, Slice* output) const override {
size_t input_size = input.size;
auto zres = ::uncompress2((Bytef*)output->data, &output->size, (Bytef*)input.data, &input_size);
if (zres != Z_OK) {
return Status::InvalidArgument(
Substitute("Fail to do ZLib decompress, error=$0", zError(zres)));
}
return Status::OK();
}
size_t max_compressed_len(size_t len) const override {
// one-time overhead of six bytes for the entire stream plus five bytes per 16 KB block
return len + 6 + 5 * ((len >> 14) + 1);
}
};
// Resolve the singleton codec for an on-disk compression type.
// NO_COMPRESSION is valid and yields *codec == nullptr; unknown types
// produce a NotFound status. Returned codecs are shared and never freed
// by the caller.
Status get_block_compression_codec(
        segment_v2::CompressionTypePB type, BlockCompressionCodec** codec) {
    switch (type) {
    case segment_v2::CompressionTypePB::NO_COMPRESSION:
        *codec = nullptr;
        return Status::OK();
    case segment_v2::CompressionTypePB::SNAPPY:
        *codec = SnappyBlockCompression::instance();
        return Status::OK();
    case segment_v2::CompressionTypePB::LZ4:
        *codec = Lz4BlockCompression::instance();
        return Status::OK();
    case segment_v2::CompressionTypePB::LZ4F:
        *codec = Lz4fBlockCompression::instance();
        return Status::OK();
    case segment_v2::CompressionTypePB::ZLIB:
        *codec = ZlibBlockCompression::instance();
        return Status::OK();
    default:
        return Status::NotFound(Substitute("unknown compression type($0)", type));
    }
}
}

View File

@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstddef>
#include <vector>
#include "common/status.h"
#include "gen_cpp/segment_v2.pb.h"
#include "util/slice.h"
namespace doris {
// Encapsulates a block compression/decompression algorithm.
// It only compresses whole blocks: all data must be given in a single call
// to compress or decompress. This class does not handle stream compression.
// Implementations are expected to be stateless and thread-safe (they are
// handed out as shared singletons by get_block_compression_codec).
class BlockCompressionCodec {
public:
    virtual ~BlockCompressionCodec() { }

    // This function will compress input data into output.
    // output should be preallocated, and its capacity must be large enough
    // for compressed input, which can be got through the max_compressed_len
    // function. The size of the compressed data is written back into
    // output's size.
    virtual Status compress(const Slice& input, Slice* output) const = 0;

    // Default implementation merges the input list into one big buffer and
    // calls compress(Slice) to finish compression. If a compression type
    // supports digesting slices one by one, it should override this.
    virtual Status compress(const std::vector<Slice>& input, Slice* output) const;

    // Decompress input data into output; output's capacity should be large
    // enough for the decompressed data.
    // The size of the decompressed data is written back into output's size.
    virtual Status decompress(const Slice& input, Slice* output) const = 0;

    // Returns an upper bound on the max compressed length for an input of
    // 'len' bytes.
    virtual size_t max_compressed_len(size_t len) const = 0;
};
// Get a BlockCompressionCodec for the given type.
// Returns Status::OK if the type is known. On success, a null *codec means
// NO_COMPRESSION; a non-null *codec can be used to compress/decompress data
// and is a shared instance the client must NOT release.
//
// Returns a non-OK status if the type is unknown.
Status get_block_compression_codec(
        segment_v2::CompressionTypePB type, BlockCompressionCodec** codec);
}

View File

@ -1,200 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "util/codec.h"
#include <boost/assign/list_of.hpp>
#include "util/compress.h"
#include "util/decompress.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/Descriptors_constants.h"
namespace doris {
// NOTE(review): this entire translation unit (util/codec.cpp) is deleted by
// this commit; comments below are for reviewers reading the removed code.

// Hadoop codec class names understood by this factory.
const char* const Codec::DEFAULT_COMPRESSION =
    "org.apache.hadoop.io.compress.DefaultCodec";
const char* const Codec::GZIP_COMPRESSION =
    "org.apache.hadoop.io.compress.GzipCodec";
const char* const Codec::BZIP2_COMPRESSION =
    "org.apache.hadoop.io.compress.BZip2Codec";
const char* const Codec::SNAPPY_COMPRESSION =
    "org.apache.hadoop.io.compress.SnappyCodec";
const char* const UNKNOWN_CODEC_ERROR =
    "This compression codec is currently unsupported: ";

// Maps a Hadoop codec class name to its Thrift compression format enum.
const Codec::CodecMap Codec::CODEC_MAP = boost::assign::map_list_of
    ("", THdfsCompression::NONE)
    (Codec::DEFAULT_COMPRESSION, THdfsCompression::DEFAULT)
    (Codec::GZIP_COMPRESSION, THdfsCompression::GZIP)
    (Codec::BZIP2_COMPRESSION, THdfsCompression::BZIP2)
    (Codec::SNAPPY_COMPRESSION, THdfsCompression::SNAPPY_BLOCKED);

// Reverse lookup of the codec-name string for a compression enum value.
std::string Codec::get_codec_name(THdfsCompression::type type) {
    std::map<const std::string, THdfsCompression::type>::const_iterator im;
    for (im = g_Descriptors_constants.COMPRESSION_MAP.begin();
            im != g_Descriptors_constants.COMPRESSION_MAP.end(); ++im) {
        if (im->second == type) {
            return im->first;
        }
    }
    // NOTE(review): this line is only reached when im == end(), so the DCHECK
    // always fires here in debug builds — presumably intended as a
    // "should never happen" assertion for unknown types.
    DCHECK(im != g_Descriptors_constants.COMPRESSION_MAP.end());
    return "INVALID";
}

// Creates a compressor from a Hadoop codec class-name string.
Status Codec::create_compressor(RuntimeState* runtime_state, MemPool* mem_pool,
                                bool reuse, const std::string& codec,
                                boost::scoped_ptr<Codec>* compressor) {
    std::map<const std::string, const THdfsCompression::type>::const_iterator
        type = CODEC_MAP.find(codec);
    if (type == CODEC_MAP.end()) {
        std::stringstream ss;
        ss << UNKNOWN_CODEC_ERROR << codec;
        return Status::InternalError(ss.str());
    }
    Codec* comp = NULL;
    RETURN_IF_ERROR(
        create_compressor(runtime_state, mem_pool, reuse, type->second, &comp));
    compressor->reset(comp);
    return Status::OK();
}

// Scoped-pointer convenience overload of the raw-pointer creator below.
Status Codec::create_compressor(RuntimeState* runtime_state, MemPool* mem_pool,
                                bool reuse, THdfsCompression::type format,
                                boost::scoped_ptr<Codec>* compressor) {
    Codec* comp = NULL;
    RETURN_IF_ERROR(
        create_compressor(runtime_state, mem_pool, reuse, format, &comp));
    compressor->reset(comp);
    return Status::OK();
}

// Instantiates the concrete compressor for 'format'.
// NOTE(review): the switch has no default case — an unhandled enum value
// would leave *compressor unset and the final init() call would dereference
// garbage; relies on the enum being exhaustively covered.
Status Codec::create_compressor(RuntimeState* runtime_state, MemPool* mem_pool,
                                bool reuse, THdfsCompression::type format,
                                Codec** compressor) {
    switch (format) {
    case THdfsCompression::NONE:
        *compressor = NULL;
        return Status::OK();
    case THdfsCompression::GZIP:
        *compressor = new GzipCompressor(GzipCompressor::GZIP, mem_pool, reuse);
        break;
    case THdfsCompression::DEFAULT:
        *compressor = new GzipCompressor(GzipCompressor::ZLIB, mem_pool, reuse);
        break;
    case THdfsCompression::DEFLATE:
        *compressor = new GzipCompressor(GzipCompressor::DEFLATE, mem_pool, reuse);
        break;
    case THdfsCompression::BZIP2:
        *compressor = new BzipCompressor(mem_pool, reuse);
        break;
    case THdfsCompression::SNAPPY_BLOCKED:
        *compressor = new SnappyBlockCompressor(mem_pool, reuse);
        break;
    case THdfsCompression::SNAPPY:
        *compressor = new SnappyCompressor(mem_pool, reuse);
        break;
    }
    return (*compressor)->init();
}

// Creates a decompressor from a Hadoop codec class-name string.
Status Codec::create_decompressor(RuntimeState* runtime_state, MemPool* mem_pool,
                                  bool reuse, const std::string& codec,
                                  boost::scoped_ptr<Codec>* decompressor) {
    std::map<const std::string, const THdfsCompression::type>::const_iterator
        type = CODEC_MAP.find(codec);
    if (type == CODEC_MAP.end()) {
        std::stringstream ss;
        ss << UNKNOWN_CODEC_ERROR << codec;
        return Status::InternalError(ss.str());
    }
    Codec* decom = NULL;
    RETURN_IF_ERROR(
        create_decompressor(runtime_state, mem_pool, reuse, type->second, &decom));
    decompressor->reset(decom);
    return Status::OK();
}

// Scoped-pointer convenience overload of the raw-pointer creator below.
Status Codec::create_decompressor(RuntimeState* runtime_state, MemPool* mem_pool,
                                  bool reuse, THdfsCompression::type format,
                                  boost::scoped_ptr<Codec>* decompressor) {
    Codec* decom = NULL;
    RETURN_IF_ERROR(
        create_decompressor(runtime_state, mem_pool, reuse, format, &decom));
    decompressor->reset(decom);
    return Status::OK();
}

// Instantiates the concrete decompressor for 'format'.
// NOTE(review): same missing-default caveat as create_compressor above.
Status Codec::create_decompressor(RuntimeState* runtime_state, MemPool* mem_pool,
                                  bool reuse, THdfsCompression::type format,
                                  Codec** decompressor) {
    switch (format) {
    case THdfsCompression::NONE:
        *decompressor = NULL;
        return Status::OK();
    case THdfsCompression::DEFAULT:
    case THdfsCompression::GZIP:
        *decompressor = new GzipDecompressor(mem_pool, reuse, false);
        break;
    case THdfsCompression::DEFLATE:
        *decompressor = new GzipDecompressor(mem_pool, reuse, true);
        break;
    case THdfsCompression::BZIP2:
        *decompressor = new BzipDecompressor(mem_pool, reuse);
        break;
    case THdfsCompression::SNAPPY_BLOCKED:
        *decompressor = new SnappyBlockDecompressor(mem_pool, reuse);
        break;
    case THdfsCompression::SNAPPY:
        *decompressor = new SnappyDecompressor(mem_pool, reuse);
        break;
    }
    return (*decompressor)->init();
}

// Base-class state: the pools the transformed-data buffer comes from and
// whether that buffer may be reused across calls.
Codec::Codec(MemPool* mem_pool, bool reuse_buffer) :
        _memory_pool(mem_pool),
        _temp_memory_pool(_memory_pool->mem_tracker()),
        _reuse_buffer(reuse_buffer),
        _out_buffer(NULL),
        _buffer_length(0) {
}
}

View File

@ -1,163 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef DORIS_BE_SRC_COMMON_UTIL_CODEC_H
#define DORIS_BE_SRC_COMMON_UTIL_CODEC_H
#include <boost/scoped_ptr.hpp>
#include "common/status.h"
#include "runtime/mem_pool.h"
#include "util/runtime_profile.h"
#include "gen_cpp/Descriptors_types.h"
namespace doris {
class MemPool;
class RuntimeState;

// NOTE(review): this header (util/codec.h) is deleted by this commit.
// Create a compression object. This is the base class for all compression
// algorithms. A compression algorithm is either a compressor or a decompressor.
// To add a new algorithm, generally, both a compressor and a decompressor
// will be added. Each of these objects inherits from this class. The objects
// are instantiated in the Create static methods defined here. The type of
// compression is defined in the Thrift interface THdfsCompression.
class Codec {
public:
    // These are the codec string representation used in Hadoop.
    static const char* const DEFAULT_COMPRESSION;
    static const char* const GZIP_COMPRESSION;
    static const char* const BZIP2_COMPRESSION;
    static const char* const SNAPPY_COMPRESSION;

    // Map from codec string to compression format
    typedef std::map<const std::string, const THdfsCompression::type> CodecMap;
    static const CodecMap CODEC_MAP;

    // Create a decompressor.
    // Input:
    //  runtime_state: the current runtime state.
    //  mem_pool: the memory pool used to store the decompressed data.
    //  reuse: if true the allocated buffer can be reused.
    //  format: the type of decompressor to create.
    // Output:
    //  decompressor: pointer to the decompressor class to use.
    static Status create_decompressor(RuntimeState* runtime_state, MemPool* mem_pool,
                                      bool reuse, THdfsCompression::type format,
                                      Codec** decompressor);

    // Alternate creator: returns a scoped pointer.
    static Status create_decompressor(RuntimeState* runtime_state, MemPool* mem_pool,
                                      bool reuse, THdfsCompression::type format,
                                      boost::scoped_ptr<Codec>* decompressor);

    // Alternate creator: takes a codec string and returns a scoped pointer.
    static Status create_decompressor(RuntimeState* runtime_state, MemPool* mem_pool,
                                      bool reuse, const std::string& codec,
                                      boost::scoped_ptr<Codec>* decompressor);

    // Create the compressor.
    // Input:
    //  runtime_state: the current runtime state.
    //  mem_pool: the memory pool used to store the compressed data.
    //  format: The type of compressor to create.
    //  reuse: if true the allocated buffer can be reused.
    // Output:
    //  compressor: pointer to the compressor class to use.
    // NOTE(review): the out-parameter below is (mis)named 'decompressor'
    // although it receives a compressor; confirm before reusing.
    static Status create_compressor(RuntimeState* runtime_state, MemPool* mem_pool,
                                    bool reuse, THdfsCompression::type format,
                                    Codec** decompressor);

    // Alternate creator: returns a scoped pointer.
    static Status create_compressor(RuntimeState* runtime_state, MemPool* mem_pool,
                                    bool reuse, THdfsCompression::type format,
                                    boost::scoped_ptr<Codec>* compressor);

    // Alternate creator: takes a codec string and returns a scoped pointer.
    // Input, as above except:
    //  codec: the string representing the codec of the current file.
    static Status create_compressor(RuntimeState* runtime_state, MemPool* mem_pool,
                                    bool reuse, const std::string& codec,
                                    boost::scoped_ptr<Codec>* compressor);

    virtual ~Codec() {}

    // Process a block of data, either compressing or decompressing it.
    // If *output_length is 0, the function will allocate from its mempool.
    // If *output_length is non-zero, it should be the length of *output and must
    // be exactly the size of the transformed output.
    // Inputs:
    //   input_length: length of the data to process
    //   input: data to process
    // In/Out:
    //   output_length: Length of the output, if known, 0 otherwise.
    // Output:
    //   output: Pointer to processed data
    // If this needs to allocate memory, a mempool must be passed into the c'tor.
    virtual Status process_block(int input_length, uint8_t* input,
                                 int* output_length, uint8_t** output) = 0;

    // Return the name of a compression algorithm.
    static std::string get_codec_name(THdfsCompression::type);

    // Largest block we will compress/decompress: 2GB.
    // We are dealing with compressed blocks that are never this big but we
    // want to guard against a corrupt file that has the block length as some
    // large number.
    static const int MAX_BLOCK_SIZE = (2L * 1024 * 1024 * 1024) - 1;

protected:
    // Create a compression operator
    // Inputs:
    //   mem_pool: memory pool to allocate the output buffer, this implies that the
    //             caller is responsible for the memory allocated by the operator.
    //   reuse_buffer: if false always allocate a new buffer rather than reuse.
    Codec(MemPool* mem_pool, bool reuse_buffer);

    // Initialize the operation.
    virtual Status init() = 0;

private:
    friend class GzipCompressor;
    friend class GzipDecompressor;
    friend class BzipCompressor;
    friend class BzipDecompressor;
    friend class SnappyBlockCompressor;
    friend class SnappyBlockDecompressor;
    friend class SnappyCompressor;
    friend class SnappyDecompressor;

    // Pool to allocate the buffer to hold transformed data.
    MemPool* _memory_pool;

    // Temporary memory pool: in case we get the output size too small we can
    // use this to free unused buffers.
    MemPool _temp_memory_pool;

    // Can we reuse the output buffer or do we need to allocate on each call?
    bool _reuse_buffer;

    // Buffer to hold transformed data.
    // Either passed from the caller or allocated from _memory_pool.
    uint8_t* _out_buffer;

    // Length of the output buffer.
    int _buffer_length;
};
}
#endif

View File

@ -1,271 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "util/compress.h"
#include "exec/read_write_util.h"
#include "runtime/runtime_state.h"
// Codec libraries
#include <zlib.h>
#include <bzlib.h>
#include <snappy/snappy.h>
namespace doris {
GzipCompressor::GzipCompressor(Format format, MemPool* mem_pool, bool reuse_buffer) :
Codec(mem_pool, reuse_buffer),
_format(format) {
bzero(&_stream, sizeof(_stream));
init();
}
GzipCompressor::~GzipCompressor() {
(void)deflateEnd(&_stream);
}
Status GzipCompressor::init() {
int ret = 0;
// Initialize to run specified format
int window_bits = WINDOW_BITS;
if (_format == DEFLATE) {
window_bits = -window_bits;
} else if (_format == GZIP) {
window_bits += GZIP_CODEC;
}
if ((ret = deflateInit2(&_stream, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
window_bits, 9, Z_DEFAULT_STRATEGY)) != Z_OK) {
return Status::InternalError("zlib deflateInit failed: " + std::string(_stream.msg));
}
return Status::OK();
}
int GzipCompressor::max_compressed_len(int input_length) {
return deflateBound(&_stream, input_length);
}
Status GzipCompressor::compress(
int input_length,
uint8_t* input,
int* output_length,
uint8_t* output) {
DCHECK_GE(*output_length, max_compressed_len(input_length));
_stream.next_in = reinterpret_cast<Bytef*>(input);
_stream.avail_in = input_length;
_stream.next_out = reinterpret_cast<Bytef*>(output);
_stream.avail_out = *output_length;
int ret = 0;
if ((ret = deflate(&_stream, Z_FINISH)) != Z_STREAM_END) {
std::stringstream ss;
ss << "zlib deflate failed: " << _stream.msg;
return Status::InternalError(ss.str());
}
*output_length = *output_length - _stream.avail_out;
if (deflateReset(&_stream) != Z_OK) {
return Status::InternalError("zlib deflateReset failed: " + std::string(_stream.msg));
}
return Status::OK();
}
Status GzipCompressor::process_block(
int input_length,
uint8_t* input,
int* output_length,
uint8_t** output) {
// If length is set then the output has been allocated.
if (*output_length != 0) {
_buffer_length = *output_length;
_out_buffer = *output;
} else {
int len = max_compressed_len(input_length);
if (!_reuse_buffer || _buffer_length < len || _out_buffer == NULL) {
DCHECK(_memory_pool != NULL) << "Can't allocate without passing in a mem pool";
_buffer_length = len;
_out_buffer = _memory_pool->allocate(_buffer_length);
*output_length = _buffer_length;
}
}
RETURN_IF_ERROR(compress(input_length, input, output_length, _out_buffer));
*output = _out_buffer;
return Status::OK();
}
BzipCompressor::BzipCompressor(MemPool* mem_pool, bool reuse_buffer) :
Codec(mem_pool, reuse_buffer) {
}
Status BzipCompressor::process_block(
int input_length,
uint8_t* input,
int* output_length,
uint8_t** output) {
// If length is set then the output has been allocated.
if (*output_length != 0) {
_buffer_length = *output_length;
_out_buffer = *output;
} else if (!_reuse_buffer || _out_buffer == NULL) {
// guess that we will need no more the input length.
_buffer_length = input_length;
_out_buffer = _temp_memory_pool.allocate(_buffer_length);
}
unsigned int outlen = 0;
int ret = BZ_OUTBUFF_FULL;
while (ret == BZ_OUTBUFF_FULL) {
if (_out_buffer == NULL) {
DCHECK_EQ(*output_length, 0);
_temp_memory_pool.clear();
_buffer_length = _buffer_length * 2;
_out_buffer = _temp_memory_pool.allocate(_buffer_length);
}
outlen = static_cast<unsigned int>(_buffer_length);
if ((ret = BZ2_bzBuffToBuffCompress(
reinterpret_cast<char*>(_out_buffer), &outlen,
reinterpret_cast<char*>(input),
static_cast<unsigned int>(input_length), 5, 2, 0)) == BZ_OUTBUFF_FULL) {
// If the output_length was passed we must have enough room.
DCHECK_EQ(*output_length, 0);
if (*output_length != 0) {
return Status::InternalError("Too small buffer passed to BzipCompressor");
}
_out_buffer = NULL;
}
}
if (ret != BZ_OK) {
std::stringstream ss;
ss << "bzlib BZ2_bzBuffToBuffCompressor failed: " << ret;
return Status::InternalError(ss.str());
}
*output = _out_buffer;
*output_length = outlen;
_memory_pool->acquire_data(&_temp_memory_pool, false);
return Status::OK();
}
// Currently this is only use for testing of the decompressor.
SnappyBlockCompressor::SnappyBlockCompressor(MemPool* mem_pool, bool reuse_buffer) :
Codec(mem_pool, reuse_buffer) {
}
// Produces hadoop-style snappy block compression: a 4-byte uncompressed
// total length, then length-prefixed snappy-compressed blocks. For testing
// purposes the input is split into (at most) two blocks.
// On return *output/*output_length describe the compressed stream; when
// *output_length is 0 on entry the buffer is allocated from _memory_pool.
Status SnappyBlockCompressor::process_block(
        int input_length,
        uint8_t* input,
        int* output_length,
        uint8_t** output) {
    // Round UP so two blocks always cover the whole input. The previous
    // input_length / 2 looped forever for input_length == 1 (block size 0
    // makes no progress) and read past the input end for odd lengths (a
    // third full-sized block was compressed from a partial tail).
    int block_size = (input_length + 1) / 2;
    size_t length = snappy::MaxCompressedLength(block_size) * 2;
    length += 3 * sizeof(int32_t);
    DCHECK(*output_length == 0 || length <= *output_length);
    // If length is non-zero then the output has been allocated.
    if (*output_length != 0) {
        _buffer_length = *output_length;
        _out_buffer = *output;
    } else if (!_reuse_buffer || _out_buffer == NULL || _buffer_length < length) {
        _buffer_length = length;
        _out_buffer = _memory_pool->allocate(_buffer_length);
    }
    uint8_t* outp = _out_buffer;
    uint8_t* sizep = NULL;
    ReadWriteUtil::put_int(outp, input_length);
    outp += sizeof(int32_t);
    do {
        // Point at the spot to store the compressed size.
        sizep = outp;
        outp += sizeof(int32_t);
        // The last block may be shorter than block_size; never read past the
        // end of the input.
        int to_compress = block_size < input_length ? block_size : input_length;
        size_t size = 0;
        snappy::RawCompress(reinterpret_cast<const char*>(input),
                static_cast<size_t>(to_compress), reinterpret_cast<char*>(outp), &size);
        ReadWriteUtil::put_int(sizep, size);
        input += to_compress;
        input_length -= to_compress;
        outp += size;
    } while (input_length > 0);
    *output = _out_buffer;
    *output_length = outp - _out_buffer;
    return Status::OK();
}
// Plain (non-block) snappy compressor; delegates buffer management to Codec.
SnappyCompressor::SnappyCompressor(MemPool* mem_pool, bool reuse_buffer) :
        Codec(mem_pool, reuse_buffer) {
}
// Upper bound on the compressed size of 'input_length' bytes, as reported by
// snappy::MaxCompressedLength.
int SnappyCompressor::max_compressed_len(int input_length) {
    return snappy::MaxCompressedLength(input_length);
}
// Compresses 'input' (input_len bytes) into 'output', which must already be
// at least max_compressed_len(input_len) bytes. On return *output_len holds
// the actual compressed size. snappy::RawCompress cannot fail, so this
// always returns OK.
Status SnappyCompressor::compress(int input_len, uint8_t* input,
        int* output_len, uint8_t* output) {
    DCHECK_GE(*output_len, max_compressed_len(input_len));
    const char* src = reinterpret_cast<const char*>(input);
    char* dst = reinterpret_cast<char*>(output);
    size_t compressed_size = 0;
    snappy::RawCompress(src, static_cast<size_t>(input_len), dst, &compressed_size);
    *output_len = static_cast<int>(compressed_size);
    return Status::OK();
}
// Compresses one block with raw snappy. If *output_length != 0 the caller
// supplies the buffer (which must fit max_compressed_len bytes); otherwise
// a buffer is allocated from (or reused within) _memory_pool.
// On return *output/*output_length describe the compressed data.
Status SnappyCompressor::process_block(int input_length, uint8_t* input,
        int* output_length, uint8_t** output) {
    int max_compressed_len = this->max_compressed_len(input_length);
    if (*output_length != 0 && *output_length < max_compressed_len) {
        return Status::InternalError("process_block: output length too small");
    }
    if (*output_length != 0) {
        _buffer_length = *output_length;
        _out_buffer = *output;
    } else {
        if (!_reuse_buffer || _out_buffer == NULL || _buffer_length < max_compressed_len) {
            DCHECK(_memory_pool != NULL) << "Can't allocate without passing in a mem pool";
            _buffer_length = max_compressed_len;
            _out_buffer = _memory_pool->allocate(_buffer_length);
        }
        // Always report the buffer back to the caller. The original only did
        // this inside the allocation branch, so the reuse path returned with
        // *output unset and *output_length still 0 (tripping compress()'s
        // DCHECK_GE and leaving the caller with garbage).
        *output = _out_buffer;
        *output_length = max_compressed_len;
    }
    // compress() rewrites *output_length with the actual compressed size.
    return compress(input_length, input, output_length, _out_buffer);
}
}

View File

@ -1,133 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef DORIS_BE_SRC_COMMON_UTIL_COMPRESS_H
#define DORIS_BE_SRC_COMMON_UTIL_COMPRESS_H
// We need zlib.h here to declare _stream below.
#include <zlib.h>
#include "util/codec.h"
#include "runtime/mem_pool.h"
namespace doris {
// Different compression classes. The classes all expose the same API and
// abstracts the underlying calls to the compression libraries.
// TODO: reconsider the abstracted API
// Gzip-family compressor built on zlib; supports zlib, raw deflate and gzip
// framing (see Format).
class GzipCompressor : public Codec {
public:
    // Compression formats supported by the zlib library
    enum Format {
        ZLIB,
        DEFLATE,
        GZIP,
    };
    // 'format' selects the output framing (zlib, raw deflate, or gzip);
    // the matching zlib window flags are derived from it in init().
    GzipCompressor(Format format, MemPool* mem_pool, bool reuse_buffer);
    virtual ~GzipCompressor();
    // Returns an upper bound on the max compressed length.
    int max_compressed_len(int input_len);
    // Compresses 'input' into 'output'. Output must be preallocated and
    // at least big enough.
    // *output_length should be called with the length of the output buffer and on return
    // is the length of the output.
    Status compress(int input_length, uint8_t* input,
            int* output_length, uint8_t* output);
    // Process a block of data.
    virtual Status process_block(int input_length, uint8_t* input,
            int* output_length, uint8_t** output);
protected:
    // Initialize the compressor.
    virtual Status init();
private:
    Format _format;
    // Structure used to communicate with the library.
    z_stream _stream;
    // These are magic numbers from zlib.h. Not clear why they are not defined there.
    const static int WINDOW_BITS = 15; // Maximum window size
    const static int GZIP_CODEC = 16;  // Output Gzip.
};
// bzip2 compressor; stateless apart from the buffers managed by Codec,
// so no init work is required.
class BzipCompressor : public Codec {
public:
    BzipCompressor(MemPool* mem_pool, bool reuse_buffer);
    virtual ~BzipCompressor() { }
    // Process a block of data.
    virtual Status process_block(int input_length, uint8_t* input,
            int* output_length, uint8_t** output);
    // Bzip does not need initialization.
    virtual Status init() {
        return Status::OK();
    }
};
// Hadoop-style snappy block compressor (length-prefixed snappy blocks).
// Currently only used for testing the matching decompressor.
class SnappyBlockCompressor : public Codec {
public:
    SnappyBlockCompressor(MemPool* mem_pool, bool reuse_buffer);
    virtual ~SnappyBlockCompressor() { }
    // Process a block of data.
    virtual Status process_block(int input_length, uint8_t* input,
            int* output_length, uint8_t** output);
protected:
    // Snappy does not need initialization
    virtual Status init() {
        return Status::OK();
    }
};
// Raw (non-block) snappy compressor.
class SnappyCompressor : public Codec {
public:
    SnappyCompressor(MemPool* mem_pool, bool reuse_buffer);
    virtual ~SnappyCompressor() { }
    // Process a block of data.
    virtual Status process_block(int input_length, uint8_t* input,
            int* output_length, uint8_t** output);
    // Returns an upper bound on the max compressed length.
    int max_compressed_len(int input_len);
    // Compresses 'input' into 'output'. Output must be preallocated and
    // at least big enough.
    // *output_length should be called with the length of the output buffer and on return
    // is the length of the output.
    Status compress(int input_length, uint8_t* input,
            int* output_length, uint8_t* output);
protected:
    // Snappy does not need initialization
    virtual Status init() {
        return Status::OK();
    }
};
}
#endif

View File

@ -1,392 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <boost/assign/list_of.hpp>
#include "util/decompress.h"
#include "exec/read_write_util.h"
#include "runtime/runtime_state.h"
#include "gen_cpp/Descriptors_types.h"
// Codec libraries
#include <zlib.h>
#include <bzlib.h>
#include <snappy/snappy.h>
namespace doris {
// 'is_deflate' selects raw-deflate framing; otherwise zlib/gzip framing is
// auto-detected in init(). The z_stream must be zeroed before inflateInit2.
GzipDecompressor::GzipDecompressor(MemPool* mem_pool, bool reuse_buffer, bool is_deflate) :
        Codec(mem_pool, reuse_buffer),
        _is_deflate(is_deflate) {
    bzero(&_stream, sizeof(_stream));
}
// Releases zlib state; the inflateEnd return value is intentionally ignored
// since there is nothing useful to do with a failure in a destructor.
GzipDecompressor::~GzipDecompressor() {
    (void)inflateEnd(&_stream);
}
// Initializes the zlib stream. A negative window-bits value selects raw
// deflate; WINDOW_BITS | DETECT_CODEC lets zlib auto-detect zlib vs. gzip
// headers (see zlib.h).
Status GzipDecompressor::init() {
    int ret = 0;
    // Initialize to run either deflate or zlib/gzip format
    int window_bits = _is_deflate ? -WINDOW_BITS : WINDOW_BITS | DETECT_CODEC;
    if ((ret = inflateInit2(&_stream, window_bits)) != Z_OK) {
        return Status::InternalError("zlib inflateInit failed: " + std::string(_stream.msg));
    }
    return Status::OK();
}
// Inflates one gzip/zlib/deflate block. If *output_length != 0 the caller
// supplies the destination buffer; otherwise a buffer is allocated from
// _temp_memory_pool (starting at 2x the input size, doubling on overflow up
// to MAX_BLOCK_SIZE) and handed to _memory_pool on success.
Status GzipDecompressor::process_block(int input_length, uint8_t* input,
        int* output_length, uint8_t** output) {
    bool use_temp = false;
    // If length is set then the output has been allocated.
    if (*output_length != 0) {
        _buffer_length = *output_length;
        _out_buffer = *output;
    } else if (!_reuse_buffer || _out_buffer == NULL) {
        // guess that we will need 2x the input length.
        _buffer_length = input_length * 2;
        if (_buffer_length > MAX_BLOCK_SIZE) {
            return Status::InternalError("Decompressor: block size is too big");
        }
        _out_buffer = _temp_memory_pool.allocate(_buffer_length);
        use_temp = true;
    }
    int ret = 0;
    while (ret != Z_STREAM_END) {
        _stream.next_in = reinterpret_cast<Bytef*>(input);
        _stream.avail_in = input_length;
        _stream.next_out = reinterpret_cast<Bytef*>(_out_buffer);
        _stream.avail_out = _buffer_length;
        ret = inflate(&_stream, 1);
        if (ret != Z_STREAM_END) {
            if (ret == Z_OK) {
                // Not enough output space.
                DCHECK_EQ(*output_length, 0);
                if (*output_length != 0) {
                    return Status::InternalError("Too small a buffer passed to GzipDecompressor");
                }
                _temp_memory_pool.clear();
                _buffer_length *= 2;
                if (_buffer_length > MAX_BLOCK_SIZE) {
                    return Status::InternalError("Decompressor: block size is too big");
                }
                _out_buffer = _temp_memory_pool.allocate(_buffer_length);
                // The regrown buffer lives in the temp pool regardless of
                // where the first buffer came from; mark it so ownership is
                // transferred to _memory_pool below. (Previously use_temp
                // stayed false when the first buffer was a reused allocation,
                // so the decompressed data was left owned by the temp pool.)
                use_temp = true;
                if (inflateReset(&_stream) != Z_OK) {
                    // Error message now names inflateReset, the call that failed.
                    return Status::InternalError("zlib inflateReset failed: " + std::string(_stream.msg));
                }
                continue;
            }
            return Status::InternalError("zlib inflate failed: " + std::string(_stream.msg));
        }
    }
    if (inflateReset(&_stream) != Z_OK) {
        return Status::InternalError("zlib inflateReset failed: " + std::string(_stream.msg));
    }
    *output = _out_buffer;
    // _stream.avail_out is the number of bytes *left* in the out buffer, but
    // we're interested in the number of bytes used.
    if (*output_length == 0) {
        *output_length = _buffer_length - _stream.avail_out;
    }
    if (use_temp) {
        _memory_pool->acquire_data(&_temp_memory_pool, _reuse_buffer);
    }
    return Status::OK();
}
// bzip2 decompressor; all buffer management is inherited from Codec.
BzipDecompressor::BzipDecompressor(MemPool* mem_pool, bool reuse_buffer) :
        Codec(mem_pool, reuse_buffer) {
}
// Decompresses bzip2 data via BZ2_bzBuffToBuffDecompress. If
// *output_length != 0 the caller supplies the output buffer; otherwise a
// buffer is allocated from _temp_memory_pool, doubling on BZ_OUTBUFF_FULL up
// to MAX_BLOCK_SIZE, and handed to _memory_pool on success.
Status BzipDecompressor::process_block(int input_length, uint8_t* input,
        int* output_length, uint8_t** output) {
    bool use_temp = false;
    // If length is set then the output has been allocated.
    if (*output_length != 0) {
        _buffer_length = *output_length;
        _out_buffer = *output;
    } else if (!_reuse_buffer || _out_buffer == NULL) {
        // guess that we will need 2x the input length.
        _buffer_length = input_length * 2;
        if (_buffer_length > MAX_BLOCK_SIZE) {
            return Status::InternalError("Decompressor: block size is too big");
        }
        _out_buffer = _temp_memory_pool.allocate(_buffer_length);
        use_temp = true;
    }
    int ret = BZ_OUTBUFF_FULL;
    unsigned int outlen = 0;
    while (ret == BZ_OUTBUFF_FULL) {
        if (_out_buffer == NULL) {
            DCHECK_EQ(*output_length, 0);
            _temp_memory_pool.clear();
            _buffer_length = _buffer_length * 2;
            if (_buffer_length > MAX_BLOCK_SIZE) {
                return Status::InternalError("Decompressor: block size is too big");
            }
            _out_buffer = _temp_memory_pool.allocate(_buffer_length);
            // The regrown buffer always lives in the temp pool; mark it so
            // ownership is transferred below. (Previously use_temp stayed
            // false when the first buffer was a reused allocation, so the
            // decompressed data was left owned by the temp pool.)
            use_temp = true;
        }
        outlen = static_cast<unsigned int>(_buffer_length);
        if ((ret = BZ2_bzBuffToBuffDecompress(
                reinterpret_cast<char*>(_out_buffer), &outlen,
                reinterpret_cast<char*>(input),
                static_cast<unsigned int>(input_length), 0, 0)) == BZ_OUTBUFF_FULL) {
            // If the output_length was passed we must have enough room.
            DCHECK_EQ(*output_length, 0);
            if (*output_length != 0) {
                return Status::InternalError("Too small a buffer passed to BzipDecompressor");
            }
            _out_buffer = NULL;
        }
    }
    if (ret != BZ_OK) {
        std::stringstream ss;
        // Name the actual API that failed (was "BZ2_bzBuffToBuffDecompressor",
        // which is not a bzlib function).
        ss << "bzlib BZ2_bzBuffToBuffDecompress failed: " << ret;
        return Status::InternalError(ss.str());
    }
    *output = _out_buffer;
    if (*output_length == 0) {
        *output_length = outlen;
    }
    if (use_temp) {
        _memory_pool->acquire_data(&_temp_memory_pool, _reuse_buffer);
    }
    return Status::OK();
}
// Raw snappy decompressor; all buffer management is inherited from Codec.
SnappyDecompressor::SnappyDecompressor(MemPool* mem_pool, bool reuse_buffer)
    : Codec(mem_pool, reuse_buffer) {
}
// Decompresses a single raw snappy block.
// If *output_length != 0 the caller supplies the destination in *output.
// NOTE(review): in that case the buffer size is not validated against the
// stream's actual uncompressed length -- presumably callers guarantee it is
// large enough; confirm at the call sites. Otherwise the exact size is read
// from the snappy header and a buffer is allocated (or reused) from
// _memory_pool.
Status SnappyDecompressor::process_block(int input_length, uint8_t* input,
        int* output_length, uint8_t** output) {
    // If length is set then the output has been allocated.
    size_t uncompressed_length = 0;
    if (*output_length != 0) {
        _buffer_length = *output_length;
        _out_buffer = *output;
    } else {
        // Snappy saves the uncompressed length so we never have to retry.
        if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input),
                input_length, &uncompressed_length)) {
            return Status::InternalError("Snappy: GetUncompressedLength failed");
        }
        if (!_reuse_buffer || _out_buffer == NULL || _buffer_length < uncompressed_length) {
            _buffer_length = uncompressed_length;
            if (_buffer_length > MAX_BLOCK_SIZE) {
                return Status::InternalError("Decompressor: block size is too big");
            }
            _out_buffer = _memory_pool->allocate(_buffer_length);
        }
    }
    if (!snappy::RawUncompress(
            reinterpret_cast<const char*>(input),
            static_cast<size_t>(input_length), reinterpret_cast<char*>(_out_buffer))) {
        return Status::InternalError("Snappy: RawUncompress failed");
    }
    // Only report size/buffer back when we allocated them ourselves; a
    // caller-provided buffer keeps its original *output/*output_length.
    if (*output_length == 0) {
        *output_length = uncompressed_length;
        *output = _out_buffer;
    }
    return Status::OK();
}
// Hadoop-style snappy block decompressor (see the format description below).
SnappyBlockDecompressor::SnappyBlockDecompressor(MemPool* mem_pool, bool reuse_buffer) :
        Codec(mem_pool, reuse_buffer) {
}
// Hadoop uses a block compression scheme on top of snappy. As per the hadoop docs
// the input is split into blocks. Each block "contains the uncompressed length for
// the block followed by one of more length-prefixed blocks of compressed data."
// This is essentially blocks of blocks.
// The outer block consists of:
// - 4 byte little endian uncompressed_size
// < inner blocks >
// ... repeated until input_len is consumed ..
// The inner blocks have:
// - 4-byte little endian compressed_size
// < snappy compressed block >
// - 4-byte little endian compressed_size
// < snappy compressed block >
// ... repeated until uncompressed_size from outer block is consumed ...
// Utility function to decompress snappy block compressed data. If size_only is true,
// this function does not decompress but only computes the output size and writes
// the result to *output_len.
// If size_only is false, output must be preallocated to output_len and this needs to
// be exactly big enough to hold the decompressed output.
// size_only is a O(1) operations (just reads a single varint for each snappy block).
// Walks the hadoop snappy block framing described above. With
// size_only == true only the total uncompressed size is computed (no
// decompression, no writes through 'output'); otherwise each inner snappy
// block is decompressed into 'output', which must be exactly *output_len
// bytes. On corrupt-looking sizes the code bails out of the loop if some
// data was already produced, or returns an error if nothing was.
static Status snappy_block_decompress(int input_len, uint8_t* input, bool size_only,
        int* output_len, char* output) {
    int uncompressed_total_len = 0;
    while (input_len > 0) {
        // Outer block: 4-byte uncompressed length for this block.
        size_t uncompressed_block_len = ReadWriteUtil::get_int(input);
        input += sizeof(int32_t);
        input_len -= sizeof(int32_t);
        if (uncompressed_block_len > Codec::MAX_BLOCK_SIZE || uncompressed_block_len == 0) {
            if (uncompressed_total_len == 0) {
                // TODO: is this check really robust?
                std::stringstream ss;
                ss << "Decompressor: block size is too big. Data is likely corrupt. "
                   << "Size: " << uncompressed_block_len;
                return Status::InternalError(ss.str());
            }
            // Some data was produced already; treat the bad size as trailing
            // garbage and stop.
            break;
        }
        if (!size_only) {
            int remaining_output_size = *output_len - uncompressed_total_len;
            DCHECK_GE(remaining_output_size, uncompressed_block_len);
        }
        while (uncompressed_block_len > 0) {
            // Read the length of the next snappy compressed block.
            size_t compressed_len = ReadWriteUtil::get_int(input);
            input += sizeof(int32_t);
            input_len -= sizeof(int32_t);
            // NOTE(review): size_t vs. int comparison -- input_len is known
            // positive here (loop guard), so the implicit conversion is safe.
            if (compressed_len == 0 || compressed_len > input_len) {
                if (uncompressed_total_len == 0) {
                    return Status::InternalError(
                            "Decompressor: invalid compressed length. Data is likely corrupt.");
                }
                input_len = 0;
                break;
            }
            // Read how big the output will be.
            size_t uncompressed_len = 0;
            if (!snappy::GetUncompressedLength(reinterpret_cast<char*>(input),
                    input_len, &uncompressed_len)) {
                if (uncompressed_total_len == 0) {
                    return Status::InternalError("Snappy: GetUncompressedLength failed");
                }
                input_len = 0;
                break;
            }
            DCHECK_GT(uncompressed_len, 0);
            if (!size_only) {
                // Decompress this snappy block
                if (!snappy::RawUncompress(reinterpret_cast<char*>(input),
                        compressed_len, output)) {
                    return Status::InternalError("Snappy: RawUncompress failed");
                }
                output += uncompressed_len;
            }
            input += compressed_len;
            input_len -= compressed_len;
            uncompressed_block_len -= uncompressed_len;
            uncompressed_total_len += uncompressed_len;
        }
    }
    if (size_only) {
        *output_len = uncompressed_total_len;
    } else if (*output_len != uncompressed_total_len) {
        return Status::InternalError("Snappy: Decompressed size is not correct.");
    }
    return Status::OK();
}
// Decompresses hadoop-snappy block data. If *output_len == 0 a first
// size-only pass computes the exact output size (O(1) per inner block) and a
// buffer is allocated or reused; otherwise the caller's buffer in *output is
// used and must be exactly *output_len bytes.
Status SnappyBlockDecompressor::process_block(int input_len, uint8_t* input,
        int* output_len, uint8_t** output) {
    if (*output_len == 0) {
        // If we don't know the size beforehand, compute it.
        RETURN_IF_ERROR(snappy_block_decompress(input_len, input, true, output_len, NULL));
        DCHECK_NE(*output_len, 0);
        if (!_reuse_buffer || _out_buffer == NULL || _buffer_length < *output_len) {
            // Need to allocate a new buffer
            _buffer_length = *output_len;
            _out_buffer = _memory_pool->allocate(_buffer_length);
        }
        *output = _out_buffer;
    }
    DCHECK(*output != NULL);
    if (*output_len > MAX_BLOCK_SIZE) {
        // TODO: is this check really robust?
        std::stringstream ss;
        ss << "Decompressor: block size is too big. Data is likely corrupt. "
           << "Size: " << *output_len;
        return Status::InternalError(ss.str());
    }
    // Second pass: actually decompress into the buffer.
    char* out_ptr = reinterpret_cast<char*>(*output);
    RETURN_IF_ERROR(snappy_block_decompress(input_len, input, false, output_len, out_ptr));
    return Status::OK();
}
}

View File

@ -1,102 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef DORIS_BE_SRC_COMMON_UTIL_DECOMPRESS_H
#define DORIS_BE_SRC_COMMON_UTIL_DECOMPRESS_H
// We need zlib.h here to declare _stream below.
#include <zlib.h>
#include "util/codec.h"
#include "runtime/mem_pool.h"
namespace doris {
// zlib-based decompressor for gzip/zlib (auto-detected) or raw deflate data.
class GzipDecompressor : public Codec {
public:
    // 'is_deflate' selects raw-deflate framing; otherwise the framing is
    // auto-detected from the stream header.
    GzipDecompressor(MemPool* mem_pool, bool reuse_buffer, bool is_deflate);
    virtual ~GzipDecompressor();
    // Process a block of data.
    virtual Status process_block(int input_length, uint8_t* input,
            int* output_length, uint8_t** output);
protected:
    // Initialize the decompressor.
    virtual Status init();
private:
    // If set assume deflate format, otherwise zlib or gzip
    bool _is_deflate;
    z_stream _stream;
    // These are magic numbers from zlib.h. Not clear why they are not defined there.
    const static int WINDOW_BITS = 15;  // Maximum window size
    const static int DETECT_CODEC = 32; // Determine if this is libz or gzip from header.
};
// bzip2 decompressor; stateless apart from the buffers managed by Codec.
class BzipDecompressor : public Codec {
public:
    BzipDecompressor(MemPool* mem_pool, bool reuse_buffer);
    virtual ~BzipDecompressor() { }
    // Process a block of data.
    virtual Status process_block(int input_length, uint8_t* input,
            int* output_length, uint8_t** output);
protected:
    // Bzip does not need initialization
    virtual Status init() {
        return Status::OK();
    }
};
// Raw (non-block) snappy decompressor.
class SnappyDecompressor : public Codec {
public:
    SnappyDecompressor(MemPool* mem_pool, bool reuse_buffer);
    virtual ~SnappyDecompressor() { }
    // Process a block of data.
    virtual Status process_block(int input_length, uint8_t* input,
            int* output_length, uint8_t** output);
protected:
    // Snappy does not need initialization
    virtual Status init() {
        return Status::OK();
    }
};
// Decompressor for hadoop's block compression scheme on top of snappy
// (length-prefixed snappy blocks inside length-prefixed outer blocks).
class SnappyBlockDecompressor : public Codec {
public:
    SnappyBlockDecompressor(MemPool* mem_pool, bool reuse_buffer);
    virtual ~SnappyBlockDecompressor() { }
    // Process a block of data.
    virtual Status process_block(int input_length, uint8_t* input,
            int* output_length, uint8_t** output);
protected:
    // Snappy does not need initialization
    virtual Status init() {
        return Status::OK();
    }
};
}
#endif

View File

@ -24,7 +24,6 @@
#include <boost/foreach.hpp>
#include "common/object_pool.h"
#include "util/compress.h"
#include "util/cpu_info.h"
#include "util/debug_util.h"
#include "util/pretty_printer.h"

View File

@ -20,6 +20,7 @@
#include <assert.h>
#include <map>
#include <vector>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
@ -177,6 +178,14 @@ public:
return memcmp(a, b, n);
}
// Returns the sum of the sizes of all slices in 'slices'.
static size_t compute_total_size(const std::vector<Slice>& slices) {
    size_t sum = 0;
    for (const auto& one : slices) {
        sum += one.size;
    }
    return sum;
}
};
/// Check whether two slices are identical.

View File

@ -43,3 +43,4 @@ ADD_BE_TEST(bitmap_test)
ADD_BE_TEST(faststring_test)
ADD_BE_TEST(rle_encoding_test)
ADD_BE_TEST(tdigest_test)
ADD_BE_TEST(block_compression_test)

View File

@ -0,0 +1,168 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "util/block_compression.h"
#include <gtest/gtest.h>
#include <iostream>
namespace doris {
// Fixture for block compression codec tests; no per-test state is needed.
class BlockCompressionTest : public testing::Test {
public:
    BlockCompressionTest() { }
    virtual ~BlockCompressionTest() {
    }
};
// Returns a pseudo-random string of exactly 'len' alphanumeric characters.
static std::string generate_str(size_t len) {
    static const char charset[] = "0123456789"
            "abcdefghijklmnopqrstuvwxyz"
            "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
    std::string result;
    result.resize(len);
    for (size_t i = 0; i < len; ++i) {
        // sizeof(charset) counts the trailing '\0'; exclude it so the result
        // never contains an embedded NUL, which would silently truncate the
        // ASSERT_STREQ round-trip comparisons in the tests below.
        result[i] = charset[rand() % (sizeof(charset) - 1)];
    }
    return result;
}
// Round-trips inputs of several sizes through codec 'type' and exercises the
// error paths for undersized output buffers and truncated compressed data.
void test_single_slice(segment_v2::CompressionTypePB type) {
    BlockCompressionCodec* codec = nullptr;
    auto st = get_block_compression_codec(type, &codec);
    ASSERT_TRUE(st.ok());
    size_t test_sizes[] = {0, 1, 10, 1000, 1000000};
    for (auto size : test_sizes) {
        auto orig = generate_str(size);
        size_t max_len = codec->max_compressed_len(size);
        std::string compressed;
        compressed.resize(max_len);
        {
            Slice compressed_slice(compressed);
            st = codec->compress(orig, &compressed_slice);
            ASSERT_TRUE(st.ok());
            std::string uncompressed;
            uncompressed.resize(size);
            {
                Slice uncompressed_slice(uncompressed);
                st = codec->decompress(compressed_slice, &uncompressed_slice);
                ASSERT_TRUE(st.ok());
                ASSERT_STREQ(orig.c_str(), uncompressed.c_str());
            }
            // Decompressing into a too-small buffer must fail.
            // Snappy reports no error for an undersized buffer, and ZLIB
            // returns OK when the uncompressed size is 1, so skip those
            // cases. (The original condition's '&&' grouping made this
            // branch run only for ZLIB, so LZ4/LZ4F were never checked.)
            if (type != segment_v2::CompressionTypePB::SNAPPY &&
                    uncompressed.size() > 0 &&
                    (type != segment_v2::CompressionTypePB::ZLIB ||
                            uncompressed.size() > 1)) {
                Slice uncompressed_slice(uncompressed);
                uncompressed_slice.size -= 1;
                st = codec->decompress(compressed_slice, &uncompressed_slice);
                ASSERT_FALSE(st.ok());
            }
            // corrupt compressed data
            if (type != segment_v2::CompressionTypePB::SNAPPY) {
                Slice uncompressed_slice(uncompressed);
                compressed_slice.size -= 1;
                st = codec->decompress(compressed_slice, &uncompressed_slice);
                ASSERT_FALSE(st.ok());
                compressed_slice.size += 1;
            }
        }
        // buffer not enough for compress
        if (type != segment_v2::CompressionTypePB::SNAPPY && size > 0) {
            Slice compressed_slice(compressed);
            compressed_slice.size = 1;
            st = codec->compress(orig, &compressed_slice);
            ASSERT_FALSE(st.ok());
        }
    }
}
// Round-trips single contiguous inputs through every supported codec.
TEST_F(BlockCompressionTest, single) {
    test_single_slice(segment_v2::CompressionTypePB::SNAPPY);
    test_single_slice(segment_v2::CompressionTypePB::ZLIB);
    test_single_slice(segment_v2::CompressionTypePB::LZ4);
    test_single_slice(segment_v2::CompressionTypePB::LZ4F);
}
// Builds several strings of varying sizes, compresses them through the
// multi-slice compress() overload, and verifies that decompressing yields
// the concatenation of all inputs.
void test_multi_slices(segment_v2::CompressionTypePB type) {
    BlockCompressionCodec* codec = nullptr;
    auto st = get_block_compression_codec(type, &codec);
    ASSERT_TRUE(st.ok());
    size_t test_sizes[] = {0, 1, 10, 1000, 1000000};
    std::vector<std::string> orig_strs;
    for (auto size : test_sizes) {
        orig_strs.emplace_back(generate_str(size));
    }
    std::vector<Slice> orig_slices;
    std::string orig;
    for (auto& str : orig_strs) {
        orig_slices.emplace_back(str);
        orig.append(str);
    }
    size_t total_size = orig.size();
    size_t max_len = codec->max_compressed_len(total_size);
    std::string compressed;
    compressed.resize(max_len);
    {
        Slice compressed_slice(compressed);
        // Exercise the multi-slice overload -- the point of this test. (The
        // original compressed the concatenated string and left orig_slices
        // unused, so the vector overload was never covered.)
        st = codec->compress(orig_slices, &compressed_slice);
        ASSERT_TRUE(st.ok());
        std::string uncompressed;
        uncompressed.resize(total_size);
        // normal case
        {
            Slice uncompressed_slice(uncompressed);
            st = codec->decompress(compressed_slice, &uncompressed_slice);
            ASSERT_TRUE(st.ok());
            ASSERT_STREQ(orig.c_str(), uncompressed.c_str());
        }
    }
    // Compressing into a too-small buffer must fail (snappy has no way to
    // report this, so it is skipped).
    if (type != segment_v2::CompressionTypePB::SNAPPY) {
        Slice compressed_slice(compressed);
        compressed_slice.size = 10;
        st = codec->compress(orig_slices, &compressed_slice);
        ASSERT_FALSE(st.ok());
    }
}
// Round-trips multi-slice inputs through every supported codec.
TEST_F(BlockCompressionTest, multi) {
    test_multi_slices(segment_v2::CompressionTypePB::SNAPPY);
    test_multi_slices(segment_v2::CompressionTypePB::ZLIB);
    test_multi_slices(segment_v2::CompressionTypePB::LZ4);
    test_multi_slices(segment_v2::CompressionTypePB::LZ4F);
}
}
// Standard gtest entry point for the block compression test binary.
int main(int argc, char** argv) {
    ::testing::InitGoogleTest(&argc, argv);
    return RUN_ALL_TESTS();
}

View File

@ -61,8 +61,9 @@ enum CompressionTypePB {
NO_COMPRESSION = 2;
SNAPPY = 3;
LZ4 = 4;
ZLIB = 5;
ZSTB = 6;
LZ4F = 5;
ZLIB = 6;
ZSTD = 7;
}
message ZoneMapPB {
@ -117,7 +118,7 @@ message FileFooterPB {
optional uint64 data_footprint = 5; // total data footprint of all columns
optional uint64 raw_data_footprint = 6; // raw data footprint
optional CompressionTypePB compress_type = 7 [default = ZSTB]; // default compression type for file columns
optional CompressionTypePB compress_type = 7 [default = LZ4F]; // default compression type for file columns
repeated MetadataPairPB file_meta_datas = 8; // meta data of file
optional PagePointerPB key_index_page = 9; // short key index page
}
@ -147,7 +148,7 @@ message SegmentFooterPB {
optional uint64 data_footprint = 5; // total data footprint of all columns
optional uint64 raw_data_footprint = 6; // raw data footprint
optional CompressionTypePB compress_type = 7 [default = LZ4]; // default compression type for file columns
optional CompressionTypePB compress_type = 7 [default = LZ4F]; // default compression type for file columns
repeated MetadataPairPB file_meta_datas = 8; // meta data of file
// Short key index's page

View File

@ -154,6 +154,7 @@ ${DORIS_TEST_BINARY_DIR}/util/string_util_test
${DORIS_TEST_BINARY_DIR}/util/coding_test
${DORIS_TEST_BINARY_DIR}/util/faststring_test
${DORIS_TEST_BINARY_DIR}/util/tdigest_test
${DORIS_TEST_BINARY_DIR}/util/block_compression_test
# Running common Unittest
${DORIS_TEST_BINARY_DIR}/common/resource_tls_test