From 70c7e3d7aa30cfdbddbc6ba1725c22f270343c9b Mon Sep 17 00:00:00 2001 From: zhannngchen <48427519+zhannngchen@users.noreply.github.com> Date: Thu, 28 Jul 2022 17:03:05 +0800 Subject: [PATCH] [feature-wip](unique-key-merge-on-write) remove AggType on unique table with MoW, enable preAggreation, DSIP-018[5/2] (#11205) remove AggType on unique table with MoW, enable preAggreation --- be/src/olap/delta_writer.cpp | 5 +- be/src/olap/memtable.cpp | 87 +++++++++++++------ be/src/olap/memtable.h | 14 +-- .../doris/analysis/CreateTableStmt.java | 19 +++- .../doris/datasource/InternalDataSource.java | 5 ++ .../planner/MaterializedViewSelector.java | 22 +++-- .../apache/doris/planner/OlapScanNode.java | 8 +- .../doris/analysis/CreateTableStmtTest.java | 46 ++++++++++ .../planner/MaterializedViewSelectorTest.java | 8 +- 9 files changed, 163 insertions(+), 51 deletions(-) diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 04b7327f25..e50a8567b4 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -278,9 +278,8 @@ Status DeltaWriter::wait_flush() { } void DeltaWriter::_reset_mem_table() { - _mem_table.reset(new MemTable(_tablet->tablet_id(), _schema.get(), _tablet_schema.get(), - _req.slots, _req.tuple_desc, _tablet->keys_type(), - _rowset_writer.get(), _is_vec)); + _mem_table.reset(new MemTable(_tablet, _schema.get(), _tablet_schema.get(), _req.slots, + _req.tuple_desc, _rowset_writer.get(), _is_vec)); } Status DeltaWriter::close() { diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 46607f6a6c..a1ade071d0 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -29,16 +29,15 @@ namespace doris { -MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema, +MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema* tablet_schema, const std::vector* slot_descs, TupleDescriptor* tuple_desc, - KeysType keys_type, RowsetWriter* rowset_writer, bool support_vec) - : _tablet_id(tablet_id), + RowsetWriter* rowset_writer, bool support_vec) + : _tablet(std::move(tablet)), _schema(schema), _tablet_schema(tablet_schema), _slot_descs(slot_descs), - _keys_type(keys_type), _mem_tracker(std::make_unique( - fmt::format("MemTable:tabletId={}", std::to_string(tablet_id)))), + fmt::format("MemTable:tabletId={}", std::to_string(tablet_id())))), _buffer_mem_pool(new MemPool(_mem_tracker.get())), _table_mem_pool(new MemPool(_mem_tracker.get())), _schema_size(_schema->schema_size()), @@ -52,18 +51,19 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet _skip_list = nullptr; _vec_row_comparator = std::make_shared(_schema); // TODO: Support ZOrderComparator in the future - _vec_skip_list = std::make_unique( - _vec_row_comparator.get(), _table_mem_pool.get(), _keys_type == KeysType::DUP_KEYS); + _vec_skip_list = + std::make_unique(_vec_row_comparator.get(), _table_mem_pool.get(), + keys_type() == KeysType::DUP_KEYS); _init_columns_offset_by_slot_descs(slot_descs, tuple_desc); } else { _vec_skip_list = nullptr; - if (_keys_type == KeysType::DUP_KEYS) { + if (keys_type() == KeysType::DUP_KEYS) { _insert_fn = &MemTable::_insert_dup; } else { _insert_fn = &MemTable::_insert_agg; } - if (_tablet_schema->has_sequence_col()) { - _aggregate_two_row_fn = &MemTable::_aggregate_two_row_with_sequence; + if (keys_type() == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) { + _aggregate_two_row_fn = &MemTable::_replace_row; } else { _aggregate_two_row_fn = &MemTable::_aggregate_two_row; } @@ -74,7 +74,7 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet _row_comparator = std::make_shared(_schema); } _skip_list = std::make_unique(_row_comparator.get(), _table_mem_pool.get(), - _keys_type == KeysType::DUP_KEYS); + keys_type() == KeysType::DUP_KEYS); } } void MemTable::_init_columns_offset_by_slot_descs(const std::vector* slot_descs, @@ -92,9 +92,18 @@ void MemTable::_init_columns_offset_by_slot_descs(const std::vectornum_key_columns(); cid < _schema->num_columns(); ++cid) { - vectorized::AggregateFunctionPtr function = - _tablet_schema->column(cid).get_aggregate_function({block->get_data_type(cid)}, - vectorized::AGG_LOAD_SUFFIX); + vectorized::AggregateFunctionPtr function; + if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && + _tablet->enable_unique_key_merge_on_write()) { + // In such table, non-key column's aggregation type is NONE, so we need to construct + // the aggregate function manually. + function = vectorized::AggregateFunctionSimpleFactory::instance().get( + "replace_load", {block->get_data_type(cid)}, {}, + block->get_data_type(cid)->is_nullable()); + } else { + function = _tablet_schema->column(cid).get_aggregate_function( + {block->get_data_type(cid)}, vectorized::AGG_LOAD_SUFFIX); + } DCHECK(function != nullptr); _agg_functions[cid] = function; @@ -118,7 +127,7 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) { } MemTable::~MemTable() { - if (_vec_skip_list != nullptr && _keys_type != KeysType::DUP_KEYS) { + if (_vec_skip_list != nullptr && keys_type() != KeysType::DUP_KEYS) { VecTable::Iterator it(_vec_skip_list.get()); for (it.SeekToFirst(); it.Valid(); it.Next()) { // We should release agg_places here, because they are not relesed when a @@ -160,7 +169,7 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vectorset_block(&_input_mutable_block); _output_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock); - if (_keys_type != KeysType::DUP_KEYS) { + if (keys_type() != KeysType::DUP_KEYS) { _init_agg_functions(&target_block); } } @@ -180,7 +189,7 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vectorInsert(row_in_block, &overwritten); DCHECK(!overwritten) << "Duplicate key model meet overwrite in SkipList"; @@ -255,14 +264,38 @@ void MemTable::_tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* me void MemTable::_aggregate_two_row(const ContiguousRow& src_row, TableKey row_in_skiplist) { ContiguousRow dst_row(_schema, row_in_skiplist); + if (_tablet_schema->has_sequence_col()) { + return agg_update_row_with_sequence(&dst_row, src_row, _tablet_schema->sequence_col_idx(), + _table_mem_pool.get()); + } agg_update_row(&dst_row, src_row, _table_mem_pool.get()); } -void MemTable::_aggregate_two_row_with_sequence(const ContiguousRow& src_row, - TableKey row_in_skiplist) { +// In the Unique Key table with primary key index, the non-key column's aggregation +// type is NONE, to replace the data in duplicate row, we should copy the data manually. +void MemTable::_replace_row(const ContiguousRow& src_row, TableKey row_in_skiplist) { ContiguousRow dst_row(_schema, row_in_skiplist); - agg_update_row_with_sequence(&dst_row, src_row, _tablet_schema->sequence_col_idx(), - _table_mem_pool.get()); + if (_tablet_schema->has_sequence_col()) { + const int32_t sequence_idx = _tablet_schema->sequence_col_idx(); + auto seq_dst_cell = dst_row.cell(sequence_idx); + auto seq_src_cell = src_row.cell(sequence_idx); + auto res = _schema->column(sequence_idx)->compare_cell(seq_dst_cell, seq_src_cell); + // dst sequence column larger than src, don't need to replace + if (res > 0) { + return; + } + } + // do replace + for (uint32_t cid = dst_row.schema()->num_key_columns(); cid < dst_row.schema()->num_columns(); + ++cid) { + auto dst_cell = dst_row.cell(cid); + auto src_cell = src_row.cell(cid); + auto column = _schema->column(cid); + // Dest cell already allocated memory, use dirct_copy rather than deep_copy(which will + // allocate memory for dst_cell). If dst_cell's size is smaller than src_cell, direct_copy + // will reallocate the memory to fit the src_cell's data. + column->direct_copy(&dst_cell, src_cell); + } } void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist) { @@ -287,7 +320,7 @@ template void MemTable::_collect_vskiplist_results() { VecTable::Iterator it(_vec_skip_list.get()); vectorized::Block in_block = _input_mutable_block.to_block(); - if (_keys_type == KeysType::DUP_KEYS) { + if (keys_type() == KeysType::DUP_KEYS) { std::vector row_pos_vec; DCHECK(in_block.rows() <= std::numeric_limits::max()); row_pos_vec.reserve(in_block.rows()); @@ -347,7 +380,7 @@ void MemTable::_collect_vskiplist_results() { } void MemTable::shrink_memtable_by_agg() { - if (_keys_type == KeysType::DUP_KEYS) { + if (keys_type() == KeysType::DUP_KEYS) { return; } _collect_vskiplist_results(); @@ -358,18 +391,18 @@ bool MemTable::is_flush() const { } bool MemTable::need_to_agg() { - return _keys_type == KeysType::DUP_KEYS ? is_flush() - : memory_usage() >= config::memtable_max_buffer_size; + return keys_type() == KeysType::DUP_KEYS ? is_flush() + : memory_usage() >= config::memtable_max_buffer_size; } Status MemTable::flush() { - VLOG_CRITICAL << "begin to flush memtable for tablet: " << _tablet_id + VLOG_CRITICAL << "begin to flush memtable for tablet: " << tablet_id() << ", memsize: " << memory_usage() << ", rows: " << _rows; int64_t duration_ns = 0; RETURN_NOT_OK(_do_flush(duration_ns)); DorisMetrics::instance()->memtable_flush_total->increment(1); DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000); - VLOG_CRITICAL << "after flush memtable for tablet: " << _tablet_id + VLOG_CRITICAL << "after flush memtable for tablet: " << tablet_id() << ", flushsize: " << _flush_size; return Status::OK(); } diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index 9be1f7d33d..0a18f51753 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -22,6 +22,7 @@ #include "common/object_pool.h" #include "olap/olap_define.h" #include "olap/skiplist.h" +#include "olap/tablet.h" #include "runtime/memory/mem_tracker.h" #include "util/tuple_row_zorder_compare.h" #include "vec/aggregate_functions/aggregate_function.h" @@ -40,12 +41,13 @@ class TupleDescriptor; class MemTable { public: - MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema, + MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema* tablet_schema, const std::vector* slot_descs, TupleDescriptor* tuple_desc, - KeysType keys_type, RowsetWriter* rowset_writer, bool support_vec = false); + RowsetWriter* rowset_writer, bool support_vec = false); ~MemTable(); - int64_t tablet_id() const { return _tablet_id; } + int64_t tablet_id() const { return _tablet->tablet_id(); } + KeysType keys_type() const { return _tablet->keys_type(); } size_t memory_usage() const { return _mem_tracker->consumption(); } inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); } @@ -132,19 +134,19 @@ public: private: void _tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool); void _aggregate_two_row(const ContiguousRow& new_row, TableKey row_in_skiplist); - void _aggregate_two_row_with_sequence(const ContiguousRow& new_row, TableKey row_in_skiplist); + void _replace_row(const ContiguousRow& src_row, TableKey row_in_skiplist); void _insert_dup(const Tuple* tuple); void _insert_agg(const Tuple* tuple); // for vectorized void _insert_one_row_from_block(RowInBlock* row_in_block); void _aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist); - int64_t _tablet_id; +private: + TabletSharedPtr _tablet; Schema* _schema; const TabletSchema* _tablet_schema; // the slot in _slot_descs are in order of tablet's schema const std::vector* _slot_descs; - KeysType _keys_type; // TODO: change to unique_ptr of comparator std::shared_ptr _row_comparator; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java index 27546c6ec9..de10c63b47 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java @@ -32,6 +32,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.UserException; import org.apache.doris.common.util.PrintableMap; +import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; import org.apache.doris.external.elasticsearch.EsUtil; import org.apache.doris.mysql.privilege.PrivPredicate; @@ -47,6 +48,7 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -276,6 +278,14 @@ public class CreateTableStmt extends DdlStmt { if (engineName.equals("hive") && !Config.enable_spark_load) { throw new AnalysisException("Spark Load from hive table is coming soon"); } + + // `analyzeUniqueKeyMergeOnWrite` would modify `properties`, which will be used later, + // so we just clone a properties map here. + boolean enableUniqueKeyMergeOnWrite = false; + if (properties != null) { + enableUniqueKeyMergeOnWrite = PropertyAnalyzer.analyzeUniqueKeyMergeOnWrite(new HashMap<>(properties)); + } + // analyze key desc if (engineName.equalsIgnoreCase("olap")) { // olap table @@ -339,6 +349,9 @@ public class CreateTableStmt extends DdlStmt { if (keysDesc.getKeysType() == KeysType.DUP_KEYS) { type = AggregateType.NONE; } + if (keysDesc.getKeysType() == KeysType.UNIQUE_KEYS && enableUniqueKeyMergeOnWrite) { + type = AggregateType.NONE; + } for (int i = keysDesc.keysColumnSize(); i < columnDefs.size(); ++i) { columnDefs.get(i).setAggregateType(type); } @@ -363,7 +376,11 @@ public class CreateTableStmt extends DdlStmt { if (Config.enable_batch_delete_by_default && keysDesc != null && keysDesc.getKeysType() == KeysType.UNIQUE_KEYS) { - columnDefs.add(ColumnDef.newDeleteSignColumnDef(AggregateType.REPLACE)); + // TODO(zhangchen): Disable the delete sign column for primary key temporary, will replace + // with a better solution later. + if (!enableUniqueKeyMergeOnWrite) { + columnDefs.add(ColumnDef.newDeleteSignColumnDef(AggregateType.REPLACE)); + } } boolean hasObjectStored = false; String objectStoredColumn = ""; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java index 6796ba922b..cb484d2820 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java @@ -1890,6 +1890,11 @@ public class InternalDataSource implements DataSourceIf { try { sequenceColType = PropertyAnalyzer.analyzeSequenceType(properties, olapTable.getKeysType()); if (sequenceColType != null) { + // TODO(zhannngchen) will support sequence column later. + if (olapTable.getEnableUniqueKeyMergeOnWrite()) { + throw new AnalysisException("Unique key table with MoW(merge on write) not support " + + "sequence column for now"); + } olapTable.setSequenceInfo(sequenceColType); } } catch (Exception e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java b/fe/fe-core/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java index aa613540e3..dbdcbfb765 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java @@ -140,13 +140,14 @@ public class MaterializedViewSelector { // Step2: check all columns in compensating predicates are available in the view output checkCompensatingPredicates(columnNamesInPredicates.get(tableId), candidateIndexIdToMeta); // Step3: group by list in query is the subset of group by list in view or view contains no aggregation - checkGrouping(columnNamesInGrouping.get(tableId), candidateIndexIdToMeta); + checkGrouping(table, columnNamesInGrouping.get(tableId), candidateIndexIdToMeta); // Step4: aggregation functions are available in the view output - checkAggregationFunction(aggColumnsInQuery.get(tableId), candidateIndexIdToMeta); + checkAggregationFunction(table, aggColumnsInQuery.get(tableId), candidateIndexIdToMeta); // Step5: columns required to compute output expr are available in the view output checkOutputColumns(columnNamesInQueryOutput.get(tableId), candidateIndexIdToMeta); // Step6: if table type is aggregate and the candidateIndexIdToSchema is empty, - if ((table.getKeysType() == KeysType.AGG_KEYS || table.getKeysType() == KeysType.UNIQUE_KEYS) + if ((table.getKeysType() == KeysType.AGG_KEYS || (table.getKeysType() == KeysType.UNIQUE_KEYS + && !table.getTableProperty().getEnableUniqueKeyMergeOnWrite())) && candidateIndexIdToMeta.size() == 0) { // the base index will be added in the candidateIndexIdToSchema. /** @@ -299,7 +300,7 @@ public class MaterializedViewSelector { * @param candidateIndexIdToMeta */ - private void checkGrouping(Set columnsInGrouping, Map + private void checkGrouping(OlapTable table, Set columnsInGrouping, Map candidateIndexIdToMeta) { Iterator> iterator = candidateIndexIdToMeta.entrySet().iterator(); while (iterator.hasNext()) { @@ -325,8 +326,10 @@ public class MaterializedViewSelector { ISSUE-3016, MaterializedViewFunctionTest: testDeduplicateQueryInAgg */ - if (indexNonAggregatedColumnNames.size() == candidateIndexSchema.size() - && candidateIndexMeta.getKeysType() == KeysType.DUP_KEYS) { + boolean noNeedAggregation = candidateIndexMeta.getKeysType() == KeysType.DUP_KEYS + || (candidateIndexMeta.getKeysType() == KeysType.UNIQUE_KEYS + && table.getTableProperty().getEnableUniqueKeyMergeOnWrite()); + if (indexNonAggregatedColumnNames.size() == candidateIndexSchema.size() && noNeedAggregation) { continue; } // When the query is SPJ type but the candidate index is SPJG type, it will not pass directly. @@ -348,7 +351,7 @@ public class MaterializedViewSelector { + Joiner.on(",").join(candidateIndexIdToMeta.keySet())); } - private void checkAggregationFunction(Set aggregatedColumnsInQueryOutput, + private void checkAggregationFunction(OlapTable table, Set aggregatedColumnsInQueryOutput, Map candidateIndexIdToMeta) throws AnalysisException { Iterator> iterator = candidateIndexIdToMeta.entrySet().iterator(); while (iterator.hasNext()) { @@ -356,7 +359,10 @@ public class MaterializedViewSelector { MaterializedIndexMeta candidateIndexMeta = entry.getValue(); List indexAggColumnExpsList = mvAggColumnsToExprList(candidateIndexMeta); // When the candidate index is SPJ type, it passes the verification directly - if (indexAggColumnExpsList.size() == 0 && candidateIndexMeta.getKeysType() == KeysType.DUP_KEYS) { + boolean noNeedAggregation = candidateIndexMeta.getKeysType() == KeysType.DUP_KEYS + || (candidateIndexMeta.getKeysType() == KeysType.UNIQUE_KEYS + && table.getTableProperty().getEnableUniqueKeyMergeOnWrite()); + if (indexAggColumnExpsList.size() == 0 && noNeedAggregation) { continue; } // When the query is SPJ type but the candidate index is SPJG type, it will not pass directly. diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 2d4e9399e3..787d8a771f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -275,8 +275,9 @@ public class OlapScanNode extends ScanNode { String situation; boolean update; CHECK: { // CHECKSTYLE IGNORE THIS LINE - if (olapTable.getKeysType() == KeysType.DUP_KEYS) { - situation = "The key type of table is duplicate."; + if (olapTable.getKeysType() == KeysType.DUP_KEYS || (olapTable.getKeysType() == KeysType.UNIQUE_KEYS + && olapTable.getEnableUniqueKeyMergeOnWrite())) { + situation = "The key type of table is duplicate, or unique key with merge-on-write."; update = true; break CHECK; } @@ -659,7 +660,8 @@ public class OlapScanNode extends ScanNode { public void selectBestRollupByRollupSelector(Analyzer analyzer) throws UserException { // Step2: select best rollup long start = System.currentTimeMillis(); - if (olapTable.getKeysType() == KeysType.DUP_KEYS) { + if (olapTable.getKeysType() == KeysType.DUP_KEYS || (olapTable.getKeysType() == KeysType.UNIQUE_KEYS + && olapTable.getEnableUniqueKeyMergeOnWrite())) { // This function is compatible with the INDEX selection logic of ROLLUP, // so the Duplicate table here returns base index directly // and the selection logic of materialized view is selected in diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java index 5371482345..44e62d2dc5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java @@ -25,6 +25,7 @@ import org.apache.doris.catalog.ScalarType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ExceptionChecker; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.datasource.InternalDataSource; import org.apache.doris.mysql.privilege.MockedAuth; import org.apache.doris.mysql.privilege.PaloAuth; @@ -122,6 +123,51 @@ public class CreateTableStmtTest { Assert.assertTrue(stmt.toSql().contains("DISTRIBUTED BY RANDOM\nBUCKETS 6")); } + @Test + public void testCreateTableUniqueKeyNormal() throws UserException { + // setup + Map properties = new HashMap<>(); + ColumnDef col3 = new ColumnDef("col3", new TypeDef(ScalarType.createType(PrimitiveType.BIGINT))); + col3.setIsKey(false); + cols.add(col3); + ColumnDef col4 = new ColumnDef("col4", new TypeDef(ScalarType.createType(PrimitiveType.STRING))); + col4.setIsKey(false); + cols.add(col4); + // test normal case + CreateTableStmt stmt = new CreateTableStmt(false, false, tblName, cols, "olap", + new KeysDesc(KeysType.UNIQUE_KEYS, colsName), null, + new HashDistributionDesc(10, Lists.newArrayList("col1")), properties, null, ""); + stmt.analyze(analyzer); + Assert.assertEquals(col3.getAggregateType(), AggregateType.REPLACE); + Assert.assertEquals(col4.getAggregateType(), AggregateType.REPLACE); + // clear + cols.remove(col3); + cols.remove(col4); + } + + @Test + public void testCreateTableUniqueKeyMoW() throws UserException { + // setup + Map properties = new HashMap<>(); + properties.put(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE, "true"); + ColumnDef col3 = new ColumnDef("col3", new TypeDef(ScalarType.createType(PrimitiveType.BIGINT))); + col3.setIsKey(false); + cols.add(col3); + ColumnDef col4 = new ColumnDef("col4", new TypeDef(ScalarType.createType(PrimitiveType.STRING))); + col4.setIsKey(false); + cols.add(col4); + // test merge-on-write + CreateTableStmt stmt = new CreateTableStmt(false, false, tblName, cols, "olap", + new KeysDesc(KeysType.UNIQUE_KEYS, colsName), null, + new HashDistributionDesc(10, Lists.newArrayList("col1")), properties, null, ""); + stmt.analyze(analyzer); + Assert.assertEquals(col3.getAggregateType(), AggregateType.NONE); + Assert.assertEquals(col4.getAggregateType(), AggregateType.NONE); + // clear + cols.remove(col3); + cols.remove(col4); + } + @Test public void testCreateTableWithRollup() throws UserException { List ops = Lists.newArrayList(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/MaterializedViewSelectorTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/MaterializedViewSelectorTest.java index 6f8d711c0e..820c90d4c9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/MaterializedViewSelectorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/MaterializedViewSelectorTest.java @@ -210,6 +210,7 @@ public class MaterializedViewSelectorTest { @Test public void testCheckGrouping(@Injectable SelectStmt selectStmt, @Injectable Analyzer analyzer, + @Injectable OlapTable table, @Injectable MaterializedIndexMeta indexMeta1, @Injectable MaterializedIndexMeta indexMeta2, @Injectable MaterializedIndexMeta indexMeta3) { @@ -249,7 +250,7 @@ public class MaterializedViewSelectorTest { MaterializedViewSelector selector = new MaterializedViewSelector(selectStmt, analyzer); Deencapsulation.setField(selector, "isSPJQuery", false); - Deencapsulation.invoke(selector, "checkGrouping", tableAColumnNames, candidateIndexIdToSchema); + Deencapsulation.invoke(selector, "checkGrouping", table, tableAColumnNames, candidateIndexIdToSchema); Assert.assertEquals(2, candidateIndexIdToSchema.size()); Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new Long(1))); Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new Long(2))); @@ -257,6 +258,7 @@ public class MaterializedViewSelectorTest { @Test public void testCheckAggregationFunction(@Injectable SelectStmt selectStmt, @Injectable Analyzer analyzer, + @Injectable OlapTable table, @Injectable MaterializedIndexMeta indexMeta1, @Injectable MaterializedIndexMeta indexMeta2, @Injectable MaterializedIndexMeta indexMeta3) { @@ -299,8 +301,8 @@ public class MaterializedViewSelectorTest { Set aggregatedColumnsInQueryOutput = Sets.newHashSet(); aggregatedColumnsInQueryOutput.add(functionCallExpr); Deencapsulation.setField(selector, "isSPJQuery", false); - Deencapsulation.invoke(selector, "checkAggregationFunction", aggregatedColumnsInQueryOutput, - candidateIndexIdToSchema); + Deencapsulation.invoke(selector, "checkAggregationFunction", table, aggregatedColumnsInQueryOutput, + candidateIndexIdToSchema); Assert.assertEquals(2, candidateIndexIdToSchema.size()); Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new Long(1))); Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new Long(3)));