[fix](load) return Status when error in RowsetWriter::build (#25381)

This commit is contained in:
Kaijie Chen
2023-10-17 09:40:23 +08:00
committed by GitHub
parent a194a15442
commit cda8fb6b8b
16 changed files with 80 additions and 95 deletions

View File

@ -107,7 +107,8 @@ Status PadRowsetAction::_pad_rowset(TabletSharedPtr tablet, const Version& versi
ctx.tablet_schema = tablet->tablet_schema();
ctx.newest_write_timestamp = UnixSeconds();
RETURN_IF_ERROR(tablet->create_rowset_writer(ctx, &writer));
auto rowset = writer->build();
RowsetSharedPtr rowset;
RETURN_IF_ERROR(writer->build(rowset));
rowset->make_visible(version);
std::vector<RowsetSharedPtr> to_add {rowset};

View File

@ -366,11 +366,9 @@ Status Compaction::do_compaction_impl(int64_t permits) {
COUNTER_UPDATE(_merged_rows_counter, stats.merged_rows);
COUNTER_UPDATE(_filtered_rows_counter, stats.filtered_rows);
_output_rowset = _output_rs_writer->build();
if (_output_rowset == nullptr) {
return Status::Error<ROWSET_BUILDER_INIT>("rowset writer build failed. output_version: {}",
_output_version.to_string());
}
RETURN_NOT_OK_STATUS_WITH_WARN(_output_rs_writer->build(_output_rowset),
fmt::format("rowset writer build failed. output_version: {}",
_output_version.to_string()));
// Now we support delete in cumu compaction, to make all data in rowsets whose version
// is below output_version to be delete in the future base compaction, we should carry
// all delete predicate in the output rowset.

View File

@ -306,7 +306,7 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur
if (reader->eof()) {
break;
}
if (!(res = rowset_writer->add_block(&block))) {
if (!(res = rowset_writer->add_block(&block)).ok()) {
LOG(WARNING) << "fail to attach block to rowset_writer. "
<< "res=" << res << ", tablet=" << cur_tablet->full_name()
<< ", read_rows=" << num_rows;
@ -320,13 +320,17 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur
static_cast<void>(reader->close());
}
if (!rowset_writer->flush().ok()) {
if (!res.ok()) {
break;
}
if (!(res = rowset_writer->flush()).ok()) {
LOG(WARNING) << "failed to finalize writer";
break;
}
*cur_rowset = rowset_writer->build();
if (*cur_rowset == nullptr) {
res = Status::Error<MEM_ALLOC_FAILED>("fail to build rowset");
if (!(res = rowset_writer->build(*cur_rowset)).ok()) {
LOG(WARNING) << "failed to build rowset";
break;
}

View File

@ -484,65 +484,42 @@ RowsetSharedPtr BetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& spec_r
return rowset;
}
RowsetSharedPtr BetaRowsetWriter::build() {
Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
// TODO(lingbin): move to more better place, or in a CreateBlockBatch?
for (auto& file_writer : _file_writers) {
Status status = file_writer->close();
if (!status.ok()) {
LOG(WARNING) << "failed to close file writer, path=" << file_writer->path()
<< " res=" << status;
return nullptr;
}
}
Status status;
status = _segment_creator.close();
if (!status.ok()) {
LOG(WARNING) << "failed to close segment creator when build new rowset, res=" << status;
return nullptr;
RETURN_NOT_OK_STATUS_WITH_WARN(
file_writer->close(),
fmt::format("failed to close file writer, path={}", file_writer->path().string()));
}
RETURN_NOT_OK_STATUS_WITH_WARN(_segment_creator.close(),
"failed to close segment creator when build new rowset")
// if _segment_start_id is not zero, that means it's a transient rowset writer for
// MoW partial update, don't need to do segment compaction.
if (_segment_start_id == 0) {
_segcompaction_worker.cancel();
status = wait_flying_segcompaction();
if (!status.ok()) {
LOG(WARNING) << "segcompaction failed when build new rowset, res=" << status;
return nullptr;
}
status = _segcompaction_rename_last_segments();
if (!status.ok()) {
LOG(WARNING) << "rename last segments failed when build new rowset, res=" << status;
return nullptr;
}
RETURN_NOT_OK_STATUS_WITH_WARN(wait_flying_segcompaction(),
"segcompaction failed when build new rowset");
RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_rename_last_segments(),
"rename last segments failed when build new rowset");
if (_segcompaction_worker.get_file_writer()) {
status = _segcompaction_worker.get_file_writer()->close();
if (!status.ok()) {
LOG(WARNING) << "close segment compaction worker failed" << status;
return nullptr;
}
RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_worker.get_file_writer()->close(),
"close segment compaction worker failed");
}
}
status = _check_segment_number_limit();
if (!status.ok()) {
LOG(WARNING) << "build rowset failed, res=" << status;
return nullptr;
}
RETURN_NOT_OK_STATUS_WITH_WARN(_check_segment_number_limit(),
"too many segments when build new rowset");
_build_rowset_meta(_rowset_meta);
if (_rowset_meta->newest_write_timestamp() == -1) {
_rowset_meta->set_newest_write_timestamp(UnixSeconds());
}
RowsetSharedPtr rowset;
status = RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir, _rowset_meta,
&rowset);
if (!status.ok()) {
LOG(WARNING) << "rowset init failed when build new rowset, res=" << status;
return nullptr;
}
RETURN_NOT_OK_STATUS_WITH_WARN(
RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir, _rowset_meta,
&rowset),
"rowset init failed when build new rowset");
_already_built = true;
return rowset;
return Status::OK();
}
bool BetaRowsetWriter::_is_segment_overlapping(

View File

@ -92,7 +92,7 @@ public:
// This method is thread-safe.
Status flush_single_block(const vectorized::Block* block) override;
RowsetSharedPtr build() override;
Status build(RowsetSharedPtr& rowset) override;
RowsetSharedPtr manual_build(const RowsetMetaSharedPtr& rowset_meta) override;

View File

@ -97,7 +97,9 @@ public:
// This method is thread-safe.
Status flush_single_block(const vectorized::Block* block) override;
RowsetSharedPtr build() override { return nullptr; };
Status build(RowsetSharedPtr& rowset) override {
return Status::NotSupported("BetaRowsetWriterV2::build is not supported");
};
RowsetSharedPtr manual_build(const RowsetMetaSharedPtr& rowset_meta) override {
LOG(FATAL) << "not implemeted";

View File

@ -118,9 +118,9 @@ public:
return Status::NotSupported("RowsetWriter does not support add_segment");
}
// finish building and return pointer to the built rowset (guaranteed to be inited).
// return nullptr when failed
virtual RowsetSharedPtr build() = 0;
// finish building and set rowset pointer to the built rowset (guaranteed to be inited).
// rowset is invalid if returned Status is not OK
virtual Status build(RowsetSharedPtr& rowset) = 0;
// For ordered rowset compaction, manual build rowset
virtual RowsetSharedPtr manual_build(const RowsetMetaSharedPtr& rowset_meta) = 0;

View File

@ -188,10 +188,7 @@ Status RowsetBuilder::build_rowset() {
SCOPED_TIMER(_build_rowset_timer);
// use rowset meta manager to save meta
_rowset = _rowset_writer->build();
if (_rowset == nullptr) {
return Status::Error<MEM_ALLOC_FAILED>("fail to build rowset");
}
RETURN_NOT_OK_STATUS_WITH_WARN(_rowset_writer->build(_rowset), "fail to build rowset");
return Status::OK();
}

View File

@ -612,7 +612,7 @@ Status VSchemaChangeWithSorting::_internal_sorting(
RETURN_IF_ERROR(merger.merge(blocks, rowset_writer.get(), &merged_rows));
_add_merged_rows(merged_rows);
*rowset = rowset_writer->build();
RETURN_IF_ERROR(rowset_writer->build(*rowset));
return Status::OK();
}
@ -1135,8 +1135,8 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
// Add the new version of the data to the header
// In order to prevent the occurrence of deadlock, we must first lock the old table, and then lock the new table
std::lock_guard<std::mutex> lock(sc_params.new_tablet->get_push_lock());
RowsetSharedPtr new_rowset = rowset_writer->build();
if (new_rowset == nullptr) {
RowsetSharedPtr new_rowset;
if (!(res = rowset_writer->build(new_rowset)).ok()) {
LOG(WARNING) << "failed to build rowset, exit alter process";
return process_alter_exit();
}

View File

@ -283,10 +283,9 @@ Status SnapshotManager::_rename_rowset_id(const RowsetMetaPB& rs_meta_pb,
<< " id = " << org_rowset->rowset_id() << " to rowset " << rowset_id;
return res;
}
RowsetSharedPtr new_rowset = rs_writer->build();
if (new_rowset == nullptr) {
return Status::Error<MEM_ALLOC_FAILED>("failed to build rowset when rename rowset id");
}
RowsetSharedPtr new_rowset;
RETURN_NOT_OK_STATUS_WITH_WARN(rs_writer->build(new_rowset),
"failed to build rowset when rename rowset id");
RETURN_IF_ERROR(new_rowset->load(false));
new_rowset->rowset_meta()->to_rowset_pb(new_rs_meta_pb);
RETURN_IF_ERROR(org_rowset->remove());

View File

@ -1958,21 +1958,23 @@ Status Tablet::create_initial_rowset(const int64_t req_version) {
context.segments_overlap = OVERLAP_UNKNOWN;
context.tablet_schema = tablet_schema();
context.newest_write_timestamp = UnixSeconds();
res = create_rowset_writer(context, &rs_writer);
if (!res.ok()) {
if (!(res = create_rowset_writer(context, &rs_writer)).ok()) {
LOG(WARNING) << "failed to init rowset writer for tablet " << full_name();
break;
}
res = rs_writer->flush();
if (!res.ok()) {
if (!(res = rs_writer->flush()).ok()) {
LOG(WARNING) << "failed to flush rowset writer for tablet " << full_name();
break;
}
new_rowset = rs_writer->build();
res = add_rowset(new_rowset);
if (!res.ok()) {
if (!(res = rs_writer->build(new_rowset)).ok()) {
LOG(WARNING) << "failed to build rowset for tablet " << full_name();
break;
}
if (!(res = add_rowset(new_rowset)).ok()) {
LOG(WARNING) << "failed to add rowset for tablet " << full_name();
break;
}

View File

@ -374,7 +374,8 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
if (rowset->tablet_schema()->is_partial_update()) {
// build rowset writer and merge transient rowset
RETURN_IF_ERROR(rowset_writer->flush());
RowsetSharedPtr transient_rowset = rowset_writer->build();
RowsetSharedPtr transient_rowset;
RETURN_IF_ERROR(rowset_writer->build(transient_rowset));
rowset->merge_rowset_meta(transient_rowset->rowset_meta());
// erase segment cache cause we will add a segment to rowset

View File

@ -260,8 +260,7 @@ protected:
}
RowsetSharedPtr rowset;
rowset = rowset_writer->build();
EXPECT_TRUE(rowset != nullptr);
EXPECT_EQ(Status::OK(), rowset_writer->build(rowset));
EXPECT_EQ(rowset_data.size(), rowset->rowset_meta()->num_segments());
EXPECT_EQ(num_rows, rowset->rowset_meta()->num_rows());
return rowset;

View File

@ -212,8 +212,7 @@ protected:
}
RowsetSharedPtr rowset;
rowset = rowset_writer->build();
EXPECT_TRUE(rowset != nullptr);
EXPECT_EQ(Status::OK(), rowset_writer->build(rowset));
EXPECT_EQ(rowset_data.size(), rowset->rowset_meta()->num_segments());
EXPECT_EQ(num_rows, rowset->rowset_meta()->num_rows());
return rowset;
@ -357,7 +356,8 @@ protected:
input_rs_readers, output_rs_writer.get(), &stats);
}
EXPECT_TRUE(s.ok());
RowsetSharedPtr out_rowset = output_rs_writer->build();
RowsetSharedPtr out_rowset;
EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
// create output rowset reader
RowsetReaderContext reader_context;

View File

@ -263,7 +263,7 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) {
sleep(1);
}
rowset = rowset_writer->build();
EXPECT_EQ(Status::OK(), rowset_writer->build(rowset));
std::vector<std::string> ls;
ls.push_back("10047_0.dat");
ls.push_back("10047_1.dat");
@ -459,7 +459,7 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_ooooOOoOooooooooO) {
sleep(1);
}
rowset = rowset_writer->build();
EXPECT_EQ(Status::OK(), rowset_writer->build(rowset));
std::vector<std::string> ls;
// ooooOOoOooooooooO
ls.push_back("10048_0.dat"); // oooo
@ -585,7 +585,7 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_OoOoO) {
sleep(1);
}
rowset = rowset_writer->build();
EXPECT_EQ(Status::OK(), rowset_writer->build(rowset));
std::vector<std::string> ls;
ls.push_back("10049_0.dat"); // O
ls.push_back("10049_1.dat"); // o
@ -764,7 +764,7 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) {
EXPECT_EQ(Status::OK(), s);
sleep(1);
rowset = rowset_writer->build();
EXPECT_EQ(Status::OK(), rowset_writer->build(rowset));
std::vector<std::string> ls;
ls.push_back("10051_0.dat");
ls.push_back("10051_1.dat");
@ -998,7 +998,7 @@ TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) {
EXPECT_EQ(Status::OK(), s);
sleep(1);
rowset = rowset_writer->build();
EXPECT_EQ(Status::OK(), rowset_writer->build(rowset));
std::vector<std::string> ls;
ls.push_back("10052_0.dat");
ls.push_back("10052_1.dat");

View File

@ -262,8 +262,7 @@ protected:
}
RowsetSharedPtr rowset;
rowset = rowset_writer->build();
EXPECT_TRUE(rowset != nullptr);
EXPECT_EQ(Status::OK(), rowset_writer->build(rowset));
EXPECT_EQ(rowset_data.size(), rowset->rowset_meta()->num_segments());
EXPECT_EQ(num_rows, rowset->rowset_meta()->num_rows());
return rowset;
@ -490,7 +489,8 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMerge) {
s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema,
input_rs_readers, output_rs_writer.get(), 100, &stats);
ASSERT_TRUE(s.ok()) << s;
RowsetSharedPtr out_rowset = output_rs_writer->build();
RowsetSharedPtr out_rowset;
EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
ASSERT_TRUE(out_rowset);
// create output rowset reader
@ -596,7 +596,8 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMerge) {
s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema,
input_rs_readers, output_rs_writer.get(), 100, &stats);
ASSERT_TRUE(s.ok()) << s;
RowsetSharedPtr out_rowset = output_rs_writer->build();
RowsetSharedPtr out_rowset;
EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
// create output rowset reader
RowsetReaderContext reader_context;
@ -702,7 +703,8 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMerge) {
s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema,
input_rs_readers, output_rs_writer.get(), 10000, &stats);
EXPECT_TRUE(s.ok());
RowsetSharedPtr out_rowset = output_rs_writer->build();
RowsetSharedPtr out_rowset;
EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
// create output rowset reader
RowsetReaderContext reader_context;
@ -810,7 +812,8 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMergeWithDelete) {
st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema,
input_rs_readers, output_rs_writer.get(), 100, &stats);
ASSERT_TRUE(st.ok()) << st;
RowsetSharedPtr out_rowset = output_rs_writer->build();
RowsetSharedPtr out_rowset;
EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
ASSERT_TRUE(out_rowset);
// create output rowset reader
@ -911,7 +914,8 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMergeWithDelete) {
st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema,
input_rs_readers, output_rs_writer.get(), 100, &stats);
ASSERT_TRUE(st.ok()) << st;
RowsetSharedPtr out_rowset = output_rs_writer->build();
RowsetSharedPtr out_rowset;
EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
ASSERT_TRUE(out_rowset);
// create output rowset reader
@ -1002,7 +1006,8 @@ TEST_F(VerticalCompactionTest, TestAggKeyVerticalMerge) {
s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema,
input_rs_readers, output_rs_writer.get(), 100, &stats);
EXPECT_TRUE(s.ok());
RowsetSharedPtr out_rowset = output_rs_writer->build();
RowsetSharedPtr out_rowset;
EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
// create output rowset reader
RowsetReaderContext reader_context;