opt bucket limit and add DRWLock

This commit is contained in:
obdev 2024-12-03 15:15:04 +00:00 committed by ob-robot
parent 58a758dd0b
commit c0e0590870
7 changed files with 128 additions and 69 deletions

View File

@ -47,11 +47,13 @@ class ITCLimiter: public IQD
{
public:
enum { LIMIT_BALANCE_BOUND_MS = 10 };
ITCLimiter(int id, int type, const char* name): IQD(id, type, name), limit_per_sec_(INT64_MAX), due_ts_(0) {}
ITCLimiter(int id, int type, const char* name): IQD(id, type, name), limit_per_sec_(INT64_MAX), due_ts_(0), storage_key_(0) {}
ITCLimiter(int id, int type, const char* name, uint64_t storage_key): IQD(id, type, name), limit_per_sec_(INT64_MAX), due_ts_(0), storage_key_(storage_key) {}
virtual ~ITCLimiter() {}
virtual int64_t get_cost(TCRequest* req) = 0;
void set_limit_per_sec(int64_t limit) { limit_per_sec_ = limit; }
int64_t get_limit_per_sec() { return limit_per_sec_; }
uint64_t get_storage_key() { return storage_key_; }
int64_t inc_due_ns(TCRequest* req)
{
if (INT64_MAX == limit_per_sec_) {
@ -76,6 +78,7 @@ public:
protected:
int64_t limit_per_sec_;
int64_t due_ts_ CACHE_ALIGNED;
uint64_t storage_key_;
};
#include "ob_tc_limit.cpp"
@ -121,29 +124,27 @@ public:
}
}
} else {
for(int i = 0; i < MAX_LIMITER_COUNT && limiter_[i]; i++) {
for (int j = 0; (j < MAX_SHARED_DEVICE_LIMIT_COUNT); j++) {
if (req->ss_limiter_ids_[j] > 0 && req->ss_limiter_ids_[j] == limiter_[i]->get_id()) {
int64_t due_ns = limiter_[i]->get_due_ns();
if (max_ts < due_ns) {
max_ts = due_ns;
}
for (int i = 0; i < MAX_LIMITER_COUNT && limiter_[i]; i++) {
if (req->storage_key_ == limiter_[i]->get_storage_key()) {
int64_t due_ns = limiter_[i]->get_due_ns();
if (max_ts < due_ns) {
max_ts = due_ns;
}
}
}
}
return max_ts;
}
void inc_due_ns(TCRequest* req) {
void inc_due_ns(TCRequest *req)
{
limiter0_.inc_due_ns(req);
for(int i = 0; i < MAX_LIMITER_COUNT && limiter_[i]; i++) {
for (int j = 0; j < MAX_SHARED_DEVICE_LIMIT_COUNT; j++) {
if (req->ss_limiter_ids_[j] > 0 && req->ss_limiter_ids_[j] == limiter_[i]->get_id()) {
limiter_[i]->inc_due_ns(req);
}
for (int i = 0; i < MAX_LIMITER_COUNT && limiter_[i]; i++) {
if (req->storage_key_ == limiter_[i]->get_storage_key()) {
limiter_[i]->inc_due_ns(req);
}
}
}
private:
BytesLimiter limiter0_;
Limiter* limiter_[MAX_LIMITER_COUNT];

View File

@ -23,16 +23,15 @@ struct TCLink
struct TCRequest
{
TCRequest(): link_(NULL), qid_(-1), bytes_(0), start_ns_(0) {
memset(ss_limiter_ids_, -1, sizeof(ss_limiter_ids_));
TCRequest(): link_(NULL), qid_(-1), bytes_(0), start_ns_(0), storage_key_(0) {
}
TCRequest(int qid, int64_t bytes): link_(NULL), qid_(qid), bytes_(bytes), start_ns_(0) {}
TCRequest(int qid, int64_t bytes): link_(NULL), qid_(qid), bytes_(bytes), start_ns_(0), storage_key_(0) {}
~TCRequest() {}
TCLink link_;
int qid_;
int64_t bytes_;
int64_t start_ns_;
int ss_limiter_ids_[MAX_SHARED_DEVICE_LIMIT_COUNT];
uint64_t storage_key_;
};
class ITCHandler
@ -45,7 +44,7 @@ public:
enum QD_TYPE { QDISC_ROOT, QDISC_BUFFER_QUEUE, QDISC_WEIGHTED_QUEUE, QDISC_QUEUE_END, TCLIMIT_BYTES, TCLIMIT_COUNT };
int init_qdtable();
int tclimit_create(int type, const char* name);
int tclimit_create(int type, const char* name, uint64_t storage_key = 0);
void tclimit_destroy(int limiter_id);
int tclimit_set_limit(int limiter_id, int64_t limit);
int tclimit_get_limit(int limiter_id, int64_t &limit);

View File

@ -322,30 +322,31 @@ int qsched_submit(int qid, TCRequest* req, uint32_t chan_id)
return err;
}
ITCLimiter* tclimit_new(int id, int type, const char* name)
ITCLimiter* tclimit_new(int id, int type, const char* name, uint64_t storage_key = 0)
{
switch(type) {
case TCLIMIT_BYTES:
return new BytesLimiter(id, name);
return new BytesLimiter(id, name, storage_key);
case TCLIMIT_COUNT:
return new CountLimiter(id, name);
return new CountLimiter(id, name, storage_key);
break;
default:
abort();
}
}
int tclimit_create(int type, const char* name)
int tclimit_create(int type, const char* name, uint64_t storage_key)
{
QWGuard("tclimit_create");
int id = imap_lock();
if (id >= 0) {
ITCLimiter* limiter = tclimit_new(id, type, name);
ITCLimiter* limiter = tclimit_new(id, type, name, storage_key);
imap_set(id, limiter);
if (NULL == limiter) {
id = -1;
}
}
TC_INFO("tclimit create: type: %d, name: %s, storage_key: %lu, id: %d", type, name, storage_key, id);
return id;
}

View File

@ -13,7 +13,8 @@
class BytesLimiter: public ITCLimiter
{
public:
BytesLimiter(int id, const char* name): ITCLimiter(id, TCLIMIT_BYTES, name) {}
BytesLimiter(int id, const char* name): ITCLimiter(id, TCLIMIT_BYTES, name, 0) {}
BytesLimiter(int id, const char* name, uint64_t storage_key): ITCLimiter(id, TCLIMIT_BYTES, name, storage_key) {}
~BytesLimiter() {}
int64_t get_cost(TCRequest* req) { return req->bytes_; }
};
@ -22,6 +23,7 @@ class CountLimiter: public ITCLimiter
{
public:
CountLimiter(int id, const char* name): ITCLimiter(id, TCLIMIT_COUNT, name) {}
CountLimiter(int id, const char* name, uint64_t storage_key): ITCLimiter(id, TCLIMIT_COUNT, name, storage_key) {}
~CountLimiter() {}
int64_t get_cost(TCRequest* req) { return 1; }
};

View File

@ -107,9 +107,8 @@ int ObVirtualSharedStorageQuota::add_one_storage_batch_row()
{
GetLimitV2(obrpc::ObSharedDeviceResourceArray &limits) : limits_(limits)
{}
int operator()(
oceanbase::common::hash::HashMapPair<ObTrafficControl::ObStorageKey, ObTrafficControl::ObSharedDeviceControlV2>
&entry)
int operator()(oceanbase::common::hash::HashMapPair<ObTrafficControl::ObStorageKey,
ObTrafficControl::ObSharedDeviceControlV2 *> &entry)
{
int ret = OB_SUCCESS;
int idx_begin = limits_.array_.count();
@ -118,11 +117,12 @@ int ObVirtualSharedStorageQuota::add_one_storage_batch_row()
}
if (OB_UNLIKELY(idx_begin < 0)
|| OB_UNLIKELY(idx_begin + obrpc::ResourceType::ResourceTypeCnt > limits_.array_.count())) {
} else if (OB_UNLIKELY(OB_ISNULL(entry.second))) {
} else {
for (int i = 0; i < obrpc::ResourceType::ResourceTypeCnt; ++i) {
limits_.array_.at(idx_begin + i).key_ = entry.first;
limits_.array_.at(idx_begin + i).type_ = static_cast<obrpc::ResourceType>(i);
limits_.array_.at(idx_begin + i).value_ = entry.second.get_limit(static_cast<obrpc::ResourceType>(i));
limits_.array_.at(idx_begin + i).value_ = entry.second->get_limit(static_cast<obrpc::ResourceType>(i));
}
}
return ret;

View File

@ -145,18 +145,29 @@ int ObTrafficControl::ObSharedDeviceControlV2::init()
limits_[obrpc::ResourceType::ibw] = INT64_MAX / (16 * (1<<11));
limits_[obrpc::ResourceType::iobw] = INT64_MAX / (16 * (1<<10));
limits_[obrpc::ResourceType::tag] = INT64_MAX;
storage_key_ = ObStorageKey();
return ret;
}
int ObTrafficControl::ObSharedDeviceControlV2::destroy()
void ObTrafficControl::ObSharedDeviceControlV2::destroy()
{
int ret = OB_SUCCESS;
if (OB_FAIL(group_list_.clear())) {
LOG_WARN("clear map failed", K(ret));
} else {
for (int i = 0; i < static_cast<int>(obrpc::ResourceType::ResourceTypeCnt); i++) {
if (limit_ids_[i] < 0) {
LOG_WARN("invalid limit id failed", K(ret), K(limit_ids_[i]), K(i));
} else {
tclimit_destroy(limit_ids_[i]);
}
}
}
return ret;
}
int ObTrafficControl::ObSharedDeviceControlV2::set_storage_key(const ObTrafficControl::ObStorageKey &key)
{
return storage_key_.assign(key);
}
int ObTrafficControl::ObSharedDeviceControlV2::add_shared_device_limits()
{
int ret = OB_SUCCESS;
@ -164,19 +175,23 @@ int ObTrafficControl::ObSharedDeviceControlV2::add_shared_device_limits()
limit_ids_[static_cast<int>(ResourceType::ibw)] = tclimit_create(TCLIMIT_BYTES, get_resource_type_str(ResourceType::ibw));
limit_ids_[static_cast<int>(ResourceType::ops)] = tclimit_create(TCLIMIT_COUNT, get_resource_type_str(ResourceType::ops));
limit_ids_[static_cast<int>(ResourceType::obw)] = tclimit_create(TCLIMIT_BYTES, get_resource_type_str(ResourceType::obw));
LOG_INFO("add shared device limit success",
"ips_limit_id",
limit_ids_[static_cast<int>(ResourceType::ips)],
"ibw_limit_id",
limit_ids_[static_cast<int>(ResourceType::ibw)],
"ops_limit_id",
limit_ids_[static_cast<int>(ResourceType::ops)],
"obw_limit_id",
limit_ids_[static_cast<int>(ResourceType::obw)],
K(ret));
return ret;
}
int ObTrafficControl::ObSharedDeviceControlV2::fill_qsched_req_ss_limits(ObIORequest& req)
int ObTrafficControl::ObSharedDeviceControlV2::fill_qsched_req_storage_key(ObIORequest& req)
{
int ret = OB_SUCCESS;
if (ObIOMode::READ == req.get_mode()) {
req.qsched_req_.ss_limiter_ids_[0] = limit_ids_[static_cast<int>(ResourceType::ips)];
req.qsched_req_.ss_limiter_ids_[1] = limit_ids_[static_cast<int>(ResourceType::ibw)];
} else if (ObIOMode::WRITE == req.get_mode()) {
req.qsched_req_.ss_limiter_ids_[0] = limit_ids_[static_cast<int>(ResourceType::ops)];
req.qsched_req_.ss_limiter_ids_[1] = limit_ids_[static_cast<int>(ResourceType::obw)];
}
req.qsched_req_.storage_key_ = this->storage_key_.hash();
return ret;
}
@ -430,7 +445,7 @@ void ObTrafficControl::print_bucket_status_V2()
int64_t &tag_;
};
PrinterFn(const hash::ObHashMap<ObIORecordKey, ObSharedDeviceIORecord> &map) : map_(map) {}
int operator () (oceanbase::common::hash::HashMapPair<ObStorageKey, ObSharedDeviceControlV2> &entry) {
int operator () (oceanbase::common::hash::HashMapPair<ObStorageKey, ObSharedDeviceControlV2*> &entry) {
int64_t bw_in = 0;
int64_t bw_out = 0;
int64_t req_in = 0;
@ -438,19 +453,20 @@ void ObTrafficControl::print_bucket_status_V2()
int64_t tag = 0;
CalFn fn(entry.first, bw_in, bw_out, req_in, req_out, tag);
map_.foreach_refactored(fn);
if (bw_in || bw_out || req_in || req_out || tag) {
if (OB_UNLIKELY(OB_ISNULL(entry.second))) {
} else if (bw_in || bw_out || req_in || req_out || tag) {
_LOG_INFO("[IO STATUS BUCKET] storage={%u, %ld, %ld}, in=[%ld / %ld]kB/s, out=[%ld / %ld]kB/s, ips=[%ld / %ld], ops=[%ld / %ld]",
entry.first.get_category(),
entry.first.get_tenant_id(),
entry.first.get_storage_id(),
bw_in / 1024,
entry.second.limits_[static_cast<int>(ResourceType::ibw)] / 1024,
entry.second->limits_[static_cast<int>(ResourceType::ibw)] / 1024,
bw_out / 1024,
entry.second.limits_[static_cast<int>(ResourceType::obw)] / 1024,
entry.second->limits_[static_cast<int>(ResourceType::obw)] / 1024,
req_in,
entry.second.limits_[static_cast<int>(ResourceType::ips)],
entry.second->limits_[static_cast<int>(ResourceType::ips)],
req_out,
entry.second.limits_[static_cast<int>(ResourceType::ops)]);
entry.second->limits_[static_cast<int>(ResourceType::ops)]);
}
return OB_SUCCESS;
}
@ -459,6 +475,7 @@ void ObTrafficControl::print_bucket_status_V2()
PrinterFn fn(io_record_map_);
shared_device_map_v2_.foreach_refactored(fn);
}
int ObTrafficControl::set_limit(const obrpc::ObSharedDeviceResourceArray &limit)
{
int ret = OB_SUCCESS;
@ -483,9 +500,10 @@ int ObTrafficControl::set_limit_v2(const obrpc::ObSharedDeviceResourceArray &lim
ObSharedDeviceControlV2 *tc = nullptr;
if (ResourceType::tag == limit.array_.at(i).type_) {
// Tag is currently unavailable
} else if (OB_ISNULL(tc = shared_device_map_v2_.get(limit.array_.at(i).key_))) {
// ignore ret
} else if (OB_FAIL(shared_device_map_v2_.get_refactored(limit.array_.at(i).key_, tc))) {
LOG_WARN_RET(OB_HASH_NOT_EXIST, "get index from map failed", K(limit.array_.at(i).key_));
} else if (OB_UNLIKELY(OB_ISNULL(tc))) {
LOG_WARN_RET(OB_HASH_NOT_EXIST, "tc is not exist", K(limit.array_.at(i).key_));
} else if (OB_SUCCESS != (tc->update_limit(limit.array_.at(i)))) {
LOG_WARN("update shared device limit failed", K(ret), K(i), K(limit.array_.at(i)));
}
@ -523,26 +541,40 @@ int ObTrafficControl::register_bucket(ObIORequest &req, const int qid) {
ObIOSSGrpKey grp_key(req.tenant_id_, req.get_group_key());
ObSharedDeviceControlV2 *tc = nullptr;
// global register bucket
if (OB_NOT_NULL(tc = shared_device_map_v2_.get(key))) {
} else if (OB_FAIL(shared_device_map_v2_.set_refactored(key, ObSharedDeviceControlV2())) && OB_HASH_EXIST != ret) {
LOG_WARN("set map failed", K(ret));
} else if (OB_ISNULL(tc = shared_device_map_v2_.get(key))) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("hash is not existed", K(ret), K(key));
} else if (OB_FAIL(tc->add_shared_device_limits())) {
LOG_WARN("add shared device limits failed", K(ret), K(req), K(grp_key), K(qid));
if (OB_SUCCESS == shared_device_map_v2_.get_refactored(key, tc)) {
} else {
DRWLock::WRLockGuard guard(rw_lock_);
int tmp_ret = OB_SUCCESS;
if (OB_UNLIKELY(OB_SUCCESS == shared_device_map_v2_.get_refactored(key, tc))) {
} else if (OB_ISNULL(tc = OB_NEW(ObSharedDeviceControlV2, "SDCtrlV2"))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
} else if (OB_FAIL(tc->set_storage_key(key))) {
} else if (OB_FAIL(tc->add_shared_device_limits())) {
LOG_WARN("add shared device limits failed", K(ret), K(req), K(grp_key), K(qid));
} else if (OB_FAIL(shared_device_map_v2_.set_refactored(key, tc))) {
LOG_WARN("set map failed", K(ret));
}
}
// register bucket for group
if (REACH_TIME_INTERVAL(100 * 1000)) {
if (OB_HASH_NOT_EXIST != tc->is_group_key_exist(grp_key)) {
} else if (OB_FAIL(tc->add_group(grp_key, qid))) {
LOG_WARN("add shared device limits failed", K(ret), K(grp_key), K(qid));
if (OB_FAIL(ret)) {
} else if (OB_ISNULL(tc)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("tc is not exist", K(ret), K(tc), K(qid), K(req));
} else if (OB_SUCCESS == tc->is_group_key_exist(grp_key)) {
} else {
DRWLock::WRLockGuard guard(rw_lock_);
if (OB_SUCCESS == tc->is_group_key_exist(grp_key)) {
} else if (OB_FAIL(tc->add_group(grp_key, qid))) {
LOG_WARN("add shared device limits failed", K(ret), K(grp_key), K(qid));
}
}
}
(void)tc->fill_qsched_req_ss_limits(req);
if (OB_NOT_NULL(tc)) {
(void)tc->fill_qsched_req_storage_key(req);
}
}
return ret;
}
@ -596,6 +628,7 @@ int ObTrafficControl::gc_tenant_infos()
{
int ret = OB_SUCCESS;
if (REACH_TIME_INTERVAL(1 * 60 * 1000L * 1000L)) { // 60s
DRWLock::WRLockGuard guard(rw_lock_);
struct GCTenantSharedDeviceInfos
{
GCTenantSharedDeviceInfos(
@ -624,7 +657,7 @@ int ObTrafficControl::gc_tenant_infos()
const ObVector<uint64_t> &tenant_ids, ObSEArray<ObTrafficControl::ObStorageKey, 7> &gc_tenant_infos)
: tenant_ids_(tenant_ids), gc_tenant_infos_(gc_tenant_infos)
{}
int operator()(hash::HashMapPair<ObTrafficControl::ObStorageKey, ObTrafficControl::ObSharedDeviceControlV2> &pair)
int operator()(hash::HashMapPair<ObTrafficControl::ObStorageKey, ObTrafficControl::ObSharedDeviceControlV2 *> &pair)
{
bool is_find = false;
for (int i = 0; !is_find && i < tenant_ids_.size(); ++i) {
@ -681,18 +714,31 @@ int ObTrafficControl::gc_tenant_infos()
for (int i = 0; i < gc_tenant_record_infos.count(); ++i) {
if (OB_SUCCESS != io_record_map_.erase_refactored(gc_tenant_record_infos.at(i))) {
LOG_WARN("SSNT:failed to erase gc tenant record infos", K(ret), K(gc_tenant_record_infos.at(i)));
} else {
LOG_INFO("SSNT:erase gc tenant record infos", K(ret), K(gc_tenant_record_infos.at(i)));
}
}
for (int i = 0; i < gc_tenant_shared_device_infos.count(); ++i) {
if (OB_SUCCESS != shared_device_map_.erase_refactored(gc_tenant_shared_device_infos.at(i))) {
LOG_WARN(
"SSNT:failed to erase gc tenant shared device infos", K(ret), K(gc_tenant_shared_device_infos.at(i)));
} else {
LOG_INFO("SSNT:erase gc tenant shared device infos", K(ret), K(gc_tenant_shared_device_infos.at(i)));
}
}
for (int i = 0; i < gc_tenant_shared_device_infos_v2.count(); ++i) {
if (OB_SUCCESS != shared_device_map_v2_.erase_refactored(gc_tenant_shared_device_infos_v2.at(i))) {
LOG_WARN(
"SSNT:failed to erase gc tenant shared device infos", K(ret), K(gc_tenant_shared_device_infos_v2.at(i)));
int tmp_ret = OB_SUCCESS;
ObTrafficControl::ObSharedDeviceControlV2 **val_ptr = nullptr;
if (OB_TMP_FAIL(shared_device_map_v2_.erase_refactored(gc_tenant_shared_device_infos_v2.at(i), val_ptr))) {
LOG_WARN("SSNT:failed to erase gc tenant shared device infos", K(tmp_ret), K(gc_tenant_shared_device_infos_v2.at(i)), K(val_ptr));
} else if (OB_ISNULL(val_ptr)) {
tmp_ret = OB_ERR_UNEXPECTED;
LOG_ERROR("SSNT:failed to erase gc tenant shared device infos", K(tmp_ret), K(gc_tenant_shared_device_infos_v2.at(i)), K(val_ptr));
} else if (FALSE_IT((*val_ptr)->destroy())) {
LOG_WARN("SSNT:failed to destroy shared device control", K(tmp_ret), K(gc_tenant_shared_device_infos_v2.at(i)), K(val_ptr));
} else if (FALSE_IT(ob_delete(*val_ptr))) {
} else {
LOG_INFO("SSNT:erase gc tenant shared device infos succ", K(ret), K(tmp_ret), K(gc_tenant_shared_device_infos_v2.at(i)), K(val_ptr));
}
}
}

View File

@ -93,6 +93,13 @@ public:
tenant_id_ = gen_user_tenant_id(tenant_id_);
}
}
int assign(const ObTrafficControl::ObStorageKey &other)
{
storage_id_ = other.storage_id_;
tenant_id_ = other.tenant_id_;
category_ = other.category_;
return OB_SUCCESS;
}
uint64_t hash() const
{
return (storage_id_ << 48) ^ (tenant_id_ << 32) ^ ((uint64_t)category_ << 16);
@ -247,17 +254,19 @@ public:
TO_STRING_KV(K(grp_list_));
};
ObSharedDeviceControlV2();
ObSharedDeviceControlV2(const ObStorageKey &key);
~ObSharedDeviceControlV2();
int init();
int destroy();
void destroy();
int set_storage_key(const ObTrafficControl::ObStorageKey &key);
int add_shared_device_limits();
int fill_qsched_req_ss_limits(ObIORequest& req);
int fill_qsched_req_storage_key(ObIORequest& req);
int add_group(const ObIOSSGrpKey &grp_key, const int qid);
int is_group_key_exist(const ObIOSSGrpKey &grp_key);
int64_t get_limit(const obrpc::ResourceType type) const;
int update_limit(const obrpc::ObSharedDeviceResource &limit);
int64_t to_string(char* buf, const int64_t buf_len) const;
ObStorageKey storage_key_;
// limit and limit_ids: ops = 0, ips = 1, iops = 2, obw = 3, ibw = 4, iobw = 5, tag = 6
int64_t limits_[static_cast<int>(obrpc::ResourceType::ResourceTypeCnt)];
int limit_ids_[static_cast<int>(obrpc::ResourceType::ResourceTypeCnt)];
@ -293,7 +302,7 @@ private:
private:
// for device limitation
hash::ObHashMap<ObStorageKey, ObSharedDeviceControl> shared_device_map_;
hash::ObHashMap<ObStorageKey, ObSharedDeviceControlV2> shared_device_map_v2_;
hash::ObHashMap<ObStorageKey, ObSharedDeviceControlV2*> shared_device_map_v2_;
// for diagnose
hash::ObHashMap<ObIORecordKey, ObSharedDeviceIORecord> io_record_map_;
// maybe different key between limitation and diagnose later
@ -307,6 +316,7 @@ private:
ObAtomIOClock ibw_clock_;
ObAtomIOClock obw_clock_;
int64_t device_bandwidth_;
DRWLock rw_lock_;
};
class ObIOManager final