diff --git a/be/src/agent/topic_subscriber.cpp b/be/src/agent/topic_subscriber.cpp index abc9b305ac..ec040c977d 100644 --- a/be/src/agent/topic_subscriber.cpp +++ b/be/src/agent/topic_subscriber.cpp @@ -38,13 +38,13 @@ TopicSubscriber::~TopicSubscriber() { void TopicSubscriber::register_listener(TTopicType::type topic_type, TopicListener* listener) { // Unique lock here to prevent access to listeners - boost::unique_lock lock(_listener_mtx); + std::unique_lock lock(_listener_mtx); this->_registered_listeners[topic_type].push_back(listener); } void TopicSubscriber::handle_updates(const TAgentPublishRequest& agent_publish_request) { // Shared lock here in order to avoid updates in listeners' map - boost::shared_lock lock(_listener_mtx); + std::shared_lock lock(_listener_mtx); // Currently, not deal with protocol version, the listener should deal with protocol version const std::vector& topic_updates = agent_publish_request.updates; std::vector::const_iterator topic_update_it = topic_updates.begin(); diff --git a/be/src/agent/topic_subscriber.h b/be/src/agent/topic_subscriber.h index be71cda3c4..6048c997f3 100644 --- a/be/src/agent/topic_subscriber.h +++ b/be/src/agent/topic_subscriber.h @@ -20,6 +20,7 @@ #include #include +#include #include "agent/topic_listener.h" #include "gen_cpp/AgentService_types.h" @@ -37,7 +38,7 @@ public: private: std::map> _registered_listeners; - boost::shared_mutex _listener_mtx; + std::shared_mutex _listener_mtx; }; } // namespace doris #endif diff --git a/be/src/common/names.h b/be/src/common/names.h deleted file mode 100644 index 890b562e5b..0000000000 --- a/be/src/common/names.h +++ /dev/null @@ -1,174 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include - -#ifdef _GLIBCXX_VECTOR -using std::vector; -#endif - -#ifdef _GLIBCXX_MAP -using std::map; -using std::multimap; -#endif - -#ifdef _GLIBCXX_LIST -using std::list; -#endif - -#ifdef _GLIBCXX_SET -using std::set; -using std::multiset; -#endif - -#ifdef _GLIBCXX_STACK -using std::stack; -#endif - -#ifdef _GLIBCXX_QUEUE -using std::queue; -#endif - -#ifdef _GLIBCXX_DEQUE -using std::deque; -#endif - -#ifdef _GLIBCXX_STRING -using std::string; -#endif - -#ifdef _GLIBCXX_IOSTREAM -using std::cout; -using std::cin; -using std::cerr; -#endif - -#ifdef _GLIBCXX_OSTREAM -using std::ostream; -using std::endl; -#endif - -#ifdef _GLIBCXX_IOS -using std::fixed; -using std::hex; -using std::oct; -using std::dec; -using std::left; -using std::ios; -#endif - -#ifdef _GLIBCXX_IOMANIP -using std::setprecision; -using std::setfill; -using std::setw; -#endif - -#ifdef _GLIBCXX_FSTREAM -using std::fstream; -using std::ifstream; -using std::ofstream; -#endif - -#ifdef _GLIBCXX_SSTREAM -using std::stringstream; -using std::istringstream; -using std::ostringstream; -#endif - -#ifdef _GLIBCXX_ALGORITHM -using std::swap; -using std::min; -using std::max; -using std::sort; -#endif - -#ifdef _GLIBCXX_MEMORY -using std::make_shared; -using std::shared_ptr; -using std::unique_ptr; -#endif - -#ifdef _GLIBCXX_UTILITY -using std::move; -#endif - -#ifdef _NEW -using std::nothrow; -#endif - -#ifdef BOOST_THREAD_THREAD_COMMON_HPP -using boost::thread; -#endif - -#ifdef BOOST_THREAD_DETAIL_THREAD_GROUP_HPP -using boost::thread_group; -#endif - -#ifdef BOOST_THREAD_MUTEX_HPP -using boost::mutex; -using boost::try_mutex; -#endif - -#ifdef BOOST_LEXICAL_CAST_INCLUDED -using boost::lexical_cast; -#endif - -#ifdef BOOST_THREAD_PTHREAD_SHARED_MUTEX_HPP -using boost::shared_mutex; -#endif - -/// In older versions of boost, when including mutex.hpp, it would include locks.hpp that -/// would in turn provide lock_guard<>. In more recent versions, including mutex.hpp would -/// include lock_types.hpp that does not provide lock_guard<>. This check verifies if boost -/// locks have been included and makes sure to only include lock_guard if the provided lock -/// implementations were not included using lock_types.hpp (for older boost versions) or if -/// lock_guard.hpp was explicitly included. -#if (defined(BOOST_THREAD_LOCKS_HPP) && BOOST_VERSION < 105300) || \ - defined(BOOST_THREAD_LOCK_GUARD_HPP) -using boost::lock_guard; -#endif - -#if defined(BOOST_THREAD_LOCKS_HPP) || defined(BOOST_THREAD_LOCK_TYPES_HPP) -using boost::unique_lock; -using boost::shared_lock; -using boost::upgrade_lock; -#endif - -#ifdef BOOST_SMART_PTR_SCOPED_PTR_HPP_INCLUDED -using boost::scoped_ptr; -#endif - -#ifdef BOOST_UNORDERED_MAP_HPP_INCLUDED -using boost::unordered_map; -#endif - -#ifdef BOOST_UNORDERED_SET_HPP_INCLUDED -using boost::unordered_set; -#endif - -#ifdef BOOST_FUNCTION_PROLOGUE_HPP -using boost::function; -#endif - -#ifdef BOOST_BIND_HPP_INCLUDED -using boost::bind; -using boost::mem_fn; -#endif - -#ifdef STRINGS_SUBSTITUTE_H_ -using strings::Substitute; -#endif diff --git a/be/src/common/object_pool.h b/be/src/common/object_pool.h index 054a4108b1..78cde83f50 100644 --- a/be/src/common/object_pool.h +++ b/be/src/common/object_pool.h @@ -18,8 +18,7 @@ #ifndef DORIS_BE_SRC_COMMON_COMMON_OBJECT_POOL_H #define DORIS_BE_SRC_COMMON_COMMON_OBJECT_POOL_H -#include -#include +#include #include #include "util/spinlock.h" @@ -41,13 +40,13 @@ public: // TODO: Consider using a lock-free structure. SpecificElement* obj = new SpecificElement(t); DCHECK(obj != NULL); - boost::lock_guard l(_lock); + std::lock_guard l(_lock); _objects.push_back(obj); return t; } void clear() { - boost::lock_guard l(_lock); + std::lock_guard l(_lock); for (auto i = _objects.rbegin(); i != _objects.rend(); ++i) { delete *i; } diff --git a/be/src/exec/cross_join_node.h b/be/src/exec/cross_join_node.h index 54cb4fb483..76bca882c3 100644 --- a/be/src/exec/cross_join_node.h +++ b/be/src/exec/cross_join_node.h @@ -20,8 +20,8 @@ #include #include -#include #include +#include #include "exec/blocking_join_node.h" #include "exec/exec_node.h" diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index f0decce8f3..74baa27e68 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -50,8 +50,8 @@ using std::string; using std::stringstream; using std::vector; using std::map; -using boost::lock_guard; -using boost::mutex; +using std::lock_guard; +using std::mutex; // Superclass of all executor nodes. // All subclasses need to make sure to check RuntimeState::is_cancelled() @@ -304,7 +304,7 @@ protected: // Execution options that are determined at runtime. This is added to the // runtime profile at close(). Examples for options logged here would be // "Codegen Enabled" - boost::mutex _exec_options_lock; + std::mutex _exec_options_lock; std::string _runtime_exec_options; /// Buffer pool client for this node. Initialized with the node's minimum reservation diff --git a/be/src/exec/hash_join_node.h b/be/src/exec/hash_join_node.h index 63e0505031..bfe2b36e96 100644 --- a/be/src/exec/hash_join_node.h +++ b/be/src/exec/hash_join_node.h @@ -20,8 +20,8 @@ #include #include -#include #include +#include #include "exec/exec_node.h" #include "exec/hash_table.h" @@ -67,7 +67,7 @@ private: bool _is_push_down; // for right outer joins, keep track of what's been joined - typedef boost::unordered_set BuildTupleRowSet; + typedef std::unordered_set BuildTupleRowSet; BuildTupleRowSet _joined_build_rows; TJoinOp::type _join_op; @@ -106,7 +106,7 @@ private: boost::scoped_ptr _probe_batch; int _probe_batch_pos; // current scan pos in _probe_batch int _probe_counter; - bool _probe_eos; // if true, probe child has no more rows to process + bool _probe_eos; // if true, probe child has no more rows to process TupleRow* _current_probe_row; // _build_tuple_idx[i] is the tuple index of child(1)'s tuple[i] in the output row diff --git a/be/src/exec/merge_join_node.h b/be/src/exec/merge_join_node.h index 3574520b10..5b15eef7cd 100644 --- a/be/src/exec/merge_join_node.h +++ b/be/src/exec/merge_join_node.h @@ -20,8 +20,8 @@ #include #include -#include #include +#include #include "exec/exec_node.h" #include "gen_cpp/PlanNodes_types.h" // for TJoinOp diff --git a/be/src/exec/schema_scanner/schema_helper.cpp b/be/src/exec/schema_scanner/schema_helper.cpp index d1174bd459..0824705fa2 100644 --- a/be/src/exec/schema_scanner/schema_helper.cpp +++ b/be/src/exec/schema_scanner/schema_helper.cpp @@ -18,7 +18,6 @@ #include "exec/schema_scanner/schema_helper.h" #include -#include #include #include diff --git a/be/src/exprs/agg_fn_evaluator.h b/be/src/exprs/agg_fn_evaluator.h index 855ca15526..ca7b8134d2 100644 --- a/be/src/exprs/agg_fn_evaluator.h +++ b/be/src/exprs/agg_fn_evaluator.h @@ -143,8 +143,6 @@ public: static const size_t DOUBLE_SIZE = sizeof(double); static const size_t DECIMAL_SIZE = sizeof(DecimalValue); static const size_t DECIMALV2_SIZE = sizeof(DecimalV2Value); - static const size_t TIME_DURATION_SIZE = sizeof(boost::posix_time::time_duration); - static const size_t DATE_SIZE = sizeof(boost::gregorian::date); static const size_t LARGEINT_SIZE = sizeof(__int128); // DATETIME VAL has two part: packet_time is 8 byte, and type is 4 byte // MySQL packet time : int64_t packed_time; diff --git a/be/src/exprs/in_predicate.h b/be/src/exprs/in_predicate.h index 55fa7433df..d544911a38 100644 --- a/be/src/exprs/in_predicate.h +++ b/be/src/exprs/in_predicate.h @@ -19,8 +19,8 @@ #define DORIS_BE_SRC_QUERY_EXPRS_IN_PREDICATE_H #include -#include #include +#include #include "exprs/hybrid_set.h" #include "exprs/predicate.h" diff --git a/be/src/exprs/new_agg_fn_evaluator.h b/be/src/exprs/new_agg_fn_evaluator.h index ca13c962c6..ecf13d7376 100644 --- a/be/src/exprs/new_agg_fn_evaluator.h +++ b/be/src/exprs/new_agg_fn_evaluator.h @@ -163,8 +163,6 @@ public: static const size_t DOUBLE_SIZE = sizeof(double); static const size_t DECIMAL_SIZE = sizeof(DecimalValue); static const size_t DECIMALV2_SIZE = sizeof(DecimalV2Value); - static const size_t TIME_DURATION_SIZE = sizeof(boost::posix_time::time_duration); - static const size_t DATE_SIZE = sizeof(boost::gregorian::date); static const size_t LARGEINT_SIZE = sizeof(__int128); // DATETIME VAL has two part: packet_time is 8 byte, and type is 4 byte diff --git a/be/src/exprs/new_in_predicate.h b/be/src/exprs/new_in_predicate.h index 111b30200d..161700734c 100644 --- a/be/src/exprs/new_in_predicate.h +++ b/be/src/exprs/new_in_predicate.h @@ -334,7 +334,7 @@ private: bool contains_null; /// The set of all non-NULL constant values in the IN list. - /// Note: boost::unordered_set and std::binary_search performed worse based on the + /// Note: std::unordered_set and std::binary_search performed worse based on the /// in-predicate-benchmark std::set val_set; diff --git a/be/src/http/action/restore_tablet_action.cpp b/be/src/http/action/restore_tablet_action.cpp index 29cdc5f11d..d207ca54c8 100644 --- a/be/src/http/action/restore_tablet_action.cpp +++ b/be/src/http/action/restore_tablet_action.cpp @@ -23,7 +23,6 @@ #include #include "agent/cgroups_mgr.h" -#include "boost/lexical_cast.hpp" #include "env/env.h" #include "gutil/strings/substitute.h" // for Substitute #include "http/http_channel.h" diff --git a/be/src/http/web_page_handler.cpp b/be/src/http/web_page_handler.cpp index 98eb9fc3e1..a4424c930b 100644 --- a/be/src/http/web_page_handler.cpp +++ b/be/src/http/web_page_handler.cpp @@ -74,7 +74,7 @@ void WebPageHandler::register_template_page(const std::string& path, const strin void WebPageHandler::register_page(const std::string& path, const string& alias, const PageHandlerCallback& callback, bool is_on_nav_bar) { - boost::mutex::scoped_lock lock(_map_lock); + std::unique_lock lock(_map_lock); CHECK(_page_map.find(path) == _page_map.end()); // first time, register this to web server _http_server->register_handler(HttpMethod::GET, path, this); @@ -85,7 +85,7 @@ void WebPageHandler::handle(HttpRequest* req) { VLOG_TRACE << req->debug_string(); PathHandler* handler = nullptr; { - boost::mutex::scoped_lock lock(_map_lock); + std::unique_lock lock(_map_lock); auto iter = _page_map.find(req->raw_path()); if (iter != _page_map.end()) { handler = iter->second; diff --git a/be/src/http/web_page_handler.h b/be/src/http/web_page_handler.h index b697d8693c..bf47e0033e 100644 --- a/be/src/http/web_page_handler.h +++ b/be/src/http/web_page_handler.h @@ -19,8 +19,8 @@ #define DORIS_BE_SRC_COMMON_UTIL_WEB_PAGE_HANDLER_H #include -#include #include +#include #include #include #include @@ -112,7 +112,7 @@ private: std::string _www_path; EvHttpServer* _http_server; // Lock guarding the _path_handlers map - boost::mutex _map_lock; + std::mutex _map_lock; // Map of path to a PathHandler containing a list of handlers for that // path. More than one handler may register itself with a path so that many // components may contribute to a single page. diff --git a/be/src/olap/olap_index.h b/be/src/olap/olap_index.h index 1a5e7e129e..08f6f6cdca 100644 --- a/be/src/olap/olap_index.h +++ b/be/src/olap/olap_index.h @@ -18,9 +18,9 @@ #ifndef DORIS_BE_SRC_OLAP_OLAP_INDEX_H #define DORIS_BE_SRC_OLAP_OLAP_INDEX_H -#include -#include +#include #include +#include #include #include #include diff --git a/be/src/olap/rowset/segment_group.cpp b/be/src/olap/rowset/segment_group.cpp index c1d7fa2a99..f7c4aaa96c 100644 --- a/be/src/olap/rowset/segment_group.cpp +++ b/be/src/olap/rowset/segment_group.cpp @@ -359,7 +359,7 @@ OLAPStatus SegmentGroup::load(bool use_cache) { return OLAP_SUCCESS; } OLAPStatus res = OLAP_ERR_INDEX_LOAD_ERROR; - boost::lock_guard guard(_index_load_lock); + std::lock_guard guard(_index_load_lock); if (_index_loaded) { return OLAP_SUCCESS; @@ -704,7 +704,7 @@ OLAPStatus SegmentGroup::finalize_segment(uint32_t data_segment_size, int64_t nu } VLOG_NOTICE << "finalize_segment. file_name=" << _current_file_handler.file_name() - << ", file_length=" << file_length; + << ", file_length=" << file_length; if ((res = _current_file_handler.close()) != OLAP_SUCCESS) { OLAP_LOG_WARNING("close file error. [err=%m]"); @@ -811,8 +811,8 @@ OLAPStatus SegmentGroup::convert_from_old_files(const std::string& snapshot_path << ", to=" << new_data_file_name << ", errno=" << Errno::no(); return OLAP_ERR_OS_ERROR; } else { - VLOG_NOTICE << "link data file from " << old_data_file_name << " to " << new_data_file_name - << " successfully"; + VLOG_NOTICE << "link data file from " << old_data_file_name << " to " + << new_data_file_name << " successfully"; } success_links->push_back(new_data_file_name); std::string new_index_file_name = @@ -831,7 +831,7 @@ OLAPStatus SegmentGroup::convert_from_old_files(const std::string& snapshot_path return OLAP_ERR_OS_ERROR; } else { VLOG_NOTICE << "link index file from " << old_index_file_name << " to " - << new_index_file_name << " successfully"; + << new_index_file_name << " successfully"; } success_links->push_back(new_index_file_name); } @@ -856,7 +856,7 @@ OLAPStatus SegmentGroup::convert_to_old_files(const std::string& snapshot_path, success_links->push_back(old_data_file_name); } VLOG_NOTICE << "create hard link. from=" << new_data_file_name << ", " - << "to=" << old_data_file_name; + << "to=" << old_data_file_name; std::string new_index_file_name = construct_index_file_path(_rowset_path_prefix, segment_id); std::string old_index_file_name = construct_old_index_file_path(snapshot_path, segment_id); @@ -870,7 +870,7 @@ OLAPStatus SegmentGroup::convert_to_old_files(const std::string& snapshot_path, success_links->push_back(old_index_file_name); } VLOG_NOTICE << "create hard link. from=" << new_index_file_name << ", " - << "to=" << old_index_file_name; + << "to=" << old_index_file_name; } return OLAP_SUCCESS; } diff --git a/be/src/olap/rowset/segment_group.h b/be/src/olap/rowset/segment_group.h index 79bfd1edf4..e5a033f5e3 100644 --- a/be/src/olap/rowset/segment_group.h +++ b/be/src/olap/rowset/segment_group.h @@ -18,9 +18,9 @@ #ifndef DORIS_BE_SRC_OLAP_ROWSET_SEGMENT_GROUP_H #define DORIS_BE_SRC_OLAP_ROWSET_SEGMENT_GROUP_H -#include -#include +#include #include +#include #include #include #include @@ -305,7 +305,7 @@ private: bool _empty; // Lock held while loading the index. - mutable boost::mutex _index_load_lock; + mutable std::mutex _index_load_lock; size_t _current_num_rows_per_row_block; std::vector> _zone_maps; diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index 3846fa525e..5041aa6c3c 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -95,7 +95,7 @@ Status BufferControlBlock::init() { } Status BufferControlBlock::add_batch(TFetchDataResult* result) { - boost::unique_lock l(_lock); + std::unique_lock l(_lock); if (_is_cancelled) { return Status::Cancelled("Cancelled"); @@ -128,7 +128,7 @@ Status BufferControlBlock::add_batch(TFetchDataResult* result) { Status BufferControlBlock::get_batch(TFetchDataResult* result) { TFetchDataResult* item = NULL; { - boost::unique_lock l(_lock); + std::unique_lock l(_lock); while (_batch_queue.empty() && !_is_close && !_is_cancelled) { _data_arrival.wait(l); @@ -172,7 +172,7 @@ Status BufferControlBlock::get_batch(TFetchDataResult* result) { } void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) { - boost::lock_guard l(_lock); + std::lock_guard l(_lock); if (!_status.ok()) { ctx->on_failure(_status); return; @@ -205,7 +205,7 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) { } Status BufferControlBlock::close(Status exec_status) { - boost::unique_lock l(_lock); + std::unique_lock l(_lock); _is_close = true; _status = exec_status; @@ -227,7 +227,7 @@ Status BufferControlBlock::close(Status exec_status) { } Status BufferControlBlock::cancel() { - boost::unique_lock l(_lock); + std::unique_lock l(_lock); _is_cancelled = true; _data_removal.notify_all(); _data_arrival.notify_all(); diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index f3e12e275a..9f497c8a15 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -18,10 +18,10 @@ #ifndef DORIS_BE_RUNTIME_BUFFER_CONTROL_BLOCK_H #define DORIS_BE_RUNTIME_BUFFER_CONTROL_BLOCK_H -#include -#include +#include #include #include +#include #include "common/status.h" #include "gen_cpp/Types_types.h" @@ -112,11 +112,11 @@ private: // blocking queue for batch ResultQueue _batch_queue; // protects all subsequent data in this block - boost::mutex _lock; + std::mutex _lock; // signal arrival of new batch or the eos/cancelled condition - boost::condition_variable _data_arrival; + std::condition_variable _data_arrival; // signal removal of data by stream consumer - boost::condition_variable _data_removal; + std::condition_variable _data_removal; std::deque _waiting_rpc; diff --git a/be/src/runtime/buffered_block_mgr2.cc b/be/src/runtime/buffered_block_mgr2.cc index 333a863e25..6fd97a8408 100644 --- a/be/src/runtime/buffered_block_mgr2.cc +++ b/be/src/runtime/buffered_block_mgr2.cc @@ -41,11 +41,11 @@ using std::endl; using boost::bind; using boost::mem_fn; -using boost::lock_guard; -using boost::mutex; +using std::lock_guard; +using std::mutex; using boost::scoped_array; using boost::shared_ptr; -using boost::unique_lock; +using std::unique_lock; namespace doris { diff --git a/be/src/runtime/buffered_block_mgr2.h b/be/src/runtime/buffered_block_mgr2.h index 463ae819a6..9c17e8b62f 100644 --- a/be/src/runtime/buffered_block_mgr2.h +++ b/be/src/runtime/buffered_block_mgr2.h @@ -21,7 +21,7 @@ #include #include #include -#include +#include #include "runtime/disk_io_mgr.h" #include "runtime/tmp_file_mgr.h" @@ -274,7 +274,7 @@ public: // Only used if _client_local is true. // TODO: Currently we use _block_mgr->_lock for this condvar. There is no reason to // use that _lock that is already overloaded, see IMPALA-1883. - boost::condition_variable _write_complete_cv; + std::condition_variable _write_complete_cv; // If true, this block is being written out so the underlying buffer can be // transferred to another block from the same client. We don't want this buffer @@ -456,7 +456,7 @@ private: // 2. Using a buffer from the free list (which is populated by moving blocks from // the unpinned list by writing them out). // Must be called with the _lock already taken. This function can block. - Status find_buffer(boost::unique_lock& lock, BufferDescriptor** buffer); + Status find_buffer(std::unique_lock& lock, BufferDescriptor** buffer); // Writes unpinned blocks via DiskIoMgr until one of the following is true: // 1. The number of outstanding writes >= (_block_write_threshold - num free buffers) @@ -521,7 +521,7 @@ private: // used for the blocking condvars: _buffer_available_cv and block->_write_complete_cv. // TODO: We should break the protection of the various structures and usages to // different spinlocks and a mutex to be used in the wait()s, see IMPALA-1883. - boost::mutex _lock; + std::mutex _lock; // If true, init() has been called. bool _initialized; @@ -537,7 +537,7 @@ private: int _non_local_outstanding_writes; // Signal availability of free buffers. - boost::condition_variable _buffer_available_cv; + std::condition_variable _buffer_available_cv; // List of blocks _is_pinned = false AND are not on DiskIoMgr's write queue. // Blocks are added to and removed from the back of the list. (i.e. in LIFO order). @@ -622,7 +622,7 @@ private: // map contains only weak ptrs. BufferedBlockMgr2s that are handed out are shared ptrs. // When all the shared ptrs are no longer referenced, the BufferedBlockMgr2 // d'tor will be called at which point the weak ptr will be removed from the map. - typedef boost::unordered_map> BlockMgrsMap; + typedef std::unordered_map> BlockMgrsMap; static BlockMgrsMap _s_query_to_block_mgrs; // Unowned. diff --git a/be/src/runtime/bufferpool/buffer_pool.h b/be/src/runtime/bufferpool/buffer_pool.h index cdbc4a6108..f7cc1529ae 100644 --- a/be/src/runtime/bufferpool/buffer_pool.h +++ b/be/src/runtime/bufferpool/buffer_pool.h @@ -21,7 +21,6 @@ #include #include -#include #include #include diff --git a/be/src/runtime/bufferpool/buffer_pool_internal.h b/be/src/runtime/bufferpool/buffer_pool_internal.h index 11d4a1cbd7..fdfc1053c5 100644 --- a/be/src/runtime/bufferpool/buffer_pool_internal.h +++ b/be/src/runtime/bufferpool/buffer_pool_internal.h @@ -200,7 +200,7 @@ public: /// Wait for the in-flight write for 'page' to complete. /// 'lock_' must be held by the caller via 'client_lock'. page->buffer_lock should /// not be held. - //void WaitForWrite(boost::unique_lock* client_lock, Page* page); + //void WaitForWrite(std::unique_lock* client_lock, Page* page); /// Test helper: wait for all in-flight writes to complete. /// 'lock_' must not be held by the caller. diff --git a/be/src/runtime/bufferpool/free_list.h b/be/src/runtime/bufferpool/free_list.h index 8ee36ca380..2bde0bdd68 100644 --- a/be/src/runtime/bufferpool/free_list.h +++ b/be/src/runtime/bufferpool/free_list.h @@ -19,7 +19,6 @@ #define DORIS_BE_RUNTIME_BUFFERPOOL_FREE_LIST_H #include -#include #include #include diff --git a/be/src/runtime/bufferpool/reservation_tracker.h b/be/src/runtime/bufferpool/reservation_tracker.h index 1a7eb3fe7c..0c860f61cf 100644 --- a/be/src/runtime/bufferpool/reservation_tracker.h +++ b/be/src/runtime/bufferpool/reservation_tracker.h @@ -21,7 +21,6 @@ #include #include -#include #include #include "common/status.h" diff --git a/be/src/runtime/cache/cache_utils.h b/be/src/runtime/cache/cache_utils.h index 20c1dd9bb9..c6cdab2e7e 100644 --- a/be/src/runtime/cache/cache_utils.h +++ b/be/src/runtime/cache/cache_utils.h @@ -34,8 +34,8 @@ namespace doris { -typedef boost::shared_lock CacheReadLock; -typedef boost::unique_lock CacheWriteLock; +typedef std::shared_lock CacheReadLock; +typedef std::unique_lock CacheWriteLock; //#ifndef PARTITION_CACHE_DEV //#define PARTITION_CACHE_DEV diff --git a/be/src/runtime/cache/result_cache.h b/be/src/runtime/cache/result_cache.h index 0191b62fc4..ffd3b6d83d 100644 --- a/be/src/runtime/cache/result_cache.h +++ b/be/src/runtime/cache/result_cache.h @@ -102,7 +102,7 @@ private: //At the same time, multithreaded reading //Single thread updating and cleaning(only single be, Fe is not affected) - mutable boost::shared_mutex _cache_mtx; + mutable std::shared_mutex _cache_mtx; ResultNodeMap _node_map; //List of result nodes corresponding to SqlKey,last recently used at the tail ResultNodeList _node_list; diff --git a/be/src/runtime/cache/result_node.h b/be/src/runtime/cache/result_node.h index cfe5e1d807..2d5ca0de06 100644 --- a/be/src/runtime/cache/result_node.h +++ b/be/src/runtime/cache/result_node.h @@ -108,7 +108,7 @@ private: typedef int64 PartitionKey; typedef std::list PartitionRowBatchList; -typedef boost::unordered_map PartitionRowBatchMap; +typedef std::unordered_map PartitionRowBatchMap; /** * Cache the result of one SQL, include many partition rowsets. @@ -177,7 +177,7 @@ public: } private: - mutable boost::shared_mutex _node_mtx; + mutable std::shared_mutex _node_mtx; UniqueId _sql_key; ResultNode* _prev; ResultNode* _next; diff --git a/be/src/runtime/client_cache.cpp b/be/src/runtime/client_cache.cpp index c519f15093..a46ef8627a 100644 --- a/be/src/runtime/client_cache.cpp +++ b/be/src/runtime/client_cache.cpp @@ -47,7 +47,7 @@ ClientCacheHelper::~ClientCacheHelper() { Status ClientCacheHelper::get_client(const TNetworkAddress& hostport, client_factory factory_method, void** client_key, int timeout_ms) { - boost::lock_guard lock(_lock); + std::lock_guard lock(_lock); //VLOG_RPC << "get_client(" << hostport << ")"; ClientCacheMap::iterator cache_entry = _client_cache.find(hostport); @@ -78,7 +78,7 @@ Status ClientCacheHelper::get_client(const TNetworkAddress& hostport, client_fac Status ClientCacheHelper::reopen_client(client_factory factory_method, void** client_key, int timeout_ms) { - boost::lock_guard lock(_lock); + std::lock_guard lock(_lock); ClientMap::iterator i = _client_map.find(*client_key); DCHECK(i != _client_map.end()); ThriftClientImpl* info = i->second; @@ -134,7 +134,7 @@ Status ClientCacheHelper::create_client(const TNetworkAddress& hostport, void ClientCacheHelper::release_client(void** client_key) { DCHECK(*client_key != NULL) << "Trying to release NULL client"; - boost::lock_guard lock(_lock); + std::lock_guard lock(_lock); ClientMap::iterator client_map_entry = _client_map.find(*client_key); DCHECK(client_map_entry != _client_map.end()); ThriftClientImpl* info = client_map_entry->second; @@ -163,7 +163,7 @@ void ClientCacheHelper::release_client(void** client_key) { } void ClientCacheHelper::close_connections(const TNetworkAddress& hostport) { - boost::lock_guard lock(_lock); + std::lock_guard lock(_lock); ClientCacheMap::iterator cache_entry = _client_cache.find(hostport); if (cache_entry == _client_cache.end()) { @@ -200,7 +200,7 @@ std::string ClientCacheHelper::debug_string() { void ClientCacheHelper::test_shutdown() { std::vector hostports; { - boost::lock_guard lock(_lock); + std::lock_guard lock(_lock); for (const ClientCacheMap::value_type& i : _client_cache) { hostports.push_back(i.first); } @@ -215,7 +215,7 @@ void ClientCacheHelper::test_shutdown() { void ClientCacheHelper::init_metrics(const std::string& name) { // Not strictly needed if init_metrics is called before any cache // usage, but ensures that _metrics_enabled is published. - boost::lock_guard lock(_lock); + std::lock_guard lock(_lock); _thrift_client_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity( std::string("thrift_client.") + name, {{"name", name}}); diff --git a/be/src/runtime/client_cache.h b/be/src/runtime/client_cache.h index b84db1c3c4..9ca367260e 100644 --- a/be/src/runtime/client_cache.h +++ b/be/src/runtime/client_cache.h @@ -19,13 +19,14 @@ #define DORIS_BE_RUNTIME_CLIENT_CACHE_H #include -#include -#include #include +#include #include +#include #include #include "common/status.h" +#include "util/hash_util.hpp" #include "util/metrics.h" #include "util/thrift_client.h" @@ -98,14 +99,14 @@ private: // Protects all member variables // TODO: have more fine-grained locks or use lock-free data structures, // this isn't going to scale for a high request rate - boost::mutex _lock; + std::mutex _lock; // map from (host, port) to list of client keys for that address - typedef boost::unordered_map> ClientCacheMap; + typedef std::unordered_map> ClientCacheMap; ClientCacheMap _client_cache; // Map from client key back to its associated ThriftClientImpl transport - typedef boost::unordered_map ClientMap; + typedef std::unordered_map ClientMap; ClientMap _client_map; bool _metrics_enabled; @@ -190,13 +191,15 @@ public: typedef ThriftClient Client; ClientCache() : _client_cache_helper() { - _client_factory = boost::bind(boost::mem_fn(&ClientCache::make_client), - this, boost::placeholders::_1, boost::placeholders::_2); + _client_factory = + boost::bind(boost::mem_fn(&ClientCache::make_client), this, + boost::placeholders::_1, boost::placeholders::_2); } ClientCache(int max_cache_size) : _client_cache_helper(max_cache_size) { - _client_factory = boost::bind(boost::mem_fn(&ClientCache::make_client), - this, boost::placeholders::_1, boost::placeholders::_2); + _client_factory = + boost::bind(boost::mem_fn(&ClientCache::make_client), this, + boost::placeholders::_1, boost::placeholders::_2); } // Close all clients connected to the supplied address, (e.g., in diff --git a/be/src/runtime/data_stream_mgr.cpp b/be/src/runtime/data_stream_mgr.cpp index 3940cbc0a9..b8f43d1ec9 100644 --- a/be/src/runtime/data_stream_mgr.cpp +++ b/be/src/runtime/data_stream_mgr.cpp @@ -18,7 +18,6 @@ #include "runtime/data_stream_mgr.h" #include -#include #include #include @@ -37,11 +36,10 @@ namespace doris { DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(data_stream_receiver_count, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_endpoint_count, MetricUnit::NOUNIT); -using boost::mutex; +using std::mutex; using boost::shared_ptr; -using boost::unique_lock; -using boost::try_mutex; -using boost::lock_guard; +using std::unique_lock; +using std::lock_guard; DataStreamMgr::DataStreamMgr() { REGISTER_HOOK_METRIC(data_stream_receiver_count, [this]() { diff --git a/be/src/runtime/data_stream_mgr.h b/be/src/runtime/data_stream_mgr.h index 2fec5ef897..a8f3e87ed5 100644 --- a/be/src/runtime/data_stream_mgr.h +++ b/be/src/runtime/data_stream_mgr.h @@ -19,12 +19,12 @@ #define DORIS_BE_SRC_RUNTIME_DATA_STREAM_MGR_H #include -#include -#include -#include -#include +#include #include +#include #include +#include +#include #include "common/object_pool.h" #include "common/status.h" @@ -91,14 +91,14 @@ private: friend class DataStreamSender; // protects all fields below - boost::mutex _lock; + std::mutex _lock; // map from hash value of fragment instance id/node id pair to stream receivers; // Ownership of the stream revcr is shared between this instance and the caller of // create_recvr(). // we don't want to create a map, DataStreamRecvr*>, // because that requires a bunch of copying of ids for lookup - typedef boost::unordered_multimap> StreamMap; + typedef std::unordered_multimap> StreamMap; StreamMap _receiver_map; // less-than ordering for pair diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc index 213b7dc851..b1f4443bc9 100644 --- a/be/src/runtime/data_stream_recvr.cc +++ b/be/src/runtime/data_stream_recvr.cc @@ -19,9 +19,8 @@ #include -#include -#include #include +#include #include #include @@ -39,13 +38,12 @@ using std::vector; using std::pair; using std::make_pair; -using boost::condition_variable; -using boost::mutex; +using std::condition_variable; +using std::mutex; using boost::scoped_ptr; -using boost::unique_lock; -using boost::try_lock; -using boost::try_mutex; -using boost::lock_guard; +using std::unique_lock; +using std::try_lock; +using std::lock_guard; using boost::mem_fn; namespace doris { @@ -53,7 +51,7 @@ namespace doris { class ThreadClosure : public google::protobuf::Closure { public: void Run() { _cv.notify_one(); } - void wait(unique_lock& lock) { _cv.wait(lock); } + void wait(unique_lock& lock) { _cv.wait(lock); } private: condition_variable _cv; @@ -330,7 +328,7 @@ void DataStreamRecvr::SenderQueue::cancel() { // _recvr->_bytes_received_time_series_counter); { - boost::lock_guard l(_lock); + std::lock_guard l(_lock); for (auto closure_pair : _pending_closures) { closure_pair.first->Run(); } @@ -343,7 +341,7 @@ void DataStreamRecvr::SenderQueue::close() { // If _is_cancelled is not set to true, there may be concurrent send // which add batch to _batch_queue. The batch added after _batch_queue // is clear will be memory leak - boost::lock_guard l(_lock); + std::lock_guard l(_lock); _is_cancelled = true; for (auto closure_pair : _pending_closures) { diff --git a/be/src/runtime/data_stream_recvr.h b/be/src/runtime/data_stream_recvr.h index e090a69633..6347017be2 100644 --- a/be/src/runtime/data_stream_recvr.h +++ b/be/src/runtime/data_stream_recvr.h @@ -19,7 +19,7 @@ #define DORIS_BE_SRC_RUNTIME_DATA_STREAM_RECVR_H #include -#include +#include #include "common/object_pool.h" #include "common/status.h" @@ -89,7 +89,8 @@ public: // queues. The exprs used in less_than must have already been prepared and opened. Status create_merger(const TupleRowComparator& less_than); - Status create_parallel_merger(const TupleRowComparator& less_than, uint32_t batch_size, MemTracker* mem_tracker); + Status create_parallel_merger(const TupleRowComparator& less_than, uint32_t batch_size, + MemTracker* mem_tracker); // Fill output_batch with the next batch of rows obtained by merging the per-sender // input streams. Must only be called if _is_merging is true. Status get_next(RowBatch* output_batch, bool* eos); diff --git a/be/src/runtime/disk_io_mgr.cc b/be/src/runtime/disk_io_mgr.cc index 20b3d0d939..73ed8f24bc 100644 --- a/be/src/runtime/disk_io_mgr.cc +++ b/be/src/runtime/disk_io_mgr.cc @@ -27,9 +27,9 @@ using std::vector; using std::list; using std::endl; -using boost::lock_guard; -using boost::unique_lock; -using boost::mutex; +using std::lock_guard; +using std::unique_lock; +using std::mutex; using boost::thread; using boost::thread_group; diff --git a/be/src/runtime/disk_io_mgr.h b/be/src/runtime/disk_io_mgr.h index 4d6a03b85a..9806b78d1f 100644 --- a/be/src/runtime/disk_io_mgr.h +++ b/be/src/runtime/disk_io_mgr.h @@ -19,11 +19,11 @@ #define DORIS_BE_SRC_QUERY_RUNTIME_DISK_IO_MGR_H #include -#include -#include #include -#include +#include #include +#include +#include #include #include "common/atomic.h" @@ -445,7 +445,7 @@ public: // Lock protecting fields below. // This lock should not be taken during Open/Read/Close. - boost::mutex _lock; + std::mutex _lock; // Number of bytes read so far for this scan range int _bytes_read; @@ -467,7 +467,7 @@ public: // IO buffers that are queued for this scan range. // Condition variable for get_next - boost::condition_variable _buffer_ready_cv; + std::condition_variable _buffer_ready_cv; std::list _ready_buffers; // The soft capacity limit for _ready_buffers. _ready_buffers can exceed @@ -482,7 +482,7 @@ public: // that the disk threads are finished with HDFS calls before _is_cancelled is set // to true and cleanup starts. // If this lock and _lock need to be taken, _lock must be taken first. - boost::mutex _hdfs_lock; + std::mutex _hdfs_lock; // If true, this scan range has been cancelled. bool _is_cancelled; @@ -729,7 +729,7 @@ private: boost::scoped_ptr _request_context_cache; // Protects _free_buffers and _free_buffer_descs - boost::mutex _free_buffers_lock; + std::mutex _free_buffers_lock; // Free buffers that can be handed out to clients. There is one list for each buffer // size, indexed by the Log2 of the buffer size in units of _min_buffer_size. The diff --git a/be/src/runtime/disk_io_mgr_internal.h b/be/src/runtime/disk_io_mgr_internal.h index 4001e97c0c..897ab5df59 100644 --- a/be/src/runtime/disk_io_mgr_internal.h +++ b/be/src/runtime/disk_io_mgr_internal.h @@ -20,7 +20,6 @@ #include -#include #include #include "common/logging.h" @@ -41,13 +40,13 @@ struct DiskIoMgr::DiskQueue { int disk_id; // Lock that protects access to 'request_contexts' and 'work_available' - boost::mutex lock; + std::mutex lock; // Condition variable to signal the disk threads that there is work to do or the // thread should shut down. A disk thread will be woken up when there is a reader // added to the queue. A reader is only on the queue when it has at least one // scan range that is not blocked on available buffers. - boost::condition_variable work_available; + std::condition_variable work_available; // list of all request contexts that have work queued on this disk std::list request_contexts; @@ -55,7 +54,7 @@ struct DiskIoMgr::DiskQueue { // Enqueue the request context to the disk queue. The DiskQueue lock must not be taken. inline void enqueue_context(RequestContext* worker) { { - boost::unique_lock disk_lock(lock); + std::unique_lock disk_lock(lock); // Check that the reader is not already on the queue DCHECK(find(request_contexts.begin(), request_contexts.end(), worker) == request_contexts.end()); @@ -264,7 +263,7 @@ private: // All fields below are accessed by multiple threads and the lock needs to be // taken before accessing them. - boost::mutex _lock; + std::mutex _lock; // Current state of the reader State _state; @@ -289,13 +288,13 @@ private: // We currently populate one range per disk. // TODO: think about this some more. InternalQueue _ready_to_start_ranges; - boost::condition_variable _ready_to_start_ranges_cv; // used with _lock + std::condition_variable _ready_to_start_ranges_cv; // used with _lock // Ranges that are blocked due to back pressure on outgoing buffers. InternalQueue _blocked_ranges; // Condition variable for UnregisterContext() to wait for all disks to complete - boost::condition_variable _disks_complete_cond_var; + std::condition_variable _disks_complete_cond_var; // Struct containing state per disk. See comments in the disk read loop on how // they are used. diff --git a/be/src/runtime/disk_io_mgr_reader_context.cc b/be/src/runtime/disk_io_mgr_reader_context.cc index b07466cc05..dbd5aafeb2 100644 --- a/be/src/runtime/disk_io_mgr_reader_context.cc +++ b/be/src/runtime/disk_io_mgr_reader_context.cc @@ -25,9 +25,9 @@ using std::vector; using std::list; using std::endl; -using boost::lock_guard; -using boost::unique_lock; -using boost::mutex; +using std::lock_guard; +using std::unique_lock; +using std::mutex; void DiskIoMgr::RequestContext::cancel(const Status& status) { DCHECK(!status.ok()); diff --git a/be/src/runtime/disk_io_mgr_scan_range.cc b/be/src/runtime/disk_io_mgr_scan_range.cc index b8169a0fe9..a7ffff7638 100644 --- a/be/src/runtime/disk_io_mgr_scan_range.cc +++ b/be/src/runtime/disk_io_mgr_scan_range.cc @@ -26,9 +26,9 @@ using std::vector; using std::list; using std::endl; -using boost::lock_guard; -using boost::unique_lock; -using boost::mutex; +using std::lock_guard; +using std::unique_lock; +using std::mutex; namespace doris { diff --git a/be/src/runtime/initial_reservations.cc b/be/src/runtime/initial_reservations.cc index bd52dcbc2b..adbc2be09d 100644 --- a/be/src/runtime/initial_reservations.cc +++ b/be/src/runtime/initial_reservations.cc @@ -19,8 +19,8 @@ #include -#include #include +#include #include "common/logging.h" #include "common/object_pool.h" @@ -37,8 +37,8 @@ InitialReservations::InitialReservations(ObjectPool* obj_pool, ReservationTracker* query_reservation, std::shared_ptr query_mem_tracker, int64_t initial_reservation_total_claims) - : initial_reservation_mem_tracker_(MemTracker::CreateTracker(-1, "InitialReservations", - query_mem_tracker, false)), + : initial_reservation_mem_tracker_( + MemTracker::CreateTracker(-1, "InitialReservations", query_mem_tracker, false)), remaining_initial_reservation_claims_(initial_reservation_total_claims) { initial_reservations_.InitChildTracker(nullptr, query_reservation, initial_reservation_mem_tracker_.get(), diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 6eceb75d5f..b3dd97db5a 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -19,8 +19,7 @@ #include -#include -#include +#include #include "common/logging.h" #include "common/object_pool.h" @@ -243,7 +242,7 @@ Status PlanFragmentExecutor::open() { // TODO: if no report thread is started, make sure to send a final profile // at end, otherwise the coordinator hangs in case we finish w/ an error if (!_report_status_cb.empty() && config::status_report_interval > 0) { - boost::unique_lock l(_report_thread_lock); + std::unique_lock l(_report_thread_lock); _report_thread = boost::thread(&PlanFragmentExecutor::report_profile, this); // make sure the thread started up, otherwise report_profile() might get into a race // with stop_report_thread() @@ -327,7 +326,7 @@ Status PlanFragmentExecutor::open_internal() { _collect_query_statistics(); Status status; { - boost::lock_guard l(_status_lock); + std::lock_guard l(_status_lock); status = _status; } status = _sink->close(runtime_state(), status); @@ -355,7 +354,7 @@ void PlanFragmentExecutor::_collect_query_statistics() { void PlanFragmentExecutor::report_profile() { VLOG_FILE << "report_profile(): instance_id=" << _runtime_state->fragment_instance_id(); DCHECK(!_report_status_cb.empty()); - boost::unique_lock l(_report_thread_lock); + std::unique_lock l(_report_thread_lock); // tell Open() that we started _report_thread_started_cv.notify_one(); @@ -364,21 +363,18 @@ void PlanFragmentExecutor::report_profile() { // updates at once so its better for contention as well as smoother progress // reporting. int report_fragment_offset = rand() % config::status_report_interval; - boost::system_time timeout = - boost::get_system_time() + boost::posix_time::seconds(report_fragment_offset); // We don't want to wait longer than it takes to run the entire fragment. - _stop_report_thread_cv.timed_wait(l, timeout); + _stop_report_thread_cv.wait_for(l, std::chrono::seconds(report_fragment_offset)); bool is_report_profile_interval = _is_report_success && config::status_report_interval > 0; while (_report_thread_active) { if (is_report_profile_interval) { - boost::system_time timeout = boost::get_system_time() + - boost::posix_time::seconds(config::status_report_interval); - // timed_wait can return because the timeout occurred or the condition variable + // wait_for can return because the timeout occurred or the condition variable // was signaled. We can't rely on its return value to distinguish between the // two cases (e.g. there is a race here where the wait timed out but before grabbing // the lock, the condition variable was signaled). Instead, we will use an external // flag, _report_thread_active, to coordinate this. - _stop_report_thread_cv.timed_wait(l, timeout); + _stop_report_thread_cv.wait_for(l, + std::chrono::seconds(config::status_report_interval)); } else { // Artificial triggering, such as show proc "/current_queries". _stop_report_thread_cv.wait(l); @@ -410,7 +406,7 @@ void PlanFragmentExecutor::send_report(bool done) { Status status; { - boost::lock_guard l(_status_lock); + std::lock_guard l(_status_lock); status = _status; } @@ -440,7 +436,7 @@ void PlanFragmentExecutor::stop_report_thread() { } { - boost::lock_guard l(_report_thread_lock); + std::lock_guard l(_report_thread_lock); _report_thread_active = false; } @@ -494,7 +490,7 @@ void PlanFragmentExecutor::update_status(const Status& new_status) { } { - boost::lock_guard l(_status_lock); + std::lock_guard l(_status_lock); // if current `_status` is ok, set it to `new_status` to record the error. if (_status.ok()) { if (new_status.is_mem_limit_exceeded()) { @@ -560,7 +556,7 @@ void PlanFragmentExecutor::close() { if (_prepared) { Status status; { - boost::lock_guard l(_status_lock); + std::lock_guard l(_status_lock); status = _status; } _sink->close(runtime_state(), status); diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index 37f544a703..2ad9e8a3bb 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -20,6 +20,7 @@ #include #include +#include #include #include "common/object_pool.h" @@ -155,15 +156,15 @@ private: // profile reporting-related report_status_callback _report_status_cb; boost::thread _report_thread; - boost::mutex _report_thread_lock; + std::mutex _report_thread_lock; // Indicates that profile reporting thread should stop. // Tied to _report_thread_lock. - boost::condition_variable _stop_report_thread_cv; + std::condition_variable _stop_report_thread_cv; // Indicates that profile reporting thread started. // Tied to _report_thread_lock. - boost::condition_variable _report_thread_started_cv; + std::condition_variable _report_thread_started_cv; bool _report_thread_active; // true if we started the thread // true if _plan->get_next() indicated that it's done @@ -192,7 +193,7 @@ private: // lock ordering: // 1. _report_thread_lock // 2. _status_lock - boost::mutex _status_lock; + std::mutex _status_lock; // note that RuntimeState should be constructed before and destructed after `_sink' and `_row_batch', // therefore we declare it before `_sink' and `_row_batch' diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp index 247acc0094..b1e74bd445 100644 --- a/be/src/runtime/result_buffer_mgr.cpp +++ b/be/src/runtime/result_buffer_mgr.cpp @@ -40,7 +40,7 @@ ResultBufferMgr::ResultBufferMgr() : _stop_background_threads_latch(1) { // Each BufferControlBlock has a limited queue size of 1024, it's not needed to count the // actual size of all BufferControlBlock. REGISTER_HOOK_METRIC(result_buffer_block_count, [this]() { - boost::lock_guard l(_lock); + std::lock_guard l(_lock); return _buffer_map.size(); }); } @@ -71,7 +71,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size boost::shared_ptr control_block( new BufferControlBlock(query_id, buffer_size)); { - boost::lock_guard l(_lock); + std::lock_guard l(_lock); _buffer_map.insert(std::make_pair(query_id, control_block)); } *sender = control_block; @@ -81,7 +81,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size boost::shared_ptr ResultBufferMgr::find_control_block( const TUniqueId& query_id) { // TODO(zhaochun): this lock can be bottleneck? - boost::lock_guard l(_lock); + std::lock_guard l(_lock); BufferMap::iterator iter = _buffer_map.find(query_id); if (_buffer_map.end() != iter) { @@ -116,7 +116,7 @@ void ResultBufferMgr::fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* c } Status ResultBufferMgr::cancel(const TUniqueId& query_id) { - boost::lock_guard l(_lock); + std::lock_guard l(_lock); BufferMap::iterator iter = _buffer_map.find(query_id); if (_buffer_map.end() != iter) { @@ -128,7 +128,7 @@ Status ResultBufferMgr::cancel(const TUniqueId& query_id) { } Status ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId& query_id) { - boost::lock_guard l(_timeout_lock); + std::lock_guard l(_timeout_lock); TimeoutMap::iterator iter = _timeout_map.find(cancel_time); if (_timeout_map.end() == iter) { @@ -149,7 +149,7 @@ void ResultBufferMgr::cancel_thread() { std::vector query_to_cancel; time_t now_time = time(NULL); { - boost::lock_guard l(_timeout_lock); + std::lock_guard l(_timeout_lock); TimeoutMap::iterator end = _timeout_map.upper_bound(now_time + 1); for (TimeoutMap::iterator iter = _timeout_map.begin(); iter != end; ++iter) { diff --git a/be/src/runtime/result_buffer_mgr.h b/be/src/runtime/result_buffer_mgr.h index b380b3b564..8a1ef99547 100644 --- a/be/src/runtime/result_buffer_mgr.h +++ b/be/src/runtime/result_buffer_mgr.h @@ -20,10 +20,10 @@ #include #include -#include #include -#include #include +#include +#include #include #include "common/status.h" @@ -63,7 +63,7 @@ public: Status cancel_at_time(time_t cancel_time, const TUniqueId& query_id); private: - typedef boost::unordered_map> BufferMap; + typedef std::unordered_map> BufferMap; typedef std::map> TimeoutMap; boost::shared_ptr find_control_block(const TUniqueId& query_id); @@ -73,12 +73,12 @@ private: void cancel_thread(); // lock for buffer map - boost::mutex _lock; + std::mutex _lock; // buffer block map BufferMap _buffer_map; // lock for timeout map - boost::mutex _timeout_lock; + std::mutex _timeout_lock; // map (cancel_time : query to be cancelled), // cancel time maybe equal, so use one list @@ -88,7 +88,7 @@ private: scoped_refptr _clean_thread; }; -// TUniqueId hash function used for boost::unordered_map +// TUniqueId hash function used for std::unordered_map std::size_t hash_value(const TUniqueId& fragment_id); } // namespace doris diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 8b7104fc33..ddbcde91b2 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -212,12 +212,11 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) { auto mem_tracker_counter = ADD_COUNTER(&_profile, "MemoryLimit", TUnit::BYTES); mem_tracker_counter->set(bytes_limit); - _query_mem_tracker = MemTracker::CreateTracker( - bytes_limit, "RuntimeState:query:" + print_id(query_id), - _exec_env->process_mem_tracker()); + _query_mem_tracker = + MemTracker::CreateTracker(bytes_limit, "RuntimeState:query:" + print_id(query_id), + _exec_env->process_mem_tracker()); _instance_mem_tracker = MemTracker::CreateTracker( - &_profile, -1, "RuntimeState:instance:" + print_id(query_id), - _query_mem_tracker); + &_profile, -1, "RuntimeState:instance:" + print_id(query_id), _query_mem_tracker); /* // TODO: this is a stopgap until we implement ExprContext @@ -287,17 +286,17 @@ Status RuntimeState::create_block_mgr() { } bool RuntimeState::error_log_is_empty() { - boost::lock_guard l(_error_log_lock); + std::lock_guard l(_error_log_lock); return (_error_log.size() > 0); } std::string RuntimeState::error_log() { - boost::lock_guard l(_error_log_lock); + std::lock_guard l(_error_log_lock); return boost::algorithm::join(_error_log, "\n"); } bool RuntimeState::log_error(const std::string& error) { - boost::lock_guard l(_error_log_lock); + std::lock_guard l(_error_log_lock); if (_error_log.size() < _query_options.max_errors) { _error_log.push_back(error); @@ -316,7 +315,7 @@ void RuntimeState::log_error(const Status& status) { } void RuntimeState::get_unreported_errors(std::vector* new_errors) { - boost::lock_guard l(_error_log_lock); + std::lock_guard l(_error_log_lock); if (_unreported_error_idx < _error_log.size()) { new_errors->assign(_error_log.begin() + _unreported_error_idx, _error_log.end()); @@ -328,7 +327,7 @@ Status RuntimeState::set_mem_limit_exceeded(MemTracker* tracker, int64_t failed_ const std::string* msg) { DCHECK_GE(failed_allocation_size, 0); { - boost::lock_guard l(_process_status_lock); + std::lock_guard l(_process_status_lock); if (_process_status.ok()) { if (msg != nullptr) { _process_status = Status::MemoryLimitExceeded(*msg); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index dc7a2c8609..f62e63f1c9 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -20,10 +20,9 @@ #include #include -#include -#include #include #include +#include #include #include #include @@ -154,7 +153,7 @@ public: } Status query_status() { - boost::lock_guard l(_process_status_lock); + std::lock_guard l(_process_status_lock); return _process_status; }; @@ -182,7 +181,7 @@ public: // Returns true if the error log has not reached _max_errors. bool log_has_space() { - boost::lock_guard l(_error_log_lock); + std::lock_guard l(_error_log_lock); return _error_log.size() < _query_options.max_errors; } @@ -205,7 +204,7 @@ public: // Sets _process_status with err_msg if no error has been set yet. void set_process_status(const std::string& err_msg) { - boost::lock_guard l(_process_status_lock); + std::lock_guard l(_process_status_lock); if (!_process_status.ok()) { return; } @@ -216,7 +215,7 @@ public: if (status.ok()) { return; } - boost::lock_guard l(_process_status_lock); + std::lock_guard l(_process_status_lock); if (!_process_status.ok()) { return; } @@ -342,7 +341,9 @@ public: bool enable_spill() const { return _query_options.enable_spilling; } - bool enable_exchange_node_parallel_merge() const { return _query_options.enable_enable_exchange_node_parallel_merge; } + bool enable_exchange_node_parallel_merge() const { + return _query_options.enable_enable_exchange_node_parallel_merge; + } // the following getters are only valid after Prepare() InitialReservations* initial_reservations() const { return _initial_reservations; } @@ -363,7 +364,6 @@ public: int64_t get_load_mem_limit(); private: - // Use a custom block manager for the query for testing purposes. void set_block_mgr2(const boost::shared_ptr& block_mgr) { _block_mgr2 = block_mgr; @@ -394,7 +394,7 @@ private: std::shared_ptr _obj_pool; // Protects _data_stream_recvrs_pool - boost::mutex _data_stream_recvrs_lock; + std::mutex _data_stream_recvrs_lock; // Data stream receivers created by a plan fragment are gathered here to make sure // they are destroyed before _obj_pool (class members are destroyed in reverse order). @@ -404,7 +404,7 @@ private: boost::scoped_ptr _data_stream_recvrs_pool; // Lock protecting _error_log and _unreported_error_idx - boost::mutex _error_log_lock; + std::mutex _error_log_lock; // Logs error messages. std::vector _error_log; @@ -441,7 +441,7 @@ private: // Non-OK if an error has occurred and query execution should abort. Used only for // asynchronously reporting such errors (e.g., when a UDF reports an error), so this // will not necessarily be set in all error cases. - boost::mutex _process_status_lock; + std::mutex _process_status_lock; Status _process_status; //boost::scoped_ptr _udf_pool; diff --git a/be/src/runtime/sorted_run_merger.h b/be/src/runtime/sorted_run_merger.h index 1c3e2d4206..d361bb15b1 100644 --- a/be/src/runtime/sorted_run_merger.h +++ b/be/src/runtime/sorted_run_merger.h @@ -19,7 +19,7 @@ #define DORIS_BE_SRC_RUNTIME_SORTED_RUN_MERGER_H #include -#include +#include #include "common/object_pool.h" #include "util/tuple_row_compare.h" @@ -107,16 +107,14 @@ protected: RuntimeProfile::Counter* _get_next_batch_timer; }; -class ChildSortedRunMerger: public SortedRunMerger { +class ChildSortedRunMerger : public SortedRunMerger { public: - ChildSortedRunMerger(const TupleRowComparator& compare_less_than, - RowDescriptor* row_desc, - RuntimeProfile* profile, - MemTracker* _parent, - uint32_t row_batch_size, - bool deep_copy_input); + ChildSortedRunMerger(const TupleRowComparator& compare_less_than, RowDescriptor* row_desc, + RuntimeProfile* profile, MemTracker* _parent, uint32_t row_batch_size, + bool deep_copy_input); Status get_batch(RowBatch** output_batch) override; + private: // Ptr to prevent mem leak for api get_batch(Rowbatch**) std::unique_ptr _current_row_batch; diff --git a/be/src/runtime/thread_resource_mgr.cpp b/be/src/runtime/thread_resource_mgr.cpp index a35bc6da06..de15d4c8aa 100644 --- a/be/src/runtime/thread_resource_mgr.cpp +++ b/be/src/runtime/thread_resource_mgr.cpp @@ -67,7 +67,7 @@ void ThreadResourceMgr::ResourcePool::reserve_optional_tokens(int num) { } ThreadResourceMgr::ResourcePool* ThreadResourceMgr::register_pool() { - boost::unique_lock l(_lock); + std::unique_lock l(_lock); ResourcePool* pool = NULL; if (_free_pool_objs.empty()) { @@ -89,7 +89,7 @@ ThreadResourceMgr::ResourcePool* ThreadResourceMgr::register_pool() { void ThreadResourceMgr::unregister_pool(ResourcePool* pool) { DCHECK(pool != NULL); - boost::unique_lock l(_lock); + std::unique_lock l(_lock); // this may be double unregistered after pr #3326 by LaiYingChun, so check if the pool is already unregisted if (_pools.find(pool) != _pools.end()) { _pools.erase(pool); @@ -99,7 +99,7 @@ void ThreadResourceMgr::unregister_pool(ResourcePool* pool) { } void ThreadResourceMgr::ResourcePool::set_thread_available_cb(thread_available_cb fn) { - boost::unique_lock l(_lock); + std::unique_lock l(_lock); DCHECK(_thread_available_fn == NULL || fn == NULL); _thread_available_fn = fn; } @@ -118,7 +118,7 @@ void ThreadResourceMgr::update_pool_quotas(ResourcePool* new_pool) { continue; } - boost::unique_lock l(pool->_lock); + std::unique_lock l(pool->_lock); if (pool->num_available_threads() > 0 && pool->_thread_available_fn != NULL) { pool->_thread_available_fn(pool); diff --git a/be/src/runtime/thread_resource_mgr.h b/be/src/runtime/thread_resource_mgr.h index 8707abffc2..7f8482451e 100644 --- a/be/src/runtime/thread_resource_mgr.h +++ b/be/src/runtime/thread_resource_mgr.h @@ -23,9 +23,9 @@ #include #include #include -#include #include #include +#include #include "common/status.h" @@ -174,7 +174,7 @@ public: // Lock for the fields below. This lock is taken when the callback // function is called. // TODO: reconsider this. - boost::mutex _lock; + std::mutex _lock; thread_available_cb _thread_available_fn; }; @@ -201,7 +201,7 @@ private: int _system_threads_quota; // Lock for the entire object. Protects all fields below. - boost::mutex _lock; + std::mutex _lock; // Pools currently being managed typedef std::set Pools; @@ -273,7 +273,7 @@ inline void ThreadResourceMgr::ResourcePool::release_thread_token(bool required) // happens once when the scanner thread is complete. // TODO: reconsider this. if (num_available_threads() > 0 && _thread_available_fn != NULL) { - boost::unique_lock l(_lock); + std::unique_lock l(_lock); if (num_available_threads() > 0 && _thread_available_fn != NULL) { _thread_available_fn(this); diff --git a/be/src/runtime/tmp_file_mgr.cc b/be/src/runtime/tmp_file_mgr.cc index 11283026de..b1664040fb 100644 --- a/be/src/runtime/tmp_file_mgr.cc +++ b/be/src/runtime/tmp_file_mgr.cc @@ -18,8 +18,6 @@ #include "runtime/tmp_file_mgr.h" #include -#include -#include #include #include #include @@ -147,7 +145,7 @@ Status TmpFileMgr::get_file(const DeviceId& device_id, const TUniqueId& query_id } // Generate the full file path. - string unique_name = boost::lexical_cast(boost::uuids::random_generator()()); + string unique_name = boost::uuids::to_string(boost::uuids::random_generator()()); std::stringstream file_name; file_name << print_id(query_id) << "_" << unique_name; path new_file_path(_tmp_dirs[device_id].path()); @@ -169,7 +167,7 @@ void TmpFileMgr::blacklist_device(DeviceId device_id) { DCHECK(device_id >= 0 && device_id < _tmp_dirs.size()); bool added = true; { - boost::lock_guard l(_dir_status_lock); + std::lock_guard l(_dir_status_lock); added = _tmp_dirs[device_id].blacklist(); } if (added) { @@ -180,13 +178,13 @@ void TmpFileMgr::blacklist_device(DeviceId device_id) { bool TmpFileMgr::is_blacklisted(DeviceId device_id) { DCHECK(_initialized); DCHECK(device_id >= 0 && device_id < _tmp_dirs.size()); - boost::lock_guard l(_dir_status_lock); + std::lock_guard l(_dir_status_lock); return _tmp_dirs[device_id].is_blacklisted(); } int TmpFileMgr::num_active_tmp_devices() { DCHECK(_initialized); - boost::lock_guard l(_dir_status_lock); + std::lock_guard l(_dir_status_lock); int num_active = 0; for (int device_id = 0; device_id < _tmp_dirs.size(); ++device_id) { if (!_tmp_dirs[device_id].is_blacklisted()) { @@ -201,7 +199,7 @@ vector TmpFileMgr::active_tmp_devices() { // Allocate vector before we grab lock devices.reserve(_tmp_dirs.size()); { - boost::lock_guard l(_dir_status_lock); + std::lock_guard l(_dir_status_lock); for (DeviceId device_id = 0; device_id < _tmp_dirs.size(); ++device_id) { if (!_tmp_dirs[device_id].is_blacklisted()) { devices.push_back(device_id); diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 1fd9efbf86..d1d085d83a 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -20,11 +20,11 @@ #include #include -#include -#include #include #include -#include +#include +#include +#include #if defined(LEAK_SANITIZER) #include diff --git a/be/src/util/batch_process_thread_pool.hpp b/be/src/util/batch_process_thread_pool.hpp index ad8d1970b2..98822b583a 100644 --- a/be/src/util/batch_process_thread_pool.hpp +++ b/be/src/util/batch_process_thread_pool.hpp @@ -18,14 +18,15 @@ #ifndef DORIS_BE_SRC_COMMON_UTIL_BATCH_PROCESS_THREAD_POOL_HPP #define DORIS_BE_SRC_COMMON_UTIL_BATCH_PROCESS_THREAD_POOL_HPP -#include #include -#include -#include + +#include +#include +#include #include "common/config.h" -#include "util/stopwatch.hpp" #include "util/blocking_priority_queue.hpp" +#include "util/stopwatch.hpp" namespace doris { @@ -35,7 +36,7 @@ template class BatchProcessThreadPool { public: // Signature of function that process task batch by batch not one by one - typedef std::function)> BatchProcessFunction; + typedef std::function)> BatchProcessFunction; // Creates a new thread pool and start num_threads threads. // -- num_threads: how many threads are part of this pool @@ -44,16 +45,15 @@ public: // capacity available. // -- work_function: the function to run every time an item is consumed from the queue BatchProcessThreadPool(uint32_t num_threads, uint32_t queue_size, uint32_t batch_size, - BatchProcessFunction work_func) : - _thread_num(num_threads), - _work_queue(queue_size), - _shutdown(false), - _batch_size(batch_size), - _work_func(work_func) { + BatchProcessFunction work_func) + : _thread_num(num_threads), + _work_queue(queue_size), + _shutdown(false), + _batch_size(batch_size), + _work_func(work_func) { for (int i = 0; i < num_threads; ++i) { _threads.create_thread( - std::bind( - std::mem_fn(&BatchProcessThreadPool::work_thread), this, i)); + std::bind(std::mem_fn(&BatchProcessThreadPool::work_thread), this, i)); } } @@ -75,9 +75,7 @@ public: // // Returns true if the work item was successfully added to the queue, false otherwise // (which typically means that the thread pool has already been shut down). - bool offer(T task) { - return _work_queue.blocking_put(task); - } + bool offer(T task) { return _work_queue.blocking_put(task); } // Shuts the thread pool down, causing the work queue to cease accepting offered work // and the worker threads to terminate once they have processed their current work item. @@ -85,7 +83,7 @@ public: // terminate. void shutdown() { { - boost::lock_guard l(_lock); + std::lock_guard l(_lock); _shutdown = true; } _work_queue.shutdown(); @@ -93,20 +91,16 @@ public: // Blocks until all threads are finished. shutdown does not need to have been called, // since it may be called on a separate thread. - void join() { - _threads.join_all(); - } + void join() { _threads.join_all(); } - uint32_t get_queue_size() const { - return _work_queue.get_size(); - } + uint32_t get_queue_size() const { return _work_queue.get_size(); } // Blocks until the work queue is empty, and then calls shutdown to stop the worker // threads and Join to wait until they are finished. // Any work Offer()'ed during DrainAndShutdown may or may not be processed. void drain_and_shutdown() { { - boost::unique_lock l(_lock); + std::unique_lock l(_lock); while (_work_queue.get_size() != 0) { _empty_cv.wait(l); } @@ -151,7 +145,7 @@ private: // Returns value of _shutdown under a lock, forcing visibility to threads in the pool. bool is_shutdown() { - boost::lock_guard l(_lock); + std::lock_guard l(_lock); return _shutdown; } @@ -165,19 +159,19 @@ private: boost::thread_group _threads; // Guards _shutdown and _empty_cv - boost::mutex _lock; + std::mutex _lock; // Set to true when threads should stop doing work and terminate. bool _shutdown; // Signalled when the queue becomes empty - boost::condition_variable _empty_cv; + std::condition_variable _empty_cv; uint32_t _batch_size; BatchProcessFunction _work_func; }; -} +} // namespace doris #endif diff --git a/be/src/util/blocking_priority_queue.hpp b/be/src/util/blocking_priority_queue.hpp index 815fa969f5..574ebd1bd0 100644 --- a/be/src/util/blocking_priority_queue.hpp +++ b/be/src/util/blocking_priority_queue.hpp @@ -18,10 +18,11 @@ #ifndef DORIS_BE_SRC_COMMON_UTIL_BLOCKING_PRIORITY_QUEUE_HPP #define DORIS_BE_SRC_COMMON_UTIL_BLOCKING_PRIORITY_QUEUE_HPP -#include #include -#include -#include + +#include +#include +#include #include "common/config.h" #include "util/stopwatch.hpp" @@ -33,20 +34,19 @@ namespace doris { template class BlockingPriorityQueue { public: - BlockingPriorityQueue(size_t max_elements) : - _shutdown(false), - _max_element(max_elements), - _upgrade_counter(0), - _total_get_wait_time(0), - _total_put_wait_time(0) { - } + BlockingPriorityQueue(size_t max_elements) + : _shutdown(false), + _max_element(max_elements), + _upgrade_counter(0), + _total_get_wait_time(0), + _total_put_wait_time(0) {} // Get an element from the queue, waiting indefinitely for one to become available. // Returns false if we were shut down prior to getting the element, and there // are no more elements available. bool blocking_get(T* out) { MonotonicStopWatch timer; - boost::unique_lock unique_lock(_lock); + std::unique_lock unique_lock(_lock); while (true) { if (!_queue.empty()) { @@ -83,7 +83,7 @@ public: bool non_blocking_get(T* out) { MonotonicStopWatch timer; - boost::unique_lock unique_lock(_lock); + std::unique_lock unique_lock(_lock); while (true) { if (!_queue.empty()) { @@ -119,7 +119,7 @@ public: // If the queue is shut down, returns false. bool blocking_put(const T& val) { MonotonicStopWatch timer; - boost::unique_lock unique_lock(_lock); + std::unique_lock unique_lock(_lock); while (_queue.size() >= _max_element && !_shutdown) { timer.start(); @@ -146,35 +146,35 @@ public: } uint32_t get_size() const { - boost::unique_lock l(_lock); + std::unique_lock l(_lock); return _queue.size(); } // Returns the total amount of time threads have blocked in blocking_get. uint64_t total_get_wait_time() const { - boost::lock_guard guard(_lock); + std::lock_guard guard(_lock); return _total_get_wait_time; } // Returns the total amount of time threads have blocked in blocking_put. uint64_t total_put_wait_time() const { - boost::lock_guard guard(_lock); + std::lock_guard guard(_lock); return _total_put_wait_time; } private: std::atomic _shutdown; const int _max_element; - boost::condition_variable _get_cv; // 'get' callers wait on this - boost::condition_variable _put_cv; // 'put' callers wait on this + std::condition_variable _get_cv; // 'get' callers wait on this + std::condition_variable _put_cv; // 'put' callers wait on this // _lock guards access to _queue, total_get_wait_time, and total_put_wait_time - mutable boost::mutex _lock; + mutable std::mutex _lock; std::priority_queue _queue; int _upgrade_counter; uint64_t _total_get_wait_time; uint64_t _total_put_wait_time; }; -} +} // namespace doris #endif diff --git a/be/src/util/container_util.hpp b/be/src/util/container_util.hpp index 5eb7cfd554..b9d03787c5 100644 --- a/be/src/util/container_util.hpp +++ b/be/src/util/container_util.hpp @@ -19,30 +19,26 @@ #define DORIS_BE_SRC_COMMON__UTIL_CONTAINER_UTIL_HPP #include -#include +#include -#include "util/hash_util.hpp" #include "gen_cpp/Types_types.h" +#include "util/hash_util.hpp" namespace doris { // Hash function for TNetworkAddress. This function must be called hash_value to be picked // up properly by boost. inline std::size_t hash_value(const TNetworkAddress& host_port) { - uint32_t hash = - HashUtil::hash(host_port.hostname.c_str(), host_port.hostname.length(), 0); + uint32_t hash = HashUtil::hash(host_port.hostname.c_str(), host_port.hostname.length(), 0); return HashUtil::hash(&host_port.port, sizeof(host_port.port), hash); } struct HashTNetworkAddressPtr : public std::unary_function { - size_t operator()(const TNetworkAddress* const& p) const { - return hash_value(*p); - } + size_t operator()(const TNetworkAddress* const& p) const { return hash_value(*p); } }; struct TNetworkAddressPtrEquals : public std::unary_function { - bool operator()(const TNetworkAddress* const& p1, - const TNetworkAddress* const& p2) const { + bool operator()(const TNetworkAddress* const& p1, const TNetworkAddress* const& p2) const { return p1->hostname == p2->hostname && p1->port == p2->port; } }; @@ -62,8 +58,8 @@ V* find_or_insert(std::map* m, const K& key, const V& default_val) { } template -V* find_or_insert(boost::unordered_map* m, const K& key, const V& default_val) { - typename boost::unordered_map::iterator it = m->find(key); +V* find_or_insert(std::unordered_map* m, const K& key, const V& default_val) { + typename std::unordered_map::iterator it = m->find(key); if (it == m->end()) { it = m->insert(make_pair(key, default_val)).first; @@ -87,9 +83,8 @@ const V& find_with_default(const std::map& m, const K& key, const V& defau } template -const V& find_with_default(const boost::unordered_map& m, const K& key, - const V& default_val) { - typename boost::unordered_map::const_iterator it = m.find(key); +const V& find_with_default(const std::unordered_map& m, const K& key, const V& default_val) { + typename std::unordered_map::const_iterator it = m.find(key); if (it == m.end()) { return default_val; @@ -98,6 +93,6 @@ const V& find_with_default(const boost::unordered_map& m, const K& key, return it->second; } -} +} // namespace doris #endif diff --git a/be/src/util/error_util.h b/be/src/util/error_util.h index cbf80dfaa7..35de1481fe 100644 --- a/be/src/util/error_util.h +++ b/be/src/util/error_util.h @@ -19,7 +19,6 @@ #define DORIS_BE_SRC_UTIL_ERROR_UTIL_H #include -#include #include #include diff --git a/be/src/util/internal_queue.h b/be/src/util/internal_queue.h index 7a6d9b6ead..8338ee713d 100644 --- a/be/src/util/internal_queue.h +++ b/be/src/util/internal_queue.h @@ -19,7 +19,7 @@ #define DORIS_BE_SRC_UTIL_INTERNAL_QUEUE_H #include -#include +#include #include "util/fake_lock.h" #include "util/spinlock.h" @@ -56,11 +56,11 @@ public: /// Returns the Next/Prev node or NULL if this is the end/front. T* next() const { - boost::lock_guard lock(parent_queue->lock_); + std::lock_guard lock(parent_queue->lock_); return reinterpret_cast(next_node); } T* prev() const { - boost::lock_guard lock(parent_queue->lock_); + std::lock_guard lock(parent_queue->lock_); return reinterpret_cast(prev_node); } @@ -78,7 +78,7 @@ public: /// Returns the element at the head of the list without dequeuing or NULL /// if the queue is empty. This is O(1). T* head() const { - boost::lock_guard lock(lock_); + std::lock_guard lock(lock_); if (empty()) return NULL; return reinterpret_cast(head_); } @@ -86,7 +86,7 @@ public: /// Returns the element at the end of the list without dequeuing or NULL /// if the queue is empty. This is O(1). T* tail() { - boost::lock_guard lock(lock_); + std::lock_guard lock(lock_); if (empty()) return NULL; return reinterpret_cast(tail_); } @@ -99,7 +99,7 @@ public: DCHECK(node->parent_queue == NULL); node->parent_queue = this; { - boost::lock_guard lock(lock_); + std::lock_guard lock(lock_); if (tail_ != NULL) tail_->next_node = node; node->prev_node = tail_; tail_ = node; @@ -113,7 +113,7 @@ public: T* dequeue() { Node* result = NULL; { - boost::lock_guard lock(lock_); + std::lock_guard lock(lock_); if (empty()) return NULL; --size_; result = head_; @@ -135,7 +135,7 @@ public: T* pop_back() { Node* result = NULL; { - boost::lock_guard lock(lock_); + std::lock_guard lock(lock_); if (empty()) return NULL; --size_; result = tail_; @@ -158,7 +158,7 @@ public: Node* node = (Node*)n; if (node->parent_queue != this) return false; { - boost::lock_guard lock(lock_); + std::lock_guard lock(lock_); if (node->next_node == NULL && node->prev_node == NULL) { // Removing only node DCHECK(node == head_); @@ -192,7 +192,7 @@ public: /// Clears all elements in the list. void clear() { - boost::lock_guard lock(lock_); + std::lock_guard lock(lock_); Node* cur = head_; while (cur != NULL) { Node* tmp = cur; @@ -213,7 +213,7 @@ public: /// Validates the internal structure of the list bool validate() { int num_elements_found = 0; - boost::lock_guard lock(lock_); + std::lock_guard lock(lock_); if (head_ == NULL) { if (tail_ != NULL) return false; if (size() != 0) return false; @@ -241,7 +241,7 @@ public: // false, terminate iteration. It is invalid to call other InternalQueue methods // from 'fn'. void iterate(boost::function fn) { - boost::lock_guard lock(lock_); + std::lock_guard lock(lock_); for (Node* current = head_; current != NULL; current = current->next_node) { if (!fn(reinterpret_cast(current))) return; } @@ -252,7 +252,7 @@ public: std::stringstream ss; ss << "("; { - boost::lock_guard lock(lock_); + std::lock_guard lock(lock_); Node* curr = head_; while (curr != NULL) { ss << (void*)curr; diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/priority_thread_pool.hpp index b0e836c6e4..18d15ebc49 100644 --- a/be/src/util/priority_thread_pool.hpp +++ b/be/src/util/priority_thread_pool.hpp @@ -18,11 +18,11 @@ #ifndef DORIS_BE_SRC_COMMON_UTIL_PRIORITY_THREAD_POOL_HPP #define DORIS_BE_SRC_COMMON_UTIL_PRIORITY_THREAD_POOL_HPP -#include "util/blocking_priority_queue.hpp" - -#include -#include #include +#include +#include + +#include "util/blocking_priority_queue.hpp" namespace doris { @@ -33,15 +33,13 @@ public: // Signature of a work-processing function. Takes the integer id of the thread which is // calling it (ids run from 0 to num_threads - 1) and a reference to the item to // process. - typedef boost::function WorkFunction; + typedef boost::function WorkFunction; struct Task { public: int priority; WorkFunction work_function; - bool operator< (const Task& o) const { - return priority < o.priority; - } + bool operator<(const Task& o) const { return priority < o.priority; } Task& operator++() { priority += 2; @@ -55,13 +53,11 @@ public: // queue exceeds this size, subsequent calls to Offer will block until there is // capacity available. // -- work_function: the function to run every time an item is consumed from the queue - PriorityThreadPool(uint32_t num_threads, uint32_t queue_size) : - _work_queue(queue_size), - _shutdown(false) { + PriorityThreadPool(uint32_t num_threads, uint32_t queue_size) + : _work_queue(queue_size), _shutdown(false) { for (int i = 0; i < num_threads; ++i) { _threads.create_thread( - boost::bind( - boost::mem_fn(&PriorityThreadPool::work_thread), this, i)); + boost::bind(boost::mem_fn(&PriorityThreadPool::work_thread), this, i)); } } @@ -83,9 +79,7 @@ public: // // Returns true if the work item was successfully added to the queue, false otherwise // (which typically means that the thread pool has already been shut down). - bool offer(Task task) { - return _work_queue.blocking_put(task); - } + bool offer(Task task) { return _work_queue.blocking_put(task); } bool offer(WorkFunction func) { PriorityThreadPool::Task task = {0, func}; @@ -103,20 +97,16 @@ public: // Blocks until all threads are finished. shutdown does not need to have been called, // since it may be called on a separate thread. - void join() { - _threads.join_all(); - } + void join() { _threads.join_all(); } - uint32_t get_queue_size() const { - return _work_queue.get_size(); - } + uint32_t get_queue_size() const { return _work_queue.get_size(); } // Blocks until the work queue is empty, and then calls shutdown to stop the worker // threads and Join to wait until they are finished. // Any work Offer()'ed during DrainAndshutdown may or may not be processed. void drain_and_shutdown() { { - boost::unique_lock l(_lock); + std::unique_lock l(_lock); while (_work_queue.get_size() != 0) { _empty_cv.wait(l); } @@ -140,9 +130,7 @@ private: } } - bool is_shutdown() { - return _shutdown; - } + bool is_shutdown() { return _shutdown; } // Queue on which work items are held until a thread is available to process them in // FIFO order. @@ -152,15 +140,15 @@ private: boost::thread_group _threads; // Guards _empty_cv - boost::mutex _lock; + std::mutex _lock; // Set to true when threads should stop doing work and terminate. std::atomic _shutdown; // Signalled when the queue becomes empty - boost::condition_variable _empty_cv; + std::condition_variable _empty_cv; }; -} +} // namespace doris #endif diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp index a2fd8f8d3d..0d9a5b5b68 100644 --- a/be/src/util/runtime_profile.cpp +++ b/be/src/util/runtime_profile.cpp @@ -17,7 +17,6 @@ #include "util/runtime_profile.h" -#include #include #include #include @@ -82,8 +81,8 @@ void RuntimeProfile::merge(RuntimeProfile* other) { { CounterMap::iterator dst_iter; CounterMap::const_iterator src_iter; - boost::lock_guard l(_counter_map_lock); - boost::lock_guard m(other->_counter_map_lock); + std::lock_guard l(_counter_map_lock); + std::lock_guard m(other->_counter_map_lock); for (src_iter = other->_counter_map.begin(); src_iter != other->_counter_map.end(); ++src_iter) { @@ -117,8 +116,8 @@ void RuntimeProfile::merge(RuntimeProfile* other) { } { - boost::lock_guard l(_children_lock); - boost::lock_guard m(other->_children_lock); + std::lock_guard l(_children_lock); + std::lock_guard m(other->_children_lock); // Recursively merge children with matching names for (int i = 0; i < other->_children.size(); ++i) { @@ -152,7 +151,7 @@ void RuntimeProfile::update(const std::vector& nodes, int* DCHECK_LT(*idx, nodes.size()); const TRuntimeProfileNode& node = nodes[*idx]; { - boost::lock_guard l(_counter_map_lock); + std::lock_guard l(_counter_map_lock); // update this level std::map::iterator dst_iter; @@ -185,7 +184,7 @@ void RuntimeProfile::update(const std::vector& nodes, int* } { - boost::lock_guard l(_info_strings_lock); + std::lock_guard l(_info_strings_lock); const InfoStrings& info_strings = node.info_strings; for (const std::string& key : node.info_strings_display_order) { // Look for existing info strings and update in place. If there @@ -207,7 +206,7 @@ void RuntimeProfile::update(const std::vector& nodes, int* ++*idx; { - boost::lock_guard l(_children_lock); + std::lock_guard l(_children_lock); // update children with matching names; create new ones if they don't match for (int i = 0; i < node.num_children; ++i) { @@ -233,7 +232,7 @@ void RuntimeProfile::divide(int n) { DCHECK_GT(n, 0); std::map::iterator iter; { - boost::lock_guard l(_counter_map_lock); + std::lock_guard l(_counter_map_lock); for (iter = _counter_map.begin(); iter != _counter_map.end(); ++iter) { if (iter->second->type() == TUnit::DOUBLE_VALUE) { @@ -246,7 +245,7 @@ void RuntimeProfile::divide(int n) { } } { - boost::lock_guard l(_children_lock); + std::lock_guard l(_children_lock); for (ChildMap::iterator i = _child_map.begin(); i != _child_map.end(); ++i) { i->second->divide(n); @@ -265,7 +264,7 @@ void RuntimeProfile::compute_time_in_profile(int64_t total) { // Add all the total times in all the children int64_t total_child_time = 0; - boost::lock_guard l(_children_lock); + std::lock_guard l(_children_lock); for (int i = 0; i < _children.size(); ++i) { total_child_time += _children[i].first->total_time_counter()->value(); @@ -284,7 +283,7 @@ void RuntimeProfile::compute_time_in_profile(int64_t total) { } RuntimeProfile* RuntimeProfile::create_child(const std::string& name, bool indent, bool prepend) { - boost::lock_guard l(_children_lock); + std::lock_guard l(_children_lock); DCHECK(_child_map.find(name) == _child_map.end()); RuntimeProfile* child = _pool->add(new RuntimeProfile(name)); if (_children.empty()) { @@ -314,13 +313,13 @@ void RuntimeProfile::add_child_unlock(RuntimeProfile* child, bool indent, Runtim } void RuntimeProfile::add_child(RuntimeProfile* child, bool indent, RuntimeProfile* loc) { - boost::lock_guard l(_children_lock); + std::lock_guard l(_children_lock); add_child_unlock(child, indent, loc); } void RuntimeProfile::get_children(std::vector* children) { children->clear(); - boost::lock_guard l(_children_lock); + std::lock_guard l(_children_lock); for (ChildMap::iterator i = _child_map.begin(); i != _child_map.end(); ++i) { children->push_back(i->second); @@ -328,7 +327,7 @@ void RuntimeProfile::get_children(std::vector* children) { } void RuntimeProfile::get_all_children(std::vector* children) { - boost::lock_guard l(_children_lock); + std::lock_guard l(_children_lock); for (ChildMap::iterator i = _child_map.begin(); i != _child_map.end(); ++i) { children->push_back(i->second); @@ -337,7 +336,7 @@ void RuntimeProfile::get_all_children(std::vector* children) { } void RuntimeProfile::add_info_string(const std::string& key, const std::string& value) { - boost::lock_guard l(_info_strings_lock); + std::lock_guard l(_info_strings_lock); InfoStrings::iterator it = _info_strings.find(key); if (it == _info_strings.end()) { @@ -349,7 +348,7 @@ void RuntimeProfile::add_info_string(const std::string& key, const std::string& } const std::string* RuntimeProfile::get_info_string(const std::string& key) { - boost::lock_guard l(_info_strings_lock); + std::lock_guard l(_info_strings_lock); InfoStrings::const_iterator it = _info_strings.find(key); if (it == _info_strings.end()) { @@ -363,7 +362,7 @@ const std::string* RuntimeProfile::get_info_string(const std::string& key) { RuntimeProfile::T* RuntimeProfile::NAME(const std::string& name, TUnit::type unit, \ const std::string& parent_counter_name) { \ DCHECK_EQ(_is_averaged_profile, false); \ - boost::lock_guard l(_counter_map_lock); \ + std::lock_guard l(_counter_map_lock); \ if (_counter_map.find(name) != _counter_map.end()) { \ return reinterpret_cast(_counter_map[name]); \ } \ @@ -384,7 +383,7 @@ ADD_COUNTER_IMPL(AddHighWaterMarkCounter, HighWaterMarkCounter); std::shared_ptr RuntimeProfile::AddSharedHighWaterMarkCounter( const std::string& name, TUnit::type unit, const std::string& parent_counter_name) { DCHECK_EQ(_is_averaged_profile, false); - boost::lock_guard l(_counter_map_lock); + std::lock_guard l(_counter_map_lock); if (_shared_counter_pool.find(name) != _shared_counter_pool.end()) { return _shared_counter_pool[name]; } @@ -406,7 +405,7 @@ std::shared_ptr RuntimeProfile::AddSharedH RuntimeProfile::Counter* RuntimeProfile::add_counter(const std::string& name, TUnit::type type, const std::string& parent_counter_name) { - boost::lock_guard l(_counter_map_lock); + std::lock_guard l(_counter_map_lock); // TODO(yingchun): Can we ensure that 'name' is not exist in '_counter_map'? Use CHECK instead? if (_counter_map.find(name) != _counter_map.end()) { @@ -427,7 +426,7 @@ RuntimeProfile::Counter* RuntimeProfile::add_counter(const std::string& name, TU RuntimeProfile::DerivedCounter* RuntimeProfile::add_derived_counter( const std::string& name, TUnit::type type, const DerivedCounterFunction& counter_fn, const std::string& parent_counter_name) { - boost::lock_guard l(_counter_map_lock); + std::lock_guard l(_counter_map_lock); if (_counter_map.find(name) != _counter_map.end()) { return NULL; @@ -456,7 +455,7 @@ RuntimeProfile::ThreadCounters* RuntimeProfile::add_thread_counters(const std::s } RuntimeProfile::Counter* RuntimeProfile::get_counter(const std::string& name) { - boost::lock_guard l(_counter_map_lock); + std::lock_guard l(_counter_map_lock); if (_counter_map.find(name) != _counter_map.end()) { return _counter_map[name]; @@ -472,7 +471,7 @@ void RuntimeProfile::get_counters(const std::string& name, std::vector counters->push_back(c); } - boost::lock_guard l(_children_lock); + std::lock_guard l(_children_lock); for (int i = 0; i < _children.size(); ++i) { _children[i].first->get_counters(name, counters); @@ -492,7 +491,7 @@ void RuntimeProfile::pretty_print(std::ostream* s, const std::string& prefix) co CounterMap counter_map; ChildCounterMap child_counter_map; { - boost::lock_guard l(_counter_map_lock); + std::lock_guard l(_counter_map_lock); counter_map = _counter_map; child_counter_map = _child_counter_map; } @@ -512,7 +511,7 @@ void RuntimeProfile::pretty_print(std::ostream* s, const std::string& prefix) co stream << std::endl; { - boost::lock_guard l(_info_strings_lock); + std::lock_guard l(_info_strings_lock); for (const std::string& key : _info_strings_display_order) { stream << prefix << " - " << key << ": " << _info_strings.find(key)->second << std::endl; @@ -526,7 +525,7 @@ void RuntimeProfile::pretty_print(std::ostream* s, const std::string& prefix) co // - Event 2: 2s288ms (2s288ms) // - Event 3: 2s410ms (121.138ms) // The times in parentheses are the time elapsed since the last event. - boost::lock_guard l(_event_sequences_lock); + std::lock_guard l(_event_sequences_lock); for (const EventSequenceMap::value_type& event_sequence : _event_sequence_map) { stream << prefix << " " << event_sequence.first << ": " << PrettyPrinter::print(event_sequence.second->elapsed_time(), TUnit::TIME_NS) @@ -549,7 +548,7 @@ void RuntimeProfile::pretty_print(std::ostream* s, const std::string& prefix) co // pretty_print() on the children ChildVector children; { - boost::lock_guard l(_children_lock); + std::lock_guard l(_children_lock); children = _children; } @@ -578,7 +577,7 @@ void RuntimeProfile::to_thrift(std::vector* nodes) { CounterMap counter_map; { - boost::lock_guard l(_counter_map_lock); + std::lock_guard l(_counter_map_lock); counter_map = _counter_map; node.child_counters_map = _child_counter_map; } @@ -593,14 +592,14 @@ void RuntimeProfile::to_thrift(std::vector* nodes) { } { - boost::lock_guard l(_info_strings_lock); + std::lock_guard l(_info_strings_lock); node.info_strings = _info_strings; node.info_strings_display_order = _info_strings_display_order; } ChildVector children; { - boost::lock_guard l(_children_lock); + std::lock_guard l(_children_lock); children = _children; } @@ -685,7 +684,7 @@ void RuntimeProfile::add_bucketing_counters(const std::string& name, Counter* src_counter, int num_buckets, std::vector* buckets) { { - boost::lock_guard l(_counter_map_lock); + std::lock_guard l(_counter_map_lock); _bucketing_counters.insert(buckets); } @@ -696,7 +695,7 @@ void RuntimeProfile::add_bucketing_counters(const std::string& name, add_counter(counter_name.str(), TUnit::DOUBLE_VALUE, parent_counter_name)); } - boost::lock_guard l(_s_periodic_counter_update_state.lock); + std::lock_guard l(_s_periodic_counter_update_state.lock); if (_s_periodic_counter_update_state.update_thread.get() == NULL) { _s_periodic_counter_update_state.update_thread.reset( @@ -710,7 +709,7 @@ void RuntimeProfile::add_bucketing_counters(const std::string& name, } RuntimeProfile::EventSequence* RuntimeProfile::add_event_sequence(const std::string& name) { - boost::lock_guard l(_event_sequences_lock); + std::lock_guard l(_event_sequences_lock); EventSequenceMap::iterator timer_it = _event_sequence_map.find(name); if (timer_it != _event_sequence_map.end()) { @@ -726,7 +725,7 @@ void RuntimeProfile::register_periodic_counter(Counter* src_counter, SampleFn sa Counter* dst_counter, PeriodicCounterType type) { DCHECK(src_counter == NULL || sample_fn == NULL); - boost::lock_guard l(_s_periodic_counter_update_state.lock); + std::lock_guard l(_s_periodic_counter_update_state.lock); if (_s_periodic_counter_update_state.update_thread.get() == NULL) { _s_periodic_counter_update_state.update_thread.reset( @@ -759,19 +758,19 @@ void RuntimeProfile::register_periodic_counter(Counter* src_counter, SampleFn sa } void RuntimeProfile::stop_rate_counters_updates(Counter* rate_counter) { - boost::lock_guard l(_s_periodic_counter_update_state.lock); + std::lock_guard l(_s_periodic_counter_update_state.lock); _s_periodic_counter_update_state.rate_counters.erase(rate_counter); } void RuntimeProfile::stop_sampling_counters_updates(Counter* sampling_counter) { - boost::lock_guard l(_s_periodic_counter_update_state.lock); + std::lock_guard l(_s_periodic_counter_update_state.lock); _s_periodic_counter_update_state.sampling_counters.erase(sampling_counter); } void RuntimeProfile::stop_bucketing_counters_updates(std::vector* buckets, bool convert) { int64_t num_sampled = 0; { - boost::lock_guard l(_s_periodic_counter_update_state.lock); + std::lock_guard l(_s_periodic_counter_update_state.lock); PeriodicCounterUpdateState::BucketCountersMap::const_iterator itr = _s_periodic_counter_update_state.bucketing_counters.find(buckets); @@ -795,7 +794,7 @@ RuntimeProfile::PeriodicCounterUpdateState::~PeriodicCounterUpdateState() { if (_s_periodic_counter_update_state.update_thread.get() != NULL) { { // Lock to ensure the update thread will see the update to _done - boost::lock_guard l(_s_periodic_counter_update_state.lock); + std::lock_guard l(_s_periodic_counter_update_state.lock); _done = true; } _s_periodic_counter_update_state.update_thread->join(); @@ -809,7 +808,7 @@ void RuntimeProfile::periodic_counter_update_loop() { boost::posix_time::time_duration elapsed = boost::get_system_time() - before_time; int elapsed_ms = elapsed.total_milliseconds(); - boost::lock_guard l(_s_periodic_counter_update_state.lock); + std::lock_guard l(_s_periodic_counter_update_state.lock); for (PeriodicCounterUpdateState::RateCounterMap::iterator it = _s_periodic_counter_update_state.rate_counters.begin(); diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h index 58eedb1669..b0b9416136 100644 --- a/be/src/util/runtime_profile.h +++ b/be/src/util/runtime_profile.h @@ -23,9 +23,9 @@ #include #include -#include #include #include +#include #include "common/logging.h" #include "common/object_pool.h" @@ -48,13 +48,15 @@ namespace doris { #define ADD_TIMER(profile, name) (profile)->add_counter(name, TUnit::TIME_NS) #define ADD_CHILD_TIMER(profile, name, parent) (profile)->add_counter(name, TUnit::TIME_NS, parent) #define SCOPED_TIMER(c) ScopedTimer MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c) -#define SCOPED_CPU_TIMER(c) ScopedTimer MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c) +#define SCOPED_CPU_TIMER(c) \ + ScopedTimer MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c) #define CANCEL_SAFE_SCOPED_TIMER(c, is_cancelled) \ ScopedTimer MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c, is_cancelled) #define SCOPED_RAW_TIMER(c) \ ScopedRawTimer MACRO_CONCAT(SCOPED_RAW_TIMER, __COUNTER__)(c) -#define SCOPED_ATOMIC_TIMER(c) \ - ScopedRawTimer> MACRO_CONCAT(SCOPED_ATOMIC_TIMER, __COUNTER__)(c) +#define SCOPED_ATOMIC_TIMER(c) \ + ScopedRawTimer> MACRO_CONCAT(SCOPED_ATOMIC_TIMER, \ + __COUNTER__)(c) #define COUNTER_UPDATE(c, v) (c)->update(v) #define COUNTER_SET(c, v) (c)->set(v) #define ADD_THREAD_COUNTERS(profile, prefix) (profile)->add_thread_counters(prefix) @@ -283,7 +285,7 @@ public: // invalidate pointers to profiles. template void sort_childer(const Compare& cmp) { - boost::lock_guard l(_children_lock); + std::lock_guard l(_children_lock); std::sort(_children.begin(), _children.end(), cmp); } @@ -481,7 +483,7 @@ private: std::set*> _bucketing_counters; // protects _counter_map, _counter_child_map and _bucketing_counters - mutable boost::mutex _counter_map_lock; + mutable std::mutex _counter_map_lock; // Child profiles. Does not own memory. // We record children in both a map (to facilitate updates) and a vector @@ -491,7 +493,7 @@ private: // vector of (profile, indentation flag) typedef std::vector> ChildVector; ChildVector _children; - mutable boost::mutex _children_lock; // protects _child_map and _children + mutable std::mutex _children_lock; // protects _child_map and _children typedef std::map InfoStrings; InfoStrings _info_strings; @@ -501,11 +503,11 @@ private: InfoStringsDisplayOrder _info_strings_display_order; // Protects _info_strings and _info_strings_display_order - mutable boost::mutex _info_strings_lock; + mutable std::mutex _info_strings_lock; typedef std::map EventSequenceMap; EventSequenceMap _event_sequence_map; - mutable boost::mutex _event_sequences_lock; + mutable std::mutex _event_sequences_lock; Counter _counter_total_time; // Time spent in just in this profile (i.e. not the children) as a fraction @@ -545,7 +547,7 @@ private: ~PeriodicCounterUpdateState(); // Lock protecting state below - boost::mutex lock; + std::mutex lock; // If true, tear down the update thread. volatile bool _done; diff --git a/be/src/util/streaming_sampler.h b/be/src/util/streaming_sampler.h index 03b1cec0ce..c475f5211a 100644 --- a/be/src/util/streaming_sampler.h +++ b/be/src/util/streaming_sampler.h @@ -20,7 +20,6 @@ #include -#include #include #include "util/spinlock.h" @@ -64,7 +63,7 @@ public: /// TODO: we can make this more complex by taking a weighted average of samples /// accumulated in a period. void AddSample(T sample, int ms) { - boost::lock_guard l(lock_); + std::lock_guard l(lock_); ++current_sample_count_; current_sample_sum_ += sample; current_sample_total_time_ += ms; @@ -104,7 +103,7 @@ public: void SetSamples(int period, const std::vector& samples) { DCHECK_LE(samples.size(), MAX_SAMPLES); - boost::lock_guard l(lock_); + std::lock_guard l(lock_); period_ = period; samples_collected_ = samples.size(); memcpy(samples_, &samples[0], sizeof(T) * samples_collected_); @@ -114,7 +113,7 @@ public: } std::string DebugString(const std::string& prefix = "") const { - boost::lock_guard l(lock_); + std::lock_guard l(lock_); std::stringstream ss; ss << prefix << "Period = " << period_ << std::endl << prefix << "Num = " << samples_collected_ << std::endl diff --git a/be/src/util/thrift_rpc_helper.cpp b/be/src/util/thrift_rpc_helper.cpp index 881f5f96d1..f6d51e313d 100644 --- a/be/src/util/thrift_rpc_helper.cpp +++ b/be/src/util/thrift_rpc_helper.cpp @@ -18,7 +18,6 @@ #include "util/thrift_rpc_helper.h" #include -#include #include #include diff --git a/be/src/util/thrift_server.cpp b/be/src/util/thrift_server.cpp index dc5a5edbe8..5257498ce2 100644 --- a/be/src/util/thrift_server.cpp +++ b/be/src/util/thrift_server.cpp @@ -28,8 +28,8 @@ #include #include -#include -#include +#include +#include #include #include "util/doris_metrics.h" @@ -80,11 +80,11 @@ private: // Lock used to ensure that there are no missed notifications between starting the // supervision thread and calling _signal_cond.timed_wait. Also used to ensure // thread-safe access to members of _thrift_server - boost::mutex _signal_lock; + std::mutex _signal_lock; // Condition variable that is notified by the supervision thread once either // a) all is well or b) an error occurred. - boost::condition_variable _signal_cond; + std::condition_variable _signal_cond; // The ThriftServer under management. This class is a friend of ThriftServer, and // reaches in to change member variables at will. @@ -105,20 +105,18 @@ const int ThriftServer::ThriftServerEventProcessor::TIMEOUT_MS = 2500; Status ThriftServer::ThriftServerEventProcessor::start_and_wait_for_server() { // Locking here protects against missed notifications if Supervise executes quickly - boost::unique_lock lock(_signal_lock); + std::unique_lock lock(_signal_lock); _thrift_server->_started = false; _thrift_server->_server_thread.reset( new boost::thread(&ThriftServer::ThriftServerEventProcessor::supervise, this)); - boost::system_time deadline = - boost::get_system_time() + boost::posix_time::milliseconds(TIMEOUT_MS); - // Loop protects against spurious wakeup. Locks provide necessary fences to ensure // visibility. while (!_signal_fired) { // Yields lock and allows supervision thread to continue and signal - if (!_signal_cond.timed_wait(lock, deadline)) { + std::cv_status cvsts = _signal_cond.wait_for(lock, std::chrono::milliseconds(TIMEOUT_MS)); + if (cvsts == std::cv_status::timeout) { std::stringstream ss; ss << "ThriftServer '" << _thrift_server->_name << "' (on port: " << _thrift_server->_port << ") did not start within " << TIMEOUT_MS @@ -154,7 +152,7 @@ void ThriftServer::ThriftServerEventProcessor::supervise() { { // _signal_lock ensures mutual exclusion of access to _thrift_server - boost::lock_guard lock(_signal_lock); + std::lock_guard lock(_signal_lock); _thrift_server->_started = false; // There may not be anyone waiting on this signal (if the @@ -170,7 +168,7 @@ void ThriftServer::ThriftServerEventProcessor::supervise() { void ThriftServer::ThriftServerEventProcessor::preServe() { // Acquire the signal lock to ensure that StartAndWaitForServer is // waiting on _signal_cond when we notify. - boost::lock_guard lock(_signal_lock); + std::lock_guard lock(_signal_lock); _signal_fired = true; // This is the (only) success path - if this is not reached within TIMEOUT_MS, @@ -221,7 +219,7 @@ void* ThriftServer::ThriftServerEventProcessor::createContext( ss << socket->getPeerAddress() << ":" << socket->getPeerPort(); { - boost::lock_guard _l(_thrift_server->_session_keys_lock); + std::lock_guard _l(_thrift_server->_session_keys_lock); boost::shared_ptr key_ptr(new std::string(ss.str())); @@ -257,7 +255,7 @@ void ThriftServer::ThriftServerEventProcessor::deleteContext( } { - boost::lock_guard _l(_thrift_server->_session_keys_lock); + std::lock_guard _l(_thrift_server->_session_keys_lock); _thrift_server->_session_keys.erase(_session_key); } diff --git a/be/src/util/thrift_server.h b/be/src/util/thrift_server.h index b0ab22022e..0c4150bbc7 100644 --- a/be/src/util/thrift_server.h +++ b/be/src/util/thrift_server.h @@ -24,8 +24,8 @@ #include #include #include -#include -#include +#include +#include #include "common/status.h" #include "util/metrics.h" @@ -130,11 +130,11 @@ private: SessionHandlerIf* _session_handler; // Protects _session_keys - boost::mutex _session_keys_lock; + std::mutex _session_keys_lock; // Map of active session keys to shared_ptr containing that key; when a key is // removed it is automatically freed. - typedef boost::unordered_map> SessionKeySet; + typedef std::unordered_map> SessionKeySet; SessionKeySet _session_keys; // Helper class which monitors starting servers. Needs access to internal members, and diff --git a/be/test/runtime/buffered_block_mgr2_test.cpp b/be/test/runtime/buffered_block_mgr2_test.cpp index 62ac0c65d1..1e34e8ca01 100644 --- a/be/test/runtime/buffered_block_mgr2_test.cpp +++ b/be/test/runtime/buffered_block_mgr2_test.cpp @@ -40,7 +40,7 @@ using std::filesystem::directory_iterator; using std::filesystem::remove; using boost::scoped_ptr; -using boost::unordered_map; +using std::unordered_map; using boost::thread; using std::string; diff --git a/be/test/runtime/disk_io_mgr_test.cpp b/be/test/runtime/disk_io_mgr_test.cpp index ed9382655d..cc55459e33 100644 --- a/be/test/runtime/disk_io_mgr_test.cpp +++ b/be/test/runtime/disk_io_mgr_test.cpp @@ -33,11 +33,11 @@ using std::stringstream; using std::vector; using std::list; -using boost::lock_guard; -using boost::unique_lock; -using boost::mutex; +using std::lock_guard; +using std::unique_lock; +using std::mutex; using boost::mem_fn; -using boost::condition_variable; +using std::condition_variable; using boost::scoped_ptr; using boost::thread; using boost::thread_group; diff --git a/be/test/runtime/large_int_value_test.cpp b/be/test/runtime/large_int_value_test.cpp index 98e9c03f64..6b9c52fdca 100644 --- a/be/test/runtime/large_int_value_test.cpp +++ b/be/test/runtime/large_int_value_test.cpp @@ -19,7 +19,6 @@ #include -#include #include #include #include diff --git a/be/test/util/blocking_queue_test.cpp b/be/test/util/blocking_queue_test.cpp index fed934b92d..4dbf1b47b2 100644 --- a/be/test/util/blocking_queue_test.cpp +++ b/be/test/util/blocking_queue_test.cpp @@ -22,7 +22,7 @@ #include #include -#include +#include namespace doris { @@ -65,7 +65,7 @@ public: } { - boost::lock_guard guard(_lock); + std::lock_guard guard(_lock); if (--_num_inserters == 0) { _queue.shutdown(); @@ -83,7 +83,7 @@ public: } { - boost::lock_guard guard(_lock); + std::lock_guard guard(_lock); _gotten[arg] = _gotten[arg] + 1; } } @@ -107,7 +107,7 @@ public: } // Let's check to make sure we got what we should have. - boost::lock_guard guard(_lock); + std::lock_guard guard(_lock); for (int i = 0; i < _nthreads; ++i) { ASSERT_EQ(_iterations, _gotten[i]); @@ -126,7 +126,7 @@ private: int _nthreads; BlockingQueue _queue; // Lock for _gotten and _num_inserters. - boost::mutex _lock; + std::mutex _lock; // Map from inserter thread id to number of consumed elements from that id. // Ultimately, this should map each thread id to _insertions elements. // Additionally, if the blocking_get returns false, this increments id=-1. diff --git a/be/test/util/broker_storage_backend_test.cpp b/be/test/util/broker_storage_backend_test.cpp index 91a4a58972..f8f1611ff4 100644 --- a/be/test/util/broker_storage_backend_test.cpp +++ b/be/test/util/broker_storage_backend_test.cpp @@ -62,7 +62,7 @@ protected: virtual void TearDown() { remove(_test_file.c_str()); } std::string gen_uuid() { auto id = boost::uuids::random_generator()(); - return boost::lexical_cast(id); + return boost::uuids::to_string(id); } std::unique_ptr _broker; std::map _broker_properties; diff --git a/be/test/util/internal_queue_test.cpp b/be/test/util/internal_queue_test.cpp index 03d2139aa0..8a237394ef 100644 --- a/be/test/util/internal_queue_test.cpp +++ b/be/test/util/internal_queue_test.cpp @@ -21,11 +21,11 @@ #include #include -#include +#include #include "common/configbase.h" -#include "util/logging.h" #include "test_util/test_util.h" +#include "util/logging.h" using std::vector; using boost::thread; diff --git a/be/test/util/thread_pool_test.cpp b/be/test/util/thread_pool_test.cpp index edd2a353a2..fbdbaa698d 100644 --- a/be/test/util/thread_pool_test.cpp +++ b/be/test/util/thread_pool_test.cpp @@ -22,7 +22,7 @@ #include #include -#include +#include #include "util/logging.h" @@ -32,10 +32,10 @@ const int NUM_THREADS = 5; int g_thread_counters[NUM_THREADS]; // Per-thread mutex to ensure visibility of counters after thread pool terminates -boost::mutex g_thread_mutexes[NUM_THREADS]; +std::mutex g_thread_mutexes[NUM_THREADS]; void count(int thread_id, const int& i) { - boost::lock_guard l(g_thread_mutexes[thread_id]); + std::lock_guard l(g_thread_mutexes[thread_id]); g_thread_counters[thread_id] += i; } @@ -62,7 +62,7 @@ TEST(ThreadPoolTest, BasicTest) { int count = 0; for (int i = 0; i < NUM_THREADS; ++i) { - boost::lock_guard l(g_thread_mutexes[i]); + std::lock_guard l(g_thread_mutexes[i]); LOG(INFO) << "Counter " << i << ": " << g_thread_counters[i]; count += g_thread_counters[i]; }