diff --git a/be/src/olap/column_block.h b/be/src/olap/column_block.h index 704daa17fe..4fc5403241 100644 --- a/be/src/olap/column_block.h +++ b/be/src/olap/column_block.h @@ -44,10 +44,12 @@ public: const uint8_t* cell_ptr(size_t idx) const { return _data + idx * _type_info->size(); } uint8_t* mutable_cell_ptr(size_t idx) const { return _data + idx * _type_info->size(); } bool is_null(size_t idx) const { - return BitmapTest(_null_bitmap, idx); + return is_nullable() && BitmapTest(_null_bitmap, idx); } void set_is_null(size_t idx, bool is_null) const { - return BitmapChange(_null_bitmap, idx, is_null); + if (is_nullable()) { + BitmapChange(_null_bitmap, idx, is_null); + } } ColumnBlockCell cell(size_t idx) const; diff --git a/be/src/olap/field.h b/be/src/olap/field.h index 452cc8f82a..04167899e7 100644 --- a/be/src/olap/field.h +++ b/be/src/olap/field.h @@ -211,7 +211,7 @@ public: // 将内部的value转成string输出 // 没有考虑实现的性能,仅供DEBUG使用 - inline std::string to_string(char* src) const { + inline std::string to_string(const char* src) const { return _type_info->to_string(src); } diff --git a/be/src/olap/generic_iterators.cpp b/be/src/olap/generic_iterators.cpp index fa92bdfe9e..f9630a2de7 100644 --- a/be/src/olap/generic_iterators.cpp +++ b/be/src/olap/generic_iterators.cpp @@ -104,16 +104,16 @@ Status AutoIncrementIterator::next_batch(RowBlockV2* block) { // This class will iterate all data from internal iterator // through client call advance(). // Usage: -// MergeContext ctx(iter); +// MergeIteratorContext ctx(iter); // RETURN_IF_ERROR(ctx.init()); // while (ctx.valid()) { // visit(ctx.current_row()); // RETURN_IF_ERROR(ctx.advance()); // } -class MergeContext { +class MergeIteratorContext { public: // This class don't take iter's ownership, client should delete it - MergeContext(RowwiseIterator* iter) + MergeIteratorContext(RowwiseIterator* iter) : _iter(iter), _block(iter->schema(), 1024) { } @@ -151,13 +151,13 @@ private: size_t _index_in_block = 0; }; -Status MergeContext::init(const StorageReadOptions& opts) { +Status MergeIteratorContext::init(const StorageReadOptions& opts) { RETURN_IF_ERROR(_iter->init(opts)); RETURN_IF_ERROR(_load_next_block()); return Status::OK(); } -Status MergeContext::advance() { +Status MergeIteratorContext::advance() { // NOTE: we increase _index_in_block directly to valid one check _index_in_block++; do { @@ -172,7 +172,7 @@ Status MergeContext::advance() { return Status::OK(); } -Status MergeContext::_load_next_block() { +Status MergeIteratorContext::_load_next_block() { Status st; do { _block.clear(); @@ -214,19 +214,20 @@ public: } private: std::vector _origin_iters; - std::vector _merge_ctxs; + std::vector _merge_ctxs; std::unique_ptr _schema; struct MergeContextComparator { - bool operator()(const MergeContext* lhs, const MergeContext* rhs) const { + bool operator()(const MergeIteratorContext* lhs, const MergeIteratorContext* rhs) const { auto lhs_row = lhs->current_row(); auto rhs_row = rhs->current_row(); return compare_row(lhs_row, rhs_row) > 0; } }; - using MergeHeap = std::priority_queue, MergeContextComparator>; + using MergeHeap = std::priority_queue, MergeContextComparator>; std::unique_ptr _merge_heap; }; @@ -238,7 +239,7 @@ Status MergeIterator::init(const StorageReadOptions& opts) { _merge_heap.reset(new MergeHeap); for (auto iter : _origin_iters) { - std::unique_ptr ctx(new MergeContext(iter)); + std::unique_ptr ctx(new MergeIteratorContext(iter)); RETURN_IF_ERROR(ctx->init(opts)); if (!ctx->valid()) { continue; diff --git a/be/src/olap/rowset/column_data_writer.cpp b/be/src/olap/rowset/column_data_writer.cpp index 7037f87241..8900d41d48 100644 --- a/be/src/olap/rowset/column_data_writer.cpp +++ b/be/src/olap/rowset/column_data_writer.cpp @@ -127,7 +127,6 @@ OLAPStatus ColumnDataWriter::write(const RowType& row) { // copy input row to row block _row_block->get_row(_row_index, &_cursor); copy_row(&_cursor, row, _row_block->mem_pool()); - next(row); if (_row_index >= _segment_group->get_num_rows_per_row_block()) { if (OLAP_SUCCESS != _flush_row_block(false)) { diff --git a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp index 5a77462635..675ed26d58 100644 --- a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp +++ b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp @@ -129,7 +129,6 @@ uint64_t BinaryDictPageBuilder::size() const { } Status BinaryDictPageBuilder::get_dictionary_page(Slice* dictionary_page) { - DCHECK(_finished) << "get dictionary page when the builder is not finished"; _dictionary.clear(); _dict_builder->reset(); size_t add_count = 1; @@ -191,7 +190,7 @@ Status BinaryDictPageDecoder::next_batch(size_t* n, ColumnBlockView* dst) { *n = 0; return Status::OK(); } - Slice *out = reinterpret_cast(dst->data()); + Slice* out = reinterpret_cast(dst->data()); _code_buf.resize((*n) * sizeof(int32_t)); // copy the codewords into a temporary buffer first @@ -202,10 +201,10 @@ Status BinaryDictPageDecoder::next_batch(size_t* n, ColumnBlockView* dst) { ColumnBlockView tmp_block_view(&column_block); RETURN_IF_ERROR(_data_page_decoder->next_batch(n, &tmp_block_view)); for (int i = 0; i < *n; ++i) { - int32_t codeword = *reinterpret_cast(&_code_buf[i * sizeof(int32_t)]); + int32_t codeword = *reinterpret_cast(&_code_buf[i * sizeof(int32_t)]); // get the string from the dict decoder Slice element = _dict_decoder->string_at_index(codeword); - char *destination = dst->column_block()->arena()->Allocate(element.size); + char* destination = dst->column_block()->arena()->Allocate(element.size); if (destination == nullptr) { return Status::MemoryAllocFailed(Substitute("memory allocate failed, size:$0", element.size)); } diff --git a/be/src/olap/rowset/segment_v2/binary_dict_page.h b/be/src/olap/rowset/segment_v2/binary_dict_page.h index a37ec7ff58..e434e99a12 100644 --- a/be/src/olap/rowset/segment_v2/binary_dict_page.h +++ b/be/src/olap/rowset/segment_v2/binary_dict_page.h @@ -85,10 +85,10 @@ private: std::unique_ptr _dict_builder; EncodingTypePB _encoding_type; - struct HashOfSlice { + struct HashOfSlice { size_t operator()(const Slice& slice) const { return HashStringThoroughly(slice.data, slice.size); - } + } }; // query for dict item -> dict id std::unordered_map _dictionary; @@ -120,7 +120,7 @@ private: Slice _data; PageDecoderOptions _options; std::unique_ptr _data_page_decoder; - std::shared_ptr _dict_decoder; + BinaryPlainPageDecoder* _dict_decoder; bool _parsed; EncodingTypePB _encoding_type; faststring _code_buf; diff --git a/be/src/olap/rowset/segment_v2/binary_plain_page.h b/be/src/olap/rowset/segment_v2/binary_plain_page.h index e41ffd74a9..8fda450194 100644 --- a/be/src/olap/rowset/segment_v2/binary_plain_page.h +++ b/be/src/olap/rowset/segment_v2/binary_plain_page.h @@ -214,7 +214,7 @@ private: if (idx >= _num_elems) { return _offsets_pos; } - const uint8_t *p = reinterpret_cast(&_data[_offsets_pos + idx * sizeof(uint32_t)]); + const uint8_t* p = reinterpret_cast(&_data[_offsets_pos + idx * sizeof(uint32_t)]); return decode_fixed32_le(p); } diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index a34e21d063..c08b39afec 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -455,7 +455,9 @@ Status DefaultValueColumnIterator::next_batch(size_t* n, ColumnBlock* dst) { } else { for (int i = 0; i < *n; ++i) { memcpy(dst->mutable_cell_ptr(i), _mem_value.data(), _value_size); - dst->set_is_null(i, false); + if (dst->is_nullable()) { + dst->set_is_null(i, false); + } } } return Status::OK(); diff --git a/be/src/olap/rowset/segment_v2/options.h b/be/src/olap/rowset/segment_v2/options.h index aea886b95d..386afad332 100644 --- a/be/src/olap/rowset/segment_v2/options.h +++ b/be/src/olap/rowset/segment_v2/options.h @@ -1,40 +1,38 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include - -namespace doris { -namespace segment_v2 { - -class BinaryPlainPageDecoder; - -static const size_t DEFAULT_PAGE_SIZE = 1024 * 1024; // default size: 1M - -struct PageBuilderOptions { - size_t data_page_size = DEFAULT_PAGE_SIZE; - - size_t dict_page_size = DEFAULT_PAGE_SIZE; -}; - -struct PageDecoderOptions { - std::shared_ptr dict_decoder = nullptr; -}; - -} // namespace segment_v2 -} // namespace doris +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +namespace doris { +namespace segment_v2 { + +class BinaryPlainPageDecoder; + +static const size_t DEFAULT_PAGE_SIZE = 1024 * 1024; // default size: 1M + +struct PageBuilderOptions { + size_t data_page_size = DEFAULT_PAGE_SIZE; + + size_t dict_page_size = DEFAULT_PAGE_SIZE; +}; + +struct PageDecoderOptions { + BinaryPlainPageDecoder* dict_decoder = nullptr; +}; + +} // namespace segment_v2 +} // namespace doris diff --git a/be/test/olap/rowset/alpha_rowset_test.cpp b/be/test/olap/rowset/alpha_rowset_test.cpp index 3c8dba175f..9d545357a4 100644 --- a/be/test/olap/rowset/alpha_rowset_test.cpp +++ b/be/test/olap/rowset/alpha_rowset_test.cpp @@ -134,7 +134,7 @@ void create_tablet_schema(KeysType keys_type, TabletSchema* tablet_schema) { column_1->set_is_key(true); column_1->set_length(4); column_1->set_index_length(4); - column_1->set_is_nullable(true); + column_1->set_is_nullable(false); column_1->set_is_bf_column(false); ColumnPB* column_2 = tablet_schema_pb.add_column(); @@ -143,9 +143,8 @@ void create_tablet_schema(KeysType keys_type, TabletSchema* tablet_schema) { column_2->set_type("VARCHAR"); column_2->set_length(20); column_2->set_index_length(20); - column_2->set_is_nullable(true); column_2->set_is_key(true); - column_2->set_is_nullable(true); + column_2->set_is_nullable(false); column_2->set_is_bf_column(false); ColumnPB* column_3 = tablet_schema_pb.add_column(); @@ -219,13 +218,18 @@ TEST_F(AlphaRowsetTest, TestAlphaRowsetReader) { ASSERT_EQ(OLAP_SUCCESS, res); int32_t field_0 = 10; + row.set_not_null(0); row.set_field_content(0, reinterpret_cast(&field_0), _mem_pool.get()); Slice field_1("well"); + row.set_not_null(1); row.set_field_content(1, reinterpret_cast(&field_1), _mem_pool.get()); int32_t field_2 = 100; + row.set_not_null(2); row.set_field_content(2, reinterpret_cast(&field_2), _mem_pool.get()); - _alpha_rowset_writer->add_row(row); - _alpha_rowset_writer->flush(); + res = _alpha_rowset_writer->add_row(row); + ASSERT_EQ(OLAP_SUCCESS, res); + res = _alpha_rowset_writer->flush(); + ASSERT_EQ(OLAP_SUCCESS, res); RowsetSharedPtr alpha_rowset = _alpha_rowset_writer->build(); ASSERT_TRUE(alpha_rowset != nullptr); RowsetId rowset_id; @@ -261,14 +265,6 @@ TEST_F(AlphaRowsetTest, TestAlphaRowsetReader) { } // namespace doris int main(int argc, char **argv) { - std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf"; - if (!doris::config::init(conffile.c_str(), false)) { - fprintf(stderr, "error read config file. \n"); - return -1; - } - doris::init_glog("be-test"); ::testing::InitGoogleTest(&argc, argv); - int ret = RUN_ALL_TESTS(); - google::protobuf::ShutdownProtobufLibrary(); - return ret; + return RUN_ALL_TESTS(); } diff --git a/be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp b/be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp index c26befff3e..9a25b7a845 100644 --- a/be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp +++ b/be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp @@ -53,7 +53,7 @@ public: Status status = page_builder.get_dictionary_page(&dict_slice); ASSERT_TRUE(status.ok()); PageDecoderOptions dict_decoder_options; - std::shared_ptr dict_page_decoder( + std::unique_ptr dict_page_decoder( new BinaryPlainPageDecoder(dict_slice, dict_decoder_options)); status = dict_page_decoder->init(); ASSERT_TRUE(status.ok()); @@ -62,7 +62,7 @@ public: // decode PageDecoderOptions decoder_options; - decoder_options.dict_decoder = dict_page_decoder; + decoder_options.dict_decoder = dict_page_decoder.get(); BinaryDictPageDecoder page_decoder(s, decoder_options); status = page_decoder.init(); ASSERT_TRUE(status.ok()); @@ -147,14 +147,14 @@ public: int slice_index = random() % results.size(); //int slice_index = 1; PageDecoderOptions dict_decoder_options; - std::shared_ptr dict_page_decoder( + std::unique_ptr dict_page_decoder( new BinaryPlainPageDecoder(dict_slice, dict_decoder_options)); status = dict_page_decoder->init(); ASSERT_TRUE(status.ok()); // decode PageDecoderOptions decoder_options; - decoder_options.dict_decoder = dict_page_decoder; + decoder_options.dict_decoder = dict_page_decoder.get(); BinaryDictPageDecoder page_decoder(results[slice_index], decoder_options); status = page_decoder.init(); ASSERT_TRUE(status.ok());