diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 0b8d2d6477..d6cc4930f0 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -101,6 +101,15 @@ Status SegmentWriter::init(const vectorized::Block* block) { return init(column_ids, true, block); } +// Dynamic table with extended columns and directly write from delta writer +// Compaction/SchemaChange path will use the latest schema version of rowset +// as its schema, so its block is not from dynamic table load procedure. +// If it is a dynamic table load procedure we should handle auto generated columns. +bool SegmentWriter::_should_create_writers_with_dynamic_block(size_t num_columns_in_block) { + return _tablet_schema->is_dynamic_schema() && _opts.is_direct_write && + num_columns_in_block > _tablet_schema->columns().size(); +} + Status SegmentWriter::init(const std::vector& col_ids, bool has_key, const vectorized::Block* block) { DCHECK(_column_writers.empty()); @@ -193,8 +202,8 @@ Status SegmentWriter::init(const std::vector& col_ids, bool has_key, return Status::OK(); }; - if (block) { - RETURN_IF_ERROR(_create_writers_with_block(block, create_column_writer)); + if (block && _should_create_writers_with_dynamic_block(block->columns())) { + RETURN_IF_ERROR(_create_writers_with_dynamic_block(block, create_column_writer)); } else { RETURN_IF_ERROR(_create_writers(create_column_writer)); } @@ -227,38 +236,33 @@ Status SegmentWriter::_create_writers( return Status::OK(); } -Status SegmentWriter::_create_writers_with_block( +// Dynamic Block consists of two parts, dynamic part of columns and static part of columns +// static dynamic +// | ----- | ------- | +// the static ones are original _tablet_schema columns +// the dynamic ones are auto generated and extended from file scan +Status SegmentWriter::_create_writers_with_dynamic_block( const vectorized::Block* block, std::function
create_column_writer) { // generate writers from schema and extended schema info _olap_data_convertor->reserve(block->columns()); // new columns added, query column info from Master vectorized::schema_util::FullBaseSchemaView schema_view; - if (block->columns() > _tablet_schema->num_columns()) { - schema_view.table_id = _tablet_schema->table_id(); - RETURN_IF_ERROR( - vectorized::schema_util::send_fetch_full_base_schema_view_rpc(&schema_view)); + CHECK(block->columns() > _tablet_schema->num_columns()); + schema_view.table_id = _tablet_schema->table_id(); + RETURN_IF_ERROR(vectorized::schema_util::send_fetch_full_base_schema_view_rpc(&schema_view)); + // create writers with static columns + for (size_t i = 0; i < _tablet_schema->columns().size(); ++i) { + create_column_writer(i, _tablet_schema->column(i)); } - for (size_t i = 0; i < block->columns(); ++i) { + // create writers with auto generated columns + for (size_t i = _tablet_schema->columns().size(); i < block->columns(); ++i) { const auto& column_type_name = block->get_by_position(i); - auto idx = _tablet_schema->field_index(column_type_name.name); - if (idx >= 0) { - RETURN_IF_ERROR(create_column_writer(i, _tablet_schema->column(idx))); - } else { - if (schema_view.column_name_to_column.count(column_type_name.name) == 0) { - // expr columns, maybe happend in query like `insert into table1 select function(column1), column2 from table2` - // the first column name may become `function(column1)`, so we use column offset to get columns info - // TODO here we could optimize to col_unique_id in the future - RETURN_IF_ERROR(create_column_writer(i, _tablet_schema->column(i))); - continue; - } - // extended columns - const auto& tcolumn = schema_view.column_name_to_column[column_type_name.name]; - TabletColumn new_column(tcolumn); - RETURN_IF_ERROR(create_column_writer(i, new_column)); - _opts.rowset_ctx->schema_change_recorder->add_extended_columns( - new_column, schema_view.schema_version); - } + const auto& tcolumn 
= schema_view.column_name_to_column[column_type_name.name]; + TabletColumn new_column(tcolumn); + RETURN_IF_ERROR(create_column_writer(i, new_column)); + _opts.rowset_ctx->schema_change_recorder->add_extended_columns(new_column, + schema_view.schema_version); } return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index 1bba784f58..00f93ce0d1 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -109,7 +109,7 @@ public: bool is_unique_key() { return _tablet_schema->keys_type() == UNIQUE_KEYS; } private: - Status _create_writers_with_block( + Status _create_writers_with_dynamic_block( const vectorized::Block* block, std::function writer_creator); Status _create_writers(std::function writer_creator); @@ -134,6 +134,7 @@ private: void set_min_max_key(const Slice& key); void set_min_key(const Slice& key); void set_max_key(const Slice& key); + bool _should_create_writers_with_dynamic_block(size_t num_columns_in_block); void clear(); diff --git a/regression-test/data/load_p0/insert/test_insert.out b/regression-test/data/load_p0/insert/test_insert.out new file mode 100644 index 0000000000..68d12c992f --- /dev/null +++ b/regression-test/data/load_p0/insert/test_insert.out @@ -0,0 +1,13 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql1 -- +false -2147483647 7 7 +false 103 9 16 +false 1002 2 18 +false 5014 5 23 +false 2147483647 8 31 +true -2147483647 4 4 +true 3021 1 15 +true 3021 10 15 +true 25699 6 21 +true 2147483647 3 24 + diff --git a/regression-test/suites/load_p0/insert/test_insert.groovy b/regression-test/suites/load_p0/insert/test_insert.groovy index c1ec66f9b5..27a9f66992 100644 --- a/regression-test/suites/load_p0/insert/test_insert.groovy +++ b/regression-test/suites/load_p0/insert/test_insert.groovy @@ -18,4 +18,34 @@ suite("test_insert") { // todo: test insert, such as insert values, insert select, insert txn sql "show load" + def test_baseall = "test_query_db.baseall"; + def test_bigtable = "test_query_db.bigtable"; + def insert_tbl = "test_insert_tbl"; + + sql """ DROP TABLE IF EXISTS ${insert_tbl}""" + sql """ + CREATE TABLE ${insert_tbl} ( + `k1` char(5) NULL, + `k2` int(11) NULL, + `k3` tinyint(4) NULL, + `k4` int(11) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`, `k2`, `k3`, `k4`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 5 + PROPERTIES ( + "replication_num"="1" + ); + """ + + sql """ + INSERT INTO ${insert_tbl} + SELECT a.k6, a.k3, b.k1 + , sum(b.k1) OVER (PARTITION BY a.k6 ORDER BY a.k3) AS w_sum + FROM ${test_baseall} a + JOIN ${test_bigtable} b ON a.k1 = b.k1 + 5 + ORDER BY a.k6, a.k3, a.k1, w_sum + """ + + qt_sql1 "select * from ${insert_tbl} order by 1, 2, 3, 4" }