[branch-2.1] Picks "[Fix](partial update) Fix __DORIS_SEQUENCE_COL__ is not set for newly inserted rows in partial update #40272" (#40964)

picks https://github.com/apache/doris/pull/40272
This commit is contained in:
bobhan1
2024-09-26 22:54:27 +08:00
committed by GitHub
parent e6ce4a2c26
commit eb13cd4154
15 changed files with 412 additions and 16 deletions

View File

@ -129,6 +129,9 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
_auto_increment_column_unique_id = pschema.auto_increment_column_unique_id();
}
_timestamp_ms = pschema.timestamp_ms();
if (pschema.has_nano_seconds()) {
_nano_seconds = pschema.nano_seconds();
}
_timezone = pschema.timezone();
for (const auto& col : pschema.partial_update_input_columns()) {
@ -273,6 +276,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id);
pschema->set_timestamp_ms(_timestamp_ms);
pschema->set_timezone(_timezone);
pschema->set_nano_seconds(_nano_seconds);
for (auto col : _partial_update_input_columns) {
*pschema->add_partial_update_input_columns() = col;
}

View File

@ -96,6 +96,8 @@ public:
int32_t auto_increment_column_unique_id() const { return _auto_increment_column_unique_id; }
// Stores the load timestamp in milliseconds. The tablet writers set it from
// RuntimeState::timestamp_ms() right after init().
void set_timestamp_ms(int64_t timestamp_ms) { _timestamp_ms = timestamp_ms; }
// Returns the stored timestamp in milliseconds (0 if it was never set).
int64_t timestamp_ms() const { return _timestamp_ms; }
// Stores the nanosecond value; the tablet writers populate it from
// RuntimeState::nano_seconds() alongside set_timestamp_ms().
// NOTE(review): presumably the sub-second part of the same instant as
// timestamp_ms -- confirm against RuntimeState.
void set_nano_seconds(int32_t nano_seconds) { _nano_seconds = nano_seconds; }
// Returns the stored nanosecond value (0 if it was never set). It is handed to
// PartialUpdateInfo::init(), which uses it when filling CURRENT_TIMESTAMP(p)
// defaults for DATETIMEV2 columns missing from a partial update.
int32_t nano_seconds() const { return _nano_seconds; }
// Stores the time zone name. The argument is taken by value and copied into
// the member.
void set_timezone(std::string timezone) { _timezone = timezone; }
// Returns a copy of the stored time zone name.
std::string timezone() const { return _timezone; }
bool is_strict_mode() const { return _is_strict_mode; }
@ -116,6 +118,7 @@ private:
std::string _auto_increment_column;
int32_t _auto_increment_column_unique_id;
int64_t _timestamp_ms = 0;
int32_t _nano_seconds {0};
std::string _timezone;
};

View File

@ -239,7 +239,8 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
_partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(),
table_schema_param->timestamp_ms(), table_schema_param->timezone(),
table_schema_param->timestamp_ms(),
table_schema_param->nano_seconds(), table_schema_param->timezone(),
table_schema_param->auto_increment_coulumn());
}

View File

@ -29,12 +29,14 @@ namespace doris {
void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_update,
const std::set<string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, const std::string& timezone,
const std::string& auto_increment_column, int64_t cur_max_version) {
int64_t timestamp_ms, int32_t nano_seconds,
const std::string& timezone, const std::string& auto_increment_column,
int64_t cur_max_version) {
is_partial_update = partial_update;
partial_update_input_columns = partial_update_cols;
max_version_in_flush_phase = cur_max_version;
this->timestamp_ms = timestamp_ms;
this->nano_seconds = nano_seconds;
this->timezone = timezone;
missing_cids.clear();
update_cids.clear();
@ -75,6 +77,7 @@ void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const
can_insert_new_rows_in_partial_update);
partial_update_info_pb->set_is_strict_mode(is_strict_mode);
partial_update_info_pb->set_timestamp_ms(timestamp_ms);
partial_update_info_pb->set_nano_seconds(nano_seconds);
partial_update_info_pb->set_timezone(timezone);
partial_update_info_pb->set_is_input_columns_contains_auto_inc_column(
is_input_columns_contains_auto_inc_column);
@ -111,6 +114,9 @@ void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) {
partial_update_info_pb->is_input_columns_contains_auto_inc_column();
is_schema_contains_auto_inc_column =
partial_update_info_pb->is_schema_contains_auto_inc_column();
if (partial_update_info_pb->has_nano_seconds()) {
nano_seconds = partial_update_info_pb->nano_seconds();
}
default_values.clear();
for (const auto& value : partial_update_info_pb->default_values()) {
default_values.push_back(value);
@ -134,9 +140,18 @@ void PartialUpdateInfo::_generate_default_values_for_missing_cids(
to_lower(tablet_schema.column(cur_cid).default_value())
.find(to_lower("CURRENT_TIMESTAMP")) !=
std::string::npos)) {
DateV2Value<DateTimeV2ValueType> dtv;
dtv.from_unixtime(timestamp_ms / 1000, timezone);
default_value = dtv.debug_string();
auto pos = to_lower(tablet_schema.column(cur_cid).default_value()).find('(');
if (pos == std::string::npos) {
DateV2Value<DateTimeV2ValueType> dtv;
dtv.from_unixtime(timestamp_ms / 1000, timezone);
default_value = dtv.debug_string();
} else {
int precision = std::stoi(
tablet_schema.column(cur_cid).default_value().substr(pos + 1));
DateV2Value<DateTimeV2ValueType> dtv;
dtv.from_unixtime(timestamp_ms / 1000, nano_seconds, timezone, precision);
default_value = dtv.debug_string();
}
} else if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
FieldType::OLAP_FIELD_TYPE_DATEV2 &&
to_lower(tablet_schema.column(cur_cid).default_value())

View File

@ -28,7 +28,7 @@ class PartialUpdateInfoPB;
struct PartialUpdateInfo {
void init(const TabletSchema& tablet_schema, bool partial_update,
const std::set<std::string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, const std::string& timezone,
int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone,
const std::string& auto_increment_column, int64_t cur_max_version = -1);
void to_pb(PartialUpdateInfoPB* partial_update_info) const;
void from_pb(PartialUpdateInfoPB* partial_update_info);
@ -48,6 +48,7 @@ public:
bool can_insert_new_rows_in_partial_update {true};
bool is_strict_mode {false};
int64_t timestamp_ms {0};
int32_t nano_seconds {0};
std::string timezone;
bool is_input_columns_contains_auto_inc_column = false;
bool is_schema_contains_auto_inc_column = false;

View File

@ -410,12 +410,12 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id,
}
// set partial update columns info
_partial_update_info = std::make_shared<PartialUpdateInfo>();
_partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(),
table_schema_param->timestamp_ms(), table_schema_param->timezone(),
table_schema_param->auto_increment_coulumn(),
_max_version_in_flush_phase);
_partial_update_info->init(
*_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(),
table_schema_param->nano_seconds(), table_schema_param->timezone(),
table_schema_param->auto_increment_coulumn(), _max_version_in_flush_phase);
}
} // namespace doris

View File

@ -1144,6 +1144,7 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) {
_schema.reset(new OlapTableSchemaParam());
RETURN_IF_ERROR(_schema->init(table_sink.schema));
_schema->set_timestamp_ms(state->timestamp_ms());
_schema->set_nano_seconds(state->nano_seconds());
_schema->set_timezone(state->timezone());
_location = _pool->add(new OlapTableLocationParam(table_sink.location));
_nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info));

View File

@ -147,6 +147,7 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
_schema.reset(new OlapTableSchemaParam());
RETURN_IF_ERROR(_schema->init(table_sink.schema));
_schema->set_timestamp_ms(state->timestamp_ms());
_schema->set_nano_seconds(state->nano_seconds());
_schema->set_timezone(state->timezone());
_location = _pool->add(new OlapTableLocationParam(table_sink.location));
_nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info));

View File

@ -1024,4 +1024,10 @@ public class Column implements Writable, GsonPostProcessable {
return getName().startsWith(CreateMaterializedViewStmt.MATERIALIZED_VIEW_NAME_PREFIX)
|| getName().startsWith(CreateMaterializedViewStmt.MATERIALIZED_VIEW_AGGREGATE_NAME_PREFIX);
}
/**
 * Copies the default-value settings of {@code refColumn} onto this column:
 * {@code defaultValue}, {@code defaultValueExprDef} and {@code realDefaultValue}.
 * No other attribute of this column is touched.
 *
 * <p>OlapTable uses this to give the hidden sequence column the same default as
 * the column it is mapped to.
 *
 * @param refColumn column to copy the default-value settings from; it is
 *                  dereferenced without a null check, so callers must pass a
 *                  non-null column
 */
public void setDefaultValueInfo(Column refColumn) {
this.defaultValue = refColumn.defaultValue;
this.defaultValueExprDef = refColumn.defaultValueExprDef;
this.realDefaultValue = refColumn.realDefaultValue;
}
}

View File

@ -1207,7 +1207,7 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
getOrCreatTableProperty().setSequenceMapCol(colName);
}
public void setSequenceInfo(Type type) {
public void setSequenceInfo(Type type, Column refColumn) {
this.hasSequenceCol = true;
this.sequenceType = type;
@ -1221,6 +1221,9 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
// unique key table
sequenceCol = ColumnDef.newSequenceColumnDef(type, AggregateType.REPLACE).toColumn();
}
if (refColumn != null) {
sequenceCol.setDefaultValueInfo(refColumn);
}
// add sequence column at last
fullSchema.add(sequenceCol);
nameToColumn.put(Column.SEQUENCE_COL, sequenceCol);
@ -1717,6 +1720,18 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
defaultDistributionInfo.markAutoBucket();
}
if (isUniqKeyMergeOnWrite() && getSequenceMapCol() != null) {
// set the hidden sequence column's default value the same with
// the sequence map column's for partial update
String seqMapColName = getSequenceMapCol();
Column seqMapCol = getBaseSchema().stream().filter(col -> col.getName().equalsIgnoreCase(seqMapColName))
.findFirst().orElse(null);
Column hiddenSeqCol = getSequenceCol();
if (seqMapCol != null && hiddenSeqCol != null) {
hiddenSeqCol.setDefaultValueInfo(seqMapCol);
}
}
// temp partitions
tempPartitions = TempPartitions.read(in);
RangePartitionInfo tempRangeInfo = tempPartitions.getPartitionInfo();

View File

@ -2759,7 +2759,7 @@ public class InternalCatalog implements CatalogIf<Database> {
throw new DdlException("Sequence type only support integer types and date types");
}
olapTable.setSequenceMapCol(col.getName());
olapTable.setSequenceInfo(col.getType());
olapTable.setSequenceInfo(col.getType(), col);
}
} catch (Exception e) {
throw new DdlException(e.getMessage());
@ -2773,7 +2773,7 @@ public class InternalCatalog implements CatalogIf<Database> {
throw new DdlException("The sequence_col and sequence_type cannot be set at the same time");
}
if (sequenceColType != null) {
olapTable.setSequenceInfo(sequenceColType);
olapTable.setSequenceInfo(sequenceColType, null);
}
} catch (Exception e) {
throw new DdlException(e.getMessage());

View File

@ -73,5 +73,6 @@ message POlapTableSchemaParam {
optional int64 timestamp_ms = 11 [default = 0];
optional string timezone = 12;
optional int32 auto_increment_column_unique_id = 13 [default = -1];
optional int32 nano_seconds = 14 [default = 0];
};

View File

@ -405,4 +405,5 @@ message PartialUpdateInfoPB {
optional bool is_schema_contains_auto_inc_column = 10 [default = false];
repeated string default_values = 11;
optional int64 max_version_in_flush_phase = 12 [default = -1];
optional int32 nano_seconds = 13 [default = 0];
}

View File

@ -0,0 +1,189 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql1 --
1 1 \N
2 2 \N
3 3 \N
4 4 \N
-- !sql1 --
1 1 20
2 2 20
3 3 \N
4 4 \N
-- !sql1 --
1 1 20
2 2 20
3 3 \N
4 4 \N
-- !sql1 --
3 3 2099-09-10T12:00:00.977174 \N 2099-09-10T12:00:00.977174
4 4 2099-09-10T12:00:00.977174 \N 2099-09-10T12:00:00.977174
-- !sql2 --
1 1
2 2
3 3
4 4
-- !sql3 --
1 1 999 999
2 2 999 999
3 3 999 999
4 4 999 999
-- !sql3 --
1 99 8888 8888
2 99 8888 8888
3 3 999 999
4 4 999 999
5 99 8888 8888
-- !sql4 --
1 1 \N \N
2 2 \N \N
3 3 \N \N
4 4 \N \N
-- !sql1 --
1 1 \N
2 2 \N
3 3 \N
4 4 \N
-- !sql1 --
1 1 20
2 2 20
3 3 \N
4 4 \N
-- !sql1 --
1 1 20
2 2 20
3 3 \N
4 4 \N
-- !sql1 --
3 3 2099-09-10T12:00:00.977174 \N 2099-09-10T12:00:00.977174
4 4 2099-09-10T12:00:00.977174 \N 2099-09-10T12:00:00.977174
-- !sql2 --
1 1
2 2
3 3
4 4
-- !sql3 --
1 1 999 999
2 2 999 999
3 3 999 999
4 4 999 999
-- !sql3 --
1 99 8888 8888
2 99 8888 8888
3 3 999 999
4 4 999 999
5 99 8888 8888
-- !sql4 --
1 1 \N \N
2 2 \N \N
3 3 \N \N
4 4 \N \N
-- !sql1 --
1 1 \N
2 2 \N
3 3 \N
4 4 \N
-- !sql1 --
1 1 20
2 2 20
3 3 \N
4 4 \N
-- !sql1 --
1 1 20
2 2 20
3 3 \N
4 4 \N
-- !sql1 --
3 3 2099-09-10T12:00:00.977174 \N 2099-09-10T12:00:00.977174
4 4 2099-09-10T12:00:00.977174 \N 2099-09-10T12:00:00.977174
-- !sql2 --
1 1
2 2
3 3
4 4
-- !sql3 --
1 1 999 999
2 2 999 999
3 3 999 999
4 4 999 999
-- !sql3 --
1 99 8888 8888
2 99 8888 8888
3 3 999 999
4 4 999 999
5 99 8888 8888
-- !sql4 --
1 1 \N \N
2 2 \N \N
3 3 \N \N
4 4 \N \N
-- !sql1 --
1 1 \N
2 2 \N
3 3 \N
4 4 \N
-- !sql1 --
1 1 20
2 2 20
3 3 \N
4 4 \N
-- !sql1 --
1 1 20
2 2 20
3 3 \N
4 4 \N
-- !sql1 --
3 3 2099-09-10T12:00:00.977174 \N 2099-09-10T12:00:00.977174
4 4 2099-09-10T12:00:00.977174 \N 2099-09-10T12:00:00.977174
-- !sql2 --
1 1
2 2
3 3
4 4
-- !sql3 --
1 1 999 999
2 2 999 999
3 3 999 999
4 4 999 999
-- !sql3 --
1 99 8888 8888
2 99 8888 8888
3 3 999 999
4 4 999 999
5 99 8888 8888
-- !sql4 --
1 1 \N \N
2 2 \N \N
3 3 \N \N
4 4 \N \N

View File

@ -0,0 +1,158 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Regression suite for partial-update loads on merge-on-write unique-key tables
// whose sequence column is mapped to a regular column ("function_column.sequence_col").
// Every insert below omits the mapped column c2 unless stated otherwise, and the
// queries compare c2 with the hidden __DORIS_SEQUENCE_COL__ for the affected rows.
suite("test_partial_update_seq_map_col", "p0") {
// Run the whole scenario for each planner (Nereids / legacy) ...
for (def use_nereids : [true, false]) {
// ... and for tables created with and without the row store.
for (def use_row_store : [false, true]) {
logger.info("current params: use_nereids: ${use_nereids}, use_row_store: ${use_row_store}")
if (use_nereids) {
sql """ set enable_nereids_planner=true; """
sql """ set enable_fallback_to_original_planner=false; """
} else {
sql """ set enable_nereids_planner = false; """
}
// Non-strict mode plus partial update: the inserts below only list some columns.
sql "set enable_insert_strict=false;"
sql "set enable_unique_key_partial_update=true;"
sql "sync;"
// Case 1: sequence map column is datetime(6) with default current_timestamp(6).
def tableName = "test_partial_update_seq_map_col1"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """ CREATE TABLE IF NOT EXISTS ${tableName} (
`k` BIGINT NOT NULL,
`c1` int,
`c2` datetime(6) null default current_timestamp(6),
c3 int,
c4 int,
c5 int,
c6 int
) UNIQUE KEY(`k`)
DISTRIBUTED BY HASH(`k`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"enable_unique_key_merge_on_write" = "true",
"function_column.sequence_col" = "c2",
"store_row_column" = "${use_row_store}"); """
sql "insert into ${tableName}(k,c1) values(1,1);"
sql "insert into ${tableName}(k,c1) values(2,2);"
sql "insert into ${tableName}(k,c1) values(3,3);"
sql "insert into ${tableName}(k,c1) values(4,4);"
// The filter only returns rows whose c2 equals the hidden sequence column;
// the expected output lists all four rows.
order_qt_sql1 "select k,c1,c3 from ${tableName} where c2=__DORIS_SEQUENCE_COL__;"
// update column which is not sequence map col
if (use_nereids) {
// Under Nereids, assert the UPDATE plan reports IS_PARTIAL_UPDATE: false.
explain {
sql "update ${tableName} set c3=20 where c1<=2;"
contains "IS_PARTIAL_UPDATE: false"
}
}
sql "update ${tableName} set c3=20 where c1<=2;"
order_qt_sql1 "select k,c1,c3 from ${tableName} where c2=__DORIS_SEQUENCE_COL__;"
// update sequence map col
if (use_nereids) {
explain {
sql "update ${tableName} set c2='2099-09-10 12:00:00.977174' where k>2;"
contains "IS_PARTIAL_UPDATE: false"
}
}
sql "update ${tableName} set c2='2099-09-10 12:00:00.977174' where k>2;"
order_qt_sql1 "select k,c1,c3 from ${tableName} where c2=__DORIS_SEQUENCE_COL__;"
// Show c2 next to the hidden sequence column for the rows that were just updated.
order_qt_sql1 "select k,c1,c2,c3,__DORIS_SEQUENCE_COL__ from ${tableName} where c1>2;"
// Case 2: sequence map column is datetime NOT NULL with default current_timestamp
// (no explicit precision).
tableName = "test_partial_update_seq_map_col2"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """ CREATE TABLE IF NOT EXISTS ${tableName} (
`k` BIGINT NOT NULL,
`c1` int,
`c2` datetime not null default current_timestamp,
) UNIQUE KEY(`k`)
DISTRIBUTED BY HASH(`k`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"enable_unique_key_merge_on_write" = "true",
"function_column.sequence_col" = "c2",
"store_row_column" = "${use_row_store}"); """
sql "insert into ${tableName}(k,c1) values(1,1);"
sql "insert into ${tableName}(k,c1) values(2,2);"
sql "insert into ${tableName}(k,c1) values(3,3);"
sql "insert into ${tableName}(k,c1) values(4,4);"
order_qt_sql2 "select k,c1 from ${tableName} where c2=__DORIS_SEQUENCE_COL__;"
// Case 3: sequence map column is int NOT NULL with a constant default ("999").
tableName = "test_partial_update_seq_map_col3"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """ CREATE TABLE IF NOT EXISTS ${tableName} (
`k` BIGINT NOT NULL,
`c1` int,
`c2` int not null default "999",
) UNIQUE KEY(`k`)
DISTRIBUTED BY HASH(`k`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"enable_unique_key_merge_on_write" = "true",
"function_column.sequence_col" = "c2",
"store_row_column" = "${use_row_store}"); """
sql "insert into ${tableName}(k,c1) values(1,1);"
sql "insert into ${tableName}(k,c1) values(2,2);"
sql "insert into ${tableName}(k,c1) values(3,3);"
sql "insert into ${tableName}(k,c1) values(4,4);"
order_qt_sql3 "select k,c1,c2,__DORIS_SEQUENCE_COL__ from ${tableName};"
// Write c2 explicitly. Per the expected output, keys 1, 2 and 5 end up with 8888,
// while key 4 (written with 77, lower than the existing 999) keeps its old row.
sql "insert into ${tableName}(k,c1,c2) values(1,99,8888);"
sql "insert into ${tableName}(k,c1,c2) values(2,99,8888);"
sql "insert into ${tableName}(k,c1,c2) values(4,99,77);"
sql "insert into ${tableName}(k,c1,c2) values(5,99,8888);"
order_qt_sql3 "select k,c1,c2,__DORIS_SEQUENCE_COL__ from ${tableName}"
// Case 4: sequence map column is a nullable int with no default; the expected
// output shows NULL for both c2 and the hidden sequence column.
tableName = "test_partial_update_seq_map_col4"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """ CREATE TABLE IF NOT EXISTS ${tableName} (
`k` BIGINT NOT NULL,
`c1` int,
`c2` int null,
) UNIQUE KEY(`k`)
DISTRIBUTED BY HASH(`k`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"enable_unique_key_merge_on_write" = "true",
"function_column.sequence_col" = "c2",
"store_row_column" = "${use_row_store}"); """
sql "insert into ${tableName}(k,c1) values(1,1);"
sql "insert into ${tableName}(k,c1) values(2,2);"
sql "insert into ${tableName}(k,c1) values(3,3);"
sql "insert into ${tableName}(k,c1) values(4,4);"
order_qt_sql4 "select k,c1,c2,__DORIS_SEQUENCE_COL__ from ${tableName};"
// Case 5: sequence map column is NOT NULL and has no default, so an insert that
// omits it is expected to fail with the error asserted below.
tableName = "test_partial_update_seq_map_col5"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """ CREATE TABLE IF NOT EXISTS ${tableName} (
`k` BIGINT NOT NULL,
`c1` int,
`c2` int not null
) UNIQUE KEY(`k`)
DISTRIBUTED BY HASH(`k`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"enable_unique_key_merge_on_write" = "true",
"function_column.sequence_col" = "c2",
"store_row_column" = "${use_row_store}"); """
test {
sql "insert into ${tableName}(k,c1) values(1,1);"
exception "the unmentioned column `c2` should have default value or be nullable for newly inserted rows in non-strict mode partial update"
}
}
}
}