[enhancement](load) avoid schema copy to reduce cpu usage (#16034)
This commit is contained in:
@ -17,6 +17,7 @@
|
||||
|
||||
#include "olap/delta_writer.h"
|
||||
|
||||
#include "exec/tablet_info.h"
|
||||
#include "olap/data_dir.h"
|
||||
#include "olap/memtable.h"
|
||||
#include "olap/memtable_flush_executor.h"
|
||||
@ -129,8 +130,7 @@ Status DeltaWriter::init() {
|
||||
_req.txn_id, _req.load_id));
|
||||
}
|
||||
// build tablet schema in request level
|
||||
_build_current_tablet_schema(_req.index_id, _req.ptable_schema_param,
|
||||
*_tablet->tablet_schema());
|
||||
_build_current_tablet_schema(_req.index_id, _req.table_schema_param, *_tablet->tablet_schema());
|
||||
RowsetWriterContext context;
|
||||
context.txn_id = _req.txn_id;
|
||||
context.load_id = _req.load_id;
|
||||
@ -424,22 +424,22 @@ int64_t DeltaWriter::partition_id() const {
|
||||
}
|
||||
|
||||
void DeltaWriter::_build_current_tablet_schema(int64_t index_id,
|
||||
const POlapTableSchemaParam& ptable_schema_param,
|
||||
const OlapTableSchemaParam* table_schema_param,
|
||||
const TabletSchema& ori_tablet_schema) {
|
||||
_tablet_schema->copy_from(ori_tablet_schema);
|
||||
|
||||
// find the right index id
|
||||
int i = 0;
|
||||
for (; i < ptable_schema_param.indexes_size(); i++) {
|
||||
if (ptable_schema_param.indexes(i).id() == index_id) break;
|
||||
auto indexes = table_schema_param->indexes();
|
||||
for (; i < indexes.size(); i++) {
|
||||
if (indexes[i]->index_id == index_id) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (ptable_schema_param.indexes_size() > 0 &&
|
||||
ptable_schema_param.indexes(i).columns_desc_size() != 0 &&
|
||||
ptable_schema_param.indexes(i).columns_desc(0).unique_id() >= 0) {
|
||||
_tablet_schema->build_current_tablet_schema(index_id, ptable_schema_param.version(),
|
||||
ptable_schema_param.indexes(i),
|
||||
ori_tablet_schema);
|
||||
if (indexes.size() > 0 && indexes[i]->columns.size() != 0 &&
|
||||
indexes[i]->columns[0]->unique_id() >= 0) {
|
||||
_tablet_schema->build_current_tablet_schema(index_id, table_schema_param->version(),
|
||||
indexes[i], ori_tablet_schema);
|
||||
}
|
||||
if (_tablet_schema->schema_version() > ori_tablet_schema.schema_version()) {
|
||||
_tablet->update_max_version_schema(_tablet_schema);
|
||||
|
||||
@ -45,7 +45,7 @@ struct WriteRequest {
|
||||
// slots are in order of tablet's schema
|
||||
const std::vector<SlotDescriptor*>* slots;
|
||||
bool is_high_priority = false;
|
||||
POlapTableSchemaParam ptable_schema_param = {};
|
||||
OlapTableSchemaParam* table_schema_param;
|
||||
int64_t index_id = 0;
|
||||
};
|
||||
|
||||
@ -118,7 +118,7 @@ private:
|
||||
void _reset_mem_table();
|
||||
|
||||
void _build_current_tablet_schema(int64_t index_id,
|
||||
const POlapTableSchemaParam& table_schema_param,
|
||||
const OlapTableSchemaParam* table_schema_param,
|
||||
const TabletSchema& ori_tablet_schema);
|
||||
|
||||
void _request_slave_tablet_pull_rowset(PNodeInfo node_info);
|
||||
|
||||
@ -19,6 +19,7 @@
|
||||
|
||||
#include <gen_cpp/olap_file.pb.h>
|
||||
|
||||
#include "exec/tablet_info.h"
|
||||
#include "gen_cpp/descriptors.pb.h"
|
||||
#include "olap/utils.h"
|
||||
#include "tablet_meta.h"
|
||||
@ -611,7 +612,7 @@ std::string TabletSchema::to_key() const {
|
||||
}
|
||||
|
||||
void TabletSchema::build_current_tablet_schema(int64_t index_id, int32_t version,
|
||||
const POlapTableIndexSchema& index,
|
||||
const OlapTableIndexSchema* index,
|
||||
const TabletSchema& ori_tablet_schema) {
|
||||
// copy from ori_tablet_schema
|
||||
_keys_type = ori_tablet_schema.keys_type();
|
||||
@ -637,26 +638,24 @@ void TabletSchema::build_current_tablet_schema(int64_t index_id, int32_t version
|
||||
_field_name_to_index.clear();
|
||||
_field_id_to_index.clear();
|
||||
|
||||
for (auto& pcolumn : index.columns_desc()) {
|
||||
TabletColumn column;
|
||||
column.init_from_pb(pcolumn);
|
||||
if (column.is_key()) {
|
||||
for (auto& column : index->columns) {
|
||||
if (column->is_key()) {
|
||||
_num_key_columns++;
|
||||
}
|
||||
if (column.is_nullable()) {
|
||||
if (column->is_nullable()) {
|
||||
_num_null_columns++;
|
||||
}
|
||||
if (column.is_bf_column()) {
|
||||
if (column->is_bf_column()) {
|
||||
has_bf_columns = true;
|
||||
}
|
||||
if (UNLIKELY(column.name() == DELETE_SIGN)) {
|
||||
if (UNLIKELY(column->name() == DELETE_SIGN)) {
|
||||
_delete_sign_idx = _num_columns;
|
||||
} else if (UNLIKELY(column.name() == SEQUENCE_COL)) {
|
||||
} else if (UNLIKELY(column->name() == SEQUENCE_COL)) {
|
||||
_sequence_col_idx = _num_columns;
|
||||
}
|
||||
_field_name_to_index[column.name()] = _num_columns;
|
||||
_field_id_to_index[column.unique_id()] = _num_columns;
|
||||
_cols.emplace_back(std::move(column));
|
||||
_field_name_to_index[column->name()] = _num_columns;
|
||||
_field_id_to_index[column->unique_id()] = _num_columns;
|
||||
_cols.emplace_back(*column);
|
||||
_num_columns++;
|
||||
}
|
||||
|
||||
|
||||
@ -31,7 +31,7 @@ namespace vectorized {
|
||||
class Block;
|
||||
}
|
||||
|
||||
class POlapTableIndexSchema;
|
||||
struct OlapTableIndexSchema;
|
||||
|
||||
class TabletColumn {
|
||||
public:
|
||||
@ -225,7 +225,7 @@ public:
|
||||
vectorized::Block create_block(bool ignore_dropped_col = true) const;
|
||||
|
||||
void build_current_tablet_schema(int64_t index_id, int32_t version,
|
||||
const POlapTableIndexSchema& index,
|
||||
const OlapTableIndexSchema* index,
|
||||
const TabletSchema& out_tablet_schema);
|
||||
|
||||
// Merge columns that not exit in current schema, these column is dropped in current schema
|
||||
|
||||
@ -242,7 +242,7 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request
|
||||
wrequest.tuple_desc = _tuple_desc;
|
||||
wrequest.slots = index_slots;
|
||||
wrequest.is_high_priority = _is_high_priority;
|
||||
wrequest.ptable_schema_param = request.schema();
|
||||
wrequest.table_schema_param = _schema;
|
||||
|
||||
DeltaWriter* writer = nullptr;
|
||||
auto st = DeltaWriter::open(&wrequest, &writer, _load_id);
|
||||
|
||||
@ -22,6 +22,7 @@
|
||||
|
||||
#include <string>
|
||||
|
||||
#include "exec/tablet_info.h"
|
||||
#include "gen_cpp/Descriptors_types.h"
|
||||
#include "gen_cpp/PaloInternalService_types.h"
|
||||
#include "gen_cpp/Types_types.h"
|
||||
@ -382,12 +383,13 @@ TEST_F(TestDeltaWriter, open) {
|
||||
DescriptorTbl* desc_tbl = nullptr;
|
||||
DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
|
||||
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
|
||||
OlapTableSchemaParam param;
|
||||
|
||||
PUniqueId load_id;
|
||||
load_id.set_hi(0);
|
||||
load_id.set_lo(0);
|
||||
WriteRequest write_req = {10003, 270068375, WriteType::LOAD, 20001, 30001,
|
||||
load_id, tuple_desc, &tuple_desc->slots(), true};
|
||||
load_id, tuple_desc, &tuple_desc->slots(), true, ¶m};
|
||||
DeltaWriter* delta_writer = nullptr;
|
||||
|
||||
// test vec delta writer
|
||||
@ -415,12 +417,13 @@ TEST_F(TestDeltaWriter, vec_write) {
|
||||
DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
|
||||
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
|
||||
// const std::vector<SlotDescriptor*>& slots = tuple_desc->slots();
|
||||
OlapTableSchemaParam param;
|
||||
|
||||
PUniqueId load_id;
|
||||
load_id.set_hi(0);
|
||||
load_id.set_lo(0);
|
||||
WriteRequest write_req = {10004, 270068376, WriteType::LOAD, 20002,
|
||||
30002, load_id, tuple_desc, &(tuple_desc->slots())};
|
||||
WriteRequest write_req = {10004, 270068376, WriteType::LOAD, 20002, 30002,
|
||||
load_id, tuple_desc, &(tuple_desc->slots()), false, ¶m};
|
||||
DeltaWriter* delta_writer = nullptr;
|
||||
DeltaWriter::open(&write_req, &delta_writer, TUniqueId());
|
||||
ASSERT_NE(delta_writer, nullptr);
|
||||
@ -561,12 +564,13 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
|
||||
DescriptorTbl* desc_tbl = nullptr;
|
||||
DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
|
||||
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
|
||||
OlapTableSchemaParam param;
|
||||
|
||||
PUniqueId load_id;
|
||||
load_id.set_hi(0);
|
||||
load_id.set_lo(0);
|
||||
WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003,
|
||||
30003, load_id, tuple_desc, &(tuple_desc->slots())};
|
||||
WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003, 30003,
|
||||
load_id, tuple_desc, &(tuple_desc->slots()), false, ¶m};
|
||||
DeltaWriter* delta_writer = nullptr;
|
||||
DeltaWriter::open(&write_req, &delta_writer, TUniqueId());
|
||||
ASSERT_NE(delta_writer, nullptr);
|
||||
|
||||
@ -22,6 +22,7 @@
|
||||
|
||||
#include <string>
|
||||
|
||||
#include "exec/tablet_info.h"
|
||||
#include "gen_cpp/Descriptors_types.h"
|
||||
#include "gen_cpp/PaloInternalService_types.h"
|
||||
#include "gen_cpp/Types_types.h"
|
||||
@ -163,12 +164,13 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) {
|
||||
DescriptorTbl* desc_tbl = nullptr;
|
||||
DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
|
||||
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
|
||||
OlapTableSchemaParam param;
|
||||
|
||||
PUniqueId load_id;
|
||||
load_id.set_hi(0);
|
||||
load_id.set_lo(0);
|
||||
WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003,
|
||||
30003, load_id, tuple_desc, &(tuple_desc->slots())};
|
||||
WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003, 30003,
|
||||
load_id, tuple_desc, &(tuple_desc->slots()), false, ¶m};
|
||||
DeltaWriter* delta_writer = nullptr;
|
||||
DeltaWriter::open(&write_req, &delta_writer);
|
||||
EXPECT_NE(delta_writer, nullptr);
|
||||
|
||||
@ -21,6 +21,7 @@
|
||||
|
||||
#include "common/config.h"
|
||||
#include "common/status.h"
|
||||
#include "exec/tablet_info.h"
|
||||
#include "gen_cpp/internal_service.pb.h"
|
||||
#include "io/fs/file_system_map.h"
|
||||
#include "io/fs/s3_file_system.h"
|
||||
@ -150,12 +151,13 @@ TEST_F(RemoteRowsetGcTest, normal) {
|
||||
DescriptorTbl* desc_tbl = nullptr;
|
||||
DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
|
||||
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
|
||||
OlapTableSchemaParam param;
|
||||
|
||||
PUniqueId load_id;
|
||||
load_id.set_hi(0);
|
||||
load_id.set_lo(0);
|
||||
WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003,
|
||||
30003, load_id, tuple_desc, &(tuple_desc->slots())};
|
||||
WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003, 30003,
|
||||
load_id, tuple_desc, &(tuple_desc->slots()), false, ¶m};
|
||||
DeltaWriter* delta_writer = nullptr;
|
||||
DeltaWriter::open(&write_req, &delta_writer);
|
||||
ASSERT_NE(delta_writer, nullptr);
|
||||
|
||||
@ -21,6 +21,7 @@
|
||||
|
||||
#include "common/config.h"
|
||||
#include "common/status.h"
|
||||
#include "exec/tablet_info.h"
|
||||
#include "gen_cpp/internal_service.pb.h"
|
||||
#include "io/fs/file_system_map.h"
|
||||
#include "io/fs/s3_file_system.h"
|
||||
@ -149,12 +150,13 @@ TEST_F(TabletCooldownTest, normal) {
|
||||
DescriptorTbl* desc_tbl = nullptr;
|
||||
DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
|
||||
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
|
||||
OlapTableSchemaParam param;
|
||||
|
||||
PUniqueId load_id;
|
||||
load_id.set_hi(0);
|
||||
load_id.set_lo(0);
|
||||
WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003,
|
||||
30003, load_id, tuple_desc, &(tuple_desc->slots())};
|
||||
WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003, 30003,
|
||||
load_id, tuple_desc, &(tuple_desc->slots()), false, ¶m};
|
||||
DeltaWriter* delta_writer = nullptr;
|
||||
DeltaWriter::open(&write_req, &delta_writer);
|
||||
ASSERT_NE(delta_writer, nullptr);
|
||||
|
||||
Reference in New Issue
Block a user