diff --git a/src/rootserver/ob_root_service.cpp b/src/rootserver/ob_root_service.cpp index 69cd9a5797..28f0134207 100755 --- a/src/rootserver/ob_root_service.cpp +++ b/src/rootserver/ob_root_service.cpp @@ -3120,7 +3120,7 @@ int ObRootService::create_table(const ObCreateTableArg &arg, ObCreateTableRes &r } } RS_TRACE(generate_schema_lob); - if (OB_FAIL(ret) || table_schema.is_view_table()) { + if (OB_FAIL(ret) || table_schema.is_view_table() || table_schema.is_external_table()) { // do nothing } else if (OB_FAIL(ddl_service_.build_aux_lob_table_schema_if_need(table_schema, table_schemas))) { LOG_WARN("fail to build_aux_lob_table_schema_if_need", K(ret), K(table_schema)); diff --git a/src/share/external_table/ob_external_table_file_mgr.cpp b/src/share/external_table/ob_external_table_file_mgr.cpp index 4aa73322bc..5daeab97c5 100644 --- a/src/share/external_table/ob_external_table_file_mgr.cpp +++ b/src/share/external_table/ob_external_table_file_mgr.cpp @@ -64,7 +64,8 @@ int ObExternalTableFilesKey::deep_copy(char *buf, const int64_t buf_len, ObIKVCa int64_t ObExternalTableFiles::size() const { - int64_t size = sizeof(*this) + sizeof(ObString) * file_urls_.count() + sizeof(int64_t) * file_ids_.count(); + int64_t size = sizeof(*this) + sizeof(ObString) * file_urls_.count() + + sizeof(int64_t) * file_ids_.count() + sizeof(int64_t) * file_sizes_.count(); for (int i = 0; i < file_urls_.count(); ++i) { size += file_urls_.at(i).length(); } @@ -99,6 +100,15 @@ int ObExternalTableFiles::deep_copy(char *buf, const int64_t buf_len, ObIKVCache sizeof(int64_t) * this->file_ids_.count()); } } + + if (OB_SUCC(ret) && this->file_sizes_.count() > 0) { + if (OB_FAIL(new_value->file_sizes_.allocate_array(allocator, this->file_sizes_.count()))) { + LOG_WARN("fail to allocate array", K(ret)); + } else { + MEMCPY(new_value->file_sizes_.get_data(), this->file_sizes_.get_data(), + sizeof(int64_t) * this->file_sizes_.count()); + } + } if (OB_SUCC(ret)) { new_value->create_ts_ = this->create_ts_; } @@ -201,6 +211,7 @@ int ObExternalTableFileManager::get_external_files_by_part_id( ObExternalFileInfo file_info; ObString file_url = ext_files->file_urls_.at(i); file_info.file_id_ = ext_files->file_ids_.at(i); + file_info.file_size_ = ext_files->file_sizes_.at(i); if (is_local_file_on_disk) { ObString ip_port = file_url.split_on('%'); OZ (file_info.file_addr_.parse_from_string(ip_port)); @@ -431,7 +442,7 @@ int ObExternalTableFileManager::fill_cache_from_inner_table( ret = OB_ERR_UNEXPECTED; } - OZ (sql.append_fmt("SELECT file_url, file_id FROM %s" + OZ (sql.append_fmt("SELECT file_url, file_id, file_size FROM %s" " WHERE table_id = %lu AND part_id = %lu" " AND create_version <=%ld AND %ld < delete_version", OB_ALL_EXTERNAL_TABLE_FILE_TNAME, key.table_id_, key.partition_id_, @@ -445,16 +456,20 @@ int ObExternalTableFileManager::fill_cache_from_inner_table( } else { ObSEArray temp_file_urls; ObSEArray temp_file_ids; + ObSEArray temp_file_sizes; ObArenaAllocator allocator; while (OB_SUCC(result->next())) { ObString file_url; ObString tmp_url; int64_t file_id = INT64_MAX; + int64_t file_size = 0; EXTRACT_VARCHAR_FIELD_MYSQL(*result, "file_url", tmp_url); EXTRACT_INT_FIELD_MYSQL(*result, "file_id", file_id, int64_t); + EXTRACT_INT_FIELD_MYSQL(*result, "file_size", file_size, int64_t); OZ (ob_write_string(allocator, tmp_url, file_url)); OZ (temp_file_urls.push_back(file_url)); OZ (temp_file_ids.push_back(file_id)); + OZ (temp_file_sizes.push_back(file_size)); } if (OB_FAIL(ret) && OB_ITER_END != ret) { LOG_WARN("get next result failed", K(ret)); @@ -466,6 +481,7 @@ int ObExternalTableFileManager::fill_cache_from_inner_table( temp_ext_files.create_ts_ = cur_time; temp_ext_files.file_urls_ = ObArrayWrap(temp_file_urls.get_data(), temp_file_urls.count()); temp_ext_files.file_ids_ = ObArrayWrap(temp_file_ids.get_data(), temp_file_ids.count()); + temp_ext_files.file_sizes_ = ObArrayWrap(temp_file_sizes.get_data(), temp_file_sizes.count()); OZ (kv_cache_.put_and_fetch(key, temp_ext_files, ext_files, handle, true)); } } @@ -507,7 +523,7 @@ int ObExternalTableFileManager::lock_for_refresh( return ret; } -OB_SERIALIZE_MEMBER(ObExternalFileInfo, file_url_, file_id_, file_addr_); +OB_SERIALIZE_MEMBER(ObExternalFileInfo, file_url_, file_id_, file_addr_, file_size_); } } diff --git a/src/share/external_table/ob_external_table_file_mgr.h b/src/share/external_table/ob_external_table_file_mgr.h index 2e8922e6a8..1897548611 100644 --- a/src/share/external_table/ob_external_table_file_mgr.h +++ b/src/share/external_table/ob_external_table_file_mgr.h @@ -21,11 +21,12 @@ namespace oceanbase { namespace share { struct ObExternalFileInfo { - ObExternalFileInfo() : file_id_(INT64_MAX) {} + ObExternalFileInfo() : file_id_(INT64_MAX), file_size_(0) {} common::ObString file_url_; int64_t file_id_; common::ObAddr file_addr_; - TO_STRING_KV(K_(file_url), K_(file_id), K_(file_addr)); + int64_t file_size_; + TO_STRING_KV(K_(file_url), K_(file_id), K_(file_addr), K_(file_size)); OB_UNIS_VERSION(1); }; @@ -67,6 +68,7 @@ public: public: ObArrayWrap file_urls_; ObArrayWrap file_ids_; + ObArrayWrap file_sizes_; int64_t create_ts_; }; diff --git a/src/share/external_table/ob_external_table_utils.cpp b/src/share/external_table/ob_external_table_utils.cpp index 3c9479845a..32ffbc1def 100644 --- a/src/share/external_table/ob_external_table_utils.cpp +++ b/src/share/external_table/ob_external_table_utils.cpp @@ -365,5 +365,62 @@ int ObExternalTableUtils::filter_external_table_files(const ObString &pattern, return ret; } +int ObExternalTableUtils::calc_assigned_files_to_sqcs( + const ObIArray &files, + ObIArray &assigned_idx, + int64_t sqc_count) +{ + int ret = OB_SUCCESS; + + struct SqcFileSet { + int64_t total_file_size_; + int64_t sqc_idx_; + TO_STRING_KV(K(total_file_size_), K(sqc_idx_)); + }; + + struct SqcFileSetCmp { + bool operator()(const SqcFileSet &l, const SqcFileSet &r) { + return l.total_file_size_ < r.total_file_size_; + } + int get_error_code() { return OB_SUCCESS; } + }; + + struct FileInfoWithIdx { + const ObExternalFileInfo *file_info_; + int64_t file_idx_; + TO_STRING_KV(K(file_idx_)); + }; + SqcFileSetCmp temp_cmp; + ObBinaryHeap heap(temp_cmp); + ObArray sorted_files; + OZ (sorted_files.reserve(files.count())); + OZ (assigned_idx.prepare_allocate(files.count())); + for (int64_t i = 0; OB_SUCC(ret) && i < files.count(); i++) { + FileInfoWithIdx file_info; + file_info.file_info_ = &(files.at(i)); + file_info.file_idx_ = i; + OZ (sorted_files.push_back(file_info)); + } + std::sort(sorted_files.begin(), sorted_files.end(), + [](const FileInfoWithIdx &l, const FileInfoWithIdx &r) -> bool { + return l.file_info_->file_size_ >= r.file_info_->file_size_; }); + for (int64_t i = 0; OB_SUCC(ret) && i < sqc_count; i++) { + SqcFileSet new_set; + new_set.total_file_size_ = sorted_files.at(i).file_info_->file_size_; + new_set.sqc_idx_ = i; + OZ (heap.push(new_set)); + assigned_idx.at(sorted_files.at(i).file_idx_) = i; + } + for (int64_t i = sqc_count; OB_SUCC(ret) && i < sorted_files.count(); i++) { + //assign file to set with the minimum total file size + SqcFileSet cur_min_set = heap.top(); + cur_min_set.total_file_size_ += sorted_files.at(i).file_info_->file_size_; + assigned_idx.at(sorted_files.at(i).file_idx_) = cur_min_set.sqc_idx_; + OZ (heap.pop()); + OZ (heap.push(cur_min_set)); + } + return ret; +} + } // namespace share } // namespace oceanbase diff --git a/src/share/external_table/ob_external_table_utils.h b/src/share/external_table/ob_external_table_utils.h index 43ae44bfee..0be33245c5 100644 --- a/src/share/external_table/ob_external_table_utils.h +++ b/src/share/external_table/ob_external_table_utils.h @@ -81,6 +81,10 @@ class ObExternalTableUtils { static int filter_external_table_files(const common::ObString &pattern, sql::ObExecContext &exec_ctx, common::ObIArray &file_urls); + static int calc_assigned_files_to_sqcs( + const common::ObIArray &files, + common::ObIArray &assigned_idx, + int64_t sqc_count); private: static bool is_left_edge(const common::ObObj &value); static bool is_right_edge(const common::ObObj &value); diff --git a/src/sql/engine/px/ob_px_util.cpp b/src/sql/engine/px/ob_px_util.cpp index 018a376eea..7d235c9365 100644 --- a/src/sql/engine/px/ob_px_util.cpp +++ b/src/sql/engine/px/ob_px_util.cpp @@ -244,12 +244,19 @@ int ObPXServerAddrUtil::assign_external_files_to_sqc( } } } else { - const int64_t file_count = files.count(); - int64_t files_per_sqc = (file_count - 1) / sqcs.count() + 1; - - for (int64_t i = 0; OB_SUCC(ret) && i < sqcs.count(); ++i) { - for (int j = i * files_per_sqc; OB_SUCC(ret) && j < std::min((i + 1) * files_per_sqc, file_count); j++) { - OZ (sqcs.at(i)->get_access_external_table_files().push_back(files.at(j))); + ObArray file_assigned_sqc_ids; + OZ (ObExternalTableUtils::calc_assigned_files_to_sqcs(files, file_assigned_sqc_ids, sqcs.count())); + if (OB_SUCC(ret) && file_assigned_sqc_ids.count() != files.count()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid result of assigned sqc", K(file_assigned_sqc_ids.count()), K(files.count())); + } + for (int i = 0; OB_SUCC(ret) && i < file_assigned_sqc_ids.count(); i++) { + int64_t assign_sqc_idx = file_assigned_sqc_ids.at(i); + if (OB_UNLIKELY(assign_sqc_idx >= sqcs.count() || assign_sqc_idx < 0)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected file idx", K(file_assigned_sqc_ids.at(i))); + } else { + OZ (sqcs.at(assign_sqc_idx)->get_access_external_table_files().push_back(files.at(i))); } } } diff --git a/src/sql/resolver/ddl/ob_create_table_resolver.cpp b/src/sql/resolver/ddl/ob_create_table_resolver.cpp index e665b2069e..f7ef0dde4f 100644 --- a/src/sql/resolver/ddl/ob_create_table_resolver.cpp +++ b/src/sql/resolver/ddl/ob_create_table_resolver.cpp @@ -2384,9 +2384,10 @@ int ObCreateTableResolver::set_table_option_to_schema(ObTableSchema &table_schem if (OB_SUCC(ret) && table_schema.is_external_table()) { if (table_schema.get_external_file_format().empty() - || table_schema.get_external_file_location().empty()) + || table_schema.get_external_file_location().empty()) { ret = OB_NOT_SUPPORTED; LOG_USER_ERROR(OB_NOT_SUPPORTED, "Default format or location option for external table"); + } } } return ret;