[Case](wal) Add wal group commit sink case with low disk space fault injection (#29731)
This commit is contained in:
@ -521,6 +521,7 @@ Status LoadBlockQueue::close_wal() {
|
||||
}
|
||||
|
||||
bool LoadBlockQueue::has_enough_wal_disk_space(size_t pre_allocated) {
|
||||
DBUG_EXECUTE_IF("LoadBlockQueue.has_enough_wal_disk_space.low_space", { return false; });
|
||||
auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr();
|
||||
size_t available_bytes = 0;
|
||||
{
|
||||
|
||||
@ -0,0 +1,66 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
suite("test_wal_mem_back_pressure_time_out_fault_injection","nonConcurrent") {
|
||||
|
||||
|
||||
def tableName = "wal_test"
|
||||
sql """ DROP TABLE IF EXISTS ${tableName} """
|
||||
|
||||
sql """
|
||||
CREATE TABLE IF NOT EXISTS ${tableName} (
|
||||
`k` int ,
|
||||
`v` int ,
|
||||
) engine=olap
|
||||
UNIQUE KEY(k)
|
||||
DISTRIBUTED BY HASH(`k`)
|
||||
BUCKETS 32
|
||||
properties("replication_num" = "1")
|
||||
"""
|
||||
|
||||
GetDebugPoint().clearDebugPointsForAllBEs()
|
||||
|
||||
sql """ set group_commit = async_mode; """
|
||||
try {
|
||||
GetDebugPoint().enableDebugPointForAllBEs("GroupCommitBlockSink._add_blocks.return_sync_mode")
|
||||
def t1 = []
|
||||
for (int i = 0; i < 20; i++) {
|
||||
t1.add(Thread.startDaemon {
|
||||
streamLoad {
|
||||
table "${tableName}1"
|
||||
|
||||
set 'column_separator', ','
|
||||
set 'compress_type', 'GZ'
|
||||
set 'format', 'csv'
|
||||
set 'group_commit', 'async_mode'
|
||||
unset 'label'
|
||||
|
||||
file 'test_low_wal_disk_space_fault_injection.csv.gz'
|
||||
time 600000
|
||||
}
|
||||
})
|
||||
}
|
||||
t1.join()
|
||||
} catch (Exception e) {
|
||||
logger.info(e.getMessage())
|
||||
// make sure there is no exception.
|
||||
assertFalse(true)
|
||||
} finally {
|
||||
GetDebugPoint().disableDebugPointForAllBEs("GroupCommitBlockSink._add_blocks.return_sync_mode")
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user