[CP] add pre_check when set group_io_config && fix same io_timestamp in schedule_heap

This commit is contained in:
renju96
2023-11-22 05:45:02 +00:00
committed by ob-robot
parent 3648cbf52b
commit 54bf7798c8
10 changed files with 157 additions and 59 deletions

View File

@ -137,6 +137,7 @@ public:
int refresh(const bool only_refresh, const ObIArray<ObIOBenchResult> &items); int refresh(const bool only_refresh, const ObIArray<ObIOBenchResult> &items);
int execute_benchmark(); int execute_benchmark();
int get_benchmark_status(int64_t &start_ts, int64_t &finish_ts, int &ret_code); int get_benchmark_status(int64_t &start_ts, int64_t &finish_ts, int &ret_code);
bool is_valid() { return io_ability_.is_valid(); }
private: private:
ObIOCalibration(); ObIOCalibration();
~ObIOCalibration(); ~ObIOCalibration();

View File

@ -384,6 +384,7 @@ private:
ObCurTraceId::TraceId trace_id_; ObCurTraceId::TraceId trace_id_;
}; };
typedef common::ObDList<ObIORequest> IOReqList;
class ObPhyQueue final class ObPhyQueue final
{ {
public: public:
@ -397,7 +398,6 @@ public:
void set_stop_accept() { stop_accept_ = true; } void set_stop_accept() { stop_accept_ = true; }
bool reach_adjust_interval(); bool reach_adjust_interval();
public: public:
typedef common::ObDList<ObIORequest> IOReqList;
TO_STRING_KV(K_(reservation_ts), K_(group_limitation_ts), K_(tenant_limitation_ts), TO_STRING_KV(K_(reservation_ts), K_(group_limitation_ts), K_(tenant_limitation_ts),
K_(stop_accept), K_(last_empty_ts)); K_(stop_accept), K_(last_empty_ts));
bool is_inited_; bool is_inited_;
@ -533,29 +533,30 @@ public:
int64_t &proportion_ts); int64_t &proportion_ts);
int push_phyqueue(ObPhyQueue *phy_queue); int push_phyqueue(ObPhyQueue *phy_queue);
int pop_phyqueue(ObIORequest *&req, int64_t &deadline_ts); int pop_phyqueue(ObIORequest *&req, int64_t &deadline_ts);
TO_STRING_KV(K(is_inited_)); TO_STRING_KV(K(is_inited_), K(r_heap_.count()), K(gl_heap_.count()), K(tl_heap_.count()), K(ready_heap_.count()));
int remove_from_heap(ObPhyQueue *phy_queue); int remove_from_heap(ObPhyQueue *phy_queue);
private: private:
int pop_with_ready_queue(const int64_t current_ts, ObIORequest *&req, int64_t &deadline_ts); int pop_with_ready_queue(const int64_t current_ts, ObIORequest *&req, int64_t &deadline_ts);
template<typename T, int64_t T::*member> template<typename T, int64_t T::*member_ts, IOReqList T::*list>
struct HeapCompare { struct HeapCompare {
int get_error_code() { return OB_SUCCESS; } int get_error_code() { return OB_SUCCESS; }
bool operator() (const T *left, const T *right) const { bool operator() (const T *left, const T *right) const {
return left->*member != right->*member ? left->*member > right->*member: (int64_t)left > (int64_t)right; return left->*member_ts != right->*member_ts ? left->*member_ts > right->*member_ts :
(left->*list).get_size() != (right->*list).get_size() ? (left->*list).get_size() < (right->*list).get_size() : (int64_t)left > (int64_t)right;
} }
}; };
private: private:
bool is_inited_; bool is_inited_;
HeapCompare<ObPhyQueue, &ObPhyQueue::reservation_ts_> r_cmp_; HeapCompare<ObPhyQueue, &ObPhyQueue::reservation_ts_, &ObPhyQueue::req_list_> r_cmp_;
HeapCompare<ObPhyQueue, &ObPhyQueue::group_limitation_ts_> gl_cmp_; HeapCompare<ObPhyQueue, &ObPhyQueue::group_limitation_ts_, &ObPhyQueue::req_list_> gl_cmp_;
HeapCompare<ObPhyQueue, &ObPhyQueue::tenant_limitation_ts_> tl_cmp_; HeapCompare<ObPhyQueue, &ObPhyQueue::tenant_limitation_ts_, &ObPhyQueue::req_list_> tl_cmp_;
HeapCompare<ObPhyQueue, &ObPhyQueue::proportion_ts_> p_cmp_; HeapCompare<ObPhyQueue, &ObPhyQueue::proportion_ts_, &ObPhyQueue::req_list_> p_cmp_;
ObRemovableHeap<ObPhyQueue *, HeapCompare<ObPhyQueue, &ObPhyQueue::reservation_ts_>, &ObPhyQueue::reservation_pos_> r_heap_; ObRemovableHeap<ObPhyQueue *, HeapCompare<ObPhyQueue, &ObPhyQueue::reservation_ts_, &ObPhyQueue::req_list_>, &ObPhyQueue::reservation_pos_> r_heap_;
ObRemovableHeap<ObPhyQueue *, HeapCompare<ObPhyQueue, &ObPhyQueue::group_limitation_ts_>, &ObPhyQueue::group_limitation_pos_> gl_heap_; ObRemovableHeap<ObPhyQueue *, HeapCompare<ObPhyQueue, &ObPhyQueue::group_limitation_ts_, &ObPhyQueue::req_list_>, &ObPhyQueue::group_limitation_pos_> gl_heap_;
ObRemovableHeap<ObPhyQueue *, HeapCompare<ObPhyQueue, &ObPhyQueue::tenant_limitation_ts_>, &ObPhyQueue::tenant_limitation_pos_> tl_heap_; ObRemovableHeap<ObPhyQueue *,HeapCompare<ObPhyQueue, &ObPhyQueue::tenant_limitation_ts_, &ObPhyQueue::req_list_>, &ObPhyQueue::tenant_limitation_pos_> tl_heap_;
ObRemovableHeap<ObPhyQueue *, HeapCompare<ObPhyQueue, &ObPhyQueue::proportion_ts_>, &ObPhyQueue::proportion_pos_> ready_heap_; ObRemovableHeap<ObPhyQueue *, HeapCompare<ObPhyQueue, &ObPhyQueue::proportion_ts_, &ObPhyQueue::req_list_>, &ObPhyQueue::proportion_pos_> ready_heap_;
}; };
template <typename T> template <typename T>

View File

@ -1042,6 +1042,7 @@ int ObTenantIOManager::update_basic_io_config(const ObTenantIOConfig &io_config)
LOG_WARN("update io clock unit config failed", K(ret), K(io_config), K(io_config_)); LOG_WARN("update io clock unit config failed", K(ret), K(io_config), K(io_config_));
} else { } else {
io_config_.unit_config_ = io_config.unit_config_; io_config_.unit_config_ = io_config.unit_config_;
LOG_INFO("update io clock success", K(tenant_id_), K(io_config_), KPC(io_clock_));
} }
} }
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
@ -1446,9 +1447,8 @@ int ObTenantIOManager::delete_consumer_group_config(const int64_t group_id)
LOG_WARN("stop phy queues failed", K(ret), K(tenant_id_), K(index)); LOG_WARN("stop phy queues failed", K(ret), K(tenant_id_), K(index));
} }
} else if (OB_STATE_NOT_MATCH == ret) { } else if (OB_STATE_NOT_MATCH == ret) {
// group delete twice // group delete twice, maybe deleted by delete_directive or delete_plan
ret = OB_ERR_UNEXPECTED; LOG_INFO("group delete twice", K(ret), K(index), K(group_id));
LOG_WARN("group delete twice", K(ret), K(index), K(group_id));
} else { } else {
LOG_WARN("get index from map failed", K(ret), K(group_id), K(index)); LOG_WARN("get index from map failed", K(ret), K(group_id), K(index));
} }
@ -1627,7 +1627,7 @@ void ObTenantIOManager::print_io_status()
if (OB_FAIL(callback_mgr_.get_queue_count(queue_count_array))) { if (OB_FAIL(callback_mgr_.get_queue_count(queue_count_array))) {
LOG_WARN("get callback queue count failed", K(ret)); LOG_WARN("get callback queue count failed", K(ret));
} }
LOG_INFO("[IO STATUS]", K_(tenant_id), K_(ref_cnt), K_(io_config), LOG_INFO("[IO STATUS CONFIG]", K_(tenant_id), K_(ref_cnt), K_(io_config),
"allocated_memory", io_allocator_.get_allocated_size(), "allocated_memory", io_allocator_.get_allocated_size(),
"free_request_count", io_request_pool_.get_free_cnt(), "free_request_count", io_request_pool_.get_free_cnt(),
"free_result_count", io_result_pool_.get_free_cnt(), "free_result_count", io_result_pool_.get_free_cnt(),

View File

@ -852,7 +852,7 @@ void ObIOTuner::print_sender_status()
ret = sender->get_sender_info(reservation_ts, group_limitation_ts, tenant_limitation_ts, proportion_ts); ret = sender->get_sender_info(reservation_ts, group_limitation_ts, tenant_limitation_ts, proportion_ts);
if (OB_NOT_INIT != ret) { if (OB_NOT_INIT != ret) {
LOG_INFO("[IO SENDER STATUS]", "send_index", sender->sender_index_, "req_count", sender->get_queue_count(), LOG_INFO("[IO STATUS SENDER]", "send_index", sender->sender_index_, "req_count", sender->get_queue_count(),
K(reservation_ts), K(group_limitation_ts), K(tenant_limitation_ts), K(proportion_ts)); K(reservation_ts), K(group_limitation_ts), K(tenant_limitation_ts), K(proportion_ts));
} }
} }

View File

@ -25,11 +25,13 @@
#include "share/resource_manager/ob_resource_manager.h" #include "share/resource_manager/ob_resource_manager.h"
#include "share/inner_table/ob_inner_table_schema_constants.h" #include "share/inner_table/ob_inner_table_schema_constants.h"
#include "share/resource_manager/ob_resource_mapping_rule_manager.h" #include "share/resource_manager/ob_resource_mapping_rule_manager.h"
#include "share/io/ob_io_manager.h"
#include "common/ob_timeout_ctx.h" #include "common/ob_timeout_ctx.h"
#include "observer/ob_sql_client_decorator.h" #include "observer/ob_sql_client_decorator.h"
#include "observer/ob_server_struct.h" #include "observer/ob_server_struct.h"
#include "sql/session/ob_sql_session_info.h" #include "sql/session/ob_sql_session_info.h"
#include "lib/utility/ob_fast_convert.h" #include "lib/utility/ob_fast_convert.h"
#include "observer/ob_server.h"
using namespace oceanbase::common; using namespace oceanbase::common;
using namespace oceanbase::common::sqlclient; using namespace oceanbase::common::sqlclient;
@ -653,7 +655,8 @@ int ObResourceManagerProxy::check_if_function_exist(const ObString &function_nam
0 == function_name.compare("COMPACTION_LOW") || 0 == function_name.compare("COMPACTION_LOW") ||
0 == function_name.compare("HA_LOW") || 0 == function_name.compare("HA_LOW") ||
0 == function_name.compare("DDL_HIGH") || 0 == function_name.compare("DDL_HIGH") ||
0 == function_name.compare("DDL")) { 0 == function_name.compare("DDL") ||
0 == function_name.compare("OTHER_BACKGROUND")) {
exist = true; exist = true;
} else { } else {
exist = false; exist = false;
@ -817,33 +820,92 @@ int ObResourceManagerProxy::check_iops_validity(
} else if (iops_maximum < iops_minimum) { } else if (iops_maximum < iops_minimum) {
// precheck // precheck
valid = false; valid = false;
} else if (iops_minimum == 0 && iops_maximum == 0) {
ret = OB_INVALID_CONFIG;
LOG_USER_ERROR(OB_INVALID_CONFIG, "io request cannot schedule with this config");
} else { } else {
ObSEArray<ObPlanDirective, 8> directives; //step 1: check io calibration status
if (OB_FAIL(get_all_plan_directives(tenant_id, plan_name, directives))) { if (!ObIOCalibration::get_instance().is_valid()) {
LOG_WARN("fail get plan directive", K(tenant_id), K(plan_name), K(ret)); valid = false;
ret = OB_INVALID_CONFIG;
LOG_WARN("not run io_calibration yet", K(ret));
LOG_USER_ERROR(OB_INVALID_CONFIG, "not run io_calibration yet");
} else { } else {
uint64_t total_min = 0; //step 2: check unit_config.min_iops
for (int64_t i = 0; OB_SUCC(ret) && i < directives.count(); ++i) { int64_t iops_16k = 0;
ObPlanDirective &cur_directive = directives.at(i); sqlclient::ObMySQLResult *result = nullptr;
if (OB_UNLIKELY(!is_user_group(cur_directive.group_id_))) { SMART_VAR(ObMySQLProxy::MySQLResult, res) {
ObSqlString sql_string;
char ip_str[INET6_ADDRSTRLEN] = { 0 };
const ObAddr &self_addr = OBSERVER.get_self();
if (OB_UNLIKELY(!self_addr.ip_to_string(ip_str, sizeof(ip_str)))) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected group id", K(cur_directive)); LOG_WARN("get self ip string failed", K(ret));
} else if (OB_UNLIKELY(!cur_directive.is_valid())) { } else if (OB_FAIL(sql_string.append_fmt(
ret = OB_INVALID_CONFIG; "SELECT iops FROM %s WHERE svr_ip = \"%s\" AND svr_port = %d AND mode = 'READ' AND size = 16384 AND storage_name = \"DATA\"",
LOG_WARN("invalid group io config", K(cur_directive)); share::OB_ALL_DISK_IO_CALIBRATION_TNAME, ip_str, self_addr.get_port()))) {
} else if ((0 == group.compare(cur_directive.group_name_.get_value()))) { LOG_WARN("generate sql string failed", K(ret), K(self_addr));
//skip cur group } else if (OB_FAIL(OBSERVER.get_mysql_proxy().read(res, sql_string.ptr()))) {
LOG_WARN("query failed", K(ret), K(sql_string));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("result is null", K(ret), KP(result));
} else { } else {
total_min += cur_directive.min_iops_; if (OB_FAIL(result->next())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to read", K(ret), K(sql_string));
} else {
ObIOBenchResult item;
ObString mode_string;
EXTRACT_INT_FIELD_MYSQL(*result, "iops", iops_16k, int64_t);
}
} }
} }
if(OB_SUCC(ret)) { if (OB_SUCC(ret)) {
total_min += iops_minimum; ObRefHolder<ObTenantIOManager> tenant_holder;
if (total_min > 100) { if (OB_FAIL(OB_IO_MANAGER.get_tenant_io_manager(tenant_id, tenant_holder))) {
valid = false; LOG_WARN("get tenant io manager failed", K(ret), K(tenant_id));
LOG_WARN("invalid group io config", K(total_min), K(iops_minimum), K(iops_maximum), K(plan_name));
} else { } else {
valid = true; if (iops_minimum != 0 && iops_minimum / 100 * (tenant_holder.get_ptr()->get_io_config().unit_config_.min_iops_) > iops_16k * 10) {
valid = false;
ret = OB_INVALID_CONFIG;
LOG_WARN("unit_config.min_iops is too big, iops isolation may not work", K(ret), K(iops_minimum), K(iops_16k));
LOG_USER_ERROR(OB_INVALID_CONFIG, "unit_config.min_iops is too big, iops isolation may not work");
}
}
}
}
//step 3: check min/max iops
if (OB_SUCC(ret)) {
ObSEArray<ObPlanDirective, 8> directives;
if (OB_FAIL(get_all_plan_directives(tenant_id, plan_name, directives))) {
LOG_WARN("fail get plan directive", K(tenant_id), K(plan_name), K(ret));
} else {
uint64_t total_min = 0;
for (int64_t i = 0; OB_SUCC(ret) && i < directives.count(); ++i) {
ObPlanDirective &cur_directive = directives.at(i);
if (OB_UNLIKELY(!is_user_group(cur_directive.group_id_))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected group id", K(cur_directive));
} else if (OB_UNLIKELY(!cur_directive.is_valid())) {
ret = OB_INVALID_CONFIG;
LOG_WARN("invalid group io config", K(cur_directive));
} else if ((0 == group.compare(cur_directive.group_name_.get_value()))) {
//skip cur group
} else {
total_min += cur_directive.min_iops_;
}
}
if(OB_SUCC(ret)) {
total_min += iops_minimum;
if (total_min > 100) {
valid = false;
ret = OB_INVALID_CONFIG;
LOG_USER_ERROR(OB_INVALID_CONFIG, "invalid config, sum min_iops > 100");
LOG_WARN("invalid group io config", K(ret), K(total_min), K(iops_minimum), K(iops_maximum), K(plan_name));
} else {
valid = true;
}
} }
} }
} }

View File

@ -45,6 +45,9 @@ ObString oceanbase::share::get_io_function_name(ObFunctionType function_type)
case ObFunctionType::PRIO_DDL_HIGH: case ObFunctionType::PRIO_DDL_HIGH:
ret_name = ObString("DDL_HIGH"); ret_name = ObString("DDL_HIGH");
break; break;
case ObFunctionType::PRIO_OTHER_BACKGROUND:
ret_name = ObString("OTHER_BACKGROUND");
break;
default: default:
ret_name = ObString("OTHER_GROUPS"); ret_name = ObString("OTHER_GROUPS");
break; break;

View File

@ -36,6 +36,7 @@ enum ObFunctionType
PRIO_HA_LOW = 5, PRIO_HA_LOW = 5,
PRIO_DDL = 6, PRIO_DDL = 6,
PRIO_DDL_HIGH = 7, PRIO_DDL_HIGH = 7,
PRIO_OTHER_BACKGROUND = 8,
MAX_FUNCTION_NUM MAX_FUNCTION_NUM
}; };

View File

@ -1619,26 +1619,22 @@ void ObTenantDagWorker::resume()
int ObTenantDagWorker::set_dag_resource(const uint64_t group_id) int ObTenantDagWorker::set_dag_resource(const uint64_t group_id)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_ISNULL(GCTX.cgroup_ctrl_)) { uint64_t consumer_group_id = 0;
//cgroup not init, cannot bind thread and control resource if (group_id != 0) {
} else { //user level
uint64_t consumer_group_id = 0; consumer_group_id = group_id;
if (group_id != 0) { } else if (OB_FAIL(G_RES_MGR.get_mapping_rule_mgr().get_group_id_by_function_type(MTL_ID(), function_type_, consumer_group_id))) {
//user level //function level
consumer_group_id = group_id; LOG_WARN("fail to get group id by function", K(ret), K(MTL_ID()), K(function_type_), K(consumer_group_id));
} else if (OB_FAIL(G_RES_MGR.get_mapping_rule_mgr().get_group_id_by_function_type(MTL_ID(), function_type_, consumer_group_id))) { }
//function level if (OB_SUCC(ret) && consumer_group_id != group_id_) {
LOG_WARN("fail to get group id by function", K(ret), K(MTL_ID()), K(function_type_), K(consumer_group_id)); // for CPU isolation, depend on cgroup
} if (OB_NOT_NULL(GCTX.cgroup_ctrl_) && GCTX.cgroup_ctrl_->is_valid() && OB_FAIL(GCTX.cgroup_ctrl_->add_self_to_group(MTL_ID(), consumer_group_id))) {
if (OB_SUCC(ret) && consumer_group_id != group_id_) { LOG_WARN("bind back thread to group failed", K(ret), K(GETTID()), K(MTL_ID()), K(group_id));
// for CPU isolation, depend on cgroup } else {
if (GCTX.cgroup_ctrl_->is_valid() && OB_FAIL(GCTX.cgroup_ctrl_->add_self_to_group(MTL_ID(), consumer_group_id))) { // for IOPS isolation, only depend on consumer_group_id
LOG_WARN("bind back thread to group failed", K(ret), K(GETTID()), K(MTL_ID()), K(group_id)); ATOMIC_SET(&group_id_, consumer_group_id);
} else { THIS_WORKER.set_group_id(static_cast<int32_t>(consumer_group_id));
// for IOPS isolation, only depend on consumer_group_id
ATOMIC_SET(&group_id_, consumer_group_id);
THIS_WORKER.set_group_id(static_cast<int32_t>(consumer_group_id));
}
} }
} }
return ret; return ret;

View File

@ -23,6 +23,7 @@
#include "share/ob_io_device_helper.h" #include "share/ob_io_device_helper.h"
#include "share/ob_unit_getter.h" #include "share/ob_unit_getter.h"
#include "share/rc/ob_tenant_base.h" #include "share/rc/ob_tenant_base.h"
#include "share/resource_manager/ob_resource_manager.h"
#include "storage/blocksstable/ob_block_manager.h" #include "storage/blocksstable/ob_block_manager.h"
#include "storage/blocksstable/ob_macro_block_struct.h" #include "storage/blocksstable/ob_macro_block_struct.h"
#include "storage/blocksstable/ob_sstable_meta.h" #include "storage/blocksstable/ob_sstable_meta.h"
@ -140,6 +141,7 @@ ObBlockManager::ObBlockManager()
blk_seq_generator_(), blk_seq_generator_(),
alloc_num_(0), alloc_num_(0),
resize_file_lock_(), resize_file_lock_(),
group_id_(0),
is_inited_(false), is_inited_(false),
is_started_(false) is_started_(false)
{ {
@ -273,6 +275,7 @@ void ObBlockManager::destroy()
marker_status_.reset(); marker_status_.reset();
blk_seq_generator_.reset(); blk_seq_generator_.reset();
ATOMIC_STORE(&alloc_num_, 0); ATOMIC_STORE(&alloc_num_, 0);
group_id_ = 0;
is_inited_ = false; is_inited_ = false;
} }
@ -1018,7 +1021,9 @@ int ObBlockManager::mark_macro_blocks(
const uint64_t tenant_id = mtl_tenant_ids.at(i); const uint64_t tenant_id = mtl_tenant_ids.at(i);
MacroBlockId macro_id; MacroBlockId macro_id;
MTL_SWITCH(tenant_id) { MTL_SWITCH(tenant_id) {
if (OB_FAIL(mark_tenant_blocks(mark_info, macro_id_set, tmp_status))) { if (OB_FAIL(set_group_id(tenant_id))) {
LOG_WARN("isolate CPU and IOPS failed", K(ret));
} else if (OB_FAIL(mark_tenant_blocks(mark_info, macro_id_set, tmp_status))) {
LOG_WARN("fail to mark tenant blocks", K(ret), K(tenant_id)); LOG_WARN("fail to mark tenant blocks", K(ret), K(tenant_id));
} else if (OB_FALSE_IT(MTL(ObSharedMacroBlockMgr*)->get_cur_shared_block(macro_id))) { } else if (OB_FALSE_IT(MTL(ObSharedMacroBlockMgr*)->get_cur_shared_block(macro_id))) {
} else if (OB_FAIL(mark_held_shared_block(macro_id, mark_info, macro_id_set, tmp_status))) { } else if (OB_FAIL(mark_held_shared_block(macro_id, mark_info, macro_id_set, tmp_status))) {
@ -1414,6 +1419,31 @@ int ObBlockManager::update_mark_info(const MacroBlockId &macro_id,
return ret; return ret;
} }
int ObBlockManager::set_group_id(const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(tenant_id <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant id", K(ret), K(tenant_id));
} else {
uint64_t consumer_group_id = 0;
if (OB_FAIL(G_RES_MGR.get_mapping_rule_mgr().get_group_id_by_function_type(tenant_id, ObFunctionType::PRIO_OTHER_BACKGROUND, consumer_group_id))) {
//function level
LOG_WARN("fail to get group id by function", K(ret), K(tenant_id), K(consumer_group_id));
} else if (consumer_group_id != group_id_) {
// for CPU isolation, depend on cgroup
if (OB_NOT_NULL(GCTX.cgroup_ctrl_) && GCTX.cgroup_ctrl_->is_valid() && OB_FAIL(GCTX.cgroup_ctrl_->add_self_to_group(tenant_id, consumer_group_id))) {
LOG_WARN("bind back thread to group failed", K(ret), K(GETTID()), K(tenant_id), K(consumer_group_id));
}
}
if (OB_SUCC(ret)) {
group_id_ = consumer_group_id;
THIS_WORKER.set_group_id(static_cast<int32_t>(consumer_group_id));
}
}
return ret;
}
int ObBlockManager::BlockMapIterator::get_next_block(common::ObIOFd &block_id) int ObBlockManager::BlockMapIterator::get_next_block(common::ObIOFd &block_id)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;

View File

@ -376,6 +376,7 @@ private:
MacroBlkIdMap &mark_info, MacroBlkIdMap &mark_info,
common::hash::ObHashSet<MacroBlockId, common::hash::NoPthreadDefendMode> &macro_id_set, common::hash::ObHashSet<MacroBlockId, common::hash::NoPthreadDefendMode> &macro_id_set,
ObMacroBlockMarkerStatus &tmp_status); ObMacroBlockMarkerStatus &tmp_status);
int set_group_id(const uint64_t tenant_id);
bool continue_mark(); bool continue_mark();
int do_sweep(MacroBlkIdMap &mark_info); int do_sweep(MacroBlkIdMap &mark_info);
@ -478,6 +479,9 @@ private:
int64_t alloc_num_; int64_t alloc_num_;
lib::ObMutex resize_file_lock_; lib::ObMutex resize_file_lock_;
// for resource_isolation
uint64_t group_id_;
bool is_inited_; bool is_inited_;
bool is_started_; bool is_started_;
}; };