From 754fceafaf81b9cb80c4d41e4f6ee2933ac76b48 Mon Sep 17 00:00:00 2001
From: ElvinWei
Date: Thu, 22 Dec 2022 16:42:17 +0800
Subject: [PATCH] [feature-wip](statistics) add aggregate function histogram and collect histogram statistics (#14910)

**Histogram statistics**

Doris currently collects statistics but no histogram data, so by default the optimizer assumes that the distinct values of a column are evenly distributed. That assumption can be badly wrong when the data distribution is skewed, so this PR implements the collection of histogram statistics. For skewed columns (columns whose data is unevenly distributed), histogram statistics let the optimizer produce more accurate cardinality estimates for filter or join predicates involving those columns, and therefore a more precise execution plan.

Histograms improve the execution plan mainly in two ways: ordering the predicates of a where condition, and choosing the join order. The principle for ordering where conditions is simple: the histogram is used to compute the selectivity of each predicate, and the more selective filters are applied first. Join-order selection is based on estimating the number of rows in the join result; when the data in the join-condition columns is unevenly distributed, a histogram greatly improves the accuracy of that row-count estimate. In addition, if a bucket in one of the columns contains zero rows, it can be marked and skipped outright during the subsequent join, improving efficiency.

---

Histogram statistics are collected mainly by the histogram aggregate function, which is used as follows:

**Syntax**

```SQL
histogram(expr)
```

> The histogram function is used to describe the distribution of the data. It uses an "equal height" bucketing strategy: it divides the data into buckets by value and describes each bucket with a few simple statistics, such as the number of values that fall into it. It is mainly used by the optimizer to estimate range queries.
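To make the bucket fields concrete before the example below, here is a minimal, illustrative sketch (not part of this patch) of how an equal-height histogram can drive selectivity estimation for a predicate like `col < v`. The `NumBucket` record and the linear interpolation inside the containing bucket are assumptions for illustration, not Doris's actual estimator; `pre_sum` is the number of rows in all preceding buckets, exactly as in the JSON output shown next.

```java
import java.util.List;

// Hypothetical numeric bucket mirroring the JSON fields produced by histogram():
// lower/upper bounds, row count, and pre_sum (rows in all preceding buckets).
record NumBucket(double lower, double upper, long count, long preSum) {}

final class HistogramSelectivity {
    // Estimate the selectivity of `col < v` against an equal-height histogram,
    // interpolating linearly inside the bucket that contains v (an assumption).
    static double lessThan(double v, List<NumBucket> buckets, long totalRows) {
        for (NumBucket b : buckets) {
            if (v < b.lower()) {
                return (double) b.preSum() / totalRows; // v falls before this bucket
            }
            if (v <= b.upper()) {
                double frac = b.upper() == b.lower()
                        ? 1.0
                        : (v - b.lower()) / (b.upper() - b.lower());
                return (b.preSum() + frac * b.count()) / totalRows;
            }
        }
        return 1.0; // v is above the last bucket's upper bound
    }

    public static void main(String[] args) {
        List<NumBucket> h = List.of(new NumBucket(0, 10, 50, 0),
                                    new NumBucket(10, 20, 50, 50));
        System.out.println(lessThan(15.0, h, 100)); // ~0.75
    }
}
```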
**example**

```
MySQL [test]> select histogram(login_time) from dev_table;
+------------------------------------------------------------------------------------------------------------------------------+
| histogram(`login_time`)                                                                                                        |
+------------------------------------------------------------------------------------------------------------------------------+
| {"bucket_size":5,"buckets":[{"lower":"2022-09-21 17:30:29","upper":"2022-09-21 22:30:29","count":9,"pre_sum":0,"ndv":1},...]}|
+------------------------------------------------------------------------------------------------------------------------------+
```

**description**

```JSON
{
    "bucket_size": 5,
    "buckets": [
        {
            "lower": "2022-09-21 17:30:29",
            "upper": "2022-09-21 22:30:29",
            "count": 9,
            "pre_sum": 0,
            "ndv": 1
        },
        {
            "lower": "2022-09-22 17:30:29",
            "upper": "2022-09-22 22:30:29",
            "count": 10,
            "pre_sum": 9,
            "ndv": 1
        },
        {
            "lower": "2022-09-23 17:30:29",
            "upper": "2022-09-23 22:30:29",
            "count": 9,
            "pre_sum": 19,
            "ndv": 1
        },
        {
            "lower": "2022-09-24 17:30:29",
            "upper": "2022-09-24 22:30:29",
            "count": 9,
            "pre_sum": 28,
            "ndv": 1
        },
        {
            "lower": "2022-09-25 17:30:29",
            "upper": "2022-09-25 22:30:29",
            "count": 9,
            "pre_sum": 37,
            "ndv": 1
        }
    ]
}
```

TODO:
- support parameters and sampled statistics in the histogram function (coming in another PR)
- use histogram statistics
- add p0 regression
---
 be/src/vec/CMakeLists.txt                     |   1 +
 .../aggregate_function_histogram.cpp          |  73 ++++
 .../aggregate_function_histogram.h            | 354 ++++++++++++++++++
 .../aggregate_function_simple_factory.cpp     |   3 +
 be/test/CMakeLists.txt                        |   1 +
 .../agg_histogram_test.cpp                    | 142 +++++++
 .../aggregate-functions/histogram.md          |  99 +++++
 .../aggregate-functions/histogram.md          | 100 +++++
 .../doris/analysis/AlterColumnStatsStmt.java  |   1 +
 .../org/apache/doris/catalog/FunctionSet.java |   3 +
 .../catalog/InternalSchemaInitializer.java    |   1 +
 .../doris/nereids/stats/StatsCalculator.java  |   2 +
 .../expressions/functions/table/Numbers.java  |   2 +-
 .../org/apache/doris/statistics/Bucket.java   |  68 ++++
 .../doris/statistics/ColumnStatistic.java     |  12 +-
 .../statistics/ColumnStatisticBuilder.java    |  15 +-
 .../apache/doris/statistics/Histogram.java    | 180 +++++++++
 .../doris/statistics/HiveAnalysisTask.java    |   6 +-
 .../doris/statistics/IcebergAnalysisTask.java |   2 +-
 .../statistics/StatisticsRepository.java      |   8 +-
 .../apache/doris/statistics/StatsType.java    |   1 +
 .../doris/nereids/util/HyperGraphBuilder.java |   2 +-
 .../apache/doris/statistics/CacheTest.java    |   2 +
 .../doris/statistics/HistogramTest.java       | 136 +++++++
 .../statistics/StatsDeriveResultTest.java     |   2 +-
 25 files changed, 1203 insertions(+), 13 deletions(-)
 create mode 100644 be/src/vec/aggregate_functions/aggregate_function_histogram.cpp
 create mode 100644 be/src/vec/aggregate_functions/aggregate_function_histogram.h
 create mode 100644 be/test/vec/aggregate_functions/agg_histogram_test.cpp
 create mode 100644 docs/en/docs/sql-manual/sql-functions/aggregate-functions/histogram.md
 create mode 100644 docs/zh-CN/docs/sql-manual/sql-functions/aggregate-functions/histogram.md
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/Bucket.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/Histogram.java
 create mode 100644 fe/fe-core/src/test/java/org/apache/doris/statistics/HistogramTest.java

diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index ca5c9c17f5..6f9b63db7e 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -46,6 +46,7 @@ set(VEC_FILES
     aggregate_functions/aggregate_function_simple_factory.cpp
     aggregate_functions/aggregate_function_orthogonal_bitmap.cpp
     aggregate_functions/aggregate_function_avg_weighted.cpp
+    aggregate_functions/aggregate_function_histogram.cpp
     columns/column.cpp
     columns/column_array.cpp
     columns/column_const.cpp
diff --git a/be/src/vec/aggregate_functions/aggregate_function_histogram.cpp b/be/src/vec/aggregate_functions/aggregate_function_histogram.cpp
new file mode 100644
index 0000000000..b400b37edc
--- /dev/null
+++ b/be/src/vec/aggregate_functions/aggregate_function_histogram.cpp
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/aggregate_functions/aggregate_function_histogram.h"
+
+namespace doris::vectorized {
+
+template <typename T>
+AggregateFunctionPtr create_agg_function_histogram(const DataTypes& argument_types) {
+    return AggregateFunctionPtr(
+            new AggregateFunctionHistogram<AggregateFunctionHistogramData<T>, T>(argument_types));
+}
+
+AggregateFunctionPtr create_aggregate_function_histogram(const std::string& name,
+                                                         const DataTypes& argument_types,
+                                                         const Array& parameters,
+                                                         const bool result_is_nullable) {
+    WhichDataType type(argument_types[0]);
+
+    if (type.is_uint8()) {
+        return create_agg_function_histogram<UInt8>(argument_types);
+    } else if (type.is_int8()) {
+        return create_agg_function_histogram<Int8>(argument_types);
+    } else if (type.is_int16()) {
+        return create_agg_function_histogram<Int16>(argument_types);
+    } else if (type.is_int32()) {
+        return create_agg_function_histogram<Int32>(argument_types);
+    } else if (type.is_int64()) {
+        return create_agg_function_histogram<Int64>(argument_types);
+    } else if (type.is_int128()) {
+        return create_agg_function_histogram<Int128>(argument_types);
+    } else if (type.is_float32()) {
+        return create_agg_function_histogram<Float32>(argument_types);
+    } else if (type.is_float64()) {
+        return create_agg_function_histogram<Float64>(argument_types);
+    } else if (type.is_decimal32()) {
+        return create_agg_function_histogram<Decimal32>(argument_types);
+    } else if (type.is_decimal64()) {
+        return create_agg_function_histogram<Decimal64>(argument_types);
+    } else if (type.is_decimal128()) {
+        return create_agg_function_histogram<Decimal128>(argument_types);
+    } else if (type.is_date()) {
+        return create_agg_function_histogram<Int64>(argument_types);
+    } else if (type.is_date_time()) {
+        return create_agg_function_histogram<Int64>(argument_types);
+    } else if (type.is_string()) {
+        return create_agg_function_histogram<StringRef>(argument_types);
+    }
+
+    LOG(WARNING) << fmt::format("unsupported input type {} for aggregate function {}",
+                                argument_types[0]->get_name(), name);
+    return nullptr;
+}
+
+void register_aggregate_function_histogram(AggregateFunctionSimpleFactory& factory) {
+    factory.register_function("histogram", create_aggregate_function_histogram);
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/aggregate_functions/aggregate_function_histogram.h b/be/src/vec/aggregate_functions/aggregate_function_histogram.h
new file mode 100644
index 0000000000..f92edd7090
--- /dev/null
+++ b/be/src/vec/aggregate_functions/aggregate_function_histogram.h
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <rapidjson/document.h>
+#include <rapidjson/stringbuffer.h>
+#include <rapidjson/writer.h>
+
+#include <cmath>
+#include <string>
+#include <vector>
+
+#include "runtime/datetime_value.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+// TODO: support input parameters and statistics of sampling
+const int64_t MAX_BUCKET_SIZE = 128;
+const float_t SAMPLE_RATE = 1.0;
+
+template <typename T>
+struct Bucket {
+public:
+    Bucket() = default;
+    Bucket(T value, size_t pre_sum)
+            : lower(value), upper(value), count(1), pre_sum(pre_sum), ndv(1) {}
+    Bucket(T lower, T upper, size_t count, size_t pre_sum, size_t ndv)
+            : lower(lower), upper(upper), count(count), pre_sum(pre_sum), ndv(ndv) {}
+
+    T lower;
+    T upper;
+    int64_t count;
+    int64_t pre_sum;
+    int64_t ndv;
+};
+
+struct AggregateFunctionHistogramBase {
+public:
+    AggregateFunctionHistogramBase() = default;
+
+    template <typename T>
+    static std::vector<Bucket<T>> build_bucket_from_data(const std::vector<T>& sorted_data,
+                                                         int64_t max_bucket_size) {
+        std::vector<Bucket<T>> buckets;
+
+        if (sorted_data.size() > 0) {
+            int64_t data_size = sorted_data.size();
+            int num_per_bucket = (int64_t)std::ceil((Float64)data_size / max_bucket_size);
+
+            for (int i = 0; i < data_size; ++i) {
+                T v = sorted_data[i];
+                if (buckets.empty()) {
+                    Bucket<T> bucket(v, 0);
+                    buckets.emplace_back(bucket);
+                } else {
+                    Bucket<T>* bucket = &buckets.back();
+                    T upper = bucket->upper;
+                    if (upper == v) {
+                        bucket->count++;
+                    } else if (bucket->count < num_per_bucket) {
+                        bucket->count++;
+                        bucket->ndv++;
+                        bucket->upper = v;
+                    } else {
+                        int64_t pre_sum = bucket->pre_sum + bucket->count;
+                        Bucket<T> new_bucket(v, pre_sum);
+                        buckets.emplace_back(new_bucket);
+                    }
+                }
+            }
+        }
+
+        return buckets;
+    }
+
+    template <typename T>
+    static std::string build_json_from_bucket(const std::vector<Bucket<T>>& buckets,
+                                              const DataTypePtr& data_type,
+                                              int64_t max_bucket_size, int64_t sample_rate) {
+        rapidjson::Document doc;
+        doc.SetObject();
+        rapidjson::Document::AllocatorType& allocator = doc.GetAllocator();
+
+        rapidjson::Value max_bucket_size_val(max_bucket_size);
+        doc.AddMember("max_bucket_size", max_bucket_size_val, allocator);
+
+        rapidjson::Value sample_rate_val(sample_rate);
+        doc.AddMember("sample_rate", sample_rate_val, allocator);
+
+        // buckets
+        rapidjson::Value bucket_arr(rapidjson::kArrayType);
+
+        if (!buckets.empty()) {
+            int size = buckets.size();
+            rapidjson::Value bucket_size_val(size);
doc.AddMember("bucket_size", bucket_size_val, allocator); + + WhichDataType type(data_type); + if (type.is_int() || type.is_float() || type.is_decimal() || type.is_string()) { + for (int i = 0; i < size; ++i) { + std::string lower_str = numerical_to_string(buckets[i].lower); + std::string upper_str = numerical_to_string(buckets[i].upper); + to_bucket_json(allocator, bucket_arr, lower_str, upper_str, + (int64_t)(buckets[i].count), (int64_t)(buckets[i].pre_sum), + (int64_t)(buckets[i].ndv)); + } + } else if (type.is_date_or_datetime()) { + for (int i = 0; i < size; ++i) { + std::string lower_str = to_date_string(buckets[i].lower); + std::string upper_str = to_date_string(buckets[i].upper); + to_bucket_json(allocator, bucket_arr, lower_str, upper_str, + (int64_t)(buckets[i].count), (int64_t)(buckets[i].pre_sum), + (int64_t)(buckets[i].ndv)); + } + } else { + rapidjson::Value bucket_size_zero(0); + doc.AddMember("bucket_size", bucket_size_zero, allocator); + LOG(WARNING) << fmt::format("unable to convert histogram data of type {}", + data_type->get_name()); + } + } + + doc.AddMember("buckets", bucket_arr, allocator); + + rapidjson::StringBuffer sb; + rapidjson::Writer writer(sb); + doc.Accept(writer); + + return std::string(sb.GetString()); + } + + static void to_bucket_json(rapidjson::Document::AllocatorType& allocator, + rapidjson::Value& bucket_arr, std::string lower, std::string upper, + int64 count, int64 pre_sum, int64 ndv) { + rapidjson::Value bucket(rapidjson::kObjectType); + + rapidjson::Value lower_val(lower.c_str(), allocator); + bucket.AddMember("lower", lower_val, allocator); + + rapidjson::Value upper_val(upper.c_str(), allocator); + bucket.AddMember("upper", upper_val, allocator); + + rapidjson::Value count_val(count); + bucket.AddMember("count", count_val, allocator); + + rapidjson::Value pre_sum_val(pre_sum); + bucket.AddMember("pre_sum", pre_sum_val, allocator); + + rapidjson::Value ndv_val(ndv); + bucket.AddMember("ndv", ndv_val, allocator); + + bucket_arr.PushBack(bucket, allocator); + } + +private: + template + static std::string numerical_to_string(T input) { + fmt::memory_buffer buffer; + fmt::format_to(buffer, "{}", input); + return std::string(buffer.data(), buffer.size()); + } + + template + static std::string to_date_string(T input) { + auto* date_int = reinterpret_cast(&input); + auto date_value = binary_cast(*date_int); + char buf[32] = {}; + date_value.to_string(buf); + return std::string(buf, strlen(buf)); + } +}; + +template +struct AggregateFunctionHistogramData : public AggregateFunctionHistogramBase { + using ElementType = T; + using ColVecType = ColumnVectorOrDecimal; + PaddedPODArray data; + + void add(const IColumn& column, size_t row_num) { + const auto& vec = assert_cast(column).get_data(); + data.push_back(vec[row_num]); + } + + void write(BufferWritable& buf) const { + write_var_uint(data.size(), buf); + buf.write(data.raw_data(), data.size() * sizeof(ElementType)); + } + + void read(BufferReadable& buf) { + UInt64 rows = 0; + read_var_uint(rows, buf); + data.resize(rows); + buf.read(reinterpret_cast(data.data()), rows * sizeof(ElementType)); + } + + void merge(const AggregateFunctionHistogramData& rhs) { + data.insert(rhs.data.begin(), rhs.data.end()); + } + + void insert_result_into(IColumn& to) const { + auto& vec = assert_cast(to).get_data(); + size_t old_size = vec.size(); + vec.resize(old_size + data.size()); + memcpy(vec.data() + old_size, data.data(), data.size() * sizeof(ElementType)); + } + + std::string get(const DataTypePtr& data_type) 
+        std::vector<ElementType> vec_data;
+
+        for (size_t i = 0; i < data.size(); ++i) {
+            [[maybe_unused]] ElementType d = data[i];
+            vec_data.push_back(d);
+        }
+
+        std::sort(vec_data.begin(), vec_data.end());
+        auto buckets = build_bucket_from_data(vec_data, MAX_BUCKET_SIZE);
+        auto result_str = build_json_from_bucket(buckets, data_type, MAX_BUCKET_SIZE,
+                                                 SAMPLE_RATE);
+
+        return result_str;
+    }
+
+    void reset() { data.clear(); }
+};
+
+template <>
+struct AggregateFunctionHistogramData<StringRef> : public AggregateFunctionHistogramBase {
+    using ElementType = StringRef;
+    using ColVecType = ColumnString;
+    MutableColumnPtr data;
+
+    AggregateFunctionHistogramData() { data = ColVecType::create(); }
+
+    void add(const IColumn& column, size_t row_num) { data->insert_from(column, row_num); }
+
+    void write(BufferWritable& buf) const {
+        auto& col = assert_cast<ColVecType&>(*data);
+
+        write_var_uint(col.size(), buf);
+        buf.write(col.get_offsets().raw_data(), col.size() * sizeof(IColumn::Offset));
+
+        write_var_uint(col.get_chars().size(), buf);
+        buf.write(col.get_chars().raw_data(), col.get_chars().size());
+    }
+
+    void read(BufferReadable& buf) {
+        auto& col = assert_cast<ColVecType&>(*data);
+        UInt64 offs_size = 0;
+        read_var_uint(offs_size, buf);
+        col.get_offsets().resize(offs_size);
+        buf.read(reinterpret_cast<char*>(col.get_offsets().data()),
+                 offs_size * sizeof(IColumn::Offset));
+
+        UInt64 chars_size = 0;
+        read_var_uint(chars_size, buf);
+        col.get_chars().resize(chars_size);
+        buf.read(reinterpret_cast<char*>(col.get_chars().data()), chars_size);
+    }
+
+    void merge(const AggregateFunctionHistogramData& rhs) {
+        data->insert_range_from(*rhs.data, 0, rhs.data->size());
+    }
+
+    void insert_result_into(IColumn& to) const {
+        auto& to_str = assert_cast<ColVecType&>(to);
+        to_str.insert_range_from(*data, 0, data->size());
+    }
+
+    std::string get(const DataTypePtr& data_type) const {
+        std::vector<std::string> str_data;
+        auto* res_column = reinterpret_cast<ColVecType*>(data.get());
+
+        for (int i = 0; i < res_column->size(); ++i) {
+            [[maybe_unused]] ElementType c = res_column->get_data_at(i);
+            str_data.push_back(c.to_string());
+        }
+
+        std::sort(str_data.begin(), str_data.end());
+        const auto buckets = build_bucket_from_data(str_data, MAX_BUCKET_SIZE);
+        auto result_str = build_json_from_bucket(buckets, data_type, MAX_BUCKET_SIZE,
+                                                 SAMPLE_RATE);
+
+        return result_str;
+    }
+
+    void reset() { data->clear(); }
+};
+
+template <typename Data, typename T>
+class AggregateFunctionHistogram final
+        : public IAggregateFunctionDataHelper<Data, AggregateFunctionHistogram<Data, T>> {
+public:
+    AggregateFunctionHistogram() = default;
+    AggregateFunctionHistogram(const DataTypes& argument_types_)
+            : IAggregateFunctionDataHelper<Data, AggregateFunctionHistogram<Data, T>>(
+                      argument_types_, {}),
+              _argument_type(argument_types_[0]) {}
+
+    std::string get_name() const override { return "histogram"; }
+
+    DataTypePtr get_return_type() const override { return std::make_shared<DataTypeString>(); }
+
+    void add(AggregateDataPtr __restrict place, const IColumn** columns, size_t row_num,
+             Arena* arena) const override {
+        if (columns[0]->is_null_at(row_num)) {
+            return;
+        }
+
+        this->data(place).add(*columns[0], row_num);
+    }
+
+    void reset(AggregateDataPtr place) const override { this->data(place).reset(); }
+
+    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
+               Arena* arena) const override {
+        this->data(place).merge(this->data(rhs));
+    }
+
+    void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
+        this->data(place).write(buf);
+    }
+
+    void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
+                     Arena*) const override {
+        this->data(place).read(buf);
+    }
+
+    void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override {
+        const std::string bucket_json = this->data(place).get(_argument_type);
+        assert_cast<ColumnString&>(to).insert_data(bucket_json.c_str(), bucket_json.length());
+    }
+
+private:
+    DataTypePtr _argument_type;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp
index 8a28ee2f69..e49246035c 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp
@@ -55,6 +55,7 @@ void register_aggregate_function_orthogonal_bitmap(AggregateFunctionSimpleFactor
 void register_aggregate_function_collect_list(AggregateFunctionSimpleFactory& factory);
 void register_aggregate_function_sequence_match(AggregateFunctionSimpleFactory& factory);
 void register_aggregate_function_avg_weighted(AggregateFunctionSimpleFactory& factory);
+void register_aggregate_function_histogram(AggregateFunctionSimpleFactory& factory);
 
 AggregateFunctionSimpleFactory& AggregateFunctionSimpleFactory::instance() {
     static std::once_flag oc;
@@ -84,6 +85,7 @@
         register_aggregate_function_collect_list(instance);
         register_aggregate_function_sequence_match(instance);
         register_aggregate_function_avg_weighted(instance);
+        register_aggregate_function_histogram(instance);
 
         // if you only register function with no nullable, and wants to add nullable automatically, you should place function above this line
         register_aggregate_function_combinator_null(instance);
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index 5a45baca56..cb3bceb95d 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -275,6 +275,7 @@ endif()
 
 set(VEC_TEST_FILES
     vec/aggregate_functions/agg_collect_test.cpp
+    vec/aggregate_functions/agg_histogram_test.cpp
     vec/aggregate_functions/agg_test.cpp
     vec/aggregate_functions/agg_min_max_test.cpp
    vec/aggregate_functions/vec_window_funnel_test.cpp
diff --git a/be/test/vec/aggregate_functions/agg_histogram_test.cpp b/be/test/vec/aggregate_functions/agg_histogram_test.cpp
new file mode 100644
index 0000000000..d2d41d5f51
--- /dev/null
+++ b/be/test/vec/aggregate_functions/agg_histogram_test.cpp
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+
+#include "common/logging.h"
+#include "gtest/gtest.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/aggregate_functions/aggregate_function_histogram.h"
+#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
+#include "vec/columns/column_vector.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_date.h"
+#include "vec/data_types/data_type_date_time.h"
+#include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/data_types/data_type_string.h"
+
+namespace doris::vectorized {
+
+void register_aggregate_function_histogram(AggregateFunctionSimpleFactory& factory);
+
+class VAggHistogramTest : public testing::Test {
+public:
+    void SetUp() override {
+        AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance();
+        register_aggregate_function_histogram(factory);
+    }
+
+    void TearDown() override {}
+
+    template <typename DataType>
+    void agg_histogram_add_elements(AggregateFunctionPtr agg_function, AggregateDataPtr place,
+                                    size_t input_nums) {
+        using FieldType = typename DataType::FieldType;
+        auto type = std::make_shared<DataType>();
+        auto input_col = type->create_column();
+        for (size_t i = 0; i < input_nums; ++i) {
+            if constexpr (std::is_same_v<DataType, DataTypeString>) {
+                auto item = std::string("item") + std::to_string(i);
+                input_col->insert_data(item.c_str(), item.size());
+            } else {
+                auto item = FieldType(static_cast<uint64_t>(i));
+                input_col->insert_data(reinterpret_cast<const char*>(&item), 0);
+            }
+        }
+        EXPECT_EQ(input_col->size(), input_nums);
+
+        const IColumn* column[1] = {input_col.get()};
+        for (int i = 0; i < input_col->size(); i++) {
+            agg_function->add(place, column, i, &_agg_arena_pool);
+        }
+    }
+
+    template <typename DataType>
+    void test_agg_histogram(size_t input_nums = 0) {
+        DataTypes data_types = {(DataTypePtr)std::make_shared<DataType>()};
+        LOG(INFO) << "test_agg_histogram for type"
+                  << "(" << data_types[0]->get_name() << ")";
+
+        Array array;
+        AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance();
+        auto agg_function = factory.get("histogram", data_types, array);
+        EXPECT_NE(agg_function, nullptr);
+
+        std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]);
+        AggregateDataPtr place = memory.get();
+        agg_function->create(place);
+
+        agg_histogram_add_elements<DataType>(agg_function, place, input_nums);
+
+        ColumnString buf;
+        VectorBufferWriter buf_writer(buf);
+        agg_function->serialize(place, buf_writer);
+        buf_writer.commit();
+        VectorBufferReader buf_reader(buf.get_data_at(0));
+        agg_function->deserialize(place, buf_reader, &_agg_arena_pool);
+
+        std::unique_ptr<char[]> memory2(new char[agg_function->size_of_data()]);
+        AggregateDataPtr place2 = memory2.get();
+        agg_function->create(place2);
+
+        agg_histogram_add_elements<DataType>(agg_function, place2, input_nums);
+
+        agg_function->merge(place, place2, &_agg_arena_pool);
+        auto column_result = ColumnString::create();
+        agg_function->insert_result_into(place, *column_result);
+        EXPECT_EQ(column_result->size(), 1);
+        EXPECT_TRUE(column_result->get_offsets()[0] >= 1);
+
+        auto column_result2 = ColumnString::create();
+        agg_function->insert_result_into(place2, *column_result2);
+        EXPECT_EQ(column_result2->size(), 1);
+        EXPECT_TRUE(column_result2->get_offsets()[0] >= 1);
+
+        LOG(INFO) << column_result->get_offsets()[0];
+        LOG(INFO) << column_result2->get_offsets()[0];
+
+        agg_function->destroy(place);
+        agg_function->destroy(place2);
+    }
+
+private:
+    Arena _agg_arena_pool;
+};
+
+TEST_F(VAggHistogramTest, test_empty) {
+    test_agg_histogram<DataTypeInt8>();
+    test_agg_histogram<DataTypeInt16>();
+    test_agg_histogram<DataTypeInt32>();
+    test_agg_histogram<DataTypeInt64>();
+    test_agg_histogram<DataTypeFloat64>();
+
+    test_agg_histogram<DataTypeDecimal<Decimal128>>();
+    test_agg_histogram<DataTypeDate>();
+    test_agg_histogram<DataTypeDateTime>();
+}
+
+TEST_F(VAggHistogramTest, test_with_data) {
+    test_agg_histogram<DataTypeInt32>(8);
+    test_agg_histogram<DataTypeInt64>(10);
+
+    test_agg_histogram<DataTypeDecimal<Decimal128>>(12);
+    test_agg_histogram<DataTypeDate>(14);
+    test_agg_histogram<DataTypeString>(10);
+}
+
+} // namespace doris::vectorized
diff --git a/docs/en/docs/sql-manual/sql-functions/aggregate-functions/histogram.md b/docs/en/docs/sql-manual/sql-functions/aggregate-functions/histogram.md
new file mode 100644
index 0000000000..63c522c500
--- /dev/null
+++ b/docs/en/docs/sql-manual/sql-functions/aggregate-functions/histogram.md
@@ -0,0 +1,99 @@
+---
+{
+"title": "HISTOGRAM",
+"language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## HISTOGRAM
+### description
+#### Syntax
+
+`histogram(expr)`
+
+The histogram function is used to describe the distribution of the data. It uses an "equal height" bucketing strategy, dividing the data into buckets by value and describing each bucket with a few simple statistics, such as the number of values that fall into it. It is mainly used by the optimizer to estimate range queries.
+
+### notice
+
+```
+Only supported in vectorized engine
+```
+
+### example
+
+```
+MySQL [test]> select histogram(login_time) from dev_table;
++------------------------------------------------------------------------------------------------------------------------------+
+| histogram(`login_time`)                                                                                                        |
++------------------------------------------------------------------------------------------------------------------------------+
+| {"bucket_size":5,"buckets":[{"lower":"2022-09-21 17:30:29","upper":"2022-09-21 22:30:29","count":9,"pre_sum":0,"ndv":1},...]}|
++------------------------------------------------------------------------------------------------------------------------------+
+```
+Query result description:
+
+```
+{
+    "bucket_size": 5,
+    "buckets": [
+        {
+            "lower": "2022-09-21 17:30:29",
+            "upper": "2022-09-21 22:30:29",
+            "count": 9,
+            "pre_sum": 0,
+            "ndv": 1
+        },
+        {
+            "lower": "2022-09-22 17:30:29",
+            "upper": "2022-09-22 22:30:29",
+            "count": 10,
+            "pre_sum": 9,
+            "ndv": 1
+        },
+        {
+            "lower": "2022-09-23 17:30:29",
+            "upper": "2022-09-23 22:30:29",
+            "count": 9,
+            "pre_sum": 19,
+            "ndv": 1
+        },
+        {
+            "lower": "2022-09-24 17:30:29",
+            "upper": "2022-09-24 22:30:29",
+            "count": 9,
+            "pre_sum": 28,
+            "ndv": 1
+        },
+        {
+            "lower": "2022-09-25 17:30:29",
+            "upper": "2022-09-25 22:30:29",
+            "count": 9,
+            "pre_sum": 37,
+            "ndv": 1
+        }
+    ]
+}
+```
+
+
+### keywords
+
+HISTOGRAM
diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/aggregate-functions/histogram.md b/docs/zh-CN/docs/sql-manual/sql-functions/aggregate-functions/histogram.md
new file mode 100644
index 0000000000..dbd1cd8445
--- /dev/null
+++ b/docs/zh-CN/docs/sql-manual/sql-functions/aggregate-functions/histogram.md
@@ -0,0 +1,100 @@
+---
+{
+"title": "HISTOGRAM",
+"language": "zh-CN"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## HISTOGRAM
+### description
+#### Syntax
+
+`histogram(expr)`
+
+histogram(直方图)函数用于描述数据分布情况,它使用"等高"的分桶策略,按照数据的值大小进行分桶,并用一些简单的数据来描述每个桶,比如落在桶里的值的个数。主要用于优化器进行区间查询的估算。
+
+### notice
+
+```
+仅支持向量化引擎中使用
+```
+
+### example
+
+```
+MySQL [test]> select histogram(login_time) from dev_table;
++------------------------------------------------------------------------------------------------------------------------------+
+| histogram(`login_time`)                                                                                                        |
++------------------------------------------------------------------------------------------------------------------------------+
+| {"bucket_size":5,"buckets":[{"lower":"2022-09-21 17:30:29","upper":"2022-09-21 22:30:29","count":9,"pre_sum":0,"ndv":1},...]}|
++------------------------------------------------------------------------------------------------------------------------------+
+```
+查询结果说明:
+
+```
+{
+    "bucket_size": 5,
+    "buckets": [
+        {
+            "lower": "2022-09-21 17:30:29",
+            "upper": "2022-09-21 22:30:29",
+            "count": 9,
+            "pre_sum": 0,
+            "ndv": 1
+        },
+        {
+            "lower": "2022-09-22 17:30:29",
+            "upper": "2022-09-22 22:30:29",
+            "count": 10,
+            "pre_sum": 9,
+            "ndv": 1
+        },
+        {
+            "lower": "2022-09-23 17:30:29",
+            "upper": "2022-09-23 22:30:29",
+            "count": 9,
+            "pre_sum": 19,
+            "ndv": 1
+        },
+        {
+            "lower": "2022-09-24 17:30:29",
+            "upper": "2022-09-24 22:30:29",
+            "count": 9,
+            "pre_sum": 28,
+            "ndv": 1
+        },
+        {
+            "lower": "2022-09-25 17:30:29",
+            "upper": "2022-09-25 22:30:29",
+            "count": 9,
+            "pre_sum": 37,
+            "ndv": 1
+        }
+    ]
+}
+```
+
+### keywords
+
+HISTOGRAM
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java
index 440a6acca9..1af3dffff9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java
@@ -62,6 +62,7 @@ public class AlterColumnStatsStmt extends DdlStmt {
             .add(ColumnStatistic.NUM_NULLS)
             .add(ColumnStatistic.MIN_VALUE)
             .add(ColumnStatistic.MAX_VALUE)
+            .add(ColumnStatistic.HISTOGRAM)
             .add(StatsType.DATA_SIZE)
             .build();
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
index 532635cf36..08da0d9f57 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
@@ -928,6 +928,7 @@ public class FunctionSet<T> {
     public static final String TO_QUANTILE_STATE = "to_quantile_state";
     public static final String COLLECT_LIST = "collect_list";
     public static final String COLLECT_SET = "collect_set";
+    public static final String HISTOGRAM = "histogram";
 
     private static final Map<String, String> ORTHOGONAL_BITMAP_INTERSECT_INIT_SYMBOL =
             ImmutableMap.<String, String>builder()
@@ -2601,6 +2602,8 @@
                     .createBuiltin("topn_weighted", Lists.newArrayList(t, Type.BIGINT, Type.INT, Type.INT),
                             new ArrayType(t), t, "", "", "", "", "", true, false, true, true));
+            addBuiltin(AggregateFunction.createBuiltin(HISTOGRAM, Lists.newArrayList(t), Type.VARCHAR, t,
+                    "", "", "", "", "", true, false, true, true));
         }
 
         // Avg
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
index 7e96b5943f..eccadac0f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
@@ -115,6 +115,7 @@ public class InternalSchemaInitializer extends Thread {
         columnDefs.add(new ColumnDef("null_count", TypeDef.create(PrimitiveType.BIGINT)));
         columnDefs.add(new ColumnDef("min", TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH)));
         columnDefs.add(new ColumnDef("max", TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH)));
+        columnDefs.add(new ColumnDef("histogram",
TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH)));
         columnDefs.add(new ColumnDef("data_size_in_bytes", TypeDef.create(PrimitiveType.BIGINT)));
         columnDefs.add(new ColumnDef("update_time", TypeDef.create(PrimitiveType.DATETIME)));
         String engineName = "olap";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
index ec73d77592..21bfebf78f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
@@ -410,6 +410,7 @@ public class StatsCalculator extends DefaultPlanVisitor<StatsDeriveResult, Void>
                     stats.dataSize < 0 ? stats.dataSize : stats.dataSize * groupingSetNum,
                     stats.minValue,
                     stats.maxValue,
+                    stats.histogram,
                     stats.selectivity,
                     stats.minExpr,
                     stats.maxExpr,
@@ -500,6 +501,7 @@
                 leftStats.dataSize + rightStats.dataSize,
                 Math.min(leftStats.minValue, rightStats.minValue),
                 Math.max(leftStats.maxValue, rightStats.maxValue),
+                null,
                 1.0 / (leftStats.ndv + rightStats.ndv),
                 leftStats.minExpr,
                 leftStats.maxExpr,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Numbers.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Numbers.java
index fc94a3dc09..0aa0fb93a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Numbers.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Numbers.java
@@ -71,7 +71,7 @@ public class Numbers extends TableValuedFunction {
             Map<Id, ColumnStatistic> columnToStatistics = Maps.newHashMap();
             ColumnStatistic columnStat = new ColumnStatistic(rowNum, rowNum, 8, 0, 8, 0, rowNum - 1,
-                    1.0 / rowNum, new IntLiteral(0, Type.BIGINT), new IntLiteral(rowNum - 1, Type.BIGINT), false);
+                    null, 1.0 / rowNum, new IntLiteral(0, Type.BIGINT), new IntLiteral(rowNum - 1, Type.BIGINT), false);
             columnToStatistics.put(slots.get(0).getExprId(), columnStat);
             return new StatsDeriveResult(rowNum, columnToStatistics);
         } catch (Exception t) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/Bucket.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/Bucket.java
new file mode 100644
index 0000000000..adcbd27d73
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/Bucket.java
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.analysis.LiteralExpr;
+
+public class Bucket {
+    public LiteralExpr lower;
+    public LiteralExpr upper;
+    public int count;
+    public int preSum;
+    public int ndv;
+
+    public LiteralExpr getLower() {
+        return lower;
+    }
+
+    public void setLower(LiteralExpr lower) {
+        this.lower = lower;
+    }
+
+    public LiteralExpr getUpper() {
+        return upper;
+    }
+
+    public void setUpper(LiteralExpr upper) {
+        this.upper = upper;
+    }
+
+    public int getCount() {
+        return count;
+    }
+
+    public void setCount(int count) {
+        this.count = count;
+    }
+
+    public int getPreSum() {
+        return preSum;
+    }
+
+    public void setPreSum(int preSum) {
+        this.preSum = preSum;
+    }
+
+    public int getNdv() {
+        return ndv;
+    }
+
+    public void setNdv(int ndv) {
+        this.ndv = ndv;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
index a0530a7f59..3ca6de9f08 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
@@ -43,12 +43,13 @@ public class ColumnStatistic {
     public static final StatsType NUM_NULLS = StatsType.NUM_NULLS;
     public static final StatsType MIN_VALUE = StatsType.MIN_VALUE;
     public static final StatsType MAX_VALUE = StatsType.MAX_VALUE;
+    public static final StatsType HISTOGRAM = StatsType.HISTOGRAM;
 
     private static final Logger LOG = LogManager.getLogger(StmtExecutor.class);
 
     public static ColumnStatistic DEFAULT = new ColumnStatisticBuilder().setAvgSizeByte(1).setNdv(1)
             .setNumNulls(1).setCount(1).setMaxValue(Double.MAX_VALUE).setMinValue(Double.MIN_VALUE)
-            .setSelectivity(1.0).setIsUnknown(true)
+            .setHistogram(Histogram.defaultHistogram()).setSelectivity(1.0).setIsUnknown(true)
             .build();
 
     public static final Set<Type> MAX_MIN_UNSUPPORTED_TYPE = new HashSet<>();
@@ -68,6 +69,7 @@ public class ColumnStatistic {
     public final double avgSizeByte;
     public final double minValue;
     public final double maxValue;
+    public final Histogram histogram;
     public final boolean isUnKnown;
 
     /*
     selectivity of Column T1.A:
@@ -89,7 +91,7 @@
     public ColumnStatistic(double count, double ndv, double avgSizeByte,
             double numNulls, double dataSize, double minValue, double maxValue,
-            double selectivity, LiteralExpr minExpr,
+            Histogram histogram, double selectivity, LiteralExpr minExpr,
             LiteralExpr maxExpr, boolean isUnKnown) {
         this.count = count;
         this.ndv = ndv;
@@ -98,6 +100,7 @@
         this.dataSize = dataSize;
         this.minValue = minValue;
         this.maxValue = maxValue;
+        this.histogram = histogram;
         this.selectivity = selectivity;
         this.minExpr = minExpr;
         this.maxExpr = maxExpr;
@@ -134,8 +137,10 @@
         }
         String min = resultRow.getColumnValue("min");
         String max = resultRow.getColumnValue("max");
+        String histogram = resultRow.getColumnValue("histogram");
         columnStatisticBuilder.setMinValue(StatisticsUtil.convertToDouble(col.getType(), min));
         columnStatisticBuilder.setMaxValue(StatisticsUtil.convertToDouble(col.getType(), max));
+        columnStatisticBuilder.setHistogram(Histogram.deserializeFromJson(col.getType(), histogram));
         columnStatisticBuilder.setMaxExpr(StatisticsUtil.readableValue(col.getType(), max));
         columnStatisticBuilder.setMinExpr(StatisticsUtil.readableValue(col.getType(), min));
         columnStatisticBuilder.setSelectivity(1.0);
@@ -154,7 +159,7 @@ public
ColumnStatistic copy() { return new ColumnStatisticBuilder().setCount(count).setNdv(ndv).setAvgSizeByte(avgSizeByte) .setNumNulls(numNulls).setDataSize(dataSize).setMinValue(minValue) - .setMaxValue(maxValue).setMinExpr(minExpr).setMaxExpr(maxExpr) + .setMaxValue(maxValue).setHistogram(histogram).setMinExpr(minExpr).setMaxExpr(maxExpr) .setSelectivity(selectivity).setIsUnknown(isUnKnown).build(); } @@ -178,6 +183,7 @@ public class ColumnStatistic { .setDataSize(Math.ceil(dataSize * ratio)) .setMinValue(minValue) .setMaxValue(maxValue) + .setHistogram(histogram) .setMinExpr(minExpr) .setMaxExpr(maxExpr) .setSelectivity(newSelectivity) diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticBuilder.java index bb4535a7de..4714b3a1bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticBuilder.java @@ -27,6 +27,7 @@ public class ColumnStatisticBuilder { private double dataSize; private double minValue; private double maxValue; + private Histogram histogram; private double selectivity = 1.0; private LiteralExpr minExpr; private LiteralExpr maxExpr; @@ -44,6 +45,7 @@ public class ColumnStatisticBuilder { this.dataSize = columnStatistic.dataSize; this.minValue = columnStatistic.minValue; this.maxValue = columnStatistic.maxValue; + this.histogram = columnStatistic.histogram; this.selectivity = columnStatistic.selectivity; this.minExpr = columnStatistic.minExpr; this.maxExpr = columnStatistic.maxExpr; @@ -85,6 +87,11 @@ public class ColumnStatisticBuilder { return this; } + public ColumnStatisticBuilder setHistogram(Histogram histogram) { + this.histogram = histogram; + return this; + } + public ColumnStatisticBuilder setSelectivity(double selectivity) { this.selectivity = selectivity; return this; @@ -133,6 +140,10 @@ public class ColumnStatisticBuilder { return maxValue; } + public Histogram getHistogram() { + return histogram; + } + public double getSelectivity() { return selectivity; } @@ -150,7 +161,7 @@ public class ColumnStatisticBuilder { } public ColumnStatistic build() { - return new ColumnStatistic(count, ndv, avgSizeByte, numNulls, dataSize, minValue, maxValue, selectivity, - minExpr, maxExpr, isUnknown); + return new ColumnStatistic(count, ndv, avgSizeByte, numNulls, + dataSize, minValue, maxValue, histogram, selectivity, minExpr, maxExpr, isUnknown); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/Histogram.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/Histogram.java new file mode 100644 index 0000000000..0516c1fabf --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/Histogram.java @@ -0,0 +1,180 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
+import com.alibaba.fastjson2.JSONObject;
+import com.google.common.collect.Lists;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.parquet.Strings;
+
+import java.util.List;
+
+public class Histogram {
+    private static final Logger LOG = LogManager.getLogger(Histogram.class);
+
+    private Type dataType;
+
+    private int maxBucketSize;
+    private int bucketSize;
+    private float sampleRate;
+
+    private List<Bucket> buckets;
+
+    public Histogram(Type dataType) {
+        this.dataType = dataType;
+    }
+
+    public Type getDataType() {
+        return dataType;
+    }
+
+    public void setDataType(Type dataType) {
+        this.dataType = dataType;
+    }
+
+    public int getMaxBucketSize() {
+        return maxBucketSize;
+    }
+
+    public void setMaxBucketSize(int maxBucketSize) {
+        this.maxBucketSize = maxBucketSize;
+    }
+
+    public int getBucketSize() {
+        return bucketSize;
+    }
+
+    public void setBucketSize(int bucketSize) {
+        this.bucketSize = bucketSize;
+    }
+
+    public float getSampleRate() {
+        return sampleRate;
+    }
+
+    public void setSampleRate(float sampleRate) {
+        if (sampleRate < 0f || sampleRate > 1f) {
+            this.sampleRate = 1f;
+        } else {
+            this.sampleRate = sampleRate;
+        }
+    }
+
+    public void setBuckets(List<Bucket> buckets) {
+        this.buckets = buckets;
+    }
+
+    public List<Bucket> getBuckets() {
+        return buckets;
+    }
+
+    public static Histogram defaultHistogram() {
+        Type type = Type.fromPrimitiveType(PrimitiveType.INVALID_TYPE);
+        List<Bucket> buckets = Lists.newArrayList();
+        Histogram histogram = new Histogram(type);
+        histogram.setMaxBucketSize(0);
+        histogram.setBucketSize(0);
+        histogram.setSampleRate(1.0f);
+        histogram.setBuckets(buckets);
+        return histogram;
+    }
+
+    /**
+     * Histogram info is stored in an internal table in json format,
+     * and Histogram obj can be obtained by this method.
+ */ + public static Histogram deserializeFromJson(Type datatype, String json) { + if (Strings.isNullOrEmpty(json)) { + return null; + } + + try { + Histogram histogram = new Histogram(datatype); + JSONObject histogramJson = JSON.parseObject(json); + + List buckets = Lists.newArrayList(); + JSONArray jsonArray = histogramJson.getJSONArray("buckets"); + + for (int i = 0; i < jsonArray.size(); i++) { + JSONObject bucketJson = jsonArray.getJSONObject(i); + Bucket bucket = new Bucket(); + bucket.lower = StatisticsUtil.readableValue(datatype, bucketJson.get("lower").toString()); + bucket.upper = StatisticsUtil.readableValue(datatype, bucketJson.get("upper").toString()); + bucket.count = bucketJson.getIntValue("count"); + bucket.preSum = bucketJson.getIntValue("pre_sum"); + bucket.ndv = bucketJson.getIntValue("ndv"); + buckets.add(bucket); + } + + histogram.setBuckets(buckets); + + int maxBucketSize = histogramJson.getIntValue("max_bucket_size"); + histogram.setMaxBucketSize(maxBucketSize); + + int bucketSize = histogramJson.getIntValue("bucket_size"); + histogram.setBucketSize(bucketSize); + + float sampleRate = histogramJson.getFloatValue("sample_rate"); + histogram.setSampleRate(sampleRate); + + return histogram; + } catch (Throwable e) { + LOG.warn("deserialize from json error, input json string: {}", json, e); + } + + return null; + } + + /** + * Convert to json format string + */ + public static String serializeToJson(Histogram histogram) { + if (histogram == null) { + return ""; + } + + JSONObject histogramJson = new JSONObject(); + histogramJson.put("max_bucket_size", histogram.maxBucketSize); + histogramJson.put("bucket_size", histogram.bucketSize); + histogramJson.put("sample_rate", histogram.sampleRate); + + JSONArray bucketsJsonArray = new JSONArray(); + histogramJson.put("buckets", bucketsJsonArray); + + if (histogram.buckets != null) { + for (Bucket bucket : histogram.buckets) { + JSONObject bucketJson = new JSONObject(); + bucketJson.put("count", bucket.count); + bucketJson.put("pre_sum", bucket.preSum); + bucketJson.put("ndv", bucket.ndv); + bucketJson.put("upper", bucket.upper.getStringValue()); + bucketJson.put("lower", bucket.lower.getStringValue()); + bucketsJsonArray.add(bucketJson); + } + } + + return histogramJson.toJSONString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java index 4ad00bdc75..af2103f13a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java @@ -58,12 +58,12 @@ public class HiveAnalysisTask extends HMSAnalysisTask { private static final String ANALYZE_PARTITION_SQL_TEMPLATE = "INSERT INTO " + "${internalDB}.${columnStatTbl}" + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '${colId}', '${partId}', " - + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, '${update_time}')"; + + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', NULL, ${dataSize}, '${update_time}')"; private static final String ANALYZE_TABLE_SQL_TEMPLATE = "INSERT INTO " + "${internalDB}.${columnStatTbl}" - + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', 'NULL', '${colId}', NULL, " - + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, '${update_time}')"; + + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', NULL, '${colId}', NULL, " + + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', NULL, 
${dataSize}, '${update_time}')"; @Override protected void getColumnStatsByMeta() throws Exception { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java index fcb6abf457..4dfc68a946 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java @@ -52,7 +52,7 @@ public class IcebergAnalysisTask extends HMSAnalysisTask { private static final String INSERT_TABLE_SQL_TEMPLATE = "INSERT INTO " + "${internalDB}.${columnStatTbl}" + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '${colId}', NULL, " - + "${numRows}, 0, ${nulls}, '0', '0', ${dataSize}, '${update_time}')"; + + "${numRows}, 0, ${nulls}, '0', '0', NULL, ${dataSize}, '${update_time}')"; @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java index 78693bbaef..7901c1c918 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java @@ -74,7 +74,8 @@ public class StatisticsRepository { private static final String INSERT_INTO_COLUMN_STATISTICS = "INSERT INTO " + FULL_QUALIFIED_COLUMN_STATISTICS_NAME + " VALUES('${id}', ${catalogId}, ${dbId}, ${tblId}, '${idxId}'," - + "'${colId}', ${partId}, ${count}, ${ndv}, ${nullCount}, '${min}', '${max}', ${dataSize}, NOW())"; + + "'${colId}', ${partId}, ${count}, ${ndv}, ${nullCount}, '${min}', '${max}', " + + "'${histogram}', ${dataSize}, NOW())"; public static ColumnStatistic queryColumnStatisticsByName(long tableId, String colName) { ResultRow resultRow = queryColumnStatisticById(tableId, colName); @@ -158,6 +159,7 @@ public class StatisticsRepository { String nullCount = alterColumnStatsStmt.getValue(StatsType.NUM_NULLS); String min = alterColumnStatsStmt.getValue(StatsType.MIN_VALUE); String max = alterColumnStatsStmt.getValue(StatsType.MAX_VALUE); + String histogram = alterColumnStatsStmt.getValue(StatsType.HISTOGRAM); String dataSize = alterColumnStatsStmt.getValue(StatsType.DATA_SIZE); ColumnStatisticBuilder builder = new ColumnStatisticBuilder(); String colName = alterColumnStatsStmt.getColumnName(); @@ -179,6 +181,9 @@ public class StatisticsRepository { builder.setMaxExpr(StatisticsUtil.readableValue(column.getType(), max)); builder.setMaxValue(StatisticsUtil.convertToDouble(column.getType(), max)); } + if (histogram != null) { + builder.setHistogram(Histogram.deserializeFromJson(column.getType(), histogram)); + } if (dataSize != null) { builder.setDataSize(Double.parseDouble(dataSize)); } @@ -196,6 +201,7 @@ public class StatisticsRepository { params.put("nullCount", String.valueOf(columnStatistic.numNulls)); params.put("min", min == null ? "NULL" : min); params.put("max", max == null ? "NULL" : max); + params.put("histogram", (columnStatistic.histogram == null) ? 
"NULL" : histogram); params.put("dataSize", String.valueOf(columnStatistic.dataSize)); StatisticsUtil.execUpdate(INSERT_INTO_COLUMN_STATISTICS, params); Env.getCurrentEnv().getStatisticsCache().updateCache(objects.table.getId(), -1, colName, builder.build()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java index a7fd1d6cf1..d59f85285a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java @@ -26,6 +26,7 @@ public enum StatsType { NUM_NULLS("num_nulls"), MIN_VALUE("min_value"), MAX_VALUE("max_value"), + HISTOGRAM("histogram"), // only for test UNKNOWN("unknown"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/HyperGraphBuilder.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/HyperGraphBuilder.java index 49e80acccf..99e6a24338 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/HyperGraphBuilder.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/HyperGraphBuilder.java @@ -191,7 +191,7 @@ public class HyperGraphBuilder { int count = rowCounts.get(Integer.parseInt(scanPlan.getTable().getName())); for (Slot slot : scanPlan.getOutput()) { slotIdToColumnStats.put(slot.getExprId(), - new ColumnStatistic(count, count, 0, 0, 0, 0, 0, 0, null, null, true)); + new ColumnStatistic(count, count, 0, 0, 0, 0, 0, null, 0, null, null, true)); } StatsDeriveResult stats = new StatsDeriveResult(count, slotIdToColumnStats); group.setStatistics(stats); diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java index c61b692870..96ae8fae9f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java @@ -91,6 +91,7 @@ public class CacheTest extends TestWithFeService { colNames.add("col_id"); colNames.add("min"); colNames.add("max"); + colNames.add("histogram"); List primitiveTypes = new ArrayList<>(); primitiveTypes.add(PrimitiveType.BIGINT); primitiveTypes.add(PrimitiveType.BIGINT); @@ -114,6 +115,7 @@ public class CacheTest extends TestWithFeService { values.add("8"); values.add("9"); values.add("10"); + values.add(""); ResultRow resultRow = new ResultRow(colNames, primitiveTypes, values); return Arrays.asList(resultRow); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/HistogramTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/HistogramTest.java new file mode 100644 index 0000000000..f575872c23 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/HistogramTest.java @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
+import com.alibaba.fastjson2.JSONObject;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Objects;
+
+class HistogramTest {
+    private final Type datatype = Type.fromPrimitiveType(PrimitiveType.DATETIME);
+    private Histogram histogramUnderTest;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        String json = "{\"max_bucket_size\":128,\"bucket_size\":5,\"sample_rate\":1.0,\"buckets\":"
+                + "[{\"lower\":\"2022-09-21 17:30:29\",\"upper\":\"2022-09-21 22:30:29\","
+                + "\"count\":9,\"pre_sum\":0,\"ndv\":1},"
+                + "{\"lower\":\"2022-09-22 17:30:29\",\"upper\":\"2022-09-22 22:30:29\","
+                + "\"count\":10,\"pre_sum\":9,\"ndv\":1},"
+                + "{\"lower\":\"2022-09-23 17:30:29\",\"upper\":\"2022-09-23 22:30:29\","
+                + "\"count\":9,\"pre_sum\":19,\"ndv\":1},"
+                + "{\"lower\":\"2022-09-24 17:30:29\",\"upper\":\"2022-09-24 22:30:29\","
+                + "\"count\":9,\"pre_sum\":28,\"ndv\":1},"
+                + "{\"lower\":\"2022-09-25 17:30:29\",\"upper\":\"2022-09-25 22:30:29\","
+                + "\"count\":9,\"pre_sum\":37,\"ndv\":1}]}";
+        histogramUnderTest = Histogram.deserializeFromJson(datatype, json);
+        if (histogramUnderTest == null) {
+            Assertions.fail();
+        }
+    }
+
+    @Test
+    void testDeserializeFromJson() throws Exception {
+        Type dataType = histogramUnderTest.getDataType();
+        Assertions.assertTrue(dataType.isDatetime());
+
+        int maxBucketSize = histogramUnderTest.getMaxBucketSize();
+        Assertions.assertEquals(128, maxBucketSize);
+
+        int bucketSize = histogramUnderTest.getBucketSize();
+        Assertions.assertEquals(5, bucketSize);
+
+        float sampleRate = histogramUnderTest.getSampleRate();
+        Assertions.assertEquals(1.0, sampleRate);
+
+        List<Bucket> buckets = histogramUnderTest.getBuckets();
+        Assertions.assertEquals(5, buckets.size());
+
+        LiteralExpr expectedLower = LiteralExpr.create("2022-09-21 17:30:29",
+                Objects.requireNonNull(Type.fromPrimitiveType(PrimitiveType.DATETIME)));
+        LiteralExpr expectedUpper = LiteralExpr.create("2022-09-21 22:30:29",
+                Objects.requireNonNull(Type.fromPrimitiveType(PrimitiveType.DATETIME)));
+
+        boolean flag = false;
+
+        for (Bucket bucket : buckets) {
+            LiteralExpr lower = bucket.getLower();
+            LiteralExpr upper = bucket.getUpper();
+            if (expectedLower.equals(lower) && expectedUpper.equals(upper)) {
+                flag = true;
+                break;
+            }
+        }
+
+        Assertions.assertTrue(flag);
+    }
+
+    @Test
+    void testSerializeToJson() throws AnalysisException {
+        String json = Histogram.serializeToJson(histogramUnderTest);
+        JSONObject histogramJson = JSON.parseObject(json);
+
+        int maxBucketSize = histogramJson.getIntValue("max_bucket_size");
+        Assertions.assertEquals(128, maxBucketSize);
+
+        int bucketSize = histogramJson.getIntValue("bucket_size");
+        Assertions.assertEquals(5, bucketSize);
+
+        float sampleRate = histogramJson.getFloat("sample_rate");
+        Assertions.assertEquals(1.0, sampleRate);
+
+        JSONArray jsonArray = histogramJson.getJSONArray("buckets");
+        Assertions.assertEquals(5, jsonArray.size());
+
+        // test first bucket
+        LiteralExpr expectedLower =
LiteralExpr.create("2022-09-21 17:30:29", + Objects.requireNonNull(Type.fromPrimitiveType(PrimitiveType.DATETIME))); + LiteralExpr expectedUpper = LiteralExpr.create("2022-09-21 22:30:29", + Objects.requireNonNull(Type.fromPrimitiveType(PrimitiveType.DATETIME))); + + boolean flag = false; + + for (int i = 0; i < jsonArray.size(); i++) { + JSONObject bucketJson = jsonArray.getJSONObject(i); + assert datatype != null; + LiteralExpr lower = StatisticsUtil.readableValue(datatype, bucketJson.get("lower").toString()); + LiteralExpr upper = StatisticsUtil.readableValue(datatype, bucketJson.get("upper").toString()); + int count = bucketJson.getIntValue("count"); + int preSum = bucketJson.getIntValue("pre_sum"); + int ndv = bucketJson.getIntValue("ndv"); + if (expectedLower.equals(lower) && expectedUpper.equals(upper) && count == 9 && preSum == 0 && ndv == 1) { + flag = true; + break; + } + } + + Assertions.assertTrue(flag); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatsDeriveResultTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatsDeriveResultTest.java index 0f2cebf511..df4f48858a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatsDeriveResultTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatsDeriveResultTest.java @@ -27,7 +27,7 @@ public class StatsDeriveResultTest { public void testUpdateRowCountByLimit() { StatsDeriveResult stats = new StatsDeriveResult(100); ColumnStatistic a = new ColumnStatistic(100, 10, 1, 5, 10, - 1, 100, 0.5, null, null, false); + 1, 100, null, 0.5, null, null, false); Id id = new Id(1); stats.addColumnStats(id, a); StatsDeriveResult res = stats.updateByLimit(0);
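As a usage reference, here is a minimal sketch of round-tripping the FE-side `Histogram` JSON added by this patch, mirroring `HistogramTest` above. The single-bucket JSON literal and the `HistogramRoundTrip` wrapper class are illustrative assumptions; `deserializeFromJson` returning `null` on bad input matches the implementation.

```java
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Type;
import org.apache.doris.statistics.Histogram;

public class HistogramRoundTrip {
    public static void main(String[] args) {
        Type type = Type.fromPrimitiveType(PrimitiveType.DATETIME);
        // One-bucket histogram in the on-disk JSON format (illustrative literal).
        String json = "{\"max_bucket_size\":128,\"bucket_size\":1,\"sample_rate\":1.0,"
                + "\"buckets\":[{\"lower\":\"2022-09-21 17:30:29\","
                + "\"upper\":\"2022-09-21 22:30:29\",\"count\":9,\"pre_sum\":0,\"ndv\":1}]}";

        Histogram h = Histogram.deserializeFromJson(type, json); // null on parse failure
        if (h != null) {
            System.out.println(h.getBucketSize());            // 1
            System.out.println(Histogram.serializeToJson(h)); // JSON stored in the stats table
        }
    }
}
```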