[feature](move-memtable)[5/7] add olap table sink v2 and writers (#23458)

Co-authored-by: laihui <1353307710@qq.com>
This commit is contained in:
Kaijie Chen
2023-08-25 10:20:06 +08:00
committed by GitHub
parent 2847c5e5b8
commit 0a70cbfe99
10 changed files with 1522 additions and 0 deletions

View File

@ -723,6 +723,11 @@ DEFINE_mInt32(mem_tracker_consume_min_size_bytes, "1048576");
// In most cases, it does not need to be modified.
DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1");
// number of brpc stream per OlapTableSinkV2
DEFINE_Int32(num_streams_per_sink, "5");
// timeout for open stream sink rpc in ms
DEFINE_Int64(open_stream_sink_timeout_ms, "500");
// max send batch parallelism for OlapTableSink
// The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job,
// if exceed, the value of send_batch_parallelism would be max_send_batch_parallelism_per_job

View File

@ -778,6 +778,11 @@ DECLARE_mInt32(mem_tracker_consume_min_size_bytes);
// In most cases, it does not need to be modified.
DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio);
// number of brpc stream per OlapTableSinkV2
DECLARE_Int32(num_streams_per_sink);
// timeout for open stream sink rpc in ms
DECLARE_Int64(open_stream_sink_timeout_ms);
// max send batch parallelism for OlapTableSink
// The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job,
// if exceed, the value of send_batch_parallelism would be max_send_batch_parallelism_per_job

View File

@ -17,6 +17,7 @@
#pragma once
#include <brpc/stream.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/types.pb.h>
@ -27,6 +28,7 @@ namespace doris {
class TupleDescriptor;
class SlotDescriptor;
class OlapTableSchemaParam;
class TabletSchema;
struct WriteRequest {
int64_t tablet_id;
@ -40,6 +42,11 @@ struct WriteRequest {
bool is_high_priority = false;
OlapTableSchemaParam* table_schema_param;
int64_t index_id = 0;
// for DeltaWriterV2
std::shared_ptr<TabletSchema> tablet_schema;
bool enable_unique_key_merge_on_write = false;
int sender_id = 0;
std::vector<brpc::StreamId> streams;
};
} // namespace doris

View File

@ -0,0 +1,226 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/delta_writer_v2.h"
#include <brpc/controller.h>
#include <butil/errno.h>
#include <fmt/format.h>
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <filesystem>
#include <ostream>
#include <string>
#include <utility>
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "exec/tablet_info.h"
#include "gutil/integral_types.h"
#include "gutil/strings/numbers.h"
#include "io/fs/file_writer.h" // IWYU pragma: keep
#include "olap/data_dir.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/beta_rowset_writer_v2.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/schema.h"
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "runtime/exec_env.h"
#include "service/backend_options.h"
#include "util/brpc_client_cache.h"
#include "util/mem_info.h"
#include "util/ref_count_closure.h"
#include "util/stopwatch.hpp"
#include "util/time.h"
#include "vec/core/block.h"
namespace doris {
using namespace ErrorCode;
Status DeltaWriterV2::open(WriteRequest* req, DeltaWriterV2** writer, RuntimeProfile* profile) {
*writer = new DeltaWriterV2(req, StorageEngine::instance(), profile);
return Status::OK();
}
DeltaWriterV2::DeltaWriterV2(WriteRequest* req, StorageEngine* storage_engine,
RuntimeProfile* profile)
: _req(*req),
_tablet_schema(new TabletSchema),
_profile(profile->create_child(fmt::format("DeltaWriterV2 {}", _req.tablet_id), true,
true)),
_memtable_writer(new MemTableWriter(*req)),
_streams(req->streams) {
_init_profile(profile);
}
void DeltaWriterV2::_init_profile(RuntimeProfile* profile) {
_write_memtable_timer = ADD_TIMER(_profile, "WriteMemTableTime");
_close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
}
DeltaWriterV2::~DeltaWriterV2() {
if (!_is_init) {
return;
}
// cancel and wait all memtables in flush queue to be finished
_memtable_writer->cancel();
}
Status DeltaWriterV2::init() {
if (_is_init) {
return Status::OK();
}
// build tablet schema in request level
_build_current_tablet_schema(_req.index_id, _req.table_schema_param, *_req.tablet_schema.get());
RowsetWriterContext context;
context.txn_id = _req.txn_id;
context.load_id = _req.load_id;
context.index_id = _req.index_id;
context.partition_id = _req.partition_id;
context.rowset_state = PREPARED;
context.segments_overlap = OVERLAPPING;
context.tablet_schema = _tablet_schema;
context.newest_write_timestamp = UnixSeconds();
context.tablet = nullptr;
context.write_type = DataWriteType::TYPE_DIRECT;
context.tablet_id = _req.tablet_id;
context.partition_id = _req.partition_id;
context.tablet_schema_hash = _req.schema_hash;
context.enable_unique_key_merge_on_write = _req.enable_unique_key_merge_on_write;
context.rowset_type = RowsetTypePB::BETA_ROWSET;
context.rowset_id = StorageEngine::instance()->next_rowset_id();
context.data_dir = nullptr;
context.sender_id = _req.sender_id;
_rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams);
_rowset_writer->init(context);
_memtable_writer->init(_rowset_writer, _tablet_schema, _req.enable_unique_key_merge_on_write);
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
return Status::OK();
}
Status DeltaWriterV2::append(const vectorized::Block* block) {
return write(block, {}, true);
}
Status DeltaWriterV2::write(const vectorized::Block* block, const std::vector<int>& row_idxs,
bool is_append) {
if (UNLIKELY(row_idxs.empty() && !is_append)) {
return Status::OK();
}
_lock_watch.start();
std::lock_guard<std::mutex> l(_lock);
_lock_watch.stop();
if (!_is_init && !_is_cancelled) {
RETURN_IF_ERROR(init());
}
SCOPED_TIMER(_write_memtable_timer);
return _memtable_writer->write(block, row_idxs, is_append);
}
Status DeltaWriterV2::close() {
_lock_watch.start();
std::lock_guard<std::mutex> l(_lock);
_lock_watch.stop();
if (!_is_init && !_is_cancelled) {
// if this delta writer is not initialized, but close() is called.
// which means this tablet has no data loaded, but at least one tablet
// in same partition has data loaded.
// so we have to also init this DeltaWriterV2, so that it can create an empty rowset
// for this tablet when being closed.
RETURN_IF_ERROR(init());
}
return _memtable_writer->close();
}
Status DeltaWriterV2::close_wait() {
SCOPED_TIMER(_close_wait_timer);
std::lock_guard<std::mutex> l(_lock);
DCHECK(_is_init)
<< "delta writer is supposed be to initialized before close_wait() being called";
RETURN_IF_ERROR(_memtable_writer->close_wait(_profile));
_delta_written_success = true;
return Status::OK();
}
Status DeltaWriterV2::cancel() {
return cancel_with_status(Status::Cancelled("already cancelled"));
}
Status DeltaWriterV2::cancel_with_status(const Status& st) {
std::lock_guard<std::mutex> l(_lock);
if (_is_cancelled) {
return Status::OK();
}
RETURN_IF_ERROR(_memtable_writer->cancel_with_status(st));
_is_cancelled = true;
return Status::OK();
}
int64_t DeltaWriterV2::mem_consumption(MemType mem) {
return _memtable_writer->mem_consumption(mem);
}
int64_t DeltaWriterV2::active_memtable_mem_consumption() {
return _memtable_writer->active_memtable_mem_consumption();
}
int64_t DeltaWriterV2::partition_id() const {
return _req.partition_id;
}
void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
const OlapTableSchemaParam* table_schema_param,
const TabletSchema& ori_tablet_schema) {
_tablet_schema->copy_from(ori_tablet_schema);
// find the right index id
int i = 0;
auto indexes = table_schema_param->indexes();
for (; i < indexes.size(); i++) {
if (indexes[i]->index_id == index_id) {
break;
}
}
if (indexes.size() > 0 && indexes[i]->columns.size() != 0 &&
indexes[i]->columns[0]->unique_id() >= 0) {
_tablet_schema->build_current_tablet_schema(index_id, table_schema_param->version(),
indexes[i], ori_tablet_schema);
}
_tablet_schema->set_table_id(table_schema_param->table_id());
// set partial update columns info
_tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns());
}
} // namespace doris

View File

@ -0,0 +1,128 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <brpc/stream.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/types.pb.h>
#include <stdint.h>
#include <atomic>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <unordered_set>
#include <vector>
#include "common/status.h"
#include "olap/delta_writer_context.h"
#include "olap/memtable_writer.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
#include "util/spinlock.h"
#include "util/uid_util.h"
namespace doris {
class FlushToken;
class MemTable;
class MemTracker;
class Schema;
class StorageEngine;
class TupleDescriptor;
class SlotDescriptor;
class OlapTableSchemaParam;
class BetaRowsetWriterV2;
namespace vectorized {
class Block;
} // namespace vectorized
// Writer for a particular (load, index, tablet).
// This class is NOT thread-safe, external synchronization is required.
class DeltaWriterV2 {
public:
static Status open(WriteRequest* req, DeltaWriterV2** writer, RuntimeProfile* profile);
~DeltaWriterV2();
Status init();
Status write(const vectorized::Block* block, const std::vector<int>& row_idxs,
bool is_append = false);
Status append(const vectorized::Block* block);
// flush the last memtable to flush queue, must call it before close_wait()
Status close();
// wait for all memtables to be flushed.
// mem_consumption() should be 0 after this function returns.
Status close_wait();
// abandon current memtable and wait for all pending-flushing memtables to be destructed.
// mem_consumption() should be 0 after this function returns.
Status cancel();
Status cancel_with_status(const Status& st);
int64_t partition_id() const;
int64_t mem_consumption(MemType mem);
int64_t active_memtable_mem_consumption();
int64_t tablet_id() { return _req.tablet_id; }
int32_t schema_hash() { return _req.schema_hash; }
int64_t total_received_rows() const { return _total_received_rows; }
private:
DeltaWriterV2(WriteRequest* req, StorageEngine* storage_engine, RuntimeProfile* profile);
void _build_current_tablet_schema(int64_t index_id,
const OlapTableSchemaParam* table_schema_param,
const TabletSchema& ori_tablet_schema);
void _init_profile(RuntimeProfile* profile);
bool _is_init = false;
bool _is_cancelled = false;
WriteRequest _req;
std::shared_ptr<BetaRowsetWriterV2> _rowset_writer;
TabletSchemaSPtr _tablet_schema;
bool _delta_written_success = false;
std::mutex _lock;
// total rows num written by DeltaWriterV2
int64_t _total_received_rows = 0;
RuntimeProfile* _profile = nullptr;
RuntimeProfile::Counter* _write_memtable_timer = nullptr;
RuntimeProfile::Counter* _close_wait_timer = nullptr;
std::shared_ptr<MemTableWriter> _memtable_writer;
MonotonicStopWatch _lock_watch;
std::vector<brpc::StreamId> _streams;
};
} // namespace doris

View File

@ -0,0 +1,138 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/rowset/beta_rowset_writer_v2.h"
#include <assert.h>
// IWYU pragma: no_include <bthread/errno.h>
#include <errno.h> // IWYU pragma: keep
#include <stdio.h>
#include <ctime> // time
#include <filesystem>
#include <memory>
#include <sstream>
#include <utility>
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
#include "gutil/integral_types.h"
#include "gutil/strings/substitute.h"
#include "io/fs/file_reader_options.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "io/fs/stream_sink_file_writer.h"
#include "olap/data_dir.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/segment_v2/inverted_index_cache.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/rowset/segment_v2/segment_writer.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_schema.h"
#include "util/slice.h"
#include "util/time.h"
#include "vec/common/schema_util.h" // LocalSchemaChangeRecorder
#include "vec/core/block.h"
namespace doris {
using namespace ErrorCode;
BetaRowsetWriterV2::BetaRowsetWriterV2(const std::vector<brpc::StreamId>& streams)
: _next_segment_id(0),
_num_segment(0),
_num_rows_written(0),
_total_data_size(0),
_total_index_size(0),
_streams(streams) {}
BetaRowsetWriterV2::~BetaRowsetWriterV2() = default;
Status BetaRowsetWriterV2::init(const RowsetWriterContext& rowset_writer_context) {
_context = rowset_writer_context;
_context.segment_collector = std::make_shared<SegmentCollectorT<BetaRowsetWriterV2>>(this);
_context.file_writer_creator = std::make_shared<FileWriterCreatorT<BetaRowsetWriterV2>>(this);
_segment_creator.init(_context);
return Status::OK();
}
Status BetaRowsetWriterV2::create_file_writer(uint32_t segment_id, io::FileWriterPtr& file_writer) {
auto partition_id = _context.partition_id;
auto sender_id = _context.sender_id;
auto index_id = _context.index_id;
auto tablet_id = _context.tablet_id;
auto load_id = _context.load_id;
auto stream_writer = std::make_unique<io::StreamSinkFileWriter>(sender_id, _streams);
stream_writer->init(load_id, partition_id, index_id, tablet_id, segment_id);
file_writer = std::move(stream_writer);
return Status::OK();
}
Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, SegmentStatistics& segstat) {
butil::IOBuf buf;
PStreamHeader header;
header.set_src_id(_context.sender_id);
*header.mutable_load_id() = _context.load_id;
header.set_partition_id(_context.partition_id);
header.set_index_id(_context.index_id);
header.set_tablet_id(_context.tablet_id);
header.set_segment_id(segment_id);
header.set_opcode(doris::PStreamHeader::ADD_SEGMENT);
segstat.to_pb(header.mutable_segment_statistics());
size_t header_len = header.ByteSizeLong();
buf.append(reinterpret_cast<uint8_t*>(&header_len), sizeof(header_len));
buf.append(header.SerializeAsString());
for (const auto& stream : _streams) {
io::StreamSinkFileWriter::send_with_retry(stream, buf);
}
return Status::OK();
}
Status BetaRowsetWriterV2::flush_memtable(vectorized::Block* block, int32_t segment_id,
int64_t* flush_size) {
if (block->rows() == 0) {
return Status::OK();
}
TabletSchemaSPtr flush_schema;
/* TODO: support dynamic schema
if (_context.tablet_schema->is_dynamic_schema()) {
// Unfold variant column
RETURN_IF_ERROR(_unfold_variant_column(*block, flush_schema));
}
*/
{
SCOPED_RAW_TIMER(&_segment_writer_ns);
RETURN_IF_ERROR(
_segment_creator.flush_single_block(block, segment_id, flush_size, flush_schema));
}
// delete bitmap and seg compaction are done on the destination BE.
return Status::OK();
}
Status BetaRowsetWriterV2::flush_single_block(const vectorized::Block* block) {
return _segment_creator.flush_single_block(block);
}
} // namespace doris

View File

@ -0,0 +1,166 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <fmt/format.h>
#include <gen_cpp/olap_file.pb.h>
#include <stddef.h>
#include <stdint.h>
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <roaring/roaring.hh>
#include <string>
#include <unordered_set>
#include <vector>
#include "brpc/controller.h"
#include "brpc/stream.h"
#include "common/status.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/rowset/segment_creator.h"
#include "segment_v2/segment.h"
#include "util/spinlock.h"
namespace doris {
namespace vectorized {
class Block;
} // namespace vectorized
namespace segment_v2 {
class SegmentWriter;
} // namespace segment_v2
namespace vectorized::schema_util {
class LocalSchemaChangeRecorder;
}
class BetaRowsetWriterV2 : public RowsetWriter {
public:
BetaRowsetWriterV2(const std::vector<brpc::StreamId>& streams);
~BetaRowsetWriterV2() override;
Status init(const RowsetWriterContext& rowset_writer_context) override;
Status add_block(const vectorized::Block* block) override {
return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>("add_block is not implemented");
}
// add rowset by create hard link
Status add_rowset(RowsetSharedPtr rowset) override {
return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>("add_rowset is not implemented");
}
Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) override {
return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(
"add_rowset_for_linked_schema_change is not implemented");
}
Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer) override;
Status flush() override {
return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>("flush is not implemented");
}
Status flush_memtable(vectorized::Block* block, int32_t segment_id,
int64_t* flush_size) override;
// Return the file size flushed to disk in "flush_size"
// This method is thread-safe.
Status flush_single_block(const vectorized::Block* block) override;
RowsetSharedPtr build() override { return nullptr; };
RowsetSharedPtr manual_build(const RowsetMetaSharedPtr& rowset_meta) override {
LOG(FATAL) << "not implemeted";
return nullptr;
}
Version version() override { return _context.version; }
int64_t num_rows() const override { return _segment_creator.num_rows_written(); }
int64_t num_rows_filtered() const override { return _segment_creator.num_rows_filtered(); }
RowsetId rowset_id() override { return _context.rowset_id; }
RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; }
Status get_segment_num_rows(std::vector<uint32_t>* segment_num_rows) const override {
std::lock_guard<SpinLock> l(_lock);
*segment_num_rows = _segment_num_rows;
return Status::OK();
}
Status add_segment(uint32_t segment_id, SegmentStatistics& segstat) override;
int32_t allocate_segment_id() override { return _next_segment_id.fetch_add(1); };
bool is_doing_segcompaction() const override { return false; }
Status wait_flying_segcompaction() override { return Status::OK(); }
int64_t delete_bitmap_ns() override { return _delete_bitmap_ns; }
int64_t segment_writer_ns() override { return _segment_writer_ns; }
private:
RowsetWriterContext _context;
std::atomic<int32_t> _next_segment_id; // the next available segment_id (offset),
// also the numer of allocated segments
std::atomic<int32_t> _num_segment; // number of consecutive flushed segments
roaring::Roaring _segment_set; // bitmap set to record flushed segment id
std::mutex _segment_set_mutex; // mutex for _segment_set
mutable SpinLock _lock; // protect following vectors.
// record rows number of every segment already written, using for rowid
// conversion when compaction in unique key with MoW model
std::vector<uint32_t> _segment_num_rows;
std::vector<io::FileWriterPtr> _file_writers;
// for unique key table with merge-on-write
std::vector<KeyBoundsPB> _segments_encoded_key_bounds;
// counters and statistics maintained during add_rowset
std::atomic<int64_t> _num_rows_written;
std::atomic<int64_t> _total_data_size;
std::atomic<int64_t> _total_index_size;
// TODO rowset Zonemap
SegmentCreator _segment_creator;
fmt::memory_buffer vlog_buffer;
std::vector<brpc::StreamId> _streams;
int64_t _delete_bitmap_ns = 0;
int64_t _segment_writer_ns = 0;
};
} // namespace doris

View File

@ -40,10 +40,12 @@ struct RowsetWriterContext {
RowsetWriterContext()
: tablet_id(0),
tablet_schema_hash(0),
index_id(0),
partition_id(0),
rowset_type(BETA_ROWSET),
rowset_state(PREPARED),
version(Version(0, 0)),
sender_id(0),
txn_id(0),
tablet_uid(0, 0),
segments_overlap(OVERLAP_UNKNOWN) {
@ -54,6 +56,7 @@ struct RowsetWriterContext {
RowsetId rowset_id;
int64_t tablet_id;
int64_t tablet_schema_hash;
int64_t index_id;
int64_t partition_id;
RowsetTypePB rowset_type;
io::FileSystemSPtr fs;
@ -65,6 +68,8 @@ struct RowsetWriterContext {
// properties for non-pending rowset
Version version;
int sender_id;
// properties for pending rowset
int64_t txn_id;
PUniqueId load_id;

View File

@ -0,0 +1,601 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/sink/vtablet_sink_v2.h"
#include <brpc/uri.h>
#include <bthread/bthread.h>
#include <fmt/format.h>
#include <gen_cpp/DataSinks_types.h>
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <opentelemetry/nostd/shared_ptr.h>
#include <algorithm>
#include <execution>
#include <mutex>
#include <string>
#include <unordered_map>
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/logging.h"
#include "common/object_pool.h"
#include "common/status.h"
#include "exec/tablet_info.h"
#include "io/fs/stream_sink_file_writer.h"
#include "olap/delta_writer_v2.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "service/brpc.h"
#include "util/brpc_client_cache.h"
#include "util/doris_metrics.h"
#include "util/network_util.h"
#include "util/telemetry/telemetry.h"
#include "util/threadpool.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr.h"
#include "vec/sink/vtablet_block_convertor.h"
#include "vec/sink/vtablet_finder.h"
namespace doris {
class TExpr;
namespace stream_load {
int StreamSinkHandler::on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[],
size_t size) {
int64_t backend_id = _sink->_node_id_for_stream->at(id);
for (size_t i = 0; i < size; i++) {
butil::IOBufAsZeroCopyInputStream wrapper(*messages[i]);
PWriteStreamSinkResponse response;
response.ParseFromZeroCopyStream(&wrapper);
Status st = Status::create(response.status());
std::stringstream ss;
ss << "received response from backend " << backend_id << ", status: " << st
<< ", success tablet ids:";
for (auto tablet_id : response.success_tablet_ids()) {
ss << " " << tablet_id;
}
ss << ", failed tablet ids:";
for (auto tablet_id : response.failed_tablet_ids()) {
ss << " " << tablet_id;
}
LOG(INFO) << ss.str();
int replica = _sink->_num_replicas;
{
std::lock_guard<bthread::Mutex> l(_sink->_tablet_success_map_mutex);
for (auto tablet_id : response.success_tablet_ids()) {
if (_sink->_tablet_success_map.count(tablet_id) == 0) {
_sink->_tablet_success_map.insert({tablet_id, {}});
}
_sink->_tablet_success_map[tablet_id].push_back(backend_id);
}
}
{
std::lock_guard<bthread::Mutex> l(_sink->_tablet_failure_map_mutex);
for (auto tablet_id : response.failed_tablet_ids()) {
if (_sink->_tablet_failure_map.count(tablet_id) == 0) {
_sink->_tablet_failure_map.insert({tablet_id, {}});
}
_sink->_tablet_failure_map[tablet_id].push_back(backend_id);
if (_sink->_tablet_failure_map[tablet_id].size() * 2 >= replica) {
_sink->_cancel(Status::Cancelled(
"Failed to meet num replicas requirements for tablet {}", tablet_id));
break;
}
}
}
if (response.has_load_stream_profile()) {
TRuntimeProfileTree tprofile;
const uint8_t* buf =
reinterpret_cast<const uint8_t*>(response.load_stream_profile().data());
uint32_t len = response.load_stream_profile().size();
auto status = deserialize_thrift_msg(buf, &len, false, &tprofile);
if (status.ok()) {
_sink->_state->load_channel_profile()->update(tprofile);
} else {
LOG(WARNING) << "load channel TRuntimeProfileTree deserialize failed, errmsg="
<< status;
}
}
_sink->_pending_reports.fetch_add(-1);
}
return 0;
}
void StreamSinkHandler::on_closed(brpc::StreamId id) {}
VOlapTableSinkV2::VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& row_desc,
const std::vector<TExpr>& texprs, Status* status)
: DataSink(row_desc), _pool(pool) {
// From the thrift expressions create the real exprs.
*status = vectorized::VExpr::create_expr_trees(texprs, _output_vexpr_ctxs);
_name = "VOlapTableSinkV2";
}
VOlapTableSinkV2::~VOlapTableSinkV2() = default;
Status VOlapTableSinkV2::init(const TDataSink& t_sink) {
DCHECK(t_sink.__isset.olap_table_sink);
auto& table_sink = t_sink.olap_table_sink;
_load_id.set_hi(table_sink.load_id.hi);
_load_id.set_lo(table_sink.load_id.lo);
_txn_id = table_sink.txn_id;
_num_replicas = table_sink.num_replicas;
_tuple_desc_id = table_sink.tuple_id;
_schema.reset(new OlapTableSchemaParam());
RETURN_IF_ERROR(_schema->init(table_sink.schema));
_location = _pool->add(new OlapTableLocationParam(table_sink.location));
_nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info));
// if distributed column list is empty, we can ensure that tablet is with random distribution info
// and if load_to_single_tablet is set and set to true, we should find only one tablet in one partition
// for the whole olap table sink
auto find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW;
if (table_sink.partition.distributed_columns.empty()) {
if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) {
find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_SINK;
} else {
find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_BATCH;
}
}
_vpartition = _pool->add(new doris::VOlapTablePartitionParam(_schema, table_sink.partition));
_tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition, find_tablet_mode);
return _vpartition->init();
}
Status VOlapTableSinkV2::prepare(RuntimeState* state) {
RETURN_IF_ERROR(DataSink::prepare(state));
_state = state;
_sender_id = state->per_fragment_instance_idx();
_num_senders = state->num_per_fragment_instances();
_is_high_priority =
(state->execution_timeout() <= config::load_task_high_priority_threshold_second);
// profile must add to state's object pool
_profile = state->obj_pool()->add(new RuntimeProfile("VOlapTableSinkV2"));
_mem_tracker = std::make_shared<MemTracker>("VOlapTableSinkV2:" +
std::to_string(state->load_job_id()));
SCOPED_TIMER(_profile->total_time_counter());
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
// get table's tuple descriptor
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id);
if (_output_tuple_desc == nullptr) {
return Status::InternalError("unknown destination tuple descriptor, id = {}",
_tuple_desc_id);
}
_block_convertor = std::make_unique<OlapTableBlockConvertor>(_output_tuple_desc);
// add all counter
_input_rows_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT);
_output_rows_counter = ADD_COUNTER(_profile, "RowsReturned", TUnit::UNIT);
_filtered_rows_counter = ADD_COUNTER(_profile, "RowsFiltered", TUnit::UNIT);
_send_data_timer = ADD_TIMER(_profile, "SendDataTime");
_wait_mem_limit_timer = ADD_CHILD_TIMER(_profile, "WaitMemLimitTime", "SendDataTime");
_row_distribution_timer = ADD_CHILD_TIMER(_profile, "RowDistributionTime", "SendDataTime");
_write_memtable_timer = ADD_CHILD_TIMER(_profile, "WriteMemTableTime", "SendDataTime");
_validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime");
_open_timer = ADD_TIMER(_profile, "OpenTime");
_close_timer = ADD_TIMER(_profile, "CloseWaitTime");
_close_writer_timer = ADD_CHILD_TIMER(_profile, "CloseWriterTime", "CloseWaitTime");
_close_load_timer = ADD_CHILD_TIMER(_profile, "CloseLoadTime", "CloseWaitTime");
_close_stream_timer = ADD_CHILD_TIMER(_profile, "CloseStreamTime", "CloseWaitTime");
// Prepare the exprs to run.
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
return Status::OK();
}
Status VOlapTableSinkV2::open(RuntimeState* state) {
// Prepare the exprs to run.
RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state));
SCOPED_TIMER(_profile->total_time_counter());
SCOPED_TIMER(_open_timer);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
_stream_pool_for_node = std::make_shared<StreamPoolForNode>();
_node_id_for_stream = std::make_shared<NodeIdForStream>();
_delta_writer_for_tablet = std::make_shared<DeltaWriterForTablet>();
_build_tablet_node_mapping();
RETURN_IF_ERROR(_init_stream_pools());
return Status::OK();
}
Status VOlapTableSinkV2::_init_stream_pools() {
for (auto& [node_id, _] : _tablets_for_node) {
auto node_info = _nodes_info->find_node(node_id);
if (node_info == nullptr) {
return Status::InternalError("Unknown node {} in tablet location", node_id);
}
_stream_pool_for_node->insert({node_id, StreamPool {}});
StreamPool& stream_pool = _stream_pool_for_node->at(node_id);
RETURN_IF_ERROR(_init_stream_pool(*node_info, stream_pool));
for (auto stream : stream_pool) {
_node_id_for_stream->insert({stream, node_id});
}
}
return Status::OK();
}
Status VOlapTableSinkV2::_init_stream_pool(const NodeInfo& node_info, StreamPool& stream_pool) {
DCHECK_GT(config::num_streams_per_sink, 0);
stream_pool.reserve(config::num_streams_per_sink);
for (int i = 0; i < config::num_streams_per_sink; ++i) {
brpc::StreamOptions opt;
opt.max_buf_size = 20 << 20; // 20MB
opt.idle_timeout_ms = 30000;
opt.messages_in_batch = 128;
opt.handler = new StreamSinkHandler(this);
brpc::StreamId stream;
brpc::Controller cntl;
if (int ret = StreamCreate(&stream, cntl, &opt)) {
return Status::RpcError("Failed to create stream, code = {}", ret);
}
LOG(INFO) << "Created stream " << stream << " for backend " << node_info.id << " ("
<< node_info.host << ":" << node_info.brpc_port << ")";
std::string host_port = get_host_port(node_info.host, node_info.brpc_port);
// use "pooled" connection to avoid conflicts between streaming rpc and regular rpc,
// see: https://github.com/apache/brpc/issues/392
const auto& stub =
_state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(
host_port, "baidu_std", "pooled");
POpenStreamSinkRequest request;
*request.mutable_load_id() = _load_id;
request.set_src_id(_sender_id);
request.set_txn_id(_txn_id);
request.set_enable_profile(_state->enable_profile());
_schema->to_protobuf(request.mutable_schema());
if (i == 0) {
// get tablet schema from each backend only in the 1st stream
for (auto& tablet : _indexes_from_node[node_info.id]) {
auto req = request.add_tablets();
*req = tablet;
}
}
POpenStreamSinkResponse response;
cntl.set_timeout_ms(config::open_stream_sink_timeout_ms);
stub->open_stream_sink(&cntl, &request, &response, nullptr);
for (const auto& resp : response.tablet_schemas()) {
auto tablet_schema = std::make_shared<TabletSchema>();
tablet_schema->init_from_pb(resp.tablet_schema());
_tablet_schema_for_index[resp.index_id()] = tablet_schema;
_enable_unique_mow_for_index[resp.index_id()] = resp.enable_unique_key_merge_on_write();
}
if (cntl.Failed()) {
return Status::InternalError("Failed to connect to backend {}: {}", node_info.id,
cntl.ErrorText());
}
stream_pool.push_back(stream);
}
return Status::OK();
}
void VOlapTableSinkV2::_build_tablet_node_mapping() {
std::unordered_set<int64_t> known_indexes;
for (const auto& partition : _vpartition->get_partitions()) {
for (const auto& index : partition->indexes) {
for (const auto& tablet_id : index.tablets) {
auto nodes = _location->find_tablet(tablet_id)->node_ids;
for (auto& node : nodes) {
PTabletID tablet;
tablet.set_partition_id(partition->id);
tablet.set_index_id(index.index_id);
tablet.set_tablet_id(tablet_id);
_tablets_for_node[node].emplace_back(tablet);
if (known_indexes.contains(index.index_id)) [[likely]] {
continue;
}
_indexes_from_node[node].emplace_back(tablet);
known_indexes.insert(index.index_id);
}
}
}
}
}
void VOlapTableSinkV2::_generate_rows_for_tablet(RowsForTablet& rows_for_tablet,
const VOlapTablePartition* partition,
uint32_t tablet_index, int row_idx) {
// Generate channel payload for sinking data to each tablet
for (const auto& index : partition->indexes) {
auto tablet_id = index.tablets[tablet_index];
if (rows_for_tablet.count(tablet_id) == 0) {
Rows rows;
rows.partition_id = partition->id;
rows.index_id = index.index_id;
rows_for_tablet.insert({tablet_id, rows});
}
rows_for_tablet[tablet_id].row_idxes.push_back(row_idx);
_number_output_rows++;
}
}
Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, std::vector<brpc::StreamId>& streams) {
auto location = _location->find_tablet(tablet_id);
if (location == nullptr) {
return Status::InternalError("unknown tablet location, tablet id = {}", tablet_id);
}
for (auto& node_id : location->node_ids) {
streams.push_back(_stream_pool_for_node->at(node_id)[_stream_index]);
}
_stream_index = (_stream_index + 1) % config::num_streams_per_sink;
return Status::OK();
}
Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* input_block, bool eos) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Status status = Status::OK();
LOG(INFO) << "upstream id = " << state->backend_id();
auto input_rows = input_block->rows();
auto input_bytes = input_block->bytes();
if (UNLIKELY(input_rows == 0)) {
return status;
}
SCOPED_TIMER(_profile->total_time_counter());
_number_input_rows += input_rows;
// update incrementally so that FE can get the progress.
// the real 'num_rows_load_total' will be set when sink being closed.
state->update_num_rows_load_total(input_rows);
state->update_num_bytes_load_total(input_bytes);
DorisMetrics::instance()->load_rows->increment(input_rows);
DorisMetrics::instance()->load_bytes->increment(input_bytes);
std::shared_ptr<vectorized::Block> block;
bool has_filtered_rows = false;
RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
state, input_block, block, _output_vexpr_ctxs, input_rows, eos, has_filtered_rows));
// clear and release the references of columns
input_block->clear();
SCOPED_RAW_TIMER(&_send_data_ns);
// This is just for passing compilation.
bool stop_processing = false;
RowsForTablet rows_for_tablet;
_tablet_finder->clear_for_new_batch();
_row_distribution_watch.start();
auto num_rows = block->rows();
for (int i = 0; i < num_rows; ++i) {
if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_bitmap().Get(i)) {
continue;
}
const VOlapTablePartition* partition = nullptr;
bool is_continue = false;
uint32_t tablet_index = 0;
RETURN_IF_ERROR(_tablet_finder->find_tablet(state, block.get(), i, &partition, tablet_index,
stop_processing, is_continue));
if (is_continue) {
continue;
}
_generate_rows_for_tablet(rows_for_tablet, partition, tablet_index, i);
}
_row_distribution_watch.stop();
// For each tablet, send its input_rows from block to delta writer
for (const auto& [tablet_id, rows] : rows_for_tablet) {
std::vector<brpc::StreamId> streams;
RETURN_IF_ERROR(_select_streams(tablet_id, streams));
RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows, streams));
}
return Status::OK();
}
Status VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> block,
int64_t tablet_id, const Rows& rows,
const std::vector<brpc::StreamId>& streams) {
DeltaWriterV2* delta_writer = nullptr;
{
auto it = _delta_writer_for_tablet->find(tablet_id);
if (it == _delta_writer_for_tablet->end()) {
VLOG_DEBUG << "Creating DeltaWriterV2 for Tablet(tablet id: " << tablet_id
<< ", index id: " << rows.index_id << ")";
WriteRequest req;
req.partition_id = rows.partition_id;
req.index_id = rows.index_id;
req.tablet_id = tablet_id;
req.txn_id = _txn_id;
req.load_id = _load_id;
req.tuple_desc = _output_tuple_desc;
req.is_high_priority = _is_high_priority;
req.table_schema_param = _schema.get();
req.tablet_schema = _tablet_schema_for_index[rows.index_id];
req.enable_unique_key_merge_on_write = _enable_unique_mow_for_index[rows.index_id];
req.sender_id = _sender_id;
req.streams = streams;
for (auto& index : _schema->indexes()) {
if (index->index_id == rows.index_id) {
req.slots = &index->slots;
req.schema_hash = index->schema_hash;
break;
}
}
DeltaWriterV2::open(&req, &delta_writer, _profile);
_delta_writer_for_tablet->insert(
{tablet_id, std::unique_ptr<DeltaWriterV2>(delta_writer)});
} else {
VLOG_DEBUG << "Reusing DeltaWriterV2 for Tablet(tablet id: " << tablet_id
<< ", index id: " << rows.index_id << ")";
delta_writer = it->second.get();
}
}
{
SCOPED_TIMER(_wait_mem_limit_timer);
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
}
SCOPED_TIMER(_write_memtable_timer);
auto st = delta_writer->write(block.get(), rows.row_idxes, false);
return st;
}
Status VOlapTableSinkV2::_cancel(Status status) {
LOG(INFO) << "canceled olap table sink. load_id=" << print_id(_load_id)
<< ", txn_id=" << _txn_id << ", due to error: " << status;
if (_delta_writer_for_tablet.use_count() == 1) {
std::for_each(std::begin(*_delta_writer_for_tablet), std::end(*_delta_writer_for_tablet),
[&status](auto&& entry) { entry.second->cancel_with_status(status); });
}
_delta_writer_for_tablet.reset();
if (_stream_pool_for_node.use_count() == 1) {
std::for_each(std::begin(*_node_id_for_stream), std::end(*_node_id_for_stream),
[](auto&& entry) { brpc::StreamClose(entry.first); });
}
_stream_pool_for_node.reset();
return Status::OK();
}
Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) {
if (_closed) {
return _close_status;
}
SCOPED_TIMER(_close_timer);
Status status = exec_status;
if (status.ok()) {
// only if status is ok can we call this _profile->total_time_counter().
// if status is not ok, this sink may not be prepared, so that _profile is null
SCOPED_TIMER(_profile->total_time_counter());
COUNTER_SET(_input_rows_counter, _number_input_rows);
COUNTER_SET(_output_rows_counter, _number_output_rows);
COUNTER_SET(_filtered_rows_counter,
_block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows());
COUNTER_SET(_send_data_timer, _send_data_ns);
COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns());
{
SCOPED_TIMER(_close_writer_timer);
// close all delta writers
if (_delta_writer_for_tablet.use_count() == 1) {
std::for_each(std::begin(*_delta_writer_for_tablet),
std::end(*_delta_writer_for_tablet),
[](auto&& entry) { entry.second->close(); });
std::for_each(std::begin(*_delta_writer_for_tablet),
std::end(*_delta_writer_for_tablet),
[](auto&& entry) { entry.second->close_wait(); });
}
_delta_writer_for_tablet.reset();
}
{
// send CLOSE_LOAD to all streams, return ERROR if any
RETURN_IF_ERROR(std::transform_reduce(
std::begin(*_node_id_for_stream), std::end(*_node_id_for_stream), Status::OK(),
[](Status& left, Status&& right) { return left.ok() ? right : left; },
[this](auto&& entry) { return _close_load(entry.first); }));
}
{
SCOPED_TIMER(_close_load_timer);
while (_pending_reports.load() > 0) {
// TODO: use a better wait
std::this_thread::sleep_for(std::chrono::milliseconds(1));
LOG(INFO) << "sinkv2 close_wait, pending reports: " << _pending_reports.load();
}
}
{
SCOPED_TIMER(_close_stream_timer);
// close streams
if (_stream_pool_for_node.use_count() == 1) {
std::for_each(std::begin(*_node_id_for_stream), std::end(*_node_id_for_stream),
[](auto&& entry) { brpc::StreamClose(entry.first); });
}
_stream_pool_for_node.reset();
}
std::vector<TTabletCommitInfo> tablet_commit_infos;
for (auto& [tablet_id, backends] : _tablet_success_map) {
for (int64_t be_id : backends) {
TTabletCommitInfo commit_info;
commit_info.tabletId = tablet_id;
commit_info.backendId = be_id;
tablet_commit_infos.emplace_back(std::move(commit_info));
}
}
state->tablet_commit_infos().insert(state->tablet_commit_infos().end(),
std::make_move_iterator(tablet_commit_infos.begin()),
std::make_move_iterator(tablet_commit_infos.end()));
// _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node
int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() +
state->num_rows_load_unselected();
state->set_num_rows_load_total(num_rows_load_total);
state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() +
_tablet_finder->num_filtered_rows());
state->update_num_rows_load_unselected(
_tablet_finder->num_immutable_partition_filtered_rows());
LOG(INFO) << "finished to close olap table sink. load_id=" << print_id(_load_id)
<< ", txn_id=" << _txn_id;
} else {
_cancel(status);
}
_close_status = status;
DataSink::close(state, exec_status);
return status;
}
Status VOlapTableSinkV2::_close_load(brpc::StreamId stream) {
butil::IOBuf buf;
PStreamHeader header;
*header.mutable_load_id() = _load_id;
header.set_src_id(_sender_id);
header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
auto node_id = _node_id_for_stream.get()->at(stream);
for (auto tablet : _tablets_for_node[node_id]) {
int64_t partition_id = tablet.partition_id();
if (_tablet_finder->partition_ids().contains(tablet.partition_id()) ||
_send_partitions_recorder[node_id].find(partition_id) ==
_send_partitions_recorder[node_id].end()) {
PTabletID* tablet_to_commit = header.add_tablets_to_commit();
*tablet_to_commit = tablet;
_send_partitions_recorder[node_id].insert(tablet.partition_id());
}
}
size_t header_len = header.ByteSizeLong();
buf.append(reinterpret_cast<uint8_t*>(&header_len), sizeof(header_len));
buf.append(header.SerializeAsString());
_pending_reports.fetch_add(1);
io::StreamSinkFileWriter::send_with_retry(stream, buf);
return Status::OK();
}
} // namespace stream_load
} // namespace doris

View File

@ -0,0 +1,241 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <brpc/controller.h>
#include <bthread/types.h>
#include <butil/errno.h>
#include <fmt/format.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/types.pb.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>
#include <stddef.h>
#include <stdint.h>
#include <atomic>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <functional>
#include <initializer_list>
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
#include <queue>
#include <set>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "common/config.h"
#include "common/status.h"
#include "exec/data_sink.h"
#include "exec/tablet_info.h"
#include "gutil/ref_counted.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/thread_context.h"
#include "runtime/types.h"
#include "util/countdown_latch.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "vec/columns/column.h"
#include "vec/common/allocator.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type.h"
#include "vec/exprs/vexpr_fwd.h"
namespace doris {
class DeltaWriterV2;
class ObjectPool;
class RowDescriptor;
class RuntimeState;
class TDataSink;
class TExpr;
class TabletSchema;
class TupleDescriptor;
namespace stream_load {
class OlapTableBlockConvertor;
class OlapTabletFinder;
class VOlapTableSinkV2;
using DeltaWriterForTablet = std::unordered_map<int64_t, std::unique_ptr<DeltaWriterV2>>;
using StreamPool = std::vector<brpc::StreamId>;
using StreamPoolForNode = std::unordered_map<int64_t, StreamPool>;
using NodeIdForStream = std::unordered_map<brpc::StreamId, int64_t>;
using NodePartitionTabletMapping =
std::unordered_map<int64_t, std::unordered_map<int64_t, std::unordered_set<int64_t>>>;
class StreamSinkHandler : public brpc::StreamInputHandler {
public:
StreamSinkHandler(VOlapTableSinkV2* sink) : _sink(sink) {}
int on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[],
size_t size) override;
void on_idle_timeout(brpc::StreamId id) override {}
void on_closed(brpc::StreamId id) override;
private:
VOlapTableSinkV2* _sink;
};
struct Rows {
int64_t partition_id;
int64_t index_id;
std::vector<int32_t> row_idxes;
};
using RowsForTablet = std::unordered_map<int64_t, Rows>;
// Write block data to Olap Table.
// When OlapTableSink::open() called, there will be a consumer thread running in the background.
// When you call VOlapTableSinkV2::send(), you will be the producer who products pending batches.
// Join the consumer thread in close().
class VOlapTableSinkV2 final : public DataSink {
public:
// Construct from thrift struct which is generated by FE.
VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& row_desc,
const std::vector<TExpr>& texprs, Status* status);
~VOlapTableSinkV2() override;
Status init(const TDataSink& sink) override;
// TODO: unify the code of prepare/open/close with result sink
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status close_status) override;
Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override;
// Returns the runtime profile for the sink.
RuntimeProfile* profile() override { return _profile; }
private:
Status _init_stream_pool(const NodeInfo& node_info, StreamPool& stream_pool);
Status _init_stream_pools();
void _build_tablet_node_mapping();
void _generate_rows_for_tablet(RowsForTablet& rows_for_tablet,
const VOlapTablePartition* partition, uint32_t tablet_index,
int row_idx);
Status _write_memtable(std::shared_ptr<vectorized::Block> block, int64_t tablet_id,
const Rows& rows, const std::vector<brpc::StreamId>& streams);
Status _select_streams(int64_t tablet_id, std::vector<brpc::StreamId>& streams);
Status _close_load(brpc::StreamId stream);
Status _cancel(Status status);
std::shared_ptr<MemTracker> _mem_tracker;
ObjectPool* _pool;
// unique load id
PUniqueId _load_id;
int64_t _txn_id = -1;
int _num_replicas = -1;
int _tuple_desc_id = -1;
// this is tuple descriptor of destination OLAP table
TupleDescriptor* _output_tuple_desc = nullptr;
// number of senders used to insert into OlapTable, if we only support single node insert,
// all data from select should collectted and then send to OlapTable.
// To support multiple senders, we maintain a channel for each sender.
int _sender_id = -1;
int _num_senders = -1;
bool _is_high_priority = false;
// TODO(zc): think about cache this data
std::shared_ptr<OlapTableSchemaParam> _schema;
std::unordered_map<int64_t, std::shared_ptr<TabletSchema>> _tablet_schema_for_index;
std::unordered_map<int64_t, bool> _enable_unique_mow_for_index;
OlapTableLocationParam* _location = nullptr;
DorisNodesInfo* _nodes_info = nullptr;
RuntimeProfile* _profile = nullptr;
std::unique_ptr<OlapTabletFinder> _tablet_finder;
std::unique_ptr<OlapTableBlockConvertor> _block_convertor;
// Stats for this
int64_t _send_data_ns = 0;
int64_t _number_input_rows = 0;
int64_t _number_output_rows = 0;
MonotonicStopWatch _row_distribution_watch;
RuntimeProfile::Counter* _input_rows_counter = nullptr;
RuntimeProfile::Counter* _output_rows_counter = nullptr;
RuntimeProfile::Counter* _filtered_rows_counter = nullptr;
RuntimeProfile::Counter* _send_data_timer = nullptr;
RuntimeProfile::Counter* _row_distribution_timer = nullptr;
RuntimeProfile::Counter* _write_memtable_timer = nullptr;
RuntimeProfile::Counter* _wait_mem_limit_timer = nullptr;
RuntimeProfile::Counter* _validate_data_timer = nullptr;
RuntimeProfile::Counter* _open_timer = nullptr;
RuntimeProfile::Counter* _close_timer = nullptr;
RuntimeProfile::Counter* _close_writer_timer = nullptr;
RuntimeProfile::Counter* _close_load_timer = nullptr;
RuntimeProfile::Counter* _close_stream_timer = nullptr;
// Save the status of close() method
Status _close_status;
VOlapTablePartitionParam* _vpartition = nullptr;
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
RuntimeState* _state = nullptr;
std::unordered_set<int64_t> _opened_partitions;
std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_for_node;
std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node;
std::unordered_map<int64_t, std::unordered_set<int64_t>> _send_partitions_recorder;
std::shared_ptr<StreamPoolForNode> _stream_pool_for_node;
std::shared_ptr<NodeIdForStream> _node_id_for_stream;
size_t _stream_index = 0;
std::shared_ptr<DeltaWriterForTablet> _delta_writer_for_tablet;
std::atomic<int> _pending_reports {0};
std::unordered_map<int64_t, std::vector<int64_t>> _tablet_success_map;
std::unordered_map<int64_t, std::vector<int64_t>> _tablet_failure_map;
bthread::Mutex _tablet_success_map_mutex;
bthread::Mutex _tablet_failure_map_mutex;
friend class StreamSinkHandler;
};
} // namespace stream_load
} // namespace doris