remove boost_foreach, using c++ foreach instead (#5611)

This commit is contained in:
Zhengguo Yang
2021-04-15 10:52:29 +08:00
committed by GitHub
parent 50ffae44b1
commit c4cc681d14
24 changed files with 174 additions and 168 deletions

View File

@ -18,7 +18,6 @@
#include "exec/olap_scan_node.h"
#include <algorithm>
#include <boost/foreach.hpp>
#include <boost/variant.hpp>
#include <iostream>
#include <sstream>
@ -408,11 +407,12 @@ bool OlapScanNode::is_key_column(const std::string& key_name) {
return true;
}
auto res = std::find(_olap_scan_node.key_column_name.begin(), _olap_scan_node.key_column_name.end(), key_name);
auto res = std::find(_olap_scan_node.key_column_name.begin(),
_olap_scan_node.key_column_name.end(), key_name);
return res != _olap_scan_node.key_column_name.end();
}
void OlapScanNode::remove_pushed_conjuncts(RuntimeState *state) {
void OlapScanNode::remove_pushed_conjuncts(RuntimeState* state) {
if (_pushed_conjuncts_index.empty()) {
return;
}
@ -420,7 +420,8 @@ void OlapScanNode::remove_pushed_conjuncts(RuntimeState *state) {
// dispose direct conjunct first
std::vector<ExprContext*> new_conjunct_ctxs;
for (int i = 0; i < _direct_conjunct_size; ++i) {
if (std::find(_pushed_conjuncts_index.cbegin(), _pushed_conjuncts_index.cend(), i) == _pushed_conjuncts_index.cend()){
if (std::find(_pushed_conjuncts_index.cbegin(), _pushed_conjuncts_index.cend(), i) ==
_pushed_conjuncts_index.cend()) {
new_conjunct_ctxs.emplace_back(_conjunct_ctxs[i]);
} else {
_conjunct_ctxs[i]->close(state);
@ -430,7 +431,8 @@ void OlapScanNode::remove_pushed_conjuncts(RuntimeState *state) {
// dispose hash push down conjunct second
for (int i = _direct_conjunct_size; i < _conjunct_ctxs.size(); ++i) {
if (std::find(_pushed_conjuncts_index.cbegin(), _pushed_conjuncts_index.cend(), i) == _pushed_conjuncts_index.cend()){
if (std::find(_pushed_conjuncts_index.cbegin(), _pushed_conjuncts_index.cend(), i) ==
_pushed_conjuncts_index.cend()) {
new_conjunct_ctxs.emplace_back(_conjunct_ctxs[i]);
} else {
_conjunct_ctxs[i]->close(state);
@ -460,29 +462,29 @@ Status OlapScanNode::normalize_conjuncts() {
for (int slot_idx = 0; slot_idx < slots.size(); ++slot_idx) {
switch (slots[slot_idx]->type().type) {
case TYPE_TINYINT: {
ColumnValueRange<int8_t> range(
slots[slot_idx]->col_name(), slots[slot_idx]->type().type);
ColumnValueRange<int8_t> range(slots[slot_idx]->col_name(),
slots[slot_idx]->type().type);
normalize_predicate(range, slots[slot_idx]);
break;
}
case TYPE_SMALLINT: {
ColumnValueRange<int16_t> range(
slots[slot_idx]->col_name(), slots[slot_idx]->type().type);
ColumnValueRange<int16_t> range(slots[slot_idx]->col_name(),
slots[slot_idx]->type().type);
normalize_predicate(range, slots[slot_idx]);
break;
}
case TYPE_INT: {
ColumnValueRange<int32_t> range(
slots[slot_idx]->col_name(), slots[slot_idx]->type().type);
ColumnValueRange<int32_t> range(slots[slot_idx]->col_name(),
slots[slot_idx]->type().type);
normalize_predicate(range, slots[slot_idx]);
break;
}
case TYPE_BIGINT: {
ColumnValueRange<int64_t> range(
slots[slot_idx]->col_name(), slots[slot_idx]->type().type);
ColumnValueRange<int64_t> range(slots[slot_idx]->col_name(),
slots[slot_idx]->type().type);
normalize_predicate(range, slots[slot_idx]);
break;
}
@ -497,8 +499,8 @@ Status OlapScanNode::normalize_conjuncts() {
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_HLL: {
ColumnValueRange<StringValue> range(
slots[slot_idx]->col_name(), slots[slot_idx]->type().type);
ColumnValueRange<StringValue> range(slots[slot_idx]->col_name(),
slots[slot_idx]->type().type);
normalize_predicate(range, slots[slot_idx]);
break;
}
@ -532,7 +534,8 @@ Status OlapScanNode::normalize_conjuncts() {
}
default: {
VLOG_CRITICAL << "Unsupported Normalize Slot [ColName=" << slots[slot_idx]->col_name() << "]";
VLOG_CRITICAL << "Unsupported Normalize Slot [ColName=" << slots[slot_idx]->col_name()
<< "]";
break;
}
}
@ -713,8 +716,7 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) {
// add scanner to pool before doing prepare.
// so that scanner can be automatically deconstructed if prepare failed.
_scanner_pool->add(scanner);
RETURN_IF_ERROR(
scanner->prepare(*scan_range, scanner_ranges, _olap_filter));
RETURN_IF_ERROR(scanner->prepare(*scan_range, scanner_ranges, _olap_filter));
_olap_scanners.push_back(scanner);
disk_set.insert(scanner->scan_disk());
@ -763,8 +765,8 @@ static bool ignore_cast(SlotDescriptor* slot, Expr* expr) {
return false;
}
bool OlapScanNode::should_push_down_in_predicate(doris::SlotDescriptor *slot, doris::InPredicate* pred) {
bool OlapScanNode::should_push_down_in_predicate(doris::SlotDescriptor* slot,
doris::InPredicate* pred) {
if (Expr::type_without_cast(pred->get_child(0)) != TExprNodeType::SLOT_REF) {
// not a slot ref(column)
return false;
@ -798,20 +800,20 @@ bool OlapScanNode::should_push_down_in_predicate(doris::SlotDescriptor *slot, do
// different thresholds to improve performance.
if (pred->hybrid_set()->size() > _max_pushdown_conditions_per_column) {
VLOG_NOTICE << "Predicate value num " << pred->hybrid_set()->size() << " exceed limit "
<< _max_pushdown_conditions_per_column;
<< _max_pushdown_conditions_per_column;
return false;
}
return true;
}
std::pair<bool, void*> OlapScanNode::should_push_down_eq_predicate(doris::SlotDescriptor *slot, doris::Expr *pred,
int conj_idx, int child_idx) {
std::pair<bool, void*> OlapScanNode::should_push_down_eq_predicate(doris::SlotDescriptor* slot,
doris::Expr* pred, int conj_idx,
int child_idx) {
auto result_pair = std::make_pair<bool, void*>(false, nullptr);
// Do not get slot_ref of column, should not push_down to Storage Engine
if (Expr::type_without_cast(pred->get_child(child_idx)) !=
TExprNodeType::SLOT_REF) {
if (Expr::type_without_cast(pred->get_child(child_idx)) != TExprNodeType::SLOT_REF) {
return result_pair;
}
@ -846,43 +848,41 @@ std::pair<bool, void*> OlapScanNode::should_push_down_eq_predicate(doris::SlotDe
}
template <typename T, typename ChangeFixedValueRangeFunc>
Status OlapScanNode::change_fixed_value_range(ColumnValueRange<T>& temp_range, PrimitiveType type, void *value,
const ChangeFixedValueRangeFunc& func) {
Status OlapScanNode::change_fixed_value_range(ColumnValueRange<T>& temp_range, PrimitiveType type,
void* value, const ChangeFixedValueRangeFunc& func) {
switch (type) {
case TYPE_DATE: {
DateTimeValue date_value =
*reinterpret_cast<DateTimeValue*>(value);
// There is must return empty data in olap_scan_node,
// Because data value loss accuracy
if (!date_value.check_loss_accuracy_cast_to_date()) {
func(temp_range, reinterpret_cast<T*>(&date_value));
}
break;
}
case TYPE_DECIMAL:
case TYPE_DECIMALV2:
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_HLL:
case TYPE_DATETIME:
case TYPE_TINYINT:
case TYPE_SMALLINT:
case TYPE_INT:
case TYPE_BIGINT:
case TYPE_LARGEINT: {
func(temp_range, reinterpret_cast<T*>(value));
break;
}
case TYPE_BOOLEAN: {
bool v = *reinterpret_cast<bool*>(value);
func(temp_range, reinterpret_cast<T*>(&v));
break;
}
default: {
LOG(WARNING) << "Normalize filter fail, Unsupported Primitive type. [type="
<< type << "]";
return Status::InternalError("Normalize filter fail, Unsupported Primitive type");
case TYPE_DATE: {
DateTimeValue date_value = *reinterpret_cast<DateTimeValue*>(value);
// There is must return empty data in olap_scan_node,
// Because data value loss accuracy
if (!date_value.check_loss_accuracy_cast_to_date()) {
func(temp_range, reinterpret_cast<T*>(&date_value));
}
break;
}
case TYPE_DECIMAL:
case TYPE_DECIMALV2:
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_HLL:
case TYPE_DATETIME:
case TYPE_TINYINT:
case TYPE_SMALLINT:
case TYPE_INT:
case TYPE_BIGINT:
case TYPE_LARGEINT: {
func(temp_range, reinterpret_cast<T*>(value));
break;
}
case TYPE_BOOLEAN: {
bool v = *reinterpret_cast<bool*>(value);
func(temp_range, reinterpret_cast<T*>(&v));
break;
}
default: {
LOG(WARNING) << "Normalize filter fail, Unsupported Primitive type. [type=" << type << "]";
return Status::InternalError("Normalize filter fail, Unsupported Primitive type");
}
}
return Status::OK();
}
@ -901,7 +901,7 @@ Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot,
// 1. Normalize in conjuncts like 'where col in (v1, v2, v3)'
if (TExprOpcode::FILTER_IN == _conjunct_ctxs[conj_idx]->root()->op()) {
InPredicate* pred = dynamic_cast<InPredicate *>(_conjunct_ctxs[conj_idx]->root());
InPredicate* pred = dynamic_cast<InPredicate*>(_conjunct_ctxs[conj_idx]->root());
if (!should_push_down_in_predicate(slot, pred)) {
continue;
}
@ -915,8 +915,9 @@ Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot,
continue;
}
auto value = const_cast<void*>(iter->get_value());
RETURN_IF_ERROR(change_fixed_value_range(temp_range, slot->type().type, value,
ColumnValueRange<T>::add_fixed_value_range));
RETURN_IF_ERROR(
change_fixed_value_range(temp_range, slot->type().type, value,
ColumnValueRange<T>::add_fixed_value_range));
iter->next();
}
@ -943,8 +944,9 @@ Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot,
auto value = result_pair.second;
// where A = NULL should return empty result set
if (value != nullptr) {
RETURN_IF_ERROR(change_fixed_value_range(temp_range, slot->type().type, value,
ColumnValueRange<T>::add_fixed_value_range));
RETURN_IF_ERROR(
change_fixed_value_range(temp_range, slot->type().type, value,
ColumnValueRange<T>::add_fixed_value_range));
}
if (is_key_column(slot->col_name())) {
@ -952,7 +954,7 @@ Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot,
}
range->intersection(temp_range);
} // end for each binary predicate child
} // end of handling eq binary predicate
} // end of handling eq binary predicate
}
// exceed limit, no conditions will be pushed down to storage engine.
@ -960,7 +962,7 @@ Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot,
range->set_whole_value_range();
} else {
std::copy(filter_conjuncts_index.cbegin(), filter_conjuncts_index.cend(),
std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin()));
std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin()));
}
return Status::OK();
}
@ -971,11 +973,12 @@ Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot,
// But if the number of conditions exceeds the limit, none of conditions will be pushed down.
template <class T>
Status OlapScanNode::normalize_not_in_and_not_eq_predicate(SlotDescriptor* slot,
ColumnValueRange<T>* range) {
ColumnValueRange<T>* range) {
// If the conjunct of slot is fixed value, will change the fixed value set of column value range
// else add value to not in range and push down predicate directly
bool is_fixed_range = range->is_fixed_value_range();
auto not_in_range = ColumnValueRange<T>::create_empty_column_value_range(range->column_name(), range->type());
auto not_in_range = ColumnValueRange<T>::create_empty_column_value_range(range->column_name(),
range->type());
std::vector<uint32_t> filter_conjuncts_index;
for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
@ -995,11 +998,13 @@ Status OlapScanNode::normalize_not_in_and_not_eq_predicate(SlotDescriptor* slot,
}
auto value = const_cast<void*>(iter->get_value());
if (is_fixed_range) {
RETURN_IF_ERROR(change_fixed_value_range(*range, slot->type().type, value,
RETURN_IF_ERROR(change_fixed_value_range(
*range, slot->type().type, value,
ColumnValueRange<T>::remove_fixed_value_range));
} else {
RETURN_IF_ERROR(change_fixed_value_range(not_in_range, slot->type().type, value,
ColumnValueRange<T>::add_fixed_value_range));
RETURN_IF_ERROR(
change_fixed_value_range(not_in_range, slot->type().type, value,
ColumnValueRange<T>::add_fixed_value_range));
}
iter->next();
}
@ -1028,22 +1033,25 @@ Status OlapScanNode::normalize_not_in_and_not_eq_predicate(SlotDescriptor* slot,
auto value = result_pair.second;
if (is_fixed_range) {
RETURN_IF_ERROR(change_fixed_value_range(*range, slot->type().type, value,
ColumnValueRange<T>::remove_fixed_value_range));
RETURN_IF_ERROR(change_fixed_value_range(
*range, slot->type().type, value,
ColumnValueRange<T>::remove_fixed_value_range));
} else {
RETURN_IF_ERROR(change_fixed_value_range(not_in_range, slot->type().type, value,
ColumnValueRange<T>::add_fixed_value_range));
RETURN_IF_ERROR(
change_fixed_value_range(not_in_range, slot->type().type, value,
ColumnValueRange<T>::add_fixed_value_range));
}
if (is_key_column(slot->col_name())) {
filter_conjuncts_index.emplace_back(conj_idx);
}
} // end for each binary predicate child
} // end of handling eq binary predicate
} // end of handling eq binary predicate
}
// exceed limit, no conditions will be pushed down to storage engine.
if (is_fixed_range || not_in_range.get_fixed_value_size() <= _max_pushdown_conditions_per_column) {
if (is_fixed_range ||
not_in_range.get_fixed_value_size() <= _max_pushdown_conditions_per_column) {
if (!is_fixed_range) {
// push down not in condition to storage engine
not_in_range.to_in_condition(_olap_filter, false);
@ -1056,7 +1064,8 @@ Status OlapScanNode::normalize_not_in_and_not_eq_predicate(SlotDescriptor* slot,
template <typename T>
bool OlapScanNode::normalize_is_null_predicate(Expr* expr, SlotDescriptor* slot,
const std::string& is_null_str, ColumnValueRange<T>* range) {
const std::string& is_null_str,
ColumnValueRange<T>* range) {
if (expr->node_type() != TExprNodeType::SLOT_REF) {
return false;
}
@ -1190,14 +1199,14 @@ Status OlapScanNode::normalize_noneq_binary_predicate(SlotDescriptor* slot,
}
VLOG_CRITICAL << slot->col_name() << " op: "
<< static_cast<int>(to_olap_filter_type(pred->op(), child_idx))
<< " value: " << *reinterpret_cast<T*>(value);
<< static_cast<int>(to_olap_filter_type(pred->op(), child_idx))
<< " value: " << *reinterpret_cast<T*>(value);
}
}
}
std::copy(filter_conjuncts_index.cbegin(), filter_conjuncts_index.cend(),
std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin()));
std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin()));
return Status::OK();
}

View File

@ -18,7 +18,6 @@
#include "schema_scan_node.h"
#include <boost/algorithm/string.hpp>
#include <boost/foreach.hpp>
#include "exec/schema_scanner/schema_helper.h"
#include "exec/text_converter.hpp"

View File

@ -17,7 +17,6 @@
#include "exec/schema_scanner/schema_helper.h"
#include <boost/foreach.hpp>
#include <boost/functional/hash.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/thread.hpp>

View File

@ -24,7 +24,6 @@
#include <event2/keyvalq_struct.h>
#include <boost/algorithm/string.hpp>
#include <boost/foreach.hpp>
#include <sstream>
#include <string>
#include <vector>

View File

@ -17,8 +17,6 @@
#include "plugin/plugin_mgr.h"
#include <boost/foreach.hpp>
#include "common/config.h"
#include "gutil/strings/substitute.h"
@ -117,7 +115,7 @@ Status PluginMgr::get_plugin_list(int type, std::vector<std::shared_ptr<Plugin>>
std::lock_guard<std::mutex> l(_lock);
BOOST_FOREACH (const PluginLoaderMap::value_type& iter, _plugins[type]) {
for (const PluginLoaderMap::value_type& iter : _plugins[type]) {
plugin_list->push_back(iter.second->plugin());
}
@ -152,7 +150,7 @@ Status PluginMgr::register_builtin_plugin(const std::string& name, int type,
Status PluginMgr::get_all_plugin_info(std::vector<TPluginInfo>* plugin_info_list) {
for (int i = 0; i < PLUGIN_TYPE_MAX; ++i) {
BOOST_FOREACH (const PluginLoaderMap::value_type& iter, _plugins[i]) {
for (const PluginLoaderMap::value_type& iter : _plugins[i]) {
TPluginInfo info;
info.__set_plugin_name(iter.second->name());
info.__set_type(iter.second->type());

View File

@ -587,11 +587,13 @@ BufferedBlockMgr2::~BufferedBlockMgr2() {
// See IMPALA-1890.
DCHECK_EQ(_non_local_outstanding_writes, 0) << endl << debug_internal();
// Delete tmp files.
BOOST_FOREACH (TmpFileMgr::File& file, _tmp_files) { file.remove(); }
for (TmpFileMgr::File& file : _tmp_files) {
file.remove();
}
_tmp_files.clear();
// Free memory resources.
BOOST_FOREACH (BufferDescriptor* buffer, _all_io_buffers) {
for (BufferDescriptor* buffer : _all_io_buffers) {
_mem_tracker->Release(buffer->len);
delete[] buffer->buffer;
}
@ -780,8 +782,8 @@ Status BufferedBlockMgr2::write_unpinned_block(Block* block) {
disk_id = ++next_disk_id;
}
disk_id %= _io_mgr->num_local_disks();
DiskIoMgr::WriteRange::WriteDoneCallback callback =
bind(mem_fn(&BufferedBlockMgr2::write_complete), this, block, boost::placeholders::_1);
DiskIoMgr::WriteRange::WriteDoneCallback callback = bind(
mem_fn(&BufferedBlockMgr2::write_complete), this, block, boost::placeholders::_1);
block->_write_range = _obj_pool.add(
new DiskIoMgr::WriteRange(tmp_file->path(), file_offset, disk_id, callback));
block->_tmp_file = tmp_file;
@ -1152,7 +1154,7 @@ bool BufferedBlockMgr2::validate() const {
return false;
}
BOOST_FOREACH (BufferDescriptor* buffer, _all_io_buffers) {
for (BufferDescriptor* buffer : _all_io_buffers) {
bool is_free = _free_io_buffers.contains(buffer);
num_free_io_buffers += is_free;

View File

@ -447,7 +447,7 @@ Status BufferedTupleStream2::unpin_stream(bool all) {
DCHECK(!_closed);
SCOPED_TIMER(_unpin_timer);
BOOST_FOREACH (BufferedBlockMgr2::Block* block, _blocks) {
for (BufferedBlockMgr2::Block* block : _blocks) {
if (!block->is_pinned()) {
continue;
}

View File

@ -22,7 +22,6 @@
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransportUtils.h>
#include <boost/foreach.hpp>
#include <memory>
#include <sstream>
@ -172,7 +171,7 @@ void ClientCacheHelper::close_connections(const TNetworkAddress& hostport) {
}
VLOG_RPC << "Invalidating all " << cache_entry->second.size() << " clients for: " << hostport;
BOOST_FOREACH (void* client_key, cache_entry->second) {
for (void* client_key : cache_entry->second) {
ClientMap::iterator client_map_entry = _client_map.find(client_key);
DCHECK(client_map_entry != _client_map.end());
ThriftClientImpl* info = client_map_entry->second;
@ -202,7 +201,7 @@ void ClientCacheHelper::test_shutdown() {
std::vector<TNetworkAddress> hostports;
{
boost::lock_guard<boost::mutex> lock(_lock);
BOOST_FOREACH (const ClientCacheMap::value_type& i, _client_cache) {
for (const ClientCacheMap::value_type& i : _client_cache) {
hostports.push_back(i.first);
}
}

View File

@ -374,7 +374,8 @@ Status DataStreamRecvr::create_merger(const TupleRowComparator& less_than) {
return Status::OK();
}
Status DataStreamRecvr::create_parallel_merger(const TupleRowComparator& less_than, uint32_t batch_size, MemTracker* mem_tracker) {
Status DataStreamRecvr::create_parallel_merger(const TupleRowComparator& less_than,
uint32_t batch_size, MemTracker* mem_tracker) {
DCHECK(_is_merging);
vector<SortedRunMerger::RunBatchSupplier> child_input_batch_suppliers;
@ -424,7 +425,7 @@ void DataStreamRecvr::transfer_all_resources(RowBatch* transfer_batch) {
if (!_child_mergers.empty()) {
_merger->transfer_all_resources(transfer_batch);
} else {
BOOST_FOREACH (SenderQueue* sender_queue, _sender_queues) {
for (SenderQueue* sender_queue : _sender_queues) {
if (sender_queue->current_batch() != NULL) {
sender_queue->current_batch()->transfer_resource_ownership(transfer_batch);
}
@ -447,9 +448,8 @@ DataStreamRecvr::DataStreamRecvr(
_num_buffered_bytes(0),
_profile(profile),
_sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) {
_mem_tracker = MemTracker::CreateTracker(_profile, -1,
"DataStreamRecvr:" + print_id(_fragment_instance_id),
parent_tracker);
_mem_tracker = MemTracker::CreateTracker(
_profile, -1, "DataStreamRecvr:" + print_id(_fragment_instance_id), parent_tracker);
// Create one queue per sender if is_merging is true.
int num_queues = is_merging ? num_senders : 1;

View File

@ -189,7 +189,7 @@ string DiskIoMgr::debug_string() {
ss << " " << (void*)_disk_queues[i] << ":";
if (!_disk_queues[i]->request_contexts.empty()) {
ss << " Readers: ";
BOOST_FOREACH (RequestContext* req_context, _disk_queues[i]->request_contexts) {
for (RequestContext* req_context : _disk_queues[i]->request_contexts) {
ss << (void*)req_context;
}
}
@ -404,7 +404,8 @@ Status DiskIoMgr::init(const std::shared_ptr<MemTracker>& process_mem_tracker) {
return Status::OK();
}
Status DiskIoMgr::register_context(RequestContext** request_context, std::shared_ptr<MemTracker> mem_tracker) {
Status DiskIoMgr::register_context(RequestContext** request_context,
std::shared_ptr<MemTracker> mem_tracker) {
DCHECK(_request_context_cache) << "Must call init() first.";
*request_context = _request_context_cache->get_new_context();
(*request_context)->reset(std::move(mem_tracker));

View File

@ -18,7 +18,6 @@
#ifndef DORIS_BE_SRC_QUERY_RUNTIME_DISK_IO_MGR_H
#define DORIS_BE_SRC_QUERY_RUNTIME_DISK_IO_MGR_H
#include <boost/foreach.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>

View File

@ -92,7 +92,7 @@ void DiskIoMgr::RequestContext::cancel(const Status& status) {
}
}
BOOST_FOREACH (const WriteRange::WriteDoneCallback& write_callback, write_callbacks) {
for (const WriteRange::WriteDoneCallback& write_callback : write_callbacks) {
write_callback(_status);
}

View File

@ -20,7 +20,6 @@
#include <thrift/protocol/TDebugProtocol.h>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/foreach.hpp>
#include <boost/unordered_map.hpp>
#include "common/logging.h"
@ -131,10 +130,10 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
bytes_limit = _exec_env->process_mem_tracker()->limit();
}
// NOTE: this MemTracker only for olap
_mem_tracker = MemTracker::CreateTracker(
bytes_limit,
"PlanFragmentExecutor:" + print_id(_query_id) + ":" + print_id(params.fragment_instance_id),
_exec_env->process_mem_tracker());
_mem_tracker = MemTracker::CreateTracker(bytes_limit,
"PlanFragmentExecutor:" + print_id(_query_id) + ":" +
print_id(params.fragment_instance_id),
_exec_env->process_mem_tracker());
_runtime_state->set_fragment_mem_tracker(_mem_tracker);
LOG(INFO) << "Using query memory limit: " << PrettyPrinter::print(bytes_limit, TUnit::BYTES);
@ -167,7 +166,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
// set #senders of exchange nodes before calling Prepare()
std::vector<ExecNode*> exch_nodes;
_plan->collect_nodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes);
BOOST_FOREACH (ExecNode* exch_node, exch_nodes) {
for (ExecNode* exch_node : exch_nodes) {
DCHECK_EQ(exch_node->type(), TPlanNodeType::EXCHANGE_NODE);
int num_senders = find_with_default(params.per_exch_num_senders, exch_node->id(), 0);
DCHECK_GT(num_senders, 0);

View File

@ -123,7 +123,8 @@ public:
// Retrieves the first batch of sorted rows from the run.
Status init(bool* done) override {
*done = false;
_pull_task_thread = std::thread(&SortedRunMerger::ParallelBatchedRowSupplier::process_sorted_run_task, this);
_pull_task_thread = std::thread(
&SortedRunMerger::ParallelBatchedRowSupplier::process_sorted_run_task, this);
RETURN_IF_ERROR(next(NULL, done));
return Status::OK();
@ -144,7 +145,7 @@ public:
delete _input_row_batch;
std::unique_lock<std::mutex> lock(_mutex);
_batch_prepared_cv.wait(lock, [this](){ return _backup_ready.load(); });
_batch_prepared_cv.wait(lock, [this]() { return _backup_ready.load(); });
// switch input_row_batch_backup to _input_row_batch
_input_row_batch = _input_row_batch_backup;
@ -187,9 +188,7 @@ private:
// do merge from sender queue data
_status_backup = _sorted_run(&_input_row_batch_backup);
_backup_ready = true;
DeferOp defer_op([this]() {
_batch_prepared_cv.notify_one();
});
DeferOp defer_op([this]() { _batch_prepared_cv.notify_one(); });
if (!_status_backup.ok() || _input_row_batch_backup == nullptr || _cancel) {
if (!_status_backup.ok()) _input_row_batch_backup = nullptr;
@ -237,9 +236,10 @@ SortedRunMerger::SortedRunMerger(const TupleRowComparator& compare_less_than,
Status SortedRunMerger::prepare(const vector<RunBatchSupplier>& input_runs, bool parallel) {
DCHECK_EQ(_min_heap.size(), 0);
_min_heap.reserve(input_runs.size());
BOOST_FOREACH (const RunBatchSupplier& input_run, input_runs) {
BatchedRowSupplier* new_elem = _pool.add(
parallel ? new ParallelBatchedRowSupplier(this, input_run) : new BatchedRowSupplier(this, input_run));
for (const RunBatchSupplier& input_run : input_runs) {
BatchedRowSupplier* new_elem =
_pool.add(parallel ? new ParallelBatchedRowSupplier(this, input_run)
: new BatchedRowSupplier(this, input_run));
DCHECK(new_elem != NULL);
bool empty = false;
RETURN_IF_ERROR(new_elem->init(&empty));
@ -256,7 +256,7 @@ Status SortedRunMerger::prepare(const vector<RunBatchSupplier>& input_runs, bool
return Status::OK();
}
void SortedRunMerger::transfer_all_resources(class doris::RowBatch * transfer_resource_batch) {
void SortedRunMerger::transfer_all_resources(class doris::RowBatch* transfer_resource_batch) {
for (BatchedRowSupplier* batched_row_supplier : _min_heap) {
auto row_batch = batched_row_supplier->get_row_batch();
if (row_batch != nullptr) {
@ -306,17 +306,15 @@ Status SortedRunMerger::get_next(RowBatch* output_batch, bool* eos) {
}
ChildSortedRunMerger::ChildSortedRunMerger(const TupleRowComparator& compare_less_than,
RowDescriptor* row_desc,
RuntimeProfile* profile,
MemTracker* parent,
uint32_t row_batch_size,
bool deep_copy_input):
SortedRunMerger(compare_less_than, row_desc, profile, deep_copy_input),
_eos(false),
_parent(parent),
_row_batch_size(row_batch_size) {
_get_next_timer = ADD_TIMER(profile, "ChildMergeGetNext");
_get_next_batch_timer = ADD_TIMER(profile, "ChildMergeGetNextBatch");
RowDescriptor* row_desc, RuntimeProfile* profile,
MemTracker* parent, uint32_t row_batch_size,
bool deep_copy_input)
: SortedRunMerger(compare_less_than, row_desc, profile, deep_copy_input),
_eos(false),
_parent(parent),
_row_batch_size(row_batch_size) {
_get_next_timer = ADD_TIMER(profile, "ChildMergeGetNext");
_get_next_batch_timer = ADD_TIMER(profile, "ChildMergeGetNextBatch");
}
Status ChildSortedRunMerger::get_batch(RowBatch** output_batch) {

View File

@ -496,7 +496,7 @@ Status SpillSorter::Run::add_batch(RowBatch* batch, int start_index, int* num_pr
// Sorting of tuples containing array values is not implemented. The planner
// combined with projection should guarantee that none are in each tuple.
// BOOST_FOREACH(const SlotDescriptor* collection_slot,
// for(const SlotDescriptor* collection_slot :
// _sort_tuple_desc->collection_slots()) {
// DCHECK(new_tuple->is_null(collection_slot->null_indicator_offset()));
// }
@ -535,13 +535,13 @@ Status SpillSorter::Run::add_batch(RowBatch* batch, int start_index, int* num_pr
void SpillSorter::Run::transfer_resources(RowBatch* row_batch) {
DCHECK(row_batch != NULL);
BOOST_FOREACH (BufferedBlockMgr2::Block* block, _fixed_len_blocks) {
for (BufferedBlockMgr2::Block* block : _fixed_len_blocks) {
if (block != NULL) {
row_batch->add_block(block);
}
}
_fixed_len_blocks.clear();
BOOST_FOREACH (BufferedBlockMgr2::Block* block, _var_len_blocks) {
for (BufferedBlockMgr2::Block* block : _var_len_blocks) {
if (block != NULL) {
row_batch->add_block(block);
}
@ -554,13 +554,13 @@ void SpillSorter::Run::transfer_resources(RowBatch* row_batch) {
}
void SpillSorter::Run::delete_all_blocks() {
BOOST_FOREACH (BufferedBlockMgr2::Block* block, _fixed_len_blocks) {
for (BufferedBlockMgr2::Block* block : _fixed_len_blocks) {
if (block != NULL) {
block->del();
}
}
_fixed_len_blocks.clear();
BOOST_FOREACH (BufferedBlockMgr2::Block* block, _var_len_blocks) {
for (BufferedBlockMgr2::Block* block : _var_len_blocks) {
if (block != NULL) {
block->del();
}
@ -619,7 +619,9 @@ Status SpillSorter::Run::unpin_all_blocks() {
}
// Clear _var_len_blocks and replace with it with the contents of sorted_var_len_blocks
BOOST_FOREACH (BufferedBlockMgr2::Block* var_block, _var_len_blocks) { var_block->del(); }
for (BufferedBlockMgr2::Block* var_block : _var_len_blocks) {
var_block->del();
}
_var_len_blocks.clear();
sorted_var_len_blocks.swap(_var_len_blocks);
// Set _var_len_copy_block to NULL since it's now in _var_len_blocks and is no longer
@ -818,7 +820,7 @@ void SpillSorter::Run::collect_non_null_varslots(Tuple* src, vector<StringValue*
int* total_var_len) {
string_values->clear();
*total_var_len = 0;
BOOST_FOREACH (const SlotDescriptor* string_slot, _sort_tuple_desc->string_slots()) {
for (const SlotDescriptor* string_slot : _sort_tuple_desc->string_slots()) {
if (!src->is_null(string_slot->null_indicator_offset())) {
StringValue* string_val =
reinterpret_cast<StringValue*>(src->get_slot(string_slot->tuple_offset()));
@ -852,7 +854,7 @@ Status SpillSorter::Run::try_add_block(vector<BufferedBlockMgr2::Block*>* block_
}
void SpillSorter::Run::copy_var_len_data(char* dest, const vector<StringValue*>& string_values) {
BOOST_FOREACH (StringValue* string_val, string_values) {
for (StringValue* string_val : string_values) {
memcpy(dest, string_val->ptr, string_val->len);
string_val->ptr = dest;
dest += string_val->len;
@ -861,7 +863,7 @@ void SpillSorter::Run::copy_var_len_data(char* dest, const vector<StringValue*>&
void SpillSorter::Run::copy_var_len_data_convert_offset(char* dest, int64_t offset,
const vector<StringValue*>& string_values) {
BOOST_FOREACH (StringValue* string_val, string_values) {
for (StringValue* string_val : string_values) {
memcpy(dest, string_val->ptr, string_val->len);
string_val->ptr = reinterpret_cast<char*>(offset);
dest += string_val->len;
@ -1316,7 +1318,8 @@ Status SpillSorter::create_merger(int num_runs) {
RETURN_IF_ERROR(run->prepare_read());
// Run::get_next_batch() is used by the merger to retrieve a batch of rows to merge
// from this run.
merge_runs.push_back(bind<Status>(mem_fn(&Run::get_next_batch), run, boost::placeholders::_1));
merge_runs.push_back(
bind<Status>(mem_fn(&Run::get_next_batch), run, boost::placeholders::_1));
_sorted_runs.pop_front();
_merging_runs.push_back(run);
}

View File

@ -18,12 +18,11 @@
#include "runtime/tmp_file_mgr.h"
#include <boost/algorithm/string.hpp>
#include <filesystem>
#include <boost/foreach.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread/locks.hpp>
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <filesystem>
// #include <gutil/strings/substitute.h>
// #include <gutil/strings/join.h>
@ -84,7 +83,8 @@ Status TmpFileMgr::init_custom(const vector<string>& tmp_dirs, bool one_dir_per_
// For each tmp directory, find the disk it is on,
// so additional tmp directories on the same disk can be skipped.
for (int i = 0; i < tmp_dirs.size(); ++i) {
std::filesystem::path tmp_path = std::string_view(boost::trim_right_copy_if(tmp_dirs[i], is_any_of("/")));
std::filesystem::path tmp_path =
std::string_view(boost::trim_right_copy_if(tmp_dirs[i], is_any_of("/")));
tmp_path = std::filesystem::absolute(tmp_path);
path scratch_subdir_path(tmp_path / _s_tmp_sub_dir_name);
// tmp_path must be a writable directory.

View File

@ -17,7 +17,6 @@
#include "runtime/types.h"
#include <boost/foreach.hpp>
#include <ostream>
#include <sstream>
@ -92,7 +91,9 @@ void TypeDescriptor::to_thrift(TTypeDesc* thrift_type) const {
node.struct_fields.back().name = field_name;
}
}
BOOST_FOREACH (const TypeDescriptor& child, children) { child.to_thrift(thrift_type); }
for (const TypeDescriptor& child : children) {
child.to_thrift(thrift_type);
}
} else {
node.type = TTypeNodeType::SCALAR;
node.__set_scalar_type(TScalarType());

View File

@ -19,7 +19,6 @@
#include <errno.h>
#include <boost/foreach.hpp>
#include <cstring>
#include <sstream>
@ -138,9 +137,9 @@ ErrorMsg ErrorMsg::init(TErrorCode::type error, const ArgType& arg0,
}
void print_error_map(ostream* stream, const ErrorLogMap& errors) {
BOOST_FOREACH(const ErrorLogMap::value_type& v, errors) {
for (const ErrorLogMap::value_type& v : errors) {
if (v.first == TErrorCode::GENERAL) {
BOOST_FOREACH(const string& s, v.second.messages) {
for (const string& s : v.second.messages) {
*stream << s << "\n";
}
} else {
@ -161,7 +160,7 @@ string print_error_map_to_string(const ErrorLogMap& errors) {
}
void merge_error_maps(ErrorLogMap* left, const ErrorLogMap& right) {
BOOST_FOREACH(const ErrorLogMap::value_type& v, right) {
for (const ErrorLogMap::value_type& v : right) {
// Append generic message, append specific codes or increment count if exists
if (v.first == TErrorCode::GENERAL) {
(*left)[v.first].messages.insert(

View File

@ -25,7 +25,6 @@
#include <sys/socket.h>
#include <sys/types.h>
#include <boost/foreach.hpp>
#include <sstream>
namespace doris {
@ -102,7 +101,7 @@ Status hostname_to_ip_addrs(const std::string& name, std::vector<std::string>* a
}
bool find_first_non_localhost(const std::vector<std::string>& addresses, std::string* addr) {
BOOST_FOREACH (const std::string& candidate, addresses) {
for (const std::string& candidate : addresses) {
if (candidate != LOCALHOST) {
*addr = candidate;
return true;

View File

@ -17,7 +17,6 @@
#include "util/runtime_profile.h"
#include <boost/foreach.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/thread.hpp>
#include <iomanip>
@ -188,7 +187,7 @@ void RuntimeProfile::update(const std::vector<TRuntimeProfileNode>& nodes, int*
{
boost::lock_guard<boost::mutex> l(_info_strings_lock);
const InfoStrings& info_strings = node.info_strings;
BOOST_FOREACH (const std::string& key, node.info_strings_display_order) {
for (const std::string& key : node.info_strings_display_order) {
// Look for existing info strings and update in place. If there
// are new strings, add them to the end of the display order.
// TODO: Is nodes.info_strings always a superset of
@ -514,7 +513,7 @@ void RuntimeProfile::pretty_print(std::ostream* s, const std::string& prefix) co
{
boost::lock_guard<boost::mutex> l(_info_strings_lock);
BOOST_FOREACH (const std::string& key, _info_strings_display_order) {
for (const std::string& key : _info_strings_display_order) {
stream << prefix << " - " << key << ": " << _info_strings.find(key)->second
<< std::endl;
}
@ -528,13 +527,13 @@ void RuntimeProfile::pretty_print(std::ostream* s, const std::string& prefix) co
// - Event 3: 2s410ms (121.138ms)
// The times in parentheses are the time elapsed since the last event.
boost::lock_guard<boost::mutex> l(_event_sequences_lock);
BOOST_FOREACH (const EventSequenceMap::value_type& event_sequence, _event_sequence_map) {
for (const EventSequenceMap::value_type& event_sequence : _event_sequence_map) {
stream << prefix << " " << event_sequence.first << ": "
<< PrettyPrinter::print(event_sequence.second->elapsed_time(), TUnit::TIME_NS)
<< std::endl;
int64_t last = 0L;
BOOST_FOREACH (const EventSequence::Event& event, event_sequence.second->events()) {
for (const EventSequence::Event& event : event_sequence.second->events()) {
stream << prefix << " - " << event.first << ": "
<< PrettyPrinter::print(event.second, TUnit::TIME_NS) << " ("
<< PrettyPrinter::print(event.second - last, TUnit::TIME_NS) << ")"
@ -783,7 +782,7 @@ void RuntimeProfile::stop_bucketing_counters_updates(std::vector<Counter*>* buck
}
if (convert && num_sampled > 0) {
BOOST_FOREACH (Counter* counter, *buckets) {
for (Counter* counter : *buckets) {
double perc = 100 * counter->value() / (double)num_sampled;
counter->set(perc);
}
@ -873,7 +872,7 @@ void RuntimeProfile::print_child_counters(const std::string& prefix,
if (itr != child_counter_map.end()) {
const std::set<std::string>& child_counters = itr->second;
BOOST_FOREACH (const std::string& child_counter, child_counters) {
for (const std::string& child_counter : child_counters) {
CounterMap::const_iterator iter = counter_map.find(child_counter);
DCHECK(iter != counter_map.end());
stream << prefix << " - " << iter->first << ": "

View File

@ -17,7 +17,6 @@
#include "util/thrift_rpc_helper.h"
#include <boost/foreach.hpp>
#include <boost/functional/hash.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/thread.hpp>

View File

@ -19,7 +19,6 @@
#include <gtest/gtest.h>
#include <boost/foreach.hpp>
#include <string>
#include "common/object_pool.h"

View File

@ -19,7 +19,6 @@
#include <gtest/gtest.h>
#include <boost/foreach.hpp>
#include <string>
#include "common/object_pool.h"

View File

@ -22,9 +22,9 @@
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <filesystem>
#include <boost/scoped_ptr.hpp>
#include <boost/thread/thread.hpp>
#include <filesystem>
#include "runtime/disk_io_mgr.h"
#include "runtime/exec_env.h"
@ -325,7 +325,9 @@ protected:
AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
EXPECT_EQ(block_mgr->bytes_allocated(), max_num_buffers * block_size);
BOOST_FOREACH (BufferedBlockMgr2::Block* block, blocks) { block->unpin(); }
for (BufferedBlockMgr2::Block* block : blocks) {
block->unpin();
}
// Re-pinning all blocks
for (int i = 0; i < blocks.size(); ++i) {
@ -339,7 +341,9 @@ protected:
EXPECT_EQ(buffered_pin->value(), buffered_pins_expected);
// Unpin all blocks
BOOST_FOREACH (BufferedBlockMgr2::Block* block, blocks) { block->unpin(); }
for (BufferedBlockMgr2::Block* block : blocks) {
block->unpin();
}
// Get two new blocks.
AllocateBlocks(block_mgr, client, 2, &blocks);
// At least two writes must be issued. The first (num_blocks - 2) must be in memory.
@ -705,7 +709,9 @@ TEST_F(BufferedBlockMgrTest, Deletion) {
AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
EXPECT_TRUE(created_cnt->value() == max_num_buffers);
BOOST_FOREACH (BufferedBlockMgr2::Block* block, blocks) { block->del(); }
for (BufferedBlockMgr2::Block* block : blocks) {
block->del();
}
AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
EXPECT_TRUE(created_cnt->value() == max_num_buffers);
EXPECT_TRUE(recycled_cnt->value() == max_num_buffers);