diff --git a/be/src/common/status.h b/be/src/common/status.h index ec8fbba4fe..d989ba12c3 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -209,6 +209,8 @@ E(COLUMN_READ_STREAM, -1706); E(COLUMN_STREAM_NOT_EXIST, -1716); E(COLUMN_VALUE_NULL, -1717); E(COLUMN_SEEK_ERROR, -1719); +E(COLUMN_NO_MATCH_OFFSETS_SIZE, -1720); +E(COLUMN_NO_MATCH_FILTER_SIZE, -1721); E(DELETE_INVALID_CONDITION, -1900); E(DELETE_UPDATE_HEADER_FAILED, -1901); E(DELETE_SAVE_HEADER_FAILED, -1902); diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 043913a5f8..20633d2c48 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -194,6 +194,7 @@ Status BaseScanner::init_expr_ctxes() { return Status::OK(); } +// need exception safety Status BaseScanner::_filter_src_block() { auto origin_column_num = _src_block.columns(); // filter block @@ -348,6 +349,7 @@ Status BaseScanner::_init_src_block() { return Status::OK(); } +// need exception safety Status BaseScanner::_fill_dest_block(vectorized::Block* dest_block, bool* eof) { *eof = _scanner_eof; _fill_columns_from_path(); diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 4d4aa4ae56..5d3c375f54 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -1601,13 +1601,8 @@ Status SegmentIterator::_read_columns_by_rowids(std::vector& read_colu } Status SegmentIterator::next_batch(vectorized::Block* block) { - Status st; - try { - st = _next_batch_internal(block); - } catch (const doris::Exception& e) { - st = Status::Error(e.code(), e.to_string()); - } - return st; + RETURN_IF_CATCH_EXCEPTION({ return _next_batch_internal(block); }); + return Status::OK(); } Status SegmentIterator::_next_batch_internal(vectorized::Block* block) { @@ -1882,7 +1877,8 @@ Status SegmentIterator::_execute_common_expr(uint16_t* sel_rowid_idx, uint16_t& } selected_size = _evaluate_common_expr_filter(sel_rowid_idx, selected_size, filter); - vectorized::Block::filter_block_internal(block, _columns_to_filter, filter); + RETURN_IF_CATCH_EXCEPTION( + vectorized::Block::filter_block_internal(block, _columns_to_filter, filter)); } else if (auto* const_column = vectorized::check_and_get_column(*filter_column)) { bool ret = const_column->get_bool(0); @@ -1898,7 +1894,8 @@ Status SegmentIterator::_execute_common_expr(uint16_t* sel_rowid_idx, uint16_t& *filter_column) .get_data(); selected_size = _evaluate_common_expr_filter(sel_rowid_idx, selected_size, filter); - vectorized::Block::filter_block_internal(block, _columns_to_filter, filter); + RETURN_IF_CATCH_EXCEPTION( + vectorized::Block::filter_block_internal(block, _columns_to_filter, filter)); } return Status::OK(); } diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index c86691a062..04e8915378 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -110,7 +110,7 @@ public: }; inline thread_local ThreadContextPtr thread_context_ptr; -inline thread_local bool enable_thread_catch_bad_alloc = false; +inline thread_local int enable_thread_catch_bad_alloc = 0; // To avoid performance problems caused by frequently calling `bthread_getspecific` to obtain bthread TLS // in tcmalloc hook, cache the key and value of bthread TLS in pthread TLS. @@ -292,33 +292,22 @@ private: tracker->transfer_to( \ size, doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_raw()) -// Consider catching other memory errors, such as memset failure, etc. -#define RETURN_IF_CATCH_BAD_ALLOC(stmt) \ - do { \ - doris::thread_context()->thread_mem_tracker_mgr->clear_exceed_mem_limit_msg(); \ - if (doris::enable_thread_catch_bad_alloc) { \ - try { \ - { stmt; } \ - } catch (std::bad_alloc const& e) { \ - doris::thread_context()->thread_mem_tracker()->print_log_usage( \ - doris::thread_context()->thread_mem_tracker_mgr->exceed_mem_limit_msg()); \ - return Status::MemoryLimitExceeded(fmt::format( \ - "PreCatch {}, {}", e.what(), \ - doris::thread_context()->thread_mem_tracker_mgr->exceed_mem_limit_msg())); \ - } \ - } else { \ - try { \ - doris::enable_thread_catch_bad_alloc = true; \ - Defer defer {[&]() { doris::enable_thread_catch_bad_alloc = false; }}; \ - { stmt; } \ - } catch (std::bad_alloc const& e) { \ - doris::thread_context()->thread_mem_tracker()->print_log_usage( \ - doris::thread_context()->thread_mem_tracker_mgr->exceed_mem_limit_msg()); \ - return Status::MemoryLimitExceeded(fmt::format( \ - "PreCatch {}, {}", e.what(), \ - doris::thread_context()->thread_mem_tracker_mgr->exceed_mem_limit_msg())); \ - } \ - } \ +#define RETURN_IF_CATCH_EXCEPTION(stmt) \ + do { \ + try { \ + doris::thread_context()->thread_mem_tracker_mgr->clear_exceed_mem_limit_msg(); \ + doris::enable_thread_catch_bad_alloc++; \ + Defer defer {[&]() { doris::enable_thread_catch_bad_alloc--; }}; \ + { stmt; } \ + } catch (std::bad_alloc const& e) { \ + doris::thread_context()->thread_mem_tracker()->print_log_usage( \ + doris::thread_context()->thread_mem_tracker_mgr->exceed_mem_limit_msg()); \ + return Status::MemoryLimitExceeded(fmt::format( \ + "PreCatch {}, {}", e.what(), \ + doris::thread_context()->thread_mem_tracker_mgr->exceed_mem_limit_msg())); \ + } catch (const doris::Exception& e) { \ + return Status::Error(e.code(), e.to_string()); \ + } \ } while (0) // Mem Hook to consume thread mem tracker @@ -350,6 +339,7 @@ private: #define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) (void)0 #define CONSUME_MEM_TRACKER(size) (void)0 #define RELEASE_MEM_TRACKER(size) (void)0 -#define RETURN_IF_CATCH_BAD_ALLOC(stmt) (stmt) +#define RETURN_IF_CATCH_EXCEPTION(stmt) \ + { stmt; } #endif } // namespace doris diff --git a/be/src/vec/columns/column_array.cpp b/be/src/vec/columns/column_array.cpp index 35f016d210..79b1aa7914 100644 --- a/be/src/vec/columns/column_array.cpp +++ b/be/src/vec/columns/column_array.cpp @@ -440,7 +440,7 @@ size_t ColumnArray::filter_number(const Filter& filter) { ColumnPtr ColumnArray::filter_string(const Filter& filt, ssize_t result_size_hint) const { size_t col_size = get_offsets().size(); - if (col_size != filt.size()) LOG(FATAL) << "Size of filter doesn't match size of column."; + column_match_filter_size(col_size, filt.size()); if (0 == col_size) return ColumnArray::create(data); @@ -503,9 +503,7 @@ ColumnPtr ColumnArray::filter_string(const Filter& filt, ssize_t result_size_hin size_t ColumnArray::filter_string(const Filter& filter) { size_t col_size = get_offsets().size(); - if (col_size != filter.size()) { - LOG(FATAL) << "Size of filter doesn't match size of column."; - } + column_match_filter_size(col_size, filter.size()); if (0 == col_size) { return ColumnArray::create(data); @@ -567,7 +565,7 @@ size_t ColumnArray::filter_string(const Filter& filter) { ColumnPtr ColumnArray::filter_generic(const Filter& filt, ssize_t result_size_hint) const { size_t size = get_offsets().size(); - if (size != filt.size()) LOG(FATAL) << "Size of filter doesn't match size of column."; + column_match_filter_size(size, filt.size()); if (size == 0) return ColumnArray::create(data); @@ -606,9 +604,7 @@ ColumnPtr ColumnArray::filter_generic(const Filter& filt, ssize_t result_size_hi size_t ColumnArray::filter_generic(const Filter& filter) { size_t size = get_offsets().size(); - if (size != filter.size()) { - LOG(FATAL) << "Size of filter doesn't match size of column."; - } + column_match_filter_size(size, filter.size()); if (size == 0) { return 0; @@ -788,8 +784,7 @@ void ColumnArray::replicate(const uint32_t* counts, size_t target_size, IColumn& template ColumnPtr ColumnArray::replicate_number(const IColumn::Offsets& replicate_offsets) const { size_t col_size = size(); - if (col_size != replicate_offsets.size()) - LOG(FATAL) << "Size of offsets doesn't match size of column."; + column_match_offsets_size(col_size, replicate_offsets.size()); MutableColumnPtr res = clone_empty(); @@ -836,8 +831,7 @@ ColumnPtr ColumnArray::replicate_number(const IColumn::Offsets& replicate_offset ColumnPtr ColumnArray::replicate_string(const IColumn::Offsets& replicate_offsets) const { size_t col_size = size(); - if (col_size != replicate_offsets.size()) - LOG(FATAL) << "Size of offsets doesn't match size of column."; + column_match_offsets_size(col_size, replicate_offsets.size()); MutableColumnPtr res = clone_empty(); @@ -910,8 +904,7 @@ ColumnPtr ColumnArray::replicate_string(const IColumn::Offsets& replicate_offset ColumnPtr ColumnArray::replicate_const(const IColumn::Offsets& replicate_offsets) const { size_t col_size = size(); - if (col_size != replicate_offsets.size()) - LOG(FATAL) << "Size of offsets doesn't match size of column."; + column_match_offsets_size(col_size, replicate_offsets.size()); if (0 == col_size) return clone_empty(); @@ -944,8 +937,7 @@ ColumnPtr ColumnArray::replicate_const(const IColumn::Offsets& replicate_offsets ColumnPtr ColumnArray::replicate_generic(const IColumn::Offsets& replicate_offsets) const { size_t col_size = size(); - if (col_size != replicate_offsets.size()) - LOG(FATAL) << "Size of offsets doesn't match size of column."; + column_match_offsets_size(col_size, replicate_offsets.size()); MutableColumnPtr res = clone_empty(); ColumnArray& res_concrete = assert_cast(*res); diff --git a/be/src/vec/columns/column_complex.h b/be/src/vec/columns/column_complex.h index 95a6793853..138f3d0fb3 100644 --- a/be/src/vec/columns/column_complex.h +++ b/be/src/vec/columns/column_complex.h @@ -29,6 +29,7 @@ #include "vec/columns/column_impl.h" #include "vec/columns/column_string.h" #include "vec/columns/column_vector.h" +#include "vec/columns/columns_common.h" #include "vec/core/types.h" namespace doris::vectorized { @@ -313,9 +314,7 @@ template ColumnPtr ColumnComplexType::filter(const IColumn::Filter& filt, ssize_t result_size_hint) const { size_t size = data.size(); - if (size != filt.size()) { - LOG(FATAL) << "Size of filter doesn't match size of column."; - } + column_match_filter_size(size, filt.size()); if (data.size() == 0) return this->create(); auto res = this->create(); @@ -340,9 +339,7 @@ ColumnPtr ColumnComplexType::filter(const IColumn::Filter& filt, template size_t ColumnComplexType::filter(const IColumn::Filter& filter) { size_t size = data.size(); - if (size != filter.size()) { - LOG(FATAL) << "Size of filter doesn't match size of column."; - } + column_match_filter_size(size, filter.size()); if (data.size() == 0) { return 0; @@ -391,9 +388,7 @@ ColumnPtr ColumnComplexType::permute(const IColumn::Permutation& perm, size_t template ColumnPtr ColumnComplexType::replicate(const IColumn::Offsets& offsets) const { size_t size = data.size(); - if (size != offsets.size()) { - LOG(FATAL) << "Size of offsets doesn't match size of column."; - } + column_match_offsets_size(size, offsets.size()); if (0 == size) return this->create(); diff --git a/be/src/vec/columns/column_const.cpp b/be/src/vec/columns/column_const.cpp index 96d0c013b1..0beec35355 100644 --- a/be/src/vec/columns/column_const.cpp +++ b/be/src/vec/columns/column_const.cpp @@ -55,19 +55,13 @@ ColumnPtr ColumnConst::remove_low_cardinality() const { } ColumnPtr ColumnConst::filter(const Filter& filt, ssize_t /*result_size_hint*/) const { - if (s != filt.size()) { - LOG(FATAL) << fmt::format("Size of filter ({}) doesn't match size of column ({})", - filt.size(), s); - } + column_match_filter_size(s, filt.size()); return ColumnConst::create(data, count_bytes_in_filter(filt)); } size_t ColumnConst::filter(const Filter& filter) { - if (s != filter.size()) { - LOG(FATAL) << fmt::format("Size of filter ({}) doesn't match size of column ({})", - filter.size(), s); - } + column_match_filter_size(s, filter.size()); const auto result_size = count_bytes_in_filter(filter); resize(result_size); @@ -75,10 +69,7 @@ size_t ColumnConst::filter(const Filter& filter) { } ColumnPtr ColumnConst::replicate(const Offsets& offsets) const { - if (s != offsets.size()) { - LOG(FATAL) << fmt::format("Size of offsets ({}) doesn't match size of column ({})", - offsets.size(), s); - } + column_match_offsets_size(s, offsets.size()); size_t replicated_size = 0 == s ? 0 : offsets.back(); return ColumnConst::create(data, replicated_size); diff --git a/be/src/vec/columns/column_decimal.cpp b/be/src/vec/columns/column_decimal.cpp index 7731e05389..ab375d1d0d 100644 --- a/be/src/vec/columns/column_decimal.cpp +++ b/be/src/vec/columns/column_decimal.cpp @@ -276,9 +276,7 @@ void ColumnDecimal::insert_range_from(const IColumn& src, size_t start, size_ template ColumnPtr ColumnDecimal::filter(const IColumn::Filter& filt, ssize_t result_size_hint) const { size_t size = data.size(); - if (size != filt.size()) { - LOG(FATAL) << "Size of filter doesn't match size of column."; - } + column_match_filter_size(size, filt.size()); auto res = this->create(0, scale); Container& res_data = res->get_data(); @@ -327,9 +325,7 @@ ColumnPtr ColumnDecimal::filter(const IColumn::Filter& filt, ssize_t result_s template size_t ColumnDecimal::filter(const IColumn::Filter& filter) { size_t size = data.size(); - if (size != filter.size()) { - LOG(FATAL) << "Size of filter doesn't match size of column."; - } + column_match_filter_size(size, filter.size()); const UInt8* filter_pos = filter.data(); const UInt8* filter_end = filter_pos + size; @@ -382,9 +378,7 @@ size_t ColumnDecimal::filter(const IColumn::Filter& filter) { template ColumnPtr ColumnDecimal::replicate(const IColumn::Offsets& offsets) const { size_t size = data.size(); - if (size != offsets.size()) { - LOG(FATAL) << "Size of offsets doesn't match size of column."; - } + column_match_offsets_size(size, offsets.size()); auto res = this->create(0, scale); if (0 == size) return res; diff --git a/be/src/vec/columns/column_dummy.h b/be/src/vec/columns/column_dummy.h index b66b284e3f..f6b36ae80a 100644 --- a/be/src/vec/columns/column_dummy.h +++ b/be/src/vec/columns/column_dummy.h @@ -108,9 +108,7 @@ public: } ColumnPtr replicate(const Offsets& offsets) const override { - if (s != offsets.size()) { - LOG(FATAL) << "Size of offsets doesn't match size of column."; - } + column_match_offsets_size(s, offsets.size()); return clone_dummy(offsets.back()); } diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp index eff29d17a4..c6317e7453 100644 --- a/be/src/vec/columns/column_string.cpp +++ b/be/src/vec/columns/column_string.cpp @@ -377,9 +377,7 @@ void ColumnString::get_permutation(bool reverse, size_t limit, int /*nan_directi ColumnPtr ColumnString::replicate(const Offsets& replicate_offsets) const { size_t col_size = size(); - if (col_size != replicate_offsets.size()) { - LOG(FATAL) << "Size of offsets doesn't match size of column."; - } + column_match_offsets_size(col_size, replicate_offsets.size()); auto res = ColumnString::create(); diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp index 4deaa46b69..45c8b94ddb 100644 --- a/be/src/vec/columns/column_vector.cpp +++ b/be/src/vec/columns/column_vector.cpp @@ -387,10 +387,7 @@ void ColumnVector::insert_indices_from(const IColumn& src, const int* indices template ColumnPtr ColumnVector::filter(const IColumn::Filter& filt, ssize_t result_size_hint) const { size_t size = data.size(); - if (size != filt.size()) { - LOG(FATAL) << "Size of filter doesn't match size of column. data size: " << size - << ", filter size: " << filt.size() << get_stack_trace(); - } + column_match_filter_size(size, filt.size()); auto res = this->create(); if constexpr (std::is_same_v) { @@ -444,10 +441,7 @@ ColumnPtr ColumnVector::filter(const IColumn::Filter& filt, ssize_t result_si template size_t ColumnVector::filter(const IColumn::Filter& filter) { size_t size = data.size(); - if (size != filter.size()) { - LOG(FATAL) << "Size of filter doesn't match size of column. data size: " << size - << ", filter size: " << filter.size() << get_stack_trace(); - } + column_match_filter_size(size, filter.size()); const UInt8* filter_pos = filter.data(); const UInt8* filter_end = filter_pos + size; @@ -523,9 +517,7 @@ ColumnPtr ColumnVector::permute(const IColumn::Permutation& perm, size_t limi template ColumnPtr ColumnVector::replicate(const IColumn::Offsets& offsets) const { size_t size = data.size(); - if (size != offsets.size()) { - LOG(FATAL) << "Size of offsets doesn't match size of column."; - } + column_match_offsets_size(size, offsets.size()); auto res = this->create(); if constexpr (std::is_same_v) { diff --git a/be/src/vec/columns/columns_common.cpp b/be/src/vec/columns/columns_common.cpp index 5665fd2998..4a4200a002 100644 --- a/be/src/vec/columns/columns_common.cpp +++ b/be/src/vec/columns/columns_common.cpp @@ -153,9 +153,7 @@ void filter_arrays_impl_generic(const PaddedPODArray& src_elems, PaddedPODArray* res_offsets, const IColumn::Filter& filt, ssize_t result_size_hint) { const size_t size = src_offsets.size(); - if (size != filt.size()) { - LOG(FATAL) << "Size of filter doesn't match size of column."; - } + column_match_filter_size(size, filt.size()); constexpr int ASSUME_STRING_LENGTH = 5; ResultOffsetsBuilder result_offsets_builder(res_offsets); @@ -233,9 +231,7 @@ size_t filter_arrays_impl_generic_without_reserving(PaddedPODArray& elems, PaddedPODArray& offsets, const IColumn::Filter& filter) { const size_t size = offsets.size(); - if (offsets.size() != filter.size()) { - LOG(FATAL) << "Size of filter doesn't match size of column."; - } + column_match_filter_size(size, filter.size()); /// If no need to filter the `offsets`, here do not reset the end ptr of `offsets` if constexpr (!std::is_same_v>) { diff --git a/be/src/vec/columns/columns_common.h b/be/src/vec/columns/columns_common.h index 3bae20f0ba..6102cef86b 100644 --- a/be/src/vec/columns/columns_common.h +++ b/be/src/vec/columns/columns_common.h @@ -59,6 +59,24 @@ template size_t filter_arrays_impl_only_data(PaddedPODArray& data, PaddedPODArray& offsets, const IColumn::Filter& filter); +inline void column_match_offsets_size(size_t size, size_t offsets_size) { + if (size != offsets_size) { + throw doris::Exception( + ErrorCode::COLUMN_NO_MATCH_OFFSETS_SIZE, + "Size of offsets doesn't match size of column: size={}, offsets.size={}", size, + offsets_size); + } +} + +inline void column_match_filter_size(size_t size, size_t filter_size) { + if (size != filter_size) { + throw doris::Exception( + ErrorCode::COLUMN_NO_MATCH_FILTER_SIZE, + "Size of filter doesn't match size of column: size={}, filter.size={}", size, + filter_size); + } +} + namespace detail { template const PaddedPODArray* get_indexes_data(const IColumn& indexes); diff --git a/be/src/vec/common/sort/heap_sorter.cpp b/be/src/vec/common/sort/heap_sorter.cpp index de36223915..406176ff51 100644 --- a/be/src/vec/common/sort/heap_sorter.cpp +++ b/be/src/vec/common/sort/heap_sorter.cpp @@ -74,7 +74,7 @@ Status HeapSorter::append_block(Block* block) { if (_heap_size == _heap->size()) { { SCOPED_TIMER(_topn_filter_timer); - _do_filter(block_view->value(), num_rows); + RETURN_IF_CATCH_EXCEPTION(_do_filter(block_view->value(), num_rows)); } size_t remain_rows = block_view->value().block.rows(); _topn_filter_rows += (num_rows - remain_rows); @@ -155,6 +155,7 @@ Field HeapSorter::get_top_value() { return field; } +// need exception safety void HeapSorter::_do_filter(HeapSortCursorBlockView& block_view, size_t num_rows) { const auto& top_cursor = _heap->top(); const int cursor_rid = top_cursor.row_id(); diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index b5315a1489..2e630fb44c 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -305,13 +305,9 @@ Status FullSorter::append_block(Block* block) { DCHECK(data[i].type->equals(*(arrival_data[i].type))) << " type1: " << data[i].type->get_name() << " type2: " << arrival_data[i].type->get_name(); - try { - //TODO: to eliminate unnecessary expansion, we need a `insert_range_from_const` for every column type. - RETURN_IF_CATCH_BAD_ALLOC(data[i].column->assume_mutable()->insert_range_from( - *arrival_data[i].column->convert_to_full_column_if_const(), 0, sz)); - } catch (const doris::Exception& e) { - return Status::Error(e.code(), e.to_string()); - } + //TODO: to eliminate unnecessary expansion, we need a `insert_range_from_const` for every column type. + RETURN_IF_CATCH_EXCEPTION(data[i].column->assume_mutable()->insert_range_from( + *arrival_data[i].column->convert_to_full_column_if_const(), 0, sz)); } block->clear_column_data(); } diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 0625718767..63b79d7965 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -738,7 +738,7 @@ Status Block::filter_block(Block* block, const std::vector& columns_to for (size_t i = 0; i < size; ++i) { filter_data[i] &= !null_map[i]; } - filter_block_internal(block, columns_to_filter, filter); + RETURN_IF_CATCH_EXCEPTION(filter_block_internal(block, columns_to_filter, filter)); } else if (auto* const_column = check_and_get_column(*filter_column)) { bool ret = const_column->get_bool(0); if (!ret) { @@ -750,7 +750,7 @@ Status Block::filter_block(Block* block, const std::vector& columns_to const IColumn::Filter& filter = assert_cast&>(*filter_column) .get_data(); - filter_block_internal(block, columns_to_filter, filter); + RETURN_IF_CATCH_EXCEPTION(filter_block_internal(block, columns_to_filter, filter)); } erase_useless_column(block, column_to_keep); diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 36810edd28..dba2e62ea3 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -107,6 +107,7 @@ public: ColumnWithTypeAndName& get_by_position(size_t position) { return data[position]; } const ColumnWithTypeAndName& get_by_position(size_t position) const { return data[position]; } + // need exception safety Status copy_column_data_to_block(doris::vectorized::IColumn* input_col_ptr, uint16_t* sel_rowid_idx, uint16_t select_size, int block_cid, size_t batch_size) { @@ -264,9 +265,11 @@ public: void append_block_by_selector(MutableBlock* dst, const IColumn::Selector& selector) const; + // need exception safety static void filter_block_internal(Block* block, const std::vector& columns_to_filter, const IColumn::Filter& filter); + // need exception safety static void filter_block_internal(Block* block, const IColumn::Filter& filter, uint32_t column_to_keep); diff --git a/be/src/vec/core/block_spill_writer.cpp b/be/src/vec/core/block_spill_writer.cpp index b1cca3a851..a501253c63 100644 --- a/be/src/vec/core/block_spill_writer.cpp +++ b/be/src/vec/core/block_spill_writer.cpp @@ -87,14 +87,12 @@ Status BlockSpillWriter::write(const Block& block) { auto& dst_data = tmp_block_.get_columns_with_type_and_name(); size_t block_rows = std::min(rows - row_idx, batch_size_); - try { + RETURN_IF_CATCH_EXCEPTION({ for (size_t col_idx = 0; col_idx < block.columns(); ++col_idx) { dst_data[col_idx].column->assume_mutable()->insert_range_from( *src_data[col_idx].column, row_idx, block_rows); } - } catch (const doris::Exception& e) { - return Status::Error(e.code(), e.to_string()); - } + }); RETURN_IF_ERROR(_write_internal(tmp_block_)); diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h index 01834f8345..aced2c022d 100644 --- a/be/src/vec/core/sort_cursor.h +++ b/be/src/vec/core/sort_cursor.h @@ -38,6 +38,7 @@ public: _reset(); } + // need exception safety void filter_block(IColumn::Filter& filter) { Block::filter_block_internal(&block, filter, block.columns()); _reset(); diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 0604106b03..29c2f488a5 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -272,11 +272,12 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_ if (_position_delete_ctx.has_filter) { filters.push_back(_pos_delete_filter_ptr.get()); } - RETURN_IF_ERROR(_execute_conjuncts_and_filter_block(_filter_conjuncts, filters, block, - columns_to_filter, column_to_keep)); + RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(_execute_conjuncts_and_filter_block( + _filter_conjuncts, filters, block, columns_to_filter, column_to_keep))); _convert_dict_cols_to_string_cols(block); } else { - RETURN_IF_ERROR(_filter_block(block, column_to_keep, columns_to_filter)); + RETURN_IF_CATCH_EXCEPTION( + RETURN_IF_ERROR(_filter_block(block, column_to_keep, columns_to_filter))); } *read_rows = block->rows(); @@ -440,8 +441,8 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re // generated from next batch, so the filter column is removed ahead. DCHECK_EQ(block->rows(), 0); } else { - RETURN_IF_ERROR(_filter_block_internal(block, _lazy_read_ctx.all_predicate_col_ids, - result_filter)); + RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(_filter_block_internal( + block, _lazy_read_ctx.all_predicate_col_ids, result_filter))); Block::erase_useless_column(block, origin_column_num); } } else { @@ -633,6 +634,7 @@ Status RowGroupReader::_build_pos_delete_filter(size_t read_rows) { return Status::OK(); } +// need exception safety Status RowGroupReader::_filter_block(Block* block, int column_to_keep, const std::vector& columns_to_filter) { if (_pos_delete_filter_ptr) { @@ -644,6 +646,7 @@ Status RowGroupReader::_filter_block(Block* block, int column_to_keep, return Status::OK(); } +// need exception safety Status RowGroupReader::_filter_block_internal(Block* block, const std::vector& columns_to_filter, const IColumn::Filter& filter) { @@ -734,8 +737,8 @@ Status RowGroupReader::_rewrite_dict_predicates() { temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size); } std::vector filters; - RETURN_IF_ERROR(_execute_conjuncts_and_filter_block(*ctxs, filters, &temp_block, - columns_to_filter, column_to_keep)); + RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(_execute_conjuncts_and_filter_block( + *ctxs, filters, &temp_block, columns_to_filter, column_to_keep))); if (dict_pos != 0) { // We have to clean the first column to insert right data. temp_block.get_by_position(0).column->assume_mutable()->clear(); @@ -978,6 +981,7 @@ Status RowGroupReader::_execute_conjuncts(const std::vector& ctxs } // TODO Performance Optimization +// need exception safety Status RowGroupReader::_execute_conjuncts_and_filter_block( const std::vector& ctxs, const std::vector& filters, Block* block, std::vector& columns_to_filter, int column_to_keep) { diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index ba49134c03..c5fbc34b01 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -392,7 +392,7 @@ Status ProcessHashTableProbe::do_process(HashTableType& hash_table_c if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN && JoinOpType != TJoinOp::RIGHT_ANTI_JOIN) { SCOPED_TIMER(_probe_side_output_timer); - RETURN_IF_CATCH_BAD_ALLOC( + RETURN_IF_CATCH_EXCEPTION( probe_side_output_column(mcol, _join_node->_left_output_slot_flags, current_offset, last_probe_index, probe_size, all_match_one, false)); } @@ -665,7 +665,7 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( } { SCOPED_TIMER(_probe_side_output_timer); - RETURN_IF_CATCH_BAD_ALLOC(probe_side_output_column( + RETURN_IF_CATCH_EXCEPTION(probe_side_output_column( mcol, _join_node->_left_output_slot_flags, current_offset, last_probe_index, probe_size, all_match_one, true)); } diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 9c8a8a10f0..2dc9e09073 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -106,7 +106,7 @@ struct ProcessHashTableBuild { // the hash table build bucket, which may waste a lot of memory. // TODO, use the NDV expansion of the key column in the optimizer statistics if (!_join_node->_build_unique) { - RETURN_IF_CATCH_BAD_ALLOC(hash_table_ctx.hash_table.expanse_for_add_elem( + RETURN_IF_CATCH_EXCEPTION(hash_table_ctx.hash_table.expanse_for_add_elem( std::min(_rows, config::hash_table_pre_expanse_max_rows))); } @@ -479,7 +479,7 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_ Status st; if (_probe_index < _probe_block.rows()) { DCHECK(_has_set_need_null_map_for_probe); - try { + RETURN_IF_CATCH_EXCEPTION({ std::visit( [&](auto&& arg, auto&& process_hashtable_ctx, auto need_null_map_for_probe, auto ignore_null) { @@ -516,9 +516,7 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_ *_hash_table_variants, *_process_hashtable_ctx_variants, make_bool_variant(_need_null_map_for_probe), make_bool_variant(_probe_ignore_null)); - } catch (const doris::Exception& e) { - return Status::Error(e.code(), e.to_string()); - } + }); } else if (_probe_eos) { if (_is_right_semi_anti || (_is_outer_join && _join_op != TJoinOp::LEFT_OUTER_JOIN)) { std::visit( @@ -774,7 +772,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc if (in_block->rows() != 0) { SCOPED_TIMER(_build_side_merge_block_timer); - RETURN_IF_CATCH_BAD_ALLOC(_build_side_mutable_block.merge(*in_block)); + RETURN_IF_CATCH_EXCEPTION(_build_side_mutable_block.merge(*in_block)); } if (UNLIKELY(_build_side_mem_used - _build_side_last_mem_used > BUILD_BLOCK_MAX_SIZE)) { diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index c995929ca1..ee7d0cdc51 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -488,6 +488,7 @@ void VNestedLoopJoinNode::_reset_with_next_probe_row() { block->get_by_position(i).column->assume_mutable()->clear(); \ } +// need exception safety template void VNestedLoopJoinNode::_do_filtering_and_update_visited_flags_impl( Block* block, int column_to_keep, int build_block_idx, int processed_blocks_num, @@ -517,6 +518,7 @@ void VNestedLoopJoinNode::_do_filtering_and_update_visited_flags_impl( } } +// need exception safety template Status VNestedLoopJoinNode::_do_filtering_and_update_visited_flags(Block* block, bool materialize) { auto column_to_keep = block->columns(); diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h b/be/src/vec/exec/join/vnested_loop_join_node.h index 84f5f18d69..537ed2d7f0 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.h +++ b/be/src/vec/exec/join/vnested_loop_join_node.h @@ -105,10 +105,11 @@ private: } if constexpr (set_probe_side_flag) { - auto status = - _do_filtering_and_update_visited_flags( - &_join_block, !_is_left_semi_anti); + Status status; + RETURN_IF_CATCH_EXCEPTION( + (status = _do_filtering_and_update_visited_flags< + set_build_side_flag, set_probe_side_flag, ignore_null>( + &_join_block, !_is_left_semi_anti))); _update_additional_flags(&_join_block); if (!status.ok()) { return status; @@ -141,10 +142,11 @@ private: } if constexpr (!set_probe_side_flag) { - Status status = - _do_filtering_and_update_visited_flags(&_join_block, - !_is_right_semi_anti); + Status status; + RETURN_IF_CATCH_EXCEPTION( + (status = _do_filtering_and_update_visited_flags< + set_build_side_flag, set_probe_side_flag, ignore_null>( + &_join_block, !_is_right_semi_anti))); _update_additional_flags(&_join_block); mutable_join_block = MutableBlock(&_join_block); if (!status.ok()) { diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index 77a0d0e7cb..c19a4b7730 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -1115,7 +1115,7 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i _agg_data->_aggregated_method_variant)); if (!ret_flag) { - RETURN_IF_CATCH_BAD_ALLOC(_emplace_into_hash_table(_places.data(), key_columns, rows)); + RETURN_IF_CATCH_EXCEPTION(_emplace_into_hash_table(_places.data(), key_columns, rows)); for (int i = 0; i < _aggregate_evaluators.size(); ++i) { RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add( diff --git a/be/src/vec/exec/varrow_scanner.cpp b/be/src/vec/exec/varrow_scanner.cpp index 9e3137f7c6..eae0bb0deb 100644 --- a/be/src/vec/exec/varrow_scanner.cpp +++ b/be/src/vec/exec/varrow_scanner.cpp @@ -251,7 +251,7 @@ Status VArrowScanner::get_next(vectorized::Block* block, bool* eof) { RETURN_IF_ERROR(_cast_src_block(&_src_block)); // materialize, src block => dest columns - return _fill_dest_block(block, eof); + RETURN_IF_CATCH_EXCEPTION({ return _fill_dest_block(block, eof); }); } // arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==> diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp index 7d881e5ef2..ab8ee1d696 100644 --- a/be/src/vec/exprs/vexpr_context.cpp +++ b/be/src/vec/exprs/vexpr_context.cpp @@ -38,12 +38,10 @@ VExprContext::~VExprContext() { doris::Status VExprContext::execute(doris::vectorized::Block* block, int* result_column_id) { Status st; - try { + RETURN_IF_CATCH_EXCEPTION({ st = _root->execute(this, block, result_column_id); _last_result_column_id = *result_column_id; - } catch (const doris::Exception& e) { - st = Status::Error(e.code(), e.to_string()); - } + }); return st; } diff --git a/be/src/vec/functions/array/function_array_apply.cpp b/be/src/vec/functions/array/function_array_apply.cpp index bb81f04fda..a47a28f281 100644 --- a/be/src/vec/functions/array/function_array_apply.cpp +++ b/be/src/vec/functions/array/function_array_apply.cpp @@ -68,8 +68,9 @@ public: const ColumnConst& rhs_value_column = static_cast(*block.get_by_position(arguments[2]).column.get()); ColumnPtr result_ptr; - RETURN_IF_ERROR(_execute(*src_nested_column, nested_type, src_offsets, condition, - rhs_value_column, &result_ptr)); + RETURN_IF_CATCH_EXCEPTION( + RETURN_IF_ERROR(_execute(*src_nested_column, nested_type, src_offsets, condition, + rhs_value_column, &result_ptr))); block.replace_by_position(result, std::move(result_ptr)); return Status::OK(); } @@ -107,6 +108,7 @@ private: __builtin_unreachable(); } + // need exception safety template ColumnPtr _apply_internal(const IColumn& src_column, const ColumnArray::Offsets64& src_offsets, const ColumnConst& cmp) { @@ -144,6 +146,7 @@ private: return ColumnArray::create(filtered, std::move(column_offsets)); } +// need exception safety #define APPLY_ALL_TYPES(src_column, src_offsets, OP, cmp, dst) \ do { \ WhichDataType which(remove_nullable(nested_type)); \ @@ -186,6 +189,7 @@ private: } \ } while (0) + // need exception safety Status _execute(const IColumn& nested_src, DataTypePtr nested_type, const ColumnArray::Offsets64& offsets, const std::string& condition, const ColumnConst& rhs_value_column, ColumnPtr* dst) { diff --git a/be/src/vec/functions/array/function_array_with_constant.cpp b/be/src/vec/functions/array/function_array_with_constant.cpp index 5ee91b97a3..bb8e789525 100644 --- a/be/src/vec/functions/array/function_array_with_constant.cpp +++ b/be/src/vec/functions/array/function_array_with_constant.cpp @@ -78,7 +78,8 @@ public: } auto clone = value->clone_empty(); clone->reserve(input_rows_count); - value->replicate(array_sizes.data(), offset, *clone->assume_mutable().get()); + RETURN_IF_CATCH_EXCEPTION( + value->replicate(array_sizes.data(), offset, *clone->assume_mutable().get())); if (!clone->is_nullable()) { clone = ColumnNullable::create(std::move(clone), ColumnUInt8::create(clone->size(), 0)); } diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index ad3796477d..d1bcf7fcca 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -1201,7 +1201,8 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block, for (size_t i = 0; i < filter_col.size(); ++i) { filter_data[i] = !_filter_bitmap.Get(i); } - vectorized::Block::filter_block_internal(&block, filter_col, block.columns()); + RETURN_IF_CATCH_EXCEPTION( + vectorized::Block::filter_block_internal(&block, filter_col, block.columns())); } } // Add block to node channel