[branch-2.1](cherry-pick) Pick some partial-update PR from master (#33639)
* [Fix](partial-update) Fix partial update fail when the datetime default value is 'current_time' (#32926) * Problem: When importing data that includes datetime with a default value of current time for partial column updates, the import fails. Reason: Partial column updates do not handle the logic for datetime default values. Solution: During partial column updates, when the default value is set to current time, read the current time from the runtime state and write it into the data. * [Enhancement](partial update)Add timezone case for partial update timestamp #33177 * [fix](partial update) Support partial update when the date default value is 'current_date'. This PR is a extension of PR #32926. (#33394)
This commit is contained in:
@ -122,6 +122,8 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
|
||||
if (_is_partial_update) {
|
||||
_auto_increment_column = pschema.auto_increment_column();
|
||||
}
|
||||
_timestamp_ms = pschema.timestamp_ms();
|
||||
_timezone = pschema.timezone();
|
||||
|
||||
for (const auto& col : pschema.partial_update_input_columns()) {
|
||||
_partial_update_input_columns.insert(col);
|
||||
@ -256,6 +258,8 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
|
||||
pschema->set_partial_update(_is_partial_update);
|
||||
pschema->set_is_strict_mode(_is_strict_mode);
|
||||
pschema->set_auto_increment_column(_auto_increment_column);
|
||||
pschema->set_timestamp_ms(_timestamp_ms);
|
||||
pschema->set_timezone(_timezone);
|
||||
for (auto col : _partial_update_input_columns) {
|
||||
*pschema->add_partial_update_input_columns() = col;
|
||||
}
|
||||
|
||||
@ -93,6 +93,10 @@ public:
|
||||
return _partial_update_input_columns;
|
||||
}
|
||||
std::string auto_increment_coulumn() const { return _auto_increment_column; }
|
||||
void set_timestamp_ms(int64_t timestamp_ms) { _timestamp_ms = timestamp_ms; }
|
||||
int64_t timestamp_ms() const { return _timestamp_ms; }
|
||||
void set_timezone(std::string timezone) { _timezone = timezone; }
|
||||
std::string timezone() const { return _timezone; }
|
||||
bool is_strict_mode() const { return _is_strict_mode; }
|
||||
std::string debug_string() const;
|
||||
|
||||
@ -109,6 +113,8 @@ private:
|
||||
std::set<std::string> _partial_update_input_columns;
|
||||
bool _is_strict_mode = false;
|
||||
std::string _auto_increment_column;
|
||||
int64_t _timestamp_ms = 0;
|
||||
std::string _timezone;
|
||||
};
|
||||
|
||||
using OlapTableIndexTablets = TOlapTableIndexTablets;
|
||||
|
||||
@ -242,7 +242,8 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
|
||||
_partial_update_info = std::make_shared<PartialUpdateInfo>();
|
||||
_partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
|
||||
table_schema_param->partial_update_input_columns(),
|
||||
table_schema_param->is_strict_mode());
|
||||
table_schema_param->is_strict_mode(),
|
||||
table_schema_param->timestamp_ms(), table_schema_param->timezone());
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -23,9 +23,12 @@ namespace doris {
|
||||
|
||||
struct PartialUpdateInfo {
|
||||
void init(const TabletSchema& tablet_schema, bool partial_update,
|
||||
const std::set<string>& partial_update_cols, bool is_strict_mode) {
|
||||
const std::set<string>& partial_update_cols, bool is_strict_mode,
|
||||
int64_t timestamp_ms, const std::string& timezone) {
|
||||
is_partial_update = partial_update;
|
||||
partial_update_input_columns = partial_update_cols;
|
||||
this->timestamp_ms = timestamp_ms;
|
||||
this->timezone = timezone;
|
||||
missing_cids.clear();
|
||||
update_cids.clear();
|
||||
for (auto i = 0; i < tablet_schema.num_columns(); ++i) {
|
||||
@ -51,5 +54,7 @@ struct PartialUpdateInfo {
|
||||
// to generate a new row, only available in non-strict mode
|
||||
bool can_insert_new_rows_in_partial_update {true};
|
||||
bool is_strict_mode {false};
|
||||
int64_t timestamp_ms {0};
|
||||
std::string timezone;
|
||||
};
|
||||
} // namespace doris
|
||||
|
||||
@ -57,6 +57,7 @@
|
||||
#include "util/faststring.h"
|
||||
#include "util/key_util.h"
|
||||
#include "vec/columns/column_nullable.h"
|
||||
#include "vec/columns/columns_number.h"
|
||||
#include "vec/common/schema_util.h"
|
||||
#include "vec/core/block.h"
|
||||
#include "vec/core/column_with_type_and_name.h"
|
||||
@ -64,6 +65,7 @@
|
||||
#include "vec/io/reader_buffer.h"
|
||||
#include "vec/jsonb/serialize.h"
|
||||
#include "vec/olap/olap_data_convertor.h"
|
||||
#include "vec/runtime/vdatetime_value.h"
|
||||
|
||||
namespace doris {
|
||||
namespace segment_v2 {
|
||||
@ -695,7 +697,29 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
|
||||
for (auto i = 0; i < cids_missing.size(); ++i) {
|
||||
const auto& column = _tablet_schema->column(cids_missing[i]);
|
||||
if (column.has_default_value()) {
|
||||
auto default_value = _tablet_schema->column(cids_missing[i]).default_value();
|
||||
std::string default_value;
|
||||
if (UNLIKELY(_tablet_schema->column(cids_missing[i]).type() ==
|
||||
FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
|
||||
to_lower(_tablet_schema->column(cids_missing[i]).default_value())
|
||||
.find(to_lower("CURRENT_TIMESTAMP")) !=
|
||||
std::string::npos)) {
|
||||
DateV2Value<DateTimeV2ValueType> dtv;
|
||||
dtv.from_unixtime(_opts.rowset_ctx->partial_update_info->timestamp_ms / 1000,
|
||||
_opts.rowset_ctx->partial_update_info->timezone);
|
||||
default_value = dtv.debug_string();
|
||||
} else if (UNLIKELY(
|
||||
_tablet_schema->column(cids_missing[i]).type() ==
|
||||
FieldType::OLAP_FIELD_TYPE_DATEV2 &&
|
||||
to_lower(_tablet_schema->column(cids_missing[i]).default_value())
|
||||
.find(to_lower("CURRENT_DATE")) !=
|
||||
std::string::npos)) {
|
||||
DateV2Value<DateV2ValueType> dv;
|
||||
dv.from_unixtime(_opts.rowset_ctx->partial_update_info->timestamp_ms / 1000,
|
||||
_opts.rowset_ctx->partial_update_info->timezone);
|
||||
default_value = dv.debug_string();
|
||||
} else {
|
||||
default_value = _tablet_schema->column(cids_missing[i]).default_value();
|
||||
}
|
||||
vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
|
||||
default_value.size());
|
||||
RETURN_IF_ERROR(old_value_block.get_by_position(i).type->from_string(
|
||||
|
||||
@ -23,6 +23,7 @@
|
||||
#include <algorithm>
|
||||
#include <cassert>
|
||||
#include <ostream>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
|
||||
@ -35,6 +36,7 @@
|
||||
#include "olap/data_dir.h"
|
||||
#include "olap/key_coder.h"
|
||||
#include "olap/olap_common.h"
|
||||
#include "olap/partial_update_info.h"
|
||||
#include "olap/primary_key_index.h"
|
||||
#include "olap/row_cursor.h" // RowCursor // IWYU pragma: keep
|
||||
#include "olap/rowset/rowset_writer_context.h" // RowsetWriterContext
|
||||
@ -54,6 +56,7 @@
|
||||
#include "util/faststring.h"
|
||||
#include "util/key_util.h"
|
||||
#include "vec/columns/column_nullable.h"
|
||||
#include "vec/columns/column_vector.h"
|
||||
#include "vec/columns/columns_number.h"
|
||||
#include "vec/common/schema_util.h"
|
||||
#include "vec/core/block.h"
|
||||
@ -625,7 +628,29 @@ Status VerticalSegmentWriter::_fill_missing_columns(
|
||||
for (auto i = 0; i < missing_cids.size(); ++i) {
|
||||
const auto& column = _tablet_schema->column(missing_cids[i]);
|
||||
if (column.has_default_value()) {
|
||||
auto default_value = _tablet_schema->column(missing_cids[i]).default_value();
|
||||
std::string default_value;
|
||||
if (UNLIKELY(_tablet_schema->column(missing_cids[i]).type() ==
|
||||
FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
|
||||
to_lower(_tablet_schema->column(missing_cids[i]).default_value())
|
||||
.find(to_lower("CURRENT_TIMESTAMP")) !=
|
||||
std::string::npos)) {
|
||||
DateV2Value<DateTimeV2ValueType> dtv;
|
||||
dtv.from_unixtime(_opts.rowset_ctx->partial_update_info->timestamp_ms / 1000,
|
||||
_opts.rowset_ctx->partial_update_info->timezone);
|
||||
default_value = dtv.debug_string();
|
||||
} else if (UNLIKELY(
|
||||
_tablet_schema->column(missing_cids[i]).type() ==
|
||||
FieldType::OLAP_FIELD_TYPE_DATEV2 &&
|
||||
to_lower(_tablet_schema->column(missing_cids[i]).default_value())
|
||||
.find(to_lower("CURRENT_DATE")) !=
|
||||
std::string::npos)) {
|
||||
DateV2Value<DateV2ValueType> dv;
|
||||
dv.from_unixtime(_opts.rowset_ctx->partial_update_info->timestamp_ms / 1000,
|
||||
_opts.rowset_ctx->partial_update_info->timezone);
|
||||
default_value = dv.debug_string();
|
||||
} else {
|
||||
default_value = _tablet_schema->column(missing_cids[i]).default_value();
|
||||
}
|
||||
vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
|
||||
default_value.size());
|
||||
RETURN_IF_ERROR(old_value_block.get_by_position(i).type->from_string(
|
||||
|
||||
@ -374,7 +374,8 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id,
|
||||
_partial_update_info = std::make_shared<PartialUpdateInfo>();
|
||||
_partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
|
||||
table_schema_param->partial_update_input_columns(),
|
||||
table_schema_param->is_strict_mode());
|
||||
table_schema_param->is_strict_mode(),
|
||||
table_schema_param->timestamp_ms(), table_schema_param->timezone());
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -1128,6 +1128,8 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) {
|
||||
_tuple_desc_id = table_sink.tuple_id;
|
||||
_schema.reset(new OlapTableSchemaParam());
|
||||
RETURN_IF_ERROR(_schema->init(table_sink.schema));
|
||||
_schema->set_timestamp_ms(state->timestamp_ms());
|
||||
_schema->set_timezone(state->timezone());
|
||||
_location = _pool->add(new OlapTableLocationParam(table_sink.location));
|
||||
_nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info));
|
||||
if (table_sink.__isset.write_single_replica && table_sink.write_single_replica) {
|
||||
|
||||
@ -150,6 +150,8 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
|
||||
_write_file_cache = table_sink.write_file_cache;
|
||||
_schema.reset(new OlapTableSchemaParam());
|
||||
RETURN_IF_ERROR(_schema->init(table_sink.schema));
|
||||
_schema->set_timestamp_ms(state->timestamp_ms());
|
||||
_schema->set_timezone(state->timezone());
|
||||
_location = _pool->add(new OlapTableLocationParam(table_sink.location));
|
||||
_nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info));
|
||||
|
||||
|
||||
@ -70,5 +70,7 @@ message POlapTableSchemaParam {
|
||||
repeated string partial_update_input_columns = 8;
|
||||
optional bool is_strict_mode = 9 [default = false];
|
||||
optional string auto_increment_column = 10;
|
||||
optional int64 timestamp_ms = 11 [default = 0];
|
||||
optional string timezone = 12;
|
||||
};
|
||||
|
||||
|
||||
@ -35,6 +35,18 @@
|
||||
3 "stranger" 500 \N 4321
|
||||
4 "foreigner" 600 \N 4321
|
||||
|
||||
-- !select_timestamp --
|
||||
1
|
||||
|
||||
-- !select_timestamp2 --
|
||||
11
|
||||
|
||||
-- !select_date --
|
||||
1
|
||||
|
||||
-- !select_date2 --
|
||||
2
|
||||
|
||||
-- !select_default --
|
||||
1 doris 200 123 1
|
||||
2 doris2 400 223 1
|
||||
@ -71,3 +83,15 @@
|
||||
3 "stranger" 500 \N 4321
|
||||
4 "foreigner" 600 \N 4321
|
||||
|
||||
-- !select_timestamp --
|
||||
1
|
||||
|
||||
-- !select_timestamp2 --
|
||||
11
|
||||
|
||||
-- !select_date --
|
||||
1
|
||||
|
||||
-- !select_date2 --
|
||||
2
|
||||
|
||||
|
||||
@ -185,8 +185,63 @@ suite("test_primary_key_partial_update", "p0") {
|
||||
select * from ${tableName} order by id;
|
||||
"""
|
||||
|
||||
// drop drop
|
||||
// drop table
|
||||
sql """ DROP TABLE IF EXISTS ${tableName} """
|
||||
|
||||
sql """ CREATE TABLE ${tableName} (
|
||||
`name` VARCHAR(600) NULL,
|
||||
`userid` INT NOT NULL,
|
||||
`seq` BIGINT NOT NULL AUTO_INCREMENT(1),
|
||||
`ctime` DATETIME(3) DEFAULT CURRENT_TIMESTAMP(3),
|
||||
`rtime` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
|
||||
`corp_name` VARCHAR(600) NOT NULL
|
||||
) ENGINE = OLAP UNIQUE KEY(`name`, `userid`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`name`) BUCKETS 10
|
||||
PROPERTIES ("replication_num" = "1",
|
||||
"enable_unique_key_merge_on_write" = "true",
|
||||
"store_row_column" = "${use_row_store}"); """
|
||||
|
||||
sql "set enable_unique_key_partial_update=true;"
|
||||
sql "set enable_insert_strict=false;"
|
||||
|
||||
sql "INSERT INTO ${tableName}(`name`, `userid`, `corp_name`) VALUES ('test1', 1234567, 'A');"
|
||||
|
||||
qt_select_timestamp "select count(*) from ${tableName} where `ctime` > \"1970-01-01\""
|
||||
|
||||
sql "set time_zone = 'America/New_York'"
|
||||
|
||||
Thread.sleep(5000)
|
||||
|
||||
sql "INSERT INTO ${tableName}(`name`, `userid`, `corp_name`) VALUES ('test2', 1234567, 'A');"
|
||||
|
||||
qt_select_timestamp2 "SELECT ABS(TIMESTAMPDIFF(HOUR, MIN(ctime), MAX(ctime))) AS time_difference_hours FROM ${tableName};"
|
||||
|
||||
// drop table
|
||||
sql """ DROP TABLE IF EXISTS ${tableName} """
|
||||
|
||||
sql """ SET enable_nereids_planner=true; """
|
||||
sql """ CREATE TABLE ${tableName} (
|
||||
`name` VARCHAR(600) NULL,
|
||||
`userid` INT NOT NULL,
|
||||
`seq` BIGINT NOT NULL AUTO_INCREMENT(1),
|
||||
`ctime` DATE DEFAULT CURRENT_DATE,
|
||||
`corp_name` VARCHAR(600) NOT NULL
|
||||
) ENGINE = OLAP UNIQUE KEY(`name`, `userid`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`name`) BUCKETS 10
|
||||
PROPERTIES ("replication_num" = "1",
|
||||
"enable_unique_key_merge_on_write" = "true",
|
||||
"store_row_column" = "${use_row_store}"); """
|
||||
|
||||
sql "set enable_unique_key_partial_update=true;"
|
||||
sql "set enable_insert_strict=false;"
|
||||
|
||||
sql "INSERT INTO ${tableName}(`name`, `userid`, `corp_name`) VALUES ('test1', 1234567, 'A');"
|
||||
|
||||
qt_select_date "select count(*) from ${tableName} where `ctime` > \"1970-01-01\""
|
||||
|
||||
sql "set time_zone = 'America/New_York'"
|
||||
|
||||
sql "INSERT INTO ${tableName}(`name`, `userid`, `corp_name`) VALUES ('test2', 1234567, 'B');"
|
||||
|
||||
qt_select_date2 "select count(*) from ${tableName} where `ctime` > \"1970-01-01\""
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user