diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index af30e50ac0..a9d0c6b557 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -315,7 +315,8 @@ SET(CXX_FLAGS_UBSAN "${CXX_GCC_FLAGS} -ggdb3 -O0 -gdwarf-2 -fno-wrapv -fsanitize # Set the flags to the thread sanitizer, also known as "tsan" # Turn on sanitizer and debug symbols to get stack traces: -SET(CXX_FLAGS_TSAN "${CXX_GCC_FLAGS} -O0 -ggdb3 -fsanitize=thread -DTHREAD_SANITIZER") +# Use -Wno-builtin-declaration-mismatch to mute warnings like "new declaration ‘__tsan_atomic16 __tsan_atomic16_fetch_nand(..." +SET(CXX_FLAGS_TSAN "${CXX_GCC_FLAGS} -O0 -ggdb3 -fsanitize=thread -DTHREAD_SANITIZER -Wno-builtin-declaration-mismatch") # Set compile flags based on the build type. if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG") @@ -467,6 +468,7 @@ elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN") set(DORIS_LINK_LIBS ${DORIS_LINK_LIBS} -static-libubsan tcmalloc) elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "TSAN") set(DORIS_LINK_LIBS ${DORIS_LINK_LIBS} -static-libtsan) + add_definitions("-DTHREAD_SANITIZER") else() message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}") endif() diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index 218e29f767..84c46ab1b6 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -159,6 +159,7 @@ Status DataSink::init(const TDataSink& thrift_sink) { Status DataSink::prepare(RuntimeState* state) { _expr_mem_tracker = MemTracker::CreateTracker( + // TODO(yingchun): use subclass' name -1, std::string("DataSink:") + std::to_string(state->load_job_id()), state->instance_mem_tracker()); return Status::OK(); diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 47305d9282..15677213be 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -27,6 +27,7 @@ #include "runtime/tuple_row.h" #include "service/brpc.h" #include "util/brpc_stub_cache.h" +#include "util/debug/sanitizer_scopes.h" #include "util/monotime.h" #include "util/uid_util.h" @@ -200,14 +201,14 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) { // It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close(). while (!_cancelled && _parent->_mem_tracker->AnyLimitExceeded(MemLimit::HARD) && _pending_batches_num > 0) { - SCOPED_RAW_TIMER(&_mem_exceeded_block_ns); + SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns); SleepFor(MonoDelta::FromMilliseconds(10)); } auto row_no = _cur_batch->add_row(); if (row_no == RowBatch::INVALID_ROW_INDEX) { { - SCOPED_RAW_TIMER(&_queue_push_lock_ns); + SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns); std::lock_guard l(_pending_batches_lock); //To simplify the add_row logic, postpone adding batch into req until the time of sending req _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request); @@ -235,6 +236,7 @@ Status NodeChannel::mark_close() { _cur_add_batch_request.set_eos(true); { + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; std::lock_guard l(_pending_batches_lock); _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request); _pending_batches_num++; @@ -311,10 +313,11 @@ int NodeChannel::try_send_and_fetch_status() { } if (!_add_batch_closure->is_packet_in_flight() && _pending_batches_num > 0) { - SCOPED_RAW_TIMER(&_actual_consume_ns); + SCOPED_ATOMIC_TIMER(&_actual_consume_ns); AddBatchReq send_batch; { - std::lock_guard lg(_pending_batches_lock); + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; + std::lock_guard l(_pending_batches_lock); DCHECK(!_pending_batches.empty()); send_batch = std::move(_pending_batches.front()); _pending_batches.pop(); @@ -327,7 +330,7 @@ int NodeChannel::try_send_and_fetch_status() { // tablet_ids has already set when add row request.set_packet_seq(_next_packet_seq); if (row_batch->num_rows() > 0) { - SCOPED_RAW_TIMER(&_serialize_batch_ns); + SCOPED_ATOMIC_TIMER(&_serialize_batch_ns); row_batch->serialize(request.mutable_row_batch()); } @@ -394,7 +397,7 @@ Status IndexChannel::init(RuntimeState* state, const std::vectornode_ids) { NodeChannel* channel = nullptr; auto it = _node_channels.find(node_id); - if (it == std::end(_node_channels)) { + if (it == _node_channels.end()) { channel = _parent->_pool->add( new NodeChannel(_parent, _index_id, node_id, _schema_hash)); _node_channels.emplace(node_id, channel); @@ -414,7 +417,7 @@ Status IndexChannel::init(RuntimeState* state, const std::vectorprint_load_info() - << ". error_msg=" << status.get_error_msg(); + << ". error_msg=" << s.get_error_msg(); } ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns, &mem_exceeded_block_ns, &queue_push_lock_ns, @@ -733,7 +734,6 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { COUNTER_SET(_send_data_timer, _send_data_ns); COUNTER_SET(_convert_batch_timer, _convert_batch_ns); COUNTER_SET(_validate_data_timer, _validate_data_ns); - COUNTER_SET(_non_blocking_send_timer, _non_blocking_send_ns); COUNTER_SET(_serialize_batch_timer, serialize_batch_ns); // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() + @@ -939,7 +939,7 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* } void OlapTableSink::_send_batch_process() { - SCOPED_RAW_TIMER(&_non_blocking_send_ns); + SCOPED_TIMER(_non_blocking_send_timer); do { int running_channels_num = 0; for (auto index_channel : _channels) { diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index d3ae181394..8d894c6da7 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -237,11 +237,10 @@ private: std::vector _tablet_commit_infos; AddBatchCounter _add_batch_counter; - int64_t _serialize_batch_ns = 0; - - int64_t _mem_exceeded_block_ns = 0; - int64_t _queue_push_lock_ns = 0; - int64_t _actual_consume_ns = 0; + std::atomic _serialize_batch_ns; + std::atomic _mem_exceeded_block_ns; + std::atomic _queue_push_lock_ns; + std::atomic _actual_consume_ns; }; class IndexChannel { @@ -328,12 +327,8 @@ private: // unique load id PUniqueId _load_id; int64_t _txn_id = -1; - int64_t _db_id = -1; - int64_t _table_id = -1; int _num_replicas = -1; bool _need_gen_rollup = false; - std::string _db_name; - std::string _table_name; int _tuple_desc_id = -1; // this is tuple descriptor of destination OLAP table @@ -378,7 +373,6 @@ private: int64_t _convert_batch_ns = 0; int64_t _validate_data_ns = 0; int64_t _send_data_ns = 0; - int64_t _non_blocking_send_ns = 0; int64_t _serialize_batch_ns = 0; int64_t _number_input_rows = 0; int64_t _number_output_rows = 0; diff --git a/be/src/http/ev_http_server.cpp b/be/src/http/ev_http_server.cpp index b2ff9ad693..e9582462bb 100644 --- a/be/src/http/ev_http_server.cpp +++ b/be/src/http/ev_http_server.cpp @@ -74,20 +74,15 @@ static int on_connection(struct evhttp_request* req, void* param) { EvHttpServer::EvHttpServer(int port, int num_workers) : _host("0.0.0.0"), _port(port), _num_workers(num_workers), _real_port(0) { DCHECK_GT(_num_workers, 0); - auto res = pthread_rwlock_init(&_rw_lock, nullptr); - DCHECK_EQ(res, 0); } EvHttpServer::EvHttpServer(const std::string& host, int port, int num_workers) : _host(host), _port(port), _num_workers(num_workers), _real_port(0) { DCHECK_GT(_num_workers, 0); - auto res = pthread_rwlock_init(&_rw_lock, nullptr); - DCHECK_EQ(res, 0); } EvHttpServer::~EvHttpServer() { stop(); - pthread_rwlock_destroy(&_rw_lock); } void EvHttpServer::start() { @@ -100,14 +95,17 @@ void EvHttpServer::start() { .build(&_workers); evthread_use_pthreads(); - event_bases.resize(_num_workers); + _event_bases.resize(_num_workers); for (int i = 0; i < _num_workers; ++i) { CHECK(_workers->submit_func([this, i]() { std::shared_ptr base(event_base_new(), [](event_base* base) { event_base_free(base); }); CHECK(base != nullptr) << "Couldn't create an event_base."; - event_bases[i] = base; + { + std::lock_guard lock(_event_bases_lock); + _event_bases[i] = base; + } /* Create a new evhttp object to handle requests. */ std::shared_ptr http(evhttp_new(base.get()), @@ -127,9 +125,13 @@ void EvHttpServer::start() { } void EvHttpServer::stop() { - for (int i = 0; i < _num_workers; ++i) { - LOG(WARNING) << "event_base_loopexit ret: " - << event_base_loopexit(event_bases[i].get(), nullptr); + { + std::lock_guard lock(_event_bases_lock); + for (int i = 0; i < _num_workers; ++i) { + LOG(WARNING) << "event_base_loopexit ret: " + << event_base_loopexit(_event_bases[i].get(), nullptr); + } + _event_bases.clear(); } _workers->shutdown(); close(_server_fd); @@ -180,7 +182,7 @@ bool EvHttpServer::register_handler(const HttpMethod& method, const std::string& } bool result = true; - pthread_rwlock_wrlock(&_rw_lock); + std::lock_guard lock(_handler_lock); PathTrie* root = nullptr; switch (method) { case GET: @@ -208,17 +210,15 @@ bool EvHttpServer::register_handler(const HttpMethod& method, const std::string& if (result) { result = root->insert(path, handler); } - pthread_rwlock_unlock(&_rw_lock); - + return result; } void EvHttpServer::register_static_file_handler(HttpHandler* handler) { DCHECK(handler != nullptr); DCHECK(_static_file_handler == nullptr); - pthread_rwlock_wrlock(&_rw_lock); + std::lock_guard lock(_handler_lock); _static_file_handler = handler; - pthread_rwlock_unlock(&_rw_lock); } int EvHttpServer::on_header(struct evhttp_request* ev_req) { @@ -258,7 +258,7 @@ HttpHandler* EvHttpServer::_find_handler(HttpRequest* req) { HttpHandler* handler = nullptr; - pthread_rwlock_rdlock(&_rw_lock); + std::lock_guard lock(_handler_lock); switch (req->method()) { case GET: _get_handlers.retrieve(path, &handler, req->params()); @@ -286,7 +286,6 @@ HttpHandler* EvHttpServer::_find_handler(HttpRequest* req) { LOG(WARNING) << "unknown HTTP method, method=" << req->method(); break; } - pthread_rwlock_unlock(&_rw_lock); return handler; } diff --git a/be/src/http/ev_http_server.h b/be/src/http/ev_http_server.h index d6ff3367f2..af25d5883b 100644 --- a/be/src/http/ev_http_server.h +++ b/be/src/http/ev_http_server.h @@ -17,6 +17,7 @@ #pragma once +#include #include #include #include @@ -66,10 +67,10 @@ private: int _server_fd = -1; std::unique_ptr _workers; - std::vector> event_bases; - - pthread_rwlock_t _rw_lock; + std::mutex _event_bases_lock; // protect _event_bases + std::vector> _event_bases; + std::mutex _handler_lock; PathTrie _get_handlers; HttpHandler* _static_file_handler = nullptr; PathTrie _put_handlers; diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index 9c20ddf0f8..4bf7b974be 100644 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -114,8 +114,9 @@ Status DataDir::init() { } void DataDir::stop_bg_worker() { + std::unique_lock lck(_check_path_mutex); _stop_bg_worker = true; - _cv.notify_one(); + _check_path_cv.notify_one(); } Status DataDir::_init_cluster_id() { @@ -807,13 +808,13 @@ void DataDir::remove_pending_ids(const std::string& id) { // gc unused tablet schemahash dir void DataDir::perform_path_gc_by_tablet() { std::unique_lock lck(_check_path_mutex); - _cv.wait(lck, [this] { return _stop_bg_worker || !_all_tablet_schemahash_paths.empty(); }); + _check_path_cv.wait(lck, [this] { return _stop_bg_worker || !_all_tablet_schemahash_paths.empty(); }); if (_stop_bg_worker) { return; } LOG(INFO) << "start to path gc by tablet schemahash."; int counter = 0; - for (auto& path : _all_tablet_schemahash_paths) { + for (const auto& path : _all_tablet_schemahash_paths) { ++counter; if (config::path_gc_check_step > 0 && counter % config::path_gc_check_step == 0) { SleepFor(MonoDelta::FromMilliseconds(config::path_gc_check_step_interval_ms)); @@ -857,13 +858,13 @@ void DataDir::perform_path_gc_by_rowsetid() { // init the set of valid path // validate the path in data dir std::unique_lock lck(_check_path_mutex); - _cv.wait(lck, [this] { return _stop_bg_worker || !_all_check_paths.empty(); }); + _check_path_cv.wait(lck, [this] { return _stop_bg_worker || !_all_check_paths.empty(); }); if (_stop_bg_worker) { return; } LOG(INFO) << "start to path gc by rowsetid."; int counter = 0; - for (auto& path : _all_check_paths) { + for (const auto& path : _all_check_paths) { ++counter; if (config::path_gc_check_step > 0 && counter % config::path_gc_check_step == 0) { SleepFor(MonoDelta::FromMilliseconds(config::path_gc_check_step_interval_ms)); @@ -899,65 +900,63 @@ void DataDir::perform_path_gc_by_rowsetid() { // path producer void DataDir::perform_path_scan() { - { - std::unique_lock lck(_check_path_mutex); - if (!_all_check_paths.empty()) { - LOG(INFO) << "_all_check_paths is not empty when path scan."; - return; - } - LOG(INFO) << "start to scan data dir path:" << _path; - std::set shards; - std::string data_path = _path + DATA_PREFIX; + std::unique_lock lck(_check_path_mutex); + if (!_all_check_paths.empty()) { + LOG(INFO) << "_all_check_paths is not empty when path scan."; + return; + } + LOG(INFO) << "start to scan data dir path:" << _path; + std::set shards; + std::string data_path = _path + DATA_PREFIX; - Status ret = FileUtils::list_dirs_files(data_path, &shards, nullptr, Env::Default()); + Status ret = FileUtils::list_dirs_files(data_path, &shards, nullptr, Env::Default()); + if (!ret.ok()) { + LOG(WARNING) << "fail to walk dir. path=[" + data_path << "] error[" << ret.to_string() + << "]"; + return; + } + + for (const auto& shard : shards) { + std::string shard_path = data_path + "/" + shard; + std::set tablet_ids; + ret = FileUtils::list_dirs_files(shard_path, &tablet_ids, nullptr, Env::Default()); if (!ret.ok()) { - LOG(WARNING) << "fail to walk dir. path=[" + data_path << "] error[" << ret.to_string() - << "]"; - return; + LOG(WARNING) << "fail to walk dir. [path=" << shard_path << "] error[" + << ret.to_string() << "]"; + continue; } - - for (const auto& shard : shards) { - std::string shard_path = data_path + "/" + shard; - std::set tablet_ids; - ret = FileUtils::list_dirs_files(shard_path, &tablet_ids, nullptr, Env::Default()); + for (const auto& tablet_id : tablet_ids) { + std::string tablet_id_path = shard_path + "/" + tablet_id; + std::set schema_hashes; + ret = FileUtils::list_dirs_files(tablet_id_path, &schema_hashes, nullptr, + Env::Default()); if (!ret.ok()) { - LOG(WARNING) << "fail to walk dir. [path=" << shard_path << "] error[" - << ret.to_string() << "]"; + LOG(WARNING) << "fail to walk dir. [path=" << tablet_id_path << "]" + << " error[" << ret.to_string() << "]"; continue; } - for (const auto& tablet_id : tablet_ids) { - std::string tablet_id_path = shard_path + "/" + tablet_id; - std::set schema_hashes; - ret = FileUtils::list_dirs_files(tablet_id_path, &schema_hashes, nullptr, - Env::Default()); + for (const auto& schema_hash : schema_hashes) { + std::string tablet_schema_hash_path = tablet_id_path + "/" + schema_hash; + _all_tablet_schemahash_paths.insert(tablet_schema_hash_path); + + std::set rowset_files; + ret = FileUtils::list_dirs_files(tablet_schema_hash_path, nullptr, + &rowset_files, Env::Default()); if (!ret.ok()) { - LOG(WARNING) << "fail to walk dir. [path=" << tablet_id_path << "]" - << " error[" << ret.to_string() << "]"; + LOG(WARNING) << "fail to walk dir. [path=" << tablet_schema_hash_path + << "] error[" << ret.to_string() << "]"; continue; } - for (const auto& schema_hash : schema_hashes) { - std::string tablet_schema_hash_path = tablet_id_path + "/" + schema_hash; - _all_tablet_schemahash_paths.insert(tablet_schema_hash_path); - std::set rowset_files; - - ret = FileUtils::list_dirs_files(tablet_schema_hash_path, nullptr, - &rowset_files, Env::Default()); - if (!ret.ok()) { - LOG(WARNING) << "fail to walk dir. [path=" << tablet_schema_hash_path - << "] error[" << ret.to_string() << "]"; - continue; - } - for (const auto& rowset_file : rowset_files) { - std::string rowset_file_path = tablet_schema_hash_path + "/" + rowset_file; - _all_check_paths.insert(rowset_file_path); - } + for (const auto& rowset_file : rowset_files) { + std::string rowset_file_path = tablet_schema_hash_path + "/" + rowset_file; + _all_check_paths.insert(rowset_file_path); } } } - LOG(INFO) << "scan data dir path:" << _path - << " finished. path size:" << _all_check_paths.size(); } - _cv.notify_one(); + LOG(INFO) << "scan data dir path: " << _path + << " finished. path size: " << _all_check_paths.size() + _all_tablet_schemahash_paths.size(); + _check_path_cv.notify_one(); } void DataDir::_process_garbage_path(const std::string& path) { diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h index a3c2d22b2a..c05b50e76c 100644 --- a/be/src/olap/data_dir.h +++ b/be/src/olap/data_dir.h @@ -190,7 +190,7 @@ private: RowsetIdGenerator* _id_generator = nullptr; std::mutex _check_path_mutex; - std::condition_variable _cv; + std::condition_variable _check_path_cv; std::set _all_check_paths; std::set _all_tablet_schemahash_paths; diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 8f87c25af7..2026150732 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -324,7 +324,8 @@ void StorageEngine::_compaction_tasks_producer_callback() { int round = 0; CompactionType compaction_type; - while (true) { + int32_t interval = 1; + do { if (!config::disable_auto_compaction) { if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) { compaction_type = CompactionType::CUMULATIVE_COMPACTION; @@ -387,10 +388,11 @@ void StorageEngine::_compaction_tasks_producer_callback() { tablet->reset_compaction(compaction_type); } } + interval = 1; } else { - sleep(config::check_auto_compaction_interval_seconds); + interval = config::check_auto_compaction_interval_seconds * 1000; } - } + } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromMilliseconds(interval))); } std::vector StorageEngine::_compaction_tasks_generator( diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 0385650498..3b952c391d 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -510,6 +510,7 @@ void StorageEngine::stop() { thread->join(); \ } + THREAD_JOIN(_compaction_tasks_producer_thread); THREAD_JOIN(_unused_rowset_monitor_thread); THREAD_JOIN(_garbage_sweeper_thread); THREAD_JOIN(_disk_stat_monitor_thread); diff --git a/be/src/runtime/disk_io_mgr.h b/be/src/runtime/disk_io_mgr.h index e863107e40..86e6630b72 100644 --- a/be/src/runtime/disk_io_mgr.h +++ b/be/src/runtime/disk_io_mgr.h @@ -715,7 +715,7 @@ private: // True if the IoMgr should be torn down. Worker threads watch for this to // know to terminate. This variable is read/written to by different threads. - volatile bool _shut_down; + std::atomic _shut_down; // Total bytes read by the IoMgr. RuntimeProfile::Counter _total_bytes_read_counter; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index d74875bf68..089921ac34 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -384,18 +384,19 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env) return _fragment_map.size(); }); - CHECK(Thread::create( + auto s = Thread::create( "FragmentMgr", "cancel_timeout_plan_fragment", - [this]() { this->cancel_worker(); }, &_cancel_thread) - .ok()); + [this]() { this->cancel_worker(); }, &_cancel_thread); + CHECK(s.ok()) << s.to_string(); // TODO(zc): we need a better thread-pool // now one user can use all the thread pool, others have no resource. - ThreadPoolBuilder("FragmentMgrThreadPool") + s = ThreadPoolBuilder("FragmentMgrThreadPool") .set_min_threads(config::fragment_pool_thread_num_min) .set_max_threads(config::fragment_pool_thread_num_max) .set_max_queue_size(config::fragment_pool_queue_size) .build(&_thread_pool); + CHECK(s.ok()) << s.to_string(); } FragmentMgr::~FragmentMgr() { diff --git a/be/src/util/blocking_priority_queue.hpp b/be/src/util/blocking_priority_queue.hpp index c68dfbeed3..815fa969f5 100644 --- a/be/src/util/blocking_priority_queue.hpp +++ b/be/src/util/blocking_priority_queue.hpp @@ -140,11 +140,7 @@ public: // Shut down the queue. Wakes up all threads waiting on blocking_get or blocking_put. void shutdown() { - { - boost::lock_guard guard(_lock); - _shutdown = true; - } - + _shutdown = true; _get_cv.notify_all(); _put_cv.notify_all(); } @@ -167,7 +163,7 @@ public: } private: - bool _shutdown; + std::atomic _shutdown; const int _max_element; boost::condition_variable _get_cv; // 'get' callers wait on this boost::condition_variable _put_cv; // 'put' callers wait on this diff --git a/be/src/util/condition_variable.cpp b/be/src/util/condition_variable.cpp index aaf72cab84..ac763fce0f 100644 --- a/be/src/util/condition_variable.cpp +++ b/be/src/util/condition_variable.cpp @@ -11,6 +11,7 @@ #include #include "common/logging.h" +#include "util/debug/sanitizer_scopes.h" #include "util/monotime.h" #include "util/mutex.h" @@ -33,11 +34,13 @@ ConditionVariable::~ConditionVariable() { } void ConditionVariable::wait() const { + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; int rv = pthread_cond_wait(&_condition, _user_mutex); DCHECK_EQ(0, rv); } bool ConditionVariable::wait_until(const MonoTime& until) const { + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; // Have we already timed out? MonoTime now = MonoTime::Now(); if (now > until) { @@ -53,6 +56,7 @@ bool ConditionVariable::wait_until(const MonoTime& until) const { } bool ConditionVariable::wait_for(const MonoDelta& delta) const { + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; // Negative delta means we've already timed out. int64_t nsecs = delta.ToNanoseconds(); if (nsecs < 0) { diff --git a/be/src/util/debug/sanitizer_scopes.h b/be/src/util/debug/sanitizer_scopes.h new file mode 100644 index 0000000000..363d6d7fce --- /dev/null +++ b/be/src/util/debug/sanitizer_scopes.h @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Wrappers around the annotations from gutil/dynamic_annotations.h, +// provided as C++-style scope guards. + +#pragma once + +#include "gutil/dynamic_annotations.h" +#include "gutil/macros.h" + +namespace doris { +namespace debug { + +// Scope guard which instructs TSAN to ignore all reads and writes +// on the current thread as long as it is alive. These may be safely +// nested. +class ScopedTSANIgnoreReadsAndWrites { + public: + ScopedTSANIgnoreReadsAndWrites() { + ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN(); + } + ~ScopedTSANIgnoreReadsAndWrites() { + ANNOTATE_IGNORE_READS_AND_WRITES_END(); + } + private: + DISALLOW_COPY_AND_ASSIGN(ScopedTSANIgnoreReadsAndWrites); +}; + +} // namespace debug +} // namespace doris + diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/priority_thread_pool.hpp index 68f50d7e43..b0e836c6e4 100644 --- a/be/src/util/priority_thread_pool.hpp +++ b/be/src/util/priority_thread_pool.hpp @@ -97,10 +97,7 @@ public: // Returns once the shutdown flag has been set, does not wait for the threads to // terminate. void shutdown() { - { - boost::lock_guard l(_lock); - _shutdown = true; - } + _shutdown = true; _work_queue.shutdown(); } @@ -143,9 +140,7 @@ private: } } - // Returns value of _shutdown under a lock, forcing visibility to threads in the pool. bool is_shutdown() { - boost::lock_guard l(_lock); return _shutdown; } @@ -156,11 +151,11 @@ private: // Collection of worker threads that process work from the queue. boost::thread_group _threads; - // Guards _shutdown and _empty_cv + // Guards _empty_cv boost::mutex _lock; // Set to true when threads should stop doing work and terminate. - bool _shutdown; + std::atomic _shutdown; // Signalled when the queue becomes empty boost::condition_variable _empty_cv; diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h index 2bd2da759b..58eedb1669 100644 --- a/be/src/util/runtime_profile.h +++ b/be/src/util/runtime_profile.h @@ -52,7 +52,9 @@ namespace doris { #define CANCEL_SAFE_SCOPED_TIMER(c, is_cancelled) \ ScopedTimer MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c, is_cancelled) #define SCOPED_RAW_TIMER(c) \ - ScopedRawTimer MACRO_CONCAT(SCOPED_RAW_TIMER, __COUNTER__)(c) + ScopedRawTimer MACRO_CONCAT(SCOPED_RAW_TIMER, __COUNTER__)(c) +#define SCOPED_ATOMIC_TIMER(c) \ + ScopedRawTimer> MACRO_CONCAT(SCOPED_ATOMIC_TIMER, __COUNTER__)(c) #define COUNTER_UPDATE(c, v) (c)->update(v) #define COUNTER_SET(c, v) (c)->set(v) #define ADD_THREAD_COUNTERS(profile, prefix) (profile)->add_thread_counters(prefix) @@ -64,6 +66,7 @@ namespace doris { #define ADD_TIMER(profile, name) NULL #define SCOPED_TIMER(c) #define SCOPED_RAW_TIMER(c) +#define SCOPED_ATOMIC_TIMER(c) #define COUNTER_UPDATE(c, v) #define COUNTER_SET(c, v) #define ADD_THREADCOUNTERS(profile, prefix) NULL @@ -670,10 +673,10 @@ private: // Utility class to update time elapsed when the object goes out of scope. // 'T' must implement the stopWatch "interface" (start,stop,elapsed_time) but // we use templates not to pay for virtual function overhead. -template +template class ScopedRawTimer { public: - ScopedRawTimer(int64_t* counter) : _counter(counter) { _sw.start(); } + ScopedRawTimer(C* counter) : _counter(counter) { _sw.start(); } // Update counter when object is destroyed ~ScopedRawTimer() { *_counter += _sw.elapsed_time(); } @@ -683,7 +686,7 @@ private: ScopedRawTimer& operator=(const ScopedRawTimer& timer); T _sw; - int64_t* _counter; + C* _counter; }; } // namespace doris diff --git a/be/src/util/spinlock.h b/be/src/util/spinlock.h index 13a227fcfe..1329e5206b 100644 --- a/be/src/util/spinlock.h +++ b/be/src/util/spinlock.h @@ -38,10 +38,7 @@ public: } void unlock() { - // Memory barrier here. All updates before the unlock need to be made visible. - __sync_synchronize(); - DCHECK(_locked); - _locked = false; + __sync_bool_compare_and_swap(&_locked, true, false); } // Tries to acquire the lock diff --git a/be/src/util/thread.cpp b/be/src/util/thread.cpp index ed978605e7..3c7a45bba2 100644 --- a/be/src/util/thread.cpp +++ b/be/src/util/thread.cpp @@ -35,6 +35,7 @@ #include "gutil/once.h" #include "gutil/strings/substitute.h" #include "olap/olap_define.h" +#include "util/debug/sanitizer_scopes.h" #include "util/easy_json.h" #include "util/mutex.h" #include "util/os_util.h" @@ -149,7 +150,7 @@ void ThreadMgr::add_thread(const pthread_t& pthread_id, const std::string& name, // relationship between thread functors, ignoring potential data races. // The annotations prevent this from happening. ANNOTATE_IGNORE_SYNC_BEGIN(); - ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN(); + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; { MutexLock l(&_lock); _thread_categories[category][pthread_id] = ThreadDescriptor(category, name, tid); @@ -157,12 +158,11 @@ void ThreadMgr::add_thread(const pthread_t& pthread_id, const std::string& name, _threads_started_metric++; } ANNOTATE_IGNORE_SYNC_END(); - ANNOTATE_IGNORE_READS_AND_WRITES_END(); } void ThreadMgr::remove_thread(const pthread_t& pthread_id, const std::string& category) { ANNOTATE_IGNORE_SYNC_BEGIN(); - ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN(); + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; { MutexLock l(&_lock); auto category_it = _thread_categories.find(category); @@ -171,7 +171,6 @@ void ThreadMgr::remove_thread(const pthread_t& pthread_id, const std::string& ca _threads_running_metric--; } ANNOTATE_IGNORE_SYNC_END(); - ANNOTATE_IGNORE_READS_AND_WRITES_END(); } void ThreadMgr::display_thread_callback(const WebPageHandler::ArgumentMap& args, diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp index c9c9bed974..afe6f14f23 100644 --- a/be/src/util/threadpool.cpp +++ b/be/src/util/threadpool.cpp @@ -26,6 +26,7 @@ #include "gutil/map-util.h" #include "gutil/strings/substitute.h" #include "gutil/sysinfo.h" +#include "util/debug/sanitizer_scopes.h" #include "util/scoped_cleanup.h" #include "util/thread.h" @@ -278,6 +279,7 @@ Status ThreadPool::init() { } void ThreadPool::shutdown() { + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; MutexLock unique_lock(&_lock); check_not_pool_thread_unlocked(); @@ -476,6 +478,7 @@ bool ThreadPool::wait_for(const MonoDelta& delta) { } void ThreadPool::dispatch_thread() { + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; MutexLock unique_lock(&_lock); InsertOrDie(&_threads, Thread::current_thread()); DCHECK_GT(_num_threads_pending_start, 0); diff --git a/be/test/exec/tablet_sink_test.cpp b/be/test/exec/tablet_sink_test.cpp index 54e3fea9a3..7b4e1120e6 100644 --- a/be/test/exec/tablet_sink_test.cpp +++ b/be/test/exec/tablet_sink_test.cpp @@ -307,27 +307,28 @@ public: const ::doris::PTransmitDataParams* request, ::doris::PTransmitDataResult* response, ::google::protobuf::Closure* done) override { - done->Run(); + brpc::ClosureGuard done_guard(done); } void tablet_writer_open(google::protobuf::RpcController* controller, const PTabletWriterOpenRequest* request, PTabletWriterOpenResult* response, google::protobuf::Closure* done) override { + brpc::ClosureGuard done_guard(done); Status status; status.to_protobuf(response->mutable_status()); - done->Run(); } void tablet_writer_add_batch(google::protobuf::RpcController* controller, const PTabletWriterAddBatchRequest* request, PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) override { + brpc::ClosureGuard done_guard(done); { std::lock_guard l(_lock); - row_counters += request->tablet_ids_size(); + _row_counters += request->tablet_ids_size(); if (request->eos()) { - eof_counters++; + _eof_counters++; } k_add_batch_status.to_protobuf(response->mutable_status()); @@ -340,20 +341,19 @@ public: } } } - done->Run(); } void tablet_writer_cancel(google::protobuf::RpcController* controller, const PTabletWriterCancelRequest* request, PTabletWriterCancelResult* response, google::protobuf::Closure* done) override { - done->Run(); + brpc::ClosureGuard done_guard(done); } std::mutex _lock; - int64_t eof_counters = 0; - int64_t row_counters = 0; + int64_t _eof_counters = 0; + int64_t _row_counters = 0; RowDescriptor* _row_desc = nullptr; - std::set* _output_set; + std::set* _output_set = nullptr; }; TEST_F(OlapTableSinkTest, normal) { @@ -453,11 +453,11 @@ TEST_F(OlapTableSinkTest, normal) { ASSERT_TRUE(st.ok()); // close st = sink.close(&state, Status::OK()); - ASSERT_TRUE(st.ok()); + ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: already stopped, skip waiting for close. cancelled/!eos: : 1/1") << st.to_string(); // each node has a eof - ASSERT_EQ(2, service->eof_counters); - ASSERT_EQ(2 * 2, service->row_counters); + ASSERT_EQ(2, service->_eof_counters); + ASSERT_EQ(2 * 2, service->_row_counters); // 2node * 2 ASSERT_EQ(1, state.num_rows_load_filtered()); @@ -586,11 +586,11 @@ TEST_F(OlapTableSinkTest, convert) { ASSERT_TRUE(st.ok()); // close st = sink.close(&state, Status::OK()); - ASSERT_TRUE(st.ok()); + ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: already stopped, skip waiting for close. cancelled/!eos: : 1/1") << st.to_string(); // each node has a eof - ASSERT_EQ(2, service->eof_counters); - ASSERT_EQ(2 * 3, service->row_counters); + ASSERT_EQ(2, service->_eof_counters); + ASSERT_EQ(2 * 3, service->_row_counters); // 2node * 2 ASSERT_EQ(0, state.num_rows_load_filtered()); @@ -966,7 +966,7 @@ TEST_F(OlapTableSinkTest, decimal) { ASSERT_TRUE(st.ok()); // close st = sink.close(&state, Status::OK()); - ASSERT_TRUE(st.ok()); + ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: already stopped, skip waiting for close. cancelled/!eos: : 1/1") << st.to_string(); ASSERT_EQ(2, output_set.size()); ASSERT_TRUE(output_set.count("[(12 12.3)]") > 0); diff --git a/be/test/plugin/plugin_zip_test.cpp b/be/test/plugin/plugin_zip_test.cpp index 473a6c712a..7ae6c95cce 100644 --- a/be/test/plugin/plugin_zip_test.cpp +++ b/be/test/plugin/plugin_zip_test.cpp @@ -80,8 +80,6 @@ public: std::cout << "the path: " << _path << std::endl; } - ~PluginZipTest() { _server->stop(); }; - public: std::string _path; std::unique_ptr _server; diff --git a/thirdparty/build-thirdparty.sh b/thirdparty/build-thirdparty.sh index bbc0d9a200..d22407040b 100755 --- a/thirdparty/build-thirdparty.sh +++ b/thirdparty/build-thirdparty.sh @@ -699,6 +699,17 @@ build_js_and_css() { cp bootstrap-table.min.css $TP_INSTALL_DIR/webroot/Bootstrap-3.3.7/css } +build_tsan_header() { + cd $TP_SOURCE_DIR/ + if [[ ! -f $TSAN_HEADER_FILE ]]; then + echo "$TSAN_HEADER_FILE should exist." + exit 1 + fi + + mkdir -p $TP_INSTALL_DIR/include/sanitizer/ + cp $TSAN_HEADER_FILE $TP_INSTALL_DIR/include/sanitizer/ +} + # See https://github.com/apache/incubator-doris/issues/2910 # LLVM related codes have already be removed in master, so there is # no need to build llvm tool here. @@ -737,5 +748,6 @@ build_croaringbitmap build_orc build_cctz build_js_and_css +build_tsan_header echo "Finihsed to build all thirdparties" diff --git a/thirdparty/vars.sh b/thirdparty/vars.sh index 3421d63f96..020bd55671 100644 --- a/thirdparty/vars.sh +++ b/thirdparty/vars.sh @@ -300,6 +300,12 @@ BOOTSTRAP_TABLE_CSS_NAME="bootstrap-table.min.css" BOOTSTRAP_TABLE_CSS_FILE="bootstrap-table.min.css" BOOTSTRAP_TABLE_CSS_MD5SUM="23389d4456da412e36bae30c469a766a" -# all thirdparties which need to be downloaded is set in array TP_ARCHIVES -export TP_ARCHIVES="LIBEVENT OPENSSL THRIFT LLVM CLANG COMPILER_RT PROTOBUF GFLAGS GLOG GTEST RAPIDJSON SNAPPY GPERFTOOLS ZLIB LZ4 BZIP LZO2 CURL RE2 BOOST MYSQL BOOST_FOR_MYSQL ODBC LEVELDB BRPC ROCKSDB LIBRDKAFKA FLATBUFFERS ARROW BROTLI DOUBLE_CONVERSION ZSTD S2 BITSHUFFLE CROARINGBITMAP ORC JEMALLOC CCTZ DATATABLES BOOTSTRAP_TABLE_JS BOOTSTRAP_TABLE_CSS" +# tsan_header +TSAN_HEADER_DOWNLOAD="https://gcc.gnu.org/git/?p=gcc.git;a=blob_plain;f=libsanitizer/include/sanitizer/tsan_interface_atomic.h;hb=refs/heads/releases/gcc-7" +TSAN_HEADER_NAME="tsan_interface_atomic.h" +TSAN_HEADER_FILE="tsan_interface_atomic.h" +TSAN_HEADER_MD5SUM="d72679bea167d6a513d959f5abd149dc" + +# all thirdparties which need to be downloaded is set in array TP_ARCHIVES +export TP_ARCHIVES="LIBEVENT OPENSSL THRIFT LLVM CLANG COMPILER_RT PROTOBUF GFLAGS GLOG GTEST RAPIDJSON SNAPPY GPERFTOOLS ZLIB LZ4 BZIP LZO2 CURL RE2 BOOST MYSQL BOOST_FOR_MYSQL ODBC LEVELDB BRPC ROCKSDB LIBRDKAFKA FLATBUFFERS ARROW BROTLI DOUBLE_CONVERSION ZSTD S2 BITSHUFFLE CROARINGBITMAP ORC JEMALLOC CCTZ DATATABLES BOOTSTRAP_TABLE_JS BOOTSTRAP_TABLE_CSS TSAN_HEADER" diff --git a/tsan_suppressions b/tsan_suppressions new file mode 100644 index 0000000000..a01346fe74 --- /dev/null +++ b/tsan_suppressions @@ -0,0 +1,24 @@ +mutex:boost::condition_variable::wait(boost::unique_lock&) +mutex:brpc::* +mutex:doris::ConditionVariable::wait_until(doris::MonoTime const&) const +mutex:doris::ConditionVariable::wait() const +race:boost::intrusive::list_node_traits::get_next(boost::intrusive::list_node const* const&) +race:brpc::* +race:butil::* +race:bvar::* +race:doris::CountDownLatch::wait_until(doris::MonoTime const&) const +race:doris::PBackendService::* +race:doris::PStatus::status_code() const +race:doris::PTabletWriterAddBatchResult::* +race:doris::PTabletWriterOpenResult::* +race:doris::RefCountClosure::unref() +race:doris::stream_load::TestInternalService::tablet_writer_add_batch(google::protobuf::RpcController*, doris::PTabletWriterAddBatchRequest const*, doris::PTabletWriterAddBatchResult*, google::protobuf::Closure*) +race:glog_internal_namespace_::* +race:google::protobuf::* +race:operator delete(void*) +race:std::_Bit_reference::operator bool() const +race:std::char_traits::compare(char const*, char const*, unsigned long) +race:std::char_traits::copy(char*, char const*, unsigned long) +race:std::lock_guard::lock_guard(int volatile&) +race:std::lock_guard::~lock_guard() +race:void google::protobuf::internal::RepeatedPtrFieldBase::Clear::TypeHandler>()