[bugfix](schema_change) Fix the coredump caused by double write during schema change (#22557)
@@ -32,9 +32,11 @@
#include "common/exception.h"
#include "common/status.h"
#include "olap/tablet_schema.h"
#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "runtime/large_int_value.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/primitive_type.h"
#include "runtime/raw_value.h"
#include "runtime/types.h"
#include "util/hash_util.hpp"
@@ -128,30 +130,33 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
for (auto& col : pschema.partial_update_input_columns()) {
_partial_update_input_columns.insert(col);
}
std::map<std::string, SlotDescriptor*> slots_map;
std::unordered_map<std::pair<std::string, std::string>, SlotDescriptor*> slots_map;
_tuple_desc = _obj_pool.add(new TupleDescriptor(pschema.tuple_desc()));

for (auto& p_slot_desc : pschema.slot_descs()) {
auto slot_desc = _obj_pool.add(new SlotDescriptor(p_slot_desc));
_tuple_desc->add_slot(slot_desc);
slots_map.emplace(slot_desc->col_name(), slot_desc);
string data_type;
EnumToString(TPrimitiveType, to_thrift(slot_desc->col_type()), data_type);
slots_map.emplace(std::make_pair(to_lower(slot_desc->col_name()), std::move(data_type)),
slot_desc);
}

for (auto& p_index : pschema.indexes()) {
auto index = _obj_pool.add(new OlapTableIndexSchema());
index->index_id = p_index.id();
index->schema_hash = p_index.schema_hash();
for (auto& col : p_index.columns()) {
if (_is_partial_update && _partial_update_input_columns.count(col) == 0) {
continue;
}
auto it = slots_map.find(col);
if (it == std::end(slots_map)) {
return Status::InternalError("unknown index column, column={}", col);
}
index->slots.emplace_back(it->second);
}
for (auto& pcolumn_desc : p_index.columns_desc()) {
if (!_is_partial_update ||
_partial_update_input_columns.count(pcolumn_desc.name()) > 0) {
auto it = slots_map.find(
std::make_pair(to_lower(pcolumn_desc.name()), pcolumn_desc.type()));
if (it == std::end(slots_map)) {
return Status::InternalError("unknown index column, column={}, type={}",
pcolumn_desc.name(), pcolumn_desc.type());
}
index->slots.emplace_back(it->second);
}
TabletColumn* tc = _obj_pool.add(new TabletColumn());
tc->init_from_pb(pcolumn_desc);
index->columns.emplace_back(tc);
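The hunk above is the core of the fix: the slot lookup map is now keyed by the pair (lower-cased column name, column type) instead of the column name alone, so a column and the shadow column a schema change creates with the same name but a different type can no longer be confused. Below is a minimal, self-contained sketch of that keying idea; it is not the Doris code, and Slot, PairHash, SlotMap, and find_slot are illustrative stand-ins (the real code stores SlotDescriptor*, presumably gets pair hashing from util/hash_util.hpp on the protobuf path, and keys by PrimitiveType rather than a type string on the thrift path in the next hunk).

#include <cstddef>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>

// Illustrative pair hash; the real code takes its pair hashing support from elsewhere.
struct PairHash {
    std::size_t operator()(const std::pair<std::string, std::string>& p) const {
        return std::hash<std::string>()(p.first) ^ (std::hash<std::string>()(p.second) << 1);
    }
};

// Stand-in for SlotDescriptor.
struct Slot {
    std::string name;
    std::string type;
};

using SlotMap = std::unordered_map<std::pair<std::string, std::string>, Slot*, PairHash>;

// A lookup only succeeds when both the lower-cased name and the type match, so e.g.
// "lo_custkey INT" and a schema-change shadow column "lo_custkey DOUBLE" resolve to
// different slots instead of colliding on the name.
Slot* find_slot(const SlotMap& slots, const std::string& lower_name, const std::string& type) {
    auto it = slots.find(std::make_pair(lower_name, type));
    return it == slots.end() ? nullptr : it->second;
}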
@@ -183,41 +188,43 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
for (auto& tcolumn : tschema.partial_update_input_columns) {
_partial_update_input_columns.insert(tcolumn);
}
std::map<std::string, SlotDescriptor*> slots_map;
std::unordered_map<std::pair<std::string, PrimitiveType>, SlotDescriptor*> slots_map;
_tuple_desc = _obj_pool.add(new TupleDescriptor(tschema.tuple_desc));
for (auto& t_slot_desc : tschema.slot_descs) {
auto slot_desc = _obj_pool.add(new SlotDescriptor(t_slot_desc));
_tuple_desc->add_slot(slot_desc);
slots_map.emplace(to_lower(slot_desc->col_name()), slot_desc);
slots_map.emplace(std::make_pair(to_lower(slot_desc->col_name()), slot_desc->col_type()),
slot_desc);
}

for (auto& t_index : tschema.indexes) {
std::unordered_map<std::string, SlotDescriptor*> index_slots_map;
auto index = _obj_pool.add(new OlapTableIndexSchema());
index->index_id = t_index.id;
index->schema_hash = t_index.schema_hash;
for (auto& col : t_index.columns) {
if (_is_partial_update && _partial_update_input_columns.count(col) == 0) {
continue;
}
auto it = slots_map.find(to_lower(col));
if (it == std::end(slots_map)) {
return Status::InternalError("unknown index column, column={}", col);
}
index->slots.emplace_back(it->second);
}
if (t_index.__isset.columns_desc) {
for (auto& tcolumn_desc : t_index.columns_desc) {
TabletColumn* tc = _obj_pool.add(new TabletColumn());
tc->init_from_thrift(tcolumn_desc);
index->columns.emplace_back(tc);
for (auto& tcolumn_desc : t_index.columns_desc) {
auto it = slots_map.find(std::make_pair(to_lower(tcolumn_desc.column_name),
thrift_to_type(tcolumn_desc.column_type.type)));
if (!_is_partial_update ||
_partial_update_input_columns.count(tcolumn_desc.column_name) > 0) {
if (it == slots_map.end()) {
return Status::InternalError("unknown index column, column={}, type={}",
tcolumn_desc.column_name,
tcolumn_desc.column_type.type);
}
index_slots_map.emplace(to_lower(tcolumn_desc.column_name), it->second);
index->slots.emplace_back(it->second);
}
TabletColumn* tc = _obj_pool.add(new TabletColumn());
tc->init_from_thrift(tcolumn_desc);
index->columns.emplace_back(tc);
}
if (t_index.__isset.indexes_desc) {
for (auto& tindex_desc : t_index.indexes_desc) {
std::vector<int32_t> column_unique_ids(tindex_desc.columns.size());
for (size_t i = 0; i < tindex_desc.columns.size(); i++) {
auto it = slots_map.find(to_lower(tindex_desc.columns[i]));
if (it != std::end(slots_map)) {
auto it = index_slots_map.find(to_lower(tindex_desc.columns[i]));
if (it != index_slots_map.end()) {
column_unique_ids[i] = it->second->col_unique_id();
}
}
@@ -799,6 +799,9 @@ void TabletSchema::build_current_tablet_schema(int64_t index_id, int32_t version
_indexes.clear();
_field_name_to_index.clear();
_field_id_to_index.clear();
_delete_sign_idx = -1;
_sequence_col_idx = -1;
_version_col_idx = -1;

for (auto& column : index->columns) {
if (column->is_key()) {
@@ -143,6 +143,10 @@ private:
int32_t _unique_id = -1;
std::string _col_name;
std::string _col_name_lower_case;
// The field _type is converted from TPrimitiveType
// to a string by 'EnumToString(TPrimitiveType, tcolumn.column_type.type, data_type);' (reference: TabletMeta::init_column_from_tcolumn)
// and then to FieldType by 'TabletColumn::get_field_type_by_string' (reference: TabletColumn::init_from_pb).
// The _type in ColumnPB is also a string, converted from FieldType by 'get_string_by_field_type' (reference: TabletColumn::to_schema_pb).
FieldType _type;
bool _is_key = false;
FieldAggregationMethod _aggregation;
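The comment above describes a chain of conversions: TPrimitiveType to a type-name string, that string to FieldType, and FieldType back to a string when the column is written to ColumnPB. The toy sketch below uses an invented enum and helpers (not the real EnumToString, get_field_type_by_string, or get_string_by_field_type) only to illustrate why the names used by the two directions must agree for the round trip to be lossless.

#include <cassert>
#include <string>

// Invented for the example; stands in for FieldType.
enum class ToyFieldType { BIGINT, DOUBLE, UNKNOWN };

std::string toy_type_to_string(ToyFieldType t) {
    switch (t) {
    case ToyFieldType::BIGINT: return "BIGINT";
    case ToyFieldType::DOUBLE: return "DOUBLE";
    default: return "UNKNOWN";
    }
}

ToyFieldType toy_type_from_string(const std::string& s) {
    if (s == "BIGINT") return ToyFieldType::BIGINT;
    if (s == "DOUBLE") return ToyFieldType::DOUBLE;
    return ToyFieldType::UNKNOWN;
}

int main() {
    // Serialize as a string (the form ColumnPB stores) and parse it back.
    assert(toy_type_from_string(toy_type_to_string(ToyFieldType::DOUBLE)) == ToyFieldType::DOUBLE);
    return 0;
}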
@@ -31,6 +31,7 @@
#include <memory>

#include "common/object_pool.h"
#include "runtime/primitive_type.h"
#include "util/string_util.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/data_types/data_type_factory.hpp"
@@ -59,6 +60,7 @@ SlotDescriptor::SlotDescriptor(const TSlotDescriptor& tdesc)
_col_name(tdesc.colName),
_col_name_lower_case(to_lower(tdesc.colName)),
_col_unique_id(tdesc.col_unique_id),
_col_type(thrift_to_type(tdesc.primitive_type)),
_slot_idx(tdesc.slotIdx),
_field_idx(-1),
_is_materialized(tdesc.isMaterialized),
@@ -77,6 +79,7 @@ SlotDescriptor::SlotDescriptor(const PSlotDescriptor& pdesc)
_col_name(pdesc.col_name()),
_col_name_lower_case(to_lower(pdesc.col_name())),
_col_unique_id(pdesc.col_unique_id()),
_col_type(static_cast<PrimitiveType>(pdesc.col_type())),
_slot_idx(pdesc.slot_idx()),
_field_idx(-1),
_is_materialized(pdesc.is_materialized()),
@@ -99,6 +102,7 @@ void SlotDescriptor::to_protobuf(PSlotDescriptor* pslot) const {
pslot->set_col_unique_id(_col_unique_id);
pslot->set_is_key(_is_key);
pslot->set_is_auto_increment(_is_auto_increment);
pslot->set_col_type(_col_type);
}

vectorized::MutableColumnPtr SlotDescriptor::get_empty_mutable_column() const {
@@ -36,6 +36,7 @@
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/global_types.h"
#include "common/status.h"
#include "runtime/define_primitive_type.h"
#include "runtime/types.h"
#include "vec/data_types/data_type.h"
@@ -113,6 +114,7 @@ public:
bool is_auto_increment() const { return _is_auto_increment; }

const std::string& col_default_value() const { return _col_default_value; }
PrimitiveType col_type() const { return _col_type; }

private:
friend class DescriptorTbl;
@@ -132,6 +134,7 @@ private:
const std::string _col_name_lower_case;

const int32_t _col_unique_id;
const PrimitiveType _col_type;

// the idx of the slot in the tuple descriptor (0-based).
// this is provided by the FE
@@ -154,6 +154,7 @@ PrimitiveType thrift_to_type(TPrimitiveType::type ttype) {

case TPrimitiveType::VARIANT:
return TYPE_VARIANT;

default:
CHECK(false) << ", meet unknown type " << ttype;
return INVALID_TYPE;
@@ -259,6 +260,8 @@ TPrimitiveType::type to_thrift(PrimitiveType ptype) {
return TPrimitiveType::STRUCT;
case TYPE_LAMBDA_FUNCTION:
return TPrimitiveType::LAMBDA_FUNCTION;
case TYPE_AGG_STATE:
return TPrimitiveType::AGG_STATE;

default:
return TPrimitiveType::INVALID_TYPE;
@@ -365,6 +368,9 @@ std::string type_to_string(PrimitiveType t) {
case TYPE_LAMBDA_FUNCTION:
return "LAMBDA_FUNCTION TYPE";

case TYPE_VARIANT:
return "VARIANT";

default:
return "";
};
@@ -296,13 +296,14 @@ public class SlotDescriptor {
public TSlotDescriptor toThrift() {
// Non-nullable slots will have 0 for the byte offset and -1 for the bit mask
TSlotDescriptor tSlotDescriptor = new TSlotDescriptor(id.asInt(), parent.getId().asInt(), type.toThrift(), -1,
byteOffset, 0, getIsNullable() ? 0 : -1, ((column != null) ? column.getName() : ""), slotIdx,
byteOffset, 0, getIsNullable() ? 0 : -1, ((column != null) ? column.getNonShadowName() : ""), slotIdx,
isMaterialized);
tSlotDescriptor.setNeedMaterialize(needMaterialize);
tSlotDescriptor.setIsAutoIncrement(isAutoInc);
if (column != null) {
LOG.debug("column name:{}, column unique id:{}", column.getName(), column.getUniqueId());
LOG.debug("column name:{}, column unique id:{}", column.getNonShadowName(), column.getUniqueId());
tSlotDescriptor.setColUniqueId(column.getUniqueId());
tSlotDescriptor.setPrimitiveType(column.getDataType().toThrift());
tSlotDescriptor.setIsKey(column.isKey());
tSlotDescriptor.setColDefaultValue(column.getDefaultValue());
}
@@ -491,7 +491,7 @@ public class Column implements Writable, GsonPostProcessable {

public TColumn toThrift() {
TColumn tColumn = new TColumn();
tColumn.setColumnName(this.name);
tColumn.setColumnName(removeNamePrefix(this.name));

TColumnType tColumnType = new TColumnType();
tColumnType.setType(this.getDataType().toThrift());
@@ -37,6 +37,7 @@ message PSlotDescriptor {
optional int32 col_unique_id = 11;
optional bool is_key = 12;
optional bool is_auto_increment = 13;
optional int32 col_type = 14 [default = 0];
};

message PTupleDescriptor {
@@ -63,6 +63,7 @@ struct TSlotDescriptor {
// subcolumn path info list for semi structure column(variant)
15: optional list<string> column_paths
16: optional string col_default_value
17: optional Types.TPrimitiveType primitive_type = Types.TPrimitiveType.INVALID_TYPE
}

struct TTupleDescriptor {
@@ -0,0 +1,24 @@
CREATE TABLE IF NOT EXISTS `lineorder` (
    `lo_orderkey` bigint(20) NOT NULL COMMENT "",
    `lo_linenumber` bigint(20) NOT NULL COMMENT "",
    `lo_custkey` int(11) NOT NULL COMMENT "",
    `lo_partkey` int(11) NOT NULL COMMENT "",
    `lo_suppkey` int(11) NOT NULL COMMENT "",
    `lo_orderdate` int(11) NOT NULL COMMENT "",
    `lo_orderpriority` varchar(16) NOT NULL COMMENT "",
    `lo_shippriority` int(11) NOT NULL COMMENT "",
    `lo_quantity` bigint(20) NOT NULL COMMENT "",
    `lo_extendedprice` bigint(20) NOT NULL COMMENT "",
    `lo_ordtotalprice` bigint(20) NOT NULL COMMENT "",
    `lo_discount` bigint(20) NOT NULL COMMENT "",
    `lo_revenue` bigint(20) NOT NULL COMMENT "",
    `lo_supplycost` bigint(20) NOT NULL COMMENT "",
    `lo_tax` bigint(20) NOT NULL COMMENT "",
    `lo_commitdate` bigint(20) NOT NULL COMMENT "",
    `lo_shipmode` varchar(11) NOT NULL COMMENT ""
)
DUPLICATE KEY (`lo_orderkey`, `lo_linenumber`)
DISTRIBUTED BY HASH(`lo_orderkey`) BUCKETS 1
PROPERTIES (
    "replication_num" = "1"
);
@@ -0,0 +1 @@
drop table IF EXISTS lineorder;
@@ -0,0 +1,99 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Most of the cases are copied from https://github.com/trinodb/trino/tree/master
// /testing/trino-product-tests/src/main/resources/sql-tests/testcases
// and modified by Doris.

// Note: To filter out tables from sql files, use the following one-liner command
// sed -nr 's/.*tables: (.*)$/\1/gp' /path/to/*.sql | sed -nr 's/,/\n/gp' | sort | uniq
suite("double_write_schema_change") {

    // ssb_sf1_p1 is written to test that the unique key table merges correctly.
    // It creates a unique key table and sets the bucket num to 1 in order to make sure that
    // many rowsets will be created during loading and then the merge process will be triggered.

    def tableName = "lineorder"
    def columns = """lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,
            lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,
            lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy"""

    sql new File("""${context.file.parent}/ddl/${tableName}_delete.sql""").text
    sql new File("""${context.file.parent}/ddl/${tableName}_create.sql""").text

    streamLoad {
        // a default db 'regression_test' is specified in
        // ${DORIS_HOME}/conf/regression-conf.groovy
        table tableName

        // default label is UUID:
        // set 'label' UUID.randomUUID().toString()

        // the default column_separator is specified in the Doris FE config, usually '\t';
        // this line changes it to '|'
        set 'column_separator', '|'
        set 'compress_type', 'GZ'
        set 'columns', columns

        // relates to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
        // also, you can stream load an http stream, e.g. http://xxx/some.csv
        file """${getS3Url()}/regression/ssb/sf1/${tableName}.tbl.gz"""

        time 10000 // limit inflight 10s

        // the stream load action will check the result, including Success status and NumberTotalRows == NumberLoadedRows

        // if a check callback is declared, the default check condition will be ignored,
        // so you must check all conditions yourself
        check { result, exception, startTime, endTime ->
            if (exception != null) {
                throw exception
            }
            log.info("Stream load result: ${result}".toString())
            def json = parseJson(result)
            assertEquals("success", json.Status.toLowerCase())
            assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
            assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
        }
    }

    def getJobState = { indexName ->
        def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${indexName}' ORDER BY createtime DESC LIMIT 1 """
        return jobStateResult[0][9]
    }

    def insert_sql = """ insert into ${tableName} values(100000000, 1, 1, 1, 1, 1, "1", 1, 1, 1, 1, 1, 1, 1, 1, 1, "1") """

    sql """ ALTER TABLE ${tableName} modify COLUMN lo_custkey double"""
    int max_try_time = 3000
    while (max_try_time--) {
        String result = getJobState(tableName)
        if (result == "FINISHED") {
            sleep(3000)
            break
        } else {
            if (result == "RUNNING") {
                sql insert_sql
            }
            sleep(100)
            if (max_try_time < 1) {
                assertEquals(1, 2)
            }
        }
    }
}