[Feature][Materialized-View] support materialized view on vectorized engine (#10792)
.gitattributes (vendored)
@@ -14,3 +14,4 @@
*.thrift text eol=lf
*.proto text eol=lf
*.conf text eol=lf
+*.out text eol=lf -diff
@@ -407,44 +407,4 @@ void HllSetResolver::parse() {
    }
}

-void HllSetHelper::set_sparse(char* result, const std::map<int, uint8_t>& index_to_value,
-                              int& len) {
-    result[0] = HLL_DATA_SPARSE;
-    len = sizeof(HllSetResolver::SetTypeValueType) + sizeof(HllSetResolver::SparseLengthValueType);
-    char* write_value_pos = result + len;
-    for (auto iter = index_to_value.begin(); iter != index_to_value.end(); ++iter) {
-        write_value_pos[0] = (char)(iter->first & 0xff);
-        write_value_pos[1] = (char)(iter->first >> 8 & 0xff);
-        write_value_pos[2] = iter->second;
-        write_value_pos += 3;
-    }
-    int registers_count = index_to_value.size();
-    len += registers_count *
-           (sizeof(HllSetResolver::SparseIndexType) + sizeof(HllSetResolver::SparseValueType));
-    *(int*)(result + 1) = registers_count;
-}
-
-void HllSetHelper::set_explicit(char* result, const std::set<uint64_t>& hash_value_set, int& len) {
-    result[0] = HLL_DATA_EXPLICIT;
-    result[1] = (HllSetResolver::ExplicitLengthValueType)(hash_value_set.size());
-    len = sizeof(HllSetResolver::SetTypeValueType) +
-          sizeof(HllSetResolver::ExplicitLengthValueType);
-    char* write_pos = result + len;
-    for (auto iter = hash_value_set.begin(); iter != hash_value_set.end(); ++iter) {
-        uint64_t hash_value = *iter;
-        *(uint64_t*)write_pos = hash_value;
-        write_pos += 8;
-    }
-    len += sizeof(uint64_t) * hash_value_set.size();
-}
-
-void HllSetHelper::set_full(char* result, const std::map<int, uint8_t>& index_to_value,
-                            const int registers_len, int& len) {
-    result[0] = HLL_DATA_FULL;
-    for (auto iter = index_to_value.begin(); iter != index_to_value.end(); ++iter) {
-        result[1 + iter->first] = iter->second;
-    }
-    len = registers_len + sizeof(HllSetResolver::SetTypeValueType);
-}
-
} // namespace doris

@@ -370,13 +370,4 @@ private:
    SparseLengthValueType* _sparse_count;
};

-// todo(kks): remove this when dpp_sink class was removed
-class HllSetHelper {
-public:
-    static void set_sparse(char* result, const std::map<int, uint8_t>& index_to_value, int& len);
-    static void set_explicit(char* result, const std::set<uint64_t>& hash_value_set, int& len);
-    static void set_full(char* result, const std::map<int, uint8_t>& index_to_value,
-                         const int set_len, int& len);
-};
-
} // namespace doris
@@ -1001,16 +1001,15 @@ uint16_t SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* vec_sel_ro

Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_column_ids,
                                                std::vector<rowid_t>& rowid_vector,
-                                               uint16_t* sel_rowid_idx, size_t select_size,
-                                               vectorized::MutableColumns* mutable_columns) {
+                                               uint16_t* sel_rowid_idx, size_t select_size) {
    SCOPED_RAW_TIMER(&_opts.stats->lazy_read_ns);
    std::vector<rowid_t> rowids(select_size);
    for (size_t i = 0; i < select_size; ++i) {
        rowids[i] = rowid_vector[sel_rowid_idx[i]];
    }
    for (auto cid : read_column_ids) {
-        auto& column = (*mutable_columns)[cid];
-        RETURN_IF_ERROR(_column_iterators[cid]->read_by_rowids(rowids.data(), select_size, column));
+        RETURN_IF_ERROR(_column_iterators[cid]->read_by_rowids(rowids.data(), select_size,
+                                                               _current_return_columns[cid]));
    }

    return Status::OK();

@@ -1117,8 +1116,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {

    // step3: read non_predicate column
    RETURN_IF_ERROR(_read_columns_by_rowids(_non_predicate_columns, _block_rowids,
-                                           sel_rowid_idx, selected_size,
-                                           &_current_return_columns));
+                                           sel_rowid_idx, selected_size));

    // step4: output columns
    // 4.1 output non-predicate column

@@ -116,7 +116,7 @@ private:
    void _output_non_pred_columns(vectorized::Block* block);
    Status _read_columns_by_rowids(std::vector<ColumnId>& read_column_ids,
                                   std::vector<rowid_t>& rowid_vector, uint16_t* sel_rowid_idx,
-                                  size_t select_size, vectorized::MutableColumns* mutable_columns);
+                                  size_t select_size);

    template <class Container>
    Status _output_column_by_sel_idx(vectorized::Block* block, const Container& column_ids,
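
The three hunks above simplify lazy materialization: once the short-circuit predicates have picked the surviving rows, the remaining columns are read by row id straight into the iterator's own _current_return_columns, so the extra mutable_columns out-parameter disappears. A minimal self-contained sketch of that gather-then-read idea (all names here are illustrative, not the Doris API):

// --- illustrative sketch, not part of the commit ---
#include <cstdint>
#include <vector>

// Stand-in for a column iterator that can fetch specific rows by row id.
struct FakeColumnReader {
    std::vector<int64_t> full_data; // pretend on-disk column
    void read_by_rowids(const uint32_t* rowids, size_t n, std::vector<int64_t>* out) const {
        for (size_t i = 0; i < n; ++i) out->push_back(full_data[rowids[i]]);
    }
};

// Gather the surviving global row ids through the selection index, then read
// only those rows for every late-materialized column.
void lazy_read(const std::vector<FakeColumnReader>& readers,
               const std::vector<uint32_t>& block_rowids, // row id of each row in the batch
               const uint16_t* sel_rowid_idx, size_t select_size,
               std::vector<std::vector<int64_t>>* out_columns) {
    std::vector<uint32_t> rowids(select_size);
    for (size_t i = 0; i < select_size; ++i) {
        rowids[i] = block_rowids[sel_rowid_idx[i]];
    }
    for (size_t cid = 0; cid < readers.size(); ++cid) {
        readers[cid].read_by_rowids(rowids.data(), select_size, &(*out_columns)[cid]);
    }
}
// --- end sketch ---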
@@ -785,22 +785,11 @@ Status RowBlockChanger::change_block(vectorized::Block* ref_block,
        return Status::OLAPInternalError(OLAP_ERR_NOT_INITED);
    }

-    // material-view or rollup task will fail now
-    if (_desc_tbl.get_row_tuples().size() != ref_block->columns()) {
-        return Status::NotSupported(
-                "_desc_tbl.get_row_tuples().size() != ref_block->columns(), maybe because rollup "
-                "not supported now. ");
-    }
-
-    std::vector<bool> nullable_tuples;
-    for (int i = 0; i < ref_block->columns(); i++) {
-        nullable_tuples.emplace_back(ref_block->get_by_position(i).column->is_nullable());
-    }
-
    ObjectPool pool;
    RuntimeState* state = pool.add(new RuntimeState());
    state->set_desc_tbl(&_desc_tbl);
-    RowDescriptor row_desc = RowDescriptor::create_default(_desc_tbl, nullable_tuples);
+    RowDescriptor row_desc =
+            RowDescriptor(_desc_tbl.get_tuple_descriptor(_desc_tbl.get_row_tuples()[0]), false);

    const int row_size = ref_block->rows();
    const int column_size = new_block->columns();
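
For context, RowBlockChanger::change_block converts each column of the source block into the corresponding column of the new block according to a per-column schema mapping: a reference column, an optional materialized-view expression, or a default value for brand-new columns. A rough sketch of that mapping loop, with placeholder types rather than the real Block/ColumnMapping classes:

// --- illustrative sketch, not part of the commit ---
#include <functional>
#include <string>
#include <vector>

using Column = std::vector<std::string>; // placeholder for a vectorized column

struct ColumnMappingSketch {
    int ref_column;                            // index in the source block, -1 = new column
    std::function<Column(const Column&)> expr; // optional transform (e.g. to_bitmap, hll_hash)
    std::string default_value;                 // used when ref_column < 0
};

// For each output column either evaluate the mapped expression on the source
// column, copy it as-is, or fill the default value.
std::vector<Column> change_block_sketch(const std::vector<Column>& ref_block,
                                        const std::vector<ColumnMappingSketch>& mapping,
                                        size_t row_count) {
    std::vector<Column> new_block;
    for (const auto& m : mapping) {
        if (m.ref_column < 0) {
            new_block.emplace_back(row_count, m.default_value);   // new column, default value
        } else if (m.expr) {
            new_block.push_back(m.expr(ref_block[m.ref_column])); // materialized-view expression
        } else {
            new_block.push_back(ref_block[m.ref_column]);         // plain copy
        }
    }
    return new_block;
}
// --- end sketch ---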
@@ -811,10 +800,6 @@ Status RowBlockChanger::change_block(vectorized::Block* ref_block,
    for (int idx = 0; idx < column_size; idx++) {
        int ref_idx = _schema_mapping[idx].ref_column;

-        if (!_schema_mapping[idx].materialized_function.empty()) {
-            return Status::NotSupported("Materialized function not supported now. ");
-        }
-
        if (ref_idx < 0) {
            // new column, write default value
            auto value = _schema_mapping[idx].default_value;

@@ -1547,15 +1532,14 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
    rowset_reader->next_block(ref_block.get());
    while (ref_block->rows()) {
        RETURN_IF_ERROR(_changer.change_block(ref_block.get(), new_block.get()));
-        if (!_mem_tracker->check_limit(config::memory_limitation_per_thread_for_schema_change_bytes,
-                                       new_block->allocated_bytes())) {
+        if (!_mem_tracker->check_limit(_memory_limitation, new_block->allocated_bytes())) {
            RETURN_IF_ERROR(create_rowset());

-            if (!_mem_tracker->check_limit(
-                        config::memory_limitation_per_thread_for_schema_change_bytes,
-                        new_block->allocated_bytes())) {
+            if (!_mem_tracker->check_limit(_memory_limitation, new_block->allocated_bytes())) {
                LOG(WARNING) << "Memory limitation is too small for Schema Change."
-                             << "memory_limitation=" << _memory_limitation;
+                             << " _memory_limitation=" << _memory_limitation
+                             << ", new_block->allocated_bytes()=" << new_block->allocated_bytes()
+                             << ", consumption=" << _mem_tracker->consumption();
                return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
            }
        }
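
The memory handling above follows a check, flush, re-check pattern: if the converted block would push the tracker past _memory_limitation, the rows buffered so far are flushed into a rowset and the check is repeated; only when a single block still does not fit is the schema change failed. A simplified sketch of that pattern (placeholder types, not the real MemTracker/rowset API):

// --- illustrative sketch, not part of the commit ---
#include <cstdint>
#include <cstdio>

struct MemBudget {
    int64_t limit;
    int64_t used = 0;
    bool fits(int64_t extra) const { return used + extra <= limit; }
};

bool add_block_or_fail(MemBudget& budget, int64_t block_bytes,
                       int64_t& buffered_bytes /* flushed to a rowset on demand */) {
    if (!budget.fits(block_bytes)) {
        // Flush whatever is buffered so far, freeing its memory.
        budget.used -= buffered_bytes;
        buffered_bytes = 0;
        if (!budget.fits(block_bytes)) {
            // Even an empty buffer cannot absorb this block: the limit is too small.
            std::fprintf(stderr, "memory limitation too small: limit=%lld block=%lld\n",
                         static_cast<long long>(budget.limit),
                         static_cast<long long>(block_bytes));
            return false;
        }
    }
    budget.used += block_bytes;
    buffered_bytes += block_bytes;
    return true;
}
// --- end sketch ---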
@@ -1649,9 +1633,8 @@ bool SchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_row
        rs_readers.push_back(rs_reader);
    }
    // get cur schema if rowset schema exist, rowset schema must be newer than tablet schema
-    auto max_version_rowset = src_rowsets.back();
    const TabletSchema* cur_tablet_schema =
-            max_version_rowset->rowset_meta()->tablet_schema().get();
+            src_rowsets.back()->rowset_meta()->tablet_schema().get();
    if (cur_tablet_schema == nullptr) {
        cur_tablet_schema = new_tablet->tablet_schema().get();
    }

@@ -1680,6 +1663,12 @@ Status VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_
        rs_readers.push_back(rs_reader);
    }

+    // get cur schema if rowset schema exist, rowset schema must be newer than tablet schema
+    auto cur_tablet_schema = src_rowsets.back()->rowset_meta()->tablet_schema();
+    if (cur_tablet_schema == nullptr) {
+        cur_tablet_schema = new_tablet->tablet_schema();
+    }
+
    Merger::Statistics stats;
    RETURN_IF_ERROR(Merger::vmerge_rowsets(new_tablet, READER_ALTER_TABLE,
                                           new_tablet->tablet_schema().get(), rs_readers,

@@ -1717,6 +1706,7 @@ Status SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2& req

std::shared_mutex SchemaChangeHandler::_mutex;
std::unordered_set<int64_t> SchemaChangeHandler::_tablet_ids_in_converting;
+std::set<std::string> SchemaChangeHandler::_supported_functions = {"hll_hash", "to_bitmap"};

// In the past schema change and rollup will create new tablet and will wait for txns starting before the task to finished
// It will cost a lot of time to wait and the task is very difficult to understand.

@@ -1848,7 +1838,7 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
                LOG(WARNING) << "New tablet has a version " << pair.first
                             << " crossing base tablet's max_version="
                             << max_rowset->end_version();
-                Status::OLAPInternalError(OLAP_ERR_VERSION_ALREADY_MERGED);
+                return Status::OLAPInternalError(OLAP_ERR_VERSION_ALREADY_MERGED);
            }
        }
        std::vector<RowsetSharedPtr> empty_vec;
@@ -1949,9 +1939,14 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
            if (item.__isset.mv_expr) {
                if (item.mv_expr.nodes[0].node_type == TExprNodeType::FUNCTION_CALL) {
                    mv_param.mv_expr = item.mv_expr.nodes[0].fn.name.function_name;
+                    if (!_supported_functions.count(mv_param.mv_expr)) {
+                        return Status::NotSupported("Unknow materialized view expr " +
+                                                    mv_param.mv_expr);
+                    }
                } else if (item.mv_expr.nodes[0].node_type == TExprNodeType::CASE_EXPR) {
                    mv_param.mv_expr = "count_field";
                }
+
                mv_param.expr = std::make_shared<TExpr>(item.mv_expr);
            }
            sc_params.materialized_params_map.insert(
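
The handler now whitelists the materialized-view expressions it can evaluate during a vectorized schema change: the hll_hash and to_bitmap function calls, plus the synthetic count_field used for count(column), which the FE rewrites into a CASE expression. A hedged sketch of that validation step, with placeholder inputs instead of the Thrift TExpr:

// --- illustrative sketch, not part of the commit ---
#include <set>
#include <stdexcept>
#include <string>

// Placeholder validation; the real code returns Status::NotSupported instead
// of throwing and reads the function name out of a TExpr node.
static const std::set<std::string> kSupportedMvFunctions = {"hll_hash", "to_bitmap"};

std::string resolve_mv_expr(bool is_function_call, bool is_case_expr,
                            const std::string& function_name) {
    if (is_function_call) {
        if (kSupportedMvFunctions.count(function_name) == 0) {
            throw std::runtime_error("unsupported materialized view expr: " + function_name);
        }
        return function_name; // e.g. "to_bitmap" feeding a bitmap_union rollup column
    }
    if (is_case_expr) {
        return "count_field"; // count(col) arrives as a CASE expression
    }
    return ""; // plain column reference, nothing to evaluate
}
// --- end sketch ---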
@@ -2152,6 +2147,7 @@ Status SchemaChangeHandler::_parse_request(
        const TabletColumn& new_column = new_tablet->tablet_schema()->column(i);
        const string& column_name = new_column.name();
        ColumnMapping* column_mapping = rb_changer->get_mutable_column_mapping(i);
+        column_mapping->new_column = &new_column;

        if (new_column.has_reference_column()) {
            int32_t column_index = base_tablet_schema->field_index(new_column.referenced_column());

@@ -328,6 +328,7 @@ private:

    static std::shared_mutex _mutex;
    static std::unordered_set<int64_t> _tablet_ids_in_converting;
+    static std::set<std::string> _supported_functions;
};

using RowBlockDeleter = std::function<void(RowBlock*)>;

@@ -392,11 +392,6 @@ public:
    RowDescriptor(const DescriptorTbl& desc_tbl, const std::vector<TTupleId>& row_tuples,
                  const std::vector<bool>& nullable_tuples);

-    static RowDescriptor create_default(const DescriptorTbl& desc_tbl,
-                                        const std::vector<bool>& nullable_tuples) {
-        return RowDescriptor(desc_tbl, desc_tbl.get_row_tuples(), nullable_tuples);
-    }
-
    // standard copy c'tor, made explicit here
    RowDescriptor(const RowDescriptor& desc)
            : _tuple_desc_map(desc._tuple_desc_map),

@@ -209,7 +209,6 @@ struct PrimitiveTypeTraits<TYPE_HLL> {
    using ColumnType = vectorized::ColumnString;
};

// only for adapt get_predicate_column_ptr
template <PrimitiveType type>
struct PredicatePrimitiveTypeTraits {
    using PredicateFieldType = typename PrimitiveTypeTraits<type>::CppType;
@@ -62,41 +62,30 @@ public:
        data.push_back(*reinterpret_cast<const T*>(pos));
    }

-    void insert_many_binary_data(char* data_array, uint32_t* len_array,
-                                 uint32_t* start_offset_array, size_t num) override {
+    void insert_binary_data(const char* pos, size_t length) {
+        insert_default();
+        T* pvalue = &get_element(size() - 1);
+        if (!length) {
+            *pvalue = *reinterpret_cast<const T*>(pos);
+            return;
+        }
+
        if constexpr (std::is_same_v<T, BitmapValue>) {
-            for (size_t i = 0; i < num; i++) {
-                uint32_t len = len_array[i];
-                uint32_t start_offset = start_offset_array[i];
-                insert_default();
-                BitmapValue* pvalue = &get_element(size() - 1);
-                if (len != 0) {
-                    BitmapValue value;
-                    value.deserialize(data_array + start_offset);
-                    *pvalue = std::move(value);
-                } else {
-                    *pvalue = std::move(*reinterpret_cast<BitmapValue*>(data_array + start_offset));
-                }
-            }
+            pvalue->deserialize(pos);
        } else if constexpr (std::is_same_v<T, HyperLogLog>) {
-            for (size_t i = 0; i < num; i++) {
-                uint32_t len = len_array[i];
-                uint32_t start_offset = start_offset_array[i];
-                insert_default();
-                HyperLogLog* pvalue = &get_element(size() - 1);
-                if (len != 0) {
-                    HyperLogLog value;
-                    value.deserialize(Slice(data_array + start_offset, len));
-                    *pvalue = std::move(value);
-                } else {
-                    *pvalue = std::move(*reinterpret_cast<HyperLogLog*>(data_array + start_offset));
-                }
-            }
+            pvalue->deserialize(Slice(pos, length));
        } else {
            LOG(FATAL) << "Unexpected type in column complex";
        }
    }

+    void insert_many_binary_data(char* data_array, uint32_t* len_array,
+                                 uint32_t* start_offset_array, size_t num) override {
+        for (size_t i = 0; i < num; i++) {
+            insert_binary_data(data_array + start_offset_array[i], len_array[i]);
+        }
+    }
+
    void insert_default() override { data.push_back(T()); }

    void insert_many_defaults(size_t length) override {
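
The refactor above splits the old bulk loop into a per-element insert_binary_data: a non-zero length means the payload is serialized bytes to deserialize, while length == 0 means the pointer already addresses an in-memory BitmapValue/HyperLogLog; insert_many_binary_data now just walks the offset/length arrays and delegates. A toy column mirroring that call shape (illustrative only, not the real ColumnComplexType):

// --- illustrative sketch, not part of the commit ---
#include <cstdint>
#include <string>
#include <vector>

struct ToyComplexColumn {
    std::vector<std::string> data;

    void insert_binary_data(const char* pos, size_t length) {
        // length == 0 would mean "pos already points at an in-memory object"
        // in the real column; the toy simply stores the serialized bytes.
        data.emplace_back(pos, length);
    }

    void insert_many_binary_data(const char* data_array, const uint32_t* len_array,
                                 const uint32_t* start_offset_array, size_t num) {
        for (size_t i = 0; i < num; ++i) {
            insert_binary_data(data_array + start_offset_array[i], len_array[i]);
        }
    }
};

int main() {
    const char packed[] = "abcXYZ"; // two payloads packed back to back
    uint32_t offsets[] = {0, 3};
    uint32_t lens[] = {3, 3};
    ToyComplexColumn col;
    col.insert_many_binary_data(packed, lens, offsets, 2);
    return col.data.size() == 2 ? 0 : 1;
}
// --- end sketch ---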
@@ -299,10 +288,7 @@ template <typename T>
ColumnPtr ColumnComplexType<T>::permute(const IColumn::Permutation& perm, size_t limit) const {
    size_t size = data.size();

-    if (limit == 0)
-        limit = size;
-    else
-        limit = std::min(size, limit);
+    limit = limit ? std::min(size, limit) : size;

    if (perm.size() < limit) {
        LOG(FATAL) << "Size of permutation is less than required.";

@@ -252,9 +252,13 @@ Status VOlapScanner::_init_return_columns(bool need_seq_col) {
        if (!slot->is_materialized()) {
            continue;
        }
-        int32_t index = slot->col_unique_id() >= 0
-                                ? _tablet_schema.field_index(slot->col_unique_id())
-                                : _tablet_schema.field_index(slot->col_name());
+
+        int32_t index = _tablet_schema.field_index(slot->col_unique_id());
+        if (index < 0) {
+            // rollup/materialized view should use col_name to find index
+            index = _tablet_schema.field_index(slot->col_name());
+        }
+
        if (index < 0) {
            std::stringstream ss;
            ss << "field name is invalid. field=" << slot->col_name();

@@ -262,8 +266,9 @@ Status VOlapScanner::_init_return_columns(bool need_seq_col) {
            return Status::InternalError(ss.str());
        }
        _return_columns.push_back(index);
-        if (slot->is_nullable() && !_tablet_schema.column(index).is_nullable())
+        if (slot->is_nullable() && !_tablet_schema.column(index).is_nullable()) {
            _tablet_columns_convert_to_null_set.emplace(index);
+        }
    }

    // expand the sequence column
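
Rollup and materialized-view indexes may not carry the column unique id of the base slot, so the scanner now resolves the return column by unique id first and falls back to the column name. The lookup order in isolation, with a placeholder schema type:

// --- illustrative sketch, not part of the commit ---
#include <string>
#include <unordered_map>

// Placeholder schema: maps both unique ids and names to column positions.
struct ToySchema {
    std::unordered_map<int32_t, int32_t> index_by_unique_id;
    std::unordered_map<std::string, int32_t> index_by_name;

    int32_t field_index(int32_t unique_id) const {
        auto it = index_by_unique_id.find(unique_id);
        return it == index_by_unique_id.end() ? -1 : it->second;
    }
    int32_t field_index(const std::string& name) const {
        auto it = index_by_name.find(name);
        return it == index_by_name.end() ? -1 : it->second;
    }
};

// Unique id first; fall back to the column name, which is how rollup and
// materialized-view columns are located when no unique id matches.
int32_t resolve_return_column(const ToySchema& schema, int32_t col_unique_id,
                              const std::string& col_name) {
    int32_t index = schema.field_index(col_unique_id);
    if (index < 0) {
        index = schema.field_index(col_name);
    }
    return index;
}
// --- end sketch ---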
@@ -138,10 +138,10 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
        super(JobType.ROLLUP);
    }

-    public RollupJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs,
-            long baseIndexId, long rollupIndexId, String baseIndexName, String rollupIndexName,
-            List<Column> rollupSchema, int baseSchemaHash, int rollupSchemaHash, KeysType rollupKeysType,
-            short rollupShortKeyColumnCount, OriginStatement origStmt) {
+    public RollupJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs, long baseIndexId,
+            long rollupIndexId, String baseIndexName, String rollupIndexName, List<Column> rollupSchema,
+            int baseSchemaHash, int rollupSchemaHash, KeysType rollupKeysType, short rollupShortKeyColumnCount,
+            OriginStatement origStmt) {
        super(jobId, JobType.ROLLUP, dbId, tableId, tableName, timeoutMs);

        this.baseIndexId = baseIndexId;

@@ -150,6 +150,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
        this.rollupIndexName = rollupIndexName;

        this.rollupSchema = rollupSchema;

        this.baseSchemaHash = baseSchemaHash;
        this.rollupSchemaHash = rollupSchemaHash;
        this.rollupKeysType = rollupKeysType;

@@ -376,8 +377,8 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {

        List<Column> fullSchema = tbl.getBaseSchema(true);
        DescriptorTable descTable = new DescriptorTable();
+        TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();
        for (Column column : fullSchema) {
-            TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();
            SlotDescriptor destSlotDesc = descTable.addSlotDescriptor(destTupleDesc);
            destSlotDesc.setIsMaterialized(true);
            destSlotDesc.setColumn(column);

@@ -416,8 +416,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {

        List<Column> fullSchema = tbl.getBaseSchema(true);
        DescriptorTable descTable = new DescriptorTable();
+        TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();
        for (Column column : fullSchema) {
-            TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();
            SlotDescriptor destSlotDesc = descTable.addSlotDescriptor(destTupleDesc);
            destSlotDesc.setIsMaterialized(true);
            destSlotDesc.setColumn(column);

@@ -648,7 +648,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
            tbl.setStorageFormat(storageFormat);
        }

-        //update max column unique id
+        // update max column unique id
        int maxColUniqueId = tbl.getMaxColUniqueId();
        for (Column column : tbl.getFullSchema()) {
            if (column.getUniqueId() > maxColUniqueId) {
@@ -139,7 +139,7 @@ public class CreateMaterializedViewStmt extends DdlStmt {
        if (selectStmt.getAggInfo() != null) {
            mvKeysType = KeysType.AGG_KEYS;
        }
-        analyzeSelectClause();
+        analyzeSelectClause(analyzer);
        analyzeFromClause();
        if (selectStmt.getWhereClause() != null) {
            throw new AnalysisException("The where clause is not supported in add materialized view clause, expr:"

@@ -156,7 +156,7 @@ public class CreateMaterializedViewStmt extends DdlStmt {
        }
    }

-    public void analyzeSelectClause() throws AnalysisException {
+    public void analyzeSelectClause(Analyzer analyzer) throws AnalysisException {
        SelectList selectList = selectStmt.getSelectList();
        if (selectList.getItems().isEmpty()) {
            throw new AnalysisException("The materialized view must contain at least one column");

@@ -222,7 +222,7 @@ public class CreateMaterializedViewStmt extends DdlStmt {
                }
                meetAggregate = true;
                // build mv column item
-                mvColumnItemList.add(buildMVColumnItem(functionCallExpr));
+                mvColumnItemList.add(buildMVColumnItem(analyzer, functionCallExpr));
                // TODO(ml): support REPLACE, REPLACE_IF_NOT_NULL, bitmap_union, hll_union only for aggregate table
                // TODO(ml): support different type of column, int -> bigint(sum)
            }

@@ -347,7 +347,8 @@ public class CreateMaterializedViewStmt extends DdlStmt {
        }
    }

-    private MVColumnItem buildMVColumnItem(FunctionCallExpr functionCallExpr) throws AnalysisException {
+    private MVColumnItem buildMVColumnItem(Analyzer analyzer, FunctionCallExpr functionCallExpr)
+            throws AnalysisException {
        String functionName = functionCallExpr.getFnName().getFunction();
        List<SlotRef> slots = new ArrayList<>();
        functionCallExpr.collect(SlotRef.class, slots);

@@ -399,6 +400,9 @@ public class CreateMaterializedViewStmt extends DdlStmt {
                    mvColumnName = baseColumnName;
                } else {
                    mvColumnName = mvColumnBuilder(functionName, baseColumnName);
+                    if (!functionChild0.getType().isStringType()) {
+                        functionChild0.uncheckedCastChild(Type.VARCHAR, 0);
+                    }
                    defineExpr = functionChild0;
                }
                mvAggregateType = AggregateType.valueOf(functionName.toUpperCase());

@@ -410,6 +414,7 @@ public class CreateMaterializedViewStmt extends DdlStmt {
                defineExpr = new CaseExpr(null, Lists.newArrayList(new CaseWhenClause(
                        new IsNullPredicate(baseColumnRef, false),
                        new IntLiteral(0, Type.BIGINT))), new IntLiteral(1, Type.BIGINT));
+                defineExpr.analyze(analyzer);
                type = Type.BIGINT;
                break;
            default:
@@ -33,8 +33,6 @@ import org.apache.doris.rewrite.ExprRewriter;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

-import java.util.List;
-
/**
 * Rewrite count(k1) to sum(mv_count_k1) when MV Column exists.
 * For example:

@@ -92,9 +90,7 @@ public class CountFieldToSum implements ExprRewriteRule {
        // exception to Unknown column, because we can't find an alias which named as origin table name that has
        // required column.
        SlotRef mvSlotRef = new SlotRef(null, mvColumn.getName());
-        List<Expr> newFnParams = Lists.newArrayList();
-        newFnParams.add(mvSlotRef);
-        FunctionCallExpr result = new FunctionCallExpr("sum", newFnParams);
+        FunctionCallExpr result = new FunctionCallExpr("sum", Lists.newArrayList(mvSlotRef));
        result.analyzeNoThrow(analyzer);
        return result;
    }

@@ -106,8 +106,7 @@ public class HLLHashToSlotRefRule implements ExprRewriteRule {
        TableName tableName = queryColumnSlotRef.getTableName();
        Preconditions.checkNotNull(tableName);
        SlotRef mvSlotRef = new SlotRef(tableName, mvColumn.getName());
-        List<Expr> newFnParams = Lists.newArrayList();
-        newFnParams.add(mvSlotRef);
+        List<Expr> newFnParams = Lists.newArrayList(mvSlotRef);
        FunctionCallExpr result = new FunctionCallExpr(fnName, newFnParams);
        result.analyzeNoThrow(analyzer);
        return result;
@@ -314,8 +314,8 @@ public class RollupJobV2Test {


    @Test
-    public void testSerializeOfRollupJob(@Mocked CreateMaterializedViewStmt stmt) throws IOException,
-            AnalysisException {
+    public void testSerializeOfRollupJob(@Mocked CreateMaterializedViewStmt stmt)
+            throws IOException {
        // prepare file
        File file = new File(fileName);
        file.createNewFile();

@@ -326,6 +326,7 @@ public class RollupJobV2Test {
        String mvColumnName = CreateMaterializedViewStmt.MATERIALIZED_VIEW_NAME_PREFIX + "to_bitmap_" + "c1";
        Column column = new Column(mvColumnName, Type.BITMAP, false, AggregateType.BITMAP_UNION, false, "1", "");
        columns.add(column);

        RollupJobV2 rollupJobV2 = new RollupJobV2(1, 1, 1, "test", 1, 1, 1, "test", "rollup", columns, 1, 1,
                KeysType.AGG_KEYS, keysCount,
                new OriginStatement("create materialized view rollup as select bitmap_union(to_bitmap(c1)) from test",
@@ -1167,7 +1167,8 @@ public class CreateMaterializedViewStmtTest {
                result = Type.LARGEINT;
            }
        };
-        MVColumnItem mvColumnItem = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", functionCallExpr);
+        MVColumnItem mvColumnItem = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", analyzer,
+                functionCallExpr);
        Assert.assertEquals(Type.LARGEINT, mvColumnItem.getType());

        SlotRef slotRef2 = new SlotRef(new TableName(internalCtl, "db", "table"), "a");

@@ -1183,7 +1184,8 @@ public class CreateMaterializedViewStmtTest {
                result = Type.BIGINT;
            }
        };
-        MVColumnItem mvColumnItem2 = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", functionCallExpr2);
+        MVColumnItem mvColumnItem2 = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", analyzer,
+                functionCallExpr2);
        Assert.assertEquals(Type.BIGINT, mvColumnItem2.getType());

        SlotRef slotRef3 = new SlotRef(new TableName(internalCtl, "db", "table"), "a");

@@ -1199,7 +1201,8 @@ public class CreateMaterializedViewStmtTest {
                result = Type.VARCHAR;
            }
        };
-        MVColumnItem mvColumnItem3 = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", functionCallExpr3);
+        MVColumnItem mvColumnItem3 = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", analyzer,
+                functionCallExpr3);
        Assert.assertEquals(Type.VARCHAR, mvColumnItem3.getType());

        SlotRef slotRef4 = new SlotRef(new TableName(internalCtl, "db", "table"), "a");

@@ -1215,7 +1218,8 @@ public class CreateMaterializedViewStmtTest {
                result = Type.DOUBLE;
            }
        };
-        MVColumnItem mvColumnItem4 = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", functionCallExpr4);
+        MVColumnItem mvColumnItem4 = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", analyzer,
+                functionCallExpr4);
        Assert.assertEquals(Type.DOUBLE, mvColumnItem4.getType());

    }

@@ -1242,7 +1246,8 @@ public class CreateMaterializedViewStmtTest {
                result = ScalarType.createVarchar(50);
            }
        };
-        MVColumnItem mvColumnItem = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", functionCallExpr);
+        MVColumnItem mvColumnItem = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", analyzer,
+                functionCallExpr);
        Assert.assertEquals(50, mvColumnItem.getType().getLength());

        SlotRef slotRef2 = new SlotRef(new TableName(internalCtl, "db", "table"), "a");

@@ -1258,7 +1263,8 @@ public class CreateMaterializedViewStmtTest {
                result = ScalarType.createDecimalType(10, 1);
            }
        };
-        MVColumnItem mvColumnItem2 = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", functionCallExpr2);
+        MVColumnItem mvColumnItem2 = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", analyzer,
+                functionCallExpr2);
        Assert.assertEquals(new Integer(10), mvColumnItem2.getType().getPrecision());
        Assert.assertEquals(1, ((ScalarType) mvColumnItem2.getType()).getScalarScale());

@@ -1275,7 +1281,8 @@ public class CreateMaterializedViewStmtTest {
                result = ScalarType.createChar(5);
            }
        };
-        MVColumnItem mvColumnItem3 = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", functionCallExpr3);
+        MVColumnItem mvColumnItem3 = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", analyzer,
+                functionCallExpr3);
        Assert.assertEquals(5, mvColumnItem3.getType().getLength());
    }
}
Binary file not shown.
@@ -105,10 +105,16 @@ suite("test_materialized_view", "rollup") {
        }
    }
    sql "SELECT store_id, count(sale_amt) FROM ${tbName1} GROUP BY store_id;"
    qt_sql "DESC ${tbName1} ALL;"

    qt_sql "SELECT store_id, count(sale_amt) FROM ${tbName1} GROUP BY store_id;"

    explain {
        sql("SELECT store_id, count(sale_amt) FROM ${tbName1} GROUP BY store_id;")
        contains "(amt_count)"
    }

    sql "DROP TABLE ${tbName1} FORCE;"
    sql "DROP TABLE ${tbName2} FORCE;"

}