[feature](storage-format) Z-Order Implement (#7149)

Support sort data by Z-Order:

```
CREATE TABLE table2 (
siteid int(11) NULL DEFAULT "10" COMMENT "",
citycode int(11) NULL COMMENT "",
username varchar(32) NULL DEFAULT "" COMMENT "",
pv bigint(20) NULL DEFAULT "0" COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(siteid, citycode)
COMMENT "OLAP"
DISTRIBUTED BY HASH(siteid) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"data_sort.sort_type" = "ZORDER",
"data_sort.col_num" = "2",
"in_memory" = "false",
"storage_format" = "V2"
);
```
This commit is contained in:
xinghuayu007
2021-12-02 11:39:51 +08:00
committed by GitHub
parent d8ba6e3eb6
commit dd36ccc3bf
32 changed files with 2025 additions and 51 deletions

View File

@ -56,6 +56,8 @@ OLAPStatus CollectIterator::add_child(RowsetReaderSharedPtr rs_reader) {
void CollectIterator::build_heap(const std::vector<RowsetReaderSharedPtr>& rs_readers) {
DCHECK(rs_readers.size() == _children.size());
_reverse = _reader->_tablet->tablet_schema().keys_type() == KeysType::UNIQUE_KEYS;
SortType sort_type = _reader->_tablet->tablet_schema().sort_type();
int sort_col_num = _reader->_tablet->tablet_schema().sort_col_num();
if (_children.empty()) {
_inner_iter.reset(nullptr);
return;
@ -86,18 +88,19 @@ void CollectIterator::build_heap(const std::vector<RowsetReaderSharedPtr>& rs_re
++i;
}
Level1Iterator* cumu_iter =
new Level1Iterator(cumu_children, cumu_children.size() > 1, _reverse, _reader->_sequence_col_idx);
new Level1Iterator(cumu_children, cumu_children.size() > 1, _reverse,
_reader->_sequence_col_idx, sort_type, sort_col_num);
cumu_iter->init();
_inner_iter.reset(new Level1Iterator(std::list<LevelIterator*>{*base_reader_child, cumu_iter}, _merge,
_reverse, _reader->_sequence_col_idx));
_reverse, _reader->_sequence_col_idx, sort_type, sort_col_num));
} else {
// _children.size() == 1
_inner_iter.reset(new Level1Iterator(_children, _merge,
_reverse, _reader->_sequence_col_idx));
_reverse, _reader->_sequence_col_idx, sort_type, sort_col_num));
}
} else {
_inner_iter.reset(new Level1Iterator(_children, _merge,
_reverse, _reader->_sequence_col_idx));
_reverse, _reader->_sequence_col_idx, sort_type, sort_col_num));
}
_inner_iter->init();
// Clear _children earlier to release any related references
@ -132,6 +135,34 @@ bool CollectIterator::LevelIteratorComparator::operator()(const LevelIterator* a
return a->version() > b->version();
}
CollectIterator::BaseComparator::BaseComparator(
std::shared_ptr<LevelIteratorComparator>& cmp) {
_cmp = cmp;
}
bool CollectIterator::BaseComparator::operator()(const LevelIterator* a, const LevelIterator* b) {
return _cmp->operator()(a, b);
}
bool CollectIterator::LevelZorderIteratorComparator::operator()(const LevelIterator* a,
const LevelIterator* b) {
// First compare row cursor.
const RowCursor* first = a->current_row();
const RowCursor* second = b->current_row();
int cmp_res = _comparator.compare_row(*first, *second);
if (cmp_res != 0) {
return cmp_res > 0;
}
// if row cursors equal, compare data version.
// read data from higher version to lower version.
// for UNIQUE_KEYS just read the highest version and no need agg_update.
// for AGG_KEYS if a version is deleted, the lower version no need to agg_update
if (_reverse) {
return a->version() < b->version();
}
return a->version() > b->version();
}
const RowCursor* CollectIterator::current_row(bool* delete_flag) const {
if (LIKELY(_inner_iter)) {
return _inner_iter->current_row(delete_flag);
@ -233,8 +264,11 @@ OLAPStatus CollectIterator::Level0Iterator::next(const RowCursor** row, bool* de
}
CollectIterator::Level1Iterator::Level1Iterator(
std::list<CollectIterator::LevelIterator*> children, bool merge, bool reverse, int sequence_id_idx)
: _children(std::move(children)), _merge(merge), _reverse(reverse), _sequence_id_idx(sequence_id_idx) {}
const std::list<CollectIterator::LevelIterator*>& children,
bool merge, bool reverse, int sequence_id_idx,
SortType sort_type, int sort_col_num)
: _children(children), _merge(merge), _reverse(reverse),
_sort_type(sort_type), _sort_col_num(sort_col_num) {}
CollectIterator::LevelIterator::~LevelIterator() = default;
@ -304,7 +338,14 @@ OLAPStatus CollectIterator::Level1Iterator::init() {
// Only when there are multiple children that need to be merged
if (_merge && _children.size() > 1) {
_heap.reset(new MergeHeap(LevelIteratorComparator(_reverse, _sequence_id_idx)));
std::shared_ptr<LevelIteratorComparator> cmp;
if (_sort_type == SortType::ZORDER) {
cmp = std::make_shared<LevelZorderIteratorComparator>(_reverse, _sequence_id_idx, _sort_col_num);
} else {
cmp = std::make_shared<LevelIteratorComparator>(_reverse, _sequence_id_idx);
}
BaseComparator bcmp(cmp);
_heap.reset(new MergeHeap(bcmp));
for (auto child : _children) {
DCHECK(child != nullptr);
DCHECK(child->current_row() != nullptr);

View File

@ -20,6 +20,7 @@
#include "olap/olap_define.h"
#include "olap/row_cursor.h"
#include "olap/rowset/rowset_reader.h"
#include "util/tuple_row_zorder_compare.h"
namespace doris {
@ -74,15 +75,39 @@ private:
public:
LevelIteratorComparator(const bool reverse = false, int sequence_id_idx = -1) :
_reverse(reverse), _sequence_id_idx(sequence_id_idx) {}
bool operator()(const LevelIterator* a, const LevelIterator* b);
virtual bool operator()(const LevelIterator* a, const LevelIterator* b);
private:
bool _reverse;
int _sequence_id_idx;
};
class LevelZorderIteratorComparator: public LevelIteratorComparator {
public:
LevelZorderIteratorComparator(const bool reverse = false, int sequence_id_idx = -1, const size_t sort_col_num = 0) :
_reverse(reverse), _sequence_id_idx(sequence_id_idx), _sort_col_num(sort_col_num) {
_comparator = TupleRowZOrderComparator(sort_col_num);
}
virtual bool operator()(const LevelIterator* a, const LevelIterator* b);
private:
bool _reverse = false;
int _sequence_id_idx;
size_t _sort_col_num = 0;
TupleRowZOrderComparator _comparator;
};
class BaseComparator {
public:
BaseComparator(std::shared_ptr<LevelIteratorComparator>& cmp);
bool operator()(const LevelIterator* a, const LevelIterator* b);
private:
std::shared_ptr<LevelIteratorComparator> _cmp;
};
typedef std::priority_queue<LevelIterator*, std::vector<LevelIterator*>,
LevelIteratorComparator>
BaseComparator>
MergeHeap;
// Iterate from rowset reader. This Iterator usually like a leaf node
class Level0Iterator : public LevelIterator {
@ -118,7 +143,9 @@ private:
// Iterate from LevelIterators (maybe Level0Iterators or Level1Iterator or mixed)
class Level1Iterator : public LevelIterator {
public:
Level1Iterator(std::list<LevelIterator*>, bool, bool, int);
Level1Iterator(const std::list<LevelIterator*>& children, bool merge, bool reverse,
int sequence_id_idx, SortType sort_type, int sort_col_num);
OLAPStatus init() override;
@ -157,6 +184,8 @@ private:
// used when `_merge == false`
int _child_idx = 0;
int _sequence_id_idx = -1;
SortType _sort_type;
int _sort_col_num;
};
std::unique_ptr<LevelIterator> _inner_iter;

View File

@ -39,14 +39,19 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet
_tuple_desc(tuple_desc),
_slot_descs(slot_descs),
_keys_type(keys_type),
_row_comparator(_schema),
_mem_tracker(MemTracker::CreateTracker(-1, "MemTable", parent_tracker)),
_buffer_mem_pool(new MemPool(_mem_tracker.get())),
_table_mem_pool(new MemPool(_mem_tracker.get())),
_schema_size(_schema->schema_size()),
_skip_list(new Table(_row_comparator, _table_mem_pool.get(),
_keys_type == KeysType::DUP_KEYS)),
_rowset_writer(rowset_writer) {}
_rowset_writer(rowset_writer) {
if (tablet_schema->sort_type() == SortType::ZORDER) {
_row_comparator = std::make_shared<TupleRowZOrderComparator>(_schema, tablet_schema->sort_col_num());
} else {
_row_comparator = std::make_shared<RowCursorComparator>(_schema);
}
_skip_list = new Table(_row_comparator.get(), _table_mem_pool.get(), _keys_type == KeysType::DUP_KEYS);
}
MemTable::~MemTable() {
delete _skip_list;

View File

@ -24,6 +24,7 @@
#include "olap/olap_define.h"
#include "olap/skiplist.h"
#include "runtime/mem_tracker.h"
#include "util/tuple_row_zorder_compare.h"
namespace doris {
@ -53,16 +54,17 @@ public:
int64_t flush_size() const { return _flush_size; }
private:
class RowCursorComparator {
class RowCursorComparator: public RowComparator {
public:
RowCursorComparator(const Schema* schema);
int operator()(const char* left, const char* right) const;
virtual int operator()(const char* left, const char* right) const;
private:
const Schema* _schema;
};
typedef SkipList<char*, RowCursorComparator> Table;
private:
typedef SkipList<char*, RowComparator> Table;
typedef Table::key_type TableKey;
public:
@ -95,7 +97,7 @@ private:
const std::vector<SlotDescriptor*>* _slot_descs;
KeysType _keys_type;
RowCursorComparator _row_comparator;
std::shared_ptr<RowComparator> _row_comparator;
std::shared_ptr<MemTracker> _mem_tracker;
// This is a buffer, to hold the memory referenced by the rows that have not
// been inserted into the SkipList

View File

@ -239,6 +239,8 @@ Status ColumnReader::_get_filtered_pages(CondColumn* cond_column, CondColumn* de
}
}
}
VLOG(1) << "total-pages: " << page_size << " not-filtered-pages: " << page_indexes->size()
<< " filtered-percent:" << 1.0 - (page_indexes->size()*1.0)/(page_size*1.0);
return Status::OK();
}

View File

@ -128,7 +128,10 @@ Status SegmentIterator::_init() {
_row_bitmap.addRange(0, _segment->num_rows());
RETURN_IF_ERROR(_init_return_column_iterators());
RETURN_IF_ERROR(_init_bitmap_index_iterators());
RETURN_IF_ERROR(_get_row_ranges_by_keys());
// z-order can not use prefix index
if (_segment->_tablet_schema->sort_type() != SortType::ZORDER) {
RETURN_IF_ERROR(_get_row_ranges_by_keys());
}
RETURN_IF_ERROR(_get_row_ranges_by_column_conditions());
_init_lazy_materialization();
_range_iter.reset(new BitmapRangeIterator(_row_bitmap));

View File

@ -67,7 +67,7 @@ public:
// and will allocate memory using "*mem_pool".
// NOTE: Objects allocated in the mem_pool must remain allocated for
// the lifetime of the skiplist object.
explicit SkipList(Comparator cmp, MemPool* mem_pool, bool can_dup);
explicit SkipList(Comparator* cmp, MemPool* mem_pool, bool can_dup);
// Insert key into the list.
void Insert(const Key& key, bool* overwritten);
@ -121,7 +121,7 @@ public:
private:
// Immutable after construction
Comparator const compare_;
Comparator* const compare_;
// When value is true, means indicates that duplicate values are allowed.
bool _can_dup;
MemPool* const _mem_pool; // MemPool used for allocations of nodes
@ -139,7 +139,7 @@ private:
Node* NewNode(const Key& key, int height);
int RandomHeight();
bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); }
bool Equal(const Key& a, const Key& b) const { return ((*compare_)(a, b) == 0); }
// Return true if key is greater than the data stored in "n"
bool KeyIsAfterNode(const Key& key, Node* n) const;
@ -277,7 +277,7 @@ int SkipList<Key, Comparator>::RandomHeight() {
template <typename Key, class Comparator>
bool SkipList<Key, Comparator>::KeyIsAfterNode(const Key& key, Node* n) const {
// nullptr n is considered infinite
return (n != nullptr) && (compare_(n->key, key) < 0);
return (n != nullptr) && ((*compare_)(n->key, key) < 0);
}
template <typename Key, class Comparator>
@ -308,9 +308,9 @@ typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::FindLessTha
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
DCHECK(x == head_ || compare_(x->key, key) < 0);
DCHECK(x == head_ || (*compare_)(x->key, key) < 0);
Node* next = x->Next(level);
if (next == nullptr || compare_(next->key, key) >= 0) {
if (next == nullptr || (*compare_)(next->key, key) >= 0) {
if (level == 0) {
return x;
} else {
@ -343,7 +343,7 @@ typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::FindLast()
}
template <typename Key, class Comparator>
SkipList<Key, Comparator>::SkipList(Comparator cmp, MemPool* mem_pool, bool can_dup)
SkipList<Key, Comparator>::SkipList(Comparator* cmp, MemPool* mem_pool, bool can_dup)
: compare_(cmp),
_can_dup(can_dup),
_mem_pool(mem_pool),

View File

@ -82,6 +82,8 @@ public:
// properties encapsulated in TabletSchema
inline KeysType keys_type() const;
inline SortType sort_type() const;
inline size_t sort_col_num() const;
inline size_t num_columns() const;
inline size_t num_null_columns() const;
inline size_t num_key_columns() const;
@ -394,6 +396,14 @@ inline KeysType Tablet::keys_type() const {
return _schema.keys_type();
}
inline SortType Tablet::sort_type() const {
return _schema.sort_type();
}
inline size_t Tablet::sort_col_num() const {
return _schema.sort_col_num();
}
inline size_t Tablet::num_columns() const {
return _schema.num_columns();
}

View File

@ -86,6 +86,15 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
break;
}
schema->set_compress_kind(COMPRESS_LZ4);
switch(tablet_schema.sort_type) {
case TSortType::type::ZORDER:
schema->set_sort_type(SortType::ZORDER);
break;
default:
schema->set_sort_type(SortType::LEXICAL);
}
schema->set_sort_col_num(tablet_schema.sort_col_num);
tablet_meta_pb.set_in_restore_mode(false);
// set column information

View File

@ -422,6 +422,8 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema) {
_is_in_memory = schema.is_in_memory();
_delete_sign_idx = schema.delete_sign_idx();
_sequence_col_idx = schema.sequence_col_idx();
_sort_type = schema.sort_type();
_sort_col_num = schema.sort_col_num();
}
void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_meta_pb) {
@ -440,6 +442,8 @@ void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_meta_pb) {
tablet_meta_pb->set_is_in_memory(_is_in_memory);
tablet_meta_pb->set_delete_sign_idx(_delete_sign_idx);
tablet_meta_pb->set_sequence_col_idx(_sequence_col_idx);
tablet_meta_pb->set_sort_type(_sort_type);
tablet_meta_pb->set_sort_col_num(_sort_col_num);
}
uint32_t TabletSchema::mem_size() const {

View File

@ -130,6 +130,8 @@ public:
inline size_t num_short_key_columns() const { return _num_short_key_columns; }
inline size_t num_rows_per_row_block() const { return _num_rows_per_row_block; }
inline KeysType keys_type() const { return _keys_type; }
inline SortType sort_type() const { return _sort_type; }
inline size_t sort_col_num() const { return _sort_col_num; }
inline CompressKind compress_kind() const { return _compress_kind; }
inline size_t next_column_unique_id() const { return _next_column_unique_id; }
inline double bloom_filter_fpp() const { return _bf_fpp; }
@ -149,6 +151,8 @@ private:
private:
KeysType _keys_type = DUP_KEYS;
SortType _sort_type = SortType::LEXICAL;
size_t _sort_col_num = 0;
std::vector<TabletColumn> _cols;
std::unordered_map<std::string, int32_t> _field_name_to_index;
size_t _num_columns = 0;

View File

@ -106,6 +106,7 @@ set(UTIL_FILES
s3_storage_backend.cpp
s3_util.cpp
topn_counter.cpp
tuple_row_zorder_compare.cpp
)
if (WITH_MYSQL)

View File

@ -24,6 +24,12 @@
#include "gutil/bits.h"
#include "gutil/port.h"
#include "util/cpu_info.h"
#ifdef __aarch64__
#include "sse2neon.h"
#else
#include <emmintrin.h>
#include <immintrin.h>
#endif
namespace doris {
@ -328,6 +334,116 @@ public:
if (PREDICT_FALSE(num_bits >= 64)) return 0;
return v >> num_bits;
}
static void ByteSwapScalar(void *dest, const void *source, int len) {
uint8_t *dst = reinterpret_cast<uint8_t *>(dest);
const uint8_t *src = reinterpret_cast<const uint8_t *>(source);
switch (len) {
case 1:
*reinterpret_cast<uint8_t *>(dst) = *reinterpret_cast<const uint8_t *>(src);
return;
case 2:
*reinterpret_cast<uint16_t *>(dst) =
BitUtil::byte_swap(*reinterpret_cast<const uint16_t *>(src));
return;
case 3:
*reinterpret_cast<uint16_t *>(dst + 1) =
BitUtil::byte_swap(*reinterpret_cast<const uint16_t *>(src));
*reinterpret_cast<uint8_t *>(dst) = *reinterpret_cast<const uint8_t *>(src + 2);
return;
case 4:
*reinterpret_cast<uint32_t *>(dst) =
BitUtil::byte_swap(*reinterpret_cast<const uint32_t *>(src));
return;
case 5:
*reinterpret_cast<uint32_t *>(dst + 1) =
BitUtil::byte_swap(*reinterpret_cast<const uint32_t *>(src));
*reinterpret_cast<uint8_t *>(dst) = *reinterpret_cast<const uint8_t *>(src + 4);
return;
case 6:
*reinterpret_cast<uint32_t *>(dst + 2) =
BitUtil::byte_swap(*reinterpret_cast<const uint32_t *>(src));
*reinterpret_cast<uint16_t *>(dst) =
BitUtil::byte_swap(*reinterpret_cast<const uint16_t *>(src + 4));
return;
case 7:
*reinterpret_cast<uint32_t *>(dst + 3) =
BitUtil::byte_swap(*reinterpret_cast<const uint32_t *>(src));
*reinterpret_cast<uint16_t *>(dst + 1) =
BitUtil::byte_swap(*reinterpret_cast<const uint16_t *>(src + 4));
*reinterpret_cast<uint8_t *>(dst) = *reinterpret_cast<const uint8_t *>(src + 6);
return;
case 8:
*reinterpret_cast<uint64_t *>(dst) =
BitUtil::byte_swap(*reinterpret_cast<const uint64_t *>(src));
return;
case 9:
*reinterpret_cast<uint64_t *>(dst + 1) =
BitUtil::byte_swap(*reinterpret_cast<const uint64_t *>(src));
*reinterpret_cast<uint8_t *>(dst) = *reinterpret_cast<const uint8_t *>(src + 8);
return;
case 10:
*reinterpret_cast<uint64_t *>(dst + 2) =
BitUtil::byte_swap(*reinterpret_cast<const uint64_t *>(src));
*reinterpret_cast<uint16_t *>(dst) =
BitUtil::byte_swap(*reinterpret_cast<const uint16_t *>(src + 8));
return;
case 11:
*reinterpret_cast<uint64_t *>(dst + 3) =
BitUtil::byte_swap(*reinterpret_cast<const uint64_t *>(src));
*reinterpret_cast<uint16_t *>(dst + 1) =
BitUtil::byte_swap(*reinterpret_cast<const uint16_t *>(src + 8));
*reinterpret_cast<uint8_t *>(dst) = *reinterpret_cast<const uint8_t *>(src + 10);
return;
case 12:
*reinterpret_cast<uint64_t *>(dst + 4) =
BitUtil::byte_swap(*reinterpret_cast<const uint64_t *>(src));
*reinterpret_cast<uint32_t *>(dst) =
BitUtil::byte_swap(*reinterpret_cast<const uint32_t *>(src + 8));
return;
case 13:
*reinterpret_cast<uint64_t *>(dst + 5) =
BitUtil::byte_swap(*reinterpret_cast<const uint64_t *>(src));
*reinterpret_cast<uint32_t *>(dst + 1) =
BitUtil::byte_swap(*reinterpret_cast<const uint32_t *>(src + 8));
*reinterpret_cast<uint8_t *>(dst) = *reinterpret_cast<const uint8_t *>(src + 12);
return;
case 14:
*reinterpret_cast<uint64_t *>(dst + 6) =
BitUtil::byte_swap(*reinterpret_cast<const uint64_t *>(src));
*reinterpret_cast<uint32_t *>(dst + 2) =
BitUtil::byte_swap(*reinterpret_cast<const uint32_t *>(src + 8));
*reinterpret_cast<uint16_t *>(dst) =
BitUtil::byte_swap(*reinterpret_cast<const uint16_t *>(src + 12));
return;
case 15:
*reinterpret_cast<uint64_t *>(dst + 7) =
BitUtil::byte_swap(*reinterpret_cast<const uint64_t *>(src));
*reinterpret_cast<uint32_t *>(dst + 3) =
BitUtil::byte_swap(*reinterpret_cast<const uint32_t *>(src + 8));
*reinterpret_cast<uint16_t *>(dst + 1) =
BitUtil::byte_swap(*reinterpret_cast<const uint16_t *>(src + 12));
*reinterpret_cast<uint8_t *>(dst) = *reinterpret_cast<const uint8_t *>(src + 14);
return;
case 16:
*reinterpret_cast<uint64_t *>(dst + 8) =
BitUtil::byte_swap(*reinterpret_cast<const uint64_t *>(src));
*reinterpret_cast<uint64_t *>(dst) =
BitUtil::byte_swap(*reinterpret_cast<const uint64_t *>(src + 8));
return;
default:
// Revert to slow loop-based swap.
ByteSwapScalarLoop(source, len, dest);
return;
}
}
static void ByteSwapScalarLoop(const void *src, int len, void *dst) {
//TODO: improve the performance of following code further using BSWAP intrinsic
uint8_t *d = reinterpret_cast<uint8_t *>(dst);
const uint8_t *s = reinterpret_cast<const uint8_t *>(src);
for (int i = 0; i < len; ++i) d[i] = s[len - i - 1];
}
};
} // namespace doris

View File

@ -0,0 +1,233 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "util/tuple_row_zorder_compare.h"
namespace doris {
RowComparator::RowComparator(Schema* schema) {
}
int RowComparator::operator()(const char* left, const char* right) const {
return -1;
}
TupleRowZOrderComparator::TupleRowZOrderComparator() {
_schema = nullptr;
_sort_col_num = 0;
}
TupleRowZOrderComparator::TupleRowZOrderComparator(int sort_col_num) {
_schema = nullptr;
_sort_col_num = sort_col_num;
}
TupleRowZOrderComparator::TupleRowZOrderComparator(Schema* schema, int sort_col_num)
:_schema(schema), _sort_col_num(sort_col_num) {
_max_col_size = get_type_byte_size(_schema->column(0)->type());
for (size_t i = 1; i < _sort_col_num; ++i) {
if (_max_col_size < get_type_byte_size(_schema->column(i)->type())) {
_max_col_size = get_type_byte_size(_schema->column(i)->type());
}
}
}
int TupleRowZOrderComparator::compare(const char* lhs, const char* rhs) const {
ContiguousRow lhs_row(_schema, lhs);
ContiguousRow rhs_row(_schema, rhs);
if (_max_col_size <= 4) {
return compare_based_on_size<uint32_t, ContiguousRow>(lhs_row, rhs_row);
} else if (_max_col_size <= 8) {
return compare_based_on_size<uint64_t, ContiguousRow>(lhs_row, rhs_row);
} else {
return compare_based_on_size<uint128_t, ContiguousRow>(lhs_row, rhs_row);
}
}
void TupleRowZOrderComparator::max_col_size(const RowCursor& rc) {
_max_col_size = get_type_byte_size(rc.schema()->column(0)->type());
for (size_t i = 1; i < _sort_col_num; ++i) {
if (_max_col_size < get_type_byte_size(rc.schema()->column(i)->type())) {
_max_col_size = get_type_byte_size(rc.schema()->column(i)->type());
}
}
}
int TupleRowZOrderComparator::compare_row(const RowCursor& lhs, const RowCursor& rhs) {
max_col_size(lhs);
if (_max_col_size <= 4) {
return compare_based_on_size<uint32_t, const RowCursor>(lhs, rhs);
} else if (_max_col_size <= 8) {
return compare_based_on_size<uint64_t, const RowCursor>(lhs, lhs);
} else {
return compare_based_on_size<uint128_t, const RowCursor>(lhs, lhs);
}
}
template<typename U, typename LhsRowType>
int TupleRowZOrderComparator::compare_based_on_size(LhsRowType& lhs, LhsRowType& rhs) const {
auto less_msb = [](U x, U y) { return x < y && x < (x ^ y); };
FieldType type = lhs.schema()->column(0)->type();
U msd_lhs = get_shared_representation<U>(lhs.cell(0).is_null() ? nullptr : lhs.cell(0).cell_ptr(),
type);
U msd_rhs = get_shared_representation<U>(rhs.cell(0).is_null() ? nullptr : rhs.cell(0).cell_ptr(),
type);
for (int i = 1; i < _sort_col_num; ++i) {
type = lhs.schema()->column(i)->type();
const void *lhs_v = lhs.cell(i).is_null() ? nullptr : lhs.cell(i).cell_ptr();
const void *rhs_v = rhs.cell(i).is_null() ? nullptr : rhs.cell(i).cell_ptr();
U lhsi = get_shared_representation<U>(lhs_v, type);
U rhsi = get_shared_representation<U>(rhs_v, type);
if (less_msb(msd_lhs ^ msd_rhs, lhsi ^ rhsi)) {
msd_lhs = lhsi;
msd_rhs = rhsi;
}
}
return msd_lhs < msd_rhs ? -1 : (msd_lhs > msd_rhs ? 1 : 0);
}
template<typename U>
U TupleRowZOrderComparator::get_shared_representation(const void *val, FieldType type) const {
// The mask used for setting the sign bit correctly.
if (val == NULL) return 0;
constexpr U mask = (U) 1 << (sizeof(U) * 8 - 1);
switch (type) {
case FieldType::OLAP_FIELD_TYPE_NONE:
return 0;
case FieldType::OLAP_FIELD_TYPE_BOOL:
return static_cast<U>(*reinterpret_cast<const bool *>(val)) << (sizeof(U) * 8 - 1);
case FieldType::OLAP_FIELD_TYPE_UNSIGNED_TINYINT:
case FieldType::OLAP_FIELD_TYPE_TINYINT:
return get_shared_int_representation<U, int8_t>(
*reinterpret_cast<const int8_t *>(val), mask);
case FieldType::OLAP_FIELD_TYPE_SMALLINT:
case FieldType::OLAP_FIELD_TYPE_UNSIGNED_SMALLINT:
return get_shared_int_representation<U, int16_t>(
*reinterpret_cast<const int16_t *>(val), mask);
case FieldType::OLAP_FIELD_TYPE_INT:
return get_shared_int_representation<U, int32_t>(
*reinterpret_cast<const int32_t *>(val), mask);
case FieldType::OLAP_FIELD_TYPE_DATETIME:
case FieldType::OLAP_FIELD_TYPE_DATE:
case FieldType::OLAP_FIELD_TYPE_BIGINT:
case FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT:
return get_shared_int_representation<U, int64_t>(
*reinterpret_cast<const int64_t *>(val), mask);
case FieldType::OLAP_FIELD_TYPE_LARGEINT:
return static_cast<U>(*reinterpret_cast<const int128_t *>(val)) ^ mask;
case FieldType::OLAP_FIELD_TYPE_FLOAT:
return get_shared_float_representation<U, float>(val, mask);
case FieldType::OLAP_FIELD_TYPE_DISCRETE_DOUBLE:
case FieldType::OLAP_FIELD_TYPE_DOUBLE:
return get_shared_float_representation<U, double>(val, mask);
case FieldType::OLAP_FIELD_TYPE_CHAR:
case FieldType::OLAP_FIELD_TYPE_VARCHAR:{
const StringValue *string_value = reinterpret_cast<const StringValue *>(val);
return get_shared_string_representation<U>(string_value->ptr, string_value->len);
}
case FieldType::OLAP_FIELD_TYPE_DECIMAL: {
decimal12_t decimal_val = *reinterpret_cast<const decimal12_t*>(val);
int128_t value = decimal_val.integer*DecimalV2Value::ONE_BILLION + decimal_val.fraction;
return static_cast<U>(value) ^ mask;
}
default:
return 0;
}
}
template<typename U, typename T>
U inline TupleRowZOrderComparator::get_shared_int_representation(const T val, U mask) const {
uint64_t shift_size = static_cast<uint64_t>(
std::max(static_cast<int64_t>((sizeof(U) - sizeof(T)) * 8), (int64_t) 0));
return (static_cast<U>(val) << shift_size) ^ mask;
}
template<typename U, typename T>
U inline TupleRowZOrderComparator::get_shared_float_representation(const void *val, U mask) const {
int64_t tmp;
T floating_value = *reinterpret_cast<const T *>(val);
memcpy(&tmp, &floating_value, sizeof(T));
if (UNLIKELY(std::isnan(floating_value))) return 0;
if (floating_value < 0.0) {
// Flipping all bits for negative values.
return static_cast<U>(~tmp) << std::max((sizeof(U) - sizeof(T)) * 8, (uint64_t) 0);
} else {
// Flipping only first bit.
return (static_cast<U>(tmp) << std::max((sizeof(U) - sizeof(T)) * 8, (uint64_t) 0)) ^
mask;
}
}
template<typename U>
U inline TupleRowZOrderComparator::get_shared_string_representation(const char *char_ptr,
int length) const {
int len = length < sizeof(U) ? length : sizeof(U);
if (len == 0) return 0;
U dst = 0;
// We copy the bytes from the string but swap the bytes because of integer endianness.
BitUtil::ByteSwapScalar(&dst, char_ptr, len);
return dst << ((sizeof(U) - len) * 8);
}
int TupleRowZOrderComparator::operator()(const char* lhs, const char* rhs) const {
int result = compare(lhs, rhs);
return result;
}
int TupleRowZOrderComparator::get_type_byte_size(FieldType type) const {
switch (type) {
case FieldType::OLAP_FIELD_TYPE_OBJECT:
case FieldType::OLAP_FIELD_TYPE_HLL:
case FieldType::OLAP_FIELD_TYPE_STRUCT:
case FieldType::OLAP_FIELD_TYPE_ARRAY:
case FieldType::OLAP_FIELD_TYPE_MAP:
case FieldType::OLAP_FIELD_TYPE_CHAR:
case FieldType::OLAP_FIELD_TYPE_VARCHAR:
return 0;
case FieldType::OLAP_FIELD_TYPE_NONE:
case FieldType::OLAP_FIELD_TYPE_BOOL:
case FieldType::OLAP_FIELD_TYPE_UNSIGNED_TINYINT:
case FieldType::OLAP_FIELD_TYPE_TINYINT:
return 1;
case FieldType::OLAP_FIELD_TYPE_SMALLINT:
case FieldType::OLAP_FIELD_TYPE_UNSIGNED_SMALLINT:
return 2;
case FieldType::OLAP_FIELD_TYPE_FLOAT:
case FieldType::OLAP_FIELD_TYPE_INT:
case FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT:
return 4;
case FieldType::OLAP_FIELD_TYPE_DISCRETE_DOUBLE:
case FieldType::OLAP_FIELD_TYPE_DOUBLE:
case FieldType::OLAP_FIELD_TYPE_BIGINT:
case FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT:
return 8;
case FieldType::OLAP_FIELD_TYPE_DECIMAL:
case FieldType::OLAP_FIELD_TYPE_LARGEINT:
case FieldType::OLAP_FIELD_TYPE_DATETIME:
case FieldType::OLAP_FIELD_TYPE_DATE:
return 16;
case FieldType::OLAP_FIELD_TYPE_UNKNOWN:
DCHECK(false);
break;
default:
DCHECK(false);
}
return -1;
}
}

View File

@ -0,0 +1,70 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef DORIS_TUPLE_ROW_ZORDER_COMPARE_H
#define DORIS_TUPLE_ROW_ZORDER_COMPARE_H
#include "exec/sort_exec_exprs.h"
#include "exprs/expr.h"
#include "exprs/expr_context.h"
#include "runtime/descriptors.h"
#include "runtime/raw_value.h"
#include "runtime/tuple.h"
#include "runtime/tuple_row.h"
#include "olap/schema.h"
#include "olap/row.h"
#include "olap/row_cursor.h"
namespace doris {
class RowComparator {
public:
RowComparator()=default;
RowComparator(Schema* schema);
virtual int operator()(const char* left, const char* right) const;
};
class TupleRowZOrderComparator: public RowComparator {
private:
typedef __uint128_t uint128_t;
int _max_col_size = 0;
const Schema* _schema;
int _sort_col_num = 0;
public:
TupleRowZOrderComparator();
TupleRowZOrderComparator(int sort_col_num);
TupleRowZOrderComparator(Schema* schema, int sort_col_num);
int compare(const char* lhs, const char* rhs) const;
void max_col_size(const RowCursor& rc);
int compare_row(const RowCursor& lhs, const RowCursor& rhs);
template<typename U, typename LhsRowType>
int compare_based_on_size(LhsRowType& lhs, LhsRowType& rhs) const;
template<typename U>
U get_shared_representation(const void *val, FieldType type) const;
template<typename U, typename T>
U inline get_shared_int_representation(const T val, U mask) const;
template<typename U, typename T>
U inline get_shared_float_representation(const void *val, U mask) const;
template<typename U>
U inline get_shared_string_representation(const char *char_ptr, int length) const;
virtual int operator()(const char* lhs, const char* rhs) const;
int get_type_byte_size(FieldType type) const;
};
}
#endif //DORIS_TUPLE_ROW_ZORDER_COMPARE_H

View File

@ -55,7 +55,7 @@ TEST_F(SkipTest, Empty) {
std::shared_ptr<MemTracker> tracker(new MemTracker(-1));
std::unique_ptr<MemPool> mem_pool(new MemPool(tracker.get()));
TestComparator cmp;
TestComparator* cmp = new TestComparator();
SkipList<Key, TestComparator> list(cmp, mem_pool.get(), false);
ASSERT_TRUE(!list.Contains(10));
@ -67,6 +67,7 @@ TEST_F(SkipTest, Empty) {
ASSERT_TRUE(!iter.Valid());
iter.SeekToLast();
ASSERT_TRUE(!iter.Valid());
delete cmp;
}
TEST_F(SkipTest, InsertAndLookup) {
@ -77,7 +78,7 @@ TEST_F(SkipTest, InsertAndLookup) {
const int R = 5000;
Random rnd(1000);
std::set<Key> keys;
TestComparator cmp;
TestComparator* cmp = new TestComparator();
SkipList<Key, TestComparator> list(cmp, mem_pool.get(), false);
for (int i = 0; i < N; i++) {
Key key = rnd.Next() % R;
@ -147,6 +148,7 @@ TEST_F(SkipTest, InsertAndLookup) {
}
ASSERT_TRUE(!iter.Valid());
}
delete cmp;
}
// Only non-DUP model will use Find() and InsertWithHint().
@ -158,7 +160,7 @@ TEST_F(SkipTest, InsertWithHintNoneDupModel) {
const int R = 5000;
Random rnd(1000);
std::set<Key> keys;
TestComparator cmp;
TestComparator* cmp = new TestComparator();
SkipList<Key, TestComparator> list(cmp, mem_pool.get(), false);
SkipList<Key, TestComparator>::Hint hint;
for (int i = 0; i < N; i++) {
@ -179,6 +181,7 @@ TEST_F(SkipTest, InsertWithHintNoneDupModel) {
ASSERT_EQ(keys.count(i), 0);
}
}
delete cmp;
}
// We want to make sure that with a single writer and multiple
@ -259,7 +262,7 @@ private:
std::shared_ptr<MemTracker> _mem_tracker;
std::unique_ptr<MemPool> _mem_pool;
std::shared_ptr<TestComparator> _comparator;
// SkipList is not protected by _mu. We just use a single writer
// thread to modify it.
SkipList<Key, TestComparator> _list;
@ -268,8 +271,9 @@ public:
ConcurrentTest()
: _mem_tracker(new MemTracker(-1)),
_mem_pool(new MemPool(_mem_tracker.get())),
_list(TestComparator(), _mem_pool.get(), false) {}
_comparator(new TestComparator()),
_list(_comparator.get(), _mem_pool.get(), false) {}
// REQUIRES: External synchronization
void write_step(Random* rnd) {
const uint32_t k = rnd->Next() % K;

View File

@ -51,7 +51,9 @@
"next_column_unique_id": 3,
"is_in_memory": false,
"delete_sign_idx": -1,
"sequence_col_idx": -1
"sequence_col_idx": -1,
"sort_type": "LEXICAL",
"sort_col_num": 0
},
"rs_metas": [
{

View File

@ -74,5 +74,6 @@ ADD_BE_TEST(broker_storage_backend_test)
ADD_BE_TEST(sort_heap_test)
ADD_BE_TEST(counts_test)
ADD_BE_TEST(date_func_test)
ADD_BE_TEST(tuple_row_zorder_compare_test)
target_link_libraries(Test_util Common Util Gutil ${Boost_LIBRARIES} glog gflags fmt protobuf)

File diff suppressed because it is too large Load Diff

View File

@ -297,6 +297,13 @@ distribution_info
* `dynamic_partition.history_partition_num`: Specify the number of historical partitions to be created.
* `dynamic_partition.reserved_history_periods`: Used to specify the range of reserved history periods.
* Data Sort Info
The relevant parameters of data sort info are as follows:
* `data_sort.sort_type`: the method of data sorting, options: z-order/lexical, default is lexical
* `data_sort.col_num`: the first few columns to sort, col_num muster less than total key counts
### Example
1. Create a detailed model table

View File

@ -298,6 +298,12 @@ distribution_info
* `dynamic_partition.history_partition_num`: 指定创建历史分区的数量。
* `dynamic_partition.reserved_history_periods`: 用于指定保留的历史分区的时间段。
* 数据排序相关
数据排序相关参数如下:
* `data_sort.sort_type`: 数据排序使用的方法,目前支持两种:lexical/z-order,默认是lexical
* `data_sort.col_num`: 数据排序使用的列数,取最前面几列,不能超过总的key 列数
### Example
1. 创建一个明细模型的表

View File

@ -66,6 +66,7 @@ import org.apache.doris.persist.ModifyTableEngineOperationLog;
import org.apache.doris.persist.ReplaceTableOperationLog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TOdbcTableType;
import org.apache.doris.thrift.TSortType;
import org.apache.doris.thrift.TTabletType;
import com.google.common.base.Preconditions;
@ -126,6 +127,10 @@ public class Alter {
private boolean processAlterOlapTable(AlterTableStmt stmt, OlapTable olapTable, List<AlterClause> alterClauses,
final String clusterName, Database db) throws UserException {
if (olapTable.getDataSortInfo() != null
&& olapTable.getDataSortInfo().getSortType() == TSortType.ZORDER) {
throw new UserException("z-order table can not support schema change!");
}
stmt.rewriteAlterClause(olapTable);
// check conflict alter ops first

View File

@ -0,0 +1,103 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import com.google.gson.annotations.SerializedName;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TSortType;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
public class DataSortInfo implements Writable {
public static final String DATA_SORT_PROPERTY_PREFIX = "data_sort";
public static final String DATA_SORT_TYPE = "data_sort.sort_type";
public static final String DATA_SORT_COL_NUM = "data_sort.col_num";
@SerializedName(value = "sort_type")
private TSortType sortType;
@SerializedName(value = "col_num")
private int colNum;
public DataSortInfo() {
}
public DataSortInfo(Map<String, String> properties) {
if (properties != null && !properties.isEmpty()) {
if (properties.get(DATA_SORT_TYPE).equalsIgnoreCase("ZORDER")) {
this.sortType = TSortType.ZORDER;
} else {
this.sortType = TSortType.LEXICAL;
}
this.colNum = Integer.parseInt(properties.get(DATA_SORT_COL_NUM));
}
}
public DataSortInfo (TSortType sortType, int colNum) {
this.sortType = sortType;
this.colNum = colNum;
}
public TSortType getSortType() {
return sortType;
}
public void setSortType(TSortType sortType) {
this.sortType = sortType;
}
public int getColNum() {
return colNum;
}
public void setColNum(int colNum) {
this.colNum = colNum;
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
public static DataSortInfo read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, DataSortInfo.class);
}
public boolean equals(DataSortInfo dataSortInfo) {
if (this.sortType != dataSortInfo.sortType) {
return false;
}
if (this.colNum != dataSortInfo.colNum) {
return false;
}
return true;
}
public String toSql() {
String res = ",\n\"" + DATA_SORT_TYPE + "\" = \"" + this.sortType + "\"" +
",\n\"" + DATA_SORT_COL_NUM + "\" = \"" + this.colNum + "\"";
return res;
}
}

View File

@ -93,6 +93,7 @@ import org.apache.doris.analysis.TypeDef;
import org.apache.doris.analysis.UninstallPluginStmt;
import org.apache.doris.analysis.UserDesc;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.analysis.DataSortInfo;
import org.apache.doris.backup.BackupHandler;
import org.apache.doris.blockrule.SqlBlockRuleMgr;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
@ -3254,7 +3255,6 @@ public class Catalog {
// create partition outside db lock
DataProperty dataProperty = singlePartitionDesc.getPartitionDataProperty();
Preconditions.checkNotNull(dataProperty);
// check replica quota if this operation done
long indexNum = indexIdToMeta.size();
long bucketNum = distributionInfo.getBucketNum();
@ -3281,7 +3281,8 @@ public class Catalog {
tabletIdSet, olapTable.getCopiedIndexes(),
singlePartitionDesc.isInMemory(),
olapTable.getStorageFormat(),
singlePartitionDesc.getTabletType()
singlePartitionDesc.getTabletType(),
olapTable.getDataSortInfo()
);
// check again
@ -3512,7 +3513,8 @@ public class Catalog {
List<Index> indexes,
boolean isInMemory,
TStorageFormat storageFormat,
TTabletType tabletType) throws DdlException {
TTabletType tabletType,
DataSortInfo dataSortInfo)throws DdlException {
// create base index first.
Preconditions.checkArgument(baseIndexId != -1);
MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId, IndexState.NORMAL);
@ -3579,7 +3581,8 @@ public class Catalog {
countDownLatch,
indexes,
isInMemory,
tabletType);
tabletType,
dataSortInfo);
task.setStorageFormat(storageFormat);
batchTask.addTask(task);
// add to AgentTaskQueue for handling finish report.
@ -3688,6 +3691,20 @@ public class Catalog {
// this should be done before create partition.
Map<String, String> properties = stmt.getProperties();
// get storage format
TStorageFormat storageFormat = TStorageFormat.V2; // default is segment v2
try {
storageFormat = PropertyAnalyzer.analyzeStorageFormat(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
olapTable.setStorageFormat(storageFormat);
// check data sort properties
DataSortInfo dataSortInfo = PropertyAnalyzer.analyzeDataSortInfo(properties, keysType,
keysDesc.keysColumnSize(), storageFormat);
olapTable.setDataSortInfo(dataSortInfo);
// analyze bloom filter columns
Set<String> bfColumns = null;
double bfFpp = 0;
@ -3828,15 +3845,6 @@ public class Catalog {
}
Preconditions.checkNotNull(versionInfo);
// get storage format
TStorageFormat storageFormat = TStorageFormat.V2; // default is segment v2
try {
storageFormat = PropertyAnalyzer.analyzeStorageFormat(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
olapTable.setStorageFormat(storageFormat);
// a set to record every new tablet created when create table
// if failed in any step, use this set to do clear things
Set<Long> tabletIdSet = new HashSet<>();
@ -3868,7 +3876,7 @@ public class Catalog {
partitionInfo.getReplicaAllocation(partitionId),
versionInfo, bfColumns, bfFpp,
tabletIdSet, olapTable.getCopiedIndexes(),
isInMemory, storageFormat, tabletType);
isInMemory, storageFormat, tabletType, olapTable.getDataSortInfo());
olapTable.addPartition(partition);
} else if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) {
try {
@ -3918,7 +3926,7 @@ public class Catalog {
versionInfo, bfColumns, bfFpp,
tabletIdSet, olapTable.getCopiedIndexes(),
isInMemory, storageFormat,
partitionInfo.getTabletType(entry.getValue()));
partitionInfo.getTabletType(entry.getValue()), olapTable.getDataSortInfo());
olapTable.addPartition(partition);
}
} else {
@ -4235,6 +4243,11 @@ public class Catalog {
sb.append(olapTable.getTableProperty().getDynamicPartitionProperty().getProperties(replicaAlloc));
}
// only display z-order sort info
if (olapTable.isZOrderSort()) {
sb.append(olapTable.getDataSortInfo().toSql());
}
// in memory
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_INMEMORY).append("\" = \"");
sb.append(olapTable.isInMemory()).append("\"");
@ -6761,7 +6774,8 @@ public class Catalog {
copiedTbl.getCopiedIndexes(),
copiedTbl.isInMemory(),
copiedTbl.getStorageFormat(),
copiedTbl.getPartitionInfo().getTabletType(oldPartitionId));
copiedTbl.getPartitionInfo().getTabletType(oldPartitionId),
copiedTbl.getDataSortInfo());
newPartitions.add(newPartition);
}
} catch (DdlException e) {

View File

@ -24,6 +24,7 @@ import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.DataSortInfo;
import org.apache.doris.backup.Status;
import org.apache.doris.backup.Status.ErrCode;
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
@ -54,6 +55,7 @@ import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import org.apache.doris.thrift.TSortType;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@ -197,6 +199,12 @@ public class OlapTable extends Table {
&& tableProperty.getDynamicPartitionProperty().isExist();
}
public boolean isZOrderSort() {
return tableProperty != null
&& tableProperty.getDataSortInfo() != null
&& tableProperty.getDataSortInfo().getSortType() == TSortType.ZORDER;
}
public void setBaseIndexId(long baseIndexId) {
this.baseIndexId = baseIndexId;
}
@ -1263,7 +1271,6 @@ public class OlapTable extends Table {
tempPartitions.unsetPartitionInfo();
}
}
// In the present, the fullSchema could be rebuilt by schema change while the properties is changed by MV.
// After that, some properties of fullSchema and nameToColumn may be not same as properties of base columns.
// So, here we need to rebuild the fullSchema to ensure the correctness of the properties.
@ -1569,6 +1576,14 @@ public class OlapTable extends Table {
tableProperty.buildInMemory();
}
public void setDataSortInfo(DataSortInfo dataSortInfo) {
if (tableProperty == null) {
tableProperty = new TableProperty(new HashMap<>());
}
tableProperty.modifyDataSortInfoProperties(dataSortInfo);
tableProperty.buildDataSortInfo();
}
// return true if partition with given name already exist, both in partitions and temp partitions.
// return false otherwise
public boolean checkPartitionNameExist(String partitionName) {
@ -1714,6 +1729,13 @@ public class OlapTable extends Table {
return tableProperty.getStorageFormat();
}
public DataSortInfo getDataSortInfo() {
if (tableProperty == null) {
return new DataSortInfo(TSortType.LEXICAL, this.getKeysNum());
}
return tableProperty.getDataSortInfo();
}
// For non partitioned table:
// The table's distribute hash columns need to be a subset of the aggregate columns.
//

View File

@ -17,6 +17,7 @@
package org.apache.doris.catalog;
import org.apache.doris.analysis.DataSortInfo;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@ -69,6 +70,8 @@ public class TableProperty implements Writable {
*/
private TStorageFormat storageFormat = TStorageFormat.DEFAULT;
private DataSortInfo dataSortInfo = new DataSortInfo();
public TableProperty(Map<String, String> properties) {
this.properties = properties;
}
@ -127,6 +130,17 @@ public class TableProperty implements Writable {
return this;
}
public TableProperty buildDataSortInfo() {
HashMap<String, String> dataSortInfoProperties = new HashMap<>();
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (entry.getKey().startsWith(DataSortInfo.DATA_SORT_PROPERTY_PREFIX)) {
dataSortInfoProperties.put(entry.getKey(), entry.getValue());
}
}
dataSortInfo = new DataSortInfo(dataSortInfoProperties);
return this;
}
public TableProperty buildStorageFormat() {
storageFormat = TStorageFormat.valueOf(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT,
TStorageFormat.DEFAULT.name()));
@ -137,6 +151,11 @@ public class TableProperty implements Writable {
properties.putAll(modifyProperties);
}
public void modifyDataSortInfoProperties(DataSortInfo dataSortInfo) {
properties.put(DataSortInfo.DATA_SORT_TYPE, String.valueOf(dataSortInfo.getSortType()));
properties.put(DataSortInfo.DATA_SORT_COL_NUM, String.valueOf(dataSortInfo.getColNum()));
}
public void setReplicaAlloc(ReplicaAllocation replicaAlloc) {
this.replicaAlloc = replicaAlloc;
// set it to "properties" so that this info can be persisted
@ -178,6 +197,10 @@ public class TableProperty implements Writable {
return storageFormat;
}
public DataSortInfo getDataSortInfo() {
return dataSortInfo;
}
public void buildReplicaAllocation() {
try {
// Must copy the properties because "analyzeReplicaAllocation" with remove the property
@ -200,7 +223,8 @@ public class TableProperty implements Writable {
TableProperty tableProperty = GsonUtils.GSON.fromJson(Text.readString(in), TableProperty.class)
.executeBuildDynamicProperty()
.buildInMemory()
.buildStorageFormat();
.buildStorageFormat()
.buildDataSortInfo();
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_105) {
// get replica num from property map and create replica allocation
String repNum = tableProperty.properties.remove(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM);

View File

@ -17,6 +17,7 @@
package org.apache.doris.common.util;
import org.apache.doris.analysis.DataSortInfo;
import org.apache.doris.analysis.DateLiteral;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DataProperty;
@ -34,6 +35,7 @@ import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
import org.apache.doris.thrift.TTabletType;
import org.apache.doris.thrift.TSortType;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@ -534,4 +536,43 @@ public class PropertyAnalyzer {
}
return replicaAlloc;
}
public static DataSortInfo analyzeDataSortInfo(Map<String, String> properties, KeysType keyType,
int keyCount, TStorageFormat storageFormat) throws AnalysisException {
if (properties == null || properties.isEmpty()) {
return new DataSortInfo(TSortType.LEXICAL, keyCount);
}
String sortMethod = TSortType.LEXICAL.name();
if (properties.containsKey(DataSortInfo.DATA_SORT_TYPE)) {
sortMethod = properties.remove(DataSortInfo.DATA_SORT_TYPE);
}
TSortType sortType = TSortType.LEXICAL;
if (sortMethod.equalsIgnoreCase(TSortType.ZORDER.name())) {
sortType = TSortType.ZORDER;
} else if (sortMethod.equalsIgnoreCase(TSortType.LEXICAL.name())) {
sortType = TSortType.LEXICAL;
} else {
throw new AnalysisException("only support zorder/lexical method!");
}
if (keyType != KeysType.DUP_KEYS && sortType == TSortType.ZORDER) {
throw new AnalysisException("only duplicate key supports zorder method!");
}
if (storageFormat != TStorageFormat.V2 && sortType == TSortType.ZORDER) {
throw new AnalysisException("only V2 storage format supports zorder method!");
}
int colNum = keyCount;
if (properties.containsKey(DataSortInfo.DATA_SORT_COL_NUM)) {
try {
colNum = Integer.valueOf(properties.remove(DataSortInfo.DATA_SORT_COL_NUM));
} catch (Exception e) {
throw new AnalysisException("param " + DataSortInfo.DATA_SORT_COL_NUM + " error");
}
}
if (sortType == TSortType.ZORDER && (colNum <= 1 || colNum > keyCount)) {
throw new AnalysisException("z-order needs 2 columns at least, " + keyCount + " columns at most!");
}
DataSortInfo dataSortInfo = new DataSortInfo(sortType, colNum);
return dataSortInfo;
}
}

View File

@ -18,6 +18,7 @@
package org.apache.doris.task;
import org.apache.doris.alter.SchemaChangeHandler;
import org.apache.doris.analysis.DataSortInfo;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.KeysType;
@ -82,6 +83,8 @@ public class CreateReplicaTask extends AgentTask {
// true if this task is created by recover request(See comment of Config.recover_with_empty_tablet)
private boolean isRecoverTask = false;
private DataSortInfo dataSortInfo;
public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId,
short shortKeyColumnCount, int schemaHash, long version, long versionHash,
KeysType keysType, TStorageType storageType,
@ -114,6 +117,40 @@ public class CreateReplicaTask extends AgentTask {
this.tabletType = tabletType;
}
public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId,
short shortKeyColumnCount, int schemaHash, long version, long versionHash,
KeysType keysType, TStorageType storageType,
TStorageMedium storageMedium, List<Column> columns,
Set<String> bfColumns, double bfFpp, MarkedCountDownLatch<Long, Long> latch,
List<Index> indexes,
boolean isInMemory,
TTabletType tabletType,
DataSortInfo dataSortInfo) {
super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId);
this.shortKeyColumnCount = shortKeyColumnCount;
this.schemaHash = schemaHash;
this.version = version;
this.versionHash = versionHash;
this.keysType = keysType;
this.storageType = storageType;
this.storageMedium = storageMedium;
this.columns = columns;
this.bfColumns = bfColumns;
this.indexes = indexes;
this.bfFpp = bfFpp;
this.latch = latch;
this.isInMemory = isInMemory;
this.tabletType = tabletType;
this.dataSortInfo = dataSortInfo;
}
public void setIsRecoverTask(boolean isRecoverTask) {
this.isRecoverTask = isRecoverTask;
}
@ -165,6 +202,10 @@ public class CreateReplicaTask extends AgentTask {
tSchema.setSchemaHash(schemaHash);
tSchema.setKeysType(keysType.toThrift());
tSchema.setStorageType(storageType);
if (dataSortInfo != null) {
tSchema.setSortType(dataSortInfo.getSortType());
tSchema.setSortColNum(dataSortInfo.getColNum());
}
int deleteSign = -1;
int sequenceCol = -1;
List<TColumn> tColumns = new ArrayList<TColumn>();

View File

@ -478,4 +478,45 @@ public class CreateTableTest {
");"));
}
@Test
public void testZOrderTable() {
// create lexically sort table
ExceptionChecker.expectThrowsNoException(() -> createTable(
"create table test.zorder_tbl1\n" + "(k1 varchar(40), k2 int, k3 int)\n" + "duplicate key(k1, k2, k3)\n"
+ "partition by range(k2)\n" + "(partition p1 values less than(\"10\"))\n"
+ "distributed by hash(k1) buckets 1\n" + "properties('replication_num' = '1'," +
" 'data_sort.sort_type' = 'lexical');"));
// create z-order sort table, default col_num
ExceptionChecker.expectThrowsNoException(() -> createTable(
"create table test.zorder_tbl2\n" + "(k1 varchar(40), k2 int, k3 int)\n" + "duplicate key(k1, k2, k3)\n"
+ "partition by range(k2)\n" + "(partition p1 values less than(\"10\"))\n"
+ "distributed by hash(k1) buckets 1\n" + "properties('replication_num' = '1'," +
" 'data_sort.sort_type' = 'zorder');"));
// create z-order sort table, define sort_col_num
ExceptionChecker.expectThrowsNoException(() -> createTable(
"create table test.zorder_tbl3\n" + "(k1 varchar(40), k2 int, k3 int)\n" + "duplicate key(k1, k2, k3)\n"
+ "partition by range(k2)\n" + "(partition p1 values less than(\"10\"))\n"
+ "distributed by hash(k1) buckets 1\n" + "properties('replication_num' = '1'," +
" 'data_sort.sort_type' = 'zorder'," +
" 'data_sort.col_num' = '2');"));
// create z-order sort table, only 1 sort column
ExceptionChecker
.expectThrowsWithMsg(AnalysisException.class, "z-order needs 2 columns at least, 3 columns at most",
() -> createTable("create table test.zorder_tbl4\n" + "(k1 varchar(40), k2 int, k3 int)\n" + "duplicate key(k1, k2, k3)\n"
+ "partition by range(k2)\n" + "(partition p1 values less than(\"10\"))\n"
+ "distributed by hash(k1) buckets 1\n" + "properties('replication_num' = '1'," +
" 'data_sort.sort_type' = 'zorder'," +
" 'data_sort.col_num' = '1');"));
// create z-order sort table, sort column is empty
ExceptionChecker
.expectThrowsWithMsg(AnalysisException.class, "param data_sort.col_num error",
() -> createTable("create table test.zorder_tbl4\n" + "(k1 varchar(40), k2 int, k3 int)\n" + "duplicate key(k1, k2, k3)\n"
+ "partition by range(k2)\n" + "(partition p1 values less than(\"10\"))\n"
+ "distributed by hash(k1) buckets 1\n" + "properties('replication_num' = '1'," +
" 'data_sort.sort_type' = 'zorder'," +
" 'data_sort.col_num' = '');"));
}
}

View File

@ -262,6 +262,11 @@ message ColumnPB {
repeated string children_column_names = 18;
}
enum SortType {
LEXICAL = 0;
ZORDER = 1;
}
message TabletSchemaPB {
optional KeysType keys_type = 1; // OLAPHeaderMessage.keys_type
repeated ColumnPB column = 2; // OLAPHeaderMessage.column
@ -273,6 +278,8 @@ message TabletSchemaPB {
optional bool is_in_memory = 8 [default=false];
optional int32 delete_sign_idx = 9 [default = -1];
optional int32 sequence_col_idx = 10 [default= -1];
optional SortType sort_type = 11;
optional int32 sort_col_num = 12;
}
enum TabletStatePB {

View File

@ -49,6 +49,8 @@ struct TTabletSchema {
8: optional bool is_in_memory
9: optional i32 delete_sign_idx = -1
10: optional i32 sequence_col_idx = -1
11: optional Types.TSortType sort_type
12: optional i32 sort_col_num
}
// this enum stands for different storage format in src_backends

View File

@ -424,6 +424,11 @@ enum TMergeType {
DELETE
}
enum TSortType {
LEXICAL,
ZORDER,
}
// represent a user identity
struct TUserIdentity {
1: optional string username