From cda8fb6b8b17a9361df97cd3d5cea46e4b80fe5e Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Tue, 17 Oct 2023 09:40:23 +0800 Subject: [PATCH] [fix](load) return Status when error in RowsetWriter::build (#25381) --- be/src/http/action/pad_rowset_action.cpp | 3 +- be/src/olap/compaction.cpp | 8 +-- be/src/olap/push_handler.cpp | 14 +++-- be/src/olap/rowset/beta_rowset_writer.cpp | 61 ++++++------------- be/src/olap/rowset/beta_rowset_writer.h | 2 +- be/src/olap/rowset/beta_rowset_writer_v2.h | 4 +- be/src/olap/rowset/rowset_writer.h | 6 +- be/src/olap/rowset_builder.cpp | 5 +- be/src/olap/schema_change.cpp | 6 +- be/src/olap/snapshot_manager.cpp | 7 +-- be/src/olap/tablet.cpp | 16 ++--- be/src/olap/txn_manager.cpp | 3 +- be/test/olap/ordered_data_compaction_test.cpp | 3 +- be/test/olap/rowid_conversion_test.cpp | 6 +- be/test/olap/segcompaction_test.cpp | 10 +-- be/test/vec/olap/vertical_compaction_test.cpp | 21 ++++--- 16 files changed, 80 insertions(+), 95 deletions(-) diff --git a/be/src/http/action/pad_rowset_action.cpp b/be/src/http/action/pad_rowset_action.cpp index 63c19dbc94..c14458438c 100644 --- a/be/src/http/action/pad_rowset_action.cpp +++ b/be/src/http/action/pad_rowset_action.cpp @@ -107,7 +107,8 @@ Status PadRowsetAction::_pad_rowset(TabletSharedPtr tablet, const Version& versi ctx.tablet_schema = tablet->tablet_schema(); ctx.newest_write_timestamp = UnixSeconds(); RETURN_IF_ERROR(tablet->create_rowset_writer(ctx, &writer)); - auto rowset = writer->build(); + RowsetSharedPtr rowset; + RETURN_IF_ERROR(writer->build(rowset)); rowset->make_visible(version); std::vector to_add {rowset}; diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 268c62847b..8fb2df2a49 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -366,11 +366,9 @@ Status Compaction::do_compaction_impl(int64_t permits) { COUNTER_UPDATE(_merged_rows_counter, stats.merged_rows); COUNTER_UPDATE(_filtered_rows_counter, stats.filtered_rows); - _output_rowset = _output_rs_writer->build(); - if (_output_rowset == nullptr) { - return Status::Error("rowset writer build failed. output_version: {}", - _output_version.to_string()); - } + RETURN_NOT_OK_STATUS_WITH_WARN(_output_rs_writer->build(_output_rowset), + fmt::format("rowset writer build failed. output_version: {}", + _output_version.to_string())); // Now we support delete in cumu compaction, to make all data in rowsets whose version // is below output_version to be delete in the future base compaction, we should carry // all delete predicate in the output rowset. diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index fc13b1692a..28583afc7e 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -306,7 +306,7 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur if (reader->eof()) { break; } - if (!(res = rowset_writer->add_block(&block))) { + if (!(res = rowset_writer->add_block(&block)).ok()) { LOG(WARNING) << "fail to attach block to rowset_writer. " << "res=" << res << ", tablet=" << cur_tablet->full_name() << ", read_rows=" << num_rows; @@ -320,13 +320,17 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur static_cast(reader->close()); } - if (!rowset_writer->flush().ok()) { + if (!res.ok()) { + break; + } + + if (!(res = rowset_writer->flush()).ok()) { LOG(WARNING) << "failed to finalize writer"; break; } - *cur_rowset = rowset_writer->build(); - if (*cur_rowset == nullptr) { - res = Status::Error("fail to build rowset"); + + if (!(res = rowset_writer->build(*cur_rowset)).ok()) { + LOG(WARNING) << "failed to build rowset"; break; } diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 6419378cc4..b8c578ff80 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -484,65 +484,42 @@ RowsetSharedPtr BetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& spec_r return rowset; } -RowsetSharedPtr BetaRowsetWriter::build() { +Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) { // TODO(lingbin): move to more better place, or in a CreateBlockBatch? for (auto& file_writer : _file_writers) { - Status status = file_writer->close(); - if (!status.ok()) { - LOG(WARNING) << "failed to close file writer, path=" << file_writer->path() - << " res=" << status; - return nullptr; - } - } - Status status; - status = _segment_creator.close(); - if (!status.ok()) { - LOG(WARNING) << "failed to close segment creator when build new rowset, res=" << status; - return nullptr; + RETURN_NOT_OK_STATUS_WITH_WARN( + file_writer->close(), + fmt::format("failed to close file writer, path={}", file_writer->path().string())); } + RETURN_NOT_OK_STATUS_WITH_WARN(_segment_creator.close(), + "failed to close segment creator when build new rowset") // if _segment_start_id is not zero, that means it's a transient rowset writer for // MoW partial update, don't need to do segment compaction. if (_segment_start_id == 0) { _segcompaction_worker.cancel(); - status = wait_flying_segcompaction(); - if (!status.ok()) { - LOG(WARNING) << "segcompaction failed when build new rowset, res=" << status; - return nullptr; - } - status = _segcompaction_rename_last_segments(); - if (!status.ok()) { - LOG(WARNING) << "rename last segments failed when build new rowset, res=" << status; - return nullptr; - } - + RETURN_NOT_OK_STATUS_WITH_WARN(wait_flying_segcompaction(), + "segcompaction failed when build new rowset"); + RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_rename_last_segments(), + "rename last segments failed when build new rowset"); if (_segcompaction_worker.get_file_writer()) { - status = _segcompaction_worker.get_file_writer()->close(); - if (!status.ok()) { - LOG(WARNING) << "close segment compaction worker failed" << status; - return nullptr; - } + RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_worker.get_file_writer()->close(), + "close segment compaction worker failed"); } } - status = _check_segment_number_limit(); - if (!status.ok()) { - LOG(WARNING) << "build rowset failed, res=" << status; - return nullptr; - } + RETURN_NOT_OK_STATUS_WITH_WARN(_check_segment_number_limit(), + "too many segments when build new rowset"); _build_rowset_meta(_rowset_meta); if (_rowset_meta->newest_write_timestamp() == -1) { _rowset_meta->set_newest_write_timestamp(UnixSeconds()); } - RowsetSharedPtr rowset; - status = RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir, _rowset_meta, - &rowset); - if (!status.ok()) { - LOG(WARNING) << "rowset init failed when build new rowset, res=" << status; - return nullptr; - } + RETURN_NOT_OK_STATUS_WITH_WARN( + RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir, _rowset_meta, + &rowset), + "rowset init failed when build new rowset"); _already_built = true; - return rowset; + return Status::OK(); } bool BetaRowsetWriter::_is_segment_overlapping( diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 38cb6d24f9..3691a66140 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -92,7 +92,7 @@ public: // This method is thread-safe. Status flush_single_block(const vectorized::Block* block) override; - RowsetSharedPtr build() override; + Status build(RowsetSharedPtr& rowset) override; RowsetSharedPtr manual_build(const RowsetMetaSharedPtr& rowset_meta) override; diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h b/be/src/olap/rowset/beta_rowset_writer_v2.h index a982272217..1279a564ff 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.h +++ b/be/src/olap/rowset/beta_rowset_writer_v2.h @@ -97,7 +97,9 @@ public: // This method is thread-safe. Status flush_single_block(const vectorized::Block* block) override; - RowsetSharedPtr build() override { return nullptr; }; + Status build(RowsetSharedPtr& rowset) override { + return Status::NotSupported("BetaRowsetWriterV2::build is not supported"); + }; RowsetSharedPtr manual_build(const RowsetMetaSharedPtr& rowset_meta) override { LOG(FATAL) << "not implemeted"; diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h index 21637a2379..d32c813233 100644 --- a/be/src/olap/rowset/rowset_writer.h +++ b/be/src/olap/rowset/rowset_writer.h @@ -118,9 +118,9 @@ public: return Status::NotSupported("RowsetWriter does not support add_segment"); } - // finish building and return pointer to the built rowset (guaranteed to be inited). - // return nullptr when failed - virtual RowsetSharedPtr build() = 0; + // finish building and set rowset pointer to the built rowset (guaranteed to be inited). + // rowset is invalid if returned Status is not OK + virtual Status build(RowsetSharedPtr& rowset) = 0; // For ordered rowset compaction, manual build rowset virtual RowsetSharedPtr manual_build(const RowsetMetaSharedPtr& rowset_meta) = 0; diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index e599d96a8f..0173643b8c 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -188,10 +188,7 @@ Status RowsetBuilder::build_rowset() { SCOPED_TIMER(_build_rowset_timer); // use rowset meta manager to save meta - _rowset = _rowset_writer->build(); - if (_rowset == nullptr) { - return Status::Error("fail to build rowset"); - } + RETURN_NOT_OK_STATUS_WITH_WARN(_rowset_writer->build(_rowset), "fail to build rowset"); return Status::OK(); } diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index abc17fb0bc..c688e45101 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -612,7 +612,7 @@ Status VSchemaChangeWithSorting::_internal_sorting( RETURN_IF_ERROR(merger.merge(blocks, rowset_writer.get(), &merged_rows)); _add_merged_rows(merged_rows); - *rowset = rowset_writer->build(); + RETURN_IF_ERROR(rowset_writer->build(*rowset)); return Status::OK(); } @@ -1135,8 +1135,8 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams // Add the new version of the data to the header // In order to prevent the occurrence of deadlock, we must first lock the old table, and then lock the new table std::lock_guard lock(sc_params.new_tablet->get_push_lock()); - RowsetSharedPtr new_rowset = rowset_writer->build(); - if (new_rowset == nullptr) { + RowsetSharedPtr new_rowset; + if (!(res = rowset_writer->build(new_rowset)).ok()) { LOG(WARNING) << "failed to build rowset, exit alter process"; return process_alter_exit(); } diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp index a2b7276918..c7eaf930a2 100644 --- a/be/src/olap/snapshot_manager.cpp +++ b/be/src/olap/snapshot_manager.cpp @@ -283,10 +283,9 @@ Status SnapshotManager::_rename_rowset_id(const RowsetMetaPB& rs_meta_pb, << " id = " << org_rowset->rowset_id() << " to rowset " << rowset_id; return res; } - RowsetSharedPtr new_rowset = rs_writer->build(); - if (new_rowset == nullptr) { - return Status::Error("failed to build rowset when rename rowset id"); - } + RowsetSharedPtr new_rowset; + RETURN_NOT_OK_STATUS_WITH_WARN(rs_writer->build(new_rowset), + "failed to build rowset when rename rowset id"); RETURN_IF_ERROR(new_rowset->load(false)); new_rowset->rowset_meta()->to_rowset_pb(new_rs_meta_pb); RETURN_IF_ERROR(org_rowset->remove()); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 5e55fd68d1..8d11e39731 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1958,21 +1958,23 @@ Status Tablet::create_initial_rowset(const int64_t req_version) { context.segments_overlap = OVERLAP_UNKNOWN; context.tablet_schema = tablet_schema(); context.newest_write_timestamp = UnixSeconds(); - res = create_rowset_writer(context, &rs_writer); - if (!res.ok()) { + if (!(res = create_rowset_writer(context, &rs_writer)).ok()) { LOG(WARNING) << "failed to init rowset writer for tablet " << full_name(); break; } - res = rs_writer->flush(); - if (!res.ok()) { + + if (!(res = rs_writer->flush()).ok()) { LOG(WARNING) << "failed to flush rowset writer for tablet " << full_name(); break; } - new_rowset = rs_writer->build(); - res = add_rowset(new_rowset); - if (!res.ok()) { + if (!(res = rs_writer->build(new_rowset)).ok()) { + LOG(WARNING) << "failed to build rowset for tablet " << full_name(); + break; + } + + if (!(res = add_rowset(new_rowset)).ok()) { LOG(WARNING) << "failed to add rowset for tablet " << full_name(); break; } diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index 407aea6e44..6d14e620d9 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -374,7 +374,8 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, if (rowset->tablet_schema()->is_partial_update()) { // build rowset writer and merge transient rowset RETURN_IF_ERROR(rowset_writer->flush()); - RowsetSharedPtr transient_rowset = rowset_writer->build(); + RowsetSharedPtr transient_rowset; + RETURN_IF_ERROR(rowset_writer->build(transient_rowset)); rowset->merge_rowset_meta(transient_rowset->rowset_meta()); // erase segment cache cause we will add a segment to rowset diff --git a/be/test/olap/ordered_data_compaction_test.cpp b/be/test/olap/ordered_data_compaction_test.cpp index bac4b63e20..ae33022cff 100644 --- a/be/test/olap/ordered_data_compaction_test.cpp +++ b/be/test/olap/ordered_data_compaction_test.cpp @@ -260,8 +260,7 @@ protected: } RowsetSharedPtr rowset; - rowset = rowset_writer->build(); - EXPECT_TRUE(rowset != nullptr); + EXPECT_EQ(Status::OK(), rowset_writer->build(rowset)); EXPECT_EQ(rowset_data.size(), rowset->rowset_meta()->num_segments()); EXPECT_EQ(num_rows, rowset->rowset_meta()->num_rows()); return rowset; diff --git a/be/test/olap/rowid_conversion_test.cpp b/be/test/olap/rowid_conversion_test.cpp index 20687d263c..221ae4c3a6 100644 --- a/be/test/olap/rowid_conversion_test.cpp +++ b/be/test/olap/rowid_conversion_test.cpp @@ -212,8 +212,7 @@ protected: } RowsetSharedPtr rowset; - rowset = rowset_writer->build(); - EXPECT_TRUE(rowset != nullptr); + EXPECT_EQ(Status::OK(), rowset_writer->build(rowset)); EXPECT_EQ(rowset_data.size(), rowset->rowset_meta()->num_segments()); EXPECT_EQ(num_rows, rowset->rowset_meta()->num_rows()); return rowset; @@ -357,7 +356,8 @@ protected: input_rs_readers, output_rs_writer.get(), &stats); } EXPECT_TRUE(s.ok()); - RowsetSharedPtr out_rowset = output_rs_writer->build(); + RowsetSharedPtr out_rowset; + EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); // create output rowset reader RowsetReaderContext reader_context; diff --git a/be/test/olap/segcompaction_test.cpp b/be/test/olap/segcompaction_test.cpp index ed0b51341b..826ed7da33 100644 --- a/be/test/olap/segcompaction_test.cpp +++ b/be/test/olap/segcompaction_test.cpp @@ -263,7 +263,7 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) { sleep(1); } - rowset = rowset_writer->build(); + EXPECT_EQ(Status::OK(), rowset_writer->build(rowset)); std::vector ls; ls.push_back("10047_0.dat"); ls.push_back("10047_1.dat"); @@ -459,7 +459,7 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_ooooOOoOooooooooO) { sleep(1); } - rowset = rowset_writer->build(); + EXPECT_EQ(Status::OK(), rowset_writer->build(rowset)); std::vector ls; // ooooOOoOooooooooO ls.push_back("10048_0.dat"); // oooo @@ -585,7 +585,7 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_OoOoO) { sleep(1); } - rowset = rowset_writer->build(); + EXPECT_EQ(Status::OK(), rowset_writer->build(rowset)); std::vector ls; ls.push_back("10049_0.dat"); // O ls.push_back("10049_1.dat"); // o @@ -764,7 +764,7 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) { EXPECT_EQ(Status::OK(), s); sleep(1); - rowset = rowset_writer->build(); + EXPECT_EQ(Status::OK(), rowset_writer->build(rowset)); std::vector ls; ls.push_back("10051_0.dat"); ls.push_back("10051_1.dat"); @@ -998,7 +998,7 @@ TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) { EXPECT_EQ(Status::OK(), s); sleep(1); - rowset = rowset_writer->build(); + EXPECT_EQ(Status::OK(), rowset_writer->build(rowset)); std::vector ls; ls.push_back("10052_0.dat"); ls.push_back("10052_1.dat"); diff --git a/be/test/vec/olap/vertical_compaction_test.cpp b/be/test/vec/olap/vertical_compaction_test.cpp index 619e3d44a0..536077f1dd 100644 --- a/be/test/vec/olap/vertical_compaction_test.cpp +++ b/be/test/vec/olap/vertical_compaction_test.cpp @@ -262,8 +262,7 @@ protected: } RowsetSharedPtr rowset; - rowset = rowset_writer->build(); - EXPECT_TRUE(rowset != nullptr); + EXPECT_EQ(Status::OK(), rowset_writer->build(rowset)); EXPECT_EQ(rowset_data.size(), rowset->rowset_meta()->num_segments()); EXPECT_EQ(num_rows, rowset->rowset_meta()->num_rows()); return rowset; @@ -490,7 +489,8 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMerge) { s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema, input_rs_readers, output_rs_writer.get(), 100, &stats); ASSERT_TRUE(s.ok()) << s; - RowsetSharedPtr out_rowset = output_rs_writer->build(); + RowsetSharedPtr out_rowset; + EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); ASSERT_TRUE(out_rowset); // create output rowset reader @@ -596,7 +596,8 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMerge) { s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema, input_rs_readers, output_rs_writer.get(), 100, &stats); ASSERT_TRUE(s.ok()) << s; - RowsetSharedPtr out_rowset = output_rs_writer->build(); + RowsetSharedPtr out_rowset; + EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); // create output rowset reader RowsetReaderContext reader_context; @@ -702,7 +703,8 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMerge) { s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema, input_rs_readers, output_rs_writer.get(), 10000, &stats); EXPECT_TRUE(s.ok()); - RowsetSharedPtr out_rowset = output_rs_writer->build(); + RowsetSharedPtr out_rowset; + EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); // create output rowset reader RowsetReaderContext reader_context; @@ -810,7 +812,8 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMergeWithDelete) { st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema, input_rs_readers, output_rs_writer.get(), 100, &stats); ASSERT_TRUE(st.ok()) << st; - RowsetSharedPtr out_rowset = output_rs_writer->build(); + RowsetSharedPtr out_rowset; + EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); ASSERT_TRUE(out_rowset); // create output rowset reader @@ -911,7 +914,8 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMergeWithDelete) { st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema, input_rs_readers, output_rs_writer.get(), 100, &stats); ASSERT_TRUE(st.ok()) << st; - RowsetSharedPtr out_rowset = output_rs_writer->build(); + RowsetSharedPtr out_rowset; + EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); ASSERT_TRUE(out_rowset); // create output rowset reader @@ -1002,7 +1006,8 @@ TEST_F(VerticalCompactionTest, TestAggKeyVerticalMerge) { s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema, input_rs_readers, output_rs_writer.get(), 100, &stats); EXPECT_TRUE(s.ok()); - RowsetSharedPtr out_rowset = output_rs_writer->build(); + RowsetSharedPtr out_rowset; + EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); // create output rowset reader RowsetReaderContext reader_context;