[Optimize] Reduce lock conflicts in ThreadResourceMgr of be (#5772)
Removed some useless code that caused lock conflicts in ThreadResourceMgr of be.
This commit is contained in:
@ -57,7 +57,6 @@ ThreadResourceMgr::ResourcePool::ResourcePool(ThreadResourceMgr* parent) : _pare
|
||||
void ThreadResourceMgr::ResourcePool::reset() {
|
||||
_num_threads = 0;
|
||||
_num_reserved_optional_threads = 0;
|
||||
_thread_available_fn = NULL;
|
||||
_max_quota = INT_MAX;
|
||||
}
|
||||
|
||||
@ -83,7 +82,7 @@ ThreadResourceMgr::ResourcePool* ThreadResourceMgr::register_pool() {
|
||||
pool->reset();
|
||||
|
||||
// Added a new pool, update the quotas for each pool.
|
||||
update_pool_quotas(pool);
|
||||
update_pool_quotas();
|
||||
return pool;
|
||||
}
|
||||
|
||||
@ -98,32 +97,12 @@ void ThreadResourceMgr::unregister_pool(ResourcePool* pool) {
|
||||
}
|
||||
}
|
||||
|
||||
void ThreadResourceMgr::ResourcePool::set_thread_available_cb(thread_available_cb fn) {
|
||||
std::unique_lock<std::mutex> l(_lock);
|
||||
DCHECK(_thread_available_fn == NULL || fn == NULL);
|
||||
_thread_available_fn = fn;
|
||||
}
|
||||
|
||||
void ThreadResourceMgr::update_pool_quotas(ResourcePool* new_pool) {
|
||||
void ThreadResourceMgr::update_pool_quotas() {
|
||||
if (_pools.empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
_per_pool_quota = ceil(static_cast<double>(_system_threads_quota) / _pools.size());
|
||||
|
||||
for (Pools::iterator it = _pools.begin(); it != _pools.end(); ++it) {
|
||||
ResourcePool* pool = *it;
|
||||
|
||||
if (pool == new_pool) {
|
||||
continue;
|
||||
}
|
||||
|
||||
std::unique_lock<std::mutex> l(pool->_lock);
|
||||
|
||||
if (pool->num_available_threads() > 0 && pool->_thread_available_fn != NULL) {
|
||||
pool->_thread_available_fn(pool);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -117,14 +117,6 @@ public:
|
||||
// Must not be called from from thread_available_cb.
|
||||
void release_thread_token(bool required);
|
||||
|
||||
// Add a callback to be notified when a thread is available.
|
||||
// 'arg' is opaque and passed directly to the callback.
|
||||
// The previous callback is no longer notified.
|
||||
// TODO: rethink this. How we do coordinate when we have multiple places in
|
||||
// the execution that all need threads (e.g. do we use that thread for
|
||||
// the scanner or for the join).
|
||||
void set_thread_available_cb(thread_available_cb fn);
|
||||
|
||||
// Returns the number of threads that are from acquire_thread_token.
|
||||
int num_required_threads() const { return _num_threads & 0xFFFFFFFF; }
|
||||
|
||||
@ -170,13 +162,6 @@ public:
|
||||
// swap operations. The number of required threads is the lower
|
||||
// 32 bits and the number of optional threads is the upper 32 bits.
|
||||
int64_t _num_threads;
|
||||
|
||||
// Lock for the fields below. This lock is taken when the callback
|
||||
// function is called.
|
||||
// TODO: reconsider this.
|
||||
std::mutex _lock;
|
||||
|
||||
thread_available_cb _thread_available_fn;
|
||||
};
|
||||
|
||||
// Create a thread mgr object. If threads_quota is non-zero, it will be
|
||||
@ -214,12 +199,7 @@ private:
|
||||
// Recycled list of pool objects
|
||||
std::list<ResourcePool*> _free_pool_objs;
|
||||
|
||||
// Updates the per pool quota and notifies any pools that now have
|
||||
// more threads they can use. Must be called with _lock taken.
|
||||
// If new_pool is non-null, new_pool will *not* be notified.
|
||||
void update_pool_quotas(ResourcePool* new_pool);
|
||||
|
||||
void update_pool_quotas() { update_pool_quotas(NULL); }
|
||||
void update_pool_quotas();
|
||||
};
|
||||
|
||||
inline void ThreadResourceMgr::ResourcePool::acquire_thread_token() {
|
||||
@ -265,20 +245,6 @@ inline void ThreadResourceMgr::ResourcePool::release_thread_token(bool required)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We need to grab a lock before issuing the callback to prevent the
|
||||
// callback from being removed while it is happening.
|
||||
// Note: this is unlikely to be a big deal for performance currently
|
||||
// since only scanner threads call this with any frequency and that only
|
||||
// happens once when the scanner thread is complete.
|
||||
// TODO: reconsider this.
|
||||
if (num_available_threads() > 0 && _thread_available_fn != NULL) {
|
||||
std::unique_lock<std::mutex> l(_lock);
|
||||
|
||||
if (num_available_threads() > 0 && _thread_available_fn != NULL) {
|
||||
_thread_available_fn(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -26,42 +26,20 @@
|
||||
|
||||
namespace doris {
|
||||
|
||||
class NotifiedCounter {
|
||||
public:
|
||||
NotifiedCounter() : _counter(0) {}
|
||||
|
||||
void Notify(ThreadResourceMgr::ResourcePool* consumer) {
|
||||
DCHECK(consumer != NULL);
|
||||
DCHECK_LT(consumer->num_threads(), consumer->quota());
|
||||
++_counter;
|
||||
}
|
||||
|
||||
int counter() const { return _counter; }
|
||||
|
||||
private:
|
||||
int _counter;
|
||||
};
|
||||
|
||||
TEST(ThreadResourceMgr, BasicTest) {
|
||||
ThreadResourceMgr mgr(5);
|
||||
NotifiedCounter counter1;
|
||||
NotifiedCounter counter2;
|
||||
|
||||
ThreadResourceMgr::ResourcePool* c1 = mgr.register_pool();
|
||||
c1->set_thread_available_cb(
|
||||
std::bind<void>(std::mem_fn(&NotifiedCounter::Notify), &counter1, _1));
|
||||
c1->acquire_thread_token();
|
||||
c1->acquire_thread_token();
|
||||
c1->acquire_thread_token();
|
||||
EXPECT_EQ(c1->num_threads(), 3);
|
||||
EXPECT_EQ(c1->num_required_threads(), 3);
|
||||
EXPECT_EQ(c1->num_optional_threads(), 0);
|
||||
EXPECT_EQ(counter1.counter(), 0);
|
||||
c1->release_thread_token(true);
|
||||
EXPECT_EQ(c1->num_threads(), 2);
|
||||
EXPECT_EQ(c1->num_required_threads(), 2);
|
||||
EXPECT_EQ(c1->num_optional_threads(), 0);
|
||||
EXPECT_EQ(counter1.counter(), 1);
|
||||
EXPECT_TRUE(c1->try_acquire_thread_token());
|
||||
EXPECT_TRUE(c1->try_acquire_thread_token());
|
||||
EXPECT_TRUE(c1->try_acquire_thread_token());
|
||||
@ -71,12 +49,9 @@ TEST(ThreadResourceMgr, BasicTest) {
|
||||
EXPECT_EQ(c1->num_optional_threads(), 3);
|
||||
c1->release_thread_token(true);
|
||||
c1->release_thread_token(false);
|
||||
EXPECT_EQ(counter1.counter(), 3);
|
||||
|
||||
// Register a new consumer, quota is cut in half
|
||||
ThreadResourceMgr::ResourcePool* c2 = mgr.register_pool();
|
||||
c2->set_thread_available_cb(
|
||||
std::bind<void>(std::mem_fn(&NotifiedCounter::Notify), &counter2, _1));
|
||||
EXPECT_FALSE(c1->try_acquire_thread_token());
|
||||
EXPECT_EQ(c1->num_threads(), 3);
|
||||
c1->acquire_thread_token();
|
||||
@ -86,8 +61,6 @@ TEST(ThreadResourceMgr, BasicTest) {
|
||||
|
||||
mgr.unregister_pool(c1);
|
||||
mgr.unregister_pool(c2);
|
||||
EXPECT_EQ(counter1.counter(), 3);
|
||||
EXPECT_EQ(counter2.counter(), 1);
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
|
||||
Reference in New Issue
Block a user