branch-2.1-pick: [Fix](merge-on-write) should re-calculate delete bitmaps between segments if BE restart before publish (#48775) (#48873)
pick https://github.com/apache/doris/pull/48775
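In short: on a merge-on-write unique-key table, a load that produces a multi-segment rowset computes delete bitmaps between its own segments before publish. If the BE restarts after the load is committed but before the transaction is published, that cross-segment result is lost, and publishing without it can leave duplicate keys in the table. This pick writes a sentinel entry into the delete bitmap when the cross-segment calculation completes, and teaches the publish path to re-run the calculation whenever the sentinel is missing. The hunks below are, in order: the sentinel write in Tablet::calc_delete_bitmap_between_segments, the new DeleteBitmap::has_calculated_for_multi_segments helper and its header declaration, the recovery check in TxnManager::publish_txn, and a docker regression test with its expected output.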
@@ -4109,6 +4109,9 @@ Status Tablet::calc_delete_bitmap_between_segments(
     RETURN_IF_ERROR(calculator.calculate_all(delete_bitmap));

+    delete_bitmap->add(
+            {rowset_id, DeleteBitmap::INVALID_SEGMENT_ID, DeleteBitmap::TEMP_VERSION_COMMON},
+            DeleteBitmap::ROWSET_SENTINEL_MARK);
     LOG(INFO) << fmt::format(
             "construct delete bitmap between segments, "
             "tablet: {}, rowset: {}, number of segments: {}, bitmap size: {}, cost {} (us)",
@@ -1141,6 +1141,10 @@ void DeleteBitmap::merge(const DeleteBitmap& other) {
     }
 }

+bool DeleteBitmap::has_calculated_for_multi_segments(const RowsetId& rowset_id) const {
+    return contains({rowset_id, INVALID_SEGMENT_ID, TEMP_VERSION_COMMON}, ROWSET_SENTINEL_MARK);
+}
+
 // We cannot just copy the underlying memory to construct a string
 // due to equivalent objects may have different padding bytes.
 // Reading padding bytes is undefined behavior, neither copy nor
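The sentinel reuses the bitmap's own keyspace: a key built from INVALID_SEGMENT_ID and TEMP_VERSION_COMMON can never collide with a real (rowset, segment, version) entry, so checking for it tells the publish path whether the bitmap in hand still contains the cross-segment result or whether that result was lost with the restart. Below is a minimal, self-contained sketch of the idea with simplified stand-in types; the real DeleteBitmap wraps roaring bitmaps, and the actual constants and key layout live in tablet_meta.h.

#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <tuple>

// Simplified stand-ins for the Doris types and constants.
using RowsetId = std::string;
using BitmapKey = std::tuple<RowsetId, uint32_t, int64_t>; // (rowset, segment, version)

constexpr uint32_t INVALID_SEGMENT_ID = UINT32_MAX;   // never a real segment id
constexpr int64_t TEMP_VERSION_COMMON = 0;            // placeholder version used pre-publish
constexpr uint32_t ROWSET_SENTINEL_MARK = UINT32_MAX; // row id used only by the sentinel

class SimpleDeleteBitmap {
public:
    // Record that row `row_id` under `key` is marked deleted.
    void add(const BitmapKey& key, uint32_t row_id) { _map[key].insert(row_id); }

    bool contains(const BitmapKey& key, uint32_t row_id) const {
        auto it = _map.find(key);
        return it != _map.end() && it->second.count(row_id) > 0;
    }

    // Mirrors DeleteBitmap::has_calculated_for_multi_segments: the sentinel
    // entry exists only if the cross-segment calculation ran to completion.
    bool has_calculated_for_multi_segments(const RowsetId& rowset_id) const {
        return contains({rowset_id, INVALID_SEGMENT_ID, TEMP_VERSION_COMMON},
                        ROWSET_SENTINEL_MARK);
    }

private:
    std::map<BitmapKey, std::set<uint32_t>> _map;
};

int main() {
    SimpleDeleteBitmap bm;
    RowsetId rs = "rowset_1";
    bm.add({rs, /*segment*/ 0, TEMP_VERSION_COMMON}, /*row*/ 42); // a real delete
    // Written at the end of calc_delete_bitmap_between_segments:
    bm.add({rs, INVALID_SEGMENT_ID, TEMP_VERSION_COMMON}, ROWSET_SENTINEL_MARK);
    return bm.has_calculated_for_multi_segments(rs) ? 0 : 1;     // 0: sentinel found
}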
@@ -507,6 +507,8 @@ public:
      */
     std::shared_ptr<roaring::Roaring> get_agg(const BitmapKey& bmk) const;

+    bool has_calculated_for_multi_segments(const RowsetId& rowset_id) const;
+
     class AggCachePolicy : public LRUCachePolicyTrackingManual {
     public:
         AggCachePolicy(size_t capacity)
@@ -38,6 +38,7 @@
 #include "olap/delta_writer.h"
 #include "olap/olap_common.h"
 #include "olap/partial_update_info.h"
+#include "olap/rowset/beta_rowset.h"
 #include "olap/rowset/pending_rowset_helper.h"
 #include "olap/rowset/rowset_meta.h"
 #include "olap/rowset/rowset_meta_manager.h"
@@ -531,6 +532,16 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
     // update delete_bitmap
     if (tablet_txn_info->unique_key_merge_on_write) {
         int64_t t2 = MonotonicMicros();
+        if (rowset->num_segments() > 1 &&
+            !tablet_txn_info->delete_bitmap->has_calculated_for_multi_segments(
+                    rowset->rowset_id())) {
+            // delete bitmap is empty, should re-calculate delete bitmaps between segments
+            std::vector<segment_v2::SegmentSharedPtr> segments;
+            RETURN_IF_ERROR(std::static_pointer_cast<BetaRowset>(rowset)->load_segments(&segments));
+            RETURN_IF_ERROR(tablet->calc_delete_bitmap_between_segments(
+                    rowset, segments, tablet_txn_info->delete_bitmap));
+        }
+
         RETURN_IF_ERROR(tablet->update_delete_bitmap(tablet_txn_info.get(), transaction_id));
         int64_t t3 = MonotonicMicros();
         stats->calc_delete_bitmap_time_us = t3 - t2;
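At publish time the recovery check reduces to a small predicate: only a rowset with more than one segment can hold duplicate keys between its own segments, and a missing sentinel means the bitmap in hand was not produced by a completed cross-segment calculation. A sketch of that decision, reusing the simplified types from the sketch above:

#include <cstddef>

// Returns true when publish must redo the cross-segment calculation
// before applying the delete bitmap, as in the hunk above.
bool needs_cross_segment_recalc(std::size_t num_segments,
                                const SimpleDeleteBitmap& bm,
                                const RowsetId& rowset_id) {
    return num_segments > 1 && !bm.has_calculated_for_multi_segments(rowset_id);
}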
@@ -0,0 +1,12 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+77777 77777 77777
+88888 88888 88888
+99999 99999 99999
+
+-- !sql --
+4099
+
+-- !dup_key_count --
+0
+
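The expected output is easy to audit: the table starts with three seed rows (77777, 88888, 99999); the stream load then writes keys 1..4096 twice (the payload is deliberately duplicated to create duplicate keys across segments), which merge-on-write must collapse to 4096 distinct rows, so count() is 4096 + 3 = 4099, and the dup_key_count query must return 0.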
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.Date
+import java.text.SimpleDateFormat
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.http.HttpResponse
+import org.apache.http.client.methods.HttpPut
+import org.apache.http.impl.client.CloseableHttpClient
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.entity.ContentType
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.config.RequestConfig
+import org.apache.http.client.RedirectStrategy
+import org.apache.http.protocol.HttpContext
+import org.apache.http.HttpRequest
+import org.apache.http.impl.client.LaxRedirectStrategy
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.util.EntityUtils
+
+suite("test_local_multi_segments_re_calc_in_publish", "docker") {
+
+    def dbName = context.config.getDbNameByFile(context.file)
+
+    def options = new ClusterOptions()
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.cloudMode = false
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'sys_log_verbose_modules=org',
+        'heartbeat_interval_second=1'
+    ]
+    options.beConfigs += [
+        'doris_scanner_row_bytes=1' // to cause multi segments
+    ]
+    options.enableDebugPoints()
+
+    docker(options) {
+        GetDebugPoint().clearDebugPointsForAllFEs()
+        GetDebugPoint().clearDebugPointsForAllBEs()
+
+        def fe = cluster.getFeByIndex(1)
+
+        def table1 = "test_local_multi_segments_re_calc_in_publish"
+        sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+        sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+                `k1` int NOT NULL,
+                `c1` int,
+                `c2` int
+            )UNIQUE KEY(k1)
+            DISTRIBUTED BY HASH(k1) BUCKETS 1
+            PROPERTIES (
+                "enable_unique_key_merge_on_write" = "true",
+                "disable_auto_compaction" = "true",
+                "replication_num" = "1"); """
+
+        sql "insert into ${table1} values(99999,99999,99999);"
+        sql "insert into ${table1} values(88888,88888,88888);"
+        sql "insert into ${table1} values(77777,77777,77777);"
+        sql "sync;"
+        qt_sql "select * from ${table1} order by k1;"
+
+        def do_streamload_2pc_commit = { txnId ->
+            def command = "curl -X PUT --location-trusted -u root:" +
+                    " -H txn_id:${txnId}" +
+                    " -H txn_operation:commit" +
+                    " http://${fe.host}:${fe.httpPort}/api/${dbName}/${table1}/_stream_load_2pc"
+            log.info("http_stream execute 2pc: ${command}")
+
+            def process = command.execute()
+            def code = process.waitFor()
+            def out = process.text
+            def json2pc = parseJson(out)
+            log.info("http_stream 2pc result: ${out}".toString())
+            assert code == 0
+            assert "success" == json2pc.status.toLowerCase()
+        }
+
+        def beNodes = sql_return_maparray("show backends;")
+        def tabletStat = sql_return_maparray("show tablets from ${table1};").get(0)
+        def tabletBackendId = tabletStat.BackendId
+        def tabletId = tabletStat.TabletId
+        def be1
+        for (def be : beNodes) {
+            if (be.BackendId == tabletBackendId) {
+                be1 = be
+            }
+        }
+        logger.info("tablet ${tabletId} on backend ${be1.Host} with backendId=${be1.BackendId}");
+        logger.info("backends: ${cluster.getBackends()}")
+        int beIndex = 1
+        for (def backend : cluster.getBackends()) {
+            if (backend.host == be1.Host) {
+                beIndex = backend.index
+                break
+            }
+        }
+        assert cluster.getBeByIndex(beIndex).backendId as String == tabletBackendId
+
+        try {
+            // batch_size is 4164 in csv_reader.cpp
+            // _batch_size is 8192 in vtablet_writer.cpp
+            // to cause multi segments
+            GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush")
+
+
+            String txnId
+            // load data that will have multi segments and there are duplicate keys between segments
+            String content = ""
+            (1..4096).each {
+                content += "${it},${it},${it}\n"
+            }
+            content += content
+            streamLoad {
+                table "${table1}"
+                set 'column_separator', ','
+                inputStream new ByteArrayInputStream(content.getBytes())
+                set 'two_phase_commit', 'true'
+                time 30000
+
+                check { result, exception, startTime, endTime ->
+                    if (exception != null) {
+                        throw exception
+                    }
+                    def json = parseJson(result)
+                    logger.info(result)
+                    txnId = json.TxnId
+                    assert "success" == json.Status.toLowerCase()
+                }
+            }
+
+            // restart be, so that load will re-calculate delete bitmaps in publish phase
+            Thread.sleep(1000)
+            cluster.stopBackends(1)
+            Thread.sleep(1000)
+            cluster.startBackends(beIndex)
+
+            Thread.sleep(1000)
+            do_streamload_2pc_commit(txnId)
+            dockerAwaitUntil(30) {
+                def result = sql_return_maparray "show transaction from ${dbName} where id = ${txnId}"
+                result[0].TransactionStatus as String == "VISIBLE"
+            }
+
+            qt_sql "select count() from ${table1};"
+            qt_dup_key_count "select count() from (select k1,count() as cnt from ${table1} group by k1 having cnt > 1) A;"
+
+        } catch(Exception e) {
+            logger.info(e.getMessage())
+            throw e
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllFEs()
+            GetDebugPoint().clearDebugPointsForAllBEs()
+        }
+    }
+}
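The test drives exactly the failure window the fix targets: doris_scanner_row_bytes=1 together with the MemTable.need_flush debug point forces the single stream load into multiple segments with duplicate keys between them; the load is left precommitted via two_phase_commit, the BE holding the tablet is restarted so that the delete bitmap computed at load time is lost, and only then is the 2PC commit issued. Publish therefore has to take the new re-calculation path before the transaction can become VISIBLE, and the final two queries confirm no duplicate keys survived.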