[fix](inverted index) reset fs_writer to nullptr before throwing an exception (#27202)

This commit is contained in:
qiye
2023-11-20 17:40:56 +08:00
committed by GitHub
parent 7164b7cebb
commit 273cbfc36c
4 changed files with 1171 additions and 13 deletions

View File

@ -22,6 +22,7 @@
#include "io/fs/file_reader.h"
#include "io/fs/file_writer.h"
#include "io/fs/path.h"
#include "util/debug_points.h"
#include "util/slice.h"
#ifdef _CL_HAVE_IO_H
@ -239,7 +240,7 @@ void DorisCompoundFileWriter::copyFile(const char* fileName, lucene::store::Inde
class DorisCompoundDirectory::FSIndexOutput : public lucene::store::BufferedIndexOutput {
private:
io::FileWriterPtr writer;
io::FileWriterPtr _writer;
protected:
void flushBuffer(const uint8_t* b, const int32_t size) override;
@ -379,9 +380,9 @@ void DorisCompoundDirectory::FSIndexInput::readInternal(uint8_t* b, const int32_
void DorisCompoundDirectory::FSIndexOutput::init(const io::FileSystemSPtr& fileSystem,
const char* path) {
Status status = fileSystem->create_file(path, &writer);
Status status = fileSystem->create_file(path, &_writer);
if (!status.ok()) {
writer.reset(nullptr);
_writer.reset(nullptr);
auto err = "Create compound file error: " + status.to_string();
LOG(WARNING) << err;
_CLTHROWA(CL_ERR_IO, err.c_str());
@ -389,9 +390,16 @@ void DorisCompoundDirectory::FSIndexOutput::init(const io::FileSystemSPtr& fileS
}
DorisCompoundDirectory::FSIndexOutput::~FSIndexOutput() {
if (writer) {
if (_writer) {
try {
FSIndexOutput::close();
DBUG_EXECUTE_IF(
"DorisCompoundDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_"
"destructor",
{
_CLTHROWA(CL_ERR_IO,
"debug point: test throw error in fsindexoutput destructor");
})
} catch (CLuceneError& err) {
//ignore errors...
LOG(WARNING) << "FSIndexOutput deconstruct error: " << err.what();
@ -400,14 +408,14 @@ DorisCompoundDirectory::FSIndexOutput::~FSIndexOutput() {
}
void DorisCompoundDirectory::FSIndexOutput::flushBuffer(const uint8_t* b, const int32_t size) {
if (writer != nullptr && b != nullptr && size > 0) {
if (_writer != nullptr && b != nullptr && size > 0) {
Slice data {b, (size_t)size};
Status st = writer->append(data);
Status st = _writer->append(data);
if (!st.ok()) {
LOG(WARNING) << "File IO Write error: " << st.to_string();
}
} else {
if (writer == nullptr) {
if (_writer == nullptr) {
LOG(WARNING) << "File writer is nullptr in DorisCompoundDirectory::FSIndexOutput, "
"ignore flush.";
} else if (b == nullptr) {
@ -420,35 +428,49 @@ void DorisCompoundDirectory::FSIndexOutput::flushBuffer(const uint8_t* b, const
void DorisCompoundDirectory::FSIndexOutput::close() {
try {
BufferedIndexOutput::close();
DBUG_EXECUTE_IF(
"DorisCompoundDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_"
"close",
{
_CLTHROWA(CL_ERR_IO,
"debug point: test throw error in bufferedindexoutput close");
})
} catch (CLuceneError& err) {
LOG(WARNING) << "FSIndexOutput close, BufferedIndexOutput close error: " << err.what();
if (err.number() == CL_ERR_IO) {
LOG(WARNING) << "FSIndexOutput close, BufferedIndexOutput close IO error: "
<< err.what();
}
_writer.reset(nullptr);
_CLTHROWA(err.number(), err.what());
}
if (writer) {
Status ret = writer->finalize();
if (_writer) {
Status ret = _writer->finalize();
DBUG_EXECUTE_IF("DorisCompoundDirectory::FSIndexOutput._set_writer_finalize_status_error",
{ ret = Status::Error<INTERNAL_ERROR>("writer finalize status error"); })
if (!ret.ok()) {
LOG(WARNING) << "FSIndexOutput close, file writer finalize error: " << ret.to_string();
_writer.reset(nullptr);
_CLTHROWA(CL_ERR_IO, ret.to_string().c_str());
}
ret = writer->close();
ret = _writer->close();
DBUG_EXECUTE_IF("DorisCompoundDirectory::FSIndexOutput._set_writer_close_status_error",
{ ret = Status::Error<INTERNAL_ERROR>("writer close status error"); })
if (!ret.ok()) {
LOG(WARNING) << "FSIndexOutput close, file writer close error: " << ret.to_string();
_writer.reset(nullptr);
_CLTHROWA(CL_ERR_IO, ret.to_string().c_str());
}
} else {
LOG(WARNING) << "File writer is nullptr, ignore finalize and close.";
}
writer = nullptr;
_writer.reset(nullptr);
}
int64_t DorisCompoundDirectory::FSIndexOutput::length() const {
CND_PRECONDITION(writer != nullptr, "file is not open");
CND_PRECONDITION(_writer != nullptr, "file is not open");
int64_t ret;
if (!writer->fs()->file_size(writer->path(), &ret).ok()) {
if (!_writer->fs()->file_size(_writer->path(), &ret).ok()) {
return -1;
}
return ret;

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,13 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
863
-- !sql --
863
-- !sql --
863
-- !sql --
863

View File

@ -0,0 +1,123 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Regression test for FSIndexOutput error handling in the inverted-index compound
// directory: each sub-case injects a failure via a BE debug point and verifies that
// a subsequent stream load still leaves the table queryable (no crash / no leaked
// writer). Marked "nonConcurrent" because debug points are enabled for ALL BEs and
// would interfere with concurrently running suites.
suite("test_index_compound_directory_failure_injection", "nonConcurrent") {
// define a sql table
def testTable_dup = "httplogs_dup_compound"
// Creates a duplicate-key table with four inverted indexes (one tokenized with the
// english parser) so that index compound files are actually written during load.
def create_httplogs_dup_table = {testTablex ->
// multi-line sql
def result = sql """
CREATE TABLE IF NOT EXISTS ${testTablex} (
`@timestamp` int(11) NULL,
`clientip` varchar(20) NULL,
`request` text NULL,
`status` int(11) NULL,
`size` int(11) NULL,
INDEX size_idx (`size`) USING INVERTED COMMENT '',
INDEX status_idx (`status`) USING INVERTED COMMENT '',
INDEX clientip_idx (`clientip`) USING INVERTED COMMENT '',
INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser"="english") COMMENT ''
) ENGINE=OLAP
DUPLICATE KEY(`@timestamp`)
COMMENT 'OLAP'
PARTITION BY RANGE(`@timestamp`)
(PARTITION p181998 VALUES [("-2147483648"), ("894225602")),
PARTITION p191998 VALUES [("894225602"), ("894830402")),
PARTITION p201998 VALUES [("894830402"), ("895435201")),
PARTITION p211998 VALUES [("895435201"), ("896040001")),
PARTITION p221998 VALUES [("896040001"), ("896644801")),
PARTITION p231998 VALUES [("896644801"), ("897249601")),
PARTITION p241998 VALUES [("897249601"), ("897854300")),
PARTITION p251998 VALUES [("897854300"), ("2147483647")))
DISTRIBUTED BY HASH(`@timestamp`) BUCKETS 12
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"storage_format" = "V2",
"compression" = "ZSTD",
"light_schema_change" = "true",
"disable_auto_compaction" = "false"
);
"""
}
// Stream-loads a JSON file into table_name. A fresh UUID suffix on the label lets the
// same logical load be retried across sub-cases without label conflicts.
// ignore_failure: when true and no expected row count is given, the result check is a no-op
//   (the load is allowed to fail because a failure was injected on purpose).
// expected_succ_rows >= 0 relaxes max_filter_ratio so partially-filtered loads succeed.
def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false,
expected_succ_rows = -1, load_to_single_tablet = 'true' ->
// load the json data
streamLoad {
table "${table_name}"
// set http request header params
set 'label', label + "_" + UUID.randomUUID().toString()
set 'read_json_by_line', read_flag
set 'format', format_flag
file file_name // import json file
time 10000 // limit inflight 10s
if (expected_succ_rows >= 0) {
set 'max_filter_ratio', '1'
}
// if declared a check callback, the default check condition will ignore.
// So you must check all condition
check { result, exception, startTime, endTime ->
if (ignore_failure && expected_succ_rows < 0) { return }
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
}
}
}
try {
sql "DROP TABLE IF EXISTS ${testTable_dup}"
create_httplogs_dup_table.call(testTable_dup)
// Case 1: CLuceneError thrown from the FSIndexOutput destructor; the debug point is
// always disabled in `finally` so a failed load cannot poison later sub-cases.
try {
GetDebugPoint().enableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor")
load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor")
}
// Verify the table is still queryable through the inverted index after the injection.
qt_sql "select COUNT() from ${testTable_dup} where request match 'images'"
// Case 2: CLuceneError thrown from BufferedIndexOutput::close inside FSIndexOutput::close.
try {
GetDebugPoint().enableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close")
load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close")
}
qt_sql "select COUNT() from ${testTable_dup} where request match 'images'"
// Case 3: file writer finalize() forced to return an error status.
try {
GetDebugPoint().enableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._set_writer_finalize_status_error")
load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._set_writer_finalize_status_error")
}
qt_sql "select COUNT() from ${testTable_dup} where request match 'images'"
// Case 4: file writer close() forced to return an error status.
try {
GetDebugPoint().enableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._set_writer_close_status_error")
load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._set_writer_close_status_error")
}
qt_sql "select COUNT() from ${testTable_dup} where request match 'images'"
} finally {
//try_sql("DROP TABLE IF EXISTS ${testTable}")
}
}