// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "io/buffered_reader.h" #include #include #include "common/config.h" #include "olap/olap_define.h" #include "util/bit_util.h" namespace doris { // buffered reader BufferedReader::BufferedReader(RuntimeProfile* profile, FileReader* reader, int64_t buffer_size) : _profile(profile), _reader(reader), _buffer_size(buffer_size), _buffer_offset(0), _buffer_limit(0), _cur_offset(0) { if (_buffer_size == -1L) { _buffer_size = config::remote_storage_read_buffer_mb * 1024 * 1024; } _buffer = new char[_buffer_size]; // set the _cur_offset of this reader as same as the inner reader's, // to make sure the buffer reader will start to read at right position. _reader->tell(&_cur_offset); } BufferedReader::~BufferedReader() { close(); } Status BufferedReader::open() { if (!_reader) { return Status::InternalError("Open buffered reader failed, reader is null"); } // the macro ADD_XXX is idempotent. // So although each scanner calls the ADD_XXX method, they all use the same counters. _read_timer = ADD_TIMER(_profile, "FileReadTime"); _remote_read_timer = ADD_TIMER(_profile, "FileRemoteReadTime"); _read_counter = ADD_COUNTER(_profile, "FileReadCalls", TUnit::UNIT); _remote_read_counter = ADD_COUNTER(_profile, "FileRemoteReadCalls", TUnit::UNIT); _remote_read_bytes = ADD_COUNTER(_profile, "FileRemoteReadBytes", TUnit::BYTES); _remote_read_rate = _profile->add_derived_counter( "FileRemoteReadRate", TUnit::BYTES_PER_SECOND, std::bind(&RuntimeProfile::units_per_second, _remote_read_bytes, _remote_read_timer), ""); RETURN_IF_ERROR(_reader->open()); return Status::OK(); } //not support Status BufferedReader::read_one_message(std::unique_ptr* buf, int64_t* length) { return Status::NotSupported("Not support"); } Status BufferedReader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) { DCHECK_NE(buf_len, 0); RETURN_IF_ERROR(readat(_cur_offset, buf_len, bytes_read, buf)); if (*bytes_read == 0) { *eof = true; } else { *eof = false; } return Status::OK(); } Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) { SCOPED_TIMER(_read_timer); if (nbytes <= 0) { *bytes_read = 0; return Status::OK(); } RETURN_IF_ERROR(_read_once(position, nbytes, bytes_read, out)); //EOF if (*bytes_read <= 0) { return Status::OK(); } while (*bytes_read < nbytes) { int64_t len; RETURN_IF_ERROR(_read_once(position + *bytes_read, nbytes - *bytes_read, &len, reinterpret_cast(out) + *bytes_read)); // EOF if (len <= 0) { break; } *bytes_read += len; } return Status::OK(); } Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) { ++_read_count; // requested bytes missed the local buffer if (position >= _buffer_limit || position < _buffer_offset) { // if requested length is larger than the capacity of buffer, do not // need to copy the character into local buffer. if (nbytes > _buffer_size) { auto st = _reader->readat(position, nbytes, bytes_read, out); if (st.ok()) { _cur_offset = position + *bytes_read; ++_remote_read_count; _remote_bytes += *bytes_read; } return st; } _buffer_offset = position; RETURN_IF_ERROR(_fill()); if (position >= _buffer_limit) { *bytes_read = 0; return Status::OK(); } } int64_t len = std::min(_buffer_limit - position, nbytes); int64_t off = position - _buffer_offset; memcpy(out, _buffer + off, len); *bytes_read = len; _cur_offset = position + *bytes_read; return Status::OK(); } Status BufferedReader::_fill() { if (_buffer_offset >= 0) { int64_t bytes_read = 0; SCOPED_TIMER(_remote_read_timer); RETURN_IF_ERROR(_reader->readat(_buffer_offset, _buffer_size, &bytes_read, _buffer)); _buffer_limit = _buffer_offset + bytes_read; ++_remote_read_count; _remote_bytes += bytes_read; } return Status::OK(); } int64_t BufferedReader::size() { return _reader->size(); } Status BufferedReader::seek(int64_t position) { _cur_offset = position; return Status::OK(); } Status BufferedReader::tell(int64_t* position) { *position = _cur_offset; return Status::OK(); } void BufferedReader::close() { _reader->close(); SAFE_DELETE_ARRAY(_buffer); if (_read_counter != nullptr) { COUNTER_UPDATE(_read_counter, _read_count); } if (_remote_read_counter != nullptr) { COUNTER_UPDATE(_remote_read_counter, _remote_read_count); } if (_remote_read_bytes != nullptr) { COUNTER_UPDATE(_remote_read_bytes, _remote_bytes); } } bool BufferedReader::closed() { return _reader->closed(); } BufferedFileStreamReader::BufferedFileStreamReader(FileReader* file, uint64_t offset, uint64_t length, size_t max_buf_size) : _file(file), _file_start_offset(offset), _file_end_offset(offset + length), _max_buf_size(max_buf_size) {} Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset, const size_t bytes_to_read) { if (offset < _file_start_offset || offset >= _file_end_offset) { return Status::IOError("Out-of-bounds Access"); } int64_t end_offset = offset + bytes_to_read; if (_buf_start_offset <= offset && _buf_end_offset >= end_offset) { *buf = _buf.get() + offset - _buf_start_offset; return Status::OK(); } size_t buf_size = std::max(_max_buf_size, bytes_to_read); if (_buf_size < buf_size) { std::unique_ptr new_buf(new uint8_t[buf_size]); if (offset >= _buf_start_offset && offset < _buf_end_offset) { memcpy(new_buf.get(), _buf.get() + offset - _buf_start_offset, _buf_end_offset - offset); } _buf = std::move(new_buf); _buf_size = buf_size; } else if (offset > _buf_start_offset && offset < _buf_end_offset) { memmove(_buf.get(), _buf.get() + offset - _buf_start_offset, _buf_end_offset - offset); } if (offset < _buf_start_offset || offset >= _buf_end_offset) { _buf_end_offset = offset; } _buf_start_offset = offset; int64_t buf_remaining = _buf_end_offset - _buf_start_offset; int64_t to_read = std::min(_buf_size - buf_remaining, _file_end_offset - _buf_end_offset); int64_t has_read = 0; while (has_read < to_read) { int64_t loop_read = 0; RETURN_IF_ERROR(_file->readat(_buf_end_offset + has_read, to_read - has_read, &loop_read, _buf.get() + buf_remaining + has_read)); has_read += loop_read; } if (has_read != to_read) { return Status::Corruption("Try to read {} bytes, but received {} bytes", to_read, has_read); } _buf_end_offset += to_read; *buf = _buf.get(); return Status::OK(); } Status BufferedFileStreamReader::read_bytes(Slice& slice, uint64_t offset) { return read_bytes((const uint8_t**)&slice.data, offset, slice.size); } } // namespace doris