From 01a45e8691daec81e296bd7a109fe38db6438e6f Mon Sep 17 00:00:00 2001 From: Zhengguo Yang Date: Mon, 17 May 2021 11:46:38 +0800 Subject: [PATCH] add read buffer when use s3 reader (#5791) --- be/src/common/config.h | 12 ++--- be/src/exec/broker_reader.cpp | 10 ++-- be/src/exec/broker_reader.h | 4 +- be/src/exec/broker_scanner.cpp | 31 +++++------ be/src/exec/buffered_reader.cpp | 28 +++++----- be/src/exec/buffered_reader.h | 7 +-- be/src/exec/file_reader.h | 9 ++-- be/src/exec/hdfs_file_reader.cpp | 28 +++++----- be/src/exec/hdfs_file_reader.h | 6 +-- be/src/exec/json_scanner.cpp | 10 ++-- be/src/exec/local_file_reader.cpp | 11 ++-- be/src/exec/local_file_reader.h | 4 +- be/src/exec/plain_text_line_reader.cpp | 16 +++--- be/src/exec/s3_reader.cpp | 21 ++++---- be/src/exec/s3_reader.h | 4 +- .../routine_load_task_executor.cpp | 5 +- be/src/runtime/stream_load/stream_load_pipe.h | 27 +++++----- be/src/util/broker_storage_backend.cpp | 5 +- be/test/exec/buffered_reader_test.cpp | 40 +++++++------- be/test/exec/s3_reader_test.cpp | 6 +-- be/test/runtime/kafka_consumer_pipe_test.cpp | 11 ++-- be/test/runtime/stream_load_pipe_test.cpp | 53 ++++++++++--------- 22 files changed, 180 insertions(+), 168 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 70e89e6e8d..c9a87909ff 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -247,7 +247,7 @@ CONF_mInt32(base_compaction_write_mbytes_per_sec, "5"); // lower write amplification, trading off read amplification and space amplification. CONF_mString(cumulative_compaction_policy, "size_based"); CONF_Validator(cumulative_compaction_policy, [](const std::string config) -> bool { - return config == "size_based" || config == "num_based"; + return config == "size_based" || config == "num_based"; }); // In size_based policy, output rowset of cumulative compaction total disk size exceed this config size, @@ -289,14 +289,12 @@ CONF_Int32(max_meta_checkpoint_threads, "-1"); CONF_mInt64(total_permits_for_compaction_score, "10000"); // sleep interval in ms after generated compaction tasks -CONF_mInt32(generate_compaction_tasks_min_interval_ms, "10") +CONF_mInt32(generate_compaction_tasks_min_interval_ms, "10"); // Compaction task number per disk. // Must be greater than 2, because Base compaction and Cumulative compaction have at least one thread each. CONF_mInt32(compaction_task_num_per_disk, "2"); -CONF_Validator(compaction_task_num_per_disk, [](const int config) -> bool { - return config >= 2; -}); +CONF_Validator(compaction_task_num_per_disk, [](const int config) -> bool { return config >= 2; }); // How many rounds of cumulative compaction for each round of base compaction when compaction tasks generation. CONF_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, "9"); @@ -597,9 +595,11 @@ CONF_mInt32(zone_map_row_num_threshold, "20"); // Trace = 6 CONF_Int32(aws_log_level, "3"); +// the buffer size when read data from remote storage like s3 +CONF_mInt32(remote_storage_read_buffer_mb, "256"); + } // namespace config } // namespace doris #endif // DORIS_BE_SRC_COMMON_CONFIG_H - diff --git a/be/src/exec/broker_reader.cpp b/be/src/exec/broker_reader.cpp index 64e587e65e..a720704ba4 100644 --- a/be/src/exec/broker_reader.cpp +++ b/be/src/exec/broker_reader.cpp @@ -122,14 +122,14 @@ Status BrokerReader::open() { } //not support -Status BrokerReader::read_one_message(std::unique_ptr* buf, size_t* length) { +Status BrokerReader::read_one_message(std::unique_ptr* buf, int64_t* length) { return Status::NotSupported("Not support"); } -Status BrokerReader::read(uint8_t* buf, size_t* buf_len, bool* eof) { - DCHECK_NE(*buf_len, 0); - RETURN_IF_ERROR(readat(_cur_offset, (int64_t)*buf_len, (int64_t*)buf_len, buf)); - if (*buf_len == 0) { +Status BrokerReader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) { + DCHECK_NE(buf_len, 0); + RETURN_IF_ERROR(readat(_cur_offset, buf_len, bytes_read, buf)); + if (*bytes_read == 0) { *eof = true; } else { *eof = false; diff --git a/be/src/exec/broker_reader.h b/be/src/exec/broker_reader.h index d6efc8e984..05a4b24c79 100644 --- a/be/src/exec/broker_reader.h +++ b/be/src/exec/broker_reader.h @@ -47,10 +47,10 @@ public: virtual Status open() override; // Read - virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override; + virtual Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) override; virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override; - virtual Status read_one_message(std::unique_ptr* buf, size_t* length) override; + virtual Status read_one_message(std::unique_ptr* buf, int64_t* length) override; virtual int64_t size() override; virtual Status seek(int64_t position) override; virtual Status tell(int64_t* position) override; diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index c5087c147c..939fa929c8 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -21,8 +21,10 @@ #include #include "exec/broker_reader.h" +#include "exec/buffered_reader.h" #include "exec/decompressor.h" #include "exec/exec_node.h" +#include "exec/hdfs_file_reader.h" #include "exec/local_file_reader.h" #include "exec/plain_text_line_reader.h" #include "exec/s3_reader.h" @@ -36,14 +38,6 @@ #include "runtime/stream_load/load_stream_mgr.h" #include "runtime/stream_load/stream_load_pipe.h" #include "runtime/tuple.h" -#include "exprs/expr.h" -#include "exec/text_converter.h" -#include "exec/text_converter.hpp" -#include "exec/plain_text_line_reader.h" -#include "exec/hdfs_file_reader.h" -#include "exec/local_file_reader.h" -#include "exec/broker_reader.h" -#include "exec/decompressor.h" #include "util/utf8_check.h" namespace doris { @@ -52,7 +46,7 @@ BrokerScanner::BrokerScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, const std::vector& ranges, const std::vector& broker_addresses, - const std::vector& pre_filter_ctxs, + const std::vector& pre_filter_ctxs, ScannerCounter* counter) : BaseScanner(state, profile, params, pre_filter_ctxs, counter), _ranges(ranges), @@ -168,8 +162,9 @@ Status BrokerScanner::open_file_reader() { break; } case TFileType::FILE_HDFS: { - HdfsFileReader* file_reader = new HdfsFileReader( - range.hdfs_params, range.path, start_offset); + BufferedReader* file_reader = + new BufferedReader(new HdfsFileReader(range.hdfs_params, range.path, start_offset), + config::remote_storage_read_buffer_mb * 1024 * 1024); RETURN_IF_ERROR(file_reader->open()); _cur_file_reader = file_reader; break; @@ -183,7 +178,9 @@ Status BrokerScanner::open_file_reader() { break; } case TFileType::FILE_S3: { - S3Reader* s3_reader = new S3Reader(_params.properties, range.path, start_offset); + BufferedReader* s3_reader = + new BufferedReader(new S3Reader(_params.properties, range.path, start_offset), + config::remote_storage_read_buffer_mb * 1024 * 1024); RETURN_IF_ERROR(s3_reader->open()); _cur_file_reader = s3_reader; break; @@ -320,12 +317,12 @@ void BrokerScanner::close() { void BrokerScanner::split_line(const Slice& line, std::vector* values) { const char* value = line.data; - size_t start = 0; // point to the start pos of next col value. - size_t curpos= 0; // point to the start pos of separator matching sequence. - size_t p1 = 0; // point to the current pos of separator matching sequence. + size_t start = 0; // point to the start pos of next col value. + size_t curpos = 0; // point to the start pos of separator matching sequence. + size_t p1 = 0; // point to the current pos of separator matching sequence. // Separator: AAAA - // + // // curpos // ▼ // AAAA @@ -351,7 +348,7 @@ void BrokerScanner::split_line(const Slice& line, std::vector* values) { } } - CHECK(curpos == line.size) << curpos << " vs " << line.size; + CHECK(curpos == line.size) << curpos << " vs " << line.size; values->emplace_back(value + start, curpos - start); } diff --git a/be/src/exec/buffered_reader.cpp b/be/src/exec/buffered_reader.cpp index e9eb0e9647..3b6b4285b6 100644 --- a/be/src/exec/buffered_reader.cpp +++ b/be/src/exec/buffered_reader.cpp @@ -45,21 +45,18 @@ Status BufferedReader::open() { return Status::InternalError(ss.str()); } RETURN_IF_ERROR(_reader->open()); - RETURN_IF_ERROR(_fill()); return Status::OK(); } //not support -Status BufferedReader::read_one_message(std::unique_ptr* buf, size_t* length) { +Status BufferedReader::read_one_message(std::unique_ptr* buf, int64_t* length) { return Status::NotSupported("Not support"); - } -Status BufferedReader::read(uint8_t* buf, size_t* buf_len, bool* eof) { - DCHECK_NE(*buf_len, 0); - int64_t bytes_read; - RETURN_IF_ERROR(readat(_cur_offset, (int64_t)*buf_len, &bytes_read, buf)); - if (bytes_read == 0) { +Status BufferedReader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) { + DCHECK_NE(buf_len, 0); + RETURN_IF_ERROR(readat(_cur_offset, buf_len, bytes_read, buf)); + if (*bytes_read == 0) { *eof = true; } else { *eof = false; @@ -97,7 +94,11 @@ Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* byt // if requested length is larger than the capacity of buffer, do not // need to copy the character into local buffer. if (nbytes > _buffer_size) { - return _reader->readat(position, nbytes, bytes_read, out); + auto st = _reader->readat(position, nbytes, bytes_read, out); + if (st.ok()) { + _cur_offset = position + *bytes_read; + } + return st; } _buffer_offset = position; RETURN_IF_ERROR(_fill()); @@ -116,13 +117,8 @@ Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* byt Status BufferedReader::_fill() { if (_buffer_offset >= 0) { - int64_t bytes_read; - // retry for new content - int retry_times = 1; - do { - // fill the buffer - RETURN_IF_ERROR(_reader->readat(_buffer_offset, _buffer_size, &bytes_read, _buffer)); - } while (bytes_read == 0 && retry_times++ < 2); + int64_t bytes_read = 0; + RETURN_IF_ERROR(_reader->readat(_buffer_offset, _buffer_size, &bytes_read, _buffer)); _buffer_limit = _buffer_offset + bytes_read; } return Status::OK(); diff --git a/be/src/exec/buffered_reader.h b/be/src/exec/buffered_reader.h index 76002151fb..b421b1d08b 100644 --- a/be/src/exec/buffered_reader.h +++ b/be/src/exec/buffered_reader.h @@ -18,6 +18,7 @@ #pragma once #include + #include #include "common/status.h" @@ -34,16 +35,16 @@ public: // If the reader need the file size, set it when construct FileReader. // There is no other way to set the file size. // buffered_reader will acquire reader - BufferedReader(FileReader* reader, int64_t = 1024 * 1024); + BufferedReader(FileReader* reader, int64_t buffer_size = 1024 * 1024); virtual ~BufferedReader(); virtual Status open() override; // Read - virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override; + virtual Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) override; virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override; - virtual Status read_one_message(std::unique_ptr* buf, size_t* length) override; + virtual Status read_one_message(std::unique_ptr* buf, int64_t* length) override; virtual int64_t size() override; virtual Status seek(int64_t position) override; virtual Status tell(int64_t* position) override; diff --git a/be/src/exec/file_reader.h b/be/src/exec/file_reader.h index 6c7cb32394..447b3f547e 100644 --- a/be/src/exec/file_reader.h +++ b/be/src/exec/file_reader.h @@ -18,6 +18,7 @@ #pragma once #include + #include #include "common/status.h" @@ -29,10 +30,10 @@ public: virtual ~FileReader() {} virtual Status open() = 0; // Read content to 'buf', 'buf_len' is the max size of this buffer. - // Return ok when read success, and 'buf_len' is set to size of read content - // If reach to end of file, the eof is set to true. meanwhile 'buf_len' + // Return ok when read success, and 'bytes_read' is set to size of read content + // If reach to end of file, the eof is set to true. meanwhile 'bytes_read' // is set to zero. - virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) = 0; + virtual Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) = 0; virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) = 0; /** @@ -41,7 +42,7 @@ public: * if read eof then return Status::OK and length is set 0 and buf is set NULL, * other return readed bytes. */ - virtual Status read_one_message(std::unique_ptr* buf, size_t* length) = 0; + virtual Status read_one_message(std::unique_ptr* buf, int64_t* length) = 0; virtual int64_t size() = 0; virtual Status seek(int64_t position) = 0; virtual Status tell(int64_t* position) = 0; diff --git a/be/src/exec/hdfs_file_reader.cpp b/be/src/exec/hdfs_file_reader.cpp index 2cecabe303..95c819a91e 100644 --- a/be/src/exec/hdfs_file_reader.cpp +++ b/be/src/exec/hdfs_file_reader.cpp @@ -22,12 +22,16 @@ #include "common/logging.h" namespace doris { -HdfsFileReader::HdfsFileReader(THdfsParams hdfs_params, - const std::string& path, int64_t start_offset) - : _hdfs_params(hdfs_params), _path(path), _current_offset(start_offset), - _file_size(-1), _hdfs_fs(nullptr), _hdfs_file(nullptr) { +HdfsFileReader::HdfsFileReader(THdfsParams hdfs_params, const std::string& path, + int64_t start_offset) + : _hdfs_params(hdfs_params), + _path(path), + _current_offset(start_offset), + _file_size(-1), + _hdfs_fs(nullptr), + _hdfs_file(nullptr) { std::stringstream namenode_ss; - namenode_ss << "hdfs://" << _hdfs_params.host<< ":" << _hdfs_params.port; + namenode_ss << "hdfs://" << _hdfs_params.host << ":" << _hdfs_params.port; _namenode = namenode_ss.str(); } @@ -47,7 +51,8 @@ Status HdfsFileReader::connect() { hdfsBuilderSetPrincipal(hdfs_builder, _hdfs_params.kerb_principal.c_str()); } if (_hdfs_params.__isset.kerb_ticket_cache_path) { - hdfsBuilderSetKerbTicketCachePath(hdfs_builder, _hdfs_params.kerb_ticket_cache_path.c_str()); + hdfsBuilderSetKerbTicketCachePath(hdfs_builder, + _hdfs_params.kerb_ticket_cache_path.c_str()); } // set token if (_hdfs_params.__isset.token) { @@ -107,7 +112,7 @@ bool HdfsFileReader::closed() { } // Read all bytes -Status HdfsFileReader::read_one_message(std::unique_ptr* buf, size_t* length) { +Status HdfsFileReader::read_one_message(std::unique_ptr* buf, int64_t* length) { int64_t file_size = size() - _current_offset; if (file_size <= 0) { buf->reset(); @@ -115,15 +120,14 @@ Status HdfsFileReader::read_one_message(std::unique_ptr* buf, size_t* return Status::OK(); } bool eof; - *length = file_size; buf->reset(new uint8_t[file_size]); - read(buf->get(), length, &eof); + read(buf->get(), file_size, length, &eof); return Status::OK(); } -Status HdfsFileReader::read(uint8_t* buf, size_t* buf_len, bool* eof) { - readat(_current_offset, (int64_t)*buf_len, (int64_t*)buf_len, buf); - if (*buf_len == 0) { +Status HdfsFileReader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) { + readat(_current_offset, buf_len, bytes_read, buf); + if (*bytes_read == 0) { *eof = true; } else { *eof = false; diff --git a/be/src/exec/hdfs_file_reader.h b/be/src/exec/hdfs_file_reader.h index 037e2c11d2..df055da0cc 100644 --- a/be/src/exec/hdfs_file_reader.h +++ b/be/src/exec/hdfs_file_reader.h @@ -20,7 +20,6 @@ #include #include "exec/file_reader.h" - #include "gen_cpp/PlanNodes_types.h" namespace doris { @@ -36,10 +35,10 @@ public: // Return ok when read success, and 'buf_len' is set to size of read content // If reach to end of file, the eof is set to true. meanwhile 'buf_len' // is set to zero. - virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override; + virtual Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) override; virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override; - virtual Status read_one_message(std::unique_ptr* buf, size_t* length) override; + virtual Status read_one_message(std::unique_ptr* buf, int64_t* length) override; virtual int64_t size() override; virtual Status seek(int64_t position) override; virtual Status tell(int64_t* position) override; @@ -48,6 +47,7 @@ public: private: Status connect(); + private: THdfsParams _hdfs_params; std::string _namenode; diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp index feb85b6ab4..5ff3d03ef9 100644 --- a/be/src/exec/json_scanner.cpp +++ b/be/src/exec/json_scanner.cpp @@ -21,6 +21,7 @@ #include "env/env.h" #include "exec/broker_reader.h" +#include "exec/buffered_reader.h" #include "exec/local_file_reader.h" #include "exec/s3_reader.h" #include "exprs/expr.h" @@ -37,8 +38,7 @@ JsonScanner::JsonScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, const std::vector& ranges, const std::vector& broker_addresses, - const std::vector& pre_filter_ctxs, - ScannerCounter* counter) + const std::vector& pre_filter_ctxs, ScannerCounter* counter) : BaseScanner(state, profile, params, pre_filter_ctxs, counter), _ranges(ranges), _broker_addresses(broker_addresses), @@ -121,7 +121,9 @@ Status JsonScanner::open_next_reader() { break; } case TFileType::FILE_S3: { - S3Reader* s3_reader = new S3Reader(_params.properties, range.path, start_offset); + BufferedReader* s3_reader = + new BufferedReader(new S3Reader(_params.properties, range.path, start_offset), + config::remote_storage_read_buffer_mb * 1024 * 1024); RETURN_IF_ERROR(s3_reader->open()); file = s3_reader; break; @@ -286,7 +288,7 @@ Status JsonReader::_parse_json_doc(bool* eof) { // read a whole message, must be delete json_str by `delete[]` SCOPED_TIMER(_file_read_timer); std::unique_ptr json_str; - size_t length = 0; + int64_t length = 0; RETURN_IF_ERROR(_file_reader->read_one_message(&json_str, &length)); _bytes_read_counter += length; if (length == 0) { diff --git a/be/src/exec/local_file_reader.cpp b/be/src/exec/local_file_reader.cpp index 04a795ac5c..d5c8454532 100644 --- a/be/src/exec/local_file_reader.cpp +++ b/be/src/exec/local_file_reader.cpp @@ -53,7 +53,7 @@ bool LocalFileReader::closed() { } // Read all bytes -Status LocalFileReader::read_one_message(std::unique_ptr* buf, size_t* length) { +Status LocalFileReader::read_one_message(std::unique_ptr* buf, int64_t* length) { bool eof; int64_t file_size = size() - _current_offset; if (file_size <= 0) { @@ -61,15 +61,14 @@ Status LocalFileReader::read_one_message(std::unique_ptr* buf, size_t *length = 0; return Status::OK(); } - *length = file_size; buf->reset(new uint8_t[file_size]); - read(buf->get(), length, &eof); + read(buf->get(), file_size, length, &eof); return Status::OK(); } -Status LocalFileReader::read(uint8_t* buf, size_t* buf_len, bool* eof) { - readat(_current_offset, (int64_t)*buf_len, (int64_t*)buf_len, buf); - if (*buf_len == 0) { +Status LocalFileReader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) { + readat(_current_offset, buf_len, bytes_read, buf); + if (*bytes_read == 0) { *eof = true; } else { *eof = false; diff --git a/be/src/exec/local_file_reader.h b/be/src/exec/local_file_reader.h index 4a302cca2d..3224f94562 100644 --- a/be/src/exec/local_file_reader.h +++ b/be/src/exec/local_file_reader.h @@ -35,10 +35,10 @@ public: // Return ok when read success, and 'buf_len' is set to size of read content // If reach to end of file, the eof is set to true. meanwhile 'buf_len' // is set to zero. - virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override; + virtual Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) override; virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override; - virtual Status read_one_message(std::unique_ptr* buf, size_t* length) override; + virtual Status read_one_message(std::unique_ptr* buf, int64_t* length) override; virtual int64_t size() override; virtual Status seek(int64_t position) override; virtual Status tell(int64_t* position) override; diff --git a/be/src/exec/plain_text_line_reader.cpp b/be/src/exec/plain_text_line_reader.cpp index d51fa33a93..8932e1c492 100644 --- a/be/src/exec/plain_text_line_reader.cpp +++ b/be/src/exec/plain_text_line_reader.cpp @@ -34,7 +34,8 @@ namespace doris { PlainTextLineReader::PlainTextLineReader(RuntimeProfile* profile, FileReader* file_reader, Decompressor* decompressor, size_t length, - const std::string& line_delimiter, size_t line_delimiter_length) + const std::string& line_delimiter, + size_t line_delimiter_length) : _profile(profile), _file_reader(file_reader), _decompressor(decompressor), @@ -207,25 +208,27 @@ Status PlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* e // we still have data in input which is not decompressed. // and no more data is required for input } else { - size_t read_len = 0; + int64_t read_len = 0; + int64_t buffer_len = 0; uint8_t* file_buf; + if (_decompressor == nullptr) { // uncompressed file, read directly into output buf file_buf = _output_buf + _output_buf_limit; - read_len = _output_buf_size - _output_buf_limit; + buffer_len = _output_buf_size - _output_buf_limit; } else { // MARK if (_more_input_bytes > 0) { // we already extend input buf. // current data in input buf should remain unchanged file_buf = _input_buf + _input_buf_limit; - read_len = _input_buf_size - _input_buf_limit; + buffer_len = _input_buf_size - _input_buf_limit; // leave input pos and limit unchanged } else { // here we are sure that all data in input buf has been consumed. // which means input pos and limit should be reset. file_buf = _input_buf; - read_len = _input_buf_size; + buffer_len = _input_buf_size; // reset input pos and limit _input_buf_pos = 0; _input_buf_limit = 0; @@ -234,7 +237,8 @@ Status PlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* e { SCOPED_TIMER(_read_timer); - RETURN_IF_ERROR(_file_reader->read(file_buf, &read_len, &_file_eof)); + RETURN_IF_ERROR( + _file_reader->read(file_buf, buffer_len, &read_len, &_file_eof)); COUNTER_UPDATE(_bytes_read_counter, read_len); } // LOG(INFO) << "after read file: _file_eof: " << _file_eof << " read_len: " << read_len; diff --git a/be/src/exec/s3_reader.cpp b/be/src/exec/s3_reader.cpp index 3f4fa93924..b8a674b2bd 100644 --- a/be/src/exec/s3_reader.cpp +++ b/be/src/exec/s3_reader.cpp @@ -49,8 +49,8 @@ S3Reader::S3Reader(const std::map& properties, const s S3Reader::~S3Reader() {} Status S3Reader::open() { - CHECK_S3_CLIENT(_client); - if (!_uri.parse()) { + CHECK_S3_CLIENT(_client); + if (!_uri.parse()) { return Status::InvalidArgument("s3 uri is invalid: " + _path); } Aws::S3::Model::HeadObjectRequest request; @@ -68,10 +68,10 @@ Status S3Reader::open() { return Status::InternalError(out.str()); } } -Status S3Reader::read(uint8_t* buf, size_t* buf_len, bool* eof) { - DCHECK_NE(*buf_len, 0); - RETURN_IF_ERROR(readat(_cur_offset, (int64_t)*buf_len, (int64_t*)buf_len, buf)); - if (*buf_len == 0 ) { +Status S3Reader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) { + DCHECK_NE(buf_len, 0); + RETURN_IF_ERROR(readat(_cur_offset, buf_len, bytes_read, buf)); + if (*bytes_read == 0) { *eof = true; } else { *eof = false; @@ -83,13 +83,13 @@ Status S3Reader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, v if (position >= _file_size) { *bytes_read = 0; VLOG_FILE << "Read end of file: " + _path; - return Status::EndOfFile("Read end of file: " + _path); + return Status::OK(); } Aws::S3::Model::GetObjectRequest request; request.WithBucket(_uri.get_bucket()).WithKey(_uri.get_key()); string bytes = StrCat("bytes=", position, "-"); if (position + nbytes < _file_size) { - string bytes = StrCat(bytes.c_str(), position + nbytes - 1); + bytes = StrCat(bytes.c_str(), position + nbytes - 1); } request.SetRange(bytes.c_str()); auto response = _client->GetObject(request); @@ -107,7 +107,7 @@ Status S3Reader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, v response.GetResult().GetBody().read((char*)out, *bytes_read); return Status::OK(); } -Status S3Reader::read_one_message(std::unique_ptr* buf, size_t* length) { +Status S3Reader::read_one_message(std::unique_ptr* buf, int64_t* length) { bool eof; int64_t file_size = size() - _cur_offset; if (file_size <= 0) { @@ -115,9 +115,8 @@ Status S3Reader::read_one_message(std::unique_ptr* buf, size_t* lengt *length = 0; return Status::OK(); } - *length = file_size; buf->reset(new uint8_t[file_size]); - read(buf->get(), length, &eof); + read(buf->get(), file_size, length, &eof); return Status::OK(); } diff --git a/be/src/exec/s3_reader.h b/be/src/exec/s3_reader.h index 260c9198e6..cd5c1855d7 100644 --- a/be/src/exec/s3_reader.h +++ b/be/src/exec/s3_reader.h @@ -40,7 +40,7 @@ public: // Return ok when read success, and 'buf_len' is set to size of read content // If reach to end of file, the eof is set to true. meanwhile 'buf_len' // is set to zero. - virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override; + virtual Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) override; virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override; @@ -50,7 +50,7 @@ public: * if read eof then return Status::OK and length is set 0 and buf is set NULL, * other return readed bytes. */ - virtual Status read_one_message(std::unique_ptr* buf, size_t* length) override; + virtual Status read_one_message(std::unique_ptr* buf, int64_t* length) override; virtual int64_t size() override; virtual Status seek(int64_t position) override; virtual Status tell(int64_t* position) override; diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index ed0abc970c..8bcbfa999d 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -327,8 +327,9 @@ Status RoutineLoadTaskExecutor::_execute_plan_for_test(StreamLoadContext* ctx) { std::stringstream ss; while (true) { char one; - size_t len = 1; - Status st = pipe->read((uint8_t*)&one, &len, &eof); + int64_t len = 1; + int64_t read_bytes = 0; + Status st = pipe->read((uint8_t*)&one, len, &read_bytes, &eof); if (!st.ok()) { LOG(WARNING) << "read failed"; ctx->promise.set_value(st); diff --git a/be/src/runtime/stream_load/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h index 0861da5181..95b0adfba7 100644 --- a/be/src/runtime/stream_load/stream_load_pipe.h +++ b/be/src/runtime/stream_load/stream_load_pipe.h @@ -86,7 +86,7 @@ public: // If _total_length == -1, this should be a Kafka routine load task, // just get the next buffer directly from the buffer queue, because one buffer contains a complete piece of data. // Otherwise, this should be a stream load task that needs to read the specified amount of data. - Status read_one_message(std::unique_ptr* data, size_t* length) override { + Status read_one_message(std::unique_ptr* data, int64_t* length) override { if (_total_length < -1) { std::stringstream ss; ss << "invalid, _total_length is: " << _total_length; @@ -103,18 +103,17 @@ public: // _total_length > 0, read the entire data data->reset(new uint8_t[_total_length]); - *length = _total_length; bool eof = false; - Status st = read(data->get(), length, &eof); + Status st = read(data->get(), _total_length, length, &eof); if (eof) { *length = 0; } return st; } - Status read(uint8_t* data, size_t* data_size, bool* eof) override { - size_t bytes_read = 0; - while (bytes_read < *data_size) { + Status read(uint8_t* data, int64_t data_size, int64_t* bytes_read, bool* eof) override { + *bytes_read = 0; + while (*bytes_read < data_size) { std::unique_lock l(_lock); while (!_cancelled && !_finished && _buf_queue.empty()) { _get_cond.wait(l); @@ -126,22 +125,22 @@ public: // finished if (_buf_queue.empty()) { DCHECK(_finished); - *data_size = bytes_read; - *eof = (bytes_read == 0); + data_size = *bytes_read; + *eof = (*bytes_read == 0); return Status::OK(); } auto buf = _buf_queue.front(); - size_t copy_size = std::min(*data_size - bytes_read, buf->remaining()); - buf->get_bytes((char*)data + bytes_read, copy_size); - bytes_read += copy_size; + int64_t copy_size = std::min(data_size - *bytes_read, (int64_t)buf->remaining()); + buf->get_bytes((char*)data + *bytes_read, copy_size); + *bytes_read += copy_size; if (!buf->has_remaining()) { _buf_queue.pop_front(); _buffered_bytes -= buf->limit; _put_cond.notify_one(); } } - DCHECK(bytes_read == *data_size) - << "bytes_read=" << bytes_read << ", *data_size=" << *data_size; + DCHECK(*bytes_read == data_size) + << "*bytes_read=" << *bytes_read << ", data_size=" << data_size; *eof = false; return Status::OK(); } @@ -188,7 +187,7 @@ public: private: // read the next buffer from _buf_queue - Status _read_next_buffer(std::unique_ptr* data, size_t* length) { + Status _read_next_buffer(std::unique_ptr* data, int64_t* length) { std::unique_lock l(_lock); while (!_cancelled && !_finished && _buf_queue.empty()) { _get_cond.wait(l); diff --git a/be/src/util/broker_storage_backend.cpp b/be/src/util/broker_storage_backend.cpp index 75fb224c64..f97798eccf 100644 --- a/be/src/util/broker_storage_backend.cpp +++ b/be/src/util/broker_storage_backend.cpp @@ -74,8 +74,9 @@ Status BrokerStorageBackend::download(const std::string& remote, const std::stri size_t write_offset = 0; bool eof = false; while (!eof) { - size_t read_len = buf_sz; - RETURN_IF_ERROR(broker_reader->read(reinterpret_cast(read_buf), &read_len, &eof)); + int64_t read_len = 0; + RETURN_IF_ERROR( + broker_reader->read(reinterpret_cast(read_buf), buf_sz, &read_len, &eof)); if (eof) { continue; diff --git a/be/test/exec/buffered_reader_test.cpp b/be/test/exec/buffered_reader_test.cpp index cb8968866a..19525a624b 100644 --- a/be/test/exec/buffered_reader_test.cpp +++ b/be/test/exec/buffered_reader_test.cpp @@ -58,34 +58,35 @@ TEST_F(BufferedReaderTest, test_validity) { ASSERT_TRUE(st.ok()); uint8_t buf[10]; bool eof = false; - size_t buf_len = 10; + int64_t buf_len = 10; + int64_t read_length = 0; - st = reader.read(buf, &buf_len, &eof); + st = reader.read(buf, buf_len, &read_length, &eof); ASSERT_TRUE(st.ok()); - ASSERT_STREQ("bdfhjlnprt", std::string((char*)buf, buf_len).c_str()); + ASSERT_STREQ("bdfhjlnprt", std::string((char*)buf, read_length).c_str()); ASSERT_FALSE(eof); - st = reader.read(buf, &buf_len, &eof); + st = reader.read(buf, buf_len, &read_length, &eof); ASSERT_TRUE(st.ok()); - ASSERT_STREQ("vxzAbCdEfG", std::string((char*)buf, buf_len).c_str()); + ASSERT_STREQ("vxzAbCdEfG", std::string((char*)buf, read_length).c_str()); ASSERT_FALSE(eof); - st = reader.read(buf, &buf_len, &eof); + st = reader.read(buf, buf_len, &read_length, &eof); ASSERT_TRUE(st.ok()); - ASSERT_STREQ("hIj\n\nMnOpQ", std::string((char*)buf, buf_len).c_str()); + ASSERT_STREQ("hIj\n\nMnOpQ", std::string((char*)buf, read_length).c_str()); ASSERT_FALSE(eof); - st = reader.read(buf, &buf_len, &eof); + st = reader.read(buf, buf_len, &read_length, &eof); ASSERT_TRUE(st.ok()); - ASSERT_STREQ("rStUvWxYz\n", std::string((char*)buf, buf_len).c_str()); + ASSERT_STREQ("rStUvWxYz\n", std::string((char*)buf, read_length).c_str()); ASSERT_FALSE(eof); - st = reader.read(buf, &buf_len, &eof); + st = reader.read(buf, buf_len, &read_length, &eof); ASSERT_TRUE(st.ok()); ASSERT_STREQ("IjKl", std::string((char*)buf, 4).c_str()); ASSERT_FALSE(eof); - st = reader.read(buf, &buf_len, &eof); + st = reader.read(buf, buf_len, &read_length, &eof); ASSERT_TRUE(st.ok()); ASSERT_TRUE(eof); } @@ -100,42 +101,43 @@ TEST_F(BufferedReaderTest, test_seek) { uint8_t buf[10]; bool eof = false; size_t buf_len = 10; + int64_t read_length = 0; // Seek to the end of the file st = reader.seek(45); ASSERT_TRUE(st.ok()); - st = reader.read(buf, &buf_len, &eof); + st = reader.read(buf, buf_len, &read_length, &eof); ASSERT_TRUE(st.ok()); ASSERT_TRUE(eof); // Seek to the beginning of the file st = reader.seek(0); ASSERT_TRUE(st.ok()); - st = reader.read(buf, &buf_len, &eof); + st = reader.read(buf, buf_len, &read_length, &eof); ASSERT_TRUE(st.ok()); - ASSERT_STREQ("bdfhjlnprt", std::string((char*)buf, buf_len).c_str()); + ASSERT_STREQ("bdfhjlnprt", std::string((char*)buf, read_length).c_str()); ASSERT_FALSE(eof); // Seek to a wrong position st = reader.seek(-1); ASSERT_TRUE(st.ok()); - st = reader.read(buf, &buf_len, &eof); + st = reader.read(buf, buf_len, &read_length, &eof); ASSERT_TRUE(st.ok()); - ASSERT_STREQ("bdfhjlnprt", std::string((char*)buf, buf_len).c_str()); + ASSERT_STREQ("bdfhjlnprt", std::string((char*)buf, read_length).c_str()); ASSERT_FALSE(eof); // Seek to a wrong position st = reader.seek(-1000); ASSERT_TRUE(st.ok()); - st = reader.read(buf, &buf_len, &eof); + st = reader.read(buf, buf_len, &read_length, &eof); ASSERT_TRUE(st.ok()); - ASSERT_STREQ("bdfhjlnprt", std::string((char*)buf, buf_len).c_str()); + ASSERT_STREQ("bdfhjlnprt", std::string((char*)buf, read_length).c_str()); ASSERT_FALSE(eof); // Seek to a wrong position st = reader.seek(1000); ASSERT_TRUE(st.ok()); - st = reader.read(buf, &buf_len, &eof); + st = reader.read(buf, buf_len, &read_length, &eof); ASSERT_TRUE(st.ok()); ASSERT_TRUE(eof); } diff --git a/be/test/exec/s3_reader_test.cpp b/be/test/exec/s3_reader_test.cpp index 4869b19119..cc8ad753e6 100644 --- a/be/test/exec/s3_reader_test.cpp +++ b/be/test/exec/s3_reader_test.cpp @@ -95,9 +95,9 @@ TEST_F(S3ReaderTest, normal) { ASSERT_EQ(_content.length(), reader->size()); std::string verification_contents; verification_contents.resize(_content.length()); - size_t total_read = _content.length(); + int64_t total_read = 0; bool eof = false; - st = reader->read((uint8_t*)&verification_contents[0], &total_read, &eof); + st = reader->read((uint8_t*)&verification_contents[0], _content.length(), &total_read, &eof); ASSERT_TRUE(st.ok()); ASSERT_EQ(_content, verification_contents); ASSERT_EQ(_content.length(), total_read); @@ -109,7 +109,7 @@ TEST_F(S3ReaderTest, normal) { st = reader->readat(_content.length(), _content.length(), (int64_t*)(&total_read), (uint8_t*)&verification_contents[0]); LOG(INFO) << total_read; - ASSERT_TRUE(total_read==0); + ASSERT_TRUE(total_read == 0); } } // end namespace doris diff --git a/be/test/runtime/kafka_consumer_pipe_test.cpp b/be/test/runtime/kafka_consumer_pipe_test.cpp index d63572775e..9b19c124e5 100644 --- a/be/test/runtime/kafka_consumer_pipe_test.cpp +++ b/be/test/runtime/kafka_consumer_pipe_test.cpp @@ -48,17 +48,18 @@ TEST_F(KafkaConsumerPipeTest, append_read) { ASSERT_TRUE(st.ok()); char buf[1024]; - size_t data_size = 1024; + int64_t data_size = 1024; + int64_t read_bytes = 0; bool eof = false; - st = k_pipe.read((uint8_t*)buf, &data_size, &eof); + st = k_pipe.read((uint8_t*)buf, data_size, &read_bytes, &eof); ASSERT_TRUE(st.ok()); - ASSERT_EQ(data_size, msg1.length() + msg2.length() + 2); + ASSERT_EQ(read_bytes, msg1.length() + msg2.length() + 2); ASSERT_EQ(eof, false); data_size = 1024; - st = k_pipe.read((uint8_t*)buf, &data_size, &eof); + st = k_pipe.read((uint8_t*)buf, data_size, &read_bytes, &eof); ASSERT_TRUE(st.ok()); - ASSERT_EQ(data_size, 0); + ASSERT_EQ(read_bytes, 0); ASSERT_EQ(eof, true); } diff --git a/be/test/runtime/stream_load_pipe_test.cpp b/be/test/runtime/stream_load_pipe_test.cpp index f33fb53647..cd7c678bca 100644 --- a/be/test/runtime/stream_load_pipe_test.cpp +++ b/be/test/runtime/stream_load_pipe_test.cpp @@ -52,18 +52,19 @@ TEST_F(StreamLoadPipeTest, append_buffer) { std::thread t1(appender); char buf[256]; - size_t buf_len = 256; + int64_t buf_len = 256; + int64_t read_bytes = 0; bool eof = false; - auto st = pipe.read((uint8_t*)buf, &buf_len, &eof); + auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof); ASSERT_TRUE(st.ok()); - ASSERT_EQ(128, buf_len); + ASSERT_EQ(128, read_bytes); ASSERT_FALSE(eof); for (int i = 0; i < 128; ++i) { ASSERT_EQ('0' + (i % 10), buf[i]); } - st = pipe.read((uint8_t*)buf, &buf_len, &eof); + st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof); ASSERT_TRUE(st.ok()); - ASSERT_EQ(0, buf_len); + ASSERT_EQ(0, read_bytes); ASSERT_TRUE(eof); t1.join(); @@ -82,18 +83,19 @@ TEST_F(StreamLoadPipeTest, append_bytes) { std::thread t1(appender); char buf[256]; - size_t buf_len = 256; + int64_t buf_len = 256; + int64_t read_bytes = 0; bool eof = false; - auto st = pipe.read((uint8_t*)buf, &buf_len, &eof); + auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof); ASSERT_TRUE(st.ok()); - ASSERT_EQ(128, buf_len); + ASSERT_EQ(128, read_bytes); ASSERT_FALSE(eof); for (int i = 0; i < 128; ++i) { ASSERT_EQ('0' + (i % 10), buf[i]); } - st = pipe.read((uint8_t*)buf, &buf_len, &eof); + st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof); ASSERT_TRUE(st.ok()); - ASSERT_EQ(0, buf_len); + ASSERT_EQ(0, read_bytes); ASSERT_TRUE(eof); t1.join(); @@ -112,11 +114,12 @@ TEST_F(StreamLoadPipeTest, append_bytes2) { std::thread t1(appender); char buf[128]; - size_t buf_len = 62; + int64_t buf_len = 62; + int64_t read_bytes = 0; bool eof = false; - auto st = pipe.read((uint8_t*)buf, &buf_len, &eof); + auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof); ASSERT_TRUE(st.ok()); - ASSERT_EQ(62, buf_len); + ASSERT_EQ(62, read_bytes); ASSERT_FALSE(eof); for (int i = 0; i < 62; ++i) { ASSERT_EQ('0' + (i % 10), buf[i]); @@ -124,15 +127,15 @@ TEST_F(StreamLoadPipeTest, append_bytes2) { for (int i = 62; i < 128; ++i) { char ch; buf_len = 1; - auto st = pipe.read((uint8_t*)&ch, &buf_len, &eof); + auto st = pipe.read((uint8_t*)&ch, buf_len, &read_bytes, &eof); ASSERT_TRUE(st.ok()); - ASSERT_EQ(1, buf_len); + ASSERT_EQ(1, read_bytes); ASSERT_FALSE(eof); ASSERT_EQ('0' + (i % 10), ch); } - st = pipe.read((uint8_t*)buf, &buf_len, &eof); + st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof); ASSERT_TRUE(st.ok()); - ASSERT_EQ(0, buf_len); + ASSERT_EQ(0, read_bytes); ASSERT_TRUE(eof); t1.join(); @@ -180,18 +183,19 @@ TEST_F(StreamLoadPipeTest, append_mix) { std::thread t1(appender); char buf[128]; - size_t buf_len = 128; + int64_t buf_len = 128; + int64_t read_bytes = 0; bool eof = false; - auto st = pipe.read((uint8_t*)buf, &buf_len, &eof); + auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof); ASSERT_TRUE(st.ok()); - ASSERT_EQ(128, buf_len); + ASSERT_EQ(128, read_bytes); ASSERT_FALSE(eof); for (int i = 0; i < 128; ++i) { ASSERT_EQ('0' + (i % 10), buf[i]); } - st = pipe.read((uint8_t*)buf, &buf_len, &eof); + st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof); ASSERT_TRUE(st.ok()); - ASSERT_EQ(0, buf_len); + ASSERT_EQ(0, read_bytes); ASSERT_TRUE(eof); t1.join(); @@ -212,9 +216,10 @@ TEST_F(StreamLoadPipeTest, cancel) { std::thread t1(appender); char buf[128]; - size_t buf_len = 128; + int64_t buf_len = 128; + int64_t read_bytes = 0; bool eof = false; - auto st = pipe.read((uint8_t*)buf, &buf_len, &eof); + auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof); ASSERT_FALSE(st.ok()); t1.join(); }