[branch-2.1](fix) fix snappy decompressor bug (#40862)
## Proposed changes Hadoop snappycodec source : https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc Example: OriginData(The original data will be divided into several large data block.) : large data block1 | large data block2 | large data block3 | .... The large data block will be divided into several small data block. Suppose a large data block is divided into three small blocks: large data block1: | small block1 | small block2 | small block3 | CompressData: <A [B1 compress(small block1) ] [B2 compress(small block1) ] [B3 compress(small block1)]> A : original length of the current block of large data block. sizeof(A) = 4 bytes. A = length(small block1) + length(small block2) + length(small block3) Bx : length of small data block bx. sizeof(Bx) = 4 bytes. Bx = length(compress(small blockx))
This commit is contained in:
@ -468,7 +468,7 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t
|
||||
}
|
||||
|
||||
std::size_t decompressed_large_block_len = 0;
|
||||
do {
|
||||
while (remaining_decompressed_large_block_len > 0) {
|
||||
// Check that input length should not be negative.
|
||||
if (input_len < sizeof(uint32_t)) {
|
||||
*more_input_bytes = sizeof(uint32_t) - input_len;
|
||||
@ -505,8 +505,7 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t
|
||||
output_ptr += decompressed_small_block_len;
|
||||
remaining_decompressed_large_block_len -= decompressed_small_block_len;
|
||||
decompressed_large_block_len += decompressed_small_block_len;
|
||||
|
||||
} while (remaining_decompressed_large_block_len > 0);
|
||||
};
|
||||
|
||||
if (*more_input_bytes != 0) {
|
||||
// Need more input buffer
|
||||
@ -535,90 +534,113 @@ Status SnappyBlockDecompressor::init() {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Hadoop snappycodec source :
|
||||
// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
|
||||
// Example:
|
||||
// OriginData(The original data will be divided into several large data block.) :
|
||||
// large data block1 | large data block2 | large data block3 | ....
|
||||
// The large data block will be divided into several small data block.
|
||||
// Suppose a large data block is divided into three small blocks:
|
||||
// large data block1: | small block1 | small block2 | small block3 |
|
||||
// CompressData: <A [B1 compress(small block1) ] [B2 compress(small block1) ] [B3 compress(small block1)]>
|
||||
//
|
||||
// A : original length of the current block of large data block.
|
||||
// sizeof(A) = 4 bytes.
|
||||
// A = length(small block1) + length(small block2) + length(small block3)
|
||||
// Bx : length of small data block bx.
|
||||
// sizeof(Bx) = 4 bytes.
|
||||
// Bx = length(compress(small blockx))
|
||||
Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len,
|
||||
size_t* input_bytes_read, uint8_t* output,
|
||||
size_t output_max_len, size_t* decompressed_len,
|
||||
bool* stream_end, size_t* more_input_bytes,
|
||||
size_t* more_output_bytes) {
|
||||
uint8_t* src = input;
|
||||
size_t remaining_input_size = input_len;
|
||||
int64_t uncompressed_total_len = 0;
|
||||
*input_bytes_read = 0;
|
||||
auto* input_ptr = input;
|
||||
auto* output_ptr = output;
|
||||
|
||||
// The hadoop snappy codec is as:
|
||||
// <4 byte big endian uncompressed size>
|
||||
// <4 byte big endian compressed size>
|
||||
// <snappy compressed block>
|
||||
// ....
|
||||
// <4 byte big endian uncompressed size>
|
||||
// <4 byte big endian compressed size>
|
||||
// <snappy compressed block>
|
||||
//
|
||||
// See:
|
||||
// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
|
||||
while (remaining_input_size > 0) {
|
||||
if (remaining_input_size < 4) {
|
||||
*more_input_bytes = 4 - remaining_input_size;
|
||||
break;
|
||||
while (input_len > 0) {
|
||||
//if faild , fall back to large block begin
|
||||
auto* large_block_input_ptr = input_ptr;
|
||||
auto* large_block_output_ptr = output_ptr;
|
||||
|
||||
if (input_len < sizeof(uint32_t)) {
|
||||
return Status::InvalidArgument(strings::Substitute(
|
||||
"fail to do hadoop-snappy decompress, input_len=$0", input_len));
|
||||
}
|
||||
// Read uncompressed size
|
||||
uint32_t uncompressed_block_len = Decompressor::_read_int32(src);
|
||||
int64_t remaining_output_len = output_max_len - uncompressed_total_len;
|
||||
if (remaining_output_len < uncompressed_block_len) {
|
||||
|
||||
uint32_t remaining_decompressed_large_block_len = BigEndian::Load32(input_ptr);
|
||||
|
||||
input_ptr += sizeof(uint32_t);
|
||||
input_len -= sizeof(uint32_t);
|
||||
|
||||
std::size_t remaining_output_len = output_max_len - *decompressed_len;
|
||||
|
||||
if (remaining_output_len < remaining_decompressed_large_block_len) {
|
||||
// Need more output buffer
|
||||
*more_output_bytes = uncompressed_block_len - remaining_output_len;
|
||||
*more_output_bytes = remaining_decompressed_large_block_len - remaining_output_len;
|
||||
input_ptr = large_block_input_ptr;
|
||||
output_ptr = large_block_output_ptr;
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
if (uncompressed_block_len == 0) {
|
||||
remaining_input_size -= sizeof(uint32_t);
|
||||
std::size_t decompressed_large_block_len = 0;
|
||||
while (remaining_decompressed_large_block_len > 0) {
|
||||
// Check that input length should not be negative.
|
||||
if (input_len < sizeof(uint32_t)) {
|
||||
*more_input_bytes = sizeof(uint32_t) - input_len;
|
||||
break;
|
||||
}
|
||||
|
||||
// Read the length of the next snappy compressed block.
|
||||
size_t compressed_small_block_len = BigEndian::Load32(input_ptr);
|
||||
|
||||
input_ptr += sizeof(uint32_t);
|
||||
input_len -= sizeof(uint32_t);
|
||||
|
||||
if (compressed_small_block_len == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (compressed_small_block_len > input_len) {
|
||||
// Need more input buffer
|
||||
*more_input_bytes = compressed_small_block_len - input_len;
|
||||
break;
|
||||
}
|
||||
|
||||
// Decompress this block.
|
||||
size_t decompressed_small_block_len;
|
||||
if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input_ptr),
|
||||
compressed_small_block_len,
|
||||
&decompressed_small_block_len)) {
|
||||
return Status::InternalError(
|
||||
"snappy block decompress failed to get uncompressed len");
|
||||
}
|
||||
if (!snappy::RawUncompress(reinterpret_cast<const char*>(input_ptr),
|
||||
compressed_small_block_len,
|
||||
reinterpret_cast<char*>(output_ptr))) {
|
||||
return Status::InternalError(
|
||||
"snappy block decompress failed. uncompressed_len: {}, compressed_len: {}",
|
||||
decompressed_small_block_len, compressed_small_block_len);
|
||||
}
|
||||
input_ptr += compressed_small_block_len;
|
||||
input_len -= compressed_small_block_len;
|
||||
|
||||
output_ptr += decompressed_small_block_len;
|
||||
remaining_decompressed_large_block_len -= decompressed_small_block_len;
|
||||
decompressed_large_block_len += decompressed_small_block_len;
|
||||
};
|
||||
|
||||
if (*more_input_bytes != 0) {
|
||||
// Need more input buffer
|
||||
input_ptr = large_block_input_ptr;
|
||||
output_ptr = large_block_output_ptr;
|
||||
break;
|
||||
}
|
||||
|
||||
if (remaining_input_size <= 2 * sizeof(uint32_t)) {
|
||||
// The remaining input size should be larger then <uncompressed size><compressed size><compressed data>
|
||||
// +1 means we need at least 1 bytes of compressed data.
|
||||
*more_input_bytes = 2 * sizeof(uint32_t) + 1 - remaining_input_size;
|
||||
break;
|
||||
}
|
||||
|
||||
// Read compressed size
|
||||
size_t tmp_remaining_size = remaining_input_size - 2 * sizeof(uint32_t);
|
||||
size_t compressed_len = _read_int32(src + sizeof(uint32_t));
|
||||
if (compressed_len > tmp_remaining_size) {
|
||||
// Need more input data
|
||||
*more_input_bytes = compressed_len - tmp_remaining_size;
|
||||
break;
|
||||
}
|
||||
|
||||
src += 2 * sizeof(uint32_t);
|
||||
remaining_input_size -= 2 * sizeof(uint32_t);
|
||||
|
||||
// ATTN: the uncompressed len from GetUncompressedLength() is same as
|
||||
// uncompressed_block_len, so I think it is unnecessary to get it again.
|
||||
// Get uncompressed len from snappy
|
||||
// size_t uncompressed_len;
|
||||
// if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(src),
|
||||
// compressed_len, &uncompressed_len)) {
|
||||
// return Status::InternalError("snappy block decompress failed to get uncompressed len");
|
||||
// }
|
||||
|
||||
// Decompress
|
||||
if (!snappy::RawUncompress(reinterpret_cast<const char*>(src), compressed_len,
|
||||
reinterpret_cast<char*>(output))) {
|
||||
return Status::InternalError(
|
||||
"snappy block decompress failed. uncompressed_len: {}, compressed_len: {}",
|
||||
uncompressed_block_len, compressed_len);
|
||||
}
|
||||
|
||||
output += uncompressed_block_len;
|
||||
src += compressed_len;
|
||||
remaining_input_size -= compressed_len;
|
||||
uncompressed_total_len += uncompressed_block_len;
|
||||
*decompressed_len += decompressed_large_block_len;
|
||||
}
|
||||
|
||||
*input_bytes_read += (input_len - remaining_input_size);
|
||||
*decompressed_len = uncompressed_total_len;
|
||||
*input_bytes_read += (input_ptr - input);
|
||||
// If no more input and output need, means this is the end of a compressed block
|
||||
*stream_end = (*more_input_bytes == 0 && *more_output_bytes == 0);
|
||||
|
||||
|
||||
Binary file not shown.
@ -123,28 +123,28 @@
|
||||
2023-09-18 7
|
||||
|
||||
-- !snappy_1 --
|
||||
1 694832 buHDwfGeNHfpRFdNaogneddi 2024-02-09 4.899588807225554
|
||||
10 218729 goZsLvvWFOIjlzSAitC 2024-06-10 4.137732740231178
|
||||
100 813423 zICskqgcdPc 2024-03-23 8.486529018746493
|
||||
1000 612650 RzOXeYpKOmuJOogUyeIEDNDmvq 2023-12-05 7.8741752707933435
|
||||
1001 29486 WoUAFJFuJNnwyqMnoDhX 2024-03-11 9.758244908785949
|
||||
1002 445363 OdTEeeWtxfcRwx 2024-08-01 0.3934945460194128
|
||||
1003 707035 JAYnKxusVpGzYueACf 2023-11-14 5.377110182643222
|
||||
1004 227858 JIFyjKzmbjkt 2024-03-24 5.748037621519263
|
||||
1005 539305 PlruLkSUSXZgaHafFriklrhCi 2023-11-08 4.122635188836725
|
||||
1006 145518 KCwqEcSCGuXrHerwn 2024-06-22 8.482290064407216
|
||||
1007 939028 KzXhEMelsKVLbDMsEKh 2024-01-01 8.144449761594585
|
||||
1008 913569 CHlqPKqkIdqwBCBUHreXbFAkCt 2024-05-25 1.5683842369495904
|
||||
1009 757881 AjcSyYMIMzS 2024-05-04 7.5674012939461255
|
||||
101 326164 QWLnalYNmYDt 2024-01-07 3.8159876011523854
|
||||
1010 427079 AlRUfmxfAuoLnPqUTvQVMtrS 2024-06-04 3.8087069699523313
|
||||
1011 252076 gHmFDhtytYzWETIxdpkpMUpnLd 2023-09-17 6.773606843056635
|
||||
1012 819615 rFfRHquexplDJvSeUK 2023-11-02 3.220639250504097
|
||||
1013 413456 uvNPelHXYjJKiOkwdNbmUkGzxiiqLo 2024-03-15 8.305048700108081
|
||||
1014 308042 vnzcsvHxnWFhvLwJkAtUqe 2024-06-15 1.5668867233009998
|
||||
1015 603837 VBEsRVGyhRNWQeKzDaBnJHmFDnXAOU 2024-08-17 3.8287482122289007
|
||||
1016 912679 eEjldPhxojSjTnE 2024-01-09 1.3717891874157961
|
||||
1017 630392 TcczYHXbwaCYzFSfXJlhsFjN 2023-10-07 4.733337480058437
|
||||
1 694832 buHDwfGeNHfpRFdNaogneddi 2024-02-09 4.8995886
|
||||
10 218729 goZsLvvWFOIjlzSAitC 2024-06-10 4.1377325
|
||||
100 813423 zICskqgcdPc 2024-03-23 8.4865294
|
||||
1000 612650 RzOXeYpKOmuJOogUyeIEDNDmvq 2023-12-05 7.8741751
|
||||
1001 29486 WoUAFJFuJNnwyqMnoDhX 2024-03-11 9.7582445
|
||||
1002 445363 OdTEeeWtxfcRwx 2024-08-01 0.39349455
|
||||
1003 707035 JAYnKxusVpGzYueACf 2023-11-14 5.37711
|
||||
1004 227858 JIFyjKzmbjkt 2024-03-24 5.7480378
|
||||
1005 539305 PlruLkSUSXZgaHafFriklrhCi 2023-11-08 4.1226354
|
||||
1006 145518 KCwqEcSCGuXrHerwn 2024-06-22 8.48229
|
||||
1007 939028 KzXhEMelsKVLbDMsEKh 2024-01-01 8.14445
|
||||
1008 913569 CHlqPKqkIdqwBCBUHreXbFAkCt 2024-05-25 1.5683843
|
||||
1009 757881 AjcSyYMIMzS 2024-05-04 7.5674014
|
||||
101 326164 QWLnalYNmYDt 2024-01-07 3.8159876
|
||||
1010 427079 AlRUfmxfAuoLnPqUTvQVMtrS 2024-06-04 3.808707
|
||||
1011 252076 gHmFDhtytYzWETIxdpkpMUpnLd 2023-09-17 6.7736068
|
||||
1012 819615 rFfRHquexplDJvSeUK 2023-11-02 3.2206392
|
||||
1013 413456 uvNPelHXYjJKiOkwdNbmUkGzxiiqLo 2024-03-15 8.3050489
|
||||
1014 308042 vnzcsvHxnWFhvLwJkAtUqe 2024-06-15 1.5668868
|
||||
1015 603837 VBEsRVGyhRNWQeKzDaBnJHmFDnXAOU 2024-08-17 3.8287482
|
||||
1016 912679 eEjldPhxojSjTnE 2024-01-09 1.3717892
|
||||
1017 630392 TcczYHXbwaCYzFSfXJlhsFjN 2023-10-07 4.7333374
|
||||
|
||||
-- !snappy_2 --
|
||||
|
||||
|
||||
Reference in New Issue
Block a user