[BUG] Fix Memory Leak in SchemaChange And Fix some DCHECK error (#5491)

This commit is contained in:
stdpain
2021-03-17 09:27:05 +08:00
committed by GitHub
parent 105a86d1cd
commit a1bce25677
16 changed files with 180 additions and 154 deletions

View File

@ -753,7 +753,14 @@ Status AnalyticEvalNode::get_next_output_batch(RuntimeState* state, RowBatch* ou
// CopyRow works as expected: input_batch tuples form a prefix of output_batch
// tuples.
TupleRow* dest = output_batch->get_row(output_batch->add_row());
input_batch.copy_row(input_batch.get_row(i), dest);
// input_batch is from a tuple_buffer_stream,
// It can only guarantee that the life cycle is valid in a batch stage.
// If the ancestor node is a no-spilling blocking node (such as hash_join_node except_node ...)
// these node may acquire a invalid tuple pointer,
// so we should use deep_copy, and copy tuple to the tuple_pool, to ensure tuple not finalized.
// reference issue #5466
input_batch.get_row(i)->deep_copy(dest, child(0)->row_desc().tuple_descriptors(),
output_batch->tuple_data_pool(), false);
dest->set_tuple(num_child_tuples, _result_tuples.front().second);
if (ExecNode::eval_conjuncts(ctxs, num_ctxs, dest)) {

View File

@ -18,6 +18,7 @@
#pragma once
#include <stdint.h>
#include <memory>
#include "common/status.h"
#include "exec/file_reader.h"
@ -32,6 +33,7 @@ class BufferedReader : public FileReader {
public:
// If the reader need the file size, set it when construct FileReader.
// There is no other way to set the file size.
// buffered_reader will acquire reader
BufferedReader(FileReader* reader, int64_t = 1024 * 1024);
virtual ~BufferedReader();
@ -53,7 +55,7 @@ private:
Status _read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out);
private:
FileReader* _reader;
std::unique_ptr<FileReader> _reader;
char* _buffer;
int64_t _buffer_size;
int64_t _buffer_offset;

View File

@ -207,6 +207,9 @@ inline SQLFilterOp to_olap_filter_type(TExprOpcode::type type, bool opposite) {
case TExprOpcode::NE:
return opposite ? FILTER_IN : FILTER_NOT_IN;
case TExprOpcode::EQ_FOR_NULL:
return FILTER_IN;
default:
VLOG_CRITICAL << "TExprOpcode: " << type;
DCHECK(false);

View File

@ -345,9 +345,20 @@ Status PartitionedAggregationNode::open(RuntimeState* state) {
}
Status PartitionedAggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
int first_row_idx = row_batch->num_rows();
RETURN_IF_ERROR(GetNextInternal(state, row_batch, eos));
RETURN_IF_ERROR(HandleOutputStrings(row_batch, first_row_idx));
// PartitionedAggregationNode is a spill node, GetNextInternal will read tuple from a tuple stream
// then copy the pointer to a RowBatch, it can only guarantee that the life cycle is valid in a batch stage.
// If the ancestor node is a no-spilling blocking node (such as hash_join_node except_node ...)
// these node may acquire a invalid tuple pointer,
// so we should use deep_copy, and copy tuple to the tuple_pool, to ensure tuple not finalized.
// reference issue #5466
// TODO: if ancestor node don't have a no-spilling blocking node, we could avoid a deep_copy
// we should a flag indicate this node don't have to deep_copy
DCHECK_EQ(row_batch->num_rows(), 0);
RowBatch batch(row_batch->row_desc(), row_batch->capacity(), _mem_tracker.get());
int first_row_idx = batch.num_rows();
RETURN_IF_ERROR(GetNextInternal(state, &batch, eos));
RETURN_IF_ERROR(HandleOutputStrings(&batch, first_row_idx));
batch.deep_copy_to(row_batch);
return Status::OK();
}

View File

@ -141,7 +141,7 @@ FloatVal Literal::get_float_val(ExprContext* context, TupleRow* row) {
}
DoubleVal Literal::get_double_val(ExprContext* context, TupleRow* row) {
DCHECK_EQ(_type.type, TYPE_DOUBLE) << _type;
DCHECK(_type.type == TYPE_DOUBLE || _type.type == TYPE_TIME) << _type;
return DoubleVal(_value.double_val);
}

View File

@ -483,8 +483,6 @@ bool MiniLoadAction::_is_streaming(HttpRequest* req) {
LOG(INFO) << ss.str();
return false;
}
MiniLoadCtx* mini_load_ctx = new MiniLoadCtx(true);
req->set_handler_ctx(mini_load_ctx);
return true;
}

View File

@ -28,6 +28,7 @@
#include "agent/utils.h"
#include "common/config.h"
#include "common/object_pool.h"
#include "gutil/strings/substitute.h"
#include "http/ev_http_server.h"
#include "http/http_channel.h"
@ -281,18 +282,18 @@ void SymbolAction::handle(HttpRequest* req) {
}
}
Status PprofActions::setup(ExecEnv* exec_env, EvHttpServer* http_server) {
Status PprofActions::setup(ExecEnv* exec_env, EvHttpServer* http_server, ObjectPool& pool) {
if (!config::pprof_profile_dir.empty()) {
FileUtils::create_dir(config::pprof_profile_dir);
}
http_server->register_handler(HttpMethod::GET, "/pprof/heap", new HeapAction());
http_server->register_handler(HttpMethod::GET, "/pprof/growth", new GrowthAction());
http_server->register_handler(HttpMethod::GET, "/pprof/profile", new ProfileAction());
http_server->register_handler(HttpMethod::GET, "/pprof/pmuprofile", new PmuProfileAction());
http_server->register_handler(HttpMethod::GET, "/pprof/contention", new ContentionAction());
http_server->register_handler(HttpMethod::GET, "/pprof/cmdline", new CmdlineAction());
auto action = new SymbolAction(exec_env->bfd_parser());
http_server->register_handler(HttpMethod::GET, "/pprof/heap", pool.add(new HeapAction()));
http_server->register_handler(HttpMethod::GET, "/pprof/growth", pool.add(new GrowthAction()));
http_server->register_handler(HttpMethod::GET, "/pprof/profile", pool.add(new ProfileAction()));
http_server->register_handler(HttpMethod::GET, "/pprof/pmuprofile", pool.add(new PmuProfileAction()));
http_server->register_handler(HttpMethod::GET, "/pprof/contention", pool.add(new ContentionAction()));
http_server->register_handler(HttpMethod::GET, "/pprof/cmdline", pool.add(new CmdlineAction()));
auto action = pool.add(new SymbolAction(exec_env->bfd_parser()));
http_server->register_handler(HttpMethod::GET, "/pprof/symbol", action);
http_server->register_handler(HttpMethod::HEAD, "/pprof/symbol", action);
http_server->register_handler(HttpMethod::POST, "/pprof/symbol", action);

View File

@ -24,10 +24,11 @@ namespace doris {
class EvHttpServer;
class ExecEnv;
class ObjectPool;
class PprofActions {
public:
static Status setup(ExecEnv* exec_env, EvHttpServer* http_server);
static Status setup(ExecEnv* exec_env, EvHttpServer* http_server, ObjectPool& pool);
};
} // namespace doris

View File

@ -128,8 +128,8 @@ void EvHttpServer::stop() {
{
std::lock_guard<std::mutex> lock(_event_bases_lock);
for (int i = 0; i < _num_workers; ++i) {
LOG(WARNING) << "event_base_loopexit ret: "
<< event_base_loopexit(_event_bases[i].get(), nullptr);
LOG(WARNING) << "event_base_loopbreak ret: "
<< event_base_loopbreak(_event_bases[i].get());
}
_event_bases.clear();
}

View File

@ -461,10 +461,9 @@ struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_HLL_UNION, OLAP_FIELD_TYPE_HLL
// we use zero size represent this slice is a agg object
dst_slice->size = 0;
auto* hll = new HyperLogLog(*src_slice);
dst_slice->data = reinterpret_cast<char*>(hll);
mem_pool->mem_tracker()->Consume(sizeof(HyperLogLog));
agg_pool->add(hll);
}
@ -511,8 +510,6 @@ struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_BITMAP_UNION, OLAP_FIELD_TYPE_
dst_slice->data = (char*)bitmap;
mem_pool->mem_tracker()->Consume(sizeof(BitmapValue));
agg_pool->add(bitmap);
}

View File

@ -37,6 +37,7 @@
#include "runtime/exec_env.h"
#include "runtime/mem_pool.h"
#include "runtime/mem_tracker.h"
#include "util/defer_op.h"
using std::deque;
using std::list;
@ -56,7 +57,8 @@ public:
bool sort(RowBlock** row_block);
private:
static bool _row_cursor_comparator(const RowCursor* a, const RowCursor* b) {
static bool _row_cursor_comparator(const std::unique_ptr<RowCursor>& a,
const std::unique_ptr<RowCursor>& b) {
return compare_row(*a, *b) < 0;
}
@ -447,7 +449,8 @@ OLAPStatus RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t
// filter data according to delete conditions specified in DeleteData command
if (is_data_left_vec[row_index] == 1) {
if (_delete_handler != nullptr && _delete_handler->is_filter_data(data_version, read_helper)) {
if (_delete_handler != nullptr &&
_delete_handler->is_filter_data(data_version, read_helper)) {
is_data_left_vec[row_index] = 0;
(*filtered_rows)++;
}
@ -480,8 +483,8 @@ OLAPStatus RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t
<< _schema_mapping[i].materialized_function;
return OLAP_ERR_SCHEMA_CHANGE_INFO_INVALID;
}
VLOG_NOTICE << "_schema_mapping[" << i
<< "].materialized_function : " << _schema_mapping[i].materialized_function;
VLOG_NOTICE << "_schema_mapping[" << i << "].materialized_function : "
<< _schema_mapping[i].materialized_function;
for (size_t row_index = 0, new_row_index = 0;
row_index < ref_block->row_block_info().row_num; ++row_index) {
// No need row, need to be filter
@ -597,10 +600,11 @@ OLAPStatus RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t
if (newtype < reftype) {
VLOG_NOTICE << "type degraded while altering column. "
<< "column=" << mutable_block->tablet_schema().column(i).name()
<< ", origin_type="
<< ref_block->tablet_schema().column(ref_column).type()
<< ", alter_type=" << mutable_block->tablet_schema().column(i).type();
<< "column=" << mutable_block->tablet_schema().column(i).name()
<< ", origin_type="
<< ref_block->tablet_schema().column(ref_column).type()
<< ", alter_type="
<< mutable_block->tablet_schema().column(i).type();
}
}
} else {
@ -671,21 +675,21 @@ bool RowBlockSorter::sort(RowBlock** row_block) {
return false;
}
RowBlock* temp = nullptr;
std::vector<RowCursor*> row_cursor_list((*row_block)->row_block_info().row_num, nullptr);
std::vector<std::unique_ptr<RowCursor>> row_cursor_list;
row_cursor_list.reserve((*row_block)->row_block_info().row_num);
// create an list of row cursor as long as the number of rows in data block.
for (size_t i = 0; i < (*row_block)->row_block_info().row_num; ++i) {
if ((row_cursor_list[i] = new (nothrow) RowCursor()) == nullptr) {
row_cursor_list.emplace_back(new (nothrow) RowCursor());
if (row_cursor_list[i] == nullptr) {
LOG(WARNING) << "failed to malloc RowCursor. size=" << sizeof(RowCursor);
goto SORT_ERR_EXIT;
return false;
}
if (row_cursor_list[i]->init((*row_block)->tablet_schema()) != OLAP_SUCCESS) {
goto SORT_ERR_EXIT;
return false;
}
(*row_block)->get_row(i, row_cursor_list[i]);
(*row_block)->get_row(i, row_cursor_list[i].get());
}
// Must use 'std::' because this class has a function whose name is sort too
@ -700,23 +704,10 @@ bool RowBlockSorter::sort(RowBlock** row_block) {
_swap_row_block->finalize(row_cursor_list.size());
for (size_t i = 0; i < (*row_block)->row_block_info().row_num; ++i) {
SAFE_DELETE(row_cursor_list[i]);
}
// swap the row block for reducing memory allocating.
temp = *row_block;
*row_block = _swap_row_block;
_swap_row_block = temp;
std::swap(*row_block, _swap_row_block);
return true;
SORT_ERR_EXIT:
for (size_t i = 0; i < (*row_block)->row_block_info().row_num; ++i) {
SAFE_DELETE(row_cursor_list[i]);
}
return false;
}
RowBlockAllocator::RowBlockAllocator(const TabletSchema& tablet_schema, size_t memory_limitation)
@ -738,7 +729,7 @@ OLAPStatus RowBlockAllocator::allocate(RowBlock** row_block, size_t num_rows, bo
if (_memory_limitation > 0 && _memory_allocated + row_block_size > _memory_limitation) {
VLOG_NOTICE << "RowBlockAllocator::alocate() memory exceeded. "
<< "m_memory_allocated=" << _memory_allocated;
<< "m_memory_allocated=" << _memory_allocated;
*row_block = nullptr;
return OLAP_SUCCESS;
}
@ -757,7 +748,8 @@ OLAPStatus RowBlockAllocator::allocate(RowBlock** row_block, size_t num_rows, bo
_memory_allocated += row_block_size;
VLOG_NOTICE << "RowBlockAllocator::allocate() this=" << this << ", num_rows=" << num_rows
<< ", m_memory_allocated=" << _memory_allocated << ", row_block_addr=" << *row_block;
<< ", m_memory_allocated=" << _memory_allocated
<< ", row_block_addr=" << *row_block;
return OLAP_SUCCESS;
}
@ -770,8 +762,8 @@ void RowBlockAllocator::release(RowBlock* row_block) {
_memory_allocated -= row_block->capacity() * _row_len;
VLOG_NOTICE << "RowBlockAllocator::release() this=" << this
<< ", num_rows=" << row_block->capacity()
<< ", m_memory_allocated=" << _memory_allocated << ", row_block_addr=" << row_block;
<< ", num_rows=" << row_block->capacity()
<< ", m_memory_allocated=" << _memory_allocated << ", row_block_addr=" << row_block;
delete row_block;
}
@ -925,6 +917,24 @@ bool SchemaChangeDirectly::_write_row_block(RowsetWriter* rowset_writer, RowBloc
return true;
}
OLAPStatus reserve_block(std::unique_ptr<RowBlock, RowBlockDeleter>* block_handle_ptr, int row_num,
RowBlockAllocator* allocator) {
auto& block_handle = *block_handle_ptr;
if (block_handle == nullptr || block_handle->capacity() < row_num) {
// release old block and alloc new block
if (block_handle != nullptr) {
block_handle.reset();
}
RowBlock* new_row_block = nullptr;
auto res = allocator->allocate(&new_row_block, row_num, true);
RETURN_NOT_OK_LOG(res, "failed to allocate RowBlock.");
block_handle.reset(new_row_block);
} else {
block_handle->clear();
}
return OLAP_SUCCESS;
}
OLAPStatus SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
RowsetWriter* rowset_writer, TabletSharedPtr new_tablet,
TabletSharedPtr base_tablet) {
@ -973,8 +983,13 @@ OLAPStatus SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
}
VLOG_NOTICE << "init writer. new_tablet=" << new_tablet->full_name()
<< ", block_row_number=" << new_tablet->num_rows_per_row_block();
RowBlock* new_row_block = nullptr;
<< ", block_row_number=" << new_tablet->num_rows_per_row_block();
std::unique_ptr<RowBlock, RowBlockDeleter> new_row_block(nullptr, [&](RowBlock* block) {
if (block != nullptr) {
_row_block_allocator->release(block);
}
});
// Reset filtered_rows and merged_rows statistic
reset_merged_rows();
@ -983,38 +998,24 @@ OLAPStatus SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
RowBlock* ref_row_block = nullptr;
rowset_reader->next_block(&ref_row_block);
while (ref_row_block != nullptr && ref_row_block->has_remaining()) {
// 注意这里强制分配和旧块等大的块(小了可能会存不下)
if (new_row_block == nullptr ||
new_row_block->capacity() < ref_row_block->row_block_info().row_num) {
if (new_row_block != nullptr) {
_row_block_allocator->release(new_row_block);
new_row_block = nullptr;
}
res = _row_block_allocator->allocate(&new_row_block,
ref_row_block->row_block_info().row_num, true);
if (OLAP_SUCCESS != res) {
LOG(WARNING) << "failed to allocate RowBlock.";
goto DIRECTLY_PROCESS_ERR;
}
} else {
new_row_block->clear();
}
// We will allocate blocks of the same size as before
// to ensure that the data can be stored
RETURN_NOT_OK(reserve_block(&new_row_block, ref_row_block->row_block_info().row_num,
_row_block_allocator));
// 将ref改为new。这一步按道理来说确实需要等大的块,但理论上和writer无关。
uint64_t filtered_rows = 0;
res = _row_block_changer.change_row_block(ref_row_block, rowset_reader->version().second,
new_row_block, &filtered_rows);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to change data in row block.";
goto DIRECTLY_PROCESS_ERR;
}
new_row_block.get(), &filtered_rows);
RETURN_NOT_OK_LOG(res, "failed to change data in row block.");
// rows filtered by delete handler one by one
add_filtered_rows(filtered_rows);
if (!_write_row_block(rowset_writer, new_row_block)) {
LOG(WARNING) << "failed to write row block.";
if (!_write_row_block(rowset_writer, new_row_block.get())) {
res = OLAP_ERR_SCHEMA_CHANGE_INFO_INVALID;
goto DIRECTLY_PROCESS_ERR;
LOG(WARNING) << "failed to write row block.";
return res;
}
ref_row_block->clear();
@ -1022,8 +1023,7 @@ OLAPStatus SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
}
if (OLAP_SUCCESS != rowset_writer->flush()) {
res = OLAP_ERR_ALTER_STATUS_ERR;
goto DIRECTLY_PROCESS_ERR;
return OLAP_ERR_ALTER_STATUS_ERR;
}
// rows filtered by zone map against delete handler
@ -1048,12 +1048,6 @@ OLAPStatus SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
<< ", merged_rows=" << merged_rows() << ", filtered_rows=" << filtered_rows()
<< ", new_index_rows=" << rowset_writer->num_rows();
}
DIRECTLY_PROCESS_ERR:
if (new_row_block) {
_row_block_allocator->release(new_row_block);
new_row_block = nullptr;
}
return res;
}
@ -1117,6 +1111,19 @@ OLAPStatus SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
// src_rowsets to store the rowset generated by internal sorting
std::vector<RowsetSharedPtr> src_rowsets;
DeferOp defer([&]() {
// remove the intermediate rowsets generated by internal sorting
for (auto& row_set : src_rowsets) {
StorageEngine::instance()->add_unused_rowset(row_set);
}
for (auto block : row_block_arr) {
_row_block_allocator->release(block);
}
row_block_arr.clear();
});
_temp_delta_versions.first = _temp_delta_versions.second;
// Reset filtered_rows and merged_rows statistic
@ -1136,8 +1143,7 @@ OLAPStatus SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
ref_row_block->row_block_info().row_num,
true)) {
LOG(WARNING) << "failed to allocate RowBlock.";
res = OLAP_ERR_INPUT_PARAMETER_ERROR;
goto SORTING_PROCESS_ERR;
return OLAP_ERR_INPUT_PARAMETER_ERROR;
}
if (new_row_block == nullptr) {
@ -1159,15 +1165,13 @@ OLAPStatus SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
rowset_reader->version_hash(), new_tablet, new_rowset_type,
segments_overlap, &rowset)) {
LOG(WARNING) << "failed to sorting internally.";
res = OLAP_ERR_ALTER_STATUS_ERR;
goto SORTING_PROCESS_ERR;
return OLAP_ERR_ALTER_STATUS_ERR;
}
src_rowsets.push_back(rowset);
for (vector<RowBlock*>::iterator it = row_block_arr.begin(); it != row_block_arr.end();
++it) {
_row_block_allocator->release(*it);
for (auto block : row_block_arr) {
_row_block_allocator->release(block);
}
row_block_arr.clear();
@ -1181,18 +1185,18 @@ OLAPStatus SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
res = _row_block_changer.change_row_block(ref_row_block, rowset_reader->version().second,
new_row_block, &filtered_rows);
if (res != OLAP_SUCCESS) {
row_block_arr.push_back(new_row_block);
LOG(WARNING) << "failed to change data in row block.";
goto SORTING_PROCESS_ERR;
return res;
}
add_filtered_rows(filtered_rows);
if (new_row_block->row_block_info().row_num > 0) {
if (!row_block_sorter.sort(&new_row_block)) {
row_block_arr.push_back(new_row_block);
LOG(WARNING) << "failed to sort row block.";
res = OLAP_ERR_ALTER_STATUS_ERR;
OLAP_GOTO(SORTING_PROCESS_ERR);
return OLAP_ERR_ALTER_STATUS_ERR;
}
row_block_arr.push_back(new_row_block);
} else {
LOG(INFO) << "new block num rows is: " << new_row_block->row_block_info().row_num;
@ -1217,15 +1221,13 @@ OLAPStatus SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
rowset_reader->version_hash(), new_tablet, new_rowset_type,
segments_overlap, &rowset)) {
LOG(WARNING) << "failed to sorting internally.";
res = OLAP_ERR_ALTER_STATUS_ERR;
goto SORTING_PROCESS_ERR;
return OLAP_ERR_ALTER_STATUS_ERR;
}
src_rowsets.push_back(rowset);
for (vector<RowBlock*>::iterator it = row_block_arr.begin(); it != row_block_arr.end();
++it) {
_row_block_allocator->release(*it);
for (auto block : row_block_arr) {
_row_block_allocator->release(block);
}
row_block_arr.clear();
@ -1244,8 +1246,7 @@ OLAPStatus SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
}
} else if (!_external_sorting(src_rowsets, new_rowset_writer, new_tablet)) {
LOG(WARNING) << "failed to sorting externally.";
res = OLAP_ERR_ALTER_STATUS_ERR;
goto SORTING_PROCESS_ERR;
return OLAP_ERR_ALTER_STATUS_ERR;
}
add_filtered_rows(rowset_reader->filtered_rows());
@ -1269,20 +1270,6 @@ OLAPStatus SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
<< ", merged_rows=" << merged_rows() << ", filtered_rows=" << filtered_rows()
<< ", new_index_rows=" << new_rowset_writer->num_rows();
}
SORTING_PROCESS_ERR:
// remove the intermediate rowsets generated by internal sorting
for (vector<RowsetSharedPtr>::iterator it = src_rowsets.begin(); it != src_rowsets.end();
++it) {
StorageEngine::instance()->add_unused_rowset(*it);
}
for (vector<RowBlock*>::iterator it = row_block_arr.begin(); it != row_block_arr.end(); ++it) {
_row_block_allocator->release(*it);
}
row_block_arr.clear();
return res;
}
@ -1309,7 +1296,7 @@ bool SchemaChangeWithSorting::_internal_sorting(const std::vector<RowBlock*>& ro
context.version_hash = version_hash;
context.segments_overlap = segments_overlap;
VLOG_NOTICE << "init rowset builder. tablet=" << new_tablet->full_name()
<< ", block_row_size=" << new_tablet->num_rows_per_row_block();
<< ", block_row_size=" << new_tablet->num_rows_per_row_block();
std::unique_ptr<RowsetWriter> rowset_writer;
if (RowsetFactory::create_rowset_writer(context, &rowset_writer) != OLAP_SUCCESS) {
@ -1864,7 +1851,7 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
// c. 转换历史数据
for (auto& rs_reader : sc_params.ref_rowset_readers) {
VLOG_TRACE << "begin to convert a history rowset. version=" << rs_reader->version().first
<< "-" << rs_reader->version().second;
<< "-" << rs_reader->version().second;
// set status for monitor
// 只要有一个new_table为running,ref table就设置为running
@ -1935,13 +1922,14 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
goto PROCESS_ALTER_EXIT;
} else {
VLOG_NOTICE << "register new version. tablet=" << sc_params.new_tablet->full_name()
<< ", version=" << rs_reader->version().first << "-"
<< rs_reader->version().second;
<< ", version=" << rs_reader->version().first << "-"
<< rs_reader->version().second;
}
sc_params.new_tablet->release_push_lock();
VLOG_TRACE << "succeed to convert a history version."
<< " version=" << rs_reader->version().first << "-" << rs_reader->version().second;
<< " version=" << rs_reader->version().first << "-"
<< rs_reader->version().second;
}
// XXX: 此时应该不取消SchemaChange状态,因为新Delta还要转换成新旧Schema的版本
PROCESS_ALTER_EXIT : {
@ -1989,7 +1977,7 @@ OLAPStatus SchemaChangeHandler::_parse_request(
column_mapping->ref_column = column_index;
VLOG_NOTICE << "A column refered to existed column will be added after schema changing."
<< "column=" << column_name << ", ref_column=" << column_index;
<< "column=" << column_name << ", ref_column=" << column_index;
continue;
}
@ -2031,8 +2019,8 @@ OLAPStatus SchemaChangeHandler::_parse_request(
}
VLOG_TRACE << "A column with default value will be added after schema changing. "
<< "column=" << column_name
<< ", default_value=" << new_column.default_value();
<< "column=" << column_name
<< ", default_value=" << new_column.default_value();
continue;
}
@ -2044,8 +2032,8 @@ OLAPStatus SchemaChangeHandler::_parse_request(
}
VLOG_NOTICE << "A new schema delta is converted while dropping column. "
<< "Dropped column will be assigned as '0' for the older schema. "
<< "column=" << column_name;
<< "Dropped column will be assigned as '0' for the older schema. "
<< "column=" << column_name;
}
// Check if re-aggregation is needed.

View File

@ -19,6 +19,7 @@
#define DORIS_BE_SRC_OLAP_SCHEMA_CHANGE_H
#include <deque>
#include <functional>
#include <queue>
#include <vector>
@ -245,6 +246,7 @@ private:
DISALLOW_COPY_AND_ASSIGN(SchemaChangeHandler);
};
using RowBlockDeleter = std::function<void(RowBlock*)>;
} // namespace doris
#endif // DORIS_BE_SRC_OLAP_SCHEMA_CHANGE_H

View File

@ -604,6 +604,19 @@ void RowBatch::acquire_state(RowBatch* src) {
src->transfer_resource_ownership(this);
}
void RowBatch::deep_copy_to(RowBatch* dst) {
DCHECK(dst->_row_desc.equals(_row_desc));
DCHECK_EQ(dst->_num_rows, 0);
DCHECK_GE(dst->_capacity, _num_rows);
dst->add_rows(_num_rows);
for (int i = 0; i < _num_rows; ++i) {
TupleRow* src_row = get_row(i);
TupleRow* dst_row = reinterpret_cast<TupleRow*>(dst->_tuple_ptrs + i * _num_tuples_per_row);
src_row->deep_copy(dst_row, _row_desc.tuple_descriptors(), dst->_tuple_data_pool.get(),
false);
}
dst->commit_rows(_num_rows);
}
// TODO: consider computing size of batches as they are built up
int RowBatch::total_byte_size() {
int result = 0;

View File

@ -260,7 +260,6 @@ Status TabletsChannel::cancel() {
for (auto& it : _tablet_writers) {
it.second->cancel();
}
DCHECK_EQ(_mem_tracker->consumption(), 0);
_state = kFinished;
return Status::OK();
}

View File

@ -55,95 +55,96 @@ Status HttpService::start() {
add_default_path_handlers(_web_page_handler.get(), _env->process_mem_tracker());
// register load
_ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_load",
new MiniLoadAction(_env));
MiniLoadAction* miniload_action = _pool.add(new MiniLoadAction(_env));
_ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_load", miniload_action);
StreamLoadAction* streamload_action = _pool.add(new StreamLoadAction(_env));
_ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_stream_load",
new StreamLoadAction(_env));
streamload_action);
// register download action
std::vector<std::string> allow_paths;
for (auto& path : _env->store_paths()) {
allow_paths.emplace_back(path.path);
}
DownloadAction* download_action = new DownloadAction(_env, allow_paths);
DownloadAction* download_action = _pool.add(new DownloadAction(_env, allow_paths));
_ev_http_server->register_handler(HttpMethod::HEAD, "/api/_download_load", download_action);
_ev_http_server->register_handler(HttpMethod::GET, "/api/_download_load", download_action);
DownloadAction* tablet_download_action = new DownloadAction(_env, allow_paths);
DownloadAction* tablet_download_action = _pool.add(new DownloadAction(_env, allow_paths));
_ev_http_server->register_handler(HttpMethod::HEAD, "/api/_tablet/_download",
tablet_download_action);
_ev_http_server->register_handler(HttpMethod::GET, "/api/_tablet/_download",
tablet_download_action);
DownloadAction* error_log_download_action =
new DownloadAction(_env, _env->load_path_mgr()->get_load_error_file_dir());
_pool.add(new DownloadAction(_env, _env->load_path_mgr()->get_load_error_file_dir()));
_ev_http_server->register_handler(HttpMethod::GET, "/api/_load_error_log",
error_log_download_action);
_ev_http_server->register_handler(HttpMethod::HEAD, "/api/_load_error_log",
error_log_download_action);
// Register BE health action
HealthAction* health_action = new HealthAction(_env);
HealthAction* health_action = _pool.add(new HealthAction(_env));
_ev_http_server->register_handler(HttpMethod::GET, "/api/health", health_action);
// Register Tablets Info action
TabletsInfoAction* tablets_info_action = new TabletsInfoAction();
TabletsInfoAction* tablets_info_action = _pool.add(new TabletsInfoAction());
_ev_http_server->register_handler(HttpMethod::GET, "/tablets_json", tablets_info_action);
// Register Tablets Distribution action
TabletsDistributionAction* tablets_distribution_action = new TabletsDistributionAction();
TabletsDistributionAction* tablets_distribution_action = _pool.add(new TabletsDistributionAction());
_ev_http_server->register_handler(HttpMethod::GET, "/api/tablets_distribution", tablets_distribution_action);
// Register tablet migration action
TabletMigrationAction* tablet_migration_action = new TabletMigrationAction();
TabletMigrationAction* tablet_migration_action = _pool.add(new TabletMigrationAction());
_ev_http_server->register_handler(HttpMethod::GET, "/api/tablet_migration", tablet_migration_action);
// register pprof actions
PprofActions::setup(_env, _ev_http_server.get());
PprofActions::setup(_env, _ev_http_server.get(), _pool);
// register metrics
{
auto action = new MetricsAction(DorisMetrics::instance()->metric_registry());
auto action = _pool.add(new MetricsAction(DorisMetrics::instance()->metric_registry()));
_ev_http_server->register_handler(HttpMethod::GET, "/metrics", action);
}
MetaAction* meta_action = new MetaAction(HEADER);
MetaAction* meta_action = _pool.add(new MetaAction(HEADER));
_ev_http_server->register_handler(HttpMethod::GET, "/api/meta/header/{tablet_id}/{schema_hash}",
meta_action);
#ifndef BE_TEST
// Register BE checksum action
ChecksumAction* checksum_action = new ChecksumAction(_env);
ChecksumAction* checksum_action = _pool.add(new ChecksumAction(_env));
_ev_http_server->register_handler(HttpMethod::GET, "/api/checksum", checksum_action);
// Register BE reload tablet action
ReloadTabletAction* reload_tablet_action = new ReloadTabletAction(_env);
ReloadTabletAction* reload_tablet_action = _pool.add(new ReloadTabletAction(_env));
_ev_http_server->register_handler(HttpMethod::GET, "/api/reload_tablet", reload_tablet_action);
RestoreTabletAction* restore_tablet_action = new RestoreTabletAction(_env);
RestoreTabletAction* restore_tablet_action = _pool.add(new RestoreTabletAction(_env));
_ev_http_server->register_handler(HttpMethod::POST, "/api/restore_tablet",
restore_tablet_action);
// Register BE snapshot action
SnapshotAction* snapshot_action = new SnapshotAction(_env);
SnapshotAction* snapshot_action = _pool.add(new SnapshotAction(_env));
_ev_http_server->register_handler(HttpMethod::GET, "/api/snapshot", snapshot_action);
#endif
// 2 compaction actions
CompactionAction* show_compaction_action =
new CompactionAction(CompactionActionType::SHOW_INFO);
_pool.add(new CompactionAction(CompactionActionType::SHOW_INFO));
_ev_http_server->register_handler(HttpMethod::GET, "/api/compaction/show",
show_compaction_action);
CompactionAction* run_compaction_action =
new CompactionAction(CompactionActionType::RUN_COMPACTION);
_pool.add(new CompactionAction(CompactionActionType::RUN_COMPACTION));
_ev_http_server->register_handler(HttpMethod::POST, "/api/compaction/run",
run_compaction_action);
CompactionAction* run_status_compaction_action =
new CompactionAction(CompactionActionType::RUN_COMPACTION_STATUS);
_pool.add(new CompactionAction(CompactionActionType::RUN_COMPACTION_STATUS));
_ev_http_server->register_handler(HttpMethod::GET, "/api/compaction/run_status",
run_status_compaction_action);
UpdateConfigAction* update_config_action = new UpdateConfigAction();
UpdateConfigAction* update_config_action = _pool.add(new UpdateConfigAction());
_ev_http_server->register_handler(HttpMethod::POST, "/api/update_config", update_config_action);
_ev_http_server->start();
@ -152,6 +153,7 @@ Status HttpService::start() {
void HttpService::stop() {
_ev_http_server->stop();
_pool.clear();
}
} // namespace doris

View File

@ -20,6 +20,7 @@
#include <memory>
#include "common/status.h"
#include "common/object_pool.h"
namespace doris {
@ -38,6 +39,7 @@ public:
private:
ExecEnv* _env;
ObjectPool _pool;
std::unique_ptr<EvHttpServer> _ev_http_server;
std::unique_ptr<WebPageHandler> _web_page_handler;