[Feature] Support backup,restore,load,export directly connect to s3 (#5399)
* [doris-1008] support backup and restore directly to cloud storage via aws s3 protocol * [Internal][S3DirectAccess] Support backup,restore,load,export directly connect to s3 1. Support load and export data from/to s3 directly. 2. Add a config to auto convert broker access to s3 access when available Change-Id: Iac96d4b3670776708bc96a119ff491db8cb4cde7 (cherry picked from commit 2f03832ca52221cc7436069b96c45c48c4bc7201) * [Internal][S3DirectAccess] File path glob compatible with broker Change-Id: Ie55e07a547aa22c6fa8d432ca926216c10384e68 (cherry picked from commit d4fb25544c0dc06d23e1ada571ec3f8edd4ba56f) * [internal] [doris-1008] fix log4j class not found Change-Id: I468176aca0d821383c74ee658d461aba9e7d5be3 (cherry picked from commit 029adaa9d6ded8503acbd6644c1519456f3db232) * add poms Co-authored-by: yangzhengguo01 <yangzhengguo01@baidu.com>
This commit is contained in:
142
be/src/exec/s3_reader.cpp
Normal file
142
be/src/exec/s3_reader.cpp
Normal file
@ -0,0 +1,142 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "exec/s3_reader.h"
|
||||
|
||||
#include <aws/s3/S3Client.h>
|
||||
#include <aws/s3/model/GetObjectRequest.h>
|
||||
#include <aws/s3/model/HeadObjectRequest.h>
|
||||
|
||||
#include "common/logging.h"
|
||||
#include "gutil/strings/strcat.h"
|
||||
#include "util/s3_util.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
// Returns Status::InternalError from the enclosing function when `client`
// evaluates to false (e.g. a null S3 client pointer).
//
// The body is wrapped in do { ... } while (false) so that the macro expands to
// exactly one statement: `CHECK_S3_CLIENT(c);` is then safe inside an
// unbraced if/else, where a bare `if { }` expansion would either fail to
// compile or capture the following `else`. The argument is parenthesized so
// that arbitrary expressions can be passed.
#ifndef CHECK_S3_CLIENT
#define CHECK_S3_CLIENT(client)                                        \
    do {                                                               \
        if (!(client)) {                                               \
            return Status::InternalError("init aws s3 client error."); \
        }                                                              \
    } while (false)
#endif
|
||||
|
||||
// Builds a reader for the object addressed by `path`.
//
// `properties` is copied into the reader and handed to create_client() to
// build the S3 client; `start_offset` becomes the initial read cursor. The
// file size stays 0 until open() succeeds. A failed client creation only
// trips the DCHECK here; open()/readat() re-check the client and report it
// as an error status.
S3Reader::S3Reader(const std::map<std::string, std::string>& properties, const std::string& path,
                   int64_t start_offset)
        : _properties(properties),
          _path(path),
          _uri(path),
          _cur_offset(start_offset),
          _file_size(0),
          _closed(false) {
    _client = create_client(_properties);
    DCHECK(_client) << "init aws s3 client error.";
}
|
||||
|
||||
// Nothing to release explicitly: members clean themselves up.
S3Reader::~S3Reader() = default;
|
||||
|
||||
// Validates the S3 URI and fetches the object's size.
//
// Issues a HeadObject request for the bucket/key parsed from the path and
// stores the reported content length as the file size used by readat().
//
// Returns:
//   OK              - the object exists; _file_size has been updated.
//   InternalError   - the S3 client was not created, or HeadObject failed for
//                     a reason other than "not found".
//   InvalidArgument - the path could not be parsed as an S3 URI.
//   NotFound        - S3 answered with HTTP 404 for the object.
Status S3Reader::open() {
    CHECK_S3_CLIENT(_client);
    if (!_uri.parse()) {
        return Status::InvalidArgument("s3 uri is invalid: " + _path);
    }

    Aws::S3::Model::HeadObjectRequest request;
    request.WithBucket(_uri.get_bucket()).WithKey(_uri.get_key());
    Aws::S3::Model::HeadObjectOutcome response = _client->HeadObject(request);

    if (response.IsSuccess()) {
        _file_size = response.GetResult().GetContentLength();
        return Status::OK();
    }
    if (response.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) {
        return Status::NotFound(_path + " not exists!");
    }

    // Any other failure: surface the SDK's exception name and message. The
    // closing bracket was missing from the original message.
    std::stringstream out;
    out << "Error: [" << response.GetError().GetExceptionName() << ":"
        << response.GetError().GetMessage() << "]";
    return Status::InternalError(out.str());
}
|
||||
// Sequential read from the current cursor.
//
// Params:
//   buf     - destination buffer, at least *buf_len bytes.
//   buf_len - in: number of bytes requested (must be non-zero);
//             out: number of bytes actually placed in `buf` (0 on failure).
//   eof     - set to true when the read succeeded but produced no bytes,
//             false when it produced data. Left untouched on failure.
//
// Returns whatever readat() returns; note that readat() reports a cursor at
// or past the end of the object as Status::EndOfFile rather than OK.
Status S3Reader::read(uint8_t* buf, size_t* buf_len, bool* eof) {
    DCHECK_NE(*buf_len, 0);

    // readat() reports the byte count through an int64_t*. Use a real int64_t
    // instead of reinterpreting the caller's size_t* as int64_t*, which is a
    // type-punning violation.
    int64_t bytes_read = 0;
    const Status status =
            readat(_cur_offset, static_cast<int64_t>(*buf_len), &bytes_read, buf);
    *buf_len = static_cast<size_t>(bytes_read);
    if (!status.ok()) {
        return status;
    }

    *eof = (bytes_read == 0);
    return Status::OK();
}
|
||||
// Positional read: fetches up to `nbytes` bytes starting at byte offset
// `position` of the object with a ranged GetObject request.
//
// Params:
//   position   - absolute byte offset to start reading from.
//   nbytes     - maximum number of bytes to read.
//   bytes_read - out: number of bytes copied into `out` (0 on failure).
//   out        - destination buffer, at least `nbytes` bytes.
//
// On success the cursor (_cur_offset) is moved to just past the bytes read.
//
// Returns:
//   OK            - *bytes_read bytes were copied into `out`.
//   EndOfFile     - `position` is at or beyond the size recorded by open().
//   InternalError - the S3 client was not created, the GetObject request
//                   failed, or the response body was shorter than announced.
Status S3Reader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
    CHECK_S3_CLIENT(_client);
    if (position >= _file_size) {
        *bytes_read = 0;
        VLOG_FILE << "Read end of file: " + _path;
        return Status::EndOfFile("Read end of file: " + _path);
    }
    if (nbytes <= 0) {
        // Nothing requested: avoid building an invalid "first > last" range.
        *bytes_read = 0;
        return Status::OK();
    }

    Aws::S3::Model::GetObjectRequest request;
    request.WithBucket(_uri.get_bucket()).WithKey(_uri.get_key());

    // HTTP byte ranges are inclusive: "bytes=first-last". When the request
    // reaches the end of the object the open-ended form "bytes=first-" is used.
    // The previous code declared a second, shadowing `bytes` variable inside
    // the `if`, so the end offset never reached the request and every read
    // asked S3 for everything from `position` to the end of the object.
    std::string range = StrCat("bytes=", position, "-");
    if (position + nbytes < _file_size) {
        range = StrCat(range, position + nbytes - 1);
    }
    request.SetRange(range.c_str());

    auto response = _client->GetObject(request);
    if (!response.IsSuccess()) {
        *bytes_read = 0;
        std::stringstream out;
        out << "Error: [" << response.GetError().GetExceptionName() << ":"
            << response.GetError().GetMessage() << "]";
        LOG(WARNING) << out.str();
        return Status::InternalError(out.str());
    }

    // Never copy more than the caller's buffer can hold, even if the server
    // returned more than was asked for.
    const int64_t content_length = response.GetResult().GetContentLength();
    const int64_t wanted = nbytes < content_length ? nbytes : content_length;

    auto& body = response.GetResult().GetBody();
    body.read(static_cast<char*>(out), wanted);
    // Trust what the stream actually delivered rather than the header.
    const int64_t got = static_cast<int64_t>(body.gcount());

    *bytes_read = got;
    _cur_offset = position + got;
    if (got != wanted) {
        std::stringstream msg;
        msg << "Error: short read from " << _path << ", expected " << wanted << " bytes, got "
            << got;
        LOG(WARNING) << msg.str();
        return Status::InternalError(msg.str());
    }
    return Status::OK();
}
|
||||
// Reads everything from the current cursor to the end of the object into a
// freshly allocated buffer.
//
// Params:
//   buf    - out: owns the data read; reset to null when nothing is left to
//            read or when the read fails.
//   length - out: number of bytes stored in `buf` (0 when `buf` is null).
//
// Returns OK when the remaining bytes were read (or none were left), otherwise
// the error reported by read(). The previous code discarded that status and
// returned OK with an uninitialized buffer.
Status S3Reader::read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) {
    const int64_t remaining = size() - _cur_offset;
    if (remaining <= 0) {
        buf->reset();
        *length = 0;
        return Status::OK();
    }

    *length = static_cast<size_t>(remaining);
    buf->reset(new uint8_t[remaining]);

    bool eof = false;
    const Status status = read(buf->get(), length, &eof);
    if (!status.ok()) {
        // Do not hand a partially filled / uninitialized buffer to the caller.
        buf->reset();
        *length = 0;
        return status;
    }
    return Status::OK();
}
|
||||
|
||||
int64_t S3Reader::size() {
|
||||
return _file_size;
|
||||
}
|
||||
// Moves the read cursor to `position`. The value is stored as-is, without
// range checking, and no request is sent to S3. Always returns OK.
Status S3Reader::seek(int64_t position) {
    this->_cur_offset = position;
    return Status::OK();
}
|
||||
// Writes the current read cursor into `*position`. Always returns OK.
Status S3Reader::tell(int64_t* position) {
    const int64_t cursor = _cur_offset;
    *position = cursor;
    return Status::OK();
}
|
||||
void S3Reader::close() {
|
||||
_closed = true;
|
||||
}
|
||||
bool S3Reader::closed() {
|
||||
return _closed;
|
||||
}
|
||||
|
||||
} // end namespace doris
|
||||
Reference in New Issue
Block a user