patch some bug fix:part 2

This commit is contained in:
ly0
2021-07-23 17:33:21 +08:00
committed by wangzelin.wzl
parent a56ffb2862
commit 8160997f02
5 changed files with 119 additions and 74 deletions

View File

@ -137,22 +137,21 @@ public:
} }
}; };
struct SCond { template <int PRIO>
struct SCondTemp {
public: public:
typedef SimpleCond CondPerCpu; typedef SimpleCond CondPerCpu;
typedef SCondReadyFlag Lock; typedef SCondReadyFlag Lock;
typedef SCondCounter Counter; typedef SCondCounter Counter;
typedef SCondSimpleIdGen IdGen; typedef SCondSimpleIdGen IdGen;
enum { CPU_COUNT = OB_MAX_CPU_NUM, COND_COUNT = CPU_COUNT, LOOP_LIMIT = 8 }; enum { CPU_COUNT = OB_MAX_CPU_NUM, COND_COUNT = CPU_COUNT, LOOP_LIMIT = 8 };
SCond() void signal(uint32_t x = 1, int prio=0)
{}
~SCond()
{}
void signal(uint32_t x = 1)
{ {
uint32_t v = conds_[id_gen_.get() % COND_COUNT].signal(x); for (int p = PRIO-1; p >= prio && x > 0; p--) {
if (v < x) { x -= conds_[id_gen_.get() % COND_COUNT][p].signal(x);
n2wakeup_.add(x - v); }
if (x > 0) {
n2wakeup_.add(x);
lock_.set_ready(); lock_.set_ready();
int64_t loop_cnt = 0; int64_t loop_cnt = 0;
while (loop_cnt++ < LOOP_LIMIT && lock_.lock()) { while (loop_cnt++ < LOOP_LIMIT && lock_.lock()) {
@ -164,11 +163,12 @@ public:
} }
} }
} }
void prepare() void prepare(int prio=0)
{ {
uint32_t id = 0; uint32_t id = 0;
uint32_t key = get_key(id); uint32_t key = get_key(prio, id);
get_wait_key() = ((uint64_t)id << 32) + key; id += (prio << 16);
get_wait_key() = ((uint64_t)id<<32) + key;
} }
void wait(int64_t timeout) void wait(int64_t timeout)
{ {
@ -177,15 +177,14 @@ public:
} }
protected: protected:
uint32_t get_key(uint32_t& id) uint32_t get_key(int prio, uint32_t& id)
{ {
return conds_[id = (id_gen_.next() % COND_COUNT)].get_key(); return conds_[id = (id_gen_.next() % COND_COUNT)][prio].get_key();
} }
void wait(uint32_t id, uint32_t key, int64_t timeout) void wait(uint32_t id, uint32_t key, int64_t timeout)
{ {
conds_[id % COND_COUNT].wait(key, timeout); conds_[((uint16_t)id) % COND_COUNT][id >> 16].wait(key, timeout);
} }
private: private:
static uint64_t& get_wait_key() static uint64_t& get_wait_key()
{ {
@ -194,20 +193,27 @@ private:
} }
void do_wakeup() void do_wakeup()
{ {
uint32_t n2wakeup = n2wakeup_.fetch(); uint32_t n2wakeup = 0;
for (int i = 0; n2wakeup > 0 && i < COND_COUNT; i++) { //for (int p = PRIO - 1; p >= 0; p--) {
n2wakeup -= conds_[i].signal(n2wakeup); n2wakeup = n2wakeup_.fetch();
// }
for (int p = PRIO - 1; n2wakeup > 0 && p >= 0; p--) {
for(int i = 0; n2wakeup > 0 && i < COND_COUNT; i++) {
n2wakeup -= conds_[i][p].signal(n2wakeup);
}
} }
} }
private: private:
Lock lock_ CACHE_ALIGNED; Lock lock_ CACHE_ALIGNED;
CondPerCpu conds_[COND_COUNT]; CondPerCpu conds_[COND_COUNT][PRIO];
Counter n2wakeup_; Counter n2wakeup_;
IdGen id_gen_; IdGen id_gen_;
}; };
}; // end namespace common using SCond = SCondTemp<1>;
}; // end namespace oceanbase
}; // end namespace common
}; // end namespace oceanbase
#endif /* OCEANBASE_LOCK_OB_SCOND_H_ */ #endif /* OCEANBASE_LOCK_OB_SCOND_H_ */

View File

@ -107,10 +107,10 @@ private:
DISALLOW_COPY_AND_ASSIGN(ObPriorityQueue); DISALLOW_COPY_AND_ASSIGN(ObPriorityQueue);
}; };
template <int HIGH_PRIOS, int LOW_PRIOS> template <int HIGH_HIGH_PRIOS, int HIGH_PRIOS=0, int LOW_PRIOS=0>
class ObPriorityQueue2 { class ObPriorityQueue2 {
public: public:
enum { PRIO_CNT = HIGH_PRIOS + LOW_PRIOS }; enum { PRIO_CNT = HIGH_HIGH_PRIOS + HIGH_PRIOS + LOW_PRIOS };
ObPriorityQueue2() : queue_(), size_(0), limit_(INT64_MAX) ObPriorityQueue2() : queue_(), size_(0), limit_(INT64_MAX)
{} {}
@ -150,10 +150,13 @@ public:
} else if (OB_FAIL(queue_[priority].push(data))) { } else if (OB_FAIL(queue_[priority].push(data))) {
// do nothing // do nothing
} else { } else {
cond_.signal(); if (priority < HIGH_HIGH_PRIOS) {
// if (priority < HIGH_PRIOS) { cond_.signal(1, 0);
// high_cond_.signal(); } else if (priority < HIGH_PRIOS + HIGH_HIGH_PRIOS) {
// } cond_.signal(1, 1);
} else {
cond_.signal(1, 2);
}
} }
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
@ -162,29 +165,6 @@ public:
return ret; return ret;
} }
inline int do_pop(ObLink*& data, int64_t plimit, int64_t timeout_us)
{
int ret = OB_ENTRY_NOT_EXIST;
if (OB_UNLIKELY(timeout_us < 0)) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(ERROR, "timeout is invalid", K(ret), K(timeout_us));
} else {
cond_.prepare();
for (int i = 0; OB_ENTRY_NOT_EXIST == ret && i < plimit; i++) {
if (OB_SUCCESS == queue_[i].pop(data)) {
ret = OB_SUCCESS;
}
}
if (OB_FAIL(ret)) {
cond_.wait(timeout_us);
data = NULL;
} else {
(void)ATOMIC_FAA(&size_, -1);
}
}
return ret;
}
int pop(ObLink*& data, int64_t timeout_us) int pop(ObLink*& data, int64_t timeout_us)
{ {
return do_pop(data, PRIO_CNT, timeout_us); return do_pop(data, PRIO_CNT, timeout_us);
@ -195,8 +175,42 @@ public:
return do_pop(data, HIGH_PRIOS, timeout_us); return do_pop(data, HIGH_PRIOS, timeout_us);
} }
int pop_high_high(ObLink*& data, int64_t timeout_us)
{
return do_pop(data, HIGH_HIGH_PRIOS, timeout_us);
}
private: private:
SCond cond_; inline int do_pop(ObLink*& data, int64_t plimit, int64_t timeout_us)
{
int ret = OB_ENTRY_NOT_EXIST;
if (OB_UNLIKELY(timeout_us < 0)) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(ERROR, "timeout is invalid", K(ret), K(timeout_us));
} else {
if (plimit <= HIGH_HIGH_PRIOS) {
cond_.prepare(0);
} else if (plimit <= HIGH_PRIOS + HIGH_HIGH_PRIOS) {
cond_.prepare(1);
} else {
cond_.prepare(2);
}
for (int i = 0; OB_ENTRY_NOT_EXIST == ret && i < plimit; i++) {
if (OB_SUCCESS == queue_[i].pop(data)) {
ret = OB_SUCCESS;
}
}
if (OB_FAIL(ret)) {
cond_.wait(timeout_us);
data = NULL;
} else {
(void)ATOMIC_FAA(&size_, -1);
}
}
return ret;
}
SCondTemp<3> cond_;
ObLinkQueue queue_[PRIO_CNT]; ObLinkQueue queue_[PRIO_CNT];
int64_t size_ CACHE_ALIGNED; int64_t size_ CACHE_ALIGNED;
int64_t limit_ CACHE_ALIGNED; int64_t limit_ CACHE_ALIGNED;

View File

@ -15,9 +15,11 @@
#include "lib/queue/ob_priority_queue.h" #include "lib/queue/ob_priority_queue.h"
#include "lib/coro/co.h" #include "lib/coro/co.h"
#include "lib/thread/thread_pool.h" #include "lib/thread/thread_pool.h"
#include <iostream>
using namespace oceanbase::lib; using namespace oceanbase::lib;
using namespace oceanbase::common; using namespace oceanbase::common;
using namespace std;
class TestQueue : public ThreadPool { class TestQueue : public ThreadPool {
public: public:
@ -31,16 +33,18 @@ public:
{} {}
int64_t val_; int64_t val_;
}; };
typedef ObPriorityQueue<3> Queue; typedef ObPriorityQueue2<1, 2> Queue;
TestQueue() : seq_(0) TestQueue(): push_seq_(0), pop_seq_(0)
{ {
limit_ = atoll(getenv("limit") ?: "1000000"); limit_ = atoll(getenv("limit")?: "1000000");
} }
virtual ~TestQueue() virtual ~TestQueue()
{} {}
void do_stress() void do_stress()
{ {
set_thread_count(atoi(getenv("n_thread") ?: "8")); set_thread_count(atoi(getenv("n_thread") ?: "8"));
n_pusher_ = atoi(getenv("n_pusher")?: "4");
queue_.set_limit(65536);
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_FAIL(start())) { if (OB_FAIL(start())) {
LIB_LOG(ERROR, "start fail", K(ret), K(errno)); LIB_LOG(ERROR, "start fail", K(ret), K(errno));
@ -51,59 +55,78 @@ public:
} }
void print() void print()
{ {
int64_t last_seq = ATOMIC_LOAD(&seq_); int64_t last_seq = ATOMIC_LOAD(&pop_seq_);
while (ATOMIC_LOAD(&seq_) < limit_) { while (ATOMIC_LOAD(&pop_seq_) < limit_) {
sleep(1); sleep(1);
int64_t cur_seq = ATOMIC_LOAD(&seq_); int64_t cur_seq = ATOMIC_LOAD(&pop_seq_);
LIB_LOG(INFO, "queue", "tps", BATCH * (cur_seq - last_seq)); LIB_LOG(INFO, "queue", "tps", BATCH * (cur_seq - last_seq));
last_seq = cur_seq; last_seq = cur_seq;
} }
} }
int64_t get_seq() int64_t get_seq(int64_t &seq)
{ {
return ATOMIC_FAA(&seq_, 1); return ATOMIC_FAA(&seq, 1);
} }
int insert(int64_t seq) int insert(int64_t seq)
{ {
int err = 0; int err = 0;
QData* data = new QData(seq); QData* data = new QData(seq);
err = queue_.push(data, (int)data->val_ % 3); err = queue_.push(data, data->val_ % 3);
return err; return err;
} }
int del(int64_t seq) int del(uint64_t idx)
{ {
UNUSED(seq);
int err; int err;
QData* data = NULL; QData* data = NULL;
err = queue_.pop((ObLink*&)data, 500); if (idx == 0) {
err = queue_.pop_high_high((ObLink*&)data, 10000);
} else {
err = queue_.pop((ObLink*&)data, 10000);
}
// auto now = ObTimeUtility::current_time();
// if (data) {
// if (now - data->val_ > 100) {
// cout << now - data->val_ << endl;
// }
// usleep(500);
// }
delete data; delete data;
return err; return err;
} }
void run1() override void run1() override
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t seq = 0; int64_t seq = 0;
const uint64_t idx = get_thread_idx(); const uint64_t idx = get_thread_idx();
while ((seq = get_seq()) < limit_) { cout << "idx: " << idx << endl;
if (0 == (idx % 2)) { if (idx >= get_thread_count() - n_pusher_) {
while ((seq = get_seq(push_seq_)) < limit_) {
for (int i = 0; i < BATCH; i++) { for (int i = 0; i < BATCH; i++) {
//::usleep(10000);
auto now = ObTimeUtility::current_time();
do { do {
ret = insert(i); ret = insert(now);
} while (OB_FAIL(ret)); } while (OB_FAIL(ret));
} }
} else { }
} else {
while ((seq = get_seq(pop_seq_)) < limit_) {
for (int i = 0; i < BATCH; i++) { for (int i = 0; i < BATCH; i++) {
do { do {
ret = del(i); ret = del(idx);
} while (OB_FAIL(ret)); } while (OB_FAIL(ret));
} }
} }
} }
std::cout << idx << " finished" << std::endl;
} }
private: private:
int64_t seq_ CACHE_ALIGNED; int64_t push_seq_ CACHE_ALIGNED;
int64_t pop_seq_ CACHE_ALIGNED;
int64_t limit_; int64_t limit_;
int64_t n_pusher_;
Queue queue_; Queue queue_;
}; // end of class Consumer }; // end of class Consumer

View File

@ -647,7 +647,7 @@ int ObTenant::get_new_request(ObThWorker& w, int64_t timeout, rpc::ObRequest*& r
if (OB_UNLIKELY(only_high_high_prio)) { if (OB_UNLIKELY(only_high_high_prio)) {
// We must ensure at least one worker can process the highest // We must ensure at least one worker can process the highest
// priority task. // priority task.
ret = req_queue_.do_pop(task, QQ_HIGH + 1, timeout); ret = req_queue_.pop_high_high(task, timeout);
} else if (OB_UNLIKELY(only_high_prio)) { } else if (OB_UNLIKELY(only_high_prio)) {
// We must ensure at least number of tokens of workers which don't // We must ensure at least number of tokens of workers which don't
// process low priority task. // process low priority task.
@ -656,7 +656,9 @@ int ObTenant::get_new_request(ObThWorker& w, int64_t timeout, rpc::ObRequest*& r
// If large requests exist and this worker doesn't have LQT but // If large requests exist and this worker doesn't have LQT but
// can acquire, do it. // can acquire, do it.
ATOMIC_INC(&pop_normal_cnt_); ATOMIC_INC(&pop_normal_cnt_);
if (large_req_queue_.size() > 0 && !w.has_lq_token() && acquire_lq_token()) { if (large_req_queue_.size() > 0 &&
!w.has_lq_token() &&
acquire_lq_token()) {
w.set_lq_token(); w.set_lq_token();
} }
if (OB_LIKELY(!w.has_lq_token())) { if (OB_LIKELY(!w.has_lq_token())) {

View File

@ -558,7 +558,7 @@ protected:
/// tenant task queue, /// tenant task queue,
// 'hp' for high priority and 'np' for normal priority // 'hp' for high priority and 'np' for normal priority
common::ObPriorityQueue2<QQ_MAX_PRIO, RQ_MAX_PRIO - QQ_MAX_PRIO> req_queue_; common::ObPriorityQueue2<1, QQ_MAX_PRIO - 1, RQ_MAX_PRIO - QQ_MAX_PRIO> req_queue_;
common::ObLinkQueue large_req_queue_; common::ObLinkQueue large_req_queue_;
// Create a request queue for each level of nested requests // Create a request queue for each level of nested requests