[feature](agg) add the aggregation function 'mag_agg' (#22043)

New aggregation function: map_agg.

This function requires two arguments: a key and a value, which are used to build a map.

select map_agg(column1, column2) from t group by column3;
This commit is contained in:
Jerry Hu
2023-07-25 11:21:03 +08:00
committed by GitHub
parent 6a03a612a0
commit b41fcbb783
13 changed files with 797 additions and 10 deletions

View File

@ -0,0 +1,68 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/aggregate_functions/aggregate_function_map.h"
#include "runtime/primitive_type.h"
#include "vec/aggregate_functions/helpers.h"
namespace doris::vectorized {
template <typename K>
AggregateFunctionPtr create_agg_function_map_agg(const DataTypes& argument_types,
const bool result_is_nullable) {
return creator_without_type::create_ignore_nullable<
AggregateFunctionMapAgg<AggregateFunctionMapAggData<K>, K>>(argument_types,
result_is_nullable);
}
AggregateFunctionPtr create_aggregate_function_map_agg(const std::string& name,
const DataTypes& argument_types,
const bool result_is_nullable) {
WhichDataType type(remove_nullable(argument_types[0]));
#define DISPATCH(TYPE) \
if (type.idx == TypeIndex::TYPE) \
return create_agg_function_map_agg<TYPE>(argument_types, result_is_nullable);
FOR_NUMERIC_TYPES(DISPATCH)
FOR_DECIMAL_TYPES(DISPATCH)
#undef DISPATCH
if (type.idx == TypeIndex::String) {
return create_agg_function_map_agg<String>(argument_types, result_is_nullable);
}
if (type.idx == TypeIndex::DateTime || type.idx == TypeIndex::Date) {
return create_agg_function_map_agg<Int64>(argument_types, result_is_nullable);
}
if (type.idx == TypeIndex::DateV2) {
return create_agg_function_map_agg<UInt32>(argument_types, result_is_nullable);
}
if (type.idx == TypeIndex::DateTimeV2) {
return create_agg_function_map_agg<UInt64>(argument_types, result_is_nullable);
}
LOG(WARNING) << fmt::format("unsupported input type {} for aggregate function {}",
argument_types[0]->get_name(), name);
return nullptr;
}
void register_aggregate_function_map_agg(AggregateFunctionSimpleFactory& factory) {
factory.register_function_both("map_agg", create_aggregate_function_map_agg);
}
} // namespace doris::vectorized

View File

@ -0,0 +1,314 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <parallel_hashmap/phmap.h>
#include <string.h>
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
#include "vec/columns/column_decimal.h"
#include "vec/columns/column_map.h"
#include "vec/columns/column_string.h"
#include "vec/columns/columns_number.h"
#include "vec/common/assert_cast.h"
#include "vec/common/hash_table/hash_table_key_holder.h"
#include "vec/common/string_ref.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_map.h"
#include "vec/io/io_helper.h"
namespace doris::vectorized {
template <typename K>
struct AggregateFunctionMapAggData {
using KeyType = std::conditional_t<std::is_same_v<K, String>, StringRef, K>;
using Map = phmap::flat_hash_map<StringRef, int64_t>;
AggregateFunctionMapAggData() { __builtin_unreachable(); }
AggregateFunctionMapAggData(const DataTypes& argument_types) {
_key_type = remove_nullable(argument_types[0]);
_value_type = make_nullable(argument_types[1]);
_key_column = _key_type->create_column();
_value_column = _value_type->create_column();
}
void reset() {
_map.clear();
_key_column->clear();
_value_column->clear();
}
void add(const StringRef& key, const StringRef& value) {
DCHECK(key.data != nullptr);
if (UNLIKELY(_map.find(key) != _map.end())) {
return;
}
ArenaKeyHolder key_holder {key, _arena};
if (key.size > 0) {
key_holder_persist_key(key_holder);
}
_map.emplace(key_holder.key, _key_column->size());
_key_column->insert_data(key_holder.key.data, key_holder.key.size);
_value_column->insert_data(value.data, value.size);
}
void merge(const AggregateFunctionMapAggData& other) {
const size_t num_rows = other._key_column->size();
if (num_rows == 0) {
return;
}
auto& other_key_column = assert_cast<KeyColumnType&>(*other._key_column);
for (size_t i = 0; i != num_rows; ++i) {
auto key = static_cast<KeyColumnType&>(other_key_column).get_data_at(i);
if (_map.find(key) != _map.cend()) {
continue;
}
ArenaKeyHolder key_holder {key, _arena};
if (key.size > 0) {
key_holder_persist_key(key_holder);
}
_map.emplace(key_holder.key, _key_column->size());
static_cast<KeyColumnType&>(*_key_column)
.insert_data(key_holder.key.data, key_holder.key.size);
auto value = other._value_column->get_data_at(i);
_value_column->insert_data(value.data, value.size);
}
}
static void serialize(BufferWritable& buf, const IColumn& key_column,
const IColumn& value_column, const DataTypePtr& key_type,
const DataTypePtr& value_type) {
size_t element_number = key_column.size();
write_binary(element_number, buf);
DCHECK(!key_column.is_nullable());
DCHECK(!key_type->is_nullable());
DCHECK(value_column.is_nullable());
DCHECK(value_type->is_nullable());
if (element_number > 0) {
size_t serialized_size = key_type->get_uncompressed_serialized_bytes(key_column, 0);
serialized_size += value_type->get_uncompressed_serialized_bytes(value_column, 0);
std::string serialized_buffer;
serialized_buffer.resize(serialized_size);
auto* serialized_data = serialized_buffer.data();
serialized_data = key_type->serialize(key_column, serialized_data, 0);
value_type->serialize(value_column, serialized_data, 0);
write_binary(serialized_size, buf);
buf.write(serialized_buffer.data(), serialized_buffer.size());
}
}
void write(BufferWritable& buf) const {
serialize(buf, *_key_column, *_value_column, _key_type, _value_type);
}
void read(BufferReadable& buf) {
size_t element_number = 0;
read_binary(element_number, buf);
if (element_number > 0) {
_map.reserve(element_number);
size_t serialized_size;
read_binary(serialized_size, buf);
std::string serialized_buffer;
serialized_buffer.resize(serialized_size);
buf.read(serialized_buffer.data(), serialized_size);
const auto* serialized_data = serialized_buffer.data();
serialized_data = _key_type->deserialize(serialized_data, _key_column.get(), 0);
_value_type->deserialize(serialized_data, _value_column.get(), 0);
DCHECK_EQ(element_number, _key_column->size());
DCHECK_EQ(element_number, _value_column->size());
for (size_t i = 0; i != element_number; ++i) {
auto key = static_cast<KeyColumnType&>(*_key_column).get_data_at(i);
DCHECK(_map.find(key) == _map.cend());
_map.emplace(key, i);
}
}
}
void insert_result_into(IColumn& to) const {
auto& dst = assert_cast<ColumnMap&>(to);
size_t num_rows = _key_column->size();
auto& offsets = dst.get_offsets();
auto& dst_key_column = assert_cast<ColumnNullable&>(dst.get_keys());
dst_key_column.get_null_map_data().resize_fill(dst_key_column.get_null_map_data().size() +
num_rows);
dst_key_column.get_nested_column().insert_range_from(*_key_column, 0, num_rows);
dst.get_values().insert_range_from(*_value_column, 0, num_rows);
if (offsets.size() == 0) {
offsets.push_back(num_rows);
} else {
offsets.push_back(offsets.back() + num_rows);
}
}
private:
using KeyColumnType =
std::conditional_t<std::is_same_v<String, K>, ColumnString, ColumnVectorOrDecimal<K>>;
Map _map;
Arena _arena;
IColumn::MutablePtr _key_column;
IColumn::MutablePtr _value_column;
DataTypePtr _key_type;
DataTypePtr _value_type;
};
template <typename Data, typename K>
class AggregateFunctionMapAgg final
: public IAggregateFunctionDataHelper<Data, AggregateFunctionMapAgg<Data, K>> {
public:
using KeyColumnType =
std::conditional_t<std::is_same_v<String, K>, ColumnString, ColumnVectorOrDecimal<K>>;
AggregateFunctionMapAgg() = default;
AggregateFunctionMapAgg(const DataTypes& argument_types_)
: IAggregateFunctionDataHelper<Data, AggregateFunctionMapAgg<Data, K>>(
argument_types_) {}
std::string get_name() const override { return "map_agg"; }
DataTypePtr get_return_type() const override {
/// keys and values column of `ColumnMap` are always nullable.
return std::make_shared<DataTypeMap>(make_nullable(argument_types[0]),
make_nullable(argument_types[1]));
}
void add(AggregateDataPtr __restrict place, const IColumn** columns, size_t row_num,
Arena* arena) const override {
if (columns[0]->is_nullable()) {
auto& nullable_col = assert_cast<const ColumnNullable&>(*columns[0]);
auto& nullable_map = nullable_col.get_null_map_data();
if (nullable_map[row_num]) {
return;
}
this->data(place).add(
assert_cast<const KeyColumnType&>(nullable_col.get_nested_column())
.get_data_at(row_num),
columns[1]->get_data_at(row_num));
} else {
this->data(place).add(
assert_cast<const KeyColumnType&>(*columns[0]).get_data_at(row_num),
columns[1]->get_data_at(row_num));
}
}
void create(AggregateDataPtr __restrict place) const override {
new (place) Data(argument_types);
}
void reset(AggregateDataPtr place) const override { this->data(place).reset(); }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
Arena* arena) const override {
this->data(place).merge(this->data(rhs));
}
void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
this->data(place).write(buf);
}
template <bool key_nullable, bool value_nullable>
void streaming_agg_serialize_to_column_impl(const size_t num_rows, const IColumn& key_column,
const IColumn& value_column,
const NullMap& null_map,
BufferWritable& writer) const {
auto& key_col = assert_cast<const KeyColumnType&>(key_column);
auto key_to_serialize = key_col.clone_empty();
auto val_to_serialize = value_column.clone_empty();
auto key_type = remove_nullable(argument_types[0]);
auto val_type = make_nullable(argument_types[1]);
for (size_t i = 0; i != num_rows; ++i) {
key_to_serialize->clear();
val_to_serialize->clear();
if constexpr (key_nullable) {
if (!null_map[i]) {
key_to_serialize->insert_range_from(key_col, i, 1);
val_to_serialize->insert_range_from(value_column, i, 1);
}
} else {
key_to_serialize->insert_range_from(key_col, i, 1);
val_to_serialize->insert_range_from(value_column, i, 1);
}
if constexpr (value_nullable) {
Data::serialize(writer, *key_to_serialize, *val_to_serialize, key_type, val_type);
} else {
auto nullable_value_col = make_nullable(val_to_serialize->assume_mutable(), false);
Data::serialize(writer, *key_to_serialize, *nullable_value_col, key_type, val_type);
val_to_serialize = value_column.clone_empty();
}
writer.commit();
}
}
void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst,
const size_t num_rows, Arena* arena) const override {
auto& col = assert_cast<ColumnString&>(*dst);
col.reserve(num_rows);
VectorBufferWriter writer(col);
if (columns[0]->is_nullable()) {
auto& nullable_col = assert_cast<const ColumnNullable&>(*columns[0]);
auto& null_map = nullable_col.get_null_map_data();
if (columns[0]->is_nullable()) {
this->streaming_agg_serialize_to_column_impl<true, true>(
num_rows, nullable_col.get_nested_column(), *columns[1], null_map, writer);
} else {
this->streaming_agg_serialize_to_column_impl<true, false>(
num_rows, nullable_col.get_nested_column(), *columns[1], null_map, writer);
}
} else {
if (columns[0]->is_nullable()) {
this->streaming_agg_serialize_to_column_impl<false, true>(num_rows, *columns[0],
*columns[1], {}, writer);
} else {
this->streaming_agg_serialize_to_column_impl<false, false>(num_rows, *columns[0],
*columns[1], {}, writer);
}
}
}
void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
Arena*) const override {
this->data(place).read(buf);
}
void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override {
this->data(place).insert_result_into(to);
}
protected:
using IAggregateFunction::argument_types;
};
} // namespace doris::vectorized

View File

@ -59,6 +59,7 @@ void register_aggregate_function_avg_weighted(AggregateFunctionSimpleFactory& fa
void register_aggregate_function_histogram(AggregateFunctionSimpleFactory& factory);
void register_aggregate_function_count_old(AggregateFunctionSimpleFactory& factory);
void register_aggregate_function_sum_old(AggregateFunctionSimpleFactory& factory);
void register_aggregate_function_map_agg(AggregateFunctionSimpleFactory& factory);
AggregateFunctionSimpleFactory& AggregateFunctionSimpleFactory::instance() {
static std::once_flag oc;
@ -93,6 +94,7 @@ AggregateFunctionSimpleFactory& AggregateFunctionSimpleFactory::instance() {
register_aggregate_function_sequence_match(instance);
register_aggregate_function_avg_weighted(instance);
register_aggregate_function_histogram(instance);
register_aggregate_function_map_agg(instance);
register_aggregate_function_stddev_variance_samp(instance);
register_aggregate_function_replace_reader_load(instance);

View File

@ -76,6 +76,17 @@ struct creator_without_type {
}
return AggregateFunctionPtr(result);
}
/// AggregateFunctionTemplate will handle the nullable arguments, no need to use
/// AggregateFunctionNullVariadicInline/AggregateFunctionNullUnaryInline
template <typename AggregateFunctionTemplate, typename... TArgs>
static AggregateFunctionPtr create_ignore_nullable(const DataTypes& argument_types,
const bool /*result_is_nullable*/,
TArgs&&... args) {
IAggregateFunction* result(
new AggregateFunctionTemplate(std::forward<TArgs>(args)..., argument_types));
return AggregateFunctionPtr(result);
}
};
template <template <typename> class AggregateFunctionTemplate>

View File

@ -231,7 +231,7 @@ void AggFnEvaluator::destroy(AggregateDataPtr place) {
}
Status AggFnEvaluator::execute_single_add(Block* block, AggregateDataPtr place, Arena* arena) {
RETURN_IF_ERROR(_calc_argment_columns(block));
RETURN_IF_ERROR(_calc_argument_columns(block));
SCOPED_TIMER(_exec_timer);
_function->add_batch_single_place(block->rows(), place, _agg_columns.data(), arena);
return Status::OK();
@ -239,7 +239,7 @@ Status AggFnEvaluator::execute_single_add(Block* block, AggregateDataPtr place,
Status AggFnEvaluator::execute_batch_add(Block* block, size_t offset, AggregateDataPtr* places,
Arena* arena, bool agg_many) {
RETURN_IF_ERROR(_calc_argment_columns(block));
RETURN_IF_ERROR(_calc_argument_columns(block));
SCOPED_TIMER(_exec_timer);
_function->add_batch(block->rows(), places, offset, _agg_columns.data(), arena, agg_many);
return Status::OK();
@ -247,7 +247,7 @@ Status AggFnEvaluator::execute_batch_add(Block* block, size_t offset, AggregateD
Status AggFnEvaluator::execute_batch_add_selected(Block* block, size_t offset,
AggregateDataPtr* places, Arena* arena) {
RETURN_IF_ERROR(_calc_argment_columns(block));
RETURN_IF_ERROR(_calc_argument_columns(block));
SCOPED_TIMER(_exec_timer);
_function->add_batch_selected(block->rows(), places, offset, _agg_columns.data(), arena);
return Status::OK();
@ -255,7 +255,7 @@ Status AggFnEvaluator::execute_batch_add_selected(Block* block, size_t offset,
Status AggFnEvaluator::streaming_agg_serialize(Block* block, BufferWritable& buf,
const size_t num_rows, Arena* arena) {
RETURN_IF_ERROR(_calc_argment_columns(block));
RETURN_IF_ERROR(_calc_argument_columns(block));
SCOPED_TIMER(_exec_timer);
_function->streaming_agg_serialize(_agg_columns.data(), buf, num_rows, arena);
return Status::OK();
@ -263,7 +263,7 @@ Status AggFnEvaluator::streaming_agg_serialize(Block* block, BufferWritable& buf
Status AggFnEvaluator::streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst,
const size_t num_rows, Arena* arena) {
RETURN_IF_ERROR(_calc_argment_columns(block));
RETURN_IF_ERROR(_calc_argument_columns(block));
SCOPED_TIMER(_exec_timer);
_function->streaming_agg_serialize_to_column(_agg_columns.data(), dst, num_rows, arena);
return Status::OK();
@ -302,7 +302,7 @@ std::string AggFnEvaluator::debug_string() const {
return out.str();
}
Status AggFnEvaluator::_calc_argment_columns(Block* block) {
Status AggFnEvaluator::_calc_argument_columns(Block* block) {
SCOPED_TIMER(_expr_timer);
_agg_columns.resize(_input_exprs_ctxs.size());
int column_ids[_input_exprs_ctxs.size()];

View File

@ -108,7 +108,7 @@ private:
AggFnEvaluator(const TExprNode& desc);
Status _calc_argment_columns(Block* block);
Status _calc_argument_columns(Block* block);
DataTypes _argument_types_with_sort;
DataTypes _real_argument_types;

View File

@ -58,6 +58,9 @@ namespace doris::vectorized {
class FunctionArrayElement : public IFunction {
public:
/// The count of items in the map may exceed 128(Int8).
using MapIndiceDataType = DataTypeInt16;
static constexpr auto name = "element_at";
static FunctionPtr create() { return std::make_shared<FunctionArrayElement>(); }
@ -131,7 +134,7 @@ private:
ColumnPtr nested_ptr = make_nullable(column.get_data_ptr());
size_t rows = offsets.size();
// prepare return data
auto matched_indices = ColumnVector<Int8>::create();
auto matched_indices = ColumnVector<MapIndiceDataType::FieldType>::create();
matched_indices->reserve(rows);
for (size_t i = 0; i < rows; i++) {
@ -273,7 +276,7 @@ private:
if (!matched_indices) {
return nullptr;
}
DataTypePtr indices_type(std::make_shared<vectorized::DataTypeInt8>());
DataTypePtr indices_type(std::make_shared<MapIndiceDataType>());
ColumnWithTypeAndName indices(matched_indices, indices_type, "indices");
ColumnWithTypeAndName data(val_arr, val_type, "value");
ColumnsWithTypeAndName args = {data, indices};

View File

@ -0,0 +1,92 @@
---
{
"title": "MAP_AGG",
"language": "en"
}
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
## MAP_AGG
### description
#### Syntax
`MAP_AGG(expr1, expr2)`
Returns a map consists of expr1 as the key and expr2 as the corresponding value.
### example
```
MySQL > select `n_nationkey`, `n_name`, `n_regionkey` from `nation`;
+-------------+----------------+-------------+
| n_nationkey | n_name | n_regionkey |
+-------------+----------------+-------------+
| 0 | ALGERIA | 0 |
| 1 | ARGENTINA | 1 |
| 2 | BRAZIL | 1 |
| 3 | CANADA | 1 |
| 4 | EGYPT | 4 |
| 5 | ETHIOPIA | 0 |
| 6 | FRANCE | 3 |
| 7 | GERMANY | 3 |
| 8 | INDIA | 2 |
| 9 | INDONESIA | 2 |
| 10 | IRAN | 4 |
| 11 | IRAQ | 4 |
| 12 | JAPAN | 2 |
| 13 | JORDAN | 4 |
| 14 | KENYA | 0 |
| 15 | MOROCCO | 0 |
| 16 | MOZAMBIQUE | 0 |
| 17 | PERU | 1 |
| 18 | CHINA | 2 |
| 19 | ROMANIA | 3 |
| 20 | SAUDI ARABIA | 4 |
| 21 | VIETNAM | 2 |
| 22 | RUSSIA | 3 |
| 23 | UNITED KINGDOM | 3 |
| 24 | UNITED STATES | 1 |
+-------------+----------------+-------------+
MySQL > select `n_regionkey`, map_agg(`n_nationkey`, `n_name`) from `nation` group by `n_regionkey`;
+-------------+---------------------------------------------------------------------------+
| n_regionkey | map_agg(`n_nationkey`, `n_name`) |
+-------------+---------------------------------------------------------------------------+
| 1 | {1:"ARGENTINA", 2:"BRAZIL", 3:"CANADA", 17:"PERU", 24:"UNITED STATES"} |
| 0 | {0:"ALGERIA", 5:"ETHIOPIA", 14:"KENYA", 15:"MOROCCO", 16:"MOZAMBIQUE"} |
| 3 | {6:"FRANCE", 7:"GERMANY", 19:"ROMANIA", 22:"RUSSIA", 23:"UNITED KINGDOM"} |
| 4 | {4:"EGYPT", 10:"IRAN", 11:"IRAQ", 13:"JORDAN", 20:"SAUDI ARABIA"} |
| 2 | {8:"INDIA", 9:"INDONESIA", 12:"JAPAN", 18:"CHINA", 21:"VIETNAM"} |
+-------------+---------------------------------------------------------------------------+
MySQL > select n_regionkey, map_agg(`n_name`, `n_nationkey` % 5) from `nation` group by `n_regionkey`;
+-------------+------------------------------------------------------------------------+
| n_regionkey | map_agg(`n_name`, (`n_nationkey` % 5)) |
+-------------+------------------------------------------------------------------------+
| 2 | {"INDIA":3, "INDONESIA":4, "JAPAN":2, "CHINA":3, "VIETNAM":1} |
| 0 | {"ALGERIA":0, "ETHIOPIA":0, "KENYA":4, "MOROCCO":0, "MOZAMBIQUE":1} |
| 3 | {"FRANCE":1, "GERMANY":2, "ROMANIA":4, "RUSSIA":2, "UNITED KINGDOM":3} |
| 1 | {"ARGENTINA":1, "BRAZIL":2, "CANADA":3, "PERU":2, "UNITED STATES":4} |
| 4 | {"EGYPT":4, "IRAN":0, "IRAQ":1, "JORDAN":3, "SAUDI ARABIA":0} |
+-------------+------------------------------------------------------------------------+
```
### keywords
MAP_AGG

View File

@ -0,0 +1,91 @@
---
{
"title": "MAP_AGG",
"language": "zh-CN"
}
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
## MAP_AGG
### description
#### Syntax
`MAP_AGG(expr1, expr2)`
返回一个 map, 由 expr1 作为键,expr2 作为对应的值。
### example
```
MySQL > select `n_nationkey`, `n_name`, `n_regionkey` from `nation`;
+-------------+----------------+-------------+
| n_nationkey | n_name | n_regionkey |
+-------------+----------------+-------------+
| 0 | ALGERIA | 0 |
| 1 | ARGENTINA | 1 |
| 2 | BRAZIL | 1 |
| 3 | CANADA | 1 |
| 4 | EGYPT | 4 |
| 5 | ETHIOPIA | 0 |
| 6 | FRANCE | 3 |
| 7 | GERMANY | 3 |
| 8 | INDIA | 2 |
| 9 | INDONESIA | 2 |
| 10 | IRAN | 4 |
| 11 | IRAQ | 4 |
| 12 | JAPAN | 2 |
| 13 | JORDAN | 4 |
| 14 | KENYA | 0 |
| 15 | MOROCCO | 0 |
| 16 | MOZAMBIQUE | 0 |
| 17 | PERU | 1 |
| 18 | CHINA | 2 |
| 19 | ROMANIA | 3 |
| 20 | SAUDI ARABIA | 4 |
| 21 | VIETNAM | 2 |
| 22 | RUSSIA | 3 |
| 23 | UNITED KINGDOM | 3 |
| 24 | UNITED STATES | 1 |
+-------------+----------------+-------------+
MySQL > select `n_regionkey`, map_agg(`n_nationkey`, `n_name`) from `nation` group by `n_regionkey`;
+-------------+---------------------------------------------------------------------------+
| n_regionkey | map_agg(`n_nationkey`, `n_name`) |
+-------------+---------------------------------------------------------------------------+
| 1 | {1:"ARGENTINA", 2:"BRAZIL", 3:"CANADA", 17:"PERU", 24:"UNITED STATES"} |
| 0 | {0:"ALGERIA", 5:"ETHIOPIA", 14:"KENYA", 15:"MOROCCO", 16:"MOZAMBIQUE"} |
| 3 | {6:"FRANCE", 7:"GERMANY", 19:"ROMANIA", 22:"RUSSIA", 23:"UNITED KINGDOM"} |
| 4 | {4:"EGYPT", 10:"IRAN", 11:"IRAQ", 13:"JORDAN", 20:"SAUDI ARABIA"} |
| 2 | {8:"INDIA", 9:"INDONESIA", 12:"JAPAN", 18:"CHINA", 21:"VIETNAM"} |
+-------------+---------------------------------------------------------------------------+
MySQL > select n_regionkey, map_agg(`n_name`, `n_nationkey` % 5) from `nation` group by `n_regionkey`;
+-------------+------------------------------------------------------------------------+
| n_regionkey | map_agg(`n_name`, (`n_nationkey` % 5)) |
+-------------+------------------------------------------------------------------------+
| 2 | {"INDIA":3, "INDONESIA":4, "JAPAN":2, "CHINA":3, "VIETNAM":1} |
| 0 | {"ALGERIA":0, "ETHIOPIA":0, "KENYA":4, "MOROCCO":0, "MOZAMBIQUE":1} |
| 3 | {"FRANCE":1, "GERMANY":2, "ROMANIA":4, "RUSSIA":2, "UNITED KINGDOM":3} |
| 1 | {"ARGENTINA":1, "BRAZIL":2, "CANADA":3, "PERU":2, "UNITED STATES":4} |
| 4 | {"EGYPT":4, "IRAN":0, "IRAQ":1, "JORDAN":3, "SAUDI ARABIA":0} |
+-------------+------------------------------------------------------------------------+
```
### keywords
MAP_AGG

View File

@ -54,7 +54,7 @@ public class AggregateFunction extends Function {
FunctionSet.INTERSECT_COUNT, FunctionSet.ORTHOGONAL_BITMAP_UNION_COUNT,
FunctionSet.COUNT, "approx_count_distinct", "ndv", FunctionSet.BITMAP_UNION_INT,
FunctionSet.BITMAP_UNION_COUNT, "ndv_no_finalize", FunctionSet.WINDOW_FUNNEL, FunctionSet.RETENTION,
FunctionSet.SEQUENCE_MATCH, FunctionSet.SEQUENCE_COUNT);
FunctionSet.SEQUENCE_MATCH, FunctionSet.SEQUENCE_COUNT, FunctionSet.MAP_AGG);
public static ImmutableSet<String> ALWAYS_NULLABLE_AGGREGATE_FUNCTION_NAME_SET =
ImmutableSet.of("stddev_samp", "variance_samp", "var_samp", "percentile_approx");

View File

@ -200,6 +200,7 @@ public class FunctionSet<T> {
public static final String COLLECT_SET = "collect_set";
public static final String HISTOGRAM = "histogram";
public static final String HIST = "hist";
public static final String MAP_AGG = "map_agg";
private static final Map<Type, String> TOPN_UPDATE_SYMBOL =
ImmutableMap.<Type, String>builder()
@ -1021,6 +1022,15 @@ public class FunctionSet<T> {
true, false, true, true));
}
if (!Type.JSONB.equals(t)) {
for (Type valueType : Type.getTrivialTypes()) {
addBuiltin(AggregateFunction.createBuiltin(MAP_AGG, Lists.newArrayList(t, valueType), new MapType(t, valueType),
Type.VARCHAR,
"", "", "", "", "", null, "",
true, true, false, true));
}
}
if (STDDEV_UPDATE_SYMBOL.containsKey(t)) {
//vec stddev stddev_samp stddev_pop
addBuiltin(AggregateFunction.createBuiltin("stddev",

View File

@ -0,0 +1,22 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql1 --
1 V1_1 V1_2 V1_3
2 V2_1 V2_2 V2_3
3 V3_1 V3_2 V3_3
4 V4_1 V4_2 V4_3
5 V5_1 V5_2 V5_3
-- !sql2 --
1 V1_1 V1_2 \N
2 V2_1 \N V2_3
3 V3_1 V3_2 V3_3
4 V4_1 V4_2 \N
5 V5_1 V5_2 V5_3
-- !sql2 --
1 V1_1 V1_2 V1_3
2 V2_1 V2_2 V2_3
3 V3_1 V3_2 V3_3
4 V4_1 V4_2 V4_3
5 V5_1 V5_2 V5_3

View File

@ -0,0 +1,174 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("map_agg") {
sql "DROP TABLE IF EXISTS `test_map_agg`;"
sql """
CREATE TABLE `test_map_agg` (
`id` int(11) NOT NULL,
`label_name` varchar(32) NOT NULL,
`value_field` string
) ENGINE=OLAP
DUPLICATE KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 2
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"storage_format" = "V2",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false"
);
"""
sql """
insert into `test_map_agg` values
(1, "LA", "V1_1"),
(1, "LB", "V1_2"),
(1, "LC", "V1_3"),
(2, "LA", "V2_1"),
(2, "LB", "V2_2"),
(2, "LC", "V2_3"),
(3, "LA", "V3_1"),
(3, "LB", "V3_2"),
(3, "LC", "V3_3"),
(4, "LA", "V4_1"),
(4, "LB", "V4_2"),
(4, "LC", "V4_3"),
(5, "LA", "V5_1"),
(5, "LB", "V5_2"),
(5, "LC", "V5_3");
"""
sql "DROP TABLE IF EXISTS test_map_agg_nullable;"
sql """
CREATE TABLE `test_map_agg_nullable` (
`id` int(11) NOT NULL,
`label_name` varchar(32) NULL,
`value_field` string
) ENGINE=OLAP
DUPLICATE KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 2
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"storage_format" = "V2",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false"
);
"""
sql """
insert into `test_map_agg_nullable` values
(1, "LA", "V1_1"),
(1, "LB", "V1_2"),
(1, "LC", null),
(2, "LA", "V2_1"),
(2, null, "V2_2"),
(2, "LC", "V2_3"),
(3, "LA", "V3_1"),
(3, "LB", "V3_2"),
(3, "LC", "V3_3"),
(4, "LA", "V4_1"),
(4, "LB", "V4_2"),
(4, null, null),
(5, "LA", "V5_1"),
(5, "LB", "V5_2"),
(5, "LC", "V5_3");
"""
sql "DROP TABLE IF EXISTS `test_map_agg_string_key`;"
sql """
CREATE TABLE `test_map_agg_numeric_key` (
`id` int(11) NOT NULL,
`label_name` bigint NOT NULL,
`value_field` string
) ENGINE=OLAP
DUPLICATE KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 2
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"storage_format" = "V2",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false"
);
"""
sql """
insert into `test_map_agg_numeric_key` values
(1, 1, "V1_1"),
(1, 9223372036854775807, "V1_2"),
(1, 22000000000, "V1_3"),
(2, 1, "V2_1"),
(2, 9223372036854775807, "V2_2"),
(2, 22000000000, "V2_3"),
(3, 1, "V3_1"),
(3, 9223372036854775807, "V3_2"),
(3, 22000000000, "V3_3"),
(4, 1, "V4_1"),
(4, 9223372036854775807, "V4_2"),
(4, 22000000000, "V4_3"),
(5, 1, "V5_1"),
(5, 9223372036854775807, "V5_2"),
(5, 22000000000, "V5_3");
"""
qt_sql1 """
WITH `labels` as (
SELECT `id`, map_agg(`label_name`, `value_field`) m FROM test_map_agg GROUP BY `id`
)
SELECT
id,
m['LA'] LA,
m['LB'] LB,
m['LC'] LC
FROM `labels`
ORDER BY `id`;
"""
qt_sql2 """
WITH `labels` as (
SELECT `id`, map_agg(`label_name`, `value_field`) m FROM test_map_agg_nullable GROUP BY `id`
)
SELECT
id,
m['LA'] LA,
m['LB'] LB,
m['LC'] LC
FROM `labels`
ORDER BY `id`;
"""
qt_sql2 """
WITH `labels` as (
SELECT `id`, map_agg(`label_name`, `value_field`) m FROM test_map_agg_numeric_key GROUP BY `id`
)
SELECT
id,
m[1] LA,
m[9223372036854775807] LB,
m[22000000000] LC
FROM `labels`
ORDER BY `id`;
"""
sql "DROP TABLE `test_map_agg`"
sql "DROP TABLE `test_map_agg_nullable`"
sql "DROP TABLE `test_map_agg_numeric_key`"
}