From 591d391bbc1e9ec0d7cc19be04bf5658243d38dd Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Sat, 22 May 2021 23:38:10 +0800 Subject: [PATCH] [Bug] Fix bug that the buffered reader may read at wrong position. (#5847) The buffered reader's _cur_offset should be initialized as same as the inner file reader's, to make sure that the reader will start to read at rignt position. --- be/src/exec/buffered_reader.cpp | 3 +++ be/src/exec/s3_reader.cpp | 7 +++++++ 2 files changed, 10 insertions(+) diff --git a/be/src/exec/buffered_reader.cpp b/be/src/exec/buffered_reader.cpp index 3b6b4285b6..a86044dbf5 100644 --- a/be/src/exec/buffered_reader.cpp +++ b/be/src/exec/buffered_reader.cpp @@ -32,6 +32,9 @@ BufferedReader::BufferedReader(FileReader* reader, int64_t buffer_size) _buffer_limit(0), _cur_offset(0) { _buffer = new char[_buffer_size]; + // set the _cur_offset of this reader as same as the inner reader's, + // to make sure the buffer reader will start to read at right position. + _reader->tell(&_cur_offset); } BufferedReader::~BufferedReader() { diff --git a/be/src/exec/s3_reader.cpp b/be/src/exec/s3_reader.cpp index b8a674b2bd..ddc71c69b1 100644 --- a/be/src/exec/s3_reader.cpp +++ b/be/src/exec/s3_reader.cpp @@ -68,6 +68,7 @@ Status S3Reader::open() { return Status::InternalError(out.str()); } } + Status S3Reader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) { DCHECK_NE(buf_len, 0); RETURN_IF_ERROR(readat(_cur_offset, buf_len, bytes_read, buf)); @@ -78,6 +79,7 @@ Status S3Reader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* } return Status::OK(); } + Status S3Reader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) { CHECK_S3_CLIENT(_client); if (position >= _file_size) { @@ -107,6 +109,7 @@ Status S3Reader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, v response.GetResult().GetBody().read((char*)out, *bytes_read); return Status::OK(); } + Status S3Reader::read_one_message(std::unique_ptr* buf, int64_t* length) { bool eof; int64_t file_size = size() - _cur_offset; @@ -123,17 +126,21 @@ Status S3Reader::read_one_message(std::unique_ptr* buf, int64_t* leng int64_t S3Reader::size() { return _file_size; } + Status S3Reader::seek(int64_t position) { _cur_offset = position; return Status::OK(); } + Status S3Reader::tell(int64_t* position) { *position = _cur_offset; return Status::OK(); } + void S3Reader::close() { _closed = true; } + bool S3Reader::closed() { return _closed; }