init push

oceanbase-admin
2021-05-31 22:56:52 +08:00
commit cea7de1475
7020 changed files with 5689869 additions and 0 deletions


@@ -0,0 +1,140 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef _OB_LIB_QUEUE_OB_SEQ_QUEUE_H_
#define _OB_LIB_QUEUE_OB_SEQ_QUEUE_H_
#include "lib/ob_define.h"
#include "lib/allocator/ob_allocator.h" // ObIAllocator
#include "lib/atomic/ob_atomic.h" // PAUSE
#include "lib/oblog/ob_log.h"
namespace oceanbase {
namespace common {
class ObCoSeqQueue {
enum { MIN_QUEUE_SIZE = 4, READY = 1 };
public:
ObCoSeqQueue() : next_(0), len_(0), items_(NULL), allocator_(NULL)
{}
~ObCoSeqQueue()
{
destroy();
}
public:
int init(const int64_t limit, ObIAllocator* allocator)
{
int ret = common::OB_SUCCESS;
if (limit <= MIN_QUEUE_SIZE || NULL == allocator) {
ret = common::OB_INVALID_ARGUMENT;
LIB_LOG(ERROR, "invalid args", K(limit), K(allocator));
} else if (NULL == (items_ = static_cast<int64_t*>(allocator->alloc(sizeof(int64_t) * limit)))) {
ret = common::OB_ALLOCATE_MEMORY_FAILED;
LIB_LOG(ERROR, "alloc memory for items failed", "size", sizeof(int64_t) * limit);
}
for (int64_t i = 0; common::OB_SUCCESS == ret && i < limit; i++) {
items_[i] = i;
}
if (OB_SUCCESS != ret) {
destroy();
} else {
len_ = limit;
allocator_ = allocator;
}
return ret;
}
int destroy()
{
if (NULL != items_ && NULL != allocator_) {
allocator_->free((void*)items_);
items_ = NULL;
}
len_ = 0;
next_ = 0;
allocator_ = NULL;
return common::OB_SUCCESS;
}
int64_t get_next() const
{
return next_;
}
bool is_ready(const int64_t id) const
{
bool bret = false;
if (NULL == items_) {
LIB_LOG(ERROR, "invalid item", K(items_));
} else if (id < 0) {
LIB_LOG(ERROR, "invalid id", K(id));
} else if (0 == len_) {
LIB_LOG(ERROR, "invalid len", K(len_));
} else {
bret = items_[id % len_] >= id + READY;
}
return bret;
}
int wait(const int64_t id, int64_t& target_id) const
{
int ret = common::OB_SUCCESS;
if (NULL == items_) {
ret = common::OB_NOT_INIT;
} else if (id < 0) {
ret = common::OB_INVALID_ARGUMENT;
} else {
while (items_[id % len_] < id) {
PAUSE();
}
target_id = items_[id % len_];
}
return ret;
}
int64_t add(const int64_t id)
{
int64_t last_got = 0;
if (NULL != items_) {
// Mark sequence `id` as completed: spin until the slot currently holds `id` (i.e. the
// slot is free for this round), then bump its value to id + READY.
while (!__sync_bool_compare_and_swap(items_ + (id % len_), id, id + READY)) {
PAUSE();
}
// Advance next_ over the contiguous prefix of completed ids; each advanced slot is set
// to next + len_, which frees it for sequence number next + len_ in the next round.
while (true) {
int64_t next = next_;
if (__sync_bool_compare_and_swap(items_ + (next % len_), next + READY, next + len_)) {
last_got = next + 1;
next_++;
} else if (items_[next % len_] < next + READY) {
break;
}
}
}
return last_got >= next_ ? last_got : 0;
}
private:
volatile int64_t next_ CACHE_ALIGNED;
int64_t len_ CACHE_ALIGNED;
volatile int64_t* items_;
common::ObIAllocator* allocator_;
DISALLOW_COPY_AND_ASSIGN(ObCoSeqQueue);
};
}; // end namespace common
}; // end namespace oceanbase
#endif /* _OB_LIB_QUEUE_OB_SEQ_QUEUE_H_ */
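
The block below is a minimal usage sketch for ObCoSeqQueue and is not part of the shipped sources: demo_co_seq_queue is a hypothetical caller, the include path is inferred from the header guard, the allocator is supplied by the caller, and error handling is abbreviated.

// Usage sketch only (hypothetical caller).
#include "lib/queue/ob_seq_queue.h"  // assumed path

using namespace oceanbase::common;

int demo_co_seq_queue(ObIAllocator& allocator)
{
  int ret = OB_SUCCESS;
  ObCoSeqQueue seq_queue;
  if (OB_SUCCESS != (ret = seq_queue.init(1024, &allocator))) {
    // init() rejects limit <= MIN_QUEUE_SIZE and NULL allocators
  } else {
    int64_t target_id = 0;
    (void)seq_queue.wait(0, target_id);         // spins until the slot for id 0 is free
    (void)seq_queue.add(2);                     // out-of-order completion within the window is fine
    (void)seq_queue.add(0);
    const bool ready = seq_queue.is_ready(0);   // true: id 0 has completed
    const int64_t next = seq_queue.get_next();  // ids [0, next) have all completed; here next == 1
    (void)ready;
    (void)next;
    seq_queue.destroy();
  }
  return ret;
}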


@@ -0,0 +1,419 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "lib/queue/ob_dedup_queue.h"
#include "lib/thread/ob_thread_name.h"
namespace oceanbase {
namespace common {
#define ERASE_FROM_LIST(head, tail, node) \
do { \
if (NULL == node->get_prev()) { \
head = node->get_next(); \
} else { \
node->get_prev()->set_next(node->get_next()); \
} \
if (NULL == node->get_next()) { \
tail = node->get_prev(); \
} else { \
node->get_next()->set_prev(node->get_prev()); \
} \
node->set_prev(NULL); \
node->set_next(NULL); \
} while (0)
ObDedupQueue::ObDedupQueue()
: is_inited_(false),
thread_num_(DEFAULT_THREAD_NUM),
work_thread_num_(DEFAULT_THREAD_NUM),
thread_dead_threshold_(DEFALT_THREAD_DEAD_THRESHOLD),
hash_allocator_(allocator_),
gc_queue_head_(NULL),
gc_queue_tail_(NULL),
thread_name_(nullptr)
{}
ObDedupQueue::~ObDedupQueue()
{
destroy();
}
int ObDedupQueue::init(int32_t thread_num /*= DEFAULT_THREAD_NUM*/, const char* thread_name /*= NULL*/,
const int64_t queue_size /*= TASK_QUEUE_SIZE*/, const int64_t task_map_size /*= TASK_MAP_SIZE*/,
const int64_t total_mem_limit /*= TOTAL_LIMIT*/, const int64_t hold_mem_limit /*= HOLD_LIMIT*/,
const int64_t page_size /*= PAGE_SIZE*/)
{
int ret = OB_SUCCESS;
if (is_inited_) {
ret = OB_INIT_TWICE;
} else if (thread_num <= 0 || thread_num > MAX_THREAD_NUM || total_mem_limit <= 0 || hold_mem_limit <= 0 ||
page_size <= 0) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(
WARN, "invalid argument", K(thread_num), K(queue_size), K(total_mem_limit), K(hold_mem_limit), K(page_size));
} else if (OB_FAIL(task_queue_sync_.init(ObWaitEventIds::DEDUP_QUEUE_COND_WAIT))) {
COMMON_LOG(WARN, "fail to init task queue sync cond, ", K(ret));
} else if (OB_FAIL(work_thread_sync_.init(ObWaitEventIds::DEFAULT_COND_WAIT))) {
COMMON_LOG(WARN, "fail to init work thread sync cond, ", K(ret));
} else {
thread_name_ = thread_name;
thread_num_ = thread_num;
work_thread_num_ = thread_num;
set_thread_count(thread_num);
if (OB_SUCCESS != (ret = allocator_.init(total_mem_limit, hold_mem_limit, page_size))) {
COMMON_LOG(WARN, "allocator init fail", K(total_mem_limit), K(hold_mem_limit), K(page_size), K(ret));
} else if (OB_SUCCESS !=
(ret = task_map_.create(task_map_size, &hash_allocator_, ObModIds::OB_HASH_BUCKET_TASK_MAP))) {
COMMON_LOG(WARN, "task_map create fail", K(ret));
} else if (OB_SUCCESS != (ret = task_queue_.init(queue_size))) {
COMMON_LOG(WARN, "task_queue init fail", K(ret));
} else if (OB_FAIL(start())) {
COMMON_LOG(WARN, "start thread fail", K(ret));
} else {
is_inited_ = true;
}
}
if (OB_FAIL(ret)) {
destroy();
} else {
COMMON_LOG(INFO,
"init dedup-queue:",
K(thread_num),
K(queue_size),
K(task_map_size),
K(total_mem_limit),
K(hold_mem_limit),
K(page_size),
KP(this),
"lbt",
lbt());
}
return ret;
}
void ObDedupQueue::destroy()
{
lib::CoKThread::stop();
lib::CoKThread::wait();
lib::CoKThread::destroy();
IObDedupTask* iter = gc_queue_head_;
while (NULL != iter) {
IObDedupTask* next = iter->get_next();
destroy_task_(iter);
iter = next;
}
gc_queue_head_ = NULL;
gc_queue_tail_ = NULL;
while (OB_SUCCESS == task_queue_.pop(iter)) {
destroy_task_(iter);
iter = NULL;
}
task_queue_.destroy();
task_map_.destroy();
allocator_.destroy();
task_queue_sync_.destroy();
work_thread_sync_.destroy();
is_inited_ = false;
}
int ObDedupQueue::set_thread_dead_threshold(const int64_t thread_dead_threshold)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (0 > thread_dead_threshold) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(WARN, "thread_dead_threshold is not valid", K(ret));
} else {
thread_dead_threshold_ = thread_dead_threshold;
}
return ret;
}
int ObDedupQueue::set_work_thread_num(const int32_t work_thread_num)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (0 >= work_thread_num) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(WARN, "work_thread_num is not valid. ", K(ret), K(work_thread_num));
} else {
ObThreadCondGuard cond_guard(work_thread_sync_);
work_thread_num_ = work_thread_num;
(void)work_thread_sync_.broadcast();
}
return ret;
}
int ObDedupQueue::add_task(const IObDedupTask& task)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else {
MapFunction func(*this, task);
int hash_ret = task_map_.atomic_refactored(&task, func);
if (OB_SUCCESS == hash_ret) {
ret = func.result_code_;
} else if (OB_HASH_NOT_EXIST == hash_ret) {
if (OB_FAIL(add_task_(task))) {
COMMON_LOG(WARN, "failed to add task", K(ret));
}
} else {
COMMON_LOG(WARN, "unexpected hash_ret", K(hash_ret));
ret = hash_ret;
}
if (OB_SUCC(ret)) {
ObThreadCondGuard guard(task_queue_sync_);
(void)task_queue_sync_.signal();
}
if (REACH_TIME_INTERVAL(THREAD_CHECK_INTERVAL)) {
for (int64_t i = 0; i < thread_num_; i++) {
if (thread_metas_[i].check_dead(thread_dead_threshold_)) {
COMMON_LOG(WARN, "thread maybe dead", K(i), K(thread_metas_[i]));
}
}
}
}
return ret;
}
IObDedupTask* ObDedupQueue::copy_task_(const IObDedupTask& task)
{
IObDedupTask* ret = NULL;
if (IS_NOT_INIT) {
COMMON_LOG(WARN, "ObDedupQueue is not inited");
} else {
int64_t deep_copy_size = task.get_deep_copy_size();
char* memory = NULL;
if (NULL == (memory = (char*)allocator_.alloc(deep_copy_size))) {
COMMON_LOG(WARN, "alloc memory fail", K(deep_copy_size), K(task.get_type()));
} else if (NULL == (ret = task.deep_copy(memory, deep_copy_size))) {
COMMON_LOG(WARN, "deep copy task object fail", K(deep_copy_size), KP(memory));
} else {
COMMON_LOG(DEBUG, "deep copy task succ", K(ret), KP(memory), K(deep_copy_size));
ret->set_memory_ptr(memory);
}
if (NULL == ret) {
if (NULL != memory) {
allocator_.free(memory);
memory = NULL;
}
}
}
return ret;
}
void ObDedupQueue::destroy_task_(IObDedupTask* task)
{
if (IS_NOT_INIT) {
COMMON_LOG(WARN, "ObDedupQueue is not inited");
} else if (OB_ISNULL(task)) {
COMMON_LOG(WARN, "invalid argument");
} else {
char* memory = task->get_memory_ptr();
task->~IObDedupTask();
if (NULL != memory) {
allocator_.free(memory);
memory = NULL;
}
}
}
int ObDedupQueue::map_callback_(const IObDedupTask& task, TaskMapKVPair& kvpair)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else {
IObDedupTask* task2remove = NULL;
IObDedupTask* task2add = NULL;
if (kvpair.first != kvpair.second || NULL == (task2remove = kvpair.second)) {
COMMON_LOG(WARN, "unexpected key null pointer", K(kvpair.first), K(kvpair.second));
ret = OB_ERR_UNEXPECTED;
} else if (0 != task2remove->trylock()) {
ret = OB_EAGAIN;
} else if (!task2remove->is_process_done() ||
::oceanbase::common::ObTimeUtility::current_time() < task2remove->get_abs_expired_time()) {
task2remove->unlock();
ret = OB_EAGAIN;
} else if (NULL == (task2add = copy_task_(task))) {
task2remove->unlock();
COMMON_LOG(WARN, "copy task fail");
ret = OB_ALLOCATE_MEMORY_FAILED;
} else if (OB_SUCCESS != (ret = task_queue_.push(task2add))) {
task2remove->unlock();
COMMON_LOG(WARN, "push task to queue fail", K(ret));
destroy_task_(task2add);
task2add = NULL;
} else {
gc_queue_sync_.lock();
ERASE_FROM_LIST(gc_queue_head_, gc_queue_tail_, task2remove);
gc_queue_sync_.unlock();
kvpair.first = task2add;
kvpair.second = task2add;
task2remove->unlock();
destroy_task_(task2remove);
task2remove = NULL;
}
}
return ret;
}
int ObDedupQueue::add_task_(const IObDedupTask& task)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else {
IObDedupTask* task2add = NULL;
if (NULL == (task2add = copy_task_(task))) {
COMMON_LOG(WARN, "copy task fail");
ret = OB_ALLOCATE_MEMORY_FAILED;
} else {
// Lock here so that a task which has not yet been inserted into task_queue_ cannot be erased from the map by another thread.
task2add->lock();
int hash_ret = task_map_.set_refactored(task2add, task2add);
if (OB_SUCCESS != hash_ret) {
if (OB_HASH_EXIST != hash_ret) {
// OB_HASH_EXIST is a possible case, do not log any warn
COMMON_LOG(WARN, "set to task map fail", K(hash_ret));
}
task2add->unlock();
destroy_task_(task2add);
task2add = NULL;
ret = OB_EAGAIN;
} else if (OB_SUCCESS != (ret = task_queue_.push(task2add))) {
COMMON_LOG(WARN, "push task to queue fail", K(ret));
if (OB_SUCCESS != (hash_ret = task_map_.erase_refactored(task2add))) {
task2add->unlock();
COMMON_LOG(WARN, "unexpected erase from task_map fail", K(hash_ret));
} else {
task2add->unlock();
destroy_task_(task2add);
task2add = NULL;
}
} else {
task2add->unlock();
}
}
}
return ret;
}
bool ObDedupQueue::gc_()
{
bool bret = false;
IObDedupTask* task_list[GC_BATCH_NUM];
int64_t task_list_size = 0;
if (0 == gc_queue_sync_.trylock()) {
bret = true;
IObDedupTask* iter = gc_queue_head_;
while (NULL != iter && GC_BATCH_NUM > task_list_size) {
IObDedupTask* next = iter->get_next();
if (iter->is_process_done() &&
::oceanbase::common::ObTimeUtility::current_time() > iter->get_abs_expired_time() && 0 == iter->trylock()) {
task_list[task_list_size++] = iter;
ERASE_FROM_LIST(gc_queue_head_, gc_queue_tail_, iter);
}
iter = next;
}
gc_queue_sync_.unlock();
}
for (int64_t i = 0; i < task_list_size; i++) {
if (NULL == task_list[i]) {
continue;
}
int hash_ret = task_map_.erase_refactored(task_list[i]);
if (OB_SUCCESS != hash_ret) {
const int64_t type = task_list[i]->get_type();
task_list[i]->unlock();
COMMON_LOG(WARN, "unexpected erase from task_map fail", K(hash_ret), K(type), K(task_list_size));
} else {
task_list[i]->unlock();
destroy_task_(task_list[i]);
task_list[i] = NULL;
}
}
return bret;
}
void ObDedupQueue::run1()
{
int tmp_ret = OB_SUCCESS;
int64_t thread_pos = (int64_t)get_thread_idx();
ThreadMeta& thread_meta = thread_metas_[thread_pos];
thread_meta.init();
COMMON_LOG(INFO, "dedup queue thread start", KP(this));
if (OB_NOT_NULL(thread_name_)) {
lib::set_thread_name(thread_name_);
}
while (!has_set_stop()) {
IObDedupTask* task2process = NULL;
if (thread_pos < work_thread_num_) {
if (OB_SUCCESS != (tmp_ret = task_queue_.pop(task2process)) && OB_UNLIKELY(tmp_ret != OB_ENTRY_NOT_EXIST)) {
COMMON_LOG(WARN, "task_queue_.pop error", K(tmp_ret), K(task2process));
} else if (NULL != task2process) {
thread_meta.on_process_start(task2process);
task2process->process();
thread_meta.on_process_end();
gc_queue_sync_.lock();
task2process->set_next(NULL);
if (NULL == gc_queue_tail_) {
task2process->set_prev(NULL);
gc_queue_head_ = task2process;
gc_queue_tail_ = task2process;
} else {
task2process->set_prev(gc_queue_tail_);
gc_queue_tail_->set_next(task2process);
gc_queue_tail_ = task2process;
}
gc_queue_sync_.unlock();
task2process->set_process_done();
}
thread_meta.on_gc_start();
bool gc_done = gc_();
thread_meta.on_gc_end();
if ((NULL != gc_queue_head_ && gc_done) || NULL != task2process) {
// need not wait
} else {
ObThreadCondGuard guard(task_queue_sync_);
if (0 == task_queue_.get_total()) {
if (OB_SUCCESS != (tmp_ret = task_queue_sync_.wait(QUEUE_WAIT_TIME_MS))) {
if (OB_TIMEOUT != tmp_ret) {
COMMON_LOG(WARN, "Fail to wait task queue sync, ", K(tmp_ret));
}
}
}
}
} else {
ObThreadCondGuard guard(work_thread_sync_);
if (thread_pos >= work_thread_num_) {
if (OB_SUCCESS != (tmp_ret = work_thread_sync_.wait(MAX_QUEUE_WAIT_TIME_MS))) {
if (OB_TIMEOUT != tmp_ret) {
COMMON_LOG(WARN, "Fail to wait work thread sync, ", K(tmp_ret));
}
}
}
}
}
}
} // namespace common
} // namespace oceanbase


@@ -0,0 +1,360 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_COMMON_DEDUP_QUEUE_
#define OCEANBASE_COMMON_DEDUP_QUEUE_
#include "lib/allocator/ob_concurrent_fifo_allocator.h"
#include "lib/allocator/ob_malloc.h"
#include "lib/hash/ob_hashmap.h"
#include "lib/lock/ob_mutex.h"
#include "lib/lock/ob_spin_lock.h"
#include "lib/lock/ob_thread_cond.h"
#include "lib/ob_define.h"
#include "lib/queue/ob_fixed_queue.h"
#include "lib/utility/utility.h"
#include "lib/thread/thread_pool.h"
namespace oceanbase {
namespace common {
enum ObDedupTaskType {
T_BLOOMFILTER = 0,
T_SCHEMA = 1,
T_BF_WARMUP = 2,
T_PT_MAINTENANCE = 3, // partition table maintenance
T_PL_UPDATE = 4, // partition location update
T_PT_CHECK = 5,
T_PT_MERGE = 6,
T_PL_FETCH = 7, // obproxy partition location fetch
T_PT_MIGRATE = 8,
T_PT_LOCAL_INDEX_BUILD = 9,
T_CONN_ID_FETCH = 10, // obproxy conn id fetch
T_VIP_TENANT_FETCH = 11, // obproxy vip--->tenant fetch
T_CLUSTER_RESOURCE_INIT = 12, // obproxy cluster resource init
T_SS_FETCH = 13, // obproxy server state fetch
T_RS_ET_UPDATE = 14, // rootservice event history table update
T_SYS_VAR_FETCH = 15, // obproxy renew system variable
T_PT_FREEZE = 16,
T_ELECTION_ET_UPDATE = 17,
T_WARM_UP_TASK = 18,
T_MAIN_ST_MERGE = 19,
T_INDEX_ST_MERGE = 20,
T_MAIN_MB_MERGE = 21,
T_INDEX_MB_MERGE = 22,
T_REFRESH_LOCALITY = 23,
T_TRANSFER_COPY_SSTORE = 24,
T_DANGLING_REPLICA_CHECK = 25,
T_PL_LEADER_UPDATE = 26, // partition location leader update
T_REFRESH_OPT_STAT = 27,
T_SCHEMA_RELEASE = 28,
T_BLOOMFILTER_LOAD = 29,
T_SCHEMA_ASYNC_REFRESH = 30,
T_CHECK_PG_RECOVERY_FINISHED = 31,
T_UPDATE_FILE_RECOVERY_STATUS = 32,
T_UPDATE_FILE_RECOVERY_STATUS_V2 = 33,
};
class ObDedupQueue;
class IObDedupTask {
friend class ObDedupQueue;
public:
explicit IObDedupTask(const int type) : type_(type), stat_(ST_WAITING), memory_(NULL), prev_(NULL), next_(NULL)
{}
virtual ~IObDedupTask()
{}
public:
virtual int64_t hash() const = 0;
virtual bool operator==(const IObDedupTask& other) const = 0;
virtual int64_t get_deep_copy_size() const = 0;
virtual IObDedupTask* deep_copy(char* buffer, const int64_t buf_size) const = 0;
virtual int64_t get_abs_expired_time() const = 0;
virtual int process() = 0;
inline int get_type() const
{
return type_;
}
private:
static const int ST_WAITING = 0;
static const int ST_DONE = 1;
private:
void set_prev(IObDedupTask* prev)
{
prev_ = prev;
}
void set_next(IObDedupTask* next)
{
next_ = next;
}
IObDedupTask* get_prev() const
{
return prev_;
}
IObDedupTask* get_next() const
{
return next_;
}
bool is_process_done() const
{
return ST_DONE == stat_;
}
void set_process_done()
{
stat_ = ST_DONE;
}
void lock()
{
sync_.lock();
}
int trylock()
{
return sync_.trylock();
}
void unlock()
{
sync_.unlock();
}
char* get_memory_ptr() const
{
return memory_;
}
void set_memory_ptr(char* memory)
{
memory_ = memory;
}
private:
const int type_;
int stat_;
ObSpinLock sync_;
char* memory_;
IObDedupTask* prev_;
IObDedupTask* next_;
private:
DISALLOW_COPY_AND_ASSIGN(IObDedupTask);
};
template <class T, class Host>
class AllocatorWrapper {
public:
AllocatorWrapper() : allocator_(NULL)
{}
explicit AllocatorWrapper(Host& allocator) : allocator_(&allocator)
{}
~AllocatorWrapper()
{}
public:
T* alloc()
{
T* ret = nullptr;
void* ptr = nullptr;
if (OB_NOT_NULL(allocator_) && OB_NOT_NULL(ptr = allocator_->alloc(sizeof(T)))) {
ret = new (ptr) T();
}
return ret;
}
void free(T* ptr)
{
if (NULL != ptr && NULL != allocator_) {
ptr->~T();
allocator_->free(ptr);
ptr = NULL;
}
}
void set_attr(const ObMemAttr& attr)
{
UNUSED(attr);
}
void inc_ref()
{}
void dec_ref()
{}
void clear()
{}
private:
Host* allocator_;
private:
DISALLOW_COPY_AND_ASSIGN(AllocatorWrapper);
};
class ObDedupQueue : public lib::ThreadPool {
public:
static const int64_t TOTAL_LIMIT = 1024L * 1024L * 1024L;
static const int64_t HOLD_LIMIT = 512L * 1024L * 1024L;
static const int64_t PAGE_SIZE = common::OB_MALLOC_BIG_BLOCK_SIZE;
static const int64_t TASK_MAP_SIZE = 20L * 1000;
static const int64_t TASK_QUEUE_SIZE = 20L * 1000;
public:
ObDedupQueue();
virtual ~ObDedupQueue();
public:
int init(int32_t thread_num = DEFAULT_THREAD_NUM, const char* thread_name = nullptr,
const int64_t queue_size = TASK_QUEUE_SIZE, const int64_t task_map_size = TASK_MAP_SIZE,
const int64_t total_mem_limit = TOTAL_LIMIT, const int64_t hold_mem_limit = HOLD_LIMIT,
const int64_t page_size = PAGE_SIZE);
void destroy();
public:
int add_task(const IObDedupTask& task);
int64_t task_count() const
{
return task_queue_.get_total();
}
void set_label(const lib::ObLabel& label)
{
allocator_.set_label(label);
}
int set_thread_dead_threshold(const int64_t thread_dead_threshold);
int set_work_thread_num(const int32_t work_thread_num);
public:
void run1() override;
private:
typedef ObFixedQueue<IObDedupTask> TaskQueue;
typedef AllocatorWrapper<hash::HashMapTypes<const IObDedupTask*, IObDedupTask*>::AllocType, ObConcurrentFIFOAllocator>
HashAllocator;
typedef hash::ObHashMap<const IObDedupTask*, IObDedupTask*, hash::MultiWriteDefendMode,
hash::hash_func<const IObDedupTask*>, hash::equal_to<const IObDedupTask*>, HashAllocator>
TaskMap;
typedef hash::HashMapTypes<const IObDedupTask*, IObDedupTask*>::pair_type TaskMapKVPair;
static const int32_t DEFAULT_THREAD_NUM = 4;
static const int32_t MAX_THREAD_NUM = 64;
static const int32_t QUEUE_WAIT_TIME_MS = 10; // 10ms
static const int32_t MAX_QUEUE_WAIT_TIME_MS = 100; // 100ms
static const int64_t GC_BATCH_NUM = 1024;
static const int64_t DEFALT_THREAD_DEAD_THRESHOLD = 30000000L; // 30s
static const int64_t THREAD_CHECK_INTERVAL = 10000000L; // 10s
struct MapFunction {
ObDedupQueue& host_;
const IObDedupTask& task_;
int result_code_;
MapFunction(ObDedupQueue& host, const IObDedupTask& task) : host_(host), task_(task), result_code_(OB_SUCCESS){};
void operator()(TaskMapKVPair& kvpair)
{
result_code_ = host_.map_callback_(task_, kvpair);
}
};
enum {
TH_IDEL = 0,
TH_RUN = 1,
TH_GC = 2,
};
struct ThreadMeta {
public:
int stat_;
int task_type_;
const IObDedupTask* running_task_;
int64_t busy_start_time_;
pthread_t pthread_id_;
int64_t tid_;
ThreadMeta() : stat_(TH_IDEL), task_type_(-1), running_task_(NULL), busy_start_time_(0), pthread_id_(), tid_(0)
{}
void init()
{
stat_ = TH_IDEL;
task_type_ = -1;
running_task_ = NULL;
busy_start_time_ = 0;
pthread_id_ = pthread_self();
tid_ = GETTID();
}
void on_process_start(const IObDedupTask* task)
{
busy_start_time_ = ::oceanbase::common::ObTimeUtility::current_time();
stat_ = TH_RUN;
if (NULL != task) {
task_type_ = task->get_type();
}
running_task_ = task;
};
void on_process_end()
{
stat_ = TH_IDEL;
};
void on_gc_start()
{
busy_start_time_ = ::oceanbase::common::ObTimeUtility::current_time();
stat_ = TH_GC;
};
void on_gc_end()
{
stat_ = TH_IDEL;
};
bool check_dead(const int64_t thread_dead_threshold) const
{
bool bret = false;
if (TH_IDEL != stat_ &&
(busy_start_time_ + thread_dead_threshold) < ::oceanbase::common::ObTimeUtility::current_time()) {
bret = true;
}
return bret;
};
int64_t to_string(char* buffer, const int64_t length) const
{
int64_t pos = 0;
databuff_printf(buffer,
length,
pos,
"stat=%d task_type=%d running_task=%p busy_start_time=%ld pthread_id=%ld tid=%ld",
stat_,
task_type_,
running_task_,
busy_start_time_,
pthread_id_,
tid_);
return pos;
};
};
private:
int map_callback_(const IObDedupTask& task, TaskMapKVPair& kvpair);
int add_task_(const IObDedupTask& task);
IObDedupTask* copy_task_(const IObDedupTask& task);
void destroy_task_(IObDedupTask* task);
bool gc_();
private:
bool is_inited_;
ThreadMeta thread_metas_[MAX_THREAD_NUM];
int32_t thread_num_;
int32_t work_thread_num_;
int64_t thread_dead_threshold_;
ObConcurrentFIFOAllocator allocator_;
HashAllocator hash_allocator_;
TaskMap task_map_;
TaskQueue task_queue_;
ObThreadCond task_queue_sync_;
IObDedupTask* gc_queue_head_;
IObDedupTask* gc_queue_tail_;
lib::ObMutex gc_queue_sync_;
ObThreadCond work_thread_sync_;
const char* thread_name_;
private:
DISALLOW_COPY_AND_ASSIGN(ObDedupQueue);
};
} // namespace common
} // namespace oceanbase
#endif // OCEANBASE_COMMON_DEDUP_QUEUE_
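
The block below is a hedged usage sketch for ObDedupQueue, not part of the shipped sources: MyRefreshTask and demo_dedup_queue are hypothetical names, the include path is the one used by the .cpp above, and error handling is abbreviated. It illustrates the contract a task must satisfy so that copy_task_()/destroy_task_() can deep-copy it into queue-owned memory and the task map can deduplicate it.

// Usage sketch only (hypothetical task and caller).
#include <new>
#include "lib/queue/ob_dedup_queue.h"

using namespace oceanbase::common;

// Hypothetical task, deduplicated by partition_id_.
class MyRefreshTask : public IObDedupTask {
public:
  explicit MyRefreshTask(const int64_t partition_id)
      : IObDedupTask(T_SCHEMA), partition_id_(partition_id)
  {}
  virtual ~MyRefreshTask()
  {}
  virtual int64_t hash() const
  {
    return partition_id_;
  }
  virtual bool operator==(const IObDedupTask& other) const
  {
    return get_type() == other.get_type() &&
           partition_id_ == static_cast<const MyRefreshTask&>(other).partition_id_;
  }
  virtual int64_t get_deep_copy_size() const
  {
    return sizeof(*this);
  }
  virtual IObDedupTask* deep_copy(char* buffer, const int64_t buf_size) const
  {
    MyRefreshTask* task = NULL;
    if (NULL != buffer && buf_size >= get_deep_copy_size()) {
      task = new (buffer) MyRefreshTask(partition_id_);
    }
    return task;
  }
  virtual int64_t get_abs_expired_time() const
  {
    return 0;  // expire immediately: duplicates are only suppressed while queued or running
  }
  virtual int process()
  {
    // real work goes here
    return OB_SUCCESS;
  }
private:
  int64_t partition_id_;
};

int demo_dedup_queue()
{
  int ret = OB_SUCCESS;
  ObDedupQueue queue;
  if (OB_FAIL(queue.init(4 /*thread_num*/, "DemoDedupQueue"))) {
    // handle init failure
  } else {
    MyRefreshTask task(1001);
    // add_task() deep-copies the task; a concurrent duplicate (same hash()/operator==)
    // may make it return OB_EAGAIN, in which case the caller can simply retry later.
    ret = queue.add_task(task);
    queue.destroy();  // stops the worker threads and releases queued task copies
  }
  return ret;
}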


@@ -0,0 +1,845 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_LIB_QUEUE_OB_EXT_MS_QUEUE_H__
#define OCEANBASE_LIB_QUEUE_OB_EXT_MS_QUEUE_H__
#include "lib/queue/ob_link.h"
#include "lib/queue/ob_ms_queue.h" // ObMsQueue
#include "lib/queue/ob_link_queue.h" // ObLinkQueue
#include "lib/objectpool/ob_small_obj_pool.h" // ObSmallObjPool
#include "lib/hash/ob_linear_hash_map.h" // ObLinearHashMap
#include "common/ob_queue_thread.h" // ObCond
#define MSQ_STAT(level, str, ...) LIB_LOG(level, "[STAT] [MsQueue] " str, ##__VA_ARGS__)
namespace oceanbase {
namespace common {
template <class KeyType>
class ObExtMsQueue {
public:
static const int64_t DEFAULT_MS_QUEUE_ITEM_COUNT = 1024;
public:
typedef ObLink Task;
enum QueueFlag {
IDLE = 0, // There is no output data in the queue
READY = 1, // The queue has data to output
HANDLING = 2, // The data in the queue is being processed by threads
TERMINATED = 3, // Terminate state, no longer accept data
};
// Task context
struct MsQueueItem;
struct TaskCtx : public ObLink {
int64_t queue_index_; // Internal queue number
MsQueueItem* ms_queue_item_; // MsQueue object
TaskCtx()
{
reset();
}
~TaskCtx()
{
reset();
}
bool is_valid() const
{
return NULL != ms_queue_item_ && queue_index_ >= 0;
}
void reset()
{
queue_index_ = 0;
ms_queue_item_ = NULL;
ObLink::reset();
}
TO_STRING_KV(K_(queue_index), KP_(ms_queue_item));
};
struct MsQueueItem {
static const int64_t PAGE_SIZE = OB_MALLOC_NORMAL_BLOCK_SIZE;
bool inited_;
KeyType key_;
int8_t* flags_; // Internal queue status array
int64_t terminated_qcount_; // The number of queues in a terminating state
int64_t next_to_terminate_queue_index_; // Index of the next sub-queue to terminate, i.e. the next
// sub-queue into which the terminate task will be pushed
TaskCtx* task_ctx_array_; // Task context array, a queue corresponds to a task context
ObMsQueue queue_; // MsQueue
ObArenaAllocator allocator_; // allocator
ObCond cond_;
MsQueueItem()
: inited_(false),
key_(),
flags_(NULL),
terminated_qcount_(),
next_to_terminate_queue_index_(0),
task_ctx_array_(NULL),
queue_(),
allocator_(ObModIds::OB_EXT_MS_QUEUE_QITEM, PAGE_SIZE),
cond_()
{}
~MsQueueItem()
{
reset();
}
int init(const KeyType& key, const int64_t queue_count, const int64_t queue_len);
void reset();
int push(Task* task, const int64_t seq, const uint64_t hash, const int64_t timeout);
int end_batch(const int64_t seq, const int64_t count);
int get(Task*& task, const int64_t queue_index);
TO_STRING_KV(K_(inited), K_(key), K_(flags), K_(terminated_qcount), K_(next_to_terminate_queue_index),
KP_(task_ctx_array), K_(queue));
};
typedef ObLinearHashMap<KeyType, MsQueueItem*> MsQueueMap;
typedef ObSmallObjPool<MsQueueItem> MsQueuePool;
public:
ObExtMsQueue();
virtual ~ObExtMsQueue();
public:
/// Initialization function
///
/// @param max_cached_ms_queue_item_count Maximum number of cached MsQueue
/// @param queue_count_of_ms_queue The number of queues inside each MsQueue
/// @param queue_len_of_ms_queue The length of the queue inside each MsQueue
int init(const int64_t max_cached_ms_queue_item_count, const int64_t queue_count_of_ms_queue,
const int64_t queue_len_of_ms_queue, const lib::ObLabel& label = ObModIds::OB_EXT_MS_QUEUE);
void destroy();
public:
/// Add <key, MsQueue> key-value pair
/// @note Operations on the same key are assumed to be single-threaded
///
/// @param key target key value
///
/// @retval OB_SUCCESS success
/// @retval OB_ENTRY_EXIST already exists
/// @retval Other error codes failed
int add_ms_queue(const KeyType& key);
/// Terminate the MsQueue corresponding to the key to ensure that no more data will be pushed afterwards
/// @note The queue of a given key must be terminated by a single thread
///
/// @param key target key value
/// @param end_seq end sequence number
/// @param timeout timeout
///
/// @retval OB_SUCCESS success
/// @retval OB_TIMEOUT timeout
/// @retval Other error codes failed
int terminate_ms_queue(const KeyType& key, const int64_t end_seq, const int64_t timeout);
/// Push a task with sequence number seq in the MsQueue of a specific key
///
/// @note Pushes may be multi-threaded, but a given seq of a key can only be processed by one thread
///
/// @param key target key value
/// @param task target task
/// @param seq task number
/// @param hash hash value
/// @param timeout timeout
///
/// @retval OB_SUCCESS success
/// @retval OB_TIMEOUT timeout
/// @retval Other error codes failed
int push(const KeyType& key, Task* task, const int64_t seq, const uint64_t hash, const int64_t timeout);
/// Notify the MsQueue corresponding to the key that all tasks with the sequence number seq have been pushed.
///
/// @param key target key value
/// @param seq task number
/// @param count number of tasks
///
/// @retval OB_SUCCESS success
/// @retval Other error codes failed
int end_batch(const KeyType& key, const int64_t seq, const int64_t count);
/// Get a task
///
/// @param [out] task returned task
/// @param [in/out] ctx task context
/// @param [in] timeout timeout
///
/// @retval OB_SUCCESS success
/// @retval OB_TIMEOUT timeout
/// @retval Other error codes failed
int get(Task*& task, void*& ctx, const int64_t timeout);
/// Get the number of sub-queues of MsQueue
int64_t get_queue_count_of_ms_queue() const
{
return queue_count_of_ms_queue_;
}
/// Get the number of MsQueue
int64_t get_ms_queue_count() const
{
return ms_queue_item_count_;
}
private:
int get_task_ctx_(TaskCtx*& ctx, const int64_t timeout);
int push_msg_task_(TaskCtx* ctx);
int get_task_(Task*& task, TaskCtx* ctx);
int handle_end_task_(TaskCtx* ctx);
private:
bool inited_;
Task end_task_; // Terminate task
ObLinkQueue task_ctx_queue_; // Task context queue
ObCond task_ctx_queue_cond_; // Condition variables corresponding to the task context queue
int64_t queue_len_of_ms_queue_; // Queue length in MsQueue
int64_t queue_count_of_ms_queue_; // Number of queues in MsQueue
int64_t ms_queue_item_count_; // Number of MsQueueItem
MsQueuePool ms_queue_pool_; // Queue resource pool
MsQueueMap ms_queue_map_; // Queue Map structure
private:
DISALLOW_COPY_AND_ASSIGN(ObExtMsQueue);
};
template <class KeyType>
int ObExtMsQueue<KeyType>::MsQueueItem::init(const KeyType& key, const int64_t queue_count, const int64_t queue_len)
{
int ret = OB_SUCCESS;
if (inited_) {
ret = OB_INIT_TWICE;
} else if (!key.is_valid() || queue_count <= 0 || queue_len <= 0) {
ret = OB_INVALID_ARGUMENT;
LIB_LOG(ERROR, "invalid args", K(ret), "key_valid", key.is_valid(), K(queue_count), K(queue_len));
} else if (OB_FAIL(queue_.init(queue_count, queue_len, &allocator_))) {
LIB_LOG(ERROR, "init ms queue fail", K(ret), K(queue_count), K(queue_len));
} else {
flags_ = static_cast<int8_t*>(allocator_.alloc(queue_count * sizeof(int8_t)));
if (NULL == flags_) {
LIB_LOG(ERROR, "allocate memory for flags array fail", K(queue_count));
ret = OB_ALLOCATE_MEMORY_FAILED;
} else {
// Initialize Flags to IDLE
for (int64_t index = 0; index < queue_count; index++) {
flags_[index] = IDLE;
}
}
}
if (OB_SUCC(ret)) {
int64_t tctx_size = queue_count * sizeof(TaskCtx);
task_ctx_array_ = static_cast<TaskCtx*>(allocator_.alloc(tctx_size));
if (NULL == task_ctx_array_) {
LIB_LOG(ERROR, "allocate memory for TaskCtx array fail", "size", tctx_size);
ret = OB_ALLOCATE_MEMORY_FAILED;
} else {
for (int64_t index = 0; index < queue_count; index++) {
new (task_ctx_array_ + index) TaskCtx();
task_ctx_array_[index].queue_index_ = index;
task_ctx_array_[index].ms_queue_item_ = this;
}
}
}
if (OB_SUCC(ret)) {
terminated_qcount_ = 0;
next_to_terminate_queue_index_ = 0;
key_ = key;
inited_ = true;
}
return ret;
}
template <class KeyType>
void ObExtMsQueue<KeyType>::MsQueueItem::reset()
{
int64_t queue_count = queue_.get_queue_num();
inited_ = false;
queue_.destroy();
if (NULL != flags_) {
allocator_.free(reinterpret_cast<char*>(flags_));
flags_ = NULL;
}
if (NULL != task_ctx_array_) {
for (int64_t index = 0; index < queue_count; index++) {
task_ctx_array_[index].~TaskCtx();
}
allocator_.free(reinterpret_cast<char*>(task_ctx_array_));
task_ctx_array_ = NULL;
}
key_.reset();
terminated_qcount_ = 0;
next_to_terminate_queue_index_ = 0;
allocator_.reset();
}
template <class KeyType>
int ObExtMsQueue<KeyType>::MsQueueItem::push(Task* task, const int64_t seq, const uint64_t hash, const int64_t timeout)
{
int ret = OB_SUCCESS;
if (!inited_) {
ret = OB_NOT_INIT;
} else {
int64_t end_time = ObTimeUtility::current_time() + timeout;
while (true) {
ret = queue_.push(task, seq, hash);
if (OB_EAGAIN != ret) {
break;
}
int64_t left_time = end_time - ObTimeUtility::current_time();
if (left_time <= 0) {
ret = OB_TIMEOUT;
break;
} else {
cond_.timedwait(left_time);
}
}
}
return ret;
}
template <class KeyType>
int ObExtMsQueue<KeyType>::MsQueueItem::end_batch(const int64_t seq, const int64_t count)
{
int ret = OB_SUCCESS;
if (!inited_) {
ret = OB_NOT_INIT;
} else {
ret = queue_.end_batch(seq, count);
}
return ret;
}
template <class KeyType>
int ObExtMsQueue<KeyType>::MsQueueItem::get(Task*& task, const int64_t queue_index)
{
int ret = OB_SUCCESS;
if (!inited_) {
ret = OB_NOT_INIT;
} else {
ret = queue_.get(task, queue_index);
if (OB_SUCC(ret)) {
cond_.signal();
}
}
return ret;
}
/////////////////////////////////////////////////////////////////////////////////////
template <class KeyType>
ObExtMsQueue<KeyType>::ObExtMsQueue()
: inited_(false),
end_task_(),
task_ctx_queue_(),
task_ctx_queue_cond_(),
queue_len_of_ms_queue_(0),
queue_count_of_ms_queue_(0),
ms_queue_item_count_(0),
ms_queue_pool_(),
ms_queue_map_()
{}
template <class KeyType>
ObExtMsQueue<KeyType>::~ObExtMsQueue()
{
destroy();
}
template <class KeyType>
int ObExtMsQueue<KeyType>::init(const int64_t max_cached_ms_queue_item_count, const int64_t queue_count_of_ms_queue,
const int64_t queue_len_of_ms_queue, const lib::ObLabel& label)
{
int ret = OB_SUCCESS;
if (inited_) {
ret = OB_INIT_TWICE;
} else if (queue_count_of_ms_queue <= 0 || queue_len_of_ms_queue <= 0 || max_cached_ms_queue_item_count <= 0) {
ret = OB_INVALID_ARGUMENT;
LIB_LOG(ERROR,
"invalid args",
K(ret),
K(queue_count_of_ms_queue),
K(queue_len_of_ms_queue),
K(max_cached_ms_queue_item_count));
} else if (OB_FAIL(ms_queue_pool_.init(max_cached_ms_queue_item_count, label))) {
LIB_LOG(ERROR, "init ms_queue pool fail", K(ret), K(max_cached_ms_queue_item_count));
} else if (OB_FAIL(ms_queue_map_.init(label))) {
LIB_LOG(ERROR, "init ms_queue map fail", K(ret));
} else {
queue_len_of_ms_queue_ = queue_len_of_ms_queue;
queue_count_of_ms_queue_ = queue_count_of_ms_queue;
ms_queue_item_count_ = 0;
inited_ = true;
}
return ret;
}
template <class KeyType>
void ObExtMsQueue<KeyType>::destroy()
{
inited_ = false;
queue_count_of_ms_queue_ = 0;
ms_queue_item_count_ = 0;
queue_len_of_ms_queue_ = 0;
(void)ms_queue_map_.destroy();
ms_queue_pool_.destroy();
}
template <class KeyType>
int ObExtMsQueue<KeyType>::add_ms_queue(const KeyType& key)
{
int ret = OB_SUCCESS;
MsQueueItem* queue = NULL;
if (!inited_) {
ret = OB_NOT_INIT;
} else if (!key.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LIB_LOG(ERROR, "invalid args", K(ret), "key_valid", key.is_valid());
} else if (OB_SUCCESS == (ret = ms_queue_map_.get(key, queue))) {
ret = OB_ENTRY_EXIST;
} else if (OB_ENTRY_NOT_EXIST != ret) {
LIB_LOG(ERROR, "get from ms_queue_map fail", K(ret), K(key));
} else { // OB_ENTRY_NOT_EXIST == ret
queue = NULL;
if (OB_FAIL(ms_queue_pool_.alloc(queue))) {
LIB_LOG(ERROR, "alloc MsQueueItem fail");
} else if (OB_FAIL(queue->init(key, queue_count_of_ms_queue_, queue_len_of_ms_queue_))) {
LIB_LOG(ERROR, "init MsQueueItem fail", K(ret), K(key), K(queue_count_of_ms_queue_), K(queue_len_of_ms_queue_));
} else if (OB_FAIL(ms_queue_map_.insert(key, queue))) {
// TODO: If you want to support concurrent inserts, handle insert conflicts here
LIB_LOG(ERROR, "insert queue into ms_queue_map fail", K(ret), K(key));
} else {
int64_t ms_queue_count = ATOMIC_AAF(&ms_queue_item_count_, 1);
MSQ_STAT(INFO,
"add",
KP(queue),
"queue_count",
queue_count_of_ms_queue_,
"queue_len",
queue_len_of_ms_queue_,
K(ms_queue_count),
K(key));
}
if (OB_SUCCESS != ret && NULL != queue) {
queue->reset();
ms_queue_pool_.free(queue);
queue = NULL;
}
}
return ret;
}
template <class KeyType>
int ObExtMsQueue<KeyType>::terminate_ms_queue(const KeyType& key, const int64_t end_seq, const int64_t timeout)
{
int ret = OB_SUCCESS;
MsQueueItem* queue = NULL;
if (!inited_) {
ret = OB_NOT_INIT;
} else if (!key.is_valid() || end_seq < 0) {
ret = OB_INVALID_ARGUMENT;
LIB_LOG(ERROR, "invalid args", K(ret), "key_valid", key.is_valid(), K(end_seq));
} else if (OB_FAIL(ms_queue_map_.get(key, queue))) {
if (OB_ENTRY_NOT_EXIST == ret) {
LIB_LOG(ERROR, "entry does not exist", K(key), K(ret));
} else {
LIB_LOG(ERROR, "get MsQueue fail", K(ret), K(key));
}
} else if (NULL == queue) {
ret = OB_ERR_UNEXPECTED;
LIB_LOG(INFO, "invalid queue", K(queue));
} else if (queue->next_to_terminate_queue_index_ >= queue_count_of_ms_queue_) {
LIB_LOG(INFO, "MsQueue has been terminated", K(key), K(queue));
} else {
// Terminate each sub-Queue in turn
for (int64_t queue_index = queue->next_to_terminate_queue_index_;
OB_SUCCESS == ret && queue_index < queue_count_of_ms_queue_;
queue_index++) {
MSQ_STAT(INFO, "terminate_push", KP(queue), K(queue_index), K(end_seq), K(key));
// NOTE: It is required that each sub-Queue can only push end_task once, because there is only one end_task
// globally, and repeated push will modify the next pointer of end_task
if (OB_FAIL(queue->push(&end_task_, end_seq, queue_index, timeout))) {
if (OB_TIMEOUT == ret) {
MSQ_STAT(INFO, "terminate_push_timeout", KP(queue), K(queue_index), K(end_seq), K(key));
} else {
LIB_LOG(ERROR, "push end_task fail", K(ret), K(key), K(queue_index), K(end_seq), KP(queue));
}
} else {
ATOMIC_INC(&(queue->next_to_terminate_queue_index_));
}
}
if (OB_SUCC(ret) && (queue->next_to_terminate_queue_index_ >= queue_count_of_ms_queue_)) {
MSQ_STAT(INFO, "terminate_end_batch", KP(queue), K(end_seq), K(key));
if (OB_FAIL(end_batch(key, end_seq, queue_count_of_ms_queue_))) {
LIB_LOG(ERROR, "end_batch end_seq fail", K(ret), K(queue), "queue", *queue, K(end_seq), K(key));
}
}
}
return ret;
}
template <class KeyType>
int ObExtMsQueue<KeyType>::push(
const KeyType& key, Task* task, const int64_t seq, const uint64_t hash, const int64_t timeout)
{
int ret = OB_SUCCESS;
MsQueueItem* queue = NULL;
if (!inited_) {
ret = OB_NOT_INIT;
} else if (!key.is_valid() || NULL == task || seq < 0) {
ret = OB_INVALID_ARGUMENT;
LIB_LOG(ERROR, "invalid args", K(ret), "key_valid", key.is_valid(), K(task), K(seq));
} else if (OB_FAIL(ms_queue_map_.get(key, queue))) {
if (OB_ENTRY_NOT_EXIST == ret) {
LIB_LOG(ERROR, "entry does not exist", K(key), K(ret));
} else {
LIB_LOG(ERROR, "get MsQueue fail", K(ret), K(key));
}
} else if (NULL == queue) {
ret = OB_ERR_UNEXPECTED;
LIB_LOG(INFO, "invalid queue", K(queue));
} else if (OB_FAIL(queue->push(task, seq, hash, timeout))) {
if (OB_TIMEOUT != ret) {
LIB_LOG(ERROR, "push task into MsQueue fail", K(ret), K(key), KP(queue), K(task), K(seq), K(hash));
}
} else {
MSQ_STAT(DEBUG, "push_task", KP(queue), K(seq), K(hash), K(task), K(key));
}
return ret;
}
template <class KeyType>
int ObExtMsQueue<KeyType>::end_batch(const KeyType& key, const int64_t seq, const int64_t count)
{
int ret = OB_SUCCESS;
MsQueueItem* queue = NULL;
if (!inited_) {
ret = OB_NOT_INIT;
} else if (!key.is_valid() || seq < 0) {
ret = OB_INVALID_ARGUMENT;
LIB_LOG(ERROR, "invalid args", K(ret), "key_valid", key.is_valid(), K(seq), K(count));
} else if (OB_FAIL(ms_queue_map_.get(key, queue))) {
if (OB_ENTRY_NOT_EXIST == ret) {
LIB_LOG(ERROR, "entry does not exist", K(key), K(ret));
} else {
LIB_LOG(ERROR, "get MsQueue fail", K(ret), K(key));
}
} else if (NULL == queue) {
ret = OB_ERR_UNEXPECTED;
LIB_LOG(INFO, "invalid queue", K(queue));
} else if (OB_FAIL(queue->end_batch(seq, count))) {
LIB_LOG(ERROR, "end_batch fail", K(ret), K(key), KP(queue), K(seq), K(count));
} else {
MSQ_STAT(DEBUG, "end_batch", KP(queue), K(seq), K(count), K(key));
// Check the status of each Queue and try to generate a message task for each Queue
for (int64_t queue_index = 0; OB_SUCC(ret) && queue_index < queue_count_of_ms_queue_; queue_index++) {
if (NULL == queue->flags_ || NULL == queue->task_ctx_array_) {
ret = OB_ERR_UNEXPECTED;
} else {
int8_t* flag_ptr = queue->flags_ + queue_index;
TaskCtx* task_ctx_ptr = queue->task_ctx_array_ + queue_index;
// When there is data in the Queue that can be output, forcibly set the status of the Queue to READY, and then
// check the original status value:
// 1. If the original Flag is in the IDLE state, a message task is generated to notify the worker thread to
// consume the data of the Queue
// 2. If the original Flag is in another state, it means that a thread is processing the data of the Queue, so
// no processing is done
if (queue->queue_.next_is_ready(queue_index)) {
int8_t old_flag = ATOMIC_SET(flag_ptr, READY);
if (IDLE == old_flag) {
MSQ_STAT(DEBUG, "push_msg_task", KP(queue), K(queue_index), K(seq), K(old_flag), K(key));
ret = push_msg_task_(task_ctx_ptr);
}
}
}
}
}
return ret;
}
template <class KeyType>
int ObExtMsQueue<KeyType>::push_msg_task_(TaskCtx* ctx)
{
int ret = OB_SUCCESS;
if (!(inited_ && NULL != ctx && ctx->is_valid())) {
ret = OB_ERR_UNEXPECTED;
} else if (OB_FAIL(task_ctx_queue_.push(ctx))) {
LIB_LOG(ERROR, "push task ctx into msg queue fail", K(ret), "task_ctx", *ctx);
} else {
// succ
task_ctx_queue_cond_.signal();
}
return ret;
}
template <class KeyType>
int ObExtMsQueue<KeyType>::get(Task*& task, void*& ctx, const int64_t timeout)
{
int ret = OB_SUCCESS;
TaskCtx* task_ctx = static_cast<TaskCtx*>(ctx);
if (!inited_) {
ret = OB_NOT_INIT;
} else if (NULL != task_ctx && !task_ctx->is_valid()) {
ret = OB_INVALID_ARGUMENT;
LIB_LOG(ERROR, "invalid argument", K(task_ctx), "task_ctx", *task_ctx);
} else {
int64_t end_time = ObTimeUtility::current_time() + timeout;
int64_t left_time = timeout;
while (true) {
// If there is no task context, get a context from the queue
if (NULL == task_ctx && OB_FAIL(get_task_ctx_(task_ctx, left_time))) {
if (OB_TIMEOUT != ret) {
LIB_LOG(ERROR, "get_task_ctx_ fail", K(ret));
}
break;
// Take tasks from the queue
} else if (OB_EAGAIN != (ret = get_task_(task, task_ctx))) {
break;
} else {
task_ctx = NULL;
left_time = end_time - ObTimeUtility::current_time();
if (left_time <= 0) {
ret = OB_TIMEOUT;
break;
}
}
}
// First reset the context
ctx = NULL;
if (OB_SUCC(ret)) {
if (NULL == task || NULL == task_ctx || !task_ctx->is_valid()) {
LIB_LOG(ERROR,
"unexpected error: task or task_ctx is invalid",
K(task),
K(task_ctx),
"task_ctx",
NULL == task_ctx ? "NULL" : to_cstring(*task_ctx));
ret = OB_ERR_UNEXPECTED;
} else {
ctx = task_ctx;
}
} else { // OB_SUCCESS != ret
ctx = NULL;
}
}
return ret;
}
template <class KeyType>
int ObExtMsQueue<KeyType>::handle_end_task_(TaskCtx* ctx)
{
int ret = OB_SUCCESS;
if (!(inited_ && NULL != ctx && ctx->is_valid())) {
ret = OB_ERR_UNEXPECTED;
} else {
MSQ_STAT(DEBUG,
"handle_end_task",
"queue",
ctx->ms_queue_item_,
"queue_index",
ctx->queue_index_,
"terminated_qcount",
ctx->ms_queue_item_->terminated_qcount_,
K(ctx),
"key",
ctx->ms_queue_item_->key_);
int64_t terminated_qcount = ATOMIC_AAF(&(ctx->ms_queue_item_->terminated_qcount_), 1);
// If it is the last Queue to be terminated, the Queue will be recycled
if (terminated_qcount >= queue_count_of_ms_queue_) {
KeyType key = ctx->ms_queue_item_->key_;
MsQueueItem* queue = ctx->ms_queue_item_;
int64_t ms_queue_count = ATOMIC_AAF(&ms_queue_item_count_, -1);
MSQ_STAT(INFO, "recycle_ms_queue", KP(queue), K(terminated_qcount), K(ms_queue_count), K(key));
ctx = NULL;
(void)ms_queue_map_.erase(key);
queue->reset();
ms_queue_pool_.free(queue);
queue = NULL;
}
}
return ret;
}
template <class KeyType>
int ObExtMsQueue<KeyType>::get_task_ctx_(TaskCtx*& ctx, const int64_t timeout)
{
int ret = OB_SUCCESS;
int64_t end_time = ObTimeUtility::current_time() + timeout;
if (!inited_) {
ret = OB_ERR_UNEXPECTED;
} else {
while (true) {
ret = task_ctx_queue_.pop((ObLink*&)ctx);
if (OB_EAGAIN != ret) {
break;
}
int64_t left_time = end_time - ObTimeUtility::current_time();
if (left_time <= 0) {
ret = OB_TIMEOUT;
break;
}
task_ctx_queue_cond_.timedwait(left_time);
}
if (OB_SUCC(ret) && ctx->is_valid()) {
MSQ_STAT(DEBUG,
"consume_queue_begin",
"queue",
ctx->ms_queue_item_,
"queue_index",
ctx->queue_index_,
"key",
ctx->ms_queue_item_->key_);
// After obtaining the task context, first set the corresponding queue status to HANDLING
int8_t* flag_ptr = ctx->ms_queue_item_->flags_ + ctx->queue_index_;
int8_t old_flag = ATOMIC_SET(flag_ptr, HANDLING);
MSQ_STAT(DEBUG,
"get_msg_task",
"queue",
ctx->ms_queue_item_,
"queue_index",
ctx->queue_index_,
K(old_flag),
"key",
ctx->ms_queue_item_->key_);
}
}
return ret;
}
template <class KeyType>
int ObExtMsQueue<KeyType>::get_task_(Task*& task, TaskCtx* ctx)
{
int ret = OB_SUCCESS;
if (!(inited_ && NULL != ctx && ctx->is_valid())) {
ret = OB_ERR_UNEXPECTED;
} else {
ret = ctx->ms_queue_item_->get(task, ctx->queue_index_);
if (OB_SUCC(ret)) {
LIB_LOG(DEBUG,
"get_task",
"queue",
ctx->ms_queue_item_,
"queue_index",
ctx->queue_index_,
K(task),
"key",
ctx->ms_queue_item_->key_);
}
if (OB_SUCCESS == ret && &end_task_ == task) {
// If a termination task is encountered, terminate the Queue
if (OB_FAIL(handle_end_task_(ctx))) {
LIB_LOG(ERROR, "handle_end_task_ fail", K(ret), K(ctx));
} else {
task = NULL;
ctx = NULL;
// Always return to retry
ret = OB_EAGAIN;
}
} else if (OB_EAGAIN == ret) {
MSQ_STAT(DEBUG,
"consume_queue_end",
"queue",
ctx->ms_queue_item_,
"queue_index",
ctx->queue_index_,
"key",
ctx->ms_queue_item_->key_);
// After the data in the queue has been drained, check the queue's Flag status:
// 1) If it is READY, another thread believes the Queue still has data that can be processed, so directly
// generate a message task for the Queue so that a subsequent thread handles it.
// 2) If it is still HANDLING, reset the Flag to IDLE.
int8_t* flag_ptr = ctx->ms_queue_item_->flags_ + ctx->queue_index_;
int8_t old_flag = ATOMIC_CAS(flag_ptr, HANDLING, IDLE);
if (READY == old_flag) {
MSQ_STAT(DEBUG,
"push_msg_task",
"queue",
ctx->ms_queue_item_,
"queue_index",
ctx->queue_index_,
K(old_flag),
"key",
ctx->ms_queue_item_->key_);
if (OB_FAIL(push_msg_task_(ctx))) {
LIB_LOG(ERROR, "push_msg_task_ fail", K(ret), K(ctx));
} else {
// Always return to retry
ret = OB_EAGAIN;
}
}
} else if (OB_FAIL(ret)) {
LIB_LOG(ERROR, "get task from MsQueue fail", K(ret));
}
}
return ret;
}
} // namespace common
} // namespace oceanbase
#endif /* OCEANBASE_LIB_QUEUE_OB_EXT_MS_QUEUE_H__ */
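
The block below is a hedged usage sketch for ObExtMsQueue, not part of the shipped sources: the include path is inferred from the header guard, the KeyType requirements beyond is_valid()/reset() (hashability for ObLinearHashMap) are assumptions, and the seq/timeout values are illustrative only.

// Usage sketch only (hypothetical caller).
#include "lib/queue/ob_ext_ms_queue.h"  // assumed path

using namespace oceanbase::common;

// KeyType must provide is_valid()/reset() (used by the template above) and satisfy
// ObLinearHashMap's key requirements; both are assumed here.
template <class KeyType>
int demo_ext_ms_queue(const KeyType& key, ObLink* task)
{
  int ret = OB_SUCCESS;
  const int64_t timeout = 1L * 1000 * 1000;  // microseconds, matching the ObTimeUtility-based waits above
  ObExtMsQueue<KeyType> ext_queue;
  if (OB_FAIL(ext_queue.init(16 /*max cached MsQueue items*/, 4 /*queues per MsQueue*/, 1024 /*queue length*/))) {
    // handle init failure
  } else if (OB_FAIL(ext_queue.add_ms_queue(key))) {
    // OB_ENTRY_EXIST means the key already has a queue
  } else if (OB_FAIL(ext_queue.push(key, task, 0 /*seq*/, 0 /*hash*/, timeout))) {
    // OB_TIMEOUT if the target sub-queue stayed full
  } else if (OB_FAIL(ext_queue.end_batch(key, 0 /*seq*/, 1 /*task count for this seq*/))) {
    // end_batch() tells the queue that all tasks of this seq have been pushed
  } else {
    // Consumer side: ctx identifies the sub-queue being drained and is handed back in
    // on the next call until that sub-queue is exhausted.
    ObLink* got = NULL;
    void* ctx = NULL;
    if (OB_FAIL(ext_queue.get(got, ctx, timeout))) {
      // OB_TIMEOUT if nothing became ready in time
    } else {
      // handle `got`, then keep calling get(got, ctx, timeout)
    }
    // Finally terminate the key's queue with a fresh sequence number (illustrative value);
    // in real usage consumers keep calling get() until the key's queue is recycled.
    (void)ext_queue.terminate_ms_queue(key, 1 /*end_seq*/, timeout);
    ext_queue.destroy();
  }
  return ret;
}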


@@ -0,0 +1,56 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_QUEUE_OB_PRIORITY_QUEUE_H_
#define OCEANBASE_QUEUE_OB_PRIORITY_QUEUE_H_
#include "lib/lock/ob_seq_sem.h"
#include "lib/queue/ob_link_queue.h"
#include "lib/queue/ob_futex_queue.h"
namespace oceanbase {
namespace common {
class ObFakePriorityQueue {
public:
typedef ObLink Link;
enum { Q_CAPACITY = 1 << 16, PRIO_CNT = 2 };
ObFakePriorityQueue()
{
queue_.init(Q_CAPACITY);
}
~ObFakePriorityQueue()
{}
int64_t size() const
{
return queue_.size();
}
int push(Link* data, int priority)
{
UNUSED(priority);
return queue_.push((void*)data);
}
int pop(Link*& data, int64_t timeout_us)
{
return queue_.pop((void*&)data, timeout_us);
}
private:
ObFutexQueue queue_;
};
}; // end namespace common
}; // end namespace oceanbase
#endif /* OCEANBASE_QUEUE_OB_PRIORITY_QUEUE_H_ */
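
The block below is a short usage sketch for ObFakePriorityQueue, not part of the shipped sources: the include path is inferred from the header guard and demo_fake_priority_queue is hypothetical. The priority argument is accepted but ignored, since all items go through a single ObFutexQueue.

// Usage sketch only (hypothetical caller).
#include "lib/queue/ob_priority_queue.h"  // assumed path

using namespace oceanbase::common;

int demo_fake_priority_queue()
{
  int ret = OB_SUCCESS;
  ObFakePriorityQueue queue;  // capacity is fixed at Q_CAPACITY (1 << 16)
  ObLink item;
  if (OB_SUCCESS != (ret = queue.push(&item, 1 /*priority, ignored*/))) {
    // OB_SIZE_OVERFLOW once the underlying ObFutexQueue is full
  } else {
    ObLink* out = NULL;
    ret = queue.pop(out, 1000 /*timeout_us*/);  // OB_ENTRY_NOT_EXIST on timeout
  }
  return ret;
}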


@@ -0,0 +1,227 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_COMMON_FIXED_QUEUE_
#define OCEANBASE_COMMON_FIXED_QUEUE_
#include "lib/allocator/ob_allocator.h"
#include "lib/ob_errno.h"
#include "lib/ob_define.h"
#include "lib/allocator/ob_mod_define.h"
#include "lib/atomic/ob_atomic.h"
namespace oceanbase {
namespace common {
template <typename T>
class ObFixedQueue {
public:
ObFixedQueue();
~ObFixedQueue();
public:
int init(const int64_t max_num, char* buf);
int init(const int64_t max_num, ObIAllocator* allocator = global_default_allocator,
const lib::ObLabel& label = ObModIds::OB_FIXED_QUEUE);
void destroy();
public:
int push(T* ptr);
int pop(T*& ptr);
inline int64_t get_total() const;
inline int64_t get_free() const;
inline int64_t get_curr_total() const;
bool is_inited() const
{
return is_inited_;
};
int64_t capacity() const
{
return max_num_;
}
private:
struct ArrayItem {
T* data;
};
private:
inline int64_t get_total_(const uint64_t consumer, const uint64_t producer) const;
inline int64_t get_free_(const uint64_t consumer, const uint64_t producer) const;
private:
bool is_inited_;
int64_t max_num_;
ArrayItem* array_;
ObIAllocator* allocator_;
uint64_t consumer_ CACHE_ALIGNED;
uint64_t producer_ CACHE_ALIGNED;
private:
DISALLOW_COPY_AND_ASSIGN(ObFixedQueue);
};
template <typename T>
ObFixedQueue<T>::ObFixedQueue()
: is_inited_(false), max_num_(0), array_(NULL), allocator_(NULL), consumer_(0), producer_(0)
{}
template <typename T>
ObFixedQueue<T>::~ObFixedQueue()
{
destroy();
}
template <typename T>
int ObFixedQueue<T>::init(const int64_t max_num, ObIAllocator* allocator, const lib::ObLabel& label)
{
int ret = common::OB_SUCCESS;
lib::ObMemAttr attr;
attr.label_ = label;
if (NULL == allocator || 0 >= max_num) {
ret = common::OB_INVALID_ARGUMENT;
} else if (is_inited_) {
ret = common::OB_INIT_TWICE;
} else if (NULL == (array_ = static_cast<ArrayItem*>(allocator->alloc(sizeof(ArrayItem) * max_num, attr)))) {
ret = common::OB_ALLOCATE_MEMORY_FAILED;
} else {
memset(array_, 0, sizeof(ArrayItem) * max_num);
max_num_ = max_num;
allocator_ = allocator;
consumer_ = 0;
producer_ = 0;
is_inited_ = true;
}
return ret;
}
template <typename T>
int ObFixedQueue<T>::init(const int64_t max_num, char* buf)
{
int ret = common::OB_SUCCESS;
if (NULL == buf || 0 >= max_num) {
ret = common::OB_INVALID_ARGUMENT;
} else if (is_inited_) {
ret = common::OB_INIT_TWICE;
} else {
array_ = reinterpret_cast<ArrayItem*>(buf);
memset(array_, 0, sizeof(ArrayItem) * max_num);
max_num_ = max_num;
allocator_ = NULL;
consumer_ = 0;
producer_ = 0;
is_inited_ = true;
}
return ret;
}
template <typename T>
void ObFixedQueue<T>::destroy()
{
if (is_inited_) {
if (NULL != allocator_) {
allocator_->free(array_);
array_ = NULL;
}
array_ = NULL;
max_num_ = 0;
consumer_ = 0;
producer_ = 0;
allocator_ = NULL;
is_inited_ = false;
}
}
template <typename T>
inline int64_t ObFixedQueue<T>::get_total() const
{
return get_total_(consumer_, producer_);
}
template <typename T>
inline int64_t ObFixedQueue<T>::get_curr_total() const
{
return get_total_(ATOMIC_LOAD(&consumer_), ATOMIC_LOAD(&producer_));
}
template <typename T>
inline int64_t ObFixedQueue<T>::get_free() const
{
return get_free_(consumer_, producer_);
}
template <typename T>
inline int64_t ObFixedQueue<T>::get_total_(const uint64_t consumer, const uint64_t producer) const
{
return (producer - consumer);
}
template <typename T>
inline int64_t ObFixedQueue<T>::get_free_(const uint64_t consumer, const uint64_t producer) const
{
return max_num_ - get_total_(consumer, producer);
}
template <typename T>
int ObFixedQueue<T>::push(T* ptr)
{
int ret = common::OB_SUCCESS;
if (IS_NOT_INIT) {
ret = common::OB_NOT_INIT;
} else if (NULL == ptr) {
ret = common::OB_INVALID_ARGUMENT;
} else {
uint64_t push = ATOMIC_LOAD(&producer_);
uint64_t push_limit = ATOMIC_LOAD(&consumer_) + max_num_;
uint64_t old_push = 0;
// Claim a producer slot: advance producer_ with CAS while the queue still has room;
// push_limit is refreshed from consumer_ whenever the cached value looks full.
while (((old_push = push) < push_limit || push < (push_limit = ATOMIC_LOAD(&consumer_) + max_num_)) &&
old_push != (push = ATOMIC_CAS(&producer_, old_push, old_push + 1))) {
PAUSE();
}
if (push < push_limit) {
// Slot claimed: wait for a lagging consumer to clear it, then publish the pointer.
T** pdata = &array_[push % max_num_].data;
while (NULL != ATOMIC_CAS(pdata, NULL, ptr)) {
PAUSE();
}
} else {
ret = common::OB_SIZE_OVERFLOW;
}
}
return ret;
}
template <typename T>
int ObFixedQueue<T>::pop(T*& ptr)
{
int ret = common::OB_SUCCESS;
if (IS_NOT_INIT) {
ret = common::OB_NOT_INIT;
} else {
uint64_t pop = ATOMIC_LOAD(&consumer_);
uint64_t pop_limit = ATOMIC_LOAD(&producer_);
uint64_t old_pop = 0;
// Claim a consumer slot: advance consumer_ with CAS while at least one element exists;
// pop_limit is refreshed from producer_ whenever the cached value looks empty.
while (((old_pop = pop) < pop_limit || pop < (pop_limit = ATOMIC_LOAD(&producer_))) &&
old_pop != (pop = ATOMIC_CAS(&consumer_, old_pop, old_pop + 1))) {
PAUSE();
}
if (pop < pop_limit) {
// Slot claimed: wait for the producer to finish publishing, then take the pointer.
T** pdata = &array_[(pop % max_num_)].data;
while (NULL == (ptr = static_cast<T*>(ATOMIC_SET(pdata, NULL)))) {
PAUSE();
}
} else {
ret = common::OB_ENTRY_NOT_EXIST;
}
}
return ret;
}
} // namespace common
} // namespace oceanbase
#endif // OCEANBASE_COMMON_FIXED_QUEUE_
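
The block below is a short usage sketch for ObFixedQueue, not part of the shipped sources (the include path matches the one used by ob_dedup_queue.h above); demo_fixed_queue is hypothetical and error handling is abbreviated. The queue stores raw pointers and never takes ownership of them.

// Usage sketch only (hypothetical caller).
#include "lib/queue/ob_fixed_queue.h"

using namespace oceanbase::common;

int demo_fixed_queue()
{
  int ret = OB_SUCCESS;
  ObFixedQueue<int64_t> queue;
  int64_t value = 42;
  int64_t* popped = NULL;
  if (OB_FAIL(queue.init(1024 /*max_num*/))) {  // uses global_default_allocator by default
    // handle init failure
  } else if (OB_FAIL(queue.push(&value))) {     // OB_SIZE_OVERFLOW when full
    // handle push failure
  } else if (OB_FAIL(queue.pop(popped))) {      // OB_ENTRY_NOT_EXIST when empty
    // handle pop failure
  } else {
    // popped == &value here; the caller still owns the pointed-to object
    queue.destroy();
  }
  return ret;
}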


@@ -0,0 +1,241 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_QUEUE_OB_FUTEX_QUEUE_H_
#define OCEANBASE_QUEUE_OB_FUTEX_QUEUE_H_
#include <stdint.h>
#include <stddef.h>
#include "lib/ob_define.h"
#include "lib/allocator/ob_malloc.h"
#include "lib/thread_local/ob_tsi_utils.h"
#include "lib/coro/co.h"
namespace oceanbase {
namespace common {
class ObFutexQueue {
public:
static bool is2n(uint64_t n)
{
return 0 == (n & (n - 1));
}
// Bounded fetch-and-add: atomically advance *addr by one, but never past limit; a
// return value >= limit means no slot was claimed.
static uint64_t faa_bounded(uint64_t* addr, uint64_t limit)
{
uint64_t ov = 0;
uint64_t nv = ATOMIC_LOAD(addr);
while ((ov = nv) < limit && ov != (nv = ATOMIC_VCAS(addr, ov, ov + 1))) {
PAUSE();
}
return nv;
}
struct Cond {
public:
Cond() : n_waiters_(0)
{}
~Cond()
{}
void signal()
{
(void)ATOMIC_FAA(&futex_.uval(), 1);
if (ATOMIC_LOAD(&n_waiters_) > 0) {
futex_.wake(INT32_MAX);
}
}
uint32_t get_seq()
{
return ATOMIC_LOAD(&futex_.uval());
}
void wait(uint32_t cmp, int64_t timeout)
{
if (timeout > 0) {
(void)ATOMIC_FAA(&n_waiters_, 1);
futex_.wait(cmp, timeout);
(void)ATOMIC_FAA(&n_waiters_, -1);
}
}
private:
lib::CoFutex futex_;
uint32_t n_waiters_;
};
struct Item {
Item() : cond_(), data_(NULL)
{}
~Item()
{}
void push(void* data)
{
if (NULL != data) {
while (!ATOMIC_BCAS(&data_, NULL, data)) {
PAUSE();
}
cond_.signal();
}
}
void* pop(int64_t timeout)
{
void* data = NULL;
if (NULL == (data = ATOMIC_TAS(&data_, NULL)) && timeout > 0) {
uint32_t seq = cond_.get_seq();
if (NULL == (data = ATOMIC_TAS(&data_, NULL))) {
cond_.wait(seq, timeout);
}
}
return data;
}
Cond cond_;
void* data_;
};
public:
const static uint64_t WAKE_ID = ~(0LL);
ObFutexQueue() : push_(0), pop_(0), capacity_(0), allocated_(NULL), items_(NULL)
{}
~ObFutexQueue()
{
destroy();
}
static uint64_t calc_mem_usage(uint64_t capacity)
{
return sizeof(Item) * capacity;
}
int init(const uint64_t capacity, const lib::ObLabel& label = ObModIds::OB_LIGHTY_QUEUE)
{
int err = OB_SUCCESS;
if (capacity <= 0 || !is2n(capacity)) {
err = OB_INVALID_ARGUMENT;
} else if (NULL == (allocated_ = ob_malloc(calc_mem_usage(capacity), label))) {
err = OB_ALLOCATE_MEMORY_FAILED;
} else {
err = init(capacity, allocated_);
}
if (OB_SUCCESS != err) {
destroy();
}
return err;
}
int init(uint64_t capacity, void* data)
{
int err = OB_SUCCESS;
if (capacity <= 0 || !is2n(capacity) || NULL == data) {
err = OB_INVALID_ARGUMENT;
} else {
capacity_ = capacity;
items_ = (Item*)data;
memset(data, 0, calc_mem_usage(capacity));
}
return err;
}
void destroy()
{
if (NULL != items_) {
push_ = 0;
pop_ = 0;
capacity_ = 0;
items_ = NULL;
}
if (NULL != allocated_) {
ob_free(allocated_);
allocated_ = NULL;
}
}
int push(void* data)
{
int err = OB_SUCCESS;
if (NULL == data) {
err = OB_INVALID_ARGUMENT;
} else if (NULL == items_) {
err = OB_NOT_INIT;
} else {
err = do_push(data, ATOMIC_LOAD(&pop_) + capacity_);
}
return err;
}
int pop(void*& data, int64_t timeout_us)
{
int err = OB_SUCCESS;
if (NULL == items_) {
err = OB_NOT_INIT;
} else {
err = do_pop(data, ATOMIC_FAA(&pop_, 1), timeout_us);
}
return err;
}
int64_t size() const
{
uint64_t pop = ATOMIC_LOAD(&pop_);
uint64_t push = ATOMIC_LOAD(&push_);
return (int64_t)(push - pop);
}
private:
uint64_t idx(uint64_t x)
{
return x & (capacity_ - 1);
}
int do_pop(void*& data, uint64_t pop_idx, int64_t timeout)
{
    int err = OB_SUCCESS;
Item* item = items_ + idx(pop_idx);
if (NULL == (data = item->pop(timeout))) {
while (NULL == (data = item->pop(0))) {
do_push((void*)WAKE_ID, pop_idx + 1);
}
}
if ((void*)WAKE_ID == data) {
err = OB_ENTRY_NOT_EXIST;
data = NULL;
}
return err;
}
int do_push(void* data, uint64_t limit)
{
int err = OB_SUCCESS;
uint64_t push_idx = faa_bounded(&push_, limit);
if (push_idx >= limit) {
err = OB_SIZE_OVERFLOW;
} else {
items_[idx(push_idx)].push(data);
}
return err;
}
private:
uint64_t push_ CACHE_ALIGNED;
uint64_t pop_ CACHE_ALIGNED;
uint64_t capacity_ CACHE_ALIGNED;
void* allocated_;
Item* items_;
};
}; // namespace common
}; // end namespace oceanbase
#endif /* OCEANBASE_QUEUE_OB_FUTEX_QUEUE_H_ */
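
A minimal usage sketch for ObFutexQueue (editor's illustration, not part of the commit; the include of the header above is omitted because its on-disk path is not shown here, the capacity of 1024 is arbitrary but must be a power of two, and error handling is abbreviated):

void futex_queue_example()
{
  using namespace oceanbase::common;
  ObFutexQueue queue;
  int payload = 42;                        // hypothetical payload
  if (OB_SUCCESS == queue.init(1024)) {    // capacity must satisfy is2n()
    (void)queue.push(&payload);            // non-blocking; OB_SIZE_OVERFLOW when full
    void* data = NULL;
    // waits up to 1000us; a timeout surfaces as OB_ENTRY_NOT_EXIST via the WAKE_ID marker
    if (OB_SUCCESS == queue.pop(data, 1000)) {
      // consume *static_cast<int*>(data)
    }
    queue.destroy();
  }
}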

View File

@ -0,0 +1,112 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "lib/queue/ob_lighty_queue.h"
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>
#include "lib/utility/utility.h"
namespace oceanbase {
namespace common {
static int64_t get_us()
{
return ::oceanbase::common::ObTimeUtility::current_time();
}
int LightyQueue::init(const uint64_t capacity, const lib::ObLabel& label)
{
return queue_.init(capacity, global_default_allocator, label);
}
int LightyQueue::push(void* data, const int64_t timeout)
{
int ret = OB_SUCCESS;
int64_t abs_timeout = (timeout > 0 ? (get_us() + timeout) : 0);
int64_t wait_timeout = 0;
while (true) { // WHITESCAN: OB_CUSTOM_ERROR_COVERED
uint32_t seq = cond_.get_seq();
if (OB_SUCCESS == (ret = queue_.push(data))) {
break;
} else if (timeout <= 0 || (wait_timeout = abs_timeout - get_us()) <= 0) {
break;
} else {
cond_.wait(seq, wait_timeout);
}
}
if (OB_SUCCESS == ret) {
cond_.signal();
}
return ret;
}
int LightyQueue::pop(void*& data, const int64_t timeout)
{
int ret = OB_SUCCESS;
int64_t abs_timeout = (timeout > 0 ? (get_us() + timeout) : 0);
int64_t wait_timeout = 0;
while (true) { // WHITESCAN: OB_CUSTOM_ERROR_COVERED
uint32_t seq = cond_.get_seq();
if (OB_SUCCESS == (ret = queue_.pop(data))) {
break;
} else if (timeout <= 0 || (wait_timeout = abs_timeout - get_us()) <= 0) {
break;
} else {
cond_.wait(seq, wait_timeout);
}
}
if (OB_SUCCESS == ret) {
cond_.signal();
}
return ret;
}
int LightyQueue::multi_pop(void** data, const int64_t data_count, int64_t& avail_count, const int64_t timeout)
{
int ret = OB_SUCCESS;
avail_count = 0;
if (data_count > 0) {
void* curr_data = NULL;
int64_t abs_timeout = (timeout > 0 ? (get_us() + timeout) : 0);
int64_t wait_timeout = 0;
while (true) { // WHITESCAN: OB_CUSTOM_ERROR_COVERED
uint32_t seq = cond_.get_seq();
if (OB_SUCCESS == (ret = queue_.pop(curr_data))) {
data[avail_count++] = curr_data;
curr_data = NULL;
cond_.signal();
if (avail_count >= data_count) {
          // got the requested number of elements; stop
break;
}
} else if (avail_count > 0) {
        // fewer than requested, but at least one element was popped; return success
ret = OB_SUCCESS;
break;
} else if (timeout <= 0 || (wait_timeout = abs_timeout - get_us()) <= 0) {
break;
} else {
cond_.wait(seq, wait_timeout);
}
}
}
return ret;
}
void LightyQueue::reset()
{
void* p = NULL;
while (0 == pop(p, 0))
;
}
}; // end namespace common
}; // end namespace oceanbase

View File

@ -0,0 +1,281 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OB_COMMON_OB_LIGHTY_QUEUE_
#define OB_COMMON_OB_LIGHTY_QUEUE_
#include <stdint.h>
#include <stddef.h>
#include "lib/allocator/ob_malloc.h"
#include "lib/ob_define.h"
#include "lib/queue/ob_fixed_queue.h"
#include "lib/thread_local/ob_tsi_utils.h"
#include "lib/coro/co.h"
namespace oceanbase {
namespace common {
struct ObLightyCond {
public:
ObLightyCond() : n_waiters_(0)
{}
~ObLightyCond()
{}
void signal()
{
(void)ATOMIC_FAA(&futex_.uval(), 1);
if (ATOMIC_LOAD(&n_waiters_) > 0) {
futex_.wake(INT32_MAX);
}
}
uint32_t get_seq()
{
return ATOMIC_LOAD(&futex_.uval());
}
void wait(const uint32_t cmp, const int64_t timeout)
{
if (timeout > 0) {
(void)ATOMIC_FAA(&n_waiters_, 1);
(void)futex_.wait(cmp, timeout);
(void)ATOMIC_FAA(&n_waiters_, -1);
}
}
private:
lib::CoFutex futex_;
uint32_t n_waiters_;
private:
DISALLOW_COPY_AND_ASSIGN(ObLightyCond);
};
class ObLightyQueue {
public:
typedef ObLightyCond Cond;
ObLightyQueue() : capacity_(0), n_cond_(0), data_(NULL), cond_(NULL), push_(0), pop_(0)
{}
~ObLightyQueue()
{
destroy();
}
int init(const uint64_t capacity, const lib::ObLabel& label = ObModIds::OB_LIGHTY_QUEUE,
const uint64_t tenant_id = common::OB_SERVER_TENANT_ID)
{
int ret = OB_SUCCESS;
uint64_t n_cond = calc_n_cond(capacity);
ObMemAttr attr;
attr.tenant_id_ = tenant_id;
attr.label_ = label;
if (is_inited()) {
ret = OB_INIT_TWICE;
} else if (NULL == (data_ = (void**)ob_malloc(capacity * sizeof(void*) + n_cond * sizeof(Cond), attr))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
} else {
memset(data_, 0, capacity * sizeof(void*));
capacity_ = capacity;
n_cond_ = n_cond;
cond_ = (Cond*)(data_ + capacity);
for (int i = 0; i < n_cond; i++) {
new (cond_ + i) Cond();
}
}
return ret;
}
void destroy()
{
if (NULL != data_) {
ob_free(data_);
data_ = NULL;
cond_ = NULL;
}
}
void reset()
{
clear();
}
void clear()
{
void* p = NULL;
while (OB_SUCCESS == pop(p, 0))
;
}
int64_t size() const
{
return (int64_t)(ATOMIC_LOAD(&push_) - ATOMIC_LOAD(&pop_));
}
int64_t capacity() const
{
return capacity_;
}
bool is_inited() const
{
return NULL != data_;
}
int push(void* p)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(data_)) {
ret = OB_NOT_INIT;
} else {
uint64_t limit = ATOMIC_LOAD(&pop_) + capacity_;
uint64_t seq = push_bounded(p, limit);
if (seq >= limit) {
ret = OB_SIZE_OVERFLOW;
}
}
return ret;
}
int pop(void*& p, int64_t timeout = 0)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(data_)) {
ret = OB_NOT_INIT;
} else {
void* DUMMY = (void*)(~0ULL);
uint64_t seq = ATOMIC_FAA(&pop_, 1);
uint64_t push_idx = ATOMIC_LOAD(&push_);
if (push_idx <= seq) {
int64_t abs_timeout = (timeout > 0 ? (get_us() + timeout) : 0);
while ((push_idx = wait_push(seq, timeout)) <= seq && (timeout = abs_timeout - get_us()) > 0) {
PAUSE();
}
while ((push_idx = push_bounded(DUMMY, seq + 1)) < seq) {
PAUSE();
}
}
if (DUMMY == (p = fetch(seq))) {
p = NULL;
ret = OB_ENTRY_NOT_EXIST;
}
}
return ret;
}
private:
static uint64_t calc_n_cond(uint64_t capacity)
{
return std::min(1024ULL, 1ULL << (63 - __builtin_clzll(capacity)));
}
uint64_t push_bounded(void* p, uint64_t limit)
{
uint64_t seq = inc_if_lt(&push_, limit);
if (seq < limit) {
store(seq, p);
get_cond(seq).signal();
}
return seq;
}
uint64_t wait_push(uint64_t seq, int64_t timeout)
{
uint32_t wait_id = get_cond(seq).get_seq();
uint64_t push_idx = ATOMIC_LOAD(&push_);
if (push_idx <= seq) {
get_cond(seq).wait(wait_id, timeout);
}
return push_idx;
}
static int64_t get_us()
{
return ::oceanbase::common::ObTimeUtility::current_time();
}
uint64_t idx(uint64_t x)
{
return x % capacity_;
}
Cond& get_cond(uint64_t seq)
{
return cond_[seq % n_cond_];
}
static uint64_t inc_if_lt(uint64_t* addr, uint64_t b)
{
uint64_t ov = ATOMIC_LOAD(addr);
uint64_t nv = 0;
while (ov < b && ov != (nv = ATOMIC_VCAS(addr, ov, ov + 1))) {
ov = nv;
}
return ov;
}
void* fetch(uint64_t seq)
{
void* p = NULL;
void** addr = data_ + idx(seq);
while (NULL == ATOMIC_LOAD(addr) || NULL == (p = ATOMIC_TAS(addr, NULL))) {
PAUSE();
}
return p;
}
void store(uint64_t seq, void* p)
{
void** addr = data_ + idx(seq);
while (!ATOMIC_BCAS(addr, NULL, p)) {
PAUSE();
}
}
private:
uint64_t capacity_;
uint64_t n_cond_;
void** data_;
Cond* cond_;
uint64_t push_ CACHE_ALIGNED;
uint64_t pop_ CACHE_ALIGNED;
};
class LightyQueue {
public:
typedef ObLightyCond Cond;
LightyQueue()
{}
~LightyQueue()
{
destroy();
}
public:
int init(const uint64_t capacity, const lib::ObLabel& label = ObModIds::OB_LIGHTY_QUEUE);
void destroy()
{
queue_.destroy();
}
void reset();
int64_t size() const
{
return queue_.get_total();
}
int64_t curr_size() const
{
return queue_.get_total();
}
int64_t max_size() const
{
return queue_.capacity();
}
bool is_inited() const
{
return queue_.is_inited();
}
int push(void* data, const int64_t timeout = 0);
int pop(void*& data, const int64_t timeout = 0);
int multi_pop(void** data, const int64_t data_count, int64_t& avail_count, const int64_t timeout = 0);
private:
typedef ObFixedQueue<void> Queue;
Queue queue_;
Cond cond_;
private:
DISALLOW_COPY_AND_ASSIGN(LightyQueue);
};
}; // end namespace common
}; // end namespace oceanbase
#endif /* OB_COMMON_OB_LIGHTY_QUEUE_ */
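
A minimal usage sketch for the bounded LightyQueue wrapper declared above (editor's illustration, not part of the commit; the capacity and timeouts are arbitrary and error handling is abbreviated):

#include "lib/queue/ob_lighty_queue.h"

void lighty_queue_example()
{
  using namespace oceanbase::common;
  LightyQueue queue;
  int payload = 0;                                  // hypothetical payload
  if (OB_SUCCESS == queue.init(128)) {
    (void)queue.push(&payload, 1000000);            // block up to 1s while the queue is full
    void* data = NULL;
    if (OB_SUCCESS == queue.pop(data, 1000000)) {   // block up to 1s while the queue is empty
      // consume *static_cast<int*>(data)
    }
    queue.destroy();
  }
}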

274
deps/oblib/src/lib/queue/ob_link.h vendored Normal file
View File

@ -0,0 +1,274 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_LIB_OB_QUEUE_
#define OCEANBASE_LIB_OB_QUEUE_
#include <stdlib.h> // NULL
#include <errno.h>
#include "lib/ob_define.h"
#include "lib/atomic/ob_atomic.h"
namespace oceanbase {
namespace common {
struct ObLink {
ObLink() : next_(NULL)
{}
~ObLink()
{
next_ = NULL;
}
ObLink* next_;
void reset()
{
next_ = NULL;
}
bool is_deleted() const
{
return ((uint64_t)next_) & 1;
}
};
inline bool is_last_bit_set(uint64_t x)
{
return x & 1;
}
inline uint64_t clear_last_bit(uint64_t x)
{
return x & ~1;
}
// spin until the last bit is set; returns the value observed before the CAS
inline uint64_t set_last_bit(uint64_t* addr)
{
uint64_t ov = 0;
uint64_t nv = ATOMIC_LOAD(addr); // newest value before CAS
while (0 == ((ov = nv) & 1) && ov != (nv = ATOMIC_VCAS(addr, ov, ov | 1))) {
// do nothing
}
return nv;
}
inline void unset_last_bit(uint64_t* addr)
{
ATOMIC_STORE(addr, clear_last_bit(ATOMIC_LOAD(addr)));
}
inline ObLink* link_next(ObLink* cur)
{
return (ObLink*)clear_last_bit((uint64_t)ATOMIC_LOAD(&cur->next_));
}
// returns the value of prev->next_ observed by the CAS
// (the insert succeeded iff the return value equals next)
inline ObLink* link_insert(ObLink* prev, ObLink* target, ObLink* next)
{
target->next_ = next;
return ATOMIC_VCAS(&prev->next_, next, target);
}
// returns the value of prev->next_ observed by the CAS
// (the delete succeeded iff the return value equals target)
inline ObLink* link_del(ObLink* prev, ObLink* target, ObLink*& next)
{
ObLink* ret = NULL;
if (!is_last_bit_set((uint64_t)(next = (ObLink*)set_last_bit((uint64_t*)(&target->next_))))) {
if (target != (ret = ATOMIC_VCAS(&prev->next_, target, next))) {
unset_last_bit((uint64_t*)(&target->next_));
}
}
return ret;
}
// returns the first node that compares >= key
// (the caller must ensure key compares greater than start, which acts as the head sentinel)
template <typename T>
T* ol_search(T* start, T* key, T*& prev)
{
T* next = NULL;
prev = start;
while (NULL != (next = (T*)link_next(prev)) && next->compare(key) < 0) {
prev = next;
}
return next;
}
// returns the first node that compares > key
// (the caller must ensure key compares greater than start, which acts as the head sentinel)
template <typename T>
T* ol_search_next(T* start, T* key, T*& prev)
{
T* next = NULL;
prev = start;
while (NULL != (next = (T*)link_next(prev)) && next->compare(key) <= 0) {
prev = next;
}
return next;
}
template <typename T>
int ol_get(T* start, T* key, T*& target)
{
int err = 0;
T* prev = NULL;
target = ol_search(start, key, prev);
if (NULL == target || 0 != target->compare(key)) {
err = -ENOENT;
}
return err;
}
template <typename T>
int _ol_insert(T* start, T* target)
{
int err = 0;
T* prev = NULL;
T* next = ol_search(start, target, prev);
if (NULL != next && 0 == next->compare(target)) {
err = -EEXIST;
} else if (next != link_insert(prev, target, next)) {
err = -EAGAIN;
}
return err;
}
template <typename T>
int _ol_del(T* start, T* key, T*& target)
{
int err = 0;
T* prev = NULL;
T* next = NULL;
target = ol_search(start, key, prev);
if (NULL == target || 0 != target->compare(key)) {
err = -ENOENT;
} else if (target == link_del(prev, target, (ObLink*&)next)) {
err = 0;
} else if (is_last_bit_set((uint64_t)next)) {
err = -ENOENT;
} else {
err = -EAGAIN;
}
return err;
}
// the caller must keep start valid at all times
template <typename T>
int ol_insert(T* start, T* target)
{
int err = 0;
while (-EAGAIN == (err = _ol_insert(start, target)))
;
return err;
}
// the caller must keep start valid at all times
template <typename T>
int ol_del(T* start, T* key, T*& target)
{
int err = 0;
while (-EAGAIN == (err = _ol_del(start, key, target)))
;
return err;
}
struct ObDLink : public ObLink {
ObDLink() : ObLink(), prev_(NULL)
{}
~ObDLink()
{}
ObDLink* prev_;
};
// correct target->prev_ link
inline void try_correct_prev_link(ObDLink* target, ObDLink* prev)
{
if (NULL != target) {
while (true) {
      // every thread does an unconditional exchange (FAS) so that, once all concurrent operations stop,
      // the last one has left target->prev_ correct.
ObDLink* old_prev = ATOMIC_TAS(&target->prev_, prev);
if (ATOMIC_LOAD(&prev->next_) == target) {
        // prev really is the direct predecessor now, so the correction is done and we can return.
break;
} else if (ATOMIC_LOAD(&old_prev->next_) != target) {
        // the value we overwrote was no longer a valid correction, so nothing was lost.
break;
} else {
        // we overwrote a still-valid correction; retry with it so no operation's correction is lost.
prev = old_prev;
}
}
}
}
// the caller must keep prev valid at all times
// always succeeds
inline int dl_insert(ObDLink* prev, ObDLink* target)
{
int err = 0;
target->prev_ = prev;
while (true) {
ObLink* next = link_next(prev);
if (next == link_insert(prev, target, next)) {
try_correct_prev_link((ObDLink*)next, target);
break;
}
}
return err;
}
// the caller must keep next valid at all times
// always succeeds
inline int dl_insert_before(ObDLink* next, ObDLink* target)
{
int err = 0;
while (true) {
ObDLink* prev = ATOMIC_LOAD(&next->prev_);
target->prev_ = prev;
if (next == link_insert(prev, target, next)) {
try_correct_prev_link((ObDLink*)next, target);
break;
}
}
return err;
}
inline ObDLink* search_direct_prev(ObDLink* prev, ObDLink* target)
{
ObDLink* next = NULL;
while ((next = (ObDLink*)link_next(prev)) != target) {
prev = next;
}
return prev;
}
// returns -ENOENT if target has already been deleted by another thread
inline int dl_del(ObDLink* target)
{
int err = 0;
while (0 == err) {
ObDLink* prev = search_direct_prev(ATOMIC_LOAD(&target->prev_), target);
ObDLink* next = NULL;
if (target == link_del(prev, target, (ObLink*&)next)) {
try_correct_prev_link(next, prev);
break;
} else if (is_last_bit_set((uint64_t)next)) {
err = -ENOENT;
}
}
return err;
}
} // namespace common
} // namespace oceanbase
#endif // OCEANBASE_LIB_OB_QUEUE_
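
A sketch of the node contract expected by ol_insert/ol_del above (editor's illustration, not part of the commit; IntNode and its keys are hypothetical). A node derives from ObLink and supplies compare(), and the list head is a sentinel that compares smaller than every real key:

#include <stdint.h>
#include "lib/queue/ob_link.h"

struct IntNode : public oceanbase::common::ObLink {
  explicit IntNode(int64_t key) : key_(key) {}
  int compare(const IntNode* that) const
  {
    return key_ < that->key_ ? -1 : (key_ > that->key_ ? 1 : 0);
  }
  int64_t key_;
};

void ordered_link_example()
{
  using namespace oceanbase::common;
  IntNode head(INT64_MIN);              // head sentinel, smaller than every key
  IntNode a(10);
  IntNode key(20);
  IntNode* removed = NULL;
  (void)ol_insert(&head, &a);           // 0 on success, -EEXIST on a duplicate key
  (void)ol_del(&head, &key, removed);   // -ENOENT here because 20 was never inserted
}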

View File

@ -0,0 +1,213 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "lib/ob_errno.h"
#include "lib/queue/ob_link_queue.h"
#include "lib/atomic/ob_atomic.h"
using namespace oceanbase;
using namespace oceanbase::common;
namespace oceanbase {
namespace common {
bool ObSpLinkQueue::is_empty() const
{
return ATOMIC_LOAD(&head_) == ATOMIC_LOAD(&tail_);
}
int ObSpLinkQueue::top(Link*& p)
{
int ret = OB_SUCCESS;
Link* head = ATOMIC_LOAD(&head_);
if (head != &dummy_) {
p = head;
} else if (NULL == head->next_) {
ret = OB_EAGAIN;
} else {
ATOMIC_STORE(&head_, head->next_);
p = ATOMIC_LOAD(&head_);
ret = push(&dummy_);
}
return ret;
}
int ObSpLinkQueue::pop(Link*& p)
{
int ret = OB_SUCCESS;
while (OB_SUCCESS == (ret = do_pop(p)) && p == &dummy_) {
ret = push(p);
}
if (OB_SUCCESS != ret) {
p = NULL;
}
return ret;
}
int ObSpLinkQueue::push(Link* p)
{
int ret = OB_SUCCESS;
Link* tail = NULL;
if (NULL == p) {
ret = OB_INVALID_ARGUMENT;
} else {
p->next_ = NULL;
tail = ATOMIC_TAS(&tail_, p);
ATOMIC_STORE(&tail->next_, p);
}
return ret;
}
int ObSpLinkQueue::push_front(Link* p)
{
int ret = OB_SUCCESS;
Link* head = NULL;
if (NULL == p) {
ret = OB_INVALID_ARGUMENT;
} else {
while (NULL == (head = ATOMIC_TAS(&head_, NULL))) {
PAUSE();
}
ATOMIC_STORE(&p->next_, head);
ATOMIC_STORE(&head_, p);
}
return ret;
}
int ObSpLinkQueue::do_pop(Link*& p)
{
int ret = OB_SUCCESS;
Link* head = NULL;
while (NULL == (head = ATOMIC_TAS(&head_, NULL))) {
PAUSE();
}
if (head == ATOMIC_LOAD(&tail_)) {
ret = OB_EAGAIN;
ATOMIC_STORE(&head_, head);
} else {
Link* next = NULL;
while (NULL == (next = ATOMIC_LOAD(&head->next_))) {
PAUSE();
}
ATOMIC_STORE(&head_, next);
p = head;
}
return ret;
}
int64_t ObLinkQueue::size() const
{
uint64_t pop = ATOMIC_LOAD(&pop_);
uint64_t push = ATOMIC_LOAD(&push_);
return (int64_t)(push - pop);
}
static uint64_t faa_bounded(uint64_t* addr, uint64_t* limit_addr, uint64_t& limit)
{
uint64_t ov = 0;
uint64_t nv = ATOMIC_LOAD(addr);
while (((ov = nv) < limit || ov < (limit = ATOMIC_LOAD(limit_addr))) && ov != (nv = ATOMIC_CAS(addr, ov, ov + 1))) {
PAUSE();
}
return nv;
}
int ObLinkQueue::push(Link* p)
{
int ret = OB_SUCCESS;
if (NULL == p) {
ret = OB_INVALID_ARGUMENT;
} else {
uint64_t push_idx = ATOMIC_FAA(&push_, 1);
ObSpLinkQueue* pqueue = queue_ + idx(push_idx);
while (OB_SUCCESS != pqueue->push(p)) {
;
}
}
return ret;
}
int ObLinkQueue::push_front(Link* p)
{
int ret = OB_SUCCESS;
if (NULL == p) {
ret = OB_INVALID_ARGUMENT;
} else {
uint64_t push_idx = ATOMIC_SAF(&pop_, 1);
ObSpLinkQueue* pqueue = queue_ + idx(push_idx);
while (OB_SUCCESS != pqueue->push_front(p)) {
;
}
}
return ret;
}
int ObLinkQueue::pop(Link*& p)
{
int ret = OB_SUCCESS;
uint64_t pop_limit = 0;
uint64_t pop_idx = faa_bounded(&pop_, &push_, pop_limit);
if (pop_idx >= pop_limit) {
ret = OB_EAGAIN;
p = NULL;
} else {
ObSpLinkQueue* pqueue = queue_ + idx(pop_idx);
while (OB_SUCCESS != pqueue->pop(p)) {
;
}
}
return ret;
}
// ObSimpleLinkQueue: same interface as ObSpLinkQueue, but implemented without atomics
int ObSimpleLinkQueue::pop(Link*& p)
{
int ret = OB_SUCCESS;
while (OB_SUCCESS == (ret = do_pop(p)) && p == &dummy_) {
ret = push(p);
}
if (OB_SUCCESS != ret) {
p = NULL;
}
return ret;
}
int ObSimpleLinkQueue::push(Link* p)
{
int ret = OB_SUCCESS;
Link* tail = NULL;
if (NULL == p) {
ret = OB_INVALID_ARGUMENT;
} else {
p->next_ = NULL;
tail = tail_;
tail_ = p;
tail->next_ = p;
}
return ret;
}
int ObSimpleLinkQueue::do_pop(Link*& p)
{
int ret = OB_SUCCESS;
if (head_ == tail_) {
ret = OB_EAGAIN;
} else {
Link* head = head_;
Link* next = head->next_;
head_ = next;
p = head;
}
return ret;
}
} // end namespace common
} // end namespace oceanbase

157
deps/oblib/src/lib/queue/ob_link_queue.h vendored Normal file
View File

@ -0,0 +1,157 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_QUEUE_OB_LINK_QUEUE_
#define OCEANBASE_QUEUE_OB_LINK_QUEUE_
#include "lib/ob_define.h"
#include "lib/queue/ob_link.h" // ObLink
namespace oceanbase {
namespace common {
typedef ObLink QLink;
class ObSpScLinkQueue {
public:
typedef QLink Link;
ObSpScLinkQueue() : tail_(&head_)
{}
~ObSpScLinkQueue()
{}
Link* top()
{
return ATOMIC_LOAD(&head_.next_);
}
bool is_empty() const
{
return (NULL == ATOMIC_LOAD(&head_.next_));
}
void push(Link* p)
{
p->next_ = NULL;
Link* ot = ATOMIC_TAS(&tail_, p);
ATOMIC_STORE(&ot->next_, p);
}
Link* pop()
{
Link* ret = top();
if (NULL != ret) {
Link* next = ATOMIC_LOAD(&ret->next_);
if (NULL != next) {
ATOMIC_STORE(&head_.next_, next);
} else {
if (ATOMIC_BCAS(&tail_, ret, &head_)) {
        // the CAS on tail_ succeeded, so no push raced with us and head_.next_ can be cleared;
        // otherwise the racing push will link ret->next_ and we follow it below.
IGNORE_RETURN ATOMIC_BCAS(&head_.next_, ret, NULL);
} else {
while (NULL == (next = ATOMIC_LOAD(&ret->next_))) {
PAUSE();
}
ATOMIC_STORE(&head_.next_, next);
}
}
}
return ret;
}
private:
Link head_;
Link* tail_;
};
class ObSpLinkQueue {
public:
typedef QLink Link;
ObSpLinkQueue() : head_(&dummy_), tail_(&dummy_), dummy_()
{}
~ObSpLinkQueue()
{}
public:
int top(Link*& p);
int pop(Link*& p);
int push(Link* p);
int push_front(Link* p);
bool is_empty() const;
private:
int do_pop(Link*& p);
private:
Link* head_ CACHE_ALIGNED;
Link* tail_ CACHE_ALIGNED;
Link dummy_;
private:
DISALLOW_COPY_AND_ASSIGN(ObSpLinkQueue);
};
class ObLinkQueue {
public:
typedef QLink Link;
enum { QUEUE_COUNT = 256 };
ObLinkQueue() : queue_(), push_(0), pop_(0)
{}
~ObLinkQueue()
{}
int push(Link* p);
int push_front(Link* p);
int pop(Link*& p);
int64_t size() const;
private:
static int64_t idx(int64_t x)
{
return x & (QUEUE_COUNT - 1);
}
private:
ObSpLinkQueue queue_[QUEUE_COUNT];
uint64_t push_ CACHE_ALIGNED;
uint64_t pop_ CACHE_ALIGNED;
private:
DISALLOW_COPY_AND_ASSIGN(ObLinkQueue);
};
class ObSimpleLinkQueue {
public:
typedef QLink Link;
ObSimpleLinkQueue() : head_(&dummy_), tail_(&dummy_), dummy_()
{}
~ObSimpleLinkQueue()
{}
public:
int top(Link*& p);
int pop(Link*& p);
int push(Link* p);
inline bool is_empty() const
{
return head_ == tail_;
}
private:
int do_pop(Link*& p);
private:
Link* head_ CACHE_ALIGNED;
Link* tail_ CACHE_ALIGNED;
Link dummy_;
private:
DISALLOW_COPY_AND_ASSIGN(ObSimpleLinkQueue);
};
} // end namespace common
} // end namespace oceanbase
#endif // OCEANBASE_QUEUE_OB_LINK_QUEUE_
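
A minimal usage sketch for ObLinkQueue (editor's illustration, not part of the commit; the Request type is hypothetical). Elements are intrusive: they derive from ObLink and are linked in place rather than copied:

#include "lib/queue/ob_link_queue.h"

struct Request : public oceanbase::common::ObLink {
  int64_t id_;
};

void link_queue_example()
{
  using namespace oceanbase::common;
  ObLinkQueue queue;                      // unbounded queue built from 256 ObSpLinkQueue slots
  Request req;
  req.id_ = 1;
  (void)queue.push(&req);
  ObLink* link = NULL;
  if (OB_SUCCESS == queue.pop(link)) {    // OB_EAGAIN when the queue is empty
    Request* popped = static_cast<Request*>(link);
    (void)popped;                         // handle the request
  }
}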

1432
deps/oblib/src/lib/queue/ob_mpmc_queue.h vendored Normal file

File diff suppressed because it is too large

246
deps/oblib/src/lib/queue/ob_ms_queue.cpp vendored Normal file
View File

@ -0,0 +1,246 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX LIB
#include "lib/queue/ob_ms_queue.h"
#include "lib/allocator/ob_allocator.h" // ObIAllocator
namespace oceanbase {
namespace common {
////////////////////////////////////////////// ObMsQueue::TaskHead ///////////////////////////////////
void ObMsQueue::TaskHead::add(ObMsQueue::Task* node)
{
if (NULL == node) {
} else {
node->next_ = NULL;
if (NULL == head_) {
head_ = node;
tail_ = node;
} else {
tail_->next_ = node;
tail_ = node;
}
}
}
ObMsQueue::Task* ObMsQueue::TaskHead::pop()
{
ObMsQueue::Task* pret = NULL;
if (NULL == head_) {
} else {
pret = head_;
head_ = head_->next_;
if (NULL == head_) {
tail_ = NULL;
}
}
return pret;
}
////////////////////////////////////////////// ObMsQueue::QueueInfo ///////////////////////////////////
int ObMsQueue::QueueInfo::init(char* buf, const int64_t len)
{
int ret = OB_SUCCESS;
if (NULL == buf || len <= 0) {
ret = OB_INVALID_ARGUMENT;
LIB_LOG(ERROR, "invalid args", K(ret), K(buf), K(len));
} else {
array_ = reinterpret_cast<TaskHead*>(buf);
memset(array_, 0, sizeof(TaskHead) * len);
len_ = len;
pop_ = 0;
}
return ret;
}
int ObMsQueue::QueueInfo::destroy()
{
array_ = NULL;
len_ = 0;
pop_ = 0;
return OB_SUCCESS;
}
int ObMsQueue::QueueInfo::add(const int64_t seq, ObMsQueue::Task* task)
{
int ret = OB_SUCCESS;
int64_t pop = ATOMIC_LOAD(&pop_);
if (seq < pop) {
ret = OB_ERR_UNEXPECTED;
} else if (seq >= pop + len_) {
ret = OB_EAGAIN;
} else if (NULL == array_) {
ret = OB_ERR_UNEXPECTED;
LIB_LOG(ERROR, "invalid array", K(ret), K(array_));
} else {
array_[seq % len_].add(task);
}
return ret;
}
// NOT thread-safe
int ObMsQueue::QueueInfo::get(const int64_t ready_seq, ObMsQueue::Task*& task)
{
int ret = OB_SUCCESS;
while (OB_SUCC(ret)) {
if (pop_ >= ready_seq) {
ret = OB_EAGAIN;
} else if (NULL == (task = array_[pop_ % len_].pop())) {
pop_++;
} else {
break;
}
}
return ret;
}
bool ObMsQueue::QueueInfo::next_is_ready(const int64_t ready_seq) const
{
return pop_ < ready_seq;
}
////////////////////////////////////////////// ObMsQueue ///////////////////////////////////
ObMsQueue::ObMsQueue() : inited_(false), qlen_(0), qcount_(0), qinfo_(NULL), seq_queue_(), allocator_(NULL)
{}
ObMsQueue::~ObMsQueue()
{
destroy();
}
int ObMsQueue::init(const int64_t n_queue, const int64_t queue_len, ObIAllocator* allocator)
{
int ret = OB_SUCCESS;
if (n_queue <= 0 || queue_len <= 0 || NULL == allocator) {
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("invalid args", K(n_queue), K(queue_len), K(allocator));
} else if (inited_) {
ret = OB_INIT_TWICE;
} else if (NULL == (qinfo_ = static_cast<QueueInfo*>(allocator->alloc(n_queue * sizeof(QueueInfo))))) {
LOG_ERROR("allocate memory for QueueInfo fail", K(n_queue), K(sizeof(QueueInfo)));
ret = OB_ALLOCATE_MEMORY_FAILED;
} else {
for (int i = 0; OB_SUCC(ret) && i < n_queue; i++) {
char* ptr = NULL;
if (NULL == (ptr = static_cast<char*>(allocator->alloc(queue_len * sizeof(TaskHead))))) {
LOG_ERROR("allocate memory for TaskHead fail", "size", queue_len * sizeof(TaskHead));
ret = OB_ALLOCATE_MEMORY_FAILED;
} else {
new (qinfo_ + i) QueueInfo();
if (OB_FAIL(qinfo_[i].init(ptr, queue_len))) {
LOG_ERROR("init queue info fail", K(i), K(ret), K(ptr), K(queue_len));
}
}
}
}
if (OB_SUCC(ret)) {
if (OB_SUCCESS != (ret = seq_queue_.init(queue_len, allocator))) {
LOG_ERROR("init co-seq queue fail", K(queue_len), K(ret));
}
}
if (OB_SUCC(ret)) {
qcount_ = n_queue;
qlen_ = queue_len;
allocator_ = allocator;
inited_ = true;
}
return ret;
}
int ObMsQueue::destroy()
{
inited_ = false;
seq_queue_.destroy();
if (NULL != qinfo_ && NULL != allocator_) {
for (int64_t index = 0; index < qcount_; index++) {
if (NULL != qinfo_[index].array_) {
allocator_->free(qinfo_[index].array_);
}
qinfo_[index].~QueueInfo();
}
allocator_->free(qinfo_);
qinfo_ = NULL;
}
allocator_ = NULL;
qcount_ = 0;
qlen_ = 0;
return OB_SUCCESS;
}
// for a given slot and seq, only one thread may push at a time
int ObMsQueue::push(Task* task, const int64_t seq, const uint64_t hash)
{
int ret = OB_SUCCESS;
if (NULL == task || seq < 0) {
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("invalid args", K(ret), K(task), K(seq), K(hash));
} else if (!inited_) {
ret = OB_NOT_INIT;
} else if (OB_SUCCESS != (ret = qinfo_[hash % qcount_].add(seq, task)) && OB_EAGAIN != ret) {
LOG_ERROR("push to ms_queue: unexpected error", K(seq), K(task), K(hash));
} else {
// succ
}
return ret;
}
// each consumer thread works on a different qinfo (queue index)
int ObMsQueue::get(Task*& task, const int64_t idx)
{
int ret = OB_SUCCESS;
if (idx < 0 || idx >= qcount_) {
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("invalid args", K(ret), K(idx));
} else if (!inited_) {
ret = OB_NOT_INIT;
} else if (OB_SUCCESS != (ret = qinfo_[idx].get(seq_queue_.get_next(), task)) && OB_EAGAIN != ret) {
LOG_ERROR("get task from queue info fail", K(ret), K(idx));
}
return ret;
}
bool ObMsQueue::next_is_ready(const int64_t queue_index) const
{
bool bool_ret = false;
if (inited_ && queue_index >= 0 && queue_index < qcount_) {
bool_ret = qinfo_[queue_index].next_is_ready(seq_queue_.get_next());
}
return bool_ret;
}
int ObMsQueue::end_batch(const int64_t seq, const int64_t count)
{
int ret = OB_SUCCESS;
UNUSED(count);
if (!inited_) {
ret = OB_NOT_INIT;
} else {
int64_t next_seq = seq_queue_.add(seq);
UNUSED(next_seq);
}
return ret;
}
}; // namespace common
}; // end namespace oceanbase

86
deps/oblib/src/lib/queue/ob_ms_queue.h vendored Normal file
View File

@ -0,0 +1,86 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_LIB_QUEUE_OB_MS_QUEUE_H_
#define OCEANBASE_LIB_QUEUE_OB_MS_QUEUE_H_
#include "lib/queue/ob_co_seq_queue.h" // ObCoSeqQueue
#include "lib/ob_define.h"
#include "lib/queue/ob_link.h" // ObLink
#include "lib/utility/ob_print_utils.h" // TO_STRING_KV
namespace oceanbase {
namespace common {
class ObIAllocator;
class ObMsQueue {
public:
typedef ObLink Task;
struct TaskHead {
Task* head_;
Task* tail_;
TaskHead() : head_(NULL), tail_(NULL)
{}
~TaskHead()
{}
void add(Task* node);
Task* pop();
};
struct QueueInfo {
TaskHead* array_;
int64_t len_;
int64_t pop_;
QueueInfo() : array_(NULL), len_(0), pop_(0)
{}
~QueueInfo()
{
destroy();
}
int init(char* buf, const int64_t len);
int destroy();
// there may be parallel add, but one seq can only be handled by one thread.
int add(const int64_t seq, Task* task);
// NOT thread-safe
int get(const int64_t ready_seq, Task*& task);
bool next_is_ready(const int64_t ready_seq) const;
};
public:
ObMsQueue();
~ObMsQueue();
int init(const int64_t n_queue, const int64_t queue_len, ObIAllocator* allocator);
int destroy();
int push(Task* task, const int64_t seq, const uint64_t hash);
int end_batch(const int64_t seq, const int64_t count);
int get(Task*& task, const int64_t idx);
int64_t get_queue_num() const
{
return qcount_;
}
bool next_is_ready(const int64_t queue_index) const;
TO_STRING_KV(K_(inited), K_(qlen), K_(qcount));
private:
bool inited_;
int64_t qlen_;
int64_t qcount_;
QueueInfo* qinfo_;
ObCoSeqQueue seq_queue_;
ObIAllocator* allocator_;
DISALLOW_COPY_AND_ASSIGN(ObMsQueue);
};
} // end namespace common
} // end namespace oceanbase
#endif /* OCEANBASE_LIB_QUEUE_OB_MS_QUEUE_H_ */
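
A minimal usage sketch for ObMsQueue (editor's illustration, not part of the commit; the sequence number, hash and sizes are illustrative, and ObMalloc is used on the assumption that it implements ObIAllocator). A producer pushes tasks tagged with a sequence and a hash, end_batch() publishes the sequence, and each consumer drains its own queue index:

#include "lib/queue/ob_ms_queue.h"
#include "lib/allocator/ob_malloc.h"

void ms_queue_example()
{
  using namespace oceanbase::common;
  ObMsQueue queue;
  ObMalloc allocator;                                  // assumption: any ObIAllocator would do
  if (OB_SUCCESS == queue.init(4 /*n_queue*/, 1024 /*queue_len*/, &allocator)) {
    ObMsQueue::Task task;
    (void)queue.push(&task, 0 /*seq*/, 7 /*hash*/);    // lands in queue 7 % 4 == 3
    (void)queue.end_batch(0 /*seq*/, 1 /*count*/);     // makes sequence 0 visible to consumers
    ObMsQueue::Task* got = NULL;
    if (queue.next_is_ready(3) && OB_SUCCESS == queue.get(got, 3)) {
      // handle *got
    }
    (void)queue.destroy();
  }
}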

View File

@ -0,0 +1,215 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_LIB_QUEUE_M_FIXED_QUEUE_H_
#define OCEANBASE_LIB_QUEUE_M_FIXED_QUEUE_H_
#include "lib/queue/ob_fixed_queue.h" // ObFixedQueue
#include "common/ob_queue_thread.h" // ObCond
namespace oceanbase {
namespace common {
const int64_t DefaultMaxQueueNum = 32;
const int64_t DefaultQueueNum = 0;
template <int MAX_QUEUE_NUM = DefaultMaxQueueNum>
class ObMultiFixedQueue {
public:
ObMultiFixedQueue() : inited_(false), queue_num_(DefaultQueueNum)
{}
virtual ~ObMultiFixedQueue()
{
int ret = OB_SUCCESS;
if (OB_SUCCESS != (ret = destroy())) {
LIB_LOG(ERROR, "err destroy multi fixed queue", K(ret));
}
}
public:
int init(const int64_t queue_size, const int64_t queue_num);
int destroy();
int push(void* data, const uint64_t hash_val, const int64_t timeout);
int pop(void*& data, const int64_t queue_index, const int64_t timeout);
int get_task_count(const int64_t queue_index, int64_t& task_count);
private:
int init_queue_(const int64_t queue_num, const int64_t queue_size);
void destroy_queue_(const int64_t queue_num);
private:
bool inited_;
ObFixedQueue<void> queue_[MAX_QUEUE_NUM];
ObCond queue_conds_[MAX_QUEUE_NUM];
int64_t queue_num_;
private:
DISALLOW_COPY_AND_ASSIGN(ObMultiFixedQueue);
};
template <int MAX_QUEUE_NUM>
int ObMultiFixedQueue<MAX_QUEUE_NUM>::init(const int64_t queue_size, const int64_t queue_num)
{
int ret = OB_SUCCESS;
if (inited_) {
ret = OB_INIT_TWICE;
} else if (0 >= queue_size || 0 >= queue_num || queue_num > MAX_QUEUE_NUM) {
ret = OB_INVALID_ARGUMENT;
} else if (OB_SUCCESS != (ret = init_queue_(queue_num, queue_size))) {
LIB_LOG(ERROR, "err init queue", K(ret));
} else {
inited_ = true;
}
return ret;
}
template <int MAX_QUEUE_NUM>
int ObMultiFixedQueue<MAX_QUEUE_NUM>::destroy()
{
inited_ = false;
if (0 < queue_num_) {
destroy_queue_(queue_num_);
}
queue_num_ = 0;
return OB_SUCCESS;
}
template <int MAX_QUEUE_NUM>
int ObMultiFixedQueue<MAX_QUEUE_NUM>::init_queue_(const int64_t queue_num, const int64_t queue_size)
{
int ret = OB_SUCCESS;
if (!(0 < queue_num && MAX_QUEUE_NUM >= queue_num && 0 < queue_size)) {
ret = OB_INVALID_ARGUMENT;
LIB_LOG(ERROR, "invalid arguments", K(ret), K(queue_num), K(queue_size));
} else {
queue_num_ = queue_num;
}
for (int64_t index = 0; OB_SUCC(ret) && index < queue_num; index++) {
if (OB_SUCCESS != (ret = queue_[index].init(queue_size))) {
LIB_LOG(ERROR, "err init queue", K(ret));
}
}
return ret;
}
template <int MAX_QUEUE_NUM>
void ObMultiFixedQueue<MAX_QUEUE_NUM>::destroy_queue_(const int64_t queue_num)
{
if (0 < queue_num) {
for (int64_t index = 0; index < queue_num; index++) {
queue_[index].destroy();
}
}
}
template <int MAX_QUEUE_NUM>
int ObMultiFixedQueue<MAX_QUEUE_NUM>::push(void* data, const uint64_t hash_val, const int64_t timeout)
{
int ret = OB_SUCCESS;
if (!inited_) {
ret = OB_NOT_INIT;
} else if (NULL == data) {
ret = OB_INVALID_ARGUMENT;
} else {
int64_t index = hash_val % queue_num_;
ObCond& cond = queue_conds_[index];
int64_t end_time = timeout + ::oceanbase::common::ObTimeUtility::current_time();
while (true) {
ret = queue_[index].push(data);
if (OB_SIZE_OVERFLOW != ret) {
break;
}
int64_t left_time = end_time - ::oceanbase::common::ObTimeUtility::current_time();
if (left_time <= 0) {
ret = OB_TIMEOUT;
break;
}
cond.timedwait(left_time);
}
if (OB_SUCC(ret)) {
cond.signal();
}
}
return ret;
}
template <int MAX_QUEUE_NUM>
int ObMultiFixedQueue<MAX_QUEUE_NUM>::pop(void*& data, const int64_t queue_index, const int64_t timeout)
{
int ret = OB_SUCCESS;
if (!inited_) {
ret = OB_NOT_INIT;
} else if (0 > queue_index || queue_index >= MAX_QUEUE_NUM) {
ret = OB_INVALID_ARGUMENT;
} else {
ObCond& cond = queue_conds_[queue_index];
int64_t end_time = timeout + ::oceanbase::common::ObTimeUtility::current_time();
data = NULL;
while (true) {
ret = queue_[queue_index].pop(data);
if (OB_ENTRY_NOT_EXIST != ret) {
break;
}
int64_t left_time = end_time - ::oceanbase::common::ObTimeUtility::current_time();
if (left_time <= 0) {
ret = OB_TIMEOUT;
break;
}
cond.timedwait(left_time);
}
if (OB_SUCC(ret)) {
cond.signal();
}
}
return ret;
}
template <int MAX_QUEUE_NUM>
int ObMultiFixedQueue<MAX_QUEUE_NUM>::get_task_count(const int64_t queue_index, int64_t& task_count)
{
int ret = OB_SUCCESS;
task_count = 0;
if (OB_UNLIKELY(queue_index < 0) || OB_UNLIKELY(queue_index >= queue_num_)) {
LIB_LOG(ERROR, "invalid argument", K(queue_index), K(queue_num_));
ret = OB_INVALID_ARGUMENT;
} else {
task_count = queue_[queue_index].get_total();
}
return ret;
}
} // namespace common
} // namespace oceanbase
#endif /* OCEANBASE_LIB_QUEUE_M_FIXED_QUEUE_H_ */
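
A minimal usage sketch for ObMultiFixedQueue (editor's illustration, not part of the commit; the include of the header above is omitted because its on-disk path is not shown here, and the sizes, hash and timeouts are arbitrary):

void multi_fixed_queue_example()
{
  using namespace oceanbase::common;
  ObMultiFixedQueue<8> queue;                                   // room for up to 8 sub-queues
  int payload = 0;                                              // hypothetical payload
  if (OB_SUCCESS == queue.init(1024 /*queue_size*/, 4 /*queue_num*/)) {
    // hash 42 % 4 == 2, so the element lands in sub-queue 2
    (void)queue.push(&payload, 42 /*hash_val*/, 1000000 /*timeout us*/);
    void* data = NULL;
    // OB_TIMEOUT if sub-queue 2 stays empty for the whole second
    (void)queue.pop(data, 2 /*queue_index*/, 1000000 /*timeout us*/);
    (void)queue.destroy();
  }
}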

View File

@ -0,0 +1,208 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_QUEUE_OB_PRIORITY_QUEUE_
#define OCEANBASE_QUEUE_OB_PRIORITY_QUEUE_
#include "lib/lock/ob_seq_sem.h"
#include "lib/queue/ob_link_queue.h"
#include "lib/lock/ob_scond.h"
namespace oceanbase {
namespace common {
template <int PRIOS>
class ObPriorityQueue {
public:
enum { PRIO_CNT = PRIOS };
ObPriorityQueue() : sem_(), queue_(), size_(0), limit_(INT64_MAX)
{}
~ObPriorityQueue()
{}
void set_limit(int64_t limit)
{
limit_ = limit;
}
inline int64_t size() const
{
return ATOMIC_LOAD(&size_);
}
int push(ObLink* data, int priority)
{
int ret = OB_SUCCESS;
if (ATOMIC_FAA(&size_, 1) > limit_) {
ret = OB_SIZE_OVERFLOW;
} else if (OB_UNLIKELY(NULL == data) || OB_UNLIKELY(priority < 0) || OB_UNLIKELY(priority >= PRIO_CNT)) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(WARN, "push error, invalid argument", KP(data), K(priority));
} else if (OB_FAIL(queue_[priority].push(data))) {
// do nothing
} else {
ret = sem_.post();
}
if (OB_FAIL(ret)) {
(void)ATOMIC_FAA(&size_, -1);
}
return ret;
}
int push_front(ObLink* data, int priority)
{
int ret = OB_SUCCESS;
ATOMIC_FAA(&size_, 1);
if (OB_UNLIKELY(NULL == data) || OB_UNLIKELY(priority < 0) || OB_UNLIKELY(priority >= PRIO_CNT)) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(WARN, "push front error, invalid argument", KP(data), K(priority));
} else if (OB_FAIL(queue_[priority].push_front(data))) {
// do nothing
} else {
ret = sem_.post();
}
if (OB_FAIL(ret)) {
(void)ATOMIC_FAA(&size_, -1);
}
return ret;
}
int pop(ObLink*& data, int64_t timeout_us)
{
int ret = OB_ENTRY_NOT_EXIST;
if (OB_UNLIKELY(timeout_us < 0)) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(ERROR, "timeout is invalid", K(ret), K(timeout_us));
} else {
if (0 == sem_.wait(timeout_us)) {
for (int i = 0; OB_ENTRY_NOT_EXIST == ret && i < PRIO_CNT; i++) {
if (OB_SUCCESS == queue_[i].pop(data)) {
ret = OB_SUCCESS;
}
}
}
if (OB_FAIL(ret)) {
data = NULL;
} else {
(void)ATOMIC_FAA(&size_, -1);
}
}
return ret;
}
private:
ObSeqSem sem_;
ObLinkQueue queue_[PRIO_CNT];
int64_t size_ CACHE_ALIGNED;
int64_t limit_ CACHE_ALIGNED;
DISALLOW_COPY_AND_ASSIGN(ObPriorityQueue);
};
template <int HIGH_PRIOS, int LOW_PRIOS>
class ObPriorityQueue2 {
public:
enum { PRIO_CNT = HIGH_PRIOS + LOW_PRIOS };
ObPriorityQueue2() : queue_(), size_(0), limit_(INT64_MAX)
{}
~ObPriorityQueue2()
{}
void set_limit(int64_t limit)
{
limit_ = limit;
}
inline int64_t size() const
{
return ATOMIC_LOAD(&size_);
}
int64_t queue_size(const int i) const
{
return queue_[i].size();
}
int64_t to_string(char* buf, const int64_t buf_len) const
{
int64_t pos = 0;
common::databuff_printf(buf, buf_len, pos, "total_size=%ld ", size());
for (int i = 0; i < PRIO_CNT; i++) {
common::databuff_printf(buf, buf_len, pos, "queue[%d]=%ld ", i, queue_[i].size());
}
return pos;
}
int push(ObLink* data, int priority)
{
int ret = OB_SUCCESS;
if (ATOMIC_FAA(&size_, 1) > limit_) {
ret = OB_SIZE_OVERFLOW;
} else if (OB_UNLIKELY(NULL == data) || OB_UNLIKELY(priority < 0) || OB_UNLIKELY(priority >= PRIO_CNT)) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(WARN, "push error, invalid argument", KP(data), K(priority));
} else if (OB_FAIL(queue_[priority].push(data))) {
// do nothing
} else {
cond_.signal();
// if (priority < HIGH_PRIOS) {
// high_cond_.signal();
// }
}
if (OB_FAIL(ret)) {
(void)ATOMIC_FAA(&size_, -1);
}
return ret;
}
inline int do_pop(ObLink*& data, int64_t plimit, int64_t timeout_us)
{
int ret = OB_ENTRY_NOT_EXIST;
if (OB_UNLIKELY(timeout_us < 0)) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(ERROR, "timeout is invalid", K(ret), K(timeout_us));
} else {
cond_.prepare();
for (int i = 0; OB_ENTRY_NOT_EXIST == ret && i < plimit; i++) {
if (OB_SUCCESS == queue_[i].pop(data)) {
ret = OB_SUCCESS;
}
}
if (OB_FAIL(ret)) {
cond_.wait(timeout_us);
data = NULL;
} else {
(void)ATOMIC_FAA(&size_, -1);
}
}
return ret;
}
int pop(ObLink*& data, int64_t timeout_us)
{
return do_pop(data, PRIO_CNT, timeout_us);
}
int pop_high(ObLink*& data, int64_t timeout_us)
{
return do_pop(data, HIGH_PRIOS, timeout_us);
}
private:
SCond cond_;
ObLinkQueue queue_[PRIO_CNT];
int64_t size_ CACHE_ALIGNED;
int64_t limit_ CACHE_ALIGNED;
DISALLOW_COPY_AND_ASSIGN(ObPriorityQueue2);
};
} // end namespace common
} // end namespace oceanbase
#endif // OCEANBASE_QUEUE_OB_PRIORITY_QUEUE_
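
A minimal usage sketch for ObPriorityQueue2 (editor's illustration, not part of the commit; it assumes the header above is included, and the Req node, priority counts and timeout are hypothetical):

struct Req : public oceanbase::common::ObLink {
};

void priority_queue_example()
{
  using namespace oceanbase::common;
  ObPriorityQueue2<1, 3> queue;                       // 1 high-priority class plus 3 low ones
  Req req;
  (void)queue.push(&req, 0);                          // priority 0 belongs to the high class
  ObLink* link = NULL;
  if (OB_SUCCESS == queue.pop_high(link, 10000)) {    // scans only priorities below HIGH_PRIOS
    // handle the request
  }
}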

View File

@ -0,0 +1,176 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "lib/queue/ob_seq_queue.h"
namespace oceanbase {
namespace common {
ObSeqQueue::ObSeqQueue() : seq_(0), items_(NULL), limit_(0)
{}
ObSeqQueue::~ObSeqQueue()
{}
int ObSeqQueue::init(const int64_t limit, SeqItem* buf)
{
int ret = OB_SUCCESS;
if (0 >= limit) {
ret = OB_INVALID_ARGUMENT;
_OB_LOG(ERROR, "init(limit=%ld, buf=%p): INVALID_ARGUMENT", limit, buf);
} else if (limit_ > 0 || NULL != items_) {
ret = OB_INIT_TWICE;
} else if (NULL == (items_ = (buf ?: (SeqItem*)buf_holder_.get(sizeof(SeqItem) * limit)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
_OB_LOG(ERROR, "buf_holder.get(%ld)=>NULL", sizeof(SeqItem) * limit);
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < N_COND; ++i) {
if (OB_FAIL(cond_[i].init(ObWaitEventIds::SEQ_QUEUE_COND_WAIT))) {
_OB_LOG(ERROR, "fail to init cond, ret=%d,", ret);
}
}
if (OB_SUCC(ret)) {
limit_ = limit;
memset(items_, 0, sizeof(SeqItem) * limit);
for (int64_t i = 0; i < limit; i++) {
items_[i].seq_ = -1;
}
}
}
return ret;
}
bool ObSeqQueue::is_inited() const
{
return NULL != items_ && limit_ > 0;
}
int ObSeqQueue::start(const int64_t seq)
{
int err = OB_SUCCESS;
if (0 >= seq) {
err = OB_INVALID_ARGUMENT;
_OB_LOG(ERROR, "start(seq=%ld): INVALID_ARGUMENT", seq);
} else if (NULL == items_) {
err = OB_NOT_INIT;
} else if (seq_ > 0) {
err = OB_INIT_TWICE;
} else {
err = update(seq);
}
return err;
}
int64_t ObSeqQueue::get_seq()
{
return seq_;
}
ObThreadCond* ObSeqQueue::get_cond(const int64_t seq)
{
return cond_ + (seq % N_COND);
}
int ObSeqQueue::add(const int64_t seq, void* data)
{
int err = OB_SUCCESS;
SeqItem* pitem = NULL;
if (!is_inited()) {
err = OB_NOT_INIT;
} else if (seq_ <= 0) {
err = OB_NOT_INIT;
_OB_LOG(ERROR, "seq_[%ld] <= 0", seq_);
} else if (seq < seq_) {
err = OB_INVALID_ARGUMENT;
_OB_LOG(ERROR, "add(seq[%ld] < seq_[%ld]): INVALID_ARGUMEN", seq, seq_);
} else if (seq_ + limit_ <= seq) {
err = OB_EAGAIN;
} else if (seq <= (pitem = items_ + seq % limit_)->seq_) {
err = OB_ENTRY_EXIST;
_OB_LOG(ERROR, "add(seq=%ld): ENTRY_EXIST", seq);
} else if (!__sync_bool_compare_and_swap(&pitem->seq_, -1, -2)) {
err = OB_EAGAIN;
} else {
ObThreadCond* cond = get_cond(seq);
ObThreadCondGuard guard(*cond);
pitem->data_ = data;
__sync_synchronize();
pitem->seq_ = seq;
cond->signal();
}
return err;
}
bool ObSeqQueue::next_is_ready() const
{
int64_t seq = seq_;
return NULL != items_ && (items_ + seq % limit_)->seq_ == seq;
}
int ObSeqQueue::get(int64_t& seq, void*& data, const int64_t timeout_us)
{
int err = OB_EAGAIN;
SeqItem* pitem = NULL;
int64_t end_time_us = ::oceanbase::common::ObTimeUtility::current_time() + timeout_us;
int64_t wait_time_us = timeout_us;
int wait_time_ms = (int)(wait_time_us / 1000LL);
ObThreadCond* cond = NULL;
seq = seq_;
cond = get_cond(seq);
if (!is_inited()) {
err = OB_NOT_INIT;
} else if (seq_ < 0) {
err = OB_ERR_UNEXPECTED;
_OB_LOG(ERROR, "seq_[%ld] < 0", seq_);
} else {
ObThreadCondGuard guard(*cond);
while (OB_EAGAIN == err) {
if (seq != seq_) {
break;
}
if ((pitem = items_ + seq % limit_)->seq_ != seq) {
if ((wait_time_ms = (int)(wait_time_us / 1000LL)) <= 0) {
break;
} else {
(void)cond->wait(wait_time_ms);
wait_time_us = end_time_us - ::oceanbase::common::ObTimeUtility::current_time();
}
} else if (__sync_bool_compare_and_swap(&seq_, seq, seq + 1)) {
err = OB_SUCCESS;
}
}
}
if (OB_SUCCESS == err) {
pitem = items_ + seq % limit_;
data = pitem->data_;
__sync_synchronize();
pitem->seq_ = -1;
}
return err;
}
int ObSeqQueue::update(const int64_t seq)
{
int err = OB_SUCCESS;
if (seq < seq_) {
err = OB_DISCONTINUOUS_LOG;
_OB_LOG(ERROR, "seq[%ld] < seq_[%ld]", seq, seq_);
} else {
ObThreadCond* cond = get_cond(seq_);
ObThreadCondGuard guard(*cond);
seq_ = seq;
cond->signal();
}
return err;
}
}; // end namespace common
}; // end namespace oceanbase

84
deps/oblib/src/lib/queue/ob_seq_queue.h vendored Normal file
View File

@ -0,0 +1,84 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef __OB_COMMON_OB_SEQ_QUEUE_H__
#define __OB_COMMON_OB_SEQ_QUEUE_H__
#include "lib/allocator/ob_malloc.h"
#include "lib/lock/ob_thread_cond.h"
namespace oceanbase {
namespace common {
// Usage contract: multiple threads may add elements concurrently, but only one thread fetches them,
// and no two threads may add elements with the same sequence number at the same time.
class ObSeqQueue {
struct BufHolder {
BufHolder() : buf_(NULL)
{}
~BufHolder()
{
if (NULL != buf_) {
ob_free(buf_);
buf_ = NULL;
}
}
const void* get(const int64_t size)
{
void* buf = NULL;
ObMemAttr memattr;
memattr.label_ = ObModIds::OB_SEQ_QUEUE;
if (NULL != buf_) {
_OB_LOG(ERROR, "buf_holder.get(size=%ld): not allowed to get second time", size);
} else if (NULL == (buf = ob_malloc(size, memattr))) {
_OB_LOG(ERROR, "ob_malloc(size=%ld)=>NULL", size);
} else {
buf_ = buf;
}
return buf;
}
void* buf_;
};
static const int64_t N_COND = 256;
struct SeqItem {
volatile int64_t seq_;
void* volatile data_;
};
public:
ObSeqQueue();
~ObSeqQueue();
public:
int init(const int64_t limit, SeqItem* buf = NULL);
int start(const int64_t seq);
int add(const int64_t seq, void* data);
int get(int64_t& seq, void*& data, const int64_t timeout_us);
int update(const int64_t seq);
bool next_is_ready() const;
int64_t get_seq();
protected:
bool is_inited() const;
ObThreadCond* get_cond(const int64_t seq);
private:
BufHolder buf_holder_;
volatile int64_t seq_;
SeqItem* items_ CACHE_ALIGNED;
int64_t limit_;
ObThreadCond cond_[N_COND] CACHE_ALIGNED;
};
}; // end namespace common
}; // end namespace oceanbase
#endif /* __OB_COMMON_OB_SEQ_QUEUE_H__ */
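
A minimal usage sketch matching the contract described in the comment above (editor's illustration, not part of the commit; the capacity, sequence numbers and timeout are arbitrary):

#include "lib/queue/ob_seq_queue.h"

void seq_queue_example()
{
  using namespace oceanbase::common;
  ObSeqQueue queue;
  int payload = 0;                                    // hypothetical payload
  if (OB_SUCCESS == queue.init(1024) && OB_SUCCESS == queue.start(1)) {
    (void)queue.add(1, &payload);                     // producers use distinct sequence numbers
    int64_t seq = 0;
    void* data = NULL;
    // the single consumer receives items strictly in sequence order
    if (OB_SUCCESS == queue.get(seq, data, 1000000)) {
      // here seq == 1 and data == &payload
    }
  }
}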

92
deps/oblib/src/lib/queue/spin_queue.h vendored Normal file
View File

@ -0,0 +1,92 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef __OB_UPDATESERVER_SPIN_QUEUE_H__
#define __OB_UPDATESERVER_SPIN_QUEUE_H__
#include "lib/allocator/ob_malloc.h"
namespace oceanbase {
namespace common {
class SpinQueue {
public:
enum { MIN_QLEN = 4 };
public:
SpinQueue() : push_(0), pop_(0), pos_mask_(0), items_(NULL)
{}
~SpinQueue()
{
destroy();
}
int init(uint64_t len)
{
int err = 0;
// int64_t len = 1 << (_len? (64 - __builtin_clzl(_len)): 0);
ObMemAttr memattr(OB_SERVER_TENANT_ID, ObModIds::TEST);
if (len < MIN_QLEN || 0 != ((len - 1) & len)) {
err = OB_INVALID_ARGUMENT;
} else if (NULL == (items_ = (void**)ob_malloc(sizeof(void*) * len, memattr))) {
err = OB_ALLOCATE_MEMORY_FAILED;
} else {
pos_mask_ = len - 1;
memset(items_, 0, sizeof(void*) * len);
}
return err;
}
void destroy()
{
    if (NULL != items_) {
ob_free(items_);
items_ = NULL;
}
}
int push(void* data)
{
int err = 0;
uint64_t seq = __sync_fetch_and_add(&push_, 1);
void* volatile* pi = items_ + (seq & pos_mask_);
while (!__sync_bool_compare_and_swap(pi, NULL, data))
;
return err;
}
int pop(void*& data)
{
int err = 0;
uint64_t seq = __sync_fetch_and_add(&pop_, 1);
void* volatile* pi = items_ + (seq & pos_mask_);
while (NULL == (data = *pi) || !__sync_bool_compare_and_swap(pi, data, NULL))
;
return err;
}
bool inited() const
{
return NULL != items_;
}
int64_t get_total() const
{
return push_ - pop_;
}
int64_t get_free() const
{
return pop_ + pos_mask_ + 1 - push_;
}
private:
volatile uint64_t push_ CACHE_ALIGNED;
volatile uint64_t pop_ CACHE_ALIGNED;
uint64_t pos_mask_ CACHE_ALIGNED;
void** items_;
};
}; // namespace common
}; // end namespace oceanbase
#endif /* __OB_UPDATESERVER_SPIN_QUEUE_H__ */
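
A minimal usage sketch for SpinQueue (editor's illustration, not part of the commit; the include path is inferred from the file path above and the capacity is arbitrary, but it must be a power of two no smaller than MIN_QLEN). push() and pop() spin instead of failing, so they only suit situations where the other side is guaranteed to make progress:

#include "lib/queue/spin_queue.h"

void spin_queue_example()
{
  using namespace oceanbase::common;
  SpinQueue queue;
  int payload = 7;                       // hypothetical payload
  if (0 == queue.init(16)) {             // init() returns OB_INVALID_ARGUMENT for bad lengths
    (void)queue.push(&payload);          // spins while the target slot is still occupied
    void* data = NULL;
    (void)queue.pop(data);               // spins until an element becomes available
  }
}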