Query __all_ddl_task_status in tenant space
This commit is contained in:
@ -448,8 +448,8 @@ int ObRedefCallback::modify_info(ObTableRedefinitionTask &redef_task,
|
||||
} else if (OB_FAIL(update_task_info_in_queue(redef_task, task_queue))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
bool exist = false;
|
||||
if (OB_FAIL(ObDDLTaskRecordOperator::check_task_id_exist(*GCTX.sql_proxy_, redef_task.get_task_id(), exist))) {
|
||||
LOG_WARN("check task id exist fail", K(ret), K(task_id));
|
||||
if (OB_FAIL(ObDDLTaskRecordOperator::check_task_id_exist(*GCTX.sql_proxy_, redef_task.get_tenant_id(), redef_task.get_task_id(), exist))) {
|
||||
LOG_WARN("check task id exist fail", K(ret), K(redef_task.get_tenant_id()), K(task_id));
|
||||
} else {
|
||||
if (exist) {
|
||||
ret = OB_EAGAIN;
|
||||
@ -1097,7 +1097,8 @@ int ObDDLScheduler::get_task_record(const ObDDLTaskID &task_id,
|
||||
}))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(ObDDLTaskRecordOperator::get_ddl_task_record(task_id.task_id_,
|
||||
if (OB_TMP_FAIL(ObDDLTaskRecordOperator::get_ddl_task_record(task_id.tenant_id_,
|
||||
task_id.task_id_,
|
||||
root_service_->get_sql_proxy(),
|
||||
allocator,
|
||||
task_record))) {
|
||||
|
||||
@ -2654,6 +2654,7 @@ int ObDDLTaskRecordOperator::delete_record(common::ObMySQLProxy &proxy, const ui
|
||||
int ObDDLTaskRecordOperator::check_is_adding_constraint(
|
||||
common::ObMySQLProxy *proxy,
|
||||
common::ObIAllocator &allocator,
|
||||
const uint64_t tenant_id,
|
||||
const uint64_t object_id,
|
||||
bool &is_building)
|
||||
{
|
||||
@ -2668,10 +2669,10 @@ int ObDDLTaskRecordOperator::check_is_adding_constraint(
|
||||
if (OB_FAIL(sql_string.assign_fmt(" SELECT time_to_usec(gmt_create) AS create_time, tenant_id, task_id, object_id, target_object_id, ddl_type, "
|
||||
"schema_version, parent_task_id, trace_id, status, snapshot_version, task_version, execution_id, "
|
||||
"UNHEX(ddl_stmt_str) as ddl_stmt_str_unhex, ret_code, UNHEX(message) as message_unhex FROM %s "
|
||||
"WHERE object_id = %" PRIu64 " && ddl_type IN (%d, %d, %d)", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME,
|
||||
"WHERE object_id = %" PRIu64 " && ddl_type IN (%d, %d, %d)", OB_ALL_DDL_TASK_STATUS_TNAME,
|
||||
object_id, DDL_CHECK_CONSTRAINT, DDL_FOREIGN_KEY_CONSTRAINT, DDL_ADD_NOT_NULL_COLUMN))) {
|
||||
LOG_WARN("assign sql string failed", K(ret));
|
||||
} else if (OB_FAIL(proxy->read(res, sql_string.ptr()))) {
|
||||
} else if (OB_FAIL(proxy->read(res, tenant_id, sql_string.ptr()))) {
|
||||
LOG_WARN("query ddl task record failed", K(ret), K(sql_string));
|
||||
} else if (OB_ISNULL(result = res.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -2679,7 +2680,7 @@ int ObDDLTaskRecordOperator::check_is_adding_constraint(
|
||||
} else {
|
||||
ObDDLTaskRecord task_record;
|
||||
if (OB_SUCC(ret) && OB_SUCC(result->next())) {
|
||||
if (OB_FAIL(fill_task_record(result, allocator, task_record))) {
|
||||
if (OB_FAIL(fill_task_record(tenant_id, result, allocator, task_record))) {
|
||||
LOG_WARN("fill index task failed", K(ret), K(result));
|
||||
} else if (!task_record.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -2716,10 +2717,10 @@ int ObDDLTaskRecordOperator::check_has_long_running_ddl(
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||
sqlclient::ObMySQLResult *result = nullptr;
|
||||
if (OB_FAIL(sql_string.assign_fmt(" SELECT * FROM %s "
|
||||
"WHERE tenant_id = %lu AND object_id = %lu", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME,
|
||||
tenant_id, table_id))) {
|
||||
"WHERE object_id = %lu", OB_ALL_DDL_TASK_STATUS_TNAME,
|
||||
table_id))) {
|
||||
LOG_WARN("assign sql string failed", K(ret));
|
||||
} else if (OB_FAIL(proxy->read(res, sql_string.ptr()))) {
|
||||
} else if (OB_FAIL(proxy->read(res, tenant_id, sql_string.ptr()))) {
|
||||
LOG_WARN("query ddl task record failed", K(ret), K(sql_string));
|
||||
} else if (OB_ISNULL(result = res.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -2760,10 +2761,9 @@ int ObDDLTaskRecordOperator::check_has_conflict_ddl(
|
||||
if (OB_FAIL(sql_string.assign_fmt("SELECT time_to_usec(gmt_create) AS create_time, tenant_id, task_id, object_id, target_object_id, ddl_type,"
|
||||
"schema_version, parent_task_id, trace_id, status, snapshot_version, task_version, execution_id,"
|
||||
"UNHEX(ddl_stmt_str) as ddl_stmt_str_unhex, ret_code, UNHEX(message) as message_unhex FROM %s "
|
||||
"WHERE tenant_id = %lu AND object_id = %lu", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME,
|
||||
tenant_id, table_id))) {
|
||||
"WHERE object_id = %lu", OB_ALL_DDL_TASK_STATUS_TNAME, table_id))) {
|
||||
LOG_WARN("assign sql string failed", K(ret));
|
||||
} else if (OB_FAIL(proxy->read(res, sql_string.ptr()))) {
|
||||
} else if (OB_FAIL(proxy->read(res, tenant_id, sql_string.ptr()))) {
|
||||
LOG_WARN("query ddl task record failed", K(ret), K(sql_string));
|
||||
} else if (OB_ISNULL(result = res.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -2773,7 +2773,7 @@ int ObDDLTaskRecordOperator::check_has_conflict_ddl(
|
||||
ObArenaAllocator allocator("DdlTaskRec");
|
||||
while (OB_SUCC(ret) && !has_conflict_ddl && OB_SUCC(result->next())) {
|
||||
allocator.reuse();
|
||||
if (OB_FAIL(fill_task_record(result, allocator, task_record))) {
|
||||
if (OB_FAIL(fill_task_record(tenant_id, result, allocator, task_record))) {
|
||||
LOG_WARN("failed to fill task record", K(ret));
|
||||
} else if (task_record.task_id_ != task_id) {
|
||||
switch (ddl_type) {
|
||||
@ -2851,7 +2851,8 @@ int ObDDLTaskRecordOperator::check_has_index_task(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLTaskRecordOperator::get_task_record(const ObSqlString &sql_string,
|
||||
int ObDDLTaskRecordOperator::get_task_record(const uint64_t tenant_id,
|
||||
const ObSqlString &sql_string,
|
||||
common::ObMySQLProxy &proxy,
|
||||
common::ObIAllocator &allocator,
|
||||
common::ObIArray<ObDDLTaskRecord> &records)
|
||||
@ -2865,14 +2866,22 @@ int ObDDLTaskRecordOperator::get_task_record(const ObSqlString &sql_string,
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||
ObDDLTaskRecord record;
|
||||
sqlclient::ObMySQLResult *result = NULL;
|
||||
if (OB_FAIL(proxy.read(res, sql_string.ptr()))) {
|
||||
LOG_WARN("query ddl task record failed", K(ret), K(sql_string));
|
||||
if (OB_INVALID_TENANT_ID == tenant_id) {
|
||||
if (OB_FAIL(proxy.read(res, sql_string.ptr()))) {
|
||||
LOG_WARN("query ddl task record failed", K(ret), K(sql_string));
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(proxy.read(res, tenant_id, sql_string.ptr()))) {
|
||||
LOG_WARN("query ddl task record failed", K(ret), K(sql_string));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_ISNULL(result = res.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("fail to get sql result", K(ret), KP(result));
|
||||
} else {
|
||||
while (OB_SUCC(ret) && OB_SUCC(result->next())) {
|
||||
if (OB_FAIL(fill_task_record(result, allocator, record))) {
|
||||
if (OB_FAIL(fill_task_record(tenant_id, result, allocator, record))) {
|
||||
LOG_WARN("fill index task failed", K(ret), K(result));
|
||||
} else if (!record.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -2890,7 +2899,8 @@ int ObDDLTaskRecordOperator::get_task_record(const ObSqlString &sql_string,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLTaskRecordOperator::get_ddl_task_record(const int64_t task_id,
|
||||
int ObDDLTaskRecordOperator::get_ddl_task_record(const uint64_t tenant_id,
|
||||
const int64_t task_id,
|
||||
common::ObMySQLProxy &proxy,
|
||||
common::ObIAllocator &allocator,
|
||||
ObDDLTaskRecord &record)
|
||||
@ -2903,9 +2913,9 @@ int ObDDLTaskRecordOperator::get_ddl_task_record(const int64_t task_id,
|
||||
LOG_WARN("invalid argument", K(ret), K(proxy.is_inited()));
|
||||
} else if (OB_FAIL(sql_string.assign_fmt(" SELECT time_to_usec(gmt_create) AS create_time, tenant_id, task_id, object_id, target_object_id, ddl_type, "
|
||||
"schema_version, parent_task_id, trace_id, status, snapshot_version, task_version, execution_id, "
|
||||
"UNHEX(ddl_stmt_str) as ddl_stmt_str_unhex, ret_code, UNHEX(message) as message_unhex FROM %s WHERE task_id=%lu", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME, task_id))) {
|
||||
"UNHEX(ddl_stmt_str) as ddl_stmt_str_unhex, ret_code, UNHEX(message) as message_unhex FROM %s WHERE task_id=%lu", OB_ALL_DDL_TASK_STATUS_TNAME, task_id))) {
|
||||
LOG_WARN("assign sql string failed", K(ret), K(task_id));
|
||||
} else if (OB_FAIL(get_task_record(sql_string, proxy, allocator, task_records))) {
|
||||
} else if (OB_FAIL(get_task_record(tenant_id, sql_string, proxy, allocator, task_records))) {
|
||||
LOG_WARN("get task record failed", K(ret), K(sql_string));
|
||||
} else if (task_records.count() != 1) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -2932,13 +2942,13 @@ int ObDDLTaskRecordOperator::get_all_ddl_task_record(common::ObMySQLProxy &proxy
|
||||
"schema_version, parent_task_id, trace_id, status, snapshot_version, task_version, execution_id, "
|
||||
"UNHEX(ddl_stmt_str) as ddl_stmt_str_unhex, ret_code, UNHEX(message) as message_unhex FROM %s ", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME))) {
|
||||
LOG_WARN("assign sql string failed", K(ret));
|
||||
} else if (OB_FAIL(get_task_record(sql_string, proxy, allocator, records))) {
|
||||
} else if (OB_FAIL(get_task_record(OB_INVALID_TENANT_ID, sql_string, proxy, allocator, records))) {
|
||||
LOG_WARN("get task record failed", K(ret), K(sql_string));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLTaskRecordOperator::check_task_id_exist(common::ObMySQLProxy &proxy, const int64_t task_id, bool &exist)
|
||||
int ObDDLTaskRecordOperator::check_task_id_exist(common::ObMySQLProxy &proxy, const uint64_t tenant_id, const int64_t task_id, bool &exist)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!proxy.is_inited())) {
|
||||
@ -2948,10 +2958,9 @@ int ObDDLTaskRecordOperator::check_task_id_exist(common::ObMySQLProxy &proxy, co
|
||||
ObSqlString sql_string;
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||
sqlclient::ObMySQLResult *result = NULL;
|
||||
// TODO:
|
||||
if (OB_FAIL(sql_string.assign_fmt("SELECT count(*) as have FROM %s WHERE task_id=%lu", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME, task_id))) {
|
||||
if (OB_FAIL(sql_string.assign_fmt("SELECT count(*) as have FROM %s WHERE task_id=%lu", OB_ALL_DDL_TASK_STATUS_TNAME, task_id))) {
|
||||
LOG_WARN("assign sql string failed", K(ret));
|
||||
} else if (OB_FAIL(proxy.read(res, sql_string.ptr()))) {
|
||||
} else if (OB_FAIL(proxy.read(res, tenant_id, sql_string.ptr()))) {
|
||||
LOG_WARN("query ddl task record failed", K(ret), K(sql_string));
|
||||
} else if (OB_ISNULL((result = res.get_result()))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -3063,6 +3072,7 @@ int ObDDLTaskRecordOperator::insert_record(
|
||||
}
|
||||
|
||||
int ObDDLTaskRecordOperator::fill_task_record(
|
||||
const uint64_t tenant_id,
|
||||
const common::sqlclient::ObMySQLResult *result_row,
|
||||
common::ObIAllocator &allocator,
|
||||
ObDDLTaskRecord &task_record)
|
||||
@ -3095,6 +3105,14 @@ int ObDDLTaskRecordOperator::fill_task_record(
|
||||
EXTRACT_INT_FIELD_MYSQL(*result_row, "execution_id", task_record.execution_id_, int64_t);
|
||||
EXTRACT_VARCHAR_FIELD_MYSQL(*result_row, "message_unhex", task_message);
|
||||
EXTRACT_VARCHAR_FIELD_MYSQL(*result_row, "ddl_stmt_str_unhex", ddl_stmt_str);
|
||||
if (OB_SUCC(ret) && OB_INVALID_TENANT_ID != tenant_id) {
|
||||
if (OB_INVALID_TENANT_ID != task_record.tenant_id_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("tenant id not empty, cannot overwrite it", K(ret));
|
||||
} else {
|
||||
task_record.tenant_id_ = tenant_id;
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
SCN check_snapshot_version;
|
||||
if (OB_FAIL(check_snapshot_version.convert_for_tx(task_record.snapshot_version_))) {
|
||||
|
||||
@ -218,6 +218,7 @@ public:
|
||||
int64_t &execution_id);
|
||||
|
||||
static int get_ddl_task_record(
|
||||
const uint64_t tenant_id,
|
||||
const int64_t task_id,
|
||||
common::ObMySQLProxy &proxy,
|
||||
common::ObIAllocator &allocator,
|
||||
@ -229,12 +230,14 @@ public:
|
||||
|
||||
static int check_task_id_exist(
|
||||
common::ObMySQLProxy &proxy,
|
||||
const uint64_t tenant_id,
|
||||
const int64_t task_id,
|
||||
bool &exist);
|
||||
|
||||
static int check_is_adding_constraint(
|
||||
common::ObMySQLProxy *proxy,
|
||||
common::ObIAllocator &allocator,
|
||||
const uint64_t tenant_id,
|
||||
const uint64_t table_id,
|
||||
bool &is_building);
|
||||
|
||||
@ -275,6 +278,7 @@ public:
|
||||
|
||||
private:
|
||||
static int fill_task_record(
|
||||
const uint64_t tenant_id,
|
||||
const common::sqlclient::ObMySQLResult *result_row,
|
||||
common::ObIAllocator &allocator,
|
||||
ObDDLTaskRecord &task_record);
|
||||
@ -286,6 +290,7 @@ private:
|
||||
const uint64_t session_id);
|
||||
|
||||
static int get_task_record(
|
||||
const uint64_t tenant_id,
|
||||
const ObSqlString &sql_string,
|
||||
common::ObMySQLProxy &proxy,
|
||||
common::ObIAllocator &allocator,
|
||||
|
||||
@ -11023,7 +11023,7 @@ int ObDDLService::check_is_offline_ddl(ObAlterTableArg &alter_table_arg,
|
||||
table_id,
|
||||
has_index_operation))) {
|
||||
LOG_WARN("check has index operation failed", K(ret));
|
||||
} else if (OB_FAIL(check_is_adding_constraint(table_id, is_adding_constraint))) {
|
||||
} else if (OB_FAIL(check_is_adding_constraint(tenant_id, table_id, is_adding_constraint))) {
|
||||
LOG_WARN("failed to call check_is_adding_constraint", K(ret));
|
||||
} else if (has_index_operation) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
@ -11081,10 +11081,10 @@ int ObDDLService::check_has_index_operation(ObSchemaGetterGuard &schema_guard,
|
||||
}
|
||||
|
||||
// check if is adding check constraint, foreign key, not null constraint
|
||||
int ObDDLService::check_is_adding_constraint(const uint64_t table_id, bool &is_building)
|
||||
int ObDDLService::check_is_adding_constraint(const uint64_t tenant_id, const uint64_t table_id, bool &is_building)
|
||||
{
|
||||
ObArenaAllocator allocator(lib::ObLabel("DdlTasRecord"));
|
||||
return ObDDLTaskRecordOperator::check_is_adding_constraint(sql_proxy_, allocator, table_id, is_building);
|
||||
return ObDDLTaskRecordOperator::check_is_adding_constraint(sql_proxy_, allocator, tenant_id, table_id, is_building);
|
||||
}
|
||||
|
||||
// check whether the foreign key related table is executing offline ddl, creating index, and executin constrtaint task.
|
||||
|
||||
@ -1124,7 +1124,7 @@ private:
|
||||
const uint64_t teannt_id,
|
||||
const uint64_t table_id,
|
||||
bool &has_index_operation);
|
||||
int check_is_adding_constraint(const uint64_t table_id, bool &is_building);
|
||||
int check_is_adding_constraint(const uint64_t tenant_id, const uint64_t table_id, bool &is_building);
|
||||
int modify_tenant_inner_phase(const obrpc::ObModifyTenantArg &arg,
|
||||
const ObTenantSchema *orig_tenant_schema,
|
||||
ObSchemaGetterGuard &schema_guard,
|
||||
|
||||
@ -170,6 +170,7 @@ int ObDDLCtrlSpeedItem::check_cur_node_is_leader(bool &is_leader)
|
||||
|
||||
int ObDDLCtrlSpeedItem::do_sleep(
|
||||
const int64_t next_available_ts,
|
||||
const uint64_t tenant_id,
|
||||
const int64_t task_id,
|
||||
ObDDLKvMgrHandle &ddl_kv_mgr_handle,
|
||||
int64_t &real_sleep_us)
|
||||
@ -183,9 +184,9 @@ int ObDDLCtrlSpeedItem::do_sleep(
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret));
|
||||
} else if (next_available_ts <= 0 || task_id == 0) {
|
||||
} else if (next_available_ts <= 0 || OB_INVALID_TENANT_ID == tenant_id || task_id == 0) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument.", K(ret), K(next_available_ts), K(task_id));
|
||||
LOG_WARN("invalid argument.", K(ret), K(next_available_ts), K(tenant_id), K(task_id));
|
||||
} else if (OB_TMP_FAIL(check_need_stop_write(ddl_kv_mgr_handle, is_need_stop_write))) {
|
||||
LOG_WARN("fail to check need stop write", K(tmp_ret), K(ddl_kv_mgr_handle));
|
||||
}
|
||||
@ -197,7 +198,7 @@ int ObDDLCtrlSpeedItem::do_sleep(
|
||||
// TODO YIREN (FIXME-20221017), exit when task is canceled, etc.
|
||||
ob_usleep(SLEEP_INTERVAL);
|
||||
if (0 == loop_cnt % 100) {
|
||||
if (OB_TMP_FAIL(rootserver::ObDDLTaskRecordOperator::check_task_id_exist(*sql_proxy, task_id, is_exist))) {
|
||||
if (OB_TMP_FAIL(rootserver::ObDDLTaskRecordOperator::check_task_id_exist(*sql_proxy, tenant_id, task_id, is_exist))) {
|
||||
is_exist = true;
|
||||
LOG_WARN("check task id exist failed", K(tmp_ret), K(task_id));
|
||||
} else {
|
||||
@ -254,6 +255,7 @@ int ObDDLCtrlSpeedItem::check_need_stop_write(ObDDLKvMgrHandle &ddl_kv_mgr_handl
|
||||
// calculate the sleep time for the input bytes, sleep.
|
||||
int ObDDLCtrlSpeedItem::limit_and_sleep(
|
||||
const int64_t bytes,
|
||||
const uint64_t tenant_id,
|
||||
const int64_t task_id,
|
||||
ObDDLKvMgrHandle &ddl_kv_mgr_handle,
|
||||
int64_t &real_sleep_us)
|
||||
@ -266,9 +268,9 @@ int ObDDLCtrlSpeedItem::limit_and_sleep(
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret));
|
||||
} else if ((disk_used_stop_write_threshold_ <= 0
|
||||
|| disk_used_stop_write_threshold_ > 100) || bytes < 0 || 0 == task_id) {
|
||||
|| disk_used_stop_write_threshold_ > 100) || bytes < 0 || OB_INVALID_TENANT_ID == tenant_id || 0 == task_id) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument.", K(ret), K(disk_used_stop_write_threshold_), K(bytes), K(task_id));
|
||||
LOG_WARN("invalid argument.", K(ret), K(disk_used_stop_write_threshold_), K(bytes), K(tenant_id), K(task_id));
|
||||
} else if (OB_FAIL(cal_limit(bytes, next_available_ts))) {
|
||||
LOG_WARN("fail to calculate sleep time", K(ret), K(bytes), K(next_available_ts));
|
||||
} else if (OB_ISNULL(GCTX.bandwidth_throttle_)) {
|
||||
@ -279,7 +281,7 @@ int ObDDLCtrlSpeedItem::limit_and_sleep(
|
||||
INT64_MAX,
|
||||
&transmit_sleep_us))) {
|
||||
LOG_WARN("fail to limit out and sleep", K(ret), K(bytes), K(transmit_sleep_us));
|
||||
} else if (OB_FAIL(do_sleep(next_available_ts, task_id, ddl_kv_mgr_handle, real_sleep_us))) {
|
||||
} else if (OB_FAIL(do_sleep(next_available_ts, tenant_id, task_id, ddl_kv_mgr_handle, real_sleep_us))) {
|
||||
LOG_WARN("fail to sleep", K(ret), K(next_available_ts), K(real_sleep_us));
|
||||
} else {/* do nothing. */}
|
||||
return ret;
|
||||
@ -400,6 +402,7 @@ int ObDDLCtrlSpeedHandle::limit_and_sleep(const uint64_t tenant_id,
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected err, ctrl speed item is nullptr", K(ret), K(speed_handle_key));
|
||||
} else if (OB_FAIL(speed_handle_item->limit_and_sleep(bytes,
|
||||
tenant_id,
|
||||
task_id,
|
||||
ddl_kv_mgr_handle,
|
||||
real_sleep_us))) {
|
||||
|
||||
@ -56,6 +56,7 @@ public:
|
||||
int init(const share::ObLSID &ls_id);
|
||||
int refresh();
|
||||
int limit_and_sleep(const int64_t bytes,
|
||||
const uint64_t tenant_id,
|
||||
const int64_t task_id,
|
||||
ObDDLKvMgrHandle &ddl_kv_mgr_handle,
|
||||
int64_t &real_sleep_us);
|
||||
@ -72,6 +73,7 @@ private:
|
||||
int check_cur_node_is_leader(bool &is_leader);
|
||||
int cal_limit(const int64_t bytes, int64_t &next_available_ts);
|
||||
int do_sleep(const int64_t next_available_ts,
|
||||
const uint64_t tenant_id,
|
||||
const int64_t task_id,
|
||||
ObDDLKvMgrHandle &ddl_kv_mgr_handle,
|
||||
int64_t &real_sleep_us);
|
||||
|
||||
Reference in New Issue
Block a user