From 0a70cbfe99a03d71e82b775460f2fcf4fce168e6 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Fri, 25 Aug 2023 10:20:06 +0800 Subject: [PATCH] [feature](move-memtable)[5/7] add olap table sink v2 and writers (#23458) Co-authored-by: laihui <1353307710@qq.com> --- be/src/common/config.cpp | 5 + be/src/common/config.h | 5 + be/src/olap/delta_writer_context.h | 7 + be/src/olap/delta_writer_v2.cpp | 226 +++++++ be/src/olap/delta_writer_v2.h | 128 ++++ be/src/olap/rowset/beta_rowset_writer_v2.cpp | 138 +++++ be/src/olap/rowset/beta_rowset_writer_v2.h | 166 +++++ be/src/olap/rowset/rowset_writer_context.h | 5 + be/src/vec/sink/vtablet_sink_v2.cpp | 601 +++++++++++++++++++ be/src/vec/sink/vtablet_sink_v2.h | 241 ++++++++ 10 files changed, 1522 insertions(+) create mode 100644 be/src/olap/delta_writer_v2.cpp create mode 100644 be/src/olap/delta_writer_v2.h create mode 100644 be/src/olap/rowset/beta_rowset_writer_v2.cpp create mode 100644 be/src/olap/rowset/beta_rowset_writer_v2.h create mode 100644 be/src/vec/sink/vtablet_sink_v2.cpp create mode 100644 be/src/vec/sink/vtablet_sink_v2.h diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index d8a4afe9aa..a8307e7de1 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -723,6 +723,11 @@ DEFINE_mInt32(mem_tracker_consume_min_size_bytes, "1048576"); // In most cases, it does not need to be modified. DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1"); +// number of brpc stream per OlapTableSinkV2 +DEFINE_Int32(num_streams_per_sink, "5"); +// timeout for open stream sink rpc in ms +DEFINE_Int64(open_stream_sink_timeout_ms, "500"); + // max send batch parallelism for OlapTableSink // The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job, // if exceed, the value of send_batch_parallelism would be max_send_batch_parallelism_per_job diff --git a/be/src/common/config.h b/be/src/common/config.h index fbc680f229..8d978c768e 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -778,6 +778,11 @@ DECLARE_mInt32(mem_tracker_consume_min_size_bytes); // In most cases, it does not need to be modified. DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio); +// number of brpc stream per OlapTableSinkV2 +DECLARE_Int32(num_streams_per_sink); +// timeout for open stream sink rpc in ms +DECLARE_Int64(open_stream_sink_timeout_ms); + // max send batch parallelism for OlapTableSink // The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job, // if exceed, the value of send_batch_parallelism would be max_send_batch_parallelism_per_job diff --git a/be/src/olap/delta_writer_context.h b/be/src/olap/delta_writer_context.h index c5c30b5ce7..680f2d0b6f 100644 --- a/be/src/olap/delta_writer_context.h +++ b/be/src/olap/delta_writer_context.h @@ -17,6 +17,7 @@ #pragma once +#include #include #include @@ -27,6 +28,7 @@ namespace doris { class TupleDescriptor; class SlotDescriptor; class OlapTableSchemaParam; +class TabletSchema; struct WriteRequest { int64_t tablet_id; @@ -40,6 +42,11 @@ struct WriteRequest { bool is_high_priority = false; OlapTableSchemaParam* table_schema_param; int64_t index_id = 0; + // for DeltaWriterV2 + std::shared_ptr tablet_schema; + bool enable_unique_key_merge_on_write = false; + int sender_id = 0; + std::vector streams; }; } // namespace doris \ No newline at end of file diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp new file mode 100644 index 0000000000..6f42ae068e --- /dev/null +++ b/be/src/olap/delta_writer_v2.cpp @@ -0,0 +1,226 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/delta_writer_v2.h" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +// IWYU pragma: no_include +#include "common/compiler_util.h" // IWYU pragma: keep +#include "common/config.h" +#include "common/logging.h" +#include "common/status.h" +#include "exec/tablet_info.h" +#include "gutil/integral_types.h" +#include "gutil/strings/numbers.h" +#include "io/fs/file_writer.h" // IWYU pragma: keep +#include "olap/data_dir.h" +#include "olap/olap_define.h" +#include "olap/rowset/beta_rowset.h" +#include "olap/rowset/beta_rowset_writer_v2.h" +#include "olap/rowset/rowset_meta.h" +#include "olap/rowset/rowset_writer.h" +#include "olap/rowset/rowset_writer_context.h" +#include "olap/rowset/segment_v2/inverted_index_desc.h" +#include "olap/rowset/segment_v2/segment.h" +#include "olap/schema.h" +#include "olap/schema_change.h" +#include "olap/storage_engine.h" +#include "olap/tablet_manager.h" +#include "runtime/exec_env.h" +#include "service/backend_options.h" +#include "util/brpc_client_cache.h" +#include "util/mem_info.h" +#include "util/ref_count_closure.h" +#include "util/stopwatch.hpp" +#include "util/time.h" +#include "vec/core/block.h" + +namespace doris { +using namespace ErrorCode; + +Status DeltaWriterV2::open(WriteRequest* req, DeltaWriterV2** writer, RuntimeProfile* profile) { + *writer = new DeltaWriterV2(req, StorageEngine::instance(), profile); + return Status::OK(); +} + +DeltaWriterV2::DeltaWriterV2(WriteRequest* req, StorageEngine* storage_engine, + RuntimeProfile* profile) + : _req(*req), + _tablet_schema(new TabletSchema), + _profile(profile->create_child(fmt::format("DeltaWriterV2 {}", _req.tablet_id), true, + true)), + _memtable_writer(new MemTableWriter(*req)), + _streams(req->streams) { + _init_profile(profile); +} + +void DeltaWriterV2::_init_profile(RuntimeProfile* profile) { + _write_memtable_timer = ADD_TIMER(_profile, "WriteMemTableTime"); + _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime"); +} + +DeltaWriterV2::~DeltaWriterV2() { + if (!_is_init) { + return; + } + + // cancel and wait all memtables in flush queue to be finished + _memtable_writer->cancel(); +} + +Status DeltaWriterV2::init() { + if (_is_init) { + return Status::OK(); + } + // build tablet schema in request level + _build_current_tablet_schema(_req.index_id, _req.table_schema_param, *_req.tablet_schema.get()); + RowsetWriterContext context; + context.txn_id = _req.txn_id; + context.load_id = _req.load_id; + context.index_id = _req.index_id; + context.partition_id = _req.partition_id; + context.rowset_state = PREPARED; + context.segments_overlap = OVERLAPPING; + context.tablet_schema = _tablet_schema; + context.newest_write_timestamp = UnixSeconds(); + context.tablet = nullptr; + context.write_type = DataWriteType::TYPE_DIRECT; + context.tablet_id = _req.tablet_id; + context.partition_id = _req.partition_id; + context.tablet_schema_hash = _req.schema_hash; + context.enable_unique_key_merge_on_write = _req.enable_unique_key_merge_on_write; + context.rowset_type = RowsetTypePB::BETA_ROWSET; + context.rowset_id = StorageEngine::instance()->next_rowset_id(); + context.data_dir = nullptr; + context.sender_id = _req.sender_id; + + _rowset_writer = std::make_shared(_streams); + _rowset_writer->init(context); + _memtable_writer->init(_rowset_writer, _tablet_schema, _req.enable_unique_key_merge_on_write); + ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer); + _is_init = true; + return Status::OK(); +} + +Status DeltaWriterV2::append(const vectorized::Block* block) { + return write(block, {}, true); +} + +Status DeltaWriterV2::write(const vectorized::Block* block, const std::vector& row_idxs, + bool is_append) { + if (UNLIKELY(row_idxs.empty() && !is_append)) { + return Status::OK(); + } + _lock_watch.start(); + std::lock_guard l(_lock); + _lock_watch.stop(); + if (!_is_init && !_is_cancelled) { + RETURN_IF_ERROR(init()); + } + SCOPED_TIMER(_write_memtable_timer); + return _memtable_writer->write(block, row_idxs, is_append); +} + +Status DeltaWriterV2::close() { + _lock_watch.start(); + std::lock_guard l(_lock); + _lock_watch.stop(); + if (!_is_init && !_is_cancelled) { + // if this delta writer is not initialized, but close() is called. + // which means this tablet has no data loaded, but at least one tablet + // in same partition has data loaded. + // so we have to also init this DeltaWriterV2, so that it can create an empty rowset + // for this tablet when being closed. + RETURN_IF_ERROR(init()); + } + return _memtable_writer->close(); +} + +Status DeltaWriterV2::close_wait() { + SCOPED_TIMER(_close_wait_timer); + std::lock_guard l(_lock); + DCHECK(_is_init) + << "delta writer is supposed be to initialized before close_wait() being called"; + + RETURN_IF_ERROR(_memtable_writer->close_wait(_profile)); + + _delta_written_success = true; + return Status::OK(); +} + +Status DeltaWriterV2::cancel() { + return cancel_with_status(Status::Cancelled("already cancelled")); +} + +Status DeltaWriterV2::cancel_with_status(const Status& st) { + std::lock_guard l(_lock); + if (_is_cancelled) { + return Status::OK(); + } + RETURN_IF_ERROR(_memtable_writer->cancel_with_status(st)); + _is_cancelled = true; + return Status::OK(); +} + +int64_t DeltaWriterV2::mem_consumption(MemType mem) { + return _memtable_writer->mem_consumption(mem); +} + +int64_t DeltaWriterV2::active_memtable_mem_consumption() { + return _memtable_writer->active_memtable_mem_consumption(); +} + +int64_t DeltaWriterV2::partition_id() const { + return _req.partition_id; +} + +void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, + const OlapTableSchemaParam* table_schema_param, + const TabletSchema& ori_tablet_schema) { + _tablet_schema->copy_from(ori_tablet_schema); + // find the right index id + int i = 0; + auto indexes = table_schema_param->indexes(); + for (; i < indexes.size(); i++) { + if (indexes[i]->index_id == index_id) { + break; + } + } + + if (indexes.size() > 0 && indexes[i]->columns.size() != 0 && + indexes[i]->columns[0]->unique_id() >= 0) { + _tablet_schema->build_current_tablet_schema(index_id, table_schema_param->version(), + indexes[i], ori_tablet_schema); + } + + _tablet_schema->set_table_id(table_schema_param->table_id()); + // set partial update columns info + _tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(), + table_schema_param->partial_update_input_columns()); +} + +} // namespace doris diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h new file mode 100644 index 0000000000..0d78162035 --- /dev/null +++ b/be/src/olap/delta_writer_v2.h @@ -0,0 +1,128 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "common/status.h" +#include "olap/delta_writer_context.h" +#include "olap/memtable_writer.h" +#include "olap/olap_common.h" +#include "olap/rowset/rowset.h" +#include "olap/tablet.h" +#include "olap/tablet_meta.h" +#include "olap/tablet_schema.h" +#include "util/spinlock.h" +#include "util/uid_util.h" + +namespace doris { + +class FlushToken; +class MemTable; +class MemTracker; +class Schema; +class StorageEngine; +class TupleDescriptor; +class SlotDescriptor; +class OlapTableSchemaParam; +class BetaRowsetWriterV2; + +namespace vectorized { +class Block; +} // namespace vectorized + +// Writer for a particular (load, index, tablet). +// This class is NOT thread-safe, external synchronization is required. +class DeltaWriterV2 { +public: + static Status open(WriteRequest* req, DeltaWriterV2** writer, RuntimeProfile* profile); + + ~DeltaWriterV2(); + + Status init(); + + Status write(const vectorized::Block* block, const std::vector& row_idxs, + bool is_append = false); + + Status append(const vectorized::Block* block); + + // flush the last memtable to flush queue, must call it before close_wait() + Status close(); + // wait for all memtables to be flushed. + // mem_consumption() should be 0 after this function returns. + Status close_wait(); + + // abandon current memtable and wait for all pending-flushing memtables to be destructed. + // mem_consumption() should be 0 after this function returns. + Status cancel(); + Status cancel_with_status(const Status& st); + + int64_t partition_id() const; + + int64_t mem_consumption(MemType mem); + int64_t active_memtable_mem_consumption(); + + int64_t tablet_id() { return _req.tablet_id; } + + int32_t schema_hash() { return _req.schema_hash; } + + int64_t total_received_rows() const { return _total_received_rows; } + +private: + DeltaWriterV2(WriteRequest* req, StorageEngine* storage_engine, RuntimeProfile* profile); + + void _build_current_tablet_schema(int64_t index_id, + const OlapTableSchemaParam* table_schema_param, + const TabletSchema& ori_tablet_schema); + + void _init_profile(RuntimeProfile* profile); + + bool _is_init = false; + bool _is_cancelled = false; + WriteRequest _req; + std::shared_ptr _rowset_writer; + TabletSchemaSPtr _tablet_schema; + bool _delta_written_success = false; + + std::mutex _lock; + + // total rows num written by DeltaWriterV2 + int64_t _total_received_rows = 0; + + RuntimeProfile* _profile = nullptr; + RuntimeProfile::Counter* _write_memtable_timer = nullptr; + RuntimeProfile::Counter* _close_wait_timer = nullptr; + + std::shared_ptr _memtable_writer; + MonotonicStopWatch _lock_watch; + + std::vector _streams; +}; + +} // namespace doris diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.cpp b/be/src/olap/rowset/beta_rowset_writer_v2.cpp new file mode 100644 index 0000000000..d7a641b6e0 --- /dev/null +++ b/be/src/olap/rowset/beta_rowset_writer_v2.cpp @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/rowset/beta_rowset_writer_v2.h" + +#include +// IWYU pragma: no_include +#include // IWYU pragma: keep +#include + +#include // time +#include +#include +#include +#include + +// IWYU pragma: no_include +#include "common/compiler_util.h" // IWYU pragma: keep +#include "common/config.h" +#include "common/logging.h" +#include "gutil/integral_types.h" +#include "gutil/strings/substitute.h" +#include "io/fs/file_reader_options.h" +#include "io/fs/file_system.h" +#include "io/fs/file_writer.h" +#include "io/fs/stream_sink_file_writer.h" +#include "olap/data_dir.h" +#include "olap/olap_define.h" +#include "olap/rowset/beta_rowset.h" +#include "olap/rowset/rowset_factory.h" +#include "olap/rowset/rowset_writer.h" +#include "olap/rowset/segment_v2/inverted_index_cache.h" +#include "olap/rowset/segment_v2/inverted_index_desc.h" +#include "olap/rowset/segment_v2/segment.h" +#include "olap/rowset/segment_v2/segment_writer.h" +#include "olap/storage_engine.h" +#include "olap/tablet.h" +#include "olap/tablet_schema.h" +#include "util/slice.h" +#include "util/time.h" +#include "vec/common/schema_util.h" // LocalSchemaChangeRecorder +#include "vec/core/block.h" + +namespace doris { +using namespace ErrorCode; + +BetaRowsetWriterV2::BetaRowsetWriterV2(const std::vector& streams) + : _next_segment_id(0), + _num_segment(0), + _num_rows_written(0), + _total_data_size(0), + _total_index_size(0), + _streams(streams) {} + +BetaRowsetWriterV2::~BetaRowsetWriterV2() = default; + +Status BetaRowsetWriterV2::init(const RowsetWriterContext& rowset_writer_context) { + _context = rowset_writer_context; + _context.segment_collector = std::make_shared>(this); + _context.file_writer_creator = std::make_shared>(this); + _segment_creator.init(_context); + return Status::OK(); +} + +Status BetaRowsetWriterV2::create_file_writer(uint32_t segment_id, io::FileWriterPtr& file_writer) { + auto partition_id = _context.partition_id; + auto sender_id = _context.sender_id; + auto index_id = _context.index_id; + auto tablet_id = _context.tablet_id; + auto load_id = _context.load_id; + + auto stream_writer = std::make_unique(sender_id, _streams); + stream_writer->init(load_id, partition_id, index_id, tablet_id, segment_id); + file_writer = std::move(stream_writer); + return Status::OK(); +} + +Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, SegmentStatistics& segstat) { + butil::IOBuf buf; + PStreamHeader header; + header.set_src_id(_context.sender_id); + *header.mutable_load_id() = _context.load_id; + header.set_partition_id(_context.partition_id); + header.set_index_id(_context.index_id); + header.set_tablet_id(_context.tablet_id); + header.set_segment_id(segment_id); + header.set_opcode(doris::PStreamHeader::ADD_SEGMENT); + segstat.to_pb(header.mutable_segment_statistics()); + size_t header_len = header.ByteSizeLong(); + buf.append(reinterpret_cast(&header_len), sizeof(header_len)); + buf.append(header.SerializeAsString()); + for (const auto& stream : _streams) { + io::StreamSinkFileWriter::send_with_retry(stream, buf); + } + return Status::OK(); +} + +Status BetaRowsetWriterV2::flush_memtable(vectorized::Block* block, int32_t segment_id, + int64_t* flush_size) { + if (block->rows() == 0) { + return Status::OK(); + } + + TabletSchemaSPtr flush_schema; + /* TODO: support dynamic schema + if (_context.tablet_schema->is_dynamic_schema()) { + // Unfold variant column + RETURN_IF_ERROR(_unfold_variant_column(*block, flush_schema)); + } + */ + { + SCOPED_RAW_TIMER(&_segment_writer_ns); + RETURN_IF_ERROR( + _segment_creator.flush_single_block(block, segment_id, flush_size, flush_schema)); + } + // delete bitmap and seg compaction are done on the destination BE. + return Status::OK(); +} + +Status BetaRowsetWriterV2::flush_single_block(const vectorized::Block* block) { + return _segment_creator.flush_single_block(block); +} + +} // namespace doris diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h b/be/src/olap/rowset/beta_rowset_writer_v2.h new file mode 100644 index 0000000000..919c128607 --- /dev/null +++ b/be/src/olap/rowset/beta_rowset_writer_v2.h @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "brpc/controller.h" +#include "brpc/stream.h" +#include "common/status.h" +#include "io/fs/file_reader_writer_fwd.h" +#include "olap/olap_common.h" +#include "olap/rowset/rowset.h" +#include "olap/rowset/rowset_meta.h" +#include "olap/rowset/rowset_writer.h" +#include "olap/rowset/rowset_writer_context.h" +#include "olap/rowset/segment_creator.h" +#include "segment_v2/segment.h" +#include "util/spinlock.h" + +namespace doris { +namespace vectorized { +class Block; +} // namespace vectorized + +namespace segment_v2 { +class SegmentWriter; +} // namespace segment_v2 + +namespace vectorized::schema_util { +class LocalSchemaChangeRecorder; +} + +class BetaRowsetWriterV2 : public RowsetWriter { +public: + BetaRowsetWriterV2(const std::vector& streams); + + ~BetaRowsetWriterV2() override; + + Status init(const RowsetWriterContext& rowset_writer_context) override; + + Status add_block(const vectorized::Block* block) override { + return Status::Error("add_block is not implemented"); + } + + // add rowset by create hard link + Status add_rowset(RowsetSharedPtr rowset) override { + return Status::Error("add_rowset is not implemented"); + } + + Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) override { + return Status::Error( + "add_rowset_for_linked_schema_change is not implemented"); + } + + Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer) override; + + Status flush() override { + return Status::Error("flush is not implemented"); + } + + Status flush_memtable(vectorized::Block* block, int32_t segment_id, + int64_t* flush_size) override; + + // Return the file size flushed to disk in "flush_size" + // This method is thread-safe. + Status flush_single_block(const vectorized::Block* block) override; + + RowsetSharedPtr build() override { return nullptr; }; + + RowsetSharedPtr manual_build(const RowsetMetaSharedPtr& rowset_meta) override { + LOG(FATAL) << "not implemeted"; + return nullptr; + } + + Version version() override { return _context.version; } + + int64_t num_rows() const override { return _segment_creator.num_rows_written(); } + + int64_t num_rows_filtered() const override { return _segment_creator.num_rows_filtered(); } + + RowsetId rowset_id() override { return _context.rowset_id; } + + RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; } + + Status get_segment_num_rows(std::vector* segment_num_rows) const override { + std::lock_guard l(_lock); + *segment_num_rows = _segment_num_rows; + return Status::OK(); + } + + Status add_segment(uint32_t segment_id, SegmentStatistics& segstat) override; + + int32_t allocate_segment_id() override { return _next_segment_id.fetch_add(1); }; + + bool is_doing_segcompaction() const override { return false; } + + Status wait_flying_segcompaction() override { return Status::OK(); } + + int64_t delete_bitmap_ns() override { return _delete_bitmap_ns; } + + int64_t segment_writer_ns() override { return _segment_writer_ns; } + +private: + RowsetWriterContext _context; + + std::atomic _next_segment_id; // the next available segment_id (offset), + // also the numer of allocated segments + std::atomic _num_segment; // number of consecutive flushed segments + roaring::Roaring _segment_set; // bitmap set to record flushed segment id + std::mutex _segment_set_mutex; // mutex for _segment_set + + mutable SpinLock _lock; // protect following vectors. + // record rows number of every segment already written, using for rowid + // conversion when compaction in unique key with MoW model + std::vector _segment_num_rows; + std::vector _file_writers; + // for unique key table with merge-on-write + std::vector _segments_encoded_key_bounds; + + // counters and statistics maintained during add_rowset + std::atomic _num_rows_written; + std::atomic _total_data_size; + std::atomic _total_index_size; + // TODO rowset Zonemap + + SegmentCreator _segment_creator; + + fmt::memory_buffer vlog_buffer; + + std::vector _streams; + + int64_t _delete_bitmap_ns = 0; + int64_t _segment_writer_ns = 0; +}; + +} // namespace doris diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h index e26ab15dfa..b8bfd1225c 100644 --- a/be/src/olap/rowset/rowset_writer_context.h +++ b/be/src/olap/rowset/rowset_writer_context.h @@ -40,10 +40,12 @@ struct RowsetWriterContext { RowsetWriterContext() : tablet_id(0), tablet_schema_hash(0), + index_id(0), partition_id(0), rowset_type(BETA_ROWSET), rowset_state(PREPARED), version(Version(0, 0)), + sender_id(0), txn_id(0), tablet_uid(0, 0), segments_overlap(OVERLAP_UNKNOWN) { @@ -54,6 +56,7 @@ struct RowsetWriterContext { RowsetId rowset_id; int64_t tablet_id; int64_t tablet_schema_hash; + int64_t index_id; int64_t partition_id; RowsetTypePB rowset_type; io::FileSystemSPtr fs; @@ -65,6 +68,8 @@ struct RowsetWriterContext { // properties for non-pending rowset Version version; + int sender_id; + // properties for pending rowset int64_t txn_id; PUniqueId load_id; diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp new file mode 100644 index 0000000000..b77e44394f --- /dev/null +++ b/be/src/vec/sink/vtablet_sink_v2.cpp @@ -0,0 +1,601 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/sink/vtablet_sink_v2.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +// IWYU pragma: no_include +#include "common/compiler_util.h" // IWYU pragma: keep +#include "common/logging.h" +#include "common/object_pool.h" +#include "common/status.h" +#include "exec/tablet_info.h" +#include "io/fs/stream_sink_file_writer.h" +#include "olap/delta_writer_v2.h" +#include "runtime/descriptors.h" +#include "runtime/exec_env.h" +#include "runtime/runtime_state.h" +#include "runtime/thread_context.h" +#include "service/brpc.h" +#include "util/brpc_client_cache.h" +#include "util/doris_metrics.h" +#include "util/network_util.h" +#include "util/telemetry/telemetry.h" +#include "util/threadpool.h" +#include "util/thrift_util.h" +#include "util/uid_util.h" +#include "vec/core/block.h" +#include "vec/exprs/vexpr.h" +#include "vec/sink/vtablet_block_convertor.h" +#include "vec/sink/vtablet_finder.h" + +namespace doris { +class TExpr; + +namespace stream_load { + +int StreamSinkHandler::on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[], + size_t size) { + int64_t backend_id = _sink->_node_id_for_stream->at(id); + + for (size_t i = 0; i < size; i++) { + butil::IOBufAsZeroCopyInputStream wrapper(*messages[i]); + PWriteStreamSinkResponse response; + response.ParseFromZeroCopyStream(&wrapper); + + Status st = Status::create(response.status()); + + std::stringstream ss; + ss << "received response from backend " << backend_id << ", status: " << st + << ", success tablet ids:"; + for (auto tablet_id : response.success_tablet_ids()) { + ss << " " << tablet_id; + } + ss << ", failed tablet ids:"; + for (auto tablet_id : response.failed_tablet_ids()) { + ss << " " << tablet_id; + } + LOG(INFO) << ss.str(); + + int replica = _sink->_num_replicas; + + { + std::lock_guard l(_sink->_tablet_success_map_mutex); + for (auto tablet_id : response.success_tablet_ids()) { + if (_sink->_tablet_success_map.count(tablet_id) == 0) { + _sink->_tablet_success_map.insert({tablet_id, {}}); + } + _sink->_tablet_success_map[tablet_id].push_back(backend_id); + } + } + { + std::lock_guard l(_sink->_tablet_failure_map_mutex); + for (auto tablet_id : response.failed_tablet_ids()) { + if (_sink->_tablet_failure_map.count(tablet_id) == 0) { + _sink->_tablet_failure_map.insert({tablet_id, {}}); + } + _sink->_tablet_failure_map[tablet_id].push_back(backend_id); + if (_sink->_tablet_failure_map[tablet_id].size() * 2 >= replica) { + _sink->_cancel(Status::Cancelled( + "Failed to meet num replicas requirements for tablet {}", tablet_id)); + break; + } + } + } + + if (response.has_load_stream_profile()) { + TRuntimeProfileTree tprofile; + const uint8_t* buf = + reinterpret_cast(response.load_stream_profile().data()); + uint32_t len = response.load_stream_profile().size(); + auto status = deserialize_thrift_msg(buf, &len, false, &tprofile); + if (status.ok()) { + _sink->_state->load_channel_profile()->update(tprofile); + } else { + LOG(WARNING) << "load channel TRuntimeProfileTree deserialize failed, errmsg=" + << status; + } + } + + _sink->_pending_reports.fetch_add(-1); + } + return 0; +} + +void StreamSinkHandler::on_closed(brpc::StreamId id) {} + +VOlapTableSinkV2::VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& row_desc, + const std::vector& texprs, Status* status) + : DataSink(row_desc), _pool(pool) { + // From the thrift expressions create the real exprs. + *status = vectorized::VExpr::create_expr_trees(texprs, _output_vexpr_ctxs); + _name = "VOlapTableSinkV2"; +} + +VOlapTableSinkV2::~VOlapTableSinkV2() = default; + +Status VOlapTableSinkV2::init(const TDataSink& t_sink) { + DCHECK(t_sink.__isset.olap_table_sink); + auto& table_sink = t_sink.olap_table_sink; + _load_id.set_hi(table_sink.load_id.hi); + _load_id.set_lo(table_sink.load_id.lo); + _txn_id = table_sink.txn_id; + _num_replicas = table_sink.num_replicas; + _tuple_desc_id = table_sink.tuple_id; + _schema.reset(new OlapTableSchemaParam()); + RETURN_IF_ERROR(_schema->init(table_sink.schema)); + _location = _pool->add(new OlapTableLocationParam(table_sink.location)); + _nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info)); + + // if distributed column list is empty, we can ensure that tablet is with random distribution info + // and if load_to_single_tablet is set and set to true, we should find only one tablet in one partition + // for the whole olap table sink + auto find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW; + if (table_sink.partition.distributed_columns.empty()) { + if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) { + find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_SINK; + } else { + find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_BATCH; + } + } + _vpartition = _pool->add(new doris::VOlapTablePartitionParam(_schema, table_sink.partition)); + _tablet_finder = std::make_unique(_vpartition, find_tablet_mode); + return _vpartition->init(); +} + +Status VOlapTableSinkV2::prepare(RuntimeState* state) { + RETURN_IF_ERROR(DataSink::prepare(state)); + + _state = state; + + _sender_id = state->per_fragment_instance_idx(); + _num_senders = state->num_per_fragment_instances(); + _is_high_priority = + (state->execution_timeout() <= config::load_task_high_priority_threshold_second); + + // profile must add to state's object pool + _profile = state->obj_pool()->add(new RuntimeProfile("VOlapTableSinkV2")); + _mem_tracker = std::make_shared("VOlapTableSinkV2:" + + std::to_string(state->load_job_id())); + SCOPED_TIMER(_profile->total_time_counter()); + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + + // get table's tuple descriptor + _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id); + if (_output_tuple_desc == nullptr) { + return Status::InternalError("unknown destination tuple descriptor, id = {}", + _tuple_desc_id); + } + _block_convertor = std::make_unique(_output_tuple_desc); + + // add all counter + _input_rows_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT); + _output_rows_counter = ADD_COUNTER(_profile, "RowsReturned", TUnit::UNIT); + _filtered_rows_counter = ADD_COUNTER(_profile, "RowsFiltered", TUnit::UNIT); + _send_data_timer = ADD_TIMER(_profile, "SendDataTime"); + _wait_mem_limit_timer = ADD_CHILD_TIMER(_profile, "WaitMemLimitTime", "SendDataTime"); + _row_distribution_timer = ADD_CHILD_TIMER(_profile, "RowDistributionTime", "SendDataTime"); + _write_memtable_timer = ADD_CHILD_TIMER(_profile, "WriteMemTableTime", "SendDataTime"); + _validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime"); + _open_timer = ADD_TIMER(_profile, "OpenTime"); + _close_timer = ADD_TIMER(_profile, "CloseWaitTime"); + _close_writer_timer = ADD_CHILD_TIMER(_profile, "CloseWriterTime", "CloseWaitTime"); + _close_load_timer = ADD_CHILD_TIMER(_profile, "CloseLoadTime", "CloseWaitTime"); + _close_stream_timer = ADD_CHILD_TIMER(_profile, "CloseStreamTime", "CloseWaitTime"); + + // Prepare the exprs to run. + RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); + return Status::OK(); +} + +Status VOlapTableSinkV2::open(RuntimeState* state) { + // Prepare the exprs to run. + RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state)); + SCOPED_TIMER(_profile->total_time_counter()); + SCOPED_TIMER(_open_timer); + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + + _stream_pool_for_node = std::make_shared(); + _node_id_for_stream = std::make_shared(); + _delta_writer_for_tablet = std::make_shared(); + _build_tablet_node_mapping(); + RETURN_IF_ERROR(_init_stream_pools()); + + return Status::OK(); +} + +Status VOlapTableSinkV2::_init_stream_pools() { + for (auto& [node_id, _] : _tablets_for_node) { + auto node_info = _nodes_info->find_node(node_id); + if (node_info == nullptr) { + return Status::InternalError("Unknown node {} in tablet location", node_id); + } + _stream_pool_for_node->insert({node_id, StreamPool {}}); + StreamPool& stream_pool = _stream_pool_for_node->at(node_id); + RETURN_IF_ERROR(_init_stream_pool(*node_info, stream_pool)); + for (auto stream : stream_pool) { + _node_id_for_stream->insert({stream, node_id}); + } + } + return Status::OK(); +} + +Status VOlapTableSinkV2::_init_stream_pool(const NodeInfo& node_info, StreamPool& stream_pool) { + DCHECK_GT(config::num_streams_per_sink, 0); + stream_pool.reserve(config::num_streams_per_sink); + for (int i = 0; i < config::num_streams_per_sink; ++i) { + brpc::StreamOptions opt; + opt.max_buf_size = 20 << 20; // 20MB + opt.idle_timeout_ms = 30000; + opt.messages_in_batch = 128; + opt.handler = new StreamSinkHandler(this); + brpc::StreamId stream; + brpc::Controller cntl; + if (int ret = StreamCreate(&stream, cntl, &opt)) { + return Status::RpcError("Failed to create stream, code = {}", ret); + } + LOG(INFO) << "Created stream " << stream << " for backend " << node_info.id << " (" + << node_info.host << ":" << node_info.brpc_port << ")"; + std::string host_port = get_host_port(node_info.host, node_info.brpc_port); + // use "pooled" connection to avoid conflicts between streaming rpc and regular rpc, + // see: https://github.com/apache/brpc/issues/392 + const auto& stub = + _state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache( + host_port, "baidu_std", "pooled"); + POpenStreamSinkRequest request; + *request.mutable_load_id() = _load_id; + request.set_src_id(_sender_id); + request.set_txn_id(_txn_id); + request.set_enable_profile(_state->enable_profile()); + _schema->to_protobuf(request.mutable_schema()); + if (i == 0) { + // get tablet schema from each backend only in the 1st stream + for (auto& tablet : _indexes_from_node[node_info.id]) { + auto req = request.add_tablets(); + *req = tablet; + } + } + POpenStreamSinkResponse response; + cntl.set_timeout_ms(config::open_stream_sink_timeout_ms); + stub->open_stream_sink(&cntl, &request, &response, nullptr); + for (const auto& resp : response.tablet_schemas()) { + auto tablet_schema = std::make_shared(); + tablet_schema->init_from_pb(resp.tablet_schema()); + _tablet_schema_for_index[resp.index_id()] = tablet_schema; + _enable_unique_mow_for_index[resp.index_id()] = resp.enable_unique_key_merge_on_write(); + } + if (cntl.Failed()) { + return Status::InternalError("Failed to connect to backend {}: {}", node_info.id, + cntl.ErrorText()); + } + stream_pool.push_back(stream); + } + return Status::OK(); +} + +void VOlapTableSinkV2::_build_tablet_node_mapping() { + std::unordered_set known_indexes; + for (const auto& partition : _vpartition->get_partitions()) { + for (const auto& index : partition->indexes) { + for (const auto& tablet_id : index.tablets) { + auto nodes = _location->find_tablet(tablet_id)->node_ids; + for (auto& node : nodes) { + PTabletID tablet; + tablet.set_partition_id(partition->id); + tablet.set_index_id(index.index_id); + tablet.set_tablet_id(tablet_id); + _tablets_for_node[node].emplace_back(tablet); + if (known_indexes.contains(index.index_id)) [[likely]] { + continue; + } + _indexes_from_node[node].emplace_back(tablet); + known_indexes.insert(index.index_id); + } + } + } + } +} + +void VOlapTableSinkV2::_generate_rows_for_tablet(RowsForTablet& rows_for_tablet, + const VOlapTablePartition* partition, + uint32_t tablet_index, int row_idx) { + // Generate channel payload for sinking data to each tablet + for (const auto& index : partition->indexes) { + auto tablet_id = index.tablets[tablet_index]; + if (rows_for_tablet.count(tablet_id) == 0) { + Rows rows; + rows.partition_id = partition->id; + rows.index_id = index.index_id; + rows_for_tablet.insert({tablet_id, rows}); + } + rows_for_tablet[tablet_id].row_idxes.push_back(row_idx); + _number_output_rows++; + } +} + +Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, std::vector& streams) { + auto location = _location->find_tablet(tablet_id); + if (location == nullptr) { + return Status::InternalError("unknown tablet location, tablet id = {}", tablet_id); + } + for (auto& node_id : location->node_ids) { + streams.push_back(_stream_pool_for_node->at(node_id)[_stream_index]); + } + _stream_index = (_stream_index + 1) % config::num_streams_per_sink; + return Status::OK(); +} + +Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* input_block, bool eos) { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + Status status = Status::OK(); + + LOG(INFO) << "upstream id = " << state->backend_id(); + + auto input_rows = input_block->rows(); + auto input_bytes = input_block->bytes(); + if (UNLIKELY(input_rows == 0)) { + return status; + } + SCOPED_TIMER(_profile->total_time_counter()); + _number_input_rows += input_rows; + // update incrementally so that FE can get the progress. + // the real 'num_rows_load_total' will be set when sink being closed. + state->update_num_rows_load_total(input_rows); + state->update_num_bytes_load_total(input_bytes); + DorisMetrics::instance()->load_rows->increment(input_rows); + DorisMetrics::instance()->load_bytes->increment(input_bytes); + + std::shared_ptr block; + bool has_filtered_rows = false; + RETURN_IF_ERROR(_block_convertor->validate_and_convert_block( + state, input_block, block, _output_vexpr_ctxs, input_rows, eos, has_filtered_rows)); + + // clear and release the references of columns + input_block->clear(); + + SCOPED_RAW_TIMER(&_send_data_ns); + // This is just for passing compilation. + bool stop_processing = false; + RowsForTablet rows_for_tablet; + _tablet_finder->clear_for_new_batch(); + _row_distribution_watch.start(); + auto num_rows = block->rows(); + for (int i = 0; i < num_rows; ++i) { + if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_bitmap().Get(i)) { + continue; + } + const VOlapTablePartition* partition = nullptr; + bool is_continue = false; + uint32_t tablet_index = 0; + RETURN_IF_ERROR(_tablet_finder->find_tablet(state, block.get(), i, &partition, tablet_index, + stop_processing, is_continue)); + if (is_continue) { + continue; + } + _generate_rows_for_tablet(rows_for_tablet, partition, tablet_index, i); + } + _row_distribution_watch.stop(); + + // For each tablet, send its input_rows from block to delta writer + for (const auto& [tablet_id, rows] : rows_for_tablet) { + std::vector streams; + RETURN_IF_ERROR(_select_streams(tablet_id, streams)); + RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows, streams)); + } + + return Status::OK(); +} + +Status VOlapTableSinkV2::_write_memtable(std::shared_ptr block, + int64_t tablet_id, const Rows& rows, + const std::vector& streams) { + DeltaWriterV2* delta_writer = nullptr; + { + auto it = _delta_writer_for_tablet->find(tablet_id); + if (it == _delta_writer_for_tablet->end()) { + VLOG_DEBUG << "Creating DeltaWriterV2 for Tablet(tablet id: " << tablet_id + << ", index id: " << rows.index_id << ")"; + WriteRequest req; + req.partition_id = rows.partition_id; + req.index_id = rows.index_id; + req.tablet_id = tablet_id; + req.txn_id = _txn_id; + req.load_id = _load_id; + req.tuple_desc = _output_tuple_desc; + req.is_high_priority = _is_high_priority; + req.table_schema_param = _schema.get(); + req.tablet_schema = _tablet_schema_for_index[rows.index_id]; + req.enable_unique_key_merge_on_write = _enable_unique_mow_for_index[rows.index_id]; + req.sender_id = _sender_id; + req.streams = streams; + for (auto& index : _schema->indexes()) { + if (index->index_id == rows.index_id) { + req.slots = &index->slots; + req.schema_hash = index->schema_hash; + break; + } + } + DeltaWriterV2::open(&req, &delta_writer, _profile); + _delta_writer_for_tablet->insert( + {tablet_id, std::unique_ptr(delta_writer)}); + } else { + VLOG_DEBUG << "Reusing DeltaWriterV2 for Tablet(tablet id: " << tablet_id + << ", index id: " << rows.index_id << ")"; + delta_writer = it->second.get(); + } + } + { + SCOPED_TIMER(_wait_mem_limit_timer); + ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(); + } + SCOPED_TIMER(_write_memtable_timer); + auto st = delta_writer->write(block.get(), rows.row_idxes, false); + return st; +} + +Status VOlapTableSinkV2::_cancel(Status status) { + LOG(INFO) << "canceled olap table sink. load_id=" << print_id(_load_id) + << ", txn_id=" << _txn_id << ", due to error: " << status; + + if (_delta_writer_for_tablet.use_count() == 1) { + std::for_each(std::begin(*_delta_writer_for_tablet), std::end(*_delta_writer_for_tablet), + [&status](auto&& entry) { entry.second->cancel_with_status(status); }); + } + _delta_writer_for_tablet.reset(); + if (_stream_pool_for_node.use_count() == 1) { + std::for_each(std::begin(*_node_id_for_stream), std::end(*_node_id_for_stream), + [](auto&& entry) { brpc::StreamClose(entry.first); }); + } + _stream_pool_for_node.reset(); + return Status::OK(); +} + +Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) { + if (_closed) { + return _close_status; + } + SCOPED_TIMER(_close_timer); + Status status = exec_status; + if (status.ok()) { + // only if status is ok can we call this _profile->total_time_counter(). + // if status is not ok, this sink may not be prepared, so that _profile is null + SCOPED_TIMER(_profile->total_time_counter()); + + COUNTER_SET(_input_rows_counter, _number_input_rows); + COUNTER_SET(_output_rows_counter, _number_output_rows); + COUNTER_SET(_filtered_rows_counter, + _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows()); + COUNTER_SET(_send_data_timer, _send_data_ns); + COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time()); + COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns()); + + { + SCOPED_TIMER(_close_writer_timer); + // close all delta writers + if (_delta_writer_for_tablet.use_count() == 1) { + std::for_each(std::begin(*_delta_writer_for_tablet), + std::end(*_delta_writer_for_tablet), + [](auto&& entry) { entry.second->close(); }); + std::for_each(std::begin(*_delta_writer_for_tablet), + std::end(*_delta_writer_for_tablet), + [](auto&& entry) { entry.second->close_wait(); }); + } + _delta_writer_for_tablet.reset(); + } + + { + // send CLOSE_LOAD to all streams, return ERROR if any + RETURN_IF_ERROR(std::transform_reduce( + std::begin(*_node_id_for_stream), std::end(*_node_id_for_stream), Status::OK(), + [](Status& left, Status&& right) { return left.ok() ? right : left; }, + [this](auto&& entry) { return _close_load(entry.first); })); + } + + { + SCOPED_TIMER(_close_load_timer); + while (_pending_reports.load() > 0) { + // TODO: use a better wait + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + LOG(INFO) << "sinkv2 close_wait, pending reports: " << _pending_reports.load(); + } + } + + { + SCOPED_TIMER(_close_stream_timer); + // close streams + if (_stream_pool_for_node.use_count() == 1) { + std::for_each(std::begin(*_node_id_for_stream), std::end(*_node_id_for_stream), + [](auto&& entry) { brpc::StreamClose(entry.first); }); + } + _stream_pool_for_node.reset(); + } + + std::vector tablet_commit_infos; + for (auto& [tablet_id, backends] : _tablet_success_map) { + for (int64_t be_id : backends) { + TTabletCommitInfo commit_info; + commit_info.tabletId = tablet_id; + commit_info.backendId = be_id; + tablet_commit_infos.emplace_back(std::move(commit_info)); + } + } + state->tablet_commit_infos().insert(state->tablet_commit_infos().end(), + std::make_move_iterator(tablet_commit_infos.begin()), + std::make_move_iterator(tablet_commit_infos.end())); + + // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node + int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() + + state->num_rows_load_unselected(); + state->set_num_rows_load_total(num_rows_load_total); + state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + + _tablet_finder->num_filtered_rows()); + state->update_num_rows_load_unselected( + _tablet_finder->num_immutable_partition_filtered_rows()); + + LOG(INFO) << "finished to close olap table sink. load_id=" << print_id(_load_id) + << ", txn_id=" << _txn_id; + } else { + _cancel(status); + } + + _close_status = status; + DataSink::close(state, exec_status); + return status; +} + +Status VOlapTableSinkV2::_close_load(brpc::StreamId stream) { + butil::IOBuf buf; + PStreamHeader header; + *header.mutable_load_id() = _load_id; + header.set_src_id(_sender_id); + header.set_opcode(doris::PStreamHeader::CLOSE_LOAD); + auto node_id = _node_id_for_stream.get()->at(stream); + for (auto tablet : _tablets_for_node[node_id]) { + int64_t partition_id = tablet.partition_id(); + if (_tablet_finder->partition_ids().contains(tablet.partition_id()) || + _send_partitions_recorder[node_id].find(partition_id) == + _send_partitions_recorder[node_id].end()) { + PTabletID* tablet_to_commit = header.add_tablets_to_commit(); + *tablet_to_commit = tablet; + _send_partitions_recorder[node_id].insert(tablet.partition_id()); + } + } + size_t header_len = header.ByteSizeLong(); + buf.append(reinterpret_cast(&header_len), sizeof(header_len)); + buf.append(header.SerializeAsString()); + _pending_reports.fetch_add(1); + io::StreamSinkFileWriter::send_with_retry(stream, buf); + return Status::OK(); +} + +} // namespace stream_load +} // namespace doris diff --git a/be/src/vec/sink/vtablet_sink_v2.h b/be/src/vec/sink/vtablet_sink_v2.h new file mode 100644 index 0000000000..6c758d643c --- /dev/null +++ b/be/src/vec/sink/vtablet_sink_v2.h @@ -0,0 +1,241 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +// IWYU pragma: no_include +#include // IWYU pragma: keep +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/config.h" +#include "common/status.h" +#include "exec/data_sink.h" +#include "exec/tablet_info.h" +#include "gutil/ref_counted.h" +#include "runtime/exec_env.h" +#include "runtime/memory/mem_tracker.h" +#include "runtime/thread_context.h" +#include "runtime/types.h" +#include "util/countdown_latch.h" +#include "util/runtime_profile.h" +#include "util/stopwatch.hpp" +#include "vec/columns/column.h" +#include "vec/common/allocator.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type.h" +#include "vec/exprs/vexpr_fwd.h" + +namespace doris { +class DeltaWriterV2; +class ObjectPool; +class RowDescriptor; +class RuntimeState; +class TDataSink; +class TExpr; +class TabletSchema; +class TupleDescriptor; + +namespace stream_load { + +class OlapTableBlockConvertor; +class OlapTabletFinder; +class VOlapTableSinkV2; + +using DeltaWriterForTablet = std::unordered_map>; +using StreamPool = std::vector; +using StreamPoolForNode = std::unordered_map; +using NodeIdForStream = std::unordered_map; +using NodePartitionTabletMapping = + std::unordered_map>>; + +class StreamSinkHandler : public brpc::StreamInputHandler { +public: + StreamSinkHandler(VOlapTableSinkV2* sink) : _sink(sink) {} + + int on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[], + size_t size) override; + + void on_idle_timeout(brpc::StreamId id) override {} + + void on_closed(brpc::StreamId id) override; + +private: + VOlapTableSinkV2* _sink; +}; + +struct Rows { + int64_t partition_id; + int64_t index_id; + std::vector row_idxes; +}; + +using RowsForTablet = std::unordered_map; + +// Write block data to Olap Table. +// When OlapTableSink::open() called, there will be a consumer thread running in the background. +// When you call VOlapTableSinkV2::send(), you will be the producer who products pending batches. +// Join the consumer thread in close(). +class VOlapTableSinkV2 final : public DataSink { +public: + // Construct from thrift struct which is generated by FE. + VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& row_desc, + const std::vector& texprs, Status* status); + + ~VOlapTableSinkV2() override; + + Status init(const TDataSink& sink) override; + // TODO: unify the code of prepare/open/close with result sink + Status prepare(RuntimeState* state) override; + + Status open(RuntimeState* state) override; + + Status close(RuntimeState* state, Status close_status) override; + Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override; + + // Returns the runtime profile for the sink. + RuntimeProfile* profile() override { return _profile; } + +private: + Status _init_stream_pool(const NodeInfo& node_info, StreamPool& stream_pool); + + Status _init_stream_pools(); + + void _build_tablet_node_mapping(); + + void _generate_rows_for_tablet(RowsForTablet& rows_for_tablet, + const VOlapTablePartition* partition, uint32_t tablet_index, + int row_idx); + + Status _write_memtable(std::shared_ptr block, int64_t tablet_id, + const Rows& rows, const std::vector& streams); + + Status _select_streams(int64_t tablet_id, std::vector& streams); + + Status _close_load(brpc::StreamId stream); + + Status _cancel(Status status); + + std::shared_ptr _mem_tracker; + + ObjectPool* _pool; + + // unique load id + PUniqueId _load_id; + int64_t _txn_id = -1; + int _num_replicas = -1; + int _tuple_desc_id = -1; + + // this is tuple descriptor of destination OLAP table + TupleDescriptor* _output_tuple_desc = nullptr; + + // number of senders used to insert into OlapTable, if we only support single node insert, + // all data from select should collectted and then send to OlapTable. + // To support multiple senders, we maintain a channel for each sender. + int _sender_id = -1; + int _num_senders = -1; + bool _is_high_priority = false; + + // TODO(zc): think about cache this data + std::shared_ptr _schema; + std::unordered_map> _tablet_schema_for_index; + std::unordered_map _enable_unique_mow_for_index; + OlapTableLocationParam* _location = nullptr; + DorisNodesInfo* _nodes_info = nullptr; + + RuntimeProfile* _profile = nullptr; + + std::unique_ptr _tablet_finder; + + std::unique_ptr _block_convertor; + + // Stats for this + int64_t _send_data_ns = 0; + int64_t _number_input_rows = 0; + int64_t _number_output_rows = 0; + + MonotonicStopWatch _row_distribution_watch; + + RuntimeProfile::Counter* _input_rows_counter = nullptr; + RuntimeProfile::Counter* _output_rows_counter = nullptr; + RuntimeProfile::Counter* _filtered_rows_counter = nullptr; + RuntimeProfile::Counter* _send_data_timer = nullptr; + RuntimeProfile::Counter* _row_distribution_timer = nullptr; + RuntimeProfile::Counter* _write_memtable_timer = nullptr; + RuntimeProfile::Counter* _wait_mem_limit_timer = nullptr; + RuntimeProfile::Counter* _validate_data_timer = nullptr; + RuntimeProfile::Counter* _open_timer = nullptr; + RuntimeProfile::Counter* _close_timer = nullptr; + RuntimeProfile::Counter* _close_writer_timer = nullptr; + RuntimeProfile::Counter* _close_load_timer = nullptr; + RuntimeProfile::Counter* _close_stream_timer = nullptr; + + // Save the status of close() method + Status _close_status; + + VOlapTablePartitionParam* _vpartition = nullptr; + vectorized::VExprContextSPtrs _output_vexpr_ctxs; + + RuntimeState* _state = nullptr; + + std::unordered_set _opened_partitions; + + std::unordered_map> _tablets_for_node; + std::unordered_map> _indexes_from_node; + std::unordered_map> _send_partitions_recorder; + + std::shared_ptr _stream_pool_for_node; + std::shared_ptr _node_id_for_stream; + size_t _stream_index = 0; + std::shared_ptr _delta_writer_for_tablet; + + std::atomic _pending_reports {0}; + + std::unordered_map> _tablet_success_map; + std::unordered_map> _tablet_failure_map; + bthread::Mutex _tablet_success_map_mutex; + bthread::Mutex _tablet_failure_map_mutex; + + friend class StreamSinkHandler; +}; + +} // namespace stream_load +} // namespace doris