From 98e80aa65e7111da7201525ec8ad0338da6f42bc Mon Sep 17 00:00:00 2001 From: Zhengguo Yang Date: Sun, 9 May 2021 22:00:48 +0800 Subject: [PATCH] [refactor] Replace boost::function with std::function (#5700) Replace boost::function with std::function --- be/src/exec/aggregation_node.cpp | 1 - be/src/exec/exec_node.cpp | 7 +- be/src/exec/olap_scan_node.cpp | 2 +- be/src/exec/schema_scanner/schema_helper.cpp | 1 - be/src/http/default_path_handlers.cpp | 16 ++- be/src/http/web_page_handler.h | 6 +- be/src/runtime/buffered_block_mgr2.cc | 6 +- be/src/runtime/buffered_tuple_stream2.cc | 2 - be/src/runtime/buffered_tuple_stream3.cc | 2 - be/src/runtime/buffered_tuple_stream3.h | 2 +- be/src/runtime/bufferpool/buffer_allocator.cc | 4 +- be/src/runtime/bufferpool/buffer_pool.cc | 9 +- .../runtime/bufferpool/buffer_pool_internal.h | 2 +- be/src/runtime/client_cache.h | 11 +- be/src/runtime/data_stream_mgr.cpp | 1 - be/src/runtime/data_stream_recvr.cc | 8 +- be/src/runtime/data_stream_sender.cpp | 12 +- be/src/runtime/disk_io_mgr.cc | 2 +- be/src/runtime/disk_io_mgr.h | 2 +- be/src/runtime/dpp_sink.cpp | 2 +- be/src/runtime/fragment_mgr.cpp | 13 +- be/src/runtime/plan_fragment_executor.cpp | 10 +- be/src/runtime/plan_fragment_executor.h | 4 +- be/src/runtime/raw_value.cpp | 1 - be/src/runtime/raw_value.h | 1 - be/src/runtime/result_buffer_mgr.cpp | 2 - .../routine_load/data_consumer_group.cpp | 11 +- .../routine_load_task_executor.cpp | 26 ++-- be/src/runtime/sorted_run_merger.h | 2 +- be/src/runtime/spill_sorter.cc | 9 +- be/src/runtime/thread_resource_mgr.h | 4 +- be/src/util/hash_util.hpp | 132 ++++++++++-------- be/src/util/internal_queue.h | 4 +- be/src/util/priority_thread_pool.hpp | 4 +- be/src/util/runtime_profile.h | 6 +- be/src/util/string_util.cpp | 3 +- be/src/util/string_util.h | 1 - be/src/util/thrift_rpc_helper.cpp | 1 - be/src/util/uid_util.h | 5 +- be/src/util/uuid_generator.h | 1 - be/test/runtime/buffered_block_mgr2_test.cpp | 6 +- be/test/runtime/disk_io_mgr_test.cpp | 2 +- be/test/runtime/thread_resource_mgr_test.cpp | 4 +- be/test/util/blocking_queue_test.cpp | 6 +- 44 files changed, 182 insertions(+), 174 deletions(-) diff --git a/be/src/exec/aggregation_node.cpp b/be/src/exec/aggregation_node.cpp index 3edb36e2a5..43e3bd8c90 100644 --- a/be/src/exec/aggregation_node.cpp +++ b/be/src/exec/aggregation_node.cpp @@ -21,7 +21,6 @@ #include #include -#include #include #include "exec/hash_table.hpp" diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 298e7072b5..675cb1e4c3 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -169,13 +169,14 @@ Status ExecNode::prepare(RuntimeState* state) { _rows_returned_counter = ADD_COUNTER(_runtime_profile, "RowsReturned", TUnit::UNIT); _rows_returned_rate = runtime_profile()->add_derived_counter( ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND, - boost::bind(&RuntimeProfile::units_per_second, _rows_returned_counter, - runtime_profile()->total_time_counter()), + std::bind(&RuntimeProfile::units_per_second, _rows_returned_counter, + runtime_profile()->total_time_counter()), ""); _mem_tracker = MemTracker::CreateTracker(_runtime_profile.get(), -1, "ExecNode:" + _runtime_profile->name(), state->instance_mem_tracker()); - _expr_mem_tracker = MemTracker::CreateTracker(-1, "ExecNode:Exprs:" + _runtime_profile->name(), _mem_tracker); + _expr_mem_tracker = MemTracker::CreateTracker(-1, "ExecNode:Exprs:" + _runtime_profile->name(), + _mem_tracker); _expr_mem_pool.reset(new MemPool(_expr_mem_tracker.get())); // TODO chenhao RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state, row_desc(), expr_mem_tracker())); diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 8a8f386918..b1099fb0c6 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -1294,7 +1294,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { auto iter = olap_scanners.begin(); while (iter != olap_scanners.end()) { PriorityThreadPool::Task task; - task.work_function = boost::bind(&OlapScanNode::scanner_thread, this, *iter); + task.work_function = std::bind(&OlapScanNode::scanner_thread, this, *iter); task.priority = _nice; (*iter)->start_wait_worker_timer(); if (thread_pool->offer(task)) { diff --git a/be/src/exec/schema_scanner/schema_helper.cpp b/be/src/exec/schema_scanner/schema_helper.cpp index 0824705fa2..b0f9c08cfd 100644 --- a/be/src/exec/schema_scanner/schema_helper.cpp +++ b/be/src/exec/schema_scanner/schema_helper.cpp @@ -17,7 +17,6 @@ #include "exec/schema_scanner/schema_helper.h" -#include #include #include diff --git a/be/src/http/default_path_handlers.cpp b/be/src/http/default_path_handlers.cpp index b24267562e..d11991d36f 100644 --- a/be/src/http/default_path_handlers.cpp +++ b/be/src/http/default_path_handlers.cpp @@ -21,7 +21,6 @@ #include #include -#include #include #include "agent/utils.h" @@ -336,18 +335,21 @@ void add_default_path_handlers(WebPageHandler* web_page_handler, // TODO(yingchun): logs_handler is not implemented yet, so not show it on navigate bar web_page_handler->register_page("/logs", "Logs", logs_handler, false /* is_on_nav_bar */); web_page_handler->register_page("/varz", "Configs", config_handler, true /* is_on_nav_bar */); - web_page_handler->register_page( - "/memz", "Memory", boost::bind(&mem_usage_handler, process_mem_tracker, boost::placeholders::_1, boost::placeholders::_2), - true /* is_on_nav_bar */); + web_page_handler->register_page("/memz", "Memory", + std::bind(&mem_usage_handler, process_mem_tracker, + std::placeholders::_1, std::placeholders::_2), + true /* is_on_nav_bar */); web_page_handler->register_page("/mem_tracker", "MemTracker", mem_tracker_handler, true /* is_on_nav_bar */); web_page_handler->register_page("/heap", "Heap Profile", heap_handler, true /* is_on_nav_bar */); web_page_handler->register_page("/cpu", "CPU Profile", cpu_handler, true /* is_on_nav_bar */); register_thread_display_page(web_page_handler); - web_page_handler->register_template_page("/tablets_page", "Tablets", - boost::bind(&display_tablets_callback, boost::placeholders::_1, boost::placeholders::_2), - true /* is_on_nav_bar */); + web_page_handler->register_template_page( + "/tablets_page", "Tablets", + std::bind(&display_tablets_callback, std::placeholders::_1, + std::placeholders::_2), + true /* is_on_nav_bar */); } } // namespace doris diff --git a/be/src/http/web_page_handler.h b/be/src/http/web_page_handler.h index bf47e0033e..c1863d9c1b 100644 --- a/be/src/http/web_page_handler.h +++ b/be/src/http/web_page_handler.h @@ -18,7 +18,7 @@ #ifndef DORIS_BE_SRC_COMMON_UTIL_WEB_PAGE_HANDLER_H #define DORIS_BE_SRC_COMMON_UTIL_WEB_PAGE_HANDLER_H -#include +#include #include #include #include @@ -37,9 +37,9 @@ class EvHttpServer; class WebPageHandler : public HttpHandler { public: typedef std::map ArgumentMap; - typedef boost::function + typedef std::function PageHandlerCallback; - typedef boost::function + typedef std::function TemplatePageHandlerCallback; WebPageHandler(EvHttpServer* http_server); diff --git a/be/src/runtime/buffered_block_mgr2.cc b/be/src/runtime/buffered_block_mgr2.cc index 6fd97a8408..9621092500 100644 --- a/be/src/runtime/buffered_block_mgr2.cc +++ b/be/src/runtime/buffered_block_mgr2.cc @@ -39,8 +39,8 @@ using std::vector; using std::list; using std::endl; -using boost::bind; -using boost::mem_fn; +using std::bind; +using std::mem_fn; using std::lock_guard; using std::mutex; using boost::scoped_array; @@ -783,7 +783,7 @@ Status BufferedBlockMgr2::write_unpinned_block(Block* block) { } disk_id %= _io_mgr->num_local_disks(); DiskIoMgr::WriteRange::WriteDoneCallback callback = bind( - mem_fn(&BufferedBlockMgr2::write_complete), this, block, boost::placeholders::_1); + mem_fn(&BufferedBlockMgr2::write_complete), this, block, std::placeholders::_1); block->_write_range = _obj_pool.add( new DiskIoMgr::WriteRange(tmp_file->path(), file_offset, disk_id, callback)); block->_tmp_file = tmp_file; diff --git a/be/src/runtime/buffered_tuple_stream2.cc b/be/src/runtime/buffered_tuple_stream2.cc index 6400830356..b127e9f79f 100644 --- a/be/src/runtime/buffered_tuple_stream2.cc +++ b/be/src/runtime/buffered_tuple_stream2.cc @@ -17,8 +17,6 @@ #include "runtime/buffered_tuple_stream2.h" -#include - #include "runtime/descriptors.h" #include "runtime/row_batch.h" #include "runtime/tuple_row.h" diff --git a/be/src/runtime/buffered_tuple_stream3.cc b/be/src/runtime/buffered_tuple_stream3.cc index 126798727a..2ea57a7528 100644 --- a/be/src/runtime/buffered_tuple_stream3.cc +++ b/be/src/runtime/buffered_tuple_stream3.cc @@ -17,8 +17,6 @@ #include -#include - #include "runtime/buffered_tuple_stream3.inline.h" #include "runtime/bufferpool/reservation_tracker.h" //#include "runtime/collection_value.h" diff --git a/be/src/runtime/buffered_tuple_stream3.h b/be/src/runtime/buffered_tuple_stream3.h index 21b7f49f69..8aaf8f0df7 100644 --- a/be/src/runtime/buffered_tuple_stream3.h +++ b/be/src/runtime/buffered_tuple_stream3.h @@ -18,8 +18,8 @@ #ifndef DORIS_BE_RUNTIME_BUFFERED_TUPLE_STREAM_H #define DORIS_BE_RUNTIME_BUFFERED_TUPLE_STREAM_H -#include #include +#include #include #include diff --git a/be/src/runtime/bufferpool/buffer_allocator.cc b/be/src/runtime/bufferpool/buffer_allocator.cc index 8dbdb453dd..a3bbe4c6c2 100644 --- a/be/src/runtime/bufferpool/buffer_allocator.cc +++ b/be/src/runtime/bufferpool/buffer_allocator.cc @@ -17,7 +17,6 @@ #include "runtime/bufferpool/buffer_allocator.h" -#include #include #include "common/atomic.h" @@ -721,7 +720,8 @@ std::string BufferPool::FreeBufferArena::DebugString() { << " free buffers: " << lists.num_free_buffers.load() << " low water mark: " << lists.low_water_mark << " clean pages: " << lists.num_clean_pages.load() << " "; - lists.clean_pages.iterate(boost::bind(Page::DebugStringCallback, &ss, boost::placeholders::_1)); + lists.clean_pages.iterate( + std::bind(Page::DebugStringCallback, &ss, std::placeholders::_1)); ss << "\n"; } return ss.str(); diff --git a/be/src/runtime/bufferpool/buffer_pool.cc b/be/src/runtime/bufferpool/buffer_pool.cc index f2749e4fa2..5fe0aed93d 100644 --- a/be/src/runtime/bufferpool/buffer_pool.cc +++ b/be/src/runtime/bufferpool/buffer_pool.cc @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -#include #include #include @@ -700,11 +699,13 @@ string BufferPool::Client::DebugString() { << " in_flight_write_bytes: " << in_flight_write_pages_.bytes() << " reservation: " << reservation_.DebugString(); ss << "\n " << pinned_pages_.size() << " pinned pages: "; - pinned_pages_.iterate(boost::bind(Page::DebugStringCallback, &ss, boost::placeholders::_1)); + pinned_pages_.iterate(std::bind(Page::DebugStringCallback, &ss, std::placeholders::_1)); ss << "\n " << dirty_unpinned_pages_.size() << " dirty unpinned pages: "; - dirty_unpinned_pages_.iterate(boost::bind(Page::DebugStringCallback, &ss, boost::placeholders::_1)); + dirty_unpinned_pages_.iterate( + std::bind(Page::DebugStringCallback, &ss, std::placeholders::_1)); ss << "\n " << in_flight_write_pages_.size() << " in flight write pages: "; - in_flight_write_pages_.iterate(boost::bind(Page::DebugStringCallback, &ss, boost::placeholders::_1)); + in_flight_write_pages_.iterate( + std::bind(Page::DebugStringCallback, &ss, std::placeholders::_1)); return ss.str(); } diff --git a/be/src/runtime/bufferpool/buffer_pool_internal.h b/be/src/runtime/bufferpool/buffer_pool_internal.h index fdfc1053c5..b2362d0756 100644 --- a/be/src/runtime/bufferpool/buffer_pool_internal.h +++ b/be/src/runtime/bufferpool/buffer_pool_internal.h @@ -113,7 +113,7 @@ public: return page; } - void iterate(boost::function fn) { list_.iterate(fn); } + void iterate(std::function fn) { list_.iterate(fn); } bool contains(Page* page) { return list_.contains(page); } Page* tail() { return list_.tail(); } bool empty() const { return list_.empty(); } diff --git a/be/src/runtime/client_cache.h b/be/src/runtime/client_cache.h index 9ca367260e..4211e1e583 100644 --- a/be/src/runtime/client_cache.h +++ b/be/src/runtime/client_cache.h @@ -18,7 +18,6 @@ #ifndef DORIS_BE_RUNTIME_CLIENT_CACHE_H #define DORIS_BE_RUNTIME_CLIENT_CACHE_H -#include #include #include #include @@ -60,7 +59,7 @@ public: ~ClientCacheHelper(); // Callback method which produces a client object when one cannot be // found in the cache. Supplied by the ClientCache wrapper. - typedef boost::function + typedef std::function client_factory; // Return client for specific host/port in 'client'. If a client @@ -192,14 +191,14 @@ public: ClientCache() : _client_cache_helper() { _client_factory = - boost::bind(boost::mem_fn(&ClientCache::make_client), this, - boost::placeholders::_1, boost::placeholders::_2); + std::bind(std::mem_fn(&ClientCache::make_client), this, + std::placeholders::_1, std::placeholders::_2); } ClientCache(int max_cache_size) : _client_cache_helper(max_cache_size) { _client_factory = - boost::bind(boost::mem_fn(&ClientCache::make_client), this, - boost::placeholders::_1, boost::placeholders::_2); + std::bind(std::mem_fn(&ClientCache::make_client), this, + std::placeholders::_1, std::placeholders::_2); } // Close all clients connected to the supplied address, (e.g., in diff --git a/be/src/runtime/data_stream_mgr.cpp b/be/src/runtime/data_stream_mgr.cpp index b8f43d1ec9..32ff126a6b 100644 --- a/be/src/runtime/data_stream_mgr.cpp +++ b/be/src/runtime/data_stream_mgr.cpp @@ -17,7 +17,6 @@ #include "runtime/data_stream_mgr.h" -#include #include #include diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc index b1f4443bc9..fd3a536a30 100644 --- a/be/src/runtime/data_stream_recvr.cc +++ b/be/src/runtime/data_stream_recvr.cc @@ -44,7 +44,7 @@ using boost::scoped_ptr; using std::unique_lock; using std::try_lock; using std::lock_guard; -using boost::mem_fn; +using std::mem_fn; namespace doris { @@ -366,7 +366,7 @@ Status DataStreamRecvr::create_merger(const TupleRowComparator& less_than) { for (int i = 0; i < _sender_queues.size(); ++i) { child_input_batch_suppliers.emplace_back( - bind(mem_fn(&SenderQueue::get_batch), _sender_queues[i], boost::placeholders::_1)); + bind(mem_fn(&SenderQueue::get_batch), _sender_queues[i], std::placeholders::_1)); } RETURN_IF_ERROR(_merger->prepare(child_input_batch_suppliers)); return Status::OK(); @@ -403,12 +403,12 @@ Status DataStreamRecvr::create_parallel_merger(const TupleRowComparator& less_th vector input_batch_suppliers; for (int j = i; j < std::min((size_t)i + step, _sender_queues.size()); ++j) { input_batch_suppliers.emplace_back(bind(mem_fn(&SenderQueue::get_batch), - _sender_queues[j], boost::placeholders::_1)); + _sender_queues[j], std::placeholders::_1)); } child_merger->prepare(input_batch_suppliers); child_input_batch_suppliers.emplace_back(bind(mem_fn(&SortedRunMerger::get_batch), - child_merger.get(), boost::placeholders::_1)); + child_merger.get(), std::placeholders::_1)); _child_mergers.emplace_back(std::move(child_merger)); } RETURN_IF_ERROR(_merger->prepare(child_input_batch_suppliers, true)); diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp index 9b9f60a68b..343baf5043 100644 --- a/be/src/runtime/data_stream_sender.cpp +++ b/be/src/runtime/data_stream_sender.cpp @@ -47,8 +47,8 @@ #include "service/backend_options.h" #include "service/brpc.h" #include "util/brpc_stub_cache.h" -#include "util/defer_op.h" #include "util/debug_util.h" +#include "util/defer_op.h" #include "util/network_util.h" #include "util/ref_count_closure.h" #include "util/thrift_client.h" @@ -468,9 +468,9 @@ Status DataStreamSender::prepare(RuntimeState* state) { << "])"; _profile = _pool->add(new RuntimeProfile(title.str())); SCOPED_TIMER(_profile->total_time_counter()); - _mem_tracker = MemTracker::CreateTracker(_profile, -1, - "DataStreamSender:" + print_id(state->fragment_instance_id()), - state->instance_mem_tracker()); + _mem_tracker = MemTracker::CreateTracker( + _profile, -1, "DataStreamSender:" + print_id(state->fragment_instance_id()), + state->instance_mem_tracker()); if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) { // Randomize the order we open/transmit to channels to avoid thundering herd problems. @@ -492,8 +492,8 @@ Status DataStreamSender::prepare(RuntimeState* state) { _serialize_batch_timer = ADD_TIMER(profile(), "SerializeBatchTime"); _overall_throughput = profile()->add_derived_counter( "OverallThroughput", TUnit::BYTES_PER_SECOND, - boost::bind(&RuntimeProfile::units_per_second, _bytes_sent_counter, - profile()->total_time_counter()), + std::bind(&RuntimeProfile::units_per_second, _bytes_sent_counter, + profile()->total_time_counter()), ""); for (int i = 0; i < _channels.size(); ++i) { RETURN_IF_ERROR(_channels[i]->init(state)); diff --git a/be/src/runtime/disk_io_mgr.cc b/be/src/runtime/disk_io_mgr.cc index 73ed8f24bc..9841022115 100644 --- a/be/src/runtime/disk_io_mgr.cc +++ b/be/src/runtime/disk_io_mgr.cc @@ -387,7 +387,7 @@ Status DiskIoMgr::init(const std::shared_ptr& process_mem_tracker) { // _disk_thread_group.AddThread(new Thread("disk-io-mgr", ss.str(), // &DiskIoMgr::work_loop, this, _disk_queues[i])); _disk_thread_group.add_thread( - new boost::thread(boost::bind(&DiskIoMgr::work_loop, this, _disk_queues[i]))); + new boost::thread(std::bind(&DiskIoMgr::work_loop, this, _disk_queues[i]))); } } _request_context_cache.reset(new RequestContextCache(this)); diff --git a/be/src/runtime/disk_io_mgr.h b/be/src/runtime/disk_io_mgr.h index 9806b78d1f..89c487b4e1 100644 --- a/be/src/runtime/disk_io_mgr.h +++ b/be/src/runtime/disk_io_mgr.h @@ -507,7 +507,7 @@ public: // (TStatusCode::CANCELLED). The callback is only invoked if this WriteRange was // successfully added (i.e. add_write_range() succeeded). No locks are held while // the callback is invoked. - typedef boost::function WriteDoneCallback; + typedef std::function WriteDoneCallback; WriteRange(const std::string& file, int64_t file_offset, int disk_id, WriteDoneCallback callback); diff --git a/be/src/runtime/dpp_sink.cpp b/be/src/runtime/dpp_sink.cpp index 309a0c0a4c..32eee41751 100644 --- a/be/src/runtime/dpp_sink.cpp +++ b/be/src/runtime/dpp_sink.cpp @@ -912,7 +912,7 @@ Status DppSink::finish(RuntimeState* state) { for (auto& iter : _translator_map) { for (auto& trans : iter.second) { state->exec_env()->etl_thread_pool()->offer( - boost::bind(&DppSink::process, this, state, trans, &latch)); + std::bind(&DppSink::process, this, state, trans, &latch)); } } diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index bc22aeeed4..e7914d5415 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -20,7 +20,6 @@ #include #include -#include #include #include @@ -167,9 +166,9 @@ FragmentExecState::FragmentExecState(const TUniqueId& query_id, _fragment_instance_id(fragment_instance_id), _backend_num(backend_num), _exec_env(exec_env), - _executor(exec_env, boost::bind(boost::mem_fn(&FragmentExecState::coordinator_callback), - this, boost::placeholders::_1, boost::placeholders::_2, - boost::placeholders::_3)), + _executor(exec_env, std::bind(std::mem_fn(&FragmentExecState::coordinator_callback), + this, std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3)), _timeout_second(-1), _fragments_ctx(fragments_ctx) { _start_time = DateTimeValue::local_time(); @@ -184,9 +183,9 @@ FragmentExecState::FragmentExecState(const TUniqueId& query_id, _backend_num(backend_num), _exec_env(exec_env), _coord_addr(coord_addr), - _executor(exec_env, boost::bind(boost::mem_fn(&FragmentExecState::coordinator_callback), - this, boost::placeholders::_1, boost::placeholders::_2, - boost::placeholders::_3)), + _executor(exec_env, std::bind(std::mem_fn(&FragmentExecState::coordinator_callback), + this, std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3)), _timeout_second(-1) { _start_time = DateTimeValue::local_time(); } diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index b3dd97db5a..2af9cb8fe0 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -105,8 +105,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, _average_thread_tokens = profile()->add_sampling_counter( "AverageThreadTokens", - boost::bind(boost::mem_fn(&ThreadResourceMgr::ResourcePool::num_threads), - _runtime_state->resource_pool())); + std::bind(std::mem_fn(&ThreadResourceMgr::ResourcePool::num_threads), + _runtime_state->resource_pool())); // if (_exec_env->process_mem_tracker() != NULL) { // // we have a global limit @@ -241,7 +241,7 @@ Status PlanFragmentExecutor::open() { // may block // TODO: if no report thread is started, make sure to send a final profile // at end, otherwise the coordinator hangs in case we finish w/ an error - if (!_report_status_cb.empty() && config::status_report_interval > 0) { + if (_report_status_cb && config::status_report_interval > 0) { std::unique_lock l(_report_thread_lock); _report_thread = boost::thread(&PlanFragmentExecutor::report_profile, this); // make sure the thread started up, otherwise report_profile() might get into a race @@ -353,7 +353,7 @@ void PlanFragmentExecutor::_collect_query_statistics() { void PlanFragmentExecutor::report_profile() { VLOG_FILE << "report_profile(): instance_id=" << _runtime_state->fragment_instance_id(); - DCHECK(!_report_status_cb.empty()); + DCHECK(_report_status_cb); std::unique_lock l(_report_thread_lock); // tell Open() that we started _report_thread_started_cv.notify_one(); @@ -400,7 +400,7 @@ void PlanFragmentExecutor::report_profile() { } void PlanFragmentExecutor::send_report(bool done) { - if (_report_status_cb.empty()) { + if (!_report_status_cb) { return; } diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index 2ad9e8a3bb..85311baad4 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -18,9 +18,9 @@ #ifndef DORIS_BE_RUNTIME_PLAN_FRAGMENT_EXECUTOR_H #define DORIS_BE_RUNTIME_PLAN_FRAGMENT_EXECUTOR_H -#include #include #include +#include #include #include "common/object_pool.h" @@ -75,7 +75,7 @@ public: // Note: this does not take a const RuntimeProfile&, because it might need to call // functions like PrettyPrint() or to_thrift(), neither of which is const // because they take locks. - typedef boost::function + typedef std::function report_status_callback; // report_status_cb, if !empty(), is used to report the accumulated profile diff --git a/be/src/runtime/raw_value.cpp b/be/src/runtime/raw_value.cpp index 2fc1a8a532..4bdf7cbf8a 100644 --- a/be/src/runtime/raw_value.cpp +++ b/be/src/runtime/raw_value.cpp @@ -17,7 +17,6 @@ #include "runtime/raw_value.h" -#include #include #include "olap/utils.h" diff --git a/be/src/runtime/raw_value.h b/be/src/runtime/raw_value.h index 18a7f5c753..761e59e0ea 100644 --- a/be/src/runtime/raw_value.h +++ b/be/src/runtime/raw_value.h @@ -18,7 +18,6 @@ #ifndef DORIS_BE_RUNTIME_RAW_VALUE_H #define DORIS_BE_RUNTIME_RAW_VALUE_H -#include #include #include "common/logging.h" diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp index b1e74bd445..3076e0f891 100644 --- a/be/src/runtime/result_buffer_mgr.cpp +++ b/be/src/runtime/result_buffer_mgr.cpp @@ -17,8 +17,6 @@ #include "runtime/result_buffer_mgr.h" -#include - #include "gen_cpp/PaloInternalService_types.h" #include "gen_cpp/types.pb.h" #include "runtime/buffer_control_block.h" diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp index d3ffad8fe9..b81cfab2a1 100644 --- a/be/src/runtime/routine_load/data_consumer_group.cpp +++ b/be/src/runtime/routine_load/data_consumer_group.cpp @@ -67,7 +67,7 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) { Status result_st = Status::OK(); // start all consumers for (auto& consumer : _consumers) { - if (!_thread_pool.offer(boost::bind( + if (!_thread_pool.offer(std::bind( &KafkaDataConsumerGroup::actual_consume, this, consumer, &_queue, ctx->max_interval_s * 1000, [this, &result_st](const Status& st) { std::unique_lock lock(_mutex); @@ -86,7 +86,8 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) { << ", group id: " << _grp_id; return Status::InternalError("failed to submit data consumer"); } else { - VLOG_CRITICAL << "submit a data consumer: " << consumer->id() << ", group id: " << _grp_id; + VLOG_CRITICAL << "submit a data consumer: " << consumer->id() + << ", group id: " << _grp_id; } } @@ -163,8 +164,8 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) { bool res = _queue.blocking_get(&msg); if (res) { VLOG_NOTICE << "get kafka message" - << ", partition: " << msg->partition() << ", offset: " << msg->offset() - << ", len: " << msg->len(); + << ", partition: " << msg->partition() << ", offset: " << msg->offset() + << ", len: " << msg->len(); (kafka_pipe.get()->*append_data)(static_cast(msg->payload()), static_cast(msg->len())); @@ -174,7 +175,7 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) { left_bytes -= msg->len(); cmt_offset[msg->partition()] = msg->offset(); VLOG_NOTICE << "consume partition[" << msg->partition() << " - " << msg->offset() - << "]"; + << "]"; } else { // failed to append this msg, we must stop LOG(WARNING) << "failed to append msg to pipe. grp: " << _grp_id; diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index 661ef57cee..6d40c9db1f 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -169,19 +169,19 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { _task_map[ctx->id] = ctx; // offer the task to thread pool - if (!_thread_pool.offer(boost::bind(&RoutineLoadTaskExecutor::exec_task, this, ctx, - &_data_consumer_pool, [this](StreamLoadContext* ctx) { - std::unique_lock l(_lock); - _task_map.erase(ctx->id); - LOG(INFO) << "finished routine load task " - << ctx->brief() << ", status: " - << ctx->status.get_error_msg() - << ", current tasks num: " - << _task_map.size(); - if (ctx->unref()) { - delete ctx; - } - }))) { + if (!_thread_pool.offer(std::bind(&RoutineLoadTaskExecutor::exec_task, this, ctx, + &_data_consumer_pool, [this](StreamLoadContext* ctx) { + std::unique_lock l(_lock); + _task_map.erase(ctx->id); + LOG(INFO) << "finished routine load task " + << ctx->brief() << ", status: " + << ctx->status.get_error_msg() + << ", current tasks num: " + << _task_map.size(); + if (ctx->unref()) { + delete ctx; + } + }))) { // failed to submit task, clear and return LOG(WARNING) << "failed to submit routine load task: " << ctx->brief(); _task_map.erase(ctx->id); diff --git a/be/src/runtime/sorted_run_merger.h b/be/src/runtime/sorted_run_merger.h index d361bb15b1..a08ea8c0f5 100644 --- a/be/src/runtime/sorted_run_merger.h +++ b/be/src/runtime/sorted_run_merger.h @@ -46,7 +46,7 @@ public: // Function that returns the next batch of rows from an input sorted run. The batch // is owned by the supplier (i.e. not SortedRunMerger). eos is indicated by an NULL // batch being returned. - typedef boost::function RunBatchSupplier; + typedef std::function RunBatchSupplier; SortedRunMerger(const TupleRowComparator& compare_less_than, RowDescriptor* row_desc, RuntimeProfile* profile, bool deep_copy_input); diff --git a/be/src/runtime/spill_sorter.cc b/be/src/runtime/spill_sorter.cc index 25c9dd50d8..4d5d655514 100644 --- a/be/src/runtime/spill_sorter.cc +++ b/be/src/runtime/spill_sorter.cc @@ -17,7 +17,6 @@ #include "runtime/spill_sorter.h" -#include #include #include @@ -32,9 +31,9 @@ using std::deque; using std::string; using std::vector; -using boost::bind; -using boost::function; -using boost::mem_fn; +using std::bind; +using std::function; +using std::mem_fn; using boost::scoped_ptr; namespace doris { @@ -1319,7 +1318,7 @@ Status SpillSorter::create_merger(int num_runs) { // Run::get_next_batch() is used by the merger to retrieve a batch of rows to merge // from this run. merge_runs.push_back( - bind(mem_fn(&Run::get_next_batch), run, boost::placeholders::_1)); + bind(mem_fn(&Run::get_next_batch), run, std::placeholders::_1)); _sorted_runs.pop_front(); _merging_runs.push_back(run); } diff --git a/be/src/runtime/thread_resource_mgr.h b/be/src/runtime/thread_resource_mgr.h index 7f8482451e..ca8840e836 100644 --- a/be/src/runtime/thread_resource_mgr.h +++ b/be/src/runtime/thread_resource_mgr.h @@ -20,10 +20,10 @@ #include -#include #include #include #include +#include #include #include @@ -76,7 +76,7 @@ public: // variable semantics). // TODO: this is manageable now since it just needs to call into the io // mgr. What's the best model for something more general. - typedef boost::function thread_available_cb; + typedef std::function thread_available_cb; // Pool abstraction for a single resource pool. // TODO: this is not quite sufficient going forward. We need a hierarchy of pools, diff --git a/be/src/util/hash_util.hpp b/be/src/util/hash_util.hpp index ccbc79bfa8..7d053a1709 100644 --- a/be/src/util/hash_util.hpp +++ b/be/src/util/hash_util.hpp @@ -18,8 +18,8 @@ #ifndef DORIS_BE_SRC_COMMON_UTIL_HASH_UTIL_HPP #define DORIS_BE_SRC_COMMON_UTIL_HASH_UTIL_HPP -#include "common/logging.h" #include "common/compiler_util.h" +#include "common/logging.h" // For cross compiling with clang, we need to be able to generate an IR file with // no sse instructions. Attempting to load a precompiled IR file that contains @@ -29,9 +29,10 @@ #include #endif #include + +#include "gen_cpp/Types_types.h" #include "util/cpu_info.h" #include "util/murmur_hash3.h" -#include "gen_cpp/Types_types.h" namespace doris { @@ -134,27 +135,33 @@ public: const uint32_t c1 = 0xcc9e2d51; const uint32_t c2 = 0x1b873593; - const uint32_t * blocks = (const uint32_t *)(data + nblocks * 4); + const uint32_t* blocks = (const uint32_t*)(data + nblocks * 4); - for(int i = -nblocks; i; i++) { + for (int i = -nblocks; i; i++) { uint32_t k1 = blocks[i]; k1 *= c1; - k1 = rotl32(k1,15); + k1 = rotl32(k1, 15); k1 *= c2; h1 ^= k1; - h1 = rotl32(h1,13); + h1 = rotl32(h1, 13); h1 = h1 * 5 + 0xe6546b64; } - const uint8_t * tail = (const uint8_t*)(data + nblocks * 4); + const uint8_t* tail = (const uint8_t*)(data + nblocks * 4); uint32_t k1 = 0; - switch(len & 3) { - case 3: k1 ^= tail[2] << 16; - case 2: k1 ^= tail[1] << 8; - case 1: k1 ^= tail[0]; - k1 *= c1; k1 = rotl32(k1,15); k1 *= c2; h1 ^= k1; + switch (len & 3) { + case 3: + k1 ^= tail[2] << 16; + case 2: + k1 ^= tail[1] << 8; + case 1: + k1 ^= tail[0]; + k1 *= c1; + k1 = rotl32(k1, 15); + k1 *= c2; + h1 ^= k1; }; h1 ^= len; @@ -182,21 +189,21 @@ public: const uint8_t* data2 = reinterpret_cast(data); switch (len & 7) { - case 7: - h ^= uint64_t(data2[6]) << 48; - case 6: - h ^= uint64_t(data2[5]) << 40; - case 5: - h ^= uint64_t(data2[4]) << 32; - case 4: - h ^= uint64_t(data2[3]) << 24; - case 3: - h ^= uint64_t(data2[2]) << 16; - case 2: - h ^= uint64_t(data2[1]) << 8; - case 1: - h ^= uint64_t(data2[0]); - h *= MURMUR_PRIME; + case 7: + h ^= uint64_t(data2[6]) << 48; + case 6: + h ^= uint64_t(data2[5]) << 40; + case 5: + h ^= uint64_t(data2[4]) << 32; + case 4: + h ^= uint64_t(data2[3]) << 24; + case 3: + h ^= uint64_t(data2[2]) << 16; + case 2: + h ^= uint64_t(data2[1]) << 8; + case 1: + h ^= uint64_t(data2[0]); + h *= MURMUR_PRIME; } h ^= h >> MURMUR_R; @@ -207,7 +214,7 @@ public: // default values recommended by http://isthe.com/chongo/tech/comp/fnv/ static const uint32_t FNV_PRIME = 0x01000193; // 16777619 - static const uint32_t FNV_SEED = 0x811C9DC5; // 2166136261 + static const uint32_t FNV_SEED = 0x811C9DC5; // 2166136261 static const uint64_t FNV64_PRIME = 1099511628211UL; static const uint64_t FNV64_SEED = 14695981039346656037UL; static const uint64_t MURMUR_PRIME = 0xc6a4a7935bd1e995ULL; @@ -243,24 +250,24 @@ public: // Our hash function is MurmurHash2, 64 bit version. // It was modified in order to provide the same result in // big and little endian archs (endian neutral). - static uint64_t murmur_hash64A (const void* key, int32_t len, unsigned int seed) { + static uint64_t murmur_hash64A(const void* key, int32_t len, unsigned int seed) { const uint64_t m = MURMUR_PRIME; const int r = 47; uint64_t h = seed ^ (len * m); - const uint8_t *data = (const uint8_t *)key; - const uint8_t *end = data + (len-(len&7)); + const uint8_t* data = (const uint8_t*)key; + const uint8_t* end = data + (len - (len & 7)); - while(data != end) { + while (data != end) { uint64_t k; #if (BYTE_ORDER == BIG_ENDIAN) - k = (uint64_t) data[0]; - k |= (uint64_t) data[1] << 8; - k |= (uint64_t) data[2] << 16; - k |= (uint64_t) data[3] << 24; - k |= (uint64_t) data[4] << 32; - k |= (uint64_t) data[5] << 40; - k |= (uint64_t) data[6] << 48; - k |= (uint64_t) data[7] << 56; + k = (uint64_t)data[0]; + k |= (uint64_t)data[1] << 8; + k |= (uint64_t)data[2] << 16; + k |= (uint64_t)data[3] << 24; + k |= (uint64_t)data[4] << 32; + k |= (uint64_t)data[5] << 40; + k |= (uint64_t)data[6] << 48; + k |= (uint64_t)data[7] << 56; #else k = *((uint64_t*)data); #endif @@ -273,15 +280,22 @@ public: data += 8; } - switch(len & 7) { - case 7: h ^= (uint64_t)data[6] << 48; - case 6: h ^= (uint64_t)data[5] << 40; - case 5: h ^= (uint64_t)data[4] << 32; - case 4: h ^= (uint64_t)data[3] << 24; - case 3: h ^= (uint64_t)data[2] << 16; - case 2: h ^= (uint64_t)data[1] << 8; - case 1: h ^= (uint64_t)data[0]; - h *= m; + switch (len & 7) { + case 7: + h ^= (uint64_t)data[6] << 48; + case 6: + h ^= (uint64_t)data[5] << 40; + case 5: + h ^= (uint64_t)data[4] << 32; + case 4: + h ^= (uint64_t)data[3] << 24; + case 3: + h ^= (uint64_t)data[2] << 16; + case 2: + h ^= (uint64_t)data[1] << 8; + case 1: + h ^= (uint64_t)data[0]; + h *= m; }; h ^= h >> r; @@ -323,14 +337,20 @@ public: murmur_hash3_x64_64(data, bytes, seed, &hash); return hash; #endif - + } + // hash_combine is the same with boost hash_combine, + // except replace boost::hash with std::hash + template + static inline void hash_combine(std::size_t& seed, const T& v) { + std::hash hasher; + seed ^= hasher(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2); } }; -} +} // namespace doris namespace std { -template<> +template <> struct hash { std::size_t operator()(const doris::TUniqueId& id) const { std::size_t seed = 0; @@ -340,7 +360,7 @@ struct hash { } }; -template<> +template <> struct hash { size_t operator()(const doris::TNetworkAddress& address) const { std::size_t seed = 0; @@ -352,7 +372,7 @@ struct hash { #if !defined(IR_COMPILE) && __GNUC__ < 6 // Cause this is builtin function -template<> +template <> struct hash<__int128> { std::size_t operator()(const __int128& val) const { return doris::HashUtil::hash(&val, sizeof(val), 0); @@ -360,7 +380,7 @@ struct hash<__int128> { }; #endif -template<> +template <> struct hash> { size_t operator()(const std::pair& pair) const { size_t seed = 0; @@ -371,6 +391,6 @@ struct hash> { } }; -} +} // namespace std #endif diff --git a/be/src/util/internal_queue.h b/be/src/util/internal_queue.h index 8338ee713d..64d0c3ca8a 100644 --- a/be/src/util/internal_queue.h +++ b/be/src/util/internal_queue.h @@ -18,7 +18,7 @@ #ifndef DORIS_BE_SRC_UTIL_INTERNAL_QUEUE_H #define DORIS_BE_SRC_UTIL_INTERNAL_QUEUE_H -#include +#include #include #include "util/fake_lock.h" @@ -240,7 +240,7 @@ public: // Iterate over elements of queue, calling 'fn' for each element. If 'fn' returns // false, terminate iteration. It is invalid to call other InternalQueue methods // from 'fn'. - void iterate(boost::function fn) { + void iterate(std::function fn) { std::lock_guard lock(lock_); for (Node* current = head_; current != NULL; current = current->next_node) { if (!fn(reinterpret_cast(current))) return; diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/priority_thread_pool.hpp index 18d15ebc49..b39145365f 100644 --- a/be/src/util/priority_thread_pool.hpp +++ b/be/src/util/priority_thread_pool.hpp @@ -33,7 +33,7 @@ public: // Signature of a work-processing function. Takes the integer id of the thread which is // calling it (ids run from 0 to num_threads - 1) and a reference to the item to // process. - typedef boost::function WorkFunction; + typedef std::function WorkFunction; struct Task { public: @@ -57,7 +57,7 @@ public: : _work_queue(queue_size), _shutdown(false) { for (int i = 0; i < num_threads; ++i) { _threads.create_thread( - boost::bind(boost::mem_fn(&PriorityThreadPool::work_thread), this, i)); + std::bind(std::mem_fn(&PriorityThreadPool::work_thread), this, i)); } } diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h index b0b9416136..7b562eed46 100644 --- a/be/src/util/runtime_profile.h +++ b/be/src/util/runtime_profile.h @@ -21,9 +21,9 @@ #include #include -#include #include #include +#include #include #include @@ -187,7 +187,7 @@ public: AtomicInt64 current_value_; }; - typedef boost::function DerivedCounterFunction; + typedef std::function DerivedCounterFunction; // A DerivedCounter also has a name and type, but the value is computed. // Do not call Set() and Update(). @@ -392,7 +392,7 @@ public: // Function that returns a counter metric. // Note: this function should not block (or take a long time). - typedef boost::function SampleFn; + typedef std::function SampleFn; // Add a rate counter to the current profile based on src_counter with name. // The rate counter is updated periodically based on the src counter. diff --git a/be/src/util/string_util.cpp b/be/src/util/string_util.cpp index dc29a9684e..bbd3077167 100644 --- a/be/src/util/string_util.cpp +++ b/be/src/util/string_util.cpp @@ -18,6 +18,7 @@ #include "util/string_util.h" #include "gutil/strings/split.h" +#include "util/hash_util.hpp" namespace doris { @@ -25,7 +26,7 @@ size_t hash_of_path(const std::string& identifier, const std::string& path) { size_t hash = std::hash()(identifier); std::vector path_parts = strings::Split(path, "/", strings::SkipWhitespace()); for (auto& part : path_parts) { - boost::hash_combine(hash, part); + HashUtil::hash_combine(hash, part); } return hash; } diff --git a/be/src/util/string_util.h b/be/src/util/string_util.h index 0f1a51a6f9..5f15ed4837 100644 --- a/be/src/util/string_util.h +++ b/be/src/util/string_util.h @@ -19,7 +19,6 @@ #include #include // to_lower_copy -#include #include #include #include diff --git a/be/src/util/thrift_rpc_helper.cpp b/be/src/util/thrift_rpc_helper.cpp index f6d51e313d..63918ab608 100644 --- a/be/src/util/thrift_rpc_helper.cpp +++ b/be/src/util/thrift_rpc_helper.cpp @@ -17,7 +17,6 @@ #include "util/thrift_rpc_helper.h" -#include #include #include diff --git a/be/src/util/uid_util.h b/be/src/util/uid_util.h index 3979a9a56d..8a5a89aaeb 100644 --- a/be/src/util/uid_util.h +++ b/be/src/util/uid_util.h @@ -18,7 +18,6 @@ #ifndef DORIS_BE_SRC_UTIL_UID_UTIL_H #define DORIS_BE_SRC_UTIL_UID_UTIL_H -#include #include #include @@ -145,8 +144,8 @@ struct UniqueId { // This function must be called 'hash_value' to be picked up by boost. inline std::size_t hash_value(const doris::TUniqueId& id) { std::size_t seed = 0; - boost::hash_combine(seed, id.lo); - boost::hash_combine(seed, id.hi); + HashUtil::hash_combine(seed, id.lo); + HashUtil::hash_combine(seed, id.hi); return seed; } diff --git a/be/src/util/uuid_generator.h b/be/src/util/uuid_generator.h index 5d3aab0cea..0a78ca9b8c 100644 --- a/be/src/util/uuid_generator.h +++ b/be/src/util/uuid_generator.h @@ -17,7 +17,6 @@ #pragma once -#include #include #include #include diff --git a/be/test/runtime/buffered_block_mgr2_test.cpp b/be/test/runtime/buffered_block_mgr2_test.cpp index 1e34e8ca01..bf2cfd4452 100644 --- a/be/test/runtime/buffered_block_mgr2_test.cpp +++ b/be/test/runtime/buffered_block_mgr2_test.cpp @@ -529,8 +529,8 @@ protected: boost::thread_group workers; for (int i = 0; i < num_threads; ++i) { - thread* t = new boost::thread(boost::bind(&BufferedBlockMgrTest::TestRandomInternalImpl, - this, state, block_mgr, max_num_buffers, i)); + thread* t = new boost::thread(std::bind(&BufferedBlockMgrTest::TestRandomInternalImpl, + this, state, block_mgr, max_num_buffers, i)); workers.add_thread(t); } workers.join_all(); @@ -561,7 +561,7 @@ protected: _test_env->exec_env()); for (int i = 0; i < num_threads; ++i) { thread* t = new boost::thread( - boost::bind(&BufferedBlockMgrTest::CreateDestroyThread, this, i, shared_state)); + std::bind(&BufferedBlockMgrTest::CreateDestroyThread, this, i, shared_state)); workers.add_thread(t); } workers.join_all(); diff --git a/be/test/runtime/disk_io_mgr_test.cpp b/be/test/runtime/disk_io_mgr_test.cpp index cc55459e33..f1a64e3e2b 100644 --- a/be/test/runtime/disk_io_mgr_test.cpp +++ b/be/test/runtime/disk_io_mgr_test.cpp @@ -36,7 +36,7 @@ using std::list; using std::lock_guard; using std::unique_lock; using std::mutex; -using boost::mem_fn; +using std::mem_fn; using std::condition_variable; using boost::scoped_ptr; using boost::thread; diff --git a/be/test/runtime/thread_resource_mgr_test.cpp b/be/test/runtime/thread_resource_mgr_test.cpp index c96f358fc1..711852fdb5 100644 --- a/be/test/runtime/thread_resource_mgr_test.cpp +++ b/be/test/runtime/thread_resource_mgr_test.cpp @@ -49,7 +49,7 @@ TEST(ThreadResourceMgr, BasicTest) { ThreadResourceMgr::ResourcePool* c1 = mgr.register_pool(); c1->set_thread_available_cb( - boost::bind(boost::mem_fn(&NotifiedCounter::Notify), &counter1, _1)); + std::bind(std::mem_fn(&NotifiedCounter::Notify), &counter1, _1)); c1->acquire_thread_token(); c1->acquire_thread_token(); c1->acquire_thread_token(); @@ -76,7 +76,7 @@ TEST(ThreadResourceMgr, BasicTest) { // Register a new consumer, quota is cut in half ThreadResourceMgr::ResourcePool* c2 = mgr.register_pool(); c2->set_thread_available_cb( - boost::bind(boost::mem_fn(&NotifiedCounter::Notify), &counter2, _1)); + std::bind(std::mem_fn(&NotifiedCounter::Notify), &counter2, _1)); EXPECT_FALSE(c1->try_acquire_thread_token()); EXPECT_EQ(c1->num_threads(), 3); c1->acquire_thread_token(); diff --git a/be/test/util/blocking_queue_test.cpp b/be/test/util/blocking_queue_test.cpp index 4dbf1b47b2..94dcd8cbf4 100644 --- a/be/test/util/blocking_queue_test.cpp +++ b/be/test/util/blocking_queue_test.cpp @@ -92,15 +92,15 @@ public: void Run() { for (int i = 0; i < _nthreads; ++i) { _threads.push_back(boost::shared_ptr( - new boost::thread(boost::bind(&MultiThreadTest::inserter_thread, this, i)))); + new boost::thread(std::bind(&MultiThreadTest::inserter_thread, this, i)))); _threads.push_back(boost::shared_ptr( - new boost::thread(boost::bind(&MultiThreadTest::RemoverThread, this)))); + new boost::thread(std::bind(&MultiThreadTest::RemoverThread, this)))); } // We add an extra thread to ensure that there aren't enough elements in // the queue to go around. This way, we test removal after shutdown. _threads.push_back(boost::shared_ptr( - new boost::thread(boost::bind(&MultiThreadTest::RemoverThread, this)))); + new boost::thread(std::bind(&MultiThreadTest::RemoverThread, this)))); for (int i = 0; i < _threads.size(); ++i) { _threads[i]->join();