support external table auto refresh
This commit is contained in:
parent
a08c6adbb9
commit
e8f5de0a1a
@ -2525,7 +2525,7 @@ typedef enum ObItemType
|
||||
T_MV_REFRESH_OPT,
|
||||
T_MV_BUILD_OPT,
|
||||
T_PSEUDO_EXTERNAL_FILE_ROW,
|
||||
|
||||
T_EXTERNAL_TABLE_AUTO_REFRESH,
|
||||
T_MAX //Attention: add a new type before T_MAX
|
||||
} ObItemType;
|
||||
|
||||
|
@ -44,6 +44,7 @@ ob_set_subtarget(ob_pl pl_cache
|
||||
)
|
||||
|
||||
ob_set_subtarget(ob_pl sys_package
|
||||
sys_package/ob_dbms_external_table.cpp
|
||||
sys_package/ob_dbms_stats.cpp
|
||||
sys_package/ob_dbms_scheduler_mysql.cpp
|
||||
sys_package/ob_dbms_application.cpp
|
||||
|
@ -71,6 +71,7 @@
|
||||
#include "pl/sys_package/ob_dbms_mview_stats_mysql.h"
|
||||
#include "pl/sys_package/ob_pl_dbms_trusted_certificate_manager.h"
|
||||
#include "pl/sys_package/ob_dbms_limit_calculator_mysql.h"
|
||||
#include "pl/sys_package/ob_dbms_external_table.h"
|
||||
|
||||
#ifdef INTERFACE_DEF
|
||||
INTERFACE_DEF(INTERFACE_START, "TEST", (ObPLInterfaceImpl::call))
|
||||
@ -764,6 +765,9 @@
|
||||
INTERFACE_DEF(INTERFACE_DBMS_OB_LIMIT_CALCULATOR_PHY_RES_CALCULATE_BY_STADNBY_TENANT, "PHY_RES_CALCULATE_BY_STANDBY_TENANT", (ObDBMSLimitCalculator::phy_res_calculate_by_standby_tenant))
|
||||
// end of dbms_ob_limit_calculator
|
||||
|
||||
// start of dbms_external_table
|
||||
INTERFACE_DEF(INTERFACE_DBMS_EXTERNAL_TABLE_AUTO_REFRESH_EXTERNAL_TABLE, "AUTO_REFRESH_EXTERNAL_TABLE", (ObDBMSExternalTable::auto_refresh_external_table))
|
||||
//end of dbms_external_table
|
||||
INTERFACE_DEF(INTERFACE_END, "INVALID", (nullptr))
|
||||
#endif
|
||||
|
||||
|
@ -263,6 +263,7 @@ static ObSysPackageFile oracle_sys_package_file_table[] = {
|
||||
{"json_array_t", "json_array_type.sql", "json_array_type_body.sql"},
|
||||
{"xmlsequence", "xml_sequence_type.sql", "xml_sequence_type_body.sql"},
|
||||
{"sdo_geometry", "sdo_geometry.sql", "sdo_geometry_body.sql"},
|
||||
{"dbms_external_table", "dbms_external_table.sql", "dbms_external_table_body.sql"},
|
||||
#endif
|
||||
};
|
||||
|
||||
@ -283,7 +284,8 @@ static ObSysPackageFile mysql_sys_package_file_table[] = {
|
||||
{"dbms_mview", "dbms_mview_mysql.sql", "dbms_mview_body_mysql.sql"},
|
||||
{"dbms_mview_stats", "dbms_mview_stats_mysql.sql", "dbms_mview_stats_body_mysql.sql"},
|
||||
{"dbms_trusted_certificate_manager", "dbms_trusted_certificate_manager_mysql.sql", "dbms_trusted_certificate_manager_body_mysql.sql"},
|
||||
{"dbms_ob_limit_calculator", "dbms_ob_limit_calculator_mysql.sql", "dbms_ob_limit_calculator_body_mysql.sql"}
|
||||
{"dbms_ob_limit_calculator", "dbms_ob_limit_calculator_mysql.sql", "dbms_ob_limit_calculator_body_mysql.sql"},
|
||||
{"dbms_external_table", "dbms_external_table_mysql.sql", "dbms_external_table_body_mysql.sql"}
|
||||
};
|
||||
|
||||
int ObPLPackageManager::load_sys_package(ObMySQLProxy &sql_proxy, ObString &package_name, ObCompatibilityMode compa_mode)
|
||||
|
50
src/pl/sys_package/ob_dbms_external_table.cpp
Normal file
50
src/pl/sys_package/ob_dbms_external_table.cpp
Normal file
@ -0,0 +1,50 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#define USING_LOG_PREFIX PL
|
||||
#include "pl/sys_package/ob_dbms_external_table.h"
|
||||
#include "pl/ob_pl_package_manager.h"
|
||||
#include "sql/resolver/expr/ob_raw_expr_util.h"
|
||||
#include "share/external_table/ob_external_table_file_mgr.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
using namespace sql;
|
||||
using namespace common;
|
||||
|
||||
namespace pl
|
||||
{
|
||||
|
||||
|
||||
int ObDBMSExternalTable::auto_refresh_external_table(ObExecContext &exec_ctx, ParamStore ¶ms, ObObj &result)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int32_t interval = 0;
|
||||
if (params.count() == 0) {
|
||||
interval = 0;
|
||||
} else {
|
||||
const ObObjParam ¶m0 = params.at(0);
|
||||
if (param0.is_null()) {
|
||||
//do nothing
|
||||
} else if (OB_FAIL(param0.get_int32(interval))) {
|
||||
LOG_WARN("failed to get number", K(ret), K(param0));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
OZ (ObExternalTableFileManager::get_instance().auto_refresh_external_table(exec_ctx, interval));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // end of pl
|
||||
} // end oceanbase
|
31
src/pl/sys_package/ob_dbms_external_table.h
Normal file
31
src/pl/sys_package/ob_dbms_external_table.h
Normal file
@ -0,0 +1,31 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#ifndef OCEANBASE_SRC_PL_SYS_PACKAGE_DBMS_EXTERNAL_TABLE_H_
|
||||
#define OCEANBASE_SRC_PL_SYS_PACKAGE_DBMS_EXTERNAL_TABLE_H_
|
||||
#include "sql/engine/ob_exec_context.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace pl
|
||||
{
|
||||
|
||||
class ObDBMSExternalTable
|
||||
{
|
||||
public:
|
||||
static int auto_refresh_external_table(
|
||||
sql::ObExecContext &ctx, sql::ParamStore ¶ms, common::ObObj &result);
|
||||
};
|
||||
|
||||
} // end of pl
|
||||
} // end of oceanbase
|
||||
#endif /* OCEANBASE_SRC_PL_SYS_PACKAGE_DBMS_EXTERNAL_TABLE_H_ */
|
@ -41,12 +41,17 @@
|
||||
#include "sql/resolver/ddl/ob_alter_table_resolver.h"
|
||||
#include "share/external_table/ob_external_table_file_rpc_processor.h"
|
||||
#include "share/external_table/ob_external_table_file_rpc_proxy.h"
|
||||
#include "storage/ob_common_id_utils.h"
|
||||
#include "observer/dbms_scheduler/ob_dbms_sched_table_operator.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
using namespace observer;
|
||||
using namespace common;
|
||||
using namespace sql;
|
||||
using namespace transaction::tablelock;
|
||||
using namespace pl;
|
||||
using namespace common::sqlclient;
|
||||
namespace share
|
||||
{
|
||||
|
||||
@ -1231,6 +1236,252 @@ int ObExternalTableFileManager::flush_external_file_cache(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObExternalTableFileManager::refresh_external_table(const uint64_t tenant_id,
|
||||
const uint64_t table_id,
|
||||
ObSchemaGetterGuard &schema_guard,
|
||||
ObExecContext &exec_ctx) {
|
||||
int ret = OB_SUCCESS;
|
||||
ObArray<ObString> file_urls;
|
||||
ObArray<int64_t> file_sizes;
|
||||
ObExprRegexpSessionVariables regexp_vars;
|
||||
const ObTableSchema *table_schema = NULL;
|
||||
OZ (schema_guard.get_table_schema(tenant_id,
|
||||
table_id,
|
||||
table_schema));
|
||||
CK (table_schema != NULL);
|
||||
OZ (refresh_external_table(tenant_id, table_schema, exec_ctx));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObExternalTableFileManager::refresh_external_table(const uint64_t tenant_id,
|
||||
const ObTableSchema *table_schema,
|
||||
ObExecContext &exec_ctx) {
|
||||
int ret = OB_SUCCESS;
|
||||
ObArray<ObString> file_urls;
|
||||
ObArray<int64_t> file_sizes;
|
||||
ObExprRegexpSessionVariables regexp_vars;
|
||||
CK (table_schema != NULL);
|
||||
CK (exec_ctx.get_my_session() != NULL);
|
||||
if (OB_SUCC(ret) && ObSQLUtils::is_external_files_on_local_disk(table_schema->get_external_file_location())) {
|
||||
OZ (ObSQLUtils::check_location_access_priv(table_schema->get_external_file_location(), exec_ctx.get_my_session()));
|
||||
}
|
||||
ObSqlString full_path;
|
||||
CK (GCTX.location_service_);
|
||||
OZ (exec_ctx.get_my_session()->get_regexp_session_vars(regexp_vars));
|
||||
OZ (ObExternalTableUtils::collect_external_file_list(
|
||||
tenant_id,
|
||||
table_schema->get_table_id(),
|
||||
table_schema->get_external_file_location(),
|
||||
table_schema->get_external_file_location_access_info(),
|
||||
table_schema->get_external_file_pattern(), regexp_vars, exec_ctx.get_allocator(),
|
||||
full_path,
|
||||
file_urls, file_sizes));
|
||||
//TODO [External Table] opt performance
|
||||
ObSEArray<ObAddr, 8> all_servers;
|
||||
OZ (GCTX.location_service_->external_table_get(tenant_id, table_schema->get_table_id(), all_servers));
|
||||
OZ (ObExternalTableFileManager::get_instance().update_inner_table_file_list(exec_ctx, tenant_id, table_schema->get_table_id(), file_urls, file_sizes));
|
||||
if (OB_SUCC(ret)) {
|
||||
if (table_schema->is_partitioned_table()) {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < table_schema->get_partition_num(); i++) {
|
||||
CK (OB_NOT_NULL(table_schema->get_part_array()[i]));
|
||||
OZ (ObExternalTableFileManager::get_instance().flush_external_file_cache(tenant_id, table_schema->get_table_id(),
|
||||
table_schema->get_part_array()[i]->get_part_id(), all_servers));
|
||||
}
|
||||
} else {
|
||||
OZ (ObExternalTableFileManager::get_instance().flush_external_file_cache(tenant_id, table_schema->get_table_id(),
|
||||
table_schema->get_table_id(), all_servers));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObExternalTableFileManager::auto_refresh_external_table(ObExecContext &exec_ctx, const int64_t interval) {
|
||||
int ret = OB_SUCCESS;
|
||||
ObMySQLTransaction trans;
|
||||
CK (exec_ctx.get_my_session() != NULL);
|
||||
CK (exec_ctx.get_sql_ctx()->schema_guard_ != NULL);
|
||||
CK (OB_NOT_NULL(GCTX.sql_proxy_),
|
||||
OB_NOT_NULL(GCTX.schema_service_));
|
||||
uint64_t tenant_id = 0;
|
||||
if (OB_SUCC(ret)) {
|
||||
tenant_id = exec_ctx.get_my_session()->get_effective_tenant_id();
|
||||
}
|
||||
OZ (trans.start(GCTX.sql_proxy_, tenant_id));
|
||||
if (OB_SUCC(ret)) {
|
||||
if (interval == 0) {
|
||||
ObArray<const ObSimpleTableSchemaV2 *> table_schemas;
|
||||
OZ (exec_ctx.get_sql_ctx()->schema_guard_->get_table_schemas_in_tenant(tenant_id, table_schemas));
|
||||
for (int i = 0; OB_SUCC(ret) && i < table_schemas.count(); i++) {
|
||||
const ObSimpleTableSchemaV2 *simple_table = table_schemas.at(i);
|
||||
CK (simple_table != NULL);
|
||||
if (OB_SUCC(ret) && simple_table->get_table_type() == ObTableType::EXTERNAL_TABLE) {
|
||||
const ObTableSchema *table_schema = NULL;
|
||||
OZ (exec_ctx.get_sql_ctx()->schema_guard_->get_table_schema(tenant_id, simple_table->get_table_id(), table_schema));
|
||||
CK (table_schema != NULL);
|
||||
if (OB_SUCC(ret) && (2 == ((table_schema->get_table_flags() & 0B1100) >> 2))) {
|
||||
OZ (refresh_external_table(tenant_id, simple_table->get_table_id(), *exec_ctx.get_sql_ctx()->schema_guard_, exec_ctx));
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (interval == -1) {
|
||||
OZ (delete_auto_refresh_job(exec_ctx, trans));
|
||||
} else if (interval > 0) {
|
||||
OZ (delete_auto_refresh_job(exec_ctx, trans));
|
||||
OZ (create_auto_refresh_job(exec_ctx, interval, trans));
|
||||
} else {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
LOG_USER_ERROR(OB_NOT_SUPPORTED, "interval value");
|
||||
LOG_WARN("interval not supported", K(ret), K(interval));
|
||||
}
|
||||
|
||||
}
|
||||
if (trans.is_started()) {
|
||||
int temp_ret = OB_SUCCESS;
|
||||
bool commit = OB_SUCC(ret);
|
||||
if (OB_SUCCESS != (temp_ret = trans.end(commit))) {
|
||||
ret = (OB_SUCC(ret)) ? temp_ret : ret;
|
||||
LOG_WARN("trans end failed", K(commit), K(temp_ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int ObExternalTableFileManager::delete_auto_refresh_job(ObExecContext &ctx, ObMySQLTransaction &trans) {
|
||||
int ret = OB_SUCCESS;
|
||||
ObSqlString sql;
|
||||
if (OB_FAIL(sql.append_fmt(
|
||||
"delete from %s where tenant_id = %lu and job_name= '%s'",
|
||||
share::OB_ALL_TENANT_SCHEDULER_JOB_TNAME,
|
||||
0UL, auto_refresh_job_name))) {
|
||||
LOG_WARN("failed to append fmt", K(ret));
|
||||
} else {
|
||||
int64_t affected_rows = 0;
|
||||
if (OB_ISNULL(ctx.get_my_session())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected error", K(ret));
|
||||
} else if (OB_FAIL(trans.write(ctx.get_my_session()->get_effective_tenant_id(), sql.ptr(), affected_rows))) {
|
||||
LOG_WARN("execute sql failed", KR(ret), "sql", sql.ptr());
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObExternalTableFileManager::create_repeat_job_sql_(const bool is_oracle_mode,
|
||||
const uint64_t tenant_id,
|
||||
const int64_t job_id,
|
||||
const char *job_name,
|
||||
const ObString &exec_env,
|
||||
const int64_t start_usec,
|
||||
ObSqlString &job_action,
|
||||
ObSqlString &interval,
|
||||
const int64_t interval_ts,
|
||||
ObSqlString &raw_sql)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t end_date = 64060560000000000;//4000-01-01 00:00:00.000000
|
||||
int64_t default_duration_sec = 24 * 60 * 60; //one day
|
||||
share::ObDMLSqlSplicer dml;
|
||||
OZ (dml.add_pk_column("tenant_id", 0));
|
||||
OZ (dml.add_column("job_name", ObHexEscapeSqlStr(ObString(job_name))));
|
||||
OZ (dml.add_pk_column("job", job_id));
|
||||
OZ (dml.add_column("lowner", is_oracle_mode ? "SYS" : "root@%"));
|
||||
OZ (dml.add_column("powner", is_oracle_mode ? "SYS" : "root@%"));
|
||||
OZ (dml.add_column("cowner", is_oracle_mode ? "SYS" : "oceanbase"));
|
||||
OZ (dml.add_time_column("next_date", start_usec));
|
||||
OZ (dml.add_column("total", 0));
|
||||
OZ (dml.add_column("`interval#`", ObHexEscapeSqlStr(interval.string()))); //ObString("FREQ=SECONDLY; INTERVAL=1")
|
||||
OZ (dml.add_column("flag", 0));
|
||||
OZ (dml.add_column("what", ObHexEscapeSqlStr(job_action.string())));
|
||||
OZ (dml.add_column("nlsenv", ""));
|
||||
OZ (dml.add_column("field1", ""));
|
||||
OZ (dml.add_column("exec_env", ObHexEscapeSqlStr(exec_env)));
|
||||
OZ (dml.add_column("job_style", "REGULER"));
|
||||
OZ (dml.add_column("program_name", ""));
|
||||
OZ (dml.add_column("job_type", "STORED_PROCEDURE"));
|
||||
OZ (dml.add_column("job_action", ObHexEscapeSqlStr(job_action.string())));
|
||||
OZ (dml.add_column("number_of_argument", 0));
|
||||
OZ (dml.add_time_column("start_date", start_usec));
|
||||
OZ (dml.add_column("repeat_interval", ObHexEscapeSqlStr(interval.string()))); //ObString("FREQ=SECONDLY; INTERVAL=1")
|
||||
OZ (dml.add_raw_time_column("end_date", end_date));
|
||||
OZ (dml.add_column("job_class", "DEFAULT_JOB_CLASS"));
|
||||
OZ (dml.add_column("enabled", true));
|
||||
OZ (dml.add_column("auto_drop", false));
|
||||
OZ (dml.add_column("comments", "used to auto refresh external tables"));
|
||||
OZ (dml.add_column("credential_name", ""));
|
||||
OZ (dml.add_column("destination_name", ""));
|
||||
OZ (dml.add_column("interval_ts", interval_ts));
|
||||
OZ (dml.add_column("max_run_duration", default_duration_sec));
|
||||
OZ (dml.splice_values(raw_sql));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObExternalTableFileManager::create_auto_refresh_job(ObExecContext &ctx, const int64_t interval, ObMySQLTransaction &trans) {
|
||||
int ret = OB_SUCCESS;
|
||||
#ifndef ALL_TENANT_SCHEDULER_JOB_COLUMN_NAME
|
||||
#define ALL_TENANT_SCHEDULER_JOB_COLUMN_NAME "tenant_id, " \
|
||||
"job_name, " \
|
||||
"job, " \
|
||||
"lowner, " \
|
||||
"powner, " \
|
||||
"cowner, " \
|
||||
"next_date," \
|
||||
"total," \
|
||||
"`interval#`," \
|
||||
"flag," \
|
||||
"what," \
|
||||
"nlsenv," \
|
||||
"field1," \
|
||||
"exec_env,"\
|
||||
"job_style,"\
|
||||
"program_name,"\
|
||||
"job_type,"\
|
||||
"job_action,"\
|
||||
"number_of_argument,"\
|
||||
"start_date,"\
|
||||
"repeat_interval,"\
|
||||
"end_date,"\
|
||||
"job_class,"\
|
||||
"enabled,"\
|
||||
"auto_drop,"\
|
||||
"comments,"\
|
||||
"credential_name,"\
|
||||
"destination_name,"\
|
||||
"interval_ts,"\
|
||||
"max_run_duration"
|
||||
#endif
|
||||
char buf[OB_MAX_PROC_ENV_LENGTH];
|
||||
int64_t pos = 0;
|
||||
CK (ctx.get_my_session() != NULL);
|
||||
OZ (sql::ObExecEnv::gen_exec_env(*ctx.get_my_session(), buf, OB_MAX_PROC_ENV_LENGTH, pos));
|
||||
ObString exec_env(pos, buf);
|
||||
ObCommonID raw_id;
|
||||
bool is_oracle_mode = false;
|
||||
OZ (ObCompatModeGetter::check_is_oracle_mode_with_tenant_id(ctx.get_my_session()->get_effective_tenant_id(), is_oracle_mode));
|
||||
OZ (storage::ObCommonIDUtils::gen_unique_id(ctx.get_my_session()->get_effective_tenant_id(), raw_id));
|
||||
int64_t max_job_id = raw_id.id() + dbms_scheduler::ObDBMSSchedTableOperator::JOB_ID_OFFSET;
|
||||
ObSqlString raw_sql;
|
||||
OZ (raw_sql.append_fmt("INSERT INTO %s( "ALL_TENANT_SCHEDULER_JOB_COLUMN_NAME") VALUES ",
|
||||
share::OB_ALL_TENANT_SCHEDULER_JOB_TNAME));
|
||||
uint64_t start_usec = ObTimeUtility::current_time();
|
||||
ObSqlString job_action;
|
||||
ObSqlString interval_str;
|
||||
int64_t interval_ts = 1000000L * interval;
|
||||
OZ (job_action.append("dbms_external_table.auto_refresh_external_table()"));
|
||||
OZ (interval_str.append_fmt("FREQ=SECONDLY; INTERVAL=%ld", interval));
|
||||
ObSqlString tmp_sql;
|
||||
OZ (create_repeat_job_sql_(is_oracle_mode, 0, 0, auto_refresh_job_name, exec_env, start_usec, job_action, interval_str, interval_ts, tmp_sql));
|
||||
OZ (raw_sql.append_fmt("(%s)", tmp_sql.ptr()));
|
||||
tmp_sql.reset();
|
||||
OZ (create_repeat_job_sql_(is_oracle_mode, 0, max_job_id, auto_refresh_job_name, exec_env, start_usec, job_action, interval_str, interval_ts, tmp_sql));
|
||||
OZ (raw_sql.append_fmt(",(%s);", tmp_sql.ptr()));
|
||||
int64_t affected_rows = 0;
|
||||
OZ (trans.write(ctx.get_my_session()->get_effective_tenant_id(), raw_sql.ptr(), affected_rows));
|
||||
CK (affected_rows == 2);
|
||||
return ret;
|
||||
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObExternalFileInfo, file_url_, file_id_, file_addr_, file_size_, part_id_);
|
||||
|
||||
}
|
||||
|
@ -101,6 +101,8 @@ public:
|
||||
static const int64_t MAX_VERSION = INT64_MAX;
|
||||
static const int64_t LOAD_CACHE_LOCK_CNT = 16;
|
||||
static const int64_t LOCK_TIMEOUT = 2 * 1000000L;
|
||||
|
||||
const char* auto_refresh_job_name = "auto_refresh_external_table_job";
|
||||
const char ip_delimiter = '%';
|
||||
|
||||
ObExternalTableFileManager() {}
|
||||
@ -178,8 +180,19 @@ public:
|
||||
const uint64_t part_id,
|
||||
const ObIArray<ObAddr> &all_servers);
|
||||
|
||||
int refresh_external_table(const uint64_t tenant_id,
|
||||
const uint64_t table_id,
|
||||
ObSchemaGetterGuard &schema_guard,
|
||||
ObExecContext &exec_ctx);
|
||||
|
||||
int refresh_external_table(const uint64_t tenant_id,
|
||||
const ObTableSchema *table_schema,
|
||||
ObExecContext &exec_ctx);
|
||||
|
||||
int auto_refresh_external_table(ObExecContext &exec_ctx, const int64_t interval);
|
||||
private:
|
||||
int delete_auto_refresh_job(ObExecContext &exec_ctx, ObMySQLTransaction &trans);
|
||||
int create_auto_refresh_job(ObExecContext &ctx, const int64_t interval, ObMySQLTransaction &trans);
|
||||
int update_inner_table_files_list_by_part(
|
||||
ObMySQLTransaction &trans,
|
||||
const uint64_t tenant_id,
|
||||
@ -287,6 +300,17 @@ private:
|
||||
const ObString &part_name,
|
||||
ObNewRow &part_val);
|
||||
|
||||
int create_repeat_job_sql_(const bool is_oracle_mode,
|
||||
const uint64_t tenant_id,
|
||||
const int64_t job_id,
|
||||
const char *job_name,
|
||||
const ObString &exec_env,
|
||||
const int64_t start_usec,
|
||||
ObSqlString &job_action,
|
||||
ObSqlString &interval,
|
||||
const int64_t interval_ts,
|
||||
ObSqlString &raw_sql);
|
||||
|
||||
private:
|
||||
common::ObSpinLock fill_cache_locks_[LOAD_CACHE_LOCK_CNT];
|
||||
common::ObKVCache<ObExternalTableFilesKey, ObExternalTableFiles> kv_cache_;
|
||||
|
10
src/share/inner_table/sys_package/dbms_external_table.sql
Normal file
10
src/share/inner_table/sys_package/dbms_external_table.sql
Normal file
@ -0,0 +1,10 @@
|
||||
#package_name:dbms_external_table
|
||||
#author:mingye.swj
|
||||
|
||||
CREATE OR REPLACE PACKAGE dbms_external_table IS
|
||||
|
||||
PROCEDURE AUTO_REFRESH_EXTERNAL_TABLE(
|
||||
interval IN BINARY_INTEGER := 0);
|
||||
|
||||
END dbms_external_table;
|
||||
//
|
@ -0,0 +1,10 @@
|
||||
#package_name:dbms_external_table
|
||||
#author:mingye.swj
|
||||
|
||||
CREATE OR REPLACE PACKAGE BODY dbms_external_table IS
|
||||
PROCEDURE AUTO_REFRESH_EXTERNAL_TABLE(
|
||||
interval IN BINARY_INTEGER := 0);
|
||||
PRAGMA INTERFACE(c, AUTO_REFRESH_EXTERNAL_TABLE);
|
||||
|
||||
END dbms_external_table;
|
||||
//
|
@ -0,0 +1,9 @@
|
||||
#package_name:dbms_external_table
|
||||
#author:mingye.swj
|
||||
|
||||
CREATE OR REPLACE PACKAGE BODY dbms_external_table
|
||||
PROCEDURE AUTO_REFRESH_EXTERNAL_TABLE(
|
||||
IN interval INT DEFAULT 0);
|
||||
PRAGMA INTERFACE(c, AUTO_REFRESH_EXTERNAL_TABLE);
|
||||
|
||||
END dbms_external_table;
|
@ -0,0 +1,9 @@
|
||||
#package_name:dbms_external_table
|
||||
#author:mingye.swj
|
||||
|
||||
CREATE OR REPLACE PACKAGE dbms_external_table AUTHID CURRENT_USER
|
||||
|
||||
PROCEDURE AUTO_REFRESH_EXTERNAL_TABLE(
|
||||
IN interval INT DEFAULT 0);
|
||||
|
||||
END dbms_external_table;
|
@ -5285,6 +5285,9 @@ int ObSchemaPrinter::print_external_table_file_info(const ObTableSchema &table_s
|
||||
SHARE_SCHEMA_LOG(WARN, "fail to print LOCATION", K(ret));
|
||||
} else if (!pattern.empty() && OB_FAIL(databuff_printf(buf, buf_len, pos, "\nPATTERN='%.*s'", pattern.length(), pattern.ptr()))) {
|
||||
SHARE_SCHEMA_LOG(WARN, "fail to print PATTERN", K(ret));
|
||||
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\nAUTO_REFRESH = %s", table_schema.get_external_table_auto_refresh() == 0 ? "OFF" :
|
||||
table_schema.get_external_table_auto_refresh() == 1 ? "IMMEDIATE" : "INTERVAL"))) {
|
||||
SHARE_SCHEMA_LOG(WARN, "fail to print AUTO REFRESH", K(ret));
|
||||
} else if (user_specified) {
|
||||
if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\nPARTITION_TYPE=USER_SPECIFIED"))) {
|
||||
SHARE_SCHEMA_LOG(WARN, "fail to print PATTERN", K(ret));
|
||||
|
@ -148,8 +148,11 @@ static const uint64_t OB_MIN_ID = 0;//used for lower_bound
|
||||
// table_flags stored in __all_table.table_flag
|
||||
#define CASCADE_RLS_OBJECT_FLAG (INT64_C(1) << 0)
|
||||
#define EXTERNAL_TABLE_USER_SPECIFIED_PARTITION_FLAG (INT64_C(1) << 1)
|
||||
#define EXTERNAL_TABLE_AUTO_REFRESH_FLAG (INT64_C(1) << 2)
|
||||
#define EXTERNAL_TABLE_CREATE_ON_REFRESH_FLAG (INT64_C(1) << 3)
|
||||
|
||||
#define EXTERNAL_TABLE_AUTO_REFRESH_IMMEDIATE_FLAG (INT64_C(1) << 2)
|
||||
#define EXTERNAL_TABLE_AUTO_REFRESH_INTERVAL_FLAG (INT64_C(1) << 3)
|
||||
#define EXTERNAL_TABLE_AUTO_REFRESH_FLAG_OFFSET 2
|
||||
#define EXTERNAL_TABLE_AUTO_REFRESH_FLAG_BITS 2
|
||||
|
||||
// schema array size
|
||||
static const int64_t SCHEMA_SMALL_MALLOC_BLOCK_SIZE = 64;
|
||||
|
@ -1185,6 +1185,7 @@ public:
|
||||
int set_external_file_location_access_info(const common::ObString &access_info) { return deep_copy_str(access_info, external_file_location_access_info_); }
|
||||
int set_external_file_format(const common::ObString &format) { return deep_copy_str(format, external_file_format_); }
|
||||
int set_external_file_pattern(const common::ObString &pattern) { return deep_copy_str(pattern, external_file_pattern_); }
|
||||
void set_external_table_auto_refresh(const int64_t flag) { table_flags_ |= (flag << EXTERNAL_TABLE_AUTO_REFRESH_FLAG_OFFSET); }
|
||||
inline void set_user_specified_partition_for_external_table() { table_flags_ |= EXTERNAL_TABLE_USER_SPECIFIED_PARTITION_FLAG; }
|
||||
template<typename ColumnType>
|
||||
int add_column(const ColumnType &column);
|
||||
@ -1327,6 +1328,10 @@ public:
|
||||
const ObString &get_external_file_location_access_info() const { return external_file_location_access_info_; }
|
||||
const ObString &get_external_file_format() const { return external_file_format_; }
|
||||
const ObString &get_external_file_pattern() const { return external_file_pattern_; }
|
||||
int64_t get_external_table_auto_refresh() const { return (table_flags_ >> EXTERNAL_TABLE_AUTO_REFRESH_FLAG_OFFSET) & ((1 << EXTERNAL_TABLE_AUTO_REFRESH_FLAG_BITS) - 1); }
|
||||
bool is_external_table_immediate_refresh() const { return get_external_table_auto_refresh() == 1; }
|
||||
bool is_external_table_interval_refresh() const { return get_external_table_auto_refresh() == 2; }
|
||||
bool is_external_table_auto_refresh_off() const { return get_external_table_auto_refresh() == 0; }
|
||||
inline void set_name_generated_type(const ObNameGeneratedType is_sys_generated) {
|
||||
name_generated_type_ = is_sys_generated;
|
||||
}
|
||||
|
@ -8023,6 +8023,7 @@ int ObStaticEngineCG::set_other_properties(const ObLogPlan &log_plan, ObPhysical
|
||||
|
||||
ObArray<uint64_t> gtt_trans_scope_ids;
|
||||
ObArray<uint64_t> gtt_session_scope_ids;
|
||||
ObArray<uint64_t> immediate_refresh_external_table_ids;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < dependency_table->count(); i++) {
|
||||
if (DEPENDENCY_TABLE == dependency_table->at(i).object_type_) {
|
||||
const ObTableSchema *table_schema = NULL;
|
||||
@ -8042,10 +8043,15 @@ int ObStaticEngineCG::set_other_properties(const ObLogPlan &log_plan, ObPhysical
|
||||
if (OB_FAIL(gtt_session_scope_ids.push_back(object_id))) {
|
||||
LOG_WARN("fail to push back", K(ret));
|
||||
}
|
||||
} else if (table_schema->is_external_table() && table_schema->is_external_table_immediate_refresh()) {
|
||||
if (OB_FAIL(add_var_to_array_no_dup(immediate_refresh_external_table_ids, (uint64_t)object_id))) {
|
||||
LOG_WARN("fail to push back", K(ret));
|
||||
}
|
||||
}
|
||||
LOG_DEBUG("plan contain temporary table",
|
||||
"trx level", table_schema->is_oracle_trx_tmp_table(),
|
||||
"session level", table_schema->is_oracle_sess_tmp_table());
|
||||
"session level", table_schema->is_oracle_sess_tmp_table(),
|
||||
"auto refresh external_table", table_schema->is_external_table());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -8054,6 +8060,8 @@ int ObStaticEngineCG::set_other_properties(const ObLogPlan &log_plan, ObPhysical
|
||||
LOG_WARN("fail to assign array", K(ret));
|
||||
} else if (OB_FAIL(phy_plan.get_gtt_session_scope_ids().assign(gtt_session_scope_ids))) {
|
||||
LOG_WARN("fail to assign array", K(ret));
|
||||
} else if (OB_FAIL(phy_plan.get_immediate_refresh_external_table_ids().assign(immediate_refresh_external_table_ids))) {
|
||||
LOG_WARN("fail to assign array", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -84,6 +84,7 @@ ObPhysicalPlan::ObPhysicalPlan(MemoryContext &mem_context /* = CURRENT_CONTEXT *
|
||||
contain_oracle_session_level_temporary_table_(false),
|
||||
gtt_session_scope_ids_(allocator_),
|
||||
gtt_trans_scope_ids_(allocator_),
|
||||
immediate_refresh_external_table_ids_(allocator_),
|
||||
concurrent_num_(0),
|
||||
max_concurrent_num_(ObMaxConcurrentParam::UNLIMITED),
|
||||
table_locations_(allocator_),
|
||||
@ -191,6 +192,7 @@ void ObPhysicalPlan::reset()
|
||||
contain_oracle_session_level_temporary_table_ = false;
|
||||
gtt_session_scope_ids_.reset();
|
||||
gtt_trans_scope_ids_.reset();
|
||||
immediate_refresh_external_table_ids_.reset();
|
||||
concurrent_num_ = 0;
|
||||
max_concurrent_num_ = ObMaxConcurrentParam::UNLIMITED;
|
||||
is_update_uniq_index_ = false;
|
||||
@ -809,7 +811,8 @@ OB_SERIALIZE_MEMBER(ObPhysicalPlan,
|
||||
stat_.format_sql_id_,
|
||||
mview_ids_,
|
||||
enable_inc_direct_load_,
|
||||
enable_replace_);
|
||||
enable_replace_,
|
||||
immediate_refresh_external_table_ids_);
|
||||
|
||||
int ObPhysicalPlan::set_table_locations(const ObTablePartitionInfoArray &infos,
|
||||
ObSchemaGetterGuard &schema_guard)
|
||||
|
@ -276,8 +276,10 @@ public:
|
||||
uint64_t get_session_id() const { return session_id_; }
|
||||
common::ObIArray<uint64_t> &get_gtt_trans_scope_ids() { return gtt_trans_scope_ids_; }
|
||||
common::ObIArray<uint64_t> &get_gtt_session_scope_ids() { return gtt_session_scope_ids_; }
|
||||
common::ObIArray<uint64_t> &get_immediate_refresh_external_table_ids() { return immediate_refresh_external_table_ids_; }
|
||||
bool is_contain_oracle_trx_level_temporary_table() const { return gtt_trans_scope_ids_.count() > 0; }
|
||||
bool is_contain_oracle_session_level_temporary_table() const { return gtt_session_scope_ids_.count() > 0; }
|
||||
bool is_contain_immediate_refresh_external_table() const { return immediate_refresh_external_table_ids_.count() > 0; }
|
||||
bool contains_temp_table() const {return 0 != session_id_; }
|
||||
void set_returning(bool is_returning) { is_returning_ = is_returning; }
|
||||
bool is_returning() const { return is_returning_; }
|
||||
@ -596,6 +598,7 @@ private:
|
||||
bool contain_oracle_session_level_temporary_table_; // not used
|
||||
common::ObFixedArray<uint64_t, common::ObIAllocator> gtt_session_scope_ids_;
|
||||
common::ObFixedArray<uint64_t, common::ObIAllocator> gtt_trans_scope_ids_;
|
||||
common::ObFixedArray<uint64_t, common::ObIAllocator> immediate_refresh_external_table_ids_;
|
||||
|
||||
//for outline use
|
||||
ObOutlineState outline_state_;
|
||||
|
@ -4844,6 +4844,16 @@ int ObSql::after_get_plan(ObPlanCacheCtx &pc_ctx,
|
||||
LOG_WARN("fail to set all local session vars", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && NULL != phy_plan) {
|
||||
CK (pc_ctx.exec_ctx_.get_sql_ctx()->schema_guard_ != NULL);
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < phy_plan->get_immediate_refresh_external_table_ids().count(); i++) {
|
||||
int64_t object_id = phy_plan->get_immediate_refresh_external_table_ids().at(i);
|
||||
OZ (ObExternalTableFileManager::get_instance().refresh_external_table(session.get_effective_tenant_id(),
|
||||
object_id,
|
||||
*pc_ctx.exec_ctx_.get_sql_ctx()->schema_guard_,
|
||||
pc_ctx.exec_ctx_));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// not phy_plan, ignore
|
||||
}
|
||||
|
@ -64,6 +64,7 @@ static const NonReservedKeyword Mysql_none_reserved_keywords[] =
|
||||
{"auto", AUTO},
|
||||
{"auto_increment", AUTO_INCREMENT},
|
||||
{"auto_increment_mode", AUTO_INCREMENT_MODE},
|
||||
{"auto_refresh", AUTO_REFRESH},
|
||||
{"autoextend_size", AUTOEXTEND_SIZE},
|
||||
{"availability", AVAILABILITY},
|
||||
{"avg", AVG},
|
||||
|
@ -264,7 +264,7 @@ END_P SET_VAR DELIMITER
|
||||
ACCESS ACCOUNT ACTION ACTIVE ADDDATE AFTER AGAINST AGGREGATE ALGORITHM ALL_META ALL_USER ALWAYS ALLOW ANALYSE ANY
|
||||
APPROX_COUNT_DISTINCT APPROX_COUNT_DISTINCT_SYNOPSIS APPROX_COUNT_DISTINCT_SYNOPSIS_MERGE
|
||||
ARBITRATION ARRAY ASCII ASIS AT AUTHORS AUTO AUTOEXTEND_SIZE AUTO_INCREMENT AUTO_INCREMENT_MODE AVG AVG_ROW_LENGTH
|
||||
ACTIVATE AVAILABILITY ARCHIVELOG ASYNCHRONOUS AUDIT ADMIN
|
||||
ACTIVATE AVAILABILITY ARCHIVELOG ASYNCHRONOUS AUDIT ADMIN AUTO_REFRESH
|
||||
|
||||
BACKUP BACKUP_COPIES BALANCE BANDWIDTH BASE BASELINE BASELINE_ID BASIC BEGI BINDING SHARDING BINLOG BIT BIT_AND
|
||||
BIT_OR BIT_XOR BLOCK BLOCK_INDEX BLOCK_SIZE BLOOM_FILTER BOOL BOOLEAN BOOTSTRAP BTREE BYTE
|
||||
@ -7123,6 +7123,30 @@ TABLE_MODE opt_equal_mark STRING_VALUE
|
||||
{
|
||||
$$ = $1;
|
||||
}
|
||||
| AUTO_REFRESH opt_equal_mark OFF
|
||||
{
|
||||
(void)($2) ; /* make bison mute */
|
||||
ParseNode *int_node = NULL;
|
||||
malloc_terminal_node(int_node, result->malloc_pool_, T_INT);
|
||||
int_node->value_ = 0;
|
||||
malloc_non_terminal_node($$, result->malloc_pool_, T_EXTERNAL_TABLE_AUTO_REFRESH, 1, int_node);
|
||||
}
|
||||
| AUTO_REFRESH opt_equal_mark IMMEDIATE
|
||||
{
|
||||
(void)($2) ; /* make bison mute */
|
||||
ParseNode *int_node = NULL;
|
||||
malloc_terminal_node(int_node, result->malloc_pool_, T_INT);
|
||||
int_node->value_ = 1;
|
||||
malloc_non_terminal_node($$, result->malloc_pool_, T_EXTERNAL_TABLE_AUTO_REFRESH, 1, int_node);
|
||||
}
|
||||
| AUTO_REFRESH opt_equal_mark INTERVAL
|
||||
{
|
||||
(void)($2) ; /* make bison mute */
|
||||
ParseNode *int_node = NULL;
|
||||
malloc_terminal_node(int_node, result->malloc_pool_, T_INT);
|
||||
int_node->value_ = 2;
|
||||
malloc_non_terminal_node($$, result->malloc_pool_, T_EXTERNAL_TABLE_AUTO_REFRESH, 1, int_node);
|
||||
}
|
||||
;
|
||||
|
||||
parallel_option:
|
||||
@ -21558,6 +21582,7 @@ ACCOUNT
|
||||
| AUTOEXTEND_SIZE
|
||||
| AUTO_INCREMENT
|
||||
| AUTO_INCREMENT_MODE
|
||||
| AUTO_REFRESH
|
||||
| AVG
|
||||
| AVG_ROW_LENGTH
|
||||
| BACKUP
|
||||
|
@ -2533,6 +2533,27 @@ int ObDDLResolver::resolve_table_option(const ParseNode *option_node, const bool
|
||||
}
|
||||
break;
|
||||
}
|
||||
case T_EXTERNAL_TABLE_AUTO_REFRESH: {
|
||||
if (stmt::T_CREATE_TABLE != stmt_->get_stmt_type()) {
|
||||
ret = OB_ERR_UNEXPECTED; //TODO-EXTERNAL-TABLE add new error code
|
||||
LOG_WARN("invalid file format option", K(ret));
|
||||
} else {
|
||||
ObCreateTableArg &arg = static_cast<ObCreateTableStmt*>(stmt_)->get_create_table_arg();
|
||||
if (!arg.schema_.is_external_table()) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
ObSqlString err_msg;
|
||||
err_msg.append_fmt("Using CREATE ON REFRESH as a CREATE TABLE option");
|
||||
LOG_USER_ERROR(OB_NOT_SUPPORTED, err_msg.ptr());
|
||||
LOG_WARN("using CREATE ON REFRESH as a table option is support in external table only", K(ret));
|
||||
} else if (option_node->num_child_ != 1 || OB_ISNULL(option_node->children_[0])) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected child num", K(option_node->num_child_));
|
||||
} else {
|
||||
arg.schema_.set_external_table_auto_refresh(option_node->children_[0]->value_);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
/* won't be here */
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
|
Loading…
x
Reference in New Issue
Block a user