[Feature] [Vectorized] support vectorized schema-change (#10187)
This commit is contained in:
@ -245,7 +245,7 @@ CONF_Bool(enable_low_cardinality_optimize, "true");
|
||||
CONF_mBool(disable_auto_compaction, "false");
|
||||
// whether enable vectorized compaction
|
||||
CONF_Bool(enable_vectorized_compaction, "true");
|
||||
// whether enable vectorized schema change
|
||||
// whether enable vectorized schema change, material-view or rollup task will fail if this config open.
|
||||
CONF_Bool(enable_vectorized_alter_table, "false");
|
||||
|
||||
// check the configuration of auto compaction in seconds when auto compaction disabled
|
||||
|
||||
@ -20,6 +20,8 @@
|
||||
#include <gen_cpp/Exprs_types.h>
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include "olap/tablet_schema.h"
|
||||
namespace doris {
|
||||
|
||||
class WrapperField;
|
||||
@ -36,8 +38,9 @@ struct ColumnMapping {
|
||||
// materialize view transform function used in schema change
|
||||
std::string materialized_function;
|
||||
std::shared_ptr<TExpr> expr;
|
||||
const TabletColumn* new_column;
|
||||
};
|
||||
|
||||
using SchemaMapping = std::vector<ColumnMapping>;
|
||||
|
||||
} // namespace doris
|
||||
} // namespace doris
|
||||
|
||||
@ -148,7 +148,7 @@ Status ColumnReader::new_bitmap_index_iterator(BitmapIndexIterator** iterator) {
|
||||
|
||||
Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp,
|
||||
PageHandle* handle, Slice* page_body, PageFooterPB* footer,
|
||||
BlockCompressionCodec* codec) {
|
||||
BlockCompressionCodec* codec) const {
|
||||
iter_opts.sanity_check();
|
||||
PageReadOptions opts;
|
||||
opts.rblock = iter_opts.rblock;
|
||||
@ -847,74 +847,70 @@ Status DefaultValueColumnIterator::next_batch(size_t* n, ColumnBlockView* dst, b
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void DefaultValueColumnIterator::insert_default_data(vectorized::MutableColumnPtr& dst, size_t n) {
|
||||
void DefaultValueColumnIterator::insert_default_data(const TypeInfo* type_info, size_t type_size,
|
||||
void* mem_value,
|
||||
vectorized::MutableColumnPtr& dst, size_t n) {
|
||||
vectorized::Int128 int128;
|
||||
char* data_ptr = (char*)&int128;
|
||||
size_t data_len = sizeof(int128);
|
||||
|
||||
auto insert_column_data = [&]() {
|
||||
for (size_t i = 0; i < n; ++i) {
|
||||
dst->insert_data(data_ptr, data_len);
|
||||
}
|
||||
};
|
||||
|
||||
switch (_type_info->type()) {
|
||||
switch (type_info->type()) {
|
||||
case OLAP_FIELD_TYPE_OBJECT:
|
||||
case OLAP_FIELD_TYPE_HLL: {
|
||||
dst->insert_many_defaults(n);
|
||||
break;
|
||||
}
|
||||
case OLAP_FIELD_TYPE_DATE: {
|
||||
assert(_type_size == sizeof(FieldTypeTraits<OLAP_FIELD_TYPE_DATE>::CppType)); //uint24_t
|
||||
std::string str = FieldTypeTraits<OLAP_FIELD_TYPE_DATE>::to_string(_mem_value);
|
||||
assert(type_size == sizeof(FieldTypeTraits<OLAP_FIELD_TYPE_DATE>::CppType)); //uint24_t
|
||||
std::string str = FieldTypeTraits<OLAP_FIELD_TYPE_DATE>::to_string(mem_value);
|
||||
|
||||
vectorized::VecDateTimeValue value;
|
||||
value.from_date_str(str.c_str(), str.length());
|
||||
value.cast_to_date();
|
||||
//TODO: here is int128 = int64, here rely on the logic of little endian
|
||||
int128 = binary_cast<vectorized::VecDateTimeValue, vectorized::Int64>(value);
|
||||
insert_column_data();
|
||||
dst->insert_many_data(data_ptr, data_len, n);
|
||||
break;
|
||||
}
|
||||
case OLAP_FIELD_TYPE_DATETIME: {
|
||||
assert(_type_size == sizeof(FieldTypeTraits<OLAP_FIELD_TYPE_DATETIME>::CppType)); //int64_t
|
||||
std::string str = FieldTypeTraits<OLAP_FIELD_TYPE_DATETIME>::to_string(_mem_value);
|
||||
assert(type_size == sizeof(FieldTypeTraits<OLAP_FIELD_TYPE_DATETIME>::CppType)); //int64_t
|
||||
std::string str = FieldTypeTraits<OLAP_FIELD_TYPE_DATETIME>::to_string(mem_value);
|
||||
|
||||
vectorized::VecDateTimeValue value;
|
||||
value.from_date_str(str.c_str(), str.length());
|
||||
value.to_datetime();
|
||||
|
||||
int128 = binary_cast<vectorized::VecDateTimeValue, vectorized::Int64>(value);
|
||||
insert_column_data();
|
||||
dst->insert_many_data(data_ptr, data_len, n);
|
||||
break;
|
||||
}
|
||||
case OLAP_FIELD_TYPE_DATEV2: {
|
||||
assert(_type_size == sizeof(FieldTypeTraits<OLAP_FIELD_TYPE_DATEV2>::CppType)); //uint32_t
|
||||
assert(type_size == sizeof(FieldTypeTraits<OLAP_FIELD_TYPE_DATEV2>::CppType)); //uint32_t
|
||||
|
||||
int128 = *((FieldTypeTraits<OLAP_FIELD_TYPE_DATEV2>::CppType*)_mem_value);
|
||||
insert_column_data();
|
||||
int128 = *((FieldTypeTraits<OLAP_FIELD_TYPE_DATEV2>::CppType*)mem_value);
|
||||
dst->insert_many_data(data_ptr, data_len, n);
|
||||
break;
|
||||
}
|
||||
case OLAP_FIELD_TYPE_DECIMAL: {
|
||||
assert(_type_size ==
|
||||
assert(type_size ==
|
||||
sizeof(FieldTypeTraits<OLAP_FIELD_TYPE_DECIMAL>::CppType)); //decimal12_t
|
||||
decimal12_t* d = (decimal12_t*)_mem_value;
|
||||
decimal12_t* d = (decimal12_t*)mem_value;
|
||||
int128 = DecimalV2Value(d->integer, d->fraction).value();
|
||||
insert_column_data();
|
||||
dst->insert_many_data(data_ptr, data_len, n);
|
||||
break;
|
||||
}
|
||||
case OLAP_FIELD_TYPE_STRING:
|
||||
case OLAP_FIELD_TYPE_VARCHAR:
|
||||
case OLAP_FIELD_TYPE_CHAR: {
|
||||
data_ptr = ((Slice*)_mem_value)->data;
|
||||
data_len = ((Slice*)_mem_value)->size;
|
||||
insert_column_data();
|
||||
data_ptr = ((Slice*)mem_value)->data;
|
||||
data_len = ((Slice*)mem_value)->size;
|
||||
dst->insert_many_data(data_ptr, data_len, n);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
data_ptr = (char*)_mem_value;
|
||||
data_len = _type_size;
|
||||
insert_column_data();
|
||||
data_ptr = (char*)mem_value;
|
||||
data_len = type_size;
|
||||
dst->insert_many_data(data_ptr, data_len, n);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -926,7 +922,7 @@ Status DefaultValueColumnIterator::next_batch(size_t* n, vectorized::MutableColu
|
||||
dst->insert_many_defaults(*n);
|
||||
} else {
|
||||
*has_null = false;
|
||||
insert_default_data(dst, *n);
|
||||
insert_default_data(_type_info.get(), _type_size, _mem_value, dst, *n);
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
|
||||
@ -106,7 +106,7 @@ public:
|
||||
// read a page from file into a page handle
|
||||
Status read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp,
|
||||
PageHandle* handle, Slice* page_body, PageFooterPB* footer,
|
||||
BlockCompressionCodec* codec);
|
||||
BlockCompressionCodec* codec) const;
|
||||
|
||||
bool is_nullable() const { return _meta.is_nullable(); }
|
||||
|
||||
@ -449,9 +449,10 @@ public:
|
||||
|
||||
ordinal_t get_current_ordinal() const override { return _current_rowid; }
|
||||
|
||||
private:
|
||||
void insert_default_data(vectorized::MutableColumnPtr& dst, size_t n);
|
||||
static void insert_default_data(const TypeInfo* type_info, size_t type_size, void* mem_value,
|
||||
vectorized::MutableColumnPtr& dst, size_t n);
|
||||
|
||||
private:
|
||||
bool _has_default_value;
|
||||
std::string _default_value;
|
||||
bool _is_nullable;
|
||||
|
||||
@ -26,9 +26,10 @@
|
||||
#include "olap/row.h"
|
||||
#include "olap/row_block.h"
|
||||
#include "olap/row_cursor.h"
|
||||
#include "olap/rowset/rowset_id_generator.h"
|
||||
#include "olap/rowset/segment_v2/column_reader.h"
|
||||
#include "olap/storage_engine.h"
|
||||
#include "olap/tablet.h"
|
||||
#include "olap/types.h"
|
||||
#include "olap/wrapper_field.h"
|
||||
#include "runtime/mem_tracker.h"
|
||||
#include "util/defer_op.h"
|
||||
@ -91,6 +92,154 @@ private:
|
||||
std::priority_queue<MergeElement> _heap;
|
||||
};
|
||||
|
||||
class MultiBlockMerger {
|
||||
public:
|
||||
MultiBlockMerger(TabletSharedPtr tablet) : _tablet(tablet), _cmp(tablet) {}
|
||||
|
||||
Status merge(const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
|
||||
RowsetWriter* rowset_writer, uint64_t* merged_rows) {
|
||||
int rows = 0;
|
||||
for (auto& block : blocks) {
|
||||
rows += block->rows();
|
||||
}
|
||||
if (!rows) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
std::vector<RowRef> row_refs;
|
||||
row_refs.reserve(rows);
|
||||
for (auto& block : blocks) {
|
||||
for (uint16_t i = 0; i < block->rows(); i++) {
|
||||
row_refs.emplace_back(block.get(), i);
|
||||
}
|
||||
}
|
||||
// TODO: try to use pdqsort to replace std::sort
|
||||
// The block version is incremental.
|
||||
std::stable_sort(row_refs.begin(), row_refs.end(), _cmp);
|
||||
|
||||
auto finalized_block = _tablet->tablet_schema().create_block();
|
||||
int columns = finalized_block.columns();
|
||||
*merged_rows += rows;
|
||||
|
||||
if (_tablet->keys_type() == KeysType::AGG_KEYS) {
|
||||
auto tablet_schema = _tablet->tablet_schema();
|
||||
int key_number = _tablet->num_key_columns();
|
||||
|
||||
std::vector<vectorized::AggregateFunctionPtr> agg_functions;
|
||||
std::vector<vectorized::AggregateDataPtr> agg_places;
|
||||
|
||||
for (int i = key_number; i < columns; i++) {
|
||||
vectorized::AggregateFunctionPtr function =
|
||||
tablet_schema.column(i).get_aggregate_function(
|
||||
{finalized_block.get_data_type(i)}, vectorized::AGG_LOAD_SUFFIX);
|
||||
agg_functions.push_back(function);
|
||||
// create aggregate data
|
||||
vectorized::AggregateDataPtr place = new char[function->size_of_data()];
|
||||
function->create(place);
|
||||
agg_places.push_back(place);
|
||||
}
|
||||
|
||||
for (int i = 0; i < rows; i++) {
|
||||
auto row_ref = row_refs[i];
|
||||
|
||||
for (int j = key_number; j < columns; j++) {
|
||||
auto column_ptr = row_ref.get_column(j).get();
|
||||
agg_functions[j - key_number]->add(
|
||||
agg_places[j - key_number],
|
||||
const_cast<const vectorized::IColumn**>(&column_ptr), row_ref.position,
|
||||
nullptr);
|
||||
}
|
||||
|
||||
if (i == rows - 1 || _cmp.compare(row_refs[i], row_refs[i + 1])) {
|
||||
for (int j = 0; j < key_number; j++) {
|
||||
finalized_block.get_by_position(j).column->assume_mutable()->insert_from(
|
||||
*row_ref.get_column(j), row_ref.position);
|
||||
}
|
||||
|
||||
for (int j = key_number; j < columns; j++) {
|
||||
agg_functions[j - key_number]->insert_result_into(
|
||||
agg_places[j - key_number],
|
||||
finalized_block.get_by_position(j).column->assume_mutable_ref());
|
||||
agg_functions[j - key_number]->create(agg_places[j - key_number]);
|
||||
}
|
||||
|
||||
if (i == rows - 1 || finalized_block.rows() == ALTER_TABLE_BATCH_SIZE) {
|
||||
*merged_rows -= finalized_block.rows();
|
||||
rowset_writer->add_block(&finalized_block);
|
||||
finalized_block.clear_column_data();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < columns - key_number; i++) {
|
||||
agg_functions[i]->destroy(agg_places[i]);
|
||||
delete[] agg_places[i];
|
||||
}
|
||||
} else {
|
||||
std::vector<RowRef> pushed_row_refs;
|
||||
if (_tablet->keys_type() == KeysType::DUP_KEYS) {
|
||||
std::swap(pushed_row_refs, row_refs);
|
||||
} else if (_tablet->keys_type() == KeysType::UNIQUE_KEYS) {
|
||||
for (int i = 0; i < rows; i++) {
|
||||
if (i == rows - 1 || _cmp.compare(row_refs[i], row_refs[i + 1])) {
|
||||
pushed_row_refs.push_back(row_refs[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// update real inserted row number
|
||||
rows = pushed_row_refs.size();
|
||||
*merged_rows -= rows;
|
||||
|
||||
for (int i = 0; i < rows; i += ALTER_TABLE_BATCH_SIZE) {
|
||||
int limit = std::min(ALTER_TABLE_BATCH_SIZE, rows - i);
|
||||
|
||||
for (int idx = 0; idx < columns; idx++) {
|
||||
auto column = finalized_block.get_by_position(idx).column->assume_mutable();
|
||||
|
||||
for (int j = 0; j < limit; j++) {
|
||||
auto row_ref = pushed_row_refs[i + j];
|
||||
column->insert_from(*row_ref.get_column(idx), row_ref.position);
|
||||
}
|
||||
}
|
||||
rowset_writer->add_block(&finalized_block);
|
||||
finalized_block.clear_column_data();
|
||||
}
|
||||
}
|
||||
|
||||
RETURN_IF_ERROR(rowset_writer->flush());
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
private:
|
||||
struct RowRef {
|
||||
RowRef(vectorized::Block* block_, uint16_t position_)
|
||||
: block(block_), position(position_) {}
|
||||
vectorized::ColumnPtr get_column(int index) const {
|
||||
return block->get_by_position(index).column;
|
||||
}
|
||||
const vectorized::Block* block;
|
||||
uint16_t position;
|
||||
};
|
||||
|
||||
struct RowRefComparator {
|
||||
RowRefComparator(TabletSharedPtr tablet) : _num_columns(tablet->num_key_columns()) {}
|
||||
|
||||
int compare(const RowRef& lhs, const RowRef& rhs) const {
|
||||
return lhs.block->compare_at(lhs.position, rhs.position, _num_columns, *rhs.block, -1);
|
||||
}
|
||||
|
||||
bool operator()(const RowRef& lhs, const RowRef& rhs) const {
|
||||
return compare(lhs, rhs) < 0;
|
||||
}
|
||||
|
||||
const size_t _num_columns;
|
||||
};
|
||||
|
||||
TabletSharedPtr _tablet;
|
||||
RowRefComparator _cmp;
|
||||
};
|
||||
|
||||
RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema, DescriptorTbl desc_tbl)
|
||||
: _desc_tbl(desc_tbl) {
|
||||
_schema_mapping.resize(tablet_schema.num_columns());
|
||||
@ -665,6 +814,128 @@ Status RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t data
|
||||
#undef TYPE_REINTERPRET_CAST
|
||||
#undef ASSIGN_DEFAULT_VALUE
|
||||
|
||||
Status RowBlockChanger::change_block(vectorized::Block* ref_block,
|
||||
vectorized::Block* new_block) const {
|
||||
if (new_block->columns() != _schema_mapping.size()) {
|
||||
LOG(WARNING) << "block does not match with schema mapping rules. "
|
||||
<< "block_schema_size=" << new_block->columns()
|
||||
<< ", mapping_schema_size=" << _schema_mapping.size();
|
||||
return Status::OLAPInternalError(OLAP_ERR_NOT_INITED);
|
||||
}
|
||||
|
||||
// material-view or rollup task will fail now
|
||||
if (_desc_tbl.get_row_tuples().size() != ref_block->columns()) {
|
||||
return Status::NotSupported(
|
||||
"_desc_tbl.get_row_tuples().size() != ref_block->columns(), maybe because rollup "
|
||||
"not supported now. ");
|
||||
}
|
||||
|
||||
std::vector<bool> nullable_tuples;
|
||||
for (int i = 0; i < ref_block->columns(); i++) {
|
||||
nullable_tuples.emplace_back(ref_block->get_by_position(i).column->is_nullable());
|
||||
}
|
||||
|
||||
ObjectPool pool;
|
||||
RuntimeState* state = pool.add(new RuntimeState());
|
||||
state->set_desc_tbl(&_desc_tbl);
|
||||
RowDescriptor row_desc = RowDescriptor::create_default(_desc_tbl, nullable_tuples);
|
||||
|
||||
const int row_size = ref_block->rows();
|
||||
const int column_size = new_block->columns();
|
||||
|
||||
// swap ref_block[key] and new_block[value]
|
||||
std::map<int, int> swap_idx_map;
|
||||
|
||||
for (int idx = 0; idx < column_size; idx++) {
|
||||
int ref_idx = _schema_mapping[idx].ref_column;
|
||||
|
||||
if (!_schema_mapping[idx].materialized_function.empty()) {
|
||||
return Status::NotSupported("Materialized function not supported now. ");
|
||||
}
|
||||
|
||||
if (ref_idx < 0) {
|
||||
// new column, write default value
|
||||
auto value = _schema_mapping[idx].default_value;
|
||||
auto column = new_block->get_by_position(idx).column->assume_mutable();
|
||||
if (value->is_null()) {
|
||||
DCHECK(column->is_nullable());
|
||||
column->insert_many_defaults(row_size);
|
||||
} else {
|
||||
auto type_info = get_type_info(_schema_mapping[idx].new_column);
|
||||
DefaultValueColumnIterator::insert_default_data(type_info.get(), value->size(),
|
||||
value->ptr(), column, row_size);
|
||||
}
|
||||
} else if (_schema_mapping[idx].expr != nullptr) {
|
||||
// calculate special materialized function, to_bitmap/hll_hash/count_field or cast expr
|
||||
vectorized::VExprContext* ctx = nullptr;
|
||||
RETURN_IF_ERROR(
|
||||
vectorized::VExpr::create_expr_tree(&pool, *_schema_mapping[idx].expr, &ctx));
|
||||
|
||||
RETURN_IF_ERROR(ctx->prepare(state, row_desc));
|
||||
RETURN_IF_ERROR(ctx->open(state));
|
||||
|
||||
int result_column_id = -1;
|
||||
RETURN_IF_ERROR(ctx->execute(ref_block, &result_column_id));
|
||||
DCHECK(ref_block->get_by_position(result_column_id).column->size() == row_size)
|
||||
<< new_block->get_by_position(idx).name << " size invalid"
|
||||
<< ", expect=" << row_size
|
||||
<< ", real=" << ref_block->get_by_position(result_column_id).column->size();
|
||||
|
||||
if (_schema_mapping[idx].expr->nodes[0].node_type == TExprNodeType::CAST_EXPR) {
|
||||
RETURN_IF_ERROR(
|
||||
_check_cast_valid(ref_block->get_by_position(ref_idx).column,
|
||||
ref_block->get_by_position(result_column_id).column));
|
||||
}
|
||||
swap_idx_map[result_column_id] = idx;
|
||||
|
||||
ctx->close(state);
|
||||
} else {
|
||||
// same type, just swap column
|
||||
swap_idx_map[ref_idx] = idx;
|
||||
}
|
||||
}
|
||||
|
||||
for (auto it : swap_idx_map) {
|
||||
new_block->get_by_position(it.second).column.swap(
|
||||
ref_block->get_by_position(it.first).column);
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status RowBlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column,
|
||||
vectorized::ColumnPtr new_column) const {
|
||||
// TODO: rethink this check
|
||||
// This check is to prevent schema-change from causing data loss,
|
||||
// But it is possible to generate null data in material-view or rollup.
|
||||
|
||||
if (ref_column->is_nullable() != new_column->is_nullable()) {
|
||||
return Status::DataQualityError("column.is_nullable() is changed!");
|
||||
}
|
||||
|
||||
if (ref_column->is_nullable()) {
|
||||
auto* ref_null_map =
|
||||
vectorized::check_and_get_column<vectorized::ColumnNullable>(ref_column)
|
||||
->get_null_map_column()
|
||||
.get_data()
|
||||
.data();
|
||||
auto* new_null_map =
|
||||
vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column)
|
||||
->get_null_map_column()
|
||||
.get_data()
|
||||
.data();
|
||||
|
||||
bool is_changed = false;
|
||||
for (size_t i = 0; i < ref_column->size(); i++) {
|
||||
is_changed |= (ref_null_map[i] != new_null_map[i]);
|
||||
}
|
||||
if (is_changed) {
|
||||
return Status::DataQualityError("is_null of data is changed!");
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
RowBlockSorter::RowBlockSorter(RowBlockAllocator* row_block_allocator)
|
||||
: _row_block_allocator(row_block_allocator), _swap_row_block(nullptr) {}
|
||||
|
||||
@ -1050,6 +1321,34 @@ Status SchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader,
|
||||
return res;
|
||||
}
|
||||
|
||||
Status VSchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader,
|
||||
RowsetWriter* rowset_writer,
|
||||
TabletSharedPtr new_tablet,
|
||||
TabletSharedPtr base_tablet) {
|
||||
auto new_block =
|
||||
std::make_unique<vectorized::Block>(new_tablet->tablet_schema().create_block());
|
||||
auto ref_block =
|
||||
std::make_unique<vectorized::Block>(base_tablet->tablet_schema().create_block());
|
||||
|
||||
int origin_columns_size = ref_block->columns();
|
||||
|
||||
rowset_reader->next_block(ref_block.get());
|
||||
while (ref_block->rows()) {
|
||||
RETURN_IF_ERROR(_changer.change_block(ref_block.get(), new_block.get()));
|
||||
RETURN_IF_ERROR(rowset_writer->add_block(new_block.get()));
|
||||
|
||||
new_block->clear_column_data();
|
||||
ref_block->clear_column_data(origin_columns_size);
|
||||
rowset_reader->next_block(ref_block.get());
|
||||
}
|
||||
|
||||
if (!rowset_writer->flush()) {
|
||||
return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
SchemaChangeWithSorting::SchemaChangeWithSorting(const RowBlockChanger& row_block_changer,
|
||||
size_t memory_limitation)
|
||||
: _row_block_changer(row_block_changer),
|
||||
@ -1062,6 +1361,18 @@ SchemaChangeWithSorting::~SchemaChangeWithSorting() {
|
||||
SAFE_DELETE(_row_block_allocator);
|
||||
}
|
||||
|
||||
VSchemaChangeWithSorting::VSchemaChangeWithSorting(const RowBlockChanger& row_block_changer,
|
||||
size_t memory_limitation)
|
||||
: _changer(row_block_changer),
|
||||
_memory_limitation(memory_limitation),
|
||||
_temp_delta_versions(Version::mock()) {
|
||||
_mem_tracker = MemTracker::create_tracker(
|
||||
config::memory_limitation_per_thread_for_schema_change_bytes,
|
||||
fmt::format("VSchemaChangeWithSorting:changer={}",
|
||||
std::to_string(int64(&row_block_changer))),
|
||||
StorageEngine::instance()->schema_change_mem_tracker(), MemTrackerLevel::TASK);
|
||||
}
|
||||
|
||||
Status SchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader,
|
||||
RowsetWriter* rowset_writer,
|
||||
TabletSharedPtr new_tablet,
|
||||
@ -1219,6 +1530,89 @@ Status SchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_read
|
||||
return res;
|
||||
}
|
||||
|
||||
Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader,
|
||||
RowsetWriter* rowset_writer,
|
||||
TabletSharedPtr new_tablet,
|
||||
TabletSharedPtr base_tablet) {
|
||||
// for internal sorting
|
||||
std::vector<std::unique_ptr<vectorized::Block>> blocks;
|
||||
|
||||
// for external sorting
|
||||
// src_rowsets to store the rowset generated by internal sorting
|
||||
std::vector<RowsetSharedPtr> src_rowsets;
|
||||
|
||||
Defer defer {[&]() {
|
||||
// remove the intermediate rowsets generated by internal sorting
|
||||
for (auto& row_set : src_rowsets) {
|
||||
StorageEngine::instance()->add_unused_rowset(row_set);
|
||||
}
|
||||
}};
|
||||
|
||||
RowsetSharedPtr rowset = rowset_reader->rowset();
|
||||
SegmentsOverlapPB segments_overlap = rowset->rowset_meta()->segments_overlap();
|
||||
_temp_delta_versions.first = _temp_delta_versions.second;
|
||||
|
||||
auto new_block =
|
||||
std::make_unique<vectorized::Block>(new_tablet->tablet_schema().create_block());
|
||||
auto ref_block =
|
||||
std::make_unique<vectorized::Block>(base_tablet->tablet_schema().create_block());
|
||||
|
||||
int origin_columns_size = ref_block->columns();
|
||||
|
||||
auto create_rowset = [&]() -> Status {
|
||||
if (blocks.empty()) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
RowsetSharedPtr rowset;
|
||||
RETURN_IF_ERROR(_internal_sorting(
|
||||
blocks, Version(_temp_delta_versions.second, _temp_delta_versions.second),
|
||||
new_tablet, BETA_ROWSET, segments_overlap, &rowset));
|
||||
src_rowsets.push_back(rowset);
|
||||
|
||||
for (auto& block : blocks) {
|
||||
_mem_tracker->release(block->allocated_bytes());
|
||||
}
|
||||
blocks.clear();
|
||||
|
||||
// increase temp version
|
||||
_temp_delta_versions.second++;
|
||||
return Status::OK();
|
||||
};
|
||||
|
||||
rowset_reader->next_block(ref_block.get());
|
||||
while (ref_block->rows()) {
|
||||
RETURN_IF_ERROR(_changer.change_block(ref_block.get(), new_block.get()));
|
||||
if (!_mem_tracker->try_consume(new_block->allocated_bytes())) {
|
||||
RETURN_IF_ERROR(create_rowset());
|
||||
|
||||
if (!_mem_tracker->try_consume(new_block->allocated_bytes())) {
|
||||
LOG(WARNING) << "Memory limitation is too small for Schema Change."
|
||||
<< "memory_limitation=" << _memory_limitation;
|
||||
return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
// move unique ptr
|
||||
blocks.push_back(
|
||||
std::make_unique<vectorized::Block>(new_tablet->tablet_schema().create_block()));
|
||||
swap(blocks.back(), new_block);
|
||||
|
||||
ref_block->clear_column_data(origin_columns_size);
|
||||
rowset_reader->next_block(ref_block.get());
|
||||
}
|
||||
|
||||
RETURN_IF_ERROR(create_rowset());
|
||||
|
||||
if (src_rowsets.empty()) {
|
||||
RETURN_IF_ERROR(rowset_writer->flush());
|
||||
} else {
|
||||
RETURN_IF_ERROR(_external_sorting(src_rowsets, rowset_writer, new_tablet));
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
bool SchemaChangeWithSorting::_internal_sorting(const std::vector<RowBlock*>& row_block_arr,
|
||||
const Version& version, TabletSharedPtr new_tablet,
|
||||
SegmentsOverlapPB segments_overlap,
|
||||
@ -1247,6 +1641,29 @@ bool SchemaChangeWithSorting::_internal_sorting(const std::vector<RowBlock*>& ro
|
||||
return true;
|
||||
}
|
||||
|
||||
Status VSchemaChangeWithSorting::_internal_sorting(
|
||||
const std::vector<std::unique_ptr<vectorized::Block>>& blocks, const Version& version,
|
||||
TabletSharedPtr new_tablet, RowsetTypePB new_rowset_type,
|
||||
SegmentsOverlapPB segments_overlap, RowsetSharedPtr* rowset) {
|
||||
uint64_t merged_rows = 0;
|
||||
MultiBlockMerger merger(new_tablet);
|
||||
|
||||
std::unique_ptr<RowsetWriter> rowset_writer;
|
||||
RETURN_IF_ERROR(
|
||||
new_tablet->create_rowset_writer(version, VISIBLE, segments_overlap, &rowset_writer));
|
||||
|
||||
Defer defer {[&]() {
|
||||
new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
|
||||
rowset_writer->rowset_id().to_string());
|
||||
}};
|
||||
|
||||
RETURN_IF_ERROR(merger.merge(blocks, rowset_writer.get(), &merged_rows));
|
||||
|
||||
_add_merged_rows(merged_rows);
|
||||
*rowset = rowset_writer->build();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
bool SchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_rowsets,
|
||||
RowsetWriter* rowset_writer,
|
||||
TabletSharedPtr new_tablet) {
|
||||
@ -1275,6 +1692,25 @@ bool SchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_row
|
||||
return true;
|
||||
}
|
||||
|
||||
Status VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_rowsets,
|
||||
RowsetWriter* rowset_writer,
|
||||
TabletSharedPtr new_tablet) {
|
||||
std::vector<RowsetReaderSharedPtr> rs_readers;
|
||||
for (auto& rowset : src_rowsets) {
|
||||
RowsetReaderSharedPtr rs_reader;
|
||||
RETURN_IF_ERROR(rowset->create_reader(&rs_reader));
|
||||
rs_readers.push_back(rs_reader);
|
||||
}
|
||||
|
||||
Merger::Statistics stats;
|
||||
RETURN_IF_ERROR(Merger::vmerge_rowsets(new_tablet, READER_ALTER_TABLE, rs_readers,
|
||||
rowset_writer, &stats));
|
||||
|
||||
_add_merged_rows(stats.merged_rows);
|
||||
_add_filtered_rows(stats.filtered_rows);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2& request) {
|
||||
LOG(INFO) << "begin to do request alter tablet: base_tablet_id=" << request.base_tablet_id
|
||||
<< ", new_tablet_id=" << request.new_tablet_id
|
||||
|
||||
@ -52,7 +52,12 @@ public:
|
||||
Status change_row_block(const RowBlock* ref_block, int32_t data_version,
|
||||
RowBlock* mutable_block, uint64_t* filtered_rows) const;
|
||||
|
||||
Status change_block(vectorized::Block* ref_block, vectorized::Block* new_block) const;
|
||||
|
||||
private:
|
||||
Status _check_cast_valid(vectorized::ColumnPtr ref_column,
|
||||
vectorized::ColumnPtr new_column) const;
|
||||
|
||||
// @brief column-mapping specification of new schema
|
||||
SchemaMapping _schema_mapping;
|
||||
|
||||
@ -180,6 +185,17 @@ private:
|
||||
DISALLOW_COPY_AND_ASSIGN(SchemaChangeDirectly);
|
||||
};
|
||||
|
||||
class VSchemaChangeDirectly : public SchemaChange {
|
||||
public:
|
||||
VSchemaChangeDirectly(const RowBlockChanger& row_block_changer) : _changer(row_block_changer) {}
|
||||
|
||||
private:
|
||||
Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
|
||||
TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
|
||||
|
||||
const RowBlockChanger& _changer;
|
||||
};
|
||||
|
||||
// @breif schema change with sorting
|
||||
class SchemaChangeWithSorting : public SchemaChange {
|
||||
public:
|
||||
@ -206,6 +222,29 @@ private:
|
||||
DISALLOW_COPY_AND_ASSIGN(SchemaChangeWithSorting);
|
||||
};
|
||||
|
||||
class VSchemaChangeWithSorting : public SchemaChange {
|
||||
public:
|
||||
VSchemaChangeWithSorting(const RowBlockChanger& row_block_changer, size_t memory_limitation);
|
||||
~VSchemaChangeWithSorting() override = default;
|
||||
|
||||
private:
|
||||
Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
|
||||
TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
|
||||
|
||||
Status _internal_sorting(const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
|
||||
const Version& temp_delta_versions, TabletSharedPtr new_tablet,
|
||||
RowsetTypePB new_rowset_type, SegmentsOverlapPB segments_overlap,
|
||||
RowsetSharedPtr* rowset);
|
||||
|
||||
Status _external_sorting(std::vector<RowsetSharedPtr>& src_rowsets, RowsetWriter* rowset_writer,
|
||||
TabletSharedPtr new_tablet);
|
||||
|
||||
const RowBlockChanger& _changer;
|
||||
size_t _memory_limitation;
|
||||
Version _temp_delta_versions;
|
||||
std::shared_ptr<MemTracker> _mem_tracker;
|
||||
};
|
||||
|
||||
class SchemaChangeHandler {
|
||||
public:
|
||||
static Status schema_version_convert(TabletSharedPtr base_tablet, TabletSharedPtr new_tablet,
|
||||
@ -218,12 +257,23 @@ public:
|
||||
static std::unique_ptr<SchemaChange> get_sc_procedure(const RowBlockChanger& rb_changer,
|
||||
bool sc_sorting, bool sc_directly) {
|
||||
if (sc_sorting) {
|
||||
return std::make_unique<SchemaChangeWithSorting>(
|
||||
rb_changer, config::memory_limitation_per_thread_for_schema_change_bytes);
|
||||
if (config::enable_vectorized_alter_table) {
|
||||
return std::make_unique<VSchemaChangeWithSorting>(
|
||||
rb_changer, config::memory_limitation_per_thread_for_schema_change_bytes);
|
||||
} else {
|
||||
return std::make_unique<SchemaChangeWithSorting>(
|
||||
rb_changer, config::memory_limitation_per_thread_for_schema_change_bytes);
|
||||
}
|
||||
}
|
||||
|
||||
if (sc_directly) {
|
||||
return std::make_unique<SchemaChangeDirectly>(rb_changer);
|
||||
if (config::enable_vectorized_alter_table) {
|
||||
return std::make_unique<VSchemaChangeDirectly>(rb_changer);
|
||||
} else {
|
||||
return std::make_unique<SchemaChangeDirectly>(rb_changer);
|
||||
}
|
||||
}
|
||||
|
||||
return std::make_unique<LinkedSchemaChange>(rb_changer);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user