[fix](be) Make DorisCallOnce's function exception-safe (#25579)
This commit is contained in:
@ -235,12 +235,26 @@ Status Segment::_load_pk_bloom_filter() {
|
||||
DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS);
|
||||
DCHECK(_pk_index_meta != nullptr);
|
||||
DCHECK(_pk_index_reader != nullptr);
|
||||
return _load_pk_bf_once.call([this] {
|
||||
RETURN_IF_ERROR(_pk_index_reader->parse_bf(_file_reader, *_pk_index_meta));
|
||||
_meta_mem_usage += _pk_index_reader->get_bf_memory_size();
|
||||
_segment_meta_mem_tracker->consume(_pk_index_reader->get_bf_memory_size());
|
||||
return Status::OK();
|
||||
});
|
||||
auto status = [this]() {
|
||||
return _load_pk_bf_once.call([this] {
|
||||
RETURN_IF_ERROR(_pk_index_reader->parse_bf(_file_reader, *_pk_index_meta));
|
||||
_meta_mem_usage += _pk_index_reader->get_bf_memory_size();
|
||||
_segment_meta_mem_tracker->consume(_pk_index_reader->get_bf_memory_size());
|
||||
return Status::OK();
|
||||
});
|
||||
}();
|
||||
if (!status.ok()) {
|
||||
remove_from_segment_cache();
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
void Segment::remove_from_segment_cache() const {
|
||||
if (config::disable_segment_cache) {
|
||||
return;
|
||||
}
|
||||
SegmentCache::CacheKey cache_key(_rowset_id, _segment_id);
|
||||
SegmentLoader::instance()->erase_segment(cache_key);
|
||||
}
|
||||
|
||||
Status Segment::load_pk_index_and_bf() {
|
||||
@ -249,6 +263,14 @@ Status Segment::load_pk_index_and_bf() {
|
||||
return Status::OK();
|
||||
}
|
||||
Status Segment::load_index() {
|
||||
auto status = [this]() { return _load_index_impl(); }();
|
||||
if (!status.ok()) {
|
||||
remove_from_segment_cache();
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
Status Segment::_load_index_impl() {
|
||||
return _load_index_once.call([this] {
|
||||
if (_tablet_schema->keys_type() == UNIQUE_KEYS && _pk_index_meta != nullptr) {
|
||||
_pk_index_reader.reset(new PrimaryKeyIndexReader());
|
||||
|
||||
@ -125,6 +125,8 @@ public:
|
||||
|
||||
int64_t meta_mem_usage() const { return _meta_mem_usage; }
|
||||
|
||||
void remove_from_segment_cache() const;
|
||||
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(Segment);
|
||||
Segment(uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema);
|
||||
@ -134,6 +136,8 @@ private:
|
||||
Status _create_column_readers(const SegmentFooterPB& footer);
|
||||
Status _load_pk_bloom_filter();
|
||||
|
||||
Status _load_index_impl();
|
||||
|
||||
private:
|
||||
friend class SegmentIterator;
|
||||
io::FileReaderSPtr _file_reader;
|
||||
|
||||
@ -211,6 +211,14 @@ SegmentIterator::SegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr sc
|
||||
_pool(new ObjectPool) {}
|
||||
|
||||
Status SegmentIterator::init(const StorageReadOptions& opts) {
|
||||
auto status = _init_impl(opts);
|
||||
if (!status.ok() && !config::disable_segment_cache) {
|
||||
_segment->remove_from_segment_cache();
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
Status SegmentIterator::_init_impl(const StorageReadOptions& opts) {
|
||||
// get file handle from file descriptor of segment
|
||||
if (_inited) {
|
||||
return Status::OK();
|
||||
@ -1776,8 +1784,11 @@ Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_colu
|
||||
}
|
||||
|
||||
Status SegmentIterator::next_batch(vectorized::Block* block) {
|
||||
RETURN_IF_CATCH_EXCEPTION({ return _next_batch_internal(block); });
|
||||
return Status::OK();
|
||||
auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return _next_batch_internal(block); }); }();
|
||||
if (!status.ok()) {
|
||||
_segment->remove_from_segment_cache();
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
|
||||
|
||||
@ -152,7 +152,7 @@ private:
|
||||
}
|
||||
|
||||
[[nodiscard]] Status _lazy_init();
|
||||
|
||||
[[nodiscard]] Status _init_impl(const StorageReadOptions& opts);
|
||||
[[nodiscard]] Status _init_return_column_iterators();
|
||||
[[nodiscard]] Status _init_bitmap_index_iterators();
|
||||
[[nodiscard]] Status _init_inverted_index_iterators();
|
||||
|
||||
@ -22,6 +22,7 @@
|
||||
|
||||
#include <atomic>
|
||||
|
||||
#include "common/exception.h"
|
||||
#include "olap/olap_common.h"
|
||||
#include "util/lock.h"
|
||||
|
||||
@ -60,7 +61,7 @@ public:
|
||||
std::lock_guard l(_mutex);
|
||||
if (_has_called.load(std::memory_order_acquire)) break;
|
||||
|
||||
_status = fn();
|
||||
_status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return fn(); }); }();
|
||||
_has_called.store(true, std::memory_order_release);
|
||||
|
||||
} while (false);
|
||||
|
||||
Reference in New Issue
Block a user