From 42425808a10c508a85925a178525672c9c8f8558 Mon Sep 17 00:00:00 2001 From: abmdocrt Date: Mon, 20 May 2024 15:43:46 +0800 Subject: [PATCH] [Cherry-Pick](branch-2.1) Pick "Fix multiple replica partial update auto inc data inconsistency problem #34788" (#35056) * [Fix](auto inc) Fix multiple replica partial update auto inc data inconsistency problem (#34788) * **Problem:** For tables with auto-increment columns, updating partial columns can cause data inconsistency among replicas. **Cause:** Previously, the implementation for updating partial columns in tables with auto-increment columns was done independently on each BE (Backend), leading to potential inconsistencies in the auto-increment column values generated by each BE. **Solution:** Before distributing blocks, determine if the update involves partial columns of a table with an auto-increment column. If so, add the auto-increment column to the last column of the block. After distributing to each BE, each BE will check if the data key for the partial column update exists. If it exists, the previous auto-increment column value is used; if not, the auto-increment column value from the last column of the block is used. This ensures that the auto-increment column values are consistent across different BEs. * 2 * [Fix](regression-test) Fix auto inc partial update unstable regression test (#34940) --- be/src/exec/tablet_info.cpp | 3 + be/src/exec/tablet_info.h | 2 + be/src/olap/delta_writer_v2.cpp | 3 +- be/src/olap/memtable.cpp | 13 +- be/src/olap/memtable.h | 2 + be/src/olap/partial_update_info.h | 10 +- .../olap/rowset/segment_v2/segment_writer.cpp | 30 +-- .../olap/rowset/segment_v2/segment_writer.h | 7 +- .../segment_v2/vertical_segment_writer.cpp | 27 +- .../segment_v2/vertical_segment_writer.h | 6 +- be/src/olap/rowset_builder.cpp | 3 +- be/src/runtime/runtime_state.h | 7 + be/src/vec/sink/vtablet_block_convertor.cpp | 49 +++- be/src/vec/sink/vtablet_block_convertor.h | 7 +- be/src/vec/sink/writer/vtablet_writer.cpp | 6 +- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 6 +- .../apache/doris/planner/OlapTableSink.java | 1 + gensrc/proto/descriptors.proto | 1 + gensrc/thrift/Descriptors.thrift | 1 + ..._inc_partial_update_consistency_insert.out | 97 +++++++ ...partial_update_consistency_stream_load.out | 95 +++++++ ...c_partial_update_consistency_insert.groovy | 221 +++++++++++++++ ...tial_update_consistency_stream_load.groovy | 255 ++++++++++++++++++ 23 files changed, 788 insertions(+), 64 deletions(-) create mode 100644 regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.out create mode 100644 regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.out create mode 100644 regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.groovy create mode 100644 regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.groovy diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index ff8c272fb2..62ff0b2fcc 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -121,6 +121,7 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) { _is_strict_mode = pschema.is_strict_mode(); if (_is_partial_update) { _auto_increment_column = pschema.auto_increment_column(); + _auto_increment_column_unique_id = pschema.auto_increment_column_unique_id(); } _timestamp_ms = pschema.timestamp_ms(); _timezone = pschema.timezone(); @@ -186,6 +187,7 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) { } if (_is_partial_update) { _auto_increment_column = tschema.auto_increment_column; + _auto_increment_column_unique_id = tschema.auto_increment_column_unique_id; } for (const auto& tcolumn : tschema.partial_update_input_columns) { @@ -258,6 +260,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const { pschema->set_partial_update(_is_partial_update); pschema->set_is_strict_mode(_is_strict_mode); pschema->set_auto_increment_column(_auto_increment_column); + pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id); pschema->set_timestamp_ms(_timestamp_ms); pschema->set_timezone(_timezone); for (auto col : _partial_update_input_columns) { diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index 20f4fa51fc..fcba8fd826 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -93,6 +93,7 @@ public: return _partial_update_input_columns; } std::string auto_increment_coulumn() const { return _auto_increment_column; } + int32_t auto_increment_column_unique_id() const { return _auto_increment_column_unique_id; } void set_timestamp_ms(int64_t timestamp_ms) { _timestamp_ms = timestamp_ms; } int64_t timestamp_ms() const { return _timestamp_ms; } void set_timezone(std::string timezone) { _timezone = timezone; } @@ -113,6 +114,7 @@ private: std::set _partial_update_input_columns; bool _is_strict_mode = false; std::string _auto_increment_column; + int32_t _auto_increment_column_unique_id; int64_t _timestamp_ms = 0; std::string _timezone; }; diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index 51cef7e9f5..5cfc260d1b 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -238,7 +238,8 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, _partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(), table_schema_param->partial_update_input_columns(), table_schema_param->is_strict_mode(), - table_schema_param->timestamp_ms(), table_schema_param->timezone()); + table_schema_param->timestamp_ms(), table_schema_param->timezone(), + table_schema_param->auto_increment_coulumn()); } } // namespace doris diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 123eb7d826..2676bf7a32 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -68,16 +68,22 @@ MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema, _query_thread_context.init(); _arena = std::make_unique(); _vec_row_comparator = std::make_shared(_tablet_schema); - // TODO: Support ZOrderComparator in the future - _init_columns_offset_by_slot_descs(slot_descs, tuple_desc); _num_columns = _tablet_schema->num_columns(); if (partial_update_info != nullptr) { _is_partial_update = partial_update_info->is_partial_update; if (_is_partial_update) { _num_columns = partial_update_info->partial_update_input_columns.size(); + if (partial_update_info->is_schema_contains_auto_inc_column && + !partial_update_info->is_input_columns_contains_auto_inc_column) { + _is_partial_update_and_auto_inc = true; + _num_columns += 1; + } } } + // TODO: Support ZOrderComparator in the future + _init_columns_offset_by_slot_descs(slot_descs, tuple_desc); } + void MemTable::_init_columns_offset_by_slot_descs(const std::vector* slot_descs, const TupleDescriptor* tuple_desc) { for (auto slot_desc : *slot_descs) { @@ -89,6 +95,9 @@ void MemTable::_init_columns_offset_by_slot_descs(const std::vector& partial_update_cols, bool is_strict_mode, - int64_t timestamp_ms, const std::string& timezone) { + int64_t timestamp_ms, const std::string& timezone, + const std::string& auto_increment_column) { is_partial_update = partial_update; partial_update_input_columns = partial_update_cols; this->timestamp_ms = timestamp_ms; @@ -42,8 +43,13 @@ struct PartialUpdateInfo { } else { update_cids.emplace_back(i); } + if (auto_increment_column == tablet_column.name()) { + is_schema_contains_auto_inc_column = true; + } } this->is_strict_mode = is_strict_mode; + is_input_columns_contains_auto_inc_column = + is_partial_update && partial_update_input_columns.contains(auto_increment_column); } bool is_partial_update {false}; @@ -56,5 +62,7 @@ struct PartialUpdateInfo { bool is_strict_mode {false}; int64_t timestamp_ms {0}; std::string timezone; + bool is_input_columns_contains_auto_inc_column = false; + bool is_schema_contains_auto_inc_column = false; }; } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index b254356847..83e93631ab 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -112,13 +112,6 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id, const auto& column = _tablet_schema->column(_tablet_schema->sequence_col_idx()); _seq_coder = get_key_coder(column.type()); } - if (!_tablet_schema->auto_increment_column().empty()) { - _auto_inc_id_buffer = - vectorized::GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer( - _tablet_schema->db_id(), _tablet_schema->table_id(), - _tablet_schema->column(_tablet_schema->auto_increment_column()) - .unique_id()); - } // encode the rowid into the primary key index if (!_tablet_schema->cluster_key_idxes().empty()) { const auto* type_info = get_scalar_type_info(); @@ -559,7 +552,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* // read and fill block auto mutable_full_columns = full_block.mutate_columns(); RETURN_IF_ERROR(fill_missing_columns(mutable_full_columns, use_default_or_null_flag, - has_default_or_nullable, segment_start_pos)); + has_default_or_nullable, segment_start_pos, block)); full_block.set_columns(std::move(mutable_full_columns)); // row column should be filled here if (_tablet_schema->store_row_column()) { @@ -618,7 +611,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_full_columns, const std::vector& use_default_or_null_flag, bool has_default_or_nullable, - const size_t& segment_start_pos) { + const size_t& segment_start_pos, + const vectorized::Block* block) { if constexpr (!std::is_same_v) { // TODO(plat1ko): cloud mode return Status::NotSupported("fill_missing_columns"); @@ -712,18 +706,6 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f } } - // deal with partial update auto increment column when there no key in old block. - if (!_tablet_schema->auto_increment_column().empty()) { - if (_auto_inc_id_allocator.total_count < use_default_or_null_flag.size()) { - std::vector> res; - RETURN_IF_ERROR( - _auto_inc_id_buffer->sync_request_ids(use_default_or_null_flag.size(), &res)); - for (auto [start, length] : res) { - _auto_inc_id_allocator.insert_ids(start, length); - } - } - } - // fill all missing value from mutable_old_columns, need to consider default value and null value for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) { // `use_default_or_null_flag[idx] == true` doesn't mean that we should read values from the old row @@ -751,7 +733,11 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f FieldType::OLAP_FIELD_TYPE_BIGINT); auto auto_inc_column = assert_cast( mutable_full_columns[cids_missing[i]].get()); - auto_inc_column->insert(_auto_inc_id_allocator.next_id()); + auto_inc_column->insert( + (assert_cast( + block->get_by_name("__PARTIAL_UPDATE_AUTO_INC_COLUMN__") + .column.get())) + ->get_element(idx)); } else { // If the control flow reaches this branch, the column neither has default value // nor is nullable. It means that the row's delete sign is marked, and the value diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index 1adb94aad2..2f26d6158e 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -40,7 +40,6 @@ #include "olap/tablet_schema.h" #include "util/faststring.h" #include "util/slice.h" -#include "vec/sink/autoinc_buffer.h" namespace doris { namespace vectorized { @@ -130,7 +129,8 @@ public: void set_mow_context(std::shared_ptr mow_context); Status fill_missing_columns(vectorized::MutableColumns& mutable_full_columns, const std::vector& use_default_or_null_flag, - bool has_default_or_nullable, const size_t& segment_start_pos); + bool has_default_or_nullable, const size_t& segment_start_pos, + const vectorized::Block* block); private: DISALLOW_COPY_AND_ASSIGN(SegmentWriter); @@ -226,9 +226,6 @@ private: // group every rowset-segment row id to speed up reader PartialUpdateReadPlan _rssid_to_rid; std::map _rsid_to_rowset; - - std::shared_ptr _auto_inc_id_buffer = nullptr; - vectorized::AutoIncIDAllocator _auto_inc_id_allocator; }; } // namespace segment_v2 diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 5cadc9aac6..5d2ddedb20 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -103,11 +103,6 @@ VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter* file_writer, uint32 const auto& column = _tablet_schema->column(_tablet_schema->sequence_col_idx()); _seq_coder = get_key_coder(column.type()); } - if (!_tablet_schema->auto_increment_column().empty()) { - _auto_inc_id_buffer = vectorized::GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer( - _tablet_schema->db_id(), _tablet_schema->table_id(), - _tablet_schema->column(_tablet_schema->auto_increment_column()).unique_id()); - } if (_tablet_schema->has_inverted_index()) { _inverted_index_file_writer = std::make_unique( _file_writer->fs(), _file_writer->path().parent_path(), @@ -493,7 +488,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da // read and fill block auto mutable_full_columns = full_block.mutate_columns(); RETURN_IF_ERROR(_fill_missing_columns(mutable_full_columns, use_default_or_null_flag, - has_default_or_nullable, segment_start_pos)); + has_default_or_nullable, segment_start_pos, data.block)); // row column should be filled here if (_tablet_schema->store_row_column()) { // convert block to row store format @@ -552,7 +547,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da Status VerticalSegmentWriter::_fill_missing_columns( vectorized::MutableColumns& mutable_full_columns, const std::vector& use_default_or_null_flag, bool has_default_or_nullable, - const size_t& segment_start_pos) { + const size_t& segment_start_pos, const vectorized::Block* block) { if constexpr (!std::is_same_v) { // TODO(plat1ko): CloudStorageEngine return Status::NotSupported("fill_missing_columns"); @@ -645,18 +640,6 @@ Status VerticalSegmentWriter::_fill_missing_columns( } } - // deal with partial update auto increment column when there no key in old block. - if (!_tablet_schema->auto_increment_column().empty()) { - if (_auto_inc_id_allocator.total_count < use_default_or_null_flag.size()) { - std::vector> res; - RETURN_IF_ERROR( - _auto_inc_id_buffer->sync_request_ids(use_default_or_null_flag.size(), &res)); - for (auto [start, length] : res) { - _auto_inc_id_allocator.insert_ids(start, length); - } - } - } - // fill all missing value from mutable_old_columns, need to consider default value and null value for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) { // `use_default_or_null_flag[idx] == true` doesn't mean that we should read values from the old row @@ -684,7 +667,11 @@ Status VerticalSegmentWriter::_fill_missing_columns( FieldType::OLAP_FIELD_TYPE_BIGINT); auto auto_inc_column = assert_cast( mutable_full_columns[missing_cids[i]].get()); - auto_inc_column->insert(_auto_inc_id_allocator.next_id()); + auto_inc_column->insert( + (assert_cast( + block->get_by_name("__PARTIAL_UPDATE_AUTO_INC_COLUMN__") + .column.get())) + ->get_element(idx)); } else { // If the control flow reaches this branch, the column neither has default value // nor is nullable. It means that the row's delete sign is marked, and the value diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h index 7bc6bf7c4f..02e7170ff5 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h @@ -38,7 +38,6 @@ #include "olap/tablet_schema.h" #include "util/faststring.h" #include "util/slice.h" -#include "vec/sink/autoinc_buffer.h" namespace doris { namespace vectorized { @@ -145,7 +144,8 @@ private: Status _append_block_with_partial_content(RowsInBlock& data); Status _fill_missing_columns(vectorized::MutableColumns& mutable_full_columns, const std::vector& use_default_or_null_flag, - bool has_default_or_nullable, const size_t& segment_start_pos); + bool has_default_or_nullable, const size_t& segment_start_pos, + const vectorized::Block* block); private: uint32_t _segment_id; @@ -193,8 +193,6 @@ private: std::map _rsid_to_rowset; std::vector _batched_blocks; - std::shared_ptr _auto_inc_id_buffer = nullptr; - vectorized::AutoIncIDAllocator _auto_inc_id_allocator; }; } // namespace segment_v2 diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index 4355bb6f96..2153a9ad1a 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -377,7 +377,8 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id, _partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(), table_schema_param->partial_update_input_columns(), table_schema_param->is_strict_mode(), - table_schema_param->timestamp_ms(), table_schema_param->timezone()); + table_schema_param->timestamp_ms(), table_schema_param->timezone(), + table_schema_param->auto_increment_coulumn()); } } // namespace doris diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index abef8615b1..9f483fdc26 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -42,6 +42,7 @@ #include "runtime/task_execution_context.h" #include "util/debug_util.h" #include "util/runtime_profile.h" +#include "vec/columns/columns_number.h" namespace doris { class IRuntimeFilter; @@ -628,6 +629,10 @@ public: int task_num() const { return _task_num; } + vectorized::ColumnInt64* partial_update_auto_inc_column() { + return _partial_update_auto_inc_column; + }; + private: Status create_error_log_file(); @@ -755,6 +760,8 @@ private: // prohibit copies RuntimeState(const RuntimeState&); + + vectorized::ColumnInt64* _partial_update_auto_inc_column; }; #define RETURN_IF_CANCELLED(state) \ diff --git a/be/src/vec/sink/vtablet_block_convertor.cpp b/be/src/vec/sink/vtablet_block_convertor.cpp index 678c899d98..d93a654728 100644 --- a/be/src/vec/sink/vtablet_block_convertor.cpp +++ b/be/src/vec/sink/vtablet_block_convertor.cpp @@ -19,6 +19,7 @@ #include #include +#include #include #include @@ -46,6 +47,7 @@ #include "vec/common/assert_cast.h" #include "vec/core/block.h" #include "vec/core/types.h" +#include "vec/data_types/data_type.h" #include "vec/data_types/data_type_decimal.h" #include "vec/data_types/data_type_nullable.h" #include "vec/exprs/vexpr.h" @@ -66,8 +68,21 @@ Status OlapTableBlockConvertor::validate_and_convert_block( output_vexpr_ctxs, *input_block, block.get())); } - // fill the valus for auto-increment columns - if (_auto_inc_col_idx.has_value()) { + if (_is_partial_update_and_auto_inc) { + // If this load is partial update and this table has a auto inc column, + // e.g. table schema: k1, v1, v2(auto inc) + // 1. insert columns include auto inc column + // e.g. insert into table (k1, v2) value(a, 1); + // we do nothing. + // 2. insert columns do not include auto inc column + // e.g. insert into table (k1, v1) value(a, a); + // we need to fill auto_inc_cols by creating a new column. + if (!_auto_inc_col_idx.has_value()) { + RETURN_IF_ERROR(_partial_update_fill_auto_inc_cols(block.get(), rows)); + } + } else if (_auto_inc_col_idx.has_value()) { + // fill the valus for auto-increment columns + DCHECK_EQ(_is_partial_update_and_auto_inc, false); RETURN_IF_ERROR(_fill_auto_inc_cols(block.get(), rows)); } @@ -91,8 +106,16 @@ Status OlapTableBlockConvertor::validate_and_convert_block( return Status::OK(); } -void OlapTableBlockConvertor::init_autoinc_info(int64_t db_id, int64_t table_id, int batch_size) { +void OlapTableBlockConvertor::init_autoinc_info(int64_t db_id, int64_t table_id, int batch_size, + bool is_partial_update_and_auto_inc, + int32_t auto_increment_column_unique_id) { _batch_size = batch_size; + if (is_partial_update_and_auto_inc) { + _is_partial_update_and_auto_inc = is_partial_update_and_auto_inc; + _auto_inc_id_buffer = GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer( + db_id, table_id, auto_increment_column_unique_id); + return; + } for (size_t idx = 0; idx < _output_tuple_desc->slots().size(); idx++) { if (_output_tuple_desc->slots()[idx]->is_auto_increment()) { _auto_inc_col_idx = idx; @@ -522,4 +545,24 @@ Status OlapTableBlockConvertor::_fill_auto_inc_cols(vectorized::Block* block, si return Status::OK(); } +Status OlapTableBlockConvertor::_partial_update_fill_auto_inc_cols(vectorized::Block* block, + size_t rows) { + auto dst_column = vectorized::ColumnInt64::create(); + vectorized::ColumnInt64::Container& dst_values = dst_column->get_data(); + size_t null_value_count = rows; + std::vector> res; + RETURN_IF_ERROR(_auto_inc_id_buffer->sync_request_ids(null_value_count, &res)); + for (auto [start, length] : res) { + _auto_inc_id_allocator.insert_ids(start, length); + } + + for (size_t i = 0; i < rows; i++) { + dst_values.emplace_back(_auto_inc_id_allocator.next_id()); + } + block->insert(vectorized::ColumnWithTypeAndName(std::move(dst_column), + std::make_shared>(), + "__PARTIAL_UPDATE_AUTO_INC_COLUMN__")); + return Status::OK(); +} + } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/sink/vtablet_block_convertor.h b/be/src/vec/sink/vtablet_block_convertor.h index 4eaaef3869..0db340ce6c 100644 --- a/be/src/vec/sink/vtablet_block_convertor.h +++ b/be/src/vec/sink/vtablet_block_convertor.h @@ -53,7 +53,9 @@ public: int64_t num_filtered_rows() const { return _num_filtered_rows; } - void init_autoinc_info(int64_t db_id, int64_t table_id, int batch_size); + void init_autoinc_info(int64_t db_id, int64_t table_id, int batch_size, + bool is_partial_update_and_auto_inc = false, + int32_t auto_increment_column_unique_id = -1); AutoIncIDAllocator& auto_inc_id_allocator() { return _auto_inc_id_allocator; } @@ -82,6 +84,8 @@ private: Status _fill_auto_inc_cols(vectorized::Block* block, size_t rows); + Status _partial_update_fill_auto_inc_cols(vectorized::Block* block, size_t rows); + TupleDescriptor* _output_tuple_desc = nullptr; std::map, DecimalV2Value> _max_decimalv2_val; @@ -105,6 +109,7 @@ private: std::optional _auto_inc_col_idx; std::shared_ptr _auto_inc_id_buffer = nullptr; AutoIncIDAllocator _auto_inc_id_allocator; + bool _is_partial_update_and_auto_inc = false; }; } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 3fb17850ef..0a7238e9f5 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -1178,8 +1178,10 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) { } _block_convertor = std::make_unique(_output_tuple_desc); - _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(), - _state->batch_size()); + _block_convertor->init_autoinc_info( + _schema->db_id(), _schema->table_id(), _state->batch_size(), + _schema->is_partial_update() && !_schema->auto_increment_coulumn().empty(), + _schema->auto_increment_column_unique_id()); _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc, false)); // add all counter diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index b883d8e87c..c1b43722c3 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -213,8 +213,10 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) { } _block_convertor = std::make_unique(_output_tuple_desc); - _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(), - _state->batch_size()); + _block_convertor->init_autoinc_info( + _schema->db_id(), _schema->table_id(), _state->batch_size(), + _schema->is_partial_update() && !_schema->auto_increment_coulumn().empty(), + _schema->auto_increment_column_unique_id()); _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc, false)); // add all counter diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index d180c72339..ada7c6b770 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -312,6 +312,7 @@ public class OlapTableSink extends DataSink { for (Column col : table.getFullSchema()) { if (col.isAutoInc()) { schemaParam.setAutoIncrementColumn(col.getName()); + schemaParam.setAutoIncrementColumnUniqueId(col.getUniqueId()); } } } diff --git a/gensrc/proto/descriptors.proto b/gensrc/proto/descriptors.proto index 9d6945becc..13c069f414 100644 --- a/gensrc/proto/descriptors.proto +++ b/gensrc/proto/descriptors.proto @@ -72,5 +72,6 @@ message POlapTableSchemaParam { optional string auto_increment_column = 10; optional int64 timestamp_ms = 11 [default = 0]; optional string timezone = 12; + optional int32 auto_increment_column_unique_id = 13; }; diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index d82f74d771..ef7a845168 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -245,6 +245,7 @@ struct TOlapTableSchemaParam { 9: optional list partial_update_input_columns 10: optional bool is_strict_mode = false 11: optional string auto_increment_column + 12: optional i32 auto_increment_column_unique_id = -1 } struct TTabletLocation { diff --git a/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.out b/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.out new file mode 100644 index 0000000000..79a0829911 --- /dev/null +++ b/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.out @@ -0,0 +1,97 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select1_1 -- +Alice 200 +Beata 700 +Bob 100 +Carter 500 +Doris 800 +Nereids 900 +Smith 600 +Test 400 +Tom 300 + +-- !select1_2 -- + +-- !select1_3 -- +Alice 200 +Beata 723 +Bob 123 +Carter 523 +Doris 800 +Nereids 923 +Smith 600 +Test 400 +Tom 323 + +-- !select1_4 -- + +-- !select2_1 -- +Alice 200 +Beata 700 +Bob 100 +Carter 500 +Doris 800 +Nereids 900 +Smith 600 +Test 400 +Tom 300 + +-- !select2_2 -- + +-- !select2_3 -- +Alice 200 +Beata 700 +Bob 100 +Carter 500 +Doris 800 +Nereids 900 +Smith 600 +Test 400 +Tom 300 + +-- !select2_4 -- + +-- !select3_1 -- +Alice 200 +Beata 700 +Bob 100 +Carter 500 +Doris 800 +Nereids 900 +Smith 600 +Test 400 +Tom 300 + +-- !select3_2 -- + +-- !select3_3 -- +Alice 200 +Beata 9996 +Bob 9990 +Carter 9994 +Doris 800 +Nereids 9998 +Smith 600 +Test 400 +Tom 9992 + +-- !select3_4 -- + +-- !select3_5 -- +Alice 200 +BBBBeata 9996 +BBBBob 9990 +Beata 9996 +Bob 9990 +CCCCarter 9994 +Carter 9994 +Doris 800 +NNNNereids 9998 +Nereids 9998 +Smith 600 +TTTTom 9992 +Test 400 +Tom 9992 + +-- !select3_6 -- + diff --git a/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.out b/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.out new file mode 100644 index 0000000000..a31e438f29 --- /dev/null +++ b/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.out @@ -0,0 +1,95 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select1_1 -- +1 Bob 100 +2 Alice 200 +3 Tom 300 +4 Test 400 +5 Carter 500 +6 Smith 600 +7 Beata 700 +8 Doris 800 +9 Nereids 900 + +-- !select1_1 -- +Alice 200 +Beata 723 +Bob 123 +Carter 523 +Doris 800 +Nereids 923 +Smith 600 +Test 400 +Tom 323 + +-- !select1_2 -- + +-- !select2_1 -- +Alice 200 +Beata 700 +Bob 100 +Carter 500 +Doris 800 +Nereids 900 +Smith 600 +Test 400 +Tom 300 + +-- !select2_2 -- + +-- !select2_3 -- +Alice 200 +Beata 700 +Bob 100 +Carter 500 +Doris 800 +Nereids 900 +Smith 600 +Test 400 +Tom 300 + +-- !select2_4 -- + +-- !select3_1 -- +Alice 200 +Beata 700 +Bob 100 +Carter 500 +Doris 800 +Nereids 900 +Smith 600 +Test 400 +Tom 300 + +-- !select3_2 -- + +-- !select3_3 -- +Alice 200 +Beata 9996 +Bob 9990 +Carter 9994 +Doris 800 +Nereids 9998 +Smith 600 +Test 400 +Tom 9992 + +-- !select3_4 -- + +-- !select3_5 -- +Alice 200 +BBeata 9996 +BBob 9990 +Beata 9996 +Bob 9990 +CCarter 9994 +Carter 9994 +Doris 800 +NNereids 9998 +Nereids 9998 +Smith 600 +TTom 9992 +Test 400 +Tom 9992 + +-- !select3_6 -- + diff --git a/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.groovy b/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.groovy new file mode 100644 index 0000000000..4e2c8a5cbd --- /dev/null +++ b/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.groovy @@ -0,0 +1,221 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_unique_table_auto_inc_partial_update_correct_insert") { + + def backends = sql_return_maparray('show backends') + def replicaNum = 0 + def targetBackend = null + for (def be : backends) { + def alive = be.Alive.toBoolean() + def decommissioned = be.SystemDecommissioned.toBoolean() + if (alive && !decommissioned) { + replicaNum++ + targetBackend = be + } + } + assertTrue(replicaNum > 0) + + def check_data_correct = { def tableName -> + def old_result = sql "select id from ${tableName} order by id;" + logger.info("first result: " + old_result) + for (int i = 1; i<30; ++i){ + def new_result = sql "select id from ${tableName} order by id;" + logger.info("new result: " + new_result) + for (int j = 0; j 1;" + check_data_correct(table1) + + sql "set enable_unique_key_partial_update=true;" + sql "set enable_insert_strict=false;" + + // 1, 123 + // 3, 323 + // 5, 523 + // 7, 723 + // 9, 923 + sql "insert into ${table1} (id, value) values (1,123),(3,323),(5,523),(7,723),(9,923)" + qt_select1_3 "select name, value from ${table1} order by name, value;" + qt_select1_4 "select id, count(*) from ${table1} group by id having count(*) > 1;" + check_data_correct(table1) + sql "drop table if exists ${table1};" + + // test for partial update, auto inc col is value, update auto inc col + def table2 = "unique_auto_inc_col_value_partial_update_insert" + sql "drop table if exists ${table2}" + sql """ + CREATE TABLE IF NOT EXISTS `${table2}` ( + `name` varchar(65533) NOT NULL COMMENT "用户姓名", + `value` int(11) NOT NULL COMMENT "用户得分", + `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID" + ) ENGINE=OLAP + UNIQUE KEY(`name`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`name`) BUCKETS 3 + PROPERTIES ( + "replication_num" = "${replicaNum}", + "in_memory" = "false", + "storage_format" = "V2", + "enable_unique_key_merge_on_write" = "true" + ) + """ + + sql "set enable_unique_key_partial_update=true;" + sql "set enable_insert_strict=false;" + + + // Bob, 100 + // Alice, 200 + // Tom, 300 + // Test, 400 + // Carter, 500 + // Smith, 600 + // Beata, 700 + // Doris, 800 + // Nereids, 900 + sql "insert into ${table2} (name, value) values ('Bob',100)" + sql "insert into ${table2} (name, value) values ('Alice',200)" + sql "insert into ${table2} (name, value) values ('Tom',300)" + sql "insert into ${table2} (name, value) values ('Test',400)" + sql "insert into ${table2} (name, value) values ('Carter',500)" + sql "insert into ${table2} (name, value) values ('Smith',600)" + sql "insert into ${table2} (name, value) values ('Beata',700)" + sql "insert into ${table2} (name, value) values ('Doris',800)" + sql "insert into ${table2} (name, value) values ('Nereids',900)" + qt_select2_1 "select name, value from ${table2} order by name, value;" + qt_select2_2 "select id, count(*) from ${table2} group by id having count(*) > 1;" + check_data_correct(table2) + + sql "set enable_unique_key_partial_update=true;" + sql "set enable_insert_strict=false;" + // Bob, 9990 + // Tom, 9992 + // Carter, 9994 + // Beata, 9996 + // Nereids, 9998 + sql "insert into ${table2} (name, id) values ('Bob',9990),('Tom',9992),('Carter',9994),('Beata',9996),('Nereids',9998)" + qt_select2_3 "select name, value from ${table2} order by name, value;" + qt_select2_4 "select id, count(*) from ${table2} group by id having count(*) > 1;" + check_data_correct(table2) + sql "drop table if exists ${table2};" + + // test for partial update, auto inc col is value, update other col + def table3 = "unique_auto_inc_col_value_partial_update_insert" + sql "drop table if exists ${table3}" + sql """ + CREATE TABLE IF NOT EXISTS `${table3}` ( + `name` varchar(65533) NOT NULL COMMENT "用户姓名", + `value` int(11) NOT NULL COMMENT "用户得分", + `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID" + ) ENGINE=OLAP + UNIQUE KEY(`name`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`name`) BUCKETS 3 + PROPERTIES ( + "replication_num" = "${replicaNum}", + "in_memory" = "false", + "storage_format" = "V2", + "enable_unique_key_merge_on_write" = "true" + ) + """ + sql "set enable_unique_key_partial_update=false;" + sql "set enable_insert_strict=true;" + + // Bob, 100 + // Alice, 200 + // Tom, 300 + // Test, 400 + // Carter, 500 + // Smith, 600 + // Beata, 700 + // Doris, 800 + // Nereids, 900 + sql "insert into ${table2} (name, value) values ('Bob',100),('Alice',200),('Tom',300),('Test',400),('Carter',500),('Smith',600),('Beata',700),('Doris',800),('Nereids',900)" + qt_select3_1 "select name, value from ${table3} order by name, value;" + qt_select3_2 "select id, count(*) from ${table3} group by id having count(*) > 1;" + check_data_correct(table3) + + sql "set enable_unique_key_partial_update=true;" + sql "set enable_insert_strict=false;" + // Bob, 9990 + // Tom, 9992 + // Carter, 9994 + // Beata, 9996 + // Nereids, 9998 + sql "insert into ${table2} (name, value) values ('Bob',9990)" + sql "insert into ${table2} (name, value) values ('Tom',9992)" + sql "insert into ${table2} (name, value) values ('Carter',9994)" + sql "insert into ${table2} (name, value) values ('Beata',9996)" + sql "insert into ${table2} (name, value) values ('Nereids',9998)" + qt_select3_3 "select name, value from ${table3} order by name, value;" + qt_select3_4 "select id, count(*) from ${table3} group by id having count(*) > 1;" + check_data_correct(table3) + + sql "insert into ${table2} (name, value) values ('BBBBob',9990)" + sql "insert into ${table2} (name, value) values ('TTTTom',9992)" + sql "insert into ${table2} (name, value) values ('CCCCarter',9994)" + sql "insert into ${table2} (name, value) values ('BBBBeata',9996)" + sql "insert into ${table2} (name, value) values ('NNNNereids',9998)" + qt_select3_5 "select name, value from ${table3} order by name, value;" + qt_select3_6 "select id, count(*) from ${table3} group by id having count(*) > 1;" + check_data_correct(table3) + sql "drop table if exists ${table3};" +} + diff --git a/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.groovy b/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.groovy new file mode 100644 index 0000000000..474794deb9 --- /dev/null +++ b/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.groovy @@ -0,0 +1,255 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_unique_table_auto_inc_partial_update_correct_stream_load") { + + def backends = sql_return_maparray('show backends') + def replicaNum = 0 + def targetBackend = null + for (def be : backends) { + def alive = be.Alive.toBoolean() + def decommissioned = be.SystemDecommissioned.toBoolean() + if (alive && !decommissioned) { + replicaNum++ + targetBackend = be + } + } + assertTrue(replicaNum > 0) + + def check_data_correct = { def tableName -> + def old_result = sql "select id from ${tableName} order by id;" + logger.info("first result: " + old_result) + for (int i = 1; i<30; ++i){ + def new_result = sql "select id from ${tableName} order by id;" + logger.info("new result: " + new_result) + for (int j = 0; j 1;" + check_data_correct(table1) + sql "drop table if exists ${table1};" + + // test for partial update, auto inc col is value, update auto inc col + def table2 = "unique_auto_inc_col_value_partial_update_stream_load" + sql "drop table if exists ${table2}" + sql """ + CREATE TABLE IF NOT EXISTS `${table2}` ( + `name` varchar(65533) NOT NULL COMMENT "用户姓名", + `value` int(11) NOT NULL COMMENT "用户得分", + `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID" + ) ENGINE=OLAP + UNIQUE KEY(`name`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`name`) BUCKETS 3 + PROPERTIES ( + "replication_num" = "${replicaNum}", + "in_memory" = "false", + "storage_format" = "V2", + "enable_unique_key_merge_on_write" = "true" + ) + """ + + // Bob, 100 + // Alice, 200 + // Tom, 300 + // Test, 400 + // Carter, 500 + // Smith, 600 + // Beata, 700 + // Doris, 800 + // Nereids, 900 + streamLoad { + table "${table2}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'name, value' + + file 'auto_inc_basic.csv' + time 10000 // limit inflight 10s + } + sql "sync" + qt_select2_1 "select name, value from ${table2} order by name, value;" + qt_select2_2 "select id, count(*) from ${table2} group by id having count(*) > 1;" + check_data_correct(table2) + + // Bob, 9990 + // Tom, 9992 + // Carter, 9994 + // Beata, 9996 + // Nereids, 9998 + streamLoad { + table "${table2}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'name, id' + set 'partial_columns', 'true' + + file 'auto_inc_partial_update2.csv' + time 10000 + } + sql "sync" + qt_select2_3 "select name, value from ${table2} order by name, value;" + qt_select2_4 "select id, count(*) from ${table2} group by id having count(*) > 1;" + check_data_correct(table2) + sql "drop table if exists ${table2};" + + // test for partial update, auto inc col is value, update other col + def table3 = "unique_auto_inc_col_value_partial_update_stream_load" + sql "drop table if exists ${table3}" + sql """ + CREATE TABLE IF NOT EXISTS `${table3}` ( + `name` varchar(65533) NOT NULL COMMENT "用户姓名", + `value` int(11) NOT NULL COMMENT "用户得分", + `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID" + ) ENGINE=OLAP + UNIQUE KEY(`name`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`name`) BUCKETS 3 + PROPERTIES ( + "replication_num" = "${replicaNum}", + "in_memory" = "false", + "storage_format" = "V2", + "enable_unique_key_merge_on_write" = "true" + ) + """ + streamLoad { + table "${table3}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'name, value' + + file 'auto_inc_basic.csv' + time 10000 // limit inflight 10s + } + sql "sync" + qt_select3_1 "select name, value from ${table3} order by name, value;" + qt_select3_2 "select id, count(*) from ${table3} group by id having count(*) > 1;" + check_data_correct(table3) + + // Bob, 9990 + // Tom, 9992 + // Carter, 9994 + // Beata, 9996 + // Nereids, 9998 + streamLoad { + table "${table3}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'name, value' + set 'partial_columns', 'true' + + file 'auto_inc_partial_update2.csv' + time 10000 + } + sql "sync" + qt_select3_3 "select name, value from ${table3} order by name, value;" + qt_select3_4 "select id, count(*) from ${table3} group by id having count(*) > 1;" + check_data_correct(table3) + // BBob, 9990 + // TTom, 9992 + // CCarter, 9994 + // BBeata, 9996 + // NNereids, 9998 + streamLoad { + table "${table3}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'name, value' + set 'partial_columns', 'true' + + file 'auto_inc_partial_update3.csv' + time 10000 + } + sql "sync" + qt_select3_5 "select name, value from ${table3} order by name, value;" + qt_select3_6 "select id, count(*) from ${table3} group by id having count(*) > 1;" + check_data_correct(table3) + sql "drop table if exists ${table3};" +} +