[fix](group commit) Pick make group commit cancel in time (#36249) (#37404)

pick https://github.com/apache/doris/pull/36249/
This commit is contained in:
meiyi
2024-07-09 09:25:11 +08:00
committed by GitHub
parent 1a25270918
commit 1e3ab0ff8c
2 changed files with 56 additions and 1 deletions

View File

@ -142,7 +142,7 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
<< ", runtime_state=" << runtime_state;
}
}
_get_cond.wait_for(l, std::chrono::milliseconds(left_milliseconds));
_get_cond.wait_for(l, std::chrono::milliseconds(std::min(left_milliseconds, 10000L)));
}
if (runtime_state->is_cancelled()) {
auto st = Status::Cancelled<false>(runtime_state->cancel_reason());

View File

@ -0,0 +1,55 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_group_commit_timeout", "nonConcurrent") {
def tableName = "test_group_commit_timeout"
sql """
CREATE TABLE if not exists ${tableName} (
`id` int(11) NOT NULL,
`name` varchar(100) NULL,
`score` int(11) NULL default "-1"
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
"group_commit_interval_ms" = "300000"
);
"""
def query_timeout = sql """show variables where variable_name = 'query_timeout';"""
def insert_timeout = sql """show variables where variable_name = 'insert_timeout';"""
logger.info("query_timeout: ${query_timeout}, insert_timeout: ${insert_timeout}")
long start = System.currentTimeMillis()
try {
sql "SET global query_timeout = 5"
sql "SET global insert_timeout = 5"
sql "set group_commit = sync_mode"
sql "insert into ${tableName} values(1, 'a', 10)"
assertTrue(false)
} catch (Exception e) {
long end = System.currentTimeMillis()
logger.info("failed " + e.getMessage())
assertTrue(e.getMessage().contains("FragmentMgr cancel worker going to cancel timeout instance") || e.getMessage().contains("CANCELLED"))
assertTrue(end - start <= 60000)
} finally {
sql "SET global query_timeout = ${query_timeout[0][1]}"
sql "SET global insert_timeout = ${insert_timeout[0][1]}"
}
}