[Feature] Support config max length of zone map index (#6293)
This commit is contained in:
@ -22,7 +22,6 @@
|
||||
#include <iostream>
|
||||
#include <list>
|
||||
#include <map>
|
||||
#include <sstream>
|
||||
|
||||
#define __IN_CONFIGBASE_CPP__
|
||||
#include "common/config.h"
|
||||
|
||||
@ -66,7 +66,14 @@ public:
|
||||
inline const std::string& name() const { return _name; }
|
||||
|
||||
virtual inline void set_to_max(char* buf) const { return _type_info->set_to_max(buf); }
|
||||
virtual inline void set_to_zone_map_max(char* buf) const {
|
||||
set_to_max(buf);
|
||||
}
|
||||
|
||||
inline void set_to_min(char* buf) const { return _type_info->set_to_min(buf); }
|
||||
inline void set_to_zone_map_min(char* buf) const {
|
||||
set_to_min(buf);
|
||||
}
|
||||
|
||||
// This function allocate memory from pool, other than allocate_memory
|
||||
// reserve memory from continuous memory.
|
||||
@ -74,6 +81,10 @@ public:
|
||||
return (char*)pool->allocate(_type_info->size());
|
||||
}
|
||||
|
||||
virtual inline char* allocate_zone_map_value(MemPool* pool) const {
|
||||
return allocate_value(pool);
|
||||
}
|
||||
|
||||
inline void agg_update(RowCursorCell* dest, const RowCursorCell& src,
|
||||
MemPool* mem_pool = nullptr) const {
|
||||
_agg_info->update(dest, src, mem_pool);
|
||||
@ -103,6 +114,8 @@ public:
|
||||
|
||||
virtual size_t get_variable_len() const { return 0; }
|
||||
|
||||
virtual void modify_zone_map_index(char*) const {};
|
||||
|
||||
virtual Field* clone() const {
|
||||
auto* local = new Field();
|
||||
this->clone(local);
|
||||
@ -456,6 +469,37 @@ public:
|
||||
slice->size = _length;
|
||||
memset(slice->data, 0xFF, slice->size);
|
||||
}
|
||||
|
||||
// To prevent zone map cost too many memory, if varchar length
|
||||
// longer than `MAX_ZONE_MAP_INDEX_SIZE`. we just allocate
|
||||
// `MAX_ZONE_MAP_INDEX_SIZE` of memory
|
||||
char* allocate_zone_map_value(MemPool *pool) const override {
|
||||
char* type_value = (char*)pool->allocate(sizeof(Slice));
|
||||
auto slice = reinterpret_cast<Slice*>(type_value);
|
||||
slice->size = MAX_ZONE_MAP_INDEX_SIZE > _length ? _length :
|
||||
MAX_ZONE_MAP_INDEX_SIZE;
|
||||
slice->data = (char*)pool->allocate(slice->size);
|
||||
return type_value;
|
||||
}
|
||||
|
||||
// only varchar filed need modify zone map index when zone map max_value
|
||||
// index longer than `MAX_ZONE_MAP_INDEX_SIZE`. so here we add one
|
||||
// for the last byte
|
||||
// In UTF8 encoding, here do not appear 0xff in last byte
|
||||
void modify_zone_map_index(char* src) const override {
|
||||
auto slice = reinterpret_cast<Slice*>(src);
|
||||
if (slice->size == MAX_ZONE_MAP_INDEX_SIZE) {
|
||||
slice->mutable_data()[slice->size - 1] += 1;
|
||||
}
|
||||
}
|
||||
|
||||
void set_to_zone_map_max(char* ch) const override {
|
||||
auto slice = reinterpret_cast<Slice*>(ch);
|
||||
int length = _length < MAX_ZONE_MAP_INDEX_SIZE ? _length :
|
||||
MAX_ZONE_MAP_INDEX_SIZE;
|
||||
slice->size = length;
|
||||
memset(slice->data, 0xFF, slice->size);
|
||||
}
|
||||
};
|
||||
|
||||
class VarcharField : public Field {
|
||||
@ -484,11 +528,43 @@ public:
|
||||
return Field::allocate_string_value(pool);
|
||||
}
|
||||
|
||||
// To prevent zone map cost too many memory, if varchar length
|
||||
// longer than `MAX_ZONE_MAP_INDEX_SIZE`. we just allocate
|
||||
// `MAX_ZONE_MAP_INDEX_SIZE` of memory
|
||||
char* allocate_zone_map_value(MemPool *pool) const override {
|
||||
char* type_value = (char*)pool->allocate(sizeof(Slice));
|
||||
auto slice = reinterpret_cast<Slice*>(type_value);
|
||||
slice->size = MAX_ZONE_MAP_INDEX_SIZE > _length ? _length :
|
||||
MAX_ZONE_MAP_INDEX_SIZE;
|
||||
slice->data = (char*)pool->allocate(slice->size);
|
||||
return type_value;
|
||||
}
|
||||
|
||||
// only varchar filed need modify zone map index when zone map max_value
|
||||
// index longer than `MAX_ZONE_MAP_INDEX_SIZE`. so here we add one
|
||||
// for the last byte
|
||||
// In UTF8 encoding, here do not appear 0xff in last byte
|
||||
void modify_zone_map_index(char* src) const override {
|
||||
auto slice = reinterpret_cast<Slice*>(src);
|
||||
if (slice->size == MAX_ZONE_MAP_INDEX_SIZE) {
|
||||
slice->mutable_data()[slice->size - 1] += 1;
|
||||
}
|
||||
}
|
||||
|
||||
void set_to_max(char* ch) const override {
|
||||
auto slice = reinterpret_cast<Slice*>(ch);
|
||||
slice->size = _length - OLAP_STRING_MAX_BYTES;
|
||||
memset(slice->data, 0xFF, slice->size);
|
||||
}
|
||||
|
||||
void set_to_zone_map_max(char* ch) const override {
|
||||
auto slice = reinterpret_cast<Slice*>(ch);
|
||||
int length = _length < MAX_ZONE_MAP_INDEX_SIZE ? _length :
|
||||
MAX_ZONE_MAP_INDEX_SIZE;
|
||||
|
||||
slice->size = length - OLAP_STRING_MAX_BYTES;
|
||||
memset(slice->data, 0xFF, slice->size);
|
||||
}
|
||||
};
|
||||
|
||||
class BitmapAggField : public Field {
|
||||
|
||||
@ -62,6 +62,8 @@ static const uint16_t OLAP_STRING_MAX_BYTES = sizeof(StringLengthType);
|
||||
// the max bytes for stored array length
|
||||
static const uint16_t OLAP_ARRAY_MAX_BYTES = OLAP_ARRAY_MAX_LENGTH;
|
||||
|
||||
static constexpr uint16_t MAX_ZONE_MAP_INDEX_SIZE = 512;
|
||||
|
||||
enum OLAPDataVersion {
|
||||
OLAP_V1 = 0,
|
||||
DORIS_V1 = 1,
|
||||
|
||||
@ -33,11 +33,11 @@ namespace segment_v2 {
|
||||
|
||||
ZoneMapIndexWriter::ZoneMapIndexWriter(Field* field)
|
||||
: _field(field), _tracker(new MemTracker(-1, "ZoneMapIndexWriter")), _pool(_tracker.get()) {
|
||||
_page_zone_map.min_value = _field->allocate_value(&_pool);
|
||||
_page_zone_map.max_value = _field->allocate_value(&_pool);
|
||||
_page_zone_map.min_value = _field->allocate_zone_map_value(&_pool);
|
||||
_page_zone_map.max_value = _field->allocate_zone_map_value(&_pool);
|
||||
_reset_zone_map(&_page_zone_map);
|
||||
_segment_zone_map.min_value = _field->allocate_value(&_pool);
|
||||
_segment_zone_map.max_value = _field->allocate_value(&_pool);
|
||||
_segment_zone_map.min_value = _field->allocate_zone_map_value(&_pool);
|
||||
_segment_zone_map.max_value = _field->allocate_zone_map_value(&_pool);
|
||||
_reset_zone_map(&_segment_zone_map);
|
||||
}
|
||||
|
||||
@ -48,15 +48,19 @@ void ZoneMapIndexWriter::add_values(const void* values, size_t count) {
|
||||
const char* vals = reinterpret_cast<const char*>(values);
|
||||
for (int i = 0; i < count; ++i) {
|
||||
if (_field->compare(_page_zone_map.min_value, vals) > 0) {
|
||||
_field->type_info()->direct_copy(_page_zone_map.min_value, vals);
|
||||
_field->type_info()->direct_copy_may_cut(_page_zone_map.min_value, vals);
|
||||
}
|
||||
if (_field->compare(_page_zone_map.max_value, vals) < 0) {
|
||||
_field->type_info()->direct_copy(_page_zone_map.max_value, vals);
|
||||
_field->type_info()->direct_copy_may_cut(_page_zone_map.max_value, vals);
|
||||
}
|
||||
vals += _field->size();
|
||||
}
|
||||
}
|
||||
|
||||
void ZoneMapIndexWriter::moidfy_index_before_flush(struct doris::segment_v2::ZoneMap & zone_map) {
|
||||
_field->modify_zone_map_index(zone_map.max_value);
|
||||
}
|
||||
|
||||
void ZoneMapIndexWriter::reset_page_zone_map() {
|
||||
_page_zone_map.pass_all = true;
|
||||
}
|
||||
@ -81,6 +85,7 @@ Status ZoneMapIndexWriter::flush() {
|
||||
}
|
||||
|
||||
ZoneMapPB zone_map_pb;
|
||||
moidfy_index_before_flush(_page_zone_map);
|
||||
_page_zone_map.to_proto(&zone_map_pb, _field);
|
||||
_reset_zone_map(&_page_zone_map);
|
||||
|
||||
@ -98,6 +103,7 @@ Status ZoneMapIndexWriter::finish(fs::WritableBlock* wblock, ColumnIndexMetaPB*
|
||||
index_meta->set_type(ZONE_MAP_INDEX);
|
||||
ZoneMapIndexPB* meta = index_meta->mutable_zone_map_index();
|
||||
// store segment zone map
|
||||
moidfy_index_before_flush(_segment_zone_map);
|
||||
_segment_zone_map.to_proto(meta->mutable_segment_zone_map(), _field);
|
||||
|
||||
// write out zone map for each data pages
|
||||
|
||||
@ -85,6 +85,8 @@ public:
|
||||
|
||||
Status finish(fs::WritableBlock* wblock, ColumnIndexMetaPB* index_meta);
|
||||
|
||||
void moidfy_index_before_flush(ZoneMap& zone_map);
|
||||
|
||||
uint64_t size() { return _estimated_size; }
|
||||
|
||||
void reset_page_zone_map();
|
||||
@ -93,8 +95,8 @@ public:
|
||||
private:
|
||||
void _reset_zone_map(ZoneMap* zone_map) {
|
||||
// we should allocate max varchar length and set to max for min value
|
||||
_field->set_to_max(zone_map->min_value);
|
||||
_field->set_to_min(zone_map->max_value);
|
||||
_field->set_to_zone_map_max(zone_map->min_value);
|
||||
_field->set_to_zone_map_min(zone_map->max_value);
|
||||
zone_map->has_null = false;
|
||||
zone_map->has_not_null = false;
|
||||
zone_map->pass_all = false;
|
||||
|
||||
@ -29,6 +29,7 @@ ScalarTypeInfo::ScalarTypeInfo(TypeTraitsClass t)
|
||||
_deep_copy(TypeTraitsClass::deep_copy),
|
||||
_copy_object(TypeTraitsClass::copy_object),
|
||||
_direct_copy(TypeTraitsClass::direct_copy),
|
||||
_direct_copy_may_cut(TypeTraitsClass::direct_copy_may_cut),
|
||||
_convert_from(TypeTraitsClass::convert_from),
|
||||
_from_string(TypeTraitsClass::from_string),
|
||||
_to_string(TypeTraitsClass::to_string),
|
||||
|
||||
@ -59,6 +59,9 @@ public:
|
||||
|
||||
virtual void direct_copy(void* dest, const void* src) const = 0;
|
||||
|
||||
// use only in zone map to cut data
|
||||
virtual void direct_copy_may_cut(void* dest, const void* src) const = 0;
|
||||
|
||||
//convert and deep copy value from other type's source
|
||||
virtual OLAPStatus convert_from(void* dest, const void* src, const TypeInfo* src_type,
|
||||
MemPool* mem_pool) const = 0;
|
||||
@ -100,6 +103,8 @@ public:
|
||||
|
||||
inline void direct_copy(void* dest, const void* src) const override { _direct_copy(dest, src); }
|
||||
|
||||
inline void direct_copy_may_cut(void* dest, const void* src) const override { _direct_copy_may_cut(dest, src); }
|
||||
|
||||
//convert and deep copy value from other type's source
|
||||
OLAPStatus convert_from(void* dest, const void* src, const TypeInfo* src_type,
|
||||
MemPool* mem_pool) const override {
|
||||
@ -130,6 +135,7 @@ private:
|
||||
void (*_deep_copy)(void* dest, const void* src, MemPool* mem_pool);
|
||||
void (*_copy_object)(void* dest, const void* src, MemPool* mem_pool);
|
||||
void (*_direct_copy)(void* dest, const void* src);
|
||||
void (*_direct_copy_may_cut)(void* dest, const void* src);
|
||||
OLAPStatus (*_convert_from)(void* dest, const void* src, const TypeInfo* src_type,
|
||||
MemPool* mem_pool);
|
||||
|
||||
@ -291,6 +297,10 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
inline void direct_copy_may_cut(void* dest, const void* src) const override {
|
||||
direct_copy(dest, src);
|
||||
}
|
||||
|
||||
OLAPStatus convert_from(void* dest, const void* src, const TypeInfo* src_type,
|
||||
MemPool* mem_pool) const override {
|
||||
return OLAPStatus::OLAP_ERR_FUNC_NOT_IMPLEMENTED;
|
||||
@ -492,6 +502,10 @@ struct BaseFieldtypeTraits : public CppTypeTraits<field_type> {
|
||||
*reinterpret_cast<CppType*>(dest) = *reinterpret_cast<const CppType*>(src);
|
||||
}
|
||||
|
||||
static inline void direct_copy_may_cut(void* dest, const void* src) {
|
||||
direct_copy(dest, src);
|
||||
}
|
||||
|
||||
static OLAPStatus convert_from(void* dest, const void* src, const TypeInfo* src_type,
|
||||
MemPool* mem_pool) {
|
||||
return OLAPStatus::OLAP_ERR_FUNC_NOT_IMPLEMENTED;
|
||||
@ -510,9 +524,7 @@ struct BaseFieldtypeTraits : public CppTypeTraits<field_type> {
|
||||
}
|
||||
|
||||
static std::string to_string(const void* src) {
|
||||
std::stringstream stream;
|
||||
stream << *reinterpret_cast<const CppType*>(src);
|
||||
return stream.str();
|
||||
return std::to_string(*reinterpret_cast<const CppType*>(src));
|
||||
}
|
||||
|
||||
static OLAPStatus from_string(void* buf, const std::string& scan_key) {
|
||||
@ -704,6 +716,11 @@ struct FieldTypeTraits<OLAP_FIELD_TYPE_LARGEINT>
|
||||
static void direct_copy(void* dest, const void* src) {
|
||||
*reinterpret_cast<PackedInt128*>(dest) = *reinterpret_cast<const PackedInt128*>(src);
|
||||
}
|
||||
|
||||
static inline void direct_copy_may_cut(void* dest, const void* src) {
|
||||
direct_copy(dest, src);
|
||||
}
|
||||
|
||||
static void set_to_max(void* buf) {
|
||||
*reinterpret_cast<PackedInt128*>(buf) = ~((int128_t)(1) << 127);
|
||||
}
|
||||
@ -979,6 +996,7 @@ struct FieldTypeTraits<OLAP_FIELD_TYPE_CHAR> : public BaseFieldtypeTraits<OLAP_F
|
||||
auto slice = reinterpret_cast<const Slice*>(src);
|
||||
return slice->to_string();
|
||||
}
|
||||
|
||||
static void deep_copy(void* dest, const void* src, MemPool* mem_pool) {
|
||||
auto l_slice = reinterpret_cast<Slice*>(dest);
|
||||
auto r_slice = reinterpret_cast<const Slice*>(src);
|
||||
@ -1005,6 +1023,17 @@ struct FieldTypeTraits<OLAP_FIELD_TYPE_CHAR> : public BaseFieldtypeTraits<OLAP_F
|
||||
auto slice = reinterpret_cast<Slice*>(buf);
|
||||
memset(slice->data, 0, slice->size);
|
||||
}
|
||||
|
||||
static void direct_copy_may_cut(void* dest, const void* src) {
|
||||
auto l_slice = reinterpret_cast<Slice*>(dest);
|
||||
auto r_slice = reinterpret_cast<const Slice*>(src);
|
||||
|
||||
auto min_size = MAX_ZONE_MAP_INDEX_SIZE >= r_slice->size ? r_slice->size :
|
||||
MAX_ZONE_MAP_INDEX_SIZE;
|
||||
memory_copy(l_slice->data, r_slice->data, min_size);
|
||||
l_slice->size = min_size;
|
||||
}
|
||||
|
||||
static uint32_t hash_code(const void* data, uint32_t seed) {
|
||||
auto slice = reinterpret_cast<const Slice*>(data);
|
||||
return HashUtil::hash(slice->data, slice->size, seed);
|
||||
|
||||
@ -22,9 +22,7 @@
|
||||
#include <sstream>
|
||||
|
||||
#include "common/object_pool.h"
|
||||
#include "exprs/expr.h"
|
||||
#include "gen_cpp/Descriptors_types.h"
|
||||
#include "gen_cpp/PlanNodes_types.h"
|
||||
#include "gen_cpp/descriptors.pb.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
@ -25,11 +25,9 @@
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "runtime/string_value.h"
|
||||
#include "runtime/tuple_row.h"
|
||||
//#include "runtime/mem_tracker.h"
|
||||
#include "gen_cpp/Data_types.h"
|
||||
#include "gen_cpp/data.pb.h"
|
||||
#include "runtime/collection_value.h"
|
||||
#include "util/debug_util.h"
|
||||
|
||||
using std::vector;
|
||||
|
||||
|
||||
@ -25,7 +25,6 @@
|
||||
#include "codegen/doris_ir.h"
|
||||
#include "common/logging.h"
|
||||
#include "runtime/buffered_block_mgr2.h" // for BufferedBlockMgr2::Block
|
||||
// #include "runtime/buffered_tuple_stream2.inline.h"
|
||||
#include "runtime/bufferpool/buffer_pool.h"
|
||||
#include "runtime/descriptors.h"
|
||||
#include "runtime/disk_io_mgr.h"
|
||||
|
||||
@ -52,12 +52,9 @@
|
||||
#include "service/http_service.h"
|
||||
#include "util/debug_util.h"
|
||||
#include "util/doris_metrics.h"
|
||||
#include "util/file_utils.h"
|
||||
#include "util/logging.h"
|
||||
#include "util/network_util.h"
|
||||
#include "util/thrift_rpc_helper.h"
|
||||
#include "util/thrift_server.h"
|
||||
#include "util/thrift_util.h"
|
||||
#include "util/uid_util.h"
|
||||
|
||||
static void help(const char*);
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "common/config.h"
|
||||
#include "olap/rowset/segment_v2/zone_map_index.h"
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
@ -99,6 +100,47 @@ public:
|
||||
ASSERT_EQ(true, zone_maps[2].has_null());
|
||||
ASSERT_EQ(false, zone_maps[2].has_not_null());
|
||||
}
|
||||
|
||||
void test_cut_zone_map(std::string testname, Field* field) {
|
||||
std::string filename = kTestDir + "/" + testname;
|
||||
|
||||
ZoneMapIndexWriter builder(field);
|
||||
char ch = 'a';
|
||||
char buf[1024];
|
||||
for (int i = 0; i < 5; i++) {
|
||||
memset(buf, ch + i, 1024);
|
||||
Slice slice(buf, 1024);
|
||||
builder.add_values((const uint8_t*)&slice, 1);
|
||||
}
|
||||
builder.flush();
|
||||
|
||||
// write out zone map index
|
||||
ColumnIndexMetaPB index_meta;
|
||||
{
|
||||
std::unique_ptr<fs::WritableBlock> wblock;
|
||||
fs::CreateBlockOptions opts({filename});
|
||||
ASSERT_TRUE(fs::fs_util::block_manager()->create_block(opts, &wblock).ok());
|
||||
ASSERT_TRUE(builder.finish(wblock.get(), &index_meta).ok());
|
||||
ASSERT_EQ(ZONE_MAP_INDEX, index_meta.type());
|
||||
ASSERT_TRUE(wblock->close().ok());
|
||||
}
|
||||
|
||||
ZoneMapIndexReader column_zone_map(filename, &index_meta.zone_map_index());
|
||||
Status status = column_zone_map.load(true, false);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(1, column_zone_map.num_pages());
|
||||
const std::vector<ZoneMapPB>& zone_maps = column_zone_map.page_zone_maps();
|
||||
ASSERT_EQ(1, zone_maps.size());
|
||||
|
||||
char value[512];
|
||||
memset(value, 'a', 512);
|
||||
ASSERT_EQ(value, zone_maps[0].min());
|
||||
memset(value, 'f', 512);
|
||||
value[511] += 1;
|
||||
ASSERT_EQ(value, zone_maps[0].max());
|
||||
ASSERT_EQ(false, zone_maps[0].has_null());
|
||||
ASSERT_EQ(true, zone_maps[0].has_not_null());
|
||||
}
|
||||
};
|
||||
|
||||
// Test for int
|
||||
@ -171,6 +213,15 @@ TEST_F(ColumnZoneMapTest, NormalTestCharPage) {
|
||||
delete field;
|
||||
}
|
||||
|
||||
// Test for zone map limit
|
||||
TEST_F(ColumnZoneMapTest, ZoneMapCut) {
|
||||
TabletColumn varchar_column = create_varchar_key(0);
|
||||
varchar_column.set_index_length(1024);
|
||||
Field* field = FieldFactory::create(varchar_column);
|
||||
test_string("ZoneMapCut", field);
|
||||
delete field;
|
||||
}
|
||||
|
||||
} // namespace segment_v2
|
||||
} // namespace doris
|
||||
|
||||
|
||||
Reference in New Issue
Block a user