[Profile] Support show load profile for broker load job (#6214)
1. Add new statement: `SHOW LOAD PROFILE "xxx";` 2. Improve the read performance of orc scanner
This commit is contained in:
@ -20,17 +20,22 @@
|
||||
#include <algorithm>
|
||||
#include <sstream>
|
||||
|
||||
#include "common/config.h"
|
||||
#include "common/logging.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
// buffered reader
|
||||
BufferedReader::BufferedReader(FileReader* reader, int64_t buffer_size)
|
||||
: _reader(reader),
|
||||
BufferedReader::BufferedReader(RuntimeProfile* profile, FileReader* reader, int64_t buffer_size)
|
||||
: _profile(profile),
|
||||
_reader(reader),
|
||||
_buffer_size(buffer_size),
|
||||
_buffer_offset(0),
|
||||
_buffer_limit(0),
|
||||
_cur_offset(0) {
|
||||
if (_buffer_size == -1L) {
|
||||
_buffer_size = config::remote_storage_read_buffer_mb * 1024 * 1024;
|
||||
}
|
||||
_buffer = new char[_buffer_size];
|
||||
// set the _cur_offset of this reader as same as the inner reader's,
|
||||
// to make sure the buffer reader will start to read at right position.
|
||||
@ -47,6 +52,14 @@ Status BufferedReader::open() {
|
||||
ss << "Open buffered reader failed, reader is null";
|
||||
return Status::InternalError(ss.str());
|
||||
}
|
||||
|
||||
// the macro ADD_XXX is idempotent.
|
||||
// So although each scanner calls the ADD_XXX method, they all use the same counters.
|
||||
_read_timer = ADD_TIMER(_profile, "FileReadTime");
|
||||
_remote_read_timer = ADD_CHILD_TIMER(_profile, "FileRemoteReadTime", "FileReadTime");
|
||||
_read_counter = ADD_COUNTER(_profile, "FileReadCalls", TUnit::UNIT);
|
||||
_remote_read_counter = ADD_COUNTER(_profile, "FileRemoteReadCalls", TUnit::UNIT);
|
||||
|
||||
RETURN_IF_ERROR(_reader->open());
|
||||
return Status::OK();
|
||||
}
|
||||
@ -68,6 +81,7 @@ Status BufferedReader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read,
|
||||
}
|
||||
|
||||
Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
|
||||
SCOPED_TIMER(_read_timer);
|
||||
if (nbytes <= 0) {
|
||||
*bytes_read = 0;
|
||||
return Status::OK();
|
||||
@ -92,6 +106,7 @@ Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_r
|
||||
|
||||
Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* bytes_read,
|
||||
void* out) {
|
||||
_read_count++;
|
||||
// requested bytes missed the local buffer
|
||||
if (position >= _buffer_limit || position < _buffer_offset) {
|
||||
// if requested length is larger than the capacity of buffer, do not
|
||||
@ -121,6 +136,7 @@ Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* byt
|
||||
Status BufferedReader::_fill() {
|
||||
if (_buffer_offset >= 0) {
|
||||
int64_t bytes_read = 0;
|
||||
SCOPED_TIMER(_remote_read_timer);
|
||||
RETURN_IF_ERROR(_reader->readat(_buffer_offset, _buffer_size, &bytes_read, _buffer));
|
||||
_buffer_limit = _buffer_offset + bytes_read;
|
||||
}
|
||||
@ -144,6 +160,13 @@ Status BufferedReader::tell(int64_t* position) {
|
||||
void BufferedReader::close() {
|
||||
_reader->close();
|
||||
SAFE_DELETE_ARRAY(_buffer);
|
||||
|
||||
if (_read_counter != nullptr) {
|
||||
COUNTER_UPDATE(_read_counter, _read_count);
|
||||
}
|
||||
if (_remote_read_counter != nullptr) {
|
||||
COUNTER_UPDATE(_remote_read_counter, _remote_read_count);
|
||||
}
|
||||
}
|
||||
|
||||
bool BufferedReader::closed() {
|
||||
|
||||
Reference in New Issue
Block a user