From 58e58c94d81a4b4fafeb64617b822c98bb577e69 Mon Sep 17 00:00:00 2001 From: Yingchun Lai <405403881@qq.com> Date: Fri, 15 Jan 2021 09:45:11 +0800 Subject: [PATCH] [TSAN] Fix tsan bugs (part 1) (#5162) ThreadSanitizer, aka TSAN, is a useful tool to detect multi-thread problems, such as data race, mutex problems, etc. We should detect TSAN problems for Doris BE, both unit tests and server should pass through TSAN mode, to make Doris more robustness. This is the very beginning patch to fix TSAN problems, and some difficult problems are suppressed in file 'tsan_suppressions', you can suppress these problems by setting: export TSAN_OPTIONS="suppressions=tsan_suppressions" before running: `BUILD_TYPE=tsan ./run-be-ut.sh --run` --- be/CMakeLists.txt | 4 +- be/src/exec/data_sink.cpp | 1 + be/src/exec/tablet_sink.cpp | 38 ++++----- be/src/exec/tablet_sink.h | 14 +--- be/src/http/ev_http_server.cpp | 33 ++++---- be/src/http/ev_http_server.h | 7 +- be/src/olap/data_dir.cpp | 101 ++++++++++++------------ be/src/olap/data_dir.h | 2 +- be/src/olap/olap_server.cpp | 8 +- be/src/olap/storage_engine.cpp | 1 + be/src/runtime/disk_io_mgr.h | 2 +- be/src/runtime/fragment_mgr.cpp | 9 ++- be/src/util/blocking_priority_queue.hpp | 8 +- be/src/util/condition_variable.cpp | 4 + be/src/util/debug/sanitizer_scopes.h | 46 +++++++++++ be/src/util/priority_thread_pool.hpp | 11 +-- be/src/util/runtime_profile.h | 11 ++- be/src/util/spinlock.h | 5 +- be/src/util/thread.cpp | 7 +- be/src/util/threadpool.cpp | 3 + be/test/exec/tablet_sink_test.cpp | 32 ++++---- be/test/plugin/plugin_zip_test.cpp | 2 - thirdparty/build-thirdparty.sh | 12 +++ thirdparty/vars.sh | 10 ++- tsan_suppressions | 24 ++++++ 25 files changed, 239 insertions(+), 156 deletions(-) create mode 100644 be/src/util/debug/sanitizer_scopes.h create mode 100644 tsan_suppressions diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index af30e50ac0..a9d0c6b557 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -315,7 +315,8 @@ SET(CXX_FLAGS_UBSAN "${CXX_GCC_FLAGS} -ggdb3 -O0 -gdwarf-2 -fno-wrapv -fsanitize # Set the flags to the thread sanitizer, also known as "tsan" # Turn on sanitizer and debug symbols to get stack traces: -SET(CXX_FLAGS_TSAN "${CXX_GCC_FLAGS} -O0 -ggdb3 -fsanitize=thread -DTHREAD_SANITIZER") +# Use -Wno-builtin-declaration-mismatch to mute warnings like "new declaration ‘__tsan_atomic16 __tsan_atomic16_fetch_nand(..." +SET(CXX_FLAGS_TSAN "${CXX_GCC_FLAGS} -O0 -ggdb3 -fsanitize=thread -DTHREAD_SANITIZER -Wno-builtin-declaration-mismatch") # Set compile flags based on the build type. if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG") @@ -467,6 +468,7 @@ elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN") set(DORIS_LINK_LIBS ${DORIS_LINK_LIBS} -static-libubsan tcmalloc) elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "TSAN") set(DORIS_LINK_LIBS ${DORIS_LINK_LIBS} -static-libtsan) + add_definitions("-DTHREAD_SANITIZER") else() message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}") endif() diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index 218e29f767..84c46ab1b6 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -159,6 +159,7 @@ Status DataSink::init(const TDataSink& thrift_sink) { Status DataSink::prepare(RuntimeState* state) { _expr_mem_tracker = MemTracker::CreateTracker( + // TODO(yingchun): use subclass' name -1, std::string("DataSink:") + std::to_string(state->load_job_id()), state->instance_mem_tracker()); return Status::OK(); diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 47305d9282..15677213be 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -27,6 +27,7 @@ #include "runtime/tuple_row.h" #include "service/brpc.h" #include "util/brpc_stub_cache.h" +#include "util/debug/sanitizer_scopes.h" #include "util/monotime.h" #include "util/uid_util.h" @@ -200,14 +201,14 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) { // It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close(). while (!_cancelled && _parent->_mem_tracker->AnyLimitExceeded(MemLimit::HARD) && _pending_batches_num > 0) { - SCOPED_RAW_TIMER(&_mem_exceeded_block_ns); + SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns); SleepFor(MonoDelta::FromMilliseconds(10)); } auto row_no = _cur_batch->add_row(); if (row_no == RowBatch::INVALID_ROW_INDEX) { { - SCOPED_RAW_TIMER(&_queue_push_lock_ns); + SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns); std::lock_guard l(_pending_batches_lock); //To simplify the add_row logic, postpone adding batch into req until the time of sending req _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request); @@ -235,6 +236,7 @@ Status NodeChannel::mark_close() { _cur_add_batch_request.set_eos(true); { + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; std::lock_guard l(_pending_batches_lock); _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request); _pending_batches_num++; @@ -311,10 +313,11 @@ int NodeChannel::try_send_and_fetch_status() { } if (!_add_batch_closure->is_packet_in_flight() && _pending_batches_num > 0) { - SCOPED_RAW_TIMER(&_actual_consume_ns); + SCOPED_ATOMIC_TIMER(&_actual_consume_ns); AddBatchReq send_batch; { - std::lock_guard lg(_pending_batches_lock); + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; + std::lock_guard l(_pending_batches_lock); DCHECK(!_pending_batches.empty()); send_batch = std::move(_pending_batches.front()); _pending_batches.pop(); @@ -327,7 +330,7 @@ int NodeChannel::try_send_and_fetch_status() { // tablet_ids has already set when add row request.set_packet_seq(_next_packet_seq); if (row_batch->num_rows() > 0) { - SCOPED_RAW_TIMER(&_serialize_batch_ns); + SCOPED_ATOMIC_TIMER(&_serialize_batch_ns); row_batch->serialize(request.mutable_row_batch()); } @@ -394,7 +397,7 @@ Status IndexChannel::init(RuntimeState* state, const std::vectornode_ids) { NodeChannel* channel = nullptr; auto it = _node_channels.find(node_id); - if (it == std::end(_node_channels)) { + if (it == _node_channels.end()) { channel = _parent->_pool->add( new NodeChannel(_parent, _index_id, node_id, _schema_hash)); _node_channels.emplace(node_id, channel); @@ -414,7 +417,7 @@ Status IndexChannel::init(RuntimeState* state, const std::vectorprint_load_info() - << ". error_msg=" << status.get_error_msg(); + << ". error_msg=" << s.get_error_msg(); } ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns, &mem_exceeded_block_ns, &queue_push_lock_ns, @@ -733,7 +734,6 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { COUNTER_SET(_send_data_timer, _send_data_ns); COUNTER_SET(_convert_batch_timer, _convert_batch_ns); COUNTER_SET(_validate_data_timer, _validate_data_ns); - COUNTER_SET(_non_blocking_send_timer, _non_blocking_send_ns); COUNTER_SET(_serialize_batch_timer, serialize_batch_ns); // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() + @@ -939,7 +939,7 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* } void OlapTableSink::_send_batch_process() { - SCOPED_RAW_TIMER(&_non_blocking_send_ns); + SCOPED_TIMER(_non_blocking_send_timer); do { int running_channels_num = 0; for (auto index_channel : _channels) { diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index d3ae181394..8d894c6da7 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -237,11 +237,10 @@ private: std::vector _tablet_commit_infos; AddBatchCounter _add_batch_counter; - int64_t _serialize_batch_ns = 0; - - int64_t _mem_exceeded_block_ns = 0; - int64_t _queue_push_lock_ns = 0; - int64_t _actual_consume_ns = 0; + std::atomic _serialize_batch_ns; + std::atomic _mem_exceeded_block_ns; + std::atomic _queue_push_lock_ns; + std::atomic _actual_consume_ns; }; class IndexChannel { @@ -328,12 +327,8 @@ private: // unique load id PUniqueId _load_id; int64_t _txn_id = -1; - int64_t _db_id = -1; - int64_t _table_id = -1; int _num_replicas = -1; bool _need_gen_rollup = false; - std::string _db_name; - std::string _table_name; int _tuple_desc_id = -1; // this is tuple descriptor of destination OLAP table @@ -378,7 +373,6 @@ private: int64_t _convert_batch_ns = 0; int64_t _validate_data_ns = 0; int64_t _send_data_ns = 0; - int64_t _non_blocking_send_ns = 0; int64_t _serialize_batch_ns = 0; int64_t _number_input_rows = 0; int64_t _number_output_rows = 0; diff --git a/be/src/http/ev_http_server.cpp b/be/src/http/ev_http_server.cpp index b2ff9ad693..e9582462bb 100644 --- a/be/src/http/ev_http_server.cpp +++ b/be/src/http/ev_http_server.cpp @@ -74,20 +74,15 @@ static int on_connection(struct evhttp_request* req, void* param) { EvHttpServer::EvHttpServer(int port, int num_workers) : _host("0.0.0.0"), _port(port), _num_workers(num_workers), _real_port(0) { DCHECK_GT(_num_workers, 0); - auto res = pthread_rwlock_init(&_rw_lock, nullptr); - DCHECK_EQ(res, 0); } EvHttpServer::EvHttpServer(const std::string& host, int port, int num_workers) : _host(host), _port(port), _num_workers(num_workers), _real_port(0) { DCHECK_GT(_num_workers, 0); - auto res = pthread_rwlock_init(&_rw_lock, nullptr); - DCHECK_EQ(res, 0); } EvHttpServer::~EvHttpServer() { stop(); - pthread_rwlock_destroy(&_rw_lock); } void EvHttpServer::start() { @@ -100,14 +95,17 @@ void EvHttpServer::start() { .build(&_workers); evthread_use_pthreads(); - event_bases.resize(_num_workers); + _event_bases.resize(_num_workers); for (int i = 0; i < _num_workers; ++i) { CHECK(_workers->submit_func([this, i]() { std::shared_ptr base(event_base_new(), [](event_base* base) { event_base_free(base); }); CHECK(base != nullptr) << "Couldn't create an event_base."; - event_bases[i] = base; + { + std::lock_guard lock(_event_bases_lock); + _event_bases[i] = base; + } /* Create a new evhttp object to handle requests. */ std::shared_ptr http(evhttp_new(base.get()), @@ -127,9 +125,13 @@ void EvHttpServer::start() { } void EvHttpServer::stop() { - for (int i = 0; i < _num_workers; ++i) { - LOG(WARNING) << "event_base_loopexit ret: " - << event_base_loopexit(event_bases[i].get(), nullptr); + { + std::lock_guard lock(_event_bases_lock); + for (int i = 0; i < _num_workers; ++i) { + LOG(WARNING) << "event_base_loopexit ret: " + << event_base_loopexit(_event_bases[i].get(), nullptr); + } + _event_bases.clear(); } _workers->shutdown(); close(_server_fd); @@ -180,7 +182,7 @@ bool EvHttpServer::register_handler(const HttpMethod& method, const std::string& } bool result = true; - pthread_rwlock_wrlock(&_rw_lock); + std::lock_guard lock(_handler_lock); PathTrie* root = nullptr; switch (method) { case GET: @@ -208,17 +210,15 @@ bool EvHttpServer::register_handler(const HttpMethod& method, const std::string& if (result) { result = root->insert(path, handler); } - pthread_rwlock_unlock(&_rw_lock); - + return result; } void EvHttpServer::register_static_file_handler(HttpHandler* handler) { DCHECK(handler != nullptr); DCHECK(_static_file_handler == nullptr); - pthread_rwlock_wrlock(&_rw_lock); + std::lock_guard lock(_handler_lock); _static_file_handler = handler; - pthread_rwlock_unlock(&_rw_lock); } int EvHttpServer::on_header(struct evhttp_request* ev_req) { @@ -258,7 +258,7 @@ HttpHandler* EvHttpServer::_find_handler(HttpRequest* req) { HttpHandler* handler = nullptr; - pthread_rwlock_rdlock(&_rw_lock); + std::lock_guard lock(_handler_lock); switch (req->method()) { case GET: _get_handlers.retrieve(path, &handler, req->params()); @@ -286,7 +286,6 @@ HttpHandler* EvHttpServer::_find_handler(HttpRequest* req) { LOG(WARNING) << "unknown HTTP method, method=" << req->method(); break; } - pthread_rwlock_unlock(&_rw_lock); return handler; } diff --git a/be/src/http/ev_http_server.h b/be/src/http/ev_http_server.h index d6ff3367f2..af25d5883b 100644 --- a/be/src/http/ev_http_server.h +++ b/be/src/http/ev_http_server.h @@ -17,6 +17,7 @@ #pragma once +#include #include #include #include @@ -66,10 +67,10 @@ private: int _server_fd = -1; std::unique_ptr _workers; - std::vector> event_bases; - - pthread_rwlock_t _rw_lock; + std::mutex _event_bases_lock; // protect _event_bases + std::vector> _event_bases; + std::mutex _handler_lock; PathTrie _get_handlers; HttpHandler* _static_file_handler = nullptr; PathTrie _put_handlers; diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index 9c20ddf0f8..4bf7b974be 100644 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -114,8 +114,9 @@ Status DataDir::init() { } void DataDir::stop_bg_worker() { + std::unique_lock lck(_check_path_mutex); _stop_bg_worker = true; - _cv.notify_one(); + _check_path_cv.notify_one(); } Status DataDir::_init_cluster_id() { @@ -807,13 +808,13 @@ void DataDir::remove_pending_ids(const std::string& id) { // gc unused tablet schemahash dir void DataDir::perform_path_gc_by_tablet() { std::unique_lock lck(_check_path_mutex); - _cv.wait(lck, [this] { return _stop_bg_worker || !_all_tablet_schemahash_paths.empty(); }); + _check_path_cv.wait(lck, [this] { return _stop_bg_worker || !_all_tablet_schemahash_paths.empty(); }); if (_stop_bg_worker) { return; } LOG(INFO) << "start to path gc by tablet schemahash."; int counter = 0; - for (auto& path : _all_tablet_schemahash_paths) { + for (const auto& path : _all_tablet_schemahash_paths) { ++counter; if (config::path_gc_check_step > 0 && counter % config::path_gc_check_step == 0) { SleepFor(MonoDelta::FromMilliseconds(config::path_gc_check_step_interval_ms)); @@ -857,13 +858,13 @@ void DataDir::perform_path_gc_by_rowsetid() { // init the set of valid path // validate the path in data dir std::unique_lock lck(_check_path_mutex); - _cv.wait(lck, [this] { return _stop_bg_worker || !_all_check_paths.empty(); }); + _check_path_cv.wait(lck, [this] { return _stop_bg_worker || !_all_check_paths.empty(); }); if (_stop_bg_worker) { return; } LOG(INFO) << "start to path gc by rowsetid."; int counter = 0; - for (auto& path : _all_check_paths) { + for (const auto& path : _all_check_paths) { ++counter; if (config::path_gc_check_step > 0 && counter % config::path_gc_check_step == 0) { SleepFor(MonoDelta::FromMilliseconds(config::path_gc_check_step_interval_ms)); @@ -899,65 +900,63 @@ void DataDir::perform_path_gc_by_rowsetid() { // path producer void DataDir::perform_path_scan() { - { - std::unique_lock lck(_check_path_mutex); - if (!_all_check_paths.empty()) { - LOG(INFO) << "_all_check_paths is not empty when path scan."; - return; - } - LOG(INFO) << "start to scan data dir path:" << _path; - std::set shards; - std::string data_path = _path + DATA_PREFIX; + std::unique_lock lck(_check_path_mutex); + if (!_all_check_paths.empty()) { + LOG(INFO) << "_all_check_paths is not empty when path scan."; + return; + } + LOG(INFO) << "start to scan data dir path:" << _path; + std::set shards; + std::string data_path = _path + DATA_PREFIX; - Status ret = FileUtils::list_dirs_files(data_path, &shards, nullptr, Env::Default()); + Status ret = FileUtils::list_dirs_files(data_path, &shards, nullptr, Env::Default()); + if (!ret.ok()) { + LOG(WARNING) << "fail to walk dir. path=[" + data_path << "] error[" << ret.to_string() + << "]"; + return; + } + + for (const auto& shard : shards) { + std::string shard_path = data_path + "/" + shard; + std::set tablet_ids; + ret = FileUtils::list_dirs_files(shard_path, &tablet_ids, nullptr, Env::Default()); if (!ret.ok()) { - LOG(WARNING) << "fail to walk dir. path=[" + data_path << "] error[" << ret.to_string() - << "]"; - return; + LOG(WARNING) << "fail to walk dir. [path=" << shard_path << "] error[" + << ret.to_string() << "]"; + continue; } - - for (const auto& shard : shards) { - std::string shard_path = data_path + "/" + shard; - std::set tablet_ids; - ret = FileUtils::list_dirs_files(shard_path, &tablet_ids, nullptr, Env::Default()); + for (const auto& tablet_id : tablet_ids) { + std::string tablet_id_path = shard_path + "/" + tablet_id; + std::set schema_hashes; + ret = FileUtils::list_dirs_files(tablet_id_path, &schema_hashes, nullptr, + Env::Default()); if (!ret.ok()) { - LOG(WARNING) << "fail to walk dir. [path=" << shard_path << "] error[" - << ret.to_string() << "]"; + LOG(WARNING) << "fail to walk dir. [path=" << tablet_id_path << "]" + << " error[" << ret.to_string() << "]"; continue; } - for (const auto& tablet_id : tablet_ids) { - std::string tablet_id_path = shard_path + "/" + tablet_id; - std::set schema_hashes; - ret = FileUtils::list_dirs_files(tablet_id_path, &schema_hashes, nullptr, - Env::Default()); + for (const auto& schema_hash : schema_hashes) { + std::string tablet_schema_hash_path = tablet_id_path + "/" + schema_hash; + _all_tablet_schemahash_paths.insert(tablet_schema_hash_path); + + std::set rowset_files; + ret = FileUtils::list_dirs_files(tablet_schema_hash_path, nullptr, + &rowset_files, Env::Default()); if (!ret.ok()) { - LOG(WARNING) << "fail to walk dir. [path=" << tablet_id_path << "]" - << " error[" << ret.to_string() << "]"; + LOG(WARNING) << "fail to walk dir. [path=" << tablet_schema_hash_path + << "] error[" << ret.to_string() << "]"; continue; } - for (const auto& schema_hash : schema_hashes) { - std::string tablet_schema_hash_path = tablet_id_path + "/" + schema_hash; - _all_tablet_schemahash_paths.insert(tablet_schema_hash_path); - std::set rowset_files; - - ret = FileUtils::list_dirs_files(tablet_schema_hash_path, nullptr, - &rowset_files, Env::Default()); - if (!ret.ok()) { - LOG(WARNING) << "fail to walk dir. [path=" << tablet_schema_hash_path - << "] error[" << ret.to_string() << "]"; - continue; - } - for (const auto& rowset_file : rowset_files) { - std::string rowset_file_path = tablet_schema_hash_path + "/" + rowset_file; - _all_check_paths.insert(rowset_file_path); - } + for (const auto& rowset_file : rowset_files) { + std::string rowset_file_path = tablet_schema_hash_path + "/" + rowset_file; + _all_check_paths.insert(rowset_file_path); } } } - LOG(INFO) << "scan data dir path:" << _path - << " finished. path size:" << _all_check_paths.size(); } - _cv.notify_one(); + LOG(INFO) << "scan data dir path: " << _path + << " finished. path size: " << _all_check_paths.size() + _all_tablet_schemahash_paths.size(); + _check_path_cv.notify_one(); } void DataDir::_process_garbage_path(const std::string& path) { diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h index a3c2d22b2a..c05b50e76c 100644 --- a/be/src/olap/data_dir.h +++ b/be/src/olap/data_dir.h @@ -190,7 +190,7 @@ private: RowsetIdGenerator* _id_generator = nullptr; std::mutex _check_path_mutex; - std::condition_variable _cv; + std::condition_variable _check_path_cv; std::set _all_check_paths; std::set _all_tablet_schemahash_paths; diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 8f87c25af7..2026150732 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -324,7 +324,8 @@ void StorageEngine::_compaction_tasks_producer_callback() { int round = 0; CompactionType compaction_type; - while (true) { + int32_t interval = 1; + do { if (!config::disable_auto_compaction) { if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) { compaction_type = CompactionType::CUMULATIVE_COMPACTION; @@ -387,10 +388,11 @@ void StorageEngine::_compaction_tasks_producer_callback() { tablet->reset_compaction(compaction_type); } } + interval = 1; } else { - sleep(config::check_auto_compaction_interval_seconds); + interval = config::check_auto_compaction_interval_seconds * 1000; } - } + } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromMilliseconds(interval))); } std::vector StorageEngine::_compaction_tasks_generator( diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 0385650498..3b952c391d 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -510,6 +510,7 @@ void StorageEngine::stop() { thread->join(); \ } + THREAD_JOIN(_compaction_tasks_producer_thread); THREAD_JOIN(_unused_rowset_monitor_thread); THREAD_JOIN(_garbage_sweeper_thread); THREAD_JOIN(_disk_stat_monitor_thread); diff --git a/be/src/runtime/disk_io_mgr.h b/be/src/runtime/disk_io_mgr.h index e863107e40..86e6630b72 100644 --- a/be/src/runtime/disk_io_mgr.h +++ b/be/src/runtime/disk_io_mgr.h @@ -715,7 +715,7 @@ private: // True if the IoMgr should be torn down. Worker threads watch for this to // know to terminate. This variable is read/written to by different threads. - volatile bool _shut_down; + std::atomic _shut_down; // Total bytes read by the IoMgr. RuntimeProfile::Counter _total_bytes_read_counter; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index d74875bf68..089921ac34 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -384,18 +384,19 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env) return _fragment_map.size(); }); - CHECK(Thread::create( + auto s = Thread::create( "FragmentMgr", "cancel_timeout_plan_fragment", - [this]() { this->cancel_worker(); }, &_cancel_thread) - .ok()); + [this]() { this->cancel_worker(); }, &_cancel_thread); + CHECK(s.ok()) << s.to_string(); // TODO(zc): we need a better thread-pool // now one user can use all the thread pool, others have no resource. - ThreadPoolBuilder("FragmentMgrThreadPool") + s = ThreadPoolBuilder("FragmentMgrThreadPool") .set_min_threads(config::fragment_pool_thread_num_min) .set_max_threads(config::fragment_pool_thread_num_max) .set_max_queue_size(config::fragment_pool_queue_size) .build(&_thread_pool); + CHECK(s.ok()) << s.to_string(); } FragmentMgr::~FragmentMgr() { diff --git a/be/src/util/blocking_priority_queue.hpp b/be/src/util/blocking_priority_queue.hpp index c68dfbeed3..815fa969f5 100644 --- a/be/src/util/blocking_priority_queue.hpp +++ b/be/src/util/blocking_priority_queue.hpp @@ -140,11 +140,7 @@ public: // Shut down the queue. Wakes up all threads waiting on blocking_get or blocking_put. void shutdown() { - { - boost::lock_guard guard(_lock); - _shutdown = true; - } - + _shutdown = true; _get_cv.notify_all(); _put_cv.notify_all(); } @@ -167,7 +163,7 @@ public: } private: - bool _shutdown; + std::atomic _shutdown; const int _max_element; boost::condition_variable _get_cv; // 'get' callers wait on this boost::condition_variable _put_cv; // 'put' callers wait on this diff --git a/be/src/util/condition_variable.cpp b/be/src/util/condition_variable.cpp index aaf72cab84..ac763fce0f 100644 --- a/be/src/util/condition_variable.cpp +++ b/be/src/util/condition_variable.cpp @@ -11,6 +11,7 @@ #include #include "common/logging.h" +#include "util/debug/sanitizer_scopes.h" #include "util/monotime.h" #include "util/mutex.h" @@ -33,11 +34,13 @@ ConditionVariable::~ConditionVariable() { } void ConditionVariable::wait() const { + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; int rv = pthread_cond_wait(&_condition, _user_mutex); DCHECK_EQ(0, rv); } bool ConditionVariable::wait_until(const MonoTime& until) const { + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; // Have we already timed out? MonoTime now = MonoTime::Now(); if (now > until) { @@ -53,6 +56,7 @@ bool ConditionVariable::wait_until(const MonoTime& until) const { } bool ConditionVariable::wait_for(const MonoDelta& delta) const { + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; // Negative delta means we've already timed out. int64_t nsecs = delta.ToNanoseconds(); if (nsecs < 0) { diff --git a/be/src/util/debug/sanitizer_scopes.h b/be/src/util/debug/sanitizer_scopes.h new file mode 100644 index 0000000000..363d6d7fce --- /dev/null +++ b/be/src/util/debug/sanitizer_scopes.h @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Wrappers around the annotations from gutil/dynamic_annotations.h, +// provided as C++-style scope guards. + +#pragma once + +#include "gutil/dynamic_annotations.h" +#include "gutil/macros.h" + +namespace doris { +namespace debug { + +// Scope guard which instructs TSAN to ignore all reads and writes +// on the current thread as long as it is alive. These may be safely +// nested. +class ScopedTSANIgnoreReadsAndWrites { + public: + ScopedTSANIgnoreReadsAndWrites() { + ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN(); + } + ~ScopedTSANIgnoreReadsAndWrites() { + ANNOTATE_IGNORE_READS_AND_WRITES_END(); + } + private: + DISALLOW_COPY_AND_ASSIGN(ScopedTSANIgnoreReadsAndWrites); +}; + +} // namespace debug +} // namespace doris + diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/priority_thread_pool.hpp index 68f50d7e43..b0e836c6e4 100644 --- a/be/src/util/priority_thread_pool.hpp +++ b/be/src/util/priority_thread_pool.hpp @@ -97,10 +97,7 @@ public: // Returns once the shutdown flag has been set, does not wait for the threads to // terminate. void shutdown() { - { - boost::lock_guard l(_lock); - _shutdown = true; - } + _shutdown = true; _work_queue.shutdown(); } @@ -143,9 +140,7 @@ private: } } - // Returns value of _shutdown under a lock, forcing visibility to threads in the pool. bool is_shutdown() { - boost::lock_guard l(_lock); return _shutdown; } @@ -156,11 +151,11 @@ private: // Collection of worker threads that process work from the queue. boost::thread_group _threads; - // Guards _shutdown and _empty_cv + // Guards _empty_cv boost::mutex _lock; // Set to true when threads should stop doing work and terminate. - bool _shutdown; + std::atomic _shutdown; // Signalled when the queue becomes empty boost::condition_variable _empty_cv; diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h index 2bd2da759b..58eedb1669 100644 --- a/be/src/util/runtime_profile.h +++ b/be/src/util/runtime_profile.h @@ -52,7 +52,9 @@ namespace doris { #define CANCEL_SAFE_SCOPED_TIMER(c, is_cancelled) \ ScopedTimer MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c, is_cancelled) #define SCOPED_RAW_TIMER(c) \ - ScopedRawTimer MACRO_CONCAT(SCOPED_RAW_TIMER, __COUNTER__)(c) + ScopedRawTimer MACRO_CONCAT(SCOPED_RAW_TIMER, __COUNTER__)(c) +#define SCOPED_ATOMIC_TIMER(c) \ + ScopedRawTimer> MACRO_CONCAT(SCOPED_ATOMIC_TIMER, __COUNTER__)(c) #define COUNTER_UPDATE(c, v) (c)->update(v) #define COUNTER_SET(c, v) (c)->set(v) #define ADD_THREAD_COUNTERS(profile, prefix) (profile)->add_thread_counters(prefix) @@ -64,6 +66,7 @@ namespace doris { #define ADD_TIMER(profile, name) NULL #define SCOPED_TIMER(c) #define SCOPED_RAW_TIMER(c) +#define SCOPED_ATOMIC_TIMER(c) #define COUNTER_UPDATE(c, v) #define COUNTER_SET(c, v) #define ADD_THREADCOUNTERS(profile, prefix) NULL @@ -670,10 +673,10 @@ private: // Utility class to update time elapsed when the object goes out of scope. // 'T' must implement the stopWatch "interface" (start,stop,elapsed_time) but // we use templates not to pay for virtual function overhead. -template +template class ScopedRawTimer { public: - ScopedRawTimer(int64_t* counter) : _counter(counter) { _sw.start(); } + ScopedRawTimer(C* counter) : _counter(counter) { _sw.start(); } // Update counter when object is destroyed ~ScopedRawTimer() { *_counter += _sw.elapsed_time(); } @@ -683,7 +686,7 @@ private: ScopedRawTimer& operator=(const ScopedRawTimer& timer); T _sw; - int64_t* _counter; + C* _counter; }; } // namespace doris diff --git a/be/src/util/spinlock.h b/be/src/util/spinlock.h index 13a227fcfe..1329e5206b 100644 --- a/be/src/util/spinlock.h +++ b/be/src/util/spinlock.h @@ -38,10 +38,7 @@ public: } void unlock() { - // Memory barrier here. All updates before the unlock need to be made visible. - __sync_synchronize(); - DCHECK(_locked); - _locked = false; + __sync_bool_compare_and_swap(&_locked, true, false); } // Tries to acquire the lock diff --git a/be/src/util/thread.cpp b/be/src/util/thread.cpp index ed978605e7..3c7a45bba2 100644 --- a/be/src/util/thread.cpp +++ b/be/src/util/thread.cpp @@ -35,6 +35,7 @@ #include "gutil/once.h" #include "gutil/strings/substitute.h" #include "olap/olap_define.h" +#include "util/debug/sanitizer_scopes.h" #include "util/easy_json.h" #include "util/mutex.h" #include "util/os_util.h" @@ -149,7 +150,7 @@ void ThreadMgr::add_thread(const pthread_t& pthread_id, const std::string& name, // relationship between thread functors, ignoring potential data races. // The annotations prevent this from happening. ANNOTATE_IGNORE_SYNC_BEGIN(); - ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN(); + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; { MutexLock l(&_lock); _thread_categories[category][pthread_id] = ThreadDescriptor(category, name, tid); @@ -157,12 +158,11 @@ void ThreadMgr::add_thread(const pthread_t& pthread_id, const std::string& name, _threads_started_metric++; } ANNOTATE_IGNORE_SYNC_END(); - ANNOTATE_IGNORE_READS_AND_WRITES_END(); } void ThreadMgr::remove_thread(const pthread_t& pthread_id, const std::string& category) { ANNOTATE_IGNORE_SYNC_BEGIN(); - ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN(); + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; { MutexLock l(&_lock); auto category_it = _thread_categories.find(category); @@ -171,7 +171,6 @@ void ThreadMgr::remove_thread(const pthread_t& pthread_id, const std::string& ca _threads_running_metric--; } ANNOTATE_IGNORE_SYNC_END(); - ANNOTATE_IGNORE_READS_AND_WRITES_END(); } void ThreadMgr::display_thread_callback(const WebPageHandler::ArgumentMap& args, diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp index c9c9bed974..afe6f14f23 100644 --- a/be/src/util/threadpool.cpp +++ b/be/src/util/threadpool.cpp @@ -26,6 +26,7 @@ #include "gutil/map-util.h" #include "gutil/strings/substitute.h" #include "gutil/sysinfo.h" +#include "util/debug/sanitizer_scopes.h" #include "util/scoped_cleanup.h" #include "util/thread.h" @@ -278,6 +279,7 @@ Status ThreadPool::init() { } void ThreadPool::shutdown() { + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; MutexLock unique_lock(&_lock); check_not_pool_thread_unlocked(); @@ -476,6 +478,7 @@ bool ThreadPool::wait_for(const MonoDelta& delta) { } void ThreadPool::dispatch_thread() { + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; MutexLock unique_lock(&_lock); InsertOrDie(&_threads, Thread::current_thread()); DCHECK_GT(_num_threads_pending_start, 0); diff --git a/be/test/exec/tablet_sink_test.cpp b/be/test/exec/tablet_sink_test.cpp index 54e3fea9a3..7b4e1120e6 100644 --- a/be/test/exec/tablet_sink_test.cpp +++ b/be/test/exec/tablet_sink_test.cpp @@ -307,27 +307,28 @@ public: const ::doris::PTransmitDataParams* request, ::doris::PTransmitDataResult* response, ::google::protobuf::Closure* done) override { - done->Run(); + brpc::ClosureGuard done_guard(done); } void tablet_writer_open(google::protobuf::RpcController* controller, const PTabletWriterOpenRequest* request, PTabletWriterOpenResult* response, google::protobuf::Closure* done) override { + brpc::ClosureGuard done_guard(done); Status status; status.to_protobuf(response->mutable_status()); - done->Run(); } void tablet_writer_add_batch(google::protobuf::RpcController* controller, const PTabletWriterAddBatchRequest* request, PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) override { + brpc::ClosureGuard done_guard(done); { std::lock_guard l(_lock); - row_counters += request->tablet_ids_size(); + _row_counters += request->tablet_ids_size(); if (request->eos()) { - eof_counters++; + _eof_counters++; } k_add_batch_status.to_protobuf(response->mutable_status()); @@ -340,20 +341,19 @@ public: } } } - done->Run(); } void tablet_writer_cancel(google::protobuf::RpcController* controller, const PTabletWriterCancelRequest* request, PTabletWriterCancelResult* response, google::protobuf::Closure* done) override { - done->Run(); + brpc::ClosureGuard done_guard(done); } std::mutex _lock; - int64_t eof_counters = 0; - int64_t row_counters = 0; + int64_t _eof_counters = 0; + int64_t _row_counters = 0; RowDescriptor* _row_desc = nullptr; - std::set* _output_set; + std::set* _output_set = nullptr; }; TEST_F(OlapTableSinkTest, normal) { @@ -453,11 +453,11 @@ TEST_F(OlapTableSinkTest, normal) { ASSERT_TRUE(st.ok()); // close st = sink.close(&state, Status::OK()); - ASSERT_TRUE(st.ok()); + ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: already stopped, skip waiting for close. cancelled/!eos: : 1/1") << st.to_string(); // each node has a eof - ASSERT_EQ(2, service->eof_counters); - ASSERT_EQ(2 * 2, service->row_counters); + ASSERT_EQ(2, service->_eof_counters); + ASSERT_EQ(2 * 2, service->_row_counters); // 2node * 2 ASSERT_EQ(1, state.num_rows_load_filtered()); @@ -586,11 +586,11 @@ TEST_F(OlapTableSinkTest, convert) { ASSERT_TRUE(st.ok()); // close st = sink.close(&state, Status::OK()); - ASSERT_TRUE(st.ok()); + ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: already stopped, skip waiting for close. cancelled/!eos: : 1/1") << st.to_string(); // each node has a eof - ASSERT_EQ(2, service->eof_counters); - ASSERT_EQ(2 * 3, service->row_counters); + ASSERT_EQ(2, service->_eof_counters); + ASSERT_EQ(2 * 3, service->_row_counters); // 2node * 2 ASSERT_EQ(0, state.num_rows_load_filtered()); @@ -966,7 +966,7 @@ TEST_F(OlapTableSinkTest, decimal) { ASSERT_TRUE(st.ok()); // close st = sink.close(&state, Status::OK()); - ASSERT_TRUE(st.ok()); + ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: already stopped, skip waiting for close. cancelled/!eos: : 1/1") << st.to_string(); ASSERT_EQ(2, output_set.size()); ASSERT_TRUE(output_set.count("[(12 12.3)]") > 0); diff --git a/be/test/plugin/plugin_zip_test.cpp b/be/test/plugin/plugin_zip_test.cpp index 473a6c712a..7ae6c95cce 100644 --- a/be/test/plugin/plugin_zip_test.cpp +++ b/be/test/plugin/plugin_zip_test.cpp @@ -80,8 +80,6 @@ public: std::cout << "the path: " << _path << std::endl; } - ~PluginZipTest() { _server->stop(); }; - public: std::string _path; std::unique_ptr _server; diff --git a/thirdparty/build-thirdparty.sh b/thirdparty/build-thirdparty.sh index bbc0d9a200..d22407040b 100755 --- a/thirdparty/build-thirdparty.sh +++ b/thirdparty/build-thirdparty.sh @@ -699,6 +699,17 @@ build_js_and_css() { cp bootstrap-table.min.css $TP_INSTALL_DIR/webroot/Bootstrap-3.3.7/css } +build_tsan_header() { + cd $TP_SOURCE_DIR/ + if [[ ! -f $TSAN_HEADER_FILE ]]; then + echo "$TSAN_HEADER_FILE should exist." + exit 1 + fi + + mkdir -p $TP_INSTALL_DIR/include/sanitizer/ + cp $TSAN_HEADER_FILE $TP_INSTALL_DIR/include/sanitizer/ +} + # See https://github.com/apache/incubator-doris/issues/2910 # LLVM related codes have already be removed in master, so there is # no need to build llvm tool here. @@ -737,5 +748,6 @@ build_croaringbitmap build_orc build_cctz build_js_and_css +build_tsan_header echo "Finihsed to build all thirdparties" diff --git a/thirdparty/vars.sh b/thirdparty/vars.sh index 3421d63f96..020bd55671 100644 --- a/thirdparty/vars.sh +++ b/thirdparty/vars.sh @@ -300,6 +300,12 @@ BOOTSTRAP_TABLE_CSS_NAME="bootstrap-table.min.css" BOOTSTRAP_TABLE_CSS_FILE="bootstrap-table.min.css" BOOTSTRAP_TABLE_CSS_MD5SUM="23389d4456da412e36bae30c469a766a" -# all thirdparties which need to be downloaded is set in array TP_ARCHIVES -export TP_ARCHIVES="LIBEVENT OPENSSL THRIFT LLVM CLANG COMPILER_RT PROTOBUF GFLAGS GLOG GTEST RAPIDJSON SNAPPY GPERFTOOLS ZLIB LZ4 BZIP LZO2 CURL RE2 BOOST MYSQL BOOST_FOR_MYSQL ODBC LEVELDB BRPC ROCKSDB LIBRDKAFKA FLATBUFFERS ARROW BROTLI DOUBLE_CONVERSION ZSTD S2 BITSHUFFLE CROARINGBITMAP ORC JEMALLOC CCTZ DATATABLES BOOTSTRAP_TABLE_JS BOOTSTRAP_TABLE_CSS" +# tsan_header +TSAN_HEADER_DOWNLOAD="https://gcc.gnu.org/git/?p=gcc.git;a=blob_plain;f=libsanitizer/include/sanitizer/tsan_interface_atomic.h;hb=refs/heads/releases/gcc-7" +TSAN_HEADER_NAME="tsan_interface_atomic.h" +TSAN_HEADER_FILE="tsan_interface_atomic.h" +TSAN_HEADER_MD5SUM="d72679bea167d6a513d959f5abd149dc" + +# all thirdparties which need to be downloaded is set in array TP_ARCHIVES +export TP_ARCHIVES="LIBEVENT OPENSSL THRIFT LLVM CLANG COMPILER_RT PROTOBUF GFLAGS GLOG GTEST RAPIDJSON SNAPPY GPERFTOOLS ZLIB LZ4 BZIP LZO2 CURL RE2 BOOST MYSQL BOOST_FOR_MYSQL ODBC LEVELDB BRPC ROCKSDB LIBRDKAFKA FLATBUFFERS ARROW BROTLI DOUBLE_CONVERSION ZSTD S2 BITSHUFFLE CROARINGBITMAP ORC JEMALLOC CCTZ DATATABLES BOOTSTRAP_TABLE_JS BOOTSTRAP_TABLE_CSS TSAN_HEADER" diff --git a/tsan_suppressions b/tsan_suppressions new file mode 100644 index 0000000000..a01346fe74 --- /dev/null +++ b/tsan_suppressions @@ -0,0 +1,24 @@ +mutex:boost::condition_variable::wait(boost::unique_lock&) +mutex:brpc::* +mutex:doris::ConditionVariable::wait_until(doris::MonoTime const&) const +mutex:doris::ConditionVariable::wait() const +race:boost::intrusive::list_node_traits::get_next(boost::intrusive::list_node const* const&) +race:brpc::* +race:butil::* +race:bvar::* +race:doris::CountDownLatch::wait_until(doris::MonoTime const&) const +race:doris::PBackendService::* +race:doris::PStatus::status_code() const +race:doris::PTabletWriterAddBatchResult::* +race:doris::PTabletWriterOpenResult::* +race:doris::RefCountClosure::unref() +race:doris::stream_load::TestInternalService::tablet_writer_add_batch(google::protobuf::RpcController*, doris::PTabletWriterAddBatchRequest const*, doris::PTabletWriterAddBatchResult*, google::protobuf::Closure*) +race:glog_internal_namespace_::* +race:google::protobuf::* +race:operator delete(void*) +race:std::_Bit_reference::operator bool() const +race:std::char_traits::compare(char const*, char const*, unsigned long) +race:std::char_traits::copy(char*, char const*, unsigned long) +race:std::lock_guard::lock_guard(int volatile&) +race:std::lock_guard::~lock_guard() +race:void google::protobuf::internal::RepeatedPtrFieldBase::Clear::TypeHandler>()