[Fix](inverted index) check inverted index file existence befor data compaction (#21173)
This commit is contained in:
@ -399,10 +399,17 @@ Status Compaction::do_compaction_impl(int64_t permits) {
|
||||
[&src_segment_num, &dest_segment_num, &index_writer_path, &src_index_files,
|
||||
&dest_index_files, &fs, &tablet_path, &trans_vec, &dest_segment_num_rows,
|
||||
this](int32_t column_uniq_id) {
|
||||
compact_column(
|
||||
auto st = compact_column(
|
||||
_cur_tablet_schema->get_inverted_index(column_uniq_id)->index_id(),
|
||||
src_segment_num, dest_segment_num, src_index_files, dest_index_files,
|
||||
fs, index_writer_path, tablet_path, trans_vec, dest_segment_num_rows);
|
||||
if (!st.ok()) {
|
||||
LOG(ERROR) << "failed to do index compaction"
|
||||
<< ". tablet=" << _tablet->full_name()
|
||||
<< ". column uniq id=" << column_uniq_id << ". index_id= "
|
||||
<< _cur_tablet_schema->get_inverted_index(column_uniq_id)
|
||||
->index_id();
|
||||
}
|
||||
});
|
||||
|
||||
LOG(INFO) << "succeed to do index compaction"
|
||||
@ -465,8 +472,37 @@ Status Compaction::construct_output_rowset_writer(RowsetWriterContext& ctx, bool
|
||||
//NOTE: here src_rs may be in building index progress, so it would not contain inverted index info.
|
||||
bool all_have_inverted_index = std::all_of(
|
||||
_input_rowsets.begin(), _input_rowsets.end(), [&](const auto& src_rs) {
|
||||
return src_rs->tablet_schema()->get_inverted_index(unique_id) !=
|
||||
nullptr;
|
||||
BetaRowsetSharedPtr rowset =
|
||||
std::static_pointer_cast<BetaRowset>(src_rs);
|
||||
if (rowset == nullptr) {
|
||||
return false;
|
||||
}
|
||||
auto fs = rowset->rowset_meta()->fs();
|
||||
|
||||
auto index_meta =
|
||||
rowset->tablet_schema()->get_inverted_index(unique_id);
|
||||
if (index_meta == nullptr) {
|
||||
return false;
|
||||
}
|
||||
for (auto i = 0; i < rowset->num_segments(); i++) {
|
||||
auto segment_file = rowset->segment_file_path(i);
|
||||
std::string inverted_index_src_file_path =
|
||||
InvertedIndexDescriptor::get_index_file_name(
|
||||
segment_file, index_meta->index_id());
|
||||
bool exists = false;
|
||||
if (fs->exists(inverted_index_src_file_path, &exists) !=
|
||||
Status::OK()) {
|
||||
LOG(ERROR)
|
||||
<< inverted_index_src_file_path << " fs->exists error";
|
||||
return false;
|
||||
}
|
||||
if (!exists) {
|
||||
LOG(WARNING) << inverted_index_src_file_path
|
||||
<< " is not exists, will skip index compaction";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
});
|
||||
if (all_have_inverted_index &&
|
||||
field_is_slice_type(_cur_tablet_schema->column_by_uid(unique_id).type())) {
|
||||
|
||||
@ -24,12 +24,12 @@
|
||||
|
||||
namespace doris {
|
||||
namespace segment_v2 {
|
||||
void compact_column(int32_t index_id, int src_segment_num, int dest_segment_num,
|
||||
std::vector<std::string> src_index_files,
|
||||
std::vector<std::string> dest_index_files, const io::FileSystemSPtr& fs,
|
||||
std::string index_writer_path, std::string tablet_path,
|
||||
std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec,
|
||||
std::vector<uint32_t> dest_segment_num_rows) {
|
||||
Status compact_column(int32_t index_id, int src_segment_num, int dest_segment_num,
|
||||
std::vector<std::string> src_index_files,
|
||||
std::vector<std::string> dest_index_files, const io::FileSystemSPtr& fs,
|
||||
std::string index_writer_path, std::string tablet_path,
|
||||
std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec,
|
||||
std::vector<uint32_t> dest_segment_num_rows) {
|
||||
lucene::store::Directory* dir =
|
||||
DorisCompoundDirectory::getDirectory(fs, index_writer_path.c_str(), false);
|
||||
auto index_writer = _CLNEW lucene::index::IndexWriter(dir, nullptr, true /* create */,
|
||||
@ -41,16 +41,6 @@ void compact_column(int32_t index_id, int src_segment_num, int dest_segment_num,
|
||||
// format: rowsetId_segmentId_indexId.idx
|
||||
std::string src_idx_full_name =
|
||||
src_index_files[i] + "_" + std::to_string(index_id) + ".idx";
|
||||
bool exists = false;
|
||||
auto st = fs->exists(src_idx_full_name, &exists);
|
||||
if (!st.ok()) {
|
||||
LOG(ERROR) << src_idx_full_name << " fs->exists error:" << st;
|
||||
return;
|
||||
}
|
||||
if (!exists) {
|
||||
LOG(WARNING) << src_idx_full_name << " is not exists, will stop index compaction ";
|
||||
return;
|
||||
}
|
||||
DorisCompoundReader* reader = new DorisCompoundReader(
|
||||
DorisCompoundDirectory::getDirectory(fs, tablet_path.c_str()),
|
||||
src_idx_full_name.c_str());
|
||||
@ -90,6 +80,7 @@ void compact_column(int32_t index_id, int src_segment_num, int dest_segment_num,
|
||||
|
||||
// delete temporary index_writer_path
|
||||
fs->delete_directory(index_writer_path.c_str());
|
||||
return Status::OK();
|
||||
}
|
||||
} // namespace segment_v2
|
||||
} // namespace doris
|
||||
|
||||
@ -25,11 +25,11 @@
|
||||
namespace doris {
|
||||
|
||||
namespace segment_v2 {
|
||||
void compact_column(int32_t index_id, int src_segment_num, int dest_segment_num,
|
||||
std::vector<std::string> src_index_files,
|
||||
std::vector<std::string> dest_index_files, const io::FileSystemSPtr& fs,
|
||||
std::string index_writer_path, std::string tablet_path,
|
||||
std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec,
|
||||
std::vector<uint32_t> dest_segment_num_rows);
|
||||
Status compact_column(int32_t index_id, int src_segment_num, int dest_segment_num,
|
||||
std::vector<std::string> src_index_files,
|
||||
std::vector<std::string> dest_index_files, const io::FileSystemSPtr& fs,
|
||||
std::string index_writer_path, std::string tablet_path,
|
||||
std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec,
|
||||
std::vector<uint32_t> dest_segment_num_rows);
|
||||
} // namespace segment_v2
|
||||
} // namespace doris
|
||||
|
||||
Reference in New Issue
Block a user