From 4e86f9bab5fa561c6922d58343abe28578a72b52 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Sat, 9 Dec 2023 16:21:36 +0800 Subject: [PATCH] [improve](move-memtable) include and check offset when append data (#28159) --- be/src/io/fs/stream_sink_file_writer.cpp | 10 +-- be/src/runtime/load_stream.cpp | 2 +- be/src/runtime/load_stream_writer.cpp | 71 ++++++++++++++----- be/src/runtime/load_stream_writer.h | 2 +- be/src/vec/sink/load_stream_stub.cpp | 3 +- be/src/vec/sink/load_stream_stub.h | 3 +- .../io/fs/stream_sink_file_writer_test.cpp | 4 +- be/test/runtime/load_stream_test.cpp | 47 ++++++------ gensrc/proto/internal_service.proto | 1 + 9 files changed, 94 insertions(+), 49 deletions(-) diff --git a/be/src/io/fs/stream_sink_file_writer.cpp b/be/src/io/fs/stream_sink_file_writer.cpp index 9b2125c944..484be9e07e 100644 --- a/be/src/io/fs/stream_sink_file_writer.cpp +++ b/be/src/io/fs/stream_sink_file_writer.cpp @@ -44,7 +44,6 @@ Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) { for (int i = 0; i < data_cnt; i++) { bytes_req += data[i].get_size(); } - _bytes_appended += bytes_req; VLOG_DEBUG << "writer appendv, load_id: " << print_id(_load_id) << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id @@ -52,9 +51,10 @@ Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) { std::span slices {data, data_cnt}; for (auto& stream : _streams) { - RETURN_IF_ERROR( - stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, slices)); + RETURN_IF_ERROR(stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, + _bytes_appended, slices)); } + _bytes_appended += bytes_req; return Status::OK(); } @@ -63,8 +63,8 @@ Status StreamSinkFileWriter::finalize() { << ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id; // TODO(zhengyu): update get_inverted_index_file_size into stat for (auto& stream : _streams) { - RETURN_IF_ERROR( - stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, {}, true)); + RETURN_IF_ERROR(stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, + _bytes_appended, {}, true)); } return Status::OK(); } diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 80d82956a0..9d05d48f54 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -127,7 +127,7 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data DCHECK(new_segid != std::numeric_limits::max()); butil::IOBuf buf = data->movable(); auto flush_func = [this, new_segid, eos, buf, header]() { - auto st = _load_stream_writer->append_data(new_segid, buf); + auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf); if (eos && st.ok()) { st = _load_stream_writer->close_segment(new_segid); } diff --git a/be/src/runtime/load_stream_writer.cpp b/be/src/runtime/load_stream_writer.cpp index 427ace47d0..0a339e854a 100644 --- a/be/src/runtime/load_stream_writer.cpp +++ b/be/src/runtime/load_stream_writer.cpp @@ -83,14 +83,14 @@ Status LoadStreamWriter::init() { return Status::OK(); } -Status LoadStreamWriter::append_data(uint32_t segid, butil::IOBuf buf) { +Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, butil::IOBuf buf) { io::FileWriter* file_writer = nullptr; { std::lock_guard lock_guard(_lock); if (!_is_init) { RETURN_IF_ERROR(init()); } - if (segid + 1 > _segment_file_writers.size()) { + if (segid >= _segment_file_writers.size()) { for (size_t i = _segment_file_writers.size(); i <= segid; i++) { Status st; io::FileWriterPtr file_writer; @@ -107,32 +107,70 @@ Status LoadStreamWriter::append_data(uint32_t segid, butil::IOBuf buf) { file_writer = _segment_file_writers[segid].get(); } VLOG_DEBUG << " file_writer " << file_writer << "seg id " << segid; + if (file_writer == nullptr) { + return Status::Corruption("append_data failed, file writer {} is destoryed", segid); + } + if (file_writer->bytes_appended() != offset) { + return Status::Corruption( + "append_data out-of-order in segment={}, expected offset={}, actual={}", + file_writer->path().native(), offset, file_writer->bytes_appended()); + } return file_writer->append(buf.to_string()); } Status LoadStreamWriter::close_segment(uint32_t segid) { - auto st = _segment_file_writers[segid]->close(); + io::FileWriter* file_writer = nullptr; + { + std::lock_guard lock_guard(_lock); + if (!_is_init) { + return Status::Corruption("close_segment failed, LoadStreamWriter is not inited"); + } + if (segid >= _segment_file_writers.size()) { + return Status::Corruption("close_segment failed, segment {} is never opened", segid); + } + file_writer = _segment_file_writers[segid].get(); + } + if (file_writer == nullptr) { + return Status::Corruption("close_segment failed, file writer {} is destoryed", segid); + } + auto st = file_writer->close(); if (!st.ok()) { _is_canceled = true; return st; } - if (_segment_file_writers[segid]->bytes_appended() == 0) { - return Status::Corruption("segment {} is zero bytes", segid); + LOG(INFO) << "segment " << segid << " path " << file_writer->path().native() + << "closed, written " << file_writer->bytes_appended() << " bytes"; + if (file_writer->bytes_appended() == 0) { + return Status::Corruption("segment {} closed with 0 bytes", file_writer->path().native()); } - LOG(INFO) << "segid " << segid << "path " << _segment_file_writers[segid]->path() << " written " - << _segment_file_writers[segid]->bytes_appended() << " bytes"; return Status::OK(); } Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& stat, TabletSchemaSPtr flush_schema) { - if (_segment_file_writers[segid]->bytes_appended() != stat.data_size) { - LOG(WARNING) << _segment_file_writers[segid]->path() << " is incomplete, actual size: " - << _segment_file_writers[segid]->bytes_appended() - << ", expected size: " << stat.data_size; - return Status::Corruption("segment {} is incomplete, actual size: {}, expected size: {}", - _segment_file_writers[segid]->path().native(), - _segment_file_writers[segid]->bytes_appended(), stat.data_size); + io::FileWriter* file_writer = nullptr; + { + std::lock_guard lock_guard(_lock); + if (!_is_init) { + return Status::Corruption("add_segment failed, LoadStreamWriter is not inited"); + } + if (segid >= _segment_file_writers.size()) { + return Status::Corruption("add_segment failed, segment {} is never opened", segid); + } + file_writer = _segment_file_writers[segid].get(); + } + if (file_writer == nullptr) { + return Status::Corruption("add_segment failed, file writer {} is destoryed", segid); + } + if (!file_writer->is_closed()) { + return Status::Corruption("add_segment failed, segment {} is not closed", + file_writer->path().native()); + } + if (file_writer->bytes_appended() != stat.data_size) { + return Status::Corruption( + "add_segment failed, segment stat {} does not match, file size={}, " + "stat.data_size={}", + file_writer->path().native(), file_writer->bytes_appended(), stat.data_size); } return _rowset_writer->add_segment(segid, stat, flush_schema); } @@ -152,12 +190,13 @@ Status LoadStreamWriter::close() { << "rowset builder is supposed be to initialized before close_wait() being called"; if (_is_canceled) { - return Status::Error("flush segment failed"); + return Status::InternalError("flush segment failed"); } for (const auto& writer : _segment_file_writers) { if (!writer->is_closed()) { - return Status::Corruption("segment {} is not closed", writer->path().native()); + return Status::Corruption("LoadStreamWriter close failed, segment {} is not closed", + writer->path().native()); } } diff --git a/be/src/runtime/load_stream_writer.h b/be/src/runtime/load_stream_writer.h index e038ceeb89..ab6530bf60 100644 --- a/be/src/runtime/load_stream_writer.h +++ b/be/src/runtime/load_stream_writer.h @@ -61,7 +61,7 @@ public: Status init(); - Status append_data(uint32_t segid, butil::IOBuf buf); + Status append_data(uint32_t segid, uint64_t offset, butil::IOBuf buf); Status close_segment(uint32_t segid); diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index dd7a4a8cec..535e941b3b 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -185,7 +185,7 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, // APPEND_DATA Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id, - int64_t segment_id, std::span data, + int64_t segment_id, uint64_t offset, std::span data, bool segment_eos) { PStreamHeader header; header.set_src_id(_src_id); @@ -195,6 +195,7 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64 header.set_tablet_id(tablet_id); header.set_segment_id(segment_id); header.set_segment_eos(segment_eos); + header.set_offset(offset); header.set_opcode(doris::PStreamHeader::APPEND_DATA); return _encode_and_send(header, data); } diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 786de57d75..edbbbda1e6 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -162,7 +162,8 @@ public: // APPEND_DATA Status append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id, - int64_t segment_id, std::span data, bool segment_eos = false); + int64_t segment_id, uint64_t offset, std::span data, + bool segment_eos = false); // ADD_SEGMENT Status add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id, diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp b/be/test/io/fs/stream_sink_file_writer_test.cpp index c52b59f01e..7e5bdd350f 100644 --- a/be/test/io/fs/stream_sink_file_writer_test.cpp +++ b/be/test/io/fs/stream_sink_file_writer_test.cpp @@ -57,7 +57,7 @@ class StreamSinkFileWriterTest : public testing::Test { // APPEND_DATA virtual Status append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id, - int64_t segment_id, std::span data, + int64_t segment_id, uint64_t offset, std::span data, bool segment_eos = false) override { EXPECT_EQ(PARTITION_ID, partition_id); EXPECT_EQ(INDEX_ID, index_id); @@ -65,10 +65,12 @@ class StreamSinkFileWriterTest : public testing::Test { EXPECT_EQ(SEGMENT_ID, segment_id); if (segment_eos) { EXPECT_EQ(0, data.size()); + EXPECT_EQ(DATA0.length() + DATA1.length(), offset); } else { EXPECT_EQ(2, data.size()); EXPECT_EQ(DATA0, data[0].to_string()); EXPECT_EQ(DATA1, data[1].to_string()); + EXPECT_EQ(0, offset); } g_num_request++; return Status::OK(); diff --git a/be/test/runtime/load_stream_test.cpp b/be/test/runtime/load_stream_test.cpp index 247f9c6b6b..b1ad082617 100644 --- a/be/test/runtime/load_stream_test.cpp +++ b/be/test/runtime/load_stream_test.cpp @@ -513,8 +513,8 @@ public: } void write_one_tablet(MockSinkClient& client, UniqueId load_id, uint32_t sender_id, - int64_t index_id, int64_t tablet_id, uint32_t segid, std::string& data, - bool segment_eos) { + int64_t index_id, int64_t tablet_id, uint32_t segid, uint64_t offset, + std::string& data, bool segment_eos) { // append data butil::IOBuf append_buf; PStreamHeader header; @@ -527,6 +527,7 @@ public: header.set_segment_eos(segment_eos); header.set_src_id(sender_id); header.set_partition_id(NORMAL_PARTITION_ID); + header.set_offset(offset); size_t hdr_len = header.ByteSizeLong(); append_buf.append((char*)&hdr_len, sizeof(size_t)); append_buf.append(header.SerializeAsString()); @@ -539,27 +540,27 @@ public: void write_normal(MockSinkClient& client) { write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, - NORMAL_TABLET_ID, 0, NORMAL_STRING, true); + NORMAL_TABLET_ID, 0, 0, NORMAL_STRING, true); } void write_abnormal_load(MockSinkClient& client) { write_one_tablet(client, ABNORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, - NORMAL_TABLET_ID, 0, ABNORMAL_STRING, true); + NORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true); } void write_abnormal_index(MockSinkClient& client) { write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, ABNORMAL_INDEX_ID, - NORMAL_TABLET_ID, 0, ABNORMAL_STRING, true); + NORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true); } void write_abnormal_sender(MockSinkClient& client) { write_one_tablet(client, NORMAL_LOAD_ID, ABNORMAL_SENDER_ID, NORMAL_INDEX_ID, - NORMAL_TABLET_ID, 0, ABNORMAL_STRING, true); + NORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true); } void write_abnormal_tablet(MockSinkClient& client) { write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, - ABNORMAL_TABLET_ID, 0, ABNORMAL_STRING, true); + ABNORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true); } void wait_for_ack(int32_t num) { @@ -710,7 +711,7 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_load) { EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); - EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID); + EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID); // server will close stream on CLOSE_LOAD wait_for_close(); @@ -820,7 +821,7 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0_zero_b PStreamHeader header; std::string data; write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, - data, true); + 0, data, true); EXPECT_EQ(g_response_stat.num, 0); // CLOSE_LOAD @@ -861,9 +862,9 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0) { PStreamHeader header; std::string data = "file1 hello world 123 !@#$%^&*()_+"; write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, - data, false); + 0, data, false); write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, - data, true); + data.length(), data, true); EXPECT_EQ(g_response_stat.num, 0); // CLOSE_LOAD @@ -907,7 +908,7 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment_without PStreamHeader header; std::string data = "file1 hello world 123 !@#$%^&*()_+"; write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, - data, false); + 0, data, false); EXPECT_EQ(g_response_stat.num, 0); // CLOSE_LOAD @@ -948,9 +949,9 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment1) { PStreamHeader header; std::string data = "file1 hello world 123 !@#$%^&*()_+"; write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 1, - data, false); + 0, data, false); write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 1, - data, true); + data.length(), data, true); EXPECT_EQ(g_response_stat.num, 0); // CLOSE_LOAD @@ -991,13 +992,13 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_two_segment) { PStreamHeader header; std::string data1 = "file1 hello world 123 !@#$%^&*()_+1"; write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, - data1, false); + 0, data1, false); std::string empty; write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, - empty, true); + data1.length(), empty, true); std::string data2 = "file1 hello world 123 !@#$%^&*()_+2"; write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 1, - data2, true); + 0, data2, true); EXPECT_EQ(g_response_stat.num, 0); // CLOSE_LOAD @@ -1044,12 +1045,12 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_three_tablet) { PStreamHeader header; std::string data1 = "file1 hello world 123 !@#$%^&*()_+1"; write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, - NORMAL_TABLET_ID + 0, 0, data1, true); + NORMAL_TABLET_ID + 0, 0, 0, data1, true); write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, - NORMAL_TABLET_ID + 1, 0, data1, true); + NORMAL_TABLET_ID + 1, 0, 0, data1, true); std::string data2 = "file1 hello world 123 !@#$%^&*()_+2"; write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, - NORMAL_TABLET_ID + 2, 0, data2, true); + NORMAL_TABLET_ID + 2, 0, 0, data2, true); EXPECT_EQ(g_response_stat.num, 0); // CLOSE_LOAD @@ -1113,7 +1114,7 @@ TEST_F(LoadStreamMgrTest, two_client_one_index_one_tablet_three_segment) { std::string data1 = "sender_id=" + std::to_string(i) + ",segid=" + std::to_string(segid); write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, NORMAL_INDEX_ID, - NORMAL_TABLET_ID, segid, data1, true); + NORMAL_TABLET_ID, segid, 0, data1, true); segment_data[i * 3 + segid] = data1; LOG(INFO) << "segment_data[" << i * 3 + segid << "]" << data1; } @@ -1186,7 +1187,7 @@ TEST_F(LoadStreamMgrTest, two_client_one_close_before_the_other_open) { for (int32_t segid = 2; segid >= 0; segid--) { int i = 0; write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, NORMAL_INDEX_ID, - NORMAL_TABLET_ID, segid, segment_data[i * 3 + segid], true); + NORMAL_TABLET_ID, segid, 0, segment_data[i * 3 + segid], true); } EXPECT_EQ(g_response_stat.num, 0); @@ -1205,7 +1206,7 @@ TEST_F(LoadStreamMgrTest, two_client_one_close_before_the_other_open) { for (int32_t segid = 2; segid >= 0; segid--) { int i = 1; write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, NORMAL_INDEX_ID, - NORMAL_TABLET_ID, segid, segment_data[i * 3 + segid], true); + NORMAL_TABLET_ID, segid, 0, segment_data[i * 3 + segid], true); } close_load(clients[1], 1); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 3676d854a9..ec3714d618 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -791,6 +791,7 @@ message PStreamHeader { optional SegmentStatisticsPB segment_statistics = 9; repeated PTabletID tablets = 10; optional TabletSchemaPB flush_schema = 11; + optional uint64 offset = 12; } message PGetWalQueueSizeRequest{