[feature] support backup/restore connect to HDFS (#10081)

xiepengcheng01
2022-06-19 10:26:20 +08:00
committed by GitHub
parent 0e404edf54
commit 1d3496c6ab
29 changed files with 1366 additions and 101 deletions

View File

@ -1266,10 +1266,10 @@ void TaskWorkerPool::_upload_worker_thread_callback() {
std::map<int64_t, std::vector<std::string>> tablet_files;
std::unique_ptr<SnapshotLoader> loader = nullptr;
if (upload_request.__isset.storage_backend &&
upload_request.storage_backend == TStorageBackendType::S3) {
if (upload_request.__isset.storage_backend) {
loader.reset(new SnapshotLoader(_env, upload_request.job_id, agent_task_req.signature,
upload_request.broker_prop));
upload_request.broker_prop,
upload_request.storage_backend));
} else {
loader.reset(new SnapshotLoader(_env, upload_request.job_id, agent_task_req.signature,
upload_request.broker_addr,
@ -1333,10 +1333,10 @@ void TaskWorkerPool::_download_worker_thread_callback() {
std::vector<int64_t> downloaded_tablet_ids;
std::unique_ptr<SnapshotLoader> loader = nullptr;
if (download_request.__isset.storage_backend &&
download_request.storage_backend == TStorageBackendType::S3) {
if (download_request.__isset.storage_backend) {
loader.reset(new SnapshotLoader(_env, download_request.job_id, agent_task_req.signature,
download_request.broker_prop));
download_request.broker_prop,
download_request.storage_backend));
} else {
loader.reset(new SnapshotLoader(_env, download_request.job_id, agent_task_req.signature,
download_request.broker_addr,

View File

@ -23,12 +23,11 @@
#include "agent/utils.h"
#include "common/logging.h"
#include "util/string_util.h"
#include "util/uid_util.h"
#include "util/url_coding.h"
namespace doris {
const std::string TICKET_CACHE_PATH = "/tmp/krb5cc_doris_";
Status HDFSCommonBuilder::run_kinit() {
if (hdfs_kerberos_principal.empty() || hdfs_kerberos_keytab.empty()) {
return Status::InvalidArgument("Invalid hdfs_kerberos_principal or hdfs_kerberos_keytab");
@ -48,6 +47,35 @@ Status HDFSCommonBuilder::run_kinit() {
return Status::OK();
}
THdfsParams parse_properties(const std::map<std::string, std::string>& properties) {
StringCaseMap<std::string> prop(properties.begin(), properties.end());
std::vector<THdfsConf> hdfs_configs;
THdfsParams hdfsParams;
for (auto iter = prop.begin(); iter != prop.end();) {
if (iter->first.compare(FS_KEY) == 0) {
hdfsParams.__set_fs_name(iter->second);
iter = prop.erase(iter);
} else if (iter->first.compare(USER) == 0) {
hdfsParams.__set_user(iter->second);
iter = prop.erase(iter);
} else if (iter->first.compare(KERBEROS_PRINCIPAL) == 0) {
hdfsParams.__set_hdfs_kerberos_principal(iter->second);
iter = prop.erase(iter);
} else if (iter->first.compare(KERBEROS_KEYTAB) == 0) {
hdfsParams.__set_hdfs_kerberos_keytab(iter->second);
iter = prop.erase(iter);
} else {
THdfsConf item;
item.key = iter->first;
item.value = iter->second;
hdfs_configs.push_back(item);
iter = prop.erase(iter);
}
}
hdfsParams.__set_hdfs_conf(hdfs_configs);
return hdfsParams;
}
HDFSCommonBuilder createHDFSBuilder(const THdfsParams& hdfsParams) {
HDFSCommonBuilder builder;
hdfsBuilderSetNameNode(builder.get(), hdfsParams.fs_name.c_str());
@ -75,4 +103,9 @@ HDFSCommonBuilder createHDFSBuilder(const THdfsParams& hdfsParams) {
return builder;
}
HDFSCommonBuilder createHDFSBuilder(const std::map<std::string, std::string>& properties) {
THdfsParams hdfsParams = parse_properties(properties);
return createHDFSBuilder(hdfsParams);
}
} // namespace doris
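The new `createHDFSBuilder` overload accepts the same raw property map that a repository supplies (`fs.defaultFS`, `hadoop.username`, Kerberos keys) instead of a pre-built `THdfsParams`. Below is a minimal sketch of that flow, assuming the BE headers shown in this diff; the extra `dfs.nameservices` key and the helper function name are illustrative only:

```cpp
#include <map>
#include <string>

#include "io/hdfs_builder.h" // FS_KEY, USER, parse_properties, createHDFSBuilder

namespace doris {

// Sketch only: turn a repository property map into HDFS connection settings.
HDFSCommonBuilder builder_from_repo_properties() {
    std::map<std::string, std::string> props = {
            {FS_KEY, "hdfs://hadoop-name-node:54310"}, // fs.defaultFS
            {USER, "user"},                            // hadoop.username
            {"dfs.nameservices", "my-nameservice"},    // illustrative extra key
    };
    // parse_properties() matches keys case-insensitively: known keys become
    // THdfsParams fields, everything else is kept as generic THdfsConf entries
    // and later applied to the hdfsBuilder unchanged.
    THdfsParams params = parse_properties(props);
    return createHDFSBuilder(params);
}

} // namespace doris
```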

View File

@ -24,8 +24,16 @@
namespace doris {
const std::string FS_KEY = "fs.defaultFS";
const std::string USER = "hadoop.username";
const std::string KERBEROS_PRINCIPAL = "hadoop.kerberos.principal";
const std::string KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
const std::string TICKET_CACHE_PATH = "/tmp/krb5cc_doris_";
class HDFSCommonBuilder {
friend HDFSCommonBuilder createHDFSBuilder(const THdfsParams& hdfsParams);
friend HDFSCommonBuilder createHDFSBuilder(
const std::map<std::string, std::string>& properties);
public:
HDFSCommonBuilder() : hdfs_builder(hdfsNewBuilder()) {};
@ -42,6 +50,9 @@ private:
std::string hdfs_kerberos_principal;
};
THdfsParams parse_properties(const std::map<std::string, std::string>& properties);
HDFSCommonBuilder createHDFSBuilder(const THdfsParams& hdfsParams);
HDFSCommonBuilder createHDFSBuilder(const std::map<std::string, std::string>& properties);
} // namespace doris

View File

@ -35,10 +35,28 @@ HdfsFileReader::HdfsFileReader(const THdfsParams& hdfs_params, const std::string
_namenode = _hdfs_params.fs_name;
}
HdfsFileReader::HdfsFileReader(const std::map<std::string, std::string>& properties,
const std::string& path, int64_t start_offset)
: _path(path),
_current_offset(start_offset),
_file_size(-1),
_hdfs_fs(nullptr),
_hdfs_file(nullptr),
_builder(createHDFSBuilder(properties)) {
_parse_properties(properties);
}
HdfsFileReader::~HdfsFileReader() {
close();
}
void HdfsFileReader::_parse_properties(const std::map<std::string, std::string>& prop) {
auto iter = prop.find(FS_KEY);
if (iter != prop.end()) {
_namenode = iter->second;
}
}
Status HdfsFileReader::connect() {
if (_builder.is_need_kinit()) {
RETURN_IF_ERROR(_builder.run_kinit());
@ -54,6 +72,16 @@ Status HdfsFileReader::connect() {
}
Status HdfsFileReader::open() {
if (_namenode.empty()) {
LOG(WARNING) << "hdfs properties is incorrect.";
return Status::InternalError("hdfs properties is incorrect");
}
// if the format of _path is hdfs://ip:port/path, replace it to /path.
// path like hdfs://ip:port/path can't be used by libhdfs3.
if (_path.find(_namenode) != _path.npos) {
_path = _path.substr(_namenode.size());
}
if (!closed()) {
close();
}
@ -62,12 +90,11 @@ Status HdfsFileReader::open() {
if (_hdfs_file == nullptr) {
std::stringstream ss;
ss << "open file failed. "
<< "(BE: " << BackendOptions::get_localhost() << ")" << _namenode << _path
<< ", err: " << strerror(errno);
;
<< "(BE: " << BackendOptions::get_localhost() << ")"
<< " namenode:" << _namenode << ", path:" << _path << ", err: " << strerror(errno);
return Status::InternalError(ss.str());
}
LOG(INFO) << "open file. " << _namenode << _path;
LOG(INFO) << "open file, namenode:" << _namenode << ", path:" << _path;
return seek(_current_offset);
}
@ -91,7 +118,7 @@ void HdfsFileReader::close() {
}
bool HdfsFileReader::closed() {
return _hdfs_file == nullptr || _hdfs_fs == nullptr;
return _hdfs_file == nullptr && _hdfs_fs == nullptr;
}
// Read all bytes
@ -102,7 +129,7 @@ Status HdfsFileReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t
*length = 0;
return Status::OK();
}
bool eof;
bool eof = false;
buf->reset(new uint8_t[file_size]);
read(buf->get(), file_size, length, &eof);
return Status::OK();
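A minimal usage sketch for the new properties-based `HdfsFileReader` constructor, assuming the BE headers in this diff; note that `open()` now strips a leading `hdfs://namenode` prefix from the path before handing it to libhdfs3, so the caller can pass the full URI:

```cpp
#include <map>
#include <string>
#include <vector>

#include "common/status.h"
#include "io/hdfs_file_reader.h"

namespace doris {

// Sketch only: read a whole HDFS file into memory.
Status read_remote_file(const std::map<std::string, std::string>& props,
                        const std::string& remote_path, std::vector<uint8_t>* out) {
    HdfsFileReader reader(props, remote_path, /*start_offset=*/0);
    RETURN_IF_ERROR(reader.open()); // connects, then seeks to start_offset

    bool eof = false;
    uint8_t buf[1024 * 1024];
    while (!eof) {
        int64_t read_len = 0;
        RETURN_IF_ERROR(reader.read(buf, sizeof(buf), &read_len, &eof));
        if (read_len > 0) {
            out->insert(out->end(), buf, buf + read_len);
        }
    }
    reader.close();
    return Status::OK();
}

} // namespace doris
```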

View File

@ -26,6 +26,8 @@ namespace doris {
class HdfsFileReader : public FileReader {
public:
HdfsFileReader(const THdfsParams& hdfs_params, const std::string& path, int64_t start_offset);
HdfsFileReader(const std::map<std::string, std::string>& properties, const std::string& path,
int64_t start_offset);
virtual ~HdfsFileReader();
virtual Status open() override;
@ -46,11 +48,12 @@ public:
private:
Status connect();
void _parse_properties(const std::map<std::string, std::string>& prop);
private:
THdfsParams _hdfs_params;
std::string _namenode;
std::string _path;
std::string _namenode = "";
std::string _path = "";
int64_t _current_offset;
int64_t _file_size;
hdfsFS _hdfs_fs;

View File

@ -28,7 +28,14 @@ Status HdfsReaderWriter::create_reader(const THdfsParams& hdfs_params, const std
return Status::OK();
}
Status HdfsReaderWriter::create_writer(std::map<std::string, std::string>& properties,
Status HdfsReaderWriter::create_reader(const std::map<std::string, std::string>& hdfs_params,
const std::string& path, int64_t start_offset,
FileReader** reader) {
*reader = new HdfsFileReader(hdfs_params, path, start_offset);
return Status::OK();
}
Status HdfsReaderWriter::create_writer(const std::map<std::string, std::string>& properties,
const std::string& path,
std::unique_ptr<FileWriter>& writer) {
writer.reset(new HDFSWriter(properties, path));

View File

@ -34,7 +34,10 @@ public:
static Status create_reader(const THdfsParams& hdfs_params, const std::string& path,
int64_t start_offset, FileReader** reader);
static Status create_writer(std::map<std::string, std::string>& properties,
static Status create_reader(const std::map<std::string, std::string>& properties,
const std::string& path, int64_t start_offset, FileReader** reader);
static Status create_writer(const std::map<std::string, std::string>& properties,
const std::string& path, std::unique_ptr<FileWriter>& writer);
};

View File

@ -22,18 +22,15 @@
#include "service/backend_options.h"
namespace doris {
const static std::string FS_KEY = "fs.defaultFS";
const static std::string USER = "hadoop.username";
const static std::string KERBEROS_PRINCIPAL = "hadoop.kerberos.principal";
const static std::string KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
const static std::string TOKEN = "token";
HDFSWriter::HDFSWriter(std::map<std::string, std::string>& properties, const std::string& path)
HDFSWriter::HDFSWriter(const std::map<std::string, std::string>& properties,
const std::string& path)
: _properties(properties),
_path(path),
_hdfs_fs(nullptr),
_hdfs_params(_parse_properties(_properties)),
_builder(createHDFSBuilder(_hdfs_params)) {}
_builder(createHDFSBuilder(_properties)) {
_parse_properties(_properties);
}
HDFSWriter::~HDFSWriter() {
close();
@ -85,7 +82,7 @@ Status HDFSWriter::open() {
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
LOG(INFO) << "open file. namenode:" << _namenode << " path:" << _path;
LOG(INFO) << "open file. namenode:" << _namenode << ", path:" << _path;
return Status::OK();
}
@ -152,34 +149,11 @@ Status HDFSWriter::_connect() {
return Status::OK();
}
THdfsParams HDFSWriter::_parse_properties(std::map<std::string, std::string>& prop) {
std::map<std::string, std::string>::iterator iter;
std::vector<THdfsConf> hdfs_configs;
THdfsParams hdfsParams;
for (iter = prop.begin(); iter != prop.end();) {
if (iter->first.compare(FS_KEY) == 0) {
_namenode = iter->second;
hdfsParams.__set_fs_name(_namenode);
iter = prop.erase(iter);
} else if (iter->first.compare(USER) == 0) {
hdfsParams.__set_user(iter->second);
iter = prop.erase(iter);
} else if (iter->first.compare(KERBEROS_PRINCIPAL) == 0) {
hdfsParams.__set_hdfs_kerberos_principal(iter->second);
iter = prop.erase(iter);
} else if (iter->first.compare(KERBEROS_KEYTAB) == 0) {
hdfsParams.__set_hdfs_kerberos_keytab(iter->second);
iter = prop.erase(iter);
} else {
THdfsConf item;
item.key = iter->first;
item.value = iter->second;
hdfs_configs.push_back(item);
iter = prop.erase(iter);
}
void HDFSWriter::_parse_properties(const std::map<std::string, std::string>& prop) {
auto iter = prop.find(FS_KEY);
if (iter != prop.end()) {
_namenode = iter->second;
}
hdfsParams.__set_hdfs_conf(hdfs_configs);
return hdfsParams;
}
} // end namespace doris
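A minimal sketch of writing a small payload through the refactored `HDFSWriter`, assuming the BE headers in this diff; the writer now takes the property map by const reference and derives the namenode from `fs.defaultFS` itself, and its destructor closes the file:

```cpp
#include <map>
#include <string>

#include "common/status.h"
#include "io/hdfs_writer.h"

namespace doris {

// Sketch only: upload a string to an HDFS path.
Status write_remote_file(const std::map<std::string, std::string>& props,
                         const std::string& remote_path, const std::string& content) {
    HDFSWriter writer(props, remote_path);
    RETURN_IF_ERROR(writer.open()); // connects and creates the remote file

    size_t written = 0;
    RETURN_IF_ERROR(writer.write(reinterpret_cast<const uint8_t*>(content.data()),
                                 content.size(), &written));
    return Status::OK(); // destructor flushes and closes the remote file
}

} // namespace doris
```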

View File

@ -27,7 +27,7 @@
namespace doris {
class HDFSWriter : public FileWriter {
public:
HDFSWriter(std::map<std::string, std::string>& properties, const std::string& path);
HDFSWriter(const std::map<std::string, std::string>& properties, const std::string& path);
~HDFSWriter();
Status open() override;
@ -39,15 +39,15 @@ public:
private:
Status _connect();
THdfsParams _parse_properties(std::map<std::string, std::string>& prop);
void _parse_properties(const std::map<std::string, std::string>& prop);
private:
std::map<std::string, std::string> _properties;
std::string _namenode = "";
std::string _path = "";
hdfsFS _hdfs_fs = nullptr;
hdfsFile _hdfs_file = nullptr;
bool _closed = false;
THdfsParams _hdfs_params;
HDFSCommonBuilder _builder;
};

View File

@ -34,6 +34,7 @@
#include "runtime/exec_env.h"
#include "util/broker_storage_backend.h"
#include "util/file_utils.h"
#include "util/hdfs_storage_backend.h"
#include "util/s3_storage_backend.h"
#include "util/thrift_rpc_helper.h"
@ -59,13 +60,20 @@ SnapshotLoader::SnapshotLoader(ExecEnv* env, int64_t job_id, int64_t task_id)
_storage_backend(nullptr) {}
SnapshotLoader::SnapshotLoader(ExecEnv* env, int64_t job_id, int64_t task_id,
const std::map<std::string, std::string>& prop)
const std::map<std::string, std::string>& prop,
TStorageBackendType::type type)
: _env(env),
_job_id(job_id),
_task_id(task_id),
_broker_addr(TNetworkAddress()),
_prop(prop) {
_storage_backend.reset(new S3StorageBackend(_prop));
if (TStorageBackendType::type::S3 == type) {
_storage_backend.reset(new S3StorageBackend(_prop));
} else if (TStorageBackendType::type::HDFS == type) {
_storage_backend.reset(new HDFSStorageBackend(_prop));
} else {
_storage_backend = nullptr;
}
}
SnapshotLoader::~SnapshotLoader() = default;
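The upload/download workers in the task_worker_pool hunks above now forward `storage_backend` from the request, and this constructor picks the matching backend. A sketch of the dispatch, with include paths assumed from the tree and an illustrative helper name:

```cpp
#include <map>
#include <memory>
#include <string>

#include "gen_cpp/Types_types.h" // TStorageBackendType
#include "runtime/snapshot_loader.h"

namespace doris {

// Sketch only: build a SnapshotLoader for S3 or HDFS snapshots.
std::unique_ptr<SnapshotLoader> make_loader(ExecEnv* env, int64_t job_id, int64_t task_id,
                                            const std::map<std::string, std::string>& prop,
                                            TStorageBackendType::type type) {
    // The constructor internally chooses S3StorageBackend or HDFSStorageBackend
    // based on `type`, and leaves the backend null for unsupported types.
    return std::make_unique<SnapshotLoader>(env, job_id, task_id, prop, type);
}

} // namespace doris
```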

View File

@ -49,10 +49,10 @@ struct FileStat;
* It will also only download files which does not exist in local dir.
*
* Move:
* move() is the final step of restore process. it will replace the
* move() is the final step of restore process. it will replace the
* old tablet data dir with the newly downloaded snapshot dir.
* and reload the tablet header to take this tablet on line.
*
*
*/
class SnapshotLoader {
public:
@ -61,7 +61,8 @@ public:
const TNetworkAddress& broker_addr,
const std::map<std::string, std::string>& broker_prop);
SnapshotLoader(ExecEnv* env, int64_t job_id, int64_t task_id,
const std::map<std::string, std::string>& broker_prop);
const std::map<std::string, std::string>& broker_prop,
TStorageBackendType::type type);
~SnapshotLoader();

View File

@ -103,6 +103,8 @@ set(UTIL_FILES
s3_uri.cpp
s3_storage_backend.cpp
s3_util.cpp
hdfs_storage_backend.cpp
hdfs_util.cpp
topn_counter.cpp
tuple_row_zorder_compare.cpp
quantile_state.cpp

View File

@ -0,0 +1,309 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "util/hdfs_storage_backend.h"
#include "io/hdfs_file_reader.h"
#include "io/hdfs_reader_writer.h"
#include "io/hdfs_writer.h"
#include "olap/file_helper.h"
#include "util/hdfs_util.h"
namespace doris {
#ifndef CHECK_HDFS_CLIENT
#define CHECK_HDFS_CLIENT(client) \
if (!client) { \
return Status::InternalError("init hdfs client error."); \
}
#endif
static const std::string hdfs_file_prefix = "hdfs://";
HDFSStorageBackend::HDFSStorageBackend(const std::map<std::string, std::string>& prop)
: _properties(prop), _builder(createHDFSBuilder(_properties)) {
_hdfs_fs = HDFSHandle::instance().create_hdfs_fs(_builder);
DCHECK(_hdfs_fs) << "init hdfs client error.";
}
HDFSStorageBackend::~HDFSStorageBackend() {
close();
}
Status HDFSStorageBackend::close() {
if (_hdfs_fs != nullptr) {
hdfsDisconnect(_hdfs_fs);
_hdfs_fs = nullptr;
}
return Status::OK();
}
// if the format of path is hdfs://ip:port/path, replace it to /path.
// path like hdfs://ip:port/path can't be used by libhdfs3.
std::string HDFSStorageBackend::parse_path(const std::string& path) {
if (path.find(hdfs_file_prefix) != path.npos) {
std::string temp = path.substr(hdfs_file_prefix.size());
std::size_t pos = temp.find_first_of('/');
return temp.substr(pos);
} else {
return path;
}
}
Status HDFSStorageBackend::upload(const std::string& local, const std::string& remote) {
FileHandler file_handler;
Status ost = file_handler.open(local, O_RDONLY);
if (!ost.ok()) {
return Status::InternalError("failed to open file: " + local);
}
size_t file_len = file_handler.length();
if (file_len == -1) {
return Status::InternalError("failed to get length of file: " + local);
}
std::unique_ptr<HDFSWriter> hdfs_writer(new HDFSWriter(_properties, remote));
RETURN_IF_ERROR(hdfs_writer->open());
constexpr size_t buf_sz = 1024 * 1024;
char read_buf[buf_sz];
size_t left_len = file_len;
size_t read_offset = 0;
while (left_len > 0) {
size_t read_len = left_len > buf_sz ? buf_sz : left_len;
ost = file_handler.pread(read_buf, read_len, read_offset);
if (!ost.ok()) {
return Status::InternalError("failed to read file: " + local);
}
size_t write_len = 0;
RETURN_IF_ERROR(hdfs_writer->write(reinterpret_cast<const uint8_t*>(read_buf), read_len,
&write_len));
DCHECK_EQ(write_len, read_len);
read_offset += read_len;
left_len -= read_len;
}
LOG(INFO) << "finished to write file: " << local << ", length: " << file_len;
return Status::OK();
}
Status HDFSStorageBackend::direct_upload(const std::string& remote, const std::string& content) {
std::unique_ptr<HDFSWriter> hdfs_writer(new HDFSWriter(_properties, remote));
RETURN_IF_ERROR(hdfs_writer->open());
size_t write_len = 0;
RETURN_IF_ERROR(hdfs_writer->write(reinterpret_cast<const uint8_t*>(content.c_str()),
content.size(), &write_len));
DCHECK_EQ(write_len, content.size());
return Status::OK();
}
Status HDFSStorageBackend::list(const std::string& remote_path, bool contain_md5, bool recursion,
std::map<std::string, FileStat>* files) {
CHECK_HDFS_CLIENT(_hdfs_fs);
std::string normal_str = parse_path(remote_path);
int exists = hdfsExists(_hdfs_fs, normal_str.c_str());
if (exists != 0) {
LOG(INFO) << "path does not exist: " << normal_str << ", err: " << strerror(errno);
return Status::OK();
}
int file_num = 0;
hdfsFileInfo* files_info = hdfsListDirectory(_hdfs_fs, normal_str.c_str(), &file_num);
if (files_info == nullptr) {
std::stringstream ss;
ss << "failed to list files from remote path: " << normal_str
<< ", err: " << strerror(errno);
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
LOG(INFO) << "finished to list files from remote path:" << normal_str
<< ", file num:" << file_num;
for (int i = 0; i < file_num; ++i) {
auto file_info = files_info[i];
if (file_info.mKind == kObjectKindDirectory) {
continue;
}
const std::string& file_name_with_path(file_info.mName);
// get filename
std::filesystem::path file_path(file_name_with_path);
std::string file_name = file_path.filename();
size_t pos = file_name.find_last_of(".");
if (pos == std::string::npos || pos == file_name.size() - 1) {
// Not found checksum separator, ignore this file
continue;
}
FileStat stat = {std::string(file_name, 0, pos), std::string(file_name, pos + 1),
file_info.mSize};
files->emplace(std::string(file_name, 0, pos), stat);
VLOG(2) << "split remote file: " << std::string(file_name, 0, pos)
<< ", checksum: " << std::string(file_name, pos + 1);
}
hdfsFreeFileInfo(files_info, file_num);
LOG(INFO) << "finished to split files. valid file num: " << files->size();
return Status::OK();
}
Status HDFSStorageBackend::download(const std::string& remote, const std::string& local) {
// 1. open remote file for read
std::unique_ptr<HdfsFileReader> hdfs_reader(new HdfsFileReader(_properties, remote, 0));
RETURN_IF_ERROR(hdfs_reader->open());
// 2. remove the existing local file if exist
if (std::filesystem::remove(local)) {
LOG(INFO) << "remove the previously exist local file: " << local;
}
// 3. open local file for write
FileHandler file_handler;
Status ost =
file_handler.open_with_mode(local, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR);
if (!ost.ok()) {
return Status::InternalError("failed to open file: " + local);
}
// 4. read remote and write to local
LOG(INFO) << "read remote file: " << remote << " to local: " << local;
constexpr size_t buf_sz = 1024 * 1024;
char read_buf[buf_sz];
size_t write_offset = 0;
bool eof = false;
while (!eof) {
int64_t read_len = 0;
RETURN_IF_ERROR(
hdfs_reader->read(reinterpret_cast<uint8_t*>(read_buf), buf_sz, &read_len, &eof));
if (eof) {
continue;
}
if (read_len > 0) {
ost = file_handler.pwrite(read_buf, read_len, write_offset);
if (!ost.ok()) {
return Status::InternalError("failed to write file: " + local);
}
write_offset += read_len;
}
}
return Status::OK();
}
Status HDFSStorageBackend::direct_download(const std::string& remote, std::string* content) {
std::unique_ptr<HdfsFileReader> hdfs_reader(new HdfsFileReader(_properties, remote, 0));
RETURN_IF_ERROR(hdfs_reader->open());
constexpr size_t buf_sz = 1024 * 1024;
char read_buf[buf_sz];
size_t write_offset = 0;
bool eof = false;
while (!eof) {
int64_t read_len = 0;
RETURN_IF_ERROR(
hdfs_reader->read(reinterpret_cast<uint8_t*>(read_buf), buf_sz, &read_len, &eof));
if (eof) {
continue;
}
if (read_len > 0) {
content->insert(write_offset, read_buf, read_len);
write_offset += read_len;
}
}
return Status::OK();
}
Status HDFSStorageBackend::upload_with_checksum(const std::string& local, const std::string& remote,
const std::string& checksum) {
std::string temp = remote + ".part";
std::string final = remote + "." + checksum;
RETURN_IF_ERROR(upload(local, temp));
return rename(temp, final);
}
Status HDFSStorageBackend::rename(const std::string& orig_name, const std::string& new_name) {
CHECK_HDFS_CLIENT(_hdfs_fs);
std::string normal_orig_name = parse_path(orig_name);
std::string normal_new_name = parse_path(new_name);
int ret = hdfsRename(_hdfs_fs, normal_orig_name.c_str(), normal_new_name.c_str());
if (ret == 0) {
LOG(INFO) << "finished to rename file. orig: " << normal_orig_name
<< ", new: " << normal_new_name;
return Status::OK();
} else {
std::stringstream ss;
ss << "Fail to rename file: " << normal_orig_name << " to: " << normal_new_name;
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
}
Status HDFSStorageBackend::rename_dir(const std::string& orig_name, const std::string& new_name) {
return rename(orig_name, new_name);
}
Status HDFSStorageBackend::copy(const std::string& src, const std::string& dst) {
return Status::NotSupported("copy not implemented!");
}
Status HDFSStorageBackend::copy_dir(const std::string& src, const std::string& dst) {
return copy(src, dst);
}
Status HDFSStorageBackend::mkdir(const std::string& path) {
return Status::NotSupported("mkdir not implemented!");
}
Status HDFSStorageBackend::mkdirs(const std::string& path) {
return Status::NotSupported("mkdirs not implemented!");
}
Status HDFSStorageBackend::exist(const std::string& path) {
CHECK_HDFS_CLIENT(_hdfs_fs);
int exists = hdfsExists(_hdfs_fs, path.c_str());
if (exists == 0) {
return Status::OK();
} else {
return Status::NotFound(path + " not exists!");
}
}
Status HDFSStorageBackend::exist_dir(const std::string& path) {
return exist(path);
}
Status HDFSStorageBackend::rm(const std::string& remote) {
CHECK_HDFS_CLIENT(_hdfs_fs);
int ret = hdfsDelete(_hdfs_fs, remote.c_str(), 1);
if (ret == 0) {
return Status::OK();
} else {
std::stringstream ss;
ss << "failed to delete from remote path: " << remote;
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
}
Status HDFSStorageBackend::rmdir(const std::string& remote) {
return rm(remote);
}
} // end namespace doris
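A short usage sketch of the new `HDFSStorageBackend`, mirroring the unit test further below; the property keys are the constants declared in `io/hdfs_builder.h`, and the remote paths here are illustrative placeholders:

```cpp
#include <map>
#include <string>

#include "common/status.h"
#include "io/hdfs_builder.h" // FS_KEY, USER
#include "util/hdfs_storage_backend.h"
#include "util/storage_backend.h" // FileStat

namespace doris {

// Sketch only: round-trip a local file through an HDFS repository path.
Status backup_one_file(const std::string& local_file) {
    std::map<std::string, std::string> props = {
            {FS_KEY, "hdfs://hadoop-name-node:54310"},
            {USER, "user"},
    };
    HDFSStorageBackend backend(props);

    const std::string remote = "/path/to/repo/meta";
    RETURN_IF_ERROR(backend.upload(local_file, remote + ".part"));
    RETURN_IF_ERROR(backend.rename(remote + ".part", remote));
    RETURN_IF_ERROR(backend.exist(remote));

    std::map<std::string, FileStat> files;
    RETURN_IF_ERROR(backend.list("/path/to/repo", /*contain_md5=*/true,
                                 /*recursion=*/false, &files));
    return backend.download(remote, local_file + ".download");
}

} // namespace doris
```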

View File

@ -0,0 +1,63 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <hdfs/hdfs.h>
#include "io/hdfs_builder.h"
#include "util/storage_backend.h"
namespace doris {
class HDFSStorageBackend : public StorageBackend {
public:
HDFSStorageBackend(const std::map<std::string, std::string>& prop);
~HDFSStorageBackend();
Status close();
Status download(const std::string& remote, const std::string& local) override;
Status direct_download(const std::string& remote, std::string* content) override;
Status upload(const std::string& local, const std::string& remote) override;
Status upload_with_checksum(const std::string& local, const std::string& remote,
const std::string& checksum) override;
Status list(const std::string& remote_path, bool contain_md5, bool recursion,
std::map<std::string, FileStat>* files) override;
Status rename(const std::string& orig_name, const std::string& new_name) override;
Status rename_dir(const std::string& orig_name, const std::string& new_name) override;
Status direct_upload(const std::string& remote, const std::string& content) override;
Status rm(const std::string& remote) override;
Status rmdir(const std::string& remote) override;
Status copy(const std::string& src, const std::string& dst) override;
Status copy_dir(const std::string& src, const std::string& dst) override;
Status mkdir(const std::string& path) override;
Status mkdirs(const std::string& path) override;
Status exist(const std::string& path) override;
Status exist_dir(const std::string& path) override;
private:
// if the format of path is hdfs://ip:port/path, replace it to /path.
// path like hdfs://ip:port/path can't be used by libhdfs3.
std::string parse_path(const std::string& path);
private:
std::map<std::string, std::string> _properties;
HDFSCommonBuilder _builder;
hdfsFS _hdfs_fs = nullptr;
};
} // end namespace doris

be/src/util/hdfs_util.cpp (new file, 49 lines)
View File

@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "util/hdfs_util.h"
#include <util/string_util.h>
#include "common/config.h"
#include "util/logging.h"
namespace doris {
HDFSHandle& HDFSHandle::instance() {
static HDFSHandle hdfs_handle;
return hdfs_handle;
}
hdfsFS HDFSHandle::create_hdfs_fs(HDFSCommonBuilder& hdfs_builder) {
if (hdfs_builder.is_need_kinit()) {
Status status = hdfs_builder.run_kinit();
if (!status.ok()) {
LOG(WARNING) << status.get_error_msg();
return nullptr;
}
}
hdfsFS hdfs_fs = hdfsBuilderConnect(hdfs_builder.get());
if (hdfs_fs == nullptr) {
LOG(WARNING) << "connect to hdfs failed."
<< ", error: " << hdfsGetLastError();
return nullptr;
}
return hdfs_fs;
}
} // namespace doris
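`HDFSHandle` wraps the connect step: it runs kinit first when the builder carries Kerberos credentials, then calls `hdfsBuilderConnect`. A sketch of a Kerberos-enabled connection, assuming the constants from `io/hdfs_builder.h`; the principal and keytab values are placeholders:

```cpp
#include <map>
#include <string>

#include "io/hdfs_builder.h"
#include "util/hdfs_util.h"

namespace doris {

// Sketch only: connect with Kerberos credentials supplied in the property map.
hdfsFS connect_with_kerberos() {
    std::map<std::string, std::string> props = {
            {FS_KEY, "hdfs://hadoop-name-node:54310"},
            {KERBEROS_PRINCIPAL, "doris@EXAMPLE.COM"},    // placeholder principal
            {KERBEROS_KEYTAB, "/etc/doris/doris.keytab"}, // placeholder keytab path
    };
    HDFSCommonBuilder builder = createHDFSBuilder(props);
    // create_hdfs_fs() runs kinit (ticket cache under /tmp/krb5cc_doris_*)
    // before connecting; it returns nullptr and logs a warning on failure.
    return HDFSHandle::instance().create_hdfs_fs(builder);
}

} // namespace doris
```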

be/src/util/hdfs_util.h (new file, 43 lines)
View File

@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <hdfs/hdfs.h>
#include <map>
#include <memory>
#include <string>
#include "common/status.h"
#include "io/hdfs_builder.h"
namespace doris {
class HDFSHandle {
public:
~HDFSHandle() {}
static HDFSHandle& instance();
hdfsFS create_hdfs_fs(HDFSCommonBuilder& builder);
private:
HDFSHandle() {}
};
} // namespace doris

View File

@ -62,13 +62,10 @@ bool ClientFactory::is_s3_conf_valid(const std::map<std::string, std::string>& p
std::shared_ptr<Aws::S3::S3Client> ClientFactory::create(
const std::map<std::string, std::string>& prop) {
StringCaseMap<std::string> properties(prop.begin(), prop.end());
if (properties.find(S3_AK) == properties.end() || properties.find(S3_SK) == properties.end() ||
properties.find(S3_ENDPOINT) == properties.end() ||
properties.find(S3_REGION) == properties.end()) {
DCHECK(false) << "aws properties is incorrect.";
LOG(ERROR) << "aws properties is incorrect.";
if (!is_s3_conf_valid(prop)) {
return nullptr;
}
StringCaseMap<std::string> properties(prop.begin(), prop.end());
Aws::Auth::AWSCredentials aws_cred(properties.find(S3_AK)->second,
properties.find(S3_SK)->second);
DCHECK(!aws_cred.IsExpiredOrEmpty());

View File

@ -318,6 +318,7 @@ set(UTIL_TEST_FILES
util/tuple_row_zorder_compare_test.cpp
util/array_parser_test.cpp
util/quantile_state_test.cpp
util/hdfs_storage_backend_test.cpp
)
set(VEC_TEST_FILES
vec/aggregate_functions/agg_test.cpp

View File

@ -0,0 +1,171 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "util/hdfs_storage_backend.h"
#include <gtest/gtest.h>
#include <boost/lexical_cast.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <fstream>
#include <map>
#include <string>
#include "util/file_utils.h"
#include "util/storage_backend.h"
namespace doris {
static const std::string fs_name = "hdfs://127.0.0.1:9000"; // An invalid address
static const std::string user = "test";
static const std::string base_path = "/user/test";
#define HDFSStorageBackendTest DISABLED_HDFSStorageBackendTest
class HDFSStorageBackendTest : public testing::Test {
public:
HDFSStorageBackendTest() : _properties({{FS_KEY, fs_name}, {USER, user}}) {
_fs.reset(new HDFSStorageBackend(_properties));
_base_path = base_path + "/" + gen_uuid();
}
virtual ~HDFSStorageBackendTest() {}
protected:
virtual void SetUp() {
_test_file = "/tmp/" + gen_uuid();
std::ofstream out(_test_file);
out << _content;
out.close();
}
virtual void TearDown() { remove(_test_file.c_str()); }
std::string gen_uuid() {
auto id = boost::uuids::random_generator()();
return boost::lexical_cast<std::string>(id);
}
std::unique_ptr<HDFSStorageBackend> _fs;
std::map<std::string, std::string> _properties;
std::string _test_file;
std::string _base_path;
std::string _content =
"O wild West Wind, thou breath of Autumn's being\n"
"Thou, from whose unseen presence the leaves dead\n"
"Are driven, like ghosts from an enchanter fleeing,\n"
"Yellow, and black, and pale, and hectic red,\n"
"Pestilence-stricken multitudes:O thou\n"
"Who chariotest to their dark wintry bed\n"
"The winged seeds, where they lie cold and low,\n"
"Each like a corpse within its grave, until\n"
"Thine azure sister of the Spring shall blow\n"
"Her clarion o'er the dreaming earth, and fill\n"
"(Driving sweet buds like flocks to feed in air)\n"
"With living hues and odors plain and hill:\n"
"Wild Spirit, which art moving everywhere;\n"
"Destroyer and preserver; hear, oh, hear!";
};
TEST_F(HDFSStorageBackendTest, hdfs_upload) {
Status status = _fs->upload(_test_file, _base_path + "/Ode_to_the_West_Wind.txt");
EXPECT_TRUE(status.ok());
status = _fs->exist(_base_path + "/Ode_to_the_West_Wind.txt");
EXPECT_TRUE(status.ok());
std::string orig_md5sum;
FileUtils::md5sum(_test_file, &orig_md5sum);
status = _fs->download(_base_path + "/Ode_to_the_West_Wind.txt", _test_file + ".download");
EXPECT_TRUE(status.ok());
std::string download_md5sum;
FileUtils::md5sum(_test_file + ".download", &download_md5sum);
EXPECT_EQ(orig_md5sum, download_md5sum);
status = _fs->upload(_test_file + "_not_found", _base_path + "/Ode_to_the_West_Wind1.txt");
EXPECT_FALSE(status.ok());
status = _fs->exist(_base_path + "/Ode_to_the_West_Wind1.txt");
EXPECT_EQ(TStatusCode::NOT_FOUND, status.code());
}
TEST_F(HDFSStorageBackendTest, hdfs_direct_upload) {
Status status = _fs->direct_upload(_base_path + "/Ode_to_the_West_Wind.txt", _content);
EXPECT_TRUE(status.ok());
status = _fs->exist(_base_path + "/Ode_to_the_West_Wind.txt");
EXPECT_TRUE(status.ok());
std::string orig_md5sum;
FileUtils::md5sum(_test_file, &orig_md5sum);
status = _fs->download(_base_path + "/Ode_to_the_West_Wind.txt", _test_file + ".download");
EXPECT_TRUE(status.ok());
std::string download_md5sum;
FileUtils::md5sum(_test_file + ".download", &download_md5sum);
EXPECT_EQ(orig_md5sum, download_md5sum);
}
TEST_F(HDFSStorageBackendTest, hdfs_download) {
Status status = _fs->upload(_test_file, _base_path + "/Ode_to_the_West_Wind.txt");
EXPECT_TRUE(status.ok());
std::string orig_md5sum;
FileUtils::md5sum(_test_file, &orig_md5sum);
status = _fs->download(_base_path + "/Ode_to_the_West_Wind.txt", _test_file + ".download");
EXPECT_TRUE(status.ok());
std::string download_md5sum;
FileUtils::md5sum(_test_file + ".download", &download_md5sum);
EXPECT_EQ(orig_md5sum, download_md5sum);
status = _fs->download(_base_path + "/Ode_to_the_West_Wind.txt.not_found",
_test_file + ".download");
EXPECT_FALSE(status.ok());
status = _fs->download(_base_path + "/Ode_to_the_West_Wind.txt.not_found",
"/not_permitted.download");
EXPECT_FALSE(status.ok());
}
TEST_F(HDFSStorageBackendTest, hdfs_rename) {
Status status = _fs->direct_upload(_base_path + "/Ode_to_the_West_Wind.txt", _content);
EXPECT_TRUE(status.ok());
status = _fs->rename(_base_path + "/Ode_to_the_West_Wind.txt",
_base_path + "/Ode_to_the_West_Wind.txt.new");
EXPECT_TRUE(status.ok());
status = _fs->exist(_base_path + "/Ode_to_the_West_Wind.txt.new");
EXPECT_TRUE(status.ok());
}
TEST_F(HDFSStorageBackendTest, hdfs_list) {
Status status = _fs->direct_upload(_base_path + "/Ode_to_the_West_Wind.md5", _content);
EXPECT_TRUE(status.ok());
status = _fs->direct_upload(_base_path + "/Ode_to_the_West_Wind1.md5", _content);
EXPECT_TRUE(status.ok());
status = _fs->direct_upload(_base_path + "/Ode_to_the_West_Wind2.md5", _content);
EXPECT_TRUE(status.ok());
std::map<std::string, FileStat> files;
status = _fs->list(_base_path, true, false, &files);
EXPECT_TRUE(status.ok());
EXPECT_TRUE(files.find("Ode_to_the_West_Wind") != files.end());
EXPECT_TRUE(files.find("Ode_to_the_West_Wind1") != files.end());
EXPECT_TRUE(files.find("Ode_to_the_West_Wind2") != files.end());
EXPECT_EQ(3, files.size());
}
TEST_F(HDFSStorageBackendTest, hdfs_rm) {
Status status = _fs->direct_upload(_base_path + "/Ode_to_the_West_Wind.txt", _content);
EXPECT_TRUE(status.ok());
status = _fs->exist(_base_path + "/Ode_to_the_West_Wind.txt");
EXPECT_TRUE(status.ok());
status = _fs->rm(_base_path + "/Ode_to_the_West_Wind.txt");
EXPECT_TRUE(status.ok());
status = _fs->exist(_base_path + "/Ode_to_the_West_Wind.txt");
EXPECT_TRUE(status.code() == TStatusCode::NOT_FOUND);
}
} // namespace doris

View File

@ -83,7 +83,7 @@ ALTER TABLE tbl1 SET ("dynamic_partition.enable"="true")
```sql
BACKUP SNAPSHOT example_db.snapshot_label2
TO example_repo
ON
ON
(
example_tbl PARTITION (p1,p2),
example_tbl2
@ -104,9 +104,9 @@ ALTER TABLE tbl1 SET ("dynamic_partition.enable"="true")
SnapshotFinishedTime: 2022-04-08 15:52:32
UploadFinishedTime: 2022-04-08 15:52:38
FinishedTime: 2022-04-08 15:52:44
UnfinishedTasks:
Progress:
TaskErrMsg:
UnfinishedTasks:
Progress:
TaskErrMsg:
Status: [OK]
Timeout: 86400
1 row in set (0.01 sec)
@ -156,7 +156,7 @@ It is recommended to import the new and old clusters in parallel for a period of
1. CREATE REPOSITORY
Create a remote repository path for backup or restore. This command uses the Broker process to access remote storage, and different brokers require different parameters; for details, please refer to the [Broker documentation](../../advanced/broker.md). You can also back up directly, via the S3 protocol, to remote storage that supports the AWS S3 protocol; please refer to the [Create Remote Warehouse Documentation](../../sql-manual/sql-reference/Data-Definition-Statements/Backup-and-Restore/CREATE-REPOSITORY.md)
Create a remote repository path for backup or restore. This command uses the Broker process to access remote storage, and different brokers require different parameters; for details, please refer to the [Broker documentation](../../advanced/broker.md). You can also back up directly, via the S3 protocol, to remote storage that supports the AWS S3 protocol, or back up directly to HDFS; please refer to the [Create Remote Warehouse Documentation](../../sql-manual/sql-reference/Data-Definition-Statements/Backup-and-Restore/CREATE-REPOSITORY.md)
2. BACKUP

View File

@ -118,9 +118,9 @@ The restore operation needs to specify an existing backup in the remote warehous
SnapshotFinishedTime: 2022-04-08 15:59:05
DownloadFinishedTime: 2022-04-08 15:59:12
FinishedTime: 2022-04-08 15:59:18
UnfinishedTasks:
Progress:
TaskErrMsg:
UnfinishedTasks:
Progress:
TaskErrMsg:
Status: [OK]
Timeout: 86400
1 row in set (0.01 sec)
@ -134,7 +134,7 @@ The commands related to the backup and restore function are as follows. For the
1. CREATE REPOSITORY
Create a remote repository path for backup or restore. This command uses the Broker process to access remote storage, and different brokers require different parameters; for details, please refer to the [Broker documentation](../../advanced/broker.md). You can also back up directly, via the S3 protocol, to remote storage that supports the AWS S3 protocol; please refer to the [Create Remote Warehouse Documentation](../../sql-manual/sql-reference/Data-Definition-Statements/Backup-and-Restore/CREATE-REPOSITORY.md)
Create a remote repository path for backup or restore. This command uses the Broker process to access remote storage, and different brokers require different parameters; for details, please refer to the [Broker documentation](../../advanced/broker.md). You can also back up directly, via the S3 protocol, to remote storage that supports the AWS S3 protocol, or back up directly to HDFS; please refer to the [Create Remote Warehouse Documentation](../../sql-manual/sql-reference/Data-Definition-Statements/Backup-and-Restore/CREATE-REPOSITORY.md)
2. RESTORE

View File

@ -38,16 +38,16 @@ grammar:
```sql
CREATE [READ ONLY] REPOSITORY `repo_name`
WITH [BROKER `broker_name`|S3]
WITH [BROKER `broker_name`|S3|hdfs]
ON LOCATION `repo_location`
PROPERTIES ("key"="value", ...);
```
illustrate:
- Repositories are created using an existing broker, or by accessing cloud storage directly through the AWS S3 protocol
- Repositories are created using an existing broker, by accessing cloud storage directly through the AWS S3 protocol, or by accessing HDFS directly.
- If it is a read-only repository, restores can only be done on the repository. If not, backup and restore operations are available.
- PROPERTIES differ depending on the type of broker or S3; see the examples for details.
- PROPERTIES differ depending on the type of broker, S3, or hdfs; see the examples for details.
### Example
@ -107,6 +107,18 @@ PROPERTIES
);
```
5. Create a repository named hdfs_repo that connects to HDFS directly, without going through a broker.
```sql
CREATE REPOSITORY `hdfs_repo`
WITH hdfs
ON LOCATION "hdfs://hadoop-name-node:54310/path/to/repo/"
PROPERTIES
(
"fs.defaultFS"="hdfs://hadoop-name-node:54310",
"hadoop.username"="user"
);
```

### Keywords

View File

@ -83,7 +83,7 @@ Doris supports backing up current data, in the form of files, to remote storage via broker
```sql
BACKUP SNAPSHOT example_db.snapshot_label2
TO example_repo
ON
ON
(
example_tbl PARTITION (p1,p2),
example_tbl2
@ -104,9 +104,9 @@ Doris supports backing up current data, in the form of files, to remote storage via broker
SnapshotFinishedTime: 2022-04-08 15:52:32
UploadFinishedTime: 2022-04-08 15:52:38
FinishedTime: 2022-04-08 15:52:44
UnfinishedTasks:
Progress:
TaskErrMsg:
UnfinishedTasks:
Progress:
TaskErrMsg:
Status: [OK]
Timeout: 86400
1 row in set (0.01 sec)
@ -156,7 +156,7 @@ For more usage of BACKUP, see [here](../../sql-manual/sql-reference/Data-Defi
1. CREATE REPOSITORY
Create a remote repository path for backup or restore. This command needs the Broker process to access remote storage, and different Brokers require different parameters; for details, see the [Broker documentation](../../advanced/broker.md). You can also back up directly, via the S3 protocol, to remote storage that supports the AWS S3 protocol; for details, see the [Create Remote Repository documentation](../../sql-manual/sql-reference/Data-Definition-Statements/Backup-and-Restore/CREATE-REPOSITORY.md)
Create a remote repository path for backup or restore. This command needs the Broker process to access remote storage, and different Brokers require different parameters; for details, see the [Broker documentation](../../advanced/broker.md). You can also back up directly, via the S3 protocol, to remote storage that supports the AWS S3 protocol, or back up directly to HDFS; for details, see the [Create Remote Repository documentation](../../sql-manual/sql-reference/Data-Definition-Statements/Backup-and-Restore/CREATE-REPOSITORY.md)
2. BACKUP

View File

@ -55,7 +55,7 @@ Doris supports backing up current data, in the form of files, to remote storage via broker
## Start Restore
1. Restore table backup_tbl from backup snapshot_1 in example_repo to database example_db1, with time version "2018-05-04-16-45-08", restored with 1 replica:
```sql
RESTORE SNAPSHOT example_db1.`snapshot_1`
FROM `example_repo`
@ -66,9 +66,9 @@ Doris supports backing up current data, in the form of files, to remote storage via broker
"replication_num" = "1"
);
```
2. Restore partitions p1 and p2 of table backup_tbl from backup snapshot_2 in example_repo, together with table backup_tbl2 renamed to new_tbl, to database example_db1, with time version "2018-05-04-17-11-01". Restored with 3 replicas by default:
```sql
RESTORE SNAPSHOT example_db1.`snapshot_2`
FROM `example_repo`
@ -118,9 +118,9 @@ Doris supports backing up current data, in the form of files, to remote storage via broker
SnapshotFinishedTime: 2022-04-08 15:59:05
DownloadFinishedTime: 2022-04-08 15:59:12
FinishedTime: 2022-04-08 15:59:18
UnfinishedTasks:
Progress:
TaskErrMsg:
UnfinishedTasks:
Progress:
TaskErrMsg:
Status: [OK]
Timeout: 86400
1 row in set (0.01 sec)
@ -134,7 +134,7 @@ For more usage of RESTORE, see [here](../../sql-manual/sql-reference/Data-Def
1. CREATE REPOSITORY
Create a remote repository path for backup or restore. This command needs the Broker process to access remote storage, and different Brokers require different parameters; for details, see the [Broker documentation](../../advanced/broker.md). You can also back up directly, via the S3 protocol, to remote storage that supports the AWS S3 protocol; for details, see the [Create Remote Repository documentation](../../sql-manual/sql-reference/Data-Definition-Statements/Backup-and-Restore/CREATE-REPOSITORY.md)
Create a remote repository path for backup or restore. This command needs the Broker process to access remote storage, and different Brokers require different parameters; for details, see the [Broker documentation](../../advanced/broker.md). You can also back up directly, via the S3 protocol, to remote storage that supports the AWS S3 protocol, or back up directly to HDFS; for details, see the [Create Remote Repository documentation](../../sql-manual/sql-reference/Data-Definition-Statements/Backup-and-Restore/CREATE-REPOSITORY.md)
2. RESTORE

View File

@ -38,16 +38,16 @@ CREATE REPOSITORY
```sql
CREATE [READ ONLY] REPOSITORY `repo_name`
WITH [BROKER `broker_name`|S3]
WITH [BROKER `broker_name`|S3|hdfs]
ON LOCATION `repo_location`
PROPERTIES ("key"="value", ...);
```
Notes:
- A repository is created using an existing broker, or by accessing cloud storage directly via the AWS S3 protocol
- A repository is created using an existing broker, by accessing cloud storage directly via the AWS S3 protocol, or by accessing HDFS directly
- If it is a read-only repository, only restores can be performed on it; otherwise, both backup and restore operations are available.
- PROPERTIES differ depending on the type of broker or S3; see the examples for details.
- PROPERTIES differ depending on the type of broker, S3, or hdfs; see the examples for details.
### Example
@ -107,6 +107,19 @@ PROPERTIES
);
```
5. Create a repository named hdfs_repo that connects to HDFS directly, without going through a broker.
```sql
CREATE REPOSITORY `hdfs_repo`
WITH hdfs
ON LOCATION "hdfs://hadoop-name-node:54310/path/to/repo/"
PROPERTIES
(
"fs.defaultFS"="hdfs://hadoop-name-node:54310",
"hadoop.username"="user"
);
```
### Keywords
CREATE, REPOSITORY

View File

@ -85,7 +85,8 @@ public class StorageBackend extends StorageDesc implements ParseNode {
if (this.storageType != StorageType.BROKER && StringUtils.isEmpty(name)) {
name = this.storageType.name();
}
if (this.storageType != StorageType.BROKER && this.storageType != StorageType.S3) {
if (this.storageType != StorageType.BROKER && this.storageType != StorageType.S3
&& this.storageType != StorageType.HDFS) {
throw new NotImplementedException(this.storageType.toString() + " is not support now.");
}
FeNameFormat.checkCommonName("repository", name);

View File

@ -46,6 +46,8 @@ public abstract class BlobStorage implements Writable {
public static BlobStorage create(String name, StorageBackend.StorageType type, Map<String, String> properties) {
if (type == StorageBackend.StorageType.S3) {
return new S3Storage(properties);
} else if (type == StorageBackend.StorageType.HDFS) {
return new HdfsStorage(properties);
} else if (type == StorageBackend.StorageType.BROKER) {
return new BrokerStorage(name, properties);
} else {

View File

@ -17,17 +17,540 @@
package org.apache.doris.backup;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.catalog.AuthType;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.URI;
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
// TODO: extend BlobStorage
public class HdfsStorage {
public static final String HDFS_DEFAULT_FS = "fs.defaultFS";
/**
* HdfsStorage encapsulate interfaces accessing HDFS directly.
*
*/
public class HdfsStorage extends BlobStorage {
private static final Logger LOG = LogManager.getLogger(HdfsStorage.class);
private final Map<String, String> caseInsensitiveProperties;
private final int readBufferSize = 128 << 10; // 128k
private final int writeBufferSize = 128 << 10; // 128k
private FileSystem dfsFileSystem = null;
/**
* init HdfsStorage with properties.
*
* @param properties parameters to access HDFS.
*/
public HdfsStorage(Map<String, String> properties) {
caseInsensitiveProperties = new CaseInsensitiveMap();
setProperties(properties);
setType(StorageBackend.StorageType.HDFS);
setName(StorageBackend.StorageType.HDFS.name());
}
public static void checkHDFS(Map<String, String> properties) throws UserException {
if (!properties.containsKey(HDFS_DEFAULT_FS)) {
throw new UserException(HDFS_DEFAULT_FS + " not found. This is required field");
if (!properties.containsKey(BrokerUtil.HADOOP_FS_NAME)) {
throw new UserException(
String.format("The properties of hdfs is invalid. %s are needed", BrokerUtil.HADOOP_FS_NAME));
}
}
private FileSystem getFileSystem(String remotePath) throws UserException {
if (dfsFileSystem == null) {
checkHDFS(caseInsensitiveProperties);
String hdfsFsName = caseInsensitiveProperties.get(BrokerUtil.HADOOP_FS_NAME).toString();
String username = caseInsensitiveProperties.get(BrokerUtil.HADOOP_USER_NAME).toString();
Configuration conf = new Configuration();
boolean isSecurityEnabled = false;
for (Map.Entry<String, String> propEntry : caseInsensitiveProperties.entrySet()) {
conf.set(propEntry.getKey(), propEntry.getValue());
if (propEntry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION)
&& propEntry.getValue().equals(AuthType.KERBEROS.getDesc())) {
isSecurityEnabled = true;
}
}
try {
if (isSecurityEnabled) {
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytab(
caseInsensitiveProperties.get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL),
caseInsensitiveProperties.get(BrokerUtil.HADOOP_KERBEROS_KEYTAB));
}
dfsFileSystem = FileSystem.get(java.net.URI.create(hdfsFsName), conf, username);
} catch (Exception e) {
LOG.error("errors while connect to " + remotePath, e);
throw new UserException("errors while connect to " + remotePath, e);
}
}
return dfsFileSystem;
}
@Override
public void setProperties(Map<String, String> properties) {
super.setProperties(properties);
caseInsensitiveProperties.putAll(properties);
}
@Override
public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) {
LOG.debug("download from {} to {}, file size: {}.", remoteFilePath, localFilePath, fileSize);
final long start = System.currentTimeMillis();
FSDataInputStream fsDataInputStream = null;
try {
fsDataInputStream = openReader(remoteFilePath, 0);
} catch (Exception e) {
return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
}
LOG.info("finished to open reader. download {} to {}.", remoteFilePath, localFilePath);
// delete local file if exist
File localFile = new File(localFilePath);
if (localFile.exists()) {
try {
Files.walk(Paths.get(localFilePath), FileVisitOption.FOLLOW_LINKS).sorted(Comparator.reverseOrder())
.map(java.nio.file.Path::toFile).forEach(File::delete);
} catch (IOException e) {
return new Status(Status.ErrCode.COMMON_ERROR,
"failed to delete exist local file: " + localFilePath + ", msg: " + e.getMessage());
}
}
// create local file
try {
if (!localFile.createNewFile()) {
return new Status(Status.ErrCode.COMMON_ERROR, "failed to create local file: " + localFilePath);
}
} catch (IOException e) {
return new Status(Status.ErrCode.COMMON_ERROR,
"failed to create local file: " + localFilePath + ", msg: " + e.getMessage());
}
String lastErrMsg = null;
Status status = Status.OK;
try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(localFile))) {
final long bufSize = 1024 * 1024; // 1MB
long leftSize = fileSize;
long readOffset = 0;
while (leftSize > 0) {
long readLen = Math.min(leftSize, bufSize);
try {
ByteBuffer data = pread(fsDataInputStream, readOffset, readLen);
if (readLen != data.array().length) {
LOG.warn(
"the actual read length does not equal to "
+ "the expected read length: {} vs. {}, file: {}",
data.array().length, readLen, remoteFilePath);
}
// write local file
out.write(data.array());
readOffset += data.array().length;
leftSize -= data.array().length;
} catch (Exception e) {
lastErrMsg = String.format(
"failed to read. " + "current read offset: %d, read length: %d,"
+ " file size: %d, file: %s. msg: %s",
readOffset, readLen, fileSize, remoteFilePath, e.getMessage());
LOG.warn(lastErrMsg);
status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg);
break;
}
}
} catch (IOException e) {
return new Status(Status.ErrCode.COMMON_ERROR, "Got exception: " + e.getMessage());
} finally {
Status closeStatus = closeReader(fsDataInputStream);
if (!closeStatus.ok()) {
LOG.warn(closeStatus.getErrMsg());
if (status.ok()) {
// we return close write error only if no other error has been encountered.
status = closeStatus;
}
}
}
LOG.info("finished to download from {} to {} with size: {}. cost {} ms", remoteFilePath, localFilePath,
fileSize, (System.currentTimeMillis() - start));
return status;
}
private int readBytesFully(FSDataInputStream is, byte[] dest) throws IOException {
int readLength = 0;
while (readLength < dest.length) {
int availableReadLength = dest.length - readLength;
int n = is.read(dest, readLength, availableReadLength);
if (n <= 0) {
break;
}
readLength += n;
}
return readLength;
}
/**
* read data from fsDataInputStream.
*
* @param fsDataInputStream input stream for read.
* @param readOffset read offset.
* @param length read length.
* @return ByteBuffer
* @throws IOException when read data error.
*/
public ByteBuffer pread(FSDataInputStream fsDataInputStream, long readOffset, long length) throws IOException {
synchronized (fsDataInputStream) {
long currentStreamOffset;
try {
currentStreamOffset = fsDataInputStream.getPos();
} catch (IOException e) {
LOG.error("errors while get file pos from output stream", e);
throw new IOException("errors while get file pos from output stream", e);
}
if (currentStreamOffset != readOffset) {
// it's ok, when reading some format like parquet, it is not a sequential read
LOG.debug("invalid offset, current read offset is " + currentStreamOffset
+ " is not equal to request offset " + readOffset + " seek to it");
try {
fsDataInputStream.seek(readOffset);
} catch (IOException e) {
throw new IOException(String.format(
"current read offset %d is not equal to %d, and could not seek to it, msg: %s",
currentStreamOffset, readOffset, e.getMessage()));
}
}
// Avoid using the ByteBuffer based read for Hadoop because some
// FSDataInputStream
// implementations are not ByteBufferReadable,
// See https://issues.apache.org/jira/browse/HADOOP-14603
byte[] buf;
if (length > readBufferSize) {
buf = new byte[readBufferSize];
} else {
buf = new byte[(int) length];
}
try {
int readLength = readBytesFully(fsDataInputStream, buf);
if (readLength < 0) {
throw new IOException("end of file reached");
}
if (LOG.isDebugEnabled()) {
LOG.debug(
"read buffer from input stream, buffer size:" + buf.length + ", read length:" + readLength);
}
return ByteBuffer.wrap(buf, 0, readLength);
} catch (IOException e) {
LOG.error("errors while read data from stream", e);
throw new IOException("errors while read data from stream " + e.getMessage());
}
}
}
@Override
public Status directUpload(String content, String remoteFile) {
FSDataOutputStream fsDataOutputStream = null;
try {
fsDataOutputStream = openWriter(remoteFile);
} catch (Exception e) {
return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
}
LOG.info("finished to open writer. directly upload to remote path {}.", remoteFile);
Status status = Status.OK;
try {
fsDataOutputStream.writeBytes(content);
} catch (IOException e) {
LOG.error("errors while write data to output stream", e);
status = new Status(Status.ErrCode.COMMON_ERROR, "write exception: " + e.getMessage());
} finally {
Status closeStatus = closeWriter(fsDataOutputStream);
if (!closeStatus.ok()) {
LOG.warn(closeStatus.getErrMsg());
if (status.ok()) {
status = closeStatus;
}
}
}
return status;
}
/**
* open remotePath for write.
*
* @param remotePath hdfs://namenode:port/path.
* @return FSDataOutputStream
* @throws UserException when get filesystem failed.
* @throws IOException when open path error.
*/
public FSDataOutputStream openWriter(String remotePath) throws UserException, IOException {
URI pathUri = URI.create(remotePath);
Path inputFilePath = new Path(pathUri.getPath());
FileSystem fileSystem = getFileSystem(remotePath);
try {
return fileSystem.create(inputFilePath, true, writeBufferSize);
} catch (IOException e) {
LOG.error("errors while open path", e);
throw new IOException(e.getMessage());
}
}
/**
* close for write.
*
* @param fsDataOutputStream output stream.
* @return Status.OK if success.
*/
public Status closeWriter(FSDataOutputStream fsDataOutputStream) {
synchronized (fsDataOutputStream) {
try {
fsDataOutputStream.flush();
fsDataOutputStream.close();
LOG.info("finished to close writer");
} catch (IOException e) {
LOG.error("errors while close file output stream", e);
return new Status(Status.ErrCode.COMMON_ERROR, "failed to close writer, msg:" + e.getMessage());
}
}
return Status.OK;
}
/**
* open remotePath for read.
*
* @param remotePath hdfs://namenode:port/path.
* @param startOffset the offset to read.
* @return FSDataInputStream if success.
* @throws UserException when get filesystem failed.
* @throws IOException when open file error.
*/
public FSDataInputStream openReader(String remotePath, long startOffset) throws UserException, IOException {
URI pathUri = URI.create(remotePath);
Path inputFilePath = new Path(pathUri.getPath());
FileSystem fileSystem = getFileSystem(remotePath);
try {
FSDataInputStream fsDataInputStream = fileSystem.open(inputFilePath, readBufferSize);
fsDataInputStream.seek(startOffset);
return fsDataInputStream;
} catch (IOException e) {
LOG.error("errors while open path", e);
throw new IOException(e.getMessage());
}
}
/**
* close for read.
*
* @param fsDataInputStream the input stream.
* @return Status.OK if success.
*/
public Status closeReader(FSDataInputStream fsDataInputStream) {
synchronized (fsDataInputStream) {
try {
fsDataInputStream.close();
} catch (IOException e) {
LOG.error("errors while close file input stream", e);
return new Status(Status.ErrCode.COMMON_ERROR,
"errors while close file input stream, msg: " + e.getMessage());
}
}
return Status.OK;
}
@Override
public Status upload(String localPath, String remotePath) {
long start = System.currentTimeMillis();
LOG.debug("local path {}, remote path {}", localPath, remotePath);
FSDataOutputStream fsDataOutputStream = null;
try {
fsDataOutputStream = openWriter(remotePath);
} catch (Exception e) {
return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
}
LOG.info("finished to open writer. directly upload to remote path {}.", remotePath);
// read local file and write remote
File localFile = new File(localPath);
long fileLength = localFile.length();
byte[] readBuf = new byte[1024];
Status status = new Status(Status.ErrCode.OK, "");
try (BufferedInputStream in = new BufferedInputStream(new FileInputStream(localFile))) {
// save the last err msg
String lastErrMsg = null;
// save the current write offset of remote file
long writeOffset = 0;
// read local file, 1MB at a time
int bytesRead;
while ((bytesRead = in.read(readBuf)) != -1) {
try {
fsDataOutputStream.write(readBuf, 0, bytesRead);
} catch (IOException e) {
LOG.error("errors while write data to output stream", e);
lastErrMsg = String.format(
"failed to write hdfs. current write offset: %d, write length: %d, "
+ "file length: %d, file: %s, msg: errors while write data to output stream",
writeOffset, bytesRead, fileLength, remotePath);
status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg);
break;
}
// write succeed, update current write offset
writeOffset += bytesRead;
} // end of read local file loop
} catch (FileNotFoundException e1) {
return new Status(Status.ErrCode.COMMON_ERROR, "encounter file not found exception: " + e1.getMessage());
} catch (IOException e1) {
return new Status(Status.ErrCode.COMMON_ERROR, "encounter io exception: " + e1.getMessage());
} finally {
Status closeStatus = closeWriter(fsDataOutputStream);
if (!closeStatus.ok()) {
LOG.warn(closeStatus.getErrMsg());
if (status.ok()) {
// we return close write error only if no other error has been encountered.
status = closeStatus;
}
}
}
if (status.ok()) {
LOG.info("finished to upload {} to remote path {}. cost: {} ms", localPath, remotePath,
(System.currentTimeMillis() - start));
}
return status;
}
@Override
public Status rename(String srcPath, String destPath) {
long start = System.currentTimeMillis();
try {
URI srcPathUri = URI.create(srcPath);
URI destPathUri = URI.create(destPath);
if (!srcPathUri.getAuthority().trim().equals(destPathUri.getAuthority().trim())) {
return new Status(Status.ErrCode.COMMON_ERROR, "only allow rename in same file system");
}
FileSystem fileSystem = getFileSystem(destPath);
Path srcfilePath = new Path(srcPathUri.getPath());
Path destfilePath = new Path(destPathUri.getPath());
boolean isRenameSuccess = fileSystem.rename(srcfilePath, destfilePath);
if (!isRenameSuccess) {
return new Status(Status.ErrCode.COMMON_ERROR, "failed to rename " + srcPath + " to " + destPath);
}
} catch (UserException e) {
return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
} catch (IOException e) {
LOG.error("errors while rename path from " + srcPath + " to " + destPath);
return new Status(Status.ErrCode.COMMON_ERROR,
"failed to rename remote " + srcPath + " to " + destPath + ", msg: " + e.getMessage());
}
LOG.info("finished to rename {} to {}. cost: {} ms", srcPath, destPath, (System.currentTimeMillis() - start));
return Status.OK;
}
@Override
public Status delete(String remotePath) {
try {
URI pathUri = URI.create(remotePath);
Path inputFilePath = new Path(pathUri.getPath());
FileSystem fileSystem = getFileSystem(remotePath);
fileSystem.delete(inputFilePath, true);
} catch (UserException e) {
return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
} catch (IOException e) {
LOG.error("errors while delete path " + remotePath);
return new Status(Status.ErrCode.COMMON_ERROR,
"failed to delete remote path: " + remotePath + ", msg: " + e.getMessage());
}
LOG.info("finished to delete remote path {}.", remotePath);
return Status.OK;
}
@Override
public Status list(String remotePath, List<RemoteFile> result) {
return list(remotePath, result, true);
}
/**
* get files in remotePath of HDFS.
*
* @param remotePath hdfs://namenode:port/path.
* @param result files in remotePath.
* @param fileNameOnly means get file only in remotePath if true.
* @return Status.OK if success.
*/
public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
try {
URI pathUri = URI.create(remotePath);
FileSystem fileSystem = getFileSystem(remotePath);
Path pathPattern = new Path(pathUri.getPath());
FileStatus[] files = fileSystem.globStatus(pathPattern);
if (files == null) {
LOG.info("no files in path " + remotePath);
return Status.OK;
}
for (FileStatus fileStatus : files) {
RemoteFile remoteFile = new RemoteFile(
fileNameOnly ? fileStatus.getPath().getName() : fileStatus.getPath().toString(),
!fileStatus.isDirectory(), fileStatus.isDirectory() ? -1 : fileStatus.getLen());
result.add(remoteFile);
}
} catch (FileNotFoundException e) {
LOG.info("file not found: " + e.getMessage());
return new Status(Status.ErrCode.NOT_FOUND, "file not found: " + e.getMessage());
} catch (Exception e) {
LOG.error("errors while get file status ", e);
return new Status(Status.ErrCode.COMMON_ERROR, "errors while get file status " + e.getMessage());
}
LOG.info("finish list path {}", remotePath);
return Status.OK;
}
@Override
public Status makeDir(String remotePath) {
return new Status(Status.ErrCode.COMMON_ERROR, "mkdir is not implemented.");
}
@Override
public Status checkPathExist(String remotePath) {
try {
URI pathUri = URI.create(remotePath);
Path inputFilePath = new Path(pathUri.getPath());
FileSystem fileSystem = getFileSystem(remotePath);
boolean isPathExist = fileSystem.exists(inputFilePath);
if (!isPathExist) {
return new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath);
}
return Status.OK;
} catch (Exception e) {
LOG.error("errors while check path exist " + remotePath, e);
return new Status(Status.ErrCode.COMMON_ERROR,
"failed to check remote path exist: " + remotePath + ". msg: " + e.getMessage());
}
}
@Override
public StorageBackend.StorageType getStorageType() {
return StorageBackend.StorageType.HDFS;
}
}

View File

@ -484,6 +484,18 @@ public class Repository implements Writable {
return st;
}
// upload final file
st = storage.upload(localFilePath, finalRemotePath);
if (!st.ok()) {
return st;
}
} else if (storage instanceof HdfsStorage) {
LOG.debug("hdfs get md5sum of file: {}. final remote path: {}", localFilePath, finalRemotePath);
st = storage.delete(finalRemotePath);
if (!st.ok()) {
return st;
}
// upload final file
st = storage.upload(localFilePath, finalRemotePath);
if (!st.ok()) {