diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp index 6dcbad96b7..e6093612ba 100644 --- a/be/src/io/fs/buffered_reader.cpp +++ b/be/src/io/fs/buffered_reader.cpp @@ -22,6 +22,7 @@ #include #include +#include // IWYU pragma: no_include #include "common/compiler_util.h" // IWYU pragma: keep @@ -363,13 +364,23 @@ Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, siz return Status::OK(); } +// the condition variable would wait at most 10 seconds +// otherwise it would quit the procedure and treat it +// as one time out error status and would make the load +// task failed +constexpr static int WAIT_TIME_OUT_MS = 10000; + // there exists occasions where the buffer is already closed but // some prior tasks are still queued in thread pool, so we have to check whether // the buffer is closed each time the condition variable is notified. void PrefetchBuffer::reset_offset(size_t offset) { { std::unique_lock lck {_lock}; - _prefetched.wait(lck, [this]() { return _buffer_status != BufferStatus::PENDING; }); + if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS), + [this]() { return _buffer_status != BufferStatus::PENDING; })) { + _prefetch_status = Status::TimedOut("time out when reset prefetch buffer"); + return; + } if (UNLIKELY(_buffer_status == BufferStatus::CLOSED)) { _prefetched.notify_all(); return; @@ -393,9 +404,13 @@ void PrefetchBuffer::reset_offset(size_t offset) { void PrefetchBuffer::prefetch_buffer() { { std::unique_lock lck {_lock}; - _prefetched.wait(lck, [this]() { - return _buffer_status == BufferStatus::RESET || _buffer_status == BufferStatus::CLOSED; - }); + if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS), [this]() { + return _buffer_status == BufferStatus::RESET || + _buffer_status == BufferStatus::CLOSED; + })) { + _prefetch_status = Status::TimedOut("time out when invoking prefetch buffer"); + return; + } // in case buffer is already closed if (UNLIKELY(_buffer_status == BufferStatus::CLOSED)) { _prefetched.notify_all(); @@ -432,7 +447,11 @@ void PrefetchBuffer::prefetch_buffer() { _statis.prefetch_request_io += 1; _statis.prefetch_request_bytes += _len; std::unique_lock lck {_lock}; - _prefetched.wait(lck, [this]() { return _buffer_status == BufferStatus::PENDING; }); + if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS), + [this]() { return _buffer_status == BufferStatus::PENDING; })) { + _prefetch_status = Status::TimedOut("time out when invoking prefetch buffer"); + return; + } if (!s.ok() && _offset < _reader->size()) { _prefetch_status = std::move(s); } @@ -509,10 +528,13 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len, { std::unique_lock lck {_lock}; // buffer must be prefetched or it's closed - _prefetched.wait(lck, [this]() { - return _buffer_status == BufferStatus::PREFETCHED || - _buffer_status == BufferStatus::CLOSED; - }); + if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS), [this]() { + return _buffer_status == BufferStatus::PREFETCHED || + _buffer_status == BufferStatus::CLOSED; + })) { + _prefetch_status = Status::TimedOut("time out when read prefetch buffer"); + return _prefetch_status; + } if (UNLIKELY(BufferStatus::CLOSED == _buffer_status)) { return Status::OK(); } @@ -545,7 +567,11 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len, void PrefetchBuffer::close() { std::unique_lock lck {_lock}; // in case _reader still tries to write to the buf after we close the buffer - _prefetched.wait(lck, [this]() { return _buffer_status != BufferStatus::PENDING; }); + if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS), + [this]() { return _buffer_status != BufferStatus::PENDING; })) { + _prefetch_status = Status::TimedOut("time out when close prefetch buffer"); + return; + } _buffer_status = BufferStatus::CLOSED; _prefetched.notify_all(); if (_sync_profile != nullptr) {