[fix](merge-clod) fix file not found when load for mow table (#32144)

This commit is contained in:
Xin Liao
2024-03-13 14:23:56 +08:00
committed by yiguolei
parent 2f89ec961f
commit dc687dc4cc
3 changed files with 23 additions and 4 deletions

View File

@ -807,6 +807,11 @@ Status BaseBetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStati
update_rowset_schema(flush_schema);
}
if (_context.mow_context != nullptr) {
// ensure that the segment file writing is complete
auto* file_writer = _segment_creator.get_file_writer(segment_id);
if (file_writer) {
RETURN_IF_ERROR(file_writer->close());
}
RETURN_IF_ERROR(_generate_delete_bitmap(segment_id));
}
return Status::OK();

View File

@ -151,7 +151,7 @@ Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block,
Status SegmentFlusher::close() {
std::lock_guard<SpinLock> l(_lock);
for (auto& file_writer : _file_writers) {
for (auto& [segment_id, file_writer] : _file_writers) {
Status status = file_writer->close();
if (!status.ok()) {
LOG(WARNING) << "failed to close file writer, path=" << file_writer->path()
@ -205,7 +205,7 @@ Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::Segmen
_context->max_rows_per_segment, writer_options, _context->mow_context));
{
std::lock_guard<SpinLock> l(_lock);
_file_writers.push_back(std::move(file_writer));
_file_writers.emplace(segment_id, std::move(file_writer));
}
auto s = writer->init();
if (!s.ok()) {
@ -236,7 +236,7 @@ Status SegmentFlusher::_create_segment_writer(
_context->max_rows_per_segment, writer_options, _context->mow_context));
{
std::lock_guard<SpinLock> l(_lock);
_file_writers.push_back(std::move(file_writer));
_file_writers.emplace(segment_id, std::move(file_writer));
}
auto s = writer->init();
if (!s.ok()) {
@ -341,6 +341,13 @@ Status SegmentFlusher::create_writer(std::unique_ptr<SegmentFlusher::Writer>& wr
return Status::OK();
}
io::FileWriter* SegmentFlusher::get_file_writer(int32_t segment_id) {
if (!_file_writers.contains(segment_id)) {
return nullptr;
}
return _file_writers[segment_id].get();
}
SegmentFlusher::Writer::Writer(SegmentFlusher* flusher,
std::unique_ptr<segment_v2::SegmentWriter>& segment_writer)
: _flusher(flusher), _writer(std::move(segment_writer)) {};

View File

@ -20,6 +20,7 @@
#include <gen_cpp/olap_file.pb.h>
#include <string>
#include <unordered_map>
#include <vector>
#include "common/status.h"
@ -102,6 +103,8 @@ public:
int64_t num_rows_filtered() const { return _num_rows_filtered; }
io::FileWriter* get_file_writer(int32_t segment_id);
Status close();
public:
@ -153,7 +156,7 @@ private:
RowsetWriterContext* _context;
mutable SpinLock _lock; // protect following vectors.
std::vector<io::FileWriterPtr> _file_writers;
std::unordered_map<int32_t, io::FileWriterPtr> _file_writers;
// written rows by add_block/add_row
std::atomic<int64_t> _num_rows_written = 0;
@ -196,6 +199,10 @@ public:
Status close();
io::FileWriter* get_file_writer(int32_t segment_id) {
return _segment_flusher.get_file_writer(segment_id);
}
private:
std::atomic<int32_t> _next_segment_id = 0;
SegmentFlusher _segment_flusher;