Fix segment v2 bug (#1904)

This commit is contained in:
kangpinghuang
2019-09-30 13:50:39 +08:00
committed by ZHAO Chun
parent 262c7f4834
commit 2cecf5901f
11 changed files with 78 additions and 81 deletions

View File

@ -44,10 +44,12 @@ public:
const uint8_t* cell_ptr(size_t idx) const { return _data + idx * _type_info->size(); }
uint8_t* mutable_cell_ptr(size_t idx) const { return _data + idx * _type_info->size(); }
bool is_null(size_t idx) const {
return BitmapTest(_null_bitmap, idx);
return is_nullable() && BitmapTest(_null_bitmap, idx);
}
void set_is_null(size_t idx, bool is_null) const {
return BitmapChange(_null_bitmap, idx, is_null);
if (is_nullable()) {
BitmapChange(_null_bitmap, idx, is_null);
}
}
ColumnBlockCell cell(size_t idx) const;

View File

@ -211,7 +211,7 @@ public:
// 将内部的value转成string输出
// 没有考虑实现的性能,仅供DEBUG使用
inline std::string to_string(char* src) const {
inline std::string to_string(const char* src) const {
return _type_info->to_string(src);
}

View File

@ -104,16 +104,16 @@ Status AutoIncrementIterator::next_batch(RowBlockV2* block) {
// This class will iterate all data from internal iterator
// through client call advance().
// Usage:
// MergeContext ctx(iter);
// MergeIteratorContext ctx(iter);
// RETURN_IF_ERROR(ctx.init());
// while (ctx.valid()) {
// visit(ctx.current_row());
// RETURN_IF_ERROR(ctx.advance());
// }
class MergeContext {
class MergeIteratorContext {
public:
// This class don't take iter's ownership, client should delete it
MergeContext(RowwiseIterator* iter)
MergeIteratorContext(RowwiseIterator* iter)
: _iter(iter), _block(iter->schema(), 1024) {
}
@ -151,13 +151,13 @@ private:
size_t _index_in_block = 0;
};
Status MergeContext::init(const StorageReadOptions& opts) {
Status MergeIteratorContext::init(const StorageReadOptions& opts) {
RETURN_IF_ERROR(_iter->init(opts));
RETURN_IF_ERROR(_load_next_block());
return Status::OK();
}
Status MergeContext::advance() {
Status MergeIteratorContext::advance() {
// NOTE: we increase _index_in_block directly to valid one check
_index_in_block++;
do {
@ -172,7 +172,7 @@ Status MergeContext::advance() {
return Status::OK();
}
Status MergeContext::_load_next_block() {
Status MergeIteratorContext::_load_next_block() {
Status st;
do {
_block.clear();
@ -214,19 +214,20 @@ public:
}
private:
std::vector<RowwiseIterator*> _origin_iters;
std::vector<MergeContext*> _merge_ctxs;
std::vector<MergeIteratorContext*> _merge_ctxs;
std::unique_ptr<Schema> _schema;
struct MergeContextComparator {
bool operator()(const MergeContext* lhs, const MergeContext* rhs) const {
bool operator()(const MergeIteratorContext* lhs, const MergeIteratorContext* rhs) const {
auto lhs_row = lhs->current_row();
auto rhs_row = rhs->current_row();
return compare_row(lhs_row, rhs_row) > 0;
}
};
using MergeHeap = std::priority_queue<MergeContext*, std::vector<MergeContext*>, MergeContextComparator>;
using MergeHeap = std::priority_queue<MergeIteratorContext*,
std::vector<MergeIteratorContext*>, MergeContextComparator>;
std::unique_ptr<MergeHeap> _merge_heap;
};
@ -238,7 +239,7 @@ Status MergeIterator::init(const StorageReadOptions& opts) {
_merge_heap.reset(new MergeHeap);
for (auto iter : _origin_iters) {
std::unique_ptr<MergeContext> ctx(new MergeContext(iter));
std::unique_ptr<MergeIteratorContext> ctx(new MergeIteratorContext(iter));
RETURN_IF_ERROR(ctx->init(opts));
if (!ctx->valid()) {
continue;

View File

@ -127,7 +127,6 @@ OLAPStatus ColumnDataWriter::write(const RowType& row) {
// copy input row to row block
_row_block->get_row(_row_index, &_cursor);
copy_row(&_cursor, row, _row_block->mem_pool());
next(row);
if (_row_index >= _segment_group->get_num_rows_per_row_block()) {
if (OLAP_SUCCESS != _flush_row_block(false)) {

View File

@ -129,7 +129,6 @@ uint64_t BinaryDictPageBuilder::size() const {
}
Status BinaryDictPageBuilder::get_dictionary_page(Slice* dictionary_page) {
DCHECK(_finished) << "get dictionary page when the builder is not finished";
_dictionary.clear();
_dict_builder->reset();
size_t add_count = 1;
@ -191,7 +190,7 @@ Status BinaryDictPageDecoder::next_batch(size_t* n, ColumnBlockView* dst) {
*n = 0;
return Status::OK();
}
Slice *out = reinterpret_cast<Slice *>(dst->data());
Slice* out = reinterpret_cast<Slice*>(dst->data());
_code_buf.resize((*n) * sizeof(int32_t));
// copy the codewords into a temporary buffer first
@ -202,10 +201,10 @@ Status BinaryDictPageDecoder::next_batch(size_t* n, ColumnBlockView* dst) {
ColumnBlockView tmp_block_view(&column_block);
RETURN_IF_ERROR(_data_page_decoder->next_batch(n, &tmp_block_view));
for (int i = 0; i < *n; ++i) {
int32_t codeword = *reinterpret_cast<int32_t *>(&_code_buf[i * sizeof(int32_t)]);
int32_t codeword = *reinterpret_cast<int32_t*>(&_code_buf[i * sizeof(int32_t)]);
// get the string from the dict decoder
Slice element = _dict_decoder->string_at_index(codeword);
char *destination = dst->column_block()->arena()->Allocate(element.size);
char* destination = dst->column_block()->arena()->Allocate(element.size);
if (destination == nullptr) {
return Status::MemoryAllocFailed(Substitute("memory allocate failed, size:$0", element.size));
}

View File

@ -85,10 +85,10 @@ private:
std::unique_ptr<BinaryPlainPageBuilder> _dict_builder;
EncodingTypePB _encoding_type;
struct HashOfSlice {
struct HashOfSlice {
size_t operator()(const Slice& slice) const {
return HashStringThoroughly(slice.data, slice.size);
}
}
};
// query for dict item -> dict id
std::unordered_map<Slice, uint32_t, HashOfSlice> _dictionary;
@ -120,7 +120,7 @@ private:
Slice _data;
PageDecoderOptions _options;
std::unique_ptr<PageDecoder> _data_page_decoder;
std::shared_ptr<BinaryPlainPageDecoder> _dict_decoder;
BinaryPlainPageDecoder* _dict_decoder;
bool _parsed;
EncodingTypePB _encoding_type;
faststring _code_buf;

View File

@ -214,7 +214,7 @@ private:
if (idx >= _num_elems) {
return _offsets_pos;
}
const uint8_t *p = reinterpret_cast<const uint8_t *>(&_data[_offsets_pos + idx * sizeof(uint32_t)]);
const uint8_t* p = reinterpret_cast<const uint8_t*>(&_data[_offsets_pos + idx * sizeof(uint32_t)]);
return decode_fixed32_le(p);
}

View File

@ -455,7 +455,9 @@ Status DefaultValueColumnIterator::next_batch(size_t* n, ColumnBlock* dst) {
} else {
for (int i = 0; i < *n; ++i) {
memcpy(dst->mutable_cell_ptr(i), _mem_value.data(), _value_size);
dst->set_is_null(i, false);
if (dst->is_nullable()) {
dst->set_is_null(i, false);
}
}
}
return Status::OK();

View File

@ -1,40 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <memory>
namespace doris {
namespace segment_v2 {
class BinaryPlainPageDecoder;
static const size_t DEFAULT_PAGE_SIZE = 1024 * 1024; // default size: 1M
struct PageBuilderOptions {
size_t data_page_size = DEFAULT_PAGE_SIZE;
size_t dict_page_size = DEFAULT_PAGE_SIZE;
};
struct PageDecoderOptions {
std::shared_ptr<BinaryPlainPageDecoder> dict_decoder = nullptr;
};
} // namespace segment_v2
} // namespace doris
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
namespace doris {
namespace segment_v2 {
class BinaryPlainPageDecoder;
static const size_t DEFAULT_PAGE_SIZE = 1024 * 1024; // default size: 1M
struct PageBuilderOptions {
size_t data_page_size = DEFAULT_PAGE_SIZE;
size_t dict_page_size = DEFAULT_PAGE_SIZE;
};
struct PageDecoderOptions {
BinaryPlainPageDecoder* dict_decoder = nullptr;
};
} // namespace segment_v2
} // namespace doris

View File

@ -134,7 +134,7 @@ void create_tablet_schema(KeysType keys_type, TabletSchema* tablet_schema) {
column_1->set_is_key(true);
column_1->set_length(4);
column_1->set_index_length(4);
column_1->set_is_nullable(true);
column_1->set_is_nullable(false);
column_1->set_is_bf_column(false);
ColumnPB* column_2 = tablet_schema_pb.add_column();
@ -143,9 +143,8 @@ void create_tablet_schema(KeysType keys_type, TabletSchema* tablet_schema) {
column_2->set_type("VARCHAR");
column_2->set_length(20);
column_2->set_index_length(20);
column_2->set_is_nullable(true);
column_2->set_is_key(true);
column_2->set_is_nullable(true);
column_2->set_is_nullable(false);
column_2->set_is_bf_column(false);
ColumnPB* column_3 = tablet_schema_pb.add_column();
@ -219,13 +218,18 @@ TEST_F(AlphaRowsetTest, TestAlphaRowsetReader) {
ASSERT_EQ(OLAP_SUCCESS, res);
int32_t field_0 = 10;
row.set_not_null(0);
row.set_field_content(0, reinterpret_cast<char*>(&field_0), _mem_pool.get());
Slice field_1("well");
row.set_not_null(1);
row.set_field_content(1, reinterpret_cast<char*>(&field_1), _mem_pool.get());
int32_t field_2 = 100;
row.set_not_null(2);
row.set_field_content(2, reinterpret_cast<char*>(&field_2), _mem_pool.get());
_alpha_rowset_writer->add_row(row);
_alpha_rowset_writer->flush();
res = _alpha_rowset_writer->add_row(row);
ASSERT_EQ(OLAP_SUCCESS, res);
res = _alpha_rowset_writer->flush();
ASSERT_EQ(OLAP_SUCCESS, res);
RowsetSharedPtr alpha_rowset = _alpha_rowset_writer->build();
ASSERT_TRUE(alpha_rowset != nullptr);
RowsetId rowset_id;
@ -261,14 +265,6 @@ TEST_F(AlphaRowsetTest, TestAlphaRowsetReader) {
} // namespace doris
int main(int argc, char **argv) {
std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf";
if (!doris::config::init(conffile.c_str(), false)) {
fprintf(stderr, "error read config file. \n");
return -1;
}
doris::init_glog("be-test");
::testing::InitGoogleTest(&argc, argv);
int ret = RUN_ALL_TESTS();
google::protobuf::ShutdownProtobufLibrary();
return ret;
return RUN_ALL_TESTS();
}

View File

@ -53,7 +53,7 @@ public:
Status status = page_builder.get_dictionary_page(&dict_slice);
ASSERT_TRUE(status.ok());
PageDecoderOptions dict_decoder_options;
std::shared_ptr<BinaryPlainPageDecoder> dict_page_decoder(
std::unique_ptr<BinaryPlainPageDecoder> dict_page_decoder(
new BinaryPlainPageDecoder(dict_slice, dict_decoder_options));
status = dict_page_decoder->init();
ASSERT_TRUE(status.ok());
@ -62,7 +62,7 @@ public:
// decode
PageDecoderOptions decoder_options;
decoder_options.dict_decoder = dict_page_decoder;
decoder_options.dict_decoder = dict_page_decoder.get();
BinaryDictPageDecoder page_decoder(s, decoder_options);
status = page_decoder.init();
ASSERT_TRUE(status.ok());
@ -147,14 +147,14 @@ public:
int slice_index = random() % results.size();
//int slice_index = 1;
PageDecoderOptions dict_decoder_options;
std::shared_ptr<BinaryPlainPageDecoder> dict_page_decoder(
std::unique_ptr<BinaryPlainPageDecoder> dict_page_decoder(
new BinaryPlainPageDecoder(dict_slice, dict_decoder_options));
status = dict_page_decoder->init();
ASSERT_TRUE(status.ok());
// decode
PageDecoderOptions decoder_options;
decoder_options.dict_decoder = dict_page_decoder;
decoder_options.dict_decoder = dict_page_decoder.get();
BinaryDictPageDecoder page_decoder(results[slice_index], decoder_options);
status = page_decoder.init();
ASSERT_TRUE(status.ok());