diff --git a/be/src/olap/generic_iterators.cpp b/be/src/olap/generic_iterators.cpp index 66e97d6ddd..8fd8e685fd 100644 --- a/be/src/olap/generic_iterators.cpp +++ b/be/src/olap/generic_iterators.cpp @@ -66,9 +66,9 @@ Status AutoIncrementIterator::next_batch(RowBlockV2* block) { while (row_idx < block->capacity() && _rows_returned < _num_rows) { RowBlockRow row = block->row(row_idx); - for (int i = 0; i < _schema.columns().size(); ++i) { + for (int i = 0; i < _schema.num_columns(); ++i) { row.set_is_null(i, false); - auto& col_schema = _schema.columns()[i]; + const auto* col_schema = _schema.column(i); switch (col_schema->type()) { case OLAP_FIELD_TYPE_SMALLINT: *(int16_t*)row.cell_ptr(i) = _rows_returned + i; diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index fc6020f91d..c2cc1df0c4 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -31,6 +31,7 @@ #include "olap/row_block.h" #include "olap/row_cursor.h" #include "olap/rowset/beta_rowset_reader.h" +#include "olap/schema.h" #include "olap/rowset/column_data.h" #include "olap/storage_engine.h" #include "olap/tablet.h" @@ -549,14 +550,34 @@ OLAPStatus Reader::_init_keys_param(const ReaderParams& read_params) { size_t start_key_size = read_params.start_key.size(); _keys_param.start_keys.resize(start_key_size, nullptr); + + size_t scan_key_size = read_params.start_key.front().size(); + if (scan_key_size > _tablet->tablet_schema().num_columns()) { + LOG(WARNING) + << "Input param are invalid. Column count is bigger than num_columns of schema. " + << "column_count=" << scan_key_size + << ", schema.num_columns=" << _tablet->tablet_schema().num_columns(); + return OLAP_ERR_INPUT_PARAMETER_ERROR; + } + + std::vector columns(scan_key_size); + std::iota(columns.begin(), columns.end(), 0); + + std::shared_ptr schema = std::make_shared(_tablet->tablet_schema().columns(), columns); + for (size_t i = 0; i < start_key_size; ++i) { + if (read_params.start_key[i].size() != scan_key_size) { + OLAP_LOG_WARNING("The start_key.at(%ld).size == %ld, not equals the %ld", i, read_params.start_key[i].size(), scan_key_size); + return OLAP_ERR_INPUT_PARAMETER_ERROR; + } + if ((_keys_param.start_keys[i] = new (nothrow) RowCursor()) == nullptr) { OLAP_LOG_WARNING("fail to new RowCursor!"); return OLAP_ERR_MALLOC_ERROR; } OLAPStatus res = _keys_param.start_keys[i]->init_scan_key( - _tablet->tablet_schema(), read_params.start_key[i].values()); + _tablet->tablet_schema(), read_params.start_key[i].values(), schema); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("fail to init row cursor. [res=%d]", res); return res; @@ -572,13 +593,18 @@ OLAPStatus Reader::_init_keys_param(const ReaderParams& read_params) { size_t end_key_size = read_params.end_key.size(); _keys_param.end_keys.resize(end_key_size, nullptr); for (size_t i = 0; i < end_key_size; ++i) { + if (read_params.end_key[i].size() != scan_key_size) { + OLAP_LOG_WARNING("The end_key.at(%ld).size == %ld, not equals the %ld", i, read_params.end_key[i].size(), scan_key_size); + return OLAP_ERR_INPUT_PARAMETER_ERROR; + } + if ((_keys_param.end_keys[i] = new (nothrow) RowCursor()) == nullptr) { OLAP_LOG_WARNING("fail to new RowCursor!"); return OLAP_ERR_MALLOC_ERROR; } OLAPStatus res = _keys_param.end_keys[i]->init_scan_key(_tablet->tablet_schema(), - read_params.end_key[i].values()); + read_params.end_key[i].values(), schema); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("fail to init row cursor. [res=%d]", res); return res; diff --git a/be/src/olap/row_cursor.cpp b/be/src/olap/row_cursor.cpp index acde936186..020ccf7782 100644 --- a/be/src/olap/row_cursor.cpp +++ b/be/src/olap/row_cursor.cpp @@ -35,9 +35,7 @@ RowCursor::~RowCursor() { delete[] _variable_buf; } -OLAPStatus RowCursor::_init(const std::vector& schema, - const std::vector& columns) { - _schema.reset(new Schema(schema, columns)); +OLAPStatus RowCursor::_init(const std::vector& columns) { _variable_len = 0; for (auto cid : columns) { if (_schema->column(cid) == nullptr) { @@ -59,6 +57,61 @@ OLAPStatus RowCursor::_init(const std::vector& schema, return OLAP_SUCCESS; } +OLAPStatus RowCursor::_init(const std::shared_ptr& shared_schema, + const std::vector& columns) { + _schema = shared_schema; + return _init(columns); +} + +OLAPStatus RowCursor::_init(const std::vector& schema, + const std::vector& columns) { + _schema.reset(new Schema(schema, columns)); + return _init(columns); +} + +OLAPStatus RowCursor::_init_scan_key(const TabletSchema& schema, const std::vector& scan_keys) { + // NOTE: cid equal with column index + // Hyperloglog cannot be key, no need to handle it + _variable_len = 0; + for (auto cid : _schema->column_ids()) { + const TabletColumn& column = schema.column(cid); + FieldType type = column.type(); + if (type == OLAP_FIELD_TYPE_VARCHAR) { + _variable_len += scan_keys[cid].length(); + } else if (type == OLAP_FIELD_TYPE_CHAR || type == OLAP_FIELD_TYPE_ARRAY) { + _variable_len += std::max(scan_keys[cid].length(), column.length()); + } + } + + // variable_len for null bytes + _variable_buf = new (nothrow) char[_variable_len]; + if (_variable_buf == nullptr) { + OLAP_LOG_WARNING("Fail to malloc _variable_buf."); + return OLAP_ERR_MALLOC_ERROR; + } + memset(_variable_buf, 0, _variable_len); + char* fixed_ptr = _fixed_buf; + char* variable_ptr = _variable_buf; + for (auto cid : _schema->column_ids()) { + const TabletColumn& column = schema.column(cid); + fixed_ptr = _fixed_buf + _schema->column_offset(cid); + FieldType type = column.type(); + if (type == OLAP_FIELD_TYPE_VARCHAR) { + Slice* slice = reinterpret_cast(fixed_ptr + 1); + slice->data = variable_ptr; + slice->size = scan_keys[cid].length(); + variable_ptr += scan_keys[cid].length(); + } else if (type == OLAP_FIELD_TYPE_CHAR) { + Slice* slice = reinterpret_cast(fixed_ptr + 1); + slice->data = variable_ptr; + slice->size = std::max(scan_keys[cid].length(), column.length()); + variable_ptr += slice->size; + } + } + + return OLAP_SUCCESS; +} + OLAPStatus RowCursor::init(const TabletSchema& schema) { return init(schema.columns(), schema.num_columns()); } @@ -116,53 +169,27 @@ OLAPStatus RowCursor::init_scan_key(const TabletSchema& schema, return OLAP_ERR_INPUT_PARAMETER_ERROR; } + std::vector columns(scan_key_size); + std::iota(columns.begin(), columns.end(), 0); + + RETURN_NOT_OK(_init(schema.columns(), columns)); + + return _init_scan_key(schema, scan_keys); +} + +OLAPStatus RowCursor::init_scan_key(const TabletSchema& schema, + const std::vector& scan_keys, + const std::shared_ptr& shared_schema) { + size_t scan_key_size = scan_keys.size(); + std::vector columns; for (size_t i = 0; i < scan_key_size; ++i) { columns.push_back(i); } - RETURN_NOT_OK(_init(schema.columns(), columns)); + RETURN_NOT_OK(_init(shared_schema, columns)); - // NOTE: cid equal with column index - // Hyperloglog cannot be key, no need to handle it - _variable_len = 0; - for (auto cid : _schema->column_ids()) { - const TabletColumn& column = schema.column(cid); - FieldType type = column.type(); - if (type == OLAP_FIELD_TYPE_VARCHAR) { - _variable_len += scan_keys[cid].length(); - } else if (type == OLAP_FIELD_TYPE_CHAR || type == OLAP_FIELD_TYPE_ARRAY) { - _variable_len += std::max(scan_keys[cid].length(), column.length()); - } - } - - // variable_len for null bytes - _variable_buf = new (nothrow) char[_variable_len]; - if (_variable_buf == nullptr) { - OLAP_LOG_WARNING("Fail to malloc _variable_buf."); - return OLAP_ERR_MALLOC_ERROR; - } - memset(_variable_buf, 0, _variable_len); - char* fixed_ptr = _fixed_buf; - char* variable_ptr = _variable_buf; - for (auto cid : _schema->column_ids()) { - const TabletColumn& column = schema.column(cid); - fixed_ptr = _fixed_buf + _schema->column_offset(cid); - FieldType type = column.type(); - if (type == OLAP_FIELD_TYPE_VARCHAR) { - Slice* slice = reinterpret_cast(fixed_ptr + 1); - slice->data = variable_ptr; - slice->size = scan_keys[cid].length(); - variable_ptr += scan_keys[cid].length(); - } else if (type == OLAP_FIELD_TYPE_CHAR) { - Slice* slice = reinterpret_cast(fixed_ptr + 1); - slice->data = variable_ptr; - slice->size = std::max(scan_keys[cid].length(), column.length()); - variable_ptr += slice->size; - } - } - - return OLAP_SUCCESS; + return _init_scan_key(schema, scan_keys); } // TODO(yingchun): parameter 'const TabletSchema& schema' is not used diff --git a/be/src/olap/row_cursor.h b/be/src/olap/row_cursor.h index cc8dc45041..222a4f985c 100644 --- a/be/src/olap/row_cursor.h +++ b/be/src/olap/row_cursor.h @@ -55,6 +55,10 @@ public: // 目前仅用在拆分key区间的时候 OLAPStatus init_scan_key(const TabletSchema& schema, const std::vector& keys); + OLAPStatus init_scan_key(const TabletSchema& schema, + const std::vector& keys, + const std::shared_ptr& shared_schema); + //allocate memory for string type, which include char, varchar, hyperloglog OLAPStatus allocate_memory_for_string_type(const TabletSchema& schema); @@ -143,10 +147,15 @@ public: char* row_ptr() const { return _fixed_buf; } private: + OLAPStatus _init(const std::vector& columns); + OLAPStatus _init(const std::shared_ptr& shared_schema, + const std::vector& columns); // common init function OLAPStatus _init(const std::vector& schema, const std::vector& columns); - std::unique_ptr _schema; + OLAPStatus _init_scan_key(const TabletSchema& schema, const std::vector& scan_keys); + + std::shared_ptr _schema; char* _fixed_buf = nullptr; // point to fixed buf size_t _fixed_len; diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 50e8febb96..e2d4b1d50d 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -177,13 +177,13 @@ Status SegmentIterator::_prepare_seek(const StorageReadOptions::KeyRange& key_ra if (key_range.lower_key != nullptr) { for (auto cid : key_range.lower_key->schema()->column_ids()) { column_set.emplace(cid); - key_fields.emplace_back(key_range.lower_key->schema()->column(cid)); + key_fields.emplace_back(key_range.lower_key->column_schema(cid)); } } if (key_range.upper_key != nullptr) { for (auto cid : key_range.upper_key->schema()->column_ids()) { if (column_set.count(cid) == 0) { - key_fields.emplace_back(key_range.upper_key->schema()->column(cid)); + key_fields.emplace_back(key_range.upper_key->column_schema(cid)); column_set.emplace(cid); } } diff --git a/be/src/olap/schema.h b/be/src/olap/schema.h index 0a2add8f8c..0a7c8e9919 100644 --- a/be/src/olap/schema.h +++ b/be/src/olap/schema.h @@ -100,7 +100,6 @@ public: ~Schema(); - const std::vector& columns() const { return _cols; } const Field* column(ColumnId cid) const { return _cols[cid]; } size_t num_key_columns() const { return _num_key_columns; } diff --git a/be/test/olap/row_cursor_test.cpp b/be/test/olap/row_cursor_test.cpp index a11ef6d101..34410b9613 100644 --- a/be/test/olap/row_cursor_test.cpp +++ b/be/test/olap/row_cursor_test.cpp @@ -21,6 +21,7 @@ #include "common/object_pool.h" #include "olap/row.h" +#include "olap/schema.h" #include "olap/tablet_schema.h" #include "runtime/mem_pool.h" #include "runtime/mem_tracker.h" @@ -310,8 +311,11 @@ TEST_F(TestRowCursor, InitRowCursorWithScanKey) { scan_keys.push_back("char_exceed_length"); scan_keys.push_back("varchar_exceed_length"); + std::vector columns{0, 1}; + std::shared_ptr schema = std::make_shared(tablet_schema.columns(), columns); + RowCursor row; - OLAPStatus res = row.init_scan_key(tablet_schema, scan_keys); + OLAPStatus res = row.init_scan_key(tablet_schema, scan_keys, schema); ASSERT_EQ(res, OLAP_SUCCESS); ASSERT_EQ(row.get_fixed_len(), 34); ASSERT_EQ(row.get_variable_len(), 39);