[bugfix](segmentload) should remove segment from segment cache if load segment failed (#41608) (#41660)
This commit is contained in:
@ -312,25 +312,12 @@ Status Segment::_load_pk_bloom_filter() {
|
||||
DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS);
|
||||
DCHECK(_pk_index_meta != nullptr);
|
||||
DCHECK(_pk_index_reader != nullptr);
|
||||
auto status = [this]() {
|
||||
return _load_pk_bf_once.call([this] {
|
||||
RETURN_IF_ERROR(_pk_index_reader->parse_bf(_file_reader, *_pk_index_meta));
|
||||
// _meta_mem_usage += _pk_index_reader->get_bf_memory_size();
|
||||
return Status::OK();
|
||||
});
|
||||
}();
|
||||
if (!status.ok()) {
|
||||
remove_from_segment_cache();
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
void Segment::remove_from_segment_cache() const {
|
||||
if (config::disable_segment_cache) {
|
||||
return;
|
||||
}
|
||||
SegmentCache::CacheKey cache_key(_rowset_id, _segment_id);
|
||||
SegmentLoader::instance()->erase_segment(cache_key);
|
||||
return _load_pk_bf_once.call([this] {
|
||||
RETURN_IF_ERROR(_pk_index_reader->parse_bf(_file_reader, *_pk_index_meta));
|
||||
// _meta_mem_usage += _pk_index_reader->get_bf_memory_size();
|
||||
return Status::OK();
|
||||
});
|
||||
}
|
||||
|
||||
Status Segment::load_pk_index_and_bf() {
|
||||
@ -340,14 +327,6 @@ Status Segment::load_pk_index_and_bf() {
|
||||
}
|
||||
|
||||
Status Segment::load_index() {
|
||||
auto status = [this]() { return _load_index_impl(); }();
|
||||
if (!status.ok()) {
|
||||
remove_from_segment_cache();
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
Status Segment::_load_index_impl() {
|
||||
return _load_index_once.call([this] {
|
||||
if (_tablet_schema->keys_type() == UNIQUE_KEYS && _pk_index_meta != nullptr) {
|
||||
_pk_index_reader = std::make_unique<PrimaryKeyIndexReader>();
|
||||
@ -381,6 +360,32 @@ Status Segment::_load_index_impl() {
|
||||
});
|
||||
}
|
||||
|
||||
Status Segment::healthy_status() {
|
||||
try {
|
||||
if (_load_index_once.has_called()) {
|
||||
RETURN_IF_ERROR(_load_index_once.stored_result());
|
||||
}
|
||||
if (_load_pk_bf_once.has_called()) {
|
||||
RETURN_IF_ERROR(_load_pk_bf_once.stored_result());
|
||||
}
|
||||
if (_create_column_readers_once_call.has_called()) {
|
||||
RETURN_IF_ERROR(_create_column_readers_once_call.stored_result());
|
||||
}
|
||||
if (_inverted_index_file_reader_open.has_called()) {
|
||||
RETURN_IF_ERROR(_inverted_index_file_reader_open.stored_result());
|
||||
}
|
||||
// This status is set by running time, for example, if there is something wrong during read segment iterator.
|
||||
return _healthy_status.status();
|
||||
} catch (const doris::Exception& e) {
|
||||
// If there is an exception during load_xxx, should not throw exception directly because
|
||||
// the caller may not exception safe.
|
||||
return e.to_status();
|
||||
} catch (const std::exception& e) {
|
||||
// The exception is not thrown by doris code.
|
||||
return Status::InternalError("Unexcepted error during load segment: {}", e.what());
|
||||
}
|
||||
}
|
||||
|
||||
// Return the storage datatype of related column to field.
|
||||
// Return nullptr meaning no such storage infomation for this column
|
||||
vectorized::DataTypePtr Segment::get_data_type_of(const ColumnIdentifier& identifier,
|
||||
|
||||
@ -135,6 +135,12 @@ public:
|
||||
|
||||
Status load_pk_index_and_bf();
|
||||
|
||||
void update_healthy_status(Status new_status) { _healthy_status.update(new_status); }
|
||||
// The segment is loaded into SegmentCache and then will load indices, if there are something wrong
|
||||
// during loading indices, should remove it from SegmentCache. If not, it will always report error during
|
||||
// query. So we add a healthy status API, the caller should check the healhty status before using the segment.
|
||||
Status healthy_status();
|
||||
|
||||
std::string min_key() {
|
||||
DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && _pk_index_meta != nullptr);
|
||||
return _pk_index_meta->min_key();
|
||||
@ -148,8 +154,6 @@ public:
|
||||
|
||||
int64_t meta_mem_usage() const { return _meta_mem_usage; }
|
||||
|
||||
void remove_from_segment_cache() const;
|
||||
|
||||
// Identify the column by unique id or path info
|
||||
struct ColumnIdentifier {
|
||||
int32_t unique_id = -1;
|
||||
@ -212,7 +216,6 @@ private:
|
||||
const SubcolumnColumnReaders::Node* root,
|
||||
vectorized::DataTypePtr target_type_hint);
|
||||
|
||||
Status _load_index_impl();
|
||||
Status _open_inverted_index();
|
||||
|
||||
Status _create_column_readers_once();
|
||||
@ -222,6 +225,7 @@ private:
|
||||
io::FileReaderSPtr _file_reader;
|
||||
uint32_t _segment_id;
|
||||
uint32_t _num_rows;
|
||||
AtomicStatus _healthy_status;
|
||||
|
||||
// 1. Tracking memory use by segment meta data such as footer or index page.
|
||||
// 2. Tracking memory use by segment column reader
|
||||
|
||||
@ -269,8 +269,8 @@ SegmentIterator::SegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr sc
|
||||
|
||||
Status SegmentIterator::init(const StorageReadOptions& opts) {
|
||||
auto status = _init_impl(opts);
|
||||
if (!status.ok() && !config::disable_segment_cache) {
|
||||
_segment->remove_from_segment_cache();
|
||||
if (!status.ok()) {
|
||||
_segment->update_healthy_status(status);
|
||||
}
|
||||
return status;
|
||||
}
|
||||
@ -1931,7 +1931,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {
|
||||
|
||||
// if rows read by batch is 0, will return end of file, we should not remove segment cache in this situation.
|
||||
if (!status.ok() && !status.is<END_OF_FILE>()) {
|
||||
_segment->remove_from_segment_cache();
|
||||
_segment->update_healthy_status(status);
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
@ -59,8 +59,14 @@ Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
|
||||
for (int64_t i = 0; i < rowset->num_segments(); i++) {
|
||||
SegmentCache::CacheKey cache_key(rowset->rowset_id(), i);
|
||||
if (_segment_cache->lookup(cache_key, cache_handle)) {
|
||||
continue;
|
||||
// Has to check the segment status here, because the segment in cache may has something wrong during
|
||||
// load index or create column reader.
|
||||
// Not merge this if logic with previous to make the logic more clear.
|
||||
if (cache_handle->pop_unhealthy_segment() == nullptr) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
// If the segment is not healthy, then will create a new segment and will replace the unhealthy one in SegmentCache.
|
||||
segment_v2::SegmentSharedPtr segment;
|
||||
RETURN_IF_ERROR(rowset->load_segment(i, &segment));
|
||||
if (need_load_pk_index_and_bf) {
|
||||
|
||||
@ -162,6 +162,18 @@ public:
|
||||
_init = true;
|
||||
}
|
||||
|
||||
segment_v2::SegmentSharedPtr pop_unhealthy_segment() {
|
||||
if (segments.empty()) {
|
||||
return nullptr;
|
||||
}
|
||||
auto& last_segment = segments.back();
|
||||
if (last_segment->healthy_status().ok()) {
|
||||
return nullptr;
|
||||
}
|
||||
segments.pop_back();
|
||||
return last_segment;
|
||||
}
|
||||
|
||||
private:
|
||||
std::vector<segment_v2::SegmentSharedPtr> segments;
|
||||
bool _init {false};
|
||||
|
||||
Reference in New Issue
Block a user