[chore](be) Acquire and check MD5 digest of the file to download (#37418)
Cherry-pick #35807, #36621, #36726
This commit is contained in:
@@ -417,8 +417,8 @@ Status SnapshotLoader::remote_http_download(
|
||||
// Step before, validate all remote
|
||||
|
||||
// Step 1: Validate local tablet snapshot paths
|
||||
for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
|
||||
auto& path = remote_tablet_snapshot.local_snapshot_path;
|
||||
for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
|
||||
const auto& path = remote_tablet_snapshot.local_snapshot_path;
|
||||
bool res = true;
|
||||
RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(path, &res));
|
||||
if (!res) {
|
||||
@@ -433,10 +433,10 @@ Status SnapshotLoader::remote_http_download(
|
||||
// Step 2: get all local files
|
||||
struct LocalFileStat {
|
||||
uint64_t size;
|
||||
// TODO(Drogon): add md5sum
|
||||
std::string md5;
|
||||
};
|
||||
std::unordered_map<std::string, std::unordered_map<std::string, LocalFileStat>> local_files_map;
|
||||
for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
|
||||
for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
|
||||
const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
|
||||
std::vector<std::string> local_files;
|
||||
RETURN_IF_ERROR(_get_existing_files_from_local(local_path, &local_files));
|
||||
@@ -452,7 +452,14 @@ Status SnapshotLoader::remote_http_download(
|
||||
return Status::IOError("can't retrive file_size of {}, due to {}", local_file_path,
|
||||
ec.message());
|
||||
}
|
||||
local_filestat[local_file] = {local_file_size};
|
||||
std::string md5;
|
||||
auto status = io::global_local_filesystem()->md5sum(local_file_path, &md5);
|
||||
if (!status.ok()) {
|
||||
LOG(WARNING) << "download file error, local file " << local_file_path
|
||||
<< " md5sum: " << status.to_string();
|
||||
return status;
|
||||
}
|
||||
local_filestat[local_file] = {local_file_size, md5};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -465,22 +472,22 @@ Status SnapshotLoader::remote_http_download(
|
||||
int total_num = remote_tablet_snapshots.size();
|
||||
int finished_num = 0;
|
||||
struct RemoteFileStat {
|
||||
// TODO(Drogon): Add md5sum
|
||||
std::string url;
|
||||
std::string md5;
|
||||
uint64_t size;
|
||||
};
|
||||
std::unordered_map<std::string, std::unordered_map<std::string, RemoteFileStat>>
|
||||
remote_files_map;
|
||||
for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
|
||||
for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
|
||||
const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
|
||||
auto& remote_files = remote_files_map[remote_path];
|
||||
const auto& token = remote_tablet_snapshot.remote_token;
|
||||
const auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr;
|
||||
|
||||
// HEAD http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180/
|
||||
std::string remote_url_prefix =
|
||||
fmt::format("http://{}:{}/api/_tablet/_download?token={}&file={}",
|
||||
remote_be_addr.hostname, remote_be_addr.port, token, remote_path);
|
||||
std::string base_url = fmt::format("http://{}:{}/api/_tablet/_download?token={}",
|
||||
remote_be_addr.hostname, remote_be_addr.port, token);
|
||||
std::string remote_url_prefix = fmt::format("{}&file={}", base_url, remote_path);
|
||||
|
||||
string file_list_str;
|
||||
auto list_files_cb = [&remote_url_prefix, &file_list_str](HttpClient* client) {
|
||||
@@ -493,30 +500,31 @@ Status SnapshotLoader::remote_http_download(
|
||||
strings::Split(file_list_str, "\n", strings::SkipWhitespace());
|
||||
|
||||
for (const auto& filename : filename_list) {
|
||||
std::string remote_file_url = fmt::format(
|
||||
"http://{}:{}/api/_tablet/_download?token={}&file={}/{}&channel=ingest_binlog",
|
||||
remote_tablet_snapshot.remote_be_addr.hostname,
|
||||
remote_tablet_snapshot.remote_be_addr.port, remote_tablet_snapshot.remote_token,
|
||||
remote_tablet_snapshot.remote_snapshot_path, filename);
|
||||
std::string remote_file_url =
|
||||
fmt::format("{}&file={}/{}&channel=ingest_binlog", base_url,
|
||||
remote_tablet_snapshot.remote_snapshot_path, filename);
|
||||
|
||||
// get file length
|
||||
uint64_t file_size = 0;
|
||||
auto get_file_size_cb = [&remote_file_url, &file_size](HttpClient* client) {
|
||||
RETURN_IF_ERROR(client->init(remote_file_url));
|
||||
std::string file_md5;
|
||||
auto get_file_stat_cb = [&remote_file_url, &file_size, &file_md5](HttpClient* client) {
|
||||
std::string url = fmt::format("{}&acquire_md5=true", remote_file_url);
|
||||
RETURN_IF_ERROR(client->init(url));
|
||||
client->set_timeout_ms(kGetLengthTimeout * 1000);
|
||||
RETURN_IF_ERROR(client->head());
|
||||
RETURN_IF_ERROR(client->get_content_length(&file_size));
|
||||
RETURN_IF_ERROR(client->get_content_md5(&file_md5));
|
||||
return Status::OK();
|
||||
};
|
||||
RETURN_IF_ERROR(
|
||||
HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, get_file_size_cb));
|
||||
HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, get_file_stat_cb));
|
||||
|
||||
remote_files[filename] = RemoteFileStat {remote_file_url, file_size};
|
||||
remote_files[filename] = RemoteFileStat {remote_file_url, file_md5, file_size};
|
||||
}
|
||||
}
|
||||
|
||||
// Step 4: Compare local and remote files && get all need download files
|
||||
for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
|
||||
for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
|
||||
RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num,
|
||||
TTaskType::type::DOWNLOAD));
|
||||
|
||||
@@ -529,8 +537,8 @@ Status SnapshotLoader::remote_http_download(
|
||||
// get all need download files
|
||||
std::vector<std::string> need_download_files;
|
||||
for (const auto& [remote_file, remote_filestat] : remote_files) {
|
||||
LOG(INFO) << fmt::format("remote file: {}, size: {}", remote_file,
|
||||
remote_filestat.size);
|
||||
LOG(INFO) << "remote file: " << remote_file << ", size: " << remote_filestat.size
|
||||
<< ", md5: " << remote_filestat.md5;
|
||||
auto it = local_files.find(remote_file);
|
||||
if (it == local_files.end()) {
|
||||
need_download_files.emplace_back(remote_file);
|
||||
@@ -545,7 +553,11 @@ Status SnapshotLoader::remote_http_download(
|
||||
need_download_files.emplace_back(remote_file);
|
||||
continue;
|
||||
}
|
||||
// TODO(Drogon): check by md5sum, if not match then download
|
||||
|
||||
if (auto& local_filestat = it->second; local_filestat.md5 != remote_filestat.md5) {
|
||||
need_download_files.emplace_back(remote_file);
|
||||
continue;
|
||||
}
|
||||
|
||||
LOG(INFO) << fmt::format("file {} already exists, skip download", remote_file);
|
||||
}
|
||||
@@ -569,6 +581,7 @@ Status SnapshotLoader::remote_http_download(
|
||||
auto& remote_filestat = remote_files[filename];
|
||||
auto file_size = remote_filestat.size;
|
||||
auto& remote_file_url = remote_filestat.url;
|
||||
auto& remote_file_md5 = remote_filestat.md5;
|
||||
|
||||
// check disk capacity
|
||||
if (data_dir->reach_capacity_limit(file_size)) {
|
||||
@@ -591,8 +604,8 @@ Status SnapshotLoader::remote_http_download(
|
||||
<< " to: " << local_file_path << ". size(B): " << file_size
|
||||
<< ", timeout(s): " << estimate_timeout;
|
||||
|
||||
auto download_cb = [&remote_file_url, estimate_timeout, &local_file_path,
|
||||
file_size](HttpClient* client) {
|
||||
auto download_cb = [&remote_file_url, &remote_file_md5, estimate_timeout,
|
||||
&local_file_path, file_size](HttpClient* client) {
|
||||
RETURN_IF_ERROR(client->init(remote_file_url));
|
||||
client->set_timeout_ms(estimate_timeout * 1000);
|
||||
RETURN_IF_ERROR(client->download(local_file_path));
|
||||
@@ -612,13 +625,35 @@ Status SnapshotLoader::remote_http_download(
|
||||
<< ", local_file_size=" << local_file_size;
|
||||
return Status::InternalError("downloaded file size is not equal");
|
||||
}
|
||||
|
||||
if (!remote_file_md5.empty()) { // keep compatibility
|
||||
std::string local_file_md5;
|
||||
RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_file_path,
|
||||
&local_file_md5));
|
||||
if (local_file_md5 != remote_file_md5) {
|
||||
LOG(WARNING) << "download file md5 error"
|
||||
<< ", remote_file_url=" << remote_file_url
|
||||
<< ", local_file_path=" << local_file_path
|
||||
<< ", remote_file_md5=" << remote_file_md5
|
||||
<< ", local_file_md5=" << local_file_md5;
|
||||
return Status::RuntimeError(
|
||||
"download file {} md5 is not equal, local={}, remote={}",
|
||||
remote_file_url, local_file_md5, remote_file_md5);
|
||||
}
|
||||
}
|
||||
|
||||
return io::global_local_filesystem()->permission(
|
||||
local_file_path, io::LocalFileSystem::PERMS_OWNER_RW);
|
||||
};
|
||||
RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, download_cb));
|
||||
auto status = HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, download_cb);
|
||||
if (!status.ok()) {
|
||||
LOG(WARNING) << "failed to download file from " << remote_file_url
|
||||
<< ", status: " << status.to_string();
|
||||
return status;
|
||||
}
|
||||
|
||||
// local_files always keep the updated local files
|
||||
local_files[filename] = LocalFileStat {file_size};
|
||||
local_files[filename] = LocalFileStat {file_size, remote_file_md5};
|
||||
}
|
||||
|
||||
uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000;
|
||||
|
||||
Reference in New Issue
Block a user