[improve](move-memtable) support variant in move-memtable (#28084)

* [improve](move-memtable) support variant in move-memtable

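Pass the flush schema (the tablet schema produced when variant columns are expanded into subcolumns at flush time) to the destination end of the sink, so that the receiving rowset writer can update its rowset schema.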
This commit is contained in:
lihangyu
2023-12-07 16:41:38 +08:00
committed by GitHub
parent 1a46cf6fb5
commit 9c63dfd692
16 changed files with 59 additions and 25 deletions

View File

@ -720,7 +720,8 @@ Status BetaRowsetWriter::_check_segment_number_limit() {
return Status::OK();
}
Status BetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) {
Status BetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat,
TabletSchemaSPtr flush_schema) {
uint32_t segid_offset = segment_id - _segment_start_id;
{
std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
@ -742,6 +743,10 @@ Status BetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistic
_num_segment++;
}
}
// The flush produced an updated tablet schema; apply it to this rowset.
if (flush_schema != nullptr) {
update_rowset_schema(flush_schema);
}
if (_context.mow_context != nullptr) {
RETURN_IF_ERROR(_generate_delete_bitmap(segment_id));
}

View File

@ -78,7 +78,8 @@ public:
Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer) override;
Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat) override;
Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat,
TabletSchemaSPtr flush_schema) override;
Status flush() override;

View File

@ -82,10 +82,11 @@ Status BetaRowsetWriterV2::create_file_writer(uint32_t segment_id, io::FileWrite
return Status::OK();
}
Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) {
Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, const SegmentStatistics& segstat,
TabletSchemaSPtr flush_schema) {
for (const auto& stream : _streams) {
RETURN_IF_ERROR(stream->add_segment(_context.partition_id, _context.index_id,
_context.tablet_id, segment_id, segstat));
_context.tablet_id, segment_id, segstat, flush_schema));
}
return Status::OK();
}

View File

@ -122,7 +122,8 @@ public:
return Status::OK();
}
Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat) override;
Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat,
TabletSchemaSPtr flush_schema) override;
int32_t allocate_segment_id() override { return _segment_creator.allocate_segment_id(); };

View File

@ -21,6 +21,7 @@
#include <gen_cpp/types.pb.h>
#include <functional>
#include <memory>
#include <optional>
#include "common/factory_creator.h"
@ -29,6 +30,7 @@
#include "olap/column_mapping.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_schema.h"
#include "vec/core/block.h"
@ -114,7 +116,8 @@ public:
"RowsetWriter not support flush_single_block");
}
virtual Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat) {
virtual Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat,
TabletSchemaSPtr flush_schema) {
return Status::NotSupported("RowsetWriter does not support add_segment");
}

View File

@ -59,9 +59,9 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_
if (block->rows() == 0) {
return Status::OK();
}
TabletSchemaSPtr flush_schema = nullptr;
// Expand variant columns
vectorized::Block flush_block(*block);
TabletSchemaSPtr flush_schema;
if (_context->write_type != DataWriteType::TYPE_COMPACTION &&
_context->tablet_schema->num_variant_columns() > 0) {
RETURN_IF_ERROR(_expand_variant_to_subcolumns(flush_block, flush_schema));
@ -72,12 +72,12 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_
std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema));
RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, flush_block.rows()));
RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
RETURN_IF_ERROR(_flush_segment_writer(writer, flush_schema, flush_size));
} else {
std::unique_ptr<segment_v2::SegmentWriter> writer;
RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema));
RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, flush_block.rows()));
RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
RETURN_IF_ERROR(_flush_segment_writer(writer, flush_schema, flush_size));
}
return Status::OK();
}
@ -311,7 +311,8 @@ Status SegmentFlusher::_create_segment_writer(
}
Status SegmentFlusher::_flush_segment_writer(
std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, int64_t* flush_size) {
std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, TabletSchemaSPtr flush_schema,
int64_t* flush_size) {
uint32_t row_num = writer->num_rows_written();
_num_rows_filtered += writer->num_rows_filtered();
@ -344,7 +345,7 @@ Status SegmentFlusher::_flush_segment_writer(
writer.reset();
RETURN_IF_ERROR(_context->segment_collector->add(segment_id, segstat));
RETURN_IF_ERROR(_context->segment_collector->add(segment_id, segstat, flush_schema));
if (flush_size) {
*flush_size = segment_size + index_size;
@ -353,7 +354,7 @@ Status SegmentFlusher::_flush_segment_writer(
}
Status SegmentFlusher::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
int64_t* flush_size) {
TabletSchemaSPtr flush_schema, int64_t* flush_size) {
uint32_t row_num = writer->num_rows_written();
_num_rows_filtered += writer->num_rows_filtered();
@ -386,7 +387,7 @@ Status SegmentFlusher::_flush_segment_writer(std::unique_ptr<segment_v2::Segment
writer.reset();
RETURN_IF_ERROR(_context->segment_collector->add(segment_id, segstat));
RETURN_IF_ERROR(_context->segment_collector->add(segment_id, segstat, flush_schema));
if (flush_size) {
*flush_size = segment_size + index_size;

View File

@ -26,6 +26,7 @@
#include "io/fs/file_reader_writer_fwd.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/tablet_fwd.h"
#include "util/spinlock.h"
#include "vec/core/block.h"
@ -66,7 +67,8 @@ class SegmentCollector {
public:
virtual ~SegmentCollector() = default;
virtual Status add(uint32_t segment_id, SegmentStatistics& segstat) = 0;
// Reports a flushed segment to the collector.
//   segment_id   - id of the segment that was just flushed
//   segstat      - statistics gathered for that segment
//   flush_schema - schema the segment was flushed with; may be null
virtual Status add(uint32_t segment_id, SegmentStatistics& segstat,
                   TabletSchemaSPtr flush_schema) = 0;
};
template <class T>
@ -74,8 +76,9 @@ class SegmentCollectorT : public SegmentCollector {
public:
explicit SegmentCollectorT(T* t) : _t(t) {}
Status add(uint32_t segment_id, SegmentStatistics& segstat) override {
return _t->add_segment(segment_id, segstat);
// Forwards the flushed segment, its statistics and the (possibly null)
// flush schema to the wrapped object's add_segment().
Status add(uint32_t segment_id, SegmentStatistics& segstat,
           TabletSchemaSPtr flush_schema) override {
    return _t->add_segment(segment_id, segstat, flush_schema);
}
private:
@ -140,8 +143,10 @@ private:
int32_t segment_id, bool no_compression = false,
TabletSchemaSPtr flush_schema = nullptr);
Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
TabletSchemaSPtr flush_schema = nullptr,
int64_t* flush_size = nullptr);
Status _flush_segment_writer(std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer,
TabletSchemaSPtr flush_schema = nullptr,
int64_t* flush_size = nullptr);
private:
@ -195,7 +200,6 @@ private:
std::atomic<int32_t> _next_segment_id = 0;
SegmentFlusher _segment_flusher;
std::unique_ptr<SegmentFlusher::Writer> _flush_writer;
// Buffer block to num bytes before flushing
vectorized::MutableBlock _buffer_block;
};

View File

@ -26,9 +26,13 @@
#include <olap/tablet_manager.h>
#include <runtime/exec_env.h>
#include <memory>
#include "common/signal_handler.h"
#include "exec/tablet_info.h"
#include "gutil/ref_counted.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_schema.h"
#include "runtime/load_channel.h"
#include "runtime/load_stream_mgr.h"
#include "runtime/load_stream_writer.h"
@ -139,6 +143,11 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
SCOPED_TIMER(_add_segment_timer);
DCHECK(header.has_segment_statistics());
SegmentStatistics stat(header.segment_statistics());
TabletSchemaSPtr flush_schema;
if (header.has_flush_schema()) {
flush_schema = std::make_shared<TabletSchema>();
flush_schema->init_from_pb(header.flush_schema());
}
int64_t src_id = header.src_id();
uint32_t segid = header.segment_id();
@ -154,8 +163,8 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
}
DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
auto add_segment_func = [this, new_segid, stat]() {
auto st = _load_stream_writer->add_segment(new_segid, stat);
auto add_segment_func = [this, new_segid, stat, flush_schema]() {
auto st = _load_stream_writer->add_segment(new_segid, stat, flush_schema);
if (!st.ok() && _failed_st->ok()) {
_failed_st = std::make_shared<Status>(st);
LOG(INFO) << "add segment failed " << *this;

View File

@ -120,7 +120,8 @@ Status LoadStreamWriter::close_segment(uint32_t segid) {
return Status::OK();
}
Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& stat) {
Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& stat,
TabletSchemaSPtr flush_schema) {
if (_segment_file_writers[segid]->bytes_appended() != stat.data_size) {
LOG(WARNING) << _segment_file_writers[segid]->path() << " is incomplete, actual size: "
<< _segment_file_writers[segid]->bytes_appended()
@ -129,7 +130,7 @@ Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& st
_segment_file_writers[segid]->path().native(),
_segment_file_writers[segid]->bytes_appended(), stat.data_size);
}
return _rowset_writer->add_segment(segid, stat);
return _rowset_writer->add_segment(segid, stat, flush_schema);
}
Status LoadStreamWriter::close() {

View File

@ -71,7 +71,7 @@ public:
Status close_segment(uint32_t segid);
Status add_segment(uint32_t segid, const SegmentStatistics& stat);
// Verifies that the bytes appended to the segment's file writer match
// stat.data_size, then hands the segment statistics and the (possibly null)
// flush schema to the underlying rowset writer.
Status add_segment(uint32_t segid, const SegmentStatistics& stat, TabletSchemaSPtr flush_schema);
// wait for all memtables to be flushed.
Status close();

View File

@ -201,7 +201,8 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64
// ADD_SEGMENT
Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id,
int64_t segment_id, const SegmentStatistics& segment_stat) {
int64_t segment_id, const SegmentStatistics& segment_stat,
TabletSchemaSPtr flush_schema) {
PStreamHeader header;
header.set_src_id(_src_id);
*header.mutable_load_id() = _load_id;
@ -211,6 +212,9 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64
header.set_segment_id(segment_id);
header.set_opcode(doris::PStreamHeader::ADD_SEGMENT);
segment_stat.to_pb(header.mutable_segment_statistics());
if (flush_schema != nullptr) {
flush_schema->to_schema_pb(header.mutable_flush_schema());
}
return _encode_and_send(header);
}

View File

@ -166,7 +166,8 @@ public:
// ADD_SEGMENT
Status add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id,
int64_t segment_id, const SegmentStatistics& segment_stat);
int64_t segment_id, const SegmentStatistics& segment_stat,
TabletSchemaSPtr flush_schema);
// CLOSE_LOAD
Status close_load(const std::vector<PTabletID>& tablets_to_commit);

View File

@ -790,6 +790,7 @@ message PStreamHeader {
optional int64 src_id = 8;
optional SegmentStatisticsPB segment_statistics = 9;
repeated PTabletID tablets = 10;
optional TabletSchemaPB flush_schema = 11;
}
message PGetWalQueueSizeRequest{

View File

@ -149,7 +149,7 @@
[123]
-- !sql_25 --
50000 55000.00000000863 6150000
50000 55000.000000002256 6150000
-- !sql_26 --
5000

View File

@ -16,6 +16,7 @@
// under the License.
suite("regression_test_variant_github_events_p0", "variant_type"){
sql "set enable_memtable_on_sink_node = true"
def load_json_data = {table_name, file_name ->
// load the json data
streamLoad {

View File

@ -16,6 +16,7 @@
// under the License.
suite("regression_test_variant_complexjson", "variant_type_complex_json") {
sql "set enable_memtable_on_sink_node = true"
def create_table = { table_name ->
sql "DROP TABLE IF EXISTS ${table_name}"
sql """