[Fix](partial-update) Correct the alignment process when the table has a sequence column and add cases (#25346)
This PR fixes the alignment process in the publish phase when a conflict occurs during concurrent partial updates: if we encounter a row with the same key and a larger value in the sequence column, it means that another load, which introduces a row with the same keys and a larger sequence column value, was published successfully after the commit phase of the current load. We should act as follows:

- If the columns we update include the sequence column, we should delete the current row, because the partial update on the current row has been overwritten by the previous one with the larger sequence column value.
- Otherwise, we should combine the values of the missing columns in the previous row and the values of the included columns in the current row into a new row, as sketched below.
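A minimal, self-contained sketch of this decision rule (`AlignAction` and `resolve_conflict` are hypothetical names for exposition only, not the actual Doris API):

```cpp
#include <iostream>

// Outcome of publish-phase alignment once a row with the same key and a
// larger sequence value has already been published (KEY_ALREADY_EXISTS).
// Hypothetical types for illustration; not the real Doris code.
enum class AlignAction {
    DELETE_CURRENT_ROW, // the current row lost the sequence race entirely
    MERGE_WITH_PREVIOUS // keep updated columns, fill the rest from the winner
};

AlignAction resolve_conflict(bool is_partial_update, bool update_includes_seq_col) {
    if (!is_partial_update) {
        // Full-row load: the current row is simply superseded.
        return AlignAction::DELETE_CURRENT_ROW;
    }
    if (update_includes_seq_col) {
        // The partial update wrote the sequence column itself, so it has been
        // overwritten by the published row with the larger sequence value.
        return AlignAction::DELETE_CURRENT_ROW;
    }
    // Otherwise combine: missing columns from the published row, updated
    // columns from the current row.
    return AlignAction::MERGE_WITH_PREVIOUS;
}

int main() {
    bool merged = resolve_conflict(/*is_partial_update=*/true,
                                   /*update_includes_seq_col=*/false) ==
                  AlignAction::MERGE_WITH_PREVIOUS;
    std::cout << "partial update without seq column merges: " << merged << '\n';
}
```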
@@ -2934,6 +2934,15 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
     Version dummy_version(end_version + 1, end_version + 1);
     auto rowset_schema = rowset->tablet_schema();
     bool is_partial_update = rowset_writer && rowset_writer->is_partial_update();
+    bool have_input_seq_column = false;
+    if (is_partial_update && rowset_schema->has_sequence_col()) {
+        std::vector<uint32_t> including_cids =
+                rowset_writer->get_partial_update_info()->update_cids;
+        have_input_seq_column =
+                rowset_schema->has_sequence_col() &&
+                (std::find(including_cids.cbegin(), including_cids.cend(),
+                           rowset_schema->sequence_col_idx()) != including_cids.cend());
+    }
     // use for partial update
     PartialUpdateReadPlan read_plan_ori;
     PartialUpdateReadPlan read_plan_update;
@@ -3002,12 +3011,24 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
                 continue;
             }
 
-            // sequence id smaller than the previous one, so delete current row
-            if (st.is<KEY_ALREADY_EXISTS>()) {
+            if (st.is<KEY_ALREADY_EXISTS>() && (!is_partial_update || have_input_seq_column)) {
+                // `st.is<KEY_ALREADY_EXISTS>()` means that there exists a row with the same key and a larger value
+                // in the sequence column.
+                // - If the current load is not a partial update, we just delete the current row.
+                // - Otherwise, it means that we are doing the alignment process in the publish phase due to conflicts
+                //   during concurrent partial updates, and there exists another load which introduces a row with
+                //   the same keys and a larger sequence column value published successfully after the commit phase
+                //   of the current load.
+                //   - If the columns we update include the sequence column, we should delete the current row because the
+                //     partial update on the current row has been `overwritten` by the previous one with the larger sequence
+                //     column value.
+                //   - Otherwise, we should combine the values of the missing columns in the previous row and the values
+                //     of the included columns in the current row into a new row.
                 delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON},
                                    row_id);
                 continue;
-            } else if (is_partial_update && rowset_writer != nullptr) {
+            }
+            if (is_partial_update && rowset_writer != nullptr) {
                 // In publish version, record rows to be deleted for concurrent update.
                 // For example, if version 5 and 6 update a row, but version 6 only sees
                 // version 4 when writing, then when publishing, version 5's value will
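For the "combine" branch described above, a small sketch of merging the previously published row with the current partial row, column by column (`Row` and `align_rows` are illustrative stand-ins, not the real read-plan code):

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// One cell per column, as text for brevity. Hypothetical representation.
using Row = std::vector<std::string>;

// Columns listed in update_cids come from the current partial update; every
// other column keeps the value of the previously published row.
Row align_rows(const Row& published, const Row& partial,
               const std::vector<uint32_t>& update_cids) {
    Row merged = published;
    for (uint32_t cid : update_cids) {
        merged[cid] = partial[cid];
    }
    return merged;
}

int main() {
    Row published = {"1", "doris", "1000", "123", "1"}; // id, name, score, test, dft
    Row partial   = {"1", "",      "2000", "999", ""};  // updates score and test only
    for (const std::string& cell : align_rows(published, partial, {2, 3})) {
        std::cout << cell << ' ';
    }
    std::cout << '\n'; // prints: 1 doris 2000 999 1
}
```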
@@ -6,3 +6,24 @@
 4 "bbbbbbbb" 4444 499 40
 5 "cccccccccccc" 5555 599 50
 
+-- !sql --
+1 "ddddddddddd" 1111 199 10
+2 "eeeeee" 2222 299 20
+3 "aaaaa" 3333 399 30
+4 "bbbbbbbb" 4444 499 40
+5 "cccccccccccc" 5555 599 50
+
+-- !sql --
+1 "ddddddddddd" 1111 199 10 0 5 10
+2 "eeeeee" 2222 299 20 0 5 20
+3 "aaaaa" 3333 399 30 0 5 30
+4 "bbbbbbbb" 4444 499 40 0 5 40
+5 "cccccccccccc" 5555 599 50 0 5 50
+
+-- !sql --
+1 "ddddddddddd" 1111 199 10 0 5 10
+2 "eeeeee" 2222 299 20 0 5 20
+3 "aaaaa" 3333 399 30 0 5 30
+4 "bbbbbbbb" 4444 499 40 0 5 40
+5 "cccccccccccc" 5555 599 50 0 5 50
+
@@ -17,9 +17,8 @@
 
 suite("test_primary_key_partial_update_parallel", "p0") {
 
+    // case 1: concurrent partial update
     def tableName = "test_primary_key_partial_update"
-
-    // create table
     sql """ DROP TABLE IF EXISTS ${tableName} """
     sql """
         CREATE TABLE ${tableName} (
@@ -90,5 +89,247 @@ suite("test_primary_key_partial_update_parallel", "p0") {
     qt_sql """ select * from ${tableName} order by id;"""
 
     sql """ DROP TABLE IF EXISTS ${tableName}; """
 
+
+    // case 2: concurrent partial update with row store column
+    tableName = "test_primary_key_row_store_partial_update"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NOT NULL COMMENT "user id",
+            `name` varchar(65533) NOT NULL COMMENT "user name",
+            `score` int(11) NOT NULL COMMENT "user score",
+            `test` int(11) NULL COMMENT "null test",
+            `dft` int(11) DEFAULT "4321")
+        UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true", "store_row_column" = "true")
+    """
+
+    sql """insert into ${tableName} values
+        (2, "doris2", 2000, 223, 2),
+        (1, "doris", 1000, 123, 1),
+        (5, "doris5", 5000, 523, 5),
+        (4, "doris4", 4000, 423, 4),
+        (3, "doris3", 3000, 323, 3);"""
+
+    t1 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,name'
+
+            file 'partial_update_parallel1.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t2 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,score,test'
+
+            file 'partial_update_parallel2.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t3 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,dft'
+
+            file 'partial_update_parallel3.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t1.join()
+    t2.join()
+    t3.join()
+
+    sql "sync"
+
+    qt_sql """ select * from ${tableName} order by id;"""
+
+    sql """ DROP TABLE IF EXISTS ${tableName}; """
+
+
+    // case 3: concurrent partial update with sequence column
+    tableName = "test_primary_key_seq_partial_update"
+
+    // create table
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NOT NULL COMMENT "user id",
+            `name` varchar(65533) NOT NULL COMMENT "user name",
+            `score` int(11) NOT NULL COMMENT "user score",
+            `test` int(11) NULL COMMENT "null test",
+            `dft` int(11) DEFAULT "4321")
+        UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES(
+            "replication_num" = "1",
+            "enable_unique_key_merge_on_write" = "true",
+            "function_column.sequence_col" = "dft")
+    """
+
+    sql """insert into ${tableName} values
+        (2, "deprecated", 99999, 999, 1),
+        (2, "doris2", 2000, 223, 2),
+        (1, "doris", 1000, 123, 1),
+        (3, "deprecated", 99999, 999, 2),
+        (5, "doris5", 5000, 523, 5),
+        (4, "doris4", 4000, 423, 4),
+        (4, "deprecated", 99999, 999, 3),
+        (4, "deprecated", 99999, 999, 1),
+        (3, "doris3", 3000, 323, 3);"""
+
+    t1 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,name'
+
+            file 'partial_update_parallel1.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t2 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,score,test'
+
+            file 'partial_update_parallel2.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t3 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,dft'
+
+            file 'partial_update_parallel3.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t1.join()
+    t2.join()
+    t3.join()
+
+    sql "set show_hidden_columns=true;"
+    sql "sync"
+
+    qt_sql """ select * from ${tableName} order by id;"""
+    sql "set show_hidden_columns=false;"
+    sql "sync"
+    sql """ DROP TABLE IF EXISTS ${tableName}; """
+
+
+    // case 4: concurrent partial update with row store column and sequence column
+    tableName = "test_primary_key_row_store_seq_partial_update"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NOT NULL COMMENT "user id",
+            `name` varchar(65533) NOT NULL COMMENT "user name",
+            `score` int(11) NOT NULL COMMENT "user score",
+            `test` int(11) NULL COMMENT "null test",
+            `dft` int(11) DEFAULT "4321")
+        UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES(
+            "replication_num" = "1",
+            "enable_unique_key_merge_on_write" = "true",
+            "function_column.sequence_col" = "dft",
+            "store_row_column" = "true")
+    """
+
+    sql """insert into ${tableName} values
+        (2, "deprecated", 99999, 999, 1),
+        (2, "doris2", 2000, 223, 2),
+        (1, "doris", 1000, 123, 1),
+        (3, "deprecated", 99999, 999, 2),
+        (5, "doris5", 5000, 523, 5),
+        (4, "doris4", 4000, 423, 4),
+        (4, "deprecated", 99999, 999, 3),
+        (4, "deprecated", 99999, 999, 1),
+        (3, "doris3", 3000, 323, 3);"""
+
+    t1 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,name'
+
+            file 'partial_update_parallel1.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t2 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,score,test'
+
+            file 'partial_update_parallel2.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t3 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,dft'
+
+            file 'partial_update_parallel3.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t1.join()
+    t2.join()
+    t3.join()
+
+    sql "set show_hidden_columns=true;"
+    sql "sync"
+
+    qt_sql """ select id,name,score,test,dft,__DORIS_DELETE_SIGN__,__DORIS_VERSION_COL__,__DORIS_SEQUENCE_COL__ from ${tableName} order by id;"""
+
+    sql """ DROP TABLE IF EXISTS ${tableName}; """
 }