support DDL_group_id transfer with RPC struct
This commit is contained in:
@ -103,12 +103,13 @@ int ObInnerSqlRpcP::process_write(
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t affected_rows = -1;
|
||||
ResourceGroupGuard guard(transmit_arg.get_consumer_group_id());
|
||||
if (OB_FAIL(conn->execute_write(transmit_arg.get_tenant_id(), write_sql.ptr(), affected_rows))) {
|
||||
LOG_WARN("execute write failed", K(ret), K(transmit_arg), K(write_sql));
|
||||
} else {
|
||||
transmit_result.set_affected_rows(affected_rows);
|
||||
transmit_result.set_stmt_type(
|
||||
static_cast<observer::ObInnerSQLConnection *>(conn)->get_session().get_stmt_type());
|
||||
static_cast<observer::ObInnerSQLConnection *>(conn)->get_session().get_stmt_type());
|
||||
}
|
||||
|
||||
return ret;
|
||||
@ -534,7 +535,6 @@ int ObInnerSqlRpcP::process()
|
||||
LOG_WARN("failed to acquire inner connection", K(ret), K(transmit_arg));
|
||||
}
|
||||
/* init session info */
|
||||
const int64_t group_id = transmit_arg.get_consumer_group_id();
|
||||
if (OB_SUCC(ret) && OB_NOT_NULL(tmp_session)) {
|
||||
tmp_session->set_current_trace_id(ObCurTraceId::get_trace_id());
|
||||
tmp_session->switch_tenant(transmit_arg.get_tenant_id());
|
||||
@ -717,6 +717,23 @@ int ObInnerSqlRpcP::set_session_param_to_conn(
|
||||
return ret;
|
||||
}
|
||||
|
||||
ResourceGroupGuard::ResourceGroupGuard(const int32_t group_id)
|
||||
: group_change_(false), old_group_id_(0)
|
||||
{
|
||||
if (group_id >= RESOURCE_GROUP_START_ID) {
|
||||
old_group_id_ = THIS_WORKER.get_group_id();
|
||||
THIS_WORKER.set_group_id(group_id);
|
||||
group_change_ = true;
|
||||
}
|
||||
}
|
||||
|
||||
ResourceGroupGuard::~ResourceGroupGuard()
|
||||
{
|
||||
if (group_change_) {
|
||||
THIS_WORKER.set_group_id(old_group_id_);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
} // namespace oceanbase
|
||||
|
||||
|
||||
@ -95,6 +95,18 @@ private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObInnerSqlRpcP);
|
||||
};
|
||||
|
||||
class ResourceGroupGuard
|
||||
{
|
||||
//todo qilu:revert after ddl_back_threads are split under tenants
|
||||
public:
|
||||
ResourceGroupGuard(const int32_t group_id);
|
||||
~ResourceGroupGuard();
|
||||
public:
|
||||
bool group_change_;
|
||||
int32_t old_group_id_;
|
||||
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
#endif /* OBDEV_SRC_OBSERVER_INNER_SQL_RPC_PROCESSOR_H_ */
|
||||
|
||||
@ -438,7 +438,7 @@ int64_t ObIORequest::get_group_id() const
|
||||
uint64_t ObIORequest::get_io_usage_index()
|
||||
{
|
||||
uint64_t index = 0;
|
||||
if (get_group_id() < GROUP_START_ID) {
|
||||
if (get_group_id() < RESOURCE_GROUP_START_ID) {
|
||||
//other group , do nothing
|
||||
} else {
|
||||
index = tenant_io_mgr_.get_ptr()->get_usage_index(get_group_id());
|
||||
@ -1173,7 +1173,7 @@ int ObTenantIOConfig::parse_group_config(const char *config_str)
|
||||
int ObTenantIOConfig::add_single_group_config(const uint64_t tenant_id, const int64_t group_id, int64_t min_percent, int64_t max_percent, int64_t weight_percent)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (group_id < GROUP_START_ID || !is_valid_tenant_id(tenant_id) ||
|
||||
if (group_id < RESOURCE_GROUP_START_ID || !is_valid_tenant_id(tenant_id) ||
|
||||
min_percent < 0 || min_percent > 100 ||
|
||||
max_percent < 0 || max_percent > 100 ||
|
||||
weight_percent < 0 || weight_percent > 100 ||
|
||||
|
||||
@ -31,7 +31,6 @@ namespace common
|
||||
|
||||
static constexpr int64_t DEFAULT_IO_WAIT_TIME_MS = 5000L; // 5s
|
||||
static constexpr int64_t MAX_IO_WAIT_TIME_MS = 300L * 1000L; // 5min
|
||||
static constexpr int64_t GROUP_START_ID = 10000L; // start id = 10000
|
||||
static constexpr int64_t GROUP_START_NUM = 8L;
|
||||
enum class ObIOMode : uint8_t
|
||||
{
|
||||
|
||||
@ -980,7 +980,7 @@ int ObTenantIOManager::modify_io_config(const uint64_t group_id,
|
||||
} else {
|
||||
uint64_t index = INT64_MAX;
|
||||
DRWLock::WRLockGuard guard(io_config_lock_);
|
||||
if (group_id < GROUP_START_ID && group_id > 0) {
|
||||
if (group_id < RESOURCE_GROUP_START_ID && group_id > 0) {
|
||||
ret = OB_INVALID_CONFIG;
|
||||
LOG_WARN("invalid group id", K(ret), K(tenant_id_), K(group_id));
|
||||
} else if (min_percent < 0 || min_percent > 100 ||
|
||||
@ -1046,7 +1046,7 @@ int ObTenantIOManager::add_group_io_config(const int64_t group_id,
|
||||
} else if (OB_UNLIKELY(!is_working())) {
|
||||
ret = OB_STATE_NOT_MATCH;
|
||||
LOG_WARN("tenant not working", K(ret), K(tenant_id_));
|
||||
} else if (group_id < GROUP_START_ID || min_percent < 0 || min_percent > 100 ||
|
||||
} else if (group_id < RESOURCE_GROUP_START_ID || min_percent < 0 || min_percent > 100 ||
|
||||
max_percent < 0 || max_percent > 100 || max_percent < min_percent ||
|
||||
weight_percent < 0 || weight_percent > 100) {
|
||||
ret = OB_INVALID_CONFIG;
|
||||
@ -1099,7 +1099,7 @@ int ObTenantIOManager::reset_consumer_group_config(const int64_t group_id)
|
||||
} else if (OB_UNLIKELY(!is_working())) {
|
||||
ret = OB_STATE_NOT_MATCH;
|
||||
LOG_WARN("tenant not working", K(ret), K(tenant_id_));
|
||||
} else if (group_id < GROUP_START_ID) {
|
||||
} else if (group_id < RESOURCE_GROUP_START_ID) {
|
||||
ret = OB_INVALID_CONFIG;
|
||||
LOG_WARN("cannot reset other group io config", K(ret), K(group_id));
|
||||
} else {
|
||||
@ -1138,7 +1138,7 @@ int ObTenantIOManager::delete_consumer_group_config(const int64_t group_id)
|
||||
} else if (OB_UNLIKELY(!is_working())) {
|
||||
ret = OB_STATE_NOT_MATCH;
|
||||
LOG_WARN("tenant not working", K(ret), K(tenant_id_));
|
||||
} else if (group_id < GROUP_START_ID) {
|
||||
} else if (group_id < RESOURCE_GROUP_START_ID) {
|
||||
ret = OB_INVALID_CONFIG;
|
||||
LOG_WARN("cannot delete other group io config", K(ret), K(group_id));
|
||||
} else {
|
||||
|
||||
@ -992,7 +992,7 @@ int ObIOSender::enqueue_request(ObIORequest &req)
|
||||
} else {
|
||||
uint64_t index = INT_MAX64;
|
||||
const int64_t group_id = tmp_req->get_group_id();
|
||||
if (group_id < GROUP_START_ID) { //other
|
||||
if (group_id < RESOURCE_GROUP_START_ID) { //other
|
||||
tmp_phy_queue = &(io_group_queues->other_phy_queue_);
|
||||
} else if (OB_FAIL(req.tenant_io_mgr_.get_ptr()->get_group_index(group_id, index))) {
|
||||
// 防止删除group、新建group等情况发生时在途req无法找到对应的group
|
||||
|
||||
@ -375,7 +375,7 @@ int ObCgroupCtrl::add_self_to_cgroup(const uint64_t tenant_id, int64_t group_id)
|
||||
bool exist_cgroup = false;
|
||||
|
||||
if (OB_FAIL(get_group_path(group_path, PATH_BUFSIZE, tenant_id, group_id))) {
|
||||
LOG_WARN("fail get group path", K(tenant_id), K(ret));
|
||||
LOG_WARN("fail get group path", K(tenant_id), K(ret), K(group_id));
|
||||
} else if (OB_FAIL(FileDirectoryUtils::is_exists(group_path, exist_cgroup))) {
|
||||
LOG_WARN("fail check file exist", K(group_path), K(ret));
|
||||
} else if (!exist_cgroup && OB_FAIL(init_cgroup_dir_(group_path))) {
|
||||
@ -752,7 +752,7 @@ int ObCgroupCtrl::reset_group_iops(const uint64_t tenant_id,
|
||||
} else {
|
||||
LOG_WARN("fail get group id", K(ret), K(group_id), K(group_name));
|
||||
}
|
||||
} else if (group_id < GROUP_START_ID) {
|
||||
} else if (group_id < RESOURCE_GROUP_START_ID) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid group id", K(ret), K(group_id), K(group_name));
|
||||
} else if (OB_FAIL(OB_IO_MANAGER.get_tenant_io_manager(tenant_id, tenant_holder))) {
|
||||
@ -787,7 +787,7 @@ int ObCgroupCtrl::delete_group_iops(const uint64_t tenant_id,
|
||||
} else {
|
||||
LOG_WARN("fail get group id", K(ret), K(group_id), K(group_name));
|
||||
}
|
||||
} else if (group_id < GROUP_START_ID) {
|
||||
} else if (group_id < RESOURCE_GROUP_START_ID) {
|
||||
//OTHER_GROUPS and all cannot be deleted
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid group id", K(ret), K(group_id), K(group_name));
|
||||
|
||||
@ -820,7 +820,7 @@ int ObResourceManagerProxy::check_iops_validity(
|
||||
uint64_t total_min = 0;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < directives.count(); ++i) {
|
||||
ObPlanDirective &cur_directive = directives.at(i);
|
||||
if (cur_directive.group_id_ < GROUP_START_ID) {
|
||||
if (cur_directive.group_id_ < RESOURCE_GROUP_START_ID) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected group id", K(cur_directive));
|
||||
} else if (OB_UNLIKELY(!cur_directive.is_valid())) {
|
||||
|
||||
@ -222,7 +222,7 @@ int ObResourcePlanManager::normalize_iops_directives(const uint64_t tenant_id,
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < directives.count(); ++i) {
|
||||
ObPlanDirective &cur_directive = directives.at(i);
|
||||
if (cur_directive.group_id_ < GROUP_START_ID) {
|
||||
if (cur_directive.group_id_ < RESOURCE_GROUP_START_ID) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
// 理论上不应该出现
|
||||
LOG_WARN("unexpected error!!!", K(cur_directive));
|
||||
|
||||
Reference in New Issue
Block a user