diff --git a/deps/oblib/src/lib/ob_name_def.h b/deps/oblib/src/lib/ob_name_def.h index a03dc590a..2fc8bba9b 100644 --- a/deps/oblib/src/lib/ob_name_def.h +++ b/deps/oblib/src/lib/ob_name_def.h @@ -1203,6 +1203,7 @@ #define N_RB_BUILD "rb_build" #define N_GET_MYSQL_ROUTINE_PARAMETER_TYPE_STR "get_mysql_routine_parameter_type_str" #define N_ORA_LOGIN_USER "ora_login_user" +#define N_CALC_ODPS_SIZE "calc_odps_size" #define N_PRIV_ST_GEOHASH "_st_geohash" #define N_PRIV_ST_MAKEPOINT "_st_makepoint" #endif //OCEANBASE_LIB_OB_NAME_DEF_H_ diff --git a/src/objit/include/objit/common/ob_item_type.h b/src/objit/include/objit/common/ob_item_type.h index ceba190ac..df358230a 100644 --- a/src/objit/include/objit/common/ob_item_type.h +++ b/src/objit/include/objit/common/ob_item_type.h @@ -964,6 +964,7 @@ typedef enum ObItemType T_FUN_SYS_CALC_SUB_PARTITION_NAME = 2053, T_FUN_SYS_CALC_PARTITION_IDX = 2054, T_FUN_SYS_CALC_SUB_PARTITION_IDX = 2055, + T_FUN_SYS_CALC_ODPS_SIZE = 2056, T_MAX_OP = 3000, //pseudo column, to mark the group iterator id @@ -2628,7 +2629,6 @@ typedef enum ObItemType T_UNION_MERGE_HINT = 4740, T_UNION_MERGE_LIST = 4741, - T_PSEUDO_OLD_NEW_COL = 4742, T_TRANSFORM_DISTINCT_AGG = 4743, @@ -2653,6 +2653,10 @@ typedef enum ObItemType T_UNIT_GROUP = 4759, T_TRANSPOSE_TABLE = 4760, T_FUN_UNPIVOT = 4761, + + //odps external table + T_TUNNEL_ENDPOINT = 4762, + T_COLLECT_STATISTICS_ON_CREATE = 4763, T_MAX //Attention: add a new type before T_MAX } ObItemType; diff --git a/src/share/external_table/ob_external_table_file_mgr.cpp b/src/share/external_table/ob_external_table_file_mgr.cpp index 92238c9f2..ab997ce9f 100644 --- a/src/share/external_table/ob_external_table_file_mgr.cpp +++ b/src/share/external_table/ob_external_table_file_mgr.cpp @@ -43,6 +43,7 @@ #include "share/external_table/ob_external_table_file_rpc_proxy.h" #include "storage/ob_common_id_utils.h" #include "observer/dbms_scheduler/ob_dbms_sched_table_operator.h" +#include "sql/engine/cmd/ob_load_data_parser.h" namespace oceanbase { @@ -56,6 +57,21 @@ using namespace dbms_scheduler; namespace share { +int ObExternalFileInfo::deep_copy(ObIAllocator &allocator, const ObExternalFileInfo &other) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(ob_write_string(allocator, other.file_url_, this->file_url_))) { + LOG_WARN("fail to write string", K(ret)); + } else { + this->file_id_ = other.file_id_; + this->part_id_ = other.part_id_; + this->file_addr_ = other.file_addr_; + this->file_size_ = other.file_size_; + this->row_start_ = other.row_start_; + this->row_count_ = other.row_count_; + } + return ret; +} int ObExternalTableFilesKey::deep_copy(char *buf, const int64_t buf_len, ObIKVCacheKey *&key) const { int ret = OB_SUCCESS; @@ -592,8 +608,10 @@ int ObExternalTableFileManager::calculate_odps_part_val_by_part_spec(const ObTab { int ret = OB_SUCCESS; ObIAllocator &allocator = exec_ctx.get_allocator(); - CK (OB_NOT_NULL(table_schema) && OB_LIKELY(table_schema->is_odps_external_table())); - if (OB_SUCC(ret)) { + bool is_odps_external_table = false; + if (OB_FAIL(ObSQLUtils::is_odps_external_table(table_schema, is_odps_external_table))) { + LOG_WARN("failed to check is odps external table or not", K(ret)); + } else if (is_odps_external_table) { const common::ObPartitionKeyInfo &part_key_info = table_schema->get_partition_key_info(); const int part_key_size = part_key_info.get_size(); for (int64_t i = 0; OB_SUCC(ret) && i < file_infos.count(); i++) { @@ -602,8 +620,10 @@ int ObExternalTableFileManager::calculate_odps_part_val_by_part_spec(const ObTab if (OB_FAIL(ObSQLUtils::extract_odps_part_spec(all_part_spec, part_spec_list))) { LOG_WARN("failed to extract odps part spec", K(ret), K(all_part_spec)); } else if (part_spec_list.count() != part_key_size) { - ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected count find part spec of odps", K(ret), K(file_infos), K(file_infos.count()), K(i), K(all_part_spec), K(part_spec_list.count()), K(part_key_size)); + ret = OB_EXTERNAL_ODPS_UNEXPECTED_ERROR; + LOG_WARN("unexpected count find part spec of odps", K(ret), K(file_infos), K(file_infos.count()), K(i), K(all_part_spec), K(part_spec_list.count()), K(part_key_size)); + LOG_USER_ERROR(OB_EXTERNAL_ODPS_UNEXPECTED_ERROR, "unexpected count of partition key between odps table and external table"); } else { ObNewRow odps_part_row; ObObj *obj_array = nullptr; @@ -626,8 +646,30 @@ int ObExternalTableFileManager::calculate_odps_part_val_by_part_spec(const ObTab } else if (FALSE_IT(part_key_type = part_col->get_meta_type().get_type())) { } else if (part_key_type == ObVarcharType || part_key_type == ObCharType) { - odps_part_row.get_cell(j).set_meta_type(part_col->get_meta_type()); - odps_part_row.get_cell(j).set_varchar_value(part_spec.ptr(), part_spec.length()); + oceanbase::common::ObObjMeta meta_type = part_col->get_meta_type(); + ObCollationType coll_dst = static_cast(meta_type.get_cs_type()); + ObCollationType coll_src = CS_TYPE_UTF8MB4_BIN; + int64_t dst_maxblen = 0; + int64_t src_minblen = 0; + if (OB_FAIL(ObCharset::get_mbmaxlen_by_coll(coll_dst, dst_maxblen))) { + LOG_WARN("failed to get dst mb max len", K(ret), K(coll_dst)); + } else if (OB_FAIL(ObCharset::get_mbminlen_by_coll(coll_src, src_minblen))) { + LOG_WARN("failed to get src mb min len", K(ret), K(coll_src)); + } else { + void *dst_buf = NULL; + uint64_t dst_buf_size = (part_spec.length() / src_minblen) * dst_maxblen; + uint32_t dst_len = 0; + if (OB_ISNULL(dst_buf = exec_ctx.get_allocator().alloc(dst_buf_size))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to alloc buf", K(ret)); + } else if (OB_FAIL(ObCharset::charset_convert(coll_src, part_spec.ptr(), part_spec.length(), coll_dst, static_cast(dst_buf), dst_buf_size, dst_len))) { + LOG_WARN("failed to convert charset", K(ret)); + } else { + odps_part_row.get_cell(j).set_meta_type(part_col->get_meta_type()); + odps_part_row.get_cell(j).set_varchar_value(static_cast(dst_buf), + static_cast(dst_len)); + } + } } else if (part_key_type == ObTinyIntType || part_key_type == ObSmallIntType || part_key_type == ObMediumIntType || @@ -675,11 +717,16 @@ int ObExternalTableFileManager::calculate_file_part_val_by_file_name(const ObTab OZ (cg_partition_expr_rt_expr(table_schema, exec_ctx.get_expr_factory(), exec_ctx.get_my_session(), schema_guard, exec_ctx.get_allocator(), temp_exprs)); OZ (build_row_for_file_name(file_name_row, exec_ctx.get_allocator())); + bool is_odps_external_table = false; + if (OB_FAIL(ret)) { + } else if (OB_FAIL(ObSQLUtils::is_odps_external_table(table_schema, is_odps_external_table))) { + LOG_WARN("failed to check is odps external table or not", K(ret)); + } for (int64_t i = 0; OB_SUCC(ret) && i < file_infos.count(); i++) { ObNewRow list_val; ObObj *obj_array = nullptr; if (file_name_row.get_count() > 0) { - file_name_row.get_cell(0).set_string(ObVarcharType, table_schema->is_odps_external_table() ? + file_name_row.get_cell(0).set_string(ObVarcharType, is_odps_external_table ? file_infos.at(i).file_url_.after(equals_delimiter) : (is_local_storage ? file_infos.at(i).file_url_.after(ip_delimiter) : @@ -754,12 +801,13 @@ int ObExternalTableFileManager::calculate_all_files_partitions(share::schema::Ob ObArray existed_part_vals; ObArray existed_part_ids; ObArray file_part_vals; - bool is_odps_external_table = false; CK (OB_NOT_NULL(table_schema) && OB_LIKELY(table_schema->is_external_table())); OZ (get_all_partition_list_val(table_schema, existed_part_vals, existed_part_ids)); - OX(is_odps_external_table = table_schema->is_odps_external_table()); - - if (is_odps_external_table) { + bool is_odps_external_table = false; + if (OB_FAIL(ret)) { + } else if (OB_FAIL(ObSQLUtils::is_odps_external_table(table_schema, is_odps_external_table))) { + LOG_WARN("failed to check is odps external table or not", K(ret)); + } else if (is_odps_external_table) { OZ(calculate_odps_part_val_by_part_spec(table_schema, file_infos, file_part_vals, schema_guard, exec_ctx)); } else { OZ (calculate_file_part_val_by_file_name(table_schema, file_infos, file_part_vals, schema_guard, exec_ctx)); @@ -810,11 +858,17 @@ int ObExternalTableFileManager::update_inner_table_file_list( ObIArray &file_sizes, ObIArray &updated_part_ids, bool &has_partition_changed, - const uint64_t part_id) + const uint64_t part_id, + bool collect_statistic) { int ret = OB_SUCCESS; ObMySQLTransaction trans; ObArenaAllocator allocator; + share::schema::ObSchemaGetterGuard schema_guard; + const ObTableSchema *table_schema = NULL; + OZ (GCTX.schema_service_->get_tenant_schema_guard(tenant_id, schema_guard)); + OZ (schema_guard.get_table_schema(tenant_id, table_id, table_schema)); + CK (OB_NOT_NULL(table_schema)); CK (OB_NOT_NULL(GCTX.sql_proxy_), OB_NOT_NULL(GCTX.schema_service_)); OZ (trans.start(GCTX.sql_proxy_, tenant_id)); @@ -829,6 +883,10 @@ int ObExternalTableFileManager::update_inner_table_file_list( } else { OZ (update_inner_table_files_list_by_table(exec_ctx, trans, tenant_id, table_id, file_infos, updated_part_ids, has_partition_changed)); } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(collect_odps_table_statistics(collect_statistic, tenant_id, table_id, updated_part_ids, trans))) { + LOG_WARN("failed to collect odps table statistics", K(collect_statistic), K(tenant_id), K(table_id)); + } OZ (trans.end(true)); if (trans.is_started()) { trans.end(false); @@ -836,6 +894,51 @@ int ObExternalTableFileManager::update_inner_table_file_list( return ret; } +int ObExternalTableFileManager::collect_odps_table_statistics(const bool collect_statistic, + const uint64_t tenant_id, + const uint64_t table_id, + ObIArray &updated_part_ids, + ObMySQLTransaction &trans) +{ + int ret = OB_SUCCESS; + bool is_odps_external_table = false; + if (OB_FAIL(ObSQLUtils::is_odps_external_table(tenant_id, table_id, is_odps_external_table))) { + LOG_WARN("failed to check is odps table or not", K(ret), K(tenant_id), K(table_id)); + } else if (is_odps_external_table && collect_statistic) { + int64_t update_rows = 0; + ObSqlString update_sql; + int64_t dop_of_collect_external_table_statistics = 16; + if (updated_part_ids.count() <= 32) { + // do nothing + } else { + omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id)); + if (OB_LIKELY(tenant_config.is_valid()) && static_cast(tenant_config->_dop_of_collect_external_table_statistics) > 0) { + dop_of_collect_external_table_statistics = tenant_config->_dop_of_collect_external_table_statistics; + } else { + double min_cpu; + double max_cpu; + if (OB_ISNULL(GCTX.omt_)) { + ret = OB_ERR_UNEXPECTED; + } else if (OB_FAIL(GCTX.omt_->get_tenant_cpu(tenant_id, min_cpu, max_cpu))) { + LOG_WARN("fail to get tenant cpu", K(ret)); + } else { + dop_of_collect_external_table_statistics = max_cpu; + } + } + } + OZ(update_sql.assign_fmt("UPDATE /*+ enable_parallel_dml parallel(%ld) */ %s SET FILE_SIZE = CALC_ODPS_SIZE(FILE_URL, TABLE_ID) WHERE TABLE_ID = %ld and PART_ID IN (", + dop_of_collect_external_table_statistics, + OB_ALL_EXTERNAL_TABLE_FILE_TNAME, + table_id)); + for (int64_t i = 0; OB_SUCC(ret) && i < updated_part_ids.count(); i++) { + OZ(update_sql.append_fmt("%ld%c", updated_part_ids.at(i), ((updated_part_ids.count() - 1) == i) ? ' ' : ',')); + } + OZ(update_sql.append(")")); + OZ(trans.write(tenant_id, update_sql.ptr(), update_rows)); + } + return ret; +} + int ObExternalTableFileManager::get_file_sizes_by_map(ObIArray &file_urls, ObIArray &file_sizes, common::hash::ObHashMap &map) @@ -1035,15 +1138,10 @@ int ObExternalTableFileManager::update_inner_table_files_list_by_part( int64_t insert_rows = 0; int64_t max_file_id = 0;// ObCSVTableRowIterator::MIN_EXTERNAL_TABLE_FILE_ID - 1 common::hash::ObHashMap hash_map; - share::schema::ObSchemaGetterGuard schema_guard; - const ObTableSchema *table_schema = NULL; - bool is_odps_external_table = false; char file_url_buf[256] = { 0 }; - OZ (GCTX.schema_service_->get_tenant_schema_guard(tenant_id, schema_guard)); - OZ (schema_guard.get_table_schema(tenant_id, table_id, table_schema)); - CK (OB_NOT_NULL(table_schema)); - if (OB_SUCC(ret)) { - is_odps_external_table = table_schema->is_odps_external_table(); + bool is_odps_external_table = false; + if (OB_FAIL(ObSQLUtils::is_odps_external_table(tenant_id, table_id, is_odps_external_table))) { + LOG_WARN("failed to check is odps external table or not", K(ret), K(tenant_id), K(table_id)); } OZ(get_all_records_from_inner_table(allocator, tenant_id, table_id, partition_id, old_file_infos, old_file_ids)); OZ(hash_map.create(std::max(file_infos.count(), old_file_infos.count()) + 1, "ExternalFile")); @@ -1607,7 +1705,7 @@ int ObExternalTableFileManager::create_auto_refresh_job(ObExecContext &ctx, cons } -OB_SERIALIZE_MEMBER(ObExternalFileInfo, file_url_, file_id_, file_addr_, file_size_, part_id_); +OB_SERIALIZE_MEMBER(ObExternalFileInfo, file_url_, file_id_, file_addr_, file_size_, part_id_, row_start_, row_count_); } } diff --git a/src/share/external_table/ob_external_table_file_mgr.h b/src/share/external_table/ob_external_table_file_mgr.h index 0d310a75f..a91e46ede 100644 --- a/src/share/external_table/ob_external_table_file_mgr.h +++ b/src/share/external_table/ob_external_table_file_mgr.h @@ -32,13 +32,16 @@ class ObAlterTableStmt; namespace share { struct ObExternalFileInfo { - ObExternalFileInfo() : file_id_(INT64_MAX), part_id_(0), file_size_(0) {} + ObExternalFileInfo() : file_id_(INT64_MAX), part_id_(0), file_size_(0), row_start_(0), row_count_(0) {} common::ObString file_url_; int64_t file_id_; int64_t part_id_; common::ObAddr file_addr_; int64_t file_size_; - TO_STRING_KV(K_(file_url), K_(file_id), K_(part_id), K_(file_addr), K_(file_size)); + int64_t row_start_; + int64_t row_count_; + int deep_copy(ObIAllocator &allocator, const ObExternalFileInfo &other); + TO_STRING_KV(K_(file_url), K_(file_id), K_(part_id), K_(file_addr), K_(file_size), K_(row_start), K_(row_count)); OB_UNIS_VERSION(1); }; @@ -151,7 +154,8 @@ public: common::ObIArray &file_sizes, common::ObIArray &updated_part_ids, bool &has_partition_changed, - const uint64_t part_id = -1); + const uint64_t part_id = -1, + bool collect_statistic = true); int get_all_records_from_inner_table(ObIAllocator &allocator, int64_t tenant_id, @@ -197,6 +201,11 @@ public: int auto_refresh_external_table(ObExecContext &exec_ctx, const int64_t interval); private: + int collect_odps_table_statistics(const bool collect_statistic, + const uint64_t tenant_id, + const uint64_t table_id, + ObIArray &updated_part_ids, + ObMySQLTransaction &trans); int delete_auto_refresh_job(ObExecContext &exec_ctx, ObMySQLTransaction &trans); int create_auto_refresh_job(ObExecContext &ctx, const int64_t interval, ObMySQLTransaction &trans); int update_inner_table_files_list_by_part( diff --git a/src/share/external_table/ob_external_table_utils.cpp b/src/share/external_table/ob_external_table_utils.cpp index f2b147354..ce12d5dba 100644 --- a/src/share/external_table/ob_external_table_utils.cpp +++ b/src/share/external_table/ob_external_table_utils.cpp @@ -26,6 +26,7 @@ #include "deps/oblib/src/lib/net/ob_addr.h" #include "share/external_table/ob_external_table_file_rpc_processor.h" #include "share/external_table/ob_external_table_file_rpc_proxy.h" +#include "sql/executor/ob_task_spliter.h" namespace oceanbase { @@ -302,6 +303,7 @@ int ObExternalTableUtils::make_external_table_scan_range(const common::ObString int ObExternalTableUtils::prepare_single_scan_range(const uint64_t tenant_id, const uint64_t table_id, + const ObString &table_format_or_properties, ObIArray &partition_ids, ObIArray &ranges, ObIAllocator &range_allocator, @@ -311,6 +313,8 @@ int ObExternalTableUtils::prepare_single_scan_range(const uint64_t tenant_id, ObSEArray file_urls; ObSEArray tmp_ranges; ObSEArray all_locations; + ObExternalFileFormat::FormatType external_table_type; + bool is_odps_external_table = false; if (OB_ISNULL(GCTX.location_service_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected error", K(ret)); @@ -330,7 +334,10 @@ int ObExternalTableUtils::prepare_single_scan_range(const uint64_t tenant_id, } else { new_range.reset(); } - if (!file_urls.empty() && file_urls.at(0).file_id_ == 0) {// if file_id_ == 0 means, it's odps table + if (OB_FAIL(ret)) { + } else if (OB_FAIL(ObSQLUtils::is_odps_external_table(table_format_or_properties, is_odps_external_table))) { + LOG_WARN("failed to check is odps external table or not", K(ret), K(table_format_or_properties)); + } else if (!file_urls.empty() && is_odps_external_table) { for (int64_t i = 0; OB_SUCC(ret) && i < file_urls.count(); ++i) { const ObExternalFileInfo& external_info = file_urls.at(i); ObNewRange *range = NULL; @@ -344,6 +351,7 @@ int ObExternalTableUtils::prepare_single_scan_range(const uint64_t tenant_id, external_info.file_id_, external_info.part_id_, 0, + //external_info.file_size_, INT64_MAX, range_allocator, *range))) { @@ -428,6 +436,217 @@ int ObExternalPathFilter::init(const ObString &pattern, return ret; } +int ObExternalTableUtils::assign_odps_file_to_sqcs( + ObDfo &dfo, + ObIArray &sqcs, + int64_t parallel) +{ + int ret = OB_SUCCESS; +#ifdef OB_BUILD_CPP_ODPS + const common::ObIArray &files = dfo.get_external_table_files(); + int64_t sqc_count = sqcs.count(); + int64_t sqc_idx = 0; + bool use_partition_gi = false; + ObSEArray scan_ops; + const ObTableScanSpec *scan_op = nullptr; + const ObOpSpec *root_op = NULL; + dfo.get_root(root_op); + if (OB_ISNULL(root_op)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null ptr", K(ret)); + } else if (OB_FAIL(ObTaskSpliter::find_scan_ops(scan_ops, *root_op))) { + LOG_WARN("failed to find scan_ops", K(ret), KP(root_op)); + } else if (scan_ops.count() == 0) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("empty scan_ops", K(ret)); + } else if (OB_FAIL(ObOdpsPartitionDownloaderMgr::fetch_row_count(MTL_ID(), + files, + scan_ops.at(0)->tsc_ctdef_.scan_ctdef_.external_file_format_str_.str_, + use_partition_gi))) { + LOG_WARN("failed to fetch row count", K(ret)); + } else if (use_partition_gi) { + sqc_idx = 0; + for (int64_t i = 0; OB_SUCC(ret) && i < files.count(); ++i) { + OZ (sqcs.at(sqc_idx++ % sqc_count)->get_access_external_table_files().push_back(files.at(i))); + } + } else { + ObSEArray temp_block_files; + ObSEArray sqc_row_counts; + int64_t big_file_row_count = 0; + int64_t small_file_count = 0; + const int64_t SMALL_FILE_SIZE = 100000; + const double DROP_FACTOR = 0.05; + for (int64_t i = 0; OB_SUCC(ret) && i < sqcs.count(); ++i) { + if (OB_FAIL(sqc_row_counts.push_back(0))) { + LOG_WARN("failed to init sqc_row_counts", K(i), K(ret)); + } + } + for (int64_t i = 0; OB_SUCC(ret) && i < files.count(); ++i) { + if (files.at(i).file_size_ > SMALL_FILE_SIZE) { + big_file_row_count += files.at(i).file_size_; + } else { + ++small_file_count; + } + } + int64_t avg_row_count_to_sqc = big_file_row_count / sqc_count; + int64_t expected_row_count_to_sqc = 0; + int64_t row_tail = big_file_row_count % sqc_count; + int64_t file_idx = 0; + int64_t file_start = 0; + int64_t row_count_assigned_to_sqc = 0; + int64_t remaining_row_count_in_file = 0; + int64_t row_count_needed_to_sqc = 0; + int64_t droped_row_count_on_last_loop = 0; + sqc_idx = 0; + LOG_TRACE("before split odps file", K(ret), K(small_file_count), K(big_file_row_count), K(sqc_count), K(row_tail), K(avg_row_count_to_sqc), K(files)); + while (OB_SUCC(ret) && big_file_row_count && sqc_idx < sqc_count) { + expected_row_count_to_sqc = avg_row_count_to_sqc + droped_row_count_on_last_loop; + row_count_assigned_to_sqc = 0; + droped_row_count_on_last_loop = 0; + if (sqc_idx == sqc_count - 1) { + expected_row_count_to_sqc += row_tail; + } + while (OB_SUCC(ret) && row_count_assigned_to_sqc != expected_row_count_to_sqc) { + while (file_idx < files.count() && files.at(file_idx).file_size_ < SMALL_FILE_SIZE) { ++file_idx; } + if (file_idx >= files.count()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("big odps files is iter end", K(sqc_idx), K(sqc_count), K(row_count_assigned_to_sqc), K(expected_row_count_to_sqc), K(avg_row_count_to_sqc), K(row_tail), K(files.count()), K(file_idx)); + } else { + remaining_row_count_in_file = files.at(file_idx).file_size_ - file_start; + row_count_needed_to_sqc = expected_row_count_to_sqc - row_count_assigned_to_sqc; + if (remaining_row_count_in_file >= row_count_needed_to_sqc) { + ObExternalFileInfo splited_file_info = files.at(file_idx); + splited_file_info.row_start_ = file_start; + splited_file_info.row_count_ = row_count_needed_to_sqc; + row_count_assigned_to_sqc += row_count_needed_to_sqc; + file_start += row_count_needed_to_sqc; + OZ (sqcs.at(sqc_idx)->get_access_external_table_files().push_back(splited_file_info)); + } else if (remaining_row_count_in_file > 0) { + ObExternalFileInfo splited_file_info = files.at(file_idx); + splited_file_info.row_start_ = file_start; + splited_file_info.row_count_ = remaining_row_count_in_file; + row_count_assigned_to_sqc += remaining_row_count_in_file; + file_start += remaining_row_count_in_file; + OZ (sqcs.at(sqc_idx)->get_access_external_table_files().push_back(splited_file_info)); + } else { //remaining_row_count_in_file == 0 + ++file_idx; + file_start = 0; + if (expected_row_count_to_sqc - row_count_assigned_to_sqc <= avg_row_count_to_sqc * DROP_FACTOR) { + droped_row_count_on_last_loop = expected_row_count_to_sqc - row_count_assigned_to_sqc; + expected_row_count_to_sqc = row_count_assigned_to_sqc; + } + } + } + } + sqc_row_counts.at(sqc_idx) = row_count_assigned_to_sqc; + sqc_idx++; + LOG_TRACE("assigned big odps file to sqc", K(ret), K(sqc_row_counts), K(row_count_assigned_to_sqc), K(expected_row_count_to_sqc), K(avg_row_count_to_sqc), K(file_idx), K(sqc_idx), K(remaining_row_count_in_file)); + } + const int64_t block_cnt = parallel / sqcs.count() * 3; + LOG_TRACE("split big odps file to sqc, going to split files to block in sqc", K(ret), K(sqc_row_counts), K(sqcs.count()), K(block_cnt), K(parallel), K(small_file_count)); + for (int64_t i = 0; OB_SUCC(ret) && i < sqcs.count(); ++i) { + ObIArray &sqc_files = sqcs.at(i)->get_access_external_table_files(); + if (sqc_files.empty() || sqc_row_counts.at(i) <= 0) { + continue; + } + temp_block_files.reuse(); + int64_t actual_block_cnt = block_cnt; + if (i < small_file_count % sqcs.count()) { + --actual_block_cnt; + } + LOG_WARN("split big odps file twice", K(i), K(i < small_file_count % sqcs.count()), K(actual_block_cnt), K(sqc_files), K(ret)); + int64_t avg_row_count_to_block = sqc_row_counts.at(i) / actual_block_cnt; + int64_t expected_row_count_to_block = 0; + int64_t block_row_tail = sqc_row_counts.at(i) % actual_block_cnt; + int64_t block_idx = 0; + int64_t row_count_assigned_to_block = 0; + file_idx = 0; + file_start = sqc_row_counts.at(i) ? sqc_files.at(file_idx).row_start_ : 0; // sqc_row_counts.at(i) means sqc_files is not empty + droped_row_count_on_last_loop = 0; + while (OB_SUCC(ret) && sqc_row_counts.at(i) && block_idx < actual_block_cnt) { + expected_row_count_to_block = avg_row_count_to_block + droped_row_count_on_last_loop; + row_count_assigned_to_block = 0; + droped_row_count_on_last_loop = 0; + if (block_idx == actual_block_cnt - 1) { + expected_row_count_to_block += block_row_tail; + } + while (OB_SUCC(ret) && row_count_assigned_to_block != expected_row_count_to_block) { + if (file_idx >= sqc_files.count()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("files in sqc iter end", K(i), K(block_idx), K(actual_block_cnt), K(row_count_assigned_to_block), K(expected_row_count_to_block), K(avg_row_count_to_block), K(block_row_tail), K(file_start), K(file_idx), K(sqc_files)); + } else { + int64_t remaining_row_count_in_file = sqc_files.at(file_idx).row_count_ - (file_start - sqc_files.at(file_idx).row_start_); + int64_t row_count_needed_to_block = expected_row_count_to_block - row_count_assigned_to_block; + if (remaining_row_count_in_file >= row_count_needed_to_block) { + ObExternalFileInfo splited_file_info = sqc_files.at(file_idx); + splited_file_info.row_start_ = file_start; + splited_file_info.row_count_ = row_count_needed_to_block; + row_count_assigned_to_block += row_count_needed_to_block; + file_start += row_count_needed_to_block; + OZ (temp_block_files.push_back(splited_file_info)); + } else if (remaining_row_count_in_file > 0) { + ObExternalFileInfo splited_file_info = sqc_files.at(file_idx); + splited_file_info.row_start_ = file_start; + splited_file_info.row_count_ = remaining_row_count_in_file; + row_count_assigned_to_block += remaining_row_count_in_file; + file_start += remaining_row_count_in_file; + OZ (temp_block_files.push_back(splited_file_info)); + } else { //remaining_row_count_in_file == 0 + ++file_idx; + file_start = file_idx >= sqc_files.count() ? 0 : sqc_files.at(file_idx).row_start_; + if (expected_row_count_to_block - row_count_assigned_to_block <= avg_row_count_to_block * DROP_FACTOR) { + droped_row_count_on_last_loop = expected_row_count_to_block - row_count_assigned_to_block; + expected_row_count_to_block = row_count_assigned_to_block; + } + } + } + } + block_idx++; + } + sqc_files.reuse(); + int64_t part_id = OB_INVALID_ID; + for (int64_t j = 0; OB_SUCC(ret) && j < temp_block_files.count(); ++j) { + if (part_id != temp_block_files.at(j).part_id_) {// to find first distinct file + OZ(sqc_files.push_back(temp_block_files.at(j))); + part_id = temp_block_files.at(j).part_id_; + } + } + part_id = OB_INVALID_ID; + for (int64_t j = 0; OB_SUCC(ret) && j < temp_block_files.count(); ++j) { + if (part_id != temp_block_files.at(j).part_id_) { + part_id = temp_block_files.at(j).part_id_; + } else { + OZ(sqc_files.push_back(temp_block_files.at(j))); + } + } + + } + sqc_idx = 0; + for (int64_t i = 0; OB_SUCC(ret) && i < files.count(); ++i) { + if (files.at(i).file_size_ <= SMALL_FILE_SIZE) { + OZ (sqcs.at(sqc_idx++ % sqc_count)->get_access_external_table_files().push_back(files.at(i))); + } + } + } + if (OB_SUCC(ret)) { + ObExternalFileInfo dummy_file; + const char* dummy_file_name = "#######DUMMY_FILE#######"; + dummy_file.file_url_ = dummy_file_name; + for (int64_t i = 0; OB_SUCC(ret) && i < sqcs.count(); ++i) { + if (sqcs.at(i)->get_access_external_table_files().empty()) { + OZ(sqcs.at(i)->get_access_external_table_files().push_back(dummy_file)); + } + } + } +#else + int64_t sqc_idx = 0; + ret = OB_NOT_SUPPORTED; + LOG_WARN("not supported featue", K(ret)); + LOG_USER_ERROR(OB_NOT_SUPPORTED, "odps external table"); +#endif + return ret; +} + int ObExternalTableUtils::calc_assigned_files_to_sqcs( const ObIArray &files, ObIArray &assigned_idx, diff --git a/src/share/external_table/ob_external_table_utils.h b/src/share/external_table/ob_external_table_utils.h index e80f17d7c..76e93a40e 100644 --- a/src/share/external_table/ob_external_table_utils.h +++ b/src/share/external_table/ob_external_table_utils.h @@ -17,6 +17,7 @@ #include "lib/string/ob_string.h" #include "lib/allocator/page_arena.h" #include "src/share/schema/ob_column_schema.h" +#include "sql/engine/px/ob_dfo.h" namespace oceanbase { @@ -96,6 +97,7 @@ class ObExternalTableUtils { static int prepare_single_scan_range(const uint64_t tenant_id, const uint64_t table_id, + const ObString &table_format_or_properties, ObIArray &partition_ids, common::ObIArray &ranges, common::ObIAllocator &range_allocator, @@ -107,6 +109,11 @@ class ObExternalTableUtils { common::ObIArray &assigned_idx, int64_t sqc_count); + static int assign_odps_file_to_sqcs( + ObDfo &dfo, + ObIArray &sqcs, + int64_t parallel); + static int filter_files_in_locations(common::ObIArray &files, common::ObIArray &locations); diff --git a/src/share/parameter/ob_parameter_seed.ipp b/src/share/parameter/ob_parameter_seed.ipp index 5fec4d3d8..7bfa06398 100644 --- a/src/share/parameter/ob_parameter_seed.ipp +++ b/src/share/parameter/ob_parameter_seed.ipp @@ -2310,6 +2310,14 @@ DEF_STR_WITH_CHECKER(ob_storage_s3_url_encode_type, OB_CLUSTER_PARAMETER, "defau "\"compliantRfc3986Encoding\": Uses URL encoding that adheres to the RFC 3986 standard.", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); +DEF_INT(_dop_of_collect_external_table_statistics, OB_TENANT_PARAMETER, "0", "[0,)", + "parallelism of pull statistics of external table", + ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); + +DEF_INT(_max_partition_count_to_collect_statistic, OB_TENANT_PARAMETER, "5", "[0,)", + "force odps external table to using block granule iterator when count of partition is under this config", + ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); + DEF_INT(ob_encoding_granularity, OB_TENANT_PARAMETER, "65536", "[8192, 1048576]", "Maximum rows for encoding in one micro block. Range:[8192,1048576]", ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); diff --git a/src/share/schema/ob_schema_printer.cpp b/src/share/schema/ob_schema_printer.cpp index 1d78a8b3f..9d5771a99 100644 --- a/src/share/schema/ob_schema_printer.cpp +++ b/src/share/schema/ob_schema_printer.cpp @@ -5687,12 +5687,12 @@ int ObSchemaPrinter::print_external_table_file_info(const ObTableSchema &table_s const ObString &format_string = table_schema.get_external_file_format(); const ObString &properties_string = table_schema.get_external_properties(); const bool user_specified = table_schema.is_user_specified_partition_for_external_table(); - bool is_odps_table = false; - if (OB_FAIL(ObSQLUtils::is_external_odps_table(properties_string, allocator, is_odps_table))) { - LOG_WARN("failed check is odps table or not", K(ret)); - } else if (!is_odps_table && OB_FAIL(databuff_printf(buf, buf_len, pos, "\nLOCATION='%.*s'", location.length(), location.ptr()))) { + bool is_odps_external_table = false; + if (OB_FAIL(ObSQLUtils::is_odps_external_table(&table_schema, is_odps_external_table))) { + LOG_WARN("failed to check is odps table or not", K(ret)); + } else if (!is_odps_external_table && OB_FAIL(databuff_printf(buf, buf_len, pos, "\nLOCATION='%.*s'", location.length(), location.ptr()))) { SHARE_SCHEMA_LOG(WARN, "fail to print LOCATION", K(ret)); - } else if (!is_odps_table && !pattern.empty() && OB_FAIL(databuff_printf(buf, buf_len, pos, "\nPATTERN='%.*s'", pattern.length(), pattern.ptr()))) { + } else if (!is_odps_external_table && !pattern.empty() && OB_FAIL(databuff_printf(buf, buf_len, pos, "\nPATTERN='%.*s'", pattern.length(), pattern.ptr()))) { SHARE_SCHEMA_LOG(WARN, "fail to print PATTERN", K(ret)); } else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\nAUTO_REFRESH = %s", table_schema.get_external_table_auto_refresh() == 0 ? "OFF" : table_schema.get_external_table_auto_refresh() == 1 ? "IMMEDIATE" : "INTERVAL"))) { @@ -5706,19 +5706,19 @@ int ObSchemaPrinter::print_external_table_file_info(const ObTableSchema &table_s // 2. print file format if (OB_SUCC(ret)) { ObExternalFileFormat format; - const ObString &format_or_properties = is_odps_table ? properties_string : format_string; - if (format_or_properties.empty()) { + const ObString &table_format_or_properties = is_odps_external_table ? properties_string : format_string; + if (table_format_or_properties.empty()) { ret = OB_ERR_UNEXPECTED; - SHARE_SCHEMA_LOG(WARN, "format_or_properties is empty", K(ret)); - } else if (OB_FAIL(format.load_from_string(format_or_properties, allocator))) { + SHARE_SCHEMA_LOG(WARN, "table_format_or_properties is empty", K(ret)); + } else if (OB_FAIL(format.load_from_string(table_format_or_properties, allocator))) { SHARE_SCHEMA_LOG(WARN, "fail to load from json string", K(ret)); } else if (!(format.format_type_ > ObExternalFileFormat::INVALID_FORMAT && format.format_type_ < ObExternalFileFormat::MAX_FORMAT)) { ret = OB_NOT_SUPPORTED; SHARE_SCHEMA_LOG(WARN, "unsupported to print file format", K(ret), K(format.format_type_)); - } else if (!is_odps_table && OB_FAIL(databuff_printf(buf, buf_len, pos, "\nFORMAT (\n"))) { + } else if (!is_odps_external_table && OB_FAIL(databuff_printf(buf, buf_len, pos, "\nFORMAT (\n"))) { SHARE_SCHEMA_LOG(WARN, "fail to print FORMAT (", K(ret)); - } else if (is_odps_table && OB_FAIL(databuff_printf(buf, buf_len, pos, "\nPROPERTIES (\n"))) { + } else if (is_odps_external_table && OB_FAIL(databuff_printf(buf, buf_len, pos, "\nPROPERTIES (\n"))) { SHARE_SCHEMA_LOG(WARN, "fail to print FORMAT (", K(ret)); } else if (OB_FAIL(databuff_printf(buf, buf_len, pos, " TYPE = '%s',", ObExternalFileFormat::FORMAT_TYPE_STR[format.format_type_]))) { SHARE_SCHEMA_LOG(WARN, "fail to print TYPE", K(ret)); @@ -5764,25 +5764,34 @@ int ObSchemaPrinter::print_external_table_file_info(const ObTableSchema &table_s } else if (OB_SUCC(ret) && ObExternalFileFormat::ODPS_FORMAT == format.format_type_) { const ObODPSGeneralFormat &odps = format.odps_format_; ObString scret_str("********"); - if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[0], odps.access_type_.length(), odps.access_type_.ptr()))) { + int64_t option_names_idx = 0; + if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[option_names_idx++], odps.access_type_.length(), odps.access_type_.ptr()))) { SHARE_SCHEMA_LOG(WARN, "fail to print ODPS_INFO", K(ret)); - } else if (!odps.access_id_.empty() && OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[1], scret_str.length(), scret_str.ptr()))) { + } else if (!odps.access_id_.empty() && OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[option_names_idx++], scret_str.length(), scret_str.ptr()))) { SHARE_SCHEMA_LOG(WARN, "fail to print ODPS_INFO", K(ret)); - } else if (!odps.access_key_.empty() && OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[2], scret_str.length(), scret_str.ptr()))) { + } else if (!odps.access_key_.empty() && OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[option_names_idx++], scret_str.length(), scret_str.ptr()))) { SHARE_SCHEMA_LOG(WARN, "fail to print ODPS_INFO", K(ret)); - } else if (!odps.sts_token_.empty() && OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[3], scret_str.length(), scret_str.ptr()))) { + } else if (!odps.sts_token_.empty() && OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[option_names_idx++], scret_str.length(), scret_str.ptr()))) { SHARE_SCHEMA_LOG(WARN, "fail to print ODPS_INFO", K(ret)); - } else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[4], odps.endpoint_.length(), odps.endpoint_.ptr()))) { + } else if (odps.sts_token_.empty()) { + option_names_idx++; + } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[option_names_idx++], odps.endpoint_.length(), odps.endpoint_.ptr()))) { SHARE_SCHEMA_LOG(WARN, "fail to print ODPS_INFO", K(ret)); - } else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[5], odps.project_.length(), odps.project_.ptr()))) { + } else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[option_names_idx++], odps.tunnel_endpoint_.length(), odps.tunnel_endpoint_.ptr()))) { SHARE_SCHEMA_LOG(WARN, "fail to print ODPS_INFO", K(ret)); - } else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[6], odps.schema_.length(), odps.schema_.ptr()))) { + } else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[option_names_idx++], odps.project_.length(), odps.project_.ptr()))) { SHARE_SCHEMA_LOG(WARN, "fail to print ODPS_INFO", K(ret)); - } else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[7], odps.table_.length(), odps.table_.ptr()))) { + } else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[option_names_idx++], odps.schema_.length(), odps.schema_.ptr()))) { SHARE_SCHEMA_LOG(WARN, "fail to print ODPS_INFO", K(ret)); - } else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[8], odps.quota_.length(), odps.quota_.ptr()))) { + } else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[option_names_idx++], odps.table_.length(), odps.table_.ptr()))) { SHARE_SCHEMA_LOG(WARN, "fail to print ODPS_INFO", K(ret)); - } else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[9], odps.compression_code_.length(), odps.compression_code_.ptr()))) { + } else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[option_names_idx++], odps.quota_.length(), odps.quota_.ptr()))) { + SHARE_SCHEMA_LOG(WARN, "fail to print ODPS_INFO", K(ret)); + } else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = '%.*s',", ObODPSGeneralFormat::OPTION_NAMES[option_names_idx++], odps.compression_code_.length(), odps.compression_code_.ptr()))) { + SHARE_SCHEMA_LOG(WARN, "fail to print ODPS_INFO", K(ret)); + } else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n %s = %s,", ObODPSGeneralFormat::OPTION_NAMES[option_names_idx++], odps.collect_statistics_on_create_ ? "TRUE" : "FALSE"))) { SHARE_SCHEMA_LOG(WARN, "fail to print ODPS_INFO", K(ret)); } } diff --git a/src/share/schema/ob_table_schema.h b/src/share/schema/ob_table_schema.h index 5d6a41df8..3668048ff 100644 --- a/src/share/schema/ob_table_schema.h +++ b/src/share/schema/ob_table_schema.h @@ -1520,7 +1520,6 @@ public: inline ObNameGeneratedType get_name_generated_type() const { return name_generated_type_; } bool is_sys_generated_name(bool check_unknown) const; inline bool is_user_specified_partition_for_external_table() const { return (table_flags_ & EXTERNAL_TABLE_USER_SPECIFIED_PARTITION_FLAG) != 0; } - inline bool is_odps_external_table() const { return !external_properties_.empty(); } inline bool is_index_visible() const { return 0 == (index_attributes_set_ & ((uint64_t)(1) << INDEX_VISIBILITY)); diff --git a/src/sql/CMakeLists.txt b/src/sql/CMakeLists.txt index 304aa3a38..307eb3d75 100644 --- a/src/sql/CMakeLists.txt +++ b/src/sql/CMakeLists.txt @@ -815,6 +815,7 @@ ob_set_subtarget(ob_sql engine_expr engine/expr/ob_expr_func_ceil.cpp engine/expr/ob_expr_eval_functions.cpp engine/expr/ob_expr_in.cpp + engine/expr/ob_expr_calc_odps_size.cpp ) ob_set_subtarget(ob_sql ALONE diff --git a/src/sql/code_generator/ob_tsc_cg_service.cpp b/src/sql/code_generator/ob_tsc_cg_service.cpp index 057ffea61..03fe9d503 100644 --- a/src/sql/code_generator/ob_tsc_cg_service.cpp +++ b/src/sql/code_generator/ob_tsc_cg_service.cpp @@ -69,13 +69,13 @@ int ObTscCgService::generate_tsc_ctdef(ObLogTableScan &op, ObTableScanCtDef &tsc LOG_WARN("fail to check location access priv", K(ret)); } else { scan_ctdef.is_external_table_ = true; - const ObString &format_or_properties = table_schema->get_external_file_format().empty() ? + const ObString &table_format_or_properties = table_schema->get_external_file_format().empty() ? table_schema->get_external_properties() : table_schema->get_external_file_format(); - if (format_or_properties.empty()) { + if (table_format_or_properties.empty()) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("format_or_properties is empty", K(ret)); - } else if (OB_FAIL(scan_ctdef.external_file_format_str_.store_str(format_or_properties))) { + LOG_WARN("table_format_or_properties is empty", K(ret)); + } else if (OB_FAIL(scan_ctdef.external_file_format_str_.store_str(table_format_or_properties))) { LOG_WARN("fail to set string", K(ret)); } else if (OB_FAIL(scan_ctdef.external_file_location_.store_str(table_schema->get_external_file_location()))) { LOG_WARN("fail to set string", K(ret)); diff --git a/src/sql/engine/cmd/ob_load_data_parser.cpp b/src/sql/engine/cmd/ob_load_data_parser.cpp index 3c92b8b58..18cb32516 100644 --- a/src/sql/engine/cmd/ob_load_data_parser.cpp +++ b/src/sql/engine/cmd/ob_load_data_parser.cpp @@ -56,6 +56,8 @@ int64_t ObODPSGeneralFormat::to_json_kv_string(char *buf, const int64_t buf_len) J_COMMA(); databuff_printf(buf, buf_len, pos, "\"%s\":\"%s\"", OPTION_NAMES[idx++], to_cstring(ObHexStringWrap(endpoint_))); J_COMMA(); + databuff_printf(buf, buf_len, pos, "\"%s\":\"%s\"", OPTION_NAMES[idx++], to_cstring(ObHexStringWrap(tunnel_endpoint_))); + J_COMMA(); databuff_printf(buf, buf_len, pos, "\"%s\":\"%s\"", OPTION_NAMES[idx++], to_cstring(ObHexStringWrap(project_))); J_COMMA(); databuff_printf(buf, buf_len, pos, "\"%s\":\"%s\"", OPTION_NAMES[idx++], to_cstring(ObHexStringWrap(schema_))); @@ -65,6 +67,8 @@ int64_t ObODPSGeneralFormat::to_json_kv_string(char *buf, const int64_t buf_len) databuff_printf(buf, buf_len, pos, "\"%s\":\"%s\"", OPTION_NAMES[idx++], to_cstring(ObHexStringWrap(quota_))); J_COMMA(); databuff_printf(buf, buf_len, pos, "\"%s\":\"%s\"", OPTION_NAMES[idx++], to_cstring(ObHexStringWrap(compression_code_))); + J_COMMA(); + databuff_printf(buf, buf_len, pos, "\"%s\":%s", OPTION_NAMES[idx++], STR_BOOL(collect_statistics_on_create_)); return pos; } @@ -221,6 +225,8 @@ int ObODPSGeneralFormat::deep_copy(const ObODPSGeneralFormat &src) { LOG_WARN("failed to deep copy", K(ret)); } else if (OB_FAIL(deep_copy_str(src.endpoint_, endpoint_))) { LOG_WARN("failed to deep copy", K(ret)); + } else if (OB_FAIL(deep_copy_str(src.tunnel_endpoint_, tunnel_endpoint_))) { + LOG_WARN("failed to deep copy", K(ret)); } else if (OB_FAIL(deep_copy_str(src.project_, project_))) { LOG_WARN("failed to deep copy", K(ret)); } else if (OB_FAIL(deep_copy_str(src.schema_, schema_))) { @@ -231,6 +237,8 @@ int ObODPSGeneralFormat::deep_copy(const ObODPSGeneralFormat &src) { LOG_WARN("failed to deep copy", K(ret)); } else if (OB_FAIL(deep_copy_str(src.compression_code_, compression_code_))) { LOG_WARN("failed to deep copy", K(ret)); + } else { + collect_statistics_on_create_ = src.collect_statistics_on_create_; } return ret; } @@ -284,6 +292,15 @@ int ObODPSGeneralFormat::load_from_json_data(json::Pair *&node, ObIAllocator &al } node = node->get_next(); } + if (OB_NOT_NULL(node) && 0 == node->name_.case_compare(OPTION_NAMES[idx++]) + && json::JT_STRING == node->value_->get_type()) { + ObObj obj; + OZ (ObHexUtilsBase::unhex(node->value_->get_string(), allocator, obj)); + if (OB_SUCC(ret) && !obj.is_null()) { + tunnel_endpoint_ = obj.get_string(); + } + node = node->get_next(); + } if (OB_NOT_NULL(node) && 0 == node->name_.case_compare(OPTION_NAMES[idx++]) && json::JT_STRING == node->value_->get_type()) { ObObj obj; @@ -329,6 +346,14 @@ int ObODPSGeneralFormat::load_from_json_data(json::Pair *&node, ObIAllocator &al } node = node->get_next(); } + if (OB_NOT_NULL(node) && 0 == node->name_.case_compare(OPTION_NAMES[idx++])) { + if (json::JT_TRUE == node->value_->get_type()) { + collect_statistics_on_create_ = true; + } else { + collect_statistics_on_create_ = false; + } + node = node->get_next(); + } return ret; } diff --git a/src/sql/engine/cmd/ob_load_data_parser.h b/src/sql/engine/cmd/ob_load_data_parser.h index 578650dcc..3edbd5081 100644 --- a/src/sql/engine/cmd/ob_load_data_parser.h +++ b/src/sql/engine/cmd/ob_load_data_parser.h @@ -37,11 +37,13 @@ struct ObODPSGeneralFormat { access_key_(), sts_token_(), endpoint_(), + tunnel_endpoint_(), project_(), schema_(), table_(), quota_(), - compression_code_() + compression_code_(), + collect_statistics_on_create_(false) {} int deep_copy_str(const ObString &src, ObString &dest); @@ -56,26 +58,30 @@ struct ObODPSGeneralFormat { "ACCESSKEY", "STSTOKEN", "ENDPOINT", + "TUNNEL_ENDPOINT", "PROJECT_NAME", "SCHEMA_NAME", "TABLE_NAME", "QUOTA_NAME", "COMPRESSION_CODE", + "COLLECT_STATISTICS_ON_CREATE", }; common::ObString access_type_; common::ObString access_id_; common::ObString access_key_; common::ObString sts_token_; common::ObString endpoint_; + common::ObString tunnel_endpoint_; common::ObString project_; common::ObString schema_; common::ObString table_; common::ObString quota_; common::ObString compression_code_; + bool collect_statistics_on_create_; common::ObArenaAllocator arena_alloc_; int64_t to_json_kv_string(char* buf, const int64_t buf_len) const; int load_from_json_data(json::Pair *&node, common::ObIAllocator &allocator); - TO_STRING_KV(K_(access_type), K_(access_id), K_(access_key), K_(sts_token), K_(endpoint), K_(project), K_(schema), K_(table), K_(quota), K_(compression_code)); + TO_STRING_KV(K_(access_type), K_(access_id), K_(access_key), K_(sts_token), K_(endpoint), K_(tunnel_endpoint), K_(project), K_(schema), K_(table), K_(quota), K_(compression_code), K_(collect_statistics_on_create)); OB_UNIS_VERSION(1); }; diff --git a/src/sql/engine/cmd/ob_table_executor.cpp b/src/sql/engine/cmd/ob_table_executor.cpp index b546648b2..51b3a3dc6 100644 --- a/src/sql/engine/cmd/ob_table_executor.cpp +++ b/src/sql/engine/cmd/ob_table_executor.cpp @@ -703,7 +703,23 @@ int ObCreateTableExecutor::execute(ObExecContext &ctx, ObCreateTableStmt &stmt) //auto refresh after create external table ObArray updated_part_ids; //not used bool has_partition_changed = false; //not used - OZ (ObExternalTableFileManager::get_instance().update_inner_table_file_list(ctx, tenant_id, res.table_id_, file_urls, file_sizes, updated_part_ids, has_partition_changed)); + const uint64_t part_id = -1; + bool collect_statistics_on_create = false; + bool is_odps_external_table = false; + if (OB_FAIL(ObSQLUtils::is_odps_external_table(&table_schema, is_odps_external_table))) { + LOG_WARN("failed to check is odps external table or not", K(ret)); + } else if (is_odps_external_table) { + sql::ObExternalFileFormat ex_format; + ex_format.format_type_ = sql::ObExternalFileFormat::ODPS_FORMAT; + ObArenaAllocator allocator("CreateTableExec"); + if (OB_FAIL(ex_format.load_from_string(table_schema.get_external_properties(), allocator))) { + LOG_WARN("failed to load from string", K(ret)); + } else { + collect_statistics_on_create = ex_format.odps_format_.collect_statistics_on_create_; + } + } + OZ (ObExternalTableFileManager::get_instance().update_inner_table_file_list(ctx, tenant_id, res.table_id_, file_urls, file_sizes, updated_part_ids, has_partition_changed, + part_id, collect_statistics_on_create)); } } else { if (table_schema.is_external_table()) { diff --git a/src/sql/engine/expr/ob_expr_calc_odps_size.cpp b/src/sql/engine/expr/ob_expr_calc_odps_size.cpp new file mode 100644 index 000000000..8baf8a12d --- /dev/null +++ b/src/sql/engine/expr/ob_expr_calc_odps_size.cpp @@ -0,0 +1,115 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX SQL_ENG +#include "sql/engine/table/ob_odps_table_row_iter.h" +#include "sql/engine/expr/ob_expr_calc_odps_size.h" + +namespace oceanbase +{ +using namespace oceanbase::common; + +namespace sql +{ + +ObExprCalcOdpsSize::ObExprCalcOdpsSize(ObIAllocator &alloc) + : ObFuncExprOperator(alloc, T_FUN_SYS_CALC_ODPS_SIZE, N_CALC_ODPS_SIZE, 2, NOT_VALID_FOR_GENERATED_COL, NOT_ROW_DIMENSION) +{ +} + +int ObExprCalcOdpsSize::calc_result_type2(ObExprResType &type, + ObExprResType &type1, + ObExprResType &type2, + ObExprTypeCtx &type_ctx) const +{ + UNUSED(type_ctx); + int ret = OB_SUCCESS; + UNUSED(type2); + if (NOT_ROW_DIMENSION == row_dimension_) { + if (ObMaxType == type1.get_type()) { + ret = OB_ERR_INVALID_TYPE_FOR_OP; + } else { + type.set_int(); + type1.set_calc_type(ObVarcharType); + type2.set_calc_type(ObIntType); + type_ctx.set_cast_mode(type_ctx.get_cast_mode() | CM_WARN_ON_FAIL); + } + ObExprOperator::calc_result_flag2(type, type1, type2); + } else { + ret = OB_ERR_INVALID_TYPE_FOR_OP; // arithmetic not support row + } + return ret; +} + +int ObExprCalcOdpsSize::calc_odps_size(const ObExpr &expr, ObEvalCtx &ctx, ObDatum &res_datum) +{ + int ret = OB_SUCCESS; + ObDatum *file_url_datum = NULL; + ObDatum *table_id_datum = NULL; + int ret_file_url = expr.args_[0]->eval(ctx, file_url_datum); + int ret_table_id = expr.args_[1]->eval(ctx, table_id_datum); + if (OB_SUCCESS == ret_file_url && OB_SUCCESS == ret_table_id) { + if (file_url_datum->is_null() || table_id_datum->is_null()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null datum", K(file_url_datum->is_null()), K(table_id_datum->is_null()), K(ret)); + } else { + const ObString file_url = file_url_datum->get_string(); + const int64_t table_id = table_id_datum->get_int(); + const uint64_t tenant_id = ctx.exec_ctx_.get_my_session()->get_effective_tenant_id(); + ObSchemaGetterGuard *schema_guard = ctx.exec_ctx_.get_sql_ctx()->schema_guard_; + const ObTableSchema *table_schema = NULL; + int64_t row_count = 0; + if (OB_ISNULL(schema_guard)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null ptr", K(ret)); + } else if (OB_FAIL(schema_guard->get_table_schema(tenant_id, table_id, table_schema))) { + LOG_WARN("failed to get table schema", K(ret), K(tenant_id), K(table_id)); + } else if (OB_ISNULL(table_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null ptr", K(ret)); +#ifdef OB_BUILD_CPP_ODPS + } else if (OB_FAIL(ObOdpsPartitionDownloaderMgr::fetch_row_count(file_url, + table_schema->get_external_properties(), + row_count))) { + if (ret == OB_ODPS_ERROR) { + ret = OB_SUCCESS; + row_count = -1; + } else { + LOG_WARN("failed to fetch row count", K(ret), K(file_url), K(table_schema->get_external_properties())); + } +#else + } else { + row_count = -1; +#endif + } + if (OB_SUCC(ret)) { + res_datum.set_int(row_count); + } + } + } else { + ret = (OB_SUCCESS == ret_file_url) ? ret_table_id : ret_file_url; + } + return ret; +} + +int ObExprCalcOdpsSize::cg_expr(ObExprCGCtx &expr_cg_ctx, const ObRawExpr &raw_expr, + ObExpr &rt_expr) const +{ + int ret = OB_SUCCESS; + UNUSED(expr_cg_ctx); + CK (2 == raw_expr.get_param_count()); + OX (rt_expr.eval_func_ = calc_odps_size); + return ret; +} + +} // namespace sql +} // namespace oceanbase diff --git a/src/sql/engine/expr/ob_expr_calc_odps_size.h b/src/sql/engine/expr/ob_expr_calc_odps_size.h new file mode 100644 index 000000000..f457fa65b --- /dev/null +++ b/src/sql/engine/expr/ob_expr_calc_odps_size.h @@ -0,0 +1,41 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_SQL_ENGINE_EXPR_CALC_ODPS_SIZE_ +#define OCEANBASE_SQL_ENGINE_EXPR_CALC_ODPS_SIZE_ + +#include "sql/engine/expr/ob_expr_operator.h" + +namespace oceanbase +{ +namespace sql +{ +class ObExprCalcOdpsSize : public ObFuncExprOperator +{ +public: + explicit ObExprCalcOdpsSize(common::ObIAllocator &alloc); + virtual ~ObExprCalcOdpsSize() {}; + + virtual int calc_result_type2(ObExprResType &type, + ObExprResType &type1, + ObExprResType &type2, + common::ObExprTypeCtx &type_ctx) const; + static int calc_odps_size(const ObExpr &expr, ObEvalCtx &ctx, ObDatum &res_datum); + virtual int cg_expr(ObExprCGCtx &expr_cg_ctx, const ObRawExpr &raw_expr, + ObExpr &rt_expr) const override; +private: + DISALLOW_COPY_AND_ASSIGN(ObExprCalcOdpsSize) const; +}; +} // namespace sql +} // namespace oceanbase + +#endif // OCEANBASE_SQL_ENGINE_EXPR_CALC_ODPS_SIZE_ diff --git a/src/sql/engine/expr/ob_expr_eval_functions.cpp b/src/sql/engine/expr/ob_expr_eval_functions.cpp index 786d4e356..20b3d3db8 100644 --- a/src/sql/engine/expr/ob_expr_eval_functions.cpp +++ b/src/sql/engine/expr/ob_expr_eval_functions.cpp @@ -420,6 +420,7 @@ #include "ob_expr_array_distinct.h" #include "ob_expr_array_remove.h" #include "ob_expr_array_map.h" +#include "ob_expr_calc_odps_size.h" #include "ob_expr_get_mysql_routine_parameter_type_str.h" #include "ob_expr_priv_st_geohash.h" #include "ob_expr_priv_st_makepoint.h" @@ -1315,6 +1316,7 @@ static ObExpr::EvalFunc g_expr_eval_functions[] = { NULL, // ObExprCalcSubPartitionName::get_sub_partition_name, /* 788 */ NULL, // ObExprCalcPartitionIdx::get_partition_idx, /* 789 */ NULL, // ObExprCalcSubPartitionIdx::get_sub_partition_idx, /* 790 */ + ObExprCalcOdpsSize::calc_odps_size, /* 791 */ }; static ObExpr::EvalBatchFunc g_expr_eval_batch_functions[] = { diff --git a/src/sql/engine/expr/ob_expr_operator_factory.cpp b/src/sql/engine/expr/ob_expr_operator_factory.cpp index 8b8a1cbe8..e6ecf2812 100644 --- a/src/sql/engine/expr/ob_expr_operator_factory.cpp +++ b/src/sql/engine/expr/ob_expr_operator_factory.cpp @@ -484,6 +484,7 @@ #include "sql/engine/expr/ob_expr_array_distinct.h" #include "sql/engine/expr/ob_expr_array_remove.h" #include "sql/engine/expr/ob_expr_array_map.h" +#include "sql/engine/expr/ob_expr_calc_odps_size.h" #include "sql/engine/expr/ob_expr_get_mysql_routine_parameter_type_str.h" #include "sql/engine/expr/ob_expr_priv_st_geohash.h" #include "sql/engine/expr/ob_expr_priv_st_makepoint.h" @@ -1197,6 +1198,7 @@ void ObExprOperatorFactory::register_expr_operators() REG_OP(ObExprArrayRemove); REG_OP(ObExprArrayMap); REG_OP(ObExprGetMySQLRoutineParameterTypeStr); + REG_OP(ObExprCalcOdpsSize); }(); // 注册oracle系统函数 REG_OP_ORCL(ObExprSysConnectByPath); @@ -1529,6 +1531,7 @@ void ObExprOperatorFactory::register_expr_operators() REG_OP_ORCL(ObExprGetPath); REG_OP_ORCL(ObExprDecodeTraceId); REG_OP_ORCL(ObExprSplitPart); + REG_OP_ORCL(ObExprCalcOdpsSize); } bool ObExprOperatorFactory::is_expr_op_type_valid(ObExprOperatorType type) diff --git a/src/sql/engine/px/ob_dfo.cpp b/src/sql/engine/px/ob_dfo.cpp index a73e69f1f..88898e73b 100644 --- a/src/sql/engine/px/ob_dfo.cpp +++ b/src/sql/engine/px/ob_dfo.cpp @@ -194,12 +194,8 @@ int ObPxSqcMeta::assign(const ObPxSqcMeta &other) for (int i = 0; OB_SUCC(ret) && i < other.access_external_table_files_.count(); i++) { const ObExternalFileInfo &other_file = other.access_external_table_files_.at(i); ObExternalFileInfo temp_file; - temp_file.file_id_ = other_file.file_id_; - temp_file.part_id_ = other_file.part_id_; - temp_file.file_addr_ = other_file.file_addr_; - temp_file.file_size_ = other_file.file_size_; - if (OB_FAIL(ob_write_string(allocator_, other_file.file_url_, temp_file.file_url_))) { - LOG_WARN("fail to write string", K(ret)); + if (OB_FAIL(temp_file.deep_copy(allocator_, other_file))) { + LOG_WARN("fail to deep copy ObExternalFileInfo", K(ret)); } else if (OB_FAIL(access_external_table_files_.push_back(temp_file))) { LOG_WARN("fail to push back", K(ret)); } diff --git a/src/sql/engine/px/ob_granule_pump.cpp b/src/sql/engine/px/ob_granule_pump.cpp index 3967eb58b..b773958d3 100644 --- a/src/sql/engine/px/ob_granule_pump.cpp +++ b/src/sql/engine/px/ob_granule_pump.cpp @@ -638,14 +638,14 @@ int ObGranulePump::add_new_gi_task(ObGranulePumpArgs &args) // if (!(args.asc_order() || args.desc_order() || ObGITaskSet::GI_RANDOM_NONE != random_type)) { // random_type = ObGITaskSet::GI_RANDOM_TASK; // } - if (OB_FAIL(splitter.split_granule(args, + if (OB_FAIL(init_external_odps_table_downloader(args))) { + LOG_WARN("failed to init external odps table downloader", K(ret)); + } else if (OB_FAIL(splitter.split_granule(args, scan_ops, gi_task_array_map_, random_type, partition_granule))) { LOG_WARN("failed to prepare random gi task", K(ret), K(partition_granule)); - } else if (OB_FAIL(init_external_odps_table_downloader(args))) { - LOG_WARN("failed to init external odps table downloader", K(ret)); } } return ret; @@ -655,10 +655,16 @@ int ObGranulePump::init_external_odps_table_downloader(ObGranulePumpArgs &args) { int ret = OB_SUCCESS; const ObTableScanSpec *tsc = NULL; - sql::ObExternalFileFormat external_odps_format; - if (!args.external_table_files_.empty() && - 0 == args.external_table_files_.at(0).file_id_) { //file_id_ == 0 means it's a external odps table - ObIArray &scan_ops = args.op_info_.get_scan_ops(); + bool is_odps_external_table = false; + ObIArray &scan_ops = args.op_info_.get_scan_ops(); + if (scan_ops.count() == 0) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("empty scan_ops", K(ret)); + } else if (OB_FAIL(ObSQLUtils::is_odps_external_table(scan_ops.at(0)->tsc_ctdef_.scan_ctdef_.external_file_format_str_.str_, + is_odps_external_table))) { + LOG_WARN("failed to check is odps external table or not", K(ret)); + } else if (!args.external_table_files_.empty() && + is_odps_external_table) { if (scan_ops.empty() || scan_ops.count() != gi_task_array_map_.count()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid scan ops and gi task array result", K(ret), K(scan_ops.count()), K(gi_task_array_map_.count())); @@ -666,12 +672,11 @@ int ObGranulePump::init_external_odps_table_downloader(ObGranulePumpArgs &args) ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected null ptr", K(ret)); #ifdef OB_BUILD_CPP_ODPS - } else if (OB_FAIL(odps_partition_downloader_mgr_.init_downloader(args.external_table_files_, - tsc->tsc_ctdef_.scan_ctdef_.external_file_format_str_.str_))) { + } else if (OB_FAIL(odps_partition_downloader_mgr_.init_map(args.external_table_files_.count()))) { LOG_WARN("init odps_partition_downloader_mgr_ failed", K(ret), K(args.external_table_files_.count())); -#endif } else { - LOG_TRACE("succ to init odps table partition downloader", K(ret)); + LOG_TRACE("succ to init odps table partition downloader", K(ret), K(is_odps_downloader_inited())); +#endif } } return ret; @@ -721,10 +726,10 @@ int ObGranulePump::check_can_randomize(ObGranulePumpArgs &args, bool &can_random void ObGranulePump::destroy() { gi_task_array_map_.reset(); - pump_args_.reset(); #ifdef OB_BUILD_CPP_ODPS odps_partition_downloader_mgr_.reset(); #endif + pump_args_.reset(); } void ObGranulePump::reset_task_array() diff --git a/src/sql/engine/px/ob_granule_pump.h b/src/sql/engine/px/ob_granule_pump.h index 6c1421526..5696ba8ea 100644 --- a/src/sql/engine/px/ob_granule_pump.h +++ b/src/sql/engine/px/ob_granule_pump.h @@ -568,6 +568,9 @@ public: ret = odps_partition_downloader_mgr_.get_odps_downloader(part_id, downloader); return ret; } + inline ObOdpsPartitionDownloaderMgr::OdpsMgrMap& get_odps_map() { + return odps_partition_downloader_mgr_.get_odps_map(); + } inline bool is_odps_downloader_inited() { return odps_partition_downloader_mgr_.is_download_mgr_inited(); } ObOdpsPartitionDownloaderMgr &get_odps_mgr() { return odps_partition_downloader_mgr_; } #endif diff --git a/src/sql/engine/px/ob_granule_util.cpp b/src/sql/engine/px/ob_granule_util.cpp index 7828415bb..b5b18320f 100644 --- a/src/sql/engine/px/ob_granule_util.cpp +++ b/src/sql/engine/px/ob_granule_util.cpp @@ -116,36 +116,28 @@ int ObGranuleUtil::split_granule_for_external_table(ObIAllocator &allocator, ObExternalFileFormat::ODPS_FORMAT == external_file_format.format_type_) { #ifdef OB_BUILD_CPP_ODPS int64_t task_idx = 0; + LOG_TRACE("odps external table granule switch", K(ret), K(external_table_files.count()), K(external_table_files)); for (int64_t i = 0; OB_SUCC(ret) && i < external_table_files.count(); ++i) { const ObExternalFileInfo& external_info = external_table_files.at(i); + ObNewRange new_range; if (0 != external_info.file_id_) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected file id", K(ret), K(i), K(external_info.file_id_)); } else { - // file_size_ is the total row cnt of odps table partition - uint64_t block_cnt = (external_info.file_size_ + sql::ObODPSTableRowIterator::ODPS_BLOCK_DOWNLOAD_SIZE - 1) - / sql::ObODPSTableRowIterator::ODPS_BLOCK_DOWNLOAD_SIZE; - uint64_t start_idx = 0; - block_cnt = (0 == block_cnt ? 1 : block_cnt); // one odps table partition should have at least one task, even it's empty - for (int64_t j = 0; OB_SUCC(ret) && j < block_cnt; ++j) { - ObNewRange new_range; - int64_t start = start_idx + (sql::ObODPSTableRowIterator::ODPS_BLOCK_DOWNLOAD_SIZE * j); - int64_t end = sql::ObODPSTableRowIterator::ODPS_BLOCK_DOWNLOAD_SIZE; - if (OB_FAIL(ObExternalTableUtils::make_external_table_scan_range(external_info.file_url_, + int64_t file_row_count = external_info.row_count_ ? external_info.row_count_ : (external_info.file_size_ > 0 ? external_info.file_size_ : INT64_MAX); + int64_t file_start = external_info.row_count_ ? external_info.row_start_ : 0; + if (OB_FAIL(ObExternalTableUtils::make_external_table_scan_range(external_info.file_url_, external_info.file_id_, external_info.part_id_, - start_idx + (sql::ObODPSTableRowIterator::ODPS_BLOCK_DOWNLOAD_SIZE * j), - j == block_cnt -1 ? - INT64_MAX : - sql::ObODPSTableRowIterator::ODPS_BLOCK_DOWNLOAD_SIZE, + file_start, + file_row_count, allocator, new_range))) { - LOG_WARN("failed to make external table scan range", K(ret)); - } else if ((OB_FAIL(granule_ranges.push_back(new_range)) || - OB_FAIL(granule_idx.push_back(task_idx++)) || - OB_FAIL(granule_tablets.push_back(tablets.at(0))))) { - LOG_WARN("fail to push back", K(ret)); - } + LOG_WARN("failed to make external table scan range", K(ret)); + } else if ((OB_FAIL(granule_ranges.push_back(new_range)) || + OB_FAIL(granule_idx.push_back(task_idx++)) || + OB_FAIL(granule_tablets.push_back(tablets.at(0))))) { + LOG_WARN("fail to push back", K(ret)); } } } diff --git a/src/sql/engine/px/ob_granule_util.h b/src/sql/engine/px/ob_granule_util.h index ca0e1c507..cec87cb45 100644 --- a/src/sql/engine/px/ob_granule_util.h +++ b/src/sql/engine/px/ob_granule_util.h @@ -273,7 +273,7 @@ public: bool range_independent); - static int split_granule_for_external_table(common::ObIAllocator &allocator, + static int split_granule_for_external_table(ObIAllocator &allocator, const ObTableScanSpec *tsc, const common::ObIArray &input_ranges, const common::ObIArray &tablet_array, diff --git a/src/sql/engine/px/ob_px_util.cpp b/src/sql/engine/px/ob_px_util.cpp index 837eb3899..4a28c2ec8 100644 --- a/src/sql/engine/px/ob_px_util.cpp +++ b/src/sql/engine/px/ob_px_util.cpp @@ -34,6 +34,9 @@ #include "share/external_table/ob_external_table_file_mgr.h" #include "rpc/obrpc/ob_net_keepalive.h" #include "share/external_table/ob_external_table_utils.h" +#ifdef OB_BUILD_CPP_ODPS +#include "sql/engine/table/ob_odps_table_row_iter.h" +#endif using namespace oceanbase::common; @@ -291,8 +294,28 @@ int ObPXServerAddrUtil::get_external_table_loc( } } } else { - int64_t expected_location_cnt = std::min(dfo.get_dop(), dfo.get_external_table_files().count()); - if (1 == expected_location_cnt) { + bool is_odps_external_table = false; + ObSEArray scan_ops; + const ObTableScanSpec *scan_op = nullptr; + const ObOpSpec *root_op = NULL; + int64_t expected_location_cnt = 0; + dfo.get_root(root_op); + if (OB_ISNULL(root_op)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null ptr", K(ret)); + } else if (OB_FAIL(ObTaskSpliter::find_scan_ops(scan_ops, *root_op))) { + LOG_WARN("failed to find scan_ops", K(ret), KP(root_op)); + } else if (scan_ops.count() == 0) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("empty scan_ops", K(ret)); + } else if (OB_FAIL(ObSQLUtils::is_odps_external_table(scan_ops.at(0)->tsc_ctdef_.scan_ctdef_.external_file_format_str_.str_, + is_odps_external_table))) { + LOG_WARN("failed to check is odps external table or not", K(ret)); + } else if (FALSE_IT(expected_location_cnt = std::min(dfo.get_dop(), + ((!ext_file_urls.empty() && is_odps_external_table) ? + all_locations.count() : dfo.get_external_table_files().count())))) { + + } else if (1 == expected_location_cnt) { if (OB_FAIL(target_locations.push_back(GCTX.self_addr()))) { LOG_WARN("fail to push push back", K(ret)); } @@ -318,11 +341,13 @@ int ObPXServerAddrUtil::get_external_table_loc( } int ObPXServerAddrUtil::assign_external_files_to_sqc( - const ObIArray &files, + ObDfo &dfo, bool is_file_on_disk, - ObIArray &sqcs) + ObIArray &sqcs, + int64_t parallel) { int ret = OB_SUCCESS; + const common::ObIArray &files = dfo.get_external_table_files(); if (is_file_on_disk) { ObAddr pre_addr; ObPxSqcMeta *target_sqc = NULL; @@ -346,19 +371,41 @@ int ObPXServerAddrUtil::assign_external_files_to_sqc( } } } else { - ObArray file_assigned_sqc_ids; - OZ (ObExternalTableUtils::calc_assigned_files_to_sqcs(files, file_assigned_sqc_ids, sqcs.count())); - if (OB_SUCC(ret) && file_assigned_sqc_ids.count() != files.count()) { + bool is_odps_external_table = false; + ObSEArray scan_ops; + const ObTableScanSpec *scan_op = nullptr; + const ObOpSpec *root_op = NULL; + dfo.get_root(root_op); + if (OB_ISNULL(root_op)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid result of assigned sqc", K(file_assigned_sqc_ids.count()), K(files.count())); - } - for (int i = 0; OB_SUCC(ret) && i < file_assigned_sqc_ids.count(); i++) { - int64_t assign_sqc_idx = file_assigned_sqc_ids.at(i); - if (OB_UNLIKELY(assign_sqc_idx >= sqcs.count() || assign_sqc_idx < 0)) { + LOG_WARN("unexpected null ptr", K(ret)); + } else if (OB_FAIL(ObTaskSpliter::find_scan_ops(scan_ops, *root_op))) { + LOG_WARN("failed to find scan_ops", K(ret), KP(root_op)); + } else if (scan_ops.count() == 0) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("empty scan_ops", K(ret)); + } else if (OB_FAIL(ObSQLUtils::is_odps_external_table(scan_ops.at(0)->tsc_ctdef_.scan_ctdef_.external_file_format_str_.str_, + is_odps_external_table))) { + LOG_WARN("failed to check is odps external table or not", K(ret)); + } else if (is_odps_external_table) { + if (OB_FAIL(ObExternalTableUtils::assign_odps_file_to_sqcs(dfo, sqcs, parallel))) { + LOG_WARN("failed to assisn odps file to sqcs", K(files), K(ret)); + } + } else { + ObArray file_assigned_sqc_ids; + OZ (ObExternalTableUtils::calc_assigned_files_to_sqcs(files, file_assigned_sqc_ids, sqcs.count())); + if (OB_SUCC(ret) && file_assigned_sqc_ids.count() != files.count()) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected file idx", K(file_assigned_sqc_ids.at(i))); - } else { - OZ (sqcs.at(assign_sqc_idx)->get_access_external_table_files().push_back(files.at(i))); + LOG_WARN("invalid result of assigned sqc", K(file_assigned_sqc_ids.count()), K(files.count())); + } + for (int i = 0; OB_SUCC(ret) && i < file_assigned_sqc_ids.count(); i++) { + int64_t assign_sqc_idx = file_assigned_sqc_ids.at(i); + if (OB_UNLIKELY(assign_sqc_idx >= sqcs.count() || assign_sqc_idx < 0)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected file idx", K(file_assigned_sqc_ids.at(i))); + } else { + OZ (sqcs.at(assign_sqc_idx)->get_access_external_table_files().push_back(files.at(i))); + } } } } @@ -648,8 +695,8 @@ int ObPXServerAddrUtil::build_dfo_sqc(ObExecContext &ctx, } if (OB_SUCC(ret) && !locations.empty() && (*locations.begin())->loc_meta_->is_external_table_) { - if (OB_FAIL(assign_external_files_to_sqc(dfo.get_external_table_files(), - (*locations.begin())->loc_meta_->is_external_files_on_disk_, sqcs))) { + if (OB_FAIL(assign_external_files_to_sqc(dfo, + (*locations.begin())->loc_meta_->is_external_files_on_disk_, sqcs, parallel))) { LOG_WARN("fail to assign external files to sqc", K(ret)); } } diff --git a/src/sql/engine/px/ob_px_util.h b/src/sql/engine/px/ob_px_util.h index ac2add493..40105c64d 100644 --- a/src/sql/engine/px/ob_px_util.h +++ b/src/sql/engine/px/ob_px_util.h @@ -280,9 +280,10 @@ private: common::ObIArray &dst_addrs); static int sort_and_collect_local_file_distribution(common::ObIArray &files, common::ObIArray &dst_addrs); - static int assign_external_files_to_sqc(const common::ObIArray &files, + static int assign_external_files_to_sqc(ObDfo &dfo, bool is_file_on_disk, - common::ObIArray &sqcs); + common::ObIArray &sqcs, + int64_t parallel); private: static int generate_dh_map_info(ObDfo &dfo); DISALLOW_COPY_AND_ASSIGN(ObPXServerAddrUtil); diff --git a/src/sql/engine/table/ob_external_table_access_service.cpp b/src/sql/engine/table/ob_external_table_access_service.cpp index 001d57919..71364e8f6 100644 --- a/src/sql/engine/table/ob_external_table_access_service.cpp +++ b/src/sql/engine/table/ob_external_table_access_service.cpp @@ -876,6 +876,8 @@ int ObExternalTableRowIterator::calc_file_partition_list_value(const int64_t par share::schema::ObSchemaGetterGuard schema_guard; const ObTableSchema *table_schema = NULL; const ObPartition *partition = NULL; + ObExternalFileFormat::FormatType external_table_type; + bool is_odps_external_table = false; if (OB_ISNULL(GCTX.schema_service_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected error"); @@ -888,7 +890,9 @@ int ObExternalTableRowIterator::calc_file_partition_list_value(const int64_t par } else if (OB_ISNULL(table_schema)) { ret = OB_TABLE_NOT_EXIST; LOG_WARN("table not exist", K(scan_param_->index_id_), K(scan_param_->tenant_id_)); - } else if (table_schema->is_partitioned_table() && (table_schema->is_user_specified_partition_for_external_table() || table_schema->is_odps_external_table())) { + } else if (OB_FAIL(ObSQLUtils::is_odps_external_table(table_schema, is_odps_external_table))) { + LOG_WARN("failed to check is odps external table or not", K(ret)); + } else if (table_schema->is_partitioned_table() && (table_schema->is_user_specified_partition_for_external_table() || is_odps_external_table)) { if (OB_FAIL(table_schema->get_partition_by_part_id(part_id, CHECK_PARTITION_MODE_NORMAL, partition))) { LOG_WARN("get partition failed", K(ret), K(part_id)); } else if (OB_ISNULL(partition) || OB_UNLIKELY(partition->get_list_row_values().count() != 1) diff --git a/src/sql/engine/table/ob_odps_table_row_iter.cpp b/src/sql/engine/table/ob_odps_table_row_iter.cpp index cabc8b9de..2afb9b6ac 100644 --- a/src/sql/engine/table/ob_odps_table_row_iter.cpp +++ b/src/sql/engine/table/ob_odps_table_row_iter.cpp @@ -22,40 +22,18 @@ namespace sql { int ObODPSTableRowIterator::OdpsPartition::reset() { int ret = OB_SUCCESS; - try { - download_handle_->Complete(); - download_handle_ = NULL; - record_count_ = -1; - name_ = ""; - download_id_ = ""; - } catch (apsara::odps::sdk::OdpsTunnelException& ex) { - if (OB_SUCC(ret)) { - ret = OB_ODPS_ERROR; - LOG_WARN("odps exception occured when calling Complete method", K(ret), K(ex.what())); - LOG_USER_ERROR(OB_ODPS_ERROR, ex.what()); - } - } catch (const std::exception &ex) { - if (OB_SUCC(ret)) { - ret = OB_ODPS_ERROR; - LOG_WARN("odps exception occured when calling Complete method", K(ret), K(ex.what()), KP(this)); - LOG_USER_ERROR(OB_ODPS_ERROR, ex.what()); - } - } catch (...) { - if (OB_SUCC(ret)) { - ret = OB_ODPS_ERROR; - LOG_WARN("odps exception occured when calling Complete method", K(ret)); - } - } + record_count_ = -1; + name_.clear(); return ret; } -int ObODPSTableRowIterator::init_tunnel(const sql::ObODPSGeneralFormat &odps_format) +int ObODPSTableRowIterator::init_tunnel(const sql::ObODPSGeneralFormat &odps_format, bool need_decrypt) { int ret = OB_SUCCESS; try { if (OB_FAIL(odps_format_.deep_copy(odps_format))) { LOG_WARN("failed to deep copy odps format", K(ret)); - } else if (OB_FAIL(odps_format_.decrypt())) { + } else if (need_decrypt && OB_FAIL(odps_format_.decrypt())) { LOG_WARN("failed to decrypt odps format", K(ret)); } else { LOG_TRACE("init tunnel format", K(ret)); @@ -89,6 +67,10 @@ int ObODPSTableRowIterator::init_tunnel(const sql::ObODPSGeneralFormat &odps_for } conf_.SetAccount(account_); conf_.SetEndpoint(std::string(odps_format_.endpoint_.ptr(), odps_format_.endpoint_.length())); + if (!odps_format_.tunnel_endpoint_.empty()) { + LOG_TRACE("set tunnel endpoint", K(ret), K(odps_format_.tunnel_endpoint_)); + conf_.SetTunnelEndpoint(std::string(odps_format_.tunnel_endpoint_.ptr(), odps_format_.tunnel_endpoint_.length())); + } conf_.SetUserAgent("OB_ACCESS_ODPS"); conf_.SetTunnelQuotaName(std::string(odps_format_.quota_.ptr(), odps_format_.quota_.length())); if (0 == odps_format_.compression_code_.case_compare("zlib")) { @@ -140,7 +122,7 @@ int ObODPSTableRowIterator::init_tunnel(const sql::ObODPSGeneralFormat &odps_for return ret; } -int ObODPSTableRowIterator::create_downloader(ObString &part_spec, apsara::odps::sdk::IDownloadPtr &downloader) +int ObODPSTableRowIterator::create_downloader(const ObString &part_spec, apsara::odps::sdk::IDownloadPtr &downloader) { int ret = OB_SUCCESS; try { @@ -207,6 +189,7 @@ int ObODPSTableRowIterator::next_task() { int ret = OB_SUCCESS; ObEvalCtx &eval_ctx = scan_param_->op_->get_eval_ctx(); + LOG_TRACE("get a new task start", K(ret), K(batch_size_), K(state_)); int64_t task_idx = state_.task_idx_; int64_t start = 0; int64_t step = 0; @@ -225,39 +208,80 @@ int ObODPSTableRowIterator::next_task() try { const ObString &part_spec = scan_param_->key_ranges_.at(task_idx).get_start_key().get_obj_ptr()[ObExternalTableUtils::FILE_URL].get_string(); int64_t part_id = scan_param_->key_ranges_.at(task_idx).get_start_key().get_obj_ptr()[ObExternalTableUtils::PARTITION_ID].get_int(); - std::string project(odps_format_.project_.ptr(), odps_format_.project_.length()); - std::string table(odps_format_.table_.ptr(), odps_format_.table_.length()); - std::string std_part_spec(part_spec.ptr(), part_spec.length()); - std::string download_id(""); - std::string schema(odps_format_.schema_.ptr(), odps_format_.schema_.length()); std::vector column_names; + const ExprFixedArray &file_column_exprs = *(scan_param_->ext_file_column_exprs_); + for (int64_t column_idx = 0; column_idx < target_column_id_list_.count(); ++column_idx) { + if (file_column_exprs.at(column_idx)->type_ == T_PSEUDO_EXTERNAL_FILE_COL) { + column_names.emplace_back(column_list_.at(target_column_id_list_.at(column_idx)).name_); + } + } if (part_spec.compare("#######DUMMY_FILE#######") == 0) { ret = OB_ITER_END; - } else if (OB_ISNULL(sqc) && - OB_ISNULL((state_.download_handle_ = tunnel_.CreateDownload(project, - table, - std_part_spec, - download_id, - schema)).get())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexcepted null ptr", K(ret)); - } else if (OB_NOT_NULL(sqc) && - !sqc->get_sqc_ctx().gi_pump_.is_odps_downloader_inited() && - OB_ISNULL((state_.download_handle_ = tunnel_.CreateDownload(project, - table, - std_part_spec, - download_id, - schema)).get())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexcepted null ptr", K(ret)); - } else if (OB_NOT_NULL(sqc) && - sqc->get_sqc_ctx().gi_pump_.is_odps_downloader_inited() && - OB_FAIL(sqc->get_sqc_ctx().gi_pump_.get_odps_downloader(part_id, state_.download_handle_))) { - LOG_WARN("failed to get odps downloader", K(ret), K(part_id)); + LOG_WARN("empty file", K(ret)); + } else { + if (OB_ISNULL(sqc) || !sqc->get_sqc_ctx().gi_pump_.is_odps_downloader_inited()) { + std::string project(odps_format_.project_.ptr(), odps_format_.project_.length()); + std::string table(odps_format_.table_.ptr(), odps_format_.table_.length()); + std::string std_part_spec(part_spec.ptr(), part_spec.length()); + std::string download_id(""); + std::string schema(odps_format_.schema_.ptr(), odps_format_.schema_.length()); + state_.download_handle_ = tunnel_.CreateDownload(project, + table, + std_part_spec, + download_id, + schema); + state_.is_from_gi_pump_ = false; + LOG_TRACE("succ to create downloader handle without GI", K(ret), K(part_id), KP(sqc), K(state_.is_from_gi_pump_)); + } else { + ObOdpsPartitionDownloaderMgr::OdpsMgrMap& odps_map = sqc->get_sqc_ctx().gi_pump_.get_odps_map(); + state_.is_from_gi_pump_ = true; + if (OB_FAIL(sqc->get_sqc_ctx().gi_pump_.get_odps_downloader(part_id, state_.download_handle_))) { + if (OB_HASH_NOT_EXIST == ret) { + ret = OB_SUCCESS; + ObOdpsPartitionDownloaderMgr::OdpsPartitionDownloader *temp_downloader = NULL; + if (sqc->get_sqc_ctx().gi_pump_.get_pump_args().empty()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected empty gi pump args", K(ret)); + } else if (OB_ISNULL(temp_downloader = static_cast( + sqc->get_sqc_ctx().gi_pump_.get_odps_mgr().get_allocator().alloc(sizeof(ObOdpsPartitionDownloaderMgr::OdpsPartitionDownloader))))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to allocate memory", K(ret), K(sizeof(ObOdpsPartitionDownloaderMgr::OdpsPartitionDownloader))); + } else if (FALSE_IT(new(temp_downloader)ObOdpsPartitionDownloaderMgr::OdpsPartitionDownloader())) { + } else if (OB_FAIL(temp_downloader->odps_driver_.init_tunnel(odps_format_, false))) { + LOG_WARN("failed to init tunnel", K(ret), K(part_id)); + } else if (OB_FAIL(temp_downloader->odps_driver_.create_downloader(part_spec, temp_downloader->odps_partition_downloader_))) { + LOG_WARN("failed create odps partition downloader", K(ret), K(part_id)); + } else if (OB_FAIL(odps_map.set_refactored(part_id, reinterpret_cast(temp_downloader), 0/*flag*/, 0/*broadcast*/, 0/*overwrite_key*/))) { + if (OB_HASH_EXIST == ret) { + ret = OB_SUCCESS; + if (OB_FAIL(sqc->get_sqc_ctx().gi_pump_.get_odps_downloader(part_id, state_.download_handle_))) { + LOG_WARN("failed to get from odps_map", K(part_id), K(ret)); + } else { + LOG_TRACE("succ to get downloader handle from GI", K(ret), K(part_id), K(state_.is_from_gi_pump_)); + } + } else { + LOG_WARN("fail to set refactored", K(ret)); + } + temp_downloader->reset(); + } else { + state_.download_handle_ = temp_downloader->odps_partition_downloader_; + LOG_TRACE("succ to create downloader handle and set it to GI", K(ret), K(part_id), K(state_.is_from_gi_pump_)); + } + } else { + LOG_WARN("failed to get from odps_map", K(part_id), K(ret)); + } + } else { + LOG_TRACE("succ to get downloader handle from GI", K(ret), K(part_id), K(state_.is_from_gi_pump_)); + } + } + } + if (OB_FAIL(ret)) { + //do nothing } else if (OB_ISNULL(state_.download_handle_.get())) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexcepted null ptr", K(ret)); - } else if (OB_ISNULL((state_.record_reader_handle_ = state_.download_handle_->OpenReader(start, + LOG_WARN("unexcepted null ptr", K(ret), KP(sqc), K(state_.is_from_gi_pump_)); + } else if (column_names.size() && + OB_ISNULL((state_.record_reader_handle_ = state_.download_handle_->OpenReader(start, step, column_names, true)).get())) { @@ -266,17 +290,24 @@ int ObODPSTableRowIterator::next_task() } else if (OB_FAIL(calc_file_partition_list_value(part_id, arena_alloc_, state_.part_list_val_))) { LOG_WARN("failed to calc parttion list value", K(part_id), K(ret)); } else { - state_.task_idx_ = task_idx; - state_.part_id_ = part_id; - state_.start_ = start; - state_.step_ = step; - state_.count_ = 0; - state_.is_from_gi_pump_ = OB_NOT_NULL(sqc) && sqc->get_sqc_ctx().gi_pump_.is_odps_downloader_inited(); - state_.download_id_ = state_.download_handle_->GetDownloadId(); - state_.part_spec_ = std_part_spec; - // what if error occur after this line, how to close state_.record_reader_handle_? - LOG_TRACE("get a new task", K(ret), K(batch_size_), K(state_)); - if (OB_SUCC(ret) && -1 == batch_size_) { // exec once only + int64_t real_time_partition_row_count = state_.download_handle_->GetRecordCount(); + if (start >= real_time_partition_row_count) { + ret = OB_ITER_END; + LOG_WARN("odps iter end", K(ret), K(part_id), K(state_.start_), K(real_time_partition_row_count)); + } else if (INT64_MAX == step || start + step > real_time_partition_row_count) { + step = real_time_partition_row_count - start; + } + if (OB_SUCC(ret)) { + state_.task_idx_ = task_idx; + state_.part_id_ = part_id; + state_.start_ = start; + state_.step_ = step; + state_.count_ = 0; + state_.download_id_ = state_.download_handle_->GetDownloadId(); + // what if error occur after this line, how to close state_.record_reader_handle_? + LOG_TRACE("get a new task", K(ret), K(batch_size_), K(state_), K(real_time_partition_row_count), K(column_names.size())); + } + if (OB_SUCC(ret) && -1 == batch_size_ && column_names.size()) { // exec once only batch_size_ = eval_ctx.max_batch_size_; if (0 == batch_size_) { // even state_.record_reader_handle_ was destroyed, record_/records_ is still valid. @@ -684,6 +715,7 @@ int ObODPSTableRowIterator::pull_partition_info() partition_list_.reset(); std::vector part_specs; try { + LOG_TRACE("get partition names start", K(ret)); if (OB_ISNULL(table_handle_.get())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexcepted null ptr", K(ret)); @@ -691,6 +723,7 @@ int ObODPSTableRowIterator::pull_partition_info() table_handle_->GetPartitionNames(part_specs); is_part_table_ = true; } + LOG_TRACE("get partition names end", K(ret), K(is_part_table_)); } catch (apsara::odps::sdk::OdpsException& ex) { std::string ex_msg = ex.what(); if (std::string::npos != ex_msg.find("ODPS-0110031")) { // ODPS-0110031 means table is not a partitional table @@ -714,30 +747,20 @@ int ObODPSTableRowIterator::pull_partition_info() LOG_WARN("odps exception occured when calling GetPartitionNames method", K(ret)); } } - if (OB_SUCC(ret) && !is_part_table_) { - part_specs.push_back(""); - } try { - std::string project(odps_format_.project_.ptr(), odps_format_.project_.length()); - std::string table(odps_format_.table_.ptr(), odps_format_.table_.length()); - std::string schema(odps_format_.schema_.ptr(), odps_format_.schema_.length()); - for (std::vector::iterator part_spec = part_specs.begin(); OB_SUCC(ret) && part_spec != part_specs.end(); part_spec++) { - std::string download_id(""); - apsara::odps::sdk::IDownloadPtr download_handle = NULL; + if (is_part_table_) { + for (std::vector::iterator part_spec = part_specs.begin(); OB_SUCC(ret) && part_spec != part_specs.end(); part_spec++) { + int64_t record_count = -1; + //apsara::odps::sdk::IODPSPartitionPtr partition = table_handle_->GetPartition(*part_spec); + //record_count = partition->GetPartitionSize(); + if (OB_FAIL(partition_list_.push_back(OdpsPartition(*part_spec, record_count)))){ + LOG_WARN("failed to push back partition_list_", K(ret)); + } + } + } else { int64_t record_count = -1; - if (OB_ISNULL((download_handle = tunnel_.CreateDownload(project, - table, - *part_spec, - download_id, - schema)).get())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexcepted null ptr", K(ret)); - } else if (FALSE_IT(download_id = download_handle->GetDownloadId())) { - } else if (FALSE_IT(record_count = download_handle->GetRecordCount())) { - } else if (OB_FAIL(partition_list_.push_back(OdpsPartition(*part_spec, - download_handle, - download_id, - record_count)))){ + //record_count = table_handle_->GetSize(); + if (OB_FAIL(partition_list_.push_back(OdpsPartition("", record_count)))){ LOG_WARN("failed to push back partition_list_", K(ret)); } } @@ -802,6 +825,30 @@ int ObODPSTableRowIterator::pull_column() { return ret; } +int ObODPSTableRowIterator::fill_partition_list_data(ObExpr &expr, int64_t returned_row_cnt) { + int ret = OB_SUCCESS; + ObEvalCtx &ctx = scan_param_->op_->get_eval_ctx(); + ObDatum *datums = expr.locate_batch_datums(ctx); + ObObjType type = expr.obj_meta_.get_type(); + if (expr.type_ == T_PSEUDO_PARTITION_LIST_COL) { + for (int64_t row_idx = 0; OB_SUCC(ret) && row_idx < returned_row_cnt; ++row_idx) { + int64_t loc_idx = expr.extra_ - 1; + if (OB_UNLIKELY(loc_idx < 0 || loc_idx >= state_.part_list_val_.get_count())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexcepted loc_idx", K(ret), K(loc_idx), K(state_.part_list_val_.get_count()), KP(&state_.part_list_val_)); + } else if (state_.part_list_val_.get_cell(loc_idx).is_null()) { + datums[row_idx].set_null(); + } else { + CK (OB_NOT_NULL(datums[row_idx].ptr_)); + OZ (datums[row_idx].from_obj(state_.part_list_val_.get_cell(loc_idx))); + } + } + } else { + //do nothing + } + return ret; +} + void ObODPSTableRowIterator::reset() { state_.reuse(); // reset state_ to initial values for rescan @@ -813,19 +860,20 @@ int ObODPSTableRowIterator::StateValues::reuse() try { if (-1 == task_idx_) { // do nothing - } else if (OB_ISNULL(record_reader_handle_.get())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected null ptr", K(ret), K(lbt())); - } else if (OB_ISNULL(download_handle_.get())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected null ptr", K(ret), K(lbt())); } else { - record_reader_handle_->Close(); - record_reader_handle_.reset(); - if (!is_from_gi_pump_) { - download_handle_->Complete(); + if (OB_NOT_NULL(record_reader_handle_.get())) { + record_reader_handle_->Close(); + record_reader_handle_.reset(); + } + if (OB_ISNULL(download_handle_.get())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null ptr", K(ret), K(lbt())); + } else { + if (!is_from_gi_pump_) { + download_handle_->Complete(); + } + download_handle_.reset(); } - download_handle_.reset(); } } catch (const apsara::odps::sdk::OdpsTunnelException& ex) { if (OB_SUCC(ret)) { @@ -850,21 +898,86 @@ int ObODPSTableRowIterator::StateValues::reuse() start_ = 0; step_ = 0; count_ = 0; - part_spec_.clear(); download_id_.clear(); part_list_val_.reset(); is_from_gi_pump_ = false; return ret; } +int ObODPSTableRowIterator::retry_read_task() +{ + int ret = OB_SUCCESS; + try { + LOG_TRACE("before retry read task", K(ret), K(state_), K(total_count_)); + if (OB_NOT_NULL(state_.record_reader_handle_.get())) { + state_.record_reader_handle_->Close(); + state_.record_reader_handle_.reset(); + } + std::vector column_names; + const ExprFixedArray &file_column_exprs = *(scan_param_->ext_file_column_exprs_); + for (int64_t column_idx = 0; column_idx < target_column_id_list_.count(); ++column_idx) { + if (file_column_exprs.at(column_idx)->type_ == T_PSEUDO_EXTERNAL_FILE_COL) { + column_names.emplace_back(column_list_.at(target_column_id_list_.at(column_idx)).name_); + } + } + if (column_names.size() && + OB_ISNULL((state_.record_reader_handle_ = state_.download_handle_->OpenReader(state_.start_ + state_.count_, + state_.step_ - state_.count_, + column_names, + true)).get())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexcepted null ptr", K(ret)); + } else { + state_.download_id_ = state_.download_handle_->GetDownloadId(); + LOG_TRACE("retry odps task success", K(ret), K(state_), K(total_count_)); + } + } catch (apsara::odps::sdk::OdpsException& ex) { + if (OB_SUCC(ret)) { + ret = OB_ODPS_ERROR; + LOG_WARN("odps exception occured when calling odps api", K(ret), K(ex.what())); + LOG_USER_ERROR(OB_ODPS_ERROR, ex.what()); + } + } catch (const std::exception &ex) { + if (OB_SUCC(ret)) { + ret = OB_ODPS_ERROR; + LOG_WARN("odps exception occured when calling odps api", K(ret), K(ex.what())); + LOG_USER_ERROR(OB_ODPS_ERROR, ex.what()); + } + } catch (...) { + if (OB_SUCC(ret)) { + ret = OB_ODPS_ERROR; + LOG_WARN("odps exception occured when calling odps api", K(ret)); + } + } + return ret; +} + int ObODPSTableRowIterator::get_next_rows(int64_t &count, int64_t capacity) { - int ret = 0; + int ret = OB_SUCCESS; ObMallocHookAttrGuard guard(mem_attr_); int64_t returned_row_cnt = 0; ObEvalCtx &ctx = scan_param_->op_->get_eval_ctx(); const ExprFixedArray &file_column_exprs = *(scan_param_->ext_file_column_exprs_); - if (state_.count_ >= state_.step_ && OB_FAIL(next_task())) { + if (!file_column_exprs.count() || + OB_ISNULL(state_.record_reader_handle_.get())) { + count = std::min(capacity, state_.step_ - state_.count_); + total_count_ += count; + state_.count_ += count; + for (int64_t column_idx = 0; OB_SUCC(ret) && column_idx < target_column_id_list_.count(); ++column_idx) { + ObExpr &expr = *file_column_exprs.at(column_idx); + if (OB_FAIL(fill_partition_list_data(expr, count))) { + LOG_WARN("failed to fill partition list data", K(ret), K(file_column_exprs.count())); + } + } + if (OB_SUCC(ret) && + state_.count_ >= state_.step_ && + OB_FAIL(next_task())) { + if (OB_ITER_END != ret) { + LOG_WARN("get next task failed", K(ret)); + } + } + } else if (state_.count_ >= state_.step_ && OB_FAIL(next_task())) { if (OB_ITER_END != ret) { LOG_WARN("get next task failed", K(ret)); } @@ -889,13 +1002,16 @@ int ObODPSTableRowIterator::get_next_rows(int64_t &count, int64_t capacity) state_.step_ = state_.count_; // goto get next task count = 0; } else if (0 == returned_row_cnt) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected returned_row_cnt", K(total_count_), K(returned_row_cnt), K(state_), K(ret)); + LOG_TRACE("unexpected returned_row_cnt, going to retry read task", K(total_count_), K(returned_row_cnt), K(state_), K(ret)); + if (OB_FAIL(retry_read_task())) { + LOG_WARN("failed to retry read task", K(ret), K(state_)); + } } } else { - ret = OB_ODPS_ERROR; - LOG_WARN("odps exception occured when calling Read method", K(ret), K(total_count_), K(returned_row_cnt), K(ex.what())); - LOG_USER_ERROR(OB_ODPS_ERROR, ex.what()); + LOG_TRACE("unexpected read error exception, going to retry read task", K(OB_ODPS_ERROR), K(total_count_), K(returned_row_cnt), K(state_), K(ret), K(ex.what())); + if (OB_FAIL(retry_read_task())) { + LOG_WARN("failed to retry read task", K(ret), K(state_)); + } } } } catch (const std::exception& ex) { @@ -916,29 +1032,22 @@ int ObODPSTableRowIterator::get_next_rows(int64_t &count, int64_t capacity) state_.step_ = state_.count_; // goto get next task count = 0; } else if (0 == returned_row_cnt) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected returned_row_cnt", K(total_count_), K(returned_row_cnt), K(state_), K(ret)); + // do nothing + LOG_TRACE("expected result: already retried reading task successfully", K(total_count_), K(returned_row_cnt), K(state_), K(ret)); } else { + int64_t data_idx = 0; for (int64_t column_idx = 0; OB_SUCC(ret) && column_idx < target_column_id_list_.count(); ++column_idx) { uint32_t target_idx = target_column_id_list_.at(column_idx); ObExpr &expr = *file_column_exprs.at(column_idx); ObDatum *datums = expr.locate_batch_datums(ctx); ObObjType type = expr.obj_meta_.get_type(); if (expr.type_ == T_PSEUDO_PARTITION_LIST_COL) { - for (int64_t row_idx = 0; OB_SUCC(ret) && row_idx < returned_row_cnt; ++row_idx) { - int64_t loc_idx = file_column_exprs.at(column_idx)->extra_ - 1; - if (OB_UNLIKELY(loc_idx < 0 || loc_idx >= state_.part_list_val_.get_count())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexcepted loc_idx", K(ret), K(loc_idx), K(state_.part_list_val_.get_count()), KP(&state_.part_list_val_)); - } else if (state_.part_list_val_.get_cell(loc_idx).is_null()) { - datums[row_idx].set_null(); - } else { - CK (OB_NOT_NULL(datums[row_idx].ptr_)); - OZ (datums[row_idx].from_obj(state_.part_list_val_.get_cell(loc_idx))); - } + if (OB_FAIL(fill_partition_list_data(expr, returned_row_cnt))) { + LOG_WARN("failed to fill partition list data", K(ret)); } } else { apsara::odps::sdk::ODPSColumnType odps_type = column_list_.at(target_idx).type_info_.mType; + target_idx = data_idx++; try { switch(odps_type) { @@ -1489,26 +1598,28 @@ int ObODPSTableRowIterator::get_next_rows(int64_t &count, int64_t capacity) } } } - ObEvalCtx::BatchInfoScopeGuard batch_info_guard(ctx); - batch_info_guard.set_batch_idx(0); - for (int i = 0; OB_SUCC(ret) && i < file_column_exprs.count(); i++) { - file_column_exprs.at(i)->set_evaluated_flag(ctx); - } - for (int i = 0; OB_SUCC(ret) && i < column_exprs_.count(); i++) { - ObExpr *column_expr = column_exprs_.at(i); - ObExpr *column_convert_expr = scan_param_->ext_column_convert_exprs_->at(i); - OZ (column_convert_expr->eval_batch(ctx, *bit_vector_cache_, returned_row_cnt)); - if (OB_SUCC(ret)) { - MEMCPY(column_expr->locate_batch_datums(ctx), - column_convert_expr->locate_batch_datums(ctx), sizeof(ObDatum) * returned_row_cnt); - column_expr->set_evaluated_flag(ctx); - } - } if (OB_SUCC(ret)) { count = returned_row_cnt; } } } + if (OB_SUCC(ret)) { + ObEvalCtx::BatchInfoScopeGuard batch_info_guard(ctx); + batch_info_guard.set_batch_idx(0); + for (int i = 0; OB_SUCC(ret) && i < file_column_exprs.count(); i++) { + file_column_exprs.at(i)->set_evaluated_flag(ctx); + } + for (int i = 0; OB_SUCC(ret) && i < column_exprs_.count(); i++) { + ObExpr *column_expr = column_exprs_.at(i); + ObExpr *column_convert_expr = scan_param_->ext_column_convert_exprs_->at(i); + OZ (column_convert_expr->eval_batch(ctx, *bit_vector_cache_, count)); + if (OB_SUCC(ret)) { + MEMCPY(column_expr->locate_batch_datums(ctx), + column_convert_expr->locate_batch_datums(ctx), sizeof(ObDatum) * count); + column_expr->set_evaluated_flag(ctx); + } + } + } return ret; } @@ -1520,9 +1631,12 @@ int ObODPSTableRowIterator::get_next_row() LOG_WARN("get next task failed", K(ret)); } } else { - if (OB_FAIL(inner_get_next_row())) { - LOG_WARN("failed to get next row inner", K(ret)); - } + bool need_retry = false; + do { + if (OB_FAIL(inner_get_next_row(need_retry))) { + LOG_WARN("failed to get next row inner", K(ret)); + } + } while (OB_SUCC(ret) && need_retry); } while(OB_SUCC(ret) && get_next_task_) { // used to get next task which has data need to fetch if (state_.count_ >= state_.step_ && OB_FAIL(next_task())) { @@ -1530,33 +1644,47 @@ int ObODPSTableRowIterator::get_next_row() LOG_WARN("get next task failed", K(ret)); } } else { - if (OB_FAIL(inner_get_next_row())) { - LOG_WARN("failed to get next row inner", K(ret)); - } + bool need_retry = false; + do { + if (OB_FAIL(inner_get_next_row(need_retry))) { + LOG_WARN("failed to get next row inner", K(ret)); + } + } while (OB_SUCC(ret) && need_retry); } } return ret; } -int ObODPSTableRowIterator::inner_get_next_row() +int ObODPSTableRowIterator::inner_get_next_row(bool &need_retry) { int ret = OB_SUCCESS; ObMallocHookAttrGuard guard(mem_attr_); ObEvalCtx &ctx = scan_param_->op_->get_eval_ctx(); const ExprFixedArray &file_column_exprs = *(scan_param_->ext_file_column_exprs_); get_next_task_ = false; + need_retry = false; try { - if (!(state_.record_reader_handle_->Read(*record_))) { + if (OB_ISNULL(state_.record_reader_handle_.get()) || !file_column_exprs.count()) { if (INT64_MAX == state_.step_ || state_.count_ == state_.step_) { get_next_task_ = true; // goto get next task state_.step_ = state_.count_; } else { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected end", K(total_count_), K(state_), K(ret)); + ++state_.count_; + ++total_count_; } } else { - ++state_.count_; - ++total_count_; + if (!(state_.record_reader_handle_->Read(*record_))) { + if (INT64_MAX == state_.step_ || state_.count_ == state_.step_) { + get_next_task_ = true; // goto get next task + state_.step_ = state_.count_; + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected end", K(total_count_), K(state_), K(ret)); + } + } else { + ++state_.count_; + ++total_count_; + } } } catch (apsara::odps::sdk::OdpsTunnelException& ex) { if (OB_SUCC(ret)) { @@ -1567,13 +1695,20 @@ int ObODPSTableRowIterator::inner_get_next_row() get_next_task_ = true; // goto get next task state_.step_ = state_.count_; } else { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected end", K(total_count_), K(state_), K(ret)); + LOG_TRACE("unexpected end, going to retry read task", K(total_count_), K(state_), K(ret)); + if (OB_FAIL(retry_read_task())) { + LOG_WARN("failed to retry read task", K(ret), K(state_)); + } else { + need_retry = true; + } } } else { - ret = OB_ODPS_ERROR; - LOG_WARN("odps exception occured when calling Read or Close method", K(ret), K(total_count_), K(ex.what())); - LOG_USER_ERROR(OB_ODPS_ERROR, ex.what()); + LOG_WARN("odps exception occured when calling Read or Close method, going to retry read task", K(OB_ODPS_ERROR), K(ret), K(total_count_), K(ex.what())); + if (OB_FAIL(retry_read_task())) { + LOG_WARN("failed to retry read task", K(ret), K(state_)); + } else { + need_retry = true; + } } } } catch (const std::exception& ex) { @@ -1593,6 +1728,7 @@ int ObODPSTableRowIterator::inner_get_next_row() } else if (get_next_task_) { // do nothing } else { + int64_t data_idx = 0; for (int64_t column_idx = 0; OB_SUCC(ret) && column_idx < target_column_id_list_.count(); ++column_idx) { uint32_t target_idx = target_column_id_list_.at(column_idx); ObExpr &expr = *file_column_exprs.at(column_idx); // do not check null ptr @@ -1611,6 +1747,7 @@ int ObODPSTableRowIterator::inner_get_next_row() } } else { apsara::odps::sdk::ODPSColumnType odps_type = column_list_.at(target_idx).type_info_.mType; + target_idx = data_idx++; try { switch(odps_type) { @@ -2118,6 +2255,8 @@ int ObOdpsPartitionDownloaderMgr::init_downloader(common::ObArray( arena_alloc_.alloc(sizeof(OdpsPartitionDownloader))))) { ret = OB_ALLOCATE_MEMORY_FAILED; @@ -2143,6 +2282,79 @@ int ObOdpsPartitionDownloaderMgr::init_downloader(common::ObArray &external_table_files, + const ObString &properties, + bool &use_partition_gi) +{ + int ret = OB_SUCCESS; + sql::ObExternalFileFormat external_odps_format; + ObODPSTableRowIterator odps_driver; + common::ObArenaAllocator arena_alloc; + use_partition_gi = false; + int64_t uncollect_statistics_part_cnt = 0; + omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id)); + int64_t max_parttition_count_to_collect_statistic = 5; + if (OB_LIKELY(tenant_config.is_valid())) { + max_parttition_count_to_collect_statistic = tenant_config->_max_partition_count_to_collect_statistic; + } + for (int64_t i = 0; i < external_table_files.count(); ++i) { + if (external_table_files.at(i).file_size_ < 0 && ++uncollect_statistics_part_cnt > max_parttition_count_to_collect_statistic) { + break; + } + } + if (uncollect_statistics_part_cnt > max_parttition_count_to_collect_statistic) { + use_partition_gi = true; + } else if (OB_FAIL(external_odps_format.load_from_string(properties, arena_alloc))) { + LOG_WARN("failed to init external_odps_format", K(ret)); + } else if (OB_FAIL(odps_driver.init_tunnel(external_odps_format.odps_format_))) { + LOG_WARN("failed to init tunnel", K(ret)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < external_table_files.count(); ++i) { + const share::ObExternalFileInfo &odps_partition = external_table_files.at(i); + apsara::odps::sdk::IDownloadPtr odps_partition_downloader = NULL; + if (0 != odps_partition.file_id_) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected file id", K(ret), K(i), K(odps_partition.file_id_), K(odps_partition.part_id_)); + } else if (odps_partition.file_size_ >= 0) { + // do nothing + } else if (OB_FAIL(odps_driver.create_downloader(odps_partition.file_url_, + odps_partition_downloader))) { + LOG_WARN("failed create odps partition downloader", K(ret), K(i), K(odps_partition.part_id_), K(odps_partition.file_url_)); + } else { + *(const_cast(&odps_partition.file_size_)) = odps_partition_downloader->GetRecordCount(); + odps_partition_downloader->Complete(); + } + } + } + return ret; +} + +int ObOdpsPartitionDownloaderMgr::fetch_row_count(const ObString part_spec, + const ObString &properties, + int64_t &row_count) +{ + int ret = OB_SUCCESS; + sql::ObExternalFileFormat external_odps_format; + ObODPSTableRowIterator odps_driver; + common::ObArenaAllocator arena_alloc; + row_count = 0; + if (OB_FAIL(external_odps_format.load_from_string(properties, arena_alloc))) { + LOG_WARN("failed to init external_odps_format", K(ret)); + } else if (OB_FAIL(odps_driver.init_tunnel(external_odps_format.odps_format_))) { + LOG_WARN("failed to init tunnel", K(ret)); + } else { + apsara::odps::sdk::IDownloadPtr odps_partition_downloader = NULL; + if (OB_FAIL(odps_driver.create_downloader(part_spec, odps_partition_downloader))) { + LOG_WARN("failed create odps partition downloader", K(ret), K(part_spec)); + } else { + row_count = odps_partition_downloader->GetRecordCount(); + odps_partition_downloader->Complete(); + } + } + return ret; +} + int ObOdpsPartitionDownloaderMgr::create_upload_session(const sql::ObODPSGeneralFormat &odps_format, const ObString &external_partition, bool is_overwrite, @@ -2351,7 +2563,7 @@ int ObOdpsPartitionDownloaderMgr::commit_upload() uint32_t task_count = static_cast(odps_mgr_map_.size()); OdpsUploader *uploader = NULL; try { - for (common::hash::ObHashMap::iterator iter = odps_mgr_map_.begin(); + for (OdpsMgrMap::iterator iter = odps_mgr_map_.begin(); OB_SUCC(ret) && iter != odps_mgr_map_.end(); iter++) { if (OB_ISNULL(uploader = reinterpret_cast(iter->second)) || OB_UNLIKELY(!uploader->record_writer_ || !uploader->upload_)) { @@ -2424,11 +2636,12 @@ int ObOdpsPartitionDownloaderMgr::OdpsPartitionDownloader::reset() { int ret = OB_SUCCESS; try { - if (OB_ISNULL(odps_partition_downloader_)) { + if (OB_ISNULL(odps_partition_downloader_.get())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected null ptr", K(ret)); } else { odps_partition_downloader_->Complete(); + odps_partition_downloader_.reset(); } } catch (const apsara::odps::sdk::OdpsTunnelException& ex) { if (OB_SUCC(ret)) { diff --git a/src/sql/engine/table/ob_odps_table_row_iter.h b/src/sql/engine/table/ob_odps_table_row_iter.h index 18628fd56..0c6715629 100644 --- a/src/sql/engine/table/ob_odps_table_row_iter.h +++ b/src/sql/engine/table/ob_odps_table_row_iter.h @@ -46,7 +46,6 @@ public: K(step_), K(count_), K(is_from_gi_pump_), - K(ObString(part_spec_.c_str())), K(ObString(download_id_.c_str()))); int64_t task_idx_; int64_t part_id_; @@ -56,32 +55,23 @@ public: bool is_from_gi_pump_; apsara::odps::sdk::IDownloadPtr download_handle_; apsara::odps::sdk::IRecordReaderPtr record_reader_handle_; - std::string part_spec_; std::string download_id_; ObNewRow part_list_val_; }; struct OdpsPartition { OdpsPartition() : name_(""), - download_handle_(NULL), - download_id_(""), record_count_(-1) { } OdpsPartition(const std::string &name) : name_(name), - download_handle_(NULL), - download_id_(""), record_count_(-1) { } OdpsPartition(const std::string &name, - apsara::odps::sdk::IDownloadPtr download_handle, - const std::string download_id, int64_t &record_count) : name_(name), - download_handle_(download_handle), - download_id_(download_id), record_count_(record_count) { } @@ -91,8 +81,6 @@ public: int reset(); TO_STRING_KV(K(ObString(name_.c_str())), K(record_count_)); std::string name_; - apsara::odps::sdk::IDownloadPtr download_handle_; - std::string download_id_; int64_t record_count_; }; @@ -151,8 +139,8 @@ public: return common::OB_ERR_UNEXPECTED; } virtual void reset() override; - int init_tunnel(const sql::ObODPSGeneralFormat &odps_format); - int create_downloader(ObString &part_spec, apsara::odps::sdk::IDownloadPtr &downloader); + int init_tunnel(const sql::ObODPSGeneralFormat &odps_format, bool need_decrypt = true); + int create_downloader(const ObString &part_spec, apsara::odps::sdk::IDownloadPtr &downloader); int pull_partition_info(); inline ObIArray& get_partition_info() { return partition_list_; } inline bool is_part_table() { return is_part_table_; } @@ -165,7 +153,7 @@ public: const int32_t ob_type_precision, const int32_t ob_type_scale); private: - int inner_get_next_row(); + int inner_get_next_row(bool &need_retry); int prepare_expr(); int pull_column(); int next_task(); @@ -186,6 +174,8 @@ private: } return is_valid; } + int fill_partition_list_data(ObExpr &expr, int64_t returned_row_cnt); + int retry_read_task(); private: ObODPSGeneralFormat odps_format_; apsara::odps::sdk::Account account_; @@ -212,6 +202,7 @@ private: class ObOdpsPartitionDownloaderMgr { public: + typedef common::hash::ObHashMap OdpsMgrMap; struct OdpsPartitionDownloader { OdpsPartitionDownloader() : odps_driver_(), @@ -241,12 +232,34 @@ public: apsara::odps::sdk::IRecordWriterPtr record_writer_; }; ObOdpsPartitionDownloaderMgr() : inited_(false), is_download_(true), ref_(0), need_commit_(true) {} + OB_INLINE int init_map(int64_t bucket_size) { + int ret = OB_SUCCESS; + if (!odps_mgr_map_.created()){ + ret = odps_mgr_map_.create(bucket_size, "OdpsTable","OdpsTableReader"); + if (OB_SUCC(ret)) { + inited_ = true; + is_download_ = true; + } + } + return ret; + } + OdpsMgrMap &get_odps_map() { + return odps_mgr_map_; + } + OB_INLINE ObIAllocator &get_allocator() { return arena_alloc_; } int init_downloader(common::ObArray &external_table_files, const ObString &properties); int init_uploader(const ObString &properties, const ObString &external_partition, bool is_overwrite, int64_t parallel); + static int fetch_row_count(uint64_t tenant_id, + const ObIArray &external_table_files, + const ObString &properties, + bool &use_partition_gi); + static int fetch_row_count(const ObString part_spec, + const ObString &properties, + int64_t &row_count); static int create_upload_session(const sql::ObODPSGeneralFormat &odps_format, const ObString &external_partition, bool is_overwrite, @@ -273,7 +286,7 @@ public: private: bool inited_; bool is_download_; - common::hash::ObHashMap odps_mgr_map_; + OdpsMgrMap odps_mgr_map_; common::ObArenaAllocator arena_alloc_; int64_t ref_; bool need_commit_; diff --git a/src/sql/engine/table/ob_table_scan_op.cpp b/src/sql/engine/table/ob_table_scan_op.cpp index 08de0d560..8a238297d 100644 --- a/src/sql/engine/table/ob_table_scan_op.cpp +++ b/src/sql/engine/table/ob_table_scan_op.cpp @@ -1397,6 +1397,7 @@ int ObTableScanOp::prepare_single_scan_range(int64_t group_idx) } else if (MY_CTDEF.scan_ctdef_.is_external_table_) { uint64_t table_loc_id = MY_SPEC.get_table_loc_id(); ObDASTableLoc *tab_loc = DAS_CTX(ctx_).get_table_loc_by_id(table_loc_id, MY_CTDEF.scan_ctdef_.ref_table_id_); + const ObString &table_format_or_properties = MY_CTDEF.scan_ctdef_.external_file_format_str_.str_; ObArray partition_ids; if (OB_ISNULL(tab_loc)) { ret = OB_ERR_UNEXPECTED; @@ -1411,6 +1412,7 @@ int ObTableScanOp::prepare_single_scan_range(int64_t group_idx) } else if (OB_FAIL(ObExternalTableUtils::prepare_single_scan_range( ctx_.get_my_session()->get_effective_tenant_id(), MY_CTDEF.scan_ctdef_.ref_table_id_, + table_format_or_properties, partition_ids, key_ranges, range_allocator, diff --git a/src/sql/ob_sql_utils.cpp b/src/sql/ob_sql_utils.cpp index be2249820..1ab1b59b6 100644 --- a/src/sql/ob_sql_utils.cpp +++ b/src/sql/ob_sql_utils.cpp @@ -1464,17 +1464,97 @@ int ObSQLUtils::extract_odps_part_spec(const ObString &all_part_spec, ObIArrayget_tenant_schema_guard(tenant_id, schema_guard)); + OZ (schema_guard.get_table_schema(tenant_id, table_id, table_schema)); + if (OB_FAIL(ret)) { + } else if (OB_FAIL(get_external_table_type(table_schema, type))) { + LOG_WARN("failed to get external table type", K(tenant_id), K(table_id), KP(table_schema), K(ret)); + } + return ret; +} + +int ObSQLUtils::get_external_table_type(const ObTableSchema *table_schema, ObExternalFileFormat::FormatType &type) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(table_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null ptr", K(ret)); } else { - is_odps = ObExternalFileFormat::FormatType::ODPS_FORMAT == format.format_type_; + ObExternalFileFormat format; + ObArenaAllocator allocator; + ObString table_format_or_properties = table_schema->get_external_file_format().empty() ? + table_schema->get_external_properties(): table_schema->get_external_file_format(); + if (OB_FAIL(get_external_table_type(table_format_or_properties, type))) { + LOG_WARN("failed to get external table type", K(ret), K(table_format_or_properties)); + } + } + return ret; +} + +int ObSQLUtils::get_external_table_type(const ObString &table_format_or_properties, + ObExternalFileFormat::FormatType &type) { + int ret = OB_SUCCESS; + ObExternalFileFormat format; + ObArenaAllocator allocator; + if (table_format_or_properties.empty()) { + } else if (OB_FAIL(format.load_from_string(table_format_or_properties, allocator))) { + LOG_WARN("fail to load from properties string", K(ret), K(table_format_or_properties)); + } else { + type = format.format_type_; + } + return ret; +} + + +int ObSQLUtils::is_odps_external_table(const uint64_t tenant_id, + const uint64_t table_id, + bool &is_odps_external_table) +{ + int ret = OB_SUCCESS; + is_odps_external_table = false; + ObExternalFileFormat::FormatType external_table_type; + if (OB_FAIL(ObSQLUtils::get_external_table_type(tenant_id, table_id, external_table_type))) { + LOG_WARN("failed to get external table type", K(ret)); + } else { + is_odps_external_table = (ObExternalFileFormat::FormatType:: ODPS_FORMAT == external_table_type); + } + return ret; +} + +int ObSQLUtils::is_odps_external_table(const ObTableSchema *table_schema, + bool &is_odps_external_table) +{ + int ret = OB_SUCCESS; + is_odps_external_table = false; + ObExternalFileFormat::FormatType external_table_type; + if (OB_ISNULL(table_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null ptr", K(ret)); + } else if (OB_FAIL(ObSQLUtils::get_external_table_type(table_schema, external_table_type))) { + LOG_WARN("failed to get external table type", K(ret)); + } else { + is_odps_external_table = (ObExternalFileFormat::FormatType:: ODPS_FORMAT == external_table_type); + } + return ret; +} + +int ObSQLUtils::is_odps_external_table(const ObString &table_format_or_properties, + bool &is_odps_external_table) +{ + int ret = OB_SUCCESS; + is_odps_external_table = false; + ObExternalFileFormat::FormatType external_table_type; + if (OB_FAIL(ObSQLUtils::get_external_table_type(table_format_or_properties, external_table_type))) { + LOG_WARN("failed to get external table type", K(ret)); + } else { + is_odps_external_table = (ObExternalFileFormat::FormatType:: ODPS_FORMAT == external_table_type); } return ret; } diff --git a/src/sql/ob_sql_utils.h b/src/sql/ob_sql_utils.h index aaf38d268..17b74c2ad 100644 --- a/src/sql/ob_sql_utils.h +++ b/src/sql/ob_sql_utils.h @@ -32,6 +32,8 @@ #include "sql/engine/expr/ob_expr_frame_info.h" #include "sql/monitor/flt/ob_flt_span_mgr.h" #include "share/ob_compatibility_control.h" +#include "sql/engine/cmd/ob_load_data_parser.h" + namespace oceanbase { namespace share { @@ -751,8 +753,21 @@ public: static int64_t combine_server_id(int64_t ts, uint64_t server_id) { return (ts & ((1LL << 43) - 1LL)) | ((server_id & 0xFFFF) << 48); } + static int get_external_table_type(const uint64_t tenant_id, + const uint64_t table_id, + ObExternalFileFormat::FormatType &type); + static int get_external_table_type(const ObTableSchema *table_schema, + ObExternalFileFormat::FormatType &type); + static int get_external_table_type(const ObString &table_format_or_properties, + ObExternalFileFormat::FormatType &type); + static int is_odps_external_table(const uint64_t tenant_id, + const uint64_t table_id, + bool &is_odps_external_table); + static int is_odps_external_table(const ObTableSchema *table_schema, + bool &is_odps_external_table); + static int is_odps_external_table(const ObString &table_format_or_properties, + bool &is_odps_external_table); static int extract_odps_part_spec(const ObString &all_part_spec, ObIArray &part_spec_list); - static int is_external_odps_table(const ObString &properties, ObIAllocator &allocator, bool &is_odps); static int check_ident_name(const common::ObCollationType cs_type, common::ObString &name, const bool check_for_path_char, const int64_t max_ident_len); diff --git a/src/sql/optimizer/ob_insert_log_plan.cpp b/src/sql/optimizer/ob_insert_log_plan.cpp index 496bfa5fd..cbef57281 100644 --- a/src/sql/optimizer/ob_insert_log_plan.cpp +++ b/src/sql/optimizer/ob_insert_log_plan.cpp @@ -1814,18 +1814,18 @@ int ObInsertLogPlan::allocate_select_into_as_top_for_insert(ObLogicalOperator *& LOG_WARN("allocate memory for ObLogSelectInto failed", K(ret)); } else { ObString external_properties; - const ObString &format_or_properties = table_schema->get_external_file_format().empty() + const ObString &table_format_or_properties = table_schema->get_external_file_format().empty() ? table_schema->get_external_properties() : table_schema->get_external_file_format(); const ObInsertTableInfo& table_info = stmt->get_insert_table_info(); - if (format_or_properties.empty()) { + if (table_format_or_properties.empty()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("external properties is empty", K(ret)); } else if (table_schema->get_external_properties().empty()) { //目前只支持写odps外表 其他类型暂不支持 ret = OB_NOT_SUPPORTED; LOG_WARN("not support to insert into external table which is not in odps", K(ret)); LOG_USER_ERROR(OB_NOT_SUPPORTED, "insert into external table which is not in odps"); - } else if (OB_FAIL(ob_write_string(get_allocator(), format_or_properties, external_properties))) { + } else if (OB_FAIL(ob_write_string(get_allocator(), table_format_or_properties, external_properties))) { LOG_WARN("failed to append string", K(ret)); } else if (OB_FAIL(select_into->get_select_exprs().assign(table_info.column_conv_exprs_))) { LOG_WARN("failed to get select exprs", K(ret)); diff --git a/src/sql/parser/non_reserved_keywords_mysql_mode.c b/src/sql/parser/non_reserved_keywords_mysql_mode.c index a51e65c53..7180a96cb 100644 --- a/src/sql/parser/non_reserved_keywords_mysql_mode.c +++ b/src/sql/parser/non_reserved_keywords_mysql_mode.c @@ -615,6 +615,8 @@ static const NonReservedKeyword Mysql_none_reserved_keywords[] = {"accesskey", ACCESSKEY}, {"accesstype", ACCESSTYPE}, {"endpoint", ENDPOINT}, + {"tunnel_endpoint", TUNNEL_ENDPOINT}, + {"collect_statistics_on_create", COLLECT_STATISTICS_ON_CREATE}, {"project_name", PROJECT_NAME}, {"quota_name", QUOTA_NAME}, {"compression_code", COMPRESSION_CODE}, diff --git a/src/sql/parser/sql_parser_mysql_mode.y b/src/sql/parser/sql_parser_mysql_mode.y index b40772e5b..fafeed49d 100644 --- a/src/sql/parser/sql_parser_mysql_mode.y +++ b/src/sql/parser/sql_parser_mysql_mode.y @@ -279,7 +279,7 @@ END_P SET_VAR DELIMITER CACHE CALIBRATION CALIBRATION_INFO CANCEL CASCADED CAST CATALOG_NAME CHAIN CHANGED CHARSET CHECKSUM CHECKPOINT CHUNK CIPHER CLASS_ORIGIN CLEAN CLEAR CLIENT CLONE CLOG CLOSE CLUSTER CLUSTER_ID CLUSTER_NAME COALESCE COLUMN_BLOOM_FILTER COLUMN_STAT - CODE COLLATION COLUMN_FORMAT COLUMN_NAME COLUMNS COMMENT COMMIT COMMITTED COMPACT COMPLETION COMPLETE + CODE COLLATION COLLECT_STATISTICS_ON_CREATE COLUMN_FORMAT COLUMN_NAME COLUMNS COMMENT COMMIT COMMITTED COMPACT COMPLETION COMPLETE COMPRESSED COMPRESSION COMPRESSION_BLOCK_SIZE COMPRESSION_CODE COMPUTATION COMPUTE CONCURRENT CONDENSED CONDITIONAL CONNECTION CONSISTENT CONSISTENT_MODE CONSTRAINT_CATALOG CONSTRAINT_NAME CONSTRAINT_SCHEMA CONTAINS CONTEXT CONTRIBUTORS COPY COSINE COUNT CPU CREATE_TIMESTAMP CTXCAT CTX_ID CUBE CURDATE CURRENT STACKED CURTIME CURSOR_NAME CUME_DIST CYCLE CALC_PARTITION_ID CONNECT @@ -366,7 +366,7 @@ END_P SET_VAR DELIMITER TEMPLATE TEMPORARY TEMPTABLE TENANT TEXT THAN TIME TIMESTAMP TIMESTAMPADD TIMESTAMPDIFF TP_NO TP_NAME TRACE TRADITIONAL TRANSACTION TRIGGERS TRIM TRUNCATE TYPE TYPES TASK TABLET_SIZE TABLEGROUP_ID TENANT_ID THROTTLE TIME_ZONE_INFO TOP_K_FRE_HIST TIMES TRIM_SPACE TTL - TRANSFER TENANT_STS_CREDENTIAL + TRANSFER TUNNEL_ENDPOINT TENANT_STS_CREDENTIAL UNCOMMITTED UNCONDITIONAL UNDEFINED UNDO_BUFFER_SIZE UNDOFILE UNNEST UNICODE UNINSTALL UNIT UNIT_GROUP UNIT_NUM UNLOCKED UNTIL UNUSUAL UPGRADE USE_BLOOM_FILTER UNKNOWN USE_FRM USER USER_RESOURCES UNBOUNDED UP UNLIMITED USER_SPECIFIED @@ -8747,6 +8747,10 @@ TYPE COMP_EQ STRING_VALUE { malloc_non_terminal_node($$, result->malloc_pool_, T_ENDPOINT, 1, $3); } +| TUNNEL_ENDPOINT COMP_EQ STRING_VALUE +{ + malloc_non_terminal_node($$, result->malloc_pool_, T_TUNNEL_ENDPOINT, 1, $3); +} | PROJECT_NAME COMP_EQ STRING_VALUE { malloc_non_terminal_node($$, result->malloc_pool_, T_PROJECT, 1, $3); @@ -8767,7 +8771,12 @@ TYPE COMP_EQ STRING_VALUE | TABLE_NAME COMP_EQ STRING_VALUE { malloc_non_terminal_node($$, result->malloc_pool_, T_TABLE, 1, $3); -}; +} +| COLLECT_STATISTICS_ON_CREATE COMP_EQ BOOL_VALUE +{ + malloc_non_terminal_node($$, result->malloc_pool_, T_COLLECT_STATISTICS_ON_CREATE, 1, $3); +} +; compression_key: COMPRESSION_CODE @@ -24028,6 +24037,7 @@ ACCESS_INFO | CLUSTER_NAME | COALESCE | CODE +| COLLECT_STATISTICS_ON_CREATE | COLLATION | COLUMN_BLOOM_FILTER | COLUMN_FORMAT @@ -24650,6 +24660,7 @@ ACCESS_INFO | TRIM_SPACE | TRUNCATE | TTL +| TUNNEL_ENDPOINT | TYPE | TYPES | TABLEGROUP_ID diff --git a/src/sql/resolver/ddl/ob_create_table_resolver.cpp b/src/sql/resolver/ddl/ob_create_table_resolver.cpp index eacd066aa..979325f49 100644 --- a/src/sql/resolver/ddl/ob_create_table_resolver.cpp +++ b/src/sql/resolver/ddl/ob_create_table_resolver.cpp @@ -1068,7 +1068,12 @@ int ObCreateTableResolver::check_external_table_generated_partition_column_sanit { int ret = OB_SUCCESS; ObArray col_exprs; - if (OB_ISNULL(dependant_expr)) { + bool is_odps_external_table = false; + if (OB_FAIL(ObSQLUtils::is_odps_external_table(&table_schema, is_odps_external_table))) { + LOG_WARN("failed to check is odps external table or not", K(ret)); + } else if (is_odps_external_table) { + // do nothing + } else if (OB_ISNULL(dependant_expr)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("dependant expr is null", K(ret)); } else if (OB_FAIL(ObRawExprUtils::extract_column_exprs(dependant_expr, col_exprs, true/*extract pseudo column*/))) { @@ -1105,8 +1110,6 @@ int ObCreateTableResolver::check_external_table_generated_partition_column_sanit LOG_WARN("user specified partition col expr contains no external partition pseudo column is not supported", K(ret)); } } - } else if (table_schema.is_odps_external_table()) { - // lcqlog to do check } else { bool found = false; for (int64_t i = 0; OB_SUCC(ret) && i < col_exprs.count(); i++) { diff --git a/src/sql/resolver/ddl/ob_ddl_resolver.cpp b/src/sql/resolver/ddl/ob_ddl_resolver.cpp index 562e7e17a..779c5cecd 100644 --- a/src/sql/resolver/ddl/ob_ddl_resolver.cpp +++ b/src/sql/resolver/ddl/ob_ddl_resolver.cpp @@ -3089,6 +3089,7 @@ int ObDDLResolver::mask_properties_sensitive_info(const ParseNode *node, ObStrin } else { switch (node->type_) { case ObItemType::T_ENDPOINT: + case ObItemType::T_TUNNEL_ENDPOINT: case ObItemType::T_STSTOKEN: case ObItemType::T_ACCESSKEY: case ObItemType::T_ACCESSID: { diff --git a/src/sql/resolver/dml/ob_dml_resolver.cpp b/src/sql/resolver/dml/ob_dml_resolver.cpp index c43343791..01b4ab779 100755 --- a/src/sql/resolver/dml/ob_dml_resolver.cpp +++ b/src/sql/resolver/dml/ob_dml_resolver.cpp @@ -8684,10 +8684,10 @@ int ObDMLResolver::resolve_external_table_generated_column( } } else { ObExternalFileFormat format; - const ObString &format_or_properties = table_schema->get_external_file_format().empty() ? + const ObString &table_format_or_properties = table_schema->get_external_file_format().empty() ? table_schema->get_external_properties() : table_schema->get_external_file_format(); - if (OB_FAIL(format.load_from_string(format_or_properties, *params_.allocator_))) { + if (OB_FAIL(format.load_from_string(table_format_or_properties, *params_.allocator_))) { LOG_WARN("load from string failed", K(ret)); } else if (format.format_type_ == ObExternalFileFormat::ORC_FORMAT && lib::is_oracle_mode()) { ret = OB_NOT_SUPPORTED; diff --git a/src/sql/resolver/ob_resolver_utils.cpp b/src/sql/resolver/ob_resolver_utils.cpp index 7e18297d3..6dbfb6c6c 100644 --- a/src/sql/resolver/ob_resolver_utils.cpp +++ b/src/sql/resolver/ob_resolver_utils.cpp @@ -9698,6 +9698,14 @@ int ObResolverUtils::resolve_file_format(const ParseNode *node, ObExternalFileFo format.odps_format_.endpoint_ = ObString(node->children_[0]->str_len_, node->children_[0]->str_value_).trim_space_only(); break; } + case ObItemType::T_TUNNEL_ENDPOINT: { + format.odps_format_.tunnel_endpoint_ = ObString(node->children_[0]->str_len_, node->children_[0]->str_value_).trim_space_only(); + break; + } + case ObItemType::T_COLLECT_STATISTICS_ON_CREATE: { + format.odps_format_.collect_statistics_on_create_ = node->children_[0]->value_; + break; + } case T_PROJECT: { format.odps_format_.project_ = ObString(node->children_[0]->str_len_, node->children_[0]->str_value_).trim_space_only(); break; diff --git a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result index 4115e2db5..a36873efe 100644 --- a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result +++ b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result @@ -318,6 +318,7 @@ _datafile_usage_upper_bound_percentage _data_storage_io_timeout _delay_resource_recycle_after_correctness_issue _display_mysql_version +_dop_of_collect_external_table_statistics _enable_active_txn_transfer _enable_adaptive_auto_dop _enable_adaptive_compaction @@ -416,6 +417,7 @@ _ls_migration_wait_completing_timeout _max_elr_dependent_trx_count _max_ls_cnt_per_server _max_malloc_sample_interval +_max_partition_count_to_collect_statistic _max_rpc_packet_size _max_schema_slot_num _max_tablet_cnt_per_gb