From 9c63dfd692ac6d07be90c5afc2131593797cbd15 Mon Sep 17 00:00:00 2001 From: lihangyu <15605149486@163.com> Date: Thu, 7 Dec 2023 16:41:38 +0800 Subject: [PATCH] [improve](move-memtable) support variant in move-memtable (#28084) * [improve](move-memtable) support variant in move-memtable Pass the flush schema to the destination end of the sink. --- be/src/olap/rowset/beta_rowset_writer.cpp | 7 ++++++- be/src/olap/rowset/beta_rowset_writer.h | 3 ++- be/src/olap/rowset/beta_rowset_writer_v2.cpp | 5 +++-- be/src/olap/rowset/beta_rowset_writer_v2.h | 3 ++- be/src/olap/rowset/rowset_writer.h | 5 ++++- be/src/olap/rowset/segment_creator.cpp | 15 ++++++++------- be/src/olap/rowset/segment_creator.h | 12 ++++++++---- be/src/runtime/load_stream.cpp | 13 +++++++++++-- be/src/runtime/load_stream_writer.cpp | 5 +++-- be/src/runtime/load_stream_writer.h | 2 +- be/src/vec/sink/load_stream_stub.cpp | 6 +++++- be/src/vec/sink/load_stream_stub.h | 3 ++- gensrc/proto/internal_service.proto | 1 + regression-test/data/variant_p0/load.out | 2 +- .../suites/variant_github_events_p0/load.groovy | 1 + .../suites/variant_p0/complexjson.groovy | 1 + 16 files changed, 59 insertions(+), 25 deletions(-) diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 3b2852a283..ab9e97a3d3 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -720,7 +720,8 @@ Status BetaRowsetWriter::_check_segment_number_limit() { return Status::OK(); } -Status BetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) { +Status BetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat, + TabletSchemaSPtr flush_schema) { uint32_t segid_offset = segment_id - _segment_start_id; { std::lock_guard lock(_segid_statistics_map_mutex); @@ -742,6 +743,10 @@ Status BetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistic _num_segment++; } } + // tablet schema updated + if (flush_schema != nullptr) { + update_rowset_schema(flush_schema); + } if (_context.mow_context != nullptr) { RETURN_IF_ERROR(_generate_delete_bitmap(segment_id)); } diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 62e802658f..68932c5ef7 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -78,7 +78,8 @@ public: Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer) override; - Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat) override; + Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat, + TabletSchemaSPtr flush_schema) override; Status flush() override; diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.cpp b/be/src/olap/rowset/beta_rowset_writer_v2.cpp index 7d83742af1..225ba490a3 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.cpp +++ b/be/src/olap/rowset/beta_rowset_writer_v2.cpp @@ -82,10 +82,11 @@ Status BetaRowsetWriterV2::create_file_writer(uint32_t segment_id, io::FileWrite return Status::OK(); } -Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) { +Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, const SegmentStatistics& segstat, + TabletSchemaSPtr flush_schema) { for (const auto& stream : _streams) { RETURN_IF_ERROR(stream->add_segment(_context.partition_id, _context.index_id, - _context.tablet_id, segment_id, segstat)); + _context.tablet_id, segment_id, segstat, flush_schema)); } return Status::OK(); } diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h b/be/src/olap/rowset/beta_rowset_writer_v2.h index bd2d5284e8..4a99acdaba 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.h +++ b/be/src/olap/rowset/beta_rowset_writer_v2.h @@ -122,7 +122,8 @@ public: return Status::OK(); } - Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat) override; + Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat, + TabletSchemaSPtr flush_schema) override; int32_t allocate_segment_id() override { return _segment_creator.allocate_segment_id(); }; diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h index dc85283c4e..542528b1ac 100644 --- a/be/src/olap/rowset/rowset_writer.h +++ b/be/src/olap/rowset/rowset_writer.h @@ -21,6 +21,7 @@ #include #include +#include #include #include "common/factory_creator.h" @@ -29,6 +30,7 @@ #include "olap/column_mapping.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_writer_context.h" +#include "olap/tablet_fwd.h" #include "olap/tablet_schema.h" #include "vec/core/block.h" @@ -114,7 +116,8 @@ public: "RowsetWriter not support flush_single_block"); } - virtual Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat) { + virtual Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat, + TabletSchemaSPtr flush_schema) { return Status::NotSupported("RowsetWriter does not support add_segment"); } diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp index af7cec3e80..11991f0db8 100644 --- a/be/src/olap/rowset/segment_creator.cpp +++ b/be/src/olap/rowset/segment_creator.cpp @@ -59,9 +59,9 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_ if (block->rows() == 0) { return Status::OK(); } - TabletSchemaSPtr flush_schema = nullptr; // Expand variant columns vectorized::Block flush_block(*block); + TabletSchemaSPtr flush_schema; if (_context->write_type != DataWriteType::TYPE_COMPACTION && _context->tablet_schema->num_variant_columns() > 0) { RETURN_IF_ERROR(_expand_variant_to_subcolumns(flush_block, flush_schema)); @@ -72,12 +72,12 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_ std::unique_ptr writer; RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema)); RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, flush_block.rows())); - RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size)); + RETURN_IF_ERROR(_flush_segment_writer(writer, flush_schema, flush_size)); } else { std::unique_ptr writer; RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema)); RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, flush_block.rows())); - RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size)); + RETURN_IF_ERROR(_flush_segment_writer(writer, flush_schema, flush_size)); } return Status::OK(); } @@ -311,7 +311,8 @@ Status SegmentFlusher::_create_segment_writer( } Status SegmentFlusher::_flush_segment_writer( - std::unique_ptr& writer, int64_t* flush_size) { + std::unique_ptr& writer, TabletSchemaSPtr flush_schema, + int64_t* flush_size) { uint32_t row_num = writer->num_rows_written(); _num_rows_filtered += writer->num_rows_filtered(); @@ -344,7 +345,7 @@ Status SegmentFlusher::_flush_segment_writer( writer.reset(); - RETURN_IF_ERROR(_context->segment_collector->add(segment_id, segstat)); + RETURN_IF_ERROR(_context->segment_collector->add(segment_id, segstat, flush_schema)); if (flush_size) { *flush_size = segment_size + index_size; @@ -353,7 +354,7 @@ Status SegmentFlusher::_flush_segment_writer( } Status SegmentFlusher::_flush_segment_writer(std::unique_ptr& writer, - int64_t* flush_size) { + TabletSchemaSPtr flush_schema, int64_t* flush_size) { uint32_t row_num = writer->num_rows_written(); _num_rows_filtered += writer->num_rows_filtered(); @@ -386,7 +387,7 @@ Status SegmentFlusher::_flush_segment_writer(std::unique_ptrsegment_collector->add(segment_id, segstat)); + RETURN_IF_ERROR(_context->segment_collector->add(segment_id, segstat, flush_schema)); if (flush_size) { *flush_size = segment_size + index_size; diff --git a/be/src/olap/rowset/segment_creator.h b/be/src/olap/rowset/segment_creator.h index 0a53117b4d..668c75e47b 100644 --- a/be/src/olap/rowset/segment_creator.h +++ b/be/src/olap/rowset/segment_creator.h @@ -26,6 +26,7 @@ #include "io/fs/file_reader_writer_fwd.h" #include "olap/olap_common.h" #include "olap/rowset/rowset_writer_context.h" +#include "olap/tablet_fwd.h" #include "util/spinlock.h" #include "vec/core/block.h" @@ -66,7 +67,8 @@ class SegmentCollector { public: virtual ~SegmentCollector() = default; - virtual Status add(uint32_t segment_id, SegmentStatistics& segstat) = 0; + virtual Status add(uint32_t segment_id, SegmentStatistics& segstat, + TabletSchemaSPtr flush_chema) = 0; }; template @@ -74,8 +76,9 @@ class SegmentCollectorT : public SegmentCollector { public: explicit SegmentCollectorT(T* t) : _t(t) {} - Status add(uint32_t segment_id, SegmentStatistics& segstat) override { - return _t->add_segment(segment_id, segstat); + Status add(uint32_t segment_id, SegmentStatistics& segstat, + TabletSchemaSPtr flush_chema) override { + return _t->add_segment(segment_id, segstat, flush_chema); } private: @@ -140,8 +143,10 @@ private: int32_t segment_id, bool no_compression = false, TabletSchemaSPtr flush_schema = nullptr); Status _flush_segment_writer(std::unique_ptr& writer, + TabletSchemaSPtr flush_schema = nullptr, int64_t* flush_size = nullptr); Status _flush_segment_writer(std::unique_ptr& writer, + TabletSchemaSPtr flush_schema = nullptr, int64_t* flush_size = nullptr); private: @@ -195,7 +200,6 @@ private: std::atomic _next_segment_id = 0; SegmentFlusher _segment_flusher; std::unique_ptr _flush_writer; - // Buffer block to num bytes before flushing vectorized::MutableBlock _buffer_block; }; diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index febd328418..80d82956a0 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -26,9 +26,13 @@ #include #include +#include + #include "common/signal_handler.h" #include "exec/tablet_info.h" #include "gutil/ref_counted.h" +#include "olap/tablet_fwd.h" +#include "olap/tablet_schema.h" #include "runtime/load_channel.h" #include "runtime/load_stream_mgr.h" #include "runtime/load_stream_writer.h" @@ -139,6 +143,11 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data SCOPED_TIMER(_add_segment_timer); DCHECK(header.has_segment_statistics()); SegmentStatistics stat(header.segment_statistics()); + TabletSchemaSPtr flush_schema; + if (header.has_flush_schema()) { + flush_schema = std::make_shared(); + flush_schema->init_from_pb(header.flush_schema()); + } int64_t src_id = header.src_id(); uint32_t segid = header.segment_id(); @@ -154,8 +163,8 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data } DCHECK(new_segid != std::numeric_limits::max()); - auto add_segment_func = [this, new_segid, stat]() { - auto st = _load_stream_writer->add_segment(new_segid, stat); + auto add_segment_func = [this, new_segid, stat, flush_schema]() { + auto st = _load_stream_writer->add_segment(new_segid, stat, flush_schema); if (!st.ok() && _failed_st->ok()) { _failed_st = std::make_shared(st); LOG(INFO) << "add segment failed " << *this; diff --git a/be/src/runtime/load_stream_writer.cpp b/be/src/runtime/load_stream_writer.cpp index e7fe9abdcf..52948429e3 100644 --- a/be/src/runtime/load_stream_writer.cpp +++ b/be/src/runtime/load_stream_writer.cpp @@ -120,7 +120,8 @@ Status LoadStreamWriter::close_segment(uint32_t segid) { return Status::OK(); } -Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& stat) { +Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& stat, + TabletSchemaSPtr flush_schema) { if (_segment_file_writers[segid]->bytes_appended() != stat.data_size) { LOG(WARNING) << _segment_file_writers[segid]->path() << " is incomplete, actual size: " << _segment_file_writers[segid]->bytes_appended() @@ -129,7 +130,7 @@ Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& st _segment_file_writers[segid]->path().native(), _segment_file_writers[segid]->bytes_appended(), stat.data_size); } - return _rowset_writer->add_segment(segid, stat); + return _rowset_writer->add_segment(segid, stat, flush_schema); } Status LoadStreamWriter::close() { diff --git a/be/src/runtime/load_stream_writer.h b/be/src/runtime/load_stream_writer.h index afdbdecdcc..37514377a3 100644 --- a/be/src/runtime/load_stream_writer.h +++ b/be/src/runtime/load_stream_writer.h @@ -71,7 +71,7 @@ public: Status close_segment(uint32_t segid); - Status add_segment(uint32_t segid, const SegmentStatistics& stat); + Status add_segment(uint32_t segid, const SegmentStatistics& stat, TabletSchemaSPtr flush_chema); // wait for all memtables to be flushed. Status close(); diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 26331053e3..8831613dd8 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -201,7 +201,8 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64 // ADD_SEGMENT Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id, - int64_t segment_id, const SegmentStatistics& segment_stat) { + int64_t segment_id, const SegmentStatistics& segment_stat, + TabletSchemaSPtr flush_schema) { PStreamHeader header; header.set_src_id(_src_id); *header.mutable_load_id() = _load_id; @@ -211,6 +212,9 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64 header.set_segment_id(segment_id); header.set_opcode(doris::PStreamHeader::ADD_SEGMENT); segment_stat.to_pb(header.mutable_segment_statistics()); + if (flush_schema != nullptr) { + flush_schema->to_schema_pb(header.mutable_flush_schema()); + } return _encode_and_send(header); } diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 7aae8496a0..786de57d75 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -166,7 +166,8 @@ public: // ADD_SEGMENT Status add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id, - int64_t segment_id, const SegmentStatistics& segment_stat); + int64_t segment_id, const SegmentStatistics& segment_stat, + TabletSchemaSPtr flush_schema); // CLOSE_LOAD Status close_load(const std::vector& tablets_to_commit); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 544a91a3cb..ad66efc62a 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -790,6 +790,7 @@ message PStreamHeader { optional int64 src_id = 8; optional SegmentStatisticsPB segment_statistics = 9; repeated PTabletID tablets = 10; + optional TabletSchemaPB flush_schema = 11; } message PGetWalQueueSizeRequest{ diff --git a/regression-test/data/variant_p0/load.out b/regression-test/data/variant_p0/load.out index 3b9cdd658b..490da2618d 100644 --- a/regression-test/data/variant_p0/load.out +++ b/regression-test/data/variant_p0/load.out @@ -149,7 +149,7 @@ [123] -- !sql_25 -- -50000 55000.00000000863 6150000 +50000 55000.000000002256 6150000 -- !sql_26 -- 5000 diff --git a/regression-test/suites/variant_github_events_p0/load.groovy b/regression-test/suites/variant_github_events_p0/load.groovy index 5dec74a736..e58f5f6afa 100644 --- a/regression-test/suites/variant_github_events_p0/load.groovy +++ b/regression-test/suites/variant_github_events_p0/load.groovy @@ -16,6 +16,7 @@ // under the License. suite("regression_test_variant_github_events_p0", "variant_type"){ + sql "set enable_memtable_on_sink_node = true" def load_json_data = {table_name, file_name -> // load the json data streamLoad { diff --git a/regression-test/suites/variant_p0/complexjson.groovy b/regression-test/suites/variant_p0/complexjson.groovy index 244da16f00..cceab301ba 100644 --- a/regression-test/suites/variant_p0/complexjson.groovy +++ b/regression-test/suites/variant_p0/complexjson.groovy @@ -16,6 +16,7 @@ // under the License. suite("regression_test_variant_complexjson", "variant_type_complex_json") { + sql "set enable_memtable_on_sink_node = true" def create_table = { table_name -> sql "DROP TABLE IF EXISTS ${table_name}" sql """