Refine filtering external files by pattern option

This commit is contained in:
wjhh2008 2023-12-07 04:46:33 +00:00 committed by ob-robot
parent 0e346eecdd
commit 0fe424c626
23 changed files with 282 additions and 208 deletions

View File

@ -679,36 +679,6 @@ int ObFileListArrayOp::func(const dirent *entry)
return ret;
}
int ObFullPathArrayOp::func(const dirent *entry)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(entry)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid list entry, entry is null");
} else if (OB_ISNULL(entry->d_name)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid list entry, d_name is null");
} else {
ObSqlString full_path;
const ObString file_name(entry->d_name);
ObString tmp_file;
if (OB_FAIL(full_path.assign(path_))) {
OB_LOG(WARN, "assign string failed", K(ret));
} else if (full_path.length() > 0 && *(full_path.ptr() + full_path.length() - 1) != '/' &&
OB_FAIL(full_path.append("/"))) {
OB_LOG(WARN, "append failed", K(ret)) ;
} else if (OB_FAIL(full_path.append(file_name))) {
OB_LOG(WARN, "append file name failed", K(ret));
} else if (OB_FAIL(ob_write_string(allocator_, full_path.string(), tmp_file))) {
OB_LOG(WARN, "fail to save file name", K(ret), K(file_name));
} else if (OB_FAIL(name_array_.push_back(tmp_file))) {
OB_LOG(WARN, "fail to push filename to array", K(ret), K(tmp_file));
}
}
return ret;
}
//*************ObDirPrefixEntryNameFilter*************
int ObDirPrefixEntryNameFilter::func(const dirent *entry)
{

View File

@ -110,20 +110,6 @@ private:
common::ObIAllocator& allocator_;
};
// Directory-entry operator that collects entries as full paths.
// It only holds references: the output array, the directory prefix and the
// allocator must all outlive the operator.
class ObFullPathArrayOp : public ObBaseDirEntryOperator
{
public:
  ObFullPathArrayOp(common::ObIArray <common::ObString> &name_array,
                    common::ObString &path,
                    common::ObIAllocator &array_allocator)
    : name_array_(name_array),
      path_(path),
      allocator_(array_allocator)
  {}
  ~ObFullPathArrayOp() {}
  // Callback invoked with one directory entry.
  int func(const dirent *entry);
private:
  common::ObIArray <common::ObString> &name_array_;  // receives the collected paths
  common::ObString &path_;                           // directory prefix
  common::ObIAllocator &allocator_;                  // allocator for the stored strings
};
class ObDirPrefixEntryNameFilter : public ObBaseDirEntryOperator
{
public:

View File

@ -41,6 +41,7 @@ namespace oceanbase
{
using namespace observer;
using namespace common;
using namespace sql;
using namespace transaction::tablelock;
namespace share
{
@ -243,17 +244,20 @@ int ObExternalTableFileManager::update_inner_table_file_list(
return ret;
}
int ObExternalTableFileManager::get_external_file_list_on_device(const ObString &location,
ObIArray<ObString> &file_urls,
ObIArray<int64_t> &file_sizes,
const ObString &access_info,
ObIAllocator &allocator)
int ObExternalTableFileManager::get_external_file_list_on_device(
const ObString &location,
const ObString &pattern,
const ObExprRegexpSessionVariables &regexp_vars,
ObIArray<ObString> &file_urls,
ObIArray<int64_t> &file_sizes,
const ObString &access_info,
ObIAllocator &allocator)
{
int ret = OB_SUCCESS;
sql::ObExternalDataAccessDriver driver;
if (OB_FAIL(driver.init(location, access_info))) {
LOG_WARN("init external data access driver failed", K(ret));
} else if (OB_FAIL(driver.get_file_list(location, file_urls, allocator))) {
} else if (OB_FAIL(driver.get_file_list(location, pattern, regexp_vars, file_urls, allocator))) {
LOG_WARN("get file urls failed", K(ret));
} else if (OB_FAIL(driver.get_file_sizes(location, file_urls, file_sizes))) {
LOG_WARN("get file sizes failed", K(ret));

View File

@ -17,6 +17,9 @@
#include "observer/ob_server_struct.h"
namespace oceanbase {
namespace sql {
class ObExprRegexpSessionVariables;
}
namespace share {
@ -124,11 +127,13 @@ public:
const uint64_t table_id,
ObMySQLTransaction &trans);
int get_external_file_list_on_device(const ObString &location,
ObIArray<ObString> &file_urls,
ObIArray<int64_t> &file_sizes,
const ObString &access_info,
ObIAllocator &allocator);
int get_external_file_list_on_device(const common::ObString &location,
const common::ObString &pattern,
const sql::ObExprRegexpSessionVariables &regexp_vars,
common::ObIArray<common::ObString> &file_urls,
common::ObIArray<int64_t> &file_sizes,
const common::ObString &access_info,
common::ObIAllocator &allocator);
private:

View File

@ -39,11 +39,12 @@ int ObAsyncLoadExternalTableFileListP::process()
int ret = OB_SUCCESS;
ObLoadExternalFileListReq &req = arg_;
ObLoadExternalFileListRes &res = result_;
ObString &location = req.location_;
ObSEArray<ObString, 16> file_urls;
ObString access_info;
ObArenaAllocator allocator;
if (OB_FAIL(ObExternalTableFileManager::get_instance().get_external_file_list_on_device(location,
if (OB_FAIL(ObExternalTableFileManager::get_instance().get_external_file_list_on_device(req.location_,
req.pattern_,
req.regexp_vars_,
file_urls,
res.file_sizes_,
access_info,
@ -56,7 +57,7 @@ int ObAsyncLoadExternalTableFileListP::process()
OZ(res.file_urls_.push_back(tmp));
}
res.rcode_.rcode_ = ret;
LOG_DEBUG("get external table file", K(ret), K(location), K(file_urls), K(res.file_urls_));
LOG_DEBUG("get external table file", K(ret), K(req.location_), K(req.pattern_), K(file_urls), K(res.file_urls_));
return ret;
}

View File

@ -23,7 +23,7 @@ OB_SERIALIZE_MEMBER(ObFlushExternalTableFileCacheReq, tenant_id_, table_id_, par
OB_SERIALIZE_MEMBER(ObFlushExternalTableFileCacheRes, rcode_);
OB_SERIALIZE_MEMBER(ObLoadExternalFileListReq, location_);
OB_SERIALIZE_MEMBER(ObLoadExternalFileListReq, location_, pattern_, regexp_vars_);
OB_DEF_SERIALIZE(ObLoadExternalFileListRes)
{

View File

@ -14,6 +14,7 @@
#define OBDEV_SRC_EXTERNAL_TABLE_FILE_TASK_H_
#include "rpc/obrpc/ob_rpc_result_code.h"
#include "deps/oblib/src/lib/lock/ob_thread_cond.h"
#include "sql/engine/expr/ob_expr_regexp_context.h"
namespace oceanbase
{
namespace share
@ -47,9 +48,11 @@ class ObLoadExternalFileListReq
OB_UNIS_VERSION(1);
public:
ObLoadExternalFileListReq() :
location_() {}
location_(), pattern_() {}
public:
ObString location_;
ObString pattern_;
sql::ObExprRegexpSessionVariables regexp_vars_;
TO_STRING_KV(K_(location));
};

View File

@ -22,6 +22,7 @@
#include "sql/engine/table/ob_external_table_access_service.h"
#include "sql/ob_sql_utils.h"
#include "sql/rewrite/ob_query_range.h"
#include "share/backup/ob_backup_io_adapter.h"
namespace oceanbase
{
@ -319,48 +320,44 @@ int ObExternalTableUtils::prepare_single_scan_range(const uint64_t tenant_id,
return ret;
}
int ObExternalTableUtils::filter_external_table_files(const ObString &pattern,
ObExecContext &exec_ctx,
ObIArray<ObString> &file_urls)
// The filter keeps no init flag of its own; it reports the state of the
// regex context it wraps.
bool ObExternalPathFilter::is_inited()
{
  return regex_ctx_.is_inited();
}
int ObExternalPathFilter::is_filtered(const ObString &path, bool &is_filtered)
{
int ret = OB_SUCCESS;
if (!pattern.empty()) {
const common::ObCollationType cs_type_pattern = CS_TYPE_UTF8MB4_BIN;
const common::ObCollationType cs_type_file = CS_TYPE_UTF8MB4_BIN;
const common::ObCollationType cs_type_match = CS_TYPE_UTF16_BIN;
ObExprRegexContext regex_ctx;
ObArenaAllocator allocator;
bool match = false;
ObString out_text;
if (OB_FAIL(ObExprUtil::convert_string_collation(path,
CS_TYPE_UTF8MB4_BIN,
out_text,
CS_TYPE_UTF16_BIN,
temp_allocator_))) {
LOG_WARN("convert charset failed", K(ret));
} else if (OB_FAIL(regex_ctx_.match(temp_allocator_, out_text, 0, match))) {
LOG_WARN("regex match failed", K(ret));
}
is_filtered = !match;
temp_allocator_.reuse();
return ret;
}
int ObExternalPathFilter::init(const ObString &pattern,
const ObExprRegexpSessionVariables &regexp_vars)
{
int ret = OB_SUCCESS;
if (regex_ctx_.is_inited()) {
ret = OB_INIT_TWICE;
LOG_WARN("fail to init", K(ret));
} else {
uint32_t flags = 0;
ObString match_string;
ObSEArray<ObString, 8> tmp_file_urls;
if (OB_FAIL(ObExprRegexContext::get_regexp_flags(match_string, true, flags))) {
LOG_WARN("failed to get regexp flags", K(ret));
} else if (OB_FAIL(regex_ctx.init(exec_ctx.get_allocator(),
exec_ctx.get_my_session(),
pattern,
flags,
true,
cs_type_pattern))) {
} else if (OB_FAIL(regex_ctx_.init(allocator_, regexp_vars,
pattern, flags, true, CS_TYPE_UTF8MB4_BIN))) {
LOG_WARN("init regex context failed", K(ret), K(pattern));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < file_urls.count(); ++i) {
bool match = false;
ObString out_text;
if (OB_FAIL(ObExprUtil::convert_string_collation(file_urls.at(i),
cs_type_file,
out_text,
cs_type_match,
allocator))) {
LOG_WARN("convert charset failed", K(ret));
} else if (OB_FAIL(regex_ctx.match(allocator, out_text, 0, match))) {
LOG_WARN("regex match failed", K(ret));
} else if (match && OB_FAIL(tmp_file_urls.push_back(file_urls.at(i)))) {
LOG_WARN("failed to push back into tmp_file_urls", K(ret));
}
}
if (OB_SUCC(ret) && OB_FAIL(file_urls.assign(tmp_file_urls))) {
LOG_WARN("failed to assign file_urls", K(ret));
}
}
}
return ret;

View File

@ -14,6 +14,8 @@
#define _OB_EXTERNAL_TABLE_UTILS_H_
#include "lib/container/ob_iarray.h"
#include "lib/string/ob_string.h"
#include "lib/allocator/page_arena.h"
namespace oceanbase
{
@ -30,10 +32,24 @@ class ObDASTabletLoc;
class ObExecContext;
class ObExternalTableAccessService;
class ObQueryRange;
class ObExprRegexContext;
class ObExprRegexpSessionVariables;
}
namespace share
{
// Regex-based path filter for external table files.
// The regex context and the long-lived allocator are borrowed by reference;
// temp_allocator_ is owned and serves as per-call scratch space.
struct ObExternalPathFilter {
  ObExternalPathFilter(sql::ObExprRegexContext &regex_ctx, common::ObIAllocator &allocator)
    : regex_ctx_(regex_ctx),
      allocator_(allocator)
  {}

  // Compiles `pattern` into regex_ctx_ using allocator_ and the limits in
  // `regexp_vars`; returns OB_INIT_TWICE if the context is already inited.
  int init(const common::ObString &pattern, const sql::ObExprRegexpSessionVariables &regexp_vars);
  // True once regex_ctx_ reports itself initialized.
  bool is_inited();
  // Sets `is_filtered` to true when `path` does NOT match the pattern.
  int is_filtered(const common::ObString &path, bool &is_filtered);

  sql::ObExprRegexContext &regex_ctx_;
  common::ObIAllocator &allocator_;
  common::ObArenaAllocator temp_allocator_;  // reused after every is_filtered() call
};
class ObExternalTableUtils {
public:
enum ExternalTableRangeColumn {
@ -78,9 +94,6 @@ class ObExternalTableUtils {
common::ObIArray<common::ObNewRange *> &new_range,
bool is_file_on_disk);
static int filter_external_table_files(const common::ObString &pattern,
sql::ObExecContext &exec_ctx,
common::ObIArray<common::ObString> &file_urls);
static int calc_assigned_files_to_sqcs(
const common::ObIArray<ObExternalFileInfo> &files,
common::ObIArray<int64_t> &assigned_idx,

View File

@ -597,12 +597,14 @@ int ObCreateTableExecutor::execute(ObExecContext &ctx, ObCreateTableStmt &stmt)
}
if (OB_SUCC(ret) && table_schema.is_external_table()) {
//auto refresh after create external table
ObExprRegexpSessionVariables regexp_vars;
OZ (my_session->get_regexp_session_vars(regexp_vars));
OZ (ObAlterTableExecutor::update_external_file_list(
table_schema.get_tenant_id(), res.table_id_,
table_schema.get_external_file_location(),
table_schema.get_external_file_location_access_info(),
table_schema.get_external_file_pattern(),
ctx));
regexp_vars));
}
} else {
if (table_schema.is_external_table()) {
@ -878,35 +880,8 @@ int ObAlterTableExecutor::alter_table_rpc_v2(
return ret;
}
int ObAlterTableExecutor::get_external_file_list(const ObString &location,
ObIArray<ObString> &file_urls,
ObIArray<int64_t> &file_sizes,
const ObString &access_info,
ObIAllocator &allocator,
common::ObStorageType &storage_type)
{
int ret = OB_SUCCESS;
ObExternalDataAccessDriver driver;
if (OB_FAIL(driver.init(location, access_info))) {
LOG_WARN("init external data access driver failed", K(ret));
} else if (OB_FAIL(driver.get_file_list(location, file_urls, allocator))) {
LOG_WARN("get file urls failed", K(ret));
} else if (OB_FAIL(driver.get_file_sizes(location, file_urls, file_sizes))) {
LOG_WARN("get file sizes failed", K(ret));
}
if (driver.is_opened()) {
storage_type = driver.get_storage_type();
driver.close();
}
LOG_DEBUG("show external table files", K(file_urls), K(storage_type), K(access_info));
return ret;
}
int ObAlterTableExecutor::filter_and_sort_external_files(const ObString &pattern,
ObExecContext &exec_ctx,
ObIArray<ObString> &file_urls,
ObIArray<int64_t> &file_sizes) {
int ObAlterTableExecutor::sort_external_files(ObIArray<ObString> &file_urls,
ObIArray<int64_t> &file_sizes) {
int ret = OB_SUCCESS;
const int64_t count = file_urls.count();
ObSEArray<int64_t, 8> tmp_file_sizes;
@ -924,11 +899,6 @@ int ObAlterTableExecutor::filter_and_sort_external_files(const ObString &pattern
LOG_WARN("failed to set refactored to file_map", K(ret));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(ObExternalTableUtils::filter_external_table_files(pattern, exec_ctx, file_urls))) {
LOG_WARN("failed to filter external table files");
}
}
if (OB_SUCC(ret)) {
std::sort(file_urls.get_data(), file_urls.get_data() + file_urls.count());
for (int64_t i = 0; OB_SUCC(ret) && i < file_urls.count(); ++i) {
@ -1019,6 +989,8 @@ int ObAlterTableExecutor::flush_external_file_cache(
int ObAlterTableExecutor::collect_local_files_on_servers(
const uint64_t tenant_id,
const ObString &location,
const ObString &pattern,
const ObExprRegexpSessionVariables &regexp_vars,
ObIArray<ObAddr> &all_servers,
ObIArray<ObString> &file_urls,
ObIArray<int64_t> &file_sizes,
@ -1074,6 +1046,8 @@ int ObAlterTableExecutor::collect_local_files_on_servers(
ObRpcAsyncLoadExternalTableFileCallBack* async_cb = nullptr;
ObLoadExternalFileListReq req;
req.location_ = location;
req.pattern_ = pattern;
req.regexp_vars_ = regexp_vars;
if (OB_ISNULL(async_cb = OB_NEWx(ObRpcAsyncLoadExternalTableFileCallBack, (&allocator), (&context)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
@ -1134,23 +1108,25 @@ int ObAlterTableExecutor::update_external_file_list(
const ObString &location,
const ObString &access_info,
const ObString &pattern,
ObExecContext &exec_ctx)
const ObExprRegexpSessionVariables &regexp_vars)
{
int ret = OB_SUCCESS;
ObSEArray<ObString, 8> file_urls;
ObSEArray<int64_t, 8> file_sizes;
ObArenaAllocator allocator;
ObSEArray<ObAddr, 8> all_servers;
OZ (GCTX.location_service_->external_table_get(tenant_id, table_id, all_servers));
if (ObSQLUtils::is_external_files_on_local_disk(location)) {
OZ (collect_local_files_on_servers(tenant_id, location, all_servers, file_urls, file_sizes, allocator));
OZ (collect_local_files_on_servers(tenant_id, location, pattern, regexp_vars,
all_servers, file_urls, file_sizes, allocator));
} else {
OZ (ObExternalTableFileManager::get_instance().get_external_file_list_on_device(
location, file_urls, file_sizes, access_info, allocator));
location, pattern, regexp_vars, file_urls, file_sizes, access_info, allocator));
}
OZ (filter_and_sort_external_files(pattern, exec_ctx, file_urls, file_sizes));
OZ (sort_external_files(file_urls, file_sizes));
//TODO [External Table] opt performance
OZ (ObExternalTableFileManager::get_instance().update_inner_table_file_list(tenant_id, table_id, file_urls, file_sizes));
@ -1166,12 +1142,15 @@ int ObAlterTableExecutor::execute_alter_external_table(ObExecContext &ctx, ObAlt
int64_t option = stmt.get_alter_external_table_type();
switch (option) {
case T_ALTER_REFRESH_EXTERNAL_TABLE: {
ObExprRegexpSessionVariables regexp_vars;
CK (ctx.get_my_session());
OZ (ctx.get_my_session()->get_regexp_session_vars(regexp_vars));
OZ (update_external_file_list(stmt.get_tenant_id(),
arg.alter_table_schema_.get_table_id(),
arg.alter_table_schema_.get_external_file_location(),
arg.alter_table_schema_.get_external_file_location_access_info(),
arg.alter_table_schema_.get_external_file_pattern(),
ctx));
regexp_vars));
break;
}
default: {

View File

@ -50,6 +50,7 @@ class ObExecContext;
class ObRawExpr;
class ObCreateTableStmt;
class ObTableStmt;
class ObExprRegexpSessionVariables;
class ObCreateTableExecutor
{
@ -106,10 +107,12 @@ public:
const common::ObString &location,
const common::ObString &access_info,
const common::ObString &pattern,
ObExecContext &ctx);
const ObExprRegexpSessionVariables &regexp_vars);
static int collect_local_files_on_servers(
const uint64_t tenant_id,
const common::ObString &location,
const common::ObString &pattern,
const ObExprRegexpSessionVariables &regexp_vars,
common::ObIArray<common::ObAddr> &all_servers,
common::ObIArray<common::ObString> &file_urls,
common::ObIArray<int64_t> &file_sizes,
@ -199,17 +202,8 @@ private:
int refresh_schema_for_table(const uint64_t tenant_id);
int execute_alter_external_table(ObExecContext &ctx, ObAlterTableStmt &stmt);
static int get_external_file_list(
const ObString &location,
common::ObIArray<common::ObString> &file_urls,
common::ObIArray<int64_t> &file_sizes,
const common::ObString &access_info,
common::ObIAllocator &allocator,
common::ObStorageType &storage_type);
static int filter_and_sort_external_files(const ObString &pattern,
ObExecContext &exec_ctx,
ObIArray<ObString> &file_urls,
ObIArray<int64_t> &file_sizes);
static int sort_external_files(ObIArray<ObString> &file_urls,
ObIArray<int64_t> &file_sizes);
private:
//DISALLOW_COPY_AND_ASSIGN(ObAlterTableExecutor);
};

View File

@ -222,10 +222,13 @@ int ObExprRegexp::eval_regexp(const ObExpr &expr, ObEvalCtx &ctx, ObDatum &expr_
int64_t start_pos = 1;
bool is_case_sensitive = ObCharset::is_bin_sort(expr.args_[0]->datum_meta_.cs_type_);
ObString text_utf16;
ObExprRegexpSessionVariables regexp_vars;
if (OB_FAIL(ObExprRegexContext::get_regexp_flags(match_string, is_case_sensitive, flags))) {
LOG_WARN("failed to get regexp flags", K(ret));
} else if (OB_FAIL(ctx.exec_ctx_.get_my_session()->get_regexp_session_vars(regexp_vars))) {
LOG_WARN("fail to get regexp");
} else if (OB_FAIL(regex_ctx->init(reusable ? ctx.exec_ctx_.get_allocator() : tmp_alloc,
ctx.exec_ctx_.get_my_session(),
regexp_vars,
pattern->get_string(), flags, reusable, expr.args_[1]->datum_meta_.cs_type_))) {
LOG_WARN("init regex context failed", K(ret), K(pattern->get_string()));
} else if (expr.args_[0]->datum_meta_.cs_type_ == CS_TYPE_UTF8MB4_BIN ||

View File

@ -58,7 +58,7 @@ void ObExprRegexContext::destroy()
}
int ObExprRegexContext::init(ObExprStringBuf &string_buf,
ObSQLSessionInfo *session_info,
const ObExprRegexpSessionVariables &regex_vars,
const ObString &origin_pattern,
const uint32_t cflags,
const bool reusable,
@ -131,22 +131,16 @@ int ObExprRegexContext::init(ObExprStringBuf &string_buf,
UChar *u_pattern = NULL;
UParseError parse_error;
UErrorCode u_error_code = U_ZERO_ERROR;
int64_t regexp_stack_limit = 0;
int64_t regexp_time_limit = 0;
if (OB_FAIL(ret)) {
} else if (OB_FAIL(get_valid_unicode_string(string_buf, pattern, u_pattern, u_pattern_length))) {
LOG_WARN("failed to get valid unicode string", K(ret));
} else if (OB_ISNULL(u_pattern) || OB_ISNULL(session_info)) {
} else if (OB_ISNULL(u_pattern)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpcted null", K(ret), K(pattern), K(u_pattern_length), K(session_info));
} else if (OB_FAIL(session_info->get_regexp_stack_limit(regexp_stack_limit)) ||
OB_FAIL(session_info->get_regexp_time_limit(regexp_time_limit))) {
LOG_WARN("failed to get regexp_stack_limit or get_regexp_time_limit", K(ret),
K(regexp_stack_limit), K(regexp_time_limit));
LOG_WARN("get unexpcted null", K(ret), K(pattern), K(u_pattern_length));
} else {
regexp_engine_ = uregex_open(u_pattern, u_pattern_length, cflags, &parse_error, &u_error_code);
uregex_setStackLimit(regexp_engine_, regexp_stack_limit, &u_error_code);
uregex_setTimeLimit(regexp_engine_, regexp_time_limit, &u_error_code);
uregex_setStackLimit(regexp_engine_, regex_vars.regexp_stack_limit_, &u_error_code);
uregex_setTimeLimit(regexp_engine_, regex_vars.regexp_time_limit_, &u_error_code);
if (OB_FAIL(check_icu_regexp_status(u_error_code, &parse_error))) {
LOG_WARN("failed to check icu regexp status", K(ret));
if (regexp_engine_ != NULL) {
@ -888,5 +882,7 @@ int ObExprRegexContext::check_binary_compatible(const ObExprResType *types, int6
return ret;
}
OB_SERIALIZE_MEMBER(ObExprRegexpSessionVariables, regexp_stack_limit_, regexp_time_limit_);
}
}

View File

@ -29,6 +29,16 @@ namespace oceanbase
namespace sql
{
// Plain carrier for the two regexp limits that ObExprRegexContext::init
// passes to the regex engine. Both default to 0.
struct ObExprRegexpSessionVariables {
  ObExprRegexpSessionVariables()
    : regexp_stack_limit_(0),
      regexp_time_limit_(0)
  {}
  int64_t regexp_stack_limit_;  // passed to uregex_setStackLimit
  int64_t regexp_time_limit_;   // passed to uregex_setTimeLimit
  OB_UNIS_VERSION(1);
};
class ObExprRegexContext : public ObExprOperatorCtx
{
public:
@ -43,7 +53,7 @@ public:
// The previous regex compile result can be used if pattern not change, if %reusable is true.
// %string_buf must be the same with previous init too if %reusable is true.
int init(ObExprStringBuf &string_buf,
ObSQLSessionInfo *session_info,
const ObExprRegexpSessionVariables &regex_vars,
const ObString &origin_pattern,
const uint32_t cflags,
const bool reusable,

View File

@ -170,6 +170,7 @@ int ObExprRegexpCount::eval_regexp_count(
int64_t res_count = 0;
ObExprRegexContext local_regex_ctx;
ObExprRegexContext *regexp_ctx = &local_regex_ctx;
ObExprRegexpSessionVariables regexp_vars;
const bool reusable = (0 != expr.extra_) && ObExpr::INVALID_EXP_CTX_ID != expr.expr_ctx_id_;
bool is_case_sensitive = ObCharset::is_bin_sort(expr.args_[0]->datum_meta_.cs_type_);
uint32_t flags = 0;
@ -191,9 +192,11 @@ int ObExprRegexpCount::eval_regexp_count(
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ObExprRegexContext::get_regexp_flags(match_param, is_case_sensitive, flags))) {
LOG_WARN("fail to get regexp flags", K(ret), K(match_param));
} else if (OB_FAIL(ctx.exec_ctx_.get_my_session()->get_regexp_session_vars(regexp_vars))) {
LOG_WARN("fail to get regexp");
} else if (!pattern->is_null() &&
OB_FAIL(regexp_ctx->init(reusable ? ctx.exec_ctx_.get_allocator() : tmp_alloc,
ctx.exec_ctx_.get_my_session(),
regexp_vars,
pattern->get_string(), flags, reusable, expr.args_[1]->datum_meta_.cs_type_))) {
LOG_WARN("fail to init regexp", K(pattern), K(flags), K(ret));
} else if (ob_is_text_tc(expr.args_[0]->datum_meta_.type_)) {

View File

@ -248,6 +248,7 @@ int ObExprRegexpInstr::eval_regexp_instr(
ObIAllocator &tmp_alloc = alloc_guard.get_allocator();
ObExprRegexContext local_regex_ctx;
ObExprRegexContext *regexp_ctx = &local_regex_ctx;
ObExprRegexpSessionVariables regexp_vars;
const bool reusable = (0 != expr.extra_) && ObExpr::INVALID_EXP_CTX_ID != expr.expr_ctx_id_;
bool is_case_sensitive = ObCharset::is_bin_sort(expr.args_[0]->datum_meta_.cs_type_);
uint32_t flags = 0;
@ -269,9 +270,11 @@ int ObExprRegexpInstr::eval_regexp_instr(
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ObExprRegexContext::get_regexp_flags(match_param, is_case_sensitive, flags))) {
LOG_WARN("fail to get regexp flags", K(ret), K(match_param));
} else if (OB_FAIL(ctx.exec_ctx_.get_my_session()->get_regexp_session_vars(regexp_vars))) {
LOG_WARN("fail to get regexp");
} else if (!pattern->is_null() && !null_result &&
OB_FAIL(regexp_ctx->init(reusable ? ctx.exec_ctx_.get_allocator() : tmp_alloc,
ctx.exec_ctx_.get_my_session(),
regexp_vars,
pattern->get_string(), flags, reusable, expr.args_[1]->datum_meta_.cs_type_))) {
LOG_WARN("fail to init regexp", K(pattern), K(flags), K(ret));
} else if (ob_is_text_tc(expr.args_[0]->datum_meta_.type_)) {

View File

@ -175,6 +175,7 @@ int ObExprRegexpLike::eval_regexp_like(
ObIAllocator &tmp_alloc = alloc_guard.get_allocator();
ObExprRegexContext local_regex_ctx;
ObExprRegexContext *regexp_ctx = &local_regex_ctx;
ObExprRegexpSessionVariables regexp_vars;
uint32_t flags = 0;
int64_t start_pos = 1;
bool match = false;
@ -197,9 +198,11 @@ int ObExprRegexpLike::eval_regexp_like(
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ObExprRegexContext::get_regexp_flags(match_param, is_case_sensitive, flags))) {
LOG_WARN("fail to get regexp flags", K(ret), K(match_param));
} else if (OB_FAIL(ctx.exec_ctx_.get_my_session()->get_regexp_session_vars(regexp_vars))) {
LOG_WARN("fail to get regexp");
} else if (!pattern->is_null() &&
OB_FAIL(regexp_ctx->init(reusable ? ctx.exec_ctx_.get_allocator() : tmp_alloc,
ctx.exec_ctx_.get_my_session(),
regexp_vars,
pattern->get_string(), flags, reusable,
expr.args_[1]->datum_meta_.cs_type_))) {
LOG_WARN("fail to init regexp", K(pattern), K(flags), K(ret));

View File

@ -287,6 +287,7 @@ int ObExprRegexpReplace::eval_regexp_replace(
ObString match_param = (NULL != match_type && !match_type->is_null()) ? match_type->get_string() : ObString();
ObExprRegexContext local_regex_ctx;
ObExprRegexContext *regexp_ctx = &local_regex_ctx;
ObExprRegexpSessionVariables regexp_vars;
const bool reusable = (0 != expr.extra_) && ObExpr::INVALID_EXP_CTX_ID != expr.expr_ctx_id_;
uint32_t flags = 0;
bool is_case_sensitive = ObCharset::is_bin_sort(expr.args_[0]->datum_meta_.cs_type_);
@ -304,9 +305,11 @@ int ObExprRegexpReplace::eval_regexp_replace(
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ObExprRegexContext::get_regexp_flags(match_param, is_case_sensitive, flags))) {
LOG_WARN("fail to get regexp flags", K(ret), K(match_param));
} else if (OB_FAIL(ctx.exec_ctx_.get_my_session()->get_regexp_session_vars(regexp_vars))) {
LOG_WARN("fail to get regexp");
} else if (!pattern->is_null() && !null_result &&
OB_FAIL(regexp_ctx->init(reusable ? ctx.exec_ctx_.get_allocator() : tmp_alloc,
ctx.exec_ctx_.get_my_session(),
regexp_vars,
pattern->get_string(), flags, reusable,
expr.args_[1]->datum_meta_.cs_type_))) {
LOG_WARN("fail to init regexp", K(pattern), K(flags), K(ret));

View File

@ -248,6 +248,7 @@ int ObExprRegexpSubstr::eval_regexp_substr(
ObIAllocator &tmp_alloc = alloc_guard.get_allocator();
ObExprRegexContext local_regex_ctx;
ObExprRegexContext *regexp_ctx = &local_regex_ctx;
ObExprRegexpSessionVariables regexp_vars;
const bool reusable = (0 != expr.extra_) && ObExpr::INVALID_EXP_CTX_ID != expr.expr_ctx_id_;
bool is_case_sensitive = ObCharset::is_bin_sort(expr.args_[0]->datum_meta_.cs_type_);
ObString res_substr;
@ -271,9 +272,11 @@ int ObExprRegexpSubstr::eval_regexp_substr(
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ObExprRegexContext::get_regexp_flags(match_param, is_case_sensitive, flags))) {
LOG_WARN("fail to get regexp flags", K(ret), K(match_param));
} else if (OB_FAIL(ctx.exec_ctx_.get_my_session()->get_regexp_session_vars(regexp_vars))) {
LOG_WARN("fail to get regexp");
} else if (!pattern->is_null() && !null_result &&
OB_FAIL(regexp_ctx->init(reusable ? ctx.exec_ctx_.get_allocator() : tmp_alloc,
ctx.exec_ctx_.get_my_session(),
regexp_vars,
pattern->get_string(), flags, reusable,
expr.args_[1]->datum_meta_.cs_type_))) {
LOG_WARN("fail to init regexp", K(pattern), K(flags), K(ret));
@ -368,4 +371,4 @@ int ObExprRegexpSubstr::is_valid_for_generated_column(const ObRawExpr*expr,
}
}
}
}

View File

@ -120,68 +120,152 @@ int ObExternalDataAccessDriver::pread(void *buf, const int64_t count, const int6
return ret;
}
// Directory-entry operator that stores entry names, optionally dropping the
// ones rejected by an ObExternalPathFilter.
// Empty names and names ending in '/' are never stored. A null filter means
// "accept everything".
class ObExternalFileListArrayOpWithFilter : public ObBaseDirEntryOperator
{
public:
  ObExternalFileListArrayOpWithFilter(ObIArray <common::ObString>& name_array,
                                      ObExternalPathFilter *filter,
                                      ObIAllocator& array_allocator)
    : name_array_(name_array),
      filter_(filter),
      allocator_(array_allocator)
  {}

  // Callback for one listed entry; returns OB_INVALID_ARGUMENT on a null
  // entry / d_name, otherwise the code of the first failing step.
  int func(const dirent *entry)
  {
    int ret = OB_SUCCESS;
    if (OB_ISNULL(entry)) {
      ret = OB_INVALID_ARGUMENT;
      OB_LOG(WARN, "invalid list entry, entry is null");
    } else if (OB_ISNULL(entry->d_name)) {
      ret = OB_INVALID_ARGUMENT;
      OB_LOG(WARN, "invalid list entry, d_name is null");
    } else {
      // file_name / tmp_file keep their names because they are printed via K().
      const ObString file_name(entry->d_name);
      const bool skip_entry = file_name.empty()
                              || '/' == file_name[file_name.length() - 1];
      bool rejected = false;
      ObString tmp_file;
      if (skip_entry) {
        // nothing to record for empty names or names with a trailing '/'
      } else if (OB_NOT_NULL(filter_) && OB_FAIL(filter_->is_filtered(file_name, rejected))) {
        LOG_WARN("fail check is filtered", K(ret));
      } else if (rejected) {
        // name did not pass the filter
      } else if (OB_FAIL(ob_write_string(allocator_, file_name, tmp_file, true))) {
        OB_LOG(WARN, "fail to save file name", K(ret), K(file_name));
      } else if (OB_FAIL(name_array_.push_back(tmp_file))) {
        OB_LOG(WARN, "fail to push filename to array", K(ret), K(tmp_file));
      }
    }
    return ret;
  }

private:
  ObIArray <ObString>& name_array_;  // receives accepted names
  ObExternalPathFilter *filter_;     // may be NULL: no filtering
  ObIAllocator& allocator_;          // allocator for the stored names
};
class ObLocalFileListArrayOpWithFilter : public ObBaseDirEntryOperator
{
public:
ObLocalFileListArrayOpWithFilter(ObIArray <common::ObString> &name_array,
const ObString &path,
const ObString &origin_path,
ObExternalPathFilter *filter,
ObIAllocator &array_allocator)
: name_array_(name_array), path_(path), origin_path_(origin_path),
filter_(filter), allocator_(array_allocator) {}
int func(const dirent *entry)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(entry)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid list entry, entry is null");
} else if (OB_ISNULL(entry->d_name)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid list entry, d_name is null");
} else {
const ObString file_name(entry->d_name);
ObSqlString full_path;
ObString tmp_file;
bool is_filtered = false;
ObString cur_path = path_;
if (file_name.case_compare(".") == 0
|| file_name.case_compare("..") == 0) {
//do nothing
} else if (OB_FAIL(full_path.assign(cur_path))) {
OB_LOG(WARN, "assign string failed", K(ret));
} else if (full_path.length() > 0 && *(full_path.ptr() + full_path.length() - 1) != '/' &&
OB_FAIL(full_path.append("/"))) {
OB_LOG(WARN, "append failed", K(ret)) ;
} else if (OB_FAIL(full_path.append(file_name))) {
OB_LOG(WARN, "append file name failed", K(ret));
} else if (OB_NOT_NULL(filter_) && OB_FAIL(filter_->is_filtered(full_path.string(), is_filtered))) {
LOG_WARN("fail check is filtered", K(ret));
} else if (!is_filtered) {
ObString target = full_path.string();
if (!is_dir_scan()) {
target += origin_path_.length();
if (!target.empty() && '/' == target[0]) {
target += 1;
}
}
if (target.empty()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("empty dir or name", K(full_path), K(origin_path_));
} else if (OB_FAIL(ob_write_string(allocator_, target, tmp_file))) {
OB_LOG(WARN, "fail to save file name", K(ret), K(file_name));
} else if (OB_FAIL(name_array_.push_back(tmp_file))) {
OB_LOG(WARN, "fail to push filename to array", K(ret), K(tmp_file));
}
}
}
return ret;
}
private:
ObIArray <ObString> &name_array_;
const ObString &path_;
const ObString &origin_path_;
ObExternalPathFilter *filter_;
ObIAllocator &allocator_;
};
int ObExternalDataAccessDriver::get_file_list(const ObString &path,
const ObString &pattern,
const ObExprRegexpSessionVariables &regexp_vars,
ObIArray<ObString> &file_urls,
ObIAllocator &allocator)
{
int ret = OB_SUCCESS;
const int64_t MAX_VISIT_COUNT = 100000;
ObArray<ObString> file_dirs;
ObExprRegexContext regexp_ctx;
ObExternalPathFilter filter(regexp_ctx, allocator);
if (OB_ISNULL(device_handle_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObExternalDataAccessDriver not init", K(ret));
} else if (!pattern.empty() && OB_FAIL(filter.init(pattern, regexp_vars))) {
LOG_WARN("fail to init filter", K(ret));
} else if (get_storage_type() == OB_STORAGE_OSS
|| get_storage_type() == OB_STORAGE_COS) {
ObSEArray<ObString, 16> temp_file_urls;
ObFileListArrayOp file_op(temp_file_urls, allocator);
ObExternalFileListArrayOpWithFilter file_op(file_urls, pattern.empty() ? NULL : &filter, allocator);
if (OB_FAIL(device_handle_->scan_dir(to_cstring(path), file_op))) {
LOG_WARN("scan dir failed", K(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < temp_file_urls.count(); i++) {
if (temp_file_urls.at(i).length() <= 0) {
//do nothing
} else if ( '/' == *(temp_file_urls.at(i).ptr() + temp_file_urls.at(i).length() - 1)) {
//is direcotry
} else {
OZ (file_urls.push_back(temp_file_urls.at(i)));
}
}
LOG_DEBUG("show oss files", K(file_urls), K(file_dirs));
} else if (get_storage_type() == OB_STORAGE_FILE) {
ObSEArray<ObString, 4> file_dirs;
OZ (file_dirs.push_back(path));
for (int64_t i = 0; OB_SUCC(ret) && i < file_dirs.count(); i++) {
ObString file_dir = file_dirs.at(i);
ObFullPathArrayOp dir_op(file_dirs, file_dir, allocator);
ObFullPathArrayOp file_op(file_urls, file_dir, allocator);
ObLocalFileListArrayOpWithFilter dir_op(file_dirs, file_dir, path, NULL, allocator);
ObLocalFileListArrayOpWithFilter file_op(file_urls, file_dir, path,
pattern.empty() ? NULL : &filter, allocator);
dir_op.set_dir_flag();
if (file_dir.case_compare(".") == 0
|| file_dir.case_compare("..") == 0) {
//do nothing
} else if (OB_FAIL(device_handle_->scan_dir(to_cstring(file_dir), file_op))) {
if (OB_FAIL(device_handle_->scan_dir(to_cstring(file_dir), file_op))) {
LOG_WARN("scan dir failed", K(ret));
} else if (OB_FAIL(device_handle_->scan_dir(to_cstring(file_dir), dir_op))) {
LOG_WARN("scan dir failed", K(ret));
} else if (file_dirs.count() + file_urls.count() > MAX_VISIT_COUNT) {
ret = OB_ERR_UNEXPECTED;
ret = OB_SIZE_OVERFLOW;
LOG_WARN("too many files and dirs to visit", K(ret));
}
}
for (int64_t i = 0; OB_SUCC(ret) && i < file_urls.count(); i++) {
if (file_urls.at(i).length() <= path.length()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid file url", K(ret), K(path), K(file_urls.at(i)));
} else {
file_urls.at(i) += path.length();
if (OB_LIKELY(file_urls.at(i).length() > 0)) {
if (OB_NOT_NULL(file_urls.at(i).ptr()) && *file_urls.at(i).ptr() == '/') {
file_urls.at(i) += 1;
}
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("file name length is invalid", K(ret));
}
}
}
}
return ret;
}

View File

@ -30,6 +30,7 @@ namespace common
}
namespace sql {
class ObExprRegexpSessionVariables;
class ObExternalDataAccessDriver
{
@ -43,9 +44,11 @@ public:
int get_file_sizes(const ObString &location, const ObIArray<ObString> &urls, ObIArray<int64_t> &file_sizes);
int pread(void *buf, const int64_t count, const int64_t offset, int64_t &read_size);
int get_file_list(const ObString &path,
ObIArray<ObString> &file_urls,
ObIAllocator &allocator);
int get_file_list(const common::ObString &path,
const common::ObString &pattern,
const ObExprRegexpSessionVariables &regexp_vars,
common::ObIArray<common::ObString> &file_urls,
common::ObIAllocator &allocator);
static int resolve_storage_type(const ObString &location, common::ObStorageType &device_type);
common::ObStorageType get_storage_type() { return storage_type_; }
void close();

View File

@ -37,6 +37,7 @@
#include "pl/sys_package/ob_dbms_sql.h"
#include "pl/ob_pl_package_state.h"
#include "rpc/obmysql/ob_sql_sock_session.h"
#include "sql/engine/expr/ob_expr_regexp_context.h"
using namespace oceanbase::common;
using namespace oceanbase::share;
@ -5380,6 +5381,14 @@ int ObBasicSessionInfo::get_regexp_time_limit(int64_t &v) const
return get_sys_variable(SYS_VAR_REGEXP_TIME_LIMIT, v);
}
// Fills `vars` with this session's regexp stack limit and regexp time limit
// by calling the two single-value getters in turn.
// Returns OB_SUCCESS or whatever code the OZ-wrapped getters leave in ret.
int ObBasicSessionInfo::get_regexp_session_vars(ObExprRegexpSessionVariables &vars) const
{
int ret = OB_SUCCESS;
// NOTE(review): OZ presumably runs its argument only while ret is still
// OB_SUCCESS and logs on failure -- confirm against the macro definition.
OZ (get_regexp_stack_limit(vars.regexp_stack_limit_));
// Reads the SYS_VAR_REGEXP_TIME_LIMIT system variable.
OZ (get_regexp_time_limit(vars.regexp_time_limit_));
return ret;
}
void ObBasicSessionInfo::reset_tx_variable(bool reset_next_scope)
{
LOG_DEBUG("reset tx variable", K(lbt()));

View File

@ -55,6 +55,7 @@ class ObSMConnection;
using sql::FLTControlInfo;
namespace sql
{
class ObExprRegexpSessionVariables;
class ObPCMemPctConf;
class ObPartitionHitInfo
{
@ -645,6 +646,7 @@ public:
int get_sql_notes(bool &sql_notes) const;
int get_regexp_stack_limit(int64_t &v) const;
int get_regexp_time_limit(int64_t &v) const;
int get_regexp_session_vars(ObExprRegexpSessionVariables &vars) const;
int update_timezone_info();
const common::ObTimeZoneInfo *get_timezone_info() const { return tz_info_wrap_.get_time_zone_info(); }
const common::ObTimeZoneInfoWrap &get_tz_info_wrap() const { return tz_info_wrap_; }