diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp index 15028d8eb8..e5ceef0323 100755 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp @@ -448,8 +448,8 @@ int ObRedefCallback::modify_info(ObTableRedefinitionTask &redef_task, } else if (OB_FAIL(update_task_info_in_queue(redef_task, task_queue))) { if (OB_ENTRY_NOT_EXIST == ret) { bool exist = false; - if (OB_FAIL(ObDDLTaskRecordOperator::check_task_id_exist(*GCTX.sql_proxy_, redef_task.get_task_id(), exist))) { - LOG_WARN("check task id exist fail", K(ret), K(task_id)); + if (OB_FAIL(ObDDLTaskRecordOperator::check_task_id_exist(*GCTX.sql_proxy_, redef_task.get_tenant_id(), redef_task.get_task_id(), exist))) { + LOG_WARN("check task id exist fail", K(ret), K(redef_task.get_tenant_id()), K(task_id)); } else { if (exist) { ret = OB_EAGAIN; @@ -1097,7 +1097,8 @@ int ObDDLScheduler::get_task_record(const ObDDLTaskID &task_id, }))) { if (OB_ENTRY_NOT_EXIST == ret) { int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(ObDDLTaskRecordOperator::get_ddl_task_record(task_id.task_id_, + if (OB_TMP_FAIL(ObDDLTaskRecordOperator::get_ddl_task_record(task_id.tenant_id_, + task_id.task_id_, root_service_->get_sql_proxy(), allocator, task_record))) { diff --git a/src/rootserver/ddl_task/ob_ddl_task.cpp b/src/rootserver/ddl_task/ob_ddl_task.cpp index bda01f580c..961367db6d 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_task.cpp @@ -2654,6 +2654,7 @@ int ObDDLTaskRecordOperator::delete_record(common::ObMySQLProxy &proxy, const ui int ObDDLTaskRecordOperator::check_is_adding_constraint( common::ObMySQLProxy *proxy, common::ObIAllocator &allocator, + const uint64_t tenant_id, const uint64_t object_id, bool &is_building) { @@ -2668,10 +2669,10 @@ int ObDDLTaskRecordOperator::check_is_adding_constraint( if (OB_FAIL(sql_string.assign_fmt(" SELECT time_to_usec(gmt_create) AS create_time, tenant_id, task_id, object_id, target_object_id, ddl_type, " "schema_version, parent_task_id, trace_id, status, snapshot_version, task_version, execution_id, " "UNHEX(ddl_stmt_str) as ddl_stmt_str_unhex, ret_code, UNHEX(message) as message_unhex FROM %s " - "WHERE object_id = %" PRIu64 " && ddl_type IN (%d, %d, %d)", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME, + "WHERE object_id = %" PRIu64 " && ddl_type IN (%d, %d, %d)", OB_ALL_DDL_TASK_STATUS_TNAME, object_id, DDL_CHECK_CONSTRAINT, DDL_FOREIGN_KEY_CONSTRAINT, DDL_ADD_NOT_NULL_COLUMN))) { LOG_WARN("assign sql string failed", K(ret)); - } else if (OB_FAIL(proxy->read(res, sql_string.ptr()))) { + } else if (OB_FAIL(proxy->read(res, tenant_id, sql_string.ptr()))) { LOG_WARN("query ddl task record failed", K(ret), K(sql_string)); } else if (OB_ISNULL(result = res.get_result())) { ret = OB_ERR_UNEXPECTED; @@ -2679,7 +2680,7 @@ int ObDDLTaskRecordOperator::check_is_adding_constraint( } else { ObDDLTaskRecord task_record; if (OB_SUCC(ret) && OB_SUCC(result->next())) { - if (OB_FAIL(fill_task_record(result, allocator, task_record))) { + if (OB_FAIL(fill_task_record(tenant_id, result, allocator, task_record))) { LOG_WARN("fill index task failed", K(ret), K(result)); } else if (!task_record.is_valid()) { ret = OB_ERR_UNEXPECTED; @@ -2716,10 +2717,10 @@ int ObDDLTaskRecordOperator::check_has_long_running_ddl( SMART_VAR(ObMySQLProxy::MySQLResult, res) { sqlclient::ObMySQLResult *result = nullptr; if (OB_FAIL(sql_string.assign_fmt(" SELECT * FROM %s " - "WHERE tenant_id = %lu AND object_id = %lu", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME, - tenant_id, table_id))) { + "WHERE object_id = %lu", OB_ALL_DDL_TASK_STATUS_TNAME, + table_id))) { LOG_WARN("assign sql string failed", K(ret)); - } else if (OB_FAIL(proxy->read(res, sql_string.ptr()))) { + } else if (OB_FAIL(proxy->read(res, tenant_id, sql_string.ptr()))) { LOG_WARN("query ddl task record failed", K(ret), K(sql_string)); } else if (OB_ISNULL(result = res.get_result())) { ret = OB_ERR_UNEXPECTED; @@ -2760,10 +2761,9 @@ int ObDDLTaskRecordOperator::check_has_conflict_ddl( if (OB_FAIL(sql_string.assign_fmt("SELECT time_to_usec(gmt_create) AS create_time, tenant_id, task_id, object_id, target_object_id, ddl_type," "schema_version, parent_task_id, trace_id, status, snapshot_version, task_version, execution_id," "UNHEX(ddl_stmt_str) as ddl_stmt_str_unhex, ret_code, UNHEX(message) as message_unhex FROM %s " - "WHERE tenant_id = %lu AND object_id = %lu", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME, - tenant_id, table_id))) { + "WHERE object_id = %lu", OB_ALL_DDL_TASK_STATUS_TNAME, table_id))) { LOG_WARN("assign sql string failed", K(ret)); - } else if (OB_FAIL(proxy->read(res, sql_string.ptr()))) { + } else if (OB_FAIL(proxy->read(res, tenant_id, sql_string.ptr()))) { LOG_WARN("query ddl task record failed", K(ret), K(sql_string)); } else if (OB_ISNULL(result = res.get_result())) { ret = OB_ERR_UNEXPECTED; @@ -2773,7 +2773,7 @@ int ObDDLTaskRecordOperator::check_has_conflict_ddl( ObArenaAllocator allocator("DdlTaskRec"); while (OB_SUCC(ret) && !has_conflict_ddl && OB_SUCC(result->next())) { allocator.reuse(); - if (OB_FAIL(fill_task_record(result, allocator, task_record))) { + if (OB_FAIL(fill_task_record(tenant_id, result, allocator, task_record))) { LOG_WARN("failed to fill task record", K(ret)); } else if (task_record.task_id_ != task_id) { switch (ddl_type) { @@ -2851,7 +2851,8 @@ int ObDDLTaskRecordOperator::check_has_index_task( return ret; } -int ObDDLTaskRecordOperator::get_task_record(const ObSqlString &sql_string, +int ObDDLTaskRecordOperator::get_task_record(const uint64_t tenant_id, + const ObSqlString &sql_string, common::ObMySQLProxy &proxy, common::ObIAllocator &allocator, common::ObIArray &records) @@ -2865,14 +2866,22 @@ int ObDDLTaskRecordOperator::get_task_record(const ObSqlString &sql_string, SMART_VAR(ObMySQLProxy::MySQLResult, res) { ObDDLTaskRecord record; sqlclient::ObMySQLResult *result = NULL; - if (OB_FAIL(proxy.read(res, sql_string.ptr()))) { - LOG_WARN("query ddl task record failed", K(ret), K(sql_string)); + if (OB_INVALID_TENANT_ID == tenant_id) { + if (OB_FAIL(proxy.read(res, sql_string.ptr()))) { + LOG_WARN("query ddl task record failed", K(ret), K(sql_string)); + } + } else { + if (OB_FAIL(proxy.read(res, tenant_id, sql_string.ptr()))) { + LOG_WARN("query ddl task record failed", K(ret), K(sql_string)); + } + } + if (OB_FAIL(ret)) { } else if (OB_ISNULL(result = res.get_result())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("fail to get sql result", K(ret), KP(result)); } else { while (OB_SUCC(ret) && OB_SUCC(result->next())) { - if (OB_FAIL(fill_task_record(result, allocator, record))) { + if (OB_FAIL(fill_task_record(tenant_id, result, allocator, record))) { LOG_WARN("fill index task failed", K(ret), K(result)); } else if (!record.is_valid()) { ret = OB_ERR_UNEXPECTED; @@ -2890,7 +2899,8 @@ int ObDDLTaskRecordOperator::get_task_record(const ObSqlString &sql_string, return ret; } -int ObDDLTaskRecordOperator::get_ddl_task_record(const int64_t task_id, +int ObDDLTaskRecordOperator::get_ddl_task_record(const uint64_t tenant_id, + const int64_t task_id, common::ObMySQLProxy &proxy, common::ObIAllocator &allocator, ObDDLTaskRecord &record) @@ -2903,9 +2913,9 @@ int ObDDLTaskRecordOperator::get_ddl_task_record(const int64_t task_id, LOG_WARN("invalid argument", K(ret), K(proxy.is_inited())); } else if (OB_FAIL(sql_string.assign_fmt(" SELECT time_to_usec(gmt_create) AS create_time, tenant_id, task_id, object_id, target_object_id, ddl_type, " "schema_version, parent_task_id, trace_id, status, snapshot_version, task_version, execution_id, " - "UNHEX(ddl_stmt_str) as ddl_stmt_str_unhex, ret_code, UNHEX(message) as message_unhex FROM %s WHERE task_id=%lu", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME, task_id))) { + "UNHEX(ddl_stmt_str) as ddl_stmt_str_unhex, ret_code, UNHEX(message) as message_unhex FROM %s WHERE task_id=%lu", OB_ALL_DDL_TASK_STATUS_TNAME, task_id))) { LOG_WARN("assign sql string failed", K(ret), K(task_id)); - } else if (OB_FAIL(get_task_record(sql_string, proxy, allocator, task_records))) { + } else if (OB_FAIL(get_task_record(tenant_id, sql_string, proxy, allocator, task_records))) { LOG_WARN("get task record failed", K(ret), K(sql_string)); } else if (task_records.count() != 1) { ret = OB_ERR_UNEXPECTED; @@ -2932,13 +2942,13 @@ int ObDDLTaskRecordOperator::get_all_ddl_task_record(common::ObMySQLProxy &proxy "schema_version, parent_task_id, trace_id, status, snapshot_version, task_version, execution_id, " "UNHEX(ddl_stmt_str) as ddl_stmt_str_unhex, ret_code, UNHEX(message) as message_unhex FROM %s ", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME))) { LOG_WARN("assign sql string failed", K(ret)); - } else if (OB_FAIL(get_task_record(sql_string, proxy, allocator, records))) { + } else if (OB_FAIL(get_task_record(OB_INVALID_TENANT_ID, sql_string, proxy, allocator, records))) { LOG_WARN("get task record failed", K(ret), K(sql_string)); } return ret; } -int ObDDLTaskRecordOperator::check_task_id_exist(common::ObMySQLProxy &proxy, const int64_t task_id, bool &exist) +int ObDDLTaskRecordOperator::check_task_id_exist(common::ObMySQLProxy &proxy, const uint64_t tenant_id, const int64_t task_id, bool &exist) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!proxy.is_inited())) { @@ -2948,10 +2958,9 @@ int ObDDLTaskRecordOperator::check_task_id_exist(common::ObMySQLProxy &proxy, co ObSqlString sql_string; SMART_VAR(ObMySQLProxy::MySQLResult, res) { sqlclient::ObMySQLResult *result = NULL; - // TODO: - if (OB_FAIL(sql_string.assign_fmt("SELECT count(*) as have FROM %s WHERE task_id=%lu", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME, task_id))) { + if (OB_FAIL(sql_string.assign_fmt("SELECT count(*) as have FROM %s WHERE task_id=%lu", OB_ALL_DDL_TASK_STATUS_TNAME, task_id))) { LOG_WARN("assign sql string failed", K(ret)); - } else if (OB_FAIL(proxy.read(res, sql_string.ptr()))) { + } else if (OB_FAIL(proxy.read(res, tenant_id, sql_string.ptr()))) { LOG_WARN("query ddl task record failed", K(ret), K(sql_string)); } else if (OB_ISNULL((result = res.get_result()))) { ret = OB_ERR_UNEXPECTED; @@ -3063,6 +3072,7 @@ int ObDDLTaskRecordOperator::insert_record( } int ObDDLTaskRecordOperator::fill_task_record( + const uint64_t tenant_id, const common::sqlclient::ObMySQLResult *result_row, common::ObIAllocator &allocator, ObDDLTaskRecord &task_record) @@ -3095,6 +3105,14 @@ int ObDDLTaskRecordOperator::fill_task_record( EXTRACT_INT_FIELD_MYSQL(*result_row, "execution_id", task_record.execution_id_, int64_t); EXTRACT_VARCHAR_FIELD_MYSQL(*result_row, "message_unhex", task_message); EXTRACT_VARCHAR_FIELD_MYSQL(*result_row, "ddl_stmt_str_unhex", ddl_stmt_str); + if (OB_SUCC(ret) && OB_INVALID_TENANT_ID != tenant_id) { + if (OB_INVALID_TENANT_ID != task_record.tenant_id_) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("tenant id not empty, cannot overwrite it", K(ret)); + } else { + task_record.tenant_id_ = tenant_id; + } + } if (OB_SUCC(ret)) { SCN check_snapshot_version; if (OB_FAIL(check_snapshot_version.convert_for_tx(task_record.snapshot_version_))) { diff --git a/src/rootserver/ddl_task/ob_ddl_task.h b/src/rootserver/ddl_task/ob_ddl_task.h index fa3fae3ed9..fcb96de1f0 100755 --- a/src/rootserver/ddl_task/ob_ddl_task.h +++ b/src/rootserver/ddl_task/ob_ddl_task.h @@ -218,6 +218,7 @@ public: int64_t &execution_id); static int get_ddl_task_record( + const uint64_t tenant_id, const int64_t task_id, common::ObMySQLProxy &proxy, common::ObIAllocator &allocator, @@ -229,12 +230,14 @@ public: static int check_task_id_exist( common::ObMySQLProxy &proxy, + const uint64_t tenant_id, const int64_t task_id, bool &exist); static int check_is_adding_constraint( common::ObMySQLProxy *proxy, common::ObIAllocator &allocator, + const uint64_t tenant_id, const uint64_t table_id, bool &is_building); @@ -275,6 +278,7 @@ public: private: static int fill_task_record( + const uint64_t tenant_id, const common::sqlclient::ObMySQLResult *result_row, common::ObIAllocator &allocator, ObDDLTaskRecord &task_record); @@ -286,6 +290,7 @@ private: const uint64_t session_id); static int get_task_record( + const uint64_t tenant_id, const ObSqlString &sql_string, common::ObMySQLProxy &proxy, common::ObIAllocator &allocator, diff --git a/src/rootserver/ob_ddl_service.cpp b/src/rootserver/ob_ddl_service.cpp index b4318a4071..0ab80df9ef 100755 --- a/src/rootserver/ob_ddl_service.cpp +++ b/src/rootserver/ob_ddl_service.cpp @@ -11023,7 +11023,7 @@ int ObDDLService::check_is_offline_ddl(ObAlterTableArg &alter_table_arg, table_id, has_index_operation))) { LOG_WARN("check has index operation failed", K(ret)); - } else if (OB_FAIL(check_is_adding_constraint(table_id, is_adding_constraint))) { + } else if (OB_FAIL(check_is_adding_constraint(tenant_id, table_id, is_adding_constraint))) { LOG_WARN("failed to call check_is_adding_constraint", K(ret)); } else if (has_index_operation) { ret = OB_NOT_SUPPORTED; @@ -11081,10 +11081,10 @@ int ObDDLService::check_has_index_operation(ObSchemaGetterGuard &schema_guard, } // check if is adding check constraint, foreign key, not null constraint -int ObDDLService::check_is_adding_constraint(const uint64_t table_id, bool &is_building) +int ObDDLService::check_is_adding_constraint(const uint64_t tenant_id, const uint64_t table_id, bool &is_building) { ObArenaAllocator allocator(lib::ObLabel("DdlTasRecord")); - return ObDDLTaskRecordOperator::check_is_adding_constraint(sql_proxy_, allocator, table_id, is_building); + return ObDDLTaskRecordOperator::check_is_adding_constraint(sql_proxy_, allocator, tenant_id, table_id, is_building); } // check whether the foreign key related table is executing offline ddl, creating index, and executin constrtaint task. diff --git a/src/rootserver/ob_ddl_service.h b/src/rootserver/ob_ddl_service.h index 409e7dd4d3..9218300fad 100644 --- a/src/rootserver/ob_ddl_service.h +++ b/src/rootserver/ob_ddl_service.h @@ -1124,7 +1124,7 @@ private: const uint64_t teannt_id, const uint64_t table_id, bool &has_index_operation); - int check_is_adding_constraint(const uint64_t table_id, bool &is_building); + int check_is_adding_constraint(const uint64_t tenant_id, const uint64_t table_id, bool &is_building); int modify_tenant_inner_phase(const obrpc::ObModifyTenantArg &arg, const ObTenantSchema *orig_tenant_schema, ObSchemaGetterGuard &schema_guard, diff --git a/src/storage/ddl/ob_ddl_redo_log_writer.cpp b/src/storage/ddl/ob_ddl_redo_log_writer.cpp index ea9b486460..a564f4bb60 100755 --- a/src/storage/ddl/ob_ddl_redo_log_writer.cpp +++ b/src/storage/ddl/ob_ddl_redo_log_writer.cpp @@ -170,6 +170,7 @@ int ObDDLCtrlSpeedItem::check_cur_node_is_leader(bool &is_leader) int ObDDLCtrlSpeedItem::do_sleep( const int64_t next_available_ts, + const uint64_t tenant_id, const int64_t task_id, ObDDLKvMgrHandle &ddl_kv_mgr_handle, int64_t &real_sleep_us) @@ -183,9 +184,9 @@ int ObDDLCtrlSpeedItem::do_sleep( if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); - } else if (next_available_ts <= 0 || task_id == 0) { + } else if (next_available_ts <= 0 || OB_INVALID_TENANT_ID == tenant_id || task_id == 0) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument.", K(ret), K(next_available_ts), K(task_id)); + LOG_WARN("invalid argument.", K(ret), K(next_available_ts), K(tenant_id), K(task_id)); } else if (OB_TMP_FAIL(check_need_stop_write(ddl_kv_mgr_handle, is_need_stop_write))) { LOG_WARN("fail to check need stop write", K(tmp_ret), K(ddl_kv_mgr_handle)); } @@ -197,7 +198,7 @@ int ObDDLCtrlSpeedItem::do_sleep( // TODO YIREN (FIXME-20221017), exit when task is canceled, etc. ob_usleep(SLEEP_INTERVAL); if (0 == loop_cnt % 100) { - if (OB_TMP_FAIL(rootserver::ObDDLTaskRecordOperator::check_task_id_exist(*sql_proxy, task_id, is_exist))) { + if (OB_TMP_FAIL(rootserver::ObDDLTaskRecordOperator::check_task_id_exist(*sql_proxy, tenant_id, task_id, is_exist))) { is_exist = true; LOG_WARN("check task id exist failed", K(tmp_ret), K(task_id)); } else { @@ -254,6 +255,7 @@ int ObDDLCtrlSpeedItem::check_need_stop_write(ObDDLKvMgrHandle &ddl_kv_mgr_handl // calculate the sleep time for the input bytes, sleep. int ObDDLCtrlSpeedItem::limit_and_sleep( const int64_t bytes, + const uint64_t tenant_id, const int64_t task_id, ObDDLKvMgrHandle &ddl_kv_mgr_handle, int64_t &real_sleep_us) @@ -266,9 +268,9 @@ int ObDDLCtrlSpeedItem::limit_and_sleep( ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); } else if ((disk_used_stop_write_threshold_ <= 0 - || disk_used_stop_write_threshold_ > 100) || bytes < 0 || 0 == task_id) { + || disk_used_stop_write_threshold_ > 100) || bytes < 0 || OB_INVALID_TENANT_ID == tenant_id || 0 == task_id) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument.", K(ret), K(disk_used_stop_write_threshold_), K(bytes), K(task_id)); + LOG_WARN("invalid argument.", K(ret), K(disk_used_stop_write_threshold_), K(bytes), K(tenant_id), K(task_id)); } else if (OB_FAIL(cal_limit(bytes, next_available_ts))) { LOG_WARN("fail to calculate sleep time", K(ret), K(bytes), K(next_available_ts)); } else if (OB_ISNULL(GCTX.bandwidth_throttle_)) { @@ -279,7 +281,7 @@ int ObDDLCtrlSpeedItem::limit_and_sleep( INT64_MAX, &transmit_sleep_us))) { LOG_WARN("fail to limit out and sleep", K(ret), K(bytes), K(transmit_sleep_us)); - } else if (OB_FAIL(do_sleep(next_available_ts, task_id, ddl_kv_mgr_handle, real_sleep_us))) { + } else if (OB_FAIL(do_sleep(next_available_ts, tenant_id, task_id, ddl_kv_mgr_handle, real_sleep_us))) { LOG_WARN("fail to sleep", K(ret), K(next_available_ts), K(real_sleep_us)); } else {/* do nothing. */} return ret; @@ -400,6 +402,7 @@ int ObDDLCtrlSpeedHandle::limit_and_sleep(const uint64_t tenant_id, ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected err, ctrl speed item is nullptr", K(ret), K(speed_handle_key)); } else if (OB_FAIL(speed_handle_item->limit_and_sleep(bytes, + tenant_id, task_id, ddl_kv_mgr_handle, real_sleep_us))) { diff --git a/src/storage/ddl/ob_ddl_redo_log_writer.h b/src/storage/ddl/ob_ddl_redo_log_writer.h index 9d76fa1237..1898dc9118 100644 --- a/src/storage/ddl/ob_ddl_redo_log_writer.h +++ b/src/storage/ddl/ob_ddl_redo_log_writer.h @@ -56,6 +56,7 @@ public: int init(const share::ObLSID &ls_id); int refresh(); int limit_and_sleep(const int64_t bytes, + const uint64_t tenant_id, const int64_t task_id, ObDDLKvMgrHandle &ddl_kv_mgr_handle, int64_t &real_sleep_us); @@ -72,6 +73,7 @@ private: int check_cur_node_is_leader(bool &is_leader); int cal_limit(const int64_t bytes, int64_t &next_available_ts); int do_sleep(const int64_t next_available_ts, + const uint64_t tenant_id, const int64_t task_id, ObDDLKvMgrHandle &ddl_kv_mgr_handle, int64_t &real_sleep_us);