[refactor] fix warnings when compiling with clang (#8069)
This commit is contained in:
@ -142,7 +142,7 @@ public:
|
||||
LOG(FATAL) << "get field not implemented";
|
||||
}
|
||||
|
||||
void insert_range_from(const IColumn& src, size_t start, size_t length) {
|
||||
void insert_range_from(const IColumn& src, size_t start, size_t length) override {
|
||||
auto& col = static_cast<const Self&>(src);
|
||||
auto& src_data = col.get_data();
|
||||
auto st = src_data.begin() + start;
|
||||
@ -150,7 +150,8 @@ public:
|
||||
data.insert(data.end(), st, ed);
|
||||
}
|
||||
|
||||
void insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) override {
|
||||
void insert_indices_from(const IColumn& src, const int* indices_begin,
|
||||
const int* indices_end) override {
|
||||
const Self& src_vec = assert_cast<const Self&>(src);
|
||||
data.reserve(size() + (indices_end - indices_begin));
|
||||
for (auto x = indices_begin; x != indices_end; ++x) {
|
||||
@ -158,27 +159,27 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
void pop_back(size_t n) { data.erase(data.end() - n, data.end()); }
|
||||
void pop_back(size_t n) override { data.erase(data.end() - n, data.end()); }
|
||||
// it's impossible to use ComplexType as key, so we don't have to implement them
|
||||
[[noreturn]] StringRef serialize_value_into_arena(size_t n, Arena& arena,
|
||||
char const*& begin) const {
|
||||
char const*& begin) const override {
|
||||
LOG(FATAL) << "serialize_value_into_arena not implemented";
|
||||
}
|
||||
|
||||
[[noreturn]] const char* deserialize_and_insert_from_arena(const char* pos) {
|
||||
[[noreturn]] const char* deserialize_and_insert_from_arena(const char* pos) override {
|
||||
LOG(FATAL) << "deserialize_and_insert_from_arena not implemented";
|
||||
}
|
||||
|
||||
void update_hash_with_value(size_t n, SipHash& hash) const {
|
||||
void update_hash_with_value(size_t n, SipHash& hash) const override {
|
||||
// TODO add hash function
|
||||
}
|
||||
|
||||
[[noreturn]] int compare_at(size_t n, size_t m, const IColumn& rhs,
|
||||
int nan_direction_hint) const {
|
||||
int nan_direction_hint) const override {
|
||||
LOG(FATAL) << "compare_at not implemented";
|
||||
}
|
||||
|
||||
void get_extremes(Field& min, Field& max) const {
|
||||
void get_extremes(Field& min, Field& max) const override {
|
||||
LOG(FATAL) << "get_extremes not implemented";
|
||||
}
|
||||
|
||||
@ -319,7 +320,8 @@ ColumnPtr ColumnComplexType<T>::replicate(const IColumn::Offsets& offsets) const
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void ColumnComplexType<T>::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const {
|
||||
void ColumnComplexType<T>::replicate(const uint32_t* counts, size_t target_size,
|
||||
IColumn& column) const {
|
||||
size_t size = data.size();
|
||||
if (0 == size) return;
|
||||
|
||||
|
||||
@ -22,12 +22,12 @@
|
||||
|
||||
#include <cmath>
|
||||
|
||||
#include "olap/uint24.h"
|
||||
#include "vec/columns/column.h"
|
||||
#include "vec/columns/column_impl.h"
|
||||
#include "vec/columns/column_vector_helper.h"
|
||||
#include "vec/common/unaligned.h"
|
||||
#include "vec/core/field.h"
|
||||
#include "olap/uint24.h"
|
||||
|
||||
namespace doris::vectorized {
|
||||
|
||||
@ -120,8 +120,9 @@ private:
|
||||
/// Sugar constructor.
|
||||
ColumnVector(std::initializer_list<T> il) : data {il} {}
|
||||
|
||||
void insert_res_column(const uint16_t* sel, size_t sel_size, vectorized::ColumnVector<T>* res_ptr) {
|
||||
auto& res_data = res_ptr->data;
|
||||
void insert_res_column(const uint16_t* sel, size_t sel_size,
|
||||
vectorized::ColumnVector<T>* res_ptr) {
|
||||
auto& res_data = res_ptr->data;
|
||||
DCHECK(res_data.empty());
|
||||
res_data.reserve(sel_size);
|
||||
T* t = (T*)res_data.get_end_ptr();
|
||||
@ -139,7 +140,7 @@ private:
|
||||
}
|
||||
res_val_ptr += num;
|
||||
data.set_end_ptr(res_val_ptr);
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
bool is_numeric() const override { return IsNumber<T>; }
|
||||
@ -165,7 +166,7 @@ public:
|
||||
res_ptr += num * sizeof(T);
|
||||
data.set_end_ptr(res_ptr);
|
||||
}
|
||||
|
||||
|
||||
void insert_date_column(const char* data_ptr, size_t num) {
|
||||
size_t value_size = sizeof(uint24_t);
|
||||
for (int i = 0; i < num; i++) {
|
||||
@ -181,7 +182,7 @@ public:
|
||||
data.push_back_without_reserve(date);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void insert_datetime_column(const char* data_ptr, size_t num) {
|
||||
size_t value_size = sizeof(uint64_t);
|
||||
for (int i = 0; i < num; i++) {
|
||||
@ -191,7 +192,7 @@ public:
|
||||
data.push_back_without_reserve(date);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
used by date, datetime, and basic types
|
||||
*/
|
||||
@ -270,7 +271,8 @@ public:
|
||||
|
||||
void insert_range_from(const IColumn& src, size_t start, size_t length) override;
|
||||
|
||||
void insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) override;
|
||||
void insert_indices_from(const IColumn& src, const int* indices_begin,
|
||||
const int* indices_end) override;
|
||||
|
||||
void fill(const value_type& element, size_t num) {
|
||||
auto old_size = data.size();
|
||||
|
||||
@ -17,14 +17,13 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "vec/columns/column.h"
|
||||
#include "vec/columns/column_impl.h"
|
||||
|
||||
#include "runtime/string_value.h"
|
||||
#include "olap/decimal12.h"
|
||||
#include "olap/uint24.h"
|
||||
#include "vec/columns/column_string.h"
|
||||
#include "runtime/string_value.h"
|
||||
#include "vec/columns/column.h"
|
||||
#include "vec/columns/column_decimal.h"
|
||||
#include "vec/columns/column_impl.h"
|
||||
#include "vec/columns/column_string.h"
|
||||
#include "vec/columns/column_vector.h"
|
||||
#include "vec/core/types.h"
|
||||
|
||||
@ -32,7 +31,7 @@ namespace doris::vectorized {
|
||||
|
||||
/**
|
||||
* used to keep predicate column in storage layer
|
||||
*
|
||||
*
|
||||
* T = predicate column type
|
||||
*/
|
||||
template <typename T>
|
||||
@ -56,7 +55,8 @@ private:
|
||||
return value;
|
||||
}
|
||||
|
||||
void insert_date_to_res_column(const uint16_t* sel, size_t sel_size, vectorized::ColumnVector<Int64>* res_ptr) {
|
||||
void insert_date_to_res_column(const uint16_t* sel, size_t sel_size,
|
||||
vectorized::ColumnVector<Int64>* res_ptr) {
|
||||
for (size_t i = 0; i < sel_size; i++) {
|
||||
VecDateTimeValue date;
|
||||
date.from_olap_date(get_date_at(sel[i]));
|
||||
@ -64,7 +64,8 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
void insert_datetime_to_res_column(const uint16_t* sel, size_t sel_size, vectorized::ColumnVector<Int64>* res_ptr) {
|
||||
void insert_datetime_to_res_column(const uint16_t* sel, size_t sel_size,
|
||||
vectorized::ColumnVector<Int64>* res_ptr) {
|
||||
for (size_t i = 0; i < sel_size; i++) {
|
||||
uint64_t value = data[sel[i]];
|
||||
vectorized::VecDateTimeValue date(value);
|
||||
@ -72,7 +73,8 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
void insert_string_to_res_column(const uint16_t* sel, size_t sel_size, vectorized::ColumnString* res_ptr) {
|
||||
void insert_string_to_res_column(const uint16_t* sel, size_t sel_size,
|
||||
vectorized::ColumnString* res_ptr) {
|
||||
for (size_t i = 0; i < sel_size; i++) {
|
||||
uint16_t n = sel[i];
|
||||
auto& sv = reinterpret_cast<StringValue&>(data[n]);
|
||||
@ -80,7 +82,8 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
void insert_decimal_to_res_column(const uint16_t* sel, size_t sel_size, vectorized::ColumnDecimal<Decimal128>* res_ptr) {
|
||||
void insert_decimal_to_res_column(const uint16_t* sel, size_t sel_size,
|
||||
vectorized::ColumnDecimal<Decimal128>* res_ptr) {
|
||||
for (size_t i = 0; i < sel_size; i++) {
|
||||
uint16_t n = sel[i];
|
||||
auto& dv = reinterpret_cast<const decimal12_t&>(data[n]);
|
||||
@ -90,7 +93,8 @@ private:
|
||||
}
|
||||
|
||||
template <typename Y>
|
||||
void insert_default_value_res_column(const uint16_t* sel, size_t sel_size, vectorized::ColumnVector<Y>* res_ptr) {
|
||||
void insert_default_value_res_column(const uint16_t* sel, size_t sel_size,
|
||||
vectorized::ColumnVector<Y>* res_ptr) {
|
||||
static_assert(std::is_same_v<T, Y>);
|
||||
auto& res_data = res_ptr->get_data();
|
||||
DCHECK(res_data.empty());
|
||||
@ -102,7 +106,8 @@ private:
|
||||
res_data.set_end_ptr(y + sel_size);
|
||||
}
|
||||
|
||||
void insert_byte_to_res_column(const uint16_t* sel, size_t sel_size, vectorized::IColumn* res_ptr) {
|
||||
void insert_byte_to_res_column(const uint16_t* sel, size_t sel_size,
|
||||
vectorized::IColumn* res_ptr) {
|
||||
for (size_t i = 0; i < sel_size; i++) {
|
||||
uint16_t n = sel[i];
|
||||
char* ch_val = reinterpret_cast<char*>(&data[n]);
|
||||
@ -120,7 +125,7 @@ private:
|
||||
res_val_ptr += num;
|
||||
data.set_end_ptr(res_val_ptr);
|
||||
}
|
||||
|
||||
|
||||
void insert_many_in_copy_way(const char* data_ptr, size_t num) {
|
||||
char* res_ptr = (char*)data.get_end_ptr();
|
||||
memcpy(res_ptr, data_ptr, num * sizeof(T));
|
||||
@ -139,20 +144,21 @@ public:
|
||||
|
||||
size_t size() const override { return data.size(); }
|
||||
|
||||
[[noreturn]] StringRef get_data_at(size_t n) const override {
|
||||
LOG(FATAL) << "get_data_at not supported in PredicateColumnType";
|
||||
[[noreturn]] StringRef get_data_at(size_t n) const override {
|
||||
LOG(FATAL) << "get_data_at not supported in PredicateColumnType";
|
||||
}
|
||||
|
||||
void insert_from(const IColumn& src, size_t n) override {
|
||||
LOG(FATAL) << "insert_from not supported in PredicateColumnType";
|
||||
LOG(FATAL) << "insert_from not supported in PredicateColumnType";
|
||||
}
|
||||
|
||||
void insert_range_from(const IColumn& src, size_t start, size_t length) override {
|
||||
LOG(FATAL) << "insert_range_from not supported in PredicateColumnType";
|
||||
LOG(FATAL) << "insert_range_from not supported in PredicateColumnType";
|
||||
}
|
||||
|
||||
void insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) override {
|
||||
LOG(FATAL) << "insert_indices_from not supported in PredicateColumnType";
|
||||
void insert_indices_from(const IColumn& src, const int* indices_begin,
|
||||
const int* indices_end) override {
|
||||
LOG(FATAL) << "insert_indices_from not supported in PredicateColumnType";
|
||||
}
|
||||
|
||||
void pop_back(size_t n) override {
|
||||
@ -160,7 +166,7 @@ public:
|
||||
}
|
||||
|
||||
void update_hash_with_value(size_t n, SipHash& hash) const override {
|
||||
LOG(FATAL) << "update_hash_with_value not supported in PredicateColumnType";
|
||||
LOG(FATAL) << "update_hash_with_value not supported in PredicateColumnType";
|
||||
}
|
||||
|
||||
void insert_string_value(char* data_ptr, size_t length) {
|
||||
@ -181,7 +187,7 @@ public:
|
||||
memcpy(&val, data_ptr, sizeof(val));
|
||||
data.push_back_without_reserve(val);
|
||||
}
|
||||
|
||||
|
||||
void insert_default_type(char* data_ptr, size_t length) {
|
||||
T* val = (T*)data_ptr;
|
||||
data.push_back_without_reserve(*val);
|
||||
@ -191,13 +197,13 @@ public:
|
||||
char* ch = const_cast<char*>(data_ptr);
|
||||
if constexpr (std::is_same_v<T, StringValue>) {
|
||||
insert_string_value(ch, length);
|
||||
} else if constexpr (std::is_same_v<T, decimal12_t>) {
|
||||
} else if constexpr (std::is_same_v<T, decimal12_t>) {
|
||||
insert_decimal_value(ch, length);
|
||||
} else if constexpr (std::is_same_v<T, doris::vectorized::Int128>) {
|
||||
} else if constexpr (std::is_same_v<T, doris::vectorized::Int128>) {
|
||||
insert_in_copy_way(ch, length);
|
||||
} else {
|
||||
} else {
|
||||
insert_default_type(ch, length);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void insert_many_fix_len_data(const char* data_ptr, size_t num) override {
|
||||
@ -211,9 +217,10 @@ public:
|
||||
insert_many_default_type(data_ptr, num);
|
||||
}
|
||||
}
|
||||
|
||||
void insert_many_dict_data(const int32_t* data_array, size_t start_index, const uint32_t* start_offset_array,
|
||||
const uint32_t* len_array, char* dict_data, size_t num) override {
|
||||
|
||||
void insert_many_dict_data(const int32_t* data_array, size_t start_index,
|
||||
const uint32_t* start_offset_array, const uint32_t* len_array,
|
||||
char* dict_data, size_t num) override {
|
||||
if constexpr (std::is_same_v<T, StringValue>) {
|
||||
for (int i = 0; i < num; i++, start_index++) {
|
||||
int32_t codeword = data_array[start_index];
|
||||
@ -223,8 +230,9 @@ public:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void insert_many_binary_data(char* data_array, uint32_t* len_array, uint32_t* start_offset_array, size_t num) override {
|
||||
|
||||
void insert_many_binary_data(char* data_array, uint32_t* len_array,
|
||||
uint32_t* start_offset_array, size_t num) override {
|
||||
if constexpr (std::is_same_v<T, StringValue>) {
|
||||
for (size_t i = 0; i < num; i++) {
|
||||
uint32_t len = len_array[i];
|
||||
@ -234,34 +242,28 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
void insert_default() override {
|
||||
data.push_back(T());
|
||||
}
|
||||
void insert_default() override { data.push_back(T()); }
|
||||
|
||||
void clear() override { data.clear(); }
|
||||
|
||||
size_t byte_size() const override {
|
||||
return data.size() * sizeof(T);
|
||||
}
|
||||
size_t byte_size() const override { return data.size() * sizeof(T); }
|
||||
|
||||
size_t allocated_bytes() const override { return byte_size(); }
|
||||
|
||||
void protect() override {}
|
||||
|
||||
void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
|
||||
IColumn::Permutation& res) const override {
|
||||
IColumn::Permutation& res) const override {
|
||||
LOG(FATAL) << "get_permutation not supported in PredicateColumnType";
|
||||
}
|
||||
|
||||
void reserve(size_t n) override {
|
||||
data.reserve(n);
|
||||
}
|
||||
void reserve(size_t n) override { data.reserve(n); }
|
||||
|
||||
[[noreturn]] const char* get_family_name() const override {
|
||||
[[noreturn]] const char* get_family_name() const override {
|
||||
LOG(FATAL) << "get_family_name not supported in PredicateColumnType";
|
||||
}
|
||||
|
||||
[[noreturn]] MutableColumnPtr clone_resized(size_t size) const override {
|
||||
[[noreturn]] MutableColumnPtr clone_resized(size_t size) const override {
|
||||
LOG(FATAL) << "clone_resized not supported in PredicateColumnType";
|
||||
}
|
||||
|
||||
@ -299,20 +301,20 @@ public:
|
||||
|
||||
// it's impossible to use ComplexType as key, so we don't have to implement them
|
||||
[[noreturn]] StringRef serialize_value_into_arena(size_t n, Arena& arena,
|
||||
char const*& begin) const {
|
||||
char const*& begin) const override {
|
||||
LOG(FATAL) << "serialize_value_into_arena not supported in PredicateColumnType";
|
||||
}
|
||||
|
||||
[[noreturn]] const char* deserialize_and_insert_from_arena(const char* pos) {
|
||||
[[noreturn]] const char* deserialize_and_insert_from_arena(const char* pos) override {
|
||||
LOG(FATAL) << "deserialize_and_insert_from_arena not supported in PredicateColumnType";
|
||||
}
|
||||
|
||||
[[noreturn]] int compare_at(size_t n, size_t m, const IColumn& rhs,
|
||||
int nan_direction_hint) const {
|
||||
int nan_direction_hint) const override {
|
||||
LOG(FATAL) << "compare_at not supported in PredicateColumnType";
|
||||
}
|
||||
|
||||
void get_extremes(Field& min, Field& max) const {
|
||||
void get_extremes(Field& min, Field& max) const override {
|
||||
LOG(FATAL) << "get_extremes not supported in PredicateColumnType";
|
||||
}
|
||||
|
||||
@ -326,15 +328,16 @@ public:
|
||||
}
|
||||
|
||||
[[noreturn]] bool structure_equals(const IColumn& rhs) const override {
|
||||
LOG(FATAL) << "structure_equals not supported in PredicateColumnType";
|
||||
LOG(FATAL) << "structure_equals not supported in PredicateColumnType";
|
||||
}
|
||||
|
||||
[[noreturn]] ColumnPtr filter(const IColumn::Filter& filt, ssize_t result_size_hint) const override {
|
||||
LOG(FATAL) << "filter not supported in PredicateColumnType";
|
||||
[[noreturn]] ColumnPtr filter(const IColumn::Filter& filt,
|
||||
ssize_t result_size_hint) const override {
|
||||
LOG(FATAL) << "filter not supported in PredicateColumnType";
|
||||
};
|
||||
|
||||
[[noreturn]] ColumnPtr permute(const IColumn::Permutation& perm, size_t limit) const override {
|
||||
LOG(FATAL) << "permute not supported in PredicateColumnType";
|
||||
[[noreturn]] ColumnPtr permute(const IColumn::Permutation& perm, size_t limit) const override {
|
||||
LOG(FATAL) << "permute not supported in PredicateColumnType";
|
||||
};
|
||||
|
||||
Container& get_data() { return data; }
|
||||
@ -352,27 +355,49 @@ public:
|
||||
|
||||
Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) override {
|
||||
if constexpr (std::is_same_v<T, StringValue>) {
|
||||
insert_string_to_res_column(sel, sel_size, reinterpret_cast<vectorized::ColumnString*>(col_ptr));
|
||||
insert_string_to_res_column(sel, sel_size,
|
||||
reinterpret_cast<vectorized::ColumnString*>(col_ptr));
|
||||
} else if constexpr (std::is_same_v<T, decimal12_t>) {
|
||||
insert_decimal_to_res_column(sel, sel_size, reinterpret_cast<vectorized::ColumnDecimal<Decimal128>*>(col_ptr));
|
||||
insert_decimal_to_res_column(
|
||||
sel, sel_size,
|
||||
reinterpret_cast<vectorized::ColumnDecimal<Decimal128>*>(col_ptr));
|
||||
} else if constexpr (std::is_same_v<T, doris::vectorized::Int8>) {
|
||||
insert_default_value_res_column(sel, sel_size, reinterpret_cast<vectorized::ColumnVector<doris::vectorized::Int8>*>(col_ptr));
|
||||
insert_default_value_res_column(
|
||||
sel, sel_size,
|
||||
reinterpret_cast<vectorized::ColumnVector<doris::vectorized::Int8>*>(col_ptr));
|
||||
} else if constexpr (std::is_same_v<T, doris::vectorized::Int16>) {
|
||||
insert_default_value_res_column(sel, sel_size, reinterpret_cast<vectorized::ColumnVector<doris::vectorized::Int16>*>(col_ptr));
|
||||
insert_default_value_res_column(
|
||||
sel, sel_size,
|
||||
reinterpret_cast<vectorized::ColumnVector<doris::vectorized::Int16>*>(col_ptr));
|
||||
} else if constexpr (std::is_same_v<T, doris::vectorized::Int32>) {
|
||||
insert_default_value_res_column(sel, sel_size, reinterpret_cast<vectorized::ColumnVector<doris::vectorized::Int32>*>(col_ptr));
|
||||
insert_default_value_res_column(
|
||||
sel, sel_size,
|
||||
reinterpret_cast<vectorized::ColumnVector<doris::vectorized::Int32>*>(col_ptr));
|
||||
} else if constexpr (std::is_same_v<T, doris::vectorized::Int64>) {
|
||||
insert_default_value_res_column(sel, sel_size, reinterpret_cast<vectorized::ColumnVector<doris::vectorized::Int64>*>(col_ptr));
|
||||
insert_default_value_res_column(
|
||||
sel, sel_size,
|
||||
reinterpret_cast<vectorized::ColumnVector<doris::vectorized::Int64>*>(col_ptr));
|
||||
} else if constexpr (std::is_same_v<T, doris::vectorized::Float32>) {
|
||||
insert_default_value_res_column(sel, sel_size, reinterpret_cast<vectorized::ColumnVector<doris::vectorized::Float32>*>(col_ptr));
|
||||
insert_default_value_res_column(
|
||||
sel, sel_size,
|
||||
reinterpret_cast<vectorized::ColumnVector<doris::vectorized::Float32>*>(
|
||||
col_ptr));
|
||||
} else if constexpr (std::is_same_v<T, doris::vectorized::Float64>) {
|
||||
insert_default_value_res_column(sel, sel_size, reinterpret_cast<vectorized::ColumnVector<doris::vectorized::Float64>*>(col_ptr));
|
||||
insert_default_value_res_column(
|
||||
sel, sel_size,
|
||||
reinterpret_cast<vectorized::ColumnVector<doris::vectorized::Float64>*>(
|
||||
col_ptr));
|
||||
} else if constexpr (std::is_same_v<T, uint64_t>) {
|
||||
insert_datetime_to_res_column(sel, sel_size, reinterpret_cast<vectorized::ColumnVector<Int64>*>(col_ptr));
|
||||
insert_datetime_to_res_column(
|
||||
sel, sel_size, reinterpret_cast<vectorized::ColumnVector<Int64>*>(col_ptr));
|
||||
} else if constexpr (std::is_same_v<T, uint24_t>) {
|
||||
insert_date_to_res_column(sel, sel_size, reinterpret_cast<vectorized::ColumnVector<Int64>*>(col_ptr));
|
||||
insert_date_to_res_column(sel, sel_size,
|
||||
reinterpret_cast<vectorized::ColumnVector<Int64>*>(col_ptr));
|
||||
} else if constexpr (std::is_same_v<T, doris::vectorized::Int128>) {
|
||||
insert_default_value_res_column(sel, sel_size, reinterpret_cast<vectorized::ColumnVector<doris::vectorized::Int128>*>(col_ptr));
|
||||
insert_default_value_res_column(
|
||||
sel, sel_size,
|
||||
reinterpret_cast<vectorized::ColumnVector<doris::vectorized::Int128>*>(
|
||||
col_ptr));
|
||||
} else if (std::is_same_v<T, bool>) {
|
||||
insert_byte_to_res_column(sel, sel_size, col_ptr);
|
||||
} else {
|
||||
@ -381,7 +406,6 @@ public:
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
void replace_column_data(const IColumn&, size_t row, size_t self_row = 0) override {
|
||||
LOG(FATAL) << "should not call replace_column_data in predicate column";
|
||||
}
|
||||
@ -395,4 +419,4 @@ private:
|
||||
};
|
||||
using ColumnStringValue = PredicateColumnType<StringValue>;
|
||||
|
||||
} // namespace
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -24,7 +24,6 @@
|
||||
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
|
||||
#include <initializer_list>
|
||||
|
||||
|
||||
/** Copy-on-write shared ptr.
|
||||
* Allows working with shared immutable objects and sometimes unsharing and mutating your own unique copy.
|
||||
*
|
||||
@ -101,13 +100,9 @@ protected:
|
||||
|
||||
COW(COW const&) : ref_counter(0) {}
|
||||
|
||||
COW& operator=(COW const&) {
|
||||
return *this;
|
||||
}
|
||||
COW& operator=(COW const&) { return *this; }
|
||||
|
||||
void add_ref() {
|
||||
++ref_counter;
|
||||
}
|
||||
void add_ref() { ++ref_counter; }
|
||||
|
||||
void release_ref() {
|
||||
if (--ref_counter == 0) {
|
||||
@ -124,7 +119,7 @@ protected:
|
||||
public:
|
||||
intrusive_ptr() : t(nullptr) {}
|
||||
|
||||
intrusive_ptr(T* t, bool add_ref=true) : t(t) {
|
||||
intrusive_ptr(T* t, bool add_ref = true) : t(t) {
|
||||
if (t && add_ref) ((std::remove_const_t<T>*)t)->add_ref();
|
||||
}
|
||||
|
||||
@ -146,27 +141,33 @@ protected:
|
||||
intrusive_ptr(rhs).swap(*this);
|
||||
return *this;
|
||||
}
|
||||
|
||||
#if defined(__clang__)
|
||||
#pragma clang diagnostic push
|
||||
#pragma clang diagnostic ignored "-Wuninitialized"
|
||||
#elif defined(__GNUC__) || defined(__GNUG__)
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
|
||||
intrusive_ptr(intrusive_ptr&& rhs) : t(rhs.t) {
|
||||
rhs.t = nullptr;
|
||||
}
|
||||
#endif
|
||||
intrusive_ptr(intrusive_ptr&& rhs) : t(rhs.t) { rhs.t = nullptr; }
|
||||
#if defined(__clang__)
|
||||
#pragma clang diagnostic pop
|
||||
#elif defined(__GNUC__) || defined(__GNUG__)
|
||||
#pragma GCC diagnostic pop
|
||||
|
||||
#endif
|
||||
intrusive_ptr& operator=(intrusive_ptr&& rhs) {
|
||||
intrusive_ptr(static_cast<intrusive_ptr&&>(rhs)).swap(*this);
|
||||
return *this;
|
||||
}
|
||||
|
||||
template<class U> friend class intrusive_ptr;
|
||||
template <class U>
|
||||
friend class intrusive_ptr;
|
||||
|
||||
template<class U>
|
||||
template <class U>
|
||||
intrusive_ptr(intrusive_ptr<U>&& rhs) : t(rhs.t) {
|
||||
rhs.t = nullptr;
|
||||
}
|
||||
|
||||
template<class U>
|
||||
template <class U>
|
||||
intrusive_ptr& operator=(intrusive_ptr<U>&& rhs) {
|
||||
intrusive_ptr(static_cast<intrusive_ptr<U>&&>(rhs)).swap(*this);
|
||||
return *this;
|
||||
@ -182,21 +183,13 @@ protected:
|
||||
return *this;
|
||||
}
|
||||
|
||||
void reset() {
|
||||
intrusive_ptr().swap(*this);
|
||||
}
|
||||
void reset() { intrusive_ptr().swap(*this); }
|
||||
|
||||
void reset(T* rhs) {
|
||||
intrusive_ptr(rhs).swap(*this);
|
||||
}
|
||||
void reset(T* rhs) { intrusive_ptr(rhs).swap(*this); }
|
||||
|
||||
void reset(T* rhs, bool add_ref) {
|
||||
intrusive_ptr(rhs, add_ref).swap(*this);
|
||||
}
|
||||
void reset(T* rhs, bool add_ref) { intrusive_ptr(rhs, add_ref).swap(*this); }
|
||||
|
||||
T* get() const {
|
||||
return t;
|
||||
}
|
||||
T* get() const { return t; }
|
||||
|
||||
T* detach() {
|
||||
T* ret = t;
|
||||
@ -210,25 +203,15 @@ protected:
|
||||
rhs.t = tmp;
|
||||
}
|
||||
|
||||
T& operator*() const& {
|
||||
return *t;
|
||||
}
|
||||
T& operator*() const& { return *t; }
|
||||
|
||||
T&& operator*() const&& {
|
||||
return const_cast<std::remove_const_t<T>&&>(*t);
|
||||
}
|
||||
T&& operator*() const&& { return const_cast<std::remove_const_t<T>&&>(*t); }
|
||||
|
||||
T* operator->() const {
|
||||
return t;
|
||||
}
|
||||
T* operator->() const { return t; }
|
||||
|
||||
operator bool() const {
|
||||
return t != nullptr;
|
||||
}
|
||||
operator bool() const { return t != nullptr; }
|
||||
|
||||
operator T*() const {
|
||||
return t;
|
||||
}
|
||||
operator T*() const { return t; }
|
||||
|
||||
private:
|
||||
T* t;
|
||||
@ -240,10 +223,13 @@ protected:
|
||||
private:
|
||||
using Base = intrusive_ptr<T>;
|
||||
|
||||
template <typename> friend class COW;
|
||||
template <typename, typename> friend class COWHelper;
|
||||
template <typename>
|
||||
friend class COW;
|
||||
template <typename, typename>
|
||||
friend class COWHelper;
|
||||
|
||||
explicit mutable_ptr(T* ptr) : Base(ptr) {}
|
||||
|
||||
public:
|
||||
/// Copy: not possible.
|
||||
mutable_ptr(const mutable_ptr&) = delete;
|
||||
@ -264,9 +250,7 @@ protected:
|
||||
public:
|
||||
using MutablePtr = mutable_ptr<Derived>;
|
||||
|
||||
unsigned int use_count() const {
|
||||
return ref_counter.load();
|
||||
}
|
||||
unsigned int use_count() const { return ref_counter.load(); }
|
||||
|
||||
protected:
|
||||
template <typename T>
|
||||
@ -274,10 +258,13 @@ protected:
|
||||
private:
|
||||
using Base = intrusive_ptr<const T>;
|
||||
|
||||
template <typename> friend class COW;
|
||||
template <typename, typename> friend class COWHelper;
|
||||
template <typename>
|
||||
friend class COW;
|
||||
template <typename, typename>
|
||||
friend class COWHelper;
|
||||
|
||||
explicit immutable_ptr(const T* ptr) : Base(ptr) {}
|
||||
|
||||
public:
|
||||
/// Copy from immutable ptr: ok.
|
||||
immutable_ptr(const immutable_ptr&) = default;
|
||||
|
||||
@ -99,16 +99,16 @@ inline DataTypePtr create_data_type(const PColumnMeta& pcolumn_meta) {
|
||||
return std::make_shared<DataTypeDateTime>();
|
||||
}
|
||||
case PGenericType::DECIMAL32: {
|
||||
return std::make_shared<DataTypeDecimal<Decimal32>>(pcolumn_meta.decimal_param().precision(),
|
||||
pcolumn_meta.decimal_param().scale());
|
||||
return std::make_shared<DataTypeDecimal<Decimal32>>(
|
||||
pcolumn_meta.decimal_param().precision(), pcolumn_meta.decimal_param().scale());
|
||||
}
|
||||
case PGenericType::DECIMAL64: {
|
||||
return std::make_shared<DataTypeDecimal<Decimal64>>(pcolumn_meta.decimal_param().precision(),
|
||||
pcolumn_meta.decimal_param().scale());
|
||||
return std::make_shared<DataTypeDecimal<Decimal64>>(
|
||||
pcolumn_meta.decimal_param().precision(), pcolumn_meta.decimal_param().scale());
|
||||
}
|
||||
case PGenericType::DECIMAL128: {
|
||||
return std::make_shared<DataTypeDecimal<Decimal128>>(pcolumn_meta.decimal_param().precision(),
|
||||
pcolumn_meta.decimal_param().scale());
|
||||
return std::make_shared<DataTypeDecimal<Decimal128>>(
|
||||
pcolumn_meta.decimal_param().precision(), pcolumn_meta.decimal_param().scale());
|
||||
}
|
||||
case PGenericType::BITMAP: {
|
||||
return std::make_shared<DataTypeBitMap>();
|
||||
@ -136,14 +136,16 @@ Block::Block(const PBlock& pblock) {
|
||||
const char* buf = nullptr;
|
||||
std::string compression_scratch;
|
||||
if (pblock.compressed()) {
|
||||
// Decompress
|
||||
// Decompress
|
||||
const char* compressed_data = pblock.column_values().c_str();
|
||||
size_t compressed_size = pblock.column_values().size();
|
||||
size_t uncompressed_size = 0;
|
||||
bool success = snappy::GetUncompressedLength(compressed_data, compressed_size, &uncompressed_size);
|
||||
bool success =
|
||||
snappy::GetUncompressedLength(compressed_data, compressed_size, &uncompressed_size);
|
||||
DCHECK(success) << "snappy::GetUncompressedLength failed";
|
||||
compression_scratch.resize(uncompressed_size);
|
||||
success = snappy::RawUncompress(compressed_data, compressed_size, compression_scratch.data());
|
||||
success =
|
||||
snappy::RawUncompress(compressed_data, compressed_size, compression_scratch.data());
|
||||
DCHECK(success) << "snappy::RawUncompress failed";
|
||||
buf = compression_scratch.data();
|
||||
} else {
|
||||
@ -154,14 +156,14 @@ Block::Block(const PBlock& pblock) {
|
||||
DataTypePtr type = create_data_type(pcol_meta);
|
||||
MutableColumnPtr data_column;
|
||||
if (pcol_meta.is_nullable()) {
|
||||
data_column = ColumnNullable::create(std::move(type->create_column()), ColumnUInt8::create());
|
||||
data_column = ColumnNullable::create(type->create_column(), ColumnUInt8::create());
|
||||
type = make_nullable(type);
|
||||
} else {
|
||||
data_column = type->create_column();
|
||||
}
|
||||
buf = type->deserialize(buf, data_column.get());
|
||||
data.emplace_back(data_column->get_ptr(), type, pcol_meta.name());
|
||||
}
|
||||
}
|
||||
initialize_index_by_name();
|
||||
}
|
||||
|
||||
@ -704,8 +706,8 @@ Status Block::filter_block(Block* block, int filter_column_id, int column_to_kee
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes,
|
||||
size_t* compressed_bytes, std::string* allocated_buf) const {
|
||||
Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* compressed_bytes,
|
||||
std::string* allocated_buf) const {
|
||||
// calc uncompressed size for allocation
|
||||
size_t content_uncompressed_size = 0;
|
||||
for (const auto& c : *this) {
|
||||
@ -722,7 +724,8 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes,
|
||||
for (const auto& c : *this) {
|
||||
buf = c.type->serialize(*(c.column), buf);
|
||||
}
|
||||
CHECK(content_uncompressed_size == (buf - start_buf)) << content_uncompressed_size << " vs. " << (buf - start_buf);
|
||||
CHECK(content_uncompressed_size == (buf - start_buf))
|
||||
<< content_uncompressed_size << " vs. " << (buf - start_buf);
|
||||
*uncompressed_bytes = content_uncompressed_size;
|
||||
|
||||
// compress
|
||||
@ -735,7 +738,8 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes,
|
||||
|
||||
size_t compressed_size = 0;
|
||||
char* compressed_output = compression_scratch.data();
|
||||
snappy::RawCompress(allocated_buf->data(), content_uncompressed_size, compressed_output, &compressed_size);
|
||||
snappy::RawCompress(allocated_buf->data(), content_uncompressed_size, compressed_output,
|
||||
&compressed_size);
|
||||
|
||||
if (LIKELY(compressed_size < content_uncompressed_size)) {
|
||||
compression_scratch.resize(compressed_size);
|
||||
@ -746,13 +750,13 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes,
|
||||
*compressed_bytes = content_uncompressed_size;
|
||||
}
|
||||
|
||||
VLOG_ROW << "uncompressed size: " << content_uncompressed_size << ", compressed size: " << compressed_size;
|
||||
VLOG_ROW << "uncompressed size: " << content_uncompressed_size
|
||||
<< ", compressed size: " << compressed_size;
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
void Block::serialize(RowBatch* output_batch, const RowDescriptor& row_desc) {
|
||||
auto num_rows = rows();
|
||||
auto mem_pool = output_batch->tuple_data_pool();
|
||||
|
||||
@ -67,10 +67,12 @@ public:
|
||||
|
||||
bool can_be_inside_low_cardinality() const override { return false; }
|
||||
|
||||
std::string to_string(const IColumn& column, size_t row_num) const { return "BitMap()"; }
|
||||
void to_string(const IColumn &column, size_t row_num, BufferWritable &ostr) const override;
|
||||
std::string to_string(const IColumn& column, size_t row_num) const override {
|
||||
return "BitMap()";
|
||||
}
|
||||
void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
|
||||
|
||||
[[noreturn]] virtual Field get_default() const {
|
||||
[[noreturn]] virtual Field get_default() const override {
|
||||
LOG(FATAL) << "Method get_default() is not implemented for data type " << get_name();
|
||||
__builtin_unreachable();
|
||||
}
|
||||
|
||||
@ -34,7 +34,7 @@ public:
|
||||
bool can_be_inside_nullable() const override { return true; }
|
||||
|
||||
bool equals(const IDataType& rhs) const override;
|
||||
std::string to_string(const IColumn& column, size_t row_num) const;
|
||||
std::string to_string(const IColumn& column, size_t row_num) const override;
|
||||
void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
|
||||
|
||||
static void cast_to_date(Int64& x);
|
||||
|
||||
@ -60,7 +60,7 @@ public:
|
||||
|
||||
bool equals(const IDataType& rhs) const override;
|
||||
|
||||
std::string to_string(const IColumn& column, size_t row_num) const;
|
||||
std::string to_string(const IColumn& column, size_t row_num) const override;
|
||||
|
||||
void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
|
||||
|
||||
|
||||
@ -143,8 +143,8 @@ public:
|
||||
bool is_summable() const override { return true; }
|
||||
bool can_be_used_in_boolean_context() const override { return true; }
|
||||
bool can_be_inside_nullable() const override { return true; }
|
||||
std::string to_string(const IColumn& column, size_t row_num) const;
|
||||
void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const;
|
||||
std::string to_string(const IColumn& column, size_t row_num) const override;
|
||||
void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
|
||||
|
||||
/// Decimal specific
|
||||
|
||||
@ -330,19 +330,9 @@ convert_to_decimal(const typename FromDataType::FieldType& value, UInt32 scale)
|
||||
}
|
||||
|
||||
auto out = value * ToDataType::get_scale_multiplier(scale);
|
||||
if constexpr (std::is_same_v<ToNativeType, Int128>) {
|
||||
static constexpr __int128 min_int128 = __int128(0x8000000000000000ll) << 64;
|
||||
static constexpr __int128 max_int128 =
|
||||
(__int128(0x7fffffffffffffffll) << 64) + 0xffffffffffffffffll;
|
||||
if (out <= static_cast<ToNativeType>(min_int128) ||
|
||||
out >= static_cast<ToNativeType>(max_int128)) {
|
||||
LOG(FATAL) << "Decimal convert overflow. Float is out of Decimal range";
|
||||
}
|
||||
} else {
|
||||
if (out <= std::numeric_limits<ToNativeType>::min() ||
|
||||
out >= std::numeric_limits<ToNativeType>::max()) {
|
||||
LOG(FATAL) << "Decimal convert overflow. Float is out of Decimal range";
|
||||
}
|
||||
if (out <= static_cast<FromFieldType>(std::numeric_limits<ToNativeType>::min()) ||
|
||||
out >= static_cast<FromFieldType>(std::numeric_limits<ToNativeType>::max())) {
|
||||
LOG(FATAL) << "Decimal convert overflow. Float is out of Decimal range";
|
||||
}
|
||||
return out;
|
||||
} else {
|
||||
|
||||
@ -65,12 +65,13 @@ public:
|
||||
|
||||
bool can_be_inside_low_cardinality() const override { return false; }
|
||||
|
||||
std::string to_string(const IColumn& column, size_t row_num) const { return "HLL()"; }
|
||||
std::string to_string(const IColumn& column, size_t row_num) const override { return "HLL()"; }
|
||||
void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
|
||||
|
||||
[[noreturn]] virtual Field get_default() const {
|
||||
virtual Field get_default() const override {
|
||||
LOG(FATAL) << "Method get_default() is not implemented for data type " << get_name();
|
||||
__builtin_unreachable();
|
||||
// unreachable
|
||||
return String();
|
||||
}
|
||||
|
||||
static void serialize_as_stream(const HyperLogLog& value, BufferWritable& buf);
|
||||
|
||||
@ -80,7 +80,7 @@ public:
|
||||
bool can_be_inside_low_cardinality() const override {
|
||||
return nested_data_type->can_be_inside_low_cardinality();
|
||||
}
|
||||
std::string to_string(const IColumn& column, size_t row_num) const;
|
||||
std::string to_string(const IColumn& column, size_t row_num) const override;
|
||||
|
||||
const DataTypePtr& get_nested_type() const { return nested_data_type; }
|
||||
|
||||
|
||||
@ -64,8 +64,8 @@ public:
|
||||
bool is_categorial() const override { return is_value_represented_by_integer(); }
|
||||
bool can_be_inside_low_cardinality() const override { return true; }
|
||||
|
||||
void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const;
|
||||
std::string to_string(const IColumn& column, size_t row_num) const;
|
||||
void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
|
||||
std::string to_string(const IColumn& column, size_t row_num) const override;
|
||||
};
|
||||
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -56,8 +56,8 @@ public:
|
||||
bool is_categorial() const override { return true; }
|
||||
bool can_be_inside_nullable() const override { return true; }
|
||||
bool can_be_inside_low_cardinality() const override { return true; }
|
||||
std::string to_string(const IColumn& column, size_t row_num) const;
|
||||
void to_string(const IColumn &column, size_t row_num, BufferWritable &ostr) const override;
|
||||
std::string to_string(const IColumn& column, size_t row_num) const override;
|
||||
void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
|
||||
};
|
||||
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -147,12 +147,12 @@ public:
|
||||
HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
|
||||
~HashJoinNode() override;
|
||||
|
||||
virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr);
|
||||
virtual Status prepare(RuntimeState* state);
|
||||
virtual Status open(RuntimeState* state);
|
||||
virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos);
|
||||
virtual Status get_next(RuntimeState* state, Block* block, bool* eos);
|
||||
virtual Status close(RuntimeState* state);
|
||||
virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
|
||||
virtual Status prepare(RuntimeState* state) override;
|
||||
virtual Status open(RuntimeState* state) override;
|
||||
virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
|
||||
virtual Status get_next(RuntimeState* state, Block* block, bool* eos) override;
|
||||
virtual Status close(RuntimeState* state) override;
|
||||
HashTableVariants& get_hash_table_variants() { return _hash_table_variants; }
|
||||
void init_join_op();
|
||||
|
||||
@ -240,13 +240,13 @@ private:
|
||||
void _hash_table_init();
|
||||
|
||||
template <class HashTableContext, bool ignore_null, bool build_unique>
|
||||
friend class ProcessHashTableBuild;
|
||||
friend struct ProcessHashTableBuild;
|
||||
|
||||
template <class HashTableContext, class JoinOpType, bool ignore_null>
|
||||
friend class ProcessHashTableProbe;
|
||||
friend struct ProcessHashTableProbe;
|
||||
|
||||
template <class HashTableContext>
|
||||
friend class ProcessRuntimeFilterBuild;
|
||||
friend struct ProcessRuntimeFilterBuild;
|
||||
|
||||
std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
|
||||
std::unordered_map<const Block*, std::vector<int>> _inserted_rows;
|
||||
|
||||
@ -59,7 +59,7 @@ struct AggregationMethodSerialized {
|
||||
using State = ColumnsHashing::HashMethodSerialized<typename Data::value_type, Mapped>;
|
||||
|
||||
static void insert_key_into_columns(const StringRef& key, MutableColumns& key_columns,
|
||||
const Sizes&) {
|
||||
const Sizes&) {
|
||||
auto pos = key.data;
|
||||
for (auto& column : key_columns) pos = column->deserialize_and_insert_from_arena(pos);
|
||||
}
|
||||
@ -77,10 +77,8 @@ using AggregatedDataWithStringKey = HashMapWithSavedHash<StringRef, AggregateDat
|
||||
|
||||
/// For the case where there is one numeric key.
|
||||
/// FieldType is UInt8/16/32/64 for any type with corresponding bit width.
|
||||
template <typename FieldType, typename TData,
|
||||
bool consecutive_keys_optimization = true>
|
||||
struct AggregationMethodOneNumber
|
||||
{
|
||||
template <typename FieldType, typename TData, bool consecutive_keys_optimization = true>
|
||||
struct AggregationMethodOneNumber {
|
||||
using Data = TData;
|
||||
using Key = typename Data::key_type;
|
||||
using Mapped = typename Data::mapped_type;
|
||||
@ -93,16 +91,17 @@ struct AggregationMethodOneNumber
|
||||
AggregationMethodOneNumber() = default;
|
||||
|
||||
template <typename Other>
|
||||
AggregationMethodOneNumber(const Other & other) : data(other.data) {}
|
||||
AggregationMethodOneNumber(const Other& other) : data(other.data) {}
|
||||
|
||||
/// To use one `Method` in different threads, use different `State`.
|
||||
using State = ColumnsHashing::HashMethodOneNumber<typename Data::value_type,
|
||||
Mapped, FieldType, consecutive_keys_optimization>;
|
||||
using State = ColumnsHashing::HashMethodOneNumber<typename Data::value_type, Mapped, FieldType,
|
||||
consecutive_keys_optimization>;
|
||||
|
||||
// Insert the key from the hash table into columns.
|
||||
static void insert_key_into_columns(const Key & key, MutableColumns & key_columns, const Sizes & /*key_sizes*/) {
|
||||
const auto * key_holder = reinterpret_cast<const char *>(&key);
|
||||
auto * column = static_cast<ColumnVectorHelper *>(key_columns[0].get());
|
||||
static void insert_key_into_columns(const Key& key, MutableColumns& key_columns,
|
||||
const Sizes& /*key_sizes*/) {
|
||||
const auto* key_holder = reinterpret_cast<const char*>(&key);
|
||||
auto* column = static_cast<ColumnVectorHelper*>(key_columns[0].get());
|
||||
column->insert_raw_data<sizeof(FieldType)>(key_holder);
|
||||
}
|
||||
|
||||
@ -118,7 +117,7 @@ template <typename Base>
|
||||
struct AggregationDataWithNullKey : public Base {
|
||||
using Base::Base;
|
||||
|
||||
bool & has_null_key_data() { return has_null_key; }
|
||||
bool& has_null_key_data() { return has_null_key; }
|
||||
AggregateDataPtr& get_null_key_data() { return null_key_data; }
|
||||
bool has_null_key_data() const { return has_null_key; }
|
||||
const AggregateDataPtr get_null_key_data() const { return null_key_data; }
|
||||
@ -155,30 +154,32 @@ struct AggregationMethodKeysFixed {
|
||||
AggregationMethodKeysFixed() {}
|
||||
|
||||
template <typename Other>
|
||||
AggregationMethodKeysFixed(const Other & other) : data(other.data) {}
|
||||
AggregationMethodKeysFixed(const Other& other) : data(other.data) {}
|
||||
|
||||
using State = ColumnsHashing::HashMethodKeysFixed<typename Data::value_type, Key, Mapped, has_nullable_keys>;
|
||||
using State = ColumnsHashing::HashMethodKeysFixed<typename Data::value_type, Key, Mapped,
|
||||
has_nullable_keys>;
|
||||
|
||||
static void insert_key_into_columns(const Key & key, MutableColumns & key_columns, const Sizes & key_sizes) {
|
||||
static void insert_key_into_columns(const Key& key, MutableColumns& key_columns,
|
||||
const Sizes& key_sizes) {
|
||||
size_t keys_size = key_columns.size();
|
||||
|
||||
static constexpr auto bitmap_size = has_nullable_keys ? std::tuple_size<KeysNullMap<Key>>::value : 0;
|
||||
static constexpr auto bitmap_size =
|
||||
has_nullable_keys ? std::tuple_size<KeysNullMap<Key>>::value : 0;
|
||||
/// In any hash key value, column values to be read start just after the bitmap, if it exists.
|
||||
size_t pos = bitmap_size;
|
||||
|
||||
for (size_t i = 0; i < keys_size; ++i) {
|
||||
IColumn * observed_column;
|
||||
ColumnUInt8 * null_map;
|
||||
IColumn* observed_column;
|
||||
ColumnUInt8* null_map;
|
||||
|
||||
bool column_nullable = false;
|
||||
if constexpr (has_nullable_keys)
|
||||
column_nullable = is_column_nullable(*key_columns[i]);
|
||||
if constexpr (has_nullable_keys) column_nullable = is_column_nullable(*key_columns[i]);
|
||||
|
||||
/// If we have a nullable column, get its nested column and its null map.
|
||||
if (column_nullable) {
|
||||
ColumnNullable & nullable_col = assert_cast<ColumnNullable &>(*key_columns[i]);
|
||||
ColumnNullable& nullable_col = assert_cast<ColumnNullable&>(*key_columns[i]);
|
||||
observed_column = &nullable_col.get_nested_column();
|
||||
null_map = assert_cast<ColumnUInt8 *>(&nullable_col.get_null_map_column());
|
||||
null_map = assert_cast<ColumnUInt8*>(&nullable_col.get_null_map_column());
|
||||
} else {
|
||||
observed_column = key_columns[i].get();
|
||||
null_map = nullptr;
|
||||
@ -190,7 +191,7 @@ struct AggregationMethodKeysFixed {
|
||||
/// corresponding key is nullable. Update the null map accordingly.
|
||||
size_t bucket = i / 8;
|
||||
size_t offset = i % 8;
|
||||
UInt8 val = (reinterpret_cast<const UInt8 *>(&key)[bucket] >> offset) & 1;
|
||||
UInt8 val = (reinterpret_cast<const UInt8*>(&key)[bucket] >> offset) & 1;
|
||||
null_map->insert_value(val);
|
||||
is_null = val == 1;
|
||||
}
|
||||
@ -199,7 +200,7 @@ struct AggregationMethodKeysFixed {
|
||||
observed_column->insert_default();
|
||||
else {
|
||||
size_t size = key_sizes[i];
|
||||
observed_column->insert_data(reinterpret_cast<const char *>(&key) + pos, size);
|
||||
observed_column->insert_data(reinterpret_cast<const char*>(&key) + pos, size);
|
||||
pos += size;
|
||||
}
|
||||
}
|
||||
@ -228,23 +229,24 @@ struct AggregationMethodSingleNullableColumn : public SingleColumnMethod {
|
||||
AggregationMethodSingleNullableColumn() = default;
|
||||
|
||||
template <typename Other>
|
||||
explicit AggregationMethodSingleNullableColumn(const Other & other) : Base(other) {}
|
||||
explicit AggregationMethodSingleNullableColumn(const Other& other) : Base(other) {}
|
||||
|
||||
using State = ColumnsHashing::HashMethodSingleLowNullableColumn<BaseState, Mapped, true>;
|
||||
|
||||
static void insert_key_into_columns(const Key & key,
|
||||
MutableColumns & key_columns, const Sizes & /*key_sizes*/) {
|
||||
static void insert_key_into_columns(const Key& key, MutableColumns& key_columns,
|
||||
const Sizes& /*key_sizes*/) {
|
||||
auto col = key_columns[0].get();
|
||||
|
||||
if constexpr (std::is_same_v<Key, StringRef>) {
|
||||
col->insert_data(key.data, key.size);
|
||||
} else {
|
||||
col->insert_data(reinterpret_cast<const char *>(&key), sizeof(key));
|
||||
col->insert_data(reinterpret_cast<const char*>(&key), sizeof(key));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
using AggregatedDataWithUInt8Key = FixedImplicitZeroHashMapWithCalculatedSize<UInt8, AggregateDataPtr>;
|
||||
using AggregatedDataWithUInt8Key =
|
||||
FixedImplicitZeroHashMapWithCalculatedSize<UInt8, AggregateDataPtr>;
|
||||
using AggregatedDataWithUInt16Key = FixedImplicitZeroHashMap<UInt16, AggregateDataPtr>;
|
||||
using AggregatedDataWithUInt32Key = HashMap<UInt32, AggregateDataPtr, HashCRC32<UInt32>>;
|
||||
using AggregatedDataWithUInt64Key = HashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>>;
|
||||
@ -255,25 +257,32 @@ using AggregatedDataWithNullableUInt8Key = AggregationDataWithNullKey<Aggregated
|
||||
using AggregatedDataWithNullableUInt16Key = AggregationDataWithNullKey<AggregatedDataWithUInt16Key>;
|
||||
using AggregatedDataWithNullableUInt32Key = AggregationDataWithNullKey<AggregatedDataWithUInt32Key>;
|
||||
using AggregatedDataWithNullableUInt64Key = AggregationDataWithNullKey<AggregatedDataWithUInt64Key>;
|
||||
using AggregatedDataWithNullableUInt128Key = AggregationDataWithNullKey<AggregatedDataWithUInt128Key>;
|
||||
using AggregatedDataWithNullableUInt128Key =
|
||||
AggregationDataWithNullKey<AggregatedDataWithUInt128Key>;
|
||||
|
||||
using AggregatedMethodVariants = std::variant<AggregationMethodSerialized<AggregatedDataWithStringKey>,
|
||||
AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key, false>,
|
||||
AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key, false>,
|
||||
AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt32Key>,
|
||||
AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>,
|
||||
AggregationMethodOneNumber<UInt128, AggregatedDataWithUInt128Key>,
|
||||
AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key, false>>,
|
||||
AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<UInt16, AggregatedDataWithNullableUInt16Key, false>>,
|
||||
AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt32Key>>,
|
||||
AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64Key>>,
|
||||
AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<UInt128, AggregatedDataWithNullableUInt128Key>>,
|
||||
AggregationMethodKeysFixed<AggregatedDataWithUInt64Key, false>,
|
||||
AggregationMethodKeysFixed<AggregatedDataWithUInt64Key, true>,
|
||||
AggregationMethodKeysFixed<AggregatedDataWithUInt128Key, false>,
|
||||
AggregationMethodKeysFixed<AggregatedDataWithUInt128Key, true>,
|
||||
AggregationMethodKeysFixed<AggregatedDataWithUInt256Key, false>,
|
||||
AggregationMethodKeysFixed<AggregatedDataWithUInt256Key, true>>;
|
||||
using AggregatedMethodVariants = std::variant<
|
||||
AggregationMethodSerialized<AggregatedDataWithStringKey>,
|
||||
AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key, false>,
|
||||
AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key, false>,
|
||||
AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt32Key>,
|
||||
AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>,
|
||||
AggregationMethodOneNumber<UInt128, AggregatedDataWithUInt128Key>,
|
||||
AggregationMethodSingleNullableColumn<
|
||||
AggregationMethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key, false>>,
|
||||
AggregationMethodSingleNullableColumn<
|
||||
AggregationMethodOneNumber<UInt16, AggregatedDataWithNullableUInt16Key, false>>,
|
||||
AggregationMethodSingleNullableColumn<
|
||||
AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt32Key>>,
|
||||
AggregationMethodSingleNullableColumn<
|
||||
AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64Key>>,
|
||||
AggregationMethodSingleNullableColumn<
|
||||
AggregationMethodOneNumber<UInt128, AggregatedDataWithNullableUInt128Key>>,
|
||||
AggregationMethodKeysFixed<AggregatedDataWithUInt64Key, false>,
|
||||
AggregationMethodKeysFixed<AggregatedDataWithUInt64Key, true>,
|
||||
AggregationMethodKeysFixed<AggregatedDataWithUInt128Key, false>,
|
||||
AggregationMethodKeysFixed<AggregatedDataWithUInt128Key, true>,
|
||||
AggregationMethodKeysFixed<AggregatedDataWithUInt256Key, false>,
|
||||
AggregationMethodKeysFixed<AggregatedDataWithUInt256Key, true>>;
|
||||
|
||||
struct AggregatedDataVariants {
|
||||
AggregatedDataVariants() = default;
|
||||
@ -305,62 +314,82 @@ struct AggregatedDataVariants {
|
||||
case Type::without_key:
|
||||
break;
|
||||
case Type::serialized:
|
||||
_aggregated_method_variant.emplace<AggregationMethodSerialized<AggregatedDataWithStringKey>>();
|
||||
_aggregated_method_variant
|
||||
.emplace<AggregationMethodSerialized<AggregatedDataWithStringKey>>();
|
||||
break;
|
||||
case Type::int8_key:
|
||||
if (is_nullable) {
|
||||
_aggregated_method_variant.emplace<AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key, false>>>();
|
||||
_aggregated_method_variant
|
||||
.emplace<AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
|
||||
UInt8, AggregatedDataWithNullableUInt8Key, false>>>();
|
||||
} else {
|
||||
_aggregated_method_variant.emplace<AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key, false>>();
|
||||
_aggregated_method_variant.emplace<
|
||||
AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key, false>>();
|
||||
}
|
||||
break;
|
||||
case Type::int16_key:
|
||||
if (is_nullable) {
|
||||
_aggregated_method_variant.emplace<AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<UInt16, AggregatedDataWithNullableUInt16Key, false>>>();
|
||||
_aggregated_method_variant
|
||||
.emplace<AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
|
||||
UInt16, AggregatedDataWithNullableUInt16Key, false>>>();
|
||||
} else {
|
||||
_aggregated_method_variant.emplace<AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key, false>>();
|
||||
_aggregated_method_variant.emplace<
|
||||
AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key, false>>();
|
||||
}
|
||||
break;
|
||||
case Type::int32_key:
|
||||
if (is_nullable) {
|
||||
_aggregated_method_variant.emplace<AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt32Key>>>();
|
||||
_aggregated_method_variant.emplace<AggregationMethodSingleNullableColumn<
|
||||
AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt32Key>>>();
|
||||
} else {
|
||||
_aggregated_method_variant.emplace<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt32Key>>();
|
||||
_aggregated_method_variant
|
||||
.emplace<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt32Key>>();
|
||||
}
|
||||
break;
|
||||
case Type::int64_key:
|
||||
if (is_nullable) {
|
||||
_aggregated_method_variant.emplace<AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64Key>>>();
|
||||
_aggregated_method_variant.emplace<AggregationMethodSingleNullableColumn<
|
||||
AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64Key>>>();
|
||||
} else {
|
||||
_aggregated_method_variant.emplace<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>>();
|
||||
_aggregated_method_variant
|
||||
.emplace<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>>();
|
||||
}
|
||||
break;
|
||||
case Type::int128_key:
|
||||
if (is_nullable) {
|
||||
_aggregated_method_variant.emplace<AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<UInt128, AggregatedDataWithNullableUInt128Key>>>();
|
||||
_aggregated_method_variant
|
||||
.emplace<AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
|
||||
UInt128, AggregatedDataWithNullableUInt128Key>>>();
|
||||
} else {
|
||||
_aggregated_method_variant.emplace<AggregationMethodOneNumber<UInt128, AggregatedDataWithUInt128Key>>();
|
||||
_aggregated_method_variant.emplace<
|
||||
AggregationMethodOneNumber<UInt128, AggregatedDataWithUInt128Key>>();
|
||||
}
|
||||
break;
|
||||
case Type::int64_keys:
|
||||
if (is_nullable) {
|
||||
_aggregated_method_variant.emplace<AggregationMethodKeysFixed<AggregatedDataWithUInt64Key, true>>();
|
||||
_aggregated_method_variant
|
||||
.emplace<AggregationMethodKeysFixed<AggregatedDataWithUInt64Key, true>>();
|
||||
} else {
|
||||
_aggregated_method_variant.emplace<AggregationMethodKeysFixed<AggregatedDataWithUInt64Key, false>>();
|
||||
_aggregated_method_variant
|
||||
.emplace<AggregationMethodKeysFixed<AggregatedDataWithUInt64Key, false>>();
|
||||
}
|
||||
break;
|
||||
case Type::int128_keys:
|
||||
if (is_nullable) {
|
||||
_aggregated_method_variant.emplace<AggregationMethodKeysFixed<AggregatedDataWithUInt128Key, true>>();
|
||||
_aggregated_method_variant
|
||||
.emplace<AggregationMethodKeysFixed<AggregatedDataWithUInt128Key, true>>();
|
||||
} else {
|
||||
_aggregated_method_variant.emplace<AggregationMethodKeysFixed<AggregatedDataWithUInt128Key, false>>();
|
||||
_aggregated_method_variant
|
||||
.emplace<AggregationMethodKeysFixed<AggregatedDataWithUInt128Key, false>>();
|
||||
}
|
||||
break;
|
||||
case Type::int256_keys:
|
||||
if (is_nullable) {
|
||||
_aggregated_method_variant.emplace<AggregationMethodKeysFixed<AggregatedDataWithUInt256Key, true>>();
|
||||
_aggregated_method_variant
|
||||
.emplace<AggregationMethodKeysFixed<AggregatedDataWithUInt256Key, true>>();
|
||||
} else {
|
||||
_aggregated_method_variant.emplace<AggregationMethodKeysFixed<AggregatedDataWithUInt256Key, false>>();
|
||||
_aggregated_method_variant
|
||||
.emplace<AggregationMethodKeysFixed<AggregatedDataWithUInt256Key, false>>();
|
||||
}
|
||||
break;
|
||||
default:
|
||||
@ -427,9 +456,6 @@ private:
|
||||
bool _should_expand_hash_table = true;
|
||||
std::vector<char*> _streaming_pre_places;
|
||||
|
||||
/// Expose the minimum reduction factor to continue growing the hash tables.
|
||||
RuntimeProfile::Counter* preagg_streaming_ht_min_reduction_;
|
||||
|
||||
private:
|
||||
/// Return true if we should keep expanding hash tables in the preagg. If false,
|
||||
/// the preagg should pass through any rows it can't fit in its tables.
|
||||
|
||||
@ -49,6 +49,7 @@ public:
|
||||
virtual Status close(RuntimeState* state);
|
||||
|
||||
protected:
|
||||
using ExecNode::debug_string;
|
||||
virtual std::string debug_string();
|
||||
|
||||
private:
|
||||
@ -72,7 +73,7 @@ private:
|
||||
Status _output_current_block(Block* block);
|
||||
BlockRowPos _get_partition_by_end();
|
||||
BlockRowPos _compare_row_to_find_end(int idx, BlockRowPos start, BlockRowPos end);
|
||||
|
||||
|
||||
Status _fetch_next_block_data(RuntimeState* state);
|
||||
Status _consumed_block_and_init_partition(RuntimeState* state, bool* next_partition, bool* eos);
|
||||
bool whether_need_next_partition(BlockRowPos found_partition_end);
|
||||
|
||||
@ -30,8 +30,8 @@ public:
|
||||
return Status::NotSupported("Not Implemented VAnalyticEvalNode::get_next.");
|
||||
}
|
||||
|
||||
virtual Status open(RuntimeState* state);
|
||||
virtual Status get_next(RuntimeState* state, Block* block, bool* eos);
|
||||
virtual Status open(RuntimeState* state) override;
|
||||
virtual Status get_next(RuntimeState* state, Block* block, bool* eos) override;
|
||||
|
||||
private:
|
||||
int64_t _desired_num_rows;
|
||||
@ -39,4 +39,4 @@ private:
|
||||
TAssertion::type _assertion;
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -27,7 +27,6 @@
|
||||
#include "gen_cpp/PlanNodes_types.h"
|
||||
#include "runtime/descriptors.h"
|
||||
#include "runtime/mem_pool.h"
|
||||
|
||||
#include "vec/core/block.h"
|
||||
#include "vec/exec/vblocking_join_node.h"
|
||||
|
||||
@ -40,18 +39,19 @@ namespace doris::vectorized {
|
||||
// the left child as necessary in get_next().
|
||||
class VCrossJoinNode final : public VBlockingJoinNode {
|
||||
public:
|
||||
VCrossJoinNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs);
|
||||
VCrossJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
|
||||
|
||||
Status prepare(RuntimeState *state) override;
|
||||
Status prepare(RuntimeState* state) override;
|
||||
|
||||
using VBlockingJoinNode::get_next;
|
||||
Status get_next(RuntimeState* state, Block* block, bool* eos) override;
|
||||
|
||||
Status close(RuntimeState *state) override;
|
||||
Status close(RuntimeState* state) override;
|
||||
|
||||
protected:
|
||||
void init_get_next(int first_left_row) override;
|
||||
|
||||
Status construct_build_side(RuntimeState *state) override;
|
||||
Status construct_build_side(RuntimeState* state) override;
|
||||
|
||||
private:
|
||||
// List of build blocks, constructed in prepare()
|
||||
@ -68,18 +68,18 @@ private:
|
||||
// if block can mem reuse, just clear data in block
|
||||
// else build a new block and alloc mem of column from left and right child block
|
||||
MutableColumns get_mutable_columns(Block* block);
|
||||
|
||||
|
||||
// Processes a block from the left child.
|
||||
// dst_columns: left_child_row and now_process_build_block to construct a bundle column of new block
|
||||
// now_process_build_block: right child block now to process
|
||||
void process_left_child_block(MutableColumns& dst_columns, const Block& now_process_build_block);
|
||||
void process_left_child_block(MutableColumns& dst_columns,
|
||||
const Block& now_process_build_block);
|
||||
|
||||
// Returns a debug string for _build_rows. This is used for debugging during the
|
||||
// build list construction and before doing the join.
|
||||
std::string build_list_debug_string();
|
||||
};
|
||||
|
||||
}
|
||||
} // namespace doris::vectorized
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
@ -21,15 +21,15 @@
|
||||
|
||||
namespace doris {
|
||||
namespace vectorized {
|
||||
/// Node that returns an empty result set, i.e., just sets eos_ in GetNext().
|
||||
/// Corresponds to EmptySetNode.java in the FE.
|
||||
class VEmptySetNode : public ExecNode {
|
||||
public:
|
||||
VEmptySetNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
|
||||
virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
|
||||
return Status::NotSupported("Not Implemented get RowBatch in vecorized execution.");
|
||||
}
|
||||
virtual Status get_next(RuntimeState* state, Block* block, bool* eos) override;
|
||||
};
|
||||
/// Node that returns an empty result set, i.e., just sets eos_ in GetNext().
|
||||
/// Corresponds to EmptySetNode.java in the FE.
|
||||
class VEmptySetNode : public ExecNode {
|
||||
public:
|
||||
VEmptySetNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
|
||||
virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override {
|
||||
return Status::NotSupported("Not Implemented get RowBatch in vecorized execution.");
|
||||
}
|
||||
virtual Status get_next(RuntimeState* state, Block* block, bool* eos) override;
|
||||
};
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
|
||||
@ -35,12 +35,13 @@ public:
|
||||
VEsHttpScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
|
||||
~VEsHttpScanNode();
|
||||
|
||||
virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos);
|
||||
using EsHttpScanNode::get_next;
|
||||
virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override;
|
||||
|
||||
virtual Status close(RuntimeState* state) override;
|
||||
|
||||
private:
|
||||
virtual Status scanner_scan(std::unique_ptr<VEsHttpScanner> scanner);
|
||||
virtual Status scanner_scan(std::unique_ptr<VEsHttpScanner> scanner) override;
|
||||
|
||||
std::deque<std::shared_ptr<vectorized::Block>> _block_queue;
|
||||
std::mutex _block_queue_lock;
|
||||
|
||||
@ -28,12 +28,13 @@ public:
|
||||
virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr);
|
||||
virtual Status prepare(RuntimeState* state);
|
||||
virtual Status open(RuntimeState* state);
|
||||
using VSetOperationNode::get_next;
|
||||
virtual Status get_next(RuntimeState* state, vectorized::Block* output_block, bool* eos);
|
||||
virtual Status close(RuntimeState* state);
|
||||
|
||||
private:
|
||||
template <class HashTableContext, bool is_intersected>
|
||||
friend class HashTableProbe;
|
||||
friend struct HashTableProbe;
|
||||
};
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
|
||||
@ -38,12 +38,13 @@ public:
|
||||
virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr);
|
||||
virtual Status prepare(RuntimeState* state);
|
||||
virtual Status open(RuntimeState* state);
|
||||
using VSetOperationNode::get_next;
|
||||
virtual Status get_next(RuntimeState* state, vectorized::Block* output_block, bool* eos);
|
||||
virtual Status close(RuntimeState* state);
|
||||
|
||||
private:
|
||||
template <class HashTableContext, bool is_intersected>
|
||||
friend class HashTableProbe;
|
||||
friend struct HashTableProbe;
|
||||
};
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
@ -37,6 +37,7 @@ public:
|
||||
VMysqlScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
|
||||
~VMysqlScanNode();
|
||||
|
||||
using MysqlScanNode::get_next;
|
||||
// Fill the next block by calling next() on the _mysql_scanner,
|
||||
// converting text data in MySQL cells to binary data.
|
||||
virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos);
|
||||
|
||||
@ -27,6 +27,7 @@ public:
|
||||
VOdbcScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
|
||||
~VOdbcScanNode();
|
||||
|
||||
using OdbcScanNode::get_next;
|
||||
Status get_next(RuntimeState* state, Block* block, bool* eos);
|
||||
};
|
||||
} // namespace vectorized
|
||||
|
||||
@ -35,6 +35,7 @@ public:
|
||||
|
||||
virtual Status prepare(RuntimeState* state) override;
|
||||
virtual Status open(RuntimeState* state) override;
|
||||
using RepeatNode::get_next;
|
||||
virtual Status get_next(RuntimeState* state, Block* block, bool* eos) override;
|
||||
virtual Status close(RuntimeState* state) override;
|
||||
|
||||
|
||||
@ -36,8 +36,8 @@ public:
|
||||
VSchemaScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
|
||||
~VSchemaScanNode();
|
||||
Status prepare(RuntimeState* state) override;
|
||||
|
||||
virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos);
|
||||
using SchemaScanNode::get_next;
|
||||
virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override;
|
||||
|
||||
private:
|
||||
Status write_slot_to_vectorized_column(void* slot, SlotDescriptor* slot_desc,
|
||||
|
||||
@ -90,9 +90,9 @@ protected:
|
||||
RuntimeProfile::Counter* _probe_timer; // time to probe
|
||||
|
||||
template <class HashTableContext>
|
||||
friend class HashTableBuild;
|
||||
friend struct HashTableBuild;
|
||||
template <class HashTableContext, bool is_intersected>
|
||||
friend class HashTableProbe;
|
||||
friend struct HashTableProbe;
|
||||
};
|
||||
|
||||
template <bool keep_matched>
|
||||
|
||||
@ -17,10 +17,9 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "exec/exec_node.h"
|
||||
|
||||
#include <queue>
|
||||
|
||||
#include "exec/exec_node.h"
|
||||
#include "vec/core/block.h"
|
||||
#include "vec/core/sort_cursor.h"
|
||||
#include "vec/exec/vsort_exec_exprs.h"
|
||||
@ -33,30 +32,30 @@ namespace doris::vectorized {
|
||||
// support spill to disk in the future
|
||||
class VSortNode : public doris::ExecNode {
|
||||
public:
|
||||
VSortNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs);
|
||||
VSortNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
|
||||
|
||||
~VSortNode() override = default;
|
||||
|
||||
virtual Status init(const TPlanNode &tnode, RuntimeState *state = nullptr);
|
||||
virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
|
||||
|
||||
virtual Status prepare(RuntimeState *state);
|
||||
virtual Status prepare(RuntimeState* state) override;
|
||||
|
||||
virtual Status open(RuntimeState *state);
|
||||
virtual Status open(RuntimeState* state) override;
|
||||
|
||||
virtual Status get_next(RuntimeState *state, RowBatch *row_batch, bool *eos);
|
||||
virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
|
||||
|
||||
virtual Status get_next(RuntimeState* state, Block* block, bool* eos);
|
||||
virtual Status get_next(RuntimeState* state, Block* block, bool* eos) override;
|
||||
|
||||
virtual Status reset(RuntimeState *state);
|
||||
virtual Status reset(RuntimeState* state) override;
|
||||
|
||||
virtual Status close(RuntimeState *state);
|
||||
virtual Status close(RuntimeState* state) override;
|
||||
|
||||
protected:
|
||||
virtual void debug_string(int indentation_level, std::stringstream *out) const;
|
||||
virtual void debug_string(int indentation_level, std::stringstream* out) const override;
|
||||
|
||||
private:
|
||||
// Fetch input rows and feed them to the sorter until the input is exhausted.
|
||||
Status sort_input(RuntimeState *state);
|
||||
Status sort_input(RuntimeState* state);
|
||||
|
||||
Status pretreat_block(Block& block);
|
||||
|
||||
@ -87,6 +86,4 @@ private:
|
||||
std::priority_queue<SortBlockCursor> _block_priority_queue;
|
||||
};
|
||||
|
||||
} // end namespace doris
|
||||
|
||||
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -26,12 +26,14 @@ class VCaseExpr final : public VExpr {
|
||||
public:
|
||||
VCaseExpr(const TExprNode& node);
|
||||
~VCaseExpr() = default;
|
||||
virtual Status execute(VExprContext* context, vectorized::Block* block, int* result_column_id);
|
||||
virtual Status prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context);
|
||||
virtual Status execute(VExprContext* context, vectorized::Block* block,
|
||||
int* result_column_id) override;
|
||||
virtual Status prepare(RuntimeState* state, const RowDescriptor& desc,
|
||||
VExprContext* context) override;
|
||||
virtual Status open(RuntimeState* state, VExprContext* context,
|
||||
FunctionContext::FunctionStateScope scope);
|
||||
FunctionContext::FunctionStateScope scope) override;
|
||||
virtual void close(RuntimeState* state, VExprContext* context,
|
||||
FunctionContext::FunctionStateScope scope);
|
||||
FunctionContext::FunctionStateScope scope) override;
|
||||
virtual VExpr* clone(ObjectPool* pool) const override {
|
||||
return pool->add(new VCaseExpr(*this));
|
||||
}
|
||||
|
||||
@ -20,18 +20,18 @@
|
||||
#include "vec/functions/function.h"
|
||||
|
||||
namespace doris::vectorized {
|
||||
class VCastExpr final: public VExpr {
|
||||
class VCastExpr final : public VExpr {
|
||||
public:
|
||||
VCastExpr(const TExprNode& node) : VExpr(node) {}
|
||||
~VCastExpr() = default;
|
||||
virtual doris::Status execute(VExprContext* context, doris::vectorized::Block* block,
|
||||
int* result_column_id);
|
||||
int* result_column_id) override;
|
||||
virtual doris::Status prepare(doris::RuntimeState* state, const doris::RowDescriptor& desc,
|
||||
VExprContext* context);
|
||||
VExprContext* context) override;
|
||||
virtual doris::Status open(doris::RuntimeState* state, VExprContext* context,
|
||||
FunctionContext::FunctionStateScope scope);
|
||||
FunctionContext::FunctionStateScope scope) override;
|
||||
virtual void close(doris::RuntimeState* state, VExprContext* context,
|
||||
FunctionContext::FunctionStateScope scope);
|
||||
FunctionContext::FunctionStateScope scope) override;
|
||||
virtual VExpr* clone(doris::ObjectPool* pool) const override {
|
||||
return pool->add(new VCastExpr(*this));
|
||||
}
|
||||
|
||||
@ -25,18 +25,18 @@ class VectorizedFnCall : public VExpr {
|
||||
public:
|
||||
VectorizedFnCall(const doris::TExprNode& node);
|
||||
virtual doris::Status execute(VExprContext* context, doris::vectorized::Block* block,
|
||||
int* result_column_id);
|
||||
int* result_column_id) override;
|
||||
virtual doris::Status prepare(doris::RuntimeState* state, const doris::RowDescriptor& desc,
|
||||
VExprContext* context);
|
||||
VExprContext* context) override;
|
||||
virtual doris::Status open(doris::RuntimeState* state, VExprContext* context,
|
||||
FunctionContext::FunctionStateScope scope);
|
||||
FunctionContext::FunctionStateScope scope) override;
|
||||
virtual void close(doris::RuntimeState* state, VExprContext* context,
|
||||
FunctionContext::FunctionStateScope scope);
|
||||
FunctionContext::FunctionStateScope scope) override;
|
||||
virtual VExpr* clone(doris::ObjectPool* pool) const override {
|
||||
return pool->add(new VectorizedFnCall(*this));
|
||||
}
|
||||
virtual const std::string& expr_name() const override;
|
||||
virtual std::string debug_string() const;
|
||||
virtual std::string debug_string() const override;
|
||||
static std::string debug_string(const std::vector<VectorizedFnCall*>& exprs);
|
||||
|
||||
private:
|
||||
|
||||
@ -18,23 +18,22 @@
|
||||
#pragma once
|
||||
|
||||
#include "exprs/hybrid_set.h"
|
||||
|
||||
#include "vec/exprs/vexpr.h"
|
||||
#include "vec/functions/function.h"
|
||||
|
||||
namespace doris::vectorized {
|
||||
class VInPredicate final: public VExpr {
|
||||
class VInPredicate final : public VExpr {
|
||||
public:
|
||||
VInPredicate(const TExprNode& node);
|
||||
~VInPredicate() = default;
|
||||
virtual doris::Status execute(VExprContext* context, doris::vectorized::Block* block,
|
||||
int* result_column_id);
|
||||
int* result_column_id) override;
|
||||
virtual doris::Status prepare(doris::RuntimeState* state, const doris::RowDescriptor& desc,
|
||||
VExprContext* context);
|
||||
VExprContext* context) override;
|
||||
virtual doris::Status open(doris::RuntimeState* state, VExprContext* context,
|
||||
FunctionContext::FunctionStateScope scope);
|
||||
FunctionContext::FunctionStateScope scope) override;
|
||||
virtual void close(doris::RuntimeState* state, VExprContext* context,
|
||||
FunctionContext::FunctionStateScope scope);
|
||||
FunctionContext::FunctionStateScope scope) override;
|
||||
virtual VExpr* clone(doris::ObjectPool* pool) const override {
|
||||
return pool->add(new VInPredicate(*this));
|
||||
}
|
||||
@ -46,6 +45,7 @@ private:
|
||||
|
||||
const bool _is_not_in;
|
||||
bool _is_prepare;
|
||||
|
||||
private:
|
||||
static const constexpr char* function_name = "in";
|
||||
};
|
||||
|
||||
@ -28,16 +28,16 @@ public:
|
||||
VSlotRef(const doris::TExprNode& node);
|
||||
VSlotRef(const SlotDescriptor* desc);
|
||||
virtual doris::Status execute(VExprContext* context, doris::vectorized::Block* block,
|
||||
int* result_column_id);
|
||||
int* result_column_id) override;
|
||||
virtual doris::Status prepare(doris::RuntimeState* state, const doris::RowDescriptor& desc,
|
||||
VExprContext* context);
|
||||
VExprContext* context) override;
|
||||
virtual VExpr* clone(doris::ObjectPool* pool) const override {
|
||||
return pool->add(new VSlotRef(*this));
|
||||
}
|
||||
|
||||
virtual const std::string& expr_name() const override;
|
||||
virtual std::string debug_string() const;
|
||||
virtual bool is_constant() const { return false; }
|
||||
virtual std::string debug_string() const override;
|
||||
virtual bool is_constant() const override { return false; }
|
||||
|
||||
private:
|
||||
FunctionPtr _function;
|
||||
|
||||
@ -374,9 +374,9 @@ struct ToNumberMonotonicity {
|
||||
Float64 right_float = right.get<Float64>();
|
||||
|
||||
if (left_float >= std::numeric_limits<T>::min() &&
|
||||
left_float <= std::numeric_limits<T>::max() &&
|
||||
left_float <= static_cast<Float64>(std::numeric_limits<T>::max()) &&
|
||||
right_float >= std::numeric_limits<T>::min() &&
|
||||
right_float <= std::numeric_limits<T>::max())
|
||||
right_float <= static_cast<Float64>(std::numeric_limits<T>::max()))
|
||||
return {true};
|
||||
|
||||
return {};
|
||||
|
||||
@ -495,8 +495,8 @@ void convert_to_block(Block& block, const PValues& result, size_t pos) {
|
||||
null_map_data[i] = false;
|
||||
}
|
||||
}
|
||||
block.replace_by_position(
|
||||
pos, std::move(ColumnNullable::create(std::move(data_col), std::move(null_col))));
|
||||
block.replace_by_position(pos,
|
||||
ColumnNullable::create(std::move(data_col), std::move(null_col)));
|
||||
} else {
|
||||
auto column = data_type->create_column();
|
||||
convert_to_column<false>(column, result);
|
||||
|
||||
@ -181,8 +181,6 @@ private:
|
||||
bool _skip_same;
|
||||
// used when `_merge == true`
|
||||
std::unique_ptr<MergeHeap> _heap;
|
||||
// used when `_merge == false`
|
||||
int _child_idx = 0;
|
||||
};
|
||||
|
||||
std::unique_ptr<LevelIterator> _inner_iter;
|
||||
|
||||
@ -89,7 +89,7 @@ public:
|
||||
|
||||
private:
|
||||
class SenderQueue;
|
||||
friend class ReceiveQueueSortCursorImpl;
|
||||
friend struct ReceiveQueueSortCursorImpl;
|
||||
|
||||
bool exceeds_limit(int batch_size) {
|
||||
return _num_buffered_bytes + batch_size > _total_buffer_limit;
|
||||
@ -140,7 +140,7 @@ public:
|
||||
|
||||
private:
|
||||
std::condition_variable _cv;
|
||||
};
|
||||
};
|
||||
|
||||
class VDataStreamRecvr::SenderQueue {
|
||||
public:
|
||||
|
||||
@ -28,7 +28,7 @@ class BufferControlBlock;
|
||||
class ExprContext;
|
||||
class ResultWriter;
|
||||
class MemTracker;
|
||||
class ResultFileOptions;
|
||||
struct ResultFileOptions;
|
||||
namespace vectorized {
|
||||
class VExprContext;
|
||||
|
||||
@ -58,7 +58,6 @@ private:
|
||||
// set file options when sink type is FILE
|
||||
std::unique_ptr<ResultFileOptions> _file_opts;
|
||||
|
||||
ObjectPool* _obj_pool;
|
||||
// Owned by the RuntimeState.
|
||||
const RowDescriptor& _row_desc;
|
||||
|
||||
|
||||
@ -37,8 +37,8 @@ class OlapTableSink;
|
||||
class VOlapTableSink : public OlapTableSink {
|
||||
public:
|
||||
// Construct from thrift struct which is generated by FE.
|
||||
VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc, const std::vector<TExpr>& texprs,
|
||||
Status* status);
|
||||
VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
|
||||
const std::vector<TExpr>& texprs, Status* status);
|
||||
|
||||
Status init(const TDataSink& sink) override;
|
||||
// TODO: unify the code of prepare/open/close with result sink
|
||||
@ -47,7 +47,7 @@ public:
|
||||
Status open(RuntimeState* state) override;
|
||||
|
||||
Status close(RuntimeState* state, Status close_status) override;
|
||||
|
||||
using OlapTableSink::send;
|
||||
Status send(RuntimeState* state, vectorized::Block* block) override;
|
||||
|
||||
private:
|
||||
@ -55,8 +55,8 @@ private:
|
||||
// return number of invalid/filtered rows.
|
||||
// invalid row number is set in Bitmap
|
||||
// set stop_processing is we want to stop the whole process now.
|
||||
Status _validate_data(RuntimeState* state, vectorized::Block* block, Bitmap* filter_bitmap, int* filtered_rows,
|
||||
bool* stop_processing);
|
||||
Status _validate_data(RuntimeState* state, vectorized::Block* block, Bitmap* filter_bitmap,
|
||||
int* filtered_rows, bool* stop_processing);
|
||||
|
||||
VOlapTablePartitionParam* _vpartition = nullptr;
|
||||
std::vector<vectorized::VExprContext*> _output_vexpr_ctxs;
|
||||
|
||||
Reference in New Issue
Block a user