[fix](scan) Avoid memory allocated by buffered_reader from being traced (#41921) (#44253)

Use OwnedSlice to replace `char*` in BufferedReader

## Proposed changes

pick #41921
This commit is contained in:
Jerry Hu
2024-11-20 10:37:06 +08:00
committed by GitHub
parent a666b5e0c9
commit dc67086d97
3 changed files with 20 additions and 16 deletions

View File

@ -23,6 +23,7 @@
#include <algorithm>
#include <chrono>
#include <memory>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
@ -31,6 +32,7 @@
#include "runtime/thread_context.h"
#include "runtime/workload_management/io_throttle.h"
#include "util/runtime_profile.h"
#include "util/slice.h"
#include "util/threadpool.h"
namespace doris {
@ -270,7 +272,7 @@ void MergeRangeFileReader::_read_in_box(RangeCachedData& cached_data, size_t off
}
if (copy_out != nullptr) {
memcpy(copy_out + to_handle - remaining,
_boxes[box_index] + cached_data.box_start_offset[i], box_to_handle);
_boxes[box_index].data() + cached_data.box_start_offset[i], box_to_handle);
}
remaining -= box_to_handle;
cached_data.box_start_offset[i] += box_to_handle;
@ -307,14 +309,15 @@ void MergeRangeFileReader::_read_in_box(RangeCachedData& cached_data, size_t off
Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, size_t to_read,
size_t* bytes_read, const IOContext* io_ctx) {
if (_read_slice == nullptr) {
_read_slice = new char[READ_SLICE_SIZE];
if (!_read_slice) {
_read_slice = std::make_unique<OwnedSlice>(READ_SLICE_SIZE);
}
*bytes_read = 0;
{
SCOPED_RAW_TIMER(&_statistics.read_time);
RETURN_IF_ERROR(
_reader->read_at(start_offset, Slice(_read_slice, to_read), bytes_read, io_ctx));
RETURN_IF_ERROR(_reader->read_at(start_offset, Slice(_read_slice->data(), to_read),
bytes_read, io_ctx));
_statistics.merged_io++;
_statistics.merged_bytes += *bytes_read;
}
@ -328,8 +331,8 @@ Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, siz
auto fill_box = [&](int16 fill_box_ref, uint32 box_usage, size_t box_copy_end) {
size_t copy_size = std::min(box_copy_end - copy_start, BOX_SIZE - box_usage);
memcpy(_boxes[fill_box_ref] + box_usage, _read_slice + copy_start - start_offset,
copy_size);
memcpy(_boxes[fill_box_ref].data() + box_usage,
_read_slice->data() + copy_start - start_offset, copy_size);
filled_boxes.emplace_back(fill_box_ref, box_usage, copy_start, copy_start + copy_size);
copy_start += copy_size;
_last_box_ref = fill_box_ref;
@ -367,7 +370,7 @@ Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, siz
}
// apply for new box to copy data
while (copy_start < range_copy_end && _boxes.size() < NUM_BOX) {
_boxes.emplace_back(new char[BOX_SIZE]);
_boxes.emplace_back(BOX_SIZE);
_box_ref.emplace_back(0);
fill_box(_boxes.size() - 1, 0, range_copy_end);
}

View File

@ -168,12 +168,7 @@ public:
}
}
~MergeRangeFileReader() override {
delete[] _read_slice;
for (char* box : _boxes) {
delete[] box;
}
}
~MergeRangeFileReader() override = default;
Status close() override {
if (!_closed) {
@ -246,8 +241,8 @@ private:
bool _closed = false;
size_t _remaining;
char* _read_slice = nullptr;
std::vector<char*> _boxes;
std::unique_ptr<OwnedSlice> _read_slice;
std::vector<OwnedSlice> _boxes;
int16 _last_box_ref = -1;
uint32 _last_box_usage = 0;
std::vector<int16> _box_ref;

View File

@ -344,6 +344,10 @@ class OwnedSlice : private Allocator<false, false, false, DefaultMemoryAllocator
public:
OwnedSlice() : _slice((uint8_t*)nullptr, 0) {}
OwnedSlice(size_t length)
: _slice(reinterpret_cast<char*>(Allocator::alloc(length)), length),
_capacity(length) {}
OwnedSlice(OwnedSlice&& src) : _slice(src._slice), _capacity(src._capacity) {
src._slice.data = nullptr;
src._slice.size = 0;
@ -369,6 +373,8 @@ public:
}
}
char* data() const { return _slice.data; }
const Slice& slice() const { return _slice; }
private: