[Enhance] improve performance of init_scan_key by sharing the schema (#6099)

Co-authored-by: huangmengbin <huangmengbin@bytedance.com>
This commit is contained in:
huangmengbin
2021-07-21 10:50:31 +08:00
committed by GitHub
parent 94c50012b2
commit 2d78c31d49
7 changed files with 118 additions and 53 deletions

View File

@ -66,9 +66,9 @@ Status AutoIncrementIterator::next_batch(RowBlockV2* block) {
while (row_idx < block->capacity() && _rows_returned < _num_rows) {
RowBlockRow row = block->row(row_idx);
for (int i = 0; i < _schema.columns().size(); ++i) {
for (int i = 0; i < _schema.num_columns(); ++i) {
row.set_is_null(i, false);
auto& col_schema = _schema.columns()[i];
const auto* col_schema = _schema.column(i);
switch (col_schema->type()) {
case OLAP_FIELD_TYPE_SMALLINT:
*(int16_t*)row.cell_ptr(i) = _rows_returned + i;

View File

@ -31,6 +31,7 @@
#include "olap/row_block.h"
#include "olap/row_cursor.h"
#include "olap/rowset/beta_rowset_reader.h"
#include "olap/schema.h"
#include "olap/rowset/column_data.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
@ -549,14 +550,34 @@ OLAPStatus Reader::_init_keys_param(const ReaderParams& read_params) {
size_t start_key_size = read_params.start_key.size();
_keys_param.start_keys.resize(start_key_size, nullptr);
size_t scan_key_size = read_params.start_key.front().size();
if (scan_key_size > _tablet->tablet_schema().num_columns()) {
LOG(WARNING)
<< "Input param are invalid. Column count is bigger than num_columns of schema. "
<< "column_count=" << scan_key_size
<< ", schema.num_columns=" << _tablet->tablet_schema().num_columns();
return OLAP_ERR_INPUT_PARAMETER_ERROR;
}
std::vector<uint32_t> columns(scan_key_size);
std::iota(columns.begin(), columns.end(), 0);
std::shared_ptr<Schema> schema = std::make_shared<Schema>(_tablet->tablet_schema().columns(), columns);
for (size_t i = 0; i < start_key_size; ++i) {
if (read_params.start_key[i].size() != scan_key_size) {
OLAP_LOG_WARNING("The start_key.at(%ld).size == %ld, not equals the %ld", i, read_params.start_key[i].size(), scan_key_size);
return OLAP_ERR_INPUT_PARAMETER_ERROR;
}
if ((_keys_param.start_keys[i] = new (nothrow) RowCursor()) == nullptr) {
OLAP_LOG_WARNING("fail to new RowCursor!");
return OLAP_ERR_MALLOC_ERROR;
}
OLAPStatus res = _keys_param.start_keys[i]->init_scan_key(
_tablet->tablet_schema(), read_params.start_key[i].values());
_tablet->tablet_schema(), read_params.start_key[i].values(), schema);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to init row cursor. [res=%d]", res);
return res;
@ -572,13 +593,18 @@ OLAPStatus Reader::_init_keys_param(const ReaderParams& read_params) {
size_t end_key_size = read_params.end_key.size();
_keys_param.end_keys.resize(end_key_size, nullptr);
for (size_t i = 0; i < end_key_size; ++i) {
if (read_params.end_key[i].size() != scan_key_size) {
OLAP_LOG_WARNING("The end_key.at(%ld).size == %ld, not equals the %ld", i, read_params.end_key[i].size(), scan_key_size);
return OLAP_ERR_INPUT_PARAMETER_ERROR;
}
if ((_keys_param.end_keys[i] = new (nothrow) RowCursor()) == nullptr) {
OLAP_LOG_WARNING("fail to new RowCursor!");
return OLAP_ERR_MALLOC_ERROR;
}
OLAPStatus res = _keys_param.end_keys[i]->init_scan_key(_tablet->tablet_schema(),
read_params.end_key[i].values());
read_params.end_key[i].values(), schema);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to init row cursor. [res=%d]", res);
return res;

View File

@ -35,9 +35,7 @@ RowCursor::~RowCursor() {
delete[] _variable_buf;
}
OLAPStatus RowCursor::_init(const std::vector<TabletColumn>& schema,
const std::vector<uint32_t>& columns) {
_schema.reset(new Schema(schema, columns));
OLAPStatus RowCursor::_init(const std::vector<uint32_t>& columns) {
_variable_len = 0;
for (auto cid : columns) {
if (_schema->column(cid) == nullptr) {
@ -59,6 +57,61 @@ OLAPStatus RowCursor::_init(const std::vector<TabletColumn>& schema,
return OLAP_SUCCESS;
}
OLAPStatus RowCursor::_init(const std::shared_ptr<Schema>& shared_schema,
const std::vector<uint32_t>& columns) {
_schema = shared_schema;
return _init(columns);
}
OLAPStatus RowCursor::_init(const std::vector<TabletColumn>& schema,
const std::vector<uint32_t>& columns) {
_schema.reset(new Schema(schema, columns));
return _init(columns);
}
OLAPStatus RowCursor::_init_scan_key(const TabletSchema& schema, const std::vector<std::string>& scan_keys) {
// NOTE: cid equal with column index
// Hyperloglog cannot be key, no need to handle it
_variable_len = 0;
for (auto cid : _schema->column_ids()) {
const TabletColumn& column = schema.column(cid);
FieldType type = column.type();
if (type == OLAP_FIELD_TYPE_VARCHAR) {
_variable_len += scan_keys[cid].length();
} else if (type == OLAP_FIELD_TYPE_CHAR || type == OLAP_FIELD_TYPE_ARRAY) {
_variable_len += std::max(scan_keys[cid].length(), column.length());
}
}
// variable_len for null bytes
_variable_buf = new (nothrow) char[_variable_len];
if (_variable_buf == nullptr) {
OLAP_LOG_WARNING("Fail to malloc _variable_buf.");
return OLAP_ERR_MALLOC_ERROR;
}
memset(_variable_buf, 0, _variable_len);
char* fixed_ptr = _fixed_buf;
char* variable_ptr = _variable_buf;
for (auto cid : _schema->column_ids()) {
const TabletColumn& column = schema.column(cid);
fixed_ptr = _fixed_buf + _schema->column_offset(cid);
FieldType type = column.type();
if (type == OLAP_FIELD_TYPE_VARCHAR) {
Slice* slice = reinterpret_cast<Slice*>(fixed_ptr + 1);
slice->data = variable_ptr;
slice->size = scan_keys[cid].length();
variable_ptr += scan_keys[cid].length();
} else if (type == OLAP_FIELD_TYPE_CHAR) {
Slice* slice = reinterpret_cast<Slice*>(fixed_ptr + 1);
slice->data = variable_ptr;
slice->size = std::max(scan_keys[cid].length(), column.length());
variable_ptr += slice->size;
}
}
return OLAP_SUCCESS;
}
OLAPStatus RowCursor::init(const TabletSchema& schema) {
return init(schema.columns(), schema.num_columns());
}
@ -116,53 +169,27 @@ OLAPStatus RowCursor::init_scan_key(const TabletSchema& schema,
return OLAP_ERR_INPUT_PARAMETER_ERROR;
}
std::vector<uint32_t> columns(scan_key_size);
std::iota(columns.begin(), columns.end(), 0);
RETURN_NOT_OK(_init(schema.columns(), columns));
return _init_scan_key(schema, scan_keys);
}
OLAPStatus RowCursor::init_scan_key(const TabletSchema& schema,
const std::vector<std::string>& scan_keys,
const std::shared_ptr<Schema>& shared_schema) {
size_t scan_key_size = scan_keys.size();
std::vector<uint32_t> columns;
for (size_t i = 0; i < scan_key_size; ++i) {
columns.push_back(i);
}
RETURN_NOT_OK(_init(schema.columns(), columns));
RETURN_NOT_OK(_init(shared_schema, columns));
// NOTE: cid equal with column index
// Hyperloglog cannot be key, no need to handle it
_variable_len = 0;
for (auto cid : _schema->column_ids()) {
const TabletColumn& column = schema.column(cid);
FieldType type = column.type();
if (type == OLAP_FIELD_TYPE_VARCHAR) {
_variable_len += scan_keys[cid].length();
} else if (type == OLAP_FIELD_TYPE_CHAR || type == OLAP_FIELD_TYPE_ARRAY) {
_variable_len += std::max(scan_keys[cid].length(), column.length());
}
}
// variable_len for null bytes
_variable_buf = new (nothrow) char[_variable_len];
if (_variable_buf == nullptr) {
OLAP_LOG_WARNING("Fail to malloc _variable_buf.");
return OLAP_ERR_MALLOC_ERROR;
}
memset(_variable_buf, 0, _variable_len);
char* fixed_ptr = _fixed_buf;
char* variable_ptr = _variable_buf;
for (auto cid : _schema->column_ids()) {
const TabletColumn& column = schema.column(cid);
fixed_ptr = _fixed_buf + _schema->column_offset(cid);
FieldType type = column.type();
if (type == OLAP_FIELD_TYPE_VARCHAR) {
Slice* slice = reinterpret_cast<Slice*>(fixed_ptr + 1);
slice->data = variable_ptr;
slice->size = scan_keys[cid].length();
variable_ptr += scan_keys[cid].length();
} else if (type == OLAP_FIELD_TYPE_CHAR) {
Slice* slice = reinterpret_cast<Slice*>(fixed_ptr + 1);
slice->data = variable_ptr;
slice->size = std::max(scan_keys[cid].length(), column.length());
variable_ptr += slice->size;
}
}
return OLAP_SUCCESS;
return _init_scan_key(schema, scan_keys);
}
// TODO(yingchun): parameter 'const TabletSchema& schema' is not used

View File

@ -55,6 +55,10 @@ public:
// 目前仅用在拆分key区间的时候
OLAPStatus init_scan_key(const TabletSchema& schema, const std::vector<std::string>& keys);
OLAPStatus init_scan_key(const TabletSchema& schema,
const std::vector<std::string>& keys,
const std::shared_ptr<Schema>& shared_schema);
//allocate memory for string type, which include char, varchar, hyperloglog
OLAPStatus allocate_memory_for_string_type(const TabletSchema& schema);
@ -143,10 +147,15 @@ public:
char* row_ptr() const { return _fixed_buf; }
private:
OLAPStatus _init(const std::vector<uint32_t>& columns);
OLAPStatus _init(const std::shared_ptr<Schema>& shared_schema,
const std::vector<uint32_t>& columns);
// common init function
OLAPStatus _init(const std::vector<TabletColumn>& schema, const std::vector<uint32_t>& columns);
std::unique_ptr<Schema> _schema;
OLAPStatus _init_scan_key(const TabletSchema& schema, const std::vector<std::string>& scan_keys);
std::shared_ptr<Schema> _schema;
char* _fixed_buf = nullptr; // point to fixed buf
size_t _fixed_len;

View File

@ -177,13 +177,13 @@ Status SegmentIterator::_prepare_seek(const StorageReadOptions::KeyRange& key_ra
if (key_range.lower_key != nullptr) {
for (auto cid : key_range.lower_key->schema()->column_ids()) {
column_set.emplace(cid);
key_fields.emplace_back(key_range.lower_key->schema()->column(cid));
key_fields.emplace_back(key_range.lower_key->column_schema(cid));
}
}
if (key_range.upper_key != nullptr) {
for (auto cid : key_range.upper_key->schema()->column_ids()) {
if (column_set.count(cid) == 0) {
key_fields.emplace_back(key_range.upper_key->schema()->column(cid));
key_fields.emplace_back(key_range.upper_key->column_schema(cid));
column_set.emplace(cid);
}
}

View File

@ -100,7 +100,6 @@ public:
~Schema();
const std::vector<Field*>& columns() const { return _cols; }
const Field* column(ColumnId cid) const { return _cols[cid]; }
size_t num_key_columns() const { return _num_key_columns; }

View File

@ -21,6 +21,7 @@
#include "common/object_pool.h"
#include "olap/row.h"
#include "olap/schema.h"
#include "olap/tablet_schema.h"
#include "runtime/mem_pool.h"
#include "runtime/mem_tracker.h"
@ -310,8 +311,11 @@ TEST_F(TestRowCursor, InitRowCursorWithScanKey) {
scan_keys.push_back("char_exceed_length");
scan_keys.push_back("varchar_exceed_length");
std::vector<uint32_t> columns{0, 1};
std::shared_ptr<Schema> schema = std::make_shared<Schema>(tablet_schema.columns(), columns);
RowCursor row;
OLAPStatus res = row.init_scan_key(tablet_schema, scan_keys);
OLAPStatus res = row.init_scan_key(tablet_schema, scan_keys, schema);
ASSERT_EQ(res, OLAP_SUCCESS);
ASSERT_EQ(row.get_fixed_len(), 34);
ASSERT_EQ(row.get_variable_len(), 39);