[improvement](exception-safe) make node/sink create and prepare exception safe (#20551)
@@ -92,3 +92,25 @@ inline std::string Exception::to_string() const {
         }                                                                                         \
     }                                                                                             \
 } while (0)
+
+#define RETURN_IF_ERROR_OR_CATCH_EXCEPTION(stmt)                                                  \
+    do {                                                                                          \
+        try {                                                                                     \
+            doris::enable_thread_catch_bad_alloc++;                                               \
+            Defer defer {[&]() { doris::enable_thread_catch_bad_alloc--; }};                      \
+            {                                                                                     \
+                Status _status_ = (stmt);                                                         \
+                if (UNLIKELY(!_status_.ok())) {                                                   \
+                    return _status_;                                                              \
+                }                                                                                 \
+            }                                                                                     \
+        } catch (const doris::Exception& e) {                                                     \
+            if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {                                 \
+                return Status::MemoryLimitExceeded(fmt::format(                                   \
+                        "PreCatch error code:{}, {}, __FILE__:{}, __LINE__:{}, __FUNCTION__:{}",  \
+                        e.code(), e.to_string(), __FILE__, __LINE__, __PRETTY_FUNCTION__));       \
+            } else {                                                                              \
+                return Status::Error(e.code(), e.to_string());                                    \
+            }                                                                                     \
+        }                                                                                         \
+    } while (0)
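The macro added above replaces the nested RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(stmt)) pattern used at the call sites changed below: it runs stmt with bad-alloc catching enabled, returns early on a non-OK Status, and converts a thrown doris::Exception into a Status at the Status/exception boundary. A minimal usage sketch (hypothetical caller, assuming the usual Doris BE headers; prepare_example and its arguments are illustrative, not part of this patch):

// Any function that returns doris::Status can guard a Status-returning,
// potentially throwing statement with the new macro; node->prepare() here
// stands in for the guarded call sites in this commit.
Status prepare_example(RuntimeState* state, ExecNode* node) {
    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(node->prepare(state));
    return Status::OK();
}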
@@ -50,8 +50,6 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
                                   const TPlanFragmentExecParams& params,
                                   const RowDescriptor& row_desc, RuntimeState* state,
                                   std::unique_ptr<DataSink>* sink, DescriptorTbl& desc_tbl) {
-    DataSink* tmp_sink = nullptr;
-
     switch (thrift_sink.type) {
     case TDataSinkType::DATA_STREAM_SINK: {
         if (!thrift_sink.__isset.stream_sink) {
@@ -62,11 +60,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
                         ? params.send_query_statistics_with_every_batch
                         : false;
         // TODO: figure out good buffer size based on size of output row
-        tmp_sink = new vectorized::VDataStreamSender(
+        sink->reset(new vectorized::VDataStreamSender(
                 state, pool, params.sender_id, row_desc, thrift_sink.stream_sink,
-                params.destinations, 16 * 1024, send_query_statistics_with_every_batch);
+                params.destinations, 16 * 1024, send_query_statistics_with_every_batch));
         // RETURN_IF_ERROR(sender->prepare(state->obj_pool(), thrift_sink.stream_sink));
-        sink->reset(tmp_sink);
         break;
     }
     case TDataSinkType::RESULT_SINK: {
@@ -75,9 +72,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
         }
 
         // TODO: figure out good buffer size based on size of output row
-        tmp_sink = new doris::vectorized::VResultSink(row_desc, output_exprs,
-                                                      thrift_sink.result_sink, 4096);
-        sink->reset(tmp_sink);
+        sink->reset(new doris::vectorized::VResultSink(row_desc, output_exprs,
+                                                       thrift_sink.result_sink, 4096));
         break;
     }
     case TDataSinkType::RESULT_FILE_SINK: {
@@ -92,17 +88,15 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
                         : false;
         // Result file sink is not the top sink
         if (params.__isset.destinations && params.destinations.size() > 0) {
-            tmp_sink = new doris::vectorized::VResultFileSink(
+            sink->reset(new doris::vectorized::VResultFileSink(
                     pool, params.sender_id, row_desc, thrift_sink.result_file_sink,
                     params.destinations, 16 * 1024, send_query_statistics_with_every_batch,
-                    output_exprs, desc_tbl);
+                    output_exprs, desc_tbl));
         } else {
-            tmp_sink = new doris::vectorized::VResultFileSink(
+            sink->reset(new doris::vectorized::VResultFileSink(
                     pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
-                    send_query_statistics_with_every_batch, output_exprs);
+                    send_query_statistics_with_every_batch, output_exprs));
         }
-
-        sink->reset(tmp_sink);
         break;
     }
     case TDataSinkType::MEMORY_SCRATCH_SINK: {
@@ -110,8 +104,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
             return Status::InternalError("Missing data buffer sink.");
         }
 
-        tmp_sink = new vectorized::MemoryScratchSink(row_desc, output_exprs);
-        sink->reset(tmp_sink);
+        sink->reset(new vectorized::MemoryScratchSink(row_desc, output_exprs));
         break;
     }
     case TDataSinkType::MYSQL_TABLE_SINK: {
@@ -193,7 +186,6 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
                                   const size_t& local_param_idx, const RowDescriptor& row_desc,
                                   RuntimeState* state, std::unique_ptr<DataSink>* sink,
                                   DescriptorTbl& desc_tbl) {
-    DataSink* tmp_sink = nullptr;
     const auto& local_params = params.local_params[local_param_idx];
     switch (thrift_sink.type) {
     case TDataSinkType::DATA_STREAM_SINK: {
@@ -205,11 +197,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
                         ? params.send_query_statistics_with_every_batch
                         : false;
         // TODO: figure out good buffer size based on size of output row
-        tmp_sink = new vectorized::VDataStreamSender(
+        sink->reset(new vectorized::VDataStreamSender(
                 state, pool, local_params.sender_id, row_desc, thrift_sink.stream_sink,
-                params.destinations, 16 * 1024, send_query_statistics_with_every_batch);
+                params.destinations, 16 * 1024, send_query_statistics_with_every_batch));
         // RETURN_IF_ERROR(sender->prepare(state->obj_pool(), thrift_sink.stream_sink));
-        sink->reset(tmp_sink);
         break;
     }
     case TDataSinkType::RESULT_SINK: {
@@ -218,9 +209,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
         }
 
         // TODO: figure out good buffer size based on size of output row
-        tmp_sink = new doris::vectorized::VResultSink(row_desc, output_exprs,
-                                                      thrift_sink.result_sink, 4096);
-        sink->reset(tmp_sink);
+        sink->reset(new doris::vectorized::VResultSink(row_desc, output_exprs,
+                                                       thrift_sink.result_sink, 4096));
         break;
     }
     case TDataSinkType::RESULT_FILE_SINK: {
@@ -235,17 +225,15 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
                         : false;
         // Result file sink is not the top sink
         if (params.__isset.destinations && params.destinations.size() > 0) {
-            tmp_sink = new doris::vectorized::VResultFileSink(
+            sink->reset(new doris::vectorized::VResultFileSink(
                     pool, local_params.sender_id, row_desc, thrift_sink.result_file_sink,
                     params.destinations, 16 * 1024, send_query_statistics_with_every_batch,
-                    output_exprs, desc_tbl);
+                    output_exprs, desc_tbl));
         } else {
-            tmp_sink = new doris::vectorized::VResultFileSink(
+            sink->reset(new doris::vectorized::VResultFileSink(
                     pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
-                    send_query_statistics_with_every_batch, output_exprs);
+                    send_query_statistics_with_every_batch, output_exprs));
         }
-
-        sink->reset(tmp_sink);
         break;
     }
     case TDataSinkType::MEMORY_SCRATCH_SINK: {
@@ -253,8 +241,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
             return Status::InternalError("Missing data buffer sink.");
         }
 
-        tmp_sink = new vectorized::MemoryScratchSink(row_desc, output_exprs);
-        sink->reset(tmp_sink);
+        sink->reset(new vectorized::MemoryScratchSink(row_desc, output_exprs));
         break;
     }
     case TDataSinkType::MYSQL_TABLE_SINK: {
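Every create_data_sink hunk above makes the same mechanical change: the sink is handed to the caller's unique_ptr in the same statement that allocates it, instead of parking it in the raw tmp_sink pointer first. The point is exception safety; a reduced sketch of the two patterns (illustrative, condensed from the hunks above):

// Before: between new and reset() no owner exists, so any throw in
// intervening code (e.g. a bad_alloc surfaced as doris::Exception)
// leaks the sink.
DataSink* tmp_sink = new vectorized::MemoryScratchSink(row_desc, output_exprs);
// ... code that may throw ...
sink->reset(tmp_sink);

// After: ownership transfers immediately; if a later statement throws,
// stack unwinding destroys the unique_ptr and with it the sink.
sink->reset(new vectorized::MemoryScratchSink(row_desc, output_exprs));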
@@ -51,7 +51,7 @@ Status PageIO::compress_page_body(BlockCompressionCodec* codec, double min_space
     size_t uncompressed_size = Slice::compute_total_size(body);
     if (codec != nullptr && !codec->exceed_max_compress_len(uncompressed_size)) {
         faststring buf;
-        RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(codec->compress(body, uncompressed_size, &buf)));
+        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(codec->compress(body, uncompressed_size, &buf));
         double space_saving = 1.0 - static_cast<double>(buf.size()) / uncompressed_size;
         // return compressed body only when it saves more than min_space_saving
         if (space_saving > 0 && space_saving >= min_space_saving) {
@@ -296,7 +296,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
     _runtime_state->set_num_per_fragment_instances(request.num_senders);
 
     if (request.fragment.__isset.output_sink) {
-        RETURN_IF_ERROR(DataSink::create_data_sink(
+        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(
                 _runtime_state->obj_pool(), request.fragment.output_sink,
                 request.fragment.output_exprs, request, idx, _root_plan->row_desc(),
                 _runtime_state.get(), &_sink, *desc_tbl));
@@ -160,8 +160,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
 
     // set up plan
     DCHECK(request.__isset.fragment);
-    RETURN_IF_ERROR(ExecNode::create_tree(_runtime_state.get(), obj_pool(), request.fragment.plan,
-                                          *desc_tbl, &_plan));
+    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ExecNode::create_tree(
+            _runtime_state.get(), obj_pool(), request.fragment.plan, *desc_tbl, &_plan));
 
     // set #senders of exchange nodes before calling Prepare()
     std::vector<ExecNode*> exch_nodes;
@@ -173,6 +173,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
         static_cast<doris::vectorized::VExchangeNode*>(exch_node)->set_num_senders(num_senders);
     }
 
+    // TODO Is it exception safe?
     RETURN_IF_ERROR(_plan->prepare(_runtime_state.get()));
     // set scan ranges
     std::vector<ExecNode*> scan_nodes;
@@ -211,10 +212,10 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
 
     // set up sink, if required
     if (request.fragment.__isset.output_sink) {
-        RETURN_IF_ERROR(DataSink::create_data_sink(obj_pool(), request.fragment.output_sink,
-                                                   request.fragment.output_exprs, params,
-                                                   row_desc(), runtime_state(), &_sink, *desc_tbl));
-        RETURN_IF_ERROR(_sink->prepare(runtime_state()));
+        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(
+                obj_pool(), request.fragment.output_sink, request.fragment.output_exprs, params,
+                row_desc(), runtime_state(), &_sink, *desc_tbl));
+        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_sink->prepare(runtime_state()));
 
         RuntimeProfile* sink_profile = _sink->profile();
         if (sink_profile != nullptr) {
@@ -828,8 +828,8 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
         RETURN_IF_ERROR(get_block_compression_codec(compression_type, &codec));
 
         faststring buf_compressed;
-        RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(codec->compress(
-                Slice(column_values.data(), content_uncompressed_size), &buf_compressed)));
+        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(codec->compress(
+                Slice(column_values.data(), content_uncompressed_size), &buf_compressed));
         size_t compressed_size = buf_compressed.size();
         if (LIKELY(compressed_size < content_uncompressed_size)) {
             pblock->set_column_values(buf_compressed.data(), buf_compressed.size());
@@ -164,13 +164,16 @@ bool DataTypeDecimal<T>::parse_from_string(const std::string& str, T* res) const
 DataTypePtr create_decimal(UInt64 precision_value, UInt64 scale_value, bool use_v2) {
     if (precision_value < min_decimal_precision() ||
         precision_value > max_decimal_precision<Decimal128>()) {
-        LOG(WARNING) << "Wrong precision " << precision_value;
-        return nullptr;
+        throw doris::Exception(doris::ErrorCode::NOT_IMPLEMENTED_ERROR,
+                               "Wrong precision {}, min: {}, max: {}", precision_value,
+                               min_decimal_precision(), max_decimal_precision<Decimal128>());
     }
 
     if (static_cast<UInt64>(scale_value) > precision_value) {
-        LOG(WARNING) << "Negative scales and scales larger than precision are not supported";
-        return nullptr;
+        throw doris::Exception(doris::ErrorCode::NOT_IMPLEMENTED_ERROR,
+                               "Negative scales and scales larger than precision are not "
+                               "supported, scale_value: {}, precision_value: {}",
+                               scale_value, precision_value);
     }
 
     if (use_v2) {
@@ -202,8 +202,8 @@ void DataTypeDateTimeV2::cast_to_date_v2(const UInt64 from, UInt32& to) {
 
 DataTypePtr create_datetimev2(UInt64 scale_value) {
     if (scale_value > 6) {
-        LOG(WARNING) << "Wrong scale " << scale_value;
-        return nullptr;
+        throw doris::Exception(doris::ErrorCode::NOT_IMPLEMENTED_ERROR, "scale_value > 6 {}",
+                               scale_value);
     }
     return std::make_shared<DataTypeDateTimeV2>(scale_value);
 }
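With these two hunks, create_decimal and create_datetimev2 throw doris::Exception instead of logging a warning and returning nullptr, so callers lose the nullptr check but must sit behind an exception-to-Status boundary; the VFileScanner hunk below shows the real call sites. A condensed sketch of that contract (make_decimal_type is a hypothetical wrapper, not code from this patch):

// The factory now either returns a valid DataTypePtr or throws
// doris::Exception(NOT_IMPLEMENTED_ERROR); the wrapper converts the
// exception back into a Status, as the call sites in this commit do.
Status make_decimal_type(UInt64 precision, UInt64 scale, DataTypePtr* out) {
    RETURN_IF_CATCH_EXCEPTION(*out = create_decimal(precision, scale, /*use_v2=*/false));
    return Status::OK();
}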
@@ -1302,8 +1302,8 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
     for (auto& conjunct : _lazy_read_ctx.conjuncts) {
         filter_conjuncts.push_back(conjunct);
     }
-    RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts(
-            filter_conjuncts, nullptr, block, _filter.get(), &can_filter_all)));
+    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
+            filter_conjuncts, nullptr, block, _filter.get(), &can_filter_all));
 
     if (_lazy_read_ctx.resize_first_column) {
         block->get_by_position(0).column->assume_mutable()->clear();
@@ -785,8 +785,8 @@ Status RowGroupReader::_rewrite_dict_predicates() {
         // The following process may be tricky and time-consuming, but we have no other way.
         temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size);
     }
-    RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
-            ctxs, nullptr, &temp_block, columns_to_filter, column_to_keep)));
+    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts_and_filter_block(
+            ctxs, nullptr, &temp_block, columns_to_filter, column_to_keep));
     if (dict_pos != 0) {
         // We have to clean the first column to insert right data.
         temp_block.get_by_position(0).column->assume_mutable()->clear();
@@ -309,16 +309,11 @@ Status VFileScanner::_init_src_block(Block* block) {
         auto it = _name_to_col_type.find(slot->col_name());
         if (it == _name_to_col_type.end() || _is_dynamic_schema) {
             // not exist in file, using type from _input_tuple_desc
-            data_type =
-                    DataTypeFactory::instance().create_data_type(slot->type(), slot->is_nullable());
+            RETURN_IF_CATCH_EXCEPTION(data_type = DataTypeFactory::instance().create_data_type(
+                                              slot->type(), slot->is_nullable()));
         } else {
-            data_type = DataTypeFactory::instance().create_data_type(it->second, true);
-        }
-        if (data_type == nullptr) {
-            return Status::NotSupported("Not support data type {} for column {}",
-                                        it == _name_to_col_type.end() ? slot->type().debug_string()
-                                                                      : it->second.debug_string(),
-                                        slot->col_name());
+            RETURN_IF_CATCH_EXCEPTION(
+                    data_type = DataTypeFactory::instance().create_data_type(it->second, true));
         }
         MutableColumnPtr data_column = data_type->create_column();
         _src_block.insert(
@@ -179,7 +179,7 @@ Status VSortNode::open(RuntimeState* state) {
                                                   ExecNode::get_next,
                                                   _children[0], std::placeholders::_1, std::placeholders::_2,
                                                   std::placeholders::_3)));
-        RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(sink(state, upstream_block.get(), eos)));
+        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(sink(state, upstream_block.get(), eos));
     } while (!eos);
 
     child(0)->close(state);
@@ -191,7 +191,7 @@ Status VSortNode::open(RuntimeState* state) {
 }
 
 Status VSortNode::pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) {
-    RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(_sorter->get_next(state, output_block, eos)));
+    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_sorter->get_next(state, output_block, eos));
     reached_limit(output_block, eos);
     if (*eos) {
         _runtime_profile->add_info_string("Spilled", _sorter->is_spilled() ? "true" : "false");