Files
doris/be/src/olap/schema_change.cpp
Xinyi Zou 0b945fe361 [enhancement](memtracker) Refactor mem tracker hierarchy (#13585)
mem tracker can be logically divided into 4 layers: 1)process 2)type 3)query/load/compaction task etc. 4)exec node etc.

type includes

enum Type {
        GLOBAL = 0,        // Life cycle is the same as the process, e.g. Cache and default Orphan
        QUERY = 1,         // Count the memory consumption of all Query tasks.
        LOAD = 2,          // Count the memory consumption of all Load tasks.
        COMPACTION = 3,    // Count the memory consumption of all Base and Cumulative tasks.
        SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange tasks.
        CLONE = 5, // Count the memory consumption of all EngineCloneTask. Note: Memory that does not contain make/release snapshots.
        BATCHLOAD = 6,  // Count the memory consumption of all EngineBatchLoadTask.
        CONSISTENCY = 7 // Count the memory consumption of all EngineChecksumTask.
    }
Object pointers are no longer saved between each layer, and the values of process and each type are periodically aggregated.

other fix:

In [fix](memtracker) Fix transmit_tracker null pointer because phamp is not thread safe #13528, I tried to separate the memory that was manually abandoned in the query from the orphan mem tracker. But in the actual test, the accuracy of this part of the memory cannot be guaranteed, so put it back to the orphan mem tracker again.
2022-11-08 09:52:33 +08:00

2444 lines
109 KiB
C++

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/schema_change.h"
#include "common/status.h"
#include "gutil/integral_types.h"
#include "olap/merger.h"
#include "olap/olap_common.h"
#include "olap/row.h"
#include "olap/row_block.h"
#include "olap/row_cursor.h"
#include "olap/rowset/segment_v2/column_reader.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_schema.h"
#include "olap/types.h"
#include "olap/wrapper_field.h"
#include "runtime/memory/mem_tracker.h"
#include "util/defer_op.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/aggregate_functions/aggregate_function_reader.h"
#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
#include "vec/columns/column.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
using std::nothrow;
namespace doris {
constexpr int ALTER_TABLE_BATCH_SIZE = 4096;
// Sorts the rows of a RowBlock by key order during schema change.
// A scratch RowBlock obtained from the allocator (_swap_row_block) is used as
// the output buffer of sort(); it is released in the destructor.
class RowBlockSorter {
public:
    explicit RowBlockSorter(RowBlockAllocator* allocator);
    virtual ~RowBlockSorter();

    // Capacity of the scratch block; 0 until the scratch block is allocated
    // (presumably by the first sort() call — the sort body is defined below).
    size_t num_rows() { return _swap_row_block != nullptr ? _swap_row_block->capacity() : 0; }

    // Sorts *row_block; returns false on failure.
    bool sort(RowBlock** row_block);

private:
    // Comparator over row cursors: ascending key order.
    static bool _row_cursor_comparator(const std::unique_ptr<RowCursor>& a,
                                       const std::unique_ptr<RowCursor>& b) {
        return compare_row(*a, *b) < 0;
    }

    RowBlockAllocator* _row_block_allocator; // non-owning
    RowBlock* _swap_row_block;               // scratch buffer, owned via allocator
};
// K-way merges several sorted RowBlocks through a heap and writes the result
// via a RowsetWriter.
class RowBlockMerger {
public:
    explicit RowBlockMerger(TabletSharedPtr tablet);
    virtual ~RowBlockMerger();

    // Merges row_block_arr into rowset_writer; *merged_rows receives the count
    // of rows merged away. Returns false on failure.
    bool merge(const std::vector<RowBlock*>& row_block_arr, RowsetWriter* rowset_writer,
               uint64_t* merged_rows);

private:
    // One heap entry per input block: the cursor points at the block's current row.
    struct MergeElement {
        // Inverted comparison (>) so that std::priority_queue, a max-heap,
        // pops the smallest row first.
        bool operator<(const MergeElement& other) const {
            return compare_row(*row_cursor, *other.row_cursor) > 0;
        }

        const RowBlock* row_block;
        RowCursor* row_cursor;
        uint32_t row_block_index; // next row to load from row_block
    };

    bool _make_heap(const std::vector<RowBlock*>& row_block_arr);
    void _pop_heap();

    TabletSharedPtr _tablet;
    std::priority_queue<MergeElement> _heap;
};
// Merges multiple vectorized blocks into a rowset during schema change.
// All input rows are referenced, stable-sorted by key (stability preserves the
// incremental block-version order for equal keys), then aggregated or
// deduplicated according to the tablet's keys type, and written out in batches
// of ALTER_TABLE_BATCH_SIZE rows.
class MultiBlockMerger {
public:
    MultiBlockMerger(TabletSharedPtr tablet) : _tablet(tablet), _cmp(tablet) {}

    // Merges `blocks` into `rowset_writer`.
    // *merged_rows is incremented by the number of input rows eliminated by
    // aggregation (AGG_KEYS) or deduplication (UNIQUE_KEYS).
    Status merge(const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
                 RowsetWriter* rowset_writer, uint64_t* merged_rows) {
        int rows = 0;
        for (auto& block : blocks) {
            rows += block->rows();
        }
        if (!rows) {
            return Status::OK();
        }

        // Build a lightweight reference to every row of every block.
        // NOTE(review): RowRef::position is uint16_t, so each input block is
        // assumed to hold at most 65535 rows — confirm upstream block sizing.
        std::vector<RowRef> row_refs;
        row_refs.reserve(rows);
        for (auto& block : blocks) {
            for (uint16_t i = 0; i < block->rows(); i++) {
                row_refs.emplace_back(block.get(), i);
            }
        }
        // TODO: try to use pdqsort to replace std::sort
        // The block version is incremental.
        std::stable_sort(row_refs.begin(), row_refs.end(), _cmp);

        auto finalized_block = _tablet->tablet_schema()->create_block();
        int columns = finalized_block.columns();
        *merged_rows += rows;

        if (_tablet->keys_type() == KeysType::AGG_KEYS) {
            auto tablet_schema = _tablet->tablet_schema();
            int key_number = _tablet->num_key_columns();

            std::vector<vectorized::AggregateFunctionPtr> agg_functions;
            std::vector<vectorized::AggregateDataPtr> agg_places;

            // One aggregate function (plus its state buffer) per value column.
            for (int i = key_number; i < columns; i++) {
                vectorized::AggregateFunctionPtr function =
                        tablet_schema->column(i).get_aggregate_function(
                                {finalized_block.get_data_type(i)}, vectorized::AGG_LOAD_SUFFIX);
                agg_functions.push_back(function);
                // create aggregate data
                vectorized::AggregateDataPtr place = new char[function->size_of_data()];
                function->create(place);
                agg_places.push_back(place);
            }
            // Release the aggregate states on every exit path (the original
            // code leaked them on early return).
            Defer release_agg_states {[&]() {
                for (size_t i = 0; i < agg_places.size(); i++) {
                    agg_functions[i]->destroy(agg_places[i]);
                    delete[] agg_places[i];
                }
            }};

            for (int i = 0; i < rows; i++) {
                auto row_ref = row_refs[i];
                // Accumulate the value columns of the current row into the
                // running aggregate states.
                for (int j = key_number; j < columns; j++) {
                    auto column_ptr = row_ref.get_column(j).get();
                    agg_functions[j - key_number]->add(
                            agg_places[j - key_number],
                            const_cast<const vectorized::IColumn**>(&column_ptr), row_ref.position,
                            nullptr);
                }
                // Key changes at the next row (or input is exhausted): emit the
                // aggregated row and reset the aggregate states.
                if (i == rows - 1 || _cmp.compare(row_refs[i], row_refs[i + 1])) {
                    for (int j = 0; j < key_number; j++) {
                        finalized_block.get_by_position(j).column->assume_mutable()->insert_from(
                                *row_ref.get_column(j), row_ref.position);
                    }
                    for (int j = key_number; j < columns; j++) {
                        agg_functions[j - key_number]->insert_result_into(
                                agg_places[j - key_number],
                                finalized_block.get_by_position(j).column->assume_mutable_ref());
                        agg_functions[j - key_number]->create(agg_places[j - key_number]);
                    }
                    // Flush a full batch (or the final partial batch).
                    if (i == rows - 1 || finalized_block.rows() == ALTER_TABLE_BATCH_SIZE) {
                        *merged_rows -= finalized_block.rows();
                        // Propagate write errors instead of silently dropping them.
                        RETURN_IF_ERROR(rowset_writer->add_block(&finalized_block));
                        finalized_block.clear_column_data();
                    }
                }
            }
        } else {
            std::vector<RowRef> pushed_row_refs;
            if (_tablet->keys_type() == KeysType::DUP_KEYS) {
                // Duplicate keys: keep every row.
                std::swap(pushed_row_refs, row_refs);
            } else if (_tablet->keys_type() == KeysType::UNIQUE_KEYS) {
                // Unique keys: keep only the last row of each equal-key run
                // (equal keys are adjacent after the stable sort).
                for (int i = 0; i < rows; i++) {
                    if (i == rows - 1 || _cmp.compare(row_refs[i], row_refs[i + 1])) {
                        pushed_row_refs.push_back(row_refs[i]);
                    }
                }
            }

            // update real inserted row number
            rows = pushed_row_refs.size();
            *merged_rows -= rows;

            for (int i = 0; i < rows; i += ALTER_TABLE_BATCH_SIZE) {
                int limit = std::min(ALTER_TABLE_BATCH_SIZE, rows - i);
                for (int idx = 0; idx < columns; idx++) {
                    auto column = finalized_block.get_by_position(idx).column->assume_mutable();
                    for (int j = 0; j < limit; j++) {
                        auto row_ref = pushed_row_refs[i + j];
                        column->insert_from(*row_ref.get_column(idx), row_ref.position);
                    }
                }
                // Propagate write errors instead of silently dropping them.
                RETURN_IF_ERROR(rowset_writer->add_block(&finalized_block));
                finalized_block.clear_column_data();
            }
        }

        RETURN_IF_ERROR(rowset_writer->flush());
        return Status::OK();
    }

private:
    // Non-owning reference to one row inside one input block.
    struct RowRef {
        RowRef(vectorized::Block* block_, uint16_t position_)
                : block(block_), position(position_) {}
        vectorized::ColumnPtr get_column(int index) const {
            return block->get_by_position(index).column;
        }
        const vectorized::Block* block;
        uint16_t position;
    };

    // Compares two row references on the key columns only.
    struct RowRefComparator {
        RowRefComparator(TabletSharedPtr tablet) : _num_columns(tablet->num_key_columns()) {}

        int compare(const RowRef& lhs, const RowRef& rhs) const {
            return lhs.block->compare_at(lhs.position, rhs.position, _num_columns, *rhs.block, -1);
        }

        bool operator()(const RowRef& lhs, const RowRef& rhs) const {
            return compare(lhs, rhs) < 0;
        }

        const size_t _num_columns;
    };

    TabletSharedPtr _tablet;
    RowRefComparator _cmp;
};
// Creates a changer with one (initially empty) ColumnMapping slot per column
// of `tablet_schema` (presumably the new/target schema — confirm at call
// sites); callers fill the slots via get_mutable_column_mapping().
RowBlockChanger::RowBlockChanger(TabletSchemaSPtr tablet_schema,
                                 const DeleteHandler* delete_handler, DescriptorTbl desc_tbl)
        : _desc_tbl(desc_tbl) {
    _schema_mapping.resize(tablet_schema->num_columns());
    _delete_handler = delete_handler; // non-owning
}
// Frees the default-value fields owned by the column mappings.
RowBlockChanger::~RowBlockChanger() {
    for (auto& mapping : _schema_mapping) {
        SAFE_DELETE(mapping.default_value);
    }
    _schema_mapping.clear();
}
// Returns a writable pointer to the mapping slot for `column_index`, or
// nullptr when the index is out of range.
ColumnMapping* RowBlockChanger::get_mutable_column_mapping(size_t column_index) {
    return column_index < _schema_mapping.size() ? &_schema_mapping[column_index] : nullptr;
}
// Converts one fixed-width numeric column cell-by-cell from FromType to
// ToType via implicit conversion. Byte 0 of each cell is the null flag and is
// copied verbatim; the value starts at byte 1. Expects `ref_block`,
// `mutable_block`, `ref_column` and `i` in the expansion scope; ends with
// `break` so it can be used directly as a switch case body.
#define TYPE_REINTERPRET_CAST(FromType, ToType) \
    { \
        size_t row_num = ref_block->row_block_info().row_num; \
        for (size_t row = 0, mutable_row = 0; row < row_num; ++row) { \
            char* ref_ptr = ref_block->field_ptr(row, ref_column); \
            char* new_ptr = mutable_block->field_ptr(mutable_row++, i); \
            *new_ptr = *ref_ptr; \
            *(ToType*)(new_ptr + 1) = *(FromType*)(ref_ptr + 1); \
        } \
        break; \
    }
// Same as TYPE_REINTERPRET_CAST but writes through memcpy, because int128_t
// (LARGEINT) cells are not guaranteed to be suitably aligned for a direct
// typed store.
#define LARGEINT_REINTERPRET_CAST(FromType, ToType) \
    { \
        size_t row_num = ref_block->row_block_info().row_num; \
        for (size_t row = 0, mutable_row = 0; row < row_num; ++row) { \
            char* ref_ptr = ref_block->field_ptr(row, ref_column); \
            char* new_ptr = mutable_block->field_ptr(mutable_row++, i); \
            *new_ptr = *ref_ptr; \
            ToType new_value = *(FromType*)(ref_ptr + 1); \
            memcpy(new_ptr + 1, &new_value, sizeof(ToType)); \
        } \
        break; \
    }
// Dispatches on the destination type (`newtype` in the expansion scope) and
// emits the appropriate cell-wise cast from `from_type`. Unsupported target
// types log a warning and return OLAP_ERR_SCHEMA_CHANGE_INFO_INVALID.
#define CONVERT_FROM_TYPE(from_type) \
    { \
        switch (newtype) { \
        case OLAP_FIELD_TYPE_TINYINT: \
            TYPE_REINTERPRET_CAST(from_type, int8_t); \
        case OLAP_FIELD_TYPE_UNSIGNED_TINYINT: \
            TYPE_REINTERPRET_CAST(from_type, uint8_t); \
        case OLAP_FIELD_TYPE_SMALLINT: \
            TYPE_REINTERPRET_CAST(from_type, int16_t); \
        case OLAP_FIELD_TYPE_UNSIGNED_SMALLINT: \
            TYPE_REINTERPRET_CAST(from_type, uint16_t); \
        case OLAP_FIELD_TYPE_INT: \
            TYPE_REINTERPRET_CAST(from_type, int32_t); \
        case OLAP_FIELD_TYPE_UNSIGNED_INT: \
            TYPE_REINTERPRET_CAST(from_type, uint32_t); \
        case OLAP_FIELD_TYPE_BIGINT: \
            TYPE_REINTERPRET_CAST(from_type, int64_t); \
        case OLAP_FIELD_TYPE_UNSIGNED_BIGINT: \
            TYPE_REINTERPRET_CAST(from_type, uint64_t); \
        case OLAP_FIELD_TYPE_LARGEINT: \
            LARGEINT_REINTERPRET_CAST(from_type, int128_t); \
        case OLAP_FIELD_TYPE_FLOAT: \
            TYPE_REINTERPRET_CAST(from_type, float); \
        case OLAP_FIELD_TYPE_DOUBLE: \
            TYPE_REINTERPRET_CAST(from_type, double); \
        default: \
            LOG(WARNING) << "the column type which was altered to was unsupported." \
                         << " origin_type=" \
                         << ref_block->tablet_schema()->column(ref_column).type() \
                         << ", alter_type=" << mutable_block->tablet_schema()->column(i).type(); \
            return Status::OLAPInternalError(OLAP_ERR_SCHEMA_CHANGE_INFO_INVALID); \
        } \
        break; \
    }
// Switch-case helper: fills every row of column `i` with the mapping's
// default value of the given byte `length`. Expects `ref_block`, `buf` and
// `_schema_mapping` in the expansion scope.
#define ASSIGN_DEFAULT_VALUE(length) \
    case length: { \
        for (size_t row = 0; row < ref_block.row_block_info().row_num; ++row) { \
            memcpy(buf, _schema_mapping[i].default_value->ptr(), length); \
            buf += length; \
        } \
        break; \
    }
struct ConvertTypeMapHash {
size_t operator()(const std::pair<FieldType, FieldType>& pair) const {
return (pair.first + 31) ^ pair.second;
}
};
// Singleton registry of the (from_type -> to_type) conversions that schema
// change supports via Field::convert_from; populated once in the constructor.
class ConvertTypeResolver {
    DECLARE_SINGLETON(ConvertTypeResolver);

public:
    // True if a direct conversion from `from_type` to `to_type` is registered.
    bool get_convert_type_info(const FieldType from_type, const FieldType to_type) const {
        return _convert_type_set.find(std::make_pair(from_type, to_type)) !=
               _convert_type_set.end();
    }

    // Registers a supported conversion pair (compile-time type arguments).
    template <FieldType from_type, FieldType to_type>
    void add_convert_type_mapping() {
        _convert_type_set.emplace(std::make_pair(from_type, to_type));
    }

private:
    using convert_type_pair = std::pair<FieldType, FieldType>;
    std::unordered_set<convert_type_pair, ConvertTypeMapHash> _convert_type_set;

    DISALLOW_COPY_AND_ASSIGN(ConvertTypeResolver);
};
// Registers every supported type conversion. The set here must stay in sync
// with the FE-side checks (see note below).
ConvertTypeResolver::ConvertTypeResolver() {
    // from char type
    add_convert_type_mapping<OLAP_FIELD_TYPE_CHAR, OLAP_FIELD_TYPE_TINYINT>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_CHAR, OLAP_FIELD_TYPE_SMALLINT>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_CHAR, OLAP_FIELD_TYPE_INT>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_CHAR, OLAP_FIELD_TYPE_BIGINT>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_CHAR, OLAP_FIELD_TYPE_LARGEINT>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_CHAR, OLAP_FIELD_TYPE_FLOAT>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_CHAR, OLAP_FIELD_TYPE_DOUBLE>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_CHAR, OLAP_FIELD_TYPE_DATE>();
    // supported type convert should annotate in doc:
    // http://doris.apache.org/master/zh-CN/sql-reference/sql-statements/Data%20Definition/ALTER%20TABLE.html#description
    // If a type conversion is supported here, you should also check
    // fe/src/main/java/org/apache/doris/catalog/ColumnType.java to support it there as well.
    // from varchar type
    add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_TINYINT>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_SMALLINT>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_INT>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_BIGINT>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_LARGEINT>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_FLOAT>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_DOUBLE>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_DATE>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_STRING>();
    // to varchar type
    add_convert_type_mapping<OLAP_FIELD_TYPE_TINYINT, OLAP_FIELD_TYPE_VARCHAR>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_SMALLINT, OLAP_FIELD_TYPE_VARCHAR>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_INT, OLAP_FIELD_TYPE_VARCHAR>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_BIGINT, OLAP_FIELD_TYPE_VARCHAR>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_LARGEINT, OLAP_FIELD_TYPE_VARCHAR>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_FLOAT, OLAP_FIELD_TYPE_VARCHAR>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_DOUBLE, OLAP_FIELD_TYPE_VARCHAR>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_DECIMAL, OLAP_FIELD_TYPE_VARCHAR>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_CHAR, OLAP_FIELD_TYPE_VARCHAR>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_STRING, OLAP_FIELD_TYPE_VARCHAR>();
    // from string
    add_convert_type_mapping<OLAP_FIELD_TYPE_STRING, OLAP_FIELD_TYPE_TINYINT>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_STRING, OLAP_FIELD_TYPE_SMALLINT>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_STRING, OLAP_FIELD_TYPE_INT>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_STRING, OLAP_FIELD_TYPE_BIGINT>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_STRING, OLAP_FIELD_TYPE_LARGEINT>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_STRING, OLAP_FIELD_TYPE_FLOAT>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_STRING, OLAP_FIELD_TYPE_DOUBLE>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_STRING, OLAP_FIELD_TYPE_DATE>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_STRING, OLAP_FIELD_TYPE_VARCHAR>();
    // to string
    add_convert_type_mapping<OLAP_FIELD_TYPE_TINYINT, OLAP_FIELD_TYPE_STRING>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_SMALLINT, OLAP_FIELD_TYPE_STRING>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_INT, OLAP_FIELD_TYPE_STRING>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_BIGINT, OLAP_FIELD_TYPE_STRING>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_LARGEINT, OLAP_FIELD_TYPE_STRING>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_FLOAT, OLAP_FIELD_TYPE_STRING>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_DOUBLE, OLAP_FIELD_TYPE_STRING>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_DECIMAL, OLAP_FIELD_TYPE_STRING>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_CHAR, OLAP_FIELD_TYPE_STRING>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_STRING, OLAP_FIELD_TYPE_STRING>();
    // date/time and numeric widening conversions
    add_convert_type_mapping<OLAP_FIELD_TYPE_DATE, OLAP_FIELD_TYPE_DATETIME>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_DATETIME, OLAP_FIELD_TYPE_DATE>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_FLOAT, OLAP_FIELD_TYPE_DOUBLE>();
    add_convert_type_mapping<OLAP_FIELD_TYPE_INT, OLAP_FIELD_TYPE_DATE>();
}

ConvertTypeResolver::~ConvertTypeResolver() = default;
// Materialized-view transform for to_bitmap()/to_bitmap_with_check(): reads
// one integer cell from `read_helper` at `ref_field_idx`, wraps it in a
// single-element BitmapValue and writes the serialized bitmap to
// `write_helper` at `field_idx` (a NULL input yields an empty bitmap).
// Returns false for negative inputs or unsupported source types.
bool to_bitmap(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
               int field_idx, int ref_field_idx, MemPool* mem_pool) {
    write_helper->set_not_null(field_idx);
    BitmapValue bitmap;
    if (!read_helper->is_null(ref_field_idx)) {
        uint64_t origin_value;
        char* src = read_helper->cell_ptr(ref_field_idx);
        // Rejects negative values: bitmap only supports [0, 18446744073709551615].
        // Taking the value as int64_t also makes int8_t/int16_t inputs log as
        // numbers instead of raw characters (operator<< treats int8_t as char).
        auto reject_negative = [](int64_t value) -> bool {
            if (value < 0) {
                LOG(WARNING)
                        << "The input: " << value
                        << " is not valid, to_bitmap only support bigint value from 0 to "
                           "18446744073709551615 currently, cannot create MV with to_bitmap on "
                           "column with negative values.";
                return true;
            }
            return false;
        };
        switch (ref_column.type()) {
        case OLAP_FIELD_TYPE_TINYINT:
            if (reject_negative(*(int8_t*)src)) {
                return false;
            }
            origin_value = *(int8_t*)src;
            break;
        case OLAP_FIELD_TYPE_UNSIGNED_TINYINT:
            origin_value = *(uint8_t*)src;
            break;
        case OLAP_FIELD_TYPE_SMALLINT:
            if (reject_negative(*(int16_t*)src)) {
                return false;
            }
            origin_value = *(int16_t*)src;
            break;
        case OLAP_FIELD_TYPE_UNSIGNED_SMALLINT:
            origin_value = *(uint16_t*)src;
            break;
        case OLAP_FIELD_TYPE_INT:
            if (reject_negative(*(int32_t*)src)) {
                return false;
            }
            origin_value = *(int32_t*)src;
            break;
        case OLAP_FIELD_TYPE_UNSIGNED_INT:
            origin_value = *(uint32_t*)src;
            break;
        case OLAP_FIELD_TYPE_BIGINT:
            if (reject_negative(*(int64_t*)src)) {
                return false;
            }
            origin_value = *(int64_t*)src;
            break;
        case OLAP_FIELD_TYPE_UNSIGNED_BIGINT:
            origin_value = *(uint64_t*)src;
            break;
        default:
            LOG(WARNING) << "the column type which was altered from was unsupported."
                         << " from_type=" << ref_column.type();
            return false;
        }
        bitmap.add(origin_value);
    }
    // Serialize the bitmap into pool-owned memory and store it as a Slice.
    char* buf = reinterpret_cast<char*>(mem_pool->allocate(bitmap.getSizeInBytes()));
    Slice dst(buf, bitmap.getSizeInBytes());
    bitmap.write(dst.data);
    write_helper->set_field_content(field_idx, reinterpret_cast<char*>(&dst), mem_pool);
    return true;
}
// Materialized-view transform for hll_hash(): murmur-hashes the source cell,
// inserts the hash into a HyperLogLog, and writes the serialized HLL to
// `write_helper` at `field_idx`. A NULL input yields an empty HLL.
// Returns false for unsupported source types.
bool hll_hash(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
              int field_idx, int ref_field_idx, MemPool* mem_pool) {
    write_helper->set_not_null(field_idx);
    HyperLogLog hll;
    if (!read_helper->is_null(ref_field_idx)) {
        uint64_t hash_value;
        switch (ref_column.type()) {
        case OLAP_FIELD_TYPE_CHAR: {
            // CHAR is fixed-length and zero-padded: shrink the slice so the
            // trailing '\0' padding does not influence the hash.
            int p = ref_column.length() - 1;
            Slice* slice = reinterpret_cast<Slice*>(read_helper->cell_ptr(ref_field_idx));
            char* buf = slice->data;
            while (p >= 0 && buf[p] == '\0') {
                p--;
            }
            slice->size = p + 1;
        }
        // Intentional fall-through: hash the trimmed CHAR slice exactly like
        // VARCHAR/STRING.
        case OLAP_FIELD_TYPE_VARCHAR:
        case OLAP_FIELD_TYPE_STRING: {
            Slice slice = *reinterpret_cast<Slice*>(read_helper->cell_ptr(ref_field_idx));
            hash_value = HashUtil::murmur_hash64A(slice.data, slice.size, HashUtil::MURMUR_SEED);
            break;
        }
        case OLAP_FIELD_TYPE_BOOL:
        case OLAP_FIELD_TYPE_TINYINT:
        case OLAP_FIELD_TYPE_UNSIGNED_TINYINT:
        case OLAP_FIELD_TYPE_SMALLINT:
        case OLAP_FIELD_TYPE_UNSIGNED_SMALLINT:
        case OLAP_FIELD_TYPE_INT:
        case OLAP_FIELD_TYPE_UNSIGNED_INT:
        case OLAP_FIELD_TYPE_BIGINT:
        case OLAP_FIELD_TYPE_UNSIGNED_BIGINT:
        case OLAP_FIELD_TYPE_LARGEINT:
        case OLAP_FIELD_TYPE_FLOAT:
        case OLAP_FIELD_TYPE_DOUBLE:
        case OLAP_FIELD_TYPE_DISCRETE_DOUBLE:
        case OLAP_FIELD_TYPE_DATE:
        case OLAP_FIELD_TYPE_DATETIME: {
            // Numeric/date values are hashed via their canonical string form.
            std::string ref_column_string =
                    read_helper->column_schema(ref_field_idx)
                            ->type_info()
                            ->to_string(read_helper->cell_ptr(ref_field_idx));
            hash_value = HashUtil::murmur_hash64A(
                    ref_column_string.c_str(), ref_column_string.length(), HashUtil::MURMUR_SEED);
            break;
        }
        default:
            LOG(WARNING) << "fail to hll hash type : " << ref_column.type();
            return false;
        }
        hll.update(hash_value);
    }
    // Serialize the HLL (resize down to the actual serialized length) and
    // store it as a Slice.
    std::string buf;
    buf.resize(hll.max_serialized_size());
    buf.resize(hll.serialize((uint8_t*)buf.c_str()));
    Slice dst(buf);
    write_helper->set_field_content(field_idx, reinterpret_cast<char*>(&dst), mem_pool);
    return true;
}
// Materialized-view transform for count(ref_column): writes 1 when the source
// cell is non-NULL and 0 otherwise. Always succeeds.
bool count_field(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
                 int field_idx, int ref_field_idx, MemPool* mem_pool) {
    write_helper->set_not_null(field_idx);
    int64_t count = 1;
    if (read_helper->is_null(ref_field_idx)) {
        count = 0;
    }
    write_helper->set_field_content(field_idx, (char*)&count, mem_pool);
    return true;
}
// Converts `ref_block` (old schema) into `mutable_block` (new schema) column
// by column, driven by _schema_mapping. For each target column i:
//   - ref_column >= 0 with a materialized function -> to_bitmap/hll_hash/count_field;
//   - ref_column >= 0, same type                   -> plain copy (CHAR is re-padded to the new length);
//   - ref_column >= 0, registered conversion       -> RowCursor::convert_from;
//   - ref_column >= 0, otherwise                   -> raw numeric reinterpret via CONVERT_FROM_TYPE;
//   - ref_column <  0                              -> fill with the mapping's default value.
// `filtered_rows` is the number of rows already filtered out of ref_block;
// `data_version` is not used by this implementation.
Status RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t data_version,
                                         RowBlock* mutable_block,
                                         const uint64_t* filtered_rows) const {
    if (mutable_block == nullptr) {
        LOG(FATAL) << "mutable block is uninitialized.";
        return Status::OLAPInternalError(OLAP_ERR_NOT_INITED);
    } else if (mutable_block->tablet_schema()->num_columns() != _schema_mapping.size()) {
        LOG(WARNING) << "mutable block does not match with schema mapping rules. "
                     << "block_schema_size=" << mutable_block->tablet_schema()->num_columns()
                     << ", mapping_schema_size=" << _schema_mapping.size();
        return Status::OLAPInternalError(OLAP_ERR_NOT_INITED);
    }

    if (mutable_block->capacity() < ref_block->row_block_info().row_num) {
        LOG(WARNING) << "mutable block is not large enough for storing the changed block. "
                     << "mutable_block_size=" << mutable_block->capacity()
                     << ", ref_block_row_num=" << ref_block->row_block_info().row_num;
        return Status::OLAPInternalError(OLAP_ERR_NOT_INITED);
    }

    mutable_block->clear();

    // Cursors for writing the new block and reading the old one.
    RowCursor write_helper;
    if (write_helper.init(mutable_block->tablet_schema()) != Status::OK()) {
        LOG(WARNING) << "fail to init rowcursor.";
        return Status::OLAPInternalError(OLAP_ERR_NOT_INITED);
    }

    RowCursor read_helper;
    if (read_helper.init(ref_block->tablet_schema()) != Status::OK()) {
        LOG(WARNING) << "fail to init rowcursor.";
        return Status::OLAPInternalError(OLAP_ERR_NOT_INITED);
    }

    // a.1 First determine whether the data needs to be filtered, and finally only those marked as 1 are left as needed
    // For those without filter, it is equivalent to leave after setting all to 1
    const uint32_t row_num = ref_block->row_block_info().row_num;
    // a.2 Calculate the left row num
    uint32_t new_row_num = row_num - *filtered_rows;
    // If every row was filtered out, skip the column loop entirely.
    const bool filter_all = (new_row_num == 0);

    MemPool* mem_pool = mutable_block->mem_pool();
    // b. According to the previous filtering information, only processes that are also marked as 1
    for (size_t i = 0, len = mutable_block->tablet_schema()->num_columns(); !filter_all && i < len;
         ++i) {
        int32_t ref_column = _schema_mapping[i].ref_column;

        if (_schema_mapping[i].ref_column >= 0) {
            if (!_schema_mapping[i].materialized_function.empty()) {
                // Dispatch to the materialized-view transform by function name.
                bool (*_do_materialized_transform)(RowCursor*, RowCursor*, const TabletColumn&, int,
                                                   int, MemPool*) = nullptr;
                if (_schema_mapping[i].materialized_function == "to_bitmap" ||
                    _schema_mapping[i].materialized_function == "to_bitmap_with_check") {
                    _do_materialized_transform = to_bitmap;
                } else if (_schema_mapping[i].materialized_function == "hll_hash") {
                    _do_materialized_transform = hll_hash;
                } else if (_schema_mapping[i].materialized_function == "count_field") {
                    _do_materialized_transform = count_field;
                } else {
                    LOG(WARNING) << "error materialized view function : "
                                 << _schema_mapping[i].materialized_function;
                    return Status::OLAPInternalError(OLAP_ERR_SCHEMA_CHANGE_INFO_INVALID);
                }
                VLOG_NOTICE << "_schema_mapping[" << i << "].materialized_function : "
                            << _schema_mapping[i].materialized_function;

                for (size_t row_index = 0, new_row_index = 0;
                     row_index < ref_block->row_block_info().row_num; ++row_index) {
                    mutable_block->get_row(new_row_index++, &write_helper);
                    ref_block->get_row(row_index, &read_helper);

                    if (!_do_materialized_transform(&read_helper, &write_helper,
                                                    ref_block->tablet_schema()->column(ref_column),
                                                    i, _schema_mapping[i].ref_column, mem_pool)) {
                        return Status::OLAPInternalError(OLAP_ERR_DATA_QUALITY_ERR);
                    }
                }
                continue;
            }

            // new column will be assigned as referenced column
            // check if the type of new column is equal to the older's.
            FieldType reftype = ref_block->tablet_schema()->column(ref_column).type();
            FieldType newtype = mutable_block->tablet_schema()->column(i).type();
            if (newtype == reftype) {
                // Low efficiency, you can also directly calculate the variable length domain copy, but it will still destroy the package
                for (size_t row_index = 0, new_row_index = 0;
                     row_index < ref_block->row_block_info().row_num; ++row_index) {
                    // Specify the new row index to be written (different from the read row_index)
                    mutable_block->get_row(new_row_index++, &write_helper);
                    ref_block->get_row(row_index, &read_helper);

                    if (read_helper.is_null(ref_column)) {
                        write_helper.set_null(i);
                    } else {
                        write_helper.set_not_null(i);
                        if (newtype == OLAP_FIELD_TYPE_CHAR) {
                            // if modify length of CHAR type, the size of slice should be equal
                            // to new length.
                            Slice* src = (Slice*)(read_helper.cell_ptr(ref_column));
                            size_t size = mutable_block->tablet_schema()->column(i).length();
                            char* buf = reinterpret_cast<char*>(mem_pool->allocate(size));
                            memset(buf, 0, size);
                            size_t copy_size = (size < src->size) ? size : src->size;
                            memcpy(buf, src->data, copy_size);
                            Slice dst(buf, size);
                            write_helper.set_field_content(i, reinterpret_cast<char*>(&dst),
                                                           mem_pool);
                        } else {
                            char* src = read_helper.cell_ptr(ref_column);
                            write_helper.set_field_content(i, src, mem_pool);
                        }
                    }
                }
            } else if (ConvertTypeResolver::instance()->get_convert_type_info(reftype, newtype)) {
                // Registered conversion: go through the type system's convert_from.
                for (size_t row_index = 0, new_row_index = 0;
                     row_index < ref_block->row_block_info().row_num; ++row_index) {
                    mutable_block->get_row(new_row_index++, &write_helper);
                    ref_block->get_row(row_index, &read_helper);
                    if (read_helper.is_null(ref_column)) {
                        write_helper.set_null(i);
                    } else {
                        write_helper.set_not_null(i);
                        const Field* ref_field = read_helper.column_schema(ref_column);
                        char* ref_value = read_helper.cell_ptr(ref_column);
                        Status st = write_helper.convert_from(i, ref_value, ref_field->type_info(),
                                                              mem_pool);
                        if (!st) {
                            LOG(WARNING)
                                    << "the column type which was altered from was unsupported."
                                    << "status:" << st.to_string() << ", from_type=" << reftype
                                    << ", to_type=" << newtype;
                            return st;
                        }
                    }
                }
                // Write column i from ref_column.
            } else {
                // copy and alter the field
                // You can stay here for the time being, the new type does not involve type conversion for the time being
                switch (reftype) {
                case OLAP_FIELD_TYPE_TINYINT:
                    CONVERT_FROM_TYPE(int8_t);
                case OLAP_FIELD_TYPE_UNSIGNED_TINYINT:
                    CONVERT_FROM_TYPE(uint8_t);
                case OLAP_FIELD_TYPE_SMALLINT:
                    CONVERT_FROM_TYPE(int16_t);
                case OLAP_FIELD_TYPE_UNSIGNED_SMALLINT:
                    CONVERT_FROM_TYPE(uint16_t);
                case OLAP_FIELD_TYPE_INT:
                    CONVERT_FROM_TYPE(int32_t);
                case OLAP_FIELD_TYPE_UNSIGNED_INT:
                    CONVERT_FROM_TYPE(uint32_t);
                case OLAP_FIELD_TYPE_BIGINT:
                    CONVERT_FROM_TYPE(int64_t);
                case OLAP_FIELD_TYPE_UNSIGNED_BIGINT:
                    CONVERT_FROM_TYPE(uint64_t);
                case OLAP_FIELD_TYPE_LARGEINT:
                    CONVERT_FROM_TYPE(int128_t);
                default:
                    LOG(WARNING) << "the column type which was altered from was unsupported."
                                 << " from_type="
                                 << ref_block->tablet_schema()->column(ref_column).type();
                    return Status::OLAPInternalError(OLAP_ERR_SCHEMA_CHANGE_INFO_INVALID);
                }

                if (newtype < reftype) {
                    VLOG_NOTICE << "type degraded while altering column. "
                                << "column=" << mutable_block->tablet_schema()->column(i).name()
                                << ", origin_type="
                                << ref_block->tablet_schema()->column(ref_column).type()
                                << ", alter_type="
                                << mutable_block->tablet_schema()->column(i).type();
                }
            }
        } else {
            // New column, write default value
            for (size_t row_index = 0, new_row_index = 0;
                 row_index < ref_block->row_block_info().row_num; ++row_index) {
                mutable_block->get_row(new_row_index++, &write_helper);

                if (_schema_mapping[i].default_value->is_null()) {
                    write_helper.set_null(i);
                } else {
                    write_helper.set_not_null(i);
                    write_helper.set_field_content(i, _schema_mapping[i].default_value->ptr(),
                                                   mem_pool);
                }
            }
        }
    }

    // NOTE The current row_num of mutable_block is still as much as ref
    // (Actually, you can re-init into less when init, the new_row_num left by the filter)
    // In split_table, there may be no data due to filtering
    mutable_block->finalize(new_row_num);
    return Status::OK();
}
#undef CONVERT_FROM_TYPE
#undef TYPE_REINTERPRET_CAST
#undef ASSIGN_DEFAULT_VALUE
// Vectorized counterpart of change_row_block: fills `new_block` from
// `ref_block` according to _schema_mapping. Each target column is produced in
// one of three ways:
//   - new column (ref_column < 0): filled with the mapping's default value;
//   - mapping carries an expr: evaluated via VExpr (materialized function or
//     cast), with an extra null-ness check for cast exprs;
//   - otherwise: the source column is swapped over from ref_block directly.
// Note: ref_block's columns are consumed (swapped out) by this call.
Status RowBlockChanger::change_block(vectorized::Block* ref_block,
                                     vectorized::Block* new_block) const {
    if (new_block->columns() != _schema_mapping.size()) {
        LOG(WARNING) << "block does not match with schema mapping rules. "
                     << "block_schema_size=" << new_block->columns()
                     << ", mapping_schema_size=" << _schema_mapping.size();
        return Status::OLAPInternalError(OLAP_ERR_NOT_INITED);
    }

    // Temporary runtime state for expression evaluation; the pool owns it.
    ObjectPool pool;
    RuntimeState* state = pool.add(new RuntimeState());
    state->set_desc_tbl(&_desc_tbl);
    RowDescriptor row_desc =
            RowDescriptor(_desc_tbl.get_tuple_descriptor(_desc_tbl.get_row_tuples()[0]), false);

    const int row_size = ref_block->rows();
    const int column_size = new_block->columns();

    // swap ref_block[key] and new_block[value]
    std::map<int, int> swap_idx_map;

    for (int idx = 0; idx < column_size; idx++) {
        int ref_idx = _schema_mapping[idx].ref_column;

        if (ref_idx < 0) {
            // new column, write default value
            auto value = _schema_mapping[idx].default_value;
            auto column = new_block->get_by_position(idx).column->assume_mutable();
            if (value->is_null()) {
                DCHECK(column->is_nullable());
                column->insert_many_defaults(row_size);
            } else {
                auto type_info = get_type_info(_schema_mapping[idx].new_column);
                DefaultValueColumnIterator::insert_default_data(type_info.get(), value->size(),
                                                                value->ptr(), column, row_size);
            }
        } else if (_schema_mapping[idx].expr != nullptr) {
            // calculate special materialized function, to_bitmap/hll_hash/count_field or cast expr
            vectorized::VExprContext* ctx = nullptr;
            RETURN_IF_ERROR(
                    vectorized::VExpr::create_expr_tree(&pool, *_schema_mapping[idx].expr, &ctx));
            // Close the context on every exit path below.
            Defer defer {[&]() { ctx->close(state); }};
            RETURN_IF_ERROR(ctx->prepare(state, row_desc));
            RETURN_IF_ERROR(ctx->open(state));

            // The expr appends its result column to ref_block.
            int result_column_id = -1;
            RETURN_IF_ERROR(ctx->execute(ref_block, &result_column_id));
            DCHECK(ref_block->get_by_position(result_column_id).column->size() == row_size)
                    << new_block->get_by_position(idx).name << " size invalid"
                    << ", expect=" << row_size
                    << ", real=" << ref_block->get_by_position(result_column_id).column->size();

            if (ctx->root()->node_type() == TExprNodeType::CAST_EXPR) {
                // A cast must not silently turn values into NULLs (data loss).
                RETURN_IF_ERROR(
                        _check_cast_valid(ref_block->get_by_position(ref_idx).column,
                                          ref_block->get_by_position(result_column_id).column));
            }
            swap_idx_map[result_column_id] = idx;
        } else {
            // same type, just swap column
            swap_idx_map[ref_idx] = idx;
        }
    }

    // Move every selected source column into its target slot, reconciling
    // nullability differences row by row.
    for (auto it : swap_idx_map) {
        auto& ref_col = ref_block->get_by_position(it.first);
        auto& new_col = new_block->get_by_position(it.second);

        bool ref_col_nullable = ref_col.column->is_nullable();
        bool new_col_nullable = new_col.column->is_nullable();

        if (ref_col_nullable != new_col_nullable) {
            // not nullable to nullable
            if (new_col_nullable) {
                auto* new_nullable_col = assert_cast<vectorized::ColumnNullable*>(
                        std::move(*new_col.column).mutate().get());

                new_nullable_col->swap_nested_column(ref_col.column);
                new_nullable_col->get_null_map_data().resize_fill(new_nullable_col->size());
            } else {
                // nullable to not nullable:
                // suppose column `c_phone` is originally varchar(16) NOT NULL,
                // then do schema change `alter table test modify column c_phone int not null`,
                // the cast expr of schema change is `CastExpr(CAST String to Nullable(Int32))`,
                // so need to handle nullable to not nullable here
                auto* ref_nullable_col = assert_cast<vectorized::ColumnNullable*>(
                        std::move(*ref_col.column).mutate().get());

                const auto* null_map = ref_nullable_col->get_null_map_column().get_data().data();
                // Any actual NULL would be lost in the not-nullable target.
                for (size_t i = 0; i < row_size; i++) {
                    if (null_map[i]) {
                        return Status::DataQualityError("is_null of data is changed!");
                    }
                }
                ref_nullable_col->swap_nested_column(new_col.column);
            }
        } else {
            new_block->get_by_position(it.second).column.swap(
                    ref_block->get_by_position(it.first).column);
        }
    }
    return Status::OK();
}
// This check is to prevent schema-change from causing data loss:
// a cast must not introduce nulls that were not in the source column.
Status RowBlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column,
                                          vectorized::ColumnPtr new_column) const {
    // Helper: raw pointer to a nullable column's null-map bytes.
    auto null_map_of = [](const vectorized::ColumnPtr& col) {
        return vectorized::check_and_get_column<vectorized::ColumnNullable>(col)
                ->get_null_map_column()
                .get_data()
                .data();
    };
    const size_t rows = ref_column->size();
    const bool ref_nullable = ref_column->is_nullable();
    const bool new_nullable = new_column->is_nullable();
    if (ref_nullable != new_nullable) {
        if (ref_nullable) {
            // Dropping nullability could silently drop null rows.
            return Status::DataQualityError("Can not change nullable column to not nullable");
        }
        // not-null -> nullable: any null produced by the cast means data loss.
        const auto* new_null_map = null_map_of(new_column);
        bool changed = false;
        for (size_t i = 0; i < rows; ++i) {
            changed |= new_null_map[i];
        }
        if (changed) {
            return Status::DataQualityError("is_null of data is changed!");
        }
        return Status::OK();
    }
    if (ref_nullable) {
        // Both nullable: the null maps must agree element-wise.
        const auto* ref_null_map = null_map_of(ref_column);
        const auto* new_null_map = null_map_of(new_column);
        bool changed = false;
        for (size_t i = 0; i < rows; ++i) {
            changed |= (ref_null_map[i] != new_null_map[i]);
        }
        if (changed) {
            return Status::DataQualityError("is_null of data is changed!");
        }
    }
    return Status::OK();
}
// Sorts RowBlocks using a scratch block obtained from `row_block_allocator`;
// the scratch block is allocated lazily in sort().
RowBlockSorter::RowBlockSorter(RowBlockAllocator* row_block_allocator)
        : _row_block_allocator(row_block_allocator), _swap_row_block(nullptr) {}
RowBlockSorter::~RowBlockSorter() {
    // Return the scratch block (if any) to its allocator.
    if (_swap_row_block != nullptr) {
        _row_block_allocator->release(_swap_row_block);
        _swap_row_block = nullptr;
    }
}
// Stable-sorts the rows of *row_block into the internal scratch block and
// swaps the two blocks, so *row_block holds the sorted data on return.
bool RowBlockSorter::sort(RowBlock** row_block) {
    const uint32_t row_num = (*row_block)->row_block_info().row_num;
    const bool null_supported = (*row_block)->row_block_info().null_supported;

    // Make sure the scratch block is large enough for this batch.
    if (_swap_row_block == nullptr || _swap_row_block->capacity() < row_num) {
        if (_swap_row_block != nullptr) {
            _row_block_allocator->release(_swap_row_block);
            _swap_row_block = nullptr;
        }
        if (!_row_block_allocator->allocate(&_swap_row_block, row_num, null_supported)) {
            LOG(WARNING) << "fail to allocate memory.";
            return false;
        }
    }

    RowCursor helper_row;
    auto res = helper_row.init(_swap_row_block->tablet_schema());
    if (!res) {
        LOG(WARNING) << "row cursor init failed.res:" << res;
        return false;
    }

    // One cursor per row, so rows can be reordered without moving data.
    std::vector<std::unique_ptr<RowCursor>> cursors;
    cursors.reserve(row_num);
    for (size_t i = 0; i < row_num; ++i) {
        cursors.emplace_back(new (nothrow) RowCursor());
        if (cursors[i] == nullptr) {
            LOG(WARNING) << "failed to malloc RowCursor. size=" << sizeof(RowCursor);
            return false;
        }
        if (cursors[i]->init((*row_block)->tablet_schema()) != Status::OK()) {
            return false;
        }
        (*row_block)->get_row(i, cursors[i].get());
    }

    // Must use 'std::' because this class has a function whose name is sort too
    std::stable_sort(cursors.begin(), cursors.end(), _row_cursor_comparator);

    // Copy the rows into the scratch block in sorted order.
    _swap_row_block->clear();
    for (size_t i = 0; i < cursors.size(); ++i) {
        _swap_row_block->get_row(i, &helper_row);
        copy_row(&helper_row, *cursors[i], _swap_row_block->mem_pool());
    }
    _swap_row_block->finalize(cursors.size());

    // Hand the sorted block back and keep the old one as future scratch.
    std::swap(*row_block, _swap_row_block);
    return true;
}
// Allocator for RowBlocks that accounts every allocation against a local
// MemTracker. A `memory_limitation` of 0 disables the limit check in
// allocate().
RowBlockAllocator::RowBlockAllocator(TabletSchemaSPtr tablet_schema, size_t memory_limitation)
        : _tablet_schema(tablet_schema),
          _tracker(std::make_unique<MemTracker>("RowBlockAllocator")),
          _row_len(tablet_schema->row_size()),
          _memory_limitation(memory_limitation) {
    VLOG_NOTICE << "RowBlockAllocator(). row_len=" << _row_len;
}
RowBlockAllocator::~RowBlockAllocator() {
    // Non-zero consumption means some allocated RowBlock was never released.
    if (_tracker->consumption() != 0) {
        LOG(WARNING) << "memory lost in RowBlockAllocator. memory_size=" << _tracker->consumption();
    }
}
// Allocates a RowBlock for `num_rows` rows and charges it to the tracker.
// Fails with OLAP_ERR_FETCH_MEMORY_EXCEEDED (and *row_block = nullptr) when
// the allocation would exceed the (non-zero) memory limitation.
Status RowBlockAllocator::allocate(RowBlock** row_block, size_t num_rows, bool null_supported) {
    const size_t bytes_needed = _row_len * num_rows;
    // A limitation of 0 means the check is disabled.
    if (_memory_limitation > 0 && _tracker->consumption() + bytes_needed > _memory_limitation) {
        *row_block = nullptr;
        return Status::OLAPInternalError(OLAP_ERR_FETCH_MEMORY_EXCEEDED);
    }

    // TODO(lijiao) : Why abandon the original m_row_block_buffer
    *row_block = new (nothrow) RowBlock(_tablet_schema);
    if (*row_block == nullptr) {
        LOG(WARNING) << "failed to malloc RowBlock. size=" << sizeof(RowBlock);
        return Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR);
    }

    RowBlockInfo block_info(0U, num_rows);
    block_info.null_supported = null_supported;
    (*row_block)->init(block_info);

    _tracker->consume(bytes_needed);
    VLOG_NOTICE << "RowBlockAllocator::allocate() this=" << this << ", num_rows=" << num_rows
                << ", m_memory_allocated=" << _tracker->consumption()
                << ", row_block_addr=" << *row_block;
    return Status::OK();
}
// Returns a RowBlock to the allocator: un-charges its capacity from the
// tracker and deletes it. A nullptr argument is logged and ignored.
void RowBlockAllocator::release(RowBlock* row_block) {
    if (row_block == nullptr) {
        LOG(INFO) << "null row block released.";
        return;
    }

    const size_t bytes_freed = row_block->capacity() * _row_len;
    _tracker->release(bytes_freed);
    VLOG_NOTICE << "RowBlockAllocator::release() this=" << this
                << ", num_rows=" << row_block->capacity()
                << ", m_memory_allocated=" << _tracker->consumption()
                << ", row_block_addr=" << row_block;
    delete row_block;
}
// Checks whether the tracker still has room (under the memory limitation)
// to allocate `num_rows - allocated_rows` additional rows for sorting.
// `allocated_rows` rows are assumed to be already charged to the tracker.
bool RowBlockAllocator::is_memory_enough_for_sorting(size_t num_rows, size_t allocated_rows) {
    if (num_rows <= allocated_rows) {
        return true;
    }
    // Keep the "0 means unlimited" convention consistent with allocate();
    // previously a zero limitation made this always return false.
    if (_memory_limitation == 0) {
        return true;
    }
    size_t row_block_size = _row_len * (num_rows - allocated_rows);
    return _tracker->consumption() + row_block_size < _memory_limitation;
}
// Merges multiple sorted RowBlocks of `tablet`'s schema into one rowset.
RowBlockMerger::RowBlockMerger(TabletSharedPtr tablet) : _tablet(tablet) {}
RowBlockMerger::~RowBlockMerger() = default;
// K-way merges the sorted `row_block_arr` into `rowset_writer` via _heap.
// For non-duplicate key models, rows with equal keys are aggregated; the
// number of rows merged away is returned in `merged_rows`.
bool RowBlockMerger::merge(const std::vector<RowBlock*>& row_block_arr, RowsetWriter* rowset_writer,
                           uint64_t* merged_rows) {
    uint64_t tmp_merged_rows = 0;
    RowCursor row_cursor;
    std::unique_ptr<MemPool> mem_pool(new MemPool());
    std::unique_ptr<ObjectPool> agg_object_pool(new ObjectPool());
    // On failure: drain the heap and free the raw cursors it still owns.
    auto merge_error = [&]() -> bool {
        while (!_heap.empty()) {
            MergeElement element = _heap.top();
            _heap.pop();
            SAFE_DELETE(element.row_cursor);
        }
        return false;
    };
    if (row_cursor.init(_tablet->tablet_schema()) != Status::OK()) {
        LOG(WARNING) << "fail to init row cursor.";
        return merge_error();
    }
    if (!_make_heap(row_block_arr)) {
        // There is error log in _make_heap, so no need to more log.
        return merge_error();
    }
    row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema());
    while (_heap.size() > 0) {
        // Take the smallest remaining row as the merge base.
        init_row_with_others(&row_cursor, *(_heap.top().row_cursor), mem_pool.get(),
                             agg_object_pool.get());
        _pop_heap();
        if (KeysType::DUP_KEYS == _tablet->keys_type()) {
            // Duplicate-key model: no aggregation, write the row as-is.
            if (rowset_writer->add_row(row_cursor) != Status::OK()) {
                LOG(WARNING) << "fail to add row to rowset writer.";
                return merge_error();
            }
            continue;
        }
        // Fold every following row with equal keys into the base row.
        while (!_heap.empty() && compare_row(row_cursor, *_heap.top().row_cursor) == 0) {
            // TODO(zc): Currently we keep nullptr to indicate that this is a query path,
            // we should fix this trick ASAP
            agg_update_row(&row_cursor, *(_heap.top().row_cursor), nullptr);
            ++tmp_merged_rows;
            _pop_heap();
        }
        agg_finalize_row(&row_cursor, mem_pool.get());
        if (rowset_writer->add_row(row_cursor) != Status::OK()) {
            LOG(WARNING) << "fail to add row to rowset writer.";
            return merge_error();
        }
        // the memory allocate by mem pool has been copied,
        // so we should release memory immediately
        mem_pool->clear();
        agg_object_pool.reset(new ObjectPool());
    }
    if (rowset_writer->flush() != Status::OK()) {
        LOG(WARNING) << "failed to finalizing writer.";
        return merge_error();
    }
    *merged_rows = tmp_merged_rows;
    return true;
}
// Seeds the merge heap with the first row of every input block.
// On failure, any cursor created for the failing element is freed; cursors
// already pushed onto the heap are cleaned up by the caller (merge_error).
bool RowBlockMerger::_make_heap(const std::vector<RowBlock*>& row_block_arr) {
    for (auto* block : row_block_arr) {
        MergeElement element;
        element.row_block = block;
        element.row_block_index = 0;
        element.row_cursor = new (nothrow) RowCursor();
        if (element.row_cursor == nullptr) {
            LOG(FATAL) << "failed to malloc RowCursor. size=" << sizeof(RowCursor);
            return false;
        }
        if (element.row_cursor->init(block->tablet_schema()) != Status::OK()) {
            LOG(WARNING) << "failed to init row cursor.";
            SAFE_DELETE(element.row_cursor);
            return false;
        }
        block->get_row(element.row_block_index, element.row_cursor);
        _heap.push(element);
    }
    return true;
}
// Pops the top element and re-pushes it positioned at the next row of its
// block; when the block is exhausted its cursor is freed instead.
void RowBlockMerger::_pop_heap() {
    MergeElement element = _heap.top();
    _heap.pop();

    ++element.row_block_index;
    if (element.row_block_index >= element.row_block->row_block_info().row_num) {
        SAFE_DELETE(element.row_cursor);
        return;
    }

    element.row_block->get_row(element.row_block_index, element.row_cursor);
    _heap.push(element);
}
// Converts one rowset by reusing the base rowset's data in the new tablet
// (no rewrite) when the reader and writer rowset types match; otherwise
// falls back to direct schema change, which rewrites the data.
Status LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
                                   TabletSharedPtr new_tablet, TabletSharedPtr base_tablet,
                                   TabletSchemaSPtr base_tablet_schema) {
    // In some cases, there may be more than one type of rowset in a tablet,
    // in which case the conversion cannot be done directly by linked schema change,
    // but requires direct schema change to rewrite the data.
    if (rowset_reader->type() != rowset_writer->type()) {
        LOG(INFO) << "the type of rowset " << rowset_reader->rowset()->rowset_id()
                  << " in base tablet is not same as type " << rowset_writer->type()
                  << ", use direct schema change.";
        return SchemaChangeHandler::get_sc_procedure(_row_block_changer, false, true)
                ->process(rowset_reader, rowset_writer, new_tablet, base_tablet,
                          base_tablet_schema);
    } else {
        Status status = rowset_writer->add_rowset_for_linked_schema_change(rowset_reader->rowset());
        if (!status) {
            LOG(WARNING) << "fail to convert rowset."
                         << ", new_tablet=" << new_tablet->full_name()
                         << ", version=" << rowset_writer->version().first << "-"
                         << rowset_writer->version().second << ", error status " << status;
            return status;
        }
        // copy delete bitmap to new tablet.
        if (new_tablet->keys_type() == UNIQUE_KEYS &&
            new_tablet->enable_unique_key_merge_on_write()) {
            // Collect the delete-bitmap entries that belong to the source rowset
            // (all segment ids / versions for that rowset id)...
            DeleteBitmap origin_delete_bitmap(base_tablet->tablet_id());
            base_tablet->tablet_meta()->delete_bitmap().subset(
                    {rowset_reader->rowset()->rowset_id(), 0, 0},
                    {rowset_reader->rowset()->rowset_id(), UINT32_MAX, INT64_MAX},
                    &origin_delete_bitmap);
            // ...and re-key them under the new rowset id, keeping the original
            // segment id and version components.
            for (auto iter = origin_delete_bitmap.delete_bitmap.begin();
                 iter != origin_delete_bitmap.delete_bitmap.end(); ++iter) {
                int ret = new_tablet->tablet_meta()->delete_bitmap().set(
                        {rowset_writer->rowset_id(), std::get<1>(iter->first),
                         std::get<2>(iter->first)},
                        iter->second);
                DCHECK(ret == 1);
            }
        }
        return Status::OK();
    }
}
// Direct schema change: converts rows block by block without re-sorting.
// The allocator and cursor are created lazily in _inner_process.
SchemaChangeDirectly::SchemaChangeDirectly(const RowBlockChanger& row_block_changer)
        : _row_block_changer(row_block_changer), _row_block_allocator(nullptr), _cursor(nullptr) {}
SchemaChangeDirectly::~SchemaChangeDirectly() {
    VLOG_NOTICE << "~SchemaChangeDirectly()";
    // Both members are lazily created in _inner_process; SAFE_DELETE handles nullptr.
    SAFE_DELETE(_row_block_allocator);
    SAFE_DELETE(_cursor);
}
// Appends every row of `row_block` to `rowset_writer` through the shared
// _cursor. Returns false on the first failed write.
bool SchemaChangeDirectly::_write_row_block(RowsetWriter* rowset_writer, RowBlock* row_block) {
    const uint32_t num_rows = row_block->row_block_info().row_num;
    for (uint32_t row_idx = 0; row_idx < num_rows; ++row_idx) {
        row_block->get_row(row_idx, _cursor);
        if (!rowset_writer->add_row(*_cursor)) {
            LOG(WARNING) << "fail to write to new rowset for direct schema change";
            return false;
        }
    }
    return true;
}
// Ensures *block_handle_ptr holds a cleared RowBlock with capacity for at
// least `row_num` rows, reusing the existing block when it is big enough.
Status reserve_block(std::unique_ptr<RowBlock, RowBlockDeleter>* block_handle_ptr, int row_num,
                     RowBlockAllocator* allocator) {
    auto& handle = *block_handle_ptr;
    if (handle != nullptr && handle->capacity() >= row_num) {
        // Current block is large enough; just reset its content.
        handle->clear();
        return Status::OK();
    }

    // Drop the undersized block (if any) and allocate a fresh one.
    handle.reset();
    RowBlock* fresh_block = nullptr;
    auto res = allocator->allocate(&fresh_block, row_num, true);
    RETURN_NOT_OK_LOG(res, "failed to allocate RowBlock.");
    handle.reset(fresh_block);
    return Status::OK();
}
// Reads blocks from the base rowset, converts each one through the
// RowBlockChanger, and writes the result to `rowset_writer` row by row.
Status SchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader,
                                            RowsetWriter* rowset_writer, TabletSharedPtr new_tablet,
                                            TabletSchemaSPtr base_tablet_schema) {
    // Lazily create the allocator (limitation 0 = no memory limit).
    if (_row_block_allocator == nullptr) {
        // Use nothrow-new so the nullptr check below is reachable (plain new
        // throws on failure); consistent with SchemaChangeWithSorting::_inner_process.
        _row_block_allocator = new (nothrow) RowBlockAllocator(new_tablet->tablet_schema(), 0);
        if (_row_block_allocator == nullptr) {
            LOG(FATAL) << "failed to malloc RowBlockAllocator. size=" << sizeof(RowBlockAllocator);
            return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
        }
    }

    // Lazily create the cursor used by _write_row_block.
    if (nullptr == _cursor) {
        _cursor = new (nothrow) RowCursor();
        if (nullptr == _cursor) {
            LOG(WARNING) << "fail to allocate row cursor.";
            return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
        }

        if (!_cursor->init(new_tablet->tablet_schema())) {
            LOG(WARNING) << "fail to init row cursor.";
            return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
        }
    }

    Status res = Status::OK();
    VLOG_NOTICE << "init writer. new_tablet=" << new_tablet->full_name()
                << ", block_row_number=" << new_tablet->num_rows_per_row_block();

    // RAII handle: the deleter returns the block to the allocator.
    std::unique_ptr<RowBlock, RowBlockDeleter> new_row_block(nullptr, [&](RowBlock* block) {
        if (block != nullptr) {
            _row_block_allocator->release(block);
        }
    });

    RowBlock* ref_row_block = nullptr;
    rowset_reader->next_block(&ref_row_block);
    while (ref_row_block != nullptr && ref_row_block->has_remaining()) {
        // We will allocate blocks of the same size as before
        // to ensure that the data can be stored
        RETURN_NOT_OK(reserve_block(&new_row_block, ref_row_block->row_block_info().row_num,
                                    _row_block_allocator));

        // Change ref to new. This step is reasonable to say that it does need to wait for a large block, but theoretically it has nothing to do with the writer.
        uint64_t filtered_rows = 0;
        res = _row_block_changer.change_row_block(ref_row_block, rowset_reader->version().second,
                                                  new_row_block.get(), &filtered_rows);
        RETURN_NOT_OK_LOG(res, "failed to change data in row block.");

        // rows filtered by delete handler one by one
        _add_filtered_rows(filtered_rows);

        if (!_write_row_block(rowset_writer, new_row_block.get())) {
            res = Status::OLAPInternalError(OLAP_ERR_SCHEMA_CHANGE_INFO_INVALID);
            LOG(WARNING) << "failed to write row block.";
            return res;
        }

        ref_row_block->clear();
        rowset_reader->next_block(&ref_row_block);
    }

    if (!rowset_writer->flush()) {
        return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
    }

    return res;
}
// Vectorized direct schema change: drain the reader block by block,
// convert each block, and append it to the writer.
Status VSchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader,
                                             RowsetWriter* rowset_writer,
                                             TabletSharedPtr new_tablet,
                                             TabletSchemaSPtr base_tablet_schema) {
    while (true) {
        auto new_block =
                std::make_unique<vectorized::Block>(new_tablet->tablet_schema()->create_block());
        auto ref_block = std::make_unique<vectorized::Block>(base_tablet_schema->create_block());
        rowset_reader->next_block(ref_block.get());
        if (ref_block->rows() < 1) {
            // Reader is drained.
            break;
        }
        RETURN_IF_ERROR(_changer.change_block(ref_block.get(), new_block.get()));
        RETURN_IF_ERROR(rowset_writer->add_block(new_block.get()));
    }

    if (!rowset_writer->flush()) {
        return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
    }
    return Status::OK();
}
// Schema change that re-sorts data: row blocks are buffered up to
// `memory_limitation` bytes (enforced by the allocator) before being
// spilled as sorted intermediate rowsets.
SchemaChangeWithSorting::SchemaChangeWithSorting(const RowBlockChanger& row_block_changer,
                                                 size_t memory_limitation)
        : _row_block_changer(row_block_changer),
          _memory_limitation(memory_limitation),
          _temp_delta_versions(Version::mock()),
          _row_block_allocator(nullptr) {}
SchemaChangeWithSorting::~SchemaChangeWithSorting() {
    VLOG_NOTICE << "~SchemaChangeWithSorting()";
    // Lazily created in _inner_process; SAFE_DELETE handles nullptr.
    SAFE_DELETE(_row_block_allocator);
}
// Vectorized sorting schema change: buffered blocks are accounted by
// _mem_tracker so _inner_process can spill before exceeding the limitation.
VSchemaChangeWithSorting::VSchemaChangeWithSorting(const RowBlockChanger& row_block_changer,
                                                   size_t memory_limitation)
        : _changer(row_block_changer),
          _memory_limitation(memory_limitation),
          _temp_delta_versions(Version::mock()) {
    // Label the tracker with the changer's address. Use a named cast instead
    // of the C-style `int64(...)` and let fmt format the integer directly
    // (same decimal output as the previous std::to_string round-trip).
    _mem_tracker = std::make_unique<MemTracker>(
            fmt::format("VSchemaChangeWithSorting:changer={}",
                        reinterpret_cast<int64_t>(&row_block_changer)));
}
// Converts all data of `rowset_reader` with re-sorting: blocks are changed,
// sorted in memory and buffered; when the memory limitation is reached the
// buffer is spilled as a sorted intermediate rowset (internal sorting), and
// at the end all intermediate rowsets are merged into `rowset_writer`
// (external sorting).
Status SchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader,
                                               RowsetWriter* rowset_writer,
                                               TabletSharedPtr new_tablet,
                                               TabletSchemaSPtr base_tablet_schema) {
    if (_row_block_allocator == nullptr) {
        _row_block_allocator =
                new (nothrow) RowBlockAllocator(new_tablet->tablet_schema(), _memory_limitation);
        if (_row_block_allocator == nullptr) {
            LOG(FATAL) << "failed to malloc RowBlockAllocator. size=" << sizeof(RowBlockAllocator);
            return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
        }
    }

    Status res = Status::OK();
    RowsetSharedPtr rowset = rowset_reader->rowset();

    RowBlockSorter row_block_sorter(_row_block_allocator);

    // for internal sorting
    RowBlock* new_row_block = nullptr;
    std::vector<RowBlock*> row_block_arr;

    // for external sorting
    // src_rowsets to store the rowset generated by internal sorting
    std::vector<RowsetSharedPtr> src_rowsets;

    Defer defer {[&]() {
        // remove the intermediate rowsets generated by internal sorting
        for (auto& row_set : src_rowsets) {
            StorageEngine::instance()->add_unused_rowset(row_set);
        }

        for (auto block : row_block_arr) {
            _row_block_allocator->release(block);
        }

        row_block_arr.clear();
    }};

    _temp_delta_versions.first = _temp_delta_versions.second;

    SegmentsOverlapPB segments_overlap = rowset->rowset_meta()->segments_overlap();
    int64_t oldest_write_timestamp = rowset->oldest_write_timestamp();
    int64_t newest_write_timestamp = rowset->newest_write_timestamp();
    RowBlock* ref_row_block = nullptr;
    rowset_reader->next_block(&ref_row_block);
    while (ref_row_block != nullptr && ref_row_block->has_remaining()) {
        auto st = _row_block_allocator->allocate(&new_row_block,
                                                 ref_row_block->row_block_info().row_num, true);
        // if OLAP_ERR_FETCH_MEMORY_EXCEEDED == st.precise_code()
        // that means RowBlockAllocator::allocate() memory exceeded.
        // But we can flush row_block_arr if row_block_arr is not empty.
        // Don't return directly.
        if (OLAP_ERR_MALLOC_ERROR == st.precise_code()) {
            return Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR);
        } else if (st) {
            // do memory check for sorting, in case schema change task fail at row block sorting because of
            // not doing internal sorting first
            if (!_row_block_allocator->is_memory_enough_for_sorting(
                        ref_row_block->row_block_info().row_num, row_block_sorter.num_rows())) {
                if (new_row_block != nullptr) {
                    _row_block_allocator->release(new_row_block);
                    new_row_block = nullptr;
                }
            }
        }

        // new_row_block == nullptr means the limitation was hit: spill the
        // buffered blocks as a sorted intermediate rowset and retry.
        if (new_row_block == nullptr) {
            if (row_block_arr.empty()) {
                LOG(WARNING) << "Memory limitation is too small for Schema Change."
                             << "memory_limitation=" << _memory_limitation
                             << "You can increase the memory "
                             << "by changing the "
                                "Config.memory_limitation_per_thread_for_schema_change_bytes";
                return Status::OLAPInternalError(OLAP_ERR_FETCH_MEMORY_EXCEEDED);
            }

            // enter here while memory limitation is reached.
            RowsetSharedPtr rowset;
            if (!_internal_sorting(
                        row_block_arr,
                        Version(_temp_delta_versions.second, _temp_delta_versions.second),
                        oldest_write_timestamp, newest_write_timestamp, new_tablet,
                        segments_overlap, &rowset)) {
                LOG(WARNING) << "failed to sorting internally.";
                return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
            }

            src_rowsets.push_back(rowset);

            for (auto block : row_block_arr) {
                _row_block_allocator->release(block);
            }

            row_block_arr.clear();

            // increase temp version
            ++_temp_delta_versions.second;
            continue;
        }

        uint64_t filtered_rows = 0;
        res = _row_block_changer.change_row_block(ref_row_block, rowset_reader->version().second,
                                                  new_row_block, &filtered_rows);
        // On failure, push the block into row_block_arr so the Defer releases it.
        if (!res) {
            row_block_arr.push_back(new_row_block);
            LOG(WARNING) << "failed to change data in row block.";
            return res;
        }
        _add_filtered_rows(filtered_rows);

        if (new_row_block->row_block_info().row_num > 0) {
            if (!row_block_sorter.sort(&new_row_block)) {
                row_block_arr.push_back(new_row_block);
                LOG(WARNING) << "failed to sort row block.";
                return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
            }
            row_block_arr.push_back(new_row_block);
        } else {
            // All rows were filtered out; recycle the empty block.
            LOG(INFO) << "new block num rows is: " << new_row_block->row_block_info().row_num;
            _row_block_allocator->release(new_row_block);
            new_row_block = nullptr;
        }

        ref_row_block->clear();
        rowset_reader->next_block(&ref_row_block);
    }

    // Spill whatever is still buffered as the last intermediate rowset.
    if (!row_block_arr.empty()) {
        // enter here while memory limitation is reached.
        RowsetSharedPtr rowset = nullptr;

        if (!_internal_sorting(row_block_arr,
                               Version(_temp_delta_versions.second, _temp_delta_versions.second),
                               oldest_write_timestamp, newest_write_timestamp, new_tablet,
                               segments_overlap, &rowset)) {
            LOG(WARNING) << "failed to sorting internally.";
            return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
        }

        src_rowsets.push_back(rowset);

        for (auto block : row_block_arr) {
            _row_block_allocator->release(block);
        }

        row_block_arr.clear();

        // increase temp version
        ++_temp_delta_versions.second;
    }

    if (src_rowsets.empty()) {
        // No data at all: still flush so an empty version exists.
        res = rowset_writer->flush();
        if (!res) {
            LOG(WARNING) << "create empty version for schema change failed."
                         << " version=" << rowset_writer->version().first << "-"
                         << rowset_writer->version().second;
            return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
        }
    } else if (!_external_sorting(src_rowsets, rowset_writer, new_tablet)) {
        LOG(WARNING) << "failed to sorting externally.";
        return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
    }

    return res;
}
// Vectorized counterpart of SchemaChangeWithSorting::_inner_process:
// converted blocks are buffered (accounted by _mem_tracker), spilled into
// sorted intermediate rowsets when the memory limitation would be exceeded,
// and finally merged into `rowset_writer`.
Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader,
                                                RowsetWriter* rowset_writer,
                                                TabletSharedPtr new_tablet,
                                                TabletSchemaSPtr base_tablet_schema) {
    // for internal sorting
    std::vector<std::unique_ptr<vectorized::Block>> blocks;

    // for external sorting
    // src_rowsets to store the rowset generated by internal sorting
    std::vector<RowsetSharedPtr> src_rowsets;

    Defer defer {[&]() {
        // remove the intermediate rowsets generated by internal sorting
        for (auto& row_set : src_rowsets) {
            StorageEngine::instance()->add_unused_rowset(row_set);
        }
    }};

    RowsetSharedPtr rowset = rowset_reader->rowset();
    SegmentsOverlapPB segments_overlap = rowset->rowset_meta()->segments_overlap();
    int64_t oldest_write_timestamp = rowset->oldest_write_timestamp();
    int64_t newest_write_timestamp = rowset->newest_write_timestamp();
    _temp_delta_versions.first = _temp_delta_versions.second;

    // Sorts the buffered blocks into one intermediate rowset and releases
    // their tracked memory.
    auto create_rowset = [&]() -> Status {
        if (blocks.empty()) {
            return Status::OK();
        }

        RowsetSharedPtr rowset;
        RETURN_IF_ERROR(_internal_sorting(
                blocks, Version(_temp_delta_versions.second, _temp_delta_versions.second),
                oldest_write_timestamp, newest_write_timestamp, new_tablet, BETA_ROWSET,
                segments_overlap, &rowset));
        src_rowsets.push_back(rowset);

        for (auto& block : blocks) {
            _mem_tracker->release(block->allocated_bytes());
        }
        blocks.clear();

        // increase temp version
        _temp_delta_versions.second++;
        return Status::OK();
    };

    auto new_block =
            std::make_unique<vectorized::Block>(new_tablet->tablet_schema()->create_block());
    do {
        auto ref_block = std::make_unique<vectorized::Block>(base_tablet_schema->create_block());
        rowset_reader->next_block(ref_block.get());
        if (ref_block->rows() < 1) {
            break;
        }

        RETURN_IF_ERROR(_changer.change_block(ref_block.get(), new_block.get()));
        // Spill first if accepting this block would exceed the limitation;
        // if it still does not fit afterwards, the limitation is too small.
        if (_mem_tracker->consumption() + new_block->allocated_bytes() > _memory_limitation) {
            RETURN_IF_ERROR(create_rowset());

            if (_mem_tracker->consumption() + new_block->allocated_bytes() > _memory_limitation) {
                LOG(WARNING) << "Memory limitation is too small for Schema Change."
                             << " _memory_limitation=" << _memory_limitation
                             << ", new_block->allocated_bytes()=" << new_block->allocated_bytes()
                             << ", consumption=" << _mem_tracker->consumption();
                return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
            }
        }
        _mem_tracker->consume(new_block->allocated_bytes());

        // move unique ptr
        blocks.push_back(
                std::make_unique<vectorized::Block>(new_tablet->tablet_schema()->create_block()));
        swap(blocks.back(), new_block);
    } while (true);

    RETURN_IF_ERROR(create_rowset());

    if (src_rowsets.empty()) {
        // No data at all: still flush so an empty version exists.
        RETURN_IF_ERROR(rowset_writer->flush());
    } else {
        RETURN_IF_ERROR(_external_sorting(src_rowsets, rowset_writer, new_tablet));
    }

    return Status::OK();
}
// Merges the buffered sorted row blocks into one intermediate rowset at
// `version`; the number of aggregated-away rows is added to the stats.
bool SchemaChangeWithSorting::_internal_sorting(
        const std::vector<RowBlock*>& row_block_arr, const Version& version,
        int64_t oldest_write_timestamp, int64_t newest_write_timestamp, TabletSharedPtr new_tablet,
        SegmentsOverlapPB segments_overlap, RowsetSharedPtr* rowset) {
    VLOG_NOTICE << "init rowset builder. tablet=" << new_tablet->full_name()
                << ", block_row_size=" << new_tablet->num_rows_per_row_block();
    std::unique_ptr<RowsetWriter> rowset_writer;
    if (!new_tablet->create_rowset_writer(version, VISIBLE, segments_overlap,
                                          new_tablet->tablet_schema(), oldest_write_timestamp,
                                          newest_write_timestamp, &rowset_writer)) {
        return false;
    }

    // The pending rowset id must be removed whether the merge succeeds or not.
    auto drop_pending_id = [&]() {
        new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
                                                   rowset_writer->rowset_id().to_string());
    };

    uint64_t merged_rows = 0;
    RowBlockMerger merger(new_tablet);
    if (!merger.merge(row_block_arr, rowset_writer.get(), &merged_rows)) {
        LOG(WARNING) << "failed to merge row blocks.";
        drop_pending_id();
        return false;
    }
    drop_pending_id();

    _add_merged_rows(merged_rows);
    *rowset = rowset_writer->build();
    return true;
}
// Vectorized internal sorting: merges the buffered blocks into one
// intermediate rowset at `version`.
Status VSchemaChangeWithSorting::_internal_sorting(
        const std::vector<std::unique_ptr<vectorized::Block>>& blocks, const Version& version,
        int64_t oldest_write_timestamp, int64_t newest_write_timestamp, TabletSharedPtr new_tablet,
        RowsetTypePB new_rowset_type, SegmentsOverlapPB segments_overlap, RowsetSharedPtr* rowset) {
    std::unique_ptr<RowsetWriter> rowset_writer;
    RETURN_IF_ERROR(new_tablet->create_rowset_writer(
            version, VISIBLE, segments_overlap, new_tablet->tablet_schema(), oldest_write_timestamp,
            newest_write_timestamp, &rowset_writer));
    // Always clear the pending rowset id, regardless of the merge outcome.
    Defer defer {[&]() {
        new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
                                                   rowset_writer->rowset_id().to_string());
    }};

    MultiBlockMerger merger(new_tablet);
    uint64_t merged_rows = 0;
    RETURN_IF_ERROR(merger.merge(blocks, rowset_writer.get(), &merged_rows));
    _add_merged_rows(merged_rows);
    *rowset = rowset_writer->build();
    return Status::OK();
}
// Merges all intermediate rowsets produced by internal sorting into the
// final rowset writer, accumulating merge/filter statistics.
bool SchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_rowsets,
                                                RowsetWriter* rowset_writer,
                                                TabletSharedPtr new_tablet) {
    // Open a reader over every intermediate rowset.
    std::vector<RowsetReaderSharedPtr> rs_readers;
    rs_readers.reserve(src_rowsets.size());
    for (auto& rowset : src_rowsets) {
        RowsetReaderSharedPtr rs_reader;
        if (!rowset->create_reader(&rs_reader)) {
            LOG(WARNING) << "failed to create rowset reader.";
            return false;
        }
        rs_readers.push_back(rs_reader);
    }

    Merger::Statistics stats;
    auto res = Merger::merge_rowsets(new_tablet, READER_ALTER_TABLE, new_tablet->tablet_schema(),
                                     rs_readers, rowset_writer, &stats);
    if (!res) {
        LOG(WARNING) << "failed to merge rowsets. tablet=" << new_tablet->full_name()
                     << ", version=" << rowset_writer->version().first << "-"
                     << rowset_writer->version().second;
        return false;
    }

    _add_merged_rows(stats.merged_rows);
    _add_filtered_rows(stats.filtered_rows);
    return true;
}
// Vectorized external sorting: merges all intermediate rowsets into the
// final rowset writer, accumulating merge/filter statistics.
Status VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_rowsets,
                                                   RowsetWriter* rowset_writer,
                                                   TabletSharedPtr new_tablet) {
    // Open a reader over every intermediate rowset.
    std::vector<RowsetReaderSharedPtr> rs_readers;
    rs_readers.reserve(src_rowsets.size());
    for (auto& src_rowset : src_rowsets) {
        RowsetReaderSharedPtr reader;
        RETURN_IF_ERROR(src_rowset->create_reader(&reader));
        rs_readers.push_back(reader);
    }

    Merger::Statistics stats;
    RETURN_IF_ERROR(Merger::vmerge_rowsets(new_tablet, READER_ALTER_TABLE,
                                           new_tablet->tablet_schema(), rs_readers, rowset_writer,
                                           &stats));
    _add_merged_rows(stats.merged_rows);
    _add_filtered_rows(stats.filtered_rows);
    return Status::OK();
}
// Entry point for an alter-tablet (schema change / rollup) v2 request:
// validates the request, takes the base tablet's schema change lock, and
// delegates the actual conversion to _do_process_alter_tablet_v2.
Status SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2& request) {
    // A missing desc_tbl indicates the request came from an older FE.
    if (!request.__isset.desc_tbl) {
        return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR)
                .append("desc_tbl is not set. Maybe the FE version is not equal to the BE "
                        "version.");
    }
    LOG(INFO) << "begin to do request alter tablet: base_tablet_id=" << request.base_tablet_id
              << ", new_tablet_id=" << request.new_tablet_id
              << ", alter_version=" << request.alter_version;

    TabletSharedPtr base_tablet =
            StorageEngine::instance()->tablet_manager()->get_tablet(request.base_tablet_id);
    if (base_tablet == nullptr) {
        LOG(WARNING) << "fail to find base tablet. base_tablet=" << request.base_tablet_id;
        return Status::OLAPInternalError(OLAP_ERR_TABLE_NOT_FOUND);
    }

    // Lock schema_change_lock util schema change info is stored in tablet header.
    // try_to_lock: a concurrent schema change on the same tablet fails fast
    // instead of blocking.
    std::unique_lock<std::mutex> schema_change_lock(base_tablet->get_schema_change_lock(),
                                                    std::try_to_lock);
    if (!schema_change_lock.owns_lock()) {
        LOG(WARNING) << "failed to obtain schema change lock. "
                     << "base_tablet=" << request.base_tablet_id;
        return Status::OLAPInternalError(OLAP_ERR_TRY_LOCK_FAILED);
    }

    Status res = _do_process_alter_tablet_v2(request);
    LOG(INFO) << "finished alter tablet process, res=" << res;
    return res;
}
// Static state shared by all schema change tasks.
std::shared_mutex SchemaChangeHandler::_mutex;
// NOTE(review): appears to track tablets whose data is currently being
// converted, guarded by _mutex — confirm against the uses later in this file.
std::unordered_set<int64_t> SchemaChangeHandler::_tablet_ids_in_converting;
// Materialized functions that schema change knows how to evaluate
// (see the materialized-function handling in change_block above).
std::set<std::string> SchemaChangeHandler::_supported_functions = {"hll_hash", "to_bitmap",
                                                                   "to_bitmap_with_check"};
// In the past schema change and rollup will create new tablet and will wait for txns starting before the task to finished
// It will cost a lot of time to wait and the task is very difficult to understand.
// In alter task v2, FE will call BE to create tablet and send an alter task to BE to convert historical data.
// The admin should upgrade all BE and then upgrade FE.
// Should delete the old code after upgrade finished.
Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& request) {
Status res = Status::OK();
TabletSharedPtr base_tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(request.base_tablet_id);
if (base_tablet == nullptr) {
LOG(WARNING) << "fail to find base tablet. base_tablet=" << request.base_tablet_id;
return Status::OLAPInternalError(OLAP_ERR_TABLE_NOT_FOUND);
}
// new tablet has to exist
TabletSharedPtr new_tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(request.new_tablet_id);
if (new_tablet == nullptr) {
LOG(WARNING) << "fail to find new tablet."
<< " new_tablet=" << request.new_tablet_id;
return Status::OLAPInternalError(OLAP_ERR_TABLE_NOT_FOUND);
}
// check if tablet's state is not_ready, if it is ready, it means the tablet already finished
// check whether the tablet's max continuous version == request.version
if (new_tablet->tablet_state() != TABLET_NOTREADY) {
res = _validate_alter_result(new_tablet, request);
LOG(INFO) << "tablet's state=" << new_tablet->tablet_state()
<< " the convert job already finished, check its version"
<< " res=" << res;
return res;
}
LOG(INFO) << "finish to validate alter tablet request. begin to convert data from base tablet "
"to new tablet"
<< " base_tablet=" << base_tablet->full_name()
<< " new_tablet=" << new_tablet->full_name();
std::shared_lock base_migration_rlock(base_tablet->get_migration_lock(), std::try_to_lock);
if (!base_migration_rlock.owns_lock()) {
return Status::OLAPInternalError(OLAP_ERR_RWLOCK_ERROR);
}
std::shared_lock new_migration_rlock(new_tablet->get_migration_lock(), std::try_to_lock);
if (!new_migration_rlock.owns_lock()) {
return Status::OLAPInternalError(OLAP_ERR_RWLOCK_ERROR);
}
std::vector<Version> versions_to_be_changed;
int64_t end_version = -1;
// reader_context is stack variables, it's lifetime should keep the same
// with rs_readers
RowsetReaderContext reader_context;
std::vector<RowsetReaderSharedPtr> rs_readers;
// delete handlers for new tablet
DeleteHandler delete_handler;
std::vector<ColumnId> return_columns;
// Create a new tablet schema, should merge with dropped columns in light weight schema change
TabletSchemaSPtr base_tablet_schema = std::make_shared<TabletSchema>();
base_tablet_schema->copy_from(*base_tablet->tablet_schema());
if (!request.columns.empty() && request.columns[0].col_unique_id >= 0) {
base_tablet_schema->clear_columns();
for (const auto& column : request.columns) {
base_tablet_schema->append_column(TabletColumn(column));
}
}
// Use tablet schema directly from base tablet, they are the newest schema, not contain
// dropped column during light weight schema change.
// But the tablet schema in base tablet maybe not the latest from FE, so that if fe pass through
// a tablet schema, then use request schema.
size_t num_cols = request.columns.empty() ? base_tablet->tablet_schema()->num_columns()
: request.columns.size();
return_columns.resize(num_cols);
for (int i = 0; i < num_cols; ++i) {
return_columns[i] = i;
}
// begin to find deltas to convert from base tablet to new tablet so that
// obtain base tablet and new tablet's push lock and header write lock to prevent loading data
{
std::lock_guard<std::mutex> base_tablet_lock(base_tablet->get_push_lock());
std::lock_guard<std::mutex> new_tablet_lock(new_tablet->get_push_lock());
std::lock_guard<std::shared_mutex> base_tablet_wlock(base_tablet->get_header_lock());
std::lock_guard<std::shared_mutex> new_tablet_wlock(new_tablet->get_header_lock());
do {
RowsetSharedPtr max_rowset;
            // get history data to be converted and it will check if there is a version hole in the base tablet
if (!_get_versions_to_be_changed(base_tablet, &versions_to_be_changed, &max_rowset)) {
LOG(WARNING) << "fail to get version to be changed. res=" << res;
break;
}
// should check the max_version >= request.alter_version, if not the convert is useless
if (max_rowset == nullptr || max_rowset->end_version() < request.alter_version) {
res = Status::InternalError(
"base tablet's max version={} is less than request version={}",
(max_rowset == nullptr ? 0 : max_rowset->end_version()),
request.alter_version);
break;
}
// before calculating version_to_be_changed,
// remove all data from new tablet, prevent to rewrite data(those double pushed when wait)
LOG(INFO) << "begin to remove all data from new tablet to prevent rewrite."
<< " new_tablet=" << new_tablet->full_name();
std::vector<RowsetSharedPtr> rowsets_to_delete;
std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets;
new_tablet->acquire_version_and_rowsets(&version_rowsets);
std::sort(version_rowsets.begin(), version_rowsets.end(),
[](const std::pair<Version, RowsetSharedPtr>& l,
const std::pair<Version, RowsetSharedPtr>& r) {
return l.first.first < r.first.first;
});
for (auto& pair : version_rowsets) {
if (pair.first.second <= max_rowset->end_version()) {
rowsets_to_delete.push_back(pair.second);
} else if (pair.first.first <= max_rowset->end_version()) {
// If max version is [X-10] and new tablet has version [7-9][10-12],
// we only can remove [7-9] from new tablet. If we add [X-10] to new tablet, it will has version
// cross: [X-10] [10-12].
// So, we should return OLAP_ERR_VERSION_ALREADY_MERGED for fast fail.
LOG(WARNING) << "New tablet has a version " << pair.first
<< " crossing base tablet's max_version="
<< max_rowset->end_version();
return Status::OLAPInternalError(OLAP_ERR_VERSION_ALREADY_MERGED);
}
}
std::vector<RowsetSharedPtr> empty_vec;
new_tablet->modify_rowsets(empty_vec, rowsets_to_delete);
// inherit cumulative_layer_point from base_tablet
// check if new_tablet.ce_point > base_tablet.ce_point?
new_tablet->set_cumulative_layer_point(-1);
// save tablet meta
new_tablet->save_meta();
for (auto& rowset : rowsets_to_delete) {
// do not call rowset.remove directly, using gc thread to delete it
StorageEngine::instance()->add_unused_rowset(rowset);
}
// init one delete handler
for (auto& version : versions_to_be_changed) {
end_version = std::max(end_version, version.second);
}
// acquire data sources correspond to history versions
base_tablet->capture_rs_readers(versions_to_be_changed, &rs_readers);
if (rs_readers.empty()) {
LOG(WARNING) << "fail to acquire all data sources. "
<< "version_num=" << versions_to_be_changed.size()
<< ", data_source_num=" << rs_readers.size();
res = Status::OLAPInternalError(OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS);
break;
}
auto& all_del_preds = base_tablet->delete_predicates();
for (auto& delete_pred : all_del_preds) {
if (delete_pred->version().first > end_version) {
continue;
}
base_tablet_schema->merge_dropped_columns(
base_tablet->tablet_schema(delete_pred->version()));
}
res = delete_handler.init(base_tablet_schema, all_del_preds, end_version);
if (!res) {
LOG(WARNING) << "init delete handler failed. base_tablet="
<< base_tablet->full_name() << ", end_version=" << end_version;
break;
}
reader_context.reader_type = READER_ALTER_TABLE;
reader_context.tablet_schema = base_tablet_schema;
reader_context.need_ordered_result = true;
reader_context.delete_handler = &delete_handler;
reader_context.return_columns = &return_columns;
reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS;
reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
reader_context.is_vec = config::enable_vectorized_alter_table;
reader_context.delete_bitmap = &base_tablet->tablet_meta()->delete_bitmap();
reader_context.version = Version(0, end_version);
for (auto& rs_reader : rs_readers) {
res = rs_reader->init(&reader_context);
if (!res) {
LOG(WARNING) << "failed to init rowset reader: " << base_tablet->full_name();
break;
}
}
} while (false);
}
do {
if (!res) {
break;
}
SchemaChangeParams sc_params;
DescriptorTbl::create(&sc_params.pool, request.desc_tbl, &sc_params.desc_tbl);
sc_params.base_tablet = base_tablet;
sc_params.new_tablet = new_tablet;
sc_params.ref_rowset_readers = rs_readers;
sc_params.delete_handler = &delete_handler;
sc_params.base_tablet_schema = base_tablet_schema;
if (request.__isset.materialized_view_params) {
for (auto item : request.materialized_view_params) {
AlterMaterializedViewParam mv_param;
mv_param.column_name = item.column_name;
/*
* origin_column_name is always be set now,
* but origin_column_name may be not set in some materialized view function. eg:count(1)
*/
if (item.__isset.origin_column_name) {
mv_param.origin_column_name = item.origin_column_name;
}
/*
* TODO(lhy)
* Building the materialized view function for schema_change here based on defineExpr.
* This is a trick because the current storage layer does not support expression evaluation.
* We can refactor this part of the code until the uniform expression evaluates the logic.
* count distinct materialized view will set mv_expr with to_bitmap or hll_hash.
* count materialized view will set mv_expr with count.
*/
if (item.__isset.mv_expr) {
if (item.mv_expr.nodes[0].node_type == TExprNodeType::FUNCTION_CALL) {
mv_param.mv_expr = item.mv_expr.nodes[0].fn.name.function_name;
if (!_supported_functions.count(mv_param.mv_expr)) {
return Status::NotSupported("Unknow materialized view expr " +
mv_param.mv_expr);
}
} else if (item.mv_expr.nodes[0].node_type == TExprNodeType::CASE_EXPR) {
mv_param.mv_expr = "count_field";
}
mv_param.expr = std::make_shared<TExpr>(item.mv_expr);
}
sc_params.materialized_params_map.insert(
std::make_pair(item.column_name, mv_param));
}
}
{
std::lock_guard<std::shared_mutex> wrlock(_mutex);
_tablet_ids_in_converting.insert(new_tablet->tablet_id());
}
res = _convert_historical_rowsets(sc_params);
if (new_tablet->keys_type() != UNIQUE_KEYS ||
!new_tablet->enable_unique_key_merge_on_write() || !res) {
{
std::lock_guard<std::shared_mutex> wrlock(_mutex);
_tablet_ids_in_converting.erase(new_tablet->tablet_id());
}
}
if (!res) {
break;
}
// For unique with merge-on-write table, should process delete bitmap here.
// 1. During double write, the newly imported rowsets does not calculate
// delete bitmap and publish successfully.
// 2. After conversion, calculate delete bitmap for the rowsets imported
// during double write. During this period, new data can still be imported
    // without calculating delete bitmap and publishing successfully.
// 3. Block the new publish, calculate the delete bitmap of the
// incremental rowsets.
// 4. Switch the tablet status to TABLET_RUNNING. The newly imported
// data will calculate delete bitmap.
if (new_tablet->keys_type() == UNIQUE_KEYS &&
new_tablet->enable_unique_key_merge_on_write()) {
// step 2
int64_t max_version = new_tablet->max_version().second;
std::vector<RowsetSharedPtr> rowsets;
if (end_version < max_version) {
LOG(INFO)
<< "alter table for unique with merge-on-write, calculate delete bitmap of "
<< "double write rowsets for version: " << end_version + 1 << "-"
<< max_version;
RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets(
{end_version + 1, max_version}, &rowsets));
}
for (auto rowset_ptr : rowsets) {
if (rowset_ptr->version().second <= end_version) {
continue;
}
std::lock_guard<std::mutex> rwlock(new_tablet->get_rowset_update_lock());
std::shared_lock<std::shared_mutex> wrlock(new_tablet->get_header_lock());
RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr));
}
// step 3
std::lock_guard<std::mutex> rwlock(new_tablet->get_rowset_update_lock());
std::lock_guard<std::shared_mutex> new_wlock(new_tablet->get_header_lock());
int64_t new_max_version = new_tablet->max_version().second;
rowsets.clear();
if (max_version < new_max_version) {
LOG(INFO)
<< "alter table for unique with merge-on-write, calculate delete bitmap of "
<< "incremental rowsets for version: " << max_version + 1 << "-"
<< new_max_version;
RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets(
{max_version + 1, new_max_version}, &rowsets));
}
for (auto rowset_ptr : rowsets) {
if (rowset_ptr->version().second <= max_version) {
continue;
}
RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr));
}
// step 4
{
std::lock_guard<std::shared_mutex> wrlock(_mutex);
_tablet_ids_in_converting.erase(new_tablet->tablet_id());
}
res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING);
if (!res) {
break;
}
new_tablet->save_meta();
} else {
// set state to ready
std::lock_guard<std::shared_mutex> new_wlock(new_tablet->get_header_lock());
res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING);
if (!res) {
break;
}
new_tablet->save_meta();
}
} while (false);
if (res) {
// _validate_alter_result should be outside the above while loop.
// to avoid requiring the header lock twice.
res = _validate_alter_result(new_tablet, request);
}
// if failed convert history data, then just remove the new tablet
if (!res) {
LOG(WARNING) << "failed to alter tablet. base_tablet=" << base_tablet->full_name()
<< ", drop new_tablet=" << new_tablet->full_name();
// do not drop the new tablet and its data. GC thread will
}
return res;
}
// Returns true while a schema-change job is still converting historical rowsets
// for the given tablet. Readers take a shared lock on _mutex; the alter path
// holds an exclusive lock on the same mutex when it mutates the converting set.
bool SchemaChangeHandler::tablet_in_converting(int64_t tablet_id) {
    std::shared_lock converting_rdlock(_mutex);
    return _tablet_ids_in_converting.count(tablet_id) > 0;
}
// Collects the full version range [0, max] of the base tablet that must be
// converted by this schema change.
//
// On success:
//   *max_rowset              - the rowset holding the base tablet's max version.
//   *versions_to_be_changed  - a consistent set of versions covering [0, max].
// Fails when the base tablet has no version at all, or when a consistent
// version chain up to the max version cannot be captured.
Status SchemaChangeHandler::_get_versions_to_be_changed(
        TabletSharedPtr base_tablet, std::vector<Version>* versions_to_be_changed,
        RowsetSharedPtr* max_rowset) {
    const RowsetSharedPtr newest_rowset = base_tablet->rowset_with_max_version();
    if (newest_rowset == nullptr) {
        LOG(WARNING) << "Tablet has no version. base_tablet=" << base_tablet->full_name();
        return Status::OLAPInternalError(OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS);
    }
    *max_rowset = newest_rowset;

    RETURN_NOT_OK(base_tablet->capture_consistent_versions(
            Version(0, newest_rowset->version().second), versions_to_be_changed));
    return Status::OK();
}
// Converts every captured historical rowset of the base tablet into the new
// tablet's schema: parses the alter request into a column mapping, picks the
// conversion procedure (linked / directly / sorting), then converts and
// registers each version on the new tablet.
//
// All exits go through process_alter_exit(), which persists the new tablet's
// meta and, if everything succeeded, checks version integrity up to end_version.
Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams& sc_params) {
    LOG(INFO) << "begin to convert historical rowsets for new_tablet from base_tablet."
              << " base_tablet=" << sc_params.base_tablet->full_name()
              << ", new_tablet=" << sc_params.new_tablet->full_name();

    // find end version
    // Versions are 64-bit throughout the alter flow; an int32_t local here would
    // silently narrow rs_reader->version().second.
    int64_t end_version = -1;
    for (size_t i = 0; i < sc_params.ref_rowset_readers.size(); ++i) {
        if (sc_params.ref_rowset_readers[i]->version().second > end_version) {
            end_version = sc_params.ref_rowset_readers[i]->version().second;
        }
    }

    // Add filter information in change, and filter column information will be set in _parse_request
    // And filter some data every time the row block changes
    RowBlockChanger rb_changer(sc_params.new_tablet->tablet_schema(), sc_params.delete_handler,
                               *sc_params.desc_tbl);

    bool sc_sorting = false;
    bool sc_directly = false;

    // a.Parse the Alter request and convert it into an internal representation
    Status res = _parse_request(sc_params, &rb_changer, &sc_sorting, &sc_directly);

    // Common exit path for every return below: save the new tablet's meta
    // (rowset meta is not persisted during add_rowset) and verify version
    // integrity when the conversion has succeeded so far.
    auto process_alter_exit = [&]() -> Status {
        {
            // save tablet meta here because rowset meta is not saved during add rowset
            std::lock_guard<std::shared_mutex> new_wlock(sc_params.new_tablet->get_header_lock());
            sc_params.new_tablet->save_meta();
        }
        if (res) {
            Version test_version(0, end_version);
            res = sc_params.new_tablet->check_version_integrity(test_version);
        }

        LOG(INFO) << "finish converting rowsets for new_tablet from base_tablet. "
                  << "base_tablet=" << sc_params.base_tablet->full_name()
                  << ", new_tablet=" << sc_params.new_tablet->full_name();
        return res;
    };

    if (!res) {
        LOG(WARNING) << "failed to parse the request. res=" << res;
        return process_alter_exit();
    }

    // b. Generate historical data converter
    auto sc_procedure = get_sc_procedure(rb_changer, sc_sorting, sc_directly);

    // c.Convert historical data
    for (auto& rs_reader : sc_params.ref_rowset_readers) {
        VLOG_TRACE << "begin to convert a history rowset. version=" << rs_reader->version().first
                   << "-" << rs_reader->version().second;

        // set status for monitor
        // As long as there is a new_table as running, ref table is set as running
        // NOTE If the first sub_table fails first, it will continue to go as normal here
        TabletSharedPtr new_tablet = sc_params.new_tablet;
        // When tablet create new rowset writer, it may change rowset type, in this case
        // linked schema change will not be used.
        std::unique_ptr<RowsetWriter> rowset_writer;
        Status status = new_tablet->create_rowset_writer(
                rs_reader->version(), VISIBLE,
                rs_reader->rowset()->rowset_meta()->segments_overlap(), new_tablet->tablet_schema(),
                rs_reader->oldest_write_timestamp(), rs_reader->newest_write_timestamp(),
                rs_reader->rowset()->rowset_meta()->fs(), &rowset_writer);
        if (!status.ok()) {
            res = Status::OLAPInternalError(OLAP_ERR_ROWSET_BUILDER_INIT);
            return process_alter_exit();
        }

        if (res = sc_procedure->process(rs_reader, rowset_writer.get(), sc_params.new_tablet,
                                        sc_params.base_tablet, sc_params.base_tablet_schema);
            !res) {
            LOG(WARNING) << "failed to process the version."
                         << " version=" << rs_reader->version().first << "-"
                         << rs_reader->version().second;
            new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
                                                       rowset_writer->rowset_id().to_string());
            return process_alter_exit();
        }
        new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
                                                   rowset_writer->rowset_id().to_string());
        // Add the new version of the data to the header
        // In order to prevent the occurrence of deadlock, we must first lock the old table, and then lock the new table
        std::lock_guard<std::mutex> lock(sc_params.new_tablet->get_push_lock());
        RowsetSharedPtr new_rowset = rowset_writer->build();
        if (new_rowset == nullptr) {
            LOG(WARNING) << "failed to build rowset, exit alter process";
            return process_alter_exit();
        }
        res = sc_params.new_tablet->add_rowset(new_rowset);
        if (res.precise_code() == OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) {
            // A concurrent load already published this version on the new tablet;
            // drop our converted copy and keep going.
            LOG(WARNING) << "version already exist, version revert occurred. "
                         << "tablet=" << sc_params.new_tablet->full_name() << ", version='"
                         << rs_reader->version().first << "-" << rs_reader->version().second;
            StorageEngine::instance()->add_unused_rowset(new_rowset);
            res = Status::OK();
        } else if (!res) {
            LOG(WARNING) << "failed to register new version. "
                         << " tablet=" << sc_params.new_tablet->full_name()
                         << ", version=" << rs_reader->version().first << "-"
                         << rs_reader->version().second;
            StorageEngine::instance()->add_unused_rowset(new_rowset);
            return process_alter_exit();
        } else {
            VLOG_NOTICE << "register new version. tablet=" << sc_params.new_tablet->full_name()
                        << ", version=" << rs_reader->version().first << "-"
                        << rs_reader->version().second;
        }

        VLOG_TRACE << "succeed to convert a history version."
                   << " version=" << rs_reader->version().first << "-"
                   << rs_reader->version().second;
    }

    // XXX:The SchemaChange state should not be canceled at this time, because the new Delta has to be converted to the old and new Schema version
    return process_alter_exit();
}
// @static
// Analyze the mapping of the column and the mapping of the filter key
// Builds the column mapping from base schema to new schema (materialized-view
// expressions, moved columns, new columns with default values) and decides the
// conversion strategy:
//   *sc_sorting  = true -> data must be re-sorted / re-aggregated;
//   *sc_directly = true -> data must be rewritten (no linked schema change);
//   neither set         -> linked schema change is possible.
Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
                                           RowBlockChanger* rb_changer, bool* sc_sorting,
                                           bool* sc_directly) {
    TabletSharedPtr base_tablet = sc_params.base_tablet;
    TabletSharedPtr new_tablet = sc_params.new_tablet;
    TabletSchemaSPtr base_tablet_schema = sc_params.base_tablet_schema;
    const std::unordered_map<std::string, AlterMaterializedViewParam>& materialized_function_map =
            sc_params.materialized_params_map;

    // set column mapping
    for (int i = 0, new_schema_size = new_tablet->tablet_schema()->num_columns();
         i < new_schema_size; ++i) {
        const TabletColumn& new_column = new_tablet->tablet_schema()->column(i);
        const string& column_name = new_column.name();
        ColumnMapping* column_mapping = rb_changer->get_mutable_column_mapping(i);
        column_mapping->new_column = &new_column;

        // Materialized-view column: look it up once and reuse the iterator
        // instead of a second find() that deep-copied the whole param struct.
        if (auto mv_it = materialized_function_map.find(column_name);
            mv_it != materialized_function_map.end()) {
            const auto& mvParam = mv_it->second;
            column_mapping->materialized_function = mvParam.mv_expr;
            column_mapping->expr = mvParam.expr;
            int32_t column_index = base_tablet_schema->field_index(mvParam.origin_column_name);
            if (column_index >= 0) {
                column_mapping->ref_column = column_index;
                continue;
            } else {
                LOG(WARNING) << "referenced column was missing. "
                             << "[column=" << column_name << " referenced_column=" << column_index
                             << "]";
                return Status::OLAPInternalError(OLAP_ERR_CE_CMD_PARAMS_ERROR);
            }
        }

        int32_t column_index = base_tablet_schema->field_index(column_name);
        if (column_index >= 0) {
            column_mapping->ref_column = column_index;
            continue;
        }

        // Newly added column go here
        column_mapping->ref_column = -1;

        if (i < base_tablet_schema->num_short_key_columns()) {
            *sc_directly = true;
        }
        RETURN_IF_ERROR(
                _init_column_mapping(column_mapping, new_column, new_column.default_value()));

        VLOG_TRACE << "A column with default value will be added after schema changing. "
                   << "column=" << column_name << ", default_value=" << new_column.default_value();
    }

    // Check if re-aggregation is needed.
    *sc_sorting = false;
    // If the reference sequence of the Key column is out of order, it needs to be reordered
    int num_default_value = 0;

    for (int i = 0, new_schema_size = new_tablet->num_key_columns(); i < new_schema_size; ++i) {
        ColumnMapping* column_mapping = rb_changer->get_mutable_column_mapping(i);

        if (column_mapping->ref_column < 0) {
            num_default_value++;
            continue;
        }

        if (column_mapping->ref_column != i - num_default_value) {
            *sc_sorting = true;
            return Status::OK();
        }
    }

    TabletSchemaSPtr new_tablet_schema = new_tablet->tablet_schema();
    if (base_tablet_schema->keys_type() != new_tablet_schema->keys_type()) {
        // only when base table is dup and mv is agg
        // the rollup job must be reagg.
        *sc_sorting = true;
        return Status::OK();
    }

    // If the sort of key has not been changed but the new keys num is less then base's,
    // the new table should be re agg.
    // So we also need to set sc_sorting = true.
    // A, B, C are keys(sort keys), D is value
    // followings need resort:
    //      old keys:    A   B   C   D
    //      new keys:    A   B
    if (new_tablet_schema->keys_type() != KeysType::DUP_KEYS &&
        new_tablet->num_key_columns() < base_tablet_schema->num_key_columns()) {
        // this is a table with aggregate key type, and num of key columns in new schema
        // is less, which means the data in new tablet should be more aggregated.
        // so we use sorting schema change to sort and merge the data.
        *sc_sorting = true;
        return Status::OK();
    }

    if (base_tablet_schema->num_short_key_columns() != new_tablet->num_short_key_columns()) {
        // the number of short_keys changed, can't do linked schema change
        *sc_directly = true;
        return Status::OK();
    }

    for (size_t i = 0; i < new_tablet->num_columns(); ++i) {
        ColumnMapping* column_mapping = rb_changer->get_mutable_column_mapping(i);
        if (column_mapping->ref_column < 0) {
            continue;
        } else {
            // Bind by reference: column() returns const TabletColumn&, so avoid
            // copying the columns just to compare their physical properties.
            const auto& column_new = new_tablet_schema->column(i);
            const auto& column_old = base_tablet_schema->column(column_mapping->ref_column);
            // Any physical difference forces a real rewrite of the data files.
            if (column_new.type() != column_old.type() ||
                column_new.precision() != column_old.precision() ||
                column_new.frac() != column_old.frac() ||
                column_new.length() != column_old.length() ||
                column_new.is_bf_column() != column_old.is_bf_column() ||
                column_new.has_bitmap_index() != column_old.has_bitmap_index()) {
                *sc_directly = true;
                return Status::OK();
            }
        }
    }

    if (!sc_params.delete_handler->empty()) {
        // there exists delete condition in header, can't do linked schema change
        *sc_directly = true;
    }

    if (base_tablet->tablet_meta()->preferred_rowset_type() !=
        new_tablet->tablet_meta()->preferred_rowset_type()) {
        // If the base_tablet and new_tablet rowset types are different, just use directly type
        *sc_directly = true;
    }

    // if rs_reader has remote files, link schema change is not supported,
    // use directly schema change instead.
    if (!(*sc_directly) && !(*sc_sorting)) {
        // check has remote rowset
        for (auto& rs_reader : sc_params.ref_rowset_readers) {
            if (!rs_reader->rowset()->is_local()) {
                *sc_directly = true;
                break;
            }
        }
    }
    return Status::OK();
}
// Prepares the default value for a column that exists only in the new schema.
// An empty default on a nullable column is stored as NULL; otherwise the
// textual default is parsed with the column's precision/scale.
// NOTE(review): the from_string() result is ignored here, as in the original
// code — confirm whether parse failures should be surfaced.
Status SchemaChangeHandler::_init_column_mapping(ColumnMapping* column_mapping,
                                                 const TabletColumn& column_schema,
                                                 const std::string& value) {
    column_mapping->default_value = WrapperField::create(column_schema);
    if (column_mapping->default_value == nullptr) {
        return Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR);
    }

    const bool store_null = column_schema.is_nullable() && value.empty();
    if (store_null) {
        column_mapping->default_value->set_null();
    } else {
        column_mapping->default_value->from_string(value, column_schema.precision(),
                                                   column_schema.frac());
    }

    return Status::OK();
}
// Verifies that the finished alter job actually produced what FE asked for:
// the new tablet's max continuous version (from version 0) must reach
// request.alter_version, and every rowset's data files must exist on disk.
Status SchemaChangeHandler::_validate_alter_result(TabletSharedPtr new_tablet,
                                                   const TAlterTabletReqV2& request) {
    Version max_continuous_version = {-1, 0};
    new_tablet->max_continuous_version_from_beginning(&max_continuous_version);
    LOG(INFO) << "find max continuous version of tablet=" << new_tablet->full_name()
              << ", start_version=" << max_continuous_version.first
              << ", end_version=" << max_continuous_version.second;
    if (max_continuous_version.second < request.alter_version) {
        return Status::InternalError("result version={} is less than request version={}",
                                     max_continuous_version.second, request.alter_version);
    }

    // Snapshot the rowset list under the header lock, then check files outside it.
    std::vector<std::pair<Version, RowsetSharedPtr>> all_version_rowsets;
    {
        std::shared_lock rdlock(new_tablet->get_header_lock());
        new_tablet->acquire_version_and_rowsets(&all_version_rowsets);
    }
    for (const auto& [version, rowset] : all_version_rowsets) {
        if (!rowset->check_file_exist()) {
            return Status::OLAPInternalError(OLAP_ERR_FILE_NOT_EXIST);
        }
    }
    return Status::OK();
}
} // namespace doris