From da71fde0660d0cb4e78858fc0ba56caeb7eaed0f Mon Sep 17 00:00:00 2001 From: Kang Date: Fri, 24 Nov 2023 10:30:21 +0800 Subject: [PATCH] [fix](build index) Fix inverted index hardlink leak and missing problem (#26903) --- be/src/olap/rowset/beta_rowset.cpp | 52 +++- .../test_build_index_fault.out | 19 ++ .../test_build_index_fault.groovy | 224 ++++++++++++++++++ 3 files changed, 282 insertions(+), 13 deletions(-) create mode 100644 regression-test/data/fault_injection_p0/test_build_index_fault.out create mode 100644 regression-test/suites/fault_injection_p0/test_build_index_fault.groovy diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp index 5bc4a16e8b..67fbb6c020 100644 --- a/be/src/olap/rowset/beta_rowset.cpp +++ b/be/src/olap/rowset/beta_rowset.cpp @@ -41,6 +41,7 @@ #include "olap/rowset/segment_v2/inverted_index_desc.h" #include "olap/tablet_schema.h" #include "olap/utils.h" +#include "util/debug_points.h" #include "util/doris_metrics.h" namespace doris { @@ -208,21 +209,40 @@ Status BetaRowset::link_files_to(const std::string& dir, RowsetId new_rowset_id, if (fs->type() != io::FileSystemType::LOCAL) { return Status::InternalError("should be local file system"); } + + Status status; + std::vector linked_success_files; + Defer remove_linked_files {[&]() { // clear linked files if errors happen + if (!status.ok()) { + LOG(WARNING) << "will delete linked success files due to error " << status; + std::vector paths; + for (auto& file : linked_success_files) { + paths.emplace_back(file); + LOG(WARNING) << "will delete linked success file " << file << " due to error"; + } + static_cast(fs->batch_delete(paths)); + LOG(WARNING) << "done delete linked success files due to error " << status; + } + }}; + io::LocalFileSystem* local_fs = (io::LocalFileSystem*)fs.get(); for (int i = 0; i < num_segments(); ++i) { auto dst_path = segment_file_path(dir, new_rowset_id, i + new_rowset_start_seg_id); bool dst_path_exist = false; if (!fs->exists(dst_path, &dst_path_exist).ok() || dst_path_exist) { - return Status::Error( + status = Status::Error( "failed to create hard link, file already exist: {}", dst_path); + return status; } auto src_path = segment_file_path(i); // TODO(lingbin): how external storage support link? // use copy? or keep refcount to avoid being delete? if (!local_fs->link_file(src_path, dst_path).ok()) { - return Status::Error("fail to create hard link. from={}, to={}, errno={}", - src_path, dst_path, Errno::no()); + status = Status::Error("fail to create hard link. from={}, to={}, errno={}", + src_path, dst_path, Errno::no()); + return status; } + linked_success_files.push_back(dst_path); for (auto& index : _schema->indexes()) { if (index.index_type() != IndexType::INVERTED) { continue; @@ -236,25 +256,31 @@ Status BetaRowset::link_files_to(const std::string& dir, RowsetId new_rowset_id, InvertedIndexDescriptor::get_index_file_name(src_path, index_id); std::string inverted_index_dst_file_path = InvertedIndexDescriptor::get_index_file_name(dst_path, index_id); - bool need_to_link = true; - if (_schema->skip_write_index_on_load()) { - RETURN_IF_ERROR(local_fs->exists(inverted_index_src_file_path, &need_to_link)); - if (!need_to_link) { - LOG(INFO) << "skip create hard link to not existed file=" - << inverted_index_src_file_path; - } - } - if (need_to_link) { + bool index_file_exists = true; + RETURN_IF_ERROR(local_fs->exists(inverted_index_src_file_path, &index_file_exists)); + if (index_file_exists) { + DBUG_EXECUTE_IF( + "fault_inject::BetaRowset::link_files_to::_link_inverted_index_file", { + status = Status::Error( + "fault_inject link_file error from={}, to={}", + inverted_index_src_file_path, inverted_index_dst_file_path); + return status; + }); if (!local_fs->link_file(inverted_index_src_file_path, inverted_index_dst_file_path) .ok()) { - return Status::Error( + status = Status::Error( "fail to create hard link. from={}, to={}, errno={}", inverted_index_src_file_path, inverted_index_dst_file_path, Errno::no()); + return status; } + linked_success_files.push_back(inverted_index_dst_file_path); LOG(INFO) << "success to create hard link. from=" << inverted_index_src_file_path << ", " << "to=" << inverted_index_dst_file_path; + } else { + LOG(WARNING) << "skip create hard link to not existed index file=" + << inverted_index_src_file_path; } } } diff --git a/regression-test/data/fault_injection_p0/test_build_index_fault.out b/regression-test/data/fault_injection_p0/test_build_index_fault.out new file mode 100644 index 0000000000..543ca7ae5f --- /dev/null +++ b/regression-test/data/fault_injection_p0/test_build_index_fault.out @@ -0,0 +1,19 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !count1 -- +1000000 + +-- !count2 -- +1000000 + +-- !count3 -- +1000000 + +-- !count4 -- +1000000 + +-- !count5 -- +1000000 + +-- !count6 -- +1000000 + diff --git a/regression-test/suites/fault_injection_p0/test_build_index_fault.groovy b/regression-test/suites/fault_injection_p0/test_build_index_fault.groovy new file mode 100644 index 0000000000..3c8e7bc57f --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_build_index_fault.groovy @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +suite("test_build_index_fault", "inverted_index"){ + // prepare test table + def timeout = 60000 + def delta_time = 1000 + def alter_res = "null" + def useTime = 0 + + def wait_for_latest_op_on_table_finish = { table_name, OpTimeout -> + for(int t = delta_time; t <= OpTimeout; t += delta_time){ + alter_res = sql """SHOW ALTER TABLE COLUMN WHERE TableName = "${table_name}" ORDER BY CreateTime DESC LIMIT 1;""" + alter_res = alter_res.toString() + if(alter_res.contains("FINISHED")) { + sleep(3000) // wait change table state to normal + logger.info(table_name + " latest alter job finished, detail: " + alter_res) + break + } + useTime = t + sleep(delta_time) + } + assertTrue(useTime <= OpTimeout, "wait_for_latest_op_on_table_finish timeout") + } + + def wait_for_build_index_on_partition_finish = { table_name, OpTimeout -> + for(int t = delta_time; t <= OpTimeout; t += delta_time){ + alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}";""" + def expected_finished_num = alter_res.size(); + def finished_num = 0; + for (int i = 0; i < expected_finished_num; i++) { + logger.info(table_name + " build index job state: " + alter_res[i][7] + i) + if (alter_res[i][7] == "FINISHED") { + ++finished_num; + } + } + if (finished_num == expected_finished_num) { + logger.info(table_name + " all build index jobs finished, detail: " + alter_res) + break + } + useTime = t + sleep(delta_time) + } + assertTrue(useTime <= OpTimeout, "wait_for_latest_build_index_on_partition_finish timeout") + } + + def wait_for_last_build_index_on_table_finish = { table_name, OpTimeout -> + for(int t = delta_time; t <= OpTimeout; t += delta_time){ + alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}" ORDER BY JobId """ + + def last_job_state = alter_res[alter_res.size()-1][7]; + if (last_job_state == "FINISHED" || last_job_state == "CANCELLED") { + logger.info(table_name + " last index job finished, state: " + last_job_state + ", detail: " + alter_res) + return last_job_state; + } + useTime = t + sleep(delta_time) + } + assertTrue(useTime <= OpTimeout, "wait_for_last_build_index_on_table_finish timeout") + return "wait_timeout" + } + + def wait_for_last_build_index_on_table_running = { table_name, OpTimeout -> + for(int t = delta_time; t <= OpTimeout; t += delta_time){ + alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}" ORDER BY JobId """ + + def last_job_state = alter_res[alter_res.size()-1][7]; + if (last_job_state == "RUNNING") { + logger.info(table_name + " last index job running, state: " + last_job_state + ", detail: " + alter_res) + return last_job_state; + } + useTime = t + sleep(delta_time) + } + assertTrue(useTime <= OpTimeout, "wait_for_last_build_index_on_table_finish timeout") + return "wait_timeout" + } + + def tableName = "hackernews_1m" + + sql "DROP TABLE IF EXISTS ${tableName}" + // create 1 replica table + sql """ + CREATE TABLE ${tableName} ( + `id` bigint(20) NULL, + `deleted` tinyint(4) NULL, + `type` text NULL, + `author` text NULL, + `timestamp` datetime NULL, + `comment` text NULL, + `dead` tinyint(4) NULL, + `parent` bigint(20) NULL, + `poll` bigint(20) NULL, + `children` array NULL, + `url` text NULL, + `score` int(11) NULL, + `title` text NULL, + `parts` array NULL, + `descendants` int(11) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`id`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "is_being_synced" = "false", + "storage_format" = "V2", + "light_schema_change" = "true", + "disable_auto_compaction" = "false", + "enable_single_replica_compaction" = "false" + ); + """ + + // stream load data + streamLoad { + table "${tableName}" + + set 'compress_type', 'GZ' + + file """${getS3Url()}/regression/index/hacknernews_1m.csv.gz""" + + time 60000 // limit inflight 60s + + // stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(json.NumberTotalRows, json.NumberLoadedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + + sql "sync" + + // check data + qt_count1 """ SELECT COUNT() from ${tableName}; """ + + // ADD INDEX + sql """ ALTER TABLE ${tableName} ADD INDEX idx_comment (`comment`) USING INVERTED PROPERTIES("parser" = "english") """ + + wait_for_latest_op_on_table_finish(tableName, timeout) + + // BUILD INDEX and expect state is RUNNING + sql """ BUILD INDEX idx_comment ON ${tableName} """ + def state = wait_for_last_build_index_on_table_running(tableName, timeout) + def result = sql """ SHOW BUILD INDEX WHERE TableName = "${tableName}" ORDER BY JobId """ + assertEquals(result[result.size()-1][1], tableName) + assertTrue(result[result.size()-1][3].contains("ADD INDEX")) + assertEquals(result[result.size()-1][7], "RUNNING") + + // CANCEL BUILD INDEX and expect state is CANCELED + sql """ CANCEL BUILD INDEX ON ${tableName} (${result[result.size()-1][0]}) """ + result = sql """ SHOW BUILD INDEX WHERE TableName = "${tableName}" ORDER BY JobId """ + assertEquals(result[result.size()-1][1], tableName) + assertTrue(result[result.size()-1][3].contains("ADD INDEX")) + assertEquals(result[result.size()-1][7], "CANCELLED") + assertEquals(result[result.size()-1][8], "user cancelled") + // check data + qt_count2 """ SELECT COUNT() from ${tableName}; """ + + // BUILD INDEX and expect state is FINISHED + sql """ BUILD INDEX idx_comment ON ${tableName}; """ + state = wait_for_last_build_index_on_table_finish(tableName, timeout) + assertEquals(state, "FINISHED") + // check data + qt_count3 """ SELECT COUNT() from ${tableName}; """ + + // CANCEL BUILD INDEX in FINISHED state and expect exception + def success = false; + try { + sql """ CANCEL BUILD INDEX ON ${tableName}; """ + success = true + } catch(Exception ex) { + logger.info(" CANCEL BUILD INDEX ON ${tableName} exception: " + ex) + } + assertFalse(success) + + // BUILD INDEX again and expect state is FINISHED + sql """ BUILD INDEX idx_comment ON ${tableName}; """ + state = wait_for_last_build_index_on_table_finish(tableName, timeout) + assertEquals(state, "FINISHED") + // check data + qt_count4 """ SELECT COUNT() from ${tableName}; """ + + // BUILD INDEX with error injection + sql """ ALTER TABLE ${tableName} ADD INDEX idx_title (`title`) USING INVERTED """ + // enable error_inject for BetaRowset link inverted index file and expect state is RUNNGING + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::BetaRowset::link_files_to::_link_inverted_index_file") + sql """ BUILD INDEX idx_title ON ${tableName}; """ + state = wait_for_last_build_index_on_table_finish(tableName, timeout) + assertEquals(state, "wait_timeout") + // check data + qt_count5 """ SELECT COUNT() from ${tableName}; """ + + // disable error_inject for BetaRowset link inverted index file and expect state is FINISHED + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::BetaRowset::link_files_to::_link_inverted_index_file") + // timeout * 10 for possible fe schedule delay + state = wait_for_last_build_index_on_table_finish(tableName, timeout * 10) + assertEquals(state, "FINISHED") + // check data + qt_count6 """ SELECT COUNT() from ${tableName}; """ +}