Refine filtering external files by pattern option
This commit is contained in:
@ -120,68 +120,152 @@ int ObExternalDataAccessDriver::pread(void *buf, const int64_t count, const int6
|
||||
return ret;
|
||||
}
|
||||
|
||||
class ObExternalFileListArrayOpWithFilter : public ObBaseDirEntryOperator
|
||||
{
|
||||
public:
|
||||
ObExternalFileListArrayOpWithFilter(ObIArray <common::ObString>& name_array,
|
||||
ObExternalPathFilter *filter,
|
||||
ObIAllocator& array_allocator)
|
||||
: name_array_(name_array), filter_(filter), allocator_(array_allocator) {}
|
||||
int func(const dirent *entry) {
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(entry)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
OB_LOG(WARN, "invalid list entry, entry is null");
|
||||
} else if (OB_ISNULL(entry->d_name)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
OB_LOG(WARN, "invalid list entry, d_name is null");
|
||||
} else {
|
||||
const ObString file_name(entry->d_name);
|
||||
ObString tmp_file;
|
||||
bool is_filtered = false;
|
||||
if (!file_name.empty() && file_name[file_name.length() - 1] != '/') {
|
||||
if (OB_NOT_NULL(filter_) && OB_FAIL(filter_->is_filtered(file_name, is_filtered))) {
|
||||
LOG_WARN("fail check is filtered", K(ret));
|
||||
} else if (!is_filtered) {
|
||||
if (OB_FAIL(ob_write_string(allocator_, file_name, tmp_file, true))) {
|
||||
OB_LOG(WARN, "fail to save file name", K(ret), K(file_name));
|
||||
} else if (OB_FAIL(name_array_.push_back(tmp_file))) {
|
||||
OB_LOG(WARN, "fail to push filename to array", K(ret), K(tmp_file));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
private:
|
||||
ObIArray <ObString>& name_array_;
|
||||
ObExternalPathFilter *filter_;
|
||||
ObIAllocator& allocator_;
|
||||
};
|
||||
|
||||
class ObLocalFileListArrayOpWithFilter : public ObBaseDirEntryOperator
|
||||
{
|
||||
public:
|
||||
ObLocalFileListArrayOpWithFilter(ObIArray <common::ObString> &name_array,
|
||||
const ObString &path,
|
||||
const ObString &origin_path,
|
||||
ObExternalPathFilter *filter,
|
||||
ObIAllocator &array_allocator)
|
||||
: name_array_(name_array), path_(path), origin_path_(origin_path),
|
||||
filter_(filter), allocator_(array_allocator) {}
|
||||
int func(const dirent *entry)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(entry)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
OB_LOG(WARN, "invalid list entry, entry is null");
|
||||
} else if (OB_ISNULL(entry->d_name)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
OB_LOG(WARN, "invalid list entry, d_name is null");
|
||||
} else {
|
||||
const ObString file_name(entry->d_name);
|
||||
ObSqlString full_path;
|
||||
ObString tmp_file;
|
||||
bool is_filtered = false;
|
||||
ObString cur_path = path_;
|
||||
if (file_name.case_compare(".") == 0
|
||||
|| file_name.case_compare("..") == 0) {
|
||||
//do nothing
|
||||
} else if (OB_FAIL(full_path.assign(cur_path))) {
|
||||
OB_LOG(WARN, "assign string failed", K(ret));
|
||||
} else if (full_path.length() > 0 && *(full_path.ptr() + full_path.length() - 1) != '/' &&
|
||||
OB_FAIL(full_path.append("/"))) {
|
||||
OB_LOG(WARN, "append failed", K(ret)) ;
|
||||
} else if (OB_FAIL(full_path.append(file_name))) {
|
||||
OB_LOG(WARN, "append file name failed", K(ret));
|
||||
} else if (OB_NOT_NULL(filter_) && OB_FAIL(filter_->is_filtered(full_path.string(), is_filtered))) {
|
||||
LOG_WARN("fail check is filtered", K(ret));
|
||||
} else if (!is_filtered) {
|
||||
ObString target = full_path.string();
|
||||
if (!is_dir_scan()) {
|
||||
target += origin_path_.length();
|
||||
if (!target.empty() && '/' == target[0]) {
|
||||
target += 1;
|
||||
}
|
||||
}
|
||||
if (target.empty()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("empty dir or name", K(full_path), K(origin_path_));
|
||||
} else if (OB_FAIL(ob_write_string(allocator_, target, tmp_file))) {
|
||||
OB_LOG(WARN, "fail to save file name", K(ret), K(file_name));
|
||||
} else if (OB_FAIL(name_array_.push_back(tmp_file))) {
|
||||
OB_LOG(WARN, "fail to push filename to array", K(ret), K(tmp_file));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
private:
|
||||
ObIArray <ObString> &name_array_;
|
||||
const ObString &path_;
|
||||
const ObString &origin_path_;
|
||||
ObExternalPathFilter *filter_;
|
||||
ObIAllocator &allocator_;
|
||||
};
|
||||
|
||||
|
||||
int ObExternalDataAccessDriver::get_file_list(const ObString &path,
|
||||
const ObString &pattern,
|
||||
const ObExprRegexpSessionVariables ®exp_vars,
|
||||
ObIArray<ObString> &file_urls,
|
||||
ObIAllocator &allocator)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t MAX_VISIT_COUNT = 100000;
|
||||
ObArray<ObString> file_dirs;
|
||||
ObExprRegexContext regexp_ctx;
|
||||
ObExternalPathFilter filter(regexp_ctx, allocator);
|
||||
|
||||
if (OB_ISNULL(device_handle_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObExternalDataAccessDriver not init", K(ret));
|
||||
} else if (!pattern.empty() && OB_FAIL(filter.init(pattern, regexp_vars))) {
|
||||
LOG_WARN("fail to init filter", K(ret));
|
||||
} else if (get_storage_type() == OB_STORAGE_OSS
|
||||
|| get_storage_type() == OB_STORAGE_COS) {
|
||||
ObSEArray<ObString, 16> temp_file_urls;
|
||||
ObFileListArrayOp file_op(temp_file_urls, allocator);
|
||||
ObExternalFileListArrayOpWithFilter file_op(file_urls, pattern.empty() ? NULL : &filter, allocator);
|
||||
if (OB_FAIL(device_handle_->scan_dir(to_cstring(path), file_op))) {
|
||||
LOG_WARN("scan dir failed", K(ret));
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < temp_file_urls.count(); i++) {
|
||||
if (temp_file_urls.at(i).length() <= 0) {
|
||||
//do nothing
|
||||
} else if ( '/' == *(temp_file_urls.at(i).ptr() + temp_file_urls.at(i).length() - 1)) {
|
||||
//is direcotry
|
||||
} else {
|
||||
OZ (file_urls.push_back(temp_file_urls.at(i)));
|
||||
}
|
||||
}
|
||||
LOG_DEBUG("show oss files", K(file_urls), K(file_dirs));
|
||||
} else if (get_storage_type() == OB_STORAGE_FILE) {
|
||||
ObSEArray<ObString, 4> file_dirs;
|
||||
OZ (file_dirs.push_back(path));
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < file_dirs.count(); i++) {
|
||||
ObString file_dir = file_dirs.at(i);
|
||||
ObFullPathArrayOp dir_op(file_dirs, file_dir, allocator);
|
||||
ObFullPathArrayOp file_op(file_urls, file_dir, allocator);
|
||||
ObLocalFileListArrayOpWithFilter dir_op(file_dirs, file_dir, path, NULL, allocator);
|
||||
ObLocalFileListArrayOpWithFilter file_op(file_urls, file_dir, path,
|
||||
pattern.empty() ? NULL : &filter, allocator);
|
||||
dir_op.set_dir_flag();
|
||||
if (file_dir.case_compare(".") == 0
|
||||
|| file_dir.case_compare("..") == 0) {
|
||||
//do nothing
|
||||
} else if (OB_FAIL(device_handle_->scan_dir(to_cstring(file_dir), file_op))) {
|
||||
if (OB_FAIL(device_handle_->scan_dir(to_cstring(file_dir), file_op))) {
|
||||
LOG_WARN("scan dir failed", K(ret));
|
||||
} else if (OB_FAIL(device_handle_->scan_dir(to_cstring(file_dir), dir_op))) {
|
||||
LOG_WARN("scan dir failed", K(ret));
|
||||
} else if (file_dirs.count() + file_urls.count() > MAX_VISIT_COUNT) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
ret = OB_SIZE_OVERFLOW;
|
||||
LOG_WARN("too many files and dirs to visit", K(ret));
|
||||
}
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < file_urls.count(); i++) {
|
||||
if (file_urls.at(i).length() <= path.length()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid file url", K(ret), K(path), K(file_urls.at(i)));
|
||||
} else {
|
||||
file_urls.at(i) += path.length();
|
||||
if (OB_LIKELY(file_urls.at(i).length() > 0)) {
|
||||
if (OB_NOT_NULL(file_urls.at(i).ptr()) && *file_urls.at(i).ptr() == '/') {
|
||||
file_urls.at(i) += 1;
|
||||
}
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("file name length is invalid", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -30,6 +30,7 @@ namespace common
|
||||
}
|
||||
|
||||
namespace sql {
|
||||
class ObExprRegexpSessionVariables;
|
||||
|
||||
class ObExternalDataAccessDriver
|
||||
{
|
||||
@ -43,9 +44,11 @@ public:
|
||||
|
||||
int get_file_sizes(const ObString &location, const ObIArray<ObString> &urls, ObIArray<int64_t> &file_sizes);
|
||||
int pread(void *buf, const int64_t count, const int64_t offset, int64_t &read_size);
|
||||
int get_file_list(const ObString &path,
|
||||
ObIArray<ObString> &file_urls,
|
||||
ObIAllocator &allocator);
|
||||
int get_file_list(const common::ObString &path,
|
||||
const common::ObString &pattern,
|
||||
const ObExprRegexpSessionVariables ®exp_vars,
|
||||
common::ObIArray<common::ObString> &file_urls,
|
||||
common::ObIAllocator &allocator);
|
||||
static int resolve_storage_type(const ObString &location, common::ObStorageType &device_type);
|
||||
common::ObStorageType get_storage_type() { return storage_type_; }
|
||||
void close();
|
||||
|
||||
Reference in New Issue
Block a user