diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index ff8c272fb2..62ff0b2fcc 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -121,6 +121,7 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) { _is_strict_mode = pschema.is_strict_mode(); if (_is_partial_update) { _auto_increment_column = pschema.auto_increment_column(); + _auto_increment_column_unique_id = pschema.auto_increment_column_unique_id(); } _timestamp_ms = pschema.timestamp_ms(); _timezone = pschema.timezone(); @@ -186,6 +187,7 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) { } if (_is_partial_update) { _auto_increment_column = tschema.auto_increment_column; + _auto_increment_column_unique_id = tschema.auto_increment_column_unique_id; } for (const auto& tcolumn : tschema.partial_update_input_columns) { @@ -258,6 +260,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const { pschema->set_partial_update(_is_partial_update); pschema->set_is_strict_mode(_is_strict_mode); pschema->set_auto_increment_column(_auto_increment_column); + pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id); pschema->set_timestamp_ms(_timestamp_ms); pschema->set_timezone(_timezone); for (auto col : _partial_update_input_columns) { diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index 20f4fa51fc..fcba8fd826 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -93,6 +93,7 @@ public: return _partial_update_input_columns; } std::string auto_increment_coulumn() const { return _auto_increment_column; } + int32_t auto_increment_column_unique_id() const { return _auto_increment_column_unique_id; } void set_timestamp_ms(int64_t timestamp_ms) { _timestamp_ms = timestamp_ms; } int64_t timestamp_ms() const { return _timestamp_ms; } void set_timezone(std::string timezone) { _timezone = timezone; } @@ -113,6 +114,7 @@ private: std::set _partial_update_input_columns; bool _is_strict_mode = false; std::string _auto_increment_column; + int32_t _auto_increment_column_unique_id; int64_t _timestamp_ms = 0; std::string _timezone; }; diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index 51cef7e9f5..5cfc260d1b 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -238,7 +238,8 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, _partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(), table_schema_param->partial_update_input_columns(), table_schema_param->is_strict_mode(), - table_schema_param->timestamp_ms(), table_schema_param->timezone()); + table_schema_param->timestamp_ms(), table_schema_param->timezone(), + table_schema_param->auto_increment_coulumn()); } } // namespace doris diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 123eb7d826..2676bf7a32 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -68,16 +68,22 @@ MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema, _query_thread_context.init(); _arena = std::make_unique(); _vec_row_comparator = std::make_shared(_tablet_schema); - // TODO: Support ZOrderComparator in the future - _init_columns_offset_by_slot_descs(slot_descs, tuple_desc); _num_columns = _tablet_schema->num_columns(); if (partial_update_info != nullptr) { _is_partial_update = partial_update_info->is_partial_update; if (_is_partial_update) { _num_columns = partial_update_info->partial_update_input_columns.size(); + if (partial_update_info->is_schema_contains_auto_inc_column && + !partial_update_info->is_input_columns_contains_auto_inc_column) { + _is_partial_update_and_auto_inc = true; + _num_columns += 1; + } } } + // TODO: Support ZOrderComparator in the future + _init_columns_offset_by_slot_descs(slot_descs, tuple_desc); } + void MemTable::_init_columns_offset_by_slot_descs(const std::vector* slot_descs, const TupleDescriptor* tuple_desc) { for (auto slot_desc : *slot_descs) { @@ -89,6 +95,9 @@ void MemTable::_init_columns_offset_by_slot_descs(const std::vector& partial_update_cols, bool is_strict_mode, - int64_t timestamp_ms, const std::string& timezone) { + int64_t timestamp_ms, const std::string& timezone, + const std::string& auto_increment_column) { is_partial_update = partial_update; partial_update_input_columns = partial_update_cols; this->timestamp_ms = timestamp_ms; @@ -42,8 +43,13 @@ struct PartialUpdateInfo { } else { update_cids.emplace_back(i); } + if (auto_increment_column == tablet_column.name()) { + is_schema_contains_auto_inc_column = true; + } } this->is_strict_mode = is_strict_mode; + is_input_columns_contains_auto_inc_column = + is_partial_update && partial_update_input_columns.contains(auto_increment_column); } bool is_partial_update {false}; @@ -56,5 +62,7 @@ struct PartialUpdateInfo { bool is_strict_mode {false}; int64_t timestamp_ms {0}; std::string timezone; + bool is_input_columns_contains_auto_inc_column = false; + bool is_schema_contains_auto_inc_column = false; }; } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index b254356847..83e93631ab 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -112,13 +112,6 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id, const auto& column = _tablet_schema->column(_tablet_schema->sequence_col_idx()); _seq_coder = get_key_coder(column.type()); } - if (!_tablet_schema->auto_increment_column().empty()) { - _auto_inc_id_buffer = - vectorized::GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer( - _tablet_schema->db_id(), _tablet_schema->table_id(), - _tablet_schema->column(_tablet_schema->auto_increment_column()) - .unique_id()); - } // encode the rowid into the primary key index if (!_tablet_schema->cluster_key_idxes().empty()) { const auto* type_info = get_scalar_type_info(); @@ -559,7 +552,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* // read and fill block auto mutable_full_columns = full_block.mutate_columns(); RETURN_IF_ERROR(fill_missing_columns(mutable_full_columns, use_default_or_null_flag, - has_default_or_nullable, segment_start_pos)); + has_default_or_nullable, segment_start_pos, block)); full_block.set_columns(std::move(mutable_full_columns)); // row column should be filled here if (_tablet_schema->store_row_column()) { @@ -618,7 +611,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_full_columns, const std::vector& use_default_or_null_flag, bool has_default_or_nullable, - const size_t& segment_start_pos) { + const size_t& segment_start_pos, + const vectorized::Block* block) { if constexpr (!std::is_same_v) { // TODO(plat1ko): cloud mode return Status::NotSupported("fill_missing_columns"); @@ -712,18 +706,6 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f } } - // deal with partial update auto increment column when there no key in old block. - if (!_tablet_schema->auto_increment_column().empty()) { - if (_auto_inc_id_allocator.total_count < use_default_or_null_flag.size()) { - std::vector> res; - RETURN_IF_ERROR( - _auto_inc_id_buffer->sync_request_ids(use_default_or_null_flag.size(), &res)); - for (auto [start, length] : res) { - _auto_inc_id_allocator.insert_ids(start, length); - } - } - } - // fill all missing value from mutable_old_columns, need to consider default value and null value for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) { // `use_default_or_null_flag[idx] == true` doesn't mean that we should read values from the old row @@ -751,7 +733,11 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f FieldType::OLAP_FIELD_TYPE_BIGINT); auto auto_inc_column = assert_cast( mutable_full_columns[cids_missing[i]].get()); - auto_inc_column->insert(_auto_inc_id_allocator.next_id()); + auto_inc_column->insert( + (assert_cast( + block->get_by_name("__PARTIAL_UPDATE_AUTO_INC_COLUMN__") + .column.get())) + ->get_element(idx)); } else { // If the control flow reaches this branch, the column neither has default value // nor is nullable. It means that the row's delete sign is marked, and the value diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index 1adb94aad2..2f26d6158e 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -40,7 +40,6 @@ #include "olap/tablet_schema.h" #include "util/faststring.h" #include "util/slice.h" -#include "vec/sink/autoinc_buffer.h" namespace doris { namespace vectorized { @@ -130,7 +129,8 @@ public: void set_mow_context(std::shared_ptr mow_context); Status fill_missing_columns(vectorized::MutableColumns& mutable_full_columns, const std::vector& use_default_or_null_flag, - bool has_default_or_nullable, const size_t& segment_start_pos); + bool has_default_or_nullable, const size_t& segment_start_pos, + const vectorized::Block* block); private: DISALLOW_COPY_AND_ASSIGN(SegmentWriter); @@ -226,9 +226,6 @@ private: // group every rowset-segment row id to speed up reader PartialUpdateReadPlan _rssid_to_rid; std::map _rsid_to_rowset; - - std::shared_ptr _auto_inc_id_buffer = nullptr; - vectorized::AutoIncIDAllocator _auto_inc_id_allocator; }; } // namespace segment_v2 diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 5cadc9aac6..5d2ddedb20 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -103,11 +103,6 @@ VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter* file_writer, uint32 const auto& column = _tablet_schema->column(_tablet_schema->sequence_col_idx()); _seq_coder = get_key_coder(column.type()); } - if (!_tablet_schema->auto_increment_column().empty()) { - _auto_inc_id_buffer = vectorized::GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer( - _tablet_schema->db_id(), _tablet_schema->table_id(), - _tablet_schema->column(_tablet_schema->auto_increment_column()).unique_id()); - } if (_tablet_schema->has_inverted_index()) { _inverted_index_file_writer = std::make_unique( _file_writer->fs(), _file_writer->path().parent_path(), @@ -493,7 +488,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da // read and fill block auto mutable_full_columns = full_block.mutate_columns(); RETURN_IF_ERROR(_fill_missing_columns(mutable_full_columns, use_default_or_null_flag, - has_default_or_nullable, segment_start_pos)); + has_default_or_nullable, segment_start_pos, data.block)); // row column should be filled here if (_tablet_schema->store_row_column()) { // convert block to row store format @@ -552,7 +547,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da Status VerticalSegmentWriter::_fill_missing_columns( vectorized::MutableColumns& mutable_full_columns, const std::vector& use_default_or_null_flag, bool has_default_or_nullable, - const size_t& segment_start_pos) { + const size_t& segment_start_pos, const vectorized::Block* block) { if constexpr (!std::is_same_v) { // TODO(plat1ko): CloudStorageEngine return Status::NotSupported("fill_missing_columns"); @@ -645,18 +640,6 @@ Status VerticalSegmentWriter::_fill_missing_columns( } } - // deal with partial update auto increment column when there no key in old block. - if (!_tablet_schema->auto_increment_column().empty()) { - if (_auto_inc_id_allocator.total_count < use_default_or_null_flag.size()) { - std::vector> res; - RETURN_IF_ERROR( - _auto_inc_id_buffer->sync_request_ids(use_default_or_null_flag.size(), &res)); - for (auto [start, length] : res) { - _auto_inc_id_allocator.insert_ids(start, length); - } - } - } - // fill all missing value from mutable_old_columns, need to consider default value and null value for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) { // `use_default_or_null_flag[idx] == true` doesn't mean that we should read values from the old row @@ -684,7 +667,11 @@ Status VerticalSegmentWriter::_fill_missing_columns( FieldType::OLAP_FIELD_TYPE_BIGINT); auto auto_inc_column = assert_cast( mutable_full_columns[missing_cids[i]].get()); - auto_inc_column->insert(_auto_inc_id_allocator.next_id()); + auto_inc_column->insert( + (assert_cast( + block->get_by_name("__PARTIAL_UPDATE_AUTO_INC_COLUMN__") + .column.get())) + ->get_element(idx)); } else { // If the control flow reaches this branch, the column neither has default value // nor is nullable. It means that the row's delete sign is marked, and the value diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h index 7bc6bf7c4f..02e7170ff5 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h @@ -38,7 +38,6 @@ #include "olap/tablet_schema.h" #include "util/faststring.h" #include "util/slice.h" -#include "vec/sink/autoinc_buffer.h" namespace doris { namespace vectorized { @@ -145,7 +144,8 @@ private: Status _append_block_with_partial_content(RowsInBlock& data); Status _fill_missing_columns(vectorized::MutableColumns& mutable_full_columns, const std::vector& use_default_or_null_flag, - bool has_default_or_nullable, const size_t& segment_start_pos); + bool has_default_or_nullable, const size_t& segment_start_pos, + const vectorized::Block* block); private: uint32_t _segment_id; @@ -193,8 +193,6 @@ private: std::map _rsid_to_rowset; std::vector _batched_blocks; - std::shared_ptr _auto_inc_id_buffer = nullptr; - vectorized::AutoIncIDAllocator _auto_inc_id_allocator; }; } // namespace segment_v2 diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index 4355bb6f96..2153a9ad1a 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -377,7 +377,8 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id, _partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(), table_schema_param->partial_update_input_columns(), table_schema_param->is_strict_mode(), - table_schema_param->timestamp_ms(), table_schema_param->timezone()); + table_schema_param->timestamp_ms(), table_schema_param->timezone(), + table_schema_param->auto_increment_coulumn()); } } // namespace doris diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index abef8615b1..9f483fdc26 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -42,6 +42,7 @@ #include "runtime/task_execution_context.h" #include "util/debug_util.h" #include "util/runtime_profile.h" +#include "vec/columns/columns_number.h" namespace doris { class IRuntimeFilter; @@ -628,6 +629,10 @@ public: int task_num() const { return _task_num; } + vectorized::ColumnInt64* partial_update_auto_inc_column() { + return _partial_update_auto_inc_column; + }; + private: Status create_error_log_file(); @@ -755,6 +760,8 @@ private: // prohibit copies RuntimeState(const RuntimeState&); + + vectorized::ColumnInt64* _partial_update_auto_inc_column; }; #define RETURN_IF_CANCELLED(state) \ diff --git a/be/src/vec/sink/vtablet_block_convertor.cpp b/be/src/vec/sink/vtablet_block_convertor.cpp index 678c899d98..d93a654728 100644 --- a/be/src/vec/sink/vtablet_block_convertor.cpp +++ b/be/src/vec/sink/vtablet_block_convertor.cpp @@ -19,6 +19,7 @@ #include #include +#include #include #include @@ -46,6 +47,7 @@ #include "vec/common/assert_cast.h" #include "vec/core/block.h" #include "vec/core/types.h" +#include "vec/data_types/data_type.h" #include "vec/data_types/data_type_decimal.h" #include "vec/data_types/data_type_nullable.h" #include "vec/exprs/vexpr.h" @@ -66,8 +68,21 @@ Status OlapTableBlockConvertor::validate_and_convert_block( output_vexpr_ctxs, *input_block, block.get())); } - // fill the valus for auto-increment columns - if (_auto_inc_col_idx.has_value()) { + if (_is_partial_update_and_auto_inc) { + // If this load is partial update and this table has a auto inc column, + // e.g. table schema: k1, v1, v2(auto inc) + // 1. insert columns include auto inc column + // e.g. insert into table (k1, v2) value(a, 1); + // we do nothing. + // 2. insert columns do not include auto inc column + // e.g. insert into table (k1, v1) value(a, a); + // we need to fill auto_inc_cols by creating a new column. + if (!_auto_inc_col_idx.has_value()) { + RETURN_IF_ERROR(_partial_update_fill_auto_inc_cols(block.get(), rows)); + } + } else if (_auto_inc_col_idx.has_value()) { + // fill the valus for auto-increment columns + DCHECK_EQ(_is_partial_update_and_auto_inc, false); RETURN_IF_ERROR(_fill_auto_inc_cols(block.get(), rows)); } @@ -91,8 +106,16 @@ Status OlapTableBlockConvertor::validate_and_convert_block( return Status::OK(); } -void OlapTableBlockConvertor::init_autoinc_info(int64_t db_id, int64_t table_id, int batch_size) { +void OlapTableBlockConvertor::init_autoinc_info(int64_t db_id, int64_t table_id, int batch_size, + bool is_partial_update_and_auto_inc, + int32_t auto_increment_column_unique_id) { _batch_size = batch_size; + if (is_partial_update_and_auto_inc) { + _is_partial_update_and_auto_inc = is_partial_update_and_auto_inc; + _auto_inc_id_buffer = GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer( + db_id, table_id, auto_increment_column_unique_id); + return; + } for (size_t idx = 0; idx < _output_tuple_desc->slots().size(); idx++) { if (_output_tuple_desc->slots()[idx]->is_auto_increment()) { _auto_inc_col_idx = idx; @@ -522,4 +545,24 @@ Status OlapTableBlockConvertor::_fill_auto_inc_cols(vectorized::Block* block, si return Status::OK(); } +Status OlapTableBlockConvertor::_partial_update_fill_auto_inc_cols(vectorized::Block* block, + size_t rows) { + auto dst_column = vectorized::ColumnInt64::create(); + vectorized::ColumnInt64::Container& dst_values = dst_column->get_data(); + size_t null_value_count = rows; + std::vector> res; + RETURN_IF_ERROR(_auto_inc_id_buffer->sync_request_ids(null_value_count, &res)); + for (auto [start, length] : res) { + _auto_inc_id_allocator.insert_ids(start, length); + } + + for (size_t i = 0; i < rows; i++) { + dst_values.emplace_back(_auto_inc_id_allocator.next_id()); + } + block->insert(vectorized::ColumnWithTypeAndName(std::move(dst_column), + std::make_shared>(), + "__PARTIAL_UPDATE_AUTO_INC_COLUMN__")); + return Status::OK(); +} + } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/sink/vtablet_block_convertor.h b/be/src/vec/sink/vtablet_block_convertor.h index 4eaaef3869..0db340ce6c 100644 --- a/be/src/vec/sink/vtablet_block_convertor.h +++ b/be/src/vec/sink/vtablet_block_convertor.h @@ -53,7 +53,9 @@ public: int64_t num_filtered_rows() const { return _num_filtered_rows; } - void init_autoinc_info(int64_t db_id, int64_t table_id, int batch_size); + void init_autoinc_info(int64_t db_id, int64_t table_id, int batch_size, + bool is_partial_update_and_auto_inc = false, + int32_t auto_increment_column_unique_id = -1); AutoIncIDAllocator& auto_inc_id_allocator() { return _auto_inc_id_allocator; } @@ -82,6 +84,8 @@ private: Status _fill_auto_inc_cols(vectorized::Block* block, size_t rows); + Status _partial_update_fill_auto_inc_cols(vectorized::Block* block, size_t rows); + TupleDescriptor* _output_tuple_desc = nullptr; std::map, DecimalV2Value> _max_decimalv2_val; @@ -105,6 +109,7 @@ private: std::optional _auto_inc_col_idx; std::shared_ptr _auto_inc_id_buffer = nullptr; AutoIncIDAllocator _auto_inc_id_allocator; + bool _is_partial_update_and_auto_inc = false; }; } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 3fb17850ef..0a7238e9f5 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -1178,8 +1178,10 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) { } _block_convertor = std::make_unique(_output_tuple_desc); - _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(), - _state->batch_size()); + _block_convertor->init_autoinc_info( + _schema->db_id(), _schema->table_id(), _state->batch_size(), + _schema->is_partial_update() && !_schema->auto_increment_coulumn().empty(), + _schema->auto_increment_column_unique_id()); _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc, false)); // add all counter diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index b883d8e87c..c1b43722c3 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -213,8 +213,10 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) { } _block_convertor = std::make_unique(_output_tuple_desc); - _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(), - _state->batch_size()); + _block_convertor->init_autoinc_info( + _schema->db_id(), _schema->table_id(), _state->batch_size(), + _schema->is_partial_update() && !_schema->auto_increment_coulumn().empty(), + _schema->auto_increment_column_unique_id()); _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc, false)); // add all counter diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index d180c72339..ada7c6b770 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -312,6 +312,7 @@ public class OlapTableSink extends DataSink { for (Column col : table.getFullSchema()) { if (col.isAutoInc()) { schemaParam.setAutoIncrementColumn(col.getName()); + schemaParam.setAutoIncrementColumnUniqueId(col.getUniqueId()); } } } diff --git a/gensrc/proto/descriptors.proto b/gensrc/proto/descriptors.proto index 9d6945becc..13c069f414 100644 --- a/gensrc/proto/descriptors.proto +++ b/gensrc/proto/descriptors.proto @@ -72,5 +72,6 @@ message POlapTableSchemaParam { optional string auto_increment_column = 10; optional int64 timestamp_ms = 11 [default = 0]; optional string timezone = 12; + optional int32 auto_increment_column_unique_id = 13; }; diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index d82f74d771..ef7a845168 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -245,6 +245,7 @@ struct TOlapTableSchemaParam { 9: optional list partial_update_input_columns 10: optional bool is_strict_mode = false 11: optional string auto_increment_column + 12: optional i32 auto_increment_column_unique_id = -1 } struct TTabletLocation { diff --git a/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.out b/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.out new file mode 100644 index 0000000000..79a0829911 --- /dev/null +++ b/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.out @@ -0,0 +1,97 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select1_1 -- +Alice 200 +Beata 700 +Bob 100 +Carter 500 +Doris 800 +Nereids 900 +Smith 600 +Test 400 +Tom 300 + +-- !select1_2 -- + +-- !select1_3 -- +Alice 200 +Beata 723 +Bob 123 +Carter 523 +Doris 800 +Nereids 923 +Smith 600 +Test 400 +Tom 323 + +-- !select1_4 -- + +-- !select2_1 -- +Alice 200 +Beata 700 +Bob 100 +Carter 500 +Doris 800 +Nereids 900 +Smith 600 +Test 400 +Tom 300 + +-- !select2_2 -- + +-- !select2_3 -- +Alice 200 +Beata 700 +Bob 100 +Carter 500 +Doris 800 +Nereids 900 +Smith 600 +Test 400 +Tom 300 + +-- !select2_4 -- + +-- !select3_1 -- +Alice 200 +Beata 700 +Bob 100 +Carter 500 +Doris 800 +Nereids 900 +Smith 600 +Test 400 +Tom 300 + +-- !select3_2 -- + +-- !select3_3 -- +Alice 200 +Beata 9996 +Bob 9990 +Carter 9994 +Doris 800 +Nereids 9998 +Smith 600 +Test 400 +Tom 9992 + +-- !select3_4 -- + +-- !select3_5 -- +Alice 200 +BBBBeata 9996 +BBBBob 9990 +Beata 9996 +Bob 9990 +CCCCarter 9994 +Carter 9994 +Doris 800 +NNNNereids 9998 +Nereids 9998 +Smith 600 +TTTTom 9992 +Test 400 +Tom 9992 + +-- !select3_6 -- + diff --git a/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.out b/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.out new file mode 100644 index 0000000000..a31e438f29 --- /dev/null +++ b/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.out @@ -0,0 +1,95 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select1_1 -- +1 Bob 100 +2 Alice 200 +3 Tom 300 +4 Test 400 +5 Carter 500 +6 Smith 600 +7 Beata 700 +8 Doris 800 +9 Nereids 900 + +-- !select1_1 -- +Alice 200 +Beata 723 +Bob 123 +Carter 523 +Doris 800 +Nereids 923 +Smith 600 +Test 400 +Tom 323 + +-- !select1_2 -- + +-- !select2_1 -- +Alice 200 +Beata 700 +Bob 100 +Carter 500 +Doris 800 +Nereids 900 +Smith 600 +Test 400 +Tom 300 + +-- !select2_2 -- + +-- !select2_3 -- +Alice 200 +Beata 700 +Bob 100 +Carter 500 +Doris 800 +Nereids 900 +Smith 600 +Test 400 +Tom 300 + +-- !select2_4 -- + +-- !select3_1 -- +Alice 200 +Beata 700 +Bob 100 +Carter 500 +Doris 800 +Nereids 900 +Smith 600 +Test 400 +Tom 300 + +-- !select3_2 -- + +-- !select3_3 -- +Alice 200 +Beata 9996 +Bob 9990 +Carter 9994 +Doris 800 +Nereids 9998 +Smith 600 +Test 400 +Tom 9992 + +-- !select3_4 -- + +-- !select3_5 -- +Alice 200 +BBeata 9996 +BBob 9990 +Beata 9996 +Bob 9990 +CCarter 9994 +Carter 9994 +Doris 800 +NNereids 9998 +Nereids 9998 +Smith 600 +TTom 9992 +Test 400 +Tom 9992 + +-- !select3_6 -- + diff --git a/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.groovy b/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.groovy new file mode 100644 index 0000000000..4e2c8a5cbd --- /dev/null +++ b/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.groovy @@ -0,0 +1,221 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_unique_table_auto_inc_partial_update_correct_insert") { + + def backends = sql_return_maparray('show backends') + def replicaNum = 0 + def targetBackend = null + for (def be : backends) { + def alive = be.Alive.toBoolean() + def decommissioned = be.SystemDecommissioned.toBoolean() + if (alive && !decommissioned) { + replicaNum++ + targetBackend = be + } + } + assertTrue(replicaNum > 0) + + def check_data_correct = { def tableName -> + def old_result = sql "select id from ${tableName} order by id;" + logger.info("first result: " + old_result) + for (int i = 1; i<30; ++i){ + def new_result = sql "select id from ${tableName} order by id;" + logger.info("new result: " + new_result) + for (int j = 0; j 1;" + check_data_correct(table1) + + sql "set enable_unique_key_partial_update=true;" + sql "set enable_insert_strict=false;" + + // 1, 123 + // 3, 323 + // 5, 523 + // 7, 723 + // 9, 923 + sql "insert into ${table1} (id, value) values (1,123),(3,323),(5,523),(7,723),(9,923)" + qt_select1_3 "select name, value from ${table1} order by name, value;" + qt_select1_4 "select id, count(*) from ${table1} group by id having count(*) > 1;" + check_data_correct(table1) + sql "drop table if exists ${table1};" + + // test for partial update, auto inc col is value, update auto inc col + def table2 = "unique_auto_inc_col_value_partial_update_insert" + sql "drop table if exists ${table2}" + sql """ + CREATE TABLE IF NOT EXISTS `${table2}` ( + `name` varchar(65533) NOT NULL COMMENT "用户姓名", + `value` int(11) NOT NULL COMMENT "用户得分", + `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID" + ) ENGINE=OLAP + UNIQUE KEY(`name`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`name`) BUCKETS 3 + PROPERTIES ( + "replication_num" = "${replicaNum}", + "in_memory" = "false", + "storage_format" = "V2", + "enable_unique_key_merge_on_write" = "true" + ) + """ + + sql "set enable_unique_key_partial_update=true;" + sql "set enable_insert_strict=false;" + + + // Bob, 100 + // Alice, 200 + // Tom, 300 + // Test, 400 + // Carter, 500 + // Smith, 600 + // Beata, 700 + // Doris, 800 + // Nereids, 900 + sql "insert into ${table2} (name, value) values ('Bob',100)" + sql "insert into ${table2} (name, value) values ('Alice',200)" + sql "insert into ${table2} (name, value) values ('Tom',300)" + sql "insert into ${table2} (name, value) values ('Test',400)" + sql "insert into ${table2} (name, value) values ('Carter',500)" + sql "insert into ${table2} (name, value) values ('Smith',600)" + sql "insert into ${table2} (name, value) values ('Beata',700)" + sql "insert into ${table2} (name, value) values ('Doris',800)" + sql "insert into ${table2} (name, value) values ('Nereids',900)" + qt_select2_1 "select name, value from ${table2} order by name, value;" + qt_select2_2 "select id, count(*) from ${table2} group by id having count(*) > 1;" + check_data_correct(table2) + + sql "set enable_unique_key_partial_update=true;" + sql "set enable_insert_strict=false;" + // Bob, 9990 + // Tom, 9992 + // Carter, 9994 + // Beata, 9996 + // Nereids, 9998 + sql "insert into ${table2} (name, id) values ('Bob',9990),('Tom',9992),('Carter',9994),('Beata',9996),('Nereids',9998)" + qt_select2_3 "select name, value from ${table2} order by name, value;" + qt_select2_4 "select id, count(*) from ${table2} group by id having count(*) > 1;" + check_data_correct(table2) + sql "drop table if exists ${table2};" + + // test for partial update, auto inc col is value, update other col + def table3 = "unique_auto_inc_col_value_partial_update_insert" + sql "drop table if exists ${table3}" + sql """ + CREATE TABLE IF NOT EXISTS `${table3}` ( + `name` varchar(65533) NOT NULL COMMENT "用户姓名", + `value` int(11) NOT NULL COMMENT "用户得分", + `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID" + ) ENGINE=OLAP + UNIQUE KEY(`name`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`name`) BUCKETS 3 + PROPERTIES ( + "replication_num" = "${replicaNum}", + "in_memory" = "false", + "storage_format" = "V2", + "enable_unique_key_merge_on_write" = "true" + ) + """ + sql "set enable_unique_key_partial_update=false;" + sql "set enable_insert_strict=true;" + + // Bob, 100 + // Alice, 200 + // Tom, 300 + // Test, 400 + // Carter, 500 + // Smith, 600 + // Beata, 700 + // Doris, 800 + // Nereids, 900 + sql "insert into ${table2} (name, value) values ('Bob',100),('Alice',200),('Tom',300),('Test',400),('Carter',500),('Smith',600),('Beata',700),('Doris',800),('Nereids',900)" + qt_select3_1 "select name, value from ${table3} order by name, value;" + qt_select3_2 "select id, count(*) from ${table3} group by id having count(*) > 1;" + check_data_correct(table3) + + sql "set enable_unique_key_partial_update=true;" + sql "set enable_insert_strict=false;" + // Bob, 9990 + // Tom, 9992 + // Carter, 9994 + // Beata, 9996 + // Nereids, 9998 + sql "insert into ${table2} (name, value) values ('Bob',9990)" + sql "insert into ${table2} (name, value) values ('Tom',9992)" + sql "insert into ${table2} (name, value) values ('Carter',9994)" + sql "insert into ${table2} (name, value) values ('Beata',9996)" + sql "insert into ${table2} (name, value) values ('Nereids',9998)" + qt_select3_3 "select name, value from ${table3} order by name, value;" + qt_select3_4 "select id, count(*) from ${table3} group by id having count(*) > 1;" + check_data_correct(table3) + + sql "insert into ${table2} (name, value) values ('BBBBob',9990)" + sql "insert into ${table2} (name, value) values ('TTTTom',9992)" + sql "insert into ${table2} (name, value) values ('CCCCarter',9994)" + sql "insert into ${table2} (name, value) values ('BBBBeata',9996)" + sql "insert into ${table2} (name, value) values ('NNNNereids',9998)" + qt_select3_5 "select name, value from ${table3} order by name, value;" + qt_select3_6 "select id, count(*) from ${table3} group by id having count(*) > 1;" + check_data_correct(table3) + sql "drop table if exists ${table3};" +} + diff --git a/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.groovy b/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.groovy new file mode 100644 index 0000000000..474794deb9 --- /dev/null +++ b/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.groovy @@ -0,0 +1,255 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_unique_table_auto_inc_partial_update_correct_stream_load") { + + def backends = sql_return_maparray('show backends') + def replicaNum = 0 + def targetBackend = null + for (def be : backends) { + def alive = be.Alive.toBoolean() + def decommissioned = be.SystemDecommissioned.toBoolean() + if (alive && !decommissioned) { + replicaNum++ + targetBackend = be + } + } + assertTrue(replicaNum > 0) + + def check_data_correct = { def tableName -> + def old_result = sql "select id from ${tableName} order by id;" + logger.info("first result: " + old_result) + for (int i = 1; i<30; ++i){ + def new_result = sql "select id from ${tableName} order by id;" + logger.info("new result: " + new_result) + for (int j = 0; j 1;" + check_data_correct(table1) + sql "drop table if exists ${table1};" + + // test for partial update, auto inc col is value, update auto inc col + def table2 = "unique_auto_inc_col_value_partial_update_stream_load" + sql "drop table if exists ${table2}" + sql """ + CREATE TABLE IF NOT EXISTS `${table2}` ( + `name` varchar(65533) NOT NULL COMMENT "用户姓名", + `value` int(11) NOT NULL COMMENT "用户得分", + `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID" + ) ENGINE=OLAP + UNIQUE KEY(`name`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`name`) BUCKETS 3 + PROPERTIES ( + "replication_num" = "${replicaNum}", + "in_memory" = "false", + "storage_format" = "V2", + "enable_unique_key_merge_on_write" = "true" + ) + """ + + // Bob, 100 + // Alice, 200 + // Tom, 300 + // Test, 400 + // Carter, 500 + // Smith, 600 + // Beata, 700 + // Doris, 800 + // Nereids, 900 + streamLoad { + table "${table2}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'name, value' + + file 'auto_inc_basic.csv' + time 10000 // limit inflight 10s + } + sql "sync" + qt_select2_1 "select name, value from ${table2} order by name, value;" + qt_select2_2 "select id, count(*) from ${table2} group by id having count(*) > 1;" + check_data_correct(table2) + + // Bob, 9990 + // Tom, 9992 + // Carter, 9994 + // Beata, 9996 + // Nereids, 9998 + streamLoad { + table "${table2}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'name, id' + set 'partial_columns', 'true' + + file 'auto_inc_partial_update2.csv' + time 10000 + } + sql "sync" + qt_select2_3 "select name, value from ${table2} order by name, value;" + qt_select2_4 "select id, count(*) from ${table2} group by id having count(*) > 1;" + check_data_correct(table2) + sql "drop table if exists ${table2};" + + // test for partial update, auto inc col is value, update other col + def table3 = "unique_auto_inc_col_value_partial_update_stream_load" + sql "drop table if exists ${table3}" + sql """ + CREATE TABLE IF NOT EXISTS `${table3}` ( + `name` varchar(65533) NOT NULL COMMENT "用户姓名", + `value` int(11) NOT NULL COMMENT "用户得分", + `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID" + ) ENGINE=OLAP + UNIQUE KEY(`name`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`name`) BUCKETS 3 + PROPERTIES ( + "replication_num" = "${replicaNum}", + "in_memory" = "false", + "storage_format" = "V2", + "enable_unique_key_merge_on_write" = "true" + ) + """ + streamLoad { + table "${table3}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'name, value' + + file 'auto_inc_basic.csv' + time 10000 // limit inflight 10s + } + sql "sync" + qt_select3_1 "select name, value from ${table3} order by name, value;" + qt_select3_2 "select id, count(*) from ${table3} group by id having count(*) > 1;" + check_data_correct(table3) + + // Bob, 9990 + // Tom, 9992 + // Carter, 9994 + // Beata, 9996 + // Nereids, 9998 + streamLoad { + table "${table3}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'name, value' + set 'partial_columns', 'true' + + file 'auto_inc_partial_update2.csv' + time 10000 + } + sql "sync" + qt_select3_3 "select name, value from ${table3} order by name, value;" + qt_select3_4 "select id, count(*) from ${table3} group by id having count(*) > 1;" + check_data_correct(table3) + // BBob, 9990 + // TTom, 9992 + // CCarter, 9994 + // BBeata, 9996 + // NNereids, 9998 + streamLoad { + table "${table3}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'name, value' + set 'partial_columns', 'true' + + file 'auto_inc_partial_update3.csv' + time 10000 + } + sql "sync" + qt_select3_5 "select name, value from ${table3} order by name, value;" + qt_select3_6 "select id, count(*) from ${table3} group by id having count(*) > 1;" + check_data_correct(table3) + sql "drop table if exists ${table3};" +} +