Refine task assignments for external table
This commit is contained in:
@ -64,7 +64,8 @@ int ObExternalTableFilesKey::deep_copy(char *buf, const int64_t buf_len, ObIKVCa
|
||||
|
||||
int64_t ObExternalTableFiles::size() const
|
||||
{
|
||||
int64_t size = sizeof(*this) + sizeof(ObString) * file_urls_.count() + sizeof(int64_t) * file_ids_.count();
|
||||
int64_t size = sizeof(*this) + sizeof(ObString) * file_urls_.count()
|
||||
+ sizeof(int64_t) * file_ids_.count() + sizeof(int64_t) * file_sizes_.count();
|
||||
for (int i = 0; i < file_urls_.count(); ++i) {
|
||||
size += file_urls_.at(i).length();
|
||||
}
|
||||
@ -99,6 +100,15 @@ int ObExternalTableFiles::deep_copy(char *buf, const int64_t buf_len, ObIKVCache
|
||||
sizeof(int64_t) * this->file_ids_.count());
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && this->file_sizes_.count() > 0) {
|
||||
if (OB_FAIL(new_value->file_sizes_.allocate_array(allocator, this->file_sizes_.count()))) {
|
||||
LOG_WARN("fail to allocate array", K(ret));
|
||||
} else {
|
||||
MEMCPY(new_value->file_sizes_.get_data(), this->file_sizes_.get_data(),
|
||||
sizeof(int64_t) * this->file_sizes_.count());
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
new_value->create_ts_ = this->create_ts_;
|
||||
}
|
||||
@ -201,6 +211,7 @@ int ObExternalTableFileManager::get_external_files_by_part_id(
|
||||
ObExternalFileInfo file_info;
|
||||
ObString file_url = ext_files->file_urls_.at(i);
|
||||
file_info.file_id_ = ext_files->file_ids_.at(i);
|
||||
file_info.file_size_ = ext_files->file_sizes_.at(i);
|
||||
if (is_local_file_on_disk) {
|
||||
ObString ip_port = file_url.split_on('%');
|
||||
OZ (file_info.file_addr_.parse_from_string(ip_port));
|
||||
@ -431,7 +442,7 @@ int ObExternalTableFileManager::fill_cache_from_inner_table(
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
}
|
||||
|
||||
OZ (sql.append_fmt("SELECT file_url, file_id FROM %s"
|
||||
OZ (sql.append_fmt("SELECT file_url, file_id, file_size FROM %s"
|
||||
" WHERE table_id = %lu AND part_id = %lu"
|
||||
" AND create_version <=%ld AND %ld < delete_version",
|
||||
OB_ALL_EXTERNAL_TABLE_FILE_TNAME, key.table_id_, key.partition_id_,
|
||||
@ -445,16 +456,20 @@ int ObExternalTableFileManager::fill_cache_from_inner_table(
|
||||
} else {
|
||||
ObSEArray<ObString, 16> temp_file_urls;
|
||||
ObSEArray<int64_t, 16> temp_file_ids;
|
||||
ObSEArray<int64_t, 16> temp_file_sizes;
|
||||
ObArenaAllocator allocator;
|
||||
while (OB_SUCC(result->next())) {
|
||||
ObString file_url;
|
||||
ObString tmp_url;
|
||||
int64_t file_id = INT64_MAX;
|
||||
int64_t file_size = 0;
|
||||
EXTRACT_VARCHAR_FIELD_MYSQL(*result, "file_url", tmp_url);
|
||||
EXTRACT_INT_FIELD_MYSQL(*result, "file_id", file_id, int64_t);
|
||||
EXTRACT_INT_FIELD_MYSQL(*result, "file_size", file_size, int64_t);
|
||||
OZ (ob_write_string(allocator, tmp_url, file_url));
|
||||
OZ (temp_file_urls.push_back(file_url));
|
||||
OZ (temp_file_ids.push_back(file_id));
|
||||
OZ (temp_file_sizes.push_back(file_size));
|
||||
}
|
||||
if (OB_FAIL(ret) && OB_ITER_END != ret) {
|
||||
LOG_WARN("get next result failed", K(ret));
|
||||
@ -466,6 +481,7 @@ int ObExternalTableFileManager::fill_cache_from_inner_table(
|
||||
temp_ext_files.create_ts_ = cur_time;
|
||||
temp_ext_files.file_urls_ = ObArrayWrap<ObString>(temp_file_urls.get_data(), temp_file_urls.count());
|
||||
temp_ext_files.file_ids_ = ObArrayWrap<int64_t>(temp_file_ids.get_data(), temp_file_ids.count());
|
||||
temp_ext_files.file_sizes_ = ObArrayWrap<int64_t>(temp_file_sizes.get_data(), temp_file_sizes.count());
|
||||
OZ (kv_cache_.put_and_fetch(key, temp_ext_files, ext_files, handle, true));
|
||||
}
|
||||
}
|
||||
@ -507,7 +523,7 @@ int ObExternalTableFileManager::lock_for_refresh(
|
||||
return ret;
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObExternalFileInfo, file_url_, file_id_, file_addr_);
|
||||
OB_SERIALIZE_MEMBER(ObExternalFileInfo, file_url_, file_id_, file_addr_, file_size_);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -21,11 +21,12 @@ namespace oceanbase {
|
||||
namespace share {
|
||||
|
||||
struct ObExternalFileInfo {
|
||||
ObExternalFileInfo() : file_id_(INT64_MAX) {}
|
||||
ObExternalFileInfo() : file_id_(INT64_MAX), file_size_(0) {}
|
||||
common::ObString file_url_;
|
||||
int64_t file_id_;
|
||||
common::ObAddr file_addr_;
|
||||
TO_STRING_KV(K_(file_url), K_(file_id), K_(file_addr));
|
||||
int64_t file_size_;
|
||||
TO_STRING_KV(K_(file_url), K_(file_id), K_(file_addr), K_(file_size));
|
||||
OB_UNIS_VERSION(1);
|
||||
};
|
||||
|
||||
@ -67,6 +68,7 @@ public:
|
||||
public:
|
||||
ObArrayWrap<ObString> file_urls_;
|
||||
ObArrayWrap<int64_t> file_ids_;
|
||||
ObArrayWrap<int64_t> file_sizes_;
|
||||
int64_t create_ts_;
|
||||
};
|
||||
|
||||
|
||||
@ -365,5 +365,62 @@ int ObExternalTableUtils::filter_external_table_files(const ObString &pattern,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObExternalTableUtils::calc_assigned_files_to_sqcs(
|
||||
const ObIArray<ObExternalFileInfo> &files,
|
||||
ObIArray<int64_t> &assigned_idx,
|
||||
int64_t sqc_count)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
struct SqcFileSet {
|
||||
int64_t total_file_size_;
|
||||
int64_t sqc_idx_;
|
||||
TO_STRING_KV(K(total_file_size_), K(sqc_idx_));
|
||||
};
|
||||
|
||||
struct SqcFileSetCmp {
|
||||
bool operator()(const SqcFileSet &l, const SqcFileSet &r) {
|
||||
return l.total_file_size_ < r.total_file_size_;
|
||||
}
|
||||
int get_error_code() { return OB_SUCCESS; }
|
||||
};
|
||||
|
||||
struct FileInfoWithIdx {
|
||||
const ObExternalFileInfo *file_info_;
|
||||
int64_t file_idx_;
|
||||
TO_STRING_KV(K(file_idx_));
|
||||
};
|
||||
SqcFileSetCmp temp_cmp;
|
||||
ObBinaryHeap<SqcFileSet, SqcFileSetCmp> heap(temp_cmp);
|
||||
ObArray<FileInfoWithIdx> sorted_files;
|
||||
OZ (sorted_files.reserve(files.count()));
|
||||
OZ (assigned_idx.prepare_allocate(files.count()));
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < files.count(); i++) {
|
||||
FileInfoWithIdx file_info;
|
||||
file_info.file_info_ = &(files.at(i));
|
||||
file_info.file_idx_ = i;
|
||||
OZ (sorted_files.push_back(file_info));
|
||||
}
|
||||
std::sort(sorted_files.begin(), sorted_files.end(),
|
||||
[](const FileInfoWithIdx &l, const FileInfoWithIdx &r) -> bool {
|
||||
return l.file_info_->file_size_ >= r.file_info_->file_size_; });
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < sqc_count; i++) {
|
||||
SqcFileSet new_set;
|
||||
new_set.total_file_size_ = sorted_files.at(i).file_info_->file_size_;
|
||||
new_set.sqc_idx_ = i;
|
||||
OZ (heap.push(new_set));
|
||||
assigned_idx.at(sorted_files.at(i).file_idx_) = i;
|
||||
}
|
||||
for (int64_t i = sqc_count; OB_SUCC(ret) && i < sorted_files.count(); i++) {
|
||||
//assign file to set with the minimum total file size
|
||||
SqcFileSet cur_min_set = heap.top();
|
||||
cur_min_set.total_file_size_ += sorted_files.at(i).file_info_->file_size_;
|
||||
assigned_idx.at(sorted_files.at(i).file_idx_) = cur_min_set.sqc_idx_;
|
||||
OZ (heap.pop());
|
||||
OZ (heap.push(cur_min_set));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // namespace share
|
||||
} // namespace oceanbase
|
||||
|
||||
@ -81,6 +81,10 @@ class ObExternalTableUtils {
|
||||
static int filter_external_table_files(const common::ObString &pattern,
|
||||
sql::ObExecContext &exec_ctx,
|
||||
common::ObIArray<common::ObString> &file_urls);
|
||||
static int calc_assigned_files_to_sqcs(
|
||||
const common::ObIArray<ObExternalFileInfo> &files,
|
||||
common::ObIArray<int64_t> &assigned_idx,
|
||||
int64_t sqc_count);
|
||||
private:
|
||||
static bool is_left_edge(const common::ObObj &value);
|
||||
static bool is_right_edge(const common::ObObj &value);
|
||||
|
||||
Reference in New Issue
Block a user