add read buffer when use s3 reader (#5791)

This commit is contained in:
Zhengguo Yang
2021-05-17 11:46:38 +08:00
committed by GitHub
parent e7a6d659a9
commit 01a45e8691
22 changed files with 180 additions and 168 deletions

View File

@ -49,8 +49,8 @@ S3Reader::S3Reader(const std::map<std::string, std::string>& properties, const s
S3Reader::~S3Reader() {}
Status S3Reader::open() {
CHECK_S3_CLIENT(_client);
if (!_uri.parse()) {
CHECK_S3_CLIENT(_client);
if (!_uri.parse()) {
return Status::InvalidArgument("s3 uri is invalid: " + _path);
}
Aws::S3::Model::HeadObjectRequest request;
@ -68,10 +68,10 @@ Status S3Reader::open() {
return Status::InternalError(out.str());
}
}
Status S3Reader::read(uint8_t* buf, size_t* buf_len, bool* eof) {
DCHECK_NE(*buf_len, 0);
RETURN_IF_ERROR(readat(_cur_offset, (int64_t)*buf_len, (int64_t*)buf_len, buf));
if (*buf_len == 0 ) {
Status S3Reader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) {
DCHECK_NE(buf_len, 0);
RETURN_IF_ERROR(readat(_cur_offset, buf_len, bytes_read, buf));
if (*bytes_read == 0) {
*eof = true;
} else {
*eof = false;
@ -83,13 +83,13 @@ Status S3Reader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, v
if (position >= _file_size) {
*bytes_read = 0;
VLOG_FILE << "Read end of file: " + _path;
return Status::EndOfFile("Read end of file: " + _path);
return Status::OK();
}
Aws::S3::Model::GetObjectRequest request;
request.WithBucket(_uri.get_bucket()).WithKey(_uri.get_key());
string bytes = StrCat("bytes=", position, "-");
if (position + nbytes < _file_size) {
string bytes = StrCat(bytes.c_str(), position + nbytes - 1);
bytes = StrCat(bytes.c_str(), position + nbytes - 1);
}
request.SetRange(bytes.c_str());
auto response = _client->GetObject(request);
@ -107,7 +107,7 @@ Status S3Reader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, v
response.GetResult().GetBody().read((char*)out, *bytes_read);
return Status::OK();
}
Status S3Reader::read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) {
Status S3Reader::read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t* length) {
bool eof;
int64_t file_size = size() - _cur_offset;
if (file_size <= 0) {
@ -115,9 +115,8 @@ Status S3Reader::read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* lengt
*length = 0;
return Status::OK();
}
*length = file_size;
buf->reset(new uint8_t[file_size]);
read(buf->get(), length, &eof);
read(buf->get(), file_size, length, &eof);
return Status::OK();
}