branch-2.1-pick: [Fix](merge-on-write) should calculate delete bitmaps between segments before skipping if tablet is in NOT_READY state in flush phase (#48056) (#48089)

pick https://github.com/apache/doris/pull/48056
Authored by bobhan1 on 2025-02-21 19:53:17 +08:00, committed by GitHub
parent 469bc77a42
commit bcd01c5f24
5 changed files with 240 additions and 7 deletions
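
The core of the fix is an ordering change in RowsetBuilder::submit_calc_delete_bitmap_task(): the early return for tablets in TABLET_NOTREADY state (i.e. under schema change) used to come before the between-segment delete-bitmap calculation, so a load that flushed multiple segments with duplicate keys during an alter never deduplicated them. Below is a minimal, self-contained sketch of the corrected control flow; the types here are simplified stand-ins for illustration, not the real Doris classes.

#include <cstdio>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

enum TabletState { TABLET_RUNNING, TABLET_NOTREADY };

// One flushed segment: a list of keys; duplicates may exist ACROSS segments.
using Segment = std::vector<std::string>;
// Stand-in "delete bitmap": (segment index, row index) pairs of superseded rows.
using DeleteBitmap = std::set<std::pair<int, int>>;

// When a newer segment writes a key again, mark the older occurrence deleted.
void calc_delete_bitmap_between_segments(const std::vector<Segment>& segments,
                                         DeleteBitmap* bitmap) {
    std::map<std::string, std::pair<int, int>> newest; // key -> newest position
    for (int seg = 0; seg < (int)segments.size(); ++seg) {
        for (int row = 0; row < (int)segments[seg].size(); ++row) {
            auto it = newest.find(segments[seg][row]);
            if (it != newest.end()) bitmap->insert(it->second); // older row loses
            newest[segments[seg][row]] = {seg, row};
        }
    }
}

// After the fix, the between-segment bitmap is computed BEFORE the NOT_READY
// early return, so a multi-segment load during schema change still
// deduplicates keys within the rowset it just wrote.
void submit_calc_delete_bitmap_task(TabletState state,
                                    const std::vector<Segment>& segments,
                                    DeleteBitmap* bitmap) {
    if (segments.size() > 1) {
        calc_delete_bitmap_between_segments(segments, bitmap);
    }
    if (state == TABLET_NOTREADY) {
        return; // bitmap against OTHER rowsets is calculated after conversion
    }
    // ... calculate delete bitmap against historical rowsets ...
}

int main() {
    std::vector<Segment> segments = {{"k1", "k2"}, {"k2", "k3"}}; // "k2" twice
    DeleteBitmap bitmap;
    submit_calc_delete_bitmap_task(TABLET_NOTREADY, segments, &bitmap);
    std::printf("rows marked deleted: %zu\n", bitmap.size()); // prints 1, not 0
}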

be/src/olap/memtable.cpp

@@ -35,6 +35,7 @@
#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "tablet_meta.h"
#include "util/debug_points.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "vec/aggregate_functions/aggregate_function_reader.h"
@@ -486,6 +487,7 @@ void MemTable::shrink_memtable_by_agg() {
}
bool MemTable::need_flush() const {
    DBUG_EXECUTE_IF("MemTable.need_flush", { return true; });
    auto max_size = config::write_buffer_size;
    if (_is_partial_update) {
        auto update_columns_size = _num_columns;
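
The new "MemTable.need_flush" debug point is what the regression test below enables to force each small write batch into its own segment. A rough, self-contained sketch of the debug-point pattern follows; the real registry lives in util/debug_points.h, and the class, macro, and threshold here are simplified assumptions for illustration.

#include <mutex>
#include <set>
#include <string>

// Simplified stand-in for Doris' debug-point registry.
class DebugPoints {
public:
    static DebugPoints& instance() {
        static DebugPoints dp;
        return dp;
    }
    void enable(const std::string& name) {
        std::lock_guard<std::mutex> l(_m);
        _active.insert(name);
    }
    bool is_active(const std::string& name) {
        std::lock_guard<std::mutex> l(_m);
        return _active.count(name) > 0;
    }

private:
    std::mutex _m;
    std::set<std::string> _active;
};

// Run `code` only when the named point has been enabled (tests enable points
// at runtime, e.g. via GetDebugPoint().enableDebugPointForAllBEs(...)).
#define DBUG_EXECUTE_IF(name, code) \
    do { if (DebugPoints::instance().is_active(name)) code; } while (0)

struct MemTable {
    size_t bytes_allocated = 0;
    bool need_flush() const {
        // With "MemTable.need_flush" enabled, every check requests a flush,
        // so each batch the sink writes becomes its own segment.
        DBUG_EXECUTE_IF("MemTable.need_flush", { return true; });
        return bytes_allocated >= 200 * 1024 * 1024; // write_buffer_size default
    }
};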

be/src/olap/rowset_builder.cpp

@@ -255,13 +255,6 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() {
    }
    std::lock_guard<std::mutex> l(_lock);
    SCOPED_TIMER(_submit_delete_bitmap_timer);
    // tablet is under alter process. The delete bitmap will be calculated after conversion.
    if (tablet()->tablet_state() == TABLET_NOTREADY) {
        LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
                     "tablet_id: "
                  << tablet()->tablet_id() << " txn_id: " << _req.txn_id;
        return Status::OK();
    }
    auto* beta_rowset = reinterpret_cast<BetaRowset*>(_rowset.get());
    std::vector<segment_v2::SegmentSharedPtr> segments;
    RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
@@ -271,6 +264,14 @@
                tablet()->calc_delete_bitmap_between_segments(_rowset, segments, _delete_bitmap));
    }
    // tablet is under alter process. The delete bitmap will be calculated after conversion.
    if (_tablet->tablet_state() == TABLET_NOTREADY) {
        LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
                     "tablet_id: "
                  << _tablet->tablet_id() << " txn_id: " << _req.txn_id;
        return Status::OK();
    }
    // For partial update, we need to fill in the entire row of data, during the calculation
    // of the delete bitmap. This operation is resource-intensive, and we need to minimize
    // the number of times it occurs. Therefore, we skip this operation here.

test_skip_calc_between_segments.out

@@ -0,0 +1,9 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
77777 77777 77777
88888 88888 88888
99999 99999 99999
-- !sql --
0

regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy

@@ -1543,6 +1543,74 @@ class Suite implements GroovyInterceptable {
        }
    }

    def get_be_param = { paramName ->
        def ipList = [:]
        def portList = [:]
        def backendId_to_params = [:]
        getBackendIpHttpPort(ipList, portList)
        for (String id in ipList.keySet()) {
            def beIp = ipList.get(id)
            def bePort = portList.get(id)
            // get the config value from be
            def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName))
            assert code == 0
            assert out.contains(paramName)
            // parsing
            def resultList = parseJson(out)[0]
            assert resultList.size() == 4
            // get original value
            def paramValue = resultList[2]
            backendId_to_params.put(id, paramValue)
        }
        logger.info("backendId_to_params: ${backendId_to_params}".toString())
        return backendId_to_params
    }

    def set_be_param = { paramName, paramValue ->
        def ipList = [:]
        def portList = [:]
        getBackendIpHttpPort(ipList, portList)
        for (String id in ipList.keySet()) {
            def beIp = ipList.get(id)
            def bePort = portList.get(id)
            logger.info("set be_id ${id} ${paramName} to ${paramValue}".toString())
            def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue))
            assert out.contains("OK")
        }
    }

    def set_original_be_param = { paramName, backendId_to_params ->
        def ipList = [:]
        def portList = [:]
        getBackendIpHttpPort(ipList, portList)
        for (String id in ipList.keySet()) {
            def beIp = ipList.get(id)
            def bePort = portList.get(id)
            def paramValue = backendId_to_params.get(id)
            logger.info("set be_id ${id} ${paramName} to ${paramValue}".toString())
            def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue))
            assert out.contains("OK")
        }
    }

    // apply the given BE configs for the duration of actionSupplier, then restore the originals
    void setBeConfigTemporary(Map<String, Object> tempConfig, Closure actionSupplier) {
        Map<String, Map<String, String>> originConf = Maps.newHashMap()
        tempConfig.each { k, v ->
            originConf.put(k, get_be_param(k))
        }
        try {
            tempConfig.each { k, v -> set_be_param(k, v) }
            actionSupplier()
        } catch (Exception e) {
            logger.info(e.getMessage())
            throw e
        } finally {
            originConf.each { k, confs ->
                set_original_be_param(k, confs)
            }
        }
    }

    void waiteCreateTableFinished(String tableName) {
        Thread.sleep(2000);
        String showCreateTable = "SHOW CREATE TABLE ${tableName}"

test_skip_calc_between_segments.groovy

@@ -0,0 +1,153 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_skip_calc_between_segments", "nonConcurrent") {
def table1 = "test_skip_calc_between_segments"
sql "DROP TABLE IF EXISTS ${table1} FORCE;"
sql """ CREATE TABLE IF NOT EXISTS ${table1} (
`k1` int NOT NULL,
`c1` int,
`c2` int
)UNIQUE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "true",
"disable_auto_compaction" = "true",
"replication_num" = "1"); """
sql "insert into ${table1} values(99999,99999,99999);"
sql "insert into ${table1} values(88888,88888,88888);"
sql "insert into ${table1} values(77777,77777,77777);"
sql "sync;"
qt_sql "select * from ${table1} order by k1;"
def block_sc = {
GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::_do_process_alter_tablet.block")
}
def unblock_sc = {
GetDebugPoint().disableDebugPointForAllBEs("SchemaChangeJob::_do_process_alter_tablet.block")
}
def block_publish = {
GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
}
def unblock_publish = {
GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
}
def checkSegmentNum = { rowsetNum, lastRowsetSegmentNum ->
def tablets = sql_return_maparray """ show tablets from ${table1}; """
logger.info("tablets: ${tablets}")
assertEquals(1, tablets.size())
String compactionUrl = tablets[0]["CompactionStatus"]
def (code, out, err) = curl("GET", compactionUrl)
logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def tabletJson = parseJson(out.trim())
assert tabletJson.rowsets instanceof List
assert tabletJson.rowsets.size() == rowsetNum + 1
def rowset = tabletJson.rowsets.get(tabletJson.rowsets.size() - 1)
logger.info("rowset: ${rowset}")
int start_index = rowset.indexOf("]")
int end_index = rowset.indexOf("DATA")
def segmentNumStr = rowset.substring(start_index + 1, end_index).trim()
logger.info("segmentNumStr: ${segmentNumStr}")
assert lastRowsetSegmentNum == Integer.parseInt(segmentNumStr)
}
// to cause multi segments
def customBeConfig = [
doris_scanner_row_bytes : 1
]
setBeConfigTemporary(customBeConfig) {
try {
// batch_size is 4164 in csv_reader.cpp
// _batch_size is 8192 in vtablet_writer.cpp
// to cause multi segments
GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush")
block_publish()
// block sc to let load skip to calculate delete bitmap in flush and commit phase
block_sc()
sql "alter table ${table1} modify column c1 varchar(100);"
Thread.sleep(3000)
def t1 = Thread.start {
// load data that will have multi segments and there are duplicate keys between segments
String content = ""
(1..4096).each {
content += "${it},${it},${it}\n"
}
content += content
streamLoad {
table "${table1}"
set 'column_separator', ','
inputStream new ByteArrayInputStream(content.getBytes())
time 30000 // limit inflight 10s
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(8192, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
}
}
}
Thread.sleep(2000)
// let sc finish and wait for tablet state to be RUNNING
unblock_sc()
waitForSchemaChangeDone {
sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1 """
time 1000
}
logger.info("wait for schema change done")
Thread.sleep(500)
unblock_publish()
t1.join()
// ensure that we really write multi segments
checkSegmentNum(4, 3)
qt_sql "select count() from (select k1,count() as cnt from ${table1} group by k1 having cnt > 1) A;"
} catch(Exception e) {
logger.info(e.getMessage())
throw e
} finally {
GetDebugPoint().clearDebugPointsForAllBEs()
GetDebugPoint().clearDebugPointsForAllFEs()
}
}
}