[fix](build index) Fix inverted index hardlink leak and missing problem (#26903)

When BetaRowset::link_files_to failed partway through, hard links it had already created were left behind (the leak), and linking failed outright when a segment had no inverted index file on disk (the missing-file problem). This commit records every successfully linked file and deletes all of them through a Defer guard if any later step fails, checks that an inverted index file actually exists before linking it, and adds a debug point plus a regression test to exercise the failure path.
@@ -41,6 +41,7 @@
 #include "olap/rowset/segment_v2/inverted_index_desc.h"
 #include "olap/tablet_schema.h"
 #include "olap/utils.h"
+#include "util/debug_points.h"
 #include "util/doris_metrics.h"
 
 namespace doris {
@@ -208,21 +209,40 @@ Status BetaRowset::link_files_to(const std::string& dir, RowsetId new_rowset_id,
     if (fs->type() != io::FileSystemType::LOCAL) {
         return Status::InternalError("should be local file system");
     }
+
+    Status status;
+    std::vector<string> linked_success_files;
+    Defer remove_linked_files {[&]() { // clear linked files if errors happen
+        if (!status.ok()) {
+            LOG(WARNING) << "will delete linked success files due to error " << status;
+            std::vector<io::Path> paths;
+            for (auto& file : linked_success_files) {
+                paths.emplace_back(file);
+                LOG(WARNING) << "will delete linked success file " << file << " due to error";
+            }
+            static_cast<void>(fs->batch_delete(paths));
+            LOG(WARNING) << "done delete linked success files due to error " << status;
+        }
+    }};
+
     io::LocalFileSystem* local_fs = (io::LocalFileSystem*)fs.get();
     for (int i = 0; i < num_segments(); ++i) {
         auto dst_path = segment_file_path(dir, new_rowset_id, i + new_rowset_start_seg_id);
         bool dst_path_exist = false;
         if (!fs->exists(dst_path, &dst_path_exist).ok() || dst_path_exist) {
-            return Status::Error<FILE_ALREADY_EXIST>(
+            status = Status::Error<FILE_ALREADY_EXIST>(
                     "failed to create hard link, file already exist: {}", dst_path);
+            return status;
         }
         auto src_path = segment_file_path(i);
         // TODO(lingbin): how external storage support link?
         // use copy? or keep refcount to avoid being delete?
         if (!local_fs->link_file(src_path, dst_path).ok()) {
-            return Status::Error<OS_ERROR>("fail to create hard link. from={}, to={}, errno={}",
-                                           src_path, dst_path, Errno::no());
+            status = Status::Error<OS_ERROR>("fail to create hard link. from={}, to={}, errno={}",
+                                             src_path, dst_path, Errno::no());
+            return status;
         }
+        linked_success_files.push_back(dst_path);
         for (auto& index : _schema->indexes()) {
            if (index.index_type() != IndexType::INVERTED) {
                continue;
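The hunk above hinges on the scope-guard pattern: status and linked_success_files are captured by reference in the Defer lambda, so any early "return status;" after a partial failure triggers the cleanup automatically. Below is a minimal self-contained sketch of that pattern, with a hand-rolled Defer in place of Doris's helper and std::filesystem in place of its io::FileSystem; every name outside the pattern itself is illustrative, not Doris code.

#include <filesystem>
#include <fstream>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

namespace fs = std::filesystem;

// Minimal stand-in for Doris's Defer helper: runs a callback at scope exit.
struct Defer {
    std::function<void()> fn;
    ~Defer() { fn(); }
};

// Hard-link every file in `srcs` into `dst_dir`. On any failure, remove the
// links already created so no orphans are left behind (the "leak" this
// commit fixes).
bool link_all_or_nothing(const std::vector<fs::path>& srcs, const fs::path& dst_dir) {
    bool ok = true;
    std::vector<fs::path> linked_success_files;
    Defer rollback{[&] {
        if (!ok) { // clear linked files if errors happen, as in the patch
            for (const auto& f : linked_success_files) {
                std::error_code ec;
                fs::remove(f, ec); // best-effort delete, like batch_delete
                std::cerr << "rolled back " << f << '\n';
            }
        }
    }};
    for (const auto& src : srcs) {
        const fs::path dst = dst_dir / src.filename();
        std::error_code ec;
        fs::create_hard_link(src, dst, ec); // fails if src is missing or dst exists
        if (ec) {
            ok = false; // the guard sees this through the captured reference
            return false;
        }
        linked_success_files.push_back(dst);
    }
    return true;
}

int main() {
    fs::create_directories("src_dir");
    fs::create_directories("dst_dir");
    std::ofstream("src_dir/a.dat") << "a";
    // The second source does not exist, so the link of a.dat is rolled back.
    bool ok = link_all_or_nothing({"src_dir/a.dat", "src_dir/missing.dat"}, "dst_dir");
    std::cout << (ok ? "linked" : "rolled back") << '\n';
}

This is also why the patch rewrites every "return Status::Error<...>(...)" as "status = ...; return status;": the guard only fires when status has recorded the failure before the function returns.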
@@ -236,25 +256,31 @@ Status BetaRowset::link_files_to(const std::string& dir, RowsetId new_rowset_id,
                     InvertedIndexDescriptor::get_index_file_name(src_path, index_id);
             std::string inverted_index_dst_file_path =
                     InvertedIndexDescriptor::get_index_file_name(dst_path, index_id);
             bool need_to_link = true;
             if (_schema->skip_write_index_on_load()) {
                 RETURN_IF_ERROR(local_fs->exists(inverted_index_src_file_path, &need_to_link));
                 if (!need_to_link) {
                     LOG(INFO) << "skip create hard link to not existed file="
                               << inverted_index_src_file_path;
                 }
             }
             if (need_to_link) {
+                bool index_file_exists = true;
+                RETURN_IF_ERROR(local_fs->exists(inverted_index_src_file_path, &index_file_exists));
+                if (index_file_exists) {
+                    DBUG_EXECUTE_IF(
+                            "fault_inject::BetaRowset::link_files_to::_link_inverted_index_file", {
+                                status = Status::Error<OS_ERROR>(
+                                        "fault_inject link_file error from={}, to={}",
+                                        inverted_index_src_file_path, inverted_index_dst_file_path);
+                                return status;
+                            });
                     if (!local_fs->link_file(inverted_index_src_file_path, inverted_index_dst_file_path)
                                  .ok()) {
-                        return Status::Error<OS_ERROR>(
+                        status = Status::Error<OS_ERROR>(
                                 "fail to create hard link. from={}, to={}, errno={}",
                                 inverted_index_src_file_path, inverted_index_dst_file_path,
                                 Errno::no());
+                        return status;
                     }
+                    linked_success_files.push_back(inverted_index_dst_file_path);
                     LOG(INFO) << "success to create hard link. from=" << inverted_index_src_file_path
                               << ", "
                               << "to=" << inverted_index_dst_file_path;
+                } else {
+                    LOG(WARNING) << "skip create hard link to not existed index file="
+                                 << inverted_index_src_file_path;
+                }
             }
         }
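The new link is guarded by a debug point named "fault_inject::BetaRowset::link_files_to::_link_inverted_index_file", which the regression test below toggles at runtime. As a toy sketch of how a macro of this shape can work, the following consults a registry of active point names; this is only an illustration under that assumption, not Doris's actual util/debug_points.h implementation.

#include <iostream>
#include <mutex>
#include <string>
#include <unordered_set>

// Toy debug-point registry: a thread-safe set of currently enabled names.
class DebugPoints {
public:
    static DebugPoints& instance() {
        static DebugPoints dp;
        return dp;
    }
    void enable(const std::string& name) {
        std::lock_guard<std::mutex> l(_mu);
        _active.insert(name);
    }
    void disable(const std::string& name) {
        std::lock_guard<std::mutex> l(_mu);
        _active.erase(name);
    }
    bool is_active(const std::string& name) {
        std::lock_guard<std::mutex> l(_mu);
        return _active.count(name) > 0;
    }
private:
    std::mutex _mu;
    std::unordered_set<std::string> _active;
};

// The injected block executes only while the named point is enabled, so the
// fault path costs one lookup when the point is off.
#define DBUG_EXECUTE_IF(name, code) \
    if (DebugPoints::instance().is_active(name)) code

int main() {
    DebugPoints::instance().enable("fault_inject::demo");
    DBUG_EXECUTE_IF("fault_inject::demo", {
        std::cout << "fault injected, pretend link_file failed\n";
        return 1; // like the patch: record an error status and return early
    });
    return 0;
}

In the test, GetDebugPoint().enableDebugPointForAllBEs(...) flips the equivalent switch on every backend, so the BUILD INDEX job keeps failing (and the Defer guard keeps cleaning up the partial links) until the point is disabled again.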
@@ -0,0 +1,19 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !count1 --
1000000

-- !count2 --
1000000

-- !count3 --
1000000

-- !count4 --
1000000

-- !count5 --
1000000

-- !count6 --
1000000
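The expected-result file above pairs with the regression test that follows: each "-- !countN --" block is the expected output of the matching qt_countN query, and all six counts stay at 1000000 because adding, cancelling, and fault-injecting the index build must never lose or duplicate rows.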
@@ -0,0 +1,224 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.


suite("test_build_index_fault", "inverted_index") {
    // prepare test table
    def timeout = 60000
    def delta_time = 1000
    def alter_res = "null"
    def useTime = 0

    def wait_for_latest_op_on_table_finish = { table_name, OpTimeout ->
        for(int t = delta_time; t <= OpTimeout; t += delta_time){
            alter_res = sql """SHOW ALTER TABLE COLUMN WHERE TableName = "${table_name}" ORDER BY CreateTime DESC LIMIT 1;"""
            alter_res = alter_res.toString()
            if(alter_res.contains("FINISHED")) {
                sleep(3000) // wait for the table state to change back to normal
                logger.info(table_name + " latest alter job finished, detail: " + alter_res)
                break
            }
            useTime = t
            sleep(delta_time)
        }
        assertTrue(useTime <= OpTimeout, "wait_for_latest_op_on_table_finish timeout")
    }

    def wait_for_build_index_on_partition_finish = { table_name, OpTimeout ->
        for(int t = delta_time; t <= OpTimeout; t += delta_time){
            alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}";"""
            def expected_finished_num = alter_res.size();
            def finished_num = 0;
            for (int i = 0; i < expected_finished_num; i++) {
                logger.info(table_name + " build index job state: " + alter_res[i][7] + i)
                if (alter_res[i][7] == "FINISHED") {
                    ++finished_num;
                }
            }
            if (finished_num == expected_finished_num) {
                logger.info(table_name + " all build index jobs finished, detail: " + alter_res)
                break
            }
            useTime = t
            sleep(delta_time)
        }
        assertTrue(useTime <= OpTimeout, "wait_for_build_index_on_partition_finish timeout")
    }

    def wait_for_last_build_index_on_table_finish = { table_name, OpTimeout ->
        for(int t = delta_time; t <= OpTimeout; t += delta_time){
            alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}" ORDER BY JobId """

            def last_job_state = alter_res[alter_res.size()-1][7];
            if (last_job_state == "FINISHED" || last_job_state == "CANCELLED") {
                logger.info(table_name + " last index job finished, state: " + last_job_state + ", detail: " + alter_res)
                return last_job_state;
            }
            useTime = t
            sleep(delta_time)
        }
        assertTrue(useTime <= OpTimeout, "wait_for_last_build_index_on_table_finish timeout")
        return "wait_timeout"
    }

    def wait_for_last_build_index_on_table_running = { table_name, OpTimeout ->
        for(int t = delta_time; t <= OpTimeout; t += delta_time){
            alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}" ORDER BY JobId """

            def last_job_state = alter_res[alter_res.size()-1][7];
            if (last_job_state == "RUNNING") {
                logger.info(table_name + " last index job running, state: " + last_job_state + ", detail: " + alter_res)
                return last_job_state;
            }
            useTime = t
            sleep(delta_time)
        }
        assertTrue(useTime <= OpTimeout, "wait_for_last_build_index_on_table_running timeout")
        return "wait_timeout"
    }

    def tableName = "hackernews_1m"

    sql "DROP TABLE IF EXISTS ${tableName}"
    // create a 1-replica table
    sql """
        CREATE TABLE ${tableName} (
            `id` bigint(20) NULL,
            `deleted` tinyint(4) NULL,
            `type` text NULL,
            `author` text NULL,
            `timestamp` datetime NULL,
            `comment` text NULL,
            `dead` tinyint(4) NULL,
            `parent` bigint(20) NULL,
            `poll` bigint(20) NULL,
            `children` array<bigint(20)> NULL,
            `url` text NULL,
            `score` int(11) NULL,
            `title` text NULL,
            `parts` array<int(11)> NULL,
            `descendants` int(11) NULL
        ) ENGINE=OLAP
        DUPLICATE KEY(`id`)
        COMMENT 'OLAP'
        DISTRIBUTED BY HASH(`id`) BUCKETS 10
        PROPERTIES (
            "replication_allocation" = "tag.location.default: 1",
            "is_being_synced" = "false",
            "storage_format" = "V2",
            "light_schema_change" = "true",
            "disable_auto_compaction" = "false",
            "enable_single_replica_compaction" = "false"
        );
    """

    // stream load data
    streamLoad {
        table "${tableName}"

        set 'compress_type', 'GZ'

        file """${getS3Url()}/regression/index/hacknernews_1m.csv.gz"""

        time 60000 // limit inflight 60s

        // By default the stream load action checks the result itself, including
        // the Success status and NumberTotalRows == NumberLoadedRows.

        // Once a check callback is declared, those default checks are skipped,
        // so all conditions must be verified here.
        check { result, exception, startTime, endTime ->
            if (exception != null) {
                throw exception
            }
            log.info("Stream load result: ${result}".toString())
            def json = parseJson(result)
            assertEquals("success", json.Status.toLowerCase())
            assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
            assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
        }
    }

    sql "sync"

    // check data
    qt_count1 """ SELECT COUNT() from ${tableName}; """

    // ADD INDEX
    sql """ ALTER TABLE ${tableName} ADD INDEX idx_comment (`comment`) USING INVERTED PROPERTIES("parser" = "english") """
    wait_for_latest_op_on_table_finish(tableName, timeout)

    // BUILD INDEX and expect state is RUNNING
    sql """ BUILD INDEX idx_comment ON ${tableName} """
    def state = wait_for_last_build_index_on_table_running(tableName, timeout)
    def result = sql """ SHOW BUILD INDEX WHERE TableName = "${tableName}" ORDER BY JobId """
    assertEquals(result[result.size()-1][1], tableName)
    assertTrue(result[result.size()-1][3].contains("ADD INDEX"))
    assertEquals(result[result.size()-1][7], "RUNNING")

    // CANCEL BUILD INDEX and expect state is CANCELLED
    sql """ CANCEL BUILD INDEX ON ${tableName} (${result[result.size()-1][0]}) """
    result = sql """ SHOW BUILD INDEX WHERE TableName = "${tableName}" ORDER BY JobId """
    assertEquals(result[result.size()-1][1], tableName)
    assertTrue(result[result.size()-1][3].contains("ADD INDEX"))
    assertEquals(result[result.size()-1][7], "CANCELLED")
    assertEquals(result[result.size()-1][8], "user cancelled")
    // check data
    qt_count2 """ SELECT COUNT() from ${tableName}; """

    // BUILD INDEX and expect state is FINISHED
    sql """ BUILD INDEX idx_comment ON ${tableName}; """
    state = wait_for_last_build_index_on_table_finish(tableName, timeout)
    assertEquals(state, "FINISHED")
    // check data
    qt_count3 """ SELECT COUNT() from ${tableName}; """

    // CANCEL BUILD INDEX in FINISHED state and expect an exception
    def success = false;
    try {
        sql """ CANCEL BUILD INDEX ON ${tableName}; """
        success = true
    } catch(Exception ex) {
        logger.info(" CANCEL BUILD INDEX ON ${tableName} exception: " + ex)
    }
    assertFalse(success)

    // BUILD INDEX again and expect state is FINISHED
    sql """ BUILD INDEX idx_comment ON ${tableName}; """
    state = wait_for_last_build_index_on_table_finish(tableName, timeout)
    assertEquals(state, "FINISHED")
    // check data
    qt_count4 """ SELECT COUNT() from ${tableName}; """

    // BUILD INDEX with error injection
    sql """ ALTER TABLE ${tableName} ADD INDEX idx_title (`title`) USING INVERTED """
    // enable error injection for BetaRowset linking the inverted index file; the job should stay RUNNING until timeout
    GetDebugPoint().enableDebugPointForAllBEs("fault_inject::BetaRowset::link_files_to::_link_inverted_index_file")
    sql """ BUILD INDEX idx_title ON ${tableName}; """
    state = wait_for_last_build_index_on_table_finish(tableName, timeout)
    assertEquals(state, "wait_timeout")
    // check data
    qt_count5 """ SELECT COUNT() from ${tableName}; """

    // disable error injection for BetaRowset linking the inverted index file and expect state is FINISHED
    GetDebugPoint().disableDebugPointForAllBEs("fault_inject::BetaRowset::link_files_to::_link_inverted_index_file")
    // timeout * 10 for possible FE schedule delay
    state = wait_for_last_build_index_on_table_finish(tableName, timeout * 10)
    assertEquals(state, "FINISHED")
    // check data
    qt_count6 """ SELECT COUNT() from ${tableName}; """
}