// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "io/buffered_reader.h" #include #include #include "common/config.h" #include "olap/olap_define.h" #include "util/bit_util.h" namespace doris { // buffered reader BufferedReader::BufferedReader(RuntimeProfile* profile, FileReader* reader, int64_t buffer_size) : _profile(profile), _reader(reader), _buffer_size(buffer_size), _buffer_offset(0), _buffer_limit(0), _cur_offset(0) { if (_buffer_size == -1L) { _buffer_size = config::remote_storage_read_buffer_mb * 1024 * 1024; } _buffer = new char[_buffer_size]; // set the _cur_offset of this reader as same as the inner reader's, // to make sure the buffer reader will start to read at right position. _reader->tell(&_cur_offset); } BufferedReader::~BufferedReader() { close(); } Status BufferedReader::open() { if (!_reader) { return Status::InternalError("Open buffered reader failed, reader is null"); } // the macro ADD_XXX is idempotent. // So although each scanner calls the ADD_XXX method, they all use the same counters. _read_timer = ADD_TIMER(_profile, "FileReadTime"); _remote_read_timer = ADD_TIMER(_profile, "FileRemoteReadTime"); _read_counter = ADD_COUNTER(_profile, "FileReadCalls", TUnit::UNIT); _remote_read_counter = ADD_COUNTER(_profile, "FileRemoteReadCalls", TUnit::UNIT); _remote_read_bytes = ADD_COUNTER(_profile, "FileRemoteReadBytes", TUnit::BYTES); _remote_read_rate = _profile->add_derived_counter( "FileRemoteReadRate", TUnit::BYTES_PER_SECOND, std::bind(&RuntimeProfile::units_per_second, _remote_read_bytes, _remote_read_timer), ""); RETURN_IF_ERROR(_reader->open()); return Status::OK(); } //not support Status BufferedReader::read_one_message(std::unique_ptr* buf, int64_t* length) { return Status::NotSupported("Not support"); } Status BufferedReader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) { DCHECK_NE(buf_len, 0); RETURN_IF_ERROR(readat(_cur_offset, buf_len, bytes_read, buf)); if (*bytes_read == 0) { *eof = true; } else { *eof = false; } return Status::OK(); } Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) { SCOPED_TIMER(_read_timer); if (nbytes <= 0) { *bytes_read = 0; return Status::OK(); } RETURN_IF_ERROR(_read_once(position, nbytes, bytes_read, out)); //EOF if (*bytes_read <= 0) { return Status::OK(); } while (*bytes_read < nbytes) { int64_t len; RETURN_IF_ERROR(_read_once(position + *bytes_read, nbytes - *bytes_read, &len, reinterpret_cast(out) + *bytes_read)); // EOF if (len <= 0) { break; } *bytes_read += len; } return Status::OK(); } Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) { ++_read_count; // requested bytes missed the local buffer if (position >= _buffer_limit || position < _buffer_offset) { // if requested length is larger than the capacity of buffer, do not // need to copy the character into local buffer. if (nbytes > _buffer_size) { auto st = _reader->readat(position, nbytes, bytes_read, out); if (st.ok()) { _cur_offset = position + *bytes_read; ++_remote_read_count; _remote_bytes += *bytes_read; } return st; } _buffer_offset = position; RETURN_IF_ERROR(_fill()); if (position >= _buffer_limit) { *bytes_read = 0; return Status::OK(); } } int64_t len = std::min(_buffer_limit - position, nbytes); int64_t off = position - _buffer_offset; memcpy(out, _buffer + off, len); *bytes_read = len; _cur_offset = position + *bytes_read; return Status::OK(); } Status BufferedReader::_fill() { if (_buffer_offset >= 0) { int64_t bytes_read = 0; SCOPED_TIMER(_remote_read_timer); RETURN_IF_ERROR(_reader->readat(_buffer_offset, _buffer_size, &bytes_read, _buffer)); _buffer_limit = _buffer_offset + bytes_read; ++_remote_read_count; _remote_bytes += bytes_read; } return Status::OK(); } int64_t BufferedReader::size() { return _reader->size(); } Status BufferedReader::seek(int64_t position) { _cur_offset = position; return Status::OK(); } Status BufferedReader::tell(int64_t* position) { *position = _cur_offset; return Status::OK(); } void BufferedReader::close() { _reader->close(); SAFE_DELETE_ARRAY(_buffer); if (_read_counter != nullptr) { COUNTER_UPDATE(_read_counter, _read_count); } if (_remote_read_counter != nullptr) { COUNTER_UPDATE(_remote_read_counter, _remote_read_count); } if (_remote_read_bytes != nullptr) { COUNTER_UPDATE(_remote_read_bytes, _remote_bytes); } } bool BufferedReader::closed() { return _reader->closed(); } BufferedFileStreamReader::BufferedFileStreamReader(FileReader* file, int64_t offset, int64_t length) : _file(file), _file_start_offset(offset), _file_end_offset(offset + length) {} Status BufferedFileStreamReader::seek(int64_t position) { if (_file_position != position) { RETURN_IF_ERROR(_file->seek(position)); _file_position = position; } return Status::OK(); } Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, int64_t offset, int64_t* bytes_to_read) { if (offset < _file_start_offset) { return Status::IOError("Out-of-bounds Access"); } if (offset >= _file_end_offset) { *bytes_to_read = 0; return Status::OK(); } int64_t end_offset = offset + *bytes_to_read; if (_buf_start_offset <= offset && _buf_end_offset >= end_offset) { *buf = _buf.get() + offset - _buf_start_offset; return Status::OK(); } if (_buf_size < *bytes_to_read) { size_t new_size = BitUtil::next_power_of_two(*bytes_to_read); std::unique_ptr new_buf(new uint8_t[new_size]); if (offset >= _buf_start_offset && offset < _buf_end_offset) { memcpy(new_buf.get(), _buf.get() + offset - _buf_start_offset, _buf_end_offset - offset); } _buf = std::move(new_buf); _buf_size = new_size; } else if (offset > _buf_start_offset && offset < _buf_end_offset) { memmove(_buf.get(), _buf.get() + offset - _buf_start_offset, _buf_end_offset - offset); } if (offset < _buf_start_offset || offset >= _buf_end_offset) { _buf_end_offset = offset; } _buf_start_offset = offset; int64_t to_read = end_offset - _buf_end_offset; RETURN_IF_ERROR(seek(_buf_end_offset)); bool eof = false; int64_t buf_remaining = _buf_end_offset - _buf_start_offset; RETURN_IF_ERROR( _file->read(_buf.get() + buf_remaining, _buf_size - buf_remaining, &to_read, &eof)); *bytes_to_read = buf_remaining + to_read; _buf_end_offset += to_read; *buf = _buf.get(); return Status::OK(); } Status BufferedFileStreamReader::read_bytes(Slice& slice, int64_t offset) { int64_t bytes_to_read = slice.size; Status st = read_bytes((const uint8_t**)&slice.data, offset, &bytes_to_read); slice.size = bytes_to_read; return st; } } // namespace doris