Release memory pool held by the parquet reader when the data has been flushed by rowset writter. Co-authored-by: spaces-x <weixiang06@meituan.com>
This commit is contained in:
@ -495,6 +495,8 @@ CONF_mInt64(write_buffer_size, "209715200");
|
||||
|
||||
// max buffer size used in memtable for the aggregated table
|
||||
CONF_mInt64(memtable_max_buffer_size, "419430400");
|
||||
// write buffer size in push task for sparkload, default 1GB
|
||||
CONF_mInt64(flush_size_for_sparkload, "1073741824");
|
||||
|
||||
// following 2 configs limit the memory consumption of load process on a Backend.
|
||||
// eg: memory limit to 80% of mem limit config but up to 100GB(default)
|
||||
|
||||
@ -66,6 +66,7 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bo
|
||||
|
||||
COUNTER_UPDATE(_rows_read_counter, 1);
|
||||
SCOPED_TIMER(_materialize_timer);
|
||||
// TODO(weixiang): check whether shallow copy is enough
|
||||
RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple));
|
||||
break; // break always
|
||||
}
|
||||
|
||||
@ -242,13 +242,18 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur
|
||||
}
|
||||
|
||||
// 3. Init Row
|
||||
uint8_t* tuple_buf = reader->mem_pool()->allocate(schema->schema_size());
|
||||
ContiguousRow row(schema.get(), tuple_buf);
|
||||
std::unique_ptr<uint8_t[]> tuple_buf(new uint8_t[schema->schema_size()]);
|
||||
ContiguousRow row(schema.get(), tuple_buf.get());
|
||||
|
||||
// 4. Read data from broker and write into cur_tablet
|
||||
// Convert from raw to delta
|
||||
VLOG_NOTICE << "start to convert etl file to delta.";
|
||||
while (!reader->eof()) {
|
||||
if (reader->mem_pool()->mem_tracker()->consumption() >
|
||||
config::flush_size_for_sparkload) {
|
||||
RETURN_NOT_OK(rowset_writer->flush());
|
||||
reader->mem_pool()->free_all();
|
||||
}
|
||||
res = reader->next(&row);
|
||||
if (!res.ok()) {
|
||||
LOG(WARNING) << "read next row failed."
|
||||
@ -824,7 +829,9 @@ Status PushBrokerReader::init(const Schema* schema, const TBrokerScanRange& t_sc
|
||||
}
|
||||
_runtime_profile = _runtime_state->runtime_profile();
|
||||
_runtime_profile->set_name("PushBrokerReader");
|
||||
_mem_pool.reset(new MemPool());
|
||||
_mem_pool.reset(new MemPool(_runtime_state->scanner_mem_tracker().get()));
|
||||
_tuple_buffer_pool.reset(new MemPool(_runtime_state->scanner_mem_tracker().get()));
|
||||
|
||||
_counter.reset(new ScannerCounter());
|
||||
|
||||
// init scanner
|
||||
@ -856,7 +863,7 @@ Status PushBrokerReader::init(const Schema* schema, const TBrokerScanRange& t_sc
|
||||
}
|
||||
|
||||
int tuple_buffer_size = _tuple_desc->byte_size();
|
||||
void* tuple_buffer = _mem_pool->allocate(tuple_buffer_size);
|
||||
void* tuple_buffer = _tuple_buffer_pool->allocate(tuple_buffer_size);
|
||||
if (tuple_buffer == nullptr) {
|
||||
LOG(WARNING) << "Allocate memory for tuple failed";
|
||||
return Status::Error<PUSH_INIT_ERROR>();
|
||||
|
||||
@ -206,6 +206,7 @@ private:
|
||||
std::unique_ptr<RuntimeState> _runtime_state;
|
||||
RuntimeProfile* _runtime_profile;
|
||||
std::unique_ptr<MemPool> _mem_pool;
|
||||
std::unique_ptr<MemPool> _tuple_buffer_pool;
|
||||
std::unique_ptr<ScannerCounter> _counter;
|
||||
std::unique_ptr<BaseScanner> _scanner;
|
||||
// Not used, just for placeholding
|
||||
|
||||
Reference in New Issue
Block a user