modify odps table

This commit is contained in:
cqliang1995 2024-11-22 08:17:12 +00:00 committed by ob-robot
parent f302270f0c
commit 3867bc5ceb
39 changed files with 1264 additions and 303 deletions

View File

@ -1203,6 +1203,7 @@
#define N_RB_BUILD "rb_build"
#define N_GET_MYSQL_ROUTINE_PARAMETER_TYPE_STR "get_mysql_routine_parameter_type_str"
#define N_ORA_LOGIN_USER "ora_login_user"
#define N_CALC_ODPS_SIZE "calc_odps_size"
#define N_PRIV_ST_GEOHASH "_st_geohash"
#define N_PRIV_ST_MAKEPOINT "_st_makepoint"
#endif //OCEANBASE_LIB_OB_NAME_DEF_H_

View File

@ -964,6 +964,7 @@ typedef enum ObItemType
T_FUN_SYS_CALC_SUB_PARTITION_NAME = 2053,
T_FUN_SYS_CALC_PARTITION_IDX = 2054,
T_FUN_SYS_CALC_SUB_PARTITION_IDX = 2055,
T_FUN_SYS_CALC_ODPS_SIZE = 2056,
T_MAX_OP = 3000,
//pseudo column, to mark the group iterator id
@ -2628,7 +2629,6 @@ typedef enum ObItemType
T_UNION_MERGE_HINT = 4740,
T_UNION_MERGE_LIST = 4741,
T_PSEUDO_OLD_NEW_COL = 4742,
T_TRANSFORM_DISTINCT_AGG = 4743,
@ -2653,6 +2653,10 @@ typedef enum ObItemType
T_UNIT_GROUP = 4759,
T_TRANSPOSE_TABLE = 4760,
T_FUN_UNPIVOT = 4761,
//odps external table
T_TUNNEL_ENDPOINT = 4762,
T_COLLECT_STATISTICS_ON_CREATE = 4763,
T_MAX //Attention: add a new type before T_MAX
} ObItemType;

View File

@ -43,6 +43,7 @@
#include "share/external_table/ob_external_table_file_rpc_proxy.h"
#include "storage/ob_common_id_utils.h"
#include "observer/dbms_scheduler/ob_dbms_sched_table_operator.h"
#include "sql/engine/cmd/ob_load_data_parser.h"
namespace oceanbase
{
@ -56,6 +57,21 @@ using namespace dbms_scheduler;
namespace share
{
// Copies every field of `other` into this object. The file URL is duplicated
// through `allocator` via ob_write_string; the remaining fields are plain
// value copies and are only written once the URL copy has succeeded.
// Returns OB_SUCCESS, or the error code reported by ob_write_string.
int ObExternalFileInfo::deep_copy(ObIAllocator &allocator, const ObExternalFileInfo &other)
{
  int ret = OB_SUCCESS;
  if (OB_SUCC(ob_write_string(allocator, other.file_url_, file_url_))) {
    // URL is now backed by `allocator`; mirror the scalar members.
    file_id_ = other.file_id_;
    part_id_ = other.part_id_;
    file_size_ = other.file_size_;
    row_start_ = other.row_start_;
    row_count_ = other.row_count_;
    file_addr_ = other.file_addr_;
  } else {
    LOG_WARN("fail to write string", K(ret));
  }
  return ret;
}
int ObExternalTableFilesKey::deep_copy(char *buf, const int64_t buf_len, ObIKVCacheKey *&key) const
{
int ret = OB_SUCCESS;
@ -592,8 +608,10 @@ int ObExternalTableFileManager::calculate_odps_part_val_by_part_spec(const ObTab
{
int ret = OB_SUCCESS;
ObIAllocator &allocator = exec_ctx.get_allocator();
CK (OB_NOT_NULL(table_schema) && OB_LIKELY(table_schema->is_odps_external_table()));
if (OB_SUCC(ret)) {
bool is_odps_external_table = false;
if (OB_FAIL(ObSQLUtils::is_odps_external_table(table_schema, is_odps_external_table))) {
LOG_WARN("failed to check is odps external table or not", K(ret));
} else if (is_odps_external_table) {
const common::ObPartitionKeyInfo &part_key_info = table_schema->get_partition_key_info();
const int part_key_size = part_key_info.get_size();
for (int64_t i = 0; OB_SUCC(ret) && i < file_infos.count(); i++) {
@ -602,8 +620,10 @@ int ObExternalTableFileManager::calculate_odps_part_val_by_part_spec(const ObTab
if (OB_FAIL(ObSQLUtils::extract_odps_part_spec(all_part_spec, part_spec_list))) {
LOG_WARN("failed to extract odps part spec", K(ret), K(all_part_spec));
} else if (part_spec_list.count() != part_key_size) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected count find part spec of odps", K(ret), K(file_infos), K(file_infos.count()), K(i), K(all_part_spec), K(part_spec_list.count()), K(part_key_size));
ret = OB_EXTERNAL_ODPS_UNEXPECTED_ERROR;
LOG_WARN("unexpected count find part spec of odps", K(ret), K(file_infos), K(file_infos.count()), K(i), K(all_part_spec), K(part_spec_list.count()), K(part_key_size));
LOG_USER_ERROR(OB_EXTERNAL_ODPS_UNEXPECTED_ERROR, "unexpected count of partition key between odps table and external table");
} else {
ObNewRow odps_part_row;
ObObj *obj_array = nullptr;
@ -626,8 +646,30 @@ int ObExternalTableFileManager::calculate_odps_part_val_by_part_spec(const ObTab
} else if (FALSE_IT(part_key_type = part_col->get_meta_type().get_type())) {
} else if (part_key_type == ObVarcharType ||
part_key_type == ObCharType) {
odps_part_row.get_cell(j).set_meta_type(part_col->get_meta_type());
odps_part_row.get_cell(j).set_varchar_value(part_spec.ptr(), part_spec.length());
oceanbase::common::ObObjMeta meta_type = part_col->get_meta_type();
ObCollationType coll_dst = static_cast<ObCollationType>(meta_type.get_cs_type());
ObCollationType coll_src = CS_TYPE_UTF8MB4_BIN;
int64_t dst_maxblen = 0;
int64_t src_minblen = 0;
if (OB_FAIL(ObCharset::get_mbmaxlen_by_coll(coll_dst, dst_maxblen))) {
LOG_WARN("failed to get dst mb max len", K(ret), K(coll_dst));
} else if (OB_FAIL(ObCharset::get_mbminlen_by_coll(coll_src, src_minblen))) {
LOG_WARN("failed to get src mb min len", K(ret), K(coll_src));
} else {
void *dst_buf = NULL;
uint64_t dst_buf_size = (part_spec.length() / src_minblen) * dst_maxblen;
uint32_t dst_len = 0;
if (OB_ISNULL(dst_buf = exec_ctx.get_allocator().alloc(dst_buf_size))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc buf", K(ret));
} else if (OB_FAIL(ObCharset::charset_convert(coll_src, part_spec.ptr(), part_spec.length(), coll_dst, static_cast<char*>(dst_buf), dst_buf_size, dst_len))) {
LOG_WARN("failed to convert charset", K(ret));
} else {
odps_part_row.get_cell(j).set_meta_type(part_col->get_meta_type());
odps_part_row.get_cell(j).set_varchar_value(static_cast<char*>(dst_buf),
static_cast<int64_t>(dst_len));
}
}
} else if (part_key_type == ObTinyIntType ||
part_key_type == ObSmallIntType ||
part_key_type == ObMediumIntType ||
@ -675,11 +717,16 @@ int ObExternalTableFileManager::calculate_file_part_val_by_file_name(const ObTab
OZ (cg_partition_expr_rt_expr(table_schema, exec_ctx.get_expr_factory(), exec_ctx.get_my_session(),
schema_guard, exec_ctx.get_allocator(), temp_exprs));
OZ (build_row_for_file_name(file_name_row, exec_ctx.get_allocator()));
bool is_odps_external_table = false;
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ObSQLUtils::is_odps_external_table(table_schema, is_odps_external_table))) {
LOG_WARN("failed to check is odps external table or not", K(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < file_infos.count(); i++) {
ObNewRow list_val;
ObObj *obj_array = nullptr;
if (file_name_row.get_count() > 0) {
file_name_row.get_cell(0).set_string(ObVarcharType, table_schema->is_odps_external_table() ?
file_name_row.get_cell(0).set_string(ObVarcharType, is_odps_external_table ?
file_infos.at(i).file_url_.after(equals_delimiter) :
(is_local_storage ?
file_infos.at(i).file_url_.after(ip_delimiter) :
@ -754,12 +801,13 @@ int ObExternalTableFileManager::calculate_all_files_partitions(share::schema::Ob
ObArray<ObNewRow> existed_part_vals;
ObArray<int64_t> existed_part_ids;
ObArray<ObNewRow> file_part_vals;
bool is_odps_external_table = false;
CK (OB_NOT_NULL(table_schema) && OB_LIKELY(table_schema->is_external_table()));
OZ (get_all_partition_list_val(table_schema, existed_part_vals, existed_part_ids));
OX(is_odps_external_table = table_schema->is_odps_external_table());
if (is_odps_external_table) {
bool is_odps_external_table = false;
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ObSQLUtils::is_odps_external_table(table_schema, is_odps_external_table))) {
LOG_WARN("failed to check is odps external table or not", K(ret));
} else if (is_odps_external_table) {
OZ(calculate_odps_part_val_by_part_spec(table_schema, file_infos, file_part_vals, schema_guard, exec_ctx));
} else {
OZ (calculate_file_part_val_by_file_name(table_schema, file_infos, file_part_vals, schema_guard, exec_ctx));
@ -810,11 +858,17 @@ int ObExternalTableFileManager::update_inner_table_file_list(
ObIArray<int64_t> &file_sizes,
ObIArray<uint64_t> &updated_part_ids,
bool &has_partition_changed,
const uint64_t part_id)
const uint64_t part_id,
bool collect_statistic)
{
int ret = OB_SUCCESS;
ObMySQLTransaction trans;
ObArenaAllocator allocator;
share::schema::ObSchemaGetterGuard schema_guard;
const ObTableSchema *table_schema = NULL;
OZ (GCTX.schema_service_->get_tenant_schema_guard(tenant_id, schema_guard));
OZ (schema_guard.get_table_schema(tenant_id, table_id, table_schema));
CK (OB_NOT_NULL(table_schema));
CK (OB_NOT_NULL(GCTX.sql_proxy_),
OB_NOT_NULL(GCTX.schema_service_));
OZ (trans.start(GCTX.sql_proxy_, tenant_id));
@ -829,6 +883,10 @@ int ObExternalTableFileManager::update_inner_table_file_list(
} else {
OZ (update_inner_table_files_list_by_table(exec_ctx, trans, tenant_id, table_id, file_infos, updated_part_ids, has_partition_changed));
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(collect_odps_table_statistics(collect_statistic, tenant_id, table_id, updated_part_ids, trans))) {
LOG_WARN("failed to collect odps table statistics", K(collect_statistic), K(tenant_id), K(table_id));
}
OZ (trans.end(true));
if (trans.is_started()) {
trans.end(false);
@ -836,6 +894,51 @@ int ObExternalTableFileManager::update_inner_table_file_list(
return ret;
}
int ObExternalTableFileManager::collect_odps_table_statistics(const bool collect_statistic,
const uint64_t tenant_id,
const uint64_t table_id,
ObIArray<uint64_t> &updated_part_ids,
ObMySQLTransaction &trans)
{
int ret = OB_SUCCESS;
bool is_odps_external_table = false;
if (OB_FAIL(ObSQLUtils::is_odps_external_table(tenant_id, table_id, is_odps_external_table))) {
LOG_WARN("failed to check is odps table or not", K(ret), K(tenant_id), K(table_id));
} else if (is_odps_external_table && collect_statistic) {
int64_t update_rows = 0;
ObSqlString update_sql;
int64_t dop_of_collect_external_table_statistics = 16;
if (updated_part_ids.count() <= 32) {
// do nothing
} else {
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
if (OB_LIKELY(tenant_config.is_valid()) && static_cast<int64_t>(tenant_config->_dop_of_collect_external_table_statistics) > 0) {
dop_of_collect_external_table_statistics = tenant_config->_dop_of_collect_external_table_statistics;
} else {
double min_cpu;
double max_cpu;
if (OB_ISNULL(GCTX.omt_)) {
ret = OB_ERR_UNEXPECTED;
} else if (OB_FAIL(GCTX.omt_->get_tenant_cpu(tenant_id, min_cpu, max_cpu))) {
LOG_WARN("fail to get tenant cpu", K(ret));
} else {
dop_of_collect_external_table_statistics = max_cpu;
}
}
}
OZ(update_sql.assign_fmt("UPDATE /*+ enable_parallel_dml parallel(%ld) */ %s SET FILE_SIZE = CALC_ODPS_SIZE(FILE_URL, TABLE_ID) WHERE TABLE_ID = %ld and PART_ID IN (",
dop_of_collect_external_table_statistics,
OB_ALL_EXTERNAL_TABLE_FILE_TNAME,
table_id));
for (int64_t i = 0; OB_SUCC(ret) && i < updated_part_ids.count(); i++) {
OZ(update_sql.append_fmt("%ld%c", updated_part_ids.at(i), ((updated_part_ids.count() - 1) == i) ? ' ' : ','));
}
OZ(update_sql.append(")"));
OZ(trans.write(tenant_id, update_sql.ptr(), update_rows));
}
return ret;
}
int ObExternalTableFileManager::get_file_sizes_by_map(ObIArray<ObString> &file_urls,
ObIArray<int64_t> &file_sizes,
common::hash::ObHashMap<ObString, int64_t> &map)
@ -1035,15 +1138,10 @@ int ObExternalTableFileManager::update_inner_table_files_list_by_part(
int64_t insert_rows = 0;
int64_t max_file_id = 0;// ObCSVTableRowIterator::MIN_EXTERNAL_TABLE_FILE_ID - 1
common::hash::ObHashMap<ObString, int64_t> hash_map;
share::schema::ObSchemaGetterGuard schema_guard;
const ObTableSchema *table_schema = NULL;
bool is_odps_external_table = false;
char file_url_buf[256] = { 0 };
OZ (GCTX.schema_service_->get_tenant_schema_guard(tenant_id, schema_guard));
OZ (schema_guard.get_table_schema(tenant_id, table_id, table_schema));
CK (OB_NOT_NULL(table_schema));
if (OB_SUCC(ret)) {
is_odps_external_table = table_schema->is_odps_external_table();
bool is_odps_external_table = false;
if (OB_FAIL(ObSQLUtils::is_odps_external_table(tenant_id, table_id, is_odps_external_table))) {
LOG_WARN("failed to check is odps external table or not", K(ret), K(tenant_id), K(table_id));
}
OZ(get_all_records_from_inner_table(allocator, tenant_id, table_id, partition_id, old_file_infos, old_file_ids));
OZ(hash_map.create(std::max(file_infos.count(), old_file_infos.count()) + 1, "ExternalFile"));
@ -1607,7 +1705,7 @@ int ObExternalTableFileManager::create_auto_refresh_job(ObExecContext &ctx, cons
}
OB_SERIALIZE_MEMBER(ObExternalFileInfo, file_url_, file_id_, file_addr_, file_size_, part_id_);
OB_SERIALIZE_MEMBER(ObExternalFileInfo, file_url_, file_id_, file_addr_, file_size_, part_id_, row_start_, row_count_);
}
}

View File

@ -32,13 +32,16 @@ class ObAlterTableStmt;
namespace share {
struct ObExternalFileInfo {
ObExternalFileInfo() : file_id_(INT64_MAX), part_id_(0), file_size_(0) {}
ObExternalFileInfo() : file_id_(INT64_MAX), part_id_(0), file_size_(0), row_start_(0), row_count_(0) {}
common::ObString file_url_;
int64_t file_id_;
int64_t part_id_;
common::ObAddr file_addr_;
int64_t file_size_;
TO_STRING_KV(K_(file_url), K_(file_id), K_(part_id), K_(file_addr), K_(file_size));
int64_t row_start_;
int64_t row_count_;
int deep_copy(ObIAllocator &allocator, const ObExternalFileInfo &other);
TO_STRING_KV(K_(file_url), K_(file_id), K_(part_id), K_(file_addr), K_(file_size), K_(row_start), K_(row_count));
OB_UNIS_VERSION(1);
};
@ -151,7 +154,8 @@ public:
common::ObIArray<int64_t> &file_sizes,
common::ObIArray<uint64_t> &updated_part_ids,
bool &has_partition_changed,
const uint64_t part_id = -1);
const uint64_t part_id = -1,
bool collect_statistic = true);
int get_all_records_from_inner_table(ObIAllocator &allocator,
int64_t tenant_id,
@ -197,6 +201,11 @@ public:
int auto_refresh_external_table(ObExecContext &exec_ctx, const int64_t interval);
private:
int collect_odps_table_statistics(const bool collect_statistic,
const uint64_t tenant_id,
const uint64_t table_id,
ObIArray<uint64_t> &updated_part_ids,
ObMySQLTransaction &trans);
int delete_auto_refresh_job(ObExecContext &exec_ctx, ObMySQLTransaction &trans);
int create_auto_refresh_job(ObExecContext &ctx, const int64_t interval, ObMySQLTransaction &trans);
int update_inner_table_files_list_by_part(

View File

@ -26,6 +26,7 @@
#include "deps/oblib/src/lib/net/ob_addr.h"
#include "share/external_table/ob_external_table_file_rpc_processor.h"
#include "share/external_table/ob_external_table_file_rpc_proxy.h"
#include "sql/executor/ob_task_spliter.h"
namespace oceanbase
{
@ -302,6 +303,7 @@ int ObExternalTableUtils::make_external_table_scan_range(const common::ObString
int ObExternalTableUtils::prepare_single_scan_range(const uint64_t tenant_id,
const uint64_t table_id,
const ObString &table_format_or_properties,
ObIArray<int64_t> &partition_ids,
ObIArray<ObNewRange *> &ranges,
ObIAllocator &range_allocator,
@ -311,6 +313,8 @@ int ObExternalTableUtils::prepare_single_scan_range(const uint64_t tenant_id,
ObSEArray<ObExternalFileInfo, 16> file_urls;
ObSEArray<ObNewRange *, 4> tmp_ranges;
ObSEArray<ObAddr, 16> all_locations;
ObExternalFileFormat::FormatType external_table_type;
bool is_odps_external_table = false;
if (OB_ISNULL(GCTX.location_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error", K(ret));
@ -330,7 +334,10 @@ int ObExternalTableUtils::prepare_single_scan_range(const uint64_t tenant_id,
} else {
new_range.reset();
}
if (!file_urls.empty() && file_urls.at(0).file_id_ == 0) {// if file_id_ == 0 means, it's odps table
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ObSQLUtils::is_odps_external_table(table_format_or_properties, is_odps_external_table))) {
LOG_WARN("failed to check is odps external table or not", K(ret), K(table_format_or_properties));
} else if (!file_urls.empty() && is_odps_external_table) {
for (int64_t i = 0; OB_SUCC(ret) && i < file_urls.count(); ++i) {
const ObExternalFileInfo& external_info = file_urls.at(i);
ObNewRange *range = NULL;
@ -344,6 +351,7 @@ int ObExternalTableUtils::prepare_single_scan_range(const uint64_t tenant_id,
external_info.file_id_,
external_info.part_id_,
0,
//external_info.file_size_,
INT64_MAX,
range_allocator,
*range))) {
@ -428,6 +436,217 @@ int ObExternalPathFilter::init(const ObString &pattern,
return ret;
}
// Distributes the ODPS external table files of `dfo` over `sqcs`.
//
// @param dfo       DFO providing the file list and the operator tree (the
//                  first table scan found supplies the external format string)
// @param sqcs      target SQCs; each one's access-external-table-files list is
//                  appended to / rewritten
// @param parallel  degree of parallelism, used to derive the number of blocks
//                  per SQC (parallel / sqc count * 3, at least 1)
// @return OB_SUCCESS, OB_ERR_UNEXPECTED on inconsistent input (no root op, no
//         scan op, no SQC, files exhausted while splitting), OB_NOT_SUPPORTED
//         when built without OB_BUILD_CPP_ODPS, or an error from a callee.
//
// Strategy, as implemented below:
//  * fetch_row_count() decides `use_partition_gi`; when set, whole files are
//    dealt round-robin to the SQCs.
//  * otherwise files with file_size_ > SMALL_FILE_SIZE ("big" files) are cut
//    into row ranges so every SQC gets about the same number of rows, each
//    SQC's share is cut again into blocks, and finally the small files are
//    dealt round-robin as whole files.
//  * an SQC that ends up with no file receives a dummy file entry.
int ObExternalTableUtils::assign_odps_file_to_sqcs(
    ObDfo &dfo,
    ObIArray<ObPxSqcMeta *> &sqcs,
    int64_t parallel)
{
  int ret = OB_SUCCESS;
#ifdef OB_BUILD_CPP_ODPS
  const common::ObIArray<share::ObExternalFileInfo> &files = dfo.get_external_table_files();
  int64_t sqc_count = sqcs.count();
  int64_t sqc_idx = 0;
  bool use_partition_gi = false;
  ObSEArray<const ObTableScanSpec *, 2> scan_ops;
  const ObOpSpec *root_op = NULL;
  dfo.get_root(root_op);
  if (OB_ISNULL(root_op)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("unexpected null ptr", K(ret));
  } else if (OB_FAIL(ObTaskSpliter::find_scan_ops(scan_ops, *root_op))) {
    LOG_WARN("failed to find scan_ops", K(ret), KP(root_op));
  } else if (scan_ops.count() == 0) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("empty scan_ops", K(ret));
  } else if (OB_UNLIKELY(sqc_count <= 0)) {
    // every path below divides or takes a modulo by sqc_count
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("no sqc to assign odps files to", K(ret), K(sqc_count));
  } else if (OB_FAIL(ObOdpsPartitionDownloaderMgr::fetch_row_count(MTL_ID(),
                                                                   files,
                                                                   scan_ops.at(0)->tsc_ctdef_.scan_ctdef_.external_file_format_str_.str_,
                                                                   use_partition_gi))) {
    LOG_WARN("failed to fetch row count", K(ret));
  } else if (use_partition_gi) {
    // whole files, round-robin
    sqc_idx = 0;
    for (int64_t i = 0; OB_SUCC(ret) && i < files.count(); ++i) {
      OZ (sqcs.at(sqc_idx++ % sqc_count)->get_access_external_table_files().push_back(files.at(i)));
    }
  } else {
    ObSEArray<share::ObExternalFileInfo, 8> temp_block_files;
    ObSEArray<int64_t, 8> sqc_row_counts;
    int64_t big_file_row_count = 0;
    int64_t small_file_count = 0;
    // file_size_ is compared against this threshold to tell big from small files
    const int64_t SMALL_FILE_SIZE = 100000;
    // a shortfall of at most this fraction of the average is carried over to
    // the next sqc/block instead of opening the next file for it
    const double DROP_FACTOR = 0.05;
    for (int64_t i = 0; OB_SUCC(ret) && i < sqcs.count(); ++i) {
      if (OB_FAIL(sqc_row_counts.push_back(0))) {
        LOG_WARN("failed to init sqc_row_counts", K(i), K(ret));
      }
    }
    for (int64_t i = 0; OB_SUCC(ret) && i < files.count(); ++i) {
      if (files.at(i).file_size_ > SMALL_FILE_SIZE) {
        big_file_row_count += files.at(i).file_size_;
      } else {
        ++small_file_count;
      }
    }
    int64_t avg_row_count_to_sqc = big_file_row_count / sqc_count;
    int64_t expected_row_count_to_sqc = 0;
    int64_t row_tail = big_file_row_count % sqc_count;
    int64_t file_idx = 0;
    int64_t file_start = 0;
    int64_t row_count_assigned_to_sqc = 0;
    int64_t remaining_row_count_in_file = 0;
    int64_t row_count_needed_to_sqc = 0;
    int64_t droped_row_count_on_last_loop = 0;
    sqc_idx = 0;
    LOG_TRACE("before split odps file", K(ret), K(small_file_count), K(big_file_row_count), K(sqc_count), K(row_tail), K(avg_row_count_to_sqc), K(files));
    // Phase 1: cut the big files into row ranges, one quota per sqc.
    while (OB_SUCC(ret) && big_file_row_count && sqc_idx < sqc_count) {
      expected_row_count_to_sqc = avg_row_count_to_sqc + droped_row_count_on_last_loop;
      row_count_assigned_to_sqc = 0;
      droped_row_count_on_last_loop = 0;
      if (sqc_idx == sqc_count - 1) {
        // the last sqc absorbs the remainder of the integer division
        expected_row_count_to_sqc += row_tail;
      }
      while (OB_SUCC(ret) && row_count_assigned_to_sqc != expected_row_count_to_sqc) {
        // Skip small files. The bound is inclusive so that it matches the
        // classification above and the round-robin pass below; otherwise a
        // file of exactly SMALL_FILE_SIZE rows would be handed out twice.
        while (file_idx < files.count() && files.at(file_idx).file_size_ <= SMALL_FILE_SIZE) { ++file_idx; }
        if (file_idx >= files.count()) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("big odps files is iter end", K(sqc_idx), K(sqc_count), K(row_count_assigned_to_sqc), K(expected_row_count_to_sqc), K(avg_row_count_to_sqc), K(row_tail), K(files.count()), K(file_idx));
        } else {
          remaining_row_count_in_file = files.at(file_idx).file_size_ - file_start;
          row_count_needed_to_sqc = expected_row_count_to_sqc - row_count_assigned_to_sqc;
          if (remaining_row_count_in_file >= row_count_needed_to_sqc) {
            // the current file can satisfy the rest of this sqc's quota
            ObExternalFileInfo splited_file_info = files.at(file_idx);
            splited_file_info.row_start_ = file_start;
            splited_file_info.row_count_ = row_count_needed_to_sqc;
            row_count_assigned_to_sqc += row_count_needed_to_sqc;
            file_start += row_count_needed_to_sqc;
            OZ (sqcs.at(sqc_idx)->get_access_external_table_files().push_back(splited_file_info));
          } else if (remaining_row_count_in_file > 0) {
            // take what is left of the current file
            ObExternalFileInfo splited_file_info = files.at(file_idx);
            splited_file_info.row_start_ = file_start;
            splited_file_info.row_count_ = remaining_row_count_in_file;
            row_count_assigned_to_sqc += remaining_row_count_in_file;
            file_start += remaining_row_count_in_file;
            OZ (sqcs.at(sqc_idx)->get_access_external_table_files().push_back(splited_file_info));
          } else { //remaining_row_count_in_file == 0
            ++file_idx;
            file_start = 0;
            if (expected_row_count_to_sqc - row_count_assigned_to_sqc <= avg_row_count_to_sqc * DROP_FACTOR) {
              // small shortfall: hand it to the next sqc and close this one
              droped_row_count_on_last_loop = expected_row_count_to_sqc - row_count_assigned_to_sqc;
              expected_row_count_to_sqc = row_count_assigned_to_sqc;
            }
          }
        }
      }
      sqc_row_counts.at(sqc_idx) = row_count_assigned_to_sqc;
      sqc_idx++;
      LOG_TRACE("assigned big odps file to sqc", K(ret), K(sqc_row_counts), K(row_count_assigned_to_sqc), K(expected_row_count_to_sqc), K(avg_row_count_to_sqc), K(file_idx), K(sqc_idx), K(remaining_row_count_in_file));
    }
    const int64_t block_cnt = parallel / sqcs.count() * 3;
    LOG_TRACE("split big odps file to sqc, going to split files to block in sqc", K(ret), K(sqc_row_counts), K(sqcs.count()), K(block_cnt), K(parallel), K(small_file_count));
    // Phase 2: inside each sqc, cut its row ranges again into blocks.
    for (int64_t i = 0; OB_SUCC(ret) && i < sqcs.count(); ++i) {
      ObIArray<share::ObExternalFileInfo> &sqc_files = sqcs.at(i)->get_access_external_table_files();
      if (sqc_files.empty() || sqc_row_counts.at(i) <= 0) {
        continue;
      }
      temp_block_files.reuse();
      int64_t actual_block_cnt = block_cnt;
      if (i < small_file_count % sqcs.count()) {
        // this sqc also receives one extra small file in the round-robin pass
        --actual_block_cnt;
      }
      if (actual_block_cnt < 1) {
        // parallel < sqc count makes block_cnt 0; never divide by <= 0
        actual_block_cnt = 1;
      }
      LOG_TRACE("split big odps file twice", K(i), K(i < small_file_count % sqcs.count()), K(actual_block_cnt), K(sqc_files), K(ret));
      int64_t avg_row_count_to_block = sqc_row_counts.at(i) / actual_block_cnt;
      int64_t expected_row_count_to_block = 0;
      int64_t block_row_tail = sqc_row_counts.at(i) % actual_block_cnt;
      int64_t block_idx = 0;
      int64_t row_count_assigned_to_block = 0;
      file_idx = 0;
      file_start = sqc_row_counts.at(i) ? sqc_files.at(file_idx).row_start_ : 0; // sqc_row_counts.at(i) means sqc_files is not empty
      droped_row_count_on_last_loop = 0;
      while (OB_SUCC(ret) && sqc_row_counts.at(i) && block_idx < actual_block_cnt) {
        expected_row_count_to_block = avg_row_count_to_block + droped_row_count_on_last_loop;
        row_count_assigned_to_block = 0;
        droped_row_count_on_last_loop = 0;
        if (block_idx == actual_block_cnt - 1) {
          // the last block absorbs the remainder of the integer division
          expected_row_count_to_block += block_row_tail;
        }
        while (OB_SUCC(ret) && row_count_assigned_to_block != expected_row_count_to_block) {
          if (file_idx >= sqc_files.count()) {
            ret = OB_ERR_UNEXPECTED;
            LOG_WARN("files in sqc iter end", K(i), K(block_idx), K(actual_block_cnt), K(row_count_assigned_to_block), K(expected_row_count_to_block), K(avg_row_count_to_block), K(block_row_tail), K(file_start), K(file_idx), K(sqc_files));
          } else {
            // rows of the current range not yet handed to a block
            int64_t remaining_in_sqc_file = sqc_files.at(file_idx).row_count_ - (file_start - sqc_files.at(file_idx).row_start_);
            int64_t row_count_needed_to_block = expected_row_count_to_block - row_count_assigned_to_block;
            if (remaining_in_sqc_file >= row_count_needed_to_block) {
              ObExternalFileInfo splited_file_info = sqc_files.at(file_idx);
              splited_file_info.row_start_ = file_start;
              splited_file_info.row_count_ = row_count_needed_to_block;
              row_count_assigned_to_block += row_count_needed_to_block;
              file_start += row_count_needed_to_block;
              OZ (temp_block_files.push_back(splited_file_info));
            } else if (remaining_in_sqc_file > 0) {
              ObExternalFileInfo splited_file_info = sqc_files.at(file_idx);
              splited_file_info.row_start_ = file_start;
              splited_file_info.row_count_ = remaining_in_sqc_file;
              row_count_assigned_to_block += remaining_in_sqc_file;
              file_start += remaining_in_sqc_file;
              OZ (temp_block_files.push_back(splited_file_info));
            } else { //remaining_in_sqc_file == 0
              ++file_idx;
              file_start = file_idx >= sqc_files.count() ? 0 : sqc_files.at(file_idx).row_start_;
              if (expected_row_count_to_block - row_count_assigned_to_block <= avg_row_count_to_block * DROP_FACTOR) {
                // small shortfall: hand it to the next block and close this one
                droped_row_count_on_last_loop = expected_row_count_to_block - row_count_assigned_to_block;
                expected_row_count_to_block = row_count_assigned_to_block;
              }
            }
          }
        }
        block_idx++;
      }
      // Rebuild the sqc's list from the blocks: first the entries whose
      // part_id differs from the preceding entry's, then all the others.
      sqc_files.reuse();
      int64_t part_id = OB_INVALID_ID;
      for (int64_t j = 0; OB_SUCC(ret) && j < temp_block_files.count(); ++j) {
        if (part_id != temp_block_files.at(j).part_id_) {// to find first distinct file
          OZ(sqc_files.push_back(temp_block_files.at(j)));
          part_id = temp_block_files.at(j).part_id_;
        }
      }
      part_id = OB_INVALID_ID;
      for (int64_t j = 0; OB_SUCC(ret) && j < temp_block_files.count(); ++j) {
        if (part_id != temp_block_files.at(j).part_id_) {
          part_id = temp_block_files.at(j).part_id_;
        } else {
          OZ(sqc_files.push_back(temp_block_files.at(j)));
        }
      }
    }
    // Phase 3: small files are assigned whole, round-robin.
    sqc_idx = 0;
    for (int64_t i = 0; OB_SUCC(ret) && i < files.count(); ++i) {
      if (files.at(i).file_size_ <= SMALL_FILE_SIZE) {
        OZ (sqcs.at(sqc_idx++ % sqc_count)->get_access_external_table_files().push_back(files.at(i)));
      }
    }
  }
  if (OB_SUCC(ret)) {
    // give every sqc left without a file a dummy entry
    ObExternalFileInfo dummy_file;
    const char* dummy_file_name = "#######DUMMY_FILE#######";
    dummy_file.file_url_ = dummy_file_name;
    for (int64_t i = 0; OB_SUCC(ret) && i < sqcs.count(); ++i) {
      if (sqcs.at(i)->get_access_external_table_files().empty()) {
        OZ(sqcs.at(i)->get_access_external_table_files().push_back(dummy_file));
      }
    }
  }
#else
  ret = OB_NOT_SUPPORTED;
  LOG_WARN("not supported featue", K(ret));
  LOG_USER_ERROR(OB_NOT_SUPPORTED, "odps external table");
#endif
  return ret;
}
int ObExternalTableUtils::calc_assigned_files_to_sqcs(
const ObIArray<ObExternalFileInfo> &files,
ObIArray<int64_t> &assigned_idx,

View File

@ -17,6 +17,7 @@
#include "lib/string/ob_string.h"
#include "lib/allocator/page_arena.h"
#include "src/share/schema/ob_column_schema.h"
#include "sql/engine/px/ob_dfo.h"
namespace oceanbase
{
@ -96,6 +97,7 @@ class ObExternalTableUtils {
static int prepare_single_scan_range(const uint64_t tenant_id,
const uint64_t table_id,
const ObString &table_format_or_properties,
ObIArray<int64_t> &partition_ids,
common::ObIArray<common::ObNewRange *> &ranges,
common::ObIAllocator &range_allocator,
@ -107,6 +109,11 @@ class ObExternalTableUtils {
common::ObIArray<int64_t> &assigned_idx,
int64_t sqc_count);
static int assign_odps_file_to_sqcs(
ObDfo &dfo,
ObIArray<ObPxSqcMeta *> &sqcs,
int64_t parallel);
static int filter_files_in_locations(common::ObIArray<share::ObExternalFileInfo> &files,
common::ObIArray<common::ObAddr> &locations);

View File

@ -2310,6 +2310,14 @@ DEF_STR_WITH_CHECKER(ob_storage_s3_url_encode_type, OB_CLUSTER_PARAMETER, "defau
"\"compliantRfc3986Encoding\": Uses URL encoding that adheres to the RFC 3986 standard.",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
// Tenant parameter, default "0", range "[0,)": parallelism used when pulling
// external table statistics.
DEF_INT(_dop_of_collect_external_table_statistics, OB_TENANT_PARAMETER, "0", "[0,)",
"parallelism of pull statistics of external table",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
// Tenant parameter, default "5", range "[0,)": partition-count threshold below
// which an ODPS external table is forced to use the block granule iterator
// (per the description string; NOTE(review): the consumer of this value is not
// visible here — confirm the exact comparison at the use site).
DEF_INT(_max_partition_count_to_collect_statistic, OB_TENANT_PARAMETER, "5", "[0,)",
"force odps external table to using block granule iterator when count of partition is under this config",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_INT(ob_encoding_granularity, OB_TENANT_PARAMETER, "65536", "[8192, 1048576]",
"Maximum rows for encoding in one micro block. Range:[8192,1048576]",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));

View File

@ -5687,12 +5687,12 @@ int ObSchemaPrinter::print_external_table_file_info(const ObTableSchema &table_s
const ObString &format_string = table_schema.get_external_file_format();
const ObString &properties_string = table_schema.get_external_properties();
const bool user_specified = table_schema.is_user_specified_partition_for_external_table();
bool is_odps_table = false;
if (OB_FAIL(ObSQLUtils::is_external_odps_table(properties_string, allocator, is_odps_table))) {
LOG_WARN("failed check is odps table or not", K(ret));
} else if (!is_odps_table && OB_FAIL(databuff_printf(buf, buf_len, pos, "\nLOCATION='%.*s'", location.length(), location.ptr()))) {
bool is_odps_external_table = false;
if (OB_FAIL(ObSQLUtils::is_odps_external_table(&table_schema, is_odps_external_table))) {
LOG_WARN("failed to check is odps table or not", K(ret));
} else if (!is_odps_external_table && OB_FAIL(databuff_printf(buf, buf_len, pos, "\nLOCATION='%.*s'", location.length(), location.ptr()))) {
SHARE_SCHEMA_LOG(WARN, "fail to print LOCATION", K(ret));
} else if (!is_odps_table && !pattern.empty() && OB_FAIL(databuff_printf(buf, buf_len, pos, "\nPATTERN='%.*s'", pattern.length(), pattern.ptr()))) {
} else if (!is_odps_external_table && !pattern.empty() && OB_FAIL(databuff_printf(buf, buf_len, pos, "\nPATTERN='%.*s'", pattern.length(), pattern.ptr()))) {
SHARE_SCHEMA_LOG(WARN, "fail to print PATTERN", K(ret));
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\nAUTO_REFRESH = %s", table_schema.get_external_table_auto_refresh() == 0 ? "OFF" :
table_schema.get_external_table_auto_refresh() == 1 ? "IMMEDIATE" : "INTERVAL"))) {
@ -5706,19 +5706,19 @@ int ObSchemaPrinter::print_external_table_file_info(const ObTableSchema &table_s
// 2. print file format
if (OB_SUCC(ret)) {
ObExternalFileFormat format;
const ObString &format_or_properties = is_odps_table ? properties_string : format_string;
if (format_or_properties.empty()) {
const ObString &table_format_or_properties = is_odps_external_table ? properties_string : format_string;
if (table_format_or_properties.empty()) {
ret = OB_ERR_UNEXPECTED;
SHARE_SCHEMA_LOG(WARN, "format_or_properties is empty", K(ret));
} else if (OB_FAIL(format.load_from_string(format_or_properties, allocator))) {
SHARE_SCHEMA_LOG(WARN, "table_format_or_properties is empty", K(ret));
} else if (OB_FAIL(format.load_from_string(table_format_or_properties, allocator))) {
SHARE_SCHEMA_LOG(WARN, "fail to load from json string", K(ret));
} else if (!(format.format_type_ > ObExternalFileFormat::INVALID_FORMAT
&& format.format_type_ < ObExternalFileFormat::MAX_FORMAT)) {
ret = OB_NOT_SUPPORTED;
SHARE_SCHEMA_LOG(WARN, "unsupported to print file format", K(ret), K(format.format_type_));
} else if (!is_odps_table && OB_FAIL(databuff_printf(buf, buf_len, pos, "\nFORMAT (\n"))) {
} else if (!is_odps_external_table && OB_FAIL(databuff_printf(buf, buf_len, pos, "\nFORMAT (\n"))) {
SHARE_SCHEMA_LOG(WARN, "fail to print FORMAT (", K(ret));
} else if (is_odps_table && OB_FAIL(databuff_printf(buf, buf_len, pos, "\nPROPERTIES (\n"))) {
} else if (is_odps_external_table && OB_FAIL(databuff_printf(buf, buf_len, pos, "\nPROPERTIES (\n"))) {
SHARE_SCHEMA_LOG(WARN, "fail to print FORMAT (", K(ret));
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos, " TYPE = '%s',", ObExternalFileFormat::FORMAT_TYPE_STR[format.format_type_]))) {
SHARE_SCHEMA_LOG(WARN, "fail to print TYPE", K(ret));
@ -5764,25 +5764,34 @@ int ObSchemaPrinter::print_external_table_file_info(const ObTableSchema &table_s
} else if (OB_SUCC(ret) && ObExternalFileFormat::ODPS_FORMAT == format.format_type_) {
const ObODPSGeneralFormat &odps = format.odps_format_;
ObString scret_str("********");
if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[0], odps.access_type_.length(), odps.access_type_.ptr()))) {
int64_t option_names_idx = 0;
if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[option_names_idx++], odps.access_type_.length(), odps.access_type_.ptr()))) {
SHARE_SCHEMA_LOG(WARN, "fail to print ODPS_INFO", K(ret));
} else if (!odps.access_id_.empty() && OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[1], scret_str.length(), scret_str.ptr()))) {
} else if (!odps.access_id_.empty() && OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[option_names_idx++], scret_str.length(), scret_str.ptr()))) {
SHARE_SCHEMA_LOG(WARN, "fail to print ODPS_INFO", K(ret));
} else if (!odps.access_key_.empty() && OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[2], scret_str.length(), scret_str.ptr()))) {
} else if (!odps.access_key_.empty() && OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[option_names_idx++], scret_str.length(), scret_str.ptr()))) {
SHARE_SCHEMA_LOG(WARN, "fail to print ODPS_INFO", K(ret));
} else if (!odps.sts_token_.empty() && OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[3], scret_str.length(), scret_str.ptr()))) {
} else if (!odps.sts_token_.empty() && OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[option_names_idx++], scret_str.length(), scret_str.ptr()))) {
SHARE_SCHEMA_LOG(WARN, "fail to print ODPS_INFO", K(ret));
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[4], odps.endpoint_.length(), odps.endpoint_.ptr()))) {
} else if (odps.sts_token_.empty()) {
option_names_idx++;
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[option_names_idx++], odps.endpoint_.length(), odps.endpoint_.ptr()))) {
SHARE_SCHEMA_LOG(WARN, "fail to print ODPS_INFO", K(ret));
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[5], odps.project_.length(), odps.project_.ptr()))) {
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[option_names_idx++], odps.tunnel_endpoint_.length(), odps.tunnel_endpoint_.ptr()))) {
SHARE_SCHEMA_LOG(WARN, "fail to print ODPS_INFO", K(ret));
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[6], odps.schema_.length(), odps.schema_.ptr()))) {
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[option_names_idx++], odps.project_.length(), odps.project_.ptr()))) {
SHARE_SCHEMA_LOG(WARN, "fail to print ODPS_INFO", K(ret));
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[7], odps.table_.length(), odps.table_.ptr()))) {
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[option_names_idx++], odps.schema_.length(), odps.schema_.ptr()))) {
SHARE_SCHEMA_LOG(WARN, "fail to print ODPS_INFO", K(ret));
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[8], odps.quota_.length(), odps.quota_.ptr()))) {
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[option_names_idx++], odps.table_.length(), odps.table_.ptr()))) {
SHARE_SCHEMA_LOG(WARN, "fail to print ODPS_INFO", K(ret));
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[9], odps.compression_code_.length(), odps.compression_code_.ptr()))) {
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[option_names_idx++], odps.quota_.length(), odps.quota_.ptr()))) {
SHARE_SCHEMA_LOG(WARN, "fail to print ODPS_INFO", K(ret));
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[option_names_idx++], odps.compression_code_.length(), odps.compression_code_.ptr()))) {
SHARE_SCHEMA_LOG(WARN, "fail to print ODPS_INFO", K(ret));
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = %s,", ObODPSGeneralFormat::OPTION_NAMES[option_names_idx++], odps.collect_statistics_on_create_ ? "TRUE" : "FALSE"))) {
SHARE_SCHEMA_LOG(WARN, "fail to print ODPS_INFO", K(ret));
}
}

View File

@ -1520,7 +1520,6 @@ public:
inline ObNameGeneratedType get_name_generated_type() const { return name_generated_type_; }
bool is_sys_generated_name(bool check_unknown) const;
inline bool is_user_specified_partition_for_external_table() const { return (table_flags_ & EXTERNAL_TABLE_USER_SPECIFIED_PARTITION_FLAG) != 0; }
inline bool is_odps_external_table() const { return !external_properties_.empty(); }
inline bool is_index_visible() const
{
return 0 == (index_attributes_set_ & ((uint64_t)(1) << INDEX_VISIBILITY));

View File

@ -815,6 +815,7 @@ ob_set_subtarget(ob_sql engine_expr
engine/expr/ob_expr_func_ceil.cpp
engine/expr/ob_expr_eval_functions.cpp
engine/expr/ob_expr_in.cpp
engine/expr/ob_expr_calc_odps_size.cpp
)
ob_set_subtarget(ob_sql ALONE

View File

@ -69,13 +69,13 @@ int ObTscCgService::generate_tsc_ctdef(ObLogTableScan &op, ObTableScanCtDef &tsc
LOG_WARN("fail to check location access priv", K(ret));
} else {
scan_ctdef.is_external_table_ = true;
const ObString &format_or_properties = table_schema->get_external_file_format().empty() ?
const ObString &table_format_or_properties = table_schema->get_external_file_format().empty() ?
table_schema->get_external_properties() :
table_schema->get_external_file_format();
if (format_or_properties.empty()) {
if (table_format_or_properties.empty()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("format_or_properties is empty", K(ret));
} else if (OB_FAIL(scan_ctdef.external_file_format_str_.store_str(format_or_properties))) {
LOG_WARN("table_format_or_properties is empty", K(ret));
} else if (OB_FAIL(scan_ctdef.external_file_format_str_.store_str(table_format_or_properties))) {
LOG_WARN("fail to set string", K(ret));
} else if (OB_FAIL(scan_ctdef.external_file_location_.store_str(table_schema->get_external_file_location()))) {
LOG_WARN("fail to set string", K(ret));

View File

@ -56,6 +56,8 @@ int64_t ObODPSGeneralFormat::to_json_kv_string(char *buf, const int64_t buf_len)
J_COMMA();
databuff_printf(buf, buf_len, pos, "\"%s\":\"%s\"", OPTION_NAMES[idx++], to_cstring(ObHexStringWrap(endpoint_)));
J_COMMA();
databuff_printf(buf, buf_len, pos, "\"%s\":\"%s\"", OPTION_NAMES[idx++], to_cstring(ObHexStringWrap(tunnel_endpoint_)));
J_COMMA();
databuff_printf(buf, buf_len, pos, "\"%s\":\"%s\"", OPTION_NAMES[idx++], to_cstring(ObHexStringWrap(project_)));
J_COMMA();
databuff_printf(buf, buf_len, pos, "\"%s\":\"%s\"", OPTION_NAMES[idx++], to_cstring(ObHexStringWrap(schema_)));
@ -65,6 +67,8 @@ int64_t ObODPSGeneralFormat::to_json_kv_string(char *buf, const int64_t buf_len)
databuff_printf(buf, buf_len, pos, "\"%s\":\"%s\"", OPTION_NAMES[idx++], to_cstring(ObHexStringWrap(quota_)));
J_COMMA();
databuff_printf(buf, buf_len, pos, "\"%s\":\"%s\"", OPTION_NAMES[idx++], to_cstring(ObHexStringWrap(compression_code_)));
J_COMMA();
databuff_printf(buf, buf_len, pos, "\"%s\":%s", OPTION_NAMES[idx++], STR_BOOL(collect_statistics_on_create_));
return pos;
}
@ -221,6 +225,8 @@ int ObODPSGeneralFormat::deep_copy(const ObODPSGeneralFormat &src) {
LOG_WARN("failed to deep copy", K(ret));
} else if (OB_FAIL(deep_copy_str(src.endpoint_, endpoint_))) {
LOG_WARN("failed to deep copy", K(ret));
} else if (OB_FAIL(deep_copy_str(src.tunnel_endpoint_, tunnel_endpoint_))) {
LOG_WARN("failed to deep copy", K(ret));
} else if (OB_FAIL(deep_copy_str(src.project_, project_))) {
LOG_WARN("failed to deep copy", K(ret));
} else if (OB_FAIL(deep_copy_str(src.schema_, schema_))) {
@ -231,6 +237,8 @@ int ObODPSGeneralFormat::deep_copy(const ObODPSGeneralFormat &src) {
LOG_WARN("failed to deep copy", K(ret));
} else if (OB_FAIL(deep_copy_str(src.compression_code_, compression_code_))) {
LOG_WARN("failed to deep copy", K(ret));
} else {
collect_statistics_on_create_ = src.collect_statistics_on_create_;
}
return ret;
}
@ -284,6 +292,15 @@ int ObODPSGeneralFormat::load_from_json_data(json::Pair *&node, ObIAllocator &al
}
node = node->get_next();
}
if (OB_NOT_NULL(node) && 0 == node->name_.case_compare(OPTION_NAMES[idx++])
&& json::JT_STRING == node->value_->get_type()) {
ObObj obj;
OZ (ObHexUtilsBase::unhex(node->value_->get_string(), allocator, obj));
if (OB_SUCC(ret) && !obj.is_null()) {
tunnel_endpoint_ = obj.get_string();
}
node = node->get_next();
}
if (OB_NOT_NULL(node) && 0 == node->name_.case_compare(OPTION_NAMES[idx++])
&& json::JT_STRING == node->value_->get_type()) {
ObObj obj;
@ -329,6 +346,14 @@ int ObODPSGeneralFormat::load_from_json_data(json::Pair *&node, ObIAllocator &al
}
node = node->get_next();
}
if (OB_NOT_NULL(node) && 0 == node->name_.case_compare(OPTION_NAMES[idx++])) {
if (json::JT_TRUE == node->value_->get_type()) {
collect_statistics_on_create_ = true;
} else {
collect_statistics_on_create_ = false;
}
node = node->get_next();
}
return ret;
}

View File

@ -37,11 +37,13 @@ struct ObODPSGeneralFormat {
access_key_(),
sts_token_(),
endpoint_(),
tunnel_endpoint_(),
project_(),
schema_(),
table_(),
quota_(),
compression_code_()
compression_code_(),
collect_statistics_on_create_(false)
{}
int deep_copy_str(const ObString &src,
ObString &dest);
@ -56,26 +58,30 @@ struct ObODPSGeneralFormat {
"ACCESSKEY",
"STSTOKEN",
"ENDPOINT",
"TUNNEL_ENDPOINT",
"PROJECT_NAME",
"SCHEMA_NAME",
"TABLE_NAME",
"QUOTA_NAME",
"COMPRESSION_CODE",
"COLLECT_STATISTICS_ON_CREATE",
};
common::ObString access_type_;
common::ObString access_id_;
common::ObString access_key_;
common::ObString sts_token_;
common::ObString endpoint_;
common::ObString tunnel_endpoint_;
common::ObString project_;
common::ObString schema_;
common::ObString table_;
common::ObString quota_;
common::ObString compression_code_;
bool collect_statistics_on_create_;
common::ObArenaAllocator arena_alloc_;
int64_t to_json_kv_string(char* buf, const int64_t buf_len) const;
int load_from_json_data(json::Pair *&node, common::ObIAllocator &allocator);
TO_STRING_KV(K_(access_type), K_(access_id), K_(access_key), K_(sts_token), K_(endpoint), K_(project), K_(schema), K_(table), K_(quota), K_(compression_code));
TO_STRING_KV(K_(access_type), K_(access_id), K_(access_key), K_(sts_token), K_(endpoint), K_(tunnel_endpoint), K_(project), K_(schema), K_(table), K_(quota), K_(compression_code), K_(collect_statistics_on_create));
OB_UNIS_VERSION(1);
};

View File

@ -703,7 +703,23 @@ int ObCreateTableExecutor::execute(ObExecContext &ctx, ObCreateTableStmt &stmt)
//auto refresh after create external table
ObArray<uint64_t> updated_part_ids; //not used
bool has_partition_changed = false; //not used
OZ (ObExternalTableFileManager::get_instance().update_inner_table_file_list(ctx, tenant_id, res.table_id_, file_urls, file_sizes, updated_part_ids, has_partition_changed));
const uint64_t part_id = -1;
bool collect_statistics_on_create = false;
bool is_odps_external_table = false;
if (OB_FAIL(ObSQLUtils::is_odps_external_table(&table_schema, is_odps_external_table))) {
LOG_WARN("failed to check is odps external table or not", K(ret));
} else if (is_odps_external_table) {
sql::ObExternalFileFormat ex_format;
ex_format.format_type_ = sql::ObExternalFileFormat::ODPS_FORMAT;
ObArenaAllocator allocator("CreateTableExec");
if (OB_FAIL(ex_format.load_from_string(table_schema.get_external_properties(), allocator))) {
LOG_WARN("failed to load from string", K(ret));
} else {
collect_statistics_on_create = ex_format.odps_format_.collect_statistics_on_create_;
}
}
OZ (ObExternalTableFileManager::get_instance().update_inner_table_file_list(ctx, tenant_id, res.table_id_, file_urls, file_sizes, updated_part_ids, has_partition_changed,
part_id, collect_statistics_on_create));
}
} else {
if (table_schema.is_external_table()) {

View File

@ -0,0 +1,115 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SQL_ENG
#include "sql/engine/table/ob_odps_table_row_iter.h"
#include "sql/engine/expr/ob_expr_calc_odps_size.h"
namespace oceanbase
{
using namespace oceanbase::common;
namespace sql
{
// Builds the operator for T_FUN_SYS_CALC_ODPS_SIZE (registered under N_CALC_ODPS_SIZE).
// The operator is declared with two parameters and is marked as not valid
// for generated columns.
ObExprCalcOdpsSize::ObExprCalcOdpsSize(ObIAllocator &alloc)
    : ObFuncExprOperator(alloc,
                         T_FUN_SYS_CALC_ODPS_SIZE,
                         N_CALC_ODPS_SIZE,
                         2,
                         NOT_VALID_FOR_GENERATED_COL,
                         NOT_ROW_DIMENSION)
{}
/**
 * Deduce the result type and the calculation types of both arguments.
 *
 * @param type      [out] result type; set to int on success
 * @param type1     [in/out] first argument; its calc type is set to varchar
 * @param type2     [in/out] second argument; its calc type is set to int
 * @param type_ctx  [in/out] type deduction context; CM_WARN_ON_FAIL is added to its cast mode
 * @return OB_SUCCESS, or OB_ERR_INVALID_TYPE_FOR_OP when the operator has a row
 *         dimension or the first argument has type ObMaxType
 */
int ObExprCalcOdpsSize::calc_result_type2(ObExprResType &type,
                                          ObExprResType &type1,
                                          ObExprResType &type2,
                                          ObExprTypeCtx &type_ctx) const
{
  int ret = OB_SUCCESS;
  if (NOT_ROW_DIMENSION != row_dimension_) {
    ret = OB_ERR_INVALID_TYPE_FOR_OP; // arithmetic not support row
  } else {
    if (ObMaxType != type1.get_type()) {
      type.set_int();
      type1.set_calc_type(ObVarcharType);
      type2.set_calc_type(ObIntType);
      type_ctx.set_cast_mode(type_ctx.get_cast_mode() | CM_WARN_ON_FAIL);
    } else {
      ret = OB_ERR_INVALID_TYPE_FOR_OP;
    }
    // The result flags are computed on both paths of the non-row branch,
    // including the one that reports an invalid first-argument type.
    ObExprOperator::calc_result_flag2(type, type1, type2);
  }
  return ret;
}
int ObExprCalcOdpsSize::calc_odps_size(const ObExpr &expr, ObEvalCtx &ctx, ObDatum &res_datum)
{
int ret = OB_SUCCESS;
ObDatum *file_url_datum = NULL;
ObDatum *table_id_datum = NULL;
int ret_file_url = expr.args_[0]->eval(ctx, file_url_datum);
int ret_table_id = expr.args_[1]->eval(ctx, table_id_datum);
if (OB_SUCCESS == ret_file_url && OB_SUCCESS == ret_table_id) {
if (file_url_datum->is_null() || table_id_datum->is_null()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null datum", K(file_url_datum->is_null()), K(table_id_datum->is_null()), K(ret));
} else {
const ObString file_url = file_url_datum->get_string();
const int64_t table_id = table_id_datum->get_int();
const uint64_t tenant_id = ctx.exec_ctx_.get_my_session()->get_effective_tenant_id();
ObSchemaGetterGuard *schema_guard = ctx.exec_ctx_.get_sql_ctx()->schema_guard_;
const ObTableSchema *table_schema = NULL;
int64_t row_count = 0;
if (OB_ISNULL(schema_guard)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null ptr", K(ret));
} else if (OB_FAIL(schema_guard->get_table_schema(tenant_id, table_id, table_schema))) {
LOG_WARN("failed to get table schema", K(ret), K(tenant_id), K(table_id));
} else if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null ptr", K(ret));
#ifdef OB_BUILD_CPP_ODPS
} else if (OB_FAIL(ObOdpsPartitionDownloaderMgr::fetch_row_count(file_url,
table_schema->get_external_properties(),
row_count))) {
if (ret == OB_ODPS_ERROR) {
ret = OB_SUCCESS;
row_count = -1;
} else {
LOG_WARN("failed to fetch row count", K(ret), K(file_url), K(table_schema->get_external_properties()));
}
#else
} else {
row_count = -1;
#endif
}
if (OB_SUCC(ret)) {
res_datum.set_int(row_count);
}
}
} else {
ret = (OB_SUCCESS == ret_file_url) ? ret_table_id : ret_file_url;
}
return ret;
}
/**
 * Code generation hook: binds calc_odps_size as the evaluation function of
 * the runtime expression, after checking that the raw expression has
 * exactly two parameters.
 *
 * @param expr_cg_ctx  unused
 * @param raw_expr     resolved expression whose parameter count is validated
 * @param rt_expr      [out] runtime expression receiving eval_func_
 * @return OB_SUCCESS, or the error set by the parameter-count check
 */
int ObExprCalcOdpsSize::cg_expr(ObExprCGCtx &expr_cg_ctx, const ObRawExpr &raw_expr,
                                ObExpr &rt_expr) const
{
  UNUSED(expr_cg_ctx);
  int ret = OB_SUCCESS;
  CK (2 == raw_expr.get_param_count());
  if (OB_SUCC(ret)) {
    rt_expr.eval_func_ = calc_odps_size;
  }
  return ret;
}
} // namespace sql
} // namespace oceanbase

View File

@ -0,0 +1,41 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_SQL_ENGINE_EXPR_CALC_ODPS_SIZE_
#define OCEANBASE_SQL_ENGINE_EXPR_CALC_ODPS_SIZE_
#include "sql/engine/expr/ob_expr_operator.h"
namespace oceanbase
{
namespace sql
{
class ObExprCalcOdpsSize : public ObFuncExprOperator
{
public:
explicit ObExprCalcOdpsSize(common::ObIAllocator &alloc);
virtual ~ObExprCalcOdpsSize() {};
virtual int calc_result_type2(ObExprResType &type,
ObExprResType &type1,
ObExprResType &type2,
common::ObExprTypeCtx &type_ctx) const;
static int calc_odps_size(const ObExpr &expr, ObEvalCtx &ctx, ObDatum &res_datum);
virtual int cg_expr(ObExprCGCtx &expr_cg_ctx, const ObRawExpr &raw_expr,
ObExpr &rt_expr) const override;
private:
DISALLOW_COPY_AND_ASSIGN(ObExprCalcOdpsSize) const;
};
} // namespace sql
} // namespace oceanbase
#endif // OCEANBASE_SQL_ENGINE_EXPR_CALC_ODPS_SIZE_

View File

@ -420,6 +420,7 @@
#include "ob_expr_array_distinct.h"
#include "ob_expr_array_remove.h"
#include "ob_expr_array_map.h"
#include "ob_expr_calc_odps_size.h"
#include "ob_expr_get_mysql_routine_parameter_type_str.h"
#include "ob_expr_priv_st_geohash.h"
#include "ob_expr_priv_st_makepoint.h"
@ -1315,6 +1316,7 @@ static ObExpr::EvalFunc g_expr_eval_functions[] = {
NULL, // ObExprCalcSubPartitionName::get_sub_partition_name, /* 788 */
NULL, // ObExprCalcPartitionIdx::get_partition_idx, /* 789 */
NULL, // ObExprCalcSubPartitionIdx::get_sub_partition_idx, /* 790 */
ObExprCalcOdpsSize::calc_odps_size, /* 791 */
};
static ObExpr::EvalBatchFunc g_expr_eval_batch_functions[] = {

View File

@ -484,6 +484,7 @@
#include "sql/engine/expr/ob_expr_array_distinct.h"
#include "sql/engine/expr/ob_expr_array_remove.h"
#include "sql/engine/expr/ob_expr_array_map.h"
#include "sql/engine/expr/ob_expr_calc_odps_size.h"
#include "sql/engine/expr/ob_expr_get_mysql_routine_parameter_type_str.h"
#include "sql/engine/expr/ob_expr_priv_st_geohash.h"
#include "sql/engine/expr/ob_expr_priv_st_makepoint.h"
@ -1197,6 +1198,7 @@ void ObExprOperatorFactory::register_expr_operators()
REG_OP(ObExprArrayRemove);
REG_OP(ObExprArrayMap);
REG_OP(ObExprGetMySQLRoutineParameterTypeStr);
REG_OP(ObExprCalcOdpsSize);
}();
// register Oracle system functions
REG_OP_ORCL(ObExprSysConnectByPath);
@ -1529,6 +1531,7 @@ void ObExprOperatorFactory::register_expr_operators()
REG_OP_ORCL(ObExprGetPath);
REG_OP_ORCL(ObExprDecodeTraceId);
REG_OP_ORCL(ObExprSplitPart);
REG_OP_ORCL(ObExprCalcOdpsSize);
}
bool ObExprOperatorFactory::is_expr_op_type_valid(ObExprOperatorType type)

View File

@ -194,12 +194,8 @@ int ObPxSqcMeta::assign(const ObPxSqcMeta &other)
for (int i = 0; OB_SUCC(ret) && i < other.access_external_table_files_.count(); i++) {
const ObExternalFileInfo &other_file = other.access_external_table_files_.at(i);
ObExternalFileInfo temp_file;
temp_file.file_id_ = other_file.file_id_;
temp_file.part_id_ = other_file.part_id_;
temp_file.file_addr_ = other_file.file_addr_;
temp_file.file_size_ = other_file.file_size_;
if (OB_FAIL(ob_write_string(allocator_, other_file.file_url_, temp_file.file_url_))) {
LOG_WARN("fail to write string", K(ret));
if (OB_FAIL(temp_file.deep_copy(allocator_, other_file))) {
LOG_WARN("fail to deep copy ObExternalFileInfo", K(ret));
} else if (OB_FAIL(access_external_table_files_.push_back(temp_file))) {
LOG_WARN("fail to push back", K(ret));
}

View File

@ -638,14 +638,14 @@ int ObGranulePump::add_new_gi_task(ObGranulePumpArgs &args)
// if (!(args.asc_order() || args.desc_order() || ObGITaskSet::GI_RANDOM_NONE != random_type)) {
// random_type = ObGITaskSet::GI_RANDOM_TASK;
// }
if (OB_FAIL(splitter.split_granule(args,
if (OB_FAIL(init_external_odps_table_downloader(args))) {
LOG_WARN("failed to init external odps table downloader", K(ret));
} else if (OB_FAIL(splitter.split_granule(args,
scan_ops,
gi_task_array_map_,
random_type,
partition_granule))) {
LOG_WARN("failed to prepare random gi task", K(ret), K(partition_granule));
} else if (OB_FAIL(init_external_odps_table_downloader(args))) {
LOG_WARN("failed to init external odps table downloader", K(ret));
}
}
return ret;
@ -655,10 +655,16 @@ int ObGranulePump::init_external_odps_table_downloader(ObGranulePumpArgs &args)
{
int ret = OB_SUCCESS;
const ObTableScanSpec *tsc = NULL;
sql::ObExternalFileFormat external_odps_format;
if (!args.external_table_files_.empty() &&
0 == args.external_table_files_.at(0).file_id_) { //file_id_ == 0 means it's a external odps table
ObIArray<const ObTableScanSpec *> &scan_ops = args.op_info_.get_scan_ops();
bool is_odps_external_table = false;
ObIArray<const ObTableScanSpec *> &scan_ops = args.op_info_.get_scan_ops();
if (scan_ops.count() == 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("empty scan_ops", K(ret));
} else if (OB_FAIL(ObSQLUtils::is_odps_external_table(scan_ops.at(0)->tsc_ctdef_.scan_ctdef_.external_file_format_str_.str_,
is_odps_external_table))) {
LOG_WARN("failed to check is odps external table or not", K(ret));
} else if (!args.external_table_files_.empty() &&
is_odps_external_table) {
if (scan_ops.empty() || scan_ops.count() != gi_task_array_map_.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid scan ops and gi task array result", K(ret), K(scan_ops.count()), K(gi_task_array_map_.count()));
@ -666,12 +672,11 @@ int ObGranulePump::init_external_odps_table_downloader(ObGranulePumpArgs &args)
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null ptr", K(ret));
#ifdef OB_BUILD_CPP_ODPS
} else if (OB_FAIL(odps_partition_downloader_mgr_.init_downloader(args.external_table_files_,
tsc->tsc_ctdef_.scan_ctdef_.external_file_format_str_.str_))) {
} else if (OB_FAIL(odps_partition_downloader_mgr_.init_map(args.external_table_files_.count()))) {
LOG_WARN("init odps_partition_downloader_mgr_ failed", K(ret), K(args.external_table_files_.count()));
#endif
} else {
LOG_TRACE("succ to init odps table partition downloader", K(ret));
LOG_TRACE("succ to init odps table partition downloader", K(ret), K(is_odps_downloader_inited()));
#endif
}
}
return ret;
@ -721,10 +726,10 @@ int ObGranulePump::check_can_randomize(ObGranulePumpArgs &args, bool &can_random
void ObGranulePump::destroy()
{
gi_task_array_map_.reset();
pump_args_.reset();
#ifdef OB_BUILD_CPP_ODPS
odps_partition_downloader_mgr_.reset();
#endif
pump_args_.reset();
}
void ObGranulePump::reset_task_array()

View File

@ -568,6 +568,9 @@ public:
ret = odps_partition_downloader_mgr_.get_odps_downloader(part_id, downloader);
return ret;
}
// Returns the map exposed by odps_partition_downloader_mgr_ (by reference).
inline ObOdpsPartitionDownloaderMgr::OdpsMgrMap& get_odps_map()
{
  return odps_partition_downloader_mgr_.get_odps_map();
}
inline bool is_odps_downloader_inited() { return odps_partition_downloader_mgr_.is_download_mgr_inited(); }
ObOdpsPartitionDownloaderMgr &get_odps_mgr() { return odps_partition_downloader_mgr_; }
#endif

View File

@ -116,36 +116,28 @@ int ObGranuleUtil::split_granule_for_external_table(ObIAllocator &allocator,
ObExternalFileFormat::ODPS_FORMAT == external_file_format.format_type_) {
#ifdef OB_BUILD_CPP_ODPS
int64_t task_idx = 0;
LOG_TRACE("odps external table granule switch", K(ret), K(external_table_files.count()), K(external_table_files));
for (int64_t i = 0; OB_SUCC(ret) && i < external_table_files.count(); ++i) {
const ObExternalFileInfo& external_info = external_table_files.at(i);
ObNewRange new_range;
if (0 != external_info.file_id_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected file id", K(ret), K(i), K(external_info.file_id_));
} else {
// file_size_ is the total row cnt of odps table partition
uint64_t block_cnt = (external_info.file_size_ + sql::ObODPSTableRowIterator::ODPS_BLOCK_DOWNLOAD_SIZE - 1)
/ sql::ObODPSTableRowIterator::ODPS_BLOCK_DOWNLOAD_SIZE;
uint64_t start_idx = 0;
block_cnt = (0 == block_cnt ? 1 : block_cnt); // one odps table partition should have at least one task, even it's empty
for (int64_t j = 0; OB_SUCC(ret) && j < block_cnt; ++j) {
ObNewRange new_range;
int64_t start = start_idx + (sql::ObODPSTableRowIterator::ODPS_BLOCK_DOWNLOAD_SIZE * j);
int64_t end = sql::ObODPSTableRowIterator::ODPS_BLOCK_DOWNLOAD_SIZE;
if (OB_FAIL(ObExternalTableUtils::make_external_table_scan_range(external_info.file_url_,
int64_t file_row_count = external_info.row_count_ ? external_info.row_count_ : (external_info.file_size_ > 0 ? external_info.file_size_ : INT64_MAX);
int64_t file_start = external_info.row_count_ ? external_info.row_start_ : 0;
if (OB_FAIL(ObExternalTableUtils::make_external_table_scan_range(external_info.file_url_,
external_info.file_id_,
external_info.part_id_,
start_idx + (sql::ObODPSTableRowIterator::ODPS_BLOCK_DOWNLOAD_SIZE * j),
j == block_cnt -1 ?
INT64_MAX :
sql::ObODPSTableRowIterator::ODPS_BLOCK_DOWNLOAD_SIZE,
file_start,
file_row_count,
allocator,
new_range))) {
LOG_WARN("failed to make external table scan range", K(ret));
} else if ((OB_FAIL(granule_ranges.push_back(new_range)) ||
OB_FAIL(granule_idx.push_back(task_idx++)) ||
OB_FAIL(granule_tablets.push_back(tablets.at(0))))) {
LOG_WARN("fail to push back", K(ret));
}
LOG_WARN("failed to make external table scan range", K(ret));
} else if ((OB_FAIL(granule_ranges.push_back(new_range)) ||
OB_FAIL(granule_idx.push_back(task_idx++)) ||
OB_FAIL(granule_tablets.push_back(tablets.at(0))))) {
LOG_WARN("fail to push back", K(ret));
}
}
}

View File

@ -273,7 +273,7 @@ public:
bool range_independent);
static int split_granule_for_external_table(common::ObIAllocator &allocator,
static int split_granule_for_external_table(ObIAllocator &allocator,
const ObTableScanSpec *tsc,
const common::ObIArray<common::ObNewRange> &input_ranges,
const common::ObIArray<ObDASTabletLoc*> &tablet_array,

View File

@ -34,6 +34,9 @@
#include "share/external_table/ob_external_table_file_mgr.h"
#include "rpc/obrpc/ob_net_keepalive.h"
#include "share/external_table/ob_external_table_utils.h"
#ifdef OB_BUILD_CPP_ODPS
#include "sql/engine/table/ob_odps_table_row_iter.h"
#endif
using namespace oceanbase::common;
@ -291,8 +294,28 @@ int ObPXServerAddrUtil::get_external_table_loc(
}
}
} else {
int64_t expected_location_cnt = std::min(dfo.get_dop(), dfo.get_external_table_files().count());
if (1 == expected_location_cnt) {
bool is_odps_external_table = false;
ObSEArray<const ObTableScanSpec *, 2> scan_ops;
const ObTableScanSpec *scan_op = nullptr;
const ObOpSpec *root_op = NULL;
int64_t expected_location_cnt = 0;
dfo.get_root(root_op);
if (OB_ISNULL(root_op)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null ptr", K(ret));
} else if (OB_FAIL(ObTaskSpliter::find_scan_ops(scan_ops, *root_op))) {
LOG_WARN("failed to find scan_ops", K(ret), KP(root_op));
} else if (scan_ops.count() == 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("empty scan_ops", K(ret));
} else if (OB_FAIL(ObSQLUtils::is_odps_external_table(scan_ops.at(0)->tsc_ctdef_.scan_ctdef_.external_file_format_str_.str_,
is_odps_external_table))) {
LOG_WARN("failed to check is odps external table or not", K(ret));
} else if (FALSE_IT(expected_location_cnt = std::min(dfo.get_dop(),
((!ext_file_urls.empty() && is_odps_external_table) ?
all_locations.count() : dfo.get_external_table_files().count())))) {
} else if (1 == expected_location_cnt) {
if (OB_FAIL(target_locations.push_back(GCTX.self_addr()))) {
LOG_WARN("fail to push push back", K(ret));
}
@ -318,11 +341,13 @@ int ObPXServerAddrUtil::get_external_table_loc(
}
int ObPXServerAddrUtil::assign_external_files_to_sqc(
const ObIArray<ObExternalFileInfo> &files,
ObDfo &dfo,
bool is_file_on_disk,
ObIArray<ObPxSqcMeta *> &sqcs)
ObIArray<ObPxSqcMeta *> &sqcs,
int64_t parallel)
{
int ret = OB_SUCCESS;
const common::ObIArray<share::ObExternalFileInfo> &files = dfo.get_external_table_files();
if (is_file_on_disk) {
ObAddr pre_addr;
ObPxSqcMeta *target_sqc = NULL;
@ -346,19 +371,41 @@ int ObPXServerAddrUtil::assign_external_files_to_sqc(
}
}
} else {
ObArray<int64_t> file_assigned_sqc_ids;
OZ (ObExternalTableUtils::calc_assigned_files_to_sqcs(files, file_assigned_sqc_ids, sqcs.count()));
if (OB_SUCC(ret) && file_assigned_sqc_ids.count() != files.count()) {
bool is_odps_external_table = false;
ObSEArray<const ObTableScanSpec *, 2> scan_ops;
const ObTableScanSpec *scan_op = nullptr;
const ObOpSpec *root_op = NULL;
dfo.get_root(root_op);
if (OB_ISNULL(root_op)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid result of assigned sqc", K(file_assigned_sqc_ids.count()), K(files.count()));
}
for (int i = 0; OB_SUCC(ret) && i < file_assigned_sqc_ids.count(); i++) {
int64_t assign_sqc_idx = file_assigned_sqc_ids.at(i);
if (OB_UNLIKELY(assign_sqc_idx >= sqcs.count() || assign_sqc_idx < 0)) {
LOG_WARN("unexpected null ptr", K(ret));
} else if (OB_FAIL(ObTaskSpliter::find_scan_ops(scan_ops, *root_op))) {
LOG_WARN("failed to find scan_ops", K(ret), KP(root_op));
} else if (scan_ops.count() == 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("empty scan_ops", K(ret));
} else if (OB_FAIL(ObSQLUtils::is_odps_external_table(scan_ops.at(0)->tsc_ctdef_.scan_ctdef_.external_file_format_str_.str_,
is_odps_external_table))) {
LOG_WARN("failed to check is odps external table or not", K(ret));
} else if (is_odps_external_table) {
if (OB_FAIL(ObExternalTableUtils::assign_odps_file_to_sqcs(dfo, sqcs, parallel))) {
LOG_WARN("failed to assisn odps file to sqcs", K(files), K(ret));
}
} else {
ObArray<int64_t> file_assigned_sqc_ids;
OZ (ObExternalTableUtils::calc_assigned_files_to_sqcs(files, file_assigned_sqc_ids, sqcs.count()));
if (OB_SUCC(ret) && file_assigned_sqc_ids.count() != files.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected file idx", K(file_assigned_sqc_ids.at(i)));
} else {
OZ (sqcs.at(assign_sqc_idx)->get_access_external_table_files().push_back(files.at(i)));
LOG_WARN("invalid result of assigned sqc", K(file_assigned_sqc_ids.count()), K(files.count()));
}
for (int i = 0; OB_SUCC(ret) && i < file_assigned_sqc_ids.count(); i++) {
int64_t assign_sqc_idx = file_assigned_sqc_ids.at(i);
if (OB_UNLIKELY(assign_sqc_idx >= sqcs.count() || assign_sqc_idx < 0)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected file idx", K(file_assigned_sqc_ids.at(i)));
} else {
OZ (sqcs.at(assign_sqc_idx)->get_access_external_table_files().push_back(files.at(i)));
}
}
}
}
@ -648,8 +695,8 @@ int ObPXServerAddrUtil::build_dfo_sqc(ObExecContext &ctx,
}
if (OB_SUCC(ret) && !locations.empty()
&& (*locations.begin())->loc_meta_->is_external_table_) {
if (OB_FAIL(assign_external_files_to_sqc(dfo.get_external_table_files(),
(*locations.begin())->loc_meta_->is_external_files_on_disk_, sqcs))) {
if (OB_FAIL(assign_external_files_to_sqc(dfo,
(*locations.begin())->loc_meta_->is_external_files_on_disk_, sqcs, parallel))) {
LOG_WARN("fail to assign external files to sqc", K(ret));
}
}

View File

@ -280,9 +280,10 @@ private:
common::ObIArray<common::ObAddr> &dst_addrs);
static int sort_and_collect_local_file_distribution(common::ObIArray<share::ObExternalFileInfo> &files,
common::ObIArray<common::ObAddr> &dst_addrs);
static int assign_external_files_to_sqc(const common::ObIArray<share::ObExternalFileInfo> &files,
static int assign_external_files_to_sqc(ObDfo &dfo,
bool is_file_on_disk,
common::ObIArray<ObPxSqcMeta *> &sqcs);
common::ObIArray<ObPxSqcMeta *> &sqcs,
int64_t parallel);
private:
static int generate_dh_map_info(ObDfo &dfo);
DISALLOW_COPY_AND_ASSIGN(ObPXServerAddrUtil);

View File

@ -876,6 +876,8 @@ int ObExternalTableRowIterator::calc_file_partition_list_value(const int64_t par
share::schema::ObSchemaGetterGuard schema_guard;
const ObTableSchema *table_schema = NULL;
const ObPartition *partition = NULL;
ObExternalFileFormat::FormatType external_table_type;
bool is_odps_external_table = false;
if (OB_ISNULL(GCTX.schema_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error");
@ -888,7 +890,9 @@ int ObExternalTableRowIterator::calc_file_partition_list_value(const int64_t par
} else if (OB_ISNULL(table_schema)) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("table not exist", K(scan_param_->index_id_), K(scan_param_->tenant_id_));
} else if (table_schema->is_partitioned_table() && (table_schema->is_user_specified_partition_for_external_table() || table_schema->is_odps_external_table())) {
} else if (OB_FAIL(ObSQLUtils::is_odps_external_table(table_schema, is_odps_external_table))) {
LOG_WARN("failed to check is odps external table or not", K(ret));
} else if (table_schema->is_partitioned_table() && (table_schema->is_user_specified_partition_for_external_table() || is_odps_external_table)) {
if (OB_FAIL(table_schema->get_partition_by_part_id(part_id, CHECK_PARTITION_MODE_NORMAL, partition))) {
LOG_WARN("get partition failed", K(ret), K(part_id));
} else if (OB_ISNULL(partition) || OB_UNLIKELY(partition->get_list_row_values().count() != 1)

View File

@ -22,40 +22,18 @@ namespace sql {
int ObODPSTableRowIterator::OdpsPartition::reset()
{
int ret = OB_SUCCESS;
try {
download_handle_->Complete();
download_handle_ = NULL;
record_count_ = -1;
name_ = "";
download_id_ = "";
} catch (apsara::odps::sdk::OdpsTunnelException& ex) {
if (OB_SUCC(ret)) {
ret = OB_ODPS_ERROR;
LOG_WARN("odps exception occured when calling Complete method", K(ret), K(ex.what()));
LOG_USER_ERROR(OB_ODPS_ERROR, ex.what());
}
} catch (const std::exception &ex) {
if (OB_SUCC(ret)) {
ret = OB_ODPS_ERROR;
LOG_WARN("odps exception occured when calling Complete method", K(ret), K(ex.what()), KP(this));
LOG_USER_ERROR(OB_ODPS_ERROR, ex.what());
}
} catch (...) {
if (OB_SUCC(ret)) {
ret = OB_ODPS_ERROR;
LOG_WARN("odps exception occured when calling Complete method", K(ret));
}
}
record_count_ = -1;
name_.clear();
return ret;
}
int ObODPSTableRowIterator::init_tunnel(const sql::ObODPSGeneralFormat &odps_format)
int ObODPSTableRowIterator::init_tunnel(const sql::ObODPSGeneralFormat &odps_format, bool need_decrypt)
{
int ret = OB_SUCCESS;
try {
if (OB_FAIL(odps_format_.deep_copy(odps_format))) {
LOG_WARN("failed to deep copy odps format", K(ret));
} else if (OB_FAIL(odps_format_.decrypt())) {
} else if (need_decrypt && OB_FAIL(odps_format_.decrypt())) {
LOG_WARN("failed to decrypt odps format", K(ret));
} else {
LOG_TRACE("init tunnel format", K(ret));
@ -89,6 +67,10 @@ int ObODPSTableRowIterator::init_tunnel(const sql::ObODPSGeneralFormat &odps_for
}
conf_.SetAccount(account_);
conf_.SetEndpoint(std::string(odps_format_.endpoint_.ptr(), odps_format_.endpoint_.length()));
if (!odps_format_.tunnel_endpoint_.empty()) {
LOG_TRACE("set tunnel endpoint", K(ret), K(odps_format_.tunnel_endpoint_));
conf_.SetTunnelEndpoint(std::string(odps_format_.tunnel_endpoint_.ptr(), odps_format_.tunnel_endpoint_.length()));
}
conf_.SetUserAgent("OB_ACCESS_ODPS");
conf_.SetTunnelQuotaName(std::string(odps_format_.quota_.ptr(), odps_format_.quota_.length()));
if (0 == odps_format_.compression_code_.case_compare("zlib")) {
@ -140,7 +122,7 @@ int ObODPSTableRowIterator::init_tunnel(const sql::ObODPSGeneralFormat &odps_for
return ret;
}
int ObODPSTableRowIterator::create_downloader(ObString &part_spec, apsara::odps::sdk::IDownloadPtr &downloader)
int ObODPSTableRowIterator::create_downloader(const ObString &part_spec, apsara::odps::sdk::IDownloadPtr &downloader)
{
int ret = OB_SUCCESS;
try {
@ -207,6 +189,7 @@ int ObODPSTableRowIterator::next_task()
{
int ret = OB_SUCCESS;
ObEvalCtx &eval_ctx = scan_param_->op_->get_eval_ctx();
LOG_TRACE("get a new task start", K(ret), K(batch_size_), K(state_));
int64_t task_idx = state_.task_idx_;
int64_t start = 0;
int64_t step = 0;
@ -225,39 +208,80 @@ int ObODPSTableRowIterator::next_task()
try {
const ObString &part_spec = scan_param_->key_ranges_.at(task_idx).get_start_key().get_obj_ptr()[ObExternalTableUtils::FILE_URL].get_string();
int64_t part_id = scan_param_->key_ranges_.at(task_idx).get_start_key().get_obj_ptr()[ObExternalTableUtils::PARTITION_ID].get_int();
std::string project(odps_format_.project_.ptr(), odps_format_.project_.length());
std::string table(odps_format_.table_.ptr(), odps_format_.table_.length());
std::string std_part_spec(part_spec.ptr(), part_spec.length());
std::string download_id("");
std::string schema(odps_format_.schema_.ptr(), odps_format_.schema_.length());
std::vector<std::string> column_names;
const ExprFixedArray &file_column_exprs = *(scan_param_->ext_file_column_exprs_);
for (int64_t column_idx = 0; column_idx < target_column_id_list_.count(); ++column_idx) {
if (file_column_exprs.at(column_idx)->type_ == T_PSEUDO_EXTERNAL_FILE_COL) {
column_names.emplace_back(column_list_.at(target_column_id_list_.at(column_idx)).name_);
}
}
if (part_spec.compare("#######DUMMY_FILE#######") == 0) {
ret = OB_ITER_END;
} else if (OB_ISNULL(sqc) &&
OB_ISNULL((state_.download_handle_ = tunnel_.CreateDownload(project,
table,
std_part_spec,
download_id,
schema)).get())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexcepted null ptr", K(ret));
} else if (OB_NOT_NULL(sqc) &&
!sqc->get_sqc_ctx().gi_pump_.is_odps_downloader_inited() &&
OB_ISNULL((state_.download_handle_ = tunnel_.CreateDownload(project,
table,
std_part_spec,
download_id,
schema)).get())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexcepted null ptr", K(ret));
} else if (OB_NOT_NULL(sqc) &&
sqc->get_sqc_ctx().gi_pump_.is_odps_downloader_inited() &&
OB_FAIL(sqc->get_sqc_ctx().gi_pump_.get_odps_downloader(part_id, state_.download_handle_))) {
LOG_WARN("failed to get odps downloader", K(ret), K(part_id));
LOG_WARN("empty file", K(ret));
} else {
if (OB_ISNULL(sqc) || !sqc->get_sqc_ctx().gi_pump_.is_odps_downloader_inited()) {
std::string project(odps_format_.project_.ptr(), odps_format_.project_.length());
std::string table(odps_format_.table_.ptr(), odps_format_.table_.length());
std::string std_part_spec(part_spec.ptr(), part_spec.length());
std::string download_id("");
std::string schema(odps_format_.schema_.ptr(), odps_format_.schema_.length());
state_.download_handle_ = tunnel_.CreateDownload(project,
table,
std_part_spec,
download_id,
schema);
state_.is_from_gi_pump_ = false;
LOG_TRACE("succ to create downloader handle without GI", K(ret), K(part_id), KP(sqc), K(state_.is_from_gi_pump_));
} else {
ObOdpsPartitionDownloaderMgr::OdpsMgrMap& odps_map = sqc->get_sqc_ctx().gi_pump_.get_odps_map();
state_.is_from_gi_pump_ = true;
if (OB_FAIL(sqc->get_sqc_ctx().gi_pump_.get_odps_downloader(part_id, state_.download_handle_))) {
if (OB_HASH_NOT_EXIST == ret) {
ret = OB_SUCCESS;
ObOdpsPartitionDownloaderMgr::OdpsPartitionDownloader *temp_downloader = NULL;
if (sqc->get_sqc_ctx().gi_pump_.get_pump_args().empty()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected empty gi pump args", K(ret));
} else if (OB_ISNULL(temp_downloader = static_cast<ObOdpsPartitionDownloaderMgr::OdpsPartitionDownloader *>(
sqc->get_sqc_ctx().gi_pump_.get_odps_mgr().get_allocator().alloc(sizeof(ObOdpsPartitionDownloaderMgr::OdpsPartitionDownloader))))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate memory", K(ret), K(sizeof(ObOdpsPartitionDownloaderMgr::OdpsPartitionDownloader)));
} else if (FALSE_IT(new(temp_downloader)ObOdpsPartitionDownloaderMgr::OdpsPartitionDownloader())) {
} else if (OB_FAIL(temp_downloader->odps_driver_.init_tunnel(odps_format_, false))) {
LOG_WARN("failed to init tunnel", K(ret), K(part_id));
} else if (OB_FAIL(temp_downloader->odps_driver_.create_downloader(part_spec, temp_downloader->odps_partition_downloader_))) {
LOG_WARN("failed create odps partition downloader", K(ret), K(part_id));
} else if (OB_FAIL(odps_map.set_refactored(part_id, reinterpret_cast<int64_t>(temp_downloader), 0/*flag*/, 0/*broadcast*/, 0/*overwrite_key*/))) {
if (OB_HASH_EXIST == ret) {
ret = OB_SUCCESS;
if (OB_FAIL(sqc->get_sqc_ctx().gi_pump_.get_odps_downloader(part_id, state_.download_handle_))) {
LOG_WARN("failed to get from odps_map", K(part_id), K(ret));
} else {
LOG_TRACE("succ to get downloader handle from GI", K(ret), K(part_id), K(state_.is_from_gi_pump_));
}
} else {
LOG_WARN("fail to set refactored", K(ret));
}
temp_downloader->reset();
} else {
state_.download_handle_ = temp_downloader->odps_partition_downloader_;
LOG_TRACE("succ to create downloader handle and set it to GI", K(ret), K(part_id), K(state_.is_from_gi_pump_));
}
} else {
LOG_WARN("failed to get from odps_map", K(part_id), K(ret));
}
} else {
LOG_TRACE("succ to get downloader handle from GI", K(ret), K(part_id), K(state_.is_from_gi_pump_));
}
}
}
if (OB_FAIL(ret)) {
//do nothing
} else if (OB_ISNULL(state_.download_handle_.get())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexcepted null ptr", K(ret));
} else if (OB_ISNULL((state_.record_reader_handle_ = state_.download_handle_->OpenReader(start,
LOG_WARN("unexcepted null ptr", K(ret), KP(sqc), K(state_.is_from_gi_pump_));
} else if (column_names.size() &&
OB_ISNULL((state_.record_reader_handle_ = state_.download_handle_->OpenReader(start,
step,
column_names,
true)).get())) {
@ -266,17 +290,24 @@ int ObODPSTableRowIterator::next_task()
} else if (OB_FAIL(calc_file_partition_list_value(part_id, arena_alloc_, state_.part_list_val_))) {
LOG_WARN("failed to calc parttion list value", K(part_id), K(ret));
} else {
state_.task_idx_ = task_idx;
state_.part_id_ = part_id;
state_.start_ = start;
state_.step_ = step;
state_.count_ = 0;
state_.is_from_gi_pump_ = OB_NOT_NULL(sqc) && sqc->get_sqc_ctx().gi_pump_.is_odps_downloader_inited();
state_.download_id_ = state_.download_handle_->GetDownloadId();
state_.part_spec_ = std_part_spec;
// what if error occur after this line, how to close state_.record_reader_handle_?
LOG_TRACE("get a new task", K(ret), K(batch_size_), K(state_));
if (OB_SUCC(ret) && -1 == batch_size_) { // exec once only
int64_t real_time_partition_row_count = state_.download_handle_->GetRecordCount();
if (start >= real_time_partition_row_count) {
ret = OB_ITER_END;
LOG_WARN("odps iter end", K(ret), K(part_id), K(state_.start_), K(real_time_partition_row_count));
} else if (INT64_MAX == step || start + step > real_time_partition_row_count) {
step = real_time_partition_row_count - start;
}
if (OB_SUCC(ret)) {
state_.task_idx_ = task_idx;
state_.part_id_ = part_id;
state_.start_ = start;
state_.step_ = step;
state_.count_ = 0;
state_.download_id_ = state_.download_handle_->GetDownloadId();
// what if error occur after this line, how to close state_.record_reader_handle_?
LOG_TRACE("get a new task", K(ret), K(batch_size_), K(state_), K(real_time_partition_row_count), K(column_names.size()));
}
if (OB_SUCC(ret) && -1 == batch_size_ && column_names.size()) { // exec once only
batch_size_ = eval_ctx.max_batch_size_;
if (0 == batch_size_) {
// even state_.record_reader_handle_ was destroyed, record_/records_ is still valid.
@ -684,6 +715,7 @@ int ObODPSTableRowIterator::pull_partition_info()
partition_list_.reset();
std::vector<std::string> part_specs;
try {
LOG_TRACE("get partition names start", K(ret));
if (OB_ISNULL(table_handle_.get())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexcepted null ptr", K(ret));
@ -691,6 +723,7 @@ int ObODPSTableRowIterator::pull_partition_info()
table_handle_->GetPartitionNames(part_specs);
is_part_table_ = true;
}
LOG_TRACE("get partition names end", K(ret), K(is_part_table_));
} catch (apsara::odps::sdk::OdpsException& ex) {
std::string ex_msg = ex.what();
if (std::string::npos != ex_msg.find("ODPS-0110031")) { // ODPS-0110031 means table is not a partitional table
@ -714,30 +747,20 @@ int ObODPSTableRowIterator::pull_partition_info()
LOG_WARN("odps exception occured when calling GetPartitionNames method", K(ret));
}
}
if (OB_SUCC(ret) && !is_part_table_) {
part_specs.push_back("");
}
try {
std::string project(odps_format_.project_.ptr(), odps_format_.project_.length());
std::string table(odps_format_.table_.ptr(), odps_format_.table_.length());
std::string schema(odps_format_.schema_.ptr(), odps_format_.schema_.length());
for (std::vector<std::string>::iterator part_spec = part_specs.begin(); OB_SUCC(ret) && part_spec != part_specs.end(); part_spec++) {
std::string download_id("");
apsara::odps::sdk::IDownloadPtr download_handle = NULL;
if (is_part_table_) {
for (std::vector<std::string>::iterator part_spec = part_specs.begin(); OB_SUCC(ret) && part_spec != part_specs.end(); part_spec++) {
int64_t record_count = -1;
//apsara::odps::sdk::IODPSPartitionPtr partition = table_handle_->GetPartition(*part_spec);
//record_count = partition->GetPartitionSize();
if (OB_FAIL(partition_list_.push_back(OdpsPartition(*part_spec, record_count)))){
LOG_WARN("failed to push back partition_list_", K(ret));
}
}
} else {
int64_t record_count = -1;
if (OB_ISNULL((download_handle = tunnel_.CreateDownload(project,
table,
*part_spec,
download_id,
schema)).get())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexcepted null ptr", K(ret));
} else if (FALSE_IT(download_id = download_handle->GetDownloadId())) {
} else if (FALSE_IT(record_count = download_handle->GetRecordCount())) {
} else if (OB_FAIL(partition_list_.push_back(OdpsPartition(*part_spec,
download_handle,
download_id,
record_count)))){
//record_count = table_handle_->GetSize();
if (OB_FAIL(partition_list_.push_back(OdpsPartition("", record_count)))){
LOG_WARN("failed to push back partition_list_", K(ret));
}
}
@ -802,6 +825,30 @@ int ObODPSTableRowIterator::pull_column() {
return ret;
}
// Fills the first `returned_row_cnt` batch datums of `expr` with the partition
// list value of the task currently held in state_.
//
// Only expressions whose type_ is T_PSEUDO_PARTITION_LIST_COL are touched; any
// other expression is left as is and OB_SUCCESS is returned.
//
// @param expr              expression whose batch datums are written
// @param returned_row_cnt  number of rows (datums) to fill
// @return OB_SUCCESS on success; OB_ERR_UNEXPECTED when the cell index derived
//         from expr.extra_ falls outside state_.part_list_val_, or when a
//         target datum has no buffer; otherwise the error from from_obj().
int ObODPSTableRowIterator::fill_partition_list_data(ObExpr &expr, int64_t returned_row_cnt) {
  int ret = OB_SUCCESS;
  ObEvalCtx &ctx = scan_param_->op_->get_eval_ctx();
  ObDatum *datums = expr.locate_batch_datums(ctx);
  if (expr.type_ == T_PSEUDO_PARTITION_LIST_COL) {
    // The cell index does not depend on the row, so compute it once.
    const int64_t loc_idx = expr.extra_ - 1;
    for (int64_t row_idx = 0; OB_SUCC(ret) && row_idx < returned_row_cnt; ++row_idx) {
      // The range check stays inside the loop on purpose: with zero rows the
      // index is never validated, exactly as before.
      if (OB_UNLIKELY(loc_idx < 0 || loc_idx >= state_.part_list_val_.get_count())) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("unexcepted loc_idx", K(ret), K(loc_idx), K(state_.part_list_val_.get_count()), KP(&state_.part_list_val_));
      } else if (state_.part_list_val_.get_cell(loc_idx).is_null()) {
        datums[row_idx].set_null();
      } else {
        // from_obj() writes into the datum's buffer, so it must exist.
        CK (OB_NOT_NULL(datums[row_idx].ptr_));
        OZ (datums[row_idx].from_obj(state_.part_list_val_.get_cell(loc_idx)));
      }
    }
  }
  return ret;
}
void ObODPSTableRowIterator::reset()
{
state_.reuse(); // reset state_ to initial values for rescan
@ -813,19 +860,20 @@ int ObODPSTableRowIterator::StateValues::reuse()
try {
if (-1 == task_idx_) {
// do nothing
} else if (OB_ISNULL(record_reader_handle_.get())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null ptr", K(ret), K(lbt()));
} else if (OB_ISNULL(download_handle_.get())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null ptr", K(ret), K(lbt()));
} else {
record_reader_handle_->Close();
record_reader_handle_.reset();
if (!is_from_gi_pump_) {
download_handle_->Complete();
if (OB_NOT_NULL(record_reader_handle_.get())) {
record_reader_handle_->Close();
record_reader_handle_.reset();
}
if (OB_ISNULL(download_handle_.get())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null ptr", K(ret), K(lbt()));
} else {
if (!is_from_gi_pump_) {
download_handle_->Complete();
}
download_handle_.reset();
}
download_handle_.reset();
}
} catch (const apsara::odps::sdk::OdpsTunnelException& ex) {
if (OB_SUCC(ret)) {
@ -850,21 +898,86 @@ int ObODPSTableRowIterator::StateValues::reuse()
start_ = 0;
step_ = 0;
count_ = 0;
part_spec_.clear();
download_id_.clear();
part_list_val_.reset();
is_from_gi_pump_ = false;
return ret;
}
// Re-opens the record reader of the current task so that reading can resume
// after a failed or empty read.
//
// The existing reader (if any) is closed and dropped, then a new reader is
// opened on state_.download_handle_ starting at state_.start_ + state_.count_
// and covering the remaining state_.step_ - state_.count_ records. No reader is
// opened when the task projects no T_PSEUDO_EXTERNAL_FILE_COL column.
//
// @return OB_SUCCESS on success; OB_ERR_UNEXPECTED when the download handle is
//         missing or OpenReader() yields a null reader; OB_ODPS_ERROR when the
//         ODPS SDK throws.
int ObODPSTableRowIterator::retry_read_task()
{
  int ret = OB_SUCCESS;
  try {
    LOG_TRACE("before retry read task", K(ret), K(state_), K(total_count_));
    if (OB_NOT_NULL(state_.record_reader_handle_.get())) {
      state_.record_reader_handle_->Close();
      state_.record_reader_handle_.reset();
    }
    if (OB_ISNULL(state_.download_handle_.get())) {
      // Dereferencing a null handle below would be undefined behaviour that the
      // catch blocks cannot intercept, so fail explicitly instead.
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("unexpected null download handle when retrying read task", K(ret), K(state_));
    } else {
      // Collect the names of the columns that are really read from ODPS.
      std::vector<std::string> column_names;
      const ExprFixedArray &file_column_exprs = *(scan_param_->ext_file_column_exprs_);
      for (int64_t column_idx = 0; column_idx < target_column_id_list_.count(); ++column_idx) {
        if (file_column_exprs.at(column_idx)->type_ == T_PSEUDO_EXTERNAL_FILE_COL) {
          column_names.emplace_back(column_list_.at(target_column_id_list_.at(column_idx)).name_);
        }
      }
      if (column_names.size() &&
          OB_ISNULL((state_.record_reader_handle_ = state_.download_handle_->OpenReader(state_.start_ + state_.count_,
                                                                                       state_.step_ - state_.count_,
                                                                                       column_names,
                                                                                       true)).get())) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("unexcepted null ptr", K(ret));
      } else {
        state_.download_id_ = state_.download_handle_->GetDownloadId();
        LOG_TRACE("retry odps task success", K(ret), K(state_), K(total_count_));
      }
    }
  } catch (apsara::odps::sdk::OdpsException& ex) {
    if (OB_SUCC(ret)) {
      ret = OB_ODPS_ERROR;
      LOG_WARN("odps exception occured when calling odps api", K(ret), K(ex.what()));
      LOG_USER_ERROR(OB_ODPS_ERROR, ex.what());
    }
  } catch (const std::exception &ex) {
    if (OB_SUCC(ret)) {
      ret = OB_ODPS_ERROR;
      LOG_WARN("odps exception occured when calling odps api", K(ret), K(ex.what()));
      LOG_USER_ERROR(OB_ODPS_ERROR, ex.what());
    }
  } catch (...) {
    if (OB_SUCC(ret)) {
      ret = OB_ODPS_ERROR;
      LOG_WARN("odps exception occured when calling odps api", K(ret));
    }
  }
  return ret;
}
int ObODPSTableRowIterator::get_next_rows(int64_t &count, int64_t capacity)
{
int ret = 0;
int ret = OB_SUCCESS;
ObMallocHookAttrGuard guard(mem_attr_);
int64_t returned_row_cnt = 0;
ObEvalCtx &ctx = scan_param_->op_->get_eval_ctx();
const ExprFixedArray &file_column_exprs = *(scan_param_->ext_file_column_exprs_);
if (state_.count_ >= state_.step_ && OB_FAIL(next_task())) {
if (!file_column_exprs.count() ||
OB_ISNULL(state_.record_reader_handle_.get())) {
count = std::min(capacity, state_.step_ - state_.count_);
total_count_ += count;
state_.count_ += count;
for (int64_t column_idx = 0; OB_SUCC(ret) && column_idx < target_column_id_list_.count(); ++column_idx) {
ObExpr &expr = *file_column_exprs.at(column_idx);
if (OB_FAIL(fill_partition_list_data(expr, count))) {
LOG_WARN("failed to fill partition list data", K(ret), K(file_column_exprs.count()));
}
}
if (OB_SUCC(ret) &&
state_.count_ >= state_.step_ &&
OB_FAIL(next_task())) {
if (OB_ITER_END != ret) {
LOG_WARN("get next task failed", K(ret));
}
}
} else if (state_.count_ >= state_.step_ && OB_FAIL(next_task())) {
if (OB_ITER_END != ret) {
LOG_WARN("get next task failed", K(ret));
}
@ -889,13 +1002,16 @@ int ObODPSTableRowIterator::get_next_rows(int64_t &count, int64_t capacity)
state_.step_ = state_.count_; // goto get next task
count = 0;
} else if (0 == returned_row_cnt) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected returned_row_cnt", K(total_count_), K(returned_row_cnt), K(state_), K(ret));
LOG_TRACE("unexpected returned_row_cnt, going to retry read task", K(total_count_), K(returned_row_cnt), K(state_), K(ret));
if (OB_FAIL(retry_read_task())) {
LOG_WARN("failed to retry read task", K(ret), K(state_));
}
}
} else {
ret = OB_ODPS_ERROR;
LOG_WARN("odps exception occured when calling Read method", K(ret), K(total_count_), K(returned_row_cnt), K(ex.what()));
LOG_USER_ERROR(OB_ODPS_ERROR, ex.what());
LOG_TRACE("unexpected read error exception, going to retry read task", K(OB_ODPS_ERROR), K(total_count_), K(returned_row_cnt), K(state_), K(ret), K(ex.what()));
if (OB_FAIL(retry_read_task())) {
LOG_WARN("failed to retry read task", K(ret), K(state_));
}
}
}
} catch (const std::exception& ex) {
@ -916,29 +1032,22 @@ int ObODPSTableRowIterator::get_next_rows(int64_t &count, int64_t capacity)
state_.step_ = state_.count_; // goto get next task
count = 0;
} else if (0 == returned_row_cnt) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected returned_row_cnt", K(total_count_), K(returned_row_cnt), K(state_), K(ret));
// do nothing
LOG_TRACE("expected result: already retried reading task successfully", K(total_count_), K(returned_row_cnt), K(state_), K(ret));
} else {
int64_t data_idx = 0;
for (int64_t column_idx = 0; OB_SUCC(ret) && column_idx < target_column_id_list_.count(); ++column_idx) {
uint32_t target_idx = target_column_id_list_.at(column_idx);
ObExpr &expr = *file_column_exprs.at(column_idx);
ObDatum *datums = expr.locate_batch_datums(ctx);
ObObjType type = expr.obj_meta_.get_type();
if (expr.type_ == T_PSEUDO_PARTITION_LIST_COL) {
for (int64_t row_idx = 0; OB_SUCC(ret) && row_idx < returned_row_cnt; ++row_idx) {
int64_t loc_idx = file_column_exprs.at(column_idx)->extra_ - 1;
if (OB_UNLIKELY(loc_idx < 0 || loc_idx >= state_.part_list_val_.get_count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexcepted loc_idx", K(ret), K(loc_idx), K(state_.part_list_val_.get_count()), KP(&state_.part_list_val_));
} else if (state_.part_list_val_.get_cell(loc_idx).is_null()) {
datums[row_idx].set_null();
} else {
CK (OB_NOT_NULL(datums[row_idx].ptr_));
OZ (datums[row_idx].from_obj(state_.part_list_val_.get_cell(loc_idx)));
}
if (OB_FAIL(fill_partition_list_data(expr, returned_row_cnt))) {
LOG_WARN("failed to fill partition list data", K(ret));
}
} else {
apsara::odps::sdk::ODPSColumnType odps_type = column_list_.at(target_idx).type_info_.mType;
target_idx = data_idx++;
try {
switch(odps_type)
{
@ -1489,26 +1598,28 @@ int ObODPSTableRowIterator::get_next_rows(int64_t &count, int64_t capacity)
}
}
}
ObEvalCtx::BatchInfoScopeGuard batch_info_guard(ctx);
batch_info_guard.set_batch_idx(0);
for (int i = 0; OB_SUCC(ret) && i < file_column_exprs.count(); i++) {
file_column_exprs.at(i)->set_evaluated_flag(ctx);
}
for (int i = 0; OB_SUCC(ret) && i < column_exprs_.count(); i++) {
ObExpr *column_expr = column_exprs_.at(i);
ObExpr *column_convert_expr = scan_param_->ext_column_convert_exprs_->at(i);
OZ (column_convert_expr->eval_batch(ctx, *bit_vector_cache_, returned_row_cnt));
if (OB_SUCC(ret)) {
MEMCPY(column_expr->locate_batch_datums(ctx),
column_convert_expr->locate_batch_datums(ctx), sizeof(ObDatum) * returned_row_cnt);
column_expr->set_evaluated_flag(ctx);
}
}
if (OB_SUCC(ret)) {
count = returned_row_cnt;
}
}
}
if (OB_SUCC(ret)) {
ObEvalCtx::BatchInfoScopeGuard batch_info_guard(ctx);
batch_info_guard.set_batch_idx(0);
for (int i = 0; OB_SUCC(ret) && i < file_column_exprs.count(); i++) {
file_column_exprs.at(i)->set_evaluated_flag(ctx);
}
for (int i = 0; OB_SUCC(ret) && i < column_exprs_.count(); i++) {
ObExpr *column_expr = column_exprs_.at(i);
ObExpr *column_convert_expr = scan_param_->ext_column_convert_exprs_->at(i);
OZ (column_convert_expr->eval_batch(ctx, *bit_vector_cache_, count));
if (OB_SUCC(ret)) {
MEMCPY(column_expr->locate_batch_datums(ctx),
column_convert_expr->locate_batch_datums(ctx), sizeof(ObDatum) * count);
column_expr->set_evaluated_flag(ctx);
}
}
}
return ret;
}
@ -1520,9 +1631,12 @@ int ObODPSTableRowIterator::get_next_row()
LOG_WARN("get next task failed", K(ret));
}
} else {
if (OB_FAIL(inner_get_next_row())) {
LOG_WARN("failed to get next row inner", K(ret));
}
bool need_retry = false;
do {
if (OB_FAIL(inner_get_next_row(need_retry))) {
LOG_WARN("failed to get next row inner", K(ret));
}
} while (OB_SUCC(ret) && need_retry);
}
while(OB_SUCC(ret) && get_next_task_) { // used to get next task which has data need to fetch
if (state_.count_ >= state_.step_ && OB_FAIL(next_task())) {
@ -1530,33 +1644,47 @@ int ObODPSTableRowIterator::get_next_row()
LOG_WARN("get next task failed", K(ret));
}
} else {
if (OB_FAIL(inner_get_next_row())) {
LOG_WARN("failed to get next row inner", K(ret));
}
bool need_retry = false;
do {
if (OB_FAIL(inner_get_next_row(need_retry))) {
LOG_WARN("failed to get next row inner", K(ret));
}
} while (OB_SUCC(ret) && need_retry);
}
}
return ret;
}
int ObODPSTableRowIterator::inner_get_next_row()
int ObODPSTableRowIterator::inner_get_next_row(bool &need_retry)
{
int ret = OB_SUCCESS;
ObMallocHookAttrGuard guard(mem_attr_);
ObEvalCtx &ctx = scan_param_->op_->get_eval_ctx();
const ExprFixedArray &file_column_exprs = *(scan_param_->ext_file_column_exprs_);
get_next_task_ = false;
need_retry = false;
try {
if (!(state_.record_reader_handle_->Read(*record_))) {
if (OB_ISNULL(state_.record_reader_handle_.get()) || !file_column_exprs.count()) {
if (INT64_MAX == state_.step_ || state_.count_ == state_.step_) {
get_next_task_ = true; // goto get next task
state_.step_ = state_.count_;
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected end", K(total_count_), K(state_), K(ret));
++state_.count_;
++total_count_;
}
} else {
++state_.count_;
++total_count_;
if (!(state_.record_reader_handle_->Read(*record_))) {
if (INT64_MAX == state_.step_ || state_.count_ == state_.step_) {
get_next_task_ = true; // goto get next task
state_.step_ = state_.count_;
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected end", K(total_count_), K(state_), K(ret));
}
} else {
++state_.count_;
++total_count_;
}
}
} catch (apsara::odps::sdk::OdpsTunnelException& ex) {
if (OB_SUCC(ret)) {
@ -1567,13 +1695,20 @@ int ObODPSTableRowIterator::inner_get_next_row()
get_next_task_ = true; // goto get next task
state_.step_ = state_.count_;
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected end", K(total_count_), K(state_), K(ret));
LOG_TRACE("unexpected end, going to retry read task", K(total_count_), K(state_), K(ret));
if (OB_FAIL(retry_read_task())) {
LOG_WARN("failed to retry read task", K(ret), K(state_));
} else {
need_retry = true;
}
}
} else {
ret = OB_ODPS_ERROR;
LOG_WARN("odps exception occured when calling Read or Close method", K(ret), K(total_count_), K(ex.what()));
LOG_USER_ERROR(OB_ODPS_ERROR, ex.what());
LOG_WARN("odps exception occured when calling Read or Close method, going to retry read task", K(OB_ODPS_ERROR), K(ret), K(total_count_), K(ex.what()));
if (OB_FAIL(retry_read_task())) {
LOG_WARN("failed to retry read task", K(ret), K(state_));
} else {
need_retry = true;
}
}
}
} catch (const std::exception& ex) {
@ -1593,6 +1728,7 @@ int ObODPSTableRowIterator::inner_get_next_row()
} else if (get_next_task_) {
// do nothing
} else {
int64_t data_idx = 0;
for (int64_t column_idx = 0; OB_SUCC(ret) && column_idx < target_column_id_list_.count(); ++column_idx) {
uint32_t target_idx = target_column_id_list_.at(column_idx);
ObExpr &expr = *file_column_exprs.at(column_idx); // do not check null ptr
@ -1611,6 +1747,7 @@ int ObODPSTableRowIterator::inner_get_next_row()
}
} else {
apsara::odps::sdk::ODPSColumnType odps_type = column_list_.at(target_idx).type_info_.mType;
target_idx = data_idx++;
try {
switch(odps_type)
{
@ -2118,6 +2255,8 @@ int ObOdpsPartitionDownloaderMgr::init_downloader(common::ObArray<share::ObExter
if (0 != odps_partition.file_id_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected file id", K(ret), K(i), K(odps_partition.file_id_), K(odps_partition.part_id_));
} else if (OB_NOT_NULL(odps_mgr_map_.get(odps_partition.part_id_))) {
// do nothing
} else if (OB_ISNULL(downloader = static_cast<OdpsPartitionDownloader *>(
arena_alloc_.alloc(sizeof(OdpsPartitionDownloader))))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
@ -2143,6 +2282,79 @@ int ObOdpsPartitionDownloaderMgr::init_downloader(common::ObArray<share::ObExter
return ret;
}
int ObOdpsPartitionDownloaderMgr::fetch_row_count(uint64_t tenant_id,
const ObIArray<ObExternalFileInfo> &external_table_files,
const ObString &properties,
bool &use_partition_gi)
{
int ret = OB_SUCCESS;
sql::ObExternalFileFormat external_odps_format;
ObODPSTableRowIterator odps_driver;
common::ObArenaAllocator arena_alloc;
use_partition_gi = false;
int64_t uncollect_statistics_part_cnt = 0;
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
int64_t max_parttition_count_to_collect_statistic = 5;
if (OB_LIKELY(tenant_config.is_valid())) {
max_parttition_count_to_collect_statistic = tenant_config->_max_partition_count_to_collect_statistic;
}
for (int64_t i = 0; i < external_table_files.count(); ++i) {
if (external_table_files.at(i).file_size_ < 0 && ++uncollect_statistics_part_cnt > max_parttition_count_to_collect_statistic) {
break;
}
}
if (uncollect_statistics_part_cnt > max_parttition_count_to_collect_statistic) {
use_partition_gi = true;
} else if (OB_FAIL(external_odps_format.load_from_string(properties, arena_alloc))) {
LOG_WARN("failed to init external_odps_format", K(ret));
} else if (OB_FAIL(odps_driver.init_tunnel(external_odps_format.odps_format_))) {
LOG_WARN("failed to init tunnel", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < external_table_files.count(); ++i) {
const share::ObExternalFileInfo &odps_partition = external_table_files.at(i);
apsara::odps::sdk::IDownloadPtr odps_partition_downloader = NULL;
if (0 != odps_partition.file_id_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected file id", K(ret), K(i), K(odps_partition.file_id_), K(odps_partition.part_id_));
} else if (odps_partition.file_size_ >= 0) {
// do nothing
} else if (OB_FAIL(odps_driver.create_downloader(odps_partition.file_url_,
odps_partition_downloader))) {
LOG_WARN("failed create odps partition downloader", K(ret), K(i), K(odps_partition.part_id_), K(odps_partition.file_url_));
} else {
*(const_cast<int64_t*>(&odps_partition.file_size_)) = odps_partition_downloader->GetRecordCount();
odps_partition_downloader->Complete();
}
}
}
return ret;
}
int ObOdpsPartitionDownloaderMgr::fetch_row_count(const ObString part_spec,
const ObString &properties,
int64_t &row_count)
{
int ret = OB_SUCCESS;
sql::ObExternalFileFormat external_odps_format;
ObODPSTableRowIterator odps_driver;
common::ObArenaAllocator arena_alloc;
row_count = 0;
if (OB_FAIL(external_odps_format.load_from_string(properties, arena_alloc))) {
LOG_WARN("failed to init external_odps_format", K(ret));
} else if (OB_FAIL(odps_driver.init_tunnel(external_odps_format.odps_format_))) {
LOG_WARN("failed to init tunnel", K(ret));
} else {
apsara::odps::sdk::IDownloadPtr odps_partition_downloader = NULL;
if (OB_FAIL(odps_driver.create_downloader(part_spec, odps_partition_downloader))) {
LOG_WARN("failed create odps partition downloader", K(ret), K(part_spec));
} else {
row_count = odps_partition_downloader->GetRecordCount();
odps_partition_downloader->Complete();
}
}
return ret;
}
int ObOdpsPartitionDownloaderMgr::create_upload_session(const sql::ObODPSGeneralFormat &odps_format,
const ObString &external_partition,
bool is_overwrite,
@ -2351,7 +2563,7 @@ int ObOdpsPartitionDownloaderMgr::commit_upload()
uint32_t task_count = static_cast<uint32_t>(odps_mgr_map_.size());
OdpsUploader *uploader = NULL;
try {
for (common::hash::ObHashMap<int64_t, int64_t>::iterator iter = odps_mgr_map_.begin();
for (OdpsMgrMap::iterator iter = odps_mgr_map_.begin();
OB_SUCC(ret) && iter != odps_mgr_map_.end(); iter++) {
if (OB_ISNULL(uploader = reinterpret_cast<OdpsUploader *>(iter->second))
|| OB_UNLIKELY(!uploader->record_writer_ || !uploader->upload_)) {
@ -2424,11 +2636,12 @@ int ObOdpsPartitionDownloaderMgr::OdpsPartitionDownloader::reset()
{
int ret = OB_SUCCESS;
try {
if (OB_ISNULL(odps_partition_downloader_)) {
if (OB_ISNULL(odps_partition_downloader_.get())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null ptr", K(ret));
} else {
odps_partition_downloader_->Complete();
odps_partition_downloader_.reset();
}
} catch (const apsara::odps::sdk::OdpsTunnelException& ex) {
if (OB_SUCC(ret)) {

View File

@ -46,7 +46,6 @@ public:
K(step_),
K(count_),
K(is_from_gi_pump_),
K(ObString(part_spec_.c_str())),
K(ObString(download_id_.c_str())));
int64_t task_idx_;
int64_t part_id_;
@ -56,32 +55,23 @@ public:
bool is_from_gi_pump_;
apsara::odps::sdk::IDownloadPtr download_handle_;
apsara::odps::sdk::IRecordReaderPtr record_reader_handle_;
std::string part_spec_;
std::string download_id_;
ObNewRow part_list_val_;
};
struct OdpsPartition {
OdpsPartition() :
name_(""),
download_handle_(NULL),
download_id_(""),
record_count_(-1)
{
}
OdpsPartition(const std::string &name) :
name_(name),
download_handle_(NULL),
download_id_(""),
record_count_(-1)
{
}
OdpsPartition(const std::string &name,
apsara::odps::sdk::IDownloadPtr download_handle,
const std::string download_id,
int64_t &record_count) :
name_(name),
download_handle_(download_handle),
download_id_(download_id),
record_count_(record_count)
{
}
@ -91,8 +81,6 @@ public:
int reset();
TO_STRING_KV(K(ObString(name_.c_str())), K(record_count_));
std::string name_;
apsara::odps::sdk::IDownloadPtr download_handle_;
std::string download_id_;
int64_t record_count_;
};
@ -151,8 +139,8 @@ public:
return common::OB_ERR_UNEXPECTED;
}
virtual void reset() override;
int init_tunnel(const sql::ObODPSGeneralFormat &odps_format);
int create_downloader(ObString &part_spec, apsara::odps::sdk::IDownloadPtr &downloader);
int init_tunnel(const sql::ObODPSGeneralFormat &odps_format, bool need_decrypt = true);
int create_downloader(const ObString &part_spec, apsara::odps::sdk::IDownloadPtr &downloader);
int pull_partition_info();
inline ObIArray<OdpsPartition>& get_partition_info() { return partition_list_; }
inline bool is_part_table() { return is_part_table_; }
@ -165,7 +153,7 @@ public:
const int32_t ob_type_precision,
const int32_t ob_type_scale);
private:
int inner_get_next_row();
int inner_get_next_row(bool &need_retry);
int prepare_expr();
int pull_column();
int next_task();
@ -186,6 +174,8 @@ private:
}
return is_valid;
}
int fill_partition_list_data(ObExpr &expr, int64_t returned_row_cnt);
int retry_read_task();
private:
ObODPSGeneralFormat odps_format_;
apsara::odps::sdk::Account account_;
@ -212,6 +202,7 @@ private:
class ObOdpsPartitionDownloaderMgr
{
public:
typedef common::hash::ObHashMap<int64_t, int64_t, common::hash::SpinReadWriteDefendMode> OdpsMgrMap;
struct OdpsPartitionDownloader {
OdpsPartitionDownloader() :
odps_driver_(),
@ -241,12 +232,34 @@ public:
apsara::odps::sdk::IRecordWriterPtr record_writer_;
};
// Default constructor: starts un-initialized (inited_ = false) in download mode
// (is_download_ = true), with a zero reference count and need_commit_ set.
ObOdpsPartitionDownloaderMgr()
  : inited_(false),
    is_download_(true),
    ref_(0),
    need_commit_(true)
{
}
// Lazily creates odps_mgr_map_ with `bucket_size` buckets.
// If the map already exists this is a no-op that returns OB_SUCCESS and leaves
// inited_ / is_download_ untouched. When creation succeeds, both flags are set to true.
// Returns the error code from the map's create() on failure.
OB_INLINE int init_map(int64_t bucket_size)
{
  int ret = OB_SUCCESS;
  const bool need_create = !odps_mgr_map_.created();
  if (need_create) {
    ret = odps_mgr_map_.create(bucket_size, "OdpsTable","OdpsTableReader");
  }
  if (need_create && OB_SUCC(ret)) {
    inited_ = true;
    is_download_ = true;
  }
  return ret;
}
// Exposes the underlying map by mutable reference.
OdpsMgrMap &get_odps_map() { return odps_mgr_map_; }
// Exposes the manager's arena allocator through the ObIAllocator interface.
OB_INLINE ObIAllocator &get_allocator()
{
  return arena_alloc_;
}
int init_downloader(common::ObArray<share::ObExternalFileInfo> &external_table_files,
const ObString &properties);
int init_uploader(const ObString &properties,
const ObString &external_partition,
bool is_overwrite,
int64_t parallel);
static int fetch_row_count(uint64_t tenant_id,
const ObIArray<ObExternalFileInfo> &external_table_files,
const ObString &properties,
bool &use_partition_gi);
static int fetch_row_count(const ObString part_spec,
const ObString &properties,
int64_t &row_count);
static int create_upload_session(const sql::ObODPSGeneralFormat &odps_format,
const ObString &external_partition,
bool is_overwrite,
@ -273,7 +286,7 @@ public:
private:
bool inited_;
bool is_download_;
common::hash::ObHashMap<int64_t, int64_t> odps_mgr_map_;
OdpsMgrMap odps_mgr_map_;
common::ObArenaAllocator arena_alloc_;
int64_t ref_;
bool need_commit_;

View File

@ -1397,6 +1397,7 @@ int ObTableScanOp::prepare_single_scan_range(int64_t group_idx)
} else if (MY_CTDEF.scan_ctdef_.is_external_table_) {
uint64_t table_loc_id = MY_SPEC.get_table_loc_id();
ObDASTableLoc *tab_loc = DAS_CTX(ctx_).get_table_loc_by_id(table_loc_id, MY_CTDEF.scan_ctdef_.ref_table_id_);
const ObString &table_format_or_properties = MY_CTDEF.scan_ctdef_.external_file_format_str_.str_;
ObArray<int64_t> partition_ids;
if (OB_ISNULL(tab_loc)) {
ret = OB_ERR_UNEXPECTED;
@ -1411,6 +1412,7 @@ int ObTableScanOp::prepare_single_scan_range(int64_t group_idx)
} else if (OB_FAIL(ObExternalTableUtils::prepare_single_scan_range(
ctx_.get_my_session()->get_effective_tenant_id(),
MY_CTDEF.scan_ctdef_.ref_table_id_,
table_format_or_properties,
partition_ids,
key_ranges,
range_allocator,

View File

@ -1464,17 +1464,97 @@ int ObSQLUtils::extract_odps_part_spec(const ObString &all_part_spec, ObIArray<O
return ret;
}
int ObSQLUtils::is_external_odps_table(const ObString &properties, ObIAllocator &allocator, bool &is_odps)
int ObSQLUtils::get_external_table_type(const uint64_t tenant_id,
const uint64_t table_id,
ObExternalFileFormat::FormatType &type)
{
int ret = OB_SUCCESS;
is_odps = false;
ObExternalFileFormat format;
if (properties.empty()) {
// do nothing
} else if (OB_FAIL(format.load_from_string(properties, allocator))) {
LOG_WARN("fail to load from properties string", K(ret), K(properties));
const ObTableSchema *table_schema = NULL;
share::schema::ObSchemaGetterGuard schema_guard;
OZ (GCTX.schema_service_->get_tenant_schema_guard(tenant_id, schema_guard));
OZ (schema_guard.get_table_schema(tenant_id, table_id, table_schema));
if (OB_FAIL(ret)) {
} else if (OB_FAIL(get_external_table_type(table_schema, type))) {
LOG_WARN("failed to get external table type", K(tenant_id), K(table_id), KP(table_schema), K(ret));
}
return ret;
}
int ObSQLUtils::get_external_table_type(const ObTableSchema *table_schema, ObExternalFileFormat::FormatType &type)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null ptr", K(ret));
} else {
is_odps = ObExternalFileFormat::FormatType::ODPS_FORMAT == format.format_type_;
ObExternalFileFormat format;
ObArenaAllocator allocator;
ObString table_format_or_properties = table_schema->get_external_file_format().empty() ?
table_schema->get_external_properties(): table_schema->get_external_file_format();
if (OB_FAIL(get_external_table_type(table_format_or_properties, type))) {
LOG_WARN("failed to get external table type", K(ret), K(table_format_or_properties));
}
}
return ret;
}
// Parses a serialized external-table format / properties string and reports its
// format type.
//
// @param table_format_or_properties  serialized format or properties; may be empty
// @param type  [out] always assigned: the parsed format type on success,
//              otherwise the format type of a default-constructed
//              ObExternalFileFormat
// @return OB_SUCCESS (also for empty input), or the error from load_from_string
int ObSQLUtils::get_external_table_type(const ObString &table_format_or_properties,
                                        ObExternalFileFormat::FormatType &type) {
  int ret = OB_SUCCESS;
  ObExternalFileFormat format;
  ObArenaAllocator allocator;
  // Give `type` a defined value on every path. Previously the empty-input path
  // returned OB_SUCCESS without writing it, so callers passing an uninitialized
  // local went on to read an indeterminate value.
  // NOTE(review): assumes ObExternalFileFormat's constructor initializes
  // format_type_ to its "invalid" value -- confirm against the struct definition.
  type = format.format_type_;
  if (table_format_or_properties.empty()) {
    // nothing to parse; keep the default type
  } else if (OB_FAIL(format.load_from_string(table_format_or_properties, allocator))) {
    LOG_WARN("fail to load from properties string", K(ret), K(table_format_or_properties));
  } else {
    type = format.format_type_;
  }
  return ret;
}
// Reports whether table `table_id` of tenant `tenant_id` is an ODPS external table.
//
// @param tenant_id               tenant owning the table
// @param table_id                id of the table to inspect
// @param is_odps_external_table  [out] false on any failure; true only when the
//                                resolved format type is ODPS_FORMAT
// @return OB_SUCCESS or the error from get_external_table_type
int ObSQLUtils::is_odps_external_table(const uint64_t tenant_id,
                                       const uint64_t table_id,
                                       bool &is_odps_external_table)
{
  int ret = OB_SUCCESS;
  is_odps_external_table = false;
  // Initialized explicitly: get_external_table_type can succeed without writing
  // its output (empty format/properties string), and the comparison below must
  // not read an indeterminate value.
  ObExternalFileFormat::FormatType external_table_type = ObExternalFileFormat::FormatType::INVALID_FORMAT;
  if (OB_FAIL(ObSQLUtils::get_external_table_type(tenant_id, table_id, external_table_type))) {
    LOG_WARN("failed to get external table type", K(ret));
  } else {
    is_odps_external_table = (ObExternalFileFormat::FormatType:: ODPS_FORMAT == external_table_type);
  }
  return ret;
}
// Reports whether the table described by `table_schema` is an ODPS external table.
//
// @param table_schema            schema to inspect; must not be NULL
// @param is_odps_external_table  [out] false on any failure; true only when the
//                                resolved format type is ODPS_FORMAT
// @return OB_ERR_UNEXPECTED when table_schema is NULL, otherwise OB_SUCCESS or
//         the error from get_external_table_type
int ObSQLUtils::is_odps_external_table(const ObTableSchema *table_schema,
                                       bool &is_odps_external_table)
{
  int ret = OB_SUCCESS;
  is_odps_external_table = false;
  // Initialized explicitly: get_external_table_type can succeed without writing
  // its output (empty format/properties string), and the comparison below must
  // not read an indeterminate value.
  ObExternalFileFormat::FormatType external_table_type = ObExternalFileFormat::FormatType::INVALID_FORMAT;
  if (OB_ISNULL(table_schema)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("unexpected null ptr", K(ret));
  } else if (OB_FAIL(ObSQLUtils::get_external_table_type(table_schema, external_table_type))) {
    LOG_WARN("failed to get external table type", K(ret));
  } else {
    is_odps_external_table = (ObExternalFileFormat::FormatType:: ODPS_FORMAT == external_table_type);
  }
  return ret;
}
// Reports whether a serialized format / properties string describes an ODPS
// external table.
//
// @param table_format_or_properties  serialized format or properties; may be empty
// @param is_odps_external_table      [out] false on any failure or for empty
//                                    input; true only when the parsed format type
//                                    is ODPS_FORMAT
// @return OB_SUCCESS or the error from get_external_table_type
int ObSQLUtils::is_odps_external_table(const ObString &table_format_or_properties,
                                       bool &is_odps_external_table)
{
  int ret = OB_SUCCESS;
  is_odps_external_table = false;
  // Initialized explicitly: for an empty input string get_external_table_type
  // can succeed without writing its output, and the comparison below must not
  // read an indeterminate value.
  ObExternalFileFormat::FormatType external_table_type = ObExternalFileFormat::FormatType::INVALID_FORMAT;
  if (OB_FAIL(ObSQLUtils::get_external_table_type(table_format_or_properties, external_table_type))) {
    LOG_WARN("failed to get external table type", K(ret));
  } else {
    is_odps_external_table = (ObExternalFileFormat::FormatType:: ODPS_FORMAT == external_table_type);
  }
  return ret;
}

View File

@ -32,6 +32,8 @@
#include "sql/engine/expr/ob_expr_frame_info.h"
#include "sql/monitor/flt/ob_flt_span_mgr.h"
#include "share/ob_compatibility_control.h"
#include "sql/engine/cmd/ob_load_data_parser.h"
namespace oceanbase
{
namespace share {
@ -751,8 +753,21 @@ public:
// Packs a timestamp and a server id into one 64-bit value:
// the low 43 bits of `ts` are kept as-is and the low 16 bits of `server_id`
// are placed at bit positions 48..63.
static int64_t combine_server_id(int64_t ts, uint64_t server_id) {
  const int64_t ts_low_bits = ts & ((1LL << 43) - 1LL);
  const uint64_t server_high_bits = (server_id & 0xFFFF) << 48;
  return ts_low_bits | server_high_bits;
}
static int get_external_table_type(const uint64_t tenant_id,
const uint64_t table_id,
ObExternalFileFormat::FormatType &type);
static int get_external_table_type(const ObTableSchema *table_schema,
ObExternalFileFormat::FormatType &type);
static int get_external_table_type(const ObString &table_format_or_properties,
ObExternalFileFormat::FormatType &type);
static int is_odps_external_table(const uint64_t tenant_id,
const uint64_t table_id,
bool &is_odps_external_table);
static int is_odps_external_table(const ObTableSchema *table_schema,
bool &is_odps_external_table);
static int is_odps_external_table(const ObString &table_format_or_properties,
bool &is_odps_external_table);
static int extract_odps_part_spec(const ObString &all_part_spec, ObIArray<ObString> &part_spec_list);
static int is_external_odps_table(const ObString &properties, ObIAllocator &allocator, bool &is_odps);
static int check_ident_name(const common::ObCollationType cs_type, common::ObString &name,
const bool check_for_path_char, const int64_t max_ident_len);

View File

@ -1814,18 +1814,18 @@ int ObInsertLogPlan::allocate_select_into_as_top_for_insert(ObLogicalOperator *&
LOG_WARN("allocate memory for ObLogSelectInto failed", K(ret));
} else {
ObString external_properties;
const ObString &format_or_properties = table_schema->get_external_file_format().empty()
const ObString &table_format_or_properties = table_schema->get_external_file_format().empty()
? table_schema->get_external_properties()
: table_schema->get_external_file_format();
const ObInsertTableInfo& table_info = stmt->get_insert_table_info();
if (format_or_properties.empty()) {
if (table_format_or_properties.empty()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("external properties is empty", K(ret));
} else if (table_schema->get_external_properties().empty()) { //目前只支持写odps外表 其他类型暂不支持
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support to insert into external table which is not in odps", K(ret));
LOG_USER_ERROR(OB_NOT_SUPPORTED, "insert into external table which is not in odps");
} else if (OB_FAIL(ob_write_string(get_allocator(), format_or_properties, external_properties))) {
} else if (OB_FAIL(ob_write_string(get_allocator(), table_format_or_properties, external_properties))) {
LOG_WARN("failed to append string", K(ret));
} else if (OB_FAIL(select_into->get_select_exprs().assign(table_info.column_conv_exprs_))) {
LOG_WARN("failed to get select exprs", K(ret));

View File

@ -615,6 +615,8 @@ static const NonReservedKeyword Mysql_none_reserved_keywords[] =
{"accesskey", ACCESSKEY},
{"accesstype", ACCESSTYPE},
{"endpoint", ENDPOINT},
{"tunnel_endpoint", TUNNEL_ENDPOINT},
{"collect_statistics_on_create", COLLECT_STATISTICS_ON_CREATE},
{"project_name", PROJECT_NAME},
{"quota_name", QUOTA_NAME},
{"compression_code", COMPRESSION_CODE},

View File

@ -279,7 +279,7 @@ END_P SET_VAR DELIMITER
CACHE CALIBRATION CALIBRATION_INFO CANCEL CASCADED CAST CATALOG_NAME CHAIN CHANGED CHARSET CHECKSUM CHECKPOINT CHUNK CIPHER
CLASS_ORIGIN CLEAN CLEAR CLIENT CLONE CLOG CLOSE CLUSTER CLUSTER_ID CLUSTER_NAME COALESCE COLUMN_BLOOM_FILTER COLUMN_STAT
CODE COLLATION COLUMN_FORMAT COLUMN_NAME COLUMNS COMMENT COMMIT COMMITTED COMPACT COMPLETION COMPLETE
CODE COLLATION COLLECT_STATISTICS_ON_CREATE COLUMN_FORMAT COLUMN_NAME COLUMNS COMMENT COMMIT COMMITTED COMPACT COMPLETION COMPLETE
COMPRESSED COMPRESSION COMPRESSION_BLOCK_SIZE COMPRESSION_CODE COMPUTATION COMPUTE CONCURRENT CONDENSED CONDITIONAL CONNECTION CONSISTENT CONSISTENT_MODE CONSTRAINT_CATALOG
CONSTRAINT_NAME CONSTRAINT_SCHEMA CONTAINS CONTEXT CONTRIBUTORS COPY COSINE COUNT CPU CREATE_TIMESTAMP
CTXCAT CTX_ID CUBE CURDATE CURRENT STACKED CURTIME CURSOR_NAME CUME_DIST CYCLE CALC_PARTITION_ID CONNECT
@ -366,7 +366,7 @@ END_P SET_VAR DELIMITER
TEMPLATE TEMPORARY TEMPTABLE TENANT TEXT THAN TIME TIMESTAMP TIMESTAMPADD TIMESTAMPDIFF TP_NO
TP_NAME TRACE TRADITIONAL TRANSACTION TRIGGERS TRIM TRUNCATE TYPE TYPES TASK TABLET_SIZE
TABLEGROUP_ID TENANT_ID THROTTLE TIME_ZONE_INFO TOP_K_FRE_HIST TIMES TRIM_SPACE TTL
TRANSFER TENANT_STS_CREDENTIAL
TRANSFER TUNNEL_ENDPOINT TENANT_STS_CREDENTIAL
UNCOMMITTED UNCONDITIONAL UNDEFINED UNDO_BUFFER_SIZE UNDOFILE UNNEST UNICODE UNINSTALL UNIT UNIT_GROUP UNIT_NUM UNLOCKED UNTIL
UNUSUAL UPGRADE USE_BLOOM_FILTER UNKNOWN USE_FRM USER USER_RESOURCES UNBOUNDED UP UNLIMITED USER_SPECIFIED
@ -8747,6 +8747,10 @@ TYPE COMP_EQ STRING_VALUE
{
malloc_non_terminal_node($$, result->malloc_pool_, T_ENDPOINT, 1, $3);
}
| TUNNEL_ENDPOINT COMP_EQ STRING_VALUE
{
malloc_non_terminal_node($$, result->malloc_pool_, T_TUNNEL_ENDPOINT, 1, $3);
}
| PROJECT_NAME COMP_EQ STRING_VALUE
{
malloc_non_terminal_node($$, result->malloc_pool_, T_PROJECT, 1, $3);
@ -8767,7 +8771,12 @@ TYPE COMP_EQ STRING_VALUE
| TABLE_NAME COMP_EQ STRING_VALUE
{
malloc_non_terminal_node($$, result->malloc_pool_, T_TABLE, 1, $3);
};
}
| COLLECT_STATISTICS_ON_CREATE COMP_EQ BOOL_VALUE
{
malloc_non_terminal_node($$, result->malloc_pool_, T_COLLECT_STATISTICS_ON_CREATE, 1, $3);
}
;
compression_key:
COMPRESSION_CODE
@ -24028,6 +24037,7 @@ ACCESS_INFO
| CLUSTER_NAME
| COALESCE
| CODE
| COLLECT_STATISTICS_ON_CREATE
| COLLATION
| COLUMN_BLOOM_FILTER
| COLUMN_FORMAT
@ -24650,6 +24660,7 @@ ACCESS_INFO
| TRIM_SPACE
| TRUNCATE
| TTL
| TUNNEL_ENDPOINT
| TYPE
| TYPES
| TABLEGROUP_ID

View File

@ -1068,7 +1068,12 @@ int ObCreateTableResolver::check_external_table_generated_partition_column_sanit
{
int ret = OB_SUCCESS;
ObArray<ObRawExpr *> col_exprs;
if (OB_ISNULL(dependant_expr)) {
bool is_odps_external_table = false;
if (OB_FAIL(ObSQLUtils::is_odps_external_table(&table_schema, is_odps_external_table))) {
LOG_WARN("failed to check is odps external table or not", K(ret));
} else if (is_odps_external_table) {
// do nothing
} else if (OB_ISNULL(dependant_expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("dependant expr is null", K(ret));
} else if (OB_FAIL(ObRawExprUtils::extract_column_exprs(dependant_expr, col_exprs, true/*extract pseudo column*/))) {
@ -1105,8 +1110,6 @@ int ObCreateTableResolver::check_external_table_generated_partition_column_sanit
LOG_WARN("user specified partition col expr contains no external partition pseudo column is not supported", K(ret));
}
}
} else if (table_schema.is_odps_external_table()) {
// lcqlog to do check
} else {
bool found = false;
for (int64_t i = 0; OB_SUCC(ret) && i < col_exprs.count(); i++) {

View File

@ -3089,6 +3089,7 @@ int ObDDLResolver::mask_properties_sensitive_info(const ParseNode *node, ObStrin
} else {
switch (node->type_) {
case ObItemType::T_ENDPOINT:
case ObItemType::T_TUNNEL_ENDPOINT:
case ObItemType::T_STSTOKEN:
case ObItemType::T_ACCESSKEY:
case ObItemType::T_ACCESSID: {

View File

@ -8684,10 +8684,10 @@ int ObDMLResolver::resolve_external_table_generated_column(
}
} else {
ObExternalFileFormat format;
const ObString &format_or_properties = table_schema->get_external_file_format().empty() ?
const ObString &table_format_or_properties = table_schema->get_external_file_format().empty() ?
table_schema->get_external_properties() :
table_schema->get_external_file_format();
if (OB_FAIL(format.load_from_string(format_or_properties, *params_.allocator_))) {
if (OB_FAIL(format.load_from_string(table_format_or_properties, *params_.allocator_))) {
LOG_WARN("load from string failed", K(ret));
} else if (format.format_type_ == ObExternalFileFormat::ORC_FORMAT && lib::is_oracle_mode()) {
ret = OB_NOT_SUPPORTED;

View File

@ -9698,6 +9698,14 @@ int ObResolverUtils::resolve_file_format(const ParseNode *node, ObExternalFileFo
format.odps_format_.endpoint_ = ObString(node->children_[0]->str_len_, node->children_[0]->str_value_).trim_space_only();
break;
}
case ObItemType::T_TUNNEL_ENDPOINT: {
format.odps_format_.tunnel_endpoint_ = ObString(node->children_[0]->str_len_, node->children_[0]->str_value_).trim_space_only();
break;
}
case ObItemType::T_COLLECT_STATISTICS_ON_CREATE: {
format.odps_format_.collect_statistics_on_create_ = node->children_[0]->value_;
break;
}
case T_PROJECT: {
format.odps_format_.project_ = ObString(node->children_[0]->str_len_, node->children_[0]->str_value_).trim_space_only();
break;

View File

@ -318,6 +318,7 @@ _datafile_usage_upper_bound_percentage
_data_storage_io_timeout
_delay_resource_recycle_after_correctness_issue
_display_mysql_version
_dop_of_collect_external_table_statistics
_enable_active_txn_transfer
_enable_adaptive_auto_dop
_enable_adaptive_compaction
@ -416,6 +417,7 @@ _ls_migration_wait_completing_timeout
_max_elr_dependent_trx_count
_max_ls_cnt_per_server
_max_malloc_sample_interval
_max_partition_count_to_collect_statistic
_max_rpc_packet_size
_max_schema_slot_num
_max_tablet_cnt_per_gb