[fix](partial update) fix some bugs about delete sign (#25712)
@@ -33,20 +33,16 @@ using namespace ErrorCode;
Status CalcDeleteBitmapToken::submit(TabletSharedPtr tablet, RowsetSharedPtr cur_rowset,
                                     const segment_v2::SegmentSharedPtr& cur_segment,
                                     const std::vector<RowsetSharedPtr>& target_rowsets,
                                     int64_t end_version, RowsetWriter* rowset_writer) {
                                     int64_t end_version, DeleteBitmapPtr delete_bitmap,
                                     RowsetWriter* rowset_writer) {
    {
        std::shared_lock rlock(_lock);
        RETURN_IF_ERROR(_status);
    }

    DeleteBitmapPtr bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id());
    {
        std::lock_guard wlock(_lock);
        _delete_bitmaps.push_back(bitmap);
    }
    return _thread_token->submit_func([=, this]() {
        auto st = tablet->calc_segment_delete_bitmap(cur_rowset, cur_segment, target_rowsets,
                                                     bitmap, end_version, rowset_writer);
                                                     delete_bitmap, end_version, rowset_writer);
        if (!st.ok()) {
            LOG(WARNING) << "failed to calc segment delete bitmap, tablet_id: "
                         << tablet->tablet_id() << " rowset: " << cur_rowset->rowset_id()
@@ -66,17 +62,6 @@ Status CalcDeleteBitmapToken::wait() {
    return _status;
}

Status CalcDeleteBitmapToken::get_delete_bitmap(DeleteBitmapPtr res_bitmap) {
    std::lock_guard wlock(_lock);
    RETURN_IF_ERROR(_status);

    for (auto bitmap : _delete_bitmaps) {
        res_bitmap->merge(*bitmap);
    }
    _delete_bitmaps.clear();
    return Status::OK();
}

void CalcDeleteBitmapExecutor::init() {
    static_cast<void>(ThreadPoolBuilder("TabletCalcDeleteBitmapThreadPool")
                              .set_min_threads(1)
@@ -52,21 +52,17 @@ public:
    Status submit(TabletSharedPtr tablet, RowsetSharedPtr cur_rowset,
                  const segment_v2::SegmentSharedPtr& cur_segment,
                  const std::vector<RowsetSharedPtr>& target_rowsets, int64_t end_version,
                  RowsetWriter* rowset_writer);
                  DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer);

    // wait all tasks in token to be completed.
    Status wait();

    void cancel() { _thread_token->shutdown(); }

    Status get_delete_bitmap(DeleteBitmapPtr res_bitmap);

private:
    std::unique_ptr<ThreadPoolToken> _thread_token;

    std::shared_mutex _lock;
    std::vector<DeleteBitmapPtr> _delete_bitmaps;

    // Records the current status of the calc delete bitmap job.
    // Note: Once its value is set to Failed, it cannot return to SUCCESS.
    Status _status;
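The hunks above change the ownership of the delete bitmap: instead of each token keeping a vector of per-segment bitmaps and handing them back through get_delete_bitmap(), the caller now owns a single DeleteBitmap and passes it into every submit() call, so wait() alone finishes the job. The following is a self-contained sketch of that flow, not Doris code; BitmapKey, SimpleDeleteBitmap, and the worker loop are illustrative stand-ins for DeleteBitmap and CalcDeleteBitmapToken.

#include <cstdint>
#include <map>
#include <mutex>
#include <set>
#include <thread>
#include <tuple>
#include <vector>

// Stand-in for the (rowset, segment, version) key used by the real DeleteBitmap.
struct BitmapKey {
    int64_t rowset_id, segment_id, version;
    bool operator<(const BitmapKey& o) const {
        return std::tie(rowset_id, segment_id, version) <
               std::tie(o.rowset_id, o.segment_id, o.version);
    }
};

// Minimal thread-safe stand-in for DeleteBitmap.
struct SimpleDeleteBitmap {
    std::mutex mtx;
    std::map<BitmapKey, std::set<uint32_t>> rows;
    void add(const BitmapKey& key, uint32_t row) {
        std::lock_guard<std::mutex> g(mtx);
        rows[key].insert(row);
    }
    bool contains(const BitmapKey& key, uint32_t row) {
        std::lock_guard<std::mutex> g(mtx);
        auto it = rows.find(key);
        return it != rows.end() && it->second.count(row) > 0;
    }
};

int main() {
    // One bitmap owned by the caller and shared by all "segment" tasks, mirroring
    // the new submit(..., delete_bitmap, rowset_writer) signature: after joining
    // (the analogue of CalcDeleteBitmapToken::wait()) no merge step is needed.
    SimpleDeleteBitmap delete_bitmap;
    std::vector<std::thread> tasks;
    for (int64_t segment_id = 0; segment_id < 4; ++segment_id) {
        tasks.emplace_back([&delete_bitmap, segment_id] {
            delete_bitmap.add({/*rowset_id=*/1, segment_id, /*version=*/5}, /*row=*/0);
        });
    }
    for (auto& t : tasks) t.join();
    return delete_bitmap.rows.size() == 4 ? 0 : 1;
}

The later hunks in RowsetBuilder, Tablet, and BackendService follow the same pattern: the calls to token->get_delete_bitmap(delete_bitmap) disappear because the shared bitmap is already populated when wait() returns.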
@@ -422,6 +422,17 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
        }
        _maybe_invalid_row_cache(key);

        // mark key with delete sign as deleted.
        bool have_delete_sign =
                (delete_sign_column_data != nullptr && delete_sign_column_data[block_pos] != 0);
        if (have_delete_sign && !_tablet_schema->has_sequence_col() && !have_input_seq_column) {
            // we can directly use delete bitmap to mark the rows with delete sign as deleted
            // if sequence column doesn't exist to eliminate reading delete sign columns in later reads
            _mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, _segment_id,
                                              DeleteBitmap::TEMP_VERSION_FOR_DELETE_SIGN},
                                             segment_pos);
        }

        RowLocation loc;
        // save rowset shared ptr so this rowset wouldn't delete
        RowsetSharedPtr rowset;
@@ -453,16 +464,9 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
            // if the delete sign is marked, it means that the value columns of the row
            // will not be read. So we don't need to read the missing values from the previous rows.
            // But we still need to mark the previous row on delete bitmap
            if (delete_sign_column_data != nullptr && delete_sign_column_data[block_pos] != 0) {
            if (have_delete_sign) {
                has_default_or_nullable = true;
                use_default_or_null_flag.emplace_back(true);
                if (!_tablet_schema->has_sequence_col() && !have_input_seq_column) {
                    // we can directly use delete bitmap to mark the rows with delete sign as deleted
                    // if sequence column doesn't exist to eliminate reading delete sign columns in later reads
                    _mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, _segment_id,
                                                      DeleteBitmap::TEMP_VERSION_FOR_DELETE_SIGN},
                                                     segment_pos);
                }
            } else {
                // partial update should not contain invisible columns
                use_default_or_null_flag.emplace_back(false);
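Condensed, the rule the comments in these two SegmentWriter hunks describe is: a row carrying the delete sign is recorded in the delete bitmap under a temporary sentinel version, so later reads can treat it as deleted without consulting the delete-sign column, but only when neither the table schema nor the load carries a sequence column. A hedged sketch of that rule, reusing the SimpleDeleteBitmap and BitmapKey stand-ins from the sketch above; TEMP_VERSION_FOR_DELETE_SIGN here is an illustrative constant, not the real value defined on DeleteBitmap.

// Illustrative sentinel version for rows deleted via the delete sign.
constexpr int64_t TEMP_VERSION_FOR_DELETE_SIGN = 0;

// Mirrors the branch in append_block_with_partial_content: mark the row as deleted
// right away only if no sequence column can later change which version "wins".
void maybe_mark_delete_sign_row(SimpleDeleteBitmap& delete_bitmap, int64_t rowset_id,
                                int64_t segment_id, uint32_t segment_pos,
                                bool have_delete_sign, bool schema_has_seq_col,
                                bool have_input_seq_column) {
    if (have_delete_sign && !schema_has_seq_col && !have_input_seq_column) {
        delete_bitmap.add({rowset_id, segment_id, TEMP_VERSION_FOR_DELETE_SIGN}, segment_pos);
    }
}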
@@ -280,7 +280,6 @@ Status RowsetBuilder::wait_calc_delete_bitmap() {
    std::lock_guard<std::mutex> l(_lock);
    SCOPED_TIMER(_wait_delete_bitmap_timer);
    RETURN_IF_ERROR(_calc_delete_bitmap_token->wait());
    RETURN_IF_ERROR(_calc_delete_bitmap_token->get_delete_bitmap(_delete_bitmap));
    LOG(INFO) << "Got result of calc delete bitmap task from executor, tablet_id: "
              << _tablet->tablet_id() << ", txn_id: " << _req.txn_id;
    return Status::OK();
@@ -3042,6 +3042,14 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
                continue;
            }
            if (is_partial_update && rowset_writer != nullptr) {
                if (delete_bitmap->contains(
                            {rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_FOR_DELETE_SIGN},
                            row_id)) {
                    LOG(INFO)
                            << "DEBUG: skip a delete sign column while calc_segment_delete_bitmap "
                            << "processing conflict for partial update";
                    continue;
                }
                // In publish version, record rows to be deleted for concurrent update
                // For example, if version 5 and 6 update a row, but version 6 only see
                // version 4 when write, and when publish version, version 5's value will
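The guard added in this hunk prevents double handling during publish: if a row was already recorded under the delete-sign sentinel version when the segment was written, it has no value columns that need back-filling, so the partial-update conflict path can simply skip it. A sketch of that check, again using the stand-in types and sentinel from the earlier sketches:

// Skip publish-time conflict handling for rows the writer already marked as
// delete-sign deletes under the sentinel version, as the new branch above does.
bool skip_delete_sign_row(SimpleDeleteBitmap& delete_bitmap, int64_t rowset_id,
                          int64_t segment_id, uint32_t row_id, bool is_partial_update,
                          bool has_rowset_writer) {
    return is_partial_update && has_rowset_writer &&
           delete_bitmap.contains({rowset_id, segment_id, TEMP_VERSION_FOR_DELETE_SIGN}, row_id);
}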
@@ -3127,26 +3135,17 @@ Status Tablet::calc_delete_bitmap(RowsetSharedPtr rowset,
        return Status::InternalError("Can't find tablet id: {}, maybe already dropped.",
                                     tablet_id());
    }
    std::vector<DeleteBitmapPtr> seg_delete_bitmaps;
    for (size_t i = 0; i < segments.size(); i++) {
        auto& seg = segments[i];
        if (token != nullptr) {
            RETURN_IF_ERROR(token->submit(tablet_ptr, rowset, seg, specified_rowsets, end_version,
                                          rowset_writer));
                                          delete_bitmap, rowset_writer));
        } else {
            DeleteBitmapPtr seg_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());
            seg_delete_bitmaps.push_back(seg_delete_bitmap);
            RETURN_IF_ERROR(calc_segment_delete_bitmap(rowset, segments[i], specified_rowsets,
                                                       seg_delete_bitmap, end_version,
                                                       rowset_writer));
                                                       delete_bitmap, end_version, rowset_writer));
        }
    }

    if (token == nullptr) {
        for (auto seg_delete_bitmap : seg_delete_bitmaps) {
            delete_bitmap->merge(*seg_delete_bitmap);
        }
    }
    return Status::OK();
}
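The non-token path gets the same simplification: previously each segment filled its own bitmap and the results were merged at the end, while now calc_segment_delete_bitmap() writes into the caller's delete_bitmap directly. For comparison, a small stand-in for what the removed merge loop effectively did, using the SimpleDeleteBitmap type from the first sketch (single-threaded here, so no extra locking concerns):

// Fold one per-segment bitmap into the shared one; with the shared-bitmap
// approach in this commit, this step is no longer necessary.
void merge_into(SimpleDeleteBitmap& dst, const SimpleDeleteBitmap& src) {
    for (const auto& [key, row_set] : src.rows) {
        for (uint32_t row : row_set) {
            dst.add(key, row);
        }
    }
}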
@@ -3311,7 +3310,6 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset)
    RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap,
                                       cur_version - 1, token.get()));
    RETURN_IF_ERROR(token->wait());
    RETURN_IF_ERROR(token->get_delete_bitmap(delete_bitmap));
    size_t total_rows = std::accumulate(
            segments.begin(), segments.end(), 0,
            [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); });
@@ -3418,7 +3416,6 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
    RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap,
                                       cur_version - 1, token.get(), rowset_writer));
    RETURN_IF_ERROR(token->wait());
    RETURN_IF_ERROR(token->get_delete_bitmap(delete_bitmap));

    std::stringstream ss;
    if (watch.get_elapse_time_us() < 1 * 1000 * 1000) {
@@ -695,7 +695,6 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result,
                rowset, pre_rowset_ids, delete_bitmap, segments, txn_id,
                calc_delete_bitmap_token.get(), nullptr));
        static_cast<void>(calc_delete_bitmap_token->wait());
        static_cast<void>(calc_delete_bitmap_token->get_delete_bitmap(delete_bitmap));
    }

    // Step 6.3: commit txn
@@ -1,3 +1,4 @@
1,1
3,1
5,1
5,1
6,1
@@ -0,0 +1,3 @@
1,1
3,1
5,1
@@ -19,6 +19,7 @@
4 4 4 4 4 0
5 \N \N \N \N 1
5 5 5 5 5 0
6 \N \N \N \N 1

-- !2 --
2 2 2 2 2 0
@@ -44,6 +45,7 @@
4 4 4 4 4 0
5 \N \N \N \N 1
5 5 5 5 5 0
6 \N \N \N \N 1

-- !2 --
1 \N \N \N \N 1
@@ -51,6 +53,7 @@
3 \N \N \N \N 1
4 4 4 4 4 0
5 \N \N \N \N 1
6 \N \N \N \N 1

-- !1 --
1 1 1
@@ -331,5 +331,92 @@ suite("test_primary_key_partial_update_parallel", "p0") {
    qt_sql """ select id,name,score,test,dft,__DORIS_DELETE_SIGN__,__DORIS_VERSION_COL__,__DORIS_SEQUENCE_COL__ from ${tableName} order by id;"""

    sql """ DROP TABLE IF EXISTS ${tableName}; """

    // case 5: partial update with delete sign in parallel
    tableName = "test_primary_key_partial_update_delete_sign"
    sql """ DROP TABLE IF EXISTS ${tableName} """
    sql """
            CREATE TABLE ${tableName} (
                `id` int(11) NOT NULL COMMENT "user id",
                `name` varchar(65533) NULL COMMENT "user name",
                `score` int(11) NULL COMMENT "user score",
                `test` int(11) NULL COMMENT "null test",
                `dft` int(11) DEFAULT "4321")
            UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
            PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true")
    """

    sql """insert into ${tableName} values
        (2, "doris2", 2000, 223, 2),
        (1, "doris", 1000, 123, 1),
        (5, "doris5", 5000, 523, 5),
        (4, "doris4", 4000, 423, 4),
        (3, "doris3", 3000, 323, 3);"""

    t1 = Thread.startDaemon {
        streamLoad {
            table "${tableName}"

            set 'column_separator', ','
            set 'format', 'csv'
            set 'partial_columns', 'true'
            set 'columns', 'id,name'

            file 'partial_update_parallel1.csv'
            time 10000 // limit inflight 10s
        }
    }

    t2 = Thread.startDaemon {
        streamLoad {
            table "${tableName}"

            set 'column_separator', ','
            set 'format', 'csv'
            set 'partial_columns', 'true'
            set 'columns', 'id,__DORIS_DELETE_SIGN__'

            file 'partial_update_parallel4.csv'
            time 10000 // limit inflight 10s
        }
    }

    t3 = Thread.startDaemon {
        streamLoad {
            table "${tableName}"

            set 'column_separator', ','
            set 'format', 'csv'
            set 'partial_columns', 'true'
            set 'columns', 'id,dft'

            file 'partial_update_parallel3.csv'
            time 10000 // limit inflight 10s
        }
    }

    t1.join()
    t2.join()
    t3.join()

    // Delete again, to make sure the result is right
    streamLoad {
        table "${tableName}"

        set 'column_separator', ','
        set 'format', 'csv'
        set 'partial_columns', 'true'
        set 'columns', 'id,__DORIS_DELETE_SIGN__'

        file 'partial_update_parallel4.csv'
        time 10000 // limit inflight 10s
    }

    sql "sync"

    // don't check the result here: the different finish orders of t1/t2/t3 produce
    // different results, which makes them hard to verify.

    sql """ DROP TABLE IF EXISTS ${tableName}; """
}