From 70aa9d6ca85a131fa69c4f487434790804db9b02 Mon Sep 17 00:00:00 2001 From: Binglin Chang Date: Wed, 3 Jun 2020 15:42:38 +0800 Subject: [PATCH] [Memory Engine] Add MemTabletScan (#3734) --- be/src/olap/base_tablet.cpp | 29 +-- be/src/olap/memory/CMakeLists.txt | 2 + be/src/olap/memory/column_block.cpp | 1 + be/src/olap/memory/column_block.h | 4 + be/src/olap/memory/column_reader.h | 13 +- be/src/olap/memory/mem_sub_tablet.cpp | 6 +- be/src/olap/memory/mem_tablet.cpp | 34 +++- be/src/olap/memory/mem_tablet.h | 12 +- be/src/olap/memory/mem_tablet_scan.cpp | 59 ++++++ be/src/olap/memory/mem_tablet_scan.h | 82 ++++++++ be/src/olap/memory/partial_row_batch.cpp | 4 +- be/src/olap/memory/partial_row_batch.h | 4 +- be/src/olap/memory/row_block.cpp | 28 +++ be/src/olap/memory/row_block.h | 42 ++++ be/src/olap/memory/schema.h | 2 + be/src/olap/memory/typed_column_writer.h | 2 +- be/src/olap/memory/write_txn.h | 2 + be/test/olap/CMakeLists.txt | 1 + be/test/olap/memory/mem_tablet_test.cpp | 192 ++++++++++++++++++ .../olap/memory/partial_row_batch_test.cpp | 2 +- 20 files changed, 483 insertions(+), 38 deletions(-) create mode 100644 be/src/olap/memory/mem_tablet_scan.cpp create mode 100644 be/src/olap/memory/mem_tablet_scan.h create mode 100644 be/src/olap/memory/row_block.cpp create mode 100644 be/src/olap/memory/row_block.h create mode 100644 be/test/olap/memory/mem_tablet_test.cpp diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 368f5ed880..d544f3653e 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -16,22 +16,21 @@ // under the License. #include "olap/base_tablet.h" -#include "util/path_util.h" + #include "olap/data_dir.h" +#include "util/path_util.h" namespace doris { -BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) : - _state(tablet_meta->tablet_state()), - _tablet_meta(tablet_meta), - _schema(tablet_meta->tablet_schema()), - _data_dir(data_dir) { +BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) + : _state(tablet_meta->tablet_state()), + _tablet_meta(tablet_meta), + _schema(tablet_meta->tablet_schema()), + _data_dir(data_dir) { _gen_tablet_path(); } -BaseTablet::~BaseTablet() { -} - +BaseTablet::~BaseTablet() {} OLAPStatus BaseTablet::set_tablet_state(TabletState state) { if (_tablet_meta->tablet_state() == TABLET_SHUTDOWN && state != TABLET_SHUTDOWN) { @@ -44,11 +43,13 @@ OLAPStatus BaseTablet::set_tablet_state(TabletState state) { } void BaseTablet::_gen_tablet_path() { - std::string path = _data_dir->path() + DATA_PREFIX; - path = path_util::join_path_segments(path, std::to_string(_tablet_meta->shard_id())); - path = path_util::join_path_segments(path, std::to_string(_tablet_meta->tablet_id())); - path = path_util::join_path_segments(path, std::to_string(_tablet_meta->schema_hash())); - _tablet_path = path; + if (_data_dir != nullptr) { + std::string path = _data_dir->path() + DATA_PREFIX; + path = path_util::join_path_segments(path, std::to_string(_tablet_meta->shard_id())); + path = path_util::join_path_segments(path, std::to_string(_tablet_meta->tablet_id())); + path = path_util::join_path_segments(path, std::to_string(_tablet_meta->schema_hash())); + _tablet_path = path; + } } } /* namespace doris */ diff --git a/be/src/olap/memory/CMakeLists.txt b/be/src/olap/memory/CMakeLists.txt index b552dfe96d..e674df1535 100644 --- a/be/src/olap/memory/CMakeLists.txt +++ b/be/src/olap/memory/CMakeLists.txt @@ -29,8 +29,10 @@ add_library(Memory STATIC delta_index.cpp hash_index.cpp mem_tablet.cpp + mem_tablet_scan.cpp mem_sub_tablet.cpp partial_row_batch.cpp + row_block.cpp schema.cpp write_txn.cpp ) diff --git a/be/src/olap/memory/column_block.cpp b/be/src/olap/memory/column_block.cpp index 1d25fbcaa7..050aecb477 100644 --- a/be/src/olap/memory/column_block.cpp +++ b/be/src/olap/memory/column_block.cpp @@ -64,6 +64,7 @@ Status ColumnBlock::copy_to(ColumnBlock* dest, size_t size, size_t esize) { RETURN_IF_ERROR(dest->nulls().alloc(dest->size())); memcpy(dest->nulls().data(), nulls().data(), size); } + memcpy(dest->data().data(), data().data(), size * esize); return Status::OK(); } diff --git a/be/src/olap/memory/column_block.h b/be/src/olap/memory/column_block.h index 5dc00249cf..c658765a07 100644 --- a/be/src/olap/memory/column_block.h +++ b/be/src/olap/memory/column_block.h @@ -34,8 +34,12 @@ public: Buffer& data() { return _data; } + const Buffer& data() const { return _data; } + Buffer& nulls() { return _nulls; } + const Buffer& nulls() const { return _nulls; } + // Allocate memory for this block, with space for size elements and each // element have esize byte size Status alloc(size_t size, size_t esize); diff --git a/be/src/olap/memory/column_reader.h b/be/src/olap/memory/column_reader.h index 4a0271f57c..d747ba615e 100644 --- a/be/src/olap/memory/column_reader.h +++ b/be/src/olap/memory/column_reader.h @@ -36,6 +36,7 @@ namespace memory { // Note: this class is only intended for single-thread single reader usage. class ColumnBlockHolder { public: + ColumnBlockHolder() {} ColumnBlockHolder(ColumnBlock* cb, bool own) : _cb(cb), _own_cb(own) {} void init(ColumnBlock* cb, bool own) { @@ -46,7 +47,9 @@ public: void release() { if (_own_cb) { - delete _cb; + // use delete _cb directly will cause DCHECK fail in DEBUG mode + // so we use scoped_refptr to free _cb instead + scoped_refptr ref(_cb); _own_cb = false; } _cb = nullptr; @@ -56,13 +59,7 @@ public: bool own() const { return _own_cb; } - ~ColumnBlockHolder() { - if (_own_cb) { - delete _cb; - _cb = nullptr; - _own_cb = false; - } - } + ~ColumnBlockHolder() { release(); } private: ColumnBlock* _cb = nullptr; diff --git a/be/src/olap/memory/mem_sub_tablet.cpp b/be/src/olap/memory/mem_sub_tablet.cpp index 8bbdf5fac4..3305bb5e1e 100644 --- a/be/src/olap/memory/mem_sub_tablet.cpp +++ b/be/src/olap/memory/mem_sub_tablet.cpp @@ -78,7 +78,7 @@ Status MemSubTablet::read_column(uint64_t version, uint32_t cid, cl = _columns[cid]; } } - if (cl.get() != nullptr) { + if (cl.get() == nullptr) { return Status::NotFound("column not found"); } return cl->create_reader(version, reader); @@ -101,7 +101,7 @@ Status MemSubTablet::begin_write(scoped_refptr* schema) { // precache key columns for (size_t i = 0; i < _schema->num_key_columns(); i++) { uint32_t cid = _schema->get(i)->cid(); - if (_writers[cid] != nullptr) { + if (_writers[cid] == nullptr) { RETURN_IF_ERROR(_columns[cid]->create_writer(&_writers[cid])); } } @@ -202,7 +202,7 @@ Status MemSubTablet::commit_write(uint64_t version) { } { std::lock_guard lg(_lock); - if (_index != _write_index) { + if (_index.get() != _write_index.get()) { _index = _write_index; } for (size_t cid = 0; cid < _writers.size(); cid++) { diff --git a/be/src/olap/memory/mem_tablet.cpp b/be/src/olap/memory/mem_tablet.cpp index a75e30b318..03a9d45d2e 100644 --- a/be/src/olap/memory/mem_tablet.cpp +++ b/be/src/olap/memory/mem_tablet.cpp @@ -18,6 +18,7 @@ #include "olap/memory/mem_tablet.h" #include "olap/memory/mem_sub_tablet.h" +#include "olap/memory/mem_tablet_scan.h" #include "olap/memory/write_txn.h" namespace doris { @@ -36,11 +37,35 @@ std::shared_ptr MemTablet::create_tablet_from_meta(TabletMetaSharedPt } Status MemTablet::init() { + _max_version = 0; return MemSubTablet::create(0, *_mem_schema.get(), &_sub_tablet); } -Status MemTablet::scan(std::unique_ptr&& spec, std::unique_ptr* scan) { - return Status::NotSupported("scan not supported"); +Status MemTablet::scan(std::unique_ptr* spec, std::unique_ptr* scan) { + uint64_t version = (*spec)->version(); + if (version == UINT64_MAX) { + version = _max_version; + (*spec)->_version = version; + } + if (version > _max_version) { + return Status::InvalidArgument("Illegal scan version (larger than latest version)"); + } + size_t num_rows = 0; + RETURN_IF_ERROR(_sub_tablet->get_size(version, &num_rows)); + num_rows = std::min((*spec)->_limit, num_rows); + std::vector> readers; + auto& columns = (*spec)->columns(); + readers.resize(columns.size()); + for (size_t i = 0; i < columns.size(); ++i) { + const ColumnSchema* cs = _mem_schema->get_by_name(columns[i]); + if (!cs) { + return Status::NotFound("column not found for scan"); + } + RETURN_IF_ERROR(_sub_tablet->read_column(version, cs->cid(), &readers[i])); + } + scan->reset(new MemTabletScan(std::static_pointer_cast(shared_from_this()), spec, + num_rows, &readers)); + return Status::OK(); } Status MemTablet::create_write_txn(std::unique_ptr* wtxn) { @@ -50,12 +75,15 @@ Status MemTablet::create_write_txn(std::unique_ptr* wtxn) { Status MemTablet::commit_write_txn(WriteTxn* wtxn, uint64_t version) { std::lock_guard lg(_write_lock); + DCHECK_LT(_max_version, version); RETURN_IF_ERROR(_sub_tablet->begin_write(&_mem_schema)); for (size_t i = 0; i < wtxn->batch_size(); i++) { auto batch = wtxn->get_batch(i); RETURN_IF_ERROR(_sub_tablet->apply_partial_row_batch(batch)); } - return _sub_tablet->commit_write(version); + RETURN_IF_ERROR(_sub_tablet->commit_write(version)); + _max_version = version; + return Status::OK(); } } // namespace memory diff --git a/be/src/olap/memory/mem_tablet.h b/be/src/olap/memory/mem_tablet.h index cfc8c3cfce..dfafa27299 100644 --- a/be/src/olap/memory/mem_tablet.h +++ b/be/src/olap/memory/mem_tablet.h @@ -52,12 +52,13 @@ public: // Initialize Status init(); - // Scan the tablet, return a MemTabletScan object scan, user can specify projections, - // predicates and aggregations using ScanSpec, currently only support full scan with - // projection. + // Scan the tablet, return a MemTabletScan object scan, user can specify projections + // using ScanSpec, currently only support full scan with projection, will support + // filter/aggregation in the future. // + // Note: spec will be passed to scan object // Note: thread-safe, supports multi-reader concurrency. - Status scan(std::unique_ptr&& spec, std::unique_ptr* scan); + Status scan(std::unique_ptr* spec, std::unique_ptr* scan); // Create a write transaction // @@ -70,6 +71,7 @@ public: Status commit_write_txn(WriteTxn* wtxn, uint64_t version); private: + friend class MemTabletScan; // memory::Schema is used internally rather than TabletSchema, so we need an extra // copy of _schema with type memory::Schema. scoped_refptr _mem_schema; @@ -79,6 +81,8 @@ private: std::mutex _write_lock; + std::atomic _max_version; + DISALLOW_COPY_AND_ASSIGN(MemTablet); }; diff --git a/be/src/olap/memory/mem_tablet_scan.cpp b/be/src/olap/memory/mem_tablet_scan.cpp new file mode 100644 index 0000000000..0d0126deeb --- /dev/null +++ b/be/src/olap/memory/mem_tablet_scan.cpp @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/memory/mem_tablet_scan.h" + +#include "olap/memory/column_reader.h" +#include "olap/memory/mem_sub_tablet.h" +#include "olap/memory/mem_tablet.h" + +namespace doris { +namespace memory { + +MemTabletScan::~MemTabletScan() {} + +MemTabletScan::MemTabletScan(std::shared_ptr&& tablet, std::unique_ptr* spec, + size_t num_rows, std::vector>* readers) + : _tablet(std::move(tablet)), + _schema(_tablet->_mem_schema.get()), + _spec(std::move(*spec)), + _num_rows(num_rows), + _readers(std::move(*readers)) { + _row_block.reset(new RowBlock(_readers.size())); + _next_block = 0; + _num_blocks = num_block(_num_rows, Column::BLOCK_SIZE); +} + +Status MemTabletScan::next_block(const RowBlock** block) { + if (_next_block >= _num_blocks) { + *block = nullptr; + return Status::OK(); + } + size_t rows_in_block = + std::min((size_t)Column::BLOCK_SIZE, _num_rows - _next_block * Column::BLOCK_SIZE); + _row_block->_nrows = rows_in_block; + for (size_t i = 0; i < _readers.size(); ++i) { + RETURN_IF_ERROR( + _readers[i]->get_block(rows_in_block, _next_block, &_row_block->_columns[i])); + } + _next_block++; + *block = _row_block.get(); + return Status::OK(); +} + +} // namespace memory +} // namespace doris diff --git a/be/src/olap/memory/mem_tablet_scan.h b/be/src/olap/memory/mem_tablet_scan.h new file mode 100644 index 0000000000..e6da9ce38e --- /dev/null +++ b/be/src/olap/memory/mem_tablet_scan.h @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "olap/memory/common.h" +#include "olap/memory/row_block.h" +#include "olap/memory/schema.h" + +namespace doris { +namespace memory { + +class ScanSpec { +public: + ScanSpec(vector&& columns, uint64_t version = UINT64_MAX, + uint64_t limit = UINT64_MAX) + : _version(version), _limit(limit), _columns(std::move(columns)) {} + + uint64_t version() const { return _version; } + + uint64_t limit() const { return _limit; } + + const vector columns() const { return _columns; } + +private: + friend class MemTablet; + + uint64_t _version = UINT64_MAX; + uint64_t _limit = UINT64_MAX; + vector _columns; +}; + +class HashIndex; +class MemTablet; +class ColumnReader; + +// Scanner for MemTablet +class MemTabletScan { +public: + ~MemTabletScan(); + + // Get next row_block, it will remain valid until next call to next_block. + Status next_block(const RowBlock** block); + +private: + friend class MemTablet; + + MemTabletScan(std::shared_ptr&& tablet, std::unique_ptr* spec, + size_t num_rows, std::vector>* readers); + + std::shared_ptr _tablet; + const Schema* _schema = nullptr; + std::unique_ptr _spec; + + size_t _num_rows = 0; + size_t _num_blocks = 0; + // full scan support + std::vector> _readers; + + // returned block + std::unique_ptr _row_block; + size_t _next_block = 0; + + DISALLOW_COPY_AND_ASSIGN(MemTabletScan); +}; + +} // namespace memory +} // namespace doris diff --git a/be/src/olap/memory/partial_row_batch.cpp b/be/src/olap/memory/partial_row_batch.cpp index b7d2b5e2c9..e0cb60c22c 100644 --- a/be/src/olap/memory/partial_row_batch.cpp +++ b/be/src/olap/memory/partial_row_batch.cpp @@ -111,8 +111,8 @@ Status PartialRowBatch::cur_row_get_cell(size_t idx, const ColumnSchema** cs, // Methods for PartialRowWriter -PartialRowWriter::PartialRowWriter(scoped_refptr* schema) - : _schema(*schema), _bit_set_size(_schema->cid_size()), _bit_nullable_size(0) { +PartialRowWriter::PartialRowWriter(const scoped_refptr& schema) + : _schema(schema), _bit_set_size(_schema->cid_size()), _bit_nullable_size(0) { _temp_cells.resize(_schema->cid_size()); } diff --git a/be/src/olap/memory/partial_row_batch.h b/be/src/olap/memory/partial_row_batch.h index 893f865e5f..ac5a2b4d8d 100644 --- a/be/src/olap/memory/partial_row_batch.h +++ b/be/src/olap/memory/partial_row_batch.h @@ -105,7 +105,7 @@ private: // Example usage: // scoped_refptr sc; // Schema::create("id int,uv int,pv int,city tinyint null", &sc); -// PartialRowWriter writer(*sc.get()); +// PartialRowWriter writer(&sc); // writer.start_batch(); // for (auto& row : rows) { // writer.start_row(); @@ -121,7 +121,7 @@ public: static const size_t DEFAULT_BYTE_CAPACITY = 1 << 20; static const size_t DEFAULT_ROW_CAPACIT = 1 << 16; - explicit PartialRowWriter(scoped_refptr* schema); + explicit PartialRowWriter(const scoped_refptr& schema); ~PartialRowWriter(); Status start_batch(size_t row_capacity = DEFAULT_ROW_CAPACIT, diff --git a/be/src/olap/memory/row_block.cpp b/be/src/olap/memory/row_block.cpp new file mode 100644 index 0000000000..c9e9804b05 --- /dev/null +++ b/be/src/olap/memory/row_block.cpp @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/memory/row_block.h" + +namespace doris { +namespace memory { + +RowBlock::RowBlock(size_t num_columns) { + _columns.resize(num_columns); +} + +} // namespace memory +} // namespace doris diff --git a/be/src/olap/memory/row_block.h b/be/src/olap/memory/row_block.h new file mode 100644 index 0000000000..5283dd8cfe --- /dev/null +++ b/be/src/olap/memory/row_block.h @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "olap/memory/column_block.h" +#include "olap/memory/column_reader.h" +#include "olap/memory/common.h" + +namespace doris { +namespace memory { + +class RowBlock { +public: + size_t num_rows() const { return _nrows; } + size_t num_columns() const { return _columns.size(); } + const ColumnBlock& get_column(size_t idx) const { return *_columns[idx].get(); } + +private: + friend class MemTabletScan; + explicit RowBlock(size_t num_columns); + + size_t _nrows = 0; + vector _columns; +}; + +} // namespace memory +} // namespace doris diff --git a/be/src/olap/memory/schema.h b/be/src/olap/memory/schema.h index 9a41480106..e11d916c85 100644 --- a/be/src/olap/memory/schema.h +++ b/be/src/olap/memory/schema.h @@ -82,6 +82,8 @@ public: inline size_t num_key_columns() const { return _tschema.num_key_columns(); } + const TabletSchema& get_tablet_schema() const { return _tschema; } + // Get ColumnSchema by index const ColumnSchema* get(size_t idx) const; diff --git a/be/src/olap/memory/typed_column_writer.h b/be/src/olap/memory/typed_column_writer.h index fdac8b3096..f180be1b00 100644 --- a/be/src/olap/memory/typed_column_writer.h +++ b/be/src/olap/memory/typed_column_writer.h @@ -201,7 +201,7 @@ public: } Status get_new_column(scoped_refptr* ret) { - if (*ret != _column) { + if (ret->get() != _column.get()) { DLOG(INFO) << StringPrintf("%s switch new column", _column->debug_string().c_str()); (*ret).swap(_column); _column.reset(); diff --git a/be/src/olap/memory/write_txn.h b/be/src/olap/memory/write_txn.h index 74cee3cc80..e766e86d46 100644 --- a/be/src/olap/memory/write_txn.h +++ b/be/src/olap/memory/write_txn.h @@ -40,6 +40,8 @@ public: const Schema& schema() const { return *_schema.get(); } + scoped_refptr get_schema_ptr() const { return _schema; } + // Get number of batches size_t batch_size() const { return _batches.size(); } diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt index f00d7b74d2..fc9f2a883a 100644 --- a/be/test/olap/CMakeLists.txt +++ b/be/test/olap/CMakeLists.txt @@ -87,3 +87,4 @@ ADD_BE_TEST(memory/column_delta_test) ADD_BE_TEST(memory/schema_test) ADD_BE_TEST(memory/column_test) ADD_BE_TEST(memory/partial_row_batch_test) +ADD_BE_TEST(memory/mem_tablet_test) diff --git a/be/test/olap/memory/mem_tablet_test.cpp b/be/test/olap/memory/mem_tablet_test.cpp new file mode 100644 index 0000000000..979f4622da --- /dev/null +++ b/be/test/olap/memory/mem_tablet_test.cpp @@ -0,0 +1,192 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/memory/mem_tablet.h" + +#include + +#include "olap/memory/mem_tablet_scan.h" +#include "olap/memory/write_txn.h" +#include "olap/tablet_meta.h" + +namespace doris { +namespace memory { + +struct TData { + int32_t id; + int32_t uv; + int32_t pv; + int8_t city; +}; + +TEST(MemTablet, writescan) { + const int num_insert = 2000000; + const int insert_per_write = 500000; + const int num_update = 10000; + const int update_time = 3; + scoped_refptr sc; + ASSERT_TRUE(Schema::create("id int,uv int,pv int,city tinyint null", &sc).ok()); + std::unordered_map col_idx_to_unique_id; + std::vector columns(sc->num_columns()); + for (size_t i = 0; i < sc->num_columns(); i++) { + const ColumnSchema* cs = sc->get(i); + col_idx_to_unique_id[i] = cs->cid(); + TColumn& c = columns[i]; + c.__set_column_name(cs->name()); + TColumnType tct; + if (cs->type() == ColumnType::OLAP_FIELD_TYPE_INT) { + tct.__set_type(TPrimitiveType::INT); + } else if (cs->type() == ColumnType::OLAP_FIELD_TYPE_TINYINT) { + tct.__set_type(TPrimitiveType::TINYINT); + } else { + ASSERT_TRUE(false); + } + c.__set_column_type(tct); + c.__set_is_allow_null(cs->is_nullable()); + c.__set_is_key(cs->is_key()); + c.__set_aggregation_type(TAggregationType::REPLACE); + } + TTabletSchema tschema; + tschema.__set_short_key_column_count(1); + tschema.__set_keys_type(TKeysType::UNIQUE_KEYS); + tschema.__set_columns(columns); + tschema.__set_is_in_memory(false); + tschema.__set_schema_hash(1); + TabletMetaSharedPtr tablet_meta( + new TabletMeta(1, 1, 1, 1, 1, tschema, static_cast(sc->cid_size()), + col_idx_to_unique_id, TabletUid(1, 1), TTabletType::TABLET_TYPE_MEMORY)); + std::shared_ptr tablet = MemTablet::create_tablet_from_meta(tablet_meta, nullptr); + ASSERT_TRUE(tablet->init().ok()); + + uint64_t cur_version = 0; + vector alldata(num_insert); + + // insert + srand(1); + size_t nrow = 0; + for (int insert_start = 0; insert_start < num_insert; insert_start += insert_per_write) { + std::unique_ptr wtx; + EXPECT_TRUE(tablet->create_write_txn(&wtx).ok()); + PartialRowWriter writer(wtx->get_schema_ptr()); + int insert_end = std::min(insert_start + insert_per_write, num_insert); + EXPECT_TRUE(writer.start_batch(insert_per_write + 1, insert_per_write * 32).ok()); + for (int i = insert_start; i < insert_end; i++) { + nrow++; + EXPECT_TRUE(writer.start_row().ok()); + int id = i; + int uv = rand() % 10000; + int pv = rand() % 10000; + int8_t city = rand() % 100; + alldata[i].id = id; + alldata[i].uv = uv; + alldata[i].pv = pv; + alldata[i].city = city; + EXPECT_TRUE(writer.set("id", &id).ok()); + EXPECT_TRUE(writer.set("uv", &uv).ok()); + EXPECT_TRUE(writer.set("pv", &pv).ok()); + EXPECT_TRUE(writer.set("city", city % 2 == 0 ? nullptr : &city).ok()); + EXPECT_TRUE(writer.end_row().ok()); + } + vector wtxn_buff; + EXPECT_TRUE(writer.finish_batch(&wtxn_buff).ok()); + PartialRowBatch* batch = wtx->new_batch(); + EXPECT_TRUE(batch->load(std::move(wtxn_buff)).ok()); + EXPECT_TRUE(tablet->commit_write_txn(wtx.get(), ++cur_version).ok()); + wtx.reset(); + } + + // update + for (int i = 0; i < update_time; i++) { + std::unique_ptr wtx; + EXPECT_TRUE(tablet->create_write_txn(&wtx).ok()); + PartialRowWriter writer(wtx->get_schema_ptr()); + EXPECT_TRUE(writer.start_batch(num_update + 1, num_update * 32).ok()); + size_t nrow = 0; + for (int j = 0; j < num_update; j++) { + nrow++; + EXPECT_TRUE(writer.start_row().ok()); + int id = rand() % num_insert; + int uv = rand() % 10000; + int pv = rand() % 10000; + int8_t city = rand() % 100; + alldata[id].uv = uv; + alldata[id].pv = pv; + alldata[id].city = city; + EXPECT_TRUE(writer.set("id", &id).ok()); + EXPECT_TRUE(writer.set("pv", &pv).ok()); + EXPECT_TRUE(writer.set("city", city % 2 == 0 ? nullptr : &city).ok()); + EXPECT_TRUE(writer.end_row().ok()); + } + vector wtxn_buff; + EXPECT_TRUE(writer.finish_batch(&wtxn_buff).ok()); + PartialRowBatch* batch = wtx->new_batch(); + EXPECT_TRUE(batch->load(std::move(wtxn_buff)).ok()); + EXPECT_TRUE(tablet->commit_write_txn(wtx.get(), ++cur_version).ok()); + wtx.reset(); + } + + // scan perf test + { + double t0 = GetMonoTimeSecondsAsDouble(); + std::unique_ptr scanspec(new ScanSpec({"pv"}, cur_version)); + std::unique_ptr scan; + ASSERT_TRUE(tablet->scan(&scanspec, &scan).ok()); + const RowBlock* rblock = nullptr; + while (true) { + EXPECT_TRUE(scan->next_block(&rblock).ok()); + if (!rblock) { + break; + } + } + double t = GetMonoTimeSecondsAsDouble() - t0; + LOG(INFO) << StringPrintf("scan %d record, time: %.3lfs %.0lf row/s", num_insert, t, + num_insert / t); + scan.reset(); + } + + // scan result validation + { + std::unique_ptr scanspec(new ScanSpec({"pv"}, cur_version)); + std::unique_ptr scan; + ASSERT_TRUE(tablet->scan(&scanspec, &scan).ok()); + size_t curidx = 0; + while (true) { + const RowBlock* rblock = nullptr; + EXPECT_TRUE(scan->next_block(&rblock).ok()); + if (!rblock) { + break; + } + size_t nrows = rblock->num_rows(); + const ColumnBlock& cb = rblock->get_column(0); + for (size_t i = 0; i < nrows; i++) { + int32_t value = cb.data().as()[i]; + EXPECT_EQ(value, alldata[curidx].pv); + curidx++; + } + } + EXPECT_EQ(curidx, (size_t)num_insert); + scan.reset(); + } +} + +} // namespace memory +} // namespace doris + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/be/test/olap/memory/partial_row_batch_test.cpp b/be/test/olap/memory/partial_row_batch_test.cpp index 354795cfe6..f031cc9369 100644 --- a/be/test/olap/memory/partial_row_batch_test.cpp +++ b/be/test/olap/memory/partial_row_batch_test.cpp @@ -29,7 +29,7 @@ namespace memory { TEST(PartialRowbatch, write) { scoped_refptr sc; ASSERT_TRUE(Schema::create("id int,uv int,pv int,city tinyint null", &sc).ok()); - PartialRowWriter writer(&sc); + PartialRowWriter writer(sc); srand(1); const int N = 1000; size_t nrow = 0;