From c4cc681d14fc97a29bd64bbd8fcf9b1bedf6c009 Mon Sep 17 00:00:00 2001 From: Zhengguo Yang Date: Thu, 15 Apr 2021 10:52:29 +0800 Subject: [PATCH] remove boost_foreach, using c++ foreach instead (#5611) --- be/src/exec/olap_scan_node.cpp | 173 ++++++++++--------- be/src/exec/schema_scan_node.cpp | 1 - be/src/exec/schema_scanner/schema_helper.cpp | 1 - be/src/http/http_request.cpp | 1 - be/src/plugin/plugin_mgr.cpp | 6 +- be/src/runtime/buffered_block_mgr2.cc | 12 +- be/src/runtime/buffered_tuple_stream2.cc | 2 +- be/src/runtime/client_cache.cpp | 5 +- be/src/runtime/data_stream_recvr.cc | 10 +- be/src/runtime/disk_io_mgr.cc | 5 +- be/src/runtime/disk_io_mgr.h | 1 - be/src/runtime/disk_io_mgr_reader_context.cc | 2 +- be/src/runtime/plan_fragment_executor.cpp | 11 +- be/src/runtime/sorted_run_merger.cc | 38 ++-- be/src/runtime/spill_sorter.cc | 23 +-- be/src/runtime/tmp_file_mgr.cc | 6 +- be/src/runtime/types.cpp | 5 +- be/src/util/error_util.cc | 7 +- be/src/util/network_util.cpp | 3 +- be/src/util/runtime_profile.cpp | 13 +- be/src/util/thrift_rpc_helper.cpp | 1 - be/test/exec/mysql_scan_node_test.cpp | 1 - be/test/exec/schema_scan_node_test.cpp | 1 - be/test/runtime/buffered_block_mgr2_test.cpp | 14 +- 24 files changed, 174 insertions(+), 168 deletions(-) diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 29e37a4a38..c10b7f000d 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -18,7 +18,6 @@ #include "exec/olap_scan_node.h" #include -#include #include #include #include @@ -408,11 +407,12 @@ bool OlapScanNode::is_key_column(const std::string& key_name) { return true; } - auto res = std::find(_olap_scan_node.key_column_name.begin(), _olap_scan_node.key_column_name.end(), key_name); + auto res = std::find(_olap_scan_node.key_column_name.begin(), + _olap_scan_node.key_column_name.end(), key_name); return res != _olap_scan_node.key_column_name.end(); } -void OlapScanNode::remove_pushed_conjuncts(RuntimeState *state) { +void OlapScanNode::remove_pushed_conjuncts(RuntimeState* state) { if (_pushed_conjuncts_index.empty()) { return; } @@ -420,7 +420,8 @@ void OlapScanNode::remove_pushed_conjuncts(RuntimeState *state) { // dispose direct conjunct first std::vector new_conjunct_ctxs; for (int i = 0; i < _direct_conjunct_size; ++i) { - if (std::find(_pushed_conjuncts_index.cbegin(), _pushed_conjuncts_index.cend(), i) == _pushed_conjuncts_index.cend()){ + if (std::find(_pushed_conjuncts_index.cbegin(), _pushed_conjuncts_index.cend(), i) == + _pushed_conjuncts_index.cend()) { new_conjunct_ctxs.emplace_back(_conjunct_ctxs[i]); } else { _conjunct_ctxs[i]->close(state); @@ -430,7 +431,8 @@ void OlapScanNode::remove_pushed_conjuncts(RuntimeState *state) { // dispose hash push down conjunct second for (int i = _direct_conjunct_size; i < _conjunct_ctxs.size(); ++i) { - if (std::find(_pushed_conjuncts_index.cbegin(), _pushed_conjuncts_index.cend(), i) == _pushed_conjuncts_index.cend()){ + if (std::find(_pushed_conjuncts_index.cbegin(), _pushed_conjuncts_index.cend(), i) == + _pushed_conjuncts_index.cend()) { new_conjunct_ctxs.emplace_back(_conjunct_ctxs[i]); } else { _conjunct_ctxs[i]->close(state); @@ -460,29 +462,29 @@ Status OlapScanNode::normalize_conjuncts() { for (int slot_idx = 0; slot_idx < slots.size(); ++slot_idx) { switch (slots[slot_idx]->type().type) { case TYPE_TINYINT: { - ColumnValueRange range( - slots[slot_idx]->col_name(), slots[slot_idx]->type().type); + ColumnValueRange range(slots[slot_idx]->col_name(), + slots[slot_idx]->type().type); normalize_predicate(range, slots[slot_idx]); break; } case TYPE_SMALLINT: { - ColumnValueRange range( - slots[slot_idx]->col_name(), slots[slot_idx]->type().type); + ColumnValueRange range(slots[slot_idx]->col_name(), + slots[slot_idx]->type().type); normalize_predicate(range, slots[slot_idx]); break; } case TYPE_INT: { - ColumnValueRange range( - slots[slot_idx]->col_name(), slots[slot_idx]->type().type); + ColumnValueRange range(slots[slot_idx]->col_name(), + slots[slot_idx]->type().type); normalize_predicate(range, slots[slot_idx]); break; } case TYPE_BIGINT: { - ColumnValueRange range( - slots[slot_idx]->col_name(), slots[slot_idx]->type().type); + ColumnValueRange range(slots[slot_idx]->col_name(), + slots[slot_idx]->type().type); normalize_predicate(range, slots[slot_idx]); break; } @@ -497,8 +499,8 @@ Status OlapScanNode::normalize_conjuncts() { case TYPE_CHAR: case TYPE_VARCHAR: case TYPE_HLL: { - ColumnValueRange range( - slots[slot_idx]->col_name(), slots[slot_idx]->type().type); + ColumnValueRange range(slots[slot_idx]->col_name(), + slots[slot_idx]->type().type); normalize_predicate(range, slots[slot_idx]); break; } @@ -532,7 +534,8 @@ Status OlapScanNode::normalize_conjuncts() { } default: { - VLOG_CRITICAL << "Unsupported Normalize Slot [ColName=" << slots[slot_idx]->col_name() << "]"; + VLOG_CRITICAL << "Unsupported Normalize Slot [ColName=" << slots[slot_idx]->col_name() + << "]"; break; } } @@ -713,8 +716,7 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) { // add scanner to pool before doing prepare. // so that scanner can be automatically deconstructed if prepare failed. _scanner_pool->add(scanner); - RETURN_IF_ERROR( - scanner->prepare(*scan_range, scanner_ranges, _olap_filter)); + RETURN_IF_ERROR(scanner->prepare(*scan_range, scanner_ranges, _olap_filter)); _olap_scanners.push_back(scanner); disk_set.insert(scanner->scan_disk()); @@ -763,8 +765,8 @@ static bool ignore_cast(SlotDescriptor* slot, Expr* expr) { return false; } - -bool OlapScanNode::should_push_down_in_predicate(doris::SlotDescriptor *slot, doris::InPredicate* pred) { +bool OlapScanNode::should_push_down_in_predicate(doris::SlotDescriptor* slot, + doris::InPredicate* pred) { if (Expr::type_without_cast(pred->get_child(0)) != TExprNodeType::SLOT_REF) { // not a slot ref(column) return false; @@ -798,20 +800,20 @@ bool OlapScanNode::should_push_down_in_predicate(doris::SlotDescriptor *slot, do // different thresholds to improve performance. if (pred->hybrid_set()->size() > _max_pushdown_conditions_per_column) { VLOG_NOTICE << "Predicate value num " << pred->hybrid_set()->size() << " exceed limit " - << _max_pushdown_conditions_per_column; + << _max_pushdown_conditions_per_column; return false; } return true; } -std::pair OlapScanNode::should_push_down_eq_predicate(doris::SlotDescriptor *slot, doris::Expr *pred, - int conj_idx, int child_idx) { +std::pair OlapScanNode::should_push_down_eq_predicate(doris::SlotDescriptor* slot, + doris::Expr* pred, int conj_idx, + int child_idx) { auto result_pair = std::make_pair(false, nullptr); // Do not get slot_ref of column, should not push_down to Storage Engine - if (Expr::type_without_cast(pred->get_child(child_idx)) != - TExprNodeType::SLOT_REF) { + if (Expr::type_without_cast(pred->get_child(child_idx)) != TExprNodeType::SLOT_REF) { return result_pair; } @@ -846,43 +848,41 @@ std::pair OlapScanNode::should_push_down_eq_predicate(doris::SlotDe } template -Status OlapScanNode::change_fixed_value_range(ColumnValueRange& temp_range, PrimitiveType type, void *value, - const ChangeFixedValueRangeFunc& func) { +Status OlapScanNode::change_fixed_value_range(ColumnValueRange& temp_range, PrimitiveType type, + void* value, const ChangeFixedValueRangeFunc& func) { switch (type) { - case TYPE_DATE: { - DateTimeValue date_value = - *reinterpret_cast(value); - // There is must return empty data in olap_scan_node, - // Because data value loss accuracy - if (!date_value.check_loss_accuracy_cast_to_date()) { - func(temp_range, reinterpret_cast(&date_value)); - } - break; - } - case TYPE_DECIMAL: - case TYPE_DECIMALV2: - case TYPE_CHAR: - case TYPE_VARCHAR: - case TYPE_HLL: - case TYPE_DATETIME: - case TYPE_TINYINT: - case TYPE_SMALLINT: - case TYPE_INT: - case TYPE_BIGINT: - case TYPE_LARGEINT: { - func(temp_range, reinterpret_cast(value)); - break; - } - case TYPE_BOOLEAN: { - bool v = *reinterpret_cast(value); - func(temp_range, reinterpret_cast(&v)); - break; - } - default: { - LOG(WARNING) << "Normalize filter fail, Unsupported Primitive type. [type=" - << type << "]"; - return Status::InternalError("Normalize filter fail, Unsupported Primitive type"); + case TYPE_DATE: { + DateTimeValue date_value = *reinterpret_cast(value); + // There is must return empty data in olap_scan_node, + // Because data value loss accuracy + if (!date_value.check_loss_accuracy_cast_to_date()) { + func(temp_range, reinterpret_cast(&date_value)); } + break; + } + case TYPE_DECIMAL: + case TYPE_DECIMALV2: + case TYPE_CHAR: + case TYPE_VARCHAR: + case TYPE_HLL: + case TYPE_DATETIME: + case TYPE_TINYINT: + case TYPE_SMALLINT: + case TYPE_INT: + case TYPE_BIGINT: + case TYPE_LARGEINT: { + func(temp_range, reinterpret_cast(value)); + break; + } + case TYPE_BOOLEAN: { + bool v = *reinterpret_cast(value); + func(temp_range, reinterpret_cast(&v)); + break; + } + default: { + LOG(WARNING) << "Normalize filter fail, Unsupported Primitive type. [type=" << type << "]"; + return Status::InternalError("Normalize filter fail, Unsupported Primitive type"); + } } return Status::OK(); } @@ -901,7 +901,7 @@ Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot, // 1. Normalize in conjuncts like 'where col in (v1, v2, v3)' if (TExprOpcode::FILTER_IN == _conjunct_ctxs[conj_idx]->root()->op()) { - InPredicate* pred = dynamic_cast(_conjunct_ctxs[conj_idx]->root()); + InPredicate* pred = dynamic_cast(_conjunct_ctxs[conj_idx]->root()); if (!should_push_down_in_predicate(slot, pred)) { continue; } @@ -915,8 +915,9 @@ Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot, continue; } auto value = const_cast(iter->get_value()); - RETURN_IF_ERROR(change_fixed_value_range(temp_range, slot->type().type, value, - ColumnValueRange::add_fixed_value_range)); + RETURN_IF_ERROR( + change_fixed_value_range(temp_range, slot->type().type, value, + ColumnValueRange::add_fixed_value_range)); iter->next(); } @@ -943,8 +944,9 @@ Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot, auto value = result_pair.second; // where A = NULL should return empty result set if (value != nullptr) { - RETURN_IF_ERROR(change_fixed_value_range(temp_range, slot->type().type, value, - ColumnValueRange::add_fixed_value_range)); + RETURN_IF_ERROR( + change_fixed_value_range(temp_range, slot->type().type, value, + ColumnValueRange::add_fixed_value_range)); } if (is_key_column(slot->col_name())) { @@ -952,7 +954,7 @@ Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot, } range->intersection(temp_range); } // end for each binary predicate child - } // end of handling eq binary predicate + } // end of handling eq binary predicate } // exceed limit, no conditions will be pushed down to storage engine. @@ -960,7 +962,7 @@ Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot, range->set_whole_value_range(); } else { std::copy(filter_conjuncts_index.cbegin(), filter_conjuncts_index.cend(), - std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin())); + std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin())); } return Status::OK(); } @@ -971,11 +973,12 @@ Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot, // But if the number of conditions exceeds the limit, none of conditions will be pushed down. template Status OlapScanNode::normalize_not_in_and_not_eq_predicate(SlotDescriptor* slot, - ColumnValueRange* range) { + ColumnValueRange* range) { // If the conjunct of slot is fixed value, will change the fixed value set of column value range // else add value to not in range and push down predicate directly bool is_fixed_range = range->is_fixed_value_range(); - auto not_in_range = ColumnValueRange::create_empty_column_value_range(range->column_name(), range->type()); + auto not_in_range = ColumnValueRange::create_empty_column_value_range(range->column_name(), + range->type()); std::vector filter_conjuncts_index; for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) { @@ -995,11 +998,13 @@ Status OlapScanNode::normalize_not_in_and_not_eq_predicate(SlotDescriptor* slot, } auto value = const_cast(iter->get_value()); if (is_fixed_range) { - RETURN_IF_ERROR(change_fixed_value_range(*range, slot->type().type, value, + RETURN_IF_ERROR(change_fixed_value_range( + *range, slot->type().type, value, ColumnValueRange::remove_fixed_value_range)); } else { - RETURN_IF_ERROR(change_fixed_value_range(not_in_range, slot->type().type, value, - ColumnValueRange::add_fixed_value_range)); + RETURN_IF_ERROR( + change_fixed_value_range(not_in_range, slot->type().type, value, + ColumnValueRange::add_fixed_value_range)); } iter->next(); } @@ -1028,22 +1033,25 @@ Status OlapScanNode::normalize_not_in_and_not_eq_predicate(SlotDescriptor* slot, auto value = result_pair.second; if (is_fixed_range) { - RETURN_IF_ERROR(change_fixed_value_range(*range, slot->type().type, value, - ColumnValueRange::remove_fixed_value_range)); + RETURN_IF_ERROR(change_fixed_value_range( + *range, slot->type().type, value, + ColumnValueRange::remove_fixed_value_range)); } else { - RETURN_IF_ERROR(change_fixed_value_range(not_in_range, slot->type().type, value, - ColumnValueRange::add_fixed_value_range)); + RETURN_IF_ERROR( + change_fixed_value_range(not_in_range, slot->type().type, value, + ColumnValueRange::add_fixed_value_range)); } if (is_key_column(slot->col_name())) { filter_conjuncts_index.emplace_back(conj_idx); } } // end for each binary predicate child - } // end of handling eq binary predicate + } // end of handling eq binary predicate } // exceed limit, no conditions will be pushed down to storage engine. - if (is_fixed_range || not_in_range.get_fixed_value_size() <= _max_pushdown_conditions_per_column) { + if (is_fixed_range || + not_in_range.get_fixed_value_size() <= _max_pushdown_conditions_per_column) { if (!is_fixed_range) { // push down not in condition to storage engine not_in_range.to_in_condition(_olap_filter, false); @@ -1056,7 +1064,8 @@ Status OlapScanNode::normalize_not_in_and_not_eq_predicate(SlotDescriptor* slot, template bool OlapScanNode::normalize_is_null_predicate(Expr* expr, SlotDescriptor* slot, - const std::string& is_null_str, ColumnValueRange* range) { + const std::string& is_null_str, + ColumnValueRange* range) { if (expr->node_type() != TExprNodeType::SLOT_REF) { return false; } @@ -1190,14 +1199,14 @@ Status OlapScanNode::normalize_noneq_binary_predicate(SlotDescriptor* slot, } VLOG_CRITICAL << slot->col_name() << " op: " - << static_cast(to_olap_filter_type(pred->op(), child_idx)) - << " value: " << *reinterpret_cast(value); + << static_cast(to_olap_filter_type(pred->op(), child_idx)) + << " value: " << *reinterpret_cast(value); } } } std::copy(filter_conjuncts_index.cbegin(), filter_conjuncts_index.cend(), - std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin())); + std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin())); return Status::OK(); } diff --git a/be/src/exec/schema_scan_node.cpp b/be/src/exec/schema_scan_node.cpp index e28f50c6cb..b9c26ad539 100644 --- a/be/src/exec/schema_scan_node.cpp +++ b/be/src/exec/schema_scan_node.cpp @@ -18,7 +18,6 @@ #include "schema_scan_node.h" #include -#include #include "exec/schema_scanner/schema_helper.h" #include "exec/text_converter.hpp" diff --git a/be/src/exec/schema_scanner/schema_helper.cpp b/be/src/exec/schema_scanner/schema_helper.cpp index 335b250aa6..d1174bd459 100644 --- a/be/src/exec/schema_scanner/schema_helper.cpp +++ b/be/src/exec/schema_scanner/schema_helper.cpp @@ -17,7 +17,6 @@ #include "exec/schema_scanner/schema_helper.h" -#include #include #include #include diff --git a/be/src/http/http_request.cpp b/be/src/http/http_request.cpp index 8606667040..c3228f7a58 100644 --- a/be/src/http/http_request.cpp +++ b/be/src/http/http_request.cpp @@ -24,7 +24,6 @@ #include #include -#include #include #include #include diff --git a/be/src/plugin/plugin_mgr.cpp b/be/src/plugin/plugin_mgr.cpp index 660faf8999..5cfee90626 100644 --- a/be/src/plugin/plugin_mgr.cpp +++ b/be/src/plugin/plugin_mgr.cpp @@ -17,8 +17,6 @@ #include "plugin/plugin_mgr.h" -#include - #include "common/config.h" #include "gutil/strings/substitute.h" @@ -117,7 +115,7 @@ Status PluginMgr::get_plugin_list(int type, std::vector> std::lock_guard l(_lock); - BOOST_FOREACH (const PluginLoaderMap::value_type& iter, _plugins[type]) { + for (const PluginLoaderMap::value_type& iter : _plugins[type]) { plugin_list->push_back(iter.second->plugin()); } @@ -152,7 +150,7 @@ Status PluginMgr::register_builtin_plugin(const std::string& name, int type, Status PluginMgr::get_all_plugin_info(std::vector* plugin_info_list) { for (int i = 0; i < PLUGIN_TYPE_MAX; ++i) { - BOOST_FOREACH (const PluginLoaderMap::value_type& iter, _plugins[i]) { + for (const PluginLoaderMap::value_type& iter : _plugins[i]) { TPluginInfo info; info.__set_plugin_name(iter.second->name()); info.__set_type(iter.second->type()); diff --git a/be/src/runtime/buffered_block_mgr2.cc b/be/src/runtime/buffered_block_mgr2.cc index a59d6bf1a5..333a863e25 100644 --- a/be/src/runtime/buffered_block_mgr2.cc +++ b/be/src/runtime/buffered_block_mgr2.cc @@ -587,11 +587,13 @@ BufferedBlockMgr2::~BufferedBlockMgr2() { // See IMPALA-1890. DCHECK_EQ(_non_local_outstanding_writes, 0) << endl << debug_internal(); // Delete tmp files. - BOOST_FOREACH (TmpFileMgr::File& file, _tmp_files) { file.remove(); } + for (TmpFileMgr::File& file : _tmp_files) { + file.remove(); + } _tmp_files.clear(); // Free memory resources. - BOOST_FOREACH (BufferDescriptor* buffer, _all_io_buffers) { + for (BufferDescriptor* buffer : _all_io_buffers) { _mem_tracker->Release(buffer->len); delete[] buffer->buffer; } @@ -780,8 +782,8 @@ Status BufferedBlockMgr2::write_unpinned_block(Block* block) { disk_id = ++next_disk_id; } disk_id %= _io_mgr->num_local_disks(); - DiskIoMgr::WriteRange::WriteDoneCallback callback = - bind(mem_fn(&BufferedBlockMgr2::write_complete), this, block, boost::placeholders::_1); + DiskIoMgr::WriteRange::WriteDoneCallback callback = bind( + mem_fn(&BufferedBlockMgr2::write_complete), this, block, boost::placeholders::_1); block->_write_range = _obj_pool.add( new DiskIoMgr::WriteRange(tmp_file->path(), file_offset, disk_id, callback)); block->_tmp_file = tmp_file; @@ -1152,7 +1154,7 @@ bool BufferedBlockMgr2::validate() const { return false; } - BOOST_FOREACH (BufferDescriptor* buffer, _all_io_buffers) { + for (BufferDescriptor* buffer : _all_io_buffers) { bool is_free = _free_io_buffers.contains(buffer); num_free_io_buffers += is_free; diff --git a/be/src/runtime/buffered_tuple_stream2.cc b/be/src/runtime/buffered_tuple_stream2.cc index e0bcd5b141..6400830356 100644 --- a/be/src/runtime/buffered_tuple_stream2.cc +++ b/be/src/runtime/buffered_tuple_stream2.cc @@ -447,7 +447,7 @@ Status BufferedTupleStream2::unpin_stream(bool all) { DCHECK(!_closed); SCOPED_TIMER(_unpin_timer); - BOOST_FOREACH (BufferedBlockMgr2::Block* block, _blocks) { + for (BufferedBlockMgr2::Block* block : _blocks) { if (!block->is_pinned()) { continue; } diff --git a/be/src/runtime/client_cache.cpp b/be/src/runtime/client_cache.cpp index f83b734e8a..c519f15093 100644 --- a/be/src/runtime/client_cache.cpp +++ b/be/src/runtime/client_cache.cpp @@ -22,7 +22,6 @@ #include #include -#include #include #include @@ -172,7 +171,7 @@ void ClientCacheHelper::close_connections(const TNetworkAddress& hostport) { } VLOG_RPC << "Invalidating all " << cache_entry->second.size() << " clients for: " << hostport; - BOOST_FOREACH (void* client_key, cache_entry->second) { + for (void* client_key : cache_entry->second) { ClientMap::iterator client_map_entry = _client_map.find(client_key); DCHECK(client_map_entry != _client_map.end()); ThriftClientImpl* info = client_map_entry->second; @@ -202,7 +201,7 @@ void ClientCacheHelper::test_shutdown() { std::vector hostports; { boost::lock_guard lock(_lock); - BOOST_FOREACH (const ClientCacheMap::value_type& i, _client_cache) { + for (const ClientCacheMap::value_type& i : _client_cache) { hostports.push_back(i.first); } } diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc index 0962b3c34c..213b7dc851 100644 --- a/be/src/runtime/data_stream_recvr.cc +++ b/be/src/runtime/data_stream_recvr.cc @@ -374,7 +374,8 @@ Status DataStreamRecvr::create_merger(const TupleRowComparator& less_than) { return Status::OK(); } -Status DataStreamRecvr::create_parallel_merger(const TupleRowComparator& less_than, uint32_t batch_size, MemTracker* mem_tracker) { +Status DataStreamRecvr::create_parallel_merger(const TupleRowComparator& less_than, + uint32_t batch_size, MemTracker* mem_tracker) { DCHECK(_is_merging); vector child_input_batch_suppliers; @@ -424,7 +425,7 @@ void DataStreamRecvr::transfer_all_resources(RowBatch* transfer_batch) { if (!_child_mergers.empty()) { _merger->transfer_all_resources(transfer_batch); } else { - BOOST_FOREACH (SenderQueue* sender_queue, _sender_queues) { + for (SenderQueue* sender_queue : _sender_queues) { if (sender_queue->current_batch() != NULL) { sender_queue->current_batch()->transfer_resource_ownership(transfer_batch); } @@ -447,9 +448,8 @@ DataStreamRecvr::DataStreamRecvr( _num_buffered_bytes(0), _profile(profile), _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) { - _mem_tracker = MemTracker::CreateTracker(_profile, -1, - "DataStreamRecvr:" + print_id(_fragment_instance_id), - parent_tracker); + _mem_tracker = MemTracker::CreateTracker( + _profile, -1, "DataStreamRecvr:" + print_id(_fragment_instance_id), parent_tracker); // Create one queue per sender if is_merging is true. int num_queues = is_merging ? num_senders : 1; diff --git a/be/src/runtime/disk_io_mgr.cc b/be/src/runtime/disk_io_mgr.cc index af1cde7a62..20b3d0d939 100644 --- a/be/src/runtime/disk_io_mgr.cc +++ b/be/src/runtime/disk_io_mgr.cc @@ -189,7 +189,7 @@ string DiskIoMgr::debug_string() { ss << " " << (void*)_disk_queues[i] << ":"; if (!_disk_queues[i]->request_contexts.empty()) { ss << " Readers: "; - BOOST_FOREACH (RequestContext* req_context, _disk_queues[i]->request_contexts) { + for (RequestContext* req_context : _disk_queues[i]->request_contexts) { ss << (void*)req_context; } } @@ -404,7 +404,8 @@ Status DiskIoMgr::init(const std::shared_ptr& process_mem_tracker) { return Status::OK(); } -Status DiskIoMgr::register_context(RequestContext** request_context, std::shared_ptr mem_tracker) { +Status DiskIoMgr::register_context(RequestContext** request_context, + std::shared_ptr mem_tracker) { DCHECK(_request_context_cache) << "Must call init() first."; *request_context = _request_context_cache->get_new_context(); (*request_context)->reset(std::move(mem_tracker)); diff --git a/be/src/runtime/disk_io_mgr.h b/be/src/runtime/disk_io_mgr.h index 86e6630b72..4d6a03b85a 100644 --- a/be/src/runtime/disk_io_mgr.h +++ b/be/src/runtime/disk_io_mgr.h @@ -18,7 +18,6 @@ #ifndef DORIS_BE_SRC_QUERY_RUNTIME_DISK_IO_MGR_H #define DORIS_BE_SRC_QUERY_RUNTIME_DISK_IO_MGR_H -#include #include #include #include diff --git a/be/src/runtime/disk_io_mgr_reader_context.cc b/be/src/runtime/disk_io_mgr_reader_context.cc index a764dd87b8..b07466cc05 100644 --- a/be/src/runtime/disk_io_mgr_reader_context.cc +++ b/be/src/runtime/disk_io_mgr_reader_context.cc @@ -92,7 +92,7 @@ void DiskIoMgr::RequestContext::cancel(const Status& status) { } } - BOOST_FOREACH (const WriteRange::WriteDoneCallback& write_callback, write_callbacks) { + for (const WriteRange::WriteDoneCallback& write_callback : write_callbacks) { write_callback(_status); } diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index fc1e54c773..6eceb75d5f 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -20,7 +20,6 @@ #include #include -#include #include #include "common/logging.h" @@ -131,10 +130,10 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, bytes_limit = _exec_env->process_mem_tracker()->limit(); } // NOTE: this MemTracker only for olap - _mem_tracker = MemTracker::CreateTracker( - bytes_limit, - "PlanFragmentExecutor:" + print_id(_query_id) + ":" + print_id(params.fragment_instance_id), - _exec_env->process_mem_tracker()); + _mem_tracker = MemTracker::CreateTracker(bytes_limit, + "PlanFragmentExecutor:" + print_id(_query_id) + ":" + + print_id(params.fragment_instance_id), + _exec_env->process_mem_tracker()); _runtime_state->set_fragment_mem_tracker(_mem_tracker); LOG(INFO) << "Using query memory limit: " << PrettyPrinter::print(bytes_limit, TUnit::BYTES); @@ -167,7 +166,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, // set #senders of exchange nodes before calling Prepare() std::vector exch_nodes; _plan->collect_nodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes); - BOOST_FOREACH (ExecNode* exch_node, exch_nodes) { + for (ExecNode* exch_node : exch_nodes) { DCHECK_EQ(exch_node->type(), TPlanNodeType::EXCHANGE_NODE); int num_senders = find_with_default(params.per_exch_num_senders, exch_node->id(), 0); DCHECK_GT(num_senders, 0); diff --git a/be/src/runtime/sorted_run_merger.cc b/be/src/runtime/sorted_run_merger.cc index adcec2d2a6..8b6f2839ec 100644 --- a/be/src/runtime/sorted_run_merger.cc +++ b/be/src/runtime/sorted_run_merger.cc @@ -123,7 +123,8 @@ public: // Retrieves the first batch of sorted rows from the run. Status init(bool* done) override { *done = false; - _pull_task_thread = std::thread(&SortedRunMerger::ParallelBatchedRowSupplier::process_sorted_run_task, this); + _pull_task_thread = std::thread( + &SortedRunMerger::ParallelBatchedRowSupplier::process_sorted_run_task, this); RETURN_IF_ERROR(next(NULL, done)); return Status::OK(); @@ -144,7 +145,7 @@ public: delete _input_row_batch; std::unique_lock lock(_mutex); - _batch_prepared_cv.wait(lock, [this](){ return _backup_ready.load(); }); + _batch_prepared_cv.wait(lock, [this]() { return _backup_ready.load(); }); // switch input_row_batch_backup to _input_row_batch _input_row_batch = _input_row_batch_backup; @@ -187,9 +188,7 @@ private: // do merge from sender queue data _status_backup = _sorted_run(&_input_row_batch_backup); _backup_ready = true; - DeferOp defer_op([this]() { - _batch_prepared_cv.notify_one(); - }); + DeferOp defer_op([this]() { _batch_prepared_cv.notify_one(); }); if (!_status_backup.ok() || _input_row_batch_backup == nullptr || _cancel) { if (!_status_backup.ok()) _input_row_batch_backup = nullptr; @@ -237,9 +236,10 @@ SortedRunMerger::SortedRunMerger(const TupleRowComparator& compare_less_than, Status SortedRunMerger::prepare(const vector& input_runs, bool parallel) { DCHECK_EQ(_min_heap.size(), 0); _min_heap.reserve(input_runs.size()); - BOOST_FOREACH (const RunBatchSupplier& input_run, input_runs) { - BatchedRowSupplier* new_elem = _pool.add( - parallel ? new ParallelBatchedRowSupplier(this, input_run) : new BatchedRowSupplier(this, input_run)); + for (const RunBatchSupplier& input_run : input_runs) { + BatchedRowSupplier* new_elem = + _pool.add(parallel ? new ParallelBatchedRowSupplier(this, input_run) + : new BatchedRowSupplier(this, input_run)); DCHECK(new_elem != NULL); bool empty = false; RETURN_IF_ERROR(new_elem->init(&empty)); @@ -256,7 +256,7 @@ Status SortedRunMerger::prepare(const vector& input_runs, bool return Status::OK(); } -void SortedRunMerger::transfer_all_resources(class doris::RowBatch * transfer_resource_batch) { +void SortedRunMerger::transfer_all_resources(class doris::RowBatch* transfer_resource_batch) { for (BatchedRowSupplier* batched_row_supplier : _min_heap) { auto row_batch = batched_row_supplier->get_row_batch(); if (row_batch != nullptr) { @@ -306,17 +306,15 @@ Status SortedRunMerger::get_next(RowBatch* output_batch, bool* eos) { } ChildSortedRunMerger::ChildSortedRunMerger(const TupleRowComparator& compare_less_than, - RowDescriptor* row_desc, - RuntimeProfile* profile, - MemTracker* parent, - uint32_t row_batch_size, - bool deep_copy_input): - SortedRunMerger(compare_less_than, row_desc, profile, deep_copy_input), - _eos(false), - _parent(parent), - _row_batch_size(row_batch_size) { - _get_next_timer = ADD_TIMER(profile, "ChildMergeGetNext"); - _get_next_batch_timer = ADD_TIMER(profile, "ChildMergeGetNextBatch"); + RowDescriptor* row_desc, RuntimeProfile* profile, + MemTracker* parent, uint32_t row_batch_size, + bool deep_copy_input) + : SortedRunMerger(compare_less_than, row_desc, profile, deep_copy_input), + _eos(false), + _parent(parent), + _row_batch_size(row_batch_size) { + _get_next_timer = ADD_TIMER(profile, "ChildMergeGetNext"); + _get_next_batch_timer = ADD_TIMER(profile, "ChildMergeGetNextBatch"); } Status ChildSortedRunMerger::get_batch(RowBatch** output_batch) { diff --git a/be/src/runtime/spill_sorter.cc b/be/src/runtime/spill_sorter.cc index 24985df23a..25c9dd50d8 100644 --- a/be/src/runtime/spill_sorter.cc +++ b/be/src/runtime/spill_sorter.cc @@ -496,7 +496,7 @@ Status SpillSorter::Run::add_batch(RowBatch* batch, int start_index, int* num_pr // Sorting of tuples containing array values is not implemented. The planner // combined with projection should guarantee that none are in each tuple. - // BOOST_FOREACH(const SlotDescriptor* collection_slot, + // for(const SlotDescriptor* collection_slot : // _sort_tuple_desc->collection_slots()) { // DCHECK(new_tuple->is_null(collection_slot->null_indicator_offset())); // } @@ -535,13 +535,13 @@ Status SpillSorter::Run::add_batch(RowBatch* batch, int start_index, int* num_pr void SpillSorter::Run::transfer_resources(RowBatch* row_batch) { DCHECK(row_batch != NULL); - BOOST_FOREACH (BufferedBlockMgr2::Block* block, _fixed_len_blocks) { + for (BufferedBlockMgr2::Block* block : _fixed_len_blocks) { if (block != NULL) { row_batch->add_block(block); } } _fixed_len_blocks.clear(); - BOOST_FOREACH (BufferedBlockMgr2::Block* block, _var_len_blocks) { + for (BufferedBlockMgr2::Block* block : _var_len_blocks) { if (block != NULL) { row_batch->add_block(block); } @@ -554,13 +554,13 @@ void SpillSorter::Run::transfer_resources(RowBatch* row_batch) { } void SpillSorter::Run::delete_all_blocks() { - BOOST_FOREACH (BufferedBlockMgr2::Block* block, _fixed_len_blocks) { + for (BufferedBlockMgr2::Block* block : _fixed_len_blocks) { if (block != NULL) { block->del(); } } _fixed_len_blocks.clear(); - BOOST_FOREACH (BufferedBlockMgr2::Block* block, _var_len_blocks) { + for (BufferedBlockMgr2::Block* block : _var_len_blocks) { if (block != NULL) { block->del(); } @@ -619,7 +619,9 @@ Status SpillSorter::Run::unpin_all_blocks() { } // Clear _var_len_blocks and replace with it with the contents of sorted_var_len_blocks - BOOST_FOREACH (BufferedBlockMgr2::Block* var_block, _var_len_blocks) { var_block->del(); } + for (BufferedBlockMgr2::Block* var_block : _var_len_blocks) { + var_block->del(); + } _var_len_blocks.clear(); sorted_var_len_blocks.swap(_var_len_blocks); // Set _var_len_copy_block to NULL since it's now in _var_len_blocks and is no longer @@ -818,7 +820,7 @@ void SpillSorter::Run::collect_non_null_varslots(Tuple* src, vectorclear(); *total_var_len = 0; - BOOST_FOREACH (const SlotDescriptor* string_slot, _sort_tuple_desc->string_slots()) { + for (const SlotDescriptor* string_slot : _sort_tuple_desc->string_slots()) { if (!src->is_null(string_slot->null_indicator_offset())) { StringValue* string_val = reinterpret_cast(src->get_slot(string_slot->tuple_offset())); @@ -852,7 +854,7 @@ Status SpillSorter::Run::try_add_block(vector* block_ } void SpillSorter::Run::copy_var_len_data(char* dest, const vector& string_values) { - BOOST_FOREACH (StringValue* string_val, string_values) { + for (StringValue* string_val : string_values) { memcpy(dest, string_val->ptr, string_val->len); string_val->ptr = dest; dest += string_val->len; @@ -861,7 +863,7 @@ void SpillSorter::Run::copy_var_len_data(char* dest, const vector& void SpillSorter::Run::copy_var_len_data_convert_offset(char* dest, int64_t offset, const vector& string_values) { - BOOST_FOREACH (StringValue* string_val, string_values) { + for (StringValue* string_val : string_values) { memcpy(dest, string_val->ptr, string_val->len); string_val->ptr = reinterpret_cast(offset); dest += string_val->len; @@ -1316,7 +1318,8 @@ Status SpillSorter::create_merger(int num_runs) { RETURN_IF_ERROR(run->prepare_read()); // Run::get_next_batch() is used by the merger to retrieve a batch of rows to merge // from this run. - merge_runs.push_back(bind(mem_fn(&Run::get_next_batch), run, boost::placeholders::_1)); + merge_runs.push_back( + bind(mem_fn(&Run::get_next_batch), run, boost::placeholders::_1)); _sorted_runs.pop_front(); _merging_runs.push_back(run); } diff --git a/be/src/runtime/tmp_file_mgr.cc b/be/src/runtime/tmp_file_mgr.cc index 090a464664..11283026de 100644 --- a/be/src/runtime/tmp_file_mgr.cc +++ b/be/src/runtime/tmp_file_mgr.cc @@ -18,12 +18,11 @@ #include "runtime/tmp_file_mgr.h" #include -#include -#include #include #include #include #include +#include // #include // #include @@ -84,7 +83,8 @@ Status TmpFileMgr::init_custom(const vector& tmp_dirs, bool one_dir_per_ // For each tmp directory, find the disk it is on, // so additional tmp directories on the same disk can be skipped. for (int i = 0; i < tmp_dirs.size(); ++i) { - std::filesystem::path tmp_path = std::string_view(boost::trim_right_copy_if(tmp_dirs[i], is_any_of("/"))); + std::filesystem::path tmp_path = + std::string_view(boost::trim_right_copy_if(tmp_dirs[i], is_any_of("/"))); tmp_path = std::filesystem::absolute(tmp_path); path scratch_subdir_path(tmp_path / _s_tmp_sub_dir_name); // tmp_path must be a writable directory. diff --git a/be/src/runtime/types.cpp b/be/src/runtime/types.cpp index 1a3132a2d8..1953c22526 100644 --- a/be/src/runtime/types.cpp +++ b/be/src/runtime/types.cpp @@ -17,7 +17,6 @@ #include "runtime/types.h" -#include #include #include @@ -92,7 +91,9 @@ void TypeDescriptor::to_thrift(TTypeDesc* thrift_type) const { node.struct_fields.back().name = field_name; } } - BOOST_FOREACH (const TypeDescriptor& child, children) { child.to_thrift(thrift_type); } + for (const TypeDescriptor& child : children) { + child.to_thrift(thrift_type); + } } else { node.type = TTypeNodeType::SCALAR; node.__set_scalar_type(TScalarType()); diff --git a/be/src/util/error_util.cc b/be/src/util/error_util.cc index aa56728a46..a7c76fd864 100644 --- a/be/src/util/error_util.cc +++ b/be/src/util/error_util.cc @@ -19,7 +19,6 @@ #include -#include #include #include @@ -138,9 +137,9 @@ ErrorMsg ErrorMsg::init(TErrorCode::type error, const ArgType& arg0, } void print_error_map(ostream* stream, const ErrorLogMap& errors) { - BOOST_FOREACH(const ErrorLogMap::value_type& v, errors) { + for (const ErrorLogMap::value_type& v : errors) { if (v.first == TErrorCode::GENERAL) { - BOOST_FOREACH(const string& s, v.second.messages) { + for (const string& s : v.second.messages) { *stream << s << "\n"; } } else { @@ -161,7 +160,7 @@ string print_error_map_to_string(const ErrorLogMap& errors) { } void merge_error_maps(ErrorLogMap* left, const ErrorLogMap& right) { - BOOST_FOREACH(const ErrorLogMap::value_type& v, right) { + for (const ErrorLogMap::value_type& v : right) { // Append generic message, append specific codes or increment count if exists if (v.first == TErrorCode::GENERAL) { (*left)[v.first].messages.insert( diff --git a/be/src/util/network_util.cpp b/be/src/util/network_util.cpp index 7b13f2844d..ba858c1c4c 100644 --- a/be/src/util/network_util.cpp +++ b/be/src/util/network_util.cpp @@ -25,7 +25,6 @@ #include #include -#include #include namespace doris { @@ -102,7 +101,7 @@ Status hostname_to_ip_addrs(const std::string& name, std::vector* a } bool find_first_non_localhost(const std::vector& addresses, std::string* addr) { - BOOST_FOREACH (const std::string& candidate, addresses) { + for (const std::string& candidate : addresses) { if (candidate != LOCALHOST) { *addr = candidate; return true; diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp index 9da6f1a903..a2fd8f8d3d 100644 --- a/be/src/util/runtime_profile.cpp +++ b/be/src/util/runtime_profile.cpp @@ -17,7 +17,6 @@ #include "util/runtime_profile.h" -#include #include #include #include @@ -188,7 +187,7 @@ void RuntimeProfile::update(const std::vector& nodes, int* { boost::lock_guard l(_info_strings_lock); const InfoStrings& info_strings = node.info_strings; - BOOST_FOREACH (const std::string& key, node.info_strings_display_order) { + for (const std::string& key : node.info_strings_display_order) { // Look for existing info strings and update in place. If there // are new strings, add them to the end of the display order. // TODO: Is nodes.info_strings always a superset of @@ -514,7 +513,7 @@ void RuntimeProfile::pretty_print(std::ostream* s, const std::string& prefix) co { boost::lock_guard l(_info_strings_lock); - BOOST_FOREACH (const std::string& key, _info_strings_display_order) { + for (const std::string& key : _info_strings_display_order) { stream << prefix << " - " << key << ": " << _info_strings.find(key)->second << std::endl; } @@ -528,13 +527,13 @@ void RuntimeProfile::pretty_print(std::ostream* s, const std::string& prefix) co // - Event 3: 2s410ms (121.138ms) // The times in parentheses are the time elapsed since the last event. boost::lock_guard l(_event_sequences_lock); - BOOST_FOREACH (const EventSequenceMap::value_type& event_sequence, _event_sequence_map) { + for (const EventSequenceMap::value_type& event_sequence : _event_sequence_map) { stream << prefix << " " << event_sequence.first << ": " << PrettyPrinter::print(event_sequence.second->elapsed_time(), TUnit::TIME_NS) << std::endl; int64_t last = 0L; - BOOST_FOREACH (const EventSequence::Event& event, event_sequence.second->events()) { + for (const EventSequence::Event& event : event_sequence.second->events()) { stream << prefix << " - " << event.first << ": " << PrettyPrinter::print(event.second, TUnit::TIME_NS) << " (" << PrettyPrinter::print(event.second - last, TUnit::TIME_NS) << ")" @@ -783,7 +782,7 @@ void RuntimeProfile::stop_bucketing_counters_updates(std::vector* buck } if (convert && num_sampled > 0) { - BOOST_FOREACH (Counter* counter, *buckets) { + for (Counter* counter : *buckets) { double perc = 100 * counter->value() / (double)num_sampled; counter->set(perc); } @@ -873,7 +872,7 @@ void RuntimeProfile::print_child_counters(const std::string& prefix, if (itr != child_counter_map.end()) { const std::set& child_counters = itr->second; - BOOST_FOREACH (const std::string& child_counter, child_counters) { + for (const std::string& child_counter : child_counters) { CounterMap::const_iterator iter = counter_map.find(child_counter); DCHECK(iter != counter_map.end()); stream << prefix << " - " << iter->first << ": " diff --git a/be/src/util/thrift_rpc_helper.cpp b/be/src/util/thrift_rpc_helper.cpp index e105942509..881f5f96d1 100644 --- a/be/src/util/thrift_rpc_helper.cpp +++ b/be/src/util/thrift_rpc_helper.cpp @@ -17,7 +17,6 @@ #include "util/thrift_rpc_helper.h" -#include #include #include #include diff --git a/be/test/exec/mysql_scan_node_test.cpp b/be/test/exec/mysql_scan_node_test.cpp index cb7914f8bb..186adcb911 100644 --- a/be/test/exec/mysql_scan_node_test.cpp +++ b/be/test/exec/mysql_scan_node_test.cpp @@ -19,7 +19,6 @@ #include -#include #include #include "common/object_pool.h" diff --git a/be/test/exec/schema_scan_node_test.cpp b/be/test/exec/schema_scan_node_test.cpp index fb1b698f28..beeda89377 100644 --- a/be/test/exec/schema_scan_node_test.cpp +++ b/be/test/exec/schema_scan_node_test.cpp @@ -19,7 +19,6 @@ #include -#include #include #include "common/object_pool.h" diff --git a/be/test/runtime/buffered_block_mgr2_test.cpp b/be/test/runtime/buffered_block_mgr2_test.cpp index bc4723bcbb..62ac0c65d1 100644 --- a/be/test/runtime/buffered_block_mgr2_test.cpp +++ b/be/test/runtime/buffered_block_mgr2_test.cpp @@ -22,9 +22,9 @@ #include #include -#include #include #include +#include #include "runtime/disk_io_mgr.h" #include "runtime/exec_env.h" @@ -325,7 +325,9 @@ protected: AllocateBlocks(block_mgr, client, max_num_buffers, &blocks); EXPECT_EQ(block_mgr->bytes_allocated(), max_num_buffers * block_size); - BOOST_FOREACH (BufferedBlockMgr2::Block* block, blocks) { block->unpin(); } + for (BufferedBlockMgr2::Block* block : blocks) { + block->unpin(); + } // Re-pinning all blocks for (int i = 0; i < blocks.size(); ++i) { @@ -339,7 +341,9 @@ protected: EXPECT_EQ(buffered_pin->value(), buffered_pins_expected); // Unpin all blocks - BOOST_FOREACH (BufferedBlockMgr2::Block* block, blocks) { block->unpin(); } + for (BufferedBlockMgr2::Block* block : blocks) { + block->unpin(); + } // Get two new blocks. AllocateBlocks(block_mgr, client, 2, &blocks); // At least two writes must be issued. The first (num_blocks - 2) must be in memory. @@ -705,7 +709,9 @@ TEST_F(BufferedBlockMgrTest, Deletion) { AllocateBlocks(block_mgr, client, max_num_buffers, &blocks); EXPECT_TRUE(created_cnt->value() == max_num_buffers); - BOOST_FOREACH (BufferedBlockMgr2::Block* block, blocks) { block->del(); } + for (BufferedBlockMgr2::Block* block : blocks) { + block->del(); + } AllocateBlocks(block_mgr, client, max_num_buffers, &blocks); EXPECT_TRUE(created_cnt->value() == max_num_buffers); EXPECT_TRUE(recycled_cnt->value() == max_num_buffers);