[improve](move-memtable) cancel load immediately when back pressure in delta writer v2 (#29280)

This commit is contained in:
HHoflittlefish777
2023-12-30 10:45:06 +08:00
committed by GitHub
parent 94cbabf675
commit 51cb15d032
5 changed files with 132 additions and 12 deletions

View File

@ -65,16 +65,18 @@ namespace doris {
using namespace ErrorCode;
std::unique_ptr<DeltaWriterV2> DeltaWriterV2::open(
WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& streams) {
WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& streams,
RuntimeState* state) {
std::unique_ptr<DeltaWriterV2> writer(
new DeltaWriterV2(req, streams, StorageEngine::instance()));
new DeltaWriterV2(req, streams, StorageEngine::instance(), state));
return writer;
}
DeltaWriterV2::DeltaWriterV2(WriteRequest* req,
const std::vector<std::shared_ptr<LoadStreamStub>>& streams,
StorageEngine* storage_engine)
: _req(*req),
StorageEngine* storage_engine, RuntimeState* state)
: _state(state),
_req(*req),
_tablet_schema(new TabletSchema),
_memtable_writer(new MemTableWriter(*req)),
_streams(streams) {}
@ -158,8 +160,13 @@ Status DeltaWriterV2::write(const vectorized::Block* block, const std::vector<ui
}
{
SCOPED_RAW_TIMER(&_wait_flush_limit_time);
while (_memtable_writer->flush_running_count() >=
config::memtable_flush_running_count_limit) {
auto memtable_flush_running_count_limit = config::memtable_flush_running_count_limit;
DBUG_EXECUTE_IF("DeltaWriterV2.write.back_pressure",
{ memtable_flush_running_count_limit = 0; });
while (_memtable_writer->flush_running_count() >= memtable_flush_running_count_limit) {
if (_state->is_cancelled()) {
return Status::Cancelled(_state->cancel_reason());
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}

View File

@ -64,7 +64,8 @@ class Block;
class DeltaWriterV2 {
public:
static std::unique_ptr<DeltaWriterV2> open(
WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& streams);
WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& streams,
RuntimeState* state);
~DeltaWriterV2();
@ -88,7 +89,7 @@ public:
private:
DeltaWriterV2(WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& streams,
StorageEngine* storage_engine);
StorageEngine* storage_engine, RuntimeState* state);
void _build_current_tablet_schema(int64_t index_id,
const OlapTableSchemaParam* table_schema_param,
@ -96,6 +97,8 @@ private:
void _update_profile(RuntimeProfile* profile);
RuntimeState* _state = nullptr;
bool _is_init = false;
bool _is_cancelled = false;
WriteRequest _req;

View File

@ -438,7 +438,7 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
break;
}
}
return DeltaWriterV2::open(&req, streams);
return DeltaWriterV2::open(&req, streams, _state);
});
{
SCOPED_TIMER(_wait_mem_limit_timer);

View File

@ -56,9 +56,13 @@ TEST_F(DeltaWriterV2PoolTest, test_map) {
auto map = pool.get_or_create(load_id);
EXPECT_EQ(1, pool.size());
WriteRequest req;
auto writer = map->get_or_create(100, [&req]() { return DeltaWriterV2::open(&req, {}); });
auto writer2 = map->get_or_create(101, [&req]() { return DeltaWriterV2::open(&req, {}); });
auto writer3 = map->get_or_create(100, [&req]() { return DeltaWriterV2::open(&req, {}); });
RuntimeState state;
auto writer = map->get_or_create(
100, [&req, &state]() { return DeltaWriterV2::open(&req, {}, &state); });
auto writer2 = map->get_or_create(
101, [&req, &state]() { return DeltaWriterV2::open(&req, {}, &state); });
auto writer3 = map->get_or_create(
100, [&req, &state]() { return DeltaWriterV2::open(&req, {}, &state); });
EXPECT_EQ(2, map->size());
EXPECT_EQ(writer, writer3);
EXPECT_NE(writer, writer2);

View File

@ -0,0 +1,106 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import org.codehaus.groovy.runtime.IOGroovyMethods
import org.apache.doris.regression.util.Http
suite("test_delta_writer_v2_back_pressure_fault_injection", "nonConcurrent") {
sql """ set enable_memtable_on_sink_node=true """
sql """ DROP TABLE IF EXISTS `baseall` """
sql """ DROP TABLE IF EXISTS `test` """
sql """
CREATE TABLE IF NOT EXISTS `baseall` (
`k0` boolean null comment "",
`k1` tinyint(4) null comment "",
`k2` smallint(6) null comment "",
`k3` int(11) null comment "",
`k4` bigint(20) null comment "",
`k5` decimal(9, 3) null comment "",
`k6` char(5) null comment "",
`k10` date null comment "",
`k11` datetime null comment "",
`k7` varchar(20) null comment "",
`k8` double max null comment "",
`k9` float sum null comment "",
`k12` string replace null comment "",
`k13` largeint(40) replace null comment ""
) engine=olap
DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
"""
sql """
CREATE TABLE IF NOT EXISTS `test` (
`k0` boolean null comment "",
`k1` tinyint(4) null comment "",
`k2` smallint(6) null comment "",
`k3` int(11) null comment "",
`k4` bigint(20) null comment "",
`k5` decimal(9, 3) null comment "",
`k6` char(5) null comment "",
`k10` date null comment "",
`k11` datetime null comment "",
`k7` varchar(20) null comment "",
`k8` double max null comment "",
`k9` float sum null comment "",
`k12` string replace_if_not_null null comment "",
`k13` largeint(40) replace null comment ""
) engine=olap
DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
"""
GetDebugPoint().clearDebugPointsForAllBEs()
streamLoad {
table "baseall"
db "regression_test_fault_injection_p0"
set 'column_separator', ','
file "baseall.txt"
}
try {
GetDebugPoint().enableDebugPointForAllBEs("DeltaWriterV2.write.back_pressure")
def thread1 = new Thread({
try {
def res = sql "insert into test select * from baseall where k1 <= 3"
logger.info(res.toString())
} catch(Exception e) {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains("Communications link failure"))
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DeltaWriterV2.write.back_pressure")
}
})
thread1.start()
sleep(1000)
def processList = sql "show processlist"
logger.info(processList.toString())
processList.each { item ->
logger.info(item[1].toString())
logger.info(item[11].toString())
if (item[11].toString() == "insert into test select * from baseall where k1 <= 3".toString()){
def res = sql "kill ${item[1]}"
logger.info(res.toString())
}
}
} catch(Exception e) {
logger.info(e.getMessage())
}
sql """ DROP TABLE IF EXISTS `baseall` """
sql """ DROP TABLE IF EXISTS `test` """
sql """ set enable_memtable_on_sink_node=false """
}