[improve](dynamic table) refine SegmentWriter columns writer generate (#16816)
* [improve](dynamic table) refine SegmentWriter columns writer generate ``` Dynamic Block consists of two parts, dynamic part of columns and static part of columns static dynamic | ----- | ------- | the static ones are original _tablet_schema columns the dynamic ones are auto generated and extended from file scan. ``` **We should only consider using Block info to generate columns when it is a dynamic table load procedure.** And separate the static ones from the dynamic ones * test
This commit is contained in:
@ -101,6 +101,15 @@ Status SegmentWriter::init(const vectorized::Block* block) {
|
||||
return init(column_ids, true, block);
|
||||
}
|
||||
|
||||
// Dynamic table with extended columns and directly write from delta writer
|
||||
// Compaction/SchemaChange path will use the latest schema version of rowset
|
||||
// as it's shcema, so it's block is not from dynamic table load procedure.
|
||||
// If it is a dynamic table load procedure we should handle auto generated columns.
|
||||
bool SegmentWriter::_should_create_writers_with_dynamic_block(size_t num_columns_in_block) {
|
||||
return _tablet_schema->is_dynamic_schema() && _opts.is_direct_write &&
|
||||
num_columns_in_block > _tablet_schema->columns().size();
|
||||
}
|
||||
|
||||
Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key,
|
||||
const vectorized::Block* block) {
|
||||
DCHECK(_column_writers.empty());
|
||||
@ -193,8 +202,8 @@ Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key,
|
||||
return Status::OK();
|
||||
};
|
||||
|
||||
if (block) {
|
||||
RETURN_IF_ERROR(_create_writers_with_block(block, create_column_writer));
|
||||
if (block && _should_create_writers_with_dynamic_block(block->columns())) {
|
||||
RETURN_IF_ERROR(_create_writers_with_dynamic_block(block, create_column_writer));
|
||||
} else {
|
||||
RETURN_IF_ERROR(_create_writers(create_column_writer));
|
||||
}
|
||||
@ -227,38 +236,33 @@ Status SegmentWriter::_create_writers(
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Dynamic Block consists of two parts, dynamic part of columns and static part of columns
//     static    |   dynamic
//   | -----     |  ------- |
// the static ones are original _tablet_schema columns
// the dynamic ones are auto generated and extended from file scan.
//
// Creates one column writer per block column: static columns come from the
// tablet schema, extended ones from the full base schema fetched from Master.
// @param block  the load block; must contain more columns than the static schema
// @param create_column_writer  callback creating the writer for (ordinal, column)
// @return Status::OK() on success, or the first error from the callback/RPC.
Status SegmentWriter::_create_writers_with_dynamic_block(
        const vectorized::Block* block,
        std::function<Status(uint32_t, const TabletColumn&)> create_column_writer) {
    // generate writers from schema and extended schema info
    _olap_data_convertor->reserve(block->columns());
    // new columns added, query column info from Master
    vectorized::schema_util::FullBaseSchemaView schema_view;
    // Caller guarantees this via _should_create_writers_with_dynamic_block.
    CHECK(block->columns() > _tablet_schema->num_columns());
    schema_view.table_id = _tablet_schema->table_id();
    RETURN_IF_ERROR(vectorized::schema_util::send_fetch_full_base_schema_view_rpc(&schema_view));
    // create writers with static columns
    for (size_t i = 0; i < _tablet_schema->columns().size(); ++i) {
        // Propagate writer-creation failures instead of discarding the Status,
        // consistent with every other create_column_writer call site.
        RETURN_IF_ERROR(create_column_writer(i, _tablet_schema->column(i)));
    }
    // create writers with auto generated columns
    for (size_t i = _tablet_schema->columns().size(); i < block->columns(); ++i) {
        const auto& column_type_name = block->get_by_position(i);
        if (schema_view.column_name_to_column.count(column_type_name.name) == 0) {
            // expr columns, maybe happened in query like
            //   `insert into table1 select function(column1), column2 from table2`
            // the first column name may become `function(column1)`, so we use
            // column offset to get columns info
            // TODO here we could optimize to col_unique_id in the future
            RETURN_IF_ERROR(create_column_writer(i, _tablet_schema->column(i)));
            continue;
        }
        // extended columns: build the column from the Master-provided schema
        // and record it so the schema change can be replayed later.
        const auto& tcolumn = schema_view.column_name_to_column[column_type_name.name];
        TabletColumn new_column(tcolumn);
        RETURN_IF_ERROR(create_column_writer(i, new_column));
        _opts.rowset_ctx->schema_change_recorder->add_extended_columns(new_column,
                                                                       schema_view.schema_version);
    }
    return Status::OK();
}
|
||||
|
||||
@ -109,7 +109,7 @@ public:
|
||||
bool is_unique_key() { return _tablet_schema->keys_type() == UNIQUE_KEYS; }
|
||||
|
||||
private:
|
||||
Status _create_writers_with_block(
|
||||
Status _create_writers_with_dynamic_block(
|
||||
const vectorized::Block* block,
|
||||
std::function<Status(uint32_t, const TabletColumn&)> writer_creator);
|
||||
Status _create_writers(std::function<Status(uint32_t, const TabletColumn&)> writer_creator);
|
||||
@ -134,6 +134,7 @@ private:
|
||||
void set_min_max_key(const Slice& key);
|
||||
void set_min_key(const Slice& key);
|
||||
void set_max_key(const Slice& key);
|
||||
bool _should_create_writers_with_dynamic_block(size_t num_columns_in_block);
|
||||
|
||||
void clear();
|
||||
|
||||
|
||||
13
regression-test/data/load_p0/insert/test_insert.out
Normal file
13
regression-test/data/load_p0/insert/test_insert.out
Normal file
@ -0,0 +1,13 @@
|
||||
-- This file is automatically generated. You should know what you did if you want to edit this
|
||||
-- !sql1 --
|
||||
false -2147483647 7 7
|
||||
false 103 9 16
|
||||
false 1002 2 18
|
||||
false 5014 5 23
|
||||
false 2147483647 8 31
|
||||
true -2147483647 4 4
|
||||
true 3021 1 15
|
||||
true 3021 10 15
|
||||
true 25699 6 21
|
||||
true 2147483647 3 24
|
||||
|
||||
@ -18,4 +18,34 @@
|
||||
suite("test_insert") {
    // todo: test insert, such as insert values, insert select, insert txn
    sql "show load"

    // Source tables come from the pre-loaded shared query database.
    def test_baseall = "test_query_db.baseall";
    def test_bigtable = "test_query_db.bigtable";
    def insert_tbl = "test_insert_tbl";

    // Recreate the target table from scratch so the run is repeatable.
    sql """ DROP TABLE IF EXISTS ${insert_tbl}"""
    sql """
     CREATE TABLE ${insert_tbl} (
       `k1` char(5) NULL,
       `k2` int(11) NULL,
       `k3` tinyint(4) NULL,
       `k4` int(11) NULL
     ) ENGINE=OLAP
     DUPLICATE KEY(`k1`, `k2`, `k3`, `k4`)
     COMMENT 'OLAP'
     DISTRIBUTED BY HASH(`k1`) BUCKETS 5
     PROPERTIES (
       "replication_num"="1"
     );
    """

    // INSERT ... SELECT with a window function (running sum partitioned by k6),
    // joining the two source tables.
    sql """
    INSERT INTO ${insert_tbl}
    SELECT a.k6, a.k3, b.k1
        , sum(b.k1) OVER (PARTITION BY a.k6 ORDER BY a.k3) AS w_sum
    FROM ${test_baseall} a
    JOIN ${test_bigtable} b ON a.k1 = b.k1 + 5
    ORDER BY a.k6, a.k3, a.k1, w_sum
    """

    // Verify the inserted rows against the recorded .out baseline.
    qt_sql1 "select * from ${insert_tbl} order by 1, 2, 3, 4"
}
|
||||
|
||||
Reference in New Issue
Block a user