Files
doris/be/src/vec/columns/column_object.h
github-actions[bot] 37c4de3cbf branch-2.1: [improve](variant) only sanitize in debug mode #45689 (#45698)
Cherry-picked from #45689

Co-authored-by: lihangyu <lihangyu@selectdb.com>
2024-12-20 14:25:49 +08:00

618 lines
22 KiB
C++

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Columns/ColumnObject.h
// and modified by Doris
#pragma once
#include <glog/logging.h>
#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>
#include <sys/types.h>
#include <algorithm>
#include <memory>
#include <ostream>
#include <string>
#include <string_view>
#include <unordered_set>
#include <utility>
#include <vector>
#include "common/status.h"
#include "olap/tablet_schema.h"
#include "util/jsonb_document.h"
#include "vec/columns/column.h"
#include "vec/columns/subcolumn_tree.h"
#include "vec/common/cow.h"
#include "vec/common/string_ref.h"
#include "vec/core/field.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_jsonb.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/serde/data_type_serde.h"
#include "vec/io/reader_buffer.h"
#include "vec/json/path_in_data.h"
class SipHash;
namespace doris {
namespace vectorized {
class Arena;
} // namespace vectorized
} // namespace doris
namespace doris::vectorized {
/// Info that represents a scalar or array field in a decomposed view.
/// It allows to recreate field with different number
/// of dimensions or nullability.
struct FieldInfo {
/// The common type id of of all scalars in field.
TypeIndex scalar_type_id;
/// Do we have NULL scalar in field.
bool have_nulls;
/// If true then we have scalars with different types in array and
/// we need to convert scalars to the common type.
bool need_convert;
/// Number of dimension in array. 0 if field is scalar.
size_t num_dimensions;
};
void get_field_info(const Field& field, FieldInfo* info);
/** A column that represents object with dynamic set of subcolumns.
* Subcolumns are identified by paths in document and are stored in
* a trie-like structure. ColumnObject is not suitable for writing into tables
* and it should be converted to Tuple with fixed set of subcolumns before that.
*/
class ColumnObject final : public COWHelper<IColumn, ColumnObject> {
public:
/** Class that represents one subcolumn.
* It stores values in several parts of column
* and keeps current common type of all parts.
* We add a new column part with a new type, when we insert a field,
* which can't be converted to the current common type.
* After insertion of all values subcolumn should be finalized
* for writing and other operations.
*/
// Using jsonb type as most common type, since it's adopted all types of json
using MostCommonType = DataTypeJsonb;
constexpr static TypeIndex MOST_COMMON_TYPE_ID = TypeIndex::JSONB;
class Subcolumn {
public:
Subcolumn() = default;
Subcolumn(size_t size_, bool is_nullable_, bool is_root = false);
Subcolumn(MutableColumnPtr&& data_, DataTypePtr type, bool is_nullable_,
bool is_root = false);
size_t size() const;
size_t byteSize() const;
size_t allocatedBytes() const;
bool is_finalized() const;
const DataTypePtr& get_least_common_type() const { return least_common_type.get(); }
const DataTypePtr& get_least_common_typeBase() const {
return least_common_type.get_base();
}
const DataTypeSerDeSPtr& get_least_common_type_serde() const {
return least_common_type.get_serde();
}
size_t get_dimensions() const { return least_common_type.get_dimensions(); }
void get(size_t n, Field& res) const;
/// Checks the consistency of column's parts stored in @data.
void checkTypes() const;
/// Inserts a field, which scalars can be arbitrary, but number of
/// dimensions should be consistent with current common type.
/// throws InvalidArgument when meet conflict types
void insert(Field field);
void insert(Field field, FieldInfo info);
void insertDefault();
void insertManyDefaults(size_t length);
void insertRangeFrom(const Subcolumn& src, size_t start, size_t length);
void pop_back(size_t n);
/// Converts all column's parts to the common type and
/// creates a single column that stores all values.
void finalize();
bool check_if_sparse_column(size_t num_rows);
/// Returns single column if subcolumn in finalizes.
/// Otherwise -- undefined behaviour.
IColumn& get_finalized_column();
const IColumn& get_finalized_column() const;
const ColumnPtr& get_finalized_column_ptr() const;
ColumnPtr& get_finalized_column_ptr();
void remove_nullable();
void add_new_column_part(DataTypePtr type);
friend class ColumnObject;
private:
class LeastCommonType {
public:
LeastCommonType() = default;
explicit LeastCommonType(DataTypePtr type_);
const DataTypePtr& get() const { return type; }
const DataTypePtr& get_base() const { return base_type; }
const TypeIndex& get_type_id() const { return type_id; }
const TypeIndex& get_base_type_id() const { return base_type_id; }
size_t get_dimensions() const { return num_dimensions; }
void remove_nullable() { type = doris::vectorized::remove_nullable(type); }
const DataTypeSerDeSPtr& get_serde() const { return least_common_type_serder; }
private:
DataTypePtr type;
DataTypePtr base_type;
TypeIndex type_id;
TypeIndex base_type_id;
size_t num_dimensions = 0;
DataTypeSerDeSPtr least_common_type_serder;
};
void wrapp_array_nullable();
/// Current least common type of all values inserted to this subcolumn.
LeastCommonType least_common_type;
/// If true then common type type of subcolumn is Nullable
/// and default values are NULLs.
bool is_nullable = false;
/// Parts of column. Parts should be in increasing order in terms of subtypes/supertypes.
/// That means that the least common type for i-th prefix is the type of i-th part
/// and it's the supertype for all type of column from 0 to i-1.
std::vector<WrappedPtr> data;
std::vector<DataTypePtr> data_types;
/// Until we insert any non-default field we don't know further
/// least common type and we count number of defaults in prefix,
/// which will be converted to the default type of final common type.
size_t num_of_defaults_in_prefix = 0;
// If it is the root subcolumn of SubcolumnsTree,
// the root Node should be JSONB type when finalize
bool is_root = false;
};
using Subcolumns = SubcolumnsTree<Subcolumn>;
private:
/// If true then all subcolumns are nullable.
const bool is_nullable;
Subcolumns subcolumns;
size_t num_rows;
// sparse columns will be merge and encoded into root column
Subcolumns sparse_columns;
// The rapidjson document format of Subcolumns tree structure
// the leaves is null.In order to display whole document, copy
// this structure and fill with Subcolumns sub items
mutable std::shared_ptr<rapidjson::Document> doc_structure;
using SubColumnWithName = std::pair<PathInData, const Subcolumn*>;
// Cached search results for previous row (keyed as index in JSON object) - used as a hint.
mutable std::vector<SubColumnWithName> _prev_positions;
public:
static constexpr auto COLUMN_NAME_DUMMY = "_dummy";
explicit ColumnObject(bool is_nullable_, bool create_root = true);
explicit ColumnObject(bool is_nullable_, DataTypePtr type, MutableColumnPtr&& column);
ColumnObject(Subcolumns&& subcolumns_, bool is_nullable_);
~ColumnObject() override = default;
/// Checks that all subcolumns have consistent sizes.
void check_consistency() const;
MutableColumnPtr get_root() {
if (subcolumns.empty() || is_nothing(subcolumns.get_root()->data.get_least_common_type())) {
return nullptr;
}
return subcolumns.get_mutable_root()->data.get_finalized_column_ptr()->assume_mutable();
}
Status serialize_one_row_to_string(int row, std::string* output) const;
Status serialize_one_row_to_string(int row, BufferWritable& output) const;
// serialize one row to json format
Status serialize_one_row_to_json_format(int row, rapidjson::StringBuffer* output,
bool* is_null) const;
// merge multiple sub sparse columns into root
Status merge_sparse_to_root_column();
// ensure root node is a certain type
void ensure_root_node_type(const DataTypePtr& type);
// create jsonb root if missing
void create_root();
// create root with type and column if missing
void create_root(const DataTypePtr& type, MutableColumnPtr&& column);
DataTypePtr get_most_common_type() const;
// root is null or type nothing
bool is_null_root() const;
// Only single scalar root column
bool is_scalar_variant() const;
bool is_exclusive() const override;
ColumnPtr get_root() const { return subcolumns.get_root()->data.get_finalized_column_ptr(); }
bool has_subcolumn(const PathInData& key) const;
DataTypePtr get_root_type() const;
// return null if not found
const Subcolumn* get_subcolumn(const PathInData& key) const;
// return null if not found
const Subcolumn* get_subcolumn(const PathInData& key, size_t index_hint) const;
// return null if not found
Subcolumn* get_subcolumn(const PathInData& key);
// return null if not found
Subcolumn* get_subcolumn(const PathInData& key, size_t index_hint);
// return null if not found
const Subcolumn* get_subcolumn_with_cache(const PathInData& key, size_t index_hint) const;
void incr_num_rows() { ++num_rows; }
void incr_num_rows(size_t n) { num_rows += n; }
void set_num_rows(size_t n) { num_rows = n; }
size_t rows() const { return num_rows; }
/// Adds a subcolumn from existing IColumn.
bool add_sub_column(const PathInData& key, MutableColumnPtr&& subcolumn, DataTypePtr type);
/// Adds a subcolumn of specific size with default values.
bool add_sub_column(const PathInData& key, size_t new_size);
const Subcolumns& get_subcolumns() const { return subcolumns; }
const Subcolumns& get_sparse_subcolumns() const { return sparse_columns; }
Subcolumns& get_subcolumns() { return subcolumns; }
PathsInData getKeys() const;
std::string get_keys_str() const {
std::stringstream ss;
bool first = true;
for (auto& k : getKeys()) {
if (first) {
first = false;
} else {
ss << ", ";
}
ss << k.get_path();
}
return ss.str();
}
void remove_subcolumns(const std::unordered_set<std::string>& keys);
// use sparse_subcolumns_schema to record sparse column's path info and type
void finalize(bool ignore_sparser);
/// Finalizes all subcolumns.
void finalize() override;
bool is_finalized() const;
MutableColumnPtr clone_finalized() const {
auto finalized = IColumn::mutate(get_ptr());
static_cast<ColumnObject*>(finalized.get())->finalize();
return finalized;
}
void finalize_if_not();
void clear() override;
void resize(size_t n) override;
void clear_subcolumns_data();
std::string get_name() const override {
if (is_scalar_variant()) {
return "var_scalar(" + get_root()->get_name() + ")";
}
return "variant";
}
/// Part of interface
const char* get_family_name() const override { return "Variant"; }
size_t size() const override;
MutableColumnPtr clone_resized(size_t new_size) const override;
size_t byte_size() const override;
size_t allocated_bytes() const override;
void for_each_subcolumn(ColumnCallback callback) override;
// Do nothing, call try_insert instead
void insert(const Field& field) override { try_insert(field); }
void append_data_by_selector(MutableColumnPtr& res,
const IColumn::Selector& selector) const override {
append_data_by_selector_impl<ColumnObject>(res, selector);
}
void append_data_by_selector(MutableColumnPtr& res, const IColumn::Selector& selector,
size_t begin, size_t end) const override {
append_data_by_selector_impl<ColumnObject>(res, selector, begin, end);
}
void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
const uint32_t* indices_end) override;
// May throw execption
void try_insert(const Field& field);
void insert_from(const IColumn& src, size_t n) override;
void insert_range_from(const IColumn& src, size_t start, size_t length) override;
void insert_default() override;
ColumnPtr replicate(const Offsets& offsets) const override;
void pop_back(size_t length) override;
Field operator[](size_t n) const override;
void get(size_t n, Field& res) const override;
void update_hash_with_value(size_t n, SipHash& hash) const override;
ColumnPtr filter(const Filter&, ssize_t) const override;
Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) override;
size_t filter(const Filter&) override;
ColumnPtr permute(const Permutation&, size_t) const override;
int compare_at(size_t n, size_t m, const IColumn& rhs, int nan_direction_hint) const override {
LOG(FATAL) << "should not call the method in column object";
return 0;
}
void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
Permutation& res) const override {
LOG(FATAL) << "should not call the method in column object";
__builtin_unreachable();
}
bool is_variable_length() const override { return true; }
template <typename Func>
MutableColumnPtr apply_for_subcolumns(Func&& func) const;
ColumnPtr index(const IColumn& indexes, size_t limit) const override;
void for_each_imutable_subcolumn(ImutableColumnCallback callback) const;
// Extract path from root column and output to dst
Status extract_root(const PathInData& path, MutableColumnPtr& dst) const;
bool empty() const;
// Check if all columns and types are aligned, only in debug mode
Status sanitize() const;
std::string debug_string() const;
void update_hashes_with_value(uint64_t* __restrict hashes,
const uint8_t* __restrict null_data = nullptr) const override;
void update_xxHash_with_value(size_t start, size_t end, uint64_t& hash,
const uint8_t* __restrict null_data) const override;
void update_crcs_with_value(uint32_t* __restrict hash, PrimitiveType type, uint32_t rows,
uint32_t offset = 0,
const uint8_t* __restrict null_data = nullptr) const override;
void update_crc_with_value(size_t start, size_t end, uint32_t& hash,
const uint8_t* __restrict null_data) const override;
// Not implemented
MutableColumnPtr get_shrinked_column() override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"get_shrinked_column" + std::string(get_family_name()));
}
Int64 get_int(size_t /*n*/) const override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"get_int" + std::string(get_family_name()));
}
bool get_bool(size_t /*n*/) const override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"get_bool" + std::string(get_family_name()));
}
void insert_many_fix_len_data(const char* pos, size_t num) override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"insert_many_fix_len_data" + std::string(get_family_name()));
}
void insert_many_dict_data(const int32_t* data_array, size_t start_index, const StringRef* dict,
size_t data_num, uint32_t dict_num = 0) override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"insert_many_dict_data" + std::string(get_family_name()));
}
void insert_many_binary_data(char* data_array, uint32_t* len_array,
uint32_t* start_offset_array, size_t num) override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"insert_many_binary_data" + std::string(get_family_name()));
}
void insert_many_continuous_binary_data(const char* data, const uint32_t* offsets,
const size_t num) override {
throw doris::Exception(
ErrorCode::NOT_IMPLEMENTED_ERROR,
"insert_many_continuous_binary_data" + std::string(get_family_name()));
}
void insert_many_strings(const StringRef* strings, size_t num) override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"insert_many_strings" + std::string(get_family_name()));
}
void insert_many_strings_overflow(const StringRef* strings, size_t num,
size_t max_length) override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"insert_many_strings_overflow" + std::string(get_family_name()));
}
void insert_many_raw_data(const char* pos, size_t num) override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"insert_many_raw_data" + std::string(get_family_name()));
}
size_t get_max_row_byte_size() const override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"get_max_row_byte_size" + std::string(get_family_name()));
}
void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
size_t max_row_byte_size) const override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"serialize_vec" + std::string(get_family_name()));
}
void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
const uint8_t* null_map) const override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"serialize_vec_with_null_map" + std::string(get_family_name()));
}
void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"deserialize_vec" + std::string(get_family_name()));
}
void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const size_t num_rows,
const uint8_t* null_map) override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"deserialize_vec_with_null_map" + std::string(get_family_name()));
}
Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) const {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"filter_by_selector" + std::string(get_family_name()));
}
bool structure_equals(const IColumn&) const override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"structure_equals" + std::string(get_family_name()));
}
StringRef get_raw_data() const override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"get_raw_data" + std::string(get_family_name()));
}
size_t size_of_value_if_fixed() const override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"size_of_value_if_fixed" + std::string(get_family_name()));
}
StringRef get_data_at(size_t) const override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"get_data_at" + std::string(get_family_name()));
}
StringRef serialize_value_into_arena(size_t n, Arena& arena,
char const*& begin) const override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"serialize_value_into_arena" + std::string(get_family_name()));
}
const char* deserialize_and_insert_from_arena(const char* pos) override {
throw doris::Exception(
ErrorCode::NOT_IMPLEMENTED_ERROR,
"deserialize_and_insert_from_arena" + std::string(get_family_name()));
}
void insert_data(const char* pos, size_t length) override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"insert_data" + std::string(get_family_name()));
}
void replace_column_data(const IColumn&, size_t row, size_t self_row) override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"replace_column_data" + std::string(get_family_name()));
}
void replace_column_data_default(size_t self_row) override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"replace_column_data_default" + std::string(get_family_name()));
}
void get_indices_of_non_default_rows(Offsets64& indices, size_t from,
size_t limit) const override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"get_indices_of_non_default_rows" + std::string(get_family_name()));
}
};
} // namespace doris::vectorized