[enhance](BufferedReader) don't blocking wait on buffered reader's condition variable (#21153)
This commit is contained in:
@ -22,6 +22,7 @@
|
||||
#include <string.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
|
||||
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
|
||||
#include "common/compiler_util.h" // IWYU pragma: keep
|
||||
@ -363,13 +364,23 @@ Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, siz
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// the condition variable would wait at most 10 seconds
|
||||
// otherwise it would quit the procedure and treat it
|
||||
// as one time out error status and would make the load
|
||||
// task failed
|
||||
constexpr static int WAIT_TIME_OUT_MS = 10000;
|
||||
|
||||
// there exists occasions where the buffer is already closed but
|
||||
// some prior tasks are still queued in thread pool, so we have to check whether
|
||||
// the buffer is closed each time the condition variable is notified.
|
||||
void PrefetchBuffer::reset_offset(size_t offset) {
|
||||
{
|
||||
std::unique_lock lck {_lock};
|
||||
_prefetched.wait(lck, [this]() { return _buffer_status != BufferStatus::PENDING; });
|
||||
if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS),
|
||||
[this]() { return _buffer_status != BufferStatus::PENDING; })) {
|
||||
_prefetch_status = Status::TimedOut("time out when reset prefetch buffer");
|
||||
return;
|
||||
}
|
||||
if (UNLIKELY(_buffer_status == BufferStatus::CLOSED)) {
|
||||
_prefetched.notify_all();
|
||||
return;
|
||||
@ -393,9 +404,13 @@ void PrefetchBuffer::reset_offset(size_t offset) {
|
||||
void PrefetchBuffer::prefetch_buffer() {
|
||||
{
|
||||
std::unique_lock lck {_lock};
|
||||
_prefetched.wait(lck, [this]() {
|
||||
return _buffer_status == BufferStatus::RESET || _buffer_status == BufferStatus::CLOSED;
|
||||
});
|
||||
if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS), [this]() {
|
||||
return _buffer_status == BufferStatus::RESET ||
|
||||
_buffer_status == BufferStatus::CLOSED;
|
||||
})) {
|
||||
_prefetch_status = Status::TimedOut("time out when invoking prefetch buffer");
|
||||
return;
|
||||
}
|
||||
// in case buffer is already closed
|
||||
if (UNLIKELY(_buffer_status == BufferStatus::CLOSED)) {
|
||||
_prefetched.notify_all();
|
||||
@ -432,7 +447,11 @@ void PrefetchBuffer::prefetch_buffer() {
|
||||
_statis.prefetch_request_io += 1;
|
||||
_statis.prefetch_request_bytes += _len;
|
||||
std::unique_lock lck {_lock};
|
||||
_prefetched.wait(lck, [this]() { return _buffer_status == BufferStatus::PENDING; });
|
||||
if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS),
|
||||
[this]() { return _buffer_status == BufferStatus::PENDING; })) {
|
||||
_prefetch_status = Status::TimedOut("time out when invoking prefetch buffer");
|
||||
return;
|
||||
}
|
||||
if (!s.ok() && _offset < _reader->size()) {
|
||||
_prefetch_status = std::move(s);
|
||||
}
|
||||
@ -509,10 +528,13 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
|
||||
{
|
||||
std::unique_lock lck {_lock};
|
||||
// buffer must be prefetched or it's closed
|
||||
_prefetched.wait(lck, [this]() {
|
||||
return _buffer_status == BufferStatus::PREFETCHED ||
|
||||
_buffer_status == BufferStatus::CLOSED;
|
||||
});
|
||||
if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS), [this]() {
|
||||
return _buffer_status == BufferStatus::PREFETCHED ||
|
||||
_buffer_status == BufferStatus::CLOSED;
|
||||
})) {
|
||||
_prefetch_status = Status::TimedOut("time out when read prefetch buffer");
|
||||
return _prefetch_status;
|
||||
}
|
||||
if (UNLIKELY(BufferStatus::CLOSED == _buffer_status)) {
|
||||
return Status::OK();
|
||||
}
|
||||
@ -545,7 +567,11 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
|
||||
void PrefetchBuffer::close() {
|
||||
std::unique_lock lck {_lock};
|
||||
// in case _reader still tries to write to the buf after we close the buffer
|
||||
_prefetched.wait(lck, [this]() { return _buffer_status != BufferStatus::PENDING; });
|
||||
if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS),
|
||||
[this]() { return _buffer_status != BufferStatus::PENDING; })) {
|
||||
_prefetch_status = Status::TimedOut("time out when close prefetch buffer");
|
||||
return;
|
||||
}
|
||||
_buffer_status = BufferStatus::CLOSED;
|
||||
_prefetched.notify_all();
|
||||
if (_sync_profile != nullptr) {
|
||||
|
||||
Reference in New Issue
Block a user