[fix](load) handle Status in beta rowset writer (#25293)
This commit is contained in:
@ -77,11 +77,12 @@ BetaRowsetWriter::~BetaRowsetWriter() {
|
||||
* when the job is cancelled. Although it is meaningless to continue segcompaction when the job
|
||||
* is cancelled, the objects involved in the job should be preserved during segcompaction to
|
||||
* avoid crashs for memory issues. */
|
||||
static_cast<void>(wait_flying_segcompaction());
|
||||
WARN_IF_ERROR(wait_flying_segcompaction(), "segment compaction failed");
|
||||
|
||||
// TODO(lingbin): Should wrapper exception logic, no need to know file ops directly.
|
||||
if (!_already_built) { // abnormal exit, remove all files generated
|
||||
static_cast<void>(_segment_creator.close()); // ensure all files are closed
|
||||
if (!_already_built) { // abnormal exit, remove all files generated
|
||||
WARN_IF_ERROR(_segment_creator.close(),
|
||||
"close segment creator failed"); // ensure all files are closed
|
||||
auto fs = _rowset_meta->fs();
|
||||
if (fs->type() != io::FileSystemType::LOCAL) { // Remote fs will delete them asynchronously
|
||||
return;
|
||||
@ -123,7 +124,7 @@ Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context)
|
||||
std::make_shared<vectorized::schema_util::LocalSchemaChangeRecorder>();
|
||||
_context.segment_collector = std::make_shared<SegmentCollectorT<BetaRowsetWriter>>(this);
|
||||
_context.file_writer_creator = std::make_shared<FileWriterCreatorT<BetaRowsetWriter>>(this);
|
||||
static_cast<void>(_segment_creator.init(_context));
|
||||
RETURN_IF_ERROR(_segment_creator.init(_context));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -413,7 +414,7 @@ Status BetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
|
||||
_total_index_size += rowset->rowset_meta()->index_disk_size();
|
||||
_num_segment += rowset->num_segments();
|
||||
// append key_bounds to current rowset
|
||||
static_cast<void>(rowset->get_segments_key_bounds(&_segments_encoded_key_bounds));
|
||||
RETURN_IF_ERROR(rowset->get_segments_key_bounds(&_segments_encoded_key_bounds));
|
||||
// TODO update zonemap
|
||||
if (rowset->rowset_meta()->has_delete_predicate()) {
|
||||
_rowset_meta->set_delete_predicate(rowset->rowset_meta()->delete_predicate());
|
||||
@ -515,7 +516,11 @@ RowsetSharedPtr BetaRowsetWriter::build() {
|
||||
}
|
||||
|
||||
if (_segcompaction_worker.get_file_writer()) {
|
||||
static_cast<void>(_segcompaction_worker.get_file_writer()->close());
|
||||
status = _segcompaction_worker.get_file_writer()->close();
|
||||
if (!status.ok()) {
|
||||
LOG(WARNING) << "close segment compaction worker failed" << status;
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
}
|
||||
status = _check_segment_number_limit();
|
||||
@ -679,7 +684,7 @@ Status BetaRowsetWriter::_create_segment_writer_for_segcompaction(
|
||||
_context.data_dir, _context.max_rows_per_segment,
|
||||
writer_options, _context.mow_context));
|
||||
if (_segcompaction_worker.get_file_writer() != nullptr) {
|
||||
static_cast<void>(_segcompaction_worker.get_file_writer()->close());
|
||||
RETURN_IF_ERROR(_segcompaction_worker.get_file_writer()->close());
|
||||
}
|
||||
_segcompaction_worker.get_file_writer().reset(file_writer.release());
|
||||
|
||||
|
||||
Reference in New Issue
Block a user