From 2ea33518b0b8657508f113ed7f17b5b2596a64ac Mon Sep 17 00:00:00 2001 From: zclllyybb Date: Thu, 23 Nov 2023 19:12:28 +0800 Subject: [PATCH] [Opt](load) use batching to optimize auto partition (#26915) use batching to optimize auto partition --- be/src/common/config.cpp | 3 +- be/src/common/config.h | 4 +- .../pipeline/exec/exchange_sink_operator.cpp | 2 +- .../exec/result_file_sink_operator.cpp | 2 +- be/src/runtime/plan_fragment_executor.cpp | 5 - be/src/vec/columns/column.h | 2 +- be/src/vec/columns/column_dummy.h | 2 + be/src/vec/core/block.cpp | 25 +++ be/src/vec/core/block.h | 12 +- be/src/vec/exec/scan/pip_scanner_context.h | 2 +- be/src/vec/sink/vdata_stream_sender.cpp | 8 +- be/src/vec/sink/vrow_distribution.cpp | 106 +++++++----- be/src/vec/sink/vrow_distribution.h | 99 ++++++----- be/src/vec/sink/vtablet_finder.h | 2 +- .../vec/sink/writer/async_result_writer.cpp | 10 +- be/src/vec/sink/writer/async_result_writer.h | 1 - be/src/vec/sink/writer/vtablet_writer.cpp | 159 +++++++++++------- be/src/vec/sink/writer/vtablet_writer.h | 32 ++-- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 78 ++++++--- be/src/vec/sink/writer/vtablet_writer_v2.h | 2 + docs/en/docs/admin-manual/config/be-config.md | 10 ++ .../docs/advanced/partition/auto-partition.md | 17 +- .../docs/admin-manual/config/be-config.md | 14 +- .../docs/advanced/partition/auto-partition.md | 17 +- .../apache/doris/planner/OlapTableSink.java | 13 +- .../test_auto_partition_load.out | 8 + .../test_auto_partition_load.groovy | 12 +- 27 files changed, 398 insertions(+), 249 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 2f52bd4417..adbc61fe56 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -505,7 +505,8 @@ DEFINE_Int64(stream_tvf_buffer_size, "1048576"); // 1MB // OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc. // You may need to lower the speed when the sink receiver bes are too busy. 
-DEFINE_mInt32(olap_table_sink_send_interval_ms, "1"); +DEFINE_mInt32(olap_table_sink_send_interval_microseconds, "1000"); +DEFINE_mDouble(olap_table_sink_send_interval_auto_partition_factor, "0.001"); // Fragment thread pool DEFINE_Int32(fragment_pool_thread_num_min, "64"); diff --git a/be/src/common/config.h b/be/src/common/config.h index e13d6dcfd4..f616cfb608 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -559,7 +559,9 @@ DECLARE_Int64(stream_tvf_buffer_size); // OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc. // You may need to lower the speed when the sink receiver bes are too busy. -DECLARE_mInt32(olap_table_sink_send_interval_ms); +DECLARE_mInt32(olap_table_sink_send_interval_microseconds); +// For auto partition, the send interval will multiply the factor +DECLARE_mDouble(olap_table_sink_send_interval_auto_partition_factor); // Fragment thread pool DECLARE_Int32(fragment_pool_thread_num_min); diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index f0e03596cc..71517f377f 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -375,7 +375,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block } } cur_block.clear_column_data(); - local_state._serializer.get_block()->set_muatable_columns( + local_state._serializer.get_block()->set_mutable_columns( cur_block.mutate_columns()); } } diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp index 3193c1b07c..b19c93cd28 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.cpp +++ b/be/src/pipeline/exec/result_file_sink_operator.cpp @@ -247,7 +247,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status) } } cur_block.clear_column_data(); - 
_serializer.get_block()->set_muatable_columns(cur_block.mutate_columns()); + _serializer.get_block()->set_mutable_columns(cur_block.mutate_columns()); } } } diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 6359a2dbe6..dc7fb350b6 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -350,11 +350,6 @@ Status PlanFragmentExecutor::open_vectorized_internal() { if (!eos || block->rows() > 0) { st = _sink->send(runtime_state(), block.get()); - //TODO: Asynchronisation need refactor this - if (st.is()) { // created partition, do it again. - st = _sink->send(runtime_state(), block.get()); - DCHECK(!st.is()); - } handle_group_commit(); if (st.is()) { break; diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index 5202c51a3d..58fe0cb87e 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -609,7 +609,7 @@ public: virtual bool is_exclusive() const { return use_count() == 1; } /// Clear data of column, just like vector clear - virtual void clear() {} + virtual void clear() = 0; /** Memory layout properties. 
* diff --git a/be/src/vec/columns/column_dummy.h b/be/src/vec/columns/column_dummy.h index a152dc9751..790c135889 100644 --- a/be/src/vec/columns/column_dummy.h +++ b/be/src/vec/columns/column_dummy.h @@ -62,6 +62,8 @@ public: void insert_data(const char*, size_t) override { ++s; } + void clear() override {}; + StringRef serialize_value_into_arena(size_t /*n*/, Arena& arena, char const*& begin) const override { return {arena.alloc_continue(0, begin), 0}; diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 779f214c97..0aef622870 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -21,6 +21,7 @@ #include "vec/core/block.h" #include +#include #include #include #include @@ -968,6 +969,22 @@ void MutableBlock::add_rows(const Block* block, size_t row_begin, size_t length) } } +void MutableBlock::add_rows(const Block* block, std::vector rows) { + DCHECK_LE(columns(), block->columns()); + const auto& block_data = block->get_columns_with_type_and_name(); + const size_t length = std::ranges::distance(rows); + for (size_t i = 0; i < _columns.size(); ++i) { + DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name()); + auto& dst = _columns[i]; + const auto& src = *block_data[i].column.get(); + dst->reserve(dst->size() + length); + for (size_t row : rows) { + // we can introduce a new function like `insert_assume_reserved` for IColumn. 
+ dst->insert_from(src, row); + } + } +} + void MutableBlock::erase(const String& name) { auto index_it = index_by_name.find(name); if (index_it == index_by_name.end()) { @@ -1100,6 +1117,14 @@ void MutableBlock::clear_column_data() noexcept { } } +void MutableBlock::reset_column_data() noexcept { + _columns.clear(); + for (int i = 0; i < _names.size(); i++) { + _columns.emplace_back(_data_types[i]->create_column()); + index_by_name[_names[i]] = i; + } +} + void MutableBlock::initialize_index_by_name() { for (size_t i = 0, size = _names.size(); i < size; ++i) { index_by_name[_names[i]] = i; diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 927ed5c655..b03d9fa4e2 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -22,9 +22,9 @@ #include #include -#include -#include +#include +#include #include #include #include @@ -462,7 +462,7 @@ public: MutableColumns& mutable_columns() { return _columns; } - void set_muatable_columns(MutableColumns&& columns) { _columns = std::move(columns); } + void set_mutable_columns(MutableColumns&& columns) { _columns = std::move(columns); } DataTypes& data_types() { return _data_types; } @@ -583,8 +583,8 @@ public: return Status::OK(); } + // move to columns' data to a Block. this will invalidate Block to_block(int start_column = 0); - Block to_block(int start_column, int end_column); void swap(MutableBlock& other) noexcept; @@ -594,6 +594,7 @@ public: void add_row(const Block* block, int row); void add_rows(const Block* block, const int* row_begin, const int* row_end); void add_rows(const Block* block, size_t row_begin, size_t length); + void add_rows(const Block* block, std::vector rows); /// remove the column with the specified name void erase(const String& name); @@ -606,7 +607,10 @@ public: _names.clear(); } + // columns resist. columns' inner data removed. void clear_column_data() noexcept; + // reset columns by types and names. 
+ void reset_column_data() noexcept; size_t allocated_bytes() const; diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index 111e6ea2ab..6c1f8e6325 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -259,7 +259,7 @@ private: _dependency->set_ready(); } _colocate_blocks[loc] = get_free_block(); - _colocate_mutable_blocks[loc]->set_muatable_columns( + _colocate_mutable_blocks[loc]->set_mutable_columns( _colocate_blocks[loc]->mutate_columns()); } } diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 128ad0b37f..5bfec60d1f 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -126,7 +126,7 @@ template Status Channel::send_local_block(Status exec_status, bool eos) { SCOPED_TIMER(_parent->local_send_timer()); Block block = _serializer.get_block()->to_block(); - _serializer.get_block()->set_muatable_columns(block.clone_empty_columns()); + _serializer.get_block()->set_mutable_columns(block.clone_empty_columns()); if (_recvr_is_valid()) { if constexpr (!std::is_same_v) { COUNTER_UPDATE(_parent->local_bytes_send_counter(), block.bytes()); @@ -568,7 +568,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { } } cur_block.clear_column_data(); - _serializer.get_block()->set_muatable_columns(cur_block.mutate_columns()); + _serializer.get_block()->set_mutable_columns(cur_block.mutate_columns()); } } } else { @@ -595,7 +595,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { } } cur_block.clear_column_data(); - _serializer.get_block()->set_muatable_columns(cur_block.mutate_columns()); + _serializer.get_block()->set_mutable_columns(cur_block.mutate_columns()); _roll_pb_block(); } } @@ -750,7 +750,7 @@ Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers) auto block = _mutable_block->to_block(); 
RETURN_IF_ERROR(serialize_block(&block, dest, num_receivers)); block.clear_column_data(); - _mutable_block->set_muatable_columns(block.mutate_columns()); + _mutable_block->set_mutable_columns(block.mutate_columns()); } return Status::OK(); diff --git a/be/src/vec/sink/vrow_distribution.cpp b/be/src/vec/sink/vrow_distribution.cpp index 0071629175..74561594cf 100644 --- a/be/src/vec/sink/vrow_distribution.cpp +++ b/be/src/vec/sink/vrow_distribution.cpp @@ -20,9 +20,11 @@ #include #include +#include "common/status.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" +#include "util/doris_metrics.h" #include "util/thrift_rpc_helper.h" #include "vec/columns/column_const.h" #include "vec/columns/column_nullable.h" @@ -36,22 +38,37 @@ VRowDistribution::_get_partition_function() { return {_vpartition->get_part_func_ctx(), _vpartition->get_partition_function()}; } -void VRowDistribution::_save_missing_values(vectorized::ColumnPtr col, - vectorized::DataTypePtr value_type) { - _partitions_need_create.clear(); - std::set deduper; - // de-duplication - for (auto row : _missing_map) { - deduper.emplace(value_type->to_string(*col, row)); +Status VRowDistribution::_save_missing_values(vectorized::ColumnPtr col, + vectorized::DataTypePtr value_type, Block* block, + std::vector filter) { + // de-duplication for new partitions but save all rows. 
+ _batching_block->add_rows(block, filter); + for (auto row : filter) { + auto val_str = value_type->to_string(*col, row); + if (!_deduper.contains(val_str)) { + _deduper.emplace(val_str); + TStringLiteral node; + node.value = std::move(val_str); + _partitions_need_create.emplace_back(std::vector {node}); // only 1 partition column now + } } - for (auto& value : deduper) { - TStringLiteral node; - node.value = value; - _partitions_need_create.emplace_back(std::vector {node}); // only 1 partition column now + + // to avoid too large mem use + if (_batching_rows > _batch_size) { + _deal_batched = true; } + + return Status::OK(); } -Status VRowDistribution::_automatic_create_partition() { +void VRowDistribution::clear_batching_stats() { + _partitions_need_create.clear(); + _deduper.clear(); + _batching_rows = 0; + _batching_bytes = 0; +} + +Status VRowDistribution::automatic_create_partition() { SCOPED_TIMER(_add_partition_request_timer); TCreatePartitionRequest request; TCreatePartitionResult result; @@ -75,7 +92,7 @@ Status VRowDistribution::_automatic_create_partition() { if (result.status.status_code == TStatusCode::OK) { // add new created partitions RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions)); - RETURN_IF_ERROR(_on_partitions_created(_caller, &result)); + RETURN_IF_ERROR(_create_partition_callback(_caller, &result)); } return status; @@ -126,7 +143,7 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause( auto& row_ids = row_part_tablet_id.row_ids; auto& partition_ids = row_part_tablet_id.partition_ids; auto& tablet_ids = row_part_tablet_id.tablet_ids; - if (auto* nullable_column = + if (const auto* nullable_column = vectorized::check_and_get_column(*filter_column)) { for (size_t i = 0; i < block->rows(); i++) { if (nullable_column->get_bool_inline(i) && !_skip[i]) { @@ -135,7 +152,7 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause( tablet_ids.emplace_back(_tablet_ids[i]); } } - } else if (auto* const_column = + 
} else if (const auto* const_column = vectorized::check_and_get_column(*filter_column)) { bool ret = const_column->get_bool(0); if (!ret) { @@ -144,7 +161,7 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause( // should we optimize? _filter_block_by_skip(block, row_part_tablet_id); } else { - auto& filter = assert_cast(*filter_column).get_data(); + const auto& filter = assert_cast(*filter_column).get_data(); for (size_t i = 0; i < block->rows(); i++) { if (filter[i] != 0 && !_skip[i]) { row_ids.emplace_back(i); @@ -194,7 +211,7 @@ Status VRowDistribution::_generate_rows_distribution_for_non_auto_parititon( Status VRowDistribution::_generate_rows_distribution_for_auto_parititon( vectorized::Block* block, int partition_col_idx, bool has_filtered_rows, - std::vector& row_part_tablet_ids) { + std::vector& row_part_tablet_ids, int64_t& rows_stat_val) { auto num_rows = block->rows(); std::vector partition_keys = _vpartition->get_partition_keys(); @@ -204,22 +221,23 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_parititon( _missing_map.clear(); _missing_map.reserve(partition_col.column->size()); bool stop_processing = false; - //TODO: we could use the buffer to save tablets we found so that no need to find them again when we created partitions and try to append block next time. + RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, _tablet_indexes, stop_processing, _skip, &_missing_map)); - if (_missing_map.empty()) { - // we don't calculate it distribution when have missing values - if (has_filtered_rows) { - for (int i = 0; i < num_rows; i++) { - _skip[i] = _skip[i] || _block_convertor->filter_map()[i]; - } + + // the missing vals for auto partition are also skipped. 
+ if (has_filtered_rows) { + for (int i = 0; i < num_rows; i++) { + _skip[i] = _skip[i] || _block_convertor->filter_map()[i]; } - RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids)); - } else { // for missing partition keys, calc the missing partition and save in _partitions_need_create + } + RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids)); + + if (!_missing_map.empty()) { + // for missing partition keys, calc the missing partition and save in _partitions_need_create auto [part_ctx, part_func] = _get_partition_function(); auto return_type = part_func->data_type(); - // expose the data column vectorized::ColumnPtr range_left_col = block->get_by_position(partition_col_idx).column; if (const auto* nullable = @@ -228,15 +246,19 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_parititon( return_type = assert_cast(return_type.get()) ->get_nested_type(); } - // calc the end value and save them. - _save_missing_values(range_left_col, return_type); - // then call FE to create it. then FragmentExecutor will redo the load. - RETURN_IF_ERROR(_automatic_create_partition()); - // In the next round, we will _generate_rows_distribution_payload again to get right payload of new tablet - LOG(INFO) << "Auto created partition. Send block again."; - return Status::NeedSendAgain(""); - } // creating done + // calc the end value and save them. in the end of sending, we will create partitions for them and deal them. 
+ RETURN_IF_ERROR(_save_missing_values(range_left_col, return_type, block, _missing_map)); + size_t new_bt_rows = _batching_block->rows(); + size_t new_bt_bytes = _batching_block->bytes(); + rows_stat_val -= new_bt_rows - _batching_rows; + _state->update_num_rows_load_total(_batching_rows - new_bt_rows); + _state->update_num_bytes_load_total(_batching_bytes - new_bt_bytes); + DorisMetrics::instance()->load_rows->increment(_batching_rows - new_bt_rows); + DorisMetrics::instance()->load_bytes->increment(_batching_bytes - new_bt_bytes); + _batching_rows = new_bt_rows; + _batching_bytes = new_bt_bytes; + } return Status::OK(); } @@ -251,6 +273,7 @@ void VRowDistribution::_reset_row_part_tablet_ids( row_ids.clear(); partition_ids.clear(); tablet_ids.clear(); + // This is important for performance. row_ids.reserve(rows); partition_ids.reserve(rows); tablet_ids.reserve(rows); @@ -260,7 +283,7 @@ void VRowDistribution::_reset_row_part_tablet_ids( Status VRowDistribution::generate_rows_distribution( vectorized::Block& input_block, std::shared_ptr& block, int64_t& filtered_rows, bool& has_filtered_rows, - std::vector& row_part_tablet_ids) { + std::vector& row_part_tablet_ids, int64_t& rows_stat_val) { auto input_rows = input_block.rows(); _reset_row_part_tablet_ids(row_part_tablet_ids, input_rows); @@ -269,6 +292,11 @@ Status VRowDistribution::generate_rows_distribution( RETURN_IF_ERROR(_block_convertor->validate_and_convert_block( _state, &input_block, block, *_vec_output_expr_ctxs, input_rows, has_filtered_rows)); + // batching block rows which need new partitions. deal together at finish. 
+ if (!_batching_block) [[unlikely]] { + _batching_block = MutableBlock::create_unique(block->create_same_struct_block(0).release()); + } + _row_distribution_watch.start(); auto num_rows = block->rows(); _tablet_finder->filter_bitmap().Reset(num_rows); @@ -283,15 +311,17 @@ Status VRowDistribution::generate_rows_distribution( int partition_col_idx = -1; if (_vpartition->is_projection_partition()) { // calc the start value of missing partition ranges. + // in VNodeChannel's add_block. the spare columns will be erased. RETURN_IF_ERROR(part_func->execute(part_ctx.get(), block.get(), &partition_col_idx)); VLOG_DEBUG << "Partition-calculated block:" << block->dump_data(); // change the column to compare to transformed. _vpartition->set_transformed_slots({(uint16_t)partition_col_idx}); } - if (_vpartition->is_auto_partition()) { + if (_vpartition->is_auto_partition() && !_deal_batched) { RETURN_IF_ERROR(_generate_rows_distribution_for_auto_parititon( - block.get(), partition_col_idx, has_filtered_rows, row_part_tablet_ids)); + block.get(), partition_col_idx, has_filtered_rows, row_part_tablet_ids, + rows_stat_val)); } else { // not auto partition RETURN_IF_ERROR(_generate_rows_distribution_for_non_auto_parititon( block.get(), has_filtered_rows, row_part_tablet_ids)); diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h index 3376eb5ab6..77104ef26f 100644 --- a/be/src/vec/sink/vrow_distribution.h +++ b/be/src/vec/sink/vrow_distribution.h @@ -22,12 +22,14 @@ #include #include +#include #include #include #include #include "common/status.h" #include "exec/tablet_info.h" +#include "runtime/runtime_state.h" #include "runtime/types.h" #include "util/runtime_profile.h" #include "util/stopwatch.hpp" @@ -50,42 +52,46 @@ public: std::vector tablet_ids; }; -typedef Status (*OnPartitionsCreated)(void*, TCreatePartitionResult*); - -class VRowDistributionContext { -public: - RuntimeState* state = nullptr; - OlapTableBlockConvertor* block_convertor = 
nullptr; - OlapTabletFinder* tablet_finder = nullptr; - VOlapTablePartitionParam* vpartition = nullptr; - RuntimeProfile::Counter* add_partition_request_timer = nullptr; - int64_t txn_id = -1; - ObjectPool* pool; - OlapTableLocationParam* location; - const VExprContextSPtrs* vec_output_expr_ctxs; - OnPartitionsCreated on_partitions_created; - void* caller; - std::shared_ptr schema; -}; +// void* for caller +using CreatePartitionCallback = Status (*)(void*, TCreatePartitionResult*); class VRowDistribution { public: - VRowDistribution() {} - virtual ~VRowDistribution() {} + // only used to pass parameters for VRowDistribution + struct VRowDistributionContext { + RuntimeState* state; + OlapTableBlockConvertor* block_convertor; + OlapTabletFinder* tablet_finder; + VOlapTablePartitionParam* vpartition; + RuntimeProfile::Counter* add_partition_request_timer; + int64_t txn_id = -1; + ObjectPool* pool; + OlapTableLocationParam* location; + const VExprContextSPtrs* vec_output_expr_ctxs; + std::shared_ptr schema; + void* caller; + CreatePartitionCallback create_partition_callback; + }; + friend class VTabletWriter; + friend class VTabletWriterV2; - void init(VRowDistributionContext* ctx) { - _state = ctx->state; - _block_convertor = ctx->block_convertor; - _tablet_finder = ctx->tablet_finder; - _vpartition = ctx->vpartition; - _add_partition_request_timer = ctx->add_partition_request_timer; - _txn_id = ctx->txn_id; - _pool = ctx->pool; - _location = ctx->location; - _vec_output_expr_ctxs = ctx->vec_output_expr_ctxs; - _on_partitions_created = ctx->on_partitions_created; - _caller = ctx->caller; - _schema = ctx->schema; + VRowDistribution() = default; + virtual ~VRowDistribution() = default; + + void init(VRowDistributionContext ctx) { + _state = ctx.state; + _batch_size = std::max(_state->batch_size(), 8192); + _block_convertor = ctx.block_convertor; + _tablet_finder = ctx.tablet_finder; + _vpartition = ctx.vpartition; + _add_partition_request_timer = 
ctx.add_partition_request_timer; + _txn_id = ctx.txn_id; + _pool = ctx.pool; + _location = ctx.location; + _vec_output_expr_ctxs = ctx.vec_output_expr_ctxs; + _schema = ctx.schema; + _caller = ctx.caller; + _create_partition_callback = ctx.create_partition_callback; } Status open(RowDescriptor* output_row_desc) { @@ -111,15 +117,18 @@ public: Status generate_rows_distribution(vectorized::Block& input_block, std::shared_ptr& block, int64_t& filtered_rows, bool& has_filtered_rows, - std::vector& row_part_tablet_ids); + std::vector& row_part_tablet_ids, + int64_t& rows_stat_val); + bool need_deal_batching() const { return _deal_batched && _batching_rows > 0; } + // create partitions when need for auto-partition table using #_partitions_need_create. + Status automatic_create_partition(); + void clear_batching_stats(); private: std::pair _get_partition_function(); - void _save_missing_values(vectorized::ColumnPtr col, vectorized::DataTypePtr value_type); - - // create partitions when need for auto-partition table using #_partitions_need_create. 
- Status _automatic_create_partition(); + Status _save_missing_values(vectorized::ColumnPtr col, vectorized::DataTypePtr value_type, + Block* block, std::vector filter); void _get_tablet_ids(vectorized::Block* block, int32_t index_idx, std::vector& tablet_ids); @@ -135,7 +144,7 @@ private: Status _generate_rows_distribution_for_auto_parititon( vectorized::Block* block, int partition_col_idx, bool has_filtered_rows, - std::vector& row_part_tablet_ids); + std::vector& row_part_tablet_ids, int64_t& rows_stat_val); Status _generate_rows_distribution_for_non_auto_parititon( vectorized::Block* block, bool has_filtered_rows, @@ -144,11 +153,16 @@ private: void _reset_row_part_tablet_ids(std::vector& row_part_tablet_ids, int64_t rows); -private: RuntimeState* _state = nullptr; + int _batch_size = 0; - // support only one partition column now - std::vector> _partitions_need_create; + // for auto partitions + std::vector> + _partitions_need_create; // support only one partition column now + std::unique_ptr _batching_block; + bool _deal_batched = false; // If true, send batched block before any block's append. 
+ size_t _batching_rows = 0, _batching_bytes = 0; + std::set _deduper; MonotonicStopWatch _row_distribution_watch; OlapTableBlockConvertor* _block_convertor = nullptr; @@ -158,10 +172,9 @@ private: int64_t _txn_id = -1; ObjectPool* _pool; OlapTableLocationParam* _location = nullptr; - // std::function _on_partition_created; // int64_t _number_output_rows = 0; const VExprContextSPtrs* _vec_output_expr_ctxs; - OnPartitionsCreated _on_partitions_created = nullptr; + CreatePartitionCallback _create_partition_callback = nullptr; void* _caller; std::shared_ptr _schema; diff --git a/be/src/vec/sink/vtablet_finder.h b/be/src/vec/sink/vtablet_finder.h index bccdc39a06..3759284733 100644 --- a/be/src/vec/sink/vtablet_finder.h +++ b/be/src/vec/sink/vtablet_finder.h @@ -45,7 +45,7 @@ public: Status find_tablets(RuntimeState* state, vectorized::Block* block, int rows, std::vector& partitions, std::vector& tablet_index, bool& filtered, - std::vector& is_continue, std::vector* miss_rows = nullptr); + std::vector& skip, std::vector* miss_rows = nullptr); bool is_find_tablet_every_sink() { return _find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_SINK; diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index d1fc46dc8d..8edde60adb 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -86,11 +86,6 @@ std::unique_ptr AsyncResultWriter::_get_block_from_queue() { return block; } -void AsyncResultWriter::_return_block_to_queue(std::unique_ptr add_block) { - std::lock_guard l(_m); - _data_queue.emplace_back(std::move(add_block)); -} - void AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* profile) { static_cast(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func( [this, state, profile]() { this->process_block(state, profile); })); @@ -117,10 +112,7 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* 
profi auto block = _get_block_from_queue(); auto status = write(block); - if (status.is()) { - _return_block_to_queue(std::move(block)); - continue; - } else if (UNLIKELY(!status.ok())) { + if (!status.ok()) [[unlikely]] { std::unique_lock l(_m); _writer_status = status; if (_dependency && _is_finished()) { diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h index a50b9296c7..0a217b34e6 100644 --- a/be/src/vec/sink/writer/async_result_writer.h +++ b/be/src/vec/sink/writer/async_result_writer.h @@ -105,7 +105,6 @@ private: [[nodiscard]] bool _is_finished() const { return !_writer_status.ok() || _eos; } std::unique_ptr _get_block_from_queue(); - void _return_block_to_queue(std::unique_ptr); static constexpr auto QUEUE_SIZE = 3; std::mutex _m; diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 7138993732..1639703d98 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -46,11 +46,13 @@ #include #include +#include "common/config.h" #include "olap/wal_manager.h" #include "util/runtime_profile.h" #include "vec/data_types/data_type.h" #include "vec/exprs/vexpr_fwd.h" #include "vec/runtime/vdatetime_value.h" +#include "vec/sink/vrow_distribution.h" #include "vec/sink/vtablet_sink.h" #ifdef DEBUG @@ -110,9 +112,9 @@ bvar::PerSecond> g_sink_write_rows_per_second("sink_through Status IndexChannel::init(RuntimeState* state, const std::vector& tablets) { SCOPED_CONSUME_MEM_TRACKER(_index_channel_tracker.get()); - for (auto& tablet : tablets) { + for (const auto& tablet : tablets) { // First find the location BEs of this tablet - auto tablet_locations = _parent->_location->find_tablet(tablet.tablet_id); + auto* tablet_locations = _parent->_location->find_tablet(tablet.tablet_id); if (tablet_locations == nullptr) { return Status::InternalError("unknown tablet, tablet_id={}", tablet.tablet_id); } @@ -133,7 +135,7 @@ Status 
IndexChannel::init(RuntimeState* state, const std::vectoradd_tablet(tablet); if (_parent->_write_single_replica) { - auto slave_location = _parent->_slave_location->find_tablet(tablet.tablet_id); + auto* slave_location = _parent->_slave_location->find_tablet(tablet.tablet_id); if (slave_location != nullptr) { channel->add_slave_tablet_nodes(tablet.tablet_id, slave_location->node_ids); } @@ -267,6 +269,22 @@ Status IndexChannel::check_tablet_filtered_rows_consistency() { return Status::OK(); } +static Status none_of(std::initializer_list vars) { + bool none = std::none_of(vars.begin(), vars.end(), [](bool var) { return var; }); + Status st = Status::OK(); + if (!none) { + std::string vars_str; + std::for_each(vars.begin(), vars.end(), + [&vars_str](bool var) -> void { vars_str += (var ? "1/" : "0/"); }); + if (!vars_str.empty()) { + vars_str.pop_back(); // 0/1/0/ -> 0/1/0 + } + st = Status::Uninitialized(vars_str); + } + + return st; +} + VNodeChannel::VNodeChannel(VTabletWriter* parent, IndexChannel* index_channel, int64_t node_id, bool is_incremental) : _parent(parent), @@ -296,7 +314,7 @@ Status VNodeChannel::init(RuntimeState* state) { _tuple_desc = _parent->_output_tuple_desc; _state = state; // get corresponding BE node. 
- auto node = _parent->_nodes_info->find_node(_node_id); + const auto* node = _parent->_nodes_info->find_node(_node_id); if (node == nullptr) { _cancelled = true; return Status::InternalError("unknown node id, id={}", _node_id); @@ -306,7 +324,7 @@ Status VNodeChannel::init(RuntimeState* state) { _load_info = "load_id=" + print_id(_parent->_load_id) + ", txn_id=" + std::to_string(_parent->_txn_id); - _row_desc.reset(new RowDescriptor(_tuple_desc, false)); + _row_desc = std::make_unique(_tuple_desc, false); _batch_size = state->batch_size(); _stub = state->exec_env()->brpc_internal_client_cache()->get_client(_node_info.host, @@ -352,7 +370,7 @@ void VNodeChannel::_open_internal(bool is_incremental) { if (deduper.contains(tablet.tablet_id)) { continue; } - auto ptablet = request->add_tablets(); + auto* ptablet = request->add_tablets(); ptablet->set_partition_id(tablet.partition_id); ptablet->set_tablet_id(tablet.tablet_id); deduper.insert(tablet.tablet_id); @@ -556,22 +574,6 @@ void VNodeChannel::_cancel_with_msg(const std::string& msg) { _cancelled = true; } -Status VNodeChannel::none_of(std::initializer_list vars) { - bool none = std::none_of(vars.begin(), vars.end(), [](bool var) { return var; }); - Status st = Status::OK(); - if (!none) { - std::string vars_str; - std::for_each(vars.begin(), vars.end(), - [&vars_str](bool var) -> void { vars_str += (var ? 
"1/" : "0/"); }); - if (!vars_str.empty()) { - vars_str.pop_back(); // 0/1/0/ -> 0/1/0 - } - st = Status::Uninitialized(vars_str); - } - - return st; -} - void VNodeChannel::try_send_pending_block(RuntimeState* state) { SCOPED_ATTACH_TASK(state); SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker); @@ -608,7 +610,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { _send_block_callback->clear_in_flight(); return; } - if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95f) { + if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95F) { LOG(WARNING) << "send block too large, this rpc may failed. send size: " << compressed_bytes << ", threshold: " << config::brpc_max_body_size << ", " << channel_info(); @@ -640,12 +642,10 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { request->set_write_single_replica(false); if (_parent->_write_single_replica) { request->set_write_single_replica(true); - for (std::unordered_map>::iterator iter = - _slave_tablet_nodes.begin(); - iter != _slave_tablet_nodes.end(); iter++) { + for (auto& _slave_tablet_node : _slave_tablet_nodes) { PSlaveTabletNodes slave_tablet_nodes; - for (auto node_id : iter->second) { - auto node = _parent->_nodes_info->find_node(node_id); + for (auto node_id : _slave_tablet_node.second) { + const auto* node = _parent->_nodes_info->find_node(node_id); if (node == nullptr) { return; } @@ -655,7 +655,8 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { pnode->set_host(node->host); pnode->set_async_internal_port(node->brpc_port); } - request->mutable_slave_tablet_nodes()->insert({iter->first, slave_tablet_nodes}); + request->mutable_slave_tablet_nodes()->insert( + {_slave_tablet_node.first, slave_tablet_nodes}); } } @@ -722,7 +723,7 @@ void VNodeChannel::_add_block_success_callback(const PTabletWriterAddBlockResult Status status(Status::create(result.status())); if (status.ok()) { // if has error tablet, handle them first - for (auto& error : 
result.tablet_errors()) { + for (const auto& error : result.tablet_errors()) { _index_channel->mark_as_failed(this, "tablet error: " + error.msg(), error.tablet_id()); } @@ -730,7 +731,7 @@ void VNodeChannel::_add_block_success_callback(const PTabletWriterAddBlockResult if (!st.ok()) { _cancel_with_msg(st.to_string()); } else if (is_last_rpc) { - for (auto& tablet : result.tablet_vec()) { + for (const auto& tablet : result.tablet_vec()) { TTabletCommitInfo commit_info; commit_info.tabletId = tablet.tablet_id(); commit_info.backendId = _node_id; @@ -748,7 +749,7 @@ void VNodeChannel::_add_block_success_callback(const PTabletWriterAddBlockResult << ", host: " << this->host() << ", txn_id=" << _parent->_txn_id; } if (_parent->_write_single_replica) { - for (auto& tablet_slave_node_ids : result.success_slave_tablet_node_ids()) { + for (const auto& tablet_slave_node_ids : result.success_slave_tablet_node_ids()) { for (auto slave_node_id : tablet_slave_node_ids.second.slave_node_ids()) { TTabletCommitInfo commit_info; commit_info.tabletId = tablet_slave_node_ids.first; @@ -776,7 +777,7 @@ void VNodeChannel::_add_block_success_callback(const PTabletWriterAddBlockResult } if (result.has_load_channel_profile()) { TRuntimeProfileTree tprofile; - const uint8_t* buf = (const uint8_t*)result.load_channel_profile().data(); + const auto* buf = (const uint8_t*)result.load_channel_profile().data(); uint32_t len = result.load_channel_profile().size(); auto st = deserialize_thrift_msg(buf, &len, false, &tprofile); if (st.ok()) { @@ -944,6 +945,11 @@ void VTabletWriter::_send_batch_process() { SCOPED_ATTACH_TASK(_state); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); + int sleep_time = config::olap_table_sink_send_interval_microseconds * + (_vpartition->is_auto_partition() + ? config::olap_table_sink_send_interval_auto_partition_factor + : 1); + while (true) { // incremental open will temporarily make channels into abnormal state. stop checking when this. 
std::unique_lock l(_stop_check_channel); @@ -986,7 +992,7 @@ void VTabletWriter::_send_batch_process() { return; } } - bthread_usleep(config::olap_table_sink_send_interval_ms * 1000); + bthread_usleep(sleep_time); } } @@ -1059,25 +1065,20 @@ static Status on_partitions_created(void* writer, TCreatePartitionResult* result } Status VTabletWriter::_init_row_distribution() { - VRowDistributionContext ctx; + _row_distribution.init({.state = _state, + .block_convertor = _block_convertor.get(), + .tablet_finder = _tablet_finder.get(), + .vpartition = _vpartition, + .add_partition_request_timer = _add_partition_request_timer, + .txn_id = _txn_id, + .pool = _pool, + .location = _location, + .vec_output_expr_ctxs = &_vec_output_expr_ctxs, + .schema = _schema, + .caller = this, + .create_partition_callback = &vectorized::on_partitions_created}); - ctx.state = _state; - ctx.block_convertor = _block_convertor.get(); - ctx.tablet_finder = _tablet_finder.get(); - ctx.vpartition = _vpartition; - ctx.add_partition_request_timer = _add_partition_request_timer; - ctx.txn_id = _txn_id; - ctx.pool = _pool; - ctx.location = _location; - ctx.vec_output_expr_ctxs = &_vec_output_expr_ctxs; - ctx.on_partitions_created = &vectorized::on_partitions_created; - ctx.caller = (void*)this; - ctx.schema = _schema; - - _row_distribution.init(&ctx); - - RETURN_IF_ERROR(_row_distribution.open(_output_row_desc)); - return Status::OK(); + return _row_distribution.open(_output_row_desc); } Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) { @@ -1198,7 +1199,7 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) { for (int i = 0; i < _schema->indexes().size(); ++i) { // collect all tablets belong to this rollup std::vector tablets; - auto index = _schema->indexes()[i]; + auto* index = _schema->indexes()[i]; for (const auto& part : partitions) { for (const auto& tablet : part->indexes[i].tablets) { TTabletWithPartition tablet_with_partition; @@ -1235,7 
+1236,7 @@ Status VTabletWriter::_incremental_open_node_channel( for (int i = 0; i < _schema->indexes().size(); ++i) { const OlapTableIndexSchema* index = _schema->indexes()[i]; std::vector tablets; - for (auto& t_part : partitions) { + for (const auto& t_part : partitions) { VOlapTablePartition* part = nullptr; RETURN_IF_ERROR(_vpartition->generate_partition_from(t_part, part)); for (const auto& tablet : part->indexes[i].tablets) { @@ -1279,7 +1280,7 @@ Status VTabletWriter::_incremental_open_node_channel( return Status::OK(); } -Status VTabletWriter::_cancel_channel_and_check_intolerable_failure( +static Status cancel_channel_and_check_intolerable_failure( Status status, const std::string& err_msg, const std::shared_ptr ich, const std::shared_ptr nch) { LOG(WARNING) << nch->channel_info() << ", close channel failed, err: " << err_msg; @@ -1311,10 +1312,38 @@ void VTabletWriter::_cancel_all_channel(Status status) { print_id(_load_id), _txn_id, status); } +Status VTabletWriter::_send_new_partition_batch() { + if (_row_distribution.need_deal_batching()) { // try_close may be called more than once + RETURN_IF_ERROR(_row_distribution.automatic_create_partition()); + + Block tmp_block = _row_distribution._batching_block->to_block(); // borrow out, as an lvalue ref + + // the following order is mandatory: + // 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block. + // 2. deal batched block + // 3. now reuse the columns of the lvalue block, because append_block doesn't really adjust it; it generates a new block from it.
+ _row_distribution.clear_batching_stats(); + RETURN_IF_ERROR(this->append_block(tmp_block)); + _row_distribution._batching_block->set_mutable_columns( + tmp_block.mutate_columns()); // recover the columns back + _row_distribution._batching_block->clear_column_data(); + _row_distribution._deal_batched = false; + } + return Status::OK(); +} + Status VTabletWriter::try_close(RuntimeState* state, Status exec_status) { SCOPED_TIMER(_close_timer); Status status = exec_status; - _try_close = true; + + // must happen before setting _try_close + if (status.ok()) { + SCOPED_TIMER(_profile->total_time_counter()); + _row_distribution._deal_batched = true; + status = _send_new_partition_batch(); + } + + _try_close = true; // will stop the periodic thread if (status.ok()) { // only if status is ok can we call this _profile->total_time_counter(). // if status is not ok, this sink may not be prepared, so that _profile is null @@ -1325,14 +1354,14 @@ Status VTabletWriter::try_close(RuntimeState* state, Status exec_status) { break; } index_channel->for_each_node_channel( - [this, &index_channel, &status](const std::shared_ptr& ch) { + [&index_channel, &status](const std::shared_ptr& ch) { if (!status.ok() || ch->is_closed()) { return; } // only first try close, all node channels will mark_close() ch->mark_close(); if (ch->is_cancelled()) { - status = this->_cancel_channel_and_check_intolerable_failure( + status = cancel_channel_and_check_intolerable_failure( status, ch->get_cancel_msg(), index_channel, ch); } }); @@ -1413,7 +1442,7 @@ Status VTabletWriter::close(Status exec_status) { // no pipeline, close may block waiting.
auto s = ch->close_wait(_state); if (!s.ok()) { - status = this->_cancel_channel_and_check_intolerable_failure( + status = cancel_channel_and_check_intolerable_failure( status, s.to_string(), index_channel, ch); } ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns, @@ -1572,6 +1601,9 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { return status; } + // check out of limit + RETURN_IF_ERROR(_send_new_partition_batch()); + auto rows = input_block.rows(); auto bytes = input_block.bytes(); if (UNLIKELY(rows == 0)) { @@ -1584,7 +1616,8 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { int64_t filtered_rows = 0; RETURN_IF_ERROR(_row_distribution.generate_rows_distribution( - input_block, block, filtered_rows, has_filtered_rows, _row_part_tablet_ids)); + input_block, block, filtered_rows, has_filtered_rows, _row_part_tablet_ids, + _number_input_rows)); ChannelDistributionPayloadVec channel_to_payload; @@ -1624,11 +1657,7 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { block.get(), _block_convertor.get(), _tablet_finder.get())); } - // TODO: Before load, we need to projection unuseful column - // auto slots = _schema->tuple_desc()->slots(); - // for (auto desc : slots) { - // desc->col_pos(); - // } + // Add block to node channel for (size_t i = 0; i < _channels.size(); i++) { for (const auto& entry : channel_to_payload[i]) { diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index e9f6ae13f4..c6190c2675 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -29,15 +29,13 @@ #include #include #include -#include -#include - -#include #include "olap/wal_writer.h" #include "vwal_writer.h" // IWYU pragma: no_include +#include #include // IWYU pragma: keep +#include #include #include #include @@ -60,6 +58,7 @@ #include "exec/data_sink.h" #include "exec/tablet_info.h" #include 
"gutil/ref_counted.h" +#include "olap/wal_writer.h" #include "runtime/decimalv2_value.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" @@ -226,7 +225,7 @@ public: void add_tablet(const TTabletWithPartition& tablet) { _all_tablets.emplace_back(tablet); } std::string debug_tablets() const { std::stringstream ss; - for (auto& tab : _all_tablets) { + for (const auto& tab : _all_tablets) { tab.printTo(ss); ss << '\n'; } @@ -272,7 +271,7 @@ public: ss << "close wait failed coz rpc error"; { std::lock_guard l(_cancel_msg_lock); - if (_cancel_msg != "") { + if (!_cancel_msg.empty()) { ss << ". " << _cancel_msg; } } @@ -309,8 +308,6 @@ public: std::string host() const { return _node_info.host; } std::string name() const { return _name; } - Status none_of(std::initializer_list vars); - std::string channel_info() const { return fmt::format("{}, {}, node={}:{}", _name, _load_info, _node_info.host, _node_info.brpc_port); @@ -418,9 +415,8 @@ protected: // an IndexChannel is related to specific table and its rollup and mv class IndexChannel { public: - IndexChannel(VTabletWriter* parent, int64_t index_id, - const vectorized::VExprContextSPtr& where_clause) - : _parent(parent), _index_id(index_id), _where_clause(where_clause) { + IndexChannel(VTabletWriter* parent, int64_t index_id, vectorized::VExprContextSPtr where_clause) + : _parent(parent), _index_id(index_id), _where_clause(std::move(where_clause)) { _index_channel_tracker = std::make_unique("IndexChannel:indexID=" + std::to_string(_index_id)); } @@ -447,7 +443,7 @@ public: size_t get_pending_bytes() const { size_t mem_consumption = 0; - for (auto& kv : _node_channels) { + for (const auto& kv : _node_channels) { mem_consumption += kv.second->get_pending_bytes(); } return mem_consumption; @@ -542,6 +538,8 @@ public: Status on_partitions_created(TCreatePartitionResult* result); + Status _send_new_partition_batch(); + private: friend class VNodeChannel; friend class IndexChannel; @@ -560,18 +558,8 @@ 
private: void _generate_index_channels_payloads(std::vector& row_part_tablet_ids, ChannelDistributionPayloadVec& payload); - Status _cancel_channel_and_check_intolerable_failure(Status status, const std::string& err_msg, - const std::shared_ptr ich, - const std::shared_ptr nch); - void _cancel_all_channel(Status status); - void _save_missing_values(vectorized::ColumnPtr col, vectorized::DataTypePtr value_type, - std::vector filter); - - // create partitions when need for auto-partition table using #_partitions_need_create. - Status _automatic_create_partition(); - Status _incremental_open_node_channel(const std::vector& partitions); TDataSink _t_sink; diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 8bc65a4cba..fe8be28ec3 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -53,9 +53,7 @@ #include "vec/sink/vtablet_block_convertor.h" #include "vec/sink/vtablet_finder.h" -namespace doris { - -namespace vectorized { +namespace doris::vectorized { VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs) : AsyncResultWriter(output_exprs), _t_sink(t_sink) { @@ -121,26 +119,20 @@ Status VTabletWriterV2::_incremental_open_streams( } Status VTabletWriterV2::_init_row_distribution() { - VRowDistributionContext ctx; + _row_distribution.init({.state = _state, + .block_convertor = _block_convertor.get(), + .tablet_finder = _tablet_finder.get(), + .vpartition = _vpartition, + .add_partition_request_timer = _add_partition_request_timer, + .txn_id = _txn_id, + .pool = _pool, + .location = _location, + .vec_output_expr_ctxs = &_vec_output_expr_ctxs, + .schema = _schema, + .caller = (void*)this, + .create_partition_callback = &vectorized::on_partitions_created}); - ctx.state = _state; - ctx.block_convertor = _block_convertor.get(); - ctx.tablet_finder = _tablet_finder.get(); - ctx.vpartition = _vpartition; - 
ctx.add_partition_request_timer = _add_partition_request_timer; - ctx.txn_id = _txn_id; - ctx.pool = _pool; - ctx.location = _location; - ctx.vec_output_expr_ctxs = &_vec_output_expr_ctxs; - ctx.on_partitions_created = &vectorized::on_partitions_created; - ctx.caller = (void*)this; - ctx.schema = _schema; - - _row_distribution.init(&ctx); - - RETURN_IF_ERROR(_row_distribution.open(_output_row_desc)); - - return Status::OK(); + return _row_distribution.open(_output_row_desc); } Status VTabletWriterV2::init_properties(ObjectPool* pool, bool group_commit) { @@ -331,11 +323,11 @@ void VTabletWriterV2::_generate_rows_for_tablet(std::vector& r Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id, Streams& streams) { - auto location = _location->find_tablet(tablet_id); + const auto* location = _location->find_tablet(tablet_id); if (location == nullptr) { return Status::InternalError("unknown tablet location, tablet id = {}", tablet_id); } - for (auto& node_id : location->node_ids) { + for (const auto& node_id : location->node_ids) { PTabletID tablet; tablet.set_partition_id(partition_id); tablet.set_index_id(index_id); @@ -356,6 +348,9 @@ Status VTabletWriterV2::append_block(Block& input_block) { return status; } + // check out of limit + RETURN_IF_ERROR(_send_new_partition_batch()); + auto input_rows = input_block.rows(); auto input_bytes = input_block.bytes(); if (UNLIKELY(input_rows == 0)) { @@ -379,7 +374,8 @@ Status VTabletWriterV2::append_block(Block& input_block) { std::shared_ptr block; RETURN_IF_ERROR(_row_distribution.generate_rows_distribution( - input_block, block, filtered_rows, has_filtered_rows, _row_part_tablet_ids)); + input_block, block, filtered_rows, has_filtered_rows, _row_part_tablet_ids, + _number_input_rows)); RowsForTablet rows_for_tablet; _generate_rows_for_tablet(_row_part_tablet_ids, rows_for_tablet); @@ -409,7 +405,7 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr block .is_high_priority 
= _is_high_priority, .write_file_cache = _write_file_cache, }; - for (auto& index : _schema->indexes()) { + for (const auto& index : _schema->indexes()) { if (index->index_id == rows.index_id) { req.slots = &index->slots; req.schema_hash = index->schema_hash; @@ -440,6 +436,26 @@ Status VTabletWriterV2::_cancel(Status status) { return Status::OK(); } +Status VTabletWriterV2::_send_new_partition_batch() { + if (_row_distribution.need_deal_batching()) { // try_close may be called more than once + RETURN_IF_ERROR(_row_distribution.automatic_create_partition()); + + Block tmp_block = _row_distribution._batching_block->to_block(); // borrow out, as an lvalue ref + + // the following order is mandatory: + // 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block. + // 2. deal batched block + // 3. now reuse the columns of the lvalue block, because append_block doesn't really adjust it; it generates a new block from it. + _row_distribution.clear_batching_stats(); + RETURN_IF_ERROR(this->append_block(tmp_block)); + _row_distribution._batching_block->set_mutable_columns( + tmp_block.mutate_columns()); // recover the columns back + _row_distribution._batching_block->clear_column_data(); + _row_distribution._deal_batched = false; + } + return Status::OK(); +} + Status VTabletWriterV2::close(Status exec_status) { std::lock_guard close_lock(_close_mutex); if (_is_closed) { @@ -447,6 +463,13 @@ Status VTabletWriterV2::close(Status exec_status) { } SCOPED_TIMER(_close_timer); Status status = exec_status; + + if (status.ok()) { + SCOPED_TIMER(_profile->total_time_counter()); + _row_distribution._deal_batched = true; + status = _send_new_partition_batch(); + } + + if (status.ok()) { // only if status is ok can we call this _profile->total_time_counter().
// if status is not ok, this sink may not be prepared, so that _profile is null @@ -538,5 +561,4 @@ Status VTabletWriterV2::_close_load(const Streams& streams) { return Status::OK(); } -} // namespace vectorized -} // namespace doris +} // namespace doris::vectorized diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index d4ccf7b652..3ce291eb9a 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -130,6 +130,8 @@ private: Status _incremental_open_streams(const std::vector& partitions); + Status _send_new_partition_batch(); + void _build_tablet_node_mapping(); void _generate_rows_for_tablet(std::vector& row_part_tablet_ids, diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md index d3192d9245..2250c06115 100644 --- a/docs/en/docs/admin-manual/config/be-config.md +++ b/docs/en/docs/admin-manual/config/be-config.md @@ -1500,3 +1500,13 @@ Indicates how many tablets failed to load in the data directory. At the same tim * Description: BE Whether to enable the use of java-jni. When enabled, mutual calls between c++ and java are allowed. Currently supports hudi, java-udf, jdbc, max-compute, paimon, preload, avro * Default value: true + +#### `olap_table_sink_send_interval_microseconds` + +* Description: While loading data, there's a polling thread that keeps sending data to the corresponding BE from the Coordinator's sink node. This thread will check whether there's data to send every `olap_table_sink_send_interval_microseconds` microseconds. +* Default value: 1000 + +#### `olap_table_sink_send_interval_auto_partition_factor` + +* Description: If we load data into a table which enables auto partition, the interval of `olap_table_sink_send_interval_microseconds` may be too slow. In that case the real interval will be multiplied by this factor.
+* Default value: 0.001 diff --git a/docs/en/docs/advanced/partition/auto-partition.md b/docs/en/docs/advanced/partition/auto-partition.md index 3e6fc6dc8e..f601445f87 100644 --- a/docs/en/docs/advanced/partition/auto-partition.md +++ b/docs/en/docs/advanced/partition/auto-partition.md @@ -38,19 +38,19 @@ When building a table, use the following syntax to populate [CREATE-TABLE](../.. 1. AUTO RANGE PARTITION: - ```SQL + ```sql AUTO PARTITION BY RANGE FUNC_CALL_EXPR ( ) ``` where - ```SQL + ```sql FUNC_CALL_EXPR ::= date_trunc ( , '' ) ``` 2. AUTO LIST PARTITION: - ```SQL + ```sql AUTO PARTITION BY LIST(`partition_col`) ( ) @@ -60,7 +60,7 @@ When building a table, use the following syntax to populate [CREATE-TABLE](../.. 1. AUTO RANGE PARTITION - ```SQL + ```sql CREATE TABLE `${tblDate}` ( `TIME_STAMP` datev2 NOT NULL COMMENT 'Date of collection' ) ENGINE=OLAP @@ -76,7 +76,7 @@ When building a table, use the following syntax to populate [CREATE-TABLE](../.. 2. AUTO LIST PARTITION - ```SQL + ```sql CREATE TABLE `${tblName1}` ( `str` varchar not null ) ENGINE=OLAP @@ -144,7 +144,7 @@ PROPERTIES ( The table stores a large amount of business history data, partitioned based on the date the transaction occurred. As you can see when building the table, we need to manually create the partitions in advance. If the data range of the partitioned columns changes, for example, 2022 is added to the above table, we need to create a partition by [ALTER-TABLE-PARTITION](../../sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-PARTITION.md) to make changes to the table partition. 
After using AUTO PARTITION, the table DDL can be changed to: -```SQL +```sql CREATE TABLE `DAILY_TRADE_VALUE` ( `TRADE_DATE` datev2 NULL, @@ -162,13 +162,13 @@ PROPERTIES ( ``` At this point the new table does not have a default partition: -```SQL +```sql mysql> show partitions from `DAILY_TRADE_VALUE`; Empty set (0.12 sec) ``` After inserting the data and then viewing it again, we could found that the table has been created with corresponding partitions: -```SQL +```sql mysql> insert into `DAILY_TRADE_VALUE` values ('2012-12-13', 1), ('2008-02-03', 2), ('2014-11-11', 3); Query OK, 3 rows affected (0.88 sec) {'label':'insert_754e2a3926a345ea_854793fb2638f0ec', 'status':'VISIBLE', 'txnId':'20014'} @@ -188,3 +188,4 @@ mysql> show partitions from `DAILY_TRADE_VALUE`; - If a partition is created during the insertion or importation of data and the process eventually fails, the created partition is not automatically deleted. - Tables that use AUTO PARTITION only have their partitions created automatically instead of manually. The original use of the table and the partitions it creates is the same as for non-AUTO PARTITION tables or partitions. +- When importing data to a table with AUTO PARTITION enabled, the polling interval for data sent by the Coordinator is different from that of a normal table. For details, see `olap_table_sink_send_interval_auto_partition_factor` in [BE Configuration](../../admin-manual/config/be-config.md). 
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md index 3a94883804..586958e3a6 100644 --- a/docs/zh-CN/docs/admin-manual/config/be-config.md +++ b/docs/zh-CN/docs/admin-manual/config/be-config.md @@ -1527,5 +1527,15 @@ load tablets from header failed, failed tablets size: xxx, path=xxx #### `enable_java_support` -* Description: BE 是否开启使用java-jni,开启后允许 c++ 与 java 之间的相互调用。目前已经支持hudi、java-udf、jdbc、max-compute、paimon、preload、avro -* Default value: true +* 描述: BE 是否开启使用java-jni,开启后允许 c++ 与 java 之间的相互调用。目前已经支持hudi、java-udf、jdbc、max-compute、paimon、preload、avro +* 默认值: true + +#### `olap_table_sink_send_interval_microseconds`. + +* 描述: 数据导入时,Coordinator 的 sink 节点有一个轮询线程持续向对应BE发送数据。该线程将每隔 `olap_table_sink_send_interval_microseconds` 微秒检查是否有数据要发送。 +* 默认值:1000 + +#### `olap_table_sink_send_interval_auto_partition_factor`. + +* 描述: 如果我们向一个启用了自动分区的表导入数据,那么 `olap_table_sink_send_interval_microseconds` 的时间间隔就会太慢。在这种情况下,实际间隔将乘以该系数。 +* 默认值:0.001 diff --git a/docs/zh-CN/docs/advanced/partition/auto-partition.md b/docs/zh-CN/docs/advanced/partition/auto-partition.md index 42c07581c2..ef0da5032d 100644 --- a/docs/zh-CN/docs/advanced/partition/auto-partition.md +++ b/docs/zh-CN/docs/advanced/partition/auto-partition.md @@ -38,19 +38,19 @@ under the License. 1. AUTO RANGE PARTITION: - ```SQL + ```sql AUTO PARTITION BY RANGE FUNC_CALL_EXPR ( ) ``` 其中 - ```SQL + ```sql FUNC_CALL_EXPR ::= date_trunc ( , '' ) ``` 2. AUTO LIST PARTITION: - ```SQL + ```sql AUTO PARTITION BY LIST(`partition_col`) ( ) @@ -60,7 +60,7 @@ under the License. 1. AUTO RANGE PARTITION - ```SQL + ```sql CREATE TABLE `${tblDate}` ( `TIME_STAMP` datev2 NOT NULL COMMENT '采集日期' ) ENGINE=OLAP @@ -76,7 +76,7 @@ under the License. 2. 
AUTO LIST PARTITION - ```SQL + ```sql CREATE TABLE `${tblName1}` ( `str` varchar not null ) ENGINE=OLAP @@ -144,7 +144,7 @@ PROPERTIES ( 该表内存储了大量业务历史数据,依据交易发生的日期进行分区。可以看到在建表时,我们需要预先手动创建分区。如果分区列的数据范围发生变化,例如上表中增加了2022年的数据,则我们需要通过[ALTER-TABLE-PARTITION](../../sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-PARTITION.md)对表的分区进行更改。在使用AUTO PARTITION后,该表DDL可以改为: -```SQL +```sql CREATE TABLE `DAILY_TRADE_VALUE` ( `TRADE_DATE` datev2 NULL COMMENT '交易日期', @@ -162,13 +162,13 @@ PROPERTIES ( ``` 此时新表没有默认分区: -```SQL +```sql mysql> show partitions from `DAILY_TRADE_VALUE`; Empty set (0.12 sec) ``` 经过插入数据后再查看,发现该表已经创建了对应的分区: -```SQL +```sql mysql> insert into `DAILY_TRADE_VALUE` values ('2012-12-13', 1), ('2008-02-03', 2), ('2014-11-11', 3); Query OK, 3 rows affected (0.88 sec) {'label':'insert_754e2a3926a345ea_854793fb2638f0ec', 'status':'VISIBLE', 'txnId':'20014'} @@ -188,3 +188,4 @@ mysql> show partitions from `DAILY_TRADE_VALUE`; - 在数据的插入或导入过程中如果创建了分区,而最终整个过程失败,被创建的分区不会被自动删除。 - 使用AUTO PARTITION的表,只是分区创建方式上由手动转为了自动。表及其所创建分区的原本使用方法都与非AUTO PARTITION的表或分区相同。 +- 向开启了AUTO PARTITION的表导入数据时,Coordinator发送数据的轮询间隔与普通表有所不同。具体请见[BE配置项](../../admin-manual/config/be-config.md)中的`olap_table_sink_send_interval_auto_partition_factor`。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index e2cc67d992..351f7aa92f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -368,23 +368,30 @@ public class OlapTableSink extends DataSink { } } } - boolean enableAutomaticPartition = partitionInfo.enableAutomaticPartition(); + boolean enableAutomaticPartition = partitionInfo.enableAutomaticPartition(); // for auto create partition by function expr, there is no any partition firstly, // But this is required in thrift struct. 
if (enableAutomaticPartition && partitionIds.isEmpty()) { partitionParam.setDistributedColumns(getDistColumns(table.getDefaultDistributionInfo())); partitionParam.setPartitions(new ArrayList()); } - ArrayList exprs = partitionInfo.getPartitionExprs(); - if (enableAutomaticPartition && exprs != null && analyzer != null) { + + ArrayList exprSource = partitionInfo.getPartitionExprs(); + if (enableAutomaticPartition && exprSource != null && analyzer != null) { Analyzer funcAnalyzer = new Analyzer(analyzer.getEnv(), analyzer.getContext()); tupleDescriptor.setTable(table); funcAnalyzer.registerTupleDescriptor(tupleDescriptor); + // we must clone the exprs. otherwise analyze will influence the origin exprs. + ArrayList exprs = new ArrayList(); + for (Expr e : exprSource) { + exprs.add(e.clone()); + } for (Expr e : exprs) { e.analyze(funcAnalyzer); } partitionParam.setPartitionFunctionExprs(Expr.treesToThrift(exprs)); } + partitionParam.setEnableAutomaticPartition(enableAutomaticPartition); break; } diff --git a/regression-test/data/partition_p0/auto_partition/test_auto_partition_load.out b/regression-test/data/partition_p0/auto_partition/test_auto_partition_load.out index 7e1dd673f6..2d85d85021 100644 --- a/regression-test/data/partition_p0/auto_partition/test_auto_partition_load.out +++ b/regression-test/data/partition_p0/auto_partition/test_auto_partition_load.out @@ -10,6 +10,10 @@ 8 2006-12-12T12:12:12 2001-11-12T12:12:12.123456 9 2006-12-12T12:12:12 2001-11-13T12:12:12.123456 10 2007-12-12T12:12:12 2001-11-14T12:12:12.123456 +11 2007-12-12T12:12:12 2001-11-14T12:12:12.123456 +12 2008-12-12T12:12:12 2001-11-14T12:12:12.123456 +13 2003-12-12T12:12:12 2001-11-14T12:12:12.123456 +14 2002-12-12T12:12:12 2001-11-14T12:12:12.123456 -- !select2 -- 1 Beijing 2001-12-12T12:12:12.123456 @@ -22,4 +26,8 @@ 8 chengDU 2001-11-12T12:12:12.123456 9 xian 2001-11-13T12:12:12.123456 10 beiJing 2001-11-14T12:12:12.123456 +11 11 2123-11-14T12:12:12.123456 +12 Chengdu 
2123-11-14T12:12:12.123456 +13 11 2123-11-14T12:12:12.123456 +14 12 2123-11-14T12:12:12.123456 diff --git a/regression-test/suites/partition_p0/auto_partition/test_auto_partition_load.groovy b/regression-test/suites/partition_p0/auto_partition/test_auto_partition_load.groovy index 0cf2eaf9c1..351d7bb320 100644 --- a/regression-test/suites/partition_p0/auto_partition/test_auto_partition_load.groovy +++ b/regression-test/suites/partition_p0/auto_partition/test_auto_partition_load.groovy @@ -40,11 +40,15 @@ suite("test_auto_partition_load") { file "auto_partition_stream_load1.csv" time 20000 } + sql """ insert into ${tblName1} values (11, '2007-12-12 12:12:12.123', '2001-11-14 12:12:12.123456') """ + sql """ insert into ${tblName1} values (12, '2008-12-12 12:12:12.123', '2001-11-14 12:12:12.123456') """ + sql """ insert into ${tblName1} values (13, '2003-12-12 12:12:12.123', '2001-11-14 12:12:12.123456') """ + sql """ insert into ${tblName1} values (14, '2002-12-12 12:12:12.123', '2001-11-14 12:12:12.123456') """ qt_select1 "select * from ${tblName1} order by k1" result1 = sql "show partitions from ${tblName1}" logger.info("${result1}") - assertEquals(result1.size(), 7) + assertEquals(result1.size(), 8) def tblName2 = "load_table2" @@ -71,9 +75,13 @@ suite("test_auto_partition_load") { file "auto_partition_stream_load2.csv" time 20000 } + sql """ insert into ${tblName2} values (11, '11', '2123-11-14 12:12:12.123456') """ + sql """ insert into ${tblName2} values (12, 'Chengdu', '2123-11-14 12:12:12.123456') """ + sql """ insert into ${tblName2} values (13, '11', '2123-11-14 12:12:12.123456') """ + sql """ insert into ${tblName2} values (14, '12', '2123-11-14 12:12:12.123456') """ qt_select2 "select * from ${tblName2} order by k1" result2 = sql "show partitions from ${tblName2}" logger.info("${result2}") - assertEquals(result2.size(), 9) + assertEquals(result2.size(), 11) }