diff --git a/be/src/olap/calc_delete_bitmap_executor.cpp b/be/src/olap/calc_delete_bitmap_executor.cpp index b7d1d5cd3c..363a3d4019 100644 --- a/be/src/olap/calc_delete_bitmap_executor.cpp +++ b/be/src/olap/calc_delete_bitmap_executor.cpp @@ -33,20 +33,16 @@ using namespace ErrorCode; Status CalcDeleteBitmapToken::submit(TabletSharedPtr tablet, RowsetSharedPtr cur_rowset, const segment_v2::SegmentSharedPtr& cur_segment, const std::vector& target_rowsets, - int64_t end_version, RowsetWriter* rowset_writer) { + int64_t end_version, DeleteBitmapPtr delete_bitmap, + RowsetWriter* rowset_writer) { { std::shared_lock rlock(_lock); RETURN_IF_ERROR(_status); } - DeleteBitmapPtr bitmap = std::make_shared(tablet->tablet_id()); - { - std::lock_guard wlock(_lock); - _delete_bitmaps.push_back(bitmap); - } return _thread_token->submit_func([=, this]() { auto st = tablet->calc_segment_delete_bitmap(cur_rowset, cur_segment, target_rowsets, - bitmap, end_version, rowset_writer); + delete_bitmap, end_version, rowset_writer); if (!st.ok()) { LOG(WARNING) << "failed to calc segment delete bitmap, tablet_id: " << tablet->tablet_id() << " rowset: " << cur_rowset->rowset_id() @@ -66,17 +62,6 @@ Status CalcDeleteBitmapToken::wait() { return _status; } -Status CalcDeleteBitmapToken::get_delete_bitmap(DeleteBitmapPtr res_bitmap) { - std::lock_guard wlock(_lock); - RETURN_IF_ERROR(_status); - - for (auto bitmap : _delete_bitmaps) { - res_bitmap->merge(*bitmap); - } - _delete_bitmaps.clear(); - return Status::OK(); -} - void CalcDeleteBitmapExecutor::init() { static_cast(ThreadPoolBuilder("TabletCalcDeleteBitmapThreadPool") .set_min_threads(1) diff --git a/be/src/olap/calc_delete_bitmap_executor.h b/be/src/olap/calc_delete_bitmap_executor.h index d2c392a04d..02e8867151 100644 --- a/be/src/olap/calc_delete_bitmap_executor.h +++ b/be/src/olap/calc_delete_bitmap_executor.h @@ -52,21 +52,17 @@ public: Status submit(TabletSharedPtr tablet, RowsetSharedPtr cur_rowset, const segment_v2::SegmentSharedPtr& cur_segment, const std::vector& target_rowsets, int64_t end_version, - RowsetWriter* rowset_writer); + DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer); // wait all tasks in token to be completed. Status wait(); void cancel() { _thread_token->shutdown(); } - Status get_delete_bitmap(DeleteBitmapPtr res_bitmap); - private: std::unique_ptr _thread_token; std::shared_mutex _lock; - std::vector _delete_bitmaps; - // Records the current status of the calc delete bitmap job. // Note: Once its value is set to Failed, it cannot return to SUCCESS. Status _status; diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 3c76c206e6..9e2073b9fb 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -422,6 +422,17 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* } _maybe_invalid_row_cache(key); + // mark key with delete sign as deleted. + bool have_delete_sign = + (delete_sign_column_data != nullptr && delete_sign_column_data[block_pos] != 0); + if (have_delete_sign && !_tablet_schema->has_sequence_col() && !have_input_seq_column) { + // we can directly use delete bitmap to mark the rows with delete sign as deleted + // if sequence column doesn't exist to eliminate reading delete sign columns in later reads + _mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, _segment_id, + DeleteBitmap::TEMP_VERSION_FOR_DELETE_SIGN}, + segment_pos); + } + RowLocation loc; // save rowset shared ptr so this rowset wouldn't delete RowsetSharedPtr rowset; @@ -453,16 +464,9 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* // if the delete sign is marked, it means that the value columns of the row // will not be read. So we don't need to read the missing values from the previous rows. // But we still need to mark the previous row on delete bitmap - if (delete_sign_column_data != nullptr && delete_sign_column_data[block_pos] != 0) { + if (have_delete_sign) { has_default_or_nullable = true; use_default_or_null_flag.emplace_back(true); - if (!_tablet_schema->has_sequence_col() && !have_input_seq_column) { - // we can directly use delete bitmap to mark the rows with delete sign as deleted - // if sequence column doesn't exist to eliminate reading delete sign columns in later reads - _mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, _segment_id, - DeleteBitmap::TEMP_VERSION_FOR_DELETE_SIGN}, - segment_pos); - } } else { // partial update should not contain invisible columns use_default_or_null_flag.emplace_back(false); diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index 022fcac2a7..85b7268300 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -280,7 +280,6 @@ Status RowsetBuilder::wait_calc_delete_bitmap() { std::lock_guard l(_lock); SCOPED_TIMER(_wait_delete_bitmap_timer); RETURN_IF_ERROR(_calc_delete_bitmap_token->wait()); - RETURN_IF_ERROR(_calc_delete_bitmap_token->get_delete_bitmap(_delete_bitmap)); LOG(INFO) << "Got result of calc delete bitmap task from executor, tablet_id: " << _tablet->tablet_id() << ", txn_id: " << _req.txn_id; return Status::OK(); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 5b7c07fbd1..2fb03ac45b 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -3042,6 +3042,14 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset, continue; } if (is_partial_update && rowset_writer != nullptr) { + if (delete_bitmap->contains( + {rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_FOR_DELETE_SIGN}, + row_id)) { + LOG(INFO) + << "DEBUG: skip a delete sign column while calc_segment_delete_bitmap " + << "processing confict for partial update"; + continue; + } // In publish version, record rows to be deleted for concurrent update // For example, if version 5 and 6 update a row, but version 6 only see // version 4 when write, and when publish version, version 5's value will @@ -3127,26 +3135,17 @@ Status Tablet::calc_delete_bitmap(RowsetSharedPtr rowset, return Status::InternalError("Can't find tablet id: {}, maybe already dropped.", tablet_id()); } - std::vector seg_delete_bitmaps; for (size_t i = 0; i < segments.size(); i++) { auto& seg = segments[i]; if (token != nullptr) { RETURN_IF_ERROR(token->submit(tablet_ptr, rowset, seg, specified_rowsets, end_version, - rowset_writer)); + delete_bitmap, rowset_writer)); } else { - DeleteBitmapPtr seg_delete_bitmap = std::make_shared(tablet_id()); - seg_delete_bitmaps.push_back(seg_delete_bitmap); RETURN_IF_ERROR(calc_segment_delete_bitmap(rowset, segments[i], specified_rowsets, - seg_delete_bitmap, end_version, - rowset_writer)); + delete_bitmap, end_version, rowset_writer)); } } - if (token == nullptr) { - for (auto seg_delete_bitmap : seg_delete_bitmaps) { - delete_bitmap->merge(*seg_delete_bitmap); - } - } return Status::OK(); } @@ -3311,7 +3310,6 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset) RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap, cur_version - 1, token.get())); RETURN_IF_ERROR(token->wait()); - RETURN_IF_ERROR(token->get_delete_bitmap(delete_bitmap)); size_t total_rows = std::accumulate( segments.begin(), segments.end(), 0, [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); @@ -3418,7 +3416,6 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap, cur_version - 1, token.get(), rowset_writer)); RETURN_IF_ERROR(token->wait()); - RETURN_IF_ERROR(token->get_delete_bitmap(delete_bitmap)); std::stringstream ss; if (watch.get_elapse_time_us() < 1 * 1000 * 1000) { diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 7addca7aca..4374a789ca 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -695,7 +695,6 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result, rowset, pre_rowset_ids, delete_bitmap, segments, txn_id, calc_delete_bitmap_token.get(), nullptr)); static_cast(calc_delete_bitmap_token->wait()); - static_cast(calc_delete_bitmap_token->get_delete_bitmap(delete_bitmap)); } // Step 6.3: commit txn diff --git a/regression-test/data/unique_with_mow_p0/partial_update/delete_sign.csv b/regression-test/data/unique_with_mow_p0/partial_update/delete_sign.csv index 712d86b9ab..332000d696 100644 --- a/regression-test/data/unique_with_mow_p0/partial_update/delete_sign.csv +++ b/regression-test/data/unique_with_mow_p0/partial_update/delete_sign.csv @@ -1,3 +1,4 @@ 1,1 3,1 -5,1 \ No newline at end of file +5,1 +6,1 \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel4.csv b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel4.csv new file mode 100644 index 0000000000..0a7cbd412f --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel4.csv @@ -0,0 +1,3 @@ +1,1 +3,1 +5,1 diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out index eb0a501090..baf484fd3d 100644 --- a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out @@ -19,6 +19,7 @@ 4 4 4 4 4 0 5 \N \N \N \N 1 5 5 5 5 5 0 +6 \N \N \N \N 1 -- !2 -- 2 2 2 2 2 0 @@ -44,6 +45,7 @@ 4 4 4 4 4 0 5 \N \N \N \N 1 5 5 5 5 5 0 +6 \N \N \N \N 1 -- !2 -- 1 \N \N \N \N 1 @@ -51,6 +53,7 @@ 3 \N \N \N \N 1 4 4 4 4 4 0 5 \N \N \N \N 1 +6 \N \N \N \N 1 -- !1 -- 1 1 1 diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy index ba0c1766aa..9a96c3358e 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy @@ -331,5 +331,92 @@ suite("test_primary_key_partial_update_parallel", "p0") { qt_sql """ select id,name,score,test,dft,__DORIS_DELETE_SIGN__,__DORIS_VERSION_COL__,__DORIS_SEQUENCE_COL__ from ${tableName} order by id;""" sql """ DROP TABLE IF EXISTS ${tableName}; """ + + // case 5: partial update with delete sign in parallel + tableName = "test_primary_key_partial_update_delete_sign" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `id` int(11) NOT NULL COMMENT "用户 ID", + `name` varchar(65533) NULL COMMENT "用户姓名", + `score` int(11) NULL COMMENT "用户得分", + `test` int(11) NULL COMMENT "null test", + `dft` int(11) DEFAULT "4321") + UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true") + """ + + sql """insert into ${tableName} values + (2, "doris2", 2000, 223, 2), + (1, "doris", 1000, 123, 1), + (5, "doris5", 5000, 523, 5), + (4, "doris4", 4000, 423, 4), + (3, "doris3", 3000, 323, 3);""" + + t1 = Thread.startDaemon { + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set 'columns', 'id,name' + + file 'partial_update_parallel1.csv' + time 10000 // limit inflight 10s + } + } + + t2 = Thread.startDaemon { + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set 'columns', 'id,__DORIS_DELETE_SIGN__' + + file 'partial_update_parallel4.csv' + time 10000 // limit inflight 10s + } + } + + t3 = Thread.startDaemon { + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set 'columns', 'id,dft' + + file 'partial_update_parallel3.csv' + time 10000 // limit inflight 10s + } + } + + t1.join() + t2.join() + t3.join() + + // Delete again, to make sure the result is right + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set 'columns', 'id,__DORIS_DELETE_SIGN__' + + file 'partial_update_parallel4.csv' + time 10000 // limit inflight 10s + } + + sql "sync" + + // don't check result here, since the different finish order of t1/t2/t3 will + // generate different result, it's hard to check. + + sql """ DROP TABLE IF EXISTS ${tableName}; """ }