(improvement)[inverted-index] add and optimize checks when IO error occurs. (#24167)
When a disk io error occurs, errors may occur when reading and writing files in the inverted index. This PR adds error checking to prevent empty files from being generated.
This commit is contained in:
@ -69,6 +69,12 @@
|
||||
#define PATH_DELIMITERA "/"
|
||||
#endif
|
||||
|
||||
#define LOG_AND_THROW_IF_ERROR(status, msg) \
|
||||
if (!status.ok()) { \
|
||||
auto err = std::string(msg) + ": " + status.to_string(); \
|
||||
LOG(WARNING) << err; \
|
||||
_CLTHROWA(CL_ERR_IO, err.c_str()); \
|
||||
}
|
||||
namespace doris {
|
||||
namespace segment_v2 {
|
||||
|
||||
@ -119,7 +125,7 @@ void DorisCompoundFileWriter::writeCompoundFile() {
|
||||
auto out_idx = ram_dir.createOutput(idx_name.c_str());
|
||||
if (out_idx == nullptr) {
|
||||
LOG(WARNING) << "Write compound file error: RAMDirectory output is nullptr.";
|
||||
return;
|
||||
_CLTHROWA(CL_ERR_IO, "Create RAMDirectory output error");
|
||||
}
|
||||
|
||||
std::unique_ptr<lucene::store::IndexOutput> ram_output(out_idx);
|
||||
@ -151,7 +157,7 @@ void DorisCompoundFileWriter::writeCompoundFile() {
|
||||
auto out = out_dir->createOutput(idx_name.c_str());
|
||||
if (out == nullptr) {
|
||||
LOG(WARNING) << "Write compound file error: CompoundDirectory output is nullptr.";
|
||||
return;
|
||||
_CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error");
|
||||
}
|
||||
std::unique_ptr<lucene::store::IndexOutput> output(out);
|
||||
output->writeVInt(file_count);
|
||||
@ -416,11 +422,11 @@ void DorisCompoundDirectory::FSIndexOutput::close() {
|
||||
BufferedIndexOutput::close();
|
||||
} catch (CLuceneError& err) {
|
||||
LOG(WARNING) << "FSIndexOutput close, BufferedIndexOutput close error: " << err.what();
|
||||
if (err.number() != CL_ERR_IO) {
|
||||
if (err.number() == CL_ERR_IO) {
|
||||
LOG(WARNING) << "FSIndexOutput close, BufferedIndexOutput close IO error: "
|
||||
<< err.what();
|
||||
throw;
|
||||
}
|
||||
_CLTHROWA(err.number(), err.what());
|
||||
}
|
||||
if (writer) {
|
||||
Status ret = writer->finalize();
|
||||
@ -486,12 +492,7 @@ void DorisCompoundDirectory::init(const io::FileSystemSPtr& _fs, const char* _pa
|
||||
return;
|
||||
}
|
||||
bool exists = false;
|
||||
Status status = fs->exists(directory, &exists);
|
||||
if (!status.ok()) {
|
||||
auto err = "File system error: " + status.to_string();
|
||||
LOG(WARNING) << err;
|
||||
_CLTHROWA(CL_ERR_IO, err.c_str());
|
||||
}
|
||||
LOG_AND_THROW_IF_ERROR(fs->exists(directory, &exists), "Doris compound directory init IO error")
|
||||
if (!exists) {
|
||||
auto e = "Doris compound directory init error: " + directory + " is not a directory";
|
||||
LOG(WARNING) << e;
|
||||
@ -539,7 +540,7 @@ bool DorisCompoundDirectory::list(std::vector<std::string>* names) const {
|
||||
priv_getFN(fl, "");
|
||||
std::vector<io::FileInfo> files;
|
||||
bool exists;
|
||||
RETURN_IF_ERROR(fs->list(fl, true, &files, &exists));
|
||||
LOG_AND_THROW_IF_ERROR(fs->list(fl, true, &files, &exists), "List file IO error");
|
||||
for (auto& file : files) {
|
||||
names->push_back(file.file_name);
|
||||
}
|
||||
@ -551,7 +552,7 @@ bool DorisCompoundDirectory::fileExists(const char* name) const {
|
||||
char fl[CL_MAX_DIR];
|
||||
priv_getFN(fl, name);
|
||||
bool exists = false;
|
||||
fs->exists(fl, &exists);
|
||||
LOG_AND_THROW_IF_ERROR(fs->exists(fl, &exists), "File exists IO error")
|
||||
return exists;
|
||||
}
|
||||
|
||||
@ -586,9 +587,10 @@ DorisCompoundDirectory* DorisCompoundDirectory::getDirectory(
|
||||
const char* file = _file;
|
||||
|
||||
bool exists = false;
|
||||
_fs->exists(file, &exists);
|
||||
LOG_AND_THROW_IF_ERROR(_fs->exists(file, &exists), "Get directory exists IO error")
|
||||
if (!exists) {
|
||||
_fs->create_directory(file);
|
||||
LOG_AND_THROW_IF_ERROR(_fs->create_directory(file),
|
||||
"Get directory create directory IO error")
|
||||
}
|
||||
|
||||
dir = _CLNEW DorisCompoundDirectory();
|
||||
@ -615,9 +617,7 @@ void DorisCompoundDirectory::touchFile(const char* name) {
|
||||
snprintf(buffer, CL_MAX_DIR, "%s%s%s", directory.c_str(), PATH_DELIMITERA, name);
|
||||
|
||||
io::FileWriterPtr tmp_writer;
|
||||
if (!fs->create_file(buffer, &tmp_writer).ok()) {
|
||||
_CLTHROWA(CL_ERR_IO, "IO Error while touching file");
|
||||
}
|
||||
LOG_AND_THROW_IF_ERROR(fs->create_file(buffer, &tmp_writer), "Touch file IO error")
|
||||
}
|
||||
|
||||
int64_t DorisCompoundDirectory::fileLength(const char* name) const {
|
||||
@ -625,7 +625,7 @@ int64_t DorisCompoundDirectory::fileLength(const char* name) const {
|
||||
char buffer[CL_MAX_DIR];
|
||||
priv_getFN(buffer, name);
|
||||
int64_t size = -1;
|
||||
RETURN_IF_ERROR(fs->file_size(buffer, &size));
|
||||
LOG_AND_THROW_IF_ERROR(fs->file_size(buffer, &size), "Get file size IO error");
|
||||
return size;
|
||||
}
|
||||
|
||||
@ -652,21 +652,16 @@ bool DorisCompoundDirectory::doDeleteFile(const char* name) {
|
||||
CND_PRECONDITION(directory[0] != 0, "directory is not open");
|
||||
char fl[CL_MAX_DIR];
|
||||
priv_getFN(fl, name);
|
||||
return fs->delete_file(fl).ok();
|
||||
LOG_AND_THROW_IF_ERROR(fs->delete_file(fl), "Delete file IO error")
|
||||
return true;
|
||||
}
|
||||
|
||||
bool DorisCompoundDirectory::deleteDirectory() {
|
||||
CND_PRECONDITION(directory[0] != 0, "directory is not open");
|
||||
char fl[CL_MAX_DIR];
|
||||
priv_getFN(fl, "");
|
||||
Status status = fs->delete_directory(fl);
|
||||
if (!status.ok()) {
|
||||
char* err = _CL_NEWARRAY(
|
||||
char, 16 + status.to_string().length() + 1); //16: len of "couldn't delete "
|
||||
strcpy(err, "couldn't delete directory: ");
|
||||
strcat(err, status.to_string().c_str());
|
||||
_CLTHROWA_DEL(CL_ERR_IO, err);
|
||||
}
|
||||
LOG_AND_THROW_IF_ERROR(fs->delete_directory(fl),
|
||||
fmt::format("Delete directory {} IO error", fl))
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -680,23 +675,12 @@ void DorisCompoundDirectory::renameFile(const char* from, const char* to) {
|
||||
priv_getFN(nu, to);
|
||||
|
||||
bool exists = false;
|
||||
fs->exists(nu, &exists);
|
||||
LOG_AND_THROW_IF_ERROR(fs->exists(nu, &exists), "File exists IO error")
|
||||
if (exists) {
|
||||
if (!fs->delete_directory(nu).ok()) {
|
||||
char* err = _CL_NEWARRAY(char, 16 + strlen(to) + 1); //16: len of "couldn't delete "
|
||||
strcpy(err, "couldn't delete ");
|
||||
strcat(err, to);
|
||||
_CLTHROWA_DEL(CL_ERR_IO, err);
|
||||
}
|
||||
}
|
||||
if (rename(old, nu) != 0) {
|
||||
char buffer[20 + CL_MAX_PATH + CL_MAX_PATH];
|
||||
strcpy(buffer, "couldn't rename ");
|
||||
strcat(buffer, from);
|
||||
strcat(buffer, " to ");
|
||||
strcat(buffer, nu);
|
||||
_CLTHROWA(CL_ERR_IO, buffer);
|
||||
LOG_AND_THROW_IF_ERROR(fs->delete_directory(nu), fmt::format("Delete {} IO error", nu))
|
||||
}
|
||||
LOG_AND_THROW_IF_ERROR(fs->rename_dir(old, nu),
|
||||
fmt::format("Rename {} to {} IO error", old, nu))
|
||||
}
|
||||
|
||||
lucene::store::IndexOutput* DorisCompoundDirectory::createOutput(const char* name) {
|
||||
@ -704,19 +688,11 @@ lucene::store::IndexOutput* DorisCompoundDirectory::createOutput(const char* nam
|
||||
char fl[CL_MAX_DIR];
|
||||
priv_getFN(fl, name);
|
||||
bool exists = false;
|
||||
auto status = fs->exists(fl, &exists);
|
||||
if (!status.ok()) {
|
||||
LOG(WARNING) << "Doris compound directory create output error: " << status.to_string();
|
||||
return nullptr;
|
||||
}
|
||||
LOG_AND_THROW_IF_ERROR(fs->exists(fl, &exists), "Create output file exists IO error")
|
||||
if (exists) {
|
||||
if (!fs->delete_file(fl).ok()) {
|
||||
char tmp[1024];
|
||||
strcpy(tmp, "Cannot overwrite: ");
|
||||
strcat(tmp, name);
|
||||
_CLTHROWA(CL_ERR_IO, tmp);
|
||||
}
|
||||
fs->exists(fl, &exists);
|
||||
LOG_AND_THROW_IF_ERROR(fs->delete_file(fl),
|
||||
fmt::format("Create output delete file {} IO error", fl))
|
||||
LOG_AND_THROW_IF_ERROR(fs->exists(fl, &exists), "Create output file exists IO error")
|
||||
assert(!exists);
|
||||
}
|
||||
auto ret = _CLNEW FSIndexOutput();
|
||||
|
||||
@ -133,6 +133,12 @@ DorisCompoundReader::DorisCompoundReader(lucene::store::Directory* d, const char
|
||||
entries(_CLNEW EntriesType(true, true)) {
|
||||
bool success = false;
|
||||
try {
|
||||
if (dir->fileLength(name) == 0) {
|
||||
LOG(WARNING) << "CompoundReader open failed, index file " << name << " is empty.";
|
||||
_CLTHROWA(CL_ERR_IO,
|
||||
fmt::format("CompoundReader open failed, index file {} is empty", name)
|
||||
.c_str());
|
||||
}
|
||||
stream = dir->openInput(name, readBufferSize);
|
||||
|
||||
int32_t count = stream->readVInt();
|
||||
|
||||
@ -472,6 +472,9 @@ public:
|
||||
if (data_out != nullptr && meta_out != nullptr && index_out != nullptr) {
|
||||
_bkd_writer->meta_finish(meta_out, _bkd_writer->finish(data_out, index_out),
|
||||
int(field_type));
|
||||
} else {
|
||||
LOG(WARNING) << "Inverted index writer create output error occurred: nullptr";
|
||||
_CLTHROWA(CL_ERR_IO, "Create output error with nullptr");
|
||||
}
|
||||
FINALIZE_OUTPUT(meta_out)
|
||||
FINALIZE_OUTPUT(data_out)
|
||||
|
||||
Reference in New Issue
Block a user